seojangho closed pull request #15: [NEMO-53] Make SchedulingPolicy Stackable
URL: https://github.com/apache/incubator-nemo/pull/15
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java
 
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java
index d579b4da..aa27c0ef 100644
--- 
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java
+++ 
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java
@@ -1,5 +1,5 @@
 /*
- * Copyright (C) 2017 Seoul National University
+ * Copyright (C) 2018 Seoul National University
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -63,6 +63,7 @@
   private final SchedulingPolicy schedulingPolicy;
   private final SchedulerRunner schedulerRunner;
   private final PendingTaskGroupCollection pendingTaskGroupCollection;
+  private final ExecutorRegistry executorRegistry;
 
   /**
    * Other necessary components of this {@link 
edu.snu.nemo.runtime.master.RuntimeMaster}.
@@ -83,7 +84,8 @@ public BatchSingleJobScheduler(final SchedulingPolicy 
schedulingPolicy,
                                  final PendingTaskGroupCollection 
pendingTaskGroupCollection,
                                  final BlockManagerMaster blockManagerMaster,
                                  final PubSubEventHandlerWrapper 
pubSubEventHandlerWrapper,
-                                 final UpdatePhysicalPlanEventHandler 
updatePhysicalPlanEventHandler) {
+                                 final UpdatePhysicalPlanEventHandler 
updatePhysicalPlanEventHandler,
+                                 final ExecutorRegistry executorRegistry) {
     this.schedulingPolicy = schedulingPolicy;
     this.schedulerRunner = schedulerRunner;
     this.pendingTaskGroupCollection = pendingTaskGroupCollection;
@@ -94,6 +96,7 @@ public BatchSingleJobScheduler(final SchedulingPolicy 
schedulingPolicy,
       pubSubEventHandlerWrapper.getPubSubEventHandler()
           .subscribe(updatePhysicalPlanEventHandler.getEventClass(), 
updatePhysicalPlanEventHandler);
     }
+    this.executorRegistry = executorRegistry;
   }
 
   /**
@@ -166,7 +169,7 @@ public void onTaskGroupStateChanged(final String 
executorId, final String taskGr
 
   @Override
   public void onExecutorAdded(final ExecutorRepresenter executorRepresenter) {
-    schedulingPolicy.onExecutorAdded(executorRepresenter);
+    executorRegistry.registerRepresenter(executorRepresenter);
     schedulerRunner.onAnExecutorAvailable();
   }
 
@@ -178,7 +181,10 @@ public void onExecutorRemoved(final String executorId) {
     taskGroupsToReExecute.addAll(blockManagerMaster.removeWorker(executorId));
 
     // TaskGroups executing on the removed executor
-    
taskGroupsToReExecute.addAll(schedulingPolicy.onExecutorRemoved(executorId));
+    executorRegistry.setRepresenterAsFailed(executorId);
+    final ExecutorRepresenter executor = 
executorRegistry.getFailedExecutorRepresenter(executorId);
+    executor.onExecutorFailed();
+    taskGroupsToReExecute.addAll(executor.getFailedTaskGroups());
 
     taskGroupsToReExecute.forEach(failedTaskGroupId ->
       onTaskGroupStateChanged(executorId, failedTaskGroupId, 
TaskGroupState.State.FAILED_RECOVERABLE,
@@ -430,7 +436,7 @@ private void onTaskGroupExecutionComplete(final String 
executorId,
                                             final Boolean isOnHoldToComplete) {
     LOG.debug("{} completed in {}", new Object[]{taskGroupId, executorId});
     if (!isOnHoldToComplete) {
-      schedulingPolicy.onTaskGroupExecutionComplete(executorId, taskGroupId);
+      
executorRegistry.getRunningExecutorRepresenter(executorId).onTaskGroupExecutionComplete(taskGroupId);
     }
 
     final String stageIdForTaskGroupUponCompletion = 
RuntimeIdGenerator.getStageIdFromTaskGroupId(taskGroupId);
@@ -453,7 +459,7 @@ private void onTaskGroupExecutionOnHold(final String 
executorId,
                                           final String taskGroupId,
                                           final String taskPutOnHold) {
     LOG.info("{} put on hold in {}", new Object[]{taskGroupId, executorId});
-    schedulingPolicy.onTaskGroupExecutionComplete(executorId, taskGroupId);
+    
executorRegistry.getRunningExecutorRepresenter(executorId).onTaskGroupExecutionComplete(taskGroupId);
     final String stageIdForTaskGroupUponCompletion = 
RuntimeIdGenerator.getStageIdFromTaskGroupId(taskGroupId);
 
     final boolean stageComplete =
@@ -493,7 +499,7 @@ private void onTaskGroupExecutionFailedRecoverable(final 
String executorId, fina
                                                      final int attemptIdx, 
final TaskGroupState.State newState,
                                                      final 
TaskGroupState.RecoverableFailureCause failureCause) {
     LOG.info("{} failed in {} by {}", new Object[]{taskGroupId, executorId, 
failureCause});
-    schedulingPolicy.onTaskGroupExecutionFailed(executorId, taskGroupId);
+    
executorRegistry.getExecutorRepresenter(executorId).onTaskGroupExecutionFailed(taskGroupId);
 
     final String stageId = 
RuntimeIdGenerator.getStageIdFromTaskGroupId(taskGroupId);
     final int attemptIndexForStage =
diff --git 
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/CompositeSchedulingPolicy.java
 
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/CompositeSchedulingPolicy.java
new file mode 100644
index 00000000..ac021839
--- /dev/null
+++ 
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/CompositeSchedulingPolicy.java
@@ -0,0 +1,56 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.runtime.master.scheduler;
+
+import edu.snu.nemo.runtime.common.plan.physical.ScheduledTaskGroup;
+import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
+
+import javax.inject.Inject;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Temporary class to implement stacked scheduling policy.
+ * At now, policies are injected through Tang, but have to be configurable by 
users
+ * when Nemo supports job-wide execution property.
+ * TODO #69: Support job-wide execution property.
+ */
+public final class CompositeSchedulingPolicy implements SchedulingPolicy {
+  private final List<SchedulingPolicy> schedulingPolicies;
+
+  @Inject
+  private CompositeSchedulingPolicy(final SourceLocationAwareSchedulingPolicy 
sourceLocationAwareSchedulingPolicy,
+                                    final RoundRobinSchedulingPolicy 
roundRobinSchedulingPolicy,
+                                    final FreeSlotSchedulingPolicy 
freeSlotSchedulingPolicy,
+                                    final ContainerTypeAwareSchedulingPolicy 
containerTypeAwareSchedulingPolicy) {
+    schedulingPolicies = Arrays.asList(
+        freeSlotSchedulingPolicy,
+        containerTypeAwareSchedulingPolicy,
+        sourceLocationAwareSchedulingPolicy,
+        roundRobinSchedulingPolicy);
+  }
+
+  @Override
+  public Set<ExecutorRepresenter> filterExecutorRepresenters(final 
Set<ExecutorRepresenter> executorRepresenterSet,
+                                                             final 
ScheduledTaskGroup scheduledTaskGroup) {
+    Set<ExecutorRepresenter> candidates = executorRepresenterSet;
+    for (final SchedulingPolicy schedulingPolicy : schedulingPolicies) {
+      candidates = schedulingPolicy.filterExecutorRepresenters(candidates, 
scheduledTaskGroup);
+    }
+    return candidates;
+  }
+}
diff --git 
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingPolicy.java
 
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingPolicy.java
new file mode 100644
index 00000000..b88c0b65
--- /dev/null
+++ 
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingPolicy.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.runtime.master.scheduler;
+
+import com.google.common.annotations.VisibleForTesting;
+import 
edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty;
+import edu.snu.nemo.runtime.common.plan.physical.ScheduledTaskGroup;
+import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
+
+import javax.inject.Inject;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * This policy find executors which has corresponding container type.
+ */
+public final class ContainerTypeAwareSchedulingPolicy implements 
SchedulingPolicy {
+
+  @VisibleForTesting
+  @Inject
+  public ContainerTypeAwareSchedulingPolicy() {
+  }
+
+  /**
+   * @param executorRepresenterSet Set of {@link ExecutorRepresenter} to be 
filtered by the container type.
+   *                               If the container type of target TaskGroup 
is NONE, it will return the original set.
+   * @param scheduledTaskGroup {@link ScheduledTaskGroup} to be scheduled.
+   * @return filtered Set of {@link ExecutorRepresenter}.
+   */
+  @Override
+  public Set<ExecutorRepresenter> filterExecutorRepresenters(final 
Set<ExecutorRepresenter> executorRepresenterSet,
+                                                             final 
ScheduledTaskGroup scheduledTaskGroup) {
+
+    if 
(scheduledTaskGroup.getContainerType().equals(ExecutorPlacementProperty.NONE)) {
+      return executorRepresenterSet;
+    }
+
+    final Set<ExecutorRepresenter> candidateExecutors =
+        executorRepresenterSet.stream()
+            .filter(executor -> 
executor.getContainerType().equals(scheduledTaskGroup.getContainerType()))
+            .collect(Collectors.toSet());
+
+    return candidateExecutors;
+  }
+}
diff --git 
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingPolicy.java
 
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingPolicy.java
new file mode 100644
index 00000000..1eb29588
--- /dev/null
+++ 
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingPolicy.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.runtime.master.scheduler;
+
+import com.google.common.annotations.VisibleForTesting;
+import edu.snu.nemo.runtime.common.plan.physical.ScheduledTaskGroup;
+import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
+
+import javax.inject.Inject;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * This policy finds executor that has free slot for a TaskGroup.
+ */
+public final class FreeSlotSchedulingPolicy implements SchedulingPolicy {
+  @VisibleForTesting
+  @Inject
+  public FreeSlotSchedulingPolicy() {
+  }
+
+  /**
+   * @param executorRepresenterSet Set of {@link ExecutorRepresenter} to be 
filtered by the free slot of executors.
+   *                               Executors that do not have any free slots 
will be filtered by this policy.
+   * @param scheduledTaskGroup {@link ScheduledTaskGroup} to be scheduled.
+   * @return filtered Set of {@link ExecutorRepresenter}.
+   */
+  @Override
+  public Set<ExecutorRepresenter> filterExecutorRepresenters(final 
Set<ExecutorRepresenter> executorRepresenterSet,
+                                                             final 
ScheduledTaskGroup scheduledTaskGroup) {
+    final Set<ExecutorRepresenter> candidateExecutors =
+        executorRepresenterSet.stream()
+            .filter(executor -> executor.getRunningTaskGroups().size() < 
executor.getExecutorCapacity())
+            .collect(Collectors.toSet());
+
+    return candidateExecutors;
+  }
+}
diff --git 
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/RoundRobinSchedulingPolicy.java
 
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/RoundRobinSchedulingPolicy.java
index fa5f647b..ae19b664 100644
--- 
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/RoundRobinSchedulingPolicy.java
+++ 
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/RoundRobinSchedulingPolicy.java
@@ -1,5 +1,5 @@
 /*
- * Copyright (C) 2017 Seoul National University
+ * Copyright (C) 2018 Seoul National University
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -16,10 +16,7 @@
 package edu.snu.nemo.runtime.master.scheduler;
 
 import com.google.common.annotations.VisibleForTesting;
-import 
edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty;
 import edu.snu.nemo.runtime.common.plan.physical.ScheduledTaskGroup;
-import edu.snu.nemo.runtime.common.state.TaskGroupState;
-import edu.snu.nemo.runtime.master.JobStateManager;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
 import org.apache.reef.annotations.audience.DriverSide;
 
@@ -28,6 +25,7 @@
 import java.util.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import java.util.stream.Collectors;
 
 /**
@@ -42,162 +40,33 @@
 public final class RoundRobinSchedulingPolicy implements SchedulingPolicy {
   private static final Logger LOG = 
LoggerFactory.getLogger(RoundRobinSchedulingPolicy.class.getName());
 
-  private final ExecutorRegistry executorRegistry;
-
-  /**
-   * The pool of executors available for each container type.
-   */
-  private final Map<String, List<String>> executorIdByContainerType;
-
-  /**
-   * The index of the next executor to be assigned for each container type.
-   * This map allows the executor index computation of the RR scheduling.
-   */
-  private final Map<String, Integer> nextExecutorIndexByContainerType;
-
-  @Inject
   @VisibleForTesting
-  public RoundRobinSchedulingPolicy(final ExecutorRegistry executorRegistry) {
-    this.executorRegistry = executorRegistry;
-    this.executorIdByContainerType = new HashMap<>();
-    this.nextExecutorIndexByContainerType = new HashMap<>();
-    initializeContainerTypeIfAbsent(ExecutorPlacementProperty.NONE); // Need 
this to avoid potential null errors
-  }
-
-  @Override
-  public boolean scheduleTaskGroup(final ScheduledTaskGroup scheduledTaskGroup,
-                                   final JobStateManager jobStateManager) {
-    final String containerType = scheduledTaskGroup.getContainerType();
-    initializeContainerTypeIfAbsent(containerType);
-
-    Optional<String> executorId = selectExecutorByRR(containerType);
-    if (!executorId.isPresent()) { // If there is no available executor to 
schedule this task group now,
-      return false;
-    } else {
-      scheduleTaskGroup(executorId.get(), scheduledTaskGroup, jobStateManager);
-      return true;
-    }
-  }
-
-  @Override
-  public void onExecutorAdded(final ExecutorRepresenter executor) {
-    executorRegistry.registerRepresenter(executor);
-    final String containerType = executor.getContainerType();
-    initializeContainerTypeIfAbsent(containerType);
-
-    executorIdByContainerType.get(containerType)
-        .add(nextExecutorIndexByContainerType.get(containerType), 
executor.getExecutorId());
-  }
-
-  @Override
-  public Set<String> onExecutorRemoved(final String executorId) {
-    executorRegistry.setRepresenterAsFailed(executorId);
-    final ExecutorRepresenter executor = 
executorRegistry.getFailedExecutorRepresenter(executorId);
-    executor.onExecutorFailed();
-
-    final String containerType = executor.getContainerType();
-
-    final List<String> executorIdList = 
executorIdByContainerType.get(containerType);
-    int nextExecutorIndex = 
nextExecutorIndexByContainerType.get(containerType);
-
-    final int executorAssignmentLocation = executorIdList.indexOf(executorId);
-    if (executorAssignmentLocation < nextExecutorIndex) {
-      nextExecutorIndexByContainerType.put(containerType, nextExecutorIndex - 
1);
-    } else if (executorAssignmentLocation == nextExecutorIndex) {
-      nextExecutorIndexByContainerType.put(containerType, 0);
-    }
-    executorIdList.remove(executorId);
-
-    return Collections.unmodifiableSet(executor.getFailedTaskGroups());
-  }
-
-  @Override
-  public void onTaskGroupExecutionComplete(final String executorId, final 
String taskGroupId) {
-    final ExecutorRepresenter executor = 
executorRegistry.getRunningExecutorRepresenter(executorId);
-    executor.onTaskGroupExecutionComplete(taskGroupId);
-    LOG.info("{" + taskGroupId + "} completed in [" + executorId + "]");
-  }
-
-  @Override
-  public void onTaskGroupExecutionFailed(final String executorId, final String 
taskGroupId) {
-    final ExecutorRepresenter executor = 
executorRegistry.getExecutorRepresenter(executorId);
-
-    executor.onTaskGroupExecutionFailed(taskGroupId);
-    LOG.info("{" + taskGroupId + "} failed in [" + executorId + "]");
-  }
-
-  @Override
-  public void terminate() {
-    for (final String executorId : executorRegistry.getRunningExecutorIds()) {
-      final ExecutorRepresenter representer = 
executorRegistry.getRunningExecutorRepresenter(executorId);
-      representer.shutDown();
-      executorRegistry.setRepresenterAsCompleted(executorId);
-    }
+  @Inject
+  public RoundRobinSchedulingPolicy() {
   }
 
   /**
-   * Sticks to the RR policy to select an executor for the next task group.
-   * It checks the task groups running (as compared to each executor's 
capacity).
-   *
-   * @param containerType to select an executor for.
-   * @return (optionally) the selected executor.
+   * @param executorRepresenterSet Set of {@link ExecutorRepresenter} to be 
filtered by round robin behaviour.
+   * @param scheduledTaskGroup {@link ScheduledTaskGroup} to be scheduled.
+   * @return filtered Set of {@link ExecutorRepresenter}.
    */
-  private Optional<String> selectExecutorByRR(final String containerType) {
-    final List<String> candidateExecutorIds = 
(containerType.equals(ExecutorPlacementProperty.NONE))
-        ? getAllContainers() // all containers
-        : executorIdByContainerType.get(containerType); // containers of a 
particular type
-
-    if (candidateExecutorIds != null && !candidateExecutorIds.isEmpty()) {
-      final int numExecutors = candidateExecutorIds.size();
-      int nextExecutorIndex = 
nextExecutorIndexByContainerType.get(containerType);
-      for (int i = 0; i < numExecutors; i++) {
-        final int index = (nextExecutorIndex + i) % numExecutors;
-        final String selectedExecutorId = candidateExecutorIds.get(index);
-
-        final ExecutorRepresenter executor = 
executorRegistry.getRunningExecutorRepresenter(selectedExecutorId);
-        if (hasFreeSlot(executor)) {
-          nextExecutorIndex = (index + 1) % numExecutors;
-          nextExecutorIndexByContainerType.put(containerType, 
nextExecutorIndex);
-          return Optional.of(selectedExecutorId);
-        }
-      }
+  @Override
+  public Set<ExecutorRepresenter> filterExecutorRepresenters(final 
Set<ExecutorRepresenter> executorRepresenterSet,
+                                                             final 
ScheduledTaskGroup scheduledTaskGroup) {
+    final OptionalInt minOccupancy =
+        executorRepresenterSet.stream()
+        .map(executor -> executor.getRunningTaskGroups().size())
+        .mapToInt(i -> i).min();
+
+    if (!minOccupancy.isPresent()) {
+      return Collections.emptySet();
     }
 
-    return Optional.empty();
-  }
-
-  /**
-   * Schedules and sends a TaskGroup to the given executor.
-   *
-   * @param executorId         of the executor to execute the TaskGroup.
-   * @param scheduledTaskGroup to assign.
-   * @param jobStateManager    which the TaskGroup belongs to.
-   */
-  private void scheduleTaskGroup(final String executorId,
-                                 final ScheduledTaskGroup scheduledTaskGroup,
-                                 final JobStateManager jobStateManager) {
-    
jobStateManager.onTaskGroupStateChanged(scheduledTaskGroup.getTaskGroupId(), 
TaskGroupState.State.EXECUTING);
-
-    final ExecutorRepresenter executor = 
executorRegistry.getRunningExecutorRepresenter(executorId);
-    LOG.info("Scheduling {} to {}",
-        new Object[]{scheduledTaskGroup.getTaskGroupId(), executorId});
-    executor.onTaskGroupScheduled(scheduledTaskGroup);
-  }
-
-  private List<String> getAllContainers() {
-    return executorIdByContainerType.values().stream()
-        .flatMap(List::stream) // flatten the list of lists to a flat stream
-        .collect(Collectors.toList()); // convert the stream to a list
-  }
-
-  private boolean hasFreeSlot(final ExecutorRepresenter executor) {
-    LOG.debug("Has Free Slot: " + executor.getExecutorId());
-    LOG.debug("Running TaskGroups: " + executor.getRunningTaskGroups());
-    return executor.getRunningTaskGroups().size() < 
executor.getExecutorCapacity();
-  }
+    final Set<ExecutorRepresenter> candidateExecutors =
+        executorRepresenterSet.stream()
+        .filter(executor -> executor.getRunningTaskGroups().size() == 
minOccupancy.getAsInt())
+        .collect(Collectors.toSet());
 
-  private void initializeContainerTypeIfAbsent(final String containerType) {
-    executorIdByContainerType.putIfAbsent(containerType, new ArrayList<>());
-    nextExecutorIndexByContainerType.putIfAbsent(containerType, 0);
+    return candidateExecutors;
   }
 }
diff --git 
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulerRunner.java
 
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulerRunner.java
index 4e08467d..49ca72ef 100644
--- 
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulerRunner.java
+++ 
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulerRunner.java
@@ -1,5 +1,5 @@
 /*
- * Copyright (C) 2017 Seoul National University
+ * Copyright (C) 2018 Seoul National University
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -15,9 +15,12 @@
  */
 package edu.snu.nemo.runtime.master.scheduler;
 
+import com.google.common.annotations.VisibleForTesting;
 import edu.snu.nemo.runtime.common.plan.physical.ScheduledTaskGroup;
 import edu.snu.nemo.runtime.common.state.JobState;
+import edu.snu.nemo.runtime.common.state.TaskGroupState;
 import edu.snu.nemo.runtime.master.JobStateManager;
+import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
 import org.apache.reef.annotations.audience.DriverSide;
 
 import java.util.*;
@@ -27,6 +30,7 @@
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -42,23 +46,27 @@
 public final class SchedulerRunner {
   private static final Logger LOG = 
LoggerFactory.getLogger(SchedulerRunner.class.getName());
   private final Map<String, JobStateManager> jobStateManagers;
-  private final SchedulingPolicy schedulingPolicy;
   private final PendingTaskGroupCollection pendingTaskGroupCollection;
   private final ExecutorService schedulerThread;
   private boolean initialJobScheduled;
   private boolean isTerminated;
   private final DelayedSignalingCondition 
mustCheckSchedulingAvailabilityOrSchedulerTerminated
       = new DelayedSignalingCondition();
+  private ExecutorRegistry executorRegistry;
+  private SchedulingPolicy schedulingPolicy;
 
+  @VisibleForTesting
   @Inject
   public SchedulerRunner(final SchedulingPolicy schedulingPolicy,
-                         final PendingTaskGroupCollection 
pendingTaskGroupCollection) {
+                         final PendingTaskGroupCollection 
pendingTaskGroupCollection,
+                         final ExecutorRegistry executorRegistry) {
     this.jobStateManagers = new HashMap<>();
     this.pendingTaskGroupCollection = pendingTaskGroupCollection;
-    this.schedulingPolicy = schedulingPolicy;
     this.schedulerThread = Executors.newSingleThreadExecutor(runnable -> new 
Thread(runnable, "SchedulerRunner"));
     this.initialJobScheduled = false;
     this.isTerminated = false;
+    this.executorRegistry = executorRegistry;
+    this.schedulingPolicy = schedulingPolicy;
   }
 
   /**
@@ -92,7 +100,11 @@ void scheduleJob(final JobStateManager jobStateManager) {
   }
 
   void terminate() {
-    schedulingPolicy.terminate();
+    for (final String executorId : executorRegistry.getRunningExecutorIds()) {
+      final ExecutorRepresenter representer = 
executorRegistry.getRunningExecutorRepresenter(executorId);
+      representer.shutDown();
+      executorRegistry.setRepresenterAsCompleted(executorId);
+    }
     isTerminated = true;
     mustCheckSchedulingAvailabilityOrSchedulerTerminated.signal();
   }
@@ -122,12 +134,24 @@ public void run() {
         for (final ScheduledTaskGroup schedulableTaskGroup : 
schedulableTaskGroups) {
           final JobStateManager jobStateManager = 
jobStateManagers.get(schedulableTaskGroup.getJobId());
           LOG.debug("Trying to schedule {}...", 
schedulableTaskGroup.getTaskGroupId());
-          final boolean isScheduled =
-              schedulingPolicy.scheduleTaskGroup(schedulableTaskGroup, 
jobStateManager);
-          if (isScheduled) {
-            LOG.debug("Successfully scheduled {}", 
schedulableTaskGroup.getTaskGroupId());
+
+          final Set<ExecutorRepresenter> runningExecutorRepresenter =
+              executorRegistry.getRunningExecutorIds().stream()
+              .map(executorId -> 
executorRegistry.getExecutorRepresenter(executorId))
+              .collect(Collectors.toSet());
+
+          final Set<ExecutorRepresenter> candidateExecutors =
+              
schedulingPolicy.filterExecutorRepresenters(runningExecutorRepresenter, 
schedulableTaskGroup);
+
+          if (candidateExecutors.size() != 0) {
+            
jobStateManager.onTaskGroupStateChanged(schedulableTaskGroup.getTaskGroupId(),
+                TaskGroupState.State.EXECUTING);
+            final ExecutorRepresenter executor = 
candidateExecutors.stream().findFirst().get();
+            executor.onTaskGroupScheduled(schedulableTaskGroup);
+
             
pendingTaskGroupCollection.remove(schedulableTaskGroup.getTaskGroupId());
             numScheduledTaskGroups++;
+            LOG.debug("Successfully scheduled {}", 
schedulableTaskGroup.getTaskGroupId());
           } else {
             LOG.debug("Failed to schedule {}", 
schedulableTaskGroup.getTaskGroupId());
           }
diff --git 
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingPolicy.java
 
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingPolicy.java
index 061adae9..b0e39ac7 100644
--- 
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingPolicy.java
+++ 
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingPolicy.java
@@ -1,5 +1,5 @@
 /*
- * Copyright (C) 2017 Seoul National University
+ * Copyright (C) 2018 Seoul National University
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -16,7 +16,6 @@
 package edu.snu.nemo.runtime.master.scheduler;
 
 import edu.snu.nemo.runtime.common.plan.physical.ScheduledTaskGroup;
-import edu.snu.nemo.runtime.master.JobStateManager;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
 import org.apache.reef.annotations.audience.DriverSide;
 import org.apache.reef.tang.annotations.DefaultImplementation;
@@ -29,61 +28,9 @@
  */
 @DriverSide
 @ThreadSafe
-@DefaultImplementation(SourceLocationAwareSchedulingPolicy.class)
+@FunctionalInterface
+@DefaultImplementation(CompositeSchedulingPolicy.class)
 public interface SchedulingPolicy {
-
-  /**
-   * Attempts to schedule the given taskGroup to an executor according to this 
policy.
-   * If there is no executor available for the taskGroup, it waits for an 
executor to be assigned before it times out.
-   * (Depending on the executor's resource type)
-   *
-   * @param scheduledTaskGroup to schedule.
-   * @param jobStateManager jobStateManager which the TaskGroup belongs to.
-   * @return true if the task group is successfully scheduled, false otherwise.
-   */
-  boolean scheduleTaskGroup(final ScheduledTaskGroup scheduledTaskGroup, final 
JobStateManager jobStateManager);
-
-  /**
-   * Adds the executorId to the pool of available executors.
-   * Unlocks this policy to schedule a next taskGroup if locked.
-   * (Depending on the executor's resource type)
-   *
-   * @param executorRepresenter for the executor that has been added.
-   */
-  void onExecutorAdded(ExecutorRepresenter executorRepresenter);
-
-  /**
-   * Deletes the executorId from the pool of available executors.
-   * Locks this policy from scheduling if there is no more executor currently 
available for the next taskGroup.
-   * (Depending on the executor's resource type)
-   *
-   * @param executorId for the executor that has been deleted.
-   * @return the ids of the set of task groups that were running on the 
executor.
-   */
-  Set<String> onExecutorRemoved(String executorId);
-
-  /**
-   * Marks the taskGroup's completion in the executor.
-   * Unlocks this policy to schedule a next taskGroup if locked.
-   * (Depending on the executor's resource type)
-   *
-   * @param executorId of the executor where the taskGroup's execution has 
completed.
-   * @param taskGroupId whose execution has completed.
-   */
-  void onTaskGroupExecutionComplete(String executorId, String taskGroupId);
-
-  /**
-   * Marks the taskGroup's failure in the executor.
-   * Unlocks this policy to reschedule this taskGroup if locked.
-   * (Depending on the executor's resource type)
-   *
-   * @param executorId of the executor where the taskGroup's execution has 
failed.
-   * @param taskGroupId whose execution has completed.
-   */
-  void onTaskGroupExecutionFailed(String executorId, String taskGroupId);
-
-  /**
-   * End of scheduling.
-   */
-  void terminate();
+  Set<ExecutorRepresenter> filterExecutorRepresenters(final 
Set<ExecutorRepresenter> executorRepresenterSet,
+                                                      final ScheduledTaskGroup 
scheduledTaskGroup);
 }
diff --git 
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingPolicy.java
 
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingPolicy.java
index 9c692531..c5eee7f8 100644
--- 
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingPolicy.java
+++ 
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingPolicy.java
@@ -15,11 +15,9 @@
  */
 package edu.snu.nemo.runtime.master.scheduler;
 
+import com.google.common.annotations.VisibleForTesting;
 import edu.snu.nemo.common.ir.Readable;
-import 
edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty;
 import edu.snu.nemo.runtime.common.plan.physical.ScheduledTaskGroup;
-import edu.snu.nemo.runtime.common.state.TaskGroupState;
-import edu.snu.nemo.runtime.master.JobStateManager;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
 import org.apache.reef.annotations.audience.DriverSide;
 import org.slf4j.Logger;
@@ -28,9 +26,7 @@
 import javax.annotation.concurrent.ThreadSafe;
 import javax.inject.Inject;
 import java.util.*;
-import java.util.concurrent.ThreadLocalRandom;
 import java.util.stream.Collectors;
-import java.util.stream.Stream;
 
 /**
  * This policy is same as {@link RoundRobinSchedulingPolicy}, however for 
TaskGroups
@@ -42,130 +38,51 @@
 public final class SourceLocationAwareSchedulingPolicy implements 
SchedulingPolicy {
   private static final Logger LOG = 
LoggerFactory.getLogger(SourceLocationAwareSchedulingPolicy.class);
 
-  private final ExecutorRegistry executorRegistry;
-  private final RoundRobinSchedulingPolicy roundRobinSchedulingPolicy;
+  @VisibleForTesting
+  @Inject
+  public SourceLocationAwareSchedulingPolicy() {
+  }
 
   /**
-   * Injectable constructor for {@link SourceLocationAwareSchedulingPolicy}.
-   * @param executorRegistry provides catalog of available executors
-   * @param roundRobinSchedulingPolicy provides fallback for TaskGroups with 
no input source information
+   * @param readables collection of readables
+   * @return Set of source locations from source tasks in {@code taskGroupDAG}
+   * @throws Exception for any exception raised during querying source 
locations for a readable
    */
-  @Inject
-  private SourceLocationAwareSchedulingPolicy(final ExecutorRegistry 
executorRegistry,
-                                              final RoundRobinSchedulingPolicy 
roundRobinSchedulingPolicy) {
-    this.executorRegistry = executorRegistry;
-    this.roundRobinSchedulingPolicy = roundRobinSchedulingPolicy;
+  private static Set<String> getSourceLocations(final Collection<Readable> 
readables) throws Exception {
+    final List<String> sourceLocations = new ArrayList<>();
+    for (final Readable readable : readables) {
+      sourceLocations.addAll(readable.getLocations());
+    }
+    return new HashSet<>(sourceLocations);
   }
 
   /**
-   * Try to schedule a TaskGroup.
-   * If the TaskGroup has one or more source tasks, this method schedules the 
task group to one of the physical nodes,
-   * chosen from union of set of locations where splits of each source task 
resides.
-   * If the TaskGroup has no source tasks, falls back to {@link 
RoundRobinSchedulingPolicy}.
-   * @param scheduledTaskGroup to schedule.
-   * @param jobStateManager jobStateManager which the TaskGroup belongs to.
-   * @return true if the task group is successfully scheduled, false otherwise.
+   * @param executorRepresenterSet Set of {@link ExecutorRepresenter} to be 
filtered by source location.
+   *                               If there is no source locations, will 
return original set.
+   * @param scheduledTaskGroup {@link ScheduledTaskGroup} to be scheduled.
+   * @return filtered Set of {@link ExecutorRepresenter}.
    */
   @Override
-  public boolean scheduleTaskGroup(final ScheduledTaskGroup scheduledTaskGroup,
-                                   final JobStateManager jobStateManager) {
-    Set<String> sourceLocations = Collections.emptySet();
+  public Set<ExecutorRepresenter> filterExecutorRepresenters(final 
Set<ExecutorRepresenter> executorRepresenterSet,
+                                                             final 
ScheduledTaskGroup scheduledTaskGroup) {
+    final Set<String> sourceLocations;
     try {
       sourceLocations = 
getSourceLocations(scheduledTaskGroup.getLogicalTaskIdToReadable().values());
     } catch (final UnsupportedOperationException e) {
-      // do nothing
+      return executorRepresenterSet;
     } catch (final Exception e) {
       throw new RuntimeException(e);
     }
-    if (sourceLocations.size() == 0) {
-      // No source location information found, fall back to the 
RoundRobinSchedulingPolicy
-      return roundRobinSchedulingPolicy.scheduleTaskGroup(scheduledTaskGroup, 
jobStateManager);
-    }
-
-    return scheduleToLocalNode(scheduledTaskGroup, jobStateManager, 
sourceLocations);
-  }
 
-  /**
-   * Try to schedule a TaskGroup with source task.
-   * @param scheduledTaskGroup TaskGroup to schedule
-   * @param jobStateManager {@link JobStateManager}
-   * @param jobStateManager jobStateManager which the TaskGroup belongs to.
-   * @return true if the task group is successfully scheduled, false otherwise.
-   */
-  private boolean scheduleToLocalNode(final ScheduledTaskGroup 
scheduledTaskGroup,
-                                      final JobStateManager jobStateManager,
-                                      final Set<String> sourceLocations) {
-    final List<ExecutorRepresenter> candidateExecutors =
-        
selectExecutorByContainerTypeAndNodeNames(scheduledTaskGroup.getContainerType(),
 sourceLocations);
-    if (candidateExecutors.size() == 0) {
-      return false;
+    if (sourceLocations.size() == 0) {
+      return executorRepresenterSet;
     }
-    final int randomIndex = ThreadLocalRandom.current().nextInt(0, 
candidateExecutors.size());
-    final ExecutorRepresenter selectedExecutor = 
candidateExecutors.get(randomIndex);
 
-    
jobStateManager.onTaskGroupStateChanged(scheduledTaskGroup.getTaskGroupId(), 
TaskGroupState.State.EXECUTING);
-    selectedExecutor.onTaskGroupScheduled(scheduledTaskGroup);
-    LOG.info("Scheduling {} (source location: {}) to {} (node name: {})", 
scheduledTaskGroup.getTaskGroupId(),
-        String.join(", ", sourceLocations), selectedExecutor.getExecutorId(),
-        selectedExecutor.getNodeName());
-    return true;
-  }
-
-  @Override
-  public void onExecutorAdded(final ExecutorRepresenter executorRepresenter) {
-    roundRobinSchedulingPolicy.onExecutorAdded(executorRepresenter);
-  }
-
-  @Override
-  public Set<String> onExecutorRemoved(final String executorId) {
-    return roundRobinSchedulingPolicy.onExecutorRemoved(executorId);
-  }
+    final Set<ExecutorRepresenter> candidateExecutors =
+            executorRepresenterSet.stream()
+            .filter(executor -> 
sourceLocations.contains(executor.getNodeName()))
+            .collect(Collectors.toSet());
 
-  @Override
-  public void onTaskGroupExecutionComplete(final String executorId, final 
String taskGroupId) {
-    roundRobinSchedulingPolicy.onTaskGroupExecutionComplete(executorId, 
taskGroupId);
-  }
-
-  @Override
-  public void onTaskGroupExecutionFailed(final String executorId, final String 
taskGroupId) {
-    roundRobinSchedulingPolicy.onTaskGroupExecutionFailed(executorId, 
taskGroupId);
-  }
-
-  @Override
-  public void terminate() {
-    roundRobinSchedulingPolicy.terminate();
-  }
-
-  /**
-   * @param containerType type of the desired container type
-   * @param nodeNames set of node names
-   * @return list of executors, which resides in one of {@code nodeNames}, has 
container type of {@code containerType},
-   *         and has an empty slot for execution
-   */
-  private List<ExecutorRepresenter> selectExecutorByContainerTypeAndNodeNames(
-    final String containerType, final Set<String> nodeNames) {
-    final Stream<ExecutorRepresenter> localNodesWithSpareCapacity = 
executorRegistry.getRunningExecutorIds().stream()
-        .map(executorId -> 
executorRegistry.getRunningExecutorRepresenter(executorId))
-        .filter(executor -> executor.getRunningTaskGroups().size() < 
executor.getExecutorCapacity())
-        .filter(executor -> nodeNames.contains(executor.getNodeName()));
-    if (containerType.equals(ExecutorPlacementProperty.NONE)) {
-      return localNodesWithSpareCapacity.collect(Collectors.toList());
-    } else {
-      return localNodesWithSpareCapacity.filter(executor -> 
executor.getContainerType().equals(containerType))
-          .collect(Collectors.toList());
-    }
-  }
-
-  /**
-   * @param readables collection of readables
-   * @return Set of source locations from source tasks in {@code taskGroupDAG}
-   * @throws Exception for any exception raised during querying source 
locations for a readable
-   */
-  private static Set<String> getSourceLocations(final Collection<Readable> 
readables) throws Exception {
-    final List<String> sourceLocations = new ArrayList<>();
-    for (final Readable readable : readables) {
-      sourceLocations.addAll(readable.getLocations());
-    }
-    return new HashSet<>(sourceLocations);
+    return candidateExecutors;
   }
 }
diff --git 
a/tests/src/test/java/edu/snu/nemo/tests/runtime/RuntimeTestUtil.java 
b/tests/src/test/java/edu/snu/nemo/tests/runtime/RuntimeTestUtil.java
index f95c452a..49d5d4dc 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/runtime/RuntimeTestUtil.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/runtime/RuntimeTestUtil.java
@@ -107,12 +107,24 @@ public static void 
sendTaskGroupStateEventToScheduler(final Scheduler scheduler,
   public static void mockSchedulerRunner(final PendingTaskGroupCollection 
pendingTaskGroupCollection,
                                          final SchedulingPolicy 
schedulingPolicy,
                                          final JobStateManager jobStateManager,
+                                         final ExecutorRegistry 
executorRegistry,
                                          final boolean isPartialSchedule) {
     while (!pendingTaskGroupCollection.isEmpty()) {
       final ScheduledTaskGroup taskGroupToSchedule = 
pendingTaskGroupCollection.remove(
           
pendingTaskGroupCollection.peekSchedulableTaskGroups().get().iterator().next().getTaskGroupId());
 
-      schedulingPolicy.scheduleTaskGroup(taskGroupToSchedule, jobStateManager);
+      final Set<ExecutorRepresenter> runningExecutorRepresenter =
+          executorRegistry.getRunningExecutorIds().stream()
+              .map(executorId -> 
executorRegistry.getExecutorRepresenter(executorId))
+              .collect(Collectors.toSet());
+      final Set<ExecutorRepresenter> candidateExecutors =
+          
schedulingPolicy.filterExecutorRepresenters(runningExecutorRepresenter, 
taskGroupToSchedule);
+      if (candidateExecutors.size() > 0) {
+        
jobStateManager.onTaskGroupStateChanged(taskGroupToSchedule.getTaskGroupId(),
+            TaskGroupState.State.EXECUTING);
+        final ExecutorRepresenter executor = 
candidateExecutors.stream().findFirst().get();
+        executor.onTaskGroupScheduled(taskGroupToSchedule);
+      }
 
       // Schedule only the first task group.
       if (isPartialSchedule) {
diff --git 
a/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/datatransfer/DataTransferTest.java
 
b/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/datatransfer/DataTransferTest.java
index ff16bf44..7131b86f 100644
--- 
a/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/datatransfer/DataTransferTest.java
+++ 
b/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/datatransfer/DataTransferTest.java
@@ -74,9 +74,11 @@
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
+import java.awt.*;
 import java.io.File;
 import java.io.IOException;
 import java.util.*;
+import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.IntStream;
 
@@ -128,17 +130,17 @@ public void setUp() throws InjectionException {
     injector.bindVolatileInstance(EvaluatorRequestor.class, 
mock(EvaluatorRequestor.class));
     injector.bindVolatileInstance(MessageEnvironment.class, 
messageEnvironment);
     final ContainerManager containerManager = 
injector.getInstance(ContainerManager.class);
+    final ExecutorRegistry executorRegistry = 
injector.getInstance(ExecutorRegistry.class);
 
     final MetricMessageHandler metricMessageHandler = 
mock(MetricMessageHandler.class);
     final PubSubEventHandlerWrapper pubSubEventHandler = 
mock(PubSubEventHandlerWrapper.class);
     final UpdatePhysicalPlanEventHandler updatePhysicalPlanEventHandler = 
mock(UpdatePhysicalPlanEventHandler.class);
-    final SchedulingPolicy schedulingPolicy = new RoundRobinSchedulingPolicy(
-        injector.getInstance(ExecutorRegistry.class));
+    final SchedulingPolicy schedulingPolicy = 
injector.getInstance(CompositeSchedulingPolicy.class);
     final PendingTaskGroupCollection taskGroupQueue = new 
SingleJobTaskGroupCollection();
-    final SchedulerRunner schedulerRunner = new 
SchedulerRunner(schedulingPolicy, taskGroupQueue);
+    final SchedulerRunner schedulerRunner = new 
SchedulerRunner(schedulingPolicy, taskGroupQueue, executorRegistry);
     final Scheduler scheduler =
         new BatchSingleJobScheduler(schedulingPolicy, schedulerRunner, 
taskGroupQueue, master,
-            pubSubEventHandler, updatePhysicalPlanEventHandler);
+            pubSubEventHandler, updatePhysicalPlanEventHandler, 
executorRegistry);
     final AtomicInteger executorCount = new AtomicInteger(0);
 
     // Necessary for wiring up the message environments
diff --git 
a/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/BatchSingleJobSchedulerTest.java
 
b/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/BatchSingleJobSchedulerTest.java
index c87bc818..ab39a968 100644
--- 
a/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/BatchSingleJobSchedulerTest.java
+++ 
b/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/BatchSingleJobSchedulerTest.java
@@ -102,13 +102,13 @@ public void setUp() throws Exception {
     executorRegistry = injector.getInstance(ExecutorRegistry.class);
     metricMessageHandler = mock(MetricMessageHandler.class);
     pendingTaskGroupCollection = new SingleJobTaskGroupCollection();
-    schedulingPolicy = new RoundRobinSchedulingPolicy(executorRegistry);
-    schedulerRunner = new SchedulerRunner(schedulingPolicy, 
pendingTaskGroupCollection);
+    schedulingPolicy = injector.getInstance(CompositeSchedulingPolicy.class);
+    schedulerRunner = new SchedulerRunner(schedulingPolicy, 
pendingTaskGroupCollection, executorRegistry);
     pubSubEventHandler = mock(PubSubEventHandlerWrapper.class);
     updatePhysicalPlanEventHandler = 
mock(UpdatePhysicalPlanEventHandler.class);
     scheduler =
         new BatchSingleJobScheduler(schedulingPolicy, schedulerRunner, 
pendingTaskGroupCollection,
-            blockManagerMaster, pubSubEventHandler, 
updatePhysicalPlanEventHandler);
+            blockManagerMaster, pubSubEventHandler, 
updatePhysicalPlanEventHandler, executorRegistry);
 
     final ActiveContext activeContext = mock(ActiveContext.class);
     Mockito.doThrow(new RuntimeException()).when(activeContext).close();
diff --git 
a/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/ContainerTypeAwareSchedulingPolicyTest.java
 
b/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/ContainerTypeAwareSchedulingPolicyTest.java
new file mode 100644
index 00000000..1f9462a8
--- /dev/null
+++ 
b/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/ContainerTypeAwareSchedulingPolicyTest.java
@@ -0,0 +1,76 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.tests.runtime.master.scheduler;
+
+import 
edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty;
+import edu.snu.nemo.runtime.common.plan.physical.ScheduledTaskGroup;
+import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
+import 
edu.snu.nemo.runtime.master.scheduler.ContainerTypeAwareSchedulingPolicy;
+import edu.snu.nemo.runtime.master.scheduler.SchedulingPolicy;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.*;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.*;
+
+/**
+ * Tests {@link ContainerTypeAwareSchedulingPolicy}.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({ExecutorRepresenter.class, ScheduledTaskGroup.class})
+public final class ContainerTypeAwareSchedulingPolicyTest {
+
+  private static ExecutorRepresenter mockExecutorRepresenter(final String 
containerType) {
+    final ExecutorRepresenter executorRepresenter = 
mock(ExecutorRepresenter.class);
+    when(executorRepresenter.getContainerType()).thenReturn(containerType);
+    return executorRepresenter;
+  }
+
+  @Test
+  public void testContainerTypeAware() {
+    final SchedulingPolicy schedulingPolicy = new 
ContainerTypeAwareSchedulingPolicy();
+    final ExecutorRepresenter a0 = 
mockExecutorRepresenter(ExecutorPlacementProperty.TRANSIENT);
+    final ExecutorRepresenter a1 = 
mockExecutorRepresenter(ExecutorPlacementProperty.RESERVED);
+    final ExecutorRepresenter a2 = 
mockExecutorRepresenter(ExecutorPlacementProperty.NONE);
+
+    final ScheduledTaskGroup scheduledTaskGroup1 = 
mock(ScheduledTaskGroup.class);
+    
when(scheduledTaskGroup1.getContainerType()).thenReturn(ExecutorPlacementProperty.RESERVED);
+
+    final Set<ExecutorRepresenter> executorRepresenterList1 = new 
HashSet<>(Arrays.asList(a0, a1, a2));
+
+    final Set<ExecutorRepresenter> candidateExecutors1 =
+        schedulingPolicy.filterExecutorRepresenters(executorRepresenterList1, 
scheduledTaskGroup1);
+
+    final Set<ExecutorRepresenter> expectedExecutors1 = new 
HashSet<>(Arrays.asList(a1));
+    assertEquals(expectedExecutors1, candidateExecutors1);
+
+    final ScheduledTaskGroup scheduledTaskGroup2 = 
mock(ScheduledTaskGroup.class);
+    
when(scheduledTaskGroup2.getContainerType()).thenReturn(ExecutorPlacementProperty.NONE);
+
+    final Set<ExecutorRepresenter> executorRepresenterList2 = new 
HashSet<>(Arrays.asList(a0, a1, a2));
+
+    final Set<ExecutorRepresenter> candidateExecutors2 =
+        schedulingPolicy.filterExecutorRepresenters(executorRepresenterList2, 
scheduledTaskGroup2);
+
+    final Set<ExecutorRepresenter> expectedExecutors2 = new 
HashSet<>(Arrays.asList(a0, a1, a2));
+    assertEquals(expectedExecutors2, candidateExecutors2);
+  }
+}
+
diff --git 
a/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/FaultToleranceTest.java
 
b/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/FaultToleranceTest.java
index 8bfd993c..f6419247 100644
--- 
a/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/FaultToleranceTest.java
+++ 
b/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/FaultToleranceTest.java
@@ -108,19 +108,20 @@ public void setUp() throws Exception {
 
   private void setUpExecutors(final Collection<ExecutorRepresenter> executors,
                               final boolean useMockSchedulerRunner) throws 
InjectionException {
-    executorRegistry = 
Tang.Factory.getTang().newInjector().getInstance(ExecutorRegistry.class);
+    final Injector injector = Tang.Factory.getTang().newInjector();
+    executorRegistry = injector.getInstance(ExecutorRegistry.class);
 
     pendingTaskGroupCollection = new SingleJobTaskGroupCollection();
-    schedulingPolicy = new RoundRobinSchedulingPolicy(executorRegistry);
+    schedulingPolicy = injector.getInstance(CompositeSchedulingPolicy.class);
 
     if (useMockSchedulerRunner) {
       schedulerRunner = mock(SchedulerRunner.class);
     } else {
-      schedulerRunner = new SchedulerRunner(schedulingPolicy, 
pendingTaskGroupCollection);
+      schedulerRunner = new SchedulerRunner(schedulingPolicy, 
pendingTaskGroupCollection, executorRegistry);
     }
     scheduler =
         new BatchSingleJobScheduler(schedulingPolicy, schedulerRunner, 
pendingTaskGroupCollection,
-            blockManagerMaster, pubSubEventHandler, 
updatePhysicalPlanEventHandler);
+            blockManagerMaster, pubSubEventHandler, 
updatePhysicalPlanEventHandler, executorRegistry);
 
     // Add nodes
     for (final ExecutorRepresenter executor : executors) {
@@ -207,14 +208,17 @@ public void testContainerRemoval() throws Exception {
       if (stage.getScheduleGroupIndex() == 0) {
 
         // There are 3 executors, each of capacity 2, and there are 6 
TaskGroups in ScheduleGroup 0.
-        RuntimeTestUtil.mockSchedulerRunner(pendingTaskGroupCollection, 
schedulingPolicy, jobStateManager, false);
+        RuntimeTestUtil.mockSchedulerRunner(pendingTaskGroupCollection, 
schedulingPolicy, jobStateManager,
+            executorRegistry, false);
         assertTrue(pendingTaskGroupCollection.isEmpty());
         stage.getTaskGroupIds().forEach(taskGroupId ->
           RuntimeTestUtil.sendTaskGroupStateEventToScheduler(scheduler, 
executorRegistry,
               taskGroupId, TaskGroupState.State.COMPLETE, 1));
       } else if (stage.getScheduleGroupIndex() == 1) {
-        // There are 3 executors, each of capacity 2, and there are 2 
TaskGroups in ScheduleGroup 1.
-        RuntimeTestUtil.mockSchedulerRunner(pendingTaskGroupCollection, 
schedulingPolicy, jobStateManager, false);
+        scheduler.onExecutorRemoved("a3");
+        // There are 2 executors, each of capacity 2, and there are 2 
TaskGroups in ScheduleGroup 1.
+        RuntimeTestUtil.mockSchedulerRunner(pendingTaskGroupCollection, 
schedulingPolicy, jobStateManager,
+            executorRegistry, false);
 
         // Due to round robin scheduling, "a2" is assured to have a running 
TaskGroup.
         scheduler.onExecutorRemoved("a2");
@@ -224,30 +228,17 @@ public void testContainerRemoval() throws Exception {
         }
         assertEquals(jobStateManager.getAttemptCountForStage(stage.getId()), 
2);
 
-        RuntimeTestUtil.mockSchedulerRunner(pendingTaskGroupCollection, 
schedulingPolicy, jobStateManager, false);
+        RuntimeTestUtil.mockSchedulerRunner(pendingTaskGroupCollection, 
schedulingPolicy, jobStateManager,
+            executorRegistry, false);
         assertTrue(pendingTaskGroupCollection.isEmpty());
         stage.getTaskGroupIds().forEach(taskGroupId ->
           RuntimeTestUtil.sendTaskGroupStateEventToScheduler(scheduler, 
executorRegistry,
               taskGroupId, TaskGroupState.State.COMPLETE, 1));
       } else {
-        // There are 2 executors, each of capacity 2, and there are 2 
TaskGroups in ScheduleGroup 2.
+        // There are 1 executors, each of capacity 2, and there are 2 
TaskGroups in ScheduleGroup 2.
         // Schedule only the first TaskGroup
-        RuntimeTestUtil.mockSchedulerRunner(pendingTaskGroupCollection, 
schedulingPolicy, jobStateManager, true);
-
-        boolean first = true;
-        for (final String taskGroupId : stage.getTaskGroupIds()) {
-          // When a TaskGroup fails while the siblings are still in the queue,
-          if (first) {
-            // Due to round robin scheduling, "a3" is assured to have a 
running TaskGroup.
-            scheduler.onExecutorRemoved("a3");
-            first = false;
-          } else {
-            // Test that the sibling TaskGroup state remains unchanged.
-            assertEquals(
-                
jobStateManager.getTaskGroupState(taskGroupId).getStateMachine().getCurrentState(),
-                TaskGroupState.State.READY);
-          }
-        }
+        RuntimeTestUtil.mockSchedulerRunner(pendingTaskGroupCollection, 
schedulingPolicy, jobStateManager,
+            executorRegistry, true);
       }
     }
   }
@@ -284,14 +275,16 @@ public void testOutputFailure() throws Exception {
       if (stage.getScheduleGroupIndex() == 0) {
 
         // There are 3 executors, each of capacity 2, and there are 6 
TaskGroups in ScheduleGroup 0.
-        RuntimeTestUtil.mockSchedulerRunner(pendingTaskGroupCollection, 
schedulingPolicy, jobStateManager, false);
+        RuntimeTestUtil.mockSchedulerRunner(pendingTaskGroupCollection, 
schedulingPolicy, jobStateManager,
+            executorRegistry, false);
         assertTrue(pendingTaskGroupCollection.isEmpty());
         stage.getTaskGroupIds().forEach(taskGroupId ->
           RuntimeTestUtil.sendTaskGroupStateEventToScheduler(scheduler, 
executorRegistry,
               taskGroupId, TaskGroupState.State.COMPLETE, 1));
       } else if (stage.getScheduleGroupIndex() == 1) {
         // There are 3 executors, each of capacity 2, and there are 2 
TaskGroups in ScheduleGroup 1.
-        RuntimeTestUtil.mockSchedulerRunner(pendingTaskGroupCollection, 
schedulingPolicy, jobStateManager, false);
+        RuntimeTestUtil.mockSchedulerRunner(pendingTaskGroupCollection, 
schedulingPolicy, jobStateManager,
+            executorRegistry, false);
         assertTrue(pendingTaskGroupCollection.isEmpty());
         stage.getTaskGroupIds().forEach(taskGroupId ->
           RuntimeTestUtil.sendTaskGroupStateEventToScheduler(scheduler, 
executorRegistry,
@@ -344,14 +337,16 @@ public void testInputReadFailure() throws Exception {
       if (stage.getScheduleGroupIndex() == 0) {
 
         // There are 3 executors, each of capacity 2, and there are 6 
TaskGroups in ScheduleGroup 0.
-        RuntimeTestUtil.mockSchedulerRunner(pendingTaskGroupCollection, 
schedulingPolicy, jobStateManager, false);
+        RuntimeTestUtil.mockSchedulerRunner(pendingTaskGroupCollection, 
schedulingPolicy, jobStateManager,
+            executorRegistry, false);
         assertTrue(pendingTaskGroupCollection.isEmpty());
         stage.getTaskGroupIds().forEach(taskGroupId ->
           RuntimeTestUtil.sendTaskGroupStateEventToScheduler(scheduler, 
executorRegistry,
               taskGroupId, TaskGroupState.State.COMPLETE, 1));
       } else if (stage.getScheduleGroupIndex() == 1) {
         // There are 3 executors, each of capacity 2, and there are 2 
TaskGroups in ScheduleGroup 1.
-        RuntimeTestUtil.mockSchedulerRunner(pendingTaskGroupCollection, 
schedulingPolicy, jobStateManager, false);
+        RuntimeTestUtil.mockSchedulerRunner(pendingTaskGroupCollection, 
schedulingPolicy, jobStateManager,
+            executorRegistry, false);
 
         stage.getTaskGroupIds().forEach(taskGroupId ->
           RuntimeTestUtil.sendTaskGroupStateEventToScheduler(scheduler, 
executorRegistry,
diff --git 
a/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/FreeSlotSchedulingPolicyTest.java
 
b/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/FreeSlotSchedulingPolicyTest.java
new file mode 100644
index 00000000..5308ab6a
--- /dev/null
+++ 
b/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/FreeSlotSchedulingPolicyTest.java
@@ -0,0 +1,66 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.tests.runtime.master.scheduler;
+
+import edu.snu.nemo.runtime.common.plan.physical.ScheduledTaskGroup;
+import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
+import edu.snu.nemo.runtime.master.scheduler.FreeSlotSchedulingPolicy;
+import edu.snu.nemo.runtime.master.scheduler.SchedulingPolicy;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.*;
+import java.util.stream.IntStream;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.*;
+
+/**
+ * Tests {@link FreeSlotSchedulingPolicy}.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({ExecutorRepresenter.class, ScheduledTaskGroup.class})
+public final class FreeSlotSchedulingPolicyTest {
+
+  private static ExecutorRepresenter mockExecutorRepresenter(final int 
numRunningTaskGroups,
+                                                             final int 
capacity) {
+    final ExecutorRepresenter executorRepresenter = 
mock(ExecutorRepresenter.class);
+    final Set<String> runningTaskGroups = new HashSet<>();
+    IntStream.range(0, numRunningTaskGroups).forEach(i -> 
runningTaskGroups.add(String.valueOf(i)));
+    
when(executorRepresenter.getRunningTaskGroups()).thenReturn(runningTaskGroups);
+    when(executorRepresenter.getExecutorCapacity()).thenReturn(capacity);
+    return executorRepresenter;
+  }
+
+  @Test
+  public void testFreeSlot() {
+    final SchedulingPolicy schedulingPolicy = new FreeSlotSchedulingPolicy();
+    final ExecutorRepresenter a0 = mockExecutorRepresenter(1, 1);
+    final ExecutorRepresenter a1 = mockExecutorRepresenter(2, 3);
+
+    final ScheduledTaskGroup scheduledTaskGroup = 
mock(ScheduledTaskGroup.class);
+
+    final Set<ExecutorRepresenter> executorRepresenterList = new 
HashSet<>(Arrays.asList(a0, a1));
+
+    final Set<ExecutorRepresenter> candidateExecutors =
+        schedulingPolicy.filterExecutorRepresenters(executorRepresenterList, 
scheduledTaskGroup);
+
+    final Set<ExecutorRepresenter> expectedExecutors = new 
HashSet<>(Arrays.asList(a1));
+    assertEquals(expectedExecutors, candidateExecutors);
+  }
+}
diff --git 
a/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/RoundRobinSchedulingPolicyTest.java
 
b/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/RoundRobinSchedulingPolicyTest.java
index 22068a59..6061fc06 100644
--- 
a/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/RoundRobinSchedulingPolicyTest.java
+++ 
b/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/RoundRobinSchedulingPolicyTest.java
@@ -1,5 +1,5 @@
 /*
- * Copyright (C) 2017 Seoul National University
+ * Copyright (C) 2018 Seoul National University
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -15,130 +15,52 @@
  */
 package edu.snu.nemo.tests.runtime.master.scheduler;
 
-import 
edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty;
-import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
-import edu.snu.nemo.runtime.common.comm.ControlMessage;
-import edu.snu.nemo.runtime.common.message.MessageSender;
 import edu.snu.nemo.runtime.common.plan.physical.ScheduledTaskGroup;
-import edu.snu.nemo.runtime.master.JobStateManager;
-import edu.snu.nemo.runtime.master.scheduler.ExecutorRegistry;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
-import edu.snu.nemo.runtime.master.resource.ResourceSpecification;
 import edu.snu.nemo.runtime.master.scheduler.RoundRobinSchedulingPolicy;
 import edu.snu.nemo.runtime.master.scheduler.SchedulingPolicy;
-import org.apache.reef.driver.context.ActiveContext;
-import org.apache.reef.tang.Tang;
-import org.apache.reef.tang.exceptions.InjectionException;
-import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
-import org.mockito.Mockito;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
 import java.util.*;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.function.Function;
+import java.util.stream.IntStream;
 
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertEquals;
 import static org.mockito.Mockito.*;
 
 /**
  * Tests {@link RoundRobinSchedulingPolicy}
  */
 @RunWith(PowerMockRunner.class)
-@PrepareForTest(JobStateManager.class)
+@PrepareForTest({ExecutorRepresenter.class, ScheduledTaskGroup.class})
 public final class RoundRobinSchedulingPolicyTest {
-  private SchedulingPolicy schedulingPolicy;
-  private ExecutorRegistry executorRegistry;
-  private final MessageSender<ControlMessage.Message> mockMsgSender = 
mock(MessageSender.class);
-  private JobStateManager jobStateManager = mock(JobStateManager.class);
 
-  // This schedule index will make sure that task group events are not ignored
-  private static final int MAGIC_SCHEDULE_ATTEMPT_INDEX = Integer.MAX_VALUE;
-  private static final String RESERVED_EXECUTOR_ID = "RESERVED";
-
-  @Before
-  public void setUp() throws InjectionException {
-    executorRegistry = 
Tang.Factory.getTang().newInjector().getInstance(ExecutorRegistry.class);
-
-    schedulingPolicy = new RoundRobinSchedulingPolicy(executorRegistry);
-
-    final ActiveContext activeContext = mock(ActiveContext.class);
-    Mockito.doThrow(new RuntimeException()).when(activeContext).close();
-
-    final ExecutorService serExecutorService = 
Executors.newSingleThreadExecutor();
-    final ResourceSpecification computeSpec = new 
ResourceSpecification(ExecutorPlacementProperty.COMPUTE, 1, 0);
-    final Function<String, ExecutorRepresenter> 
computeSpecExecutorRepresenterGenerator = executorId ->
-        new ExecutorRepresenter(executorId, computeSpec, mockMsgSender, 
activeContext, serExecutorService, executorId);
-    final ExecutorRepresenter a3 = 
computeSpecExecutorRepresenterGenerator.apply("a3");
-    final ExecutorRepresenter a2 = 
computeSpecExecutorRepresenterGenerator.apply("a2");
-    final ExecutorRepresenter a1 = 
computeSpecExecutorRepresenterGenerator.apply("a1");
-
-    final ResourceSpecification storageSpec = new 
ResourceSpecification(ExecutorPlacementProperty.TRANSIENT, 1, 0);
-    final Function<String, ExecutorRepresenter> 
storageSpecExecutorRepresenterGenerator = executorId ->
-        new ExecutorRepresenter(executorId, storageSpec, mockMsgSender, 
activeContext, serExecutorService, executorId);
-    final ExecutorRepresenter b2 = 
storageSpecExecutorRepresenterGenerator.apply("b2");
-    final ExecutorRepresenter b1 = 
storageSpecExecutorRepresenterGenerator.apply("b1");
-
-    final ResourceSpecification reservedSpec = new 
ResourceSpecification(ExecutorPlacementProperty.RESERVED, 1, 0);
-    final Function<String, ExecutorRepresenter> 
reservedSpecExecutorRepresenterGenerator = executorId ->
-        new ExecutorRepresenter(executorId, reservedSpec, mockMsgSender, 
activeContext, serExecutorService, executorId);
-    final ExecutorRepresenter r = 
reservedSpecExecutorRepresenterGenerator.apply(RESERVED_EXECUTOR_ID);
-
-    // Add compute nodes
-    schedulingPolicy.onExecutorAdded(a3);
-    schedulingPolicy.onExecutorAdded(a2);
-    schedulingPolicy.onExecutorAdded(a1);
-
-    // Add storage nodes
-    schedulingPolicy.onExecutorAdded(b2);
-    schedulingPolicy.onExecutorAdded(b1);
-
-    // Add reserved node
-    schedulingPolicy.onExecutorAdded(r);
+  private static ExecutorRepresenter mockExecutorRepresenter(final int 
numRunningTaskGroups) {
+    final ExecutorRepresenter executorRepresenter = 
mock(ExecutorRepresenter.class);
+    final Set<String> runningTaskGroups = new HashSet<>();
+    IntStream.range(0, numRunningTaskGroups).forEach(i -> 
runningTaskGroups.add(String.valueOf(i)));
+    
when(executorRepresenter.getRunningTaskGroups()).thenReturn(runningTaskGroups);
+    return executorRepresenter;
   }
 
   @Test
-  public void testNoneContainerType() {
-    final int slots = 6;
-    final List<ScheduledTaskGroup> scheduledTaskGroups =
-        convertToScheduledTaskGroups(slots + 1, new byte[0], "Stage A", 
ExecutorPlacementProperty.NONE);
+  public void testRoundRobin() {
+    final SchedulingPolicy schedulingPolicy = new RoundRobinSchedulingPolicy();
+    final ExecutorRepresenter a0 = mockExecutorRepresenter(1);
+    final ExecutorRepresenter a1 = mockExecutorRepresenter(2);
+    final ExecutorRepresenter a2 = mockExecutorRepresenter(2);
 
-    boolean isScheduled;
-    for (int i = 0; i < slots; i++) {
-      isScheduled = 
schedulingPolicy.scheduleTaskGroup(scheduledTaskGroups.get(i), jobStateManager);
-      assertTrue(isScheduled);
-    }
+    final ScheduledTaskGroup scheduledTaskGroup = 
mock(ScheduledTaskGroup.class);
 
-    // No more slot
-    isScheduled = 
schedulingPolicy.scheduleTaskGroup(scheduledTaskGroups.get(slots), 
jobStateManager);
-    assertFalse(isScheduled);
-  }
+    final Set<ExecutorRepresenter> executorRepresenterList = new 
HashSet<>(Arrays.asList(a0, a1, a2));
+
+    final Set<ExecutorRepresenter> candidateExecutors =
+        schedulingPolicy.filterExecutorRepresenters(executorRepresenterList, 
scheduledTaskGroup);
 
-  /**
-   * Wrap a DAG of a task group into {@link ScheduledTaskGroup}s.
-   *
-   * @param parallelism            how many scheduled task group will be 
generated.
-   * @param serializedTaskGroupDag the serialized DAG of the task group.
-   * @param stageId                the ID of the stage.
-   * @param containerType          the type of container to execute the task 
group on.
-   * @return the wrapped scheduled task groups.
-   */
-  private List<ScheduledTaskGroup> convertToScheduledTaskGroups(final int 
parallelism,
-                                                                final byte[] 
serializedTaskGroupDag,
-                                                                final String 
stageId,
-                                                                final String 
containerType) {
-    final List<ScheduledTaskGroup> scheduledTaskGroups = new 
ArrayList<>(parallelism);
-    for (int taskGroupIdx = 0; taskGroupIdx < parallelism; taskGroupIdx++) {
-      final String taskGroupId = 
RuntimeIdGenerator.generateTaskGroupId(taskGroupIdx, stageId);
-      scheduledTaskGroups.add(
-          new ScheduledTaskGroup("TestPlan", serializedTaskGroupDag, 
taskGroupId, Collections.emptyList(),
-              Collections.emptyList(), MAGIC_SCHEDULE_ATTEMPT_INDEX, 
containerType, Collections.emptyMap()));
-    }
-    return scheduledTaskGroups;
+    final Set<ExecutorRepresenter> expectedExecutors = new 
HashSet<>(Arrays.asList(a0));
+    assertEquals(expectedExecutors, candidateExecutors);
   }
 }
 
diff --git 
a/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/SourceLocationAwareSchedulingPolicyTest.java
 
b/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/SourceLocationAwareSchedulingPolicyTest.java
index 143ecb4c..810bb313 100644
--- 
a/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/SourceLocationAwareSchedulingPolicyTest.java
+++ 
b/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/SourceLocationAwareSchedulingPolicyTest.java
@@ -15,21 +15,11 @@
  */
 package edu.snu.nemo.tests.runtime.master.scheduler;
 
-import 
edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty;
 import edu.snu.nemo.runtime.common.plan.physical.ScheduledTaskGroup;
-import edu.snu.nemo.runtime.common.state.TaskGroupState;
 import edu.snu.nemo.common.ir.Readable;
-import edu.snu.nemo.runtime.master.JobStateManager;
-import edu.snu.nemo.runtime.master.scheduler.ExecutorRegistry;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
-import edu.snu.nemo.runtime.master.scheduler.RoundRobinSchedulingPolicy;
 import edu.snu.nemo.runtime.master.scheduler.SchedulingPolicy;
 import 
edu.snu.nemo.runtime.master.scheduler.SourceLocationAwareSchedulingPolicy;
-import org.apache.reef.tang.Injector;
-import org.apache.reef.tang.Tang;
-import org.apache.reef.tang.exceptions.InjectionException;
-import org.junit.After;
-import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.powermock.core.classloader.annotations.PrepareForTest;
@@ -37,91 +27,24 @@
 
 import java.util.*;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.stream.Collectors;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.*;
 
 /**
- * Test cases for
+ * Test cases for {@link SourceLocationAwareSchedulingPolicy}.
  */
 @RunWith(PowerMockRunner.class)
-@PrepareForTest({JobStateManager.class, ExecutorRepresenter.class, 
RoundRobinSchedulingPolicy.class,
-    ScheduledTaskGroup.class, Readable.class})
+@PrepareForTest({ExecutorRepresenter.class, ScheduledTaskGroup.class, 
Readable.class})
 public final class SourceLocationAwareSchedulingPolicyTest {
   private static final String SITE_0 = "SEOUL";
   private static final String SITE_1 = "JINJU";
   private static final String SITE_2 = "BUSAN";
 
-  private SourceLocationAwareSchedulingPolicy sourceLocationAware;
-  private SpiedSchedulingPolicyWrapper<RoundRobinSchedulingPolicy> roundRobin;
-  private MockJobStateManagerWrapper jobStateManager;
-
-  @Before
-  public void setup() {
-    final Injector injector = Tang.Factory.getTang().newInjector();
-    jobStateManager = new MockJobStateManagerWrapper();
-
-    final ExecutorRegistry executorRegistry = new ExecutorRegistry();
-    final RoundRobinSchedulingPolicy roundRobinSchedulingPolicy =
-        new RoundRobinSchedulingPolicy(executorRegistry);
-    roundRobin = new SpiedSchedulingPolicyWrapper(roundRobinSchedulingPolicy, 
jobStateManager.get());
-
-    injector.bindVolatileInstance(RoundRobinSchedulingPolicy.class, 
roundRobin.get());
-    injector.bindVolatileInstance(JobStateManager.class, 
jobStateManager.get());
-    injector.bindVolatileInstance(ExecutorRegistry.class, executorRegistry);
-    try {
-      sourceLocationAware = 
injector.getInstance(SourceLocationAwareSchedulingPolicy.class);
-    } catch (final InjectionException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  @After
-  public void teardown() {
-    // All expectations should be resolved at this time.
-    roundRobin.ensureNoUnresolvedExpectation();
-  }
-
-  /**
-   * {@link SourceLocationAwareSchedulingPolicy} should delegate scheduling 
decision when the
-   * {@link ScheduledTaskGroup} does not have any source tasks.
-   */
-  @Test
-  public void testRoundRobinSchedulerFallback() {
-    // Prepare test scenario
-    final ScheduledTaskGroup tg0 = 
CreateScheduledTaskGroup.withoutReadables(ExecutorPlacementProperty.NONE);
-    final ScheduledTaskGroup tg1 = 
CreateScheduledTaskGroup.withReadablesWithoutSourceLocations(2,
-        ExecutorPlacementProperty.NONE);
-    final ScheduledTaskGroup tg2 = 
CreateScheduledTaskGroup.withReadablesWhichThrowException(5,
-        ExecutorPlacementProperty.NONE);
-    addExecutor(new MockExecutorRepresenterWrapper(SITE_0, 
ExecutorPlacementProperty.NONE, 1));
-    addExecutor(new MockExecutorRepresenterWrapper(SITE_1, 
ExecutorPlacementProperty.NONE, 1));
-
-    // Trying to schedule tg0: expected to fall back to 
RoundRobinSchedulingPolicy
-    roundRobin.expectSchedulingRequest(tg0);
-    // ...and scheduling attempt must success
-    assertTrue(sourceLocationAware.scheduleTaskGroup(tg0, 
jobStateManager.get()));
-    // ...thus the TaskGroup should be running
-    jobStateManager.assertTaskGroupState(tg0.getTaskGroupId(), 
TaskGroupState.State.EXECUTING);
-
-    // Trying to schedule tg1: expected to fall back to 
RoundRobinSchedulingPolicy
-    roundRobin.expectSchedulingRequest(tg1);
-    // ...and scheduling attempt must success
-    assertTrue(sourceLocationAware.scheduleTaskGroup(tg1, 
jobStateManager.get()));
-    // ...thus the TaskGroup should be running
-    jobStateManager.assertTaskGroupState(tg1.getTaskGroupId(), 
TaskGroupState.State.EXECUTING);
-
-    // Trying to schedule tg2: expected to fall back to 
RoundRobinSchedulingPolicy
-    roundRobin.expectSchedulingRequest(tg2);
-    // ...and scheduling attempt must success
-    assertTrue(sourceLocationAware.scheduleTaskGroup(tg2, 
jobStateManager.get()));
-    // ...thus the TaskGroup should be running
-    jobStateManager.assertTaskGroupState(tg2.getTaskGroupId(), 
TaskGroupState.State.EXECUTING);
+  private static ExecutorRepresenter mockExecutorRepresenter(final String 
executorId) {
+    final ExecutorRepresenter executorRepresenter = 
mock(ExecutorRepresenter.class);
+    when(executorRepresenter.getNodeName()).thenReturn(executorId);
+    return executorRepresenter;
   }
 
   /**
@@ -130,83 +53,16 @@ public void testRoundRobinSchedulerFallback() {
    */
   @Test
   public void testSourceLocationAwareSchedulingNotAvailable() {
-    // Prepare test scenario
-    final ScheduledTaskGroup tg = 
CreateScheduledTaskGroup.withReadablesWithSourceLocations(
-        Collections.singletonList(Collections.singletonList(SITE_0)), 
ExecutorPlacementProperty.NONE);
-    final MockExecutorRepresenterWrapper e0 = addExecutor(
-        new MockExecutorRepresenterWrapper(SITE_1, 
ExecutorPlacementProperty.NONE, 1));
-    final MockExecutorRepresenterWrapper e1 = addExecutor(
-        new MockExecutorRepresenterWrapper(SITE_1, 
ExecutorPlacementProperty.NONE, 1));
-
-    // Attempt to schedule tg must fail (fallback to round robin policy is not 
expected)
-    assertFalse(sourceLocationAware.scheduleTaskGroup(tg, 
jobStateManager.get()));
-    // Thus executors should have no running TaskGroups at all
-    e0.assertScheduledTaskGroups(Collections.emptyList());
-    e1.assertScheduledTaskGroups(Collections.emptyList());
-  }
+    final SchedulingPolicy schedulingPolicy = new 
SourceLocationAwareSchedulingPolicy();
 
-  private static final String CONTAINER_TYPE_A = "A";
-
-  /**
-   * {@link SourceLocationAwareSchedulingPolicy} should schedule TG to one of 
the executors with appropriate
-   * location and container type.
-   */
-  @Test
-  public void testSourceLocationAwareSchedulingWithContainerType() {
     // Prepare test scenario
     final ScheduledTaskGroup tg = 
CreateScheduledTaskGroup.withReadablesWithSourceLocations(
-        Collections.singletonList(Collections.singletonList(SITE_0)), 
CONTAINER_TYPE_A);
-    final MockExecutorRepresenterWrapper e0 = addExecutor(
-        new MockExecutorRepresenterWrapper(SITE_0, 
ExecutorPlacementProperty.NONE, 1));
-    final MockExecutorRepresenterWrapper e1 = addExecutor(
-        new MockExecutorRepresenterWrapper(SITE_1, 
ExecutorPlacementProperty.NONE, 1));
-    final MockExecutorRepresenterWrapper e2 = addExecutor(
-        new MockExecutorRepresenterWrapper(SITE_2, 
ExecutorPlacementProperty.NONE, 1));
-    final MockExecutorRepresenterWrapper e3 = addExecutor(
-        new MockExecutorRepresenterWrapper(SITE_0, CONTAINER_TYPE_A, 1));
-    final MockExecutorRepresenterWrapper e4 = addExecutor(
-        new MockExecutorRepresenterWrapper(SITE_1, CONTAINER_TYPE_A, 1));
-    final MockExecutorRepresenterWrapper e5 = addExecutor(
-        new MockExecutorRepresenterWrapper(SITE_2, CONTAINER_TYPE_A, 1));
-
-    // Attempt to schedule tg must success (fallback to round robin is not 
expected)
-    assertTrue(sourceLocationAware.scheduleTaskGroup(tg, 
jobStateManager.get()));
-    // tg must run on e3
-    e0.assertScheduledTaskGroups(Collections.emptyList());
-    e1.assertScheduledTaskGroups(Collections.emptyList());
-    e2.assertScheduledTaskGroups(Collections.emptyList());
-    e3.assertScheduledTaskGroups(Collections.singletonList(tg));
-    e4.assertScheduledTaskGroups(Collections.emptyList());
-    e5.assertScheduledTaskGroups(Collections.emptyList());
-  }
-
-  /**
-   * {@link SourceLocationAwareSchedulingPolicy} should not schedule more TGs 
than executor capacity allows.
-   */
-  @Test
-  public void testSourceLocationAwareSchedulingDoesNotOverSchedule() {
-    // Prepare test scenario
-    final ScheduledTaskGroup tg0 = 
CreateScheduledTaskGroup.withReadablesWithSourceLocations(
-        Collections.singletonList(Collections.singletonList(SITE_0)), 
CONTAINER_TYPE_A);
-    final ScheduledTaskGroup tg1 = 
CreateScheduledTaskGroup.withReadablesWithSourceLocations(
-        Collections.singletonList(Collections.singletonList(SITE_0)), 
CONTAINER_TYPE_A);
-    final ScheduledTaskGroup tg2 = 
CreateScheduledTaskGroup.withReadablesWithSourceLocations(
-        Collections.singletonList(Collections.singletonList(SITE_0)), 
CONTAINER_TYPE_A);
-    final ScheduledTaskGroup tg3 = 
CreateScheduledTaskGroup.withReadablesWithSourceLocations(
-        Collections.singletonList(Collections.singletonList(SITE_0)), 
CONTAINER_TYPE_A);
-    final MockExecutorRepresenterWrapper e = addExecutor(
-        new MockExecutorRepresenterWrapper(SITE_0, CONTAINER_TYPE_A, 3));
-
-    // Attempt to schedule TG must success (fallback to round robin is not 
expected)
-    assertTrue(sourceLocationAware.scheduleTaskGroup(tg0, 
jobStateManager.get()));
-    assertTrue(sourceLocationAware.scheduleTaskGroup(tg1, 
jobStateManager.get()));
-    assertTrue(sourceLocationAware.scheduleTaskGroup(tg2, 
jobStateManager.get()));
+        Collections.singletonList(Collections.singletonList(SITE_0)));
+    final ExecutorRepresenter e0 = mockExecutorRepresenter(SITE_1);
+    final ExecutorRepresenter e1 = mockExecutorRepresenter(SITE_1);
 
-    // This must fail
-    assertFalse(sourceLocationAware.scheduleTaskGroup(tg3, 
jobStateManager.get()));
-
-    // Expected executor status
-    e.assertScheduledTaskGroups(Arrays.asList(tg0, tg1, tg2));
+    assertEquals(Collections.emptySet(),
+        schedulingPolicy.filterExecutorRepresenters(new 
HashSet<>(Arrays.asList(e0, e1)), tg));
   }
 
   /**
@@ -214,34 +70,26 @@ public void 
testSourceLocationAwareSchedulingDoesNotOverSchedule() {
    */
   @Test
   public void testSourceLocationAwareSchedulingWithMultiSource() {
+    final SchedulingPolicy schedulingPolicy = new 
SourceLocationAwareSchedulingPolicy();
     // Prepare test scenario
     final ScheduledTaskGroup tg0 = 
CreateScheduledTaskGroup.withReadablesWithSourceLocations(
-        Collections.singletonList(Collections.singletonList(SITE_1)), 
CONTAINER_TYPE_A);
+        Collections.singletonList(Collections.singletonList(SITE_1)));
     final ScheduledTaskGroup tg1 = 
CreateScheduledTaskGroup.withReadablesWithSourceLocations(
-        Collections.singletonList(Arrays.asList(SITE_0, SITE_1, SITE_2)), 
CONTAINER_TYPE_A);
+        Collections.singletonList(Arrays.asList(SITE_0, SITE_1, SITE_2)));
     final ScheduledTaskGroup tg2 = 
CreateScheduledTaskGroup.withReadablesWithSourceLocations(
         Arrays.asList(Collections.singletonList(SITE_0), 
Collections.singletonList(SITE_1),
-            Arrays.asList(SITE_1, SITE_2)), CONTAINER_TYPE_A);
+            Arrays.asList(SITE_1, SITE_2)));
     final ScheduledTaskGroup tg3 = 
CreateScheduledTaskGroup.withReadablesWithSourceLocations(
         Arrays.asList(Collections.singletonList(SITE_1), 
Collections.singletonList(SITE_0),
-            Arrays.asList(SITE_0, SITE_2)), CONTAINER_TYPE_A);
-    final MockExecutorRepresenterWrapper e = addExecutor(
-        new MockExecutorRepresenterWrapper(SITE_1, CONTAINER_TYPE_A, 4));
-
-    // Attempt to schedule TG must success (fallback to round robin is not 
expected)
-    assertTrue(sourceLocationAware.scheduleTaskGroup(tg0, 
jobStateManager.get()));
-    assertTrue(sourceLocationAware.scheduleTaskGroup(tg1, 
jobStateManager.get()));
-    assertTrue(sourceLocationAware.scheduleTaskGroup(tg2, 
jobStateManager.get()));
-    assertTrue(sourceLocationAware.scheduleTaskGroup(tg3, 
jobStateManager.get()));
+            Arrays.asList(SITE_0, SITE_2)));
 
-    // Expected executor status
-    e.assertScheduledTaskGroups(Arrays.asList(tg0, tg1, tg2, tg3));
+    final ExecutorRepresenter e = mockExecutorRepresenter(SITE_1);
+    for (final ScheduledTaskGroup tg : new HashSet<>(Arrays.asList(tg0, tg1, 
tg2, tg3))) {
+      assertEquals(new HashSet<>(Collections.singletonList(e)), 
schedulingPolicy.filterExecutorRepresenters(
+          new HashSet<>(Collections.singletonList(e)), tg));
+    }
   }
 
-  private MockExecutorRepresenterWrapper addExecutor(final 
MockExecutorRepresenterWrapper executor) {
-    sourceLocationAware.onExecutorAdded(executor.get());
-    return executor;
-  }
 
   /**
    * Utility for creating {@link ScheduledTaskGroup}.
@@ -250,19 +98,17 @@ private MockExecutorRepresenterWrapper addExecutor(final 
MockExecutorRepresenter
     private static final AtomicInteger taskGroupIndex = new AtomicInteger(0);
     private static final AtomicInteger taskIndex = new AtomicInteger(0);
 
-    private static ScheduledTaskGroup doCreate(final Collection<Readable> 
readables, final String containerType) {
+    private static ScheduledTaskGroup doCreate(final Collection<Readable> 
readables) {
       final ScheduledTaskGroup mockInstance = mock(ScheduledTaskGroup.class);
       final Map<String, Readable> readableMap = new HashMap<>();
       readables.forEach(readable -> readableMap.put(String.format("TASK-%d", 
taskIndex.getAndIncrement()),
           readable));
       when(mockInstance.getTaskGroupId()).thenReturn(String.format("TG-%d", 
taskGroupIndex.getAndIncrement()));
       when(mockInstance.getLogicalTaskIdToReadable()).thenReturn(readableMap);
-      when(mockInstance.getContainerType()).thenReturn(containerType);
       return mockInstance;
     }
 
-    static ScheduledTaskGroup withReadablesWithSourceLocations(final 
Collection<List<String>> sourceLocation,
-                                                               final String 
containerType) {
+    static ScheduledTaskGroup withReadablesWithSourceLocations(final 
Collection<List<String>> sourceLocation) {
       try {
         final List<Readable> readables = new ArrayList<>();
         for (final List<String> locations : sourceLocation) {
@@ -270,14 +116,13 @@ static ScheduledTaskGroup 
withReadablesWithSourceLocations(final Collection<List
           when(readable.getLocations()).thenReturn(locations);
           readables.add(readable);
         }
-        return doCreate(readables, containerType);
+        return doCreate(readables);
       } catch (final Exception e) {
         throw new RuntimeException(e);
       }
     }
 
-    static ScheduledTaskGroup withReadablesWithoutSourceLocations(final int 
numReadables,
-                                                                  final String 
containerType) {
+    static ScheduledTaskGroup withReadablesWithoutSourceLocations(final int 
numReadables) {
       try {
         final List<Readable> readables = new ArrayList<>();
         for (int i = 0; i < numReadables; i++) {
@@ -285,14 +130,13 @@ static ScheduledTaskGroup 
withReadablesWithoutSourceLocations(final int numReada
           when(readable.getLocations()).thenReturn(Collections.emptyList());
           readables.add(readable);
         }
-        return doCreate(readables, containerType);
+        return doCreate(readables);
       } catch (final Exception e) {
         throw new RuntimeException(e);
       }
     }
 
-    static ScheduledTaskGroup withReadablesWhichThrowException(final int 
numReadables,
-                                                               final String 
containerType) {
+    static ScheduledTaskGroup withReadablesWhichThrowException(final int 
numReadables) {
       try {
         final List<Readable> readables = new ArrayList<>();
         for (int i = 0; i < numReadables; i++) {
@@ -300,128 +144,14 @@ static ScheduledTaskGroup 
withReadablesWhichThrowException(final int numReadable
           when(readable.getLocations()).thenThrow(new 
UnsupportedOperationException());
           readables.add(readable);
         }
-        return doCreate(readables, containerType);
+        return doCreate(readables);
       } catch (final Exception e) {
         throw new RuntimeException(e);
       }
     }
 
-    static ScheduledTaskGroup withoutReadables(final String containerType) {
-      return doCreate(Collections.emptyList(), containerType);
-    }
-  }
-
-  /**
-   * Wrapper for mock {@link ExecutorRepresenter}.
-   */
-  private static final class MockExecutorRepresenterWrapper {
-    private static final AtomicInteger executorIndex = new AtomicInteger(0);
-
-    private final ExecutorRepresenter mockInstance;
-    private final List<ScheduledTaskGroup> scheduledTaskGroups = new 
ArrayList<>();
-
-    MockExecutorRepresenterWrapper(final String nodeName, final String 
containerType, final int capacity) {
-      mockInstance = mock(ExecutorRepresenter.class);
-      doAnswer(invocationOnMock -> {
-        final ScheduledTaskGroup scheduledTaskGroup = 
invocationOnMock.getArgument(0);
-        scheduledTaskGroups.add(scheduledTaskGroup);
-        return null;
-      
}).when(mockInstance).onTaskGroupScheduled(any(ScheduledTaskGroup.class));
-      doAnswer(invocationOnMock -> {
-        final String taskGroupId = invocationOnMock.getArgument(0);
-        scheduledTaskGroups.removeIf(scheduledTaskGroup -> 
scheduledTaskGroup.getTaskGroupId().equals(taskGroupId));
-        return null;
-      }).when(mockInstance).onTaskGroupExecutionComplete(anyString());
-      
when(mockInstance.getExecutorId()).thenReturn(String.format("EXECUTOR-%d", 
executorIndex.getAndIncrement()));
-      when(mockInstance.getNodeName()).thenReturn(nodeName);
-      when(mockInstance.getContainerType()).thenReturn(containerType);
-      doAnswer(invocationOnMock ->
-          
scheduledTaskGroups.stream().map(ScheduledTaskGroup::getTaskGroupId).collect(Collectors.toSet()))
-          .when(mockInstance).getRunningTaskGroups();
-      when(mockInstance.getExecutorCapacity()).thenReturn(capacity);
-    }
-
-    void assertScheduledTaskGroups(final List<ScheduledTaskGroup> expected) {
-      assertEquals(expected, scheduledTaskGroups);
-    }
-
-    ExecutorRepresenter get() {
-      return mockInstance;
-    }
-  }
-
-  /**
-   * Wrapper for spied {@link SchedulingPolicy}.
-   * @param <T> the class of the spied instance
-   */
-  private static final class SpiedSchedulingPolicyWrapper<T extends 
SchedulingPolicy> {
-    private final T spiedInstance;
-    private ScheduledTaskGroup expectedArgument = null;
-
-    SpiedSchedulingPolicyWrapper(final T schedulingPolicy, final 
JobStateManager jobStateManager) {
-      spiedInstance = spy(schedulingPolicy);
-      doAnswer(invocationOnMock -> {
-        final ScheduledTaskGroup scheduledTaskGroup = 
invocationOnMock.getArgument(0);
-        assertEquals(expectedArgument, scheduledTaskGroup);
-        expectedArgument = null;
-        
jobStateManager.onTaskGroupStateChanged(scheduledTaskGroup.getTaskGroupId(), 
TaskGroupState.State.EXECUTING);
-        return true;
-      }).when(spiedInstance).scheduleTaskGroup(any(ScheduledTaskGroup.class), 
any());
-    }
-
-    /**
-     * Sets expected {@link 
SchedulingPolicy#scheduleTaskGroup(ScheduledTaskGroup, JobStateManager)} 
invocation
-     * on this spied object.
-     * @param scheduledTaskGroup expected parameter for the task group to 
schedule
-     */
-    void expectSchedulingRequest(final ScheduledTaskGroup scheduledTaskGroup) {
-      ensureNoUnresolvedExpectation();
-      this.expectedArgument = scheduledTaskGroup;
-    }
-
-    void ensureNoUnresolvedExpectation() {
-      assertEquals(null, expectedArgument);
-    }
-
-    /**
-     * @return spied instance for {@link SchedulingPolicy}.
-     */
-    T get() {
-      return spiedInstance;
-    }
-  }
-
-  /**
-   * Wrapper for mock {@link JobStateManager} instance.
-   */
-  private static final class MockJobStateManagerWrapper {
-    private final JobStateManager mockInstance;
-    private final Map<String, TaskGroupState.State> taskGroupStates = new 
HashMap<>();
-
-    MockJobStateManagerWrapper() {
-      mockInstance = mock(JobStateManager.class);
-      doAnswer(invocationOnMock -> {
-        final String taskGroupId = invocationOnMock.getArgument(0);
-        final TaskGroupState.State newState = invocationOnMock.getArgument(1);
-        taskGroupStates.put(taskGroupId, newState);
-        return null;
-      }).when(mockInstance).onTaskGroupStateChanged(anyString(), 
any(TaskGroupState.State.class));
-    }
-
-    /**
-     * Ensures the TaskGroup state has been changed as expected.
-     * @param taskGroupId id of the TaskGroup
-     * @param state the expected state
-     */
-    void assertTaskGroupState(final String taskGroupId, final 
TaskGroupState.State state) {
-      assertEquals(state, taskGroupStates.get(taskGroupId));
-    }
-
-    /**
-     * @return mock instance for {@link JobStateManager}.
-     */
-    JobStateManager get() {
-      return mockInstance;
+    static ScheduledTaskGroup withoutReadables() {
+      return doCreate(Collections.emptyList());
     }
   }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to