Copilot commented on code in PR #67881:
URL: https://github.com/apache/airflow/pull/67881#discussion_r3337889615


##########
airflow-core/src/airflow/executors/local_executor.py:
##########
@@ -257,10 +259,23 @@ def end(self) -> None:
             # Send the shutdown message once for each alive worker
             if proc.is_alive():
                 self.activity_queue.put(None)
+        

Review Comment:
   Line 262 contains trailing whitespace, which will likely be flagged by the 
project's linter (pre-commit / ruff). Please strip the trailing whitespace.



##########
airflow-core/src/airflow/executors/local_executor.py:
##########
@@ -257,10 +259,23 @@ def end(self) -> None:
             # Send the shutdown message once for each alive worker
             if proc.is_alive():
                 self.activity_queue.put(None)
+        
+        # To prevent deadlock, we should consume results from result_queue while waiting for processes to join.
+        # Otherwise, a worker blocked on putting results into a full result_queue pipe will never exit,
+        # and an unbounded proc.join() will hang the scheduler indefinitely.
 
         for proc in self.workers.values():
             if proc.is_alive():
-                proc.join()
+                try:
+                    while proc.is_alive():
+                        self._read_results()
+                        proc.join(timeout=0.05)
+                except (KeyboardInterrupt, SystemExit):
+                    self.log.error("KeyboardInterrupt received during shutdown. Force terminating workers.")
+                    for p in self.workers.values():
+                        if p.is_alive():
+                            p.terminate()
+                    raise
             proc.close()

Review Comment:
   When `KeyboardInterrupt`/`SystemExit` is raised, the handler terminates all 
workers but then re-raises before any `proc.close()` is called and before the 
remaining workers are joined. Terminated processes still need to be reaped 
(`join`) before `close()` to avoid resource warnings/zombies, and 
`self.result_queue.close()` further down also won't be reached. Consider 
joining the terminated workers (with a short timeout) before re-raising, or 
moving the cleanup into a `finally` block so `close()` of workers and queues 
still occurs on interrupt.
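Below is a minimal sketch of the `finally`-based variant. It is written as standalone functions rather than `LocalExecutor` methods. Everything in it is hypothetical: `shutdown`, `_drain`, `_worker`, `demo` and the `join_timeout` parameter are illustrative names, and the one-`None`-sentinel-per-worker protocol only mirrors what `end()` does above. `demo` assumes the `fork` start method (POSIX only) so the example stays self-contained.

```python
import multiprocessing
import queue
from multiprocessing.process import BaseProcess
from typing import Any, List, Sequence


def _drain(result_queue: Any, results: List[Any]) -> None:
    """Hypothetical helper: move every result readable right now into ``results``."""
    while True:
        try:
            results.append(result_queue.get_nowait())
        except queue.Empty:
            return


def _worker(task_queue: Any, result_queue: Any) -> None:
    """Hypothetical worker: echo each task back until a ``None`` sentinel arrives."""
    while (task := task_queue.get()) is not None:
        result_queue.put(task)


def shutdown(
    workers: Sequence[BaseProcess],
    task_queue: Any,
    result_queue: Any,
    join_timeout: float = 5.0,
) -> List[Any]:
    """Stop ``workers``; reap and close them and the result queue even on interrupt."""
    results: List[Any] = []
    try:
        for _ in workers:
            task_queue.put(None)  # one shutdown sentinel per worker
        for proc in workers:
            while proc.is_alive():
                # Keep reading so a worker blocked on a full result pipe can finish.
                _drain(result_queue, results)
                proc.join(timeout=0.05)
    except (KeyboardInterrupt, SystemExit):
        for proc in workers:
            if proc.is_alive():
                proc.terminate()
        raise
    finally:
        for proc in workers:
            proc.join(timeout=join_timeout)  # reap finished or terminated children
            if not proc.is_alive():
                proc.close()  # only valid once the process has been reaped
        _drain(result_queue, results)  # pick up anything flushed just before exit
        result_queue.close()
    return results


def demo(n_tasks: int = 10_000, n_workers: int = 2) -> List[Any]:
    """Produce more result data than a default Linux pipe buffer holds, then shut down."""
    ctx = multiprocessing.get_context("fork")  # assumption: POSIX, for a self-contained demo
    task_queue, result_queue = ctx.Queue(), ctx.Queue()
    workers = [
        ctx.Process(target=_worker, args=(task_queue, result_queue))
        for _ in range(n_workers)
    ]
    for proc in workers:
        proc.start()
    for i in range(n_tasks):
        task_queue.put(i)
    return shutdown(workers, task_queue, result_queue)
```

Reaping happens in `finally` whether the loop exits normally or through the re-raised interrupt. As a result, each worker is joined before `close()` and the result queue is always closed. A worker that is still alive after `join_timeout` is deliberately left unclosed, because `close()` would raise `ValueError` on it. If the loop ran a plain `proc.join()` without the drain, `demo` would be expected to hang: each worker's queue feeder thread blocks on the full result pipe, and the worker cannot exit until that thread finishes.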



##########
airflow-core/src/airflow/executors/local_executor.py:
##########
@@ -238,10 +238,12 @@ def sync(self) -> None:
         self._check_workers()
 
     def _read_results(self):
-        while not self.result_queue.empty():
-            key, state, exc = self.result_queue.get()
-
-            self.change_state(key, state)
+        try:
+            while not self.result_queue.empty():
+                key, state, exc = self.result_queue.get()
+                self.change_state(key, state)
+        except (OSError, EOFError):
+            self.log.exception("Error reading from result queue")

Review Comment:
   `multiprocessing.Queue.empty()` is documented as unreliable due to inherent 
races, and `Queue.get()` without a timeout will block indefinitely if the queue 
becomes empty between the `empty()` check and `get()`. During shutdown (when 
this is called from the join loop) that re-introduces the very deadlock this PR 
is attempting to fix. Recommend using `get_nowait()` and catching 
`queue.Empty`, instead of `empty()` + blocking `get()`.
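Here is a sketch of the non-blocking drain. It assumes the same `(key, state, exc)` tuples the PR reads. `read_results` is a standalone function that takes a hypothetical `change_state` callback in place of `self.change_state`; it is not the Airflow method.

```python
import logging
import queue
from typing import Any, Callable

log = logging.getLogger(__name__)


def read_results(result_queue: Any, change_state: Callable[[Any, Any], None]) -> int:
    """Report every result that is readable right now, never blocking.

    ``change_state`` is a hypothetical callback standing in for the executor's
    ``self.change_state``. Returns how many results were processed.
    """
    processed = 0
    while True:
        try:
            key, state, _exc = result_queue.get_nowait()
        except queue.Empty:
            # Nothing readable at the moment: return instead of waiting for more.
            break
        except (OSError, EOFError):
            # Same failure modes the PR already guards against (e.g. a broken pipe).
            log.exception("Error reading from result queue")
            break
        change_state(key, state)
        processed += 1
    return processed
```

Both `multiprocessing.Queue` and `queue.Queue` raise `queue.Empty` from `get_nowait()`. The loop therefore returns as soon as nothing is readable, and there is no window between a check and a blocking `get()`. A result can still be sitting in a worker's feeder thread when `get_nowait()` reports empty. That is acceptable in the shutdown path, because the join loop calls the drain again between `join(timeout=...)` calls.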



##########
airflow-core/src/airflow/executors/local_executor.py:
##########
@@ -270,7 +285,11 @@ def end(self) -> None:
         self.result_queue.close()
 
     def terminate(self):
-        """Terminate the executor is not doing anything."""
+        """Forcefully terminate all worker processes under control of the executor."""
+        self.log.info("Terminating all LocalExecutor worker processes.")
+        for proc in self.workers.values():
+            if proc.is_alive():
+                proc.terminate()

Review Comment:
   After calling `proc.terminate()`, the process should be `join()`-ed to reap 
it; otherwise terminated children remain as zombies until the parent exits, and 
a subsequent `proc.close()` (called elsewhere in `end()`) will raise 
`ValueError` because the process is still considered active. Consider following 
each `terminate()` with a `proc.join(timeout=...)`.
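Here is a minimal sketch of terminate-then-reap, written as standalone functions rather than the executor method. The escalation to `kill()` after `join_timeout` goes beyond the suggestion above and is an assumption. The names `terminate_workers`, `demo` and `join_timeout` are hypothetical. `demo` uses the `fork` start method (POSIX only) so the example stays self-contained.

```python
import multiprocessing
from multiprocessing.process import BaseProcess
from typing import Iterable, List


def terminate_workers(workers: Iterable[BaseProcess], join_timeout: float = 5.0) -> List[str]:
    """Hypothetical helper: terminate, reap, and close every worker.

    Returns the names of workers that were still alive ``join_timeout`` seconds
    after SIGTERM and had to be killed (an assumed escalation policy).
    """
    workers = list(workers)
    for proc in workers:
        if proc.is_alive():
            proc.terminate()  # SIGTERM on POSIX
    killed: List[str] = []
    for proc in workers:
        proc.join(timeout=join_timeout)  # reap it so it does not linger as a zombie
        if proc.is_alive():
            killed.append(proc.name)
            proc.kill()  # SIGKILL on POSIX; cannot be caught or ignored
            proc.join()
        proc.close()  # safe now: close() raises ValueError while the process runs
    return killed


def demo(n_workers: int = 2) -> List[str]:
    """Start hypothetical workers that block forever, then shut them down."""
    ctx = multiprocessing.get_context("fork")  # assumption: POSIX, for a self-contained demo
    never_set = ctx.Event()
    workers = [ctx.Process(target=never_set.wait) for _ in range(n_workers)]
    for proc in workers:
        proc.start()
    return terminate_workers(workers)
```

All workers are signalled before any of them is joined. This way the timeouts overlap instead of adding up per worker, and `close()` is only reached after the process has been reaped.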



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
