ashb opened a new pull request, #66412: URL: https://github.com/apache/airflow/pull/66412

TriggerCommsDecoder used an asyncio.Lock to serialise concurrent requests, then read the response while holding the lock. When a trigger called a sync SDK method via `async_to_sync()`, that helper spun up a second event loop in a worker thread. The two event loops then raced to read from the same asyncio.StreamReader, producing "Response read out of order!" and crashing the TriggerRunner subprocess.

Replace the lock-based serial approach with response multiplexing:

* Each asend() registers an asyncio.Future keyed by frame.id in a _pending dict. A single _reader_loop background task reads frames one at a time and dispatches each response to the right waiter. No lock, no ordering assumption.
* asyncio.StreamWriter.write() is synchronous: it buffers data into the transport without yielding. Since every path through asend() runs on the main event loop (threads submit via run_coroutine_threadsafe, foreign loops redirect via the same), and the event loop is single-threaded, two write() calls can never interleave: one completes before the next coroutine gets any CPU time. No lock needed, no atomicity concern.
* sync send() from worker threads schedules asend() on the main event loop via run_coroutine_threadsafe() and blocks the calling thread. This eliminates competing event loops entirely.
* sync send() from the event loop thread itself (e.g. a trigger calling a sync SDK method directly from async def run() via greenback) would deadlock with run_coroutine_threadsafe(...).result() because .result() blocks the thread the loop runs on. Detected via thread-ID comparison; greenback.await_() is used instead to teleport the coroutine back into the running loop.
* init_comms() reads the initial StartTriggerer frame before starting the reader loop so the loop never races with initialisation.
* arun() cancels the reader task on shutdown and checks _reader_task.done() each heartbeat so a lost supervisor connection surfaces immediately.
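
The multiplexing scheme above can be illustrated with a minimal, self-contained sketch. This is not the code from the PR: `MultiplexedComms`, `fake_supervisor`, the `(id, body)` frame shape, and the use of `asyncio.Queue` in place of the real StreamReader/StreamWriter are all assumptions made for illustration. The greenback branch is deliberately left out, so calling `send()` on the loop thread simply raises here.

```python
import asyncio
import contextlib
import itertools
import threading
from typing import Dict, List, Optional, Tuple

Frame = Tuple[int, str]  # (frame id, body); a simplified, hypothetical frame shape


class MultiplexedComms:
    """Illustrative only: one reader task dispatches responses to many waiters."""

    def __init__(self, incoming: "asyncio.Queue[Frame]", outgoing: "asyncio.Queue[Frame]") -> None:
        self._incoming = incoming  # stands in for the StreamReader
        self._outgoing = outgoing  # stands in for the StreamWriter
        self._pending: Dict[int, "asyncio.Future[str]"] = {}
        self._ids = itertools.count(1)
        self._loop: Optional[asyncio.AbstractEventLoop] = None
        self._loop_thread_id: Optional[int] = None
        self._reader_task: Optional["asyncio.Task[None]"] = None

    def start(self) -> None:
        # Must be called from a coroutine running on the main event loop.
        self._loop = asyncio.get_running_loop()
        self._loop_thread_id = threading.get_ident()
        self._reader_task = self._loop.create_task(self._reader_loop())

    async def _reader_loop(self) -> None:
        # The only place that reads responses, so arrival order does not matter.
        while True:
            frame_id, body = await self._incoming.get()
            waiter = self._pending.pop(frame_id, None)
            if waiter is not None and not waiter.done():
                waiter.set_result(body)

    async def asend(self, body: str) -> str:
        frame_id = next(self._ids)
        waiter = asyncio.get_running_loop().create_future()
        self._pending[frame_id] = waiter
        # put_nowait() does not yield, mirroring the synchronous StreamWriter.write().
        self._outgoing.put_nowait((frame_id, body))
        try:
            return await waiter
        finally:
            self._pending.pop(frame_id, None)

    def send(self, body: str) -> str:
        # Blocking variant for worker threads: run asend() on the main loop.
        if threading.get_ident() == self._loop_thread_id:
            # The PR handles this case with greenback.await_(); the sketch omits it.
            raise RuntimeError("send() would deadlock on the event loop thread")
        assert self._loop is not None, "start() must be called first"
        return asyncio.run_coroutine_threadsafe(self.asend(body), self._loop).result()

    async def aclose(self) -> None:
        if self._reader_task is not None:
            self._reader_task.cancel()
            with contextlib.suppress(asyncio.CancelledError):
                await self._reader_task


async def fake_supervisor(requests, responses, batch: int) -> None:
    # Collect a batch, then answer in reverse order to force out-of-order replies.
    frames = [await requests.get() for _ in range(batch)]
    for frame_id, body in reversed(frames):
        responses.put_nowait((frame_id, f"echo:{body}"))


async def demo() -> List[str]:
    requests: "asyncio.Queue[Frame]" = asyncio.Queue()
    responses: "asyncio.Queue[Frame]" = asyncio.Queue()
    comms = MultiplexedComms(incoming=responses, outgoing=requests)
    comms.start()
    supervisor = asyncio.ensure_future(fake_supervisor(requests, responses, batch=4))
    loop = asyncio.get_running_loop()
    try:
        results = await asyncio.gather(
            comms.asend("a"),
            comms.asend("b"),
            comms.asend("c"),
            loop.run_in_executor(None, comms.send, "from-thread"),
        )
        return list(results)
    finally:
        await supervisor
        await comms.aclose()


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Although the fake supervisor replies in reverse order, each caller receives the response matching its own frame id, including the caller that went through the blocking `send()` from a worker thread. Because only the reader task ever consumes responses, no lock is required in this sketch.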

Add a subprocess event-loop watchdog: TriggerRunnerSupervisor stamps _last_runner_comms on every message received from the subprocess. If the subprocess goes silent for longer than the new `[triggerer] runner_health_check_threshold` (default 30 s), heartbeat() skips the DB update so the scheduler sees the triggerer as unhealthy and reassigns its triggers. This detects a deadlocked or hung event loop that the process-alive check alone cannot catch.

Fixes #63913, #63760, #65286, #66358, and supersedes/closes PRs #64869 and #65622.
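
The watchdog behaviour described above might look roughly like the following sketch. It is an assumption-laden illustration rather than the PR's implementation: the class name `RunnerWatchdog`, the `on_message()` hook, the injectable `clock`, and the `update_db` callback are hypothetical; only the `_last_runner_comms` attribute name and the 30 s default come from the description.

```python
import time
from typing import Callable


class RunnerWatchdog:
    """Illustrative only: skip the heartbeat when the subprocess has gone silent."""

    def __init__(
        self,
        threshold: float = 30.0,  # mirrors the described 30 s default
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._threshold = threshold
        self._clock = clock
        self._last_runner_comms = clock()

    def on_message(self) -> None:
        # Stamp the time of every message received from the subprocess.
        self._last_runner_comms = self._clock()

    def runner_is_responsive(self) -> bool:
        # Unhealthy only once the silence is strictly longer than the threshold.
        return self._clock() - self._last_runner_comms <= self._threshold

    def heartbeat(self, update_db: Callable[[], None]) -> bool:
        # Returns True if the heartbeat was recorded, False if it was skipped.
        if not self.runner_is_responsive():
            return False
        update_db()
        return True


if __name__ == "__main__":
    fake_now = [0.0]  # mutable fake clock so the example needs no real waiting
    recorded = []
    watchdog = RunnerWatchdog(threshold=30.0, clock=lambda: fake_now[0])

    print(watchdog.heartbeat(lambda: recorded.append(fake_now[0])))  # runner just started
    fake_now[0] = 31.0
    print(watchdog.heartbeat(lambda: recorded.append(fake_now[0])))  # silent too long
    watchdog.on_message()
    print(watchdog.heartbeat(lambda: recorded.append(fake_now[0])))  # message seen again
```

Injecting the clock keeps the sketch testable without sleeping; a monotonic clock is used by default so wall-clock adjustments cannot trigger or mask the check.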
