sunwiz-sherwin opened a new issue, #67870:
URL: https://github.com/apache/airflow/issues/67870

   ### Apache Airflow version
   
   3.1.7
   
   ### What happened
   
   On a single-node LocalExecutor deployment, the scheduler **process stays 
alive but permanently stops heartbeating** after a forked task-execution 
worker's comms pipe breaks. The scheduler does not crash or exit — so the 
failure is silent: the health endpoint goes `unhealthy`, schedulable tasks sit 
in `state=None` forever, and `systemd Restart=always` never fires because the 
process never exits.
   
   We have observed the scheduler wedged in this state for **~12 hours** at a 
time. A manual `systemctl restart airflow-scheduler` (which force-kills the 
wedged process) recovers it; it then runs a burst of work and can wedge again 
later when the same condition recurs.
   
   **Mechanism captured from journald (items 1 and 2 are observed concurrently 
in the logs; we have not confirmed which is cause and which is effect):**
   
   1. A forked task-execution subprocess raises a `BrokenPipeError` while 
sending over the supervisor comms channel:
   
      ```
      BrokenPipeError: [Errno 32] Broken pipe
        ...SUPERVISOR_COMMS.send(...)
        airflow/sdk/execution_time/supervisor.py  _fork_main      (line 388)
        airflow/sdk/execution_time/supervisor.py  _subprocess_main (line 206)
      ```
   
      It also surfaces via the task runner's top-level handler (structlog 
`_write` -> `BrokenPipeError`):
   
      ```
      airflow/sdk/execution_time/task_runner.py  main()  log.exception("Top 
level error")
      ```
   
   2. Concurrently, the scheduler main loop logs an exception on the 
executor-heartbeat path. (Line numbers below are against tag 3.1.7.)
   
      ```
      scheduler_job_runner.py ERROR - Exception when executing 
SchedulerJob._run_scheduler_loop
      Traceback (most recent call last):
        File ".../jobs/scheduler_job_runner.py", line 1555, in 
_run_scheduler_loop
          executor.heartbeat()
        File ".../executors/base_executor.py", line 365, in heartbeat
          self.sync()
        File ".../executors/local_executor.py", line 252, in sync
          self._read_results()
        File ".../executors/local_executor.py", line 257, in _read_results
          key, state, exc = self.result_queue.get()
      ```
   
   3. The exception drives the executor into shutdown (`LocalExecutor.end()`):
   
      ```
      local_executor.py INFO - Shutting down LocalExecutor; waiting for running 
tasks to finish. Signal again if you don't want to wait.
      ```
   
   4. The scheduler **main thread then blocks permanently**. A thread dump 
shows the main TID in kernel wchan `do_wait` (i.e. `waitpid`), waiting on child 
`airflow worker -- LocalExecutor: <uuid>` processes — several of which are 
already `<defunct>` (zombie) workers. `py-spy dump` could not sample a Python 
stack because the thread is blocked in a C-level `waitpid`. There is no timeout 
on this wait, so the hang is permanent.
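
For context on item 1, here is a generic illustration of how `[Errno 32] Broken pipe` arises: a write fails once the reading end of the channel has been closed. This uses a plain `os.pipe()` and the hypothetical helper name `write_to_closed_pipe`; it is not Airflow's comms code, only a sketch of the underlying OS behaviour.

```python
import errno
import os


def write_to_closed_pipe():
    """Write to a pipe whose read end is already closed; return the errno seen."""
    read_fd, write_fd = os.pipe()
    os.close(read_fd)  # the peer goes away before we send
    try:
        os.write(write_fd, b"message")
    except BrokenPipeError as exc:
        # CPython ignores SIGPIPE by default, so the failed write surfaces
        # as an exception carrying EPIPE rather than killing the process.
        return exc.errno
    finally:
        os.close(write_fd)
    return None


if __name__ == "__main__":
    print(write_to_closed_pipe() == errno.EPIPE)
```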
   
   **Observed state while wedged:**
   - `systemctl status airflow-scheduler` -> `active (running)`; Linux process 
state `Ssl`.
   - `GET /api/v2/monitor/health` -> scheduler `unhealthy`; 
`latest_scheduler_heartbeat` goes stale and stays stale.
   - Tasks whose upstreams are all `success` remain in `state=None` 
indefinitely — never queued — because there is effectively no live scheduler.
   - `systemd` unit has `Restart=always`, but it never fires because the 
process never exits.
   
   **Likely root cause (line numbers verified against the source at tag 
3.1.7):**
   `LocalExecutor.end()` 
(`airflow-core/src/airflow/executors/local_executor.py`, method spanning ~lines 
261–278) sends one shutdown sentinel per worker and then, for each alive 
worker, calls `proc.join()` with **no timeout**:
   
   ```python
   # local_executor.py @ 3.1.7
   for proc in self.workers.values():
       if proc.is_alive():
           proc.join()      # line 273 — no timeout
       proc.close()
   ```
   
   A wedged worker (e.g. one whose comms pipe already broke but which never 
exits) therefore blocks the scheduler main thread indefinitely in `waitpid`. 
Note that `waitpid` returns immediately for a child that is already a zombie, 
so the worker being joined is presumably still alive; the `<defunct>` workers 
are likely ones that have exited but that the loop has not reached yet. The 
shutdown is entered because the triggering exception arrives via `sync()` 
(line 252) -> `_read_results()` (line 257, the `self.result_queue.get()`) 
during `executor.heartbeat()`. We are confident about the unbounded `join()` 
(it is plainly visible in the source); we are **not** asserting certainty on 
the exact upstream cause of the `result_queue.get()` exception. We only 
observe that it co-occurs with the worker `BrokenPipeError`.
   
   **Related but distinct existing issues** (same `_read_results` code path and 
same "Shutting down LocalExecutor…" log line, but a *different trigger and a 
different, less-severe outcome*):
   - #47873 / PR #48517, #58708 (Airflow 3.1.3, Python 3.12), #64476 / PRs 
#64484 #64485 (Airflow 3.1.8), #49099 (open) — in all of these the exception is 
an **unpickleable httpx error** (`ServerResponseError` / `HTTPStatusError`) 
from an API-server error response, and the scheduler **crashes and exits 
cleanly** ("Exited execute loop"), which lets the supervisor/orchestrator 
restart it. That clean exit is the behaviour we *want*; this report is the 
opposite — the same shutdown path **hangs in `waitpid` forever** instead. The 
merged fixes (#48517, #64484/#64485) only make specific httpx exceptions 
pickleable; they do not add a timeout to `end()`'s `join()`, so they would not 
address this hang.
   - #65505 (open) is adjacent (supervisor / orphan-and-zombie subprocess 
family) but is CeleryExecutor + HTTP-409 + closed-epoll `ValueError`, not 
LocalExecutor + BrokenPipe + unbounded `waitpid`.
   
   **Speculation (labeled as such):** This may relate to the new task-SDK 
supervisor execution model (AIP-72) — the `BrokenPipeError` originates in 
`SUPERVISOR_COMMS.send` inside `sdk/execution_time/supervisor.py`. We have not 
confirmed whether the worker pipe breaks first and the heartbeat exception is a 
downstream effect, or vice versa; we only observe them concurrently in the logs.
   
   ### What you think should happen instead
   
   A broken worker-comms pipe (or any exception surfacing in 
`LocalExecutor.sync()` / `_read_results()`) should **not** drive executor 
shutdown into an unbounded `waitpid()` on dead/zombie workers. The executor 
should do one of:
   
   1. **Bound the wait in `LocalExecutor.end()`** — use 
`proc.join(timeout=...)` and force-`terminate()`/`kill()` workers that don't 
exit, and reap zombies, so shutdown cannot block forever; and/or
   2. **Make `sync()` / `_read_results()` resilient** so a single broken worker 
pipe does not cascade into a full executor shutdown — log/skip the broken 
worker and continue scheduling; and/or
   3. **Fail the scheduler process** (clean exit) if the executor cannot 
recover, so `systemd Restart=always` (or k8s) can restart it — the same 
recovery path the unpickleable-error variants (#47873/#58708/#64476) already 
get today.
   
   The current behaviour — scheduler alive-but-idle, silently stops scheduling, 
health `unhealthy`, no exit, no restart — is the worst of all outcomes.
   
   ### How to reproduce
   
   Reproduction is condition-dependent (it occurs when a forked LocalExecutor 
worker's supervisor-comms pipe breaks while the scheduler is heartbeating the 
executor), but the failure mechanism is deterministic given that trigger:
   
   1. Run Airflow 3.1.7, Python 3.12, **LocalExecutor**, single node, Postgres 
metadata DB.
   2. Drive task execution such that a forked worker subprocess hits 
`BrokenPipeError: [Errno 32] Broken pipe` in `SUPERVISOR_COMMS.send` 
(`sdk/execution_time/supervisor.py` `_fork_main`, line 388), e.g. the 
receiving end of the comms channel is closed while the subprocess is still 
writing to it.
   3. Observe the scheduler log `Exception when executing 
SchedulerJob._run_scheduler_loop` at the `executor.heartbeat()` -> `sync()` 
(local_executor.py:252) -> `_read_results()` (local_executor.py:257) path, then 
`Shutting down LocalExecutor; waiting for running tasks to finish.`
   4. Observe the scheduler main thread block in `waitpid` (kernel wchan 
`do_wait`) on the now-defunct worker(s) via the unbounded `proc.join()` 
(local_executor.py:273). The process stays `active (running)` / `Ssl`, 
`/api/v2/monitor/health` reports the scheduler `unhealthy` with a stale 
heartbeat, and schedulable tasks remain in `state=None`.
   
   Inspection commands that confirm the state: `ps -o pid,stat,wchan,cmd` on 
the scheduler + its `airflow worker -- LocalExecutor` children (shows `do_wait` 
and `<defunct>`); `py-spy dump` (cannot sample — blocked in C-level `waitpid`); 
`GET /api/v2/monitor/health`.
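
The `ps` check above can also be scripted. Below is a small sketch that parses lines in the Linux `/proc/<pid>/stat` format (`pid (comm) state ppid ...`) and picks out zombie children of a given parent. `ProcStat`, `parse_proc_stat`, and `zombie_children` are hypothetical helper names, and the caller is assumed to supply the stat lines (for example by reading `/proc/<pid>/stat` for each candidate PID).

```python
from typing import Iterable, List, NamedTuple


class ProcStat(NamedTuple):
    pid: int
    comm: str
    state: str  # single letter, e.g. "S" sleeping, "Z" zombie
    ppid: int


def parse_proc_stat(line: str) -> ProcStat:
    """Parse the first four fields of a /proc/<pid>/stat line."""
    # comm is wrapped in parentheses and may itself contain spaces or ')',
    # so split on the first '(' and the last ')'.
    left, _, rest = line.partition("(")
    comm, _, tail = rest.rpartition(")")
    fields = tail.split()
    return ProcStat(int(left), comm, fields[0], int(fields[1]))


def zombie_children(parent_pid: int, stat_lines: Iterable[str]) -> List[ProcStat]:
    """Return entries that are children of `parent_pid` and in state Z."""
    stats = (parse_proc_stat(line) for line in stat_lines)
    return [s for s in stats if s.ppid == parent_pid and s.state == "Z"]
```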
   
   ### Operating System
   
   Ubuntu (GCP VM)
   
   ### Versions of Apache Airflow Providers
   
   ```
   apache-airflow-providers-fab==3.2.0      # also reproduced on 2.3.1
   apache-airflow-providers-standard        # at 3.1.7 constraints version
   apache-airflow-providers-common-sql      # at 3.1.7 constraints version
   apache-airflow-providers-common-compat   # at 3.1.7 constraints version
   ```
   
   ### Deployment
   
   Virtualenv installation
   
   ### Deployment details
   
   - Single-node, systemd-managed: `airflow-api-server`, `airflow-scheduler`, 
`airflow-dag-processor`, `airflow-triggerer`.
   - Executor: **LocalExecutor**.
   - Metadata DB: PostgreSQL.
   - Python 3.12.
   - `airflow-scheduler` systemd unit has `Restart=always` (never fires here — 
the process never exits).
   
   **Ruled out:**
   - **Not OOM** — the VM had ~24 GiB free at the time; the only OOM kills in 
`dmesg` were old and unrelated (desktop `pipewire`).
   - **Not the api-server** — a separate earlier crash was an 
`apache-airflow-providers-fab` 2.3.1 vs core 3.1.7 mismatch in 
`DagAccessEntity.HITL_DETAIL` handling causing api 500s, fixed by upgrading fab 
to 3.2.0. The scheduler hang persists independently of that and is specifically 
about LocalExecutor worker-pipe handling.
   
   ### Anything else
   
   - Frequency: recurs whenever the triggering worker-comms BrokenPipe 
condition happens; once wedged it stays wedged indefinitely (observed ~12h) 
until manually restarted.
   - Our current mitigation is a systemd timer that probes 
`/api/v2/monitor/health` every 90s and restarts the scheduler after 2 
consecutive `unhealthy` reads. This is a band-aid; a real fix belongs in 
`LocalExecutor.end()` (bounded join + terminate/reap) and/or 
`sync()`/`_read_results()` resilience.
   - Source verified at tag 3.1.7: 
`airflow-core/src/airflow/executors/local_executor.py` — `end()` performs an 
unbounded `proc.join()` (line 273); `sync()` (line 252) -> `_read_results()` 
(line 257). Scheduler heartbeat call at `scheduler_job_runner.py:1555`; the 
`Exception when executing SchedulerJob._run_scheduler_loop` log at 
`scheduler_job_runner.py:1565`; `base_executor.heartbeat()` -> `self.sync()` at 
`base_executor.py:365`.
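
The restart rule in our mitigation (restart after 2 consecutive `unhealthy` reads) can be sketched as follows. `RestartPolicy` and `scheduler_is_healthy` are hypothetical names, and the payload shape (a `scheduler` object with a `status` field) is an assumption about the health response rather than something verified here. The HTTP probe and the actual `systemctl restart` call are left out.

```python
def scheduler_is_healthy(payload: dict) -> bool:
    """Assumed payload shape: {"scheduler": {"status": "healthy" | "unhealthy"}}."""
    return payload.get("scheduler", {}).get("status") == "healthy"


class RestartPolicy:
    """Signal a restart after `threshold` consecutive unhealthy probes."""

    def __init__(self, threshold: int = 2):
        self.threshold = threshold
        self.consecutive_unhealthy = 0

    def observe(self, healthy: bool) -> bool:
        """Record one probe result; return True when a restart is due."""
        if healthy:
            self.consecutive_unhealthy = 0
            return False
        self.consecutive_unhealthy += 1
        if self.consecutive_unhealthy >= self.threshold:
            # Reset so the restarted scheduler gets a fresh count.
            self.consecutive_unhealthy = 0
            return True
        return False
```

Requiring consecutive failures avoids restarting on a single slow or missed heartbeat; a healthy read in between resets the count.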
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
