Why Streaming?
Streaming enables:
- Real-time user feedback
- Progressive rendering
- Lower perceived latency
- Interruptible generation
- Tool call visibility
DeepAgents provides multiple streaming interfaces for different use cases.
Basic Streaming
Text Stream
Stream text chunks as they’re generated:
import { agent, execute } from '@deepagents/agent';
import { openai } from '@ai-sdk/openai';
const assistant = agent({
name: 'assistant',
model: openai('gpt-4o'),
prompt: 'You are helpful.',
});
const stream = execute(assistant, 'Tell me about AI agents', {});
for await (const chunk of stream.textStream) {
process.stdout.write(chunk);
}
Full Stream
Access every stream event (text deltas, tool calls, tool results, and the finish event):
const stream = execute(assistant, message, context);
for await (const chunk of stream.fullStream) {
switch (chunk.type) {
case 'text-delta':
process.stdout.write(chunk.textDelta);
break;
case 'tool-call':
console.log('\nCalling tool:', chunk.toolName);
break;
case 'tool-result':
console.log('Tool result:', chunk.result);
break;
case 'finish':
console.log('\nFinished:', chunk.finishReason);
break;
}
}
Stream Results
The execute() function returns a stream result object:
interface StreamResult {
textStream: AsyncIterable<string>; // Text chunks only
fullStream: AsyncIterable<StreamChunk>; // All events
text: Promise<string>; // Complete text
usage: Promise<TokenUsage>; // Token counts
finishReason: Promise<FinishReason>; // Why it stopped
rawResponse: Promise<RawResponse>; // Raw model response
}
Getting Complete Text
Wait for the full response:
const stream = execute(assistant, message, context);
const completeText = await stream.text;
console.log(completeText);
Getting Usage Info
const stream = execute(assistant, message, context);
const usage = await stream.usage;
console.log('Prompt tokens:', usage.promptTokens);
console.log('Completion tokens:', usage.completionTokens);
console.log('Total tokens:', usage.totalTokens);
Finish Reason
const stream = execute(assistant, message, context);
const reason = await stream.finishReason;
switch (reason) {
case 'stop':
console.log('Completed naturally');
break;
case 'length':
console.log('Hit token limit');
break;
case 'content-filter':
console.log('Blocked by content filter');
break;
case 'tool-calls':
console.log('Finished with tool calls');
break;
}
Stream Types
The full stream emits the following chunk types:
type StreamChunk =
| { type: 'text-delta'; textDelta: string }
| { type: 'tool-call'; toolCallId: string; toolName: string; args: any }
| { type: 'tool-result'; toolCallId: string; result: any }
| { type: 'tool-call-delta'; toolCallId: string; argsTextDelta: string }
| { type: 'finish'; finishReason: FinishReason; usage: TokenUsage }
| { type: 'error'; error: Error };
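Because StreamChunk is a discriminated union on type, a switch with a never check lets the compiler flag any chunk type that is not handled. The sketch below redeclares the union locally so it stands alone; FinishReason and TokenUsage are simplified stand-ins rather than the package's actual exports, and summarize and StreamSummary are illustrative names, not part of the library.

```typescript
// Local stand-ins so the sketch compiles on its own; the real types
// come from the package and may differ in detail.
type FinishReason = 'stop' | 'length' | 'content-filter' | 'tool-calls';
type TokenUsage = { promptTokens: number; completionTokens: number; totalTokens: number };

type StreamChunk =
  | { type: 'text-delta'; textDelta: string }
  | { type: 'tool-call'; toolCallId: string; toolName: string; args: unknown }
  | { type: 'tool-result'; toolCallId: string; result: unknown }
  | { type: 'tool-call-delta'; toolCallId: string; argsTextDelta: string }
  | { type: 'finish'; finishReason: FinishReason; usage: TokenUsage }
  | { type: 'error'; error: Error };

interface StreamSummary {
  text: string;
  toolNames: string[];
  finishReason?: FinishReason;
  errors: Error[];
}

// Hypothetical helper: fold chunks into a summary, one case per chunk type.
function summarize(chunks: Iterable<StreamChunk>): StreamSummary {
  const summary: StreamSummary = { text: '', toolNames: [], errors: [] };
  for (const chunk of chunks) {
    switch (chunk.type) {
      case 'text-delta':
        summary.text += chunk.textDelta;
        break;
      case 'tool-call':
        summary.toolNames.push(chunk.toolName);
        break;
      case 'tool-result':
      case 'tool-call-delta':
        break; // not needed for this summary
      case 'finish':
        summary.finishReason = chunk.finishReason;
        break;
      case 'error':
        summary.errors.push(chunk.error);
        break;
      default: {
        // If a new chunk type joins the union, this assignment stops compiling.
        const unhandled: never = chunk;
        throw new Error(`Unhandled chunk: ${JSON.stringify(unhandled)}`);
      }
    }
  }
  return summary;
}
```

The same switch body can be used inside a for await loop over stream.fullStream; the synchronous version here only keeps the example easy to run in isolation.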
Swarm Streaming
The swarm() function provides high-level streaming for multi-agent systems:
import { swarm } from '@deepagents/agent';
const stream = swarm(
coordinator,
'Complete this task',
context,
abortSignal // optional AbortSignal; see Abort Signals
);
for await (const chunk of stream.textStream) {
process.stdout.write(chunk);
}
Swarm Stream Result
interface SwarmResult {
textStream: AsyncIterable<string>;
text: Promise<string>;
messages: Promise<UIMessage[]>;
agent: Promise<Agent>; // Final agent that completed
}
Getting Messages
const stream = swarm(coordinator, message, context);
const messages = await stream.messages;
for (const msg of messages) {
console.log(msg.role, ':', msg.parts);
}
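To print readable text rather than raw parts, the text parts of each message can be joined. This is a minimal sketch that assumes text parts have the shape { type: 'text', text: string }, as in the AI SDK's UI message format; the local Message and MessagePart types and the messageText and transcript helpers are illustrative and not exports of the package.

```typescript
// Minimal local shapes, assumed for illustration. The real UIMessage type
// has more fields and more part kinds than shown here.
interface MessagePart {
  type: string;
  text?: unknown;
}

interface Message {
  role: string;
  parts: MessagePart[];
}

// Hypothetical helper: concatenate only the text parts of one message.
function messageText(message: Message): string {
  return message.parts
    .filter((part) => part.type === 'text' && typeof part.text === 'string')
    .map((part) => part.text as string)
    .join('');
}

// Hypothetical helper: one "role: text" line per message.
function transcript(messages: Message[]): string[] {
  return messages.map((message) => `${message.role}: ${messageText(message)}`);
}
```

Non-text parts are skipped here; whether that is appropriate depends on what the agents emit.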
Abort Signals
Cancel streaming execution:
const controller = new AbortController();
const stream = swarm(
agent,
message,
context,
controller.signal
);
// Cancel after 10 seconds
setTimeout(() => controller.abort(), 10000);
try {
for await (const chunk of stream.textStream) {
process.stdout.write(chunk);
}
} catch (error) {
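// error is typed unknown in a catch clause, so narrow it before reading .name
if (error instanceof Error && error.name === 'AbortError') {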
console.log('\nCancelled by user');
}
}
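If the consumer should stop cleanly rather than rely on an exception, the iteration itself can watch the signal. The sketch below is an assumption-laden illustration, not part of the package: untilAborted is a hypothetical wrapper that ends the loop once the signal is aborted. It only checks between chunks, so it complements passing the signal to swarm() rather than replacing it.

```typescript
// Hypothetical helper: stop consuming `source` once `signal` is aborted.
// The check runs when a chunk arrives, so a chunk that arrives after the
// abort is dropped and iteration ends without throwing.
async function* untilAborted<T>(
  source: AsyncIterable<T>,
  signal: AbortSignal,
): AsyncGenerator<T> {
  for await (const chunk of source) {
    // Returning from inside for await calls the source iterator's return(),
    // which gives the underlying stream a chance to clean up.
    if (signal.aborted) return;
    yield chunk;
  }
}

// Usage sketch (stream and controller as in the example above):
//   for await (const chunk of untilAborted(stream.textStream, controller.signal)) {
//     process.stdout.write(chunk);
//   }
```

On runtimes that provide it (Node.js 17.3 or later and current browsers), AbortSignal.timeout(10_000) can replace the manual setTimeout. It aborts with a TimeoutError rather than an AbortError, so a name check in the catch block may need to cover both.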
Progress Tracking
Track tool calls and their results as they happen:
const stream = execute(agent, message, context);
let toolCallCount = 0;
const tools: string[] = [];
for await (const chunk of stream.fullStream) {
if (chunk.type === 'text-delta') {
process.stdout.write(chunk.textDelta);
} else if (chunk.type === 'tool-call') {
toolCallCount++;
tools.push(chunk.toolName);
console.log(`\n[Tool ${toolCallCount}] ${chunk.toolName}`);
} else if (chunk.type === 'tool-result') {
// JSON.stringify returns undefined for an undefined result, so fall back to ''
console.log(`[Result] ${(JSON.stringify(chunk.result) ?? '').slice(0, 50)}...`);
}
}
console.log(`\n\nUsed ${toolCallCount} tools: ${tools.join(', ')}`);
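Progress output is often more useful with timings. Since tool-call and tool-result chunks share a toolCallId, the two can be paired to measure how long each tool took. The following is a sketch under that assumption; createToolTimer and ObservedChunk are hypothetical names, and the clock is injectable only to make the helper easy to test.

```typescript
// Loose local shape covering only the fields this helper reads.
interface ObservedChunk {
  type: string;
  toolCallId?: string;
  toolName?: string;
}

interface ToolDuration {
  toolName: string;
  ms: number;
}

// Hypothetical helper: pair each tool-call with its tool-result by toolCallId.
function createToolTimer(now: () => number = Date.now) {
  const started = new Map<string, { toolName: string; at: number }>();
  const durations: ToolDuration[] = [];

  return {
    durations,
    observe(chunk: ObservedChunk): void {
      if (chunk.type === 'tool-call' && chunk.toolCallId && chunk.toolName) {
        started.set(chunk.toolCallId, { toolName: chunk.toolName, at: now() });
      } else if (chunk.type === 'tool-result' && chunk.toolCallId) {
        const start = started.get(chunk.toolCallId);
        if (!start) return; // result without a recorded call; ignore it
        started.delete(chunk.toolCallId);
        durations.push({ toolName: start.toolName, ms: now() - start.at });
      }
    },
  };
}

// Usage sketch: call timer.observe(chunk) inside the fullStream loop,
// then read timer.durations once the loop ends.
```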
Buffering Strategies
Character Buffering
Buffer chunks for smoother output:
const stream = execute(agent, message, context);
let buffer = '';
const BUFFER_SIZE = 10;
for await (const chunk of stream.textStream) {
buffer += chunk;
if (buffer.length >= BUFFER_SIZE) {
process.stdout.write(buffer);
buffer = '';
}
}
if (buffer) {
process.stdout.write(buffer);
}
Line Buffering
Wait for complete lines:
const stream = execute(agent, message, context);
let buffer = '';
for await (const chunk of stream.textStream) {
buffer += chunk;
const lines = buffer.split('\n');
buffer = lines.pop() || '';
for (const line of lines) {
console.log(line);
}
}
if (buffer) {
console.log(buffer);
}
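The same line-buffering logic can be packaged as an async generator so that any consumer can iterate over complete lines. This is a sketch, not a library function: splitLines is a hypothetical name, and it additionally accepts \r\n line endings, which the loop above does not.

```typescript
// Hypothetical helper: re-chunk a text stream into complete lines.
async function* splitLines(source: AsyncIterable<string>): AsyncGenerator<string> {
  let buffer = '';
  for await (const chunk of source) {
    buffer += chunk;
    const lines = buffer.split(/\r?\n/);
    // The last element is an unfinished line (or ''), so keep it for later.
    buffer = lines.pop() ?? '';
    yield* lines;
  }
  // Emit whatever remains when the stream ends without a trailing newline.
  if (buffer) yield buffer;
}

// Usage sketch:
//   for await (const line of splitLines(stream.textStream)) console.log(line);
```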
Time Buffering
Flush at most once per interval, checked as each chunk arrives:
const stream = execute(agent, message, context);
let buffer = '';
let lastFlush = Date.now();
const FLUSH_INTERVAL = 100; // ms
for await (const chunk of stream.textStream) {
buffer += chunk;
const now = Date.now();
if (now - lastFlush >= FLUSH_INTERVAL) {
process.stdout.write(buffer);
buffer = '';
lastFlush = now;
}
}
if (buffer) {
process.stdout.write(buffer);
}
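The loop above only checks the clock when a chunk arrives, so buffered text can stay hidden while the model pauses. One alternative is to flush from a timer instead. The sketch below assumes a caller-supplied write callback; writeWithTimedFlush is a hypothetical name.

```typescript
// Hypothetical helper: buffer incoming text and flush it on a fixed timer,
// independent of when the next chunk arrives.
async function writeWithTimedFlush(
  source: AsyncIterable<string>,
  write: (text: string) => void,
  intervalMs = 100,
): Promise<void> {
  let buffer = '';
  const flush = (): void => {
    if (buffer) {
      write(buffer);
      buffer = '';
    }
  };

  const timer = setInterval(flush, intervalMs);
  try {
    for await (const chunk of source) {
      buffer += chunk;
    }
  } finally {
    // Stop the timer and emit the remainder, even if the stream throws.
    clearInterval(timer);
    flush();
  }
}

// Usage sketch:
//   await writeWithTimedFlush(stream.textStream, (text) => process.stdout.write(text));
```

The trade-off is one live timer per stream; for a handful of concurrent streams that cost is usually negligible.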
UI Integration
React Integration
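import { useState } from 'react';
import { execute } from '@deepagents/agent';
// Assumes an agent instance and context are defined elsewhere in the module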
function ChatComponent() {
const [text, setText] = useState('');
const [isStreaming, setIsStreaming] = useState(false);
const handleSend = async (message: string) => {
setIsStreaming(true);
setText('');
try {
const stream = execute(agent, message, context);
for await (const chunk of stream.textStream) {
setText(prev => prev + chunk);
}
} finally {
// Reset the indicator even if the stream throws
setIsStreaming(false);
}
};
return (
<div>
<div>{text}</div>
{isStreaming && <span>...</span>}
<button onClick={() => handleSend('Hello')}>Send</button>
</div>
);
}
Server-Sent Events (SSE)
// Assumes an Express app; adapt the handler for other frameworks
import express from 'express';
const app = express();
app.get('/stream', async (req, res) => {
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
// Query values are not guaranteed to be strings, so coerce before use
const stream = execute(agent, String(req.query.message ?? ''), context);
for await (const chunk of stream.textStream) {
res.write(`data: ${JSON.stringify({ text: chunk })}\n\n`);
}
res.write('data: [DONE]\n\n');
res.end();
});
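On the receiving side, frames can arrive split across network reads, so a client needs to buffer until it sees the blank line that ends a frame. The following is a minimal sketch of a parser for the exact frames the handler above writes (a JSON payload with a text field, then [DONE]); createSseParser is a hypothetical name, and a browser client could use the built-in EventSource instead.

```typescript
// Hypothetical parser for frames of the form `data: <payload>\n\n`.
// JSON.stringify never emits raw newlines, so each payload fits on one line.
function createSseParser(
  onText: (text: string) => void,
  onDone: () => void,
): (raw: string) => void {
  let buffer = '';
  return function feed(raw: string): void {
    buffer += raw;
    const frames = buffer.split('\n\n');
    // The last element is an incomplete frame (or ''), so keep it buffered.
    buffer = frames.pop() ?? '';
    for (const frame of frames) {
      if (!frame.startsWith('data: ')) continue; // skip comments and other fields
      const payload = frame.slice('data: '.length);
      if (payload === '[DONE]') {
        onDone();
        continue;
      }
      const parsed = JSON.parse(payload) as { text: string };
      onText(parsed.text);
    }
  };
}
```

Feed the parser each decoded string read from the response body; it invokes onText once per complete frame regardless of how the bytes were split.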
WebSocket Streaming
import { WebSocketServer } from 'ws';
const wss = new WebSocketServer({ port: 8080 });
wss.on('connection', (ws) => {
ws.on('message', async (message) => {
const stream = execute(agent, message.toString(), context);
for await (const chunk of stream.fullStream) {
ws.send(JSON.stringify(chunk));
}
ws.send(JSON.stringify({ type: 'done' }));
});
});
Error Handling
Stream Errors
const stream = execute(agent, message, context);
try {
for await (const chunk of stream.textStream) {
process.stdout.write(chunk);
}
} catch (error) {
console.error('Stream error:', error);
// Handle gracefully
}
Partial Results
Accumulate chunks as they arrive so the partial text is still available if the stream fails:
const stream = execute(agent, message, context);
let partialText = '';
try {
for await (const chunk of stream.textStream) {
partialText += chunk;
process.stdout.write(chunk);
}
} catch (error) {
console.log('\n\nPartial result:', partialText);
}
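The pattern above can be wrapped in a helper that reports whether the text is complete, which makes the fallback decision explicit for the caller. This is a sketch only; collectText and CollectedText are hypothetical names, not part of the package.

```typescript
interface CollectedText {
  text: string;       // everything received before the stream ended or failed
  complete: boolean;  // false when the stream threw part-way through
  error?: unknown;    // the thrown value, when complete is false
}

// Hypothetical helper: drain a text stream and never throw, returning
// whatever text arrived together with the failure, if any.
async function collectText(
  source: AsyncIterable<string>,
  onChunk?: (chunk: string) => void,
): Promise<CollectedText> {
  let text = '';
  try {
    for await (const chunk of source) {
      text += chunk;
      onChunk?.(chunk);
    }
    return { text, complete: true };
  } catch (error) {
    return { text, complete: false, error };
  }
}

// Usage sketch:
//   const result = await collectText(stream.textStream, (c) => process.stdout.write(c));
//   if (!result.complete) console.log('\n\nPartial result:', result.text);
```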
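- Chunk Size: Larger chunks mean fewer events and less per-event overhead.
- Buffer Wisely: Balance smooth output against added latency.
- Async Iterators: Use for await...of so the consumer's pace applies backpressure to the stream.
- Memory: Release chunks you no longer need instead of accumulating them.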
Best Practices
- Error Boundaries: Always wrap stream consumption in try-catch.
- Abort Signals: Provide cancellation for long-running tasks.
- Progress Indicators: Show tool calls and status updates.
- Graceful Degradation: Fall back to the complete text on errors.
Next Steps
- Agent Package: Full API reference
- Examples: See streaming examples