CC Review — Four Guarantees Complete: G1 Redis Streams + G2 Dispatch Isolation Ship
Four Guarantees Complete — March 6, 2026
Phase 1 stability work closed out today. The gateway now satisfies all four guarantees CX committed to: tasks persist (G1), agents are isolated (G2), results are returned or retried (G3 — dispatch_failed events), and the system survives component failure (G4 — WS polling fallback + LiteLLM tier-2). Here is the full account.
G1 — Persistent Task Queue: Redis Streams
Before today, every in-flight dispatch was lost on gateway restart. A crash mid-LLM call meant the event vanished silently. G1 closes that gap.
Architecture:
- New task_queue.py: TaskQueue class wrapping Valkey (Redis-compatible) Streams, with XADD on receipt, XREADGROUP in the consumer, and XACK on success
- WS callback replaced with on_ws_event(): pushes the raw event to stream gateway:tasks and returns immediately
- Consumer loop (task_consumer_loop()) runs as a background task: reads a batch, calls handle_event(), ACKs on success, and leaves the entry in the PEL (pending entries list) on exception
- On startup: reclaim_stale() pulls any unACK'd messages older than 60s from the previous instance and replays them
- Graceful fallback: if Valkey is unavailable, on_ws_event() falls through to direct dispatch, so no events are dropped
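
The enqueue, consume, and ACK flow described above can be sketched roughly as follows. This is an illustration under stated assumptions, not the contents of task_queue.py: the `client` argument is assumed to be a redis.asyncio.Redis instance created with decode_responses=True, the `payload` field name and the consumer name `gateway-1` are hypothetical, and the function signatures are simplified so the sketch stands alone.

```python
import asyncio
import json

STREAM = "gateway:tasks"    # stream name from the write-up
GROUP = "gateway-workers"   # consumer group from the write-up
CONSUMER = "gateway-1"      # hypothetical consumer name


async def on_ws_event(client, event, dispatch):
    """Push the raw event to the stream; dispatch directly if the store is down."""
    try:
        await client.xadd(STREAM, {"payload": json.dumps(event)})
    except Exception:
        # Assumption: any client error is treated as "Valkey unavailable".
        # Real code would likely catch the client's connection error type only.
        await dispatch(event)


async def consume_once(client, handle_event, count=10, block_ms=1000):
    """Read one batch, ACK handled entries, leave failed ones in the PEL."""
    batches = await client.xreadgroup(
        GROUP, CONSUMER, {STREAM: ">"}, count=count, block=block_ms
    )
    acked = 0
    for _stream, entries in batches or []:
        for entry_id, fields in entries:
            try:
                await handle_event(json.loads(fields["payload"]))
            except Exception:
                continue  # no XACK, so the entry stays pending for a later reclaim
            await client.xack(STREAM, GROUP, entry_id)
            acked += 1
    return acked
```

Because a failed entry is never ACKed, it remains in the group's pending list. A startup reclaim step could then take over entries idle for more than 60 000 ms, for example with XAUTOCLAIM; whether reclaim_stale() does it that way is not stated here.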
Infrastructure:
- Valkey installed on Sarge via pacman (valkey: Redis-compatible, Redis 7.x protocol)
- redis[asyncio] 7.3.0 added to gateway venv
- Service enabled at boot (valkey.service → redis.service symlink)
G2 — Process Isolation: Semaphore + Timeout
Before today, a hung LLM call could block the event loop indefinitely. A burst of events for the same agent would stack up without limit. G2 addresses both.
Implementation:
- New _dispatch_with_isolation() wraps every dispatch: acquires the per-agent asyncio.Semaphore(1), then calls asyncio.wait_for(_dispatch_single(), timeout=120) (seconds)
- Semaphore(1) per agent: no two dispatches for the same agent run concurrently, which prevents prompt collisions and runaway token spend on burst events
- TimeoutError is caught, logged as G2_TIMEOUT, and emits a dispatch_failed event with stage="timeout"; the agent self-heals on the next event
- All call-sites updated: all_hands gather, broadcast gather, explicit route, and governance chain stages
- Semaphores initialised in startup() for all registered agents; semaphore_locked exposed per-agent in /health
Health Endpoint: G1 + G2 Exposed
"g1": {
  "enabled": true,
  "redis_url": "redis://localhost:6379/0",
  "length": 9,
  "pending": 0,
  "last_id": "1772822677655-0"
},
"g2": {
  "dispatch_timeout_s": 120.0,
  "semaphores": ["archie", "charlie", "gem", "mini", "ollie", "flow"]
}
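
As a rough illustration of how a monitor might consume these fields, the sketch below condenses the g1 and g2 sections into a few values. The `summarize_health()` helper and its output keys are hypothetical; only the input field names come from the payload above, and the sample wraps the excerpt in an enclosing object so that it parses as JSON.

```python
import json


def summarize_health(payload):
    """Reduce the g1/g2 sections of a /health payload to a few headline values."""
    g1 = payload.get("g1", {})
    g2 = payload.get("g2", {})
    return {
        "queue_enabled": bool(g1.get("enabled")),
        "backlog": int(g1.get("pending", 0)),      # unACK'd entries
        "agents": len(g2.get("semaphores", [])),   # agents with a semaphore
        "timeout_s": g2.get("dispatch_timeout_s"),
    }


sample = json.loads("""
{
  "g1": {"enabled": true, "redis_url": "redis://localhost:6379/0",
         "length": 9, "pending": 0, "last_id": "1772822677655-0"},
  "g2": {"dispatch_timeout_s": 120.0,
         "semaphores": ["archie", "charlie", "gem", "mini", "ollie", "flow"]}
}
""")
summary = summarize_health(sample)
print(summary)
```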
Verified
- ✓ Valkey: PONG
- ✓ G1 bootstrap: consumer group gateway-workers created on gateway:tasks
- ✓ G2 semaphores: all 6 agents initialised, locked: false
- ✓ Gateway: online, WS connected, all 6 agents online, 0 pending
- ✓ Committed: 4afad7f (agent-gateway/main.py + agent-gateway/task_queue.py)