Input Streams let you send data into a running task from your backend or frontend. While output streams send data out of tasks, input streams complete the loop — enabling bidirectional communication with your running tasks.
Input Streams require SDK version 4.4.2 or later and Realtime Streams v2 (enabled by default in SDK 4.1.0+).

Overview

Input Streams solve three common problems:
  • Cancelling AI streams mid-generation. When you use AI SDK’s streamText inside a task, the LLM keeps generating tokens until it’s done — even if the user navigated away or clicked “Stop generating.” With input streams, your frontend sends a cancel signal and the task aborts the LLM call immediately.
  • Human-in-the-loop workflows. A task generates a draft email, then pauses and waits for the user to approve or edit it before sending.
  • Interactive agents. An AI agent running as a task needs follow-up information from the user mid-execution — clarifying a question, choosing between options, or providing additional context.

Quick Start

1. Define your input streams

Define input streams in a shared file so both your task code and your backend/frontend can import them:
trigger/streams.ts
import { streams } from "@trigger.dev/sdk";

// Typed input stream — the generic parameter defines the shape of data sent in
export const cancelSignal = streams.input<{ reason?: string }>({
  id: "cancel",
});

export const approval = streams.input<{ approved: boolean; reviewer: string }>({
  id: "approval",
});

2. Receive data inside your task

trigger/draft-email.ts
import { task } from "@trigger.dev/sdk";
import { approval } from "./streams";

export const draftEmailTask = task({
  id: "draft-email",
  run: async (payload: { to: string; subject: string }) => {
    const draft = await generateDraft(payload);

    // Suspend until someone sends approval — no compute cost while waiting
    const { approved, reviewer } = await approval.wait({ timeout: "7d" }).unwrap();

    if (approved) {
      await sendEmail(draft);
      return { sent: true, reviewer };
    }

    return { sent: false, reviewer };
  },
});

3. Send data from your backend

import { approval } from "./trigger/streams";

// Approve a draft from your API route
await approval.send(runId, { approved: true, reviewer: "alice@example.com" });

Defining Input Streams

Use streams.input() to define a typed input stream. The generic parameter controls the shape of data that can be sent:
trigger/streams.ts
import { streams } from "@trigger.dev/sdk";

// Simple signal (no data needed beyond the type)
export const cancelSignal = streams.input<{ reason?: string }>({
  id: "cancel",
});

// Structured data
export const approval = streams.input<{ approved: boolean; reviewer: string }>({
  id: "approval",
});

// Complex objects
export const userResponse = streams.input<{
  action: "approve" | "reject" | "edit";
  message?: string;
  edits?: Record<string, string>;
}>({
  id: "user-response",
});
Type safety is enforced through the generic parameter — both .send() and the receiving methods (.wait(), .once(), .on(), .peek()) share the same type.

Receiving Data Inside a Task

Choosing the right method

| Method | Task suspended? | Compute cost while waiting | Best for |
| --- | --- | --- | --- |
| .wait() | Yes | None — process freed | Approval gates, human-in-the-loop, long waits |
| .once() | No | Full — process stays alive | Short waits, concurrent work. Returns result object with .unwrap() |
| .on(handler) | No | Full — process stays alive | Continuous listening (cancel signals, live updates) |

wait() — Suspend until data arrives

Suspends the task entirely, freeing compute resources. The task resumes when data arrives via .send(). This is the most efficient option when the task has nothing else to do while waiting. Returns a ManualWaitpointPromise — the same type returned by wait.forToken().
import { task } from "@trigger.dev/sdk";
import { approval } from "./streams";

export const publishPost = task({
  id: "publish-post",
  run: async (payload: { postId: string }) => {
    const draft = await prepareDraft(payload.postId);
    await notifyReviewer(draft);

    // Suspend until reviewer responds — no compute cost while waiting
    const result = await approval.wait({ timeout: "7d" });

    if (result.ok) {
      if (result.output.approved) {
        await publish(draft);
        return { published: true, reviewer: result.output.reviewer };
      }
      return { published: false, reviewer: result.output.reviewer };
    }

    // Timed out after 7 days
    return { published: false, timedOut: true };
  },
});
Use .unwrap() to throw on timeout instead of checking ok:
// Throws WaitpointTimeoutError if the timeout is reached
const data = await approval.wait({ timeout: "24h" }).unwrap();
console.log(data.approved); // TData directly

Options

| Option | Type | Description |
| --- | --- | --- |
| timeout | string | Maximum wait time before timeout. Period format: "30s", "5m", "1h", "24h", "7d". |
| idempotencyKey | string | Reuse the same waitpoint across retries. If the task retries, it resumes the same wait instead of creating a new one. |
| idempotencyKeyTTL | string | Expiration for the idempotency key. After this period, the same key creates a new waitpoint. |
| tags | string[] | Tags for the underlying waitpoint, useful for filtering via wait.listTokens(). |
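For illustration, here is a sketch combining these options in one call. The tag values and the orderId variable are hypothetical:

```typescript
import { approval } from "./streams";

declare const orderId: string; // hypothetical — supplied by your task payload

// Sketch: wait() with all options combined (values are illustrative)
const result = await approval.wait({
  timeout: "24h",                         // give up after a day
  idempotencyKey: `approval-${orderId}`,  // retries resume this same waitpoint
  idempotencyKeyTTL: "48h",               // after 48h the same key creates a new waitpoint
  tags: ["approval", `order-${orderId}`], // filter later via wait.listTokens()
});
```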

Idempotent waits for retries

export const processOrder = task({
  id: "process-order",
  retry: { maxAttempts: 3 },
  run: async (payload: { orderId: string }) => {
    await prepareOrder(payload.orderId);

    // Same idempotency key across retries — won't create duplicate waitpoints
    const result = await approval.wait({
      timeout: "48h",
      idempotencyKey: `order-approval-${payload.orderId}`,
    });

    if (!result.ok) {
      throw new Error("Approval timed out after 48 hours");
    }

    await fulfillOrder(payload.orderId, result.output);
  },
});

once() — Wait for the next value (non-suspending)

Blocks until data arrives, but keeps the task process alive. Useful for short waits or when doing concurrent work. Returns an InputStreamOncePromise — similar to ManualWaitpointPromise from .wait(). Await it for a result object, or chain .unwrap() to get the data directly.
import { task } from "@trigger.dev/sdk";
import { approval } from "./streams";

export const draftEmailTask = task({
  id: "draft-email",
  run: async (payload: { to: string; subject: string }) => {
    const draft = await generateDraft(payload);

    // Task pauses here until someone sends approval (with a 5-minute timeout)
    const result = await approval.once({ timeoutMs: 300_000 });

    if (!result.ok) {
      // Timed out — result.error is an InputStreamTimeoutError
      return { sent: false, timedOut: true };
    }

    if (result.output.approved) {
      await sendEmail(draft);
      return { sent: true, reviewer: result.output.reviewer };
    }

    return { sent: false, reviewer: result.output.reviewer };
  },
});
Use .unwrap() to throw on timeout instead of checking ok:
// Throws InputStreamTimeoutError if no data arrives within 5 minutes
const data = await approval.once({ timeoutMs: 300_000 }).unwrap();
console.log(data.approved); // TData directly
once() also accepts an abort signal for cancellation:
// With an abort signal — rejects the promise when aborted
const controller = new AbortController();
const result = await approval.once({ signal: controller.signal });
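Because once() keeps the process alive, you can start listening and do other work concurrently before awaiting the result. A sketch, where doOtherWork is a hypothetical placeholder for your own concurrent step:

```typescript
import { approval } from "./streams";

declare function doOtherWork(): Promise<string>; // hypothetical helper

// Start listening without awaiting yet
const approvalPromise = approval.once({ timeoutMs: 300_000 }).unwrap();

// Do unrelated work while the listener waits in the background
const summary = await doOtherWork();

// Now block until the approval actually arrives (or the timeout throws)
const decision = await approvalPromise;
```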

on() — Listen for every value

Registers a persistent handler that fires on every piece of data. Handlers are automatically cleaned up when the task run completes, so you don’t need to manually unsubscribe. If you need to stop listening early (before the run ends), call .off() on the returned subscription.
import { task } from "@trigger.dev/sdk";
import { cancelSignal } from "./streams";

export const streamingTask = task({
  id: "streaming-task",
  run: async (payload: { prompt: string }) => {
    const controller = new AbortController();

    // Listen for cancel signals — automatically cleaned up when run completes
    cancelSignal.on((data) => {
      console.log("Cancelled:", data.reason);
      controller.abort();
    });

    const result = await streamText({
      model: openai("gpt-4o"),
      prompt: payload.prompt,
      abortSignal: controller.signal,
    });
    return result;
  },
});
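If only part of a run should react to input, you can detach early. A sketch, assuming .on() returns a subscription object with .off() as described above; cancellablePhase and finalizePhase are hypothetical helpers:

```typescript
import { task } from "@trigger.dev/sdk";
import { cancelSignal } from "./streams";

declare function cancellablePhase(payload: { prompt: string }, signal: AbortSignal): Promise<void>; // hypothetical
declare function finalizePhase(payload: { prompt: string }): Promise<void>; // hypothetical

export const twoPhaseTask = task({
  id: "two-phase-task",
  run: async (payload: { prompt: string }) => {
    const controller = new AbortController();

    // Only the first phase is cancellable
    const subscription = cancelSignal.on(() => controller.abort());

    await cancellablePhase(payload, controller.signal);

    // Stop listening before the non-cancellable phase begins
    subscription.off();

    await finalizePhase(payload);
  },
});
```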

peek() — Non-blocking check

Returns the most recent buffered value without waiting, or undefined if nothing has been received yet.
const latest = cancelSignal.peek();
if (latest) {
  // A cancel was already sent before we checked
}
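peek() fits naturally between iterations of a loop, where suspending or blocking would stall the work. A sketch, with chunks and processChunk as hypothetical placeholders:

```typescript
import { cancelSignal } from "./streams";

declare const chunks: string[]; // hypothetical work queue
declare function processChunk(chunk: string): Promise<void>; // hypothetical helper

for (const chunk of chunks) {
  // Bail out if a cancel arrived since the last iteration — never blocks
  if (cancelSignal.peek()) {
    break;
  }
  await processChunk(chunk);
}
```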

Sending Data to a Running Task

Use .send() from your backend to push data into a running task. See the backend input streams guide for detailed examples including API route patterns.
import { cancelSignal, approval } from "./trigger/streams";

// Cancel a running AI stream
await cancelSignal.send(runId, { reason: "User clicked stop" });

// Approve a draft
await approval.send(runId, { approved: true, reviewer: "alice@example.com" });

Complete Example: Cancellable AI Streaming

This is the most common use case — streaming an AI response while allowing the user to cancel mid-generation.

Define the streams

trigger/streams.ts
import { streams } from "@trigger.dev/sdk";

export const aiOutput = streams.define<string>({ id: "ai" });
export const cancelStream = streams.input<{ reason?: string }>({ id: "cancel" });

Create the task

trigger/ai-task.ts
import { task } from "@trigger.dev/sdk";
import { streamText } from "ai";
import { openai } from "@ai-sdk/openai";
import { aiOutput, cancelStream } from "./streams";

export const aiTask = task({
  id: "ai-chat",
  run: async (payload: { prompt: string }) => {
    const controller = new AbortController();

    // If the user cancels, abort the LLM call
    cancelStream.on(() => {
      controller.abort();
    });

    const result = streamText({
      model: openai("gpt-4o"),
      prompt: payload.prompt,
      abortSignal: controller.signal,
    });

    // Stream output to the frontend in real-time
    const { waitUntilComplete } = aiOutput.pipe(result.textStream);
    await waitUntilComplete();

    return { text: await result.text };
  },
});

Cancel from a backend API route

app/api/cancel/route.ts
import { cancelStream } from "@/trigger/streams";

export async function POST(req: Request) {
  const { runId } = await req.json();
  await cancelStream.send(runId, { reason: "User clicked stop" });
  return Response.json({ cancelled: true });
}

Display in your frontend

components/ai-chat.tsx
"use client";

import { useRealtimeStream } from "@trigger.dev/react-hooks";
import { aiOutput } from "@/trigger/streams";

export function AIChat({
  runId,
  accessToken,
}: {
  runId: string;
  accessToken: string;
}) {
  const { parts, error } = useRealtimeStream(aiOutput, runId, {
    accessToken,
    timeoutInSeconds: 300,
  });

  const handleCancel = async () => {
    await fetch("/api/cancel", {
      method: "POST",
      body: JSON.stringify({ runId }),
    });
  };

  if (error) return <div>Error: {error.message}</div>;
  if (!parts) return <div>Loading...</div>;

  return (
    <div>
      <div>{parts.join("")}</div>
      <button onClick={handleCancel}>Stop generating</button>
    </div>
  );
}

Important Notes

  • Input streams require Realtime Streams v2 (enabled by default in SDK 4.1.0+). If you’re on an older version, calling .on() or .once() will throw with instructions to enable it.
  • You cannot send data to a completed, failed, or canceled run.
  • Maximum payload size per .send() call is 1MB.
  • Data sent before any listener is registered is buffered and delivered when a listener attaches (for .once() and .on()).
  • .wait() handles the buffering race automatically — if data was sent before .wait() is called, it will still be received.
  • Type safety is enforced through the generic parameter on streams.input<T>().
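Since sends to finished runs fail, a backend can check the run's status before calling .send(). A hedged sketch: it assumes runs.retrieve() from the SDK and that finished runs report statuses like the ones listed below — check the runs API reference for the exact status values:

```typescript
import { runs } from "@trigger.dev/sdk";
import { approval } from "./trigger/streams";

// Assumption: these are the terminal status values — verify against the runs API docs
const FINISHED_STATUSES = ["COMPLETED", "FAILED", "CANCELED", "CRASHED", "TIMED_OUT"];

export async function sendApproval(runId: string, reviewer: string): Promise<boolean> {
  const run = await runs.retrieve(runId);

  // Completed, failed, or canceled runs can no longer receive input
  if (FINISHED_STATUSES.includes(run.status)) {
    return false;
  }

  await approval.send(runId, { approved: true, reviewer });
  return true;
}
```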

Best Practices

  1. Use .wait() for long waits: If the task has nothing else to do until data arrives (approval gates, human-in-the-loop), use .wait() to free compute resources. Use .once() only for short waits or when doing concurrent work.
  2. Listeners auto-cleanup: .on() handlers are automatically cleaned up when the task run completes. Call .off() only if you need to stop listening early.
  3. Use timeouts: Both .wait() and .once() support timeouts — always set one to avoid indefinite hangs.
  4. Use idempotency keys with .wait(): If your task has retries enabled, pass an idempotencyKey to .wait() so retries resume the same wait instead of creating a new one.
  5. Define streams in shared files: Keep your streams.input() definitions in a shared location (like trigger/streams.ts) so both task code and backend/frontend can import them with full type safety.
  6. Combine with output streams: Input streams pair naturally with output streams for full bidirectional communication — stream AI output to the frontend while accepting cancel signals from it.
  7. Use descriptive stream IDs: Choose clear IDs like "cancel", "approval", or "user-response" instead of generic names.