Building Data Pipelines

Learn to design and build reliable data pipelines for production use.

Pipeline Architecture

The Three Layers

graph TD
    A["INGESTION LAYER<br/>━━━━━━━━━<br/>Extract from sources<br/>SQL • APIs • Files • Streams"] --> B
    B["TRANSFORMATION LAYER<br/>━━━━━━━━━<br/>Clean • Validate • Enrich • Aggregate"] --> C
    C["PRESENTATION LAYER<br/>━━━━━━━━━<br/>Reports • Dashboards • Alerts"]
    
    style A fill:#1f97d4,stroke:#0b5394,stroke-width:2px,color:#fff
    style B fill:#ff9900,stroke:#ec7211,stroke-width:2px,color:#fff
    style C fill:#37475a,stroke:#1f1f1f,stroke-width:2px,color:#fff

Each layer has specific jobs and quality standards.

Layer 1: Ingestion

Extract Data from Sources

# SQL databases
def extract_from_postgres():
    """Read every active customer row from Postgres into a DataFrame.

    Runs a fixed query over the module-level ``postgres_conn`` connection.
    """
    active_customers_sql = "SELECT * FROM customers WHERE active = true"
    customers = pd.read_sql(active_customers_sql, postgres_conn)
    return customers

# APIs
def extract_from_api(timeout=30):
    """Fetch records from the external API and return them as a DataFrame.

    Args:
        timeout: Seconds to wait for the server before giving up. Without a
            timeout, ``requests`` waits indefinitely, so a stalled endpoint
            would hang the whole pipeline.

    Returns:
        A DataFrame built from the decoded JSON response body.

    Raises:
        requests.exceptions.HTTPError: If the API answers with a 4xx/5xx status.
        requests.exceptions.Timeout: If no response arrives within ``timeout``.
    """
    response = requests.get('https://api.external.com/data', timeout=timeout)
    # Fail loudly on an error status instead of turning an error payload
    # into a DataFrame that later steps would treat as real data.
    response.raise_for_status()
    return pd.DataFrame(response.json())

# Files
def extract_from_csv():
    """Load the CSV file stored at the fixed S3 location into a DataFrame."""
    source_uri = 's3://bucket/data.csv'
    return pd.read_csv(source_uri)

# Messages
def extract_from_kafka():
    """Yield each Kafka message's value decoded from JSON.

    Iterates the module-level ``kafka_consumer``. Being a generator, it does
    no work until the caller starts iterating.
    """
    # Stream real-time events one at a time rather than collecting them
    decoded_events = (json.loads(record.value) for record in kafka_consumer)
    yield from decoded_events

Ingestion Quality Checks

def validate_ingestion(data, expected_cols=None):
    """Check that freshly extracted data looks complete and current.

    Four checks are run: the frame is non-empty, every expected column is
    present, there are no fully duplicated rows, and the newest
    ``created_at`` value is less than one day old.

    Args:
        data: DataFrame produced by an extraction step.
        expected_cols: Column names that must be present. Defaults to the
            module-level ``EXPECTED_COLS`` so existing callers are unaffected.

    Returns:
        True when every check passes.

    Raises:
        ValueError: If any check fails; the message lists the failed checks.
    """
    if expected_cols is None:
        expected_cols = EXPECTED_COLS

    freshness_cutoff = pd.Timestamp.now() - pd.Timedelta(days=1)
    # A missing 'created_at' column must show up as a failed check rather
    # than escape as a KeyError before the failure list is built.
    if 'created_at' in data.columns:
        is_recent = data['created_at'].max() > freshness_cutoff
    else:
        is_recent = False

    checks = {
        'not_empty': len(data) > 0,
        'all_columns': all(col in data.columns for col in expected_cols),
        'no_duplicates': data.duplicated().sum() == 0,
        'recent_data': is_recent,
    }

    failed = [name for name, passed in checks.items() if not passed]

    if failed:
        raise ValueError(f"Ingestion validation failed: {failed}")

    return True

# Use in pipeline
def ingest_step():
    """Extract from the source and return the data once it passes validation."""
    extracted = extract_from_source()
    # validate_ingestion raises on failure, so reaching the return means success
    validate_ingestion(extracted)
    return extracted

Layer 2: Transformation

Clean Data

def clean_data(data):
    """Return a cleaned copy of ``data``.

    Drops exact duplicate rows and rows lacking ``customer_id`` or ``email``,
    coerces ``revenue`` to numeric (unparseable values become NaN) and
    ``created_at`` to datetime, then keeps only rows whose revenue is at most
    1,000,000.
    """
    deduplicated = data.drop_duplicates()

    # Rows without these critical identifiers cannot be used downstream
    complete = deduplicated.dropna(subset=['customer_id', 'email'])

    typed = complete.assign(
        revenue=pd.to_numeric(complete['revenue'], errors='coerce'),
        created_at=pd.to_datetime(complete['created_at']),
    )

    # Unrealistic values are filtered out; NaN compares False here, so rows
    # whose revenue failed numeric coercion are dropped by the same mask.
    within_limit = typed['revenue'] <= 1000000
    return typed[within_limit]

Validate Data Quality

def validate_data_quality(data):
    """Verify that transformed data satisfies the quality rules.

    Rules: every column has under 5% nulls, more than 95% of rows have
    positive revenue, ``created_at`` never falls after ``last_purchase``,
    and ``customer_id`` values are unique.

    Returns the per-rule outcome mapping when all rules hold; raises
    ValueError containing that mapping otherwise.
    """
    row_count = len(data)

    null_share = data.isnull().sum() / row_count  # one ratio per column
    positive_share = (data['revenue'] > 0).sum() / row_count
    in_order = data['created_at'] <= data['last_purchase']  # one flag per row
    duplicate_ids = data.duplicated(subset=['customer_id']).sum()

    rules = {
        'completeness': null_share < 0.05,
        'revenue_positive': positive_share > 0.95,
        'date_order': in_order,
        'unique_ids': duplicate_ids == 0,
    }

    # Collapse element-wise outcomes to a single pass/fail per rule
    results = {}
    for rule_name, outcome in rules.items():
        results[rule_name] = outcome.all() if hasattr(outcome, 'all') else outcome

    if all(results.values()):
        return results

    raise ValueError(f"Quality validation failed: {results}")

Enrich Data

def enrich_data(data):
    """Return ``data`` extended with segment, value, region and audit columns.

    Left-joins the frame returned by ``get_customer_segments()`` on
    ``customer_id``, then appends ``lifetime_value``
    (revenue * months_active), ``region_name`` (looked up from
    ``REGION_LOOKUP`` by ``region_code``) and ``processed_at``.
    """
    with_segments = data.merge(
        get_customer_segments(),
        on='customer_id',
        how='left'
    )

    return with_segments.assign(
        # Derived field
        lifetime_value=with_segments['revenue'] * with_segments['months_active'],
        # Lookup of the readable region name
        region_name=with_segments['region_code'].map(REGION_LOOKUP),
        # Processing timestamp
        processed_at=pd.Timestamp.now(),
    )

Aggregate Data

def aggregate_data(data):
    """Build the summary tables derived from row-level data.

    Returns a dict with three frames:
      'daily'     - per-date revenue sum/mean, order count, distinct customers
      'by_region' - per-region revenue sum and mean of avg_order_value
      'pivot'     - revenue summed into a date x region grid
    """
    daily_spec = {
        'revenue': ['sum', 'mean'],
        'orders': 'count',
        'customers': 'nunique'
    }
    region_spec = {
        'revenue': 'sum',
        'avg_order_value': 'mean'
    }

    summaries = {}
    summaries['daily'] = data.groupby('date').agg(daily_spec).reset_index()
    summaries['by_region'] = data.groupby('region').agg(region_spec).reset_index()
    summaries['pivot'] = data.pivot_table(
        values='revenue',
        index='date',
        columns='region',
        aggfunc='sum'
    )
    return summaries

Layer 3: Presentation

Load to Destinations

def load_to_destinations(data):
    """Publish ``data`` to every place consumers read it from.

    Writes, in order: the production database table (replacing any existing
    table), the warehouse Parquet location, the latest-report CSV, and the
    dashboard cache entry (as JSON).
    """
    # Production database: the existing table is replaced, index not written
    data.to_sql('customer_metrics', production_db, if_exists='replace', index=False)

    # Data warehouse
    warehouse_uri = 's3://warehouse/customer_metrics/'
    data.to_parquet(warehouse_uri)

    # For reports
    report_uri = 's3://reports/customer_metrics_latest.csv'
    data.to_csv(report_uri)

    # Cache for dashboards
    serialized = data.to_json()
    cache.set('customer_metrics', serialized)

Generate Reports

def generate_report(data):
    """Render a plain-text metrics summary for ``data``.

    The report lists the row count, total and mean revenue, the five regions
    with the highest revenue, and two alert counts (negative revenue rows and
    rows with a null ``created_at``). Every line carries a four-space indent,
    and the text starts with a newline.
    """
    indent = "    "

    generated_at = pd.Timestamp.now()
    row_count = len(data)
    total_revenue = data['revenue'].sum()
    mean_revenue = data['revenue'].mean()
    top_regions = data.groupby('region')['revenue'].sum().nlargest(5).to_string()
    negative_rows = len(data[data['revenue'] < 0])
    missing_dates = data['created_at'].isnull().sum()

    lines = [
        "",
        f"{indent}Customer Metrics Report",
        f"{indent}Generated: {generated_at}",
        indent,
        f"{indent}Total Customers: {row_count}",
        f"{indent}Total Revenue: ${total_revenue:,.0f}",
        f"{indent}Average Order Value: ${mean_revenue:.2f}",
        indent,
        f"{indent}Top 5 Regions by Revenue:",
        # Only the first line of the multi-line table receives the indent
        f"{indent}{top_regions}",
        indent,
        f"{indent}Alerts:",
        f"{indent}- {negative_rows} rows with negative revenue",
        f"{indent}- {missing_dates} rows missing dates",
        indent,
    ]
    return "\n".join(lines)

# Send to stakeholders
# Render the metrics summary, then mail it to the team
report = generate_report(processed_data)
send_email(
    to='team@company.com',
    subject='Daily Metrics',
    body=report,
)

Complete Pipeline Example

Daily Customer Metrics Pipeline

# ORCHESTRATION: Schedule daily at 9 AM

def run_daily_pipeline():
    """Run the full daily customer-metrics pipeline end to end.

    Steps: extract customers and orders, validate the extracts, clean both,
    validate quality, merge and enrich, aggregate, load the daily summary,
    then email a report. Any exception raised by a step propagates to the
    caller; nothing is caught here.
    """
    # INGESTION LAYER
    print("Step 1: Extracting data...")
    raw_customers = extract_from_postgres()
    raw_orders = extract_from_api()
    validate_ingestion(raw_customers)
    validate_ingestion(raw_orders)

    # TRANSFORMATION LAYER
    print("Step 2: Cleaning data...")
    clean_customers = clean_data(raw_customers)
    clean_orders = clean_data(raw_orders)

    print("Step 3: Validating quality...")
    validate_data_quality(clean_customers)
    validate_data_quality(clean_orders)

    print("Step 4: Enriching data...")
    # Left join keeps every customer, including those without orders
    merged = clean_customers.merge(
        clean_orders,
        on='customer_id',
        how='left'
    )
    enriched = enrich_data(merged)

    print("Step 5: Aggregating...")
    aggregated = aggregate_data(enriched)

    # PRESENTATION LAYER
    print("Step 6: Loading to destinations...")
    # NOTE(review): 'daily' carries two-level column labels because revenue is
    # aggregated with a list of functions; confirm every destination accepts that.
    load_to_destinations(aggregated['daily'])

    print("Step 7: Generating reports...")
    # generate_report reads row-level 'revenue', 'region' and 'created_at'.
    # The per-date summary has none of the latter two, so the report is
    # built from the enriched rows instead.
    report = generate_report(enriched)
    # Keyword form matches the other report-mailing call in this file
    send_email(to='team@company.com', subject='Daily Metrics', body=report)

    print("✓ Pipeline completed successfully")

# Result: 9:15 AM - Team has fresh data and reports!

Error Handling

Graceful Failures

def pipeline_with_retry(max_retries=3):
    """Extract and validate source data, retrying with exponential backoff.

    Args:
        max_retries: Total number of attempts; must be at least 1.

    Returns:
        The extracted data from the first attempt that passes validation.

    Raises:
        ValueError: If ``max_retries`` is less than 1. Otherwise the loop
            would never run and the function would appear to succeed
            without extracting anything.
        Exception: Whatever the final attempt raised, once every attempt
            has failed.
    """
    if max_retries < 1:
        raise ValueError("max_retries must be at least 1")

    for attempt in range(max_retries):
        try:
            data = extract_from_source()
            validate_ingestion(data)
        except Exception as e:
            print(f"Attempt {attempt + 1} failed: {e}")
            if attempt == max_retries - 1:
                raise  # Give up after max retries
            wait_time = 2 ** attempt  # 1s, 2s, 4s
            print(f"Retrying in {wait_time} seconds...")
            time.sleep(wait_time)
        else:
            # Hand the validated data back instead of discarding it
            return data

# In Orchestration, set retry policy:
# max_retries: 3
# wait_between: exponential backoff

Alerting on Failures

def run_pipeline_with_alerts():
    """Run the daily pipeline and notify every alert channel if it fails.

    Each channel is attempted independently, so one failing notifier does
    not stop the others or replace the pipeline's own exception.

    Raises:
        Exception: The original pipeline error, re-raised after alerting so
            the pipeline step is still marked as failed.
    """
    try:
        run_daily_pipeline()
    except Exception as e:
        # Pipeline failed
        alert = {
            'severity': 'critical',
            'title': 'Daily Pipeline Failed',
            'error': str(e),
            'time': pd.Timestamp.now()
        }

        # json.dumps cannot serialize a pd.Timestamp, so the email body uses
        # an ISO-8601 string; the other channels still get the original dict.
        email_body = json.dumps({**alert, 'time': alert['time'].isoformat()})

        # Keyword form for send_email matches the report-mailing call in this file
        notifiers = (
            lambda: send_email(
                to='ops@company.com',
                subject=alert['title'],
                body=email_body,
            ),
            lambda: send_slack('#data-alerts', alert),
            lambda: send_pagerduty(alert),
        )

        # Send alerts to different channels, best effort per channel
        for notify in notifiers:
            try:
                notify()
            except Exception as notify_error:
                print(f"Alert delivery failed: {notify_error}")

        # And fail the pipeline step with the original error
        raise

Monitoring Pipeline Health

Track Pipeline Metrics

from credvault import metrics

def monitored_pipeline():
    """Run extract -> clean -> validate -> load while emitting metrics.

    Row counts are reported as gauges, successes and failures as counters,
    and total wall-clock duration as a histogram. The duration is recorded
    and printed whether or not the run succeeds; failures are re-raised.
    """
    started_at = time.time()

    try:
        extracted = extract()
        metrics.gauge('extraction_rows', len(extracted))

        cleaned = clean_data(extracted)
        metrics.gauge('rows_after_cleaning', len(cleaned))
        metrics.gauge('rows_removed', len(extracted) - len(cleaned))

        checked = validate_quality(cleaned)
        metrics.counter('validation_passed', 1)

        load_data(checked)
        metrics.counter('successful_loads', 1)
    except Exception:
        # Count the failure, then let the error propagate unchanged
        metrics.counter('pipeline_failures', 1)
        raise
    finally:
        elapsed = time.time() - started_at
        metrics.histogram('pipeline_duration_seconds', elapsed)
        print(f"Pipeline took {elapsed:.1f} seconds")

Check Pipeline Status

In monitoring dashboard:

Daily Customer Pipeline:
- Status: Running
- Last run: 2024-06-12 09:15 UTC
- Duration: 4 minutes 23 seconds
- Rows extracted: 125,432
- Rows cleaned: 124,998 (434 removed)
- Validation: PASSED
- Load: SUCCESS
- Next run: Tomorrow 09:00 UTC

Best Practices

Design Principles

✓ Idempotent: Running twice gives the same result
✓ Atomic: Either everything succeeds or everything fails
✓ Observable: You can see what happened
✓ Resilient: Handles failures gracefully
✓ Scalable: Handles growth over time

Avoid Common Pitfalls

✗ Don't hardcode paths and credentials
✓ Do use configuration files and a secrets manager

✗ Don't skip validation
✓ Do validate at every step

✗ Don't assume source data is perfect
✓ Do handle edge cases and errors

✗ Don't lose track of data lineage
✓ Do document data flow and transformations