shahar1 commented on code in PR #67219:
URL: https://github.com/apache/airflow/pull/67219#discussion_r3287075808
##########
providers/google/tests/unit/google/cloud/triggers/test_cloud_run.py:
##########
@@ -202,6 +211,77 @@ async def _mock_operation(operation_name, location,
use_regional_endpoint):
== actual
)
+ @pytest.mark.asyncio
+ @pytest.mark.parametrize(
+ "transient_exception",
+ [
+ pytest.param(ServiceUnavailable("Service is currently
unavailable."), id="ServiceUnavailable"),
+ pytest.param(InternalServerError("Internal server error."),
id="InternalServerError"),
+ pytest.param(DeadlineExceeded("Deadline exceeded."),
id="DeadlineExceeded"),
+ pytest.param(GatewayTimeout("Gateway timeout."),
id="GatewayTimeout"),
+ pytest.param(ResourceExhausted("Quota exceeded."),
id="ResourceExhausted"),
+ pytest.param(Aborted("Aborted."), id="Aborted"),
+ ],
+ )
+
@mock.patch("airflow.providers.google.cloud.triggers.cloud_run.asyncio.sleep",
new_callable=mock.AsyncMock)
+
@mock.patch("airflow.providers.google.cloud.triggers.cloud_run.CloudRunAsyncHook")
+ async def test_trigger_continues_polling_after_retryable_grpc_error(
+ self, mock_hook, mock_sleep, transient_exception, trigger:
CloudRunJobFinishedTrigger
+ ):
+ """
+ Transient gRPC errors from ``get_operation`` should be absorbed at the
polling boundary
+ so the triggerer keeps re-polling instead of failing the deferred task
(which would
+ otherwise cascade into Airflow re-submitting the whole Cloud Run job
on task-level
+ retry). Covers every error class in the retryable tuple.
+ """
+ done_operation = mock.MagicMock()
+ done_operation.done = True
+ done_operation.error = Any()
+ done_operation.error.ParseFromString(b"")
+ done_operation.response = _packed_execution_response(task_count=1,
succeeded_count=1, failed_count=0)
+
+ mock_hook.return_value.get_operation = mock.AsyncMock(
+ side_effect=[transient_exception, done_operation]
+ )
+
+ generator = trigger.run()
+ actual = await generator.asend(None) # type:ignore[attr-defined]
+
+ assert (
+ TriggerEvent(
+ {
+ "status": RunJobStatus.SUCCESS.value,
+ "job_name": JOB_NAME,
+ }
+ )
+ == actual
+ )
+ assert mock_hook.return_value.get_operation.await_count == 2
+ mock_sleep.assert_awaited_once_with(POLL_SLEEP)
+
+ @pytest.mark.asyncio
+ @pytest.mark.parametrize(
+ "fatal_exception",
+ [
+ pytest.param(PermissionDenied("Permission denied."),
id="PermissionDenied"),
+ pytest.param(RuntimeError("boom"), id="RuntimeError"),
+ ],
+ )
+
@mock.patch("airflow.providers.google.cloud.triggers.cloud_run.CloudRunAsyncHook")
+ async def test_trigger_propagates_unexpected_polling_exception(
+ self, mock_hook, fatal_exception, trigger: CloudRunJobFinishedTrigger
+ ):
+ """
+ Only the retryable gRPC error set should be absorbed. Anything outside
that tuple
+ (auth failures, permission denied, unexpected runtime errors, ...)
must propagate so
+ Airflow's task-level retry can take over.
+ """
+ mock_hook.return_value.get_operation =
mock.AsyncMock(side_effect=fatal_exception)
+
+ generator = trigger.run()
+ with pytest.raises(type(fatal_exception)):
+ await generator.asend(None) # type:ignore[attr-defined]
+
Review Comment:
We probably also want to assert that `asyncio.sleep` was not awaited in this
case, so the test confirms a fatal exception propagates without a retry delay.
##########
providers/google/src/airflow/providers/google/cloud/triggers/cloud_run.py:
##########
@@ -112,11 +113,17 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
timeout = self.timeout
self.hook = self._get_async_hook()
while timeout is None or timeout > 0:
- operation = await self.hook.get_operation(
- operation_name=self.operation_name,
- location=self.location,
- use_regional_endpoint=self.use_regional_endpoint,
- )
+ try:
+ operation = await self.hook.get_operation(
+ operation_name=self.operation_name,
+ location=self.location,
+ use_regional_endpoint=self.use_regional_endpoint,
+ )
+ except ServiceUnavailable as e:
+ self.log.warning("Cloud Run API is temporarily unavailable.
Retrying... (%s)", e)
+ await asyncio.sleep(self.polling_period_seconds)
+ continue
Review Comment:
Unlike here, #66293 did not define a `timeout` (we probably need to add one
there as well).
Since a `timeout` is defined here, this retry path should handle it as well.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]