I've recently rewritten AIP-100, which deals with eliminating starvation in 
the Airflow scheduler, and documented all the strategies considered so far as 
replacements for the current task queue selector:

https://cwiki.apache.org/confluence/display/AIRFLOW/%5BDRAFT%5D+AIP-100+Eliminate+Scheduler+Starvation+On+Concurrency+Limits

Hopefully, the document will help us stay focused on the possible alternatives 
and lead to a fruitful discussion. 


On Sunday, September 28th, 2025 at 11:21 PM, asquator <[email protected]> 
wrote:

> Hey,
> 
> > Regarding lazy creation of tasks I assume this would be a very complex
> > internal change.
> 
> Absolutely, I agree. Especially about the UI and the API.
> 
> > I understand that scheduling is easier if tasks need not be assessed but
> > then you have the complexity one step later: Whenever a task is
> > finished (success, failed, given-up) the scheduler would need to
> > calculate and create the successor tasks as INSERT. I am not sure
> > whether this (lazy) creation then saves more work compared.
> 
> Currently we just mark the successor task as SCHEDULED, so it sounds like 
> we'd simply be doing a different operation at the same point.
> 
> > Would it be an option to add another
> > state for candidates that are blocked by limits? Like
> > "scheduled/predecessors make them ready" and "scheduled/other
> > constraints throttle" (like limits on pools, tasks, runs...) - then
> > these buckets could be queried directly with constraints?
> 
> The variety of states will probably just complicate matters. I also don't 
> think that new states will help here, as validating all the limits (we have 
> 4+1) isn't that hard, and tasks can easily be blocked by several limits at 
> once. The situation may change very rapidly, and tracking these transitions 
> will be a pain.
> 
> > One thing that needs to be considered and is maximum complexity in
> > scheduling logic is the mapped-task-group way of scheduling as all
> > parallel paths are scheduled in parallel and even on operator level
> > caching is hardly to be achieved.
> 
> I think caching on the operator level should be enough, as all the limit and 
> priority information for mapped TIs of the same task is contained in the 
> operator itself. As we schedule mapped TIs from first to last, we can store 
> an in-memory index of the last scheduled TI in the batch. If it's not in the 
> cache, a query can be fired to compute the diff. That's why I assumed the 
> available memory should be enough to store the operators of running DAGs 
> (of the correct versions).
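> 
> A minimal sketch of that cursor cache (all names here, like 
> `MappedTICursorCache`, are hypothetical, not existing Airflow internals):
> 
>     from collections import OrderedDict
> 
>     class MappedTICursorCache:
>         """LRU map of (dag_id, run_id, task_id) -> last scheduled map index."""
> 
>         def __init__(self, capacity: int = 10_000) -> None:
>             self._cursors: OrderedDict[tuple[str, str, str], int] = OrderedDict()
>             self._capacity = capacity
> 
>         def last_index(self, dag_id: str, run_id: str, task_id: str) -> int:
>             key = (dag_id, run_id, task_id)
>             if key not in self._cursors:
>                 # Cache miss: fire one query to compute the diff from the DB.
>                 self._cursors[key] = self._query_last_scheduled_index(*key)
>                 if len(self._cursors) > self._capacity:
>                     self._cursors.popitem(last=False)  # evict the LRU entry
>             self._cursors.move_to_end(key)
>             return self._cursors[key]
> 
>         def advance(self, dag_id: str, run_id: str, task_id: str, index: int) -> None:
>             # Mapped TIs are scheduled first-to-last, so the cursor only grows.
>             self._cursors[(dag_id, run_id, task_id)] = index
> 
>         def _query_last_scheduled_index(self, dag_id, run_id, task_id) -> int:
>             # Stand-in for SELECT max(map_index) over already-scheduled TIs.
>             return -1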
> 
> > Caching Dags in memory also needs to be considered carefully. Depending on
> > the size you can also run into memory limitations with many many Dags.
> 
> Exactly, I think it's the top concern here. If we can decide that the number 
> of running DAGs will never grow too large, or just introduce an extremely 
> hard-to-hit limit (say 1000000) + LRU eviction, things may become easier to 
> solve. If not - well, we're back to the same problem. Only stored procedures 
> will help there, as data in the DB is best treated... by the DB.
> 
> > One option might be to prepare the data for scheduling decisions in a
> > dedicated data structure like a table in the DB such that querying is
> > fast. [...] Assume you have
> > a dedicated table only for "schedulable work" and the scheduler would
> > not use the TI table for checking next tasks?
> 
> I don't see how that's better than querying the TI table itself. Well, I 
> actually tested the following approach:
> 1. Run the current critical-section query.
> 2. Rerun it until `max_tis` tasks are found or all of them are checked.
> 3. Filter out the `starved_*` entities and save the IDs of the tasks fit to 
> be checked into a temp table `task_sieve`.
> 
> Basically the solution here:
> [https://github.com/apache/airflow/issues/45636#issuecomment-2592113479], but 
> involving a temp table.
> It was slow, as reinserting thousands/millions of IDs into the same table 
> multiple times is a grind.
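> 
> Roughly, the experiment looked like this (a sketch; `task_sieve` and the 
> helper names are from my throwaway branch, nothing merged):
> 
>     from sqlalchemy import text
> 
>     def run_critical_section_query(session, exclude_table: str) -> list[str]:
>         ...  # the existing CS query, minus IDs already in `exclude_table`
> 
>     def is_starved(ti_id: str) -> bool:
>         ...  # is any pool/dag/task concurrency limit already exhausted?
> 
>     def sieve_schedulable_tis(session, max_tis: int) -> list[str]:
>         session.execute(text(
>             "CREATE TEMPORARY TABLE task_sieve (ti_id VARCHAR PRIMARY KEY)"
>         ))
>         fit: list[str] = []
>         while len(fit) < max_tis:
>             # Rerun the critical-section query, skipping IDs already sieved.
>             batch = run_critical_section_query(session, exclude_table="task_sieve")
>             if not batch:
>                 break  # every candidate has been checked
>             fit.extend(ti_id for ti_id in batch if not is_starved(ti_id))
>             # This reinsert of thousands of IDs per pass is what killed it.
>             session.execute(
>                 text("INSERT INTO task_sieve (ti_id) VALUES (:id)"),
>                 [{"id": ti_id} for ti_id in batch],
>             )
>         return fit[:max_tis]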
> 
> Regarding preserving information between iterations - maybe... But again we 
> lock ourselves into the DB, and it's hard to do without sprocs. Again, query 
> time isn't the main concern. The core technical problem is how to check all 
> the concurrency limits on a possibly large number of TIs stored in the DB.
> 
> > Anyway, as said redundantly in the other emails, I assume it would be good
> > to have a write-up of different options along with requirements
> 
> We will get there one day. We've tried several approaches that fit into the 
> current critical section, and the idea of trying something else is new, so 
> we're just outlining possible strategies. So far only one successful POC has 
> been implemented, in #55537, and benchmarks have shown pretty good results. 
> Doing a POC in the scheduler isn't that easy, so exposing the ideas to the 
> community first and getting some feedback is a good way to start. I've also 
> seen many users suffer from starvation, and I believe it will become a much 
> bigger problem in the near future as pipelines grow in size and mapped tasks 
> get (ab)used even more intensively.
> 
> 
> On Sunday, September 28th, 2025 at 10:34 PM, Jens Scheffler 
> [email protected] wrote:
> 
> > Hi,
> > 
> > oh, two large / related discussion emails. The discussion here is already
> > very fragmented, a lot of ideas have been sketched and it is a very long
> > read to get the context and history. And concepts rotated several times.
> > So also here I'd propose to write an AIP with ideas of potential options
> > and pro/con, measurements, PoC results to individually discuss. When the
> > solution space settles then we can decide.
> > 
> > Regarding lazy creation of tasks I assume this would be a very complex
> > internal change. Because many API and UI functions assume all relevant
> > tasks exist in the DB. If they are lazily created we would need to
> > check a lot of code for how it reacts if data is missing.
> > 
> > I understand that scheduling is easier if tasks need not be assessed but
> > then you have the complexity one step later: Whenever a task is
> > finished (success, failed, given-up) the scheduler would need to
> > calculate and create the successor tasks as INSERT. I am not sure
> > whether this (lazy) creation then saves more work compared.
> > 
> > The effort that can be saved might be the list of candidates but also
> > the candidates must be elaborated. Would it be an option to add another
> > state for candidates that are blocked by limits? Like
> > "scheduled/predecessors make then ready" and "scheduled/other
> > constraints throttle" (like limits on pools, tasks, runs...) - then
> > these buckets could be queried directly with constraints?
> > 
> > One thing that needs to be considered and is maximum complexity in
> > scheduling logic is the mapped-task-group way of scheduling as all
> > parallel paths are scheduled in parallel and even on operator level
> > caching is hardly to be achieved.
> > 
> > Caching Dags in memory also needs to be considered carefully. Depending on
> > the size you can also run into memory limitations with many many Dags.
> > 
> > One option might be to prepare the data for scheduling decisions in a
> > dedicated data structure like a table in the DB such that querying is
> > fast. With the risk of redundancy and inconsistencies. But this would
> > also require a bit of thinking as an additional option. Assume you have
> > a dedicated table only for "schedulable work" and the scheduler would
> > not use the TI table for checking next tasks?
> > 
> > Anyway, as said redundantly in the other emails, I assume it would be good
> > to have a write-up of different options along with requirements to be able
> > to check the grade of fulfilment, see problems, pros and cons, and then
> > decide.
> > 
> > Jens
> > 
> > On 28.09.25 20:11, asquator wrote:
> > 
> > > The idea proposed above is:
> > > "Don't even create tasks that can't be run due to concurrency limits 
> > > (except executor slots)"
> > > OR (slightly weaker):
> > > "Make concurrency limits (except executor slots) apply at the SCHEDULED 
> > > state except of QUEUED"
> > > 
> > > Let's reason logically about the worst cases and the bottlenecks here.
> > > 
> > > If we MUST maintain the per-task priorities as they're implemented today, 
> > > then in the worst case we MUST go over ALL the candidate tasks. That's 
> > > because we MUST look from top to bottom every time, and we also WANT to 
> > > avoid starvation of tasks at the bottom if the first batch can't run due 
> > > to concurrency limits. It follows that if we have global per-task 
> > > priorities as we do, in the worst case we WILL check every candidate task 
> > > (for mapped tasks we don't have to iterate over every mapped TI, as 
> > > looking at the operator itself is enough). As micro-batching (fetchmany) 
> > > is pretty slow when done in a loop, especially in python, we have the 
> > > following question:
> > > 
> > > Is holding ALL of the individual tasks (operators, not including mapped 
> > > duplicates) in memory allowed at once?
> > > 
> > > If we can give up the per-task priorities or make them weaker (I opened a 
> > > discussion for that), we can do round-robin scheduling and avoid holding 
> > > everything in memory at once. Just do micro-batching across scheduler 
> > > iterations and be done. The concurrency limits will be checked when 
> > > creating tasks/transferring them to SCHEDULED, say per N DAG runs each 
> > > time, while every DAG run will have its `last_scheduling_decision` 
> > > defined.
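> > > 
> > > A sketch of how such a round-robin batch could be picked (the SQL is 
> > > illustrative; `last_scheduling_decision` is the existing dag_run column, 
> > > the rest is assumed):
> > > 
> > >     from sqlalchemy import text
> > > 
> > >     RUNS_PER_ITERATION = 100  # N DAG runs examined per scheduler loop
> > > 
> > >     def next_dag_run_batch(session):
> > >         # Oldest scheduling decision first, so every running DAG run is
> > >         # revisited eventually and none starves at the bottom.
> > >         return session.execute(text(
> > >             "SELECT dag_id, run_id FROM dag_run "
> > >             "WHERE state = 'running' "
> > >             "ORDER BY last_scheduling_decision "
> > >             "LIMIT :n"
> > >         ), {"n": RUNS_PER_ITERATION}).all()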
> > > 
> > > We'll probably want a strong priority limit one day, and defining it on 
> > > the task level is not desired. Say it's defined on the DAG run level; 
> > > then we'll have to actually subtract pool slots and other concurrency 
> > > slots when we descend to a lower priority rank, even if tasks are not 
> > > actually scheduled. Micro-batching on the DAG run level looks appealing, 
> > > as we can store the DAG runs in a local cache per scheduler and avoid 
> > > repetitive queries fetching the same DAG runs over and over again. The 
> > > iterations will be done in python, and state will be preserved between 
> > > scheduler iterations.
> > > 
> > > So, rewriting it in code demands some drastic changes in scheduler logic, 
> > > and I believe they're possible, especially using the SCHEDULED state to 
> > > sieve out tasks that can't run due to concurrency limits. The QUEUED 
> > > state will simply become a sieve for executor slot limits. Without this 
> > > drastic change, the sprocs will remain a second-to-none solution - we 
> > > can't both store thousands of TIs in the DB and expect to come up with a 
> > > python solution.
> > > 
> > > Caching DAG runs in memory again means that we HAVE TO keep the DAG runs 
> > > in state RUNNING in memory between scheduler iterations so we avoid 
> > > frequent RTT queries, which are pretty painful, as we can examine 
> > > thousands of DAG runs in every iteration.
> > > 
> > > The question restated:
> > > How many concurrent active DAG runs should we prepare for, so we can 
> > > decide whether they can be stored in memory?
> > > 
> > > > Hello, I have thought of a way that might work, in order to both fix
> > > > starvation and reduce complexity, while increasing maintainability.
> > > > 
> > > > The main issue that causes starvation is that we have scheduled task
> > > > instances which /cannot/ be run, and filtering them dynamically with
> > > > SQL is not really possible without either significantly hindering
> > > > performance or introducing query and maintainability complexity, just
> > > > like the stored SQL procedures do. Though they do solve the issue, they
> > > > create a new one where we have 3 implementations, one for each SQL
> > > > dialect, which in my opinion is better avoided.
> > > > 
> > > > The proposed solution is that in the dagrun 'update_state' function
> > > > (which is part of a critical section) we only create tasks which can be
> > > > set to running, using the newly added fields, moving the heavy part
> > > > away from the task selection in the '_executable_tis_to_queued' method,
> > > > where we will only have to do one query, with no loop, and only check
> > > > executor slots.
> > > > 
> > > > To achieve this, we can use the 'concurrency map' created in SQL in the
> > > > pessimistic-task-selection-with-window-functions PR
> > > > 
> > > > (https://github.com/apache/airflow/pull/53492)
> > > > 
> > > > and use it to get simple and lightweight mappings, which will look like
> > > > so (sorted by priority_order and DR logical date):
> > > > 
> > > > -------------------------
> > > > 
> > > > {dag_id} | {dagrun_id} | {task_id} | {currently running mapped tasks, 
> > > > if any} | {total running TIs per dagrun} | {total TIs per task run} | 
> > > > {pool_slots}
> > > > 
> > > > -------------------------
> > > > 
> > > > And a second key-value mapping for pools:
> > > > 
> > > > {pool_id} | {available_pool_slots}
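> > > > 
> > > > In Python terms, the two structures might look roughly like this (field
> > > > names are illustrative, derived from the row layout above; the `pool`
> > > > field is an assumption, since rows need to point at their pool):
> > > > 
> > > >     from dataclasses import dataclass
> > > > 
> > > >     @dataclass
> > > >     class ConcurrencyRow:
> > > >         dag_id: str
> > > >         dagrun_id: str
> > > >         task_id: str
> > > >         pool: str                  # assumed: which pool the task uses
> > > >         running_mapped_tis: int    # currently running mapped tasks, if any
> > > >         running_tis_per_dagrun: int
> > > >         running_tis_per_task: int
> > > >         pool_slots: int            # slots one TI of this task occupies
> > > > 
> > > >     # Second mapping: pool_id -> available slots
> > > >     available_pool_slots: dict[str, int] = {"default_pool": 128}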
> > > > 
> > > > And now, after we have sorted these fields, we can select the top N
> > > > dagruns to have their state updated, according to the order of the
> > > > sorted table above and to the per-loop limit on dagruns to examine, and
> > > > create task instances for those dagruns only. We create as many tasks
> > > > as we can, by priority, as long as the concurrency limits are
> > > > preserved; this keeps the current behaviour, examining more prioritized
> > > > operators first and then moving to less prioritized ones (see the
> > > > sketch below). We can also improve it by increasing the limit of
> > > > dagruns to be examined, or by introducing a 'last_scheduling_decision'
> > > > sort as well, so that we update the state of all dagruns in a
> > > > round-robin fashion rather than only taking the most prioritized ones
> > > > first; however, this does break the current priority behaviour a
> > > > little.
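> > > > 
> > > > A rough sketch of that selection loop, reusing the ConcurrencyRow above
> > > > (all names hypothetical, limit checks reduced to pools for brevity):
> > > > 
> > > >     def pick_tis_to_create(
> > > >         rows: list[ConcurrencyRow],  # pre-sorted by priority, then logical date
> > > >         pools: dict[str, int],       # pool_id -> available slots
> > > >         max_dagruns: int,            # per-loop cap on dagruns to examine
> > > >     ) -> list[tuple[str, str, str]]:
> > > >         created: list[tuple[str, str, str]] = []
> > > >         seen_runs: set[str] = set()
> > > >         for row in rows:
> > > >             if row.dagrun_id not in seen_runs:
> > > >                 if len(seen_runs) >= max_dagruns:
> > > >                     continue  # cap reached; only finish already-seen dagruns
> > > >                 seen_runs.add(row.dagrun_id)
> > > >             if pools.get(row.pool, 0) < row.pool_slots:
> > > >                 continue  # pool exhausted; move on to other candidates
> > > >             pools[row.pool] -= row.pool_slots
> > > >             created.append((row.dag_id, row.dagrun_id, row.task_id))
> > > >         return created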
> > > > 
> > > > This means that instead of taking a lock on the pools table during
> > > > task fetching, we take the lock on the pool while examining dagruns and
> > > > updating their state, and during task fetching we lock only the
> > > > 'task_instance' rows, meaning schedulers can now set tasks to running
> > > > in parallel.
> > > > 
> > > > There are a few other possible derivations of the same solution,
> > > > including looking at each dagrun individually and sending a query per
> > > > dagrun, thus only locking the pool/pools the current dagrun's
> > > > operators are assigned to, allowing multiple schedulers to do the same
> > > > work on different dagruns as long as they are in different pools.
> > > > 
> > > > Another derivation could be to just go over all dagruns with no limit
> > > > until we cannot create new task instances that can be set to running
> > > > (according to the concurrency limits), which changes the sorting
> > > > order a little: operator priority order comes AFTER dagrun logical
> > > > date.
> > > > 
> > > > In conclusion, this solution could be a viable option. It simplifies
> > > > the task selection process a lot, can help remove a lot of code while
> > > > making it easier to maintain, and uses a simpler algorithm. It also
> > > > changes the semantics of the 'scheduled' state of the task_instance:
> > > > now a 'scheduled' TI can be sent to the executor immediately, so tasks
> > > > will not stay in the 'scheduled' state for long.
> > > > 
> > > > This proposed solution is, in my opinion, the simplest and most
> > > > maintainable one, though I might be biased, as I have explored multiple
> > > > solutions with Asquator and have come to the conclusion that solving
> > > > this purely in SQL is not a viable option. I have yet to open the PR to
> > > > airflow; I only have it locally on my computer, as I am making a lot of
> > > > experimental commits and a lot of things are changing very quickly.
> > > > Once I have stabilized things a little, I will open the PR for anyone
> > > > who's interested.
> > > > 
> > > > I would appreciate any feedback or other ideas/propositions from the
> > > > airflow community, both on this solution and its derivations or on any
> > > > alternative solutions.
> > > > 
> > > > On 9/24/25 9:11 PM, asquator wrote:
> > > > 
> > > > > It is a complication, but it seems we can't do any better and 
> > > > > remain scalable. In the end we want priorities enforced (maybe not 
> > > > > in the way they're implemented today, but that's part of another 
> > > > > talk), and we don't know in advance how many tasks we'll have to 
> > > > > iterate over, so fetching them into python is a death sentence in 
> > > > > some situations (not joking, tried that with fetchmany and chunked 
> > > > > streaming, it was way too slow).
> > > > > 
> > > > > I actually thought of another optimization here:
> > > > > Instead of fetching the entire TI relation, we can ignore mapped 
> > > > > tasks and only fetch individual tasks (operators), expanding them on 
> > > > > the fly into the maximum number of TIs that can be created. And yet 
> > > > > this approach is not scalable, as some enormous backfill of a DAG 
> > > > > with just 10 tasks will make it fetch MBs of data every time. It's 
> > > > > very slow and loads the DB server with heavy network requests.
> > > > > 
> > > > > Well, it's not just about throughput, but about starvation of tasks 
> > > > > that sometimes can't run for hours, and unfortunately we encounter 
> > > > > this in production very often.
> > > > > 
> > > > > On Wednesday, September 24th, 2025 at 3:10 AM, Matthew 
> > > > > [email protected] wrote:
> > > > > 
> > > > > > Hi,
> > > > > > This seems like a significant level of technical complication/debt 
> > > > > > relative to even a 1.5x/2x gain (which as noted is only in certain 
> > > > > > workloads). Given airflow scheduling code in general is not 
> > > > > > something one could describe as simple, introducing large amounts 
> > > > > > of static code that lives in stored procs seems unwise. If at all 
> > > > > > possible, making this interface pluggable and provided via a 
> > > > > > provider would be the saner approach in my opinion.
> > > > > > 
> > > > > > On Tue, Sep 23, 2025 at 11:16 AM [email protected] wrote:
> > > > > > 
> > > > > > > Hello,
> > > > > > > 
> > > > > > > A new approach utilizing stored SQL functions is proposed here as 
> > > > > > > a
> > > > > > > solution to unnecessary processing/starvation:
> > > > > > > 
> > > > > > > https://github.com/apache/airflow/pull/55537
> > > > > > > 
> > > > > > > Benchmarks show an actual improvement in queueing throughput, 
> > > > > > > around
> > > > > > > 1.5x-2x for the workloads tested.
> > > > > > > 
> > > > > > > Regarding DB server load, I wasn't able to note any difference 
> > > > > > > so far; we probably have to run heavier jobs to test that. Looks 
> > > > > > > like a smooth line to me.
> > > > > > > 
> > 

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
