cheery550 commented on issue #45636: URL: https://github.com/apache/airflow/issues/45636#issuecomment-2893121069
Inspired by #49160: instead of querying `max_tis` candidate task instances, the goal of `_executable_task_instances_to_queued` could be to pick `max_tis` ready task instances. We could iterate the loop until one of the following conditions is met:

1. the loop reaches `max_loop_count`;
2. `executable_tis` reaches `max_tis`;
3. `task_instances_to_examine` returns fewer than `max_tis` rows;
4. no new filters were found in the current iteration.

```python
# max_loop_count avoids infinite loops; the default could be 5 or 10
max_loop_count = DEFAULT_MAX_ITER_LOOP_COUNT

pools = Pool.slots_stats(lock_rows=True, session=session)
# filter out starved pools before the loop query
starved_pools = {pool_name for pool_name, stats in pools.items() if stats["open"] <= 0}

# the current concurrency_map contains:
#   dag_max_active_tasks_map
#   dag_run_active_tasks_map
#   task_concurrency_map
#   task_dagrun_concurrency_map
concurrency_map = ConcurrencyMap()
concurrency_map.load(session=session)

# filter out starved DAGs before the loop query
dagmodels = DagModel.get_active_dagmodels(session=session)
starved_dags: set[str] = {
    dagmodel.dag_id
    for dagmodel in dagmodels
    if dagmodel.dag_id in concurrency_map.dag_max_active_tasks_map
    and dagmodel.max_active_tasks <= concurrency_map.dag_max_active_tasks_map[dagmodel.dag_id]
}
starved_tasks: set[tuple[str, str]] = set()
starved_tasks_task_dagrun_concurrency: set[tuple[str, str, str]] = set()
# accumulates across iterations, so it is initialized once, before the loop
executable_tis: list[TI] = []

# in each loop, fill the starved_* sets and filter TIs by them
for loop_count in range(1, max_loop_count + 1):
    # snapshot the filter sizes so the end of the iteration can tell whether new filters were found
    num_starved_pools = len(starved_pools)
    num_starved_dags = len(starved_dags)
    num_starved_tasks = len(starved_tasks)
    num_starved_tasks_task_dagrun_concurrency = len(starved_tasks_task_dagrun_concurrency)
    # ...previous code...
    # filter out pools that have been starved
    if starved_pools:
        query = query.where(TI.pool.not_in(starved_pools))
    # filter out DAGs that have been starved
    if starved_dags:
        query = query.where(TI.dag_id.not_in(starved_dags))
    # filter out tasks that have been starved
    if starved_tasks:
        query = query.where(tuple_(TI.dag_id, TI.task_id).not_in(starved_tasks))
    if starved_tasks_task_dagrun_concurrency:
        query = query.where(
            tuple_(TI.dag_id, TI.run_id, TI.task_id).not_in(starved_tasks_task_dagrun_concurrency)
        )
    # filter out TIs already selected as executable in a previous iteration
    if executable_tis:
        query = query.where(TI.id.not_in({ti.id for ti in executable_tis}))

    # ...query task_instances_to_examine, fill the starved_* sets and filter TIs by them...

    is_done = len(executable_tis) >= max_tis or len(task_instances_to_examine) < max_tis
    found_new_filters = (
        len(starved_pools) > num_starved_pools
        or len(starved_dags) > num_starved_dags
        or len(starved_tasks) > num_starved_tasks
        or len(starved_tasks_task_dagrun_concurrency) > num_starved_tasks_task_dagrun_concurrency
    )
    if is_done or not found_new_filters:
        break
```

The key point of this solution is to keep iterating until `executable_tis` reaches `max_tis` (instead of stopping as soon as at least one task instance has been found). Because the `starved_*` sets are filled in each iteration, the next iteration can find additional ready tasks more efficiently by skipping starved and already-examined task instances.

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
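To make the four stopping conditions of the proposed `_executable_task_instances_to_queued` loop concrete, here is a minimal in-memory sketch that runs without Airflow or a database. It is an illustration under stated assumptions, not the scheduler's implementation: `ToyTI` and `select_executable` are hypothetical names, the SQL query is emulated by a priority-sorted, filtered list truncated to `max_tis`, and only pool slots and a per-DAG active-task limit are modeled (task-level concurrency limits are left out for brevity).

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ToyTI:
    """Hypothetical stand-in for a task instance (not Airflow's TaskInstance)."""

    id: int
    dag_id: str
    pool: str
    priority: int


def select_executable(
    candidates: list[ToyTI],
    pool_open: dict[str, int],
    dag_limits: dict[str, int],
    dag_active: dict[str, int],
    max_tis: int,
    max_loop_count: int = 5,
) -> list[ToyTI]:
    """Hypothetical toy model: pick up to max_tis ready TIs, re-querying with growing starvation filters."""
    pool_open = dict(pool_open)  # copies, so the caller's dicts are not mutated
    dag_active = dict(dag_active)
    # Filter starved pools and DAGs before the first "query".
    starved_pools = {p for p, slots in pool_open.items() if slots <= 0}
    starved_dags = {d for d, limit in dag_limits.items() if dag_active.get(d, 0) >= limit}
    executable: list[ToyTI] = []
    selected_ids: set[int] = set()

    for _ in range(max_loop_count):  # condition 1: bounded number of loops
        num_starved_pools, num_starved_dags = len(starved_pools), len(starved_dags)
        # Emulates the SQL query: priority order, starvation/selected filters, LIMIT max_tis.
        to_examine = [
            ti
            for ti in sorted(candidates, key=lambda t: (-t.priority, t.id))
            if ti.pool not in starved_pools
            and ti.dag_id not in starved_dags
            and ti.id not in selected_ids
        ][:max_tis]

        for ti in to_examine:
            if len(executable) >= max_tis:
                break
            if pool_open.get(ti.pool, 0) <= 0:
                starved_pools.add(ti.pool)  # pool ran out of open slots
                continue
            if dag_active.get(ti.dag_id, 0) >= dag_limits.get(ti.dag_id, float("inf")):
                starved_dags.add(ti.dag_id)  # DAG hit its active-task limit
                continue
            pool_open[ti.pool] -= 1
            dag_active[ti.dag_id] = dag_active.get(ti.dag_id, 0) + 1
            executable.append(ti)
            selected_ids.add(ti.id)

        # Conditions 2 and 3: enough TIs selected, or the query ran out of candidates.
        is_done = len(executable) >= max_tis or len(to_examine) < max_tis
        # Condition 4: re-querying only helps if this pass added a new filter.
        found_new_filters = (
            len(starved_pools) > num_starved_pools or len(starved_dags) > num_starved_dags
        )
        if is_done or not found_new_filters:
            break
    return executable


if __name__ == "__main__":
    tis = [ToyTI(i, "dag_a", "small", 11 - i) for i in range(1, 5)]  # priorities 10..7
    tis += [ToyTI(i, "dag_b", "big", 8 - i) for i in range(5, 8)]  # priorities 3..1
    result = select_executable(tis, {"small": 1, "big": 10}, {}, {}, max_tis=3)
    print([ti.id for ti in result])  # → [1, 5, 6]
```

In this example the highest-priority TIs all sit in a 1-slot `small` pool. A single pass (`max_loop_count=1`) returns only TI 1, because the other examined TIs just starve the pool; the second pass excludes the starved pool and fills the remaining slots from the `big` pool. Every examined TI is either selected or adds a starvation filter, which is why the next query never returns it again.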