syedahsn commented on code in PR #40017: URL: https://github.com/apache/airflow/pull/40017#discussion_r1628474628
##########
airflow/jobs/scheduler_job_runner.py:
##########
@@ -1844,6 +1895,58 @@ def _executor_to_tis(self, tis: list[TaskInstance]) -> dict[BaseExecutor, list[T
                 executor = str(ti.executor)
             else:
                 executor = None
-            _executor_to_tis[ExecutorLoader.load_executor(executor)].append(ti)
+
+            if executor_obj := self._try_to_load_executor(executor):
+                _executor_to_tis[executor_obj].append(ti)
+            else:
+                continue

         return _executor_to_tis
+
+    def _slots_free_for_tis(self, tis: list[TaskInstance]) -> set[TaskInstance]:
+        """Return TIs that we have slots available for."""
+        # First get a mapping of executor names to slots they have available
+        executor_to_slots_available: dict[ExecutorName, int] = {}
+        for executor in self.job.executors:
+            # All executors should have a name if they are initted from the executor_loader. But we need to
+            # check for None to make mypy happy.
+            if executor.name:
+                executor_to_slots_available[executor.name] = executor.slots_available
+            else:
+                raise AirflowException(f"Executor {executor} did not have a `name` field configured!")
+
+        # Loop through all the TIs we're scheduling for and add them to a set to be moved to queued if the
+        # executor they're assigned to has slots available.
+        tis_we_have_room_for = set()
+        for ti in tis:
+            if ti.executor:
+                executor = str(ti.executor)
+            else:
+                executor = None
+            if executor_obj := self._try_to_load_executor(executor):
+                ti_exec_name = executor_obj.name
+            else:
+                continue
+
+            if ti_exec_name:
+                if executor_to_slots_available[ti_exec_name] > 0:
+                    tis_we_have_room_for.add(ti)
+                    executor_to_slots_available[ti_exec_name] -= 1
+
+        return tis_we_have_room_for
+
+    def _try_to_load_executor(self, executor: str | None) -> BaseExecutor | None:
+        """Try to load the given executor.
+
+        In this context, we don't want to fail if the executor does not exist. Catch the exception and
+        log to the user.
+        """
+        try:
+            return ExecutorLoader.load_executor(executor)
+        except AirflowException as e:
+            if "Unknown executor" in str(e):
+                self.log.warning("Executor, {executor}, was not found but a Task was configured to use it")

Review Comment:
   This isn't an f-string, so the message will not use the `executor` variable here.

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org