This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 86e27c7cd7 Fix cloud run operation timeout error (#34755)
86e27c7cd7 is described below
commit 86e27c7cd7142fc68ff0e01a650d88c8dd786ebe
Author: Freddy Demiane <[email protected]>
AuthorDate: Wed Oct 18 16:21:04 2023 +0200
Fix cloud run operation timeout error (#34755)
---
airflow/providers/google/cloud/hooks/cloud_run.py | 6 ++++--
tests/providers/google/cloud/hooks/test_cloud_run.py | 12 +++++++-----
2 files changed, 11 insertions(+), 7 deletions(-)
diff --git a/airflow/providers/google/cloud/hooks/cloud_run.py b/airflow/providers/google/cloud/hooks/cloud_run.py
index d2acbc4462..d6f1674f79 100644
--- a/airflow/providers/google/cloud/hooks/cloud_run.py
+++ b/airflow/providers/google/cloud/hooks/cloud_run.py
@@ -165,7 +165,7 @@ class CloudRunAsyncHook(GoogleBaseHook):
gcp_conn_id: str = "google_cloud_default",
impersonation_chain: str | Sequence[str] | None = None,
):
- self._client: JobsAsyncClient = JobsAsyncClient()
+ self._client: JobsAsyncClient | None = None
super().__init__(gcp_conn_id=gcp_conn_id, impersonation_chain=impersonation_chain)
def get_conn(self):
@@ -175,4 +175,6 @@ class CloudRunAsyncHook(GoogleBaseHook):
return self._client
async def get_operation(self, operation_name: str) -> operations_pb2.Operation:
- return await self.get_conn().get_operation(operations_pb2.GetOperationRequest(name=operation_name))
+ return await self.get_conn().get_operation(
+ operations_pb2.GetOperationRequest(name=operation_name), timeout=120
+ )
diff --git a/tests/providers/google/cloud/hooks/test_cloud_run.py b/tests/providers/google/cloud/hooks/test_cloud_run.py
index ba53b6de92..aa4c512f63 100644
--- a/tests/providers/google/cloud/hooks/test_cloud_run.py
+++ b/tests/providers/google/cloud/hooks/test_cloud_run.py
@@ -255,19 +255,21 @@ class TestCloudRunAsyncHook:
@mock.patch("airflow.providers.google.cloud.hooks.cloud_run.JobsAsyncClient")
async def test_get_operation(self, mock_client):
expected_operation = {"name": "somename"}
-
- async def _get_operation(name):
- return expected_operation
-
operation_name = "operationname"
mock_client.return_value = mock.MagicMock()
- mock_client.return_value.get_operation = _get_operation
+ mock_client.return_value.get_operation = self.mock_get_operation(expected_operation)
hook = CloudRunAsyncHook()
hook.get_credentials = self._dummy_get_credentials
returned_operation = await hook.get_operation(operation_name=operation_name)
+ mock_client.return_value.get_operation.assert_called_once_with(mock.ANY, timeout=120)
assert returned_operation == expected_operation
+ def mock_get_operation(self, expected_operation):
+ get_operation_mock = mock.AsyncMock()
+ get_operation_mock.return_value = expected_operation
+ return get_operation_mock
+
def _dummy_get_credentials(self):
pass