jeongyooneo closed pull request #27: [NEMO-49] Replace failed executor with a new executor URL: https://github.com/apache/incubator-nemo/pull/27
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/runtime/driver/src/main/java/edu/snu/nemo/driver/NemoDriver.java b/runtime/driver/src/main/java/edu/snu/nemo/driver/NemoDriver.java index 60e455ef..b89e40a3 100644 --- a/runtime/driver/src/main/java/edu/snu/nemo/driver/NemoDriver.java +++ b/runtime/driver/src/main/java/edu/snu/nemo/driver/NemoDriver.java @@ -153,13 +153,7 @@ private void startSchedulingUserApplication() { public final class FailedEvaluatorHandler implements EventHandler<FailedEvaluator> { @Override public void onNext(final FailedEvaluator failedEvaluator) { - // The list size is 0 if the evaluator failed before an executor started. For now, the size is 1 otherwise. - failedEvaluator.getFailedContextList().forEach(failedContext -> { - final String failedExecutorId = failedContext.getId(); - runtimeMaster.onExecutorFailed(failedExecutorId); - }); - throw new RuntimeException(failedEvaluator.getId() - + " failed. See driver's log for the stack trace in executor."); + runtimeMaster.onExecutorFailed(failedEvaluator); } } diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java index 51c505d8..d2b709f4 100644 --- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java +++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java @@ -35,6 +35,7 @@ import org.apache.reef.annotations.audience.DriverSide; import org.apache.reef.driver.context.ActiveContext; import org.apache.reef.driver.evaluator.AllocatedEvaluator; +import org.apache.reef.driver.evaluator.FailedEvaluator; import org.apache.reef.tang.Configuration; import org.apache.reef.tang.annotations.Parameter; import org.slf4j.Logger; @@ -226,12 +227,20 @@ public boolean onExecutorLaunched(final ActiveContext activeContext) { /** * Called when an executor fails due to container failure on this runtime. - * @param failedExecutorId of the failed executor. + * @param failedEvaluator that failed. */ - public void onExecutorFailed(final String failedExecutorId) { + public void onExecutorFailed(final FailedEvaluator failedEvaluator) { runtimeMasterThread.execute(() -> { - LOG.error(failedExecutorId + " executor failed"); - scheduler.onExecutorRemoved(failedExecutorId); + LOG.info("onExecutorFailed: {}", failedEvaluator.getId()); + + // Note that getFailedContextList() can be empty if the failure occurred + // prior to launching an Executor on the Evaluator. + failedEvaluator.getFailedContextList().forEach(failedContext -> { + final String failedExecutorId = failedContext.getId(); + scheduler.onExecutorRemoved(failedExecutorId); + }); + + containerManager.onContainerFailed(failedEvaluator.getId()); }); } diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ContainerManager.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ContainerManager.java index 7d3d917e..2561db1e 100644 --- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ContainerManager.java +++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ContainerManager.java @@ -70,6 +70,11 @@ private final Map<String, ResourceSpecification> pendingContextIdToResourceSpec; private final Map<String, List<ResourceSpecification>> pendingContainerRequestsByContainerType; + /** + * Remember the resource spec for each evaluator. + */ + private final Map<String, ResourceSpecification> evaluatorIdToResourceSpec; + @Inject private ContainerManager(@Parameter(JobConf.ScheduleSerThread.class) final int scheduleSerThread, final EvaluatorRequestor evaluatorRequestor, @@ -79,6 +84,7 @@ private ContainerManager(@Parameter(JobConf.ScheduleSerThread.class) final int s this.messageEnvironment = messageEnvironment; this.pendingContextIdToResourceSpec = new HashMap<>(); this.pendingContainerRequestsByContainerType = new HashMap<>(); + this.evaluatorIdToResourceSpec = new HashMap<>(); this.requestLatchByResourceSpecId = new HashMap<>(); this.serializationExecutorService = Executors.newFixedThreadPool(scheduleSerThread); } @@ -126,7 +132,8 @@ public void requestContainer(final int numToRequest, final ResourceSpecification * @param allocatedContainer the allocated container. * @param executorConfiguration executor related configuration. */ - public void onContainerAllocated(final String executorId, final AllocatedEvaluator allocatedContainer, + public void onContainerAllocated(final String executorId, + final AllocatedEvaluator allocatedContainer, final Configuration executorConfiguration) { if (isTerminated) { LOG.info("ContainerManager is terminated, closing {}", allocatedContainer.getId()); @@ -135,6 +142,8 @@ public void onContainerAllocated(final String executorId, final AllocatedEvaluat } final ResourceSpecification resourceSpecification = selectResourceSpecForContainer(); + evaluatorIdToResourceSpec.put(allocatedContainer.getId(), resourceSpecification); + LOG.info("Container type (" + resourceSpecification.getContainerType() + ") allocated, will be used for [" + executorId + "]"); pendingContextIdToResourceSpec.put(executorId, resourceSpecification); @@ -181,6 +190,20 @@ public void onContainerAllocated(final String executorId, final AllocatedEvaluat return Optional.of(executorRepresenter); } + /** + * Re-acquire a new container using the failed container's resource spec. + * @param failedEvaluatorId of the failed evaluator + * @return the resource specification of the failed evaluator + */ + public ResourceSpecification onContainerFailed(final String failedEvaluatorId) { + final ResourceSpecification resourceSpecification = evaluatorIdToResourceSpec.remove(failedEvaluatorId); + if (resourceSpecification == null) { + throw new IllegalStateException(failedEvaluatorId + " not in " + evaluatorIdToResourceSpec); + } + requestContainer(1, resourceSpecification); + return resourceSpecification; + } + public void terminate() { if (isTerminated) { throw new IllegalStateException("Cannot terminate twice"); diff --git a/tests/src/test/java/edu/snu/nemo/tests/runtime/master/BlockManagerMasterTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/BlockManagerMasterTest.java similarity index 99% rename from tests/src/test/java/edu/snu/nemo/tests/runtime/master/BlockManagerMasterTest.java rename to runtime/master/src/test/java/edu/snu/nemo/runtime/master/BlockManagerMasterTest.java index c08815c2..af8371b2 100644 --- a/tests/src/test/java/edu/snu/nemo/tests/runtime/master/BlockManagerMasterTest.java +++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/BlockManagerMasterTest.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package edu.snu.nemo.tests.runtime.master; +package edu.snu.nemo.runtime.master; import edu.snu.nemo.runtime.common.RuntimeIdGenerator; import edu.snu.nemo.runtime.common.exception.AbsentBlockException; diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/ContainerManagerTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/ContainerManagerTest.java new file mode 100644 index 00000000..86d1ad04 --- /dev/null +++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/ContainerManagerTest.java @@ -0,0 +1,173 @@ +/* + * Copyright (C) 2017 Seoul National University + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package edu.snu.nemo.runtime.master; + +import edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty; +import edu.snu.nemo.conf.JobConf; +import edu.snu.nemo.runtime.common.message.MessageEnvironment; +import edu.snu.nemo.runtime.master.resource.ContainerManager; +import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter; +import edu.snu.nemo.runtime.master.resource.ResourceSpecification; +import org.apache.reef.driver.catalog.NodeDescriptor; +import org.apache.reef.driver.context.ActiveContext; +import org.apache.reef.driver.evaluator.AllocatedEvaluator; +import org.apache.reef.driver.evaluator.EvaluatorDescriptor; +import org.apache.reef.driver.evaluator.EvaluatorRequestor; +import org.apache.reef.tang.Configuration; +import org.apache.reef.tang.Injector; +import org.apache.reef.tang.Tang; +import org.apache.reef.tang.exceptions.InjectionException; +import org.junit.Before; +import org.junit.Test; + +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.IntStream; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Tests {@link edu.snu.nemo.runtime.master.resource.ContainerManager}. + */ +public final class ContainerManagerTest { + private static final ResourceSpecification RESOURCE_SPEC_A = + new ResourceSpecification(ExecutorPlacementProperty.COMPUTE, 1, 1024); + private static final ResourceSpecification RESOURCE_SPEC_B = + new ResourceSpecification(ExecutorPlacementProperty.TRANSIENT, 2, 2048); + private static final ResourceSpecification RESOURCE_SPEC_C = + new ResourceSpecification(ExecutorPlacementProperty.RESERVED, 3, 3072); + + private ContainerManager containerManager; + private AtomicInteger testIdNumber = new AtomicInteger(0); + + private String getNodeName() { + return "NODE-" + testIdNumber.incrementAndGet(); + } + + private String getEvaluatorId() { + return "EVALUATOR-" + testIdNumber.incrementAndGet(); + } + + private String getExecutorId() { + return "EXECUTOR-" + testIdNumber.incrementAndGet(); + } + + @Before + public void setUp() throws InjectionException { + + final MessageEnvironment mockMsgEnv = mock(MessageEnvironment.class); + when(mockMsgEnv.asyncConnect(anyString(), anyString())).thenReturn(mock(Future.class)); + final Configuration configuration = Tang.Factory.getTang().newConfigurationBuilder() + .bindNamedParameter(JobConf.ScheduleSerThread.class, "1") + .build(); + final Injector injector = Tang.Factory.getTang().newInjector(configuration); + injector.bindVolatileInstance(EvaluatorRequestor.class, mock(EvaluatorRequestor.class)); + injector.bindVolatileInstance(MessageEnvironment.class, mockMsgEnv); + containerManager = injector.getInstance(ContainerManager.class); + } + + @Test + public void testRequestAllocateLaunch() { + // Create 2 of A, 2 of B and 1 of C. + final Map<Integer, ResourceSpecification> numToSpec = new HashMap(); + numToSpec.put(2, RESOURCE_SPEC_A); + numToSpec.put(2, RESOURCE_SPEC_B); + numToSpec.put(1, RESOURCE_SPEC_C); + + // Request -> Allocate -> Launch + for (final Map.Entry<Integer, ResourceSpecification> entry : numToSpec.entrySet()) { + final int num = entry.getKey(); + final ResourceSpecification spec = entry.getValue(); + containerManager.requestContainer(num, spec); + + for (int i = 0; i < num; i++) { + final String evaluatorId = getEvaluatorId(); + final String executorId = getExecutorId(); + final EvaluatorDescriptor descriptor = createDescriptor(spec); + + containerManager.onContainerAllocated( + executorId, + createMockEvaluator(evaluatorId, descriptor), + mock(Configuration.class)); + final ExecutorRepresenter executorRepresenter = + containerManager.onContainerLaunched(createMockContext(executorId, descriptor)).get(); + assertEquals(spec.getContainerType(), executorRepresenter.getContainerType()); + assertEquals(spec.getCapacity(), executorRepresenter.getExecutorCapacity()); + assertEquals(descriptor.getNodeDescriptor().getName(), executorRepresenter.getNodeName()); + } + } + } + + @Test + public void testFailureBeforeLaunch() { + containerManager.requestContainer(1, RESOURCE_SPEC_A); + final String evaluatorId = getEvaluatorId(); + + containerManager.onContainerAllocated( + getExecutorId(), + createMockEvaluator(evaluatorId, createDescriptor(RESOURCE_SPEC_A)), + mock(Configuration.class)); + assertEquals(RESOURCE_SPEC_A, containerManager.onContainerFailed(evaluatorId)); + } + + @Test + public void testFailureAfterLaunch() { + containerManager.requestContainer(1, RESOURCE_SPEC_A); + final String evaluatorId = getEvaluatorId(); + final String executorId = getExecutorId(); + final EvaluatorDescriptor descriptor = createDescriptor(RESOURCE_SPEC_A); + + containerManager.onContainerAllocated( + executorId, + createMockEvaluator(evaluatorId, descriptor), + mock(Configuration.class)); + containerManager.onContainerLaunched(createMockContext(executorId, descriptor)); + assertEquals(RESOURCE_SPEC_A, containerManager.onContainerFailed(evaluatorId)); + } + + private EvaluatorDescriptor createDescriptor(final ResourceSpecification spec) { + final EvaluatorDescriptor descriptor = mock(EvaluatorDescriptor.class); + when(descriptor.getMemory()).thenReturn(spec.getMemory()); + when(descriptor.getNumberOfCores()).thenReturn(spec.getCapacity()); + + final NodeDescriptor node = mock(NodeDescriptor.class); + when(node.getName()).thenReturn(getNodeName()); + when(descriptor.getNodeDescriptor()).thenReturn(node); + return descriptor; + } + + private AllocatedEvaluator createMockEvaluator(final String id, + final EvaluatorDescriptor descriptor) { + final AllocatedEvaluator evaluator = mock(AllocatedEvaluator.class); + when(evaluator.getId()).thenReturn(id); + when(evaluator.getEvaluatorDescriptor()).thenReturn(descriptor); + return evaluator; + } + + private ActiveContext createMockContext(final String id, + final EvaluatorDescriptor descriptor) { + final ActiveContext mockedContext = mock(ActiveContext.class); + when(mockedContext.getId()).thenReturn(id); + when(mockedContext.getEvaluatorDescriptor()).thenReturn(descriptor); + return mockedContext; + } +} diff --git a/tests/src/test/java/edu/snu/nemo/tests/runtime/master/JobStateManagerTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/JobStateManagerTest.java similarity index 68% rename from tests/src/test/java/edu/snu/nemo/tests/runtime/master/JobStateManagerTest.java rename to runtime/master/src/test/java/edu/snu/nemo/runtime/master/JobStateManagerTest.java index 5af24765..2f5473b4 100644 --- a/tests/src/test/java/edu/snu/nemo/tests/runtime/master/JobStateManagerTest.java +++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/JobStateManagerTest.java @@ -13,17 +13,10 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package edu.snu.nemo.tests.runtime.master; +package edu.snu.nemo.runtime.master; -import edu.snu.nemo.common.coder.Coder; import edu.snu.nemo.common.ir.edge.IREdge; -import edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty; import edu.snu.nemo.common.ir.vertex.IRVertex; -import edu.snu.nemo.common.ir.vertex.OperatorVertex; -import edu.snu.nemo.common.ir.vertex.executionproperty.ParallelismProperty; -import edu.snu.nemo.compiler.frontend.beam.transform.DoTransform; -import edu.snu.nemo.common.ir.vertex.transform.Transform; -import edu.snu.nemo.compiler.optimizer.CompiletimeOptimizer; import edu.snu.nemo.conf.JobConf; import edu.snu.nemo.runtime.common.RuntimeIdGenerator; import edu.snu.nemo.runtime.common.message.MessageEnvironment; @@ -35,10 +28,7 @@ import edu.snu.nemo.runtime.common.state.TaskState; import edu.snu.nemo.common.dag.DAG; import edu.snu.nemo.common.dag.DAGBuilder; -import edu.snu.nemo.runtime.master.JobStateManager; -import edu.snu.nemo.runtime.master.MetricMessageHandler; -import edu.snu.nemo.runtime.master.BlockManagerMaster; -import edu.snu.nemo.tests.compiler.optimizer.policy.TestPolicy; +import edu.snu.nemo.runtime.plangenerator.TestPlanGenerator; import org.apache.reef.tang.Injector; import org.apache.reef.tang.Tang; import org.junit.Before; @@ -88,49 +78,14 @@ public void setUp() throws Exception { */ @Test public void testPhysicalPlanStateChanges() throws Exception { - final Transform t = mock(Transform.class); - final IRVertex v1 = new OperatorVertex(t); - v1.setProperty(ParallelismProperty.of(3)); - irDAGBuilder.addVertex(v1); - - final IRVertex v2 = new OperatorVertex(t); - v2.setProperty(ParallelismProperty.of(2)); - irDAGBuilder.addVertex(v2); - - final IRVertex v3 = new OperatorVertex(t); - v3.setProperty(ParallelismProperty.of(3)); - irDAGBuilder.addVertex(v3); - - final IRVertex v4 = new OperatorVertex(t); - v4.setProperty(ParallelismProperty.of(2)); - irDAGBuilder.addVertex(v4); - - final IRVertex v5 = new OperatorVertex(new DoTransform(null, null)); - v5.setProperty(ParallelismProperty.of(2)); - irDAGBuilder.addVertex(v5); - - final IREdge e1 = new IREdge(DataCommunicationPatternProperty.Value.Shuffle, v1, v2, Coder.DUMMY_CODER); - irDAGBuilder.connectVertices(e1); - - final IREdge e2 = new IREdge(DataCommunicationPatternProperty.Value.Shuffle, v3, v2, Coder.DUMMY_CODER); - irDAGBuilder.connectVertices(e2); - - final IREdge e4 = new IREdge(DataCommunicationPatternProperty.Value.Shuffle, v2, v4, Coder.DUMMY_CODER); - irDAGBuilder.connectVertices(e4); - - final IREdge e5 = new IREdge(DataCommunicationPatternProperty.Value.Shuffle, v2, v5, Coder.DUMMY_CODER); - irDAGBuilder.connectVertices(e5); - - final DAG<IRVertex, IREdge> irDAG = CompiletimeOptimizer.optimize(irDAGBuilder.buildWithoutSourceSinkCheck(), - new TestPolicy(), ""); - final DAG<PhysicalStage, PhysicalStageEdge> physicalDAG = irDAG.convert(physicalPlanGenerator); - final JobStateManager jobStateManager = new JobStateManager( - new PhysicalPlan("TestPlan", physicalDAG, physicalPlanGenerator.getIdToIRVertex()), - blockManagerMaster, metricMessageHandler, MAX_SCHEDULE_ATTEMPT); + final PhysicalPlan physicalPlan = + TestPlanGenerator.generatePhysicalPlan(TestPlanGenerator.PlanType.TwoVerticesJoined, false); + final JobStateManager jobStateManager = + new JobStateManager(physicalPlan, blockManagerMaster, metricMessageHandler, MAX_SCHEDULE_ATTEMPT); assertEquals(jobStateManager.getJobId(), "TestPlan"); - final List<PhysicalStage> stageList = physicalDAG.getTopologicalSort(); + final List<PhysicalStage> stageList = physicalPlan.getStageDAG().getTopologicalSort(); for (int stageIdx = 0; stageIdx < stageList.size(); stageIdx++) { final PhysicalStage physicalStage = stageList.get(stageIdx); diff --git a/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/BatchSingleJobSchedulerTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobSchedulerTest.java similarity index 100% rename from runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/BatchSingleJobSchedulerTest.java rename to runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobSchedulerTest.java diff --git a/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/ContainerTypeAwareSchedulingPolicyTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingPolicyTest.java similarity index 100% rename from runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/ContainerTypeAwareSchedulingPolicyTest.java rename to runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingPolicyTest.java diff --git a/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/FaultToleranceTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/FaultToleranceTest.java similarity index 100% rename from runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/FaultToleranceTest.java rename to runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/FaultToleranceTest.java diff --git a/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/FreeSlotSchedulingPolicyTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingPolicyTest.java similarity index 100% rename from runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/FreeSlotSchedulingPolicyTest.java rename to runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingPolicyTest.java diff --git a/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/RoundRobinSchedulingPolicyTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/RoundRobinSchedulingPolicyTest.java similarity index 100% rename from runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/RoundRobinSchedulingPolicyTest.java rename to runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/RoundRobinSchedulingPolicyTest.java diff --git a/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/SchedulerTestUtil.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SchedulerTestUtil.java similarity index 100% rename from runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/SchedulerTestUtil.java rename to runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SchedulerTestUtil.java diff --git a/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/SingleTaskQueueTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SingleTaskQueueTest.java similarity index 100% rename from runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/SingleTaskQueueTest.java rename to runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SingleTaskQueueTest.java diff --git a/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/SourceLocationAwareSchedulingPolicyTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingPolicyTest.java similarity index 100% rename from runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/SourceLocationAwareSchedulingPolicyTest.java rename to runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingPolicyTest.java diff --git a/runtime/plangenerator/src/main/java/edu/snu/nemo/runtime/plangenerator/TestPlanGenerator.java b/runtime/plangenerator/src/main/java/edu/snu/nemo/runtime/plangenerator/TestPlanGenerator.java index 631f8373..f9d20bd1 100644 --- a/runtime/plangenerator/src/main/java/edu/snu/nemo/runtime/plangenerator/TestPlanGenerator.java +++ b/runtime/plangenerator/src/main/java/edu/snu/nemo/runtime/plangenerator/TestPlanGenerator.java @@ -101,7 +101,7 @@ private static PhysicalPlan convertIRToPhysical(final DAG<IRVertex, IREdge> irDA final Policy policy) throws Exception { final DAG<IRVertex, IREdge> optimized = CompiletimeOptimizer.optimize(irDAG, policy, EMPTY_DAG_DIRECTORY); final DAG<PhysicalStage, PhysicalStageEdge> physicalDAG = optimized.convert(PLAN_GENERATOR); - return new PhysicalPlan("Plan", physicalDAG, PLAN_GENERATOR.getIdToIRVertex()); + return new PhysicalPlan("TestPlan", physicalDAG, PLAN_GENERATOR.getIdToIRVertex()); } /** diff --git a/tests/src/test/java/edu/snu/nemo/tests/runtime/master/ContainerManagerTest.java b/tests/src/test/java/edu/snu/nemo/tests/runtime/master/ContainerManagerTest.java deleted file mode 100644 index b23c2332..00000000 --- a/tests/src/test/java/edu/snu/nemo/tests/runtime/master/ContainerManagerTest.java +++ /dev/null @@ -1,111 +0,0 @@ -/* - * Copyright (C) 2017 Seoul National University - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package edu.snu.nemo.tests.runtime.master; - -import edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty; -import edu.snu.nemo.conf.JobConf; -import edu.snu.nemo.runtime.common.message.MessageEnvironment; -import edu.snu.nemo.runtime.master.resource.ContainerManager; -import edu.snu.nemo.runtime.master.resource.ResourceSpecification; -import org.apache.reef.driver.catalog.NodeDescriptor; -import org.apache.reef.driver.context.ActiveContext; -import org.apache.reef.driver.evaluator.AllocatedEvaluator; -import org.apache.reef.driver.evaluator.EvaluatorDescriptor; -import org.apache.reef.driver.evaluator.EvaluatorRequestor; -import org.apache.reef.tang.Configuration; -import org.apache.reef.tang.Injector; -import org.apache.reef.tang.Tang; -import org.apache.reef.tang.exceptions.InjectionException; -import org.junit.Before; -import org.junit.Test; - -import java.util.concurrent.*; - -import static org.junit.Assert.assertTrue; -import static org.mockito.Matchers.anyString; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -/** - * Tests {@link edu.snu.nemo.runtime.master.resource.ContainerManager}. - */ -public final class ContainerManagerTest { - private ContainerManager containerManager; - private int testIdNumber = 0; - private final ExecutorService containerAllocationPool = Executors.newFixedThreadPool(5); - private final BlockingDeque<ActiveContext> mockResourceAllocationQueue = new LinkedBlockingDeque<>(); - - private final int DEFAULT_CAPACITY = 4; - private final int DEFAULT_MEMORY = 10240; - - @Before - public void setUp() throws InjectionException { - - final MessageEnvironment mockMsgEnv = mock(MessageEnvironment.class); - when(mockMsgEnv.asyncConnect(anyString(), anyString())).thenReturn(mock(Future.class)); - final Configuration configuration = Tang.Factory.getTang().newConfigurationBuilder() - .bindNamedParameter(JobConf.ScheduleSerThread.class, "1") - .build(); - final Injector injector = Tang.Factory.getTang().newInjector(configuration); - injector.bindVolatileInstance(EvaluatorRequestor.class, mock(EvaluatorRequestor.class)); - injector.bindVolatileInstance(MessageEnvironment.class, mockMsgEnv); - containerManager = injector.getInstance(ContainerManager.class); - } - - @Test(timeout=5000) - public void testAllocationAfterJobCompletion() { - // Create 3 resource specifications, {A, B, C}. - final ResourceSpecification a = - new ResourceSpecification(ExecutorPlacementProperty.COMPUTE, DEFAULT_CAPACITY, DEFAULT_MEMORY); - final ResourceSpecification b = - new ResourceSpecification(ExecutorPlacementProperty.TRANSIENT, DEFAULT_CAPACITY, DEFAULT_MEMORY); - final ResourceSpecification c = - new ResourceSpecification(ExecutorPlacementProperty.RESERVED, DEFAULT_CAPACITY, DEFAULT_MEMORY); - - // Create 2 of A, 2 of B and 1 of C. - containerManager.requestContainer(2, a); - containerManager.requestContainer(2, b); - containerManager.requestContainer(1, c); - - // We allocate 4 containers and start 4 executors. - allocateResource(createMockContext()); - allocateResource(createMockContext()); - allocateResource(createMockContext()); - allocateResource(createMockContext()); - } - - private AllocatedEvaluator createMockEvaluator() { - return mock(AllocatedEvaluator.class); - } - - private ActiveContext createMockContext() { - final String name = "TestContext" + testIdNumber++; - final NodeDescriptor mockedNodeDescriptor = mock(NodeDescriptor.class); - when(mockedNodeDescriptor.getName()).thenReturn(name); - final EvaluatorDescriptor mockedEvaluatorDescriptor = mock(EvaluatorDescriptor.class); - when(mockedEvaluatorDescriptor.getNodeDescriptor()).thenReturn(mockedNodeDescriptor); - final ActiveContext mockedContext = mock(ActiveContext.class); - when(mockedContext.getId()).thenReturn(name); - when(mockedContext.getEvaluatorDescriptor()).thenReturn(mockedEvaluatorDescriptor); - - return mockedContext; - } - - private void allocateResource(final ActiveContext mockContext) { - containerManager.onContainerAllocated(mockContext.getId(), createMockEvaluator(), null); - containerManager.onContainerLaunched(mockContext); - } -} ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services