sunwiz-sherwin opened a new issue, #67870:
URL: https://github.com/apache/airflow/issues/67870
### Apache Airflow version
3.1.7
### What happened
On a single-node LocalExecutor deployment, the scheduler **process stays
alive but permanently stops heartbeating** after a forked task-execution
worker's comms pipe breaks. The scheduler does not crash or exit — so the
failure is silent: the health endpoint goes `unhealthy`, schedulable tasks sit
in `state=None` forever, and `systemd Restart=always` never fires because the
process never exits.
We have observed the scheduler wedged in this state for **~12 hours** at a
time. A manual `systemctl restart airflow-scheduler` (which force-kills the
wedged process) recovers it; it then runs a burst of work and can wedge again
later when the same condition recurs.
**Mechanism captured from journald (items 1 and 2 below were observed
concurrently in the logs; we have not confirmed which is cause and which is effect):**
1. A forked task-execution subprocess raises a `BrokenPipeError` while
sending over the supervisor comms channel:
```
BrokenPipeError: [Errno 32] Broken pipe
...SUPERVISOR_COMMS.send(...)
airflow/sdk/execution_time/supervisor.py _fork_main (line 388)
airflow/sdk/execution_time/supervisor.py _subprocess_main (line 206)
```
It also surfaces via the task runner's top-level handler (structlog
`_write` -> `BrokenPipe`):
```
airflow/sdk/execution_time/task_runner.py main() log.exception("Top level error")
```
2. Concurrently, the scheduler main loop logs an exception on the
executor-heartbeat path. (Line numbers below are against tag 3.1.7.)
```
scheduler_job_runner.py ERROR - Exception when executing SchedulerJob._run_scheduler_loop
Traceback (most recent call last):
  File ".../jobs/scheduler_job_runner.py", line 1555, in _run_scheduler_loop
    executor.heartbeat()
  File ".../executors/base_executor.py", line 365, in heartbeat
    self.sync()
  File ".../executors/local_executor.py", line 252, in sync
    self._read_results()
  File ".../executors/local_executor.py", line 257, in _read_results
    key, state, exc = self.result_queue.get()
```
3. The exception drives the executor into shutdown (`LocalExecutor.end()`):
```
local_executor.py INFO - Shutting down LocalExecutor; waiting for running tasks to finish. Signal again if you don't want to wait.
```
4. The scheduler **main thread then blocks permanently**. A thread dump
shows the main TID in kernel wchan `do_wait` (i.e. `waitpid`), waiting on child
`airflow worker -- LocalExecutor: <uuid>` processes — several of which are
already `<defunct>` (zombie) workers. `py-spy dump` could not sample a Python
stack because the thread is blocked in a C-level `waitpid`. There is no timeout
on this wait, so the hang is permanent.
**Observed state while wedged:**
- `systemctl status airflow-scheduler` -> `active (running)`; Linux process
state `Ssl`.
- `GET /api/v2/monitor/health` -> scheduler `unhealthy`;
`latest_scheduler_heartbeat` goes stale and stays stale.
- Tasks whose upstreams are all `success` remain in `state=None`
indefinitely — never queued — because there is effectively no live scheduler.
- `systemd` unit has `Restart=always`, but it never fires because the
process never exits.
**Likely root cause (line numbers verified against the source at tag
3.1.7):**
`LocalExecutor.end()`
(`airflow-core/src/airflow/executors/local_executor.py`, method spanning ~lines
261–278) sends one shutdown sentinel per worker and then, for each alive
worker, calls `proc.join()` with **no timeout**:
```python
# local_executor.py @ 3.1.7, inside LocalExecutor.end()
for proc in self.workers.values():
    if proc.is_alive():
        proc.join()  # line 273: no timeout
    proc.close()
```
A wedged/zombie worker — e.g. one whose comms pipe already broke — therefore
blocks the scheduler main thread indefinitely in `waitpid`. The shutdown is
entered because the triggering exception arrives via `sync()` (line 252) ->
`_read_results()` (line 257, the `self.result_queue.get()`) during
`executor.heartbeat()`. We are confident about the unbounded `join()` (it is
plainly visible in the source); we are **not** asserting certainty on the exact
upstream cause of the `result_queue.get()` exception — we only observe it
co-occur with the worker `BrokenPipeError`.
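The behavioural difference between the unbounded `proc.join()` above and a bounded one can be shown with plain `multiprocessing`, outside Airflow. The sketch below uses a hypothetical helper, `wait_for_exit`. It is not Airflow code and relies only on the documented `join(timeout=...)` contract: the call returns once the child exits or the timeout elapses, whichever comes first, and the caller then checks whether the child is still alive.

```python
from multiprocessing.process import BaseProcess


def wait_for_exit(proc: BaseProcess, timeout: float) -> bool:
    """Hypothetical helper: a bounded join that reports whether the child exited.

    A bare proc.join() returns only once the child has exited. With a timeout,
    join() hands control back after `timeout` seconds even if the child is still
    running, so the caller can escalate (terminate/kill) instead of hanging.
    """
    proc.join(timeout=timeout)
    return not proc.is_alive()
```

A `False` return is the point where a caller would escalate with `proc.terminate()` or `proc.kill()` instead of waiting again.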
**Related but distinct existing issues** (same `_read_results` code path and
same "Shutting down LocalExecutor…" log line, but a *different trigger and a
different, less-severe outcome*):
- #47873 / PR #48517, #58708 (Airflow 3.1.3, Python 3.12), #64476 / PRs
#64484 #64485 (Airflow 3.1.8), #49099 (open) — in all of these the exception is
an **unpickleable httpx error** (`ServerResponseError` / `HTTPStatusError`)
from an API-server error response, and the scheduler **crashes and exits
cleanly** ("Exited execute loop"), which lets the supervisor/orchestrator
restart it. That clean exit is the behaviour we *want*; this report is the
opposite — the same shutdown path **hangs in `waitpid` forever** instead. The
merged fixes (#48517, #64484/#64485) only make specific httpx exceptions
pickleable; they do not add a timeout to `end()`'s `join()`, so they would not
address this hang.
- #65505 (open) is adjacent (supervisor / orphan-and-zombie subprocess
family) but is CeleryExecutor + HTTP-409 + closed-epoll `ValueError`, not
LocalExecutor + BrokenPipe + unbounded `waitpid`.
**Speculation (labeled as such):** This may relate to the new task-SDK
supervisor execution model (AIP-72) — the `BrokenPipeError` originates in
`SUPERVISOR_COMMS.send` inside `sdk/execution_time/supervisor.py`. We have not
confirmed whether the worker pipe breaks first and the heartbeat exception is a
downstream effect, or vice versa; we only observe them concurrently in the logs.
### What you think should happen instead
A broken worker-comms pipe (or any exception surfacing in
`LocalExecutor.sync()` / `_read_results()`) should **not** drive executor
shutdown into an unbounded `waitpid()` on dead/zombie workers. The executor
should do one of:
1. **Bound the wait in `LocalExecutor.end()`** — use
`proc.join(timeout=...)` and force-`terminate()`/`kill()` workers that don't
exit, and reap zombies, so shutdown cannot block forever; and/or
2. **Make `sync()` / `_read_results()` resilient** so a single broken worker
pipe does not cascade into a full executor shutdown — log/skip the broken
worker and continue scheduling; and/or
3. **Fail the scheduler process** (clean exit) if the executor cannot
recover, so `systemd Restart=always` (or k8s) can restart it — the same
recovery path the unpickleable-error variants (#47873/#58708/#64476) already
get today.
The current behaviour — scheduler alive-but-idle, silently stops scheduling,
health `unhealthy`, no exit, no restart — is the worst of all outcomes.
### How to reproduce
Reproduction is condition-dependent (it occurs when a forked LocalExecutor
worker's supervisor-comms pipe breaks while the scheduler is heartbeating the
executor), but the failure mechanism is deterministic given that trigger:
1. Run Airflow 3.1.7, Python 3.12, **LocalExecutor**, single node, Postgres
metadata DB.
2. Drive task execution such that a forked worker subprocess hits
`BrokenPipeError: [Errno 32] Broken pipe` in `SUPERVISOR_COMMS.send`
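(`sdk/execution_time/supervisor.py` `_fork_main`, line 388), e.g. the
supervisor's end of the comms channel closes while the forked task process is
still writing to it (the failing `send` is made from the task-process side, and
a writer only sees `EPIPE` once the reading end has closed).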
3. Observe the scheduler log `Exception when executing
SchedulerJob._run_scheduler_loop` at the `executor.heartbeat()` -> `sync()`
(local_executor.py:252) -> `_read_results()` (local_executor.py:257) path, then
`Shutting down LocalExecutor; waiting for running tasks to finish.`
4. Observe the scheduler main thread block in `waitpid` (kernel wchan
`do_wait`) on the now-defunct worker(s) via the unbounded `proc.join()`
(local_executor.py:273). The process stays `active (running)` / `Ssl`,
`/api/v2/monitor/health` reports the scheduler `unhealthy` with a stale
heartbeat, and schedulable tasks remain in `state=None`.
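The error in step 2 can be illustrated without Airflow. This is a minimal sketch that assumes only that the comms channel behaves like a connected Unix stream socket pair, which is an assumption and not a description of the task SDK's actual transport. Writing into a channel whose *reading* end has been closed raises `BrokenPipeError` in the writer, because CPython ignores `SIGPIPE` and surfaces `EPIPE` as that exception. Since the failing `SUPERVISOR_COMMS.send` in item 1 runs in the forked task process, this indicates that the other end of the channel had already closed when the task process wrote to it.

```python
import socket


def send_after_peer_closed() -> str:
    """Write to a socket whose peer has closed and report what the writer sees."""
    writer, reader = socket.socketpair()  # AF_UNIX stream pair on Linux
    reader.close()  # the receiving side goes away (e.g. its process exited)
    try:
        writer.sendall(b'{"type": "example"}\n')  # arbitrary illustrative payload
    except BrokenPipeError as exc:
        return f"{type(exc).__name__}: [Errno {exc.errno}] {exc.strerror}"
    finally:
        writer.close()
    return "no error"


if __name__ == "__main__":
    print(send_after_peer_closed())  # on Linux: BrokenPipeError: [Errno 32] Broken pipe
```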
Inspection commands that confirm the state: `ps -o pid,stat,wchan,cmd` on
the scheduler + its `airflow worker -- LocalExecutor` children (shows `do_wait`
and `<defunct>`); `py-spy dump` (cannot sample — blocked in C-level `waitpid`);
`GET /api/v2/monitor/health`.
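The `ps` checks above can also be scripted by reading `/proc` directly. This is a Linux-only sketch: `inspect_scheduler` is a hypothetical helper, field positions follow the `proc(5)` layout of `/proc/<pid>/stat`, and reading another process's `wchan` may require running as the same user or as root. It reports the scheduler main thread's `wchan` (`do_wait` in the wedged state described above) and the PIDs of its children in state `Z` (`<defunct>`).

```python
from pathlib import Path


def _stat_fields(stat_text: str) -> list[str]:
    # /proc/<pid>/stat is "pid (comm) state ppid ..."; comm may contain spaces or
    # parentheses, so split on the LAST ")" before reading the later fields.
    return stat_text.rsplit(")", 1)[1].split()


def inspect_scheduler(scheduler_pid: int, proc_root: str = "/proc") -> dict:
    """Return the scheduler's wchan plus the PIDs of its zombie children."""
    root = Path(proc_root)
    wchan = (root / str(scheduler_pid) / "wchan").read_text().strip()
    zombies = []
    for entry in root.iterdir():
        if not entry.name.isdigit():
            continue  # skip non-process entries such as "self" or "meminfo"
        try:
            fields = _stat_fields((entry / "stat").read_text())
        except OSError:
            continue  # the process exited while we were scanning
        state, ppid = fields[0], int(fields[1])
        if ppid == scheduler_pid and state == "Z":
            zombies.append(int(entry.name))
    return {"wchan": wchan, "zombie_children": sorted(zombies)}
```

Call it as `inspect_scheduler(scheduler_pid)` with the PID of the `airflow scheduler` process; `proc_root` exists only so the function can be pointed at a copy of `/proc`.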
### Operating System
Ubuntu (GCP VM)
### Versions of Apache Airflow Providers
```
apache-airflow-providers-fab==3.2.0 # also reproduced on 2.3.1
apache-airflow-providers-standard # at 3.1.7 constraints version
apache-airflow-providers-common-sql # at 3.1.7 constraints version
apache-airflow-providers-common-compat # at 3.1.7 constraints version
```
### Deployment
Virtualenv installation
### Deployment details
- Single-node, systemd-managed: `airflow-api-server`, `airflow-scheduler`,
`airflow-dag-processor`, `airflow-triggerer`.
- Executor: **LocalExecutor**.
- Metadata DB: PostgreSQL.
- Python 3.12.
- `airflow-scheduler` systemd unit has `Restart=always` (never fires here —
the process never exits).
**Ruled out:**
- **Not OOM** — the VM had ~24 GiB free at the time; the only OOM kills in
`dmesg` were old and unrelated (desktop `pipewire`).
- **Not the api-server** — a separate earlier crash was an
`apache-airflow-providers-fab` 2.3.1 vs core 3.1.7 mismatch in
`DagAccessEntity.HITL_DETAIL` handling causing api 500s, fixed by upgrading fab
to 3.2.0. The scheduler hang persists independently of that and is specifically
about LocalExecutor worker-pipe handling.
### Anything else
- Frequency: recurs whenever the triggering worker-comms BrokenPipe
condition happens; once wedged it stays wedged indefinitely (observed ~12h)
until manually restarted.
- Our current mitigation is a systemd timer that probes
`/api/v2/monitor/health` every 90s and restarts the scheduler after 2
consecutive `unhealthy` reads. This is a band-aid; a real fix belongs in
`LocalExecutor.end()` (bounded join + terminate/reap) and/or
`sync()`/`_read_results()` resilience.
- Source verified at tag 3.1.7:
`airflow-core/src/airflow/executors/local_executor.py` — `end()` performs an
unbounded `proc.join()` (line 273); `sync()` (line 252) -> `_read_results()`
(line 257). Scheduler heartbeat call at `scheduler_job_runner.py:1555`; the
`Exception when executing SchedulerJob._run_scheduler_loop` log at
`scheduler_job_runner.py:1565`; `base_executor.heartbeat()` -> `self.sync()` at
`base_executor.py:365`.
### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!