This is an automated email from the ASF dual-hosted git repository.

josh-fell pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new bc8ef9a4b5c Refactor deferrable execution in DbtCloudRunJobOperator to 
align with PR #64051 patterns. Simplify trigger polling/control flow and 
propagate execution_timeout via defer. Add tests for best-effort cancellation 
semantics in execute_complete and on_kill. (#66449)
bc8ef9a4b5c is described below

commit bc8ef9a4b5cfb7e24e1a495df68f4897bedce837
Author: SameerMesiah97 <[email protected]>
AuthorDate: Sat May 16 13:37:08 2026 +0100

    Refactor deferrable execution in DbtCloudRunJobOperator to align with PR 
#64051 patterns. Simplify trigger polling/control flow and propagate 
execution_timeout via defer. Add tests for best-effort cancellation semantics 
in execute_complete and on_kill. (#66449)
---
 .../airflow/providers/dbt/cloud/operators/dbt.py   |  38 ++-
 .../airflow/providers/dbt/cloud/triggers/dbt.py    |  72 +++---
 .../tests/unit/dbt/cloud/operators/test_dbt.py     | 133 +++++++++-
 .../tests/unit/dbt/cloud/triggers/test_dbt.py      | 282 ++++++---------------
 scripts/ci/prek/known_airflow_exceptions.txt       |   1 -
 5 files changed, 273 insertions(+), 253 deletions(-)

diff --git 
a/providers/dbt/cloud/src/airflow/providers/dbt/cloud/operators/dbt.py 
b/providers/dbt/cloud/src/airflow/providers/dbt/cloud/operators/dbt.py
index 0718b1f5abd..9c519a78461 100644
--- a/providers/dbt/cloud/src/airflow/providers/dbt/cloud/operators/dbt.py
+++ b/providers/dbt/cloud/src/airflow/providers/dbt/cloud/operators/dbt.py
@@ -23,7 +23,7 @@ from functools import cached_property
 from pathlib import Path
 from typing import TYPE_CHECKING, Any
 
-from airflow.providers.common.compat.sdk import AirflowException, 
BaseOperator, BaseOperatorLink, XCom, conf
+from airflow.providers.common.compat.sdk import BaseOperator, 
BaseOperatorLink, XCom, conf
 from airflow.providers.dbt.cloud.hooks.dbt import (
     DbtCloudHook,
     DbtCloudJobRunException,
@@ -250,16 +250,16 @@ class DbtCloudRunJobOperator(BaseOperator):
             # execution_timeout is a hard task-level limit (cancels the job),
             # while timeout only limits how long we wait for the job to finish.
             # If both are set, the earliest deadline wins.
-            end_time = time.time() + self.timeout
+            end_time = time.monotonic() + self.timeout
             execution_deadline = None
-            if self.execution_timeout:
-                execution_deadline = time.time() + 
self.execution_timeout.total_seconds()
+            if self.execution_timeout is not None:
+                execution_deadline = time.monotonic() + 
self.execution_timeout.total_seconds()
 
             job_run_info = JobRunInfo(account_id=self.account_id, 
run_id=self.run_id)
             job_run_status = self.hook.get_job_run_status(**job_run_info)
             if not DbtCloudJobRunStatus.is_terminal(job_run_status):
                 self.defer(
-                    timeout=None,
+                    timeout=self.execution_timeout,
                     trigger=DbtCloudRunJobTrigger(
                         conn_id=self.dbt_cloud_conn_id,
                         run_id=self.run_id,
@@ -293,8 +293,22 @@ class DbtCloudRunJobOperator(BaseOperator):
 
         # Enforce execution_timeout semantics in deferrable mode by cancelling 
the job.
         if event["status"] == "timeout":
-            self.hook.cancel_job_run(account_id=self.account_id, 
run_id=self.run_id)
-            raise AirflowException(f"Job run {self.run_id} has timed out.")
+            if self.run_id is not None:
+                self.log.info("Cancelling DBT job run %s due to execution 
timeout", self.run_id)
+
+                # Attempt best-effort job run cancellation.
+                try:
+                    self.hook.cancel_job_run(account_id=self.account_id, 
run_id=self.run_id)
+                except Exception:
+                    self.log.warning(
+                        "Failed to cancel DBT job run %s after timeout",
+                        self.run_id,
+                        exc_info=True,
+                    )
+            else:
+                self.log.warning("No run_id found; skipping cancellation")
+
+            raise DbtCloudJobRunException(f"Job run {self.run_id} has timed 
out.")
 
         self.log.info(event["message"])
         return int(event["run_id"])
@@ -303,7 +317,15 @@ class DbtCloudRunJobOperator(BaseOperator):
         if not self.run_id:
             return
 
-        self.hook.cancel_job_run(account_id=self.account_id, 
run_id=self.run_id)
+        # Attempt best-effort job run cancellation.
+        try:
+            self.hook.cancel_job_run(account_id=self.account_id, 
run_id=self.run_id)
+        except Exception:
+            self.log.warning(
+                "Failed to cancel DBT job run %s during on_kill",
+                self.run_id,
+                exc_info=True,
+            )
 
         # Attempt best-effort confirmation of cancellation.
         try:
diff --git 
a/providers/dbt/cloud/src/airflow/providers/dbt/cloud/triggers/dbt.py 
b/providers/dbt/cloud/src/airflow/providers/dbt/cloud/triggers/dbt.py
index a0bb91861a8..8efd918780b 100644
--- a/providers/dbt/cloud/src/airflow/providers/dbt/cloud/triggers/dbt.py
+++ b/providers/dbt/cloud/src/airflow/providers/dbt/cloud/triggers/dbt.py
@@ -79,9 +79,43 @@ class DbtCloudRunJobTrigger(BaseTrigger):
         """Make async connection to Dbt, polls for the pipeline run status."""
         hook = DbtCloudHook(self.conn_id, **self.hook_params)
         try:
-            while await self.is_still_running(hook):
+            while True:
+                now = time.monotonic()
+
+                job_run_status = await hook.get_job_status(self.run_id, 
self.account_id)
+
+                if job_run_status == DbtCloudJobRunStatus.SUCCESS.value:
+                    yield TriggerEvent(
+                        {
+                            "status": "success",
+                            "message": f"Job run {self.run_id} has completed 
successfully.",
+                            "run_id": self.run_id,
+                        }
+                    )
+                    return
+
+                elif job_run_status == DbtCloudJobRunStatus.CANCELLED.value:
+                    yield TriggerEvent(
+                        {
+                            "status": "cancelled",
+                            "message": f"Job run {self.run_id} has been 
cancelled.",
+                            "run_id": self.run_id,
+                        }
+                    )
+                    return
+
+                elif job_run_status == DbtCloudJobRunStatus.ERROR.value:
+                    yield TriggerEvent(
+                        {
+                            "status": "error",
+                            "message": f"Job run {self.run_id} has failed.",
+                            "run_id": self.run_id,
+                        }
+                    )
+                    return
+
                 if self.execution_deadline is not None:
-                    if self.execution_deadline < time.time():
+                    if self.execution_deadline <= now:
                         yield TriggerEvent(
                             {
                                 "status": "timeout",
@@ -91,11 +125,7 @@ class DbtCloudRunJobTrigger(BaseTrigger):
                         )
                         return
 
-                if self.end_time < time.time():
-                    # Perform a final status check before declaring timeout, 
in case the
-                    # job completed between the last poll and the timeout 
expiry.
-                    if not await self.is_still_running(hook):
-                        break
+                if self.end_time <= now:
                     yield TriggerEvent(
                         {
                             "status": "error",
@@ -105,34 +135,12 @@ class DbtCloudRunJobTrigger(BaseTrigger):
                         }
                     )
                     return
+
                 await asyncio.sleep(self.poll_interval)
-            job_run_status = await hook.get_job_status(self.run_id, 
self.account_id)
-            if job_run_status == DbtCloudJobRunStatus.SUCCESS.value:
-                yield TriggerEvent(
-                    {
-                        "status": "success",
-                        "message": f"Job run {self.run_id} has completed 
successfully.",
-                        "run_id": self.run_id,
-                    }
-                )
-            elif job_run_status == DbtCloudJobRunStatus.CANCELLED.value:
-                yield TriggerEvent(
-                    {
-                        "status": "cancelled",
-                        "message": f"Job run {self.run_id} has been 
cancelled.",
-                        "run_id": self.run_id,
-                    }
-                )
-            else:
-                yield TriggerEvent(
-                    {
-                        "status": "error",
-                        "message": f"Job run {self.run_id} has failed.",
-                        "run_id": self.run_id,
-                    }
-                )
+
         except Exception as e:
             yield TriggerEvent({"status": "error", "message": str(e), 
"run_id": self.run_id})
+            return
 
     async def is_still_running(self, hook: DbtCloudHook) -> bool:
         """Check whether the submitted job is running."""
diff --git a/providers/dbt/cloud/tests/unit/dbt/cloud/operators/test_dbt.py 
b/providers/dbt/cloud/tests/unit/dbt/cloud/operators/test_dbt.py
index e5df84277a1..2d2ed3c28f1 100644
--- a/providers/dbt/cloud/tests/unit/dbt/cloud/operators/test_dbt.py
+++ b/providers/dbt/cloud/tests/unit/dbt/cloud/operators/test_dbt.py
@@ -23,7 +23,7 @@ from unittest.mock import MagicMock, patch
 import pytest
 
 from airflow.models import DAG, Connection
-from airflow.providers.common.compat.sdk import AirflowException, 
TaskDeferred, timezone
+from airflow.providers.common.compat.sdk import TaskDeferred, timezone
 from airflow.providers.dbt.cloud.hooks.dbt import DbtCloudHook, 
DbtCloudJobRunException, DbtCloudJobRunStatus
 from airflow.providers.dbt.cloud.operators.dbt import (
     DbtCloudGetJobRunArtifactOperator,
@@ -199,7 +199,7 @@ class TestDbtCloudRunJobOperator:
     def test_dbt_run_job_op_async(self, mock_trigger_job_run, mock_dbt_hook, 
mock_job_run_status, status):
         """
         Asserts that a task is deferred and an DbtCloudRunJobTrigger will be 
fired
-        when the DbtCloudRunJobOperator has deferrable param set to True
+        when the DbtCloudRunJobOperator has deferrable param set to True.
         """
         mock_job_run_status.return_value = status
         dbt_op = DbtCloudRunJobOperator(
@@ -215,6 +215,40 @@ class TestDbtCloudRunJobOperator:
             dbt_op.execute(MagicMock())
         assert isinstance(exc.value.trigger, DbtCloudRunJobTrigger), "Trigger 
is not a DbtCloudRunJobTrigger"
 
+    def test_execute_complete_timeout_without_run_id(self):
+        """
+        Verify that when a deferrable dbt job emits a timeout event with no 
run_id,
+        the operator cancels the job and fails.
+        """
+
+        operator = DbtCloudRunJobOperator(
+            task_id=TASK_ID,
+            dbt_cloud_conn_id=ACCOUNT_ID_CONN,
+            job_id=JOB_ID,
+            dag=self.dag,
+            deferrable=True,
+        )
+
+        # Pretend the job was already triggered.
+        operator.run_id = None
+
+        # Mock the hook so we can assert cancellation.
+        operator.hook = MagicMock()
+
+        timeout_event = {
+            "status": "timeout",
+            "run_id": None,
+            "message": "Job run timed out.",
+        }
+
+        with pytest.raises(DbtCloudJobRunException):
+            operator.execute_complete(
+                context=self.mock_context,
+                event=timeout_event,
+            )
+
+        operator.hook.cancel_job_run.assert_not_called()
+
     def test_execute_complete_timeout_cancels_job(self):
         """
         Verify that when a deferrable dbt job emits a timeout event,
@@ -240,7 +274,45 @@ class TestDbtCloudRunJobOperator:
             "message": "Job run timed out.",
         }
 
-        with pytest.raises(AirflowException, match="has timed out"):
+        with pytest.raises(DbtCloudJobRunException, match="has timed out"):
+            operator.execute_complete(
+                context=self.mock_context,
+                event=timeout_event,
+            )
+
+        operator.hook.cancel_job_run.assert_called_once_with(
+            account_id=operator.account_id,
+            run_id=RUN_ID,
+        )
+
+    def 
test_execute_complete_timeout_cancel_job_does_not_mask_original_error(self):
+        """
+        Verify that when a deferrable dbt job is cancelled after a timeout 
event is received,
+        the original error is not masked.
+        """
+        operator = DbtCloudRunJobOperator(
+            task_id=TASK_ID,
+            dbt_cloud_conn_id=ACCOUNT_ID_CONN,
+            job_id=JOB_ID,
+            dag=self.dag,
+            deferrable=True,
+        )
+
+        # Pretend the job was already triggered.
+        operator.run_id = RUN_ID
+
+        # Mock the hook so we can assert cancellation.
+        operator.hook = MagicMock()
+
+        operator.hook.cancel_job_run.side_effect = Exception("Cancellation 
failed")
+
+        timeout_event = {
+            "status": "timeout",
+            "run_id": RUN_ID,
+            "message": "Job run timed out.",
+        }
+
+        with pytest.raises(DbtCloudJobRunException, match="has timed out"):
             operator.execute_complete(
                 context=self.mock_context,
                 event=timeout_event,
@@ -690,6 +762,61 @@ class TestDbtCloudRunJobOperator:
                 additional_run_config=self.config["additional_run_config"],
             )
 
+    def test_on_kill_cancels_job_and_confirms_success(self):
+        operator = DbtCloudRunJobOperator(
+            task_id=TASK_ID,
+            dbt_cloud_conn_id=ACCOUNT_ID_CONN,
+            job_id=JOB_ID,
+            dag=self.dag,
+        )
+
+        operator.run_id = RUN_ID
+        operator.hook = MagicMock()
+
+        # Simulate successful cancellation confirmation.
+        operator.hook.wait_for_job_run_status.return_value = True
+
+        operator.on_kill()
+
+        operator.hook.cancel_job_run.assert_called_once_with(
+            account_id=operator.account_id,
+            run_id=RUN_ID,
+        )
+
+        operator.hook.wait_for_job_run_status.assert_called_once_with(
+            run_id=RUN_ID,
+            account_id=operator.account_id,
+            expected_statuses=DbtCloudJobRunStatus.CANCELLED.value,
+            check_interval=operator.check_interval,
+            timeout=operator.timeout,
+        )
+
+    def test_on_kill_best_effort_cancellation_does_not_raise(self):
+        operator = DbtCloudRunJobOperator(
+            task_id=TASK_ID,
+            dbt_cloud_conn_id=ACCOUNT_ID_CONN,
+            job_id=JOB_ID,
+            dag=self.dag,
+        )
+
+        operator.run_id = RUN_ID
+        operator.hook = MagicMock()
+
+        # Simulate cancellation failure.
+        operator.hook.cancel_job_run.side_effect = Exception("Cancellation 
failed")
+
+        # Simulate confirmation also failing (normal path).
+        operator.hook.wait_for_job_run_status.side_effect = 
DbtCloudJobRunException("Still running")
+
+        operator.on_kill()
+
+        operator.hook.cancel_job_run.assert_called_once_with(
+            account_id=operator.account_id,
+            run_id=RUN_ID,
+        )
+
+        operator.hook.wait_for_job_run_status.assert_called_once()
+
     @pytest.mark.parametrize(
         ("conn_id", "account_id"),
         [(ACCOUNT_ID_CONN, None), (NO_ACCOUNT_ID_CONN, ACCOUNT_ID)],
diff --git a/providers/dbt/cloud/tests/unit/dbt/cloud/triggers/test_dbt.py 
b/providers/dbt/cloud/tests/unit/dbt/cloud/triggers/test_dbt.py
index 4d1d971f60d..12c94d19b9e 100644
--- a/providers/dbt/cloud/tests/unit/dbt/cloud/triggers/test_dbt.py
+++ b/providers/dbt/cloud/tests/unit/dbt/cloud/triggers/test_dbt.py
@@ -34,17 +34,23 @@ class TestDbtCloudRunJobTrigger:
     RUN_ID = 1234
     CONN_ID = "dbt_cloud_default"
     ACCOUNT_ID = 12340
-    END_TIME = time.time() + 60 * 60 * 24 * 7
-    EXECUTION_DEADLINE = time.time() + 60 * 60 * 24 * 7
     POLL_INTERVAL = 3.0
 
-    def test_serialization(self):
+    @pytest.fixture
+    def end_time(self):
+        return time.monotonic() + 60 * 60 * 24 * 7
+
+    @pytest.fixture
+    def execution_deadline(self):
+        return time.monotonic() + 60 * 60 * 24 * 7
+
+    def test_serialization(self, end_time, execution_deadline):
         """Assert DbtCloudRunJobTrigger correctly serializes its arguments and 
classpath."""
         trigger = DbtCloudRunJobTrigger(
             conn_id=self.CONN_ID,
             poll_interval=self.POLL_INTERVAL,
-            end_time=self.END_TIME,
-            execution_deadline=self.EXECUTION_DEADLINE,
+            end_time=end_time,
+            execution_deadline=execution_deadline,
             run_id=self.RUN_ID,
             account_id=self.ACCOUNT_ID,
             hook_params={"retry_delay": 10},
@@ -55,28 +61,29 @@ class TestDbtCloudRunJobTrigger:
             "run_id": self.RUN_ID,
             "account_id": self.ACCOUNT_ID,
             "conn_id": self.CONN_ID,
-            "end_time": self.END_TIME,
-            "execution_deadline": self.EXECUTION_DEADLINE,
+            "end_time": end_time,
+            "execution_deadline": execution_deadline,
             "poll_interval": self.POLL_INTERVAL,
             "hook_params": {"retry_delay": 10},
         }
 
     @pytest.mark.asyncio
-    
@mock.patch("airflow.providers.dbt.cloud.triggers.dbt.DbtCloudRunJobTrigger.is_still_running")
-    async def test_dbt_run_job_trigger(self, mocked_is_still_running):
+    
@mock.patch("airflow.providers.dbt.cloud.hooks.dbt.DbtCloudHook.get_job_status")
+    async def test_dbt_run_job_trigger(self, mock_get_job_status, end_time):
         """Test DbtCloudRunJobTrigger is triggered with mocked details and run 
successfully."""
-        mocked_is_still_running.return_value = True
+
+        mock_get_job_status.return_value = DbtCloudJobRunStatus.RUNNING.value
         trigger = DbtCloudRunJobTrigger(
             conn_id=self.CONN_ID,
             poll_interval=self.POLL_INTERVAL,
-            end_time=self.END_TIME,
+            end_time=end_time,
             run_id=self.RUN_ID,
             account_id=self.ACCOUNT_ID,
         )
         task = asyncio.create_task(trigger.run().__anext__())
         await asyncio.sleep(0.5)
 
-        # TriggerEvent was not returned
+        # TriggerEvent was not returned.
         assert task.done() is False
         asyncio.get_event_loop().stop()
 
@@ -85,20 +92,21 @@ class TestDbtCloudRunJobTrigger:
         ("mock_value", "mock_status", "mock_message"),
         [
             (DbtCloudJobRunStatus.SUCCESS.value, "success", "Job run 1234 has 
completed successfully."),
+            (DbtCloudJobRunStatus.CANCELLED.value, "cancelled", "Job run 1234 
has been cancelled."),
+            (DbtCloudJobRunStatus.ERROR.value, "error", "Job run 1234 has 
failed."),
         ],
     )
-    
@mock.patch("airflow.providers.dbt.cloud.triggers.dbt.DbtCloudRunJobTrigger.is_still_running")
     
@mock.patch("airflow.providers.dbt.cloud.hooks.dbt.DbtCloudHook.get_job_status")
-    async def test_dbt_job_run_for_terminal_status_success(
-        self, mock_get_job_status, mocked_is_still_running, mock_value, 
mock_status, mock_message
+    async def test_dbt_job_run_for_terminal_status(
+        self, mock_get_job_status, mock_value, mock_status, mock_message, 
end_time
     ):
-        """Assert that run trigger success message in case of job success"""
-        mocked_is_still_running.return_value = False
+        """Assert run trigger messages when job reaches terminal status."""
+
         mock_get_job_status.return_value = mock_value
         trigger = DbtCloudRunJobTrigger(
             conn_id=self.CONN_ID,
             poll_interval=self.POLL_INTERVAL,
-            end_time=self.END_TIME,
+            end_time=end_time,
             run_id=self.RUN_ID,
             account_id=self.ACCOUNT_ID,
         )
@@ -107,254 +115,110 @@ class TestDbtCloudRunJobTrigger:
             "message": mock_message,
             "run_id": self.RUN_ID,
         }
-        task = asyncio.create_task(trigger.run().__anext__())
-        await asyncio.sleep(0.5)
-        assert TriggerEvent(expected_result) == task.result()
-        asyncio.get_event_loop().stop()
+
+        events = [e async for e in trigger.run()]
+        assert len(events) == 1
+        assert TriggerEvent(expected_result) == events[0]
 
     @pytest.mark.asyncio
-    @pytest.mark.parametrize(
-        ("mock_value", "mock_status", "mock_message"),
-        [
-            (DbtCloudJobRunStatus.CANCELLED.value, "cancelled", "Job run 1234 
has been cancelled."),
-        ],
-    )
-    
@mock.patch("airflow.providers.dbt.cloud.triggers.dbt.DbtCloudRunJobTrigger.is_still_running")
     
@mock.patch("airflow.providers.dbt.cloud.hooks.dbt.DbtCloudHook.get_job_status")
-    async def test_dbt_job_run_for_terminal_status_cancelled(
-        self, mock_get_job_status, mocked_is_still_running, mock_value, 
mock_status, mock_message
-    ):
-        """Assert that run trigger success message in case of job success"""
-        mocked_is_still_running.return_value = False
-        mock_get_job_status.return_value = mock_value
+    async def test_dbt_job_run_exception(self, mock_get_job_status, end_time):
+        """Assert that run catch exception if dbt cloud job API throw 
exception."""
+
+        mock_get_job_status.side_effect = Exception("Test exception")
         trigger = DbtCloudRunJobTrigger(
             conn_id=self.CONN_ID,
             poll_interval=self.POLL_INTERVAL,
-            end_time=self.END_TIME,
+            end_time=end_time,
             run_id=self.RUN_ID,
             account_id=self.ACCOUNT_ID,
         )
+
         expected_result = {
-            "status": mock_status,
-            "message": mock_message,
+            "status": "error",
+            "message": "Test exception",
             "run_id": self.RUN_ID,
         }
-        task = asyncio.create_task(trigger.run().__anext__())
-        await asyncio.sleep(0.5)
-        assert TriggerEvent(expected_result) == task.result()
-        asyncio.get_event_loop().stop()
+
+        events = [e async for e in trigger.run()]
+        assert len(events) == 1
+        assert TriggerEvent(expected_result) == events[0]
 
     @pytest.mark.asyncio
-    @pytest.mark.parametrize(
-        ("mock_value", "mock_status", "mock_message"),
-        [
-            (DbtCloudJobRunStatus.ERROR.value, "error", "Job run 1234 has 
failed."),
-        ],
-    )
-    
@mock.patch("airflow.providers.dbt.cloud.triggers.dbt.DbtCloudRunJobTrigger.is_still_running")
     
@mock.patch("airflow.providers.dbt.cloud.hooks.dbt.DbtCloudHook.get_job_status")
-    async def test_dbt_job_run_for_terminal_status_error(
-        self, mock_get_job_status, mocked_is_still_running, mock_value, 
mock_status, mock_message
-    ):
-        """Assert that run trigger success message in case of job success"""
-        mocked_is_still_running.return_value = False
-        mock_get_job_status.return_value = mock_value
+    async def test_dbt_job_run_timeout(self, mock_get_job_status):
+        """Assert that run timeout after end_time elapsed."""
+
+        mock_get_job_status.return_value = DbtCloudJobRunStatus.RUNNING.value
+
+        end_time = time.monotonic() - 1
         trigger = DbtCloudRunJobTrigger(
             conn_id=self.CONN_ID,
             poll_interval=self.POLL_INTERVAL,
-            end_time=self.END_TIME,
+            end_time=end_time,
             run_id=self.RUN_ID,
             account_id=self.ACCOUNT_ID,
         )
+
         expected_result = {
-            "status": mock_status,
-            "message": mock_message,
+            "status": "error",
+            "message": f"Job run {self.RUN_ID} has not reached a terminal 
status "
+            f"within the configured timeout.",
             "run_id": self.RUN_ID,
         }
-        task = asyncio.create_task(trigger.run().__anext__())
-        await asyncio.sleep(0.5)
-        assert TriggerEvent(expected_result) == task.result()
-        asyncio.get_event_loop().stop()
 
-    @pytest.mark.asyncio
-    
@mock.patch("airflow.providers.dbt.cloud.triggers.dbt.DbtCloudRunJobTrigger.is_still_running")
-    
@mock.patch("airflow.providers.dbt.cloud.hooks.dbt.DbtCloudHook.get_job_status")
-    async def test_dbt_job_run_exception(self, mock_get_job_status, 
mocked_is_still_running):
-        """Assert that run catch exception if dbt cloud job API throw 
exception"""
-        mocked_is_still_running.return_value = False
-        mock_get_job_status.side_effect = Exception("Test exception")
-        trigger = DbtCloudRunJobTrigger(
-            conn_id=self.CONN_ID,
-            poll_interval=self.POLL_INTERVAL,
-            end_time=self.END_TIME,
-            run_id=self.RUN_ID,
-            account_id=self.ACCOUNT_ID,
-        )
-        task = [i async for i in trigger.run()]
-        response = TriggerEvent(
-            {
-                "status": "error",
-                "message": "Test exception",
-                "run_id": self.RUN_ID,
-            }
-        )
-        assert len(task) == 1
-        assert response in task
+        events = [e async for e in trigger.run()]
+        assert len(events) == 1
+        assert TriggerEvent(expected_result) == events[0]
 
     @pytest.mark.asyncio
-    
@mock.patch("airflow.providers.dbt.cloud.triggers.dbt.DbtCloudRunJobTrigger.is_still_running")
     
@mock.patch("airflow.providers.dbt.cloud.hooks.dbt.DbtCloudHook.get_job_status")
-    async def test_dbt_job_run_timeout(self, mock_get_job_status, 
mocked_is_still_running):
-        """Assert that run timeout after end_time elapsed"""
-        mocked_is_still_running.return_value = True
-        mock_get_job_status.side_effect = Exception("Test exception")
-        end_time = time.time()
-        trigger = DbtCloudRunJobTrigger(
-            conn_id=self.CONN_ID,
-            poll_interval=self.POLL_INTERVAL,
-            end_time=end_time,
-            run_id=self.RUN_ID,
-            account_id=self.ACCOUNT_ID,
-        )
-        generator = trigger.run()
-        actual = await generator.asend(None)
-        expected = TriggerEvent(
-            {
-                "status": "error",
-                "message": f"Job run {self.RUN_ID} has not reached a terminal 
status "
-                f"within the configured timeout.",
-                "run_id": self.RUN_ID,
-            }
-        )
-        assert expected == actual
-
-    @pytest.mark.asyncio
-    
@mock.patch("airflow.providers.dbt.cloud.hooks.dbt.DbtCloudHook.get_job_status")
-    async def test_dbt_job_run_timeout_with_final_status_check(self, 
mock_get_job_status):
-        """Assert that a final status check prevents false timeout when job 
completes near timeout."""
-        mock_get_job_status.return_value = DbtCloudJobRunStatus.SUCCESS.value
-        # Simulate: first is_still_running call returns True (job running),
-        # then the timeout check fires, but the final is_still_running call 
returns False
-        # (job just completed). The trigger should yield success, not a 
timeout error.
-        end_time = time.time()  # Already expired
-        trigger = DbtCloudRunJobTrigger(
-            conn_id=self.CONN_ID,
-            poll_interval=self.POLL_INTERVAL,
-            end_time=end_time,
-            run_id=self.RUN_ID,
-            account_id=self.ACCOUNT_ID,
-        )
-        with mock.patch.object(trigger, "is_still_running", 
new_callable=AsyncMock) as mock_running:
-            # First call: still running; second call (final check): no longer 
running
-            mock_running.side_effect = [True, False]
-            generator = trigger.run()
-            actual = await generator.asend(None)
-        expected = TriggerEvent(
-            {
-                "status": "success",
-                "message": f"Job run {self.RUN_ID} has completed 
successfully.",
-                "run_id": self.RUN_ID,
-            }
-        )
-        assert expected == actual
+    async def test_dbt_job_run_execution_timeout(self, mock_get_job_status, 
end_time):
+        """Assert that run emits timeout event after execution_deadline 
elapsed."""
 
-    @pytest.mark.asyncio
-    
@mock.patch("airflow.providers.dbt.cloud.triggers.dbt.DbtCloudRunJobTrigger.is_still_running")
-    async def test_dbt_job_run_execution_timeout(self, 
mocked_is_still_running):
-        """Assert that run emits timeout event after execution_deadline 
elapsed"""
-        mocked_is_still_running.return_value = True
+        mock_get_job_status.return_value = DbtCloudJobRunStatus.RUNNING.value
 
-        execution_deadline = time.time()
+        execution_deadline = time.monotonic() - 1
 
         trigger = DbtCloudRunJobTrigger(
             conn_id=self.CONN_ID,
             poll_interval=self.POLL_INTERVAL,
-            end_time=time.time() + 60,
+            end_time=end_time,
             execution_deadline=execution_deadline,
             run_id=self.RUN_ID,
             account_id=self.ACCOUNT_ID,
         )
 
-        generator = trigger.run()
-        actual = await generator.asend(None)
-
-        expected = TriggerEvent(
-            {
-                "status": "timeout",
-                "message": f"Job run {self.RUN_ID} has timed out.",
-                "run_id": self.RUN_ID,
-            }
-        )
+        expected_result = {
+            "status": "timeout",
+            "message": f"Job run {self.RUN_ID} has timed out.",
+            "run_id": self.RUN_ID,
+        }
 
-        assert expected == actual
+        events = [e async for e in trigger.run()]
+        assert len(events) == 1
+        assert TriggerEvent(expected_result) == events[0]
 
     @pytest.mark.asyncio
     @pytest.mark.parametrize(
         ("mock_response", "expected_status"),
         [
             (DbtCloudJobRunStatus.SUCCESS.value, False),
-        ],
-    )
-    
@mock.patch("airflow.providers.dbt.cloud.hooks.dbt.DbtCloudHook.get_job_status")
-    async def test_dbt_job_run_is_still_running_success(
-        self, mock_get_job_status, mock_response, expected_status
-    ):
-        """Test is_still_running with mocked response job status and assert
-        the return response with expected value"""
-        hook = AsyncMock(DbtCloudHook)
-        hook.get_job_status.return_value = mock_response
-        trigger = DbtCloudRunJobTrigger(
-            conn_id=self.CONN_ID,
-            poll_interval=self.POLL_INTERVAL,
-            end_time=self.END_TIME,
-            run_id=self.RUN_ID,
-            account_id=self.ACCOUNT_ID,
-        )
-        response = await trigger.is_still_running(hook)
-        assert response == expected_status
-
-    @pytest.mark.asyncio
-    @pytest.mark.parametrize(
-        ("mock_response", "expected_status"),
-        [
             (DbtCloudJobRunStatus.RUNNING.value, True),
         ],
     )
     
@mock.patch("airflow.providers.dbt.cloud.hooks.dbt.DbtCloudHook.get_job_status")
-    async def test_dbt_job_run_is_still_running(self, mock_get_job_status, 
mock_response, expected_status):
-        """Test is_still_running with mocked response job status and assert
-        the return response with expected value"""
-        hook = AsyncMock(DbtCloudHook)
-        hook.get_job_status.return_value = mock_response
-        trigger = DbtCloudRunJobTrigger(
-            conn_id=self.CONN_ID,
-            poll_interval=self.POLL_INTERVAL,
-            end_time=self.END_TIME,
-            run_id=self.RUN_ID,
-            account_id=self.ACCOUNT_ID,
-        )
-        response = await trigger.is_still_running(hook)
-        assert response == expected_status
-
-    @pytest.mark.asyncio
-    @pytest.mark.parametrize(
-        ("mock_response", "expected_status"),
-        [
-            (DbtCloudJobRunStatus.QUEUED.value, True),
-        ],
-    )
-    
@mock.patch("airflow.providers.dbt.cloud.hooks.dbt.DbtCloudHook.get_job_status")
-    async def test_dbt_job_run_is_still_running_queued(
-        self, mock_get_job_status, mock_response, expected_status
+    async def test_dbt_job_run_is_still_running(
+        self, mock_get_job_status, mock_response, expected_status, end_time
     ):
         """Test is_still_running with mocked response job status and assert
-        the return response with expected value"""
+        the return response with expected value."""
         hook = AsyncMock(DbtCloudHook)
         hook.get_job_status.return_value = mock_response
         trigger = DbtCloudRunJobTrigger(
             conn_id=self.CONN_ID,
             poll_interval=self.POLL_INTERVAL,
-            end_time=self.END_TIME,
+            end_time=end_time,
             run_id=self.RUN_ID,
             account_id=self.ACCOUNT_ID,
         )
diff --git a/scripts/ci/prek/known_airflow_exceptions.txt 
b/scripts/ci/prek/known_airflow_exceptions.txt
index 4d700bcb81d..f1ddfbd1efc 100644
--- a/scripts/ci/prek/known_airflow_exceptions.txt
+++ b/scripts/ci/prek/known_airflow_exceptions.txt
@@ -190,7 +190,6 @@ 
providers/databricks/tests/unit/databricks/hooks/test_databricks_base.py::1
 providers/datadog/src/airflow/providers/datadog/hooks/datadog.py::2
 providers/datadog/src/airflow/providers/datadog/sensors/datadog.py::1
 providers/dbt/cloud/src/airflow/providers/dbt/cloud/hooks/dbt.py::3
-providers/dbt/cloud/src/airflow/providers/dbt/cloud/operators/dbt.py::1
 providers/dbt/cloud/src/airflow/providers/dbt/cloud/sensors/dbt.py::1
 providers/dingding/src/airflow/providers/dingding/hooks/dingding.py::2
 providers/discord/src/airflow/providers/discord/operators/discord_webhook.py::1

Reply via email to