# APOPHIS WebSocket Extension — Technical Specification ## 1. Overview This specification extends **APOPHIS v1.0** with first-class **WebSocket** contract testing. WebSockets differ fundamentally from HTTP: | Dimension | HTTP | WebSocket | |-----------|------|-----------| | Connection | Ephemeral, per-request | Persistent, bidirectional | | Protocol | Request/response pairs | Message-oriented streams | | Metadata | Status codes, headers, body | No status codes; headers only at handshake | | Lifecycle | Stateless (per request) | Stateful (connection-scoped) | | Errors | HTTP status codes | Connection close codes + application-level errors | APOPHIS must treat WebSocket routes as **first-class citizens** in the contract/stateful testing pipeline without breaking existing HTTP-only workflows. --- ## 2. Goals 1. Annotate WebSocket routes in Fastify using `@fastify/websocket` (or equivalent) with APOPHIS contract annotations. 2. Define **message-schema contracts** (what messages may be sent/received) and **state-machine contracts** (valid transitions between connection states). 3. Provide a dedicated **`ws-runner.ts`** for WebSocket contract validation. 4. Validate **message sequences** (e.g., `AUTH` → `READY` → `SUBSCRIBE` → `DATA`). 5. Support **stateful testing** for connection lifecycle, reconnection, and error handling. 6. Extend **APOSTL** with `ws_message` and `ws_state` operations. 7. Integrate seamlessly with existing `contract()` and `stateful()` methods. --- ## 3. Non-Goals - WebSocket performance/load testing (out of scope; use `autocannon` or `k6`). - Browser-based WebSocket testing (APOPHIS is server-side only). - Raw TCP/WebSocket frame inspection (we operate at the message level). --- ## 4. Architecture ### 4.1 High-Level Flow ``` ┌─────────────────┐ ┌──────────────────┐ ┌─────────────────┐ │ Fastify Route │────▶│ WS Contract │────▶│ ws-runner.ts │ │ (with ws opts) │ │ Extraction │ │ (validation) │ └─────────────────┘ └──────────────────┘ └─────────────────┘ │ │ ▼ ▼ ┌──────────────────┐ ┌─────────────────┐ │ Message Schema │ │ APOSTL Eval │ │ (x-ws-messages) │ │ (ws_message, │ └──────────────────┘ │ ws_state) │ └─────────────────┘ ``` ### 4.2 File Additions & Modifications | File | Action | Purpose | |------|--------|---------| | `src/types.ts` | **Modify** | Add WS-specific types to public API | | `src/domain/contract.ts` | **Modify** | Extract WS contracts from route schema | | `src/domain/discovery.ts` | **Modify** | Discover WebSocket routes alongside HTTP | | `src/formula/parser.ts` | **Modify** | Add `ws_message`, `ws_state` to APOSTL grammar | | `src/formula/evaluator.ts` | **Modify** | Evaluate `ws_message` and `ws_state` nodes | | `src/test/ws-runner.ts` | **Create** | Dedicated WebSocket test runner | | `src/test/ws-client.ts` | **Create** | Thin WebSocket client wrapper for testing | | `src/domain/ws-contract-validation.ts` | **Create** | WS-specific validation logic | | `src/plugin/index.ts` | **Modify** | Wire WS runner into `contract()` / `stateful()` | | `src/domain/category.ts` | **Modify** | Categorize WS routes (default: `observer` for GET upgrades) | --- ## 5. Type Changes (`src/types.ts`) ### 5.1 New Types ```typescript // ============================================================================ // WebSocket: Message Types // ============================================================================ export interface WebSocketMessage { readonly direction: 'incoming' | 'outgoing' readonly type: string // e.g., 'auth', 'ready', 'data', 'error' readonly payload: unknown readonly timestamp: number } export interface WebSocketConnection { readonly id: string readonly url: string readonly headers: Record readonly state: WebSocketState readonly messages: ReadonlyArray readonly closeCode?: number readonly closeReason?: string } export type WebSocketState = | 'connecting' | 'open' | 'authenticating' | 'ready' | 'subscribed' | 'closing' | 'closed' | 'error' // ============================================================================ // WebSocket: Contract Annotations // ============================================================================ export interface WebSocketMessageSchema { readonly type: string readonly direction: 'incoming' | 'outgoing' readonly schema: Record // JSON Schema for payload readonly required?: boolean } export interface WebSocketStateTransition { readonly from: WebSocketState readonly to: WebSocketState readonly trigger: string // message type that triggers transition readonly guard?: string // APOSTL guard formula } export interface WebSocketContract { readonly path: string readonly method: 'GET' // WS always upgrades from GET readonly category: OperationCategory readonly messages: WebSocketMessageSchema[] readonly transitions: WebSocketStateTransition[] readonly requires: string[] // APOSTL preconditions (e.g., auth token in headers) readonly ensures: string[] // APOSTL postconditions on final state readonly invariants: string[] // APOSTL invariants over message sequence readonly validateRuntime: boolean } // ============================================================================ // WebSocket: EvalContext Extension // ============================================================================ export interface WebSocketEvalContext { readonly connection: WebSocketConnection readonly message: WebSocketMessage readonly state: WebSocketState readonly previousMessage?: WebSocketMessage } // ============================================================================ // WebSocket: Test Results // ============================================================================ export interface WebSocketTestResult { readonly ok: boolean readonly name: string readonly id: number readonly connectionId: string readonly directive?: string readonly diagnostics?: WebSocketTestDiagnostics } export interface WebSocketTestDiagnostics { readonly error?: string readonly violation?: ContractViolation readonly expectedSequence?: string[] readonly actualSequence?: string[] readonly stateTransition?: { readonly from: WebSocketState readonly to: WebSocketState readonly expectedTrigger: string readonly actualTrigger: string } } ``` ### 5.2 Modified Types ```typescript // In RouteContract, add optional ws field export interface RouteContract { path: string method: string category: OperationCategory requires: string[] ensures: string[] invariants: string[] regexPatterns: Record validateRuntime: boolean schema?: Record ws?: WebSocketContract // <-- NEW: present if route is a WebSocket upgrade } // In EvalContext, add optional ws field export interface EvalContext { readonly request: { /* ... */ } readonly response: { /* ... */ } readonly previous?: EvalContext readonly ws?: WebSocketEvalContext // <-- NEW: populated during WS testing } // In OperationHeader, add ws operations export type OperationHeader = | 'request_body' | 'response_body' | 'response_code' | 'request_headers' | 'response_headers' | 'query_params' | 'cookies' | 'response_time' | 'ws_message' | 'ws_state' // <-- NEW ``` --- ## 6. Fastify WebSocket Route Annotation ### 6.1 Route Definition (Example) Using `@fastify/websocket`: ```typescript import fastify from 'fastify' import websocket from '@fastify/websocket' import apophis from 'apophis-fastify' const app = fastify() await app.register(websocket) await app.register(apophis) // WebSocket route with APOPHIS contract annotations app.get('/ws/events', { websocket: true, schema: { // Standard HTTP schema for the upgrade request querystring: { type: 'object', properties: { stream: { type: 'string', enum: ['live', 'snapshot'] } } }, // WebSocket-specific contract annotations 'x-ws-messages': [ { type: 'auth', direction: 'outgoing', schema: { type: 'object', properties: { token: { type: 'string' } }, required: ['token'] }, required: true }, { type: 'ready', direction: 'incoming', schema: { type: 'object', properties: { status: { type: 'string', const: 'ready' } } } }, { type: 'subscribe', direction: 'outgoing', schema: { type: 'object', properties: { channels: { type: 'array', items: { type: 'string' } } } } }, { type: 'data', direction: 'incoming', schema: { type: 'object', properties: { channel: { type: 'string' }, payload: { type: 'object' } } } } ], 'x-ws-transitions': [ { from: 'open', to: 'authenticating', trigger: 'auth' }, { from: 'authenticating', to: 'ready', trigger: 'ready' }, { from: 'ready', to: 'subscribed', trigger: 'subscribe' }, { from: 'subscribed', to: 'subscribed', trigger: 'data' } ], 'x-requires': [ 'request_headers(this).authorization != null' ], 'x-ensures': [ 'ws_state(this) == "ready"', 'previous(ws_message(this).type) == "auth" && ws_message(this).type == "ready"' ], 'x-invariants': [ 'for msg in ws_message(this): msg.direction == "incoming" || msg.direction == "outgoing"' ] } }, (connection, req) => { // Handler implementation connection.socket.on('message', (raw) => { const msg = JSON.parse(raw.toString()) // ... handle messages }) }) ``` ### 6.2 Schema Annotation Reference | Annotation | Type | Description | |------------|------|-------------| | `x-ws-messages` | `WebSocketMessageSchema[]` | Defines valid message types, directions, and payload schemas | | `x-ws-transitions` | `WebSocketStateTransition[]` | Defines valid state machine transitions | | `x-requires` | `string[]` | APOSTL preconditions evaluated on the HTTP upgrade request | | `x-ensures` | `string[]` | APOSTL postconditions evaluated after connection close or per-message | | `x-invariants` | `string[]` | APOSTL invariants checked after every message | | `x-category` | `string` | Override category (default: `observer` for WS upgrades) | | `x-validate-runtime` | `boolean` | Enable runtime validation of WS contracts | --- ## 7. Contract Extraction (`src/domain/contract.ts`) ### 7.1 Modified `extractContract` ```typescript export const extractContract = ( path: string, method: string, schema: Record | undefined, isWebSocket: boolean = false // <-- NEW parameter ): RouteContract => { const s = schema ?? {} // ... existing cache logic ... const override = typeof s['x-category'] === 'string' ? s['x-category'] : undefined // For WS routes, default to 'observer' unless overridden const category = isWebSocket ? (override as OperationCategory ?? 'observer') : inferCategory(path, method, override) // ... existing requires/ensures extraction ... const contract: RouteContract = { path, method: method.toUpperCase(), category, requires, ensures, invariants: EMPTY_INVARIANTS, regexPatterns: {}, validateRuntime, schema: s, } // If WebSocket, extract WS-specific contract if (isWebSocket) { contract.ws = extractWebSocketContract(s) } // ... cache and return ... return contract } ``` ### 7.2 New `extractWebSocketContract` ```typescript const extractWebSocketContract = ( schema: Record ): WebSocketContract | undefined => { const messages = schema['x-ws-messages'] const transitions = schema['x-ws-transitions'] if (!Array.isArray(messages) || messages.length === 0) { return undefined } const requires = schema['x-requires'] const ensures = schema['x-ensures'] const invariants = schema['x-invariants'] const validateRuntime = schema['x-validate-runtime'] !== false return { path: '', // populated by caller method: 'GET', category: 'observer', messages: messages as WebSocketMessageSchema[], transitions: (transitions as WebSocketStateTransition[]) ?? [], requires: Array.isArray(requires) ? requires as string[] : [], ensures: Array.isArray(ensures) ? ensures as string[] : [], invariants: Array.isArray(invariants) ? invariants as string[] : [], validateRuntime, } } ``` --- ## 8. Route Discovery (`src/domain/discovery.ts`) ### 8.1 Modified `captureRoute` ```typescript export const captureRoute = ( instance: object, route: CapturedRoute & { websocket?: boolean } // <-- NEW: websocket flag ): void => { const existing = capturedRoutes.get(instance) ?? [] existing.push(route) capturedRoutes.set(instance, existing) } ``` ### 8.2 Modified `discoverRoutes` ```typescript export const discoverRoutes = ( instance: { routes?: Array<{ method: string url: string schema?: Record websocket?: boolean }> } ): RouteContract[] => { const captured = capturedRoutes.get(instance) if (captured && captured.length > 0) { return captured.map((route) => extractContract(route.url, route.method, route.schema, route.websocket ?? false) ) } if (Array.isArray(instance.routes) && instance.routes.length > 0) { return instance.routes.map((route) => extractContract(route.url, route.method, route.schema, route.websocket ?? false) ) } return [] } ``` ### 8.3 Plugin Hook Modification (`src/plugin/index.ts:114-138`) ```typescript fastify.addHook('onRoute', (routeOptions) => { const method = Array.isArray(routeOptions.method) ? routeOptions.method.join(',') : routeOptions.method const schema = routeOptions.schema as Record | undefined const prefix = (routeOptions as unknown as Record).prefix as string | undefined const url = prefix && !routeOptions.url.startsWith(prefix) ? `${prefix}${routeOptions.url}` : routeOptions.url // Detect WebSocket routes const isWebSocket = (routeOptions as unknown as Record).websocket === true captureRoute(fastify, { method, url, schema, prefix, websocket: isWebSocket, // <-- NEW }) const contract = extractContract(url, method, schema, isWebSocket) if (contract.validateRuntime && (contract.requires.length > 0 || contract.ensures.length > 0 || contract.ws !== undefined)) { const config = routeOptions.config as Record || {} config.apophisContract = contract routeOptions.config = config as typeof routeOptions.config } }) ``` --- ## 9. APOSTL Formula Extensions ### 9.1 New Operations | Operation | Syntax | Returns | Description | |-----------|--------|---------|-------------| | `ws_message(this)` | `ws_message(this)` | `WebSocketMessage` | Current message being evaluated | | `ws_message(this).type` | `ws_message(this).type` | `string` | Type of current message | | `ws_message(this).payload` | `ws_message(this).payload` | `unknown` | Payload of current message | | `ws_message(this).direction` | `ws_message(this).direction` | `'incoming' \| 'outgoing'` | Direction of current message | | `ws_state(this)` | `ws_state(this)` | `WebSocketState` | Current connection state | | `previous(ws_message(this))` | `previous(ws_message(this))` | `WebSocketMessage` | Previous message in sequence | ### 9.2 Parser Changes (`src/formula/parser.ts`) Add `ws_message` and `ws_state` to `VALID_HEADERS`: ```typescript const VALID_HEADERS: OperationHeader[] = [ 'request_body', 'response_body', 'response_code', 'request_headers', 'response_headers', 'query_params', 'cookies', 'response_time', 'ws_message', 'ws_state' // <-- NEW ] ``` Add manual char-code parsing for `ws_message` (10 chars) and `ws_state` (8 chars) in `parseOperation`: ```typescript // In parseOperation, add after cookies check: if (!header && p + 10 <= len) { const c0 = input.charCodeAt(p) const c1 = input.charCodeAt(p + 1) // ... check for 'ws_message' ... if (c0 === 119 && c1 === 115) { // ws_ prefix const c3 = input.charCodeAt(p + 3) if (c3 === 109) { // ws_message header = 'ws_message' headerLen = 10 } else if (c3 === 115) { // ws_state header = 'ws_state' headerLen = 8 } } } ``` ### 9.3 Evaluator Changes (`src/formula/evaluator.ts`) Add cases to `resolveOperation`: ```typescript function resolveOperation(node: Extract, ctx: EvalContext): unknown { const { header, parameter, accessor } = node let target: unknown switch (header) { // ... existing cases ... case 'ws_message': target = ctx.ws?.message ?? null break case 'ws_state': target = ctx.ws?.state ?? null break default: throw new Error(`Unknown operation header: ${header}`) } // ... existing accessor logic ... } ``` --- ## 10. WebSocket Test Runner (`src/test/ws-runner.ts`) ### 10.1 Runner Interface ```typescript export interface WebSocketTestRunner { run( fastify: FastifyInjectInstance, config: TestConfig, scopeRegistry?: ScopeRegistry ): Promise } ``` ### 10.2 Pseudocode ```typescript /** * WebSocket Contract Test Runner * Validates WS routes for: * 1. Message schema compliance (payload matches JSON Schema) * 2. State machine transitions (valid sequence of states) * 3. APOSTL preconditions (on upgrade request) * 4. APOSTL postconditions (on connection close or per-message) * 5. APOSTL invariants (over entire message sequence) */ export const runWebSocketTests = async ( fastify: FastifyInjectInstance, config: TestConfig, scopeRegistry?: ScopeRegistry ): Promise => { const startTime = Date.now() const depth = resolveDepth(config.depth ?? 'standard') // 1. Discover WS routes const allRoutes = discoverRoutes(fastify) const wsRoutes = allRoutes.filter(r => r.ws !== undefined) if (wsRoutes.length === 0) { return { tests: [], summary: { passed: 0, failed: 0, skipped: 0, timeMs: 0, cacheHits: 0, cacheMisses: 0 }, routes: [], } } const scopeHeaders = scopeRegistry?.getHeaders(config.scope ?? null) ?? {} const results: TestResult[] = [] let testId = 0 // 2. For each WS route, run test sequences for (const route of wsRoutes) { const wsContract = route.ws! const commandsPerRoute = Math.max(1, Math.floor(depth.contractRuns / Math.max(wsRoutes.length, 1))) for (let run = 0; run < commandsPerRoute; run++) { testId++ const connectionId = `ws-${testId}` const name = `WS ${route.method} ${route.path} (#${testId})` // 3. Establish WebSocket connection let connection: WebSocketConnection try { connection = await establishConnection(fastify, route, scopeHeaders) } catch (err) { results.push({ ok: false, name, id: testId, diagnostics: { error: `Connection failed: ${err instanceof Error ? err.message : String(err)}` } }) continue } // 4. Validate upgrade request preconditions const upgradeCtx = buildUpgradeContext(connection, route) const preResult = validatePostconditions(wsContract.requires, upgradeCtx, { method: route.method, path: route.path, }) if (!preResult.success) { results.push({ ok: false, name, id: testId, diagnostics: { error: `Precondition failed: ${preResult.error}`, violation: preResult.violation, } }) await closeConnection(connection) continue } // 5. Run message sequence const sequenceResult = await runMessageSequence( connection, wsContract, route, testId, scopeHeaders ) results.push(...sequenceResult.results) // 6. Validate postconditions on final state const finalCtx = buildFinalContext(connection, route) const postResult = validatePostconditions(wsContract.ensures, finalCtx, { method: route.method, path: route.path, }) if (!postResult.success) { testId++ results.push({ ok: false, name: `POST ${name}`, id: testId, diagnostics: { error: `Postcondition failed: ${postResult.error}`, violation: postResult.violation, } }) } // 7. Check invariants over entire sequence const invariantResults = checkWebSocketInvariants(wsContract.invariants, connection, route) for (const inv of invariantResults) { if (!inv.success) { testId++ results.push({ ok: false, name: `INVARIANT: ${inv.name}`, id: testId, diagnostics: { error: inv.error } }) } } await closeConnection(connection) } } // 8. Build TestSuite const passed = results.filter(r => r.ok && r.directive === undefined).length const failed = results.filter(r => !r.ok).length const skipped = results.filter(r => r.directive !== undefined).length return { tests: results, summary: { passed, failed, skipped, timeMs: Date.now() - startTime, cacheHits: 0, cacheMisses: 0 }, routes: allRoutes.map(r => ({ path: r.path, method: r.method, status: r.ws !== undefined ? 'tested' : 'no-contract', })), } } ``` ### 10.3 Message Sequence Validation ```typescript interface SequenceResult { results: TestResult[] connection: WebSocketConnection } const runMessageSequence = async ( connection: WebSocketConnection, wsContract: WebSocketContract, route: RouteContract, baseTestId: number, scopeHeaders: Record ): Promise => { const results: TestResult[] = [] let currentState: WebSocketState = 'open' let testId = baseTestId // Generate a valid message sequence based on state machine const sequence = generateValidSequence(wsContract.transitions) for (const expectedMsg of sequence) { testId++ const msgName = `MSG ${connection.id} ${expectedMsg.type} (#${testId})` // Send or receive message let actualMsg: WebSocketMessage try { if (expectedMsg.direction === 'outgoing') { actualMsg = await sendMessage(connection, expectedMsg) } else { actualMsg = await receiveMessage(connection, expectedMsg, 5000) } } catch (err) { results.push({ ok: false, name: msgName, id: testId, diagnostics: { error: `Message exchange failed: ${err instanceof Error ? err.message : String(err)}`, expectedSequence: sequence.map(m => m.type), } }) break } // Validate message schema const schemaValidation = validateMessageSchema(actualMsg, wsContract.messages) if (!schemaValidation.valid) { results.push({ ok: false, name: msgName, id: testId, diagnostics: { error: `Schema violation: ${schemaValidation.error}`, violation: schemaValidation.violation, } }) continue } // Validate state transition const transition = wsContract.transitions.find(t => t.from === currentState && t.trigger === actualMsg.type ) if (!transition) { results.push({ ok: false, name: msgName, id: testId, diagnostics: { error: `Invalid state transition from ${currentState} via ${actualMsg.type}`, stateTransition: { from: currentState, to: currentState, expectedTrigger: expectedMsg.type, actualTrigger: actualMsg.type, } } }) continue } // Update state currentState = transition.to // Evaluate APOSTL formulas for this message const msgCtx = buildMessageContext(connection, actualMsg, currentState) const postResult = validatePostconditions(wsContract.ensures, msgCtx, { method: route.method, path: route.path, }) if (!postResult.success) { results.push({ ok: false, name: msgName, id: testId, diagnostics: { error: `Postcondition failed: ${postResult.error}`, violation: postResult.violation, } }) continue } results.push({ ok: true, name: msgName, id: testId }) } return { results, connection } } ``` ### 10.4 Sequence Generation ```typescript const generateValidSequence = ( transitions: WebSocketStateTransition[] ): Array<{ type: string; direction: 'incoming' | 'outgoing' }> => { // Build adjacency list const adj = new Map() for (const t of transitions) { const existing = adj.get(t.from) ?? [] existing.push(t) adj.set(t.from, existing) } // BFS/DFS to find a path from 'open' to a terminal state const sequence: Array<{ type: string; direction: 'incoming' | 'outgoing' }> = [] let current: WebSocketState = 'open' const visited = new Set() while (true) { const options = adj.get(current) ?? [] const unvisited = options.filter(t => !visited.has(`${t.from}-${t.to}-${t.trigger}`)) if (unvisited.length === 0) break const choice = unvisited[0]! visited.add(`${choice.from}-${choice.to}-${choice.trigger}`) // Determine direction based on typical patterns // (outgoing = client→server, incoming = server→client) const direction: 'incoming' | 'outgoing' = ['auth', 'subscribe', 'ping'].includes(choice.trigger) ? 'outgoing' : 'incoming' sequence.push({ type: choice.trigger, direction }) current = choice.to // Prevent infinite loops if (sequence.length > 50) break } return sequence } ``` --- ## 11. WebSocket Client (`src/test/ws-client.ts`) Thin wrapper around `ws` or native `WebSocket` for testing: ```typescript import WebSocket from 'ws' export const establishConnection = async ( fastify: FastifyInjectInstance, route: RouteContract, headers: Record ): Promise => { // Fastify inject doesn't support WS; we need the actual HTTP server const address = fastify.server?.address() if (!address || typeof address === 'string') { throw new Error('Fastify server must be listening for WebSocket tests') } const url = `ws://localhost:${address.port}${route.path}` const ws = new WebSocket(url, { headers }) return new Promise((resolve, reject) => { const connection: WebSocketConnection = { id: `ws-${Date.now()}-${Math.random().toString(36).slice(2)}`, url, headers, state: 'connecting', messages: [], } ws.on('open', () => { connection.state = 'open' resolve(connection) }) ws.on('error', (err) => { connection.state = 'error' reject(err) }) ws.on('message', (data) => { const msg: WebSocketMessage = { direction: 'incoming', type: inferMessageType(data), payload: parsePayload(data), timestamp: Date.now(), } connection.messages = [...connection.messages, msg] }) ws.on('close', (code, reason) => { connection.state = 'closed' connection.closeCode = code connection.closeReason = reason.toString() }) // Attach ws instance to connection for send/close operations (connection as any)._ws = ws }) } export const sendMessage = async ( connection: WebSocketConnection, msg: { type: string; payload?: unknown } ): Promise => { const ws = (connection as any)._ws as WebSocket const payload = JSON.stringify({ type: msg.type, ...msg.payload }) ws.send(payload) const sent: WebSocketMessage = { direction: 'outgoing', type: msg.type, payload: msg.payload, timestamp: Date.now(), } connection.messages = [...connection.messages, sent] return sent } export const receiveMessage = async ( connection: WebSocketConnection, expected: { type: string }, timeoutMs: number ): Promise => { return new Promise((resolve, reject) => { const startTime = Date.now() const check = () => { const lastMsg = connection.messages[connection.messages.length - 1] if (lastMsg && lastMsg.direction === 'incoming' && lastMsg.type === expected.type) { resolve(lastMsg) return } if (Date.now() - startTime > timeoutMs) { reject(new Error(`Timeout waiting for message type: ${expected.type}`)) return } setTimeout(check, 10) } check() }) } export const closeConnection = async ( connection: WebSocketConnection ): Promise => { const ws = (connection as any)._ws as WebSocket ws.close() return new Promise((resolve) => { ws.on('close', () => resolve()) }) } ``` --- ## 12. Stateful Testing for WebSockets ### 12.1 Connection Lifecycle States ``` connecting → open → authenticating → ready → subscribed → closing → closed ↓ ↓ ↓ ↓ error error error error ``` ### 12.2 Reconnection Testing ```typescript const testReconnection = async ( route: RouteContract, scopeHeaders: Record ): Promise => { const results: TestResult[] = [] // Test 1: Clean reconnect after normal close const conn1 = await establishConnection(fastify, route, scopeHeaders) await runAuthSequence(conn1) await closeConnection(conn1) const conn2 = await establishConnection(fastify, route, scopeHeaders) const reconnectOk = conn2.state === 'open' results.push({ ok: reconnectOk, name: `Reconnection after clean close`, id: 1, diagnostics: reconnectOk ? undefined : { error: 'Reconnection failed after clean close' } }) // Test 2: Reconnect after error const conn3 = await establishConnection(fastify, route, scopeHeaders) await sendInvalidMessage(conn3) // Force error await closeConnection(conn3) const conn4 = await establishConnection(fastify, route, scopeHeaders) const errorReconnectOk = conn4.state === 'open' results.push({ ok: errorReconnectOk, name: `Reconnection after error`, id: 2, diagnostics: errorReconnectOk ? undefined : { error: 'Reconnection failed after error' } }) return results } ``` ### 12.3 Error Handling Testing ```typescript const testErrorHandling = async ( route: RouteContract, wsContract: WebSocketContract, scopeHeaders: Record ): Promise => { const results: TestResult[] = [] // Test: Invalid message type const conn = await establishConnection(fastify, route, scopeHeaders) await sendMessage(conn, { type: 'invalid_type_xyz', payload: {} }) const errorMsg = await waitForMessage(conn, 'error', 1000) const handledCorrectly = errorMsg !== null && errorMsg.payload !== undefined results.push({ ok: handledCorrectly, name: `Error handling for invalid message type`, id: 1, diagnostics: handledCorrectly ? undefined : { error: 'Server did not respond with error message for invalid type' } }) // Test: Message without required auth const conn2 = await establishConnection(fastify, route, {}) await sendMessage(conn2, { type: 'subscribe', payload: { channels: ['test'] } }) const authError = await waitForMessage(conn2, 'error', 1000) const authHandled = authError !== null results.push({ ok: authHandled, name: `Error handling for unauthenticated subscribe`, id: 2, diagnostics: authHandled ? undefined : { error: 'Server allowed subscribe without auth' } }) return results } ``` --- ## 13. Integration with `contract()` and `stateful()` ### 13.1 Modified Plugin (`src/plugin/index.ts`) ```typescript const buildContract = (fastify: FastifyInstance, scope: ScopeRegistry) => async (opts: TestConfig = {}): Promise => { const config = { depth: opts.depth ?? 'standard', scope: opts.scope, seed: opts.seed, } const injectInstance = fastify as unknown as import('../types.js').FastifyInjectInstance // Run HTTP contract tests const httpSuite = await runPetitTests(injectInstance, config, scope) // Run WebSocket contract tests const wsSuite = await runWebSocketTests(injectInstance, config, scope) // Merge results const mergedTests = [...httpSuite.tests, ...wsSuite.tests] const mergedRoutes = [...httpSuite.routes, ...wsSuite.routes] const mergedSummary = { passed: httpSuite.summary.passed + wsSuite.summary.passed, failed: httpSuite.summary.failed + wsSuite.summary.failed, skipped: httpSuite.summary.skipped + wsSuite.summary.skipped, timeMs: httpSuite.summary.timeMs + wsSuite.summary.timeMs, cacheHits: httpSuite.summary.cacheHits + wsSuite.summary.cacheHits, cacheMisses: httpSuite.summary.cacheMisses + wsSuite.summary.cacheMisses, } // Loud failure on empty discovery if (mergedTests.length === 0) { const routes = discoverRoutes(fastify as unknown as { routes?: Array<{ method: string; url: string; schema?: Record }> }) if (routes.length === 0) { throw new Error( 'No routes discovered. Did you register APOPHIS before defining routes? ' + 'APOPHIS must be registered via `await fastify.register(apophis)` before any routes are defined.' ) } } return { tests: mergedTests, summary: mergedSummary, routes: mergedRoutes, } } const buildStateful = (fastify: FastifyInstance, scope: ScopeRegistry, cleanupManager: CleanupManager) => async (opts: TestConfig = {}): Promise => { const config = { depth: opts.depth ?? 'standard', scope: opts.scope, seed: opts.seed, } const injectInstance = fastify as unknown as import('../types.js').FastifyInjectInstance // Run HTTP stateful tests const httpSuite = await runStatefulTests(injectInstance, config, cleanupManager, scope) // Run WebSocket stateful tests (reconnection, error handling) const wsSuite = await runWebSocketStatefulTests(injectInstance, config, scope) // Merge results (same pattern as contract()) // ... return { tests: [...httpSuite.tests, ...wsSuite.tests], summary: { /* merged */ }, routes: [...httpSuite.routes, ...wsSuite.routes], } } ``` ### 13.2 New `runWebSocketStatefulTests` ```typescript export const runWebSocketStatefulTests = async ( fastify: FastifyInjectInstance, config: TestConfig, scopeRegistry?: ScopeRegistry ): Promise => { const startTime = Date.now() const depth = resolveDepth(config.depth ?? 'standard') const allRoutes = discoverRoutes(fastify) const wsRoutes = allRoutes.filter(r => r.ws !== undefined) if (wsRoutes.length === 0) { return { tests: [], summary: { passed: 0, failed: 0, skipped: 0, timeMs: 0, cacheHits: 0, cacheMisses: 0 }, routes: [], } } const scopeHeaders = scopeRegistry?.getHeaders(config.scope ?? null) ?? {} const results: TestResult[] = [] let testId = 0 for (const route of wsRoutes) { // Test 1: Connection lifecycle testId++ const lifecycleResults = await testConnectionLifecycle(route, scopeHeaders) results.push(...lifecycleResults.map((r, i) => ({ ...r, id: testId + i }))) testId += lifecycleResults.length // Test 2: Reconnection testId++ const reconnectResults = await testReconnection(route, scopeHeaders) results.push(...reconnectResults.map((r, i) => ({ ...r, id: testId + i }))) testId += reconnectResults.length // Test 3: Error handling testId++ const errorResults = await testErrorHandling(route, route.ws!, scopeHeaders) results.push(...errorResults.map((r, i) => ({ ...r, id: testId + i }))) testId += errorResults.length // Test 4: Message sequence fuzzing (fast-check) const numRuns = depth.statefulRuns const prop = fc.asyncProperty( fc.array(fc.constantFrom(...generateValidMessages(route.ws!)), { minLength: 1, maxLength: depth.maxCommands }), async (messages) => { const conn = await establishConnection(fastify, route, scopeHeaders) try { for (const msg of messages) { await sendMessage(conn, msg) const response = await receiveMessage(conn, { type: '*' }, 1000) // Validate response } return true } finally { await closeConnection(conn) } } ) try { await fc.assert(prop, { numRuns, seed: config.seed }) } catch (err) { // Format counterexample... } } const passed = results.filter(r => r.ok && r.directive === undefined).length const failed = results.filter(r => !r.ok).length const skipped = results.filter(r => r.directive !== undefined).length return { tests: results, summary: { passed, failed, skipped, timeMs: Date.now() - startTime, cacheHits: 0, cacheMisses: 0 }, routes: allRoutes.map(r => ({ path: r.path, method: r.method, status: r.ws !== undefined ? 'tested' : 'no-contract', })), } } ``` --- ## 14. Category Inference for WebSockets (`src/domain/category.ts`) ```typescript export const inferCategory = ( path: string, method: string, override: string | undefined, isWebSocket: boolean = false // <-- NEW parameter ): OperationCategory => { if (override !== undefined && override !== '') { return override as OperationCategory } // WebSocket upgrades default to observer (read-only stream) if (isWebSocket) { return 'observer' } // ... existing logic ... } ``` --- ## 15. Runtime Validation Hooks for WebSockets ### 15.1 Modified Hook Validator (`src/infrastructure/hook-validator.ts`) WebSocket runtime validation is more complex because messages arrive asynchronously. We validate: 1. **On upgrade (`onRoute` with `websocket: true`)**: Validate `x-requires` on the HTTP upgrade request. 2. **On each message**: Validate message schema against `x-ws-messages`. 3. **On state change**: Validate `x-ws-transitions`. 4. **On close**: Validate `x-ensures` on the final connection state. ```typescript // Add to registerValidationHooks: if (opts.validateRuntime) { fastify.addHook('preHandler', createPreHandler(opts)) fastify.addHook('preSerialization', (_request, reply, payload, done) => { reply[kApophisPayload] = payload done() }) fastify.addHook('onSend', createOnSend(opts)) // WebSocket-specific hooks fastify.addHook('onRoute', (routeOptions) => { const contract = (routeOptions.config as Record)?.apophisContract as RouteContract | undefined if (contract?.ws && contract.validateRuntime) { // Register WS validation middleware registerWebSocketValidation(fastify, routeOptions, contract.ws, opts) } }) } ``` ### 15.2 WebSocket Validation Middleware ```typescript const registerWebSocketValidation = ( fastify: FastifyInstance, routeOptions: any, wsContract: WebSocketContract, opts: HookOptions ): void => { // This integrates with @fastify/websocket's connection handler const originalHandler = routeOptions.handler routeOptions.handler = async (connection: any, req: any) => { const wsConnection: WebSocketConnection = { id: `ws-${Date.now()}`, url: req.url, headers: req.headers as Record, state: 'open', messages: [], } // Validate upgrade preconditions const upgradeCtx = buildUpgradeContext(wsConnection, { method: 'GET', path: req.url }) try { validateFormulas(wsContract.requires, upgradeCtx) } catch (err) { if (opts.runtimeLevel === 'error') { connection.socket.close(1008, 'Policy violation: precondition failed') return } console.warn(`WS precondition warning: ${err instanceof Error ? err.message : String(err)}`) } // Wrap message handler const originalOnMessage = connection.socket.on connection.socket.on = function(event: string, handler: Function) { if (event === 'message') { return originalOnMessage.call(this, event, (data: any) => { const msg = parseWebSocketMessage(data) wsConnection.messages = [...wsConnection.messages, msg] // Validate message schema const schemaValidation = validateMessageSchema(msg, wsContract.messages) if (!schemaValidation.valid) { if (opts.runtimeLevel === 'error') { connection.socket.close(1008, `Invalid message: ${schemaValidation.error}`) return } console.warn(`WS schema warning: ${schemaValidation.error}`) } // Validate state transition const transition = wsContract.transitions.find(t => t.from === wsConnection.state && t.trigger === msg.type ) if (transition) { wsConnection.state = transition.to } else { const validTriggers = wsContract.transitions .filter(t => t.from === wsConnection.state) .map(t => t.trigger) if (opts.runtimeLevel === 'error') { connection.socket.close(1008, `Invalid transition from ${wsConnection.state}. Valid: ${validTriggers.join(', ')}`) return } console.warn(`WS transition warning: invalid transition from ${wsConnection.state} via ${msg.type}`) } return handler(data) }) } return originalOnMessage.call(this, event, handler) } // Validate postconditions on close connection.socket.on('close', () => { wsConnection.state = 'closed' const finalCtx = buildFinalContext(wsConnection, { method: 'GET', path: req.url }) try { validateFormulas(wsContract.ensures, finalCtx) } catch (err) { if (opts.runtimeLevel === 'error') { console.error(`WS postcondition error: ${err instanceof Error ? err.message : String(err)}`) } else { console.warn(`WS postcondition warning: ${err instanceof Error ? err.message : String(err)}`) } } }) return originalHandler(connection, req) } } ``` --- ## 16. Example: Complete WebSocket Route with Contracts ```typescript import fastify from 'fastify' import websocket from '@fastify/websocket' import apophis from 'apophis-fastify' const app = fastify() await app.register(websocket) await app.register(apophis, { runtime: 'error', // Enforce contracts at runtime }) app.get('/ws/notifications', { websocket: true, schema: { querystring: { type: 'object', properties: { userId: { type: 'string', pattern: '^[a-zA-Z0-9_-]+$' } }, required: ['userId'] }, 'x-ws-messages': [ { type: 'auth', direction: 'outgoing', schema: { type: 'object', properties: { token: { type: 'string', minLength: 32 } }, required: ['token'] }, required: true }, { type: 'auth_success', direction: 'incoming', schema: { type: 'object', properties: { status: { type: 'string', const: 'authenticated' }, userId: { type: 'string' } }, required: ['status', 'userId'] } }, { type: 'subscribe', direction: 'outgoing', schema: { type: 'object', properties: { channels: { type: 'array', items: { type: 'string', enum: ['alerts', 'messages', 'system'] }, minItems: 1 } }, required: ['channels'] } }, { type: 'notification', direction: 'incoming', schema: { type: 'object', properties: { channel: { type: 'string' }, data: { type: 'object' }, timestamp: { type: 'string', format: 'date-time' } }, required: ['channel', 'data', 'timestamp'] } }, { type: 'ping', direction: 'outgoing', schema: { type: 'object', properties: { timestamp: { type: 'number' } } } }, { type: 'pong', direction: 'incoming', schema: { type: 'object', properties: { timestamp: { type: 'number' } } } } ], 'x-ws-transitions': [ { from: 'open', to: 'authenticating', trigger: 'auth' }, { from: 'authenticating', to: 'ready', trigger: 'auth_success' }, { from: 'ready', to: 'subscribed', trigger: 'subscribe' }, { from: 'subscribed', to: 'subscribed', trigger: 'notification' }, { from: 'subscribed', to: 'subscribed', trigger: 'ping' }, { from: 'subscribed', to: 'subscribed', trigger: 'pong' } ], 'x-requires': [ 'request_headers(this).authorization != null', 'query_params(this).userId matches "^[a-zA-Z0-9_-]+$"' ], 'x-ensures': [ 'ws_state(this) == "ready" => previous(ws_message(this).type) == "auth"', 'ws_message(this).type == "notification" => ws_state(this) == "subscribed"' ], 'x-invariants': [ 'for msg in ws_message(this): msg.direction == "incoming" || msg.direction == "outgoing"', 'ws_state(this) != "error"' ], 'x-validate-runtime': true } }, (connection, req) => { const userId = (req.query as any).userId connection.socket.on('message', (raw) => { const msg = JSON.parse(raw.toString()) switch (msg.type) { case 'auth': validateToken(msg.token) connection.socket.send(JSON.stringify({ type: 'auth_success', status: 'authenticated', userId })) break case 'subscribe': subscribeToChannels(userId, msg.channels) break case 'ping': connection.socket.send(JSON.stringify({ type: 'pong', timestamp: msg.timestamp })) break } }) // Simulate notifications const interval = setInterval(() => { connection.socket.send(JSON.stringify({ type: 'notification', channel: 'alerts', data: { message: 'New alert!' }, timestamp: new Date().toISOString() })) }, 5000) connection.socket.on('close', () => { clearInterval(interval) }) }) // Run tests const suite = await app.apophis.contract() console.log(`Tests: ${suite.summary.passed} passed, ${suite.summary.failed} failed`) ``` --- ## 17. Testing the WebSocket Extension ### 17.1 Unit Tests ```typescript // src/test/ws-runner.test.ts import { test } from 'node:test' import assert from 'node:assert' import { runWebSocketTests } from '../test/ws-runner.js' import { extractContract } from '../domain/contract.js' test('ws-runner: validates message schema', async () => { const route = extractContract('/ws/test', 'GET', { 'x-ws-messages': [ { type: 'hello', direction: 'incoming', schema: { type: 'object', properties: { name: { type: 'string' } } } } ] }, true) assert.ok(route.ws) assert.equal(route.ws!.messages.length, 1) }) test('ws-runner: detects invalid state transition', async () => { const transitions = [ { from: 'open', to: 'ready', trigger: 'auth' } ] const result = validateStateTransition('open', 'subscribe', transitions) assert.equal(result.valid, false) assert.ok(result.error!.includes('Invalid transition')) }) test('ws-runner: validates APOSTL ws_message operation', async () => { const formula = 'ws_message(this).type == "auth"' const ast = parse(formula) assert.equal(ast.ast.type, 'operation') assert.equal((ast.ast as any).header, 'ws_message') }) ``` ### 17.2 Integration Tests ```typescript // src/test/integration.test.ts (additions) test('integration: WebSocket contract testing', async () => { const app = fastify() await app.register(websocket) await app.register(apophis) app.get('/ws/echo', { websocket: true, schema: { 'x-ws-messages': [ { type: 'echo', direction: 'outgoing', schema: { type: 'object', properties: { text: { type: 'string' } } } }, { type: 'echo_response', direction: 'incoming', schema: { type: 'object', properties: { text: { type: 'string' } } } } ], 'x-ws-transitions': [ { from: 'open', to: 'ready', trigger: 'echo' } ], 'x-ensures': [ 'ws_message(this).type == "echo_response" => ws_message(this).payload.text == previous(ws_message(this).payload.text)' ] } }, (connection) => { connection.socket.on('message', (raw) => { const msg = JSON.parse(raw.toString()) connection.socket.send(JSON.stringify({ type: 'echo_response', text: msg.text })) }) }) await app.ready() const suite = await app.apophis.contract() assert.ok(suite.summary.passed > 0) assert.equal(suite.summary.failed, 0) }) ``` --- ## 18. Migration Guide ### 18.1 For Existing APOPHIS Users **No breaking changes.** Existing HTTP-only routes continue to work unchanged. WebSocket support is additive. ### 18.2 Enabling WebSocket Testing 1. Install `@fastify/websocket`: ```bash npm install @fastify/websocket ``` 2. Register the WebSocket plugin **before** APOPHIS: ```typescript await app.register(websocket) await app.register(apophis) ``` 3. Add `websocket: true` to route options: ```typescript app.get('/ws/stream', { websocket: true, schema: { ... } }, handler) ``` 4. Add `x-ws-messages` and `x-ws-transitions` to route schema. 5. Run tests normally: ```typescript const suite = await app.apophis.contract() // Tests both HTTP and WS ``` --- ## 19. Performance Considerations | Concern | Mitigation | |---------|------------| | WS connections are expensive | Reuse connections across test runs; pool WS clients | | Message timeouts | Configurable timeout per message type (default: 5s) | | Sequence explosion | Limit sequence length to `depth.maxCommands` | | Memory leaks | Always close connections; use `try/finally` | | Fastify inject doesn't support WS | Require `fastify.listen()` for WS tests; skip if not listening | --- ## 20. Security Considerations | Concern | Mitigation | |---------|------------| | WS messages may contain secrets | Redact `authorization`, `token`, `password` fields in diagnostics | | ReDoS in message schema patterns | Reuse existing `regex-guard.ts` for pattern validation | | Connection flooding | Limit concurrent WS connections in test runner | | Malformed binary messages | Only support JSON messages in v1; reject binary with clear error | --- ## 21. Open Questions 1. **Binary WebSocket frames**: Should v1 support binary (ArrayBuffer/Buffer) messages, or only JSON? 2. **WebSocket subprotocols**: How to handle `Sec-WebSocket-Protocol` negotiation in contracts? 3. **Multiple WebSocket routes**: Should we support route prefixes (e.g., `/ws/v1/*`, `/ws/v2/*`) with different contracts? 4. **Server-Sent Events (SSE)**: Should this extension also cover SSE, or is that a separate spec? 5. **WebSocket compression**: Should contracts specify `permessage-deflate` expectations? --- ## 22. Appendix: Complete Type Reference ```typescript // All WebSocket types are exported from src/types.ts export type { WebSocketMessage, WebSocketConnection, WebSocketState, WebSocketMessageSchema, WebSocketStateTransition, WebSocketContract, WebSocketEvalContext, WebSocketTestResult, WebSocketTestDiagnostics, } from './types.js' ``` --- ## 23. File Change Summary | File | Lines Added | Lines Modified | Description | |------|-------------|----------------|-------------| | `src/types.ts` | ~120 | ~10 | Add all WS types to public API | | `src/domain/contract.ts` | ~40 | ~15 | Extract `WebSocketContract` from schema | | `src/domain/discovery.ts` | ~5 | ~10 | Detect `websocket: true` flag | | `src/formula/parser.ts` | ~25 | ~5 | Parse `ws_message` and `ws_state` | | `src/formula/evaluator.ts` | ~15 | ~5 | Evaluate WS operations | | `src/test/ws-runner.ts` | ~350 | — | Main WS test runner | | `src/test/ws-client.ts` | ~120 | — | WS client wrapper | | `src/domain/ws-contract-validation.ts` | ~80 | — | WS-specific validation | | `src/plugin/index.ts` | ~60 | ~20 | Wire WS into contract()/stateful() | | `src/domain/category.ts` | ~5 | ~5 | Default WS to 'observer' | | `src/infrastructure/hook-validator.ts` | ~80 | ~10 | Runtime WS validation | | **Total** | **~900** | **~80** | | --- *Specification version: 1.0.0* *Target APOPHIS version: 1.0.0* *Last updated: 2025-01-09*