This is an automated email from the ASF dual-hosted git repository.

eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 1b668c35d1a (fix): Render a pod spec using the pod_template_file override, if passed to the executor (#46374)
1b668c35d1a is described below

commit 1b668c35d1a5186c6f179e24984d6ba6d94960bb
Author: Balthazar Rouberol <[email protected]>
AuthorDate: Sun Feb 9 07:18:58 2025 +0100

    (fix): Render a pod spec using the pod_template_file override, if passed to the executor (#46374)
    
    If a task was created by a custom `executor_options['pod_template_file']` option,
    we make sure to render the `TaskInstance`'s associated `k8s_pod_spec`
    with this specific `pod_template_file`, to avoid seeing discrepancies
    between the spec visible in airflow and the one deployed to Kubernetes.
    
    Signed-off-by: Balthazar Rouberol <[email protected]>
---
 .../cncf/kubernetes/docs/kubernetes_executor.rst   |  1 +
 .../cncf/kubernetes/template_rendering.py          | 10 +++-
 .../cncf/kubernetes/test_template_rendering.py     | 62 +++++++++++++++++++++-
 3 files changed, 71 insertions(+), 2 deletions(-)

diff --git a/providers/cncf/kubernetes/docs/kubernetes_executor.rst b/providers/cncf/kubernetes/docs/kubernetes_executor.rst
index 91f1747e97a..2d9a4f5718a 100644
--- a/providers/cncf/kubernetes/docs/kubernetes_executor.rst
+++ b/providers/cncf/kubernetes/docs/kubernetes_executor.rst
@@ -155,6 +155,7 @@ name ``base`` and a second container containing your desired sidecar.
 
 You can also create custom ``pod_template_file`` on a per-task basis so that you can recycle the same base values between multiple tasks.
 This will replace the default ``pod_template_file`` named in the airflow.cfg and then override that template using the ``pod_override``.
+That ``pod_template_file`` will also be used to generate the Pod K8s Spec visible in the Airflow UI.
 
 Here is an example of a task with both features:
 
diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/template_rendering.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/template_rendering.py
index 6abeeddde78..ed2fa7e92f4 100644
--- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/template_rendering.py
+++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/template_rendering.py
@@ -35,6 +35,14 @@ if TYPE_CHECKING:
 def render_k8s_pod_yaml(task_instance: TaskInstance) -> dict | None:
     """Render k8s pod yaml."""
     kube_config = KubeConfig()
+    if task_instance.executor_config and task_instance.executor_config.get("pod_template_file"):
+        # If a specific pod_template_file was passed to the executor, we make
+        # sure to render the k8s pod spec using this one, and not the default one.
+        pod_template_file = task_instance.executor_config["pod_template_file"]
+    else:
+        # If no such pod_template_file override was passed, we can simply render
+        # The pod spec using the default template.
+        pod_template_file = kube_config.pod_template_file
     pod = PodGenerator.construct_pod(
         dag_id=task_instance.dag_id,
         run_id=task_instance.run_id,
@@ -48,7 +56,7 @@ def render_k8s_pod_yaml(task_instance: TaskInstance) -> dict | None:
         pod_override_object=PodGenerator.from_obj(task_instance.executor_config),
         scheduler_job_id="0",
         namespace=kube_config.executor_namespace,
-        base_worker_pod=PodGenerator.deserialize_model_file(kube_config.pod_template_file),
+        base_worker_pod=PodGenerator.deserialize_model_file(pod_template_file),
         with_mutation_hook=True,
     )
     sanitized_pod = ApiClient().sanitize_for_serialization(pod)
diff --git a/providers/cncf/kubernetes/tests/provider_tests/cncf/kubernetes/test_template_rendering.py b/providers/cncf/kubernetes/tests/provider_tests/cncf/kubernetes/test_template_rendering.py
index 6f512cdfe80..6262b3587ae 100644
--- a/providers/cncf/kubernetes/tests/provider_tests/cncf/kubernetes/test_template_rendering.py
+++ b/providers/cncf/kubernetes/tests/provider_tests/cncf/kubernetes/test_template_rendering.py
@@ -20,6 +20,8 @@ import os
 from unittest import mock
 
 import pytest
+import yaml
+from kubernetes.client import models as k8s
 from sqlalchemy.orm import make_transient
 
 from airflow.models.renderedtifields import RenderedTaskInstanceFields, RenderedTaskInstanceFields as RTIF
@@ -84,11 +86,69 @@ def test_render_k8s_pod_yaml(pod_mutation_hook, create_task_instance):
             ]
         },
     }
-
     assert render_k8s_pod_yaml(ti) == expected_pod_spec
     pod_mutation_hook.assert_called_once_with(mock.ANY)
 
 
+@mock.patch.dict(os.environ, {"AIRFLOW_IS_K8S_EXECUTOR_POD": "True"})
+@mock.patch("airflow.settings.pod_mutation_hook")
+def test_render_k8s_pod_yaml_with_custom_pod_template(pod_mutation_hook, create_task_instance, tmp_path):
+    with open(f"{tmp_path}/custom_pod_template.yaml", "w") as ptf:
+        template = {
+            "apiVersion": "v1",
+            "kind": "Pod",
+            "metadata": {"labels": {"custom_label": "custom_value"}},
+        }
+        ptf.write(yaml.dump(template))
+
+    ti = create_task_instance(
+        dag_id="test_render_k8s_pod_yaml",
+        run_id="test_run_id",
+        task_id="op1",
+        logical_date=DEFAULT_DATE,
+        executor_config={"pod_template_file": f"{tmp_path}/custom_pod_template.yaml"},
+    )
+
+    ti_pod_yaml = render_k8s_pod_yaml(ti)
+    assert "custom_label" in ti_pod_yaml["metadata"]["labels"]
+    assert ti_pod_yaml["metadata"]["labels"]["custom_label"] == "custom_value"
+
+
+@mock.patch.dict(os.environ, {"AIRFLOW_IS_K8S_EXECUTOR_POD": "True"})
+@mock.patch("airflow.settings.pod_mutation_hook")
+def test_render_k8s_pod_yaml_with_custom_pod_template_and_pod_override(
+    pod_mutation_hook, create_task_instance, tmp_path
+):
+    with open(f"{tmp_path}/custom_pod_template.yaml", "w") as ptf:
+        template = {
+            "apiVersion": "v1",
+            "kind": "Pod",
+            "metadata": {"labels": {"custom_label": "custom_value"}},
+        }
+        ptf.write(yaml.dump(template))
+
+    pod_override = k8s.V1Pod(
+        metadata=k8s.V1ObjectMeta(annotations={"test": "annotation"}, labels={"custom_label": "override"})
+    )
+    ti = create_task_instance(
+        dag_id="test_render_k8s_pod_yaml",
+        run_id="test_run_id",
+        task_id="op1",
+        logical_date=DEFAULT_DATE,
+        executor_config={
+            "pod_template_file": f"{tmp_path}/custom_pod_template.yaml",
+            "pod_override": pod_override,
+        },
+    )
+
+    ti_pod_yaml = render_k8s_pod_yaml(ti)
+    assert "custom_label" in ti_pod_yaml["metadata"]["labels"]
+    # The initial value associated with the custom_label label in the pod_template_file
+    # was overridden by the pod_override
+    assert ti_pod_yaml["metadata"]["labels"]["custom_label"] == "override"
+    assert ti_pod_yaml["metadata"]["annotations"]["test"] == "annotation"
+
+
 @mock.patch.dict(os.environ, {"AIRFLOW_IS_K8S_EXECUTOR_POD": "True"})
 @mock.patch.object(RenderedTaskInstanceFields, "get_k8s_pod_yaml")
 @mock.patch("airflow.providers.cncf.kubernetes.template_rendering.render_k8s_pod_yaml")

Reply via email to