This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new e538b13e16a fix(providers/google): wrap sync get_job with
sync_to_async in BigQueryAsyncHook (#63230)
e538b13e16a is described below
commit e538b13e16afa6c19dd5c642834fa89ae44b904c
Author: Yoann <[email protected]>
AuthorDate: Tue Mar 10 12:42:04 2026 -0700
fix(providers/google): wrap sync get_job with sync_to_async in
BigQueryAsyncHook (#63230)
BigQueryAsyncHook._get_job() calls sync_hook.get_job() directly, blocking
the event loop in triggers. This is a regression from #56363 which removed
the async wrapping but left the sync call in place.
Wrap with asgiref.sync_to_async to run the blocking call in a thread.
Fixes apache/airflow#63182
---
.../airflow/providers/google/cloud/hooks/bigquery.py | 3 ++-
.../tests/unit/google/cloud/hooks/test_bigquery.py | 17 +++++++++++++++++
2 files changed, 19 insertions(+), 1 deletion(-)
diff --git
a/providers/google/src/airflow/providers/google/cloud/hooks/bigquery.py
b/providers/google/src/airflow/providers/google/cloud/hooks/bigquery.py
index 1ee60b524ee..c5b395b2fab 100644
--- a/providers/google/src/airflow/providers/google/cloud/hooks/bigquery.py
+++ b/providers/google/src/airflow/providers/google/cloud/hooks/bigquery.py
@@ -33,6 +33,7 @@ from typing import TYPE_CHECKING, Any, Literal, NoReturn,
cast, overload
import pendulum
from aiohttp import ClientSession as ClientSession
+from asgiref.sync import sync_to_async
from gcloud.aio.bigquery import Job, Table as Table_async
from google.cloud.bigquery import (
DEFAULT_RETRY,
@@ -2089,7 +2090,7 @@ class BigQueryAsyncHook(GoogleBaseAsyncHook):
) -> BigQueryJob | UnknownJob:
"""Get BigQuery job by its ID, project ID and location."""
sync_hook = await self.get_sync_hook()
- job = sync_hook.get_job(job_id=job_id, project_id=project_id,
location=location)
+ job = await sync_to_async(sync_hook.get_job)(job_id=job_id,
project_id=project_id, location=location)
return job
async def get_job_status(
diff --git a/providers/google/tests/unit/google/cloud/hooks/test_bigquery.py
b/providers/google/tests/unit/google/cloud/hooks/test_bigquery.py
index bbd4f64bf46..6a6fa8b3f3b 100644
--- a/providers/google/tests/unit/google/cloud/hooks/test_bigquery.py
+++ b/providers/google/tests/unit/google/cloud/hooks/test_bigquery.py
@@ -1548,6 +1548,23 @@ class TestBigQueryAsyncHookMethods:
result = await hook.get_job_instance(project_id=PROJECT_ID,
job_id=JOB_ID, session=mock_session)
assert isinstance(result, Job)
+ @pytest.mark.asyncio
+ @mock.patch("airflow.providers.google.cloud.hooks.bigquery.sync_to_async")
+
@mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook.get_sync_hook")
+ async def test_get_job_runs_via_sync_to_async(self, mock_get_sync_hook,
mock_sync_to_async):
+ """Verify _get_job wraps the sync get_job call with sync_to_async
(#63182)."""
+ mock_sync_hook = mock.MagicMock()
+ mock_get_sync_hook.return_value = mock_sync_hook
+
+ mock_async_get_job = mock.AsyncMock(return_value=mock.MagicMock())
+ mock_sync_to_async.return_value = mock_async_get_job
+
+ hook = BigQueryAsyncHook()
+ await hook._get_job(job_id=JOB_ID, project_id=PROJECT_ID,
location="US")
+
+ mock_sync_to_async.assert_called_once_with(mock_sync_hook.get_job)
+ mock_async_get_job.assert_awaited_once_with(job_id=JOB_ID,
project_id=PROJECT_ID, location="US")
+
@pytest.mark.parametrize(
("job_state", "error_result", "expected"),
[