
Overview

The stream persistence system provides durable storage for streaming AI responses. It supports:
  • Storing streams and chunks in SQLite
  • Resuming interrupted streams
  • Watching streams from other processes
  • Cancelling running streams
  • Cleaning up completed streams
Stream persistence is server-only and requires Node.js. It is not available in browser environments.

Core Components

  • SqliteStreamStore: SQLite-backed stream and chunk storage
  • StreamManager: High-level API for stream lifecycle management
  • StreamStore: Abstract base class for custom storage backends
  • persistedWriter: Low-level writer wrapper for stream persistence

Quick Start

import { SqliteStreamStore, StreamManager } from '@deepagents/context';

// Create a SQLite store
const store = new SqliteStreamStore('./streams.db');

// Create a stream manager
const manager = new StreamManager({ store });

// Register a new stream
const { stream, created } = await manager.register('stream-123');

// Persist a stream
await manager.persist(myReadableStream, 'stream-123', {
  strategy: 'interval',
  flushSize: 10,
});

// Watch a stream from another process
const watchStream = manager.watch('stream-123');
for await (const chunk of watchStream) {
  console.log(chunk);
}

// Cleanup when done
await manager.cleanup('stream-123');
store.close();

SqliteStreamStore

SQLite-backed storage for streams and chunks.

Constructor

class SqliteStreamStore extends StreamStore {
  constructor(pathOrDb: string | DatabaseSync);
}

import { SqliteStreamStore } from '@deepagents/context';

// Create or open database file
const store = new SqliteStreamStore('./data/streams.db');

Methods

Create a new stream entry:
await store.createStream({
  id: 'stream-1',
  status: 'queued',
  createdAt: Date.now(),
  startedAt: null,
  finishedAt: null,
  cancelRequestedAt: null,
  error: null,
});
Retrieve a stream by ID:
const stream = await store.getStream('stream-1');
if (stream) {
  console.log(stream.status); // 'queued' | 'running' | 'completed' | 'failed' | 'cancelled'
}
Update stream status:
await store.updateStreamStatus('stream-1', 'running');
await store.updateStreamStatus('stream-1', 'completed');
await store.updateStreamStatus('stream-1', 'failed', { error: 'Connection lost' });
Append chunks to a stream:
await store.appendChunks([
  { streamId: 'stream-1', seq: 0, data: { type: 'text-delta', text: 'Hello' }, createdAt: Date.now() },
  { streamId: 'stream-1', seq: 1, data: { type: 'text-delta', text: ' world' }, createdAt: Date.now() },
]);
Retrieve chunks with pagination:
// Get first 100 chunks
const chunks = await store.getChunks('stream-1', 0, 100);

// Get next 100 chunks
const nextChunks = await store.getChunks('stream-1', 100, 100);
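The two calls above generalize to a loop that reads until a short page comes back. The following is a minimal sketch, not part of the library: `ChunkRecord`, `ChunkReader`, and `readAllChunks` are hypothetical names, and it assumes the second argument to `getChunks` is the starting position and that `seq` values are contiguous from 0, as the example calls suggest.

```typescript
// Hypothetical local types: only the fields shown in the appendChunks example.
interface ChunkRecord {
  streamId: string;
  seq: number;
  data: unknown;
  createdAt: number;
}

// Minimal slice of the store API this sketch relies on.
interface ChunkReader {
  getChunks(streamId: string, start: number, limit: number): Promise<ChunkRecord[]>;
}

// Reads every chunk of a stream, one page at a time.
async function readAllChunks(
  store: ChunkReader,
  streamId: string,
  pageSize = 100,
): Promise<ChunkRecord[]> {
  if (pageSize <= 0) throw new RangeError('pageSize must be positive');
  const all: ChunkRecord[] = [];
  let start = 0;
  for (;;) {
    const page = await store.getChunks(streamId, start, pageSize);
    all.push(...page);
    // A page shorter than pageSize means there is nothing left to read.
    if (page.length < pageSize) return all;
    start += page.length;
  }
}
```

With a real store this would be called as `await readAllChunks(store, 'stream-1')`. For very long streams, processing each page as it arrives avoids holding everything in memory.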
Delete a stream and all its chunks:
await store.deleteStream('stream-1');
Reopen a terminal stream for retry:
const stream = await store.reopenStream('stream-1');
console.log(stream.status); // 'queued'
Only completed, failed, or cancelled streams can be reopened.
Close the database connection:
store.close(); // Idempotent

StreamManager

High-level API for managing stream lifecycle.

Constructor

interface StreamManagerOptions {
  store: StreamStore;
  watchPolling?: WatchStreamOptions;
  cancelPolling?: PersistCancelPollingOptions;
  onPollingEvent?: (event: StreamPollingTelemetryEvent) => void;
}

const manager = new StreamManager({
  store,
  watchPolling: {
    minMs: 25,
    maxMs: 500,
    multiplier: 2,
    jitterRatio: 0.15,
    statusCheckEvery: 3,
    chunkPageSize: 128,
  },
  cancelPolling: {
    minMs: 50,
    maxMs: 500,
    multiplier: 2,
    jitterRatio: 0.15,
  },
});

register()

Register or retrieve an existing stream:
const { stream, created } = await manager.register('stream-1');

if (created) {
  console.log('New stream created');
} else {
  console.log('Stream already exists');
}

persist()

Persist a ReadableStream with automatic chunking and cancellation detection:
import { createOpenAI } from '@ai-sdk/openai';
import { streamText } from 'ai';

const openai = createOpenAI({ apiKey: process.env.OPENAI_API_KEY });

const result = streamText({
  model: openai('gpt-4'),
  prompt: 'Write a story',
});

await manager.persist(result.fullStream, 'stream-1', {
  strategy: 'interval',  // or 'buffered'
  flushSize: 10,         // Flush every 10 chunks
  cancelPolling: {
    minMs: 100,
    maxMs: 1000,
  },
  onCancelDetected: async ({ streamId, latencyMs }) => {
    console.log(`Stream ${streamId} cancelled (latency: ${latencyMs}ms)`);
  },
});
Flush chunks at regular intervals:
await manager.persist(stream, 'stream-1', {
  strategy: 'interval',
  flushSize: 10,  // Flush every 10 chunks
});
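The `flushSize` comment above says chunks are written in groups of 10. How `persist()` batches internally is not shown here. The standalone sketch below only illustrates size-based batching in general; `ChunkBatcher` is a hypothetical name, not a library export.

```typescript
// Hypothetical helper: collects items and hands them to `flush` in groups of `flushSize`.
class ChunkBatcher<T> {
  private buffer: T[] = [];
  private readonly flushSize: number;
  private readonly flush: (batch: T[]) => void;

  constructor(flushSize: number, flush: (batch: T[]) => void) {
    if (flushSize <= 0) throw new RangeError('flushSize must be positive');
    this.flushSize = flushSize;
    this.flush = flush;
  }

  // Buffer one chunk; write the batch out once it reaches flushSize.
  push(chunk: T): void {
    this.buffer.push(chunk);
    if (this.buffer.length >= this.flushSize) this.drain();
  }

  // Call when the source stream ends so a partial batch is not lost.
  end(): void {
    if (this.buffer.length > 0) this.drain();
  }

  private drain(): void {
    const batch = this.buffer;
    this.buffer = [];
    this.flush(batch);
  }
}
```

A larger `flushSize` means fewer writes, but more chunks are at risk if the process dies between flushes and watchers see new chunks later.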

watch()

Watch a stream’s chunks in real time with adaptive polling:
const watchStream = manager.watch('stream-1', {
  minMs: 25,           // Start polling every 25ms
  maxMs: 500,          // Max polling interval 500ms
  multiplier: 2,       // Double delay on empty polls
  jitterRatio: 0.15,   // Add 15% jitter
  statusCheckEvery: 3, // Check status every 3 polls
  chunkPageSize: 128,  // Fetch 128 chunks per poll
});

for await (const chunk of watchStream) {
  console.log(chunk);
}
The watch stream automatically closes when:
  • The stream reaches a terminal status (completed/failed/cancelled)
  • The stream is deleted

cancel()

Request cancellation of a running stream:
await manager.cancel('stream-1');

reopen()

Reopen a terminal stream for retry:
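try {
  await manager.persist(stream, 'stream-1');
} catch (error) {
  console.error('Stream failed, reopening...', error);
  await manager.reopen('stream-1');
  // A consumed ReadableStream cannot be read again, so pass a fresh source stream on retry
  await manager.persist(retryStream, 'stream-1');
}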

cleanup()

Delete a stream and all its chunks:
await manager.cleanup('stream-1');

Polling Configuration

Adaptive Polling

Both watch() and the cancellation check in persist() use adaptive polling with exponential backoff:
interface PollingConfig {
  minMs: number;        // Minimum delay between polls
  maxMs: number;        // Maximum delay between polls
  multiplier: number;   // Delay multiplier on empty polls
  jitterRatio: number;  // Random jitter ratio (0-1)
}
How it works:
  1. Start with minMs delay
  2. On empty poll, multiply delay by multiplier
  3. Cap at maxMs
  4. Add random jitter: delay * (1 ± jitterRatio)
  5. On non-empty poll, reset to minMs
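The steps above can be sketched as two small functions. This is an illustration of the described behavior, not the library's implementation: `nextBaseDelay` and `withJitter` are hypothetical names. Jitter is applied after the cap, following the order of the steps, so a jittered delay may slightly exceed maxMs.

```typescript
interface PollingConfig {
  minMs: number;
  maxMs: number;
  multiplier: number;
  jitterRatio: number;
}

// Steps 1-3 and 5: back off on empty polls, cap at maxMs, reset when chunks arrive.
function nextBaseDelay(current: number, wasEmpty: boolean, cfg: PollingConfig): number {
  if (!wasEmpty) return cfg.minMs;
  return Math.min(current * cfg.multiplier, cfg.maxMs);
}

// Step 4: scale the delay by a random factor in [1 - jitterRatio, 1 + jitterRatio).
function withJitter(
  delay: number,
  jitterRatio: number,
  random: () => number = Math.random,
): number {
  const factor = 1 + (random() * 2 - 1) * jitterRatio;
  return Math.max(0, Math.round(delay * factor));
}

// Base delays over six consecutive empty polls with the watch values shown in this guide.
const cfg: PollingConfig = { minMs: 25, maxMs: 500, multiplier: 2, jitterRatio: 0.15 };
const baseDelays: number[] = [];
let delay = cfg.minMs;
for (let i = 0; i < 6; i++) {
  baseDelays.push(delay);
  delay = nextBaseDelay(delay, true, cfg);
}
console.log(baseDelays.join(', ')); // 25, 50, 100, 200, 400, 500
```

Jitter spreads out watchers that started at the same moment so they do not all hit the database on the same tick.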

Default Configurations

const DEFAULT_WATCH_POLLING = {
  minMs: 25,
  maxMs: 500,
  multiplier: 2,
  jitterRatio: 0.15,
  statusCheckEvery: 3,  // Check status every 3 polls
  chunkPageSize: 128,   // Fetch 128 chunks per page
};

Telemetry

Monitor polling behavior with telemetry events:
const manager = new StreamManager({
  store,
  onPollingEvent: (event) => {
    switch (event.type) {
      case 'watch:poll':
        console.log(`Polled from seq ${event.fromSeq}, got ${event.chunkCount} chunks`);
        break;
      case 'watch:empty':
        console.log(`Empty poll, waiting ${event.delayMs}ms`);
        break;
      case 'watch:chunks':
        console.log(`Delivered ${event.delivered} chunks, last seq ${event.lastSeq}`);
        break;
      case 'watch:closed':
        console.log(`Stream closed: ${event.reason}`);
        break;
      case 'persist:cancel-poll':
        console.log(`Cancel poll: status=${event.status}, delay=${event.delayMs}ms`);
        break;
      case 'persist:cancel-detected':
        console.log(`Cancel detected! Latency: ${event.latencyMs}ms`);
        break;
    }
  },
});
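Logging every event gets noisy under load, so it can help to aggregate instead. The sketch below is one way to do that and is not part of the library: `PollingEvent` is a local type that mirrors only the fields used in the handler above (the real `StreamPollingTelemetryEvent` may carry more), and `createPollingSummary` is a hypothetical helper.

```typescript
// Local event shape, limited to the fields the handler above reads (assumption).
type PollingEvent =
  | { type: 'watch:poll'; fromSeq: number; chunkCount: number }
  | { type: 'watch:empty'; delayMs: number }
  | { type: 'watch:chunks'; delivered: number; lastSeq: number }
  | { type: 'watch:closed'; reason: string }
  | { type: 'persist:cancel-poll'; status: string; delayMs: number }
  | { type: 'persist:cancel-detected'; latencyMs: number };

interface PollingSummary {
  countsByType: Record<string, number>;
  chunksDelivered: number;
  lastDelayMs: number | null;
}

// Returns a running summary plus a `record` callback suitable for onPollingEvent.
function createPollingSummary() {
  const summary: PollingSummary = { countsByType: {}, chunksDelivered: 0, lastDelayMs: null };

  function record(event: PollingEvent): void {
    summary.countsByType[event.type] = (summary.countsByType[event.type] ?? 0) + 1;
    if (event.type === 'watch:chunks') {
      summary.chunksDelivered += event.delivered;
    }
    if (event.type === 'watch:empty' || event.type === 'persist:cancel-poll') {
      summary.lastDelayMs = event.delayMs;
    }
  }

  return { summary, record };
}
```

Assuming the event shapes line up, `record` could be passed as `onPollingEvent` and `summary` reported on a timer or at shutdown.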

Stream Status

Streams progress through these statuses:
type StreamStatus = 
  | 'queued'      // Registered but not started
  | 'running'     // Currently streaming
  | 'completed'   // Successfully finished
  | 'failed'      // Error occurred
  | 'cancelled';  // Cancelled by request

Status Transitions

  1. queued: Initial state after register()
  2. running: Transitions when persist() starts
  3. Terminal states:
     • completed: Stream finished successfully
     • failed: Error occurred (includes error message)
     • cancelled: User called cancel()
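The lifecycle above, together with the rule that only terminal streams can be reopened, can be written down as a small transition table. This is a sketch of the documented transitions only. `ALLOWED_TRANSITIONS`, `isTerminal`, and `canTransition` are hypothetical names, and any transition the guide does not mention (for example, cancelling a stream that is still queued) is left out rather than guessed.

```typescript
type StreamStatus = 'queued' | 'running' | 'completed' | 'failed' | 'cancelled';

const TERMINAL_STATUSES = new Set<StreamStatus>(['completed', 'failed', 'cancelled']);

// Transitions described in this guide; terminal -> queued models reopen().
const ALLOWED_TRANSITIONS: Record<StreamStatus, readonly StreamStatus[]> = {
  queued: ['running'],
  running: ['completed', 'failed', 'cancelled'],
  completed: ['queued'],
  failed: ['queued'],
  cancelled: ['queued'],
};

function isTerminal(status: StreamStatus): boolean {
  return TERMINAL_STATUSES.has(status);
}

function canTransition(from: StreamStatus, to: StreamStatus): boolean {
  return ALLOWED_TRANSITIONS[from].includes(to);
}

console.log(canTransition('queued', 'running')); // true
console.log(canTransition('running', 'queued')); // false
```

A check like `isTerminal` is also a natural stopping condition for code that waits on a stream, since a watch stream closes once a terminal status is reached.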

Use Cases

Persist long-running AI responses for reliability:
// Start generation
const { stream } = await manager.register('report-gen-1');

// Persist in background
manager.persist(generationStream, 'report-gen-1')
  .catch(async (error) => {
    console.error('Generation failed:', error);
    // Retry
    await manager.reopen('report-gen-1');
    await manager.persist(retryStream, 'report-gen-1');
  });

// Watch from UI
const watchStream = manager.watch('report-gen-1');
for await (const chunk of watchStream) {
  updateUI(chunk);
}

Best Practices

Clean up database connections:
const store = new SqliteStreamStore('./streams.db');
try {
  // Use store
} finally {
  store.close();
}
Generate unique IDs to avoid conflicts:
import { generateId } from 'ai';

const streamId = `task-${generateId()}`;
await manager.persist(stream, streamId);
Remove old streams to prevent database growth:
// After successful completion
await manager.cleanup(streamId);

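// Or implement periodic cleanup
// (findCompletedStreamsOlderThan is an application-defined helper, not shown here)
setInterval(async () => {
  try {
    const oldStreams = await findCompletedStreamsOlderThan(7 * 24 * 60 * 60 * 1000);
    for (const id of oldStreams) {
      await manager.cleanup(id);
    }
  } catch (error) {
    // Catch here so a failed run does not surface as an unhandled rejection
    console.error('Periodic cleanup failed:', error);
  }
}, 24 * 60 * 60 * 1000); // Daily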
Always handle cancellation in persist:
await manager.persist(stream, streamId, {
  onCancelDetected: async ({ streamId, latencyMs }) => {
    console.log(`Cancelled: ${streamId} (${latencyMs}ms latency)`);
    // Clean up resources
    await releaseResources();
  },
});

Next Steps

  • Messages: Learn about message fragments and reminders
  • Fragment Builders: Complete fragment builder API reference
  • StreamStore API: Complete StreamStore API reference
  • Renderers: Transform fragments into different formats