This is an automated email from the ASF dual-hosted git repository.
shahar pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 175da03afe9 🩹 fix: Use GoogleBaseAsyncHook (#55316)
175da03afe9 is described below
commit 175da03afe9bad967cf6d8c01bd246b6518f41ee
Author: Morgan <[email protected]>
AuthorDate: Sat Sep 6 16:51:58 2025 +1000
🩹 fix: Use GoogleBaseAsyncHook (#55316)
When run with `deferrable=True`, the `CloudRunExecuteJobOperator` fails
with the error `You cannot use AsyncToSync in the same thread as an async
event loop - just await the async function directly`.
This is because the `__init__` method of `GoogleBaseHook` makes a
blocking call to retrieve extra details for the connection.
Making `CloudRunAsyncHook` inherit from the existing
`GoogleBaseAsyncHook` instead prevents this issue.
Co-authored-by: Morgan Kerle <[email protected]>
---
.../providers/google/cloud/hooks/cloud_run.py | 20 +++++++++++++-------
1 file changed, 13 insertions(+), 7 deletions(-)
diff --git
a/providers/google/src/airflow/providers/google/cloud/hooks/cloud_run.py
b/providers/google/src/airflow/providers/google/cloud/hooks/cloud_run.py
index d555e8034d1..ef7452336e3 100644
--- a/providers/google/src/airflow/providers/google/cloud/hooks/cloud_run.py
+++ b/providers/google/src/airflow/providers/google/cloud/hooks/cloud_run.py
@@ -42,7 +42,11 @@ from google.longrunning import operations_pb2
from airflow.exceptions import AirflowException
from airflow.providers.google.common.consts import CLIENT_INFO
-from airflow.providers.google.common.hooks.base_google import
PROVIDE_PROJECT_ID, GoogleBaseHook
+from airflow.providers.google.common.hooks.base_google import (
+ PROVIDE_PROJECT_ID,
+ GoogleBaseAsyncHook,
+ GoogleBaseHook,
+)
if TYPE_CHECKING:
from google.api_core import operation
@@ -159,7 +163,7 @@ class CloudRunHook(GoogleBaseHook):
return list(itertools.islice(jobs, limit))
-class CloudRunAsyncHook(GoogleBaseHook):
+class CloudRunAsyncHook(GoogleBaseAsyncHook):
"""
Async hook for the Google Cloud Run service.
@@ -174,6 +178,8 @@ class CloudRunAsyncHook(GoogleBaseHook):
account from the list granting this role to the originating account.
"""
+ sync_hook_class = GoogleBaseHook
+
def __init__(
self,
gcp_conn_id: str = "google_cloud_default",
@@ -183,16 +189,16 @@ class CloudRunAsyncHook(GoogleBaseHook):
self._client: JobsAsyncClient | None = None
super().__init__(gcp_conn_id=gcp_conn_id,
impersonation_chain=impersonation_chain, **kwargs)
- def get_conn(self):
+ async def get_conn(self):
if self._client is None:
- self._client = JobsAsyncClient(credentials=self.get_credentials(),
client_info=CLIENT_INFO)
+ sync_hook = await self.get_sync_hook()
+ self._client =
JobsAsyncClient(credentials=sync_hook.get_credentials(),
client_info=CLIENT_INFO)
return self._client
async def get_operation(self, operation_name: str) ->
operations_pb2.Operation:
- return await self.get_conn().get_operation(
- operations_pb2.GetOperationRequest(name=operation_name),
timeout=120
- )
+ conn = await self.get_conn()
+ return await
conn.get_operation(operations_pb2.GetOperationRequest(name=operation_name),
timeout=120)
class CloudRunServiceHook(GoogleBaseHook):