Orchestration

Orchestrate complex data workflows. Schedule jobs, manage dependencies, and automate data processing pipelines.

Getting Started

What is Orchestration?

Orchestration lets you:

  • Define data pipelines with multiple steps
  • Schedule automatic execution (hourly, daily, weekly)
  • Handle dependencies between steps
  • Monitor job execution
  • Retry on failures

Creating Your First Pipeline

  1. Click Orchestration in the sidebar
  2. Click New Pipeline
  3. Define workflow steps
  4. Set schedule and triggers

Defining Workflows

Pipeline Steps

Each step is a task that:

  • Queries your database
  • Processes data
  • Runs a function
  • Triggers another step

Step Types

Query Steps - Run SQL or code:

-- Extract users from database
SELECT user_id, created_at FROM users 
WHERE created_at > CURRENT_DATE - INTERVAL '7 days'
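
The query above uses PostgreSQL-style interval syntax. As a rough sketch of the same "last 7 days" filter driven from Python, the example below computes the cutoff date in code and passes it as a parameter. It uses an in-memory SQLite database with made-up rows so it runs anywhere; the `recent_users` helper and the sample data are illustrative assumptions, not part of the product.

```python
import sqlite3
from datetime import date, timedelta

def recent_users(conn, today, days=7):
    """Return (user_id, created_at) rows created after `today - days`."""
    cutoff = (today - timedelta(days=days)).isoformat()
    # ISO-8601 date strings sort chronologically, so a text comparison works
    return conn.execute(
        "SELECT user_id, created_at FROM users "
        "WHERE created_at > ? ORDER BY user_id",
        (cutoff,),
    ).fetchall()

# In-memory database with made-up rows, for illustration only
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (user_id INTEGER, created_at TEXT)")
conn.executemany(
    "INSERT INTO users VALUES (?, ?)",
    [(1, "2024-06-01"), (2, "2024-06-08"), (3, "2024-06-11")],
)
rows = recent_users(conn, today=date(2024, 6, 12))
```

Passing the cutoff as a bound parameter keeps the SQL text constant between runs and avoids string formatting inside the query.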

Transform Steps - Process data:

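# Aggregate signups per calendar day
# (assumes `data` is a pandas DataFrame with a datetime `created_at` column)
users_per_day = data.groupby(data['created_at'].dt.date).size()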
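
If pandas is not available in the step environment, the same per-day count can be sketched with the standard library. The row shape `(user_id, created_at)` and the sample values are assumptions chosen to match the query step above.

```python
from collections import Counter
from datetime import datetime

def users_per_day(rows):
    """Count rows per calendar day; each row is (user_id, created_at datetime)."""
    return Counter(created_at.date().isoformat() for _, created_at in rows)

# Made-up rows for illustration
rows = [
    (1, datetime(2024, 6, 10, 8, 30)),
    (2, datetime(2024, 6, 10, 17, 5)),
    (3, datetime(2024, 6, 11, 9, 0)),
]
counts = users_per_day(rows)
```

Grouping on the date part rather than the full timestamp is what turns individual signup times into one bucket per day.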

Function Steps - Call your code:

def send_notifications(users):
    for user in users:
        send_email(user.email, "Weekly Report")

Export Steps - Output results:

# Save to database
results.to_credvault(collection='reports')

# Send to external system
push_to_slack(channel='#data-team', message=summary)

Scheduling Pipelines

Cron Expressions

Define when pipelines run:

0 9 * * * ........... Every day at 9 AM
0 */6 * * * ......... Every 6 hours
0 0 * * MON ......... Every Monday at midnight
0 0 1 * * ........... First day of every month
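
To check what a cron expression means before saving it, a small matcher can help. The sketch below is a simplified illustration, not the scheduler's actual parser: it supports only the forms shown above (`*`, `*/n`, a single number, and a three-letter day name) and requires every field to match. Full cron implementations also accept lists and ranges, and treat day-of-month and day-of-week differently when both are restricted.

```python
from datetime import datetime

DAY_NAMES = {"SUN": 0, "MON": 1, "TUE": 2, "WED": 3, "THU": 4, "FRI": 5, "SAT": 6}

def field_matches(field, value, low=0):
    """Match one cron field: `*`, `*/n`, a number, or a day name."""
    if field == "*":
        return True
    if field.startswith("*/"):
        # Steps count from the field's lowest value (0 for hours, 1 for days)
        return (value - low) % int(field[2:]) == 0
    if field.upper() in DAY_NAMES:
        return DAY_NAMES[field.upper()] == value
    return int(field) == value

def cron_matches(expr, when):
    """True if `when` (to the minute) satisfies the five-field expression."""
    minute, hour, dom, month, dow = expr.split()
    cron_dow = (when.weekday() + 1) % 7  # cron: 0 = Sunday; Python: 0 = Monday
    return (
        field_matches(minute, when.minute)
        and field_matches(hour, when.hour)
        and field_matches(dom, when.day, low=1)
        and field_matches(month, when.month, low=1)
        and field_matches(dow, cron_dow)
    )

# 2024-06-10 was a Monday
monday_midnight = cron_matches("0 0 * * MON", datetime(2024, 6, 10, 0, 0))
```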

Triggers

Pipelines run based on:

  • Schedule - Cron expression
  • Event - Database change, webhook, external event
  • Manual - User clicks Run
  • On-demand - Via API call

Monitoring Pipelines

View Execution History

See all pipeline runs:

  • Start and end time
  • Status (success, failed, running)
  • Duration
  • Output results
  • Error messages

Pipeline Logs

Click any run to see detailed logs:

2024-06-12 09:00:00 - Pipeline started
2024-06-12 09:00:01 - Step 1: Extract data (127 records)
2024-06-12 09:00:05 - Step 2: Transform data (127 → 98 after filtering)
2024-06-12 09:00:08 - Step 3: Load to database (98 inserted)
2024-06-12 09:00:09 - Pipeline completed successfully
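
Logs in this format are easy to analyze outside the UI. The sketch below parses the sample above and reports the time elapsed before each line, which approximates each step's duration if a line is written when its step finishes (an assumption; the log format does not say). The function names are illustrative.

```python
from datetime import datetime

LOG = """\
2024-06-12 09:00:00 - Pipeline started
2024-06-12 09:00:01 - Step 1: Extract data (127 records)
2024-06-12 09:00:05 - Step 2: Transform data (127 → 98 after filtering)
2024-06-12 09:00:08 - Step 3: Load to database (98 inserted)
2024-06-12 09:00:09 - Pipeline completed successfully
"""

def parse_log(text):
    """Split each line into (timestamp, message)."""
    entries = []
    for line in text.splitlines():
        stamp, message = line.split(" - ", 1)
        entries.append((datetime.strptime(stamp, "%Y-%m-%d %H:%M:%S"), message))
    return entries

def elapsed_before_each_line(entries):
    """Seconds between each line and the one before it, keyed by message."""
    return {
        msg: (ts - prev).total_seconds()
        for (prev, _), (ts, msg) in zip(entries, entries[1:])
    }

entries = parse_log(LOG)
elapsed = elapsed_before_each_line(entries)
total_seconds = (entries[-1][0] - entries[0][0]).total_seconds()
slowest = max(elapsed, key=elapsed.get)
```

Here `slowest` points at the line with the longest preceding gap, which is a reasonable first place to look when a run takes longer than expected.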

Debugging Failed Runs

  1. Click the failed run
  2. Read the error message
  3. Check the logs for the step that failed
  4. Fix the cause and rerun the pipeline

Error Handling

Retry Policies

Configure what happens on failure:

Max retries: 3
Wait between retries: 60 seconds
On final failure: Send alert
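
The behavior these settings describe can be sketched in a few lines. This is an illustration of the policy, not the product's implementation; `run_with_retries`, its parameters, and the `flaky` task are hypothetical names. The `sleep` argument is injectable so the example runs without actually waiting.

```python
import time

def run_with_retries(task, max_retries=3, wait_seconds=60,
                     on_final_failure=None, sleep=time.sleep):
    """Call `task`; retry up to `max_retries` times, then alert and re-raise."""
    attempt = 0
    while True:
        try:
            return task()
        except Exception as exc:
            if attempt >= max_retries:
                if on_final_failure is not None:
                    on_final_failure(exc)  # e.g. send an alert
                raise
            attempt += 1
            sleep(wait_seconds)

# A task that fails twice, then succeeds
calls = {"n": 0}

def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("temporary failure")
    return "ok"

waits = []
result = run_with_retries(flaky, sleep=waits.append)  # record waits instead of sleeping
```

With `max_retries=3`, a task is attempted at most four times: the initial run plus three retries.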

Conditional Steps

Skip or branch based on results:

if records_count > 0:
    # Process records
    send_notification(f"Processed {records_count}")
else:
    # No data, skip processing
    log_message("No new data")

Dependencies and Ordering

Sequential Steps

Steps run one after another:

1. Extract data from database
   ↓
2. Validate data quality
   ↓
3. Transform and aggregate
   ↓
4. Load to destination
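
As a minimal sketch of sequential execution, each step below receives the previous step's output, and an exception in any step prevents the later ones from running. The step functions and sample rows are made up for illustration.

```python
def run_sequential(steps, data=None):
    """Run (name, function) pairs in order; an exception stops later steps."""
    completed = []
    for name, fn in steps:
        data = fn(data)
        completed.append(name)
    return data, completed

def extract(_):
    # Stand-in for a database read
    return [{"id": 1, "amount": 10}, {"id": 2, "amount": None}, {"id": 3, "amount": 5}]

def validate(rows):
    # Drop rows that fail a simple quality check
    return [r for r in rows if r["amount"] is not None]

def transform(rows):
    return {"rows": len(rows), "total": sum(r["amount"] for r in rows)}

destination = []

def load(summary):
    destination.append(summary)
    return summary

result, completed = run_sequential([
    ("extract", extract),
    ("validate", validate),
    ("transform", transform),
    ("load", load),
])
```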

Parallel Steps

Steps run simultaneously:

Step 1: Extract from source A
Step 2: Extract from source B  (both run at the same time)
   ↓
Step 3: Combine and transform
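
The fan-out and join pattern above can be sketched with Python's standard thread pool. The extract functions are placeholders for real source reads; only the structure is the point here.

```python
from concurrent.futures import ThreadPoolExecutor

def extract_source_a():
    return [1, 2, 3]  # placeholder for a real read from source A

def extract_source_b():
    return [4, 5]     # placeholder for a real read from source B

def combine_and_transform(a, b):
    return sum(a + b)

with ThreadPoolExecutor(max_workers=2) as pool:
    # Steps 1 and 2 are submitted together and run concurrently
    future_a = pool.submit(extract_source_a)
    future_b = pool.submit(extract_source_b)
    # .result() blocks, so Step 3 starts only after both extracts finish
    combined = combine_and_transform(future_a.result(), future_b.result())
```

Threads suit steps that mostly wait on I/O, such as database or API reads; CPU-heavy steps may need separate processes instead.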

Common Workflows

Daily Report Generation

Every day at 9 AM:
1. Query sales data
2. Calculate totals and trends
3. Generate visualizations
4. Send to stakeholders via email

Data Quality Checks

Every 6 hours:
1. Count records in each table
2. Check for null values
3. Verify data types
4. Alert if anomalies found
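
A rough sketch of the first, second, and fourth checks, using an in-memory SQLite table so it runs anywhere. The table, columns, and the 10% NULL threshold are illustrative assumptions; a real check would run against your own database and thresholds.

```python
import sqlite3

def null_counts(conn, table, columns):
    """Return the row count and per-column NULL counts for one table."""
    # Identifiers are interpolated into SQL, so pass only trusted names
    total = conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()[0]
    nulls = {
        col: conn.execute(
            f"SELECT COUNT(*) FROM {table} WHERE {col} IS NULL"
        ).fetchone()[0]
        for col in columns
    }
    return total, nulls

def find_anomalies(total, nulls, max_null_ratio=0.1):
    """List columns whose NULL share exceeds the threshold."""
    if total == 0:
        return ["table is empty"]
    return [
        f"{col}: {n}/{total} NULL"
        for col, n in nulls.items()
        if n / total > max_null_ratio
    ]

# Made-up table for illustration
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (email TEXT, created_at TEXT)")
conn.executemany(
    "INSERT INTO users VALUES (?, ?)",
    [("a@example.com", "2024-06-01"), (None, "2024-06-02"),
     ("c@example.com", "2024-06-03"), (None, "2024-06-04")],
)
total, nulls = null_counts(conn, "users", ["email", "created_at"])
anomalies = find_anomalies(total, nulls)
```

A non-empty `anomalies` list is the signal to send an alert.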

ETL Pipeline

Every night at 2 AM:
1. Extract from external API
2. Transform to standard format
3. Load to database
4. Update reports

Advanced Features

Notifications

Receive alerts:

  • Email on success/failure
  • Slack messages
  • PagerDuty alerts
  • Custom webhooks

Resource Limits

Prevent runaway jobs:

Max duration: 1 hour
Max memory: 4 GB
Max concurrent runs: 3
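
To illustrate what a concurrent-run limit does, the sketch below refuses a new run once three are already in progress. `RunLimiter` is a hypothetical helper built on a standard-library semaphore; it does not reflect how the service enforces limits internally.

```python
import threading

class RunLimiter:
    """Refuse new runs once `max_concurrent` are already in progress."""

    def __init__(self, max_concurrent=3):
        self._slots = threading.BoundedSemaphore(max_concurrent)

    def try_start(self):
        # Non-blocking: returns False immediately if no slot is free
        return self._slots.acquire(blocking=False)

    def finish(self):
        self._slots.release()

limiter = RunLimiter(max_concurrent=3)
started = [limiter.try_start() for _ in range(4)]  # the fourth run is refused
limiter.finish()                                   # one run completes
retry = limiter.try_start()                        # a slot is free again
```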

Versioning

Track pipeline changes:

  • Save versions
  • Compare versions
  • Roll back to a previous version
  • View change history

Best Practices

Pipeline Design

  • Keep steps focused and single-purpose
  • Use meaningful step names
  • Add comments explaining complex logic
  • Test with small datasets first

Monitoring

  • Set up alerts for failures
  • Review logs regularly
  • Monitor execution time trends
  • Track resource usage

Maintenance

  • Archive old pipelines
  • Update schedules when seasonal load or reporting needs change
  • Review and optimize slow steps
  • Keep error handling up to date

Troubleshooting

Pipeline runs too long

  1. Check individual step times
  2. Identify slowest step
  3. Optimize that step:
    • Add database indexes
    • Reduce dataset size
    • Parallelize if possible

Out of memory errors

  1. Process data in batches
  2. Delete intermediate results once later steps no longer need them
  3. Use streaming where possible
  4. Increase resource limits
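
Batch processing, the first item above, can be sketched with a small generator that hands out fixed-size chunks so only one batch is held in memory at a time. The `batches` helper and the batch size are illustrative.

```python
from itertools import islice

def batches(iterable, size):
    """Yield lists of at most `size` items without loading the whole input."""
    iterator = iter(iterable)
    while True:
        batch = list(islice(iterator, size))
        if not batch:
            return
        yield batch

total = 0
batch_sizes = []
for batch in batches(range(10), size=4):
    total += sum(batch)          # process one batch, then let it go
    batch_sizes.append(len(batch))
```

The same helper works with any iterable, including a database cursor or a file read line by line.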

Jobs not running on schedule

  1. Verify schedule is correct
  2. Check timezone settings
  3. Look for a previous failed run that is blocking the next one
  4. Restart orchestration service if needed
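
Timezone mismatches are a common reason a job appears to run at the wrong time. As a quick check, the sketch below converts a local schedule time to UTC, assuming a fixed UTC offset; whether the scheduler evaluates cron expressions in UTC or in a configured timezone is something to confirm in your settings. `local_to_utc` is an illustrative helper.

```python
from datetime import date, datetime, timedelta, timezone

def local_to_utc(hour, minute, utc_offset_hours, on_date):
    """Convert a local wall-clock time at a fixed UTC offset to UTC."""
    local_tz = timezone(timedelta(hours=utc_offset_hours))
    local = datetime(on_date.year, on_date.month, on_date.day,
                     hour, minute, tzinfo=local_tz)
    return local.astimezone(timezone.utc)

# 9:00 AM at UTC-5 corresponds to 14:00 UTC
utc_time = local_to_utc(9, 0, utc_offset_hours=-5, on_date=date(2024, 6, 12))
```

Fixed offsets ignore daylight saving time; for named zones, Python's `zoneinfo` module handles those transitions.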