johnyangk closed pull request #16: [NEMO-66] Move /tests/scheduler into 
/runtime/master/scheduler/test
URL: https://github.com/apache/incubator-nemo/pull/16
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pom.xml b/pom.xml
index ba0a0991..18b0c20e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -63,6 +63,7 @@ limitations under the License.
         <module>runtime/executor</module>
         <module>runtime/master</module>
         <module>runtime/driver</module>
+        <module>runtime/testplans</module>
         <module>tests</module>
     </modules>
 
diff --git a/runtime/master/pom.xml b/runtime/master/pom.xml
index a797767e..dc404118 100644
--- a/runtime/master/pom.xml
+++ b/runtime/master/pom.xml
@@ -46,6 +46,11 @@ limitations under the License.
             <artifactId>nemo-runtime-common</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>edu.snu.nemo</groupId>
+            <artifactId>nemo-runtime-testplans</artifactId>
+            <version>${project.version}</version>
+        </dependency>
         <dependency>
             <groupId>com.fasterxml.jackson.core</groupId>
             <artifactId>jackson-core</artifactId>
diff --git 
a/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/BatchSingleJobSchedulerTest.java
 
b/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/BatchSingleJobSchedulerTest.java
similarity index 66%
rename from 
tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/BatchSingleJobSchedulerTest.java
rename to 
runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/BatchSingleJobSchedulerTest.java
index c87bc818..ae2cb068 100644
--- 
a/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/BatchSingleJobSchedulerTest.java
+++ 
b/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/BatchSingleJobSchedulerTest.java
@@ -13,24 +13,11 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.snu.nemo.tests.runtime.master.scheduler;
+package edu.snu.nemo.runtime.master.scheduler;
 
-import edu.snu.nemo.common.coder.Coder;
 import edu.snu.nemo.common.eventhandler.PubSubEventHandlerWrapper;
-import edu.snu.nemo.common.ir.edge.IREdge;
-import 
edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty;
-import edu.snu.nemo.common.ir.vertex.IRVertex;
-import edu.snu.nemo.common.ir.vertex.OperatorVertex;
 import 
edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty;
-import edu.snu.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
-import edu.snu.nemo.compiler.frontend.beam.transform.DoTransform;
-import edu.snu.nemo.common.ir.vertex.transform.Transform;
-import edu.snu.nemo.common.ir.executionproperty.ExecutionProperty;
-import edu.snu.nemo.compiler.optimizer.CompiletimeOptimizer;
-import edu.snu.nemo.compiler.optimizer.examples.EmptyComponents;
 import edu.snu.nemo.conf.JobConf;
-import edu.snu.nemo.runtime.master.scheduler.ExecutorRegistry;
-import edu.snu.nemo.tests.runtime.RuntimeTestUtil;
 import edu.snu.nemo.runtime.common.comm.ControlMessage;
 import edu.snu.nemo.runtime.common.message.MessageSender;
 import edu.snu.nemo.runtime.common.plan.physical.*;
@@ -39,13 +26,11 @@
 import edu.snu.nemo.runtime.master.MetricMessageHandler;
 import edu.snu.nemo.runtime.master.BlockManagerMaster;
 import edu.snu.nemo.runtime.master.eventhandler.UpdatePhysicalPlanEventHandler;
+import edu.snu.nemo.runtime.master.physicalplans.TestPlanGenerator;
 import edu.snu.nemo.runtime.master.resource.ContainerManager;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
 import edu.snu.nemo.runtime.master.resource.ResourceSpecification;
 import edu.snu.nemo.common.dag.DAG;
-import edu.snu.nemo.common.dag.DAGBuilder;
-import edu.snu.nemo.runtime.master.scheduler.*;
-import edu.snu.nemo.tests.compiler.optimizer.policy.TestPolicy;
 import org.apache.reef.driver.context.ActiveContext;
 import org.apache.reef.tang.Injector;
 import org.apache.reef.tang.Tang;
@@ -75,7 +60,6 @@
     PubSubEventHandlerWrapper.class, UpdatePhysicalPlanEventHandler.class, 
MetricMessageHandler.class})
 public final class BatchSingleJobSchedulerTest {
   private static final Logger LOG = 
LoggerFactory.getLogger(BatchSingleJobSchedulerTest.class.getName());
-  private DAGBuilder<IRVertex, IREdge> irDAGBuilder;
   private Scheduler scheduler;
   private SchedulingPolicy schedulingPolicy;
   private SchedulerRunner schedulerRunner;
@@ -86,7 +70,6 @@
   private UpdatePhysicalPlanEventHandler updatePhysicalPlanEventHandler;
   private BlockManagerMaster blockManagerMaster = 
mock(BlockManagerMaster.class);
   private final MessageSender<ControlMessage.Message> mockMsgSender = 
mock(MessageSender.class);
-  private PhysicalPlanGenerator physicalPlanGenerator;
 
   private static final int EXECUTOR_CAPACITY = 20;
 
@@ -98,7 +81,6 @@ public void setUp() throws Exception {
     final Injector injector = Tang.Factory.getTang().newInjector();
     injector.bindVolatileParameter(JobConf.DAGDirectory.class, "");
 
-    irDAGBuilder = initializeDAGBuilder();
     executorRegistry = injector.getInstance(ExecutorRegistry.class);
     metricMessageHandler = mock(MetricMessageHandler.class);
     pendingTaskGroupCollection = new SingleJobTaskGroupCollection();
@@ -139,8 +121,6 @@ public void setUp() throws Exception {
     // Add storage nodes
     scheduler.onExecutorAdded(b1);
     scheduler.onExecutorAdded(b2);
-
-    physicalPlanGenerator = injector.getInstance(PhysicalPlanGenerator.class);
   }
 
   /**
@@ -149,9 +129,7 @@ public void setUp() throws Exception {
    */
   @Test(timeout=10000)
   public void testPull() throws Exception {
-    final DAG<IRVertex, IREdge> pullIRDAG = 
CompiletimeOptimizer.optimize(irDAGBuilder.buildWithoutSourceSinkCheck(),
-        new TestPolicy(), "");
-    scheduleAndCheckJobTermination(pullIRDAG);
+    scheduleAndCheckJobTermination(TestPlanGenerator.getSimplePullPlan());
   }
 
   /**
@@ -160,68 +138,19 @@ public void testPull() throws Exception {
    */
   @Test(timeout=10000)
   public void testPush() throws Exception {
-    final DAG<IRVertex, IREdge> pushIRDAG = 
CompiletimeOptimizer.optimize(irDAGBuilder.buildWithoutSourceSinkCheck(),
-        new TestPolicy(true), "");
-    scheduleAndCheckJobTermination(pushIRDAG);
-  }
-
-  private DAGBuilder<IRVertex, IREdge> initializeDAGBuilder() {
-    final DAGBuilder<IRVertex, IREdge> dagBuilder = new DAGBuilder<>();
-
-    final Transform t = new EmptyComponents.EmptyTransform("empty");
-    final IRVertex v1 = new OperatorVertex(t);
-    v1.setProperty(ParallelismProperty.of(1));
-    
v1.setProperty(ExecutorPlacementProperty.of(ExecutorPlacementProperty.COMPUTE));
-    dagBuilder.addVertex(v1);
-
-    final IRVertex v2 = new OperatorVertex(t);
-    v2.setProperty(ParallelismProperty.of(2));
-    
v2.setProperty(ExecutorPlacementProperty.of(ExecutorPlacementProperty.COMPUTE));
-    dagBuilder.addVertex(v2);
-
-    final IRVertex v3 = new OperatorVertex(t);
-    v3.setProperty(ParallelismProperty.of(3));
-    
v3.setProperty(ExecutorPlacementProperty.of(ExecutorPlacementProperty.COMPUTE));
-    dagBuilder.addVertex(v3);
-
-    final IRVertex v4 = new OperatorVertex(t);
-    v4.setProperty(ParallelismProperty.of(4));
-    
v4.setProperty(ExecutorPlacementProperty.of(ExecutorPlacementProperty.COMPUTE));
-    dagBuilder.addVertex(v4);
-
-    final IRVertex v5 = new OperatorVertex(new DoTransform(null, null));
-    v5.setProperty(ParallelismProperty.of(5));
-    
v5.setProperty(ExecutorPlacementProperty.of(ExecutorPlacementProperty.TRANSIENT));
-    dagBuilder.addVertex(v5);
-
-    final IREdge e1 = new 
IREdge(DataCommunicationPatternProperty.Value.Shuffle, v1, v2, 
Coder.DUMMY_CODER);
-    dagBuilder.connectVertices(e1);
-
-    final IREdge e2 = new 
IREdge(DataCommunicationPatternProperty.Value.Shuffle, v3, v2, 
Coder.DUMMY_CODER);
-    dagBuilder.connectVertices(e2);
-
-    final IREdge e4 = new 
IREdge(DataCommunicationPatternProperty.Value.Shuffle, v2, v4, 
Coder.DUMMY_CODER);
-    dagBuilder.connectVertices(e4);
-
-    final IREdge e5 = new 
IREdge(DataCommunicationPatternProperty.Value.Shuffle, v2, v5, 
Coder.DUMMY_CODER);
-    dagBuilder.connectVertices(e5);
-
-    return dagBuilder;
+    scheduleAndCheckJobTermination(TestPlanGenerator.getSimplePushPlan());
   }
 
-  private void scheduleAndCheckJobTermination(final DAG<IRVertex, IREdge> 
irDAG) throws InjectionException {
-    final DAG<PhysicalStage, PhysicalStageEdge> physicalDAG = 
irDAG.convert(physicalPlanGenerator);
-
-    final PhysicalPlan plan = new PhysicalPlan("TestPlan", physicalDAG, 
physicalPlanGenerator.getTaskIRVertexMap());
+  private void scheduleAndCheckJobTermination(final PhysicalPlan plan) throws 
InjectionException {
     final JobStateManager jobStateManager = new JobStateManager(plan, 
blockManagerMaster, metricMessageHandler, 1);
     scheduler.scheduleJob(plan, jobStateManager);
 
     // For each ScheduleGroup, test:
     // a) all stages in the ScheduleGroup enters the executing state
     // b) the stages of the next ScheduleGroup are scheduled after the stages 
of each ScheduleGroup are made "complete".
-    for (int i = 0; i < getNumScheduleGroups(irDAG); i++) {
+    for (int i = 0; i < getNumScheduleGroups(plan.getStageDAG()); i++) {
       final int scheduleGroupIdx = i;
-      final List<PhysicalStage> stages = 
filterStagesWithAScheduleGroupIndex(physicalDAG, scheduleGroupIdx);
+      final List<PhysicalStage> stages = 
filterStagesWithAScheduleGroupIndex(plan.getStageDAG(), scheduleGroupIdx);
 
       LOG.debug("Checking that all stages of ScheduleGroup {} enter the 
executing state", scheduleGroupIdx);
       stages.forEach(physicalStage -> {
@@ -232,7 +161,7 @@ private void scheduleAndCheckJobTermination(final 
DAG<IRVertex, IREdge> irDAG) t
       });
 
       stages.forEach(physicalStage -> {
-        RuntimeTestUtil.completeStage(
+        SchedulerTestUtil.completeStage(
             jobStateManager, scheduler, executorRegistry, physicalStage, 
MAGIC_SCHEDULE_ATTEMPT_INDEX);
       });
     }
@@ -258,10 +187,9 @@ private void scheduleAndCheckJobTermination(final 
DAG<IRVertex, IREdge> irDAG) t
     return sortedStages;
   }
 
-  private int getNumScheduleGroups(final DAG<IRVertex, IREdge> irDAG) {
+  private int getNumScheduleGroups(final DAG<PhysicalStage, PhysicalStageEdge> 
physicalDAG) {
     final Set<Integer> scheduleGroupSet = new HashSet<>();
-    irDAG.getVertices().forEach(irVertex ->
-        scheduleGroupSet.add((Integer) 
irVertex.getProperty(ExecutionProperty.Key.ScheduleGroupIndex)));
+    physicalDAG.getVertices().forEach(stage -> 
scheduleGroupSet.add(stage.getScheduleGroupIndex()));
     return scheduleGroupSet.size();
   }
 }
diff --git 
a/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/FaultToleranceTest.java
 
b/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/FaultToleranceTest.java
similarity index 75%
rename from 
tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/FaultToleranceTest.java
rename to 
runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/FaultToleranceTest.java
index 8bfd993c..28a82dcd 100644
--- 
a/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/FaultToleranceTest.java
+++ 
b/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/FaultToleranceTest.java
@@ -13,24 +13,13 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.snu.nemo.tests.runtime.master.scheduler;
+package edu.snu.nemo.runtime.master.scheduler;
 
-import edu.snu.nemo.common.coder.Coder;
-import edu.snu.nemo.common.dag.DAG;
 import edu.snu.nemo.common.dag.DAGBuilder;
 import edu.snu.nemo.common.eventhandler.PubSubEventHandlerWrapper;
 import edu.snu.nemo.common.ir.edge.IREdge;
-import 
edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
-import edu.snu.nemo.common.ir.vertex.OperatorVertex;
 import 
edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty;
-import edu.snu.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
-import edu.snu.nemo.common.ir.vertex.transform.Transform;
-import edu.snu.nemo.compiler.optimizer.CompiletimeOptimizer;
-import edu.snu.nemo.compiler.optimizer.examples.EmptyComponents;
-import edu.snu.nemo.conf.JobConf;
-import edu.snu.nemo.runtime.master.scheduler.ExecutorRegistry;
-import edu.snu.nemo.tests.runtime.RuntimeTestUtil;
 import edu.snu.nemo.runtime.common.comm.ControlMessage;
 import edu.snu.nemo.runtime.common.message.MessageSender;
 import edu.snu.nemo.runtime.common.plan.physical.*;
@@ -39,12 +28,10 @@
 import edu.snu.nemo.runtime.master.MetricMessageHandler;
 import edu.snu.nemo.runtime.master.BlockManagerMaster;
 import edu.snu.nemo.runtime.master.eventhandler.UpdatePhysicalPlanEventHandler;
+import edu.snu.nemo.runtime.master.physicalplans.TestPlanGenerator;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
 import edu.snu.nemo.runtime.master.resource.ResourceSpecification;
-import edu.snu.nemo.runtime.master.scheduler.*;
-import edu.snu.nemo.tests.compiler.optimizer.policy.TestPolicy;
 import org.apache.reef.driver.context.ActiveContext;
-import org.apache.reef.tang.Injector;
 import org.apache.reef.tang.Tang;
 import org.apache.reef.tang.exceptions.InjectionException;
 import org.junit.Before;
@@ -89,7 +76,6 @@
   private BlockManagerMaster blockManagerMaster = 
mock(BlockManagerMaster.class);
   private final MessageSender<ControlMessage.Message> mockMsgSender = 
mock(MessageSender.class);
   private final ExecutorService serExecutorService = 
Executors.newSingleThreadExecutor();
-  private PhysicalPlanGenerator physicalPlanGenerator;
 
   private static final int MAX_SCHEDULE_ATTEMPT = 3;
 
@@ -101,9 +87,6 @@ public void setUp() throws Exception {
     pubSubEventHandler = mock(PubSubEventHandlerWrapper.class);
     updatePhysicalPlanEventHandler = 
mock(UpdatePhysicalPlanEventHandler.class);
 
-    final Injector injector = Tang.Factory.getTang().newInjector();
-    injector.bindVolatileParameter(JobConf.DAGDirectory.class, "");
-    physicalPlanGenerator = injector.getInstance(PhysicalPlanGenerator.class);
   }
 
   private void setUpExecutors(final Collection<ExecutorRepresenter> executors,
@@ -128,53 +111,6 @@ private void setUpExecutors(final 
Collection<ExecutorRepresenter> executors,
     }
   }
 
-  private PhysicalPlan buildPlan() throws Exception {
-    // Build DAG
-    final Transform t = new EmptyComponents.EmptyTransform("empty");
-    final IRVertex v1 = new OperatorVertex(t);
-    v1.setProperty(ParallelismProperty.of(3));
-    
v1.setProperty(ExecutorPlacementProperty.of(ExecutorPlacementProperty.COMPUTE));
-    irDAGBuilder.addVertex(v1);
-
-    final IRVertex v2 = new OperatorVertex(t);
-    v2.setProperty(ParallelismProperty.of(2));
-    
v2.setProperty(ExecutorPlacementProperty.of(ExecutorPlacementProperty.COMPUTE));
-    irDAGBuilder.addVertex(v2);
-
-    final IRVertex v3 = new OperatorVertex(t);
-    v3.setProperty(ParallelismProperty.of(3));
-    
v3.setProperty(ExecutorPlacementProperty.of(ExecutorPlacementProperty.COMPUTE));
-    irDAGBuilder.addVertex(v3);
-
-    final IRVertex v4 = new OperatorVertex(t);
-    v4.setProperty(ParallelismProperty.of(2));
-    
v4.setProperty(ExecutorPlacementProperty.of(ExecutorPlacementProperty.COMPUTE));
-    irDAGBuilder.addVertex(v4);
-
-    final IRVertex v5 = new OperatorVertex(t);
-    v5.setProperty(ParallelismProperty.of(2));
-    
v5.setProperty(ExecutorPlacementProperty.of(ExecutorPlacementProperty.COMPUTE));
-    irDAGBuilder.addVertex(v5);
-
-    final IREdge e1 = new 
IREdge(DataCommunicationPatternProperty.Value.Shuffle, v1, v2, 
Coder.DUMMY_CODER);
-    irDAGBuilder.connectVertices(e1);
-
-    final IREdge e2 = new 
IREdge(DataCommunicationPatternProperty.Value.Shuffle, v3, v2, 
Coder.DUMMY_CODER);
-    irDAGBuilder.connectVertices(e2);
-
-    final IREdge e3 = new 
IREdge(DataCommunicationPatternProperty.Value.Shuffle, v2, v4, 
Coder.DUMMY_CODER);
-    irDAGBuilder.connectVertices(e3);
-
-    final IREdge e4 = new 
IREdge(DataCommunicationPatternProperty.Value.OneToOne, v4, v5, 
Coder.DUMMY_CODER);
-    irDAGBuilder.connectVertices(e4);
-
-    final DAG<IRVertex, IREdge> irDAG =
-        
CompiletimeOptimizer.optimize(irDAGBuilder.buildWithoutSourceSinkCheck(), new 
TestPolicy(), "");
-    final DAG<PhysicalStage, PhysicalStageEdge> physicalDAG = 
irDAG.convert(physicalPlanGenerator);
-
-    return new PhysicalPlan("TestPlan", physicalDAG, 
physicalPlanGenerator.getTaskIRVertexMap());
-  }
-
   /**
    * Tests fault tolerance after a container removal.
    */
@@ -196,7 +132,7 @@ public void testContainerRemoval() throws Exception {
     executors.add(a3);
 
     setUpExecutors(executors, true);
-    final PhysicalPlan plan = buildPlan();
+    final PhysicalPlan plan = TestPlanGenerator.getSimplePullPlan();
     final JobStateManager jobStateManager =
         new JobStateManager(plan, blockManagerMaster, metricMessageHandler, 
MAX_SCHEDULE_ATTEMPT);
     scheduler.scheduleJob(plan, jobStateManager);
@@ -207,14 +143,14 @@ public void testContainerRemoval() throws Exception {
       if (stage.getScheduleGroupIndex() == 0) {
 
         // There are 3 executors, each of capacity 2, and there are 6 
TaskGroups in ScheduleGroup 0.
-        RuntimeTestUtil.mockSchedulerRunner(pendingTaskGroupCollection, 
schedulingPolicy, jobStateManager, false);
+        SchedulerTestUtil.mockSchedulerRunner(pendingTaskGroupCollection, 
schedulingPolicy, jobStateManager, false);
         assertTrue(pendingTaskGroupCollection.isEmpty());
         stage.getTaskGroupIds().forEach(taskGroupId ->
-          RuntimeTestUtil.sendTaskGroupStateEventToScheduler(scheduler, 
executorRegistry,
+          SchedulerTestUtil.sendTaskGroupStateEventToScheduler(scheduler, 
executorRegistry,
               taskGroupId, TaskGroupState.State.COMPLETE, 1));
       } else if (stage.getScheduleGroupIndex() == 1) {
         // There are 3 executors, each of capacity 2, and there are 2 
TaskGroups in ScheduleGroup 1.
-        RuntimeTestUtil.mockSchedulerRunner(pendingTaskGroupCollection, 
schedulingPolicy, jobStateManager, false);
+        SchedulerTestUtil.mockSchedulerRunner(pendingTaskGroupCollection, 
schedulingPolicy, jobStateManager, false);
 
         // Due to round robin scheduling, "a2" is assured to have a running 
TaskGroup.
         scheduler.onExecutorRemoved("a2");
@@ -224,15 +160,15 @@ public void testContainerRemoval() throws Exception {
         }
         assertEquals(jobStateManager.getAttemptCountForStage(stage.getId()), 
2);
 
-        RuntimeTestUtil.mockSchedulerRunner(pendingTaskGroupCollection, 
schedulingPolicy, jobStateManager, false);
+        SchedulerTestUtil.mockSchedulerRunner(pendingTaskGroupCollection, 
schedulingPolicy, jobStateManager, false);
         assertTrue(pendingTaskGroupCollection.isEmpty());
         stage.getTaskGroupIds().forEach(taskGroupId ->
-          RuntimeTestUtil.sendTaskGroupStateEventToScheduler(scheduler, 
executorRegistry,
+          SchedulerTestUtil.sendTaskGroupStateEventToScheduler(scheduler, 
executorRegistry,
               taskGroupId, TaskGroupState.State.COMPLETE, 1));
       } else {
         // There are 2 executors, each of capacity 2, and there are 2 
TaskGroups in ScheduleGroup 2.
         // Schedule only the first TaskGroup
-        RuntimeTestUtil.mockSchedulerRunner(pendingTaskGroupCollection, 
schedulingPolicy, jobStateManager, true);
+        SchedulerTestUtil.mockSchedulerRunner(pendingTaskGroupCollection, 
schedulingPolicy, jobStateManager, true);
 
         boolean first = true;
         for (final String taskGroupId : stage.getTaskGroupIds()) {
@@ -273,7 +209,7 @@ public void testOutputFailure() throws Exception {
     executors.add(a3);
     setUpExecutors(executors, true);
 
-    final PhysicalPlan plan = buildPlan();
+    final PhysicalPlan plan = TestPlanGenerator.getSimplePullPlan();
     final JobStateManager jobStateManager =
         new JobStateManager(plan, blockManagerMaster, metricMessageHandler, 
MAX_SCHEDULE_ATTEMPT);
     scheduler.scheduleJob(plan, jobStateManager);
@@ -284,17 +220,17 @@ public void testOutputFailure() throws Exception {
       if (stage.getScheduleGroupIndex() == 0) {
 
         // There are 3 executors, each of capacity 2, and there are 6 
TaskGroups in ScheduleGroup 0.
-        RuntimeTestUtil.mockSchedulerRunner(pendingTaskGroupCollection, 
schedulingPolicy, jobStateManager, false);
+        SchedulerTestUtil.mockSchedulerRunner(pendingTaskGroupCollection, 
schedulingPolicy, jobStateManager, false);
         assertTrue(pendingTaskGroupCollection.isEmpty());
         stage.getTaskGroupIds().forEach(taskGroupId ->
-          RuntimeTestUtil.sendTaskGroupStateEventToScheduler(scheduler, 
executorRegistry,
+          SchedulerTestUtil.sendTaskGroupStateEventToScheduler(scheduler, 
executorRegistry,
               taskGroupId, TaskGroupState.State.COMPLETE, 1));
       } else if (stage.getScheduleGroupIndex() == 1) {
         // There are 3 executors, each of capacity 2, and there are 2 
TaskGroups in ScheduleGroup 1.
-        RuntimeTestUtil.mockSchedulerRunner(pendingTaskGroupCollection, 
schedulingPolicy, jobStateManager, false);
+        SchedulerTestUtil.mockSchedulerRunner(pendingTaskGroupCollection, 
schedulingPolicy, jobStateManager, false);
         assertTrue(pendingTaskGroupCollection.isEmpty());
         stage.getTaskGroupIds().forEach(taskGroupId ->
-          RuntimeTestUtil.sendTaskGroupStateEventToScheduler(scheduler, 
executorRegistry,
+          SchedulerTestUtil.sendTaskGroupStateEventToScheduler(scheduler, 
executorRegistry,
               taskGroupId, TaskGroupState.State.FAILED_RECOVERABLE, 1,
               TaskGroupState.RecoverableFailureCause.OUTPUT_WRITE_FAILURE));
 
@@ -333,7 +269,7 @@ public void testInputReadFailure() throws Exception {
     executors.add(a3);
     setUpExecutors(executors, true);
 
-    final PhysicalPlan plan = buildPlan();
+    final PhysicalPlan plan = TestPlanGenerator.getSimplePullPlan();
     final JobStateManager jobStateManager =
         new JobStateManager(plan, blockManagerMaster, metricMessageHandler, 
MAX_SCHEDULE_ATTEMPT);
     scheduler.scheduleJob(plan, jobStateManager);
@@ -344,17 +280,17 @@ public void testInputReadFailure() throws Exception {
       if (stage.getScheduleGroupIndex() == 0) {
 
         // There are 3 executors, each of capacity 2, and there are 6 
TaskGroups in ScheduleGroup 0.
-        RuntimeTestUtil.mockSchedulerRunner(pendingTaskGroupCollection, 
schedulingPolicy, jobStateManager, false);
+        SchedulerTestUtil.mockSchedulerRunner(pendingTaskGroupCollection, 
schedulingPolicy, jobStateManager, false);
         assertTrue(pendingTaskGroupCollection.isEmpty());
         stage.getTaskGroupIds().forEach(taskGroupId ->
-          RuntimeTestUtil.sendTaskGroupStateEventToScheduler(scheduler, 
executorRegistry,
+          SchedulerTestUtil.sendTaskGroupStateEventToScheduler(scheduler, 
executorRegistry,
               taskGroupId, TaskGroupState.State.COMPLETE, 1));
       } else if (stage.getScheduleGroupIndex() == 1) {
         // There are 3 executors, each of capacity 2, and there are 2 
TaskGroups in ScheduleGroup 1.
-        RuntimeTestUtil.mockSchedulerRunner(pendingTaskGroupCollection, 
schedulingPolicy, jobStateManager, false);
+        SchedulerTestUtil.mockSchedulerRunner(pendingTaskGroupCollection, 
schedulingPolicy, jobStateManager, false);
 
         stage.getTaskGroupIds().forEach(taskGroupId ->
-          RuntimeTestUtil.sendTaskGroupStateEventToScheduler(scheduler, 
executorRegistry,
+          SchedulerTestUtil.sendTaskGroupStateEventToScheduler(scheduler, 
executorRegistry,
               taskGroupId, TaskGroupState.State.FAILED_RECOVERABLE, 1,
               TaskGroupState.RecoverableFailureCause.INPUT_READ_FAILURE));
 
@@ -393,7 +329,7 @@ public void testTaskGroupReexecutionForFailure() throws 
Exception {
 
     setUpExecutors(executors, false);
 
-    final PhysicalPlan plan = buildPlan();
+    final PhysicalPlan plan = TestPlanGenerator.getSimplePullPlan();
     final JobStateManager jobStateManager =
         new JobStateManager(plan, blockManagerMaster, metricMessageHandler, 
MAX_SCHEDULE_ATTEMPT);
 
@@ -408,11 +344,11 @@ public void testTaskGroupReexecutionForFailure() throws 
Exception {
         final Set<String> a3RunningTaskGroups = new 
HashSet<>(a3.getRunningTaskGroups());
 
         a1RunningTaskGroups.forEach(taskGroupId ->
-            RuntimeTestUtil.sendTaskGroupStateEventToScheduler(scheduler, 
executorRegistry,
+            SchedulerTestUtil.sendTaskGroupStateEventToScheduler(scheduler, 
executorRegistry,
                 taskGroupId, TaskGroupState.State.COMPLETE, 1));
 
         a3RunningTaskGroups.forEach(taskGroupId ->
-            RuntimeTestUtil.sendTaskGroupStateEventToScheduler(scheduler, 
executorRegistry,
+            SchedulerTestUtil.sendTaskGroupStateEventToScheduler(scheduler, 
executorRegistry,
                 taskGroupId, TaskGroupState.State.COMPLETE, 1));
       }
     }
diff --git 
a/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/RoundRobinSchedulingPolicyTest.java
 
b/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/RoundRobinSchedulingPolicyTest.java
similarity index 95%
rename from 
tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/RoundRobinSchedulingPolicyTest.java
rename to 
runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/RoundRobinSchedulingPolicyTest.java
index 22068a59..4ec8c9e6 100644
--- 
a/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/RoundRobinSchedulingPolicyTest.java
+++ 
b/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/RoundRobinSchedulingPolicyTest.java
@@ -13,19 +13,15 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.snu.nemo.tests.runtime.master.scheduler;
+package edu.snu.nemo.runtime.master.scheduler;
 
-import 
edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty;
-import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
+import 
edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty; 
import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
 import edu.snu.nemo.runtime.common.comm.ControlMessage;
 import edu.snu.nemo.runtime.common.message.MessageSender;
 import edu.snu.nemo.runtime.common.plan.physical.ScheduledTaskGroup;
 import edu.snu.nemo.runtime.master.JobStateManager;
-import edu.snu.nemo.runtime.master.scheduler.ExecutorRegistry;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
 import edu.snu.nemo.runtime.master.resource.ResourceSpecification;
-import edu.snu.nemo.runtime.master.scheduler.RoundRobinSchedulingPolicy;
-import edu.snu.nemo.runtime.master.scheduler.SchedulingPolicy;
 import org.apache.reef.driver.context.ActiveContext;
 import org.apache.reef.tang.Tang;
 import org.apache.reef.tang.exceptions.InjectionException;
diff --git 
a/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/SchedulerTestUtil.java
 
b/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/SchedulerTestUtil.java
new file mode 100644
index 00000000..b5af80d8
--- /dev/null
+++ 
b/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/SchedulerTestUtil.java
@@ -0,0 +1,133 @@
+/*
+ * Copyright (C) 2017 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.runtime.master.scheduler;
+
+import edu.snu.nemo.runtime.common.plan.physical.PhysicalStage;
+import edu.snu.nemo.runtime.common.plan.physical.ScheduledTaskGroup;
+import edu.snu.nemo.runtime.common.state.StageState;
+import edu.snu.nemo.runtime.common.state.TaskGroupState;
+import edu.snu.nemo.runtime.master.JobStateManager;
+import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
+
+/**
+ * Utility class for runtime unit tests.
+ */
+public final class SchedulerTestUtil {
+  /**
+   * Complete the stage by completing all of its TaskGroups.
+   * @param jobStateManager for the submitted job.
+   * @param scheduler for the submitted job.
+   * @param executorRegistry provides executor representers
+   * @param physicalStage for which the states should be marked as complete.
+   */
+  public static void completeStage(final JobStateManager jobStateManager,
+                                   final Scheduler scheduler,
+                                   final ExecutorRegistry executorRegistry,
+                                   final PhysicalStage physicalStage,
+                                   final int attemptIdx) {
+    // Loop until the stage completes.
+    while (true) {
+      final Enum stageState = 
jobStateManager.getStageState(physicalStage.getId()).getStateMachine().getCurrentState();
+      if (StageState.State.COMPLETE == stageState) {
+        // Stage has completed, so we break out of the loop.
+        break;
+      } else if (StageState.State.EXECUTING == stageState) {
+        physicalStage.getTaskGroupIds().forEach(taskGroupId -> {
+          final Enum tgState = 
jobStateManager.getTaskGroupState(taskGroupId).getStateMachine().getCurrentState();
+          if (TaskGroupState.State.EXECUTING == tgState) {
+            sendTaskGroupStateEventToScheduler(scheduler, executorRegistry, 
taskGroupId,
+                TaskGroupState.State.COMPLETE, attemptIdx, null);
+          } else if (TaskGroupState.State.READY == tgState || 
TaskGroupState.State.COMPLETE == tgState) {
+            // Skip READY (try in the next loop and see if it becomes 
EXECUTING) and COMPLETE.
+          } else {
+            throw new IllegalStateException(tgState.toString());
+          }
+        });
+      } else if (StageState.State.READY == stageState) {
+        // Skip and retry in the next loop.
+      } else {
+        throw new IllegalStateException(stageState.toString());
+      }
+    }
+  }
+
+  /**
+   * Sends task group state change event to scheduler.
+   * This replaces executor's task group completion messages for testing 
purposes.
+   * @param scheduler for the submitted job.
+   * @param executorRegistry provides executor representers
+   * @param taskGroupId for the task group to change the state.
+   * @param newState for the task group.
+   * @param cause in the case of a recoverable failure.
+   */
+  public static void sendTaskGroupStateEventToScheduler(final Scheduler 
scheduler,
+                                                        final ExecutorRegistry 
executorRegistry,
+                                                        final String 
taskGroupId,
+                                                        final 
TaskGroupState.State newState,
+                                                        final int attemptIdx,
+                                                        final 
TaskGroupState.RecoverableFailureCause cause) {
+    ExecutorRepresenter scheduledExecutor;
+    do {
+      scheduledExecutor = findExecutorForTaskGroup(executorRegistry, 
taskGroupId);
+    } while (scheduledExecutor == null);
+
+    scheduler.onTaskGroupStateChanged(scheduledExecutor.getExecutorId(), 
taskGroupId,
+        newState, attemptIdx, null, cause);
+  }
+
+  public static void sendTaskGroupStateEventToScheduler(final Scheduler 
scheduler,
+                                                        final ExecutorRegistry 
executorRegistry,
+                                                        final String 
taskGroupId,
+                                                        final 
TaskGroupState.State newState,
+                                                        final int attemptIdx) {
+    sendTaskGroupStateEventToScheduler(scheduler, executorRegistry, 
taskGroupId, newState, attemptIdx, null);
+  }
+
+  public static void mockSchedulerRunner(final PendingTaskGroupCollection 
pendingTaskGroupCollection,
+                                         final SchedulingPolicy 
schedulingPolicy,
+                                         final JobStateManager jobStateManager,
+                                         final boolean isPartialSchedule) {
+    while (!pendingTaskGroupCollection.isEmpty()) {
+      final ScheduledTaskGroup taskGroupToSchedule = 
pendingTaskGroupCollection.remove(
+          
pendingTaskGroupCollection.peekSchedulableTaskGroups().get().iterator().next().getTaskGroupId());
+
+      schedulingPolicy.scheduleTaskGroup(taskGroupToSchedule, jobStateManager);
+
+      // Schedule only the first task group.
+      if (isPartialSchedule) {
+        break;
+      }
+    }
+  }
+
+  /**
+   * Retrieves the executor to which the given task group was scheduled.
+   * @param taskGroupId of the task group to search.
+   * @param executorRegistry provides executor representers
+   * @return the {@link ExecutorRepresenter} of the executor the task group 
was scheduled to.
+   */
+  private static ExecutorRepresenter findExecutorForTaskGroup(final 
ExecutorRegistry executorRegistry,
+                                                              final String 
taskGroupId) {
+    for (final String executorId : executorRegistry.getRunningExecutorIds()) {
+      final ExecutorRepresenter executor = 
executorRegistry.getRunningExecutorRepresenter(executorId);
+      if (executor.getRunningTaskGroups().contains(taskGroupId)
+          || executor.getCompleteTaskGroups().contains(taskGroupId)) {
+        return executor;
+      }
+    }
+    return null;
+  }
+}
diff --git 
a/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/SingleTaskGroupQueueTest.java
 
b/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/SingleTaskGroupQueueTest.java
similarity index 85%
rename from 
tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/SingleTaskGroupQueueTest.java
rename to 
runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/SingleTaskGroupQueueTest.java
index 41bdb4dc..2fe285c6 100644
--- 
a/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/SingleTaskGroupQueueTest.java
+++ 
b/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/SingleTaskGroupQueueTest.java
@@ -13,10 +13,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.snu.nemo.tests.runtime.master.scheduler;
+package edu.snu.nemo.runtime.master.scheduler;
 
 import edu.snu.nemo.common.coder.Coder;
-import edu.snu.nemo.common.dag.DAG;
 import edu.snu.nemo.common.dag.DAGBuilder;
 import edu.snu.nemo.common.ir.vertex.transform.Transform;
 import edu.snu.nemo.common.ir.edge.IREdge;
@@ -25,14 +24,9 @@
 import edu.snu.nemo.common.ir.vertex.OperatorVertex;
 import 
edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty;
 import edu.snu.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
-import edu.snu.nemo.compiler.optimizer.CompiletimeOptimizer;
-import edu.snu.nemo.conf.JobConf;
 import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
 import edu.snu.nemo.runtime.common.plan.physical.*;
-import edu.snu.nemo.runtime.master.scheduler.SingleJobTaskGroupCollection;
-import edu.snu.nemo.tests.compiler.optimizer.policy.TestPolicy;
-import org.apache.reef.tang.Injector;
-import org.apache.reef.tang.Tang;
+import edu.snu.nemo.runtime.master.physicalplans.TestPlanGenerator;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -54,7 +48,6 @@
 public final class SingleTaskGroupQueueTest {
   private DAGBuilder<IRVertex, IREdge> irDAGBuilder;
   private SingleJobTaskGroupCollection pendingTaskGroupPriorityQueue;
-  private PhysicalPlanGenerator physicalPlanGenerator;
 
   /**
    * To be used for a thread pool to execute task groups.
@@ -66,10 +59,6 @@ public void setUp() throws Exception{
     irDAGBuilder = new DAGBuilder<>();
     pendingTaskGroupPriorityQueue = new SingleJobTaskGroupCollection();
     executorService = Executors.newFixedThreadPool(2);
-
-    final Injector injector = Tang.Factory.getTang().newInjector();
-    injector.bindVolatileParameter(JobConf.DAGDirectory.class, "");
-    physicalPlanGenerator = injector.getInstance(PhysicalPlanGenerator.class);
   }
 
   /**
@@ -100,15 +89,9 @@ public void testPushPriority() throws Exception {
     final IREdge e2 = new 
IREdge(DataCommunicationPatternProperty.Value.OneToOne, v2, v3, 
Coder.DUMMY_CODER);
     irDAGBuilder.connectVertices(e2);
 
-    final DAG<IRVertex, IREdge> irDAG = 
CompiletimeOptimizer.optimize(irDAGBuilder.buildWithoutSourceSinkCheck(),
-        new TestPolicy(true), "");
-
-    final DAG<PhysicalStage, PhysicalStageEdge> physicalDAG = 
irDAG.convert(physicalPlanGenerator);
-
-    pendingTaskGroupPriorityQueue.onJobScheduled(
-        new PhysicalPlan("TestPlan", physicalDAG, 
physicalPlanGenerator.getTaskIRVertexMap()));
-
-    final List<PhysicalStage> dagOf2Stages = physicalDAG.getTopologicalSort();
+    final PhysicalPlan physicalPlan = TestPlanGenerator.getSimplePullPlan();
+    pendingTaskGroupPriorityQueue.onJobScheduled(physicalPlan);
+    final List<PhysicalStage> dagOf2Stages = 
physicalPlan.getStageDAG().getTopologicalSort();
 
     // Make sure that ScheduleGroups have been assigned to satisfy PendingPQ's 
requirements.
     assertEquals(dagOf2Stages.get(0).getScheduleGroupIndex(), 
dagOf2Stages.get(1).getScheduleGroupIndex());
@@ -183,15 +166,9 @@ public void testPullPriority() throws Exception {
     final IREdge e2 = new 
IREdge(DataCommunicationPatternProperty.Value.OneToOne, v2, v3, 
Coder.DUMMY_CODER);
     irDAGBuilder.connectVertices(e2);
 
-    final DAG<IRVertex, IREdge> irDAG = 
CompiletimeOptimizer.optimize(irDAGBuilder.buildWithoutSourceSinkCheck(),
-        new TestPolicy(), "");
-
-    final DAG<PhysicalStage, PhysicalStageEdge> physicalDAG = 
irDAG.convert(physicalPlanGenerator);
-
-    pendingTaskGroupPriorityQueue.onJobScheduled(
-        new PhysicalPlan("TestPlan", physicalDAG, 
physicalPlanGenerator.getTaskIRVertexMap()));
-
-    final List<PhysicalStage> dagOf2Stages = physicalDAG.getTopologicalSort();
+    final PhysicalPlan physicalPlan = TestPlanGenerator.getSimplePullPlan();
+    pendingTaskGroupPriorityQueue.onJobScheduled(physicalPlan);
+    final List<PhysicalStage> dagOf2Stages = 
physicalPlan.getStageDAG().getTopologicalSort();
 
     // Make sure that ScheduleGroups have been assigned to satisfy PendingPQ's 
requirements.
     assertEquals(dagOf2Stages.get(0).getScheduleGroupIndex(), 0);
@@ -263,15 +240,9 @@ public void testWithDifferentContainerType() throws 
Exception {
     final IREdge e2 = new 
IREdge(DataCommunicationPatternProperty.Value.OneToOne, v2, v3, 
Coder.DUMMY_CODER);
     irDAGBuilder.connectVertices(e2);
 
-    final DAG<IRVertex, IREdge> irDAG = 
CompiletimeOptimizer.optimize(irDAGBuilder.buildWithoutSourceSinkCheck(),
-        new TestPolicy(true), "");
-
-    final DAG<PhysicalStage, PhysicalStageEdge> physicalDAG = 
irDAG.convert(physicalPlanGenerator);
-
-    pendingTaskGroupPriorityQueue.onJobScheduled(
-        new PhysicalPlan("TestPlan", physicalDAG, 
physicalPlanGenerator.getTaskIRVertexMap()));
-
-    final List<PhysicalStage> dagOf2Stages = physicalDAG.getTopologicalSort();
+    final PhysicalPlan physicalPlan = TestPlanGenerator.getSimplePullPlan();
+    pendingTaskGroupPriorityQueue.onJobScheduled(physicalPlan);
+    final List<PhysicalStage> dagOf2Stages = 
physicalPlan.getStageDAG().getTopologicalSort();
 
     // Make sure that ScheduleGroups have been assigned to satisfy PendingPQ's 
requirements.
     assertEquals(dagOf2Stages.get(0).getScheduleGroupIndex(), 
dagOf2Stages.get(1).getScheduleGroupIndex());
diff --git 
a/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/SourceLocationAwareSchedulingPolicyTest.java
 
b/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/SourceLocationAwareSchedulingPolicyTest.java
similarity index 98%
rename from 
tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/SourceLocationAwareSchedulingPolicyTest.java
rename to 
runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/SourceLocationAwareSchedulingPolicyTest.java
index 143ecb4c..8fc87cf1 100644
--- 
a/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/SourceLocationAwareSchedulingPolicyTest.java
+++ 
b/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/SourceLocationAwareSchedulingPolicyTest.java
@@ -13,18 +13,14 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.snu.nemo.tests.runtime.master.scheduler;
+package edu.snu.nemo.runtime.master.scheduler;
 
 import 
edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty;
 import edu.snu.nemo.runtime.common.plan.physical.ScheduledTaskGroup;
 import edu.snu.nemo.runtime.common.state.TaskGroupState;
 import edu.snu.nemo.common.ir.Readable;
 import edu.snu.nemo.runtime.master.JobStateManager;
-import edu.snu.nemo.runtime.master.scheduler.ExecutorRegistry;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
-import edu.snu.nemo.runtime.master.scheduler.RoundRobinSchedulingPolicy;
-import edu.snu.nemo.runtime.master.scheduler.SchedulingPolicy;
-import 
edu.snu.nemo.runtime.master.scheduler.SourceLocationAwareSchedulingPolicy;
 import org.apache.reef.tang.Injector;
 import org.apache.reef.tang.Tang;
 import org.apache.reef.tang.exceptions.InjectionException;
diff --git a/runtime/testplans/pom.xml b/runtime/testplans/pom.xml
new file mode 100644
index 00000000..64799ba1
--- /dev/null
+++ b/runtime/testplans/pom.xml
@@ -0,0 +1,50 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Copyright (C) 2017 Seoul National University
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0";
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>edu.snu.nemo</groupId>
+        <artifactId>nemo-project</artifactId>
+        <version>0.1-SNAPSHOT</version>
+        <relativePath>../../</relativePath>
+    </parent>
+
+    <artifactId>nemo-runtime-testplans</artifactId>
+    <name>Nemo Runtime Test Plans</name>
+
+    <repositories>
+        <repository>
+            <id>Bundled Maven Repository</id>
+            
<url>file://${basedir}/../../common/src/main/resources/repository</url>
+        </repository>
+    </repositories>
+
+    <dependencies>
+        <dependency>
+            <groupId>edu.snu.nemo</groupId>
+            <artifactId>nemo-compiler-optimizer</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>edu.snu.nemo</groupId>
+            <artifactId>nemo-runtime-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+    </dependencies>
+</project>
diff --git 
a/runtime/testplans/src/main/java/edu.snu.nemo.runtime.master.physicalplans/BasicPullPolicy.java
 
b/runtime/testplans/src/main/java/edu.snu.nemo.runtime.master.physicalplans/BasicPullPolicy.java
new file mode 100644
index 00000000..dfeb7c88
--- /dev/null
+++ 
b/runtime/testplans/src/main/java/edu.snu.nemo.runtime.master.physicalplans/BasicPullPolicy.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright (C) 2017 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.runtime.master.physicalplans;
+
+import edu.snu.nemo.compiler.optimizer.pass.compiletime.CompileTimePass;
+import 
edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.DefaultStagePartitioningPass;
+import 
edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.ScheduleGroupPass;
+import edu.snu.nemo.compiler.optimizer.policy.Policy;
+import edu.snu.nemo.runtime.common.optimizer.pass.runtime.RuntimePass;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Basic pull policy.
+ */
+public final class BasicPullPolicy implements Policy {
+  @Override
+  public List<CompileTimePass> getCompileTimePasses() {
+    List<CompileTimePass> policy = new ArrayList<>();
+    policy.add(new DefaultStagePartitioningPass());
+    policy.add(new ScheduleGroupPass());
+    return policy;
+  }
+
+  @Override
+  public List<RuntimePass<?>> getRuntimePasses() {
+    return new ArrayList<>();
+  }
+}
diff --git 
a/runtime/testplans/src/main/java/edu.snu.nemo.runtime.master.physicalplans/BasicPushPolicy.java
 
b/runtime/testplans/src/main/java/edu.snu.nemo.runtime.master.physicalplans/BasicPushPolicy.java
new file mode 100644
index 00000000..60a72699
--- /dev/null
+++ 
b/runtime/testplans/src/main/java/edu.snu.nemo.runtime.master.physicalplans/BasicPushPolicy.java
@@ -0,0 +1,45 @@
+/*
+ * Copyright (C) 2017 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.runtime.master.physicalplans;
+
+import edu.snu.nemo.compiler.optimizer.pass.compiletime.CompileTimePass;
+import 
edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.DefaultStagePartitioningPass;
+import 
edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.ScheduleGroupPass;
+import 
edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.ShuffleEdgePushPass;
+import edu.snu.nemo.compiler.optimizer.policy.Policy;
+import edu.snu.nemo.runtime.common.optimizer.pass.runtime.RuntimePass;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Basic push policy.
+ */
+public final class BasicPushPolicy implements Policy {
+  @Override
+  public List<CompileTimePass> getCompileTimePasses() {
+    List<CompileTimePass> policy = new ArrayList<>();
+    policy.add(new DefaultStagePartitioningPass());
+    policy.add(new ShuffleEdgePushPass());
+    policy.add(new ScheduleGroupPass());
+    return policy;
+  }
+
+  @Override
+  public List<RuntimePass<?>> getRuntimePasses() {
+    return new ArrayList<>();
+  }
+}
diff --git 
a/runtime/testplans/src/main/java/edu.snu.nemo.runtime.master.physicalplans/TestPlanGenerator.java
 
b/runtime/testplans/src/main/java/edu.snu.nemo.runtime.master.physicalplans/TestPlanGenerator.java
new file mode 100644
index 00000000..352aa149
--- /dev/null
+++ 
b/runtime/testplans/src/main/java/edu.snu.nemo.runtime.master.physicalplans/TestPlanGenerator.java
@@ -0,0 +1,138 @@
+/*
+ * Copyright (C) 2017 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.runtime.master.physicalplans;
+
+import edu.snu.nemo.common.coder.Coder;
+import edu.snu.nemo.common.dag.DAG;
+import edu.snu.nemo.common.dag.DAGBuilder;
+import edu.snu.nemo.common.ir.edge.IREdge;
+import 
edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty;
+import edu.snu.nemo.common.ir.vertex.IRVertex;
+import edu.snu.nemo.common.ir.vertex.OperatorVertex;
+import 
edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty;
+import edu.snu.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
+import edu.snu.nemo.common.ir.vertex.transform.Transform;
+import edu.snu.nemo.compiler.optimizer.CompiletimeOptimizer;
+import edu.snu.nemo.compiler.optimizer.examples.EmptyComponents;
+import edu.snu.nemo.compiler.optimizer.policy.Policy;
+import edu.snu.nemo.conf.JobConf;
+import edu.snu.nemo.runtime.common.plan.physical.PhysicalPlan;
+import edu.snu.nemo.runtime.common.plan.physical.PhysicalPlanGenerator;
+import edu.snu.nemo.runtime.common.plan.physical.PhysicalStage;
+import edu.snu.nemo.runtime.common.plan.physical.PhysicalStageEdge;
+import org.apache.reef.tang.Injector;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.exceptions.InjectionException;
+
+/**
+ * Generates physical plans from IRs.
+ */
+public final class TestPlanGenerator {
+  private static final PhysicalPlanGenerator PLAN_GENERATOR;
+  private static final String EMPTY_DAG_DIRECTORY = "";
+
+  static {
+    final Injector injector = Tang.Factory.getTang().newInjector();
+    injector.bindVolatileParameter(JobConf.DAGDirectory.class, 
EMPTY_DAG_DIRECTORY);
+    try {
+      PLAN_GENERATOR = injector.getInstance(PhysicalPlanGenerator.class);
+    } catch (InjectionException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * private constructor.
+   */
+  private TestPlanGenerator() {
+  }
+
+  /**
+   * @return push-based plan
+   * @throws Exception exception
+   */
+  public static PhysicalPlan getSimplePushPlan() throws Exception {
+    return getSimplePlan(new BasicPushPolicy());
+  }
+
+  /**
+   * @return pull-based plan
+   * @throws Exception exception
+   */
+  public static PhysicalPlan getSimplePullPlan() throws Exception {
+    return getSimplePlan(new BasicPullPolicy());
+  }
+
+  /**
+   * @param policy
+   * @return a simple plan given the policy
+   * @throws Exception exception
+   */
+  private static PhysicalPlan getSimplePlan(final Policy policy) throws 
Exception {
+    final DAG<IRVertex, IREdge> simpleIRDAG = buildSimpleIRDAG();
+    final DAG<IRVertex, IREdge> irDAG = 
CompiletimeOptimizer.optimize(simpleIRDAG, policy, EMPTY_DAG_DIRECTORY);
+    final DAG<PhysicalStage, PhysicalStageEdge> physicalDAG = 
irDAG.convert(PLAN_GENERATOR);
+    return new PhysicalPlan("SimplePlan", physicalDAG, 
PLAN_GENERATOR.getTaskIRVertexMap());
+  }
+
+  /**
+   * @return a simple IR DAG
+   */
+  private static DAG<IRVertex, IREdge> buildSimpleIRDAG() {
+    final DAGBuilder<IRVertex, IREdge> dagBuilder = new DAGBuilder<>();
+
+    final Transform t = new EmptyComponents.EmptyTransform("empty");
+    final IRVertex v1 = new OperatorVertex(t);
+    v1.setProperty(ParallelismProperty.of(5));
+    
v1.setProperty(ExecutorPlacementProperty.of(ExecutorPlacementProperty.COMPUTE));
+    dagBuilder.addVertex(v1);
+
+    final IRVertex v2 = new OperatorVertex(t);
+    v2.setProperty(ParallelismProperty.of(4));
+    
v2.setProperty(ExecutorPlacementProperty.of(ExecutorPlacementProperty.COMPUTE));
+    dagBuilder.addVertex(v2);
+
+    final IRVertex v3 = new OperatorVertex(t);
+    v3.setProperty(ParallelismProperty.of(3));
+    
v3.setProperty(ExecutorPlacementProperty.of(ExecutorPlacementProperty.COMPUTE));
+    dagBuilder.addVertex(v3);
+
+    final IRVertex v4 = new OperatorVertex(t);
+    v4.setProperty(ParallelismProperty.of(2));
+    
v4.setProperty(ExecutorPlacementProperty.of(ExecutorPlacementProperty.COMPUTE));
+    dagBuilder.addVertex(v4);
+
+    final IRVertex v5 = new OperatorVertex(t);
+    v5.setProperty(ParallelismProperty.of(4));
+    
v5.setProperty(ExecutorPlacementProperty.of(ExecutorPlacementProperty.TRANSIENT));
+    dagBuilder.addVertex(v5);
+
+    final IREdge e1 = new 
IREdge(DataCommunicationPatternProperty.Value.Shuffle, v1, v2, 
Coder.DUMMY_CODER);
+    dagBuilder.connectVertices(e1);
+
+    final IREdge e2 = new 
IREdge(DataCommunicationPatternProperty.Value.Shuffle, v3, v2, 
Coder.DUMMY_CODER);
+    dagBuilder.connectVertices(e2);
+
+    final IREdge e4 = new 
IREdge(DataCommunicationPatternProperty.Value.Shuffle, v2, v4, 
Coder.DUMMY_CODER);
+    dagBuilder.connectVertices(e4);
+
+    final IREdge e5 = new 
IREdge(DataCommunicationPatternProperty.Value.OneToOne, v2, v5, 
Coder.DUMMY_CODER);
+    dagBuilder.connectVertices(e5);
+
+    return dagBuilder.buildWithoutSourceSinkCheck();
+  }
+}
+
diff --git 
a/tests/src/test/java/edu/snu/nemo/tests/runtime/RuntimeTestUtil.java 
b/tests/src/test/java/edu/snu/nemo/tests/runtime/RuntimeTestUtil.java
index f95c452a..fbe00d27 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/runtime/RuntimeTestUtil.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/runtime/RuntimeTestUtil.java
@@ -15,15 +15,6 @@
  */
 package edu.snu.nemo.tests.runtime;
 
-import edu.snu.nemo.runtime.common.plan.physical.*;
-import edu.snu.nemo.runtime.common.state.StageState;
-import edu.snu.nemo.runtime.common.state.TaskGroupState;
-import edu.snu.nemo.runtime.master.JobStateManager;
-import edu.snu.nemo.runtime.master.scheduler.ExecutorRegistry;
-import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
-import edu.snu.nemo.runtime.master.scheduler.PendingTaskGroupCollection;
-import edu.snu.nemo.runtime.master.scheduler.Scheduler;
-import edu.snu.nemo.runtime.master.scheduler.SchedulingPolicy;
 import org.apache.beam.sdk.values.KV;
 
 import java.util.*;
@@ -34,111 +25,6 @@
  * Utility class for runtime unit tests.
  */
 public final class RuntimeTestUtil {
-  /**
-   * Complete the stage by completing all of its TaskGroups.
-   * @param jobStateManager for the submitted job.
-   * @param scheduler for the submitted job.
-   * @param executorRegistry provides executor representers
-   * @param physicalStage for which the states should be marked as complete.
-   */
-  public static void completeStage(final JobStateManager jobStateManager,
-                                   final Scheduler scheduler,
-                                   final ExecutorRegistry executorRegistry,
-                                   final PhysicalStage physicalStage,
-                                   final int attemptIdx) {
-    // Loop until the stage completes.
-    while (true) {
-      final Enum stageState = 
jobStateManager.getStageState(physicalStage.getId()).getStateMachine().getCurrentState();
-      if (StageState.State.COMPLETE == stageState) {
-        // Stage has completed, so we break out of the loop.
-        break;
-      } else if (StageState.State.EXECUTING == stageState) {
-        physicalStage.getTaskGroupIds().forEach(taskGroupId -> {
-          final Enum tgState = 
jobStateManager.getTaskGroupState(taskGroupId).getStateMachine().getCurrentState();
-          if (TaskGroupState.State.EXECUTING == tgState) {
-            sendTaskGroupStateEventToScheduler(scheduler, executorRegistry, 
taskGroupId,
-                TaskGroupState.State.COMPLETE, attemptIdx, null);
-          } else if (TaskGroupState.State.READY == tgState || 
TaskGroupState.State.COMPLETE == tgState) {
-            // Skip READY (try in the next loop and see if it becomes 
EXECUTING) and COMPLETE.
-          } else {
-            throw new IllegalStateException(tgState.toString());
-          }
-        });
-      } else if (StageState.State.READY == stageState) {
-        // Skip and retry in the next loop.
-      } else {
-        throw new IllegalStateException(stageState.toString());
-      }
-    }
-  }
-
-  /**
-   * Sends task group state change event to scheduler.
-   * This replaces executor's task group completion messages for testing 
purposes.
-   * @param scheduler for the submitted job.
-   * @param executorRegistry provides executor representers
-   * @param taskGroupId for the task group to change the state.
-   * @param newState for the task group.
-   * @param cause in the case of a recoverable failure.
-   */
-  public static void sendTaskGroupStateEventToScheduler(final Scheduler 
scheduler,
-                                                        final ExecutorRegistry 
executorRegistry,
-                                                        final String 
taskGroupId,
-                                                        final 
TaskGroupState.State newState,
-                                                        final int attemptIdx,
-                                                        final 
TaskGroupState.RecoverableFailureCause cause) {
-    ExecutorRepresenter scheduledExecutor;
-    do {
-      scheduledExecutor = findExecutorForTaskGroup(executorRegistry, 
taskGroupId);
-    } while (scheduledExecutor == null);
-
-    scheduler.onTaskGroupStateChanged(scheduledExecutor.getExecutorId(), 
taskGroupId,
-        newState, attemptIdx, null, cause);
-  }
-
-  public static void sendTaskGroupStateEventToScheduler(final Scheduler 
scheduler,
-                                                        final ExecutorRegistry 
executorRegistry,
-                                                        final String 
taskGroupId,
-                                                        final 
TaskGroupState.State newState,
-                                                        final int attemptIdx) {
-    sendTaskGroupStateEventToScheduler(scheduler, executorRegistry, 
taskGroupId, newState, attemptIdx, null);
-  }
-
-  public static void mockSchedulerRunner(final PendingTaskGroupCollection 
pendingTaskGroupCollection,
-                                         final SchedulingPolicy 
schedulingPolicy,
-                                         final JobStateManager jobStateManager,
-                                         final boolean isPartialSchedule) {
-    while (!pendingTaskGroupCollection.isEmpty()) {
-      final ScheduledTaskGroup taskGroupToSchedule = 
pendingTaskGroupCollection.remove(
-          
pendingTaskGroupCollection.peekSchedulableTaskGroups().get().iterator().next().getTaskGroupId());
-
-      schedulingPolicy.scheduleTaskGroup(taskGroupToSchedule, jobStateManager);
-
-      // Schedule only the first task group.
-      if (isPartialSchedule) {
-        break;
-      }
-    }
-  }
-
-  /**
-   * Retrieves the executor to which the given task group was scheduled.
-   * @param taskGroupId of the task group to search.
-   * @param executorRegistry provides executor representers
-   * @return the {@link ExecutorRepresenter} of the executor the task group 
was scheduled to.
-   */
-  private static ExecutorRepresenter findExecutorForTaskGroup(final 
ExecutorRegistry executorRegistry,
-                                                              final String 
taskGroupId) {
-    for (final String executorId : executorRegistry.getRunningExecutorIds()) {
-      final ExecutorRepresenter executor = 
executorRegistry.getRunningExecutorRepresenter(executorId);
-      if (executor.getRunningTaskGroups().contains(taskGroupId)
-          || executor.getCompleteTaskGroups().contains(taskGroupId)) {
-        return executor;
-      }
-    }
-    return null;
-  }
-
   /**
    * Gets a list of integer pair elements in range.
    * @param start value of the range (inclusive).


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to