This is an automated email from the ASF dual-hosted git repository.
choo121600 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 91a994f4716 E2E: prevent Dag parsing races and improve Dag run cleanup (#64540)
91a994f4716 is described below
commit 91a994f47169b6b8adec7df1ec3b28c0b26bb2c2
Author: Yeonguk Choo <[email protected]>
AuthorDate: Thu Apr 2 18:20:02 2026 +0900
E2E: prevent Dag parsing races and improve Dag run cleanup (#64540)
---
airflow-core/src/airflow/ui/playwright.config.ts | 2 +-
.../src/airflow/ui/tests/e2e/global-setup.ts | 22 ++++++-
.../src/airflow/ui/tests/e2e/global-teardown.ts | 4 +-
.../src/airflow/ui/tests/e2e/utils/test-helpers.ts | 73 +++++++++++++---------
4 files changed, 66 insertions(+), 35 deletions(-)
diff --git a/airflow-core/src/airflow/ui/playwright.config.ts b/airflow-core/src/airflow/ui/playwright.config.ts
index 84e5e0d937b..a7632fdf54f 100644
--- a/airflow-core/src/airflow/ui/playwright.config.ts
+++ b/airflow-core/src/airflow/ui/playwright.config.ts
@@ -50,7 +50,7 @@ export const AUTH_FILE = path.join(currentDirname,
"playwright/.auth/user.json")
export default defineConfig({
expect: {
- timeout: 5000,
+ timeout: 10_000,
},
forbidOnly: process.env.CI !== undefined && process.env.CI !== "",
fullyParallel: true,
diff --git a/airflow-core/src/airflow/ui/tests/e2e/global-setup.ts b/airflow-core/src/airflow/ui/tests/e2e/global-setup.ts
index 6157ddd52f2..b48163003e3 100644
--- a/airflow-core/src/airflow/ui/tests/e2e/global-setup.ts
+++ b/airflow-core/src/airflow/ui/tests/e2e/global-setup.ts
@@ -16,12 +16,13 @@
* specific language governing permissions and limitations
* under the License.
*/
-import { chromium, firefox, webkit, type FullConfig } from "@playwright/test";
+import { chromium, firefox, request, webkit, type FullConfig } from
"@playwright/test";
import fs from "node:fs";
import path from "node:path";
import { AUTH_FILE, testConfig } from "../../playwright.config";
import { LoginPage } from "./pages/LoginPage";
+import { waitForDagReady } from "./utils/test-helpers";
const browsers = { chromium, firefox, webkit };
@@ -53,6 +54,25 @@ async function globalSetup(config: FullConfig) {
} finally {
await browser.close();
}
+
+ // Pre-warm: wait for all Dags used by E2E tests to be parsed before workers start.
+ const apiContext = await request.newContext({
+ baseURL,
+ storageState: AUTH_FILE,
+ });
+
+ try {
+ await Promise.all(
+ [
+ testConfig.testDag.id,
+ testConfig.testDag.hitlId,
+ testConfig.xcomDag.id,
+ "example_python_operator",
+ ].map((dagId) => waitForDagReady(apiContext, dagId, { timeout: 300_000
})),
+ );
+ } finally {
+ await apiContext.dispose();
+ }
}
export default globalSetup;
diff --git a/airflow-core/src/airflow/ui/tests/e2e/global-teardown.ts b/airflow-core/src/airflow/ui/tests/e2e/global-teardown.ts
index aae6cc588aa..2c0c2d787ab 100644
--- a/airflow-core/src/airflow/ui/tests/e2e/global-teardown.ts
+++ b/airflow-core/src/airflow/ui/tests/e2e/global-teardown.ts
@@ -48,8 +48,8 @@ async function globalTeardown() {
timeout: 10_000,
});
} catch (error) {
- console.warn(
- `[e2e teardown] Failed to re-pause DAG ${dagId}: ${error instanceof
Error ? error.message : String(error)}`,
+ console.debug(
+ `[e2e teardown] Could not re-pause DAG ${dagId}: ${error instanceof
Error ? error.message : String(error)}`,
);
}
}
diff --git a/airflow-core/src/airflow/ui/tests/e2e/utils/test-helpers.ts b/airflow-core/src/airflow/ui/tests/e2e/utils/test-helpers.ts
index 3f651289567..62c5535b8e0 100644
--- a/airflow-core/src/airflow/ui/tests/e2e/utils/test-helpers.ts
+++ b/airflow-core/src/airflow/ui/tests/e2e/utils/test-helpers.ts
@@ -69,7 +69,7 @@ export async function waitForDagReady(
.poll(
async () => {
try {
- const response = await
request.get(`${baseUrl}/api/v2/dags/${dagId}`, { timeout: 30_000 });
+ const response = await
request.get(`${baseUrl}/api/v2/dags/${dagId}`, { timeout: 10_000 });
return response.ok();
} catch {
@@ -107,7 +107,7 @@ export async function apiTriggerDagRun(
note: "e2e test",
},
headers: { "Content-Type": "application/json" },
- timeout: 30_000,
+ timeout: 10_000,
});
if (!response.ok()) {
@@ -152,7 +152,7 @@ export async function apiCreateDagRun(source: RequestLike,
dagId: string, data:
note: data.note ?? "e2e test",
},
headers: { "Content-Type": "application/json" },
- timeout: 30_000,
+ timeout: 10_000,
});
if (!response.ok()) {
@@ -187,7 +187,7 @@ export async function apiSetDagRunState(
const response = await
request.patch(`${baseUrl}/api/v2/dags/${dagId}/dagRuns/${runId}`, {
data: { state },
headers: { "Content-Type": "application/json" },
- timeout: 30_000,
+ timeout: 10_000,
});
if (response.status() !== 409 && !response.ok()) {
@@ -213,7 +213,7 @@ export async function waitForDagRunStatus(
async () => {
try {
const response = await
request.get(`${baseUrl}/api/v2/dags/${dagId}/dagRuns/${runId}`, {
- timeout: 30_000,
+ timeout: 10_000,
});
if (!response.ok()) {
@@ -272,7 +272,7 @@ export async function waitForTaskInstanceState(
try {
const response = await request.get(
`${baseUrl}/api/v2/dags/${dagId}/dagRuns/${runId}/taskInstances/${taskId}`,
- { timeout: 30_000 },
+ { timeout: 10_000 },
);
if (!response.ok()) {
@@ -331,7 +331,7 @@ export async function apiRespondToHITL(
{
data: { chosen_options: chosenOptions, params_input: paramsInput },
headers: { "Content-Type": "application/json" },
- timeout: 30_000,
+ timeout: 10_000,
},
);
@@ -478,7 +478,7 @@ export async function apiDeleteDagRun(source: RequestLike,
dagId: string, runId:
const request = getRequestContext(source);
const response = await
request.delete(`${baseUrl}/api/v2/dags/${dagId}/dagRuns/${runId}`, {
- timeout: 30_000,
+ timeout: 10_000,
});
// 404 = already deleted by another worker or cleanup; acceptable.
@@ -497,29 +497,40 @@ export async function apiDeleteDagRun(source:
RequestLike, dagId: string, runId:
* Delete a DAG run, logging (not throwing) unexpected errors.
* Use this in fixture teardown where cleanup must not abort the loop.
* 404 is already handled inside `apiDeleteDagRun`.
- * 409 (running state) is handled by force-failing the run first, then retrying.
+ *
+ * Strategy: force-fail the run first so the server doesn't wait for
+ * running tasks during deletion, then delete with one retry on timeout.
*/
export async function safeCleanupDagRun(source: RequestLike, dagId: string,
runId: string): Promise<void> {
+ const request = getRequestContext(source);
+
try {
- await apiDeleteDagRun(source, dagId, runId);
- } catch (error) {
- const message = error instanceof Error ? error.message : String(error);
-
- // 409 = DAG run is still running — force-fail, then retry deletion.
- if (message.includes("409")) {
- try {
- await apiSetDagRunState(source, { dagId, runId, state: "failed" });
- await apiDeleteDagRun(source, dagId, runId);
- } catch (retryError) {
- console.warn(
- `[e2e cleanup] Retry failed for DAG run ${dagId}/${runId}:
${retryError instanceof Error ? retryError.message : String(retryError)}`,
- );
+ await request.patch(`${baseUrl}/api/v2/dags/${dagId}/dagRuns/${runId}`, {
+ data: { state: "failed" },
+ headers: { "Content-Type": "application/json" },
+ timeout: 10_000,
+ });
+ } catch {
+ // Run may already be terminal or deleted — ignore.
+ }
+
+ for (let attempt = 0; attempt < 2; attempt++) {
+ try {
+ await apiDeleteDagRun(source, dagId, runId);
+
+ return;
+ } catch (error) {
+ const message = error instanceof Error ? error.message : String(error);
+ const isTimeout = message.includes("Timeout");
+
+ if (isTimeout && attempt === 0) {
+ continue;
}
+ console.warn(`[e2e cleanup] Failed to delete DAG run ${dagId}/${runId}:
${message}`);
+
return;
}
-
- console.warn(`[e2e cleanup] Failed to delete DAG run ${dagId}/${runId}:
${message}`);
}
}
@@ -549,7 +560,7 @@ export async function apiDeleteVariable(source:
RequestLike, key: string): Promi
const request = getRequestContext(source);
const response = await
request.delete(`${baseUrl}/api/v2/variables/${encodeURIComponent(key)}`, {
- timeout: 30_000,
+ timeout: 10_000,
});
// 404 = already deleted by another worker or cleanup; acceptable.
@@ -569,7 +580,7 @@ export async function apiCancelBackfill(source:
RequestLike, backfillId: number)
const request = getRequestContext(source);
const response = await
request.put(`${baseUrl}/api/v2/backfills/${backfillId}/cancel`, {
- timeout: 30_000,
+ timeout: 10_000,
});
if (response.status() !== 200 && response.status() !== 409) {
@@ -582,7 +593,7 @@ export async function apiCancelAllActiveBackfills(source:
RequestLike, dagId: st
const request = getRequestContext(source);
const response = await
request.get(`${baseUrl}/api/v2/backfills?dag_id=${dagId}&limit=100`, {
- timeout: 30_000,
+ timeout: 10_000,
});
if (!response.ok()) {
@@ -611,7 +622,7 @@ export async function apiWaitForNoActiveBackfill(
async () => {
try {
const response = await
request.get(`${baseUrl}/api/v2/backfills?dag_id=${dagId}&limit=100`, {
- timeout: 30_000,
+ timeout: 10_000,
});
if (!response.ok()) {
@@ -649,7 +660,7 @@ export async function apiWaitForBackfillComplete(
async () => {
try {
const response = await
request.get(`${baseUrl}/api/v2/backfills/${backfillId}`, {
- timeout: 30_000,
+ timeout: 10_000,
});
if (!response.ok()) {
@@ -700,7 +711,7 @@ export async function apiCreateBackfill(
const response = await request.post(`${baseUrl}/api/v2/backfills`, {
data: body,
headers: { "Content-Type": "application/json" },
- timeout: 30_000,
+ timeout: 10_000,
});
if (response.status() === 409) {
@@ -710,7 +721,7 @@ export async function apiCreateBackfill(
const retryResponse = await request.post(`${baseUrl}/api/v2/backfills`, {
data: body,
headers: { "Content-Type": "application/json" },
- timeout: 30_000,
+ timeout: 10_000,
});
if (!retryResponse.ok()) {