This is an automated email from the ASF dual-hosted git repository.
shahar1 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 8248e048fdd Render Jinja templates in CloudBatchSubmitJobOperator job field (#67021)
8248e048fdd is described below
commit 8248e048fddad3d079c2ae8c160e2f6c172f4d95
Author: Aaron Chen <[email protected]>
AuthorDate: Sun May 17 12:28:11 2026 -0700
Render Jinja templates in CloudBatchSubmitJobOperator job field (#67021)
---
.../google/cloud/operators/cloud_batch.py | 7 ++-
.../google/cloud/operators/test_cloud_batch.py | 57 +++++++++++++++++++++-
2 files changed, 62 insertions(+), 2 deletions(-)
diff --git a/providers/google/src/airflow/providers/google/cloud/operators/cloud_batch.py b/providers/google/src/airflow/providers/google/cloud/operators/cloud_batch.py
index bee44dd5c3f..914559070b7 100644
--- a/providers/google/src/airflow/providers/google/cloud/operators/cloud_batch.py
+++ b/providers/google/src/airflow/providers/google/cloud/operators/cloud_batch.py
@@ -57,7 +57,8 @@ class CloudBatchSubmitJobOperator(GoogleCloudBaseOperator):
"""
- template_fields = ("project_id", "region", "gcp_conn_id",
"impersonation_chain", "job_name")
+ template_fields = ("project_id", "region", "gcp_conn_id",
"impersonation_chain", "job_name", "job")
+ template_fields_renderers = {"job": "json"}
def __init__(
self,
@@ -77,6 +78,10 @@ class CloudBatchSubmitJobOperator(GoogleCloudBaseOperator):
self.region = region
self.job_name = job_name
self.job = job
+ # Normalize Job protobuf to dict so Airflow's template renderer can descend
+ # into nested fields (e.g. runnable.container.commands). See #37217.
+ if isinstance(job, Job):
+ self.job = Job.to_dict(job)
self.polling_period_seconds = polling_period_seconds
self.timeout_seconds = timeout_seconds
self.gcp_conn_id = gcp_conn_id
diff --git a/providers/google/tests/unit/google/cloud/operators/test_cloud_batch.py b/providers/google/tests/unit/google/cloud/operators/test_cloud_batch.py
index b90044b5644..1b688bb65df 100644
--- a/providers/google/tests/unit/google/cloud/operators/test_cloud_batch.py
+++ b/providers/google/tests/unit/google/cloud/operators/test_cloud_batch.py
@@ -17,6 +17,8 @@
# under the License.
from __future__ import annotations
+import json
+from datetime import datetime
from unittest import mock
import pytest
@@ -52,7 +54,7 @@ class TestCloudBatchSubmitJobOperator:
assert completed_job["name"] == JOB_NAME
mock.return_value.submit_batch_job.assert_called_with(
- job_name=JOB_NAME, job=JOB, region=REGION, project_id=PROJECT_ID
+ job_name=JOB_NAME, job=batch_v1.Job.to_dict(JOB), region=REGION,
project_id=PROJECT_ID
)
mock.return_value.wait_for_job.assert_called()
@@ -92,6 +94,59 @@ class TestCloudBatchSubmitJobOperator:
operator.execute_complete(context=mock.MagicMock(), event=event)
+def _job_dict_with_template() -> dict:
+ return {
+ "task_groups": [
+ {
+ "task_spec": {
+ "runnables": [
+ {
+ "container": {
+ "image_uri":
"gcr.io/google-containers/busybox",
+ "entrypoint": "/bin/sh",
+ "commands": ["-c", "echo {{ ds }}"],
+ }
+ }
+ ]
+ }
+ }
+ ],
+ "labels": {"run_id": "{{ run_id }}"},
+ }
+
+
+class TestCloudBatchSubmitJobOperatorTemplating:
+ def test_template_fields_includes_job(self):
+ assert "job" in CloudBatchSubmitJobOperator.template_fields
+
+ @pytest.mark.db_test
+ @pytest.mark.parametrize(
+ "job_input_factory",
+ [
+ pytest.param(lambda d: d, id="dict"),
+ pytest.param(lambda d: batch_v1.Job.from_json(json.dumps(d)),
id="protobuf-Job"),
+ ],
+ )
+ def test_jinja_in_job_commands_is_rendered(self,
create_task_instance_of_operator, job_input_factory):
+ ti = create_task_instance_of_operator(
+ CloudBatchSubmitJobOperator,
+ dag_id="test_cloud_batch_render",
+ task_id=TASK_ID,
+ project_id=PROJECT_ID,
+ region=REGION,
+ job_name=JOB_NAME,
+ job=job_input_factory(_job_dict_with_template()),
+ logical_date=datetime(2026, 1, 15),
+ )
+ task = ti.render_templates()
+
+ assert isinstance(task.job, dict)
+ rendered_cmd =
task.job["task_groups"][0]["task_spec"]["runnables"][0]["container"]["commands"][1]
+ assert rendered_cmd == "echo 2026-01-15"
+ # dag_maker's default run_id is "test"; the point is {{ run_id }} got
+ # substituted at all.
+ assert task.job["labels"]["run_id"] == "test"
+
+
class TestCloudBatchDeleteJobOperator:
@mock.patch(CLOUD_BATCH_HOOK_PATH)
def test_execute(self, hook_mock):