This is an automated email from the ASF dual-hosted git repository.

weilee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new e63e7d1d4f0 fix(serialized_dag): expand DagDependency generation and 
add fallback value (#49327)
e63e7d1d4f0 is described below

commit e63e7d1d4f03513de57e102a68174a526d8cfe68
Author: Wei Lee <[email protected]>
AuthorDate: Wed Apr 16 09:16:22 2025 +0800

    fix(serialized_dag): expand DagDependency generation and add fallback value 
(#49327)
---
 airflow-core/src/airflow/models/serialized_dag.py | 22 +++++++++++++++++++---
 1 file changed, 19 insertions(+), 3 deletions(-)

diff --git a/airflow-core/src/airflow/models/serialized_dag.py 
b/airflow-core/src/airflow/models/serialized_dag.py
index a9340d43ed9..9e4c6115d92 100644
--- a/airflow-core/src/airflow/models/serialized_dag.py
+++ b/airflow-core/src/airflow/models/serialized_dag.py
@@ -105,7 +105,17 @@ class _DagDependenciesResolver:
                             dep_data[node_key] = f"asset:{asset_id}"
                             break
 
-                    dag_deps.append(DagDependency(**dep_data))
+                    dep_id = dep_data["dependency_id"]
+                    dag_deps.append(
+                        DagDependency(
+                            source=dep_data["source"],
+                            target=dep_data["target"],
+                            # handle the case that serialized_dag does not 
have label column (e.g., from 2.x)
+                            label=dep_data.get("label", dep_id),
+                            dependency_type=dep_data["dependency_type"],
+                            dependency_id=dep_id,
+                        )
+                    )
 
             dag_depdendencies_by_dag[dag_id] = dag_deps
         return dag_depdendencies_by_dag
@@ -171,8 +181,14 @@ class _DagDependenciesResolver:
     def resolve_asset_dag_dep(self, dep_data: dict) -> DagDependency:
         dep_id = dep_data["dependency_id"]
         unique_key = AssetUniqueKey.from_str(dep_id)
-        dep_data["dependency_id"] = str(self.asset_key_to_id[unique_key])
-        return DagDependency(**dep_data)
+        return DagDependency(
+            source=dep_data["source"],
+            target=dep_data["target"],
+            # handle the case that serialized_dag does not have label column 
(e.g., from 2.x)
+            label=dep_data.get("label", unique_key.name),
+            dependency_type=dep_data["dependency_type"],
+            dependency_id=str(self.asset_key_to_id[unique_key]),
+        )
 
     def resolve_asset_ref_dag_dep(
         self, dep_data: dict, ref_type: Literal["asset-name-ref", 
"asset-uri-ref"]

Reply via email to