This is an automated email from the ASF dual-hosted git repository. not-in-ldap pushed a commit to branch tpollard/subrebase in repository https://gitbox.apache.org/repos/asf/buildstream.git
commit f7fce34928720ddf98c8a8c256ae39ae546cb002 Author: Tom Pollard <[email protected]> AuthorDate: Tue Sep 10 15:10:04 2019 +0100 scheduler.py: Notification for last_task_error propagation Add a notification for TASK_ERROR. As queues & job handlers will be running in a different process to the front end, the global state in the frontend Exception process needs to be notified. This is used internally for the BST_TEST_SUITE. --- src/buildstream/_scheduler/jobs/job.py | 4 ++-- src/buildstream/_scheduler/queues/queue.py | 4 ++-- src/buildstream/_scheduler/scheduler.py | 19 ++++++++++++++++++- src/buildstream/_stream.py | 4 +++- 4 files changed, 25 insertions(+), 6 deletions(-) diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py index e7866bc..4cb80b8 100644 --- a/src/buildstream/_scheduler/jobs/job.py +++ b/src/buildstream/_scheduler/jobs/job.py @@ -29,7 +29,7 @@ import sys import traceback # BuildStream toplevel imports -from ..._exceptions import ImplError, BstError, set_last_task_error, SkipJob +from ..._exceptions import ImplError, BstError, SkipJob from ..._message import Message, MessageType, unconditional_messages from ...types import FastEnum from ... import _signals, utils @@ -491,7 +491,7 @@ class Job: # For regression tests only, save the last error domain / reason # reported from a child task in the main process, this global state # is currently managed in _exceptions.py - set_last_task_error(envelope.message["domain"], envelope.message["reason"]) + self._scheduler.set_last_task_error(envelope.message["domain"], envelope.message["reason"]) elif envelope.message_type is _MessageType.RESULT: assert self._result is None self._result = envelope.message diff --git a/src/buildstream/_scheduler/queues/queue.py b/src/buildstream/_scheduler/queues/queue.py index 295161e..71a34a8 100644 --- a/src/buildstream/_scheduler/queues/queue.py +++ b/src/buildstream/_scheduler/queues/queue.py @@ -30,7 +30,7 @@ from ..jobs import ElementJob, JobStatus from ..resources import ResourceType # BuildStream toplevel imports -from ..._exceptions import BstError, ImplError, set_last_task_error +from ..._exceptions import BstError, ImplError from ..._message import Message, MessageType from ...types import FastEnum @@ -316,7 +316,7 @@ class Queue: # # This just allows us stronger testing capability # - set_last_task_error(e.domain, e.reason) + self._scheduler.set_last_task_error(e.domain, e.reason) except Exception: # pylint: disable=broad-except diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py index 80bd9fb..4e034c5 100644 --- a/src/buildstream/_scheduler/scheduler.py +++ b/src/buildstream/_scheduler/scheduler.py @@ -66,6 +66,7 @@ class NotificationType(FastEnum): SUSPENDED = "suspended" RETRY = "retry" MESSAGE = "message" + TASK_ERROR = "task_error" # Notification() @@ -86,7 +87,8 @@ class Notification: job_status=None, time=None, element=None, - message=None + message=None, + task_error=None ): self.notification_type = notification_type self.full_name = full_name @@ -95,6 +97,7 @@ class Notification: self.time = time self.element = element self.message = message + self.task_error = task_error # Tuple of domain & reason # Scheduler() @@ -328,6 +331,20 @@ class Scheduler: def notify_messenger(self, message): self._notify(Notification(NotificationType.MESSAGE, message=message)) + # set_last_task_error() + # + # Save the last error domain / reason reported from a child job or queue + # in the main process. + # + # Args: + # domain (ErrorDomain): Enum for the domain from which the error occurred + # reason (str): String identifier representing the reason for the error + # + def set_last_task_error(self, domain, reason): + task_error = domain, reason + notification = Notification(NotificationType.TASK_ERROR, task_error=task_error) + self._notify(notification) + ####################################################### # Local Private Methods # ####################################################### diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py index 4b84093..5bb7de3 100644 --- a/src/buildstream/_stream.py +++ b/src/buildstream/_stream.py @@ -32,7 +32,7 @@ from collections import deque from typing import List, Tuple from ._artifactelement import verify_artifact_ref, ArtifactElement -from ._exceptions import StreamError, ImplError, BstError, ArtifactElementError, ArtifactError +from ._exceptions import StreamError, ImplError, BstError, ArtifactElementError, ArtifactError, set_last_task_error from ._message import Message, MessageType from ._scheduler import ( Scheduler, @@ -1649,6 +1649,8 @@ class Stream: self._scheduler_terminated = True elif notification.notification_type == NotificationType.SUSPENDED: self._scheduler_suspended = not self._scheduler_suspended + elif notification.notification_type == NotificationType.TASK_ERROR: + set_last_task_error(*notification.task_error) else: raise StreamError("Unrecognised notification type received")
