SemiLayer Docs

Ingest — CDC patterns

"How do I get my database's changes into SemiLayer?" Five patterns, one API shape. Pick the one that matches the data source you already run.

The common contract: your CDC pipeline collects { id, action } pairs and POSTs them to /v1/ingest/:lens. Batch them, debounce them, and retry on 429 — see Webhook ingest for the API.
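The "batch + POST + retry on 429" half of the contract can be sketched as one helper. This is a minimal sketch, not the official client: the URL, Bearer header, and `{ mode: 'records', changes }` body are copied from the Lambda example further down this page, while the name `postChanges`, the injected `fetchImpl`, the retry limits, and the use of a `Retry-After` header are assumptions made for illustration.

```typescript
// Hypothetical helper: POST a batch of changes, retrying only on HTTP 429.
type Change = { id: string; action: 'upsert' | 'delete' }

// Minimal structural type so the global fetch or a test double can be passed in.
type FetchLike = (
  url: string,
  init: { method: string; headers: Record<string, string>; body: string },
) => Promise<{ ok: boolean; status: number; headers: { get(name: string): string | null } }>

interface PostOptions {
  apiKey: string
  fetchImpl: FetchLike
  baseUrl?: string                        // defaults to the host used on this page
  maxAttempts?: number                    // assumed default: 5
  baseDelayMs?: number                    // assumed default: 500
  sleep?: (ms: number) => Promise<void>   // injectable so tests do not wait
}

async function postChanges(lens: string, changes: Change[], opts: PostOptions): Promise<void> {
  if (changes.length === 0) return
  const {
    apiKey,
    fetchImpl,
    baseUrl = 'https://api.semilayer.com',
    maxAttempts = 5,
    baseDelayMs = 500,
    sleep = (ms) => new Promise<void>((resolve) => setTimeout(resolve, ms)),
  } = opts

  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    const res = await fetchImpl(`${baseUrl}/v1/ingest/${encodeURIComponent(lens)}`, {
      method: 'POST',
      headers: { Authorization: `Bearer ${apiKey}`, 'Content-Type': 'application/json' },
      body: JSON.stringify({ mode: 'records', changes }),
    })
    if (res.ok) return
    // Anything other than 429 is not retried here; the caller decides what to do.
    if (res.status !== 429 || attempt === maxAttempts) {
      throw new Error(`ingest failed with HTTP ${res.status}`)
    }
    // Use Retry-After (seconds) if the server sends one, else back off exponentially.
    const retryAfter = Number(res.headers.get('retry-after'))
    const delayMs = retryAfter > 0 ? retryAfter * 1000 : baseDelayMs * 2 ** (attempt - 1)
    await sleep(delayMs)
  }
}
```

The `postToSemilayer(lens, changes)` calls in the examples below could be a thin wrapper around a helper like this, with `fetchImpl` bound to the global `fetch` and `apiKey` read from your environment.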

This page is about the upstream half — getting changes out of your database reliably.


Postgres — logical replication (preferred)

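The cleanest path. Create a replication slot, decode it with wal2json or pgoutput, and stream the changes to your worker. A publication is only needed for pgoutput; wal2json reads the slot directly and emits JSON.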

1. Enable logical replication

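# postgresql.conf (changing wal_level requires a server restart)
wal_level             = logical
max_replication_slots = 4
max_wal_senders       = 4

-- SQL
-- The publication is only read by pgoutput; wal2json decodes the slot directly.
CREATE PUBLICATION semilayer_products FOR TABLE products;
SELECT pg_create_logical_replication_slot('semilayer_products', 'wal2json');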

2. Stream the slot and batch

Node.js example using pg-logical-replication:

import { LogicalReplicationService, Wal2JsonPlugin } from 'pg-logical-replication'

const service = new LogicalReplicationService({
  connectionString: process.env.DATABASE_URL,
})

const plugin = new Wal2JsonPlugin({})
const batch: Array<{ id: string; action: 'upsert' | 'delete' }> = []

service.on('data', async (lsn, log) => {
  for (const change of (log as any).change) {
    // wal2json decodes every table in the database; keep only the watched one
    if (change.table !== 'products') continue
    if (change.kind === 'insert' || change.kind === 'update') {
      const id = change.columnvalues[change.columnnames.indexOf('id')]
      batch.push({ id: String(id), action: 'upsert' })
    } else if (change.kind === 'delete') {
      const id = change.oldkeys.keyvalues[change.oldkeys.keynames.indexOf('id')]
      batch.push({ id: String(id), action: 'delete' })
    }
  }
  if (batch.length >= 1000) await flush()
})

setInterval(flush, 2000)   // periodic flush: send whatever has accumulated at least every 2s

await service.subscribe(plugin, 'semilayer_products')

async function flush() {
  if (!batch.length) return
  const changes = batch.splice(0)        // take everything queued so far
  try {
    await postToSemilayer('products', changes)
  } catch (err) {
    batch.unshift(...changes)            // put the changes back so the next flush retries them
    console.error('flush failed, will retry', err)
  }
}

Why this is the default choice: deletes are explicit, ordering is exact, there's no "missed write" window. Works on every managed Postgres that exposes logical replication (RDS, Aurora, Supabase, Neon, Cloud SQL).

Caveats: requires wal_level=logical, which some heavily-customized Postgres setups don't allow. The replication slot holds WAL until your consumer catches up — if you stop your CDC worker, disk usage grows.
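One way to watch for the WAL-retention problem is to measure how far the slot's `restart_lsn` trails the current WAL position. The sketch below assumes it runs against the primary, that `query` is a thin wrapper around your Postgres client (for example `(sql, params) => pool.query(sql, params)` with node-postgres), and that the function names and the byte threshold are illustrative rather than part of SemiLayer.

```typescript
// Hypothetical monitor: how many bytes of WAL is a logical slot holding back?
type QueryFn = (sql: string, params: unknown[]) => Promise<{ rows: Array<Record<string, unknown>> }>

async function retainedWalBytes(query: QueryFn, slotName: string): Promise<number | null> {
  const { rows } = await query(
    `SELECT pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn) AS retained_bytes
       FROM pg_replication_slots
      WHERE slot_name = $1`,
    [slotName],
  )
  // No row means the slot does not exist; a NULL means it has not reserved WAL yet.
  if (rows.length === 0 || rows[0].retained_bytes == null) return null
  // node-postgres returns numeric values as strings, so convert explicitly.
  return Number(rows[0].retained_bytes)
}

async function slotStatus(
  query: QueryFn,
  slotName: string,
  maxRetainedBytes: number,   // assumed alert threshold, pick one that fits your disk
): Promise<'missing' | 'ok' | 'lagging'> {
  const bytes = await retainedWalBytes(query, slotName)
  if (bytes === null) return 'missing'
  return bytes > maxRetainedBytes ? 'lagging' : 'ok'
}
```

Calling `slotStatus(query, 'semilayer_products', 1024 ** 3)` on a schedule and alerting on `'lagging'` gives early warning before a stopped worker fills the disk.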


Postgres — triggers + tombstone table

If logical replication isn't available, install a trigger on each watched table that writes to a lightweight semilayer_changes log.

1. Create the log + trigger

CREATE TABLE semilayer_changes (
  id         BIGSERIAL PRIMARY KEY,
  lens       TEXT NOT NULL,
  row_id     TEXT NOT NULL,
  action     TEXT NOT NULL CHECK (action IN ('upsert', 'delete')),
  created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

CREATE OR REPLACE FUNCTION semilayer_emit_change() RETURNS TRIGGER AS $$
BEGIN
  IF (TG_OP = 'INSERT') OR (TG_OP = 'UPDATE') THEN
    INSERT INTO semilayer_changes (lens, row_id, action)
    VALUES (TG_ARGV[0], NEW.id::text, 'upsert');
    RETURN NEW;
  ELSIF (TG_OP = 'DELETE') THEN
    INSERT INTO semilayer_changes (lens, row_id, action)
    VALUES (TG_ARGV[0], OLD.id::text, 'delete');
    RETURN OLD;
  END IF;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER products_semilayer
AFTER INSERT OR UPDATE OR DELETE ON products
FOR EACH ROW EXECUTE FUNCTION semilayer_emit_change('products');

2. Drain in a worker

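// Assumes `pg` is a pg.Pool and `groupBy` is a helper such as lodash's groupBy
let draining = false

setInterval(async () => {
  if (draining) return                 // previous drain still running, skip this tick
  draining = true
  let client
  try {
    client = await pg.connect()
    await client.query('BEGIN')
    const { rows } = await client.query(`
      DELETE FROM semilayer_changes
      WHERE id IN (
        SELECT id FROM semilayer_changes
        ORDER BY id
        LIMIT 1000
        FOR UPDATE SKIP LOCKED
      )
      RETURNING id, lens, row_id, action
    `)
    rows.sort((a, b) => Number(a.id) - Number(b.id))   // RETURNING does not guarantee order

    const byLens = groupBy(rows, (r) => r.lens)
    for (const [lens, changes] of Object.entries(byLens)) {
      await postToSemilayer(lens, changes.map((c) => ({
        id: c.row_id,
        action: c.action,
      })))
    }
    await client.query('COMMIT')       // rows are deleted only after every POST succeeded
  } catch (err) {
    await client?.query('ROLLBACK').catch(() => {})
    console.error('drain failed, rows stay queued for the next tick', err)
  } finally {
    client?.release()
    draining = false
  }
}, 2000)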

Why triggers over polling the source table directly: the change log records each delete as its own row, so you see it. updated_at-based polling can't, because a deleted row simply disappears from the source table.

Caveats: the trigger runs inside every write transaction. On a write-heavy table, this adds a tiny but nonzero latency cost. The semilayer_changes table grows unless your worker drains it reliably — monitor its row count.
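Monitoring the backlog can be as small as one query against the `semilayer_changes` table defined above. The following is a sketch under assumptions: `query` wraps your Postgres client, and the function names and both thresholds are hypothetical values to tune for your write rate.

```typescript
// Hypothetical backlog check for the semilayer_changes log table.
type QueryFn = (sql: string) => Promise<{ rows: Array<Record<string, unknown>> }>

interface Backlog {
  pending: number            // rows waiting to be drained
  oldestAgeSeconds: number   // age of the oldest undrained row, 0 when empty
}

async function changeLogBacklog(query: QueryFn): Promise<Backlog> {
  const { rows } = await query(`
    SELECT count(*) AS pending,
           COALESCE(EXTRACT(EPOCH FROM now() - min(created_at)), 0) AS oldest_age_seconds
      FROM semilayer_changes
  `)
  // count(*) is a bigint and arrives as a string from node-postgres.
  return {
    pending: Number(rows[0].pending),
    oldestAgeSeconds: Number(rows[0].oldest_age_seconds),
  }
}

function isBacklogHealthy(
  backlog: Backlog,
  limits = { maxPending: 50000, maxAgeSeconds: 60 },   // assumed thresholds
): boolean {
  return backlog.pending <= limits.maxPending && backlog.oldestAgeSeconds <= limits.maxAgeSeconds
}
```

The age of the oldest row is often the more useful signal: a large count during a bulk import is normal, while an old row means the worker has stopped draining.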


MySQL — binlog via Debezium

MySQL's binlog is the equivalent of Postgres logical replication. The canonical reader is Debezium, which publishes changes as JSON to Kafka or (with the Debezium Server + HTTP sink) directly to a webhook.

Debezium Server config (application.properties)

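# Excerpt: a complete config also needs the database port and password, plus offset-storage and schema-history settings
debezium.source.connector.class=io.debezium.connector.mysql.MySqlConnector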
debezium.source.database.hostname=mysql.internal
debezium.source.database.user=debezium
debezium.source.database.server.id=1234
debezium.source.topic.prefix=products
debezium.source.database.include.list=shop
debezium.source.table.include.list=shop.products

# HTTP sink — point at a small adapter you run
debezium.sink.type=http
debezium.sink.http.url=http://cdc-adapter.internal/debezium

Adapter

Debezium emits one HTTP POST per change. Your adapter batches them:

// cdc-adapter — receives Debezium events, flushes to SemiLayer
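app.post('/debezium', (req, reply) => {
  // With schemas enabled the change sits under `payload`; otherwise the body is the change itself
  const event = req.body?.payload ?? req.body
  // Tombstones and other non-change events carry no `op`; acknowledge and skip them
  if (!event?.op) return reply.code(204).send()
  const { op, after, before } = event
  const action = op === 'd' ? 'delete' : 'upsert'
  const id = String((op === 'd' ? before : after).id)
  batch.push({ id, action })
  reply.code(204).send()
})

setInterval(flush, 2000)   // `batch` and `flush` are the same helpers as in the Postgres example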
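The adapter's `batch` and `flush` are not shown above. One possible shape is a small buffer that keeps only the latest action per id, so a burst of events for one row becomes a single change. This is a sketch under assumptions: the class name `ChangeBuffer` and its API are hypothetical, and collapsing repeated ids is a design choice made here, not a documented SemiLayer requirement.

```typescript
// Hypothetical buffer: coalesces changes by id (last action wins) and flushes in one call.
type Change = { id: string; action: 'upsert' | 'delete' }

class ChangeBuffer {
  private pending = new Map<string, Change>()
  private readonly send: (changes: Change[]) => Promise<void>
  private readonly maxSize: number

  constructor(send: (changes: Change[]) => Promise<void>, maxSize = 1000) {
    this.send = send
    this.maxSize = maxSize
  }

  get size(): number {
    return this.pending.size
  }

  // Queue a change; returns true when the buffer has reached maxSize and should be flushed.
  push(change: Change): boolean {
    this.pending.delete(change.id)          // re-insert so the newest event sits last
    this.pending.set(change.id, change)
    return this.pending.size >= this.maxSize
  }

  // Send everything queued so far; resolves to the number of changes sent.
  async flush(): Promise<number> {
    if (this.pending.size === 0) return 0
    const taken = this.pending
    this.pending = new Map()
    try {
      await this.send(Array.from(taken.values()))
      return taken.size
    } catch (err) {
      // Restore the unsent changes, letting anything queued during the send win.
      const merged = new Map(taken)
      this.pending.forEach((change, id) => {
        merged.delete(id)
        merged.set(id, change)
      })
      this.pending = merged
      throw err
    }
  }
}
```

In the adapter, the route handler would call `buffer.push(...)` and trigger `buffer.flush()` when it returns true, with a `setInterval` calling `flush()` as the time-based fallback.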

Why Debezium: battle-tested, handles MySQL's quirks (binlog format, DDL changes, GTID-based restarts). Pluggable into any CDC target.

Caveats: operationally heavier than the Postgres options — Debezium Server is a separate process you run. For a light setup, a handwritten mysql-binlog-connector-java wrapper is simpler.


AWS — DMS + Kinesis + Lambda

When your source lives in RDS and your stack is all-AWS, DMS into Kinesis is the shortest production path.

RDS Postgres/MySQL
       │
       │  AWS DMS (CDC task, Kinesis target)
       ▼
Kinesis Data Stream
       │
       │  Lambda trigger (batch size 100–1000)
       ▼
Lambda handler batches → POST /v1/ingest/products

DMS task

  • Source endpoint: RDS with CDC enabled (rds.logical_replication = 1 for Postgres).
  • Target endpoint: Kinesis Data Stream.
  • Task type: "Replicate data changes only" (CDC).
  • Table mappings: just the tables you care about.
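DMS takes its table mappings as a JSON document of selection rules. As a sketch, the helper below builds that document for a list of tables; the function name `tableMappings` is hypothetical, and the `public` schema in the usage note is an assumption about where `products` lives.

```typescript
// Hypothetical helper: build DMS table-mapping JSON that includes only the listed tables.
type TableRef = { schema: string; table: string }

function tableMappings(tables: TableRef[]): string {
  const rules = tables.map((t, i) => ({
    'rule-type': 'selection',
    'rule-id': String(i + 1),               // rule ids must be unique within the task
    'rule-name': `include-${t.schema}-${t.table}`,
    'object-locator': {
      'schema-name': t.schema,
      'table-name': t.table,
    },
    'rule-action': 'include',
  }))
  return JSON.stringify({ rules }, null, 2)
}
```

For example, `tableMappings([{ schema: 'public', table: 'products' }])` yields a single include rule that can be pasted into the task's JSON editor or passed to your infrastructure tooling.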

Lambda handler

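import type { KinesisStreamEvent } from 'aws-lambda'

export const handler = async (event: KinesisStreamEvent) => {
  const changes: Array<{ id: string; action: 'upsert' | 'delete' }> = []

  for (const record of event.Records) {
    const payload = JSON.parse(
      Buffer.from(record.kinesis.data, 'base64').toString('utf8'),
    )
    // DMS event shape: { data: {...}, metadata: { 'record-type': 'data' | 'control', operation: 'insert' | 'update' | 'delete', 'table-name': ... } }
    // Control records describe DDL (for example create-table) and carry no row data
    if (payload.metadata['record-type'] === 'control') continue
    const op = payload.metadata.operation
    const action = op === 'delete' ? 'delete' : 'upsert'
    const id = String(payload.data.id)
    changes.push({ id, action })
  }

  if (!changes.length) return

  const res = await fetch('https://api.semilayer.com/v1/ingest/products', {
    method: 'POST',
    headers: {
      'Authorization': `Bearer ${process.env.SEMILAYER_INGEST_KEY}`,
      'Content-Type':  'application/json',
    },
    body: JSON.stringify({ mode: 'records', changes }),
  })
  // Throwing makes Lambda retry the Kinesis batch instead of silently dropping it
  if (!res.ok) throw new Error(`SemiLayer ingest failed: HTTP ${res.status}`)
}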

The Lambda batch size sets the size of each batch sent to SemiLayer; 500–1000 is a good starting point.

Why this pattern: no servers to manage. DMS handles the source-side complexity (WAL retention, reconnects, DDL awareness). Lambda autoscales.

Caveats: DMS has cold-start overhead on CDC resumption after long pauses. For always-on workloads it's fine; for bursty workloads budget a few minutes of catch-up after quiet periods.


DIY — cron + updated_at polling

When CDC infra is overkill or unavailable. Your worker polls the source every N seconds for rows updated since last check and forwards them.

// Runs every 30 seconds
const cursor = await loadCursor('products')

// pg resolves to a result object; the rows live on its `rows` property
const { rows } = await pg.query(`
  SELECT id, updated_at
  FROM products
  WHERE updated_at > $1
  ORDER BY updated_at
  LIMIT 5000
`, [cursor])

if (rows.length) {
  await postToSemilayer('products', rows.map((r) => ({
    id: String(r.id),
    action: 'upsert',
  })))
  // Advance the cursor only after the POST succeeded
  await saveCursor('products', rows[rows.length - 1].updated_at)
}

Caveats — important:

  • Deletes are invisible. If your app supports deletes, add a deleted_at column and either (a) include it in the lens config as a filter field and soft-delete only, or (b) run a second reconciliation pass that compares source id sets to indexed id sets.
  • updated_at discipline required. Every write to every watched row must touch updated_at. Forgetting to bump it on a specific code path silently drops changes.
  • Polling burns work when the source is quiet. At a 30s cadence, a source with no changes still costs 2,880 empty round-trips a day. CDC is cheaper for anything above trivial scale.

Good enough for: prototypes, low-write internal tools, append-only data (where deletes don't happen). Not recommended for anything production-shaped.
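One more pitfall of a cursor on `updated_at` alone: when a page ends in the middle of a group of rows sharing the same timestamp, `updated_at > $1` skips the rest of that group on the next poll. A compound (updated_at, id) cursor avoids this. The sketch below assumes `query` wraps your Postgres client; the names `pollOnce` and `Cursor` are hypothetical. It compares ids as text so that it works for any id type, which you may want to replace with a native comparison so an index on (updated_at, id) can be used. Timestamps are carried as text to keep Postgres's microsecond precision, which a JavaScript Date would truncate.

```typescript
// Hypothetical keyset poller: pages through products by (updated_at, id) without skipping ties.
type Change = { id: string; action: 'upsert' | 'delete' }
type Cursor = { updatedAt: string; id: string }
type Row = { id: string; updated_at: string }
type QueryFn = (sql: string, params: unknown[]) => Promise<{ rows: Row[] }>

const POLL_SQL = `
  SELECT id::text AS id, updated_at::text AS updated_at
  FROM products
  WHERE (updated_at, id::text) > ($1::timestamptz, $2)
  ORDER BY updated_at, id::text
  LIMIT $3
`

// A cursor that sorts before every real row, for the very first poll.
const START: Cursor = { updatedAt: '-infinity', id: '' }

async function pollOnce(
  query: QueryFn,
  cursor: Cursor,
  limit = 5000,
): Promise<{ changes: Change[]; cursor: Cursor }> {
  const { rows } = await query(POLL_SQL, [cursor.updatedAt, cursor.id, limit])
  if (rows.length === 0) return { changes: [], cursor }   // nothing new, keep the cursor
  const last = rows[rows.length - 1]
  return {
    changes: rows.map((r) => ({ id: r.id, action: 'upsert' as const })),
    cursor: { updatedAt: last.updated_at, id: last.id },
  }
}
```

As in the loop above, persist the returned cursor only after the POST to SemiLayer has succeeded, so a failed request is retried from the same position.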


Choosing

Your stack                                      Recommended
Postgres (any flavor with wal_level=logical)    Logical replication + your own worker
Postgres, can't touch wal_level                 Triggers + tombstone table
MySQL                                           Debezium
RDS + AWS all the way                           DMS → Kinesis → Lambda
No CDC, low write rate, append-only             syncInterval on the lens (see Keeping data fresh)
No CDC, need deletes handled automatically      smartSyncInterval: '24h', a scheduled full scan with tombstone detection and no webhook code to write (see Keeping data fresh)
No CDC, need deletes and precise timing         Triggers even on MySQL: AFTER DELETE into a changes table

The webhook endpoint doesn't care where the changes came from. Pick the upstream that matches what you already run; the integration is the same 20 lines of "batch + POST + retry on 429."

Next: Troubleshooting — what to do when changes aren't landing.