SakshamKapoor2911 commented on code in PR #67881:
URL: https://github.com/apache/airflow/pull/67881#discussion_r3337944631


##########
airflow-core/src/airflow/executors/local_executor.py:
##########
@@ -238,10 +238,12 @@ def sync(self) -> None:
         self._check_workers()
 
     def _read_results(self):
-        while not self.result_queue.empty():
-            key, state, exc = self.result_queue.get()
-
-            self.change_state(key, state)
+        try:
+            while not self.result_queue.empty():
+                key, state, exc = self.result_queue.get()
+                self.change_state(key, state)
+        except (OSError, EOFError):
+            self.log.exception("Error reading from result queue")

Review Comment:
   We cannot use `get_nowait()` or `get(block=False)` here because 
`self.result_queue` is a `multiprocessing.SimpleQueue`, which does not support 
those methods or arguments in the standard library. 
   
   However, since `result_queue` only has a single consumer process (the parent 
scheduler), checking `empty()` (which calls `poll()` on the reader pipe socket 
under the hood) is non-racy and safe to use. If `empty()` returns `False`, the 
subsequent `get()` is guaranteed to return immediately without blocking.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to