Bidirectional task communication: send typed data into running tasks from your backend or frontend. Four receiving patterns cover the range from suspending the task (which frees compute) to non-blocking checks.
Requires SDK v4.4.2+: npm install @trigger.dev/sdk@latest @trigger.dev/react-hooks@latest
Four receiving patterns
.on(): listen in the background
Fires your handler every time data arrives. The task keeps running. Useful for cancel signals and agent steering.
cancelSignal.on((data) => {
  console.log("Cancelled:", data.reason);
  controller.abort();
});
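The handler above calls controller.abort() without showing where controller comes from. The following is a minimal sketch, assuming it is a standard AbortController whose signal is handed to the in-flight work. The names makeCancelHandler, CancelData, abortController and cancelLog are hypothetical, and the payload shape is inferred only from the data.reason access in the snippet.

```typescript
// Hypothetical payload shape, inferred from `data.reason` in the snippet above.
type CancelData = { reason: string };

// Builds a handler that could be passed to cancelSignal.on(...).
// It records the reason and aborts whatever work is tied to the controller's signal.
function makeCancelHandler(controller: AbortController, log: string[]) {
  return (data: CancelData): void => {
    log.push(`Cancelled: ${data.reason}`);
    controller.abort();
  };
}

const abortController = new AbortController();
const cancelLog: string[] = [];
const onCancel = makeCancelHandler(abortController, cancelLog);

// Work that honours cancellation would check `abortController.signal.aborted`
// or pass `abortController.signal` to fetch() and similar APIs.
onCancel({ reason: "user clicked stop" });
```

Because the task keeps running while the listener is registered, long-running steps only stop early if they actually observe the signal.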
.wait(): suspend until data arrives
The task process is freed entirely, so there is no compute cost while you wait. When data arrives, the task resumes at the exact point where it stopped, like a typed waitpoint.
const result = await approval.wait({ timeout: "7d" });
if (result.ok && result.output.approved) {
  await publish(draft);
}
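The snippet above only handles the approved case. As a rough sketch of handling every outcome, the code below models the result as a discriminated union. Only the ok and output fields appear in the text; the error field on the failure branch, and the names WaitResult, Approval, Decision and decide, are assumptions made for illustration and may not match the SDK's actual types.

```typescript
// Assumed result shape: only `ok` and `output` appear in the snippet above.
// The `error` field on the failure branch is a hypothetical placeholder.
type WaitResult<T> =
  | { ok: true; output: T }
  | { ok: false; error: Error };

type Approval = { approved: boolean };

type Decision = "publish" | "rejected" | "timed-out-or-failed";

function decide(result: WaitResult<Approval>): Decision {
  if (!result.ok) {
    // No usable data arrived (for example the timeout elapsed), so do not publish.
    return "timed-out-or-failed";
  }
  // Here TypeScript has narrowed `result` to the success branch.
  return result.output.approved ? "publish" : "rejected";
}

const approvedDecision = decide({ ok: true, output: { approved: true } });
const rejectedDecision = decide({ ok: true, output: { approved: false } });
const failedDecision = decide({ ok: false, error: new Error("timeout") });
```

Separating "rejected" from "timed out" makes it easier to decide whether to notify the reviewer again or to drop the draft.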
.once(): block for the next message
Blocks for the next message while keeping the process alive.
const data = await approval.once({ timeoutMs: 300_000 }).unwrap();
.peek(): check without waiting
Non-blocking. Returns the latest buffered value, or undefined if nothing has arrived yet.
const latest = cancelSignal.peek();
if (latest) {
  // Someone already sent a cancel before we checked
}
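To make the "latest buffered value, or undefined" contract concrete, here is a toy in-memory model. It is not the SDK's implementation; LatestValueBuffer and its methods are hypothetical. The text does not say whether peeking consumes the value, so this model assumes the value stays in place.

```typescript
// Toy in-memory model of "latest buffered value" semantics.
// This is NOT the SDK's implementation, only an illustration of the contract.
class LatestValueBuffer<T> {
  private latest: T | undefined = undefined;

  // Called when a message arrives; keeps only the most recent one.
  push(value: T): void {
    this.latest = value;
  }

  // Non-blocking read: returns immediately and never waits for data.
  // Assumption: reading does not remove the value.
  peek(): T | undefined {
    return this.latest;
  }
}

const cancelBuffer = new LatestValueBuffer<{ reason: string }>();
const beforeAny = cancelBuffer.peek();
cancelBuffer.push({ reason: "first" });
cancelBuffer.push({ reason: "second" });
const afterTwo = cancelBuffer.peek();
```

The truthiness check `if (latest)` is safe for object payloads like the one above. For a stream that carries primitives such as 0 or an empty string, comparing against undefined avoids treating a real message as "nothing arrived".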
Send from your backend
await userMessage.send(runId, {
  type: "follow-up",
  text: "Focus on competitor pricing",
});
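Type safety on send is a compile-time check, so data that reaches your backend from outside (for example a parsed request body) may still need a runtime check before it is forwarded. The sketch below assumes the message shape of the userMessage stream defined in this document. The helper isUserMessage and the MESSAGE_TYPES list are hypothetical and not part of the SDK.

```typescript
// Mirrors the message shape of the `userMessage` stream in this document.
type UserMessage = {
  type: "instruction" | "follow-up" | "wrap-up";
  text: string;
};

const MESSAGE_TYPES: string[] = ["instruction", "follow-up", "wrap-up"];

// Hypothetical helper: narrows untrusted input to UserMessage
// before it is handed to userMessage.send(runId, ...).
function isUserMessage(value: unknown): value is UserMessage {
  if (typeof value !== "object" || value === null) return false;
  const candidate = value as Record<string, unknown>;
  const kind = candidate["type"];
  const text = candidate["text"];
  return (
    typeof kind === "string" &&
    MESSAGE_TYPES.indexOf(kind) !== -1 &&
    typeof text === "string"
  );
}

const validMessage = isUserMessage({ type: "follow-up", text: "Focus on competitor pricing" });
const unknownType = isUserMessage({ type: "shout", text: "hello" });
const missingText = isUserMessage({ type: "wrap-up" });
```

A route handler could reject the request when the guard returns false and call send only with values that passed it.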
Send from your frontend
Use the useInputStreamSend React hook with full type safety.
"use client";
import { useInputStreamSend } from "@trigger.dev/react-hooks";
import { userMessage } from "@/trigger/streams";
export function ResearchChat({ runId, accessToken }: {
  runId: string;
  accessToken: string;
}) {
  const { send, isReady } = useInputStreamSend(
    userMessage.id, runId, { accessToken }
  );
  return (
    <div>
      <button disabled={!isReady}
        onClick={() => send({ type: "follow-up", text: "Go deeper on the pricing model" })}>
        Go deeper
      </button>
      <button disabled={!isReady}
        onClick={() => send({ type: "wrap-up", text: "That's enough, compile the report" })}>
        Wrap up
      </button>
    </div>
  );
}
Get started
Define a typed stream:
// trigger/streams.ts
import { streams } from "@trigger.dev/sdk";
export const userMessage = streams.input<{
  type: "instruction" | "follow-up" | "wrap-up";
  text: string;
}>({ id: "user-message" });
Listen in your task:
// trigger/research.ts
import { task } from "@trigger.dev/sdk";
import { userMessage } from "./streams";
// generateResearch, saveFindings and compileFinalReport are app-specific
// helpers that are not shown here.
export const researchAgent = task({
  id: "research-agent",
  run: async (payload: { topic: string }) => {
    const messages: string[] = [];
    let shouldWrapUp = false;
    // Runs in the background: queue every message and note a wrap-up request.
    userMessage.on((data) => {
      if (data.type === "wrap-up") {
        shouldWrapUp = true;
      }
      messages.push(data.text);
    });
    let context = payload.topic;
    while (!shouldWrapUp) {
      const result = await generateResearch(context);
      await saveFindings(result);
      // Drain the queue and fold any feedback into the next iteration's context.
      if (messages.length > 0) {
        context = `${context}\n\nUser feedback:\n${messages.splice(0).join("\n")}`;
      }
    }
    return await compileFinalReport(context);
  },
});
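The loop above relies on messages.splice(0) to read and empty the queue in one step. The sketch below pulls that step out into a standalone function so the behaviour can be seen in isolation. The name foldFeedback and the sample strings are illustrative only.

```typescript
// Hypothetical helper extracted from the loop body above.
// Array.prototype.splice(0) removes every element and returns the removed items,
// so the queue is emptied in the same step that reads it.
function foldFeedback(context: string, queue: string[]): string {
  if (queue.length === 0) return context;
  const drained = queue.splice(0);
  return `${context}\n\nUser feedback:\n${drained.join("\n")}`;
}

const pendingFeedback = [
  "Go deeper on the pricing model",
  "That's enough, compile the report",
];
const updatedContext = foldFeedback("SaaS pricing", pendingFeedback);

// The queue is now empty, so a second call leaves the context untouched.
const unchangedContext = foldFeedback(updatedContext, pendingFeedback);
```

Draining in place means each piece of feedback is folded into the context once, even if several messages arrive during a single research iteration.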
Fetched June 3, 2026


