ashb commented on code in PR #51805: URL: https://github.com/apache/airflow/pull/51805#discussion_r2167693068
########## airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py: ########## @@ -257,8 +268,324 @@ def grid_data( ) for dag_run in dag_runs ] + return GridResponse(dag_runs=grid_dag_runs) + + +def _get_latest_serdag(dag_id, session): + serdag = session.scalar( + select(SerializedDagModel) + .where( + SerializedDagModel.dag_id == dag_id, + ) + .order_by(SerializedDagModel.id.desc()) + .limit(1) + ) + if not serdag: + raise HTTPException( + status.HTTP_404_NOT_FOUND, + f"Dag with id {dag_id} was not found", + ) + return serdag + + +def _get_serdag(dag_id, dag_version_id, session) -> SerializedDagModel | None: + # this is a simplification - we account for structure based on the first task + version = session.scalar(select(DagVersion).where(DagVersion.id == dag_version_id)) + if not version: + version = session.scalar( + select(DagVersion) + .where( + DagVersion.dag_id == dag_id, + ) + .order_by(DagVersion.id) # ascending cus this is mostly for pre-3.0 upgrade + .limit(1) + ) + if not (serdag := version.serialized_dag): + log.error( + "No serialized dag found", + dag_id=dag_id, + version_id=version.id, + version_number=version.version_number, + ) + return serdag + + +@grid_router.get( + "/structure/{dag_id}", + responses=create_openapi_http_exception_doc([status.HTTP_400_BAD_REQUEST, status.HTTP_404_NOT_FOUND]), + dependencies=[ + Depends(requires_access_dag(method="GET", access_entity=DagAccessEntity.TASK_INSTANCE)), + Depends(requires_access_dag(method="GET", access_entity=DagAccessEntity.RUN)), + ], + response_model_exclude_none=True, +) +def get_dag_structure( + dag_id: str, + session: SessionDep, + offset: QueryOffset, + limit: QueryLimit, + order_by: Annotated[ + SortParam, + Depends(SortParam(["run_after", "logical_date", "start_date", "end_date"], DagRun).dynamic_depends()), + ], + run_after: Annotated[RangeFilter, Depends(datetime_range_filter_factory("run_after", DagRun))], +) -> list[GridNodeResponse]: + """Return dag structure for grid view.""" + 
latest_serdag = _get_latest_serdag(dag_id, session) + latest_dag = latest_serdag.dag - flat_tis = itertools.chain.from_iterable(tis_by_run_id.values()) - structure = get_combined_structure(task_instances=flat_tis, session=session) + # Retrieve, sort the previous DAG Runs + base_query = select(DagRun.id).where(DagRun.dag_id == dag_id) + # This comparison is to fall back to DAG timetable when no order_by is provided + if order_by.value == order_by.get_primary_key_string(): + ordering = list(latest_dag.timetable.run_ordering) + order_by = SortParam( + allowed_attrs=ordering, + model=DagRun, + ).set_value(ordering[0]) + dag_runs_select_filter, _ = paginated_select( + statement=base_query, + order_by=order_by, + offset=offset, + filters=[run_after], + limit=limit, + ) + run_ids = list(session.scalars(dag_runs_select_filter)) + + task_group_sort = get_task_group_children_getter() + if not run_ids: + nodes = [task_group_to_dict_grid(x) for x in task_group_sort(latest_dag.task_group)] + return nodes - return GridResponse(dag_runs=grid_dag_runs, structure=structure) + serdags = session.scalars( + select(SerializedDagModel).where( + SerializedDagModel.dag_version_id.in_( + select(TaskInstance.dag_version_id) + .join(TaskInstance.dag_run) + .where( + DagRun.id.in_(run_ids), + SerializedDagModel.id != latest_serdag.id, + ) + ) + ) + ) + merged_nodes: list[GridNodeResponse] = [] + dags = [latest_dag] + for serdag in serdags: + if serdag: + dags.append(serdag.dag) + for dag in dags: + nodes = [task_group_to_dict_grid(x) for x in task_group_sort(dag.task_group)] + _merge_node_dicts(merged_nodes, nodes) + + return merged_nodes + + +@grid_router.get( + "/runs/{dag_id}", Review Comment: Ditto etc ########## airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py: ########## @@ -257,8 +268,324 @@ def grid_data( ) for dag_run in dag_runs ] + return GridResponse(dag_runs=grid_dag_runs) + + +def _get_latest_serdag(dag_id, session): + serdag = session.scalar( + 
select(SerializedDagModel) + .where( + SerializedDagModel.dag_id == dag_id, + ) + .order_by(SerializedDagModel.id.desc()) + .limit(1) + ) + if not serdag: + raise HTTPException( + status.HTTP_404_NOT_FOUND, + f"Dag with id {dag_id} was not found", + ) + return serdag + + +def _get_serdag(dag_id, dag_version_id, session) -> SerializedDagModel | None: + # this is a simplification - we account for structure based on the first task + version = session.scalar(select(DagVersion).where(DagVersion.id == dag_version_id)) + if not version: + version = session.scalar( + select(DagVersion) + .where( + DagVersion.dag_id == dag_id, + ) + .order_by(DagVersion.id) # ascending cus this is mostly for pre-3.0 upgrade + .limit(1) + ) + if not (serdag := version.serialized_dag): + log.error( + "No serialized dag found", + dag_id=dag_id, + version_id=version.id, + version_number=version.version_number, + ) + return serdag + + +@grid_router.get( + "/structure/{dag_id}", Review Comment: Nit: if it is not too late (but also don't let this stop merging!) ```suggestion "/{dag_id}/structure", ``` As the structure is "a property" of the dag this feels more natural to me -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org