Subscriptions
Effect GQL supports real-time subscriptions over two transport protocols: WebSocket (graphql-ws) and Server-Sent Events (graphql-sse). Both transports use Effect’s Stream type for subscriptions, providing a natural way to emit values over time.
Overview
Section titled “Overview”GraphQL subscriptions enable clients to receive live updates when data changes:
- Client opens a persistent connection to the server
- Client sends a subscription query
- Server pushes updates to the client as they occur
- Connection stays open until client disconnects or subscription completes
Choosing a Transport
Section titled “Choosing a Transport”| Feature | WebSocket | SSE |
|---|---|---|
| Direction | Bidirectional | Server → Client only |
| Protocol | Dedicated protocol | HTTP/1.1+ |
| Proxy support | May require configuration | Works out of the box |
| Browser support | Native WebSocket | Native EventSource |
| Connection limits | Unlimited | 6 per domain (HTTP/1.1) |
| Reconnection | Manual | Built-in |
| Dependencies | ws package (Node.js) | None |
Use WebSocket when:
- You need bidirectional communication
- You have many concurrent subscriptions
- You need maximum performance
Use SSE when:
- You only need server-to-client updates
- You’re behind strict proxies/firewalls
- You want simpler infrastructure
- You prefer HTTP-based solutions
Defining Subscriptions
Section titled “Defining Subscriptions”Subscription resolvers are the same regardless of transport. Use the subscription function to register subscription fields:
import { GraphQLSchemaBuilder, subscription } from "@effect-gql/core"import { Effect, Stream } from "effect"import * as S from "effect/Schema"
const MessageSchema = S.Struct({ id: S.String, content: S.String, createdAt: S.String})
const builder = GraphQLSchemaBuilder.empty.pipe( subscription("messageAdded", { type: MessageSchema, subscribe: () => Effect.gen(function* () { const pubsub = yield* PubSubService // Return a Stream that emits new messages return pubsub.subscribe("messages") }) }))Subscriptions with Arguments
Section titled “Subscriptions with Arguments”subscription("messagesInChannel", { type: MessageSchema, args: S.Struct({ channelId: S.String }), subscribe: ({ channelId }) => Effect.gen(function* () { const pubsub = yield* PubSubService return pubsub.subscribe("messages").pipe( Stream.filter((msg) => msg.channelId === channelId) ) })})WebSocket Transport
Section titled “WebSocket Transport”WebSocket subscriptions use the graphql-ws protocol.
Server Setup
Section titled “Server Setup”import { toRouter } from "@effect-gql/core"import { serve } from "@effect-gql/node"
const schema = builder.buildSchema()const router = toRouter(builder, serviceLayer, { graphiql: true })
serve(router, serviceLayer, { port: 4000, subscriptions: { schema, // Required: the GraphQL schema path: "/graphql" // WebSocket path (default: "/graphql") }, onStart: (url) => console.log(`Server at ${url}`)})import { createBunWSHandlers } from "@effect-gql/bun"
const { upgrade, websocket } = createBunWSHandlers(schema, serviceLayer, { path: "/graphql"})
Bun.serve({ port: 4000, fetch(req, server) { if (upgrade(req, server)) { return // Upgraded to WebSocket } // Handle HTTP requests... }, websocket,})import express from "express"import { createServer } from "node:http"import { toMiddleware, attachWebSocket } from "@effect-gql/express"
const app = express()app.use(toMiddleware(router, Layer.empty))
const server = createServer(app)attachWebSocket(server, schema, Layer.empty, { path: "/graphql" })
server.listen(4000)import { createGraphQLWSServer } from "@effect-gql/node"import { createServer } from "node:http"
const httpServer = createServer(httpHandler)
const { handleUpgrade, close } = createGraphQLWSServer(schema, serviceLayer, { path: "/graphql"})
httpServer.on("upgrade", (request, socket, head) => { handleUpgrade(request, socket, head)})
httpServer.listen(4000)WebSocket Lifecycle Hooks
Section titled “WebSocket Lifecycle Hooks”serve(router, serviceLayer, { subscriptions: { schema,
// Called when client connects (for authentication) onConnect: (connectionParams) => Effect.gen(function* () { const token = connectionParams.authToken as string if (!token) return false // Reject connection
const user = yield* AuthService.validateToken(token) return { user } // Merged into GraphQL context }),
// Called when client disconnects onDisconnect: (ctx) => Effect.log("Client disconnected"),
// Called for each subscription onSubscribe: (ctx, message) => Effect.log(`Subscription started: ${message.payload.operationName}`),
// Called when subscription completes onComplete: (ctx, message) => Effect.log(`Subscription completed: ${message.id}`),
// Called on errors onError: (ctx, errors) => Effect.logError("Subscription error", errors) }})WebSocket Client
Section titled “WebSocket Client”import { createClient } from "graphql-ws"
const client = createClient({ url: "ws://localhost:4000/graphql", connectionParams: { authToken: localStorage.getItem("token") }})
const unsubscribe = client.subscribe( { query: `subscription { messageAdded { id content } }`, }, { next: (data) => console.log("New message:", data), error: (err) => console.error("Error:", err), complete: () => console.log("Complete") })SSE Transport
Section titled “SSE Transport”SSE subscriptions use the graphql-sse protocol (distinct connections mode).
Server Setup
Section titled “Server Setup”import { createServer } from "node:http"import { createSSEHandler } from "@effect-gql/node"
const sseHandler = createSSEHandler(schema, serviceLayer, { onConnect: (request, headers) => Effect.gen(function* () { const token = headers.get("authorization")?.replace("Bearer ", "") const user = yield* AuthService.validateToken(token) return { user } }),})
const server = createServer(async (req, res) => { const url = new URL(req.url ?? "/", `http://${req.headers.host}`)
if (url.pathname === "/graphql/stream" && req.method === "POST") { await sseHandler(req, res) return }
// Handle other requests...})
server.listen(4000)import { createBunSSEHandlers } from "@effect-gql/bun"
const sse = createBunSSEHandlers(schema, serviceLayer, { path: "/graphql/stream"})
Bun.serve({ port: 4000, fetch(req, server) { if (sse.shouldHandle(req)) { return sse.handle(req) } // Handle other requests... },})import express from "express"import { sseMiddleware } from "@effect-gql/express"
const app = express()app.use(express.json())
app.use(sseMiddleware(schema, Layer.empty, { path: "/graphql/stream"}))
app.listen(4000)import { createSSEHandler } from "@effect-gql/web"
const sseHandler = createSSEHandler(schema, Layer.empty)
export default { async fetch(request: Request) { const url = new URL(request.url)
if (url.pathname === "/graphql/stream" && request.method === "POST") { return await sseHandler(request) } // Handle other requests... }}SSE Lifecycle Hooks
Section titled “SSE Lifecycle Hooks”const sseHandler = createSSEHandler(schema, Layer.empty, { // Called before subscription starts (for auth) onConnect: (request, headers) => Effect.gen(function* () { const token = headers.get("authorization") const user = yield* AuthService.validateToken(token) return { user } }),
// Called when subscription starts streaming onSubscribe: (ctx) => Effect.log("Subscription started"),
// Called when subscription completes onComplete: (ctx) => Effect.log("Subscription completed"),
// Called when client disconnects onDisconnect: (ctx) => Effect.log("Client disconnected"),
// Called on errors onError: (ctx, error) => Effect.logError("Error", error)})SSE Event Format
Section titled “SSE Event Format”event: nextdata: {"data":{"messageAdded":{"id":"1","content":"Hello"}}}
event: nextdata: {"data":{"messageAdded":{"id":"2","content":"World"}}}
event: completedata:SSE Client
Section titled “SSE Client”Using the Fetch API:
const response = await fetch("/graphql/stream", { method: "POST", headers: { "Content-Type": "application/json", "Accept": "text/event-stream", "Authorization": "Bearer your-token" }, body: JSON.stringify({ query: "subscription { messageAdded { id content } }", }),})
const reader = response.body.getReader()const decoder = new TextDecoder()
while (true) { const { done, value } = await reader.read() if (done) break // Parse SSE events from decoder.decode(value)...}Using graphql-sse client:
import { createClient } from "graphql-sse"
const client = createClient({ url: "/graphql/stream",})
for await (const result of client.iterate({ query: "subscription { messageAdded { id content } }",})) { console.log("New message:", result.data?.messageAdded)}Both Transports
Section titled “Both Transports”You can offer both WebSocket and SSE subscriptions simultaneously:
import { createServer } from "node:http"import { createGraphQLWSServer, createSSEHandler } from "@effect-gql/node"
const httpServer = createServer(async (req, res) => { const url = new URL(req.url ?? "/", `http://${req.headers.host}`)
// SSE subscriptions if (url.pathname === "/graphql/stream" && req.method === "POST") { await sseHandler(req, res) return }
// Regular GraphQL...})
// WebSocket subscriptionsconst { handleUpgrade } = createGraphQLWSServer(schema, Layer.empty)httpServer.on("upgrade", handleUpgrade)
httpServer.listen(4000, () => { console.log("WebSocket: ws://localhost:4000/graphql") console.log("SSE: http://localhost:4000/graphql/stream")})Building a PubSub Service
Section titled “Building a PubSub Service”A typical subscription setup requires a pub/sub system. Here’s a simple in-memory implementation:
import { Context, Effect, Layer, Stream, PubSub } from "effect"
class PubSubService extends Context.Tag("PubSubService")<PubSubService, { publish: <A>(topic: string, message: A) => Effect.Effect<void> subscribe: <A>(topic: string) => Stream.Stream<A>}>() {}
const makePubSubService = Effect.gen(function* () { const topics = new Map<string, PubSub.PubSub<any>>()
const getOrCreateTopic = (topic: string) => Effect.gen(function* () { let pubsub = topics.get(topic) if (!pubsub) { pubsub = yield* PubSub.unbounded<any>() topics.set(topic, pubsub) } return pubsub })
return { publish: <A>(topic: string, message: A) => Effect.gen(function* () { const pubsub = yield* getOrCreateTopic(topic) yield* PubSub.publish(pubsub, message) }),
subscribe: <A>(topic: string): Stream.Stream<A> => Stream.unwrap( Effect.gen(function* () { const pubsub = yield* getOrCreateTopic(topic) return Stream.fromPubSub(pubsub) }) ) }})
const PubSubServiceLive = Layer.effect(PubSubService, makePubSubService)Using PubSub in Resolvers
Section titled “Using PubSub in Resolvers”const builder = GraphQLSchemaBuilder.empty.pipe( // Mutation triggers subscription updates mutation("sendMessage", { type: MessageSchema, args: S.Struct({ channelId: S.String, content: S.String }), resolve: ({ channelId, content }) => Effect.gen(function* () { const pubsub = yield* PubSubService const message = { id: crypto.randomUUID(), channelId, content, createdAt: new Date().toISOString() }
yield* MessageRepository.save(message) yield* pubsub.publish(`channel:${channelId}`, message)
return message }) }),
// Subscription listens for new messages subscription("messageAdded", { type: MessageSchema, args: S.Struct({ channelId: S.String }), subscribe: ({ channelId }) => Effect.gen(function* () { const pubsub = yield* PubSubService return pubsub.subscribe(`channel:${channelId}`) }) }))Advanced Patterns
Section titled “Advanced Patterns”Filtering Subscriptions
Section titled “Filtering Subscriptions”Filter events server-side to reduce bandwidth:
subscription("userActivity", { type: ActivitySchema, args: S.Struct({ userId: S.optional(S.String), types: S.optional(S.Array(S.String)) }), subscribe: ({ userId, types }) => Effect.gen(function* () { const pubsub = yield* PubSubService
return pubsub.subscribe<Activity>("activity").pipe( Stream.filter((activity) => userId ? activity.userId === userId : true ), Stream.filter((activity) => types?.length ? types.includes(activity.type) : true ) ) })})Batching Updates
Section titled “Batching Updates”Batch rapid updates to reduce client processing:
subscription("stockPrices", { type: S.Array(StockPriceSchema), args: S.Struct({ symbols: S.Array(S.String) }), subscribe: ({ symbols }) => Effect.gen(function* () { const pubsub = yield* PubSubService
return pubsub.subscribe<StockPrice>("prices").pipe( Stream.filter((price) => symbols.includes(price.symbol)), Stream.groupedWithin(100, { duration: "100 millis" }), Stream.map((prices) => [...prices]) ) })})Initial Data with Subscription
Section titled “Initial Data with Subscription”Send initial data before streaming updates:
subscription("messages", { type: MessageSchema, args: S.Struct({ channelId: S.String }), subscribe: ({ channelId }) => Effect.gen(function* () { const db = yield* Database const pubsub = yield* PubSubService
const recentMessages = yield* db.getRecentMessages(channelId, 10) const initial = Stream.fromIterable(recentMessages) const live = pubsub.subscribe(`channel:${channelId}`)
return Stream.concat(initial, live) })})Complexity Limiting
Section titled “Complexity Limiting”Protect expensive subscriptions with complexity limits:
// WebSocketserve(router, serviceLayer, { subscriptions: { schema, complexity: { maxComplexity: 100, maxDepth: 5 }, fieldComplexities: builder.getFieldComplexities(), }})
// SSEconst sseHandler = createSSEHandler(schema, Layer.empty, { complexity: { maxComplexity: 100, maxDepth: 5 }, fieldComplexities: builder.getFieldComplexities(),})Client Libraries
Section titled “Client Libraries”WebSocket Clients
Section titled “WebSocket Clients”| Library | Import |
|---|---|
| graphql-ws | import { createClient } from "graphql-ws" |
| Apollo Client | import { GraphQLWsLink } from "@apollo/client/link/subscriptions" |
| urql | import { subscriptionExchange } from "urql" |
SSE Clients
Section titled “SSE Clients”| Library | Import |
|---|---|
| graphql-sse | import { createClient } from "graphql-sse" |
| Native | EventSource or fetch API |
Platform Support
Section titled “Platform Support”| Platform | WebSocket | SSE |
|---|---|---|
| Node.js | @effect-gql/node (requires ws) | @effect-gql/node |
| Bun | @effect-gql/bun (built-in) | @effect-gql/bun |
| Express | @effect-gql/express (requires ws) | @effect-gql/express |
| Workers/Deno | Not supported | @effect-gql/web |
Next Steps
Section titled “Next Steps”- Server Integration - Complete server setup
- DataLoader - Optimize database queries in subscriptions
- Complexity Limiting - Protect expensive subscriptions
- Error Handling - Handle subscription errors