Subscriptions

Effect GQL supports real-time subscriptions over two transport protocols: WebSocket (graphql-ws) and Server-Sent Events (graphql-sse). Both transports use Effect’s Stream type for subscriptions, providing a natural way to emit values over time.

GraphQL subscriptions enable clients to receive live updates when data changes:

  1. Client opens a persistent connection to the server
  2. Client sends a subscription query
  3. Server pushes updates to the client as they occur
  4. Connection stays open until client disconnects or subscription completes

The two transports compare as follows:

Feature           | WebSocket                 | SSE
------------------+---------------------------+-------------------------
Direction         | Bidirectional             | Server → Client only
Protocol          | Dedicated protocol        | HTTP/1.1+
Proxy support     | May require configuration | Works out of the box
Browser support   | Native WebSocket          | Native EventSource
Connection limits | Unlimited                 | 6 per domain (HTTP/1.1)
Reconnection      | Manual                    | Built-in
Dependencies      | ws package (Node.js)      | None

Use WebSocket when:

  • You need bidirectional communication
  • You have many concurrent subscriptions
  • You need maximum performance

Use SSE when:

  • You only need server-to-client updates
  • You’re behind strict proxies/firewalls
  • You want simpler infrastructure
  • You prefer HTTP-based solutions

Subscription resolvers are the same regardless of transport. Use the subscription function to register subscription fields:

import { GraphQLSchemaBuilder, subscription } from "@effect-gql/core"
import { Effect, Stream } from "effect"
import * as S from "effect/Schema"

// PubSubService is the pub/sub service defined later on this page.

const MessageSchema = S.Struct({
  id: S.String,
  channelId: S.String, // needed by the channel filter below
  content: S.String,
  createdAt: S.String
})
type Message = S.Schema.Type<typeof MessageSchema>

const builder = GraphQLSchemaBuilder.empty.pipe(
  subscription("messageAdded", {
    type: MessageSchema,
    subscribe: () => Effect.gen(function* () {
      const pubsub = yield* PubSubService
      // Return a Stream that emits new messages
      return pubsub.subscribe<Message>("messages")
    })
  })
)

Subscriptions can also take arguments:

subscription("messagesInChannel", {
  type: MessageSchema,
  args: S.Struct({ channelId: S.String }),
  subscribe: ({ channelId }) => Effect.gen(function* () {
    const pubsub = yield* PubSubService
    // Only forward messages that belong to the requested channel
    return pubsub.subscribe<Message>("messages").pipe(
      Stream.filter((msg) => msg.channelId === channelId)
    )
  })
})

WebSocket subscriptions use the graphql-ws protocol.
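
For orientation, graphql-ws exchanges a small set of JSON messages over the socket. The sketch below models the main ones as TypeScript types and builds the messages a client would send. The names (`ClientMessage`, `ServerMessage`, `subscribeMessage`, `handleServerMessage`) are illustrative only and are not part of Effect GQL or the graphql-ws package, which handle this framing for you; ping/pong messages are left out for brevity.

```typescript
// Illustrative model of the main graphql-ws messages (type names are hypothetical).
type Payload = Record<string, unknown>

type ClientMessage =
  | { type: "connection_init"; payload?: Payload }
  | { type: "subscribe"; id: string; payload: { query: string; variables?: Payload } }
  | { type: "complete"; id: string }

type ServerMessage =
  | { type: "connection_ack"; payload?: Payload }
  | { type: "next"; id: string; payload: { data?: unknown; errors?: unknown[] } }
  | { type: "error"; id: string; payload: unknown[] }
  | { type: "complete"; id: string }

// First message after the socket opens; carries connectionParams such as authToken.
const connectionInit = (params: Payload): ClientMessage => ({
  type: "connection_init",
  payload: params
})

// Starts one subscription; the id ties later "next"/"complete" messages to it.
const subscribeMessage = (id: string, query: string): ClientMessage => ({
  type: "subscribe",
  id,
  payload: { query }
})

// What a client would typically do with each server message.
function handleServerMessage(msg: ServerMessage): string {
  switch (msg.type) {
    case "connection_ack":
      return "connected"
    case "next":
      return `result for ${msg.id}`
    case "error":
      return `error for ${msg.id}`
    case "complete":
      return `done with ${msg.id}`
  }
}
```

The `connection_init` payload is what arrives as `connectionParams` in a server-side `onConnect` hook, and a `complete` message from either side ends a single subscription without closing the socket.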

Basic server setup:

import { toRouter } from "@effect-gql/core"
import { serve } from "@effect-gql/node"

const schema = builder.buildSchema()
const router = toRouter(builder, serviceLayer, { graphiql: true })

serve(router, serviceLayer, {
  port: 4000,
  subscriptions: {
    schema,           // Required: the GraphQL schema
    path: "/graphql"  // WebSocket path (default: "/graphql")
  },
  onStart: (url) => console.log(`Server at ${url}`)
})

Lifecycle hooks:

serve(router, serviceLayer, {
  subscriptions: {
    schema,
    // Called when client connects (for authentication)
    onConnect: (connectionParams) => Effect.gen(function* () {
      const token = connectionParams.authToken as string
      if (!token) return false // Reject connection
      const user = yield* AuthService.validateToken(token)
      return { user } // Merged into GraphQL context
    }),
    // Called when client disconnects
    onDisconnect: (ctx) => Effect.log("Client disconnected"),
    // Called for each subscription
    onSubscribe: (ctx, message) =>
      Effect.log(`Subscription started: ${message.payload.operationName}`),
    // Called when subscription completes
    onComplete: (ctx, message) =>
      Effect.log(`Subscription completed: ${message.id}`),
    // Called on errors
    onError: (ctx, errors) => Effect.logError("Subscription error", errors)
  }
})

Connecting from a client with graphql-ws:

import { createClient } from "graphql-ws"

const client = createClient({
  url: "ws://localhost:4000/graphql",
  connectionParams: {
    authToken: localStorage.getItem("token")
  }
})

const unsubscribe = client.subscribe(
  {
    query: `subscription { messageAdded { id content } }`,
  },
  {
    next: (data) => console.log("New message:", data),
    error: (err) => console.error("Error:", err),
    complete: () => console.log("Complete")
  }
)

SSE subscriptions use the graphql-sse protocol (distinct connections mode).

import { createServer } from "node:http"
import { createSSEHandler } from "@effect-gql/node"

const sseHandler = createSSEHandler(schema, serviceLayer, {
  onConnect: (request, headers) => Effect.gen(function* () {
    const token = headers.get("authorization")?.replace("Bearer ", "")
    const user = yield* AuthService.validateToken(token)
    return { user }
  }),
})

const server = createServer(async (req, res) => {
  const url = new URL(req.url ?? "/", `http://${req.headers.host}`)
  if (url.pathname === "/graphql/stream" && req.method === "POST") {
    await sseHandler(req, res)
    return
  }
  // Handle other requests...
})

server.listen(4000)

Lifecycle hooks:

const sseHandler = createSSEHandler(schema, Layer.empty, {
  // Called before subscription starts (for auth)
  onConnect: (request, headers) => Effect.gen(function* () {
    const token = headers.get("authorization")
    const user = yield* AuthService.validateToken(token)
    return { user }
  }),
  // Called when subscription starts streaming
  onSubscribe: (ctx) => Effect.log("Subscription started"),
  // Called when subscription completes
  onComplete: (ctx) => Effect.log("Subscription completed"),
  // Called when client disconnects
  onDisconnect: (ctx) => Effect.log("Client disconnected"),
  // Called on errors
  onError: (ctx, error) => Effect.logError("Error", error)
})

The server emits events in this format:

event: next
data: {"data":{"messageAdded":{"id":"1","content":"Hello"}}}

event: next
data: {"data":{"messageAdded":{"id":"2","content":"World"}}}

event: complete
data:

Using the Fetch API:

const response = await fetch("/graphql/stream", {
  method: "POST",
  headers: {
    "Content-Type": "application/json",
    "Accept": "text/event-stream",
    "Authorization": "Bearer your-token"
  },
  body: JSON.stringify({
    query: "subscription { messageAdded { id content } }",
  }),
})

const reader = response.body.getReader()
const decoder = new TextDecoder()

while (true) {
  const { done, value } = await reader.read()
  if (done) break
  // Parse SSE events from decoder.decode(value)...
}
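
The parsing step left as a comment in the loop above could look roughly like the following. This is a minimal sketch, not the parser used by graphql-sse: `createSSEParser` and `SSEEvent` are hypothetical names, only the `event:` and `data:` fields are handled, and other SSE fields (`id:`, `retry:`, comment lines) are ignored. It buffers text because a network chunk can end in the middle of an event.

```typescript
// One parsed Server-Sent Event (only the fields this page uses).
interface SSEEvent {
  event: string
  data: string
}

// Returns a function that accepts decoded text chunks and returns every
// event completed so far. Incomplete trailing text stays in the buffer.
function createSSEParser(): (chunk: string) => SSEEvent[] {
  let buffer = ""
  return (chunk) => {
    // Normalise CRLF on the whole buffer so a "\r\n" split across chunks still works.
    buffer = (buffer + chunk).replace(/\r\n/g, "\n")
    const events: SSEEvent[] = []
    let boundary = buffer.indexOf("\n\n") // a blank line terminates an event
    while (boundary !== -1) {
      const block = buffer.slice(0, boundary)
      buffer = buffer.slice(boundary + 2)
      let event = "message" // SSE default when no event field is present
      const data: string[] = []
      for (const line of block.split("\n")) {
        if (line.startsWith("event:")) event = line.slice(6).trim()
        else if (line.startsWith("data:")) data.push(line.slice(5).replace(/^ /, ""))
      }
      // Skip blocks that carried neither field (for example keep-alive comments).
      if (data.length > 0 || event !== "message") {
        events.push({ event, data: data.join("\n") })
      }
      boundary = buffer.indexOf("\n\n")
    }
    return events
  }
}

// Inside the read loop this would be used along the lines of:
//   const parse = createSSEParser()
//   for (const e of parse(decoder.decode(value, { stream: true }))) {
//     if (e.event === "next") console.log(JSON.parse(e.data))
//   }
```

Passing `{ stream: true }` to `decoder.decode` keeps multi-byte characters intact when they straddle two chunks.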

Using graphql-sse client:

import { createClient } from "graphql-sse"

const client = createClient({
  url: "/graphql/stream",
})

for await (const result of client.iterate({
  query: "subscription { messageAdded { id content } }",
})) {
  console.log("New message:", result.data?.messageAdded)
}

You can offer both WebSocket and SSE subscriptions simultaneously:

import { createServer } from "node:http"
import { Layer } from "effect"
import { createGraphQLWSServer, createSSEHandler } from "@effect-gql/node"

// Assumes schema and sseHandler are created as in the SSE setup above.

const httpServer = createServer(async (req, res) => {
  const url = new URL(req.url ?? "/", `http://${req.headers.host}`)
  // SSE subscriptions
  if (url.pathname === "/graphql/stream" && req.method === "POST") {
    await sseHandler(req, res)
    return
  }
  // Regular GraphQL...
})

// WebSocket subscriptions
const { handleUpgrade } = createGraphQLWSServer(schema, Layer.empty)
httpServer.on("upgrade", handleUpgrade)

httpServer.listen(4000, () => {
  console.log("WebSocket: ws://localhost:4000/graphql")
  console.log("SSE: http://localhost:4000/graphql/stream")
})
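
To make the split between the two transports explicit, here is a small sketch of the routing decision the server above makes. `classifyRequest` and `Transport` are hypothetical names used only for illustration; in Node.js, WebSocket handshakes arrive through the `upgrade` event rather than the request handler, so this function only models the decision and is not something you need to call.

```typescript
type Transport = "sse" | "websocket" | "http"

// Models which handler a request ends up in, using the paths from the example above.
function classifyRequest(
  method: string,
  pathname: string,
  upgradeHeader?: string
): Transport {
  // WebSocket handshakes carry an "Upgrade: websocket" header.
  if (upgradeHeader?.toLowerCase() === "websocket" && pathname === "/graphql") {
    return "websocket"
  }
  // SSE subscriptions are plain HTTP POSTs to the stream path.
  if (method === "POST" && pathname === "/graphql/stream") {
    return "sse"
  }
  // Everything else is regular GraphQL or other HTTP traffic.
  return "http"
}
```

Because the two transports are distinguished by path and handshake type, they can share one port without interfering with each other.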

A typical subscription setup requires a pub/sub system. Here’s a simple in-memory implementation:

import { Context, Effect, Layer, Stream, PubSub } from "effect"

class PubSubService extends Context.Tag("PubSubService")<PubSubService, {
  publish: <A>(topic: string, message: A) => Effect.Effect<void>
  subscribe: <A>(topic: string) => Stream.Stream<A>
}>() {}

const makePubSubService = Effect.gen(function* () {
  // One PubSub per topic, created lazily on first use
  const topics = new Map<string, PubSub.PubSub<any>>()

  const getOrCreateTopic = (topic: string) => Effect.gen(function* () {
    let pubsub = topics.get(topic)
    if (!pubsub) {
      pubsub = yield* PubSub.unbounded<any>()
      topics.set(topic, pubsub)
    }
    return pubsub
  })

  return {
    publish: <A>(topic: string, message: A) =>
      Effect.gen(function* () {
        const pubsub = yield* getOrCreateTopic(topic)
        yield* PubSub.publish(pubsub, message)
      }),
    subscribe: <A>(topic: string): Stream.Stream<A> =>
      Stream.unwrap(
        Effect.gen(function* () {
          const pubsub = yield* getOrCreateTopic(topic)
          return Stream.fromPubSub(pubsub)
        })
      )
  }
})

const PubSubServiceLive = Layer.effect(PubSubService, makePubSubService)

Publishing from a mutation and subscribing to the same topic:

const builder = GraphQLSchemaBuilder.empty.pipe(
  // Mutation triggers subscription updates
  mutation("sendMessage", {
    type: MessageSchema,
    args: S.Struct({ channelId: S.String, content: S.String }),
    resolve: ({ channelId, content }) => Effect.gen(function* () {
      const pubsub = yield* PubSubService
      const message = {
        id: crypto.randomUUID(),
        channelId,
        content,
        createdAt: new Date().toISOString()
      }
      yield* MessageRepository.save(message)
      yield* pubsub.publish(`channel:${channelId}`, message)
      return message
    })
  }),
  // Subscription listens for new messages
  subscription("messageAdded", {
    type: MessageSchema,
    args: S.Struct({ channelId: S.String }),
    subscribe: ({ channelId }) => Effect.gen(function* () {
      const pubsub = yield* PubSubService
      return pubsub.subscribe(`channel:${channelId}`)
    })
  })
)

Filter events server-side to reduce bandwidth:

subscription("userActivity", {
  type: ActivitySchema,
  args: S.Struct({
    userId: S.optional(S.String),
    types: S.optional(S.Array(S.String))
  }),
  subscribe: ({ userId, types }) => Effect.gen(function* () {
    const pubsub = yield* PubSubService
    return pubsub.subscribe<Activity>("activity").pipe(
      // An omitted argument matches every activity
      Stream.filter((activity) =>
        userId ? activity.userId === userId : true
      ),
      Stream.filter((activity) =>
        types?.length ? types.includes(activity.type) : true
      )
    )
  })
})
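
The two filters above amount to a single predicate in which an omitted argument matches everything. As a sketch, that predicate can be pulled out into a plain function that is easy to unit test. `ActivityFilter` and `matchesActivity` are hypothetical names, and the `Activity` shape is an assumption, since the original type is not shown on this page.

```typescript
// Assumed shape; the real Activity type is not shown on this page.
interface Activity {
  userId: string
  type: string
}

interface ActivityFilter {
  userId?: string
  types?: ReadonlyArray<string>
}

// True when the activity passes every filter that was actually supplied.
const matchesActivity =
  (filter: ActivityFilter) =>
  (activity: Activity): boolean => {
    const userOk = filter.userId ? activity.userId === filter.userId : true
    const typeOk = filter.types?.length
      ? filter.types.includes(activity.type)
      : true
    return userOk && typeOk
  }
```

With this in place, the stream pipeline could use one step, `Stream.filter(matchesActivity({ userId, types }))`, in place of the two separate filters.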

Batch rapid updates to reduce client processing:

subscription("stockPrices", {
  type: S.Array(StockPriceSchema),
  args: S.Struct({ symbols: S.Array(S.String) }),
  subscribe: ({ symbols }) => Effect.gen(function* () {
    const pubsub = yield* PubSubService
    return pubsub.subscribe<StockPrice>("prices").pipe(
      Stream.filter((price) => symbols.includes(price.symbol)),
      // Emit a batch after 100 items or 100 ms, whichever comes first
      Stream.groupedWithin(100, "100 millis"),
      // Convert each Chunk to a plain array
      Stream.map((prices) => [...prices])
    )
  })
})
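
To build intuition for what size-or-time batching does to a burst of updates, the sketch below groups timestamped events with a plain function. It is a simplified model, not how Effect implements `Stream.groupedWithin`: the real operator is timer-driven and its exact timing may differ. `Timed` and `batchWithin` are hypothetical names.

```typescript
// An event with the time (in milliseconds) at which it arrived.
interface Timed<A> {
  at: number
  value: A
}

// Closes the current batch when it reaches maxSize items, or when the next
// event arrives windowMs or more after the first item of the batch.
function batchWithin<A>(
  events: ReadonlyArray<Timed<A>>,
  maxSize: number,
  windowMs: number
): A[][] {
  const batches: A[][] = []
  let current: A[] = []
  let startedAt = 0
  for (const e of events) {
    if (current.length > 0 && e.at - startedAt >= windowMs) {
      batches.push(current) // window elapsed: flush what we have
      current = []
    }
    if (current.length === 0) startedAt = e.at // first item opens a new window
    current.push(e.value)
    if (current.length === maxSize) {
      batches.push(current) // size limit reached: flush immediately
      current = []
    }
  }
  if (current.length > 0) batches.push(current) // flush the remainder
  return batches
}
```

With a window of 100 ms, four price ticks at 0, 50, 150 and 160 ms would reach the client as two updates of two prices each instead of four separate messages.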

Send initial data before streaming updates:

subscription("messages", {
  type: MessageSchema,
  args: S.Struct({ channelId: S.String }),
  subscribe: ({ channelId }) => Effect.gen(function* () {
    const db = yield* Database
    const pubsub = yield* PubSubService
    // Snapshot of recent messages, emitted first
    const recentMessages = yield* db.getRecentMessages(channelId, 10)
    const initial = Stream.fromIterable(recentMessages)
    // Live updates, emitted after the snapshot
    const live = pubsub.subscribe(`channel:${channelId}`)
    return Stream.concat(initial, live)
  })
})
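
One thing to watch with this pattern is the boundary between the snapshot and the live stream: depending on when the live stream actually subscribes, a message written around that moment may be missed or delivered twice. If your setup can deliver a message in both places, one possible guard is to drop live items whose id already appeared in the snapshot. The sketch below assumes every item has a string `id`; `notInSnapshot` and `HasId` are hypothetical names.

```typescript
interface HasId {
  id: string
}

// Builds a predicate that rejects items already present in the snapshot.
// Only snapshot ids are remembered, so memory use stays bounded.
const notInSnapshot = <A extends HasId>(
  snapshot: ReadonlyArray<A>
): ((item: A) => boolean) => {
  const seen = new Set(snapshot.map((item) => item.id))
  return (item) => !seen.has(item.id)
}
```

Applied to the example above, the live stream could be piped through `Stream.filter(notInSnapshot(recentMessages))` before it is concatenated with the initial data.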

Protect expensive subscriptions with complexity limits:

// WebSocket
serve(router, serviceLayer, {
  subscriptions: {
    schema,
    complexity: { maxComplexity: 100, maxDepth: 5 },
    fieldComplexities: builder.getFieldComplexities(),
  }
})

// SSE
const sseHandler = createSSEHandler(schema, Layer.empty, {
  complexity: { maxComplexity: 100, maxDepth: 5 },
  fieldComplexities: builder.getFieldComplexities(),
})

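WebSocket clients:

Library       | Import
--------------+------------------------------------------------------------------
graphql-ws    | import { createClient } from "graphql-ws"
Apollo Client | import { GraphQLWsLink } from "@apollo/client/link/subscriptions"
urql          | import { subscriptionExchange } from "urql"

SSE clients:

Library     | Import
------------+--------------------------------------------
graphql-sse | import { createClient } from "graphql-sse"
Native      | EventSource or fetch API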

Platform     | WebSocket                         | SSE
-------------+-----------------------------------+--------------------
Node.js      | @effect-gql/node (requires ws)    | @effect-gql/node
Bun          | @effect-gql/bun (built-in)        | @effect-gql/bun
Express      | @effect-gql/express (requires ws) | @effect-gql/express
Workers/Deno | Not supported                     | @effect-gql/web