The GitHub Actions job "Tests" on airflow.git/triggerer-comms-66412 has succeeded. Run started by GitHub user jedcunningham (triggered by jedcunningham).
Head commit for run:
8e97f9a6640a9494328e2940b051bb1ee6ee7a19 / Ash Berlin-Taylor <[email protected]>
Fix triggerer crash when multiple triggers call sync SDK methods concurrently (#66412)

TriggerCommsDecoder used an asyncio.Lock to serialise concurrent requests, then read the response while holding the lock. When a trigger called a sync SDK method wrapped with sync_to_async(), that method ran in a worker thread and internally used async_to_sync() to make async calls, spinning up a second event loop in that thread. The two event loops then raced to read from the same asyncio.StreamReader, producing "Response read out of order!" and crashing the TriggerRunner subprocess.

PR #64882 attempted a threading.Lock fix but introduced a new deadlock in which the triggerer kept heartbeating normally while processing zero triggers.

Replace the lock-based serial approach with response multiplexing:

* Each asend() registers an asyncio.Future keyed by frame.id in a _pending dict. A single _reader_loop background task reads frames one at a time and dispatches each response to the right waiter. No lock, no ordering assumption.
* sync send() from worker threads schedules asend() on the main event loop via run_coroutine_threadsafe() and blocks the calling thread. This eliminates competing event loops entirely.
* sync send() from the event loop thread itself (e.g. a trigger calling a sync SDK method directly from async def run() via greenback) would deadlock with run_coroutine_threadsafe(...).result(), because .result() blocks the thread the loop runs on. This case is detected via thread-ID comparison, and greenback.await_() is used instead to teleport the coroutine back into the running loop.
* asend() called from a foreign event loop (async_to_sync inside a sync_to_async-wrapped trigger) detects the loop mismatch and bridges back to the main loop via run_coroutine_threadsafe + wrap_future so the foreign loop can still await the result.
  This is the root-cause code path, covered directly by test_foreign_loop_path.
* init_comms() reads the initial StartTriggerer frame before starting the reader loop, so the loop never races with initialisation.

Add a subprocess event-loop watchdog: TriggerRunnerSupervisor stamps _last_runner_comms on every message received from the subprocess. If the subprocess goes silent for longer than the new [triggerer] runner_health_check_threshold (default 30 s), heartbeat() skips the DB update so the scheduler sees the triggerer as unhealthy and reassigns its triggers. This detects a deadlocked or hung event loop that the process-alive check alone cannot catch.

(cherry picked from commit 099fd2b03f3a4e642f75e748fbce2f3d99ffbba8)

Report URL: https://github.com/apache/airflow/actions/runs/25402301296

With regards,
GitHub Actions via GitBox
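A minimal sketch of the response-multiplexing idea in the commit message, assuming the subprocess transport is modelled as two asyncio.Queue objects instead of the real asyncio.StreamReader. Only the _pending dict, the single _reader_loop task and keying waiters by frame id come from the commit; MultiplexedComms and fake_supervisor are hypothetical names, and the fake supervisor deliberately answers in reverse order to show that response ordering no longer matters.

```python
import asyncio
import itertools
from typing import Any, Dict, Optional


class MultiplexedComms:
    """Hypothetical sketch: route each response to its waiter by frame id."""

    def __init__(self, outbox: asyncio.Queue, inbox: asyncio.Queue) -> None:
        self._outbox = outbox  # requests leave as (frame_id, body)
        self._inbox = inbox    # responses arrive as (frame_id, body)
        self._ids = itertools.count(1)
        self._pending: Dict[int, asyncio.Future] = {}
        self._reader_task: Optional[asyncio.Task] = None

    def start(self) -> None:
        # A single task owns every read, so nothing else ever consumes the inbox.
        self._reader_task = asyncio.get_running_loop().create_task(self._reader_loop())

    async def _reader_loop(self) -> None:
        while True:
            frame_id, body = await self._inbox.get()
            waiter = self._pending.pop(frame_id, None)
            if waiter is not None and not waiter.done():
                waiter.set_result(body)

    async def asend(self, body: Any) -> Any:
        frame_id = next(self._ids)
        waiter = asyncio.get_running_loop().create_future()
        # Register before sending so even an immediate reply finds its waiter.
        self._pending[frame_id] = waiter
        await self._outbox.put((frame_id, body))
        return await waiter


async def fake_supervisor(outbox: asyncio.Queue, inbox: asyncio.Queue, n: int) -> None:
    # Collect n requests, then answer them in reverse order.
    requests = [await outbox.get() for _ in range(n)]
    for frame_id, body in reversed(requests):
        await inbox.put((frame_id, f"reply to {body}"))


async def main() -> list:
    outbox: asyncio.Queue = asyncio.Queue()
    inbox: asyncio.Queue = asyncio.Queue()
    comms = MultiplexedComms(outbox, inbox)
    comms.start()
    supervisor = asyncio.create_task(fake_supervisor(outbox, inbox, 3))
    results = await asyncio.gather(*(comms.asend(f"req{i}") for i in range(3)))
    await supervisor
    return list(results)


results = asyncio.run(main())
```

Each caller still receives its own reply even though the supervisor answers out of order, because the waiter is looked up by id rather than assumed to be "the next frame on the stream".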
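The thread and loop routing rules from the commit message can be sketched the same way. This is an illustration under assumptions, not Airflow's TriggerCommsDecoder: LoopBoundComms, from_running_loop() and _exchange() are hypothetical, and _exchange() stands in for the real multiplexed request. Where the commit falls back to greenback.await_() on the event-loop thread, this sketch simply raises RuntimeError so it runs without greenback installed.

```python
import asyncio
import threading


class LoopBoundComms:
    """Hypothetical sketch: funnel every request onto one "main" event loop."""

    def __init__(self, loop: asyncio.AbstractEventLoop, loop_thread_id: int) -> None:
        self._loop = loop
        self._loop_thread_id = loop_thread_id

    @classmethod
    def from_running_loop(cls) -> "LoopBoundComms":
        # Must be called from a coroutine running on the main loop.
        return cls(asyncio.get_running_loop(), threading.get_ident())

    async def _exchange(self, request: str) -> tuple[str, bool]:
        # Stand-in for the real request/response; reports whether it ran on the main loop.
        await asyncio.sleep(0.01)
        return request, asyncio.get_running_loop() is self._loop

    async def asend(self, request: str) -> tuple[str, bool]:
        if asyncio.get_running_loop() is self._loop:
            return await self._exchange(request)
        # Foreign loop (e.g. async_to_sync in a worker thread): run the work on
        # the main loop and let the foreign loop await it through wrap_future.
        cf = asyncio.run_coroutine_threadsafe(self._exchange(request), self._loop)
        return await asyncio.wrap_future(cf)

    def send(self, request: str) -> tuple[str, bool]:
        if threading.get_ident() == self._loop_thread_id:
            # Blocking on .result() here would freeze the very loop it waits on.
            # The commit uses greenback.await_() here; this sketch refuses instead.
            raise RuntimeError("send() called from the event-loop thread")
        cf = asyncio.run_coroutine_threadsafe(self.asend(request), self._loop)
        return cf.result(timeout=5)


async def main() -> list:
    comms = LoopBoundComms.from_running_loop()

    def sync_sdk_call_in_worker():
        # Like a sync SDK method wrapped with sync_to_async().
        return comms.send("sync-from-worker")

    def async_to_sync_in_worker():
        # Like async_to_sync(): a second event loop inside the worker thread.
        return asyncio.run(comms.asend("foreign-loop"))

    results = [
        await asyncio.to_thread(sync_sdk_call_in_worker),
        await asyncio.to_thread(async_to_sync_in_worker),
        await comms.asend("main-loop"),
    ]
    try:
        comms.send("loop-thread")
    except RuntimeError:
        results.append(("loop-thread", "refused"))
    return results


results = asyncio.run(main())
```

In every permitted path the actual exchange runs on the main loop, which is the property that removes the competing readers; the worker threads and the foreign loop only ever wait on futures.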
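The supervisor-side watchdog reduces to a timestamp comparison. In this hypothetical sketch only _last_runner_comms, heartbeat() and the 30 s default come from the commit message; RunnerWatchdog, on_runner_message(), the injectable clock and the db_heartbeats counter are illustrative assumptions, with the counter standing in for the real DB update.

```python
import time
from typing import Any, Callable


class RunnerWatchdog:
    """Hypothetical sketch of the supervisor's runner health check."""

    def __init__(self, threshold: float = 30.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.threshold = threshold
        self._clock = clock
        self._last_runner_comms = clock()
        self.db_heartbeats = 0  # stand-in for heartbeats written to the DB

    def on_runner_message(self, msg: Any) -> None:
        # Stamp on every message received from the subprocess.
        self._last_runner_comms = self._clock()

    def runner_is_healthy(self) -> bool:
        return self._clock() - self._last_runner_comms <= self.threshold

    def heartbeat(self) -> bool:
        if not self.runner_is_healthy():
            # Skip the DB update: the scheduler then sees a stale heartbeat,
            # treats the triggerer as unhealthy and reassigns its triggers.
            return False
        self.db_heartbeats += 1
        return True


now = [0.0]
wd = RunnerWatchdog(threshold=30.0, clock=lambda: now[0])
now[0] = 10.0
beat1 = wd.heartbeat()       # 10 s of silence: still healthy
now[0] = 45.0
beat2 = wd.heartbeat()       # 45 s of silence: event loop presumed hung
wd.on_runner_message("ping")
beat3 = wd.heartbeat()       # runner spoke again: healthy
```

Injecting the clock keeps the check deterministic in tests; a monotonic clock is a sensible default because wall-clock adjustments should not make a healthy runner look silent.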
