This is an automated email from the ASF dual-hosted git repository.

choo121600 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 91a994f4716 E2E: prevent Dag parsing races and improve Dag run cleanup (#64540)
91a994f4716 is described below

commit 91a994f47169b6b8adec7df1ec3b28c0b26bb2c2
Author: Yeonguk Choo <[email protected]>
AuthorDate: Thu Apr 2 18:20:02 2026 +0900

    E2E: prevent Dag parsing races and improve Dag run cleanup (#64540)
---
 airflow-core/src/airflow/ui/playwright.config.ts   |  2 +-
 .../src/airflow/ui/tests/e2e/global-setup.ts       | 22 ++++++-
 .../src/airflow/ui/tests/e2e/global-teardown.ts    |  4 +-
 .../src/airflow/ui/tests/e2e/utils/test-helpers.ts | 73 +++++++++++++---------
 4 files changed, 66 insertions(+), 35 deletions(-)

diff --git a/airflow-core/src/airflow/ui/playwright.config.ts 
b/airflow-core/src/airflow/ui/playwright.config.ts
index 84e5e0d937b..a7632fdf54f 100644
--- a/airflow-core/src/airflow/ui/playwright.config.ts
+++ b/airflow-core/src/airflow/ui/playwright.config.ts
@@ -50,7 +50,7 @@ export const AUTH_FILE = path.join(currentDirname, 
"playwright/.auth/user.json")
 
 export default defineConfig({
   expect: {
-    timeout: 5000,
+    timeout: 10_000,
   },
   forbidOnly: process.env.CI !== undefined && process.env.CI !== "",
   fullyParallel: true,
diff --git a/airflow-core/src/airflow/ui/tests/e2e/global-setup.ts 
b/airflow-core/src/airflow/ui/tests/e2e/global-setup.ts
index 6157ddd52f2..b48163003e3 100644
--- a/airflow-core/src/airflow/ui/tests/e2e/global-setup.ts
+++ b/airflow-core/src/airflow/ui/tests/e2e/global-setup.ts
@@ -16,12 +16,13 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-import { chromium, firefox, webkit, type FullConfig } from "@playwright/test";
+import { chromium, firefox, request, webkit, type FullConfig } from 
"@playwright/test";
 import fs from "node:fs";
 import path from "node:path";
 
 import { AUTH_FILE, testConfig } from "../../playwright.config";
 import { LoginPage } from "./pages/LoginPage";
+import { waitForDagReady } from "./utils/test-helpers";
 
 const browsers = { chromium, firefox, webkit };
 
@@ -53,6 +54,25 @@ async function globalSetup(config: FullConfig) {
   } finally {
     await browser.close();
   }
+
+  // Pre-warm: wait for all Dags used by E2E tests to be parsed before workers 
start.
+  const apiContext = await request.newContext({
+    baseURL,
+    storageState: AUTH_FILE,
+  });
+
+  try {
+    await Promise.all(
+      [
+        testConfig.testDag.id,
+        testConfig.testDag.hitlId,
+        testConfig.xcomDag.id,
+        "example_python_operator",
+      ].map((dagId) => waitForDagReady(apiContext, dagId, { timeout: 300_000 
})),
+    );
+  } finally {
+    await apiContext.dispose();
+  }
 }
 
 export default globalSetup;
diff --git a/airflow-core/src/airflow/ui/tests/e2e/global-teardown.ts 
b/airflow-core/src/airflow/ui/tests/e2e/global-teardown.ts
index aae6cc588aa..2c0c2d787ab 100644
--- a/airflow-core/src/airflow/ui/tests/e2e/global-teardown.ts
+++ b/airflow-core/src/airflow/ui/tests/e2e/global-teardown.ts
@@ -48,8 +48,8 @@ async function globalTeardown() {
         timeout: 10_000,
       });
     } catch (error) {
-      console.warn(
-        `[e2e teardown] Failed to re-pause DAG ${dagId}: ${error instanceof 
Error ? error.message : String(error)}`,
+      console.debug(
+        `[e2e teardown] Could not re-pause DAG ${dagId}: ${error instanceof 
Error ? error.message : String(error)}`,
       );
     }
   }
diff --git a/airflow-core/src/airflow/ui/tests/e2e/utils/test-helpers.ts 
b/airflow-core/src/airflow/ui/tests/e2e/utils/test-helpers.ts
index 3f651289567..62c5535b8e0 100644
--- a/airflow-core/src/airflow/ui/tests/e2e/utils/test-helpers.ts
+++ b/airflow-core/src/airflow/ui/tests/e2e/utils/test-helpers.ts
@@ -69,7 +69,7 @@ export async function waitForDagReady(
     .poll(
       async () => {
         try {
-          const response = await 
request.get(`${baseUrl}/api/v2/dags/${dagId}`, { timeout: 30_000 });
+          const response = await 
request.get(`${baseUrl}/api/v2/dags/${dagId}`, { timeout: 10_000 });
 
           return response.ok();
         } catch {
@@ -107,7 +107,7 @@ export async function apiTriggerDagRun(
         note: "e2e test",
       },
       headers: { "Content-Type": "application/json" },
-      timeout: 30_000,
+      timeout: 10_000,
     });
 
     if (!response.ok()) {
@@ -152,7 +152,7 @@ export async function apiCreateDagRun(source: RequestLike, 
dagId: string, data:
         note: data.note ?? "e2e test",
       },
       headers: { "Content-Type": "application/json" },
-      timeout: 30_000,
+      timeout: 10_000,
     });
 
     if (!response.ok()) {
@@ -187,7 +187,7 @@ export async function apiSetDagRunState(
     const response = await 
request.patch(`${baseUrl}/api/v2/dags/${dagId}/dagRuns/${runId}`, {
       data: { state },
       headers: { "Content-Type": "application/json" },
-      timeout: 30_000,
+      timeout: 10_000,
     });
 
     if (response.status() !== 409 && !response.ok()) {
@@ -213,7 +213,7 @@ export async function waitForDagRunStatus(
       async () => {
         try {
           const response = await 
request.get(`${baseUrl}/api/v2/dags/${dagId}/dagRuns/${runId}`, {
-            timeout: 30_000,
+            timeout: 10_000,
           });
 
           if (!response.ok()) {
@@ -272,7 +272,7 @@ export async function waitForTaskInstanceState(
         try {
           const response = await request.get(
             
`${baseUrl}/api/v2/dags/${dagId}/dagRuns/${runId}/taskInstances/${taskId}`,
-            { timeout: 30_000 },
+            { timeout: 10_000 },
           );
 
           if (!response.ok()) {
@@ -331,7 +331,7 @@ export async function apiRespondToHITL(
       {
         data: { chosen_options: chosenOptions, params_input: paramsInput },
         headers: { "Content-Type": "application/json" },
-        timeout: 30_000,
+        timeout: 10_000,
       },
     );
 
@@ -478,7 +478,7 @@ export async function apiDeleteDagRun(source: RequestLike, 
dagId: string, runId:
   const request = getRequestContext(source);
 
   const response = await 
request.delete(`${baseUrl}/api/v2/dags/${dagId}/dagRuns/${runId}`, {
-    timeout: 30_000,
+    timeout: 10_000,
   });
 
   // 404 = already deleted by another worker or cleanup; acceptable.
@@ -497,29 +497,40 @@ export async function apiDeleteDagRun(source: 
RequestLike, dagId: string, runId:
  * Delete a DAG run, logging (not throwing) unexpected errors.
  * Use this in fixture teardown where cleanup must not abort the loop.
  * 404 is already handled inside `apiDeleteDagRun`.
- * 409 (running state) is handled by force-failing the run first, then 
retrying.
+ *
+ * Strategy: force-fail the run first so the server doesn't wait for
+ * running tasks during deletion, then delete with one retry on timeout.
  */
 export async function safeCleanupDagRun(source: RequestLike, dagId: string, 
runId: string): Promise<void> {
+  const request = getRequestContext(source);
+
   try {
-    await apiDeleteDagRun(source, dagId, runId);
-  } catch (error) {
-    const message = error instanceof Error ? error.message : String(error);
-
-    // 409 = DAG run is still running — force-fail, then retry deletion.
-    if (message.includes("409")) {
-      try {
-        await apiSetDagRunState(source, { dagId, runId, state: "failed" });
-        await apiDeleteDagRun(source, dagId, runId);
-      } catch (retryError) {
-        console.warn(
-          `[e2e cleanup] Retry failed for DAG run ${dagId}/${runId}: 
${retryError instanceof Error ? retryError.message : String(retryError)}`,
-        );
+    await request.patch(`${baseUrl}/api/v2/dags/${dagId}/dagRuns/${runId}`, {
+      data: { state: "failed" },
+      headers: { "Content-Type": "application/json" },
+      timeout: 10_000,
+    });
+  } catch {
+    // Run may already be terminal or deleted — ignore.
+  }
+
+  for (let attempt = 0; attempt < 2; attempt++) {
+    try {
+      await apiDeleteDagRun(source, dagId, runId);
+
+      return;
+    } catch (error) {
+      const message = error instanceof Error ? error.message : String(error);
+      const isTimeout = message.includes("Timeout");
+
+      if (isTimeout && attempt === 0) {
+        continue;
       }
 
+      console.warn(`[e2e cleanup] Failed to delete DAG run ${dagId}/${runId}: 
${message}`);
+
       return;
     }
-
-    console.warn(`[e2e cleanup] Failed to delete DAG run ${dagId}/${runId}: 
${message}`);
   }
 }
 
@@ -549,7 +560,7 @@ export async function apiDeleteVariable(source: 
RequestLike, key: string): Promi
   const request = getRequestContext(source);
 
   const response = await 
request.delete(`${baseUrl}/api/v2/variables/${encodeURIComponent(key)}`, {
-    timeout: 30_000,
+    timeout: 10_000,
   });
 
   // 404 = already deleted by another worker or cleanup; acceptable.
@@ -569,7 +580,7 @@ export async function apiCancelBackfill(source: 
RequestLike, backfillId: number)
   const request = getRequestContext(source);
 
   const response = await 
request.put(`${baseUrl}/api/v2/backfills/${backfillId}/cancel`, {
-    timeout: 30_000,
+    timeout: 10_000,
   });
 
   if (response.status() !== 200 && response.status() !== 409) {
@@ -582,7 +593,7 @@ export async function apiCancelAllActiveBackfills(source: 
RequestLike, dagId: st
   const request = getRequestContext(source);
 
   const response = await 
request.get(`${baseUrl}/api/v2/backfills?dag_id=${dagId}&limit=100`, {
-    timeout: 30_000,
+    timeout: 10_000,
   });
 
   if (!response.ok()) {
@@ -611,7 +622,7 @@ export async function apiWaitForNoActiveBackfill(
       async () => {
         try {
           const response = await 
request.get(`${baseUrl}/api/v2/backfills?dag_id=${dagId}&limit=100`, {
-            timeout: 30_000,
+            timeout: 10_000,
           });
 
           if (!response.ok()) {
@@ -649,7 +660,7 @@ export async function apiWaitForBackfillComplete(
       async () => {
         try {
           const response = await 
request.get(`${baseUrl}/api/v2/backfills/${backfillId}`, {
-            timeout: 30_000,
+            timeout: 10_000,
           });
 
           if (!response.ok()) {
@@ -700,7 +711,7 @@ export async function apiCreateBackfill(
   const response = await request.post(`${baseUrl}/api/v2/backfills`, {
     data: body,
     headers: { "Content-Type": "application/json" },
-    timeout: 30_000,
+    timeout: 10_000,
   });
 
   if (response.status() === 409) {
@@ -710,7 +721,7 @@ export async function apiCreateBackfill(
     const retryResponse = await request.post(`${baseUrl}/api/v2/backfills`, {
       data: body,
       headers: { "Content-Type": "application/json" },
-      timeout: 30_000,
+      timeout: 10_000,
     });
 
     if (!retryResponse.ok()) {

Reply via email to