Problem:
Cancelling a message the Flow-control does not take affect immediately. Especially with the rate option. In the current implementation, with the rate, The messages are scheduled to fire in the future. Because of this implementation, the cancel is recorded CANCEL_REQUESTED for all these messages and only logged as CANCELLED when the schedule time hits.
Another big problem is that, when you want to fire a new message with the flow control, all the slots in the future is already taken by -soon to be cancelled- messages. This makes cancel effectively unusable.
Solution:
The flow control is rewritten to take the CANCEL into account sooner. And new messages does not need to wait for old slots to be opened.
Details:
With the new implementation, the messages are not scheduled to the future but put to a ordered wait list to be fired when the flow control limit allows. When a flow control is cancelled. Again all the messages in progress are marked as CANCEL REQUESTED.
The new messages are immediately put to wait list. When a flow control limit is opened and waitlist needs to be consumed, the CANCEL REQUESTED messages are all CANCELLED immediately. This allows, new messages in the wait list are fired immediately as well.
QStash messages may contain sensitive data in the body or headers. Users need a way to redact these fields so they are not visible in the dashboard or API responses, preventing accidental data leaks. This also enables safer use of QStash in multi-tenant setups where customer data may be included in messages.
Allow specifying fields to redact when publishing a message.
" });
const res = await client.publishJSON({
url: "https://my-api...",
body: { hello: "world" },
redact: {
body: true,
header: ["Authorization"] // or header: true to redact all headers
},
});">import { Client } from "@upstash/qstash";
const client = new Client({ token: "" });
const res = await client.publishJSON({
url: "https://my-api...",
body: { hello: "world" },
redact: {
body: true,
header: ["Authorization"] // or header: true to redact all headers
},
});
In this example, the message body and the "Authorization" header are redacted and will not appear in the UI or API responses. Instead, the UI will display REDACTED: so users can verify the value without revealing the original data.
Right now, the flow control is only configured with the publish. This makes it hard for our users to change the config when needed from outside.
We will give ability to pin a rate/parallelism to a flow-control. This way the pinned values will be used instead of the values passed via the messages.
We will also have an API to unpin the messages.
Add ability to pause and resume flow control keys.
This is to give much-lower latencies when sending a message via accepting the request from the closest region.
Currently it is hard for our users to understand what is going on with Flow-Control/Global Parallellism.
Did it hit the limit?
How many messages are waiting because of the limit?
What is the affective Flow-Control Rate/Parallelism/Period ?
We are revising the Flow-Control implementation. The plan is to improve the get flow-control endpoint as a start to help users more
GET /v2/flow-control
Current version:
", "waitlistSize": 123, }">
{
"flowControlKey": "",
"waitlistSize": 123,
}
Improved version:
", // "$" for the messages without flow control. "waitlistSize": 123, "parallelism": 10, // If not exists, it means not limited by parallelism "parallelismCount": 5, // 5 tasks are running in parallel at the moment "rateMax": 10, // If not exists, it means not limited by rate "rateCount" : 4, // 4 tasks are started within the last period "ratePeriod": "1d", "ratePeriodStart": 1708000000 // Unix timestamp when the current rate period started }">
{
"flowControlKey": "", // "$" for the messages without flow control.
"waitlistSize": 123,
"parallelism": 10, // If not exists, it means not limited by parallelism
"parallelismCount": 5, // 5 tasks are running in parallel at the moment
"rateMax": 10, // If not exists, it means not limited by rate
"rateCount" : 4, // 4 tasks are started within the last period
"ratePeriod": "1d",
"ratePeriodStart": 1708000000 // Unix timestamp when the current rate period started
}
To access new fields via SDK -> https://upstash.com/docs/qstash/features/flowcontrol#get-a-single-flow-control-key
There is a special $ wait list on this list that reports the waitlist size for the messages that doesn't have any related flow control key and just waiting for global parallelism.
To access global parallelism info via SDK -> https://upstash.com/docs/qstash/features/flowcontrol#get-global-parallelism
You can access all these via rest api as well here: https://upstash.com/docs/qstash/api-refence/flow-control/list-flow-control-keys
Flow Control tab of QStash Console also shows all these data together with the historic global parallelism graph. And there is more detailed version of global parallelism graph on the Usage tab of the QStash Console.
Remember last X publishes from the console so that it can be run again easily.
This is aimed for development purposes.
Add ability to list the flow-controls on the console together with their waitListSizes.
Make it searchable.
QStash requires a publicly available API to send messages to. During development when applications are not yet deployed, developers typically need to expose their local API by creating a public tunnel. While local tunneling works seamlessly, it requires code changes between development and production environments and increase friction for developers. To simplify the development process, Upstash provides QStash CLI, which allows you to run a development server locally for testing and development.
With this work, the same UI that is given on the console.upstash.com can be used with local server(QStash CLI)