# Error Handling Strategy for S3 Operations

## Overview

This document defines the standardized error handling approach for all S3 operations in the blog_data pipeline.

## Principles

- **Fail Fast**: Let errors propagate to Prefect for proper retry handling
- **Informative Logging**: Log errors with context before re-raising
- **Consistent Patterns**: Use the same error handling pattern across all S3 modules
- **Prefect Integration**: Leverage Prefect's built-in retry mechanisms
## Error Categories

### 1. S3 Client Errors (`ClientError`)

**When**: AWS S3 API calls fail (network issues, permissions, bucket not found, etc.)

**Handling**:
- Log the error with full context (bucket, key, operation)
- Include `exc_info=True` for a stack trace
- Re-raise to allow Prefect retry logic

**Example**:
```python
try:
    client.put_object(Bucket=bucket, Key=key, Body=data)
except ClientError as e:
    logger.error("Failed to write to S3 s3://%s/%s: %s", bucket, key, e, exc_info=True)
    raise
```
### 2. Data Validation Errors (`ValueError`)

**When**: Invalid data format, missing required fields, foreign key violations

**Handling**:
- Log the error with specific validation failure details
- Raise `ValueError` with a descriptive message
- Do NOT retry (data issue, not transient)

**Example**:
```python
if invalid_records:
    logger.error("Invalid foreign keys in %s: %s", entity_name, invalid_records)
    raise ValueError(f"Foreign key validation failed for {entity_name}")
```
### 3. Configuration Errors (`ValueError`)

**When**: Missing or invalid configuration (S3 bucket names, Neo4j URI, etc.)

**Handling**:
- Raise `ValueError` immediately with a clear message
- Do NOT retry (configuration issue)

**Example**:
```python
if not config.neo4j_uri:
    raise ValueError("NEO4J_URI is not configured")
```
### 4. File Not Found (404)

**When**: The S3 object doesn't exist

**Handling**:
- For existence checks: return `False` (don't raise)
- For required files: log the error and re-raise
- Use `s3_object_exists()` for explicit checks

**Example**:
```python
try:
    client.head_object(Bucket=bucket, Key=key)
    return True
except ClientError as e:
    if e.response["Error"]["Code"] == "404":
        return False
    raise  # Re-raise other errors
```
## Standard Error Handling Patterns

The patterns below assume `pandas` (as `pd`), `botocore.exceptions.ClientError`, and the project helpers `get_s3_client()` and `_get_logger()` are in scope.

### Pattern 1: S3 Read Operations

```python
def read_operation(bucket: str, key: str) -> pd.DataFrame:
    """Read data from S3."""
    logger = _get_logger()
    try:
        client = get_s3_client()
        response = client.get_object(Bucket=bucket, Key=key)
        df = pd.read_csv(response["Body"])  # ... process data as needed ...
        logger.info("Read %d records from s3://%s/%s", len(df), bucket, key)
        return df
    except ClientError as e:
        logger.error("Failed to read from S3 s3://%s/%s: %s", bucket, key, e, exc_info=True)
        raise
```
### Pattern 2: S3 Write Operations

```python
def write_operation(df: pd.DataFrame, bucket: str, key: str) -> str:
    """Write data to S3."""
    logger = _get_logger()
    try:
        data = df.to_csv(index=False).encode("utf-8")  # ... prepare data ...
        client = get_s3_client()
        client.put_object(Bucket=bucket, Key=key, Body=data)
        s3_uri = f"s3://{bucket}/{key}"
        logger.info("Wrote %d records to %s", len(df), s3_uri)
        return s3_uri
    except ClientError as e:
        logger.error("Failed to write to S3 s3://%s/%s: %s", bucket, key, e, exc_info=True)
        raise
```
### Pattern 3: Domain-Specific Operations

```python
def domain_operation(filename: str, entity_name: Optional[str] = None) -> pd.DataFrame:
    """Domain-specific S3 operation."""
    from src.config import get_config

    config = get_config()
    logger = _get_logger()
    try:
        df = read_csv_from_s3(config.bucket, filename)
        if entity_name:
            logger.info("Loaded %s (%d records)", entity_name, len(df))
        return df
    except ClientError as e:
        if entity_name:
            logger.error("Failed to load %s: %s", entity_name, e, exc_info=True)
        raise
```
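All three patterns call a `_get_logger()` helper. Its actual implementation lives in the pipeline modules; one plausible sketch, assuming it should prefer Prefect's run logger inside a flow or task run and fall back to the standard library otherwise (`get_run_logger()` raises `MissingContextError` outside a run, hence the fallback):

```python
import logging

from prefect import get_run_logger
from prefect.exceptions import MissingContextError

def _get_logger():
    """Return the Prefect run logger when called inside a flow/task run,
    otherwise fall back to a plain module-level logger."""
    try:
        return get_run_logger()
    except MissingContextError:
        return logging.getLogger(__name__)
```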
## Logging Standards

### Log Levels

- **INFO**: Successful operations with record counts
- **WARNING**: Non-fatal issues (duplicates found, missing optional data)
- **ERROR**: Failed operations that will be retried or cause task failure

### Log Message Format

**Success**:
```python
logger.info("Loaded %s from %s bucket (%d records)", entity_name, bucket_type, count)
logger.info("Saved %s to %s bucket (%d records)", entity_name, bucket_type, count)
```

**Errors**:
```python
logger.error("Failed to load %s from %s bucket: %s", entity_name, bucket_type, error, exc_info=True)
logger.error("Failed to save %s to %s bucket: %s", entity_name, bucket_type, error, exc_info=True)
```

**Warnings**:
```python
logger.warning("Found %d duplicate %s values in %s", count, column, entity_name)
logger.warning("No records to save for %s", entity_name)
```
## Prefect Integration

### Task Retries

All S3 tasks should use Prefect's retry mechanism:

```python
from prefect import task

@task(name="task-name", retries=2, retry_delay_seconds=10)
def my_task():
    # S3 operations that may fail transiently
    pass
```

### Error Propagation

- Let `ClientError` propagate to Prefect
- Prefect will retry based on the task configuration
- After the maximum retries, Prefect marks the task as failed

### Non-Retryable Errors

For errors that shouldn't be retried (data validation, configuration):
- Raise `ValueError` instead of `ClientError`
- The task should fail immediately without retries (see the sketch below for one way to enforce this)
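One caveat: by default, Prefect retries a task on any exception, including `ValueError`. If strict fail-fast behavior is required for non-retryable errors, a `retry_condition_fn` (available in Prefect 2.10+) can restrict retries to transient `ClientError` failures. A minimal sketch, not the pipeline's actual configuration:

```python
from botocore.exceptions import ClientError
from prefect import task

def retry_only_transient(task, task_run, state) -> bool:
    """Allow a retry only when the failure was a transient ClientError."""
    try:
        state.result()  # re-raises the task's exception
    except ClientError:
        return True   # transient S3 failure: let Prefect retry
    except Exception:
        return False  # ValueError etc.: fail immediately
    return False      # task succeeded: nothing to retry

@task(
    name="task-name",
    retries=2,
    retry_delay_seconds=10,
    retry_condition_fn=retry_only_transient,
)
def my_task():
    # S3 operations that may fail transiently
    pass
```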
## Implementation Checklist

- All S3 read operations follow Pattern 1
- All S3 write operations follow Pattern 2
- All domain operations follow Pattern 3
- All errors logged with `exc_info=True`
- All successful operations logged with record counts
- All tasks have retry configuration
- `ValueError` used for non-retryable errors
- `ClientError` re-raised for Prefect retry
## Current Status

### ✅ Fully Compliant Modules (Priority 3 Complete)

All S3 modules now follow the standardized error handling strategy:

- `src/s3_utils.py` ✅
  - All functions follow the standard patterns
  - `ClientError` propagates for Prefect retry
  - S3 URIs included in all error messages
  - Comprehensive error handling tests (11 tests)
- `tasks/cleaning/utils.py` ✅
  - Wrapper functions delegate to `s3_utils`
  - Error handling inherited from `s3_utils`
- `tasks/loading/utils.py` ✅
  - Wrapper functions delegate to `s3_utils`
  - Error handling inherited from `s3_utils`
- `src/data_sources/base.py` ✅
  - Updated to let `ClientError` propagate
  - Only catches non-retryable errors (`ValueError`, `KeyError`); see the sketch below
  - Removed broad exception catching (`OSError`, `UnicodeEncodeError`)
  - Added proper docstring documentation of exceptions
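To illustrate the `src/data_sources/base.py` approach, here is a hypothetical sketch (the class, method, and helper names are assumptions, not the module's actual API): non-retryable data errors are caught and surfaced as `ValueError`, while the S3 call itself is left unguarded so `ClientError` propagates to Prefect.

```python
import logging

import boto3
import pandas as pd

def _get_logger() -> logging.Logger:
    return logging.getLogger(__name__)  # stand-in for the project helper

def get_s3_client():
    return boto3.client("s3")  # stand-in for the project helper

class BaseDataSource:
    """Illustrative only; the real base class lives in src/data_sources/base.py."""

    def save(self, df: pd.DataFrame, bucket: str, key: str) -> str:
        logger = _get_logger()
        # Catch only non-retryable data errors and surface them as ValueError.
        try:
            body = df.to_csv(index=False).encode("utf-8")  # serialization/validation step
        except (ValueError, KeyError) as e:
            logger.error("Invalid records for s3://%s/%s: %s", bucket, key, e, exc_info=True)
            raise ValueError(f"Cannot serialize records for {key}") from e
        # Deliberately no try/except around the S3 call:
        # botocore's ClientError propagates so Prefect can retry the task.
        client = get_s3_client()
        client.put_object(Bucket=bucket, Key=key, Body=body)
        return f"s3://{bucket}/{key}"
```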
### ✅ Test Coverage

Comprehensive error handling tests were added to `tests/test_s3_utils.py`:

- **ClientError Propagation Tests** (8 tests)
  - Read from a non-existent bucket
  - Read from a non-existent key
  - Write to a non-existent bucket
  - Domain-specific functions (`read_raw_csv`, `read_clean_csv`, `write_raw_csv`, `write_clean_csv`)
- **ValueError Tests** (2 tests)
  - Invalid S3 URI scheme
  - Missing key in S3 URI
- **Special Case Tests** (1 test)
  - `s3_object_exists` returns `False` for 404 (doesn't raise)
  - `s3_object_exists` raises for other errors

**Total**: 23 tests passing (12 original + 11 error handling). A sketch of the propagation-test style appears below.
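For reference, a propagation test follows roughly this shape. This is a hypothetical sketch using botocore's `Stubber` against a bare client; the actual tests exercise the pipeline's wrapper functions and may use a different fixture:

```python
import boto3
import pytest
from botocore.exceptions import ClientError
from botocore.stub import Stubber

def test_get_object_client_error_propagates():
    client = boto3.client(
        "s3",
        region_name="us-east-1",
        aws_access_key_id="testing",      # fake credentials so signing succeeds;
        aws_secret_access_key="testing",  # Stubber never sends a real request
    )
    with Stubber(client) as stubber:
        # Simulate S3 returning NoSuchKey for a missing object.
        stubber.add_client_error(
            "get_object",
            service_error_code="NoSuchKey",
            http_status_code=404,
            expected_params={"Bucket": "clean-bucket", "Key": "missing.csv"},
        )
        # The error must propagate (not be swallowed) so Prefect can retry.
        with pytest.raises(ClientError):
            client.get_object(Bucket="clean-bucket", Key="missing.csv")
```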
## Implementation Summary

### Changes Made (Priority 3)

- **Task 3.1**: Documented the error handling strategy ✅
- **Task 3.2**: Standardized error handling in `src/s3_utils.py` ✅
  - Added S3 URIs to all error messages
- **Task 3.3**: Standardized error handling in `src/data_sources/base.py` ✅
  - Removed broad exception catching
  - Let `ClientError` propagate
  - Only catch non-retryable errors
- **Task 3.4**: Added comprehensive error handling tests ✅
  - 11 new tests covering all error scenarios
- **Task 3.5**: Updated documentation with implementation status ✅
### Benefits Achieved

- **Consistent Error Handling**: All S3 modules follow the same patterns
- **Better Debugging**: S3 URIs included in all error messages
- **Proper Retry Behavior**: `ClientError` propagates to Prefect for automatic retry
- **Test Coverage**: Comprehensive tests verify error handling works correctly
- **Clear Documentation**: The strategy document guides future development