sanha commented on a change in pull request #11: [NEMO-42] Make SchedulerRunner 
reactive, Ensure reverse-topological ordering in scheduling
URL: https://github.com/apache/incubator-nemo/pull/11#discussion_r176907052
 
 

 ##########
 File path: 
runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SingleJobTaskGroupQueue.java
 ##########
 @@ -59,81 +57,85 @@ public SingleJobTaskGroupQueue() {
   }
 
   @Override
-  public void enqueue(final ScheduledTaskGroup scheduledTaskGroup) {
+  public synchronized void add(final ScheduledTaskGroup scheduledTaskGroup) {
     final String stageId = 
RuntimeIdGenerator.getStageIdFromTaskGroupId(scheduledTaskGroup.getTaskGroupId());
 
-    synchronized (stageIdToPendingTaskGroups) {
-      stageIdToPendingTaskGroups.compute(stageId,
-          new BiFunction<String, Deque<ScheduledTaskGroup>, 
Deque<ScheduledTaskGroup>>() {
-            @Override
-            public Deque<ScheduledTaskGroup> apply(final String s,
-                                                   final 
Deque<ScheduledTaskGroup> scheduledTaskGroups) {
-              if (scheduledTaskGroups == null) {
-                final Deque<ScheduledTaskGroup> pendingTaskGroupsForStage = 
new ArrayDeque<>();
-                pendingTaskGroupsForStage.add(scheduledTaskGroup);
-                updateSchedulableStages(stageId, 
scheduledTaskGroup.getContainerType());
-                return pendingTaskGroupsForStage;
-              } else {
-                scheduledTaskGroups.add(scheduledTaskGroup);
-                return scheduledTaskGroups;
-              }
-            }
-          });
-    }
+    stageIdToPendingTaskGroups.compute(stageId, (s, taskGroupIdToTaskGroup) -> 
{
+      if (taskGroupIdToTaskGroup == null) {
+        final Map<String, ScheduledTaskGroup> taskGroupIdToTaskGroupMap = new 
HashMap<>();
+        taskGroupIdToTaskGroupMap.put(scheduledTaskGroup.getTaskGroupId(), 
scheduledTaskGroup);
+        updateSchedulableStages(stageId, 
scheduledTaskGroup.getContainerType());
+        return taskGroupIdToTaskGroupMap;
+      } else {
+        taskGroupIdToTaskGroup.put(scheduledTaskGroup.getTaskGroupId(), 
scheduledTaskGroup);
+        return taskGroupIdToTaskGroup;
+      }
+    });
   }
 
   /**
-   * Dequeues the next TaskGroup to be scheduled according to job dependency 
priority.
-   * @return the next TaskGroup to be scheduled
+   * {@inheritDoc}
    */
   @Override
-  public Optional<ScheduledTaskGroup> dequeue() {
-    ScheduledTaskGroup taskGroupToSchedule = null;
-    final String stageId;
-    try {
-      stageId = schedulableStages.takeFirst();
-    } catch (InterruptedException e) {
-      e.printStackTrace();
-      throw new SchedulingException(new Throwable("An exception occurred while 
trying to dequeue the next TaskGroup"));
+  public synchronized ScheduledTaskGroup remove(final String taskGroupId) 
throws NoSuchElementException {
+    final String stageId = schedulableStages.peekFirst();
 
 Review comment:
   This code assumes the task group to be removed always belongs to the first 
stage of `schedulableStage`.
   Please describe this constraint to the method comment in 
`PendingTaskGroupQueue`. 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to