Durable streams
Durable streams are append-only byte logs addressed by URL, hosted by Electric SQL. All message content lives in streams — messages never touch Postgres. Each room record stores a durable_stream_id in its content pointing to its backing stream.
Protocol
HTTP verbs on a path:
- PUT /stream/{path}: create a stream
- POST /stream/{path}: append (returns Stream-Next-Offset header)
- GET /stream/{path}?offset=X: read from position X
- GET /stream/{path}?offset=X&live=long-poll: tail in real time
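As a rough illustration of the four operations, the sketch below maps each one onto a method and URL. The names `StreamOp`, `StreamRequest`, and `buildStreamRequest` are hypothetical and not part of any package mentioned here; the base URL is a placeholder.

```typescript
// Sketch only: maps the four protocol operations onto method + URL pairs.
// All type and function names here are hypothetical.
type StreamOp =
  | { kind: "create" }
  | { kind: "append" }
  | { kind: "read"; offset: string }
  | { kind: "tail"; offset: string };

interface StreamRequest {
  method: "PUT" | "POST" | "GET";
  url: string;
}

function buildStreamRequest(baseUrl: string, path: string, op: StreamOp): StreamRequest {
  // Strip one trailing slash so the joined URL has exactly one separator.
  const url = `${baseUrl.replace(/\/$/, "")}/stream/${path}`;
  switch (op.kind) {
    case "create":
      return { method: "PUT", url };
    case "append":
      return { method: "POST", url };
    case "read":
      return { method: "GET", url: `${url}?offset=${encodeURIComponent(op.offset)}` };
    case "tail":
      // Same read, with the long-poll flag added for live tailing.
      return {
        method: "GET",
        url: `${url}?offset=${encodeURIComponent(op.offset)}&live=long-poll`,
      };
  }
}
```

For example, `buildStreamRequest(base, "room/abc", { kind: "tail", offset: "-1" })` yields a GET that replays the full history and then keeps tailing.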
Offsets are opaque, monotonic, and tracked by the client; the server keeps no per-connection state. Historical reads are immutable byte ranges, so they are CDN-cacheable. Live tail uses long-polling or SSE with ~60s reconnect cycles. An offset of -1 replays the full history.
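Client-side offset tracking can be sketched as a simple replay loop. This is a minimal sketch, not the @durable-streams/client implementation: `ReplayChunk`, `ReplayReadFn`, and `replayFrom` are hypothetical names, and the sketch assumes each read tells the caller the next offset and whether it has caught up, which this document does not spell out.

```typescript
// Hypothetical shape of one read: some data, the offset to ask for next,
// and a flag (an assumption) saying whether the reader has reached the tail.
interface ReplayChunk {
  data: string;
  nextOffset: string;
  upToDate: boolean;
}

// The transport is injected so the loop stays independent of fetch or SSE.
type ReplayReadFn = (offset: string) => Promise<ReplayChunk>;

async function replayFrom(
  read: ReplayReadFn,
  startOffset: string = "-1", // -1 means full history replay
): Promise<{ chunks: string[]; offset: string }> {
  const chunks: string[] = [];
  let offset = startOffset;
  for (;;) {
    const res = await read(offset);
    if (res.data.length > 0) chunks.push(res.data);
    // Offsets are opaque: store whatever the server returned, never compute one.
    offset = res.nextOffset;
    if (res.upToDate) return { chunks, offset };
  }
}
```

The returned `offset` is what a caller would persist and pass back in on the next read.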
Client package: @durable-streams/client, a typed fetch wrapper with automatic reconnection, backoff, and an SSE resilience fallback. It also includes IdempotentProducer, which provides epoch fencing and exactly-once writes.
Two API layers
Consumer layer — @arbe/core/client exports postMessage() and tailStream(). These go through the app’s permission-checked proxy at /api/streams/{scope_id}. The caller passes a scope ID (room UUID) and never sees the backing stream URL or secret. Use this from UI, bots, CLI, MCP tools.
Infrastructure layer — @arbe/streams/client exports createDurableStreamClient() with six methods: createStream, deleteStream, appendToStream, readStream, readStreamFrom, followStream. Talks to the raw stream service with a Bearer secret — no permission checks, no scope resolution. Use this from route handlers and the bot DO.
readStream fetches all items (optionally limited to the last N) — used by the push path and UI. readStreamFrom(path, offset) reads from a saved offset and returns { items, nextOffset } — used by the bot DO to incrementally catch up on each activation.
Stream lifecycle
Stream lifecycle is tied to the room lifecycle in packages/www/src/routes/api/mutate/+server.ts. On room insert, the handler creates a stream at path room/{recordId} and stores that path as durable_stream_id. On room delete, the handler deletes the backing stream. The consumer client API has no standalone create/delete: streams exist because rooms exist.
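The coupling between rooms and streams might look roughly like the sketch below. The `StreamAdmin` interface and the two handler functions are hypothetical. Only the method names createStream and deleteStream and the room/{recordId} path come from this document; their real signatures are assumed.

```typescript
// Hypothetical subset of the infrastructure client; signatures are assumed.
interface StreamAdmin {
  createStream(path: string): Promise<void>;
  deleteStream(path: string): Promise<void>;
}

// On room insert: create the backing stream and return the value that
// would be stored as durable_stream_id in the room record's content.
async function onRoomInsert(streams: StreamAdmin, recordId: string): Promise<string> {
  const path = `room/${recordId}`;
  await streams.createStream(path);
  return path;
}

// On room delete: remove the backing stream named by durable_stream_id.
async function onRoomDelete(streams: StreamAdmin, durableStreamId: string): Promise<void> {
  await streams.deleteStream(durableStreamId);
}
```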
Message format
Defined in packages/core/schemas/message.ts as a Zod discriminated union (StreamMessage).
A content message (type: "message") carries id, author_id, created_at, and body: { format: "markdown", text }. A system message (type: "system") carries event (agent_joined, agent_left, room_renamed) and meta. Both share the base envelope. The stream is append-only — no edits or deletes.
Use buildContentMessage(authorId, text) from @arbe/streams/messages to construct messages.
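For orientation, here is a plain-TypeScript approximation of the message union. It is a sketch, not the actual Zod schema: it assumes the shared envelope is id, author_id, and created_at, that created_at is an ISO 8601 string, and that meta is a free-form object. `buildContentMessageSketch` is a hypothetical stand-in for buildContentMessage that takes the ID and clock as parameters so the result is deterministic.

```typescript
// Assumed base envelope shared by both message kinds.
interface MessageEnvelope {
  id: string;
  author_id: string;
  created_at: string; // assumption: ISO 8601 timestamp
}

interface ContentMessageSketch extends MessageEnvelope {
  type: "message";
  body: { format: "markdown"; text: string };
}

interface SystemMessageSketch extends MessageEnvelope {
  type: "system";
  event: "agent_joined" | "agent_left" | "room_renamed";
  meta: Record<string, unknown>; // assumption: free-form metadata
}

// Discriminated on `type`, mirroring the Zod discriminated union.
type StreamMessageSketch = ContentMessageSketch | SystemMessageSketch;

// Hypothetical stand-in for buildContentMessage(authorId, text).
function buildContentMessageSketch(
  authorId: string,
  text: string,
  id: string,
  now: Date = new Date(),
): ContentMessageSketch {
  return {
    type: "message",
    id,
    author_id: authorId,
    created_at: now.toISOString(),
    body: { format: "markdown", text },
  };
}

// Switching on `type` narrows the union, so each branch sees only its own fields.
function renderLine(msg: StreamMessageSketch): string {
  switch (msg.type) {
    case "message":
      return `${msg.author_id}: ${msg.body.text}`;
    case "system":
      return `[${msg.event}]`;
  }
}
```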
Read and write paths
Browser
All access goes through the app proxy. sendMessage() in packages/www/src/lib/collections/messages.ts does an optimistic insert into a TanStack DB collection, then POSTs to /api/streams/{scope_id}. The proxy checks w permission, resolves durable_stream_id, forwards upstream, and triggers bot dispatch.
openMessages(scopeId) starts tailing via @durable-streams/client pointed at the same proxy URL. The proxy checks r permission and forwards protocol headers. Duplicates from optimistic inserts are skipped by ID.
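The skip-by-ID behaviour can be illustrated with a small in-memory model. This is not the TanStack DB collection itself; `TailMsg` and `MessageStore` are hypothetical names used only to show the deduplication rule.

```typescript
interface TailMsg {
  id: string;
  text: string;
}

// In-memory stand-in for the browser's message collection.
class MessageStore {
  private byId = new Map<string, TailMsg>();

  // Called by the sender before the POST completes.
  insertOptimistic(msg: TailMsg): void {
    this.byId.set(msg.id, msg);
  }

  // Called for every message arriving from the tail.
  // Returns false when the message is already present and was skipped.
  applyFromStream(msg: TailMsg): boolean {
    if (this.byId.has(msg.id)) return false;
    this.byId.set(msg.id, msg);
    return true;
  }

  list(): TailMsg[] {
    return Array.from(this.byId.values());
  }
}
```

Because the optimistic insert and the streamed copy carry the same ID, the sender's own message appears once, while messages from other authors are added as they arrive.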
Bots (via client)
Bots use createClient() with an Authorization header and agentId to auto-fill author_id on postMessage. tailStream() long-polls through the same proxy.
Bot Durable Object (direct)
HusAgent in packages/worker/src/agent.ts uses createDurableStreamClient with the raw stream URL and secret. On each push activation, handleStreamEvent() calls readStreamFrom() with the saved offset (or -1 on first visit) to read the delta since the last activation. Messages are cached in DO SQLite for LLM context. Replies go through appendToStream(). The DO does not tail the stream itself; it only reads when the app wakes it. As trusted infrastructure, it bypasses the app proxy.
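The incremental catch-up on each activation might look roughly like this. It is a sketch under assumptions, not the HusAgent code: `DeltaReader`, `OffsetStore`, and `catchUpOnActivation` are hypothetical, offsets are assumed to be strings, and a plain array stands in for the DO SQLite cache. Only readStreamFrom(path, offset) returning { items, nextOffset } comes from this document.

```typescript
interface DeltaResult<T> {
  items: T[];
  nextOffset: string;
}

// Assumed shape of the one infrastructure-client method used here.
interface DeltaReader<T> {
  readStreamFrom(path: string, offset: string): Promise<DeltaResult<T>>;
}

// Hypothetical persistence for the last-seen offset.
interface OffsetStore {
  get(): string | undefined;
  set(offset: string): void;
}

// Reads only what was appended since the previous activation.
async function catchUpOnActivation<T>(
  reader: DeltaReader<T>,
  offsets: OffsetStore,
  path: string,
  cache: T[], // stand-in for the DO SQLite message cache
): Promise<number> {
  const from = offsets.get() ?? "-1"; // first visit: replay full history
  const { items, nextOffset } = await reader.readStreamFrom(path, from);
  cache.push(...items);
  // Save the offset only after the items are cached, so a failed activation
  // re-reads the same delta instead of skipping it.
  offsets.set(nextOffset);
  return items.length;
}
```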
App proxy routes
| Route | Purpose |
|---|---|
| /api/streams/[scope_id] | Primary proxy. GET reads/tails, POST appends. Permission-checked, fires bot dispatch on write. |
| /api/streams/follow/[...path] | Live read proxy for browser subscriptions. |
| /api/streams | Admin endpoint (create/delete/append by path). Called by mutate handler, not end users. |
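The POST path through the primary proxy (check w permission, resolve durable_stream_id, forward upstream, trigger bot dispatch) can be sketched as an ordered sequence of steps. Everything named here is hypothetical: the `ProxyDeps` interface, `handleProxyPost`, and the 403/404/200 status codes are assumptions for illustration, not the actual route handler.

```typescript
// Hypothetical dependencies of the proxy; the real handler's shape is not shown here.
interface ProxyDeps {
  hasPermission(userId: string, scopeId: string, perm: "r" | "w"): Promise<boolean>;
  resolveStreamId(scopeId: string): Promise<string | undefined>;
  appendUpstream(streamId: string, body: string): Promise<void>;
  dispatchBots(scopeId: string): void;
}

// Returns an HTTP-style status code (the specific codes are assumptions).
async function handleProxyPost(
  deps: ProxyDeps,
  userId: string,
  scopeId: string,
  body: string,
): Promise<number> {
  // 1. The caller only ever supplies a scope ID; check write permission on it.
  if (!(await deps.hasPermission(userId, scopeId, "w"))) return 403;
  // 2. Resolve the backing stream from the room record.
  const streamId = await deps.resolveStreamId(scopeId);
  if (streamId === undefined) return 404;
  // 3. Forward the append to the raw stream service.
  await deps.appendUpstream(streamId, body);
  // 4. Wake any bots only after the write has succeeded.
  deps.dispatchBots(scopeId);
  return 200;
}
```

Keeping the permission check ahead of stream resolution means an unauthorized caller learns nothing about whether a backing stream exists.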
Key files
| File | What it does |
|---|---|
| @arbe/streams/client (shared package) | Infrastructure client: create, delete, append, read, follow |
| packages/core/client.ts | Consumer client: postMessage, tailStream |
| @arbe/streams/schemas/message (shared package) | ContentMessage, SystemMessage, StreamMessage Zod schemas |
| @arbe/streams/messages (shared package) | buildContentMessage helper |
| packages/www/src/routes/api/streams/[scope_id]/+server.ts | Permission-checked proxy (GET/POST) |
| packages/www/src/routes/api/streams/follow/[...path]/+server.ts | Live read proxy |
| packages/www/src/routes/api/streams/+server.ts | Admin stream management |
| packages/www/src/lib/collections/messages.ts | Browser: TanStack DB collection + tailing |
| packages/worker/src/agent.ts | Bot DO: direct stream read/write + LLM |