This is an automated email from the ASF dual-hosted git repository.

pierrejeambrun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new ae2f8147abd Improve historical metrics endpoint performance (#63526)
ae2f8147abd is described below

commit ae2f8147abd2a50909daf3da6c1e9894ab963a92
Author: Pierre Jeambrun <[email protected]>
AuthorDate: Mon Mar 16 16:03:00 2026 +0100

    Improve historical metrics endpoint performance (#63526)
    
    * Optimize historical_metrics_data endpoint with capped per-state counts
    
    * Return state_count_limit in historical_metrics response
    
    The UI now reads the cap value from the API response instead of
    hardcoding it, keeping frontend and backend in sync.
    
    * Hide percentage and fill bars when any state count is capped
    
    When any state in a section reaches the cap, percentages become
    unreliable. Fill every bar to full width and hide the percent
    label instead of showing misleading numbers.
    
    * Revert "Hide percentage and fill bars when any state count is capped"
    
    This reverts commit facf46b60e2343d65fb5b457d7d8a7147925f59e.
    
    * Hide percentage and fill bar only for individually capped states
    
    When a specific state's count reaches the cap, its bar fills to
    full width and its percentage is hidden. Non-capped states keep
    their normal proportional bars and percentages.
---
 .../core_api/datamodels/ui/dashboard.py            |  11 +-
 .../api_fastapi/core_api/openapi/_private_ui.yaml  |  29 +-----
 .../api_fastapi/core_api/routes/ui/dashboard.py    | 112 +++++++++++----------
 .../airflow/ui/openapi-gen/requests/schemas.gen.ts |  34 +------
 .../airflow/ui/openapi-gen/requests/types.gen.ts   |  12 +--
 .../Dashboard/HistoricalMetrics/DagRunMetrics.tsx  |  21 ++--
 .../HistoricalMetrics/HistoricalMetrics.tsx        |  16 ++-
 .../Dashboard/HistoricalMetrics/MetricSection.tsx  |  22 ++--
 .../HistoricalMetrics/TaskInstanceMetrics.tsx      |  21 ++--
 .../core_api/routes/ui/test_dashboard.py           |  31 +++++-
 10 files changed, 131 insertions(+), 178 deletions(-)

diff --git 
a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/ui/dashboard.py 
b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/ui/dashboard.py
index bf2afa9fcd3..4f48ea48351 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/ui/dashboard.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/ui/dashboard.py
@@ -19,15 +19,6 @@ from __future__ import annotations
 from airflow.api_fastapi.core_api.base import BaseModel
 
 
-class DAGRunTypes(BaseModel):
-    """DAG Run Types for responses."""
-
-    backfill: int
-    scheduled: int
-    manual: int
-    asset_triggered: int
-
-
 class DAGRunStates(BaseModel):
     """DAG Run States for responses."""
 
@@ -58,9 +49,9 @@ class TaskInstanceStateCount(BaseModel):
 class HistoricalMetricDataResponse(BaseModel):
     """Historical Metric Data serializer for responses."""
 
-    dag_run_types: DAGRunTypes
     dag_run_states: DAGRunStates
     task_instance_states: TaskInstanceStateCount
+    state_count_limit: int
 
 
 class DashboardDagStatsResponse(BaseModel):
diff --git 
a/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml 
b/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml
index 6c38b7ce2d6..96a383b7299 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml
+++ b/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml
@@ -1774,28 +1774,6 @@ components:
       - failed
       title: DAGRunStates
       description: DAG Run States for responses.
-    DAGRunTypes:
-      properties:
-        backfill:
-          type: integer
-          title: Backfill
-        scheduled:
-          type: integer
-          title: Scheduled
-        manual:
-          type: integer
-          title: Manual
-        asset_triggered:
-          type: integer
-          title: Asset Triggered
-      type: object
-      required:
-      - backfill
-      - scheduled
-      - manual
-      - asset_triggered
-      title: DAGRunTypes
-      description: DAG Run Types for responses.
     DAGWithLatestDagRunsCollectionResponse:
       properties:
         total_entries:
@@ -2523,17 +2501,18 @@ components:
       title: HTTPValidationError
     HistoricalMetricDataResponse:
       properties:
-        dag_run_types:
-          $ref: '#/components/schemas/DAGRunTypes'
         dag_run_states:
           $ref: '#/components/schemas/DAGRunStates'
         task_instance_states:
           $ref: '#/components/schemas/TaskInstanceStateCount'
+        state_count_limit:
+          type: integer
+          title: State Count Limit
       type: object
       required:
-      - dag_run_types
       - dag_run_states
       - task_instance_states
+      - state_count_limit
       title: HistoricalMetricDataResponse
       description: Historical Metric Data serializer for responses.
     JobResponse:
diff --git 
a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/dashboard.py 
b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/dashboard.py
index ba8697fcd18..1ce32badd4a 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/dashboard.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/dashboard.py
@@ -19,7 +19,7 @@ from __future__ import annotations
 from typing import cast
 
 from fastapi import Depends, status
-from sqlalchemy import func, select
+from sqlalchemy import func, literal, select, union_all
 from sqlalchemy.sql.expression import case, false
 
 from airflow._shared.timezones import timezone
@@ -34,12 +34,16 @@ from airflow.api_fastapi.core_api.datamodels.ui.dashboard 
import (
 from airflow.api_fastapi.core_api.openapi.exceptions import 
create_openapi_http_exception_doc
 from airflow.api_fastapi.core_api.security import ReadableDagsFilterDep, 
requires_access_dag
 from airflow.models.dag import DagModel
-from airflow.models.dagrun import DagRun, DagRunType
+from airflow.models.dagrun import DagRun
 from airflow.models.taskinstance import TaskInstance
 from airflow.utils.state import DagRunState, TaskInstanceState
 
 dashboard_router = AirflowRouter(tags=["Dashboard"], prefix="/dashboard")
 
+# Cap for state counts — avoids counting millions of rows.
+# The UI shows "N+" when the returned count equals this value.
+STATE_COUNT_CAP = 1000
+
 
 @dashboard_router.get(
     "/historical_metrics_data",
@@ -58,57 +62,59 @@ def historical_metrics(
     """Return cluster activity historical metrics."""
     current_time = timezone.utcnow()
     permitted_dag_ids = cast("set[str]", readable_dags_filter.value)
-    # DagRuns
-    dag_run_types = session.execute(
-        select(DagRun.run_type, func.count(DagRun.run_id))
-        .where(
-            func.coalesce(DagRun.start_date, current_time) >= start_date,
-            func.coalesce(DagRun.end_date, current_time) <= 
func.coalesce(end_date, current_time),
-        )
-        .where(DagRun.dag_id.in_(permitted_dag_ids))
-        .group_by(DagRun.run_type)
-    ).all()
-
-    dag_run_states = session.execute(
-        select(DagRun.state, func.count(DagRun.run_id))
-        .where(
-            func.coalesce(DagRun.start_date, current_time) >= start_date,
-            func.coalesce(DagRun.end_date, current_time) <= 
func.coalesce(end_date, current_time),
-        )
-        .where(DagRun.dag_id.in_(permitted_dag_ids))
-        .group_by(DagRun.state)
-    ).all()
-
-    # TaskInstances
-    task_instance_states = session.execute(
-        select(TaskInstance.state, func.count(TaskInstance.run_id))
-        .join(TaskInstance.dag_run)
-        .where(
-            func.coalesce(DagRun.start_date, current_time) >= start_date,
-            func.coalesce(DagRun.end_date, current_time) <= 
func.coalesce(end_date, current_time),
-        )
-        .where(DagRun.dag_id.in_(permitted_dag_ids))
-        .group_by(TaskInstance.state)
-    ).all()
-
-    # Combining historical metrics response as dictionary
-    historical_metrics_response = {
-        "dag_run_types": {
-            **{dag_run_type.value: 0 for dag_run_type in DagRunType},
-            **{row.run_type: row.count for row in dag_run_types},
-        },
-        "dag_run_states": {
-            **{dag_run_state.value: 0 for dag_run_state in DagRunState},
-            **{row.state: row.count for row in dag_run_states},
-        },
-        "task_instance_states": {
-            "no_status": 0,
-            **{ti_state.value: 0 for ti_state in TaskInstanceState},
-            **{ti_state or "no_status": sum_value for ti_state, sum_value in 
task_instance_states},
-        },
-    }
-
-    return 
HistoricalMetricDataResponse.model_validate(historical_metrics_response)
+
+    dag_run_filters = [
+        func.coalesce(DagRun.start_date, current_time) >= start_date,
+        func.coalesce(DagRun.end_date, current_time) <= 
func.coalesce(end_date, current_time),
+        DagRun.dag_id.in_(permitted_dag_ids),
+    ]
+
+    # Build one LIMIT-capped subquery per state, then UNION ALL them into a
+    # single query.  Every state gets the same treatment: at most 
STATE_COUNT_CAP
+    # rows are read from the index, so even states with millions of rows
+    # (typically "success") are counted in single-digit milliseconds.
+    # Each branch is wrapped in a subquery so LIMIT works on all backends
+    # (SQLite rejects LIMIT inside bare UNION ALL arms).
+    def _capped_state_counts(model, states, label_fn, join=None):
+        branches = []
+        for state in states:
+            stmt = 
select(literal(label_fn(state)).label("state")).select_from(model)
+            if join is not None:
+                stmt = stmt.join(join)
+            branch = (
+                stmt.where(*dag_run_filters)
+                .where(model.state == state if state else 
model.state.is_(None))
+                .limit(STATE_COUNT_CAP)
+                .subquery()
+            )
+            branches.append(select(branch.c.state))
+        capped = union_all(*branches).subquery()
+        return session.execute(
+            select(capped.c.state, 
func.count().label("cnt")).group_by(capped.c.state)
+        ).all()
+
+    dag_run_state_counts = _capped_state_counts(DagRun, list(DagRunState), 
lambda s: s.value)
+    ti_state_counts = _capped_state_counts(
+        TaskInstance,
+        [None, *TaskInstanceState],
+        lambda s: s.value if s else "no_status",
+        join=TaskInstance.dag_run,
+    )
+
+    return HistoricalMetricDataResponse.model_validate(
+        {
+            "dag_run_states": {
+                **{dag_run_state.value: 0 for dag_run_state in DagRunState},
+                **{row.state: row.cnt for row in dag_run_state_counts},
+            },
+            "task_instance_states": {
+                "no_status": 0,
+                **{ti_state.value: 0 for ti_state in TaskInstanceState},
+                **{row.state: row.cnt for row in ti_state_counts},
+            },
+            "state_count_limit": STATE_COUNT_CAP,
+        }
+    )
 
 
 @dashboard_router.get(
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts 
b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
index 6c540964655..1be05b11908 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
@@ -7519,31 +7519,6 @@ export const $DAGRunStates = {
     description: 'DAG Run States for responses.'
 } as const;
 
-export const $DAGRunTypes = {
-    properties: {
-        backfill: {
-            type: 'integer',
-            title: 'Backfill'
-        },
-        scheduled: {
-            type: 'integer',
-            title: 'Scheduled'
-        },
-        manual: {
-            type: 'integer',
-            title: 'Manual'
-        },
-        asset_triggered: {
-            type: 'integer',
-            title: 'Asset Triggered'
-        }
-    },
-    type: 'object',
-    required: ['backfill', 'scheduled', 'manual', 'asset_triggered'],
-    title: 'DAGRunTypes',
-    description: 'DAG Run Types for responses.'
-} as const;
-
 export const $DAGWithLatestDagRunsCollectionResponse = {
     properties: {
         total_entries: {
@@ -8287,18 +8262,19 @@ export const $GridTISummaries = {
 
 export const $HistoricalMetricDataResponse = {
     properties: {
-        dag_run_types: {
-            '$ref': '#/components/schemas/DAGRunTypes'
-        },
         dag_run_states: {
             '$ref': '#/components/schemas/DAGRunStates'
         },
         task_instance_states: {
             '$ref': '#/components/schemas/TaskInstanceStateCount'
+        },
+        state_count_limit: {
+            type: 'integer',
+            title: 'State Count Limit'
         }
     },
     type: 'object',
-    required: ['dag_run_types', 'dag_run_states', 'task_instance_states'],
+    required: ['dag_run_states', 'task_instance_states', 'state_count_limit'],
     title: 'HistoricalMetricDataResponse',
     description: 'Historical Metric Data serializer for responses.'
 } as const;
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts 
b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
index 038c02ee2cd..e6c208c6393 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
@@ -1859,16 +1859,6 @@ export type DAGRunStates = {
     failed: number;
 };
 
-/**
- * DAG Run Types for responses.
- */
-export type DAGRunTypes = {
-    backfill: number;
-    scheduled: number;
-    manual: number;
-    asset_triggered: number;
-};
-
 /**
  * DAG with latest dag runs collection response serializer.
  */
@@ -2046,9 +2036,9 @@ export type GridTISummaries = {
  * Historical Metric Data serializer for responses.
  */
 export type HistoricalMetricDataResponse = {
-    dag_run_types: DAGRunTypes;
     dag_run_states: DAGRunStates;
     task_instance_states: TaskInstanceStateCount;
+    state_count_limit: number;
 };
 
 /**
diff --git 
a/airflow-core/src/airflow/ui/src/pages/Dashboard/HistoricalMetrics/DagRunMetrics.tsx
 
b/airflow-core/src/airflow/ui/src/pages/Dashboard/HistoricalMetrics/DagRunMetrics.tsx
index 8eb824b60f6..a6578c0f383 100644
--- 
a/airflow-core/src/airflow/ui/src/pages/Dashboard/HistoricalMetrics/DagRunMetrics.tsx
+++ 
b/airflow-core/src/airflow/ui/src/pages/Dashboard/HistoricalMetrics/DagRunMetrics.tsx
@@ -20,10 +20,6 @@ import { Box, Separator, Heading, HStack, Stack } from 
"@chakra-ui/react";
 import type { DAGRunStates } from "openapi-gen/requests/types.gen";
 import { useTranslation } from "react-i18next";
 import { FiBarChart } from "react-icons/fi";
-import { Link as RouterLink } from "react-router-dom";
-
-import { StateBadge } from "src/components/StateBadge";
-import { SearchParamsKeys } from "src/constants/searchParams";
 
 import { MetricSection } from "./MetricSection";
 
@@ -31,31 +27,26 @@ type DagRunMetricsProps = {
   readonly dagRunStates: DAGRunStates;
   readonly endDate?: string;
   readonly startDate: string;
-  readonly total: number;
+  readonly stateCountLimit: number;
 };
 
 const DAGRUN_STATES: Array<keyof DAGRunStates> = ["queued", "running", 
"success", "failed"];
 
-export const DagRunMetrics = ({ dagRunStates, endDate, startDate, total }: 
DagRunMetricsProps) => {
+export const DagRunMetrics = ({ dagRunStates, endDate, startDate, 
stateCountLimit }: DagRunMetricsProps) => {
   const { t: translate } = useTranslation();
+  const total = Object.values(dagRunStates).reduce((sum, count) => sum + 
count, 0);
 
   return (
     <Box borderRadius={5} borderWidth={1} p={4}>
       <HStack>
-        <RouterLink
-          to={`/dag_runs?${SearchParamsKeys.START_DATE}=${startDate}${endDate 
=== undefined ? "" : `&${SearchParamsKeys.END_DATE}=${endDate}`}`}
-        >
-          <StateBadge colorPalette="brand" fontSize="md" variant="solid">
-            <FiBarChart />
-            {total}
-          </StateBadge>
-        </RouterLink>
-        <Heading size="md">{translate("dagRun", { count: total })}</Heading>
+        <FiBarChart />
+        <Heading size="md">{translate("dagRun", { count: 2 })}</Heading>
       </HStack>
       <Separator my={3} />
       <Stack gap={4}>
         {DAGRUN_STATES.map((state) => (
           <MetricSection
+            capped={dagRunStates[state] >= stateCountLimit}
             endDate={endDate}
             key={state}
             kind="dag_runs"
diff --git 
a/airflow-core/src/airflow/ui/src/pages/Dashboard/HistoricalMetrics/HistoricalMetrics.tsx
 
b/airflow-core/src/airflow/ui/src/pages/Dashboard/HistoricalMetrics/HistoricalMetrics.tsx
index bd60f278edc..6609bc89da2 100644
--- 
a/airflow-core/src/airflow/ui/src/pages/Dashboard/HistoricalMetrics/HistoricalMetrics.tsx
+++ 
b/airflow-core/src/airflow/ui/src/pages/Dashboard/HistoricalMetrics/HistoricalMetrics.tsx
@@ -53,14 +53,6 @@ export const HistoricalMetrics = () => {
     },
   );
 
-  const dagRunTotal = data
-    ? Object.values(data.dag_run_states).reduce((partialSum, value) => 
partialSum + value, 0)
-    : 0;
-
-  const taskRunTotal = data
-    ? Object.values(data.task_instance_states).reduce((partialSum, value) => 
partialSum + value, 0)
-    : 0;
-
   const { data: assetEventsData, isLoading: isLoadingAssetEvents } = 
useAssetServiceGetAssetEvents({
     limit: 6,
     orderBy: [assetSortBy],
@@ -90,11 +82,15 @@ export const HistoricalMetrics = () => {
             {isLoading ? <MetricSectionSkeleton /> : undefined}
             {!isLoading && data !== undefined && (
               <Box>
-                <DagRunMetrics dagRunStates={data.dag_run_states} 
startDate={startDate} total={dagRunTotal} />
+                <DagRunMetrics
+                  dagRunStates={data.dag_run_states}
+                  startDate={startDate}
+                  stateCountLimit={data.state_count_limit}
+                />
                 <TaskInstanceMetrics
                   startDate={startDate}
+                  stateCountLimit={data.state_count_limit}
                   taskInstanceStates={data.task_instance_states}
-                  total={taskRunTotal}
                 />
               </Box>
             )}
diff --git 
a/airflow-core/src/airflow/ui/src/pages/Dashboard/HistoricalMetrics/MetricSection.tsx
 
b/airflow-core/src/airflow/ui/src/pages/Dashboard/HistoricalMetrics/MetricSection.tsx
index 611884880a4..714c322ca23 100644
--- 
a/airflow-core/src/airflow/ui/src/pages/Dashboard/HistoricalMetrics/MetricSection.tsx
+++ 
b/airflow-core/src/airflow/ui/src/pages/Dashboard/HistoricalMetrics/MetricSection.tsx
@@ -28,6 +28,7 @@ const BAR_WIDTH = 100;
 const BAR_HEIGHT = 5;
 
 type MetricSectionProps = {
+  readonly capped?: boolean;
   readonly endDate?: string;
   readonly kind: string;
   readonly runs: number;
@@ -36,12 +37,18 @@ type MetricSectionProps = {
   readonly total: number;
 };
 
-export const MetricSection = ({ endDate, kind, runs, startDate, state, total 
}: MetricSectionProps) => {
-  // Calculate the given state as a percentage of total and draw a bar
-  // in state's color with width as state's percentage and remaining width 
filed as gray
-  const statePercent = total === 0 ? 0 : ((runs / total) * 100).toFixed(2);
-  const stateWidth = total === 0 ? 0 : (runs / total) * BAR_WIDTH;
+export const MetricSection = ({
+  capped = false,
+  endDate,
+  kind,
+  runs,
+  startDate,
+  state,
+  total,
+}: MetricSectionProps) => {
+  const stateWidth = capped ? BAR_WIDTH : total === 0 ? 0 : (runs / total) * 
BAR_WIDTH;
   const remainingWidth = BAR_WIDTH - stateWidth;
+  const statePercent = capped ? undefined : total === 0 ? 0 : ((runs / total) 
* 100).toFixed(2);
 
   const stateParam = kind === "task_instances" ? SearchParamsKeys.TASK_STATE : 
SearchParamsKeys.STATE;
   const searchParams = new URLSearchParams(
@@ -60,12 +67,13 @@ export const MetricSection = ({ endDate, kind, runs, 
startDate, state, total }:
           <RouterLink to={`/${kind}?${searchParams.toString()}`}>
             {/* eslint-disable-next-line unicorn/no-null */}
             <StateBadge fontSize="md" state={state === "no_status" ? null : 
state}>
-              {runs}
+              {}
+              {capped ? `${runs}+` : runs}
             </StateBadge>
           </RouterLink>
           <Text>{translate(`states.${state}`)}</Text>
         </HStack>
-        <Text color="fg.muted"> {statePercent}% </Text>
+        {statePercent === undefined ? undefined : <Text color="fg.muted"> 
{statePercent}% </Text>}
       </Flex>
       <HStack gap={0} mt={2}>
         <Box
diff --git 
a/airflow-core/src/airflow/ui/src/pages/Dashboard/HistoricalMetrics/TaskInstanceMetrics.tsx
 
b/airflow-core/src/airflow/ui/src/pages/Dashboard/HistoricalMetrics/TaskInstanceMetrics.tsx
index 78d8f748ccd..f36c0a4635b 100644
--- 
a/airflow-core/src/airflow/ui/src/pages/Dashboard/HistoricalMetrics/TaskInstanceMetrics.tsx
+++ 
b/airflow-core/src/airflow/ui/src/pages/Dashboard/HistoricalMetrics/TaskInstanceMetrics.tsx
@@ -20,18 +20,14 @@ import { Box, Separator, Heading, HStack, Stack } from 
"@chakra-ui/react";
 import type { TaskInstanceState, TaskInstanceStateCount } from 
"openapi-gen/requests/types.gen";
 import { useTranslation } from "react-i18next";
 import { MdOutlineTask } from "react-icons/md";
-import { Link as RouterLink } from "react-router-dom";
-
-import { StateBadge } from "src/components/StateBadge";
-import { SearchParamsKeys } from "src/constants/searchParams";
 
 import { MetricSection } from "./MetricSection";
 
 type TaskInstanceMetricsProps = {
   readonly endDate?: string;
   readonly startDate: string;
+  readonly stateCountLimit: number;
   readonly taskInstanceStates: TaskInstanceStateCount;
-  readonly total: number;
 };
 
 const TASK_STATES: Array<keyof TaskInstanceStateCount> = [
@@ -53,23 +49,17 @@ const TASK_STATES: Array<keyof TaskInstanceStateCount> = [
 export const TaskInstanceMetrics = ({
   endDate,
   startDate,
+  stateCountLimit,
   taskInstanceStates,
-  total,
 }: TaskInstanceMetricsProps) => {
   const { t: translate } = useTranslation();
+  const total = Object.values(taskInstanceStates).reduce((sum, count) => sum + 
count, 0);
 
   return (
     <Box borderRadius={5} borderWidth={1} mt={2} p={4}>
       <HStack>
-        <RouterLink
-          
to={`/task_instances?${SearchParamsKeys.START_DATE}=${startDate}${endDate === 
undefined ? "" : `&${SearchParamsKeys.END_DATE}=${endDate}`}`}
-        >
-          <StateBadge colorPalette="brand" fontSize="md" variant="solid">
-            <MdOutlineTask />
-            {total}
-          </StateBadge>
-        </RouterLink>
-        <Heading size="md">{translate("taskInstance", { count: total 
})}</Heading>
+        <MdOutlineTask />
+        <Heading size="md">{translate("taskInstance", { count: 2 })}</Heading>
       </HStack>
       <Separator my={3} />
       <Stack gap={4}>
@@ -78,6 +68,7 @@ export const TaskInstanceMetrics = ({
         ).map((state) =>
           taskInstanceStates[state] > 0 ? (
             <MetricSection
+              capped={taskInstanceStates[state] >= stateCountLimit}
               endDate={endDate}
               key={state}
               kind="task_instances"
diff --git 
a/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_dashboard.py 
b/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_dashboard.py
index 291a02e68af..ccfc2b26f78 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_dashboard.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_dashboard.py
@@ -18,6 +18,7 @@
 from __future__ import annotations
 
 from datetime import timedelta
+from unittest import mock
 
 import pendulum
 import pytest
@@ -244,7 +245,6 @@ class TestHistoricalMetricsDataEndpoint:
                 {"start_date": "2023-01-01T00:00", "end_date": 
"2023-08-02T00:00"},
                 {
                     "dag_run_states": {"failed": 1, "queued": 1, "running": 1, 
"success": 1},
-                    "dag_run_types": {"backfill": 0, "asset_triggered": 1, 
"manual": 0, "scheduled": 3},
                     "task_instance_states": {
                         "deferred": 0,
                         "failed": 2,
@@ -260,13 +260,13 @@ class TestHistoricalMetricsDataEndpoint:
                         "up_for_retry": 0,
                         "upstream_failed": 0,
                     },
+                    "state_count_limit": 1000,
                 },
             ),
             (
                 {"start_date": "2023-02-02T00:00", "end_date": 
"2023-06-02T00:00"},
                 {
                     "dag_run_states": {"failed": 1, "queued": 0, "running": 0, 
"success": 0},
-                    "dag_run_types": {"backfill": 0, "asset_triggered": 1, 
"manual": 0, "scheduled": 0},
                     "task_instance_states": {
                         "deferred": 0,
                         "failed": 2,
@@ -282,13 +282,13 @@ class TestHistoricalMetricsDataEndpoint:
                         "up_for_retry": 0,
                         "upstream_failed": 0,
                     },
+                    "state_count_limit": 1000,
                 },
             ),
             (
                 {"start_date": "2023-02-02T00:00"},
                 {
                     "dag_run_states": {"failed": 1, "queued": 1, "running": 1, 
"success": 0},
-                    "dag_run_types": {"backfill": 0, "asset_triggered": 1, 
"manual": 0, "scheduled": 2},
                     "task_instance_states": {
                         "deferred": 0,
                         "failed": 2,
@@ -304,6 +304,7 @@ class TestHistoricalMetricsDataEndpoint:
                         "up_for_retry": 0,
                         "upstream_failed": 0,
                     },
+                    "state_count_limit": 1000,
                 },
             ),
         ],
@@ -315,6 +316,30 @@ class TestHistoricalMetricsDataEndpoint:
         assert response.status_code == 200
         assert response.json() == expected
 
+    @pytest.mark.usefixtures("freeze_time_for_dagruns", "make_dag_runs")
+    def test_state_counts_are_capped(self, test_client):
+        """State counts are capped at STATE_COUNT_CAP; fixture creates 4 dag 
runs and 8 TIs."""
+        with 
mock.patch("airflow.api_fastapi.core_api.routes.ui.dashboard.STATE_COUNT_CAP", 
1):
+            response = test_client.get(
+                "/dashboard/historical_metrics_data",
+                params={"start_date": "2023-01-01T00:00", "end_date": 
"2023-08-02T00:00"},
+            )
+        assert response.status_code == 200
+        data = response.json()
+
+        assert data["state_count_limit"] == 1
+
+        dr_states = data["dag_run_states"]
+        assert dr_states["success"] == 1
+        assert dr_states["failed"] == 1
+        assert dr_states["running"] == 1
+        assert dr_states["queued"] == 1
+
+        ti_states = data["task_instance_states"]
+        assert ti_states["success"] == 1
+        assert ti_states["failed"] == 1
+        assert ti_states["no_status"] == 1
+
     def test_should_response_401(self, unauthenticated_test_client):
         response = unauthenticated_test_client.get(
             "/dashboard/historical_metrics_data", params={"start_date": 
"2023-02-02T00:00"}

Reply via email to