Hey,

> Regarding lazy creation of tasks I assume this would be a very complex
> internal change.

Absolutely, I agree. Especially about the UI and the BI.

> I understand that scheduling is easier if tasks need not be assessed but
> then you have the complexity one step later: Whenever a task is
> finished (success, failed, given-up) the scheduler would need to
> calculate and create the successor tasks as INSERT. I am not sure
> whether this (lazy) creation then saves more work compared.

Currently we just mark the successor tasks as SCHEDULED, so lazy creation
sounds like simply doing a different operation at the same point.

> Would it be an option to add another
> state for candidates that are blocked by limits? Like
> "scheduled/predecessors make them ready" and "scheduled/other
> constraints throttle" (like limits on pools, tasks, runs...) - then
> these buckets could be queried directly with constraints?

More states will probably just complicate matters. I also don't think that new
states will help here: validating all the limits (we have 4+1) isn't that
hard, and a task can easily be blocked by several limits at once. The
situation may change very rapidly, and tracking these transitions will be a
pain.
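
A rough sketch of what "blocked by several limits" means in practice. The
structures and limit names below are hypothetical stand-ins chosen for
illustration, not the scheduler's actual model or its exact set of limits. The
point is only that the check returns a set, which a single extra state value
could not encode.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class TI:
    # Hypothetical, simplified task instance; not Airflow's TaskInstance model.
    dag_id: str
    run_id: str
    task_id: str
    pool: str
    pool_slots: int = 1


@dataclass
class Limits:
    # Assumed limit values, for illustration only.
    pool_open_slots: dict[str, int]
    max_tis_per_dag: int
    max_tis_per_task: int
    max_tis_per_task_per_run: int


@dataclass
class Usage:
    # Counts of currently active TIs, keyed the way each limit is scoped.
    per_dag: dict[str, int]
    per_task: dict[tuple[str, str], int]
    per_task_per_run: dict[tuple[str, str, str], int]


def blocking_limits(ti: TI, limits: Limits, usage: Usage) -> set[str]:
    """Return the names of all limits that currently block this TI."""
    blocked = set()
    if limits.pool_open_slots.get(ti.pool, 0) < ti.pool_slots:
        blocked.add("pool")
    if usage.per_dag.get(ti.dag_id, 0) >= limits.max_tis_per_dag:
        blocked.add("dag")
    if usage.per_task.get((ti.dag_id, ti.task_id), 0) >= limits.max_tis_per_task:
        blocked.add("task")
    key = (ti.dag_id, ti.run_id, ti.task_id)
    if usage.per_task_per_run.get(key, 0) >= limits.max_tis_per_task_per_run:
        blocked.add("task_per_run")
    return blocked
```

A TI whose pool is full and whose DAG is at its cap comes back with both
"pool" and "dag", and either one may clear first.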

> One thing that needs to be considered and is maximum complexity in
> scheduling logic is the mapped-task-group way of scheduling as all
> parallel paths are scheduled in parallel and even on operator level
> caching is hard to achieve.

I think caching on the operator level should be enough, as all the limit and
priority information for mapped TIs of the same task is contained in the
operator itself. As we schedule mapped TIs from first to last, we can store an
in-memory index of the last scheduled TI in the batch. If it's not in the
cache, a query can be fired to compute the diff. That's why I assumed that the
amount of memory should be enough to store the operators of running DAGs (of
the correct versions).
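
A minimal sketch of that idea, assuming a per-operator cache entry that
remembers the next unscheduled map index. The class names, the cache key and
the `load_from_db` callback are all hypothetical; the callback stands in for
the query fired on a cache miss.

```python
from dataclasses import dataclass, field


@dataclass
class OperatorEntry:
    # Hypothetical cache entry; field names are illustrative, not Airflow's.
    priority_weight: int
    pool: str
    total_map_indexes: int
    next_map_index: int = 0  # first mapped TI that has not been scheduled yet


@dataclass
class OperatorCache:
    # Key is assumed to be (dag_id, run_id, task_id).
    entries: dict[tuple[str, str, str], OperatorEntry] = field(default_factory=dict)

    def take_batch(self, key, size, load_from_db):
        """Return up to `size` map indexes to schedule next for one operator."""
        entry = self.entries.get(key)
        if entry is None:
            # Cache miss: ask the DB where scheduling of this operator stands.
            entry = load_from_db(key)
            self.entries[key] = entry
        start = entry.next_map_index
        end = min(start + size, entry.total_map_indexes)
        entry.next_map_index = end
        return list(range(start, end))
```

Only one entry per operator is held, regardless of how many mapped TIs it
expands into.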

> Caching Dags in memory also need to be considered carefully. Depending on
> the size also you can run into memory limitations with many many Dags.

Exactly, I think it's the top-level concern here. If we're able to decide that
the number of running DAGs won't be too large, or just introduce an incredibly
hard-to-reach limit (say 1000000) + LRU eviction, things may become easier to
solve. If not - well, we're left with the same problem. In that case only
stored procedures will help, as data in the DB is best treated... by the DB.
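
For reference, the "hard limit + LRU eviction" part is cheap to build. A
minimal sketch using only the standard library; the `loader` callback is a
hypothetical stand-in for fetching a DAG from the DB on a cache miss.

```python
from collections import OrderedDict


class LRUCache:
    """Tiny LRU cache; a stand-in for a per-scheduler cache of running DAGs."""

    def __init__(self, max_size):
        self.max_size = max_size
        self._items = OrderedDict()

    def get(self, key, loader):
        if key in self._items:
            self._items.move_to_end(key)  # mark as most recently used
            return self._items[key]
        value = loader(key)  # cache miss: hypothetical DB fetch
        self._items[key] = value
        if len(self._items) > self.max_size:
            self._items.popitem(last=False)  # evict the least recently used
        return value

    def __contains__(self, key):
        return key in self._items
```

The open question stays the same: which `max_size` is realistic for the
deployments we want to support.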

> One option might be to prepare the data for scheduling decisions in a
> dedicated data structure like a table in the DB such that querying is
> fast. [...] Assume you have
> a dedicated table only for "schedulable work" and the scheduler would
> not use the TI table for checking next tasks?

I don't see how it's better than querying the TI table itself. Well, I actually
tested the following approach:
1. Run the current critical section (CS) query
2. Rerun it until `max_tis` tasks are found or all of them are checked
3. Filter out the `starved_` entities and save the IDs of the tasks fit to be
   checked into a temp table `task_sieve`.

It's basically the solution from
https://github.com/apache/airflow/issues/45636#issuecomment-2592113479, but
involving a temp table.
It was slow, as reinserting thousands/millions of IDs into the same table
multiple times is a grind.
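
A loose sketch of those three steps on SQLite, to make the shape of the loop
concrete. It is a simplification and not the code that was tested: only pool
limits are modelled, the starved filtering happens in Python, and the
`task_instance` columns used here are an assumed, cut-down layout.

```python
import sqlite3


def fill_task_sieve(conn, pool_open_slots, max_tis, batch_size):
    """Repeat a simplified selection query, saving runnable TI ids to `task_sieve`."""
    conn.execute("CREATE TEMP TABLE IF NOT EXISTS task_sieve (ti_id INTEGER PRIMARY KEY)")
    conn.execute("DELETE FROM task_sieve")
    open_slots = dict(pool_open_slots)
    starved_pools = set()
    offset = 0
    found = 0
    while found < max_tis:
        # Steps 1 and 2: run the query, then rerun it on the next window.
        rows = conn.execute(
            "SELECT id, pool, pool_slots FROM task_instance "
            "WHERE state = 'scheduled' ORDER BY priority_weight DESC, id "
            "LIMIT ? OFFSET ?",
            (batch_size, offset),
        ).fetchall()
        if not rows:
            break  # every candidate has been checked
        offset += len(rows)
        for ti_id, pool, slots in rows:
            if found == max_tis:
                break
            # Step 3: drop starved entities, keep the rest in the sieve.
            if pool in starved_pools:
                continue
            if open_slots.get(pool, 0) <= 0:
                starved_pools.add(pool)
                continue
            if open_slots[pool] < slots:
                continue
            open_slots[pool] -= slots
            conn.execute("INSERT INTO task_sieve (ti_id) VALUES (?)", (ti_id,))
            found += 1
    return found
```

With many high-priority TIs stuck in a full pool, the loop has to page through
all of them before it reaches anything runnable, which is where the time goes.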

Regarding preserving information between iterations - maybe... But again we
lock ourselves to the DB, and it's hard to do without sprocs. Again, query time
isn't the main concern. The technological problem is how to check all the
concurrency limits on a possibly large number of TIs stored in the DB.

> Anyway as redundantly in the other emails I assume it would be good to
> have a write-up of different options along with requirements

We will be there one day. We've tried several approaches that fit into the
current critical section, and the idea of trying something else is new, so we
just outline possible strategies. So far only one successful POC has been
implemented (#55537), and benchmarks have shown pretty good results. Doing a
POC in the scheduler isn't that easy, so if we can first expose the ideas to
the community and get some feedback, that's a good way to start. I've also
seen many users suffer from starvation, and I believe it will become a much
bigger problem in the near future as pipelines grow in size and mapped tasks
are (ab)used even more intensively.


On Sunday, September 28th, 2025 at 10:34 PM, Jens Scheffler 
<[email protected]> wrote:

> Hi,
> 
> oh, two large / related discussions emails. The discussion here is already
> very fragmented, a lot of ideas have been sketched and it is a very long
> read to get the context and history. And concepts rotated several times.
> So also here I'd propose to write an AIP with ideas of potential options
> and pro/con, measurements, PoC results to individually discuss. When the
> solution space settles then we can decide.
> 
> Regarding lazy creation of tasks I assume this would be a very complex
> internal change. Because many API and UI functions assume all relevant
> tasks are existing in the DB. If they are lazy created we would need to
> check a lot of code how it reacts if data is missing.
> 
> I understand that scheduling is easier if tasks need not be assessed but
> then you have the complexity one step later: Whenever a task is
> finished (success, failed, given-up) the scheduler would need to
> calculate and create the successor tasks as INSERT. I am not sure
> whether this (lazy) creation then saves more work compared.
> 
> The effort that can be saved might be the list of candidates but also
> the candidates must be elaborated. Would it be an option to add another
> state for candidates that are blocked by limits? Like
> "scheduled/predecessors make them ready" and "scheduled/other
> constraints throttle" (like limits on pools, tasks, runs...) - then
> these buckets could be queried directly with constraints?
> 
> One thing that needs to be considered and is maximum complexity in
> scheduling logic is the mapped-task-group way of scheduling as all
> parallel paths are scheduled in parallel and even on operator level
> caching is hard to achieve.
> 
> Caching Dags in memory also need to be considered carefully. Depending on
> the size also you can run into memory limitations with many many Dags.
> 
> One option might be to prepare the data for scheduling decisions in a
> dedicated data structure like a table in the DB such that querying is
> fast. With the risk of redundancy and inconsistencies. But this would
> also require a bit of thinking as an additional option. Assume you have
> a dedicated table only for "schedulable work" and the scheduler would
> not use the TI table for checking next tasks?
> 
> Anyway as redundantly in the other emails I assume it would be good to
> have a write-up of different options along with requirements to be able
> to check grade of fulfilment, see problems, pro and con and then decide.
> 
> Jens
> 
> On 28.09.25 20:11, asquator wrote:
> 
> > The idea proposed above is:
> > "Don't even create tasks that can't be run due to concurrency limits 
> > (except executor slots)"
> > OR (slightly weaker):
> > "Make concurrency limits (except executor slots) apply at the SCHEDULED
> > state instead of QUEUED"
> > 
> > Let's reason logically about the worst cases and the bottlenecks here.
> > 
> > If we MUST maintain the per-task priorities as they're implemented today, 
> > then in the worst case we MUST go over ALL the candidate tasks. That's 
> > because WE MUST look from top to bottom every time, and we also WANT to 
> > avoid starvation of tasks at the bottom, if the first batch can't run due 
> > to concurrency limits. It follows that if we have global per-task 
> > priorities as we do, in the worst case we WILL check every candidate task 
> > (for mapped tasks we don't have to iterate over every mapped TI, as looking 
> > at the operator itself is enough). As micro-batching (fetchmany) is pretty 
> > slow when done in a loop, especially in python, we have the following 
> > question:
> > 
> > Is holding ALL of the individual tasks (operators, not including mapped 
> > duplicates) in memory allowed at once?
> > 
> > If we can give up the per-task priorities or make them weaker (I opened a 
> > discussion for that), we can do round-robin scheduling and avoid holding 
> > all at once in memory. Just do micro-batching across scheduler iterations 
> > and be good. The concurrency limits will be checked when creating 
> > tasks/transferring them to SCHEDULED, say per N DAG runs each time, while 
> > every DAG run will have the `last_scheduling_decision` defined.
> > 
> > We'll probably want a strong priority limit one day, and defining it on 
> > task level is not desired. Say it's defined on DAG run level, then we'll 
> > have to actually subtract pool slots and other concurrency slots even if 
> > tasks are not actually scheduled, when we descend to a lower priority rank. 
> > The micro-batching on DAG run level looks appealing, as we can store the 
> > DAG runs in a local cache per scheduler and avoid repetitive queries to 
> > fetch the same DAG runs over and over again. The iterations will be done in 
> > python, and state will be preserved between scheduler iterations.
> > 
> > So, rewriting it in code demands some drastic changes in scheduler logic 
> > and I believe they're possible, especially using the SCHEDULED state to 
> > sieve out tasks that can't be run due to concurrency limit. The QUEUED 
> > state will simply become a sieve for executor slots limits. Without this 
> > drastic change, the sprocs will remain a second-to-none solution - we can't 
> > both store thousands of TIs in the DB and expect to come up with a python 
> > solution.
> > 
> > Caching DAG runs in memory means again that we HAVE TO store as many DAG 
> > runs in state RUNNING between scheduler iterations so we avoid frequent RTT 
> > queries, which are pretty painful, as we can examine thousands of DAG runs 
> > in every iteration.
> > 
> > The question restated:
> > How many concurrent active DAG runs should we prepare for, so we can decide 
> > whether they can be stored in memory?
> > 
> > > Hello, I have thought of a way that might work, in order to both fix
> > > starvation, and reduce complexity, while increasing maintainability.
> > > 
> > > The main issue that causes starvation is that we have scheduled task
> > > instances which /cannot/ be run, and filtering it dynamically with sql,
> > > is not really possible without either, significantly hindering
> > > performance, or introducing query and maintainability complexity, just
> > > like the stored sql procedures do, though they do solve the issue, they
> > > create a new one where we have 3 implementations, one for each sql
> > > dialect, which in my opinion, is better to be avoided.
> > > 
> > > The proposed solution is that in the dagrun 'update_state' function
> > > (which is part of a critical section) we only create tasks which can be
> > > set to running, using the new fields added, and moving the heavy part
> > > away from the task selection in the '_executable_tis_to_queued' method,
> > > where we will only have to do one query, no loop, and only check
> > > executor slots.
> > > 
> > > To achieve this, we can use the 'concurrency map' created in sql in the
> > > pessimistic-task-selection-with-window-functions pr
> > > 
> > > (https://github.com/apache/airflow/pull/53492)
> > > 
> > > and use it to get simple and lightweight mappings, which will look like
> > > so (sorted by priority_order and DR logical date):
> > > 
> > > -------------------------
> > > 
> > > {dag_id} | {dagrun_id} | {task_id} | currently running
> > > mapped tasks (if any) | total running tis per dagrun | total
> > > tis per task run | pool_slots
> > > 
> > > --------------------------
> > > 
> > > And a second key value pairs for pools:
> > > 
> > > {pool_id} | available_pool_slots
> > > 
> > > And now, after we have sorted these fields, we can select the top N
> > > dagruns to have their state updated according to the order of the sorted
> > > table above, and to the limit for dagruns to examine each loop, and
> > > create task instances for those dagruns only, and we create as many
> > > tasks as we can, by priority, as long as the concurrency limits are
> > > preserved, this keeps the current behaviour, while examining more
> > > prioritized operators first, and then moving to less prioritized ones,
> > > we can also improve it by increasing the limit of dagruns to be examined
> > > or introducing a 'last_scheduling_decision' sorting as well so that we
> > > update the state of all dagruns in a round robin fashion rather than
> > > only getting the most prioritized ones first, however, this does break
> > > the current priority behaviour a little.
> > > 
> > > This means that instead of taking a lock on the pools table during the
> > > task fetching, we take the lock on the pool while examining dagruns and
> > > updating their state, and during task fetching we lock only the
> > > 'task_instance' rows, meaning schedulers can now set tasks to running in
> > > parallel.
> > > 
> > > There are a few other possible derivations of the same solution,
> > > including, looking at each dagrun individually and sending a query per
> > > dagrun, and thus, only locking the pool/pools the current dagrun's
> > > operators are assigned to, allowing multiple schedulers to do the same
> > > work on different dagruns as long as they are in different pools.
> > > 
> > > Another derivation could be to just go over all dagruns with no limit
> > > until we cannot create new tasks instances that can be set to running
> > > (according to the concurrency limits), which will change the sorting
> > > order a little, where operator priority order comes AFTER dagrun logical
> > > date.
> > > 
> > > In conclusion, this solution could be a viable option, it also
> > > simplifies the task selection process a lot and can help remove a lot of
> > > code, while making it easier to maintain, and using a simpler algorithm,
> > > and it also changes the semantic of the 'scheduled' state of the
> > > task_instance, where now, a 'scheduled' ti, can be sent to the executor
> > > immediately, and so tasks will not be in the 'scheduled' state for long.
> > > 
> > > This proposed solution is, in my opinion the simplest solution, and most
> > > maintainable one, though I might be biased, as I have explored multiple
> > > solutions with Asquator, and I have come to the conclusion that solving
> > > it using SQL is not a viable option, I have yet to open the PR to
> > > airflow, and only have it locally on my computer as I am making a lot of
> > > experimental commits, and a lot of things are changing very quickly,
> > > once I have stabilized things a little, I will open the PR for anyone
> > > who's interested.
> > > 
> > > I would appreciate any feedback or other ideas/propositions from the
> > > airflow community, both on this solution and its derivations or on any
> > > alternative solutions.
> > > 
> > > On 9/24/25 9:11 PM, asquator wrote:
> > > 
> > > > It is a complication, but it seems as we can't do any better and remain 
> > > > scalable. In the end we want priorities enforced (mb not in the way 
> > > > they're implemented today, but it's part of another talk), and we don't 
> > > > know how many tasks we'll have to iterate over in advance, so fetching 
> > > > them into python is a death sentence in some situations (not joking, 
> > > > tried that with fetchmany and chunked streaming, it was way too slow).
> > > > 
> > > > I actually thought of another optimization here:
> > > > Instead of fetching the entire TI relation, we can ignore mapped tasks 
> > > > and only fetch individual tasks (operators), expanding them on the fly 
> > > > into the maximum number of TIs that can be created. And yet this 
> > > > approach is not scalable, as some enormous backfill of a DAG with just 
> > > > 10 tasks will make it fetch MBs of data every time. It's very slow and 
> > > > loads the DB server with heavy network requests.
> > > > 
> > > > Well, it's not just about throughput, but starvation of tasks that 
> > > > can't run for hours sometimes, and unfortunately we encounter this in 
> > > > production very often.
> > > > 
> > > > On Wednesday, September 24th, 2025 at 3:10 AM, Matthew 
> > > > [email protected] wrote:
> > > > 
> > > > > Hi,
> > > > > This seems like a significant level of technical complication/debt 
> > > > > relative
> > > > > to even a 1.5x/2x gain (which as noted is only in certain workloads).
> > > > > Given airflow scheduling code in general is not something one could
> > > > > describe as simple, introducing large amounts of static code that 
> > > > > lives in
> > > > > > stored procs seems unwise. If at all possible making this
> > > > > interface pluggable and provided via provider would be the saner 
> > > > > approach
> > > > > in my opinion.
> > > > > 
> > > > > On Tue, Sep 23, 2025 at 11:16 AM [email protected] wrote:
> > > > > 
> > > > > > Hello,
> > > > > > 
> > > > > > A new approach utilizing stored SQL functions is proposed here as a
> > > > > > solution to unnecessary processing/starvation:
> > > > > > 
> > > > > > https://github.com/apache/airflow/pull/55537
> > > > > > 
> > > > > > Benchmarks show an actual improvement in queueing throughput, around
> > > > > > 1.5x-2x for the workloads tested.
> > > > > > 
> > > > > > Regarding DB server load, I wasn't able to note any difference so 
> > > > > > far, we
> > > > > > probably have to run heavier jobs to test that. Looks like a smooth 
> > > > > > line to
> > > > > > me.
> > > > > > 
> > > > > > ---------------------------------------------------------------------
> > > > > > To unsubscribe, e-mail:[email protected]
> > > > > > For additional commands, e-mail:[email protected]
> > > > > > ---------------------------------------------------------------------
> > > > > > To unsubscribe, e-mail:[email protected]
> > > > > > For additional commands, e-mail:[email protected]
> > > > > > ---------------------------------------------------------------------
> > > > > > To unsubscribe, e-mail: [email protected]
> > > > > > For additional commands, e-mail: [email protected]
> 
> 