Hello, I have thought of an approach that might both fix starvation and
reduce complexity, while increasing maintainability.
The main issue that causes starvation is that we schedule task instances
which /cannot/ be run, and filtering them dynamically in SQL is not
really possible without either significantly hindering performance or
introducing query and maintainability complexity, just like the stored
SQL procedures do. Although they do solve the issue, they create a new
one: we end up with 3 implementations, one for each SQL dialect, which in
my opinion is better avoided.
The proposed solution is that in the dag run's 'update_state' function
(which is part of a critical section) we only create task instances which
can be set to running, using the newly added fields, and we move the
heavy part away from task selection in the '_executable_tis_to_queued'
method, where we will only have to run a single query, with no loop, and
only check executor slots.
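To illustrate, here is a rough sketch (not the actual implementation) of
what the task-fetching side could reduce to under this proposal: a single
query bounded only by free executor slots, since every 'scheduled' TI
would already be known to satisfy its concurrency limits. The function
name mirrors '_executable_tis_to_queued' above; the query itself is
illustrative SQLAlchemy.
-------------------------
from airflow.models import TaskInstance

def _executable_tis_to_queued(session, free_executor_slots):
    # One query, no Python-side loop: every 'scheduled' TI is already
    # known to fit its pool / dag run / task concurrency limits.
    return (
        session.query(TaskInstance)
        .filter(TaskInstance.state == "scheduled")
        .order_by(TaskInstance.priority_weight.desc())
        .limit(free_executor_slots)
        .all()
    )
-------------------------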
To achieve this, we can use the 'concurrency map' created in SQL in the
pessimistic-task-selection-with-window-functions PR
(https://github.com/apache/airflow/pull/53492)
and use it to get simple and lightweight mappings, which will look like
this (sorted by priority order and DR logical date):
-------------------------
{dag_id} | {dagrun_id} | {task_id} | currently running mapped TIs (if any) |
total running TIs per dag run | total TIs per task run | pool_slots
-------------------------
And a second key-value mapping for pools:
{pool_id} | available_pool_slots
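Purely as an illustration of the shape of these mappings (the field names
are my own, not the actual columns produced by the PR, and the 'pool'
field is an assumed addition so each row knows which pool its task is
assigned to):
-------------------------
from dataclasses import dataclass

@dataclass
class ConcurrencyRow:
    dag_id: str
    dagrun_id: str
    task_id: str
    pool: str                    # assumed: pool the task is assigned to
    running_mapped_tis: int      # currently running mapped TIs for this task (if any)
    running_tis_in_dagrun: int   # total running TIs in this dag run
    running_tis_for_task: int    # total running TIs for this task in this run
    pool_slots: int              # slots one TI of this task occupies

# Second mapping: pool -> available slots, e.g.:
available_pool_slots = {"default_pool": 128}
-------------------------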
Now that these fields are sorted, we can select the top N dag runs to
have their state updated, following the order of the sorted table above
and the per-loop limit on how many dag runs to examine, and create task
instances for those dag runs only. We create as many task instances as we
can, by priority, as long as the concurrency limits are preserved. This
keeps the current behaviour, examining higher-priority operators first
and then moving to lower-priority ones. We can also improve it by
increasing the limit of dag runs to examine, or by introducing a
'last_scheduling_decision' sort as well, so that we update the state of
all dag runs in a round-robin fashion rather than always taking the most
prioritized ones first; however, that does break the current priority
behaviour a little.
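As a sketch of that greedy creation step (illustrative only, building on
the ConcurrencyRow and pool mapping shapes above; 'max_dagruns_per_loop'
stands for the existing per-loop dag run limit, and the two 'max_tis_*'
parameters stand for the usual concurrency limits):
-------------------------
from collections import Counter

def create_runnable_tis(rows, available_pool_slots, max_dagruns_per_loop,
                        max_tis_per_dagrun, max_tis_per_task):
    examined = []                # dag runs examined this loop, in priority order
    created = []                 # (dag_id, dagrun_id, task_id) of TIs to create
    new_per_dagrun = Counter()   # TIs created this loop, per dag run
    new_per_task = Counter()     # TIs created this loop, per (dag run, task)
    for row in rows:             # rows already sorted by priority, then logical date
        if row.dagrun_id not in examined:
            if len(examined) >= max_dagruns_per_loop:
                continue         # respect the per-loop dag run limit
            examined.append(row.dagrun_id)
        key = (row.dagrun_id, row.task_id)
        fits_pool = (row.pool in available_pool_slots
                     and available_pool_slots[row.pool] >= row.pool_slots)
        fits_dagrun = (row.running_tis_in_dagrun
                       + new_per_dagrun[row.dagrun_id] < max_tis_per_dagrun)
        fits_task = (row.running_tis_for_task
                     + new_per_task[key] < max_tis_per_task)
        if fits_pool and fits_dagrun and fits_task:
            created.append((row.dag_id, row.dagrun_id, row.task_id))
            available_pool_slots[row.pool] -= row.pool_slots
            new_per_dagrun[row.dagrun_id] += 1
            new_per_task[key] += 1
    return created
-------------------------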
This means that instead of taking a lock on the pools table during task
fetching, we take the lock on the pools while examining dag runs and
updating their state, and during task fetching we lock only the
'task_instance' rows, meaning schedulers can now set tasks to running in
parallel.
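A minimal sketch of that locking split (illustrative SQLAlchemy, not the
real code): the pool rows are locked around the dag run examination step,
and the fetching query from the earlier sketch would only add
'.with_for_update(skip_locked=True)' on the task_instance rows, so a
second scheduler can pick different rows concurrently.
-------------------------
from airflow.models import Pool

def _examine_dagruns_with_pool_lock(session):
    # Lock the pools here, for the duration of dag run state updates and
    # TI creation, instead of during task fetching.
    pools = session.query(Pool).with_for_update().all()
    ...  # update dag run states and create runnable TIs against 'pools'
-------------------------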
There are a few other possible derivations of the same solution. One is
to look at each dag run individually and send a query per dag run, thus
locking only the pool(s) the current dag run's operators are assigned to,
which allows multiple schedulers to do the same work on different dag
runs as long as they are in different pools.
Another derivation could be to go over all dag runs with no limit, until
we can no longer create task instances that can be set to running
(according to the concurrency limits). This changes the sorting order a
little, with operator priority order coming AFTER dag run logical date
(see the comparison sketched below).
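Illustratively, the difference between the two orderings could be
expressed as follows (assuming each row also carries a 'priority_weight'
and its dag run's 'logical_date'):
-------------------------
# Proposal above: operator priority first, then dag run logical date.
rows.sort(key=lambda r: (-r.priority_weight, r.logical_date))
# This derivation: dag run logical date first, then operator priority.
rows.sort(key=lambda r: (r.logical_date, -r.priority_weight))
-------------------------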
In conclusion, this solution could be a viable option. It also simplifies
the task selection process a lot and can help remove a lot of code, while
making it easier to maintain and using a simpler algorithm. It also
changes the semantics of the 'scheduled' state of the task instance: a
'scheduled' TI can now be sent to the executor immediately, so tasks will
not remain in the 'scheduled' state for long.
This proposed solution is, in my opinion, the simplest and most
maintainable one, though I might be biased, as I have explored multiple
solutions with Asquator and have come to the conclusion that solving it
using SQL is not a viable option. I have yet to open the PR to Airflow
and only have it locally on my computer, since I am making a lot of
experimental commits and a lot of things are changing very quickly; once
I have stabilized things a little, I will open the PR for anyone who's
interested.
I would appreciate any feedback or other ideas/propositions from the
Airflow community, both on this solution and its derivations and on any
alternative solutions.
On 9/24/25 9:11 PM, asquator wrote:
It is a complication, but it seems as though we can't do any better and
remain scalable. In the end we want priorities enforced (maybe not in the
way they're implemented today, but that's part of another talk), and we
don't know in advance how many tasks we'll have to iterate over, so
fetching them into Python is a death sentence in some situations (not
joking, I tried that with fetchmany and chunked streaming, and it was way
too slow).
I actually thought of another optimization here:
Instead of fetching the entire TI relation, we can ignore mapped tasks and only
fetch individual tasks (operators), expanding them on the fly into the maximum
number of TIs that can be created. And yet this approach is not scalable, as
some enormous backfill of a DAG with just 10 tasks will make it fetch MBs of
data every time. It's very slow and loads the DB server with heavy network
requests.
Well, it's not just about throughput, but starvation of tasks that can't run
for hours sometimes, and unfortunately we encounter this in production very
often.
On Wednesday, September 24th, 2025 at 3:10 AM, Matthew
Phillips <[email protected]> wrote:
Hi,
This seems like a significant level of technical complication/debt relative
to even a 1.5x/2x gain (which as noted is only in certain workloads).
Given that Airflow scheduling code in general is not something one could
describe as simple, introducing large amounts of static code that lives
in stored procs seems unwise. If at all possible, making this interface
pluggable and provided via a provider would be the saner approach in my
opinion.
On Tue, Sep 23, 2025 at 11:16 AM [email protected] wrote:
Hello,
A new approach utilizing stored SQL functions is proposed here as a
solution to unnecessary processing/starvation:
https://github.com/apache/airflow/pull/55537
Benchmarks show an actual improvement in queueing throughput, around
1.5x-2x for the workloads tested.
Regarding DB server load, I wasn't able to note any difference so far; we
probably have to run heavier jobs to test that. Looks like a smooth line
to me.