This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git

The following commit(s) were added to refs/heads/main by this push:
     new e3897dcbed Remove compat code for 2.7.0 - its now the min Airflow version (#39591)
e3897dcbed is described below

commit e3897dcbed0262b0cab7a357f8d7fbbb6c4f4eeb
Author: Jed Cunningham <66968678+jedcunning...@users.noreply.github.com>
AuthorDate: Mon May 13 16:01:44 2024 -0600

    Remove compat code for 2.7.0 - its now the min Airflow version (#39591)
---
 .../providers/celery/executors/celery_executor.py  | 44 ++++++---------------
 .../kubernetes/executors/kubernetes_executor.py    | 46 +++++++---------------
 .../executors/kubernetes_executor_utils.py         | 19 +--------
 3 files changed, 29 insertions(+), 80 deletions(-)

diff --git a/airflow/providers/celery/executors/celery_executor.py b/airflow/providers/celery/executors/celery_executor.py
index d81e57a222..9f1948db52 100644
--- a/airflow/providers/celery/executors/celery_executor.py
+++ b/airflow/providers/celery/executors/celery_executor.py
@@ -37,37 +37,19 @@ from celery import states as celery_states
 from packaging.version import Version
 
 from airflow import __version__ as airflow_version
-
-try:
-    from airflow.cli.cli_config import (
-        ARG_DAEMON,
-        ARG_LOG_FILE,
-        ARG_PID,
-        ARG_SKIP_SERVE_LOGS,
-        ARG_STDERR,
-        ARG_STDOUT,
-        ARG_VERBOSE,
-        ActionCommand,
-        Arg,
-        GroupCommand,
-        lazy_load_command,
-    )
-except ImportError:
-    import packaging.version
-
-    from airflow.exceptions import AirflowOptionalProviderFeatureException
-
-    base_version = packaging.version.parse(airflow_version).base_version
-
-    if packaging.version.parse(base_version) < packaging.version.parse("2.7.0"):
-        raise AirflowOptionalProviderFeatureException(
-            "Celery Executor from Celery Provider should only be used with Airflow 2.7.0+.\n"
-            f"This is Airflow {airflow_version} and Celery and CeleryKubernetesExecutor are "
-            f"available in the 'airflow.executors' package. You should not use "
-            f"the provider's executors in this version of Airflow."
-        )
-    raise
-
+from airflow.cli.cli_config import (
+    ARG_DAEMON,
+    ARG_LOG_FILE,
+    ARG_PID,
+    ARG_SKIP_SERVE_LOGS,
+    ARG_STDERR,
+    ARG_STDOUT,
+    ARG_VERBOSE,
+    ActionCommand,
+    Arg,
+    GroupCommand,
+    lazy_load_command,
+)
 from airflow.configuration import conf
 from airflow.exceptions import AirflowTaskTimeout
 from airflow.executors.base_executor import BaseExecutor
diff --git a/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py b/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
index f62f021fd2..43cefeb9c4 100644
--- a/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
+++ b/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
@@ -38,38 +38,18 @@ from typing import TYPE_CHECKING, Any, Sequence
 from kubernetes.dynamic import DynamicClient
 from sqlalchemy import select, update
 
-from airflow.providers.cncf.kubernetes.pod_generator import PodMutationHookException, PodReconciliationError
-from airflow.stats import Stats
-
-try:
-    from airflow.cli.cli_config import (
-        ARG_DAG_ID,
-        ARG_EXECUTION_DATE,
-        ARG_OUTPUT_PATH,
-        ARG_SUBDIR,
-        ARG_VERBOSE,
-        ActionCommand,
-        Arg,
-        GroupCommand,
-        lazy_load_command,
-        positive_int,
-    )
-except ImportError:
-    import packaging.version
-
-    from airflow import __version__ as airflow_version
-    from airflow.exceptions import AirflowOptionalProviderFeatureException
-
-    base_version = packaging.version.parse(airflow_version).base_version
-
-    if packaging.version.parse(base_version) < packaging.version.parse("2.7.0"):
-        raise AirflowOptionalProviderFeatureException(
-            "Kubernetes Executor from CNCF Provider should only be used with Airflow 2.7.0+.\n"
-            f"This is Airflow {airflow_version} and Kubernetes and CeleryKubernetesExecutor are "
-            f"available in the 'airflow.executors' package. You should not use "
-            f"the provider's executors in this version of Airflow."
-        )
-    raise
+from airflow.cli.cli_config import (
+    ARG_DAG_ID,
+    ARG_EXECUTION_DATE,
+    ARG_OUTPUT_PATH,
+    ARG_SUBDIR,
+    ARG_VERBOSE,
+    ActionCommand,
+    Arg,
+    GroupCommand,
+    lazy_load_command,
+    positive_int,
+)
 from airflow.configuration import conf
 from airflow.executors.base_executor import BaseExecutor
 from airflow.providers.cncf.kubernetes.executors.kubernetes_executor_types import (
@@ -78,6 +58,8 @@ from airflow.providers.cncf.kubernetes.executors.kubernetes_executor_types impor
 )
 from airflow.providers.cncf.kubernetes.kube_config import KubeConfig
 from airflow.providers.cncf.kubernetes.kubernetes_helper_functions import annotations_to_key
+from airflow.providers.cncf.kubernetes.pod_generator import PodMutationHookException, PodReconciliationError
+from airflow.stats import Stats
 from airflow.utils.event_scheduler import EventScheduler
 from airflow.utils.log.logging_mixin import remove_escape_codes
 from airflow.utils.session import NEW_SESSION, provide_session
diff --git a/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py b/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py
index 3f544fe2fe..d26df876ef 100644
--- a/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py
+++ b/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py
@@ -21,7 +21,7 @@ import json
 import multiprocessing
 import time
 from queue import Empty, Queue
-from typing import TYPE_CHECKING, Any, Generic, TypeVar
+from typing import TYPE_CHECKING, Any
 
 from kubernetes import client, watch
 from kubernetes.client.rest import ApiException
@@ -36,6 +36,7 @@ from airflow.providers.cncf.kubernetes.kubernetes_helper_functions import (
 )
 from airflow.providers.cncf.kubernetes.pod_generator import PodGenerator
 from airflow.utils.log.logging_mixin import LoggingMixin
+from airflow.utils.singleton import Singleton
 from airflow.utils.state import TaskInstanceState
 
 try:
@@ -60,22 +61,6 @@ if TYPE_CHECKING:
         KubernetesWatchType,
     )
 
-# Singleton here is duplicated version of airflow.utils.singleton.Singleton until
-# min-airflow version is 2.7.0 for the provider. then it can be imported from airflow.utils.singleton.
-
-T = TypeVar("T")
-
-
-class Singleton(type, Generic[T]):
-    """Metaclass that allows to implement singleton pattern."""
-
-    _instances: dict[Singleton[T], T] = {}
-
-    def __call__(cls: Singleton[T], *args, **kwargs) -> T:
-        if cls not in cls._instances:
-            cls._instances[cls] = super().__call__(*args, **kwargs)
-        return cls._instances[cls]
-
 
 class ResourceVersion(metaclass=Singleton):
     """Singleton for tracking resourceVersion from Kubernetes."""