This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new e3897dcbed Remove compat code for 2.7.0 - its now the min Airflow 
version (#39591)
e3897dcbed is described below

commit e3897dcbed0262b0cab7a357f8d7fbbb6c4f4eeb
Author: Jed Cunningham <66968678+jedcunning...@users.noreply.github.com>
AuthorDate: Mon May 13 16:01:44 2024 -0600

    Remove compat code for 2.7.0 - its now the min Airflow version (#39591)
---
 .../providers/celery/executors/celery_executor.py  | 44 ++++++---------------
 .../kubernetes/executors/kubernetes_executor.py    | 46 +++++++---------------
 .../executors/kubernetes_executor_utils.py         | 19 +--------
 3 files changed, 29 insertions(+), 80 deletions(-)

diff --git a/airflow/providers/celery/executors/celery_executor.py 
b/airflow/providers/celery/executors/celery_executor.py
index d81e57a222..9f1948db52 100644
--- a/airflow/providers/celery/executors/celery_executor.py
+++ b/airflow/providers/celery/executors/celery_executor.py
@@ -37,37 +37,19 @@ from celery import states as celery_states
 from packaging.version import Version
 
 from airflow import __version__ as airflow_version
-
-try:
-    from airflow.cli.cli_config import (
-        ARG_DAEMON,
-        ARG_LOG_FILE,
-        ARG_PID,
-        ARG_SKIP_SERVE_LOGS,
-        ARG_STDERR,
-        ARG_STDOUT,
-        ARG_VERBOSE,
-        ActionCommand,
-        Arg,
-        GroupCommand,
-        lazy_load_command,
-    )
-except ImportError:
-    import packaging.version
-
-    from airflow.exceptions import AirflowOptionalProviderFeatureException
-
-    base_version = packaging.version.parse(airflow_version).base_version
-
-    if packaging.version.parse(base_version) < 
packaging.version.parse("2.7.0"):
-        raise AirflowOptionalProviderFeatureException(
-            "Celery Executor from Celery Provider should only be used with 
Airflow 2.7.0+.\n"
-            f"This is Airflow {airflow_version} and Celery and 
CeleryKubernetesExecutor are "
-            f"available in the 'airflow.executors' package. You should not use 
"
-            f"the provider's executors in this version of Airflow."
-        )
-    raise
-
+from airflow.cli.cli_config import (
+    ARG_DAEMON,
+    ARG_LOG_FILE,
+    ARG_PID,
+    ARG_SKIP_SERVE_LOGS,
+    ARG_STDERR,
+    ARG_STDOUT,
+    ARG_VERBOSE,
+    ActionCommand,
+    Arg,
+    GroupCommand,
+    lazy_load_command,
+)
 from airflow.configuration import conf
 from airflow.exceptions import AirflowTaskTimeout
 from airflow.executors.base_executor import BaseExecutor
diff --git a/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py 
b/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
index f62f021fd2..43cefeb9c4 100644
--- a/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
+++ b/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
@@ -38,38 +38,18 @@ from typing import TYPE_CHECKING, Any, Sequence
 from kubernetes.dynamic import DynamicClient
 from sqlalchemy import select, update
 
-from airflow.providers.cncf.kubernetes.pod_generator import 
PodMutationHookException, PodReconciliationError
-from airflow.stats import Stats
-
-try:
-    from airflow.cli.cli_config import (
-        ARG_DAG_ID,
-        ARG_EXECUTION_DATE,
-        ARG_OUTPUT_PATH,
-        ARG_SUBDIR,
-        ARG_VERBOSE,
-        ActionCommand,
-        Arg,
-        GroupCommand,
-        lazy_load_command,
-        positive_int,
-    )
-except ImportError:
-    import packaging.version
-
-    from airflow import __version__ as airflow_version
-    from airflow.exceptions import AirflowOptionalProviderFeatureException
-
-    base_version = packaging.version.parse(airflow_version).base_version
-
-    if packaging.version.parse(base_version) < 
packaging.version.parse("2.7.0"):
-        raise AirflowOptionalProviderFeatureException(
-            "Kubernetes Executor from CNCF Provider should only be used with 
Airflow 2.7.0+.\n"
-            f"This is Airflow {airflow_version} and Kubernetes and 
CeleryKubernetesExecutor are "
-            f"available in the 'airflow.executors' package. You should not use 
"
-            f"the provider's executors in this version of Airflow."
-        )
-    raise
+from airflow.cli.cli_config import (
+    ARG_DAG_ID,
+    ARG_EXECUTION_DATE,
+    ARG_OUTPUT_PATH,
+    ARG_SUBDIR,
+    ARG_VERBOSE,
+    ActionCommand,
+    Arg,
+    GroupCommand,
+    lazy_load_command,
+    positive_int,
+)
 from airflow.configuration import conf
 from airflow.executors.base_executor import BaseExecutor
 from airflow.providers.cncf.kubernetes.executors.kubernetes_executor_types 
import (
@@ -78,6 +58,8 @@ from 
airflow.providers.cncf.kubernetes.executors.kubernetes_executor_types impor
 )
 from airflow.providers.cncf.kubernetes.kube_config import KubeConfig
 from airflow.providers.cncf.kubernetes.kubernetes_helper_functions import 
annotations_to_key
+from airflow.providers.cncf.kubernetes.pod_generator import 
PodMutationHookException, PodReconciliationError
+from airflow.stats import Stats
 from airflow.utils.event_scheduler import EventScheduler
 from airflow.utils.log.logging_mixin import remove_escape_codes
 from airflow.utils.session import NEW_SESSION, provide_session
diff --git 
a/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py 
b/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py
index 3f544fe2fe..d26df876ef 100644
--- a/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py
+++ b/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py
@@ -21,7 +21,7 @@ import json
 import multiprocessing
 import time
 from queue import Empty, Queue
-from typing import TYPE_CHECKING, Any, Generic, TypeVar
+from typing import TYPE_CHECKING, Any
 
 from kubernetes import client, watch
 from kubernetes.client.rest import ApiException
@@ -36,6 +36,7 @@ from 
airflow.providers.cncf.kubernetes.kubernetes_helper_functions import (
 )
 from airflow.providers.cncf.kubernetes.pod_generator import PodGenerator
 from airflow.utils.log.logging_mixin import LoggingMixin
+from airflow.utils.singleton import Singleton
 from airflow.utils.state import TaskInstanceState
 
 try:
@@ -60,22 +61,6 @@ if TYPE_CHECKING:
         KubernetesWatchType,
     )
 
-# Singleton here is duplicated version of airflow.utils.singleton.Singleton 
until
-# min-airflow version is 2.7.0 for the provider. then it can be imported from 
airflow.utils.singleton.
-
-T = TypeVar("T")
-
-
-class Singleton(type, Generic[T]):
-    """Metaclass that allows to implement singleton pattern."""
-
-    _instances: dict[Singleton[T], T] = {}
-
-    def __call__(cls: Singleton[T], *args, **kwargs) -> T:
-        if cls not in cls._instances:
-            cls._instances[cls] = super().__call__(*args, **kwargs)
-        return cls._instances[cls]
-
 
 class ResourceVersion(metaclass=Singleton):
     """Singleton for tracking resourceVersion from Kubernetes."""

Reply via email to