This is an automated email from the ASF dual-hosted git repository.
dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new aefadb8c5b Allow xcom sidecar container image to be configurable in
KPO (#26766)
aefadb8c5b is described below
commit aefadb8c5b9272613d5806b054a1b46edf29d82e
Author: Dov Benyomin Sohacheski <[email protected]>
AuthorDate: Wed Nov 9 08:16:53 2022 +0200
Allow xcom sidecar container image to be configurable in KPO (#26766)
---
.../providers/cncf/kubernetes/hooks/kubernetes.py | 7 +++++++
.../cncf/kubernetes/operators/kubernetes_pod.py | 4 +++-
.../cncf/kubernetes/utils/xcom_sidecar.py | 6 ++++--
.../connections/kubernetes.rst | 4 ++++
kubernetes_tests/test_kubernetes_pod_operator.py | 1 +
.../cncf/kubernetes/hooks/test_kubernetes.py | 13 +++++++++++++
.../kubernetes/operators/test_kubernetes_pod.py | 22 ++++++++++++++++++++++
7 files changed, 54 insertions(+), 3 deletions(-)
diff --git a/airflow/providers/cncf/kubernetes/hooks/kubernetes.py
b/airflow/providers/cncf/kubernetes/hooks/kubernetes.py
index d093567bca..85b76d5f52 100644
--- a/airflow/providers/cncf/kubernetes/hooks/kubernetes.py
+++ b/airflow/providers/cncf/kubernetes/hooks/kubernetes.py
@@ -91,6 +91,9 @@ class KubernetesHook(BaseHook):
"cluster_context": StringField(lazy_gettext("Cluster context"),
widget=BS3TextFieldWidget()),
"disable_verify_ssl": BooleanField(lazy_gettext("Disable SSL")),
"disable_tcp_keepalive": BooleanField(lazy_gettext("Disable TCP
keepalive")),
+ "xcom_sidecar_container_image": StringField(
+ lazy_gettext("XCom sidecar image"), widget=BS3TextFieldWidget()
+ ),
}
@staticmethod
@@ -341,6 +344,10 @@ class KubernetesHook(BaseHook):
return self._get_field("namespace")
return None
+ def get_xcom_sidecar_container_image(self):
+ """Returns the xcom sidecar image that defined in the connection"""
+ return self._get_field("xcom_sidecar_container_image")
+
def get_pod_log_stream(
self,
pod_name: str,
diff --git a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
index 785497c390..b22d0780c7 100644
--- a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
+++ b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
@@ -607,7 +607,9 @@ class KubernetesPodOperator(BaseOperator):
pod = secret.attach_to_pod(pod)
if self.do_xcom_push:
self.log.debug("Adding xcom sidecar to task %s", self.task_id)
- pod = xcom_sidecar.add_xcom_sidecar(pod)
+ pod = xcom_sidecar.add_xcom_sidecar(
+ pod,
sidecar_container_image=self.hook.get_xcom_sidecar_container_image()
+ )
labels = self._get_ti_pod_labels(context)
self.log.info("Building pod %s with labels: %s", pod.metadata.name,
labels)
diff --git a/airflow/providers/cncf/kubernetes/utils/xcom_sidecar.py
b/airflow/providers/cncf/kubernetes/utils/xcom_sidecar.py
index ed6d524ca3..81b3047993 100644
--- a/airflow/providers/cncf/kubernetes/utils/xcom_sidecar.py
+++ b/airflow/providers/cncf/kubernetes/utils/xcom_sidecar.py
@@ -47,13 +47,15 @@ class PodDefaults:
)
-def add_xcom_sidecar(pod: k8s.V1Pod) -> k8s.V1Pod:
+def add_xcom_sidecar(pod: k8s.V1Pod, *, sidecar_container_image=None) ->
k8s.V1Pod:
"""Adds sidecar"""
pod_cp = copy.deepcopy(pod)
pod_cp.spec.volumes = pod.spec.volumes or []
pod_cp.spec.volumes.insert(0, PodDefaults.VOLUME)
pod_cp.spec.containers[0].volume_mounts =
pod_cp.spec.containers[0].volume_mounts or []
pod_cp.spec.containers[0].volume_mounts.insert(0, PodDefaults.VOLUME_MOUNT)
- pod_cp.spec.containers.append(PodDefaults.SIDECAR_CONTAINER)
+ sidecar = copy.deepcopy(PodDefaults.SIDECAR_CONTAINER)
+ sidecar.image = sidecar_container_image or
PodDefaults.SIDECAR_CONTAINER.image
+ pod_cp.spec.containers.append(sidecar)
return pod_cp
diff --git
a/docs/apache-airflow-providers-cncf-kubernetes/connections/kubernetes.rst
b/docs/apache-airflow-providers-cncf-kubernetes/connections/kubernetes.rst
index edba4f4c2e..fdad100cc7 100644
--- a/docs/apache-airflow-providers-cncf-kubernetes/connections/kubernetes.rst
+++ b/docs/apache-airflow-providers-cncf-kubernetes/connections/kubernetes.rst
@@ -68,6 +68,10 @@ Disable TCP keepalive
TCP keepalive is a feature (enabled by default) that tries to keep
long-running connections
alive. Set this parameter to True to disable this feature.
+Xcom sidecar image
+ Define the ``image`` used by the ``PodDefaults.SIDECAR_CONTAINER`` (defaults
to ``"alpine"``) to allow private
+ repositories, as well as custom image overrides.
+
Example storing connection in env var using URI format:
.. code-block:: bash
diff --git a/kubernetes_tests/test_kubernetes_pod_operator.py
b/kubernetes_tests/test_kubernetes_pod_operator.py
index 6bb5c9ec0a..d11365e0ca 100644
--- a/kubernetes_tests/test_kubernetes_pod_operator.py
+++ b/kubernetes_tests/test_kubernetes_pod_operator.py
@@ -921,6 +921,7 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
# todo: This isn't really a system test
await_xcom_sidecar_container_start_mock.return_value = None
hook_mock.return_value.is_in_cluster = False
+ hook_mock.return_value.get_xcom_sidecar_container_image.return_value =
None
extract_xcom_mock.return_value = "{}"
path = sys.path[0] + "/tests/kubernetes/pod.yaml"
k = KubernetesPodOperator(
diff --git a/tests/providers/cncf/kubernetes/hooks/test_kubernetes.py
b/tests/providers/cncf/kubernetes/hooks/test_kubernetes.py
index e15648a5e9..e0e1ec99f4 100644
--- a/tests/providers/cncf/kubernetes/hooks/test_kubernetes.py
+++ b/tests/providers/cncf/kubernetes/hooks/test_kubernetes.py
@@ -62,6 +62,8 @@ class TestKubernetesHook:
("disable_verify_ssl_empty", {"disable_verify_ssl": ""}),
("disable_tcp_keepalive", {"disable_tcp_keepalive": True}),
("disable_tcp_keepalive_empty", {"disable_tcp_keepalive": ""}),
+ ("sidecar_container_image", {"xcom_sidecar_container_image":
"private.repo.com/alpine:3.16"}),
+ ("sidecar_container_image_empty", {"xcom_sidecar_container_image":
""}),
]:
db.merge_conn(Connection(conn_type="kubernetes", conn_id=conn_id,
extra=json.dumps(extra)))
@@ -316,6 +318,17 @@ class TestKubernetesHook:
"and rename _get_namespace to get_namespace."
)
+ @pytest.mark.parametrize(
+ "conn_id, expected",
+ (
+ pytest.param("sidecar_container_image",
"private.repo.com/alpine:3.16", id="sidecar-with-image"),
+ pytest.param("sidecar_container_image_empty", None,
id="sidecar-without-image"),
+ ),
+ )
+ def test_get_xcom_sidecar_container_image(self, conn_id, expected):
+ hook = KubernetesHook(conn_id=conn_id)
+ assert hook.get_xcom_sidecar_container_image() == expected
+
@patch("kubernetes.config.kube_config.KubeConfigLoader")
@patch("kubernetes.config.kube_config.KubeConfigMerger")
def test_client_types(self, mock_kube_config_merger,
mock_kube_config_loader):
diff --git a/tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py
b/tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py
index 824c979565..f3d462f3c2 100644
--- a/tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py
+++ b/tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py
@@ -415,6 +415,28 @@ class TestKubernetesPodOperator:
)
mock_find.assert_called_once_with("default", context=context)
+ @patch(HOOK_CLASS)
+ def test_xcom_sidecar_container_image_default(self, hook_mock):
+ hook_mock.return_value.get_xcom_sidecar_container_image.return_value =
None
+ k = KubernetesPodOperator(
+ name="test",
+ task_id="task",
+ do_xcom_push=True,
+ )
+ pod = k.build_pod_request_obj(create_context(k))
+ assert pod.spec.containers[1].image == "alpine"
+
+ @patch(HOOK_CLASS)
+ def test_xcom_sidecar_container_image_custom(self, hook_mock):
+ hook_mock.return_value.get_xcom_sidecar_container_image.return_value =
"private.repo/alpine:3.13"
+ k = KubernetesPodOperator(
+ name="test",
+ task_id="task",
+ do_xcom_push=True,
+ )
+ pod = k.build_pod_request_obj(create_context(k))
+ assert pod.spec.containers[1].image == "private.repo/alpine:3.13"
+
def test_image_pull_policy_correctly_set(self):
k = KubernetesPodOperator(
task_id="task",