releases.shpreview

Input streams: send data into running tasks

v4.4.2

Bidirectional task communication — send typed data into running tasks from your backend or frontend, with four receiving patterns from suspending (frees compute) to non-blocking.

Requires SDK v4.4.2+: npm install @trigger.dev/sdk@latest @trigger.dev/react-hooks@latest

Four receiving patterns

.on(): listen in the background

Fires your handler every time data arrives. The task keeps running. Useful for cancel signals and agent steering.

cancelSignal.on((data) => {
  console.log("Cancelled:", data.reason);
  controller.abort();
});

.wait(): suspend until data arrives

The task process is freed entirely — no compute cost while you wait. Resumes at the exact stopping point when data arrives, like a typed waitpoint.

const result = await approval.wait({ timeout: "7d" });
if (result.ok && result.output.approved) {
  await publish(draft);
}

.once(): block for the next message

Blocks for the next message while keeping the process alive.

const data = await approval.once({ timeoutMs: 300_000 }).unwrap();

.peek(): check without waiting

Non-blocking. Returns the latest buffered value, or undefined if nothing has arrived yet.

const latest = cancelSignal.peek();
if (latest) {
  // Someone already sent a cancel before we checked
}

Send from your backend

await userMessage.send(runId, {
  type: "follow-up",
  text: "Focus on competitor pricing",
});

Send from your frontend

Use the useInputStreamSend React hook with full type safety.

"use client";
import { useInputStreamSend } from "@trigger.dev/react-hooks";
import { userMessage } from "@/trigger/streams";

export function ResearchChat({ runId, accessToken }: {
  runId: string;
  accessToken: string;
}) {
  const { send, isReady } = useInputStreamSend(
    userMessage.id, runId, { accessToken }
  );

  return (
    <div>
      <button disabled={!isReady}
        onClick={() => send({ type: "follow-up", text: "Go deeper on the pricing model" })}>
        Go deeper
      </button>
      <button disabled={!isReady}
        onClick={() => send({ type: "wrap-up", text: "That's enough, compile the report" })}>
        Wrap up
      </button>
    </div>
  );
}

Get started

Define a typed stream:

// trigger/streams.ts
import { streams } from "@trigger.dev/sdk";

export const userMessage = streams.input<{
  type: "instruction" | "follow-up" | "wrap-up";
  text: string;
}>({ id: "user-message" });

Listen in your task:

// trigger/research.ts
import { task } from "@trigger.dev/sdk";
import { userMessage } from "./streams";

export const researchAgent = task({
  id: "research-agent",
  run: async (payload: { topic: string }) => {
    const messages: string[] = [];
    let shouldWrapUp = false;

    userMessage.on((data) => {
      if (data.type === "wrap-up") {
        shouldWrapUp = true;
      }
      messages.push(data.text);
    });

    let context = payload.topic;
    while (!shouldWrapUp) {
      const result = await generateResearch(context);
      await saveFindings(result);
      if (messages.length > 0) {
        context = `${context}\n\nUser feedback:\n${messages.splice(0).join("\n")}`;
      }
    }
    return await compileFinalReport(context);
  },
});

Fetched June 3, 2026