This is an automated email from the ASF dual-hosted git repository.
eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 3d5c762aeca Forward Airflow Dag params to Databricks job parameters in
CreateJobs/SubmitRun/RunNow (#66613)
3d5c762aeca is described below
commit 3d5c762aecae2a6b66b887771ba82403bb0a13e5
Author: Noritaka Sekiyama <[email protected]>
AuthorDate: Tue May 26 15:36:08 2026 +0900
Forward Airflow Dag params to Databricks job parameters in
CreateJobs/SubmitRun/RunNow (#66613)
* Pass Airflow config as job parameters in DatabricksCreateJobsOperator
* Add unit tests
* Refactor airflow params auto-injection and extend to RunNow / SubmitRun
Apply Lee-W's review suggestion from PR #39007: replace the manual loop
with a list comprehension that uses ``params.dump()`` (the original
``params.items()`` iteration yielded ``Param`` objects rather than the
resolved values, which would not serialise into the Databricks API).
Extend the same pattern to:
* DatabricksRunNowOperator -> populate top-level ``job_parameters`` (the
dict-shaped slot already supported by the run-now endpoint).
* DatabricksSubmitRunOperator -> populate dict-shaped per-task parameter
fields (notebook_task.base_parameters, python_wheel_task.named_parameters,
sql_task.parameters, run_job_task.job_parameters). Tasks whose only
parameter field is ``List[str]`` (spark_jar_task, spark_python_task,
spark_submit_task) are intentionally skipped because there is no
canonical mapping from a key/value dict to positional CLI arguments.
Drop the ``"parameters": []`` expectation that was added to the existing
test_exec_create / test_exec_reset cases by PR #39007 — it never matched
the source logic (``self.params`` is falsy when no params are set, so no
``parameters`` key is added).
Add tests covering: auto-injection for each operator, no override when
the field is already populated, and the per-task injection rules for
SubmitRun.
* Document Airflow params auto-injection in operator user guides
Add a "Forwarding Airflow Dag params" section to the jobs_create, run_now,
and submit_run operator guides describing the new behaviour: when the
operator's params dict is non-empty and the corresponding json slot is
empty, params are auto-injected as job-level parameters / job_parameters /
per-task dict-shaped parameters respectively.
* Fix static checks: PT006 tuple form for parametrize, dict() for self.params
- The first argument of pytest.mark.parametrize must be a tuple of names,
not a comma-separated string (PT006).
- Replace self.params.dump() with dict(self.params) so the call works on
both the ParamsDict and the plain-dict members of self.params' union type,
satisfying mypy's union-attr check.
* Clarify Airflow-params-to-Databricks-params mapping in operator docs
Address Lee-W's review feedback that the auto-injection example was hard to
parse. Each operator's section now:
- Names the exact Databricks API field being populated and links to its
schema (parameters / job_parameters / per-task slots).
- States explicitly that each <key>: <value> pair in params becomes one
{"name": <key>, "default": <value>} entry (CreateJobs) or is passed
through unchanged (RunNow / SubmitRun).
- Splits params into a named variable in the CreateJobs example so the
key/value to name/default mapping reads top-to-bottom.
---------
Co-authored-by: subham611 <[email protected]>
---
.../databricks/docs/operators/jobs_create.rst | 35 +++++
providers/databricks/docs/operators/run_now.rst | 25 +++
providers/databricks/docs/operators/submit_run.rst | 32 ++++
.../providers/databricks/operators/databricks.py | 58 +++++++
.../unit/databricks/operators/test_databricks.py | 174 +++++++++++++++++++++
5 files changed, 324 insertions(+)
diff --git a/providers/databricks/docs/operators/jobs_create.rst
b/providers/databricks/docs/operators/jobs_create.rst
index fd2d9a906ff..115dcc39d09 100644
--- a/providers/databricks/docs/operators/jobs_create.rst
+++ b/providers/databricks/docs/operators/jobs_create.rst
@@ -58,6 +58,41 @@ Currently the named parameters that
``DatabricksCreateJobsOperator`` supports ar
- ``access_control_list``
+Forwarding Airflow Dag params as Databricks job parameters
+----------------------------------------------------------
+
+The Databricks ``api/2.2/jobs/create`` endpoint accepts a top-level
``parameters`` field
+that defines `job-level parameters
+<https://docs.databricks.com/api/workspace/jobs/create#parameters>`_ — a list
of objects
+with a ``name`` (the parameter name) and a ``default`` (its default value),
for example
+``[{"name": "env", "default": "prod"}]``.
+
+If ``parameters`` is not set in ``json`` and the operator's ``params`` dict is
non-empty,
+each key/value pair in ``params`` is converted into one such ``{"name": key,
"default":
+value}`` entry, so that Airflow Dag params can be forwarded as Databricks job
parameters
+without hardcoding the API shape in ``json``. If ``json`` already contains
``parameters``,
+it is left untouched.
+
+.. code-block:: python
+
+ # Airflow Dag params (key/value pairs)
+ params = {"env": "prod", "batch_size": "100"}
+
+ create_job = DatabricksCreateJobsOperator(
+ task_id="create_job",
+ json={"name": "my-job", "tasks": [...]},
+ params=params,
+ )
+
+ # The Databricks job created/reset by the operator will have:
+ # parameters=[
+ # {"name": "env", "default": "prod"},
+ # {"name": "batch_size", "default": "100"},
+ # ]
+ # i.e. each "<key>: <value>" in params becomes one
+ # {"name": "<key>", "default": "<value>"} entry in the job definition.
+
+
Examples
--------
diff --git a/providers/databricks/docs/operators/run_now.rst
b/providers/databricks/docs/operators/run_now.rst
index 1b51124e5f4..f39d872a0c1 100644
--- a/providers/databricks/docs/operators/run_now.rst
+++ b/providers/databricks/docs/operators/run_now.rst
@@ -49,6 +49,31 @@ All other parameters are optional and described in
documentation for ``Databrick
* ``repair_run``
* ``cancel_previous_runs``
+Forwarding Airflow Dag params as Databricks job parameters
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+The Databricks ``api/2.2/jobs/run-now`` endpoint accepts a top-level
`job_parameters
+<https://docs.databricks.com/api/workspace/jobs/runnow#job_parameters>`_ field
— a plain
+``Dict[str, str]`` mapping parameter name to value — that overrides the job's
defaults
+for this run.
+
+If ``job_parameters`` is not set in ``json`` and the operator's ``params``
dict is
+non-empty, ``params`` is forwarded as ``job_parameters`` as-is, so Airflow Dag
params can
+be passed dynamically to a run without hardcoding them in ``json``. If
``json`` already
+contains ``job_parameters``, it is left untouched.
+
+.. code-block:: python
+
+ run_now = DatabricksRunNowOperator(
+ task_id="run_now",
+ job_id=123,
+ params={"env": "staging", "batch_size": "42"},
+ )
+ # The triggered run receives:
+ # job_parameters={"env": "staging", "batch_size": "42"}
+ # i.e. the same dict, passed straight through to the run-now request body.
+
+
DatabricksRunNowDeferrableOperator
==================================
diff --git a/providers/databricks/docs/operators/submit_run.rst
b/providers/databricks/docs/operators/submit_run.rst
index 522a051d2e7..f4f78d2fa33 100644
--- a/providers/databricks/docs/operators/submit_run.rst
+++ b/providers/databricks/docs/operators/submit_run.rst
@@ -104,6 +104,38 @@ Another way to do is use the param tasks to pass array of
objects to instantiate
notebook_run = DatabricksSubmitRunOperator(task_id="notebook_run",
tasks=tasks)
+Forwarding Airflow Dag params as task parameters
+------------------------------------------------
+
+Unlike ``api/2.2/jobs/create`` and ``api/2.2/jobs/run-now``, the
+``api/2.2/jobs/runs/submit`` endpoint has no top-level parameter slot — each
task in
+``tasks`` carries its own parameters whose shape depends on the task type.
+
+If the operator's ``params`` dict is non-empty, it is forwarded as-is into the
+dict-shaped parameter slot of every task in ``json`` whose corresponding field
is empty:
+
+* ``notebook_task.base_parameters``
+* ``python_wheel_task.named_parameters``
+* ``sql_task.parameters``
+* ``run_job_task.job_parameters``
+
+Tasks whose only parameter slot is ``List[str]`` (``spark_jar_task``,
``spark_python_task``,
+``spark_submit_task``) are skipped because there is no canonical mapping from
a key/value
+dict to a positional argument list — pass those parameters explicitly via the
``json``
+or ``tasks`` argument.
+
+.. code-block:: python
+
+ notebook_run = DatabricksSubmitRunOperator(
+ task_id="notebook_run",
+ notebook_task={"notebook_path":
"/Users/[email protected]/PrepareData"},
+ new_cluster={"spark_version": "15.4.x-scala2.12", "num_workers": 2},
+ params={"env": "dev", "shard": "1"},
+ )
+ # The submitted run's notebook_task.base_parameters becomes:
+ # {"env": "dev", "shard": "1"}
+ # i.e. the same dict, copied into the task's dict-shaped parameter slot.
+
Examples
--------
diff --git
a/providers/databricks/src/airflow/providers/databricks/operators/databricks.py
b/providers/databricks/src/airflow/providers/databricks/operators/databricks.py
index 4c581215be4..c9e7c6891fb 100644
---
a/providers/databricks/src/airflow/providers/databricks/operators/databricks.py
+++
b/providers/databricks/src/airflow/providers/databricks/operators/databricks.py
@@ -260,6 +260,27 @@ def
_handle_deferrable_databricks_operator_completion(event: dict, log: Logger)
raise AirflowException(error_message)
+# Mapping of task definition keys (in the runs/submit JSON) to the dict-shaped
+# parameter sub-key into which Airflow ``self.params`` will be auto-injected.
+# Tasks whose only parameter field is ``List[str]`` (e.g. spark_python_task,
+# spark_jar_task, spark_submit_task) are intentionally omitted because there is
+# no canonical way to convert a key/value dict to positional/CLI arguments.
+_DICT_PARAM_FIELD_BY_TASK = {
+ "notebook_task": "base_parameters",
+ "python_wheel_task": "named_parameters",
+ "sql_task": "parameters",
+ "run_job_task": "job_parameters",
+}
+
+
+def _inject_airflow_params_into_task(task: dict, params: dict) -> None:
+ """Set dict-shaped per-task parameter fields from ``params`` if they are
not already set."""
+ for task_key, field in _DICT_PARAM_FIELD_BY_TASK.items():
+ task_def = task.get(task_key)
+ if isinstance(task_def, dict) and not task_def.get(field):
+ task_def[field] = dict(params)
+
+
class DatabricksJobRunLink(BaseOperatorLink):
"""Constructs a link to monitor a Databricks Job Run."""
@@ -323,6 +344,12 @@ class DatabricksCreateJobsOperator(BaseOperator):
might be a floating point number).
:param databricks_retry_args: An optional dictionary with arguments passed
to ``tenacity.Retrying`` class.
+ .. note::
+ If ``parameters`` is not set in ``json`` and the operator's ``params``
dict is non-empty,
+ the operator's ``params`` are automatically converted to job-level
``parameters`` (a list
+ of ``{"name": k, "default": v}`` entries) so that Airflow Dag params
can be forwarded as
+ Databricks job parameters without hardcoding them in ``json``.
+
"""
# Used in airflow.models.BaseOperator
@@ -406,6 +433,8 @@ class DatabricksCreateJobsOperator(BaseOperator):
if "name" not in self.json:
raise AirflowException("Missing required parameter: name")
job_id = self._hook.find_job_id_by_name(self.json["name"])
+ if not self.json.get("parameters") and self.params:
+ self.json["parameters"] = [{"name": k, "default": v} for k, v in
dict(self.params).items()]
if job_id is None:
return self._hook.create_job(self.json)
self._hook.reset_job(str(job_id), self.json)
@@ -531,6 +560,15 @@ class DatabricksSubmitRunOperator(BaseOperator):
.. seealso::
https://docs.databricks.com/dev-tools/api/latest/jobs.html#operation/JobsRunsSubmit
+
+ .. note::
+ If the operator's ``params`` dict is non-empty, it is automatically
forwarded into the
+ dict-shaped parameter slot of every task in ``json`` whose
corresponding field is empty:
+ ``notebook_task.base_parameters``,
``python_wheel_task.named_parameters``,
+ ``sql_task.parameters``, ``run_job_task.job_parameters``. Tasks whose
only parameter
+ field is ``List[str]`` (``spark_jar_task``, ``spark_python_task``,
``spark_submit_task``)
+ are skipped because there is no canonical mapping from a key/value
dict to a positional
+ argument list.
"""
# Used in airflow.models.BaseOperator
@@ -645,6 +683,17 @@ class DatabricksSubmitRunOperator(BaseOperator):
pipeline_name = self.json["pipeline_task"]["pipeline_name"]
self.json["pipeline_task"]["pipeline_id"] =
self._hook.find_pipeline_id_by_name(pipeline_name)
del self.json["pipeline_task"]["pipeline_name"]
+
+ if self.params:
+ params_dump = dict(self.params)
+ tasks = self.json.get("tasks")
+ if isinstance(tasks, list):
+ for task in tasks:
+ if isinstance(task, dict):
+ _inject_airflow_params_into_task(task, params_dump)
+ else:
+ _inject_airflow_params_into_task(self.json, params_dump)
+
json_normalised = normalise_json_content(self.json)
self.run_id = self._hook.submit_run(json_normalised)
if self.deferrable:
@@ -844,6 +893,12 @@ class DatabricksRunNowOperator(BaseOperator):
(https://docs.databricks.com/api/workspace/jobs/update). If
nothing is matched, then repair
will not get triggered.
:param cancel_previous_runs: Cancel all existing running jobs before
submitting new one.
+
+ .. note::
+ If ``job_parameters`` is not set in ``json`` and the operator's
``params`` dict is
+ non-empty, the operator's ``params`` are automatically forwarded as
``job_parameters``
+ so that Airflow Dag params can be passed dynamically to Databricks
runs without
+ hardcoding them in ``json``.
"""
# Used in airflow.models.BaseOperator
@@ -953,6 +1008,9 @@ class DatabricksRunNowOperator(BaseOperator):
hook.cancel_all_runs(job_id)
+ if not self.json.get("job_parameters") and self.params:
+ self.json["job_parameters"] = dict(self.params)
+
self.run_id = hook.run_now(self.json)
if self.deferrable:
_handle_deferrable_databricks_operator_execution(self, hook,
self.log, context)
diff --git
a/providers/databricks/tests/unit/databricks/operators/test_databricks.py
b/providers/databricks/tests/unit/databricks/operators/test_databricks.py
index 84c625cddb7..986cf51f1cf 100644
--- a/providers/databricks/tests/unit/databricks/operators/test_databricks.py
+++ b/providers/databricks/tests/unit/databricks/operators/test_databricks.py
@@ -277,6 +277,7 @@ ACCESS_CONTROL_LIST = [
"permission_level": "CAN_MANAGE",
}
]
+JOB_PARAMS = [{"name": "param1", "default": "value1"}]
def mock_dict(d: dict):
@@ -602,6 +603,67 @@ class TestDatabricksCreateJobsOperator:
db_mock.update_job_permission.assert_not_called()
+ @pytest.mark.parametrize(
+ ("found_job_id", "hook_method"),
+ [
+ pytest.param(None, "create_job", id="create-path"),
+ pytest.param(JOB_ID, "reset_job", id="reset-path"),
+ ],
+ )
+
@mock.patch("airflow.providers.databricks.operators.databricks.DatabricksHook")
+ def test_injects_airflow_params_when_parameters_missing(self,
db_mock_class, found_job_id, hook_method):
+ """
+ When ``parameters`` is not set in ``json`` and the operator's
``params`` dict is
+ non-empty, the operator's ``params`` should be forwarded as job-level
+ ``parameters`` on both the create and reset paths (regression test for
+ GH-39002).
+ """
+ op = DatabricksCreateJobsOperator(
+ task_id=TASK_ID,
+ json={"name": JOB_NAME, "tasks": TASKS},
+ params={"env": "prod", "batch_size": 100},
+ )
+ db_mock = db_mock_class.return_value
+ db_mock.find_job_id_by_name.return_value = found_job_id
+
+ op.execute({})
+
+ # The create-path passes the settings dict directly; the reset-path
passes
+ # (job_id, settings) — pull the settings out in either case.
+ call_args = getattr(db_mock, hook_method).call_args.args
+ settings = call_args[0] if hook_method == "create_job" else
call_args[1]
+ assert settings["parameters"] == [
+ {"name": "env", "default": "prod"},
+ {"name": "batch_size", "default": 100},
+ ]
+
+ @pytest.mark.parametrize(
+ ("found_job_id", "hook_method"),
+ [
+ pytest.param(None, "create_job", id="create-path"),
+ pytest.param(JOB_ID, "reset_job", id="reset-path"),
+ ],
+ )
+
@mock.patch("airflow.providers.databricks.operators.databricks.DatabricksHook")
+ def test_does_not_override_existing_parameters(self, db_mock_class,
found_job_id, hook_method):
+ """
+ When ``parameters`` is already set in ``json``, the operator's
``params`` must
+ not override it on either the create or reset paths.
+ """
+ op = DatabricksCreateJobsOperator(
+ task_id=TASK_ID,
+ json={"name": JOB_NAME, "tasks": TASKS, "parameters": JOB_PARAMS},
+ params={"env": "prod"},
+ )
+ db_mock = db_mock_class.return_value
+ db_mock.find_job_id_by_name.return_value = found_job_id
+
+ op.execute({})
+
+ call_args = getattr(db_mock, hook_method).call_args.args
+ settings = call_args[0] if hook_method == "create_job" else
call_args[1]
+ assert settings["parameters"] == JOB_PARAMS
+
class TestDatabricksSubmitRunOperator:
def test_init_with_notebook_task_named_parameters(self):
@@ -1183,6 +1245,76 @@ class TestDatabricksSubmitRunOperator:
assert op.run_id == RUN_ID
assert not mock_defer.called
+
@mock.patch("airflow.providers.databricks.operators.databricks.DatabricksHook")
+ def test_submit_run_injects_airflow_params_into_notebook_task(self,
db_mock_class):
+ """
+ For a single notebook_task, ``self.params`` should be injected into
+ ``notebook_task.base_parameters`` (regression test for GH-39002).
+ """
+ op = DatabricksSubmitRunOperator(
+ task_id=TASK_ID,
+ notebook_task={"notebook_path": "/Users/me/notebook"},
+ new_cluster=NEW_CLUSTER,
+ params={"env": "prod", "batch_size": "100"},
+ )
+ db_mock = db_mock_class.return_value
+ db_mock.submit_run.return_value = RUN_ID
+ db_mock.get_run = make_run_with_state_mock("TERMINATED", "SUCCESS")
+
+ op.execute(None)
+
+ actual = db_mock.submit_run.call_args.args[0]
+ assert actual["notebook_task"]["base_parameters"] == {"env": "prod",
"batch_size": "100"}
+
+
@mock.patch("airflow.providers.databricks.operators.databricks.DatabricksHook")
+ def
test_submit_run_injects_airflow_params_into_each_task_in_tasks_list(self,
db_mock_class):
+ """
+ For multiple ``tasks``, dict-shaped per-task params should be filled
in for
+ each task that supports them.
+ """
+ op = DatabricksSubmitRunOperator(
+ task_id=TASK_ID,
+ tasks=[
+ {"task_key": "t1", "notebook_task": {"notebook_path": "/n1"}},
+ {"task_key": "t2", "spark_jar_task": {"main_class_name":
"Foo"}},
+ ],
+ params={"env": "prod"},
+ )
+ db_mock = db_mock_class.return_value
+ db_mock.submit_run.return_value = RUN_ID
+ db_mock.get_run = make_run_with_state_mock("TERMINATED", "SUCCESS")
+
+ op.execute(None)
+
+ actual = db_mock.submit_run.call_args.args[0]
+ assert actual["tasks"][0]["notebook_task"]["base_parameters"] ==
{"env": "prod"}
+ # spark_jar_task only accepts List[str] parameters; skip
auto-injection.
+ assert "parameters" not in actual["tasks"][1]["spark_jar_task"]
+
+
@mock.patch("airflow.providers.databricks.operators.databricks.DatabricksHook")
+ def test_submit_run_does_not_override_existing_task_parameters(self,
db_mock_class):
+ """
+ If a dict-shaped per-task parameter field is already populated,
``self.params``
+ should not override it.
+ """
+ op = DatabricksSubmitRunOperator(
+ task_id=TASK_ID,
+ notebook_task={
+ "notebook_path": "/Users/me/notebook",
+ "base_parameters": {"explicit": "value"},
+ },
+ new_cluster=NEW_CLUSTER,
+ params={"env": "prod"},
+ )
+ db_mock = db_mock_class.return_value
+ db_mock.submit_run.return_value = RUN_ID
+ db_mock.get_run = make_run_with_state_mock("TERMINATED", "SUCCESS")
+
+ op.execute(None)
+
+ actual = db_mock.submit_run.call_args.args[0]
+ assert actual["notebook_task"]["base_parameters"] == {"explicit":
"value"}
+
class TestDatabricksRunNowOperator:
def test_init_with_named_parameters(self):
@@ -1999,6 +2131,48 @@ class TestDatabricksRunNowOperator:
assert op.run_id == RUN_ID
assert not mock_defer.called
+
@mock.patch("airflow.providers.databricks.operators.databricks.DatabricksHook")
+ def test_run_now_injects_airflow_params_when_job_parameters_missing(self,
db_mock_class):
+ """
+ When ``job_parameters`` is not set in ``json`` and the operator's
``params`` dict is
+ non-empty, the operator's ``params`` should be forwarded as
``job_parameters``
+ (regression test for GH-39002).
+ """
+ op = DatabricksRunNowOperator(
+ task_id=TASK_ID,
+ job_id=JOB_ID,
+ params={"env": "prod", "batch_size": 100},
+ )
+ db_mock = db_mock_class.return_value
+ db_mock.run_now.return_value = RUN_ID
+ db_mock.get_run = make_run_with_state_mock("TERMINATED", "SUCCESS")
+
+ op.execute(None)
+
+ actual = db_mock.run_now.call_args.args[0]
+ assert actual["job_parameters"] == {"env": "prod", "batch_size": 100}
+
+
@mock.patch("airflow.providers.databricks.operators.databricks.DatabricksHook")
+ def test_run_now_does_not_override_existing_job_parameters(self,
db_mock_class):
+ """
+ When ``job_parameters`` is already set in ``json``, the operator's
``params`` should
+ not override it.
+ """
+ op = DatabricksRunNowOperator(
+ task_id=TASK_ID,
+ job_id=JOB_ID,
+ json={"job_parameters": {"explicit": "value"}},
+ params={"env": "prod"},
+ )
+ db_mock = db_mock_class.return_value
+ db_mock.run_now.return_value = RUN_ID
+ db_mock.get_run = make_run_with_state_mock("TERMINATED", "SUCCESS")
+
+ op.execute(None)
+
+ actual = db_mock.run_now.call_args.args[0]
+ assert actual["job_parameters"] == {"explicit": "value"}
+
class TestDatabricksSQLStatementsOperator:
def test_init(self):