my2038 opened a new issue, #67380:
URL: https://github.com/apache/airflow/issues/67380

   TriggerCommsDecoder.asend() counter increment outside async lock causes RuntimeError under greenback portal
   Apache Airflow version
   3.2.1
   What happened
   Any deferrable trigger that uses `asyncio.to_thread()` in its `run()` body
   crashes the triggerer process with:
   ```
   RuntimeError: Response read out of order! Got frame.id=2, expect_id=5
   ```
   The crash originates in `TriggerCommsDecoder._aget_response()` in
   `airflow/jobs/triggerer_job_runner.py`.
   What you expected to happen
   A deferrable trigger that uses `asyncio.to_thread()` for synchronous I/O (the
   pattern described in the Airflow documentation and consistent with the Airflow 3
   trigger contract) should run stably in the triggerer without corrupting the
   supervisor communication channel.
   How to reproduce
   Write a deferrable trigger that wraps any synchronous blocking call in
   `asyncio.to_thread`:
   ```python
   import asyncio
   import time

   from airflow.triggers.base import BaseTrigger, TriggerEvent


   class MyTrigger(BaseTrigger):
       def serialize(self):
           return ("my_module.MyTrigger", {})

       async def run(self):
           while True:
               # Any synchronous blocking call wrapped in to_thread
               result = await asyncio.to_thread(self._do_sync_work)
               if result is not None:
                   yield TriggerEvent({"status": "done", "result": result})
                   return
               await asyncio.sleep(2)

       def _do_sync_work(self):
           # Simulate synchronous I/O (database call, HTTP, Kafka poll, etc.)
           time.sleep(0.05)
           return None
   ```
   Deploy this trigger from a deferrable operator and run the triggerer. Under
   Airflow 3.2.1 the triggerer crashes with `RuntimeError: Response read out of order`
   within seconds to minutes, depending on the polling interval and system load. The
   exact timing is non-deterministic, but the crash is reliably reproducible under
   sustained operation.
   Root cause analysis
   The crash is caused by an interaction between two pieces of Airflow's own code:
   `run_trigger` unconditionally installing a greenback portal, and `asend` placing
   `next(self.id_counter)` outside `async with self._async_lock`.
   Step 1 — greenback portal installation
   `run_trigger` installs a greenback coroutine shim on every trigger task:
   ```python
   # airflow/jobs/triggerer_job_runner.py
   async def run_trigger(self, trigger_id, trigger, timeout_after=None):
       if not os.environ.get("AIRFLOW_DISABLE_GREENBACK_PORTAL", "").lower() == "true":
           import greenback
           await greenback.ensure_portal()   # ← installs shim on this asyncio Task

       async for event in trigger.run():    # ← iterates the trigger
           ...
   ```
   `ensure_portal` replaces the asyncio Task's coroutine with a greenback shim that
   intercepts every step of the task as it passes through the event loop. This is
   necessary to allow synchronous code inside the trigger to await coroutines via
   `greenback.await_()`. The shim is active for the entire lifetime of the trigger.
   Step 2 — `asyncio.to_thread` creates a thread-completion Future
   Inside the trigger's `run()`:
   ```python
   result = await asyncio.to_thread(self._do_sync_work)
   ```
   `asyncio.to_thread` submits `_do_sync_work` to the default `ThreadPoolExecutor`.
   When the OS thread completes, it calls `loop.call_soon_threadsafe(callback)` to
   schedule the Future's done callback on the event loop.
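The hand-off in Step 2 can be reproduced without Airflow or greenback. The sketch below is a simplified stand-in for `asyncio.to_thread`, not CPython's actual implementation (which goes through `loop.run_in_executor` and also copies `contextvars`); `to_thread_sketch`, `blocking_work` and the module-level pool are hypothetical names. It shows the two properties this step relies on: the blocking function runs on a worker thread, and the awaiting coroutine is resumed on the event-loop thread after a `loop.call_soon_threadsafe` hand-off.

```python
import asyncio
import concurrent.futures
import threading
import time

# Stand-in for the loop's default ThreadPoolExecutor (hypothetical).
_POOL = concurrent.futures.ThreadPoolExecutor(max_workers=4)


async def to_thread_sketch(func, *args):
    """Simplified, hypothetical stand-in for asyncio.to_thread."""
    loop = asyncio.get_running_loop()
    aio_future = loop.create_future()

    def _on_done(cf_future):
        # Usually runs on the worker thread, which must not touch the asyncio
        # Future directly, so the result is handed to the loop thread instead.
        def _transfer():
            if aio_future.cancelled():
                return
            if cf_future.cancelled():
                aio_future.cancel()
            elif cf_future.exception() is not None:
                aio_future.set_exception(cf_future.exception())
            else:
                aio_future.set_result(cf_future.result())

        loop.call_soon_threadsafe(_transfer)

    _POOL.submit(func, *args).add_done_callback(_on_done)
    return await aio_future


def blocking_work():
    time.sleep(0.05)  # simulated synchronous I/O
    return threading.get_ident()  # id of the thread that ran the work


async def main():
    loop_thread = threading.get_ident()
    worker_thread = await to_thread_sketch(blocking_work)
    resumed_on = threading.get_ident()  # where the coroutine continued
    return loop_thread, worker_thread, resumed_on


loop_thread, worker_thread, resumed_on = asyncio.run(main())
print(worker_thread != loop_thread, resumed_on == loop_thread)
```

Running it prints `True True`: the work ran off-loop, and the coroutine continued on the loop thread.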
   Step 3 — greenback delivers the callback while `asend` is suspended
   `TriggerCommsDecoder.asend` is the method that sends messages from the trigger
   runner to its supervisor over a socket. It is called by `arun()` →
   `sync_state_to_supervisor()` during normal trigger lifecycle management
   (heartbeats, log lines, state updates).
   The current implementation:
   ```python
   async def asend(self, msg: ToTriggerSupervisor) -> ToTriggerRunner | None:
       frame = _RequestFrame(id=next(self.id_counter), body=msg.model_dump())  # ← (A)
       bytes = frame.as_bytes()

       async with self._async_lock:                        # ← (B)
           self._async_writer.write(bytes)
           return await self._aget_response(frame.id)      # ← (C) suspended here
   ```
   The sequence that triggers the bug:

   1. `asend` Call 1 executes line (A): `next(self.id_counter)` → id=2. Counter
      is now at 2.
   2. Call 1 acquires `_async_lock` at (B), writes frame id=2, and suspends at (C)
      while waiting for the supervisor's response.
   3. The greenback shim intercepts the `to_thread` Future's completion callback and
      delivers it into the trigger Task's greenlet context, injecting execution while
      Call 1 is suspended at (C).
   4. This re-enters `arun()` → `sync_state_to_supervisor()` → `asend()`.
   5. `asend` Call 2 executes line (A): `next(self.id_counter)` → id=3. Counter is
      now at 3. Call 1's frame id=2 has not yet received a response.
   6. Call 2 tries to acquire `_async_lock` and blocks, because Call 1 holds it.
   7. More callbacks arrive; Calls 3, 4 and 5 enter, advancing the counter to 4, 5
      and 6 respectively.
   8. Eventually Call 1 receives its response (for id=2) and releases the lock.
   9. Call 2 acquires the lock, writes frame id=3, and suspends at (C).
   10. The supervisor responds to id=3. Call 2 calls `_aget_response(3)`.
   11. But meanwhile Call 5 (id=6) has acquired the lock and written frame id=6.
   12. The supervisor's next response is for id=6, not id=3.
   ```python
   async def _aget_response(self, expect_id: int) -> ToTriggerRunner | None:
       frame = await self._aread_frame()
       if frame.id != expect_id:
           raise RuntimeError(
               f"Response read out of order! Got {frame.id=}, {expect_id=}"
           )
   ```
   Crash: `RuntimeError: Response read out of order! Got frame.id=6, expect_id=3`.
   Why this is a re-entrancy bug, not a race condition
   This is not a threading race condition. Python's asyncio event loop is
   single-threaded; under normal cooperative scheduling, nothing can execute between
   line (A) and line (B) because there is no `await` between them. The `asyncio.Lock`
   would be sufficient protection in a normal asyncio program.
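The claim that plain asyncio cannot interleave anything between (A) and (B) can be checked with a small standalone script; `send_like`, `other_task` and the log strings are illustrative names, not Airflow code. Acquiring an uncontended `asyncio.Lock` completes without suspending the task, so a task that is already scheduled only gets to run at the first real suspension point, which plays the role of (C).

```python
import asyncio
import itertools

log = []
counter = itertools.count(1)


async def other_task():
    # Would appear between "allocated" and "locked" if it could run there.
    log.append("other")


async def send_like(lock):
    frame_id = next(counter)        # (A): allocate an id
    log.append(f"allocated {frame_id}")
    async with lock:                # (B): uncontended, so this does not suspend
        log.append(f"locked {frame_id}")
        await asyncio.sleep(0)      # (C): first point where other tasks can run
        log.append(f"resumed {frame_id}")


async def main():
    lock = asyncio.Lock()
    task = asyncio.create_task(other_task())  # scheduled but not yet started
    await send_like(lock)
    await task


asyncio.run(main())
print(log)
```

The recorded order is `['allocated 1', 'locked 1', 'other', 'resumed 1']`: the competing task only runs once the lock is already held.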
   Greenback breaks the no-interruption guarantee. Its coroutine shim can deliver
   thread-completion callbacks into the Task's execution context between any two
   coroutine steps, including the step between line (A) and the lock acquisition at
   (B). This makes `asend` effectively re-entrant from the perspective of the counter
   and socket state, even though only one OS thread is involved.
   The correct analogy is a signal handler re-entrancy bug: a signal fires
   mid-function, the signal handler calls the same function again, and two live
   invocations share mutable state (`self.id_counter`) without mutual exclusion.
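The signal-handler analogy can be made concrete with a purely synchronous toy, with no asyncio or greenback involved. `send`, `wire` and `on_interrupt` are hypothetical names; `on_interrupt` plays the part of the handler that re-enters the function between the counter increment and the write. Frames then reach the "wire" in a different order from the one in which their ids were allocated.

```python
import itertools

# Hypothetical shared state: an id counter and a list standing in for the socket.
counter = itertools.count(1)
wire = []


def send(msg, on_interrupt=None):
    frame_id = next(counter)       # (A): shared state mutated first
    if on_interrupt is not None:
        on_interrupt()             # re-entrant call injected before the write
    wire.append((frame_id, msg))   # the "socket write"
    return frame_id


# The outer call allocates id 1, is interrupted, and the nested call
# allocates id 2 and writes it before the outer call resumes.
outer_id = send("outer", on_interrupt=lambda: send("inner"))
print(wire)
```

The outer call allocated id 1 but its frame lands second: `wire` ends up as `[(2, 'inner'), (1, 'outer')]`.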
   Critically, both halves of the bug are in Airflow's own code. `run_trigger`
   unconditionally installs the greenback portal, and `asend` places the counter
   increment outside the lock. A user-written trigger using `asyncio.to_thread` (the
   documented, correct pattern for synchronous I/O in deferrable triggers) has no
   means to protect itself from this interaction.
   Proposed fix
   Move `next(self.id_counter)` inside `async with self._async_lock`:
   ```python
   async def asend(self, msg: ToTriggerSupervisor) -> ToTriggerRunner | None:
       async with self._async_lock:
           frame = _RequestFrame(id=next(self.id_counter), body=msg.model_dump())
           self._async_writer.write(frame.as_bytes())
           return await self._aget_response(frame.id)
   ```
   Why this is safe:

   - `_aget_response` calls only `_aread_frame` (a socket read). It does not attempt
     to acquire `_async_lock`, so there is no deadlock risk.
   - The lock already serialises all socket writes. Including the counter increment
     inside it restores the original design intent: counter increment, frame write,
     and response wait form a single atomic unit under the lock.
   - Under greenback, a re-entrant `asend` invocation now blocks on the lock until
     the previous invocation has fully completed (counter incremented, frame written,
     response received). The counter cannot advance out of step with the socket.
   - Under normal asyncio (no greenback, no re-entrance), behaviour is identical to
     before: when the lock is uncontended, it is acquired immediately.
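To sanity-check the ordering argument outside Airflow, the following is a minimal sketch of the proposed locking discipline. `Frame`, `FakeSupervisor`, `CommsSketch` and the `asyncio.Queue` transport are hypothetical stand-ins for `_RequestFrame`, the supervisor process and the socket; only the shape of `asend` mirrors the proposed fix. It does not model greenback; it only shows that with id allocation, write and response wait under one lock, twenty concurrent callers each receive the response to their own frame.

```python
import asyncio
import contextlib
import itertools
from dataclasses import dataclass


@dataclass
class Frame:
    id: int
    body: str


class FakeSupervisor:
    """Hypothetical stand-in: answers each request with a frame carrying the same id."""

    def __init__(self):
        self.requests = asyncio.Queue()
        self.responses = asyncio.Queue()

    async def serve(self):
        while True:
            request = await self.requests.get()
            await asyncio.sleep(0.001)  # simulated processing latency
            await self.responses.put(Frame(request.id, f"ack:{request.body}"))


class CommsSketch:
    """Hypothetical channel using the proposed locking discipline."""

    def __init__(self, supervisor):
        self._sup = supervisor
        self._lock = asyncio.Lock()
        self._ids = itertools.count(1)

    async def _get_response(self, expect_id):
        frame = await self._sup.responses.get()
        if frame.id != expect_id:
            raise RuntimeError(f"Response read out of order! Got {frame.id=}, {expect_id=}")
        return frame.body

    async def asend(self, body):
        # Id allocation, write and response wait happen as one unit under the lock.
        async with self._lock:
            frame = Frame(id=next(self._ids), body=body)
            self._sup.requests.put_nowait(frame)
            return await self._get_response(frame.id)


async def main():
    supervisor = FakeSupervisor()
    server = asyncio.create_task(supervisor.serve())
    comms = CommsSketch(supervisor)
    results = await asyncio.gather(*(comms.asend(f"msg{i}") for i in range(20)))
    server.cancel()
    with contextlib.suppress(asyncio.CancelledError):
        await server
    return results


results = asyncio.run(main())
print(results[:3])
```

The first three results are `['ack:msg0', 'ack:msg1', 'ack:msg2']`, and no `RuntimeError` is raised for any of the twenty calls.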
   Why the `AIRFLOW_DISABLE_GREENBACK_PORTAL` escape hatch is not an adequate
   workaround:

   - Disabling the greenback portal at triggerer-process scope has unverified side
     effects on other Airflow internals that depend on the portal for sync-from-async
     bridging (Task SDK calls from synchronous trigger setup code, observability
     integrations).
   - The escape hatch is process-wide, not per-trigger. The code fix is the correct
     resolution.
   Additional context
   - Verified against Airflow 3.2.1 source: `airflow/jobs/triggerer_job_runner.py`
     lines 812–859 (`TriggerCommsDecoder`) and lines 1204–1209 (`run_trigger`
     greenback installation).
   - `asyncio.to_thread` and `loop.run_in_executor` are equivalent in this context:
     both submit work to a thread pool and return a Future whose completion callback
     is delivered via `loop.call_soon_threadsafe`. Both exhibit the crash.
   - `confluent_kafka.aio.AIOConsumer` also uses `loop.run_in_executor` internally
     (`_call` method) and exhibits the same crash profile. Any thread-pool-based
     async wrapper over a synchronous library is affected.
   - The crash is non-deterministic in exact timing but reliable under sustained
     operation. It depends on the thread completing and the callback being ready to
     deliver precisely while `asend` is suspended inside `_aget_response`. Under load
     or with longer blocking calls this window widens and the crash frequency
     increases.
   Environment

   | Setting | Value |
   | --- | --- |
   | Airflow version | 3.2.1 |
   | Python version | 3.11 |
   | Executor | LocalExecutor |
   | Deployment | Vanilla Airflow, official Docker image (`apache/airflow:3.2.1`) |
   | OS | Linux (Docker container) |
   | Install method | Official Docker image, no third-party distribution |
   The reproduction was performed against the official `apache/airflow:3.2.1` Docker
   image with no Astronomer or other vendor-specific modifications. This confirms the
   bug is in core Airflow and not specific to any managed deployment platform.

