This is an automated email from the ASF dual-hosted git repository. root pushed a commit to branch phil/712 in repository https://gitbox.apache.org/repos/asf/buildstream.git
commit 391bda38013bfee52bb39c57588e66c536056b7d Author: Phil Dawson <[email protected]> AuthorDate: Tue Nov 27 15:20:28 2018 +0000 WIP: _scheduler: introduce second 'high priority' waiting jobs queue This reverts commit b23e5d16aa0eb5e0ba41fddc41bc9d29b1cad8dc. --- buildstream/_scheduler/queues/queue.py | 1 - buildstream/_scheduler/scheduler.py | 38 ++++++++++++++-------------------- 2 files changed, 16 insertions(+), 23 deletions(-) diff --git a/buildstream/_scheduler/queues/queue.py b/buildstream/_scheduler/queues/queue.py index 7df3bb1..909cebb 100644 --- a/buildstream/_scheduler/queues/queue.py +++ b/buildstream/_scheduler/queues/queue.py @@ -58,7 +58,6 @@ class Queue(): action_name = None complete_name = None resources = [] # Resources this queues' jobs want - high_priority = False # If the jobs from this queue should be prioritised by the scheduler def __init__(self, scheduler): diff --git a/buildstream/_scheduler/scheduler.py b/buildstream/_scheduler/scheduler.py index cfaa0a2..b76c730 100644 --- a/buildstream/_scheduler/scheduler.py +++ b/buildstream/_scheduler/scheduler.py @@ -25,7 +25,6 @@ from itertools import chain import signal import datetime from contextlib import contextmanager -from collections import deque # Local imports from .resources import Resources, ResourceType @@ -72,16 +71,16 @@ class Scheduler(): # # Public members # - self.active_jobs = [] # Jobs currently being run in the scheduler - self.waiting_jobs = deque() # Jobs waiting for resources - self.queues = None # Exposed for the frontend to print summaries - self.context = context # The Context object shared with Queues - self.terminated = False # Whether the scheduler was asked to terminate or has terminated - self.suspended = False # Whether the scheduler is currently suspended + self.active_jobs = [] # Jobs currently being run in the scheduler + self.waiting_jobs = [] # Jobs waiting for resources + self.queues = None # Exposed for the frontend to print summaries + self.context = 
context # The Context object shared with Queues + self.terminated = False # Whether the scheduler was asked to terminate or has terminated + self.suspended = False # Whether the scheduler is currently suspended # These are shared with the Job, but should probably be removed or made private in some way. - self.loop = None # Shared for Job access to observe the message queue - self.internal_stops = 0 # Amount of SIGSTP signals we've introduced, this is shared with job.py + self.loop = None # Shared for Job access to observe the message queue + self.internal_stops = 0 # Amount of SIGSTP signals we've introduced, this is shared with job.py # # Private members @@ -216,15 +215,14 @@ class Scheduler(): # # Args: # jobs ([Job]): A list of jobs to schedule - # priority_jobs([Job]): A list of jobs which should be prioritised over those in jobs # # Schedule 'Job's for the scheduler to run. Jobs scheduled will be # run as soon any other queueing jobs finish, provided sufficient - # resources are available for them to run. + # resources are available for them to run # - def schedule_jobs(self, jobs, priority_jobs): - self.waiting_jobs.extend(jobs) - self.waiting_jobs.extendleft(priority_jobs) + def schedule_jobs(self, jobs): + for job in jobs: + self.waiting_jobs.append(job) # job_completed(): # @@ -300,7 +298,6 @@ class Scheduler(): # def _schedule_queue_jobs(self): ready = [] - ready_priority = [] process_queues = True while self._queue_jobs and process_queues: @@ -325,19 +322,16 @@ class Scheduler(): # to fetch tasks for elements which failed to pull, and # thus need all the pulls to complete before ever starting # a build - for queue in reversed(self.queues): - ready_jobs = queue.pop_ready_jobs() - if queue.high_priority: - ready_priority.extend(ready_jobs) - else: - ready.extend(ready_jobs) + ready.extend(chain.from_iterable( + queue.pop_ready_jobs() for queue in reversed(self.queues) + )) # pop_ready_jobs() may have skipped jobs, adding them to # the done_queue. 
Pull these skipped elements forward to # the next queue and process them. process_queues = any(q.dequeue_ready() for q in self.queues) - self.schedule_jobs(ready, ready_priority) + self.schedule_jobs(ready) self._sched() # _run_cleanup()
