This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new ee200f8a363 Remove AIRFLOW_2_10_PLUS conditions (#49877)
ee200f8a363 is described below

commit ee200f8a363dad626f142fb109d9fc01adcd7947
Author: Jarek Potiuk <[email protected]>
AuthorDate: Tue Apr 29 00:17:07 2025 +0200

    Remove AIRFLOW_2_10_PLUS conditions (#49877)
    
    After updating providers to 2.10+ we can remove all conditions for
    2.10+.
---
 .../tests/unit/listeners/class_listener.py         |  35 +---
 devel-common/src/tests_common/test_utils/compat.py |  34 ++--
 devel-common/src/tests_common/test_utils/db.py     |   9 +-
 .../src/tests_common/test_utils/version_compat.py  |   1 -
 .../providers/amazon/aws/hooks/redshift_sql.py     |   5 +-
 .../src/airflow/providers/amazon/version_compat.py |   1 -
 .../amazon/aws/example_sagemaker_unified_studio.py |   5 -
 .../amazon/aws/executors/ecs/test_ecs_executor.py  |   3 +-
 .../amazon/tests/unit/amazon/aws/hooks/test_s3.py  |  25 +--
 .../amazon/aws/log/test_cloudwatch_task_handler.py |  22 +--
 .../unit/amazon/aws/operators/test_redshift_sql.py | 201 +++++++++------------
 .../providers/apache/spark/version_compat.py       |   1 -
 .../tests/unit/celery/cli/test_celery_command.py   |   9 +-
 .../unit/celery/executors/test_celery_executor.py  |   6 +-
 .../providers/cncf/kubernetes/version_compat.py    |   1 -
 .../providers/common/compat/assets/__init__.py     |   9 +-
 .../providers/common/compat/lineage/hook.py        |   6 +-
 .../providers/common/compat/standard/operators.py  |   6 +-
 .../providers/common/compat/version_compat.py      |   1 -
 .../airflow/providers/common/io/version_compat.py  |   1 -
 .../airflow/providers/databricks/version_compat.py |   1 -
 .../providers/dbt/cloud/utils/openlineage.py       |  11 +-
 .../airflow/providers/dbt/cloud/version_compat.py  |   1 -
 .../src/airflow/providers/docker/version_compat.py |   1 -
 .../src/airflow/providers/edge3/version_compat.py  |   1 -
 .../providers/elasticsearch/version_compat.py      |   1 -
 .../src/airflow/providers/google/version_compat.py |   1 -
 .../tests/unit/google/cloud/hooks/test_bigquery.py |   3 -
 .../tests/unit/google/cloud/hooks/test_gcs.py      |   9 -
 .../providers/microsoft/azure/version_compat.py    |   1 -
 .../unit/microsoft/azure/operators/test_msgraph.py |   2 -
 .../unit/microsoft/azure/sensors/test_msgraph.py   |   2 -
 .../providers/openlineage/plugins/listener.py      |  36 +---
 .../providers/openlineage/plugins/openlineage.py   |   6 +-
 .../airflow/providers/openlineage/utils/utils.py   |   8 +-
 .../providers/openlineage/version_compat.py        |   1 -
 .../unit/openlineage/extractors/test_manager.py    |  30 ++-
 .../unit/openlineage/plugins/test_execution.py     |   6 +-
 .../unit/openlineage/plugins/test_listener.py      |  39 ++--
 .../tests/unit/openlineage/plugins/test_utils.py   | 108 +----------
 .../tests/unit/openlineage/utils/test_utils.py     |   4 +-
 .../airflow/providers/opensearch/version_compat.py |   1 -
 .../src/airflow/providers/presto/version_compat.py |   1 -
 .../src/airflow/providers/redis/version_compat.py  |   1 -
 .../src/airflow/providers/sftp/version_compat.py   |   1 -
 .../tests/unit/smtp/notifications/test_smtp.py     |   4 +-
 .../providers/snowflake/utils/openlineage.py       |  14 +-
 .../airflow/providers/snowflake/version_compat.py  |   1 -
 .../tests/unit/snowflake/utils/test_openlineage.py |   8 +-
 .../airflow/providers/standard/operators/python.py |  26 +--
 .../src/airflow/providers/standard/sensors/time.py |   5 +-
 .../providers/standard/triggers/temporal.py        |  10 +-
 .../airflow/providers/standard/version_compat.py   |   1 -
 .../standard/decorators/test_external_python.py    |  11 --
 .../tests/unit/standard/operators/test_python.py   |  11 +-
 .../tests/unit/standard/triggers/test_temporal.py  |  41 -----
 .../src/airflow/providers/trino/version_compat.py  |   1 -
 57 files changed, 205 insertions(+), 585 deletions(-)

diff --git a/airflow-core/tests/unit/listeners/class_listener.py 
b/airflow-core/tests/unit/listeners/class_listener.py
index de235abbd40..49a018ea9d8 100644
--- a/airflow-core/tests/unit/listeners/class_listener.py
+++ b/airflow-core/tests/unit/listeners/class_listener.py
@@ -20,7 +20,7 @@ from __future__ import annotations
 from airflow.listeners import hookimpl
 from airflow.utils.state import DagRunState, TaskInstanceState
 
-from tests_common.test_utils.version_compat import AIRFLOW_V_2_10_PLUS, 
AIRFLOW_V_3_0_PLUS
+from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS
 
 if AIRFLOW_V_3_0_PLUS:
 
@@ -64,8 +64,7 @@ if AIRFLOW_V_3_0_PLUS:
         @hookimpl
         def on_dag_run_failed(self, dag_run, msg: str):
             self.state.append(DagRunState.FAILED)
-
-elif AIRFLOW_V_2_10_PLUS:
+else:
 
     class ClassBasedListener:  # type: ignore[no-redef]
         def __init__(self):
@@ -95,36 +94,6 @@ elif AIRFLOW_V_2_10_PLUS:
         @hookimpl
         def on_task_instance_failed(self, previous_state, task_instance, 
error: None | str | BaseException):
             self.state.append(TaskInstanceState.FAILED)
-else:
-
-    class ClassBasedListener:  # type: ignore[no-redef]
-        def __init__(self):
-            self.started_component = None
-            self.stopped_component = None
-            self.state = []
-
-        @hookimpl
-        def on_starting(self, component):
-            self.started_component = component
-            self.state.append(DagRunState.RUNNING)
-
-        @hookimpl
-        def before_stopping(self, component):
-            global stopped_component
-            stopped_component = component
-            self.state.append(DagRunState.SUCCESS)
-
-        @hookimpl
-        def on_task_instance_running(self, previous_state, task_instance, 
session):
-            self.state.append(TaskInstanceState.RUNNING)
-
-        @hookimpl
-        def on_task_instance_success(self, previous_state, task_instance, 
session):
-            self.state.append(TaskInstanceState.SUCCESS)
-
-        @hookimpl
-        def on_task_instance_failed(self, previous_state, task_instance, 
session):
-            self.state.append(TaskInstanceState.FAILED)
 
 
 def clear():
diff --git a/devel-common/src/tests_common/test_utils/compat.py 
b/devel-common/src/tests_common/test_utils/compat.py
index 59e5fbdaf1a..c5eef051bda 100644
--- a/devel-common/src/tests_common/test_utils/compat.py
+++ b/devel-common/src/tests_common/test_utils/compat.py
@@ -23,8 +23,6 @@ from typing import TYPE_CHECKING, Any, cast
 from airflow.exceptions import AirflowOptionalProviderFeatureException
 from airflow.utils.helpers import prune_dict
 
-from tests_common.test_utils.version_compat import AIRFLOW_V_2_10_PLUS
-
 try:
     # ImportError has been renamed to ParseImportError in airflow 2.10.0, and 
since our provider tests should
     # run on all supported versions of Airflow, this compatibility shim falls 
back to the old ImportError so
@@ -86,35 +84,27 @@ else:
     except ModuleNotFoundError:
         # dataset is renamed to asset since Airflow 3.0
         from airflow.models.dataset import (
+            DagScheduleDatasetAliasReference as DagScheduleAssetAliasReference,
             DagScheduleDatasetReference as DagScheduleAssetReference,
+            DatasetAliasModel as AssetAliasModel,
             DatasetDagRunQueue as AssetDagRunQueue,
             DatasetEvent as AssetEvent,
             DatasetModel as AssetModel,
             TaskOutletDatasetReference as TaskOutletAssetReference,
         )
 
-        if AIRFLOW_V_2_10_PLUS:
-            from airflow.models.dataset import (
-                DagScheduleDatasetAliasReference as 
DagScheduleAssetAliasReference,
-                DatasetAliasModel as AssetAliasModel,
-            )
-
 
 def deserialize_operator(serialized_operator: dict[str, Any]) -> Operator:
-    if AIRFLOW_V_2_10_PLUS:
-        # In airflow 2.10+ we can deserialize operator using regular 
deserialize method.
-        # We do not need to use deserialize_operator method explicitly but 
some tests are deserializing the
-        # operator and in the future they could use regular ``deserialize`` 
method. This method is a shim
-        # to make deserialization of operator works for tests run against 
older Airflow versions and tests
-        # should use that method instead of calling 
``BaseSerialization.deserialize`` directly.
-        # We can remove this method and switch to the regular ``deserialize`` 
method as long as all providers
-        # are updated to airflow 2.10+.
-        from airflow.serialization.serialized_objects import BaseSerialization
-
-        return BaseSerialization.deserialize(serialized_operator)
-    from airflow.serialization.serialized_objects import SerializedBaseOperator
-
-    return SerializedBaseOperator.deserialize_operator(serialized_operator)
+    # In airflow 2.10+ we can deserialize operator using regular deserialize 
method.
+    # We do not need to use deserialize_operator method explicitly but some 
tests are deserializing the
+    # operator and in the future they could use regular ``deserialize`` 
method. This method is a shim
+    # to make deserialization of operator works for tests run against older 
Airflow versions and tests
+    # should use that method instead of calling 
``BaseSerialization.deserialize`` directly.
+    # We can remove this method and switch to the regular ``deserialize`` 
method as long as all providers
+    # are updated to airflow 2.10+.
+    from airflow.serialization.serialized_objects import BaseSerialization
+
+    return BaseSerialization.deserialize(serialized_operator)
 
 
 def connection_to_dict(
diff --git a/devel-common/src/tests_common/test_utils/db.py 
b/devel-common/src/tests_common/test_utils/db.py
index 4ab88d1c75e..c5711ec514c 100644
--- a/devel-common/src/tests_common/test_utils/db.py
+++ b/devel-common/src/tests_common/test_utils/db.py
@@ -51,7 +51,7 @@ from tests_common.test_utils.compat import (
     ParseImportError,
     TaskOutletAssetReference,
 )
-from tests_common.test_utils.version_compat import AIRFLOW_V_2_10_PLUS, 
AIRFLOW_V_3_0_PLUS
+from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS
 
 if TYPE_CHECKING:
     from pathlib import Path
@@ -159,11 +159,10 @@ def clear_db_assets():
         session.query(AssetDagRunQueue).delete()
         session.query(DagScheduleAssetReference).delete()
         session.query(TaskOutletAssetReference).delete()
-        if AIRFLOW_V_2_10_PLUS:
-            from tests_common.test_utils.compat import AssetAliasModel, 
DagScheduleAssetAliasReference
+        from tests_common.test_utils.compat import AssetAliasModel, 
DagScheduleAssetAliasReference
 
-            session.query(AssetAliasModel).delete()
-            session.query(DagScheduleAssetAliasReference).delete()
+        session.query(AssetAliasModel).delete()
+        session.query(DagScheduleAssetAliasReference).delete()
         if AIRFLOW_V_3_0_PLUS:
             from airflow.models.asset import (
                 AssetActive,
diff --git a/devel-common/src/tests_common/test_utils/version_compat.py 
b/devel-common/src/tests_common/test_utils/version_compat.py
index 7227de2d859..2e990761628 100644
--- a/devel-common/src/tests_common/test_utils/version_compat.py
+++ b/devel-common/src/tests_common/test_utils/version_compat.py
@@ -32,6 +32,5 @@ def get_base_airflow_version_tuple() -> tuple[int, int, int]:
     return airflow_version.major, airflow_version.minor, airflow_version.micro
 
 
-AIRFLOW_V_2_10_PLUS = get_base_airflow_version_tuple() >= (2, 10, 0)
 AIRFLOW_V_3_0_PLUS = get_base_airflow_version_tuple() >= (3, 0, 0)
 [].sort()
diff --git 
a/providers/amazon/src/airflow/providers/amazon/aws/hooks/redshift_sql.py 
b/providers/amazon/src/airflow/providers/amazon/aws/hooks/redshift_sql.py
index c74b23352f9..bdd90f1d6ba 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/hooks/redshift_sql.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/hooks/redshift_sql.py
@@ -26,7 +26,6 @@ from sqlalchemy.engine.url import URL
 
 from airflow.exceptions import AirflowException
 from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
-from airflow.providers.amazon.version_compat import AIRFLOW_V_2_10_PLUS
 from airflow.providers.common.sql.hooks.sql import DbApiHook
 
 if TYPE_CHECKING:
@@ -260,6 +259,4 @@ class RedshiftSQLHook(DbApiHook):
 
     def get_openlineage_default_schema(self) -> str | None:
         """Return current schema. This is usually changed with ``SEARCH_PATH`` 
parameter."""
-        if AIRFLOW_V_2_10_PLUS:
-            return self.get_first("SELECT CURRENT_SCHEMA();")[0]
-        return super().get_openlineage_default_schema()
+        return self.get_first("SELECT CURRENT_SCHEMA();")[0]
diff --git a/providers/amazon/src/airflow/providers/amazon/version_compat.py 
b/providers/amazon/src/airflow/providers/amazon/version_compat.py
index 21e7170194e..48d122b6696 100644
--- a/providers/amazon/src/airflow/providers/amazon/version_compat.py
+++ b/providers/amazon/src/airflow/providers/amazon/version_compat.py
@@ -32,5 +32,4 @@ def get_base_airflow_version_tuple() -> tuple[int, int, int]:
     return airflow_version.major, airflow_version.minor, airflow_version.micro
 
 
-AIRFLOW_V_2_10_PLUS = get_base_airflow_version_tuple() >= (2, 10, 0)
 AIRFLOW_V_3_0_PLUS = get_base_airflow_version_tuple() >= (3, 0, 0)
diff --git 
a/providers/amazon/tests/system/amazon/aws/example_sagemaker_unified_studio.py 
b/providers/amazon/tests/system/amazon/aws/example_sagemaker_unified_studio.py
index ab5f91ee2fb..ea002228220 100644
--- 
a/providers/amazon/tests/system/amazon/aws/example_sagemaker_unified_studio.py
+++ 
b/providers/amazon/tests/system/amazon/aws/example_sagemaker_unified_studio.py
@@ -18,8 +18,6 @@ from __future__ import annotations
 
 from datetime import datetime
 
-import pytest
-
 from airflow.decorators import task
 from airflow.models.baseoperator import chain
 from airflow.models.dag import DAG
@@ -28,7 +26,6 @@ from 
airflow.providers.amazon.aws.operators.sagemaker_unified_studio import (
 )
 
 from system.amazon.aws.utils import ENV_ID_KEY, SystemTestContextBuilder
-from tests_common.test_utils.version_compat import AIRFLOW_V_2_10_PLUS
 
 """
 Prerequisites: The account which runs this test must manually have the 
following:
@@ -42,8 +39,6 @@ The setup tasks will set up the project and configure the 
test runner to emulate
 Then, the SageMakerNotebookOperator will run a test notebook. This should spin 
up a SageMaker training job, run the notebook, and exit successfully.
 """
 
-pytestmark = pytest.mark.skipif(not AIRFLOW_V_2_10_PLUS, reason="Test requires 
Airflow 2.10+")
-
 DAG_ID = "example_sagemaker_unified_studio"
 
 # Externally fetched variables:
diff --git 
a/providers/amazon/tests/unit/amazon/aws/executors/ecs/test_ecs_executor.py 
b/providers/amazon/tests/unit/amazon/aws/executors/ecs/test_ecs_executor.py
index 074d2babd85..70cedadab34 100644
--- a/providers/amazon/tests/unit/amazon/aws/executors/ecs/test_ecs_executor.py
+++ b/providers/amazon/tests/unit/amazon/aws/executors/ecs/test_ecs_executor.py
@@ -59,7 +59,7 @@ from airflow.version import version as airflow_version_str
 
 from tests_common import RUNNING_TESTS_AGAINST_AIRFLOW_PACKAGES
 from tests_common.test_utils.config import conf_vars
-from tests_common.test_utils.version_compat import AIRFLOW_V_2_10_PLUS, 
AIRFLOW_V_3_0_PLUS
+from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS
 
 pytestmark = pytest.mark.db_test
 
@@ -380,7 +380,6 @@ class TestEcsExecutorTask:
 class TestAwsEcsExecutor:
     """Tests the AWS ECS Executor."""
 
-    @pytest.mark.skipif(not AIRFLOW_V_2_10_PLUS, reason="Test requires Airflow 
2.10+")
     
@mock.patch("airflow.providers.amazon.aws.executors.ecs.ecs_executor.AwsEcsExecutor.change_state")
     def test_execute(self, change_state_mock, mock_airflow_key, mock_executor, 
mock_cmd):
         """Test execution from end-to-end."""
diff --git a/providers/amazon/tests/unit/amazon/aws/hooks/test_s3.py 
b/providers/amazon/tests/unit/amazon/aws/hooks/test_s3.py
index bb585951613..ad31a7d9c07 100644
--- a/providers/amazon/tests/unit/amazon/aws/hooks/test_s3.py
+++ b/providers/amazon/tests/unit/amazon/aws/hooks/test_s3.py
@@ -43,8 +43,6 @@ from airflow.providers.amazon.aws.hooks.s3 import (
 )
 from airflow.utils.timezone import datetime
 
-from tests_common.test_utils.version_compat import AIRFLOW_V_2_10_PLUS
-
 
 @pytest.fixture
 def mocked_s3_res():
@@ -59,19 +57,17 @@ def s3_bucket(mocked_s3_res):
     return bucket
 
 
-if AIRFLOW_V_2_10_PLUS:
-
-    @pytest.fixture
-    def hook_lineage_collector():
-        from airflow.lineage import hook
-        from airflow.providers.common.compat.lineage.hook import 
get_hook_lineage_collector
+@pytest.fixture
+def hook_lineage_collector():
+    from airflow.lineage import hook
+    from airflow.providers.common.compat.lineage.hook import 
get_hook_lineage_collector
 
-        hook._hook_lineage_collector = None
-        hook._hook_lineage_collector = hook.HookLineageCollector()
+    hook._hook_lineage_collector = None
+    hook._hook_lineage_collector = hook.HookLineageCollector()
 
-        yield get_hook_lineage_collector()
+    yield get_hook_lineage_collector()
 
-        hook._hook_lineage_collector = None
+    hook._hook_lineage_collector = None
 
 
 class TestAwsS3Hook:
@@ -448,7 +444,6 @@ class TestAwsS3Hook:
         resource = boto3.resource("s3").Object(s3_bucket, "my_key")
         assert resource.get()["Body"].read() == b"Cont\xc3\xa9nt"
 
-    @pytest.mark.skipif(not AIRFLOW_V_2_10_PLUS, reason="Hook lineage works in 
Airflow >= 2.10.0")
     def test_load_string_exposes_lineage(self, s3_bucket, 
hook_lineage_collector):
         hook = S3Hook()
 
@@ -1023,7 +1018,6 @@ class TestAwsS3Hook:
         resource = boto3.resource("s3").Object(s3_bucket, "my_key")
         assert gz.decompress(resource.get()["Body"].read()) == b"Content"
 
-    @pytest.mark.skipif(not AIRFLOW_V_2_10_PLUS, reason="Hook lineage works in 
Airflow >= 2.10.0")
     def test_load_file_exposes_lineage(self, s3_bucket, tmp_path, 
hook_lineage_collector):
         hook = S3Hook()
         path = tmp_path / "testfile"
@@ -1091,7 +1085,6 @@ class TestAwsS3Hook:
                 ACL="private",
             )
 
-    @pytest.mark.skipif(not AIRFLOW_V_2_10_PLUS, reason="Hook lineage works in 
Airflow >= 2.10.0")
     @mock_aws
     def test_copy_object_ol_instrumentation(self, s3_bucket, 
hook_lineage_collector):
         mock_hook = S3Hook()
@@ -1230,7 +1223,6 @@ class TestAwsS3Hook:
 
         assert path.name == output_file
 
-    @pytest.mark.skipif(not AIRFLOW_V_2_10_PLUS, reason="Hook lineage works in 
Airflow >= 2.10.0")
     @mock.patch("airflow.providers.amazon.aws.hooks.s3.NamedTemporaryFile")
     def test_download_file_exposes_lineage(self, mock_temp_file, tmp_path, 
hook_lineage_collector):
         path = tmp_path / "airflow_tmp_test_s3_hook"
@@ -1273,7 +1265,6 @@ class TestAwsS3Hook:
 
         mock_open.assert_called_once_with(path, "wb")
 
-    @pytest.mark.skipif(not AIRFLOW_V_2_10_PLUS, reason="Hook lineage works in 
Airflow >= 2.10.0")
     @mock.patch("airflow.providers.amazon.aws.hooks.s3.open")
     def test_download_file_with_preserve_name_exposes_lineage(
         self, mock_open, tmp_path, hook_lineage_collector
diff --git 
a/providers/amazon/tests/unit/amazon/aws/log/test_cloudwatch_task_handler.py 
b/providers/amazon/tests/unit/amazon/aws/log/test_cloudwatch_task_handler.py
index 3696f552d76..e2897457c05 100644
--- a/providers/amazon/tests/unit/amazon/aws/log/test_cloudwatch_task_handler.py
+++ b/providers/amazon/tests/unit/amazon/aws/log/test_cloudwatch_task_handler.py
@@ -44,7 +44,7 @@ from airflow.utils.state import State
 from airflow.utils.timezone import datetime
 
 from tests_common.test_utils.config import conf_vars
-from tests_common.test_utils.version_compat import AIRFLOW_V_2_10_PLUS, 
AIRFLOW_V_3_0_PLUS
+from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS
 
 
 def get_time_str(time_in_milliseconds):
@@ -270,19 +270,13 @@ class TestCloudwatchTaskHandler:
                 {"timestamp": current_time, "message": "Third"},
             ],
         )
-        if AIRFLOW_V_2_10_PLUS:
-            monkeypatch.setattr(self.cloudwatch_task_handler, 
"_read_from_logs_server", lambda a, b: ([], []))
-            msg_template = textwrap.dedent("""
-                 INFO - ::group::Log message source details
-                *** Reading remote log from Cloudwatch log_group: {} 
log_stream: {}
-                 INFO - ::endgroup::
-                {}
-            """)[1:][:-1]  # Strip off leading and trailing new lines, but not 
spaces
-        else:
-            msg_template = textwrap.dedent("""
-                *** Reading remote log from Cloudwatch log_group: {} 
log_stream: {}
-                {}
-            """).strip()
+        monkeypatch.setattr(self.cloudwatch_task_handler, 
"_read_from_logs_server", lambda a, b: ([], []))
+        msg_template = textwrap.dedent("""
+             INFO - ::group::Log message source details
+            *** Reading remote log from Cloudwatch log_group: {} log_stream: {}
+             INFO - ::endgroup::
+            {}
+        """)[1:][:-1]  # Strip off leading and trailing new lines, but not 
spaces
 
         logs, metadata = self.cloudwatch_task_handler.read(self.ti)
         if AIRFLOW_V_3_0_PLUS:
diff --git 
a/providers/amazon/tests/unit/amazon/aws/operators/test_redshift_sql.py 
b/providers/amazon/tests/unit/amazon/aws/operators/test_redshift_sql.py
index 1b124ee2028..bbd4ade48d1 100644
--- a/providers/amazon/tests/unit/amazon/aws/operators/test_redshift_sql.py
+++ b/providers/amazon/tests/unit/amazon/aws/operators/test_redshift_sql.py
@@ -17,7 +17,7 @@
 # under the License.
 from __future__ import annotations
 
-from unittest.mock import MagicMock, PropertyMock, call, patch
+from unittest.mock import MagicMock, call, patch
 
 import pytest
 
@@ -40,14 +40,13 @@ MOCK_REGION_NAME = "eu-north-1"
 class TestRedshiftSQLOpenLineage:
     @patch.dict("os.environ", 
AIRFLOW_CONN_AWS_DEFAULT=f"aws://?region_name={MOCK_REGION_NAME}")
     @pytest.mark.parametrize(
-        "connection_host, connection_extra, expected_identity, is_over_210, 
expected_schemaname",
+        "connection_host, connection_extra, expected_identity, 
expected_schemaname",
         [
             # test without a connection host but with a cluster_identifier in 
connection extra
             (
                 None,
                 {"iam": True, "cluster_identifier": 
"cluster_identifier_from_extra"},
                 f"cluster_identifier_from_extra.{MOCK_REGION_NAME}",
-                True,
                 "database.public",
             ),
             # test with a connection host and without a cluster_identifier in 
connection extra
@@ -55,7 +54,6 @@ class TestRedshiftSQLOpenLineage:
                 
"cluster_identifier_from_host.id.my_region.redshift.amazonaws.com",
                 {"iam": True},
                 "cluster_identifier_from_host.my_region",
-                True,
                 "database.public",
             ),
             # test with both connection host and cluster_identifier in 
connection extra
@@ -63,41 +61,20 @@ class TestRedshiftSQLOpenLineage:
                 "cluster_identifier_from_host.x.y",
                 {"iam": True, "cluster_identifier": 
"cluster_identifier_from_extra"},
                 f"cluster_identifier_from_extra.{MOCK_REGION_NAME}",
-                True,
                 "database.public",
             ),
-            # test when hostname doesn't match pattern
-            ("1.2.3.4", {}, "1.2.3.4", True, "database.public"),
-            # test with Airflow below 2.10 not using Hook connection
-            (
-                
"cluster_identifier_from_host.id.my_region.redshift.amazonaws.com",
-                {"iam": True},
-                "cluster_identifier_from_host.my_region",
-                False,
-                "public",
-            ),
         ],
     )
-    @patch(
-        "airflow.providers.amazon.aws.hooks.redshift_sql.AIRFLOW_V_2_10_PLUS",
-        new_callable=PropertyMock,
-    )
-    @patch("airflow.providers.openlineage.utils.utils.AIRFLOW_V_2_10_PLUS", 
new_callable=PropertyMock)
     @patch("airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook.conn")
     def test_execute_openlineage_events(
         self,
         mock_aws_hook_conn,
-        mock_ol_utils,
-        mock_redshift_sql,
         connection_host,
         connection_extra,
         expected_identity,
-        is_over_210,
         expected_schemaname,
         # self, mock_aws_hook_conn, connection_host, connection_extra, 
expected_identity, is_below_2_10, expected_schemaname
     ):
-        mock_ol_utils.__bool__ = lambda x: is_over_210
-        mock_redshift_sql.__bool__ = lambda x: is_over_210
         DB_NAME = "database"
         DB_SCHEMA_NAME = "public"
 
@@ -176,97 +153,93 @@ class TestRedshiftSQLOpenLineage:
         
dbapi_hook.get_conn.return_value.cursor.return_value.fetchall.side_effect = rows
 
         lineage = op.get_openlineage_facets_on_start()
-        if is_over_210:
-            assert 
dbapi_hook.get_conn.return_value.cursor.return_value.execute.mock_calls == [
-                call(
-                    "SELECT SVV_REDSHIFT_COLUMNS.schema_name, "
-                    "SVV_REDSHIFT_COLUMNS.table_name, "
-                    "SVV_REDSHIFT_COLUMNS.column_name, "
-                    "SVV_REDSHIFT_COLUMNS.ordinal_position, "
-                    "SVV_REDSHIFT_COLUMNS.data_type, "
-                    "SVV_REDSHIFT_COLUMNS.database_name \n"
-                    "FROM SVV_REDSHIFT_COLUMNS \n"
-                    f"WHERE SVV_REDSHIFT_COLUMNS.schema_name = 
'{expected_schemaname}' "
-                    "AND SVV_REDSHIFT_COLUMNS.table_name IN ('little_table') "
-                    "OR SVV_REDSHIFT_COLUMNS.database_name = 'another_db' "
-                    "AND SVV_REDSHIFT_COLUMNS.schema_name = 'another_schema' 
AND "
-                    "SVV_REDSHIFT_COLUMNS.table_name IN 
('popular_orders_day_of_week')"
-                ),
-                call(
-                    "SELECT SVV_REDSHIFT_COLUMNS.schema_name, "
-                    "SVV_REDSHIFT_COLUMNS.table_name, "
-                    "SVV_REDSHIFT_COLUMNS.column_name, "
-                    "SVV_REDSHIFT_COLUMNS.ordinal_position, "
-                    "SVV_REDSHIFT_COLUMNS.data_type, "
-                    "SVV_REDSHIFT_COLUMNS.database_name \n"
-                    "FROM SVV_REDSHIFT_COLUMNS \n"
-                    f"WHERE SVV_REDSHIFT_COLUMNS.schema_name = 
'{expected_schemaname}' "
-                    "AND SVV_REDSHIFT_COLUMNS.table_name IN ('Test_table')"
-                ),
-            ]
-        else:
-            assert 
dbapi_hook.get_conn.return_value.cursor.return_value.execute.mock_calls == []
+        assert 
dbapi_hook.get_conn.return_value.cursor.return_value.execute.mock_calls == [
+            call(
+                "SELECT SVV_REDSHIFT_COLUMNS.schema_name, "
+                "SVV_REDSHIFT_COLUMNS.table_name, "
+                "SVV_REDSHIFT_COLUMNS.column_name, "
+                "SVV_REDSHIFT_COLUMNS.ordinal_position, "
+                "SVV_REDSHIFT_COLUMNS.data_type, "
+                "SVV_REDSHIFT_COLUMNS.database_name \n"
+                "FROM SVV_REDSHIFT_COLUMNS \n"
+                f"WHERE SVV_REDSHIFT_COLUMNS.schema_name = 
'{expected_schemaname}' "
+                "AND SVV_REDSHIFT_COLUMNS.table_name IN ('little_table') "
+                "OR SVV_REDSHIFT_COLUMNS.database_name = 'another_db' "
+                "AND SVV_REDSHIFT_COLUMNS.schema_name = 'another_schema' AND "
+                "SVV_REDSHIFT_COLUMNS.table_name IN 
('popular_orders_day_of_week')"
+            ),
+            call(
+                "SELECT SVV_REDSHIFT_COLUMNS.schema_name, "
+                "SVV_REDSHIFT_COLUMNS.table_name, "
+                "SVV_REDSHIFT_COLUMNS.column_name, "
+                "SVV_REDSHIFT_COLUMNS.ordinal_position, "
+                "SVV_REDSHIFT_COLUMNS.data_type, "
+                "SVV_REDSHIFT_COLUMNS.database_name \n"
+                "FROM SVV_REDSHIFT_COLUMNS \n"
+                f"WHERE SVV_REDSHIFT_COLUMNS.schema_name = 
'{expected_schemaname}' "
+                "AND SVV_REDSHIFT_COLUMNS.table_name IN ('Test_table')"
+            ),
+        ]
         expected_namespace = f"redshift://{expected_identity}:5439"
 
-        if is_over_210:
-            assert lineage.inputs == [
-                Dataset(
-                    namespace=expected_namespace,
-                    
name=f"{ANOTHER_DB_NAME}.{ANOTHER_DB_SCHEMA}.popular_orders_day_of_week",
-                    facets={
-                        "schema": SchemaDatasetFacet(
-                            fields=[
-                                
SchemaDatasetFacetFields(name="order_day_of_week", type="varchar"),
-                                
SchemaDatasetFacetFields(name="order_placed_on", type="timestamp"),
-                                SchemaDatasetFacetFields(name="orders_placed", 
type="int4"),
-                            ]
-                        )
-                    },
-                ),
-                Dataset(
-                    namespace=expected_namespace,
-                    name=f"{DB_NAME}.{DB_SCHEMA_NAME}.little_table",
-                    facets={
-                        "schema": SchemaDatasetFacet(
-                            fields=[
-                                
SchemaDatasetFacetFields(name="order_day_of_week", type="varchar"),
-                                
SchemaDatasetFacetFields(name="additional_constant", type="varchar"),
-                            ]
-                        )
-                    },
-                ),
-            ]
-            assert lineage.outputs == [
-                Dataset(
-                    namespace=expected_namespace,
-                    name=f"{DB_NAME}.{DB_SCHEMA_NAME}.test_table",
-                    facets={
-                        "schema": SchemaDatasetFacet(
-                            fields=[
-                                
SchemaDatasetFacetFields(name="order_day_of_week", type="varchar"),
-                                
SchemaDatasetFacetFields(name="order_placed_on", type="timestamp"),
-                                SchemaDatasetFacetFields(name="orders_placed", 
type="int4"),
-                                
SchemaDatasetFacetFields(name="additional_constant", type="varchar"),
-                            ]
-                        ),
-                        "columnLineage": ColumnLineageDatasetFacet(
-                            fields={
-                                "additional_constant": Fields(
-                                    inputFields=[
-                                        InputField(
-                                            namespace=expected_namespace,
-                                            
name="database.public.little_table",
-                                            field="additional_constant",
-                                        )
-                                    ],
-                                    transformationDescription="",
-                                    transformationType="",
-                                )
-                            }
-                        ),
-                    },
-                )
-            ]
+        assert lineage.inputs == [
+            Dataset(
+                namespace=expected_namespace,
+                
name=f"{ANOTHER_DB_NAME}.{ANOTHER_DB_SCHEMA}.popular_orders_day_of_week",
+                facets={
+                    "schema": SchemaDatasetFacet(
+                        fields=[
+                            SchemaDatasetFacetFields(name="order_day_of_week", 
type="varchar"),
+                            SchemaDatasetFacetFields(name="order_placed_on", 
type="timestamp"),
+                            SchemaDatasetFacetFields(name="orders_placed", 
type="int4"),
+                        ]
+                    )
+                },
+            ),
+            Dataset(
+                namespace=expected_namespace,
+                name=f"{DB_NAME}.{DB_SCHEMA_NAME}.little_table",
+                facets={
+                    "schema": SchemaDatasetFacet(
+                        fields=[
+                            SchemaDatasetFacetFields(name="order_day_of_week", 
type="varchar"),
+                            
SchemaDatasetFacetFields(name="additional_constant", type="varchar"),
+                        ]
+                    )
+                },
+            ),
+        ]
+        assert lineage.outputs == [
+            Dataset(
+                namespace=expected_namespace,
+                name=f"{DB_NAME}.{DB_SCHEMA_NAME}.test_table",
+                facets={
+                    "schema": SchemaDatasetFacet(
+                        fields=[
+                            SchemaDatasetFacetFields(name="order_day_of_week", 
type="varchar"),
+                            SchemaDatasetFacetFields(name="order_placed_on", 
type="timestamp"),
+                            SchemaDatasetFacetFields(name="orders_placed", 
type="int4"),
+                            
SchemaDatasetFacetFields(name="additional_constant", type="varchar"),
+                        ]
+                    ),
+                    "columnLineage": ColumnLineageDatasetFacet(
+                        fields={
+                            "additional_constant": Fields(
+                                inputFields=[
+                                    InputField(
+                                        namespace=expected_namespace,
+                                        name="database.public.little_table",
+                                        field="additional_constant",
+                                    )
+                                ],
+                                transformationDescription="",
+                                transformationType="",
+                            )
+                        }
+                    ),
+                },
+            )
+        ]
 
         assert lineage.job_facets == {"sql": SQLJobFacet(query=sql)}
 
diff --git 
a/providers/apache/spark/src/airflow/providers/apache/spark/version_compat.py 
b/providers/apache/spark/src/airflow/providers/apache/spark/version_compat.py
index 21e7170194e..48d122b6696 100644
--- 
a/providers/apache/spark/src/airflow/providers/apache/spark/version_compat.py
+++ 
b/providers/apache/spark/src/airflow/providers/apache/spark/version_compat.py
@@ -32,5 +32,4 @@ def get_base_airflow_version_tuple() -> tuple[int, int, int]:
     return airflow_version.major, airflow_version.minor, airflow_version.micro
 
 
-AIRFLOW_V_2_10_PLUS = get_base_airflow_version_tuple() >= (2, 10, 0)
 AIRFLOW_V_3_0_PLUS = get_base_airflow_version_tuple() >= (3, 0, 0)
diff --git a/providers/celery/tests/unit/celery/cli/test_celery_command.py 
b/providers/celery/tests/unit/celery/cli/test_celery_command.py
index b10a7afa126..cae229fd3b5 100644
--- a/providers/celery/tests/unit/celery/cli/test_celery_command.py
+++ b/providers/celery/tests/unit/celery/cli/test_celery_command.py
@@ -31,7 +31,7 @@ from airflow.providers.celery.cli import celery_command
 from airflow.providers.celery.cli.celery_command import 
_run_stale_bundle_cleanup
 
 from tests_common.test_utils.config import conf_vars
-from tests_common.test_utils.version_compat import AIRFLOW_V_2_10_PLUS, 
AIRFLOW_V_3_0_PLUS
+from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS
 
 pytestmark = pytest.mark.db_test
 
@@ -313,13 +313,6 @@ class TestFlowerCommand:
                 stderr="/tmp/flower-stderr.log",
                 log="/tmp/flower.log",
             )
-            if AIRFLOW_V_2_10_PLUS
-            else mock.call(
-                process="flower",
-                stdout="/tmp/flower-stdout.log",
-                stderr="/tmp/flower-stderr.log",
-                log="/tmp/flower.log",
-            )
         ]
         
mock_pid_file.assert_has_calls([mock.call(mock_setup_locations.return_value[0], 
-1)])
         assert mock_open.mock_calls == [
diff --git 
a/providers/celery/tests/unit/celery/executors/test_celery_executor.py 
b/providers/celery/tests/unit/celery/executors/test_celery_executor.py
index 22dbd59a914..974dc2b06f7 100644
--- a/providers/celery/tests/unit/celery/executors/test_celery_executor.py
+++ b/providers/celery/tests/unit/celery/executors/test_celery_executor.py
@@ -44,7 +44,7 @@ from airflow.utils.state import State
 
 from tests_common.test_utils import db
 from tests_common.test_utils.config import conf_vars
-from tests_common.test_utils.version_compat import AIRFLOW_V_2_10_PLUS, 
AIRFLOW_V_3_0_PLUS
+from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS
 
 pytestmark = pytest.mark.db_test
 
@@ -226,8 +226,8 @@ class TestCeleryExecutor:
 
         not_adopted_tis = executor.try_adopt_task_instances(tis)
 
-        key_1 = TaskInstanceKey(dag.dag_id, task_1.task_id, None, 0 if 
AIRFLOW_V_2_10_PLUS else 1)
-        key_2 = TaskInstanceKey(dag.dag_id, task_2.task_id, None, 0 if 
AIRFLOW_V_2_10_PLUS else 1)
+        key_1 = TaskInstanceKey(dag.dag_id, task_1.task_id, None, 0)
+        key_2 = TaskInstanceKey(dag.dag_id, task_2.task_id, None, 0)
         assert executor.running == {key_1, key_2}
 
         assert executor.tasks == {key_1: AsyncResult("231"), key_2: 
AsyncResult("232")}
diff --git 
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/version_compat.py
 
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/version_compat.py
index 21e7170194e..48d122b6696 100644
--- 
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/version_compat.py
+++ 
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/version_compat.py
@@ -32,5 +32,4 @@ def get_base_airflow_version_tuple() -> tuple[int, int, int]:
     return airflow_version.major, airflow_version.minor, airflow_version.micro
 
 
-AIRFLOW_V_2_10_PLUS = get_base_airflow_version_tuple() >= (2, 10, 0)
 AIRFLOW_V_3_0_PLUS = get_base_airflow_version_tuple() >= (3, 0, 0)
diff --git 
a/providers/common/compat/src/airflow/providers/common/compat/assets/__init__.py
 
b/providers/common/compat/src/airflow/providers/common/compat/assets/__init__.py
index 01fb1ab2018..b2b3c3e86e2 100644
--- 
a/providers/common/compat/src/airflow/providers/common/compat/assets/__init__.py
+++ 
b/providers/common/compat/src/airflow/providers/common/compat/assets/__init__.py
@@ -20,7 +20,6 @@ from __future__ import annotations
 from typing import TYPE_CHECKING
 
 from airflow.providers.common.compat.version_compat import (
-    AIRFLOW_V_2_10_PLUS,
     AIRFLOW_V_3_0_PLUS,
 )
 
@@ -38,16 +37,12 @@ else:
         from airflow.auth.managers.models.resource_details import 
DatasetDetails as AssetDetails
         from airflow.datasets import (
             Dataset as Asset,
+            DatasetAlias as AssetAlias,
             DatasetAll as AssetAll,
             DatasetAny as AssetAny,
+            expand_alias_to_datasets as expand_alias_to_assets,
         )
 
-        if AIRFLOW_V_2_10_PLUS:
-            from airflow.datasets import (
-                DatasetAlias as AssetAlias,
-                expand_alias_to_datasets as expand_alias_to_assets,
-            )
-
 
 __all__ = [
     "Asset",
diff --git 
a/providers/common/compat/src/airflow/providers/common/compat/lineage/hook.py 
b/providers/common/compat/src/airflow/providers/common/compat/lineage/hook.py
index 1b18723c457..bf7f1705c6d 100644
--- 
a/providers/common/compat/src/airflow/providers/common/compat/lineage/hook.py
+++ 
b/providers/common/compat/src/airflow/providers/common/compat/lineage/hook.py
@@ -16,7 +16,7 @@
 # under the License.
 from __future__ import annotations
 
-from airflow.providers.common.compat.version_compat import 
AIRFLOW_V_2_10_PLUS, AIRFLOW_V_3_0_PLUS
+from airflow.providers.common.compat.version_compat import AIRFLOW_V_3_0_PLUS
 
 
 def _get_asset_compat_hook_lineage_collector():
@@ -85,9 +85,7 @@ def get_hook_lineage_collector():
 
         return get_hook_lineage_collector()
 
-    # HookLineageCollector added in 2.10
-    if AIRFLOW_V_2_10_PLUS:
-        return _get_asset_compat_hook_lineage_collector()
+    return _get_asset_compat_hook_lineage_collector()
 
     # For the case that airflow has not yet upgraded to 2.10 or higher,
     # but using the providers that already uses `get_hook_lineage_collector`
diff --git 
a/providers/common/compat/src/airflow/providers/common/compat/standard/operators.py
 
b/providers/common/compat/src/airflow/providers/common/compat/standard/operators.py
index 0e34419043f..c53e8b53e92 100644
--- 
a/providers/common/compat/src/airflow/providers/common/compat/standard/operators.py
+++ 
b/providers/common/compat/src/airflow/providers/common/compat/standard/operators.py
@@ -19,8 +19,6 @@ from __future__ import annotations
 
 from typing import TYPE_CHECKING
 
-from airflow.providers.common.compat.version_compat import AIRFLOW_V_2_10_PLUS
-
 if TYPE_CHECKING:
     from airflow.providers.standard.operators.python import (
         _SERIALIZERS,
@@ -38,13 +36,11 @@ else:
         )
     except ModuleNotFoundError:
         from airflow.operators.python import (
+            _SERIALIZERS,
             PythonOperator,
             ShortCircuitOperator,
             get_current_context,
         )
 
-        if AIRFLOW_V_2_10_PLUS:
-            from airflow.operators.python import _SERIALIZERS
-
 
 __all__ = ["PythonOperator", "_SERIALIZERS", "ShortCircuitOperator", 
"get_current_context"]
diff --git 
a/providers/common/compat/src/airflow/providers/common/compat/version_compat.py 
b/providers/common/compat/src/airflow/providers/common/compat/version_compat.py
index 21e7170194e..48d122b6696 100644
--- 
a/providers/common/compat/src/airflow/providers/common/compat/version_compat.py
+++ 
b/providers/common/compat/src/airflow/providers/common/compat/version_compat.py
@@ -32,5 +32,4 @@ def get_base_airflow_version_tuple() -> tuple[int, int, int]:
     return airflow_version.major, airflow_version.minor, airflow_version.micro
 
 
-AIRFLOW_V_2_10_PLUS = get_base_airflow_version_tuple() >= (2, 10, 0)
 AIRFLOW_V_3_0_PLUS = get_base_airflow_version_tuple() >= (3, 0, 0)
diff --git 
a/providers/common/io/src/airflow/providers/common/io/version_compat.py 
b/providers/common/io/src/airflow/providers/common/io/version_compat.py
index 21e7170194e..48d122b6696 100644
--- a/providers/common/io/src/airflow/providers/common/io/version_compat.py
+++ b/providers/common/io/src/airflow/providers/common/io/version_compat.py
@@ -32,5 +32,4 @@ def get_base_airflow_version_tuple() -> tuple[int, int, int]:
     return airflow_version.major, airflow_version.minor, airflow_version.micro
 
 
-AIRFLOW_V_2_10_PLUS = get_base_airflow_version_tuple() >= (2, 10, 0)
 AIRFLOW_V_3_0_PLUS = get_base_airflow_version_tuple() >= (3, 0, 0)
diff --git 
a/providers/databricks/src/airflow/providers/databricks/version_compat.py 
b/providers/databricks/src/airflow/providers/databricks/version_compat.py
index 21e7170194e..48d122b6696 100644
--- a/providers/databricks/src/airflow/providers/databricks/version_compat.py
+++ b/providers/databricks/src/airflow/providers/databricks/version_compat.py
@@ -32,5 +32,4 @@ def get_base_airflow_version_tuple() -> tuple[int, int, int]:
     return airflow_version.major, airflow_version.minor, airflow_version.micro
 
 
-AIRFLOW_V_2_10_PLUS = get_base_airflow_version_tuple() >= (2, 10, 0)
 AIRFLOW_V_3_0_PLUS = get_base_airflow_version_tuple() >= (3, 0, 0)
diff --git 
a/providers/dbt/cloud/src/airflow/providers/dbt/cloud/utils/openlineage.py 
b/providers/dbt/cloud/src/airflow/providers/dbt/cloud/utils/openlineage.py
index 983ee48e63c..b9e56f30f3f 100644
--- a/providers/dbt/cloud/src/airflow/providers/dbt/cloud/utils/openlineage.py
+++ b/providers/dbt/cloud/src/airflow/providers/dbt/cloud/utils/openlineage.py
@@ -22,7 +22,7 @@ import re
 from typing import TYPE_CHECKING
 
 from airflow.providers.common.compat.openlineage.check import 
require_openlineage_version
-from airflow.providers.dbt.cloud.version_compat import AIRFLOW_V_2_10_PLUS, 
AIRFLOW_V_3_0_PLUS
+from airflow.providers.dbt.cloud.version_compat import AIRFLOW_V_3_0_PLUS
 
 if TYPE_CHECKING:
     from airflow.models.taskinstance import TaskInstance
@@ -48,13 +48,6 @@ def _get_logical_date(task_instance):
     return date
 
 
-def _get_try_number(val):
-    # todo: remove when min airflow version >= 2.10.0
-    if AIRFLOW_V_2_10_PLUS:
-        return val.try_number
-    return val.try_number - 1
-
-
 @require_openlineage_version(provider_min_version="2.0.0")
 def generate_openlineage_events_from_dbt_cloud_run(
     operator: DbtCloudRunJobOperator | DbtCloudJobRunSensor, task_instance: 
TaskInstance
@@ -144,7 +137,7 @@ def generate_openlineage_events_from_dbt_cloud_run(
         dag_id=task_instance.dag_id,
         task_id=operator.task_id,
         logical_date=_get_logical_date(task_instance),
-        try_number=_get_try_number(task_instance),
+        try_number=task_instance.try_number,
         map_index=task_instance.map_index,
     )
 
diff --git 
a/providers/dbt/cloud/src/airflow/providers/dbt/cloud/version_compat.py 
b/providers/dbt/cloud/src/airflow/providers/dbt/cloud/version_compat.py
index 21e7170194e..48d122b6696 100644
--- a/providers/dbt/cloud/src/airflow/providers/dbt/cloud/version_compat.py
+++ b/providers/dbt/cloud/src/airflow/providers/dbt/cloud/version_compat.py
@@ -32,5 +32,4 @@ def get_base_airflow_version_tuple() -> tuple[int, int, int]:
     return airflow_version.major, airflow_version.minor, airflow_version.micro
 
 
-AIRFLOW_V_2_10_PLUS = get_base_airflow_version_tuple() >= (2, 10, 0)
 AIRFLOW_V_3_0_PLUS = get_base_airflow_version_tuple() >= (3, 0, 0)
diff --git a/providers/docker/src/airflow/providers/docker/version_compat.py 
b/providers/docker/src/airflow/providers/docker/version_compat.py
index 21e7170194e..48d122b6696 100644
--- a/providers/docker/src/airflow/providers/docker/version_compat.py
+++ b/providers/docker/src/airflow/providers/docker/version_compat.py
@@ -32,5 +32,4 @@ def get_base_airflow_version_tuple() -> tuple[int, int, int]:
     return airflow_version.major, airflow_version.minor, airflow_version.micro
 
 
-AIRFLOW_V_2_10_PLUS = get_base_airflow_version_tuple() >= (2, 10, 0)
 AIRFLOW_V_3_0_PLUS = get_base_airflow_version_tuple() >= (3, 0, 0)
diff --git a/providers/edge3/src/airflow/providers/edge3/version_compat.py 
b/providers/edge3/src/airflow/providers/edge3/version_compat.py
index 21e7170194e..48d122b6696 100644
--- a/providers/edge3/src/airflow/providers/edge3/version_compat.py
+++ b/providers/edge3/src/airflow/providers/edge3/version_compat.py
@@ -32,5 +32,4 @@ def get_base_airflow_version_tuple() -> tuple[int, int, int]:
     return airflow_version.major, airflow_version.minor, airflow_version.micro
 
 
-AIRFLOW_V_2_10_PLUS = get_base_airflow_version_tuple() >= (2, 10, 0)
 AIRFLOW_V_3_0_PLUS = get_base_airflow_version_tuple() >= (3, 0, 0)
diff --git 
a/providers/elasticsearch/src/airflow/providers/elasticsearch/version_compat.py 
b/providers/elasticsearch/src/airflow/providers/elasticsearch/version_compat.py
index 21e7170194e..48d122b6696 100644
--- 
a/providers/elasticsearch/src/airflow/providers/elasticsearch/version_compat.py
+++ 
b/providers/elasticsearch/src/airflow/providers/elasticsearch/version_compat.py
@@ -32,5 +32,4 @@ def get_base_airflow_version_tuple() -> tuple[int, int, int]:
     return airflow_version.major, airflow_version.minor, airflow_version.micro
 
 
-AIRFLOW_V_2_10_PLUS = get_base_airflow_version_tuple() >= (2, 10, 0)
 AIRFLOW_V_3_0_PLUS = get_base_airflow_version_tuple() >= (3, 0, 0)
diff --git a/providers/google/src/airflow/providers/google/version_compat.py 
b/providers/google/src/airflow/providers/google/version_compat.py
index 21e7170194e..48d122b6696 100644
--- a/providers/google/src/airflow/providers/google/version_compat.py
+++ b/providers/google/src/airflow/providers/google/version_compat.py
@@ -32,5 +32,4 @@ def get_base_airflow_version_tuple() -> tuple[int, int, int]:
     return airflow_version.major, airflow_version.minor, airflow_version.micro
 
 
-AIRFLOW_V_2_10_PLUS = get_base_airflow_version_tuple() >= (2, 10, 0)
 AIRFLOW_V_3_0_PLUS = get_base_airflow_version_tuple() >= (3, 0, 0)
diff --git a/providers/google/tests/unit/google/cloud/hooks/test_bigquery.py 
b/providers/google/tests/unit/google/cloud/hooks/test_bigquery.py
index 45eb7cf2d68..d7febaada86 100644
--- a/providers/google/tests/unit/google/cloud/hooks/test_bigquery.py
+++ b/providers/google/tests/unit/google/cloud/hooks/test_bigquery.py
@@ -52,8 +52,6 @@ from airflow.providers.google.cloud.hooks.bigquery import (
     split_tablename,
 )
 
-from tests_common.test_utils.compat import AIRFLOW_V_2_10_PLUS
-
 pytestmark = 
pytest.mark.filterwarnings("error::airflow.exceptions.AirflowProviderDeprecationWarning")
 
 PROJECT_ID = "bq-project"
@@ -1933,7 +1931,6 @@ class TestBigQueryAsyncHookMethods:
         assert result == [{"f0_": 22, "f1_": 3.14, "f2_": "PI"}]
 
 
-@pytest.mark.skipif(not AIRFLOW_V_2_10_PLUS, reason="Hook lineage works in Airflow >= 2.10.0")
 @pytest.mark.db_test
 class TestHookLevelLineage(_BigQueryBaseTestClass):
     @mock.patch("airflow.providers.google.cloud.hooks.bigquery.Client")
diff --git a/providers/google/tests/unit/google/cloud/hooks/test_gcs.py 
b/providers/google/tests/unit/google/cloud/hooks/test_gcs.py
index 5156ca6d395..326209ee1d0 100644
--- a/providers/google/tests/unit/google/cloud/hooks/test_gcs.py
+++ b/providers/google/tests/unit/google/cloud/hooks/test_gcs.py
@@ -43,7 +43,6 @@ from airflow.providers.google.common.consts import CLIENT_INFO
 from airflow.utils import timezone
 from airflow.version import version
 
-from tests_common.test_utils.version_compat import AIRFLOW_V_2_10_PLUS
 from unit.google.cloud.utils.base_gcp_mock import 
mock_base_gcp_hook_default_project_id
 
 BASE_STRING = "airflow.providers.google.common.hooks.base_google.{}"
@@ -411,7 +410,6 @@ class TestGCSHook:
 
         assert str(ctx.value) == "source_bucket and source_object cannot be 
empty."
 
-    @pytest.mark.skipif(not AIRFLOW_V_2_10_PLUS, reason="Hook lineage works in 
Airflow >= 2.10.0")
     @mock.patch("google.cloud.storage.Bucket.copy_blob")
     @mock.patch(GCS_STRING.format("GCSHook.get_conn"))
     def test_copy_exposes_lineage(self, mock_service, mock_copy, 
hook_lineage_collector):
@@ -507,7 +505,6 @@ class TestGCSHook:
 
         assert str(ctx.value) == "source_bucket and source_object cannot be 
empty."
 
-    @pytest.mark.skipif(not AIRFLOW_V_2_10_PLUS, reason="Hook lineage works in 
Airflow >= 2.10.0")
     @mock.patch(GCS_STRING.format("GCSHook.get_conn"))
     def test_rewrite_exposes_lineage(self, mock_service, 
hook_lineage_collector):
         source_bucket_name = "test-source-bucket"
@@ -567,7 +564,6 @@ class TestGCSHook:
         with pytest.raises(exceptions.NotFound):
             self.gcs_hook.delete(bucket_name=test_bucket, 
object_name=test_object)
 
-    @pytest.mark.skipif(not AIRFLOW_V_2_10_PLUS, reason="Hook lineage works in 
Airflow >= 2.10.0")
     @mock.patch(GCS_STRING.format("GCSHook.get_conn"))
     def test_delete_exposes_lineage(self, mock_service, 
hook_lineage_collector):
         test_bucket = "test_bucket"
@@ -818,7 +814,6 @@ class TestGCSHook:
 
         assert str(ctx.value) == "bucket_name and destination_object cannot be 
empty."
 
-    @pytest.mark.skipif(not AIRFLOW_V_2_10_PLUS, reason="Hook lineage works in 
Airflow >= 2.10.0")
     @mock.patch(GCS_STRING.format("GCSHook.get_conn"))
     def test_compose_exposes_lineage(self, mock_service, 
hook_lineage_collector):
         test_bucket = "test_bucket"
@@ -859,7 +854,6 @@ class TestGCSHook:
         assert response == test_object_bytes
         download_method.assert_called_once_with()
 
-    @pytest.mark.skipif(not AIRFLOW_V_2_10_PLUS, reason="Hook lineage works in 
Airflow >= 2.10.0")
     @mock.patch("google.cloud.storage.Blob.download_as_bytes")
     @mock.patch(GCS_STRING.format("GCSHook.get_conn"))
     def test_download_as_bytes_exposes_lineage(self, mock_service, 
mock_download, hook_lineage_collector):
@@ -899,7 +893,6 @@ class TestGCSHook:
         assert response == test_file
         download_filename_method.assert_called_once_with(test_file, timeout=60)
 
-    @pytest.mark.skipif(not AIRFLOW_V_2_10_PLUS, reason="Hook lineage works in 
Airflow >= 2.10.0")
     @mock.patch("google.cloud.storage.Blob.download_to_filename")
     @mock.patch(GCS_STRING.format("GCSHook.get_conn"))
     def test_download_to_file_exposes_lineage(self, mock_service, 
mock_download, hook_lineage_collector):
@@ -1154,7 +1147,6 @@ class TestGCSHookUpload:
 
         assert metadata == blob_object.return_value.metadata
 
-    @pytest.mark.skipif(not AIRFLOW_V_2_10_PLUS, reason="Hook lineage works in 
Airflow >= 2.10.0")
     @mock.patch("google.cloud.storage.Blob.upload_from_filename")
     @mock.patch(GCS_STRING.format("GCSHook.get_conn"))
     def test_upload_file_exposes_lineage(self, mock_service, mock_upload, 
hook_lineage_collector):
@@ -1218,7 +1210,6 @@ class TestGCSHookUpload:
 
         upload_method.assert_called_once_with(testdata_bytes, 
content_type="text/plain", timeout=60)
 
-    @pytest.mark.skipif(not AIRFLOW_V_2_10_PLUS, reason="Hook lineage works in 
Airflow >= 2.10.0")
     @mock.patch("google.cloud.storage.Blob.upload_from_string")
     @mock.patch(GCS_STRING.format("GCSHook.get_conn"))
     def test_upload_data_exposes_lineage(self, mock_service, mock_upload, 
hook_lineage_collector):
diff --git 
a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/version_compat.py
 
b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/version_compat.py
index 21e7170194e..48d122b6696 100644
--- 
a/providers/microsoft/azure/src/airflow/providers/microsoft/azure/version_compat.py
+++ 
b/providers/microsoft/azure/src/airflow/providers/microsoft/azure/version_compat.py
@@ -32,5 +32,4 @@ def get_base_airflow_version_tuple() -> tuple[int, int, int]:
     return airflow_version.major, airflow_version.minor, airflow_version.micro
 
 
-AIRFLOW_V_2_10_PLUS = get_base_airflow_version_tuple() >= (2, 10, 0)
 AIRFLOW_V_3_0_PLUS = get_base_airflow_version_tuple() >= (3, 0, 0)
diff --git 
a/providers/microsoft/azure/tests/unit/microsoft/azure/operators/test_msgraph.py
 
b/providers/microsoft/azure/tests/unit/microsoft/azure/operators/test_msgraph.py
index e33c71f39f1..6ae597ba0e3 100644
--- 
a/providers/microsoft/azure/tests/unit/microsoft/azure/operators/test_msgraph.py
+++ 
b/providers/microsoft/azure/tests/unit/microsoft/azure/operators/test_msgraph.py
@@ -33,7 +33,6 @@ from airflow.utils import timezone
 from tests_common.test_utils.file_loading import load_file_from_resources, 
load_json_from_resources
 from tests_common.test_utils.mock_context import mock_context
 from tests_common.test_utils.operators.run_deferrable import execute_operator
-from tests_common.test_utils.version_compat import AIRFLOW_V_2_10_PLUS
 from unit.microsoft.azure.base import Base
 from unit.microsoft.azure.test_utils import mock_json_response, mock_response
 
@@ -261,7 +260,6 @@ class TestMSGraphAsyncOperator(Base):
             assert events[0].payload["response"] == base64_encoded_content
 
     @pytest.mark.db_test
-    @pytest.mark.skipif(not AIRFLOW_V_2_10_PLUS, reason="Lambda parameters 
works in Airflow >= 2.10.0")
     def test_execute_with_lambda_parameter_when_response_is_bytes(self):
         content = load_file_from_resources(
             dirname(__file__), "..", "resources", "dummy.pdf", mode="rb", 
encoding=None
diff --git 
a/providers/microsoft/azure/tests/unit/microsoft/azure/sensors/test_msgraph.py 
b/providers/microsoft/azure/tests/unit/microsoft/azure/sensors/test_msgraph.py
index c73c76caffe..055165d4fc7 100644
--- 
a/providers/microsoft/azure/tests/unit/microsoft/azure/sensors/test_msgraph.py
+++ 
b/providers/microsoft/azure/tests/unit/microsoft/azure/sensors/test_msgraph.py
@@ -28,7 +28,6 @@ from airflow.triggers.base import TriggerEvent
 
 from tests_common.test_utils.file_loading import load_json_from_resources
 from tests_common.test_utils.operators.run_deferrable import execute_operator
-from tests_common.test_utils.version_compat import AIRFLOW_V_2_10_PLUS
 from unit.microsoft.azure.base import Base
 from unit.microsoft.azure.test_utils import mock_json_response
 
@@ -102,7 +101,6 @@ class TestMSGraphSensor(Base):
             assert events[2].payload["type"] == "builtins.dict"
             assert events[2].payload["response"] == json.dumps(status[1])
 
-    @pytest.mark.skipif(not AIRFLOW_V_2_10_PLUS, reason="Lambda parameters 
works in Airflow >= 2.10.0")
     def 
test_execute_with_lambda_parameter_and_result_processor_with_new_signature(self):
         status = load_json_from_resources(dirname(__file__), "..", 
"resources", "status.json")
         response = mock_json_response(200, *status)
diff --git 
a/providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py 
b/providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py
index 545ab651ad3..11228c0d10f 100644
--- 
a/providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py
+++ 
b/providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py
@@ -33,7 +33,6 @@ from airflow.providers.openlineage import conf
 from airflow.providers.openlineage.extractors import ExtractorManager, 
OperatorLineage
 from airflow.providers.openlineage.plugins.adapter import OpenLineageAdapter, 
RunState
 from airflow.providers.openlineage.utils.utils import (
-    AIRFLOW_V_2_10_PLUS,
     AIRFLOW_V_3_0_PLUS,
     get_airflow_dag_run_facet,
     get_airflow_debug_facet,
@@ -59,13 +58,6 @@ if TYPE_CHECKING:
 _openlineage_listener: OpenLineageListener | None = None
 
 
-def _get_try_number_success(val):
-    # todo: remove when min airflow version >= 2.10.0
-    if AIRFLOW_V_2_10_PLUS:
-        return val.try_number
-    return val.try_number - 1
-
-
 def _executor_initializer():
     """
     Initialize processes for the executor used with DAGRun listener's methods 
(on scheduler).
@@ -304,7 +296,7 @@ class OpenLineageListener:
             task_uuid = self.adapter.build_task_instance_run_id(
                 dag_id=dag.dag_id,
                 task_id=task.task_id,
-                try_number=_get_try_number_success(task_instance),
+                try_number=task_instance.try_number,
                 logical_date=date,
                 map_index=task_instance.map_index,
             )
@@ -366,8 +358,7 @@ class OpenLineageListener:
             dagrun = context["dag_run"]
             dag = context["dag"]
             self._on_task_instance_failed(task_instance, dag, dagrun, task, 
error)
-
-    elif AIRFLOW_V_2_10_PLUS:
+    else:
 
         @hookimpl
         def on_task_instance_failed(
@@ -382,19 +373,6 @@ class OpenLineageListener:
             if TYPE_CHECKING:
                 assert task
             self._on_task_instance_failed(task_instance, task.dag, 
task_instance.dag_run, task, error)
-    else:
-
-        @hookimpl
-        def on_task_instance_failed(
-            self,
-            previous_state: TaskInstanceState,
-            task_instance: TaskInstance,
-            session: Session,  # type: ignore[valid-type]
-        ) -> None:
-            task = task_instance.task
-            if TYPE_CHECKING:
-                assert task
-            self._on_task_instance_failed(task_instance, task.dag, 
task_instance.dag_run, task)
 
     def _on_task_instance_failed(
         self,
@@ -651,10 +629,7 @@ class OpenLineageListener:
                 self.log.debug("Executor have not started before 
`on_dag_run_success`")
                 return
 
-            if AIRFLOW_V_2_10_PLUS:
-                task_ids = DagRun._get_partial_task_ids(dag_run.dag)
-            else:
-                task_ids = dag_run.dag.task_ids if dag_run.dag and 
dag_run.dag.partial else None
+            task_ids = DagRun._get_partial_task_ids(dag_run.dag)
 
             date = dag_run.logical_date
             if AIRFLOW_V_3_0_PLUS and date is None:
@@ -690,10 +665,7 @@ class OpenLineageListener:
                 self.log.debug("Executor have not started before 
`on_dag_run_failed`")
                 return
 
-            if AIRFLOW_V_2_10_PLUS:
-                task_ids = DagRun._get_partial_task_ids(dag_run.dag)
-            else:
-                task_ids = dag_run.dag.task_ids if dag_run.dag and 
dag_run.dag.partial else None
+            task_ids = DagRun._get_partial_task_ids(dag_run.dag)
 
             date = dag_run.logical_date
             if AIRFLOW_V_3_0_PLUS and date is None:
diff --git 
a/providers/openlineage/src/airflow/providers/openlineage/plugins/openlineage.py
 
b/providers/openlineage/src/airflow/providers/openlineage/plugins/openlineage.py
index 36c26657546..b1f465a9409 100644
--- 
a/providers/openlineage/src/airflow/providers/openlineage/plugins/openlineage.py
+++ 
b/providers/openlineage/src/airflow/providers/openlineage/plugins/openlineage.py
@@ -25,7 +25,6 @@ from airflow.providers.openlineage.plugins.macros import (
     lineage_parent_id,
     lineage_run_id,
 )
-from airflow.providers.openlineage.version_compat import AIRFLOW_V_2_10_PLUS
 
 
 class OpenLineageProviderPlugin(AirflowPlugin):
@@ -40,10 +39,9 @@ class OpenLineageProviderPlugin(AirflowPlugin):
     if not conf.is_disabled():
         macros = [lineage_job_namespace, lineage_job_name, lineage_run_id, 
lineage_parent_id]
         listeners = [get_openlineage_listener()]
-        if AIRFLOW_V_2_10_PLUS:
-            from airflow.lineage.hook import HookLineageReader
+        from airflow.lineage.hook import HookLineageReader
 
-            hook_lineage_readers = [HookLineageReader]
+        hook_lineage_readers = [HookLineageReader]
     else:
         macros = []
         listeners = []
diff --git 
a/providers/openlineage/src/airflow/providers/openlineage/utils/utils.py 
b/providers/openlineage/src/airflow/providers/openlineage/utils/utils.py
index 454b164d0f9..2567cb41802 100644
--- a/providers/openlineage/src/airflow/providers/openlineage/utils/utils.py
+++ b/providers/openlineage/src/airflow/providers/openlineage/utils/utils.py
@@ -50,7 +50,7 @@ from airflow.providers.openlineage.utils.selective_enable 
import (
     is_dag_lineage_enabled,
     is_task_lineage_enabled,
 )
-from airflow.providers.openlineage.version_compat import AIRFLOW_V_2_10_PLUS, 
AIRFLOW_V_3_0_PLUS
+from airflow.providers.openlineage.version_compat import AIRFLOW_V_3_0_PLUS
 from airflow.sensors.base import BaseSensorOperator
 from airflow.serialization.serialized_objects import SerializedBaseOperator
 from airflow.utils.module_loading import import_string
@@ -803,12 +803,6 @@ def get_filtered_unknown_operator_keys(operator: 
BaseOperator) -> dict:
 
 def should_use_external_connection(hook) -> bool:
     # If we're at Airflow 2.10, the execution is process-isolated, so we can 
safely run those again.
-    if not AIRFLOW_V_2_10_PLUS:
-        return hook.__class__.__name__ not in [
-            "SnowflakeHook",
-            "SnowflakeSqlApiHook",
-            "RedshiftSQLHook",
-        ]
     return True
 
 
diff --git 
a/providers/openlineage/src/airflow/providers/openlineage/version_compat.py 
b/providers/openlineage/src/airflow/providers/openlineage/version_compat.py
index 21e7170194e..48d122b6696 100644
--- a/providers/openlineage/src/airflow/providers/openlineage/version_compat.py
+++ b/providers/openlineage/src/airflow/providers/openlineage/version_compat.py
@@ -32,5 +32,4 @@ def get_base_airflow_version_tuple() -> tuple[int, int, int]:
     return airflow_version.major, airflow_version.minor, airflow_version.micro
 
 
-AIRFLOW_V_2_10_PLUS = get_base_airflow_version_tuple() >= (2, 10, 0)
 AIRFLOW_V_3_0_PLUS = get_base_airflow_version_tuple() >= (3, 0, 0)
diff --git 
a/providers/openlineage/tests/unit/openlineage/extractors/test_manager.py 
b/providers/openlineage/tests/unit/openlineage/extractors/test_manager.py
index 73f2c0d7b3d..55e11588343 100644
--- a/providers/openlineage/tests/unit/openlineage/extractors/test_manager.py
+++ b/providers/openlineage/tests/unit/openlineage/extractors/test_manager.py
@@ -38,7 +38,7 @@ from airflow.utils.state import State
 
 from tests_common.test_utils.compat import DateTimeSensor, PythonOperator
 from tests_common.test_utils.markers import 
skip_if_force_lowest_dependencies_marker
-from tests_common.test_utils.version_compat import AIRFLOW_V_2_10_PLUS, 
AIRFLOW_V_3_0_PLUS
+from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS
 
 if TYPE_CHECKING:
     try:
@@ -53,21 +53,19 @@ if TYPE_CHECKING:
         AssetEventDagRunReference = TIRunContext = Any  # type: ignore[misc, 
assignment]
 
 
-if AIRFLOW_V_2_10_PLUS:
-
-    @pytest.fixture
-    def hook_lineage_collector():
-        from airflow.lineage import hook
-        from airflow.providers.common.compat.lineage.hook import (
-            get_hook_lineage_collector,
-        )
+@pytest.fixture
+def hook_lineage_collector():
+    from airflow.lineage import hook
+    from airflow.providers.common.compat.lineage.hook import (
+        get_hook_lineage_collector,
+    )
 
-        hook._hook_lineage_collector = None
-        hook._hook_lineage_collector = hook.HookLineageCollector()
+    hook._hook_lineage_collector = None
+    hook._hook_lineage_collector = hook.HookLineageCollector()
 
-        yield get_hook_lineage_collector()
+    yield get_hook_lineage_collector()
 
-        hook._hook_lineage_collector = None
+    hook._hook_lineage_collector = None
 
 
 if AIRFLOW_V_3_0_PLUS:
@@ -281,7 +279,6 @@ def test_convert_to_ol_dataset_table():
 
 
 @skip_if_force_lowest_dependencies_marker
-@pytest.mark.skipif(not AIRFLOW_V_2_10_PLUS, reason="Hook lineage works in Airflow >= 2.10.0")
 def test_extractor_manager_uses_hook_level_lineage(hook_lineage_collector):
     dagrun = MagicMock()
     task = MagicMock()
@@ -300,7 +297,6 @@ def 
test_extractor_manager_uses_hook_level_lineage(hook_lineage_collector):
     assert metadata.outputs == [OpenLineageDataset(namespace="s3://bucket", 
name="output_key")]
 
 
-@pytest.mark.skipif(not AIRFLOW_V_2_10_PLUS, reason="Hook lineage works in Airflow >= 2.10.0")
 def test_extractor_manager_does_not_use_hook_level_lineage_when_operator(
     hook_lineage_collector,
 ):
@@ -330,8 +326,8 @@ def 
test_extractor_manager_does_not_use_hook_level_lineage_when_operator(
 
 @pytest.mark.db_test
 @pytest.mark.skipif(
-    not AIRFLOW_V_2_10_PLUS or AIRFLOW_V_3_0_PLUS,
-    reason="Test for hook level lineage in Airflow >= 2.10.0 < 3.0",
+    AIRFLOW_V_3_0_PLUS,
+    reason="Test for hook level lineage in Airflow < 3.0",
 )
 def test_extractor_manager_gets_data_from_pythonoperator(session, dag_maker, 
hook_lineage_collector):
     path = None
diff --git 
a/providers/openlineage/tests/unit/openlineage/plugins/test_execution.py 
b/providers/openlineage/tests/unit/openlineage/plugins/test_execution.py
index 9f12994e728..88de93e6e28 100644
--- a/providers/openlineage/tests/unit/openlineage/plugins/test_execution.py
+++ b/providers/openlineage/tests/unit/openlineage/plugins/test_execution.py
@@ -37,7 +37,7 @@ from airflow.utils.types import DagRunType
 
 from tests_common.test_utils.config import conf_vars
 from tests_common.test_utils.db import clear_db_runs
-from tests_common.test_utils.version_compat import AIRFLOW_V_2_10_PLUS, 
AIRFLOW_V_3_0_PLUS
+from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS
 
 if AIRFLOW_V_3_0_PLUS:
     from airflow.utils.types import DagRunTriggeredByType
@@ -68,9 +68,7 @@ def has_value_in_events(events, chain, value):
 with tempfile.TemporaryDirectory(prefix="venv") as tmp_dir:
     listener_path = Path(tmp_dir) / "event"
 
-    @pytest.mark.skipif(
-        not AIRFLOW_V_2_10_PLUS or AIRFLOW_V_3_0_PLUS, reason="Test requires 
Airflow>=2.10<3.0"
-    )
+    @pytest.mark.skipif(AIRFLOW_V_3_0_PLUS, reason="Test requires Airflow<3.0")
     @pytest.mark.usefixtures("reset_logging_config")
     class TestOpenLineageExecution:
         def teardown_method(self):
diff --git 
a/providers/openlineage/tests/unit/openlineage/plugins/test_listener.py 
b/providers/openlineage/tests/unit/openlineage/plugins/test_listener.py
index 7ece6038c11..3dc2c938835 100644
--- a/providers/openlineage/tests/unit/openlineage/plugins/test_listener.py
+++ b/providers/openlineage/tests/unit/openlineage/plugins/test_listener.py
@@ -44,18 +44,17 @@ from airflow.utils.state import DagRunState, State
 from tests_common.test_utils.compat import EmptyOperator, PythonOperator
 from tests_common.test_utils.config import conf_vars
 from tests_common.test_utils.db import clear_db_runs
-from tests_common.test_utils.version_compat import AIRFLOW_V_2_10_PLUS, 
AIRFLOW_V_3_0_PLUS
+from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS
 
 pytestmark = pytest.mark.db_test
 
-EXPECTED_TRY_NUMBER_1 = 1 if AIRFLOW_V_2_10_PLUS else 0
-
-TRY_NUMBER_BEFORE_EXECUTION = 0 if AIRFLOW_V_2_10_PLUS else 1
-TRY_NUMBER_RUNNING = 0 if AIRFLOW_V_2_10_PLUS else 1
-TRY_NUMBER_FAILED = 0 if AIRFLOW_V_2_10_PLUS else 1
-TRY_NUMBER_SUCCESS = 0 if AIRFLOW_V_2_10_PLUS else 2
-TRY_NUMBER_AFTER_EXECUTION = 0 if AIRFLOW_V_2_10_PLUS else 2
+EXPECTED_TRY_NUMBER_1 = 1
 
+TRY_NUMBER_BEFORE_EXECUTION = 0
+TRY_NUMBER_RUNNING = 0
+TRY_NUMBER_FAILED = 0
+TRY_NUMBER_SUCCESS = 0
+TRY_NUMBER_AFTER_EXECUTION = 0
 
 if TYPE_CHECKING:
     from airflow.sdk.execution_time.task_runner import RuntimeTaskInstance
@@ -356,8 +355,8 @@ class TestOpenLineageListenerAirflow2:
         mock_disabled.return_value = False
 
         err = ValueError("test")
-        on_task_failed_listener_kwargs = {"error": err} if AIRFLOW_V_2_10_PLUS 
else {}
-        expected_err_kwargs = {"error": err if AIRFLOW_V_2_10_PLUS else None}
+        on_task_failed_listener_kwargs = {"error": err}
+        expected_err_kwargs = {"error": err}
 
         listener.on_task_instance_failed(
             previous_state=None, task_instance=task_instance, 
**on_task_failed_listener_kwargs, session=None
@@ -460,7 +459,7 @@ class TestOpenLineageListenerAirflow2:
         parameters derived from the task instance.
         """
         listener, task_instance = self._create_listener_and_task_instance()
-        on_task_failed_kwargs = {"error": ValueError("test")} if 
AIRFLOW_V_2_10_PLUS else {}
+        on_task_failed_kwargs = {"error": ValueError("test")}
 
         listener.on_task_instance_failed(
             previous_state=None, task_instance=task_instance, 
**on_task_failed_kwargs, session=None
@@ -582,7 +581,7 @@ class TestOpenLineageListenerAirflow2:
         mock_get_user_provided_run_facets.return_value = {"custom_facet": 2}
         mock_disabled.return_value = True
 
-        on_task_failed_kwargs = {"error": ValueError("test")} if 
AIRFLOW_V_2_10_PLUS else {}
+        on_task_failed_kwargs = {"error": ValueError("test")}
 
         listener.on_task_instance_failed(
             previous_state=None, task_instance=task_instance, 
**on_task_failed_kwargs, session=None
@@ -1011,8 +1010,8 @@ class TestOpenLineageListenerAirflow3:
         mock_disabled.return_value = False
 
         err = ValueError("test")
-        on_task_failed_listener_kwargs = {"error": err} if AIRFLOW_V_2_10_PLUS 
else {}
-        expected_err_kwargs = {"error": err if AIRFLOW_V_2_10_PLUS else None}
+        on_task_failed_listener_kwargs = {"error": err}
+        expected_err_kwargs = {"error": err}
 
         listener.on_task_instance_failed(
             previous_state=None, task_instance=task_instance, 
**on_task_failed_listener_kwargs
@@ -1054,8 +1053,8 @@ class TestOpenLineageListenerAirflow3:
         mock_get_job_name.return_value = "job_name"
 
         err = ValueError("test")
-        on_task_failed_listener_kwargs = {"error": err} if AIRFLOW_V_2_10_PLUS 
else {}
-        expected_err_kwargs = {"error": err if AIRFLOW_V_2_10_PLUS else None}
+        on_task_failed_listener_kwargs = {"error": err}
+        expected_err_kwargs = {"error": err}
 
         listener.on_task_instance_failed(
             previous_state=None, task_instance=task_instance, 
**on_task_failed_listener_kwargs
@@ -1185,7 +1184,7 @@ class TestOpenLineageListenerAirflow3:
         parameters derived from the task instance.
         """
         listener, task_instance = self._create_listener_and_task_instance()
-        on_task_failed_kwargs = {"error": ValueError("test")} if 
AIRFLOW_V_2_10_PLUS else {}
+        on_task_failed_kwargs = {"error": ValueError("test")}
 
         listener.on_task_instance_failed(
             previous_state=None, task_instance=task_instance, 
**on_task_failed_kwargs
@@ -1248,7 +1247,7 @@ class TestOpenLineageListenerAirflow3:
         mock_get_user_provided_run_facets.return_value = {"custom_facet": 2}
         mock_disabled.return_value = True
 
-        on_task_failed_kwargs = {"error": ValueError("test")} if 
AIRFLOW_V_2_10_PLUS else {}
+        on_task_failed_kwargs = {"error": ValueError("test")}
 
         listener.on_task_instance_failed(
             previous_state=None, task_instance=task_instance, 
**on_task_failed_kwargs
@@ -1427,7 +1426,7 @@ class TestOpenLineageSelectiveEnableAirflow2:
         if enable_task:
             enable_lineage(self.task_1)
 
-        on_task_failed_kwargs = {"error": ValueError("test")} if 
AIRFLOW_V_2_10_PLUS else {}
+        on_task_failed_kwargs = {"error": ValueError("test")}
 
         with conf_vars({("openlineage", "selective_enable"): 
selective_enable}):
             listener = OpenLineageListener()
@@ -1485,7 +1484,7 @@ class TestOpenLineageSelectiveEnableAirflow2:
         if enable_task:
             enable_lineage(self.task_1)
 
-        on_task_failed_kwargs = {"error": ValueError("test")} if 
AIRFLOW_V_2_10_PLUS else {}
+        on_task_failed_kwargs = {"error": ValueError("test")}
 
         with conf_vars({("openlineage", "selective_enable"): 
selective_enable}):
             listener = OpenLineageListener()
diff --git a/providers/openlineage/tests/unit/openlineage/plugins/test_utils.py 
b/providers/openlineage/tests/unit/openlineage/plugins/test_utils.py
index 4d7944a0ad6..ffbb10c8c48 100644
--- a/providers/openlineage/tests/unit/openlineage/plugins/test_utils.py
+++ b/providers/openlineage/tests/unit/openlineage/plugins/test_utils.py
@@ -51,7 +51,7 @@ from airflow.utils.types import DagRunType
 from tests_common.test_utils.compat import (
     BashOperator,
 )
-from tests_common.test_utils.version_compat import AIRFLOW_V_2_10_PLUS, 
AIRFLOW_V_3_0_PLUS
+from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS
 
 if AIRFLOW_V_3_0_PLUS:
     from airflow.utils.types import DagRunTriggeredByType
@@ -562,7 +562,7 @@ def 
test_serialize_timetable_with_dataset_or_time_schedule():
 
 
 @pytest.mark.skipif(
-    not AIRFLOW_V_2_10_PLUS or AIRFLOW_V_3_0_PLUS,
+    AIRFLOW_V_3_0_PLUS,
     reason="This test checks serialization only in 2.10 conditions",
 )
 def test_serialize_timetable_2_10_complex_with_alias():
@@ -600,7 +600,7 @@ def test_serialize_timetable_2_10_complex_with_alias():
 
 
 @pytest.mark.skipif(
-    not AIRFLOW_V_2_10_PLUS or AIRFLOW_V_3_0_PLUS,
+    AIRFLOW_V_3_0_PLUS,
     reason="This test checks serialization only in 2.10 conditions",
 )
 def test_serialize_timetable_2_10_single_asset():
@@ -612,7 +612,7 @@ def test_serialize_timetable_2_10_single_asset():
 
 
 @pytest.mark.skipif(
-    not AIRFLOW_V_2_10_PLUS or AIRFLOW_V_3_0_PLUS,
+    AIRFLOW_V_3_0_PLUS,
     reason="This test checks serialization only in 2.10 conditions",
 )
 def test_serialize_timetable_2_10_list_of_assets():
@@ -630,7 +630,7 @@ def test_serialize_timetable_2_10_list_of_assets():
 
 
 @pytest.mark.skipif(
-    not AIRFLOW_V_2_10_PLUS or AIRFLOW_V_3_0_PLUS,
+    AIRFLOW_V_3_0_PLUS,
     reason="This test checks serialization only in 2.10 conditions",
 )
 def test_serialize_timetable_2_10_with_complex_logical_condition():
@@ -665,7 +665,7 @@ def 
test_serialize_timetable_2_10_with_complex_logical_condition():
 
 
 @pytest.mark.skipif(
-    not AIRFLOW_V_2_10_PLUS or AIRFLOW_V_3_0_PLUS,
+    AIRFLOW_V_3_0_PLUS,
     reason="This test checks serialization only in 2.10 conditions",
 )
 def test_serialize_timetable_2_10_with_dataset_or_time_schedule():
@@ -709,102 +709,6 @@ def 
test_serialize_timetable_2_10_with_dataset_or_time_schedule():
     }
 
 
-@pytest.mark.skipif(AIRFLOW_V_2_10_PLUS, reason="This test checks serialization only in 2.9 conditions")
-def test_serialize_timetable_2_9_single_asset():
-    dag = DAG(dag_id="test", start_date=datetime.datetime(2025, 1, 1), 
schedule=Asset("a"))
-    dag_info = DagInfo(dag)
-    assert dag_info.timetable == {"dataset_condition": {"__type": "dataset", 
"uri": "a", "extra": None}}
-
-
-@pytest.mark.skipif(AIRFLOW_V_2_10_PLUS, reason="This test checks serialization only in 2.9 conditions")
-def test_serialize_timetable_2_9_list_of_assets():
-    dag = DAG(dag_id="test", start_date=datetime.datetime(2025, 1, 1), 
schedule=[Asset("a"), Asset("b")])
-    dag_info = DagInfo(dag)
-    assert dag_info.timetable == {
-        "dataset_condition": {
-            "__type": "dataset_all",
-            "objects": [
-                {"__type": "dataset", "extra": None, "uri": "a"},
-                {"__type": "dataset", "extra": None, "uri": "b"},
-            ],
-        }
-    }
-
-
-@pytest.mark.skipif(AIRFLOW_V_2_10_PLUS, reason="This test checks serialization only in 2.9 conditions")
-def test_serialize_timetable_2_9_with_complex_logical_condition():
-    dag = DAG(
-        dag_id="test",
-        start_date=datetime.datetime(2025, 1, 1),
-        schedule=(Asset("ds1", extra={"some_extra": 1}) | Asset("ds2"))
-        & (Asset("ds3") | Asset("ds4", extra={"another_extra": 345})),
-    )
-    dag_info = DagInfo(dag)
-    assert dag_info.timetable == {
-        "dataset_condition": {
-            "__type": "dataset_all",
-            "objects": [
-                {
-                    "__type": "dataset_any",
-                    "objects": [
-                        {"__type": "dataset", "uri": "ds1", "extra": 
{"some_extra": 1}},
-                        {"__type": "dataset", "uri": "ds2", "extra": None},
-                    ],
-                },
-                {
-                    "__type": "dataset_any",
-                    "objects": [
-                        {"__type": "dataset", "uri": "ds3", "extra": None},
-                        {"__type": "dataset", "uri": "ds4", "extra": 
{"another_extra": 345}},
-                    ],
-                },
-            ],
-        }
-    }
-
-
-@pytest.mark.skipif(AIRFLOW_V_2_10_PLUS, reason="This test checks serialization only in 2.9 conditions")
-def test_serialize_timetable_2_9_with_dataset_or_time_schedule():
-    from airflow.timetables.datasets import DatasetOrTimeSchedule
-    from airflow.timetables.trigger import CronTriggerTimetable
-
-    dag = DAG(
-        dag_id="test",
-        start_date=datetime.datetime(2025, 1, 1),
-        schedule=DatasetOrTimeSchedule(
-            timetable=CronTriggerTimetable("0 0 * 3 *", timezone="UTC"),
-            datasets=(Asset("ds1", extra={"some_extra": 1}) | Asset("ds2"))
-            & (Asset("ds3") | Asset("ds4", extra={"another_extra": 345})),
-        ),
-    )
-    dag_info = DagInfo(dag)
-    assert dag_info.timetable == {
-        "timetable": {
-            "__type": "airflow.timetables.trigger.CronTriggerTimetable",
-            "__var": {"expression": "0 0 * 3 *", "timezone": "UTC", 
"interval": 0.0},
-        },
-        "dataset_condition": {
-            "__type": "dataset_all",
-            "objects": [
-                {
-                    "__type": "dataset_any",
-                    "objects": [
-                        {"__type": "dataset", "uri": "ds1", "extra": 
{"some_extra": 1}},
-                        {"__type": "dataset", "uri": "ds2", "extra": None},
-                    ],
-                },
-                {
-                    "__type": "dataset_any",
-                    "objects": [
-                        {"__type": "dataset", "uri": "ds3", "extra": None},
-                        {"__type": "dataset", "uri": "ds4", "extra": 
{"another_extra": 345}},
-                    ],
-                },
-            ],
-        },
-    }
-
-
 @pytest.mark.parametrize(
     ("airflow_version", "ol_version"),
     [
diff --git a/providers/openlineage/tests/unit/openlineage/utils/test_utils.py 
b/providers/openlineage/tests/unit/openlineage/utils/test_utils.py
index 07b1a90ff9a..27b64daa630 100644
--- a/providers/openlineage/tests/unit/openlineage/utils/test_utils.py
+++ b/providers/openlineage/tests/unit/openlineage/utils/test_utils.py
@@ -64,7 +64,7 @@ from airflow.utils.types import DagRunType
 
 from tests_common.test_utils.compat import BashOperator, PythonOperator
 from tests_common.test_utils.mock_operators import MockOperator
-from tests_common.test_utils.version_compat import AIRFLOW_V_2_10_PLUS, 
AIRFLOW_V_3_0_PLUS
+from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS
 
 BASH_OPERATOR_PATH = "airflow.providers.standard.operators.bash"
 PYTHON_OPERATOR_PATH = "airflow.providers.standard.operators.python"
@@ -1026,7 +1026,7 @@ class TestDagInfoAirflow2:
         }
 
 
-@pytest.mark.skipif(not AIRFLOW_V_2_10_PLUS or AIRFLOW_V_3_0_PLUS, reason="Airflow 2.10 tests")
+@pytest.mark.skipif(AIRFLOW_V_3_0_PLUS, reason="Airflow < 3.0 tests")
 class TestDagInfoAirflow210:
     def test_dag_info_schedule_single_dataset_directly(self):
         dag = DAG(
diff --git 
a/providers/opensearch/src/airflow/providers/opensearch/version_compat.py 
b/providers/opensearch/src/airflow/providers/opensearch/version_compat.py
index 21e7170194e..48d122b6696 100644
--- a/providers/opensearch/src/airflow/providers/opensearch/version_compat.py
+++ b/providers/opensearch/src/airflow/providers/opensearch/version_compat.py
@@ -32,5 +32,4 @@ def get_base_airflow_version_tuple() -> tuple[int, int, int]:
     return airflow_version.major, airflow_version.minor, airflow_version.micro
 
 
-AIRFLOW_V_2_10_PLUS = get_base_airflow_version_tuple() >= (2, 10, 0)
 AIRFLOW_V_3_0_PLUS = get_base_airflow_version_tuple() >= (3, 0, 0)
diff --git a/providers/presto/src/airflow/providers/presto/version_compat.py 
b/providers/presto/src/airflow/providers/presto/version_compat.py
index 21e7170194e..48d122b6696 100644
--- a/providers/presto/src/airflow/providers/presto/version_compat.py
+++ b/providers/presto/src/airflow/providers/presto/version_compat.py
@@ -32,5 +32,4 @@ def get_base_airflow_version_tuple() -> tuple[int, int, int]:
     return airflow_version.major, airflow_version.minor, airflow_version.micro
 
 
-AIRFLOW_V_2_10_PLUS = get_base_airflow_version_tuple() >= (2, 10, 0)
 AIRFLOW_V_3_0_PLUS = get_base_airflow_version_tuple() >= (3, 0, 0)
diff --git a/providers/redis/src/airflow/providers/redis/version_compat.py 
b/providers/redis/src/airflow/providers/redis/version_compat.py
index 21e7170194e..48d122b6696 100644
--- a/providers/redis/src/airflow/providers/redis/version_compat.py
+++ b/providers/redis/src/airflow/providers/redis/version_compat.py
@@ -32,5 +32,4 @@ def get_base_airflow_version_tuple() -> tuple[int, int, int]:
     return airflow_version.major, airflow_version.minor, airflow_version.micro
 
 
-AIRFLOW_V_2_10_PLUS = get_base_airflow_version_tuple() >= (2, 10, 0)
 AIRFLOW_V_3_0_PLUS = get_base_airflow_version_tuple() >= (3, 0, 0)
diff --git a/providers/sftp/src/airflow/providers/sftp/version_compat.py 
b/providers/sftp/src/airflow/providers/sftp/version_compat.py
index 21e7170194e..48d122b6696 100644
--- a/providers/sftp/src/airflow/providers/sftp/version_compat.py
+++ b/providers/sftp/src/airflow/providers/sftp/version_compat.py
@@ -32,5 +32,4 @@ def get_base_airflow_version_tuple() -> tuple[int, int, int]:
     return airflow_version.major, airflow_version.minor, airflow_version.micro
 
 
-AIRFLOW_V_2_10_PLUS = get_base_airflow_version_tuple() >= (2, 10, 0)
 AIRFLOW_V_3_0_PLUS = get_base_airflow_version_tuple() >= (3, 0, 0)
diff --git a/providers/smtp/tests/unit/smtp/notifications/test_smtp.py 
b/providers/smtp/tests/unit/smtp/notifications/test_smtp.py
index bacabf87b4d..ff3c02c4da9 100644
--- a/providers/smtp/tests/unit/smtp/notifications/test_smtp.py
+++ b/providers/smtp/tests/unit/smtp/notifications/test_smtp.py
@@ -30,14 +30,12 @@ from airflow.providers.smtp.notifications.smtp import (
 from airflow.providers.standard.operators.empty import EmptyOperator
 from airflow.utils import timezone
 
-from tests_common.test_utils.version_compat import AIRFLOW_V_2_10_PLUS
-
 pytestmark = pytest.mark.db_test
 
 SMTP_API_DEFAULT_CONN_ID = SmtpHook.default_conn_name
 
 
-NUM_TRY = 0 if AIRFLOW_V_2_10_PLUS else 1
+NUM_TRY = 0
 
 
 class TestSmtpNotifier:
diff --git 
a/providers/snowflake/src/airflow/providers/snowflake/utils/openlineage.py 
b/providers/snowflake/src/airflow/providers/snowflake/utils/openlineage.py
index 1a3d3f01836..14d682c50cb 100644
--- a/providers/snowflake/src/airflow/providers/snowflake/utils/openlineage.py
+++ b/providers/snowflake/src/airflow/providers/snowflake/utils/openlineage.py
@@ -23,9 +23,8 @@ from typing import TYPE_CHECKING
 from urllib.parse import quote, urlparse, urlunparse
 
 from airflow.providers.common.compat.openlineage.check import 
require_openlineage_version
-from airflow.providers.snowflake.version_compat import AIRFLOW_V_2_10_PLUS, 
AIRFLOW_V_3_0_PLUS
+from airflow.providers.snowflake.version_compat import AIRFLOW_V_3_0_PLUS
 from airflow.utils import timezone
-from airflow.utils.state import TaskInstanceState
 
 if TYPE_CHECKING:
     from openlineage.client.event_v2 import RunEvent
@@ -122,21 +121,12 @@ def _get_ol_run_id(task_instance) -> str:
 
         return date
 
-    def _get_try_number_success():
-        """We are running this in the _on_complete, so need to adjust for 
try_num changes."""
-        # todo: remove when min airflow version >= 2.10.0
-        if AIRFLOW_V_2_10_PLUS:
-            return task_instance.try_number
-        if task_instance.state == TaskInstanceState.SUCCESS:
-            return task_instance.try_number - 1
-        return task_instance.try_number
-
     # Generate same OL run id as is generated for current task instance
     return OpenLineageAdapter.build_task_instance_run_id(
         dag_id=task_instance.dag_id,
         task_id=task_instance.task_id,
         logical_date=_get_logical_date(),
-        try_number=_get_try_number_success(),
+        try_number=task_instance.try_number,
         map_index=task_instance.map_index,
     )
 
diff --git 
a/providers/snowflake/src/airflow/providers/snowflake/version_compat.py 
b/providers/snowflake/src/airflow/providers/snowflake/version_compat.py
index 21e7170194e..48d122b6696 100644
--- a/providers/snowflake/src/airflow/providers/snowflake/version_compat.py
+++ b/providers/snowflake/src/airflow/providers/snowflake/version_compat.py
@@ -32,5 +32,4 @@ def get_base_airflow_version_tuple() -> tuple[int, int, int]:
     return airflow_version.major, airflow_version.minor, airflow_version.micro
 
 
-AIRFLOW_V_2_10_PLUS = get_base_airflow_version_tuple() >= (2, 10, 0)
 AIRFLOW_V_3_0_PLUS = get_base_airflow_version_tuple() >= (3, 0, 0)
diff --git a/providers/snowflake/tests/unit/snowflake/utils/test_openlineage.py 
b/providers/snowflake/tests/unit/snowflake/utils/test_openlineage.py
index 1ecaf75af18..a6c94a7a383 100644
--- a/providers/snowflake/tests/unit/snowflake/utils/test_openlineage.py
+++ b/providers/snowflake/tests/unit/snowflake/utils/test_openlineage.py
@@ -44,8 +44,6 @@ from airflow.providers.snowflake.utils.openlineage import (
 from airflow.utils import timezone
 from airflow.utils.state import TaskInstanceState
 
-from tests_common.test_utils.version_compat import AIRFLOW_V_2_10_PLUS
-
 
 @pytest.mark.parametrize(
     "source,target",
@@ -118,7 +116,7 @@ def test_get_ol_run_id_ti_success():
         dag_id="dag_id",
         task_id="task_id",
         map_index=1,
-        try_number=1 if AIRFLOW_V_2_10_PLUS else 2,
+        try_number=1,
         logical_date=logical_date,
         state=TaskInstanceState.SUCCESS,
     )
@@ -150,7 +148,7 @@ def test_get_parent_run_facet():
         dag_id="dag_id",
         task_id="task_id",
         map_index=1,
-        try_number=1 if AIRFLOW_V_2_10_PLUS else 2,
+        try_number=1,
         logical_date=logical_date,
         state=TaskInstanceState.SUCCESS,
     )
@@ -553,7 +551,7 @@ def 
test_emit_openlineage_events_for_snowflake_queries_without_hook(mock_now, mo
         dag_id="dag_id",
         task_id="task_id",
         map_index=1,
-        try_number=1 if AIRFLOW_V_2_10_PLUS else 2,
+        try_number=1,
         logical_date=logical_date,
         state=TaskInstanceState.SUCCESS,  # This will be query default state 
if no metadata found
     )
diff --git 
a/providers/standard/src/airflow/providers/standard/operators/python.py 
b/providers/standard/src/airflow/providers/standard/operators/python.py
index 1f6759ef8b8..2f0d288258f 100644
--- a/providers/standard/src/airflow/providers/standard/operators/python.py
+++ b/providers/standard/src/airflow/providers/standard/operators/python.py
@@ -44,12 +44,12 @@ from airflow.exceptions import (
 from airflow.models.baseoperator import BaseOperator
 from airflow.models.variable import Variable
 from airflow.providers.standard.utils.python_virtualenv import 
prepare_virtualenv, write_python_script
-from airflow.providers.standard.version_compat import AIRFLOW_V_2_10_PLUS, 
AIRFLOW_V_3_0_PLUS
+from airflow.providers.standard.version_compat import AIRFLOW_V_3_0_PLUS
 from airflow.utils import hashlib_wrapper
 from airflow.utils.context import context_copy_partial, context_merge
 from airflow.utils.file import get_unique_dag_module_name
 from airflow.utils.operator_helpers import KeywordParameters
-from airflow.utils.process_utils import execute_in_subprocess, 
execute_in_subprocess_with_kwargs
+from airflow.utils.process_utils import execute_in_subprocess
 
 if AIRFLOW_V_3_0_PLUS:
     from airflow.providers.standard.operators.branch import BranchMixIn
@@ -200,12 +200,10 @@ class PythonOperator(BaseOperator):
                 from airflow.sdk.execution_time.context import 
context_get_outlet_events
 
                 return create_executable_runner, 
context_get_outlet_events(context)
-            if AIRFLOW_V_2_10_PLUS:
-                from airflow.utils.context import context_get_outlet_events  # 
type: ignore
-                from airflow.utils.operator_helpers import 
ExecutionCallableRunner  # type: ignore
+            from airflow.utils.context import context_get_outlet_events  # 
type: ignore
+            from airflow.utils.operator_helpers import ExecutionCallableRunner 
 # type: ignore
 
-                return ExecutionCallableRunner, 
context_get_outlet_events(context)
-            return None
+            return ExecutionCallableRunner, context_get_outlet_events(context)
 
         self.__prepare_execution = __prepare_execution
 
@@ -560,16 +558,10 @@ class _BasePythonVirtualenvOperator(PythonOperator, 
metaclass=ABCMeta):
                     os.fspath(termination_log_path),
                     os.fspath(airflow_context_path),
                 ]
-                if AIRFLOW_V_2_10_PLUS:
-                    execute_in_subprocess(
-                        cmd=cmd,
-                        env=env_vars,
-                    )
-                else:
-                    execute_in_subprocess_with_kwargs(
-                        cmd=cmd,
-                        env=env_vars,
-                    )
+                execute_in_subprocess(
+                    cmd=cmd,
+                    env=env_vars,
+                )
             except subprocess.CalledProcessError as e:
                 if e.returncode in self.skip_on_exit_code:
                     raise AirflowSkipException(f"Process exited with code 
{e.returncode}. Skipping.")
diff --git a/providers/standard/src/airflow/providers/standard/sensors/time.py 
b/providers/standard/src/airflow/providers/standard/sensors/time.py
index ee9dde773fe..bbe93dbc01a 100644
--- a/providers/standard/src/airflow/providers/standard/sensors/time.py
+++ b/providers/standard/src/airflow/providers/standard/sensors/time.py
@@ -22,7 +22,6 @@ from dataclasses import dataclass
 from typing import TYPE_CHECKING, Any, NoReturn
 
 from airflow.providers.standard.triggers.temporal import DateTimeTrigger
-from airflow.providers.standard.version_compat import AIRFLOW_V_2_10_PLUS
 from airflow.sensors.base import BaseSensorOperator
 
 try:
@@ -123,9 +122,7 @@ class TimeSensorAsync(BaseSensorOperator):
 
     def execute(self, context: Context) -> NoReturn:
         self.defer(
-            trigger=DateTimeTrigger(moment=self.target_datetime, 
end_from_trigger=self.end_from_trigger)
-            if AIRFLOW_V_2_10_PLUS
-            else DateTimeTrigger(moment=self.target_datetime),
+            trigger=DateTimeTrigger(moment=self.target_datetime, 
end_from_trigger=self.end_from_trigger),
             method_name="execute_complete",
         )
 
diff --git 
a/providers/standard/src/airflow/providers/standard/triggers/temporal.py 
b/providers/standard/src/airflow/providers/standard/triggers/temporal.py
index 48ebb223cb6..5f7451a0e84 100644
--- a/providers/standard/src/airflow/providers/standard/triggers/temporal.py
+++ b/providers/standard/src/airflow/providers/standard/triggers/temporal.py
@@ -23,14 +23,9 @@ from typing import Any
 
 import pendulum
 
-from airflow.exceptions import AirflowException
-from airflow.providers.standard.version_compat import AIRFLOW_V_2_10_PLUS
-from airflow.triggers.base import BaseTrigger, TriggerEvent
+from airflow.triggers.base import BaseTrigger, TaskSuccessEvent, TriggerEvent
 from airflow.utils import timezone
 
-if AIRFLOW_V_2_10_PLUS:
-    from airflow.triggers.base import TaskSuccessEvent
-
 
 class DateTimeTrigger(BaseTrigger):
     """
@@ -54,9 +49,6 @@ class DateTimeTrigger(BaseTrigger):
         if moment.tzinfo is None:
             raise ValueError("You cannot pass naive datetimes")
         self.moment: pendulum.DateTime = timezone.convert_to_utc(moment)
-        if not AIRFLOW_V_2_10_PLUS and end_from_trigger:
-            raise AirflowException("end_from_trigger is only supported in 
Airflow 2.10 and later. ")
-
         self.end_from_trigger = end_from_trigger
 
     def serialize(self) -> tuple[str, dict[str, Any]]:
diff --git 
a/providers/standard/src/airflow/providers/standard/version_compat.py 
b/providers/standard/src/airflow/providers/standard/version_compat.py
index 21e7170194e..48d122b6696 100644
--- a/providers/standard/src/airflow/providers/standard/version_compat.py
+++ b/providers/standard/src/airflow/providers/standard/version_compat.py
@@ -32,5 +32,4 @@ def get_base_airflow_version_tuple() -> tuple[int, int, int]:
     return airflow_version.major, airflow_version.minor, airflow_version.micro
 
 
-AIRFLOW_V_2_10_PLUS = get_base_airflow_version_tuple() >= (2, 10, 0)
 AIRFLOW_V_3_0_PLUS = get_base_airflow_version_tuple() >= (3, 0, 0)
diff --git 
a/providers/standard/tests/unit/standard/decorators/test_external_python.py 
b/providers/standard/tests/unit/standard/decorators/test_external_python.py
index ef2007048f5..1b4bba68c24 100644
--- a/providers/standard/tests/unit/standard/decorators/test_external_python.py
+++ b/providers/standard/tests/unit/standard/decorators/test_external_python.py
@@ -29,8 +29,6 @@ import pytest
 from airflow.decorators import setup, task, teardown
 from airflow.utils import timezone
 
-from tests_common.test_utils.version_compat import AIRFLOW_V_2_10_PLUS
-
 pytestmark = pytest.mark.db_test
 
 
@@ -68,7 +66,6 @@ def venv_python_with_cloudpickle_and_dill(tmp_path_factory):
 
 
 class TestExternalPythonDecorator:
-    @pytest.mark.skipif(not AIRFLOW_V_2_10_PLUS, reason="serializer support 
came in after 2.10")
     @pytest.mark.parametrize(
         "serializer",
         [
@@ -89,7 +86,6 @@ class TestExternalPythonDecorator:
 
         ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
 
-    @pytest.mark.skipif(not AIRFLOW_V_2_10_PLUS, reason="serializer support 
came in after 2.10")
     @pytest.mark.parametrize(
         "serializer",
         [
@@ -115,7 +111,6 @@ class TestExternalPythonDecorator:
 
         ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
 
-    @pytest.mark.skipif(not AIRFLOW_V_2_10_PLUS, reason="serializer support 
came in after 2.10")
     @pytest.mark.parametrize(
         "serializer",
         [
@@ -147,7 +142,6 @@ class TestExternalPythonDecorator:
         with pytest.raises(CalledProcessError):
             ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
 
-    @pytest.mark.skipif(not AIRFLOW_V_2_10_PLUS, reason="serializer support 
came in after 2.10")
     @pytest.mark.parametrize(
         "serializer",
         [
@@ -170,7 +164,6 @@ class TestExternalPythonDecorator:
 
         ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
 
-    @pytest.mark.skipif(not AIRFLOW_V_2_10_PLUS, reason="serializer support 
came in after 2.10")
     @pytest.mark.parametrize(
         "serializer",
         [
@@ -191,7 +184,6 @@ class TestExternalPythonDecorator:
 
         ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
 
-    @pytest.mark.skipif(not AIRFLOW_V_2_10_PLUS, reason="serializer support 
came in after 2.10")
     @pytest.mark.parametrize(
         "serializer",
         [
@@ -212,7 +204,6 @@ class TestExternalPythonDecorator:
 
         ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
 
-    @pytest.mark.skipif(not AIRFLOW_V_2_10_PLUS, reason="serializer support 
came in after 2.10")
     @pytest.mark.parametrize(
         "serializer",
         [
@@ -239,7 +230,6 @@ class TestExternalPythonDecorator:
         assert setup_task.is_setup
         ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
 
-    @pytest.mark.skipif(not AIRFLOW_V_2_10_PLUS, reason="serializer support 
came in after 2.10")
     @pytest.mark.parametrize(
         "serializer",
         [
@@ -266,7 +256,6 @@ class TestExternalPythonDecorator:
         assert teardown_task.is_teardown
         ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
 
-    @pytest.mark.skipif(not AIRFLOW_V_2_10_PLUS, reason="serializer support came in after 2.10")
     @pytest.mark.parametrize(
         "serializer",
         [
diff --git a/providers/standard/tests/unit/standard/operators/test_python.py b/providers/standard/tests/unit/standard/operators/test_python.py
index 4206db5481b..45a1e395e1a 100644
--- a/providers/standard/tests/unit/standard/operators/test_python.py
+++ b/providers/standard/tests/unit/standard/operators/test_python.py
@@ -70,7 +70,7 @@ from airflow.utils.trigger_rule import TriggerRule
 from airflow.utils.types import NOTSET, DagRunType
 
 from tests_common.test_utils.db import clear_db_runs
-from tests_common.test_utils.version_compat import AIRFLOW_V_2_10_PLUS, AIRFLOW_V_3_0_PLUS
+from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS
 
 if TYPE_CHECKING:
     from airflow.models.dagrun import DagRun
@@ -936,11 +936,10 @@ class BaseTestPythonVirtualenvOperator(BasePythonTest):
             "conn",  # Accessor for Connection.
             "map_index_template",
         }
-        if AIRFLOW_V_2_10_PLUS:
-            intentionally_excluded_context_keys |= {
-                "inlet_events",
-                "outlet_events",
-            }
+        intentionally_excluded_context_keys |= {
+            "inlet_events",
+            "outlet_events",
+        }
 
         ti = create_task_instance(dag_id=self.dag_id, task_id=self.task_id, schedule=None)
         context = ti.get_template_context()
diff --git a/providers/standard/tests/unit/standard/triggers/test_temporal.py b/providers/standard/tests/unit/standard/triggers/test_temporal.py
index 91e27298c50..fc85eab8273 100644
--- a/providers/standard/tests/unit/standard/triggers/test_temporal.py
+++ b/providers/standard/tests/unit/standard/triggers/test_temporal.py
@@ -29,8 +29,6 @@ from airflow.utils import timezone
 from airflow.utils.state import TaskInstanceState
 from airflow.utils.timezone import utcnow
 
-from tests_common.test_utils.version_compat import AIRFLOW_V_2_10_PLUS
-
 
 def test_input_validation():
     """
@@ -76,7 +74,6 @@ def test_timedelta_trigger_serialization():
     assert -2 < (kwargs["moment"] - expected_moment).total_seconds() < 2
 
 
-@pytest.mark.skipif(not AIRFLOW_V_2_10_PLUS, reason="Only for Airflow 2.10+")
 @pytest.mark.parametrize(
     "tz, end_from_trigger",
     [
@@ -116,44 +113,6 @@ async def test_datetime_trigger_timing_airflow_2_10_plus(tz, end_from_trigger):
     assert result.payload == expected_payload
 
 
-@pytest.mark.skipif(AIRFLOW_V_2_10_PLUS, reason="Only for Airflow < 2.10+")
-@pytest.mark.parametrize(
-    "tz",
-    [
-        timezone.parse_timezone("UTC"),
-        timezone.parse_timezone("Europe/Paris"),
-        timezone.parse_timezone("America/Toronto"),
-    ],
-)
-@pytest.mark.asyncio
-async def test_datetime_trigger_timing(tz):
-    """
-    Tests that the DateTimeTrigger only goes off on or after the appropriate
-    time.
-    """
-    past_moment = pendulum.instance((timezone.utcnow() - datetime.timedelta(seconds=60)).astimezone(tz))
-    future_moment = pendulum.instance((timezone.utcnow() + datetime.timedelta(seconds=60)).astimezone(tz))
-
-    # Create a task that runs the trigger for a short time then cancels it
-    trigger = DateTimeTrigger(future_moment)
-    trigger_task = asyncio.create_task(trigger.run().__anext__())
-    await asyncio.sleep(0.5)
-
-    # It should not have produced a result
-    assert trigger_task.done() is False
-    trigger_task.cancel()
-
-    # Now, make one waiting for en event in the past and do it again
-    trigger = DateTimeTrigger(past_moment)
-    trigger_task = asyncio.create_task(trigger.run().__anext__())
-    await asyncio.sleep(0.5)
-
-    assert trigger_task.done() is True
-    result = trigger_task.result()
-    assert isinstance(result, TriggerEvent)
-    assert result.payload == past_moment
-
-
 @mock.patch("airflow.providers.standard.triggers.temporal.timezone.utcnow")
 @mock.patch("airflow.providers.standard.triggers.temporal.asyncio.sleep")
 @pytest.mark.asyncio
diff --git a/providers/trino/src/airflow/providers/trino/version_compat.py b/providers/trino/src/airflow/providers/trino/version_compat.py
index 21e7170194e..48d122b6696 100644
--- a/providers/trino/src/airflow/providers/trino/version_compat.py
+++ b/providers/trino/src/airflow/providers/trino/version_compat.py
@@ -32,5 +32,4 @@ def get_base_airflow_version_tuple() -> tuple[int, int, int]:
     return airflow_version.major, airflow_version.minor, airflow_version.micro
 
 
-AIRFLOW_V_2_10_PLUS = get_base_airflow_version_tuple() >= (2, 10, 0)
 AIRFLOW_V_3_0_PLUS = get_base_airflow_version_tuple() >= (3, 0, 0)

Reply via email to