Problem:
Cancelling messages under flow control does not take effect immediately, especially with the rate option. In the current implementation, rate-limited messages are scheduled to fire at a future time. Because of this, a cancel is only recorded as CANCEL_REQUESTED for all these messages, and they are logged as CANCELLED only when their scheduled time arrives.
Another big problem: when you want to fire a new message with flow control, all the future slots are already taken by soon-to-be-cancelled messages. This makes cancel effectively unusable.
Solution:
Flow control is rewritten to take the CANCEL into account sooner, and new messages no longer need to wait for old slots to open.
Details:
With the new implementation, messages are not scheduled into the future; instead, they are put on an ordered wait list and fired when the flow control limit allows. When a flow control is cancelled, all in-progress messages are again marked as CANCEL_REQUESTED.
New messages are immediately put on the wait list. When a flow control slot opens and the wait list is consumed, all CANCEL_REQUESTED messages are CANCELLED immediately, which also allows new messages on the wait list to fire right away.
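The consume step can be sketched with a small simulation (hypothetical names and types, not the actual QStash server code): when a slot opens, CANCEL_REQUESTED entries at the head of the wait list are finalized immediately without consuming the slot, and the first active message fires.

```typescript
// Hypothetical sketch of the new wait-list consumption; not the actual server code.
type State = "WAITING" | "CANCEL_REQUESTED" | "CANCELLED" | "FIRED";
interface Msg { id: string; state: State; }

// Consume one open flow-control slot: cancelled messages are finalized
// immediately and do not occupy the slot; the first active message fires.
function consumeSlot(waitList: Msg[]): Msg | undefined {
  while (waitList.length > 0) {
    const msg = waitList.shift()!;
    if (msg.state === "CANCEL_REQUESTED") {
      msg.state = "CANCELLED"; // logged as CANCELLED right away
      continue;                // the slot stays open for the next message
    }
    msg.state = "FIRED";
    return msg;
  }
  return undefined;
}

const waitList: Msg[] = [
  { id: "old-1", state: "CANCEL_REQUESTED" },
  { id: "old-2", state: "CANCEL_REQUESTED" },
  { id: "new-1", state: "WAITING" }, // no longer blocked behind cancelled slots
];
const fired = consumeSlot(waitList);
console.log(fired?.id); // "new-1"
```

The key design point is that cancelled messages never hold a slot: cancellation cost is paid at consume time, not at the originally scheduled fire time.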
QStash messages may contain sensitive data in the body or headers. Users need a way to redact these fields so they are not visible in the dashboard or API responses, preventing accidental data leaks. This also enables safer use of QStash in multi-tenant setups where customer data may be included in messages.
Allow specifying fields to redact when publishing a message.
import { Client } from "@upstash/qstash";
const client = new Client({ token: "" });
const res = await client.publishJSON({
url: "https://my-api...",
body: { hello: "world" },
redact: {
body: true,
header: ["Authorization"] // or header: true to redact all headers
},
});
In this example, the message body and the "Authorization" header are redacted and will not appear in the UI or API responses. Instead, the UI will display REDACTED: so users can verify the value without revealing the original data.
Workflow logs may contain sensitive data in the body or headers. Users need a way to redact these fields so they are not visible in the dashboard or API responses, preventing accidental data leaks. This also enables safer use of Workflow in multi-tenant setups where customer data may be included in logs.
Allow specifying fields to redact when triggering a workflow.
import { Client } from "@upstash/workflow";
const client = new Client({ token: "" });
const { workflowRunId } = await client.trigger({
url: "https://my-app.com/api/workflow",
body: { hello: "world" },
redact: {
body: true,
header: ["Authorization"] // or header: true to redact all headers
},
});
In this example, the log body and the "Authorization" header are redacted and will not appear in the UI or API responses. Instead, the UI will display REDACTED: so users can verify the value without revealing the original data.
TypeScript SDK (workflow-js):
Added optional workflowRunId parameter to notify method, enabling lookback functionality. When provided, notifications are stored and delivered even if sent before a workflow reaches waitForEvent, preventing race conditions. See notify documentation and wait-for-event guide for details.
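The lookback behavior can be illustrated with a small in-memory sketch (hypothetical model, not the SDK internals): a notification that arrives before the run reaches waitForEvent is buffered under its workflowRunId and delivered as soon as the waiter registers, so the notify/wait ordering no longer races.

```typescript
// Hypothetical sketch of notify-with-lookback; not the actual workflow-js internals.
class EventHub {
  private pending = new Map<string, unknown[]>();          // workflowRunId -> buffered payloads
  private waiters = new Map<string, (data: unknown) => void>();

  // If no waiter is registered yet, buffer the payload instead
  // of dropping it (the "lookback").
  notify(workflowRunId: string, data: unknown): void {
    const waiter = this.waiters.get(workflowRunId);
    if (waiter) {
      waiter(data);
      this.waiters.delete(workflowRunId);
      return;
    }
    const buf = this.pending.get(workflowRunId) ?? [];
    buf.push(data);
    this.pending.set(workflowRunId, buf);
  }

  // Resolves immediately if a notification was buffered earlier.
  waitForEvent(workflowRunId: string): Promise<unknown> {
    const buf = this.pending.get(workflowRunId);
    if (buf && buf.length > 0) {
      return Promise.resolve(buf.shift());
    }
    return new Promise((resolve) => this.waiters.set(workflowRunId, resolve));
  }
}

const hub = new EventHub();
hub.notify("run_123", { ok: true });        // arrives *before* the waiter exists
hub.waitForEvent("run_123").then((data) => {
  console.log("no race:", data);            // resolves with the buffered payload
});
```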
Right now, flow control can only be configured at publish time. This makes it hard for users to change the configuration from outside when needed.
We will add the ability to pin a rate/parallelism to a flow-control key. The pinned values will then be used instead of the values passed with the messages.
We will also provide an API to unpin these values.
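The precedence rule can be sketched as follows (hypothetical types and function names; the final API shape is not settled): a value pinned on the flow-control key wins over whatever the message carries, and unpinning falls back to the per-message values.

```typescript
// Hypothetical sketch of pinned-value resolution; the real API shape is not final.
interface FlowControlConfig { rate?: number; parallelism?: number; }

const pinned = new Map<string, FlowControlConfig>(); // flow-control key -> pinned config

function pin(key: string, config: FlowControlConfig): void {
  pinned.set(key, config);
}

function unpin(key: string): void {
  pinned.delete(key);
}

// Effective config: pinned values override the values passed with the message.
function effectiveConfig(key: string, fromMessage: FlowControlConfig): FlowControlConfig {
  const p = pinned.get(key);
  return {
    rate: p?.rate ?? fromMessage.rate,
    parallelism: p?.parallelism ?? fromMessage.parallelism,
  };
}

pin("my-key", { rate: 5 });
console.log(effectiveConfig("my-key", { rate: 100, parallelism: 3 }));
// rate 5 comes from the pin, parallelism 3 from the message
unpin("my-key");
console.log(effectiveConfig("my-key", { rate: 100, parallelism: 3 }));
// back to the message-supplied values
```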
This is to achieve much lower latencies when sending a message, by accepting the request at the closest region.
Currently it is hard for our users to understand what is going on with Flow-Control/Global Parallelism:
Did it hit the limit?
How many messages are waiting because of the limit?
What is the effective Flow-Control Rate/Parallelism/Period?
We are revising the Flow-Control implementation. As a start, the plan is to improve the GET flow-control endpoint to help users more:
GET /v2/flow-control
Current version:
{
"flowControlKey": "",
"waitlistSize": 123,
}
Improved version:
{
"flowControlKey": "", // "$" for the messages without flow control.
"waitlistSize": 123,
"parallelism": 10, // If not exists, it means not limited by parallelism
"parallelismCount": 5, // 5 tasks are running in parallel at the moment
"rateMax": 10, // If not exists, it means not limited by rate
"rateCount": 4, // 4 tasks are started within the last period
"ratePeriod": "1d",
"ratePeriodStart": 1708000000 // Unix timestamp when the current rate period started
}
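Assuming the field semantics above, a client can derive remaining capacity from the response. The sketch below uses the draft field names as-is; the derived-metric names (`remainingCapacity`, `byParallelism`, `byRate`) are illustrative, not part of the API.

```typescript
// Sketch: deriving remaining capacity from the (draft) flow-control response fields.
interface FlowControlInfo {
  flowControlKey: string;
  waitlistSize: number;
  parallelism?: number;        // absent => not limited by parallelism
  parallelismCount?: number;   // tasks currently running in parallel
  rateMax?: number;            // absent => not limited by rate
  rateCount?: number;          // tasks started within the current period
  ratePeriod?: string;
  ratePeriodStart?: number;    // Unix timestamp when the current period started
}

// How many more tasks can start right now under each limit (Infinity = unlimited).
function remainingCapacity(info: FlowControlInfo) {
  const byParallelism =
    info.parallelism === undefined
      ? Infinity
      : info.parallelism - (info.parallelismCount ?? 0);
  const byRate =
    info.rateMax === undefined ? Infinity : info.rateMax - (info.rateCount ?? 0);
  return { byParallelism, byRate, effective: Math.min(byParallelism, byRate) };
}

const info: FlowControlInfo = {
  flowControlKey: "my-key",
  waitlistSize: 123,
  parallelism: 10, parallelismCount: 5,
  rateMax: 10, rateCount: 4,
  ratePeriod: "1d", ratePeriodStart: 1708000000,
};
console.log(remainingCapacity(info)); // { byParallelism: 5, byRate: 6, effective: 5 }
```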
To access new fields via SDK -> https://upstash.com/docs/qstash/features/flowcontrol#get-a-single-flow-control-key
There is a special $ entry on this list that reports the wait list size for messages that don't have any flow control key and are just waiting on global parallelism.
To access global parallelism info via SDK -> https://upstash.com/docs/qstash/features/flowcontrol#get-global-parallelism
You can access all of this via the REST API as well: https://upstash.com/docs/qstash/api-refence/flow-control/list-flow-control-keys
The Flow Control tab of the QStash Console also shows all this data together with a historic global parallelism graph. A more detailed version of the global parallelism graph is available on the Usage tab of the QStash Console.
TypeScript SDK (qstash-js) and Python SDK (qstash-py):
Added flow control management API: list(), get(key), and reset(key) methods on client.flowControl (TypeScript) and client.flow_control (Python). The get and list responses now include rich metrics: waitListSize, parallelismMax, parallelismCount, rateMax, rateCount, ratePeriod, and ratePeriodStart. See the Flow Control docs for code examples.
QStash Server:
GET /v2/flowControl now supports an optional search query parameter to filter keys by name. New POST /v2/flowControl/:flowControlKey/reset endpoint resets parallelism and rate counters for a key. GET /v2/globalParallelism now returns { parallelismMax, parallelismCount } instead of the old { waitListSize } shape.
This major version of the TypeScript SDK introduces breaking changes to improve the developer experience, reduce bundle size, and simplify configuration. We have prepared a migration guide for existing users below:
Added Redis Functions support. New commands are:
FCALL: Call a function with read/write capabilities
FCALL_RO: Call a function in read-only mode
FUNCTION DELETE, FUNCTION FLUSH, FUNCTION KILL, FUNCTION LIST, FUNCTION LOAD and FUNCTION STATS
New Hash commands:
HGETDEL: Get and delete hash fields atomically
HGETEX: Get hash fields with expiration support
HSETEX: Set hash fields with expiration support
New Stream Commands:
XDELEX: Extended delete for streams
XACKDEL: Acknowledge and delete stream entries
New Bit operations added:
BITOP DIFF: A bit is set if it's set in the first key but not in any of the other keys
BITOP DIFF1: A bit is set if it's set in one or more of the other keys but not in the first key
BITOP ANDOR: A bit is set if it's set in the first key and also in one or more of the other keys
BITOP ONE: A bit is set if it's set in exactly one source key
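Per my reading of the Redis 8 semantics, the four variants can be expressed bit by bit, with x taken from the first key and ys from the remaining keys (sketch only, operating on single bit positions rather than whole bitmaps):

```typescript
// Sketch of the new BITOP variants, applied to a single bit position.
// x: the bit from the first key; ys: the bits from the other keys.
const diff  = (x: boolean, ys: boolean[]) => x && !ys.some(Boolean); // in X, in none of Y
const diff1 = (x: boolean, ys: boolean[]) => !x && ys.some(Boolean); // in some Y, not in X
const andor = (x: boolean, ys: boolean[]) => x && ys.some(Boolean);  // in X and at least one Y
const one   = (bits: boolean[]) => bits.filter(Boolean).length === 1; // in exactly one key

console.log(diff(true, [false, false])); // true
console.log(diff1(false, [true]));       // true
console.log(andor(true, [true, false])); // true
console.log(one([true, false, false]));  // true
```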
Added HASH expiration support. New commands are:
HEXPIRE: Set expiration time in seconds
HPEXPIRE: Set expiration time in milliseconds
HEXPIREAT: Set expiration time as Unix timestamp in seconds
HPEXPIREAT: Set expiration time as Unix timestamp in milliseconds
HTTL: Get remaining time to live in seconds
HPTTL: Get remaining time to live in milliseconds
HEXPIRETIME: Get absolute expiration time as Unix timestamp in seconds
HPEXPIRETIME: Get absolute expiration time as Unix timestamp in milliseconds
HPERSIST: Remove expiration from hash fields
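A minimal in-memory model of field-level TTL makes the command family concrete. This is a sketch of the semantics only (including the Redis -1/-2 return conventions for HTTL), not the Upstash implementation or SDK API:

```typescript
// Sketch: in-memory model of hash-field expiration (HEXPIRE / HTTL / HPERSIST semantics).
interface Field { value: string; expireAt?: number; } // expireAt: Unix ms; absent = no TTL

const hash = new Map<string, Field>();

// HEXPIRE-like: set a relative TTL in seconds on an existing field.
function hexpire(field: string, seconds: number, now = Date.now()): boolean {
  const f = hash.get(field);
  if (!f) return false;
  f.expireAt = now + seconds * 1000; // HPEXPIREAT would set an absolute ms timestamp instead
  return true;
}

// HTTL-like: -2 if the field does not exist, -1 if it has no TTL (Redis convention).
function httl(field: string, now = Date.now()): number {
  const f = hash.get(field);
  if (!f) return -2;
  if (f.expireAt === undefined) return -1;
  return Math.ceil((f.expireAt - now) / 1000);
}

// HPERSIST-like: remove the TTL so the field lives until deleted.
function hpersist(field: string): boolean {
  const f = hash.get(field);
  if (!f || f.expireAt === undefined) return false;
  delete f.expireAt;
  return true;
}

hash.set("session", { value: "abc" });
hexpire("session", 60);
console.log(httl("session") > 0); // true: the TTL is counting down
hpersist("session");
console.log(httl("session"));     // -1: TTL removed, field persists
```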
We currently provide a wait–notify mechanism that allows a workflow run to pause on an event ID and resume when a notify call is received. This works well when developers control both sides of the communication and can send the notification directly.
However, when a workflow needs to wait for a notification from a third-party provider, developers must register a waiter for a specific event ID, expose a custom endpoint to receive the third-party webhook, and then manually call our notify API with the event ID to resume the workflow. This process adds overhead and can lead to race conditions.
To improve the developer experience for webhook-based integrations and eliminate race conditions, we're introducing a new native webhook wait API. This API allows a workflow run to pause execution until an auto-generated webhook URL receives a notification. The generated URL can be passed directly to third-party providers, which will automatically notify the corresponding workflow run.
Each webhook can receive multiple notifications and be awaited multiple times.
Below is an early draft of the API (subject to change before release):
import { serve } from "@upstash/workflow/nextjs";
import { fal } from "@fal-ai/client";
export const { POST } = serve( async (context) => {
// 👇 Create a webhook to receive events
const webhook = await context.createWebhook("fal-generation-webhook");
await context.run("start-generation", async () => {
const { request_id } = await fal.queue.submit("fal-ai/flux/dev", {
input: {
prompt: "a cat",
seed: 6252023,
image_size: "landscape_4_3",
num_images: 4,
},
// 👇 Pass webhook.url to third-party service
webhookUrl: webhook.url,
});
return request_id;
});
// 👇 Wait for webhook to be called
const result = await context.waitForWebhook("wait-until-generation-completes", webhook, "1d");
await context.run("send-generation-to-user", async () => {
// 👇 result.timeout OR result.request
const req = result.request!;
// 👇 Use the native Request object of the webhook call
const data = await req.json();
console.log("Generation completed:", data.status);
});
} );
Added an option to upsert and query raw data using the Upstash embedding service.
Implemented metadata filtering with SQL-like syntax. See Metadata Filtering.
Remember the last X triggers in the console so that they can be run again easily.
This is aimed at development use.
Remember the last X publishes in the console so that they can be run again easily.
This is aimed at development use.
Add the ability to list the flow-controls on the console together with their waitListSizes.
Make it searchable.
Add the ability to restart or resume a workflow from the DLQ.
Restart: Ignore already-completed steps and start the workflow from the beginning with the original body and headers.
Resume: Keep the completed steps and run the workflow starting from that point on.
See https://upstash.com/docs/workflow/features/dlq#recovery-actions
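The difference between the two recovery actions can be sketched with a small model (hypothetical types, not the actual DLQ implementation): restart discards step progress while keeping the original request, whereas resume re-runs only the steps that never finished.

```typescript
// Hypothetical sketch of DLQ restart vs. resume semantics.
interface FailedRun {
  body: string;
  headers: Record<string, string>;
  completedSteps: string[]; // names of steps that finished before the failure
}

// Restart: discard step progress; re-run from the beginning
// with the original body and headers.
function restart(run: FailedRun): FailedRun {
  return { ...run, completedSteps: [] };
}

// Resume: keep completed steps; execution continues with the unfinished steps.
function resume(run: FailedRun, allSteps: string[]): string[] {
  return allSteps.filter((s) => !run.completedSteps.includes(s));
}

const failed: FailedRun = {
  body: '{"hello":"world"}',
  headers: { "content-type": "application/json" },
  completedSteps: ["charge-card", "reserve-stock"],
};
console.log(restart(failed).completedSteps);                            // []
console.log(resume(failed, ["charge-card", "reserve-stock", "ship"])); // [ 'ship' ]
```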