Nataneljpwd commented on issue #49508:
URL: https://github.com/apache/airflow/issues/49508#issuecomment-2877636652

I think I agree with what @Asquator mentioned: using windowing would solve this issue and cover most of the edge cases where dags are starved, along with tasks that are starved or stuck in queued. Windowing could be a very good approach that handles all the edge cases by selecting `max_tis` tasks to schedule each time if possible, and otherwise selecting the largest number of tasks that can be scheduled. It might increase the time it takes to run the query, but it would not necessarily increase the duration of the critical section, since we would send fewer queries to the database when encountering starving tasks or tasks that we cannot schedule.

I think that, written in SQL (a rough draft, might contain mistakes), the windowing would look something like this:

```sql
with candidates as (
    select
        task_instance.*,
        sum(task_instance.pool_slots) over (
            partition by pool.pool
            order by task_instance.priority_weight desc, /* to get tasks by priority */
                     task_instance.updated_at,           /* to favor tasks which are longer in queue */
                     task_instance.map_index             /* to allow mapped tasks to run in order as well */
        ) as slots_taken,
        count(task_instance.id) over (
            partition by dag_run.id
            order by task_instance.priority_weight desc, /* same ordering as before */
                     task_instance.updated_at,
                     task_instance.map_index
        ) as tasks_per_dag,
        (select count(running.id)
           from task_instance as running
          where running.state = 'running'
            and running.dag_id = dag_run.dag_id) as running_tasks_per_dag,
        pool.slots as pool_total_slots,
        dag.max_active_tasks
    from dag
    join dag_run on dag_run.dag_id = dag.dag_id
    join task_instance on task_instance.dag_id = dag_run.dag_id
                      and task_instance.run_id = dag_run.run_id
    join slot_pool as pool on task_instance.pool = pool.pool
    where task_instance.state = 'scheduled'
)
select *
from candidates
where slots_taken <= pool_total_slots
  and tasks_per_dag <= max_active_tasks - running_tasks_per_dag
  /* and max_ti_per_task_per_dagrun <= dag.max_ti_per_task_per_dagrun -- needs to be added to the database */
limit {{max_tis}}
```

This query is untested and heavily inspired by the query @Asquator wrote in #49160, with small calculation fixes. For this to work, a column called `max_active_tis_per_dagrun` needs to be added to the `dag` model in the Airflow db; the setting exists since #29094 but is not stored in the database model. After that the query can be completed.

Another, quicker solution that could work is to change the prioritization to be probability based rather than strictly largest-first: for example, take all priorities, give the top N priorities at least 1 slot each and then distribute the task instances by relative weight, or give the top priority x slots, the next in line x / 2 plus any leftover, and so on. This second solution could also be implemented using counts and windows over priority, which would not make the query too heavy, although it might increase the query time; we could also come up with a simple mathematical formula for any fixed number of priorities and implement it in SQL with almost no impact on performance (a rough sketch is included at the end of this comment).

Just changing the scheduler's SQL query might work for most cases, but there may still be cases where starvation occurs; the second solution should fix starvation of dags and tasks better, but it might not maximize the throughput of the scheduled-to-queued loop.
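To make the second idea a bit more concrete, here is a rough, untested sketch of the "top priority gets x slots, the next x / 2, and so on" variant using windows over priority levels. The `{{ free_slots }}` placeholder is assumed to be the number of open slots computed elsewhere (it is not part of the query above), and pool slot accounting is left out for brevity:

```sql
with ranked as (
    select
        ti.*,
        -- rank of this task's priority level within its pool (1 = highest priority)
        dense_rank() over (
            partition by ti.pool
            order by ti.priority_weight desc
        ) as priority_rank,
        -- position of this task inside its own priority level
        row_number() over (
            partition by ti.pool, ti.priority_weight
            order by ti.updated_at, ti.map_index
        ) as position_in_level
    from task_instance ti
    where ti.state = 'scheduled'
)
select *
from ranked
-- give priority level k roughly free_slots / 2^k slots, but always at least one,
-- so lower priorities keep making progress instead of starving
where position_in_level <= greatest(1, floor({{ free_slots }} / power(2, priority_rank)))
limit {{max_tis}}
```

The `greatest(1, ...)` floor is what prevents lower priorities from being starved entirely; the halving is only one possible weighting, and any other decay could be plugged in there.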
I think I agree with what @Asquator mentioned, using windowing will solve this issue and fix most of the edge cases where dags are starved along with tasks which are starved or stuck on queued, Windowing could be a very good approach that will be able to go over all edge cases by each time selecting `max_tis` tasks to schedule if possible and if not, select the largest amount of tasks to schedule, however it might increase the time it takes to run the query, however, might not necessarily increase the duration in the critical section as we will send less queries to the database when encountering starving or tasks that we cannot schedule. I think that if written in sql (a rough draft, might contain mistakes) , the windowing will look something like this: ```sql select *, sum(task_instance.pool_slots) over ( partition by pool order by (-priority_weight /* to get tasks by priority */, updated_at /* to favor tasks which are longer in queue */, map_index /* to allow for mapped tasks as well to run in order */) ) as slots_taken count(task_instance.id) over ( partition by dag_run.id order by(/*same as before*/) ) as tasks_per_dag, (select count(task_instance.id) where task_instance.state="running" and dag_run.dag_id = task_instance.dag_id) as tasks_per_dag, where slots_taken <= pool.slots and tasks_per_dag <= dag.max_active_tasks - running_tasks_per_dag and task_instance.state="SCHEDULED" /*and max_ti_per_task_per_dagrun <= dag.max_ti_per_task_per_dagrun -- needs to be added to the database */ from dag join dag_run on dag_id join task_instance on dag_id join slot_pool as pool on task_instance.pool = pool.pool group by dag_run limit {{max_tis}} ``` This query is untested, and heavily inspired by the query @Asquator wrote in #49160 with small calculation fixes. For this to work, a configuration needs to be added to the airflow db to the `dag` model called `max_active_tis_per_dagrun` which exists after #29094 and does not exist in the database model, after that the query can be complete. Another quicker solution which could work is to change the prioritization to be probability based rather than strictly larger first, for example, get all priorities and give top N priorities at least 1 slot and then by relative weight distribute the task instances or give the top priority x slots, the next in line x / 2 + leftover and so on. The second solution could be implemented using counts and windows over priority as well, which will not make the query too heavy however might increase the query time, we could also think of a simple mathematical algorithm to do it for any fixed amount of priorities and implement it in sql while having almost no impact on performance. Just changing the sql query for the scheduler might work for most cases, however there might still be cases where starvation occues, the second solution should fix starvartion of dags and tasks better but it might not maximize the performance of the scheduled to queued loop. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org