ashb commented on code in PR #22483: URL: https://github.com/apache/airflow/pull/22483#discussion_r847745645
########## airflow/jobs/scheduler_job.py: ########## @@ -355,141 +355,120 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session = "%s tasks up for execution:\n\t%s", len(task_instances_to_examine), task_instance_str ) - pool_to_task_instances: DefaultDict[str, List[TI]] = defaultdict(list) for task_instance in task_instances_to_examine: - pool_to_task_instances[task_instance.pool].append(task_instance) + pool_name = task_instance.pool - # Go through each pool, and queue up a task for execution if there are - # any open slots in the pool. - - for pool, task_instances in pool_to_task_instances.items(): - pool_name = pool - if pool not in pools: - self.log.warning("Tasks using non-existent pool '%s' will not be scheduled", pool) + if pool_name not in pools: + self.log.warning("Tasks using non-existent pool '%s' will not be scheduled", pool_name) starved_pools.add(pool_name) continue - pool_total = pools[pool]["total"] - open_slots = pools[pool]["open"] + pool_total = pools[pool_name]["total"] + open_slots = pools[pool_name]["open"] Review Comment: ```python pool_stats = pools.get(pool_name) if not pool_stats: self.log.warning("Tasks using non-existent pool '%s' will not be scheduled", pool_name) starved_pools.add(pool_name) continue pool_total = pool_stats["total"] open_slots = pool_stats["open"] ``` ########## airflow/jobs/scheduler_job.py: ########## @@ -355,141 +355,120 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session = "%s tasks up for execution:\n\t%s", len(task_instances_to_examine), task_instance_str ) - pool_to_task_instances: DefaultDict[str, List[TI]] = defaultdict(list) for task_instance in task_instances_to_examine: - pool_to_task_instances[task_instance.pool].append(task_instance) + pool_name = task_instance.pool - # Go through each pool, and queue up a task for execution if there are - # any open slots in the pool. 
- - for pool, task_instances in pool_to_task_instances.items(): - pool_name = pool - if pool not in pools: - self.log.warning("Tasks using non-existent pool '%s' will not be scheduled", pool) + if pool_name not in pools: + self.log.warning("Tasks using non-existent pool '%s' will not be scheduled", pool_name) starved_pools.add(pool_name) continue - pool_total = pools[pool]["total"] - open_slots = pools[pool]["open"] + pool_total = pools[pool_name]["total"] + open_slots = pools[pool_name]["open"] - num_ready = len(task_instances) - self.log.info( - "Figuring out tasks to run in Pool(name=%s) with %s open slots " - "and %s task instances ready to be queued", - pool, - open_slots, - num_ready, - ) - - priority_sorted_task_instances = sorted( - task_instances, key=lambda ti: (-ti.priority_weight, ti.execution_date) - ) + if open_slots <= 0: + self.log.info( + "Not scheduling since there are %s open slots in pool %s", open_slots, pool_name + ) + # Can't schedule any more since there are no more open slots. + pool_num_starving_tasks[pool_name] += 1 + num_starving_tasks_total += 1 + starved_pools.add(pool_name) + continue - for current_index, task_instance in enumerate(priority_sorted_task_instances): - if open_slots <= 0: - self.log.info( - "Not scheduling since there are %s open slots in pool %s", open_slots, pool - ) - # Can't schedule any more since there are no more open slots. - num_unhandled = len(priority_sorted_task_instances) - current_index - pool_num_starving_tasks[pool_name] += num_unhandled - num_starving_tasks_total += num_unhandled - starved_pools.add(pool_name) - break - - if task_instance.pool_slots > pool_total: - self.log.warning( - "Not executing %s. Requested pool slots (%s) are greater than " - "total pool slots: '%s' for pool: %s.", - task_instance, - task_instance.pool_slots, - pool_total, - pool, - ) + if task_instance.pool_slots > pool_total: + self.log.warning( + "Not executing %s. 
Requested pool slots (%s) are greater than " + "total pool slots: '%s' for pool: %s.", + task_instance, + task_instance.pool_slots, + pool_total, + pool_name, + ) - starved_tasks.add((task_instance.dag_id, task_instance.task_id)) - continue + starved_tasks.add((task_instance.dag_id, task_instance.task_id)) + continue - if task_instance.pool_slots > open_slots: - self.log.info( - "Not executing %s since it requires %s slots " - "but there are %s open slots in the pool %s.", - task_instance, - task_instance.pool_slots, - open_slots, - pool, - ) - pool_num_starving_tasks[pool_name] += 1 - num_starving_tasks_total += 1 - starved_tasks.add((task_instance.dag_id, task_instance.task_id)) - # Though we can execute tasks with lower priority if there's enough room - continue + if task_instance.pool_slots > open_slots: + self.log.info( + "Not executing %s since it requires %s slots " + "but there are %s open slots in the pool %s.", + task_instance, + task_instance.pool_slots, + open_slots, + pool_name, + ) + pool_num_starving_tasks[pool_name] += 1 + num_starving_tasks_total += 1 + starved_tasks.add((task_instance.dag_id, task_instance.task_id)) + # Though we can execute tasks with lower priority if there's enough room + continue - # Check to make sure that the task max_active_tasks of the DAG hasn't been - # reached. - dag_id = task_instance.dag_id + # Check to make sure that the task max_active_tasks of the DAG hasn't been + # reached. 
+ dag_id = task_instance.dag_id - current_active_tasks_per_dag = dag_active_tasks_map[dag_id] - max_active_tasks_per_dag_limit = task_instance.dag_model.max_active_tasks + current_active_tasks_per_dag = dag_active_tasks_map[dag_id] + max_active_tasks_per_dag_limit = task_instance.dag_model.max_active_tasks + self.log.info( + "DAG %s has %s/%s running and queued tasks", + dag_id, + current_active_tasks_per_dag, + max_active_tasks_per_dag_limit, + ) + if current_active_tasks_per_dag >= max_active_tasks_per_dag_limit: self.log.info( - "DAG %s has %s/%s running and queued tasks", + "Not executing %s since the number of tasks running or queued " + "from DAG %s is >= to the DAG's max_active_tasks limit of %s", + task_instance, dag_id, - current_active_tasks_per_dag, max_active_tasks_per_dag_limit, ) - if current_active_tasks_per_dag >= max_active_tasks_per_dag_limit: - self.log.info( - "Not executing %s since the number of tasks running or queued " - "from DAG %s is >= to the DAG's max_active_tasks limit of %s", - task_instance, + starved_dags.add(dag_id) + continue + + if task_instance.dag_model.has_task_concurrency_limits: + # Many dags don't have a task_concurrency, so where we can avoid loading the full + # serialized DAG the better. + serialized_dag = self.dagbag.get_dag(dag_id, session=session) + # If the dag is missing, fail the task and continue to the next task. + if not serialized_dag: + self.log.error( + "DAG '%s' for task instance %s not found in serialized_dag table", dag_id, - max_active_tasks_per_dag_limit, + task_instance, + ) + session.query(TI).filter(TI.dag_id == dag_id, TI.state == State.SCHEDULED).update( + {TI.state: State.FAILED}, synchronize_session='fetch' ) - starved_dags.add(dag_id) continue - if task_instance.dag_model.has_task_concurrency_limits: - # Many dags don't have a task_concurrency, so where we can avoid loading the full - # serialized DAG the better. 
- serialized_dag = self.dagbag.get_dag(dag_id, session=session) - # If the dag is missing, fail the task and continue to the next task. - if not serialized_dag: - self.log.error( - "DAG '%s' for task instance %s not found in serialized_dag table", - dag_id, + task_concurrency_limit: Optional[int] = None + if serialized_dag.has_task(task_instance.task_id): + task_concurrency_limit = serialized_dag.get_task( + task_instance.task_id + ).max_active_tis_per_dag + + if task_concurrency_limit is not None: + current_task_concurrency = task_concurrency_map[ + (task_instance.dag_id, task_instance.task_id) + ] + + if current_task_concurrency >= task_concurrency_limit: + self.log.info( + "Not executing %s since the task concurrency for" + " this task has been reached.", task_instance, ) - session.query(TI).filter(TI.dag_id == dag_id, TI.state == State.SCHEDULED).update( - {TI.state: State.FAILED}, synchronize_session='fetch' - ) + starved_tasks.add((task_instance.dag_id, task_instance.task_id)) continue - task_concurrency_limit: Optional[int] = None - if serialized_dag.has_task(task_instance.task_id): - task_concurrency_limit = serialized_dag.get_task( - task_instance.task_id - ).max_active_tis_per_dag - - if task_concurrency_limit is not None: - current_task_concurrency = task_concurrency_map[ - (task_instance.dag_id, task_instance.task_id) - ] - - if current_task_concurrency >= task_concurrency_limit: - self.log.info( - "Not executing %s since the task concurrency for" - " this task has been reached.", - task_instance, - ) - starved_tasks.add((task_instance.dag_id, task_instance.task_id)) - continue - - executable_tis.append(task_instance) - open_slots -= task_instance.pool_slots - dag_active_tasks_map[dag_id] += 1 - task_concurrency_map[(task_instance.dag_id, task_instance.task_id)] += 1 - - pools[pool]["open"] = open_slots + executable_tis.append(task_instance) + open_slots -= task_instance.pool_slots + dag_active_tasks_map[dag_id] += 1 + 
task_concurrency_map[(task_instance.dag_id, task_instance.task_id)] += 1 + + pools[pool_name]["open"] = open_slots Review Comment: ```suggestion pool_stats["open"] = open_slots ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org