This is an automated email from the ASF dual-hosted git repository. ephraimanierobi pushed a commit to branch v2-8-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 9798f314ddc23ab129d8e52988eab08251e755c7
Author: Hussein Awala <huss...@awala.fr>
AuthorDate: Tue Nov 28 20:14:13 2023 +0200

    Consolidate the call of change_state to fail or success in the core executors (#35901)

    (cherry picked from commit ce7f043e15534c3d9ba6d59c3bb6b851e36a60b9)
---
 airflow/executors/debug_executor.py      | 6 +++---
 airflow/executors/sequential_executor.py | 5 ++---
 tests/executors/test_debug_executor.py   | 4 ++--
 3 files changed, 7 insertions(+), 8 deletions(-)

diff --git a/airflow/executors/debug_executor.py b/airflow/executors/debug_executor.py
index b601c2b7c9..bb5f46b1f7 100644
--- a/airflow/executors/debug_executor.py
+++ b/airflow/executors/debug_executor.py
@@ -74,7 +74,7 @@ class DebugExecutor(BaseExecutor):
             elif self._terminated.is_set():
                 self.log.info("Executor is terminated! Stopping %s to %s", ti.key, TaskInstanceState.FAILED)
                 ti.set_state(TaskInstanceState.FAILED)
-                self.change_state(ti.key, TaskInstanceState.FAILED)
+                self.fail(ti.key)
             else:
                 task_succeeded = self._run_task(ti)
 
@@ -84,11 +84,11 @@ class DebugExecutor(BaseExecutor):
         try:
             params = self.tasks_params.pop(ti.key, {})
             ti.run(job_id=ti.job_id, **params)
-            self.change_state(key, TaskInstanceState.SUCCESS)
+            self.success(key)
             return True
         except Exception as e:
             ti.set_state(TaskInstanceState.FAILED)
-            self.change_state(key, TaskInstanceState.FAILED)
+            self.fail(key)
             self.log.exception("Failed to execute task: %s.", e)
             return False
 
diff --git a/airflow/executors/sequential_executor.py b/airflow/executors/sequential_executor.py
index 8ea3e42dc5..227bf879f3 100644
--- a/airflow/executors/sequential_executor.py
+++ b/airflow/executors/sequential_executor.py
@@ -28,7 +28,6 @@ import subprocess
 from typing import TYPE_CHECKING, Any
 
 from airflow.executors.base_executor import BaseExecutor
-from airflow.utils.state import TaskInstanceState
 
 if TYPE_CHECKING:
     from airflow.executors.base_executor import CommandType
@@ -75,9 +74,9 @@ class SequentialExecutor(BaseExecutor):
 
             try:
                 subprocess.check_call(command, close_fds=True)
-                self.change_state(key, TaskInstanceState.SUCCESS)
+                self.success(key)
             except subprocess.CalledProcessError as e:
-                self.change_state(key, TaskInstanceState.FAILED)
+                self.fail(key)
                 self.log.error("Failed to execute task %s.", e)
 
         self.commands_to_run = []
diff --git a/tests/executors/test_debug_executor.py b/tests/executors/test_debug_executor.py
index 03a91f9c92..20ee821842 100644
--- a/tests/executors/test_debug_executor.py
+++ b/tests/executors/test_debug_executor.py
@@ -111,7 +111,7 @@ class TestDebugExecutor:
         assert not executor.tasks_to_run
         change_state_mock.assert_has_calls(
             [
-                mock.call(ti1.key, State.FAILED),
+                mock.call(ti1.key, State.FAILED, None),
                 mock.call(ti2.key, State.UPSTREAM_FAILED),
             ]
         )
@@ -145,6 +145,6 @@ class TestDebugExecutor:
 
         change_state_mock.assert_has_calls(
             [
-                mock.call(ti1.key, State.FAILED),
+                mock.call(ti1.key, State.FAILED, None),
             ]
         )