Repository: incubator-airflow
Updated Branches:
  refs/heads/master 3dade5413 -> 14a77a701
[AIRFLOW-2601] Allow user to specify k8s config

Closes #3491 from Noremac201/config_kube

Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/14a77a70
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/14a77a70
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/14a77a70

Branch: refs/heads/master
Commit: 14a77a701e401d0e7d7cf258c6294048709f5b3c
Parents: 3dade54
Author: Cameron Moberg <cjmob...@google.com>
Authored: Mon Jun 18 22:29:19 2018 +0100
Committer: Kaxil Naik <kaxiln...@apache.org>
Committed: Mon Jun 18 22:29:19 2018 +0100

----------------------------------------------------------------------
 airflow/contrib/kubernetes/kube_client.py       | 12 +++---
 .../operators/kubernetes_pod_operator.py        |  9 +++-
 .../minikube/test_kubernetes_pod_operator.py    | 44 ++++++++++++++++++++
 3 files changed, 58 insertions(+), 7 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/14a77a70/airflow/contrib/kubernetes/kube_client.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/kube_client.py b/airflow/contrib/kubernetes/kube_client.py
index 7552e99..be8d56e 100644
--- a/airflow/contrib/kubernetes/kube_client.py
+++ b/airflow/contrib/kubernetes/kube_client.py
@@ -17,20 +17,22 @@
 from airflow.configuration import conf
 
 
-def _load_kube_config(in_cluster, cluster_context):
+def _load_kube_config(in_cluster, cluster_context, config_file):
     from kubernetes import config, client
     if in_cluster:
         config.load_incluster_config()
         return client.CoreV1Api()
     else:
-        if cluster_context is None:
+        if cluster_context is None and config_file is None:
             config.load_kube_config()
             return client.CoreV1Api()
         else:
             return client.CoreV1Api(
-                api_client=config.new_client_from_config(context=cluster_context))
+                api_client=config.new_client_from_config(config_file=config_file,
+                                                         context=cluster_context))
 
 
 def get_kube_client(in_cluster=conf.getboolean('kubernetes', 'in_cluster'),
-                    cluster_context=None):
-    return _load_kube_config(in_cluster, cluster_context)
+                    cluster_context=None,
+                    config_file=None):
+    return _load_kube_config(in_cluster, cluster_context, config_file)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/14a77a70/airflow/contrib/operators/kubernetes_pod_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/kubernetes_pod_operator.py b/airflow/contrib/operators/kubernetes_pod_operator.py
index fa06b08..3e4b8a3 100644
--- a/airflow/contrib/operators/kubernetes_pod_operator.py
+++ b/airflow/contrib/operators/kubernetes_pod_operator.py
@@ -70,13 +70,16 @@ class KubernetesPodOperator(BaseOperator):
     :type get_logs: bool
     :param affinity: A dict containing a group of affinity scheduling rules
     :type affinity: dict
+    :param config_file: The path to the Kubernetes config file
+    :type config_file: str
     """
-    template_fields = ('cmds', 'arguments', 'env_vars')
+    template_fields = ('cmds', 'arguments', 'env_vars', 'config_file')
 
     def execute(self, context):
         try:
             client = kube_client.get_kube_client(in_cluster=self.in_cluster,
-                                                 cluster_context=self.cluster_context)
+                                                 cluster_context=self.cluster_context,
+                                                 config_file=self.config_file)
             gen = pod_generator.PodGenerator()
 
             for mount in self.volume_mounts:
@@ -132,6 +135,7 @@ class KubernetesPodOperator(BaseOperator):
                  annotations=None,
                  resources=None,
                  affinity=None,
+                 config_file=None,
                  *args,
                  **kwargs):
         super(KubernetesPodOperator, self).__init__(*args, **kwargs)
@@ -153,3 +157,4 @@ class KubernetesPodOperator(BaseOperator):
         self.annotations = annotations or {}
         self.affinity = affinity or {}
         self.resources = resources or Resources()
+        self.config_file = config_file

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/14a77a70/tests/contrib/minikube/test_kubernetes_pod_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/minikube/test_kubernetes_pod_operator.py b/tests/contrib/minikube/test_kubernetes_pod_operator.py
index 8d888a3..4cfa3b4 100644
--- a/tests/contrib/minikube/test_kubernetes_pod_operator.py
+++ b/tests/contrib/minikube/test_kubernetes_pod_operator.py
@@ -16,6 +16,8 @@
 # under the License.
 
 import unittest
+import os
+import shutil
 from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
 from airflow import AirflowException
 from subprocess import check_call
@@ -34,6 +36,48 @@ except Exception as e:
 
 
 class KubernetesPodOperatorTest(unittest.TestCase):
+
+    def test_config_path_move(self):
+        new_config_path = '/tmp/kube_config'
+        old_config_path = os.path.expanduser('~/.kube/config')
+        shutil.copy(old_config_path, new_config_path)
+
+        k = KubernetesPodOperator(
+            namespace='default',
+            image="ubuntu:16.04",
+            cmds=["bash", "-cx"],
+            arguments=["echo 10"],
+            labels={"foo": "bar"},
+            name="test",
+            task_id="task",
+            config_file=new_config_path
+        )
+        k.execute(None)
+
+    @mock.patch("airflow.contrib.kubernetes.pod_launcher.PodLauncher.run_pod")
+    @mock.patch("airflow.contrib.kubernetes.kube_client.get_kube_client")
+    def test_config_path(self, client_mock, launcher_mock):
+        from airflow.utils.state import State
+
+        file_path = "/tmp/fake_file"
+        k = KubernetesPodOperator(
+            namespace='default',
+            image="ubuntu:16.04",
+            cmds=["bash", "-cx"],
+            arguments=["echo 10"],
+            labels={"foo": "bar"},
+            name="test",
+            task_id="task",
+            config_file=file_path,
+            in_cluster=False,
+            cluster_context='default'
+        )
+        launcher_mock.return_value = State.SUCCESS
+        k.execute(None)
+        client_mock.assert_called_with(in_cluster=False,
+                                       cluster_context='default',
+                                       config_file=file_path)
+
     def test_working_pod(self):
         k = KubernetesPodOperator(
             namespace='default',