Streams

arbe persists every thread as a durable stream: the append-only product log for that thread. Durable Streams provide append-only byte storage; arbe adds a thread entity, a payload contract, a permission-checked proxy, and dispatch semantics.

Today arbe has one product stream family: arbe-thread-{threadId}. Each item is an ArbeStreamEntry<ArbeThreadPayload>: { id, ts, authorId?, payload }. id is the dedup key. ts is persisted event time in unix ms. authorId is the acting/authoring agent when there is one; pi-runtime and system-authored lifecycle entries may omit it. Payloads split into three groups:

  • chat — human/bot authored thread messages
  • pi.* — pi transcript events carried through from the runtime (pi.chunk, pi.assistant, pi.tool_result, pi.compaction)
  • signal.* — arbe-owned lifecycle and control facts (signal.thread.*, signal.house.*, signal.environment.*, signal.permission.*, signal.dispatch.*)
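The envelope and the three payload groups can be sketched as types. This is a minimal illustration, not the real schema: the field names id, ts, authorId, and payload come from the text above, while the `type` discriminator on payloads and the helpers `payloadGroup` and `dedupeById` are hypothetical names introduced here.

```typescript
// Envelope fields as described in the text.
interface ArbeStreamEntry<P> {
  id: string;        // dedup key
  ts: number;        // persisted event time, unix ms
  authorId?: string; // may be omitted on pi-runtime and system-authored entries
  payload: P;
}

// Assumption: payloads carry a string `type` such as "chat", "pi.chunk",
// or "signal.dispatch.failed". The real payload contract may differ.
interface SketchPayload {
  type: string;
}

type PayloadGroup = "chat" | "pi" | "signal" | "unknown";

// Classify a payload into one of the three groups by its type prefix.
function payloadGroup(payload: SketchPayload): PayloadGroup {
  if (payload.type === "chat") return "chat";
  if (payload.type.startsWith("pi.")) return "pi";
  if (payload.type.startsWith("signal.")) return "signal";
  return "unknown";
}

// Drop entries whose id was already seen, keeping the first in log order.
function dedupeById<P>(entries: ArbeStreamEntry<P>[]): ArbeStreamEntry<P>[] {
  const seen = new Set<string>();
  return entries.filter((entry) => {
    if (seen.has(entry.id)) return false;
    seen.add(entry.id);
    return true;
  });
}
```

A reader that replays a thread could run `dedupeById` before rendering, so a retried append with the same id shows up once.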

Canonical storage keeps durable facts, not adapter transcripts. OpenAI/Anthropic/pi user and assistant roles are projection concerns at runtime edges. Thread history should still render directly from canonical events: chat entries are content, signal.* entries narrate state, and failed fanout is recorded as signal.dispatch.failed instead of pretending the initial write rolled back.

Client boundary

@arbe/streams/client owns transport. It exposes two separate layers:

  • createDurableStreamClient(baseUrl, options) talks directly to Durable Streams paths with service auth. Route handlers use it with Bearer DURABLE_STREAMS_SECRET for create, delete, append, batch append, read, read-from-offset, and raw stream handles.
  • createScopedStreamClient(options) talks to arbe’s app-facing proxy where callers know a scope/thread id, not a stream path or service secret. It owns scope URL construction, append helpers, long-poll tailing, offset tracking, abort-aware waits, and durable-stream protocol headers.

Do not merge these APIs. The low-level client must not learn arbe permissions, record lookup, thread lifecycle, or scoped URL rules. The scoped client must not receive the durable-stream service secret. @arbe/core/client may expose convenience methods such as postMessage() and tailThreadStream(), but those delegate to @arbe/streams instead of reimplementing tail loops.

Proxy and offsets

Browsers, CLI, and JS clients read through /api/threads/:id/stream; the app checks membership and injects the durable-stream secret upstream. Offsets stay opaque: store them, but never parse or synthesize them (see durable streams for the rules and sentinels). Long-poll readers resume from stream-next-offset; stream-up-to-date says whether an empty response reached the tail.

createScopedStreamClient starts tailing from the caller’s fromOffset, retries transient long-poll failures, throws on auth failures, and races body reads against abort signals so CLI follow commands exit promptly.
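The resume behavior described above can be sketched as a small loop. This is not the createScopedStreamClient implementation. The query parameters `offset` and `live=long-poll`, the `"true"` value of stream-up-to-date, the retry limit, and the names `readUntilCaughtUp`, `PollFn`, and `StreamAuthError` are all assumptions made for illustration. The transport is injected so the sketch stays self-contained, and it returns once the tail is reached instead of following forever.

```typescript
// Minimal response shape so the sketch does not depend on DOM or Node types.
interface PollResponse {
  status: number;
  headers: { get(name: string): string | null };
  text(): Promise<string>;
}
type PollFn = (url: string) => Promise<PollResponse>;

class StreamAuthError extends Error {}

async function readUntilCaughtUp(
  poll: PollFn,
  threadId: string,
  fromOffset: string,
  onChunk: (body: string) => void,
  maxRetries = 3,
): Promise<string> {
  let offset = fromOffset; // opaque: passed back verbatim, never parsed
  let failures = 0;
  for (;;) {
    const url =
      `/api/threads/${encodeURIComponent(threadId)}/stream` +
      `?offset=${encodeURIComponent(offset)}&live=long-poll`;
    let res: PollResponse;
    try {
      res = await poll(url);
    } catch (err) {
      // Transport errors count as transient until the retry budget runs out.
      if (++failures > maxRetries) throw err;
      continue;
    }
    if (res.status === 401 || res.status === 403) {
      throw new StreamAuthError(`stream read rejected: ${res.status}`);
    }
    if (res.status >= 500) {
      if (++failures > maxRetries) throw new Error(`stream read failed: ${res.status}`);
      continue;
    }
    if (res.status >= 400) throw new Error(`stream read failed: ${res.status}`);
    failures = 0;
    const body = await res.text();
    if (body.length > 0) onChunk(body);
    // Resume from whatever offset the server hands back.
    const next = res.headers.get("stream-next-offset");
    if (next !== null) offset = next;
    if (res.headers.get("stream-up-to-date") === "true") return offset;
  }
}
```

The returned offset is the value a caller would store and pass as fromOffset on the next run.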

Event contract

Identity fields are semantic, not interchangeable:

  • authorId answers who authored content. Use it on chat/message content.
  • actorId answers who caused a workflow/system transition when the payload needs an explicit causal id.
  • agentId answers which agent record an event is about.

Current thread envelopes use authorId; many signal.* payloads rely on that envelope. If a future payload needs to distinguish author, actor, and subject inside the same event, add explicit payload fields rather than overloading authorId.

Rules:

  • keep thread history human-facing and directly renderable
  • keep transcript projection out of canonical storage
  • use concrete ids for pairing permission/tool/request/resume facts
  • keep pi payloads at the pi boundary; validate arbe-owned payloads with schemas
  • do not add generic actor envelopes or user.* / assistant.* core variants
  • do not add span.* unless the event is durable product history, not tracing exhaust

Dispatch interaction

POST /api/threads/:id/entries appends the triggering chat entry, then dispatch runs from that durable fact. In-process bot replies append pi.assistant authored by the bot and recurse. Env-bound dispatch streams pi JSONL into pi.* entries, then appends terminal signal.thread.status_changed / signal.dispatch.* facts. Substates such as queued, dispatching, streaming, reset, failed, or permission waiting belong on the stream; the threads.status column stays coarse.

Append-first matters: if post-write bot fanout fails, preserve the user message and append a failure signal. The log tells the truth.
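The append-first rule can be illustrated with a short sketch. It is not the route handler's code. The `append` and `fanout` callbacks, the `postEntry` name, and the `triggerId` and `reason` payload fields are hypothetical; only the ordering (write the triggering entry, then dispatch, then record a signal.dispatch.failed entry on failure) comes from the text above.

```typescript
interface SketchEntry {
  id: string;
  ts: number;
  authorId?: string;
  payload: { type: string; [key: string]: unknown };
}

async function postEntry(
  append: (entry: SketchEntry) => Promise<void>,
  fanout: (trigger: SketchEntry) => Promise<void>,
  trigger: SketchEntry,
  newId: () => string,
  now: () => number,
): Promise<void> {
  // 1. The triggering entry becomes a durable fact before dispatch runs.
  await append(trigger);
  try {
    // 2. Dispatch runs from that durable fact.
    await fanout(trigger);
  } catch (err) {
    // 3. On failure, keep the user message and append a failure signal
    //    instead of rolling back the first write.
    await append({
      id: newId(),
      ts: now(),
      payload: {
        type: "signal.dispatch.failed",
        triggerId: trigger.id, // hypothetical pairing field
        reason: err instanceof Error ? err.message : String(err),
      },
    });
  }
}
```

With this ordering, a reader replaying the log sees the user message followed by the failure, which matches what actually happened.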

Code: packages/streams/client.ts, packages/core/entries.ts, packages/core/schemas/stream-events/{envelope,thread}.ts, packages/core/schemas/thread.ts (threadStreamId), apps/www/src/routes/api/threads/[id]/stream/.
See durable streams, dispatch, threads, surfaces.