Error Handling Strategy for S3 Operations

Overview

This document defines the standardized error handling approach for all S3 operations in the blog_data pipeline.

Principles

  1. Fail Fast: Let errors propagate to Prefect for proper retry handling
  2. Informative Logging: Log errors with context before re-raising
  3. Consistent Patterns: Use the same error handling pattern across all S3 modules
  4. Prefect Integration: Leverage Prefect's built-in retry mechanisms

Error Categories

1. S3 Client Errors (ClientError)

When: AWS S3 API calls fail (network issues, permissions, bucket not found, etc.)

Handling:

  • Log error with full context (bucket, key, operation)
  • Include exc_info=True for stack trace
  • Re-raise to allow Prefect retry logic

Example:

try:
    client.put_object(Bucket=bucket, Key=key, Body=data)
except ClientError as e:
    logger.error("Failed to write to S3 s3://%s/%s: %s", bucket, key, e, exc_info=True)
    raise

2. Data Validation Errors (ValueError)

When: Invalid data format, missing required fields, foreign key violations

Handling:

  • Log error with specific validation failure details
  • Raise ValueError with descriptive message
  • Do NOT retry (data issue, not transient)

Example:

if invalid_records:
    logger.error("Invalid foreign keys in %s: %s", entity_name, invalid_records)
    raise ValueError(f"Foreign key validation failed for {entity_name}")

3. Configuration Errors (ValueError)

When: Missing or invalid configuration (S3 bucket names, Neo4j URI, etc.)

Handling:

  • Raise ValueError immediately with clear message
  • Do NOT retry (configuration issue)

Example:

if not config.neo4j_uri:
    raise ValueError("NEO4J_URI is not configured")

4. File Not Found (404)

When: S3 object doesn't exist

Handling:

  • For existence checks: Return False (don't raise)
  • For required files: Log error and re-raise
  • Use s3_object_exists() for explicit checks

Example:

def s3_object_exists(bucket: str, key: str) -> bool:
    """Return True if the object exists, False on 404; re-raise any other ClientError."""
    client = get_s3_client()
    try:
        client.head_object(Bucket=bucket, Key=key)
        return True
    except ClientError as e:
        if e.response["Error"]["Code"] == "404":
            return False
        raise  # Re-raise other errors (permissions, throttling, etc.)

Standard Error Handling Patterns

Pattern 1: S3 Read Operations

def read_operation(bucket: str, key: str) -> pd.DataFrame:
    """Read data from S3."""
    logger = _get_logger()

    try:
        client = get_s3_client()
        response = client.get_object(Bucket=bucket, Key=key)
        # ... process data ...
        logger.info("Read %d records from s3://%s/%s", len(df), bucket, key)
        return df

    except ClientError as e:
        logger.error("Failed to read from S3 s3://%s/%s: %s", bucket, key, e, exc_info=True)
        raise

Pattern 2: S3 Write Operations

def write_operation(df: pd.DataFrame, bucket: str, key: str) -> str:
    """Write data to S3."""
    logger = _get_logger()

    try:
        # ... prepare data ...
        client = get_s3_client()
        client.put_object(Bucket=bucket, Key=key, Body=data)
        s3_uri = f"s3://{bucket}/{key}"
        logger.info("Wrote %d records to %s", len(df), s3_uri)
        return s3_uri

    except ClientError as e:
        logger.error("Failed to write to S3 s3://%s/%s: %s", bucket, key, e, exc_info=True)
        raise

Pattern 3: Domain-Specific Operations

def domain_operation(filename: str, entity_name: Optional[str] = None) -> pd.DataFrame:
    """Domain-specific S3 operation."""
    from src.config import get_config
    config = get_config()
    logger = _get_logger()

    try:
        df = read_csv_from_s3(config.bucket, filename)
        if entity_name:
            logger.info("Loaded %s (%d records)", entity_name, len(df))
        return df

    except ClientError as e:
        if entity_name:
            logger.error("Failed to load %s: %s", entity_name, e, exc_info=True)
        raise
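
All three patterns call a module-level _get_logger() helper that is not shown above. A minimal sketch, assuming the helper prefers Prefect's run logger when called inside a flow or task run and falls back to standard module logging otherwise:

import logging

def _get_logger():
    """Prefer Prefect's run logger inside a flow/task run; fall back to stdlib logging."""
    try:
        from prefect import get_run_logger
        return get_run_logger()  # raises MissingContextError outside a run
    except Exception:
        return logging.getLogger(__name__)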

Logging Standards

Log Levels

  • INFO: Successful operations with record counts
  • WARNING: Non-fatal issues (duplicates found, missing optional data)
  • ERROR: Failed operations that will be retried or cause task failure

Log Message Format

Success:

logger.info("Loaded %s from %s bucket (%d records)", entity_name, bucket_type, count)
logger.info("Saved %s to %s bucket (%d records)", entity_name, bucket_type, count)

Errors:

logger.error("Failed to load %s from %s bucket: %s", entity_name, bucket_type, error, exc_info=True)
logger.error("Failed to save %s to %s bucket: %s", entity_name, bucket_type, error, exc_info=True)

Warnings:

logger.warning("Found %d duplicate %s values in %s", count, column, entity_name)
logger.warning("No records to save for %s", entity_name)

Prefect Integration

Task Retries

All S3 tasks should use Prefect's retry mechanism:

from prefect import task

@task(name="task-name", retries=2, retry_delay_seconds=10)
def my_task():
    # S3 operations that may fail transiently
    pass

Error Propagation

  • Let ClientError propagate out of the task body
  • Prefect retries the task according to its retry configuration
  • After the configured retries are exhausted, Prefect marks the task as failed (see the end-to-end sketch below)
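
For illustration, a minimal end-to-end sketch of a retryable task built on Pattern 2 (the task name and wrapper function here are hypothetical; write_operation refers to Pattern 2 above):

from prefect import task

@task(name="write-clean-data", retries=2, retry_delay_seconds=10)
def write_clean_data(df, bucket: str, key: str) -> str:
    """Write a DataFrame to S3; a ClientError propagates so Prefect can retry."""
    # write_operation logs the failure with context and re-raises ClientError
    return write_operation(df, bucket, key)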

Non-Retryable Errors

For errors that shouldn't be retried (data validation, configuration):

  • Raise ValueError instead of letting the failure surface as a ClientError
  • Keep in mind that Prefect's retries apply to any exception by default; to make validation and configuration errors fail immediately on a retry-configured task, use a retry condition that only retries ClientError (see the sketch below)
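
A minimal sketch of such a retry condition, assuming a Prefect 2 release that supports the retry_condition_fn task argument (the helper name is hypothetical):

from botocore.exceptions import ClientError
from prefect import task

def retry_only_on_client_error(task, task_run, state) -> bool:
    """Allow retries for transient S3 failures only."""
    try:
        state.result()  # re-raises the exception that failed the task
    except ClientError:
        return True   # transient S3 failure: retry
    except Exception:
        return False  # ValueError and other non-retryable errors: fail immediately
    return False

@task(name="task-name", retries=2, retry_delay_seconds=10,
      retry_condition_fn=retry_only_on_client_error)
def my_task():
    # S3 operations that may fail transiently
    pass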

Implementation Checklist

  • All S3 read operations follow Pattern 1
  • All S3 write operations follow Pattern 2
  • All domain operations follow Pattern 3
  • All errors logged with exc_info=True
  • All success operations logged with record counts
  • All tasks have retry configuration
  • ValueError used for non-retryable errors
  • ClientError re-raised for Prefect retry

Current Status

✅ Fully Compliant Modules (Priority 3 Complete)

All S3 modules now follow the standardized error handling strategy:

  • src/s3_utils.py ✅

    • All functions follow standard patterns
    • ClientError propagates for Prefect retry
    • S3 URIs included in all error messages
    • Comprehensive error handling tests (11 tests)
  • tasks/cleaning/utils.py ✅

    • Wrapper functions delegate to s3_utils
    • Error handling inherited from s3_utils
  • tasks/loading/utils.py ✅

    • Wrapper functions delegate to s3_utils
    • Error handling inherited from s3_utils
  • src/data_sources/base.py ✅

    • Updated to let ClientError propagate
    • Only catches non-retryable errors (ValueError, KeyError)
    • Removed broad exception catching (OSError, UnicodeEncodeError)
    • Added proper docstring documentation of exceptions

✅ Test Coverage

Comprehensive error handling tests added to tests/test_s3_utils.py:

  1. ClientError Propagation Tests (8 tests)

    • Read from non-existent bucket
    • Read from non-existent key
    • Write to non-existent bucket
    • Domain-specific functions (read_raw_csv, read_clean_csv, write_raw_csv, write_clean_csv)
  2. ValueError Tests (2 tests)

    • Invalid S3 URI scheme
    • Missing key in S3 URI
  3. Special Case Tests (1 test)

    • s3_object_exists returns False for 404 (doesn't raise)
    • s3_object_exists raises for other errors

Total: 23 tests passing (12 original + 11 error handling)
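
For reference, a minimal sketch of what a ClientError propagation test might look like, assuming pytest and botocore's Stubber; the actual tests in tests/test_s3_utils.py may use a different mocking approach, and the import paths for read_csv_from_s3 and get_s3_client are assumed:

import boto3
import pytest
from botocore.exceptions import ClientError
from botocore.stub import Stubber

from src.s3_utils import read_csv_from_s3

def test_read_csv_from_s3_propagates_client_error(monkeypatch):
    """A failed get_object call must surface as ClientError, not be swallowed."""
    client = boto3.client(
        "s3",
        region_name="us-east-1",
        aws_access_key_id="testing",          # dummy credentials; no real AWS call is made
        aws_secret_access_key="testing",
    )
    stubber = Stubber(client)
    stubber.add_client_error("get_object", service_error_code="NoSuchBucket", http_status_code=404)
    stubber.activate()

    # Assumed: src.s3_utils exposes a get_s3_client factory that can be patched
    monkeypatch.setattr("src.s3_utils.get_s3_client", lambda: client)

    with pytest.raises(ClientError):
        read_csv_from_s3("missing-bucket", "articles.csv")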

Implementation Summary

Changes Made (Priority 3)

  1. Task 3.1: Documented error handling strategy ✅
  2. Task 3.2: Standardized error handling in src/s3_utils.py ✅
    • Added S3 URI to all error messages
  3. Task 3.3: Standardized error handling in src/data_sources/base.py ✅
    • Removed broad exception catching
    • Let ClientError propagate
    • Only catch non-retryable errors
  4. Task 3.4: Added comprehensive error handling tests ✅
    • 11 new tests covering all error scenarios
  5. Task 3.5: Updated documentation with implementation status ✅

Benefits Achieved

  1. Consistent Error Handling: All S3 modules follow the same patterns
  2. Better Debugging: S3 URIs included in all error messages
  3. Proper Retry Behavior: ClientError propagates to Prefect for automatic retry
  4. Test Coverage: Comprehensive tests verify error handling works correctly
  5. Clear Documentation: Strategy document guides future development