SemiLayerDocs

Realtime — Subscribe

Live tail every insert, update, and delete on a lens. One shared WebSocket, events arrive as they happen, optional in-process filter.

subscribeorders
orders: {
  source: 'main-db',
  table: 'public.orders',
  fields: {
    id:          { type: 'number', primaryKey: true },
    customer_id: { type: 'number' },
    status:      { type: 'enum',   values: ['pending', 'shipped', 'delivered', 'cancelled'] },
    total_cents: { type: 'number' },
    updated_at:  { type: 'date' },
  },
  grants: {
    search: 'authenticated',
    stream: {
      enabled: true,
      modes: ['live'],                //  only live — no chunked exports
      maxLiveSubscriptions: 25,
    },
  },
}

Signature

stream.subscribe<M = Record<string, unknown>>(
  lens: string,
  params?: SubscribeParams,
): AsyncIterable<StreamEvent<M>>

interface SubscribeParams {
  filter?: Record<string, unknown>    // equality-only; v1 limitation
}

interface StreamEvent<M> {
  kind:   'insert' | 'update' | 'delete'
  record: M                           // typed from your lens fields
}

for await through the iterator. break to stop; the client cleans up the op. The shared WebSocket stays open for other subscriptions.

Filter — what it matches

filter: {
  category: 'footwear',                 // exact string equality
  in_stock: true,                       // boolean
  tags:     ['new', 'sale'],            // array: length + element-wise equality
}
  • Scalars: strict equality (===).
  • Arrays: same length, same elements in the same order.
  • Missing fields on the record: treated as undefined, won't match anything.

Not supported in v1: $gte / $in / $or operators, nested object equality, predicate functions. If you need richer matching, filter on the client inside your loop. The server-side filter saves bandwidth when most events don't match; the client-side filter handles anything the server can't.

React hook

useSubscribe wraps the iterator, retry loop, and event accumulation:

import { useSubscribe } from '@semilayer/react'

export function ShipmentLog() {
  const { events, latest, error, reset } = useSubscribe<OrdersMetadata>(
    'orders',
    { filter: { status: 'shipped' } },
    { enabled: true, maxEvents: 500 },   // ring buffer — drops oldest
  )

  if (error) return <RetryCard onRetry={reset} />

  return (
    <ul>
      {events.map((e, i) => (
        <li key={i}>
          <span className={kindColor(e.kind)}>{e.kind}</span>
          {' '}
          {e.record.id} · {e.record.customer_id} · {e.record.total_cents / 100}
        </li>
      ))}
    </ul>
  )
}
  • events: StreamEvent<M>[] — ring-buffered to maxEvents (default 500).
  • latest — the most recent event (or null before anything arrives).
  • error — set when the iterator throws and retry hasn't kicked in yet.
  • reset() — clear the event list and force a fresh subscribe.

The hook holds the subscription open for the lifetime of the component. On unmount, it calls the iterator's return() to cleanly close the op.

Combining with a fetched snapshot

A common pattern: load the current list with query, then subscribe for changes on top of it.

const initial = await beam.orders.query({
  where:   { status: 'shipped' },
  orderBy: { field: 'updated_at', dir: 'desc' },
  limit:   50,
})

// events that arrive on this subscribe can be merged into `initial.rows`
// on 'insert' — prepend; on 'update' — replace by id; on 'delete' — remove
for await (const event of beam.orders.stream.subscribe({
  filter: { status: 'shipped' },
})) {
  setRows((rows) => applyEvent(rows, event))
}

useSubscribe leaves the merge strategy to you — different apps want different behaviors (insert to top vs. insert to match orderBy, overwrite vs. animate, etc.).

Limits

  • One WebSocket per environment. All subscribe / observe / stream.query / stream.search / feed.subscribe calls from the same client share it.
  • maxLiveSubscriptions per lens (from grants.stream.maxLiveSubscriptions). Exceed it → 4290 rate_limited close code.
  • Org tier cap on concurrent subs across all lenses. Exceed it → 4291 quotaExceeded.
  • Heartbeats: server pings every 25s. Client SDK auto-responds with pong. Two missed pongs → server closes with 1011.

Error handling

Close codeMeaningFix
4403grants.stream.enabled is false, or modes excludes 'live', or access rule deniedCheck grants.stream + the caller's identity
4290Per-lens maxLiveSubscriptions hitClose an old sub or raise the lens's cap
4291Org-wide tier cap hitUpgrade tier, or close unused subs
1011Server-initiated close (heartbeat timeout, restart)Retry with backoff

All of these surface as BeamStreamClosedError in your iterator. The error's .code field carries the close code so the retry loop can branch on it.