Input Streams require SDK version 4.4.2 or later and Realtime Streams v2 (enabled by default in SDK 4.1.0+).
## Overview
Input Streams solve three common problems:

- **Cancelling AI streams mid-generation.** When you use the AI SDK’s `streamText` inside a task, the LLM keeps generating tokens until it’s done — even if the user navigated away or clicked “Stop generating.” With input streams, your frontend sends a cancel signal and the task aborts the LLM call immediately.
- **Human-in-the-loop workflows.** A task generates a draft email, then pauses and waits for the user to approve or edit it before sending.
- **Interactive agents.** An AI agent running as a task needs follow-up information from the user mid-execution — clarifying a question, choosing between options, or providing additional context.
## Quick Start
### 1. Define your input streams
Define input streams in a shared file so both your task code and your backend/frontend can import them:

`trigger/streams.ts`
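A sketch of what that shared file might contain, assuming the `@trigger.dev/sdk` import path and the stream-ID argument used throughout this guide:

```typescript
// trigger/streams.ts
import { streams } from "@trigger.dev/sdk";

// The generic parameter types every .send() / .wait() / .once() / .on() call.
export const approval = streams.input<{ approved: boolean; comment?: string }>(
  "approval"
);
```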
### 2. Receive data inside your task
`trigger/draft-email.ts`
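A hedged sketch of the task, assuming an `approval` stream as defined in step 1 and the `ok`/`output` result shape described in the `wait()` section below; `generateDraft` and `sendEmail` are hypothetical helpers:

```typescript
// trigger/draft-email.ts
import { task } from "@trigger.dev/sdk";
import { approval } from "./streams";

export const draftEmail = task({
  id: "draft-email",
  run: async (payload: { to: string; subject: string }) => {
    const draft = await generateDraft(payload); // hypothetical drafting step

    // Suspend the run (freeing compute) until the user responds via .send()
    const result = await approval.wait({ timeout: "24h" });
    if (!result.ok) return { status: "timed-out" };

    if (result.output.approved) {
      await sendEmail(payload.to, draft); // hypothetical sending step
      return { status: "sent" };
    }
    return { status: "rejected", comment: result.output.comment };
  },
});
```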
### 3. Send data from your backend
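For step 3, a minimal sketch, assuming `.send()` is addressed to a specific run by its ID as in the backend examples later in this guide:

```typescript
// Somewhere in your backend, after triggering the task and capturing its run ID
import { approval } from "./trigger/streams";

export async function approveDraft(runId: string) {
  // Pushes typed data into the running draft-email task
  await approval.send(runId, { approved: true });
}
```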
## Defining Input Streams
Use `streams.input()` to define a typed input stream. The generic parameter controls the shape of the data that can be sent:
`trigger/streams.ts`
`.send()` and the receiving methods (`.wait()`, `.once()`, `.on()`, `.peek()`) share the same type.
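A sketch with two differently-typed streams (the IDs and import path are illustrative):

```typescript
// trigger/streams.ts
import { streams } from "@trigger.dev/sdk";

// Object payload: the generic parameter is shared by .send() and all receivers
export const cancel = streams.input<{ reason: string }>("cancel");

// Primitive payloads work too
export const userResponse = streams.input<string>("user-response");
```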
## Receiving Data Inside a Task
### Choosing the right method
| Method | Task suspended? | Compute cost while waiting | Best for |
|---|---|---|---|
| `.wait()` | Yes | None — process freed | Approval gates, human-in-the-loop, long waits |
| `.once()` | No | Full — process stays alive | Short waits, concurrent work. Returns a result object with `.unwrap()` |
| `.on(handler)` | No | Full — process stays alive | Continuous listening (cancel signals, live updates) |
### `wait()` — Suspend until data arrives
Suspends the task entirely, freeing compute resources. The task resumes when data arrives via .send(). This is the most efficient option when the task has nothing else to do while waiting.
Returns a `ManualWaitpointPromise` — the same type returned by `wait.forToken()`.
Chain `.unwrap()` to throw on timeout instead of checking `ok`:
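Both styles, sketched inside a task's `run()`, assuming an `approval` stream and the `ok`/`output` result shape of `wait.forToken()`:

```typescript
// Check the result object explicitly...
const result = await approval.wait({ timeout: "1h" });
if (result.ok) {
  console.log("approved?", result.output.approved);
} else {
  console.log("timed out");
}

// ...or chain .unwrap(): a timeout then throws instead of returning ok: false
const data = await approval.wait({ timeout: "1h" }).unwrap();
```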
#### Options
| Option | Type | Description |
|---|---|---|
| `timeout` | `string` | Maximum wait time before timing out. Period format: `"30s"`, `"5m"`, `"1h"`, `"24h"`, `"7d"`. |
| `idempotencyKey` | `string` | Reuse the same waitpoint across retries. If the task retries, it resumes the same wait instead of creating a new one. |
| `idempotencyKeyTTL` | `string` | Expiration for the idempotency key. After this period, the same key creates a new waitpoint. |
| `tags` | `string[]` | Tags for the underlying waitpoint, useful for filtering via `wait.listTokens()`. |
#### Idempotent waits for retries
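A sketch of an idempotent wait using the options from the table above (`approval` and `payload.orderId` are illustrative):

```typescript
// If the task retries, the same key resumes the original waitpoint
// instead of creating a new one and waiting again.
const result = await approval.wait({
  timeout: "24h",
  idempotencyKey: `approval-${payload.orderId}`,
  idempotencyKeyTTL: "48h",
  tags: ["approval"],
});
```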
### `once()` — Wait for the next value (non-suspending)
Blocks until data arrives, but keeps the task process alive. Useful for short waits or when doing concurrent work.
Returns an `InputStreamOncePromise` — similar to the `ManualWaitpointPromise` from `.wait()`. Await it for a result object, or chain `.unwrap()` to get the data directly.
Chain `.unwrap()` to throw on timeout instead of checking `ok`:
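Sketched in both styles, mirroring `.wait()` and assuming a `userResponse` stream typed as `string`:

```typescript
// Keeps the process alive while waiting, so use for short waits
const result = await userResponse.once({ timeout: "30s" });
if (result.ok) {
  console.log("got:", result.output);
}

// Or chain .unwrap() to get the data directly; a timeout throws
const answer = await userResponse.once({ timeout: "30s" }).unwrap();
```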
`once()` also accepts an abort signal for cancellation:
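A sketch, assuming the option is named `signal` (the exact option name is an assumption) and a `userResponse` stream as above:

```typescript
const controller = new AbortController();

// Start waiting, keeping a handle to abandon the wait early
const pending = userResponse.once({ timeout: "5m", signal: controller.signal });

// ...elsewhere, if the value is no longer needed:
controller.abort();
```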
### `on()` — Listen for every value
Registers a persistent handler that fires on every piece of data. Handlers are automatically cleaned up when the task run completes, so you don’t need to manually unsubscribe. If you need to stop listening early (before the run ends), call `.off()` on the returned subscription.
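A sketch, assuming a `cancel` stream defined as `streams.input<{ reason: string }>("cancel")`:

```typescript
// Fires for every value sent to this stream for the lifetime of the run
const subscription = cancel.on((data) => {
  console.log("cancel requested:", data.reason);
});

// Optional: only needed to stop listening before the run completes
subscription.off();
```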
### `peek()` — Non-blocking check
Returns the most recent buffered value without waiting, or `undefined` if nothing has been received yet.
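A sketch, assuming the same `cancel` stream; whether `.peek()` is synchronous is an assumption, so it is awaited here to be safe:

```typescript
// Non-blocking: returns the latest buffered value, or undefined if none yet
const latest = await cancel.peek();
if (latest !== undefined) {
  console.log("most recent cancel reason:", latest.reason);
}
```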
## Sending Data to a Running Task
Use `.send()` from your backend to push data into a running task. See the backend input streams guide for detailed examples, including API route patterns.
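A hypothetical Next.js route handler as one way this might look (the route path, request shape, and `@/` import alias are illustrative):

```typescript
// app/api/approve/route.ts
import { approval } from "@/trigger/streams";

export async function POST(request: Request) {
  const { runId, approved } = await request.json();

  // Push typed data into the running task identified by runId
  await approval.send(runId, { approved });

  return Response.json({ ok: true });
}
```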
## Complete Example: Cancellable AI Streaming
This is the most common use case — streaming an AI response while allowing the user to cancel mid-generation.

### Define the streams
`trigger/streams.ts`
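A sketch of the stream definition for this example:

```typescript
// trigger/streams.ts
import { streams } from "@trigger.dev/sdk";

// The frontend sends this signal (via the backend) to stop generation early
export const cancel = streams.input<{ reason?: string }>("cancel");
```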
### Create the task
`trigger/ai-task.ts`
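A sketch of the task, wiring a `cancel` input stream to the AI SDK’s `abortSignal` option; the model choice and error handling are illustrative:

```typescript
// trigger/ai-task.ts
import { task } from "@trigger.dev/sdk";
import { streamText } from "ai";
import { openai } from "@ai-sdk/openai";
import { cancel } from "./streams";

export const aiTask = task({
  id: "ai-task",
  run: async (payload: { prompt: string }) => {
    const controller = new AbortController();

    // Abort the LLM call as soon as the frontend sends a cancel signal
    cancel.on(() => controller.abort());

    const result = streamText({
      model: openai("gpt-4o-mini"),
      prompt: payload.prompt,
      abortSignal: controller.signal,
    });

    let text = "";
    try {
      for await (const chunk of result.textStream) {
        text += chunk;
      }
    } catch {
      // An aborted stream surfaces here; fall through with the partial text
    }
    return { text };
  },
});
```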
### Cancel from a backend API route
`app/api/cancel/route.ts`
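A hypothetical route handler sketch (the request shape and `@/` alias are illustrative):

```typescript
// app/api/cancel/route.ts
import { cancel } from "@/trigger/streams";

export async function POST(request: Request) {
  const { runId } = await request.json();

  // Forward the user's cancel click into the running task
  await cancel.send(runId, { reason: "user-cancelled" });

  return Response.json({ ok: true });
}
```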
### Display in your frontend
`components/ai-chat.tsx`
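A minimal React sketch showing just the cancel button; streaming the AI output to the page is the job of output streams:

```typescript
// components/ai-chat.tsx
"use client";

export function AiChat({ runId }: { runId: string }) {
  async function stopGenerating() {
    // Calls the backend cancel route, which forwards the signal via .send()
    await fetch("/api/cancel", {
      method: "POST",
      headers: { "Content-Type": "application/json" },
      body: JSON.stringify({ runId }),
    });
  }

  return <button onClick={stopGenerating}>Stop generating</button>;
}
```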
## Important Notes
- Input streams require Realtime Streams v2 (enabled by default in SDK 4.1.0+). If you’re on an older version, calling `.on()` or `.once()` will throw with instructions to enable it.
- You cannot send data to a completed, failed, or canceled run.
- Maximum payload size per `.send()` call is 1MB.
- Data sent before any listener is registered is buffered and delivered when a listener attaches (for `.once()` and `.on()`). `.wait()` handles the buffering race automatically — if data was sent before `.wait()` is called, it will still be received.
- Type safety is enforced through the generic parameter on `streams.input<T>()`.
## Best Practices
- **Use `.wait()` for long waits:** If the task has nothing else to do until data arrives (approval gates, human-in-the-loop), use `.wait()` to free compute resources. Use `.once()` only for short waits or when doing concurrent work.
- **Listeners auto-cleanup:** `.on()` handlers are automatically cleaned up when the task run completes. Call `.off()` only if you need to stop listening early.
- **Use timeouts:** Both `.wait()` and `.once()` support timeouts — always set one to avoid indefinite hangs.
- **Use idempotency keys with `.wait()`:** If your task has retries enabled, pass an `idempotencyKey` to `.wait()` so retries resume the same wait instead of creating a new one.
- **Define streams in shared files:** Keep your `streams.input()` definitions in a shared location (like `trigger/streams.ts`) so both task code and backend/frontend can import them with full type safety.
- **Combine with output streams:** Input streams pair naturally with output streams for full bidirectional communication — stream AI output to the frontend while accepting cancel signals from it.
- **Use descriptive stream IDs:** Choose clear IDs like `"cancel"`, `"approval"`, or `"user-response"` instead of generic names.

