This is an automated email from the ASF dual-hosted git repository.

joshfell pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new b7418576cc7 Raise on unexpected terminal dbt Cloud job run states (#61300)
b7418576cc7 is described below

commit b7418576cc7fb0d5298b8a54be76390fbada0360
Author: SameerMesiah97 <[email protected]>
AuthorDate: Fri Feb 27 15:17:46 2026 +0000

    Raise on unexpected terminal dbt Cloud job run states (#61300)
    
    DbtCloudHook.wait_for_job_run_status previously returned False when a job run
    reached a terminal failure state (ERROR or CANCELLED), which could allow Airflow
    tasks to succeed silently when dbt Cloud jobs failed.
    
    This change updates the helper to raise DbtCloudJobRunException when a job run
    reaches an unexpected terminal state before the expected status is reached,
    ensuring task failure semantics correctly reflect external job failures.
    
    Call sites are updated accordingly, and on_kill now guards against propagated
    exceptions since cancellation confirmation is best-effort and should not affect task
    termination behavior.
    
    Co-authored-by: Sameer Mesiah <[email protected]>
---
 .../src/airflow/providers/dbt/cloud/hooks/dbt.py   | 26 ++++++++++++++--------
 .../airflow/providers/dbt/cloud/operators/dbt.py   | 16 +++++++++++--
 .../cloud/tests/unit/dbt/cloud/hooks/test_dbt.py   |  6 ++---
 .../tests/unit/dbt/cloud/operators/test_dbt.py     |  2 +-
 4 files changed, 35 insertions(+), 15 deletions(-)

diff --git a/providers/dbt/cloud/src/airflow/providers/dbt/cloud/hooks/dbt.py b/providers/dbt/cloud/src/airflow/providers/dbt/cloud/hooks/dbt.py
index ca20480abd2..8910f9c8ebc 100644
--- a/providers/dbt/cloud/src/airflow/providers/dbt/cloud/hooks/dbt.py
+++ b/providers/dbt/cloud/src/airflow/providers/dbt/cloud/hooks/dbt.py
@@ -797,20 +797,32 @@ class DbtCloudHook(HttpHook):
         :param check_interval: Time in seconds to check on a pipeline run's 
status.
         :param timeout: Time in seconds to wait for a pipeline to reach a 
terminal status or the expected
             status.
-        :return: Boolean indicating if the job run has reached the 
``expected_status``.
+        :return: ``True`` if the job run has reached the ``expected_status``.
+        :raises: ``DbtCloudJobRunException`` If the job run reaches an 
unexpected terminal status
+            or does not reach an expected status within the timeout.
         """
         expected_statuses = (expected_statuses,) if 
isinstance(expected_statuses, int) else expected_statuses
 
         DbtCloudJobRunStatus.check_is_valid(expected_statuses)
 
         job_run_info = JobRunInfo(account_id=account_id, run_id=run_id)
-        job_run_status = self.get_job_run_status(**job_run_info)
 
         start_time = time.monotonic()
 
-        while (
-            not DbtCloudJobRunStatus.is_terminal(job_run_status) and 
job_run_status not in expected_statuses
-        ):
+        while True:
+            job_run_status = self.get_job_run_status(**job_run_info)
+
+            if job_run_status in expected_statuses:
+                return True
+
+            # Reached terminal failure before expected state.
+            if DbtCloudJobRunStatus.is_terminal(job_run_status):
+                raise DbtCloudJobRunException(
+                    f"Job run {run_id} reached terminal status "
+                    f"{DbtCloudJobRunStatus(job_run_status).name} "
+                    f"before reaching expected statuses {expected_statuses}"
+                )
+
             # Check if the job-run duration has exceeded the ``timeout`` 
configured.
             if start_time + timeout < time.monotonic():
                 raise DbtCloudJobRunException(
@@ -820,10 +832,6 @@ class DbtCloudHook(HttpHook):
             # Wait to check the status of the job run based on the 
``check_interval`` configured.
             time.sleep(check_interval)
 
-            job_run_status = self.get_job_run_status(**job_run_info)
-
-        return job_run_status in expected_statuses
-
     @fallback_to_default_account
     def cancel_job_run(self, run_id: int, account_id: int | None = None) -> 
None:
         """
diff --git a/providers/dbt/cloud/src/airflow/providers/dbt/cloud/operators/dbt.py b/providers/dbt/cloud/src/airflow/providers/dbt/cloud/operators/dbt.py
index 63ce7955697..ad5f1418807 100644
--- a/providers/dbt/cloud/src/airflow/providers/dbt/cloud/operators/dbt.py
+++ b/providers/dbt/cloud/src/airflow/providers/dbt/cloud/operators/dbt.py
@@ -256,9 +256,14 @@ class DbtCloudRunJobOperator(BaseOperator):
         return int(event["run_id"])
 
     def on_kill(self) -> None:
-        if self.run_id:
-            self.hook.cancel_job_run(account_id=self.account_id, 
run_id=self.run_id)
+        if not self.run_id:
+            return
 
+        self.hook.cancel_job_run(account_id=self.account_id, 
run_id=self.run_id)
+
+        # Attempt best-effort confirmation of cancellation.
+        try:
+            # This can raise a DbtCloudJobRunException under normal operation.
             if self.hook.wait_for_job_run_status(
                 run_id=self.run_id,
                 account_id=self.account_id,
@@ -268,6 +273,13 @@ class DbtCloudRunJobOperator(BaseOperator):
             ):
                 self.log.info("Job run %s has been cancelled successfully.", 
self.run_id)
 
+        except DbtCloudJobRunException as exc:
+            self.log.warning(
+                "Failed to confirm cancellation of job run %s during task 
kill: %s",
+                self.run_id,
+                exc,
+            )
+
     @cached_property
     def hook(self):
         """Returns DBT Cloud hook."""
diff --git a/providers/dbt/cloud/tests/unit/dbt/cloud/hooks/test_dbt.py 
b/providers/dbt/cloud/tests/unit/dbt/cloud/hooks/test_dbt.py
index 45f940236b7..6cddddd4bf1 100644
--- a/providers/dbt/cloud/tests/unit/dbt/cloud/hooks/test_dbt.py
+++ b/providers/dbt/cloud/tests/unit/dbt/cloud/hooks/test_dbt.py
@@ -904,8 +904,8 @@ class TestDbtCloudHook:
 
     wait_for_job_run_status_test_args = [
         (DbtCloudJobRunStatus.SUCCESS.value, 
DbtCloudJobRunStatus.SUCCESS.value, True),
-        (DbtCloudJobRunStatus.ERROR.value, DbtCloudJobRunStatus.SUCCESS.value, 
False),
-        (DbtCloudJobRunStatus.CANCELLED.value, 
DbtCloudJobRunStatus.SUCCESS.value, False),
+        (DbtCloudJobRunStatus.ERROR.value, DbtCloudJobRunStatus.SUCCESS.value, 
"exception"),
+        (DbtCloudJobRunStatus.CANCELLED.value, 
DbtCloudJobRunStatus.SUCCESS.value, "exception"),
         (DbtCloudJobRunStatus.RUNNING.value, 
DbtCloudJobRunStatus.SUCCESS.value, "timeout"),
         (DbtCloudJobRunStatus.QUEUED.value, 
DbtCloudJobRunStatus.SUCCESS.value, "timeout"),
         (DbtCloudJobRunStatus.STARTING.value, 
DbtCloudJobRunStatus.SUCCESS.value, "timeout"),
@@ -943,7 +943,7 @@ class TestDbtCloudHook:
         ):
             mock_job_run_status.return_value = job_run_status
 
-            if expected_output != "timeout":
+            if expected_output not in ("timeout", "exception"):
                 assert hook.wait_for_job_run_status(**config) == 
expected_output
             else:
                 with pytest.raises(DbtCloudJobRunException):
diff --git a/providers/dbt/cloud/tests/unit/dbt/cloud/operators/test_dbt.py b/providers/dbt/cloud/tests/unit/dbt/cloud/operators/test_dbt.py
index 4aa7a72f962..e41089c9647 100644
--- a/providers/dbt/cloud/tests/unit/dbt/cloud/operators/test_dbt.py
+++ b/providers/dbt/cloud/tests/unit/dbt/cloud/operators/test_dbt.py
@@ -376,7 +376,7 @@ class TestDbtCloudRunJobOperator:
                 assert mock_run_job.return_value.data["id"] == RUN_ID
             elif expected_output == "exception":
                 # The operator should fail if the job run fails or is 
cancelled.
-                error_message = r"has failed or has been cancelled\.$"
+                error_message = r"reached terminal status (ERROR|CANCELLED)"
                 with pytest.raises(DbtCloudJobRunException, 
match=error_message):
                     operator.execute(context=self.mock_context)
             else:

Reply via email to