This is an automated email from the ASF dual-hosted git repository. not-in-ldap pushed a commit to branch tpollard/subrebase in repository https://gitbox.apache.org/repos/asf/buildstream.git
commit 519737f25cc40814819cd1dac4ba072ae34f37ce Author: Tom Pollard <[email protected]> AuthorDate: Mon Sep 16 12:20:06 2019 +0100 Add in dual queue implementation for subprocess build. This also adapts utils.py handling of PID to account for the stream multiprocessing, and how callers assert that they're the 'main_process' or in a job. --- doc/source/hacking/coding_guidelines.rst | 2 +- src/buildstream/_messenger.py | 2 +- src/buildstream/_scheduler/scheduler.py | 42 +++++++++--- src/buildstream/_stream.py | 104 ++++++++++++++++++++++++++---- src/buildstream/_workspaces.py | 2 +- src/buildstream/element.py | 6 +- src/buildstream/sandbox/_sandboxremote.py | 2 +- src/buildstream/utils.py | 31 ++++++--- 8 files changed, 152 insertions(+), 39 deletions(-) diff --git a/doc/source/hacking/coding_guidelines.rst b/doc/source/hacking/coding_guidelines.rst index ecab241..10f76e9 100644 --- a/doc/source/hacking/coding_guidelines.rst +++ b/doc/source/hacking/coding_guidelines.rst @@ -609,7 +609,7 @@ In these cases, do **not** raise any of the ``BstError`` class exceptions. Instead, use the ``assert`` statement, e.g.:: - assert utils._is_main_process(), \ + assert not utils._is_job_process(), \ "Attempted to save workspace configuration from child process" This will result in a ``BUG`` message with the stack trace included being diff --git a/src/buildstream/_messenger.py b/src/buildstream/_messenger.py index 03b2833..9e2269f 100644 --- a/src/buildstream/_messenger.py +++ b/src/buildstream/_messenger.py @@ -270,7 +270,7 @@ class Messenger: # we also do not allow it in the main process. assert self._log_handle is None assert self._log_filename is None - assert not utils._is_main_process() + assert utils._is_job_process() # Create the fully qualified logfile in the log directory, # appending the pid and .log extension at the end. 
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py index 4e034c5..402ce1d 100644 --- a/src/buildstream/_scheduler/scheduler.py +++ b/src/buildstream/_scheduler/scheduler.py @@ -24,6 +24,7 @@ import asyncio from itertools import chain import signal import datetime +import queue # Local imports from .resources import Resources @@ -67,6 +68,7 @@ class NotificationType(FastEnum): RETRY = "retry" MESSAGE = "message" TASK_ERROR = "task_error" + EXCEPTION = "exception" # Notification() @@ -88,7 +90,9 @@ class Notification: time=None, element=None, message=None, - task_error=None + task_error=None, + for_scheduler=None, + exception=None ): self.notification_type = notification_type self.full_name = full_name @@ -98,6 +102,7 @@ class Notification: self.element = element self.message = message self.task_error = task_error # Tuple of domain & reason + self.exception = exception # Scheduler() @@ -121,7 +126,7 @@ class Notification: # ticker_callback: A callback call once per second # class Scheduler: - def __init__(self, context, start_time, state, notification_queue, notifier): + def __init__(self, context, start_time, state, notifier): # # Public members @@ -145,8 +150,10 @@ class Scheduler: self._state = state self._casd_process = None # handle to the casd process for monitoring purpose - # Bidirectional queue to send notifications back to the Scheduler's owner - self._notification_queue = notification_queue + # Bidirectional pipe to send notifications back to the Scheduler's owner + self._notify_front = None + self._notify_back = None + # Notifier callback to use if not running in a subprocess self._notifier = notifier self.resources = Resources(context.sched_builders, context.sched_fetchers, context.sched_pushers) @@ -190,6 +197,10 @@ class Scheduler: _watcher = asyncio.get_child_watcher() _watcher.add_child_handler(self._casd_process.pid, self._abort_on_casd_failure) + # Add notification handler + if self._notify_back: + 
self.loop.call_later(0.01, self._loop) + # Start the profiler with PROFILER.profile(Topics.SCHEDULER, "_".join(queue.action_name for queue in self.queues)): # Run the queues @@ -573,12 +584,13 @@ class Scheduler: queue.enqueue([element]) def _notify(self, notification): - # Scheduler to Stream notifcations on right side - self._notification_queue.append(notification) - self._notifier() + # Check if we need to call the notifier callback + if self._notify_front: + self._notify_front.put(notification) + else: + self._notifier(notification) - def _stream_notification_handler(self): - notification = self._notification_queue.popleft() + def _stream_notification_handler(self, notification): if notification.notification_type == NotificationType.TERMINATE: self.terminate_jobs() elif notification.notification_type == NotificationType.QUIT: @@ -594,6 +606,18 @@ class Scheduler: # as we don't want to pickle exceptions between processes raise ValueError("Unrecognised notification type received") + def _loop(self): + assert self._notify_back + # Check for and process new messages + while True: + try: + notification = self._notify_back.get_nowait() + self._stream_notification_handler(notification) + except queue.Empty: + notification = None + break + self.loop.call_later(0.01, self._loop) + def __getstate__(self): # The only use-cases for pickling in BuildStream at the time of writing # are enabling the 'spawn' method of starting child processes, and diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py index 5bb7de3..e0c3383 100644 --- a/src/buildstream/_stream.py +++ b/src/buildstream/_stream.py @@ -19,6 +19,9 @@ # Jürg Billeter <[email protected]> # Tristan Maat <[email protected]> +import asyncio +import functools +import multiprocessing as mp import os import sys import stat @@ -26,9 +29,9 @@ import shlex import shutil import tarfile import tempfile +import queue from contextlib import contextmanager, suppress from fnmatch import fnmatch -from collections 
import deque from typing import List, Tuple from ._artifactelement import verify_artifact_ref, ArtifactElement @@ -90,14 +93,13 @@ class Stream: self._project = None self._pipeline = None self._state = State(session_start) # Owned by Stream, used by Core to set state - self._notification_queue = deque() + # self._notification_pipe_front, self._notification_pipe_back = mp.Pipe() + self._subprocess = None self._starttime = session_start # Synchronised with Scheduler's relative start time context.messenger.set_state(self._state) - self._scheduler = Scheduler( - context, session_start, self._state, self._notification_queue, self._scheduler_notification_handler - ) + self._scheduler = Scheduler(context, session_start, self._state, self._scheduler_notification_handler) self._session_start_callback = session_start_callback self._ticker_callback = ticker_callback self._interrupt_callback = interrupt_callback @@ -105,6 +107,8 @@ class Stream: self._scheduler_running = False self._scheduler_terminated = False self._scheduler_suspended = False + self._notify_front = None + self._notify_back = None # init() # @@ -115,11 +119,65 @@ class Stream: self._artifacts = self._context.artifactcache self._sourcecache = self._context.sourcecache + @staticmethod + def _subprocess_main(func, notify, *args, **kwargs): + # Set main process + utils._set_stream_pid() + + try: + func(*args, **kwargs) + except Exception as e: + notify.put(Notification(NotificationType.EXCEPTION, exception=e)) + + def run_in_subprocess(self, func, *args, **kwargs): + assert not self._subprocess + + mp_context = mp.get_context(method="fork") + process_name = "stream-{}".format(func.__name__) + + self._notify_front = mp.Queue() + self._notify_back = mp.Queue() + # Tell the scheduler to not use the notifier callback + self._scheduler._notify_front = self._notify_front + self._scheduler._notify_back = self._notify_back + + args = list(args) + args.insert(0, self._notify_front) + args.insert(0, func) + + 
self._subprocess = mp_context.Process( + target=Stream._subprocess_main, args=args, kwargs=kwargs, name=process_name + ) + + self._subprocess.start() + + # TODO connect signal handlers with asyncio + while self._subprocess.exitcode is None: + # check every given time interval on subprocess state + self._subprocess.join(0.01) + # if no exit code, go back to checking the message queue + self._loop() + print("Stopping loop...") + + # Ensure no more notifications to process + try: + while True: + notification = self._notify_front.get_nowait() + self._scheduler_notification_handler(notification) + except queue.Empty: + print("Finished processing notifications") + pass + # cleanup() # # Cleans up application state # def cleanup(self): + # Close the notification queue + for q in [self._notify_back, self._notify_front]: + if q is not None: + q.close() + # self._notification_queue.cancel_join_thread() if self._project: self._project.cleanup() @@ -261,6 +319,9 @@ class Stream: scope, directory, mounts=mounts, isolate=isolate, prompt=prompt, command=command, usebuildtree=buildtree ) + def build(self, *args, **kwargs): + self.run_in_subprocess(self._build, *args, **kwargs) + # build() # # Builds (assembles) elements in the pipeline. @@ -274,7 +335,7 @@ class Stream: # If `remote` specified as None, then regular configuration will be used # to determine where to push artifacts to. 
# - def build(self, targets, *, selection=PipelineSelection.PLAN, ignore_junction_targets=False, remote=None): + def _build(self, targets, *, selection=PipelineSelection.PLAN, ignore_junction_targets=False, remote=None): use_config = True if remote: @@ -1624,11 +1685,7 @@ class Stream: return element_targets, artifact_refs - def _scheduler_notification_handler(self): - # Check the queue is there - assert self._notification_queue - notification = self._notification_queue.pop() - + def _scheduler_notification_handler(self, notification): if notification.notification_type == NotificationType.MESSAGE: self._context.messenger.message(notification.message) elif notification.notification_type == NotificationType.INTERRUPT: @@ -1638,6 +1695,7 @@ class Stream: elif notification.notification_type == NotificationType.JOB_START: self._state.add_task(notification.job_action, notification.full_name, notification.time) elif notification.notification_type == NotificationType.JOB_COMPLETE: + # State between scheduler & stream is different if run in a subprocess self._state.remove_task(notification.job_action, notification.full_name) if notification.job_status == JobStatus.FAIL: self._state.fail_task(notification.job_action, notification.full_name, notification.element) @@ -1651,13 +1709,31 @@ class Stream: self._scheduler_suspended = not self._scheduler_suspended elif notification.notification_type == NotificationType.TASK_ERROR: set_last_task_error(*notification.task_error) + elif notification.notification_type == NotificationType.EXCEPTION: + raise notification.exception else: raise StreamError("Unrecognised notification type received") def _notify(self, notification): - # Stream to scheduler notifcations on left side - self._notification_queue.appendleft(notification) - self._notifier() + # Set that the notification is for the scheduler + # notification.for_scheduler = True + if self._notify_back: + self._notify_back.put(notification) + else: + 
self._scheduler._stream_notification_handler(notification) + + # The code to be run by the Stream's event loop while delegating + # work to a subprocess with the @subprocessed decorator + def _loop(self): + assert self._notify_front + # Check for and process new messages + while True: + try: + notification = self._notify_front.get_nowait() + self._scheduler_notification_handler(notification) + except queue.Empty: + notification = None + break def __getstate__(self): # The only use-cases for pickling in BuildStream at the time of writing diff --git a/src/buildstream/_workspaces.py b/src/buildstream/_workspaces.py index 3d50fd9..f9636f8 100644 --- a/src/buildstream/_workspaces.py +++ b/src/buildstream/_workspaces.py @@ -518,7 +518,7 @@ class Workspaces: # create_workspace permanent # def save_config(self): - assert utils._is_main_process() + assert not utils._is_job_process() config = { "format-version": BST_WORKSPACE_FORMAT_VERSION, diff --git a/src/buildstream/element.py b/src/buildstream/element.py index ffce257..6918c9c 100644 --- a/src/buildstream/element.py +++ b/src/buildstream/element.py @@ -769,7 +769,7 @@ class Element(Plugin): self.info("Resetting workspace state, last successful build is no longer in the cache") # In case we are staging in the main process - if utils._is_main_process(): + if not utils._is_job_process(): context.get_workspaces().save_config() for dep in self.dependencies(scope): @@ -794,7 +794,7 @@ class Element(Plugin): # In case we are running `bst shell`, this happens in the # main process and we need to update the workspace config - if utils._is_main_process(): + if not utils._is_job_process(): context.get_workspaces().save_config() result = dep.stage_artifact( @@ -1588,7 +1588,7 @@ class Element(Plugin): self._update_ready_for_runtime_and_cached() if self._get_workspace() and self._cached_success(): - assert utils._is_main_process(), "Attempted to save workspace configuration from child process" + assert not utils._is_job_process(), 
"Attempted to save workspace configuration from child process" # # Note that this block can only happen in the # main process, since `self._cached_success()` cannot diff --git a/src/buildstream/sandbox/_sandboxremote.py b/src/buildstream/sandbox/_sandboxremote.py index d4ffd64..c07ab8c 100644 --- a/src/buildstream/sandbox/_sandboxremote.py +++ b/src/buildstream/sandbox/_sandboxremote.py @@ -59,7 +59,7 @@ class SandboxRemote(SandboxREAPI): return # gRPC doesn't support fork without exec, which is used in the main process. - assert not utils._is_main_process() + assert utils._is_job_process() self.storage_url = config.storage_service["url"] self.exec_url = config.exec_service["url"] diff --git a/src/buildstream/utils.py b/src/buildstream/utils.py index 7f7bf67..3902f7d 100644 --- a/src/buildstream/utils.py +++ b/src/buildstream/utils.py @@ -58,6 +58,9 @@ _URI_SCHEMES = ["http", "https", "ftp", "file", "git", "sftp", "ssh"] # Main process pid _MAIN_PID = os.getpid() +# This is different to _MAIN_PID if running a subprocessed stream entry point +_STREAM_PID = _MAIN_PID + # The number of threads in the main process at startup. # This is 1 except for certain test environments (xdist/execnet). _INITIAL_NUM_THREADS_IN_MAIN_PROCESS = 1 @@ -770,13 +773,18 @@ def _pretty_size(size, dec_places=0): return "{size:g}{unit}".format(size=round(psize, dec_places), unit=unit) -# _is_main_process() +# _is_job_process() # -# Return whether we are in the main process or not. +# Return whether we are in a job process. 
# -def _is_main_process(): - assert _MAIN_PID is not None - return os.getpid() == _MAIN_PID +def _is_job_process(): + assert _STREAM_PID is not None + return os.getpid() != _STREAM_PID + + +def _set_stream_pid() -> None: + global _STREAM_PID # pylint: disable=global-statement + _STREAM_PID = os.getpid() # Recursively remove directories, ignoring file permissions as much as @@ -1479,10 +1487,15 @@ def _is_single_threaded(): # Use psutil as threading.active_count() doesn't include gRPC threads. process = psutil.Process() - if process.pid == _MAIN_PID: - expected_num_threads = _INITIAL_NUM_THREADS_IN_MAIN_PROCESS - else: - expected_num_threads = 1 + expected_num_threads = 1 + + if process.pid == _STREAM_PID: + if _STREAM_PID != _MAIN_PID: + # multiprocessing.Queue() has a background thread for object pickling, + # see https://docs.python.org/3/library/multiprocessing.html#pipes-and-queues + expected_num_threads += 1 + else: + expected_num_threads = _INITIAL_NUM_THREADS_IN_MAIN_PROCESS # gRPC threads are not joined when shut down. Wait for them to exit. wait = 0.1
