cheery550 commented on issue #45636:
URL: https://github.com/apache/airflow/issues/45636#issuecomment-2893121069
Inspired by #49160, instead of querying `max_tis` candidate tasks, the goal of
`_executable_task_instances_to_queued` could be to pick `max_tis` ready tasks. We
could keep iterating the loop until one of these conditions is met:
1. the loop reached `max_loop_count`
2. `executable_tis` reached `max_tis`
3. `task_instances_to_examine` returned fewer than `max_tis` rows
4. no new filters were found
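The four stop conditions can be folded into a single predicate. This is only a sketch: the function name `should_stop` and its parameters are hypothetical, chosen for illustration, and do not exist in Airflow.

```python
def should_stop(
    loop_count: int,
    max_loop_count: int,
    num_executable: int,
    num_examined: int,
    max_tis: int,
    found_new_filters: bool,
) -> bool:
    """Hypothetical helper: True when any of the four proposed stop conditions holds."""
    return (
        loop_count >= max_loop_count  # 1. loop reached max_loop_count
        or num_executable >= max_tis  # 2. executable_tis reached max_tis
        or num_examined < max_tis  # 3. query returned fewer than max_tis rows, nothing left
        or not found_new_filters  # 4. this loop found no new starved pools/dags/tasks
    )
```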
```python
from sqlalchemy import tuple_

# max_loop_count guards against infinite loops; the default could be 5 or 10
max_loop_count = DEFAULT_MAX_ITER_LOOP_COUNT
pools = Pool.slots_stats(lock_rows=True, session=session)
# filter out starved pools before the loop queries
starved_pools = {pool_name for pool_name, stats in pools.items() if stats["open"] <= 0}
# the current concurrency_map contains:
#   dag_max_active_tasks_map
#   dag_run_active_tasks_map
#   task_concurrency_map
#   task_dagrun_concurrency_map
concurrency_map = ConcurrencyMap()
concurrency_map.load(session=session)
# filter out starved dags before the loop queries
dagmodels = DagModel.get_active_dagmodels(session=session)
starved_dags: set[str] = {
    dagmodel.dag_id
    for dagmodel in dagmodels
    if dagmodel.dag_id in concurrency_map.dag_max_active_tasks_map
    and dagmodel.max_active_tasks <= concurrency_map.dag_max_active_tasks_map[dagmodel.dag_id]
}
starved_tasks: set[tuple[str, str]] = set()
starved_tasks_task_dagrun_concurrency: set[tuple[str, str, str]] = set()
# accumulates across loops, so it must be initialized outside the loop
executable_tis: list[TI] = []
# in each loop, fill starved_xxx and filter tis by starved_xxx
for loop_count in range(1, max_loop_count + 1):
    # snapshot the filter sizes to detect whether this loop found new filters
    num_starved_pools = len(starved_pools)
    num_starved_dags = len(starved_dags)
    num_starved_tasks = len(starved_tasks)
    num_starved_tasks_task_dagrun_concurrency = len(starved_tasks_task_dagrun_concurrency)

    # ...previous code (builds `query`)...

    # filter out pools that have been starved
    if starved_pools:
        query = query.where(TI.pool.not_in(starved_pools))
    # filter out dags that have been starved
    if starved_dags:
        query = query.where(TI.dag_id.not_in(starved_dags))
    # filter out tasks that have been starved
    if starved_tasks:
        query = query.where(tuple_(TI.dag_id, TI.task_id).not_in(starved_tasks))
    if starved_tasks_task_dagrun_concurrency:
        query = query.where(
            tuple_(TI.dag_id, TI.run_id, TI.task_id).not_in(starved_tasks_task_dagrun_concurrency)
        )
    # filter out tis already picked as executable in a previous loop
    if executable_tis:
        query = query.where(TI.id.not_in({ti.id for ti in executable_tis}))

    # ...query task_instances_to_examine, fill starved_xxx, and append ready tis to executable_tis...

    is_done = len(executable_tis) >= max_tis or len(task_instances_to_examine) < max_tis
    found_new_filters = (
        len(starved_pools) > num_starved_pools
        or len(starved_dags) > num_starved_dags
        or len(starved_tasks) > num_starved_tasks
        or len(starved_tasks_task_dagrun_concurrency) > num_starved_tasks_task_dagrun_concurrency
    )
    if is_done or not found_new_filters:
        break
```
The key point of this solution is to keep iterating until `executable_tis`
reaches `max_tis` (instead of stopping as soon as at least one task instance is
found). Since the `starved_xxx` sets are filled in each loop, the next loop can
find additional ready tasks more efficiently by skipping starved and
already-examined task instances.
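To illustrate why the extra loops help, here is a self-contained, in-memory sketch under simplifying assumptions: `FakeTI` and `pick_ready_tis` are hypothetical names, only pool starvation is modeled (no DAG or task concurrency limits), and the "query" is a list filter plus a `max_tis` slice rather than SQL.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class FakeTI:
    """Hypothetical stand-in for a TaskInstance row (only the fields this sketch needs)."""

    ti_id: int
    dag_id: str
    pool: str


def pick_ready_tis(tis, pool_slots, max_tis, max_loop_count=5):
    """Hypothetical sketch: iteratively pick up to max_tis ready tis, skipping starved pools."""
    open_slots = dict(pool_slots)  # copy so the caller's dict is not mutated
    starved_pools = {pool for pool, slots in open_slots.items() if slots <= 0}
    executable: list[FakeTI] = []
    picked_ids: set[int] = set()
    # assumes max_loop_count >= 1, so loop_count is always bound after the loop
    for loop_count in range(1, max_loop_count + 1):
        num_starved_pools = len(starved_pools)
        # emulates the filtered query with LIMIT max_tis
        candidates = [
            ti for ti in tis if ti.pool not in starved_pools and ti.ti_id not in picked_ids
        ][:max_tis]
        for ti in candidates:
            if len(executable) >= max_tis:
                break
            if open_slots[ti.pool] <= 0:
                starved_pools.add(ti.pool)  # next loop will filter this pool out
                continue
            open_slots[ti.pool] -= 1
            executable.append(ti)
            picked_ids.add(ti.ti_id)
        is_done = len(executable) >= max_tis or len(candidates) < max_tis
        found_new_filters = len(starved_pools) > num_starved_pools
        if is_done or not found_new_filters:
            break
    return executable, loop_count


# pool "small" has one open slot; its tis come first in query order
tis = [FakeTI(i, "dag_a", "small") for i in range(5)] + [
    FakeTI(i, "dag_b", "default") for i in range(5, 10)
]
pool_slots = {"small": 1, "default": 10}

picked, loops = pick_ready_tis(tis, pool_slots, max_tis=4)
print([ti.ti_id for ti in picked], loops)  # [0, 5, 6, 7] 2

single, _ = pick_ready_tis(tis, pool_slots, max_tis=4, max_loop_count=1)
print([ti.ti_id for ti in single])  # [0]
```

With a single loop, the `max_tis` candidate rows are spent on the `small` pool and only one task is picked; the second loop filters that pool out and fills the remaining slots from `default`.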