This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 51a1b20f375 Fix ASYNC110 violation in TriggerRunner (#61760)
51a1b20f375 is described below

commit 51a1b20f375b966b9c23bf7efe8a1b7db267222f
Author: Eason09053360 <[email protected]>
AuthorDate: Mon Feb 16 06:26:39 2026 +0800

    Fix ASYNC110 violation in TriggerRunner (#61760)
---
 airflow-core/src/airflow/jobs/triggerer_job_runner.py | 11 +++++++++--
 1 file changed, 9 insertions(+), 2 deletions(-)

diff --git a/airflow-core/src/airflow/jobs/triggerer_job_runner.py b/airflow-core/src/airflow/jobs/triggerer_job_runner.py
index 57660c6820e..c5ac4e41f6e 100644
--- a/airflow-core/src/airflow/jobs/triggerer_job_runner.py
+++ b/airflow-core/src/airflow/jobs/triggerer_job_runner.py
@@ -32,6 +32,7 @@ from socket import socket
 from traceback import format_exception
 from typing import TYPE_CHECKING, Annotated, Any, ClassVar, Literal, TypedDict
 
+import anyio
 import attrs
 import structlog
 from pydantic import BaseModel, Field, TypeAdapter
@@ -862,6 +863,7 @@ class TriggerRunner:
 
     # Should-we-stop flag
     stop: bool = False
+    _stop_event: anyio.Event | None = None
 
     # TODO: connect this to the parent process
     log: FilteringBoundLogger = structlog.get_logger()
@@ -877,10 +879,13 @@ class TriggerRunner:
         self.events = deque()
         self.failed_triggers = deque()
         self.job_id = None
+        self._stop_event = None
 
     def _handle_signal(self, signum, frame) -> None:
         """Handle termination signals gracefully."""
         self.stop = True
+        if self._stop_event is not None:
+            self._stop_event.set()
 
     def run(self):
         """Sync entrypoint - just run arun in an async loop."""
@@ -898,6 +903,7 @@ class TriggerRunner:
         await self.init_comms()
 
         watchdog = asyncio.create_task(self.block_watchdog())
+        stop_event = self._stop_event = anyio.Event()
 
         last_status = time.monotonic()
         try:
@@ -913,8 +919,9 @@ class TriggerRunner:
                 await self.sync_state_to_supervisor(finished_ids)
                 await self.create_triggers()
                 await self.cancel_triggers()
-                # Sleep for a bit
-                await asyncio.sleep(1)
+                # Sleep for a bit, or exit early if stop is requested.
+                with anyio.move_on_after(1):
+                    await stop_event.wait()
                 # Every minute, log status
                 if (now := time.monotonic()) - last_status >= 60:
                     watchers = len([trigger for trigger in self.triggers.values() if trigger["is_watcher"]])

Reply via email to