This is an automated email from the ASF dual-hosted git repository.
weilee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 08424fe4da2 Raise ValueError instead of KeyError when
cancel_previous_runs=True and no job identifier is provided (#62393)
08424fe4da2 is described below
commit 08424fe4da204d8341525c46476bf173cd77ecfa
Author: SameerMesiah97 <[email protected]>
AuthorDate: Mon Mar 9 09:24:21 2026 +0000
Raise ValueError instead of KeyError when cancel_previous_runs=True and no
job identifier is provided (#62393)
Co-authored-by: Wei Lee <[email protected]>
Co-authored-by: Sameer Mesiah <[email protected]>
---
.../providers/databricks/operators/databricks.py | 9 ++++--
.../unit/databricks/operators/test_databricks.py | 36 +++++++++++++++++++++-
2 files changed, 42 insertions(+), 3 deletions(-)
diff --git a/providers/databricks/src/airflow/providers/databricks/operators/databricks.py b/providers/databricks/src/airflow/providers/databricks/operators/databricks.py
index fe6240dad5c..2d70602bbd5 100644
--- a/providers/databricks/src/airflow/providers/databricks/operators/databricks.py
+++ b/providers/databricks/src/airflow/providers/databricks/operators/databricks.py
@@ -921,8 +921,13 @@ class DatabricksRunNowOperator(BaseOperator):
self.json["job_id"] = job_id
del self.json["job_name"]
- if self.cancel_previous_runs and self.json["job_id"] is not None:
- hook.cancel_all_runs(self.json["job_id"])
+ if self.cancel_previous_runs:
+ if (job_id := self.json.get("job_id")) is None:
+ raise ValueError(
+ "cancel_previous_runs=True requires either job_id or job_name to be provided."
+ )
+
+ hook.cancel_all_runs(job_id)
self.run_id = hook.run_now(self.json)
if self.deferrable:
diff --git a/providers/databricks/tests/unit/databricks/operators/test_databricks.py b/providers/databricks/tests/unit/databricks/operators/test_databricks.py
index 9c7b070a636..ca2561dca07 100644
--- a/providers/databricks/tests/unit/databricks/operators/test_databricks.py
+++ b/providers/databricks/tests/unit/databricks/operators/test_databricks.py
@@ -21,7 +21,7 @@ import hashlib
from datetime import datetime, timedelta
from typing import Any
from unittest import mock
-from unittest.mock import MagicMock
+from unittest.mock import MagicMock, call
import pytest
@@ -1634,6 +1634,40 @@ class TestDatabricksRunNowOperator:
db_mock.get_run_page_url.assert_called_once_with(RUN_ID)
db_mock.get_run.assert_not_called()
+ @mock.patch("airflow.providers.databricks.operators.databricks.DatabricksHook")
+ def test_cancel_previous_runs_without_job_id_raises(self, db_mock_class):
+ run = {
+ "notebook_params": NOTEBOOK_PARAMS,
+ "notebook_task": NOTEBOOK_TASK,
+ "jar_params": JAR_PARAMS,
+ }
+
+ op = DatabricksRunNowOperator(
+ task_id=TASK_ID,
+ json=run,
+ cancel_previous_runs=True,
+ )
+
+ db_mock = db_mock_class.return_value
+
+ with pytest.raises(
+ ValueError,
+ match="cancel_previous_runs=True requires either job_id or job_name",
+ ):
+ op.execute(None)
+
+ assert db_mock_class.mock_calls == [
+ call(
+ DEFAULT_CONN_ID,
+ retry_limit=op.databricks_retry_limit,
+ retry_delay=op.databricks_retry_delay,
+ retry_args=None,
+ caller="DatabricksRunNowOperator",
+ )
+ ]
+ assert db_mock.cancel_all_runs.mock_calls == []
+ assert db_mock.run_now.mock_calls == []
+
@mock.patch("airflow.providers.databricks.operators.databricks.DatabricksHook")
def test_execute_task_deferred(self, db_mock_class):
"""