This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-8-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 9798f314ddc23ab129d8e52988eab08251e755c7
Author: Hussein Awala <huss...@awala.fr>
AuthorDate: Tue Nov 28 20:14:13 2023 +0200

    Consolidate the call of change_state to fail or success in the core 
executors (#35901)
    
    (cherry picked from commit ce7f043e15534c3d9ba6d59c3bb6b851e36a60b9)
---
 airflow/executors/debug_executor.py      | 6 +++---
 airflow/executors/sequential_executor.py | 5 ++---
 tests/executors/test_debug_executor.py   | 4 ++--
 3 files changed, 7 insertions(+), 8 deletions(-)

diff --git a/airflow/executors/debug_executor.py 
b/airflow/executors/debug_executor.py
index b601c2b7c9..bb5f46b1f7 100644
--- a/airflow/executors/debug_executor.py
+++ b/airflow/executors/debug_executor.py
@@ -74,7 +74,7 @@ class DebugExecutor(BaseExecutor):
             elif self._terminated.is_set():
                 self.log.info("Executor is terminated! Stopping %s to %s", 
ti.key, TaskInstanceState.FAILED)
                 ti.set_state(TaskInstanceState.FAILED)
-                self.change_state(ti.key, TaskInstanceState.FAILED)
+                self.fail(ti.key)
             else:
                 task_succeeded = self._run_task(ti)
 
@@ -84,11 +84,11 @@ class DebugExecutor(BaseExecutor):
         try:
             params = self.tasks_params.pop(ti.key, {})
             ti.run(job_id=ti.job_id, **params)
-            self.change_state(key, TaskInstanceState.SUCCESS)
+            self.success(key)
             return True
         except Exception as e:
             ti.set_state(TaskInstanceState.FAILED)
-            self.change_state(key, TaskInstanceState.FAILED)
+            self.fail(key)
             self.log.exception("Failed to execute task: %s.", e)
             return False
 
diff --git a/airflow/executors/sequential_executor.py 
b/airflow/executors/sequential_executor.py
index 8ea3e42dc5..227bf879f3 100644
--- a/airflow/executors/sequential_executor.py
+++ b/airflow/executors/sequential_executor.py
@@ -28,7 +28,6 @@ import subprocess
 from typing import TYPE_CHECKING, Any
 
 from airflow.executors.base_executor import BaseExecutor
-from airflow.utils.state import TaskInstanceState
 
 if TYPE_CHECKING:
     from airflow.executors.base_executor import CommandType
@@ -75,9 +74,9 @@ class SequentialExecutor(BaseExecutor):
 
             try:
                 subprocess.check_call(command, close_fds=True)
-                self.change_state(key, TaskInstanceState.SUCCESS)
+                self.success(key)
             except subprocess.CalledProcessError as e:
-                self.change_state(key, TaskInstanceState.FAILED)
+                self.fail(key)
                 self.log.error("Failed to execute task %s.", e)
 
         self.commands_to_run = []
diff --git a/tests/executors/test_debug_executor.py 
b/tests/executors/test_debug_executor.py
index 03a91f9c92..20ee821842 100644
--- a/tests/executors/test_debug_executor.py
+++ b/tests/executors/test_debug_executor.py
@@ -111,7 +111,7 @@ class TestDebugExecutor:
         assert not executor.tasks_to_run
         change_state_mock.assert_has_calls(
             [
-                mock.call(ti1.key, State.FAILED),
+                mock.call(ti1.key, State.FAILED, None),
                 mock.call(ti2.key, State.UPSTREAM_FAILED),
             ]
         )
@@ -145,6 +145,6 @@ class TestDebugExecutor:
 
         change_state_mock.assert_has_calls(
             [
-                mock.call(ti1.key, State.FAILED),
+                mock.call(ti1.key, State.FAILED, None),
             ]
         )

Reply via email to