
Durable streams

Durable streams are append-only byte logs addressed by URL, hosted by Electric SQL. All message content lives in streams — messages never touch Postgres. Each room record stores a durable_stream_id in its content pointing to its backing stream.

Protocol

HTTP verbs on a path:

  • PUT /stream/{path} — create a stream
  • POST /stream/{path} — append (returns Stream-Next-Offset header)
  • GET /stream/{path}?offset=X — read from position X
  • GET /stream/{path}?offset=X&live=long-poll — tail in real-time

Offsets are opaque, monotonic, and tracked by the client; the server keeps no per-connection state. Historical reads are immutable byte ranges, so they are CDN-cacheable. Live tailing uses long-polling or SSE with reconnect cycles of roughly 60 seconds. An offset of -1 requests a full history replay.
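The verbs above map onto plain HTTP requests. The following is a minimal sketch, not the @durable-streams/client implementation: the helper names (readUrl, appendBytes), the base URL, and the injected HTTP function are hypothetical. Only the verbs, the offset and live query parameters, and the Stream-Next-Offset header come from the protocol description. Offsets are treated as strings here because they are opaque.

```typescript
// Minimal request/response shapes so the sketch does not depend on DOM typings.
interface HttpResponse {
  ok: boolean;
  status: number;
  headers: { get(name: string): string | null };
}
type HttpFn = (
  url: string,
  init: { method: string; body?: string },
) => Promise<HttpResponse>;

// Build a read URL. "-1" asks for a full history replay;
// live = true adds the long-poll tailing parameter.
function readUrl(base: string, path: string, offset: string, live = false): string {
  const query = `offset=${encodeURIComponent(offset)}${live ? "&live=long-poll" : ""}`;
  return `${base}/stream/${path}?${query}`;
}

// Append a payload and return the offset the server reports for the next read.
async function appendBytes(
  http: HttpFn,
  base: string,
  path: string,
  body: string,
): Promise<string | null> {
  const res = await http(`${base}/stream/${path}`, { method: "POST", body });
  if (!res.ok) throw new Error(`append failed with status ${res.status}`);
  return res.headers.get("Stream-Next-Offset");
}
```

Injecting the HTTP function keeps the sketch testable without a network; in a runtime with a global fetch, a thin wrapper around it could be passed instead.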

Client package: @durable-streams/client, a typed fetch wrapper with automatic reconnection, backoff, and SSE resilience fallback. It also includes IdempotentProducer, which provides epoch fencing and exactly-once writes.

Two API layers

Consumer layer: @arbe/core/client exports postMessage() and tailStream(). These go through the app’s permission-checked proxy at /api/streams/{scope_id}. The caller passes a scope ID (room UUID) and never sees the backing stream URL or secret. Use this from UI, bots, CLI, MCP tools.

Infrastructure layer: @arbe/streams/client exports createDurableStreamClient() with six methods: createStream, deleteStream, appendToStream, readStream, readStreamFrom, followStream. It talks to the raw stream service with a Bearer secret, with no permission checks and no scope resolution. Use this from route handlers and the bot DO.

readStream fetches all items (optionally limited to the last N) — used by the push path and UI. readStreamFrom(path, offset) reads from a saved offset and returns { items, nextOffset } — used by the bot DO to incrementally catch up on each activation.
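The incremental catch-up pattern described for readStreamFrom can be sketched as follows. The StreamReader and OffsetStore interfaces and the catchUp helper are hypothetical stand-ins; the only details taken from the text are the readStreamFrom(path, offset) call, its { items, nextOffset } result, and -1 as the first-visit offset. Offsets are assumed to be strings.

```typescript
// Result shape assumed from the description of readStreamFrom.
interface ReadResult<T> {
  items: T[];
  nextOffset: string;
}

// Hypothetical subset of the infrastructure client.
interface StreamReader<T> {
  readStreamFrom(path: string, offset: string): Promise<ReadResult<T>>;
}

// Hypothetical offset store; a real caller would persist this between activations.
interface OffsetStore {
  get(path: string): string | undefined;
  set(path: string, offset: string): void;
}

// Read only what was appended since the last call, then save the new position.
async function catchUp<T>(
  reader: StreamReader<T>,
  offsets: OffsetStore,
  path: string,
): Promise<T[]> {
  const from = offsets.get(path) ?? "-1"; // no saved offset: replay full history
  const { items, nextOffset } = await reader.readStreamFrom(path, from);
  offsets.set(path, nextOffset); // resume from here on the next activation
  return items;
}
```

A Map<string, string> satisfies OffsetStore for experiments; anything that survives restarts would work in its place.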

Stream lifecycle

Stream lifecycle is tied to room lifecycle in packages/www/src/routes/api/mutate/+server.ts. On room insert, the handler creates a stream at path room/{recordId} and stores it as durable_stream_id. On room delete, the handler deletes the backing stream. There is no standalone create/delete in the consumer client API; streams exist because rooms exist.
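A rough sketch of the insert and delete hooks, under stated assumptions: the StreamAdmin interface, the RoomRecord shape, and both handler names are hypothetical, and the sketch assumes the value stored in durable_stream_id is the stream path itself. The actual mutate handler may differ.

```typescript
// Hypothetical subset of the infrastructure client; real signatures may differ.
interface StreamAdmin {
  createStream(path: string): Promise<void>;
  deleteStream(path: string): Promise<void>;
}

// Assumed room shape: the stream pointer lives inside the record's content.
interface RoomRecord {
  id: string;
  content: { durable_stream_id?: string };
}

// On insert: create the backing stream and record its path on the room.
async function onRoomInsert(streams: StreamAdmin, room: RoomRecord): Promise<RoomRecord> {
  const path = `room/${room.id}`;
  await streams.createStream(path);
  return { ...room, content: { ...room.content, durable_stream_id: path } };
}

// On delete: remove the backing stream, if the room has one.
async function onRoomDelete(streams: StreamAdmin, room: RoomRecord): Promise<void> {
  const path = room.content.durable_stream_id;
  if (path) await streams.deleteStream(path);
}
```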

Message format

Defined in packages/core/schemas/message.ts as a Zod discriminated union (StreamMessage).

A content message (type: "message") carries id, author_id, created_at, and body: { format: "markdown", text }. A system message (type: "system") carries event (agent_joined, agent_left, room_renamed) and meta. Both share the base envelope. The stream is append-only — no edits or deletes.
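For orientation, the union can be mirrored in plain TypeScript types. This is a sketch, not the Zod schema itself: the type of created_at (assumed to be a string timestamp), the shape of meta, and the summarize helper are assumptions made for illustration.

```typescript
// Base envelope shared by both variants (field types are assumed).
interface Envelope {
  id: string;
  author_id: string;
  created_at: string;
}

interface ContentMessage extends Envelope {
  type: "message";
  body: { format: "markdown"; text: string };
}

type SystemEvent = "agent_joined" | "agent_left" | "room_renamed";

interface SystemMessage extends Envelope {
  type: "system";
  event: SystemEvent;
  meta: Record<string, unknown>;
}

// The "type" field is the discriminant, so a switch narrows each branch.
type StreamMessage = ContentMessage | SystemMessage;

// Hypothetical helper showing how a consumer might narrow on the discriminant.
function summarize(msg: StreamMessage): string {
  switch (msg.type) {
    case "message":
      return `${msg.author_id}: ${msg.body.text}`;
    case "system":
      return `[${msg.event}]`;
  }
}
```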

Use buildContentMessage(authorId, text) from @arbe/streams/messages to construct messages.

Read and write paths

Browser

All access goes through the app proxy. sendMessage() in packages/www/src/lib/collections/messages.ts does an optimistic insert into a TanStack DB collection, then POSTs to /api/streams/{scope_id}. The proxy checks w permission, resolves durable_stream_id, forwards upstream, and triggers bot dispatch.
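The proxy's write path (check w permission, resolve durable_stream_id, forward upstream, trigger bot dispatch) can be sketched with injected dependencies. Everything named here is hypothetical, including the ProxyDeps interface, the handlePost function, and the status codes returned; the real route handler may order or await these steps differently.

```typescript
// Hypothetical dependencies standing in for the app's real services.
interface ProxyDeps {
  hasPermission(userId: string, scopeId: string, perm: "r" | "w"): Promise<boolean>;
  resolveStreamId(scopeId: string): Promise<string | undefined>;
  appendUpstream(streamId: string, body: string): Promise<void>;
  dispatchBots(scopeId: string): Promise<void>;
}

// Returns an HTTP-style status code (the specific codes are assumptions).
async function handlePost(
  deps: ProxyDeps,
  userId: string,
  scopeId: string,
  body: string,
): Promise<number> {
  // 1. The caller must hold write permission on the scope.
  if (!(await deps.hasPermission(userId, scopeId, "w"))) return 403;

  // 2. Map the scope ID to its backing stream; the caller never sees this value.
  const streamId = await deps.resolveStreamId(scopeId);
  if (!streamId) return 404;

  // 3. Forward the append to the stream service.
  await deps.appendUpstream(streamId, body);

  // 4. Wake any bots in the room. Awaiting here is a simplification;
  //    a real handler might fire this without waiting.
  await deps.dispatchBots(scopeId);
  return 200;
}
```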

openMessages(scopeId) starts tailing via @durable-streams/client pointed at the same proxy URL. The proxy checks r permission and forwards protocol headers. Duplicates from optimistic inserts are skipped by ID.
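Skipping duplicates by ID can be illustrated with a plain Map standing in for the TanStack DB collection. The MessageList class and its method names are hypothetical; the sketch only shows the idea that a message already inserted optimistically is ignored when the same ID arrives from the tail.

```typescript
interface HasId {
  id: string;
}

// Hypothetical in-memory stand-in for the message collection.
class MessageList<T extends HasId> {
  private byId = new Map<string, T>();

  // Called before the POST completes, so the UI updates immediately.
  insertOptimistic(msg: T): void {
    this.byId.set(msg.id, msg);
  }

  // Called for each message arriving from the tail.
  // Returns false when the ID is already present and the message is skipped.
  applyFromStream(msg: T): boolean {
    if (this.byId.has(msg.id)) return false;
    this.byId.set(msg.id, msg);
    return true;
  }

  get size(): number {
    return this.byId.size;
  }
}
```

This relies on the client generating the message ID before sending, so the optimistic copy and the streamed copy share the same key.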

Bots (via client)

Bots use createClient() with an Authorization header and agentId to auto-fill author_id on postMessage. tailStream() long-polls through the same proxy.

Bot Durable Object (direct)

HusAgent in packages/worker/src/agent.ts uses createDurableStreamClient with the raw stream URL and secret. On each push activation, handleStreamEvent() calls readStreamFrom() with the saved offset (or -1 on first visit) to read the delta since last activation. Messages are cached in DO SQLite for LLM context. Replies go through appendToStream(). No self-tailing — the DO only reads when the app wakes it. Bypasses the app proxy — trusted infrastructure.

App proxy routes

| Route | Purpose |
| --- | --- |
| /api/streams/[scope_id] | Primary proxy. GET reads/tails, POST appends. Permission-checked, fires bot dispatch on write. |
| /api/streams/follow/[...path] | Live read proxy for browser subscriptions. |
| /api/streams | Admin endpoint (create/delete/append by path). Called by mutate handler, not end users. |

Key files

| File | What it does |
| --- | --- |
| @arbe/streams/client (shared package) | Infrastructure client: create, delete, append, read, follow |
| packages/core/client.ts | Consumer client: postMessage, tailStream |
| @arbe/streams/schemas/message (shared package) | ContentMessage, SystemMessage, StreamMessage Zod schemas |
| @arbe/streams/messages (shared package) | buildContentMessage helper |
| packages/www/src/routes/api/streams/[scope_id]/+server.ts | Permission-checked proxy (GET/POST) |
| packages/www/src/routes/api/streams/follow/[...path]/+server.ts | Live read proxy |
| packages/www/src/routes/api/streams/+server.ts | Admin stream management |
| packages/www/src/lib/collections/messages.ts | Browser: TanStack DB collection + tailing |
| packages/worker/src/agent.ts | Bot DO: direct stream read/write + LLM |