Nataneljpwd commented on issue #49508:
URL: https://github.com/apache/airflow/issues/49508#issuecomment-2877636652

   I think I agree with what @Asquator mentioned: using windowing would solve this issue and cover most of the edge cases where DAGs are starved, along with tasks that are starved or stuck in queued. Windowing could be a very good approach because on each pass it can select `max_tis` tasks to schedule if possible, and otherwise select the largest number of tasks that can be scheduled. It might increase the time it takes to run the query, but it does not necessarily increase the time spent in the critical section, since we would send fewer queries to the database when we encounter starving tasks or tasks we cannot schedule.
   
   I think that, written in SQL (a rough draft, it might contain mistakes), the windowing would look something like this:
   
   ```sql
   -- Rough, untested draft; names loosely follow the Airflow metadata tables.
   with candidates as (
     select
       task_instance.*,
       pool.slots as pool_total_slots,
       dag.max_active_tasks as max_active_tasks,

       -- running total of pool slots taken so far, per pool
       sum(task_instance.pool_slots) over (
         partition by task_instance.pool
         order by task_instance.priority_weight desc, -- to get tasks by priority
                  task_instance.updated_at,           -- to favor tasks which have waited longer
                  task_instance.map_index             -- to allow mapped tasks to run in order as well
       ) as slots_taken,

       -- running count of candidate tasks per dag run
       count(task_instance.id) over (
         partition by dag_run.id
         order by task_instance.priority_weight desc, -- same ordering as before
                  task_instance.updated_at,
                  task_instance.map_index
       ) as tasks_per_dag,

       -- tasks already running for the same dag
       (select count(*)
          from task_instance ti_running
         where ti_running.state = 'running'
           and ti_running.dag_id = task_instance.dag_id) as running_tasks_per_dag

     from dag
     join dag_run on dag_run.dag_id = dag.dag_id
     join task_instance on task_instance.dag_id = dag_run.dag_id
                       and task_instance.run_id = dag_run.run_id
     join slot_pool as pool on task_instance.pool = pool.pool
     where task_instance.state = 'scheduled'
   )
   select *
   from candidates
   where slots_taken <= pool_total_slots
     and tasks_per_dag <= max_active_tasks - running_tasks_per_dag
     /* and tis_per_task_per_dagrun <= dag.max_active_tis_per_dagrun
        -- needs a column added to the database first */
   limit {{max_tis}}
   ```
   
   This query is untested and heavily inspired by the query @Asquator wrote in #49160, with small calculation fixes.
   For this to work, the `max_active_tis_per_dagrun` setting, which exists on DAGs since #29094 but is not persisted in the database model, needs to be added to the `dag` model; after that the query can be completed.
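   As a rough illustration, persisting that setting could be as small as an Alembic migration along these lines (a hypothetical sketch, not an existing Airflow migration; the column name just mirrors the DAG argument from #29094):

   ```python
   from alembic import op
   import sqlalchemy as sa


   def upgrade():
       # store the DAG-level default so the scheduler query can join against it
       op.add_column(
           "dag",
           sa.Column("max_active_tis_per_dagrun", sa.Integer(), nullable=True),
       )


   def downgrade():
       op.drop_column("dag", "max_active_tis_per_dagrun")
   ```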
   
   Another, quicker solution could be to change the prioritization to be probability based rather than strictly largest-first: for example, take all priorities, give the top N priorities at least one slot each, and then distribute the remaining task instances by relative weight, or give the top priority x slots, the next in line x / 2 plus any leftover, and so on.
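   As a minimal sketch of that halving scheme (the function name and the exact split rule are illustrative assumptions, not existing Airflow code):

   ```python
   def distribute_slots(priorities: list[int], total_slots: int) -> dict[int, int]:
       """Give the highest priority roughly half of the free slots, the next one
       half of the remainder, and so on, so lower priorities still get at least
       one slot while higher priorities dominate."""
       allocation: dict[int, int] = {}
       remaining = total_slots
       for priority in sorted(set(priorities), reverse=True):
           if remaining <= 0:
               break
           share = max(1, remaining // 2)   # at least one slot per priority level
           allocation[priority] = share
           remaining -= share
       if remaining > 0 and allocation:
           # hand any leftover back to the highest priority
           allocation[max(allocation)] += remaining
       return allocation


   print(distribute_slots([10, 10, 5, 1], total_slots=8))
   # -> {10: 5, 5: 2, 1: 1}
   ```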
   
   The second solution could also be implemented using counts and windows over priority, which would not make the query much heavier, although it might increase the query time. We could also design a simple mathematical formula that does this for any fixed number of priorities and implement it in SQL with almost no impact on performance.
   
   Just changing the SQL query used by the scheduler might work for most cases, but there may still be cases where starvation occurs. The second solution should fix starvation of DAGs and tasks better, but it might not maximize the throughput of the scheduled-to-queued loop.

