Copilot commented on code in PR #54666: URL: https://github.com/apache/airflow/pull/54666#discussion_r2301304942
########## airflow-core/src/airflow/jobs/asyncio_monitor.py: ########## @@ -0,0 +1,244 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import asyncio +import contextlib +import dataclasses +import logging +import sys +import threading +import time +import traceback +from collections import deque +from time import perf_counter +from types import FrameType +from typing import TYPE_CHECKING + +import structlog + +from airflow._shared.timezones import timezone +from airflow.stats import Stats + +if TYPE_CHECKING: + from structlog.typing import FilteringBoundLogger + +logger = logging.getLogger(__name__) + + +log: FilteringBoundLogger = structlog.get_logger(logger_name=__name__) + + +@dataclasses.dataclass +class StallSample: + """Single stack capture during a stall.""" + + taken_at_perf: float + stack_text: str + + +@dataclasses.dataclass +class StallIncident: + """A coalesced stall incident with start/end, samples, and suspected tasks.""" + + started_at_perf: float + ended_at_perf: float | None + threshold: float + samples: list[StallSample] + suspected_tasks: list[str] # filled on correlation + # Human-readable timestamps for convenience + started_at_utc: str = dataclasses.field(default="") +
ended_at_utc: str = dataclasses.field(default="") + + def duration(self) -> float | None: + return None if self.ended_at_perf is None else (self.ended_at_perf - self.started_at_perf) + + +def _utc_now_str() -> str: + return timezone.utcnow().isoformat(timespec="seconds") + + +def _format_stack_from_frame(frame: FrameType, max_frames: int) -> str: + # Limit frames from the BOTTOM (most recent last) to keep prints short. + limit: int | None = max_frames if max_frames > 0 else None + return "".join(traceback.format_stack(frame, limit=limit)) + + +class AsyncioStallMonitor: + """ + Hybrid stall monitor. + + Detection path: + - In-loop heartbeat updates a shared timestamp. + - Background thread checks wall time vs. last heartbeat; if exceeds threshold, + we enter "stall" state, sample loop thread stack, and log. + - While stalled, we periodically re-sample (min_report_interval). + - When the loop catches up (heartbeat moves), we close the incident and + correlate with asyncio tasks inside the loop thread. + """ + + def __init__( + self, + loop: asyncio.AbstractEventLoop | None = None, + *, + threshold: float = 0.2, # seconds without heartbeat => stall + heartbeat_interval: float = 0.1, # loop posts heartbeat this often + min_report_interval: float = 0.5, # during a long stall, log at most this often + max_frames: int = 25, # limit stack frames captured + history_size: int = 1, # TODO: increase and make history more useful in the future Review Comment: [nitpick] The TODO comment should specify what changes are planned for making history more useful. Consider documenting the intended future functionality or removing the TODO if no specific plans exist. ```suggestion history_size: int = 1, # TODO: Increase to retain multiple stall incidents for later analysis and debugging. ``` ########## airflow-core/tests/unit/jobs/test_asyncio_monitor.py: ########## @@ -0,0 +1,107 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. 
See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import asyncio +import time + +import pytest + +from airflow.jobs.asyncio_monitor import AsyncioStallMonitor, StallIncident + + +async def wait_until(pred, timeout=2.0, interval=0.01): + start = time.monotonic() + while time.monotonic() - start < timeout: + if pred(): + return True + await asyncio.sleep(interval) + return False + + +@pytest.mark.asyncio +async def test_start_sets_ident_and_heartbeat(): + loop = asyncio.get_running_loop() + mon = AsyncioStallMonitor(loop=loop, threshold=0.20, heartbeat_interval=0.05) + try: + mon.start() + # let call_soon callbacks run + await asyncio.sleep(0.05) + + assert isinstance(mon._loop_thread_ident, int) + + hb1 = mon._hb_perf + await asyncio.sleep(0.08) + hb2 = mon._hb_perf + assert hb2 > hb1 + finally: + mon.stop() + + +@pytest.mark.asyncio +async def test_detects_stall_and_records_history(): + loop = asyncio.get_running_loop() + mon = AsyncioStallMonitor( + loop=loop, + threshold=0.08, + heartbeat_interval=0.01, + min_report_interval=0.02, + max_frames=50, + ) + + async def bad_task(): + import time as _t + + _t.sleep(0.2) # noqa: ASYNC251 -- intentional: simulate event-loop stall Review Comment: [nitpick] The magic number 0.2 should be extracted to a named constant or variable to improve readability and make the
test threshold relationship clearer. ```suggestion STALL_DURATION = 0.2 # Duration to simulate event-loop stall async def bad_task(): import time as _t _t.sleep(STALL_DURATION) # noqa: ASYNC251 -- intentional: simulate event-loop stall ``` ########## airflow-core/src/airflow/jobs/asyncio_monitor.py: ########## @@ -0,0 +1,244 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+from __future__ import annotations + +import asyncio +import contextlib +import dataclasses +import logging +import sys +import threading +import time +import traceback +from collections import deque +from time import perf_counter +from types import FrameType +from typing import TYPE_CHECKING + +import structlog + +from airflow._shared.timezones import timezone +from airflow.stats import Stats + +if TYPE_CHECKING: + from structlog.typing import FilteringBoundLogger + +logger = logging.getLogger(__name__) + + +log: FilteringBoundLogger = structlog.get_logger(logger_name=__name__) + + +@dataclasses.dataclass +class StallSample: + """Single stack capture during a stall.""" + + taken_at_perf: float + stack_text: str + + +@dataclasses.dataclass +class StallIncident: + """A coalesced stall incident with start/end, samples, and suspected tasks.""" + + started_at_perf: float + ended_at_perf: float | None + threshold: float + samples: list[StallSample] + suspected_tasks: list[str] # filled on correlation + # Human-readable timestamps for convenience + started_at_utc: str = dataclasses.field(default="") + ended_at_utc: str = dataclasses.field(default="") + + def duration(self) -> float | None: + return None if self.ended_at_perf is None else (self.ended_at_perf - self.started_at_perf) + + +def _utc_now_str() -> str: + return timezone.utcnow().isoformat(timespec="seconds") + + +def _format_stack_from_frame(frame: FrameType, max_frames: int) -> str: + # Limit frames from the BOTTOM (most recent last) to keep prints short. + limit: int | None = max_frames if max_frames > 0 else None + return "".join(traceback.format_stack(frame, limit=limit)) + + +class AsyncioStallMonitor: + """ + Hybrid stall monitor. + + Detection path: + - In-loop heartbeat updates a shared timestamp. + - Background thread checks wall time vs. last heartbeat; if exceeds threshold, + we enter "stall" state, sample loop thread stack, and log. + - While stalled, we periodically re-sample (min_report_interval).
+ - When the loop catches up (heartbeat moves), we close the incident and + correlate with asyncio tasks inside the loop thread. + """ + + def __init__( + self, + loop: asyncio.AbstractEventLoop | None = None, + *, + threshold: float = 0.2, # seconds without heartbeat => stall + heartbeat_interval: float = 0.1, # loop posts heartbeat this often + min_report_interval: float = 0.5, # during a long stall, log at most this often + max_frames: int = 25, # limit stack frames captured + history_size: int = 1, # TODO: increase and make history more useful in the future + ) -> None: + self.loop = loop or asyncio.get_event_loop() + self.threshold = float(threshold) + self.heartbeat_interval = float(heartbeat_interval) + self.min_report_interval = float(min_report_interval) + self.max_frames = int(max_frames) + + self._hb_perf: float = perf_counter() + self._running = False + self._thread: threading.Thread | None = None + self._loop_thread_ident: int | None = None + + self._incident: StallIncident | None = None + self._last_report_perf: float = 0.0 + + self._lock = threading.Lock() + self.history: deque[StallIncident] = deque(maxlen=history_size) + + def start(self) -> None: + if self._running: + return + self._running = True + + def _capture_thread_ident(): + self._loop_thread_ident = threading.get_ident() + + self.loop.call_soon(_capture_thread_ident) + + # Kick off the heartbeat loop + self.loop.call_soon(self._heartbeat) + + # Start watchdog thread + self._thread = threading.Thread(target=self._watchdog, name="asyncio-stall-watchdog", daemon=True) + self._thread.start() + log.info( + "AsyncioStallMonitor started: threshold=%.3fs heartbeat=%.3fs", + self.threshold, + self.heartbeat_interval, + ) + + def stop(self) -> None: + self._running = False + log.info("AsyncioStallMonitor stopping...") + + def __enter__(self): + self.start() + return self + + def __exit__(self, exc_type, exc, tb): + self.stop() + + def _heartbeat(self) -> None: + if not self._running: + return + 
self._hb_perf = perf_counter() + # Reschedule next beat + self.loop.call_later(self.heartbeat_interval, self._heartbeat) + + def _watchdog(self) -> None: + while self._running: + time.sleep(self.heartbeat_interval) + now = perf_counter() + gap = now - self._hb_perf + + if gap > self.threshold: + # We are in a stall + if self._incident is None: + # Stall START + self._incident = StallIncident( + started_at_perf=now - gap, # best estimate: when heartbeat stopped advancing + ended_at_perf=None, + threshold=self.threshold, + samples=[], + suspected_tasks=[], + started_at_utc=_utc_now_str(), + ended_at_utc="", + ) + self._last_report_perf = 0.0 + self._sample_and_log(now, phase="start") + + # During stall: periodic updates + elif now - self._last_report_perf >= self.min_report_interval: + self._sample_and_log(now, phase="update") + + else: + # No stall *or* stall ended: close & correlate if needed + if self._incident is not None: + self._incident.ended_at_perf = now + self._incident.ended_at_utc = _utc_now_str() + incident = self._incident + self._incident = None + self._post_incident_correlation(incident) + + def _sample_and_log(self, now_perf: float, *, phase: str) -> None: + stack_text = self._capture_loop_stack_bounded() + if stack_text is None: + stack_text = "<loop thread stack unavailable>" + + sample = StallSample(taken_at_perf=now_perf, stack_text=stack_text) + if self._incident: + self._incident.samples.append(sample) + + if phase == "start": + log.warning("Event loop stall detected (gap≥%.3fs).
Captured loop stack.", self.threshold) + else: + log.warning( + "Event loop still stalled (%.3fs since start).", + now_perf - (self._incident.started_at_perf if self._incident else now_perf), + ) + log.warning("%s", stack_text) + Stats.incr("triggers.blocked_main_thread") + + self._last_report_perf = now_perf + + def _capture_loop_stack_bounded(self) -> str | None: + ident = self._loop_thread_ident + if ident is None: + return None + try: + frame = sys._current_frames().get(ident) Review Comment: [nitpick] Using sys._current_frames() can expose sensitive information in stack traces. Consider adding a note in the docstring about potential security implications when logging stack traces in production environments. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
