This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 65c4900bd7b fix(serialized_objects): fix how dataset/asset
dag_dependency is converted from v1 to v2 (#49281)
65c4900bd7b is described below
commit 65c4900bd7bc4cc1d5227f68dbbc2e5a33f4eaf4
Author: Wei Lee <[email protected]>
AuthorDate: Tue Apr 15 22:28:52 2025 +0800
fix(serialized_objects): fix how dataset/asset dag_dependency is converted
from v1 to v2 (#49281)
## Why
In the original conversion, every "dataset" is replaced with "asset".
However, this means `Dataset("dataset-uri")` is interpreted as `Asset("asset-uri")`,
when it should be `Asset("dataset-uri")` instead.
## What
* If the DAG dependency doesn't have a "label" field, fill it with its "dependency_id".
* If the "dependency_type" is "dataset" or "dataset-alias", update it to
"asset" or "asset-alias", respectively.
* If the source/target value is "dataset" or "dataset-alias" and it
equals its "dependency_type" (meaning the node is a root node or an end
node), update it to "asset" or "asset-alias", respectively.
* If the source/target value starts with "dataset:" or "dataset-alias:"
(meaning it is an intermediate node), update the prefix to "asset:" or
"asset-alias:", respectively.
---------
Co-authored-by: Ash Berlin-Taylor <[email protected]>
---
.../airflow/serialization/serialized_objects.py | 18 ++-
.../unit/serialization/test_dag_serialization.py | 122 ++++++++++++++++++++-
2 files changed, 132 insertions(+), 8 deletions(-)
diff --git a/airflow-core/src/airflow/serialization/serialized_objects.py
b/airflow-core/src/airflow/serialization/serialized_objects.py
index 8dd2b9294d2..1f425c4facb 100644
--- a/airflow-core/src/airflow/serialization/serialized_objects.py
+++ b/airflow-core/src/airflow/serialization/serialized_objects.py
@@ -1892,9 +1892,21 @@ class SerializedDAG(DAG, BaseSerialization):
if "dag_dependencies" in dag_dict:
for dep in dag_dict["dag_dependencies"]:
- for fld in ("dependency_type", "target", "source"):
- if dep.get(fld) == "dataset":
- dep[fld] = "asset"
+ dep_type = dep.get("dependency_type")
+ if dep_type in ("dataset", "dataset-alias"):
+ dep["dependency_type"] = dep_type.replace("dataset",
"asset")
+
+ if not dep.get("label"):
+ dep["label"] = dep["dependency_id"]
+
+ for fld in ("target", "source"):
+ val = dep.get(fld)
+ if val == dep_type and val in ("dataset", "dataset-alias"):
+ dep[fld] = dep[fld].replace("dataset", "asset")
+ elif val.startswith("dataset:"):
+ dep[fld] = dep[fld].replace("dataset:", "asset:")
+ elif val.startswith("dataset-alias:"):
+ dep[fld] = dep[fld].replace("dataset-alias:",
"asset-alias:")
for task in dag_dict["tasks"]:
task_var: dict = task["__var"]
diff --git a/airflow-core/tests/unit/serialization/test_dag_serialization.py
b/airflow-core/tests/unit/serialization/test_dag_serialization.py
index d3e605063d8..bff4cbfd5d4 100644
--- a/airflow-core/tests/unit/serialization/test_dag_serialization.py
+++ b/airflow-core/tests/unit/serialization/test_dag_serialization.py
@@ -3246,17 +3246,128 @@ def test_handle_v1_serdag():
},
"edge_info": {},
"dag_dependencies": [
+ # dataset as schedule (source)
{
- "dependency_id": '{"name": "asset-2", "uri": "asset-2"}',
- "dependency_type": "asset",
- "label": "asset-2",
- "source": "simple_dag",
- "target": "asset",
+ "source": "dataset",
+ "target": "dag1",
+ "dependency_type": "dataset",
+ "dependency_id": "dataset_uri_1",
+ },
+ # dataset alias (resolved) as schedule (source)
+ {
+ "source": "dataset",
+ "target": "dataset-alias:alias_name_1",
+ "dependency_type": "dataset",
+ "dependency_id": "dataset_uri_2",
+ },
+ {
+ "source": "dataset:alias_name_1",
+ "target": "dag2",
+ "dependency_type": "dataset-alias",
+ "dependency_id": "alias_name_1",
+ },
+ # dataset alias (not resolved) as schedule (source)
+ {
+ "source": "dataset-alias",
+ "target": "dag2",
+ "dependency_type": "dataset-alias",
+ "dependency_id": "alias_name_2",
+ },
+ # dataset as outlets (target)
+ {
+ "source": "dag10",
+ "target": "dataset",
+ "dependency_type": "dataset",
+ "dependency_id": "dataset_uri_10",
+ },
+ # dataset alias (resolved) as outlets (target)
+ {
+ "source": "dag20",
+ "target": "dataset-alias:alias_name_10",
+ "dependency_type": "dataset",
+ "dependency_id": "dataset_uri_20",
+ },
+ {
+ "source": "dataset:dataset_uri_20",
+ "target": "dataset-alias",
+ "dependency_type": "dataset-alias",
+ "dependency_id": "alias_name_10",
+ },
+ # dataset alias (not resolved) as outlets (target)
+ {
+ "source": "dag2",
+ "target": "dataset-alias",
+ "dependency_type": "dataset-alias",
+ "dependency_id": "alias_name_2",
},
],
"params": [],
},
}
+ expected_dag_dependencies = [
+ # asset as schedule (source)
+ {
+ "dependency_id": "dataset_uri_1",
+ "dependency_type": "asset",
+ "label": "dataset_uri_1",
+ "source": "asset",
+ "target": "dag1",
+ },
+ # asset alias (resolved) as schedule (source)
+ {
+ "dependency_id": "dataset_uri_2",
+ "dependency_type": "asset",
+ "label": "dataset_uri_2",
+ "source": "asset",
+ "target": "asset-alias:alias_name_1",
+ },
+ {
+ "dependency_id": "alias_name_1",
+ "dependency_type": "asset-alias",
+ "label": "alias_name_1",
+ "source": "asset:alias_name_1",
+ "target": "dag2",
+ },
+ # asset alias (not resolved) as schedule (source)
+ {
+ "dependency_id": "alias_name_2",
+ "dependency_type": "asset-alias",
+ "label": "alias_name_2",
+ "source": "asset-alias",
+ "target": "dag2",
+ },
+ # asset as outlets (target)
+ {
+ "dependency_id": "dataset_uri_10",
+ "dependency_type": "asset",
+ "label": "dataset_uri_10",
+ "source": "dag10",
+ "target": "asset",
+ },
+ # asset alias (resolved) as outlets (target)
+ {
+ "dependency_id": "dataset_uri_20",
+ "dependency_type": "asset",
+ "label": "dataset_uri_20",
+ "source": "dag20",
+ "target": "asset-alias:alias_name_10",
+ },
+ {
+ "dependency_id": "alias_name_10",
+ "dependency_type": "asset-alias",
+ "label": "alias_name_10",
+ "source": "asset:dataset_uri_20",
+ "target": "asset-alias",
+ },
+ # asset alias (not resolved) as outlets (target)
+ {
+ "dependency_id": "alias_name_2",
+ "dependency_type": "asset-alias",
+ "label": "alias_name_2",
+ "source": "dag2",
+ "target": "asset-alias",
+ },
+ ]
SerializedDAG.conversion_v1_to_v2(v1)
@@ -3266,6 +3377,7 @@ def test_handle_v1_serdag():
v1["dag"]["disable_bundle_versioning"] = False
expected = copy.deepcopy(serialized_simple_dag_ground_truth)
+ expected["dag"]["dag_dependencies"] = expected_dag_dependencies
del expected["dag"]["tasks"][1]["__var"]["_operator_extra_links"]
assert v1 == expected