This is an automated email from the ASF dual-hosted git repository.
jedcunningham pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new cc417216b2a Remove deprecated `kubernetes` commands from core (#44826)
cc417216b2a is described below
commit cc417216b2aca3dbe2ce8065cfe7dd4a93dc2a0b
Author: Jed Cunningham <[email protected]>
AuthorDate: Wed Dec 11 10:45:24 2024 -0700
Remove deprecated `kubernetes` commands from core (#44826)
These commands were deprecated in core and moved to the provider. We should
remove them for Airflow 3.
---
.github/boring-cyborg.yml | 2 +-
airflow/cli/cli_config.py | 37 ---
airflow/cli/commands/hybrid_commands/__init__.py | 16 --
.../commands/hybrid_commands/kubernetes_command.py | 171 -------------
.../check_cncf_k8s_used_for_k8s_executor_only.py | 7 +-
scripts/cov/cli_coverage.py | 1 -
tests/cli/commands/hybrid_commands/__init__.py | 16 --
.../hybrid_commands/test_kubernetes_command.py | 274 ---------------------
tests/deprecations_ignore.yml | 4 -
9 files changed, 3 insertions(+), 525 deletions(-)
diff --git a/.github/boring-cyborg.yml b/.github/boring-cyborg.yml
index e48a6537fab..c1b3b63d097 100644
--- a/.github/boring-cyborg.yml
+++ b/.github/boring-cyborg.yml
@@ -155,8 +155,8 @@ labelPRBasedOnFilePath:
- providers/tests/cloudant/**/*
provider:cncf-kubernetes:
- - airflow/**/kubernetes_*.py
- airflow/example_dags/example_kubernetes_executor.py
+ - airflow/example_dags/example_local_kubernetes_executor.py
- providers/src/airflow/providers/cncf/kubernetes/**/*
-
providers/src/airflow/providers/celery/executors/celery_kubernetes_executor.py
- docs/apache-airflow-providers-cncf-kubernetes/**/*
diff --git a/airflow/cli/cli_config.py b/airflow/cli/cli_config.py
index 0e5dbad1f53..503397064cb 100644
--- a/airflow/cli/cli_config.py
+++ b/airflow/cli/cli_config.py
@@ -862,23 +862,6 @@ ARG_OPTIONAL_SECTION = Arg(
help="The section name",
)
-# kubernetes cleanup-pods
-ARG_NAMESPACE = Arg(
- ("--namespace",),
- default=conf.get("kubernetes_executor", "namespace"),
- help="Kubernetes Namespace. Default value is `[kubernetes] namespace` in
configuration.",
-)
-
-ARG_MIN_PENDING_MINUTES = Arg(
- ("--min-pending-minutes",),
- default=30,
- type=positive_int(allow_zero=False),
- help=(
- "Pending pods created before the time interval are to be cleaned up, "
- "measured in minutes. Default value is 30(m). The minimum value is
5(m)."
- ),
-)
-
# jobs check
ARG_JOB_TYPE_FILTER = Arg(
("--job-type",),
@@ -1752,26 +1735,6 @@ CONFIG_COMMANDS = (
),
)
-KUBERNETES_COMMANDS = (
- ActionCommand(
- name="cleanup-pods",
- help=(
- "Clean up Kubernetes pods "
- "(created by KubernetesExecutor/KubernetesPodOperator) "
- "in evicted/failed/succeeded/pending states"
- ),
-
func=lazy_load_command("airflow.providers.cncf.kubernetes.cli.kubernetes_command.cleanup_pods"),
- args=(ARG_NAMESPACE, ARG_MIN_PENDING_MINUTES, ARG_VERBOSE),
- ),
- ActionCommand(
- name="generate-dag-yaml",
- help="Generate YAML files for all tasks in DAG. Useful for debugging
tasks without "
- "launching into a cluster",
-
func=lazy_load_command("airflow.providers.cncf.kubernetes.cli.kubernetes_command.generate_pod_yaml"),
- args=(ARG_DAG_ID, ARG_LOGICAL_DATE, ARG_SUBDIR, ARG_OUTPUT_PATH,
ARG_VERBOSE),
- ),
-)
-
JOBS_COMMANDS = (
ActionCommand(
name="check",
diff --git a/airflow/cli/commands/hybrid_commands/__init__.py
b/airflow/cli/commands/hybrid_commands/__init__.py
deleted file mode 100644
index 13a83393a91..00000000000
--- a/airflow/cli/commands/hybrid_commands/__init__.py
+++ /dev/null
@@ -1,16 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
diff --git a/airflow/cli/commands/hybrid_commands/kubernetes_command.py
b/airflow/cli/commands/hybrid_commands/kubernetes_command.py
deleted file mode 100644
index 8f5f7333b7f..00000000000
--- a/airflow/cli/commands/hybrid_commands/kubernetes_command.py
+++ /dev/null
@@ -1,171 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-"""Kubernetes sub-commands."""
-
-from __future__ import annotations
-
-import os
-import sys
-import warnings
-from datetime import datetime, timedelta
-
-from kubernetes import client
-from kubernetes.client.api_client import ApiClient
-from kubernetes.client.rest import ApiException
-
-from airflow.models import DagRun, TaskInstance
-from airflow.providers.cncf.kubernetes import pod_generator
-from airflow.providers.cncf.kubernetes.executors.kubernetes_executor import
KubeConfig
-from airflow.providers.cncf.kubernetes.kube_client import get_kube_client
-from airflow.providers.cncf.kubernetes.kubernetes_helper_functions import
create_unique_id
-from airflow.providers.cncf.kubernetes.pod_generator import PodGenerator
-from airflow.utils import cli as cli_utils, yaml
-from airflow.utils.cli import get_dag
-from airflow.utils.providers_configuration_loader import
providers_configuration_loaded
-
-warnings.warn(
- "Use kubernetes command from providers package, Use cncf.kubernetes
provider >= 8.2.1",
- DeprecationWarning,
- stacklevel=2,
-)
-
-
-@cli_utils.action_cli
-@providers_configuration_loaded
-def generate_pod_yaml(args):
- """Generate yaml files for each task in the DAG. Used for testing output
of KubernetesExecutor."""
- logical_date = args.logical_date
- dag = get_dag(subdir=args.subdir, dag_id=args.dag_id)
- yaml_output_path = args.output_path
- dr = DagRun(dag.dag_id, logical_date=logical_date)
- kube_config = KubeConfig()
- for task in dag.tasks:
- ti = TaskInstance(task, None)
- ti.dag_run = dr
- pod = PodGenerator.construct_pod(
- dag_id=args.dag_id,
- task_id=ti.task_id,
- pod_id=create_unique_id(args.dag_id, ti.task_id),
- try_number=ti.try_number,
- kube_image=kube_config.kube_image,
- date=ti.logical_date,
- args=ti.command_as_list(),
- pod_override_object=PodGenerator.from_obj(ti.executor_config),
- scheduler_job_id="worker-config",
- namespace=kube_config.executor_namespace,
-
base_worker_pod=PodGenerator.deserialize_model_file(kube_config.pod_template_file),
- with_mutation_hook=True,
- )
- api_client = ApiClient()
- date_string =
pod_generator.datetime_to_label_safe_datestring(logical_date)
- yaml_file_name = f"{args.dag_id}_{ti.task_id}_{date_string}.yml"
- os.makedirs(os.path.dirname(yaml_output_path +
"/airflow_yaml_output/"), exist_ok=True)
- with open(yaml_output_path + "/airflow_yaml_output/" + yaml_file_name,
"w") as output:
- sanitized_pod = api_client.sanitize_for_serialization(pod)
- output.write(yaml.dump(sanitized_pod))
- print(f"YAML output can be found at
{yaml_output_path}/airflow_yaml_output/")
-
-
-@cli_utils.action_cli
-@providers_configuration_loaded
-def cleanup_pods(args):
- """Clean up k8s pods in evicted/failed/succeeded/pending states."""
- namespace = args.namespace
-
- min_pending_minutes = args.min_pending_minutes
- # protect newly created pods from deletion
- if min_pending_minutes < 5:
- min_pending_minutes = 5
-
- # https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle/
- # All Containers in the Pod have terminated in success, and will not be
restarted.
- pod_succeeded = "succeeded"
-
- # The Pod has been accepted by the Kubernetes cluster,
- # but one or more of the containers has not been set up and made ready to
run.
- pod_pending = "pending"
-
- # All Containers in the Pod have terminated, and at least one Container
has terminated in failure.
- # That is, the Container either exited with non-zero status or was
terminated by the system.
- pod_failed = "failed"
-
- # https://kubernetes.io/docs/tasks/administer-cluster/out-of-resource/
- pod_reason_evicted = "evicted"
- # If pod is failed and restartPolicy is:
- # * Always: Restart Container; Pod phase stays Running.
- # * OnFailure: Restart Container; Pod phase stays Running.
- # * Never: Pod phase becomes Failed.
- pod_restart_policy_never = "never"
-
- print("Loading Kubernetes configuration")
- kube_client = get_kube_client()
- print(f"Listing pods in namespace {namespace}")
- airflow_pod_labels = [
- "dag_id",
- "task_id",
- "try_number",
- "airflow_version",
- ]
- list_kwargs = {"namespace": namespace, "limit": 500, "label_selector":
",".join(airflow_pod_labels)}
-
- while True:
- pod_list = kube_client.list_namespaced_pod(**list_kwargs)
- for pod in pod_list.items:
- pod_name = pod.metadata.name
- print(f"Inspecting pod {pod_name}")
- pod_phase = pod.status.phase.lower()
- pod_reason = pod.status.reason.lower() if pod.status.reason else ""
- pod_restart_policy = pod.spec.restart_policy.lower()
- current_time = datetime.now(pod.metadata.creation_timestamp.tzinfo)
-
- if (
- pod_phase == pod_succeeded
- or (pod_phase == pod_failed and pod_restart_policy ==
pod_restart_policy_never)
- or (pod_reason == pod_reason_evicted)
- or (
- pod_phase == pod_pending
- and current_time - pod.metadata.creation_timestamp
- > timedelta(minutes=min_pending_minutes)
- )
- ):
- print(
- f'Deleting pod "{pod_name}" phase "{pod_phase}" and reason
"{pod_reason}", '
- f'restart policy "{pod_restart_policy}"'
- )
- try:
- _delete_pod(pod.metadata.name, namespace)
- except ApiException as e:
- print(f"Can't remove POD: {e}", file=sys.stderr)
- else:
- print(f"No action taken on pod {pod_name}")
- continue_token = pod_list.metadata._continue
- if not continue_token:
- break
- list_kwargs["_continue"] = continue_token
-
-
-def _delete_pod(name, namespace):
- """
- Delete a namespaced pod.
-
- Helper Function for cleanup_pods.
- """
- kube_client = get_kube_client()
- delete_options = client.V1DeleteOptions()
- print(f'Deleting POD "{name}" from "{namespace}" namespace')
- api_response = kube_client.delete_namespaced_pod(name=name,
namespace=namespace, body=delete_options)
- print(api_response)
diff --git a/scripts/ci/pre_commit/check_cncf_k8s_used_for_k8s_executor_only.py
b/scripts/ci/pre_commit/check_cncf_k8s_used_for_k8s_executor_only.py
index 6d2355ede4c..a452c648cdf 100755
--- a/scripts/ci/pre_commit/check_cncf_k8s_used_for_k8s_executor_only.py
+++ b/scripts/ci/pre_commit/check_cncf_k8s_used_for_k8s_executor_only.py
@@ -51,8 +51,6 @@ def get_imports(path: str):
errors: list[str] = []
-EXCEPTIONS = ["airflow/cli/commands/hybrid_commands/kubernetes_command.py"]
-
def main() -> int:
for path in sys.argv[1:]:
@@ -62,9 +60,8 @@ def main() -> int:
import_count += 1
if len(imp.module) > 3:
if imp.module[:4] == ["airflow", "providers", "cncf",
"kubernetes"]:
- if path not in EXCEPTIONS:
- local_error_count += 1
- errors.append(f"{path}: ({'.'.join(imp.module)})")
+ local_error_count += 1
+ errors.append(f"{path}: ({'.'.join(imp.module)})")
console.print(f"[blue]{path}:[/] Import count: {import_count},
error_count {local_error_count}")
if errors:
console.print(
diff --git a/scripts/cov/cli_coverage.py b/scripts/cov/cli_coverage.py
index b47b7513609..b8b37423f7a 100644
--- a/scripts/cov/cli_coverage.py
+++ b/scripts/cov/cli_coverage.py
@@ -38,7 +38,6 @@ files_not_fully_covered = [
"airflow/cli/commands/local_commands/db_command.py",
"airflow/cli/commands/local_commands/info_command.py",
"airflow/cli/commands/remote_commands/jobs_command.py",
- "airflow/cli/commands/hybrid_commands/kubernetes_command.py",
"airflow/cli/commands/local_commands/plugins_command.py",
"airflow/cli/commands/remote_commands/pool_command.py",
"airflow/cli/commands/remote_commands/provider_command.py",
diff --git a/tests/cli/commands/hybrid_commands/__init__.py
b/tests/cli/commands/hybrid_commands/__init__.py
deleted file mode 100644
index 13a83393a91..00000000000
--- a/tests/cli/commands/hybrid_commands/__init__.py
+++ /dev/null
@@ -1,16 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
diff --git a/tests/cli/commands/hybrid_commands/test_kubernetes_command.py
b/tests/cli/commands/hybrid_commands/test_kubernetes_command.py
deleted file mode 100644
index 838bf8fc074..00000000000
--- a/tests/cli/commands/hybrid_commands/test_kubernetes_command.py
+++ /dev/null
@@ -1,274 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-from __future__ import annotations
-
-import importlib
-import os
-from unittest import mock
-from unittest.mock import MagicMock, call
-
-import kubernetes
-import pytest
-from dateutil.parser import parse
-
-from airflow.cli import cli_parser
-from airflow.cli.commands.hybrid_commands import kubernetes_command
-from airflow.executors import executor_loader
-
-from tests_common.test_utils.config import conf_vars
-
-pytestmark = pytest.mark.db_test
-
-
-class TestGenerateDagYamlCommand:
- @classmethod
- def setup_class(cls):
- with conf_vars({("core", "executor"): "KubernetesExecutor"}):
- importlib.reload(executor_loader)
- importlib.reload(cli_parser)
- cls.parser = cli_parser.get_parser()
-
- def test_generate_dag_yaml(self, tmp_path):
- path = tmp_path /
"miscellaneous_test_dag_run_after_loop_2020-11-03T00_00_00_plus_00_00.yml"
- kubernetes_command.generate_pod_yaml(
- self.parser.parse_args(
- [
- "kubernetes",
- "generate-dag-yaml",
- "miscellaneous_test_dag",
- "2020-11-03",
- "--output-path",
- os.fspath(path.parent),
- ]
- )
- )
- assert sum(1 for _ in path.parent.iterdir()) == 1
- output_path = path.parent / "airflow_yaml_output"
- assert sum(1 for _ in output_path.iterdir()) == 6
- assert os.path.isfile(output_path / path.name)
- assert (output_path / path.name).stat().st_size > 0
-
-
-class TestCleanUpPodsCommand:
- label_selector = "dag_id,task_id,try_number,airflow_version"
-
- @classmethod
- def setup_class(cls):
- with conf_vars({("core", "executor"): "KubernetesExecutor"}):
- importlib.reload(executor_loader)
- importlib.reload(cli_parser)
- cls.parser = cli_parser.get_parser()
-
- @mock.patch("kubernetes.client.CoreV1Api.delete_namespaced_pod")
-
@mock.patch("airflow.providers.cncf.kubernetes.kube_client.config.load_incluster_config")
- def test_delete_pod(self, load_incluster_config, delete_namespaced_pod):
- kubernetes_command._delete_pod("dummy", "awesome-namespace")
- delete_namespaced_pod.assert_called_with(body=mock.ANY, name="dummy",
namespace="awesome-namespace")
- load_incluster_config.assert_called_once()
-
-
@mock.patch("airflow.cli.commands.hybrid_commands.kubernetes_command._delete_pod")
- @mock.patch("kubernetes.client.CoreV1Api.list_namespaced_pod")
-
@mock.patch("airflow.providers.cncf.kubernetes.kube_client.config.load_incluster_config")
- def test_running_pods_are_not_cleaned(self, load_incluster_config,
list_namespaced_pod, delete_pod):
- pod1 = MagicMock()
- pod1.metadata.name = "dummy"
- pod1.metadata.creation_timestamp = parse("2021-12-20T08:01:07Z")
- pod1.status.phase = "Running"
- pod1.status.reason = None
- pods = MagicMock()
- pods.metadata._continue = None
- pods.items = [pod1]
- list_namespaced_pod.return_value = pods
- kubernetes_command.cleanup_pods(
- self.parser.parse_args(["kubernetes", "cleanup-pods",
"--namespace", "awesome-namespace"])
- )
- list_namespaced_pod.assert_called_once_with(
- namespace="awesome-namespace", limit=500,
label_selector=self.label_selector
- )
- delete_pod.assert_not_called()
- load_incluster_config.assert_called_once()
-
-
@mock.patch("airflow.cli.commands.hybrid_commands.kubernetes_command._delete_pod")
- @mock.patch("kubernetes.client.CoreV1Api.list_namespaced_pod")
-
@mock.patch("airflow.providers.cncf.kubernetes.kube_client.config.load_incluster_config")
- def test_cleanup_succeeded_pods(self, load_incluster_config,
list_namespaced_pod, delete_pod):
- pod1 = MagicMock()
- pod1.metadata.name = "dummy"
- pod1.metadata.creation_timestamp = parse("2021-12-20T08:01:07Z")
- pod1.status.phase = "Succeeded"
- pod1.status.reason = None
- pods = MagicMock()
- pods.metadata._continue = None
- pods.items = [pod1]
- list_namespaced_pod.return_value = pods
- kubernetes_command.cleanup_pods(
- self.parser.parse_args(["kubernetes", "cleanup-pods",
"--namespace", "awesome-namespace"])
- )
- list_namespaced_pod.assert_called_once_with(
- namespace="awesome-namespace", limit=500,
label_selector=self.label_selector
- )
- delete_pod.assert_called_with("dummy", "awesome-namespace")
- load_incluster_config.assert_called_once()
-
-
@mock.patch("airflow.cli.commands.hybrid_commands.kubernetes_command._delete_pod")
- @mock.patch("kubernetes.client.CoreV1Api.list_namespaced_pod")
- @mock.patch("kubernetes.config.load_incluster_config")
- def test_no_cleanup_failed_pods_wo_restart_policy_never(
- self, load_incluster_config, list_namespaced_pod, delete_pod
- ):
- pod1 = MagicMock()
- pod1.metadata.name = "dummy2"
- pod1.metadata.creation_timestamp = parse("2021-12-20T08:01:07Z")
- pod1.status.phase = "Failed"
- pod1.status.reason = None
- pod1.spec.restart_policy = "Always"
- pods = MagicMock()
- pods.metadata._continue = None
- pods.items = [pod1]
- list_namespaced_pod.return_value = pods
- kubernetes_command.cleanup_pods(
- self.parser.parse_args(["kubernetes", "cleanup-pods",
"--namespace", "awesome-namespace"])
- )
- list_namespaced_pod.assert_called_once_with(
- namespace="awesome-namespace", limit=500,
label_selector=self.label_selector
- )
- delete_pod.assert_not_called()
- load_incluster_config.assert_called_once()
-
-
@mock.patch("airflow.cli.commands.hybrid_commands.kubernetes_command._delete_pod")
- @mock.patch("kubernetes.client.CoreV1Api.list_namespaced_pod")
- @mock.patch("kubernetes.config.load_incluster_config")
- def test_cleanup_failed_pods_w_restart_policy_never(
- self, load_incluster_config, list_namespaced_pod, delete_pod
- ):
- pod1 = MagicMock()
- pod1.metadata.name = "dummy3"
- pod1.metadata.creation_timestamp = parse("2021-12-20T08:01:07Z")
- pod1.status.phase = "Failed"
- pod1.status.reason = None
- pod1.spec.restart_policy = "Never"
- pods = MagicMock()
- pods.metadata._continue = None
- pods.items = [pod1]
- list_namespaced_pod.return_value = pods
- kubernetes_command.cleanup_pods(
- self.parser.parse_args(["kubernetes", "cleanup-pods",
"--namespace", "awesome-namespace"])
- )
- list_namespaced_pod.assert_called_once_with(
- namespace="awesome-namespace", limit=500,
label_selector=self.label_selector
- )
- delete_pod.assert_called_with("dummy3", "awesome-namespace")
- load_incluster_config.assert_called_once()
-
-
@mock.patch("airflow.cli.commands.hybrid_commands.kubernetes_command._delete_pod")
- @mock.patch("kubernetes.client.CoreV1Api.list_namespaced_pod")
- @mock.patch("kubernetes.config.load_incluster_config")
- def test_cleanup_evicted_pods(self, load_incluster_config,
list_namespaced_pod, delete_pod):
- pod1 = MagicMock()
- pod1.metadata.name = "dummy4"
- pod1.metadata.creation_timestamp = parse("2021-12-20T08:01:07Z")
- pod1.status.phase = "Failed"
- pod1.status.reason = "Evicted"
- pod1.spec.restart_policy = "Never"
- pods = MagicMock()
- pods.metadata._continue = None
- pods.items = [pod1]
- list_namespaced_pod.return_value = pods
- kubernetes_command.cleanup_pods(
- self.parser.parse_args(["kubernetes", "cleanup-pods",
"--namespace", "awesome-namespace"])
- )
- list_namespaced_pod.assert_called_once_with(
- namespace="awesome-namespace", limit=500,
label_selector=self.label_selector
- )
- delete_pod.assert_called_with("dummy4", "awesome-namespace")
- load_incluster_config.assert_called_once()
-
-
@mock.patch("airflow.cli.commands.hybrid_commands.kubernetes_command._delete_pod")
- @mock.patch("kubernetes.client.CoreV1Api.list_namespaced_pod")
- @mock.patch("kubernetes.config.load_incluster_config")
- def test_cleanup_pending_pods(self, load_incluster_config,
list_namespaced_pod, delete_pod):
- pod1 = MagicMock()
- pod1.metadata.name = "dummy5"
- pod1.metadata.creation_timestamp = parse("2021-12-20T08:01:07Z")
- pod1.status.phase = "Pending"
- pod1.status.reason = "Unschedulable"
- pods = MagicMock()
- pods.metadata._continue = None
- pods.items = [pod1]
- list_namespaced_pod.return_value = pods
- kubernetes_command.cleanup_pods(
- self.parser.parse_args(["kubernetes", "cleanup-pods",
"--namespace", "awesome-namespace"])
- )
- list_namespaced_pod.assert_called_once_with(
- namespace="awesome-namespace", limit=500,
label_selector=self.label_selector
- )
- delete_pod.assert_called_with("dummy5", "awesome-namespace")
- load_incluster_config.assert_called_once()
-
-
@mock.patch("airflow.cli.commands.hybrid_commands.kubernetes_command._delete_pod")
- @mock.patch("kubernetes.client.CoreV1Api.list_namespaced_pod")
- @mock.patch("kubernetes.config.load_incluster_config")
- def test_cleanup_api_exception_continue(self, load_incluster_config,
list_namespaced_pod, delete_pod):
- delete_pod.side_effect = kubernetes.client.rest.ApiException(status=0)
- pod1 = MagicMock()
- pod1.metadata.name = "dummy"
- pod1.metadata.creation_timestamp = parse("2021-12-20T08:01:07Z")
- pod1.status.phase = "Succeeded"
- pod1.status.reason = None
- pods = MagicMock()
- pods.metadata._continue = None
- pods.items = [pod1]
- list_namespaced_pod.return_value = pods
- kubernetes_command.cleanup_pods(
- self.parser.parse_args(["kubernetes", "cleanup-pods",
"--namespace", "awesome-namespace"])
- )
- list_namespaced_pod.assert_called_once_with(
- namespace="awesome-namespace", limit=500,
label_selector=self.label_selector
- )
- load_incluster_config.assert_called_once()
-
-
@mock.patch("airflow.cli.commands.hybrid_commands.kubernetes_command._delete_pod")
- @mock.patch("kubernetes.client.CoreV1Api.list_namespaced_pod")
- @mock.patch("kubernetes.config.load_incluster_config")
- def test_list_pod_with_continue_token(self, load_incluster_config,
list_namespaced_pod, delete_pod):
- pod1 = MagicMock()
- pod1.metadata.name = "dummy"
- pod1.metadata.creation_timestamp = parse("2021-12-20T08:01:07Z")
- pod1.status.phase = "Succeeded"
- pod1.status.reason = None
- pods = MagicMock()
- pods.metadata._continue = "dummy-token"
- pods.items = [pod1]
- next_pods = MagicMock()
- next_pods.metadata._continue = None
- next_pods.items = [pod1]
- list_namespaced_pod.side_effect = [pods, next_pods]
- kubernetes_command.cleanup_pods(
- self.parser.parse_args(["kubernetes", "cleanup-pods",
"--namespace", "awesome-namespace"])
- )
- calls = [
- call.first(namespace="awesome-namespace", limit=500,
label_selector=self.label_selector),
- call.second(
- namespace="awesome-namespace",
- limit=500,
- label_selector=self.label_selector,
- _continue="dummy-token",
- ),
- ]
- list_namespaced_pod.assert_has_calls(calls)
- delete_pod.assert_called_with("dummy", "awesome-namespace")
- load_incluster_config.assert_called_once()
diff --git a/tests/deprecations_ignore.yml b/tests/deprecations_ignore.yml
index 99e395bba92..51b6306f4c0 100644
--- a/tests/deprecations_ignore.yml
+++ b/tests/deprecations_ignore.yml
@@ -16,10 +16,6 @@
# under the License.
---
-# CLI
-# https://github.com/apache/airflow/issues/39199
--
tests/cli/commands/hybrid_commands/test_kubernetes_command.py::TestGenerateDagYamlCommand::test_generate_dag_yaml
-
# Core
- tests/models/test_dagbag.py::TestDagBag::test_load_subdags