Copilot commented on code in PR #67881:
URL: https://github.com/apache/airflow/pull/67881#discussion_r3337889615
##########
airflow-core/src/airflow/executors/local_executor.py:
##########
@@ -257,10 +259,23 @@ def end(self) -> None:
# Send the shutdown message once for each alive worker
if proc.is_alive():
self.activity_queue.put(None)
+
Review Comment:
Line 262 contains trailing whitespace, which will likely be flagged by the
project's linter (pre-commit / ruff). Please strip the trailing whitespace.
##########
airflow-core/src/airflow/executors/local_executor.py:
##########
@@ -257,10 +259,23 @@ def end(self) -> None:
# Send the shutdown message once for each alive worker
if proc.is_alive():
self.activity_queue.put(None)
+
+ # To prevent deadlock, we should consume results from result_queue while waiting for processes to join.
+ # Otherwise, a worker blocked on putting results into a full result_queue pipe will never exit,
+ # and an unbounded proc.join() will hang the scheduler indefinitely.
for proc in self.workers.values():
if proc.is_alive():
- proc.join()
+ try:
+ while proc.is_alive():
+ self._read_results()
+ proc.join(timeout=0.05)
+ except (KeyboardInterrupt, SystemExit):
+ self.log.error("KeyboardInterrupt received during shutdown. Force terminating workers.")
+ for p in self.workers.values():
+ if p.is_alive():
+ p.terminate()
+ raise
proc.close()
Review Comment:
When `KeyboardInterrupt`/`SystemExit` is raised, the handler terminates all
workers but then re-raises before any `proc.close()` is called and before the
remaining workers are joined. Terminated processes still need to be reaped
(`join`) before `close()` to avoid resource warnings/zombies, and
`self.result_queue.close()` further down also won't be reached. Consider
joining the terminated workers (with a short timeout) before re-raising, or
moving the cleanup into a `finally` block so `close()` of workers and queues
still occurs on interrupt.
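
A minimal sketch of the `finally` variant, not the PR's actual code. `shutdown_workers`, `drain`, `poll_interval`, and `reap_timeout` are hypothetical names; `workers` is assumed to be a list of started objects exposing the `multiprocessing.Process` interface, and `drain` stands in for `self._read_results`.

```python
def shutdown_workers(workers, drain, poll_interval=0.05, reap_timeout=1.0):
    """Join workers while draining results; always reap and close them.

    Hypothetical helper for illustration only, not part of Airflow.
    """
    try:
        for proc in workers:
            # Keep consuming results so a worker blocked on a full pipe can exit.
            while proc.is_alive():
                drain()
                proc.join(timeout=poll_interval)
    except (KeyboardInterrupt, SystemExit):
        # Ask every remaining worker to stop, then let `finally` reap them.
        for proc in workers:
            if proc.is_alive():
                proc.terminate()
        raise
    finally:
        for proc in workers:
            # join() reaps the child; close() is only safe once it has exited.
            proc.join(timeout=reap_timeout)
            if not proc.is_alive():
                proc.close()
```

With this shape the interrupt still propagates to the caller, but every worker that has exited is joined and closed first. Closing the queues would sit in the same `finally` block, or in an outer one.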
##########
airflow-core/src/airflow/executors/local_executor.py:
##########
@@ -238,10 +238,12 @@ def sync(self) -> None:
self._check_workers()
def _read_results(self):
- while not self.result_queue.empty():
- key, state, exc = self.result_queue.get()
-
- self.change_state(key, state)
+ try:
+ while not self.result_queue.empty():
+ key, state, exc = self.result_queue.get()
+ self.change_state(key, state)
+ except (OSError, EOFError):
+ self.log.exception("Error reading from result queue")
Review Comment:
`multiprocessing.Queue.empty()` is documented as unreliable due to inherent
races, and `Queue.get()` without a timeout will block indefinitely if the queue
becomes empty between the `empty()` check and `get()`. During shutdown (when
this is called from the join loop) that re-introduces the very deadlock this PR
is attempting to fix. Recommend using `get_nowait()` and catching
`queue.Empty`, instead of `empty()` + blocking `get()`.
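
A minimal sketch of that pattern, not the PR's actual code. `drain_results` and its `handle` callback are hypothetical names, and the queue is assumed to yield `(key, state, exc)` tuples as in the diff above.

```python
import queue


def drain_results(result_queue, handle):
    """Consume every item that is readable right now, without ever blocking.

    Hypothetical helper for illustration; `handle` plays the role of
    `self.change_state`. Returns the number of items consumed.
    """
    drained = 0
    while True:
        try:
            # get_nowait() raises queue.Empty instead of waiting for data.
            key, state, exc = result_queue.get_nowait()
        except queue.Empty:
            return drained
        handle(key, state)
        drained += 1
```

`multiprocessing.Queue.get_nowait()` raises the same `queue.Empty`, so the loop should work unchanged with the executor's result queue. An item that is still in flight is simply picked up on the next pass of the join loop.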
##########
airflow-core/src/airflow/executors/local_executor.py:
##########
@@ -270,7 +285,11 @@ def end(self) -> None:
self.result_queue.close()
def terminate(self):
- """Terminate the executor is not doing anything."""
+ """Forcefully terminate all worker processes under control of the executor."""
+ self.log.info("Terminating all LocalExecutor worker processes.")
+ for proc in self.workers.values():
+ if proc.is_alive():
+ proc.terminate()
Review Comment:
After calling `proc.terminate()`, the process should be `join()`-ed to reap
it; otherwise terminated children can linger as zombies until something reaps
them, and a subsequent `proc.close()` (called elsewhere in `end()`) can raise
`ValueError` if the child has not finished exiting. Consider following
each `terminate()` with a `proc.join(timeout=...)`.
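
A minimal sketch of terminate-then-join, not the PR's actual code. `terminate_and_reap` and its `timeout` parameter are hypothetical, the escalation to `kill()` is an addition beyond what the comment asks for, and `workers` is assumed to hold started objects exposing the `multiprocessing.Process` interface.

```python
def terminate_and_reap(workers, timeout=1.0):
    """Terminate live workers, then join each one so it gets reaped.

    Hypothetical helper for illustration only, not part of Airflow.
    Returns the workers that are still alive after escalation.
    """
    workers = list(workers)
    # Signal everyone first so the workers shut down in parallel.
    for proc in workers:
        if proc.is_alive():
            proc.terminate()
    stragglers = []
    for proc in workers:
        proc.join(timeout=timeout)
        if proc.is_alive():
            # The worker ignored terminate(); escalate and wait once more.
            proc.kill()
            proc.join(timeout=timeout)
        if proc.is_alive():
            stragglers.append(proc)
    return stragglers
```

Only workers absent from the returned list are safe to `close()`.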