This is an automated email from the ASF dual-hosted git repository.

shahar1 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 8248e048fdd Render Jinja templates in CloudBatchSubmitJobOperator job 
field (#67021)
8248e048fdd is described below

commit 8248e048fddad3d079c2ae8c160e2f6c172f4d95
Author: Aaron Chen <[email protected]>
AuthorDate: Sun May 17 12:28:11 2026 -0700

    Render Jinja templates in CloudBatchSubmitJobOperator job field (#67021)
---
 .../google/cloud/operators/cloud_batch.py          |  7 ++-
 .../google/cloud/operators/test_cloud_batch.py     | 57 +++++++++++++++++++++-
 2 files changed, 62 insertions(+), 2 deletions(-)

diff --git 
a/providers/google/src/airflow/providers/google/cloud/operators/cloud_batch.py 
b/providers/google/src/airflow/providers/google/cloud/operators/cloud_batch.py
index bee44dd5c3f..914559070b7 100644
--- 
a/providers/google/src/airflow/providers/google/cloud/operators/cloud_batch.py
+++ 
b/providers/google/src/airflow/providers/google/cloud/operators/cloud_batch.py
@@ -57,7 +57,8 @@ class CloudBatchSubmitJobOperator(GoogleCloudBaseOperator):
 
     """
 
-    template_fields = ("project_id", "region", "gcp_conn_id", 
"impersonation_chain", "job_name")
+    template_fields = ("project_id", "region", "gcp_conn_id", 
"impersonation_chain", "job_name", "job")
+    template_fields_renderers = {"job": "json"}
 
     def __init__(
         self,
@@ -77,6 +78,10 @@ class CloudBatchSubmitJobOperator(GoogleCloudBaseOperator):
         self.region = region
         self.job_name = job_name
         self.job = job
+        # Normalize Job protobuf to dict so Airflow's template renderer can 
descend
+        # into nested fields (e.g. runnable.container.commands). See #37217.
+        if isinstance(job, Job):
+            self.job = Job.to_dict(job)
         self.polling_period_seconds = polling_period_seconds
         self.timeout_seconds = timeout_seconds
         self.gcp_conn_id = gcp_conn_id
diff --git 
a/providers/google/tests/unit/google/cloud/operators/test_cloud_batch.py 
b/providers/google/tests/unit/google/cloud/operators/test_cloud_batch.py
index b90044b5644..1b688bb65df 100644
--- a/providers/google/tests/unit/google/cloud/operators/test_cloud_batch.py
+++ b/providers/google/tests/unit/google/cloud/operators/test_cloud_batch.py
@@ -17,6 +17,8 @@
 # under the License.
 from __future__ import annotations
 
+import json
+from datetime import datetime
 from unittest import mock
 
 import pytest
@@ -52,7 +54,7 @@ class TestCloudBatchSubmitJobOperator:
         assert completed_job["name"] == JOB_NAME
 
         mock.return_value.submit_batch_job.assert_called_with(
-            job_name=JOB_NAME, job=JOB, region=REGION, project_id=PROJECT_ID
+            job_name=JOB_NAME, job=batch_v1.Job.to_dict(JOB), region=REGION, 
project_id=PROJECT_ID
         )
         mock.return_value.wait_for_job.assert_called()
 
@@ -92,6 +94,59 @@ class TestCloudBatchSubmitJobOperator:
             operator.execute_complete(context=mock.MagicMock(), event=event)
 
 
+def _job_dict_with_template() -> dict:
+    return {
+        "task_groups": [
+            {
+                "task_spec": {
+                    "runnables": [
+                        {
+                            "container": {
+                                "image_uri": 
"gcr.io/google-containers/busybox",
+                                "entrypoint": "/bin/sh",
+                                "commands": ["-c", "echo {{ ds }}"],
+                            }
+                        }
+                    ]
+                }
+            }
+        ],
+        "labels": {"run_id": "{{ run_id }}"},
+    }
+
+
+class TestCloudBatchSubmitJobOperatorTemplating:
+    def test_template_fields_includes_job(self):
+        assert "job" in CloudBatchSubmitJobOperator.template_fields
+
+    @pytest.mark.db_test
+    @pytest.mark.parametrize(
+        "job_input_factory",
+        [
+            pytest.param(lambda d: d, id="dict"),
+            pytest.param(lambda d: batch_v1.Job.from_json(json.dumps(d)), 
id="protobuf-Job"),
+        ],
+    )
+    def test_jinja_in_job_commands_is_rendered(self, 
create_task_instance_of_operator, job_input_factory):
+        ti = create_task_instance_of_operator(
+            CloudBatchSubmitJobOperator,
+            dag_id="test_cloud_batch_render",
+            task_id=TASK_ID,
+            project_id=PROJECT_ID,
+            region=REGION,
+            job_name=JOB_NAME,
+            job=job_input_factory(_job_dict_with_template()),
+            logical_date=datetime(2026, 1, 15),
+        )
+        task = ti.render_templates()
+
+        assert isinstance(task.job, dict)
+        rendered_cmd = 
task.job["task_groups"][0]["task_spec"]["runnables"][0]["container"]["commands"][1]
+        assert rendered_cmd == "echo 2026-01-15"
+        # dag_maker's default run_id is "test"; the point is {{ run_id }} got 
substituted at all.
+        assert task.job["labels"]["run_id"] == "test"
+
+
 class TestCloudBatchDeleteJobOperator:
     @mock.patch(CLOUD_BATCH_HOOK_PATH)
     def test_execute(self, hook_mock):

Reply via email to