This is an automated email from the ASF dual-hosted git repository.
josh-fell pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new bc8ef9a4b5c Refactor deferrable execution in DbtCloudRunJobOperator to
align with PR #64051 patterns. Simplify trigger polling/control flow and
propagate execution_timeout via defer. Add tests for best-effort cancellation
semantics in execute_complete and on_kill. (#66449)
bc8ef9a4b5c is described below
commit bc8ef9a4b5cfb7e24e1a495df68f4897bedce837
Author: SameerMesiah97 <[email protected]>
AuthorDate: Sat May 16 13:37:08 2026 +0100
Refactor deferrable execution in DbtCloudRunJobOperator to align with PR
#64051 patterns. Simplify trigger polling/control flow and propagate
execution_timeout via defer. Add tests for best-effort cancellation semantics
in execute_complete and on_kill. (#66449)
---
.../airflow/providers/dbt/cloud/operators/dbt.py | 38 ++-
.../airflow/providers/dbt/cloud/triggers/dbt.py | 72 +++---
.../tests/unit/dbt/cloud/operators/test_dbt.py | 133 +++++++++-
.../tests/unit/dbt/cloud/triggers/test_dbt.py | 282 ++++++---------------
scripts/ci/prek/known_airflow_exceptions.txt | 1 -
5 files changed, 273 insertions(+), 253 deletions(-)
diff --git
a/providers/dbt/cloud/src/airflow/providers/dbt/cloud/operators/dbt.py
b/providers/dbt/cloud/src/airflow/providers/dbt/cloud/operators/dbt.py
index 0718b1f5abd..9c519a78461 100644
--- a/providers/dbt/cloud/src/airflow/providers/dbt/cloud/operators/dbt.py
+++ b/providers/dbt/cloud/src/airflow/providers/dbt/cloud/operators/dbt.py
@@ -23,7 +23,7 @@ from functools import cached_property
from pathlib import Path
from typing import TYPE_CHECKING, Any
-from airflow.providers.common.compat.sdk import AirflowException,
BaseOperator, BaseOperatorLink, XCom, conf
+from airflow.providers.common.compat.sdk import BaseOperator,
BaseOperatorLink, XCom, conf
from airflow.providers.dbt.cloud.hooks.dbt import (
DbtCloudHook,
DbtCloudJobRunException,
@@ -250,16 +250,16 @@ class DbtCloudRunJobOperator(BaseOperator):
# execution_timeout is a hard task-level limit (cancels the job),
# while timeout only limits how long we wait for the job to finish.
# If both are set, the earliest deadline wins.
- end_time = time.time() + self.timeout
+ end_time = time.monotonic() + self.timeout
execution_deadline = None
- if self.execution_timeout:
- execution_deadline = time.time() +
self.execution_timeout.total_seconds()
+ if self.execution_timeout is not None:
+ execution_deadline = time.monotonic() +
self.execution_timeout.total_seconds()
job_run_info = JobRunInfo(account_id=self.account_id,
run_id=self.run_id)
job_run_status = self.hook.get_job_run_status(**job_run_info)
if not DbtCloudJobRunStatus.is_terminal(job_run_status):
self.defer(
- timeout=None,
+ timeout=self.execution_timeout,
trigger=DbtCloudRunJobTrigger(
conn_id=self.dbt_cloud_conn_id,
run_id=self.run_id,
@@ -293,8 +293,22 @@ class DbtCloudRunJobOperator(BaseOperator):
# Enforce execution_timeout semantics in deferrable mode by cancelling
the job.
if event["status"] == "timeout":
- self.hook.cancel_job_run(account_id=self.account_id,
run_id=self.run_id)
- raise AirflowException(f"Job run {self.run_id} has timed out.")
+ if self.run_id is not None:
+ self.log.info("Cancelling DBT job run %s due to execution
timeout", self.run_id)
+
+ # Attempt best-effort job run cancellation.
+ try:
+ self.hook.cancel_job_run(account_id=self.account_id,
run_id=self.run_id)
+ except Exception:
+ self.log.warning(
+ "Failed to cancel DBT job run %s after timeout",
+ self.run_id,
+ exc_info=True,
+ )
+ else:
+ self.log.warning("No run_id found; skipping cancellation")
+
+ raise DbtCloudJobRunException(f"Job run {self.run_id} has timed
out.")
self.log.info(event["message"])
return int(event["run_id"])
@@ -303,7 +317,15 @@ class DbtCloudRunJobOperator(BaseOperator):
if not self.run_id:
return
- self.hook.cancel_job_run(account_id=self.account_id,
run_id=self.run_id)
+ # Attempt best-effort job run cancellation.
+ try:
+ self.hook.cancel_job_run(account_id=self.account_id,
run_id=self.run_id)
+ except Exception:
+ self.log.warning(
+ "Failed to cancel DBT job run %s during on_kill",
+ self.run_id,
+ exc_info=True,
+ )
# Attempt best-effort confirmation of cancellation.
try:
diff --git
a/providers/dbt/cloud/src/airflow/providers/dbt/cloud/triggers/dbt.py
b/providers/dbt/cloud/src/airflow/providers/dbt/cloud/triggers/dbt.py
index a0bb91861a8..8efd918780b 100644
--- a/providers/dbt/cloud/src/airflow/providers/dbt/cloud/triggers/dbt.py
+++ b/providers/dbt/cloud/src/airflow/providers/dbt/cloud/triggers/dbt.py
@@ -79,9 +79,43 @@ class DbtCloudRunJobTrigger(BaseTrigger):
"""Make async connection to Dbt, polls for the pipeline run status."""
hook = DbtCloudHook(self.conn_id, **self.hook_params)
try:
- while await self.is_still_running(hook):
+ while True:
+ now = time.monotonic()
+
+ job_run_status = await hook.get_job_status(self.run_id,
self.account_id)
+
+ if job_run_status == DbtCloudJobRunStatus.SUCCESS.value:
+ yield TriggerEvent(
+ {
+ "status": "success",
+ "message": f"Job run {self.run_id} has completed
successfully.",
+ "run_id": self.run_id,
+ }
+ )
+ return
+
+ elif job_run_status == DbtCloudJobRunStatus.CANCELLED.value:
+ yield TriggerEvent(
+ {
+ "status": "cancelled",
+ "message": f"Job run {self.run_id} has been
cancelled.",
+ "run_id": self.run_id,
+ }
+ )
+ return
+
+ elif job_run_status == DbtCloudJobRunStatus.ERROR.value:
+ yield TriggerEvent(
+ {
+ "status": "error",
+ "message": f"Job run {self.run_id} has failed.",
+ "run_id": self.run_id,
+ }
+ )
+ return
+
if self.execution_deadline is not None:
- if self.execution_deadline < time.time():
+ if self.execution_deadline <= now:
yield TriggerEvent(
{
"status": "timeout",
@@ -91,11 +125,7 @@ class DbtCloudRunJobTrigger(BaseTrigger):
)
return
- if self.end_time < time.time():
- # Perform a final status check before declaring timeout,
in case the
- # job completed between the last poll and the timeout
expiry.
- if not await self.is_still_running(hook):
- break
+ if self.end_time <= now:
yield TriggerEvent(
{
"status": "error",
@@ -105,34 +135,12 @@ class DbtCloudRunJobTrigger(BaseTrigger):
}
)
return
+
await asyncio.sleep(self.poll_interval)
- job_run_status = await hook.get_job_status(self.run_id,
self.account_id)
- if job_run_status == DbtCloudJobRunStatus.SUCCESS.value:
- yield TriggerEvent(
- {
- "status": "success",
- "message": f"Job run {self.run_id} has completed
successfully.",
- "run_id": self.run_id,
- }
- )
- elif job_run_status == DbtCloudJobRunStatus.CANCELLED.value:
- yield TriggerEvent(
- {
- "status": "cancelled",
- "message": f"Job run {self.run_id} has been
cancelled.",
- "run_id": self.run_id,
- }
- )
- else:
- yield TriggerEvent(
- {
- "status": "error",
- "message": f"Job run {self.run_id} has failed.",
- "run_id": self.run_id,
- }
- )
+
except Exception as e:
yield TriggerEvent({"status": "error", "message": str(e),
"run_id": self.run_id})
+ return
async def is_still_running(self, hook: DbtCloudHook) -> bool:
"""Check whether the submitted job is running."""
diff --git a/providers/dbt/cloud/tests/unit/dbt/cloud/operators/test_dbt.py
b/providers/dbt/cloud/tests/unit/dbt/cloud/operators/test_dbt.py
index e5df84277a1..2d2ed3c28f1 100644
--- a/providers/dbt/cloud/tests/unit/dbt/cloud/operators/test_dbt.py
+++ b/providers/dbt/cloud/tests/unit/dbt/cloud/operators/test_dbt.py
@@ -23,7 +23,7 @@ from unittest.mock import MagicMock, patch
import pytest
from airflow.models import DAG, Connection
-from airflow.providers.common.compat.sdk import AirflowException,
TaskDeferred, timezone
+from airflow.providers.common.compat.sdk import TaskDeferred, timezone
from airflow.providers.dbt.cloud.hooks.dbt import DbtCloudHook,
DbtCloudJobRunException, DbtCloudJobRunStatus
from airflow.providers.dbt.cloud.operators.dbt import (
DbtCloudGetJobRunArtifactOperator,
@@ -199,7 +199,7 @@ class TestDbtCloudRunJobOperator:
def test_dbt_run_job_op_async(self, mock_trigger_job_run, mock_dbt_hook,
mock_job_run_status, status):
"""
Asserts that a task is deferred and an DbtCloudRunJobTrigger will be
fired
- when the DbtCloudRunJobOperator has deferrable param set to True
+ when the DbtCloudRunJobOperator has deferrable param set to True.
"""
mock_job_run_status.return_value = status
dbt_op = DbtCloudRunJobOperator(
@@ -215,6 +215,40 @@ class TestDbtCloudRunJobOperator:
dbt_op.execute(MagicMock())
assert isinstance(exc.value.trigger, DbtCloudRunJobTrigger), "Trigger
is not a DbtCloudRunJobTrigger"
+ def test_execute_complete_timeout_without_run_id(self):
+ """
+ Verify that when a deferrable dbt job emits a timeout event with no
run_id,
+ the operator skips cancellation (there is no run to cancel) and still fails.
+ """
+
+ operator = DbtCloudRunJobOperator(
+ task_id=TASK_ID,
+ dbt_cloud_conn_id=ACCOUNT_ID_CONN,
+ job_id=JOB_ID,
+ dag=self.dag,
+ deferrable=True,
+ )
+
+ # Simulate a missing run_id (no job run is known to the operator).
+ operator.run_id = None
+
+ # Mock the hook so we can assert that no cancellation is attempted.
+ operator.hook = MagicMock()
+
+ timeout_event = {
+ "status": "timeout",
+ "run_id": None,
+ "message": "Job run timed out.",
+ }
+
+ with pytest.raises(DbtCloudJobRunException):
+ operator.execute_complete(
+ context=self.mock_context,
+ event=timeout_event,
+ )
+
+ operator.hook.cancel_job_run.assert_not_called()
+
def test_execute_complete_timeout_cancels_job(self):
"""
Verify that when a deferrable dbt job emits a timeout event,
@@ -240,7 +274,45 @@ class TestDbtCloudRunJobOperator:
"message": "Job run timed out.",
}
- with pytest.raises(AirflowException, match="has timed out"):
+ with pytest.raises(DbtCloudJobRunException, match="has timed out"):
+ operator.execute_complete(
+ context=self.mock_context,
+ event=timeout_event,
+ )
+
+ operator.hook.cancel_job_run.assert_called_once_with(
+ account_id=operator.account_id,
+ run_id=RUN_ID,
+ )
+
+ def
test_execute_complete_timeout_cancel_job_does_not_mask_original_error(self):
+ """
+ Verify that when cancelling a deferrable dbt job fails after a timeout
event is received,
+ the original error is not masked.
+ """
+ operator = DbtCloudRunJobOperator(
+ task_id=TASK_ID,
+ dbt_cloud_conn_id=ACCOUNT_ID_CONN,
+ job_id=JOB_ID,
+ dag=self.dag,
+ deferrable=True,
+ )
+
+ # Pretend the job was already triggered.
+ operator.run_id = RUN_ID
+
+ # Mock the hook so we can assert cancellation.
+ operator.hook = MagicMock()
+
+ operator.hook.cancel_job_run.side_effect = Exception("Cancellation
failed")
+
+ timeout_event = {
+ "status": "timeout",
+ "run_id": RUN_ID,
+ "message": "Job run timed out.",
+ }
+
+ with pytest.raises(DbtCloudJobRunException, match="has timed out"):
operator.execute_complete(
context=self.mock_context,
event=timeout_event,
@@ -690,6 +762,61 @@ class TestDbtCloudRunJobOperator:
additional_run_config=self.config["additional_run_config"],
)
+ def test_on_kill_cancels_job_and_confirms_success(self):
+ operator = DbtCloudRunJobOperator(
+ task_id=TASK_ID,
+ dbt_cloud_conn_id=ACCOUNT_ID_CONN,
+ job_id=JOB_ID,
+ dag=self.dag,
+ )
+
+ operator.run_id = RUN_ID
+ operator.hook = MagicMock()
+
+ # Simulate successful cancellation confirmation.
+ operator.hook.wait_for_job_run_status.return_value = True
+
+ operator.on_kill()
+
+ operator.hook.cancel_job_run.assert_called_once_with(
+ account_id=operator.account_id,
+ run_id=RUN_ID,
+ )
+
+ operator.hook.wait_for_job_run_status.assert_called_once_with(
+ run_id=RUN_ID,
+ account_id=operator.account_id,
+ expected_statuses=DbtCloudJobRunStatus.CANCELLED.value,
+ check_interval=operator.check_interval,
+ timeout=operator.timeout,
+ )
+
+ def test_on_kill_best_effort_cancellation_does_not_raise(self):
+ operator = DbtCloudRunJobOperator(
+ task_id=TASK_ID,
+ dbt_cloud_conn_id=ACCOUNT_ID_CONN,
+ job_id=JOB_ID,
+ dag=self.dag,
+ )
+
+ operator.run_id = RUN_ID
+ operator.hook = MagicMock()
+
+ # Simulate cancellation failure.
+ operator.hook.cancel_job_run.side_effect = Exception("Cancellation
failed")
+
+ # Simulate confirmation also failing (normal path).
+ operator.hook.wait_for_job_run_status.side_effect =
DbtCloudJobRunException("Still running")
+
+ operator.on_kill()
+
+ operator.hook.cancel_job_run.assert_called_once_with(
+ account_id=operator.account_id,
+ run_id=RUN_ID,
+ )
+
+ operator.hook.wait_for_job_run_status.assert_called_once()
+
@pytest.mark.parametrize(
("conn_id", "account_id"),
[(ACCOUNT_ID_CONN, None), (NO_ACCOUNT_ID_CONN, ACCOUNT_ID)],
diff --git a/providers/dbt/cloud/tests/unit/dbt/cloud/triggers/test_dbt.py
b/providers/dbt/cloud/tests/unit/dbt/cloud/triggers/test_dbt.py
index 4d1d971f60d..12c94d19b9e 100644
--- a/providers/dbt/cloud/tests/unit/dbt/cloud/triggers/test_dbt.py
+++ b/providers/dbt/cloud/tests/unit/dbt/cloud/triggers/test_dbt.py
@@ -34,17 +34,23 @@ class TestDbtCloudRunJobTrigger:
RUN_ID = 1234
CONN_ID = "dbt_cloud_default"
ACCOUNT_ID = 12340
- END_TIME = time.time() + 60 * 60 * 24 * 7
- EXECUTION_DEADLINE = time.time() + 60 * 60 * 24 * 7
POLL_INTERVAL = 3.0
- def test_serialization(self):
+ @pytest.fixture
+ def end_time(self):
+ return time.monotonic() + 60 * 60 * 24 * 7
+
+ @pytest.fixture
+ def execution_deadline(self):
+ return time.monotonic() + 60 * 60 * 24 * 7
+
+ def test_serialization(self, end_time, execution_deadline):
"""Assert DbtCloudRunJobTrigger correctly serializes its arguments and
classpath."""
trigger = DbtCloudRunJobTrigger(
conn_id=self.CONN_ID,
poll_interval=self.POLL_INTERVAL,
- end_time=self.END_TIME,
- execution_deadline=self.EXECUTION_DEADLINE,
+ end_time=end_time,
+ execution_deadline=execution_deadline,
run_id=self.RUN_ID,
account_id=self.ACCOUNT_ID,
hook_params={"retry_delay": 10},
@@ -55,28 +61,29 @@ class TestDbtCloudRunJobTrigger:
"run_id": self.RUN_ID,
"account_id": self.ACCOUNT_ID,
"conn_id": self.CONN_ID,
- "end_time": self.END_TIME,
- "execution_deadline": self.EXECUTION_DEADLINE,
+ "end_time": end_time,
+ "execution_deadline": execution_deadline,
"poll_interval": self.POLL_INTERVAL,
"hook_params": {"retry_delay": 10},
}
@pytest.mark.asyncio
-
@mock.patch("airflow.providers.dbt.cloud.triggers.dbt.DbtCloudRunJobTrigger.is_still_running")
- async def test_dbt_run_job_trigger(self, mocked_is_still_running):
+
@mock.patch("airflow.providers.dbt.cloud.hooks.dbt.DbtCloudHook.get_job_status")
+ async def test_dbt_run_job_trigger(self, mock_get_job_status, end_time):
"""Test DbtCloudRunJobTrigger is triggered with mocked details and run
successfully."""
- mocked_is_still_running.return_value = True
+
+ mock_get_job_status.return_value = DbtCloudJobRunStatus.RUNNING.value
trigger = DbtCloudRunJobTrigger(
conn_id=self.CONN_ID,
poll_interval=self.POLL_INTERVAL,
- end_time=self.END_TIME,
+ end_time=end_time,
run_id=self.RUN_ID,
account_id=self.ACCOUNT_ID,
)
task = asyncio.create_task(trigger.run().__anext__())
await asyncio.sleep(0.5)
- # TriggerEvent was not returned
+ # TriggerEvent was not returned.
assert task.done() is False
asyncio.get_event_loop().stop()
@@ -85,20 +92,21 @@ class TestDbtCloudRunJobTrigger:
("mock_value", "mock_status", "mock_message"),
[
(DbtCloudJobRunStatus.SUCCESS.value, "success", "Job run 1234 has
completed successfully."),
+ (DbtCloudJobRunStatus.CANCELLED.value, "cancelled", "Job run 1234
has been cancelled."),
+ (DbtCloudJobRunStatus.ERROR.value, "error", "Job run 1234 has
failed."),
],
)
-
@mock.patch("airflow.providers.dbt.cloud.triggers.dbt.DbtCloudRunJobTrigger.is_still_running")
@mock.patch("airflow.providers.dbt.cloud.hooks.dbt.DbtCloudHook.get_job_status")
- async def test_dbt_job_run_for_terminal_status_success(
- self, mock_get_job_status, mocked_is_still_running, mock_value,
mock_status, mock_message
+ async def test_dbt_job_run_for_terminal_status(
+ self, mock_get_job_status, mock_value, mock_status, mock_message,
end_time
):
- """Assert that run trigger success message in case of job success"""
- mocked_is_still_running.return_value = False
+ """Assert run trigger messages when job reaches terminal status."""
+
mock_get_job_status.return_value = mock_value
trigger = DbtCloudRunJobTrigger(
conn_id=self.CONN_ID,
poll_interval=self.POLL_INTERVAL,
- end_time=self.END_TIME,
+ end_time=end_time,
run_id=self.RUN_ID,
account_id=self.ACCOUNT_ID,
)
@@ -107,254 +115,110 @@ class TestDbtCloudRunJobTrigger:
"message": mock_message,
"run_id": self.RUN_ID,
}
- task = asyncio.create_task(trigger.run().__anext__())
- await asyncio.sleep(0.5)
- assert TriggerEvent(expected_result) == task.result()
- asyncio.get_event_loop().stop()
+
+ events = [e async for e in trigger.run()]
+ assert len(events) == 1
+ assert TriggerEvent(expected_result) == events[0]
@pytest.mark.asyncio
- @pytest.mark.parametrize(
- ("mock_value", "mock_status", "mock_message"),
- [
- (DbtCloudJobRunStatus.CANCELLED.value, "cancelled", "Job run 1234
has been cancelled."),
- ],
- )
-
@mock.patch("airflow.providers.dbt.cloud.triggers.dbt.DbtCloudRunJobTrigger.is_still_running")
@mock.patch("airflow.providers.dbt.cloud.hooks.dbt.DbtCloudHook.get_job_status")
- async def test_dbt_job_run_for_terminal_status_cancelled(
- self, mock_get_job_status, mocked_is_still_running, mock_value,
mock_status, mock_message
- ):
- """Assert that run trigger success message in case of job success"""
- mocked_is_still_running.return_value = False
- mock_get_job_status.return_value = mock_value
+ async def test_dbt_job_run_exception(self, mock_get_job_status, end_time):
+ """Assert that run catch exception if dbt cloud job API throw
exception."""
+
+ mock_get_job_status.side_effect = Exception("Test exception")
trigger = DbtCloudRunJobTrigger(
conn_id=self.CONN_ID,
poll_interval=self.POLL_INTERVAL,
- end_time=self.END_TIME,
+ end_time=end_time,
run_id=self.RUN_ID,
account_id=self.ACCOUNT_ID,
)
+
expected_result = {
- "status": mock_status,
- "message": mock_message,
+ "status": "error",
+ "message": "Test exception",
"run_id": self.RUN_ID,
}
- task = asyncio.create_task(trigger.run().__anext__())
- await asyncio.sleep(0.5)
- assert TriggerEvent(expected_result) == task.result()
- asyncio.get_event_loop().stop()
+
+ events = [e async for e in trigger.run()]
+ assert len(events) == 1
+ assert TriggerEvent(expected_result) == events[0]
@pytest.mark.asyncio
- @pytest.mark.parametrize(
- ("mock_value", "mock_status", "mock_message"),
- [
- (DbtCloudJobRunStatus.ERROR.value, "error", "Job run 1234 has
failed."),
- ],
- )
-
@mock.patch("airflow.providers.dbt.cloud.triggers.dbt.DbtCloudRunJobTrigger.is_still_running")
@mock.patch("airflow.providers.dbt.cloud.hooks.dbt.DbtCloudHook.get_job_status")
- async def test_dbt_job_run_for_terminal_status_error(
- self, mock_get_job_status, mocked_is_still_running, mock_value,
mock_status, mock_message
- ):
- """Assert that run trigger success message in case of job success"""
- mocked_is_still_running.return_value = False
- mock_get_job_status.return_value = mock_value
+ async def test_dbt_job_run_timeout(self, mock_get_job_status):
+ """Assert that run timeout after end_time elapsed."""
+
+ mock_get_job_status.return_value = DbtCloudJobRunStatus.RUNNING.value
+
+ end_time = time.monotonic() - 1
trigger = DbtCloudRunJobTrigger(
conn_id=self.CONN_ID,
poll_interval=self.POLL_INTERVAL,
- end_time=self.END_TIME,
+ end_time=end_time,
run_id=self.RUN_ID,
account_id=self.ACCOUNT_ID,
)
+
expected_result = {
- "status": mock_status,
- "message": mock_message,
+ "status": "error",
+ "message": f"Job run {self.RUN_ID} has not reached a terminal
status "
+ f"within the configured timeout.",
"run_id": self.RUN_ID,
}
- task = asyncio.create_task(trigger.run().__anext__())
- await asyncio.sleep(0.5)
- assert TriggerEvent(expected_result) == task.result()
- asyncio.get_event_loop().stop()
- @pytest.mark.asyncio
-
@mock.patch("airflow.providers.dbt.cloud.triggers.dbt.DbtCloudRunJobTrigger.is_still_running")
-
@mock.patch("airflow.providers.dbt.cloud.hooks.dbt.DbtCloudHook.get_job_status")
- async def test_dbt_job_run_exception(self, mock_get_job_status,
mocked_is_still_running):
- """Assert that run catch exception if dbt cloud job API throw
exception"""
- mocked_is_still_running.return_value = False
- mock_get_job_status.side_effect = Exception("Test exception")
- trigger = DbtCloudRunJobTrigger(
- conn_id=self.CONN_ID,
- poll_interval=self.POLL_INTERVAL,
- end_time=self.END_TIME,
- run_id=self.RUN_ID,
- account_id=self.ACCOUNT_ID,
- )
- task = [i async for i in trigger.run()]
- response = TriggerEvent(
- {
- "status": "error",
- "message": "Test exception",
- "run_id": self.RUN_ID,
- }
- )
- assert len(task) == 1
- assert response in task
+ events = [e async for e in trigger.run()]
+ assert len(events) == 1
+ assert TriggerEvent(expected_result) == events[0]
@pytest.mark.asyncio
-
@mock.patch("airflow.providers.dbt.cloud.triggers.dbt.DbtCloudRunJobTrigger.is_still_running")
@mock.patch("airflow.providers.dbt.cloud.hooks.dbt.DbtCloudHook.get_job_status")
- async def test_dbt_job_run_timeout(self, mock_get_job_status,
mocked_is_still_running):
- """Assert that run timeout after end_time elapsed"""
- mocked_is_still_running.return_value = True
- mock_get_job_status.side_effect = Exception("Test exception")
- end_time = time.time()
- trigger = DbtCloudRunJobTrigger(
- conn_id=self.CONN_ID,
- poll_interval=self.POLL_INTERVAL,
- end_time=end_time,
- run_id=self.RUN_ID,
- account_id=self.ACCOUNT_ID,
- )
- generator = trigger.run()
- actual = await generator.asend(None)
- expected = TriggerEvent(
- {
- "status": "error",
- "message": f"Job run {self.RUN_ID} has not reached a terminal
status "
- f"within the configured timeout.",
- "run_id": self.RUN_ID,
- }
- )
- assert expected == actual
-
- @pytest.mark.asyncio
-
@mock.patch("airflow.providers.dbt.cloud.hooks.dbt.DbtCloudHook.get_job_status")
- async def test_dbt_job_run_timeout_with_final_status_check(self,
mock_get_job_status):
- """Assert that a final status check prevents false timeout when job
completes near timeout."""
- mock_get_job_status.return_value = DbtCloudJobRunStatus.SUCCESS.value
- # Simulate: first is_still_running call returns True (job running),
- # then the timeout check fires, but the final is_still_running call
returns False
- # (job just completed). The trigger should yield success, not a
timeout error.
- end_time = time.time() # Already expired
- trigger = DbtCloudRunJobTrigger(
- conn_id=self.CONN_ID,
- poll_interval=self.POLL_INTERVAL,
- end_time=end_time,
- run_id=self.RUN_ID,
- account_id=self.ACCOUNT_ID,
- )
- with mock.patch.object(trigger, "is_still_running",
new_callable=AsyncMock) as mock_running:
- # First call: still running; second call (final check): no longer
running
- mock_running.side_effect = [True, False]
- generator = trigger.run()
- actual = await generator.asend(None)
- expected = TriggerEvent(
- {
- "status": "success",
- "message": f"Job run {self.RUN_ID} has completed
successfully.",
- "run_id": self.RUN_ID,
- }
- )
- assert expected == actual
+ async def test_dbt_job_run_execution_timeout(self, mock_get_job_status,
end_time):
+ """Assert that run emits timeout event after execution_deadline
elapsed."""
- @pytest.mark.asyncio
-
@mock.patch("airflow.providers.dbt.cloud.triggers.dbt.DbtCloudRunJobTrigger.is_still_running")
- async def test_dbt_job_run_execution_timeout(self,
mocked_is_still_running):
- """Assert that run emits timeout event after execution_deadline
elapsed"""
- mocked_is_still_running.return_value = True
+ mock_get_job_status.return_value = DbtCloudJobRunStatus.RUNNING.value
- execution_deadline = time.time()
+ execution_deadline = time.monotonic() - 1
trigger = DbtCloudRunJobTrigger(
conn_id=self.CONN_ID,
poll_interval=self.POLL_INTERVAL,
- end_time=time.time() + 60,
+ end_time=end_time,
execution_deadline=execution_deadline,
run_id=self.RUN_ID,
account_id=self.ACCOUNT_ID,
)
- generator = trigger.run()
- actual = await generator.asend(None)
-
- expected = TriggerEvent(
- {
- "status": "timeout",
- "message": f"Job run {self.RUN_ID} has timed out.",
- "run_id": self.RUN_ID,
- }
- )
+ expected_result = {
+ "status": "timeout",
+ "message": f"Job run {self.RUN_ID} has timed out.",
+ "run_id": self.RUN_ID,
+ }
- assert expected == actual
+ events = [e async for e in trigger.run()]
+ assert len(events) == 1
+ assert TriggerEvent(expected_result) == events[0]
@pytest.mark.asyncio
@pytest.mark.parametrize(
("mock_response", "expected_status"),
[
(DbtCloudJobRunStatus.SUCCESS.value, False),
- ],
- )
-
@mock.patch("airflow.providers.dbt.cloud.hooks.dbt.DbtCloudHook.get_job_status")
- async def test_dbt_job_run_is_still_running_success(
- self, mock_get_job_status, mock_response, expected_status
- ):
- """Test is_still_running with mocked response job status and assert
- the return response with expected value"""
- hook = AsyncMock(DbtCloudHook)
- hook.get_job_status.return_value = mock_response
- trigger = DbtCloudRunJobTrigger(
- conn_id=self.CONN_ID,
- poll_interval=self.POLL_INTERVAL,
- end_time=self.END_TIME,
- run_id=self.RUN_ID,
- account_id=self.ACCOUNT_ID,
- )
- response = await trigger.is_still_running(hook)
- assert response == expected_status
-
- @pytest.mark.asyncio
- @pytest.mark.parametrize(
- ("mock_response", "expected_status"),
- [
(DbtCloudJobRunStatus.RUNNING.value, True),
],
)
@mock.patch("airflow.providers.dbt.cloud.hooks.dbt.DbtCloudHook.get_job_status")
- async def test_dbt_job_run_is_still_running(self, mock_get_job_status,
mock_response, expected_status):
- """Test is_still_running with mocked response job status and assert
- the return response with expected value"""
- hook = AsyncMock(DbtCloudHook)
- hook.get_job_status.return_value = mock_response
- trigger = DbtCloudRunJobTrigger(
- conn_id=self.CONN_ID,
- poll_interval=self.POLL_INTERVAL,
- end_time=self.END_TIME,
- run_id=self.RUN_ID,
- account_id=self.ACCOUNT_ID,
- )
- response = await trigger.is_still_running(hook)
- assert response == expected_status
-
- @pytest.mark.asyncio
- @pytest.mark.parametrize(
- ("mock_response", "expected_status"),
- [
- (DbtCloudJobRunStatus.QUEUED.value, True),
- ],
- )
-
@mock.patch("airflow.providers.dbt.cloud.hooks.dbt.DbtCloudHook.get_job_status")
- async def test_dbt_job_run_is_still_running_queued(
- self, mock_get_job_status, mock_response, expected_status
+ async def test_dbt_job_run_is_still_running(
+ self, mock_get_job_status, mock_response, expected_status, end_time
):
"""Test is_still_running with mocked response job status and assert
- the return response with expected value"""
+ the return response with expected value."""
hook = AsyncMock(DbtCloudHook)
hook.get_job_status.return_value = mock_response
trigger = DbtCloudRunJobTrigger(
conn_id=self.CONN_ID,
poll_interval=self.POLL_INTERVAL,
- end_time=self.END_TIME,
+ end_time=end_time,
run_id=self.RUN_ID,
account_id=self.ACCOUNT_ID,
)
diff --git a/scripts/ci/prek/known_airflow_exceptions.txt
b/scripts/ci/prek/known_airflow_exceptions.txt
index 4d700bcb81d..f1ddfbd1efc 100644
--- a/scripts/ci/prek/known_airflow_exceptions.txt
+++ b/scripts/ci/prek/known_airflow_exceptions.txt
@@ -190,7 +190,6 @@
providers/databricks/tests/unit/databricks/hooks/test_databricks_base.py::1
providers/datadog/src/airflow/providers/datadog/hooks/datadog.py::2
providers/datadog/src/airflow/providers/datadog/sensors/datadog.py::1
providers/dbt/cloud/src/airflow/providers/dbt/cloud/hooks/dbt.py::3
-providers/dbt/cloud/src/airflow/providers/dbt/cloud/operators/dbt.py::1
providers/dbt/cloud/src/airflow/providers/dbt/cloud/sensors/dbt.py::1
providers/dingding/src/airflow/providers/dingding/hooks/dingding.py::2
providers/discord/src/airflow/providers/discord/operators/discord_webhook.py::1