ashb opened a new pull request, #66412:
URL: https://github.com/apache/airflow/pull/66412

   TriggerCommsDecoder used an asyncio.Lock to serialise concurrent requests, 
then read the response while holding the lock.  When a trigger called a sync 
SDK method via `async_to_sync()`, that helper spun up a second event loop in a 
worker thread.  The two event loops then raced to read from the same 
asyncio.StreamReader, producing "Response read out of order!" and crashing the 
TriggerRunner subprocess.
   
   Replace the lock-based serial approach with response multiplexing:
   
   * Each asend() registers an asyncio.Future keyed by frame.id in a _pending 
dict.  A single _reader_loop background task reads frames one at a time and 
dispatches each response to the right waiter. No lock, no ordering assumption.
   
   * asyncio.StreamWriter.write() is synchronous: it buffers data into the
transport without yielding. Every path through asend() runs on the main event
loop (worker threads submit via run_coroutine_threadsafe(), and calls from
foreign event loops are redirected the same way), and the event loop is
single-threaded, so two write() calls can never interleave; one completes
before the next coroutine gets any CPU time. No lock is needed and there is
no atomicity concern.
   
   * sync send() from worker threads schedules asend() on the main event loop
via run_coroutine_threadsafe() and blocks the calling thread until the response
arrives. This eliminates competing event loops entirely.
   
   * sync send() from the event loop thread itself (e.g. a trigger calling a
sync SDK method directly from async def run() via greenback) would deadlock
with run_coroutine_threadsafe(...).result(), because .result() blocks the very
thread the loop runs on, so the scheduled coroutine could never execute. This
case is detected by comparing thread IDs, and greenback.await_() is used
instead to teleport the coroutine back into the running loop.
   
   * init_comms() reads the initial StartTriggerer frame before starting the 
reader loop so the loop never races with initialisation.
   
   * arun() cancels the reader task on shutdown and checks _reader_task.done() 
each heartbeat so a lost supervisor connection surfaces immediately.
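The multiplexing scheme in the list above can be sketched in a few dozen lines. This is a minimal, hypothetical illustration, not the PR's TriggerCommsDecoder: the class name MultiplexedComms, the newline-delimited JSON framing, and the fake supervisor that answers requests in reverse order are all assumptions made so the example runs over a local socket pair. It shows one reader task dispatching responses by id, asend() registering a Future before writing, and sync send() bridging a worker thread onto the main loop with run_coroutine_threadsafe(). The event-loop-thread case is only detected here; the PR handles it with greenback.await_(), which this sketch does not attempt.

```python
import asyncio
import itertools
import json
import socket
import threading


class MultiplexedComms:
    """Hypothetical sketch; wire format assumed: one JSON {"id", "body"} per line."""

    def __init__(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
        self._reader = reader
        self._writer = writer
        self._pending = {}  # frame id -> asyncio.Future awaiting that response
        self._ids = itertools.count(1)
        # Must be constructed on the main event loop's thread.
        self._loop = asyncio.get_running_loop()
        self._loop_thread_id = threading.get_ident()
        self._reader_task = None

    def start(self) -> None:
        # Call after any handshake frame is read so the reader never races init.
        self._reader_task = self._loop.create_task(self._reader_loop())

    async def _reader_loop(self) -> None:
        # The only coroutine that ever reads from the stream.
        try:
            while line := await self._reader.readline():
                frame = json.loads(line)
                fut = self._pending.get(frame["id"])
                if fut is not None and not fut.done():
                    fut.set_result(frame["body"])
        finally:
            # EOF or cancellation: fail outstanding waiters instead of hanging them.
            for fut in self._pending.values():
                if not fut.done():
                    fut.set_exception(ConnectionError("supervisor connection lost"))

    async def asend(self, body):
        if self._reader_task is None or self._reader_task.done():
            raise ConnectionError("reader task is not running")
        frame_id = next(self._ids)
        fut = self._loop.create_future()
        self._pending[frame_id] = fut  # register before writing
        try:
            # write() only buffers and never yields, so frames cannot interleave.
            self._writer.write(json.dumps({"id": frame_id, "body": body}).encode() + b"\n")
            await self._writer.drain()
            return await fut
        finally:
            self._pending.pop(frame_id, None)

    def send(self, body):
        """Blocking variant for worker threads."""
        if threading.get_ident() == self._loop_thread_id:
            # .result() here would deadlock the loop; the PR uses greenback.await_().
            raise RuntimeError("send() called on the event loop thread")
        return asyncio.run_coroutine_threadsafe(self.asend(body), self._loop).result()

    async def aclose(self) -> None:
        if self._reader_task is not None:
            self._reader_task.cancel()
            try:
                await self._reader_task
            except asyncio.CancelledError:
                pass
        self._writer.close()
        await self._writer.wait_closed()


async def fake_supervisor(reader, writer) -> None:
    # Read two requests, then answer them in reverse order.
    requests = [json.loads(await reader.readline()) for _ in range(2)]
    for req in reversed(requests):
        reply = {"id": req["id"], "body": req["body"].upper()}
        writer.write(json.dumps(reply).encode() + b"\n")
    await writer.drain()


async def main() -> list:
    sock_a, sock_b = socket.socketpair()
    reader_a, writer_a = await asyncio.open_connection(sock=sock_a)
    reader_b, writer_b = await asyncio.open_connection(sock=sock_b)
    comms = MultiplexedComms(reader_a, writer_a)
    comms.start()
    supervisor = asyncio.create_task(fake_supervisor(reader_b, writer_b))
    # One request from a coroutine, one from a worker thread via blocking send().
    results = await asyncio.gather(
        comms.asend("first"), asyncio.to_thread(comms.send, "second")
    )
    await supervisor
    await comms.aclose()
    writer_b.close()
    await writer_b.wait_closed()
    return results


print(asyncio.run(main()))  # ['FIRST', 'SECOND']
```

Because responses are matched by id rather than by arrival order, the reversed replies still resolve the right waiters, and the worker thread never touches the stream itself.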
   
   Add a subprocess event-loop watchdog: TriggerRunnerSupervisor stamps
_last_runner_comms on every message received from the subprocess. If the
subprocess goes silent for longer than the new
`[triggerer] runner_health_check_threshold` (default 30 s), heartbeat() skips
the DB update so the scheduler sees the triggerer as unhealthy and reassigns
its triggers. This detects a deadlocked or hung event loop that the
process-alive check alone cannot catch.
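The watchdog logic reduces to a timestamp comparison on the supervisor side. The sketch below is hypothetical: RunnerHealthWatchdog, on_runner_message(), the update_db callback, and FakeClock are illustrative names, and the threshold argument merely stands in for the real `[triggerer] runner_health_check_threshold` option. This sketch uses a monotonic clock so wall-clock adjustments cannot fake or hide a stall.

```python
import time
from typing import Callable


class RunnerHealthWatchdog:
    """Hypothetical sketch of the supervisor-side staleness check."""

    def __init__(self, threshold: float = 30.0, clock: Callable[[], float] = time.monotonic):
        # threshold stands in for [triggerer] runner_health_check_threshold.
        self.threshold = threshold
        self._clock = clock
        self._last_runner_comms = clock()

    def on_runner_message(self) -> None:
        # Call for every message received from the runner subprocess.
        self._last_runner_comms = self._clock()

    def runner_is_healthy(self) -> bool:
        return self._clock() - self._last_runner_comms <= self.threshold

    def heartbeat(self, update_db: Callable[[], None]) -> bool:
        # Skipping the DB update lets the stored heartbeat go stale, so the
        # scheduler eventually treats this triggerer as unhealthy.
        if not self.runner_is_healthy():
            return False
        update_db()
        return True


class FakeClock:
    """Manually advanced clock so the example is deterministic."""

    def __init__(self) -> None:
        self.now = 0.0

    def __call__(self) -> float:
        return self.now


clock = FakeClock()
watchdog = RunnerHealthWatchdog(threshold=30.0, clock=clock)
clock.now = 45.0  # 45 s of silence since the last message at t=0
print(watchdog.heartbeat(lambda: None))  # False: DB heartbeat skipped
watchdog.on_runner_message()
print(watchdog.heartbeat(lambda: None))  # True
```

The process-alive check would pass throughout this scenario; only the message timestamp reveals that the runner's event loop has stopped talking.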
   
   Fixes #63913, #63760, #65286 and #66358; supersedes and closes PRs #64869
and #65622.
   
   


-- 
This is an automated message from the Apache Git Service.