Repository: incubator-airflow Updated Branches: refs/heads/master f77a93191 -> 11e670ddb
[AIRFLOW-2530] KubernetesOperator supports multiple clusters Closes #3425 from mrkm4ntr/airflow-2530 Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/11e670dd Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/11e670dd Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/11e670dd Branch: refs/heads/master Commit: 11e670ddbce419489c798f26d3e94e7d3a00f5eb Parents: f77a931 Author: Shintaro Murakami <mrkm4...@gmail.com> Authored: Mon May 28 16:51:22 2018 +0200 Committer: Fokko Driesprong <fokkodriespr...@godatadriven.com> Committed: Mon May 28 16:51:22 2018 +0200 ---------------------------------------------------------------------- airflow/contrib/kubernetes/kube_client.py | 16 ++++++++++------ airflow/contrib/kubernetes/pod_launcher.py | 5 +++-- .../contrib/operators/kubernetes_pod_operator.py | 10 ++++++++-- 3 files changed, 21 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/11e670dd/airflow/contrib/kubernetes/kube_client.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/kubernetes/kube_client.py b/airflow/contrib/kubernetes/kube_client.py index 35e8410..7552e99 100644 --- a/airflow/contrib/kubernetes/kube_client.py +++ b/airflow/contrib/kubernetes/kube_client.py @@ -17,16 +17,20 @@ from airflow.configuration import conf -def _load_kube_config(in_cluster): +def _load_kube_config(in_cluster, cluster_context): from kubernetes import config, client if in_cluster: config.load_incluster_config() return client.CoreV1Api() else: - config.load_kube_config() - return client.CoreV1Api() + if cluster_context is None: + config.load_kube_config() + return client.CoreV1Api() + else: + return client.CoreV1Api( + api_client=config.new_client_from_config(context=cluster_context)) -def get_kube_client(in_cluster=conf.getboolean('kubernetes', 'in_cluster')): - # TODO: This should also allow people to point to a cluster. - return _load_kube_config(in_cluster) +def get_kube_client(in_cluster=conf.getboolean('kubernetes', 'in_cluster'), + cluster_context=None): + return _load_kube_config(in_cluster, cluster_context) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/11e670dd/airflow/contrib/kubernetes/pod_launcher.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/kubernetes/pod_launcher.py b/airflow/contrib/kubernetes/pod_launcher.py index f1467a9..c1c3f30 100644 --- a/airflow/contrib/kubernetes/pod_launcher.py +++ b/airflow/contrib/kubernetes/pod_launcher.py @@ -37,9 +37,10 @@ class PodStatus(object): class PodLauncher(LoggingMixin): - def __init__(self, kube_client=None, in_cluster=True): + def __init__(self, kube_client=None, in_cluster=True, cluster_context=None): super(PodLauncher, self).__init__() - self._client = kube_client or get_kube_client(in_cluster=in_cluster) + self._client = kube_client or get_kube_client(in_cluster=in_cluster, + cluster_context=cluster_context) self._watch = watch.Watch() self.kube_req_factory = pod_fac.SimplePodRequestFactory() http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/11e670dd/airflow/contrib/operators/kubernetes_pod_operator.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/kubernetes_pod_operator.py b/airflow/contrib/operators/kubernetes_pod_operator.py index 8e88b68..fa06b08 100644 --- a/airflow/contrib/operators/kubernetes_pod_operator.py +++ b/airflow/contrib/operators/kubernetes_pod_operator.py @@ -63,6 +63,9 @@ class KubernetesPodOperator(BaseOperator): :type secrets: list of Secret :param in_cluster: run kubernetes client with in_cluster configuration :type in_cluster: bool + :param cluster_context: context that points to kubernetes cluster. + Ignored when in_cluster is True. If None, current-context is used. + :type cluster_context: string :param get_logs: get the stdout of the container as logs of the tasks :type get_logs: bool :param affinity: A dict containing a group of affinity scheduling rules @@ -72,7 +75,8 @@ class KubernetesPodOperator(BaseOperator): def execute(self, context): try: - client = kube_client.get_kube_client(in_cluster=self.in_cluster) + client = kube_client.get_kube_client(in_cluster=self.in_cluster, + cluster_context=self.cluster_context) gen = pod_generator.PodGenerator() for mount in self.volume_mounts: @@ -96,7 +100,7 @@ class KubernetesPodOperator(BaseOperator): pod.resources = self.resources pod.affinity = self.affinity - launcher = pod_launcher.PodLauncher(client) + launcher = pod_launcher.PodLauncher(kube_client=client) final_state = launcher.run_pod( pod, startup_timeout=self.startup_timeout_seconds, @@ -120,6 +124,7 @@ class KubernetesPodOperator(BaseOperator): env_vars=None, secrets=None, in_cluster=False, + cluster_context=None, labels=None, startup_timeout_seconds=120, get_logs=True, @@ -142,6 +147,7 @@ class KubernetesPodOperator(BaseOperator): self.volumes = volumes or [] self.secrets = secrets or [] self.in_cluster = in_cluster + self.cluster_context = cluster_context self.get_logs = get_logs self.image_pull_policy = image_pull_policy self.annotations = annotations or {}