Asquator commented on code in PR #53492:
URL: https://github.com/apache/airflow/pull/53492#discussion_r2238900449


##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -373,292 +375,165 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session) -
         # If the pools are full, there is no point doing anything!
         # If _somehow_ the pool is overfull, don't let the limit go negative - it breaks SQL
         pool_slots_free = sum(max(0, pool["open"]) for pool in pools.values())
+        starved_pools = {pool_name for pool_name, stats in pools.items() if stats["open"] <= 0}
+        starved_tasks: set[tuple[str, str]] = set()
 
         if pool_slots_free == 0:
             self.log.debug("All pools are full!")
             return []
 
-        max_tis = min(max_tis, pool_slots_free)
-
-        starved_pools = {pool_name for pool_name, stats in pools.items() if stats["open"] <= 0}
-
-        # dag_id to # of running tasks and (dag_id, task_id) to # of running tasks.
-        concurrency_map = ConcurrencyMap()
-        concurrency_map.load(session=session)
+        priority_order = [-TI.priority_weight, DR.logical_date, TI.map_index]
 
-        # Number of tasks that cannot be scheduled because of no open slot in pool
-        num_starving_tasks_total = 0
-
-        # dag and task ids that can't be queued because of concurrency limits
-        starved_dags: set[str] = set()
-        starved_tasks: set[tuple[str, str]] = set()
-        starved_tasks_task_dagrun_concurrency: set[tuple[str, str, str]] = set()
-
-        pool_num_starving_tasks: dict[str, int] = Counter()
+        query = (
+            select(TI)
+            .with_hint(TI, "USE INDEX (ti_state)", dialect_name="mysql")
+            .join(TI.dag_run)
+            .where(DR.state == DagRunState.RUNNING)
+            .join(TI.dag_model)
+            .where(~DM.is_paused)
+            .where(TI.state == TaskInstanceState.SCHEDULED)
+            .where(DM.bundle_name.is_not(None))
+            .options(selectinload(TI.dag_model))
+        )
 
-        for loop_count in itertools.count(start=1):
-            num_starved_pools = len(starved_pools)
-            num_starved_dags = len(starved_dags)
-            num_starved_tasks = len(starved_tasks)
-            num_starved_tasks_task_dagrun_concurrency = len(starved_tasks_task_dagrun_concurrency)
+        @dataclass
+        class Limit:
+            running_now_join: Subquery
+            max_units: Column
+            window: expression.ColumnElement
+
+        def running_tasks_group(*group_fields):

Review Comment:
   Please add another suggestion to import `CTE` and I will commit them in one batch
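   For reference, a minimal sketch of the requested import, assuming the `CTE`
   referenced here is SQLAlchemy's CTE construct (importable from
   `sqlalchemy.sql.expression` in SQLAlchemy 1.4+), together with an illustrative
   grouped-count CTE of the shape the `running_tasks_group(...)` helper above
   suggests. The helper name and body below are hypothetical, not the PR's actual
   implementation:

   ```python
   from sqlalchemy import func, select
   from sqlalchemy.sql.expression import CTE  # the import the suggestion would add

   from airflow.models.taskinstance import TaskInstance as TI
   from airflow.utils.state import TaskInstanceState


   # Illustrative only: count running task instances per grouping key and
   # materialize the result as a CTE the main scheduling query can join against.
   def running_tasks_group_sketch(*group_fields) -> CTE:
       return (
           select(*group_fields, func.count().label("running"))
           .where(TI.state == TaskInstanceState.RUNNING)
           .group_by(*group_fields)
           .cte()
       )
   ```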



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
