Node.js

@kurrent/projections-testing runs projections from inside your existing test suite on the same engine as gaffer and KurrentDB. Works with vitest, jest, mocha, or any node test runner.

npm install --save-dev @kurrent/projections-testing

@kurrent/kurrentdb-client is a peer dependency, needed only when subscribing to a live KurrentDB cluster from a test.

  • Node.js 22 or later, enforced through the package’s engines field.
  • ESM. The package is ESM-only, so your project needs "type": "module" in package.json.
  • TypeScript 5.2 or later if you use TypeScript. The API relies on using (explicit resource management) for automatic session cleanup.
  • A tsconfig.json with at least:
    • "target": "ES2022" or later.
    • "lib": ["ES2022", "ESNext.Disposable"]. The ESNext.Disposable entry is required for using test = projection.test(); without it TypeScript fails with a cryptic error about Symbol.dispose not existing rather than a missing-lib hint.
    • "moduleResolution": "Node16", "NodeNext", or "Bundler".
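Taken together, the requirements above correspond to a tsconfig.json roughly like the following. This is a minimal sketch, not the package's recommended file: the "module": "NodeNext" entry is an assumption added here because TypeScript expects a matching module setting when moduleResolution is NodeNext.

```json
{
  "compilerOptions": {
    "target": "ES2022",
    "lib": ["ES2022", "ESNext.Disposable"],
    "module": "NodeNext",
    "moduleResolution": "NodeNext"
  }
}
```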

No special test-runner configuration is needed; it works with vitest, jest, mocha, or node --test as-is.

Given a projection that counts orders and sums their value:

projections/order-count.js
fromAll().when({
  $init: () => ({ count: 0, totalCents: 0 }),
  OrderPlaced: (state, event) => ({
    count: state.count + 1,
    totalCents: state.totalCents + event.body.cents,
  }),
});

Load its source and run events through it:

import { createProjection } from "@kurrent/projections-testing";
import { readFile } from "fs/promises";

const source = await readFile("./projections/order-count.js", "utf8");
const projection = createProjection<{ count: number; totalCents: number }>(
  source,
  { engineVersion: 2 },
);

for (const result of projection.run([
  {
    eventType: "OrderPlaced",
    streamId: "order-1",
    sequenceNumber: 0,
    isJson: true,
    data: { cents: 2999 },
  },
  {
    eventType: "OrderPlaced",
    streamId: "order-2",
    sequenceNumber: 1,
    isJson: true,
    data: { cents: 4999 },
  },
])) {
  if (result.status !== "processed") continue;
  // Logs { count: 1, totalCents: 2999 }, then { count: 2, totalCents: 7998 }
  console.log(result.state);
}

createProjection(source, options) creates a projection from JavaScript source. The projection compiles lazily, on the first validate(), run(), or test() call.

Options:

  • engineVersion: 1 or 2. Required.
  • quirksVersion: target KurrentDB version ("MAJOR.MINOR.PATCH", e.g. "26.1.0"). Unset (the default) reproduces every known engine quirk; set a version to turn off quirks fixed upstream as of that version.
  • config: per-projection settings.
    • executionTimeoutMs: max handler execution time per event in ms (default 5000).
  • databaseConfig: database-wide settings.
    • compilationTimeoutMs: max compile time in ms (default 5000).
    • executionTimeoutMs: default max handler execution time in ms (default 5000).
    • maxStateSizeBytes: max serialized projection state size in bytes (default 16 MiB).
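The options above can be pictured as the following shape. This is a hedged sketch: ProjectionOptionsSketch and effectiveExecutionTimeoutMs are hypothetical names defined locally, not exports of the package, and the precedence shown (per-projection value first, then the database-wide default, then 5000) is an assumption read off the word "default" in the list.

```typescript
// Local stand-in mirroring the documented option names; the package's own
// exported types are not shown in this document and may differ.
interface ProjectionOptionsSketch {
  engineVersion: 1 | 2;
  quirksVersion?: string;
  config?: { executionTimeoutMs?: number };
  databaseConfig?: {
    compilationTimeoutMs?: number;
    executionTimeoutMs?: number;
    maxStateSizeBytes?: number;
  };
}

// Assumed precedence: per-projection setting, then database default, then 5000 ms.
function effectiveExecutionTimeoutMs(options: ProjectionOptionsSketch): number {
  return (
    options.config?.executionTimeoutMs ??
    options.databaseConfig?.executionTimeoutMs ??
    5000
  );
}

const strictOptions: ProjectionOptionsSketch = {
  engineVersion: 2,
  quirksVersion: "26.1.0",
  config: { executionTimeoutMs: 250 },
  databaseConfig: { maxStateSizeBytes: 1024 * 1024 },
};
```

An object like strictOptions would be passed as the second argument to createProjection.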

validate() compiles the projection and returns its source definition. It throws if the source is invalid.

const info = projection.validate();
console.log(info.source); // { type: "all" } or { type: "categories", categories: ["order"] }
console.log(info.events); // ["OrderPlaced"] or "all"

run() runs the projection over events, yielding a StepResult after each one. It accepts:

  • Iterable<EventInput> - arrays, generators.
  • AsyncIterable<EventInput> - async generators, client streams.
  • KurrentDBClient - subscribes to the appropriate streams based on the projection’s declared source.
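Because any iterable is accepted, test events can come from a generator instead of a hand-written array. The sketch below assumes the manual event shape used elsewhere on this page; ManualOrderEvent and orderPlacedEvents are hypothetical local names, not part of the package.

```typescript
// Local stand-in for the manual test event shape described in this document.
interface ManualOrderEvent {
  eventType: string;
  streamId: string;
  sequenceNumber: number;
  isJson: true;
  data: { cents: number };
}

// Lazily yields one OrderPlaced event per amount, numbering them from 0.
function* orderPlacedEvents(amounts: number[]): Generator<ManualOrderEvent> {
  let sequenceNumber = 0;
  for (const cents of amounts) {
    yield {
      eventType: "OrderPlaced",
      streamId: `order-${sequenceNumber + 1}`,
      sequenceNumber,
      isJson: true,
      data: { cents },
    };
    sequenceNumber += 1;
  }
}

// Materialized here only to show what the generator produces.
const generated = Array.from(orderPlacedEvents([2999, 4999]));
```

In a test, the generator itself would be handed over directly, for example projection.run(orderPlacedEvents([2999, 4999])).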

StepResult is a discriminated union on status (see Step results for the full field list). Guard before destructuring:

for (const result of projection.run(events)) {
  if (result.status !== "processed") continue;
  // This runs once per processed event, so assert something that holds at every step.
  expect(result.state.count).toBeGreaterThan(0);
}

Async and live-client paths look the same:

// Async
for await (const result of projection.run(asyncEvents)) {
  if (result.status === "processed") {
    /* ... */
  }
}

Pass a KurrentDBClient and the projection subscribes to the streams its source declares (fromAll, fromCategory("order"), etc.). The subscription is unbounded - break out of the loop when an assertion holds or a fixed number of events have flowed through.

import { KurrentDBClient, jsonEvent } from "@kurrent/kurrentdb-client";
import { createProjection } from "@kurrent/projections-testing";
import { readFile } from "fs/promises";

const client = KurrentDBClient.connectionString(
  "kurrentdb://localhost:2113?tls=false",
);

// Seed some events
await client.appendToStream("order-1", [
  jsonEvent({ type: "OrderPlaced", data: { cents: 2999 } }),
  jsonEvent({ type: "OrderPlaced", data: { cents: 4999 } }),
]);

const source = await readFile("./projections/order-count.js", "utf8");
const projection = createProjection<{ count: number; totalCents: number }>(
  source,
  { engineVersion: 2 },
);

let final;
for await (const result of projection.run(client)) {
  if (result.status !== "processed") continue;
  final = result.state;
  if (final.count >= 2) break;
}
expect(final).toEqual({ count: 2, totalCents: 7998 });
await client.dispose();

Breaking out of the loop disposes the subscription cleanly.
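The cleanup-on-break behavior follows from how for await treats async iterators in general: leaving the loop early calls the iterator's return() method, which runs any finally block in the producer. The sketch below illustrates this with a stand-in source instead of a live subscription; endlessCounter and lastUntil are hypothetical helpers, and the event cap is an added safety net so a failing condition cannot hang the test.

```typescript
// Stand-in for an unbounded subscription: yields forever until the consumer
// stops, then runs its cleanup in `finally`.
let cleanedUp = false;
async function* endlessCounter(): AsyncGenerator<number> {
  let n = 0;
  try {
    while (true) yield ++n;
  } finally {
    cleanedUp = true; // a real subscription would release its resources here
  }
}

// Consumes the source until `done` holds or `maxEvents` values have been seen,
// and returns the last value observed.
async function lastUntil<T>(
  source: AsyncIterable<T>,
  done: (value: T) => boolean,
  maxEvents: number,
): Promise<T | undefined> {
  let last: T | undefined;
  let seen = 0;
  for await (const value of source) {
    last = value;
    seen += 1;
    if (done(value) || seen >= maxEvents) break; // break triggers source cleanup
  }
  return last;
}
```

The same helper shape could wrap projection.run(client), with done checking the state carried by each processed result.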

test() opens an interactive test session for feeding events one at a time. Use it when you want to assert against intermediate state rather than only the final aggregate.

const test = projection.test();
const step = test.feed({
  eventType: "OrderPlaced",
  streamId: "order-1",
  sequenceNumber: 0,
  isJson: true,
  data: { cents: 2999 },
});
if (step.status !== "processed") {
  throw new Error(`expected processed, got ${step.status}: ${step.reason}`);
}
expect(step.state).toEqual({ count: 1, totalCents: 2999 });
expect(step.emitted).toHaveLength(0);
expect(step.logs).toEqual([]);
test.dispose(); // or `using test = projection.test()` for auto-cleanup
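Where using is not available (for example, an older TypeScript version), a try/finally wrapper gives the same guarantee that the session is disposed even when an assertion throws. This is a minimal sketch under that assumption; withDisposable and DisposableLike are hypothetical local names, and the only thing assumed about a session is the dispose() method shown above.

```typescript
// Minimal stand-in for anything with a dispose() method, such as a test session.
interface DisposableLike {
  dispose(): void;
}

// Runs `body` with a freshly opened resource and always disposes it afterwards,
// even when an assertion inside `body` throws.
function withDisposable<R extends DisposableLike, T>(
  open: () => R,
  body: (resource: R) => T,
): T {
  const resource = open();
  try {
    return body(resource);
  } finally {
    resource.dispose();
  }
}
```

In a test this would read withDisposable(() => projection.test(), (test) => { /* feed and assert */ }).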

For projections that partition state (foreachStream, partitionBy), inspect each partition:

test.getState("order-1"); // state for the order-1 partition
test.getStateRaw("order-1"); // raw persisted state JSON, before parse (see Raw state and diagnostics)
test.getState("order-2"); // state for the order-2 partition
test.getSharedState(); // shared state (biState projections)
test.getResult("order-1"); // result for order-1 (V1: post-transform, V2: post-handler state)

Some KurrentDB quirks only surface in how state is persisted, and state / getState() hide them by parsing the persisted JSON on read (see also State serialization).

  • step.diagnostics lists the quirks encountered while processing the event (empty when none; it can carry more than one, and the same code can repeat). The motivating case is quirk.serialize.rawString: a bare string state that isn’t valid JSON is persisted un-encoded, so the projection would fault when it reloads (JSON.parse can’t read it). Non-persistence quirks appear here too, such as quirk.log.multiParam fired at each multi-argument log() call.
  • step.stateRaw and getStateRaw(partition?) return the persisted state JSON string before JSON.parse, so you can assert the exact persisted value.

// A projection whose handler returns a bare string as its state.
const test = createProjection(`fromAll().when({ Set: (s, e) => e.body.name })`, {
  engineVersion: 2,
}).test();
const step = test.feed({
  eventType: "Set",
  streamId: "s-1",
  sequenceNumber: 0,
  isJson: true,
  data: { name: "alice" },
});
if (step.status !== "processed") throw new Error(step.reason);
expect(step.state).toBe("alice"); // parsed
expect(step.stateRaw).toBe("alice"); // persisted un-encoded: no surrounding JSON quotes
expect(step.diagnostics.map((d) => d.code)).toContain(
  "quirk.serialize.rawString",
);
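The reload failure behind quirk.serialize.rawString comes down to plain JSON.parse behavior, which can be checked without the engine. In this sketch, reloadsAsJson is a hypothetical helper introduced only for illustration.

```typescript
// Returns true when a persisted state string can be read back with JSON.parse.
function reloadsAsJson(raw: string): boolean {
  try {
    JSON.parse(raw);
    return true;
  } catch {
    return false;
  }
}

// A JSON-encoded string carries its own quotes inside the persisted text.
const encoded = JSON.stringify("alice");
// The un-encoded form the quirk describes: the bare characters, no quotes.
const bare = "alice";

const encodedReloads = reloadsAsJson(encoded);
const bareReloads = reloadsAsJson(bare);
```

Here encodedReloads is true and bareReloads is false, which is why asserting on stateRaw rather than state is what exposes the problem.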

systemEvents provides helpers for constructing the system events KurrentDB emits (stream deletions, etc.):

import { systemEvents } from "@kurrent/projections-testing";
test.feed(systemEvents.streamDeleted("order-1", 5));

run() and test().feed() both yield a StepResult, a discriminated union on status. Narrow on status before reading the processed-only fields.

A processed result (the handler ran) carries:

| Field | Description |
| --- | --- |
| state | Parsed projection state for the affected partition. |
| stateRaw | The persisted state JSON string before JSON.parse, or null when no state was produced. See State serialization. |
| result | The partition’s result. V1: state after transformBy/filterBy, or state if no transform applies. V2: post-handler state; transforms are not invoked. |
| sharedState | Shared state for biState projections; undefined otherwise. |
| partition | The partition key that was updated. Absent for unpartitioned projections. |
| event | The input event, round-tripped verbatim, so you can assert against it. |
| emitted | Events emitted during processing (emit / linkTo). |
| logs | Messages from log() calls. |
| diagnostics | Quirks that fired while processing this event, empty when none. See State serialization. |

A skipped result (the event never reached the handler) carries the same event plus a reason: unhandled, non-json, link, no-partition, no-delete-handler, or wrong-stream.
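Narrowing on status can be wrapped in a small helper so each test does not repeat the guard. The types below are local stand-ins that copy only a few of the documented fields; the "skipped" status literal and all type names are assumptions made for this sketch, since the package's exported types are not shown here.

```typescript
// Local stand-ins for the two result shapes described above.
interface ProcessedStepSketch<S> {
  status: "processed";
  state: S;
  stateRaw: string | null;
  logs: string[];
}
interface SkippedStepSketch {
  status: "skipped"; // assumed literal; the document only names "processed"
  reason: string;
}
type StepSketch<S> = ProcessedStepSketch<S> | SkippedStepSketch;

// Narrows to the processed shape, or fails with the skip reason.
function expectProcessed<S>(step: StepSketch<S>): ProcessedStepSketch<S> {
  if (step.status !== "processed") {
    throw new Error(`expected processed, got ${step.status}: ${step.reason}`);
  }
  return step;
}
```

After the call, the returned value exposes state, stateRaw, and logs without further checks, because the compiler has ruled out the skipped shape.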

Projection state is persisted as JSON by the same engine KurrentDB uses, so a few JavaScript values serialize in ways worth knowing when you assert against state:

  • BigInt serializes to a decimal string: 10n persists as "10", matching KurrentDB.
  • undefined object properties are dropped, exactly like JSON.stringify; in an array position undefined becomes null.
  • NaN and Infinity throw a StateSerializationError, because KurrentDB rejects them (the quirk.serialize.nonFinite quirk).
  • Deeply nested, circular, or oversized state throws a StateSerializationError: state nested more than 64 levels deep, a value containing a circular reference (s.self = s), an array with more than ~2 billion elements, or serialized state exceeding maxStateSizeBytes (default 16 MiB).

state and getState() hide the persisted form by parsing it on read. To assert against the raw JSON (including quirks like a bare non-JSON string state persisted un-encoded), read stateRaw / getStateRaw() and inspect diagnostics (see Raw state and diagnostics).
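Several of the serialization rules above can be approximated with JSON.stringify and a replacer, which is handy for predicting what a state value will look like once persisted. This is a rough sketch, not the engine: previewPersistedState is a hypothetical helper, it throws RangeError and TypeError where the real engine throws StateSerializationError, and it does not model the depth or size limits.

```typescript
// Approximates the documented rules with JSON.stringify; not the real engine.
function previewPersistedState(state: unknown): string {
  return JSON.stringify(state, (_key, value) => {
    // BigInt is written as a decimal string, e.g. 10n becomes "10".
    if (typeof value === "bigint") return value.toString();
    // NaN and Infinity are rejected instead of silently becoming null.
    if (typeof value === "number" && !Number.isFinite(value)) {
      throw new RangeError(`non-finite number in state: ${value}`);
    }
    // undefined passes through: dropped in objects, null in arrays.
    return value;
  });
}

const preview = previewPersistedState({ a: undefined, b: [undefined], c: 10n });
```

Circular references need no extra handling in the sketch, because JSON.stringify already throws a TypeError for them.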

Three event shapes are accepted:

// Manual test event (isJson is required)
{ eventType: "OrderPlaced", streamId: "order-1", sequenceNumber: 0, isJson: true, data: { cents: 2999 } }
// KurrentDB RecordedEvent (from client subscriptions)
{ type: "OrderPlaced", streamId: "order-1", revision: 0n, isJson: true, id: "...", created: new Date(), ... }
// KurrentDB ResolvedEvent (from $by_event_type, $by_category, etc.)
{ event: { type: "OrderPlaced", streamId: "order-1", revision: 0n, isJson: true, ... } }

data and metadata accept objects (auto-stringified to JSON) or strings (passed through).

For manual test events, eventType and streamId must be non-empty and sequenceNumber must be a non-negative integer, matching what KurrentDB can actually deliver to a handler.
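When events are built by fixtures or factories, the same rules can be checked up front so a malformed fixture fails with a readable message. The sketch below restates only the three rules given above; manualEventProblems and its input type are hypothetical, and the library's own validation may report errors differently.

```typescript
// The identifying fields of a manual test event, as described above.
interface ManualEventIdentity {
  eventType: string;
  streamId: string;
  sequenceNumber: number;
}

// Returns the list of rule violations; an empty list means the event is acceptable.
function manualEventProblems(event: ManualEventIdentity): string[] {
  const problems: string[] = [];
  if (event.eventType.length === 0) problems.push("eventType must be non-empty");
  if (event.streamId.length === 0) problems.push("streamId must be non-empty");
  if (!Number.isInteger(event.sequenceNumber) || event.sequenceNumber < 0) {
    problems.push("sequenceNumber must be a non-negative integer");
  }
  return problems;
}
```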

The created timestamp on events from KurrentDB carries sub-millisecond precision: a 7-digit fractional second (.NET round-trip "o" format). Parse it with new Date(event.created) for assertions. Some strict ISO-8601 parsers reject the extra digits.
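For a parser that rejects the extra digits, one option is to trim the fraction to milliseconds before parsing. This is a small sketch under that assumption: truncateToMillis is a hypothetical helper, and the timestamp literal is an illustrative value, not output captured from KurrentDB.

```typescript
// Trims a fractional second longer than three digits down to milliseconds.
// Timestamps with three or fewer fractional digits are returned unchanged.
function truncateToMillis(timestamp: string): string {
  return timestamp.replace(/(\.\d{3})\d+/, "$1");
}

const created = "2024-05-01T12:34:56.1234567Z"; // illustrative value only
const trimmed = truncateToMillis(created);
const millis = new Date(trimmed).getTime();
```

The trimmed value is a standard three-digit ISO-8601 string, so the comparison no longer depends on how lenient a given parser is.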

Events are matched against the projection’s declared source: a fromStream("a") / fromStreams / fromCategory projection only processes events on streams it subscribes to, and others are skipped with reason: "wrong-stream" (mirroring KurrentDB delivery). fromAll() accepts every stream.
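To predict which fixture events will come back as wrong-stream, the matching rule can be sketched as a plain function. Everything here is an approximation written for illustration: the "all" and "categories" shapes follow the validate() output shown earlier, the "streams" shape is an assumption, and categoryOf assumes the common convention that a stream's category is the text before its first "-".

```typescript
// Local stand-in for a projection's declared source.
type SourceSketch =
  | { type: "all" }
  | { type: "streams"; streams: string[] } // assumed shape, not documented here
  | { type: "categories"; categories: string[] };

// Assumed convention: category is the part of the stream name before the first "-".
function categoryOf(streamId: string): string | null {
  const dash = streamId.indexOf("-");
  return dash <= 0 ? null : streamId.slice(0, dash);
}

// Returns false where the text above describes a "wrong-stream" skip.
function wouldBeDelivered(source: SourceSketch, streamId: string): boolean {
  switch (source.type) {
    case "all":
      return true;
    case "streams":
      return source.streams.includes(streamId);
    case "categories": {
      const category = categoryOf(streamId);
      return category !== null && source.categories.includes(category);
    }
  }
}
```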

Runtime errors propagate as typed ProjectionError subclasses with structured fields:

import {
  ProjectionHandlerError,
  InvalidProjectionError,
  ProjectionError,
} from "@kurrent/projections-testing";

try {
  test.feed(event);
} catch (err) {
  if (err instanceof ProjectionHandlerError) {
    err.description; // "boom"
    err.event.eventType; // "OrderPlaced"
    err.event.streamId; // "order-1"
    err.event.sequenceNumber; // 42
    err.message; // formatted with source snippet and caret
  }
  if (err instanceof ProjectionError) {
    err.code; // "handler-error", "malformed-event", etc.
    err.description; // human-readable description
    err.diagnostics; // quirks that fired on the throwing event (e.g. quirk.serialize.nonFinite)
  }
}

When a quirk throws, err.diagnostics carries it (and any quirk that fired earlier in the same event), the same Diagnostic shape as step.diagnostics on a processed step. So a test can assert on a throwing quirk the same way it asserts on a non-throwing one.
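One way to assert on a thrown error's fields is to capture the error as a value first, so the assertions sit outside the catch block and cannot be skipped silently when nothing is thrown. In this sketch, captureThrown is a hypothetical helper and DemoProjectionError is a local stand-in carrying two of the documented fields; the real classes come from the package.

```typescript
// Stand-in error with the documented `code` and `diagnostics` fields.
class DemoProjectionError extends Error {
  code: string;
  diagnostics: { code: string }[];
  constructor(code: string, diagnostics: { code: string }[]) {
    super(code);
    this.code = code;
    this.diagnostics = diagnostics;
  }
}

// Runs `action` and returns whatever it throws; fails if nothing is thrown.
function captureThrown(action: () => unknown): unknown {
  try {
    action();
  } catch (err) {
    return err;
  }
  throw new Error("expected the action to throw");
}

const captured = captureThrown(() => {
  throw new DemoProjectionError("handler-error", [
    { code: "quirk.serialize.nonFinite" },
  ]);
});
```

With the real library, the action would be () => test.feed(event) and the instanceof check would use ProjectionError.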

The full error hierarchy: InvalidProjectionError, CompilationTimeoutError, InvalidArgumentError, ProjectionHandlerError, ExecutionTimeoutError, MalformedEventError, StateSerializationError, ProjectionTransformError.