This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 27735d911e2 Fix DagBag imports in 3.2+ (#56109)
27735d911e2 is described below
commit 27735d911e26fc6a379bd69026e2dfffb07831fd
Author: Jed Cunningham <[email protected]>
AuthorDate: Thu Sep 25 16:54:55 2025 -0600
Fix DagBag imports in 3.2+ (#56109)
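DagBag was moved to `airflow.dag_processing.dagbag`, but the move missed the
3.1 release, so the `AIRFLOW_V_3_1_PLUS` conditionals guarding its import were
all incorrect. They now check `AIRFLOW_V_3_2_PLUS` instead.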
---
devel-common/src/tests_common/pytest_plugin.py | 8 +++-----
devel-common/src/tests_common/test_utils/db.py | 10 +++++-----
.../tests/unit/openlineage/plugins/test_execution.py | 4 ++--
.../src/airflow/providers/standard/sensors/external_task.py | 4 ++--
.../standard/src/airflow/providers/standard/version_compat.py | 2 ++
.../tests/unit/standard/sensors/test_external_task_sensor.py | 10 +++++++---
.../standard/tests/unit/standard/sensors/test_time_delta.py | 4 ++--
providers/standard/tests/unit/standard/sensors/test_weekday.py | 4 ++--
8 files changed, 25 insertions(+), 21 deletions(-)
diff --git a/devel-common/src/tests_common/pytest_plugin.py b/devel-common/src/tests_common/pytest_plugin.py
index b7c074388b4..cf3b3a46b74 100644
--- a/devel-common/src/tests_common/pytest_plugin.py
+++ b/devel-common/src/tests_common/pytest_plugin.py
@@ -1622,16 +1622,14 @@ def session():
def get_test_dag():
def _get(dag_id: str):
from airflow import settings
+ from airflow.models.serialized_dag import SerializedDagModel
- from tests_common.test_utils.version_compat import AIRFLOW_V_3_1_PLUS
+ from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS, AIRFLOW_V_3_2_PLUS
- if AIRFLOW_V_3_1_PLUS:
+ if AIRFLOW_V_3_2_PLUS:
from airflow.dag_processing.dagbag import DagBag
else:
from airflow.models.dagbag import DagBag # type: ignore[no-redef, attribute-defined]
- from airflow.models.serialized_dag import SerializedDagModel
-
- from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS
dag_file = AIRFLOW_CORE_TESTS_PATH / "unit" / "dags" / f"{dag_id}.py"
dagbag = DagBag(dag_folder=dag_file, include_examples=False)
diff --git a/devel-common/src/tests_common/test_utils/db.py b/devel-common/src/tests_common/test_utils/db.py
index c9d964641e3..dc3e86a62d9 100644
--- a/devel-common/src/tests_common/test_utils/db.py
+++ b/devel-common/src/tests_common/test_utils/db.py
@@ -90,9 +90,9 @@ def _deactivate_unknown_dags(active_dag_ids, session):
def _bootstrap_dagbag():
- if AIRFLOW_V_3_1_PLUS:
+ if AIRFLOW_V_3_2_PLUS:
from airflow.dag_processing.dagbag import DagBag
- else: # back-compat for Airflow <3.1
+ else: # back-compat for Airflow <3.2
from airflow.models.dagbag import DagBag # type: ignore[no-redef, attribute-defined]
if AIRFLOW_V_3_0_PLUS:
@@ -105,7 +105,7 @@ def _bootstrap_dagbag():
dagbag = DagBag()
# Save DAGs in the ORM
- if AIRFLOW_V_3_1_PLUS:
+ if AIRFLOW_V_3_2_PLUS:
from airflow.dag_processing.dagbag import sync_bag_to_db
sync_bag_to_db(dagbag, bundle_name="dags-folder", bundle_version=None, session=session)
@@ -166,7 +166,7 @@ def initial_db_init():
def parse_and_sync_to_db(folder: Path | str, include_examples: bool = False):
- if AIRFLOW_V_3_1_PLUS:
+ if AIRFLOW_V_3_2_PLUS:
from airflow.dag_processing.dagbag import DagBag
else:
from airflow.models.dagbag import DagBag # type: ignore[no-redef, attribute-defined]
@@ -180,7 +180,7 @@ def parse_and_sync_to_db(folder: Path | str, include_examples: bool = False):
session.flush()
dagbag = DagBag(dag_folder=folder, include_examples=include_examples)
- if AIRFLOW_V_3_1_PLUS:
+ if AIRFLOW_V_3_2_PLUS:
from airflow.dag_processing.dagbag import sync_bag_to_db
sync_bag_to_db(dagbag, "dags-folder", None, session=session)
diff --git a/providers/openlineage/tests/unit/openlineage/plugins/test_execution.py b/providers/openlineage/tests/unit/openlineage/plugins/test_execution.py
index fb84004e7ee..488fc675d0f 100644
--- a/providers/openlineage/tests/unit/openlineage/plugins/test_execution.py
+++ b/providers/openlineage/tests/unit/openlineage/plugins/test_execution.py
@@ -37,9 +37,9 @@ from airflow.utils.types import DagRunType
from tests_common.test_utils.config import conf_vars
from tests_common.test_utils.db import clear_db_runs
-from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS, AIRFLOW_V_3_1_PLUS
+from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS, AIRFLOW_V_3_2_PLUS
-if AIRFLOW_V_3_1_PLUS:
+if AIRFLOW_V_3_2_PLUS:
from airflow.dag_processing.dagbag import DagBag
else:
from airflow.models.dagbag import DagBag # type: ignore[attr-defined, no-redef]
diff --git a/providers/standard/src/airflow/providers/standard/sensors/external_task.py b/providers/standard/src/airflow/providers/standard/sensors/external_task.py
index 8b35d0ad665..4a8cbc03f30 100644
--- a/providers/standard/src/airflow/providers/standard/sensors/external_task.py
+++ b/providers/standard/src/airflow/providers/standard/sensors/external_task.py
@@ -40,7 +40,7 @@ from airflow.providers.standard.triggers.external_task import WorkflowTrigger
from airflow.providers.standard.utils.sensor_helper import _get_count, _get_external_task_group_task_ids
from airflow.providers.standard.version_compat import (
AIRFLOW_V_3_0_PLUS,
- AIRFLOW_V_3_1_PLUS,
+ AIRFLOW_V_3_2_PLUS,
BaseOperator,
BaseOperatorLink,
BaseSensorOperator,
@@ -51,7 +51,7 @@ from airflow.utils.state import State, TaskInstanceState
if not AIRFLOW_V_3_0_PLUS:
from airflow.utils.session import NEW_SESSION, provide_session
-if AIRFLOW_V_3_1_PLUS:
+if AIRFLOW_V_3_2_PLUS:
from airflow.dag_processing.dagbag import DagBag
else:
from airflow.models.dagbag import DagBag # type: ignore[attr-defined, no-redef]
diff --git a/providers/standard/src/airflow/providers/standard/version_compat.py b/providers/standard/src/airflow/providers/standard/version_compat.py
index d36955c6776..10820dbe1fd 100644
--- a/providers/standard/src/airflow/providers/standard/version_compat.py
+++ b/providers/standard/src/airflow/providers/standard/version_compat.py
@@ -34,6 +34,7 @@ def get_base_airflow_version_tuple() -> tuple[int, int, int]:
AIRFLOW_V_3_0_PLUS: bool = get_base_airflow_version_tuple() >= (3, 0, 0)
AIRFLOW_V_3_1_PLUS: bool = get_base_airflow_version_tuple() >= (3, 1, 0)
+AIRFLOW_V_3_2_PLUS: bool = get_base_airflow_version_tuple() >= (3, 2, 0)
# BaseOperator is not imported from SDK from 3.0 (and only done from 3.1) due to a bug with
# DecoratedOperator -- where `DecoratedOperator._handle_output` needed `xcom_push` to exist on `BaseOperator`
@@ -57,6 +58,7 @@ else:
__all__ = [
"AIRFLOW_V_3_0_PLUS",
"AIRFLOW_V_3_1_PLUS",
+ "AIRFLOW_V_3_2_PLUS",
"BaseOperator",
"BaseOperatorLink",
"BaseHook",
diff --git a/providers/standard/tests/unit/standard/sensors/test_external_task_sensor.py b/providers/standard/tests/unit/standard/sensors/test_external_task_sensor.py
index a1897f5ffbf..e45b798ebfd 100644
--- a/providers/standard/tests/unit/standard/sensors/test_external_task_sensor.py
+++ b/providers/standard/tests/unit/standard/sensors/test_external_task_sensor.py
@@ -56,7 +56,7 @@ from airflow.utils.types import DagRunType
from tests_common.test_utils.dag import create_scheduler_dag, sync_dag_to_db, sync_dags_to_db
from tests_common.test_utils.db import clear_db_runs
from tests_common.test_utils.mock_operators import MockOperator
-from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS, AIRFLOW_V_3_1_PLUS
+from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS, AIRFLOW_V_3_1_PLUS, AIRFLOW_V_3_2_PLUS
if AIRFLOW_V_3_0_PLUS:
from airflow.models.dag_version import DagVersion
@@ -67,14 +67,18 @@ else:
from airflow.models import BaseOperator # type: ignore[assignment,no-redef]
if AIRFLOW_V_3_1_PLUS:
- from airflow.dag_processing.dagbag import DagBag
from airflow.sdk import TaskGroup
from airflow.sdk.timezone import coerce_datetime, datetime
else:
- from airflow.models.dagbag import DagBag # type: ignore[attr-defined, no-redef]
from airflow.utils.task_group import TaskGroup # type: ignore[no-redef]
from airflow.utils.timezone import coerce_datetime, datetime # type: ignore[attr-defined,no-redef]
+if AIRFLOW_V_3_2_PLUS:
+ from airflow.dag_processing.dagbag import DagBag
+else:
+ from airflow.models.dagbag import DagBag # type: ignore[attr-defined, no-redef]
+
+
pytestmark = pytest.mark.db_test
TI = TaskInstance
diff --git a/providers/standard/tests/unit/standard/sensors/test_time_delta.py b/providers/standard/tests/unit/standard/sensors/test_time_delta.py
index 629783021c1..65f789f45b2 100644
--- a/providers/standard/tests/unit/standard/sensors/test_time_delta.py
+++ b/providers/standard/tests/unit/standard/sensors/test_time_delta.py
@@ -35,9 +35,9 @@ from airflow.providers.standard.triggers.temporal import DateTimeTrigger
from airflow.utils.types import DagRunType
from tests_common.test_utils import db
-from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS, AIRFLOW_V_3_1_PLUS, timezone
+from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS, AIRFLOW_V_3_2_PLUS, timezone
-if AIRFLOW_V_3_1_PLUS:
+if AIRFLOW_V_3_2_PLUS:
from airflow.dag_processing.dagbag import DagBag
else:
from airflow.models.dagbag import DagBag # type: ignore[attr-defined, no-redef]
diff --git a/providers/standard/tests/unit/standard/sensors/test_weekday.py b/providers/standard/tests/unit/standard/sensors/test_weekday.py
index d789a085061..e95f18ae61c 100644
--- a/providers/standard/tests/unit/standard/sensors/test_weekday.py
+++ b/providers/standard/tests/unit/standard/sensors/test_weekday.py
@@ -27,9 +27,9 @@ from airflow.providers.standard.sensors.weekday import DayOfWeekSensor
from airflow.providers.standard.utils.weekday import WeekDay
from tests_common.test_utils import db
-from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS, AIRFLOW_V_3_1_PLUS, timezone
+from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS, AIRFLOW_V_3_2_PLUS, timezone
-if AIRFLOW_V_3_1_PLUS:
+if AIRFLOW_V_3_2_PLUS:
from airflow.dag_processing.dagbag import DagBag
else:
from airflow.models import DagBag # type: ignore[attr-defined, no-redef]