This is an automated email from the ASF dual-hosted git repository.

tvb pushed a commit to branch tpollard/subrebase
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit 6f54dfac3436336eb7853862b16e965634cd65f7
Author: Tom Pollard <[email protected]>
AuthorDate: Wed Oct 2 11:30:10 2019 +0100

    Move sched notification poll to loop reader
---
 src/buildstream/_scheduler/scheduler.py | 30 ++++++++++++++++--------------
 1 file changed, 16 insertions(+), 14 deletions(-)

diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index 5fad1f2..e35d34e 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -203,15 +203,16 @@ class Scheduler:
         _watcher = asyncio.get_child_watcher()
         _watcher.add_child_handler(self._casd_process.pid, self._abort_on_casd_failure)
 
-        # Add notification handler
-        if self._notify_back_queue:
-            self.loop.call_later(0.01, self._loop)
+        # Add notification listener if in subprocess
+        self._start_listening()
 
         # Start the profiler
         with PROFILER.profile(Topics.SCHEDULER, "_".join(queue.action_name for queue in self.queues)):
             # Run the queues
             self._sched()
             self.loop.run_forever()
+            # Stop listening for notifications
+            self._stop_listening()
             self.loop.close()
 
         # Stop watching casd
@@ -387,7 +388,7 @@ class Scheduler:
     #
     def _abort_on_casd_failure(self, pid, returncode):
         message = Message(MessageType.BUG, "buildbox-casd died while the pipeline was active.")
-        self._notify(Notification(NotificationType.MESSAGE, message=message))
+        self._notify_front(Notification(NotificationType.MESSAGE, message=message))
 
         self._casd_process.returncode = returncode
         self.terminate_jobs()
@@ -621,16 +622,17 @@ class Scheduler:
             raise ValueError("Unrecognised notification type received")
 
     def _loop(self):
-        assert self._notify_back_queue
-        # Check for and process new messages
-        while True:
-            try:
-                notification = self._notify_back_queue.get_nowait()
-                self._notification_handler(notification)
-            except queue.Empty:
-                notification = None
-                break
-        self.loop.call_later(0.01, self._loop)
+        while not self._notify_back_queue.empty():
+            notification = self._notify_back_queue.get_nowait()
+            self._notification_handler(notification)
+
+    def _start_listening(self):
+        if self._notify_back_queue:
+            self.loop.add_reader(self._notify_back_queue._reader.fileno(), self._loop)
+
+    def _stop_listening(self):
+        if self._notify_back_queue:
+            self.loop.remove_reader(self._notify_back_queue._reader.fileno())
 
     def __getstate__(self):
         # The only use-cases for pickling in BuildStream at the time of writing

Reply via email to