This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 246c19f113f k8s tests: wait for push task in executor before killing
scheduler (#67067)
246c19f113f is described below
commit 246c19f113f7e5ca9b676567a9cfdde13c25c00e
Author: Jarek Potiuk <[email protected]>
AuthorDate: Sun May 17 21:50:40 2026 +0200
k8s tests: wait for push task in executor before killing scheduler (#67067)
`test_integration_run_dag_with_scheduler_failure` is intermittently flaky
on ARM CI. The scheduler is killed immediately after
`start_job_in_kubernetes`, so the replacement scheduler pod (even after
`kubectl rollout status` returns successfully) sometimes has to handle the
very first scheduling step itself, and does not get through it before the
40s `monitor_task` timeout expires. `push` then stays in `queued` for the
full 40s and the test fails with `assert 'queued' == 'success'`.
Two adjustments:
1. Before killing the scheduler, wait until the `push` task instance has
reached `queued` or a later state. By then the original scheduler has
already handed the task to the executor, so the post-restart scheduler
only needs to schedule the downstream `puller` task instead of picking up
`push` from scratch.
2. Bump the post-restart `monitor_task` timeouts (for both `push` and
`puller`) from 40s to 120s. The previous "fail fast if failing" budget
races with scheduler-loop warm-up under load; 120s is still quick for a
successful run and leaves a clear margin for slow but healthy runs.
This is a residual flake left over after #46502: the rollout-status wait
fixed the worst of it, but the race between "the pod is running" and "the
scheduler loop is actually scheduling" remained.
Revisits the intent of #45145.
---
.../tests/kubernetes_tests/test_base.py | 26 ++++++++++++++++++++++
.../tests/kubernetes_tests/test_other_executors.py | 23 ++++++++++++++++---
2 files changed, 46 insertions(+), 3 deletions(-)
diff --git a/kubernetes-tests/tests/kubernetes_tests/test_base.py
b/kubernetes-tests/tests/kubernetes_tests/test_base.py
index bffc6574153..caa82946718 100644
--- a/kubernetes-tests/tests/kubernetes_tests/test_base.py
+++ b/kubernetes-tests/tests/kubernetes_tests/test_base.py
@@ -242,6 +242,32 @@ class BaseK8STest:
print(f"The expected state is wrong {state} !=
{expected_final_state} (expected)!")
assert state == expected_final_state
+ def wait_until_task_in_executor(self, host, dag_run_id, dag_id, task_id,
timeout=60):
+ """Poll until the task instance has been handed to the executor.
+
+ Once the state is ``queued`` (or any post-queued state), the scheduler
+ has already pushed the task to the executor queue, so a subsequent
+ scheduler crash does not race with the very first scheduling step.
+ """
+ deadline = time.monotonic() + timeout
+ post_queued_states = {"queued", "running", "success", "failed",
"upstream_failed", "removed"}
+ get_string =
f"http://{host}/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}"
+ state: str | None = None
+ while time.monotonic() < deadline:
+ try:
+ result = self.session.get(get_string)
+ if result.status_code == 200:
+ state = result.json().get("state")
+ print(f"[wait_until_task_in_executor] {task_id}
state={state}")
+ if state in post_queued_states:
+ return state
+ except requests.exceptions.ConnectionError as exc:
+ print(f"[wait_until_task_in_executor] api call failed,
retrying. error={exc}")
+ time.sleep(2)
+ raise AssertionError(
+ f"task {task_id} did not reach a post-queued state within
{timeout}s (last seen state: {state})"
+ )
+
@staticmethod
def ensure_resource_health(
resource_name: str,
diff --git a/kubernetes-tests/tests/kubernetes_tests/test_other_executors.py
b/kubernetes-tests/tests/kubernetes_tests/test_other_executors.py
index f2efcce5bde..a13e56437f7 100644
--- a/kubernetes-tests/tests/kubernetes_tests/test_other_executors.py
+++ b/kubernetes-tests/tests/kubernetes_tests/test_other_executors.py
@@ -54,6 +54,18 @@ class TestCeleryAndLocalExecutor(BaseK8STest):
dag_run_id, logical_date = self.start_job_in_kubernetes(dag_id,
self.host)
+ # Make sure the first task has already been handed to the executor
before
+ # we kill the scheduler. Otherwise the scheduler-kill races with the
very
+ # first scheduling step, and the post-restart scheduler has to re-pick
the
+ # task itself — making the post-restart monitor timeouts unreliable.
+ self.wait_until_task_in_executor(
+ host=self.host,
+ dag_run_id=dag_run_id,
+ dag_id=dag_id,
+ task_id="push",
+ timeout=60,
+ )
+
self._delete_airflow_pod("scheduler")
# Wait for the scheduler to be recreated
@@ -65,14 +77,19 @@ class TestCeleryAndLocalExecutor(BaseK8STest):
raise ValueError(f"Unknown executor {EXECUTOR}")
self.ensure_resource_health("airflow-scheduler",
resource_type=scheduler_resource_type)
- # Wait some time for the operator to complete
+ # `push` is already in the executor at this point, but `kubectl rollout
+ # status` returns when the new scheduler pod is *running*, not when the
+ # scheduler loop has resumed processing. Give the worker / new
scheduler
+ # enough headroom to drive push → success and then schedule the
+ # downstream puller. 40s used to be the "fail fast" budget — in
+ # practice that races with scheduler-loop warm-up.
self.monitor_task(
host=self.host,
dag_run_id=dag_run_id,
dag_id=dag_id,
task_id="push",
expected_final_state="success",
- timeout=40, # This should fail fast if failing
+ timeout=120,
)
self.monitor_task(
@@ -81,7 +98,7 @@ class TestCeleryAndLocalExecutor(BaseK8STest):
dag_id=dag_id,
task_id="puller",
expected_final_state="success",
- timeout=40,
+ timeout=120,
)
self.ensure_dag_expected_state(