This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 40da90b8151 Fix flaky OTel integration tests by bounding scheduler shutdown wait (#67455)
40da90b8151 is described below
commit 40da90b8151dc3efa81b102474bdc46b0e2568b1
Author: Shubham Raj <[email protected]>
AuthorDate: Tue May 26 00:30:10 2026 +0530
Fix flaky OTel integration tests by bounding scheduler shutdown wait (#67455)
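Replace the unbounded subprocess.wait() calls used to stop the scheduler
and API server processes with a bounded wait: after terminate(), each
process gets a 30-second grace period and is sent SIGKILL if it is still
alive. Also raise execution_timeout from 90s to 160s on all three OTel
integration test methods.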
The 90s limit did not account for the fixed 10s startup sleep, 10s
post-run sleep, and OTel atexit metric flush (up to 10s via
force_flush), leaving no budget for shutdown on slow CI runs. The
new 160s ceiling covers the 140s worst-case path with a 20s buffer.
---
airflow-core/tests/integration/otel/test_otel.py | 34 +++++++++++++-----------
1 file changed, 19 insertions(+), 15 deletions(-)
diff --git a/airflow-core/tests/integration/otel/test_otel.py
b/airflow-core/tests/integration/otel/test_otel.py
index 05c7ea5638e..a6af896b437 100644
--- a/airflow-core/tests/integration/otel/test_otel.py
+++ b/airflow-core/tests/integration/otel/test_otel.py
@@ -357,17 +357,13 @@ class TestOtelIntegration:
finally:
# Terminate the processes.
- scheduler_process.terminate()
- scheduler_process.wait()
-
+ self._terminate_process(scheduler_process)
scheduler_status = scheduler_process.poll()
assert scheduler_status is not None, (
"The scheduler_1 process status is None, which means that it
hasn't terminated as expected."
)
- apiserver_process.terminate()
- apiserver_process.wait()
-
+ self._terminate_process(apiserver_process)
apiserver_status = apiserver_process.poll()
assert apiserver_status is not None, (
"The apiserver process status is None, which means that it
hasn't terminated as expected."
@@ -387,7 +383,8 @@ class TestOtelIntegration:
)
return ti
- @pytest.mark.execution_timeout(90)
+ # 160s = 10s startup + 90s dag-run wait + 10s post-run sleep + 30s
shutdown grace + 20s CI buffer
+ @pytest.mark.execution_timeout(160)
@pytest.mark.parametrize(
("legacy_names_on_bool", "legacy_names_exported"),
[
@@ -423,7 +420,7 @@ class TestOtelIntegration:
if legacy_names_exported:
assert set(legacy_metric_names).issubset(metrics_dict.keys())
- @pytest.mark.execution_timeout(90)
+ @pytest.mark.execution_timeout(160)
def test_export_metrics_during_process_shutdown(self, capfd):
out, dag = self.dag_execution_for_testing_metrics(capfd)
@@ -442,7 +439,7 @@ class TestOtelIntegration:
assert set(metrics_to_check).issubset(metrics_dict.keys())
- @pytest.mark.execution_timeout(90)
+ @pytest.mark.execution_timeout(160)
@pytest.mark.parametrize(
("task_span_detail_level", "expected_hierarchy"),
[
@@ -520,17 +517,13 @@ class TestOtelIntegration:
dump_airflow_metadata_db(session)
# Terminate the processes.
- scheduler_process.terminate()
- scheduler_process.wait()
-
+ self._terminate_process(scheduler_process)
scheduler_status = scheduler_process.poll()
assert scheduler_status is not None, (
"The scheduler_1 process status is None, which means that it
hasn't terminated as expected."
)
- apiserver_process.terminate()
- apiserver_process.wait()
-
+ self._terminate_process(apiserver_process)
apiserver_status = apiserver_process.poll()
assert apiserver_status is not None, (
"The apiserver process status is None, which means that it
hasn't terminated as expected."
@@ -572,6 +565,17 @@ class TestOtelIntegration:
nested = get_span_hierarchy()
assert nested == expected_hierarchy
+ @staticmethod
+ def _terminate_process(proc: subprocess.Popen, timeout: int = 30) -> None:
+ # Grace period covers OTel atexit flush (force_flush default: 10s);
+ # SIGKILL is the fallback if the process is still alive after timeout.
+ proc.terminate()
+ try:
+ proc.wait(timeout=timeout)
+ except subprocess.TimeoutExpired:
+ proc.kill()
+ proc.wait()
+
def start_scheduler(self, capture_output: bool = False):
stdout = None if capture_output else subprocess.DEVNULL
stderr = None if capture_output else subprocess.DEVNULL