This is an automated email from the ASF dual-hosted git repository.

eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 9f17981cab3 Fix `RayJobBaseOperator` polling to recognize STOPPED as terminal status (#64206)
9f17981cab3 is described below

commit 9f17981cab37b8a8a66991cf68cf2963de9f2fa1
Author: Elad Kalif <[email protected]>
AuthorDate: Wed Mar 25 12:45:02 2026 +0200

    Fix `RayJobBaseOperator` polling to recognize STOPPED as terminal status (#64206)
---
 .../google/src/airflow/providers/google/cloud/operators/ray.py |  2 +-
 providers/google/tests/unit/google/cloud/operators/test_ray.py | 10 +++++++---
 2 files changed, 8 insertions(+), 4 deletions(-)

diff --git a/providers/google/src/airflow/providers/google/cloud/operators/ray.py b/providers/google/src/airflow/providers/google/cloud/operators/ray.py
index c243c7d8e26..4a907e8170b 100644
--- a/providers/google/src/airflow/providers/google/cloud/operators/ray.py
+++ b/providers/google/src/airflow/providers/google/cloud/operators/ray.py
@@ -36,7 +36,7 @@ if TYPE_CHECKING:
     from airflow.providers.common.compat.sdk import Context
 
 
-TERMINAL_STATUSES = {JobStatus.SUCCEEDED.value, JobStatus.FAILED.value}
+TERMINAL_STATUSES = {JobStatus.SUCCEEDED.value, JobStatus.FAILED.value, JobStatus.STOPPED.value}
 
 
 class OperationFailedException(Exception):
diff --git a/providers/google/tests/unit/google/cloud/operators/test_ray.py b/providers/google/tests/unit/google/cloud/operators/test_ray.py
index 5d8aee1f172..04667142e38 100644
--- a/providers/google/tests/unit/google/cloud/operators/test_ray.py
+++ b/providers/google/tests/unit/google/cloud/operators/test_ray.py
@@ -165,9 +165,13 @@ class TestRaySubmitJobOperator:
             },
         )
 
+    @pytest.mark.parametrize(
+        "terminal_status",
+        [JobStatus.SUCCEEDED, JobStatus.FAILED, JobStatus.STOPPED],
+    )
     @mock.patch(RAY_OP_PATH.format("time.sleep"))
     @mock.patch(RAY_OP_PATH.format("RayJobHook"))
-    def test_check_job_status_reaches_terminal(self, mock_hook_cls, mock_sleep):
+    def test_check_job_status_reaches_terminal(self, mock_hook_cls, mock_sleep, terminal_status):
         mock_hook = mock_hook_cls.return_value
         mock_hook.stop_job.return_value = True
 
@@ -181,11 +185,11 @@ class TestRaySubmitJobOperator:
             get_job_logs=True,
         )
         operator.hook.get_job_status = mock.MagicMock(
-            side_effect=[JobStatus.RUNNING, JobStatus.RUNNING, JobStatus.SUCCEEDED]
+            side_effect=[JobStatus.RUNNING, JobStatus.RUNNING, terminal_status]
         )
         status = operator._check_job_status("addr", "job", polling_interval=1, timeout=100)
 
-        assert status == JobStatus.SUCCEEDED
+        assert status == terminal_status
         assert mock_sleep.call_count == 2
 
     @mock.patch(RAY_OP_PATH.format("time.sleep"))

Reply via email to