Documentation Index Fetch the complete documentation index at: https://mintlify.com/JanuaryLabs/deepagents/llms.txt
Use this file to discover all available pages before exploring further.
The agent framework provides powerful streaming capabilities for real-time interaction with agents. Stream text, tool calls, and structured output as they’re generated.
Basic Streaming
Use execute() (or stream()) for streaming responses:
import { openai } from '@ai-sdk/openai' ;
import { agent , execute } from '@deepagents/agent' ;
const assistant = agent ({
name: 'assistant' ,
model: openai ( 'gpt-4o' ),
prompt: 'You are a helpful assistant.' ,
});
const stream = await execute ( assistant , 'Tell me a story' , {});
// Stream text chunks
for await ( const chunk of stream . textStream ) {
process . stdout . write ( chunk );
}
Stream Types
Text Stream
Stream generated text as it’s produced:
const stream = await execute ( assistant , 'Write a poem' , {});
for await ( const chunk of stream . textStream ) {
process . stdout . write ( chunk ); // Print each chunk immediately
}
Full Stream
Access all events including tool calls:
for await ( const event of stream . fullStream ) {
switch ( event . type ) {
case 'text-delta' :
process . stdout . write ( event . textDelta );
break ;
case 'tool-call' :
console . log ( 'Tool called:' , event . toolName );
break ;
case 'tool-result' :
console . log ( 'Tool result:' , event . result );
break ;
}
}
UI Message Stream
Stream UI-compatible messages:
for await ( const chunk of stream . toUIMessageStream ()) {
if ( chunk . type === 'text-delta' ) {
console . log ( 'Text:' , chunk . delta );
}
if ( chunk . type === 'reasoning-delta' ) {
console . log ( 'Reasoning:' , chunk . delta );
}
}
Getting Final Results
Final Text
const stream = await execute ( assistant , 'What is AI?' , {});
const text = await stream . text ;
console . log ( text ); // Complete response text
const stream = await execute ( assistant , 'Hello' , {});
const usage = await stream . totalUsage ;
console . log ( usage );
// {
// promptTokens: 150,
// completionTokens: 50,
// totalTokens: 200
// }
Sources (if available)
const stream = await execute ( assistant , 'Research topic' , {});
const sources = await stream . sources ;
console . log ( sources );
// [
// { type: 'url', url: 'https://example.com', title: 'Source 1' },
// { type: 'file', filename: 'data.pdf', mediaType: 'application/pdf' }
// ]
Streaming with Structured Output
Stream partial structured output:
import { z } from 'zod' ;
const taskSchema = z . object ({
title: z . string (),
steps: z . array ( z . string ()),
estimatedTime: z . number (),
});
const planner = agent ({
name: 'planner' ,
model: openai ( 'gpt-4o' ),
prompt: 'Create a task plan.' ,
output: taskSchema ,
});
const stream = await execute ( planner , 'Plan a vacation' , {});
// Stream partial output as it's generated
for await ( const partial of stream . partialOutputStream ) {
console . log ( 'Partial:' , partial );
// { title: 'Plan a Vacation' }
// { title: 'Plan a Vacation', steps: ['Book flights'] }
// { title: 'Plan a Vacation', steps: ['Book flights', 'Reserve hotel'], ... }
}
// Get final complete output
const final = await stream . output ;
console . log ( 'Final:' , final );
Swarm Streaming
For multi-agent systems, use swarm() to handle handoffs:
import { swarm } from '@deepagents/agent' ;
const coordinator = agent ({
name: 'coordinator' ,
model: openai ( 'gpt-4o' ),
prompt: 'Coordinate tasks between specialists.' ,
handoffs: [ specialist1 , specialist2 ],
});
const stream = swarm ( coordinator , 'Complete this task' , {});
for await ( const chunk of stream ) {
if ( chunk . type === 'text-delta' ) {
process . stdout . write ( chunk . delta );
}
if ( chunk . type === 'reasoning-delta' ) {
console . log ( '[Reasoning]' , chunk . delta );
}
}
Abort Streaming
Cancel ongoing streams:
const controller = new AbortController ();
const stream = await execute ( assistant , 'Write a long story' , {}, {
abortSignal: controller . signal ,
});
// Start streaming
const streamingPromise = ( async () => {
for await ( const chunk of stream . textStream ) {
process . stdout . write ( chunk );
}
})();
// Cancel after 5 seconds
setTimeout (() => {
controller . abort ();
console . log ( ' \n Stream aborted' );
}, 5000 );
try {
await streamingPromise ;
} catch ( error ) {
if ( error . name === 'AbortError' ) {
console . log ( 'Stream was cancelled' );
}
}
Custom Stream Processing
Apply custom transformations:
import { smoothStream } from 'ai' ;
const stream = await execute ( assistant , 'Tell me a story' , {}, {
transform: smoothStream (),
});
const stream = await execute ( assistant , 'Use tools to help' , {});
for await ( const event of stream . fullStream ) {
if ( event . type === 'tool-call' ) {
console . log ( `Calling ${ event . toolName } with:` , event . args );
}
if ( event . type === 'tool-result' ) {
console . log ( `Result from ${ event . toolName } :` , event . result );
}
if ( event . type === 'text-delta' ) {
process . stdout . write ( event . textDelta );
}
}
Streaming Utilities
Last Helper
Get the last item from a stream:
import { last } from '@deepagents/agent' ;
const stream = await execute ( assistant , 'Count to 10' , {});
// Get last chunk
const lastChunk = await last ( stream . fullStream );
console . log ( 'Last event:' , lastChunk );
Printer Utility
Print streams to stdout:
import { printer } from '@deepagents/agent' ;
const stream = await execute ( assistant , 'Tell me a story' , {});
// Print to stdout with formatting
await printer . stdout ( stream , {
reasoning: true , // Include reasoning
text: true , // Include text
wrapInTags: true , // Wrap in XML-style tags
});
Real-World Example
import { openai } from '@ai-sdk/openai' ;
import { agent , execute } from '@deepagents/agent' ;
import { tool } from 'ai' ;
import { z } from 'zod' ;
const searchTool = tool ({
description: 'Search for information' ,
parameters: z . object ({
query: z . string (),
}),
execute : async ({ query }) => {
// Simulate search
return `Results for: ${ query } ` ;
},
});
const researcher = agent ({
name: 'researcher' ,
model: openai ( 'gpt-4o' ),
prompt: 'Research topics and provide detailed information.' ,
tools: { search: searchTool },
});
const stream = await execute (
researcher ,
'Research the history of TypeScript' ,
{}
);
console . log ( 'Research starting... \n ' );
// Track tool calls
let toolCallCount = 0 ;
for await ( const event of stream . fullStream ) {
switch ( event . type ) {
case 'text-delta' :
process . stdout . write ( event . textDelta );
break ;
case 'tool-call' :
toolCallCount ++ ;
console . log ( ` \n [Tool Call # ${ toolCallCount } ]: ${ event . toolName } ` );
console . log ( `[Args]: ${ JSON . stringify ( event . args ) } ` );
break ;
case 'tool-result' :
console . log ( `[Result]: ${ event . result } \n ` );
break ;
}
}
console . log ( ' \n\n Research complete!' );
const usage = await stream . totalUsage ;
console . log ( ' \n Token usage:' );
console . log ( ` Prompt: ${ usage . promptTokens } ` );
console . log ( ` Completion: ${ usage . completionTokens } ` );
console . log ( ` Total: ${ usage . totalTokens } ` );
Streaming to Files
Write streams directly to files:
import { createWriteStream } from 'fs' ;
import { Readable } from 'stream' ;
const stream = await execute ( assistant , 'Write a long article' , {});
const writeStream = createWriteStream ( 'output.md' );
Readable . fromWeb ( stream . textStream as any ). pipe ( writeStream );
await new Promise (( resolve , reject ) => {
writeStream . on ( 'finish' , resolve );
writeStream . on ( 'error' , reject );
});
console . log ( 'Article written to output.md' );
Error Handling
Handle errors in streams:
try {
const stream = await execute ( assistant , 'Help me' , {});
for await ( const chunk of stream . textStream ) {
process . stdout . write ( chunk );
}
const text = await stream . text ;
console . log ( ' \n Completed:' , text . length , 'characters' );
} catch ( error ) {
if ( error . name === 'AbortError' ) {
console . error ( 'Stream was aborted' );
} else if ( error . message . includes ( 'rate limit' )) {
console . error ( 'Rate limit exceeded' );
} else {
console . error ( 'Stream error:' , error . message );
}
}
Best Practices
Always consume streams completely to avoid memory leaks: // ✅ Good
for await ( const chunk of stream . textStream ) {
process . stdout . write ( chunk );
}
// ❌ Incomplete consumption
const firstChunk = await stream . textStream [ Symbol . asyncIterator ](). next ();
// Stream not fully consumed!
Wrap stream processing in try-catch: try {
for await ( const chunk of stream . textStream ) {
process . stdout . write ( chunk );
}
} catch ( error ) {
console . error ( 'Stream error:' , error );
}
Provide abort signals for long-running streams: const controller = new AbortController ();
const stream = await execute ( agent , message , {}, {
abortSignal: controller . signal
});
Track token usage to manage costs: const usage = await stream . totalUsage ;
console . log ( 'Tokens used:' , usage . totalTokens );
Next Steps
API Reference Full API documentation
Execution Functions execute(), generate(), swarm() API