akshetpandey commented on code in PR #67219:
URL: https://github.com/apache/airflow/pull/67219#discussion_r3270591137


##########
providers/google/tests/unit/google/cloud/triggers/test_cloud_run.py:
##########
@@ -202,6 +203,60 @@ async def _mock_operation(operation_name, location, 
use_regional_endpoint):
             == actual
         )
 
+    @pytest.mark.asyncio
+    
@mock.patch("airflow.providers.google.cloud.triggers.cloud_run.asyncio.sleep", 
new_callable=mock.AsyncMock)
+    
@mock.patch("airflow.providers.google.cloud.triggers.cloud_run.CloudRunAsyncHook")
+    async def 
test_trigger_continues_polling_after_retryable_service_unavailable(
+        self, mock_hook, mock_sleep, trigger: CloudRunJobFinishedTrigger
+    ):
+        """
+        Transient 503s from ``get_operation`` should be absorbed at the 
polling boundary so the
+        triggerer keeps re-polling instead of failing the deferred task (which 
would otherwise
+        cascade into Airflow re-submitting the whole Cloud Run job on 
task-level retry).
+        """
+        done_operation = mock.MagicMock()
+        done_operation.done = True
+        done_operation.error = Any()
+        done_operation.error.ParseFromString(b"")
+        done_operation.response = _packed_execution_response(task_count=1, 
succeeded_count=1, failed_count=0)
+
+        mock_hook.return_value.get_operation = mock.AsyncMock(
+            side_effect=[
+                ServiceUnavailable("Service is currently unavailable. Please 
retry the request."),
+                done_operation,
+            ]
+        )
+
+        generator = trigger.run()
+        actual = await generator.asend(None)  # type:ignore[attr-defined]

Review Comment:
   sticks with existing usage in file



##########
providers/google/tests/unit/google/cloud/triggers/test_cloud_run.py:
##########
@@ -202,6 +203,60 @@ async def _mock_operation(operation_name, location, 
use_regional_endpoint):
             == actual
         )
 
+    @pytest.mark.asyncio
+    
@mock.patch("airflow.providers.google.cloud.triggers.cloud_run.asyncio.sleep", 
new_callable=mock.AsyncMock)
+    
@mock.patch("airflow.providers.google.cloud.triggers.cloud_run.CloudRunAsyncHook")
+    async def 
test_trigger_continues_polling_after_retryable_service_unavailable(
+        self, mock_hook, mock_sleep, trigger: CloudRunJobFinishedTrigger
+    ):
+        """
+        Transient 503s from ``get_operation`` should be absorbed at the 
polling boundary so the
+        triggerer keeps re-polling instead of failing the deferred task (which 
would otherwise
+        cascade into Airflow re-submitting the whole Cloud Run job on 
task-level retry).
+        """
+        done_operation = mock.MagicMock()
+        done_operation.done = True
+        done_operation.error = Any()
+        done_operation.error.ParseFromString(b"")
+        done_operation.response = _packed_execution_response(task_count=1, 
succeeded_count=1, failed_count=0)
+
+        mock_hook.return_value.get_operation = mock.AsyncMock(
+            side_effect=[
+                ServiceUnavailable("Service is currently unavailable. Please 
retry the request."),
+                done_operation,
+            ]
+        )
+
+        generator = trigger.run()
+        actual = await generator.asend(None)  # type:ignore[attr-defined]
+
+        assert (
+            TriggerEvent(
+                {
+                    "status": RunJobStatus.SUCCESS.value,
+                    "job_name": JOB_NAME,
+                }
+            )
+            == actual
+        )
+        assert mock_hook.return_value.get_operation.await_count == 2
+        mock_sleep.assert_awaited_once_with(POLL_SLEEP)
+
+    @pytest.mark.asyncio
+    
@mock.patch("airflow.providers.google.cloud.triggers.cloud_run.CloudRunAsyncHook")
+    async def test_trigger_propagates_unexpected_polling_exception(
+        self, mock_hook, trigger: CloudRunJobFinishedTrigger
+    ):
+        """
+        Only ``ServiceUnavailable`` should be retried. Any other exception from
+        ``get_operation`` must propagate so Airflow's task-level retry can 
take over.
+        """
+        mock_hook.return_value.get_operation = 
mock.AsyncMock(side_effect=RuntimeError("boom"))
+
+        generator = trigger.run()
+        with pytest.raises(RuntimeError, match="boom"):
+            await generator.asend(None)  # type:ignore[attr-defined]

Review Comment:
   sticks with existing usage in file



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to