sanha commented on a change in pull request #11: [NEMO-42] Make SchedulerRunner reactive, Ensure reverse-topological ordering in scheduling URL: https://github.com/apache/incubator-nemo/pull/11#discussion_r176907052
########## File path: runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SingleJobTaskGroupQueue.java ########## @@ -59,81 +57,85 @@ public SingleJobTaskGroupQueue() { } @Override - public void enqueue(final ScheduledTaskGroup scheduledTaskGroup) { + public synchronized void add(final ScheduledTaskGroup scheduledTaskGroup) { final String stageId = RuntimeIdGenerator.getStageIdFromTaskGroupId(scheduledTaskGroup.getTaskGroupId()); - synchronized (stageIdToPendingTaskGroups) { - stageIdToPendingTaskGroups.compute(stageId, - new BiFunction<String, Deque<ScheduledTaskGroup>, Deque<ScheduledTaskGroup>>() { - @Override - public Deque<ScheduledTaskGroup> apply(final String s, - final Deque<ScheduledTaskGroup> scheduledTaskGroups) { - if (scheduledTaskGroups == null) { - final Deque<ScheduledTaskGroup> pendingTaskGroupsForStage = new ArrayDeque<>(); - pendingTaskGroupsForStage.add(scheduledTaskGroup); - updateSchedulableStages(stageId, scheduledTaskGroup.getContainerType()); - return pendingTaskGroupsForStage; - } else { - scheduledTaskGroups.add(scheduledTaskGroup); - return scheduledTaskGroups; - } - } - }); - } + stageIdToPendingTaskGroups.compute(stageId, (s, taskGroupIdToTaskGroup) -> { + if (taskGroupIdToTaskGroup == null) { + final Map<String, ScheduledTaskGroup> taskGroupIdToTaskGroupMap = new HashMap<>(); + taskGroupIdToTaskGroupMap.put(scheduledTaskGroup.getTaskGroupId(), scheduledTaskGroup); + updateSchedulableStages(stageId, scheduledTaskGroup.getContainerType()); + return taskGroupIdToTaskGroupMap; + } else { + taskGroupIdToTaskGroup.put(scheduledTaskGroup.getTaskGroupId(), scheduledTaskGroup); + return taskGroupIdToTaskGroup; + } + }); } /** - * Dequeues the next TaskGroup to be scheduled according to job dependency priority. - * @return the next TaskGroup to be scheduled + * {@inheritDoc} */ @Override - public Optional<ScheduledTaskGroup> dequeue() { - ScheduledTaskGroup taskGroupToSchedule = null; - final String stageId; - try { - stageId = schedulableStages.takeFirst(); - } catch (InterruptedException e) { - e.printStackTrace(); - throw new SchedulingException(new Throwable("An exception occurred while trying to dequeue the next TaskGroup")); + public synchronized ScheduledTaskGroup remove(final String taskGroupId) throws NoSuchElementException { + final String stageId = schedulableStages.peekFirst(); Review comment: This code assumes the task group to be removed always belongs to the first stage of `schedulableStage`. Please describe this constraint to the method comment in `PendingTaskGroupQueue`. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services