Request Lifecycle Management

Overview

A prediction or inference request touches different components on each platform. The diagrams below show the (simplified) full path from client to inference server and back.

Replicate

```mermaid
graph TD
    Client([Client])
    Web[web]

    subgraph GPU cluster
        API[api]
        Redis[(redis)]
        Director[director]
        Cog[cog]
    end

    Client -->|HTTP| API
    Web -.->|playground| API
    API -->|LPUSH / XADD| Redis
    Redis -->|XREADGROUP| Director
    Director -->|HTTP| Cog
    Cog -->|response| Director
    Director -->|webhook| API
    API -->|poll / webhook| Client
```

Key points: the request is asynchronous by default. The client submits a prediction, the API enqueues it, and Director picks it up later. The client polls, receives a webhook when the result is ready, or opts into “blocking” mode at the per-request level. Streaming predictions respond over SSE but still enter through the same queue.
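
The client-side polling half of this flow can be sketched as follows. This is an illustrative pattern, not Replicate's SDK: `get_status` stands in for whatever call fetches the prediction's current status (e.g. a GET on the prediction URL), and the state names mirror the terminal states mentioned later in this document.

```python
import time

# Terminal states after which polling should stop (per the Replicate flow above).
TERMINAL_STATES = {"succeeded", "failed", "canceled"}

def poll_until_terminal(get_status, interval=1.0, timeout=300.0):
    """Poll a prediction until it reaches a terminal state or times out.

    get_status is a zero-argument callable returning the current status
    string; in a real client it would issue an HTTP GET.
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        status = get_status()
        if status in TERMINAL_STATES:
            return status
        time.sleep(interval)
    raise TimeoutError("prediction did not reach a terminal state in time")
```

Webhook and blocking modes replace this loop entirely: the server pushes the terminal state, or holds the original connection open.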

Workers AI

```mermaid
graph TD
    Client([Client])
    WCE[worker-constellation-entry]
    CE[constellation-entry]
    CS[constellation-server]

    subgraph GPU cluster
        GPU[GPU container]
    end

    Client -->|HTTP| WCE
    WCE -->|HTTP| CE
    CE -->|pipefitter / HTTP| CS
    CS -->|HTTP| GPU
    GPU -->|response| CS
    CS -->|response| CE
    CE -->|response| WCE
    WCE -->|response| Client
```

Key points: the request is synchronous. The client holds an open HTTP connection while the request flows through the stack to a GPU container and back. There is no persistent queue — requests wait in bounded in-memory queues or get rejected with a 429 if no capacity is available.


Replicate Request Processing

The Replicate request path has three major phases: API-side routing and enqueue, transient queuing in Redis, and Director-side dequeue and execution. Durable state lives in PostgreSQL (via web), not in the queue. The sections below cover each phase in detail.

Behavior varies by workload type (see Workload Types appendix). All workload types share the same core queue and worker infrastructure but differ in configuration.

Queue and Message Handling

Each GPU provider’s managed Kubernetes cluster runs its own Redis cluster (with sentinel for HA). Director uses Redis Streams with consumer groups for request queuing. Each deployable has its own stream — the API pushes prediction requests to the stream, and Director pods consume from it. Queue names combine a workload-type prefix (input:prediction:dp-, input:prediction:fp-, etc.) with the deployable’s key.

Queues use shuffle sharding by default (since October 2024) — each tenant is assigned a subset of stream shards, providing fairness by preventing one account’s batch from blocking others. Hotswap predictions enable PreferSameStream for weight cache locality.
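
The shuffle-sharding idea can be sketched as a deterministic hash from tenant to a small, fixed subset of shards. The shard counts below are illustrative knobs, not Director's actual configuration, and the function name is hypothetical.

```python
import hashlib

def tenant_shards(tenant_id: str, num_shards: int = 8, shards_per_tenant: int = 2):
    """Deterministically assign a tenant to a small subset of stream shards.

    Because each tenant sees only shards_per_tenant of num_shards shards,
    one tenant's burst can saturate at most its own subset, leaving most
    shards unaffected for other tenants.
    """
    digest = hashlib.sha256(tenant_id.encode()).digest()
    chosen = []
    # Walk the digest bytes, collecting distinct shard indices.
    for byte in digest:
        shard = byte % num_shards
        if shard not in chosen:
            chosen.append(shard)
        if len(chosen) == shards_per_tenant:
            break
    return chosen
```

The same tenant always maps to the same shards, so consumers need no coordination to honor the assignment.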

See Queue Management for full details on queue internals: MessageQueue fetcher/claimer goroutines, sharded queues, blue/green migration via MultiMessageQueue, and stuck message cleanup.

Worker Loop

Each Director instance runs worker goroutines that poll the queue. The number of workers is controlled by DIRECTOR_CONCURRENT_PREDICTIONS (default: 1); the cluster service sets it when the model's Cog config specifies concurrency.max > 1 (cluster/pkg/kubernetes/deployable.go:1149-1153). Most models run with the default single worker.

The worker loop (director/worker.go:134) continuously:

  1. Polls Redis Stream for messages using consumer groups
  2. Checks if prediction execution is allowed (spending validation, principal checks)
  3. Calculates effective deadline from prediction metadata, deployment lifetime, and timeout config
  4. Creates a Tracker instance for state management
  5. Executes prediction via Executor interface (which calls into Cog)
  6. Handles terminal states (success, failure, cancellation)
  7. Acknowledges message in Redis
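
The shape of this loop can be sketched as below. This is a compressed illustration, not Director's Go code: `queue`, `executor`, and `send_webhook` stand in for the Redis Streams consumer, the Cog executor, and the webhook sender, and steps 2-4 (permission checks, deadline calculation, Tracker creation) are elided to comments.

```python
def worker_loop(queue, executor, send_webhook, max_iterations=None):
    """One worker iteration cycle: dequeue, execute, report, acknowledge."""
    done = 0
    while max_iterations is None or done < max_iterations:
        msg = queue.fetch()  # 1. XREADGROUP on the stream (blocking poll)
        if msg is None:
            continue
        # 2-4. (elided) validate spending/principal, compute the effective
        # deadline, and create a Tracker for state management.
        try:
            result = executor.run(msg)                  # 5. call into Cog
            send_webhook(msg["id"], result["status"])   # 6. terminal state
        finally:
            queue.ack(msg)  # 7. XACK so the message is not redelivered
        done += 1
```

Acknowledging in a `finally` block mirrors the requirement that a fetched message must eventually be acked or reclaimed, even if execution fails.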

The worker loop behavior is the same across all workload types. What differs is the job kind passed to the executor:

  • Predictions (deployment, version): DIRECTOR_JOB_KIND=prediction (default)
  • Trainings: DIRECTOR_JOB_KIND=training
  • Procedures (function predictions): DIRECTOR_JOB_KIND=procedure

Note: Internally, Director converts procedure to prediction when sending to Cog because the API doesn’t yet support the procedure value (director/worker.go:731-737).


Tracker and State Transitions

The Tracker (director/tracker.go) manages state transitions and notifications. It defines four webhook event types:

  • start - Prediction begins execution
  • output - Intermediate outputs during execution
  • logs - Log messages during execution
  • completed - Prediction reaches terminal state

The Tracker maintains:

  • Prediction state (struct at director/tracker.go:36-69)
  • Subscriber functions that receive state updates
  • Pending webhook events (if notifications are suppressed temporarily)
  • Upload tracking for async file completions
  • Moderation state (if content moderation is enabled)

Webhooks are sent to the Replicate API at state transitions. The webhook sender uses a PreemptingExecutor (director/executor.go) that may drop intermediate webhooks if new updates arrive before the previous one is sent; this is intentional, since the latest state supersedes earlier ones. Terminal webhooks are always delivered.
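
The preemption idea reduces to "keep at most one pending payload; a newer one replaces it." The sketch below illustrates that invariant; the class and method names are hypothetical and do not match Director's actual PreemptingExecutor API.

```python
import threading

class PreemptingSender:
    """Hold at most one undelivered payload; newer submissions overwrite it."""

    def __init__(self, deliver):
        self._deliver = deliver  # callable that actually sends the webhook
        self._pending = None
        self._lock = threading.Lock()

    def submit(self, payload):
        # Overwrite any undelivered payload: intermediate states are
        # superseded by the latest one, so dropping them is safe.
        with self._lock:
            self._pending = payload

    def flush(self):
        # In a real sender this would run on a background goroutine/thread
        # whenever the previous delivery completes.
        with self._lock:
            payload, self._pending = self._pending, None
        if payload is not None:
            self._deliver(payload)
```

Terminal states would bypass this coalescing (or be flushed unconditionally), matching the guarantee that terminal webhooks are always delivered.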


Streaming

Director supports Server-Sent Events (SSE) for streaming outputs. Streaming is enabled when the API includes a stream_publish_url in the prediction’s internal metadata (cog/types.go:230). The StreamClient (director/streaming.go) publishes output events to this URL as they occur.

Concurrency Control

Director supports concurrent predictions within a single container via DIRECTOR_CONCURRENT_PREDICTIONS. See Concurrency Deep Dive for details on configuration and usage patterns.

Workers AI Request Processing

Routing Architecture

Workers AI requests flow through multiple components:

  1. worker-constellation-entry: Pipeline Worker (TypeScript) that receives API requests from SDK or REST API. Handles input validation, model lookup, and request formatting before forwarding to constellation-entry (apps/worker-constellation-entry/src/worker.ts).

  2. constellation-entry: Rust service running on every metal. Fetches model config from QS, selects target colos using routing algorithms, resolves constellation-server addresses, and forwards via pipefitter or HTTP (gitlab.cfdata.org/cloudflare/ai/constellation-entry).

  3. constellation-server: Colo-level load balancer (Rust). Manages permits and request queues per model, selects GPU container via service discovery, handles inference forwarding and cross-colo failover (gitlab.cfdata.org/cloudflare/ai/constellation-server).

  4. GPU container: Inference server running the model.


constellation-entry Routing

constellation-entry fetches model configuration from Quicksilver (distributed KV store), which includes the list of colos where the model is deployed (src/model_config.rs). It then applies a routing algorithm to select target colos. Routing algorithms include:

  • Random: Random selection among available colos (default)
  • LatencySensitive: Prefer nearby colos, with step-based selection
  • ForcedColo: Override to a specific colo (for debugging/testing)
  • ForcedResourceFlow: Use resource-flow routing (src/resource_flow.rs)

Routing uses a step-based algorithm (StepBasedRouter) that generates a prioritized list of colos. The algorithm can be customized per-model via routing_params in the model config.
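
One way to read "step-based" is: rank colos by proximity, then shuffle within fixed-size bands so that load spreads across similarly close colos while nearer bands still come first. The sketch below illustrates that interpretation; `step_size` is an illustrative knob, not the real routing_params field name.

```python
import random

def step_based_colo_order(colos_by_latency, step_size=3, seed=None):
    """Produce a prioritized colo list from a latency-sorted input.

    colos_by_latency must already be sorted nearest-first. Colos are
    grouped into bands ("steps") of step_size; order is randomized within
    each band but bands are kept in latency order.
    """
    rng = random.Random(seed)
    ordered = []
    for i in range(0, len(colos_by_latency), step_size):
        step = list(colos_by_latency[i:i + step_size])
        rng.shuffle(step)  # spread load across equally-good candidates
        ordered.extend(step)
    return ordered
```

A purely random algorithm is the degenerate case with a single step spanning all colos; ForcedColo is the other extreme, a one-element list.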

Once colos are selected, constellation-entry resolves constellation-server addresses via QS or DNS (src/cs_resolver.rs) and forwards the request via pipefitter or HTTP (src/pipefitter_transport.rs).


constellation-server Load Balancing

constellation-server runs in each colo and handles:

Request handling (src/main.rs, server/src/lib.rs):

  • Listens on Unix domain socket (for pipefitter) and TCP port (src/cli.rs:23,31)
  • Extracts routing metadata: model ID, account ID, fallback routing info
  • Tracks distributed tracing spans
  • Emits metrics to Ready Analytics

Endpoint load balancing and concurrency (server/src/endpoint_lb.rs):

Service discovery (service-discovery/src/lib.rs):

Inference forwarding (server/src/inference/mod.rs):

  • Regular HTTP requests: Buffers and forwards to GPU container with retry logic
  • WebSocket requests: Upgrades connection and relays bidirectionally (no retries)
  • Backend-specific handlers for Triton, TGI, TEI, and generic HTTP (PipeHttp — used for vLLM and others) in server/src/inference/

Failover and forwarding (server/src/colo_forwarding.rs):

  • If target returns error or is unavailable, may forward to different colo
  • Tracks inflight requests via headers (cf-ai-requests-inflight) to coordinate across instances
  • For pipefitter: Returns 500 with headers telling pipefitter to retry different colos
  • Handles graceful draining: continues accepting requests while failing health checks


Queuing and Backpressure

When GPU containers are busy, backpressure propagates through the stack:

  1. constellation-server: find_upstream() attempts to acquire a permit for a GPU endpoint. If all permits are taken, the request enters a bounded local colo queue (ModelRequestQueue). The queue is a future that resolves when a permit frees up (via tokio::sync::Notify). If the queue is also full, the request gets NoSlotsAvailableServerError::OutOfCapacity. OOC may also trigger forwarding to another constellation-server in the same colo (server/src/endpoint_lb/endpoint_group.rs:200-277).

  2. constellation-entry: Receives OOC from constellation-server (error codes 3033, 3040, 3047, 3048). Has a retry loop with separate budgets for errors (up to 3) and OOC (varies by request size). For small requests, pipefitter handles cross-colo forwarding. For large requests (>256 KiB), constellation-entry retries across target colos itself. All OOC variants are mapped to generic 3040 before returning to the client (proxy_server.rs:1509-1600, errors.rs:62-67).

  3. worker-constellation-entry: Has an optional retry loop on 429/424 responses, but outOfCapacityRetries defaults to 0 (configurable per-model via sdkConfig). By default, OOC goes straight to the client (ai/session.ts:101,137-141).

  4. Client: Receives HTTP 429 with "Capacity temporarily exceeded, please try again." (internal code 3040).

There is no persistent queue in the Workers AI stack. Backpressure is handled through permit-based admission control, bounded in-memory queues, cross-colo retry, and ultimately rejection to the client. This contrasts with Replicate’s Redis Streams-based queue, where requests persist until a Director instance dequeues them.
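
The first hop of this chain (permits plus a bounded local queue, rejection when both are exhausted) can be sketched with a semaphore. This is an illustrative model, not constellation-server's Rust implementation; the class name, limits, and error are all hypothetical stand-ins.

```python
import asyncio

class ModelPermits:
    """Permit-based admission with a bounded wait queue per model."""

    def __init__(self, permits: int, max_queued: int):
        self._sem = asyncio.Semaphore(permits)
        self._max_queued = max_queued
        self._queued = 0  # requests currently waiting for a permit

    async def run(self, infer):
        # No free permit and the local queue is full: reject immediately.
        # In the real stack this surfaces as OOC / HTTP 429 (code 3040).
        if self._sem.locked() and self._queued >= self._max_queued:
            raise RuntimeError("out of capacity")
        self._queued += 1
        try:
            await self._sem.acquire()  # resolves when a permit frees up
        finally:
            self._queued -= 1
        try:
            return await infer()
        finally:
            self._sem.release()
```

The real queue is a future woken via tokio::sync::Notify rather than a counting semaphore, but the admission decision (permit, else bounded wait, else reject) is the same shape.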

State and Observability

Workers AI observability has two main channels from constellation-server:

Ready Analytics (ready-analytics/src/): Per-inference events sent via logfwdr (Unix socket). Each InferenceEvent includes:

  • Request timing: request_time_ms, inference_time_ms, queuing_time_ms, time_to_first_token
  • Billing: neurons, cost metric values
  • Routing: colo_id, source_colo_id, colo_path, upstream_address
  • Queue state: local_queue_size
  • Flags: streamed, beta, external endpoint, queued in colo

See ready-analytics/src/inference_event.rs for full field list.

Prometheus metrics (metrics/src/lib.rs): Local metrics scraped by monitoring. Includes:

  • requests (by status), errors (by internal/HTTP code)
  • total_permits, mean_used_permits, mean_queue_size, full_fraction (per model)
  • infer_per_backend, infer_per_health_state
  • forward_failures (per target colo)
  • backoff_removed_endpoints (endpoints in backoff state)

Note: “Workers Analytics” in the public docs refers to analytics for Workers code execution, not GPU inference. GPU inference observability flows through Ready Analytics.

Streaming

constellation-server supports two streaming mechanisms:

HTTP streaming (server/src/inference/pipe_http.rs): For PipeHttp backends, the response body can be InferenceBody::Streaming (line 222-230), which passes the Incoming body through without buffering via chunked transfer encoding. The permit is held until the connection completes (line 396-397 for HTTP/1, line 472 for HTTP/2). If the backend sends SSE-formatted data (e.g., TGI’s streaming responses), constellation-server passes it through transparently without parsing.

WebSocket (server/src/inference/mod.rs:180-400): Bidirectional WebSocket connections are handled by handle_ws_inference_request. After the HTTP upgrade, handle_websocket_request relays messages between client and inference server. A max connection duration is enforced (max_connection_duration_s, line 332-338).

Loopback SSE (server/src/loopback/): Separate from client-facing streaming. constellation-server maintains SSE connections to inference servers that support async batch processing. These connections receive completion events (InferenceSuccess, InferenceError, WebsocketFinished, GpuFree) which trigger permit release and async queue acknowledgment. See loopback/sse.rs for the SSE client implementation.

Concurrency Control

constellation-server uses permit-based concurrency control per model via EndpointLoadBalancer (server/src/endpoint_lb.rs). Concurrency limits are configured in Config API. When local queuing is enabled, requests wait in an explicit ModelRequestQueue (FIFO or Fair algorithm) until a permit becomes available. Different models can have different concurrency limits on the same server.
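
The Fair algorithm's per-account round-robin can be sketched as below; the class is an illustration of the idea, and the real ModelRequestQueue differs in detail (it also interacts with permits and rate limits).

```python
from collections import OrderedDict, deque

class FairQueue:
    """Dequeue one request per account in turn, so no account starves."""

    def __init__(self):
        # account -> FIFO of that account's pending requests,
        # in round-robin visit order.
        self._per_account = OrderedDict()

    def push(self, account, request):
        self._per_account.setdefault(account, deque()).append(request)

    def pop(self):
        if not self._per_account:
            return None
        account, q = next(iter(self._per_account.items()))
        request = q.popleft()
        # Rotate the account to the back (or drop it if drained) so the
        # next pop serves a different account.
        del self._per_account[account]
        if q:
            self._per_account[account] = q
        return request
```

A plain FIFO, by contrast, would drain one account's burst before touching anyone else's requests.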


Key Differences

| Aspect | Replicate | Workers AI |
| --- | --- | --- |
| Request model | Asynchronous by default (poll/webhook); sync opt-in via Prefer: wait | Synchronous by default; async opt-in via queueRequest |
| Queue persistence | Redis Streams — requests survive component restarts | No persistent queue — bounded in-memory queues, rejection on overflow |
| Backpressure | Queue absorbs bursts; Director dequeues when ready | Permit-based admission → local queue → cross-colo retry → 429 to client |
| State management | Director Tracker with webhook notifications (start, output, logs, completed) | Stateless request-response; observability via Ready Analytics events |
| Streaming | SSE via stream_publish_url; Director publishes output events | HTTP chunked transfer passthrough; WebSocket relay; loopback SSE for async batch |
| Routing | API routes to correct GPU cluster's Redis; Director pulls from local queue | constellation-entry selects colos via routing algorithms; constellation-server selects GPU endpoint |
| Concurrency | DIRECTOR_CONCURRENT_PREDICTIONS per container (default 1) | Permit-based per model in constellation-server; FIFO or Fair queue algorithm |
| Fairness | Per-tenant shuffle sharding at queue level | Per-account round-robin in fair queue; per-account rate limiting |
| Transport | HTTP between Director and Cog | Pipefitter (small requests) or HTTP (>256 KiB) between constellation-entry and constellation-server |
| Failure handling | Webhook terminal states (failed/canceled); message ACK in Redis | Error codes propagated through stack; OOC triggers cross-colo retry |