This is an automated email from the ASF dual-hosted git repository. jhtimmins pushed a commit to branch v2-1-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit c6d09edf7b77ac4becc7654465c535571f88babf Author: Ephraim Anierobi <splendidzig...@gmail.com> AuthorDate: Thu May 20 11:22:01 2021 +0100 Fail tasks in scheduler when executor reports they failed (#15929) When a task fails in the executor while it is still queued in the scheduler, the executor reports this failure, but the scheduler does not change the task state, so the task remains queued until the scheduler is restarted. This commit fixes it by ensuring that when a task is reported to have failed in the executor, the task is also failed in the scheduler. (cherry picked from commit deececcabc080844ca89272a2e4ab1183cd51e3f) --- airflow/jobs/scheduler_job.py | 4 +++- tests/jobs/test_scheduler_job.py | 2 +- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py index b99f4b2..1758ae1 100644 --- a/airflow/jobs/scheduler_job.py +++ b/airflow/jobs/scheduler_job.py @@ -1252,12 +1252,14 @@ class SchedulerJob(BaseJob): "task says its %s. (Info: %s) Was the task killed externally?" ) self.log.error(msg, ti, state, ti.state, info) + request = TaskCallbackRequest( full_filepath=ti.dag_model.fileloc, simple_task_instance=SimpleTaskInstance(ti), msg=msg % (ti, state, ti.state, info), ) - + self.log.info('Setting task instance %s state to %s as reported by executor', ti, state) + ti.set_state(state) self.processor_agent.send_callback_to_execute(request) return len(event_buffer) diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py index 0d1f530..37ae65b 100644 --- a/tests/jobs/test_scheduler_job.py +++ b/tests/jobs/test_scheduler_job.py @@ -907,7 +907,7 @@ class TestSchedulerJob(unittest.TestCase): self.scheduler_job._process_executor_events(session=session) ti1.refresh_from_db() - assert ti1.state == State.QUEUED + assert ti1.state == State.FAILED mock_task_callback.assert_called_once_with( full_filepath='/test_path1/', simple_task_instance=mock.ANY,