jscheffl commented on code in PR #66412:
URL: https://github.com/apache/airflow/pull/66412#discussion_r3191317535
##########
airflow-core/newsfragments/66412.significant.rst:
##########
@@ -0,0 +1,26 @@
+Fix triggerer race condition and deadlock that caused deferred tasks to stall indefinitely
+
+Triggers that call synchronous SDK methods (e.g. ``get_task_states`` used by
+``safe_to_cancel`` in several Google provider operators) could crash the triggerer's
+internal subprocess. The triggerer would then continue to heartbeat normally —
+appearing healthy to the scheduler — while silently processing zero triggers, causing
+every deferred task to time out. This was first reported in :github-issue:`64620`; a
+partial fix shipped in Airflow 3.2.1 (:github-pr:`64882`) but introduced a new deadlock
+with the same visible symptom under load.
+
+Both issues are fixed by replacing the lock-based serialisation with response
+multiplexing: each request now carries a unique ID and the response is routed back to
+the correct caller, so concurrent requests from trigger threads no longer contend or
+deadlock regardless of how many triggers are running or what SDK methods they call.
+
+**New: triggerer subprocess watchdog**
+
+Even with the race fixed, a trigger that blocks the event loop (e.g. by calling
+``time.sleep()`` or performing blocking I/O directly in ``async def run()``) would
+previously leave the triggerer appearing healthy indefinitely.
+
+A new ``[triggerer] runner_health_check_threshold`` config option (default: 30 seconds)
Review Comment:
30 seconds sounds short. We use a lot of deferred KPO in production and
regularly see asyncio warnings on ALL triggerers (currently 45 instances, each
limited to 64 tasks to be conservative), with the triggerer loop blocked for
anywhere from 2 to 250 seconds.
We have not had time to investigate the root cause yet, but hope this PR
might partly help fix it.
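
To put numbers on how long the loop is actually blocked before settling on a threshold, event-loop lag can be sampled directly. The following is a minimal sketch, not code from this PR: `measure_loop_lag` and `blocking_trigger` are hypothetical names, and the 0.2 s block stands in for whatever is stalling the loop in production.

```python
import asyncio
import time


async def measure_loop_lag(interval: float, samples: int) -> float:
    """Return the worst observed delay (seconds) beyond the requested sleep."""
    worst = 0.0
    for _ in range(samples):
        start = time.monotonic()
        await asyncio.sleep(interval)
        # Anything beyond `interval` is time the loop could not resume us.
        worst = max(worst, time.monotonic() - start - interval)
    return worst


async def blocking_trigger(seconds: float) -> None:
    """Hypothetical misbehaving trigger: blocks the loop instead of awaiting."""
    await asyncio.sleep(0.01)  # let the monitor start first
    time.sleep(seconds)  # deliberately wrong: stalls every coroutine on this loop


async def demo() -> float:
    monitor = asyncio.create_task(measure_loop_lag(interval=0.02, samples=5))
    await blocking_trigger(0.2)
    return await monitor


worst_lag = asyncio.run(demo())
print(f"worst event-loop lag: {worst_lag:.3f}s")
```

Comparing the worst lag seen over a day against the proposed default would show whether 30 seconds triggers false positives in a given deployment.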
##########
airflow-core/newsfragments/66412.significant.rst:
##########
Review Comment:
Not sure why this has become so popular recently... but this is a bugfix, so
why do we need a newsfragment for it? Does a user need to do anything specific
while upgrading? No. Any breaking change? No.
I think we can leave the note or shorten it if it is only needed for the new
config option.
##########
airflow-core/src/airflow/jobs/triggerer_job_runner.py:
##########
@@ -951,46 +977,80 @@ class TriggerCommsDecoder(CommsDecoder[ToTriggerRunner, ToTriggerSupervisor]):
factory=lambda: TypeAdapter(ToTriggerRunner), repr=False
)
- def _read_frame(self):
- from asgiref.sync import async_to_sync
-
- with self._thread_lock:
- return async_to_sync(self._aread_frame)()
-
- def send(self, msg: ToTriggerSupervisor) -> ToTriggerRunner | None:
- from asgiref.sync import async_to_sync
-
- with self._thread_lock:
- return async_to_sync(self.asend)(msg)
+ _pending: dict[int, asyncio.Future] = attrs.field(factory=dict, repr=False)
+ _loop: asyncio.AbstractEventLoop | None = attrs.field(default=None, repr=False)
+ _loop_thread_id: int | None = attrs.field(default=None, repr=False)
+ _reader_task: asyncio.Task | None = attrs.field(default=None, repr=False)
async def _aread_frame(self):
try:
len_bytes = await self._async_reader.readexactly(4)
except ConnectionResetError:
asyncio.current_task().cancel("Supervisor closed")
+ raise
Review Comment:
Will execution ever reach this line? Or is it a sanity raise to ensure that
it also exits here in the failure case?
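
For reference, `Task.cancel()` only requests cancellation; the `CancelledError` is delivered at the task's next suspension point, so statements following the `cancel()` call in the same coroutine still run. A minimal sketch under that assumption (`read_frame` is a stand-in, not the PR's `_aread_frame`):

```python
import asyncio

reached: list[str] = []


async def read_frame() -> None:
    try:
        raise ConnectionResetError("peer went away")  # stand-in for readexactly()
    except ConnectionResetError:
        # cancel() only *requests* cancellation; nothing is raised at this point.
        asyncio.current_task().cancel("Supervisor closed")
        reached.append("after cancel()")
        raise  # re-raise so the function cannot continue without a frame


async def demo() -> str:
    task = asyncio.create_task(read_frame())
    try:
        await task
    except (ConnectionResetError, asyncio.CancelledError) as exc:
        return type(exc).__name__
    return "no exception"


outcome = asyncio.run(demo())
print(reached, outcome)
```

So the added `raise` is reachable; without it the function would presumably fall through to the code that uses `len_bytes`, which was never assigned.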
##########
airflow-core/src/airflow/config_templates/config.yml:
##########
@@ -2689,6 +2689,17 @@ triggerer:
type: float
example: ~
default: "30"
+ runner_health_check_threshold:
+ description: |
+ If the TriggerRunner subprocess's async event loop sends no communication to the parent
+ process for more than this many seconds, the parent stops updating the triggerer's
+ heartbeat in the database. The triggerer then appears unhealthy to the scheduler, which
+ will reassign its triggers. This detects a deadlocked or hung event loop that the normal
+ process-alive check cannot catch. Set to 0 to disable the watchdog.
+ version_added: 3.2.2
+ type: float
+ example: ~
+ default: "30"
Review Comment:
As above, I feel that 30 is a bit short. Based on our experience with KPO in
our current production setup, I would rather set it to 300, which would match
the scheduler heartbeat timeout.
```suggestion
default: "300"
```
##########
airflow-core/src/airflow/jobs/triggerer_job_runner.py:
##########
@@ -638,6 +647,16 @@ def heartbeat(self):
"TriggerRunnerSupervisor.heartbeat() requires a Job; "
"subclasses without a metadata-DB Job must override this method."
)
+ elapsed = time.monotonic() - self._last_runner_comms
+ if self.runner_health_check_threshold > 0 and elapsed > self.runner_health_check_threshold:
+ log.error(
+ "TriggerRunner subprocess event loop appears deadlocked: no communication received "
+ "for %.1fs (threshold: %ds). Skipping heartbeat so the triggerer appears unhealthy "
+ "to the scheduler and its triggers are reassigned.",
+ elapsed,
+ self.runner_health_check_threshold,
+ )
+ return
Review Comment:
This already exists in the chart today, see chart/templates/_helpers.yaml:767,
so no chart change is needed. In this case the Pod is marked "liveness failed"
and K8s will terminate it and start a new one.
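
The check in the hunk above boils down to "skip the heartbeat once the subprocess has been silent for longer than the threshold", which is what lets the job record go stale for any heartbeat-based liveness check. A minimal, self-contained sketch of that logic follows; `HeartbeatWatchdog` and its methods are hypothetical illustration names, not Airflow's API, and a fake clock is used so the example is deterministic.

```python
import time


class HeartbeatWatchdog:
    """Hypothetical helper; names are illustrative, not Airflow's API."""

    def __init__(self, threshold: float, clock=time.monotonic) -> None:
        self.threshold = threshold  # seconds; 0 disables the check
        self._clock = clock
        self._last_comms = clock()

    def record_comms(self) -> None:
        """Call whenever the subprocess sends anything to the parent."""
        self._last_comms = self._clock()

    def should_heartbeat(self) -> bool:
        """False once the subprocess has been silent longer than the threshold."""
        if self.threshold <= 0:
            return True
        return self._clock() - self._last_comms <= self.threshold


# Drive the watchdog with a fake clock instead of real elapsed time.
now = [0.0]
watchdog = HeartbeatWatchdog(threshold=30.0, clock=lambda: now[0])
now[0] = 10.0
healthy = watchdog.should_heartbeat()  # 10s of silence, under the threshold
now[0] = 45.0
stalled = watchdog.should_heartbeat()  # 45s of silence, over the threshold
watchdog.record_comms()
recovered = watchdog.should_heartbeat()  # fresh communication resets the timer
disabled = HeartbeatWatchdog(threshold=0, clock=lambda: now[0]).should_heartbeat()
print(healthy, stalled, recovered, disabled)
```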
##########
airflow-core/src/airflow/jobs/triggerer_job_runner.py:
##########
@@ -951,46 +974,71 @@ class TriggerCommsDecoder(CommsDecoder[ToTriggerRunner, ToTriggerSupervisor]):
factory=lambda: TypeAdapter(ToTriggerRunner), repr=False
)
- def _read_frame(self):
- from asgiref.sync import async_to_sync
-
- with self._thread_lock:
- return async_to_sync(self._aread_frame)()
-
- def send(self, msg: ToTriggerSupervisor) -> ToTriggerRunner | None:
- from asgiref.sync import async_to_sync
-
- with self._thread_lock:
- return async_to_sync(self.asend)(msg)
+ _pending: dict[int, asyncio.Future] = attrs.field(factory=dict, repr=False)
Review Comment:
Yes, sounds like a good idea - for "the future me" :-D
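
As a note for future readers, the role of a `_pending` map keyed by request ID can be sketched as below. This is a simplified illustration of response multiplexing, not the PR's implementation: `Multiplexer`, `dispatch`, and `fake_supervisor` are hypothetical names, an `asyncio.Queue` stands in for the socket, and the thread-to-loop bridging the real decoder needs is left out.

```python
import asyncio
import itertools


class Multiplexer:
    """Hypothetical request/response router keyed by request ID."""

    def __init__(self) -> None:
        self._ids = itertools.count(1)
        self._pending: dict[int, asyncio.Future] = {}

    async def request(self, payload: str, transport: asyncio.Queue) -> str:
        request_id = next(self._ids)
        future = asyncio.get_running_loop().create_future()
        self._pending[request_id] = future  # register before sending
        await transport.put((request_id, payload))
        try:
            return await future  # resolved by dispatch(), in any order
        finally:
            self._pending.pop(request_id, None)

    def dispatch(self, request_id: int, response: str) -> None:
        """Route a response to whichever caller is waiting on this ID."""
        future = self._pending.get(request_id)
        if future is not None and not future.done():
            future.set_result(response)


async def fake_supervisor(transport: asyncio.Queue, mux: Multiplexer, count: int) -> None:
    """Stand-in peer that answers requests in reverse order."""
    received = [await transport.get() for _ in range(count)]
    for request_id, payload in reversed(received):
        mux.dispatch(request_id, payload.upper())


async def demo() -> list[str]:
    mux, transport = Multiplexer(), asyncio.Queue()
    peer = asyncio.create_task(fake_supervisor(transport, mux, 3))
    replies = await asyncio.gather(*(mux.request(p, transport) for p in ("a", "b", "c")))
    await peer
    return replies


replies = asyncio.run(demo())
print(replies)
```

Even though the stand-in peer replies out of order, each caller gets its own response, and no lock is held across the round trip.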