Async Task Processing with Celery

🥬 Celery 💾 Redis 🐍 Python ⚡ FastAPI

Implementing robust background job processing for an order management system handling 5,000+ daily transactions with guaranteed delivery, automatic retries, and real-time monitoring.

When Request Handlers Do Too Much

The client platform's order API was doing too much work synchronously. Creating an order triggered SAP ERP sync, inventory reservation, notification emails, PDF invoice generation, analytics tracking, and audit logging, all within the HTTP request.

Response times ranged from 8 to 15 seconds per order. Under load, timeouts increased, and if any downstream service failed, the entire order creation failed. Users were abandoning orders because of the perceived slowness, and the operations team was overwhelmed with support tickets.

Synchronous Processing Problem

Request (0 ms) → SAP Sync (2,000 ms) → Inventory (1,500 ms) → Emails (3,000 ms) → PDF Gen (2,500 ms) → Analytics (500 ms) → Audit (500 ms) → Response (10,000 ms)
⚠️ User blocked for 10+ seconds waiting for a response

Async-First Design with Celery

The solution was to acknowledge orders immediately and process everything else asynchronously. The API validates the order, persists it with "pending" status, and returns within 200ms. Background workers handle all downstream processing.

Async Processing Architecture

Fast path (~200 ms): API Request (FastAPI) → Validate (50 ms) → Save Order (100 ms) → Queue Tasks (50 ms) → Response ✓ (200 ms)
Task queue: Redis (broker) holding process_order, sync_sap, send_email, ...
Celery worker pool: SAP worker (prefetch 2), Email worker (prefetch 10), PDF worker (prefetch 4), Analytics (prefetch 20), General (prefetch 8)
Result backend: Redis (task status)
Stack: Celery 5.x, Redis Cluster, FastAPI, Python 3.11, Flower, Celery Beat

Key Implementation Details

Detail 01

Dedicated Queues & Workers

Different task types have different characteristics: SAP sync is slow but critical, emails are fast but high-volume, PDF generation is CPU-intensive. We created dedicated queues and worker pools with tuned concurrency and prefetch settings for each type.
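
As a rough sketch of what this per-queue tuning could look like, the snippet below pairs a `task_routes` mapping with a helper that builds one `celery worker` command per queue. The `sap` queue name and the prefetch figures come from the architecture above (read here as prefetch multipliers, which is an assumption); the other queue names, the `orders` app name, and every concurrency value are illustrative only.

```python
# Sketch of per-queue routing and worker tuning (celeryconfig.py style).
# Queue names other than "sap" and all concurrency values are assumptions.

task_routes = {
    "tasks.order_tasks.sync_order_to_sap": {"queue": "sap"},
    "tasks.order_tasks.send_confirmation_email": {"queue": "email"},
    "tasks.order_tasks.generate_invoice_pdf": {"queue": "pdf"},
    "tasks.order_tasks.update_analytics": {"queue": "analytics"},
}
task_default_queue = "general"  # anything without a route lands here

# concurrency = worker processes; prefetch = messages reserved per process
WORKER_PROFILES = {
    "sap": {"concurrency": 2, "prefetch": 2},
    "email": {"concurrency": 8, "prefetch": 10},
    "pdf": {"concurrency": 4, "prefetch": 4},
    "analytics": {"concurrency": 4, "prefetch": 20},
    "general": {"concurrency": 4, "prefetch": 8},
}


def worker_command(app: str, queue: str) -> list[str]:
    """Build the `celery worker` invocation for one dedicated queue."""
    profile = WORKER_PROFILES[queue]
    return [
        "celery", "-A", app, "worker",
        "-Q", queue,
        f"--concurrency={profile['concurrency']}",
        f"--prefetch-multiplier={profile['prefetch']}",
        "-n", f"{queue}@%h",
    ]


if __name__ == "__main__":
    for name in WORKER_PROFILES:
        print(" ".join(worker_command("orders", name)))
```

A low prefetch on the slow SAP queue keeps long-running tasks from piling up behind one busy process, while a high prefetch on short, high-volume queues cuts round trips to the broker.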
Detail 02

Automatic Retry with Backoff

External service calls fail intermittently. Tasks retry automatically with exponential backoff: 10s, 30s, 90s, 270s. After the maximum number of retries, tasks move to a dead letter queue (DLQ) for manual review. Critical tasks (like SAP sync) alert the on-call engineer when they reach the DLQ.
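
The 10s, 30s, 90s, 270s schedule triples the delay on each attempt. Celery's built-in `retry_backoff` option doubles it instead, so one way to get a tripling schedule is to compute the countdown explicitly and pass it to `self.retry()`. The helper below is a minimal sketch under that assumption; the function names, the cap of 600 seconds, and the limit of four retries are hypothetical choices, not the project's actual settings.

```python
def backoff_delay(retries: int, base: int = 10, factor: int = 3, cap: int = 600) -> int:
    """Seconds to wait before the next attempt, given retries already made."""
    return min(base * factor ** retries, cap)


def should_dead_letter(retries: int, max_retries: int = 4) -> bool:
    """True once every scheduled retry has been used (hypothetical DLQ rule)."""
    return retries >= max_retries


if __name__ == "__main__":
    # Delays before retry 1, 2, 3, and 4
    print([backoff_delay(n) for n in range(4)])

# Inside a bound Celery task, the delay would feed self.retry():
#     raise self.retry(exc=exc, countdown=backoff_delay(self.request.retries))
```

With the defaults, the first four delays are 10, 30, 90, and 270 seconds, and any later delay is capped at 600.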
Detail 03

Task Chaining & Workflows

Complex order processing uses Celery chains and chords. The order workflow: validate → reserve_inventory → sync_to_sap → (send_email | generate_pdf | update_analytics). Parallel steps run concurrently; sequential steps respect dependencies.
Detail 04

Idempotent Task Design

Tasks may execute multiple times due to retries, worker restarts, or network issues. Every task is idempotent: calling it twice with the same arguments produces the same result. We use database upserts and idempotency keys for external API calls.
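
To make the two techniques concrete, here is a small self-contained sketch: a deterministic idempotency key derived from the task name and order ID, and an upsert that leaves exactly one row no matter how many times the task runs. It uses an in-memory SQLite database purely for illustration; the `sap_sync` table, the column names, and both helper functions are assumptions rather than the project's real schema.

```python
import hashlib
import sqlite3


def idempotency_key(task_name: str, order_id: str) -> str:
    """Deterministic key: the same task and order always map to the same value."""
    return hashlib.sha256(f"{task_name}:{order_id}".encode()).hexdigest()


def record_sap_sync(conn: sqlite3.Connection, order_id: str, sap_order_number: str) -> None:
    """Upsert the sync result so a repeated call leaves exactly one row."""
    conn.execute(
        """
        INSERT INTO sap_sync (order_id, sap_order_number)
        VALUES (?, ?)
        ON CONFLICT(order_id) DO UPDATE
        SET sap_order_number = excluded.sap_order_number
        """,
        (order_id, sap_order_number),
    )
    conn.commit()


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE sap_sync (order_id TEXT PRIMARY KEY, sap_order_number TEXT)"
)

# Simulate the same task executing twice for the same order.
record_sap_sync(conn, "ord-1", "SAP-1001")
record_sap_sync(conn, "ord-1", "SAP-1001")
row_count = conn.execute("SELECT COUNT(*) FROM sap_sync").fetchone()[0]
```

The key would typically be sent along with the external API call so the remote side can discard duplicates, while the upsert protects the local database.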
Detail 05

Real-Time Monitoring

Flower provides real-time visibility into worker status, queue depths, and task execution. Custom Prometheus metrics track task success rates, execution times, and queue latency. Alerts fire when queue depth grows or error rates spike.
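
One simple way to watch queue depth with a Redis broker is to read the length of the Redis list behind each queue. The sketch below assumes default-priority queues stored under the queue name and a client exposing `llen(name)`, as redis-py does; the threshold values and the `FakeRedis` stand-in are hypothetical and exist only so the example runs without a broker.

```python
from typing import Mapping, Protocol


class ListLengthClient(Protocol):
    """Anything with a Redis-style llen(), for example redis.Redis."""

    def llen(self, name: str) -> int: ...


def queue_depths(client: ListLengthClient, queues: list[str]) -> dict[str, int]:
    """Return the number of waiting messages per queue."""
    return {queue: client.llen(queue) for queue in queues}


def queues_over_threshold(
    depths: Mapping[str, int], thresholds: Mapping[str, int]
) -> list[str]:
    """Names of queues whose depth exceeds their alert threshold."""
    return sorted(q for q, depth in depths.items() if depth > thresholds[q])


class FakeRedis:
    """In-memory stand-in used only for this example."""

    def __init__(self, lengths: dict[str, int]) -> None:
        self._lengths = lengths

    def llen(self, name: str) -> int:
        return self._lengths.get(name, 0)


client = FakeRedis({"sap": 120, "email": 5})
depths = queue_depths(client, ["sap", "email", "pdf"])
alerts = queues_over_threshold(depths, {"sap": 50, "email": 500, "pdf": 100})
```

In practice the depths would be exported as gauges and the threshold logic would live in the alerting rules; the point here is only that queue depth is cheap to measure.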
tasks/order_tasks.py
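from datetime import datetime

import structlog
from celery import shared_task

# Project-local dependencies. These module paths are assumptions; point them
# at wherever the SAP client, repository, and alerting helper actually live.
from services.sap import SAPConnectionError, SAPTimeoutError, sap_service
from repositories.orders import order_repository
from alerts import alert_ops

logger = structlog.get_logger()

@shared_task(
    bind=True,
    queue="sap",
    max_retries=5,
    autoretry_for=(SAPConnectionError, SAPTimeoutError),
    retry_backoff=True,      # exponential backoff between retries
    retry_backoff_max=600,   # cap any single delay at 10 minutes
    retry_jitter=True,       # randomize delays so retries do not stampede
)
def sync_order_to_sap(self, order_id: str):
    """Sync order to SAP ERP with automatic retry and idempotency."""
    log = logger.bind(order_id=order_id, task_id=self.request.id)

    try:
        # Idempotency check: a retried or redelivered task must not create
        # a second SAP order.
        if sap_service.order_exists(order_id):
            log.info("Order already synced to SAP, skipping")
            return {"status": "already_synced"}

        order = order_repository.get(order_id)
        result = sap_service.create_order(order)

        # Record the SAP reference on the local order
        order.sap_order_number = result.order_number
        order.sap_synced_at = datetime.utcnow()
        order_repository.save(order)

        log.info("Order synced to SAP", sap_order=result.order_number)
        return {"status": "synced", "sap_order": result.order_number}

    except (SAPConnectionError, SAPTimeoutError):
        # autoretry_for retries from outside this function body and re-raises
        # the original exception once retries run out, so
        # MaxRetriesExceededError never surfaces here. Detect the final
        # attempt explicitly instead.
        if self.request.retries >= self.max_retries:
            log.error("Max retries exceeded for SAP sync")
            alert_ops("SAP sync failed permanently", order_id=order_id)
        raise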
tasks/workflows.py
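from celery import chain, chord, group
from fastapi import APIRouter

# finalize_order is assumed to live alongside the other order tasks.
from tasks.order_tasks import (
    validate_order, reserve_inventory, sync_order_to_sap,
    send_confirmation_email, generate_invoice_pdf, update_analytics,
    finalize_order,
)
# Assumed project-local modules for the request schema and persistence layer.
from schemas.orders import OrderCreate
from services.orders import order_service

router = APIRouter()


def process_order_workflow(order_id: str):
    """
    Order processing workflow using Celery primitives.

    Flow:
    1. Validate order (must complete first)
    2. Reserve inventory (depends on validation)
    3. Sync to SAP (depends on inventory)
    4. Parallel: email + PDF + analytics (after SAP sync)
    """
    workflow = chain(
        # Sequential dependencies. Immutable signatures (.si) hand order_id
        # to every step instead of the previous task's return value, which
        # assumes each task takes order_id as its only argument.
        validate_order.si(order_id),
        reserve_inventory.si(order_id),
        sync_order_to_sap.si(order_id),

        # Parallel processing after sync
        chord(
            group(
                send_confirmation_email.si(order_id),
                generate_invoice_pdf.si(order_id),
                update_analytics.si(order_id),
            ),
            # The callback receives the list of group results, then order_id
            finalize_order.s(order_id),
        ),
    )

    # Execute workflow asynchronously
    return workflow.apply_async()


# API endpoint stays fast
@router.post("/orders")
async def create_order(order: OrderCreate):
    # Quick validation and persist
    order_record = await order_service.create(order)

    # Queue background processing. process_order_workflow is a plain
    # function that enqueues the chain, so it is called directly rather
    # than through .delay().
    process_order_workflow(order_record.id)

    # Return immediately (~200ms)
    return {"order_id": order_record.id, "status": "processing"}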

Performance & Reliability Improvements

The async architecture transformed the order experience. Users get instant feedback, system reliability improved dramatically, and the operations team can finally focus on strategic work instead of firefighting.

Headline results: API response time ↓ 98%, daily support tickets ↓ 96%, zero order timeouts.

Metric              | Synchronous   | Async (Celery)   | Impact
Order API Response  | 12,000 ms avg | 200 ms avg       | 60x faster
Timeout Rate        | 23%           | 0%               | Eliminated
Cart Abandonment    | 15%           | 3%               | 80% reduction
Failed Transactions | 8% permanent  | 0.1% after retry | 80x improvement
Throughput Capacity | 50 orders/min | 500+ orders/min  | 10x capacity

Lessons Learned

  • Acknowledge Fast, Process Later — Users don't need to wait for background processing. Return immediately with a status, then process asynchronously. The UX improvement is dramatic.
  • Design for Failure — External calls fail. Local services crash. Network partitions happen. Build retry logic, dead letter queues, and alerting into every task.
  • Monitor Queue Health — Queue depth, processing time, and worker utilization tell you when to scale. Set up alerts before problems become outages.
  • Tune for Task Characteristics — CPU-bound tasks (PDF generation) need different concurrency than I/O-bound tasks (API calls). One size does not fit all.
  • Idempotency Is Non-Negotiable — Tasks will be retried. Workers will crash mid-execution. Design every task to be safely re-executable.
