Skip to main content
Blog

Introducing the Effect SDK for Rivet Actors

A first-class Effect SDK for Rivet Actors — your Scope, Ref, and Layer, now stateful.

Introducing the Effect SDK for Rivet Actors

Today we’re releasing the RivetKit Effect SDK (beta): a first-class Effect SDK for Rivet Actors.

Effect, Made Stateful

A Rivet Actor is a lightweight, long-lived process with durability: it holds state in memory, persists it, hibernates when idle, and wakes intact.

Effect fits this model almost exactly. Its primitives are built to live and die with one process; on an actor, they outlive it.

If you know Effect, that’s the whole idea in a sentence: an actor is a long-lived Scope holding a Ref that survives restarts. The SDK maps each primitive onto the actor without replacing it:

  • Scope → a lifetime that spans wake to sleep.
  • Ref / SubscriptionRef → persisted, reactive state that survives restart.
  • Layer / Context.Service → dependency injection, with a real engine in tests.
  • typed errors + Schema → payloads, successes, and failures validated across the wire.
  • fibers, spans, logs → structured concurrency carrying actor identity.
  • actions, SQLite, events, queues → the rest of the actor surface, typed.

Rivet handles the infrastructure for you:

  • Fast reads and writes: state lives in memory next to your code, so there are no database round trips.
  • Realtime built in: broadcast over WebSockets, with no external pub/sub or separate socket server.
  • Scales to zero and to millions: idle actors hibernate and wake on demand, with no cold starts.
  • Fault tolerant: actors restart on failure with state preserved.
  • Native Rust core: state, storage, and networking run in Rust, with extensions on the hot paths for low overhead.

See the crash course for an introduction to Actors.

Show Me The Code

A common actor written in Effect has three files: the public contract, the server-only implementation, and the caller.

api.ts is the public contract. Actions are standalone values, each with an explicit Schema payload, success, and error type. It carries no implementation, so client code can import it for full type safety without pulling in anything server-side.

import { Action, Actor } from "@rivetkit/effect";
import { Schema } from "effect";

// A typed error in the action's error channel
export class NegativeAmountError extends Schema.TaggedErrorClass<NegativeAmountError>()(
  "NegativeAmountError",
  { amount: Schema.Number, message: Schema.String },
) {}

export const Increment = Action.make("Increment", {
  payload: { amount: Schema.Number },
  success: Schema.Number,
  error: NegativeAmountError,
});

export const GetCount = Action.make("GetCount", { success: Schema.Number });

// The definition ties the actions together under one actor name.
export const Counter = Actor.make("Counter", {
  actions: [Increment, GetCount],
});

live.ts is the server-side implementation. Counter.toLayer supplies the action handlers through a wake function. It receives the actor’s persisted state; state.schema and initialValue declare its shape and starting value. Note that failure is a returned value in the typed error channel, not a throw.

import { Actor, State } from "@rivetkit/effect";
import { Effect, Schema } from "effect";
import { Counter, NegativeAmountError } from "./api.ts";

export const CounterLive = Counter.toLayer(
  Effect.fnUntraced(function* ({ rawRivetkitContext, state }) {
    return Counter.of({
      Increment: Effect.fnUntraced(function* ({ payload }) {
        // Reject before mutating, so the error path leaves state untouched.
        // The failure is a value in the typed error channel, not a throw.
        if (payload.amount < 0) {
          return yield* new NegativeAmountError({
            amount: payload.amount,
            message: `increment amount ${payload.amount} must not be negative`,
          });
        }

        // Persisted state via a `SubscriptionRef`-like API
        const next = yield* State.updateAndGet(state, (current) => ({
          count: current.count + payload.amount,
        })).pipe(Effect.orDie);

        // Broadcast the new value to every connected client.
        rawRivetkitContext.broadcast("newCount", next.count);

        return next.count;
      }),
      GetCount: () => State.get(state).pipe(Effect.map((s) => s.count), Effect.orDie),
    });
  }),
  {
    state: {
      schema: Schema.Struct({ count: Schema.Number }),
      initialValue: () => ({ count: 0 }),
    },
    name: "Counter",
    icon: "calculator",
  },
);

client.ts is the caller. Counter.client is a typed accessor whose methods are the actions from api.ts. getOrCreate("hello-world") addresses a specific actor instance by key, creating it if it doesn’t exist. The declared error arrives as a real tagged instance you can catchTag, not a string. Client.layer provides the connection to the engine.

import { Client } from "@rivetkit/effect";
import { Effect } from "effect";
import { Counter } from "./api.ts";

const program = Effect.gen(function* () {
  // A typed accessor: methods are your actions, fully typed
  const counter = (yield* Counter.client).getOrCreate("hello-world");

  yield* Effect.log(`count is now ${yield* counter.Increment({ amount: 1 })}`);
  yield* Effect.log(`count is now ${yield* counter.Increment({ amount: 5 })}`);

  // The declared error arrives as a real tagged instance, caught by tag.
  yield* counter.Increment({ amount: -1 }).pipe(
    Effect.catchTag("NegativeAmountError", (err) =>
      Effect.log(`rejected: ${err.message} (amount ${err.amount})`),
    ),
  );

  yield* Effect.log(`final count: ${yield* counter.GetCount()}`);
}).pipe(Effect.provide(Client.layer({ endpoint: "http://127.0.0.1:6420" })));

Registry.test wires the actor up for tests. Same Counter.client, no special-casing.

import { assert, layer } from "@effect/vitest";
import { Registry } from "@rivetkit/effect";
import { Effect, Layer } from "effect";
import { Counter } from "./api.ts";
import { CounterLive } from "./live.ts";

const TestLayer = Registry.test.pipe(
  Layer.provideMerge(CounterLive),
  Layer.provide(Registry.layer()),
);

layer(TestLayer)("counter", (it) => {
  it.effect("increments and reads the count back", () =>
    Effect.gen(function* () {
      const counter = (yield* Counter.client).getOrCreate("t-increment");
      assert.strictEqual(yield* counter.Increment({ amount: 1 }), 1);
      assert.strictEqual(yield* counter.Increment({ amount: 5 }), 6);
      assert.strictEqual(yield* counter.GetCount(), 6);
    }),
  );
});

Effectful Rivet in Practice

Each section below is one Effect primitive and what an actor adds to it.

Scope: a wake is an acquisition, a sleep is a release

In a normal Effect program a Scope lives for one request. An actor wake is a Scope that spans the actor’s entire awake lifetime:

  • Spans calls: the wake scope opens on wake and closes on sleep, not per request.
  • Effect.addFinalizer: runs on sleep, so it’s your hibernation hook.
  • Effect.forkScoped: background fibers run until the actor sleeps, then cancel automatically.
Counter.toLayer(
	Effect.fnUntraced(function* ({ state }) {
		// Runs on sleep, not on every call
		yield* Effect.addFinalizer(() => Effect.log("actor sleeping"));

		// Background fiber scoped to the wake; cancelled on sleep
		yield* State.changes(state).pipe(
			Stream.runForEach((s) =>
				Effect.log("state changed", { count: s.count }),
			),
			Effect.forkScoped,
		);

		return Counter.of({
			/* handlers */
		});
	}),
	{ state },
);

See actor lifecycle for how wake and sleep are scheduled.

Ref & SubscriptionRef: state that outlives the process

State keeps the Ref / SubscriptionRef shape but backs it with persistence:

  • Same API: get / set / update / updateAndGet / changes.
  • Persisted and schema-typed: every commit is written, validated against the state schema.
  • Local reads: state is held in memory, so reads have no network round trip.
  • State.changes: an Effect Stream of committed changes, a reactive view of durable state.
  • No locks: at most one instance touches the state at a time, so Effect.race / Effect.timeout / interruption compose over a consistent store.
// Atomic read-modify-write against persisted, single-writer state
const next = yield* State.updateAndGet(state, (s) => ({ count: s.count + 1 }));

// A live Stream of committed changes, backed by real persistence
yield* State.changes(state).pipe(Stream.runForEach(render), Effect.forkScoped);

This is in-memory state read straight from the actor’s process; see actor state for how it persists and when it saves. For larger or queryable data, each actor also has an embedded SQLite database, covered below.

Layers & Services: dependency injection you can test

Effect tracks a program’s dependencies in its type. You declare a need for a service with Context.Service, reach it with yield*, and satisfy it by providing a Layer; the compiler won’t let the program run until every dependency is provided. That’s Effect’s dependency injection, and actors plug straight into it:

  • Services in scope: provide a service’s Layer and yield* it in the wake scope or any handler.
  • The actor is a Layer: wiring a registry is ordinary Layer composition.

Schema: typed data and errors across the wire

Every cross-boundary call is serialized, and the SDK places Schema at that boundary:

  • Encoded vs decoded types: rich values like DateTimeUtc, URL, bigint, and Uint8Array round-trip safely.
  • Real tagged errors: a Schema.TaggedErrorClass (or a Schema.Union of them) arrives on the caller as an instance you can catchTag, not a string or unknown.

The first tab declares an action whose failures are typed; the second catches one on the caller as a real instance.

Observability across actors

Traces cross the actor boundary on their own. An Effect.withSpan in a handler nests under the SDK’s server-side span, which nests under the client-side span, so a single call from client to actor and back is one connected trace with no context-propagation plumbing to wire up.

yield* room.SendMessage({ sender: "Admin", text: "Deploying now" }).pipe(
  Effect.withSpan("broadcastAnnouncement", {
    attributes: { room: room.key },
  }),
);

The Full Actor Toolbox

The rest of the actor surface, reached from inside a handler.

Actions. The typed client works actor-to-actor, and a callee’s typed error flows into the caller’s declared error channel.

// Inside ChatRoom's wake scope
const moderatorClient = yield* Moderator.client;

SendMessage: Effect.fnUntraced(function* ({ payload }) {
  const moderator = moderatorClient.getOrCreate([...address.key, "main"]);
  // BannedWordsError from Review flows through SendMessage's error channel
  yield* moderator.Review({ text: payload.text });
  // ...persist and broadcast
}),

SQLite. An embedded, per-instance database, migrated before wake and co-located with the actor.

import { db } from "rivetkit/db";

export const ChatRoomLive = ChatRoom.toLayer(
	Effect.fnUntraced(function* ({ rawRivetkitContext, state }) {
		return ChatRoom.of({
			GetHistory: () =>
				Effect.tryPromise(() =>
					rawRivetkitContext.db.execute(
						"SELECT id, sender, text FROM messages ORDER BY id",
					),
				).pipe(Effect.orDie),
		});
	}),
	{
		state,
		db: db({
			onMigrate: async (client) => {
				await client.execute(`
					CREATE TABLE IF NOT EXISTS messages (
						id INTEGER PRIMARY KEY AUTOINCREMENT,
						sender TEXT NOT NULL,
						text TEXT NOT NULL
					)
				`);
			},
		}),
	},
);

Events. Broadcast from a handler to every connected client, no pub/sub service or separate WebSocket server.

SendMessage: Effect.fnUntraced(function* ({ payload }) {
  yield* persist(payload);
  rawRivetkitContext.broadcast("newMessage", {
    sender: payload.sender,
    text: payload.text,
  });
}),

Queues. Durable, ordered background work; a consumer maps to a forkScoped fiber that lives as long as the actor is awake. Available now through the scheduler while typed wrappers land.

// Dispatch an action to run later, by name, with the same payload a client sends
rawRivetkitContext.schedule.after(1_000, "SendMessage", {
	sender: "Admin",
	text: `Welcome to the room, ${name}!`,
});

(Uses of rawRivetkitContext are for parts of Rivet that don’t have a native Effect API yet.)

Putting It Together: a realtime chat room

One actor that uses all of the above: a custom service for domain rules, persisted state with a forked live change stream, a sleep finalizer, actor-to-actor RPC whose typed error flows through this actor’s channel, and SQLite plus broadcast for durable history and realtime. The full runnable version is the chat-room-effect example.

import { Actor, State } from "@rivetkit/effect";
import { Context, DateTime, Effect, Layer, Schema, Stream } from "effect";
import { db } from "rivetkit/db";
import { Moderator } from "../moderator/api.ts";
import { ChatRoom, MemberNotInRoomError } from "./api.ts";

// A custom service used like any other Effect program
export class RoomPolicy extends Context.Service<
	RoomPolicy,
	{
		readonly requireMember: (
			members: ReadonlyArray<{ readonly name: string }>,
			name: string,
		) => Effect.Effect<void, MemberNotInRoomError>;
	}
>()("RoomPolicy") {}

export const ChatRoomLive = ChatRoom.toLayer(
	Effect.fnUntraced(function* ({ rawRivetkitContext, state }) {
		const address = yield* Actor.CurrentAddress;
		const roomPolicy = yield* RoomPolicy;
		const moderatorClient = yield* Moderator.client;

		// Finalizers run on sleep
		yield* Effect.addFinalizer(() =>
			Effect.log("room sleeping", { actorId: address.actorId }),
		);

		// A live, durable view of state changes, cancelled on sleep
		yield* State.changes(state).pipe(
			Stream.runForEach((s) =>
				Effect.log("members", { count: s.members.length }),
			),
			Effect.forkScoped,
		);

		// Compose persisted state with a service-owned domain guard
		const ensureMember = (name: string) =>
			State.get(state).pipe(
				Effect.orDie,
				Effect.flatMap((s) =>
					roomPolicy.requireMember(s.members, name),
				),
			);

		return ChatRoom.of({
			Join: Effect.fnUntraced(function* ({ payload }) {
				const next = yield* State.updateAndGet(state, (s) => ({
					...s,
					members: [...s.members, { name: payload.name }],
				})).pipe(Effect.orDie);
				rawRivetkitContext.broadcast("memberJoined", {
					name: payload.name,
				});
				return { memberCount: next.members.length };
			}),
			SendMessage: Effect.fnUntraced(function* ({ payload }) {
				yield* ensureMember(payload.sender);

				// Actor-to-actor RPC. A BannedWordsError here flows through
				// SendMessage's declared error channel.
				const moderator = moderatorClient.getOrCreate([
					...address.key,
					"main",
				]);
				yield* moderator.Review({ text: payload.text });

				yield* Effect.tryPromise(() =>
					rawRivetkitContext.db.execute(
						"INSERT INTO messages (sender, text) VALUES (?, ?)",
						payload.sender,
						payload.text,
					),
				).pipe(Effect.orDie);

				rawRivetkitContext.broadcast("newMessage", payload);
			}),
			Archive: () => Effect.sync(() => rawRivetkitContext.destroy()),
		});
	}),
	{
		state: {
			schema: Schema.Struct({
				members: Schema.Array(Schema.Struct({ name: Schema.String })),
			}),
			initialValue: () => ({ members: [] }),
		},
		db: db({
			onMigrate: async (client) => {
				await client.execute(`
					CREATE TABLE IF NOT EXISTS messages (
						id INTEGER PRIMARY KEY AUTOINCREMENT,
						sender TEXT NOT NULL,
						text TEXT NOT NULL
					)
				`);
			},
		}),
		name: "Chat Room",
		icon: "comments",
	},
);

Building Agents with Effect AI

Effect AI gives you a provider-agnostic LanguageModel service, and an actor is the perfect place to run it:

  • Memory is just state: persist the message history in actor state, and the model sees every prior turn, even after a restart.
  • The model is a Layer: swap a real provider for a mock without touching actor code.
  • One agent per conversation: address it by key, single-writer, so concurrent messages can’t race on its history.

The handler persists the user turn, calls the model with the running history, then persists the reply. The full example is ai-agent-effect.

import { Actor, State } from "@rivetkit/effect";
import { Effect, Schema } from "effect";
import { LanguageModel } from "effect/unstable/ai";
import { Agent, Message } from "./api.ts";

export const AgentLive = Agent.toLayer(
  Effect.fnUntraced(function* ({ state }) {
    return Agent.of({
      SendMessage: Effect.fnUntraced(function* ({ payload }) {
        // Persist the user turn before calling the model, so the message
        // it replies to survives a restart mid-call.
        const history = yield* State.updateAndGet(state, (turns) => [
          ...turns,
          { role: "user", content: payload.content } satisfies Message,
        ]).pipe(Effect.orDie);

        // The handler requires the `LanguageModel` service but never builds
        // it. Sending the whole history on every call is the agent's memory.
        const response = yield* LanguageModel.generateText({
          prompt: toPrompt(history),
        }).pipe(Effect.orDie);

        // Persist the assistant turn.
        yield* State.update(state, (turns) => [
          ...turns,
          { role: "assistant", content: response.text } satisfies Message,
        ]).pipe(Effect.orDie);

        return response.text;
      }),
      GetHistory: () => State.get(state).pipe(Effect.orDie),
    });
  }),
  {
    // `Message` is Schema.Struct({ role, content }); state is the running log.
    state: { schema: Schema.Array(Message), initialValue: () => [] },
    name: "Agent",
    icon: "robot",
  },
);

Beta & Feature Support

The Effect SDK is in beta, and the @rivetkit/effect API may change between releases.

  • Typed today: actions, state, the typed client, typed errors, logging, sleep, actor address, and the registry.
  • Available via raw context: events and broadcast, schedule, embedded SQLite, queues, connections, and lifecycle hooks, until typed wrappers land.

The feature support matrix tracks the state of each feature.

The raw escape hatch

Every wake function receives rawRivetkitContext, the underlying RivetKit actor context. It points at the same actor as the typed state argument, so you can mix both freely. Calls through it are not validated by Schema; payloads are typed as in the base RivetKit API.

Counter.toLayer(
	Effect.fnUntraced(function* ({ rawRivetkitContext, state }) {
		return Counter.of({
			Increment: Effect.fnUntraced(function* ({ payload }) {
				const next = yield* State.updateAndGet(state, (s) => ({
					count: s.count + payload.amount,
				}));
				rawRivetkitContext.broadcast("newCount", next.count); // untyped, full access
				return next.count;
			}),
		});
	}),
	{ state },
);

Give feedback

The SDK is shaped by what you build with it. Join our Discord to tell us what to wrap next and to contribute.

Get Started

npm install rivetkit @rivetkit/effect effect @effect/platform-node