Skip to main content

Overview

The stream persistence API provides durable storage for streaming AI responses with SQLite backend and high-level lifecycle management.
Stream persistence is server-only and requires Node.js. Not available in browser environments.

StreamStore (Abstract)

Base class for stream storage implementations.
abstract class StreamStore {
  abstract createStream(stream: StreamData): Promise<void>;
  
  abstract upsertStream(
    stream: StreamData
  ): Promise<{ stream: StreamData; created: boolean }>;
  
  abstract getStream(streamId: string): Promise<StreamData | undefined>;
  
  abstract getStreamStatus(
    streamId: string
  ): Promise<StreamStatus | undefined>;
  
  abstract updateStreamStatus(
    streamId: string,
    status: StreamStatus,
    options?: { error?: string }
  ): Promise<void>;
  
  abstract appendChunks(chunks: StreamChunkData[]): Promise<void>;
  
  abstract getChunks(
    streamId: string,
    fromSeq?: number,
    limit?: number
  ): Promise<StreamChunkData[]>;
  
  abstract deleteStream(streamId: string): Promise<void>;
  
  abstract reopenStream(streamId: string): Promise<StreamData>;
}

Types

StreamStatus

type StreamStatus =
  | 'queued'      // Registered but not started
  | 'running'     // Currently streaming
  | 'completed'   // Successfully finished
  | 'failed'      // Error occurred
  | 'cancelled';  // Cancelled by request

StreamData

interface StreamData {
  id: string;                       // Unique stream identifier
  status: StreamStatus;             // Current status
  createdAt: number;                // Unix timestamp (ms)
  startedAt: number | null;         // When streaming began
  finishedAt: number | null;        // When streaming ended
  cancelRequestedAt: number | null; // When cancel was requested
  error: string | null;             // Error message if failed
}

StreamChunkData

interface StreamChunkData {
  streamId: string;  // Stream identifier
  seq: number;       // Sequential chunk number (0-based)
  data: unknown;     // Chunk content (JSON-serializable)
  createdAt: number; // Unix timestamp (ms)
}

SqliteStreamStore

SQLite-backed implementation of StreamStore.

Constructor

class SqliteStreamStore extends StreamStore {
  constructor(pathOrDb: string | DatabaseSync)
}
Parameters:
  • pathOrDb - File path to SQLite database or existing DatabaseSync instance
Example:
import { SqliteStreamStore } from '@deepagents/context';

// File-based
const store = new SqliteStreamStore('./data/streams.db');

// In-memory (testing)
const memStore = new SqliteStreamStore(':memory:');

// Existing connection
import { DatabaseSync } from 'node:sqlite';
const db = new DatabaseSync('./streams.db');
const store = new SqliteStreamStore(db);

createStream()

Create a new stream entry.
await createStream(stream: StreamData): Promise<void>
Parameters:
  • stream - Stream metadata
Throws:
  • Error if stream ID already exists
Example:
await store.createStream({
  id: 'stream-1',
  status: 'queued',
  createdAt: Date.now(),
  startedAt: null,
  finishedAt: null,
  cancelRequestedAt: null,
  error: null,
});

upsertStream()

Create or retrieve existing stream.
await upsertStream(
  stream: StreamData
): Promise<{ stream: StreamData; created: boolean }>
Parameters:
  • stream - Stream metadata
Returns:
  • stream - The stream (new or existing)
  • created - True if newly created, false if already existed
Example:
const { stream, created } = await store.upsertStream({
  id: 'stream-1',
  status: 'queued',
  createdAt: Date.now(),
  startedAt: null,
  finishedAt: null,
  cancelRequestedAt: null,
  error: null,
});

if (created) {
  console.log('New stream created');
} else {
  console.log('Stream already exists:', stream.status);
}

getStream()

Retrieve stream by ID.
await getStream(streamId: string): Promise<StreamData | undefined>
Parameters:
  • streamId - Stream identifier
Returns:
  • Stream data or undefined if not found
Example:
const stream = await store.getStream('stream-1');
if (stream) {
  console.log(`Status: ${stream.status}`);
  console.log(`Created: ${new Date(stream.createdAt)}`);
}

getStreamStatus()

Retrieve only the stream status.
await getStreamStatus(streamId: string): Promise<StreamStatus | undefined>
Parameters:
  • streamId - Stream identifier
Returns:
  • Stream status or undefined if not found
Example:
const status = await store.getStreamStatus('stream-1');
if (status === 'completed') {
  console.log('Stream finished successfully');
}

updateStreamStatus()

Update stream status with automatic timestamp management.
await updateStreamStatus(
  streamId: string,
  status: StreamStatus,
  options?: { error?: string }
): Promise<void>
Parameters:
  • streamId - Stream identifier
  • status - New status
  • options - Additional options
    • error - Error message (for ‘failed’ status)
Behavior:
  • 'running' - Sets startedAt to current time
  • 'completed' - Sets finishedAt to current time
  • 'failed' - Sets finishedAt and error
  • 'cancelled' - Sets cancelRequestedAt and finishedAt
Example:
// Start streaming
await store.updateStreamStatus('stream-1', 'running');

// Complete successfully
await store.updateStreamStatus('stream-1', 'completed');

// Fail with error
await store.updateStreamStatus('stream-1', 'failed', {
  error: 'Connection timeout',
});

// Cancel
await store.updateStreamStatus('stream-1', 'cancelled');

appendChunks()

Append chunks to a stream (atomic batch operation).
await appendChunks(chunks: StreamChunkData[]): Promise<void>
Parameters:
  • chunks - Array of chunks to append
Behavior:
  • Uses transaction for atomicity
  • All chunks succeed or all fail
  • Chunks serialized as JSON
Example:
await store.appendChunks([
  {
    streamId: 'stream-1',
    seq: 0,
    data: { type: 'text-delta', text: 'Hello' },
    createdAt: Date.now(),
  },
  {
    streamId: 'stream-1',
    seq: 1,
    data: { type: 'text-delta', text: ' world' },
    createdAt: Date.now(),
  },
]);

getChunks()

Retrieve chunks with pagination.
await getChunks(
  streamId: string,
  fromSeq?: number,
  limit?: number
): Promise<StreamChunkData[]>
Parameters:
  • streamId - Stream identifier
  • fromSeq - (Optional) Start from this sequence number (inclusive)
  • limit - (Optional) Maximum number of chunks to return
Returns:
  • Array of chunks ordered by sequence number
Example:
// Get first 100 chunks
const chunks = await store.getChunks('stream-1', 0, 100);

// Get next 100 chunks
const nextChunks = await store.getChunks('stream-1', 100, 100);

// Get all chunks from sequence 50
const remaining = await store.getChunks('stream-1', 50);

// Get all chunks
const allChunks = await store.getChunks('stream-1');

deleteStream()

Delete a stream and all its chunks.
await deleteStream(streamId: string): Promise<void>
Parameters:
  • streamId - Stream identifier
Behavior:
  • Deletes stream metadata
  • Deletes all associated chunks (cascade)
Example:
await store.deleteStream('stream-1');

reopenStream()

Reopen a terminal stream for retry.
await reopenStream(streamId: string): Promise<StreamData>
Parameters:
  • streamId - Stream identifier
Returns:
  • New stream data with ‘queued’ status
Throws:
  • Error if stream not found
  • Error if stream not in terminal state
Behavior:
  • Only works on ‘completed’, ‘failed’, or ‘cancelled’ streams
  • Deletes existing stream and chunks
  • Creates new stream with same ID
  • Resets all timestamps
Example:
try {
  await manager.persist(stream, 'stream-1');
} catch (error) {
  console.error('Stream failed, reopening...');
  const newStream = await store.reopenStream('stream-1');
  console.log('Reopened:', newStream.status); // 'queued'
}

close()

Close the database connection.
close(): void
Behavior:
  • Closes SQLite connection
  • Idempotent (safe to call multiple times)
  • Clears prepared statement cache
Example:
const store = new SqliteStreamStore('./streams.db');
try {
  // Use store
} finally {
  store.close();
}

StreamManager

High-level API for managing stream lifecycle.

Constructor

interface StreamManagerOptions {
  store: StreamStore;
  watchPolling?: WatchStreamOptions;
  cancelPolling?: PersistCancelPollingOptions;
  onPollingEvent?: (event: StreamPollingTelemetryEvent) => void;
}

class StreamManager {
  constructor(options: StreamManagerOptions)
}
Parameters:
  • store - Stream store implementation
  • watchPolling - (Optional) Watch polling configuration
  • cancelPolling - (Optional) Cancel polling configuration
  • onPollingEvent - (Optional) Telemetry callback
Example:
import { SqliteStreamStore, StreamManager } from '@deepagents/context';

const store = new SqliteStreamStore('./streams.db');
const manager = new StreamManager({
  store,
  watchPolling: {
    minMs: 25,
    maxMs: 500,
    multiplier: 2,
    jitterRatio: 0.15,
    statusCheckEvery: 3,
    chunkPageSize: 128,
  },
  cancelPolling: {
    minMs: 50,
    maxMs: 500,
    multiplier: 2,
    jitterRatio: 0.15,
  },
  onPollingEvent: (event) => {
    console.log('Polling event:', event);
  },
});

store (property)

Access the underlying store.
get store(): StreamStore
Example:
const stream = await manager.store.getStream('stream-1');

register()

Register or retrieve an existing stream.
await register(
  streamId: string
): Promise<{ stream: StreamData; created: boolean }>
Parameters:
  • streamId - Stream identifier
Returns:
  • stream - Stream data
  • created - True if newly created
Example:
const { stream, created } = await manager.register('stream-1');
if (created) {
  console.log('New stream registered');
}

persist()

Persist a ReadableStream with automatic chunking and cancellation detection.
await persist(
  stream: ReadableStream,
  streamId: string,
  options?: PersistStreamOptions
): Promise<{ streamId: string }>
Parameters:
  • stream - ReadableStream to persist
  • streamId - Stream identifier
  • options - (Optional) Persistence options
Options:
interface PersistStreamOptions {
  strategy?: 'interval' | 'buffered';  // Flush strategy
  flushSize?: number;                   // Chunks per flush (interval mode)
  cancelPolling?: PersistCancelPollingOptions;
  onCancelDetected?: (info: {
    streamId: string;
    latencyMs: number | null;
  }) => void | Promise<void>;
}
Returns:
  • streamId - The stream identifier
Behavior:
  • Updates status to ‘running’
  • Polls for cancellation in background
  • Persists chunks according to strategy
  • Updates status to ‘completed’ or ‘failed’
  • Aborts on cancellation
Example:
import { streamText } from 'ai';
import { createOpenAI } from '@ai-sdk/openai';

const openai = createOpenAI({ apiKey: process.env.OPENAI_API_KEY });
const result = streamText({
  model: openai('gpt-4'),
  prompt: 'Write a story',
});

try {
  await manager.persist(result.fullStream, 'story-1', {
    strategy: 'interval',
    flushSize: 10,
    onCancelDetected: async ({ streamId, latencyMs }) => {
      console.log(`Cancelled: ${streamId} (${latencyMs}ms latency)`);
      await cleanup(streamId);
    },
  });
  console.log('Stream completed');
} catch (error) {
  console.error('Stream failed:', error);
}

watch()

Watch a stream’s chunks in real-time.
watch(
  streamId: string,
  options?: WatchStreamOptions
): ReadableStream<StreamPart>
Parameters:
  • streamId - Stream identifier
  • options - (Optional) Watch options
Options:
interface WatchStreamOptions {
  minMs?: number;           // Min polling delay (default: 25)
  maxMs?: number;           // Max polling delay (default: 500)
  multiplier?: number;      // Delay multiplier (default: 2)
  jitterRatio?: number;     // Jitter ratio (default: 0.15)
  statusCheckEvery?: number; // Status check frequency (default: 3)
  chunkPageSize?: number;   // Chunks per page (default: 128)
}
Returns:
  • ReadableStream of stream chunks
Behavior:
  • Adaptive polling with exponential backoff
  • Automatically closes on terminal status
  • Resets polling delay on new chunks
Example:
const watchStream = manager.watch('stream-1', {
  minMs: 25,
  maxMs: 500,
  chunkPageSize: 128,
});

for await (const chunk of watchStream) {
  console.log('Chunk:', chunk);
}

console.log('Stream finished');

cancel()

Request cancellation of a running stream.
await cancel(streamId: string): Promise<void>
Parameters:
  • streamId - Stream identifier
Behavior:
  • Updates status to ‘cancelled’
  • Sets cancelRequestedAt timestamp
  • Persist operation detects and aborts
Example:
// Start operation
const persistPromise = manager.persist(stream, 'long-task');

// User clicks cancel
await manager.cancel('long-task');

// Persist detects cancellation and stops
await persistPromise;

reopen()

Reopen a terminal stream for retry.
await reopen(
  streamId: string
): Promise<{ stream: StreamData; created: boolean }>
Parameters:
  • streamId - Stream identifier
Returns:
  • stream - New stream data
  • created - Always true
Throws:
  • Error if stream not in terminal state
Example:
try {
  await manager.persist(stream, 'task-1');
} catch (error) {
  console.error('Failed, reopening...');
  await manager.reopen('task-1');
  await manager.persist(retryStream, 'task-1');
}

cleanup()

Delete a stream and all its chunks.
await cleanup(streamId: string): Promise<void>
Parameters:
  • streamId - Stream identifier
Example:
// After successful completion
await manager.cleanup('stream-1');

Polling Configuration

WatchPollingConfig

interface WatchPollingConfig {
  minMs: number;           // Minimum delay between polls
  maxMs: number;           // Maximum delay between polls
  multiplier: number;      // Delay multiplier on empty polls
  jitterRatio: number;     // Random jitter ratio (0-1)
  statusCheckEvery: number; // Check status every N polls
  chunkPageSize: number;   // Chunks to fetch per poll
}
Default:
const DEFAULT_WATCH_POLLING = {
  minMs: 25,
  maxMs: 500,
  multiplier: 2,
  jitterRatio: 0.15,
  statusCheckEvery: 3,
  chunkPageSize: 128,
};

CancelPollingConfig

interface CancelPollingConfig {
  minMs: number;        // Minimum delay between polls
  maxMs: number;        // Maximum delay between polls
  multiplier: number;   // Delay multiplier on empty polls
  jitterRatio: number;  // Random jitter ratio (0-1)
}
Default:
const DEFAULT_CANCEL_POLLING = {
  minMs: 50,
  maxMs: 500,
  multiplier: 2,
  jitterRatio: 0.15,
};

Telemetry Events

StreamPollingTelemetryEvent

type StreamPollingTelemetryEvent =
  | {
      type: 'watch:poll';
      streamId: string;
      fromSeq: number;
      chunkCount: number;
      statusChecked: boolean;
    }
  | {
      type: 'watch:empty';
      streamId: string;
      fromSeq: number;
      delayMs: number;
    }
  | {
      type: 'watch:chunks';
      streamId: string;
      delivered: number;
      lastSeq: number;
    }
  | {
      type: 'watch:closed';
      streamId: string;
      reason: 'terminal' | 'missing';
    }
  | {
      type: 'persist:cancel-poll';
      streamId: string;
      delayMs: number;
      status: StreamStatus | 'missing';
    }
  | {
      type: 'persist:cancel-detected';
      streamId: string;
      latencyMs: number | null;
    };
Example:
const manager = new StreamManager({
  store,
  onPollingEvent: (event) => {
    switch (event.type) {
      case 'watch:poll':
        console.log(
          `Polled ${event.chunkCount} chunks from seq ${event.fromSeq}`
        );
        break;
      case 'watch:empty':
        console.log(`Empty poll, waiting ${event.delayMs}ms`);
        break;
      case 'watch:chunks':
        console.log(
          `Delivered ${event.delivered} chunks, last=${event.lastSeq}`
        );
        break;
      case 'watch:closed':
        console.log(`Stream closed: ${event.reason}`);
        break;
      case 'persist:cancel-detected':
        console.log(`Cancel detected! Latency: ${event.latencyMs}ms`);
        break;
    }
  },
});

Best Practices

const store = new SqliteStreamStore('./streams.db');
try {
  const manager = new StreamManager({ store });
  // Use manager
} finally {
  store.close();
}
import { generateId } from 'ai';

const streamId = `task-${generateId()}`;
await manager.persist(stream, streamId);
// After success
await manager.cleanup(streamId);

// Or periodic cleanup
setInterval(async () => {
  const oldStreams = await findOldCompletedStreams();
  for (const id of oldStreams) {
    await manager.cleanup(id);
  }
}, 24 * 60 * 60 * 1000);
await manager.persist(stream, streamId, {
  onCancelDetected: async ({ streamId, latencyMs }) => {
    console.log(`Cancelled: ${streamId}`);
    await releaseResources();
  },
});

Next Steps

Stream Persistence Guide

Learn about using stream persistence

Fragment Builders

Complete fragment builder API reference

Renderers API

Complete renderer API reference

Messages

Learn about message fragments