Repository: incubator-airflow
Updated Branches:
  refs/heads/master 3dade5413 -> 14a77a701


[AIRFLOW-2601] Allow user to specify k8s config

Closes #3491 from Noremac201/config_kube


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/14a77a70
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/14a77a70
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/14a77a70

Branch: refs/heads/master
Commit: 14a77a701e401d0e7d7cf258c6294048709f5b3c
Parents: 3dade54
Author: Cameron Moberg <cjmob...@google.com>
Authored: Mon Jun 18 22:29:19 2018 +0100
Committer: Kaxil Naik <kaxiln...@apache.org>
Committed: Mon Jun 18 22:29:19 2018 +0100

----------------------------------------------------------------------
 airflow/contrib/kubernetes/kube_client.py       | 12 +++---
 .../operators/kubernetes_pod_operator.py        |  9 +++-
 .../minikube/test_kubernetes_pod_operator.py    | 44 ++++++++++++++++++++
 3 files changed, 58 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/14a77a70/airflow/contrib/kubernetes/kube_client.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/kube_client.py b/airflow/contrib/kubernetes/kube_client.py
index 7552e99..be8d56e 100644
--- a/airflow/contrib/kubernetes/kube_client.py
+++ b/airflow/contrib/kubernetes/kube_client.py
@@ -17,20 +17,22 @@
 from airflow.configuration import conf
 
 
-def _load_kube_config(in_cluster, cluster_context):
+def _load_kube_config(in_cluster, cluster_context, config_file):
     from kubernetes import config, client
     if in_cluster:
         config.load_incluster_config()
         return client.CoreV1Api()
     else:
-        if cluster_context is None:
+        if cluster_context is None and config_file is None:
             config.load_kube_config()
             return client.CoreV1Api()
         else:
             return client.CoreV1Api(
-                api_client=config.new_client_from_config(context=cluster_context))
+                api_client=config.new_client_from_config(config_file=config_file,
+                                                         context=cluster_context))
 
 
 def get_kube_client(in_cluster=conf.getboolean('kubernetes', 'in_cluster'),
-                    cluster_context=None):
-    return _load_kube_config(in_cluster, cluster_context)
+                    cluster_context=None,
+                    config_file=None):
+    return _load_kube_config(in_cluster, cluster_context, config_file)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/14a77a70/airflow/contrib/operators/kubernetes_pod_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/kubernetes_pod_operator.py b/airflow/contrib/operators/kubernetes_pod_operator.py
index fa06b08..3e4b8a3 100644
--- a/airflow/contrib/operators/kubernetes_pod_operator.py
+++ b/airflow/contrib/operators/kubernetes_pod_operator.py
@@ -70,13 +70,16 @@ class KubernetesPodOperator(BaseOperator):
     :type get_logs: bool
     :param affinity: A dict containing a group of affinity scheduling rules
     :type affinity: dict
+    :param config_file: The path to the Kubernetes config file
+    :type config_file: str
     """
-    template_fields = ('cmds', 'arguments', 'env_vars')
+    template_fields = ('cmds', 'arguments', 'env_vars', 'config_file')
 
     def execute(self, context):
         try:
             client = kube_client.get_kube_client(in_cluster=self.in_cluster,
-                                                 cluster_context=self.cluster_context)
+                                                 cluster_context=self.cluster_context,
+                                                 config_file=self.config_file)
             gen = pod_generator.PodGenerator()
 
             for mount in self.volume_mounts:
@@ -132,6 +135,7 @@ class KubernetesPodOperator(BaseOperator):
                  annotations=None,
                  resources=None,
                  affinity=None,
+                 config_file=None,
                  *args,
                  **kwargs):
         super(KubernetesPodOperator, self).__init__(*args, **kwargs)
@@ -153,3 +157,4 @@ class KubernetesPodOperator(BaseOperator):
         self.annotations = annotations or {}
         self.affinity = affinity or {}
         self.resources = resources or Resources()
+        self.config_file = config_file

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/14a77a70/tests/contrib/minikube/test_kubernetes_pod_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/minikube/test_kubernetes_pod_operator.py b/tests/contrib/minikube/test_kubernetes_pod_operator.py
index 8d888a3..4cfa3b4 100644
--- a/tests/contrib/minikube/test_kubernetes_pod_operator.py
+++ b/tests/contrib/minikube/test_kubernetes_pod_operator.py
@@ -16,6 +16,8 @@
 # under the License.
 
 import unittest
+import os
+import shutil
 from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
 from airflow import AirflowException
 from subprocess import check_call
@@ -34,6 +36,48 @@ except Exception as e:
 
 
 class KubernetesPodOperatorTest(unittest.TestCase):
+
+    def test_config_path_move(self):
+        new_config_path = '/tmp/kube_config'
+        old_config_path = os.path.expanduser('~/.kube/config')
+        shutil.copy(old_config_path, new_config_path)
+
+        k = KubernetesPodOperator(
+            namespace='default',
+            image="ubuntu:16.04",
+            cmds=["bash", "-cx"],
+            arguments=["echo 10"],
+            labels={"foo": "bar"},
+            name="test",
+            task_id="task",
+            config_file=new_config_path
+        )
+        k.execute(None)
+
+    @mock.patch("airflow.contrib.kubernetes.pod_launcher.PodLauncher.run_pod")
+    @mock.patch("airflow.contrib.kubernetes.kube_client.get_kube_client")
+    def test_config_path(self, client_mock, launcher_mock):
+        from airflow.utils.state import State
+
+        file_path = "/tmp/fake_file"
+        k = KubernetesPodOperator(
+            namespace='default',
+            image="ubuntu:16.04",
+            cmds=["bash", "-cx"],
+            arguments=["echo 10"],
+            labels={"foo": "bar"},
+            name="test",
+            task_id="task",
+            config_file=file_path,
+            in_cluster=False,
+            cluster_context='default'
+        )
+        launcher_mock.return_value = State.SUCCESS
+        k.execute(None)
+        client_mock.assert_called_with(in_cluster=False,
+                                       cluster_context='default',
+                                       config_file=file_path)
+
     def test_working_pod(self):
         k = KubernetesPodOperator(
             namespace='default',

Reply via email to