ashb commented on code in PR #22483:
URL: https://github.com/apache/airflow/pull/22483#discussion_r847745645


##########
airflow/jobs/scheduler_job.py:
##########
@@ -355,141 +355,120 @@ def _executable_task_instances_to_queued(self, max_tis: 
int, session: Session =
                 "%s tasks up for execution:\n\t%s", 
len(task_instances_to_examine), task_instance_str
             )
 
-            pool_to_task_instances: DefaultDict[str, List[TI]] = 
defaultdict(list)
             for task_instance in task_instances_to_examine:
-                
pool_to_task_instances[task_instance.pool].append(task_instance)
+                pool_name = task_instance.pool
 
-            # Go through each pool, and queue up a task for execution if there 
are
-            # any open slots in the pool.
-
-            for pool, task_instances in pool_to_task_instances.items():
-                pool_name = pool
-                if pool not in pools:
-                    self.log.warning("Tasks using non-existent pool '%s' will 
not be scheduled", pool)
+                if pool_name not in pools:
+                    self.log.warning("Tasks using non-existent pool '%s' will 
not be scheduled", pool_name)
                     starved_pools.add(pool_name)
                     continue
 
-                pool_total = pools[pool]["total"]
-                open_slots = pools[pool]["open"]
+                pool_total = pools[pool_name]["total"]
+                open_slots = pools[pool_name]["open"]

Review Comment:
   ```python
                   pool_stats = pools.get(pool_name)
                   if not pool_stats:
                       self.log.warning("Tasks using non-existent pool '%s' 
will not be scheduled", pool_name)
                       starved_pools.add(pool_name)
                       continue
                   pool_total = pool_stats["total"]
                   open_slots = pool_stats["open"]
   ```



##########
airflow/jobs/scheduler_job.py:
##########
@@ -355,141 +355,120 @@ def _executable_task_instances_to_queued(self, max_tis: 
int, session: Session =
                 "%s tasks up for execution:\n\t%s", 
len(task_instances_to_examine), task_instance_str
             )
 
-            pool_to_task_instances: DefaultDict[str, List[TI]] = 
defaultdict(list)
             for task_instance in task_instances_to_examine:
-                
pool_to_task_instances[task_instance.pool].append(task_instance)
+                pool_name = task_instance.pool
 
-            # Go through each pool, and queue up a task for execution if there 
are
-            # any open slots in the pool.
-
-            for pool, task_instances in pool_to_task_instances.items():
-                pool_name = pool
-                if pool not in pools:
-                    self.log.warning("Tasks using non-existent pool '%s' will 
not be scheduled", pool)
+                if pool_name not in pools:
+                    self.log.warning("Tasks using non-existent pool '%s' will 
not be scheduled", pool_name)
                     starved_pools.add(pool_name)
                     continue
 
-                pool_total = pools[pool]["total"]
-                open_slots = pools[pool]["open"]
+                pool_total = pools[pool_name]["total"]
+                open_slots = pools[pool_name]["open"]
 
-                num_ready = len(task_instances)
-                self.log.info(
-                    "Figuring out tasks to run in Pool(name=%s) with %s open 
slots "
-                    "and %s task instances ready to be queued",
-                    pool,
-                    open_slots,
-                    num_ready,
-                )
-
-                priority_sorted_task_instances = sorted(
-                    task_instances, key=lambda ti: (-ti.priority_weight, 
ti.execution_date)
-                )
+                if open_slots <= 0:
+                    self.log.info(
+                        "Not scheduling since there are %s open slots in pool 
%s", open_slots, pool_name
+                    )
+                    # Can't schedule any more since there are no more open 
slots.
+                    pool_num_starving_tasks[pool_name] += 1
+                    num_starving_tasks_total += 1
+                    starved_pools.add(pool_name)
+                    continue
 
-                for current_index, task_instance in 
enumerate(priority_sorted_task_instances):
-                    if open_slots <= 0:
-                        self.log.info(
-                            "Not scheduling since there are %s open slots in 
pool %s", open_slots, pool
-                        )
-                        # Can't schedule any more since there are no more open 
slots.
-                        num_unhandled = len(priority_sorted_task_instances) - 
current_index
-                        pool_num_starving_tasks[pool_name] += num_unhandled
-                        num_starving_tasks_total += num_unhandled
-                        starved_pools.add(pool_name)
-                        break
-
-                    if task_instance.pool_slots > pool_total:
-                        self.log.warning(
-                            "Not executing %s. Requested pool slots (%s) are 
greater than "
-                            "total pool slots: '%s' for pool: %s.",
-                            task_instance,
-                            task_instance.pool_slots,
-                            pool_total,
-                            pool,
-                        )
+                if task_instance.pool_slots > pool_total:
+                    self.log.warning(
+                        "Not executing %s. Requested pool slots (%s) are 
greater than "
+                        "total pool slots: '%s' for pool: %s.",
+                        task_instance,
+                        task_instance.pool_slots,
+                        pool_total,
+                        pool_name,
+                    )
 
-                        starved_tasks.add((task_instance.dag_id, 
task_instance.task_id))
-                        continue
+                    starved_tasks.add((task_instance.dag_id, 
task_instance.task_id))
+                    continue
 
-                    if task_instance.pool_slots > open_slots:
-                        self.log.info(
-                            "Not executing %s since it requires %s slots "
-                            "but there are %s open slots in the pool %s.",
-                            task_instance,
-                            task_instance.pool_slots,
-                            open_slots,
-                            pool,
-                        )
-                        pool_num_starving_tasks[pool_name] += 1
-                        num_starving_tasks_total += 1
-                        starved_tasks.add((task_instance.dag_id, 
task_instance.task_id))
-                        # Though we can execute tasks with lower priority if 
there's enough room
-                        continue
+                if task_instance.pool_slots > open_slots:
+                    self.log.info(
+                        "Not executing %s since it requires %s slots "
+                        "but there are %s open slots in the pool %s.",
+                        task_instance,
+                        task_instance.pool_slots,
+                        open_slots,
+                        pool_name,
+                    )
+                    pool_num_starving_tasks[pool_name] += 1
+                    num_starving_tasks_total += 1
+                    starved_tasks.add((task_instance.dag_id, 
task_instance.task_id))
+                    # Though we can execute tasks with lower priority if 
there's enough room
+                    continue
 
-                    # Check to make sure that the task max_active_tasks of the 
DAG hasn't been
-                    # reached.
-                    dag_id = task_instance.dag_id
+                # Check to make sure that the task max_active_tasks of the DAG 
hasn't been
+                # reached.
+                dag_id = task_instance.dag_id
 
-                    current_active_tasks_per_dag = dag_active_tasks_map[dag_id]
-                    max_active_tasks_per_dag_limit = 
task_instance.dag_model.max_active_tasks
+                current_active_tasks_per_dag = dag_active_tasks_map[dag_id]
+                max_active_tasks_per_dag_limit = 
task_instance.dag_model.max_active_tasks
+                self.log.info(
+                    "DAG %s has %s/%s running and queued tasks",
+                    dag_id,
+                    current_active_tasks_per_dag,
+                    max_active_tasks_per_dag_limit,
+                )
+                if current_active_tasks_per_dag >= 
max_active_tasks_per_dag_limit:
                     self.log.info(
-                        "DAG %s has %s/%s running and queued tasks",
+                        "Not executing %s since the number of tasks running or 
queued "
+                        "from DAG %s is >= to the DAG's max_active_tasks limit 
of %s",
+                        task_instance,
                         dag_id,
-                        current_active_tasks_per_dag,
                         max_active_tasks_per_dag_limit,
                     )
-                    if current_active_tasks_per_dag >= 
max_active_tasks_per_dag_limit:
-                        self.log.info(
-                            "Not executing %s since the number of tasks 
running or queued "
-                            "from DAG %s is >= to the DAG's max_active_tasks 
limit of %s",
-                            task_instance,
+                    starved_dags.add(dag_id)
+                    continue
+
+                if task_instance.dag_model.has_task_concurrency_limits:
+                    # Many dags don't have a task_concurrency, so where we can 
avoid loading the full
+                    # serialized DAG the better.
+                    serialized_dag = self.dagbag.get_dag(dag_id, 
session=session)
+                    # If the dag is missing, fail the task and continue to the 
next task.
+                    if not serialized_dag:
+                        self.log.error(
+                            "DAG '%s' for task instance %s not found in 
serialized_dag table",
                             dag_id,
-                            max_active_tasks_per_dag_limit,
+                            task_instance,
+                        )
+                        session.query(TI).filter(TI.dag_id == dag_id, TI.state 
== State.SCHEDULED).update(
+                            {TI.state: State.FAILED}, 
synchronize_session='fetch'
                         )
-                        starved_dags.add(dag_id)
                         continue
 
-                    if task_instance.dag_model.has_task_concurrency_limits:
-                        # Many dags don't have a task_concurrency, so where we 
can avoid loading the full
-                        # serialized DAG the better.
-                        serialized_dag = self.dagbag.get_dag(dag_id, 
session=session)
-                        # If the dag is missing, fail the task and continue to 
the next task.
-                        if not serialized_dag:
-                            self.log.error(
-                                "DAG '%s' for task instance %s not found in 
serialized_dag table",
-                                dag_id,
+                    task_concurrency_limit: Optional[int] = None
+                    if serialized_dag.has_task(task_instance.task_id):
+                        task_concurrency_limit = serialized_dag.get_task(
+                            task_instance.task_id
+                        ).max_active_tis_per_dag
+
+                    if task_concurrency_limit is not None:
+                        current_task_concurrency = task_concurrency_map[
+                            (task_instance.dag_id, task_instance.task_id)
+                        ]
+
+                        if current_task_concurrency >= task_concurrency_limit:
+                            self.log.info(
+                                "Not executing %s since the task concurrency 
for"
+                                " this task has been reached.",
                                 task_instance,
                             )
-                            session.query(TI).filter(TI.dag_id == dag_id, 
TI.state == State.SCHEDULED).update(
-                                {TI.state: State.FAILED}, 
synchronize_session='fetch'
-                            )
+                            starved_tasks.add((task_instance.dag_id, 
task_instance.task_id))
                             continue
 
-                        task_concurrency_limit: Optional[int] = None
-                        if serialized_dag.has_task(task_instance.task_id):
-                            task_concurrency_limit = serialized_dag.get_task(
-                                task_instance.task_id
-                            ).max_active_tis_per_dag
-
-                        if task_concurrency_limit is not None:
-                            current_task_concurrency = task_concurrency_map[
-                                (task_instance.dag_id, task_instance.task_id)
-                            ]
-
-                            if current_task_concurrency >= 
task_concurrency_limit:
-                                self.log.info(
-                                    "Not executing %s since the task 
concurrency for"
-                                    " this task has been reached.",
-                                    task_instance,
-                                )
-                                starved_tasks.add((task_instance.dag_id, 
task_instance.task_id))
-                                continue
-
-                    executable_tis.append(task_instance)
-                    open_slots -= task_instance.pool_slots
-                    dag_active_tasks_map[dag_id] += 1
-                    task_concurrency_map[(task_instance.dag_id, 
task_instance.task_id)] += 1
-
-                pools[pool]["open"] = open_slots
+                executable_tis.append(task_instance)
+                open_slots -= task_instance.pool_slots
+                dag_active_tasks_map[dag_id] += 1
+                task_concurrency_map[(task_instance.dag_id, 
task_instance.task_id)] += 1
+
+                pools[pool_name]["open"] = open_slots

Review Comment:
   ```suggestion
                   pool_stats["open"] = open_slots
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to