This is an automated email from the ASF dual-hosted git repository. dimberman pushed a commit to branch v1-10-test in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v1-10-test by this push: new 7ebd913 [AIRFLOW-3152] Kubernetes Pod Operator should support init containers. (#6196) 7ebd913 is described below commit 7ebd91380114d372fe0d114a2189905a4ad8cf70 Author: Marina Pereira <46900505+marinajpere...@users.noreply.github.com> AuthorDate: Tue Dec 17 00:14:24 2019 -0500 [AIRFLOW-3152] Kubernetes Pod Operator should support init containers. (#6196) * Add support for init-containers to Kubernetes Pod Operator Enables start-up related code to be added for an app container in K8s Pod operator. Add new init_container resource that can be attached to the K8s Pod. * Update init_container and fix tests Fix the error in init_container and the associated tests. * Refactor and fix tests Fix tests for init_containers * Fix init_container test errors Remove unused mocks in init_container test * Fix init_container test errors Update volume mount object used in init_container test * Fix init_container test errors Add the missing volume setup for the init_container test. * Fix init_container test failure. Fix the expected result in the init_container test. * Fix init_container test failures Update expected results in the init_container tests * Update the KubernetesPodOperator guide Update the KubernetesPodOperator guide to document support for init containers * Update init-container tests Fix test failures casued due python versions by sorting the output before assert test. * Update init-container to use k8s V1Container object Remove custom object InitContainer. Allow users to pass List[k8s.V1Container] as init-container in K8sPodOperator * Add missing init_containers initalization in K8s pod operator Due to rebase from master, certain sections of the kubernetes_pod_operator.py file was refactored which led to missing init_containers initalization in K8s pod operator. Add missing init_containers initalization in K8s pod operator. Update kubernetes pod operator configurations in init container test. (cherry picked from commit 4e1b0aa7c52e79eeb99698e6190e6f39ca3cab7f) --- .../contrib/operators/kubernetes_pod_operator.py | 5 +- docs/howto/operator/kubernetes.rst | 155 +++++++++++++++++++++ 2 files changed, 159 insertions(+), 1 deletion(-) diff --git a/airflow/contrib/operators/kubernetes_pod_operator.py b/airflow/contrib/operators/kubernetes_pod_operator.py index 41f0df3..e64a912 100644 --- a/airflow/contrib/operators/kubernetes_pod_operator.py +++ b/airflow/contrib/operators/kubernetes_pod_operator.py @@ -138,6 +138,8 @@ class KubernetesPodOperator(BaseOperator): # pylint: disable=too-many-instance- /airflow/xcom/return.json in the container will also be pushed to an XCom when the container completes. :type do_xcom_push: bool + :param init_containers: init container for the launched Pod + :type init_containers: list[kubernetes.client.models.V1Container] :param pod_template_file: path to pod template file :type pod_template_file: str """ @@ -178,11 +180,11 @@ class KubernetesPodOperator(BaseOperator): # pylint: disable=too-many-instance- dnspolicy=None, schedulername=None, full_pod_spec=None, - init_containers=None, log_events_on_failure=False, do_xcom_push=False, pod_template_file=None, priority_class_name=None, + init_containers=None, *args, **kwargs): if kwargs.get('xcom_push') is not None: @@ -224,6 +226,7 @@ class KubernetesPodOperator(BaseOperator): # pylint: disable=too-many-instance- self.schedulername = schedulername self.full_pod_spec = full_pod_spec self.init_containers = init_containers or [] + self.init_containers = init_containers or [] self.log_events_on_failure = log_events_on_failure self.pod_template_file = pod_template_file self.priority_class_name = priority_class_name diff --git a/docs/howto/operator/kubernetes.rst b/docs/howto/operator/kubernetes.rst new file mode 100644 index 0000000..f612cce --- /dev/null +++ b/docs/howto/operator/kubernetes.rst @@ -0,0 +1,155 @@ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + + + +.. _howto/operator:KubernetesPodOperator: + +Kubernetes Operator +=================== + +The :class:`airflow.contrib.operators.kubernetes_pod_operator.KubernetesPodOperator` allows you to create +Pods on Kubernetes. It works with any type of executor. + +.. code:: python + + import kubernetes.client.models as k8s + + from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator + from airflow.contrib.kubernetes.secret import Secret + from airflow.contrib.kubernetes.volume import Volume + from airflow.contrib.kubernetes.volume_mount import VolumeMount + from airflow.contrib.kubernetes.pod import Port + + + secret_file = Secret('volume', '/etc/sql_conn', 'airflow-secrets', 'sql_alchemy_conn') + secret_env = Secret('env', 'SQL_CONN', 'airflow-secrets', 'sql_alchemy_conn') + secret_all_keys = Secret('env', None, 'airflow-secrets-2') + volume_mount = VolumeMount('test-volume', + mount_path='/root/mount_file', + sub_path=None, + read_only=True) + port = Port('http', 80) + configmaps = ['test-configmap-1', 'test-configmap-2'] + + volume_config= { + 'persistentVolumeClaim': + { + 'claimName': 'test-volume' + } + } + volume = Volume(name='test-volume', configs=volume_config) + + init_container_volume_mounts = [k8s.V1VolumeMount( + mount_path='/etc/foo', + name='test-volume', + sub_path=None, + read_only=True + )] + + init_environments = [k8s.V1EnvVar( + name='key1', + value='value1' + ), k8s.V1EnvVar( + name='key2', + value='value2' + )] + + init_container = k8s.V1Container( + name="init-container", + image="ubuntu:16.04", + env=init_environments, + volume_mounts=init_container_volume_mounts, + command=["bash", "-cx"], + args=["echo 10"] + ) + + affinity = { + 'nodeAffinity': { + 'preferredDuringSchedulingIgnoredDuringExecution': [ + { + "weight": 1, + "preference": { + "matchExpressions": { + "key": "disktype", + "operator": "In", + "values": ["ssd"] + } + } + } + ] + }, + "podAffinity": { + "requiredDuringSchedulingIgnoredDuringExecution": [ + { + "labelSelector": { + "matchExpressions": [ + { + "key": "security", + "operator": "In", + "values": ["S1"] + } + ] + }, + "topologyKey": "failure-domain.beta.kubernetes.io/zone" + } + ] + }, + "podAntiAffinity": { + "requiredDuringSchedulingIgnoredDuringExecution": [ + { + "labelSelector": { + "matchExpressions": [ + { + "key": "security", + "operator": "In", + "values": ["S2"] + } + ] + }, + "topologyKey": "kubernetes.io/hostname" + } + ] + } + } + + tolerations = [ + { + 'key': "key", + 'operator': 'Equal', + 'value': 'value' + } + ] + + k = KubernetesPodOperator(namespace='default', + image="ubuntu:16.04", + cmds=["bash", "-cx"], + arguments=["echo", "10"], + labels={"foo": "bar"}, + secrets=[secret_file, secret_env, secret_all_keys], + ports=[port] + volumes=[volume], + volume_mounts=[volume_mount], + name="test", + task_id="task", + affinity=affinity, + is_delete_operator_pod=True, + hostnetwork=False, + tolerations=tolerations, + configmaps=configmaps, + init_containers=[init_container], + )