This is an automated email from the ASF dual-hosted git repository.

not-in-ldap pushed a commit to branch phil/ui-split-refactor in repository https://gitbox.apache.org/repos/asf/buildstream.git
commit e39c94064de749d31bf41987fccaa19fc129a975 Author: Phil Dawson <[email protected]> AuthorDate: Thu Jun 20 10:18:49 2019 +0100 Send scheduler notifications over a multiprocessing queue --- src/buildstream/_scheduler/scheduler.py | 17 ++++++++++------- src/buildstream/_stream.py | 4 +++- 2 files changed, 13 insertions(+), 8 deletions(-) diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py index 9e120e4..14ecf30 100644 --- a/src/buildstream/_scheduler/scheduler.py +++ b/src/buildstream/_scheduler/scheduler.py @@ -98,7 +98,7 @@ class Notification: class Scheduler(): def __init__(self, context, - start_time, state, notification_handler, + start_time, state, notification_queue, interrupt_callback=None, ticker_callback=None, interactive_failure=False): @@ -131,8 +131,8 @@ class Scheduler(): self._cleanup_scheduled = False # Whether we have a cleanup job scheduled self._cleanup_running = None # A running CleanupJob, or None - # Callback to send notifications to report back to the Scheduler's owner - self.notify = notification_handler + # Message to send notifications back to the Scheduler's owner + self._notification_queue = notification_queue # Whether our exclusive jobs, like 'cleanup' are currently already # waiting or active. 
@@ -305,7 +305,7 @@ class Scheduler(): job_status=status, failed_element=job.element_job, element=element) - self.notify(notification) + self._notify(notification) if process_jobs: # Now check for more jobs @@ -369,7 +369,7 @@ class Scheduler(): full_name=job.name, job_action=job.action_name, elapsed_time=self.elapsed_time()) - self.notify(notification) + self._notify(notification) job.start() # Callback for the cache size job @@ -589,7 +589,7 @@ class Scheduler(): return notification = Notification(NotificationType.INTERRUPT) - self.notify(notification) + self._notify(notification) # _terminate_event(): # @@ -649,9 +649,12 @@ class Scheduler(): # Regular timeout for driving status in the UI def _tick(self): notification = Notification(NotificationType.TICK) - self.notify(notification) + self._notify(notification) self.loop.call_later(1, self._tick) + def _notify(self, notification): + self._notification_queue.put(notification) + def __getstate__(self): # The only use-cases for pickling in BuildStream at the time of writing # are enabling the 'spawn' method of starting child processes, and diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py index 45bb41b..2d451c6 100644 --- a/src/buildstream/_stream.py +++ b/src/buildstream/_stream.py @@ -21,6 +21,7 @@ import itertools import functools +import multiprocessing as mp import os import sys import stat @@ -76,6 +77,7 @@ class Stream(): # # Private members # + self._notification_queue = mp.Queue() self._context = context self._artifacts = None self._sourcecache = None @@ -85,7 +87,7 @@ class Stream(): context.messenger.set_state(self._state) - self._scheduler = Scheduler(context, session_start, self._state, self._scheduler_notification_handler, + self._scheduler = Scheduler(context, session_start, self._state, self._notification_queue, interrupt_callback=interrupt_callback, ticker_callback=ticker_callback, interactive_failure=interactive_failure)
