Your First Pipeline
This comprehensive tutorial will guide you through building a real-world data processing pipeline with ShedBoxAI, including AI integration and advanced features.
What We'll Build
A customer analysis pipeline that:
- Loads customer and transaction data
- Filters and analyzes the data
- Generates AI-powered insights
- Creates a formatted report
Prerequisites
- ShedBoxAI installed (Installation Guide)
- Basic understanding of YAML syntax
- Optional: OpenAI API key for AI features
Project Setup
Create a new project directory:
mkdir my-first-pipeline
cd my-first-pipeline
mkdir data output
Step 1: Prepare Sample Data
Customer Data (data/customers.csv
)
id,name,age,city,segment,lifetime_value
1,John Smith,34,New York,Premium,2500.00
2,Sarah Johnson,28,Los Angeles,Standard,1200.00
3,Mike Brown,45,Chicago,Premium,3200.00
4,Lisa Davis,31,Miami,Standard,800.00
5,Tom Wilson,52,Seattle,Enterprise,5500.00
Transaction Data (data/transactions.json
)
[
{"customer_id": 1, "amount": 250.00, "date": "2024-01-15", "category": "Software"},
{"customer_id": 2, "amount": 120.00, "date": "2024-01-16", "category": "Books"},
{"customer_id": 1, "amount": 450.00, "date": "2024-01-18", "category": "Software"},
{"customer_id": 3, "amount": 800.00, "date": "2024-01-20", "category": "Consulting"},
{"customer_id": 5, "amount": 1200.00, "date": "2024-01-22", "category": "Enterprise"}
]
Step 2: Basic Pipeline Configuration
Create pipeline.yaml
:
# Data Sources
data_sources:
customers:
type: csv
path: data/customers.csv
transactions:
type: json
path: data/transactions.json
# Basic Processing
processing:
# Filter high-value customers
contextual_filtering:
customers:
- field: lifetime_value
condition: "> 1000"
new_name: "high_value_customers"
# Calculate transaction statistics
content_summarization:
transactions:
method: statistical
fields: ["amount"]
summarize: ["mean", "sum", "count", "max"]
# Output Results
output:
type: file
path: output/basic_results.json
format: json
Test Basic Pipeline
shedboxai run pipeline.yaml -v
Step 3: Add Advanced Operations
Enhance the pipeline with data relationships and advanced operations:
# Data Sources (same as above)
data_sources:
customers:
type: csv
path: data/customers.csv
transactions:
type: json
path: data/transactions.json
# Advanced Processing
processing:
# Filter valuable customers
contextual_filtering:
customers:
- field: lifetime_value
condition: "> 1000"
new_name: "valuable_customers"
# Link customers with their transactions
relationship_highlighting:
valuable_customers:
link_fields:
- source: "valuable_customers"
match_on: "id"
to: "transactions"
target_field: "customer_id"
link_name: "customer_transactions"
# Analyze spending by customer segment
advanced_operations:
valuable_customers:
source: "valuable_customers"
group_by: "segment"
aggregate:
customer_count: "COUNT(*)"
avg_lifetime_value: "AVG(lifetime_value)"
total_value: "SUM(lifetime_value)"
sort: "-total_value"
# Create formatted customer profiles
format_conversion:
valuable_customers:
template: |
## {{item.name}} (ID: {{item.id}})
- **Age**: {{item.age}} years
- **Location**: {{item.city}}
- **Segment**: {{upper(item.segment)}}
- **Lifetime Value**: ${{item.lifetime_value}}
- **Transactions**: {{count(item.customer_transactions)}} orders
output:
type: file
path: output/advanced_results.json
format: json
Test Advanced Pipeline
shedboxai run pipeline.yaml -o output/advanced_analysis.json -v
Step 4: Add AI Integration
Add AI-powered insights (requires OpenAI API key):
Environment Setup
Create .env
file:
OPENAI_API_KEY=your_openai_api_key_here
Enhanced Configuration
# Data Sources (same as above)
data_sources:
customers:
type: csv
path: data/customers.csv
transactions:
type: json
path: data/transactions.json
# Processing with AI
processing:
# Previous operations...
contextual_filtering:
customers:
- field: lifetime_value
condition: "> 1000"
new_name: "valuable_customers"
advanced_operations:
valuable_customers:
source: "valuable_customers"
group_by: "segment"
aggregate:
customer_count: "COUNT(*)"
avg_lifetime_value: "AVG(lifetime_value)"
total_value: "SUM(lifetime_value)"
sort: "-total_value"
# AI Interface Configuration
ai_interface:
model:
type: rest
url: https://api.openai.com/v1/chat/completions
method: POST
headers:
Authorization: Bearer ${OPENAI_API_KEY}
Content-Type: application/json
options:
model: gpt-4
temperature: 0.7
max_tokens: 1500
prompts:
customer_insights:
system: "You are a senior business analyst specializing in customer analytics."
user_template: |
# Customer Analysis Data
## Customer Segments
{% for segment in segment_analysis %}
- **{{segment.segment}}**: {{segment.customer_count}} customers,
Avg Value: ${{segment.avg_lifetime_value}},
Total: ${{segment.total_value}}
{% endfor %}
## High-Value Customers
Total valuable customers: {{count(valuable_customers)}}
Based on this data, please provide:
1. Key insights about customer segments
2. Recommendations for customer retention
3. Growth opportunities
Format as a professional business report.
response_format: markdown
temperature: 0.3
output:
type: file
path: output/ai_insights.json
format: json
Run AI-Enhanced Pipeline
shedboxai run pipeline.yaml -o output/complete_analysis.json -v
Step 5: Understanding the Results
Output Structure
Your results will contain:
- Filtered customer data
- Statistical analysis
- Segment analysis
- Customer profiles
- AI-generated insights
Sample AI Output
# Customer Analysis Report
## Key Insights
1. **Premium Segment Dominance**: Premium customers represent the highest value...
2. **Geographic Distribution**: Strong presence in major metropolitan areas...
3. **Age Demographics**: Core customer base in 30-45 age range...
## Recommendations
1. **Retention Strategy**: Focus on Premium segment with personalized offerings...
2. **Expansion Opportunity**: Target similar demographics in untapped cities...
Key Learning Points
1. Progressive Enhancement
Start simple and add complexity:
- Basic filtering → Advanced operations → AI integration
2. Data Flow
Understand how data flows through operations:
Raw Data → Filter → Link → Aggregate → Format → AI Analysis
3. Expression Power
Use expressions for dynamic filtering:
condition: "lifetime_value > avg(map(customers, 'lifetime_value'))"
4. Template Flexibility
Create rich formatted output:
template: "{{item.name}} (${{item.value|currency}})"
Common Patterns
Error Handling
# Run with verbose logging
shedboxai run pipeline.yaml -v
# Continue on errors (for development)
shedboxai run pipeline.yaml --skip-errors
Incremental Development
# Test each step
shedboxai run basic_pipeline.yaml
shedboxai run advanced_pipeline.yaml
shedboxai run ai_pipeline.yaml
Output Formats
# JSON for programmatic use
output:
type: file
path: output/data.json
format: json
# YAML for readability
output:
type: file
path: output/data.yaml
format: yaml
Next Steps
Now that you've built your first pipeline, explore:
- Configuration Reference - Complete configuration options
- Operations Guide - All available operations
- Introspection - Analyze your data sources
- Examples - Real-world use cases
- CLI Reference - All command options
Troubleshooting
If you encounter issues:
- Check your YAML syntax - Use a YAML validator
- Verify file paths - Ensure data files exist
- Check environment variables - Verify API keys are set
- Use verbose mode - Add
-v
flag for detailed logs - Review Troubleshooting Guide
Congratulations! You've built a complete data processing pipeline with ShedBoxAI. 🎉