This is an automated email from the ASF dual-hosted git repository.
jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 349de0187b6 Fix deferrable execution_timeout handling in
DbtCloudRunJobOperator (#67360)
349de0187b6 is described below
commit 349de0187b679f8f70affeee039f35f3c575f398
Author: SameerMesiah97 <[email protected]>
AuthorDate: Mon May 25 16:00:34 2026 +0100
Fix deferrable execution_timeout handling in DbtCloudRunJobOperator (#67360)
Prevent framework-level deferred timeouts from bypassing
execute_complete() cancellation handling for dbt job runs.
Co-authored-by: Sameer Mesiah <[email protected]>
---
.../airflow/providers/dbt/cloud/operators/dbt.py | 2 +-
.../tests/unit/dbt/cloud/operators/test_dbt.py | 54 +++++++++++++++++++++-
2 files changed, 54 insertions(+), 2 deletions(-)
diff --git
a/providers/dbt/cloud/src/airflow/providers/dbt/cloud/operators/dbt.py
b/providers/dbt/cloud/src/airflow/providers/dbt/cloud/operators/dbt.py
index 9c519a78461..088e5f48de2 100644
--- a/providers/dbt/cloud/src/airflow/providers/dbt/cloud/operators/dbt.py
+++ b/providers/dbt/cloud/src/airflow/providers/dbt/cloud/operators/dbt.py
@@ -259,7 +259,7 @@ class DbtCloudRunJobOperator(BaseOperator):
job_run_status = self.hook.get_job_run_status(**job_run_info)
if not DbtCloudJobRunStatus.is_terminal(job_run_status):
self.defer(
- timeout=self.execution_timeout,
+ timeout=None,
trigger=DbtCloudRunJobTrigger(
conn_id=self.dbt_cloud_conn_id,
run_id=self.run_id,
diff --git a/providers/dbt/cloud/tests/unit/dbt/cloud/operators/test_dbt.py
b/providers/dbt/cloud/tests/unit/dbt/cloud/operators/test_dbt.py
index 2d2ed3c28f1..bb5988acdea 100644
--- a/providers/dbt/cloud/tests/unit/dbt/cloud/operators/test_dbt.py
+++ b/providers/dbt/cloud/tests/unit/dbt/cloud/operators/test_dbt.py
@@ -18,7 +18,7 @@ from __future__ import annotations
import os
from datetime import timedelta
-from unittest.mock import MagicMock, patch
+from unittest.mock import ANY, MagicMock, patch
import pytest
@@ -180,6 +180,58 @@ class TestDbtCloudRunJobOperator:
dbt_op.execute(MagicMock())
assert not mock_defer.called
+ @patch(
+
"airflow.providers.dbt.cloud.hooks.dbt.DbtCloudHook.get_job_run_status",
+ return_value=DbtCloudJobRunStatus.QUEUED.value,
+ )
+
@patch("airflow.providers.dbt.cloud.operators.dbt.DbtCloudRunJobOperator.defer")
+ @patch("airflow.providers.dbt.cloud.operators.dbt.DbtCloudRunJobTrigger")
+ @patch("airflow.providers.dbt.cloud.hooks.dbt.DbtCloudHook.get_connection")
+ @patch(
+ "airflow.providers.dbt.cloud.hooks.dbt.DbtCloudHook.trigger_job_run",
+ return_value=mock_response_json(DEFAULT_ACCOUNT_JOB_RUN_RESPONSE),
+ )
+ def test_execute_deferrable_does_not_pass_execution_timeout_to_defer(
+ self,
+ mock_trigger_job_run,
+ mock_dbt_hook,
+ mock_dbt_trigger,
+ mock_defer,
+ mock_job_run_status,
+ ):
+ dbt_op = DbtCloudRunJobOperator(
+ dbt_cloud_conn_id=ACCOUNT_ID_CONN,
+ task_id=TASK_ID,
+ job_id=JOB_ID,
+ check_interval=1,
+ timeout=3,
+ dag=self.dag,
+ deferrable=True,
+ execution_timeout=timedelta(seconds=3),
+ )
+
+ dbt_op.execute(MagicMock())
+
+ # Explicitly pass timeout=None to defer() so Airflow's framework-level
+ # deferred timeout handling does not raise TaskDeferredTimeout before
+ # execute_complete() can perform dbt job cancellation.
+ mock_defer.assert_called_once_with(
+ method_name="execute_complete",
+ trigger=mock_dbt_trigger.return_value,
+ timeout=None,
+ )
+
+ # The dbt trigger should still receive the calculated execution
deadline
+ # used for dbt job cancellation handling within execute_complete().
+ mock_dbt_trigger.assert_called_once_with(
+ conn_id=ACCOUNT_ID_CONN,
+ run_id=5555,
+ end_time=ANY,
+ execution_deadline=ANY,
+ account_id=None,
+ poll_interval=1,
+ )
+
@pytest.mark.parametrize(
"status",
(