cheery550 commented on issue #45636:
URL: https://github.com/apache/airflow/issues/45636#issuecomment-2893121069

   Inspired by #49160: instead of querying max_tis candidate tasks, the goal of `_executable_task_instances_to_queued` could be to pick max_tis *ready* tasks. We can keep iterating the loop until one of the following conditions is met:
   1. the loop reaches max_loop_count
   2. executable_tis reaches max_tis
   3. task_instances_to_examine is smaller than max_tis
   4. no new filters were found
   
   ```python
   
   # max_loop_count is to avoid infinite loops, default may be 5 or 10
   max_loop_count = DEFAULT_MAX_ITER_LOOP_COUNT
   
   pools = Pool.slots_stats(lock_rows=True, session=session)
   # filter starved pools before loop query
    starved_pools = {pool_name for pool_name, stats in pools.items() if stats["open"] <= 0}
   
   # current concurrency_map contains:
   #    dag_max_active_tasks_map
   #    dag_run_active_tasks_map
   #    task_concurrency_map
   #    task_dagrun_concurrency_map
   concurrency_map = ConcurrencyMap()
   concurrency_map.load(session=session)
   
   # filter starved dags before loop query
   dagmodels = DagModel.get_active_dagmodels(session=session)
    starved_dags: set[str] = {
        dagmodel.dag_id
        for dagmodel in dagmodels
        if dagmodel.dag_id in concurrency_map.dag_max_active_tasks_map
        and dagmodel.max_active_tasks <= concurrency_map.dag_max_active_tasks_map[dagmodel.dag_id]
    }
   
   starved_tasks: set[tuple[str, str]] = set()
   starved_tasks_task_dagrun_concurrency: set[tuple[str, str, str]] = set()
   
   # in each loop, fill starved_xxx and filter tis by starved_xxx
    for loop_count in range(1, max_loop_count + 1):
    
        # snapshot the starved-set sizes so we can detect whether this
        # iteration discovered any new filters
        num_starved_pools = len(starved_pools)
        num_starved_dags = len(starved_dags)
        num_starved_tasks = len(starved_tasks)
        num_starved_tasks_task_dagrun_concurrency = len(starved_tasks_task_dagrun_concurrency)
    
        # ...previous code ...
   
        # filter pools that have been starved
       if starved_pools:
           query = query.where(TI.pool.not_in(starved_pools))
   
        # filter dags that have been starved
       if starved_dags:
           query = query.where(TI.dag_id.not_in(starved_dags))
   
        # filter tasks that have been starved
       if starved_tasks:
            query = query.where(tuple_(TI.dag_id, TI.task_id).not_in(starved_tasks))
   
       if starved_tasks_task_dagrun_concurrency:
           query = query.where(
                tuple_(TI.dag_id, TI.run_id, TI.task_id).not_in(starved_tasks_task_dagrun_concurrency)
           )
   
        # filter tis that have already been examined
       if executable_tis:
           query = query.where(TI.id.not_in({ti.id for ti in executable_tis}))
   
        # ...query task_instances_to_examine, fill starved_xxx, and filter tis by starved_xxx...
   
        is_done = len(executable_tis) >= max_tis or len(task_instances_to_examine) < max_tis
   
       found_new_filters = (
           len(starved_pools) > num_starved_pools
           or len(starved_dags) > num_starved_dags
           or len(starved_tasks) > num_starved_tasks
            or len(starved_tasks_task_dagrun_concurrency) > num_starved_tasks_task_dagrun_concurrency
       )
   
       if is_done or not found_new_filters:
           break
   
   ```
   
   The key point of this solution is to keep iterating until `executable_tis` reaches `max_tis` (instead of stopping as soon as at least one task instance is found). Since the starved_xxx sets are filled on each iteration, the next iteration can find additional ready tasks more efficiently by skipping starved and already-examined task instances.
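   The snippet above is pseudocode against the scheduler internals; to make the termination logic concrete, here is a minimal, self-contained sketch of the same loop-until-max_tis pattern using only pool starvation. The names here (`pick_ready`, the in-memory `pool_slots` dict, the simplified `TI` dataclass) are hypothetical stand-ins for the real query and `Pool.slots_stats`:

   ```python
   from dataclasses import dataclass


   @dataclass(frozen=True)
   class TI:
       dag_id: str
       task_id: str
       pool: str


   def pick_ready(candidates, pool_slots, max_tis, max_loop_count=5):
       """Select up to max_tis ready task instances, skipping pools
       found to be starved in earlier iterations (simplified sketch)."""
       executable = []
       examined = set()
       starved_pools = set()
       for _ in range(max_loop_count):
           # snapshot so we can tell whether this iteration found new filters
           num_starved_pools = len(starved_pools)
           # stand-in for the SQL query with not_in() filters applied
           batch = [
               ti for ti in candidates
               if ti.pool not in starved_pools and ti not in examined
           ][:max_tis]
           for ti in batch:
               examined.add(ti)
               if pool_slots.get(ti.pool, 0) > 0:
                   pool_slots[ti.pool] -= 1
                   executable.append(ti)
               else:
                   starved_pools.add(ti.pool)
           is_done = len(executable) >= max_tis or len(batch) < max_tis
           found_new_filters = len(starved_pools) > num_starved_pools
           if is_done or not found_new_filters:
               break
       return executable
   ```

   In the real implementation the starved sets would feed `not_in` filters on the SQL query rather than a Python list comprehension, but the termination conditions are the same.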
   
   