my2038 opened a new issue, #67380:
URL: https://github.com/apache/airflow/issues/67380
TriggerCommsDecoder.asend() counter increment outside async lock causes RuntimeError under greenback portal
Apache Airflow version
3.2.1
What happened
Any deferrable trigger that uses `asyncio.to_thread()` in its `run()` body crashes the triggerer process with:
```
RuntimeError: Response read out of order! Got frame.id=2, expect_id=5
```
The crash originates in `TriggerCommsDecoder._aget_response()` in `airflow/jobs/triggerer_job_runner.py`.
What you expected to happen
A deferrable trigger that uses `asyncio.to_thread()` for synchronous I/O (the pattern described in the Airflow documentation, and consistent with the Airflow 3 trigger contract) should run stably in the triggerer without corrupting the supervisor communication channel.
How to reproduce
Write a deferrable trigger that wraps any synchronous blocking call in `asyncio.to_thread`:
```python
import asyncio
import time

from airflow.triggers.base import BaseTrigger, TriggerEvent


class MyTrigger(BaseTrigger):
    def serialize(self):
        return ("my_module.MyTrigger", {})

    async def run(self):
        while True:
            # Any synchronous blocking call wrapped in to_thread
            result = await asyncio.to_thread(self._do_sync_work)
            if result is not None:
                yield TriggerEvent({"status": "done", "result": result})
                return
            await asyncio.sleep(2)

    def _do_sync_work(self):
        # Simulate synchronous I/O (database call, HTTP, Kafka poll, etc.)
        time.sleep(0.05)
        return None
```
Deploy this trigger in a deferrable operator and run the triggerer. Under Airflow 3.2.1 the triggerer crashes with `RuntimeError: Response read out of order` within seconds to minutes, depending on polling interval and system load. The crash is non-deterministic in timing but reliably reproducible under sustained operation.
Root cause analysis
The crash is caused by an interaction between two pieces of Airflow's own code: `run_trigger` installing a greenback portal by default (unless `AIRFLOW_DISABLE_GREENBACK_PORTAL` is set to `true`), and `asend` placing `next(self.id_counter)` outside `async with self._async_lock`.
Step 1 — greenback portal installation
`run_trigger` installs a greenback coroutine shim on every trigger task:
```python
# airflow/jobs/triggerer_job_runner.py (excerpt)
async def run_trigger(self, trigger_id, trigger, timeout_after=None):
    if not os.environ.get("AIRFLOW_DISABLE_GREENBACK_PORTAL", "").lower() == "true":
        import greenback

        await greenback.ensure_portal()  # ← installs shim on this asyncio Task
    async for event in trigger.run():  # ← iterates the trigger
        ...
```
`ensure_portal` replaces the asyncio Task's coroutine with a greenback shim that intercepts every step of the task as it passes through the event loop. This is what allows synchronous code inside the trigger to await coroutines via `greenback.await_()`. The shim is active for the entire lifetime of the trigger.
Step 2 — `asyncio.to_thread` creates a thread-completion Future
Inside the trigger's `run()`:
```python
result = await asyncio.to_thread(self._do_sync_work)
```
`asyncio.to_thread` submits `_do_sync_work` to the default `ThreadPoolExecutor`. When the work finishes on the worker thread, `loop.call_soon_threadsafe(callback)` schedules the callback that completes the awaiting Future on the event loop.
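The thread-to-loop handoff can be sketched without the executor machinery. The function below, `run_blocking_in_thread` (a hypothetical name), starts a plain `threading.Thread` and passes the result back with `loop.call_soon_threadsafe`. CPython's `asyncio.to_thread` is written differently: it goes through `loop.run_in_executor` and `asyncio.wrap_future`. The final handoff to the event loop still goes through `call_soon_threadsafe` in both cases. Treat this as an illustration of the mechanism, not a replacement for `asyncio.to_thread`.

```python
import asyncio
import threading
import time
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


async def run_blocking_in_thread(func: Callable[[], T]) -> T:
    """Hypothetical, simplified stand-in for asyncio.to_thread (no executor)."""
    loop = asyncio.get_running_loop()
    fut = loop.create_future()

    def settle(result: object, exc: Optional[BaseException]) -> None:
        # Runs on the event-loop thread, scheduled by call_soon_threadsafe below.
        if fut.done():  # e.g. the awaiting task was cancelled in the meantime
            return
        if exc is not None:
            fut.set_exception(exc)
        else:
            fut.set_result(result)

    def worker() -> None:
        # Runs on a separate OS thread and never touches the Future directly.
        try:
            result = func()
        except Exception as exc:
            loop.call_soon_threadsafe(settle, None, exc)
        else:
            loop.call_soon_threadsafe(settle, result, None)

    threading.Thread(target=worker, daemon=True).start()
    # The awaiting task resumes only after the loop has run `settle`.
    return await fut


def do_sync_work() -> int:
    time.sleep(0.05)  # simulated blocking I/O
    return 42


async def main() -> int:
    return await run_blocking_in_thread(do_sync_work)


if __name__ == "__main__":
    print(asyncio.run(main()))  # prints 42
```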
Step 3 — greenback delivers the callback while `asend` is suspended
`TriggerCommsDecoder.asend` is the method that sends messages from the trigger runner to its supervisor over a socket. It is called by `arun()` → `sync_state_to_supervisor()` during normal trigger lifecycle management (heartbeats, log lines, state updates).
The current implementation:
```python
async def asend(self, msg: ToTriggerSupervisor) -> ToTriggerRunner | None:
    frame = _RequestFrame(id=next(self.id_counter), body=msg.model_dump())  # ← (A)
    bytes = frame.as_bytes()
    async with self._async_lock:  # ← (B)
        self._async_writer.write(bytes)
        return await self._aget_response(frame.id)  # ← (C) suspended here
```
The sequence that triggers the bug:
1. `asend` Call 1 executes line (A): `next(self.id_counter)` → id=2. Counter is now at 2.
2. Call 1 acquires `_async_lock` at (B), writes frame id=2, and suspends at (C) while waiting for the supervisor's response.
3. The greenback shim intercepts the `to_thread` Future's completion callback and delivers it into the trigger Task's greenlet context, injecting execution while Call 1 is suspended at (C).
4. This re-enters `arun()` → `sync_state_to_supervisor()` → `asend()`.
5. `asend` Call 2 executes line (A): `next(self.id_counter)` → id=3. Counter is now at 3. Call 1's frame id=2 has not yet received a response.
6. Call 2 tries to acquire `_async_lock` and blocks, because Call 1 holds it.
7. More callbacks arrive; Calls 3, 4, and 5 enter, each advancing the counter to 4, 5, and 6.
8. Eventually Call 1 receives its response (for id=2) and releases the lock.
9. Call 2 acquires the lock, writes frame id=3, and suspends at (C).
10. The supervisor responds to id=3. Call 2 calls `_aget_response(3)`.
11. But meanwhile Call 5 (id=6) has acquired the lock and written frame id=6.
12. The supervisor's next response is for id=6, not id=3.
```python
async def _aget_response(self, expect_id: int) -> ToTriggerRunner | None:
    frame = await self._aread_frame()
    if frame.id != expect_id:
        raise RuntimeError(f"Response read out of order! Got {frame.id=}, {expect_id=}")
    ...  # remainder of the method omitted
```
Crash: `RuntimeError: Response read out of order! Got frame.id=6, expect_id=3`.
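The check in `_aget_response` can be exercised in isolation. The sketch below substitutes an in-memory queue for the socket. `Frame`, `FakeDecoder`, and its `_aread_frame` are hypothetical stand-ins rather than Airflow's classes. Only the id comparison and the message format mirror the quoted code. The error fires whenever the next frame waiting on the channel belongs to a different request than the one the caller is waiting for.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class Frame:
    """Hypothetical stand-in for a decoded response frame."""
    id: int
    body: Any = None


class FakeDecoder:
    """Hypothetical decoder whose 'socket' is an asyncio.Queue of frames."""

    def __init__(self) -> None:
        self.incoming: asyncio.Queue[Frame] = asyncio.Queue()

    async def _aread_frame(self) -> Frame:
        return await self.incoming.get()

    async def _aget_response(self, expect_id: int) -> Optional[Any]:
        frame = await self._aread_frame()
        if frame.id != expect_id:
            # Same check and message format as the quoted Airflow code.
            raise RuntimeError(f"Response read out of order! Got {frame.id=}, {expect_id=}")
        return frame.body


async def demo() -> str:
    decoder = FakeDecoder()
    decoder.incoming.put_nowait(Frame(id=2))  # next frame on the channel is for request 2
    try:
        await decoder._aget_response(5)  # ...but the caller is waiting for request 5
    except RuntimeError as exc:
        return str(exc)
    return "no error"


if __name__ == "__main__":
    print(asyncio.run(demo()))
    # prints: Response read out of order! Got frame.id=2, expect_id=5
```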
Why this is a re-entrancy bug, not a race condition
This is not a threading race condition. Python's asyncio event loop is single-threaded; under normal cooperative scheduling, nothing can execute between line (A) and line (B) because there is no `await` between them. The `asyncio.Lock` would be sufficient protection in a normal asyncio program.
Greenback breaks the no-interruption guarantee. Its coroutine shim can deliver thread-completion callbacks into the Task's execution context between any two coroutine steps, including the step between line (A) and the lock acquisition at (B). This makes `asend` effectively re-entrant from the perspective of the counter and socket state, even though only one OS thread is involved.
The correct analogy is a signal-handler re-entrancy bug: a signal fires mid-function, the signal handler calls the same function again, and two live invocations share mutable state (`self.id_counter`) without mutual exclusion.
Critically, both halves of the bug are in Airflow's own code: `run_trigger` installs the greenback portal by default, and `asend` places the counter increment outside the lock. A user-written trigger using `asyncio.to_thread` (the documented, correct pattern for synchronous I/O in deferrable triggers) has no means to protect itself from this interaction.
Proposed fix
Move `next(self.id_counter)` inside `async with self._async_lock`:
```python
async def asend(self, msg: ToTriggerSupervisor) -> ToTriggerRunner | None:
    async with self._async_lock:
        frame = _RequestFrame(id=next(self.id_counter), body=msg.model_dump())
        self._async_writer.write(frame.as_bytes())
        return await self._aget_response(frame.id)
```
Why this is safe:
- `_aget_response` calls only `_aread_frame` (a socket read). It does not attempt to acquire `_async_lock`, so there is no deadlock risk.
- The lock already serialises all socket writes. Including the counter increment inside it restores the original design intent: counter increment, frame write, and response wait become a single atomic unit under the lock.
- Under greenback, a re-entrant `asend` invocation now blocks on the lock until the previous invocation has fully completed (counter incremented, frame written, response received). The counter cannot advance out of step with the socket.
- Under normal asyncio (no greenback, no re-entrance), behaviour is identical to before: the lock is uncontended and acquired immediately.
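The fix relies on one property: id allocation, frame write, and response read happen as one unit under the lock. That property can be checked with an in-memory sketch. `FixedDecoder` and its fake supervisor are hypothetical stand-ins for `TriggerCommsDecoder` and the real socket. The fake supervisor answers each request with a response carrying the same id. The sketch uses ordinary concurrent asyncio tasks and does not involve greenback. With the counter inside the lock, request ids reach the channel in strictly increasing order, and every caller reads its own response.

```python
import asyncio
import itertools
from typing import Any


class FixedDecoder:
    """Hypothetical in-memory stand-in for TriggerCommsDecoder after the fix."""

    def __init__(self) -> None:
        self.id_counter = itertools.count(start=1)
        self._async_lock = asyncio.Lock()
        self._responses: asyncio.Queue[tuple[int, Any]] = asyncio.Queue()
        self.written_ids: list[int] = []  # ids in the order frames were "written"

    def _fake_supervisor(self, frame_id: int, body: Any) -> None:
        # Echoes the request back with the same id, as a well-behaved peer would.
        self._responses.put_nowait((frame_id, body))

    async def _aget_response(self, expect_id: int) -> Any:
        frame_id, body = await self._responses.get()
        if frame_id != expect_id:
            raise RuntimeError(f"Response read out of order! Got {frame_id=}, {expect_id=}")
        return body

    async def asend(self, body: Any) -> Any:
        async with self._async_lock:
            # Id allocation, write, and response wait all happen under the lock.
            frame_id = next(self.id_counter)
            self.written_ids.append(frame_id)
            # The reply arrives on a later loop iteration, so this call really
            # suspends in _aget_response while other callers queue on the lock.
            asyncio.get_running_loop().call_soon(self._fake_supervisor, frame_id, body)
            return await self._aget_response(frame_id)


async def demo(n: int = 20) -> tuple[list[Any], list[int]]:
    decoder = FixedDecoder()
    bodies = [f"msg-{i}" for i in range(n)]
    results = await asyncio.gather(*(decoder.asend(b) for b in bodies))
    return list(results), decoder.written_ids


if __name__ == "__main__":
    results, written = asyncio.run(demo())
    print(results == [f"msg-{i}" for i in range(20)])  # True: each caller got its own reply
    print(written == list(range(1, 21)))  # True: ids reached the channel in order
```

In this sketch the lock, not the counter, defines ordering. Ids are drawn only by the current lock holder, so the sequence on the channel matches the order in which the lock was granted.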
Why the `AIRFLOW_DISABLE_GREENBACK_PORTAL` escape hatch is not an adequate workaround:
- Disabling the greenback portal at triggerer-process scope has unverified side effects on other Airflow internals that depend on the portal to let synchronous code call async functions (Task SDK calls from synchronous trigger setup code, observability integrations).
- The escape hatch is process-wide, not per-trigger.
The code fix is the correct resolution.
Additional context
Verified against Airflow 3.2.1 source: `airflow/jobs/triggerer_job_runner.py` lines 812–859 (`TriggerCommsDecoder`) and lines 1204–1209 (`run_trigger` greenback installation).
`asyncio.to_thread` and `loop.run_in_executor` are equivalent in this context: both submit work to a thread pool and complete through a Future whose completion callback is delivered via `loop.call_soon_threadsafe`. Both exhibit the crash.
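Here is a minimal side-by-side of the two calls, assuming the default executor. In CPython, `asyncio.to_thread` forwards to `loop.run_in_executor(None, ...)`, after copying the current `contextvars` context into the worker call. Both awaitables therefore resolve through the same executor-future path. `blocking_read` is a hypothetical placeholder for synchronous I/O.

```python
import asyncio
import threading
import time


def blocking_read(tag: str) -> tuple[str, bool]:
    """Hypothetical synchronous I/O call; reports whether it ran off the main thread."""
    time.sleep(0.01)
    return tag, threading.current_thread() is not threading.main_thread()


async def main() -> list[tuple[str, bool]]:
    loop = asyncio.get_running_loop()
    # Both calls hand blocking_read to the loop's default ThreadPoolExecutor.
    via_to_thread = await asyncio.to_thread(blocking_read, "to_thread")
    via_executor = await loop.run_in_executor(None, blocking_read, "run_in_executor")
    return [via_to_thread, via_executor]


if __name__ == "__main__":
    print(asyncio.run(main()))
    # prints: [('to_thread', True), ('run_in_executor', True)]
```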
`confluent_kafka.aio.AIOConsumer` also uses `loop.run_in_executor` internally (`_call` method) and exhibits the same crash profile. Any thread-pool-based async wrapper over a synchronous library is affected.
The crash is non-deterministic in exact timing but reliable under sustained operation. It depends on the thread completing, and its callback being ready to deliver, precisely while `asend` is suspended inside `_aget_response`. Under load or with longer blocking calls this window widens and the crash frequency increases.
Environment

| Component | Details |
|---|---|
| Airflow version | 3.2.1 |
| Python version | 3.11 |
| Executor | LocalExecutor |
| Deployment | Vanilla Airflow, official Docker image (`apache/airflow:3.2.1`) |
| OS | Linux (Docker container) |
| Install method | Official Docker image, no third-party distribution |
The reproduction was performed against the official `apache/airflow:3.2.1` Docker image with no Astronomer or other vendor-specific modifications. This confirms the bug is in core Airflow and not specific to any managed deployment platform.