This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit d8db36a1b1bee478af4b4ca3e8ac6471da557a96
Author: Jarek Potiuk <jarek.pot...@polidea.com>
AuthorDate: Wed Nov 11 17:15:02 2020 +0100

    Added k9s as integrated tool to help with kubernetes testing (#12163)
    
    The K9s is fantastic tool that helps to debug a running k8s
    instance. It is terminal-based windowed CLI that makes you
    several times more productive comparing to using kubectl
    commands. We've integrated k9s (it is run as a docker container
    and downloaded on demand). We've also separated out KUBECONFIG
    of the integrated kind cluster so that it does not mess with
    kubernetes configuration you might already have.
    
    Also - together with that the "surrounding" of the kubernetes
    tests were simplified and improved so that the k9s integration
    can be utilized well. Instead of kubectl port forwarding (which
    caused multitude of problems) we are now utilizing kind's
    portMapping feature + custom NodePort resource that maps
    port 8080 to 30007 NodePort which in turn maps it to 8080
    port of the Webserver. This way we do not have to establish
    an external kubectl port forward which is prone to error and
    management - everything is brought up when Airflow gets
    deployed to the Kind Cluster and shuts down when the Kind
    cluster is stopped.
    
    Yet another problem fixed was killing of postgres by one of the
    kubernetes tests ('test_integration_run_dag_with_scheduler_failure').
    Instead of just killing the scheduler it killed all pods - including
    the Postgres one (it was named 'airflow-postgres.*'). That caused
    various problems, as the database could be left in a strange state.
    I changed the tests to do what it claimed was doing - so killing only the
    scheduler during the test. This seemed to improve the stability
    of tests immensely in my local setup.
    
    (cherry picked from commit 21999dd56e9dbe9f7f9e25961954c5677c3c7c58)
---
 .github/workflows/ci.yml                           |  17 +-
 BREEZE.rst                                         |  11 +-
 TESTING.rst                                        | 151 ++++-
 breeze                                             |   9 +
 breeze-complete                                    |   5 +-
 chart/requirements.lock                            |   4 +-
 images/testing/k9s.png                             | Bin 0 -> 238713 bytes
 images/testing/kubeconfig-env.png                  | Bin 0 -> 231280 bytes
 images/testing/kubernetes-virtualenv.png           | Bin 0 -> 110011 bytes
 images/testing/pytest-runner.png                   | Bin 0 -> 131589 bytes
 images/testing/run-test.png                        | Bin 0 -> 140728 bytes
 kubernetes_tests/test_kubernetes_executor.py       |   7 +-
 kubernetes_tests/test_kubernetes_pod_operator.py   | 672 ++++++++-------------
 scripts/ci/kubernetes/ci_run_kubernetes_tests.sh   |   7 +-
 ...up_cluster_and_deploy_airflow_to_kubernetes.sh} |   3 +-
 scripts/ci/kubernetes/kind-cluster-conf.yaml       |   5 +
 .../{kind-cluster-conf.yaml => nodeport.yaml}      |  30 +-
 ...oy_app_to_kubernetes.sh => redeploy_airflow.sh} |   6 +-
 scripts/ci/libraries/_kind.sh                      | 126 ++--
 19 files changed, 516 insertions(+), 537 deletions(-)

diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 6c854a1..81890a7 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -636,23 +636,14 @@ jobs:
           python-version: ${{ env.PYTHON_MAJOR_MINOR_VERSION }}
       - name: "Free space"
         run: ./scripts/ci/tools/ci_free_space_on_ci.sh
-      - name: "Setup Kind Cluster ${{ env.KIND_VERSION }}"
-        uses: engineerd/setup-kind@v0.4.0
-        with:
-          version: "${{ env.KIND_VERSION }}"
-          name: 
airflow-python-${{matrix.python-version}}-${{matrix.kubernetes-version}}
-          config: "scripts/ci/kubernetes/kind-cluster-conf.yaml"
       - name: "Prepare PROD Image"
         run: ./scripts/ci/images/ci_prepare_prod_image_on_ci.sh
-      - name: "Deploy airflow to cluster"
-        id: deploy-app
-        run: ./scripts/ci/kubernetes/ci_deploy_app_to_kubernetes.sh
+      - name: "Setup cluster and deploy Airflow"
+        id: setp-cluster-deploy-app
+        run: 
./scripts/ci/kubernetes/ci_setup_cluster_and_deploy_airflow_to_kubernetes.sh
         env:
           # We have the right image pulled already by the previous step
           SKIP_BUILDING_PROD_IMAGE: "true"
-          # due to some instabilities, in CI we try to increase port numbers 
when trying to establish
-          # port forwarding
-          INCREASE_PORT_NUMBER_FOR_KUBERNETES: "true"
       - name: "Cache virtualenv for kubernetes testing"
         uses: actions/cache@v2
         env:
@@ -669,7 +660,7 @@ jobs:
           key: "${{ env.cache-name }}-${{ github.job }}-${{ 
hashFiles('setup.py') }}\
 -${{ needs.build-info.outputs.defaultKindVersion }}\
 -${{ needs.build-info.outputs.defaultHelmVersion }}\
--$${{ matrix.kubernetes-version }}"
+-${{ matrix.kubernetes-version }}"
       - name: "Kubernetes Tests"
         run: ./scripts/ci/kubernetes/ci_run_kubernetes_tests.sh
       - name: "Upload KinD logs"
diff --git a/BREEZE.rst b/BREEZE.rst
index c3c2d95..ce7dc6a 100644
--- a/BREEZE.rst
+++ b/BREEZE.rst
@@ -1188,6 +1188,7 @@ This is the current syntax for  `./breeze <./breeze>`_:
           image building time in production image and at container entering 
time for CI image. One of:
 
                  1.10.12 1.10.11 1.10.10 1.10.9 1.10.8 1.10.7 1.10.6 1.10.5 
1.10.4 1.10.3 1.10.2
+                 wheel
 
   -t, --install-airflow-reference INSTALL_AIRFLOW_REFERENCE
           If specified, installs Airflow directly from reference in GitHub. 
This happens at
@@ -1712,7 +1713,14 @@ This is the current syntax for  `./breeze <./breeze>`_:
         to the cluster so you can also pass appropriate build image flags that 
will influence
         rebuilding the production image. Operation is one of:
 
-                 start stop restart status deploy test shell
+                 start stop restart status deploy test shell k9s
+
+        The last two operations - shell and k9s allow you to perform 
interactive testing with
+        kubernetes tests. You can enter the shell from which you can run 
kubernetes tests and in
+        another terminal you can start the k9s CLI to debug kubernetes 
instance. It is an easy
+        way to debug the kubernetes deployments.
+
+        You can read more about k9s at https://k9scli.io/
 
   Flags:
 
@@ -2087,6 +2095,7 @@ This is the current syntax for  `./breeze <./breeze>`_:
           image building time in production image and at container entering 
time for CI image. One of:
 
                  1.10.12 1.10.11 1.10.10 1.10.9 1.10.8 1.10.7 1.10.6 1.10.5 
1.10.4 1.10.3 1.10.2
+                 wheel
 
   -t, --install-airflow-reference INSTALL_AIRFLOW_REFERENCE
           If specified, installs Airflow directly from reference in GitHub. 
This happens at
diff --git a/TESTING.rst b/TESTING.rst
index c8b170b..f6c6c10 100644
--- a/TESTING.rst
+++ b/TESTING.rst
@@ -418,6 +418,14 @@ can also decide to only run tests with ``-m quarantined`` 
flag to run only those
 Running Tests with Kubernetes
 =============================
 
+Airflow has tests that are run against real kubernetes cluster. We are using
+`Kind <https://kind.sigs.k8s.io/>`_ to create and run the cluster. We 
integrated the tools to start/stop/
+deploy and run the cluster tests in our repository and into Breeze development 
environment.
+
+Configuration for the cluster is kept in ``./build/.kube/config`` file in your 
Airflow source repository and
+our scripts set the ``KUBECONFIG`` variable to it. If you want to interact 
with the Kind cluster created
+you can do it from outside of the scripts by exporting this variable and point 
it to this file.
+
 Starting Kubernetes Cluster
 ---------------------------
 
@@ -425,7 +433,7 @@ For your testing you manage Kind cluster with 
``kind-cluster`` breeze command:
 
 .. code-block:: bash
 
-    ./breeze kind-cluster [ start | stop | recreate | status | deploy | test | 
shell ]
+    ./breeze kind-cluster [ start | stop | recreate | status | deploy | test | 
shell | k9s ]
 
 The command allows you to start/stop/recreate/status Kind Kubernetes cluster, 
deploy Airflow via Helm
 chart as well as interact with the cluster (via test and shell commands).
@@ -444,11 +452,11 @@ Deploying Airflow to Kubernetes Cluster
 
 Deploying Airflow to the Kubernetes cluster created is also done via 
``kind-cluster deploy`` breeze command:
 
-.. code-block:: bash`
+.. code-block:: bash
 
     ./breeze kind-cluster deploy
 
-The deploy commands performs tthose steps:
+The deploy commands performs those steps:
 
 1. It rebuilds the latest ``apache/airflow:master-pythonX.Y`` production 
images using the
    latest sources using local cachine. It also adds example DAGs to the image, 
so that they do not
@@ -465,20 +473,63 @@ Running tests with Kubernetes Cluster
 You can either run all tests or you can select which tests to run. You can 
also enter interactive virtualenv
 to run the tests manually one by one.
 
-.. code-block:: bash
+Running kubernetes tests via shell:
 
-    Running kubernetes tests
+.. code-block:: bash
 
       ./scripts/ci/kubernetes/ci_run_kubernetes_tests.sh                      
- runs all kubernetes tests
       ./scripts/ci/kubernetes/ci_run_kubernetes_tests.sh TEST [TEST ...]      
- runs selected kubernetes tests (from kubernetes_tests folder)
+
+
+Running kubernetes tests via breeze:
+
+.. code-block:: bash
+
+      ./breeze kind-cluster test
+      ./breeze kind-cluster test -- TEST TEST [TEST ...]
+
+
+Entering shell with Kubernetes Cluster
+--------------------------------------
+
+This shell is prepared to run kubernetes tests interactively. It has 
``kubectl`` and ``kind`` cli tools
+available in the path, it has also activated virtualenv environment that 
allows you to run tests via pytest.
+
+You can enter the shell via those scripts
+
       ./scripts/ci/kubernetes/ci_run_kubernetes_tests.sh [-i|--interactive]   
- Activates virtual environment ready to run tests and drops you in
       ./scripts/ci/kubernetes/ci_run_kubernetes_tests.sh [--help]             
- Prints this help message
 
 
-You can also run the same tests command with Breeze, using ``kind-cluster 
test`` command (to run all
-kubernetes tests) and with ``kind-cluster shell`` command you can enter 
interactive shell when you can
-run tests.
+.. code-block:: bash
+
+      ./breeze kind-cluster shell
+
 
+K9s CLI - debug kubernetes in style!
+------------------------------------
+
+Breeze has built-in integration with fantastic k9s CLI tool, that allows you 
to debug the kubernetes
+installation effortlessly and in style. K9S provides terminal (but windowed) 
CLI that allows you to
+easily observe what's going on in the kubernetes instance, observe the 
resources defined (pods, secrets,
+custom resource definitions), enter shell for the Pods/Containers running, see 
the log files and more.
+
+You can read more about k9s at `https://k9scli.io/ <https://k9scli.io/>`_
+
+Here is the screenshot of k9s tools in operation:
+
+.. image:: images/testing/k9s.png
+    :align: center
+    :alt: K9S tool
+
+
+You can enter the k9s tool via breeze (after you deployed Airflow):
+
+.. code-block:: bash
+
+      ./breeze kind-cluster k9s
+
+You can exit k9s by pressing Ctrl-C.
 
 Typical testing pattern for Kubernetes tests
 --------------------------------------------
@@ -578,7 +629,6 @@ This prepares and enters the virtualenv in 
``.build/.kubernetes_venv`` folder:
 
     ./breeze kind-cluster shell
 
-
 Once you enter the environment you receive this information:
 
 
@@ -595,12 +645,67 @@ Once you enter the environment you receive this 
information:
 
     You are entering the virtualenv now. Type exit to exit back to the 
original shell
 
+In a separate terminal you can open the k9s CLI:
+
+.. code-block:: bash
+
+    ./breeze kind-cluster k9s
+
+Use it to observe what's going on in your cluster.
+
+6. Debugging in IntelliJ/PyCharm
+
+It is very easy to running/debug Kubernetes tests with IntelliJ/PyCharm. 
Unlike the regular tests they are
+in ``kubernetes_tests`` folder and if you followed the previous steps and 
entered the shell using
+``./breeze kind-cluster shell`` command, you can setup your IDE very easily to 
run (and debug) your
+tests using the standard IntelliJ Run/Debug feature. You just need a few steps:
+
+a) Add the virtualenv as interpreter for the project:
+
+.. image:: images/testing/kubernetes-virtualenv.png
+    :align: center
+    :alt: Kubernetes testing virtualenv
+
+The virtualenv is created in your "Airflow" source directory in 
``.build/.kubernetes_venv/`` folder and you
+have to find ``python`` binary and choose it when selecting interpreter.
+
+b) Choose pytest as test runner:
+
+.. image:: images/testing/pytest-runner.png
+    :align: center
+    :alt: Pytest runner
+
+c) Run/Debug tests using standard "Run/Debug" feature of IntelliJ
+
+.. image:: images/testing/run-tests.png
+    :align: center
+    :alt: Run/Debug tests
+
+
+NOTE! The first time you run it, it will likely fail with
+``kubernetes.config.config_exception.ConfigException``:
+``Invalid kube-config file. Expected key current-context in kube-config``. You 
need to add KUBECONFIG
+environment variabl copying it from the result of "./breeze kind-cluster test":
+
+.. code-block:: bash
+
+    echo ${KUBECONFIG}
+
+    /home/jarek/code/airflow/.build/.kube/config
+
+
+.. image:: images/testing/kubeconfig-env.png
+    :align: center
+    :alt: Run/Debug tests
+
+
+The configuration for kubernetes is stored in your "Airflow" source directory 
in ".build/.kube/config" file
+and this is where KUBECONFIG env should point to.
 
 You can iterate with tests while you are in the virtualenv. All the tests 
requiring kubernetes cluster
 are in "kubernetes_tests" folder. You can add extra ``pytest`` parameters then 
(for example ``-s`` will
 print output generated test logs and print statements to the terminal 
immediately.
 
-
 .. code-block:: bash
 
     pytest 
kubernetes_tests/test_kubernetes_executor.py::TestKubernetesExecutor::test_integration_run_dag_with_scheduler_failure
 -s
@@ -609,6 +714,30 @@ print output generated test logs and print statements to 
the terminal immediatel
 You can modify the tests or KubernetesPodOperator and re-run them without 
re-deploying
 airflow to KinD cluster.
 
+
+Sometimes there are side effects from running tests. You can run 
``redeploy_airflow.sh`` without
+recreating the whole cluster. This will delete the whole namespace, including 
the database data
+and start a new Airflow deployment in the cluster.
+
+.. code-block:: bash
+
+    ./scripts/ci/redeploy_airflow.sh
+
+If needed you can also delete the cluster manually:
+
+
+.. code-block:: bash
+
+    kind get clusters
+    kind delete clusters <NAME_OF_THE_CLUSTER>
+
+Kind has also useful commands to inspect your running cluster:
+
+.. code-block:: text
+
+    kind --help
+
+
 However, when you change Airflow Kubernetes executor implementation you need 
to redeploy
 Airflow to the cluster.
 
@@ -617,7 +746,7 @@ Airflow to the cluster.
     ./breeze kind-cluster deploy
 
 
-5. Stop KinD cluster when you are done
+7. Stop KinD cluster when you are done
 
 .. code-block:: bash
 
diff --git a/breeze b/breeze
index 3498c64..55412d0 100755
--- a/breeze
+++ b/breeze
@@ -1684,6 +1684,13 @@ ${CMDNAME} kind-cluster [FLAGS] OPERATION
 
 ${FORMATTED_KIND_OPERATIONS}
 
+      The last two operations - shell and k9s allow you to perform interactive 
testing with
+      kubernetes tests. You can enter the shell from which you can run 
kubernetes tests and in
+      another terminal you can start the k9s CLI to debug kubernetes instance. 
It is an easy
+      way to debug the kubernetes deployments.
+
+      You can read more about k9s at https://k9scli.io/
+
 Flags:
 $(breeze::flag_airflow_variants)
 $(breeze::flag_build_docker_images)
@@ -2901,6 +2908,8 @@ function breeze::run_build_command() {
             echo "Run Kubernetes tests with the KinD cluster "
         elif [[ ${KIND_CLUSTER_OPERATION} == "shell" ]]; then
             echo "Enter an interactive shell for kubernetes testing"
+        elif [[ ${KIND_CLUSTER_OPERATION} == "k9s" ]]; then
+            echo "Run k9s cli to debug in style"
         elif [[ -z ${KIND_CLUSTER_OPERATION=} ]]; then
             echo
             echo "Please provide an operation to run"
diff --git a/breeze-complete b/breeze-complete
index 4855c86..94854ad 100644
--- a/breeze-complete
+++ b/breeze-complete
@@ -32,7 +32,7 @@ _breeze_allowed_helm_versions="v3.2.4"
 _breeze_allowed_kind_versions="v0.8.0"
 _breeze_allowed_mysql_versions="5.6 5.7"
 _breeze_allowed_postgres_versions="9.6 10 11 12 13"
-_breeze_allowed_kind_operations="start stop restart status deploy test shell"
+_breeze_allowed_kind_operations="start stop restart status deploy test shell 
k9s"
 _breeze_allowed_test_types="All Core Integration Heisentests Postgres MySQL 
Helm"
 
 # shellcheck disable=SC2034
@@ -60,6 +60,7 @@ _breeze_allowed_install_airflow_versions=$(cat <<-EOF
 1.10.4
 1.10.3
 1.10.2
+wheel
 EOF
 )
 
@@ -134,7 +135,7 @@ _breeze_long_options="
 help python: backend: integration:
 kubernetes-mode: kubernetes-version: helm-version: kind-version:
 skip-mounting-local-sources install-airflow-version: 
install-airflow-reference: db-reset
-verbose assume-yes assume-no assume-quit forward-credentials rbac-ui 
init-script:
+verbose assume-yes assume-no assume-quit forward-credentials init-script:
 force-build-images force-pull-images production-image extras: 
force-clean-images skip-rebuild-check
 build-cache-local build-cache-pulled build-cache-disabled disable-pip-cache
 dockerhub-user: dockerhub-repo: github-registry github-repository: 
github-image-id:
diff --git a/chart/requirements.lock b/chart/requirements.lock
index 3f3c34a..e460e9f 100644
--- a/chart/requirements.lock
+++ b/chart/requirements.lock
@@ -2,5 +2,5 @@ dependencies:
 - name: postgresql
   repository: https://charts.helm.sh/stable/
   version: 6.3.12
-digest: sha256:58d88cf56e78b2380091e9e16cc6ccf58b88b3abe4a1886dd47cd9faef5309af
-generated: "2020-11-04T15:59:36.967913-08:00"
+digest: sha256:1748aa702050d4e72ffba1b18960f49bfe5368757cf976116afeffbdedda1c98
+generated: "2020-11-07T17:40:45.418723358+01:00"
diff --git a/images/testing/k9s.png b/images/testing/k9s.png
new file mode 100644
index 0000000..a8eec97
Binary files /dev/null and b/images/testing/k9s.png differ
diff --git a/images/testing/kubeconfig-env.png 
b/images/testing/kubeconfig-env.png
new file mode 100644
index 0000000..b2ebfd5
Binary files /dev/null and b/images/testing/kubeconfig-env.png differ
diff --git a/images/testing/kubernetes-virtualenv.png 
b/images/testing/kubernetes-virtualenv.png
new file mode 100644
index 0000000..6e208d6
Binary files /dev/null and b/images/testing/kubernetes-virtualenv.png differ
diff --git a/images/testing/pytest-runner.png b/images/testing/pytest-runner.png
new file mode 100644
index 0000000..fdb48cc
Binary files /dev/null and b/images/testing/pytest-runner.png differ
diff --git a/images/testing/run-test.png b/images/testing/run-test.png
new file mode 100644
index 0000000..21a5c9d
Binary files /dev/null and b/images/testing/run-test.png differ
diff --git a/kubernetes_tests/test_kubernetes_executor.py 
b/kubernetes_tests/test_kubernetes_executor.py
index 694cf75..bb89cb7 100644
--- a/kubernetes_tests/test_kubernetes_executor.py
+++ b/kubernetes_tests/test_kubernetes_executor.py
@@ -64,10 +64,11 @@ class TestKubernetesExecutor(unittest.TestCase):
         return len(names)
 
     @staticmethod
-    def _delete_airflow_pod():
+    def _delete_airflow_pod(name=''):
+        suffix = '-' + name if name else ''
         air_pod = check_output(['kubectl', 'get', 'pods']).decode()
         air_pod = air_pod.split('\n')
-        names = [re.compile(r'\s+').split(x)[0] for x in air_pod if 'airflow' 
in x]
+        names = [re.compile(r'\s+').split(x)[0] for x in air_pod if 'airflow' 
+ suffix in x]
         if names:
             check_call(['kubectl', 'delete', 'pod', names[0]])
 
@@ -233,7 +234,7 @@ class TestKubernetesExecutor(unittest.TestCase):
 
         execution_date = self.start_job_in_kubernetes(dag_id, host)
 
-        self._delete_airflow_pod()
+        self._delete_airflow_pod("scheduler")
 
         time.sleep(10)  # give time for pod to restart
 
diff --git a/kubernetes_tests/test_kubernetes_pod_operator.py 
b/kubernetes_tests/test_kubernetes_pod_operator.py
index 7a8674a..c2a0c62 100644
--- a/kubernetes_tests/test_kubernetes_pod_operator.py
+++ b/kubernetes_tests/test_kubernetes_pod_operator.py
@@ -15,43 +15,38 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-
 import json
 import logging
 import os
+import random
 import shutil
 import sys
-import unittest
 import textwrap
+import unittest
+from unittest import mock
+from unittest.mock import ANY
 
-import kubernetes.client.models as k8s
 import pendulum
+from kubernetes.client import models as k8s
 from kubernetes.client.api_client import ApiClient
 from kubernetes.client.rest import ApiException
 
-from airflow.contrib.operators.kubernetes_pod_operator import 
KubernetesPodOperator
 from airflow.exceptions import AirflowException
 from airflow.kubernetes import kube_client
-from airflow.kubernetes.pod import Port
 from airflow.kubernetes.pod_generator import PodDefaults
 from airflow.kubernetes.pod_launcher import PodLauncher
-from airflow.kubernetes.pod_runtime_info_env import PodRuntimeInfoEnv
 from airflow.kubernetes.secret import Secret
-from airflow.kubernetes.volume import Volume
-from airflow.kubernetes.volume_mount import VolumeMount
 from airflow.models import DAG, TaskInstance
+from airflow.contrib.operators.kubernetes_pod_operator import 
KubernetesPodOperator
 from airflow.utils import timezone
 from airflow.version import version as airflow_version
-from tests.compat import mock, patch
 
 
-# noinspection DuplicatedCode
 def create_context(task):
     dag = DAG(dag_id="dag")
     tzinfo = pendulum.timezone("Europe/Amsterdam")
     execution_date = timezone.datetime(2016, 1, 1, 1, 0, 0, tzinfo=tzinfo)
-    task_instance = TaskInstance(task=task,
-                                 execution_date=execution_date)
+    task_instance = TaskInstance(task=task, execution_date=execution_date)
     return {
         "dag": dag,
         "ts": execution_date.isoformat(),
@@ -60,7 +55,11 @@ def create_context(task):
     }
 
 
-# noinspection DuplicatedCode,PyUnusedLocal
+def get_kubeconfig_path():
+    kubeconfig_path = os.environ.get('KUBECONFIG')
+    return kubeconfig_path if kubeconfig_path else 
os.path.expanduser('~/.kube/config')
+
+
 class TestKubernetesPodOperatorSystem(unittest.TestCase):
     def get_current_task_name(self):
         # reverse test name to make pod name unique (it has limited length)
@@ -74,29 +73,33 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
             'kind': 'Pod',
             'metadata': {
                 'namespace': 'default',
-                'name': mock.ANY,
+                'name': ANY,
                 'annotations': {},
                 'labels': {
-                    'foo': 'bar', 'kubernetes_pod_operator': 'True',
+                    'foo': 'bar',
+                    'kubernetes_pod_operator': 'True',
                     'airflow_version': airflow_version.replace('+', '-'),
                     'execution_date': '2016-01-01T0100000100-a2f50a31f',
                     'dag_id': 'dag',
-                    'task_id': 'task',
-                    'try_number': '1'},
+                    'task_id': ANY,
+                    'try_number': '1',
+                },
             },
             'spec': {
                 'affinity': {},
-                'containers': [{
-                    'image': 'ubuntu:16.04',
-                    'args': ["echo 10"],
-                    'command': ["bash", "-cx"],
-                    'env': [],
-                    'imagePullPolicy': 'IfNotPresent',
-                    'envFrom': [],
-                    'name': 'base',
-                    'ports': [],
-                    'volumeMounts': [],
-                }],
+                'containers': [
+                    {
+                        'image': 'ubuntu:16.04',
+                        'args': ["echo 10"],
+                        'command': ["bash", "-cx"],
+                        'env': [],
+                        'envFrom': [],
+                        'resources': {},
+                        'name': 'base',
+                        'ports': [],
+                        'volumeMounts': [],
+                    }
+                ],
                 'hostNetwork': False,
                 'imagePullSecrets': [],
                 'initContainers': [],
@@ -106,29 +109,19 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
                 'serviceAccountName': 'default',
                 'tolerations': [],
                 'volumes': [],
-            }
+            },
         }
 
     def tearDown(self):
         client = kube_client.get_kube_client(in_cluster=False)
         client.delete_collection_namespaced_pod(namespace="default")
+        import time
 
-    def create_context(self, task):
-        dag = DAG(dag_id="dag")
-        tzinfo = pendulum.timezone("Europe/Amsterdam")
-        execution_date = timezone.datetime(2016, 1, 1, 1, 0, 0, tzinfo=tzinfo)
-        task_instance = TaskInstance(task=task,
-                                     execution_date=execution_date)
-        return {
-            "dag": dag,
-            "ts": execution_date.isoformat(),
-            "task": task,
-            "ti": task_instance,
-        }
+        time.sleep(1)
 
     def test_do_xcom_push_defaults_false(self):
         new_config_path = '/tmp/kube_config'
-        old_config_path = os.path.expanduser('~/.kube/config')
+        old_config_path = get_kubeconfig_path()
         shutil.copy(old_config_path, new_config_path)
 
         k = KubernetesPodOperator(
@@ -137,8 +130,8 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
             cmds=["bash", "-cx"],
             arguments=["echo 10"],
             labels={"foo": "bar"},
-            name="test",
-            task_id="task",
+            name="test-" + str(random.randint(0, 1000000)),
+            task_id="task" + self.get_current_task_name(),
             in_cluster=False,
             do_xcom_push=False,
             config_file=new_config_path,
@@ -147,7 +140,7 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
 
     def test_config_path_move(self):
         new_config_path = '/tmp/kube_config'
-        old_config_path = os.path.expanduser('~/.kube/config')
+        old_config_path = get_kubeconfig_path()
         shutil.copy(old_config_path, new_config_path)
 
         k = KubernetesPodOperator(
@@ -157,103 +150,16 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
             arguments=["echo 10"],
             labels={"foo": "bar"},
             name="test1",
-            task_id="task",
+            task_id="task" + self.get_current_task_name(),
             in_cluster=False,
             do_xcom_push=False,
             config_file=new_config_path,
         )
-        context = self.create_context(k)
+        context = create_context(k)
         k.execute(context)
         actual_pod = self.api_client.sanitize_for_serialization(k.pod)
         self.assertEqual(self.expected_pod, actual_pod)
 
-    @mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.start_pod")
-    @mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.monitor_pod")
-    @mock.patch("airflow.kubernetes.kube_client.get_kube_client")
-    def test_config_path(self, client_mock, monitor_mock, start_mock):  # 
pylint: disable=unused-argument
-        from airflow.utils.state import State
-
-        file_path = "/tmp/fake_file"
-        k = KubernetesPodOperator(
-            namespace='default',
-            image="ubuntu:16.04",
-            cmds=["bash", "-cx"],
-            arguments=["echo 10"],
-            labels={"foo": "bar"},
-            name="test",
-            task_id="task",
-            in_cluster=False,
-            do_xcom_push=False,
-            config_file=file_path,
-            cluster_context='default',
-        )
-        monitor_mock.return_value = (State.SUCCESS, None)
-        client_mock.list_namespaced_pod.return_value = []
-        context = self.create_context(k)
-        k.execute(context=context)
-        client_mock.assert_called_once_with(
-            in_cluster=False,
-            cluster_context='default',
-            config_file=file_path,
-        )
-
-    @mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.start_pod")
-    @mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.monitor_pod")
-    @mock.patch("airflow.kubernetes.kube_client.get_kube_client")
-    def test_image_pull_secrets_correctly_set(self, mock_client, monitor_mock, 
start_mock):
-        from airflow.utils.state import State
-
-        fake_pull_secrets = "fakeSecret"
-        k = KubernetesPodOperator(
-            namespace='default',
-            image="ubuntu:16.04",
-            cmds=["bash", "-cx"],
-            arguments=["echo 10"],
-            labels={"foo": "bar"},
-            name="test",
-            task_id="task",
-            in_cluster=False,
-            do_xcom_push=False,
-            image_pull_secrets=fake_pull_secrets,
-            cluster_context='default',
-        )
-        monitor_mock.return_value = (State.SUCCESS, None)
-        context = self.create_context(k)
-        k.execute(context=context)
-        self.assertEqual(
-            start_mock.call_args[0][0].spec.image_pull_secrets,
-            [k8s.V1LocalObjectReference(name=fake_pull_secrets)]
-        )
-
-    @mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.start_pod")
-    @mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.monitor_pod")
-    @mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.delete_pod")
-    @mock.patch("airflow.kubernetes.kube_client.get_kube_client")
-    def test_pod_delete_even_on_launcher_error(
-            self,
-            mock_client,
-            delete_pod_mock,
-            monitor_pod_mock,
-            start_pod_mock):  # pylint: disable=unused-argument
-        k = KubernetesPodOperator(
-            namespace='default',
-            image="ubuntu:16.04",
-            cmds=["bash", "-cx"],
-            arguments=["echo 10"],
-            labels={"foo": "bar"},
-            name="test",
-            task_id="task",
-            in_cluster=False,
-            do_xcom_push=False,
-            cluster_context='default',
-            is_delete_operator_pod=True,
-        )
-        monitor_pod_mock.side_effect = AirflowException('fake failure')
-        with self.assertRaises(AirflowException):
-            context = self.create_context(k)
-            k.execute(context=context)
-        assert delete_pod_mock.called
-
     def test_working_pod(self):
         k = KubernetesPodOperator(
             namespace='default',
@@ -261,8 +167,8 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
             cmds=["bash", "-cx"],
             arguments=["echo 10"],
             labels={"foo": "bar"},
-            name="test",
-            task_id="task",
+            name="test-" + str(random.randint(0, 1000000)),
+            task_id="task" + self.get_current_task_name(),
             in_cluster=False,
             do_xcom_push=False,
         )
@@ -279,49 +185,15 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
             cmds=["bash", "-cx"],
             arguments=["echo 10"],
             labels={"foo": "bar"},
-            name="test",
-            task_id="task",
+            name="test-" + str(random.randint(0, 1000000)),
+            task_id="task" + self.get_current_task_name(),
             in_cluster=False,
             do_xcom_push=False,
             is_delete_operator_pod=True,
         )
-        context = self.create_context(k)
-        k.execute(context)
-        actual_pod = self.api_client.sanitize_for_serialization(k.pod)
-        self.assertEqual(self.expected_pod['spec'], actual_pod['spec'])
-        self.assertEqual(self.expected_pod['metadata']['labels'], 
actual_pod['metadata']['labels'])
-
-    def test_pod_with_volume_secret(self):
-        k = KubernetesPodOperator(
-            namespace='default',
-            image="ubuntu:16.04",
-            cmds=["bash", "-cx"],
-            in_cluster=False,
-            labels={"foo": "bar"},
-            arguments=["echo 10"],
-            secrets=[Secret(
-                deploy_type="volume",
-                deploy_target="/var/location",
-                secret="my-secret",
-                key="content.json",
-            )],
-            name="airflow-test-pod",
-            task_id="task",
-            get_logs=True,
-            is_delete_operator_pod=True,
-        )
-
-        context = self.create_context(k)
+        context = create_context(k)
         k.execute(context)
         actual_pod = self.api_client.sanitize_for_serialization(k.pod)
-        self.expected_pod['spec']['containers'][0]['volumeMounts'] = [
-            {'mountPath': '/var/location',
-             'name': mock.ANY,
-             'readOnly': True}]
-        self.expected_pod['spec']['volumes'] = [
-            {'name': mock.ANY,
-             'secret': {'secretName': 'my-secret'}}
-        ]
         self.assertEqual(self.expected_pod['spec'], actual_pod['spec'])
         self.assertEqual(self.expected_pod['metadata']['labels'], 
actual_pod['metadata']['labels'])
 
@@ -332,13 +204,13 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
             cmds=["bash", "-cx"],
             arguments=["echo 10"],
             labels={"foo": "bar"},
-            name="test",
-            task_id="task",
+            name="test-" + str(random.randint(0, 1000000)),
+            task_id="task" + self.get_current_task_name(),
             in_cluster=False,
             do_xcom_push=False,
             hostnetwork=True,
         )
-        context = self.create_context(k)
+        context = create_context(k)
         k.execute(context)
         actual_pod = self.api_client.sanitize_for_serialization(k.pod)
         self.expected_pod['spec']['hostNetwork'] = True
@@ -353,14 +225,14 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
             cmds=["bash", "-cx"],
             arguments=["echo 10"],
             labels={"foo": "bar"},
-            name="test",
-            task_id="task",
+            name="test-" + str(random.randint(0, 1000000)),
+            task_id="task" + self.get_current_task_name(),
             in_cluster=False,
             do_xcom_push=False,
             hostnetwork=True,
-            dnspolicy=dns_policy
+            dnspolicy=dns_policy,
         )
-        context = self.create_context(k)
+        context = create_context(k)
         k.execute(context)
         actual_pod = self.api_client.sanitize_for_serialization(k.pod)
         self.expected_pod['spec']['hostNetwork'] = True
@@ -376,32 +248,28 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
             cmds=["bash", "-cx"],
             arguments=["echo 10"],
             labels={"foo": "bar"},
-            name="test",
-            task_id="task",
+            name="test-" + str(random.randint(0, 1000000)),
+            task_id="task" + self.get_current_task_name(),
             in_cluster=False,
             do_xcom_push=False,
-            schedulername=scheduler_name
+            schedulername=scheduler_name,
         )
-        context = self.create_context(k)
+        context = create_context(k)
         k.execute(context)
         actual_pod = self.api_client.sanitize_for_serialization(k.pod)
         self.expected_pod['spec']['schedulerName'] = scheduler_name
         self.assertEqual(self.expected_pod, actual_pod)
-        self.assertEqual(self.expected_pod['spec'], actual_pod['spec'])
-        self.assertEqual(self.expected_pod['metadata']['labels'], 
actual_pod['metadata']['labels'])
 
     def test_pod_node_selectors(self):
-        node_selectors = {
-            'beta.kubernetes.io/os': 'linux'
-        }
+        node_selectors = {'beta.kubernetes.io/os': 'linux'}
         k = KubernetesPodOperator(
             namespace='default',
             image="ubuntu:16.04",
             cmds=["bash", "-cx"],
             arguments=["echo 10"],
             labels={"foo": "bar"},
-            name="test",
-            task_id="task",
+            name="test-" + str(random.randint(0, 1000000)),
+            task_id="task" + self.get_current_task_name(),
             in_cluster=False,
             do_xcom_push=False,
             node_selectors=node_selectors,
@@ -413,40 +281,28 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
         self.assertEqual(self.expected_pod, actual_pod)
 
     def test_pod_resources(self):
-        resources = {
-            'limit_cpu': 0.25,
-            'limit_memory': '64Mi',
-            'limit_ephemeral_storage': '2Gi',
-            'request_cpu': '250m',
-            'request_memory': '64Mi',
-            'request_ephemeral_storage': '1Gi',
-        }
+        resources = k8s.V1ResourceRequirements(
+            requests={'memory': '64Mi', 'cpu': '250m', 'ephemeral-storage': 
'1Gi'},
+            limits={'memory': '64Mi', 'cpu': 0.25, 'nvidia.com/gpu': None, 
'ephemeral-storage': '2Gi'},
+        )
         k = KubernetesPodOperator(
             namespace='default',
             image="ubuntu:16.04",
             cmds=["bash", "-cx"],
             arguments=["echo 10"],
             labels={"foo": "bar"},
-            name="test",
-            task_id="task",
+            name="test-" + str(random.randint(0, 1000000)),
+            task_id="task" + self.get_current_task_name(),
             in_cluster=False,
             do_xcom_push=False,
             resources=resources,
         )
-        context = self.create_context(k)
+        context = create_context(k)
         k.execute(context)
         actual_pod = self.api_client.sanitize_for_serialization(k.pod)
         self.expected_pod['spec']['containers'][0]['resources'] = {
-            'requests': {
-                'memory': '64Mi',
-                'cpu': '250m',
-                'ephemeral-storage': '1Gi'
-            },
-            'limits': {
-                'memory': '64Mi',
-                'cpu': 0.25,
-                'ephemeral-storage': '2Gi'
-            }
+            'requests': {'memory': '64Mi', 'cpu': '250m', 'ephemeral-storage': 
'1Gi'},
+            'limits': {'memory': '64Mi', 'cpu': 0.25, 'nvidia.com/gpu': None, 
'ephemeral-storage': '2Gi'},
         }
         self.assertEqual(self.expected_pod, actual_pod)
 
@@ -457,11 +313,7 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
                     'nodeSelectorTerms': [
                         {
                             'matchExpressions': [
-                                {
-                                    'key': 'beta.kubernetes.io/os',
-                                    'operator': 'In',
-                                    'values': ['linux']
-                                }
+                                {'key': 'beta.kubernetes.io/os', 'operator': 
'In', 'values': ['linux']}
                             ]
                         }
                     ]
@@ -474,8 +326,8 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
             cmds=["bash", "-cx"],
             arguments=["echo 10"],
             labels={"foo": "bar"},
-            name="test",
-            task_id="task",
+            name="test-" + str(random.randint(0, 1000000)),
+            task_id="task" + self.get_current_task_name(),
             in_cluster=False,
             do_xcom_push=False,
             affinity=affinity,
@@ -487,7 +339,10 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
         self.assertEqual(self.expected_pod, actual_pod)
 
     def test_port(self):
-        port = Port('http', 80)
+        port = k8s.V1ContainerPort(
+            name='http',
+            container_port=80,
+        )
 
         k = KubernetesPodOperator(
             namespace='default',
@@ -495,37 +350,33 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
             cmds=["bash", "-cx"],
             arguments=["echo 10"],
             labels={"foo": "bar"},
-            name="test",
-            task_id="task",
+            name="test-" + str(random.randint(0, 1000000)),
+            task_id="task" + self.get_current_task_name(),
             in_cluster=False,
             do_xcom_push=False,
             ports=[port],
         )
-        context = self.create_context(k)
+        context = create_context(k)
         k.execute(context=context)
         actual_pod = self.api_client.sanitize_for_serialization(k.pod)
-        self.expected_pod['spec']['containers'][0]['ports'] = [{
-            'name': 'http',
-            'containerPort': 80
-        }]
+        self.expected_pod['spec']['containers'][0]['ports'] = [{'name': 
'http', 'containerPort': 80}]
         self.assertEqual(self.expected_pod, actual_pod)
 
     def test_volume_mount(self):
-        with patch.object(PodLauncher, 'log') as mock_logger:
-            volume_mount = VolumeMount('test-volume',
-                                       mount_path='/tmp/test_volume',
-                                       sub_path=None,
-                                       read_only=False)
-
-            volume_config = {
-                'persistentVolumeClaim':
-                    {
-                        'claimName': 'test-volume'
-                    }
-            }
-            volume = Volume(name='test-volume', configs=volume_config)
-            args = ["echo \"retrieved from mount\" > /tmp/test_volume/test.txt 
"
-                    "&& cat /tmp/test_volume/test.txt"]
+        with mock.patch.object(PodLauncher, 'log') as mock_logger:
+            volume_mount = k8s.V1VolumeMount(
+                name='test-volume', mount_path='/tmp/test_volume', 
sub_path=None, read_only=False
+            )
+
+            volume = k8s.V1Volume(
+                name='test-volume',
+                
persistent_volume_claim=k8s.V1PersistentVolumeClaimVolumeSource(claim_name='test-volume'),
+            )
+
+            args = [
+                "echo \"retrieved from mount\" > /tmp/test_volume/test.txt "
+                "&& cat /tmp/test_volume/test.txt"
+            ]
             k = KubernetesPodOperator(
                 namespace='default',
                 image="ubuntu:16.04",
@@ -534,27 +385,22 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
                 labels={"foo": "bar"},
                 volume_mounts=[volume_mount],
                 volumes=[volume],
-                name="test",
-                task_id="task",
+                name="test-" + str(random.randint(0, 1000000)),
+                task_id="task" + self.get_current_task_name(),
                 in_cluster=False,
                 do_xcom_push=False,
             )
             context = create_context(k)
             k.execute(context=context)
-            mock_logger.info.assert_any_call(b"retrieved from mount\n")
+            mock_logger.info.assert_any_call('retrieved from mount')
             actual_pod = self.api_client.sanitize_for_serialization(k.pod)
             self.expected_pod['spec']['containers'][0]['args'] = args
-            self.expected_pod['spec']['containers'][0]['volumeMounts'] = [{
-                'name': 'test-volume',
-                'mountPath': '/tmp/test_volume',
-                'readOnly': False
-            }]
-            self.expected_pod['spec']['volumes'] = [{
-                'name': 'test-volume',
-                'persistentVolumeClaim': {
-                    'claimName': 'test-volume'
-                }
-            }]
+            self.expected_pod['spec']['containers'][0]['volumeMounts'] = [
+                {'name': 'test-volume', 'mountPath': '/tmp/test_volume', 
'readOnly': False}
+            ]
+            self.expected_pod['spec']['volumes'] = [
+                {'name': 'test-volume', 'persistentVolumeClaim': {'claimName': 
'test-volume'}}
+            ]
             self.assertEqual(self.expected_pod, actual_pod)
 
     def test_run_as_user_root(self):
@@ -569,8 +415,8 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
             cmds=["bash", "-cx"],
             arguments=["echo 10"],
             labels={"foo": "bar"},
-            name="test",
-            task_id="task",
+            name="test-" + str(random.randint(0, 1000000)),
+            task_id="task" + self.get_current_task_name(),
             in_cluster=False,
             do_xcom_push=False,
             security_context=security_context,
@@ -594,8 +440,8 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
             cmds=["bash", "-cx"],
             arguments=["echo 10"],
             labels={"foo": "bar"},
-            name="test",
-            task_id="task",
+            name="test-" + str(random.randint(0, 1000000)),
+            task_id="task" + self.get_current_task_name(),
             in_cluster=False,
             do_xcom_push=False,
             security_context=security_context,
@@ -619,8 +465,8 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
             cmds=["bash", "-cx"],
             arguments=["echo 10"],
             labels={"foo": "bar"},
-            name="test",
-            task_id="task",
+            name="test-fs-group",
+            task_id="task" + self.get_current_task_name(),
             in_cluster=False,
             do_xcom_push=False,
             security_context=security_context,
@@ -639,8 +485,8 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
             cmds=["bash", "-cx"],
             arguments=["echo 10"],
             labels={"foo": "bar"},
-            name="test",
-            task_id="task",
+            name="test-" + str(random.randint(0, 1000000)),
+            task_id="task" + self.get_current_task_name(),
             in_cluster=False,
             do_xcom_push=False,
             startup_timeout_seconds=5,
@@ -660,8 +506,8 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
             cmds=["bash", "-cx"],
             arguments=["echo 10"],
             labels={"foo": "bar"},
-            name="test",
-            task_id="task",
+            name="test-" + str(random.randint(0, 1000000)),
+            task_id="task" + self.get_current_task_name(),
             in_cluster=False,
             do_xcom_push=False,
             startup_timeout_seconds=5,
@@ -685,8 +531,8 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
             cmds=["bash", "-cx"],
             arguments=bad_internal_command,
             labels={"foo": "bar"},
-            name="test",
-            task_id="task",
+            name="test-" + str(random.randint(0, 1000000)),
+            task_id="task" + self.get_current_task_name(),
             in_cluster=False,
             do_xcom_push=False,
         )
@@ -699,15 +545,15 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
 
     def test_xcom_push(self):
         return_value = '{"foo": "bar"\n, "buzz": 2}'
-        args = ['echo \'{}\' > /airflow/xcom/return.json'.format(return_value)]
+        args = ['echo \'' + str(return_value) + '\' > 
/airflow/xcom/return.json']
         k = KubernetesPodOperator(
             namespace='default',
             image="ubuntu:16.04",
             cmds=["bash", "-cx"],
             arguments=args,
             labels={"foo": "bar"},
-            name="test",
-            task_id="task",
+            name="test-" + str(random.randint(0, 1000000)),
+            task_id="task" + self.get_current_task_name(),
             in_cluster=False,
             do_xcom_push=True,
         )
@@ -730,7 +576,8 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
         # GIVEN
         from airflow.utils.state import State
 
-        configmap = 'test-configmap'
+        configmap_name = "test-config-map"
+        env_from = 
[k8s.V1EnvFromSource(config_map_ref=k8s.V1ConfigMapEnvSource(name=configmap_name))]
         # WHEN
         k = KubernetesPodOperator(
             namespace='default',
@@ -738,22 +585,17 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
             cmds=["bash", "-cx"],
             arguments=["echo 10"],
             labels={"foo": "bar"},
-            name="test",
-            task_id="task",
+            name="test-" + str(random.randint(0, 1000000)),
+            task_id="task" + self.get_current_task_name(),
             in_cluster=False,
             do_xcom_push=False,
-            configmaps=[configmap],
+            env_from=env_from,
         )
         # THEN
         mock_monitor.return_value = (State.SUCCESS, None)
-        context = self.create_context(k)
+        context = create_context(k)
         k.execute(context)
-        self.assertEqual(
-            mock_start.call_args[0][0].spec.containers[0].env_from,
-            [k8s.V1EnvFromSource(config_map_ref=k8s.V1ConfigMapEnvSource(
-                name=configmap
-            ))]
-        )
+        
self.assertEqual(mock_start.call_args[0][0].spec.containers[0].env_from, 
env_from)
 
     @mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.start_pod")
     @mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.monitor_pod")
@@ -761,6 +603,7 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
     def test_envs_from_secrets(self, mock_client, monitor_mock, start_mock):
         # GIVEN
         from airflow.utils.state import State
+
         secret_ref = 'secret_name'
         secrets = [Secret('env', None, secret_ref)]
         # WHEN
@@ -771,34 +614,40 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
             arguments=["echo 10"],
             secrets=secrets,
             labels={"foo": "bar"},
-            name="test",
-            task_id="task",
+            name="test-" + str(random.randint(0, 1000000)),
+            task_id="task" + self.get_current_task_name(),
             in_cluster=False,
             do_xcom_push=False,
         )
         # THEN
         monitor_mock.return_value = (State.SUCCESS, None)
-        context = self.create_context(k)
+        context = create_context(k)
         k.execute(context)
         self.assertEqual(
             start_mock.call_args[0][0].spec.containers[0].env_from,
-            [k8s.V1EnvFromSource(secret_ref=k8s.V1SecretEnvSource(
-                name=secret_ref
-            ))]
+            
[k8s.V1EnvFromSource(secret_ref=k8s.V1SecretEnvSource(name=secret_ref))],
         )
 
     def test_env_vars(self):
         # WHEN
+        env_vars = [
+            k8s.V1EnvVar(name="ENV1", value="val1"),
+            k8s.V1EnvVar(name="ENV2", value="val2"),
+            k8s.V1EnvVar(
+                name="ENV3",
+                
value_from=k8s.V1EnvVarSource(field_ref=k8s.V1ObjectFieldSelector(field_path="status.podIP")),
+            ),
+        ]
+
         k = KubernetesPodOperator(
             namespace='default',
             image="ubuntu:16.04",
             cmds=["bash", "-cx"],
             arguments=["echo 10"],
-            env_vars={"ENV1": "val1", "ENV2": "val2", },
-            pod_runtime_info_envs=[PodRuntimeInfoEnv("ENV3", "status.podIP")],
+            env_vars=env_vars,
             labels={"foo": "bar"},
-            name="test",
-            task_id="task",
+            name="test-" + str(random.randint(0, 1000000)),
+            task_id="task" + self.get_current_task_name(),
             in_cluster=False,
             do_xcom_push=False,
         )
@@ -811,14 +660,7 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
         self.expected_pod['spec']['containers'][0]['env'] = [
             {'name': 'ENV1', 'value': 'val1'},
             {'name': 'ENV2', 'value': 'val2'},
-            {
-                'name': 'ENV3',
-                'valueFrom': {
-                    'fieldRef': {
-                        'fieldPath': 'status.podIP'
-                    }
-                }
-            }
+            {'name': 'ENV3', 'valueFrom': {'fieldRef': {'fieldPath': 
'status.podIP'}}},
         ]
         self.assertEqual(self.expected_pod, actual_pod)
 
@@ -828,7 +670,7 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
             task_id="task" + self.get_current_task_name(),
             in_cluster=False,
             pod_template_file=fixture,
-            do_xcom_push=True
+            do_xcom_push=True,
         )
 
         context = create_context(k)
@@ -841,10 +683,10 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
         k = KubernetesPodOperator(
             task_id="task" + self.get_current_task_name(),
             labels={"foo": "bar", "fizz": "buzz"},
-            env_vars={"env_name": "value"},
+            env_vars=[k8s.V1EnvVar(name="env_name", value="value")],
             in_cluster=False,
             pod_template_file=fixture,
-            do_xcom_push=True
+            do_xcom_push=True,
         )
 
         context = create_context(k)
@@ -856,20 +698,14 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
 
     def test_init_container(self):
         # GIVEN
-        volume_mounts = [k8s.V1VolumeMount(
-            mount_path='/etc/foo',
-            name='test-volume',
-            sub_path=None,
-            read_only=True
-        )]
-
-        init_environments = [k8s.V1EnvVar(
-            name='key1',
-            value='value1'
-        ), k8s.V1EnvVar(
-            name='key2',
-            value='value2'
-        )]
+        volume_mounts = [
+            k8s.V1VolumeMount(mount_path='/etc/foo', name='test-volume', 
sub_path=None, read_only=True)
+        ]
+
+        init_environments = [
+            k8s.V1EnvVar(name='key1', value='value1'),
+            k8s.V1EnvVar(name='key2', value='value2'),
+        ]
 
         init_container = k8s.V1Container(
             name="init-container",
@@ -877,34 +713,20 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
             env=init_environments,
             volume_mounts=volume_mounts,
             command=["bash", "-cx"],
-            args=["echo 10"]
+            args=["echo 10"],
         )
 
-        volume_config = {
-            'persistentVolumeClaim':
-                {
-                    'claimName': 'test-volume'
-                }
-        }
-        volume = Volume(name='test-volume', configs=volume_config)
-
+        volume = k8s.V1Volume(
+            name='test-volume',
+            
persistent_volume_claim=k8s.V1PersistentVolumeClaimVolumeSource(claim_name='test-volume'),
+        )
         expected_init_container = {
             'name': 'init-container',
             'image': 'ubuntu:16.04',
             'command': ['bash', '-cx'],
             'args': ['echo 10'],
-            'env': [{
-                'name': 'key1',
-                'value': 'value1'
-            }, {
-                'name': 'key2',
-                'value': 'value2'
-            }],
-            'volumeMounts': [{
-                'mountPath': '/etc/foo',
-                'name': 'test-volume',
-                'readOnly': True
-            }],
+            'env': [{'name': 'key1', 'value': 'value1'}, {'name': 'key2', 
'value': 'value2'}],
+            'volumeMounts': [{'mountPath': '/etc/foo', 'name': 'test-volume', 
'readOnly': True}],
         }
 
         k = KubernetesPodOperator(
@@ -913,8 +735,8 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
             cmds=["bash", "-cx"],
             arguments=["echo 10"],
             labels={"foo": "bar"},
-            name="test",
-            task_id="task",
+            name="test-" + str(random.randint(0, 1000000)),
+            task_id="task" + self.get_current_task_name(),
             volumes=[volume],
             init_containers=[init_container],
             in_cluster=False,
@@ -924,30 +746,30 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
         k.execute(context)
         actual_pod = self.api_client.sanitize_for_serialization(k.pod)
         self.expected_pod['spec']['initContainers'] = [expected_init_container]
-        self.expected_pod['spec']['volumes'] = [{
-            'name': 'test-volume',
-            'persistentVolumeClaim': {
-                'claimName': 'test-volume'
-            }
-        }]
+        self.expected_pod['spec']['volumes'] = [
+            {'name': 'test-volume', 'persistentVolumeClaim': {'claimName': 
'test-volume'}}
+        ]
         self.assertEqual(self.expected_pod, actual_pod)
 
     @mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.start_pod")
     @mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.monitor_pod")
     @mock.patch("airflow.kubernetes.kube_client.get_kube_client")
-    def test_pod_template_file(self, mock_client, monitor_mock, start_mock):
+    def test_pod_template_file(
+        self, mock_client, monitor_mock, start_mock  # pylint: 
disable=unused-argument
+    ):
         from airflow.utils.state import State
-        fixture = sys.path[0] + '/tests/kubernetes/pod.yaml'
+
+        path = sys.path[0] + '/tests/kubernetes/pod.yaml'
         k = KubernetesPodOperator(
-            task_id='task',
-            pod_template_file=fixture,
-            do_xcom_push=True
+            task_id="task" + self.get_current_task_name(), 
pod_template_file=path, do_xcom_push=True
         )
+
         monitor_mock.return_value = (State.SUCCESS, None)
         context = create_context(k)
         with self.assertLogs(k.log, level=logging.DEBUG) as cm:
             k.execute(context)
-            expected_line = textwrap.dedent("""\
+            expected_line = textwrap.dedent(
+                """\
             DEBUG:airflow.task.operators:Starting pod:
             api_version: v1
             kind: Pod
@@ -956,65 +778,57 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
               cluster_name: null
               creation_timestamp: null
               deletion_grace_period_seconds: null\
-            """).strip()
+            """
+            ).strip()
             self.assertTrue(any(line.startswith(expected_line) for line in 
cm.output))
 
         actual_pod = self.api_client.sanitize_for_serialization(k.pod)
-        expected_dict = {'apiVersion': 'v1',
-                         'kind': 'Pod',
-                         'metadata': {'annotations': {},
-                                      'labels': {},
-                                      'name': 'memory-demo',
-                                      'namespace': 'mem-example'},
-                         'spec': {'affinity': {},
-                                  'containers': [{'args': ['--vm',
-                                                           '1',
-                                                           '--vm-bytes',
-                                                           '150M',
-                                                           '--vm-hang',
-                                                           '1'],
-                                                  'command': ['stress'],
-                                                  'env': [],
-                                                  'envFrom': [],
-                                                  'image': 
'apache/airflow:stress-2020.07.10-1.0.4',
-                                                  'imagePullPolicy': 
'IfNotPresent',
-                                                  'name': 'base',
-                                                  'ports': [],
-                                                  'resources': {'limits': 
{'memory': '200Mi'},
-                                                                'requests': 
{'memory': '100Mi'}},
-                                                  'volumeMounts': 
[{'mountPath': '/airflow/xcom',
-                                                                    'name': 
'xcom'}]},
-                                                 {'command': ['sh',
-                                                              '-c',
-                                                              'trap "exit 0" 
INT; while true; do sleep '
-                                                              '30; done;'],
-                                                  'image': 'alpine',
-                                                  'name': 
'airflow-xcom-sidecar',
-                                                  'resources': {'requests': 
{'cpu': '1m'}},
-                                                  'volumeMounts': 
[{'mountPath': '/airflow/xcom',
-                                                                    'name': 
'xcom'}]}],
-                                  'hostNetwork': False,
-                                  'imagePullSecrets': [],
-                                  'initContainers': [],
-                                  'nodeSelector': {},
-                                  'restartPolicy': 'Never',
-                                  'securityContext': {},
-                                  'serviceAccountName': 'default',
-                                  'tolerations': [],
-                                  'volumes': [{'emptyDir': {}, 'name': 
'xcom'}]}}
+        expected_dict = {
+            'apiVersion': 'v1',
+            'kind': 'Pod',
+            'metadata': {'annotations': {}, 'labels': {}, 'name': 
'memory-demo', 'namespace': 'mem-example'},
+            'spec': {
+                'affinity': {},
+                'containers': [
+                    {
+                        'args': ['--vm', '1', '--vm-bytes', '150M', 
'--vm-hang', '1'],
+                        'command': ['stress'],
+                        'env': [],
+                        'envFrom': [],
+                        'image': 'apache/airflow:stress-2020.07.10-1.0.4',
+                        'name': 'base',
+                        'ports': [],
+                        'resources': {'limits': {'memory': '200Mi'}, 
'requests': {'memory': '100Mi'}},
+                        'volumeMounts': [{'mountPath': '/airflow/xcom', 
'name': 'xcom'}],
+                    },
+                    {
+                        'command': ['sh', '-c', 'trap "exit 0" INT; while 
true; do sleep 30; done;'],
+                        'image': 'alpine',
+                        'name': 'airflow-xcom-sidecar',
+                        'resources': {'requests': {'cpu': '1m'}},
+                        'volumeMounts': [{'mountPath': '/airflow/xcom', 
'name': 'xcom'}],
+                    },
+                ],
+                'hostNetwork': False,
+                'imagePullSecrets': [],
+                'initContainers': [],
+                'nodeSelector': {},
+                'restartPolicy': 'Never',
+                'securityContext': {},
+                'serviceAccountName': 'default',
+                'tolerations': [],
+                'volumes': [{'emptyDir': {}, 'name': 'xcom'}],
+            },
+        }
         self.assertEqual(expected_dict, actual_pod)
 
     @mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.start_pod")
     @mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.monitor_pod")
     @mock.patch("airflow.kubernetes.kube_client.get_kube_client")
     def test_pod_priority_class_name(
-            self,
-            mock_client,
-            monitor_mock,
-            start_mock):  # pylint: disable=unused-argument
-        """Test ability to assign priorityClassName to pod
-
-        """
+        self, mock_client, monitor_mock, start_mock  # pylint: 
disable=unused-argument
+    ):
+        """Test ability to assign priorityClassName to pod"""
         from airflow.utils.state import State
 
         priority_class_name = "medium-test"
@@ -1024,15 +838,15 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
             cmds=["bash", "-cx"],
             arguments=["echo 10"],
             labels={"foo": "bar"},
-            name="test",
-            task_id="task",
+            name="test-" + str(random.randint(0, 1000000)),
+            task_id="task" + self.get_current_task_name(),
             in_cluster=False,
             do_xcom_push=False,
             priority_class_name=priority_class_name,
         )
 
         monitor_mock.return_value = (State.SUCCESS, None)
-        context = self.create_context(k)
+        context = create_context(k)
         k.execute(context)
         actual_pod = self.api_client.sanitize_for_serialization(k.pod)
         self.expected_pod['spec']['priorityClassName'] = priority_class_name
@@ -1048,15 +862,15 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
                 arguments=["echo 10"],
                 labels={"foo": "bar"},
                 name=pod_name_too_long,
-                task_id="task",
+                task_id="task" + self.get_current_task_name(),
                 in_cluster=False,
                 do_xcom_push=False,
             )
 
     @mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.monitor_pod")
-    def test_on_kill(self,
-                     monitor_mock):  # pylint: disable=unused-argument
+    def test_on_kill(self, monitor_mock):  # pylint: disable=unused-argument
         from airflow.utils.state import State
+
         client = kube_client.get_kube_client(in_cluster=False)
         name = "test"
         namespace = "default"
@@ -1082,4 +896,46 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
         with self.assertRaises(ApiException):
             pod = client.read_namespaced_pod(name=name, namespace=namespace)
 
+    def test_reattach_failing_pod_once(self):
+        from airflow.utils.state import State
+
+        client = kube_client.get_kube_client(in_cluster=False)
+        name = "test"
+        namespace = "default"
+        k = KubernetesPodOperator(
+            namespace='default',
+            image="ubuntu:16.04",
+            cmds=["bash", "-cx"],
+            arguments=["exit 1"],
+            labels={"foo": "bar"},
+            name="test",
+            task_id=name,
+            in_cluster=False,
+            do_xcom_push=False,
+            is_delete_operator_pod=False,
+            termination_grace_period=0,
+        )
+
+        context = create_context(k)
+
+        with 
mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.monitor_pod") as 
monitor_mock:
+            monitor_mock.return_value = (State.SUCCESS, None)
+            k.execute(context)
+            name = k.pod.metadata.name
+            pod = client.read_namespaced_pod(name=name, namespace=namespace)
+            while pod.status.phase != "Failed":
+                pod = client.read_namespaced_pod(name=name, 
namespace=namespace)
+        with self.assertRaises(AirflowException):
+            k.execute(context)
+        pod = client.read_namespaced_pod(name=name, namespace=namespace)
+        self.assertEqual(pod.metadata.labels["already_checked"], "True")
+        with mock.patch(
+            
"airflow.contrib.operators.kubernetes_pod_operator.KubernetesPodOperator"
+            ".create_new_pod_for_operator"
+        ) as create_mock:
+            create_mock.return_value = ("success", {}, {})
+            k.execute(context)
+            create_mock.assert_called_once()
+
+
 # pylint: enable=unused-argument
diff --git a/scripts/ci/kubernetes/ci_run_kubernetes_tests.sh 
b/scripts/ci/kubernetes/ci_run_kubernetes_tests.sh
index 4f13335..bcb5cf4 100755
--- a/scripts/ci/kubernetes/ci_run_kubernetes_tests.sh
+++ b/scripts/ci/kubernetes/ci_run_kubernetes_tests.sh
@@ -21,7 +21,6 @@
 kind::make_sure_kubernetes_tools_are_installed
 kind::get_kind_cluster_name
 
-traps::add_trap kind::stop_kubectl EXIT HUP INT TERM
 traps::add_trap kind::dump_kind_logs EXIT HUP INT TERM
 
 interactive="false"
@@ -82,8 +81,13 @@ if [[ ! -d ${virtualenv_path} ]]; then
     python -m venv "${virtualenv_path}"
 fi
 
+# In Python 3.5 activating virtualenv hits undefined variable
+set +u
+
 . "${virtualenv_path}/bin/activate"
 
+set -u
+
 pip install --upgrade pip==20.2.3
 
 pip install pytest freezegun pytest-cov \
@@ -105,7 +109,6 @@ if [[ ${interactive} == "true" ]]; then
     echo
     echo "You are entering the virtualenv now. Type exit to exit back to the 
original shell"
     echo
-    kubectl config set-context --current --namespace=airflow
     exec "${SHELL}"
 else
     pytest "${pytest_args[@]}" "${tests_to_run[@]}"
diff --git a/scripts/ci/kubernetes/ci_deploy_app_to_kubernetes.sh 
b/scripts/ci/kubernetes/ci_setup_cluster_and_deploy_airflow_to_kubernetes.sh
similarity index 94%
copy from scripts/ci/kubernetes/ci_deploy_app_to_kubernetes.sh
copy to 
scripts/ci/kubernetes/ci_setup_cluster_and_deploy_airflow_to_kubernetes.sh
index 2a7455a..e12f809 100755
--- a/scripts/ci/kubernetes/ci_deploy_app_to_kubernetes.sh
+++ b/scripts/ci/kubernetes/ci_setup_cluster_and_deploy_airflow_to_kubernetes.sh
@@ -23,10 +23,11 @@ traps::add_trap "kind::dump_kind_logs" EXIT HUP INT TERM
 
 kind::make_sure_kubernetes_tools_are_installed
 kind::get_kind_cluster_name
+kind::perform_kind_cluster_operation "start"
 build_images::prepare_prod_build
 build_images::build_prod_images
 kind::build_image_for_kubernetes_tests
 kind::load_image_to_kind_cluster
 kind::deploy_airflow_with_helm
-kind::forward_port_to_kind_webserver
 kind::deploy_test_kubernetes_resources
+kind::wait_for_webserver_healthy
diff --git a/scripts/ci/kubernetes/kind-cluster-conf.yaml 
b/scripts/ci/kubernetes/kind-cluster-conf.yaml
index 348fb68..df60820 100644
--- a/scripts/ci/kubernetes/kind-cluster-conf.yaml
+++ b/scripts/ci/kubernetes/kind-cluster-conf.yaml
@@ -23,6 +23,11 @@ networking:
 nodes:
   - role: control-plane
   - role: worker
+    extraPortMappings:
+      - containerPort: 30007
+        hostPort: 8080
+        listenAddress: "0.0.0.0"
+        protocol: TCP
 kubeadmConfigPatchesJson6902:
   - group: kubeadm.k8s.io
     version: v1beta2
diff --git a/scripts/ci/kubernetes/kind-cluster-conf.yaml 
b/scripts/ci/kubernetes/nodeport.yaml
similarity index 68%
copy from scripts/ci/kubernetes/kind-cluster-conf.yaml
copy to scripts/ci/kubernetes/nodeport.yaml
index 348fb68..8438281 100644
--- a/scripts/ci/kubernetes/kind-cluster-conf.yaml
+++ b/scripts/ci/kubernetes/nodeport.yaml
@@ -15,19 +15,17 @@
 # specific language governing permissions and limitations
 # under the License.
 ---
-kind: Cluster
-apiVersion: kind.sigs.k8s.io/v1alpha3
-networking:
-  apiServerAddress: 0.0.0.0
-  apiServerPort: 19090
-nodes:
-  - role: control-plane
-  - role: worker
-kubeadmConfigPatchesJson6902:
-  - group: kubeadm.k8s.io
-    version: v1beta2
-    kind: ClusterConfiguration
-    patch: |
-      - op: add
-        path: /apiServer/certSANs/-
-        value: docker
+apiVersion: v1
+kind: Service
+metadata:
+  name: airflow-webserver-node-port
+spec:
+  type: NodePort
+  selector:
+    component: webserver
+    release: airflow
+    tier: airflow
+  ports:
+    - port: 8080
+      targetPort: 8080
+      nodePort: 30007
diff --git a/scripts/ci/kubernetes/ci_deploy_app_to_kubernetes.sh 
b/scripts/ci/kubernetes/redeploy_airflow.sh
similarity index 86%
rename from scripts/ci/kubernetes/ci_deploy_app_to_kubernetes.sh
rename to scripts/ci/kubernetes/redeploy_airflow.sh
index 2a7455a..7803d7c 100755
--- a/scripts/ci/kubernetes/ci_deploy_app_to_kubernetes.sh
+++ b/scripts/ci/kubernetes/redeploy_airflow.sh
@@ -23,10 +23,6 @@ traps::add_trap "kind::dump_kind_logs" EXIT HUP INT TERM
 
 kind::make_sure_kubernetes_tools_are_installed
 kind::get_kind_cluster_name
-build_images::prepare_prod_build
-build_images::build_prod_images
-kind::build_image_for_kubernetes_tests
-kind::load_image_to_kind_cluster
 kind::deploy_airflow_with_helm
-kind::forward_port_to_kind_webserver
 kind::deploy_test_kubernetes_resources
+kind::wait_for_webserver_healthy
diff --git a/scripts/ci/libraries/_kind.sh b/scripts/ci/libraries/_kind.sh
index 6194742..defa4de 100644
--- a/scripts/ci/libraries/_kind.sh
+++ b/scripts/ci/libraries/_kind.sh
@@ -16,14 +16,16 @@
 # specific language governing permissions and limitations
 # under the License.
 
-
-function kind::get_kind_cluster_name(){
+function kind::get_kind_cluster_name() {
     # Name of the KinD cluster to connect to
     export 
KIND_CLUSTER_NAME=${KIND_CLUSTER_NAME:="airflow-python-${PYTHON_MAJOR_MINOR_VERSION}-${KUBERNETES_VERSION}"}
     readonly KIND_CLUSTER_NAME
     # Name of the KinD cluster to connect to when referred to via kubectl
     export KUBECTL_CLUSTER_NAME=kind-${KIND_CLUSTER_NAME}
     readonly KUBECTL_CLUSTER_NAME
+    export KUBECONFIG="${BUILD_CACHE_DIR}/.kube/config"
+    mkdir -pv "${BUILD_CACHE_DIR}/.kube/"
+    touch "${KUBECONFIG}"
 }
 
 function kind::dump_kind_logs() {
@@ -40,7 +42,7 @@ function kind::dump_kind_logs() {
 }
 
 function kind::make_sure_kubernetes_tools_are_installed() {
-    SYSTEM=$(uname -s| tr '[:upper:]' '[:lower:]')
+    SYSTEM=$(uname -s | tr '[:upper:]' '[:lower:]')
 
     
KIND_URL="https://github.com/kubernetes-sigs/kind/releases/download/${KIND_VERSION}/kind-${SYSTEM}-amd64";
     mkdir -pv "${BUILD_CACHE_DIR}/bin"
@@ -48,7 +50,7 @@ function kind::make_sure_kubernetes_tools_are_installed() {
         DOWNLOADED_KIND_VERSION=v"$(${KIND_BINARY_PATH} --version | awk '{ 
print $3 }')"
         echo "Currently downloaded kind version = ${DOWNLOADED_KIND_VERSION}"
     fi
-    if [[ ! -f "${KIND_BINARY_PATH}"  || ${DOWNLOADED_KIND_VERSION} != 
"${KIND_VERSION}" ]]; then
+    if [[ ! -f "${KIND_BINARY_PATH}" || ${DOWNLOADED_KIND_VERSION} != 
"${KIND_VERSION}" ]]; then
         echo
         echo "Downloading Kind version ${KIND_VERSION}"
         repeats::run_with_retry 4 \
@@ -94,24 +96,10 @@ function kind::make_sure_kubernetes_tools_are_installed() {
 }
 
 function kind::create_cluster() {
-    if [[ "${TRAVIS:="false"}" == "true" ]]; then
-        # Travis CI does not handle the nice output of Kind well, so we need 
to capture it
-        # And display only if kind fails to start
-        start_output_heartbeat "Creating kubernetes cluster" 10
-        set +e
-        if ! OUTPUT=$(kind create cluster \
-                        --name "${KIND_CLUSTER_NAME}" \
-                        --config 
"${AIRFLOW_SOURCES}/scripts/ci/kubernetes/kind-cluster-conf.yaml" \
-                        --image "kindest/node:${KUBERNETES_VERSION}" 2>&1); 
then
-            echo "${OUTPUT}"
-        fi
-        stop_output_heartbeat
-    else
-        kind create cluster \
-            --name "${KIND_CLUSTER_NAME}" \
-            --config 
"${AIRFLOW_SOURCES}/scripts/ci/kubernetes/kind-cluster-conf.yaml" \
-            --image "kindest/node:${KUBERNETES_VERSION}"
-    fi
+    kind create cluster \
+        --name "${KIND_CLUSTER_NAME}" \
+        --config 
"${AIRFLOW_SOURCES}/scripts/ci/kubernetes/kind-cluster-conf.yaml" \
+        --image "kindest/node:${KUBERNETES_VERSION}"
     echo
     echo "Created cluster ${KIND_CLUSTER_NAME}"
     echo
@@ -125,9 +113,12 @@ function kind::delete_cluster() {
     rm -rf "${HOME}/.kube/*"
 }
 
-function kind::perform_kind_cluster_operation() {
-    ALLOWED_KIND_OPERATIONS="[ start restart stop deploy test shell recreate ]"
+function kind::set_current_context() {
+    kubectl config set-context --current --namespace=airflow
+}
 
+function kind::perform_kind_cluster_operation() {
+    ALLOWED_KIND_OPERATIONS="[ start restart stop deploy test shell recreate 
k9s]"
     set +u
     if [[ -z "${1=}" ]]; then
         echo >&2
@@ -170,6 +161,7 @@ function kind::perform_kind_cluster_operation() {
             echo
             kind::delete_cluster
             kind::create_cluster
+            kind::set_current_context
         elif [[ ${OPERATION} == "stop" ]]; then
             echo
             echo "Deleting cluster"
@@ -181,20 +173,35 @@ function kind::perform_kind_cluster_operation() {
             echo "Deploying Airflow to KinD"
             echo
             kind::build_image_for_kubernetes_tests
+            kind::get_kind_cluster_name
             kind::load_image_to_kind_cluster
             kind::deploy_airflow_with_helm
-            kind::forward_port_to_kind_webserver
             kind::deploy_test_kubernetes_resources
+            kind::wait_for_webserver_healthy
         elif [[ ${OPERATION} == "test" ]]; then
             echo
             echo "Testing with KinD"
             echo
+            kind::set_current_context
             
"${AIRFLOW_SOURCES}/scripts/ci/kubernetes/ci_run_kubernetes_tests.sh"
         elif [[ ${OPERATION} == "shell" ]]; then
             echo
             echo "Entering an interactive shell for kubernetes testing"
             echo
+            kind::set_current_context
             
"${AIRFLOW_SOURCES}/scripts/ci/kubernetes/ci_run_kubernetes_tests.sh" "-i"
+        elif [[ ${OPERATION} == "k9s" ]]; then
+            echo
+            echo "Starting k9s CLI"
+            echo
+            export TERM=xterm-256color
+            export EDITOR=vim
+            export K9S_EDITOR=vim
+            kind::set_current_context
+            exec docker run --rm -it --network host \
+                -e COLUMNS="$(tput cols)" -e LINES="$(tput lines)" \
+                -e EDITOR -e K9S_EDITOR \
+                -v "${KUBECONFIG}:/root/.kube/config" quay.io/derailed/k9s
         else
             echo >&2
             echo >&2 "Wrong cluster operation: ${OPERATION}. Should be one of: 
${ALLOWED_KIND_OPERATIONS}"
@@ -213,8 +220,7 @@ function kind::perform_kind_cluster_operation() {
             echo "Creating cluster"
             echo
             kind::create_cluster
-        elif [[ ${OPERATION} == "stop" || ${OPERATION} == "deploy" || \
-                ${OPERATION} == "test" || ${OPERATION} == "shell" ]]; then
+        elif [[ ${OPERATION} == "stop" || ${OPERATION} == "deploy" || 
${OPERATION} == "test" || ${OPERATION} == "shell" ]]; then
             echo >&2
             echo >&2 "Cluster ${KIND_CLUSTER_NAME} does not exist. It should 
exist for ${OPERATION} operation"
             echo >&2
@@ -245,7 +251,6 @@ function kind::check_cluster_ready_for_airflow() {
     kubectl create namespace test-namespace --cluster "${KUBECTL_CLUSTER_NAME}"
 }
 
-
 function kind::build_image_for_kubernetes_tests() {
     cd "${AIRFLOW_SOURCES}" || exit 1
     docker build --tag "${AIRFLOW_PROD_IMAGE_KUBERNETES}" . -f - <<EOF
@@ -268,61 +273,37 @@ function kind::load_image_to_kind_cluster() {
     kind load docker-image --name "${KIND_CLUSTER_NAME}" 
"${AIRFLOW_PROD_IMAGE_KUBERNETES}"
 }
 
-MAX_NUM_TRIES_FOR_PORT_FORWARD=12
-readonly MAX_NUM_TRIES_FOR_PORT_FORWARD
+MAX_NUM_TRIES_FOR_HEALTH_CHECK=12
+readonly MAX_NUM_TRIES_FOR_HEALTH_CHECK
 
-SLEEP_TIME_FOR_PORT_FORWARD=10
-readonly SLEEP_TIME_FOR_PORT_FORWARD
-
-forwarded_port_number=8080
-
-function kind::start_kubectl_forward() {
-    echo
-    echo "Trying to forward port ${forwarded_port_number} to 8080 on server"
-    echo
-    kubectl port-forward svc/airflow-webserver "${forwarded_port_number}:8080" 
--namespace airflow >/dev/null &
-}
+SLEEP_TIME_FOR_HEALTH_CHECK=10
+readonly SLEEP_TIME_FOR_HEALTH_CHECK
 
-function kind::stop_kubectl() {
-    echo
-    echo "Stops all kubectl instances"
-    echo
-    killall kubectl || true
-    sleep 10
-    killall -s KILL kubectl || true
+FORWARDED_PORT_NUMBER=8080
+readonly FORWARDED_PORT_NUMBER
 
-}
 
-function kind::forward_port_to_kind_webserver() {
+function kind::wait_for_webserver_healthy() {
     num_tries=0
     set +e
-    kind::start_kubectl_forward
-    sleep "${SLEEP_TIME_FOR_PORT_FORWARD}"
-    while ! curl "http://localhost:${forwarded_port_number}/health"; -s | grep 
-q healthy; do
+    sleep "${SLEEP_TIME_FOR_HEALTH_CHECK}"
+    while ! curl "http://localhost:${FORWARDED_PORT_NUMBER}/health"; -s | grep 
-q healthy; do
         echo
-        echo "Trying to establish port forwarding to 'airflow webserver'"
+        echo "Sleeping ${SLEEP_TIME_FOR_HEALTH_CHECK} while waiting for 
webserver being ready"
         echo
-        if [[ ${INCREASE_PORT_NUMBER_FOR_KUBERNETES=} == "true" ]] ; then
-            forwarded_port_number=$(( forwarded_port_number + 1 ))
+        sleep "${SLEEP_TIME_FOR_HEALTH_CHECK}"
+        num_tries=$((num_tries + 1))
+        if [[ ${num_tries} == "${MAX_NUM_TRIES_FOR_HEALTH_CHECK}" ]]; then
+            >&2 echo
+            >&2 echo "Timeout while waiting for the webserver health check"
+            >&2 echo
         fi
-        if [[ ${num_tries} == "${MAX_NUM_TRIES_FOR_PORT_FORWARD}" ]]; then
-            echo >&2
-            echo >&2 "ERROR! Could not setup a forward port to Airflow's 
webserver after ${num_tries}! Exiting."
-            echo >&2
-            exit 1
-        fi
-        echo
-        echo "Trying to establish port forwarding to 'airflow webserver'"
-        echo
-        kind::start_kubectl_forward
-        sleep "${SLEEP_TIME_FOR_PORT_FORWARD}"
-        num_tries=$(( num_tries + 1))
     done
     echo
-    echo "Connection to 'airflow webserver' established on port 
${forwarded_port_number}"
+    echo "Connection to 'airflow webserver' established on port 
${FORWARDED_PORT_NUMBER}"
     echo
-    initialization::ga_env CLUSTER_FORWARDED_PORT "${forwarded_port_number}"
-    export CLUSTER_FORWARDED_PORT="${forwarded_port_number}"
+    initialization::ga_env CLUSTER_FORWARDED_PORT "${FORWARDED_PORT_NUMBER}"
+    export CLUSTER_FORWARDED_PORT="${FORWARDED_PORT_NUMBER}"
     set -e
 }
 
@@ -348,16 +329,15 @@ function kind::deploy_airflow_with_helm() {
     popd || exit 1
 }
 
-
 function kind::deploy_test_kubernetes_resources() {
     echo
     echo "Deploying Custom kubernetes resources"
     echo
     kubectl apply -f "scripts/ci/kubernetes/volumes.yaml" --namespace default
     kubectl apply -f "scripts/ci/kubernetes/secrets.yaml" --namespace default
+    kubectl apply -f "scripts/ci/kubernetes/nodeport.yaml" --namespace airflow
 }
 
-
 function kind::dump_kubernetes_logs() {
     POD=$(kubectl get pods -o go-template --template '{{range 
.items}}{{.metadata.name}}{{"\n"}}{{end}}' \
         --cluster "${KUBECTL_CLUSTER_NAME}" | grep airflow | head -1)

Reply via email to