syedahsn commented on code in PR #40017:
URL: https://github.com/apache/airflow/pull/40017#discussion_r1628474628


##########
airflow/jobs/scheduler_job_runner.py:
##########
@@ -1844,6 +1895,58 @@ def _executor_to_tis(self, tis: list[TaskInstance]) -> 
dict[BaseExecutor, list[T
                 executor = str(ti.executor)
             else:
                 executor = None
-            _executor_to_tis[ExecutorLoader.load_executor(executor)].append(ti)
+
+            if executor_obj := self._try_to_load_executor(executor):
+                _executor_to_tis[executor_obj].append(ti)
+            else:
+                continue
 
         return _executor_to_tis
+
+    def _slots_free_for_tis(self, tis: list[TaskInstance]) -> 
set[TaskInstance]:
+        """Return TIs that we have slots available for."""
+        # First get a mapping of executor names to slots they have available
+        executor_to_slots_available: dict[ExecutorName, int] = {}
+        for executor in self.job.executors:
+            # All executors should have a name if they are initted from the 
executor_loader. But we need to
+            # check for None to make mypy happy.
+            if executor.name:
+                executor_to_slots_available[executor.name] = 
executor.slots_available
+            else:
+                raise AirflowException(f"Executor {executor} did not have a 
`name` field configured!")
+
+        # Loop through all the TIs we're scheduling for and add them to a set 
to be moved to queued if the
+        # executor they're assigned to has slots available.
+        tis_we_have_room_for = set()
+        for ti in tis:
+            if ti.executor:
+                executor = str(ti.executor)
+            else:
+                executor = None
+            if executor_obj := self._try_to_load_executor(executor):
+                ti_exec_name = executor_obj.name
+            else:
+                continue
+
+            if ti_exec_name:
+                if executor_to_slots_available[ti_exec_name] > 0:
+                    tis_we_have_room_for.add(ti)
+                    executor_to_slots_available[ti_exec_name] -= 1
+
+        return tis_we_have_room_for
+
+    def _try_to_load_executor(self, executor: str | None) -> BaseExecutor | 
None:
+        """Try to load the given executor.
+
+        In this context, we don't want to fail if the executor does not exist. 
Catch the exception and
+        log to the user.
+        """
+        try:
+            return ExecutorLoader.load_executor(executor)
+        except AirflowException as e:
+            if "Unknown executor" in str(e):
+                self.log.warning("Executor, {executor}, was not found but a 
Task was configured to use it")

Review Comment:
   This string is missing the `f` prefix, so `{executor}` will be logged 
   literally rather than replaced with the value of the `executor` variable.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to