Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

Queue Management

Overview

Both platforms queue inference requests when backend capacity is saturated, but the queue architectures are fundamentally different. Replicate uses Redis Streams as a durable, distributed queue between the API layer and Director workers — requests persist across process restarts and can be claimed by any Director instance. Workers AI uses bounded in-memory queues local to each constellation-server instance — requests exist only in the process that received them and are lost if it restarts.


Replicate: Redis Streams with Consumer Groups

Queue Naming

Every deployable gets its own Redis Stream. The queue name is derived from the deployable’s key prefix and kind (deployable_config.py:565-589):

DeployableKindKey PrefixQueue Name
deployment-predictiondp-input:prediction:dp-{hex}
function-predictionfp-input:prediction:fp-{hex}
version-predictionvp-input:prediction:vp-{hex}
version-trainingvt-input:training:vt-{hex}

Function and version predictions that use the unified pipeline cluster config share a single queue: input:prediction:fp-{0*32} (deployable_config.py:44-45).

MessageQueue

Director’s MessageQueue (director/redis/queue.go) wraps a Redis Streams consumer group. Each Director pod is a consumer in the "director" consumer group. The queue runs two goroutines via errgroup:

  • Fetcher: Waits on a request channel, calls XREAD (via the QueueClient.Read interface) with the configured block duration, and sends the result back on a response channel. The fetcher is the only goroutine that reads from Redis — GetMessage sends a request struct and waits for the response (queue.go:131-148).

  • Claimer: Runs every 2 seconds (claimInterval), calling XCLAIM on all unacked messages to maintain ownership. This prevents Redis from reassigning messages to other consumers in the group while the current Director is still processing them. After the fetcher stops (shutdown requested), the claimer continues until all messages are acked or the context is canceled (queue.go:150-191).

Messages are tracked in an unackedMessages map keyed by unique ID (stream name + message ID). When a Director worker finishes processing a prediction, it calls Ack on the message, which removes it from the map. If a Director pod crashes, its unacked messages remain in the consumer group’s pending entries list (PEL). The API sweeper reclaims these orphaned messages via XPENDING + XCLAIM after 30 seconds of idle time, marks the predictions as failed, and deletes the messages. See Queue Sweeping below.

Sharded Queues

Each deployable’s queue is sharded across multiple Redis streams (:s0, :s1, etc.). The ShardedQueueClient (queue.go:317-356) manages these shards, using "director" as the consumer group name across all of them.

Every ack pipelines XACK + XDEL — the message is both acknowledged in the consumer group and deleted from the stream in a single round-trip (queue.go:329-341).

preferSameStream

When preferSameStream is enabled, the MessageQueue remembers which stream shard it last read from and passes it as PreferStream to the next Read call (queue.go:246-252). This is a hotswap optimization — by preferring the same shard, the Director pod improves weight cache locality (the model weights are already loaded for predictions from that shard’s deployable).

The streamChanged flag on the message indicates when the Director switched to a different stream, which signals the worker loop that it may need to handle a model swap.

MultiMessageQueue (Blue/Green)

MultiMessageQueue (queue.go:73-77) wraps multiple Queue backends and rotates between them on each GetMessage call. When multiple backends are present, the block duration is hard-coded to 100ms to stay responsive regardless of which backend has messages (queue.go:300-308).

This supports blue/green Redis migration — during a migration, both the old and new Redis instances are active, and the MultiMessageQueue polls both. Messages carry an explicit reference to the queue they came from so that acks and claims go to the correct backend.

Queue Pruning (Autoscaler)

The autoscaler’s startQueuePruner (autoscaler.go:299-311) runs hourly, scanning every deployable’s queue for stuck messages. A message is “stuck” if it’s older than 24 hours (stuckTimeout) (prune.go:25).

The pruner checks two sources per queue (prune.go:118-148):

  • XPENDING: Messages that were delivered to a consumer but never acknowledged — these were likely picked up by a Director that crashed or got stuck during processing.
  • XRANGE: Messages still in the stream that were never delivered to any consumer — these were likely enqueued when no Director pods were running (e.g. setup failed and the deployment scaled to zero).

Stuck messages are deleted in chunks of 50 via pipelined XACK + XDEL (prune.go:151-161).

Queue Sweeping (API)

The API service runs its own sweeper (api/internal/sweeper/sweeper.go) that overlaps with the autoscaler’s pruner but operates differently:

  • Frequency: Every ~30 seconds (with ±50% jitter), vs the pruner’s hourly cycle.
  • Scope: Scans all streams matching each deployable kind prefix (deployment-prediction, function-prediction, hotswap-prediction, version-prediction, version-training).
  • Mechanism: Uses ClaimStaleMessages (XPENDING filtered by 30-second idle threshold, then XCLAIM) to take ownership of messages that a Director claimed but stopped processing — e.g. because the pod crashed. Then sends a failure update to the web service and deletes the message.
  • Purpose: Catches predictions where the Director that claimed them is gone. The sweeper marks them as failed so the user gets a response rather than waiting indefinitely. Messages that were never claimed by any Director (sitting unclaimed in the stream) are not visible to the sweeper — those are handled by the autoscaler’s pruner after 24 hours.

The sweeper also runs two deadline-related loops:

  • Cordon loop (~1 second): Queries the meta:deadlines sorted set for predictions whose deadline has passed within the last 10 seconds. Deadlines are stored as scores at enqueue time. Matched predictions are moved to a presorted garbage set and terminated with status aborted via TerminatePrediction. This handles predictions still sitting in the queue (or still running) when their time expires.
  • GC loop (~30 seconds): Processes the presorted garbage set, deleting the actual stream messages for predictions that were cordoned or otherwise terminated.

These three loops — sweep, cordon, GC — cover different failure modes. The sweep loop handles crashed Directors (idle PEL entries). The cordon loop enforces prediction deadlines regardless of Director state. The GC loop cleans up stream messages after either of the other two has terminated the prediction. The autoscaler’s hourly pruner is a slower safety net for messages that somehow survive all three.

Consumer Pruning (API)

A separate ConsumerPruner (api/internal/sweeper/consumer_pruner.go) removes stale consumers from consumer groups. Consumers that haven’t claimed a message in 24 hours (staleConsumerTimeout) are pruned. This cleans up after Director pods that were terminated without gracefully leaving the consumer group.


Workers AI: Bounded In-Memory Queues

ModelRequestQueue

ModelRequestQueue (queue.rs) is an in-memory queue per model per constellation-server instance. It’s created when an EndpointGroup is initialized and holds requests that couldn’t immediately acquire a permit on any endpoint.

The queue supports two algorithm variants, selectable per model via Config API:

  • Fifo (fifo_queue.rs): Simple VecDeque<(account_id, Sender)>. Requests are processed in arrival order. All accounts share a single queue.
  • Fair (fair_queue.rs): Per-account VecDeque<Sender> queues stored in a DashMap, with a round-robin VecDeque<account_id> that rotates through accounts. An account is added to the rotation on its first enqueue and removed when its queue empties.

Both variants retain only live senders — abandoned slots (where the requester timed out or disconnected) are cleaned up on each enqueue call via retain(|q| !q.is_closed()).

Capacity

Queue capacity is calculated from model config (service-discovery/lib.rs:79-94):

capacity = max(queue_config.capacity, colo_queue_size)
         × sum(endpoint.max_concurrent_requests)

The result is clamped to [0, 1000]. In practice, only one of queue_config.capacity or colo_queue_size is set — the max is taking the non-zero value.

For the Fair queue, capacity is enforced per account — each account can have up to capacity items in its individual queue. For the Fifo queue, capacity is a global limit on total queue size.

Permit System

The PermitManager (permits.rs) tracks concurrency per endpoint. Each endpoint has a concurrency_limit (from EndpointConfig.max_concurrent_requests) and a map of active permits keyed by request ID.

try_acquire_permit() is non-blocking — it checks active_permits() < concurrency_limit and either returns a Permit or None. Permits have release-on-drop semantics: when a Permit is dropped, it calls release() which removes the entry from the map and calls notify.notify_one() to wake the queue processor (permits.rs:376-383).

There are three permit types:

  • Permit: Assigned to a specific request ID. Released on drop (unless detached).
  • UnassignedPermit: Holds a concurrency slot without a request ID. Used by the queue processor to reserve capacity before matching to a queued request. Decrements unassigned_count on drop (permits.rs:308-315).
  • DetachedPermit: A permit that has been persisted elsewhere (e.g. Consul KV) and should NOT be released on drop. Created via permit.detach().

The Three Outcomes

EndpointGroup.find_best_available_endpoint (endpoint_group.rs:149-288) is the core dispatch function. It tries endpoints in priority order and produces one of three outcomes:

  1. Leased: A permit was acquired immediately on a healthy endpoint. The request proceeds to inference with no queuing.
  2. Queued: All endpoints are at capacity, but the queue has room. A oneshot::Receiver is returned — the caller awaits it and gets an endpoint+permit when the queue processor fulfills the request.
  3. NoSlotsAvailable: All endpoints are at capacity AND the queue is full. The request is rejected with an out-of-capacity error.

Background Processor

Each ModelRequestQueue spawns a long-running tokio task (queue.rs:252-264) that:

  1. Waits on a Notify signal (fired when a permit is released or config changes).
  2. Calls fulfill_next_queued() in a loop until it returns false.
  3. fulfill_next_queued() gets the next queued request (respecting the Fair/Fifo algorithm), tries to acquire an unassigned permit on a healthy endpoint, and if successful, sends the (Endpoint, UnassignedPermit) pair through the oneshot channel to the waiting request.

If no healthy endpoints exist when the processor runs, the entire queue is cleared — all waiting requests receive a NoLocalResourcesAvailable error (queue.rs:203-209).

Hot-Swappable Algorithm

When the queue algorithm changes (Fair↔Fifo) via config update, set_config (queue.rs:88-158):

  1. Creates a new queue implementation.
  2. Drains all pending requests from the old queue.
  3. Re-enqueues them into the new queue (skipping closed senders).
  4. Spawns a new processor task and aborts the old one.

Requests that can’t be re-enqueued (e.g. new queue is smaller) receive a NoLocalResourcesAvailable error.


Key Differences

AspectReplicateWorkers AI
Queue backingRedis Streams (durable, distributed)In-memory VecDeque (per-process, volatile)
ScopeOne stream per deployable, shared across all Director podsOne queue per model per constellation-server instance
DurabilityMessages survive process restarts and Redis failoversMessages lost on process restart
Consumer modelConsumer groups — any Director can claim any messageDirect dispatch — queue processor matches to local endpoints only
FairnessNone — all requests to a deployable are equalPer-account fair queuing (round-robin) or FIFO, configurable per model
Capacity controlUnbounded stream (pruned after 24h)Bounded: colo_queue_size × total_concurrency, clamped to 1000
BackpressureNone at queue level — requests accumulate until prunedImmediate rejection (NoSlotsAvailable) when queue is full
Stuck message handlingAutoscaler pruner (hourly, 24h threshold) + API sweeper (30s, claims and fails)Not needed — in-memory queue with sender liveness checks
Concurrency controlDirector processes one prediction at a time per worker goroutinePermit-based: PermitManager per endpoint with configurable concurrency_limit
Blue/green migrationMultiMessageQueue rotates across Redis backendsNot applicable — no external queue to migrate

The most significant architectural difference is durability vs bounded backpressure. Replicate’s Redis Streams are durable — a prediction enqueued during a Director outage will be picked up when a new pod starts. But this durability comes with unbounded queue growth and requires two separate cleanup mechanisms (pruner + sweeper) to handle stuck messages. Workers AI’s in-memory queues are volatile but provide immediate backpressure — when capacity is exhausted, the caller gets an error right away rather than waiting for a timeout or pruner cycle.