This is an automated email from the ASF dual-hosted git repository.
shahar pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new febf1fe70ee fix DataprocSubmitTrigger deferred tasks stuck forever
(#62082)
febf1fe70ee is described below
commit febf1fe70ee2c6efbe793216c2512842646471a0
Author: olegkachur-e <[email protected]>
AuthorDate: Fri Feb 27 11:16:42 2026 +0100
fix DataprocSubmitTrigger deferred tasks stuck forever (#62082)
- Prevent tasks from getting stuck in the deferred state when the thread
running the sync_hook call hangs while retrieving credentials.
Observed during connection retrieval from a secrets backend.
Co-authored-by: Oleg Kachur <[email protected]>
---
.../airflow/providers/google/cloud/triggers/dataproc.py | 8 +++++---
.../tests/unit/google/cloud/triggers/test_dataproc.py | 17 ++++++++++-------
2 files changed, 15 insertions(+), 10 deletions(-)
diff --git
a/providers/google/src/airflow/providers/google/cloud/triggers/dataproc.py
b/providers/google/src/airflow/providers/google/cloud/triggers/dataproc.py
index ffe9b9aeaa6..73dd18c4c29 100644
--- a/providers/google/src/airflow/providers/google/cloud/triggers/dataproc.py
+++ b/providers/google/src/airflow/providers/google/cloud/triggers/dataproc.py
@@ -187,11 +187,13 @@ class DataprocSubmitTrigger(DataprocBaseTrigger):
return task_state != TaskInstanceState.DEFERRED
async def run(self):
+ hook = self.get_async_hook()
+ # Trigger client cache with sync call get_credentials(), evaluated
once.
+ await hook.get_job_client(region=self.region)
+
try:
while True:
- job = await self.get_async_hook().get_job(
- project_id=self.project_id, region=self.region,
job_id=self.job_id
- )
+ job = await hook.get_job(project_id=self.project_id,
region=self.region, job_id=self.job_id)
state = job.status.state
self.log.info("Dataproc job: %s is in state: %s", self.job_id,
state)
if state in (JobStatus.State.DONE, JobStatus.State.CANCELLED,
JobStatus.State.ERROR):
diff --git a/providers/google/tests/unit/google/cloud/triggers/test_dataproc.py
b/providers/google/tests/unit/google/cloud/triggers/test_dataproc.py
index 7a85acba118..aa66d2237ed 100644
--- a/providers/google/tests/unit/google/cloud/triggers/test_dataproc.py
+++ b/providers/google/tests/unit/google/cloud/triggers/test_dataproc.py
@@ -587,10 +587,8 @@ class TestDataprocSubmitTrigger:
async def test_submit_trigger_run_success(self, mock_get_async_hook,
submit_trigger):
"""Test the trigger correctly handles a job completion."""
mock_job = Job(status=JobStatus(state=JobStatus.State.DONE))
- future = asyncio.Future()
- future.set_result(mock_job)
- mock_get_async_hook.return_value.get_job.return_value = future
-
+ mock_get_async_hook.return_value.get_job_client = mock.AsyncMock()
+ mock_get_async_hook.return_value.get_job =
mock.AsyncMock(return_value=mock_job)
async_gen = submit_trigger.run()
event = await async_gen.asend(None)
expected_event = TriggerEvent(
@@ -603,9 +601,12 @@ class TestDataprocSubmitTrigger:
async def test_submit_trigger_run_error(self, mock_get_async_hook,
submit_trigger):
"""Test the trigger correctly handles a job error."""
mock_job = Job(status=JobStatus(state=JobStatus.State.ERROR))
- future = asyncio.Future()
- future.set_result(mock_job)
- mock_get_async_hook.return_value.get_job.return_value = future
+ mock_get_async_hook.return_value.get_job_client = mock.AsyncMock()
+ mock_get_async_hook.return_value.get_job =
mock.AsyncMock(return_value=mock_job)
+
+ # future = asyncio.Future()
+ # future.set_result(mock_job)
+ # mock_get_async_hook.return_value.get_job.return_value = future
async_gen = submit_trigger.run()
event = await async_gen.asend(None)
@@ -625,6 +626,8 @@ class TestDataprocSubmitTrigger:
"""Test the trigger correctly handles an asyncio.CancelledError."""
mock_safe_to_cancel.return_value = is_safe_to_cancel
mock_async_hook = mock_get_async_hook.return_value
+ mock_async_hook.get_job_client = mock.AsyncMock()
+
mock_async_hook.get_job.side_effect = asyncio.CancelledError
mock_sync_hook = mock_get_sync_hook.return_value