This is an automated email from the ASF dual-hosted git repository.

tvb pushed a commit to branch phil/ui-split-refactor
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit b7b445cdb0de0223015c71506d8689135151c966
Author: Phil Dawson <[email protected]>
AuthorDate: Thu Jun 13 17:47:04 2019 +0100

    WIP: Refactor scheduler-frontend communication
---
 src/buildstream/_frontend/app.py        |  2 +
 src/buildstream/_scheduler/__init__.py  |  2 +-
 src/buildstream/_scheduler/scheduler.py | 76 +++++++++++++++++++++++++--------
 src/buildstream/_stream.py              | 25 ++++++++++-
 4 files changed, 85 insertions(+), 20 deletions(-)

diff --git a/src/buildstream/_frontend/app.py b/src/buildstream/_frontend/app.py
index 76e3bc7..9f90938 100644
--- a/src/buildstream/_frontend/app.py
+++ b/src/buildstream/_frontend/app.py
@@ -607,6 +607,7 @@ class App():
                 pass
             return
 
+        assert False
         # Interactive mode for element failures
         with self._interrupted():
 
@@ -646,6 +647,7 @@ class App():
 
                 # Handle choices which you can come back from
                 #
+                assert choice != 'shell'  # This won't work for now
                 if choice == 'shell':
                     click.echo("\nDropping into an interactive shell in the failed build sandbox\n", err=True)
                     try:
diff --git a/src/buildstream/_scheduler/__init__.py b/src/buildstream/_scheduler/__init__.py
index d2f458f..d689d6e 100644
--- a/src/buildstream/_scheduler/__init__.py
+++ b/src/buildstream/_scheduler/__init__.py
@@ -26,5 +26,5 @@ from .queues.buildqueue import BuildQueue
 from .queues.artifactpushqueue import ArtifactPushQueue
 from .queues.pullqueue import PullQueue
 
-from .scheduler import Scheduler, SchedStatus
+from .scheduler import Scheduler, SchedStatus, Notification, NotificationType
 from .jobs import ElementJob, JobStatus
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index 4f668c6..a9286d4 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -21,6 +21,7 @@
 # System imports
 import os
 import asyncio
+import enum
 from itertools import chain
 import signal
 import datetime
@@ -45,6 +46,34 @@ _ACTION_NAME_CLEANUP = 'clean'
 _ACTION_NAME_CACHE_SIZE = 'size'
 
 
+@enum.unique
+class NotificationType(enum.Enum):
+    INTERRUPT = "interrupt"
+    JOB_START = "job_start"
+    JOB_COMPLETE = "job_complete"
+    TICK = "tick"
+
+
+class Notification:
+
+    def __init__(self,
+                 notification_type,
+                 *,
+                 full_name=None,
+                 job_action=None,
+                 job_status=None,
+                 failed_element=False,
+                 elapsed_time=None,
+                 element=None):
+        self.notification_type = notification_type
+        self.full_name = full_name
+        self.job_action = job_action
+        self.job_status = job_status
+        self.failed_element = failed_element
+        self.elapsed_time = elapsed_time
+        self.element = element
+
+
 # Scheduler()
 #
 # The scheduler operates on a list queues, each of which is meant to accomplish
@@ -69,7 +98,7 @@ _ACTION_NAME_CACHE_SIZE = 'size'
 class Scheduler():
 
     def __init__(self, context,
-                 start_time, state,
+                 start_time, state, message_handler,
                  interrupt_callback=None,
                  ticker_callback=None,
                  interactive_failure=False):
@@ -102,9 +131,17 @@ class Scheduler():
         self._cleanup_scheduled = False       # Whether we have a cleanup job scheduled
         self._cleanup_running = None          # A running CleanupJob, or None
 
-        # Callbacks to report back to the Scheduler owner
-        self._interrupt_callback = interrupt_callback
-        self._ticker_callback = ticker_callback
+        # Callback to send messages to report back to the Scheduler's owner
+        self.message = message_handler
+
+        # Whether our exclusive jobs, like 'cleanup' are currently already
+        # waiting or active.
+        #
+        # This is just a bit quicker than scanning the wait queue and active
+        # queue and comparing job action names.
+        #
+        self._exclusive_waiting = set()
+        self._exclusive_active = set()
 
         self.resources = Resources(context.sched_builders,
                                    context.sched_fetchers,
@@ -134,8 +171,7 @@ class Scheduler():
         asyncio.set_event_loop(self.loop)
 
         # Add timeouts
-        if self._ticker_callback:
-            self.loop.call_later(1, self._tick)
+        self.loop.call_later(1, self._tick)
 
         # Handle unix signals while running
         self._connect_signals()
@@ -254,13 +290,19 @@ class Scheduler():
         # Remove from the active jobs list
         self._active_jobs.remove(job)
 
-        self._state.remove_task(job.action_name, job.name)
         if status == JobStatus.FAIL:
             # If it's an elementjob, we want to compare against the failure messages
             # and send the Element() instance if interactive failure handling. Note
             # this may change if the frontend is run in a separate process for pickling
             element = job._element if (job.element_job and self._interactive_failure) else None
-            self._state.fail_task(job.action_name, job.name, element_job=job.element_job, element=element)
+
+            message = Notification(NotificationType.JOB_COMPLETE,
+                                   full_name=job.name,
+                                   job_action=job.action_name,
+                                   job_status=status,
+                                   failed_element=job.element_job,
+                                   element=element)
+            self.message(message)
 
         # Now check for more jobs
         self._sched()
@@ -319,7 +361,11 @@ class Scheduler():
     #
     def _start_job(self, job):
         self._active_jobs.append(job)
-        self._state.add_task(job.action_name, job.name, self.elapsed_time())
+        message = Notification(NotificationType.JOB_START,
+                               full_name=job.name,
+                               job_action=job.action_name,
+                               elapsed_time=self.elapsed_time())
+        self.message(message)
         job.start()
 
     # Callback for the cache size job
@@ -538,13 +584,8 @@ class Scheduler():
         if self.terminated:
             return
 
-        # Leave this to the frontend to decide, if no
-        # interrrupt callback was specified, then just terminate.
-        if self._interrupt_callback:
-            self._interrupt_callback()
-        else:
-            # Default without a frontend is just terminate
-            self.terminate_jobs()
+        message = Notification(NotificationType.INTERRUPT)
+        self.message(message)
 
     # _terminate_event():
     #
@@ -603,7 +644,8 @@ class Scheduler():
 
     # Regular timeout for driving status in the UI
     def _tick(self):
-        self._ticker_callback()
+        message = Notification(NotificationType.TICK)
+        self.message(message)
         self.loop.call_later(1, self._tick)
 
     def __getstate__(self):
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index 5f12889..45bb41b 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -36,7 +36,7 @@ from ._artifactelement import verify_artifact_ref, ArtifactElement
 from ._exceptions import StreamError, ImplError, BstError, ArtifactElementError, ArtifactError
 from ._message import Message, MessageType
 from ._scheduler import Scheduler, SchedStatus, TrackQueue, FetchQueue, \
-    SourcePushQueue, BuildQueue, PullQueue, ArtifactPushQueue
+    SourcePushQueue, BuildQueue, PullQueue, ArtifactPushQueue, NotificationType, JobStatus
 from ._pipeline import Pipeline, PipelineSelection
 from ._profile import Topics, PROFILER
 from ._state import State
@@ -85,12 +85,14 @@ class Stream():
 
         context.messenger.set_state(self._state)
 
-        self._scheduler = Scheduler(context, session_start, self._state,
+        self._scheduler = Scheduler(context, session_start, self._state, self._scheduler_notification_handler,
                                     interrupt_callback=interrupt_callback,
                                     ticker_callback=ticker_callback,
                                     interactive_failure=interactive_failure)
         self._first_non_track_queue = None
         self._session_start_callback = session_start_callback
+        self._ticker_callback = ticker_callback
+        self._interrupt_callback = interrupt_callback
 
     # init()
     #
@@ -1572,6 +1574,25 @@ class Stream():
 
         return element_targets, artifact_refs
 
+    def _scheduler_notification_handler(self, notification):
+        if notification.notification_type == NotificationType.INTERRUPT:
+            self._interrupt_callback()
+        elif notification.notification_type == NotificationType.TICK:
+            self._ticker_callback()
+        elif notification.notification_type == NotificationType.JOB_START:
+            self._state.add_task(notification.job_action, notification.full_name, notification.elapsed_time)
+
+        elif notification.notification_type == NotificationType.JOB_COMPLETE:
+            self._state.remove_task(notification.job_action, notification.full_name)
+            if notification.job_status == JobStatus.FAIL:
+                if notification.failed_element:
+                    unique_id = notification.full_name
+                else:
+                    unique_id = None
+                self._state.fail_task(notification.job_action, notification.full_name, unique_id)
+        else:
+            raise StreamError("Unreccognised notification type recieved")
+
     def __getstate__(self):
         # The only use-cases for pickling in BuildStream at the time of writing
         # are enabling the 'spawn' method of starting child processes, and

Reply via email to