# Queue Management

## Overview
Both platforms queue inference requests when backend capacity is saturated, but the queue architectures are fundamentally different. Replicate uses Redis Streams as a durable, distributed queue between the API layer and Director workers — requests persist across process restarts and can be claimed by any Director instance. Workers AI uses bounded in-memory queues local to each constellation-server instance — requests exist only in the process that received them and are lost if it restarts.
## Replicate: Redis Streams with Consumer Groups

### Queue Naming
Every deployable gets its own Redis Stream. The queue name is derived
from the deployable’s key prefix and kind
(deployable_config.py:565-589):
| DeployableKind | Key Prefix | Queue Name |
|---|---|---|
| deployment-prediction | dp- | input:prediction:dp-{hex} |
| function-prediction | fp- | input:prediction:fp-{hex} |
| version-prediction | vp- | input:prediction:vp-{hex} |
| version-training | vt- | input:training:vt-{hex} |
Function and version predictions that use the unified pipeline
cluster config share a single queue: input:prediction:fp-{0*32}
(deployable_config.py:44-45).
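The mapping above can be sketched as a small lookup. This is an illustrative Go stand-in — the real derivation lives in deployable_config.py, and the kind names here are taken from the table, not from that file:

```go
package main

import "fmt"

// queueName sketches the mapping from deployable kind + key to a stream
// name, mirroring the table above. The key already carries the kind's
// prefix (dp-, fp-, vp-, vt-), so only the workload segment varies.
func queueName(kind, key string) (string, error) {
	workload := map[string]string{
		"deployment-prediction": "prediction",
		"function-prediction":   "prediction",
		"version-prediction":    "prediction",
		"version-training":      "training",
	}[kind]
	if workload == "" {
		return "", fmt.Errorf("unknown deployable kind: %s", kind)
	}
	return fmt.Sprintf("input:%s:%s", workload, key), nil
}

func main() {
	name, _ := queueName("deployment-prediction", "dp-0a1b2c")
	fmt.Println(name) // input:prediction:dp-0a1b2c
}
```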
### MessageQueue
Director’s MessageQueue
(director/redis/queue.go) wraps a Redis
Streams consumer group. Each Director pod is a consumer in the
"director" consumer group. The queue runs two goroutines via
errgroup:
- Fetcher: Waits on a request channel, calls XREAD (via the QueueClient.Read interface) with the configured block duration, and sends the result back on a response channel. The fetcher is the only goroutine that reads from Redis — GetMessage sends a request struct and waits for the response (queue.go:131-148).
- Claimer: Runs every 2 seconds (claimInterval), calling XCLAIM on all unacked messages to maintain ownership. This prevents Redis from reassigning messages to other consumers in the group while the current Director is still processing them. After the fetcher stops (shutdown requested), the claimer continues until all messages are acked or the context is canceled (queue.go:150-191).
Messages are tracked in an unackedMessages map keyed by unique ID
(stream name + message ID). When a Director worker finishes
processing a prediction, it calls Ack on the message, which
removes it from the map. If a Director pod crashes, its unacked
messages remain in the consumer group’s pending entries list (PEL).
The API sweeper reclaims these orphaned messages via XPENDING +
XCLAIM after 30 seconds of idle time, marks the predictions as
failed, and deletes the messages. See
Queue Sweeping below.
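The unacked-message bookkeeping can be sketched as a map keyed by stream name plus message ID. This is an illustrative Go stand-in, not the actual Director types — the claimer goroutine would periodically XCLAIM every key still present:

```go
package main

import (
	"fmt"
	"sync"
)

// unackedTracker sketches the MessageQueue's bookkeeping: messages are
// keyed by stream name + message ID, and Ack removes them.
type unackedTracker struct {
	mu       sync.Mutex
	messages map[string]struct{} // key: stream + "/" + msgID
}

func newUnackedTracker() *unackedTracker {
	return &unackedTracker{messages: make(map[string]struct{})}
}

// Track records a message as delivered but not yet acknowledged.
func (t *unackedTracker) Track(stream, msgID string) {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.messages[stream+"/"+msgID] = struct{}{}
}

// Ack removes a finished message; it will no longer be re-claimed.
func (t *unackedTracker) Ack(stream, msgID string) {
	t.mu.Lock()
	defer t.mu.Unlock()
	delete(t.messages, stream+"/"+msgID)
}

// Pending reports how many messages the claimer would XCLAIM.
func (t *unackedTracker) Pending() int {
	t.mu.Lock()
	defer t.mu.Unlock()
	return len(t.messages)
}

func main() {
	tr := newUnackedTracker()
	tr.Track("input:prediction:dp-abc:s0", "1700000000000-0")
	fmt.Println(tr.Pending()) // 1
	tr.Ack("input:prediction:dp-abc:s0", "1700000000000-0")
	fmt.Println(tr.Pending()) // 0
}
```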
### Sharded Queues
Each deployable’s queue is sharded across multiple Redis streams
(:s0, :s1, etc.). The ShardedQueueClient
(queue.go:317-356) manages these shards,
using "director" as the consumer group name across all of them.
Every ack pipelines XACK + XDEL — the message is both
acknowledged in the consumer group and deleted from the stream in
a single round-trip (queue.go:329-341).
### preferSameStream
When preferSameStream is enabled, the MessageQueue remembers
which stream shard it last read from and passes it as
PreferStream to the next Read call
(queue.go:246-252). This is a hotswap
optimization — by preferring the same shard, the Director pod
improves weight cache locality (the model weights are already
loaded for predictions from that shard’s deployable).
The streamChanged flag on the message indicates when the Director
switched to a different stream, which signals the worker loop that
it may need to handle a model swap.
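The shard-preference logic reduces to "try the last shard first, fall back to any other, and flag the switch." A minimal Go sketch, with shard contents mocked as string slices (the fallback-selection detail is an assumption, not copied from queue.go):

```go
package main

import "fmt"

// message is an illustrative stand-in for a queue message: which shard
// it came from, and whether the shard differs from the preferred one.
type message struct {
	stream        string
	streamChanged bool
}

// preferredRead tries the previously read shard first (for weight-cache
// locality), then falls back to any shard with messages, setting
// streamChanged so the worker loop can handle a model swap.
func preferredRead(shards map[string][]string, prefer string) (message, bool) {
	if len(shards[prefer]) > 0 {
		return message{stream: prefer, streamChanged: false}, true
	}
	for name, msgs := range shards {
		if len(msgs) > 0 {
			return message{stream: name, streamChanged: name != prefer}, true
		}
	}
	return message{}, false
}

func main() {
	shards := map[string][]string{
		"q:s0": {},
		"q:s1": {"m1"},
	}
	msg, ok := preferredRead(shards, "q:s0")
	fmt.Println(ok, msg.stream, msg.streamChanged) // true q:s1 true
}
```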
### MultiMessageQueue (Blue/Green)
MultiMessageQueue
(queue.go:73-77) wraps multiple Queue
backends and rotates between them on each GetMessage call. When
multiple backends are present, the block duration is hard-coded to
100ms to stay responsive regardless of which backend has messages
(queue.go:300-308).
This supports blue/green Redis migration — during a migration, both
the old and new Redis instances are active, and the
MultiMessageQueue polls both. Messages carry an explicit
reference to the queue they came from so that acks and claims go to
the correct backend.
### Queue Pruning (Autoscaler)
The autoscaler’s startQueuePruner
(autoscaler.go:299-311) runs hourly, scanning
every deployable’s queue for stuck messages. A message is
“stuck” if it’s older than 24 hours (stuckTimeout)
(prune.go:25).
The pruner checks two sources per queue
(prune.go:118-148):
- XPENDING: Messages that were delivered to a consumer but never acknowledged — these were likely picked up by a Director that crashed or got stuck during processing.
- XRANGE: Messages still in the stream that were never delivered to any consumer — these were likely enqueued when no Director pods were running (e.g. setup failed and the deployment scaled to zero).
Stuck messages are deleted in chunks of 50 via pipelined XACK +
XDEL (prune.go:151-161).
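The batching step is simple but worth making concrete: IDs are grouped into runs of 50, each run becoming one pipelined XACK+XDEL round-trip. A minimal Go sketch (the helper name is illustrative, not from prune.go):

```go
package main

import "fmt"

// chunked splits stuck message IDs into batches of at most `size`,
// one batch per pipelined XACK+XDEL round-trip.
func chunked(ids []string, size int) [][]string {
	var batches [][]string
	for len(ids) > 0 {
		n := size
		if len(ids) < n {
			n = len(ids)
		}
		batches = append(batches, ids[:n])
		ids = ids[n:]
	}
	return batches
}

func main() {
	ids := make([]string, 120)
	for i := range ids {
		ids[i] = fmt.Sprintf("1700000000000-%d", i)
	}
	batches := chunked(ids, 50)
	fmt.Println(len(batches), len(batches[0]), len(batches[2])) // 3 50 20
}
```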
### Queue Sweeping (API)
The API service runs its own sweeper
(api/internal/sweeper/sweeper.go) that
overlaps with the autoscaler’s pruner but operates differently:
- Frequency: Every ~30 seconds (with ±50% jitter), vs the pruner's hourly cycle.
- Scope: Scans all streams matching each deployable kind prefix (deployment-prediction, function-prediction, hotswap-prediction, version-prediction, version-training).
- Mechanism: Uses ClaimStaleMessages (XPENDING filtered by a 30-second idle threshold, then XCLAIM) to take ownership of messages that a Director claimed but stopped processing — e.g. because the pod crashed. It then sends a failure update to the web service and deletes the message.
- Purpose: Catches predictions where the Director that claimed them is gone. The sweeper marks them as failed so the user gets a response rather than waiting indefinitely. Messages that were never claimed by any Director (sitting unclaimed in the stream) are not visible to the sweeper — those are handled by the autoscaler's pruner after 24 hours.
The sweeper also runs two deadline-related loops:
- Cordon loop (~1 second): Queries the meta:deadlines sorted set for predictions whose deadline has passed within the last 10 seconds. Deadlines are stored as scores at enqueue time. Matched predictions are moved to a presorted garbage set and terminated with status aborted via TerminatePrediction. This handles predictions still sitting in the queue (or still running) when their time expires.
- GC loop (~30 seconds): Processes the presorted garbage set, deleting the actual stream messages for predictions that were cordoned or otherwise terminated.
These three loops — sweep, cordon, GC — cover different failure modes. The sweep loop handles crashed Directors (idle PEL entries). The cordon loop enforces prediction deadlines regardless of Director state. The GC loop cleans up stream messages after either of the other two has terminated the prediction. The autoscaler’s hourly pruner is a slower safety net for messages that somehow survive all three.
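The cordon loop's query amounts to a score-range scan over the deadlines set. A minimal Go sketch, assuming deadlines are unix-second scores and the window is (now − 10s, now]; the map stands in for the sorted-set range query:

```go
package main

import "fmt"

// expiredDeadlines selects predictions whose deadline score fell within
// the last 10 seconds — the entries a cordon pass would move to the
// garbage set and abort.
func expiredDeadlines(deadlines map[string]int64, now int64) []string {
	var expired []string
	for id, score := range deadlines {
		if score > now-10 && score <= now {
			expired = append(expired, id)
		}
	}
	return expired
}

func main() {
	now := int64(1_700_000_000)
	deadlines := map[string]int64{
		"pred-a": now - 5,  // expired within the window: cordon + abort
		"pred-b": now + 60, // still has time
		"pred-c": now - 30, // outside the window
	}
	fmt.Println(expiredDeadlines(deadlines, now)) // [pred-a]
}
```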
### Consumer Pruning (API)
A separate ConsumerPruner
(api/internal/sweeper/consumer_pruner.go)
removes stale consumers from consumer groups. Consumers that
haven’t claimed a message in 24 hours (staleConsumerTimeout) are
pruned. This cleans up after Director pods that were terminated
without gracefully leaving the consumer group.
## Workers AI: Bounded In-Memory Queues

### ModelRequestQueue
ModelRequestQueue
(queue.rs) is an in-memory queue per model per
constellation-server instance. It’s created when an EndpointGroup
is initialized and holds requests that couldn’t immediately acquire
a permit on any endpoint.
The queue supports two algorithm variants, selectable per model via the Config API:
- Fifo (fifo_queue.rs): A simple VecDeque<(account_id, Sender)>. Requests are processed in arrival order. All accounts share a single queue.
- Fair (fair_queue.rs): Per-account VecDeque<Sender> queues stored in a DashMap, with a round-robin VecDeque<account_id> that rotates through accounts. An account is added to the rotation on its first enqueue and removed when its queue empties.
Both variants retain only live senders — abandoned slots (where the
requester timed out or disconnected) are cleaned up on each
enqueue call via retain(|q| !q.is_closed()).
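The Fair variant's rotation can be sketched without the concurrency machinery. An illustrative Go stand-in for fair_queue.rs, with request IDs in place of oneshot senders:

```go
package main

import "fmt"

// fairQueue sketches the Fair variant: one FIFO per account plus a
// round-robin rotation of account IDs. Dequeue serves the account at the
// front of the rotation and moves it to the back; an account leaves the
// rotation when its queue empties.
type fairQueue struct {
	queues   map[string][]string // account -> pending request IDs
	rotation []string            // accounts with pending work
}

func newFairQueue() *fairQueue {
	return &fairQueue{queues: make(map[string][]string)}
}

func (f *fairQueue) Enqueue(account, req string) {
	if len(f.queues[account]) == 0 {
		f.rotation = append(f.rotation, account) // first enqueue joins rotation
	}
	f.queues[account] = append(f.queues[account], req)
}

func (f *fairQueue) Dequeue() (string, bool) {
	if len(f.rotation) == 0 {
		return "", false
	}
	account := f.rotation[0]
	f.rotation = f.rotation[1:]
	q := f.queues[account]
	req := q[0]
	f.queues[account] = q[1:]
	if len(f.queues[account]) > 0 {
		f.rotation = append(f.rotation, account) // still has work: back of the line
	}
	return req, true
}

func main() {
	f := newFairQueue()
	f.Enqueue("acct-a", "a1")
	f.Enqueue("acct-a", "a2")
	f.Enqueue("acct-b", "b1")
	for {
		req, ok := f.Dequeue()
		if !ok {
			break
		}
		fmt.Println(req)
	}
	// a1, b1, a2 — acct-b gets a turn before acct-a's second request
}
```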
### Capacity
Queue capacity is calculated from model config
(service-discovery/lib.rs:79-94):
capacity = max(queue_config.capacity, colo_queue_size)
× sum(endpoint.max_concurrent_requests)
The result is clamped to [0, 1000]. In practice, only one of
queue_config.capacity or colo_queue_size is set, so the max simply
selects whichever value is non-zero.
For the Fair queue, capacity is enforced per account — each
account can have up to capacity items in its individual queue.
For the Fifo queue, capacity is a global limit on total queue size.
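The formula above can be written out directly. An illustrative Go stand-in for the calculation in service-discovery/lib.rs:

```go
package main

import "fmt"

// queueCapacity sketches the capacity formula: the larger of the two
// config values, scaled by total endpoint concurrency, clamped to
// [0, 1000].
func queueCapacity(configCapacity, coloQueueSize int, maxConcurrent []int) int {
	base := configCapacity
	if coloQueueSize > base {
		base = coloQueueSize
	}
	total := 0
	for _, c := range maxConcurrent {
		total += c
	}
	capacity := base * total
	if capacity < 0 {
		capacity = 0
	}
	if capacity > 1000 {
		capacity = 1000
	}
	return capacity
}

func main() {
	// Two endpoints with concurrency 4 and 8, colo_queue_size of 10:
	fmt.Println(queueCapacity(0, 10, []int{4, 8})) // 120
	// Large values hit the 1000 ceiling:
	fmt.Println(queueCapacity(0, 100, []int{32, 32})) // 1000
}
```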
### Permit System
The PermitManager
(permits.rs) tracks concurrency per endpoint. Each
endpoint has a concurrency_limit (from
EndpointConfig.max_concurrent_requests) and a map of active
permits keyed by request ID.
try_acquire_permit() is non-blocking — it checks
active_permits() < concurrency_limit and either returns a
Permit or None. Permits have release-on-drop semantics: when a
Permit is dropped, it calls release() which removes the entry
from the map and calls notify.notify_one() to wake the queue
processor (permits.rs:376-383).
There are three permit types:
- Permit: Assigned to a specific request ID. Released on drop (unless detached).
- UnassignedPermit: Holds a concurrency slot without a request ID. Used by the queue processor to reserve capacity before matching to a queued request. Decrements unassigned_count on drop (permits.rs:308-315).
- DetachedPermit: A permit that has been persisted elsewhere (e.g. Consul KV) and should NOT be released on drop. Created via permit.detach().
### The Three Outcomes
EndpointGroup.find_best_available_endpoint
(endpoint_group.rs:149-288) is the core
dispatch function. It tries endpoints in priority order and
produces one of three outcomes:
- Leased: A permit was acquired immediately on a healthy endpoint. The request proceeds to inference with no queuing.
- Queued: All endpoints are at capacity, but the queue has room. A oneshot::Receiver is returned — the caller awaits it and gets an endpoint + permit when the queue processor fulfills the request.
- NoSlotsAvailable: All endpoints are at capacity AND the queue is full. The request is rejected with an out-of-capacity error.
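The decision tree reduces to two checks. An illustrative Go stand-in for endpoint_group.rs, with each endpoint reduced to a free-slot count:

```go
package main

import "fmt"

// outcome mirrors the three dispatch results described above.
type outcome string

const (
	leased           outcome = "Leased"
	queued           outcome = "Queued"
	noSlotsAvailable outcome = "NoSlotsAvailable"
)

// dispatch tries endpoints in priority order, falls back to the queue if
// it has room, and rejects otherwise.
func dispatch(freeSlots []int, queueLen, queueCap int) outcome {
	for _, free := range freeSlots {
		if free > 0 {
			return leased // permit acquired, no queuing
		}
	}
	if queueLen < queueCap {
		return queued // caller awaits a oneshot fulfillment
	}
	return noSlotsAvailable // out-of-capacity error
}

func main() {
	fmt.Println(dispatch([]int{0, 2}, 0, 10))  // Leased
	fmt.Println(dispatch([]int{0, 0}, 3, 10))  // Queued
	fmt.Println(dispatch([]int{0, 0}, 10, 10)) // NoSlotsAvailable
}
```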
### Background Processor
Each ModelRequestQueue spawns a long-running tokio task
(queue.rs:252-264) that:
- Waits on a Notify signal (fired when a permit is released or config changes).
- Calls fulfill_next_queued() in a loop until it returns false.

fulfill_next_queued() gets the next queued request (respecting the Fair/Fifo algorithm), tries to acquire an unassigned permit on a healthy endpoint, and if successful, sends the (Endpoint, UnassignedPermit) pair through the oneshot channel to the waiting request.
If no healthy endpoints exist when the processor runs, the entire
queue is cleared — all waiting requests receive a
NoLocalResourcesAvailable error
(queue.rs:203-209).
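One wake-up of the processor can be sketched as a pure function over the queue state. An illustrative Go stand-in for queue.rs, with fulfilled/failed slices modeling the oneshot sends:

```go
package main

import "fmt"

// processQueue sketches one pass of the background processor: with no
// healthy endpoint the whole queue fails with NoLocalResourcesAvailable;
// otherwise queued requests are fulfilled until the queue or the free
// capacity runs out.
func processQueue(queue []string, healthyEndpoints bool, freeSlots int) (fulfilled, failed []string) {
	if !healthyEndpoints {
		return nil, queue // every waiter gets an error
	}
	for len(queue) > 0 && freeSlots > 0 {
		fulfilled = append(fulfilled, queue[0])
		queue = queue[1:]
		freeSlots--
	}
	return fulfilled, failed
}

func main() {
	ok, bad := processQueue([]string{"r1", "r2", "r3"}, true, 2)
	fmt.Println(ok, bad) // [r1 r2] []
	ok, bad = processQueue([]string{"r1", "r2"}, false, 2)
	fmt.Println(ok, bad) // [] [r1 r2]
}
```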
### Hot-Swappable Algorithm
When the queue algorithm changes (Fair ↔ Fifo) via a config update,
set_config (queue.rs:88-158):
- Creates a new queue implementation.
- Drains all pending requests from the old queue.
- Re-enqueues them into the new queue (skipping closed senders).
- Spawns a new processor task and aborts the old one.
Requests that can’t be re-enqueued (e.g. new queue is smaller)
receive a NoLocalResourcesAvailable error.
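The drain-and-re-enqueue step can be sketched as follows. An illustrative Go stand-in for set_config's migration, with sender liveness reduced to a closed flag:

```go
package main

import "fmt"

// waiter is an illustrative stand-in for a queued oneshot sender.
type waiter struct {
	id     string
	closed bool // requester timed out or disconnected
}

// swapQueues drains the old queue in order, skips closed senders, and
// re-enqueues into the new queue up to its capacity; overflow requests
// are failed (NoLocalResourcesAvailable).
func swapQueues(old []waiter, newCap int) (migrated, rejected []string) {
	for _, w := range old {
		if w.closed {
			continue // abandoned slot, nothing to deliver to
		}
		if len(migrated) < newCap {
			migrated = append(migrated, w.id)
		} else {
			rejected = append(rejected, w.id)
		}
	}
	return migrated, rejected
}

func main() {
	old := []waiter{{"r1", false}, {"r2", true}, {"r3", false}, {"r4", false}}
	m, rej := swapQueues(old, 2)
	fmt.Println(m, rej) // [r1 r3] [r4]
}
```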
## Key Differences
| Aspect | Replicate | Workers AI |
|---|---|---|
| Queue backing | Redis Streams (durable, distributed) | In-memory VecDeque (per-process, volatile) |
| Scope | One stream per deployable, shared across all Director pods | One queue per model per constellation-server instance |
| Durability | Messages survive process restarts and Redis failovers | Messages lost on process restart |
| Consumer model | Consumer groups — any Director can claim any message | Direct dispatch — queue processor matches to local endpoints only |
| Fairness | None — all requests to a deployable are equal | Per-account fair queuing (round-robin) or FIFO, configurable per model |
| Capacity control | Unbounded stream (pruned after 24h) | Bounded: colo_queue_size × total_concurrency, clamped to 1000 |
| Backpressure | None at queue level — requests accumulate until pruned | Immediate rejection (NoSlotsAvailable) when queue is full |
| Stuck message handling | Autoscaler pruner (hourly, 24h threshold) + API sweeper (30s, claims and fails) | Not needed — in-memory queue with sender liveness checks |
| Concurrency control | Director processes one prediction at a time per worker goroutine | Permit-based: PermitManager per endpoint with configurable concurrency_limit |
| Blue/green migration | MultiMessageQueue rotates across Redis backends | Not applicable — no external queue to migrate |
The most significant architectural difference is durability vs bounded backpressure. Replicate’s Redis Streams are durable — a prediction enqueued during a Director outage will be picked up when a new pod starts. But this durability comes with unbounded queue growth and requires two separate cleanup mechanisms (pruner + sweeper) to handle stuck messages. Workers AI’s in-memory queues are volatile but provide immediate backpressure — when capacity is exhausted, the caller gets an error right away rather than waiting for a timeout or pruner cycle.