This is an automated email from the ASF dual-hosted git repository. tvb pushed a commit to branch tpollard/temp in repository https://gitbox.apache.org/repos/asf/buildstream.git
commit a4bafc125655b73a81cc2e0421f02e508b58660e Author: Tom Pollard <[email protected]> AuthorDate: Wed Oct 2 11:30:10 2019 +0100 Move sched notification poll to loop reader --- src/buildstream/_scheduler/scheduler.py | 30 ++++++++++++++++-------------- 1 file changed, 16 insertions(+), 14 deletions(-) diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py index 3476162..87853f0 100644 --- a/src/buildstream/_scheduler/scheduler.py +++ b/src/buildstream/_scheduler/scheduler.py @@ -203,15 +203,16 @@ class Scheduler: _watcher = asyncio.get_child_watcher() _watcher.add_child_handler(casd_process.pid, self._abort_on_casd_failure) - # Add notification handler - if self._notify_back_queue: - self.loop.call_later(0.01, self._loop) + # Add notification listener if in subprocess + self._start_listening() # Start the profiler with PROFILER.profile(Topics.SCHEDULER, "_".join(queue.action_name for queue in self.queues)): # Run the queues self._sched() self.loop.run_forever() + # Stop listening for notifications + self._stop_listening() self.loop.close() # Stop watching casd @@ -387,7 +388,7 @@ class Scheduler: # def _abort_on_casd_failure(self, pid, returncode): message = Message(MessageType.BUG, "buildbox-casd died while the pipeline was active.") - self._notify(Notification(NotificationType.MESSAGE, message=message)) + self._notify_front(Notification(NotificationType.MESSAGE, message=message)) self._casd_process.returncode = returncode self.terminate_jobs() @@ -621,16 +622,17 @@ class Scheduler: raise ValueError("Unrecognised notification type received") def _loop(self): - assert self._notify_back_queue - # Check for and process new messages - while True: - try: - notification = self._notify_back_queue.get_nowait() - self._notification_handler(notification) - except queue.Empty: - notification = None - break - self.loop.call_later(0.01, self._loop) + while not self._notify_back_queue.empty(): + notification = self._notify_back_queue.get_nowait() + self._notification_handler(notification) + + def _start_listening(self): + if self._notify_back_queue: + self.loop.add_reader(self._notify_back_queue._reader.fileno(), self._loop) + + def _stop_listening(self): + if self._notify_back_queue: + self.loop.remove_reader(self._notify_back_queue._reader.fileno()) def __getstate__(self): # The only use-cases for pickling in BuildStream at the time of writing
