This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new aefadb8c5b Allow xcom sidecar container image to be configurable in 
KPO (#26766)
aefadb8c5b is described below

commit aefadb8c5b9272613d5806b054a1b46edf29d82e
Author: Dov Benyomin Sohacheski <[email protected]>
AuthorDate: Wed Nov 9 08:16:53 2022 +0200

    Allow xcom sidecar container image to be configurable in KPO (#26766)
---
 .../providers/cncf/kubernetes/hooks/kubernetes.py  |  7 +++++++
 .../cncf/kubernetes/operators/kubernetes_pod.py    |  4 +++-
 .../cncf/kubernetes/utils/xcom_sidecar.py          |  6 ++++--
 .../connections/kubernetes.rst                     |  4 ++++
 kubernetes_tests/test_kubernetes_pod_operator.py   |  1 +
 .../cncf/kubernetes/hooks/test_kubernetes.py       | 13 +++++++++++++
 .../kubernetes/operators/test_kubernetes_pod.py    | 22 ++++++++++++++++++++++
 7 files changed, 54 insertions(+), 3 deletions(-)

diff --git a/airflow/providers/cncf/kubernetes/hooks/kubernetes.py 
b/airflow/providers/cncf/kubernetes/hooks/kubernetes.py
index d093567bca..85b76d5f52 100644
--- a/airflow/providers/cncf/kubernetes/hooks/kubernetes.py
+++ b/airflow/providers/cncf/kubernetes/hooks/kubernetes.py
@@ -91,6 +91,9 @@ class KubernetesHook(BaseHook):
             "cluster_context": StringField(lazy_gettext("Cluster context"), 
widget=BS3TextFieldWidget()),
             "disable_verify_ssl": BooleanField(lazy_gettext("Disable SSL")),
             "disable_tcp_keepalive": BooleanField(lazy_gettext("Disable TCP 
keepalive")),
+            "xcom_sidecar_container_image": StringField(
+                lazy_gettext("XCom sidecar image"), widget=BS3TextFieldWidget()
+            ),
         }
 
     @staticmethod
@@ -341,6 +344,10 @@ class KubernetesHook(BaseHook):
             return self._get_field("namespace")
         return None
 
+    def get_xcom_sidecar_container_image(self):
+        """Returns the xcom sidecar image that is defined in the connection"""
+        return self._get_field("xcom_sidecar_container_image")
+
     def get_pod_log_stream(
         self,
         pod_name: str,
diff --git a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py 
b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
index 785497c390..b22d0780c7 100644
--- a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
+++ b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
@@ -607,7 +607,9 @@ class KubernetesPodOperator(BaseOperator):
             pod = secret.attach_to_pod(pod)
         if self.do_xcom_push:
             self.log.debug("Adding xcom sidecar to task %s", self.task_id)
-            pod = xcom_sidecar.add_xcom_sidecar(pod)
+            pod = xcom_sidecar.add_xcom_sidecar(
+                pod, 
sidecar_container_image=self.hook.get_xcom_sidecar_container_image()
+            )
 
         labels = self._get_ti_pod_labels(context)
         self.log.info("Building pod %s with labels: %s", pod.metadata.name, 
labels)
diff --git a/airflow/providers/cncf/kubernetes/utils/xcom_sidecar.py 
b/airflow/providers/cncf/kubernetes/utils/xcom_sidecar.py
index ed6d524ca3..81b3047993 100644
--- a/airflow/providers/cncf/kubernetes/utils/xcom_sidecar.py
+++ b/airflow/providers/cncf/kubernetes/utils/xcom_sidecar.py
@@ -47,13 +47,15 @@ class PodDefaults:
     )
 
 
-def add_xcom_sidecar(pod: k8s.V1Pod) -> k8s.V1Pod:
+def add_xcom_sidecar(pod: k8s.V1Pod, *, sidecar_container_image=None) -> 
k8s.V1Pod:
     """Adds sidecar"""
     pod_cp = copy.deepcopy(pod)
     pod_cp.spec.volumes = pod.spec.volumes or []
     pod_cp.spec.volumes.insert(0, PodDefaults.VOLUME)
     pod_cp.spec.containers[0].volume_mounts = 
pod_cp.spec.containers[0].volume_mounts or []
     pod_cp.spec.containers[0].volume_mounts.insert(0, PodDefaults.VOLUME_MOUNT)
-    pod_cp.spec.containers.append(PodDefaults.SIDECAR_CONTAINER)
+    sidecar = copy.deepcopy(PodDefaults.SIDECAR_CONTAINER)
+    sidecar.image = sidecar_container_image or 
PodDefaults.SIDECAR_CONTAINER.image
+    pod_cp.spec.containers.append(sidecar)
 
     return pod_cp
diff --git 
a/docs/apache-airflow-providers-cncf-kubernetes/connections/kubernetes.rst 
b/docs/apache-airflow-providers-cncf-kubernetes/connections/kubernetes.rst
index edba4f4c2e..fdad100cc7 100644
--- a/docs/apache-airflow-providers-cncf-kubernetes/connections/kubernetes.rst
+++ b/docs/apache-airflow-providers-cncf-kubernetes/connections/kubernetes.rst
@@ -68,6 +68,10 @@ Disable TCP keepalive
   TCP keepalive is a feature (enabled by default) that tries to keep 
long-running connections
   alive. Set this parameter to True to disable this feature.
 
+XCom sidecar image
+  Define the ``image`` used by the ``PodDefaults.SIDECAR_CONTAINER`` (defaults 
to ``"alpine"``). Set this to pull the sidecar image from a private
+  repository or to override it with a custom image.
+
 Example storing connection in env var using URI format:
 
 .. code-block:: bash
diff --git a/kubernetes_tests/test_kubernetes_pod_operator.py 
b/kubernetes_tests/test_kubernetes_pod_operator.py
index 6bb5c9ec0a..d11365e0ca 100644
--- a/kubernetes_tests/test_kubernetes_pod_operator.py
+++ b/kubernetes_tests/test_kubernetes_pod_operator.py
@@ -921,6 +921,7 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
         # todo: This isn't really a system test
         await_xcom_sidecar_container_start_mock.return_value = None
         hook_mock.return_value.is_in_cluster = False
+        hook_mock.return_value.get_xcom_sidecar_container_image.return_value = 
None
         extract_xcom_mock.return_value = "{}"
         path = sys.path[0] + "/tests/kubernetes/pod.yaml"
         k = KubernetesPodOperator(
diff --git a/tests/providers/cncf/kubernetes/hooks/test_kubernetes.py 
b/tests/providers/cncf/kubernetes/hooks/test_kubernetes.py
index e15648a5e9..e0e1ec99f4 100644
--- a/tests/providers/cncf/kubernetes/hooks/test_kubernetes.py
+++ b/tests/providers/cncf/kubernetes/hooks/test_kubernetes.py
@@ -62,6 +62,8 @@ class TestKubernetesHook:
             ("disable_verify_ssl_empty", {"disable_verify_ssl": ""}),
             ("disable_tcp_keepalive", {"disable_tcp_keepalive": True}),
             ("disable_tcp_keepalive_empty", {"disable_tcp_keepalive": ""}),
+            ("sidecar_container_image", {"xcom_sidecar_container_image": 
"private.repo.com/alpine:3.16"}),
+            ("sidecar_container_image_empty", {"xcom_sidecar_container_image": 
""}),
         ]:
             db.merge_conn(Connection(conn_type="kubernetes", conn_id=conn_id, 
extra=json.dumps(extra)))
 
@@ -316,6 +318,17 @@ class TestKubernetesHook:
                 "and rename _get_namespace to get_namespace."
             )
 
+    @pytest.mark.parametrize(
+        "conn_id, expected",
+        (
+            pytest.param("sidecar_container_image", 
"private.repo.com/alpine:3.16", id="sidecar-with-image"),
+            pytest.param("sidecar_container_image_empty", None, 
id="sidecar-without-image"),
+        ),
+    )
+    def test_get_xcom_sidecar_container_image(self, conn_id, expected):
+        hook = KubernetesHook(conn_id=conn_id)
+        assert hook.get_xcom_sidecar_container_image() == expected
+
     @patch("kubernetes.config.kube_config.KubeConfigLoader")
     @patch("kubernetes.config.kube_config.KubeConfigMerger")
     def test_client_types(self, mock_kube_config_merger, 
mock_kube_config_loader):
diff --git a/tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py 
b/tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py
index 824c979565..f3d462f3c2 100644
--- a/tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py
+++ b/tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py
@@ -415,6 +415,28 @@ class TestKubernetesPodOperator:
         )
         mock_find.assert_called_once_with("default", context=context)
 
+    @patch(HOOK_CLASS)
+    def test_xcom_sidecar_container_image_default(self, hook_mock):
+        hook_mock.return_value.get_xcom_sidecar_container_image.return_value = 
None
+        k = KubernetesPodOperator(
+            name="test",
+            task_id="task",
+            do_xcom_push=True,
+        )
+        pod = k.build_pod_request_obj(create_context(k))
+        assert pod.spec.containers[1].image == "alpine"
+
+    @patch(HOOK_CLASS)
+    def test_xcom_sidecar_container_image_custom(self, hook_mock):
+        hook_mock.return_value.get_xcom_sidecar_container_image.return_value = 
"private.repo/alpine:3.13"
+        k = KubernetesPodOperator(
+            name="test",
+            task_id="task",
+            do_xcom_push=True,
+        )
+        pod = k.build_pod_request_obj(create_context(k))
+        assert pod.spec.containers[1].image == "private.repo/alpine:3.13"
+
     def test_image_pull_policy_correctly_set(self):
         k = KubernetesPodOperator(
             task_id="task",

Reply via email to