This is an automated email from the ASF dual-hosted git repository.
weilee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new e63e7d1d4f0 fix(serialized_dag): expand DagDependency generation and add fallback value (#49327)
e63e7d1d4f0 is described below
commit e63e7d1d4f03513de57e102a68174a526d8cfe68
Author: Wei Lee <[email protected]>
AuthorDate: Wed Apr 16 09:16:22 2025 +0800
fix(serialized_dag): expand DagDependency generation and add fallback value (#49327)
---
airflow-core/src/airflow/models/serialized_dag.py | 22 +++++++++++++++++++---
1 file changed, 19 insertions(+), 3 deletions(-)
diff --git a/airflow-core/src/airflow/models/serialized_dag.py b/airflow-core/src/airflow/models/serialized_dag.py
index a9340d43ed9..9e4c6115d92 100644
--- a/airflow-core/src/airflow/models/serialized_dag.py
+++ b/airflow-core/src/airflow/models/serialized_dag.py
@@ -105,7 +105,17 @@ class _DagDependenciesResolver:
dep_data[node_key] = f"asset:{asset_id}"
break
- dag_deps.append(DagDependency(**dep_data))
+ dep_id = dep_data["dependency_id"]
+ dag_deps.append(
+ DagDependency(
+ source=dep_data["source"],
+ target=dep_data["target"],
+ # handle the case that serialized_dag does not have label column (e.g., from 2.x)
+ label=dep_data.get("label", dep_id),
+ dependency_type=dep_data["dependency_type"],
+ dependency_id=dep_id,
+ )
+ )
dag_depdendencies_by_dag[dag_id] = dag_deps
return dag_depdendencies_by_dag
@@ -171,8 +181,14 @@ class _DagDependenciesResolver:
def resolve_asset_dag_dep(self, dep_data: dict) -> DagDependency:
dep_id = dep_data["dependency_id"]
unique_key = AssetUniqueKey.from_str(dep_id)
- dep_data["dependency_id"] = str(self.asset_key_to_id[unique_key])
- return DagDependency(**dep_data)
+ return DagDependency(
+ source=dep_data["source"],
+ target=dep_data["target"],
+ # handle the case that serialized_dag does not have label column (e.g., from 2.x)
+ label=dep_data.get("label", unique_key.name),
+ dependency_type=dep_data["dependency_type"],
+ dependency_id=str(self.asset_key_to_id[unique_key]),
+ )
def resolve_asset_ref_dag_dep(
self, dep_data: dict, ref_type: Literal["asset-name-ref", "asset-uri-ref"]