I've recently rewritten AIP-100, which aims to eliminate starvation in the Airflow scheduler, and documented all the strategies ever considered as a replacement for the current task queue selector:
https://cwiki.apache.org/confluence/display/AIRFLOW/%5BDRAFT%5D+AIP-100+Eliminate+Scheduler+Starvation+On+Concurrency+Limits

Hopefully, the document will help us stay focused on the possible alternatives and lead to a fruitful discussion.

On Sunday, September 28th, 2025 at 11:21 PM, asquator <[email protected]> wrote:

> Hey,
>
> > Regarding lazy creation of tasks I assume this would be a very complex internal change.
>
> Absolutely, I agree. Especially about the UI and the BI.
>
> > I understand that scheduling is easier if tasks need not be assessed, but then you have the complexity one step later: whenever a task is finished (success, failed, given up) the scheduler would need to calculate and create the successor tasks as INSERTs. I am not sure whether this (lazy) creation then saves more work in comparison.
>
> Currently we just mark the successor task as SCHEDULED, so it sounds like merely doing a different operation.
>
> > Would it be an option to add another state for candidates that are blocked by limits? Like "scheduled/predecessors made them ready" and "scheduled/other constraints throttle" (like limits on pools, tasks, runs...) - then these buckets could be queried directly with constraints?
>
> The variety of states will probably just complicate matters. I also don't think that new states will help here, as validating all the limits (we have 4+1) isn't that hard, and tasks can easily be blocked by several limits. The situation may change very rapidly, and tracking these transitions will be a pain.
>
> > One thing that needs to be considered, and is maximum complexity in scheduling logic, is the mapped-task-group way of scheduling, as all parallel paths are scheduled in parallel and even on operator level caching is hard to achieve.
>
> I think caching on operator level should be enough, as all the limit and priority information for mapped TIs of the same task is contained in the operator itself. As we schedule mapped TIs from first to last, we can store an in-memory index of the last scheduled TI in the batch. If it's not in cache, a query can be fired to compute the diff. That's why I assumed that the amount of memory should be enough to store the operators of running DAGs (of the correct versions).
>
> > Caching Dags in memory also needs to be considered carefully. Depending on the size, you can also run into memory limitations with many, many Dags.
>
> Exactly, I think it's the top-level concern here. If we can decide that the number of running DAGs will never grow too large, or just introduce an incredibly hard-to-hit limit (say 1000000) plus LRU eviction, things may become easier to solve. If not - well, we have the same problem. Only stored procedures will help here, as data in the DB is best treated... by the DB.
>
> > One option might be to prepare the data for scheduling decisions in a dedicated data structure like a table in the DB such that querying is fast. [...] Assume you have a dedicated table only for "schedulable work" and the scheduler would not use the TI table for checking next tasks?
>
> I don't see how it's better than querying the TI table itself. Well, I actually tested the following approach:
> 1. Run the current CS query.
> 2. Rerun it until `max_tis` tasks are found or all of them are checked.
> 3. Filter out the `starved_` entities and save the tasks fit to be checked (their IDs) into a temp table `task_sieve`.
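Roughly, the shape of that experiment in simplified form (not the actual implementation: the table and column names are made up, the real critical-section query and priority ordering are replaced with plain keyset paging by id, and the starved_* filtering is reduced to a pool check):

    # Illustrative sketch only; assumes a SQLAlchemy session and simplified schema.
    from sqlalchemy import text

    def sieve_candidates(session, max_tis, starved_pools, batch_size=1000):
        # Temp table collecting ids of candidates that passed the checks so far.
        session.execute(text(
            "CREATE TEMPORARY TABLE IF NOT EXISTS task_sieve (ti_id BIGINT PRIMARY KEY)"
        ))
        found, last_id = 0, 0
        while found < max_tis:
            rows = session.execute(
                text("""
                    SELECT ti.id, ti.pool
                    FROM task_instance ti
                    WHERE ti.state = 'scheduled' AND ti.id > :last_id
                    ORDER BY ti.id
                    LIMIT :batch
                """),
                {"last_id": last_id, "batch": batch_size},
            ).fetchall()
            if not rows:
                break  # every candidate has been checked
            last_id = rows[-1].id
            # Keep only tasks whose pool is not already starved out.
            fit = [r.id for r in rows if r.pool not in starved_pools]
            for ti_id in fit:
                session.execute(
                    text("INSERT INTO task_sieve (ti_id) VALUES (:id)"), {"id": ti_id}
                )
            found += len(fit)
        return found

Even in this reduced form, the repeated INSERTs into task_sieve dominate, which matches the observation below about reinserting IDs being a grind.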
> Basically the solution here: [https://github.com/apache/airflow/issues/45636#issuecomment-2592113479], but involving a temp table.
> It was slow, as reinserting thousands/millions of IDs into the same table multiple times is a grind.
>
> Regarding preserving information between iterations - maybe... But again we lock ourselves to the DB, and it's hard to do without sprocs. Again, query time isn't the main concern. The technological problem is how to check all the concurrency limits on a possibly large number of TIs stored in the DB.
>
> > Anyway, as said redundantly in the other emails, I assume it would be good to have a write-up of the different options along with requirements
>
> We will get there one day. We've tried several approaches that fit into the current critical section, and the idea of trying something else is new, so we are just outlining possible strategies. So far only one successful POC was implemented, in #55537, and benchmarks have shown pretty good results. Doing a POC in the scheduler isn't that easy, so if we can first expose the ideas to the community and get some feedback, that's a good way to start. I've also seen many users suffer from starvation, and I believe it will become a much bigger problem in the near future as pipelines grow in size and mapped tasks are (ab)used even more intensively.
>
> On Sunday, September 28th, 2025 at 10:34 PM, Jens Scheffler <[email protected]> wrote:
>
> > Hi,
> >
> > Oh, two large / related discussion emails. The discussion here is already very fragmented, a lot of ideas have been sketched, and it is a very long read to get the context and history. And concepts rotated several times. So here as well I'd propose to write an AIP with the potential options and their pros/cons, measurements, and PoC results to discuss individually. When the solution space settles, then we can decide.
> >
> > Regarding lazy creation of tasks, I assume this would be a very complex internal change, because many API and UI functions assume all relevant tasks exist in the DB. If they are lazily created, we would need to check a lot of code for how it reacts when data is missing.
> >
> > I understand that scheduling is easier if tasks need not be assessed, but then you have the complexity one step later: whenever a task is finished (success, failed, given up) the scheduler would need to calculate and create the successor tasks as INSERTs. I am not sure whether this (lazy) creation then saves more work in comparison.
> >
> > The effort that can be saved might be the list of candidates, but the candidates must still be elaborated. Would it be an option to add another state for candidates that are blocked by limits? Like "scheduled/predecessors made them ready" and "scheduled/other constraints throttle" (like limits on pools, tasks, runs...) - then these buckets could be queried directly with constraints?
> >
> > One thing that needs to be considered, and is maximum complexity in scheduling logic, is the mapped-task-group way of scheduling, as all parallel paths are scheduled in parallel and even on operator level caching is hard to achieve.
> >
> > Caching Dags in memory also needs to be considered carefully. Depending on the size, you can also run into memory limitations with many, many Dags.
> >
> > One option might be to prepare the data for scheduling decisions in a dedicated data structure like a table in the DB such that querying is fast.
> > With the risk of redundancy and inconsistencies. But this would also require a bit of thinking as an additional option. Assume you have a dedicated table only for "schedulable work" and the scheduler would not use the TI table for checking next tasks?
> >
> > Anyway, as said redundantly in the other emails, I assume it would be good to have a write-up of the different options along with requirements, to be able to check the degree of fulfilment, see problems, pros and cons, and then decide.
> >
> > Jens
> >
> > On 28.09.25 20:11, asquator wrote:
> >
> > > The idea proposed above is:
> > > "Don't even create tasks that can't be run due to concurrency limits (except executor slots)"
> > > OR (slightly weaker):
> > > "Make concurrency limits (except executor slots) apply at the SCHEDULED state instead of at QUEUED"
> > >
> > > Let's reason logically about the worst cases and the bottlenecks here.
> > >
> > > If we MUST maintain the per-task priorities as they're implemented today, then in the worst case we MUST go over ALL the candidate tasks. That's because we MUST look from top to bottom every time, and we also WANT to avoid starvation of tasks at the bottom if the first batch can't run due to concurrency limits. It follows that if we have global per-task priorities as we do, in the worst case we WILL check every candidate task (for mapped tasks we don't have to iterate over every mapped TI, as looking at the operator itself is enough). As micro-batching (fetchmany) is pretty slow when done in a loop, especially in Python, we have the following question:
> > >
> > > Is holding ALL of the individual tasks (operators, not including mapped duplicates) in memory at once allowed?
> > >
> > > If we can give up the per-task priorities or make them weaker (I opened a discussion for that), we can do round-robin scheduling and avoid holding everything in memory at once. Just do micro-batching across scheduler iterations and be done. The concurrency limits will be checked when creating tasks / transferring them to SCHEDULED, say per N DAG runs each time, while every DAG run will have its `last_scheduling_decision` defined.
> > >
> > > We'll probably want a strong priority limit one day, and defining it on task level is not desirable. Say it's defined on DAG run level; then we'll have to actually subtract pool slots and other concurrency slots even if tasks are not actually scheduled, when we descend to a lower priority rank. The micro-batching on DAG run level looks appealing, as we can store the DAG runs in a local cache per scheduler and avoid repetitive queries fetching the same DAG runs over and over again. The iterations will be done in Python, and state will be preserved between scheduler iterations.
> > >
> > > So, rewriting this in code demands some drastic changes in scheduler logic, and I believe they're possible, especially using the SCHEDULED state to sieve out tasks that can't run due to concurrency limits. The QUEUED state will simply become a sieve for executor slot limits. Without this drastic change, the sprocs will remain a second-to-none solution - we can't both store thousands of TIs in the DB and expect to come up with a Python solution.
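For illustration only, a minimal in-memory sketch of that round-robin micro-batching over DAG runs (all names are made up; the real thing would work against the DB and check all the concurrency limits when moving tasks to SCHEDULED):

    # Not an actual implementation - a toy sketch of round-robin micro-batching.
    from __future__ import annotations

    from dataclasses import dataclass
    from datetime import datetime, timezone

    @dataclass
    class DagRunStub:
        run_id: str
        last_scheduling_decision: datetime | None = None

    def pick_batch(dag_runs, n):
        # Runs examined least recently (or never) come first, so no run starves.
        oldest_first = sorted(
            dag_runs,
            key=lambda r: r.last_scheduling_decision
            or datetime.min.replace(tzinfo=timezone.utc),
        )
        return oldest_first[:n]

    def scheduler_iteration(dag_runs, n=10):
        for run in pick_batch(dag_runs, n):
            # Here the concurrency limits would be checked and only runnable tasks
            # moved to SCHEDULED; QUEUED would then gate only on executor slots.
            run.last_scheduling_decision = datetime.now(timezone.utc)

The only property that matters here is that examining a run pushes its `last_scheduling_decision` to the back of the queue, so successive iterations rotate through all active runs.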
> > >
> > > Caching DAG runs in memory again means that we HAVE TO keep that many DAG runs in state RUNNING between scheduler iterations, so we avoid frequent round-trip (RTT) queries, which are pretty painful, as we can examine thousands of DAG runs in every iteration.
> > >
> > > The question restated:
> > > How many concurrent active DAG runs should we prepare for, so we can decide whether they can be stored in memory?
> > >
> > > > Hello, I have thought of a way that might work, in order to both fix starvation and reduce complexity, while increasing maintainability.
> > > >
> > > > The main issue that causes starvation is that we have scheduled task instances which /cannot/ be run, and filtering them dynamically with SQL is not really possible without either significantly hindering performance or introducing query and maintainability complexity, just like the stored SQL procedures do. Though they do solve the issue, they create a new one where we have 3 implementations, one for each SQL dialect, which in my opinion is better avoided.
> > > >
> > > > The proposed solution is that in the dagrun 'update_state' function (which is part of a critical section) we only create tasks which can be set to running, using the new fields added, and move the heavy part away from the task selection in the '_executable_tis_to_queued' method, where we will only have to do one query, no loop, and only check executor slots.
> > > >
> > > > To achieve this, we can use the 'concurrency map' created in SQL in the pessimistic-task-selection-with-window-functions PR (https://github.com/apache/airflow/pull/53492) and use it to get simple and lightweight mappings, which will look like so (sorted by priority_order and DR logical date):
> > > >
> > > > -------------------------
> > > > {dag_id} | {dagrun_id} | {task_id} | currently running mapped tasks (if any) | total running tis per dagrun | total tis per task run | pool_slots
> > > > -------------------------
> > > >
> > > > And a second key-value mapping for pools:
> > > >
> > > > {pool_id} | available_pool_slots
> > > >
> > > > Now, after we have sorted these fields, we can select the top N dagruns to have their state updated, according to the order of the sorted table above and to the limit on how many dagruns to examine each loop, and create task instances for those dagruns only. We create as many tasks as we can, by priority, as long as the concurrency limits are preserved. This keeps the current behaviour, examining more prioritized operators first and then moving to less prioritized ones. We can also improve it by increasing the limit of dagruns to be examined, or by introducing a 'last_scheduling_decision' sort as well, so that we update the state of all dagruns in a round-robin fashion rather than only picking the most prioritized ones first; however, this does break the current priority behaviour a little.
> > > >
> > > > This means that instead of taking a lock on the pools table during task fetching, we take the lock on the pool while examining dagruns and updating their state, and during task fetching we lock only the 'task_instance' rows, meaning schedulers can now set tasks to running in parallel.
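To make the selection step above concrete, here is a rough, purely illustrative in-memory version of consuming such a concurrency map; the field names are invented and do not match the SQL produced in #53492:

    # Toy sketch: pick tasks that fit the limits, most prioritized operators first.
    from dataclasses import dataclass

    @dataclass
    class Candidate:
        dag_id: str
        run_id: str
        task_id: str
        pool: str
        pool_slots: int
        priority_weight: int

    def select_creatable(candidates, running_per_dagrun, pool_free,
                         max_active_tis_per_dagrun=16):
        chosen = []
        for c in sorted(candidates, key=lambda c: -c.priority_weight):
            run_key = (c.dag_id, c.run_id)
            if (running_per_dagrun.get(run_key, 0) < max_active_tis_per_dagrun
                    and pool_free.get(c.pool, 0) >= c.pool_slots):
                chosen.append(c)
                # Account for the slots as if the task were already running.
                running_per_dagrun[run_key] = running_per_dagrun.get(run_key, 0) + 1
                pool_free[c.pool] = pool_free.get(c.pool, 0) - c.pool_slots
        return chosen

In the actual proposal the per-task and other limits would be checked too, and the map would come straight from SQL rather than Python objects; the sketch only shows the bookkeeping shape.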
> > > >
> > > > There are a few other possible derivations of the same solution, including looking at each dagrun individually and sending a query per dagrun, thus locking only the pool(s) the current dagrun's operators are assigned to, allowing multiple schedulers to do the same work on different dagruns as long as they are in different pools.
> > > >
> > > > Another derivation could be to just go over all dagruns with no limit until we cannot create new task instances that can be set to running (according to the concurrency limits), which changes the sorting order a little, where operator priority order comes AFTER dagrun logical date.
> > > >
> > > > In conclusion, this solution could be a viable option. It also simplifies the task selection process a lot and can help remove a lot of code, while making it easier to maintain and using a simpler algorithm. It also changes the semantics of the 'scheduled' state of the task_instance: now a 'scheduled' TI can be sent to the executor immediately, so tasks will not stay in the 'scheduled' state for long.
> > > >
> > > > This proposed solution is, in my opinion, the simplest and most maintainable one, though I might be biased, as I have explored multiple solutions with Asquator and have come to the conclusion that solving it using SQL is not a viable option. I have yet to open the PR to Airflow and only have it locally on my computer, as I am making a lot of experimental commits and a lot of things are changing very quickly; once I have stabilized things a little, I will open the PR for anyone who's interested.
> > > >
> > > > I would appreciate any feedback or other ideas/propositions from the Airflow community, both on this solution and its derivations or on any alternative solutions.
> > > >
> > > > On 9/24/25 9:11 PM, asquator wrote:
> > > >
> > > > > It is a complication, but it seems we can't do any better and remain scalable. In the end we want priorities enforced (maybe not in the way they're implemented today, but that's part of another talk), and we don't know in advance how many tasks we'll have to iterate over, so fetching them into Python is a death sentence in some situations (not joking - tried that with fetchmany and chunked streaming, and it was way too slow).
> > > > >
> > > > > I actually thought of another optimization here:
> > > > > Instead of fetching the entire TI relation, we can ignore mapped tasks and only fetch individual tasks (operators), expanding them on the fly into the maximum number of TIs that can be created. And yet this approach is not scalable, as some enormous backfill of a DAG with just 10 tasks will make it fetch MBs of data every time. It's very slow and loads the DB server with heavy network requests.
> > > > >
> > > > > Well, it's not just about throughput, but about starvation of tasks that sometimes can't run for hours, and unfortunately we encounter this in production very often.
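As a side note, the "fetch operators and expand on the fly" idea mentioned two paragraphs above can be illustrated with a tiny arithmetic sketch (the names and the single pool limit are invented; the real limits are more numerous):

    # Pull one row per (mapped) task instead of every mapped TI, then derive how
    # many new TIs the remaining limits would still allow. Illustrative only.
    def expandable_count(total_map_indexes, already_running,
                         max_active_tis_per_task, pool_free_slots,
                         pool_slots_per_ti=1):
        remaining = total_map_indexes - already_running
        by_task_limit = max_active_tis_per_task - already_running
        by_pool_limit = pool_free_slots // pool_slots_per_ti
        return max(0, min(remaining, by_task_limit, by_pool_limit))

    # A task mapped over 10_000 inputs with 30 TIs running, a per-task cap of 64
    # and 100 free pool slots only needs 34 new TIs considered, not 9_970 rows.
    print(expandable_count(10_000, 30, 64, 100))  # -> 34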
> > > > >
> > > > > On Wednesday, September 24th, 2025 at 3:10 AM, Matthew <[email protected]> wrote:
> > > > >
> > > > > > Hi,
> > > > > > This seems like a significant level of technical complication/debt relative to even a 1.5x/2x gain (which, as noted, only shows up in certain workloads). Given that Airflow scheduling code in general is not something one could describe as simple, introducing large amounts of static code that lives in stored procs seems unwise. If at all possible, making this interface pluggable and provided via a provider would be the saner approach in my opinion.
> > > > > >
> > > > > > On Tue, Sep 23, 2025 at 11:16 AM [email protected] wrote:
> > > > > >
> > > > > > > Hello,
> > > > > > >
> > > > > > > A new approach utilizing stored SQL functions is proposed here as a solution to unnecessary processing/starvation:
> > > > > > >
> > > > > > > https://github.com/apache/airflow/pull/55537
> > > > > > >
> > > > > > > Benchmarks show an actual improvement in queueing throughput, around 1.5x-2x for the workloads tested.
> > > > > > >
> > > > > > > Regarding DB server load, I wasn't able to note any difference so far; we probably have to run heavier jobs to test that. Looks like a smooth line to me.

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
