This is an automated email from the ASF dual-hosted git repository.
vincbeck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 56699a8aa88 Add team name in asset graph view (#68457)
56699a8aa88 is described below
commit 56699a8aa8892e8fc91f663f788534a00ebd0062
Author: Vincent <[email protected]>
AuthorDate: Fri Jun 12 15:16:29 2026 -0400
Add team name in asset graph view (#68457)
---
.../api_fastapi/core_api/datamodels/ui/common.py | 1 +
.../api_fastapi/core_api/openapi/_private_ui.yaml | 10 +++
.../core_api/services/ui/dependencies.py | 21 ++++++
.../airflow/ui/openapi-gen/requests/schemas.gen.ts | 22 ++++++
.../airflow/ui/openapi-gen/requests/types.gen.ts | 2 +
.../airflow/ui/src/components/Graph/DagNode.tsx | 16 +++-
.../airflow/ui/src/components/Graph/TaskNode.tsx | 13 ++++
.../ui/src/components/Graph/elkGraphUtils.ts | 2 +
.../ui/src/components/Graph/reactflowUtils.ts | 1 +
.../core_api/routes/ui/test_dependencies.py | 86 +++++++++++++++++++++-
.../core_api/routes/ui/test_structure.py | 22 ++++++
11 files changed, 191 insertions(+), 5 deletions(-)
diff --git
a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/ui/common.py
b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/ui/common.py
index 464eef0402d..e4296730017 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/ui/common.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/ui/common.py
@@ -54,6 +54,7 @@ class BaseNodeResponse(BaseModel):
"sensor",
"trigger",
]
+ team: str | None = None
E = TypeVar("E", bound=BaseEdgeResponse)
diff --git
a/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml
b/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml
index 8b28661c26a..ee7d0beb815 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml
+++ b/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml
@@ -1884,6 +1884,11 @@ components:
- sensor
- trigger
title: Type
+ team:
+ anyOf:
+ - type: string
+ - type: 'null'
+ title: Team
type: object
required:
- id
@@ -3271,6 +3276,11 @@ components:
- sensor
- trigger
title: Type
+ team:
+ anyOf:
+ - type: string
+ - type: 'null'
+ title: Team
children:
anyOf:
- items:
diff --git
a/airflow-core/src/airflow/api_fastapi/core_api/services/ui/dependencies.py
b/airflow-core/src/airflow/api_fastapi/core_api/services/ui/dependencies.py
index f125e22fce9..eb4e8e20910 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/services/ui/dependencies.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/services/ui/dependencies.py
@@ -21,6 +21,7 @@ from collections import defaultdict, deque
from typing import TYPE_CHECKING
from airflow.models.asset import AssetModel
+from airflow.models.dag import DagModel
if TYPE_CHECKING:
from sqlalchemy.orm import Session
@@ -134,6 +135,15 @@ def get_scheduling_dependencies(readable_dag_ids: set[str]
| None = None) -> dic
target = dep.target if ":" in dep.target else
f"dag:{dep.target}"
edge_tuples.add((source, target))
+ dag_ids = [node["label"] for node in nodes_dict.values() if node["type"]
== "dag"]
+ if dag_ids:
+ dag_id_to_team = DagModel.get_dag_id_to_team_name_mapping(dag_ids)
+ for node in nodes_dict.values():
+ if node["type"] == "dag":
+ team_name = dag_id_to_team.get(node["label"])
+ if team_name:
+ node["team"] = team_name
+
return {
"nodes": list(nodes_dict.values()),
"edges": [{"source_id": source, "target_id": target} for source,
target in sorted(edge_tuples)],
@@ -267,6 +277,17 @@ def get_data_dependencies(
if outlet_ref.asset_id not in processed_assets:
assets_to_process.append(outlet_ref.asset_id)
+ all_dag_ids = list({dag_id for dag_id, _ in processed_tasks})
+ if all_dag_ids:
+ dag_id_to_team = DagModel.get_dag_id_to_team_name_mapping(all_dag_ids,
session=session)
+ for node in nodes_dict.values():
+ if not node["id"].startswith("task:"):
+ continue
+ dag_id = node["id"].removeprefix("task:").split(SEPARATOR, 1)[0]
+ team_name = dag_id_to_team.get(dag_id)
+ if team_name:
+ node["team"] = team_name
+
return {
"nodes": list(nodes_dict.values()),
"edges": [{"source_id": source, "target_id": target} for source,
target in edge_set],
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
index 6e73f5b9204..8db0fd81208 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
@@ -8234,6 +8234,17 @@ export const $BaseNodeResponse = {
type: 'string',
enum: ['join', 'task', 'asset-condition', 'asset', 'asset-alias',
'asset-name-ref', 'asset-uri-ref', 'dag', 'sensor', 'trigger'],
title: 'Type'
+ },
+ team: {
+ anyOf: [
+ {
+ type: 'string'
+ },
+ {
+ type: 'null'
+ }
+ ],
+ title: 'Team'
}
},
type: 'object',
@@ -9756,6 +9767,17 @@ export const $NodeResponse = {
enum: ['join', 'task', 'asset-condition', 'asset', 'asset-alias',
'asset-name-ref', 'asset-uri-ref', 'dag', 'sensor', 'trigger'],
title: 'Type'
},
+ team: {
+ anyOf: [
+ {
+ type: 'string'
+ },
+ {
+ type: 'null'
+ }
+ ],
+ title: 'Team'
+ },
children: {
anyOf: [
{
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
index e8317de2300..27ff0f96656 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
@@ -2064,6 +2064,7 @@ export type BaseNodeResponse = {
id: string;
label: string;
type: 'join' | 'task' | 'asset-condition' | 'asset' | 'asset-alias' |
'asset-name-ref' | 'asset-uri-ref' | 'dag' | 'sensor' | 'trigger';
+ team?: string | null;
};
export type type = 'join' | 'task' | 'asset-condition' | 'asset' |
'asset-alias' | 'asset-name-ref' | 'asset-uri-ref' | 'dag' | 'sensor' |
'trigger';
@@ -2466,6 +2467,7 @@ export type NodeResponse = {
id: string;
label: string;
type: 'join' | 'task' | 'asset-condition' | 'asset' | 'asset-alias' |
'asset-name-ref' | 'asset-uri-ref' | 'dag' | 'sensor' | 'trigger';
+ team?: string | null;
children?: Array<NodeResponse> | null;
is_mapped?: boolean | null;
tooltip?: string | null;
diff --git a/airflow-core/src/airflow/ui/src/components/Graph/DagNode.tsx
b/airflow-core/src/airflow/ui/src/components/Graph/DagNode.tsx
index 3cd9e100a4c..be0f7f9741d 100644
--- a/airflow-core/src/airflow/ui/src/components/Graph/DagNode.tsx
+++ b/airflow-core/src/airflow/ui/src/components/Graph/DagNode.tsx
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-import { Flex, HStack, LinkOverlay } from "@chakra-ui/react";
+import { Flex, HStack, LinkOverlay, Text } from "@chakra-ui/react";
import type { NodeProps, Node as NodeType } from "@xyflow/react";
import { Link as RouterLink } from "react-router-dom";
@@ -28,7 +28,7 @@ import { NodeWrapper } from "./NodeWrapper";
import type { CustomNodeProps } from "./reactflowUtils";
export const DagNode = ({
- data: { height, isOpen, isSelected, label, width },
+ data: { height, isOpen, isSelected, label, team, width },
}: NodeProps<NodeType<CustomNodeProps, "dag">>) => {
const { data: dag } = useDagServiceGetDag({ dagId: label });
@@ -58,6 +58,18 @@ export const DagNode = ({
<LinkOverlay asChild>
<RouterLink to={`/dags/${dag?.dag_id ??
label}`}>{dag?.dag_display_name ?? label}</RouterLink>
</LinkOverlay>
+ {team !== undefined && team !== null ? (
+ <Text
+ color="fg.muted"
+ fontSize="xs"
+ fontStyle="italic"
+ overflow="hidden"
+ textOverflow="ellipsis"
+ whiteSpace="nowrap"
+ >
+ {team}
+ </Text>
+ ) : undefined}
</Flex>
</NodeWrapper>
);
diff --git a/airflow-core/src/airflow/ui/src/components/Graph/TaskNode.tsx
b/airflow-core/src/airflow/ui/src/components/Graph/TaskNode.tsx
index 983865095d4..5d8affeca30 100644
--- a/airflow-core/src/airflow/ui/src/components/Graph/TaskNode.tsx
+++ b/airflow-core/src/airflow/ui/src/components/Graph/TaskNode.tsx
@@ -45,6 +45,7 @@ export const TaskNode = ({
operator,
setupTeardownType,
taskInstance,
+ team,
tooltip,
width = 0,
},
@@ -133,6 +134,18 @@ export const TaskNode = ({
>
{isGroup ? translate("graph.taskGroup") : displayOperator}
</Text>
+ {team !== undefined && team !== null ? (
+ <Text
+ color="fg.muted"
+ fontSize="xs"
+ fontStyle="italic"
+ overflow="hidden"
+ textOverflow="ellipsis"
+ whiteSpace="nowrap"
+ >
+ {team}
+ </Text>
+ ) : undefined}
{taskInstance === undefined ? undefined : (
<HStack>
<StateBadge fontSize="xs" state={taskInstance.state}>
diff --git a/airflow-core/src/airflow/ui/src/components/Graph/elkGraphUtils.ts
b/airflow-core/src/airflow/ui/src/components/Graph/elkGraphUtils.ts
index ac460c29cb7..593dec17c9a 100644
--- a/airflow-core/src/airflow/ui/src/components/Graph/elkGraphUtils.ts
+++ b/airflow-core/src/airflow/ui/src/components/Graph/elkGraphUtils.ts
@@ -48,6 +48,7 @@ export type FormattedNode = {
isMapped?: boolean;
isOpen?: boolean;
setupTeardownType?: NodeResponse["setup_teardown_type"];
+ team?: string | null;
} & ElkShape &
NodeResponse;
@@ -388,6 +389,7 @@ export const generateElkGraph = ({
layoutOptions: direction === "RIGHT" ? { "elk.portConstraints":
"FIXED_SIDE" } : undefined,
operator: node.operator,
setupTeardownType: node.setup_teardown_type,
+ team: node.team,
tooltip: node.tooltip,
type: node.type,
width,
diff --git a/airflow-core/src/airflow/ui/src/components/Graph/reactflowUtils.ts
b/airflow-core/src/airflow/ui/src/components/Graph/reactflowUtils.ts
index f50025b6cf3..67fe2c2d240 100644
--- a/airflow-core/src/airflow/ui/src/components/Graph/reactflowUtils.ts
+++ b/airflow-core/src/airflow/ui/src/components/Graph/reactflowUtils.ts
@@ -38,6 +38,7 @@ export type CustomNodeProps = {
operator?: string | null;
setupTeardownType?: NodeResponse["setup_teardown_type"];
taskInstance?: LightGridTaskInstanceSummary;
+ team?: string | null;
tooltip?: string | null;
type: string;
width?: number;
diff --git
a/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_dependencies.py
b/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_dependencies.py
index d02a66393bc..513290accb9 100644
---
a/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_dependencies.py
+++
b/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_dependencies.py
@@ -23,6 +23,8 @@ import pytest
from sqlalchemy import select
from airflow.models.asset import AssetModel
+from airflow.models.dagbundle import DagBundleModel
+from airflow.models.team import Team
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.providers.standard.operators.trigger_dagrun import
TriggerDagRunOperator
from airflow.providers.standard.sensors.external_task import ExternalTaskSensor
@@ -113,31 +115,37 @@ def expected_primary_component_response(asset1_id):
"id": "dag:downstream",
"label": "downstream",
"type": "dag",
+ "team": None,
},
{
"id": f"asset:{asset1_id}",
"label": "asset1",
"type": "asset",
+ "team": None,
},
{
"id": "sensor:other_dag:downstream:external_task_sensor",
"label": "external_task_sensor",
"type": "sensor",
+ "team": None,
},
{
"id": "dag:external_trigger_dag_id",
"label": "external_trigger_dag_id",
"type": "dag",
+ "team": None,
},
{
"id":
"trigger:external_trigger_dag_id:downstream:trigger_dag_run_operator",
"label": "trigger_dag_run_operator",
"type": "trigger",
+ "team": None,
},
{
"id": "dag:upstream",
"label": "upstream",
"type": "dag",
+ "team": None,
},
],
}
@@ -188,16 +196,19 @@ def expected_secondary_component_response(asset2_id):
"id": "dag:downstream_secondary",
"label": "downstream_secondary",
"type": "dag",
+ "team": None,
},
{
"id": f"asset:{asset2_id}",
"label": "asset2",
"type": "asset",
+ "team": None,
},
{
"id": "dag:upstream_secondary",
"label": "upstream_secondary",
"type": "dag",
+ "team": None,
},
],
}
@@ -206,7 +217,7 @@ def expected_secondary_component_response(asset2_id):
class TestGetDependencies:
@pytest.mark.usefixtures("make_primary_connected_component")
def test_should_response_200(self, test_client,
expected_primary_component_response):
- with assert_queries_count(6):
+ with assert_queries_count(7):
response = test_client.get("/dependencies")
assert response.status_code == 200
@@ -242,7 +253,7 @@ class TestGetDependencies:
@pytest.mark.usefixtures("make_primary_connected_component",
"make_secondary_connected_component")
def test_with_node_id_filter(self, test_client, node_id,
expected_response_fixture, request):
expected_response = request.getfixturevalue(expected_response_fixture)
- with assert_queries_count(6):
+ with assert_queries_count(7):
response = test_client.get("/dependencies", params={"node_id":
node_id})
assert response.status_code == 200
@@ -260,7 +271,7 @@ class TestGetDependencies:
(asset1_id, expected_primary_component_response),
(asset2_id, expected_secondary_component_response),
):
- with assert_queries_count(6):
+ with assert_queries_count(7):
response = test_client.get("/dependencies", params={"node_id":
f"asset:{asset_id}"})
assert response.status_code == 200
@@ -320,6 +331,8 @@ class TestGetDependencies:
# Task label includes dag_id.task_id for disambiguation
assert nodes_by_id[task_node_id]["label"] == "upstream.task2"
assert nodes_by_id[task_node_id]["type"] == "task"
+ # No team association in the default fixture
+ assert nodes_by_id[task_node_id]["team"] is None
# Task should point to asset (producing task → asset)
edges = result["edges"]
@@ -471,3 +484,70 @@ class TestGetDependencies:
params={"node_id": f"asset:{asset1_id}", "dependency_type":
"data"},
)
assert response.status_code == 404
+
+ def test_scheduling_dependencies_include_team_name(self, dag_maker,
test_client, session):
+ team = Team(name="my-team")
+ session.add(team)
+ bundle = DagBundleModel(name="team-bundle-sched")
+ bundle.teams.append(team)
+ session.add(bundle)
+ session.flush()
+
+ with dag_maker(
+ dag_id="team_upstream",
+ serialized=True,
+ session=session,
+ bundle_name="team-bundle-sched",
+ ):
+ EmptyOperator(task_id="t1",
outlets=[Asset(uri="s3://team-asset/1", name="team_asset")])
+
+ with dag_maker(
+ dag_id="team_downstream",
+ schedule=[Asset(uri="s3://team-asset/1", name="team_asset")],
+ serialized=True,
+ session=session,
+ bundle_name="team-bundle-sched",
+ ):
+ EmptyOperator(task_id="t2")
+
+ dag_maker.sync_dagbag_to_db()
+
+ response = test_client.get("/dependencies")
+ assert response.status_code == 200
+
+ result = response.json()
+ dag_nodes = {node["id"]: node for node in result["nodes"] if
node["type"] == "dag"}
+ assert dag_nodes["dag:team_upstream"]["team"] == "my-team"
+ assert dag_nodes["dag:team_downstream"]["team"] == "my-team"
+
+ def test_data_dependencies_include_team_name_on_task_nodes(self,
dag_maker, test_client, session):
+ team = Team(name="data-team")
+ session.add(team)
+ bundle = DagBundleModel(name="team-bundle-data")
+ bundle.teams.append(team)
+ session.add(bundle)
+ session.flush()
+
+ asset = Asset(uri="s3://team-data-asset/1", name="team_data_asset")
+ with dag_maker(
+ dag_id="producer_dag",
+ serialized=True,
+ session=session,
+ bundle_name="team-bundle-data",
+ ):
+ EmptyOperator(task_id="produce_task", outlets=[asset])
+
+ dag_maker.sync_dagbag_to_db()
+
+ asset_id = session.scalar(select(AssetModel.id).where(AssetModel.name
== "team_data_asset"))
+
+ response = test_client.get(
+ "/dependencies",
+ params={"node_id": f"asset:{asset_id}", "dependency_type": "data"},
+ )
+ assert response.status_code == 200
+
+ result = response.json()
+ task_nodes = [node for node in result["nodes"] if node["type"] ==
"task"]
+ assert len(task_nodes) == 1
+ assert task_nodes[0]["team"] == "data-team"
diff --git
a/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_structure.py
b/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_structure.py
index 8e504c132c4..0bde80a22dc 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_structure.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_structure.py
@@ -55,6 +55,7 @@ LATEST_VERSION_DAG_RESPONSE: dict = {
"tooltip": None,
"setup_teardown_type": None,
"type": "task",
+ "team": None,
"operator": "EmptyOperator",
"asset_condition_type": None,
},
@@ -66,6 +67,7 @@ LATEST_VERSION_DAG_RESPONSE: dict = {
"tooltip": None,
"setup_teardown_type": None,
"type": "task",
+ "team": None,
"operator": "EmptyOperator",
"asset_condition_type": None,
},
@@ -77,6 +79,7 @@ LATEST_VERSION_DAG_RESPONSE: dict = {
"tooltip": None,
"setup_teardown_type": None,
"type": "task",
+ "team": None,
"operator": "EmptyOperator",
"asset_condition_type": None,
},
@@ -278,6 +281,7 @@ class TestStructureDataEndpoint:
"tooltip": None,
"setup_teardown_type": None,
"type": "task",
+ "team": None,
"operator": "EmptyOperator",
},
{
@@ -289,6 +293,7 @@ class TestStructureDataEndpoint:
"tooltip": None,
"setup_teardown_type": None,
"type": "task",
+ "team": None,
"operator": "ExternalTaskSensor",
},
{
@@ -300,6 +305,7 @@ class TestStructureDataEndpoint:
"tooltip": None,
"setup_teardown_type": None,
"type": "task",
+ "team": None,
"operator": "EmptyOperator",
},
],
@@ -334,6 +340,7 @@ class TestStructureDataEndpoint:
"setup_teardown_type": None,
"tooltip": None,
"type": "task",
+ "team": None,
},
],
},
@@ -361,6 +368,7 @@ class TestStructureDataEndpoint:
"tooltip": None,
"setup_teardown_type": None,
"type": "task",
+ "team": None,
"operator": "TriggerDagRunOperator",
},
{
@@ -372,6 +380,7 @@ class TestStructureDataEndpoint:
"tooltip": None,
"setup_teardown_type": None,
"type": "trigger",
+ "team": None,
"operator": None,
},
],
@@ -468,6 +477,7 @@ class TestStructureDataEndpoint:
"tooltip": None,
"setup_teardown_type": None,
"type": "task",
+ "team": None,
"operator": "EmptyOperator",
"asset_condition_type": None,
},
@@ -479,6 +489,7 @@ class TestStructureDataEndpoint:
"tooltip": None,
"setup_teardown_type": None,
"type": "task",
+ "team": None,
"operator": "ExternalTaskSensor",
"asset_condition_type": None,
},
@@ -490,6 +501,7 @@ class TestStructureDataEndpoint:
"tooltip": None,
"setup_teardown_type": None,
"type": "task",
+ "team": None,
"operator": "EmptyOperator",
"asset_condition_type": None,
},
@@ -501,6 +513,7 @@ class TestStructureDataEndpoint:
"tooltip": None,
"setup_teardown_type": None,
"type": "asset",
+ "team": None,
"operator": None,
"asset_condition_type": None,
},
@@ -512,6 +525,7 @@ class TestStructureDataEndpoint:
"tooltip": None,
"setup_teardown_type": None,
"type": "sensor",
+ "team": None,
"operator": None,
"asset_condition_type": None,
},
@@ -523,6 +537,7 @@ class TestStructureDataEndpoint:
"tooltip": None,
"setup_teardown_type": None,
"type": "trigger",
+ "team": None,
"operator": None,
"asset_condition_type": None,
},
@@ -534,6 +549,7 @@ class TestStructureDataEndpoint:
"tooltip": None,
"setup_teardown_type": None,
"type": "asset-condition",
+ "team": None,
"operator": None,
"asset_condition_type": "and-gate",
},
@@ -545,6 +561,7 @@ class TestStructureDataEndpoint:
"tooltip": None,
"setup_teardown_type": None,
"type": "asset",
+ "team": None,
"operator": None,
"asset_condition_type": None,
},
@@ -556,6 +573,7 @@ class TestStructureDataEndpoint:
"tooltip": None,
"setup_teardown_type": None,
"type": "asset",
+ "team": None,
"operator": None,
"asset_condition_type": None,
},
@@ -567,6 +585,7 @@ class TestStructureDataEndpoint:
"tooltip": None,
"setup_teardown_type": None,
"type": "asset-alias",
+ "team": None,
"operator": None,
"asset_condition_type": None,
},
@@ -611,6 +630,7 @@ class TestStructureDataEndpoint:
"id": "task_1",
"label": "task_1",
"type": "task",
+ "team": None,
"children": None,
"is_mapped": None,
"tooltip": None,
@@ -622,6 +642,7 @@ class TestStructureDataEndpoint:
"id": "task_2",
"label": "task_2",
"type": "task",
+ "team": None,
"children": None,
"is_mapped": None,
"tooltip": None,
@@ -633,6 +654,7 @@ class TestStructureDataEndpoint:
"id": f"asset:{resolved_asset.id}",
"label": "resolved_example_asset_alias",
"type": "asset",
+ "team": None,
"children": None,
"is_mapped": None,
"tooltip": None,