Files
apophis-fastify/docs/attic/extensions/HTTP-EXTENSIONS.md
T

1742 lines
45 KiB
Markdown

# APOPHIS v1.0 HTTP Extensions Technical Specification
## Overview
This document specifies four HTTP feature extensions for APOPHIS:
1. **Multipart File Uploads** (`multipart/form-data`)
2. **Custom Serializers** (Protobuf, MessagePack, XML, etc.)
3. **Streaming Responses** (chunked transfer, NDJSON)
4. **Server-Sent Events (SSE)**
Each feature is specified with symbol-level anchors, public types, affected modules, function signatures, and pseudocode for implementation.
---
## 1. Multipart File Uploads (`multipart/form-data`)
### 1.1 JSON Schema Annotations
New `x-*` properties for multipart support:
| Property | Type | Description |
|----------|------|-------------|
| `x-content-type` | `string` | Override content type for request body. Value: `"multipart/form-data"` |
| `x-multipart-fields` | `object` | Map of field names to their schema definitions |
| `x-multipart-files` | `object` | Map of file field names to constraints (maxSize, mimeTypes, etc.) |
Example schema annotation:
```typescript
// Route schema example
schema: {
body: {
type: 'object',
'x-content-type': 'multipart/form-data',
'x-multipart-fields': {
description: { type: 'string', maxLength: 500 }
},
'x-multipart-files': {
avatar: {
maxSize: 5 * 1024 * 1024, // 5MB
mimeTypes: ['image/jpeg', 'image/png', 'image/webp'],
maxCount: 1
},
attachments: {
maxSize: 10 * 1024 * 1024, // 10MB per file
mimeTypes: ['application/pdf', 'text/plain'],
maxCount: 5
}
}
}
}
```
### 1.2 Changes to `src/types.ts`
**Line 12-22**: Extend `RouteContract` interface:
```typescript
export interface RouteContract {
path: string
method: string
category: OperationCategory
requires: string[]
ensures: string[]
invariants: string[]
regexPatterns: Record<string, string>
validateRuntime: boolean
schema?: Record<string, unknown>
// NEW: Multipart configuration extracted from schema
multipart?: {
fields: Record<string, Record<string, unknown>>
files: Record<string, MultipartFileConstraint>
}
}
// NEW: Multipart file constraint type
export interface MultipartFileConstraint {
readonly maxSize: number // bytes
readonly mimeTypes: string[] // allowed MIME types
readonly maxCount: number // max files per field
}
```
**Line 8-16**: Extend `RequestStructure` interface:
```typescript
export interface RequestStructure {
method: string
url: string
headers: Record<string, string>
query?: Record<string, string>
body?: unknown
contentType?: string
// NEW: Multipart payload
multipart?: {
fields: Record<string, string>
files: MultipartFile[]
}
}
// NEW: Multipart file representation for test data generation
export interface MultipartFile {
readonly fieldname: string
readonly originalname: string
readonly mimetype: string
readonly size: number
readonly buffer: Buffer // Injected by test generator
}
```
### 1.3 Changes to `src/domain/schema-to-arbitrary.ts`
**Line 9-12**: Extend `SchemaToArbOptions`:
```typescript
export interface SchemaToArbOptions {
readonly context: 'request' | 'response'
// NEW: Generate multipart payloads when true
readonly generateMultipart?: boolean
}
```
**Line 47-74**: Add `buildMultipartArb` function after `buildStringArb`:
```typescript
const buildMultipartArb = (
schema: Record<string, unknown>
): Arbitrary<{ fields: Record<string, unknown>; files: MultipartFile[] }> => {
const fieldsSchema = schema['x-multipart-fields'] as Record<string, Record<string, unknown>> | undefined
const filesSchema = schema['x-multipart-files'] as Record<string, Record<string, unknown>> | undefined
// Build field arbitraries
const fieldArbs: Record<string, Arbitrary<unknown>> = {}
if (fieldsSchema) {
for (const [key, fieldSchema] of Object.entries(fieldsSchema)) {
if (isObject(fieldSchema)) {
fieldArbs[key] = convertSchemaInternal(fieldSchema, { context: 'request' })
}
}
}
// Build file arbitraries
const fileArbs: Arbitrary<MultipartFile[]> = fc.array(
fc.record<MultipartFile>({
fieldname: fc.string({ minLength: 1, maxLength: 50 }),
originalname: fc.string({ minLength: 1, maxLength: 100 }),
mimetype: fc.constantFrom('image/jpeg', 'image/png', 'application/pdf', 'text/plain'),
size: fc.integer({ min: 1, max: 1024 * 1024 }), // 1MB max for tests
buffer: fc.uint8Array({ minLength: 1, maxLength: 1024 }).map(arr => Buffer.from(arr)),
}),
{ minLength: 1, maxLength: 3 }
)
return fc.tuple(
Object.keys(fieldArbs).length > 0 ? fc.record(fieldArbs) : fc.constant({}),
fileArbs
).map(([fields, files]) => ({ fields, files }))
}
```
**Line 134-167**: Modify `convertSchemaInternal` to detect multipart:
```typescript
const convertSchemaInternal = (
schema: Record<string, unknown>,
options: SchemaToArbOptions
): Arbitrary<unknown> => {
const type = getString(schema, 'type')
const enumValues = getArray(schema, 'enum')
const nullable = getBoolean(schema, 'nullable')
const contentType = getString(schema, 'x-content-type')
// NEW: Handle multipart schemas
if (contentType === 'multipart/form-data' && options.generateMultipart) {
return buildMultipartArb(schema) as Arbitrary<unknown>
}
let arb: Arbitrary<unknown>
if (enumValues !== undefined && enumValues.length > 0) {
arb = fc.constantFrom(...enumValues)
} else if (type === 'string') {
arb = buildStringArb(schema)
} else if (type === 'integer') {
arb = buildIntegerArb(schema)
} else if (type === 'number') {
arb = fc.float()
} else if (type === 'boolean') {
arb = fc.boolean()
} else if (type === 'array') {
arb = buildArrayArb(schema, options)
} else if (type === 'object') {
arb = buildObjectArb(schema, options)
} else {
arb = fc.anything()
}
if (nullable === true) {
return fc.option(arb, { nil: null })
}
return arb
}
```
### 1.4 Changes to `src/infrastructure/http-executor.ts`
**Line 64-129**: Modify `executeHttp` to handle multipart payloads:
```typescript
export const executeHttp = async (
fastify: FastifyInjectInstance,
route: RouteContract,
request: RequestStructure,
previous?: EvalContext
): Promise<EvalContext> => {
const queryString = buildQueryString(request.query)
const fullUrl = queryString ? `${request.url}?${queryString}` : request.url
if (process.env.APOPHIS_DEBUG === '1') {
log.debug(`${request.method} ${fullUrl}`, {
headers: request.headers,
body: request.body,
multipart: request.multipart,
})
}
// NEW: Handle multipart uploads
let payload: unknown = request.body
let headers = { ...request.headers }
if (request.multipart) {
// Build FormData for multipart
const formData = new FormData()
// Add fields
for (const [key, value] of Object.entries(request.multipart.fields)) {
formData.append(key, String(value))
}
// Add files
for (const file of request.multipart.files) {
const blob = new Blob([file.buffer], { type: file.mimetype })
formData.append(file.fieldname, blob, file.originalname)
}
payload = formData
// FormData sets its own content-type with boundary
delete headers['content-type']
}
const response = await fastify.inject({
method: request.method,
url: fullUrl,
payload,
headers,
})
const pathParams = extractPathParams(route.path, request.url)
let responseBody: unknown
try {
responseBody = response.json()
} catch {
responseBody = undefined
}
if (process.env.APOPHIS_DEBUG === '1') {
log.debug(`${response.statusCode} ${request.method} ${fullUrl}`, {
headers: response.headers,
body: responseBody,
})
}
const ctx: EvalContext = {
request: {
body: request.body,
headers: request.headers,
query: request.query || {},
params: pathParams,
// NEW: Include multipart info in context for formula evaluation
multipart: request.multipart,
},
response: {
body: responseBody,
headers: stringifyHeaders(response.headers),
statusCode: response.statusCode,
},
previous,
}
return ctx
}
```
**Line 71-90**: Update `FastifyInjectInstance` interface to accept FormData:
```typescript
export interface FastifyInjectInstance {
routes?: Array<{ method: string; url: string; schema?: Record<string, unknown> }>
inject(opts: {
method: string
url: string
payload?: unknown | FormData
headers?: Record<string, string>
}): Promise<{
json(): unknown
statusCode: number
headers: Record<string, unknown>
}>
}
```
### 1.5 Changes to `src/infrastructure/hook-validator.ts`
**Line 53-66**: Update `buildPreContext` to include multipart:
```typescript
const buildPreContext = (request: FastifyRequest): EvalContext => ({
request: {
body: request.body,
headers: request.headers as Record<string, string>,
query: request.query as Record<string, unknown>,
params: request.params as Record<string, string>,
cookies: getCookies(request),
// NEW: Extract multipart data if present
multipart: (request as any).multipart,
},
response: {
body: null,
headers: {},
statusCode: 0,
},
})
```
### 1.6 Changes to `src/domain/request-builder.ts`
**Line 135-163**: Modify `buildRequest` to detect and build multipart:
```typescript
export const buildRequest = (
route: RouteContract,
generatedData: Record<string, unknown>,
scopeHeaders: Record<string, string>,
state: ModelState,
rng?: SeededRng
): RequestStructure => {
const url = substitutePathParams(route.path, generatedData, state, rng)
// Check if route expects multipart
const bodySchema = route.schema?.body as Record<string, unknown> | undefined
const contentType = bodySchema?.['x-content-type'] as string | undefined
if (contentType === 'multipart/form-data') {
// Extract multipart data from generated payload
const multipartData = generatedData as { fields: Record<string, unknown>; files: MultipartFile[] }
return {
method: route.method,
url,
headers: { ...scopeHeaders },
multipart: {
fields: Object.fromEntries(
Object.entries(multipartData.fields).map(([k, v]) => [k, String(v)])
),
files: multipartData.files,
},
contentType: 'multipart/form-data',
}
}
// Existing body/query extraction logic
const body = bodySchema
? extractBodyParams(generatedData, bodySchema)
: undefined
const querySchema = route.schema?.querystring as Record<string, unknown> | undefined
const query = querySchema
? extractQueryParams(generatedData, querySchema)
: extractRemainingParams(generatedData, parseRouteParams(route.path), body)
const headers = buildHeaders(route, scopeHeaders, generatedData, state)
const contentTypeHeader = body ? 'application/json' : undefined
return { method: route.method, url, headers, query, body, contentType: contentTypeHeader }
}
```
### 1.7 New APOSTL Operations/Formulas
New operation headers for multipart access:
```typescript
// In src/types.ts, extend OperationHeader:
export type OperationHeader =
| 'request_body' | 'response_body' | 'response_code'
| 'request_headers' | 'response_headers' | 'query_params'
| 'cookies' | 'response_time'
// NEW:
| 'request_files' | 'request_fields'
```
Example formulas:
```apostl
// Check file count
request_files(this).avatar.count == 1
// Check file size
request_files(this).avatar.size <= 5242880
// Check MIME type
request_files(this).avatar.mimetype matches "image/(jpeg|png|webp)"
// Check field presence
request_fields(this).description != null
// Check field value
request_fields(this).description.length > 10
```
### 1.8 Changes to `src/formula/parser.ts`
**Line 222-225**: Add new valid headers:
```typescript
const VALID_HEADERS: OperationHeader[] = [
'request_body', 'response_body', 'response_code',
'request_headers', 'response_headers', 'query_params', 'cookies', 'response_time',
// NEW:
'request_files', 'request_fields'
]
```
**Line 227-379**: Extend `parseOperation` to handle new headers (add manual charCode checks for `request_files` and `request_fields`).
### 1.9 Changes to `src/formula/evaluator.ts`
**Line 9-65**: Extend `resolveOperation`:
```typescript
function resolveOperation(node: Extract<FormulaNode, { type: 'operation' }>, ctx: EvalContext): unknown {
const { header, parameter, accessor } = node
let target: unknown
switch (header) {
// ... existing cases ...
// NEW: Multipart access
case 'request_files':
target = ctx.request.multipart?.files ?? []
break
case 'request_fields':
target = ctx.request.multipart?.fields ?? {}
break
default:
throw new Error(`Unknown operation header: ${header}`)
}
// ... existing accessor logic ...
}
```
### 1.10 Changes to `src/domain/contract.ts`
**Line 63-73**: Extract multipart config:
```typescript
const contract: RouteContract = {
path,
method: method.toUpperCase(),
category,
requires,
ensures,
invariants: EMPTY_INVARIANTS,
regexPatterns: {},
validateRuntime,
schema: s,
// NEW: Extract multipart configuration
multipart: bodySchema?.['x-content-type'] === 'multipart/form-data'
? {
fields: (bodySchema['x-multipart-fields'] as Record<string, Record<string, unknown>>) ?? {},
files: (bodySchema['x-multipart-files'] as Record<string, Record<string, unknown>>) ?? {},
}
: undefined,
}
```
### 1.11 Example Fastify Route Definition
```typescript
fastify.post('/upload', {
schema: {
description: 'Upload avatar with metadata',
body: {
type: 'object',
'x-content-type': 'multipart/form-data',
'x-multipart-fields': {
description: { type: 'string', maxLength: 500 }
},
'x-multipart-files': {
avatar: {
maxSize: 5 * 1024 * 1024,
mimeTypes: ['image/jpeg', 'image/png', 'image/webp'],
maxCount: 1
}
}
},
response: {
201: {
type: 'object',
properties: {
id: { type: 'string', format: 'uuid' },
url: { type: 'string', format: 'uri' },
size: { type: 'integer' }
},
'x-ensures': [
'response_body(this).url matches "^https?://"',
'response_body(this).size > 0'
]
}
}
}
}, async (request, reply) => {
// Handler receives multipart data via @fastify/multipart
const data = await request.file()
// ... process upload ...
return { id: '...', url: '...', size: 12345 }
})
```
### 1.12 Backward Compatibility
- Routes without `x-content-type: multipart/form-data` behave exactly as before
- `RequestStructure.multipart` is optional
- `EvalContext.request.multipart` is optional
- Test generators only produce multipart data when `generateMultipart: true` is passed
---
## 2. Custom Serializers (Protobuf, MessagePack, XML, etc.)
### 2.1 JSON Schema Annotations
| Property | Type | Description |
|----------|------|-------------|
| `x-serializer` | `string` | Serializer identifier: `"protobuf"`, `"msgpack"`, `"xml"`, `"custom"` |
| `x-serializer-schema` | `string` | Path to serializer schema file (e.g., `.proto`, `.xsd`) |
| `x-serializer-version` | `string` | Schema version for compatibility checks |
Example:
```typescript
schema: {
body: {
type: 'object',
'x-serializer': 'protobuf',
'x-serializer-schema': './schemas/user.proto',
'x-serializer-version': 'v1.2.3'
}
}
```
### 2.2 Changes to `src/types.ts`
**Line 12-22**: Extend `RouteContract`:
```typescript
export interface RouteContract {
// ... existing fields ...
// NEW: Serializer configuration
serializer?: {
name: string
schemaPath?: string
version?: string
}
}
```
**Line 8-16**: Extend `RequestStructure`:
```typescript
export interface RequestStructure {
// ... existing fields ...
// NEW: Serialization hint
serializer?: string
}
```
**Line 71-86**: Extend `EvalContext`:
```typescript
export interface EvalContext {
request: {
// ... existing fields ...
// NEW: Raw serialized payload for inspection
rawBody?: Buffer
}
response: {
// ... existing fields ...
// NEW: Raw serialized response for inspection
rawBody?: Buffer
// NEW: Serializer used for response
serializer?: string
}
}
```
### 2.3 Changes to `src/domain/schema-to-arbitrary.ts`
No changes required. Custom serializers do not affect test data generation — APOPHIS generates native JS objects, and serialization happens at the HTTP layer.
### 2.4 Changes to `src/infrastructure/http-executor.ts`
**Line 1-18**: Add serializer interface:
```typescript
// NEW: Serializer registry interface
export interface Serializer {
readonly name: string
encode(data: unknown): Buffer
decode(buffer: Buffer): unknown
}
// NEW: Serializer registry (injected)
export interface SerializerRegistry {
get(name: string): Serializer | undefined
register(name: string, serializer: Serializer): void
}
```
**Line 64-129**: Modify `executeHttp` to handle serialization:
```typescript
export const executeHttp = async (
fastify: FastifyInjectInstance,
route: RouteContract,
request: RequestStructure,
previous?: EvalContext,
// NEW: Injected serializer registry
serializers?: SerializerRegistry
): Promise<EvalContext> => {
const queryString = buildQueryString(request.query)
const fullUrl = queryString ? `${request.url}?${queryString}` : request.url
// NEW: Serialize request body if serializer specified
let payload: unknown = request.body
let requestRawBody: Buffer | undefined
if (route.serializer && serializers) {
const serializer = serializers.get(route.serializer.name)
if (serializer) {
requestRawBody = serializer.encode(request.body)
payload = requestRawBody
}
}
const response = await fastify.inject({
method: request.method,
url: fullUrl,
payload,
headers: request.headers,
})
const pathParams = extractPathParams(route.path, request.url)
// NEW: Deserialize response body if serializer specified
let responseBody: unknown
let responseRawBody: Buffer | undefined
try {
if (route.serializer && serializers) {
const serializer = serializers.get(route.serializer.name)
if (serializer) {
// Assuming response.raw is available or we can get buffer
responseRawBody = Buffer.from(JSON.stringify(response.json())) // Fallback
responseBody = serializer.decode(responseRawBody)
} else {
responseBody = response.json()
}
} else {
responseBody = response.json()
}
} catch {
responseBody = undefined
}
const ctx: EvalContext = {
request: {
body: request.body,
headers: request.headers,
query: request.query || {},
params: pathParams,
rawBody: requestRawBody,
},
response: {
body: responseBody,
headers: stringifyHeaders(response.headers),
statusCode: response.statusCode,
rawBody: responseRawBody,
serializer: route.serializer?.name,
},
previous,
}
return ctx
}
```
### 2.5 Changes to `src/infrastructure/hook-validator.ts`
**Line 68-81**: Update `buildPostContext` to capture serialized payload:
```typescript
const buildPostContext = (request: FastifyRequest, reply: FastifyReply): EvalContext => ({
request: {
body: request.body,
headers: request.headers as Record<string, string>,
query: request.query as Record<string, unknown>,
params: request.params as Record<string, string>,
cookies: getCookies(request),
// NEW: Capture raw body if available
rawBody: (request as any).rawBody,
},
response: {
body: reply[kApophisPayload] ?? null,
headers: reply.getHeaders() as Record<string, string>,
statusCode: reply.statusCode,
// NEW: Serializer info from route config
serializer: (request.routeOptions?.config as any)?.apophisContract?.serializer?.name,
},
})
```
### 2.6 New APOSTL Operations/Formulas
New formula functions for serializer validation:
```apostl
// Check serializer used
response_headers(this)['content-type'] == "application/x-protobuf"
// Check schema version (via custom header)
response_headers(this)['x-schema-version'] == "v1.2.3"
// Check raw body size
response_body(this) != null
```
### 2.7 Changes to `src/domain/contract.ts`
**Line 63-73**: Extract serializer config:
```typescript
const contract: RouteContract = {
path,
method: method.toUpperCase(),
category,
requires,
ensures,
invariants: EMPTY_INVARIANTS,
regexPatterns: {},
validateRuntime,
schema: s,
// NEW: Extract serializer configuration
serializer: s['x-serializer']
? {
name: String(s['x-serializer']),
schemaPath: s['x-serializer-schema'] as string | undefined,
version: s['x-serializer-version'] as string | undefined,
}
: undefined,
}
```
### 2.8 Example Fastify Route Definition
```typescript
// Protobuf route
fastify.post('/users', {
schema: {
body: {
type: 'object',
'x-serializer': 'protobuf',
'x-serializer-schema': './schemas/user.proto',
properties: {
name: { type: 'string' },
email: { type: 'string', format: 'email' }
}
},
response: {
201: {
type: 'object',
'x-serializer': 'protobuf',
properties: {
id: { type: 'string' },
createdAt: { type: 'string', format: 'date-time' }
}
}
}
}
}, async (request, reply) => {
// Handler receives deserialized protobuf message
// Fastify plugin handles serialization/deserialization
})
```
### 2.9 Backward Compatibility
- Routes without `x-serializer` behave exactly as before (JSON default)
- `SerializerRegistry` is optional dependency — injected at plugin initialization
- Raw body capture only occurs when serializer is configured
---
## 3. Streaming Responses (Chunked Transfer, NDJSON)
### 3.1 JSON Schema Annotations
| Property | Type | Description |
|----------|------|-------------|
| `x-streaming` | `boolean` | Enable streaming response handling |
| `x-stream-format` | `string` | Stream format: `"ndjson"`, `"sse"`, `"chunked"` |
| `x-stream-max-chunks` | `number` | Max chunks to collect for validation |
| `x-stream-timeout` | `number` | Milliseconds to wait for stream completion |
Example:
```typescript
schema: {
response: {
200: {
type: 'object',
'x-streaming': true,
'x-stream-format': 'ndjson',
'x-stream-max-chunks': 100,
'x-stream-timeout': 5000,
properties: {
items: {
type: 'array',
items: { type: 'object' }
}
}
}
}
}
```
### 3.2 Changes to `src/types.ts`
**Line 12-22**: Extend `RouteContract`:
```typescript
export interface RouteContract {
// ... existing fields ...
// NEW: Streaming configuration
streaming?: {
enabled: boolean
format: 'ndjson' | 'sse' | 'chunked'
maxChunks: number
timeoutMs: number
}
}
```
**Line 71-86**: Extend `EvalContext`:
```typescript
export interface EvalContext {
request: {
// ... existing fields ...
}
response: {
// ... existing fields ...
// NEW: Streaming response data
chunks?: unknown[]
streamFormat?: string
streamDurationMs?: number
}
}
```
### 3.3 Changes to `src/domain/schema-to-arbitrary.ts`
No changes required for streaming. Test data generation for requests is unchanged.
### 3.4 Changes to `src/infrastructure/http-executor.ts`
**Line 64-129**: Add streaming response handling:
```typescript
export const executeHttp = async (
fastify: FastifyInjectInstance,
route: RouteContract,
request: RequestStructure,
previous?: EvalContext
): Promise<EvalContext> => {
const queryString = buildQueryString(request.query)
const fullUrl = queryString ? `${request.url}?${queryString}` : request.url
const response = await fastify.inject({
method: request.method,
url: fullUrl,
payload: request.body,
headers: request.headers,
})
const pathParams = extractPathParams(route.path, request.url)
// NEW: Handle streaming responses
let responseBody: unknown
let chunks: unknown[] | undefined
let streamDurationMs: number | undefined
if (route.streaming?.enabled) {
const startTime = Date.now()
chunks = await collectStreamChunks(response, route.streaming)
streamDurationMs = Date.now() - startTime
// Aggregate chunks based on format
if (route.streaming.format === 'ndjson') {
responseBody = chunks
} else if (route.streaming.format === 'sse') {
responseBody = parseSSEEvents(chunks)
} else {
responseBody = chunks.join('')
}
} else {
try {
responseBody = response.json()
} catch {
responseBody = undefined
}
}
const ctx: EvalContext = {
request: {
body: request.body,
headers: request.headers,
query: request.query || {},
params: pathParams,
},
response: {
body: responseBody,
headers: stringifyHeaders(response.headers),
statusCode: response.statusCode,
chunks,
streamFormat: route.streaming?.format,
streamDurationMs,
},
previous,
}
return ctx
}
// NEW: Stream chunk collection
async function collectStreamChunks(
response: any,
config: { maxChunks: number; timeoutMs: number; format: string }
): Promise<unknown[]> {
const chunks: unknown[] = []
const startTime = Date.now()
// Fastify injection does not provide a portable stream-consumption API here.
// Verify implementation against light-my-request behavior before declaring streaming support.
if (response.raw && response.raw.readable) {
for await (const chunk of response.raw) {
if (Date.now() - startTime > config.timeoutMs) {
break
}
if (config.format === 'ndjson') {
// Parse each line as JSON
const lines = chunk.toString().split('\n').filter(Boolean)
for (const line of lines) {
try {
chunks.push(JSON.parse(line))
} catch {
chunks.push(line)
}
}
} else {
chunks.push(chunk.toString())
}
if (chunks.length >= config.maxChunks) {
break
}
}
} else {
// Non-streaming fallback
try {
chunks.push(response.json())
} catch {
chunks.push(response.payload)
}
}
return chunks
}
// NEW: Parse SSE events
function parseSSEEvents(chunks: unknown[]): Array<{ event?: string; data: unknown; id?: string }> {
const events: Array<{ event?: string; data: unknown; id?: string }> = []
let currentEvent: Partial<{ event?: string; data: string; id?: string }> = {}
for (const chunk of chunks) {
const lines = String(chunk).split('\n')
for (const line of lines) {
if (line.startsWith('event:')) {
currentEvent.event = line.slice(6).trim()
} else if (line.startsWith('id:')) {
currentEvent.id = line.slice(3).trim()
} else if (line.startsWith('data:')) {
currentEvent.data = (currentEvent.data || '') + line.slice(5).trim()
} else if (line === '') {
if (currentEvent.data) {
try {
events.push({ ...currentEvent, data: JSON.parse(currentEvent.data) })
} catch {
events.push({ ...currentEvent, data: currentEvent.data })
}
currentEvent = {}
}
}
}
}
return events
}
```
### 3.5 Changes to `src/infrastructure/hook-validator.ts`
**Line 68-81**: Update `buildPostContext`:
```typescript
const buildPostContext = (request: FastifyRequest, reply: FastifyReply): EvalContext => ({
request: {
body: request.body,
headers: request.headers as Record<string, string>,
query: request.query as Record<string, unknown>,
params: request.params as Record<string, string>,
cookies: getCookies(request),
},
response: {
body: reply[kApophisPayload] ?? null,
headers: reply.getHeaders() as Record<string, string>,
statusCode: reply.statusCode,
// NEW: Streaming info from route config
chunks: (reply as any).apophisChunks,
streamFormat: (request.routeOptions?.config as any)?.apophisContract?.streaming?.format,
},
})
```
### 3.6 New APOSTL Operations/Formulas
New formula functions for streaming validation:
```apostl
// Check chunk count
response_body(this).chunks.length <= 100
// Check stream duration
response_time(this) < 5000
// Check NDJSON structure (each chunk has required field)
for item in response_body(this): item.id != null
// Check SSE event type
response_body(this).events.0.event == "update"
// Check stream completed
response_headers(this)['transfer-encoding'] == "chunked"
```
### 3.7 Changes to `src/domain/contract.ts`
**Line 63-73**: Extract streaming config:
```typescript
const contract: RouteContract = {
path,
method: method.toUpperCase(),
category,
requires,
ensures,
invariants: EMPTY_INVARIANTS,
regexPatterns: {},
validateRuntime,
schema: s,
// NEW: Extract streaming configuration from response schema
streaming: (() => {
const responseSchema = (s.response ?? {}) as Record<string, Record<string, unknown>>
const firstStatus = Object.values(responseSchema)[0]
if (firstStatus?.['x-streaming'] === true) {
return {
enabled: true,
format: (firstStatus['x-stream-format'] as 'ndjson' | 'sse' | 'chunked') ?? 'chunked',
maxChunks: (firstStatus['x-stream-max-chunks'] as number) ?? 100,
timeoutMs: (firstStatus['x-stream-timeout'] as number) ?? 5000,
}
}
return undefined
})(),
}
```
### 3.8 Example Fastify Route Definition
```typescript
// NDJSON streaming route
fastify.get('/events', {
schema: {
response: {
200: {
type: 'object',
'x-streaming': true,
'x-stream-format': 'ndjson',
'x-stream-max-chunks': 50,
'x-stream-timeout': 3000,
properties: {
events: {
type: 'array',
items: {
type: 'object',
properties: {
id: { type: 'string' },
timestamp: { type: 'string', format: 'date-time' },
data: { type: 'object' }
}
}
}
},
'x-ensures': [
'for item in response_body(this): item.id != null',
'response_body(this).length <= 50'
]
}
}
}
}, async (request, reply) => {
reply.header('content-type', 'application/x-ndjson')
const stream = new Readable({
read() {
// Stream NDJSON data
this.push(JSON.stringify({ id: '1', timestamp: new Date().toISOString(), data: {} }) + '\n')
this.push(null)
}
})
return reply.send(stream)
})
```
### 3.9 Backward Compatibility
- Routes without `x-streaming` behave exactly as before
- `EvalContext.response.chunks` is optional
- Non-streaming responses fallback to existing JSON parsing
---
## 4. Server-Sent Events (SSE)
### 4.1 JSON Schema Annotations
| Property | Type | Description |
|----------|------|-------------|
| `x-sse` | `boolean` | Mark response as SSE stream |
| `x-sse-events` | `string[]` | Allowed event types |
| `x-sse-max-events` | `number` | Max events to collect for validation |
| `x-sse-timeout` | `number` | Milliseconds to wait for events |
| `x-sse-retry` | `number` | Expected retry interval (ms) |
Example:
```typescript
schema: {
response: {
200: {
type: 'object',
'x-sse': true,
'x-sse-events': ['update', 'delete', 'heartbeat'],
'x-sse-max-events': 10,
'x-sse-timeout': 30000,
'x-sse-retry': 3000,
properties: {
events: {
type: 'array',
items: {
type: 'object',
properties: {
event: { type: 'string' },
data: { type: 'object' },
id: { type: 'string' }
}
}
}
}
}
}
}
```
### 4.2 Changes to `src/types.ts`
**Line 12-22**: Extend `RouteContract`:
```typescript
export interface RouteContract {
// ... existing fields ...
// NEW: SSE configuration
sse?: {
enabled: boolean
allowedEvents: string[]
maxEvents: number
timeoutMs: number
retryMs?: number
}
}
```
**Line 71-86**: Extend `EvalContext`:
```typescript
export interface EvalContext {
request: {
// ... existing fields ...
}
response: {
// ... existing fields ...
// NEW: SSE-specific data
sseEvents?: Array<{
event?: string
data: unknown
id?: string
retry?: number
}>
sseDurationMs?: number
}
}
```
### 4.3 Changes to `src/domain/schema-to-arbitrary.ts`
No changes required. SSE is response-only; request generation is unchanged.
### 4.4 Changes to `src/infrastructure/http-executor.ts`
**Line 64-129**: Add SSE handling:
```typescript
export const executeHttp = async (
fastify: FastifyInjectInstance,
route: RouteContract,
request: RequestStructure,
previous?: EvalContext
): Promise<EvalContext> => {
const queryString = buildQueryString(request.query)
const fullUrl = queryString ? `${request.url}?${queryString}` : request.url
const response = await fastify.inject({
method: request.method,
url: fullUrl,
payload: request.body,
headers: {
...request.headers,
// NEW: SSE requires Accept: text/event-stream
accept: route.sse?.enabled ? 'text/event-stream' : request.headers.accept,
},
})
const pathParams = extractPathParams(route.path, request.url)
// NEW: Handle SSE responses
let responseBody: unknown
let sseEvents: Array<{ event?: string; data: unknown; id?: string; retry?: number }> | undefined
let sseDurationMs: number | undefined
if (route.sse?.enabled) {
const startTime = Date.now()
sseEvents = await collectSSEEvents(response, route.sse)
sseDurationMs = Date.now() - startTime
responseBody = { events: sseEvents }
} else {
try {
responseBody = response.json()
} catch {
responseBody = undefined
}
}
const ctx: EvalContext = {
request: {
body: request.body,
headers: request.headers,
query: request.query || {},
params: pathParams,
},
response: {
body: responseBody,
headers: stringifyHeaders(response.headers),
statusCode: response.statusCode,
sseEvents,
sseDurationMs,
},
previous,
}
return ctx
}
// NEW: SSE event collection
async function collectSSEEvents(
response: any,
config: { maxEvents: number; timeoutMs: number; retryMs?: number }
): Promise<Array<{ event?: string; data: unknown; id?: string; retry?: number }>> {
const events: Array<{ event?: string; data: unknown; id?: string; retry?: number }> = []
const startTime = Date.now()
if (response.raw && response.raw.readable) {
let buffer = ''
for await (const chunk of response.raw) {
if (Date.now() - startTime > config.timeoutMs) {
break
}
buffer += chunk.toString()
const lines = buffer.split('\n')
buffer = lines.pop() || '' // Keep incomplete line in buffer
let currentEvent: Partial<{ event?: string; data: string; id?: string; retry?: number }> = {}
for (const line of lines) {
if (line.startsWith('event:')) {
currentEvent.event = line.slice(6).trim()
} else if (line.startsWith('id:')) {
currentEvent.id = line.slice(3).trim()
} else if (line.startsWith('data:')) {
currentEvent.data = (currentEvent.data || '') + line.slice(5).trim() + '\n'
} else if (line.startsWith('retry:')) {
currentEvent.retry = parseInt(line.slice(6).trim(), 10)
} else if (line === '') {
// End of event
if (currentEvent.data) {
const data = currentEvent.data.trim()
try {
events.push({
...currentEvent,
data: JSON.parse(data),
})
} catch {
events.push({
...currentEvent,
data,
})
}
currentEvent = {}
if (events.length >= config.maxEvents) {
return events
}
}
}
}
}
}
return events
}
```
### 4.5 Changes to `src/infrastructure/hook-validator.ts`
**Line 68-81**: Update `buildPostContext`:
```typescript
const buildPostContext = (request: FastifyRequest, reply: FastifyReply): EvalContext => ({
request: {
body: request.body,
headers: request.headers as Record<string, string>,
query: request.query as Record<string, unknown>,
params: request.params as Record<string, string>,
cookies: getCookies(request),
},
response: {
body: reply[kApophisPayload] ?? null,
headers: reply.getHeaders() as Record<string, string>,
statusCode: reply.statusCode,
// NEW: SSE events from reply
sseEvents: (reply as any).apophisSseEvents,
},
})
```
### 4.6 New APOSTL Operations/Formulas
New formula functions for SSE validation:
```apostl
// Check event type is allowed
response_body(this).events.0.event == "update"
// Check event count
response_body(this).events.length <= 10
// Check SSE retry interval
response_body(this).events.0.retry == 3000
// Check event has data
response_body(this).events.0.data != null
// Check event ID is present
response_body(this).events.0.id != null
// Check content-type header
response_headers(this)['content-type'] == "text/event-stream"
// Check cache-control header for SSE
response_headers(this)['cache-control'] == "no-cache"
```
### 4.7 Changes to `src/formula/evaluator.ts`
**Line 143-215**: Extend `evaluateNode` to handle array indexing in accessors:
```typescript
case 'operation': {
return resolveOperation(node, ctx)
}
```
The existing accessor resolution in `resolveOperation` (lines 43-62) already supports numeric array indices via string accessors. For SSE, `response_body(this).events.0.event` will work as-is because:
- `events` resolves to the array
- `0` is used as a property key (works for arrays)
- `event` resolves to the event property
No changes needed to the evaluator for basic SSE access.
### 4.8 Changes to `src/domain/contract.ts`
**Line 63-73**: Extract SSE config:
```typescript
const contract: RouteContract = {
path,
method: method.toUpperCase(),
category,
requires,
ensures,
invariants: EMPTY_INVARIANTS,
regexPatterns: {},
validateRuntime,
schema: s,
// NEW: Extract SSE configuration from response schema
sse: (() => {
const responseSchema = (s.response ?? {}) as Record<string, Record<string, unknown>>
const firstStatus = Object.values(responseSchema)[0]
if (firstStatus?.['x-sse'] === true) {
return {
enabled: true,
allowedEvents: (firstStatus['x-sse-events'] as string[]) ?? [],
maxEvents: (firstStatus['x-sse-max-events'] as number) ?? 10,
timeoutMs: (firstStatus['x-sse-timeout'] as number) ?? 30000,
retryMs: firstStatus['x-sse-retry'] as number | undefined,
}
}
return undefined
})(),
}
```
### 4.9 Example Fastify Route Definition
```typescript
// SSE route
fastify.get('/notifications', {
schema: {
response: {
200: {
type: 'object',
'x-sse': true,
'x-sse-events': ['notification', 'heartbeat'],
'x-sse-max-events': 5,
'x-sse-timeout': 10000,
'x-sse-retry': 5000,
properties: {
events: {
type: 'array',
items: {
type: 'object',
properties: {
event: { type: 'string' },
data: { type: 'object' },
id: { type: 'string' }
}
}
}
},
'x-ensures': [
'response_headers(this)["content-type"] == "text/event-stream"',
'for event in response_body(this).events: event.data != null',
'response_body(this).events.length <= 5'
]
}
}
}
}, async (request, reply) => {
reply.header('content-type', 'text/event-stream')
reply.header('cache-control', 'no-cache')
reply.header('connection', 'keep-alive')
const stream = new Readable({
read() {
this.push(`event: notification\n`)
this.push(`id: ${Date.now()}\n`)
this.push(`data: ${JSON.stringify({ message: 'Hello' })}\n\n`)
this.push(null)
}
})
return reply.send(stream)
})
```
### 4.10 Backward Compatibility
- Routes without `x-sse` behave exactly as before
- `EvalContext.response.sseEvents` is optional
- SSE-specific headers only sent when `x-sse` is enabled
---
## Cross-Cutting Concerns
### Plugin Options Extension
**File**: `src/types.ts`, lines 257-262
Extend `ApophisOptions`:
```typescript
export interface ApophisOptions {
readonly swagger?: Record<string, unknown>
readonly runtime?: 'off' | 'warn' | 'error'
readonly cleanup?: boolean
readonly scopes?: Record<string, ScopeConfig>
// NEW: Extension options
readonly serializers?: SerializerRegistry
readonly multipart?: {
readonly maxFileSize: number
readonly maxFiles: number
}
}
```
### Plugin Registration
**File**: `src/plugin/index.ts`, lines 110-159
Update plugin to inject serializer registry:
```typescript
export const apophisPlugin = async (fastify: FastifyInstance, opts: ApophisOptions): Promise<void> => {
await registerSwagger(fastify, opts)
// NEW: Initialize serializer registry if provided
const serializerRegistry = opts.serializers ?? {
get: () => undefined,
register: () => {},
}
// ... existing route capture ...
const decorations: ApophisDecorations = {
scope,
contract: buildContract(fastify, scope),
stateful: buildStateful(fastify, scope, cleanupManager),
check: buildCheck(fastify, scope),
cleanup: buildCleanup(cleanupManager),
spec: buildSpec(fastify),
// NEW: Expose serializer registry
serializers: serializerRegistry,
}
fastify.decorate('apophis', decorations)
// ... existing runtime validation ...
}
```
### Type Exports
**File**: `src/types.ts`
Add to public API exports:
```typescript
export type { MultipartFile, MultipartFileConstraint } from './types.js'
export type { Serializer, SerializerRegistry } from './types.js'
```
### Error Suggestions Extension
**File**: `src/domain/error-suggestions.ts`
Add suggestions for new features:
```typescript
// After line 145 (cookie checks)
// Multipart checks
if (formula.includes('request_files')) {
return `File upload check failed. Ensure the file field name matches the schema, the file size is within limits, and the MIME type is allowed.`
}
if (formula.includes('request_fields')) {
return `Multipart field check failed. Ensure the field is present in the form data and matches the expected type.`
}
// Streaming checks
if (formula.includes('chunks')) {
return `Streaming response check failed. Verify the stream format matches the schema and chunk limits are respected.`
}
// SSE checks
if (formula.includes('events') && formula.includes('event')) {
return `SSE event check failed. Ensure the event type is allowed and the event data matches the schema.`
}
```
### Test Runner Integration
**File**: `src/test/petit-runner.ts`, lines 188-367
Update `runPetitTests` to pass serializer registry:
```typescript
export const runPetitTests = async (
fastify: FastifyInjectInstance,
config: TestConfig,
scopeRegistry?: ScopeRegistry,
// NEW: Optional serializer registry
serializerRegistry?: SerializerRegistry
): Promise<TestSuite> => {
// ... existing code ...
// Pass to executeHttp
const ctx = await executeHttp(fastify, command.route, request, previousCtx, serializerRegistry)
// ... rest of existing code ...
}
```
**File**: `src/test/stateful-runner.ts`, lines 222-464
Similar update for `runStatefulTests`.
---
## Implementation Order
1. **Phase 1: Foundation**
- Extend `src/types.ts` with all new interfaces
- Update `src/domain/contract.ts` to extract new annotations
- Add error suggestions in `src/domain/error-suggestions.ts`
2. **Phase 2: Multipart**
- Implement `buildMultipartArb` in `src/domain/schema-to-arbitrary.ts`
- Update `src/domain/request-builder.ts` for multipart
- Update `src/infrastructure/http-executor.ts` for FormData
- Add `request_files`/`request_fields` to parser and evaluator
3. **Phase 3: Custom Serializers**
- Define `SerializerRegistry` interface
- Update `src/infrastructure/http-executor.ts` for serialization
- Inject registry through plugin options
4. **Phase 4: Streaming**
- Implement stream collection in `src/infrastructure/http-executor.ts`
- Add NDJSON/chunked parsing
- Update contract extraction for streaming config
5. **Phase 5: SSE**
- Implement SSE event collection
- Add SSE-specific header handling
- Update contract extraction for SSE config
6. **Phase 6: Integration**
- Update test runners to pass new dependencies
- Add comprehensive tests for each feature
- Update plugin registration
---
## Testing Strategy
For each feature, add tests in `src/test/`:
1. **Multipart**: `multipart.test.ts`
- Test data generation produces valid multipart payloads
- HTTP executor correctly builds FormData
- Formula evaluation accesses files and fields
2. **Serializers**: `serializers.test.ts`
- Serializer registry registration
- Encode/decode roundtrip
- HTTP executor uses correct serializer
3. **Streaming**: `streaming.test.ts`
- NDJSON chunk parsing
- Stream timeout handling
- Chunk count validation
4. **SSE**: `sse.test.ts`
- SSE event parsing
- Event type validation
- Retry interval checking
---
## Backward Compatibility Summary
| Feature | Breaking Change? | Migration |
|---------|------------------|-----------|
| Multipart | No | Opt-in via `x-content-type` |
| Custom Serializers | No | Opt-in via `x-serializer` |
| Streaming | No | Opt-in via `x-streaming` |
| SSE | No | Opt-in via `x-sse` |
Routes without these annotations must preserve existing behavior. Regression tests must cover non-multipart, non-streaming JSON routes.