Asquator commented on code in PR #53492:
URL: https://github.com/apache/airflow/pull/53492#discussion_r2250135977
##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -373,292 +377,171 @@ def _executable_task_instances_to_queued(self, max_tis:
int, session: Session) -
# If the pools are full, there is no point doing anything!
# If _somehow_ the pool is overfull, don't let the limit go negative -
it breaks SQL
pool_slots_free = sum(max(0, pool["open"]) for pool in pools.values())
+ starved_pools = {pool_name for pool_name, stats in pools.items() if
stats["open"] <= 0}
+ starved_tasks: set[tuple[str, str]] = set()
if pool_slots_free == 0:
self.log.debug("All pools are full!")
return []
- max_tis = min(max_tis, pool_slots_free)
-
- starved_pools = {pool_name for pool_name, stats in pools.items() if
stats["open"] <= 0}
-
- # dag_id to # of running tasks and (dag_id, task_id) to # of running
tasks.
- concurrency_map = ConcurrencyMap()
- concurrency_map.load(session=session)
+ priority_order = [-TI.priority_weight, DR.logical_date, TI.map_index]
- # Number of tasks that cannot be scheduled because of no open slot in
pool
- num_starving_tasks_total = 0
-
- # dag and task ids that can't be queued because of concurrency limits
- starved_dags: set[str] = set()
- starved_tasks: set[tuple[str, str]] = set()
- starved_tasks_task_dagrun_concurrency: set[tuple[str, str, str]] =
set()
-
- pool_num_starving_tasks: dict[str, int] = Counter()
+ query = (
+ select(TI)
+ .with_hint(TI, "USE INDEX (ti_state)", dialect_name="mysql")
+ .join(TI.dag_run)
+ .where(DR.state == DagRunState.RUNNING)
+ .join(TI.dag_model)
+ .where(~DM.is_paused)
+ .where(TI.state == TaskInstanceState.SCHEDULED)
+ .where(DM.bundle_name.is_not(None))
+ )
- for loop_count in itertools.count(start=1):
- num_starved_pools = len(starved_pools)
- num_starved_dags = len(starved_dags)
- num_starved_tasks = len(starved_tasks)
- num_starved_tasks_task_dagrun_concurrency =
len(starved_tasks_task_dagrun_concurrency)
+ @dataclass
+ class LimitWindowDescriptor:
Review Comment:
> I'd propose making this a top-level but private class rather than a class nested inside the class.
I think the dynamic query-building process should be moved outside the
scheduler, as it is not wise to rebuild the query from scratch on every
iteration (even if the interpreter can optimize it, which I doubt). The query
should instead be a constant imported from elsewhere, since it is a fairly
static object (after the injection of `max_tis`).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]