Subscriptions
This example demonstrates real-time GraphQL subscriptions using Effect Streams. Clients can subscribe to events and receive updates as they happen.
Running the Example
Section titled “Running the Example”pnpm example:subscriptions# Server starts at http://localhost:4002# WebSocket endpoint: ws://localhost:4002/graphqlWhat You’ll Learn
Section titled “What You’ll Learn”- Defining subscriptions with Effect Streams
- Using subscriptions with arguments
- Broadcasting messages with Effect PubSub
- WebSocket transport with graphql-ws protocol
Subscription Basics
Section titled “Subscription Basics”Subscriptions in Effect GQL use Effect Streams to emit values over time:
import { Stream, Duration, Schedule } from "effect"
subscription("tick", { type: Tick, description: "Emits a tick every second", subscribe: () => Effect.succeed( Stream.iterate(1, (n) => n + 1).pipe( Stream.schedule(Schedule.spaced(Duration.seconds(1))), Stream.map((count) => ({ count, timestamp: Date.now(), })) ) ),})Subscriptions with Arguments
Section titled “Subscriptions with Arguments”Subscriptions can accept arguments just like queries:
subscription("countdown", { type: S.Number, args: S.Struct({ from: S.Number }), description: "Counts down from a number to zero", subscribe: ({ from }) => Effect.succeed( Stream.range(0, from).pipe( Stream.map((i) => from - i), Stream.schedule(Schedule.spaced(Duration.seconds(1))) ) ),})Broadcasting with PubSub
Section titled “Broadcasting with PubSub”For real-time messaging, use Effect’s PubSub to broadcast to all subscribers:
import { PubSub, Ref } from "effect"
// Create a PubSub for messagesconst messagePubSub = Effect.runSync(PubSub.unbounded<Message>())
// Mutation publishes to PubSubmutation("sendMessage", { args: S.Struct({ content: S.String, author: S.String }), type: Message, resolve: (args) => Effect.gen(function* () { const message: Message = { id: String(++messageIdCounter), content: args.content, author: args.author, createdAt: Date.now(), }
// Broadcast to all subscribers yield* PubSub.publish(messagePubSub, message)
return message }),})
// Subscription receives from PubSubsubscription("newMessage", { type: Message, description: "Receive real-time message updates", subscribe: () => Effect.gen(function* () { const queue = yield* PubSub.subscribe(messagePubSub) return Stream.fromQueue(queue) }),})Filtered Subscriptions
Section titled “Filtered Subscriptions”Filter subscription streams based on arguments:
subscription("messagesByAuthor", { type: Message, args: S.Struct({ author: S.String }), description: "Receive messages from a specific author", subscribe: ({ author }) => Effect.gen(function* () { const queue = yield* PubSub.subscribe(messagePubSub) return Stream.fromQueue(queue).pipe( Stream.filter((msg) => msg.author === author) ) }),})Enabling WebSocket Support
Section titled “Enabling WebSocket Support”To enable subscriptions over WebSocket, pass the schema to the server options:
import { serve } from "@effect-gql/node"
serve(app, Layer.empty, { port: 4002, subscriptions: { schema, path: "/graphql", }, onStart: (url) => { console.log(`Server ready at ${url}`) console.log(`WebSocket: ws://localhost:4002/graphql`) },})Complete Code
Section titled “Complete Code”import { Effect, Stream, Layer, Duration, PubSub, Schedule } from "effect"import * as S from "effect/Schema"import { HttpRouter, HttpServerResponse } from "@effect/platform"import { GraphQLSchemaBuilder, query, mutation, subscription, makeGraphQLRouter,} from "@effect-gql/core"import { serve } from "@effect-gql/node"
// Domain modelsconst Tick = S.Struct({ count: S.Number, timestamp: S.Number,})
const Message = S.Struct({ id: S.String, content: S.String, author: S.String, createdAt: S.Number,})
// PubSub for broadcasting messageslet messageIdCounter = 0const messagePubSub = Effect.runSync(PubSub.unbounded<Message>())
// Build schemaconst schema = GraphQLSchemaBuilder.empty .pipe( query("hello", { type: S.String, resolve: () => Effect.succeed("Hello!"), }),
mutation("sendMessage", { args: S.Struct({ content: S.String, author: S.String }), type: Message, resolve: (args) => Effect.gen(function* () { const message = { id: String(++messageIdCounter), content: args.content, author: args.author, createdAt: Date.now(), } yield* PubSub.publish(messagePubSub, message) return message }), }),
// Tick subscription subscription("tick", { type: Tick, subscribe: () => Effect.succeed( Stream.iterate(1, (n) => n + 1).pipe( Stream.schedule(Schedule.spaced(Duration.seconds(1))), Stream.map((count) => ({ count, timestamp: Date.now() })) ) ), }),
// New message subscription subscription("newMessage", { type: Message, subscribe: () => Effect.gen(function* () { const queue = yield* PubSub.subscribe(messagePubSub) return Stream.fromQueue(queue) }), }), ) .buildSchema()
// Router and server setupconst graphqlRouter = makeGraphQLRouter(schema, Layer.empty, { path: "/graphql", graphiql: { path: "/graphiql", endpoint: "/graphql" },})
const app = HttpRouter.empty.pipe( HttpRouter.get("/health", HttpServerResponse.json({ status: "ok" })), HttpRouter.concat(graphqlRouter))
serve(app, Layer.empty, { port: 4002, subscriptions: { schema, path: "/graphql" },})Testing Subscriptions
Section titled “Testing Subscriptions”In GraphiQL
Section titled “In GraphiQL”- Open GraphiQL at
http://localhost:4002/graphiql - Start a subscription:
subscription { tick { count timestamp }}- In another tab, subscribe to messages:
subscription { newMessage { id content author }}- Send a message via mutation:
mutation { sendMessage(content: "Hello!", author: "Alice") { id }}The message will appear in the subscription tab.
With a WebSocket Client
Section titled “With a WebSocket Client”import { createClient } from 'graphql-ws';
const client = createClient({ url: 'ws://localhost:4002/graphql',});
// Subscribe to ticksconst unsubscribe = client.subscribe( { query: 'subscription { tick { count timestamp } }', }, { next: (data) => console.log('Tick:', data), error: (err) => console.error(err), complete: () => console.log('Complete'), });
// Later: unsubscribe()Next Steps
Section titled “Next Steps”- Full-Featured Example - Complete application patterns
- Subscriptions Guide - Deep dive into subscription patterns