This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 246c19f113f k8s tests: wait for push task in executor before killing scheduler (#67067)
246c19f113f is described below

commit 246c19f113f7e5ca9b676567a9cfdde13c25c00e
Author: Jarek Potiuk <[email protected]>
AuthorDate: Sun May 17 21:50:40 2026 +0200

    k8s tests: wait for push task in executor before killing scheduler (#67067)
    
    test_integration_run_dag_with_scheduler_failure is intermittently flaky
    on ARM CI: the scheduler is killed immediately after start_job_in_kubernetes,
    so the new scheduler pod (after `kubectl rollout status` returns successfully)
    sometimes has to handle the very first scheduling step itself before the
    40s monitor_task timeout expires. `push` stays in `queued` for the full 40s
    and the test fails with `assert 'queued' == 'success'`.
    
    Two adjustments:
    
    1. Before killing the scheduler, wait until the `push` task instance has
       reached a `queued`-or-later state. That way the original scheduler has
       already handed the task to the executor and the post-restart scheduler
       only needs to drive the downstream dependency for `puller`, not pick up
       `push` from scratch.
    
    2. Bump the post-restart monitor_task timeout from 40s to 120s. The
       previous "fail fast if failing" budget races with scheduler-loop warm-up
       under load; 120s is still fast for a successful run and gives a clear
       margin for the legitimate cases.
    
    This is a residual flake left after #46502 — the rollout-status wait fixed
    the worst of it, but the race between "pod is running" and "scheduler loop
    is actually scheduling" remained.
    
    Reopens the spirit of #45145.
---
 .../tests/kubernetes_tests/test_base.py            | 26 ++++++++++++++++++++++
 .../tests/kubernetes_tests/test_other_executors.py | 23 ++++++++++++++++---
 2 files changed, 46 insertions(+), 3 deletions(-)

diff --git a/kubernetes-tests/tests/kubernetes_tests/test_base.py b/kubernetes-tests/tests/kubernetes_tests/test_base.py
index bffc6574153..caa82946718 100644
--- a/kubernetes-tests/tests/kubernetes_tests/test_base.py
+++ b/kubernetes-tests/tests/kubernetes_tests/test_base.py
@@ -242,6 +242,32 @@ class BaseK8STest:
             print(f"The expected state is wrong {state} != {expected_final_state} (expected)!")
         assert state == expected_final_state
 
+    def wait_until_task_in_executor(self, host, dag_run_id, dag_id, task_id, timeout=60):
+        """Poll until the task instance has been handed to the executor.
+
+        Once the state is ``queued`` (or any post-queued state), the scheduler
+        has already pushed the task to the executor queue, so a subsequent
+        scheduler crash does not race with the very first scheduling step.
+        """
+        deadline = time.monotonic() + timeout
+        post_queued_states = {"queued", "running", "success", "failed", "upstream_failed", "removed"}
+        get_string = f"http://{host}/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}"
+        state: str | None = None
+        while time.monotonic() < deadline:
+            try:
+                result = self.session.get(get_string)
+                if result.status_code == 200:
+                    state = result.json().get("state")
+                    print(f"[wait_until_task_in_executor] {task_id} state={state}")
+                    if state in post_queued_states:
+                        return state
+            except requests.exceptions.ConnectionError as exc:
+                print(f"[wait_until_task_in_executor] api call failed, retrying. error={exc}")
+            time.sleep(2)
+        raise AssertionError(
+            f"task {task_id} did not reach a post-queued state within {timeout}s (last seen state: {state})"
+        )
+
     @staticmethod
     def ensure_resource_health(
         resource_name: str,
diff --git a/kubernetes-tests/tests/kubernetes_tests/test_other_executors.py b/kubernetes-tests/tests/kubernetes_tests/test_other_executors.py
index f2efcce5bde..a13e56437f7 100644
--- a/kubernetes-tests/tests/kubernetes_tests/test_other_executors.py
+++ b/kubernetes-tests/tests/kubernetes_tests/test_other_executors.py
@@ -54,6 +54,18 @@ class TestCeleryAndLocalExecutor(BaseK8STest):
 
         dag_run_id, logical_date = self.start_job_in_kubernetes(dag_id, self.host)
 
+        # Make sure the first task has already been handed to the executor before
+        # we kill the scheduler. Otherwise the scheduler-kill races with the very
+        # first scheduling step, and the post-restart scheduler has to re-pick the
+        # task itself — making the post-restart monitor timeouts unreliable.
+        self.wait_until_task_in_executor(
+            host=self.host,
+            dag_run_id=dag_run_id,
+            dag_id=dag_id,
+            task_id="push",
+            timeout=60,
+        )
+
         self._delete_airflow_pod("scheduler")
 
         # Wait for the scheduler to be recreated
@@ -65,14 +77,19 @@ class TestCeleryAndLocalExecutor(BaseK8STest):
             raise ValueError(f"Unknown executor {EXECUTOR}")
         self.ensure_resource_health("airflow-scheduler", resource_type=scheduler_resource_type)
 
-        # Wait some time for the operator to complete
+        # `push` is already in the executor at this point, but `kubectl rollout
+        # status` returns when the new scheduler pod is *running*, not when the
+        # scheduler loop has resumed processing. Give the worker / new scheduler
+        # enough headroom to drive push → success and then schedule the
+        # downstream puller. 40s used to be the "fail fast" budget — in
+        # practice that races with scheduler-loop warm-up.
         self.monitor_task(
             host=self.host,
             dag_run_id=dag_run_id,
             dag_id=dag_id,
             task_id="push",
             expected_final_state="success",
-            timeout=40,  # This should fail fast if failing
+            timeout=120,
         )
 
         self.monitor_task(
@@ -81,7 +98,7 @@ class TestCeleryAndLocalExecutor(BaseK8STest):
             dag_id=dag_id,
             task_id="puller",
             expected_final_state="success",
-            timeout=40,
+            timeout=120,
         )
 
         self.ensure_dag_expected_state(

Reply via email to