# Streams

arbe persists every [thread](./threads.md) as a [durable stream](./durable-streams.md) — the append-only product log for that thread. Durable Streams give append-only byte storage; arbe adds a thread entity, payload contract, permission-checked proxy, and dispatch semantics.

Today arbe has one product stream family: `arbe-thread-{threadId}`. Each item is an `ArbeStreamEntry<ArbeThreadPayload>`: `{ id, ts, authorId?, payload }`. `id` is the dedup key. `ts` is persisted event time in unix ms. `authorId` is the acting/authoring agent when there is one; pi-runtime and system-authored lifecycle entries may omit it. Payloads split into three groups:

- `chat` — human/bot authored thread messages
- `pi.*` — pi transcript events carried through from the runtime (`pi.chunk`, `pi.assistant`, `pi.tool_result`, `pi.compaction`)
- `signal.*` — arbe-owned lifecycle and control facts (`signal.thread.*`, `signal.house.*`, `signal.environment.*`, `signal.permission.*`, `signal.dispatch.*`)
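
The envelope and grouping can be sketched in TypeScript. This is a minimal illustration, not the schema in `packages/core/schemas/stream-events/`. It assumes each payload carries a string `type` discriminant (for example `"chat"` or `"signal.dispatch.failed"`). `payloadGroup` and `dedupeById` are hypothetical helpers.

```typescript
// Envelope as described above. Assumption: payloads carry a string `type`
// discriminant; the real schemas may be stricter.
type ArbeThreadPayload = { type: string; [field: string]: unknown };

type ArbeStreamEntry<P> = {
  id: string; // dedup key
  ts: number; // persisted event time, unix ms
  authorId?: string; // omitted by pi-runtime and system-authored lifecycle entries
  payload: P;
};

type PayloadGroup = "chat" | "pi" | "signal";

// Hypothetical helper: classify a payload type into one of the three groups.
function payloadGroup(type: string): PayloadGroup {
  if (type === "chat") return "chat";
  if (type.startsWith("pi.")) return "pi";
  if (type.startsWith("signal.")) return "signal";
  throw new Error(`unknown payload type: ${type}`);
}

// Hypothetical helper: since `id` is the dedup key, a consumer that receives
// the same entry twice keeps the first copy and preserves order.
function dedupeById<P>(entries: ArbeStreamEntry<P>[]): ArbeStreamEntry<P>[] {
  const seen = new Set<string>();
  return entries.filter((entry) => {
    if (seen.has(entry.id)) return false;
    seen.add(entry.id);
    return true;
  });
}
```

Throwing on an unknown type keeps `user.*` / `assistant.*` style variants from slipping in unnoticed.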

Canonical storage keeps durable facts, not adapter transcripts. OpenAI/Anthropic/pi `user` and `assistant` roles are projection concerns at runtime edges. Thread history should still render directly from canonical events: `chat` entries are content, `signal.*` entries narrate state, and failed fanout is recorded as `signal.dispatch.failed` instead of pretending the initial write rolled back.
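
A minimal sketch of the split between rendering and projection. It assumes `chat` and `pi.assistant` payloads both expose a `text` field (real pi payloads are likely richer) and that a `chat` entry counts as an assistant turn when its `authorId` matches the bot. `renderLine` and `projectForModel` are hypothetical names. The point is that roles are computed on the way out and never written back to the stream.

```typescript
type Entry = {
  id: string;
  ts: number;
  authorId?: string;
  payload: { type: string; text?: string }; // assumption: `text` on turn payloads
};

type RoleMessage = { role: "user" | "assistant"; content: string };

// Human-facing history renders straight from canonical entries.
function renderLine(entry: Entry): string | null {
  const { type, text } = entry.payload;
  if (type === "chat") return `${entry.authorId ?? "unknown"}: ${text ?? ""}`;
  if (type.startsWith("signal.")) return `[${type}]`; // state narration
  return null; // other pi.* rendering is out of scope for this sketch
}

const TURN_TYPES = new Set(["chat", "pi.assistant"]);

// Adapter edge: derive user/assistant roles for a model call. Nothing here is
// persisted, and signal.* entries are skipped because they are not turns.
function projectForModel(entries: Entry[], botId: string): RoleMessage[] {
  return entries
    .filter((e) => TURN_TYPES.has(e.payload.type))
    .map((e): RoleMessage => ({
      role: e.payload.type === "pi.assistant" || e.authorId === botId ? "assistant" : "user",
      content: e.payload.text ?? "",
    }));
}
```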

## Client boundary

`@arbe/streams/client` owns transport. It exposes two separate layers:

- `createDurableStreamClient(baseUrl, options)` talks directly to Durable Streams paths with service auth. Route handlers use it with `Bearer DURABLE_STREAMS_SECRET` for create, delete, append, batch append, read, read-from-offset, and raw stream handles.
- `createScopedStreamClient(options)` talks to arbe's app-facing proxy where callers know a scope/thread id, not a stream path or service secret. It owns scope URL construction, append helpers, long-poll tailing, offset tracking, abort-aware waits, and durable-stream protocol headers.

Do not merge these APIs. The low-level client must not learn arbe permissions, record lookup, thread lifecycle, or scoped URL rules. The scoped client must not receive the durable-stream service secret. `@arbe/core/client` may expose convenience methods such as `postMessage()` and `tailThreadStream()`, but those delegate to `@arbe/streams` instead of reimplementing tail loops.
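
To make the boundary concrete, here is a sketch of the request each layer would build. It is illustrative only. `threadStreamId` is a hypothetical re-implementation of the documented `arbe-thread-{threadId}` naming. The `{baseUrl}/{streamId}` path shape on the Durable Streams side and the `RequestSpec` type are assumptions. What matters is that only the service-side builder accepts the secret.

```typescript
type RequestSpec = { url: string; headers: Record<string, string> };

// Hypothetical re-implementation following the `arbe-thread-{threadId}` family.
function threadStreamId(threadId: string): string {
  return `arbe-thread-${threadId}`;
}

// Low-level layer: stream paths plus service auth. No permissions, no thread rules.
// Assumption: streams are addressed as `${baseUrl}/${streamId}`.
function durableStreamRequest(baseUrl: string, secret: string, streamId: string): RequestSpec {
  return {
    url: `${baseUrl.replace(/\/+$/, "")}/${encodeURIComponent(streamId)}`,
    headers: { Authorization: `Bearer ${secret}` },
  };
}

// Scoped layer: a thread id and the app proxy route. There is no secret
// parameter, so the service credential cannot flow through this path.
function scopedThreadStreamRequest(appUrl: string, threadId: string): RequestSpec {
  return {
    url: `${appUrl.replace(/\/+$/, "")}/api/threads/${encodeURIComponent(threadId)}/stream`,
    headers: {},
  };
}
```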

## Proxy and offsets

Browsers, CLI, and JS clients read through `/api/threads/:id/stream`; the app checks membership and injects the durable-stream secret upstream. Offsets stay opaque — store them, never parse or synthesize them (rules and sentinels in [durable streams](./durable-streams.md)). Long-poll readers resume from `stream-next-offset`; `stream-up-to-date` says whether an empty response reached the tail.
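
A sketch of the offset bookkeeping a long-poll reader does after each response. It stores `stream-next-offset` verbatim and reads `stream-up-to-date` as the tail marker. Two details are assumptions of this sketch: that the tail marker arrives as the literal `"true"`, and that the reader keeps its previous offset when the header is absent. Check both against the protocol rules in [durable streams](./durable-streams.md). `advanceCursor` is a hypothetical name.

```typescript
// Structural subset of the Fetch `Headers` interface, so a real Headers object
// (whose lookups are case-insensitive) or a simple stub can be passed in.
interface HeaderLookup {
  get(name: string): string | null;
}

type Cursor = { offset: string; upToDate: boolean };

function advanceCursor(previous: string, headers: HeaderLookup): Cursor {
  const next = headers.get("stream-next-offset");
  return {
    // Opaque: stored exactly as received, never parsed, compared, or incremented.
    // Assumption: if the header is absent, resume from the previous offset.
    offset: next ?? previous,
    // Assumption: the value is the literal "true" when the read reached the tail.
    upToDate: headers.get("stream-up-to-date") === "true",
  };
}
```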

`createScopedStreamClient` starts tailing from the caller's `fromOffset`, retries transient long-poll failures, throws on auth failures, and races body reads against abort signals so CLI follow commands exit promptly.
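
That tailing behavior can be sketched as a loop over an injected `poll` function. This is not `createScopedStreamClient`'s code. `PollResult`, `Poll`, `raceAbort`, `wait`, `classify`, and `tail` are hypothetical names. Treating 429 and 5xx as transient, and every other non-2xx status (including 401/403) as fatal, is an assumption.

```typescript
type PollResult = { status: number; items: string[]; nextOffset: string };
type Poll = (offset: string, signal: AbortSignal) => Promise<PollResult>;

// Settle with `work`, or reject as soon as `signal` aborts, whichever is first.
function raceAbort<T>(work: Promise<T>, signal: AbortSignal): Promise<T> {
  if (signal.aborted) return Promise.reject(new Error("aborted"));
  return new Promise<T>((resolve, reject) => {
    const onAbort = () => reject(new Error("aborted"));
    signal.addEventListener("abort", onAbort, { once: true });
    work.then(
      (value) => {
        signal.removeEventListener("abort", onAbort);
        resolve(value);
      },
      (error) => {
        signal.removeEventListener("abort", onAbort);
        reject(error);
      },
    );
  });
}

// Abort-aware wait: resolves after `ms`, or immediately when `signal` aborts.
function wait(ms: number, signal: AbortSignal): Promise<void> {
  return new Promise((resolve) => {
    if (signal.aborted) return resolve();
    const done = () => {
      clearTimeout(timer);
      signal.removeEventListener("abort", done);
      resolve();
    };
    const timer = setTimeout(done, ms);
    signal.addEventListener("abort", done, { once: true });
  });
}

// Assumption: this status split is illustrative, not arbe's exact rule.
function classify(status: number): "ok" | "retry" | "fatal" {
  if (status >= 200 && status < 300) return "ok";
  if (status === 429 || status >= 500) return "retry";
  return "fatal"; // includes 401/403 auth failures
}

async function tail(
  poll: Poll,
  fromOffset: string,
  signal: AbortSignal,
  onItems: (items: string[], offset: string) => void,
  retryDelayMs = 1000,
): Promise<void> {
  let offset = fromOffset;
  while (!signal.aborted) {
    let result: PollResult;
    try {
      result = await raceAbort(poll(offset, signal), signal);
    } catch {
      if (signal.aborted) return; // caller stopped following
      await wait(retryDelayMs, signal); // network error: treat as transient
      continue;
    }
    const outcome = classify(result.status);
    if (outcome === "fatal") throw new Error(`stream read failed: HTTP ${result.status}`);
    if (outcome === "retry") {
      await wait(retryDelayMs, signal);
      continue;
    }
    offset = result.nextOffset; // opaque, stored verbatim
    onItems(result.items, offset);
  }
}
```

Racing the poll against the signal matters because a hung long-poll body would otherwise hold a CLI follow command open until the server times out.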

## Event contract

Identity fields are semantic, not interchangeable:

- `authorId` answers who authored content. Use it on chat/message content.
- `actorId` answers who caused a workflow/system transition when the payload needs an explicit causal id.
- `agentId` answers which agent record an event is about.

Current thread envelopes use `authorId`; many `signal.*` payloads rely on that envelope. If a future payload needs to distinguish author, actor, and subject inside the same event, add explicit payload fields rather than overloading `authorId`.
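
A sketch of that rule, using a hypothetical `signal.house.agent_removed` payload that needs both a causal actor and a subject agent. The variant name, the `status` field on `signal.thread.status_changed`, and the `causedBy` helper are assumptions for illustration. Explicit payload fields carry the extra roles while the envelope's `authorId` keeps its single meaning.

```typescript
type Envelope<P> = { id: string; ts: number; authorId?: string; payload: P };

// Hypothetical payload: someone (actorId) removed an agent (agentId) from a house.
type AgentRemoved = {
  type: "signal.house.agent_removed";
  actorId: string; // who caused the transition
  agentId: string; // which agent record the event is about
};

// Payload that relies on the envelope; the `status` field is an assumption.
type StatusChanged = { type: "signal.thread.status_changed"; status: string };

type SignalPayload = AgentRemoved | StatusChanged;

// Causal attribution: prefer an explicit actorId, otherwise fall back to the
// envelope authorId that many signal payloads rely on today.
function causedBy(entry: Envelope<SignalPayload>): string | undefined {
  return "actorId" in entry.payload ? entry.payload.actorId : entry.authorId;
}
```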

Rules:

- keep thread history human-facing and directly renderable
- keep transcript projection out of canonical storage
- use concrete ids for pairing permission/tool/request/resume facts
- keep pi payloads at the pi boundary; validate arbe-owned payloads with schemas
- do not add generic actor envelopes or `user.*` / `assistant.*` core variants
- do not add `span.*` unless the event is durable product history, not tracing exhaust
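
Pairing by concrete ids keeps request and resolution facts unambiguous even when several are in flight. A sketch, assuming hypothetical `signal.permission.requested` and `signal.permission.resolved` variants that share a `requestId`; the real `signal.permission.*` names and fields may differ.

```typescript
type PermissionSignal =
  | { type: "signal.permission.requested"; requestId: string; tool: string }
  | { type: "signal.permission.resolved"; requestId: string; granted: boolean };

// Return request ids with no matching resolution, in the order they were asked.
// Resolutions are collected first, so arrival order between the two does not matter.
function pendingPermissions(payloads: PermissionSignal[]): string[] {
  const resolved = new Set(
    payloads.filter((p) => p.type === "signal.permission.resolved").map((p) => p.requestId),
  );
  return payloads
    .filter((p) => p.type === "signal.permission.requested" && !resolved.has(p.requestId))
    .map((p) => p.requestId);
}
```

Matching by position ("the most recent request") would break as soon as two requests overlap; an explicit id does not.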

## Dispatch interaction

`POST /api/threads/:id/entries` appends the triggering `chat` entry, then dispatch runs from that durable fact. In-process bot replies append `pi.assistant` authored by the bot and recurse. Env-bound dispatch streams pi JSONL into `pi.*` entries, then appends terminal `signal.thread.status_changed` / `signal.dispatch.*` facts. Substates such as queued, dispatching, streaming, reset, failed, or waiting on permission belong on the stream; the `threads.status` column stays coarse.

Append-first matters: if post-write bot fanout fails, preserve the user message and append a failure signal. The log tells the truth.
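
A minimal sketch of append-first. It assumes an `append` function that writes one entry atomically and a `fanout` step that runs bot dispatch. Both are hypothetical stand-ins for the route handler's real dependencies, and the `reason` field on the failure signal is an assumed shape.

```typescript
type Payload =
  | { type: "chat"; text: string }
  | { type: "signal.dispatch.failed"; reason: string }; // `reason` is an assumption

type Append = (payload: Payload, authorId?: string) => Promise<void>;

async function postThenDispatch(
  append: Append,
  fanout: () => Promise<void>,
  authorId: string,
  text: string,
): Promise<void> {
  // 1. Persist the triggering message. Assuming append is atomic, a throw here
  //    means nothing was written and the caller sees the error.
  await append({ type: "chat", text }, authorId);
  try {
    // 2. Dispatch runs from the durable fact.
    await fanout();
  } catch (err) {
    // 3. Never roll back the message; record the failure instead.
    const reason = err instanceof Error ? err.message : String(err);
    await append({ type: "signal.dispatch.failed", reason });
  }
}
```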

Code: `packages/streams/client.ts`, `packages/core/entries.ts`, `packages/core/schemas/stream-events/{envelope,thread}.ts`, `packages/core/schemas/thread.ts` (`threadStreamId`), `apps/www/src/routes/api/threads/[id]/stream/`.<br>
See [durable streams](./durable-streams.md), [dispatch](./dispatch.md), [threads](./threads.md), [surfaces](../surfaces.md).
