SemiLayerDocs

Realtime

Two primitives, one WebSocket, every insert/update/delete on your source.

  • subscribe — live tail of all matching rows. Yields every change to the lens, optionally filtered.
  • observe — single-record watch. First yield is the current state; every subsequent yield is an update.
// Live tail every change to products
for await (const event of beam.products.stream.subscribe()) {
  console.log(event.kind, event.record)
  // event.kind: 'insert' | 'update' | 'delete'
  // event.record: ProductsMetadata
}

// Watch one product
for await (const snapshot of beam.observe('products', 'p_7712')) {
  setProduct(snapshot) // ProductsMetadata — fires once with current state, then on every change
}

Both are AsyncIterables. Both ride the same shared WebSocket — one connection per environment, multiplexed across every live op.

How it reaches you

source row changes
       │
       ▼
ingest worker writes to vectors.live_tail_records + NOTIFY
       │
       ▼
ListenerPool (one pg.Client LISTENING per lens channel)
       │
       ▼
filter + hash dedup per subscriber
       │
       ▼
WebSocket 'event' frame  →  { kind, record } on your iterator

No polling. No long-poll fallback. One Postgres NOTIFY per ingested batch, fanned out by a connection pool on the service to every live subscriber.

When to use what

NeedReach for
Live tail of all changes (dashboards, activity feeds)subscribe
Single-record state that updates in place (detail page)observe
"New content" ticks for a ranked feed (no row payload)feed.<name>.subscribe
Progressive results as a big query returnsstream.query / stream.search (chunked, not live — see Pagination & streaming)

feed.subscribe is a different primitive — it yields FeedTickEvent counts, not row payloads. Use it when you want "new stuff" signals for a ranked list. Use subscribe when you want the row data itself.

Access rules

Live ops are gated by grants.stream:

products: {
  grants: {
    stream: {
      enabled: true,                        // default
      modes: ['chunked', 'live'],           // default — both
      maxLiveSubscriptions: 10,             // optional per-lens cap
    },
    search: 'public',                       // the function form of search grants
                                            // also governs subscribe/observe gating
  },
}
  • enabled: false — disables all streaming on this lens.
  • modes: ['chunked'] — turns off live ops (subscribe/observe/feed.subscribe) but keeps stream.search/stream.query.
  • maxLiveSubscriptions — per-lens cap on concurrent live subs. Combined with an org-level tier cap.

Filter on subscribe

subscribe accepts an optional filter (equality-only in v1):

for await (const event of beam.products.stream.subscribe({
  filter: { category: 'footwear', in_stock: true },
})) {
  // only emits when the record matches all filter keys
}

No $-operator grammar here (unlike query.where) — the filter runs in-process on every NOTIFY, so it's kept intentionally cheap. Complex predicates are v0.2.

No auto-reconnect (v1)

If the WebSocket drops — server restart, network blip, laptop sleep — the client iterator throws BeamStreamClosedError. You wrap in a retry:

while (true) {
  try {
    for await (const event of beam.products.stream.subscribe()) {
      handle(event)
    }
  } catch (err) {
    if (err instanceof BeamStreamClosedError) {
      await new Promise((r) => setTimeout(r, 1000))
      continue
    }
    throw err
  }
}

React's useSubscribe / useObserve hooks wrap this retry loop for you — see Subscribe.

Transparent reconnect is on the roadmap. When it ships, the pattern above will still work; the hook contract won't change.

Where to start

  • Subscribe — live tail, filter, useSubscribe.
  • Observe — single-record state, useObserve.