SakshamKapoor2911 commented on code in PR #67881:
URL: https://github.com/apache/airflow/pull/67881#discussion_r3337944631
##########
airflow-core/src/airflow/executors/local_executor.py:
##########
@@ -238,10 +238,12 @@ def sync(self) -> None:
self._check_workers()
def _read_results(self):
- while not self.result_queue.empty():
- key, state, exc = self.result_queue.get()
-
- self.change_state(key, state)
+ try:
+ while not self.result_queue.empty():
+ key, state, exc = self.result_queue.get()
+ self.change_state(key, state)
+ except (OSError, EOFError):
+ self.log.exception("Error reading from result queue")
Review Comment:
We cannot use `get_nowait()` or `get(block=False)` here because
`self.result_queue` is a `multiprocessing.SimpleQueue`, which does not support
those methods or arguments in the standard library.
However, since `result_queue` only has a single consumer process (the parent
scheduler), checking `empty()` (which calls `poll()` on the reader pipe socket
under the hood) is non-racy and safe to use. If `empty()` returns `False`, the
subsequent `get()` is guaranteed to return immediately without blocking.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]