This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 485d78f8c6e Update example dags to import `DAG`, `Asset` etc from Task
SDK (#48014)
485d78f8c6e is described below
commit 485d78f8c6e05a05547f9eb822f122e309bef4f8
Author: Kaxil Naik <[email protected]>
AuthorDate: Fri Apr 4 04:18:21 2025 +0530
Update example dags to import `DAG`, `Asset` etc from Task SDK (#48014)
Users of Airflow 3 should import DAG-authoring classes from the Task SDK (`airflow.sdk`).
This PR updates the example DAGs to import `DAG`, `Asset`, `Param`, etc. from `airflow.sdk`.
related #47851
closes #48433
---
.../src/airflow/cli/commands/dag_command.py | 3 +-
.../src/airflow/cli/commands/task_command.py | 4 +-
.../airflow/example_dags/example_asset_alias.py | 3 +-
.../example_asset_alias_with_no_taskflow.py | 3 +-
.../example_dags/example_asset_decorator.py | 3 +-
.../example_dags/example_asset_with_watchers.py | 3 +-
.../src/airflow/example_dags/example_assets.py | 3 +-
.../airflow/example_dags/example_bash_operator.py | 5 +-
.../example_branch_datetime_operator.py | 2 +-
.../example_branch_day_of_week_operator.py | 2 +-
.../airflow/example_dags/example_branch_labels.py | 2 +-
.../example_dags/example_branch_operator.py | 2 +-
.../example_branch_operator_decorator.py | 2 +-
.../example_branch_python_dop_operator_3.py | 2 +-
.../src/airflow/example_dags/example_complex.py | 3 +-
.../airflow/example_dags/example_custom_weight.py | 6 +-
.../airflow/example_dags/example_dag_decorator.py | 2 +-
.../example_dags/example_dynamic_task_mapping.py | 2 +-
...amic_task_mapping_with_no_taskflow_operators.py | 2 +-
.../example_external_task_marker_dag.py | 2 +-
.../example_dags/example_inlet_event_extra.py | 3 +-
.../example_dags/example_kubernetes_executor.py | 2 +-
.../airflow/example_dags/example_latest_only.py | 2 +-
.../example_latest_only_with_trigger.py | 2 +-
.../example_local_kubernetes_executor.py | 2 +-
.../example_dags/example_nested_branch_dag.py | 2 +-
.../example_dags/example_outlet_event_extra.py | 4 +-
.../example_dags/example_params_trigger_ui.py | 3 +-
.../example_dags/example_params_ui_tutorial.py | 3 +-
.../example_passing_params_via_test_command.py | 2 +-
.../example_dags/example_python_operator.py | 2 +-
.../src/airflow/example_dags/example_sensors.py | 2 +-
.../airflow/example_dags/example_setup_teardown.py | 2 +-
.../example_setup_teardown_taskflow.py | 2 +-
.../example_dags/example_short_circuit_operator.py | 3 +-
.../src/airflow/example_dags/example_skip_dag.py | 4 +-
.../src/airflow/example_dags/example_task_group.py | 2 +-
.../example_dags/example_task_group_decorator.py | 2 +-
.../example_time_delta_sensor_async.py | 2 +-
.../example_dags/example_trigger_controller_dag.py | 2 +-
.../example_dags/example_trigger_target_dag.py | 2 +-
.../example_dags/example_workday_timetable.py | 2 +-
.../src/airflow/example_dags/example_xcom.py | 2 +-
.../src/airflow/example_dags/example_xcomargs.py | 2 +-
airflow-core/src/airflow/example_dags/tutorial.py | 6 +-
.../src/airflow/example_dags/tutorial_dag.py | 6 +-
airflow-core/src/airflow/models/dag.py | 74 ++++++++++++++++++++++
airflow-core/src/airflow/utils/cli.py | 5 +-
.../tests/unit/cli/commands/test_dag_command.py | 2 +-
.../tests/unit/cli/commands/test_task_command.py | 4 ++
airflow-core/tests/unit/jobs/test_scheduler_job.py | 22 +++++--
airflow-core/tests/unit/models/test_dagbag.py | 10 +--
airflow-core/tests/unit/models/test_dagcode.py | 8 ++-
.../tests/unit/models/test_serialized_dag.py | 16 +++--
54 files changed, 177 insertions(+), 88 deletions(-)
diff --git a/airflow-core/src/airflow/cli/commands/dag_command.py
b/airflow-core/src/airflow/cli/commands/dag_command.py
index 9bff688a41b..44e14352ad1 100644
--- a/airflow-core/src/airflow/cli/commands/dag_command.py
+++ b/airflow-core/src/airflow/cli/commands/dag_command.py
@@ -44,6 +44,7 @@ from airflow.dag_processing.bundles.manager import
DagBundlesManager
from airflow.exceptions import AirflowException
from airflow.jobs.job import Job
from airflow.models import DagBag, DagModel, DagRun, DagTag, TaskInstance
+from airflow.models.dag import DAG
from airflow.models.errors import ParseImportError
from airflow.models.serialized_dag import SerializedDagModel
from airflow.sdk.definitions._internal.dag_parsing_context import
_airflow_parsing_context_manager
@@ -59,7 +60,6 @@ if TYPE_CHECKING:
from graphviz.dot import Dot
from sqlalchemy.orm import Session
- from airflow.models.dag import DAG
from airflow.timetables.base import DataInterval
log = logging.getLogger(__name__)
@@ -656,6 +656,7 @@ def dag_test(args, dag: DAG | None = None, session: Session
= NEW_SESSION) -> No
f"Dag {args.dag_id!r} could not be found; either it does not exist
or it failed to parse."
)
+ dag = DAG.from_sdk_dag(dag)
dr: DagRun = dag.test(
logical_date=logical_date,
run_conf=run_conf,
diff --git a/airflow-core/src/airflow/cli/commands/task_command.py
b/airflow-core/src/airflow/cli/commands/task_command.py
index 1d64b084664..e6fb5f68ac9 100644
--- a/airflow-core/src/airflow/cli/commands/task_command.py
+++ b/airflow-core/src/airflow/cli/commands/task_command.py
@@ -255,7 +255,7 @@ def task_state(args) -> None:
>>> airflow tasks state tutorial sleep 2015-01-01
success
"""
- dag = get_dag(args.subdir, args.dag_id)
+ dag = get_dag(args.subdir, args.dag_id, from_db=True)
task = dag.get_task(task_id=args.task_id)
ti, _ = _get_ti(task, args.map_index,
logical_date_or_run_id=args.logical_date_or_run_id)
print(ti.current_state())
@@ -367,6 +367,8 @@ def task_test(args, dag: DAG | None = None, session:
Session = NEW_SESSION) -> N
dag = dag or get_dag(args.subdir, args.dag_id)
+ dag = DAG.from_sdk_dag(dag)
+
task = dag.get_task(task_id=args.task_id)
# Add CLI provided task_params to task.params
if args.task_params:
diff --git a/airflow-core/src/airflow/example_dags/example_asset_alias.py
b/airflow-core/src/airflow/example_dags/example_asset_alias.py
index 9c98b9756a8..dfbad070bb3 100644
--- a/airflow-core/src/airflow/example_dags/example_asset_alias.py
+++ b/airflow-core/src/airflow/example_dags/example_asset_alias.py
@@ -34,9 +34,8 @@ from __future__ import annotations
import pendulum
-from airflow import DAG
from airflow.decorators import task
-from airflow.sdk.definitions.asset import Asset, AssetAlias
+from airflow.sdk import DAG, Asset, AssetAlias
with DAG(
dag_id="asset_s3_bucket_producer",
diff --git
a/airflow-core/src/airflow/example_dags/example_asset_alias_with_no_taskflow.py
b/airflow-core/src/airflow/example_dags/example_asset_alias_with_no_taskflow.py
index c3d1ac0b8d1..d9de5e06d9f 100644
---
a/airflow-core/src/airflow/example_dags/example_asset_alias_with_no_taskflow.py
+++
b/airflow-core/src/airflow/example_dags/example_asset_alias_with_no_taskflow.py
@@ -35,9 +35,8 @@ from __future__ import annotations
import pendulum
-from airflow import DAG
from airflow.providers.standard.operators.python import PythonOperator
-from airflow.sdk.definitions.asset import Asset, AssetAlias
+from airflow.sdk import DAG, Asset, AssetAlias
with DAG(
dag_id="asset_s3_bucket_producer_with_no_taskflow",
diff --git a/airflow-core/src/airflow/example_dags/example_asset_decorator.py
b/airflow-core/src/airflow/example_dags/example_asset_decorator.py
index b7560f21342..1e3ea4b9fcb 100644
--- a/airflow-core/src/airflow/example_dags/example_asset_decorator.py
+++ b/airflow-core/src/airflow/example_dags/example_asset_decorator.py
@@ -19,8 +19,7 @@ from __future__ import annotations
import pendulum
from airflow.decorators import dag, task
-from airflow.sdk.definitions.asset import Asset
-from airflow.sdk.definitions.asset.decorators import asset
+from airflow.sdk import Asset, asset
@asset(uri="s3://bucket/asset1_producer", schedule=None)
diff --git
a/airflow-core/src/airflow/example_dags/example_asset_with_watchers.py
b/airflow-core/src/airflow/example_dags/example_asset_with_watchers.py
index aa430d57b06..8468806b137 100644
--- a/airflow-core/src/airflow/example_dags/example_asset_with_watchers.py
+++ b/airflow-core/src/airflow/example_dags/example_asset_with_watchers.py
@@ -21,9 +21,8 @@ Example DAG for demonstrating the usage of event driven
scheduling using assets
from __future__ import annotations
from airflow.decorators import task
-from airflow.models.dag import DAG
from airflow.providers.standard.triggers.file import FileDeleteTrigger
-from airflow.sdk import Asset, AssetWatcher, chain
+from airflow.sdk import DAG, Asset, AssetWatcher, chain
file_path = "/tmp/test"
diff --git a/airflow-core/src/airflow/example_dags/example_assets.py
b/airflow-core/src/airflow/example_dags/example_assets.py
index b81cdad9453..2bb3cffc527 100644
--- a/airflow-core/src/airflow/example_dags/example_assets.py
+++ b/airflow-core/src/airflow/example_dags/example_assets.py
@@ -54,9 +54,8 @@ from __future__ import annotations
import pendulum
-from airflow.models.dag import DAG
from airflow.providers.standard.operators.bash import BashOperator
-from airflow.sdk.definitions.asset import Asset
+from airflow.sdk import DAG, Asset
from airflow.timetables.assets import AssetOrTimeSchedule
from airflow.timetables.trigger import CronTriggerTimetable
diff --git a/airflow-core/src/airflow/example_dags/example_bash_operator.py
b/airflow-core/src/airflow/example_dags/example_bash_operator.py
index ab9d09f5cbb..a4fb3161ab6 100644
--- a/airflow-core/src/airflow/example_dags/example_bash_operator.py
+++ b/airflow-core/src/airflow/example_dags/example_bash_operator.py
@@ -23,9 +23,9 @@ import datetime
import pendulum
-from airflow.models.dag import DAG
from airflow.providers.standard.operators.bash import BashOperator
from airflow.providers.standard.operators.empty import EmptyOperator
+from airflow.sdk import DAG
with DAG(
dag_id="example_bash_operator",
@@ -72,6 +72,3 @@ this_will_skip = BashOperator(
)
# [END howto_operator_bash_skip]
this_will_skip >> run_this_last
-
-if __name__ == "__main__":
- dag.test()
diff --git
a/airflow-core/src/airflow/example_dags/example_branch_datetime_operator.py
b/airflow-core/src/airflow/example_dags/example_branch_datetime_operator.py
index 39589b46f3f..710bdb1de89 100644
--- a/airflow-core/src/airflow/example_dags/example_branch_datetime_operator.py
+++ b/airflow-core/src/airflow/example_dags/example_branch_datetime_operator.py
@@ -24,9 +24,9 @@ from __future__ import annotations
import pendulum
-from airflow.models.dag import DAG
from airflow.providers.standard.operators.datetime import
BranchDateTimeOperator
from airflow.providers.standard.operators.empty import EmptyOperator
+from airflow.sdk import DAG
dag1 = DAG(
dag_id="example_branch_datetime_operator",
diff --git
a/airflow-core/src/airflow/example_dags/example_branch_day_of_week_operator.py
b/airflow-core/src/airflow/example_dags/example_branch_day_of_week_operator.py
index 9182f9a7898..43400522468 100644
---
a/airflow-core/src/airflow/example_dags/example_branch_day_of_week_operator.py
+++
b/airflow-core/src/airflow/example_dags/example_branch_day_of_week_operator.py
@@ -23,10 +23,10 @@ from __future__ import annotations
import pendulum
-from airflow.models.dag import DAG
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.providers.standard.operators.weekday import
BranchDayOfWeekOperator
from airflow.providers.standard.utils.weekday import WeekDay
+from airflow.sdk import DAG
with DAG(
dag_id="example_weekday_branch_operator",
diff --git a/airflow-core/src/airflow/example_dags/example_branch_labels.py
b/airflow-core/src/airflow/example_dags/example_branch_labels.py
index fadb5f23cb5..edc1b059738 100644
--- a/airflow-core/src/airflow/example_dags/example_branch_labels.py
+++ b/airflow-core/src/airflow/example_dags/example_branch_labels.py
@@ -23,8 +23,8 @@ from __future__ import annotations
import pendulum
-from airflow.models.dag import DAG
from airflow.providers.standard.operators.empty import EmptyOperator
+from airflow.sdk import DAG
from airflow.utils.edgemodifier import Label
with DAG(
diff --git a/airflow-core/src/airflow/example_dags/example_branch_operator.py
b/airflow-core/src/airflow/example_dags/example_branch_operator.py
index aed92576e3a..d824de4aa08 100644
--- a/airflow-core/src/airflow/example_dags/example_branch_operator.py
+++ b/airflow-core/src/airflow/example_dags/example_branch_operator.py
@@ -29,7 +29,6 @@ from pathlib import Path
import pendulum
-from airflow.models.dag import DAG
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.providers.standard.operators.python import (
BranchExternalPythonOperator,
@@ -39,6 +38,7 @@ from airflow.providers.standard.operators.python import (
PythonOperator,
PythonVirtualenvOperator,
)
+from airflow.sdk import DAG
from airflow.utils.edgemodifier import Label
from airflow.utils.trigger_rule import TriggerRule
diff --git
a/airflow-core/src/airflow/example_dags/example_branch_operator_decorator.py
b/airflow-core/src/airflow/example_dags/example_branch_operator_decorator.py
index f30fd7394b0..5c76080024f 100644
--- a/airflow-core/src/airflow/example_dags/example_branch_operator_decorator.py
+++ b/airflow-core/src/airflow/example_dags/example_branch_operator_decorator.py
@@ -31,8 +31,8 @@ import tempfile
import pendulum
from airflow.decorators import task
-from airflow.models.dag import DAG
from airflow.providers.standard.operators.empty import EmptyOperator
+from airflow.sdk import DAG
from airflow.utils.edgemodifier import Label
from airflow.utils.trigger_rule import TriggerRule
diff --git
a/airflow-core/src/airflow/example_dags/example_branch_python_dop_operator_3.py
b/airflow-core/src/airflow/example_dags/example_branch_python_dop_operator_3.py
index 8bca82645f8..cc2358da7ce 100644
---
a/airflow-core/src/airflow/example_dags/example_branch_python_dop_operator_3.py
+++
b/airflow-core/src/airflow/example_dags/example_branch_python_dop_operator_3.py
@@ -25,8 +25,8 @@ from __future__ import annotations
import pendulum
from airflow.decorators import task
-from airflow.models.dag import DAG
from airflow.providers.standard.operators.empty import EmptyOperator
+from airflow.sdk import DAG
@task.branch()
diff --git a/airflow-core/src/airflow/example_dags/example_complex.py
b/airflow-core/src/airflow/example_dags/example_complex.py
index 601e5b4c790..1a414aa4d6c 100644
--- a/airflow-core/src/airflow/example_dags/example_complex.py
+++ b/airflow-core/src/airflow/example_dags/example_complex.py
@@ -23,9 +23,8 @@ from __future__ import annotations
import pendulum
-from airflow.models.dag import DAG
from airflow.providers.standard.operators.bash import BashOperator
-from airflow.sdk import chain
+from airflow.sdk import DAG, chain
with DAG(
dag_id="example_complex",
diff --git a/airflow-core/src/airflow/example_dags/example_custom_weight.py
b/airflow-core/src/airflow/example_dags/example_custom_weight.py
index 1e1f9c31e94..4bbc261c553 100644
--- a/airflow-core/src/airflow/example_dags/example_custom_weight.py
+++ b/airflow-core/src/airflow/example_dags/example_custom_weight.py
@@ -24,11 +24,11 @@ import datetime
import pendulum
from airflow.example_dags.plugins.decreasing_priority_weight_strategy import
DecreasingPriorityStrategy
-
-# [START example_custom_weight_dag]
-from airflow.models.dag import DAG
from airflow.providers.standard.operators.bash import BashOperator
from airflow.providers.standard.operators.empty import EmptyOperator
+from airflow.sdk import DAG
+
+# [START example_custom_weight_dag]
with DAG(
dag_id="example_custom_weight",
diff --git a/airflow-core/src/airflow/example_dags/example_dag_decorator.py
b/airflow-core/src/airflow/example_dags/example_dag_decorator.py
index 995bdcb3687..9d1aec9b1e2 100644
--- a/airflow-core/src/airflow/example_dags/example_dag_decorator.py
+++ b/airflow-core/src/airflow/example_dags/example_dag_decorator.py
@@ -27,7 +27,7 @@ from airflow.models.baseoperator import BaseOperator
from airflow.providers.standard.operators.bash import BashOperator
if TYPE_CHECKING:
- from airflow.sdk.definitions.context import Context
+ from airflow.sdk import Context
class GetRequestOperator(BaseOperator):
diff --git
a/airflow-core/src/airflow/example_dags/example_dynamic_task_mapping.py
b/airflow-core/src/airflow/example_dags/example_dynamic_task_mapping.py
index 61c32bf3832..54967e97dbe 100644
--- a/airflow-core/src/airflow/example_dags/example_dynamic_task_mapping.py
+++ b/airflow-core/src/airflow/example_dags/example_dynamic_task_mapping.py
@@ -22,7 +22,7 @@ from __future__ import annotations
from datetime import datetime
from airflow.decorators import task
-from airflow.models.dag import DAG
+from airflow.sdk import DAG
with DAG(dag_id="example_dynamic_task_mapping", schedule=None,
start_date=datetime(2022, 3, 4)) as dag:
diff --git
a/airflow-core/src/airflow/example_dags/example_dynamic_task_mapping_with_no_taskflow_operators.py
b/airflow-core/src/airflow/example_dags/example_dynamic_task_mapping_with_no_taskflow_operators.py
index 3d42ac47b56..c762eee74f9 100644
---
a/airflow-core/src/airflow/example_dags/example_dynamic_task_mapping_with_no_taskflow_operators.py
+++
b/airflow-core/src/airflow/example_dags/example_dynamic_task_mapping_with_no_taskflow_operators.py
@@ -22,7 +22,7 @@ from __future__ import annotations
from datetime import datetime
from airflow.models.baseoperator import BaseOperator
-from airflow.models.dag import DAG
+from airflow.sdk import DAG
class AddOneOperator(BaseOperator):
diff --git
a/airflow-core/src/airflow/example_dags/example_external_task_marker_dag.py
b/airflow-core/src/airflow/example_dags/example_external_task_marker_dag.py
index a4c630a1e9f..b8fad182549 100644
--- a/airflow-core/src/airflow/example_dags/example_external_task_marker_dag.py
+++ b/airflow-core/src/airflow/example_dags/example_external_task_marker_dag.py
@@ -42,9 +42,9 @@ from __future__ import annotations
import pendulum
-from airflow.models.dag import DAG
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.providers.standard.sensors.external_task import
ExternalTaskMarker, ExternalTaskSensor
+from airflow.sdk import DAG
start_date = pendulum.datetime(2021, 1, 1, tz="UTC")
diff --git a/airflow-core/src/airflow/example_dags/example_inlet_event_extra.py
b/airflow-core/src/airflow/example_dags/example_inlet_event_extra.py
index c503e832a83..9919564f3ee 100644
--- a/airflow-core/src/airflow/example_dags/example_inlet_event_extra.py
+++ b/airflow-core/src/airflow/example_dags/example_inlet_event_extra.py
@@ -26,9 +26,8 @@ from __future__ import annotations
import datetime
from airflow.decorators import task
-from airflow.models.dag import DAG
from airflow.providers.standard.operators.bash import BashOperator
-from airflow.sdk.definitions.asset import Asset
+from airflow.sdk import DAG, Asset
asset = Asset("s3://output/1.txt")
diff --git
a/airflow-core/src/airflow/example_dags/example_kubernetes_executor.py
b/airflow-core/src/airflow/example_dags/example_kubernetes_executor.py
index 27ffb4cdad6..c7082acad05 100644
--- a/airflow-core/src/airflow/example_dags/example_kubernetes_executor.py
+++ b/airflow-core/src/airflow/example_dags/example_kubernetes_executor.py
@@ -29,7 +29,7 @@ import pendulum
from airflow.configuration import conf
from airflow.decorators import task
from airflow.example_dags.libs.helper import print_stuff
-from airflow.models.dag import DAG
+from airflow.sdk import DAG
log = logging.getLogger(__name__)
diff --git a/airflow-core/src/airflow/example_dags/example_latest_only.py
b/airflow-core/src/airflow/example_dags/example_latest_only.py
index 1bda2751312..8881d7d1f80 100644
--- a/airflow-core/src/airflow/example_dags/example_latest_only.py
+++ b/airflow-core/src/airflow/example_dags/example_latest_only.py
@@ -21,9 +21,9 @@ from __future__ import annotations
import datetime
-from airflow.models.dag import DAG
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.providers.standard.operators.latest_only import LatestOnlyOperator
+from airflow.sdk import DAG
with DAG(
dag_id="latest_only",
diff --git
a/airflow-core/src/airflow/example_dags/example_latest_only_with_trigger.py
b/airflow-core/src/airflow/example_dags/example_latest_only_with_trigger.py
index 63dba3a5d39..7e8a2b1b475 100644
--- a/airflow-core/src/airflow/example_dags/example_latest_only_with_trigger.py
+++ b/airflow-core/src/airflow/example_dags/example_latest_only_with_trigger.py
@@ -26,9 +26,9 @@ import datetime
import pendulum
-from airflow.models.dag import DAG
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.providers.standard.operators.latest_only import LatestOnlyOperator
+from airflow.sdk import DAG
from airflow.utils.trigger_rule import TriggerRule
with DAG(
diff --git
a/airflow-core/src/airflow/example_dags/example_local_kubernetes_executor.py
b/airflow-core/src/airflow/example_dags/example_local_kubernetes_executor.py
index 07be5ba1d38..8563ae05435 100644
--- a/airflow-core/src/airflow/example_dags/example_local_kubernetes_executor.py
+++ b/airflow-core/src/airflow/example_dags/example_local_kubernetes_executor.py
@@ -27,7 +27,7 @@ from datetime import datetime
from airflow.configuration import conf
from airflow.decorators import task
from airflow.example_dags.libs.helper import print_stuff
-from airflow.models.dag import DAG
+from airflow.sdk import DAG
log = logging.getLogger(__name__)
diff --git a/airflow-core/src/airflow/example_dags/example_nested_branch_dag.py
b/airflow-core/src/airflow/example_dags/example_nested_branch_dag.py
index d0f3eea02d7..ac2907645c2 100644
--- a/airflow-core/src/airflow/example_dags/example_nested_branch_dag.py
+++ b/airflow-core/src/airflow/example_dags/example_nested_branch_dag.py
@@ -26,8 +26,8 @@ from __future__ import annotations
import pendulum
from airflow.decorators import task
-from airflow.models.dag import DAG
from airflow.providers.standard.operators.empty import EmptyOperator
+from airflow.sdk import DAG
from airflow.utils.trigger_rule import TriggerRule
with DAG(
diff --git
a/airflow-core/src/airflow/example_dags/example_outlet_event_extra.py
b/airflow-core/src/airflow/example_dags/example_outlet_event_extra.py
index 8b08bb5fc94..18fd49263e1 100644
--- a/airflow-core/src/airflow/example_dags/example_outlet_event_extra.py
+++ b/airflow-core/src/airflow/example_dags/example_outlet_event_extra.py
@@ -26,10 +26,8 @@ from __future__ import annotations
import datetime
from airflow.decorators import task
-from airflow.models.dag import DAG
from airflow.providers.standard.operators.bash import BashOperator
-from airflow.sdk.definitions.asset import Asset
-from airflow.sdk.definitions.asset.metadata import Metadata
+from airflow.sdk import DAG, Asset, Metadata
asset = Asset(uri="s3://output/1.txt", name="test-asset")
diff --git a/airflow-core/src/airflow/example_dags/example_params_trigger_ui.py
b/airflow-core/src/airflow/example_dags/example_params_trigger_ui.py
index 9acabdda980..b1f331d3faa 100644
--- a/airflow-core/src/airflow/example_dags/example_params_trigger_ui.py
+++ b/airflow-core/src/airflow/example_dags/example_params_trigger_ui.py
@@ -26,8 +26,7 @@ import datetime
from pathlib import Path
from airflow.decorators import task
-from airflow.models.dag import DAG
-from airflow.sdk import Param
+from airflow.sdk import DAG, Param
from airflow.utils.trigger_rule import TriggerRule
# [START params_trigger]
diff --git
a/airflow-core/src/airflow/example_dags/example_params_ui_tutorial.py
b/airflow-core/src/airflow/example_dags/example_params_ui_tutorial.py
index 25aac4e1374..6634bb9a6a7 100644
--- a/airflow-core/src/airflow/example_dags/example_params_ui_tutorial.py
+++ b/airflow-core/src/airflow/example_dags/example_params_ui_tutorial.py
@@ -28,8 +28,7 @@ import json
from pathlib import Path
from airflow.decorators import task
-from airflow.models.dag import DAG
-from airflow.sdk import Param
+from airflow.sdk import DAG, Param
with DAG(
dag_id=Path(__file__).stem,
diff --git
a/airflow-core/src/airflow/example_dags/example_passing_params_via_test_command.py
b/airflow-core/src/airflow/example_dags/example_passing_params_via_test_command.py
index 7dcd963c096..2b390756b9f 100644
---
a/airflow-core/src/airflow/example_dags/example_passing_params_via_test_command.py
+++
b/airflow-core/src/airflow/example_dags/example_passing_params_via_test_command.py
@@ -26,8 +26,8 @@ import textwrap
import pendulum
from airflow.decorators import task
-from airflow.models.dag import DAG
from airflow.providers.standard.operators.bash import BashOperator
+from airflow.sdk import DAG
@task(task_id="run_this")
diff --git a/airflow-core/src/airflow/example_dags/example_python_operator.py
b/airflow-core/src/airflow/example_dags/example_python_operator.py
index 976813d53fd..d48bcae483c 100644
--- a/airflow-core/src/airflow/example_dags/example_python_operator.py
+++ b/airflow-core/src/airflow/example_dags/example_python_operator.py
@@ -29,12 +29,12 @@ from pprint import pprint
import pendulum
-from airflow.models.dag import DAG
from airflow.providers.standard.operators.python import (
ExternalPythonOperator,
PythonOperator,
PythonVirtualenvOperator,
)
+from airflow.sdk import DAG
log = logging.getLogger(__name__)
diff --git a/airflow-core/src/airflow/example_dags/example_sensors.py
b/airflow-core/src/airflow/example_dags/example_sensors.py
index 7481271edbc..5ec33bad51b 100644
--- a/airflow-core/src/airflow/example_dags/example_sensors.py
+++ b/airflow-core/src/airflow/example_dags/example_sensors.py
@@ -21,7 +21,6 @@ import datetime
import pendulum
-from airflow.models.dag import DAG
from airflow.providers.standard.operators.bash import BashOperator
from airflow.providers.standard.sensors.bash import BashSensor
from airflow.providers.standard.sensors.filesystem import FileSensor
@@ -30,6 +29,7 @@ from airflow.providers.standard.sensors.time import
TimeSensor, TimeSensorAsync
from airflow.providers.standard.sensors.time_delta import TimeDeltaSensor,
TimeDeltaSensorAsync
from airflow.providers.standard.sensors.weekday import DayOfWeekSensor
from airflow.providers.standard.utils.weekday import WeekDay
+from airflow.sdk import DAG
from airflow.utils.trigger_rule import TriggerRule
diff --git a/airflow-core/src/airflow/example_dags/example_setup_teardown.py
b/airflow-core/src/airflow/example_dags/example_setup_teardown.py
index 81994fabc20..a36e79a55e5 100644
--- a/airflow-core/src/airflow/example_dags/example_setup_teardown.py
+++ b/airflow-core/src/airflow/example_dags/example_setup_teardown.py
@@ -21,8 +21,8 @@ from __future__ import annotations
import pendulum
-from airflow.models.dag import DAG
from airflow.providers.standard.operators.bash import BashOperator
+from airflow.sdk import DAG
from airflow.utils.task_group import TaskGroup
with DAG(
diff --git
a/airflow-core/src/airflow/example_dags/example_setup_teardown_taskflow.py
b/airflow-core/src/airflow/example_dags/example_setup_teardown_taskflow.py
index 6fec9f9a478..6eeee2b4235 100644
--- a/airflow-core/src/airflow/example_dags/example_setup_teardown_taskflow.py
+++ b/airflow-core/src/airflow/example_dags/example_setup_teardown_taskflow.py
@@ -22,7 +22,7 @@ from __future__ import annotations
import pendulum
from airflow.decorators import setup, task, task_group, teardown
-from airflow.models.dag import DAG
+from airflow.sdk import DAG
with DAG(
dag_id="example_setup_teardown_taskflow",
diff --git
a/airflow-core/src/airflow/example_dags/example_short_circuit_operator.py
b/airflow-core/src/airflow/example_dags/example_short_circuit_operator.py
index b20f8acb8d4..494bd55a869 100644
--- a/airflow-core/src/airflow/example_dags/example_short_circuit_operator.py
+++ b/airflow-core/src/airflow/example_dags/example_short_circuit_operator.py
@@ -21,10 +21,9 @@ from __future__ import annotations
import pendulum
-from airflow.models.dag import DAG
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.providers.standard.operators.python import ShortCircuitOperator
-from airflow.sdk import chain
+from airflow.sdk import DAG, chain
from airflow.utils.trigger_rule import TriggerRule
with DAG(
diff --git a/airflow-core/src/airflow/example_dags/example_skip_dag.py
b/airflow-core/src/airflow/example_dags/example_skip_dag.py
index 99f439cc3f7..7575494d0d9 100644
--- a/airflow-core/src/airflow/example_dags/example_skip_dag.py
+++ b/airflow-core/src/airflow/example_dags/example_skip_dag.py
@@ -26,12 +26,12 @@ import pendulum
from airflow.exceptions import AirflowSkipException
from airflow.models.baseoperator import BaseOperator
-from airflow.models.dag import DAG
from airflow.providers.standard.operators.empty import EmptyOperator
+from airflow.sdk import DAG
from airflow.utils.trigger_rule import TriggerRule
if TYPE_CHECKING:
- from airflow.sdk.definitions.context import Context
+ from airflow.sdk import Context
# Create some placeholder operators
diff --git a/airflow-core/src/airflow/example_dags/example_task_group.py
b/airflow-core/src/airflow/example_dags/example_task_group.py
index 2f1931808b5..e83ac2e9989 100644
--- a/airflow-core/src/airflow/example_dags/example_task_group.py
+++ b/airflow-core/src/airflow/example_dags/example_task_group.py
@@ -21,9 +21,9 @@ from __future__ import annotations
import pendulum
-from airflow.models.dag import DAG
from airflow.providers.standard.operators.bash import BashOperator
from airflow.providers.standard.operators.empty import EmptyOperator
+from airflow.sdk import DAG
from airflow.utils.task_group import TaskGroup
# [START howto_task_group]
diff --git
a/airflow-core/src/airflow/example_dags/example_task_group_decorator.py
b/airflow-core/src/airflow/example_dags/example_task_group_decorator.py
index ce4a0e33b8c..29b270a763a 100644
--- a/airflow-core/src/airflow/example_dags/example_task_group_decorator.py
+++ b/airflow-core/src/airflow/example_dags/example_task_group_decorator.py
@@ -22,7 +22,7 @@ from __future__ import annotations
import pendulum
from airflow.decorators import task, task_group
-from airflow.models.dag import DAG
+from airflow.sdk import DAG
# [START howto_task_group_decorator]
diff --git
a/airflow-core/src/airflow/example_dags/example_time_delta_sensor_async.py
b/airflow-core/src/airflow/example_dags/example_time_delta_sensor_async.py
index 140dd866b9e..7b847e6871e 100644
--- a/airflow-core/src/airflow/example_dags/example_time_delta_sensor_async.py
+++ b/airflow-core/src/airflow/example_dags/example_time_delta_sensor_async.py
@@ -26,9 +26,9 @@ import datetime
import pendulum
-from airflow.models.dag import DAG
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.providers.standard.sensors.time_delta import TimeDeltaSensorAsync
+from airflow.sdk import DAG
with DAG(
dag_id="example_time_delta_sensor_async",
diff --git
a/airflow-core/src/airflow/example_dags/example_trigger_controller_dag.py
b/airflow-core/src/airflow/example_dags/example_trigger_controller_dag.py
index e546bd0a7e8..6adac540957 100644
--- a/airflow-core/src/airflow/example_dags/example_trigger_controller_dag.py
+++ b/airflow-core/src/airflow/example_dags/example_trigger_controller_dag.py
@@ -25,8 +25,8 @@ from __future__ import annotations
import pendulum
-from airflow.models.dag import DAG
from airflow.providers.standard.operators.trigger_dagrun import
TriggerDagRunOperator
+from airflow.sdk import DAG
with DAG(
dag_id="example_trigger_controller_dag",
diff --git
a/airflow-core/src/airflow/example_dags/example_trigger_target_dag.py
b/airflow-core/src/airflow/example_dags/example_trigger_target_dag.py
index 3af68a25607..6eae90499d5 100644
--- a/airflow-core/src/airflow/example_dags/example_trigger_target_dag.py
+++ b/airflow-core/src/airflow/example_dags/example_trigger_target_dag.py
@@ -26,8 +26,8 @@ from __future__ import annotations
import pendulum
from airflow.decorators import task
-from airflow.models.dag import DAG
from airflow.providers.standard.operators.bash import BashOperator
+from airflow.sdk import DAG
@task(task_id="run_this")
diff --git a/airflow-core/src/airflow/example_dags/example_workday_timetable.py
b/airflow-core/src/airflow/example_dags/example_workday_timetable.py
index db569c8e627..c5e942ec654 100644
--- a/airflow-core/src/airflow/example_dags/example_workday_timetable.py
+++ b/airflow-core/src/airflow/example_dags/example_workday_timetable.py
@@ -19,8 +19,8 @@ from __future__ import annotations
import pendulum
from airflow.example_dags.plugins.workday import AfterWorkdayTimetable
-from airflow.models.dag import DAG
from airflow.providers.standard.operators.empty import EmptyOperator
+from airflow.sdk import DAG
with DAG(
dag_id="example_workday_timetable",
diff --git a/airflow-core/src/airflow/example_dags/example_xcom.py
b/airflow-core/src/airflow/example_dags/example_xcom.py
index 2563eda77ee..00c88073891 100644
--- a/airflow-core/src/airflow/example_dags/example_xcom.py
+++ b/airflow-core/src/airflow/example_dags/example_xcom.py
@@ -22,9 +22,9 @@ from __future__ import annotations
import pendulum
from airflow.decorators import task
-from airflow.models.dag import DAG
from airflow.models.xcom_arg import XComArg
from airflow.providers.standard.operators.bash import BashOperator
+from airflow.sdk import DAG
value_1 = [1, 2, 3]
value_2 = {"a": "b"}
diff --git a/airflow-core/src/airflow/example_dags/example_xcomargs.py
b/airflow-core/src/airflow/example_dags/example_xcomargs.py
index a7103dc1911..166a65e4c96 100644
--- a/airflow-core/src/airflow/example_dags/example_xcomargs.py
+++ b/airflow-core/src/airflow/example_dags/example_xcomargs.py
@@ -24,8 +24,8 @@ import logging
import pendulum
from airflow.decorators import task
-from airflow.models.dag import DAG
from airflow.providers.standard.operators.bash import BashOperator
+from airflow.sdk import DAG
log = logging.getLogger(__name__)
diff --git a/airflow-core/src/airflow/example_dags/tutorial.py
b/airflow-core/src/airflow/example_dags/tutorial.py
index 8f371ee2427..7e12eb45469 100644
--- a/airflow-core/src/airflow/example_dags/tutorial.py
+++ b/airflow-core/src/airflow/example_dags/tutorial.py
@@ -28,12 +28,12 @@ from __future__ import annotations
import textwrap
from datetime import datetime, timedelta
-# The DAG object; we'll need this to instantiate a DAG
-from airflow.models.dag import DAG
-
# Operators; we need this to operate!
from airflow.providers.standard.operators.bash import BashOperator
+# The DAG object; we'll need this to instantiate a DAG
+from airflow.sdk import DAG
+
# [END import_module]
diff --git a/airflow-core/src/airflow/example_dags/tutorial_dag.py
b/airflow-core/src/airflow/example_dags/tutorial_dag.py
index 0e4f5086efc..0f891e5dd78 100644
--- a/airflow-core/src/airflow/example_dags/tutorial_dag.py
+++ b/airflow-core/src/airflow/example_dags/tutorial_dag.py
@@ -29,12 +29,12 @@ import textwrap
import pendulum
-# The DAG object; we'll need this to instantiate a DAG
-from airflow.models.dag import DAG
-
# Operators; we need this to operate!
from airflow.providers.standard.operators.python import PythonOperator
+# The DAG object; we'll need this to instantiate a DAG
+from airflow.sdk import DAG
+
# [END import_module]
# [START instantiate_dag]
diff --git a/airflow-core/src/airflow/models/dag.py
b/airflow-core/src/airflow/models/dag.py
index 8ca0f338330..5e6a5013c5e 100644
--- a/airflow-core/src/airflow/models/dag.py
+++ b/airflow-core/src/airflow/models/dag.py
@@ -90,6 +90,7 @@ from airflow.models.taskinstance import (
clear_task_instances,
)
from airflow.models.tasklog import LogTemplate
+from airflow.sdk import TaskGroup
from airflow.sdk.definitions.asset import Asset, AssetAlias, AssetUniqueKey, BaseAsset
from airflow.sdk.definitions.dag import DAG as TaskSDKDag, dag as task_sdk_dag_decorator
from airflow.secrets.local_filesystem import LocalFilesystemBackend
@@ -1997,6 +1998,79 @@ class DAG(TaskSDKDag, LoggingMixin):
if isinstance(port, of_type):
yield task.task_id, port
+ @classmethod
+ def from_sdk_dag(cls, dag: TaskSDKDag) -> DAG:
+ """Create a new (Scheduler) DAG object from a TaskSDKDag."""
+ if not isinstance(dag, TaskSDKDag):
+ return dag
+
+ fields = attrs.fields(dag.__class__)
+
+ kwargs = {}
+ for field in fields:
+ # Skip fields that are:
+ # 1. Initialized after creation (init=False)
+ # 2. Internal state fields that shouldn't be copied
+ if not field.init or field.name in ["edge_info"]:
+ continue
+
+ value = getattr(dag, field.name)
+
+ # Handle special cases where values need conversion
+ if field.name == "max_consecutive_failed_dag_runs":
+ # SchedulerDAG requires this to be >= 0, while TaskSDKDag allows -1
+ if value == -1:
+ # If it is -1, we get the default value from the DAG
+ continue
+
+ kwargs[field.name] = value
+
+ new_dag = cls(**kwargs)
+
+ task_group_map = {}
+
+ def create_task_groups(task_group, parent_group=None):
+ new_task_group = copy.deepcopy(task_group)
+
+ new_task_group.dag = new_dag
+ new_task_group.parent_group = parent_group
+ new_task_group.children = {}
+
+ task_group_map[task_group.group_id] = new_task_group
+
+ for child in task_group.children.values():
+ if isinstance(child, TaskGroup):
+ create_task_groups(child, new_task_group)
+
+ create_task_groups(dag.task_group)
+
+ def create_tasks(task):
+ if isinstance(task, TaskGroup):
+ return task_group_map[task.group_id]
+
+ new_task = copy.deepcopy(task)
+
+ # Only overwrite the specific attributes we want to change
+ new_task.task_id = task.task_id
+ new_task.dag = None # Don't set dag yet
+ new_task.task_group = task_group_map.get(task.task_group.group_id) if task.task_group else None
+
+ return new_task
+
+ # Process all tasks in the original DAG
+ for task in dag.tasks:
+ new_task = create_tasks(task)
+ if not isinstance(new_task, TaskGroup):
+ # Add the task to the DAG
+ new_dag.task_dict[new_task.task_id] = new_task
+ if new_task.task_group:
+ new_task.task_group.children[new_task.task_id] = new_task
+ new_task.dag = new_dag
+
+ new_dag.edge_info = dag.edge_info.copy()
+
+ return new_dag
+
class DagTag(Base):
"""A tag name per dag, to allow quick filtering in the DAG view."""
diff --git a/airflow-core/src/airflow/utils/cli.py
b/airflow-core/src/airflow/utils/cli.py
index 0114d65e04f..e2137237f69 100644
--- a/airflow-core/src/airflow/utils/cli.py
+++ b/airflow-core/src/airflow/utils/cli.py
@@ -266,7 +266,8 @@ def get_dag(subdir: str | None, dag_id: str, from_db: bool = False) -> DAG:
find the correct path (assuming it's a file) and failing that, use the
configured
dags folder.
"""
- from airflow.models import DagBag
+ from airflow.models.dag import DAG
+ from airflow.models.dagbag import DagBag
if from_db:
dagbag = DagBag(read_dags_from_db=True)
@@ -275,6 +276,8 @@ def get_dag(subdir: str | None, dag_id: str, from_db: bool = False) -> DAG:
first_path = process_subdir(subdir)
dagbag = DagBag(first_path)
dag = dagbag.dags.get(dag_id) # avoids db calls made in get_dag
+ # Create a SchedulerDAG since some of the CLI commands rely on DB access
+ dag = DAG.from_sdk_dag(dag)
if not dag:
if from_db:
raise AirflowException(f"Dag {dag_id!r} could not be found in DagBag read from database.")
diff --git a/airflow-core/tests/unit/cli/commands/test_dag_command.py
b/airflow-core/tests/unit/cli/commands/test_dag_command.py
index 77944cbc2a2..55ead5897d4 100644
--- a/airflow-core/tests/unit/cli/commands/test_dag_command.py
+++ b/airflow-core/tests/unit/cli/commands/test_dag_command.py
@@ -310,7 +310,7 @@ class TestCliDags:
@conf_vars({("core", "load_examples"): "true"})
def test_dagbag_dag_col(self):
valid_cols = [c for c in dag_command.DAGSchema().fields]
- dagbag = DagBag(include_examples=True)
+ dagbag = DagBag(include_examples=True, read_dags_from_db=True)
dag_details = dag_command._get_dagbag_dag_details(dagbag.get_dag("tutorial_dag"))
assert list(dag_details.keys()) == valid_cols
diff --git a/airflow-core/tests/unit/cli/commands/test_task_command.py
b/airflow-core/tests/unit/cli/commands/test_task_command.py
index 57534bd978b..b3dd69912ce 100644
--- a/airflow-core/tests/unit/cli/commands/test_task_command.py
+++ b/airflow-core/tests/unit/cli/commands/test_task_command.py
@@ -41,6 +41,7 @@ from airflow.configuration import conf
from airflow.exceptions import DagRunNotFound
from airflow.models import DagBag, DagRun, TaskInstance
from airflow.providers.standard.operators.bash import BashOperator
+from airflow.serialization.serialized_objects import SerializedDAG
from airflow.utils import timezone
from airflow.utils.session import create_session
from airflow.utils.state import State, TaskInstanceState
@@ -339,6 +340,9 @@ class TestCliTasks:
def test_task_states_for_dag_run(self):
dag2 = DagBag().dags["example_python_operator"]
task2 = dag2.get_task(task_id="print_the_context")
+
+ dag2 = SerializedDAG.deserialize_dag(SerializedDAG.serialize_dag(dag2))
+
default_date2 = timezone.datetime(2016, 1, 9)
dag2.clear()
data_interval = dag2.timetable.infer_manual_data_interval(run_after=default_date2)
diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py
b/airflow-core/tests/unit/jobs/test_scheduler_job.py
index b4970176248..c703037b0f0 100644
--- a/airflow-core/tests/unit/jobs/test_scheduler_job.py
+++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py
@@ -64,7 +64,7 @@ from airflow.models.taskinstance import TaskInstance
from airflow.providers.standard.operators.bash import BashOperator
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.sdk.definitions.asset import Asset
-from airflow.serialization.serialized_objects import SerializedDAG
+from airflow.serialization.serialized_objects import LazyDeserializedDAG, SerializedDAG
from airflow.timetables.base import DataInterval
from airflow.traces.tracer import Trace
from airflow.utils import timezone
@@ -5805,12 +5805,17 @@ class TestSchedulerJob:
dagfile = os.path.join(EXAMPLE_DAGS_FOLDER,
"example_branch_operator.py")
dagbag = DagBag(dagfile)
dag = dagbag.get_dag("example_branch_operator")
- DAG.bulk_write_to_db("testing", None, [dag])
+ dm = LazyDeserializedDAG(data=SerializedDAG.to_dict(dag))
+ scheduler_dag = DAG.from_sdk_dag(dag)
+
+ DAG.bulk_write_to_db("testing", None, [dm])
SerializedDagModel.write_dag(dag=dag, bundle_name="testing")
dag_v = DagVersion.get_latest_version(dag.dag_id)
- data_interval = dag.infer_automated_data_interval(DEFAULT_LOGICAL_DATE)
+
+ data_interval = scheduler_dag.infer_automated_data_interval(DEFAULT_LOGICAL_DATE)
+
dag_run = create_dagrun(
- dag,
+ scheduler_dag,
logical_date=DEFAULT_DATE,
run_type=DagRunType.SCHEDULED,
data_interval=data_interval,
@@ -5866,13 +5871,16 @@ class TestSchedulerJob:
dagfile = os.path.join(EXAMPLE_DAGS_FOLDER,
"example_branch_operator.py")
dagbag = DagBag(dagfile)
dag = dagbag.get_dag("example_branch_operator")
- DAG.bulk_write_to_db("testing", None, [dag])
+ dm = LazyDeserializedDAG(data=SerializedDAG.to_dict(dag))
+ scheduler_dag = DAG.from_sdk_dag(dag)
+
+ DAG.bulk_write_to_db("testing", None, [dm])
SerializedDagModel.write_dag(dag, bundle_name="testing")
session.query(Job).delete()
- data_interval = dag.infer_automated_data_interval(DEFAULT_LOGICAL_DATE)
+ data_interval = scheduler_dag.infer_automated_data_interval(DEFAULT_LOGICAL_DATE)
dag_run = create_dagrun(
- dag,
+ scheduler_dag,
logical_date=DEFAULT_DATE,
run_type=DagRunType.SCHEDULED,
data_interval=data_interval,
diff --git a/airflow-core/tests/unit/models/test_dagbag.py
b/airflow-core/tests/unit/models/test_dagbag.py
index 6d659c99c4c..657cc0e3659 100644
--- a/airflow-core/tests/unit/models/test_dagbag.py
+++ b/airflow-core/tests/unit/models/test_dagbag.py
@@ -681,7 +681,7 @@ with airflow.DAG(
"""
with time_machine.travel((tz.datetime(2020, 1, 5, 0, 0, 0)),
tick=False):
example_bash_op_dag =
DagBag(include_examples=True).dags.get("example_bash_operator")
- example_bash_op_dag.sync_to_db()
+ DAG.from_sdk_dag(example_bash_op_dag).sync_to_db()
SerializedDagModel.write_dag(dag=example_bash_op_dag,
bundle_name="testing")
dag_bag = DagBag(read_dags_from_db=True)
@@ -699,7 +699,7 @@ with airflow.DAG(
# Make a change in the DAG and write Serialized DAG to the DB
with time_machine.travel((tz.datetime(2020, 1, 5, 0, 0, 6)),
tick=False):
example_bash_op_dag.tags.add("new_tag")
- example_bash_op_dag.sync_to_db()
+ DAG.from_sdk_dag(example_bash_op_dag).sync_to_db()
SerializedDagModel.write_dag(dag=example_bash_op_dag,
bundle_name="testing")
# Since min_serialized_dag_fetch_interval is passed verify that
calling 'dag_bag.get_dag'
@@ -723,7 +723,7 @@ with airflow.DAG(
# serialize the initial version of the DAG
with time_machine.travel((tz.datetime(2020, 1, 5, 0, 0, 0)),
tick=False):
example_bash_op_dag =
DagBag(include_examples=True).dags.get("example_bash_operator")
- example_bash_op_dag.sync_to_db()
+ DAG.from_sdk_dag(example_bash_op_dag).sync_to_db()
SerializedDagModel.write_dag(dag=example_bash_op_dag,
bundle_name="testing")
# deserialize the DAG
@@ -749,7 +749,7 @@ with airflow.DAG(
# long before the transaction is committed
with time_machine.travel((tz.datetime(2020, 1, 5, 1, 0, 0)),
tick=False):
example_bash_op_dag.tags.add("new_tag")
- example_bash_op_dag.sync_to_db()
+ DAG.from_sdk_dag(example_bash_op_dag).sync_to_db()
SerializedDagModel.write_dag(dag=example_bash_op_dag,
bundle_name="testing")
# Since min_serialized_dag_fetch_interval is passed verify that
calling 'dag_bag.get_dag'
@@ -769,7 +769,7 @@ with airflow.DAG(
example_dags = dagbag.dags
for dag in example_dags.values():
- dag.sync_to_db()
+ DAG.from_sdk_dag(dag).sync_to_db()
SerializedDagModel.write_dag(dag, bundle_name="dag_maker")
new_dagbag = DagBag(read_dags_from_db=True)
diff --git a/airflow-core/tests/unit/models/test_dagcode.py
b/airflow-core/tests/unit/models/test_dagcode.py
index f604037ffe7..97bc070a6d4 100644
--- a/airflow-core/tests/unit/models/test_dagcode.py
+++ b/airflow-core/tests/unit/models/test_dagcode.py
@@ -30,6 +30,7 @@ from airflow.models.dag import DAG
from airflow.models.dag_version import DagVersion
from airflow.models.dagcode import DagCode
from airflow.models.serialized_dag import SerializedDagModel as SDM
+from airflow.serialization.serialized_objects import LazyDeserializedDAG, SerializedDAG
# To move it to a shared module.
from airflow.utils.file import open_maybe_zipped
@@ -54,7 +55,8 @@ def make_example_dags(module):
session.add(testing)
dagbag = DagBag(module.__path__[0])
- DAG.bulk_write_to_db("testing", None, dagbag.dags.values())
+ dags = [LazyDeserializedDAG(data=SerializedDAG.to_dict(dag)) for dag in dagbag.dags.values()]
+ DAG.bulk_write_to_db("testing", None, dags)
return dagbag.dags
@@ -142,7 +144,9 @@ class TestDagCode:
"""Test new DagCode is created in DB when ser dag is changed"""
example_dag =
make_example_dags(example_dags_module).get("example_bash_operator")
SDM.write_dag(example_dag, bundle_name="testing")
- example_dag.create_dagrun(
+
+ dag = DAG.from_sdk_dag(example_dag)
+ dag.create_dagrun(
run_id="test1",
run_after=pendulum.datetime(2025, 1, 1, tz="UTC"),
state=DagRunState.QUEUED,
diff --git a/airflow-core/tests/unit/models/test_serialized_dag.py
b/airflow-core/tests/unit/models/test_serialized_dag.py
index f20e3cc1375..91910383b4a 100644
--- a/airflow-core/tests/unit/models/test_serialized_dag.py
+++ b/airflow-core/tests/unit/models/test_serialized_dag.py
@@ -36,7 +36,7 @@ from airflow.providers.standard.operators.bash import
BashOperator
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.providers.standard.operators.python import PythonOperator
from airflow.sdk import DAG, Asset
-from airflow.serialization.serialized_objects import SerializedDAG
+from airflow.serialization.serialized_objects import LazyDeserializedDAG, SerializedDAG
from airflow.settings import json
from airflow.utils.hashlib_wrapper import md5
from airflow.utils.session import create_session
@@ -60,7 +60,9 @@ def make_example_dags(module):
session.add(testing)
dagbag = DagBag(module.__path__[0])
- SchedulerDAG.bulk_write_to_db("testing", None, dagbag.dags.values())
+
+ dags = [LazyDeserializedDAG(data=SerializedDAG.to_dict(dag)) for dag in dagbag.dags.values()]
+ SchedulerDAG.bulk_write_to_db("testing", None, dags)
return dagbag.dags
@@ -143,7 +145,10 @@ class TestSerializedDagModel:
example_bash_op_dag = example_dags.get("example_bash_operator")
dag_updated = SDM.write_dag(dag=example_bash_op_dag,
bundle_name="testing")
assert dag_updated is True
- example_bash_op_dag.create_dagrun(
+
+ # SchedulerDAG is created to create dagrun
+ dag = SchedulerDAG.from_sdk_dag(dag=example_bash_op_dag)
+ dag.create_dagrun(
run_id="test1",
run_after=pendulum.datetime(2025, 1, 1, tz="UTC"),
state=DagRunState.QUEUED,
@@ -191,7 +196,10 @@ class TestSerializedDagModel:
assert len(example_dags) == len(serialized_dags)
dag = example_dags.get("example_bash_operator")
- dag.create_dagrun(
+
+ # DAGs are serialized and deserialized to access create_dagrun object
+ sdag = SerializedDAG.deserialize_dag(SerializedDAG.serialize_dag(dag=dag))
+ sdag.create_dagrun(
run_id="test1",
run_after=pendulum.datetime(2025, 1, 1, tz="UTC"),
state=DagRunState.QUEUED,