pierrejeambrun commented on code in PR #67242:
URL: https://github.com/apache/airflow/pull/67242#discussion_r3303240012


##########
airflow-core/src/airflow/api_fastapi/core_api/routes/ui/dags.py:
##########
@@ -308,3 +311,45 @@ def get_latest_run_info(dag_id: str, session: SessionDep) -> DAGRunLightResponse
     latest_run_info = session.execute(latest_run_info_select).one_or_none()
 
     return DAGRunLightResponse(**latest_run_info._mapping) if latest_run_info else None
+
+
+@dags_router.get(
+    "/run_state_counts",
+    dependencies=[
+        Depends(requires_access_dag(method="GET")),
+        Depends(requires_access_dag(method="GET", access_entity=DagAccessEntity.RUN)),
+    ],
+    operation_id="get_dag_run_state_counts_ui",
+)
+def get_dag_run_state_counts(
+    session: SessionDep,
+    readable_dags_filter: ReadableDagsFilterDep,
+    dag_ids: Annotated[list[str], Query(min_length=1)],
+    run_after_gte: datetime | None = None,
+) -> DAGsRunStateCountsCollectionResponse:
+    """Return per-Dag DagRun state counts (zero-filled) for the Dag list page."""
+    permitted_dag_ids = readable_dags_filter.value or set()
+    requested_dag_ids = [dag_id for dag_id in dict.fromkeys(dag_ids) if dag_id in permitted_dag_ids]
+    counts_by_dag: dict[str, dict[DagRunState, int]] = {
+        dag_id: {state: 0 for state in DagRunState} for dag_id in requested_dag_ids
+    }
+
+    if requested_dag_ids:
+        count_query = (
+            select(DagRun.dag_id, DagRun.state, func.count().label("cnt"))
+            .where(DagRun.dag_id.in_(requested_dag_ids))
+            .group_by(DagRun.dag_id, DagRun.state)
+        )
+        if run_after_gte is not None:
+            count_query = count_query.where(DagRun.run_after >= run_after_gte)
+        for row in session.execute(count_query):
+            if row.state is None:
+                continue
+            counts_by_dag[row.dag_id][DagRunState(row.state)] = row.cnt
+

Review Comment:
   That's not scalable and will take forever to run on tables where there are 10 million+ Dag runs.
   
   Similar to the home page, we can limit this to 1k+ items, but that needs to be tested.



##########
airflow-core/src/airflow/api_fastapi/core_api/routes/ui/dags.py:
##########
@@ -308,3 +311,45 @@ def get_latest_run_info(dag_id: str, session: SessionDep) -> DAGRunLightResponse
     latest_run_info = session.execute(latest_run_info_select).one_or_none()
 
     return DAGRunLightResponse(**latest_run_info._mapping) if latest_run_info else None
+
+
+@dags_router.get(
+    "/run_state_counts",
+    dependencies=[
+        Depends(requires_access_dag(method="GET")),
+        Depends(requires_access_dag(method="GET", access_entity=DagAccessEntity.RUN)),
+    ],
+    operation_id="get_dag_run_state_counts_ui",
+)
+def get_dag_run_state_counts(
+    session: SessionDep,
+    readable_dags_filter: ReadableDagsFilterDep,
+    dag_ids: Annotated[list[str], Query(min_length=1)],
+    run_after_gte: datetime | None = None,
+) -> DAGsRunStateCountsCollectionResponse:
+    """Return per-Dag DagRun state counts (zero-filled) for the Dag list page."""
+    permitted_dag_ids = readable_dags_filter.value or set()
+    requested_dag_ids = [dag_id for dag_id in dict.fromkeys(dag_ids) if dag_id in permitted_dag_ids]

Review Comment:
   Why is `dict.fromkeys` needed here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to