This is an automated email from the ASF dual-hosted git repository. jedcunningham pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push: new cc417216b2a Remove deprecated `kubernetes` commands from core (#44826) cc417216b2a is described below commit cc417216b2aca3dbe2ce8065cfe7dd4a93dc2a0b Author: Jed Cunningham <66968678+jedcunning...@users.noreply.github.com> AuthorDate: Wed Dec 11 10:45:24 2024 -0700 Remove deprecated `kubernetes` commands from core (#44826) These are deprecated in core and moved to the provider. We should remove them for Airflow 3. --- .github/boring-cyborg.yml | 2 +- airflow/cli/cli_config.py | 37 --- airflow/cli/commands/hybrid_commands/__init__.py | 16 -- .../commands/hybrid_commands/kubernetes_command.py | 171 ------------- .../check_cncf_k8s_used_for_k8s_executor_only.py | 7 +- scripts/cov/cli_coverage.py | 1 - tests/cli/commands/hybrid_commands/__init__.py | 16 -- .../hybrid_commands/test_kubernetes_command.py | 274 --------------------- tests/deprecations_ignore.yml | 4 - 9 files changed, 3 insertions(+), 525 deletions(-) diff --git a/.github/boring-cyborg.yml b/.github/boring-cyborg.yml index e48a6537fab..c1b3b63d097 100644 --- a/.github/boring-cyborg.yml +++ b/.github/boring-cyborg.yml @@ -155,8 +155,8 @@ labelPRBasedOnFilePath: - providers/tests/cloudant/**/* provider:cncf-kubernetes: - - airflow/**/kubernetes_*.py - airflow/example_dags/example_kubernetes_executor.py + - airflow/example_dags/example_local_kubernetes_executor.py - providers/src/airflow/providers/cncf/kubernetes/**/* - providers/src/airflow/providers/celery/executors/celery_kubernetes_executor.py - docs/apache-airflow-providers-cncf-kubernetes/**/* diff --git a/airflow/cli/cli_config.py b/airflow/cli/cli_config.py index 0e5dbad1f53..503397064cb 100644 --- a/airflow/cli/cli_config.py +++ b/airflow/cli/cli_config.py @@ -862,23 +862,6 @@ ARG_OPTIONAL_SECTION = Arg( help="The section name", ) -# kubernetes cleanup-pods -ARG_NAMESPACE = Arg( - ("--namespace",), - default=conf.get("kubernetes_executor", "namespace"), - help="Kubernetes Namespace. Default value is `[kubernetes] namespace` in configuration.", -) - -ARG_MIN_PENDING_MINUTES = Arg( - ("--min-pending-minutes",), - default=30, - type=positive_int(allow_zero=False), - help=( - "Pending pods created before the time interval are to be cleaned up, " - "measured in minutes. Default value is 30(m). The minimum value is 5(m)." - ), -) - # jobs check ARG_JOB_TYPE_FILTER = Arg( ("--job-type",), @@ -1752,26 +1735,6 @@ CONFIG_COMMANDS = ( ), ) -KUBERNETES_COMMANDS = ( - ActionCommand( - name="cleanup-pods", - help=( - "Clean up Kubernetes pods " - "(created by KubernetesExecutor/KubernetesPodOperator) " - "in evicted/failed/succeeded/pending states" - ), - func=lazy_load_command("airflow.providers.cncf.kubernetes.cli.kubernetes_command.cleanup_pods"), - args=(ARG_NAMESPACE, ARG_MIN_PENDING_MINUTES, ARG_VERBOSE), - ), - ActionCommand( - name="generate-dag-yaml", - help="Generate YAML files for all tasks in DAG. Useful for debugging tasks without " - "launching into a cluster", - func=lazy_load_command("airflow.providers.cncf.kubernetes.cli.kubernetes_command.generate_pod_yaml"), - args=(ARG_DAG_ID, ARG_LOGICAL_DATE, ARG_SUBDIR, ARG_OUTPUT_PATH, ARG_VERBOSE), - ), -) - JOBS_COMMANDS = ( ActionCommand( name="check", diff --git a/airflow/cli/commands/hybrid_commands/__init__.py b/airflow/cli/commands/hybrid_commands/__init__.py deleted file mode 100644 index 13a83393a91..00000000000 --- a/airflow/cli/commands/hybrid_commands/__init__.py +++ /dev/null @@ -1,16 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. diff --git a/airflow/cli/commands/hybrid_commands/kubernetes_command.py b/airflow/cli/commands/hybrid_commands/kubernetes_command.py deleted file mode 100644 index 8f5f7333b7f..00000000000 --- a/airflow/cli/commands/hybrid_commands/kubernetes_command.py +++ /dev/null @@ -1,171 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -"""Kubernetes sub-commands.""" - -from __future__ import annotations - -import os -import sys -import warnings -from datetime import datetime, timedelta - -from kubernetes import client -from kubernetes.client.api_client import ApiClient -from kubernetes.client.rest import ApiException - -from airflow.models import DagRun, TaskInstance -from airflow.providers.cncf.kubernetes import pod_generator -from airflow.providers.cncf.kubernetes.executors.kubernetes_executor import KubeConfig -from airflow.providers.cncf.kubernetes.kube_client import get_kube_client -from airflow.providers.cncf.kubernetes.kubernetes_helper_functions import create_unique_id -from airflow.providers.cncf.kubernetes.pod_generator import PodGenerator -from airflow.utils import cli as cli_utils, yaml -from airflow.utils.cli import get_dag -from airflow.utils.providers_configuration_loader import providers_configuration_loaded - -warnings.warn( - "Use kubernetes command from providers package, Use cncf.kubernetes provider >= 8.2.1", - DeprecationWarning, - stacklevel=2, -) - - -@cli_utils.action_cli -@providers_configuration_loaded -def generate_pod_yaml(args): - """Generate yaml files for each task in the DAG. Used for testing output of KubernetesExecutor.""" - logical_date = args.logical_date - dag = get_dag(subdir=args.subdir, dag_id=args.dag_id) - yaml_output_path = args.output_path - dr = DagRun(dag.dag_id, logical_date=logical_date) - kube_config = KubeConfig() - for task in dag.tasks: - ti = TaskInstance(task, None) - ti.dag_run = dr - pod = PodGenerator.construct_pod( - dag_id=args.dag_id, - task_id=ti.task_id, - pod_id=create_unique_id(args.dag_id, ti.task_id), - try_number=ti.try_number, - kube_image=kube_config.kube_image, - date=ti.logical_date, - args=ti.command_as_list(), - pod_override_object=PodGenerator.from_obj(ti.executor_config), - scheduler_job_id="worker-config", - namespace=kube_config.executor_namespace, - base_worker_pod=PodGenerator.deserialize_model_file(kube_config.pod_template_file), - with_mutation_hook=True, - ) - api_client = ApiClient() - date_string = pod_generator.datetime_to_label_safe_datestring(logical_date) - yaml_file_name = f"{args.dag_id}_{ti.task_id}_{date_string}.yml" - os.makedirs(os.path.dirname(yaml_output_path + "/airflow_yaml_output/"), exist_ok=True) - with open(yaml_output_path + "/airflow_yaml_output/" + yaml_file_name, "w") as output: - sanitized_pod = api_client.sanitize_for_serialization(pod) - output.write(yaml.dump(sanitized_pod)) - print(f"YAML output can be found at {yaml_output_path}/airflow_yaml_output/") - - -@cli_utils.action_cli -@providers_configuration_loaded -def cleanup_pods(args): - """Clean up k8s pods in evicted/failed/succeeded/pending states.""" - namespace = args.namespace - - min_pending_minutes = args.min_pending_minutes - # protect newly created pods from deletion - if min_pending_minutes < 5: - min_pending_minutes = 5 - - # https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle/ - # All Containers in the Pod have terminated in success, and will not be restarted. - pod_succeeded = "succeeded" - - # The Pod has been accepted by the Kubernetes cluster, - # but one or more of the containers has not been set up and made ready to run. - pod_pending = "pending" - - # All Containers in the Pod have terminated, and at least one Container has terminated in failure. - # That is, the Container either exited with non-zero status or was terminated by the system. - pod_failed = "failed" - - # https://kubernetes.io/docs/tasks/administer-cluster/out-of-resource/ - pod_reason_evicted = "evicted" - # If pod is failed and restartPolicy is: - # * Always: Restart Container; Pod phase stays Running. - # * OnFailure: Restart Container; Pod phase stays Running. - # * Never: Pod phase becomes Failed. - pod_restart_policy_never = "never" - - print("Loading Kubernetes configuration") - kube_client = get_kube_client() - print(f"Listing pods in namespace {namespace}") - airflow_pod_labels = [ - "dag_id", - "task_id", - "try_number", - "airflow_version", - ] - list_kwargs = {"namespace": namespace, "limit": 500, "label_selector": ",".join(airflow_pod_labels)} - - while True: - pod_list = kube_client.list_namespaced_pod(**list_kwargs) - for pod in pod_list.items: - pod_name = pod.metadata.name - print(f"Inspecting pod {pod_name}") - pod_phase = pod.status.phase.lower() - pod_reason = pod.status.reason.lower() if pod.status.reason else "" - pod_restart_policy = pod.spec.restart_policy.lower() - current_time = datetime.now(pod.metadata.creation_timestamp.tzinfo) - - if ( - pod_phase == pod_succeeded - or (pod_phase == pod_failed and pod_restart_policy == pod_restart_policy_never) - or (pod_reason == pod_reason_evicted) - or ( - pod_phase == pod_pending - and current_time - pod.metadata.creation_timestamp - > timedelta(minutes=min_pending_minutes) - ) - ): - print( - f'Deleting pod "{pod_name}" phase "{pod_phase}" and reason "{pod_reason}", ' - f'restart policy "{pod_restart_policy}"' - ) - try: - _delete_pod(pod.metadata.name, namespace) - except ApiException as e: - print(f"Can't remove POD: {e}", file=sys.stderr) - else: - print(f"No action taken on pod {pod_name}") - continue_token = pod_list.metadata._continue - if not continue_token: - break - list_kwargs["_continue"] = continue_token - - -def _delete_pod(name, namespace): - """ - Delete a namespaced pod. - - Helper Function for cleanup_pods. - """ - kube_client = get_kube_client() - delete_options = client.V1DeleteOptions() - print(f'Deleting POD "{name}" from "{namespace}" namespace') - api_response = kube_client.delete_namespaced_pod(name=name, namespace=namespace, body=delete_options) - print(api_response) diff --git a/scripts/ci/pre_commit/check_cncf_k8s_used_for_k8s_executor_only.py b/scripts/ci/pre_commit/check_cncf_k8s_used_for_k8s_executor_only.py index 6d2355ede4c..a452c648cdf 100755 --- a/scripts/ci/pre_commit/check_cncf_k8s_used_for_k8s_executor_only.py +++ b/scripts/ci/pre_commit/check_cncf_k8s_used_for_k8s_executor_only.py @@ -51,8 +51,6 @@ def get_imports(path: str): errors: list[str] = [] -EXCEPTIONS = ["airflow/cli/commands/hybrid_commands/kubernetes_command.py"] - def main() -> int: for path in sys.argv[1:]: @@ -62,9 +60,8 @@ def main() -> int: import_count += 1 if len(imp.module) > 3: if imp.module[:4] == ["airflow", "providers", "cncf", "kubernetes"]: - if path not in EXCEPTIONS: - local_error_count += 1 - errors.append(f"{path}: ({'.'.join(imp.module)})") + local_error_count += 1 + errors.append(f"{path}: ({'.'.join(imp.module)})") console.print(f"[blue]{path}:[/] Import count: {import_count}, error_count {local_error_count}") if errors: console.print( diff --git a/scripts/cov/cli_coverage.py b/scripts/cov/cli_coverage.py index b47b7513609..b8b37423f7a 100644 --- a/scripts/cov/cli_coverage.py +++ b/scripts/cov/cli_coverage.py @@ -38,7 +38,6 @@ files_not_fully_covered = [ "airflow/cli/commands/local_commands/db_command.py", "airflow/cli/commands/local_commands/info_command.py", "airflow/cli/commands/remote_commands/jobs_command.py", - "airflow/cli/commands/hybrid_commands/kubernetes_command.py", "airflow/cli/commands/local_commands/plugins_command.py", "airflow/cli/commands/remote_commands/pool_command.py", "airflow/cli/commands/remote_commands/provider_command.py", diff --git a/tests/cli/commands/hybrid_commands/__init__.py b/tests/cli/commands/hybrid_commands/__init__.py deleted file mode 100644 index 13a83393a91..00000000000 --- a/tests/cli/commands/hybrid_commands/__init__.py +++ /dev/null @@ -1,16 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. diff --git a/tests/cli/commands/hybrid_commands/test_kubernetes_command.py b/tests/cli/commands/hybrid_commands/test_kubernetes_command.py deleted file mode 100644 index 838bf8fc074..00000000000 --- a/tests/cli/commands/hybrid_commands/test_kubernetes_command.py +++ /dev/null @@ -1,274 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -from __future__ import annotations - -import importlib -import os -from unittest import mock -from unittest.mock import MagicMock, call - -import kubernetes -import pytest -from dateutil.parser import parse - -from airflow.cli import cli_parser -from airflow.cli.commands.hybrid_commands import kubernetes_command -from airflow.executors import executor_loader - -from tests_common.test_utils.config import conf_vars - -pytestmark = pytest.mark.db_test - - -class TestGenerateDagYamlCommand: - @classmethod - def setup_class(cls): - with conf_vars({("core", "executor"): "KubernetesExecutor"}): - importlib.reload(executor_loader) - importlib.reload(cli_parser) - cls.parser = cli_parser.get_parser() - - def test_generate_dag_yaml(self, tmp_path): - path = tmp_path / "miscellaneous_test_dag_run_after_loop_2020-11-03T00_00_00_plus_00_00.yml" - kubernetes_command.generate_pod_yaml( - self.parser.parse_args( - [ - "kubernetes", - "generate-dag-yaml", - "miscellaneous_test_dag", - "2020-11-03", - "--output-path", - os.fspath(path.parent), - ] - ) - ) - assert sum(1 for _ in path.parent.iterdir()) == 1 - output_path = path.parent / "airflow_yaml_output" - assert sum(1 for _ in output_path.iterdir()) == 6 - assert os.path.isfile(output_path / path.name) - assert (output_path / path.name).stat().st_size > 0 - - -class TestCleanUpPodsCommand: - label_selector = "dag_id,task_id,try_number,airflow_version" - - @classmethod - def setup_class(cls): - with conf_vars({("core", "executor"): "KubernetesExecutor"}): - importlib.reload(executor_loader) - importlib.reload(cli_parser) - cls.parser = cli_parser.get_parser() - - @mock.patch("kubernetes.client.CoreV1Api.delete_namespaced_pod") - @mock.patch("airflow.providers.cncf.kubernetes.kube_client.config.load_incluster_config") - def test_delete_pod(self, load_incluster_config, delete_namespaced_pod): - kubernetes_command._delete_pod("dummy", "awesome-namespace") - delete_namespaced_pod.assert_called_with(body=mock.ANY, name="dummy", namespace="awesome-namespace") - load_incluster_config.assert_called_once() - - @mock.patch("airflow.cli.commands.hybrid_commands.kubernetes_command._delete_pod") - @mock.patch("kubernetes.client.CoreV1Api.list_namespaced_pod") - @mock.patch("airflow.providers.cncf.kubernetes.kube_client.config.load_incluster_config") - def test_running_pods_are_not_cleaned(self, load_incluster_config, list_namespaced_pod, delete_pod): - pod1 = MagicMock() - pod1.metadata.name = "dummy" - pod1.metadata.creation_timestamp = parse("2021-12-20T08:01:07Z") - pod1.status.phase = "Running" - pod1.status.reason = None - pods = MagicMock() - pods.metadata._continue = None - pods.items = [pod1] - list_namespaced_pod.return_value = pods - kubernetes_command.cleanup_pods( - self.parser.parse_args(["kubernetes", "cleanup-pods", "--namespace", "awesome-namespace"]) - ) - list_namespaced_pod.assert_called_once_with( - namespace="awesome-namespace", limit=500, label_selector=self.label_selector - ) - delete_pod.assert_not_called() - load_incluster_config.assert_called_once() - - @mock.patch("airflow.cli.commands.hybrid_commands.kubernetes_command._delete_pod") - @mock.patch("kubernetes.client.CoreV1Api.list_namespaced_pod") - @mock.patch("airflow.providers.cncf.kubernetes.kube_client.config.load_incluster_config") - def test_cleanup_succeeded_pods(self, load_incluster_config, list_namespaced_pod, delete_pod): - pod1 = MagicMock() - pod1.metadata.name = "dummy" - pod1.metadata.creation_timestamp = parse("2021-12-20T08:01:07Z") - pod1.status.phase = "Succeeded" - pod1.status.reason = None - pods = MagicMock() - pods.metadata._continue = None - pods.items = [pod1] - list_namespaced_pod.return_value = pods - kubernetes_command.cleanup_pods( - self.parser.parse_args(["kubernetes", "cleanup-pods", "--namespace", "awesome-namespace"]) - ) - list_namespaced_pod.assert_called_once_with( - namespace="awesome-namespace", limit=500, label_selector=self.label_selector - ) - delete_pod.assert_called_with("dummy", "awesome-namespace") - load_incluster_config.assert_called_once() - - @mock.patch("airflow.cli.commands.hybrid_commands.kubernetes_command._delete_pod") - @mock.patch("kubernetes.client.CoreV1Api.list_namespaced_pod") - @mock.patch("kubernetes.config.load_incluster_config") - def test_no_cleanup_failed_pods_wo_restart_policy_never( - self, load_incluster_config, list_namespaced_pod, delete_pod - ): - pod1 = MagicMock() - pod1.metadata.name = "dummy2" - pod1.metadata.creation_timestamp = parse("2021-12-20T08:01:07Z") - pod1.status.phase = "Failed" - pod1.status.reason = None - pod1.spec.restart_policy = "Always" - pods = MagicMock() - pods.metadata._continue = None - pods.items = [pod1] - list_namespaced_pod.return_value = pods - kubernetes_command.cleanup_pods( - self.parser.parse_args(["kubernetes", "cleanup-pods", "--namespace", "awesome-namespace"]) - ) - list_namespaced_pod.assert_called_once_with( - namespace="awesome-namespace", limit=500, label_selector=self.label_selector - ) - delete_pod.assert_not_called() - load_incluster_config.assert_called_once() - - @mock.patch("airflow.cli.commands.hybrid_commands.kubernetes_command._delete_pod") - @mock.patch("kubernetes.client.CoreV1Api.list_namespaced_pod") - @mock.patch("kubernetes.config.load_incluster_config") - def test_cleanup_failed_pods_w_restart_policy_never( - self, load_incluster_config, list_namespaced_pod, delete_pod - ): - pod1 = MagicMock() - pod1.metadata.name = "dummy3" - pod1.metadata.creation_timestamp = parse("2021-12-20T08:01:07Z") - pod1.status.phase = "Failed" - pod1.status.reason = None - pod1.spec.restart_policy = "Never" - pods = MagicMock() - pods.metadata._continue = None - pods.items = [pod1] - list_namespaced_pod.return_value = pods - kubernetes_command.cleanup_pods( - self.parser.parse_args(["kubernetes", "cleanup-pods", "--namespace", "awesome-namespace"]) - ) - list_namespaced_pod.assert_called_once_with( - namespace="awesome-namespace", limit=500, label_selector=self.label_selector - ) - delete_pod.assert_called_with("dummy3", "awesome-namespace") - load_incluster_config.assert_called_once() - - @mock.patch("airflow.cli.commands.hybrid_commands.kubernetes_command._delete_pod") - @mock.patch("kubernetes.client.CoreV1Api.list_namespaced_pod") - @mock.patch("kubernetes.config.load_incluster_config") - def test_cleanup_evicted_pods(self, load_incluster_config, list_namespaced_pod, delete_pod): - pod1 = MagicMock() - pod1.metadata.name = "dummy4" - pod1.metadata.creation_timestamp = parse("2021-12-20T08:01:07Z") - pod1.status.phase = "Failed" - pod1.status.reason = "Evicted" - pod1.spec.restart_policy = "Never" - pods = MagicMock() - pods.metadata._continue = None - pods.items = [pod1] - list_namespaced_pod.return_value = pods - kubernetes_command.cleanup_pods( - self.parser.parse_args(["kubernetes", "cleanup-pods", "--namespace", "awesome-namespace"]) - ) - list_namespaced_pod.assert_called_once_with( - namespace="awesome-namespace", limit=500, label_selector=self.label_selector - ) - delete_pod.assert_called_with("dummy4", "awesome-namespace") - load_incluster_config.assert_called_once() - - @mock.patch("airflow.cli.commands.hybrid_commands.kubernetes_command._delete_pod") - @mock.patch("kubernetes.client.CoreV1Api.list_namespaced_pod") - @mock.patch("kubernetes.config.load_incluster_config") - def test_cleanup_pending_pods(self, load_incluster_config, list_namespaced_pod, delete_pod): - pod1 = MagicMock() - pod1.metadata.name = "dummy5" - pod1.metadata.creation_timestamp = parse("2021-12-20T08:01:07Z") - pod1.status.phase = "Pending" - pod1.status.reason = "Unschedulable" - pods = MagicMock() - pods.metadata._continue = None - pods.items = [pod1] - list_namespaced_pod.return_value = pods - kubernetes_command.cleanup_pods( - self.parser.parse_args(["kubernetes", "cleanup-pods", "--namespace", "awesome-namespace"]) - ) - list_namespaced_pod.assert_called_once_with( - namespace="awesome-namespace", limit=500, label_selector=self.label_selector - ) - delete_pod.assert_called_with("dummy5", "awesome-namespace") - load_incluster_config.assert_called_once() - - @mock.patch("airflow.cli.commands.hybrid_commands.kubernetes_command._delete_pod") - @mock.patch("kubernetes.client.CoreV1Api.list_namespaced_pod") - @mock.patch("kubernetes.config.load_incluster_config") - def test_cleanup_api_exception_continue(self, load_incluster_config, list_namespaced_pod, delete_pod): - delete_pod.side_effect = kubernetes.client.rest.ApiException(status=0) - pod1 = MagicMock() - pod1.metadata.name = "dummy" - pod1.metadata.creation_timestamp = parse("2021-12-20T08:01:07Z") - pod1.status.phase = "Succeeded" - pod1.status.reason = None - pods = MagicMock() - pods.metadata._continue = None - pods.items = [pod1] - list_namespaced_pod.return_value = pods - kubernetes_command.cleanup_pods( - self.parser.parse_args(["kubernetes", "cleanup-pods", "--namespace", "awesome-namespace"]) - ) - list_namespaced_pod.assert_called_once_with( - namespace="awesome-namespace", limit=500, label_selector=self.label_selector - ) - load_incluster_config.assert_called_once() - - @mock.patch("airflow.cli.commands.hybrid_commands.kubernetes_command._delete_pod") - @mock.patch("kubernetes.client.CoreV1Api.list_namespaced_pod") - @mock.patch("kubernetes.config.load_incluster_config") - def test_list_pod_with_continue_token(self, load_incluster_config, list_namespaced_pod, delete_pod): - pod1 = MagicMock() - pod1.metadata.name = "dummy" - pod1.metadata.creation_timestamp = parse("2021-12-20T08:01:07Z") - pod1.status.phase = "Succeeded" - pod1.status.reason = None - pods = MagicMock() - pods.metadata._continue = "dummy-token" - pods.items = [pod1] - next_pods = MagicMock() - next_pods.metadata._continue = None - next_pods.items = [pod1] - list_namespaced_pod.side_effect = [pods, next_pods] - kubernetes_command.cleanup_pods( - self.parser.parse_args(["kubernetes", "cleanup-pods", "--namespace", "awesome-namespace"]) - ) - calls = [ - call.first(namespace="awesome-namespace", limit=500, label_selector=self.label_selector), - call.second( - namespace="awesome-namespace", - limit=500, - label_selector=self.label_selector, - _continue="dummy-token", - ), - ] - list_namespaced_pod.assert_has_calls(calls) - delete_pod.assert_called_with("dummy", "awesome-namespace") - load_incluster_config.assert_called_once() diff --git a/tests/deprecations_ignore.yml b/tests/deprecations_ignore.yml index 99e395bba92..51b6306f4c0 100644 --- a/tests/deprecations_ignore.yml +++ b/tests/deprecations_ignore.yml @@ -16,10 +16,6 @@ # under the License. --- -# CLI -# https://github.com/apache/airflow/issues/39199 -- tests/cli/commands/hybrid_commands/test_kubernetes_command.py::TestGenerateDagYamlCommand::test_generate_dag_yaml - # Core - tests/models/test_dagbag.py::TestDagBag::test_load_subdags