johnyangk closed pull request #16: [NEMO-66] Move /tests/scheduler into /runtime/master/scheduler/test
URL: https://github.com/apache/incubator-nemo/pull/16
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/pom.xml b/pom.xml index ba0a0991..18b0c20e 100644 --- a/pom.xml +++ b/pom.xml @@ -63,6 +63,7 @@ limitations under the License. <module>runtime/executor</module> <module>runtime/master</module> <module>runtime/driver</module> + <module>runtime/testplans</module> <module>tests</module> </modules> diff --git a/runtime/master/pom.xml b/runtime/master/pom.xml index a797767e..dc404118 100644 --- a/runtime/master/pom.xml +++ b/runtime/master/pom.xml @@ -46,6 +46,11 @@ limitations under the License. <artifactId>nemo-runtime-common</artifactId> <version>${project.version}</version> </dependency> + <dependency> + <groupId>edu.snu.nemo</groupId> + <artifactId>nemo-runtime-testplans</artifactId> + <version>${project.version}</version> + </dependency> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-core</artifactId> diff --git a/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/BatchSingleJobSchedulerTest.java b/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/BatchSingleJobSchedulerTest.java similarity index 66% rename from tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/BatchSingleJobSchedulerTest.java rename to runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/BatchSingleJobSchedulerTest.java index c87bc818..ae2cb068 100644 --- a/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/BatchSingleJobSchedulerTest.java +++ b/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/BatchSingleJobSchedulerTest.java @@ -13,24 +13,11 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package edu.snu.nemo.tests.runtime.master.scheduler; +package edu.snu.nemo.runtime.master.scheduler; -import edu.snu.nemo.common.coder.Coder; import edu.snu.nemo.common.eventhandler.PubSubEventHandlerWrapper; -import edu.snu.nemo.common.ir.edge.IREdge; -import edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty; -import edu.snu.nemo.common.ir.vertex.IRVertex; -import edu.snu.nemo.common.ir.vertex.OperatorVertex; import edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty; -import edu.snu.nemo.common.ir.vertex.executionproperty.ParallelismProperty; -import edu.snu.nemo.compiler.frontend.beam.transform.DoTransform; -import edu.snu.nemo.common.ir.vertex.transform.Transform; -import edu.snu.nemo.common.ir.executionproperty.ExecutionProperty; -import edu.snu.nemo.compiler.optimizer.CompiletimeOptimizer; -import edu.snu.nemo.compiler.optimizer.examples.EmptyComponents; import edu.snu.nemo.conf.JobConf; -import edu.snu.nemo.runtime.master.scheduler.ExecutorRegistry; -import edu.snu.nemo.tests.runtime.RuntimeTestUtil; import edu.snu.nemo.runtime.common.comm.ControlMessage; import edu.snu.nemo.runtime.common.message.MessageSender; import edu.snu.nemo.runtime.common.plan.physical.*; @@ -39,13 +26,11 @@ import edu.snu.nemo.runtime.master.MetricMessageHandler; import edu.snu.nemo.runtime.master.BlockManagerMaster; import edu.snu.nemo.runtime.master.eventhandler.UpdatePhysicalPlanEventHandler; +import edu.snu.nemo.runtime.master.physicalplans.TestPlanGenerator; import edu.snu.nemo.runtime.master.resource.ContainerManager; import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter; import edu.snu.nemo.runtime.master.resource.ResourceSpecification; import edu.snu.nemo.common.dag.DAG; -import edu.snu.nemo.common.dag.DAGBuilder; -import edu.snu.nemo.runtime.master.scheduler.*; -import edu.snu.nemo.tests.compiler.optimizer.policy.TestPolicy; import org.apache.reef.driver.context.ActiveContext; import 
org.apache.reef.tang.Injector; import org.apache.reef.tang.Tang; @@ -75,7 +60,6 @@ PubSubEventHandlerWrapper.class, UpdatePhysicalPlanEventHandler.class, MetricMessageHandler.class}) public final class BatchSingleJobSchedulerTest { private static final Logger LOG = LoggerFactory.getLogger(BatchSingleJobSchedulerTest.class.getName()); - private DAGBuilder<IRVertex, IREdge> irDAGBuilder; private Scheduler scheduler; private SchedulingPolicy schedulingPolicy; private SchedulerRunner schedulerRunner; @@ -86,7 +70,6 @@ private UpdatePhysicalPlanEventHandler updatePhysicalPlanEventHandler; private BlockManagerMaster blockManagerMaster = mock(BlockManagerMaster.class); private final MessageSender<ControlMessage.Message> mockMsgSender = mock(MessageSender.class); - private PhysicalPlanGenerator physicalPlanGenerator; private static final int EXECUTOR_CAPACITY = 20; @@ -98,7 +81,6 @@ public void setUp() throws Exception { final Injector injector = Tang.Factory.getTang().newInjector(); injector.bindVolatileParameter(JobConf.DAGDirectory.class, ""); - irDAGBuilder = initializeDAGBuilder(); executorRegistry = injector.getInstance(ExecutorRegistry.class); metricMessageHandler = mock(MetricMessageHandler.class); pendingTaskGroupCollection = new SingleJobTaskGroupCollection(); @@ -139,8 +121,6 @@ public void setUp() throws Exception { // Add storage nodes scheduler.onExecutorAdded(b1); scheduler.onExecutorAdded(b2); - - physicalPlanGenerator = injector.getInstance(PhysicalPlanGenerator.class); } /** @@ -149,9 +129,7 @@ public void setUp() throws Exception { */ @Test(timeout=10000) public void testPull() throws Exception { - final DAG<IRVertex, IREdge> pullIRDAG = CompiletimeOptimizer.optimize(irDAGBuilder.buildWithoutSourceSinkCheck(), - new TestPolicy(), ""); - scheduleAndCheckJobTermination(pullIRDAG); + scheduleAndCheckJobTermination(TestPlanGenerator.getSimplePullPlan()); } /** @@ -160,68 +138,19 @@ public void testPull() throws Exception { */ @Test(timeout=10000) public void 
testPush() throws Exception { - final DAG<IRVertex, IREdge> pushIRDAG = CompiletimeOptimizer.optimize(irDAGBuilder.buildWithoutSourceSinkCheck(), - new TestPolicy(true), ""); - scheduleAndCheckJobTermination(pushIRDAG); - } - - private DAGBuilder<IRVertex, IREdge> initializeDAGBuilder() { - final DAGBuilder<IRVertex, IREdge> dagBuilder = new DAGBuilder<>(); - - final Transform t = new EmptyComponents.EmptyTransform("empty"); - final IRVertex v1 = new OperatorVertex(t); - v1.setProperty(ParallelismProperty.of(1)); - v1.setProperty(ExecutorPlacementProperty.of(ExecutorPlacementProperty.COMPUTE)); - dagBuilder.addVertex(v1); - - final IRVertex v2 = new OperatorVertex(t); - v2.setProperty(ParallelismProperty.of(2)); - v2.setProperty(ExecutorPlacementProperty.of(ExecutorPlacementProperty.COMPUTE)); - dagBuilder.addVertex(v2); - - final IRVertex v3 = new OperatorVertex(t); - v3.setProperty(ParallelismProperty.of(3)); - v3.setProperty(ExecutorPlacementProperty.of(ExecutorPlacementProperty.COMPUTE)); - dagBuilder.addVertex(v3); - - final IRVertex v4 = new OperatorVertex(t); - v4.setProperty(ParallelismProperty.of(4)); - v4.setProperty(ExecutorPlacementProperty.of(ExecutorPlacementProperty.COMPUTE)); - dagBuilder.addVertex(v4); - - final IRVertex v5 = new OperatorVertex(new DoTransform(null, null)); - v5.setProperty(ParallelismProperty.of(5)); - v5.setProperty(ExecutorPlacementProperty.of(ExecutorPlacementProperty.TRANSIENT)); - dagBuilder.addVertex(v5); - - final IREdge e1 = new IREdge(DataCommunicationPatternProperty.Value.Shuffle, v1, v2, Coder.DUMMY_CODER); - dagBuilder.connectVertices(e1); - - final IREdge e2 = new IREdge(DataCommunicationPatternProperty.Value.Shuffle, v3, v2, Coder.DUMMY_CODER); - dagBuilder.connectVertices(e2); - - final IREdge e4 = new IREdge(DataCommunicationPatternProperty.Value.Shuffle, v2, v4, Coder.DUMMY_CODER); - dagBuilder.connectVertices(e4); - - final IREdge e5 = new IREdge(DataCommunicationPatternProperty.Value.Shuffle, v2, v5, 
Coder.DUMMY_CODER); - dagBuilder.connectVertices(e5); - - return dagBuilder; + scheduleAndCheckJobTermination(TestPlanGenerator.getSimplePushPlan()); } - private void scheduleAndCheckJobTermination(final DAG<IRVertex, IREdge> irDAG) throws InjectionException { - final DAG<PhysicalStage, PhysicalStageEdge> physicalDAG = irDAG.convert(physicalPlanGenerator); - - final PhysicalPlan plan = new PhysicalPlan("TestPlan", physicalDAG, physicalPlanGenerator.getTaskIRVertexMap()); + private void scheduleAndCheckJobTermination(final PhysicalPlan plan) throws InjectionException { final JobStateManager jobStateManager = new JobStateManager(plan, blockManagerMaster, metricMessageHandler, 1); scheduler.scheduleJob(plan, jobStateManager); // For each ScheduleGroup, test: // a) all stages in the ScheduleGroup enters the executing state // b) the stages of the next ScheduleGroup are scheduled after the stages of each ScheduleGroup are made "complete". - for (int i = 0; i < getNumScheduleGroups(irDAG); i++) { + for (int i = 0; i < getNumScheduleGroups(plan.getStageDAG()); i++) { final int scheduleGroupIdx = i; - final List<PhysicalStage> stages = filterStagesWithAScheduleGroupIndex(physicalDAG, scheduleGroupIdx); + final List<PhysicalStage> stages = filterStagesWithAScheduleGroupIndex(plan.getStageDAG(), scheduleGroupIdx); LOG.debug("Checking that all stages of ScheduleGroup {} enter the executing state", scheduleGroupIdx); stages.forEach(physicalStage -> { @@ -232,7 +161,7 @@ private void scheduleAndCheckJobTermination(final DAG<IRVertex, IREdge> irDAG) t }); stages.forEach(physicalStage -> { - RuntimeTestUtil.completeStage( + SchedulerTestUtil.completeStage( jobStateManager, scheduler, executorRegistry, physicalStage, MAGIC_SCHEDULE_ATTEMPT_INDEX); }); } @@ -258,10 +187,9 @@ private void scheduleAndCheckJobTermination(final DAG<IRVertex, IREdge> irDAG) t return sortedStages; } - private int getNumScheduleGroups(final DAG<IRVertex, IREdge> irDAG) { + private int 
getNumScheduleGroups(final DAG<PhysicalStage, PhysicalStageEdge> physicalDAG) { final Set<Integer> scheduleGroupSet = new HashSet<>(); - irDAG.getVertices().forEach(irVertex -> - scheduleGroupSet.add((Integer) irVertex.getProperty(ExecutionProperty.Key.ScheduleGroupIndex))); + physicalDAG.getVertices().forEach(stage -> scheduleGroupSet.add(stage.getScheduleGroupIndex())); return scheduleGroupSet.size(); } } diff --git a/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/FaultToleranceTest.java b/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/FaultToleranceTest.java similarity index 75% rename from tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/FaultToleranceTest.java rename to runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/FaultToleranceTest.java index 8bfd993c..28a82dcd 100644 --- a/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/FaultToleranceTest.java +++ b/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/FaultToleranceTest.java @@ -13,24 +13,13 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package edu.snu.nemo.tests.runtime.master.scheduler; +package edu.snu.nemo.runtime.master.scheduler; -import edu.snu.nemo.common.coder.Coder; -import edu.snu.nemo.common.dag.DAG; import edu.snu.nemo.common.dag.DAGBuilder; import edu.snu.nemo.common.eventhandler.PubSubEventHandlerWrapper; import edu.snu.nemo.common.ir.edge.IREdge; -import edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty; import edu.snu.nemo.common.ir.vertex.IRVertex; -import edu.snu.nemo.common.ir.vertex.OperatorVertex; import edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty; -import edu.snu.nemo.common.ir.vertex.executionproperty.ParallelismProperty; -import edu.snu.nemo.common.ir.vertex.transform.Transform; -import edu.snu.nemo.compiler.optimizer.CompiletimeOptimizer; -import edu.snu.nemo.compiler.optimizer.examples.EmptyComponents; -import edu.snu.nemo.conf.JobConf; -import edu.snu.nemo.runtime.master.scheduler.ExecutorRegistry; -import edu.snu.nemo.tests.runtime.RuntimeTestUtil; import edu.snu.nemo.runtime.common.comm.ControlMessage; import edu.snu.nemo.runtime.common.message.MessageSender; import edu.snu.nemo.runtime.common.plan.physical.*; @@ -39,12 +28,10 @@ import edu.snu.nemo.runtime.master.MetricMessageHandler; import edu.snu.nemo.runtime.master.BlockManagerMaster; import edu.snu.nemo.runtime.master.eventhandler.UpdatePhysicalPlanEventHandler; +import edu.snu.nemo.runtime.master.physicalplans.TestPlanGenerator; import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter; import edu.snu.nemo.runtime.master.resource.ResourceSpecification; -import edu.snu.nemo.runtime.master.scheduler.*; -import edu.snu.nemo.tests.compiler.optimizer.policy.TestPolicy; import org.apache.reef.driver.context.ActiveContext; -import org.apache.reef.tang.Injector; import org.apache.reef.tang.Tang; import org.apache.reef.tang.exceptions.InjectionException; import org.junit.Before; @@ -89,7 +76,6 @@ private BlockManagerMaster blockManagerMaster = 
mock(BlockManagerMaster.class); private final MessageSender<ControlMessage.Message> mockMsgSender = mock(MessageSender.class); private final ExecutorService serExecutorService = Executors.newSingleThreadExecutor(); - private PhysicalPlanGenerator physicalPlanGenerator; private static final int MAX_SCHEDULE_ATTEMPT = 3; @@ -101,9 +87,6 @@ public void setUp() throws Exception { pubSubEventHandler = mock(PubSubEventHandlerWrapper.class); updatePhysicalPlanEventHandler = mock(UpdatePhysicalPlanEventHandler.class); - final Injector injector = Tang.Factory.getTang().newInjector(); - injector.bindVolatileParameter(JobConf.DAGDirectory.class, ""); - physicalPlanGenerator = injector.getInstance(PhysicalPlanGenerator.class); } private void setUpExecutors(final Collection<ExecutorRepresenter> executors, @@ -128,53 +111,6 @@ private void setUpExecutors(final Collection<ExecutorRepresenter> executors, } } - private PhysicalPlan buildPlan() throws Exception { - // Build DAG - final Transform t = new EmptyComponents.EmptyTransform("empty"); - final IRVertex v1 = new OperatorVertex(t); - v1.setProperty(ParallelismProperty.of(3)); - v1.setProperty(ExecutorPlacementProperty.of(ExecutorPlacementProperty.COMPUTE)); - irDAGBuilder.addVertex(v1); - - final IRVertex v2 = new OperatorVertex(t); - v2.setProperty(ParallelismProperty.of(2)); - v2.setProperty(ExecutorPlacementProperty.of(ExecutorPlacementProperty.COMPUTE)); - irDAGBuilder.addVertex(v2); - - final IRVertex v3 = new OperatorVertex(t); - v3.setProperty(ParallelismProperty.of(3)); - v3.setProperty(ExecutorPlacementProperty.of(ExecutorPlacementProperty.COMPUTE)); - irDAGBuilder.addVertex(v3); - - final IRVertex v4 = new OperatorVertex(t); - v4.setProperty(ParallelismProperty.of(2)); - v4.setProperty(ExecutorPlacementProperty.of(ExecutorPlacementProperty.COMPUTE)); - irDAGBuilder.addVertex(v4); - - final IRVertex v5 = new OperatorVertex(t); - v5.setProperty(ParallelismProperty.of(2)); - 
v5.setProperty(ExecutorPlacementProperty.of(ExecutorPlacementProperty.COMPUTE)); - irDAGBuilder.addVertex(v5); - - final IREdge e1 = new IREdge(DataCommunicationPatternProperty.Value.Shuffle, v1, v2, Coder.DUMMY_CODER); - irDAGBuilder.connectVertices(e1); - - final IREdge e2 = new IREdge(DataCommunicationPatternProperty.Value.Shuffle, v3, v2, Coder.DUMMY_CODER); - irDAGBuilder.connectVertices(e2); - - final IREdge e3 = new IREdge(DataCommunicationPatternProperty.Value.Shuffle, v2, v4, Coder.DUMMY_CODER); - irDAGBuilder.connectVertices(e3); - - final IREdge e4 = new IREdge(DataCommunicationPatternProperty.Value.OneToOne, v4, v5, Coder.DUMMY_CODER); - irDAGBuilder.connectVertices(e4); - - final DAG<IRVertex, IREdge> irDAG = - CompiletimeOptimizer.optimize(irDAGBuilder.buildWithoutSourceSinkCheck(), new TestPolicy(), ""); - final DAG<PhysicalStage, PhysicalStageEdge> physicalDAG = irDAG.convert(physicalPlanGenerator); - - return new PhysicalPlan("TestPlan", physicalDAG, physicalPlanGenerator.getTaskIRVertexMap()); - } - /** * Tests fault tolerance after a container removal. */ @@ -196,7 +132,7 @@ public void testContainerRemoval() throws Exception { executors.add(a3); setUpExecutors(executors, true); - final PhysicalPlan plan = buildPlan(); + final PhysicalPlan plan = TestPlanGenerator.getSimplePullPlan(); final JobStateManager jobStateManager = new JobStateManager(plan, blockManagerMaster, metricMessageHandler, MAX_SCHEDULE_ATTEMPT); scheduler.scheduleJob(plan, jobStateManager); @@ -207,14 +143,14 @@ public void testContainerRemoval() throws Exception { if (stage.getScheduleGroupIndex() == 0) { // There are 3 executors, each of capacity 2, and there are 6 TaskGroups in ScheduleGroup 0. 
- RuntimeTestUtil.mockSchedulerRunner(pendingTaskGroupCollection, schedulingPolicy, jobStateManager, false); + SchedulerTestUtil.mockSchedulerRunner(pendingTaskGroupCollection, schedulingPolicy, jobStateManager, false); assertTrue(pendingTaskGroupCollection.isEmpty()); stage.getTaskGroupIds().forEach(taskGroupId -> - RuntimeTestUtil.sendTaskGroupStateEventToScheduler(scheduler, executorRegistry, + SchedulerTestUtil.sendTaskGroupStateEventToScheduler(scheduler, executorRegistry, taskGroupId, TaskGroupState.State.COMPLETE, 1)); } else if (stage.getScheduleGroupIndex() == 1) { // There are 3 executors, each of capacity 2, and there are 2 TaskGroups in ScheduleGroup 1. - RuntimeTestUtil.mockSchedulerRunner(pendingTaskGroupCollection, schedulingPolicy, jobStateManager, false); + SchedulerTestUtil.mockSchedulerRunner(pendingTaskGroupCollection, schedulingPolicy, jobStateManager, false); // Due to round robin scheduling, "a2" is assured to have a running TaskGroup. scheduler.onExecutorRemoved("a2"); @@ -224,15 +160,15 @@ public void testContainerRemoval() throws Exception { } assertEquals(jobStateManager.getAttemptCountForStage(stage.getId()), 2); - RuntimeTestUtil.mockSchedulerRunner(pendingTaskGroupCollection, schedulingPolicy, jobStateManager, false); + SchedulerTestUtil.mockSchedulerRunner(pendingTaskGroupCollection, schedulingPolicy, jobStateManager, false); assertTrue(pendingTaskGroupCollection.isEmpty()); stage.getTaskGroupIds().forEach(taskGroupId -> - RuntimeTestUtil.sendTaskGroupStateEventToScheduler(scheduler, executorRegistry, + SchedulerTestUtil.sendTaskGroupStateEventToScheduler(scheduler, executorRegistry, taskGroupId, TaskGroupState.State.COMPLETE, 1)); } else { // There are 2 executors, each of capacity 2, and there are 2 TaskGroups in ScheduleGroup 2. 
// Schedule only the first TaskGroup - RuntimeTestUtil.mockSchedulerRunner(pendingTaskGroupCollection, schedulingPolicy, jobStateManager, true); + SchedulerTestUtil.mockSchedulerRunner(pendingTaskGroupCollection, schedulingPolicy, jobStateManager, true); boolean first = true; for (final String taskGroupId : stage.getTaskGroupIds()) { @@ -273,7 +209,7 @@ public void testOutputFailure() throws Exception { executors.add(a3); setUpExecutors(executors, true); - final PhysicalPlan plan = buildPlan(); + final PhysicalPlan plan = TestPlanGenerator.getSimplePullPlan(); final JobStateManager jobStateManager = new JobStateManager(plan, blockManagerMaster, metricMessageHandler, MAX_SCHEDULE_ATTEMPT); scheduler.scheduleJob(plan, jobStateManager); @@ -284,17 +220,17 @@ public void testOutputFailure() throws Exception { if (stage.getScheduleGroupIndex() == 0) { // There are 3 executors, each of capacity 2, and there are 6 TaskGroups in ScheduleGroup 0. - RuntimeTestUtil.mockSchedulerRunner(pendingTaskGroupCollection, schedulingPolicy, jobStateManager, false); + SchedulerTestUtil.mockSchedulerRunner(pendingTaskGroupCollection, schedulingPolicy, jobStateManager, false); assertTrue(pendingTaskGroupCollection.isEmpty()); stage.getTaskGroupIds().forEach(taskGroupId -> - RuntimeTestUtil.sendTaskGroupStateEventToScheduler(scheduler, executorRegistry, + SchedulerTestUtil.sendTaskGroupStateEventToScheduler(scheduler, executorRegistry, taskGroupId, TaskGroupState.State.COMPLETE, 1)); } else if (stage.getScheduleGroupIndex() == 1) { // There are 3 executors, each of capacity 2, and there are 2 TaskGroups in ScheduleGroup 1. 
- RuntimeTestUtil.mockSchedulerRunner(pendingTaskGroupCollection, schedulingPolicy, jobStateManager, false); + SchedulerTestUtil.mockSchedulerRunner(pendingTaskGroupCollection, schedulingPolicy, jobStateManager, false); assertTrue(pendingTaskGroupCollection.isEmpty()); stage.getTaskGroupIds().forEach(taskGroupId -> - RuntimeTestUtil.sendTaskGroupStateEventToScheduler(scheduler, executorRegistry, + SchedulerTestUtil.sendTaskGroupStateEventToScheduler(scheduler, executorRegistry, taskGroupId, TaskGroupState.State.FAILED_RECOVERABLE, 1, TaskGroupState.RecoverableFailureCause.OUTPUT_WRITE_FAILURE)); @@ -333,7 +269,7 @@ public void testInputReadFailure() throws Exception { executors.add(a3); setUpExecutors(executors, true); - final PhysicalPlan plan = buildPlan(); + final PhysicalPlan plan = TestPlanGenerator.getSimplePullPlan(); final JobStateManager jobStateManager = new JobStateManager(plan, blockManagerMaster, metricMessageHandler, MAX_SCHEDULE_ATTEMPT); scheduler.scheduleJob(plan, jobStateManager); @@ -344,17 +280,17 @@ public void testInputReadFailure() throws Exception { if (stage.getScheduleGroupIndex() == 0) { // There are 3 executors, each of capacity 2, and there are 6 TaskGroups in ScheduleGroup 0. - RuntimeTestUtil.mockSchedulerRunner(pendingTaskGroupCollection, schedulingPolicy, jobStateManager, false); + SchedulerTestUtil.mockSchedulerRunner(pendingTaskGroupCollection, schedulingPolicy, jobStateManager, false); assertTrue(pendingTaskGroupCollection.isEmpty()); stage.getTaskGroupIds().forEach(taskGroupId -> - RuntimeTestUtil.sendTaskGroupStateEventToScheduler(scheduler, executorRegistry, + SchedulerTestUtil.sendTaskGroupStateEventToScheduler(scheduler, executorRegistry, taskGroupId, TaskGroupState.State.COMPLETE, 1)); } else if (stage.getScheduleGroupIndex() == 1) { // There are 3 executors, each of capacity 2, and there are 2 TaskGroups in ScheduleGroup 1. 
- RuntimeTestUtil.mockSchedulerRunner(pendingTaskGroupCollection, schedulingPolicy, jobStateManager, false); + SchedulerTestUtil.mockSchedulerRunner(pendingTaskGroupCollection, schedulingPolicy, jobStateManager, false); stage.getTaskGroupIds().forEach(taskGroupId -> - RuntimeTestUtil.sendTaskGroupStateEventToScheduler(scheduler, executorRegistry, + SchedulerTestUtil.sendTaskGroupStateEventToScheduler(scheduler, executorRegistry, taskGroupId, TaskGroupState.State.FAILED_RECOVERABLE, 1, TaskGroupState.RecoverableFailureCause.INPUT_READ_FAILURE)); @@ -393,7 +329,7 @@ public void testTaskGroupReexecutionForFailure() throws Exception { setUpExecutors(executors, false); - final PhysicalPlan plan = buildPlan(); + final PhysicalPlan plan = TestPlanGenerator.getSimplePullPlan(); final JobStateManager jobStateManager = new JobStateManager(plan, blockManagerMaster, metricMessageHandler, MAX_SCHEDULE_ATTEMPT); @@ -408,11 +344,11 @@ public void testTaskGroupReexecutionForFailure() throws Exception { final Set<String> a3RunningTaskGroups = new HashSet<>(a3.getRunningTaskGroups()); a1RunningTaskGroups.forEach(taskGroupId -> - RuntimeTestUtil.sendTaskGroupStateEventToScheduler(scheduler, executorRegistry, + SchedulerTestUtil.sendTaskGroupStateEventToScheduler(scheduler, executorRegistry, taskGroupId, TaskGroupState.State.COMPLETE, 1)); a3RunningTaskGroups.forEach(taskGroupId -> - RuntimeTestUtil.sendTaskGroupStateEventToScheduler(scheduler, executorRegistry, + SchedulerTestUtil.sendTaskGroupStateEventToScheduler(scheduler, executorRegistry, taskGroupId, TaskGroupState.State.COMPLETE, 1)); } } diff --git a/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/RoundRobinSchedulingPolicyTest.java b/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/RoundRobinSchedulingPolicyTest.java similarity index 95% rename from tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/RoundRobinSchedulingPolicyTest.java rename to 
runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/RoundRobinSchedulingPolicyTest.java index 22068a59..4ec8c9e6 100644 --- a/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/RoundRobinSchedulingPolicyTest.java +++ b/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/RoundRobinSchedulingPolicyTest.java @@ -13,19 +13,15 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package edu.snu.nemo.tests.runtime.master.scheduler; +package edu.snu.nemo.runtime.master.scheduler; -import edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty; -import edu.snu.nemo.runtime.common.RuntimeIdGenerator; +import edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty; import edu.snu.nemo.runtime.common.RuntimeIdGenerator; import edu.snu.nemo.runtime.common.comm.ControlMessage; import edu.snu.nemo.runtime.common.message.MessageSender; import edu.snu.nemo.runtime.common.plan.physical.ScheduledTaskGroup; import edu.snu.nemo.runtime.master.JobStateManager; -import edu.snu.nemo.runtime.master.scheduler.ExecutorRegistry; import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter; import edu.snu.nemo.runtime.master.resource.ResourceSpecification; -import edu.snu.nemo.runtime.master.scheduler.RoundRobinSchedulingPolicy; -import edu.snu.nemo.runtime.master.scheduler.SchedulingPolicy; import org.apache.reef.driver.context.ActiveContext; import org.apache.reef.tang.Tang; import org.apache.reef.tang.exceptions.InjectionException; diff --git a/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/SchedulerTestUtil.java b/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/SchedulerTestUtil.java new file mode 100644 index 00000000..b5af80d8 --- /dev/null +++ b/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/SchedulerTestUtil.java @@ -0,0 +1,133 @@ +/* + * Copyright (C) 2017 Seoul National University + * + * 
Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package edu.snu.nemo.runtime.master.scheduler; + +import edu.snu.nemo.runtime.common.plan.physical.PhysicalStage; +import edu.snu.nemo.runtime.common.plan.physical.ScheduledTaskGroup; +import edu.snu.nemo.runtime.common.state.StageState; +import edu.snu.nemo.runtime.common.state.TaskGroupState; +import edu.snu.nemo.runtime.master.JobStateManager; +import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter; + +/** + * Utility class for runtime unit tests. + */ +public final class SchedulerTestUtil { + /** + * Complete the stage by completing all of its TaskGroups. + * @param jobStateManager for the submitted job. + * @param scheduler for the submitted job. + * @param executorRegistry provides executor representers + * @param physicalStage for which the states should be marked as complete. + */ + public static void completeStage(final JobStateManager jobStateManager, + final Scheduler scheduler, + final ExecutorRegistry executorRegistry, + final PhysicalStage physicalStage, + final int attemptIdx) { + // Loop until the stage completes. + while (true) { + final Enum stageState = jobStateManager.getStageState(physicalStage.getId()).getStateMachine().getCurrentState(); + if (StageState.State.COMPLETE == stageState) { + // Stage has completed, so we break out of the loop. 
+ break; + } else if (StageState.State.EXECUTING == stageState) { + physicalStage.getTaskGroupIds().forEach(taskGroupId -> { + final Enum tgState = jobStateManager.getTaskGroupState(taskGroupId).getStateMachine().getCurrentState(); + if (TaskGroupState.State.EXECUTING == tgState) { + sendTaskGroupStateEventToScheduler(scheduler, executorRegistry, taskGroupId, + TaskGroupState.State.COMPLETE, attemptIdx, null); + } else if (TaskGroupState.State.READY == tgState || TaskGroupState.State.COMPLETE == tgState) { + // Skip READY (try in the next loop and see if it becomes EXECUTING) and COMPLETE. + } else { + throw new IllegalStateException(tgState.toString()); + } + }); + } else if (StageState.State.READY == stageState) { + // Skip and retry in the next loop. + } else { + throw new IllegalStateException(stageState.toString()); + } + } + } + + /** + * Sends task group state change event to scheduler. + * This replaces executor's task group completion messages for testing purposes. + * @param scheduler for the submitted job. + * @param executorRegistry provides executor representers + * @param taskGroupId for the task group to change the state. + * @param newState for the task group. + * @param cause in the case of a recoverable failure. 
+ */ + public static void sendTaskGroupStateEventToScheduler(final Scheduler scheduler, + final ExecutorRegistry executorRegistry, + final String taskGroupId, + final TaskGroupState.State newState, + final int attemptIdx, + final TaskGroupState.RecoverableFailureCause cause) { + ExecutorRepresenter scheduledExecutor; + do { + scheduledExecutor = findExecutorForTaskGroup(executorRegistry, taskGroupId); + } while (scheduledExecutor == null); + + scheduler.onTaskGroupStateChanged(scheduledExecutor.getExecutorId(), taskGroupId, + newState, attemptIdx, null, cause); + } + + public static void sendTaskGroupStateEventToScheduler(final Scheduler scheduler, + final ExecutorRegistry executorRegistry, + final String taskGroupId, + final TaskGroupState.State newState, + final int attemptIdx) { + sendTaskGroupStateEventToScheduler(scheduler, executorRegistry, taskGroupId, newState, attemptIdx, null); + } + + public static void mockSchedulerRunner(final PendingTaskGroupCollection pendingTaskGroupCollection, + final SchedulingPolicy schedulingPolicy, + final JobStateManager jobStateManager, + final boolean isPartialSchedule) { + while (!pendingTaskGroupCollection.isEmpty()) { + final ScheduledTaskGroup taskGroupToSchedule = pendingTaskGroupCollection.remove( + pendingTaskGroupCollection.peekSchedulableTaskGroups().get().iterator().next().getTaskGroupId()); + + schedulingPolicy.scheduleTaskGroup(taskGroupToSchedule, jobStateManager); + + // Schedule only the first task group. + if (isPartialSchedule) { + break; + } + } + } + + /** + * Retrieves the executor to which the given task group was scheduled. + * @param taskGroupId of the task group to search. + * @param executorRegistry provides executor representers + * @return the {@link ExecutorRepresenter} of the executor the task group was scheduled to. 
+ */ + private static ExecutorRepresenter findExecutorForTaskGroup(final ExecutorRegistry executorRegistry, + final String taskGroupId) { + for (final String executorId : executorRegistry.getRunningExecutorIds()) { + final ExecutorRepresenter executor = executorRegistry.getRunningExecutorRepresenter(executorId); + if (executor.getRunningTaskGroups().contains(taskGroupId) + || executor.getCompleteTaskGroups().contains(taskGroupId)) { + return executor; + } + } + return null; + } +} diff --git a/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/SingleTaskGroupQueueTest.java b/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/SingleTaskGroupQueueTest.java similarity index 85% rename from tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/SingleTaskGroupQueueTest.java rename to runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/SingleTaskGroupQueueTest.java index 41bdb4dc..2fe285c6 100644 --- a/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/SingleTaskGroupQueueTest.java +++ b/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/SingleTaskGroupQueueTest.java @@ -13,10 +13,9 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package edu.snu.nemo.tests.runtime.master.scheduler; +package edu.snu.nemo.runtime.master.scheduler; import edu.snu.nemo.common.coder.Coder; -import edu.snu.nemo.common.dag.DAG; import edu.snu.nemo.common.dag.DAGBuilder; import edu.snu.nemo.common.ir.vertex.transform.Transform; import edu.snu.nemo.common.ir.edge.IREdge; @@ -25,14 +24,9 @@ import edu.snu.nemo.common.ir.vertex.OperatorVertex; import edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty; import edu.snu.nemo.common.ir.vertex.executionproperty.ParallelismProperty; -import edu.snu.nemo.compiler.optimizer.CompiletimeOptimizer; -import edu.snu.nemo.conf.JobConf; import edu.snu.nemo.runtime.common.RuntimeIdGenerator; import edu.snu.nemo.runtime.common.plan.physical.*; -import edu.snu.nemo.runtime.master.scheduler.SingleJobTaskGroupCollection; -import edu.snu.nemo.tests.compiler.optimizer.policy.TestPolicy; -import org.apache.reef.tang.Injector; -import org.apache.reef.tang.Tang; +import edu.snu.nemo.runtime.master.physicalplans.TestPlanGenerator; import org.junit.Before; import org.junit.Test; @@ -54,7 +48,6 @@ public final class SingleTaskGroupQueueTest { private DAGBuilder<IRVertex, IREdge> irDAGBuilder; private SingleJobTaskGroupCollection pendingTaskGroupPriorityQueue; - private PhysicalPlanGenerator physicalPlanGenerator; /** * To be used for a thread pool to execute task groups. 
@@ -66,10 +59,6 @@ public void setUp() throws Exception{ irDAGBuilder = new DAGBuilder<>(); pendingTaskGroupPriorityQueue = new SingleJobTaskGroupCollection(); executorService = Executors.newFixedThreadPool(2); - - final Injector injector = Tang.Factory.getTang().newInjector(); - injector.bindVolatileParameter(JobConf.DAGDirectory.class, ""); - physicalPlanGenerator = injector.getInstance(PhysicalPlanGenerator.class); } /** @@ -100,15 +89,9 @@ public void testPushPriority() throws Exception { final IREdge e2 = new IREdge(DataCommunicationPatternProperty.Value.OneToOne, v2, v3, Coder.DUMMY_CODER); irDAGBuilder.connectVertices(e2); - final DAG<IRVertex, IREdge> irDAG = CompiletimeOptimizer.optimize(irDAGBuilder.buildWithoutSourceSinkCheck(), - new TestPolicy(true), ""); - - final DAG<PhysicalStage, PhysicalStageEdge> physicalDAG = irDAG.convert(physicalPlanGenerator); - - pendingTaskGroupPriorityQueue.onJobScheduled( - new PhysicalPlan("TestPlan", physicalDAG, physicalPlanGenerator.getTaskIRVertexMap())); - - final List<PhysicalStage> dagOf2Stages = physicalDAG.getTopologicalSort(); + final PhysicalPlan physicalPlan = TestPlanGenerator.getSimplePullPlan(); + pendingTaskGroupPriorityQueue.onJobScheduled(physicalPlan); + final List<PhysicalStage> dagOf2Stages = physicalPlan.getStageDAG().getTopologicalSort(); // Make sure that ScheduleGroups have been assigned to satisfy PendingPQ's requirements. 
assertEquals(dagOf2Stages.get(0).getScheduleGroupIndex(), dagOf2Stages.get(1).getScheduleGroupIndex()); @@ -183,15 +166,9 @@ public void testPullPriority() throws Exception { final IREdge e2 = new IREdge(DataCommunicationPatternProperty.Value.OneToOne, v2, v3, Coder.DUMMY_CODER); irDAGBuilder.connectVertices(e2); - final DAG<IRVertex, IREdge> irDAG = CompiletimeOptimizer.optimize(irDAGBuilder.buildWithoutSourceSinkCheck(), - new TestPolicy(), ""); - - final DAG<PhysicalStage, PhysicalStageEdge> physicalDAG = irDAG.convert(physicalPlanGenerator); - - pendingTaskGroupPriorityQueue.onJobScheduled( - new PhysicalPlan("TestPlan", physicalDAG, physicalPlanGenerator.getTaskIRVertexMap())); - - final List<PhysicalStage> dagOf2Stages = physicalDAG.getTopologicalSort(); + final PhysicalPlan physicalPlan = TestPlanGenerator.getSimplePullPlan(); + pendingTaskGroupPriorityQueue.onJobScheduled(physicalPlan); + final List<PhysicalStage> dagOf2Stages = physicalPlan.getStageDAG().getTopologicalSort(); // Make sure that ScheduleGroups have been assigned to satisfy PendingPQ's requirements. 
assertEquals(dagOf2Stages.get(0).getScheduleGroupIndex(), 0); @@ -263,15 +240,9 @@ public void testWithDifferentContainerType() throws Exception { final IREdge e2 = new IREdge(DataCommunicationPatternProperty.Value.OneToOne, v2, v3, Coder.DUMMY_CODER); irDAGBuilder.connectVertices(e2); - final DAG<IRVertex, IREdge> irDAG = CompiletimeOptimizer.optimize(irDAGBuilder.buildWithoutSourceSinkCheck(), - new TestPolicy(true), ""); - - final DAG<PhysicalStage, PhysicalStageEdge> physicalDAG = irDAG.convert(physicalPlanGenerator); - - pendingTaskGroupPriorityQueue.onJobScheduled( - new PhysicalPlan("TestPlan", physicalDAG, physicalPlanGenerator.getTaskIRVertexMap())); - - final List<PhysicalStage> dagOf2Stages = physicalDAG.getTopologicalSort(); + final PhysicalPlan physicalPlan = TestPlanGenerator.getSimplePullPlan(); + pendingTaskGroupPriorityQueue.onJobScheduled(physicalPlan); + final List<PhysicalStage> dagOf2Stages = physicalPlan.getStageDAG().getTopologicalSort(); // Make sure that ScheduleGroups have been assigned to satisfy PendingPQ's requirements. 
assertEquals(dagOf2Stages.get(0).getScheduleGroupIndex(), dagOf2Stages.get(1).getScheduleGroupIndex()); diff --git a/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/SourceLocationAwareSchedulingPolicyTest.java b/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/SourceLocationAwareSchedulingPolicyTest.java similarity index 98% rename from tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/SourceLocationAwareSchedulingPolicyTest.java rename to runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/SourceLocationAwareSchedulingPolicyTest.java index 143ecb4c..8fc87cf1 100644 --- a/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/SourceLocationAwareSchedulingPolicyTest.java +++ b/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/SourceLocationAwareSchedulingPolicyTest.java @@ -13,18 +13,14 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package edu.snu.nemo.tests.runtime.master.scheduler; +package edu.snu.nemo.runtime.master.scheduler; import edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty; import edu.snu.nemo.runtime.common.plan.physical.ScheduledTaskGroup; import edu.snu.nemo.runtime.common.state.TaskGroupState; import edu.snu.nemo.common.ir.Readable; import edu.snu.nemo.runtime.master.JobStateManager; -import edu.snu.nemo.runtime.master.scheduler.ExecutorRegistry; import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter; -import edu.snu.nemo.runtime.master.scheduler.RoundRobinSchedulingPolicy; -import edu.snu.nemo.runtime.master.scheduler.SchedulingPolicy; -import edu.snu.nemo.runtime.master.scheduler.SourceLocationAwareSchedulingPolicy; import org.apache.reef.tang.Injector; import org.apache.reef.tang.Tang; import org.apache.reef.tang.exceptions.InjectionException; diff --git a/runtime/testplans/pom.xml b/runtime/testplans/pom.xml new file mode 100644 index 00000000..64799ba1 --- 
/dev/null +++ b/runtime/testplans/pom.xml @@ -0,0 +1,50 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Copyright (C) 2017 Seoul National University +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>edu.snu.nemo</groupId> + <artifactId>nemo-project</artifactId> + <version>0.1-SNAPSHOT</version> + <relativePath>../../</relativePath> + </parent> + + <artifactId>nemo-runtime-testplans</artifactId> + <name>Nemo Runtime Test Plans</name> + + <repositories> + <repository> + <id>Bundled Maven Repository</id> + <url>file://${basedir}/../../common/src/main/resources/repository</url> + </repository> + </repositories> + + <dependencies> + <dependency> + <groupId>edu.snu.nemo</groupId> + <artifactId>nemo-compiler-optimizer</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>edu.snu.nemo</groupId> + <artifactId>nemo-runtime-common</artifactId> + <version>${project.version}</version> + </dependency> + </dependencies> +</project> diff --git a/runtime/testplans/src/main/java/edu.snu.nemo.runtime.master.physicalplans/BasicPullPolicy.java b/runtime/testplans/src/main/java/edu.snu.nemo.runtime.master.physicalplans/BasicPullPolicy.java new file mode 100644 index 00000000..dfeb7c88 --- /dev/null 
+++ b/runtime/testplans/src/main/java/edu.snu.nemo.runtime.master.physicalplans/BasicPullPolicy.java @@ -0,0 +1,43 @@ +/* + * Copyright (C) 2017 Seoul National University + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package edu.snu.nemo.runtime.master.physicalplans; + +import edu.snu.nemo.compiler.optimizer.pass.compiletime.CompileTimePass; +import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.DefaultStagePartitioningPass; +import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.ScheduleGroupPass; +import edu.snu.nemo.compiler.optimizer.policy.Policy; +import edu.snu.nemo.runtime.common.optimizer.pass.runtime.RuntimePass; + +import java.util.ArrayList; +import java.util.List; + +/** + * Basic pull policy: runs DefaultStagePartitioningPass then ScheduleGroupPass at compile time; no runtime passes. 
+ */ +public final class BasicPullPolicy implements Policy { + @Override + public List<CompileTimePass> getCompileTimePasses() { + List<CompileTimePass> policy = new ArrayList<>(); + policy.add(new DefaultStagePartitioningPass()); + policy.add(new ScheduleGroupPass()); + return policy; + } + + @Override + public List<RuntimePass<?>> getRuntimePasses() { + return new ArrayList<>(); + } +} diff --git a/runtime/testplans/src/main/java/edu.snu.nemo.runtime.master.physicalplans/BasicPushPolicy.java b/runtime/testplans/src/main/java/edu.snu.nemo.runtime.master.physicalplans/BasicPushPolicy.java new file mode 100644 index 00000000..60a72699 --- /dev/null +++ b/runtime/testplans/src/main/java/edu.snu.nemo.runtime.master.physicalplans/BasicPushPolicy.java @@ -0,0 +1,45 @@ +/* + * Copyright (C) 2017 Seoul National University + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package edu.snu.nemo.runtime.master.physicalplans; + +import edu.snu.nemo.compiler.optimizer.pass.compiletime.CompileTimePass; +import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.DefaultStagePartitioningPass; +import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.ScheduleGroupPass; +import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.ShuffleEdgePushPass; +import edu.snu.nemo.compiler.optimizer.policy.Policy; +import edu.snu.nemo.runtime.common.optimizer.pass.runtime.RuntimePass; + +import java.util.ArrayList; +import java.util.List; + +/** + * Basic push policy: runs DefaultStagePartitioningPass, ShuffleEdgePushPass, ScheduleGroupPass; no runtime passes. + */ +public final class BasicPushPolicy implements Policy { + @Override + public List<CompileTimePass> getCompileTimePasses() { + List<CompileTimePass> policy = new ArrayList<>(); + policy.add(new DefaultStagePartitioningPass()); + policy.add(new ShuffleEdgePushPass()); + policy.add(new ScheduleGroupPass()); + return policy; + } + + @Override + public List<RuntimePass<?>> getRuntimePasses() { + return new ArrayList<>(); + } +} diff --git a/runtime/testplans/src/main/java/edu.snu.nemo.runtime.master.physicalplans/TestPlanGenerator.java b/runtime/testplans/src/main/java/edu.snu.nemo.runtime.master.physicalplans/TestPlanGenerator.java new file mode 100644 index 00000000..352aa149 --- /dev/null +++ b/runtime/testplans/src/main/java/edu.snu.nemo.runtime.master.physicalplans/TestPlanGenerator.java @@ -0,0 +1,138 @@ +/* + * Copyright (C) 2017 Seoul National University + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package edu.snu.nemo.runtime.master.physicalplans; + +import edu.snu.nemo.common.coder.Coder; +import edu.snu.nemo.common.dag.DAG; +import edu.snu.nemo.common.dag.DAGBuilder; +import edu.snu.nemo.common.ir.edge.IREdge; +import edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty; +import edu.snu.nemo.common.ir.vertex.IRVertex; +import edu.snu.nemo.common.ir.vertex.OperatorVertex; +import edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty; +import edu.snu.nemo.common.ir.vertex.executionproperty.ParallelismProperty; +import edu.snu.nemo.common.ir.vertex.transform.Transform; +import edu.snu.nemo.compiler.optimizer.CompiletimeOptimizer; +import edu.snu.nemo.compiler.optimizer.examples.EmptyComponents; +import edu.snu.nemo.compiler.optimizer.policy.Policy; +import edu.snu.nemo.conf.JobConf; +import edu.snu.nemo.runtime.common.plan.physical.PhysicalPlan; +import edu.snu.nemo.runtime.common.plan.physical.PhysicalPlanGenerator; +import edu.snu.nemo.runtime.common.plan.physical.PhysicalStage; +import edu.snu.nemo.runtime.common.plan.physical.PhysicalStageEdge; +import org.apache.reef.tang.Injector; +import org.apache.reef.tang.Tang; +import org.apache.reef.tang.exceptions.InjectionException; + +/** + * Generates simple physical plans for tests by optimizing a fixed IR DAG with a basic push or pull policy. + */ +public final class TestPlanGenerator { + private static final PhysicalPlanGenerator PLAN_GENERATOR; + private static final String EMPTY_DAG_DIRECTORY = ""; + + static { + final Injector injector = Tang.Factory.getTang().newInjector(); + injector.bindVolatileParameter(JobConf.DAGDirectory.class, EMPTY_DAG_DIRECTORY); + try { + PLAN_GENERATOR = injector.getInstance(PhysicalPlanGenerator.class); + } catch (InjectionException e) { + throw new RuntimeException(e); + } + } + + /** + * Private constructor: this class only exposes static methods. 
+ */ + private TestPlanGenerator() { + } + + /** + * @return a simple physical plan optimized with {@link BasicPushPolicy} + * @throws Exception if the plan cannot be built + */ + public static PhysicalPlan getSimplePushPlan() throws Exception { + return getSimplePlan(new BasicPushPolicy()); + } + + /** + * @return a simple physical plan optimized with {@link BasicPullPolicy} + * @throws Exception if the plan cannot be built + */ + public static PhysicalPlan getSimplePullPlan() throws Exception { + return getSimplePlan(new BasicPullPolicy()); + } + + /** + * @param policy the policy used to optimize the simple IR DAG + * @return a physical plan generated from the optimized DAG + * @throws Exception if the plan cannot be built + */ + private static PhysicalPlan getSimplePlan(final Policy policy) throws Exception { + final DAG<IRVertex, IREdge> simpleIRDAG = buildSimpleIRDAG(); + final DAG<IRVertex, IREdge> irDAG = CompiletimeOptimizer.optimize(simpleIRDAG, policy, EMPTY_DAG_DIRECTORY); + final DAG<PhysicalStage, PhysicalStageEdge> physicalDAG = irDAG.convert(PLAN_GENERATOR); + return new PhysicalPlan("SimplePlan", physicalDAG, PLAN_GENERATOR.getTaskIRVertexMap()); + } + + /** + * @return an IR DAG of five vertices: v1 and v3 shuffle into v2; v2 shuffles into v4, one-to-one into v5 + */ + private static DAG<IRVertex, IREdge> buildSimpleIRDAG() { + final DAGBuilder<IRVertex, IREdge> dagBuilder = new DAGBuilder<>(); + + final Transform t = new EmptyComponents.EmptyTransform("empty"); + final IRVertex v1 = new OperatorVertex(t); + v1.setProperty(ParallelismProperty.of(5)); + v1.setProperty(ExecutorPlacementProperty.of(ExecutorPlacementProperty.COMPUTE)); + dagBuilder.addVertex(v1); + + final IRVertex v2 = new OperatorVertex(t); + v2.setProperty(ParallelismProperty.of(4)); + v2.setProperty(ExecutorPlacementProperty.of(ExecutorPlacementProperty.COMPUTE)); + dagBuilder.addVertex(v2); + + final IRVertex v3 = new OperatorVertex(t); + v3.setProperty(ParallelismProperty.of(3)); + v3.setProperty(ExecutorPlacementProperty.of(ExecutorPlacementProperty.COMPUTE)); + dagBuilder.addVertex(v3); + + final IRVertex v4 = new OperatorVertex(t); + v4.setProperty(ParallelismProperty.of(2)); + v4.setProperty(ExecutorPlacementProperty.of(ExecutorPlacementProperty.COMPUTE)); + 
dagBuilder.addVertex(v4); + + final IRVertex v5 = new OperatorVertex(t); + v5.setProperty(ParallelismProperty.of(4)); + v5.setProperty(ExecutorPlacementProperty.of(ExecutorPlacementProperty.TRANSIENT)); + dagBuilder.addVertex(v5); + + final IREdge e1 = new IREdge(DataCommunicationPatternProperty.Value.Shuffle, v1, v2, Coder.DUMMY_CODER); + dagBuilder.connectVertices(e1); + + final IREdge e2 = new IREdge(DataCommunicationPatternProperty.Value.Shuffle, v3, v2, Coder.DUMMY_CODER); + dagBuilder.connectVertices(e2); + + final IREdge e4 = new IREdge(DataCommunicationPatternProperty.Value.Shuffle, v2, v4, Coder.DUMMY_CODER); + dagBuilder.connectVertices(e4); + + final IREdge e5 = new IREdge(DataCommunicationPatternProperty.Value.OneToOne, v2, v5, Coder.DUMMY_CODER); + dagBuilder.connectVertices(e5); + + return dagBuilder.buildWithoutSourceSinkCheck(); + } +} + diff --git a/tests/src/test/java/edu/snu/nemo/tests/runtime/RuntimeTestUtil.java b/tests/src/test/java/edu/snu/nemo/tests/runtime/RuntimeTestUtil.java index f95c452a..fbe00d27 100644 --- a/tests/src/test/java/edu/snu/nemo/tests/runtime/RuntimeTestUtil.java +++ b/tests/src/test/java/edu/snu/nemo/tests/runtime/RuntimeTestUtil.java @@ -15,15 +15,6 @@ */ package edu.snu.nemo.tests.runtime; -import edu.snu.nemo.runtime.common.plan.physical.*; -import edu.snu.nemo.runtime.common.state.StageState; -import edu.snu.nemo.runtime.common.state.TaskGroupState; -import edu.snu.nemo.runtime.master.JobStateManager; -import edu.snu.nemo.runtime.master.scheduler.ExecutorRegistry; -import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter; -import edu.snu.nemo.runtime.master.scheduler.PendingTaskGroupCollection; -import edu.snu.nemo.runtime.master.scheduler.Scheduler; -import edu.snu.nemo.runtime.master.scheduler.SchedulingPolicy; import org.apache.beam.sdk.values.KV; import java.util.*; @@ -34,111 +25,6 @@ * Utility class for runtime unit tests. 
*/ public final class RuntimeTestUtil { - /** - * Complete the stage by completing all of its TaskGroups. - * @param jobStateManager for the submitted job. - * @param scheduler for the submitted job. - * @param executorRegistry provides executor representers - * @param physicalStage for which the states should be marked as complete. - */ - public static void completeStage(final JobStateManager jobStateManager, - final Scheduler scheduler, - final ExecutorRegistry executorRegistry, - final PhysicalStage physicalStage, - final int attemptIdx) { - // Loop until the stage completes. - while (true) { - final Enum stageState = jobStateManager.getStageState(physicalStage.getId()).getStateMachine().getCurrentState(); - if (StageState.State.COMPLETE == stageState) { - // Stage has completed, so we break out of the loop. - break; - } else if (StageState.State.EXECUTING == stageState) { - physicalStage.getTaskGroupIds().forEach(taskGroupId -> { - final Enum tgState = jobStateManager.getTaskGroupState(taskGroupId).getStateMachine().getCurrentState(); - if (TaskGroupState.State.EXECUTING == tgState) { - sendTaskGroupStateEventToScheduler(scheduler, executorRegistry, taskGroupId, - TaskGroupState.State.COMPLETE, attemptIdx, null); - } else if (TaskGroupState.State.READY == tgState || TaskGroupState.State.COMPLETE == tgState) { - // Skip READY (try in the next loop and see if it becomes EXECUTING) and COMPLETE. - } else { - throw new IllegalStateException(tgState.toString()); - } - }); - } else if (StageState.State.READY == stageState) { - // Skip and retry in the next loop. - } else { - throw new IllegalStateException(stageState.toString()); - } - } - } - - /** - * Sends task group state change event to scheduler. - * This replaces executor's task group completion messages for testing purposes. - * @param scheduler for the submitted job. - * @param executorRegistry provides executor representers - * @param taskGroupId for the task group to change the state. 
- * @param newState for the task group. - * @param cause in the case of a recoverable failure. - */ - public static void sendTaskGroupStateEventToScheduler(final Scheduler scheduler, - final ExecutorRegistry executorRegistry, - final String taskGroupId, - final TaskGroupState.State newState, - final int attemptIdx, - final TaskGroupState.RecoverableFailureCause cause) { - ExecutorRepresenter scheduledExecutor; - do { - scheduledExecutor = findExecutorForTaskGroup(executorRegistry, taskGroupId); - } while (scheduledExecutor == null); - - scheduler.onTaskGroupStateChanged(scheduledExecutor.getExecutorId(), taskGroupId, - newState, attemptIdx, null, cause); - } - - public static void sendTaskGroupStateEventToScheduler(final Scheduler scheduler, - final ExecutorRegistry executorRegistry, - final String taskGroupId, - final TaskGroupState.State newState, - final int attemptIdx) { - sendTaskGroupStateEventToScheduler(scheduler, executorRegistry, taskGroupId, newState, attemptIdx, null); - } - - public static void mockSchedulerRunner(final PendingTaskGroupCollection pendingTaskGroupCollection, - final SchedulingPolicy schedulingPolicy, - final JobStateManager jobStateManager, - final boolean isPartialSchedule) { - while (!pendingTaskGroupCollection.isEmpty()) { - final ScheduledTaskGroup taskGroupToSchedule = pendingTaskGroupCollection.remove( - pendingTaskGroupCollection.peekSchedulableTaskGroups().get().iterator().next().getTaskGroupId()); - - schedulingPolicy.scheduleTaskGroup(taskGroupToSchedule, jobStateManager); - - // Schedule only the first task group. - if (isPartialSchedule) { - break; - } - } - } - - /** - * Retrieves the executor to which the given task group was scheduled. - * @param taskGroupId of the task group to search. - * @param executorRegistry provides executor representers - * @return the {@link ExecutorRepresenter} of the executor the task group was scheduled to. 
- */ - private static ExecutorRepresenter findExecutorForTaskGroup(final ExecutorRegistry executorRegistry, - final String taskGroupId) { - for (final String executorId : executorRegistry.getRunningExecutorIds()) { - final ExecutorRepresenter executor = executorRegistry.getRunningExecutorRepresenter(executorId); - if (executor.getRunningTaskGroups().contains(taskGroupId) - || executor.getCompleteTaskGroups().contains(taskGroupId)) { - return executor; - } - } - return null; - } - /** * Gets a list of integer pair elements in range. * @param start value of the range (inclusive). ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services