Repository: incubator-airflow Updated Branches: refs/heads/master c3aa8e31f -> 72f15a108
[AIRFLOW-1960] Add support for secrets in kubernetes operator Closes #3271 from ese/secrets-kubernetes-operator Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/72f15a10 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/72f15a10 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/72f15a10 Branch: refs/heads/master Commit: 72f15a108e5556970229cb68d3b0968ee18db46e Parents: c3aa8e3 Author: Sergio Ballesteros <sna...@locolandia.net> Authored: Sun Apr 29 11:54:55 2018 +0200 Committer: Fokko Driesprong <fokkodriespr...@godatadriven.com> Committed: Sun Apr 29 11:54:55 2018 +0200 ---------------------------------------------------------------------- .../kubernetes_request_factory.py | 11 +++++-- airflow/contrib/kubernetes/secret.py | 5 +++- .../operators/kubernetes_pod_operator.py | 31 ++++++++++++++++++-- 3 files changed, 41 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/72f15a10/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py b/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py index 6e8632f..12d05ec 100644 --- a/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py +++ b/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py @@ -84,7 +84,10 @@ class KubernetesRequestFactory: @staticmethod def attach_volumes(pod, req): - req['spec']['volumes'] = pod.volumes + req['spec']['volumes'] = ( + req['spec'].get('volumes', [])) + if len(pod.volumes) > 0: + req['spec']['volumes'].extend(pod.volumes) @staticmethod def attach_volume_mounts(pod, req): @@ -101,8 +104,10 @@ class KubernetesRequestFactory: def extract_volume_secrets(pod, req): vol_secrets = [s for s in pod.secrets if s.deploy_type == 'volume'] if any(vol_secrets): - req['spec']['containers'][0]['volumeMounts'] = [] - req['spec']['volumes'] = [] + req['spec']['containers'][0]['volumeMounts'] = ( + req['spec']['containers'][0].get('volumeMounts', [])) + req['spec']['volumes'] = ( + req['spec'].get('volumes', [])) for idx, vol in enumerate(vol_secrets): vol_id = 'secretvol' + str(idx) req['spec']['containers'][0]['volumeMounts'].append({ http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/72f15a10/airflow/contrib/kubernetes/secret.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/kubernetes/secret.py b/airflow/contrib/kubernetes/secret.py index 23bfacc..5c1038c 100644 --- a/airflow/contrib/kubernetes/secret.py +++ b/airflow/contrib/kubernetes/secret.py @@ -25,7 +25,8 @@ class Secret: :param deploy_type: The type of secret deploy in Kubernetes, either `env` or `volume` :type deploy_type: ``str`` - :param deploy_target: The environment variable to be created in the worker. + :param deploy_target: The environment variable when `deploy_type` `env` or + file path when `deploy_type` `volume` where expose secret :type deploy_target: ``str`` :param secret: Name of the secrets object in Kubernetes :type secret: ``str`` @@ -34,5 +35,7 @@ class Secret: """ self.deploy_type = deploy_type self.deploy_target = deploy_target.upper() + if deploy_type == 'volume': + self.deploy_target = deploy_target self.secret = secret self.key = key http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/72f15a10/airflow/contrib/operators/kubernetes_pod_operator.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/kubernetes_pod_operator.py b/airflow/contrib/operators/kubernetes_pod_operator.py index bcc3cad..9e95d8b 100644 --- a/airflow/contrib/operators/kubernetes_pod_operator.py +++ b/airflow/contrib/operators/kubernetes_pod_operator.py @@ -27,6 +27,31 @@ ui_color = '#ffefeb' class KubernetesPodOperator(BaseOperator): + """ + Execute a task in a Kubernetes Pod + + :param image: Docker image name + :type image: str + :param: namespace: namespace name where run the Pod + :type: namespace: str + :param cmds: entrypoint of the container + :type cmds: list + :param arguments: arguments of to the entrypoint. + The docker image's CMD is used if this is not provided. + :type arguments: list + :param labels: labels to apply to the Pod + :type labels: dict + :param startup_timeout_seconds: timeout in seconds to startup the pod + :type startup_timeout_seconds: int + :param name: name for the pod + :type name: str + :param secrets: Secrets to attach to the container + :type secrets: list + :param in_cluster: run kubernetes client with in_cluster configuration + :type in_cluster: bool + :param get_logs: get the stdout of the container as logs of the tasks + """ + def execute(self, context): try: @@ -42,6 +67,8 @@ class KubernetesPodOperator(BaseOperator): labels=self.labels ) + pod.secrets = self.secrets + launcher = pod_launcher.PodLauncher(client) final_state = launcher.run_pod( pod, @@ -59,15 +86,14 @@ class KubernetesPodOperator(BaseOperator): cmds, arguments, name, + secrets=None, in_cluster=False, labels=None, startup_timeout_seconds=120, - kube_executor_config=None, get_logs=True, *args, **kwargs): super(KubernetesPodOperator, self).__init__(*args, **kwargs) - self.kube_executor_config = kube_executor_config or {} self.image = image self.namespace = namespace self.cmds = cmds @@ -75,5 +101,6 @@ class KubernetesPodOperator(BaseOperator): self.labels = labels or {} self.startup_timeout_seconds = startup_timeout_seconds self.name = name + self.secrets = secrets or [] self.in_cluster = in_cluster self.get_logs = get_logs