Repository: incubator-airflow
Updated Branches:
  refs/heads/master c3aa8e31f -> 72f15a108


[AIRFLOW-1960] Add support for secrets in kubernetes operator

Closes #3271 from ese/secrets-kubernetes-operator


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/72f15a10
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/72f15a10
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/72f15a10

Branch: refs/heads/master
Commit: 72f15a108e5556970229cb68d3b0968ee18db46e
Parents: c3aa8e3
Author: Sergio Ballesteros <sna...@locolandia.net>
Authored: Sun Apr 29 11:54:55 2018 +0200
Committer: Fokko Driesprong <fokkodriespr...@godatadriven.com>
Committed: Sun Apr 29 11:54:55 2018 +0200

----------------------------------------------------------------------
 .../kubernetes_request_factory.py               | 11 +++++--
 airflow/contrib/kubernetes/secret.py            |  5 +++-
 .../operators/kubernetes_pod_operator.py        | 31 ++++++++++++++++++--
 3 files changed, 41 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/72f15a10/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
----------------------------------------------------------------------
diff --git 
a/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
 
b/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
index 6e8632f..12d05ec 100644
--- 
a/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
+++ 
b/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
@@ -84,7 +84,10 @@ class KubernetesRequestFactory:
 
     @staticmethod
     def attach_volumes(pod, req):
-        req['spec']['volumes'] = pod.volumes
+        req['spec']['volumes'] = (
+            req['spec'].get('volumes', []))
+        if len(pod.volumes) > 0:
+            req['spec']['volumes'].extend(pod.volumes)
 
     @staticmethod
     def attach_volume_mounts(pod, req):
@@ -101,8 +104,10 @@ class KubernetesRequestFactory:
     def extract_volume_secrets(pod, req):
         vol_secrets = [s for s in pod.secrets if s.deploy_type == 'volume']
         if any(vol_secrets):
-            req['spec']['containers'][0]['volumeMounts'] = []
-            req['spec']['volumes'] = []
+            req['spec']['containers'][0]['volumeMounts'] = (
+                req['spec']['containers'][0].get('volumeMounts', []))
+            req['spec']['volumes'] = (
+                req['spec'].get('volumes', []))
         for idx, vol in enumerate(vol_secrets):
             vol_id = 'secretvol' + str(idx)
             req['spec']['containers'][0]['volumeMounts'].append({

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/72f15a10/airflow/contrib/kubernetes/secret.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/secret.py 
b/airflow/contrib/kubernetes/secret.py
index 23bfacc..5c1038c 100644
--- a/airflow/contrib/kubernetes/secret.py
+++ b/airflow/contrib/kubernetes/secret.py
@@ -25,7 +25,8 @@ class Secret:
         :param deploy_type: The type of secret deploy in Kubernetes, either 
`env` or
             `volume`
         :type deploy_type: ``str``
-        :param deploy_target: The environment variable to be created in the 
worker.
+        :param deploy_target: The environment variable when `deploy_type` 
is `env`, or
+            the file path exposing the secret when `deploy_type` is `volume`
         :type deploy_target: ``str``
         :param secret: Name of the secrets object in Kubernetes
         :type secret: ``str``
@@ -34,5 +35,7 @@ class Secret:
         """
         self.deploy_type = deploy_type
         self.deploy_target = deploy_target.upper()
+        if deploy_type == 'volume':
+            self.deploy_target = deploy_target
         self.secret = secret
         self.key = key

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/72f15a10/airflow/contrib/operators/kubernetes_pod_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/kubernetes_pod_operator.py 
b/airflow/contrib/operators/kubernetes_pod_operator.py
index bcc3cad..9e95d8b 100644
--- a/airflow/contrib/operators/kubernetes_pod_operator.py
+++ b/airflow/contrib/operators/kubernetes_pod_operator.py
@@ -27,6 +27,31 @@ ui_color = '#ffefeb'
 
 
 class KubernetesPodOperator(BaseOperator):
+    """
+    Execute a task in a Kubernetes Pod
+
+    :param image: Docker image name
+    :type image: str
+    :param namespace: namespace in which to run the Pod
+    :type namespace: str
+    :param cmds: entrypoint of the container
+    :type cmds: list
+    :param arguments: arguments to the entrypoint.
+        The docker image's CMD is used if this is not provided.
+    :type arguments: list
+    :param labels: labels to apply to the Pod
+    :type labels: dict
+    :param startup_timeout_seconds: timeout in seconds to start up the pod
+    :type startup_timeout_seconds: int
+    :param name: name for the pod
+    :type name: str
+    :param secrets: Secrets to attach to the container
+    :type secrets: list
+    :param in_cluster: run kubernetes client with in_cluster configuration
+    :type in_cluster: bool
+    :param get_logs: get the stdout of the container as logs of the tasks
+    """
+
     def execute(self, context):
         try:
 
@@ -42,6 +67,8 @@ class KubernetesPodOperator(BaseOperator):
                 labels=self.labels
             )
 
+            pod.secrets = self.secrets
+
             launcher = pod_launcher.PodLauncher(client)
             final_state = launcher.run_pod(
                 pod,
@@ -59,15 +86,14 @@ class KubernetesPodOperator(BaseOperator):
                  cmds,
                  arguments,
                  name,
+                 secrets=None,
                  in_cluster=False,
                  labels=None,
                  startup_timeout_seconds=120,
-                 kube_executor_config=None,
                  get_logs=True,
                  *args,
                  **kwargs):
         super(KubernetesPodOperator, self).__init__(*args, **kwargs)
-        self.kube_executor_config = kube_executor_config or {}
         self.image = image
         self.namespace = namespace
         self.cmds = cmds
@@ -75,5 +101,6 @@ class KubernetesPodOperator(BaseOperator):
         self.labels = labels or {}
         self.startup_timeout_seconds = startup_timeout_seconds
         self.name = name
+        self.secrets = secrets or []
         self.in_cluster = in_cluster
         self.get_logs = get_logs

Reply via email to