Architecture

Last Updated: 2025-10-30. Related Diagrams: See the Architecture Diagrams section.

Overview

Prefect-orchestrated data pipeline with modular Python components for extracting, cleaning, and loading rocket data into Neo4j. Includes Cloudinary CDN integration for optimized image delivery.

System Design

┌─────────────────────────────────────────────────────────────┐
│            Self-hosted Prefect 3 on ECS                     │
│  Scheduling, Monitoring, Flow Orchestration                 │
└──────────────────────┬──────────────────────────────────────┘


         ┌─────────────────────────────┐
         │  Data Pipeline Flows        │
         │  - Extraction               │
         │  - Cleaning                 │
         │  - Graph Load               │
         │  - Schema Generation        │
         └─────────────────────────────┘


              ┌─────────────────┐
              │ Data Extractors │
              │  (Modular)      │
              └────────┬────────┘

        ┌──────────────┼──────────────┐
        │              │              │
        ▼              ▼              ▼
┌──────────────┐ ┌──────────────┐ ┌──────────────────┐
│   Scraper    │ │ Configuration│ │  Entity-Based    │
│ - S3 Buffer  │ │ - Pydantic   │ │  Output          │
│ - URL Dedup  │ │ - AWS Config │ │  clubs/          │
│ - Rate Limit │ │ - Sources    │ │  kits/           │
│ - Simple     │ │ - Validation │ │  motors/         │
│   (~250 LOC) │ │              │ │  manufacturers/  │
│              │ │              │ │  vendors/        │
└──────────────┘ └──────────────┘ └──────────────────┘

Core Components

Prefect Flows (Primary Interface)

Data Extraction Flow (flows/data_extraction.py)

  • Orchestrates data extraction from all sources
  • Uses DataExtractionPipeline to coordinate extractors
  • Outputs raw CSV files to S3 bucket (blog-data-raw)
  • Scheduled daily or triggered on-demand
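
A hedged sketch of how such a flow can wrap the pipeline in Prefect 3 (task and flow names here are illustrative, not the actual contents of flows/data_extraction.py):

# Sketch only: a thin Prefect wrapper around DataExtractionPipeline.
from prefect import flow, task

from src.pipeline import DataExtractionPipeline


@task
def extract_all_sources(sources=None):
    """Run the extraction pipeline for the given sources (None = all)."""
    pipeline = DataExtractionPipeline()
    return pipeline.run_extraction(sources=sources)


@flow(name="data-extraction")
def data_extraction_flow(sources=None):
    """Extract raw CSVs for the cleaning flow to pick up."""
    return extract_all_sources(sources)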

Data Cleaning Flow (flows/data_cleaning.py)

  • Cleans and transforms raw data from S3
  • Standardizes formats, removes duplicates
  • Outputs clean CSV files to S3 bucket (blog-data-clean)
  • Triggered on-demand or as part of main pipeline

Graph Load Flow (flows/graph_load.py)

  • Loads cleaned data into Neo4j
  • Creates graph schema and relationships
  • Validates graph structure
  • Triggered on-demand or as part of main pipeline

Schema Generation Flow (flows/schema_generation.py)

  • Generates TypeScript types and Zod schemas from Neo4j
  • Creates type definitions for frontend consumption
  • Triggered on-demand or as part of main pipeline

Python Modules (Supporting Code)

src/data_sources/ - Modular Extractors

  • base.py - Base classes (BaseDataSource, SourceSpecification)
  • Individual extractor modules (see Data Sources documentation)

src/config.py - Configuration

  • Environment-based configuration
  • AWS, scraper, and pipeline settings
  • Validation on startup

src/logger.py - Logging & Metrics

  • Structured logging with colored output
  • Metrics tracking (cache hits, extraction times)
  • Performance monitoring

src/scraper.py - Web Scraping

  • Simple S3-based URL deduplication (~250 lines)
  • Session-based buffer organization
  • Rate limiting
  • Methods: get_html(), get_json(), post_json(), post_html()
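
A brief usage sketch from inside an extractor (class name and URL paths are placeholders): the first request for a URL goes to the web and is written to the S3 session buffer; repeating the same URL within the session is served from the buffer.

# Illustrative only: an extract() method using the shared scraper.
from .base import BaseDataSource


class DemoExtractor(BaseDataSource):
    def extract(self, limit=None):
        # First request hits the web and is written to the S3 session buffer.
        html = self.scraper.get_html(f"{self.spec.base_url}/products")
        # JSON endpoints go through the same buffer via get_json().
        data = self.scraper.get_json(f"{self.spec.base_url}/api/items")
        # Repeating a URL within the same session returns the buffered copy
        # from S3, so no second HTTP request is made.
        html_again = self.scraper.get_html(f"{self.spec.base_url}/products")
        return []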

src/pipeline.py - Orchestration

  • Coordinates multiple data sources
  • Handles errors gracefully
  • Tracks metrics across sources

Data Flow

Web Sources → Scraper (S3 URL dedup) → Extractors → Entity-Based Output
                  ↓                                           ↓
            Image Download                              data_raw/
                  ↓                                      ├── clubs/
            Cloudinary Upload                            ├── kits/
                  ↓                                      ├── motors/
            Cloudinary CDN                               ├── manufacturers/
                  ↓                                      └── vendors/
            URLs in CSV ────────────────────────────────────┘

                                                     Data Cleaning Flow
                                                            ↓
                                                        data_clean/
                                                            ↓
                                                      Graph Load Flow
                                                            ↓
                                                          Neo4j
                                                            ↓
                                                   Schema Generation Flow
                                                            ↓
                                                  TypeScript Types & Zod
                                                            ↓
                                                      Vercel Frontend
                                                     (loads images from
                                                      Cloudinary CDN)

Image Processing Flow:

  • Product images are downloaded during scraping
  • Images uploaded to Cloudinary CDN with automatic optimization (WebP/AVIF)
  • Cloudinary URLs stored in CSV alongside product data
  • Frontend loads optimized images directly from Cloudinary CDN

See IMAGE_PROCESSING.md for detailed image processing documentation.
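
As a minimal sketch of the upload step, assuming the official cloudinary Python SDK and the kits/{manufacturer}/{product_id} folder convention (file path and folder values are illustrative; credential handling is covered under External Services Integration):

# Sketch: upload a downloaded product image and keep the CDN URL for the CSV.
import cloudinary
import cloudinary.uploader

cloudinary.config(cloud_name="ronaldhatcher", api_key="...", api_secret="...")

result = cloudinary.uploader.upload(
    "downloads/alpha_iii.jpg",        # local image saved during scraping
    folder="kits/estes/alpha-iii",    # kits/{manufacturer}/{product_id}
    overwrite=True,
)
image_url = result["secure_url"]      # stored in the CSV next to the product row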

Data Organization Patterns

Entity-Based Organization

Output is organized by entity type, not by data source:

data_raw/
├── clubs/          # Club entities from multiple sources
├── kits/           # Kit/product entities from multiple sources
├── motors/         # Motor entities
├── manufacturers/  # Manufacturer entities
└── vendors/        # Vendor entities

Key Principles:

  • Multiple sources can contribute to the same entity folder
  • Reflects domain model (entities) not technical implementation (sources)
  • Simplifies downstream processing (all clubs in one place)
  • Aligns with graph database schema

Configuration:

SourceSpecification(
    name="example",
    entity_folder="clubs",  # Output goes to data_raw/clubs/
    output_filename="example_clubs.csv"
)

File Splitting Pattern

A single data source can output to multiple files based on field values:

Use Case: API returns mixed entity types in one response

Pattern:

SourceSpecification(
    split_by_field="entity_type",
    split_file_mapping={
        "manufacturer": "manufacturers/company.csv",
        "vendor": "vendors/company.csv",
    }
)

Behavior:

  1. Extract records from source
  2. Group records by split_by_field value
  3. Save each group to mapped filename
  4. Remove the split field from output CSVs

Data Flow:

API Response (100 records)

Group by entity_type

├─ manufacturer (60 records) → manufacturers/company.csv
├─ vendor (30 records) → vendors/company.csv
└─ other (10 records) → (skipped if not in mapping)
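
A standalone sketch of the splitting step itself (not the actual pipeline code), assuming records are plain dicts:

# Sketch: group records by the split field, write each group to its mapped CSV,
# and drop the split field from the output rows.
import csv
from collections import defaultdict
from pathlib import Path


def split_records(records, split_by_field, split_file_mapping, output_root="data_raw"):
    groups = defaultdict(list)
    for record in records:
        groups[record.get(split_by_field)].append(record)

    for value, rows in groups.items():
        filename = split_file_mapping.get(value)
        if filename is None:
            continue  # values without a mapping are skipped
        path = Path(output_root) / filename
        path.parent.mkdir(parents=True, exist_ok=True)
        fieldnames = [k for k in rows[0] if k != split_by_field]
        with path.open("w", newline="") as f:
            writer = csv.DictWriter(f, fieldnames=fieldnames, extrasaction="ignore")
            writer.writeheader()
            writer.writerows(rows)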

Multi-Extractor Pattern

Complex data sources can have multiple extractors, each handling different entity types:

Pattern:

External API

├── Metadata Extractor → manufacturers/*.csv (via file splitting)
├── Motors Extractor → motors/motors.csv
└── Samples Extractor → motors/samples.csv

Benefits:

  • Separation of concerns (single responsibility per extractor)
  • Independent execution and testing
  • Supports complex data source integration

Modular Data Sources

Data sources are named by source and entity type: a single source may provide several entity types, and the same entity type may come from several sources.

Data sources read from the external source and write CSV files that form the output interface. Each data source is independent, with its own specification and extractor class:

SourceSpecification

Defines configuration for a data source extractor.

Core Parameters:

SourceSpecification(
    name="example",                    # Unique identifier
    base_url="https://example.com",    # Base URL for requests
    required_fields=["id", "name"],    # Fields to validate
    output_filename="example.csv",     # Default output filename
    description="Example extractor",   # Human-readable description

    # Request settings
    rate_limit=1.0,                    # Delay between requests (seconds)
    timeout=30,                        # Request timeout (seconds)
    max_retries=3,                     # Maximum retry attempts

    # Data organization
    entity_folder="kits",              # Output to data_raw/kits/

    # File splitting (optional)
    split_by_field="entity_type",      # Field to split records by
    split_file_mapping={               # Map field values to files
        "manufacturer": "manufacturers/file.csv",
        "vendor": "vendors/file.csv",
    },

    # Custom configuration
    custom_config={                    # Source-specific options
        "extract_details": False,
        "include_images": True
    }
)

BaseDataSource Interface

from .base import BaseDataSource

class MyExtractor(BaseDataSource):
    def extract(self, limit=None):
        """Extract data with optional limit."""
        records = []  # extraction logic goes here
        return records

    def run_extraction(self):
        """Complete extraction with validation and saving."""
        records = self.extract()
        self.validate_data(records)
        self.save_to_csv(records)
        return True

Source Registration

Data sources are registered in a simple dictionary in src/data_sources/__init__.py:

# In src/data_sources/__init__.py
SOURCES = {
    "example": (EXAMPLE_SPEC, ExampleExtractor),
    # ... other sources
}

# Factory function
def create_source(source_name, config, scraper, logger, metrics):
    """Create a data source instance by name."""
    if source_name not in SOURCES:
        return None
    spec, source_class = SOURCES[source_name]
    return source_class(spec, config, scraper, logger, metrics)

Usage:

from src.data_sources import create_source

source = create_source("example", config, scraper, logger, metrics)
records = source.extract()

Key Features

S3 URL Deduplication Buffer

Purpose: Ensure each URL is only requested once per extraction session.

How it works:

  1. Check S3 for buffered content using session_id/md5(url) key
  2. If buffered: Return content from S3 (no HTTP request)
  3. If not buffered: Fetch from web, store in S3, return content
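
A minimal sketch of that check with boto3 (the bucket name and helper names are assumptions; the key scheme follows step 1):

# Sketch: one S3 object per URL, keyed by session ID and URL hash.
import hashlib

import boto3
from botocore.exceptions import ClientError

s3 = boto3.client("s3")


def fetch_with_buffer(url, session_id, fetch_from_web, bucket="blog-data-buffer"):
    key = f"{session_id}/{hashlib.md5(url.encode()).hexdigest()}"
    try:
        obj = s3.get_object(Bucket=bucket, Key=key)
        return obj["Body"].read().decode()     # already buffered: no HTTP request
    except ClientError:
        content = fetch_from_web(url)          # not buffered: fetch from the web
        s3.put_object(Bucket=bucket, Key=key, Body=content.encode())
        return content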

Key characteristics:

  • Simple session-based organization
  • No TTL or expiration (manual cleanup)
  • No performance metrics tracking
  • ~250 lines of code (drastically simplified from 1,730 lines)

Simple buffer design: This is a request deduplication system to avoid redundant HTTP requests within a session, not a traditional cache with performance tracking.

Error Handling

  • Automatic retries (3 attempts with exponential backoff)
  • Graceful degradation (continues on failure)
  • Comprehensive error logging
  • Metrics tracking for errors
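
An illustrative retry loop with those characteristics (the scraper's actual implementation may differ):

# Sketch: 3 attempts with exponential backoff; failures are logged, not raised.
import logging
import time

logger = logging.getLogger(__name__)


def fetch_with_retries(fetch, url, max_retries=3, base_delay=1.0):
    for attempt in range(1, max_retries + 1):
        try:
            return fetch(url)
        except Exception as exc:  # broad on purpose: degrade gracefully
            if attempt == max_retries:
                logger.error(f"Failed to fetch {url} after {max_retries} attempts: {exc}")
                return None
            time.sleep(base_delay * 2 ** (attempt - 1))  # 1s, 2s, 4s, ...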

Rate Limiting

  • Configurable delays per source
  • Prevents server overload
  • Respectful web scraping

Data Validation

  • Required field checking
  • Data type validation
  • Minimum record count validation
  • Validation errors are logged but do not stop extraction
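
A small sketch of that validation against a source's required_fields (the helper name is illustrative):

# Sketch: log missing fields and low record counts without aborting extraction.
def validate_records(records, required_fields, logger, min_records=1):
    if len(records) < min_records:
        logger.warning(f"Only {len(records)} records extracted (expected >= {min_records})")
    for i, record in enumerate(records):
        missing = [f for f in required_fields if not record.get(f)]
        if missing:
            logger.warning(f"Record {i} is missing required fields: {missing}")
    return records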

Configuration

Environment Variables (.env.local)

# AWS S3 (for caching)
AWS_BUCKET_NAME=your-bucket-name
AWS_ACCESS_KEY_ID=your-key
AWS_SECRET_ACCESS_KEY=your-secret
AWS_REGION=us-east-1

# Scraper Settings
SCRAPER_RATE_LIMIT=1.0
SCRAPER_TIMEOUT=30
SCRAPER_MAX_RETRIES=3

# Output
OUTPUT_DIR=./data_raw
LOG_FILE=data_extraction.log

Data Source Configuration

Each source has independent settings:

  • enabled - Enable/disable extraction
  • rate_limit - Delay between requests (optional)
  • custom_config - Source-specific options (optional)

Note: URL deduplication is always enabled and session-based. There is no per-source cache configuration.

Testing

Unit Tests

# Fast tests (no web calls)
pytest tests/ -m "not integration"

Integration Tests

# Real web calls with limit=1
pytest tests/test_real_extraction.py -v -m integration

Test Standards:

  • Extract only 1 item to validate structure
  • Verify URL deduplication works (check S3 buffer)
  • Use buffered content for subsequent tests (avoid redundant requests)
  • Skip gracefully if source unavailable
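
An integration test following those standards might look like this (the pytest fixtures named here are assumptions):

# Sketch: one live record, marked so it is excluded from fast runs.
import pytest

from src.data_sources import create_source


@pytest.mark.integration
def test_example_live_extraction(config, scraper, logger, metrics):
    source = create_source("example", config, scraper, logger, metrics)
    if source is None:
        pytest.skip("example source not registered")
    records = source.extract(limit=1)
    if not records:
        pytest.skip("source unavailable")
    assert "id" in records[0]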

Adding a New Data Source

1. Create Extractor Module

Create src/data_sources/my_source.py:

from .base import BaseDataSource, SourceSpecification

MY_SOURCE_SPEC = SourceSpecification(
    name="my_source",
    base_url="https://example.com",
    required_fields=["id", "name"],
    output_filename="my_source.csv",
    entity_folder="kits",  # Output to data_raw/kits/
    description="My source extractor"
)

class MySourceExtractor(BaseDataSource):
    def extract(self, limit=None):
        # Construct the full URL and fetch via the shared scraper
        url = f"{self.spec.base_url}/path"
        html = self.scraper.get_html(url)
        # Parse the HTML into a list of record dicts
        records = []  # parsing logic goes here
        return records

2. Register in SOURCES Dictionary

Update src/data_sources/__init__.py:

from .my_source import MySourceExtractor, MY_SOURCE_SPEC

SOURCES = {
    # ... existing sources
    "my_source": (MY_SOURCE_SPEC, MySourceExtractor),
}

3. Add Tests

Create tests in tests/test_data_sources.py:

def test_my_source_extraction():
    source = create_source("my_source", config, scraper, logger, metrics)
    records = source.extract(limit=1)
    assert len(records) == 1
    assert "id" in records[0]
    assert "name" in records[0]

4. Use in Notebook

from src.pipeline import DataExtractionPipeline

pipeline = DataExtractionPipeline()
results = pipeline.run_extraction(sources=['my_source'])

Performance

Expected Extraction Times (with caching)

  • NAR Clubs: ~5 seconds (100+ clubs)
  • Rocketarium: ~3 seconds (50+ products)
  • Estes: ~30 seconds (200+ kits)
  • LOC Precision: ~20 seconds (150+ products)

Buffer Reuse

  • First run in session: All URLs fetched from web
  • Within same session: URLs already buffered are reused
  • New session: New session ID = new buffer namespace

Limit Parameter

Use limit parameter for testing:

# Extract only 1 record for validation
records = source.extract(limit=1)

# Extract 10 records for testing
records = source.extract(limit=10)

# Extract all records (production)
records = source.extract()

Best Practices

For Pipeline Users

  1. Monitor the self-hosted Prefect UI for flow runs and status
  2. Check data_raw/ for extraction output
  3. Review logs in data_extraction.log
  4. Use skip flags to run individual flows as needed
  5. Check Neo4j for loaded data validation

For Developers

  1. Always use limit=1 in integration tests
  2. Format code with black and isort
  3. Add tests for new data sources
  4. Update documentation when adding features
  5. Use environment variables for configuration

Troubleshooting

Issue: Configuration errors: AWS_BUCKET_NAME is not set

  • Solution: Create .env.local with AWS credentials

Issue: Failed to fetch URL after 3 attempts

  • Solution: Check network connection, increase timeout/retries

Issue: Buffer hit rate is 0%

  • Solution: Verify S3 credentials and bucket permissions
  • Note: Each new session starts with empty buffer (this is expected)

Issue: Estes extraction returns 0 records

  • Solution: Website structure may have changed, update extractor

Infrastructure

AWS S3 Setup

Use Terraform for automated setup:

cd terraform
./setup.sh

This creates:

  • S3 bucket with versioning
  • IAM user with minimal permissions
  • Lifecycle policies for buffer cleanup (optional)

See docs/TERRAFORM.md for details.

AWS Infrastructure Architecture

The blog-data pipeline runs on AWS infrastructure managed entirely through Terraform.

Infrastructure Diagram

graph TB
    subgraph AWS["AWS Account: 421115711209"]
        subgraph VPC["VPC: blog-data-vpc"]
            subgraph PublicSubnet["Public Subnet"]
                NAT["NAT Gateway"]
            end

            subgraph PrivateSubnet["Private Subnet"]
                ECS["ECS Cluster<br/>blog-data-pipeline<br/>Fargate"]
                Lambda["Lambda<br/>ork-processor"]
            end
        end

        subgraph Storage["Storage & Secrets"]
            S3["S3 Bucket<br/>ron-website-docs"]
            Secrets["Secrets Manager<br/>neo4j/credentials"]
            ECR["ECR Repository<br/>blog-data"]
        end

        subgraph Database["External Database"]
            Neo4j["Neo4j Aura<br/>5f047702.databases.neo4j.io"]
        end

        subgraph Monitoring["Monitoring & Logging"]
            CloudWatch["CloudWatch<br/>Logs & Metrics"]
            CloudTrail["CloudTrail<br/>Audit Logs"]
        end

        subgraph Security["Security & Encryption"]
            KMS["KMS Key<br/>Encryption"]
            IAM["IAM Roles & Policies<br/>blog-data-terraform-admin"]
            SG["Security Groups<br/>ECS Tasks"]
        end
    end

    subgraph External["External Services"]
        Prefect["Self-hosted Prefect 3<br/>Orchestration"]
    end

    ECS -->|Pull Images| ECR
    ECS -->|Read/Write| S3
    ECS -->|Get Credentials| Secrets
    ECS -->|Query/Update| Neo4j
    ECS -->|Send Logs| CloudWatch
    ECS -->|Orchestrated by| Prefect

    Lambda -->|Read/Write| S3
    Lambda -->|Get Credentials| Secrets
    Lambda -->|Query/Update| Neo4j
    Lambda -->|Send Logs| CloudWatch

    Secrets -->|Encrypted with| KMS
    S3 -->|Encrypted with| KMS
    ECR -->|Encrypted with| KMS

    IAM -->|Manages| ECS
    IAM -->|Manages| Lambda
    SG -->|Protects| ECS

    CloudTrail -->|Audits| IAM

AWS Components

Compute

Component         Type            Purpose
ECS Cluster       Fargate         Run Prefect pipeline flows
Lambda Function   ork-processor   Process OpenRocket files

Storage

Component         Type                Purpose
S3 Bucket         ron-website-docs    Store pipeline data
ECR Repository    blog-data           Store container images
Secrets Manager   neo4j/credentials   Store database credentials

Networking

Component         Type            Purpose
VPC               blog-data-vpc   Network isolation
Public Subnet     Subnet          NAT Gateway
Private Subnet    Subnet          ECS & Lambda
NAT Gateway       Gateway         Outbound internet access
Security Groups   SG              Network access control

Security

Component      Type         Purpose
KMS Key        Encryption   Encrypt S3, ECR, Secrets
IAM Roles      Role         ECS task execution & permissions
IAM Policies   Policy       Least-privilege access
CloudTrail     Audit        Track all API calls

Monitoring

Component            Type      Purpose
CloudWatch Logs      Logs      ECS & Lambda logs
CloudWatch Metrics   Metrics   Performance monitoring
CloudWatch Alarms    Alarms    Error notifications

Deployment Data Flow

Pipeline Execution

  1. Self-hosted Prefect 3 triggers ECS task
  2. ECS Task pulls container image from ECR
  3. ECS Task retrieves Neo4j credentials from Secrets Manager
  4. ECS Task reads/writes data to S3
  5. ECS Task queries/updates Neo4j database
  6. ECS Task sends logs to CloudWatch
  7. CloudWatch triggers alarms on errors

Lambda Processing

  1. S3 Event triggers Lambda function
  2. Lambda retrieves credentials from Secrets Manager
  3. Lambda processes file from S3
  4. Lambda updates Neo4j database
  5. Lambda sends logs to CloudWatch
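
A hedged handler sketch in the shape of those steps (the secret's JSON field names and the Cypher are illustrative; OpenRocket parsing is omitted):

# Sketch of an S3-triggered handler like ork-processor; not the actual code.
import json

import boto3
from neo4j import GraphDatabase

s3 = boto3.client("s3")
secrets = boto3.client("secretsmanager")


def handler(event, context):
    record = event["Records"][0]["s3"]
    bucket, key = record["bucket"]["name"], record["object"]["key"]

    creds = json.loads(
        secrets.get_secret_value(SecretId="blog-data/neo4j/credentials")["SecretString"]
    )
    body = s3.get_object(Bucket=bucket, Key=key)["Body"].read()
    # ... parse the OpenRocket file from `body` ...

    with GraphDatabase.driver(creds["uri"], auth=(creds["username"], creds["password"])) as driver:
        driver.execute_query("MERGE (f:File {key: $key})", key=key)  # illustrative write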

Security Architecture

Encryption

  • At Rest: KMS encryption for S3, ECR, Secrets Manager
  • In Transit: TLS/HTTPS for all external connections
  • Credentials: Stored in Secrets Manager with automatic rotation (optional)

Access Control

  • IAM Roles: Separate roles for ECS task execution and application
  • Security Groups: Restrict network access to required services
  • Least Privilege: Each role has minimal required permissions

Audit & Compliance

  • CloudTrail: All API calls logged and audited
  • CloudWatch: Application logs retained for 30 days
  • IAM Policies: Documented and version controlled

Terraform Files

File           Purpose
main.tf        Provider configuration
vpc.tf         VPC, subnets, NAT gateway
ecs.tf         ECS cluster, task definitions, services
lambda.tf      Lambda functions and permissions
s3.tf          S3 buckets and configurations
iam.tf         IAM roles and policies
variables.tf   Input variables
outputs.tf     Output values

Deployment

See ../docs/infra.md for infrastructure setup and deployment instructions.

Architecture Diagrams

The blog_data pipeline has a small set of architecture diagrams generated from code in this repository. These diagrams focus on the Prefect worker pool, ECS tasks, and the AWS services they interact with.

All diagrams are stored under docs/diagrams/ and are generated by generate_architecture_diagram.py in the repo root.

Available Diagrams

0. Prefect Runtime (diagram_0_blog_data_prefect_runtime.png)

Purpose: Show how the blog_data Prefect worker pool runs on ECS and talks to its backing AWS and external services.

Shows:

  • Prefect API on ECS, exposed via pipelines.rocketclub.online
  • Prefect worker pool blog-data-pool running ECS tasks
  • ECR repository holding the blog_data runtime image
  • S3 cache, raw and clean buckets used by the pipeline
  • AWS Secrets Manager for blog-data/* credentials
  • Neo4j Aura and Cloudinary as external services

Use for: Understanding where Prefect actually runs, what AWS services the workers talk to, and how images are pulled and executed.

1. Data Pipeline (diagram_1_blog_data_data_pipeline.png)

Purpose: Show the end-to-end data processing pipeline as implemented in the Prefect flows.

Shows:

  • Prefect flows (main pipeline + subflows) as a single logical node
  • Web scraping cache in S3
  • Raw extracted CSVs in the blog-data-raw-* bucket
  • Cleaned CSVs in the blog-data-clean-* bucket
  • Graph load into Neo4j Aura
  • Image upload to Cloudinary

Use for: Understanding how extraction, cleaning, graph load, and image upload relate to each other and to the backing storage systems.

2. CI & Prefect Deployer (diagram_2_blog_data_ci_and_deployer.png)

Purpose: Show how code changes in blog_data become Prefect deployments on the self-hosted Prefect API.

Shows:

  • Developer pushes to rlhatcher/blog_data on GitHub
  • CircleCI blog_data workflow builds and pushes the runtime image to ECR
  • CircleCI triggers a Prefect deployer ECS task (defined in blog_infra)
  • Deployer task runs prefect deploy --all against the self-hosted Prefect API
  • Prefect API schedules runs on the blog-data-pool workers

Use for: Understanding the full CI/CD path from Git commit to updated Prefect deployments and worker execution.

Diagram Generation

Script: generate_architecture_diagram.py

Dependencies:

  • Python dev dependencies (including diagrams and graphviz) from the dev dependency group in pyproject.toml
  • A system Graphviz install (e.g. brew install graphviz on macOS)

From the blog_data repository root:

uv sync --group dev  # ensure dev dependencies, including diagrams and graphviz
uv run python generate_architecture_diagram.py

Output:

  • PNG files in docs/diagrams/
  • Each diagram focused on blog_data flows, ECS tasks, and AWS services
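
For reference, a minimal sketch of how one diagram is declared with the diagrams package (node choices and labels are illustrative, not a copy of generate_architecture_diagram.py):

# Sketch: running this writes a PNG under docs/diagrams/.
from diagrams import Cluster, Diagram
from diagrams.aws.compute import ECS
from diagrams.aws.security import SecretsManager
from diagrams.aws.storage import S3

with Diagram(
    "blog_data Prefect runtime",
    filename="docs/diagrams/diagram_0_blog_data_prefect_runtime",
    show=False,
):
    with Cluster("AWS"):
        workers = ECS("blog-data-pool workers")
        buckets = [S3("scrape buffer"), S3("raw"), S3("clean")]
        creds = SecretsManager("blog-data/*")

    workers >> buckets
    workers >> creds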

Diagram Maintenance

When to Update:

  • Changing Prefect work pool names or worker topology
  • Changing S3 bucket structure or data flow
  • Changing how CI/CD triggers the Prefect deployer
  • Adding or removing AWS or external services used by blog_data

How to Update:

  1. Update generate_architecture_diagram.py with any new nodes or edges
  2. Regenerate all diagrams using the commands above
  3. Review the generated PNGs under docs/diagrams/
  4. Update this documentation if needed

Validation: See DIAGRAM_VALIDATION_REPORT.md for diagram validation checklist.

External Services Integration

Cloudinary CDN

Purpose: Image hosting and optimization

Configuration:

  • Cloud Name: ronaldhatcher
  • Folder Structure: kits/{manufacturer}/{product_id}
  • Transformations: Auto-format (WebP/AVIF), responsive sizing, quality optimization

Credentials:

  • Stored in AWS Secrets Manager: blog-data/cloudinary/credentials
  • Retrieved at runtime by ECS tasks
  • Never stored in environment variables or code
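
A sketch of that runtime lookup (the JSON field names inside the secret are assumptions):

# Sketch: configure the Cloudinary SDK from Secrets Manager at task startup.
import json

import boto3
import cloudinary

secret = boto3.client("secretsmanager").get_secret_value(
    SecretId="blog-data/cloudinary/credentials"
)
creds = json.loads(secret["SecretString"])

cloudinary.config(
    cloud_name=creds["cloud_name"],   # assumed field names inside the secret
    api_key=creds["api_key"],
    api_secret=creds["api_secret"],
)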


Neo4j Aura

Purpose: Graph database for rocket data relationships

Configuration:

  • Instance: 5f047702.databases.neo4j.io
  • Protocol: neo4j+s (secure)

Credentials:

  • Stored in AWS Secrets Manager: blog-data/neo4j/credentials
  • Retrieved at runtime by ECS tasks and Lambda functions
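
Connecting with the official Python driver then looks roughly like this (credential values come from the secret above):

# Sketch: open a secure driver against the Aura instance and check connectivity.
from neo4j import GraphDatabase

driver = GraphDatabase.driver(
    "neo4j+s://5f047702.databases.neo4j.io",
    auth=("<username>", "<password>"),  # retrieved from Secrets Manager at runtime
)
driver.verify_connectivity()
driver.close()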

Prefect Orchestration (self-hosted Prefect 3 on ECS)

Purpose: Workflow orchestration and scheduling

Configuration:

  • API: Self-hosted Prefect 3 server at https://pipelines.rocketclub.online
  • Work Pool: blog-data-pool (ECS Fargate)
  • Deployments: Managed by the Prefect deployer ECS task triggered from CircleCI

Credentials:

  • Prefect HTTP basic auth is configured and stored in AWS Secrets Manager by the blog_infra Terraform stack (see blog_infra/PREFECT_AUTH_SETUP.md).
  • The Prefect deployer ECS task and CI use this configuration to authenticate to the API; this repo does not define or manage those secrets directly.

Vercel

Purpose: Next.js frontend hosting

Data Access:

  • Reads cleaned CSV from S3 bucket
  • Queries Neo4j for graph data
  • Loads images from Cloudinary CDN