Skip to content

Subscriptions

This example demonstrates real-time GraphQL subscriptions using Effect Streams. Clients can subscribe to events and receive updates as they happen.

4002/graphiql
pnpm example:subscriptions
# Server starts at http://localhost:4002
# WebSocket endpoint: ws://localhost:4002/graphql
  • Defining subscriptions with Effect Streams
  • Using subscriptions with arguments
  • Broadcasting messages with Effect PubSub
  • WebSocket transport with graphql-ws protocol

Subscriptions in Effect GQL use Effect Streams to emit values over time:

import { Stream, Duration, Schedule } from "effect"
subscription("tick", {
type: Tick,
description: "Emits a tick every second",
subscribe: () =>
Effect.succeed(
Stream.iterate(1, (n) => n + 1).pipe(
Stream.schedule(Schedule.spaced(Duration.seconds(1))),
Stream.map((count) => ({
count,
timestamp: Date.now(),
}))
)
),
})

Subscriptions can accept arguments just like queries:

subscription("countdown", {
type: S.Number,
args: S.Struct({ from: S.Number }),
description: "Counts down from a number to zero",
subscribe: ({ from }) =>
Effect.succeed(
Stream.range(0, from).pipe(
Stream.map((i) => from - i),
Stream.schedule(Schedule.spaced(Duration.seconds(1)))
)
),
})

For real-time messaging, use Effect’s PubSub to broadcast to all subscribers:

import { PubSub, Ref } from "effect"
// Create a PubSub for messages
const messagePubSub = Effect.runSync(PubSub.unbounded<Message>())
// Mutation publishes to PubSub
mutation("sendMessage", {
args: S.Struct({ content: S.String, author: S.String }),
type: Message,
resolve: (args) =>
Effect.gen(function* () {
const message: Message = {
id: String(++messageIdCounter),
content: args.content,
author: args.author,
createdAt: Date.now(),
}
// Broadcast to all subscribers
yield* PubSub.publish(messagePubSub, message)
return message
}),
})
// Subscription receives from PubSub
subscription("newMessage", {
type: Message,
description: "Receive real-time message updates",
subscribe: () =>
Effect.gen(function* () {
const queue = yield* PubSub.subscribe(messagePubSub)
return Stream.fromQueue(queue)
}),
})

Filter subscription streams based on arguments:

subscription("messagesByAuthor", {
type: Message,
args: S.Struct({ author: S.String }),
description: "Receive messages from a specific author",
subscribe: ({ author }) =>
Effect.gen(function* () {
const queue = yield* PubSub.subscribe(messagePubSub)
return Stream.fromQueue(queue).pipe(
Stream.filter((msg) => msg.author === author)
)
}),
})

To enable subscriptions over WebSocket, pass the schema to the server options:

import { serve } from "@effect-gql/node"
serve(app, Layer.empty, {
port: 4002,
subscriptions: {
schema,
path: "/graphql",
},
onStart: (url) => {
console.log(`Server ready at ${url}`)
console.log(`WebSocket: ws://localhost:4002/graphql`)
},
})
import { Effect, Stream, Layer, Duration, PubSub, Schedule } from "effect"
import * as S from "effect/Schema"
import { HttpRouter, HttpServerResponse } from "@effect/platform"
import {
GraphQLSchemaBuilder,
query,
mutation,
subscription,
makeGraphQLRouter,
} from "@effect-gql/core"
import { serve } from "@effect-gql/node"
// Domain models
const Tick = S.Struct({
count: S.Number,
timestamp: S.Number,
})
const Message = S.Struct({
id: S.String,
content: S.String,
author: S.String,
createdAt: S.Number,
})
// PubSub for broadcasting messages
let messageIdCounter = 0
const messagePubSub = Effect.runSync(PubSub.unbounded<Message>())
// Build schema
const schema = GraphQLSchemaBuilder.empty
.pipe(
query("hello", {
type: S.String,
resolve: () => Effect.succeed("Hello!"),
}),
mutation("sendMessage", {
args: S.Struct({ content: S.String, author: S.String }),
type: Message,
resolve: (args) =>
Effect.gen(function* () {
const message = {
id: String(++messageIdCounter),
content: args.content,
author: args.author,
createdAt: Date.now(),
}
yield* PubSub.publish(messagePubSub, message)
return message
}),
}),
// Tick subscription
subscription("tick", {
type: Tick,
subscribe: () =>
Effect.succeed(
Stream.iterate(1, (n) => n + 1).pipe(
Stream.schedule(Schedule.spaced(Duration.seconds(1))),
Stream.map((count) => ({ count, timestamp: Date.now() }))
)
),
}),
// New message subscription
subscription("newMessage", {
type: Message,
subscribe: () =>
Effect.gen(function* () {
const queue = yield* PubSub.subscribe(messagePubSub)
return Stream.fromQueue(queue)
}),
}),
)
.buildSchema()
// Router and server setup
const graphqlRouter = makeGraphQLRouter(schema, Layer.empty, {
path: "/graphql",
graphiql: { path: "/graphiql", endpoint: "/graphql" },
})
const app = HttpRouter.empty.pipe(
HttpRouter.get("/health", HttpServerResponse.json({ status: "ok" })),
HttpRouter.concat(graphqlRouter)
)
serve(app, Layer.empty, {
port: 4002,
subscriptions: { schema, path: "/graphql" },
})
  1. Open GraphiQL at http://localhost:4002/graphiql
  2. Start a subscription:
subscription {
tick {
count
timestamp
}
}
  1. In another tab, subscribe to messages:
subscription {
newMessage {
id
content
author
}
}
  1. Send a message via mutation:
mutation {
sendMessage(content: "Hello!", author: "Alice") {
id
}
}

The message will appear in the subscription tab.

import { createClient } from 'graphql-ws';
const client = createClient({
url: 'ws://localhost:4002/graphql',
});
// Subscribe to ticks
const unsubscribe = client.subscribe(
{
query: 'subscription { tick { count timestamp } }',
},
{
next: (data) => console.log('Tick:', data),
error: (err) => console.error(err),
complete: () => console.log('Complete'),
}
);
// Later: unsubscribe()