Copilot commented on code in PR #54666:
URL: https://github.com/apache/airflow/pull/54666#discussion_r2301304942


##########
airflow-core/src/airflow/jobs/asyncio_monitor.py:
##########
@@ -0,0 +1,244 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import asyncio
+import contextlib
+import dataclasses
+import logging
+import sys
+import threading
+import time
+import traceback
+from collections import deque
+from time import perf_counter
+from types import FrameType
+from typing import TYPE_CHECKING
+
+import structlog
+
+from airflow._shared.timezones import timezone
+from airflow.stats import Stats
+
+if TYPE_CHECKING:
+    from structlog.typing import FilteringBoundLogger
+
+logger = logging.getLogger(__name__)
+
+
+log: FilteringBoundLogger = structlog.get_logger(logger_name=__name__)
+
+
+@dataclasses.dataclass
+class StallSample:
+    """Single stack capture during a stall."""
+
+    taken_at_perf: float
+    stack_text: str
+
+
+@dataclasses.dataclass
+class StallIncident:
+    """A coalesced stall incident with start/end, samples, and suspected 
tasks."""
+
+    started_at_perf: float
+    ended_at_perf: float | None
+    threshold: float
+    samples: list[StallSample]
+    suspected_tasks: list[str]  # filled on correlation
+    # Human-readable timestamps for convenience
+    started_at_utc: str = dataclasses.field(default="")
+    ended_at_utc: str = dataclasses.field(default="")
+
+    def duration(self) -> float | None:
+        return None if self.ended_at_perf is None else (self.ended_at_perf - 
self.started_at_perf)
+
+
+def _utc_now_str() -> str:
+    return timezone.utcnow().isoformat(timespec="seconds")
+
+
+def _format_stack_from_frame(frame: FrameType, max_frames: int) -> str:
+    # Limit frames from the BOTTOM (most recent last) to keep prints short.
+    limit: int | None = max_frames if max_frames > 0 else None
+    return "".join(traceback.format_stack(frame, limit=limit))
+
+
+class AsyncioStallMonitor:
+    """
+    Hybrid stall monitor.
+
+    Detection path:
+      - In-loop heartbeat updates a shared timestamp.
+      - Background thread checks wall time vs. last heartbeat; if exceeds 
threshold,
+        we enter "stall" state, sample loop thread stack, and log.
+      - While stalled, we periodically re-sample (min_report_interval).
+      - When the loop catches up (heartbeat moves), we close the incident and
+        correlate with asyncio tasks inside the loop thread.
+    """
+
+    def __init__(
+        self,
+        loop: asyncio.AbstractEventLoop | None = None,
+        *,
+        threshold: float = 0.2,  # seconds without heartbeat => stall
+        heartbeat_interval: float = 0.1,  # loop posts heartbeat this often
+        min_report_interval: float = 0.5,  # during a long stall, log at most 
this often
+        max_frames: int = 25,  # limit stack frames captured
+        history_size: int = 1,  # TODO: increase and make history more useful 
in the future

Review Comment:
   [nitpick] The TODO comment should specify what changes are planned for 
making history more useful. Consider documenting the intended future 
functionality or removing the TODO if no specific plans exist.
   ```suggestion
           history_size: int = 1,  # TODO: Increase to retain multiple stall 
incidents for later analysis and debugging.
   ```



##########
airflow-core/tests/unit/jobs/test_asyncio_monitor.py:
##########
@@ -0,0 +1,107 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import asyncio
+import time
+
+import pytest
+
+from airflow.jobs.asyncio_monitor import AsyncioStallMonitor, StallIncident
+
+
+async def wait_until(pred, timeout=2.0, interval=0.01):
+    start = time.monotonic()
+    while time.monotonic() - start < timeout:
+        if pred():
+            return True
+        await asyncio.sleep(interval)
+    return False
+
+
+@pytest.mark.asyncio
+async def test_start_sets_ident_and_heartbeat():
+    loop = asyncio.get_running_loop()
+    mon = AsyncioStallMonitor(loop=loop, threshold=0.20, 
heartbeat_interval=0.05)
+    try:
+        mon.start()
+        # let call_soon callbacks run
+        await asyncio.sleep(0.05)
+
+        assert isinstance(mon._loop_thread_ident, int)
+
+        hb1 = mon._hb_perf
+        await asyncio.sleep(0.08)
+        hb2 = mon._hb_perf
+        assert hb2 > hb1
+    finally:
+        mon.stop()
+
+
+@pytest.mark.asyncio
+async def test_detects_stall_and_records_history():
+    loop = asyncio.get_running_loop()
+    mon = AsyncioStallMonitor(
+        loop=loop,
+        threshold=0.08,
+        heartbeat_interval=0.01,
+        min_report_interval=0.02,
+        max_frames=50,
+    )
+
+    async def bad_task():
+        import time as _t
+
+        _t.sleep(0.2)  # noqa: ASYNC251  -- intentional: simulate event-loop 
stall

Review Comment:
   [nitpick] The magic number 0.2 should be extracted to a named constant or 
variable to improve readability and make the test threshold relationship 
clearer.
   ```suggestion
       STALL_DURATION = 0.2  # Duration to simulate event-loop stall
   
       async def bad_task():
           import time as _t
   
           _t.sleep(STALL_DURATION)  # noqa: ASYNC251  -- intentional: simulate 
event-loop stall
   ```



##########
airflow-core/src/airflow/jobs/asyncio_monitor.py:
##########
@@ -0,0 +1,244 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import asyncio
+import contextlib
+import dataclasses
+import logging
+import sys
+import threading
+import time
+import traceback
+from collections import deque
+from time import perf_counter
+from types import FrameType
+from typing import TYPE_CHECKING
+
+import structlog
+
+from airflow._shared.timezones import timezone
+from airflow.stats import Stats
+
+if TYPE_CHECKING:
+    from structlog.typing import FilteringBoundLogger
+
+logger = logging.getLogger(__name__)
+
+
+log: FilteringBoundLogger = structlog.get_logger(logger_name=__name__)
+
+
+@dataclasses.dataclass
+class StallSample:
+    """Single stack capture during a stall."""
+
+    taken_at_perf: float
+    stack_text: str
+
+
+@dataclasses.dataclass
+class StallIncident:
+    """A coalesced stall incident with start/end, samples, and suspected 
tasks."""
+
+    started_at_perf: float
+    ended_at_perf: float | None
+    threshold: float
+    samples: list[StallSample]
+    suspected_tasks: list[str]  # filled on correlation
+    # Human-readable timestamps for convenience
+    started_at_utc: str = dataclasses.field(default="")
+    ended_at_utc: str = dataclasses.field(default="")
+
+    def duration(self) -> float | None:
+        return None if self.ended_at_perf is None else (self.ended_at_perf - 
self.started_at_perf)
+
+
+def _utc_now_str() -> str:
+    return timezone.utcnow().isoformat(timespec="seconds")
+
+
+def _format_stack_from_frame(frame: FrameType, max_frames: int) -> str:
+    # Limit frames from the BOTTOM (most recent last) to keep prints short.
+    limit: int | None = max_frames if max_frames > 0 else None
+    return "".join(traceback.format_stack(frame, limit=limit))
+
+
+class AsyncioStallMonitor:
+    """
+    Hybrid stall monitor.
+
+    Detection path:
+      - In-loop heartbeat updates a shared timestamp.
+      - Background thread checks wall time vs. last heartbeat; if exceeds 
threshold,
+        we enter "stall" state, sample loop thread stack, and log.
+      - While stalled, we periodically re-sample (min_report_interval).
+      - When the loop catches up (heartbeat moves), we close the incident and
+        correlate with asyncio tasks inside the loop thread.
+    """
+
+    def __init__(
+        self,
+        loop: asyncio.AbstractEventLoop | None = None,
+        *,
+        threshold: float = 0.2,  # seconds without heartbeat => stall
+        heartbeat_interval: float = 0.1,  # loop posts heartbeat this often
+        min_report_interval: float = 0.5,  # during a long stall, log at most 
this often
+        max_frames: int = 25,  # limit stack frames captured
+        history_size: int = 1,  # TODO: increase and make history more useful 
in the future
+    ) -> None:
+        self.loop = loop or asyncio.get_event_loop()
+        self.threshold = float(threshold)
+        self.heartbeat_interval = float(heartbeat_interval)
+        self.min_report_interval = float(min_report_interval)
+        self.max_frames = int(max_frames)
+
+        self._hb_perf: float = perf_counter()
+        self._running = False
+        self._thread: threading.Thread | None = None
+        self._loop_thread_ident: int | None = None
+
+        self._incident: StallIncident | None = None
+        self._last_report_perf: float = 0.0
+
+        self._lock = threading.Lock()
+        self.history: deque[StallIncident] = deque(maxlen=history_size)
+
+    def start(self) -> None:
+        if self._running:
+            return
+        self._running = True
+
+        def _capture_thread_ident():
+            self._loop_thread_ident = threading.get_ident()
+
+        self.loop.call_soon(_capture_thread_ident)
+
+        # Kick off the heartbeat loop
+        self.loop.call_soon(self._heartbeat)
+
+        # Start watchdog thread
+        self._thread = threading.Thread(target=self._watchdog, 
name="asyncio-stall-watchdog", daemon=True)
+        self._thread.start()
+        log.info(
+            "AsyncioStallMonitor started: threshold=%.3fs heartbeat=%.3fs",
+            self.threshold,
+            self.heartbeat_interval,
+        )
+
+    def stop(self) -> None:
+        self._running = False
+        log.info("AsyncioStallMonitor stopping...")
+
+    def __enter__(self):
+        self.start()
+        return self
+
+    def __exit__(self, exc_type, exc, tb):
+        self.stop()
+
+    def _heartbeat(self) -> None:
+        if not self._running:
+            return
+        self._hb_perf = perf_counter()
+        # Reschedule next beat
+        self.loop.call_later(self.heartbeat_interval, self._heartbeat)
+
+    def _watchdog(self) -> None:
+        while self._running:
+            time.sleep(self.heartbeat_interval)
+            now = perf_counter()
+            gap = now - self._hb_perf
+
+            if gap > self.threshold:
+                # We are in a stall
+                if self._incident is None:
+                    # Stall START
+                    self._incident = StallIncident(
+                        started_at_perf=now - gap,  # best estimate: when 
heartbeat stopped advancing
+                        ended_at_perf=None,
+                        threshold=self.threshold,
+                        samples=[],
+                        suspected_tasks=[],
+                        started_at_utc=_utc_now_str(),
+                        ended_at_utc="",
+                    )
+                    self._last_report_perf = 0.0
+                    self._sample_and_log(now, phase="start")
+
+                # During stall: periodic updates
+                elif now - self._last_report_perf >= self.min_report_interval:
+                    self._sample_and_log(now, phase="update")
+
+            else:
+                # No stall *or* stall ended: close & correlate if needed
+                if self._incident is not None:
+                    self._incident.ended_at_perf = now
+                    self._incident.ended_at_utc = _utc_now_str()
+                    incident = self._incident
+                    self._incident = None
+                    self._post_incident_correlation(incident)
+
+    def _sample_and_log(self, now_perf: float, *, phase: str) -> None:
+        stack_text = self._capture_loop_stack_bounded()
+        if stack_text is None:
+            stack_text = "<loop thread stack unavailable>"
+
+        sample = StallSample(taken_at_perf=now_perf, stack_text=stack_text)
+        if self._incident:
+            self._incident.samples.append(sample)
+
+        if phase == "start":
+            log.warning("Event loop stall detected (gap≄%.3fs). Captured loop 
stack.", self.threshold)
+        else:
+            log.warning(
+                "Event loop still stalled (%.3fs since start).",
+                now_perf - (self._incident.started_at_perf if self._incident 
else now_perf),
+            )
+        log.warning("%s", stack_text)
+        Stats.incr("triggers.blocked_main_thread")
+
+        self._last_report_perf = now_perf
+
+    def _capture_loop_stack_bounded(self) -> str | None:
+        ident = self._loop_thread_ident
+        if ident is None:
+            return None
+        try:
+            frame = sys._current_frames().get(ident)

Review Comment:
   [nitpick] Using sys._current_frames() can expose sensitive information in 
stack traces. Consider adding a note in the docstring about potential security 
implications when logging stack traces in production environments.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to