This is an automated email from the ASF dual-hosted git repository.

jhtimmins pushed a commit to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit c6d09edf7b77ac4becc7654465c535571f88babf
Author: Ephraim Anierobi <splendidzig...@gmail.com>
AuthorDate: Thu May 20 11:22:01 2021 +0100

    Fail tasks in scheduler when executor reports they failed (#15929)
    
    When a task fails in the executor while it is still queued in the scheduler,
    the executor reports this failure, but the scheduler doesn't change the task
    state, so the task stays queued until the scheduler is restarted. This commit
    fixes it by ensuring that when a task is reported to have failed in the
    executor, the task is also failed in the scheduler.
    
    (cherry picked from commit deececcabc080844ca89272a2e4ab1183cd51e3f)
---
 airflow/jobs/scheduler_job.py    | 4 +++-
 tests/jobs/test_scheduler_job.py | 2 +-
 2 files changed, 4 insertions(+), 2 deletions(-)

diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index b99f4b2..1758ae1 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -1252,12 +1252,14 @@ class SchedulerJob(BaseJob):
                     "task says its %s. (Info: %s) Was the task killed 
externally?"
                 )
                 self.log.error(msg, ti, state, ti.state, info)
+
                 request = TaskCallbackRequest(
                     full_filepath=ti.dag_model.fileloc,
                     simple_task_instance=SimpleTaskInstance(ti),
                     msg=msg % (ti, state, ti.state, info),
                 )
-
+                self.log.info('Setting task instance %s state to %s as 
reported by executor', ti, state)
+                ti.set_state(state)
                 self.processor_agent.send_callback_to_execute(request)
 
         return len(event_buffer)
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 0d1f530..37ae65b 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -907,7 +907,7 @@ class TestSchedulerJob(unittest.TestCase):
 
         self.scheduler_job._process_executor_events(session=session)
         ti1.refresh_from_db()
-        assert ti1.state == State.QUEUED
+        assert ti1.state == State.FAILED
         mock_task_callback.assert_called_once_with(
             full_filepath='/test_path1/',
             simple_task_instance=mock.ANY,

Reply via email to