shahar1 commented on code in PR #67219:
URL: https://github.com/apache/airflow/pull/67219#discussion_r3287075808


##########
providers/google/tests/unit/google/cloud/triggers/test_cloud_run.py:
##########
@@ -202,6 +211,77 @@ async def _mock_operation(operation_name, location, use_regional_endpoint):
             == actual
         )
 
+    @pytest.mark.asyncio
+    @pytest.mark.parametrize(
+        "transient_exception",
+        [
+            pytest.param(ServiceUnavailable("Service is currently unavailable."), id="ServiceUnavailable"),
+            pytest.param(InternalServerError("Internal server error."), id="InternalServerError"),
+            pytest.param(DeadlineExceeded("Deadline exceeded."), id="DeadlineExceeded"),
+            pytest.param(GatewayTimeout("Gateway timeout."), id="GatewayTimeout"),
+            pytest.param(ResourceExhausted("Quota exceeded."), id="ResourceExhausted"),
+            pytest.param(Aborted("Aborted."), id="Aborted"),
+        ],
+    )
+    @mock.patch("airflow.providers.google.cloud.triggers.cloud_run.asyncio.sleep", new_callable=mock.AsyncMock)
+    @mock.patch("airflow.providers.google.cloud.triggers.cloud_run.CloudRunAsyncHook")
+    async def test_trigger_continues_polling_after_retryable_grpc_error(
+        self, mock_hook, mock_sleep, transient_exception, trigger: CloudRunJobFinishedTrigger
+    ):
+        """
+        Transient gRPC errors from ``get_operation`` should be absorbed at the polling boundary
+        so the triggerer keeps re-polling instead of failing the deferred task (which would
+        otherwise cascade into Airflow re-submitting the whole Cloud Run job on task-level
+        retry). Covers every error class in the retryable tuple.
+        """
+        done_operation = mock.MagicMock()
+        done_operation.done = True
+        done_operation.error = Any()
+        done_operation.error.ParseFromString(b"")
+        done_operation.response = _packed_execution_response(task_count=1, succeeded_count=1, failed_count=0)
+
+        mock_hook.return_value.get_operation = mock.AsyncMock(
+            side_effect=[transient_exception, done_operation]
+        )
+
+        generator = trigger.run()
+        actual = await generator.asend(None)  # type:ignore[attr-defined]
+
+        assert (
+            TriggerEvent(
+                {
+                    "status": RunJobStatus.SUCCESS.value,
+                    "job_name": JOB_NAME,
+                }
+            )
+            == actual
+        )
+        assert mock_hook.return_value.get_operation.await_count == 2
+        mock_sleep.assert_awaited_once_with(POLL_SLEEP)
+
+    @pytest.mark.asyncio
+    @pytest.mark.parametrize(
+        "fatal_exception",
+        [
+            pytest.param(PermissionDenied("Permission denied."), id="PermissionDenied"),
+            pytest.param(RuntimeError("boom"), id="RuntimeError"),
+        ],
+    )
+    @mock.patch("airflow.providers.google.cloud.triggers.cloud_run.CloudRunAsyncHook")
+    async def test_trigger_propagates_unexpected_polling_exception(
+        self, mock_hook, fatal_exception, trigger: CloudRunJobFinishedTrigger
+    ):
+        """
+        Only the retryable gRPC error set should be absorbed. Anything outside that tuple
+        (auth failures, permission denied, unexpected runtime errors, ...) must propagate so
+        Airflow's task-level retry can take over.
+        """
+        mock_hook.return_value.get_operation = mock.AsyncMock(side_effect=fatal_exception)
+
+        generator = trigger.run()
+        with pytest.raises(type(fatal_exception)):
+            await generator.asend(None)  # type:ignore[attr-defined]
+

Review Comment:
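   We should probably also assert that `asyncio.sleep` was not awaited here, since a
   non-retryable error should propagate without another polling sleep (this test would
   need to patch `asyncio.sleep` as well, as the retryable-error test above does).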



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to