StreamDB — a reactive database in a Durable Stream

By James Arthur


StreamDB turns a Durable Stream into a reactive database.

You provide a Standard Schema and get type‑safe, multiplexed durable state with sync for your AI apps and agentic systems.

✨ Docs, demo and skills

Read the docs, see the example app and install skills for your coding agent.

Wiring up agent state

Agent sessions need state: tool calls, messages, active generations, user presence, agent registration. This state should be real-time and reactive, as well as persistent and forkable. In an ideal world it would be unified: you declare a schema and it all just works. Typed, synced and reactive.

In practice you end up wiring together a database, a pub/sub layer, custom sync logic, and retry handling. Every project is a bespoke stack. Coordination is hand-rolled. Type-safety is missing. State breaks on disconnect. Collaboration is an afterthought.

Enter StreamDB

StreamDB wraps a Durable Stream with a Standard Schema to give you typed, reactive collections. You define your entity types — messages, presence, agents, whatever your session needs — as Standard Schema objects.

StreamDB routes events by type into TanStack DB collections, with end-to-end type-safety from producer to consumer.

Multiplexing is built in. Multiple entity types coexist on a single stream, each routed to its own collection. The stream is the source of truth. Collections materialize state by applying events in order. Clients join from any offset and catch up.
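To make the routing concrete, here's a sketch. The event shape is an assumption for illustration (the actual Durable Streams envelope may differ), but the idea is the same: each event carries a type that determines its destination collection.

```typescript
// Hypothetical multiplexed event shape, assumed for illustration only.
type StreamEvent<T> = {
  headers: { type: string }
  value: T
}

// Route an event to a collection name based on its type header.
function routeEvent(event: StreamEvent<unknown>): string {
  switch (event.headers.type) {
    case 'message':
      return 'messages'
    case 'presence':
      return 'presence'
    case 'agent':
      return 'agents'
    default:
      return 'unknown'
  }
}

const event: StreamEvent<{ id: string; content: string }> = {
  headers: { type: 'message' },
  value: { id: 'm1', content: 'Hello' },
}
console.log(routeEvent(event)) // "messages"
```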

Reactive by default

The data comes into TanStack DB collections, materialized by a live query engine based on differential dataflow. When a new event arrives on the stream, only the affected data recalculates, giving you sub-millisecond reactivity.

Derived collections compose. You can filter messages, aggregate presence and join up your session data with your existing app data.

Durable by default

The underlying Durable Stream is persistent and addressable. State survives disconnects, refreshes, restarts. Clients resume from their last offset. There's no re-fetching, no state loss.

Everything is multi-user, multi-tab, multi-device and multi-agent out of the box, all reading and writing to the same stream.
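Resumability boils down to clients tracking the last offset they applied. A simplified sketch of the idea (the offset handling here is conceptual; the real protocol differs):

```typescript
// Conceptual sketch of offset-based resume. This is not the Durable Streams
// wire protocol, just the idea behind it.
type StreamEvent = { offset: number; data: string }

// A resuming client only asks for events after its last applied offset,
// so there is no re-fetching of state it already has.
function catchUp(log: StreamEvent[], lastApplied: number): StreamEvent[] {
  return log.filter((event) => event.offset > lastApplied)
}

const log: StreamEvent[] = [
  { offset: 1, data: 'a' },
  { offset: 2, data: 'b' },
  { offset: 3, data: 'c' },
]

// A client that had applied up to offset 2 only receives the third event.
console.log(catchUp(log, 2)) // [{ offset: 3, data: 'c' }]
```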

Using StreamDB

Define your schema

Start with Standard Schema objects for each entity type in your session. Combine them into a unified schema with createStateSchema — this maps each type to a collection with a primary key.

ts
import { z } from 'zod'
import { createStateSchema } from '@durable-streams/state'

const schema = createStateSchema({
  messages: {
    schema: z.object({
      id: z.string(),
      role: z.enum(['user', 'assistant', 'system']),
      content: z.string(),
      createdAt: z.string(),
    }),
    type: 'message',
    primaryKey: 'id',
  },
  presence: {
    schema: z.object({
      userId: z.string(),
      status: z.enum(['online', 'offline']),
    }),
    type: 'presence',
    primaryKey: 'userId',
  },
  agents: {
    schema: z.object({
      agentId: z.string(),
      name: z.string(),
      endpoint: z.string(),
    }),
    type: 'agent',
    primaryKey: 'agentId',
  },
})

Three entity types, one schema, one stream. Messages, presence, and agent registration multiplexed together with full type-safety.

Connect to a stream

createStreamDB connects your schema to a Durable Stream and gives you a reactive, stream-backed database.

ts
import { createStreamDB } from '@durable-streams/state'

const db = createStreamDB({
  streamOptions: {
    url: 'https://api.electric-sql.cloud/v1/streams/my-session',
    contentType: 'application/json',
  },
  state: schema,
})

await db.preload()

preload() reads from the beginning of the stream, materializes current state, and stays connected for live updates. From this point, db.collections.messages, db.collections.presence, and db.collections.agents are live TanStack DB collections.

Query reactively

Bind collections to your components with useLiveQuery — standard TanStack DB.

tsx
import { useLiveQuery } from '@tanstack/react-db'

function MessageList() {
  const { data: messages } = useLiveQuery((q) =>
    q
      .from({ msg: db.collections.messages })
      .orderBy(({ msg }) => msg.createdAt, 'asc')
  )

  return <List items={messages} />
}

Queries update incrementally via differential dataflow — no re-scanning, no re-rendering unaffected rows. Derived views compose naturally: filter, join, aggregate across collections.

tsx
import { eq, useLiveQuery } from '@tanstack/react-db'

function OnlineAgents() {
  // Join agent registrations with presence, keeping only online users
  const { data: agents } = useLiveQuery((q) =>
    q
      .from({ agent: db.collections.agents })
      .join(
        { presence: db.collections.presence },
        ({ agent, presence }) => eq(agent.agentId, presence.userId)
      )
      .where(({ presence }) => eq(presence.status, 'online'))
  )

  return <AgentList items={agents} />
}

Derive collections

Raw stream data often needs materializing into higher-level structures — token chunks into complete messages, for example. Derived collections let you do this declaratively, using live query pipelines that update incrementally.

tsx
import { createLiveQueryCollection, collect, count, minStr } from '@tanstack/db'

const messagesCollection = createLiveQueryCollection({
  query: (q) => {
    // Group chunks by messageId, collecting the raw rows
    const collected = q
      .from({ chunk: db.collections.chunks })
      .groupBy(({ chunk }) => chunk.messageId)
      .select(({ chunk }) => ({
        messageId: chunk.messageId,
        rows: collect(chunk),
        startedAt: minStr(chunk.createdAt),
        rowCount: count(chunk),
      }))

    // Materialize grouped chunks into messages
    return q
      .from({ collected })
      .orderBy(({ collected }) => collected.startedAt, 'asc')
      .fn.select(({ collected }) => materializeMessage(collected.rows))
  },
  getKey: (row) => row.id,
})

No for loops over client data. Differential dataflow means only changed chunks trigger recalculation. Derived collections are themselves TanStack DB collections. You can query, filter, and derive further from them.

tsx
// Derive pending tool call approvals from the materialized messages
const approvalsCollection = createLiveQueryCollection({
  query: (q) =>
    q
      .from({ message: messagesCollection })
      .fn.where(({ message }) =>
        message.parts.some(
          (p) => p.type === 'tool-call' &&
            p.approval?.needsApproval === true &&
            p.approval.approved === undefined
        )
      ),
  getKey: (row) => row.id,
})

Chunks sync on the stream. Messages materialize from chunks. Approvals derive from messages. Each layer is reactive, typed, and incremental.

Write with optimistic mutations

Writes go through TanStack DB's optimistic action system. Local state updates instantly, persistence happens async.

ts
const db = createStreamDB({
  streamOptions: { url: streamUrl, contentType: 'application/json' },
  state: schema,
  actions: ({ db, stream }) => ({
    sendMessage: {
      onMutate: (msg) => {
        db.collections.messages.insert(msg)
      },
      mutationFn: async (msg) => {
        const txid = crypto.randomUUID()
        const event = schema.messages.insert({ value: msg, headers: { txid } })
        await stream.append(JSON.stringify(event))
        await db.utils.awaitTxId(txid)
      },
    },
  }),
})

await db.actions.sendMessage({
  id: crypto.randomUUID(),
  role: 'user',
  content: 'Hello',
  createdAt: new Date().toISOString(),
})

onMutate inserts into the local collection immediately — the UI updates before the network round-trip. mutationFn appends to the Durable Stream with a transaction ID, then waits for it to sync back. If the write fails, TanStack DB rolls back the optimistic state automatically.

Getting started

See the StreamDB docs and the stream-db example app.

Let your agent do it

Our packages also ship with agent skills via TanStack Intent. Install the Intent system:

bash
npx @tanstack/intent install

Intent discovers the skills that are versioned with your dependency packages, so npm update updates the skills too. Your coding agent can then provision a Durable Stream on Electric Cloud and scaffold a StreamDB in one shot.

Ask it to "create a StreamDB for my agentic session". Your agent will provision the stream, generate a schema from your requirements and wire everything up for you.

Software factories can do this programmatically: spin up session state as part of an agent workflow, with no manual infrastructure needed.

What's underneath

StreamDB runs on Durable Streams, an open protocol for persistent, addressable, real-time streams designed to host agentic session data.

Building the Durable Sessions demo on Durable Streams is where this clicked for me. StreamDB eliminated the boilerplate, and derived collections let me materialize token chunks into message state. All incremental, all reactive, like living in the future.

Next steps

  • StreamDB docs
  • Deploy on Electric Cloud
  • Join our Discord

© 2026 Electric DB Inc. Released under the Apache 2.0 License.
