This is an automated email from the ASF dual-hosted git repository. not-in-ldap pushed a commit to branch phil/ui-split-refactor in repository https://gitbox.apache.org/repos/asf/buildstream.git
commit 8b32793fa8ce95273ee034829166e0697363d00e Author: Phil Dawson <[email protected]> AuthorDate: Fri Jul 5 11:26:04 2019 +0100 Move subprocess machinery into a method This gets round the pickling identity issues --- src/buildstream/_stream.py | 128 ++++++++++++++++++++++++++++++--------------- 1 file changed, 87 insertions(+), 41 deletions(-) diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py index 0fe2234..e4f6bc0 100644 --- a/src/buildstream/_stream.py +++ b/src/buildstream/_stream.py @@ -19,7 +19,6 @@ # Jürg Billeter <[email protected]> # Tristan Maat <[email protected]> -import asyncio import itertools import functools import multiprocessing as mp @@ -48,38 +47,44 @@ from . import utils, _yaml, _site from . import Scope, Consistency -# A decorator which runs the decorated method to be run in a subprocess -def subprocessed(func): - - @functools.wraps(func) - def _subprocessed(self, *args, **kwargs): - assert self - print("Args: {}".format([*args])) - print("Kwargs: {}".format(list(kwargs.items()))) - assert not self._subprocess - - # TODO use functools to pass arguments to func to make target for subprocess - - # Start subprocessed work - mp_context = mp.get_context(method='spawn') - process_name = "stream-{}".format(func.__name__) - target = functools.partial(func, self, *args, **kwargs) - print("launching subprocess:", process_name) - self._subprocess = mp_context.Process(target=target, name=process_name) - self._subprocess.start() - - # TODO connect signal handlers - - # Run event loop. 
This event loop should exit once the - # subprocessed work has completed - print("Starting loop...") - while not self._subprocess.exitcode: - self._loop() - print("Stopping loop...") - - # Return result of subprocessed function - - return _subprocessed +def _subprocessed(self, *args, **kwargs): + assert self + print("Args: {}".format([*args])) + print("Kwargs: {}".format(list(kwargs.items()))) + assert not self._subprocess + + global notification_count + notification_count = 0 + # TODO use functools to pass arguments to func to make target for subprocess + + # Start subprocessed work + mp_context = mp.get_context(method='spawn') + process_name = "stream-{}".format(func.__name__) + print("launchinglaunching subprocess:", process_name) + print(func.__module__, func.__name__) + import buildstream + try: + assert func is buildstream._stream.Stream.build or func is Stream.build + except AssertionError: + print(func, func.__qualname__, func.__name__, func.__module__, id(func)) + self._subprocess = mp_context.Process(target=func, args=args, kwargs=kwargs, name=process_name) + self._subprocess.start() + + # TODO connect signal handlers + + self._subprocess = mp_context.Process(target=func, args=args, kwargs=kwargs, name=process_name) + + print("Starting loop...") + while not self._subprocess.exitcode: + self._loop() + print("Stopping loop...") + + try: + while True: + notification = self.notification_queue.get() + self._scheduler_notification_handler(notification) + except queue.Empty: + pass # Stream() @@ -141,6 +146,46 @@ class Stream(): def init(self): self._artifacts = self._context.artifactcache self._sourcecache = self._context.sourcecache + print(Stream.build, Stream.build.__qualname__, Stream.build.__name__, Stream.build.__module__, id(Stream.build)) + + + def run_in_subprocess(self, func, *args, **kwargs): + print("Args: {}".format([*args])) + print("Kwargs: {}".format(list(kwargs.items()))) + assert not self._subprocess + + global notification_count + 
notification_count = 0 + # TODO use functools to pass arguments to func to make target for subprocess + + # Start subprocessed work + mp_context = mp.get_context(method='fork') + process_name = "stream-{}".format(func.__name__) + print("launchinglaunching subprocess:", process_name) + print(func.__module__, func.__name__) + import buildstream + try: + assert func is buildstream._stream.Stream.build or func is Stream.build + except AssertionError: + print(func, func.__qualname__, func.__name__, func.__module__, id(func)) + self._subprocess = mp_context.Process(target=func, args=args, kwargs=kwargs, name=process_name) + self._subprocess.start() + + # TODO connect signal handlers + + self._subprocess = mp_context.Process(target=func, args=args, kwargs=kwargs, name=process_name) + + print("Starting loop...") + while not self._subprocess.exitcode: + self._loop() + print("Stopping loop...") + + try: + while True: + notification = self.notification_queue.get() + self._scheduler_notification_handler(notification) + except queue.Empty: + pass # cleanup() # @@ -265,6 +310,8 @@ class Stream(): return element._shell(scope, directory, mounts=mounts, isolate=isolate, prompt=prompt, command=command, usebuildtree=buildtree) + def build(self, *args, **kwargs): + self.run_in_subprocess(self._build, *args, **kwargs) # build() # # Builds (assembles) elements in the pipeline. @@ -282,14 +329,13 @@ class Stream(): # If `remote` specified as None, then regular configuration will be used # to determine where to push artifacts to. # - @subprocessed - def build(self, targets, *, - track_targets=None, - track_except=None, - track_cross_junctions=False, - ignore_junction_targets=False, - build_all=False, - remote=None): + def _build(self, targets, *, + track_targets=None, + track_except=None, + track_cross_junctions=False, + ignore_junction_targets=False, + build_all=False, + remote=None): if build_all: selection = PipelineSelection.ALL
