Building Data Pipelines
Learn to design and build reliable data pipelines for production use.
Pipeline Architecture
The Three Layers
graph TD
A["INGESTION LAYER<br/>━━━━━━━━━<br/>Extract from sources<br/>SQL • APIs • Files • Streams"] --> B
B["TRANSFORMATION LAYER<br/>━━━━━━━━━<br/>Clean • Validate • Enrich • Aggregate"] --> C
C["PRESENTATION LAYER<br/>━━━━━━━━━<br/>Reports • Dashboards • Alerts"]
style A fill:#1f97d4,stroke:#0b5394,stroke-width:2px,color:#fff
style B fill:#ff9900,stroke:#ec7211,stroke-width:2px,color:#fff
style C fill:#37475a,stroke:#1f1f1f,stroke-width:2px,color:#fff
Each layer has its own responsibilities and quality standards.
Layer 1: Ingestion
Extract Data from Sources
# SQL databases
def extract_from_postgres():
    """Return all active customers from Postgres as a DataFrame.

    Uses the module-level ``postgres_conn`` connection.
    """
    active_customers_sql = "SELECT * FROM customers WHERE active = true"
    customers = pd.read_sql(active_customers_sql, postgres_conn)
    return customers
# APIs
def extract_from_api(url='https://api.external.com/data', timeout=30):
    """Fetch JSON records from the external API as a DataFrame.

    Args:
        url: Endpoint to GET; defaults to the original hard-coded endpoint.
        timeout: Seconds to wait for the server. Without a timeout,
            ``requests`` can block indefinitely on a stalled connection.

    Returns:
        A DataFrame built from the decoded JSON body.

    Raises:
        requests.HTTPError: If the API answers with a 4xx/5xx status. This
            stops an error payload from being loaded as if it were data.
    """
    response = requests.get(url, timeout=timeout)
    response.raise_for_status()
    return pd.DataFrame(response.json())
# Files
def extract_from_csv():
    """Read the CSV export at ``s3://bucket/data.csv`` into a DataFrame."""
    csv_location = 's3://bucket/data.csv'
    frame = pd.read_csv(csv_location)
    return frame
# Messages
def extract_from_kafka():
    """Lazily yield one decoded JSON event per message from ``kafka_consumer``.

    Each message's ``value`` is parsed with ``json.loads``. Iteration proceeds
    only as the caller pulls items from this generator.
    """
    # Stream real-time events
    yield from (json.loads(record.value) for record in kafka_consumer)
Ingestion Quality Checks
def validate_ingestion(data, expected_cols=None, max_age=None):
    """Ensure data was extracted correctly.

    Every check runs before anything is raised, so one error lists all
    failures.

    Args:
        data: Freshly extracted DataFrame.
        expected_cols: Column names that must be present. Defaults to the
            module-level ``EXPECTED_COLS``.
        max_age: The newest ``created_at`` must fall within this window of
            now. Defaults to one day.

    Returns:
        True when every check passes.

    Raises:
        ValueError: Listing the names of the failed checks.
    """
    if expected_cols is None:
        expected_cols = EXPECTED_COLS
    if max_age is None:
        max_age = pd.Timedelta(days=1)
    checks = {
        'not_empty': len(data) > 0,
        'all_columns': all(col in data.columns for col in expected_cols),
        'no_duplicates': not data.duplicated().any(),
        'recent_data': _has_recent_data(data, max_age),
    }
    failed = [k for k, v in checks.items() if not v]
    if failed:
        raise ValueError(f"Ingestion validation failed: {failed}")
    return True


def _has_recent_data(data, max_age):
    """Return True if the newest parseable ``created_at`` is within ``max_age`` of now."""
    # A missing column counts as a failed check rather than a KeyError, so the
    # caller's error message still lists every failed check.
    if 'created_at' not in data.columns:
        return False
    # Raw CSV/API extracts hold timestamps as strings (read_csv and
    # DataFrame(json) don't parse dates). Parse them here so the comparison
    # below does not raise TypeError. Unparseable values become NaT.
    latest = pd.to_datetime(data['created_at'], errors='coerce').max()
    if pd.isna(latest):  # empty or entirely unparseable column
        return False
    # Match the column's timezone so aware and naive timestamps never meet.
    now = pd.Timestamp.now(tz=latest.tz)
    return bool(latest > now - max_age)
# Use in pipeline
def ingest_step():
    """Extract from the source, check it with validate_ingestion, and return it.

    validate_ingestion raises ValueError when a check fails, so only data
    that passed the ingestion checks is returned.
    """
    extracted = extract_from_source()
    validate_ingestion(extracted)
    return extracted
Layer 2: Transformation
Clean Data
def clean_data(data, required_cols=('customer_id', 'email'), max_revenue=1_000_000):
    """Remove bad data.

    Args:
        data: DataFrame with at least the ``required_cols`` plus ``revenue``
            and ``created_at`` columns.
        required_cols: Columns that must be non-null for a row to be kept.
        max_revenue: Rows with revenue above this are treated as unrealistic
            and dropped.

    Returns:
        A new DataFrame. The input frame is not modified.

    Raises:
        KeyError: If a required, ``revenue`` or ``created_at`` column is missing.
    """
    # Remove rows with nulls in critical columns
    data = data.dropna(subset=list(required_cols))
    # Fix data types *before* de-duplicating. Otherwise rows that differ only
    # in representation (e.g. revenue '100' vs 100) survive as "distinct".
    data = data.assign(
        revenue=pd.to_numeric(data['revenue'], errors='coerce'),
        created_at=pd.to_datetime(data['created_at']),
    )
    # Remove duplicates
    data = data.drop_duplicates()
    # Remove outliers. Revenue that failed to parse is NaN and NaN <= x is
    # False, so those rows are dropped here as well.
    data = data[data['revenue'] <= max_revenue]
    return data
Validate Data Quality
def validate_data_quality(data, max_null_fraction=0.05, min_positive_fraction=0.95):
    """Check transformed data meets standards.

    Args:
        data: DataFrame with ``customer_id``, ``revenue``, ``created_at`` and
            ``last_purchase`` columns.
        max_null_fraction: Every column must have a null fraction below this.
        min_positive_fraction: The share of rows with revenue > 0 must exceed this.

    Returns:
        Dict mapping each rule name to True (all rules passed).

    Raises:
        ValueError: If ``data`` is empty or any rule fails. The message shows
            every rule's result.
    """
    # Fraction-based rules are undefined on zero rows; fail with a clear reason.
    if len(data) == 0:
        raise ValueError("Quality validation failed: no rows to validate")
    # Missing dates are already policed by 'completeness'. Comparing NaT
    # always yields False, so only rows with both dates are checked for order.
    both_dates = data['created_at'].notna() & data['last_purchase'].notna()
    rules = {
        'completeness': (data.isnull().mean() < max_null_fraction).all(),  # <5% nulls per column
        'revenue_positive': (data['revenue'] > 0).mean() > min_positive_fraction,  # >95% positive
        'date_order': (
            data.loc[both_dates, 'created_at'] <= data.loc[both_dates, 'last_purchase']
        ).all(),  # Logical order
        'unique_ids': not data.duplicated(subset=['customer_id']).any(),  # No duplicates
    }
    results = {k: bool(v) for k, v in rules.items()}
    if not all(results.values()):
        raise ValueError(f"Quality validation failed: {results}")
    return results
Enrich Data
def enrich_data(data, segments=None, region_lookup=None):
    """Add value to data: segments, lifetime value, region names, timestamp.

    Args:
        data: DataFrame with ``customer_id``, ``revenue``, ``months_active``
            and ``region_code`` columns (``months_active`` may instead come
            from ``segments``).
        segments: Per-customer DataFrame keyed by ``customer_id``. Defaults to
            ``get_customer_segments()``.
        region_lookup: Mapping from region code to name. Defaults to the
            module-level ``REGION_LOOKUP``.

    Returns:
        A new DataFrame with the extra columns.

    Raises:
        pandas.errors.MergeError: If ``segments`` has more than one row for
            a customer. A left join would then silently duplicate rows and
            inflate revenue-based metrics.
    """
    if segments is None:
        segments = get_customer_segments()
    if region_lookup is None:
        region_lookup = REGION_LOOKUP
    # Join with other data
    data = data.merge(
        segments,
        on='customer_id',
        how='left',
        validate='many_to_one',
    )
    # Calculate derived fields
    data['lifetime_value'] = data['revenue'] * data['months_active']
    # Add lookups (codes missing from the lookup become NaN)
    data['region_name'] = data['region_code'].map(region_lookup)
    # Add timestamps
    data['processed_at'] = pd.Timestamp.now()
    return data
Aggregate Data
def aggregate_data(data, date_col='date', region_col='region'):
    """Summarize to appropriate level.

    Args:
        data: Row-level DataFrame with ``revenue``, ``orders``, ``customers``
            and ``avg_order_value`` columns plus the date and region columns.
        date_col: Column to group the daily summary and pivot rows by.
        region_col: Column to group the regional summary and pivot columns by.

    Returns:
        Dict with:
          - 'daily': one row per date, flat columns ``revenue_sum``,
            ``revenue_mean``, ``orders_count``, ``customers_nunique``.
          - 'by_region': one row per region.
          - 'pivot': date x region revenue totals.
    """
    # Daily summary. Named aggregation keeps the columns flat. Mixing list
    # and string aggregators in a dict produces MultiIndex columns, which
    # to_parquet and plain column lookups downstream can't handle.
    daily = (
        data.groupby(date_col)
        .agg(
            revenue_sum=('revenue', 'sum'),
            revenue_mean=('revenue', 'mean'),
            orders_count=('orders', 'count'),
            customers_nunique=('customers', 'nunique'),
        )
        .reset_index()
    )
    # By region
    by_region = data.groupby(region_col).agg({
        'revenue': 'sum',
        'avg_order_value': 'mean',
    }).reset_index()
    # Pivot for analysis
    pivot = data.pivot_table(
        values='revenue',
        index=date_col,
        columns=region_col,
        aggfunc='sum',
    )
    return {
        'daily': daily,
        'by_region': by_region,
        'pivot': pivot,
    }
Layer 3: Presentation
Load to Destinations
def load_to_destinations(data):
    """Put clean data where users can access it.

    Writes ``data`` to four destinations in sequence:
      - the production database table ``customer_metrics`` (replacing it),
      - a Parquet object in the warehouse bucket,
      - a CSV in the reports bucket,
      - the dashboard cache under the key ``customer_metrics``.

    The writes are independent and unguarded. A failure part-way therefore
    leaves earlier destinations updated and later ones stale.
    """
    # Production database
    data.to_sql(
        'customer_metrics',
        production_db,
        if_exists='replace',
        index=False,
    )
    # Data warehouse
    # NOTE(review): the trailing '/' reads like a dataset directory, but
    # to_parquet without partition_cols writes a single object; confirm target.
    data.to_parquet('s3://warehouse/customer_metrics/')
    # For reports. index=False (as with to_sql above) so the CSV does not
    # gain an unnamed leading index column.
    data.to_csv('s3://reports/customer_metrics_latest.csv', index=False)
    # Cache for dashboards
    cache.set('customer_metrics', data.to_json())
Generate Reports
def generate_report(data):
    """Build a plain-text metrics report from row-level data.

    Reads the ``revenue``, ``region`` and ``created_at`` columns. The text
    starts and ends with a newline, and includes totals, the top five regions
    by revenue, and counts of negative-revenue and missing-date rows.
    """
    revenue = data['revenue']
    top_regions = data.groupby('region')['revenue'].sum().nlargest(5).to_string()
    negative_rows = int((revenue < 0).sum())
    missing_dates = data['created_at'].isnull().sum()
    lines = [
        "",
        "Customer Metrics Report",
        f"Generated: {pd.Timestamp.now()}",
        f"Total Customers: {len(data)}",
        f"Total Revenue: ${revenue.sum():,.0f}",
        f"Average Order Value: ${revenue.mean():.2f}",
        "Top 5 Regions by Revenue:",
        top_regions,
        "Alerts:",
        f"- {negative_rows} rows with negative revenue",
        f"- {missing_dates} rows missing dates",
        "",
    ]
    return "\n".join(lines)
# Send to stakeholders: build the report from the processed data and email it.
report = generate_report(processed_data)
send_email(
    to='team@company.com',
    subject='Daily Metrics',
    body=report,
)
Complete Pipeline Example
Daily Customer Metrics Pipeline
# ORCHESTRATION: Schedule daily at 9 AM
def run_daily_pipeline():
    """Complete data pipeline, run end to end.

    Steps:
      - extract customers (Postgres) and orders (API), then validate both;
      - clean and quality-check them;
      - join and enrich;
      - aggregate, load the daily summary, and email a report.

    Nothing here catches errors, so any step that raises aborts the run.
    """
    # INGESTION LAYER
    print("Step 1: Extracting data...")
    raw_customers = extract_from_postgres()
    raw_orders = extract_from_api()
    validate_ingestion(raw_customers)
    validate_ingestion(raw_orders)
    # TRANSFORMATION LAYER
    print("Step 2: Cleaning data...")
    # NOTE(review): clean_data drops rows missing 'customer_id'/'email' and
    # reads 'revenue'/'created_at'; confirm the orders feed has these columns.
    clean_customers = clean_data(raw_customers)
    clean_orders = clean_data(raw_orders)
    print("Step 3: Validating quality...")
    validate_data_quality(clean_customers)
    validate_data_quality(clean_orders)
    print("Step 4: Enriching data...")
    # After clean_data both frames carry 'revenue', 'created_at' and 'email'.
    # Default suffixes would rename them revenue_x/revenue_y etc., and
    # enrich_data's data['revenue'] lookup would fail. Keep the customer
    # columns unsuffixed and tag the order-side copies instead.
    merged = clean_customers.merge(
        clean_orders,
        on='customer_id',
        how='left',
        suffixes=('', '_order'),
    )
    enriched = enrich_data(merged)
    print("Step 5: Aggregating...")
    aggregated = aggregate_data(enriched)
    # PRESENTATION LAYER
    print("Step 6: Loading to destinations...")
    load_to_destinations(aggregated['daily'])
    print("Step 7: Generating reports...")
    # generate_report reads row-level 'region', 'revenue' and 'created_at'.
    # The date-grouped daily summary does not contain region or created_at.
    report = generate_report(enriched)
    send_email(to='team@company.com', subject='Daily Metrics', body=report)
    print("✓ Pipeline completed successfully")
# Result: 9:15 AM - Team has fresh data and reports!
Error Handling
Graceful Failures
def pipeline_with_retry(max_retries=3, extract=None, validate=None, base_delay=1):
    """Extract and validate source data, retrying failures with exponential backoff.

    Args:
        max_retries: Total number of attempts; must be at least 1.
        extract: Zero-argument callable returning the data. Defaults to
            ``extract_from_source``.
        validate: Callable that raises on bad data. Defaults to
            ``validate_ingestion``.
        base_delay: Seconds to wait after the first failed attempt. The wait
            doubles after each further failure.

    Returns:
        The data from the first attempt that extracts and validates cleanly.

    Raises:
        ValueError: If ``max_retries`` is less than 1.
        Exception: The last attempt's exception once all attempts are used up.
    """
    if max_retries < 1:
        raise ValueError(f"max_retries must be >= 1, got {max_retries}")
    extract = extract_from_source if extract is None else extract
    validate = validate_ingestion if validate is None else validate
    for attempt in range(max_retries):
        try:
            data = extract()
            validate(data)
            return data  # Success
        except Exception as e:
            print(f"Attempt {attempt + 1} failed: {e}")
            if attempt == max_retries - 1:
                raise  # Give up after max retries
            # With the defaults: waits 1s then 2s (no wait after the last attempt)
            wait_time = base_delay * 2 ** attempt
            print(f"Retrying in {wait_time} seconds...")
            time.sleep(wait_time)
# In Orchestration, set retry policy:
# max_retries: 3
# wait_between: exponential backoff
Alerting on Failures
def run_pipeline_with_alerts():
    """Alert stakeholders if the daily pipeline fails, then re-raise.

    On failure, the pipeline error is sent over three channels: email, Slack
    (``#data-alerts``) and PagerDuty. Each channel is tried independently,
    so one broken channel neither blocks the others nor masks the pipeline
    error.

    Raises:
        Exception: Whatever ``run_daily_pipeline`` raised, after alerting.
    """
    try:
        run_daily_pipeline()
    except Exception as e:
        # Pipeline failed
        alert = {
            'severity': 'critical',
            'title': 'Daily Pipeline Failed',
            'error': str(e),
            # ISO string, not a pd.Timestamp: json.dumps cannot serialize
            # Timestamps and would raise TypeError here.
            'time': pd.Timestamp.now().isoformat(),
        }
        # Send alerts to different channels, best-effort per channel
        channels = [
            ('email', lambda: send_email(
                to='ops@company.com',
                subject=alert['title'],
                body=json.dumps(alert),
            )),
            ('slack', lambda: send_slack('#data-alerts', alert)),
            ('pagerduty', lambda: send_pagerduty(alert)),
        ]
        for channel, notify in channels:
            try:
                notify()
            except Exception as notify_error:
                print(f"Failed to send {channel} alert: {notify_error}")
        # And fail the pipeline step. A bare raise re-raises the pipeline error
        # (e), since the inner handlers above have already completed.
        raise
Monitoring Pipeline Health
Track Pipeline Metrics
from credvault import metrics
def monitored_pipeline():
    """Pipeline with monitoring.

    Metrics emitted through ``metrics``:
      - gauges for row counts at extraction and after cleaning;
      - counters for validation and load successes, and for failures;
      - a duration histogram, recorded whether the run succeeds or fails.

    Raises:
        Exception: Any error from a step is counted and re-raised.
    """
    # Track execution time. perf_counter is monotonic, so clock adjustments
    # during the run cannot skew the measured duration.
    start = time.perf_counter()
    try:
        data = extract()
        metrics.gauge('extraction_rows', len(data))
        clean = clean_data(data)
        metrics.gauge('rows_after_cleaning', len(clean))
        metrics.gauge('rows_removed', len(data) - len(clean))
        # validate_data_quality raises on failure and returns a dict of rule
        # results, not data, so the cleaned frame is what gets loaded.
        validate_data_quality(clean)
        metrics.counter('validation_passed', 1)
        load_data(clean)
        metrics.counter('successful_loads', 1)
    except Exception:
        metrics.counter('pipeline_failures', 1)
        raise
    finally:
        duration = time.perf_counter() - start
        metrics.histogram('pipeline_duration_seconds', duration)
        print(f"Pipeline took {duration:.1f} seconds")
Check Pipeline Status
Example status view in a monitoring dashboard:
Daily Customer Pipeline:
- Status: Running
- Last run: 2024-06-12 09:15 UTC
- Duration: 4 minutes 23 seconds
- Rows extracted: 125,432
- Rows cleaned: 124,998 (434 removed)
- Validation: PASSED
- Load: SUCCESS
- Next run: Tomorrow 09:00 UTC
Best Practices
Design Principles
✓ Idempotent: Running twice gives the same result
✓ Atomic: Either everything succeeds or everything fails
✓ Observable: You can see what happened
✓ Resilient: Handles failures gracefully
✓ Scalable: Handles growth over time
Avoid Common Pitfalls
❌ Don't hardcode paths and credentials — ✓ Do use configuration files and secrets
❌ Don't skip validation — ✓ Do validate at every step
❌ Don't assume source data is perfect — ✓ Do handle edge cases and errors
❌ Don't lose track of data lineage — ✓ Do document data flow and transformations
Related Topics
- Orchestration - Schedule pipelines
- Lineage - Track data flow
- Metadata - Document data
- Data Workflow Guide - See big picture