This is an automated email from the ASF dual-hosted git repository.
pierrejeambrun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new ae2f8147abd Improve historical metrics endpoint performance (#63526)
ae2f8147abd is described below
commit ae2f8147abd2a50909daf3da6c1e9894ab963a92
Author: Pierre Jeambrun <[email protected]>
AuthorDate: Mon Mar 16 16:03:00 2026 +0100
Improve historical metrics endpoint performance (#63526)
* Optimize historical_metrics_data endpoint by capping every per-state count at STATE_COUNT_CAP (1000) and dropping the dag_run_types breakdown from the response
* Return state_count_limit in historical_metrics response
The UI now reads the cap value from the API response instead of
hardcoding it, keeping frontend and backend in sync.
* Hide percentage and fill bars when any state count is capped
When any state in a section reaches the cap, percentages become
unreliable. Fill every bar to full width and hide the percent
label instead of showing misleading numbers.
* Revert "Hide percentage and fill bars when any state count is capped"
This reverts commit facf46b60e2343d65fb5b457d7d8a7147925f59e.
* Hide percentage and fill bar only for individually capped states
When a specific state's count reaches the cap, its bar fills to
full width and its percentage is hidden. Non-capped states keep
their normal proportional bars and percentages.
---
.../core_api/datamodels/ui/dashboard.py | 11 +-
.../api_fastapi/core_api/openapi/_private_ui.yaml | 29 +-----
.../api_fastapi/core_api/routes/ui/dashboard.py | 112 +++++++++++----------
.../airflow/ui/openapi-gen/requests/schemas.gen.ts | 34 +------
.../airflow/ui/openapi-gen/requests/types.gen.ts | 12 +--
.../Dashboard/HistoricalMetrics/DagRunMetrics.tsx | 21 ++--
.../HistoricalMetrics/HistoricalMetrics.tsx | 16 ++-
.../Dashboard/HistoricalMetrics/MetricSection.tsx | 22 ++--
.../HistoricalMetrics/TaskInstanceMetrics.tsx | 21 ++--
.../core_api/routes/ui/test_dashboard.py | 31 +++++-
10 files changed, 131 insertions(+), 178 deletions(-)
diff --git
a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/ui/dashboard.py
b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/ui/dashboard.py
index bf2afa9fcd3..4f48ea48351 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/ui/dashboard.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/ui/dashboard.py
@@ -19,15 +19,6 @@ from __future__ import annotations
from airflow.api_fastapi.core_api.base import BaseModel
-class DAGRunTypes(BaseModel):
- """DAG Run Types for responses."""
-
- backfill: int
- scheduled: int
- manual: int
- asset_triggered: int
-
-
class DAGRunStates(BaseModel):
"""DAG Run States for responses."""
@@ -58,9 +49,9 @@ class TaskInstanceStateCount(BaseModel):
class HistoricalMetricDataResponse(BaseModel):
"""Historical Metric Data serializer for responses."""
- dag_run_types: DAGRunTypes
dag_run_states: DAGRunStates
task_instance_states: TaskInstanceStateCount
+ state_count_limit: int
class DashboardDagStatsResponse(BaseModel):
diff --git
a/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml
b/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml
index 6c38b7ce2d6..96a383b7299 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml
+++ b/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml
@@ -1774,28 +1774,6 @@ components:
- failed
title: DAGRunStates
description: DAG Run States for responses.
- DAGRunTypes:
- properties:
- backfill:
- type: integer
- title: Backfill
- scheduled:
- type: integer
- title: Scheduled
- manual:
- type: integer
- title: Manual
- asset_triggered:
- type: integer
- title: Asset Triggered
- type: object
- required:
- - backfill
- - scheduled
- - manual
- - asset_triggered
- title: DAGRunTypes
- description: DAG Run Types for responses.
DAGWithLatestDagRunsCollectionResponse:
properties:
total_entries:
@@ -2523,17 +2501,18 @@ components:
title: HTTPValidationError
HistoricalMetricDataResponse:
properties:
- dag_run_types:
- $ref: '#/components/schemas/DAGRunTypes'
dag_run_states:
$ref: '#/components/schemas/DAGRunStates'
task_instance_states:
$ref: '#/components/schemas/TaskInstanceStateCount'
+ state_count_limit:
+ type: integer
+ title: State Count Limit
type: object
required:
- - dag_run_types
- dag_run_states
- task_instance_states
+ - state_count_limit
title: HistoricalMetricDataResponse
description: Historical Metric Data serializer for responses.
JobResponse:
diff --git
a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/dashboard.py
b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/dashboard.py
index ba8697fcd18..1ce32badd4a 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/dashboard.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/dashboard.py
@@ -19,7 +19,7 @@ from __future__ import annotations
from typing import cast
from fastapi import Depends, status
-from sqlalchemy import func, select
+from sqlalchemy import func, literal, select, union_all
from sqlalchemy.sql.expression import case, false
from airflow._shared.timezones import timezone
@@ -34,12 +34,16 @@ from airflow.api_fastapi.core_api.datamodels.ui.dashboard
import (
from airflow.api_fastapi.core_api.openapi.exceptions import
create_openapi_http_exception_doc
from airflow.api_fastapi.core_api.security import ReadableDagsFilterDep,
requires_access_dag
from airflow.models.dag import DagModel
-from airflow.models.dagrun import DagRun, DagRunType
+from airflow.models.dagrun import DagRun
from airflow.models.taskinstance import TaskInstance
from airflow.utils.state import DagRunState, TaskInstanceState
dashboard_router = AirflowRouter(tags=["Dashboard"], prefix="/dashboard")
+# Cap for state counts — avoids counting millions of rows.
+# The UI shows "N+" when the returned count equals this value.
+STATE_COUNT_CAP = 1000
+
@dashboard_router.get(
"/historical_metrics_data",
@@ -58,57 +62,59 @@ def historical_metrics(
"""Return cluster activity historical metrics."""
current_time = timezone.utcnow()
permitted_dag_ids = cast("set[str]", readable_dags_filter.value)
- # DagRuns
- dag_run_types = session.execute(
- select(DagRun.run_type, func.count(DagRun.run_id))
- .where(
- func.coalesce(DagRun.start_date, current_time) >= start_date,
- func.coalesce(DagRun.end_date, current_time) <=
func.coalesce(end_date, current_time),
- )
- .where(DagRun.dag_id.in_(permitted_dag_ids))
- .group_by(DagRun.run_type)
- ).all()
-
- dag_run_states = session.execute(
- select(DagRun.state, func.count(DagRun.run_id))
- .where(
- func.coalesce(DagRun.start_date, current_time) >= start_date,
- func.coalesce(DagRun.end_date, current_time) <=
func.coalesce(end_date, current_time),
- )
- .where(DagRun.dag_id.in_(permitted_dag_ids))
- .group_by(DagRun.state)
- ).all()
-
- # TaskInstances
- task_instance_states = session.execute(
- select(TaskInstance.state, func.count(TaskInstance.run_id))
- .join(TaskInstance.dag_run)
- .where(
- func.coalesce(DagRun.start_date, current_time) >= start_date,
- func.coalesce(DagRun.end_date, current_time) <=
func.coalesce(end_date, current_time),
- )
- .where(DagRun.dag_id.in_(permitted_dag_ids))
- .group_by(TaskInstance.state)
- ).all()
-
- # Combining historical metrics response as dictionary
- historical_metrics_response = {
- "dag_run_types": {
- **{dag_run_type.value: 0 for dag_run_type in DagRunType},
- **{row.run_type: row.count for row in dag_run_types},
- },
- "dag_run_states": {
- **{dag_run_state.value: 0 for dag_run_state in DagRunState},
- **{row.state: row.count for row in dag_run_states},
- },
- "task_instance_states": {
- "no_status": 0,
- **{ti_state.value: 0 for ti_state in TaskInstanceState},
- **{ti_state or "no_status": sum_value for ti_state, sum_value in
task_instance_states},
- },
- }
-
- return
HistoricalMetricDataResponse.model_validate(historical_metrics_response)
+
+ dag_run_filters = [
+ func.coalesce(DagRun.start_date, current_time) >= start_date,
+ func.coalesce(DagRun.end_date, current_time) <=
func.coalesce(end_date, current_time),
+ DagRun.dag_id.in_(permitted_dag_ids),
+ ]
+
+ # Build one LIMIT-capped subquery per state, then UNION ALL them into a
+ # single query. Every state gets the same treatment: at most
STATE_COUNT_CAP
+ # rows are read from the index, so even states with millions of rows
+ # (typically "success") are counted in single-digit milliseconds.
+ # Each branch is wrapped in a subquery so LIMIT works on all backends
+ # (SQLite rejects LIMIT inside bare UNION ALL arms).
+ def _capped_state_counts(model, states, label_fn, join=None):
+ branches = []
+ for state in states:
+ stmt =
select(literal(label_fn(state)).label("state")).select_from(model)
+ if join is not None:
+ stmt = stmt.join(join)
+ branch = (
+ stmt.where(*dag_run_filters)
+ .where(model.state == state if state else
model.state.is_(None))
+ .limit(STATE_COUNT_CAP)
+ .subquery()
+ )
+ branches.append(select(branch.c.state))
+ capped = union_all(*branches).subquery()
+ return session.execute(
+ select(capped.c.state,
func.count().label("cnt")).group_by(capped.c.state)
+ ).all()
+
+ dag_run_state_counts = _capped_state_counts(DagRun, list(DagRunState),
lambda s: s.value)
+ ti_state_counts = _capped_state_counts(
+ TaskInstance,
+ [None, *TaskInstanceState],
+ lambda s: s.value if s else "no_status",
+ join=TaskInstance.dag_run,
+ )
+
+ return HistoricalMetricDataResponse.model_validate(
+ {
+ "dag_run_states": {
+ **{dag_run_state.value: 0 for dag_run_state in DagRunState},
+ **{row.state: row.cnt for row in dag_run_state_counts},
+ },
+ "task_instance_states": {
+ "no_status": 0,
+ **{ti_state.value: 0 for ti_state in TaskInstanceState},
+ **{row.state: row.cnt for row in ti_state_counts},
+ },
+ "state_count_limit": STATE_COUNT_CAP,
+ }
+ )
@dashboard_router.get(
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
index 6c540964655..1be05b11908 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
@@ -7519,31 +7519,6 @@ export const $DAGRunStates = {
description: 'DAG Run States for responses.'
} as const;
-export const $DAGRunTypes = {
- properties: {
- backfill: {
- type: 'integer',
- title: 'Backfill'
- },
- scheduled: {
- type: 'integer',
- title: 'Scheduled'
- },
- manual: {
- type: 'integer',
- title: 'Manual'
- },
- asset_triggered: {
- type: 'integer',
- title: 'Asset Triggered'
- }
- },
- type: 'object',
- required: ['backfill', 'scheduled', 'manual', 'asset_triggered'],
- title: 'DAGRunTypes',
- description: 'DAG Run Types for responses.'
-} as const;
-
export const $DAGWithLatestDagRunsCollectionResponse = {
properties: {
total_entries: {
@@ -8287,18 +8262,19 @@ export const $GridTISummaries = {
export const $HistoricalMetricDataResponse = {
properties: {
- dag_run_types: {
- '$ref': '#/components/schemas/DAGRunTypes'
- },
dag_run_states: {
'$ref': '#/components/schemas/DAGRunStates'
},
task_instance_states: {
'$ref': '#/components/schemas/TaskInstanceStateCount'
+ },
+ state_count_limit: {
+ type: 'integer',
+ title: 'State Count Limit'
}
},
type: 'object',
- required: ['dag_run_types', 'dag_run_states', 'task_instance_states'],
+ required: ['dag_run_states', 'task_instance_states', 'state_count_limit'],
title: 'HistoricalMetricDataResponse',
description: 'Historical Metric Data serializer for responses.'
} as const;
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
index 038c02ee2cd..e6c208c6393 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
@@ -1859,16 +1859,6 @@ export type DAGRunStates = {
failed: number;
};
-/**
- * DAG Run Types for responses.
- */
-export type DAGRunTypes = {
- backfill: number;
- scheduled: number;
- manual: number;
- asset_triggered: number;
-};
-
/**
* DAG with latest dag runs collection response serializer.
*/
@@ -2046,9 +2036,9 @@ export type GridTISummaries = {
* Historical Metric Data serializer for responses.
*/
export type HistoricalMetricDataResponse = {
- dag_run_types: DAGRunTypes;
dag_run_states: DAGRunStates;
task_instance_states: TaskInstanceStateCount;
+ state_count_limit: number;
};
/**
diff --git
a/airflow-core/src/airflow/ui/src/pages/Dashboard/HistoricalMetrics/DagRunMetrics.tsx
b/airflow-core/src/airflow/ui/src/pages/Dashboard/HistoricalMetrics/DagRunMetrics.tsx
index 8eb824b60f6..a6578c0f383 100644
---
a/airflow-core/src/airflow/ui/src/pages/Dashboard/HistoricalMetrics/DagRunMetrics.tsx
+++
b/airflow-core/src/airflow/ui/src/pages/Dashboard/HistoricalMetrics/DagRunMetrics.tsx
@@ -20,10 +20,6 @@ import { Box, Separator, Heading, HStack, Stack } from
"@chakra-ui/react";
import type { DAGRunStates } from "openapi-gen/requests/types.gen";
import { useTranslation } from "react-i18next";
import { FiBarChart } from "react-icons/fi";
-import { Link as RouterLink } from "react-router-dom";
-
-import { StateBadge } from "src/components/StateBadge";
-import { SearchParamsKeys } from "src/constants/searchParams";
import { MetricSection } from "./MetricSection";
@@ -31,31 +27,26 @@ type DagRunMetricsProps = {
readonly dagRunStates: DAGRunStates;
readonly endDate?: string;
readonly startDate: string;
- readonly total: number;
+ readonly stateCountLimit: number;
};
const DAGRUN_STATES: Array<keyof DAGRunStates> = ["queued", "running",
"success", "failed"];
-export const DagRunMetrics = ({ dagRunStates, endDate, startDate, total }:
DagRunMetricsProps) => {
+export const DagRunMetrics = ({ dagRunStates, endDate, startDate,
stateCountLimit }: DagRunMetricsProps) => {
const { t: translate } = useTranslation();
+ const total = Object.values(dagRunStates).reduce((sum, count) => sum +
count, 0);
return (
<Box borderRadius={5} borderWidth={1} p={4}>
<HStack>
- <RouterLink
- to={`/dag_runs?${SearchParamsKeys.START_DATE}=${startDate}${endDate
=== undefined ? "" : `&${SearchParamsKeys.END_DATE}=${endDate}`}`}
- >
- <StateBadge colorPalette="brand" fontSize="md" variant="solid">
- <FiBarChart />
- {total}
- </StateBadge>
- </RouterLink>
- <Heading size="md">{translate("dagRun", { count: total })}</Heading>
+ <FiBarChart />
+ <Heading size="md">{translate("dagRun", { count: 2 })}</Heading>
</HStack>
<Separator my={3} />
<Stack gap={4}>
{DAGRUN_STATES.map((state) => (
<MetricSection
+ capped={dagRunStates[state] >= stateCountLimit}
endDate={endDate}
key={state}
kind="dag_runs"
diff --git
a/airflow-core/src/airflow/ui/src/pages/Dashboard/HistoricalMetrics/HistoricalMetrics.tsx
b/airflow-core/src/airflow/ui/src/pages/Dashboard/HistoricalMetrics/HistoricalMetrics.tsx
index bd60f278edc..6609bc89da2 100644
---
a/airflow-core/src/airflow/ui/src/pages/Dashboard/HistoricalMetrics/HistoricalMetrics.tsx
+++
b/airflow-core/src/airflow/ui/src/pages/Dashboard/HistoricalMetrics/HistoricalMetrics.tsx
@@ -53,14 +53,6 @@ export const HistoricalMetrics = () => {
},
);
- const dagRunTotal = data
- ? Object.values(data.dag_run_states).reduce((partialSum, value) =>
partialSum + value, 0)
- : 0;
-
- const taskRunTotal = data
- ? Object.values(data.task_instance_states).reduce((partialSum, value) =>
partialSum + value, 0)
- : 0;
-
const { data: assetEventsData, isLoading: isLoadingAssetEvents } =
useAssetServiceGetAssetEvents({
limit: 6,
orderBy: [assetSortBy],
@@ -90,11 +82,15 @@ export const HistoricalMetrics = () => {
{isLoading ? <MetricSectionSkeleton /> : undefined}
{!isLoading && data !== undefined && (
<Box>
- <DagRunMetrics dagRunStates={data.dag_run_states}
startDate={startDate} total={dagRunTotal} />
+ <DagRunMetrics
+ dagRunStates={data.dag_run_states}
+ startDate={startDate}
+ stateCountLimit={data.state_count_limit}
+ />
<TaskInstanceMetrics
startDate={startDate}
+ stateCountLimit={data.state_count_limit}
taskInstanceStates={data.task_instance_states}
- total={taskRunTotal}
/>
</Box>
)}
diff --git
a/airflow-core/src/airflow/ui/src/pages/Dashboard/HistoricalMetrics/MetricSection.tsx
b/airflow-core/src/airflow/ui/src/pages/Dashboard/HistoricalMetrics/MetricSection.tsx
index 611884880a4..714c322ca23 100644
---
a/airflow-core/src/airflow/ui/src/pages/Dashboard/HistoricalMetrics/MetricSection.tsx
+++
b/airflow-core/src/airflow/ui/src/pages/Dashboard/HistoricalMetrics/MetricSection.tsx
@@ -28,6 +28,7 @@ const BAR_WIDTH = 100;
const BAR_HEIGHT = 5;
type MetricSectionProps = {
+ readonly capped?: boolean;
readonly endDate?: string;
readonly kind: string;
readonly runs: number;
@@ -36,12 +37,18 @@ type MetricSectionProps = {
readonly total: number;
};
-export const MetricSection = ({ endDate, kind, runs, startDate, state, total
}: MetricSectionProps) => {
- // Calculate the given state as a percentage of total and draw a bar
- // in state's color with width as state's percentage and remaining width
filed as gray
- const statePercent = total === 0 ? 0 : ((runs / total) * 100).toFixed(2);
- const stateWidth = total === 0 ? 0 : (runs / total) * BAR_WIDTH;
+export const MetricSection = ({
+ capped = false,
+ endDate,
+ kind,
+ runs,
+ startDate,
+ state,
+ total,
+}: MetricSectionProps) => {
+ const stateWidth = capped ? BAR_WIDTH : total === 0 ? 0 : (runs / total) *
BAR_WIDTH;
const remainingWidth = BAR_WIDTH - stateWidth;
+ const statePercent = capped ? undefined : total === 0 ? 0 : ((runs / total)
* 100).toFixed(2);
const stateParam = kind === "task_instances" ? SearchParamsKeys.TASK_STATE :
SearchParamsKeys.STATE;
const searchParams = new URLSearchParams(
@@ -60,12 +67,13 @@ export const MetricSection = ({ endDate, kind, runs,
startDate, state, total }:
<RouterLink to={`/${kind}?${searchParams.toString()}`}>
{/* eslint-disable-next-line unicorn/no-null */}
<StateBadge fontSize="md" state={state === "no_status" ? null :
state}>
- {runs}
+ {}
+ {capped ? `${runs}+` : runs}
</StateBadge>
</RouterLink>
<Text>{translate(`states.${state}`)}</Text>
</HStack>
- <Text color="fg.muted"> {statePercent}% </Text>
+ {statePercent === undefined ? undefined : <Text color="fg.muted">
{statePercent}% </Text>}
</Flex>
<HStack gap={0} mt={2}>
<Box
diff --git
a/airflow-core/src/airflow/ui/src/pages/Dashboard/HistoricalMetrics/TaskInstanceMetrics.tsx
b/airflow-core/src/airflow/ui/src/pages/Dashboard/HistoricalMetrics/TaskInstanceMetrics.tsx
index 78d8f748ccd..f36c0a4635b 100644
---
a/airflow-core/src/airflow/ui/src/pages/Dashboard/HistoricalMetrics/TaskInstanceMetrics.tsx
+++
b/airflow-core/src/airflow/ui/src/pages/Dashboard/HistoricalMetrics/TaskInstanceMetrics.tsx
@@ -20,18 +20,14 @@ import { Box, Separator, Heading, HStack, Stack } from
"@chakra-ui/react";
import type { TaskInstanceState, TaskInstanceStateCount } from
"openapi-gen/requests/types.gen";
import { useTranslation } from "react-i18next";
import { MdOutlineTask } from "react-icons/md";
-import { Link as RouterLink } from "react-router-dom";
-
-import { StateBadge } from "src/components/StateBadge";
-import { SearchParamsKeys } from "src/constants/searchParams";
import { MetricSection } from "./MetricSection";
type TaskInstanceMetricsProps = {
readonly endDate?: string;
readonly startDate: string;
+ readonly stateCountLimit: number;
readonly taskInstanceStates: TaskInstanceStateCount;
- readonly total: number;
};
const TASK_STATES: Array<keyof TaskInstanceStateCount> = [
@@ -53,23 +49,17 @@ const TASK_STATES: Array<keyof TaskInstanceStateCount> = [
export const TaskInstanceMetrics = ({
endDate,
startDate,
+ stateCountLimit,
taskInstanceStates,
- total,
}: TaskInstanceMetricsProps) => {
const { t: translate } = useTranslation();
+ const total = Object.values(taskInstanceStates).reduce((sum, count) => sum +
count, 0);
return (
<Box borderRadius={5} borderWidth={1} mt={2} p={4}>
<HStack>
- <RouterLink
-
to={`/task_instances?${SearchParamsKeys.START_DATE}=${startDate}${endDate ===
undefined ? "" : `&${SearchParamsKeys.END_DATE}=${endDate}`}`}
- >
- <StateBadge colorPalette="brand" fontSize="md" variant="solid">
- <MdOutlineTask />
- {total}
- </StateBadge>
- </RouterLink>
- <Heading size="md">{translate("taskInstance", { count: total
})}</Heading>
+ <MdOutlineTask />
+ <Heading size="md">{translate("taskInstance", { count: 2 })}</Heading>
</HStack>
<Separator my={3} />
<Stack gap={4}>
@@ -78,6 +68,7 @@ export const TaskInstanceMetrics = ({
).map((state) =>
taskInstanceStates[state] > 0 ? (
<MetricSection
+ capped={taskInstanceStates[state] >= stateCountLimit}
endDate={endDate}
key={state}
kind="task_instances"
diff --git
a/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_dashboard.py
b/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_dashboard.py
index 291a02e68af..ccfc2b26f78 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_dashboard.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_dashboard.py
@@ -18,6 +18,7 @@
from __future__ import annotations
from datetime import timedelta
+from unittest import mock
import pendulum
import pytest
@@ -244,7 +245,6 @@ class TestHistoricalMetricsDataEndpoint:
{"start_date": "2023-01-01T00:00", "end_date":
"2023-08-02T00:00"},
{
"dag_run_states": {"failed": 1, "queued": 1, "running": 1,
"success": 1},
- "dag_run_types": {"backfill": 0, "asset_triggered": 1,
"manual": 0, "scheduled": 3},
"task_instance_states": {
"deferred": 0,
"failed": 2,
@@ -260,13 +260,13 @@ class TestHistoricalMetricsDataEndpoint:
"up_for_retry": 0,
"upstream_failed": 0,
},
+ "state_count_limit": 1000,
},
),
(
{"start_date": "2023-02-02T00:00", "end_date":
"2023-06-02T00:00"},
{
"dag_run_states": {"failed": 1, "queued": 0, "running": 0,
"success": 0},
- "dag_run_types": {"backfill": 0, "asset_triggered": 1,
"manual": 0, "scheduled": 0},
"task_instance_states": {
"deferred": 0,
"failed": 2,
@@ -282,13 +282,13 @@ class TestHistoricalMetricsDataEndpoint:
"up_for_retry": 0,
"upstream_failed": 0,
},
+ "state_count_limit": 1000,
},
),
(
{"start_date": "2023-02-02T00:00"},
{
"dag_run_states": {"failed": 1, "queued": 1, "running": 1,
"success": 0},
- "dag_run_types": {"backfill": 0, "asset_triggered": 1,
"manual": 0, "scheduled": 2},
"task_instance_states": {
"deferred": 0,
"failed": 2,
@@ -304,6 +304,7 @@ class TestHistoricalMetricsDataEndpoint:
"up_for_retry": 0,
"upstream_failed": 0,
},
+ "state_count_limit": 1000,
},
),
],
@@ -315,6 +316,30 @@ class TestHistoricalMetricsDataEndpoint:
assert response.status_code == 200
assert response.json() == expected
+ @pytest.mark.usefixtures("freeze_time_for_dagruns", "make_dag_runs")
+ def test_state_counts_are_capped(self, test_client):
+ """State counts are capped at STATE_COUNT_CAP; fixture creates 4 dag
runs and 8 TIs."""
+ with
mock.patch("airflow.api_fastapi.core_api.routes.ui.dashboard.STATE_COUNT_CAP",
1):
+ response = test_client.get(
+ "/dashboard/historical_metrics_data",
+ params={"start_date": "2023-01-01T00:00", "end_date":
"2023-08-02T00:00"},
+ )
+ assert response.status_code == 200
+ data = response.json()
+
+ assert data["state_count_limit"] == 1
+
+ dr_states = data["dag_run_states"]
+ assert dr_states["success"] == 1
+ assert dr_states["failed"] == 1
+ assert dr_states["running"] == 1
+ assert dr_states["queued"] == 1
+
+ ti_states = data["task_instance_states"]
+ assert ti_states["success"] == 1
+ assert ti_states["failed"] == 1
+ assert ti_states["no_status"] == 1
+
def test_should_response_401(self, unauthenticated_test_client):
response = unauthenticated_test_client.get(
"/dashboard/historical_metrics_data", params={"start_date":
"2023-02-02T00:00"}