Architecture
Last Updated: 2025-10-30
Related Diagrams: See the Architecture Diagrams section below.
Overview
Prefect-orchestrated data pipeline with modular Python components for extracting, cleaning, and loading rocket data into Neo4j. Includes Cloudinary CDN integration for optimized image delivery.
System Design
┌─────────────────────────────────────────────────────────────┐
│ Self-hosted Prefect 3 on ECS │
│ Scheduling, Monitoring, Flow Orchestration │
└──────────────────────┬──────────────────────────────────────┘
│
▼
┌─────────────────────────────┐
│ Data Pipeline Flows │
│ - Extraction │
│ - Cleaning │
│ - Graph Load │
│ - Schema Generation │
└─────────────────────────────┘
│
▼
┌─────────────────┐
│ Data Extractors │
│ (Modular) │
└────────┬────────┘
│
┌──────────────┼──────────────┐
│ │ │
▼ ▼ ▼
┌──────────────┐ ┌──────────────┐ ┌──────────────────┐
│ Scraper │ │ Configuration│ │ Entity-Based │
│ - S3 Buffer │ │ - Pydantic │ │ Output │
│ - URL Dedup │ │ - AWS Config │ │ clubs/ │
│ - Rate Limit │ │ - Sources │ │ kits/ │
│ - Simple │ │ - Validation │ │ motors/ │
│ (~250 LOC) │ │ │ │ manufacturers/ │
│ │ │ │ │ vendors/ │
└──────────────┘ └──────────────┘ └──────────────────┘
Core Components
Prefect Flows (Primary Interface)
Data Extraction Flow (flows/data_extraction.py)
- Orchestrates data extraction from all sources
- Uses `DataExtractionPipeline` to coordinate extractors
- Outputs raw CSV files to the S3 bucket (`blog-data-raw`)
- Scheduled daily or triggered on-demand
Data Cleaning Flow (flows/data_cleaning.py)
- Cleans and transforms raw data from S3
- Standardizes formats, removes duplicates
- Outputs clean CSV files to the S3 bucket (`blog-data-clean`)
- Triggered on-demand or as part of the main pipeline
Graph Load Flow (flows/graph_load.py)
- Loads cleaned data into Neo4j
- Creates graph schema and relationships
- Validates graph structure
- Triggered on-demand or as part of main pipeline
Schema Generation Flow (flows/schema_generation.py)
- Generates TypeScript types and Zod schemas from Neo4j
- Creates type definitions for frontend consumption
- Triggered on-demand or as part of main pipeline
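For illustration, a minimal sketch of how one of these flows might be wired up in Prefect 3; the flow and task names here are hypothetical, not the actual code in flows/:

```python
from prefect import flow, task

@task(retries=3)
def extract_source(source_name: str) -> int:
    """Placeholder task: extract records for one source and return a count."""
    records = []  # real extractors are coordinated by DataExtractionPipeline
    return len(records)

@flow(name="data-extraction")
def data_extraction_flow(sources: list[str] | None = None) -> None:
    """Run extraction for each requested source."""
    for name in sources or ["example"]:
        extract_source(name)

if __name__ == "__main__":
    data_extraction_flow(sources=["example"])
```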
Python Modules (Supporting Code)
src/data_sources/ - Modular Extractors
- `base.py` - Base classes (`BaseDataSource`, `SourceSpecification`)
- Individual extractor modules (see Data Sources documentation)
src/config.py - Configuration
- Environment-based configuration
- AWS, scraper, and pipeline settings
- Validation on startup
src/logger.py - Logging & Metrics
- Structured logging with colored output
- Metrics tracking (cache hits, extraction times)
- Performance monitoring
src/scraper.py - Web Scraping
- Simple S3-based URL deduplication (~250 lines)
- Session-based buffer organization
- Rate limiting
- Methods: `get_html()`, `get_json()`, `post_json()`, `post_html()`
src/pipeline.py - Orchestration
- Coordinates multiple data sources
- Handles errors gracefully
- Tracks metrics across sources
Data Flow
Web Sources → Scraper (S3 URL dedup) → Extractors → Entity-Based Output
↓ ↓
Image Download data_raw/
↓ ├── clubs/
Cloudinary Upload ├── kits/
↓ ├── motors/
Cloudinary CDN ├── manufacturers/
↓ └── vendors/
URLs in CSV ────────────────────────────────────┘
↓
Data Cleaning Flow
↓
data_clean/
↓
Graph Load Flow
↓
Neo4j
↓
Schema Generation Flow
↓
TypeScript Types & Zod
↓
Vercel Frontend
(loads images from
Cloudinary CDN)
Image Processing Flow:
- Product images are downloaded during scraping
- Images uploaded to Cloudinary CDN with automatic optimization (WebP/AVIF)
- Cloudinary URLs stored in CSV alongside product data
- Frontend loads optimized images directly from Cloudinary CDN
See IMAGE_PROCESSING.md for detailed image processing documentation.
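A sketch of the upload step, assuming the Cloudinary Python SDK; the cloud name and folder convention come from the External Services section below, while the file path and credential placeholders are illustrative:

```python
import cloudinary
import cloudinary.uploader

# Placeholder credentials: the real values are read from AWS Secrets Manager at runtime.
cloudinary.config(cloud_name="ronaldhatcher", api_key="<api-key>", api_secret="<api-secret>")

result = cloudinary.uploader.upload(
    "downloads/estes/alpha-iii.jpg",   # image downloaded during scraping (illustrative path)
    folder="kits/estes/alpha-iii",     # kits/{manufacturer}/{product_id}
)
image_url = result["secure_url"]       # stored in the CSV alongside product data
```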
Data Organization Patterns
Entity-Based Organization
Output is organized by entity type, not by data source:
data_raw/
├── clubs/ # Club entities from multiple sources
├── kits/ # Kit/product entities from multiple sources
├── motors/ # Motor entities
├── manufacturers/ # Manufacturer entities
└── vendors/ # Vendor entities
Key Principles:
- Multiple sources can contribute to the same entity folder
- Reflects domain model (entities) not technical implementation (sources)
- Simplifies downstream processing (all clubs in one place)
- Aligns with graph database schema
Configuration:
```python
SourceSpecification(
    name="example",
    entity_folder="clubs",  # Output goes to data_raw/clubs/
    output_filename="example_clubs.csv",
)
```
File Splitting Pattern
A single data source can output to multiple files based on field values:
Use Case: API returns mixed entity types in one response
Pattern:
```python
SourceSpecification(
    split_by_field="entity_type",
    split_file_mapping={
        "manufacturer": "manufacturers/company.csv",
        "vendor": "vendors/company.csv",
    },
)
```
Behavior:
- Extract records from source
- Group records by `split_by_field` value
- Save each group to mapped filename
- Remove the split field from output CSVs
Data Flow:
API Response (100 records)
↓
Group by entity_type
↓
├─ manufacturer (60 records) → manufacturers/company.csv
├─ vendor (30 records) → vendors/company.csv
└─ other (10 records) → (skipped if not in mapping)
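A minimal sketch of this splitting behaviour; only `split_by_field` and `split_file_mapping` come from `SourceSpecification`, the helper itself is illustrative:

```python
import csv
import os
from collections import defaultdict

def split_and_save(records, split_by_field, split_file_mapping, out_dir="data_raw"):
    """Group records by the split field and write each mapped group to its own CSV."""
    groups = defaultdict(list)
    for record in records:
        groups[record.get(split_by_field)].append(record)

    for value, filename in split_file_mapping.items():  # unmapped values are skipped
        rows = [
            {k: v for k, v in r.items() if k != split_by_field}  # drop the split field
            for r in groups.get(value, [])
        ]
        if not rows:
            continue
        path = os.path.join(out_dir, filename)
        os.makedirs(os.path.dirname(path), exist_ok=True)
        with open(path, "w", newline="") as f:
            writer = csv.DictWriter(f, fieldnames=list(rows[0]))
            writer.writeheader()
            writer.writerows(rows)
```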
Multi-Extractor Pattern
Complex data sources can have multiple extractors, each handling different entity types:
Pattern:
External API
↓
├── Metadata Extractor → manufacturers/*.csv (via file splitting)
├── Motors Extractor → motors/motors.csv
└── Samples Extractor → motors/samples.csv
Benefits:
- Separation of concerns (single responsibility per extractor)
- Independent execution and testing
- Supports complex data source integration
Modular Data Sources
Data sources are named by source and entity type: a single source may provide several entity types, and a given entity type may be populated from several sources.
Data sources read from their external source and write CSV files that form the output interface. Each data source is independent, with its own specification and extractor class.
SourceSpecification
Defines configuration for a data source extractor.
Core Parameters:
```python
SourceSpecification(
    name="example",                     # Unique identifier
    base_url="https://example.com",     # Base URL for requests
    required_fields=["id", "name"],     # Fields to validate
    output_filename="example.csv",      # Default output filename
    description="Example extractor",    # Human-readable description

    # Request settings
    rate_limit=1.0,                     # Delay between requests (seconds)
    timeout=30,                         # Request timeout (seconds)
    max_retries=3,                      # Maximum retry attempts

    # Data organization
    entity_folder="kits",               # Output to data_raw/kits/

    # File splitting (optional)
    split_by_field="entity_type",       # Field to split records by
    split_file_mapping={                # Map field values to files
        "manufacturer": "manufacturers/file.csv",
        "vendor": "vendors/file.csv",
    },

    # Custom configuration
    custom_config={                     # Source-specific options
        "extract_details": False,
        "include_images": True,
    },
)
```
BaseDataSource Interface
```python
class MyExtractor(BaseDataSource):
    def extract(self, limit=None):
        """Extract data with optional limit."""
        # Extraction logic
        return records

    def run_extraction(self):
        """Complete extraction with validation and saving."""
        records = self.extract()
        self.validate_data(records)
        self.save_to_csv(records)
        return True
```
Source Registration
Data sources are registered in a simple dictionary in src/data_sources/__init__.py:
```python
# In src/data_sources/__init__.py
SOURCES = {
    "example": (EXAMPLE_SPEC, ExampleExtractor),
    # ... other sources
}

# Factory function
def create_source(source_name, config, scraper, logger, metrics):
    """Create a data source instance by name."""
    if source_name not in SOURCES:
        return None
    spec, source_class = SOURCES[source_name]
    return source_class(spec, config, scraper, logger, metrics)
```
Usage:
```python
from src.data_sources import create_source

source = create_source("example", config, scraper, logger, metrics)
records = source.extract()
```
Key Features
S3 URL Deduplication Buffer
Purpose: Ensure each URL is only requested once per extraction session.
How it works:
- Check S3 for buffered content using the `session_id/md5(url)` key
- If buffered: return content from S3 (no HTTP request)
- If not buffered: Fetch from web, store in S3, return content
Key characteristics:
- Simple session-based organization
- No TTL or expiration (manual cleanup)
- No performance metrics tracking
- ~250 lines of code (drastically simplified from 1,730 lines)
Simple buffer design: This is a request deduplication system to avoid redundant HTTP requests within a session, not a traditional cache with performance tracking.
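A sketch of that lookup, assuming boto3; the `session_id/md5(url)` key layout follows this section, while the function shape is illustrative:

```python
import hashlib

import boto3
from botocore.exceptions import ClientError

s3 = boto3.client("s3")

def get_or_fetch(url: str, session_id: str, bucket: str, fetch) -> str:
    """Return buffered content for a URL, fetching and storing it on first request."""
    key = f"{session_id}/{hashlib.md5(url.encode()).hexdigest()}"
    try:
        return s3.get_object(Bucket=bucket, Key=key)["Body"].read().decode()  # no HTTP request
    except ClientError:
        content = fetch(url)                                  # first request this session
        s3.put_object(Bucket=bucket, Key=key, Body=content.encode())
        return content
```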
Error Handling
- Automatic retries (3 attempts with exponential backoff)
- Graceful degradation (continues on failure)
- Comprehensive error logging
- Metrics tracking for errors
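A sketch of the retry behaviour described above (3 attempts, exponential backoff), assuming the `requests` library; the actual implementation in `src/scraper.py` may differ:

```python
import time

import requests

def fetch_with_retries(url: str, max_retries: int = 3, timeout: int = 30) -> str:
    """Fetch a URL, backing off exponentially between failed attempts."""
    for attempt in range(1, max_retries + 1):
        try:
            response = requests.get(url, timeout=timeout)
            response.raise_for_status()
            return response.text
        except requests.RequestException:
            if attempt == max_retries:
                raise                    # give up after the final attempt
            time.sleep(2 ** attempt)     # 2s, 4s, ... before retrying
```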
Rate Limiting
- Configurable delays per source
- Prevents server overload
- Respectful web scraping
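A minimal per-source rate limiter sketch; the delay value maps to the `rate_limit` setting, everything else is illustrative:

```python
import time

class RateLimiter:
    """Ensure at least `delay` seconds between consecutive requests."""

    def __init__(self, delay: float = 1.0):
        self.delay = delay
        self._last = 0.0

    def wait(self) -> None:
        elapsed = time.monotonic() - self._last
        if elapsed < self.delay:
            time.sleep(self.delay - elapsed)
        self._last = time.monotonic()
```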
Data Validation
- Required field checking
- Data type validation
- Minimum record count validation
- Validation errors logged but don't stop extraction
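A sketch of that validation behaviour; the field and count checks mirror the list above, while the function itself is illustrative:

```python
import logging

logger = logging.getLogger(__name__)

def validate_records(records, required_fields, min_count=1) -> bool:
    """Log validation problems without stopping the extraction."""
    valid = True
    if len(records) < min_count:
        logger.warning("Only %d records extracted (expected at least %d)", len(records), min_count)
        valid = False
    for i, record in enumerate(records):
        missing = [f for f in required_fields if not record.get(f)]
        if missing:
            logger.warning("Record %d missing required fields: %s", i, missing)
            valid = False
    return valid
```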
Configuration
Environment Variables (.env.local)
```bash
# AWS S3 (for caching)
AWS_BUCKET_NAME=your-bucket-name
AWS_ACCESS_KEY_ID=your-key
AWS_SECRET_ACCESS_KEY=your-secret
AWS_REGION=us-east-1

# Scraper Settings
SCRAPER_RATE_LIMIT=1.0
SCRAPER_TIMEOUT=30
SCRAPER_MAX_RETRIES=3

# Output
OUTPUT_DIR=./data_raw
LOG_FILE=data_extraction.log
```
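A hypothetical sketch of how these variables could be loaded and validated on startup, assuming pydantic-settings; the actual `src/config.py` may be organised differently:

```python
from pydantic_settings import BaseSettings, SettingsConfigDict

class Settings(BaseSettings):
    model_config = SettingsConfigDict(env_file=".env.local")

    aws_bucket_name: str                 # required: startup fails if missing
    aws_access_key_id: str
    aws_secret_access_key: str
    aws_region: str = "us-east-1"

    scraper_rate_limit: float = 1.0
    scraper_timeout: int = 30
    scraper_max_retries: int = 3

    output_dir: str = "./data_raw"
    log_file: str = "data_extraction.log"

settings = Settings()  # raises a validation error at import time if required values are absent
```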
Data Source Configuration
Each source has independent settings:
- `enabled` - Enable/disable extraction
- `rate_limit` - Delay between requests (optional)
- `custom_config` - Source-specific options (optional)
Note: URL deduplication is always enabled and session-based. There is no per-source cache configuration.
Testing
Unit Tests
```bash
# Fast tests (no web calls)
pytest tests/ -m "not integration"
```
Integration Tests
```bash
# Real web calls with limit=1
pytest tests/test_real_extraction.py -v -m integration
```
Test Standards:
- Extract only 1 item to validate structure
- Verify URL deduplication works (check S3 buffer)
- Use buffered content for subsequent tests (avoid redundant requests)
- Skip gracefully if source unavailable
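A sketch of an integration test following these standards; the fixtures and marker registration are assumptions:

```python
import pytest

from src.data_sources import create_source

@pytest.mark.integration
def test_example_source_live(config, scraper, logger, metrics):
    source = create_source("example", config, scraper, logger, metrics)
    if source is None:
        pytest.skip("example source is not registered or unavailable")
    records = source.extract(limit=1)   # one item is enough to validate structure
    assert records, "expected at least one record"
```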
Adding a New Data Source
1. Create Extractor Module
Create src/data_sources/my_source.py:
```python
from .base import BaseDataSource, SourceSpecification

MY_SOURCE_SPEC = SourceSpecification(
    name="my_source",
    base_url="https://example.com",
    required_fields=["id", "name"],
    output_filename="my_source.csv",
    entity_folder="kits",  # Output to data_raw/kits/
    description="My source extractor",
)

class MySourceExtractor(BaseDataSource):
    def extract(self, limit=None):
        # Construct full URL
        url = f"{self.spec.base_url}/path"
        html = self.scraper.get_html(url)
        # Parse and return records
        return records
```
2. Register in SOURCES Dictionary
Update src/data_sources/__init__.py:
```python
from .my_source import MySourceExtractor, MY_SOURCE_SPEC

SOURCES = {
    # ... existing sources
    "my_source": (MY_SOURCE_SPEC, MySourceExtractor),
}
```
3. Add Tests
Create tests in tests/test_data_sources.py:
```python
from src.data_sources import create_source

def test_my_source_extraction():
    source = create_source("my_source", config, scraper, logger, metrics)
    records = source.extract(limit=1)
    assert len(records) == 1
    assert "id" in records[0]
    assert "name" in records[0]
```
4. Use in Notebook
```python
from src.pipeline import DataExtractionPipeline

pipeline = DataExtractionPipeline()
results = pipeline.run_extraction(sources=["my_source"])
```
Performance
Expected Extraction Times (with S3 buffering)
- NAR Clubs: ~5 seconds (100+ clubs)
- Rocketarium: ~3 seconds (50+ products)
- Estes: ~30 seconds (200+ kits)
- LOC Precision: ~20 seconds (150+ products)
Buffer Reuse
- First run in session: All URLs fetched from web
- Within same session: URLs already buffered are reused
- New session: New session ID = new buffer namespace
Limit Parameter
Use limit parameter for testing:
```python
# Extract only 1 record for validation
records = source.extract(limit=1)

# Extract 10 records for testing
records = source.extract(limit=10)

# Extract all records (production)
records = source.extract()
```
Best Practices
For Pipeline Users
- Monitor the self-hosted Prefect UI for flow runs and status
- Check `data_raw/` for extraction output
- Review logs in `data_extraction.log`
- Use skip flags to run individual flows as needed
- Check Neo4j for loaded data validation
For Developers
- Always use `limit=1` in integration tests
- Format code with `black` and `isort`
- Add tests for new data sources
- Update documentation when adding features
- Use environment variables for configuration
Troubleshooting
Issue: Configuration errors: AWS_BUCKET_NAME is not set
- Solution: Create `.env.local` with AWS credentials
Issue: Failed to fetch URL after 3 attempts
- Solution: Check network connection, increase timeout/retries
Issue: Buffer hit rate is 0%
- Solution: Verify S3 credentials and bucket permissions
- Note: Each new session starts with empty buffer (this is expected)
Issue: Estes extraction returns 0 records
- Solution: Website structure may have changed, update extractor
Infrastructure
AWS S3 Setup
Use Terraform for automated setup:
```bash
cd terraform
./setup.sh
```
This creates:
- S3 bucket with versioning
- IAM user with minimal permissions
- Lifecycle policies for buffer cleanup (optional)
See docs/TERRAFORM.md for details.
AWS Infrastructure Architecture
The blog-data pipeline runs on AWS infrastructure managed entirely through Terraform.
Infrastructure Diagram
```mermaid
graph TB
subgraph AWS["AWS Account: 421115711209"]
subgraph VPC["VPC: blog-data-vpc"]
subgraph PublicSubnet["Public Subnet"]
NAT["NAT Gateway"]
end
subgraph PrivateSubnet["Private Subnet"]
ECS["ECS Cluster<br/>blog-data-pipeline<br/>Fargate"]
Lambda["Lambda<br/>ork-processor"]
end
end
subgraph Storage["Storage & Secrets"]
S3["S3 Bucket<br/>ron-website-docs"]
Secrets["Secrets Manager<br/>neo4j/credentials"]
ECR["ECR Repository<br/>blog-data"]
end
subgraph Database["External Database"]
Neo4j["Neo4j Aura<br/>5f047702.databases.neo4j.io"]
end
subgraph Monitoring["Monitoring & Logging"]
CloudWatch["CloudWatch<br/>Logs & Metrics"]
CloudTrail["CloudTrail<br/>Audit Logs"]
end
subgraph Security["Security & Encryption"]
KMS["KMS Key<br/>Encryption"]
IAM["IAM Roles & Policies<br/>blog-data-terraform-admin"]
SG["Security Groups<br/>ECS Tasks"]
end
end
subgraph External["External Services"]
Prefect["Self-hosted Prefect 3<br/>Orchestration"]
end
ECS -->|Pull Images| ECR
ECS -->|Read/Write| S3
ECS -->|Get Credentials| Secrets
ECS -->|Query/Update| Neo4j
ECS -->|Send Logs| CloudWatch
ECS -->|Orchestrated by| Prefect
Lambda -->|Read/Write| S3
Lambda -->|Get Credentials| Secrets
Lambda -->|Query/Update| Neo4j
Lambda -->|Send Logs| CloudWatch
Secrets -->|Encrypted with| KMS
S3 -->|Encrypted with| KMS
ECR -->|Encrypted with| KMS
IAM -->|Manages| ECS
IAM -->|Manages| Lambda
SG -->|Protects| ECS
CloudTrail -->|Audits| IAM
```
AWS Components
Compute
| Component | Type | Purpose |
|---|---|---|
| ECS Cluster | Fargate | Run Prefect pipeline flows |
| Lambda Function | ork-processor | Process OpenRocket files |
Storage
| Component | Type | Purpose |
|---|---|---|
| S3 Bucket | ron-website-docs | Store pipeline data |
| ECR Repository | blog-data | Store container images |
| Secrets Manager | neo4j/credentials | Store database credentials |
Networking
| Component | Type | Purpose |
|---|---|---|
| VPC | blog-data-vpc | Network isolation |
| Public Subnet | Subnet | NAT Gateway |
| Private Subnet | Subnet | ECS & Lambda |
| NAT Gateway | Gateway | Outbound internet access |
| Security Groups | SG | Network access control |
Security
| Component | Type | Purpose |
|---|---|---|
| KMS Key | Encryption | Encrypt S3, ECR, Secrets |
| IAM Roles | Role | ECS task execution & permissions |
| IAM Policies | Policy | Least-privilege access |
| CloudTrail | Audit | Track all API calls |
Monitoring
| Component | Type | Purpose |
|---|---|---|
| CloudWatch Logs | Logs | ECS & Lambda logs |
| CloudWatch Metrics | Metrics | Performance monitoring |
| CloudWatch Alarms | Alarms | Error notifications |
Deployment Data Flow
Pipeline Execution
- Self-hosted Prefect 3 triggers ECS task
- ECS Task pulls container image from ECR
- ECS Task retrieves Neo4j credentials from Secrets Manager
- ECS Task reads/writes data to S3
- ECS Task queries/updates Neo4j database
- ECS Task sends logs to CloudWatch
- CloudWatch triggers alarms on errors
Lambda Processing
- S3 Event triggers Lambda function
- Lambda retrieves credentials from Secrets Manager
- Lambda processes file from S3
- Lambda updates Neo4j database
- Lambda sends logs to CloudWatch
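An illustrative handler for this flow, assuming boto3 and the standard S3 event shape; the real ork-processor code may differ:

```python
import json

import boto3

s3 = boto3.client("s3")
secrets = boto3.client("secretsmanager")

def handler(event, context):
    # 1. S3 event identifies the uploaded file
    record = event["Records"][0]["s3"]
    bucket, key = record["bucket"]["name"], record["object"]["key"]

    # 2. Retrieve Neo4j credentials from Secrets Manager (passed to the driver, omitted here)
    creds = json.loads(
        secrets.get_secret_value(SecretId="blog-data/neo4j/credentials")["SecretString"]
    )

    # 3. Process the file from S3 (parsing and the Neo4j update are omitted here)
    body = s3.get_object(Bucket=bucket, Key=key)["Body"].read()

    # 4. Anything printed or logged ends up in CloudWatch Logs
    print(f"Processed {key} ({len(body)} bytes) from {bucket}")
```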
Security Architecture
Encryption
- At Rest: KMS encryption for S3, ECR, Secrets Manager
- In Transit: TLS/HTTPS for all external connections
- Credentials: Stored in Secrets Manager with automatic rotation (optional)
Access Control
- IAM Roles: Separate roles for ECS task execution and application
- Security Groups: Restrict network access to required services
- Least Privilege: Each role has minimal required permissions
Audit & Compliance
- CloudTrail: All API calls logged and audited
- CloudWatch: Application logs retained for 30 days
- IAM Policies: Documented and version controlled
Terraform Files
| File | Purpose |
|---|---|
| main.tf | Provider configuration |
| vpc.tf | VPC, subnets, NAT gateway |
| ecs.tf | ECS cluster, task definitions, services |
| lambda.tf | Lambda functions and permissions |
| s3.tf | S3 buckets and configurations |
| iam.tf | IAM roles and policies |
| variables.tf | Input variables |
| outputs.tf | Output values |
Deployment
See ../docs/infra.md for infrastructure setup and deployment instructions.
Architecture Diagrams
The blog_data pipeline has a small set of architecture diagrams generated from code in this repository. These diagrams focus on the Prefect worker pool, ECS tasks, and the AWS services they interact with.
All diagrams are stored under `docs/diagrams/` and are generated by `generate_architecture_diagram.py` in the repo root.
Available Diagrams
0. Prefect Runtime (diagram_0_blog_data_prefect_runtime.png)
Purpose: Show how the blog_data Prefect worker pool runs on ECS and talks to its backing AWS and external services.
Shows:
- Prefect API on ECS, exposed via `pipelines.rocketclub.online`
- Prefect worker pool `blog-data-pool` running ECS tasks
- ECR repository holding the `blog_data` runtime image
- S3 cache, raw, and clean buckets used by the pipeline
- AWS Secrets Manager for `blog-data/*` credentials
- Neo4j Aura and Cloudinary as external services
Use for: Understanding where Prefect actually runs, what AWS services the workers talk to, and how images are pulled and executed.
1. Data Pipeline (diagram_1_blog_data_data_pipeline.png)
Purpose: Show the end-to-end data processing pipeline as implemented in the Prefect flows.
Shows:
- Prefect flows (main pipeline + subflows) as a single logical node
- Web scraping cache in S3
- Raw extracted CSVs in the `blog-data-raw-*` bucket
- Cleaned CSVs in the `blog-data-clean-*` bucket
- Graph load into Neo4j Aura
- Image upload to Cloudinary
Use for: Understanding how extraction, cleaning, graph load, and image upload relate to each other and to the backing storage systems.
2. CI & Prefect Deployer (diagram_2_blog_data_ci_and_deployer.png)
Purpose: Show how code changes in blog_data become Prefect deployments on the self-hosted Prefect API.
Shows:
- Developer pushes to `rlhatcher/blog_data` on GitHub
- CircleCI `blog_data` workflow builds and pushes the runtime image to ECR
- CircleCI triggers a Prefect deployer ECS task (defined in blog_infra)
- Deployer task runs `prefect deploy --all` against the self-hosted Prefect API
- Prefect API schedules runs on the `blog-data-pool` workers
Use for: Understanding the full CI/CD path from Git commit to updated Prefect deployments and worker execution.
Diagram Generation
Script: generate_architecture_diagram.py
Dependencies:
- Python dev dependencies (including `diagrams` and `graphviz`) from the `dev` dependency group in `pyproject.toml`
- A system Graphviz install (e.g. `brew install graphviz` on macOS)
From the blog_data repository root:
```bash
uv sync --group dev  # ensure dev dependencies, including diagrams and graphviz
uv run python generate_architecture_diagram.py
```
Output:
- PNG files in `docs/diagrams/`
- Each diagram focused on blog_data flows, ECS tasks, and AWS services
Diagram Maintenance
When to Update:
- Changing Prefect work pool names or worker topology
- Changing S3 bucket structure or data flow
- Changing how CI/CD triggers the Prefect deployer
- Adding or removing AWS or external services used by blog_data
How to Update:
- Update `generate_architecture_diagram.py` with any new nodes or edges
- Regenerate all diagrams using the commands above
- Review the generated PNGs under `docs/diagrams/`
- Update this documentation if needed
Validation: See DIAGRAM_VALIDATION_REPORT.md for the diagram validation checklist.
External Services Integration
Cloudinary CDN
Purpose: Image hosting and optimization
Configuration:
- Cloud Name: ronaldhatcher
- Folder Structure: `kits/{manufacturer}/{product_id}`
- Transformations: Auto-format (WebP/AVIF), responsive sizing, quality optimization
Credentials:
- Stored in AWS Secrets Manager: `blog-data/cloudinary/credentials`
- Retrieved at runtime by ECS tasks
- Never stored in environment variables or code
Documentation:
- IMAGE_PROCESSING.md - Complete image processing guide
- CLOUDINARY_CONFIGURATION.md - Detailed configuration reference
- Diagram 7 - Visual flow diagram
Neo4j Aura
Purpose: Graph database for rocket data relationships
Configuration:
- Instance: 5f047702.databases.neo4j.io
- Protocol: neo4j+s (secure)
Credentials:
- Stored in AWS Secrets Manager: `blog-data/neo4j/credentials`
- Retrieved at runtime by ECS tasks and Lambda functions
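A sketch of how an ECS task or Lambda might connect using that secret, assuming the official `neo4j` Python driver; the field names inside the secret are assumptions:

```python
import json

import boto3
from neo4j import GraphDatabase

secret = boto3.client("secretsmanager").get_secret_value(
    SecretId="blog-data/neo4j/credentials"
)
creds = json.loads(secret["SecretString"])

driver = GraphDatabase.driver(
    "neo4j+s://5f047702.databases.neo4j.io",      # instance and secure protocol from above
    auth=(creds["username"], creds["password"]),  # assumed secret fields
)
with driver.session() as session:
    node_count = session.run("MATCH (n) RETURN count(n) AS c").single()["c"]
driver.close()
```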
Prefect Orchestration (self-hosted Prefect 3 on ECS)
Purpose: Workflow orchestration and scheduling
Configuration:
- API: Self-hosted Prefect 3 server at `https://pipelines.rocketclub.online`
- Work Pool: `blog-data-pool` (ECS Fargate)
- Deployments: Managed by the Prefect deployer ECS task triggered from CircleCI
Credentials:
- Prefect HTTP basic auth is configured and stored in AWS Secrets Manager by the `blog_infra` Terraform stack (see `blog_infra/PREFECT_AUTH_SETUP.md`).
- The Prefect deployer ECS task and CI use this configuration to authenticate to the API; this repo does not define or manage those secrets directly.
Vercel
Purpose: Next.js frontend hosting
Data Access:
- Reads cleaned CSV from S3 bucket
- Queries Neo4j for graph data
- Loads images from Cloudinary CDN
Related Documentation
- IMAGE_PROCESSING.md - Image processing pipeline
- CLOUDINARY_CONFIGURATION.md - Cloudinary setup and configuration
- DIAGRAM_VALIDATION_REPORT.md - Diagram validation
- ARCHITECTURE_DIAGRAM_ASSESSMENT.md - Diagram improvements
- infra.md - Infrastructure setup and deployment
- TERRAFORM.md - Terraform configuration details