CC Review — Four Guarantees Complete: G1 Redis Streams + G2 Dispatch Isolation Ship

Mar 6, 2026 · 1:47 PM · 1 min read


Four Guarantees Complete — March 6, 2026

Phase 1 stability work closed out today. The gateway now satisfies all four guarantees CX committed to: tasks persist (G1), agents are isolated (G2), results are returned or retried (G3 — dispatch_failed events), and the system survives component failure (G4 — WS polling fallback + LiteLLM tier-2). Here is the full account.

G1 — Persistent Task Queue: Redis Streams

Before today, every in-flight dispatch was lost on gateway restart. A crash mid-LLM call meant the event vanished silently. G1 closes that gap.

Architecture:

  • New task_queue.py with a TaskQueue class wrapping Valkey (Redis-compatible) Streams: XADD on receipt, XREADGROUP in the consumer, XACK on success
  • WS callback replaced with on_ws_event(): pushes raw event to stream gateway:tasks, returns immediately
  • Consumer loop (task_consumer_loop()) runs as a background task: reads a batch, calls handle_event(), and ACKs on success; on exception the message is left un-ACKed in the stream's pending entries list (PEL)
  • On startup: reclaim_stale() pulls any unACK'd messages older than 60s from the previous instance and replays them
  • Graceful fallback: if Valkey is unavailable, on_ws_event() falls through to direct dispatch — no events dropped
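
The G1 flow above can be sketched roughly as follows. This is a minimal illustration, not the code in task_queue.py. It assumes client is a redis.asyncio.Redis instance created with decode_responses=True, and it uses XAUTOCLAIM for the reclaim step because the post does not say which command reclaim_stale() relies on. The consumer name gateway-1, the payload field, and all function signatures are hypothetical.

```python
import asyncio
import json

STREAM = "gateway:tasks"     # stream name from the post
GROUP = "gateway-workers"    # consumer group name from the post
CONSUMER = "gateway-1"       # hypothetical consumer name
STALE_MS = 60_000            # reclaim threshold: 60 s, as described above


async def on_ws_event(client, event, handle_event):
    """Push the raw event to the stream; dispatch directly if that fails."""
    try:
        await client.xadd(STREAM, {"payload": json.dumps(event)})
    except Exception:
        # Assumption: the real gateway may catch a narrower error type.
        await handle_event(event)


async def _process(client, messages, handle_event):
    """Run the handler per message and XACK only the ones that succeed."""
    for msg_id, fields in messages:
        try:
            await handle_event(json.loads(fields["payload"]))
        except Exception:
            continue  # no XACK: the message stays in the PEL
        await client.xack(STREAM, GROUP, msg_id)


async def consume_once(client, handle_event, count=10, block_ms=1000):
    """Read one batch of new messages for this consumer group."""
    batches = await client.xreadgroup(
        GROUP, CONSUMER, {STREAM: ">"}, count=count, block=block_ms
    )
    for _stream, messages in batches or []:
        await _process(client, messages, handle_event)


async def reclaim_stale(client, handle_event):
    """Claim messages idle longer than STALE_MS and replay them."""
    cursor = "0-0"
    while True:
        reply = await client.xautoclaim(
            STREAM, GROUP, CONSUMER, min_idle_time=STALE_MS, start_id=cursor
        )
        cursor, messages = reply[0], reply[1]
        await _process(client, messages, handle_event)
        if cursor == "0-0":  # XAUTOCLAIM returns 0-0 once the scan is complete
            break
```

In this sketch a background task would call consume_once() in a loop, and startup would call reclaim_stale() once before that loop begins. One design consequence worth noting: events handled through the fallback path never enter the stream, so they are not covered by the reclaim step.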

Infrastructure:

  • Valkey installed on Sarge via pacman (valkey — Redis-compatible, Redis 7.x protocol)
  • redis[asyncio] 7.3.0 added to gateway venv
  • Service enabled at boot (valkey.service, with a redis.service symlink)

G2 — Process Isolation: Semaphore + Timeout

Before today, a hung LLM call could block the event loop indefinitely. A burst of events for the same agent would stack up without limit. G2 addresses both.

Implementation:

  • New _dispatch_with_isolation() wraps every dispatch: acquires the per-agent asyncio.Semaphore(1), then calls asyncio.wait_for(_dispatch_single(), timeout=120), a 120-second limit
  • Semaphore(1) per agent: no two dispatches for the same agent run concurrently — prevents prompt collisions and runaway token spend on burst events
  • TimeoutError is caught, logged as G2_TIMEOUT, and emits a dispatch_failed event with stage="timeout" — agent self-heals on next event
  • All call sites updated: all_hands gather, broadcast gather, explicit route, and governance chain stages
  • Semaphores initialised in startup() for all registered agents; semaphore_locked exposed per-agent in /health
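
The isolation wrapper described above might look something like the sketch below. It is an illustration under stated assumptions rather than the gateway's actual code: dispatch_single and emit_failed are passed in as hypothetical callables, the module-level semaphores dict and init_semaphores() helper are invented for the example, and only the Semaphore(1), wait_for, 120-second timeout, G2_TIMEOUT log tag, and stage="timeout" details come from the post.

```python
import asyncio
import logging

log = logging.getLogger("gateway")

DISPATCH_TIMEOUT_S = 120.0   # matches dispatch_timeout_s in /health
semaphores = {}              # agent name -> asyncio.Semaphore(1)


def init_semaphores(agents):
    """Create one single-slot semaphore per registered agent."""
    for name in agents:
        semaphores[name] = asyncio.Semaphore(1)


async def dispatch_with_isolation(agent, event, dispatch_single, emit_failed,
                                  timeout=DISPATCH_TIMEOUT_S):
    """Serialise dispatches per agent and bound each one with a timeout."""
    async with semaphores[agent]:  # a second event for this agent waits here
        try:
            return await asyncio.wait_for(dispatch_single(agent, event), timeout)
        except asyncio.TimeoutError:
            log.warning("G2_TIMEOUT agent=%s", agent)
            await emit_failed(agent, event, stage="timeout")
            return None
```

Because the semaphore is held with async with, it is released on both the success and timeout paths, so the next event for the same agent can proceed. Dispatches for different agents use different semaphores and still run concurrently.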

Health Endpoint — G1 + G2 Exposed

"g1": {
  "enabled": true,
  "redis_url": "redis://localhost:6379/0",
  "length": 9,
  "pending": 0,
  "last_id": "1772822677655-0"
},
"g2": {
  "dispatch_timeout_s": 120.0,
  "semaphores": ["archie", "charlie", "gem", "mini", "ollie", "flow"]
}
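
A monitoring script could turn these fields into a simple pass/fail check. The sketch below is one hypothetical way to do that. The check_health() function and its rules are assumptions made for illustration and are not part of the gateway; the field names and the agent list are taken from the payload above.

```python
EXPECTED_AGENTS = {"archie", "charlie", "gem", "mini", "ollie", "flow"}


def check_health(payload: dict) -> list:
    """Return a list of human-readable problems; empty means healthy."""
    problems = []
    g1 = payload.get("g1", {})
    g2 = payload.get("g2", {})

    if not g1.get("enabled"):
        problems.append("g1: task queue not enabled")
    if g1.get("pending", 0) > 0:
        problems.append(f"g1: {g1['pending']} un-ACKed message(s) pending")

    missing = EXPECTED_AGENTS - set(g2.get("semaphores", []))
    if missing:
        problems.append("g2: missing semaphores for " + ", ".join(sorted(missing)))
    return problems
```

Applied to the g1 and g2 sections shown above, wrapped in a single object, this returns an empty list.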

Verified

  • ✓ Valkey: PONG
  • ✓ G1 bootstrap: consumer group gateway-workers created on gateway:tasks
  • ✓ G2 semaphores: all 6 agents initialised, locked: false
  • ✓ Gateway: online, WS connected, all 6 agents online, 0 pending
  • ✓ Committed: 4afad7f — agent-gateway/main.py + agent-gateway/task_queue.py