This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 51a1b20f375 Fix ASYNC110 violation in TriggerRunner (#61760)
51a1b20f375 is described below
commit 51a1b20f375b966b9c23bf7efe8a1b7db267222f
Author: Eason09053360 <[email protected]>
AuthorDate: Mon Feb 16 06:26:39 2026 +0800
Fix ASYNC110 violation in TriggerRunner (#61760)
---
airflow-core/src/airflow/jobs/triggerer_job_runner.py | 11 +++++++++--
1 file changed, 9 insertions(+), 2 deletions(-)
diff --git a/airflow-core/src/airflow/jobs/triggerer_job_runner.py b/airflow-core/src/airflow/jobs/triggerer_job_runner.py
index 57660c6820e..c5ac4e41f6e 100644
--- a/airflow-core/src/airflow/jobs/triggerer_job_runner.py
+++ b/airflow-core/src/airflow/jobs/triggerer_job_runner.py
@@ -32,6 +32,7 @@ from socket import socket
from traceback import format_exception
from typing import TYPE_CHECKING, Annotated, Any, ClassVar, Literal, TypedDict
+import anyio
import attrs
import structlog
from pydantic import BaseModel, Field, TypeAdapter
@@ -862,6 +863,7 @@ class TriggerRunner:
# Should-we-stop flag
stop: bool = False
+ _stop_event: anyio.Event | None = None
# TODO: connect this to the parent process
log: FilteringBoundLogger = structlog.get_logger()
@@ -877,10 +879,13 @@ class TriggerRunner:
self.events = deque()
self.failed_triggers = deque()
self.job_id = None
+ self._stop_event = None
def _handle_signal(self, signum, frame) -> None:
"""Handle termination signals gracefully."""
self.stop = True
+ if self._stop_event is not None:
+ self._stop_event.set()
def run(self):
"""Sync entrypoint - just run arun in an async loop."""
@@ -898,6 +903,7 @@ class TriggerRunner:
await self.init_comms()
watchdog = asyncio.create_task(self.block_watchdog())
+ stop_event = self._stop_event = anyio.Event()
last_status = time.monotonic()
try:
@@ -913,8 +919,9 @@ class TriggerRunner:
await self.sync_state_to_supervisor(finished_ids)
await self.create_triggers()
await self.cancel_triggers()
- # Sleep for a bit
- await asyncio.sleep(1)
+ # Sleep for a bit, or exit early if stop is requested.
+ with anyio.move_on_after(1):
+ await stop_event.wait()
# Every minute, log status
if (now := time.monotonic()) - last_status >= 60:
watchers = len([trigger for trigger in
self.triggers.values() if trigger["is_watcher"]])