This is an automated email from the ASF dual-hosted git repository.

onikolas pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 07c40bd78a Support failing tasks stuck in queued for hybrid executors (#39624)
07c40bd78a is described below

commit 07c40bd78a05bb4e1e8ee03d885006ac7a44e21d
Author: Niko Oliveira <oniko...@amazon.com>
AuthorDate: Wed May 15 13:34:17 2024 -0700

    Support failing tasks stuck in queued for hybrid executors (#39624)
    
    Sort the set of tasks that are up for failing for being queued too long
    and send them to the appropriate executor for cleanup.
---
 airflow/jobs/scheduler_job_runner.py | 30 ++++++++++++++---------------
 tests/jobs/test_scheduler_job.py     | 37 ++++++++++++++++++++++++++----------
 2 files changed, 41 insertions(+), 26 deletions(-)
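
The commit message's "send them to the appropriate executor" step relies on the
_executor_to_tis helper used in the scheduler diff below; its body is not part of
this patch. A minimal sketch of what such a grouping helper could look like,
assuming ti.executor holds the task's executor name (None for the default) and
that ExecutorLoader.load_executor resolves that name to an executor instance:

    # Hypothetical sketch only; the real _executor_to_tis is not shown in this patch.
    from collections import defaultdict

    from airflow.executors.executor_loader import ExecutorLoader

    def _executor_to_tis(self, tis):
        """Group task instances by the executor meant to handle them."""
        grouped = defaultdict(list)
        for ti in tis:
            # ti.executor is None when the task uses the default executor
            grouped[ExecutorLoader.load_executor(ti.executor)].append(ti)
        return dict(grouped)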

diff --git a/airflow/jobs/scheduler_job_runner.py b/airflow/jobs/scheduler_job_runner.py
index f2333e8d5a..3d81aaeb56 100644
--- a/airflow/jobs/scheduler_job_runner.py
+++ b/airflow/jobs/scheduler_job_runner.py
@@ -1566,22 +1566,20 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
                 TI.queued_by_job_id == self.job.id,
             )
         ).all()
-        try:
-            cleaned_up_task_instances = self.job.executor.cleanup_stuck_queued_tasks(
-                tis=tasks_stuck_in_queued
-            )
-            cleaned_up_task_instances = set(cleaned_up_task_instances)
-            for ti in tasks_stuck_in_queued:
-                if repr(ti) in cleaned_up_task_instances:
-                    self._task_context_logger.warning(
-                        "Marking task instance %s stuck in queued as failed. "
-                        "If the task instance has available retries, it will 
be retried.",
-                        ti,
-                        ti=ti,
-                    )
-        except NotImplementedError:
-            self.log.debug("Executor doesn't support cleanup of stuck queued tasks. Skipping.")
-            ...
+
+        for executor, stuck_tis in self._executor_to_tis(tasks_stuck_in_queued).items():
+            try:
+                cleaned_up_task_instances = set(executor.cleanup_stuck_queued_tasks(tis=stuck_tis))
+                for ti in stuck_tis:
+                    if repr(ti) in cleaned_up_task_instances:
+                        self._task_context_logger.warning(
+                            "Marking task instance %s stuck in queued as 
failed. "
+                            "If the task instance has available retries, it 
will be retried.",
+                            ti,
+                            ti=ti,
+                        )
+            except NotImplementedError:
+                self.log.debug("Executor doesn't support cleanup of stuck queued tasks. Skipping.")
 
     @provide_session
     def _emit_pool_metrics(self, session: Session = NEW_SESSION) -> None:
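
The new loop relies on each executor's cleanup_stuck_queued_tasks returning the
repr() of every task instance it actually cleaned up, and raising
NotImplementedError when cleanup is unsupported. A hedged sketch of an executor
honoring that contract (the class and its body are illustrative, not from this
patch; only the method name, argument, and return/raise behavior come from the
code above):

    from airflow.executors.base_executor import BaseExecutor

    class ExampleExecutor(BaseExecutor):
        def cleanup_stuck_queued_tasks(self, tis):
            # Executor-specific logic would fail/kill each stuck work item;
            # the scheduler matches the returned strings against repr(ti).
            cleaned = []
            for ti in tis:
                cleaned.append(repr(ti))
            return cleaned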
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 85399892ac..9fcdba5ac5 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -1809,24 +1809,41 @@ class TestSchedulerJob:
         # Second executor called for ti3
         mock_executors[1].try_adopt_task_instances.assert_called_once_with([ti3])
 
-    def test_fail_stuck_queued_tasks(self, dag_maker, session):
-        with dag_maker("test_fail_stuck_queued_tasks"):
+    def test_fail_stuck_queued_tasks(self, dag_maker, session, mock_executors):
+        with dag_maker("test_fail_stuck_queued_tasks_multiple_executors"):
             op1 = EmptyOperator(task_id="op1")
+            op2 = EmptyOperator(task_id="op2", executor="default_exec")
+            op3 = EmptyOperator(task_id="op3", executor="secondary_exec")
 
         dr = dag_maker.create_dagrun()
-        ti = dr.get_task_instance(task_id=op1.task_id, session=session)
-        ti.state = State.QUEUED
-        ti.queued_dttm = timezone.utcnow() - timedelta(minutes=15)
+        ti1 = dr.get_task_instance(task_id=op1.task_id, session=session)
+        ti2 = dr.get_task_instance(task_id=op2.task_id, session=session)
+        ti3 = dr.get_task_instance(task_id=op3.task_id, session=session)
+        for ti in [ti1, ti2, ti3]:
+            ti.state = State.QUEUED
+            ti.queued_dttm = timezone.utcnow() - timedelta(minutes=15)
         session.commit()
-        executor = MagicMock()
-        executor.cleanup_stuck_queued_tasks = mock.MagicMock()
-        scheduler_job = Job(executor=executor)
+        scheduler_job = Job()
         job_runner = SchedulerJobRunner(job=scheduler_job, num_runs=0)
         job_runner._task_queued_timeout = 300
 
-        job_runner._fail_tasks_stuck_in_queued()
+        with mock.patch("airflow.executors.executor_loader.ExecutorLoader.load_executor") as loader_mock:
+            # The executors are mocked, so cannot be loaded/imported. Mock load_executor and return the
+            # correct object for the given input executor name.
+            loader_mock.side_effect = lambda *x: {
+                ("default_exec",): mock_executors[0],
+                (None,): mock_executors[0],
+                ("secondary_exec",): mock_executors[1],
+            }[x]
+            job_runner._fail_tasks_stuck_in_queued()
 
-        job_runner.job.executor.cleanup_stuck_queued_tasks.assert_called_once()
+        # Default executor is called for ti1 (no explicit executor override uses default) and ti2 (where we
+        # explicitly marked that for execution by the default executor)
+        try:
+            mock_executors[0].cleanup_stuck_queued_tasks.assert_called_once_with(tis=[ti1, ti2])
+        except AssertionError:
+            mock_executors[0].cleanup_stuck_queued_tasks.assert_called_once_with(tis=[ti2, ti1])
+        mock_executors[1].cleanup_stuck_queued_tasks.assert_called_once_with(tis=[ti3])
 
     def test_fail_stuck_queued_tasks_raises_not_implemented(self, dag_maker, session, caplog):
         with dag_maker("test_fail_stuck_queued_tasks"):
