kacpermuda commented on code in PR #62018:
URL: https://github.com/apache/airflow/pull/62018#discussion_r2817717783
##########
providers/openlineage/src/airflow/providers/openlineage/utils/utils.py:
##########
@@ -1361,9 +1361,82 @@ def get_dag_job_dependency_facet(
return {}
+def _build_labeled_edge_map(dag: DAG | SerializedDAG) -> dict[str, tuple[dict,
dict]]:
+ """
+ Build a mapping of classified labeled edges for every task and group in
the DAG.
+
+ Translates Airflow's internal ``dag.edge_info`` (which uses virtual
+ ``downstream_join_id`` / ``upstream_join_id`` keys) into a clean mapping
+ keyed by real ``task_id`` or ``group_id``. Each value is a tuple of
+ ``(downstream_task_edges, downstream_group_edges)`` sorted dicts.
+
+ ``downstream_task_edges``
+ Target is a **specific task**. Keyed by ``task_id``, regardless of
+ whether source and target are in the same group, different groups,
+ or at root level.
+
+ ``downstream_group_edges``
+ Target is a **TaskGroup as a whole**. Keyed by ``group_id``.
+ Only produced by ``task >> TaskGroup`` or ``TaskGroup >> TaskGroup``.
+
+ **How to read the result (for OL event consumers)**
+
+ Look up ``edge_map.get(task_id)`` or ``edge_map.get(group_id)``.
+ Missing keys mean no labeled edges from that source. Unlabeled edges
+ are not included — use ``downstream_task_ids`` for the full dependency
+ graph.
+
+ Key points:
+
+ - Cross-group task-to-task labels (e.g. ``step_2 >> Label >> final``
+ where the tasks are in different groups) appear under the **source
+ group's** key, not the originating task's key, because Airflow promotes
+ labels to the group boundary.
+ - ``task >> TaskGroup`` produces entries in **both** dicts: per-root-task
+ entries in ``downstream_task_edges`` and a group-level entry in
+ ``downstream_group_edges``, all with the same label.
+
+ See ``test_get_tasks_details_with_edge_labels`` for a comprehensive example
+ covering all edge patterns (same-group, cross-group, task-to-TaskGroup,
+ TaskGroup-to-TaskGroup, nested groups, etc.).
+ """
+ edge_info = dag.edge_info or {}
+ if not edge_info:
+ return {}
+
+ upstream_join_id_to_group_id: dict[str, str] = {}
+ downstream_join_id_to_group_id: dict[str, str] = {}
+
+ for tg_id, tg in dag.task_group_dict.items():
+ upstream_join_id_to_group_id[tg.upstream_join_id] = tg_id
+ downstream_join_id_to_group_id[tg.downstream_join_id] = tg_id
+
+ result: dict[str, tuple[dict, dict]] = {}
+ for source_id, edges in edge_info.items():
+ # Resolve downstream_join_id sources to their owning group_id
+ group_id = downstream_join_id_to_group_id.get(source_id)
+ logical_key = group_id if group_id is not None else source_id
+
+ task_edges: dict = {}
+ group_edges: dict = {}
+ for target_id, info in sorted(edges.items()):
+ resolved_group = upstream_join_id_to_group_id.get(target_id)
+ if resolved_group is not None: # If target is a group, classify
as group edge
+ group_edges[resolved_group] = info
+ else:
+ task_edges[target_id] = info
+
+ result[logical_key] = (task_edges, group_edges)
+
+ return result
+
+
def _get_tasks_details(dag: DAG | SerializedDAG) -> dict:
- tasks = {
- single_task.task_id: {
+ edge_map = _build_labeled_edge_map(dag)
Review Comment:
Valid point, adjusted
##########
providers/openlineage/src/airflow/providers/openlineage/utils/utils.py:
##########
@@ -1373,23 +1446,26 @@ def _get_tasks_details(dag: DAG | SerializedDAG) ->
dict:
"is_setup": single_task.is_setup,
"is_teardown": single_task.is_teardown,
"downstream_task_ids": sorted(single_task.downstream_task_ids),
+ "downstream_task_edges": task_edges,
+ "downstream_group_edges": group_edges,
}
- for single_task in sorted(dag.tasks, key=lambda x: x.task_id)
- }
-
return tasks
def _get_task_groups_details(dag: DAG | SerializedDAG) -> dict:
- return {
- tg_id: {
+ edge_map = _build_labeled_edge_map(dag)
+ result = {}
+ for tg_id, tg in dag.task_group_dict.items():
+ task_edges, group_edges = edge_map.get(tg_id, ({}, {}))
+ result[tg_id] = {
"parent_group": tg.parent_group.group_id,
Review Comment:
Yup, adjusted, thanks!
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]