This is an automated email from the ASF dual-hosted git repository.
vatsrahul1001 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 31e1d87e129 Split e2e test-helpers.ts into domain-scoped modules
(#66387)
31e1d87e129 is described below
commit 31e1d87e12906d271f630136d93fbdacc47e7ef6
Author: Yeonguk Choo <[email protected]>
AuthorDate: Sat Jun 13 12:53:08 2026 +0900
Split e2e test-helpers.ts into domain-scoped modules (#66387)
---
.../airflow/ui/tests/e2e/fixtures/asset-data.ts | 2 +-
.../ui/tests/e2e/fixtures/audit-log-data.ts | 2 +-
.../airflow/ui/tests/e2e/fixtures/calendar-data.ts | 4 +-
.../airflow/ui/tests/e2e/fixtures/dag-runs-data.ts | 4 +-
.../ui/tests/e2e/fixtures/dashboard-data.ts | 2 +-
.../src/airflow/ui/tests/e2e/fixtures/data.ts | 4 +-
.../ui/tests/e2e/fixtures/task-instances-data.ts | 8 +-
.../src/airflow/ui/tests/e2e/fixtures/xcom-data.ts | 4 +-
.../src/airflow/ui/tests/e2e/global-setup.ts | 2 +-
.../src/airflow/ui/tests/e2e/pages/BackfillPage.ts | 2 +-
.../airflow/ui/tests/e2e/pages/ConnectionsPage.ts | 2 +-
.../ui/tests/e2e/pages/RequiredActionsPage.ts | 2 +-
.../airflow/ui/tests/e2e/specs/backfill.spec.ts | 4 +-
.../airflow/ui/tests/e2e/specs/connections.spec.ts | 2 +-
.../airflow/ui/tests/e2e/specs/dags-list.spec.ts | 2 +-
.../src/airflow/ui/tests/e2e/specs/pools.spec.ts | 2 +-
.../ui/tests/e2e/specs/requiredAction.spec.ts | 3 +-
.../airflow/ui/tests/e2e/specs/variable.spec.ts | 3 +-
.../airflow/ui/tests/e2e/utils/api/backfills.ts | 188 +++++
.../src/airflow/ui/tests/e2e/utils/api/dag-runs.ts | 345 +++++++++
.../src/airflow/ui/tests/e2e/utils/api/hitl.ts | 194 +++++
.../airflow/ui/tests/e2e/utils/api/variables.ts | 66 ++
.../dashboard-data.ts => utils/shared.ts} | 32 +-
.../src/airflow/ui/tests/e2e/utils/test-helpers.ts | 802 ---------------------
.../src/airflow/ui/tests/e2e/utils/ui/waits.ts | 64 ++
25 files changed, 899 insertions(+), 846 deletions(-)
diff --git a/airflow-core/src/airflow/ui/tests/e2e/fixtures/asset-data.ts
b/airflow-core/src/airflow/ui/tests/e2e/fixtures/asset-data.ts
index c373f20494d..4821548c747 100644
--- a/airflow-core/src/airflow/ui/tests/e2e/fixtures/asset-data.ts
+++ b/airflow-core/src/airflow/ui/tests/e2e/fixtures/asset-data.ts
@@ -26,7 +26,7 @@ import {
safeCleanupDagRun,
waitForDagReady,
waitForDagRunStatus,
-} from "tests/e2e/utils/test-helpers";
+} from "tests/e2e/utils/api/dag-runs";
export type AssetData = {
dagId: string;
diff --git a/airflow-core/src/airflow/ui/tests/e2e/fixtures/audit-log-data.ts
b/airflow-core/src/airflow/ui/tests/e2e/fixtures/audit-log-data.ts
index a749ac04ae9..14092e58bf4 100644
--- a/airflow-core/src/airflow/ui/tests/e2e/fixtures/audit-log-data.ts
+++ b/airflow-core/src/airflow/ui/tests/e2e/fixtures/audit-log-data.ts
@@ -27,7 +27,7 @@ import {
safeCleanupDagRun,
waitForDagReady,
waitForDagRunStatus,
-} from "tests/e2e/utils/test-helpers";
+} from "tests/e2e/utils/api/dag-runs";
export type AuditLogData = {
dagId: string;
diff --git a/airflow-core/src/airflow/ui/tests/e2e/fixtures/calendar-data.ts
b/airflow-core/src/airflow/ui/tests/e2e/fixtures/calendar-data.ts
index b024c958bdf..96fdc228cd5 100644
--- a/airflow-core/src/airflow/ui/tests/e2e/fixtures/calendar-data.ts
+++ b/airflow-core/src/airflow/ui/tests/e2e/fixtures/calendar-data.ts
@@ -27,9 +27,9 @@ import {
apiCreateDagRun,
apiSetDagRunState,
safeCleanupDagRun,
- uniqueRunId,
waitForDagReady,
-} from "tests/e2e/utils/test-helpers";
+} from "tests/e2e/utils/api/dag-runs";
+import { uniqueRunId } from "tests/e2e/utils/shared";
export type CalendarRunsData = {
dagId: string;
diff --git a/airflow-core/src/airflow/ui/tests/e2e/fixtures/dag-runs-data.ts
b/airflow-core/src/airflow/ui/tests/e2e/fixtures/dag-runs-data.ts
index 1723b9f809d..d5890e67b2d 100644
--- a/airflow-core/src/airflow/ui/tests/e2e/fixtures/dag-runs-data.ts
+++ b/airflow-core/src/airflow/ui/tests/e2e/fixtures/dag-runs-data.ts
@@ -26,9 +26,9 @@ import {
apiCreateDagRun,
apiSetDagRunState,
safeCleanupDagRun,
- uniqueRunId,
waitForDagReady,
-} from "tests/e2e/utils/test-helpers";
+} from "tests/e2e/utils/api/dag-runs";
+import { uniqueRunId } from "tests/e2e/utils/shared";
export type DagRunsPageData = {
dag1Id: string;
diff --git a/airflow-core/src/airflow/ui/tests/e2e/fixtures/dashboard-data.ts
b/airflow-core/src/airflow/ui/tests/e2e/fixtures/dashboard-data.ts
index a574fd0b3b8..3eccceef99f 100644
--- a/airflow-core/src/airflow/ui/tests/e2e/fixtures/dashboard-data.ts
+++ b/airflow-core/src/airflow/ui/tests/e2e/fixtures/dashboard-data.ts
@@ -22,7 +22,7 @@
*/
import { testConfig } from "playwright.config";
import { test as base } from "tests/e2e/fixtures";
-import { safeCleanupDagRun } from "tests/e2e/utils/test-helpers";
+import { safeCleanupDagRun } from "tests/e2e/utils/api/dag-runs";
export type DagRunCleanup = {
track: (runId: string) => void;
diff --git a/airflow-core/src/airflow/ui/tests/e2e/fixtures/data.ts
b/airflow-core/src/airflow/ui/tests/e2e/fixtures/data.ts
index 8bd949fbf98..bc0352916d2 100644
--- a/airflow-core/src/airflow/ui/tests/e2e/fixtures/data.ts
+++ b/airflow-core/src/airflow/ui/tests/e2e/fixtures/data.ts
@@ -34,10 +34,10 @@ import {
apiSetDagRunState,
apiTriggerDagRun,
safeCleanupDagRun,
- uniqueRunId,
waitForDagReady,
waitForDagRunStatus,
-} from "../utils/test-helpers";
+} from "../utils/api/dag-runs";
+import { uniqueRunId } from "../utils/shared";
import { test as base } from "./pom";
/** Shape returned by single Dag run fixtures. */
diff --git
a/airflow-core/src/airflow/ui/tests/e2e/fixtures/task-instances-data.ts
b/airflow-core/src/airflow/ui/tests/e2e/fixtures/task-instances-data.ts
index b212b2c8884..aac083c8566 100644
--- a/airflow-core/src/airflow/ui/tests/e2e/fixtures/task-instances-data.ts
+++ b/airflow-core/src/airflow/ui/tests/e2e/fixtures/task-instances-data.ts
@@ -23,12 +23,8 @@
import { expect, type APIRequestContext } from "@playwright/test";
import { testConfig } from "playwright.config";
import { test as base } from "tests/e2e/fixtures";
-import {
- apiCreateDagRun,
- safeCleanupDagRun,
- uniqueRunId,
- waitForDagReady,
-} from "tests/e2e/utils/test-helpers";
+import { apiCreateDagRun, safeCleanupDagRun, waitForDagReady } from
"tests/e2e/utils/api/dag-runs";
+import { uniqueRunId } from "tests/e2e/utils/shared";
export type TaskInstancesData = {
dagId: string;
diff --git a/airflow-core/src/airflow/ui/tests/e2e/fixtures/xcom-data.ts
b/airflow-core/src/airflow/ui/tests/e2e/fixtures/xcom-data.ts
index 48edcfadf0e..43745452846 100644
--- a/airflow-core/src/airflow/ui/tests/e2e/fixtures/xcom-data.ts
+++ b/airflow-core/src/airflow/ui/tests/e2e/fixtures/xcom-data.ts
@@ -25,10 +25,10 @@ import { test as base } from "tests/e2e/fixtures";
import {
apiCreateDagRun,
safeCleanupDagRun,
- uniqueRunId,
waitForDagReady,
waitForDagRunStatus,
-} from "tests/e2e/utils/test-helpers";
+} from "tests/e2e/utils/api/dag-runs";
+import { uniqueRunId } from "tests/e2e/utils/shared";
export type XcomRunsData = {
dagId: string;
diff --git a/airflow-core/src/airflow/ui/tests/e2e/global-setup.ts
b/airflow-core/src/airflow/ui/tests/e2e/global-setup.ts
index bab5353a65b..77313e49102 100644
--- a/airflow-core/src/airflow/ui/tests/e2e/global-setup.ts
+++ b/airflow-core/src/airflow/ui/tests/e2e/global-setup.ts
@@ -22,7 +22,7 @@ import path from "node:path";
import { AUTH_FILE, testConfig } from "../../playwright.config";
import { LoginPage } from "./pages/LoginPage";
-import { waitForDagReady } from "./utils/test-helpers";
+import { waitForDagReady } from "./utils/api/dag-runs";
const browsers = { chromium, firefox, webkit };
diff --git a/airflow-core/src/airflow/ui/tests/e2e/pages/BackfillPage.ts
b/airflow-core/src/airflow/ui/tests/e2e/pages/BackfillPage.ts
index 10eb0ae78bd..c28843ccd8c 100644
--- a/airflow-core/src/airflow/ui/tests/e2e/pages/BackfillPage.ts
+++ b/airflow-core/src/airflow/ui/tests/e2e/pages/BackfillPage.ts
@@ -24,7 +24,7 @@ import {
apiCancelAllActiveBackfills,
apiCancelBackfill,
apiWaitForNoActiveBackfill,
-} from "tests/e2e/utils/test-helpers";
+} from "tests/e2e/utils/api/backfills";
export const REPROCESS_API_TO_UI = {
completed: "All Runs",
diff --git a/airflow-core/src/airflow/ui/tests/e2e/pages/ConnectionsPage.ts
b/airflow-core/src/airflow/ui/tests/e2e/pages/ConnectionsPage.ts
index 96237f995e0..d16a89702ca 100644
--- a/airflow-core/src/airflow/ui/tests/e2e/pages/ConnectionsPage.ts
+++ b/airflow-core/src/airflow/ui/tests/e2e/pages/ConnectionsPage.ts
@@ -18,7 +18,7 @@
*/
import { expect, type Locator, type Page } from "@playwright/test";
import { BasePage } from "tests/e2e/pages/BasePage";
-import { waitForStableRowCount } from "tests/e2e/utils/test-helpers";
+import { waitForStableRowCount } from "tests/e2e/utils/ui/waits";
type ConnectionDetails = {
conn_type: string;
diff --git a/airflow-core/src/airflow/ui/tests/e2e/pages/RequiredActionsPage.ts
b/airflow-core/src/airflow/ui/tests/e2e/pages/RequiredActionsPage.ts
index ee2996f39b9..d524c9f7756 100644
--- a/airflow-core/src/airflow/ui/tests/e2e/pages/RequiredActionsPage.ts
+++ b/airflow-core/src/airflow/ui/tests/e2e/pages/RequiredActionsPage.ts
@@ -18,7 +18,7 @@
*/
import { expect, type APIRequestContext, type Locator, type Page } from
"@playwright/test";
import { testConfig } from "playwright.config";
-import { apiTriggerDagRun, waitForDagReady } from
"tests/e2e/utils/test-helpers";
+import { apiTriggerDagRun, waitForDagReady } from
"tests/e2e/utils/api/dag-runs";
import { BasePage } from "./BasePage";
diff --git a/airflow-core/src/airflow/ui/tests/e2e/specs/backfill.spec.ts
b/airflow-core/src/airflow/ui/tests/e2e/specs/backfill.spec.ts
index d248ac14bbe..4d79d34f74c 100644
--- a/airflow-core/src/airflow/ui/tests/e2e/specs/backfill.spec.ts
+++ b/airflow-core/src/airflow/ui/tests/e2e/specs/backfill.spec.ts
@@ -24,8 +24,8 @@ import {
apiCancelAllActiveBackfills,
apiCreateBackfill,
apiWaitForBackfillComplete,
- waitForDagReady,
-} from "tests/e2e/utils/test-helpers";
+} from "tests/e2e/utils/api/backfills";
+import { waitForDagReady } from "tests/e2e/utils/api/dag-runs";
// Fixed past dates avoid non-determinism from relative date calculations.
// Controls tests use wide, non-overlapping ranges so the scheduler cannot
diff --git a/airflow-core/src/airflow/ui/tests/e2e/specs/connections.spec.ts
b/airflow-core/src/airflow/ui/tests/e2e/specs/connections.spec.ts
index ba9fa1bc1d0..96e58eee059 100644
--- a/airflow-core/src/airflow/ui/tests/e2e/specs/connections.spec.ts
+++ b/airflow-core/src/airflow/ui/tests/e2e/specs/connections.spec.ts
@@ -17,7 +17,7 @@
* under the License.
*/
import { expect, test } from "tests/e2e/fixtures";
-import { uniqueRunId } from "tests/e2e/utils/test-helpers";
+import { uniqueRunId } from "tests/e2e/utils/shared";
test.describe("Connections Page - List and Display", () => {
let seedConnection: { conn_type: string; connection_id: string; host: string
};
diff --git a/airflow-core/src/airflow/ui/tests/e2e/specs/dags-list.spec.ts
b/airflow-core/src/airflow/ui/tests/e2e/specs/dags-list.spec.ts
index 543a687e392..8b91cf44847 100644
--- a/airflow-core/src/airflow/ui/tests/e2e/specs/dags-list.spec.ts
+++ b/airflow-core/src/airflow/ui/tests/e2e/specs/dags-list.spec.ts
@@ -18,7 +18,7 @@
*/
import { testConfig } from "playwright.config";
import { expect, test } from "tests/e2e/fixtures";
-import { apiDeleteDagRun, waitForDagRunStatus } from
"tests/e2e/utils/test-helpers";
+import { apiDeleteDagRun, waitForDagRunStatus } from
"tests/e2e/utils/api/dag-runs";
test.describe("Dag Trigger Workflow", () => {
const testDagId = testConfig.testDag.id;
diff --git a/airflow-core/src/airflow/ui/tests/e2e/specs/pools.spec.ts
b/airflow-core/src/airflow/ui/tests/e2e/specs/pools.spec.ts
index dd97e4dd999..a4d87299994 100644
--- a/airflow-core/src/airflow/ui/tests/e2e/specs/pools.spec.ts
+++ b/airflow-core/src/airflow/ui/tests/e2e/specs/pools.spec.ts
@@ -17,7 +17,7 @@
* under the License.
*/
import { expect, test } from "tests/e2e/fixtures";
-import { uniqueRunId } from "tests/e2e/utils/test-helpers";
+import { uniqueRunId } from "tests/e2e/utils/shared";
test.describe("Pools Page", () => {
test.setTimeout(60_000);
diff --git a/airflow-core/src/airflow/ui/tests/e2e/specs/requiredAction.spec.ts
b/airflow-core/src/airflow/ui/tests/e2e/specs/requiredAction.spec.ts
index 477461003c5..b55dc5ec330 100644
--- a/airflow-core/src/airflow/ui/tests/e2e/specs/requiredAction.spec.ts
+++ b/airflow-core/src/airflow/ui/tests/e2e/specs/requiredAction.spec.ts
@@ -18,7 +18,8 @@
*/
import { testConfig } from "playwright.config";
import { expect, test } from "tests/e2e/fixtures";
-import { apiDeleteDagRun, setupHITLFlowViaAPI } from
"tests/e2e/utils/test-helpers";
+import { apiDeleteDagRun } from "tests/e2e/utils/api/dag-runs";
+import { setupHITLFlowViaAPI } from "tests/e2e/utils/api/hitl";
const hitlDagId = testConfig.testDag.hitlId;
diff --git a/airflow-core/src/airflow/ui/tests/e2e/specs/variable.spec.ts
b/airflow-core/src/airflow/ui/tests/e2e/specs/variable.spec.ts
index cdd79a360ec..14c24229e12 100644
--- a/airflow-core/src/airflow/ui/tests/e2e/specs/variable.spec.ts
+++ b/airflow-core/src/airflow/ui/tests/e2e/specs/variable.spec.ts
@@ -17,7 +17,8 @@
* under the License.
*/
import { expect, test } from "tests/e2e/fixtures";
-import { apiCreateVariable, apiDeleteVariable, uniqueRunId } from
"tests/e2e/utils/test-helpers";
+import { apiCreateVariable, apiDeleteVariable } from
"tests/e2e/utils/api/variables";
+import { uniqueRunId } from "tests/e2e/utils/shared";
test.describe("Variables Page", () => {
test.describe.configure({ mode: "serial" });
diff --git a/airflow-core/src/airflow/ui/tests/e2e/utils/api/backfills.ts
b/airflow-core/src/airflow/ui/tests/e2e/utils/api/backfills.ts
new file mode 100644
index 00000000000..2203dfb90f5
--- /dev/null
+++ b/airflow-core/src/airflow/ui/tests/e2e/utils/api/backfills.ts
@@ -0,0 +1,188 @@
+/*!
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Backfill API helpers — create, cancel, poll completion.
+ */
+import { expect } from "@playwright/test";
+
+import { baseUrl, getRequestContext, type RequestLike } from "../shared";
+
+/** Cancel a single backfill via the API. 409 (already completed) is treated
as success. */
+export async function apiCancelBackfill(source: RequestLike, backfillId:
number): Promise<void> {
+ const request = getRequestContext(source);
+
+ const response = await
request.put(`${baseUrl}/api/v2/backfills/${backfillId}/cancel`, {
+ timeout: 10_000,
+ });
+
+ if (response.status() !== 200 && response.status() !== 409) {
+ throw new Error(`Cancel backfill failed (${response.status()})`);
+ }
+}
+
+/** Cancel all active (non-completed) backfills for a Dag. */
+export async function apiCancelAllActiveBackfills(source: RequestLike, dagId:
string): Promise<void> {
+ const request = getRequestContext(source);
+
+ const response = await
request.get(`${baseUrl}/api/v2/backfills?dag_id=${dagId}&limit=100`, {
+ timeout: 10_000,
+ });
+
+ if (!response.ok()) {
+ throw new Error(`List backfills failed (${response.status()})`);
+ }
+
+ const data = (await response.json()) as { backfills: Array<{ completed_at:
string | null; id: number }> };
+
+ for (const backfill of data.backfills) {
+ if (backfill.completed_at === null) {
+ await apiCancelBackfill(source, backfill.id);
+ }
+ }
+}
+
+/** Poll until all backfills for a Dag are completed. */
+export async function apiWaitForNoActiveBackfill(
+ source: RequestLike,
+ dagId: string,
+ timeout: number = 120_000,
+): Promise<void> {
+ const request = getRequestContext(source);
+
+ await expect
+ .poll(
+ async () => {
+ try {
+ const response = await
request.get(`${baseUrl}/api/v2/backfills?dag_id=${dagId}&limit=100`, {
+ timeout: 10_000,
+ });
+
+ if (!response.ok()) {
+ return false;
+ }
+
+ const data = (await response.json()) as {
+ backfills: Array<{ completed_at: string | null }>;
+ };
+
+ return data.backfills.every((b) => b.completed_at !== null);
+ } catch {
+ return false;
+ }
+ },
+ {
+ intervals: [2000, 5000, 10_000],
+ message: `Active backfills for Dag ${dagId} did not clear within
${timeout}ms`,
+ timeout,
+ },
+ )
+ .toBeTruthy();
+}
+
+/** Poll until a backfill reaches completed state. */
+export async function apiWaitForBackfillComplete(
+ source: RequestLike,
+ backfillId: number,
+ timeout: number = 120_000,
+): Promise<void> {
+ const request = getRequestContext(source);
+
+ await expect
+ .poll(
+ async () => {
+ try {
+ const response = await
request.get(`${baseUrl}/api/v2/backfills/${backfillId}`, {
+ timeout: 10_000,
+ });
+
+ if (!response.ok()) {
+ return false;
+ }
+
+ const data = (await response.json()) as { completed_at: string |
null };
+
+ return data.completed_at !== null;
+ } catch {
+ return false;
+ }
+ },
+ {
+ intervals: [2000, 5000, 10_000],
+ message: `Backfill ${backfillId} did not complete within ${timeout}ms`,
+ timeout,
+ },
+ )
+ .toBeTruthy();
+}
+
+/** Create a backfill via the API. On 409, cancels active backfills and
retries once. */
+export async function apiCreateBackfill(
+ source: RequestLike,
+ dagId: string,
+ options: {
+ fromDate: string;
+ maxActiveRuns?: number;
+ reprocessBehavior?: string;
+ toDate: string;
+ },
+): Promise<number> {
+ const request = getRequestContext(source);
+ const { fromDate, maxActiveRuns, reprocessBehavior = "none", toDate } =
options;
+
+ const body: Record<string, unknown> = {
+ dag_id: dagId,
+ from_date: fromDate,
+ reprocess_behavior: reprocessBehavior,
+ to_date: toDate,
+ };
+
+ if (maxActiveRuns !== undefined) {
+ body.max_active_runs = maxActiveRuns;
+ }
+
+ const response = await request.post(`${baseUrl}/api/v2/backfills`, {
+ data: body,
+ headers: { "Content-Type": "application/json" },
+ timeout: 10_000,
+ });
+
+ if (response.status() === 409) {
+ await apiCancelAllActiveBackfills(source, dagId);
+ await apiWaitForNoActiveBackfill(source, dagId, 30_000);
+
+ const retryResponse = await request.post(`${baseUrl}/api/v2/backfills`, {
+ data: body,
+ headers: { "Content-Type": "application/json" },
+ timeout: 10_000,
+ });
+
+ if (!retryResponse.ok()) {
+ throw new Error(`Backfill creation retry failed
(${retryResponse.status()})`);
+ }
+
+ return ((await retryResponse.json()) as { id: number }).id;
+ }
+
+ if (!response.ok()) {
+ throw new Error(`Backfill creation failed (${response.status()})`);
+ }
+
+ return ((await response.json()) as { id: number }).id;
+}
diff --git a/airflow-core/src/airflow/ui/tests/e2e/utils/api/dag-runs.ts
b/airflow-core/src/airflow/ui/tests/e2e/utils/api/dag-runs.ts
new file mode 100644
index 00000000000..31d4bc8ad79
--- /dev/null
+++ b/airflow-core/src/airflow/ui/tests/e2e/utils/api/dag-runs.ts
@@ -0,0 +1,345 @@
+/*!
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Dag run API helpers — trigger, create, set state, poll, delete, cleanup.
+ * Includes task-instance state polling since TIs belong to a Dag run.
+ */
+import { expect } from "@playwright/test";
+
+import { baseUrl, getRequestContext, type RequestLike, uniqueRunId } from
"../shared";
+
+type DagRunData = {
+ conf?: Record<string, unknown>;
+ dag_run_id: string;
+ logical_date: string;
+ note?: string;
+};
+
+type DagRunResponse = {
+ dag_run_id: string;
+ state: string;
+};
+
+/**
+ * Poll GET /api/v2/dags/{dagId} until 200 — waits for Dag to be parsed.
+ */
+export async function waitForDagReady(
+ source: RequestLike,
+ dagId: string,
+ options?: { timeout?: number },
+): Promise<void> {
+ const request = getRequestContext(source);
+
+ const timeout = options?.timeout ?? 120_000;
+
+ await expect
+ .poll(
+ async () => {
+ try {
+ const response = await
request.get(`${baseUrl}/api/v2/dags/${dagId}`, { timeout: 10_000 });
+
+ return response.ok();
+ } catch {
+ return false;
+ }
+ },
+ { intervals: [2000], timeout },
+ )
+ .toBe(true);
+}
+
+/**
+ * Trigger a Dag run using `options.runId` if given, else a generated unique run ID.
+ * Returns the dagRunId and logicalDate actually used, for targeted polling.
+ */
+export async function apiTriggerDagRun(
+ source: RequestLike,
+ dagId: string,
+ options?: { runId?: string },
+): Promise<{ dagRunId: string; logicalDate: string }> {
+ const request = getRequestContext(source);
+
+ let dagRunId = options?.runId ?? uniqueRunId(dagId);
+ let resultLogicalDate = new Date().toISOString();
+
+ await expect(async () => {
+ // Generate a fresh logicalDate on each attempt so that a 409
+ // (logical_date collision from a parallel worker) is recoverable.
+ const logicalDate = new Date().toISOString();
+
+ const response = await
request.post(`${baseUrl}/api/v2/dags/${dagId}/dagRuns`, {
+ data: {
+ dag_run_id: dagRunId,
+ logical_date: logicalDate,
+ note: "e2e test",
+ },
+ headers: { "Content-Type": "application/json" },
+ timeout: 10_000,
+ });
+
+ if (!response.ok()) {
+ // On 409, regenerate dag_run_id (unless caller pinned it) so the
+ // next attempt doesn't collide on either dag_run_id or logical_date.
+ if (response.status() === 409 && options?.runId === undefined) {
+ dagRunId = uniqueRunId(dagId);
+ }
+
+ throw new Error(`Dag run trigger failed (${response.status()})`);
+ }
+
+ const json = (await response.json()) as { logical_date?: string } &
DagRunResponse;
+
+ resultLogicalDate = json.logical_date ?? logicalDate;
+ }).toPass({ intervals: [2000, 3000, 5000], timeout: 60_000 });
+
+ return { dagRunId, logicalDate: resultLogicalDate };
+}
+
+/**
+ * Create a Dag run via the API; a 409 retry may change the ID, so use the returned one.
+ */
+export async function apiCreateDagRun(source: RequestLike, dagId: string,
data: DagRunData): Promise<string> {
+ const request = getRequestContext(source);
+ let resultRunId = data.dag_run_id;
+
+ // Track fallback values that are regenerated on 409 collisions,
+ // without mutating the caller's `data` parameter.
+ let retryRunId: string | undefined;
+ let retryLogicalDate: string | undefined;
+
+ await expect(async () => {
+ const runId = retryRunId ?? data.dag_run_id;
+ const logicalDate = retryLogicalDate ?? data.logical_date;
+
+ const response = await
request.post(`${baseUrl}/api/v2/dags/${dagId}/dagRuns`, {
+ data: {
+ conf: data.conf ?? {},
+ dag_run_id: runId,
+ logical_date: logicalDate,
+ note: data.note ?? "e2e test",
+ },
+ headers: { "Content-Type": "application/json" },
+ timeout: 10_000,
+ });
+
+ if (!response.ok()) {
+ // On 409, generate fresh dag_run_id and logical_date for the next retry.
+ if (response.status() === 409) {
+ retryRunId = uniqueRunId(dagId);
+ retryLogicalDate = new Date().toISOString();
+ }
+
+ throw new Error(`Dag run creation failed (${response.status()})`);
+ }
+
+ const json = (await response.json()) as DagRunResponse;
+
+ resultRunId = json.dag_run_id;
+ }).toPass({ intervals: [2000, 3000, 5000], timeout: 60_000 });
+
+ return resultRunId;
+}
+
+/**
+ * Set a Dag run's state via the API. 409 is treated as success.
+ */
+export async function apiSetDagRunState(
+ source: RequestLike,
+ options: { dagId: string; runId: string; state: "failed" | "queued" |
"success" },
+): Promise<void> {
+ const { dagId, runId, state } = options;
+ const request = getRequestContext(source);
+
+ await expect(async () => {
+ const response = await
request.patch(`${baseUrl}/api/v2/dags/${dagId}/dagRuns/${runId}`, {
+ data: { state },
+ headers: { "Content-Type": "application/json" },
+ timeout: 10_000,
+ });
+
+ if (response.status() !== 409 && !response.ok()) {
+ throw new Error(`Set Dag run state failed (${response.status()})`);
+ }
+ }).toPass({ intervals: [2000, 3000, 5000], timeout: 60_000 });
+}
+
+/**
+ * Poll the API until the Dag run reaches the expected state.
+ */
+export async function waitForDagRunStatus(
+ source: RequestLike,
+ options: { dagId: string; expectedState: string; runId: string; timeout?:
number },
+): Promise<void> {
+ const { dagId, expectedState, runId } = options;
+ const request = getRequestContext(source);
+
+ const timeout = options.timeout ?? 120_000;
+
+ await expect
+ .poll(
+ async () => {
+ try {
+ const response = await
request.get(`${baseUrl}/api/v2/dags/${dagId}/dagRuns/${runId}`, {
+ timeout: 10_000,
+ });
+
+ if (!response.ok()) {
+ return `unknown (HTTP ${response.status()})`;
+ }
+
+ const data = (await response.json()) as DagRunResponse;
+
+ if (data.state === "failed" && expectedState !== "failed") {
+ throw new Error(`Dag run ${runId} failed unexpectedly`);
+ }
+
+ return data.state;
+ } catch (error) {
+ // Re-throw intentional failures (unexpected "failed" state).
+ if (error instanceof Error && error.message.includes("failed
unexpectedly")) {
+ throw error;
+ }
+
+ // Transient network/timeout errors — retry on next interval.
+ return `unknown (${error instanceof Error ? error.message : "network
error"})`;
+ }
+ },
+ {
+ intervals: [5000],
+ message: `Dag run ${runId} did not reach state "${expectedState}"
within ${timeout}ms`,
+ timeout,
+ },
+ )
+ .toBe(expectedState);
+}
+
+/**
+ * Poll the API until a task instance reaches the expected state.
+ * Unlike waitForDagRunStatus, this targets a specific task within a Dag run.
+ */
+export async function waitForTaskInstanceState(
+ source: RequestLike,
+ options: {
+ dagId: string;
+ expectedState: string;
+ runId: string;
+ taskId: string;
+ timeout?: number;
+ },
+): Promise<void> {
+ const { dagId, expectedState, runId, taskId } = options;
+ const request = getRequestContext(source);
+
+ const timeout = options.timeout ?? 120_000;
+ const terminalStates = new Set(["success", "failed", "skipped", "removed",
"upstream_failed"]);
+
+ await expect
+ .poll(
+ async () => {
+ try {
+ const response = await request.get(
+
`${baseUrl}/api/v2/dags/${dagId}/dagRuns/${runId}/taskInstances/${taskId}`,
+ { timeout: 10_000 },
+ );
+
+ if (!response.ok()) {
+ return "unknown";
+ }
+
+ const data = (await response.json()) as { state: string };
+ const { state } = data;
+ const expected = expectedState.toLowerCase();
+
+ if (state !== expected && terminalStates.has(state)) {
+ throw new Error(`Task ${taskId} reached terminal state "${state}"
instead of "${expected}"`);
+ }
+
+ return state;
+ } catch (error) {
+ if (error instanceof Error && error.message.includes("terminal
state")) {
+ throw error;
+ }
+
+ return "unknown";
+ }
+ },
+ {
+ intervals: [3000, 5000],
+ message: `Task ${taskId} did not reach state "${expectedState}" within
${timeout}ms`,
+ timeout,
+ },
+ )
+ .toBe(expectedState.toLowerCase());
+}
+
+/** Delete a Dag run via the API. 404 is treated as success. */
+export async function apiDeleteDagRun(source: RequestLike, dagId: string,
runId: string): Promise<void> {
+ const request = getRequestContext(source);
+
+ const response = await
request.delete(`${baseUrl}/api/v2/dags/${dagId}/dagRuns/${runId}`, {
+ timeout: 10_000,
+ });
+
+ // 404 = already deleted by another worker or cleanup; acceptable.
+ if (response.status() === 404) {
+ return;
+ }
+
+ if (!response.ok()) {
+ const body = await response.text();
+
+ throw new Error(`Dag run deletion failed (${response.status()}): ${body}`);
+ }
+}
+
+/**
+ * Delete a Dag run, logging (not throwing) unexpected errors.
+ * Use this in fixture teardown where cleanup must not abort the loop.
+ * 404 is already handled inside `apiDeleteDagRun`.
+ *
+ * Strategy: force-fail the run first so the server doesn't wait for
+ * running tasks during deletion, then delete with one retry on timeout.
+ */
+export async function safeCleanupDagRun(source: RequestLike, dagId: string,
runId: string): Promise<void> {
+ try {
+ await apiSetDagRunState(source, { dagId, runId, state: "failed" });
+ } catch {
+ // Run may already be terminal or deleted — ignore.
+ }
+
+ for (let attempt = 0; attempt < 2; attempt++) {
+ try {
+ await apiDeleteDagRun(source, dagId, runId);
+
+ return;
+ } catch (error) {
+ const message = error instanceof Error ? error.message : String(error);
+ const isTimeout = message.includes("Timeout");
+
+ if (isTimeout && attempt === 0) {
+ continue;
+ }
+
+ console.warn(`[e2e cleanup] Failed to delete Dag run ${dagId}/${runId}:
${message}`);
+
+ return;
+ }
+ }
+}
diff --git a/airflow-core/src/airflow/ui/tests/e2e/utils/api/hitl.ts
b/airflow-core/src/airflow/ui/tests/e2e/utils/api/hitl.ts
new file mode 100644
index 00000000000..de3113f9c9a
--- /dev/null
+++ b/airflow-core/src/airflow/ui/tests/e2e/utils/api/hitl.ts
@@ -0,0 +1,194 @@
+/*!
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * HITL (Human-in-the-Loop) API helpers and end-to-end scenario.
+ */
+import { expect } from "@playwright/test";
+
+import { baseUrl, getRequestContext, type RequestLike } from "../shared";
+import { apiTriggerDagRun, waitForDagReady, waitForDagRunStatus,
waitForTaskInstanceState } from "./dag-runs";
+
+/**
+ * Submit a response for a HITL (Human-in-the-Loop) task instance through the REST API.
+ *
+ * The PATCH is retried (after 2s, 3s, then every 5s) for up to 60s until it returns
+ * a 2xx status or 409. A 409 means a response was already recorded and counts as success.
+ * `mapIndex` falls back to -1 and `paramsInput` to `{}` when not supplied.
+ */
+export async function apiRespondToHITL(
+  source: RequestLike,
+  options: {
+    chosenOptions: Array<string>;
+    dagId: string;
+    mapIndex?: number;
+    paramsInput?: Record<string, unknown>;
+    runId: string;
+    taskId: string;
+  },
+): Promise<void> {
+  const request = getRequestContext(source);
+  const { chosenOptions, dagId, runId, taskId } = options;
+  const payload = { chosen_options: chosenOptions, params_input: options.paramsInput ?? {} };
+  const url = `${baseUrl}/api/v2/dags/${dagId}/dagRuns/${runId}/taskInstances/${taskId}/${options.mapIndex ?? -1}/hitlDetails`;
+
+  // One attempt: resolves when the server accepted (or had already accepted) the response.
+  const sendResponse = async (): Promise<void> => {
+    const response = await request.patch(url, {
+      data: payload,
+      headers: { "Content-Type": "application/json" },
+      timeout: 10_000,
+    });
+    const accepted = response.ok() || response.status() === 409;
+
+    if (!accepted) {
+      throw new Error(`HITL response failed (${response.status()})`);
+    }
+  };
+
+  await expect(sendResponse).toPass({ intervals: [2000, 3000, 5000], timeout: 60_000 });
+}
+
+/**
+ * Run the full HITL flow entirely via API — no browser needed.
+ *
+ * The example_hitl_operator Dag has 4 parallel HITL tasks, then an approval
+ * task, then a branch task. This function unpauses and triggers the Dag,
+ * responds to each task via the API, and waits for the Dag run to succeed.
+ *
+ * @param source - Playwright `Page` or `APIRequestContext` used for every request.
+ * @param dagId - ID of the HITL example Dag to run.
+ * @param approve - `true` answers the approval task with "Approve" and then picks
+ *   "task_1" on the branch task; `false` answers "Reject" and sends no branch response.
+ * @returns The ID of the triggered Dag run.
+ * @throws If the Dag cannot be unpaused, or if any wait/response helper fails or times out.
+ */
+export async function setupHITLFlowViaAPI(
+  source: RequestLike,
+  dagId: string,
+  approve: boolean,
+): Promise<string> {
+  const request = getRequestContext(source);
+
+  await waitForDagReady(request, dagId);
+
+  // Unpause with a status check and retries. An unchecked PATCH that fails leaves the
+  // Dag paused, which would only surface later as a misleading task-state timeout.
+  await expect(async () => {
+    const response = await request.patch(`${baseUrl}/api/v2/dags/${dagId}`, {
+      data: { is_paused: false },
+      timeout: 10_000,
+    });
+
+    if (!response.ok()) {
+      throw new Error(`Unpausing Dag ${dagId} failed (${response.status()})`);
+    }
+  }).toPass({ intervals: [2000, 3000, 5000], timeout: 60_000 });
+
+  const { dagRunId } = await apiTriggerDagRun(request, dagId);
+
+  // Wait until `taskId` of this run reports `expectedState`.
+  const waitForTask = (taskId: string, expectedState: string): Promise<void> =>
+    waitForTaskInstanceState(request, { dagId, expectedState, runId: dagRunId, taskId });
+
+  // Answer the HITL prompt of `taskId` in this run.
+  const respond = (
+    taskId: string,
+    chosenOptions: Array<string>,
+    paramsInput: Record<string, unknown> = {},
+  ): Promise<void> => apiRespondToHITL(request, { chosenOptions, dagId, paramsInput, runId: dagRunId, taskId });
+
+  // wait_for_default_option auto-resolves (1s timeout, defaults=["option 7"]).
+  await waitForTask("wait_for_default_option", "success");
+
+  await waitForTask("wait_for_input", "awaiting_input");
+  await respond("wait_for_input", ["OK"], { information: "Approved by test" });
+
+  await waitForTask("wait_for_option", "awaiting_input");
+  await respond("wait_for_option", ["option 1"]);
+
+  await waitForTask("wait_for_multiple_options", "awaiting_input");
+  await respond("wait_for_multiple_options", ["option 4", "option 5"]);
+
+  // Wait for all parallel tasks to succeed before the approval task starts.
+  await waitForTask("wait_for_input", "success");
+  await waitForTask("wait_for_option", "success");
+  await waitForTask("wait_for_multiple_options", "success");
+
+  await waitForTask("valid_input_and_options", "awaiting_input");
+  await respond("valid_input_and_options", [approve ? "Approve" : "Reject"]);
+  await waitForTask("valid_input_and_options", "success");
+
+  // The branch task is only answered on the approval path.
+  if (approve) {
+    await waitForTask("choose_a_branch_to_run", "awaiting_input");
+    await respond("choose_a_branch_to_run", ["task_1"]);
+  }
+
+  await waitForDagRunStatus(request, {
+    dagId,
+    expectedState: "success",
+    runId: dagRunId,
+    timeout: 120_000,
+  });
+
+  return dagRunId;
+}
diff --git a/airflow-core/src/airflow/ui/tests/e2e/utils/api/variables.ts
b/airflow-core/src/airflow/ui/tests/e2e/utils/api/variables.ts
new file mode 100644
index 00000000000..62ea62d6f76
--- /dev/null
+++ b/airflow-core/src/airflow/ui/tests/e2e/utils/api/variables.ts
@@ -0,0 +1,66 @@
+/*!
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Variable API helpers — create and delete Airflow variables.
+ */
+import { expect } from "@playwright/test";
+
+import { baseUrl, getRequestContext, type RequestLike } from "../shared";
+
+/**
+ * Create an Airflow variable through the REST API.
+ *
+ * The POST is retried (after 2s, 3s, then every 5s) for up to 90s until it returns
+ * a 2xx status or 409. A 409 is treated as success and the response is not inspected
+ * further. `description` is sent as an empty string when omitted.
+ */
+export async function apiCreateVariable(
+  source: RequestLike,
+  options: { description?: string; key: string; value: string },
+): Promise<void> {
+  const request = getRequestContext(source);
+  const payload = { description: options.description ?? "", key: options.key, value: options.value };
+
+  // One attempt: resolves on 2xx or 409, throws on anything else so toPass retries.
+  const createOnce = async (): Promise<void> => {
+    const response = await request.post(`${baseUrl}/api/v2/variables`, {
+      data: payload,
+      headers: { "Content-Type": "application/json" },
+      timeout: 10_000,
+    });
+
+    if (response.status() === 409 || response.ok()) {
+      return;
+    }
+
+    throw new Error(`Variable creation failed (${response.status()})`);
+  };
+
+  await expect(createOnce).toPass({ intervals: [2000, 3000, 5000], timeout: 90_000 });
+}
+
+/**
+ * Delete an Airflow variable through the REST API.
+ *
+ * The key is percent-encoded before it is placed in the URL path. A 404 counts as
+ * success; any other non-2xx status throws with the status code and response body.
+ */
+export async function apiDeleteVariable(source: RequestLike, key: string): Promise<void> {
+  const url = `${baseUrl}/api/v2/variables/${encodeURIComponent(key)}`;
+  const response = await getRequestContext(source).delete(url, { timeout: 10_000 });
+
+  // 404 means another worker or an earlier cleanup already removed it; 2xx means we did.
+  if (response.status() === 404 || response.ok()) {
+    return;
+  }
+
+  const body = await response.text();
+
+  throw new Error(`Variable deletion failed (${response.status()}): ${body}`);
+}
diff --git a/airflow-core/src/airflow/ui/tests/e2e/fixtures/dashboard-data.ts
b/airflow-core/src/airflow/ui/tests/e2e/utils/shared.ts
similarity index 54%
copy from airflow-core/src/airflow/ui/tests/e2e/fixtures/dashboard-data.ts
copy to airflow-core/src/airflow/ui/tests/e2e/utils/shared.ts
index a574fd0b3b8..3e08d9ee7ce 100644
--- a/airflow-core/src/airflow/ui/tests/e2e/fixtures/dashboard-data.ts
+++ b/airflow-core/src/airflow/ui/tests/e2e/utils/shared.ts
@@ -18,25 +18,25 @@
*/
/**
- * Dashboard data fixture — tracks UI-triggered Dag runs for cleanup.
+ * Shared primitives for E2E API helpers.
*/
+import type { APIRequestContext, Page } from "@playwright/test";
+import { randomUUID } from "node:crypto";
import { testConfig } from "playwright.config";
-import { test as base } from "tests/e2e/fixtures";
-import { safeCleanupDagRun } from "tests/e2e/utils/test-helpers";
-export type DagRunCleanup = {
- track: (runId: string) => void;
-};
+export type RequestLike = APIRequestContext | Page;
-/* eslint-disable react-hooks/rules-of-hooks -- Playwright's `use` is not a
React Hook. */
-export const test = base.extend<{ dagRunCleanup: DagRunCleanup }>({
- dagRunCleanup: async ({ authenticatedRequest }, use) => {
- const trackedRunIds: Array<string> = [];
+/**
+ * Resolve a `RequestLike` to the `APIRequestContext` used for API calls.
+ *
+ * A value that has a `request` property (a `Page`) yields that property;
+ * any other value is returned unchanged.
+ */
+export function getRequestContext(source: RequestLike): APIRequestContext {
+ if ("request" in source) {
+ return source.request;
+ }
- await use({ track: (runId: string) => trackedRunIds.push(runId) });
+ return source;
+}
- for (const runId of trackedRunIds) {
- await safeCleanupDagRun(authenticatedRequest, testConfig.testDag.id,
runId);
- }
- },
-});
+export const { baseUrl } = testConfig.connection;
+
+/**
+ * Build a run ID of the form `{prefix}_{uuid8}`, where `uuid8` is the first
+ * eight characters of a freshly generated random UUID.
+ */
+export function uniqueRunId(prefix: string): string {
+  const suffix = randomUUID().slice(0, 8);
+
+  return [prefix, suffix].join("_");
+}
diff --git a/airflow-core/src/airflow/ui/tests/e2e/utils/test-helpers.ts
b/airflow-core/src/airflow/ui/tests/e2e/utils/test-helpers.ts
deleted file mode 100644
index b0b159fce44..00000000000
--- a/airflow-core/src/airflow/ui/tests/e2e/utils/test-helpers.ts
+++ /dev/null
@@ -1,802 +0,0 @@
-/*!
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- * Shared E2E test utilities for data isolation and API-based setup.
- */
-import { expect, type APIRequestContext, type Locator, type Page } from
"@playwright/test";
-import { randomUUID } from "node:crypto";
-import { testConfig } from "playwright.config";
-
-type RequestLike = APIRequestContext | Page;
-
-type DagRunData = {
- conf?: Record<string, unknown>;
- dag_run_id: string;
- logical_date: string;
- note?: string;
-};
-
-type DagRunResponse = {
- dag_run_id: string;
- state: string;
-};
-
-function getRequestContext(source: RequestLike): APIRequestContext {
- if ("request" in source) {
- return source.request;
- }
-
- return source;
-}
-
-const { baseUrl } = testConfig.connection;
-
-/** Generate a unique run ID: `{prefix}_{uuid8}`. */
-export function uniqueRunId(prefix: string): string {
- return `${prefix}_${randomUUID().slice(0, 8)}`;
-}
-
-/**
- * Poll GET /api/v2/dags/{dagId} until 200 — waits for Dag to be parsed.
- */
-export async function waitForDagReady(
- source: RequestLike,
- dagId: string,
- options?: { timeout?: number },
-): Promise<void> {
- const request = getRequestContext(source);
-
- const timeout = options?.timeout ?? 120_000;
-
- await expect
- .poll(
- async () => {
- try {
- const response = await
request.get(`${baseUrl}/api/v2/dags/${dagId}`, { timeout: 10_000 });
-
- return response.ok();
- } catch {
- return false;
- }
- },
- { intervals: [2000], timeout },
- )
- .toBe(true);
-}
-
-/**
- * Trigger a Dag run with an auto-generated unique run ID.
- * Returns the dagRunId and logicalDate for targeted polling.
- */
-export async function apiTriggerDagRun(
- source: RequestLike,
- dagId: string,
- options?: { runId?: string },
-): Promise<{ dagRunId: string; logicalDate: string }> {
- const request = getRequestContext(source);
-
- let dagRunId = options?.runId ?? uniqueRunId(dagId);
- let resultLogicalDate = new Date().toISOString();
-
- await expect(async () => {
- // Generate a fresh logicalDate on each attempt so that a 409
- // (logical_date collision from a parallel worker) is recoverable.
- const logicalDate = new Date().toISOString();
-
- const response = await
request.post(`${baseUrl}/api/v2/dags/${dagId}/dagRuns`, {
- data: {
- dag_run_id: dagRunId,
- logical_date: logicalDate,
- note: "e2e test",
- },
- headers: { "Content-Type": "application/json" },
- timeout: 10_000,
- });
-
- if (!response.ok()) {
- // On 409, regenerate dag_run_id (unless caller pinned it) so the
- // next attempt doesn't collide on either dag_run_id or logical_date.
- if (response.status() === 409 && options?.runId === undefined) {
- dagRunId = uniqueRunId(dagId);
- }
-
- throw new Error(`Dag run trigger failed (${response.status()})`);
- }
-
- const json = (await response.json()) as { logical_date?: string } &
DagRunResponse;
-
- resultLogicalDate = json.logical_date ?? logicalDate;
- }).toPass({ intervals: [2000, 3000, 5000], timeout: 60_000 });
-
- return { dagRunId, logicalDate: resultLogicalDate };
-}
-
-/**
- * Create a Dag run via the API.
- */
-export async function apiCreateDagRun(source: RequestLike, dagId: string,
data: DagRunData): Promise<string> {
- const request = getRequestContext(source);
- let resultRunId = data.dag_run_id;
-
- // Track fallback values that are regenerated on 409 collisions,
- // without mutating the caller's `data` parameter.
- let retryRunId: string | undefined;
- let retryLogicalDate: string | undefined;
-
- await expect(async () => {
- const runId = retryRunId ?? data.dag_run_id;
- const logicalDate = retryLogicalDate ?? data.logical_date;
-
- const response = await
request.post(`${baseUrl}/api/v2/dags/${dagId}/dagRuns`, {
- data: {
- conf: data.conf ?? {},
- dag_run_id: runId,
- logical_date: logicalDate,
- note: data.note ?? "e2e test",
- },
- headers: { "Content-Type": "application/json" },
- timeout: 10_000,
- });
-
- if (!response.ok()) {
- // On 409, generate fresh dag_run_id and logical_date for the next retry.
- if (response.status() === 409) {
- retryRunId = uniqueRunId(dagId);
- retryLogicalDate = new Date().toISOString();
- }
-
- throw new Error(`Dag run creation failed (${response.status()})`);
- }
-
- const json = (await response.json()) as DagRunResponse;
-
- resultRunId = json.dag_run_id;
- }).toPass({ intervals: [2000, 3000, 5000], timeout: 60_000 });
-
- return resultRunId;
-}
-
-/**
- * Set a Dag run's state via the API.
- */
-export async function apiSetDagRunState(
- source: RequestLike,
- options: { dagId: string; runId: string; state: "failed" | "queued" |
"success" },
-): Promise<void> {
- const { dagId, runId, state } = options;
- const request = getRequestContext(source);
-
- await expect(async () => {
- const response = await
request.patch(`${baseUrl}/api/v2/dags/${dagId}/dagRuns/${runId}`, {
- data: { state },
- headers: { "Content-Type": "application/json" },
- timeout: 10_000,
- });
-
- if (response.status() !== 409 && !response.ok()) {
- throw new Error(`Set Dag run state failed (${response.status()})`);
- }
- }).toPass({ intervals: [2000, 3000, 5000], timeout: 60_000 });
-}
-
-/**
- * Poll the API until the Dag run reaches the expected state.
- */
-export async function waitForDagRunStatus(
- source: RequestLike,
- options: { dagId: string; expectedState: string; runId: string; timeout?:
number },
-): Promise<void> {
- const { dagId, expectedState, runId } = options;
- const request = getRequestContext(source);
-
- const timeout = options.timeout ?? 120_000;
-
- await expect
- .poll(
- async () => {
- try {
- const response = await
request.get(`${baseUrl}/api/v2/dags/${dagId}/dagRuns/${runId}`, {
- timeout: 10_000,
- });
-
- if (!response.ok()) {
- return `unknown (HTTP ${response.status()})`;
- }
-
- const data = (await response.json()) as DagRunResponse;
-
- if (data.state === "failed" && expectedState !== "failed") {
- throw new Error(`Dag run ${runId} failed unexpectedly`);
- }
-
- return data.state;
- } catch (error) {
- // Re-throw intentional failures (unexpected "failed" state).
- if (error instanceof Error && error.message.includes("failed
unexpectedly")) {
- throw error;
- }
-
- // Transient network/timeout errors — retry on next interval.
- return `unknown (${error instanceof Error ? error.message : "network
error"})`;
- }
- },
- {
- intervals: [5000],
- message: `Dag run ${runId} did not reach state "${expectedState}"
within ${timeout}ms`,
- timeout,
- },
- )
- .toBe(expectedState);
-}
-
-/**
- * Poll the API until a task instance reaches the expected state.
- * Unlike waitForDagRunStatus, this targets a specific task within a Dag run.
- */
-export async function waitForTaskInstanceState(
- source: RequestLike,
- options: {
- dagId: string;
- expectedState: string;
- runId: string;
- taskId: string;
- timeout?: number;
- },
-): Promise<void> {
- const { dagId, expectedState, runId, taskId } = options;
- const request = getRequestContext(source);
-
- const timeout = options.timeout ?? 120_000;
- const terminalStates = new Set(["success", "failed", "skipped", "removed",
"upstream_failed"]);
-
- await expect
- .poll(
- async () => {
- try {
- const response = await request.get(
-
`${baseUrl}/api/v2/dags/${dagId}/dagRuns/${runId}/taskInstances/${taskId}`,
- { timeout: 10_000 },
- );
-
- if (!response.ok()) {
- return "unknown";
- }
-
- const data = (await response.json()) as { state: string };
- const { state } = data;
- const expected = expectedState.toLowerCase();
-
- if (state !== expected && terminalStates.has(state)) {
- throw new Error(`Task ${taskId} reached terminal state "${state}"
instead of "${expected}"`);
- }
-
- return state;
- } catch (error) {
- if (error instanceof Error && error.message.includes("terminal
state")) {
- throw error;
- }
-
- return "unknown";
- }
- },
- {
- intervals: [3000, 5000],
- message: `Task ${taskId} did not reach state "${expectedState}" within
${timeout}ms`,
- timeout,
- },
- )
- .toBe(expectedState.toLowerCase());
-}
-
-/**
- * Respond to a HITL (Human-in-the-Loop) task via the API.
- * 409 is treated as success (already responded).
- */
-export async function apiRespondToHITL(
- source: RequestLike,
- options: {
- chosenOptions: Array<string>;
- dagId: string;
- mapIndex?: number;
- paramsInput?: Record<string, unknown>;
- runId: string;
- taskId: string;
- },
-): Promise<void> {
- const { chosenOptions, dagId, runId, taskId } = options;
- const mapIndex = options.mapIndex ?? -1;
- const paramsInput = options.paramsInput ?? {};
- const request = getRequestContext(source);
-
- await expect(async () => {
- const response = await request.patch(
-
`${baseUrl}/api/v2/dags/${dagId}/dagRuns/${runId}/taskInstances/${taskId}/${mapIndex}/hitlDetails`,
- {
- data: { chosen_options: chosenOptions, params_input: paramsInput },
- headers: { "Content-Type": "application/json" },
- timeout: 10_000,
- },
- );
-
- // 409 = already responded; acceptable.
- if (response.status() !== 409 && !response.ok()) {
- throw new Error(`HITL response failed (${response.status()})`);
- }
- }).toPass({ intervals: [2000, 3000, 5000], timeout: 60_000 });
-}
-
-/**
- * Run the full HITL flow entirely via API — no browser needed.
- *
- * The example_hitl_operator Dag has 4 parallel HITL tasks, then an approval
- * task, then a branch task. This function triggers the Dag, responds to each
- * task via the API, and waits for the Dag run to complete.
- */
-export async function setupHITLFlowViaAPI(
- source: RequestLike,
- dagId: string,
- approve: boolean,
-): Promise<string> {
- const request = getRequestContext(source);
-
- await waitForDagReady(request, dagId);
- await request.patch(`${baseUrl}/api/v2/dags/${dagId}`, { data: { is_paused:
false } });
-
- const { dagRunId } = await apiTriggerDagRun(request, dagId);
-
- // wait_for_default_option auto-resolves (1s timeout, defaults=["option 7"]).
- await waitForTaskInstanceState(request, {
- dagId,
- expectedState: "success",
- runId: dagRunId,
- taskId: "wait_for_default_option",
- });
-
- await waitForTaskInstanceState(request, {
- dagId,
- expectedState: "awaiting_input",
- runId: dagRunId,
- taskId: "wait_for_input",
- });
- await apiRespondToHITL(request, {
- chosenOptions: ["OK"],
- dagId,
- paramsInput: { information: "Approved by test" },
- runId: dagRunId,
- taskId: "wait_for_input",
- });
-
- await waitForTaskInstanceState(request, {
- dagId,
- expectedState: "awaiting_input",
- runId: dagRunId,
- taskId: "wait_for_option",
- });
- await apiRespondToHITL(request, {
- chosenOptions: ["option 1"],
- dagId,
- runId: dagRunId,
- taskId: "wait_for_option",
- });
-
- await waitForTaskInstanceState(request, {
- dagId,
- expectedState: "awaiting_input",
- runId: dagRunId,
- taskId: "wait_for_multiple_options",
- });
- await apiRespondToHITL(request, {
- chosenOptions: ["option 4", "option 5"],
- dagId,
- runId: dagRunId,
- taskId: "wait_for_multiple_options",
- });
-
- // Wait for all parallel tasks to succeed before the approval task starts.
- await waitForTaskInstanceState(request, {
- dagId,
- expectedState: "success",
- runId: dagRunId,
- taskId: "wait_for_input",
- });
- await waitForTaskInstanceState(request, {
- dagId,
- expectedState: "success",
- runId: dagRunId,
- taskId: "wait_for_option",
- });
- await waitForTaskInstanceState(request, {
- dagId,
- expectedState: "success",
- runId: dagRunId,
- taskId: "wait_for_multiple_options",
- });
-
- await waitForTaskInstanceState(request, {
- dagId,
- expectedState: "awaiting_input",
- runId: dagRunId,
- taskId: "valid_input_and_options",
- });
- await apiRespondToHITL(request, {
- chosenOptions: [approve ? "Approve" : "Reject"],
- dagId,
- runId: dagRunId,
- taskId: "valid_input_and_options",
- });
- await waitForTaskInstanceState(request, {
- dagId,
- expectedState: "success",
- runId: dagRunId,
- taskId: "valid_input_and_options",
- });
-
- if (approve) {
- await waitForTaskInstanceState(request, {
- dagId,
- expectedState: "awaiting_input",
- runId: dagRunId,
- taskId: "choose_a_branch_to_run",
- });
- await apiRespondToHITL(request, {
- chosenOptions: ["task_1"],
- dagId,
- runId: dagRunId,
- taskId: "choose_a_branch_to_run",
- });
- }
-
- await waitForDagRunStatus(request, {
- dagId,
- expectedState: "success",
- runId: dagRunId,
- timeout: 120_000,
- });
-
- return dagRunId;
-}
-
-/** Delete a Dag run via the API. 404 is treated as success. */
-export async function apiDeleteDagRun(source: RequestLike, dagId: string,
runId: string): Promise<void> {
- const request = getRequestContext(source);
-
- const response = await
request.delete(`${baseUrl}/api/v2/dags/${dagId}/dagRuns/${runId}`, {
- timeout: 10_000,
- });
-
- // 404 = already deleted by another worker or cleanup; acceptable.
- if (response.status() === 404) {
- return;
- }
-
- if (!response.ok()) {
- const body = await response.text();
-
- throw new Error(`Dag run deletion failed (${response.status()}): ${body}`);
- }
-}
-
-/**
- * Delete a Dag run, logging (not throwing) unexpected errors.
- * Use this in fixture teardown where cleanup must not abort the loop.
- * 404 is already handled inside `apiDeleteDagRun`.
- *
- * Strategy: force-fail the run first so the server doesn't wait for
- * running tasks during deletion, then delete with one retry on timeout.
- */
-export async function safeCleanupDagRun(source: RequestLike, dagId: string,
runId: string): Promise<void> {
- try {
- await apiSetDagRunState(source, { dagId, runId, state: "failed" });
- } catch {
- // Run may already be terminal or deleted — ignore.
- }
-
- for (let attempt = 0; attempt < 2; attempt++) {
- try {
- await apiDeleteDagRun(source, dagId, runId);
-
- return;
- } catch (error) {
- const message = error instanceof Error ? error.message : String(error);
- const isTimeout = message.includes("Timeout");
-
- if (isTimeout && attempt === 0) {
- continue;
- }
-
- console.warn(`[e2e cleanup] Failed to delete Dag run ${dagId}/${runId}:
${message}`);
-
- return;
- }
- }
-}
-
-/** Create a variable via the API. 409 is treated as success. */
-export async function apiCreateVariable(
- source: RequestLike,
- options: { description?: string; key: string; value: string },
-): Promise<void> {
- const { description, key, value } = options;
- const request = getRequestContext(source);
-
- await expect(async () => {
- const response = await request.post(`${baseUrl}/api/v2/variables`, {
- data: { description: description ?? "", key, value },
- headers: { "Content-Type": "application/json" },
- timeout: 10_000,
- });
-
- if (response.status() !== 409 && !response.ok()) {
- throw new Error(`Variable creation failed (${response.status()})`);
- }
- }).toPass({ intervals: [2000, 3000, 5000], timeout: 90_000 });
-}
-
-/** Delete a variable via the API. 404 is treated as success. */
-export async function apiDeleteVariable(source: RequestLike, key: string):
Promise<void> {
- const request = getRequestContext(source);
-
- const response = await
request.delete(`${baseUrl}/api/v2/variables/${encodeURIComponent(key)}`, {
- timeout: 10_000,
- });
-
- // 404 = already deleted by another worker or cleanup; acceptable.
- if (response.status() === 404) {
- return;
- }
-
- if (!response.ok()) {
- const body = await response.text();
-
- throw new Error(`Variable deletion failed (${response.status()}):
${body}`);
- }
-}
-
-/** Cancel a single backfill via the API. 409 (already completed) is treated
as success. */
-export async function apiCancelBackfill(source: RequestLike, backfillId:
number): Promise<void> {
- const request = getRequestContext(source);
-
- const response = await
request.put(`${baseUrl}/api/v2/backfills/${backfillId}/cancel`, {
- timeout: 10_000,
- });
-
- if (response.status() !== 200 && response.status() !== 409) {
- throw new Error(`Cancel backfill failed (${response.status()})`);
- }
-}
-
-/** Cancel all active (non-completed) backfills for a Dag. */
-export async function apiCancelAllActiveBackfills(source: RequestLike, dagId:
string): Promise<void> {
- const request = getRequestContext(source);
-
- const response = await
request.get(`${baseUrl}/api/v2/backfills?dag_id=${dagId}&limit=100`, {
- timeout: 10_000,
- });
-
- if (!response.ok()) {
- throw new Error(`List backfills failed (${response.status()})`);
- }
-
- const data = (await response.json()) as { backfills: Array<{ completed_at:
string | null; id: number }> };
-
- for (const backfill of data.backfills) {
- if (backfill.completed_at === null) {
- await apiCancelBackfill(source, backfill.id);
- }
- }
-}
-
-/** Poll until all backfills for a Dag are completed. */
-export async function apiWaitForNoActiveBackfill(
- source: RequestLike,
- dagId: string,
- timeout: number = 120_000,
-): Promise<void> {
- const request = getRequestContext(source);
-
- await expect
- .poll(
- async () => {
- try {
- const response = await
request.get(`${baseUrl}/api/v2/backfills?dag_id=${dagId}&limit=100`, {
- timeout: 10_000,
- });
-
- if (!response.ok()) {
- return false;
- }
-
- const data = (await response.json()) as {
- backfills: Array<{ completed_at: string | null }>;
- };
-
- return data.backfills.every((b) => b.completed_at !== null);
- } catch {
- return false;
- }
- },
- {
- intervals: [2000, 5000, 10_000],
- message: `Active backfills for Dag ${dagId} did not clear within
${timeout}ms`,
- timeout,
- },
- )
- .toBeTruthy();
-}
-
-/** Poll until a backfill reaches completed state. */
-export async function apiWaitForBackfillComplete(
- source: RequestLike,
- backfillId: number,
- timeout: number = 120_000,
-): Promise<void> {
- const request = getRequestContext(source);
-
- await expect
- .poll(
- async () => {
- try {
- const response = await
request.get(`${baseUrl}/api/v2/backfills/${backfillId}`, {
- timeout: 10_000,
- });
-
- if (!response.ok()) {
- return false;
- }
-
- const data = (await response.json()) as { completed_at: string |
null };
-
- return data.completed_at !== null;
- } catch {
- return false;
- }
- },
- {
- intervals: [2000, 5000, 10_000],
- message: `Backfill ${backfillId} did not complete within ${timeout}ms`,
- timeout,
- },
- )
- .toBeTruthy();
-}
-
-/** Create a backfill via the API. On 409, cancels active backfills and
retries once. */
-export async function apiCreateBackfill(
- source: RequestLike,
- dagId: string,
- options: {
- fromDate: string;
- maxActiveRuns?: number;
- reprocessBehavior?: string;
- toDate: string;
- },
-): Promise<number> {
- const request = getRequestContext(source);
- const { fromDate, maxActiveRuns, reprocessBehavior = "none", toDate } =
options;
-
- const body: Record<string, unknown> = {
- dag_id: dagId,
- from_date: fromDate,
- reprocess_behavior: reprocessBehavior,
- to_date: toDate,
- };
-
- if (maxActiveRuns !== undefined) {
- body.max_active_runs = maxActiveRuns;
- }
-
- const response = await request.post(`${baseUrl}/api/v2/backfills`, {
- data: body,
- headers: { "Content-Type": "application/json" },
- timeout: 10_000,
- });
-
- if (response.status() === 409) {
- await apiCancelAllActiveBackfills(source, dagId);
- await apiWaitForNoActiveBackfill(source, dagId, 30_000);
-
- const retryResponse = await request.post(`${baseUrl}/api/v2/backfills`, {
- data: body,
- headers: { "Content-Type": "application/json" },
- timeout: 10_000,
- });
-
- if (!retryResponse.ok()) {
- throw new Error(`Backfill creation retry failed
(${retryResponse.status()})`);
- }
-
- return ((await retryResponse.json()) as { id: number }).id;
- }
-
- if (!response.ok()) {
- throw new Error(`Backfill creation failed (${response.status()})`);
- }
-
- return ((await response.json()) as { id: number }).id;
-}
-
-/**
- * Wait for a table (by testId) to load and show at least one row or an empty
message.
- */
-export async function waitForTableLoad(
- page: Page,
- options?: { checkSkeletons?: boolean; testId?: string; timeout?: number },
-): Promise<void> {
- const testId = options?.testId ?? "table-list";
- const timeout = options?.timeout ?? 30_000;
-
- const table = page.getByTestId(testId);
-
- await expect(table).toBeVisible({ timeout });
-
- // Skip skeleton check by default — XComs uses [data-scope="skeleton"] for
- // lazy-loaded cell content, not table loading, which would cause false
waits.
- if (options?.checkSkeletons) {
- await expect(table.locator('[data-testid="skeleton"],
[data-scope="skeleton"]')).toHaveCount(0, {
- timeout,
- });
- }
-
- const firstRow = table.locator("tbody tr").first();
- const emptyMessage = page.getByText(/no .* found/i);
-
- await expect(firstRow.or(emptyMessage)).toBeVisible({ timeout });
-}
-
-/**
- * Wait for DOM row count to stabilize across consecutive measurements.
- * Uses 500ms intervals to give React concurrent rendering time to settle.
- */
-export async function waitForStableRowCount(
- rowLocator: Locator,
- options?: { timeout?: number },
-): Promise<number> {
- const timeout = options?.timeout ?? 10_000;
- const requiredStableChecks = 2;
- let lastStableCount = 0;
-
- await expect
- .poll(
- async () => {
- const counts: Array<number> = [];
-
- for (let i = 0; i < requiredStableChecks + 1; i++) {
- counts.push(await rowLocator.count());
-
- if (i < requiredStableChecks) {
- await new Promise((resolve) => setTimeout(resolve, 500));
- }
- }
-
- const first = counts[0] ?? 0;
- const allSame = counts.length > 0 && first > 0 && counts.every((c) =>
c === first);
-
- if (allSame) {
- lastStableCount = first;
- }
-
- return allSame;
- },
- { intervals: [500], timeout },
- )
- .toBe(true);
-
- return lastStableCount;
-}
diff --git a/airflow-core/src/airflow/ui/tests/e2e/utils/ui/waits.ts
b/airflow-core/src/airflow/ui/tests/e2e/utils/ui/waits.ts
new file mode 100644
index 00000000000..cc6a788c139
--- /dev/null
+++ b/airflow-core/src/airflow/ui/tests/e2e/utils/ui/waits.ts
@@ -0,0 +1,64 @@
+/*!
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * UI wait helpers for DOM stability and concurrent rendering.
+ */
+import { expect, type Locator } from "@playwright/test";
+
+/**
+ * Wait until the number of elements matched by `rowLocator` stops changing.
+ *
+ * Each poll takes three counts 500ms apart and passes only when all three are
+ * equal and greater than zero; polls repeat every 500ms until `options.timeout`
+ * (default 10s) elapses. A count of zero is never accepted as stable, so the
+ * wait fails if no rows ever appear.
+ *
+ * @returns The row count observed by the poll that passed.
+ */
+export async function waitForStableRowCount(
+  rowLocator: Locator,
+  options?: { timeout?: number },
+): Promise<number> {
+  const samplesPerPoll = 3;
+  const pauseMs = 500;
+  let stableCount = 0;
+
+  const sampleIsStable = async (): Promise<boolean> => {
+    const samples: Array<number> = [await rowLocator.count()];
+
+    // Always take every sample, even after a mismatch, so each poll lasts the same time.
+    while (samples.length < samplesPerPoll) {
+      await new Promise((resolve) => setTimeout(resolve, pauseMs));
+      samples.push(await rowLocator.count());
+    }
+
+    const [reference = 0] = samples;
+    const settled = reference > 0 && samples.every((sample) => sample === reference);
+
+    if (settled) {
+      stableCount = reference;
+    }
+
+    return settled;
+  };
+
+  await expect.poll(sampleIsStable, { intervals: [500], timeout: options?.timeout ?? 10_000 }).toBe(true);
+
+  return stableCount;
+}