ashb commented on code in PR #54666: URL: https://github.com/apache/airflow/pull/54666#discussion_r2301995676
########## airflow-core/src/airflow/jobs/asyncio_monitor.py: ########## @@ -0,0 +1,244 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import asyncio +import contextlib +import dataclasses +import logging +import sys +import threading +import time +import traceback +from collections import deque +from time import perf_counter +from types import FrameType +from typing import TYPE_CHECKING + +import structlog + +from airflow._shared.timezones import timezone +from airflow.stats import Stats + +if TYPE_CHECKING: + from structlog.typing import FilteringBoundLogger + +logger = logging.getLogger(__name__) + + +log: FilteringBoundLogger = structlog.get_logger(logger_name=__name__) + + +@dataclasses.dataclass +class StallSample: + """Single stack capture during a stall.""" + + taken_at_perf: float + stack_text: str + + +@dataclasses.dataclass +class StallIncident: + """A coalesced stall incident with start/end, samples, and suspected tasks.""" + + started_at_perf: float + ended_at_perf: float | None + threshold: float + samples: list[StallSample] + suspected_tasks: list[str] # filled on correlation + # Human-readable timestamps for convenience + started_at_utc: str = dataclasses.field(default="") +
ended_at_utc: str = dataclasses.field(default="") + + def duration(self) -> float | None: + return None if self.ended_at_perf is None else (self.ended_at_perf - self.started_at_perf) + + +def _utc_now_str() -> str: + return timezone.utcnow().isoformat(timespec="seconds") + + +def _format_stack_from_frame(frame: FrameType, max_frames: int) -> str: + # Limit frames from the BOTTOM (most recent last) to keep prints short. + limit: int | None = max_frames if max_frames > 0 else None + return "".join(traceback.format_stack(frame, limit=limit)) + + +class AsyncioStallMonitor: + """ + Hybrid stall monitor. + + Detection path: + - In-loop heartbeat updates a shared timestamp. + - Background thread checks wall time vs. last heartbeat; if exceeds threshold, + we enter "stall" state, sample loop thread stack, and log. + - While stalled, we periodically re-sample (min_report_interval). + - When the loop catches up (heartbeat moves), we close the incident and + correlate with asyncio tasks inside the loop thread. + """ + + def __init__( + self, + loop: asyncio.AbstractEventLoop | None = None, + *, + threshold: float = 0.2, # seconds without heartbeat => stall + heartbeat_interval: float = 0.1, # loop posts heartbeat this often + min_report_interval: float = 0.5, # during a long stall, log at most this often + max_frames: int = 25, # limit stack frames captured + history_size: int = 1, # TODO: increase and make history more useful in the future + ) -> None: + self.loop = loop or asyncio.get_event_loop() Review Comment: On Python 3.12, `get_event_loop()` only emits a (deprecation) warning if there isn't a running loop, whereas `get_running_loop()` raises an error outright, so the latter is better here: ```suggestion self.loop = loop or asyncio.get_running_loop() ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
