This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 40da90b8151 Fix flaky OTel integration tests by bounding scheduler shutdown wait (#67455)
40da90b8151 is described below

commit 40da90b8151dc3efa81b102474bdc46b0e2568b1
Author: Shubham Raj <[email protected]>
AuthorDate: Tue May 26 00:30:10 2026 +0530

    Fix flaky OTel integration tests by bounding scheduler shutdown wait (#67455)
    
    Replace unbounded subprocess.wait() calls with a 30-second grace
    period followed by SIGKILL, and raise execution_timeout from 90s to
    160s on all three OTel integration test methods.
    
    The 90s limit did not account for the fixed 10s startup sleep, 10s
    post-run sleep, and OTel atexit metric flush (up to 10s via
    force_flush), leaving no budget for shutdown on slow CI runs. The
    new 160s ceiling covers the 140s worst-case path (10s startup + 90s
    DAG-run wait + 10s post-run sleep + 30s shutdown grace) with a 20s
    buffer.
---
 airflow-core/tests/integration/otel/test_otel.py | 34 +++++++++++++-----------
 1 file changed, 19 insertions(+), 15 deletions(-)

diff --git a/airflow-core/tests/integration/otel/test_otel.py b/airflow-core/tests/integration/otel/test_otel.py
index 05c7ea5638e..a6af896b437 100644
--- a/airflow-core/tests/integration/otel/test_otel.py
+++ b/airflow-core/tests/integration/otel/test_otel.py
@@ -357,17 +357,13 @@ class TestOtelIntegration:
         finally:
             # Terminate the processes.
 
-            scheduler_process.terminate()
-            scheduler_process.wait()
-
+            self._terminate_process(scheduler_process)
             scheduler_status = scheduler_process.poll()
             assert scheduler_status is not None, (
                 "The scheduler_1 process status is None, which means that it 
hasn't terminated as expected."
             )
 
-            apiserver_process.terminate()
-            apiserver_process.wait()
-
+            self._terminate_process(apiserver_process)
             apiserver_status = apiserver_process.poll()
             assert apiserver_status is not None, (
                 "The apiserver process status is None, which means that it 
hasn't terminated as expected."
@@ -387,7 +383,8 @@ class TestOtelIntegration:
             )
         return ti
 
-    @pytest.mark.execution_timeout(90)
+    # 160s = 10s startup + 90s dag-run wait + 10s post-run sleep + 30s 
shutdown grace + 20s CI buffer
+    @pytest.mark.execution_timeout(160)
     @pytest.mark.parametrize(
         ("legacy_names_on_bool", "legacy_names_exported"),
         [
@@ -423,7 +420,7 @@ class TestOtelIntegration:
             if legacy_names_exported:
                 assert set(legacy_metric_names).issubset(metrics_dict.keys())
 
-    @pytest.mark.execution_timeout(90)
+    @pytest.mark.execution_timeout(160)
     def test_export_metrics_during_process_shutdown(self, capfd):
         out, dag = self.dag_execution_for_testing_metrics(capfd)
 
@@ -442,7 +439,7 @@ class TestOtelIntegration:
 
             assert set(metrics_to_check).issubset(metrics_dict.keys())
 
-    @pytest.mark.execution_timeout(90)
+    @pytest.mark.execution_timeout(160)
     @pytest.mark.parametrize(
         ("task_span_detail_level", "expected_hierarchy"),
         [
@@ -520,17 +517,13 @@ class TestOtelIntegration:
                     dump_airflow_metadata_db(session)
 
             # Terminate the processes.
-            scheduler_process.terminate()
-            scheduler_process.wait()
-
+            self._terminate_process(scheduler_process)
             scheduler_status = scheduler_process.poll()
             assert scheduler_status is not None, (
                 "The scheduler_1 process status is None, which means that it 
hasn't terminated as expected."
             )
 
-            apiserver_process.terminate()
-            apiserver_process.wait()
-
+            self._terminate_process(apiserver_process)
             apiserver_status = apiserver_process.poll()
             assert apiserver_status is not None, (
                 "The apiserver process status is None, which means that it 
hasn't terminated as expected."
@@ -572,6 +565,17 @@ class TestOtelIntegration:
         nested = get_span_hierarchy()
         assert nested == expected_hierarchy
 
+    @staticmethod
+    def _terminate_process(proc: subprocess.Popen, timeout: int = 30) -> None:
+        """Stop *proc*: terminate(), wait up to *timeout* s, then kill() and reap it if still alive."""
+        # Grace covers the OTel atexit flush (force_flush default: 10s); N sequential calls can take ~N * timeout.
+        proc.terminate()
+        try:
+            proc.wait(timeout=timeout)
+        except subprocess.TimeoutExpired:
+            proc.kill()
+            proc.wait()
+
     def start_scheduler(self, capture_output: bool = False):
         stdout = None if capture_output else subprocess.DEVNULL
         stderr = None if capture_output else subprocess.DEVNULL

Reply via email to