Copilot commented on code in PR #67219:
URL: https://github.com/apache/airflow/pull/67219#discussion_r3270559145


##########
providers/google/src/airflow/providers/google/cloud/triggers/cloud_run.py:
##########
@@ -112,11 +113,17 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
         timeout = self.timeout
         self.hook = self._get_async_hook()
         while timeout is None or timeout > 0:
-            operation = await self.hook.get_operation(
-                operation_name=self.operation_name,
-                location=self.location,
-                use_regional_endpoint=self.use_regional_endpoint,
-            )
+            try:
+                operation = await self.hook.get_operation(
+                    operation_name=self.operation_name,
+                    location=self.location,
+                    use_regional_endpoint=self.use_regional_endpoint,
+                )
+            except ServiceUnavailable as e:
+                self.log.warning("Cloud Run API is temporarily unavailable. 
Retrying... (%s)", e)
+                await asyncio.sleep(self.polling_period_seconds)

Review Comment:
   Logging a warning on every transient 503 can produce a high volume of warning
logs during an extended outage. Consider lowering the level to `info`/`debug`,
or rate-limiting the warning (e.g., log the first occurrence, then only
periodically) to reduce noise while preserving observability.



##########
providers/google/src/airflow/providers/google/cloud/triggers/cloud_run.py:
##########
@@ -112,11 +113,17 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
         timeout = self.timeout
         self.hook = self._get_async_hook()
         while timeout is None or timeout > 0:
-            operation = await self.hook.get_operation(
-                operation_name=self.operation_name,
-                location=self.location,
-                use_regional_endpoint=self.use_regional_endpoint,
-            )
+            try:
+                operation = await self.hook.get_operation(
+                    operation_name=self.operation_name,
+                    location=self.location,
+                    use_regional_endpoint=self.use_regional_endpoint,
+                )
+            except ServiceUnavailable as e:
+                self.log.warning("Cloud Run API is temporarily unavailable. 
Retrying... (%s)", e)
+                await asyncio.sleep(self.polling_period_seconds)
+                continue

Review Comment:
   The `continue` after sleeping skips any timeout decrement or elapsed-time
accounting that happens later in the loop. As a result, repeated
`ServiceUnavailable` errors could let the trigger run past its configured
timeout. Update the timeout tracking in this exception path as well (e.g.,
subtract the sleep duration from `timeout`, or switch to a deadline computed
from monotonic time) so that transient 503 retries still respect the overall
timeout.



##########
providers/google/tests/unit/google/cloud/triggers/test_cloud_run.py:
##########
@@ -202,6 +203,60 @@ async def _mock_operation(operation_name, location, 
use_regional_endpoint):
             == actual
         )
 
+    @pytest.mark.asyncio
+    
@mock.patch("airflow.providers.google.cloud.triggers.cloud_run.asyncio.sleep", 
new_callable=mock.AsyncMock)
+    
@mock.patch("airflow.providers.google.cloud.triggers.cloud_run.CloudRunAsyncHook")
+    async def 
test_trigger_continues_polling_after_retryable_service_unavailable(
+        self, mock_hook, mock_sleep, trigger: CloudRunJobFinishedTrigger
+    ):
+        """
+        Transient 503s from ``get_operation`` should be absorbed at the 
polling boundary so the
+        triggerer keeps re-polling instead of failing the deferred task (which 
would otherwise
+        cascade into Airflow re-submitting the whole Cloud Run job on 
task-level retry).
+        """
+        done_operation = mock.MagicMock()
+        done_operation.done = True
+        done_operation.error = Any()
+        done_operation.error.ParseFromString(b"")
+        done_operation.response = _packed_execution_response(task_count=1, 
succeeded_count=1, failed_count=0)
+
+        mock_hook.return_value.get_operation = mock.AsyncMock(
+            side_effect=[
+                ServiceUnavailable("Service is currently unavailable. Please 
retry the request."),
+                done_operation,
+            ]
+        )
+
+        generator = trigger.run()
+        actual = await generator.asend(None)  # type:ignore[attr-defined]
+
+        assert (
+            TriggerEvent(
+                {
+                    "status": RunJobStatus.SUCCESS.value,
+                    "job_name": JOB_NAME,
+                }
+            )
+            == actual
+        )
+        assert mock_hook.return_value.get_operation.await_count == 2
+        mock_sleep.assert_awaited_once_with(POLL_SLEEP)
+
+    @pytest.mark.asyncio
+    
@mock.patch("airflow.providers.google.cloud.triggers.cloud_run.CloudRunAsyncHook")
+    async def test_trigger_propagates_unexpected_polling_exception(
+        self, mock_hook, trigger: CloudRunJobFinishedTrigger
+    ):
+        """
+        Only ``ServiceUnavailable`` should be retried. Any other exception from
+        ``get_operation`` must propagate so Airflow's task-level retry can 
take over.
+        """
+        mock_hook.return_value.get_operation = 
mock.AsyncMock(side_effect=RuntimeError("boom"))
+
+        generator = trigger.run()
+        with pytest.raises(RuntimeError, match="boom"):
+            await generator.asend(None)  # type:ignore[attr-defined]

Review Comment:
   The type-ignore directive should be written as `# type: ignore[attr-defined]`
(note the space after the colon) to match standard typing-comment conventions
and satisfy common linters.



##########
providers/google/tests/unit/google/cloud/triggers/test_cloud_run.py:
##########
@@ -202,6 +203,60 @@ async def _mock_operation(operation_name, location, 
use_regional_endpoint):
             == actual
         )
 
+    @pytest.mark.asyncio
+    
@mock.patch("airflow.providers.google.cloud.triggers.cloud_run.asyncio.sleep", 
new_callable=mock.AsyncMock)
+    
@mock.patch("airflow.providers.google.cloud.triggers.cloud_run.CloudRunAsyncHook")
+    async def 
test_trigger_continues_polling_after_retryable_service_unavailable(
+        self, mock_hook, mock_sleep, trigger: CloudRunJobFinishedTrigger
+    ):
+        """
+        Transient 503s from ``get_operation`` should be absorbed at the 
polling boundary so the
+        triggerer keeps re-polling instead of failing the deferred task (which 
would otherwise
+        cascade into Airflow re-submitting the whole Cloud Run job on 
task-level retry).
+        """
+        done_operation = mock.MagicMock()
+        done_operation.done = True
+        done_operation.error = Any()
+        done_operation.error.ParseFromString(b"")
+        done_operation.response = _packed_execution_response(task_count=1, 
succeeded_count=1, failed_count=0)
+
+        mock_hook.return_value.get_operation = mock.AsyncMock(
+            side_effect=[
+                ServiceUnavailable("Service is currently unavailable. Please 
retry the request."),
+                done_operation,
+            ]
+        )
+
+        generator = trigger.run()
+        actual = await generator.asend(None)  # type:ignore[attr-defined]

Review Comment:
   The type-ignore directive should be written as `# type: ignore[attr-defined]`
(note the space after the colon) to match standard typing-comment conventions
and satisfy common linters.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to