This is an automated email from the ASF dual-hosted git repository.

shahar1 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new c692e85e34c Fix CloudRunExecuteJobOperator deferrable mode silently passing on cancel (#67050)
c692e85e34c is described below

commit c692e85e34c0f163390fa1506c13cf9513a12b23
Author: deepinsight coder <[email protected]>
AuthorDate: Sun May 17 12:42:58 2026 -0700

    Fix CloudRunExecuteJobOperator deferrable mode silently passing on cancel (#67050)
---
 .../providers/google/cloud/triggers/cloud_run.py   | 43 +++++++++++++-
 .../unit/google/cloud/operators/test_cloud_run.py  | 26 +++++++++
 .../unit/google/cloud/triggers/test_cloud_run.py   | 68 ++++++++++++++++++++++
 3 files changed, 135 insertions(+), 2 deletions(-)

diff --git a/providers/google/src/airflow/providers/google/cloud/triggers/cloud_run.py b/providers/google/src/airflow/providers/google/cloud/triggers/cloud_run.py
index 87f1d5f0d89..d5cebf5ca7f 100644
--- a/providers/google/src/airflow/providers/google/cloud/triggers/cloud_run.py
+++ b/providers/google/src/airflow/providers/google/cloud/triggers/cloud_run.py
@@ -21,6 +21,8 @@ from collections.abc import AsyncIterator, Sequence
 from enum import Enum
 from typing import Any, Literal
 
+from google.cloud.run_v2 import Execution
+
 from airflow.providers.common.compat.sdk import AirflowException
 from airflow.providers.google.cloud.hooks.cloud_run import CloudRunAsyncHook
 from airflow.triggers.base import BaseTrigger, TriggerEvent
@@ -127,13 +129,50 @@ class CloudRunJobFinishedTrigger(BaseTrigger):
                             "job_name": self.job_name,
                         }
                     )
-                else:
+                    return
+
+                # The LRO can complete without populating ``operation.error`` even when the
+                # underlying Cloud Run Execution did not succeed — for example when the job is
+                # cancelled from the Google Cloud UI or API, every remaining task ends up in
+                # ``cancelled_count`` rather than ``failed_count``. Mirror the sync path's
+                # ``_fail_if_execution_failed`` check on the Execution payload so deferrable mode
+                # surfaces the same failure semantics.
+                execution = Execution.deserialize(operation.response.value)
+                if execution.succeeded_count + execution.failed_count != 
execution.task_count:
+                    yield TriggerEvent(
+                        {
+                            "status": RunJobStatus.FAIL.value,
+                            "operation_error_code": None,
+                            "operation_error_message": (
+                                f"Cloud Run Job did not finish all tasks: 
task_count="
+                                f"{execution.task_count}, succeeded_count="
+                                f"{execution.succeeded_count}, failed_count="
+                                f"{execution.failed_count}, cancelled_count="
+                                f"{execution.cancelled_count}."
+                            ),
+                            "job_name": self.job_name,
+                        }
+                    )
+                    return
+                if execution.failed_count > 0:
                     yield TriggerEvent(
                         {
-                            "status": RunJobStatus.SUCCESS.value,
+                            "status": RunJobStatus.FAIL.value,
+                            "operation_error_code": None,
+                            "operation_error_message": (
+                                f"Some Cloud Run Job tasks failed: 
failed_count="
+                                f"{execution.failed_count} of 
task_count={execution.task_count}."
+                            ),
                             "job_name": self.job_name,
                         }
                     )
+                    return
+                yield TriggerEvent(
+                    {
+                        "status": RunJobStatus.SUCCESS.value,
+                        "job_name": self.job_name,
+                    }
+                )
                 return
             elif operation.error.message:
                 raise AirflowException(f"Cloud Run Job error: 
{operation.error.message}")
diff --git 
a/providers/google/tests/unit/google/cloud/operators/test_cloud_run.py 
b/providers/google/tests/unit/google/cloud/operators/test_cloud_run.py
index b77d4199d6f..c29af5f9987 100644
--- a/providers/google/tests/unit/google/cloud/operators/test_cloud_run.py
+++ b/providers/google/tests/unit/google/cloud/operators/test_cloud_run.py
@@ -252,6 +252,32 @@ class TestCloudRunExecuteJobOperator:
             e.value
         )
 
+    @mock.patch(CLOUD_RUN_HOOK_PATH)
+    def 
test_execute_deferrable_execute_complete_method_fail_on_cancellation(self, 
hook_mock):
+        """
+        Pin the contract that a FAIL event emitted by the trigger when a Cloud 
Run Job is
+        cancelled (no ``operation.error`` but ``cancelled_count > 0``) 
propagates as an
+        AirflowException — see #57791.
+        """
+        operator = CloudRunExecuteJobOperator(
+            task_id=TASK_ID, project_id=PROJECT_ID, region=REGION, 
job_name=JOB_NAME, deferrable=True
+        )
+
+        event = {
+            "status": RunJobStatus.FAIL.value,
+            "operation_error_code": None,
+            "operation_error_message": (
+                "Cloud Run Job did not finish all tasks: task_count=3, 
succeeded_count=1, "
+                "failed_count=0, cancelled_count=2."
+            ),
+            "job_name": JOB_NAME,
+        }
+
+        with pytest.raises(AirflowException) as e:
+            operator.execute_complete(mock.MagicMock(), event)
+
+        assert "cancelled_count=2" in str(e.value)
+
     @mock.patch(CLOUD_RUN_HOOK_PATH)
     def test_execute_deferrable_execute_complete_method_success(self, 
hook_mock):
         hook_mock.return_value.get_job.return_value = JOB
diff --git 
a/providers/google/tests/unit/google/cloud/triggers/test_cloud_run.py 
b/providers/google/tests/unit/google/cloud/triggers/test_cloud_run.py
index a906a7d0332..0988444cfb5 100644
--- a/providers/google/tests/unit/google/cloud/triggers/test_cloud_run.py
+++ b/providers/google/tests/unit/google/cloud/triggers/test_cloud_run.py
@@ -20,12 +20,26 @@ from __future__ import annotations
 from unittest import mock
 
 import pytest
+from google.cloud.run_v2 import Execution
 from google.protobuf.any_pb2 import Any
 from google.rpc.status_pb2 import Status
 
 from airflow.providers.google.cloud.triggers.cloud_run import 
CloudRunJobFinishedTrigger, RunJobStatus
 from airflow.triggers.base import TriggerEvent
 
+
+def _packed_execution_response(task_count, succeeded_count, failed_count, 
cancelled_count=0):
+    """Build a ``google.protobuf.Any`` packed with an ``Execution`` proto for 
trigger tests."""
+    execution = Execution()
+    execution.task_count = task_count
+    execution.succeeded_count = succeeded_count
+    execution.failed_count = failed_count
+    execution.cancelled_count = cancelled_count
+    response = Any()
+    response.Pack(Execution.pb(execution))
+    return response
+
+
 OPERATION_NAME = "operation"
 JOB_NAME = "jobName"
 ERROR_CODE = 13
@@ -87,6 +101,7 @@ class TestCloudBatchJobFinishedTrigger:
             operation.name = "name"
             operation.error = Any()
             operation.error.ParseFromString(b"")
+            operation.response = _packed_execution_response(task_count=3, 
succeeded_count=3, failed_count=0)
             return operation
 
         mock_hook.return_value.get_operation = _mock_operation
@@ -102,6 +117,59 @@ class TestCloudBatchJobFinishedTrigger:
             == actual
         )
 
+    @pytest.mark.asyncio
+    
@mock.patch("airflow.providers.google.cloud.triggers.cloud_run.CloudRunAsyncHook")
+    async def test_trigger_yields_fail_when_job_cancelled(
+        self, mock_hook, trigger: CloudRunJobFinishedTrigger
+    ):
+        """
+        When the Cloud Run Job is cancelled via the Google Cloud UI/API the 
LRO completes with
+        no ``operation.error`` set but the Execution reports a non-zero 
``cancelled_count``. The
+        trigger must surface this as a failure to mirror the sync path's 
semantics — see #57791.
+        """
+
+        async def _mock_operation(operation_name, location, 
use_regional_endpoint):
+            operation = mock.MagicMock()
+            operation.done = True
+            operation.error = Any()
+            operation.error.ParseFromString(b"")
+            operation.response = _packed_execution_response(
+                task_count=3, succeeded_count=1, failed_count=0, 
cancelled_count=2
+            )
+            return operation
+
+        mock_hook.return_value.get_operation = _mock_operation
+        generator = trigger.run()
+        actual = await generator.asend(None)  # type:ignore[attr-defined]
+        assert actual.payload["status"] == RunJobStatus.FAIL.value
+        assert actual.payload["job_name"] == JOB_NAME
+        assert "cancelled_count=2" in actual.payload["operation_error_message"]
+        assert "did not finish all tasks" in 
actual.payload["operation_error_message"]
+
+    @pytest.mark.asyncio
+    
@mock.patch("airflow.providers.google.cloud.triggers.cloud_run.CloudRunAsyncHook")
+    async def test_trigger_yields_fail_when_some_tasks_failed(
+        self, mock_hook, trigger: CloudRunJobFinishedTrigger
+    ):
+        """
+        Regression-guard symmetry with the sync path: when ``failed_count > 
0`` and the counts
+        sum to ``task_count`` the deferrable trigger must still report failure.
+        """
+
+        async def _mock_operation(operation_name, location, 
use_regional_endpoint):
+            operation = mock.MagicMock()
+            operation.done = True
+            operation.error = Any()
+            operation.error.ParseFromString(b"")
+            operation.response = _packed_execution_response(task_count=3, 
succeeded_count=1, failed_count=2)
+            return operation
+
+        mock_hook.return_value.get_operation = _mock_operation
+        generator = trigger.run()
+        actual = await generator.asend(None)  # type:ignore[attr-defined]
+        assert actual.payload["status"] == RunJobStatus.FAIL.value
+        assert "Some Cloud Run Job tasks failed" in 
actual.payload["operation_error_message"]
+
     @pytest.mark.asyncio
     
@mock.patch("airflow.providers.google.cloud.triggers.cloud_run.CloudRunAsyncHook")
     async def test_trigger_on_operation_failed_yield_error(

Reply via email to