Asquator commented on code in PR #53492:
URL: https://github.com/apache/airflow/pull/53492#discussion_r2238900449
##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -373,292 +375,165 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session) -
         # If the pools are full, there is no point doing anything!
         # If _somehow_ the pool is overfull, don't let the limit go negative - it breaks SQL
         pool_slots_free = sum(max(0, pool["open"]) for pool in pools.values())
+        starved_pools = {pool_name for pool_name, stats in pools.items() if stats["open"] <= 0}
+        starved_tasks: set[tuple[str, str]] = set()

         if pool_slots_free == 0:
             self.log.debug("All pools are full!")
             return []

-        max_tis = min(max_tis, pool_slots_free)
-
-        starved_pools = {pool_name for pool_name, stats in pools.items() if stats["open"] <= 0}
-
-        # dag_id to # of running tasks and (dag_id, task_id) to # of running tasks.
-        concurrency_map = ConcurrencyMap()
-        concurrency_map.load(session=session)
+        priority_order = [-TI.priority_weight, DR.logical_date, TI.map_index]

-        # Number of tasks that cannot be scheduled because of no open slot in pool
-        num_starving_tasks_total = 0
-
-        # dag and task ids that can't be queued because of concurrency limits
-        starved_dags: set[str] = set()
-        starved_tasks: set[tuple[str, str]] = set()
-        starved_tasks_task_dagrun_concurrency: set[tuple[str, str, str]] = set()
-
-        pool_num_starving_tasks: dict[str, int] = Counter()
+        query = (
+            select(TI)
+            .with_hint(TI, "USE INDEX (ti_state)", dialect_name="mysql")
+            .join(TI.dag_run)
+            .where(DR.state == DagRunState.RUNNING)
+            .join(TI.dag_model)
+            .where(~DM.is_paused)
+            .where(TI.state == TaskInstanceState.SCHEDULED)
+            .where(DM.bundle_name.is_not(None))
+            .options(selectinload(TI.dag_model))
+        )

-        for loop_count in itertools.count(start=1):
-            num_starved_pools = len(starved_pools)
-            num_starved_dags = len(starved_dags)
-            num_starved_tasks = len(starved_tasks)
-            num_starved_tasks_task_dagrun_concurrency = len(starved_tasks_task_dagrun_concurrency)
+        @dataclass
+        class Limit:
+            running_now_join: Subquery
+            max_units: Column
+            window: expression.ColumnElement
+
+        def running_tasks_group(*group_fields):

Review Comment:
   Please add another suggestion to import `CTE` and I will commit them in batch
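   For reference, a minimal sketch of what the requested suggestion might look like; the exact import line and its placement within the existing import block of `scheduler_job_runner.py` are assumptions, since that part of the file is not visible in this hunk:

   ```python
   # Sketch only: assumes the CTE name is needed to annotate the new
   # CTE-based helpers (e.g. the result of running_tasks_group); where this
   # lands in the module's import block is a guess.
   from sqlalchemy.sql.expression import CTE
   ```

   `CTE` lives in `sqlalchemy.sql.expression` (and is re-exported at the top level in SQLAlchemy 2.0), alongside the `Subquery` and `ColumnElement` types the new `Limit` dataclass already references.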
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org