jeongyooneo closed pull request #27: [NEMO-49] Replace failed executor with a 
new executor
URL: https://github.com/apache/incubator-nemo/pull/27
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request comes from a fork, the diff is supplied
below (GitHub does not otherwise show it once the PR is merged):

diff --git a/runtime/driver/src/main/java/edu/snu/nemo/driver/NemoDriver.java 
b/runtime/driver/src/main/java/edu/snu/nemo/driver/NemoDriver.java
index 60e455ef..b89e40a3 100644
--- a/runtime/driver/src/main/java/edu/snu/nemo/driver/NemoDriver.java
+++ b/runtime/driver/src/main/java/edu/snu/nemo/driver/NemoDriver.java
@@ -153,13 +153,7 @@ private void startSchedulingUserApplication() {
   public final class FailedEvaluatorHandler implements 
EventHandler<FailedEvaluator> {
     @Override
     public void onNext(final FailedEvaluator failedEvaluator) {
-      // The list size is 0 if the evaluator failed before an executor 
started. For now, the size is 1 otherwise.
-      failedEvaluator.getFailedContextList().forEach(failedContext -> {
-        final String failedExecutorId = failedContext.getId();
-        runtimeMaster.onExecutorFailed(failedExecutorId);
-      });
-      throw new RuntimeException(failedEvaluator.getId()
-          + " failed. See driver's log for the stack trace in executor.");
+      runtimeMaster.onExecutorFailed(failedEvaluator);
     }
   }
 
diff --git 
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java 
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java
index 51c505d8..d2b709f4 100644
--- 
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java
+++ 
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java
@@ -35,6 +35,7 @@
 import org.apache.reef.annotations.audience.DriverSide;
 import org.apache.reef.driver.context.ActiveContext;
 import org.apache.reef.driver.evaluator.AllocatedEvaluator;
+import org.apache.reef.driver.evaluator.FailedEvaluator;
 import org.apache.reef.tang.Configuration;
 import org.apache.reef.tang.annotations.Parameter;
 import org.slf4j.Logger;
@@ -226,12 +227,20 @@ public boolean onExecutorLaunched(final ActiveContext 
activeContext) {
 
   /**
    * Called when an executor fails due to container failure on this runtime.
-   * @param failedExecutorId of the failed executor.
+   * @param failedEvaluator that failed.
    */
-  public void onExecutorFailed(final String failedExecutorId) {
+  public void onExecutorFailed(final FailedEvaluator failedEvaluator) {
     runtimeMasterThread.execute(() -> {
-      LOG.error(failedExecutorId + " executor failed");
-      scheduler.onExecutorRemoved(failedExecutorId);
+      LOG.info("onExecutorFailed: {}", failedEvaluator.getId());
+
+      // Note that getFailedContextList() can be empty if the failure occurred
+      // prior to launching an Executor on the Evaluator.
+      failedEvaluator.getFailedContextList().forEach(failedContext -> {
+        final String failedExecutorId = failedContext.getId();
+        scheduler.onExecutorRemoved(failedExecutorId);
+      });
+
+      containerManager.onContainerFailed(failedEvaluator.getId());
     });
   }
 
diff --git 
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ContainerManager.java
 
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ContainerManager.java
index 7d3d917e..2561db1e 100644
--- 
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ContainerManager.java
+++ 
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ContainerManager.java
@@ -70,6 +70,11 @@
   private final Map<String, ResourceSpecification> 
pendingContextIdToResourceSpec;
   private final Map<String, List<ResourceSpecification>> 
pendingContainerRequestsByContainerType;
 
+  /**
+   * Remember the resource spec for each evaluator.
+   */
+  private final Map<String, ResourceSpecification> evaluatorIdToResourceSpec;
+
   @Inject
   private ContainerManager(@Parameter(JobConf.ScheduleSerThread.class) final 
int scheduleSerThread,
                            final EvaluatorRequestor evaluatorRequestor,
@@ -79,6 +84,7 @@ private 
ContainerManager(@Parameter(JobConf.ScheduleSerThread.class) final int s
     this.messageEnvironment = messageEnvironment;
     this.pendingContextIdToResourceSpec = new HashMap<>();
     this.pendingContainerRequestsByContainerType = new HashMap<>();
+    this.evaluatorIdToResourceSpec = new HashMap<>();
     this.requestLatchByResourceSpecId = new HashMap<>();
     this.serializationExecutorService = 
Executors.newFixedThreadPool(scheduleSerThread);
   }
@@ -126,7 +132,8 @@ public void requestContainer(final int numToRequest, final 
ResourceSpecification
    * @param allocatedContainer the allocated container.
    * @param executorConfiguration executor related configuration.
    */
-  public void onContainerAllocated(final String executorId, final 
AllocatedEvaluator allocatedContainer,
+  public void onContainerAllocated(final String executorId,
+                                   final AllocatedEvaluator allocatedContainer,
                                    final Configuration executorConfiguration) {
     if (isTerminated) {
       LOG.info("ContainerManager is terminated, closing {}", 
allocatedContainer.getId());
@@ -135,6 +142,8 @@ public void onContainerAllocated(final String executorId, 
final AllocatedEvaluat
     }
 
     final ResourceSpecification resourceSpecification = 
selectResourceSpecForContainer();
+    evaluatorIdToResourceSpec.put(allocatedContainer.getId(), 
resourceSpecification);
+
     LOG.info("Container type (" + resourceSpecification.getContainerType()
         + ") allocated, will be used for [" + executorId + "]");
     pendingContextIdToResourceSpec.put(executorId, resourceSpecification);
@@ -181,6 +190,20 @@ public void onContainerAllocated(final String executorId, 
final AllocatedEvaluat
     return Optional.of(executorRepresenter);
   }
 
+  /**
+   * Re-acquire a new container using the failed container's resource spec.
+   * @param failedEvaluatorId of the failed evaluator
+   * @return the resource specification of the failed evaluator
+   */
+  public ResourceSpecification onContainerFailed(final String 
failedEvaluatorId) {
+    final ResourceSpecification resourceSpecification = 
evaluatorIdToResourceSpec.remove(failedEvaluatorId);
+    if (resourceSpecification == null) {
+      throw new IllegalStateException(failedEvaluatorId + " not in " + 
evaluatorIdToResourceSpec);
+    }
+    requestContainer(1, resourceSpecification);
+    return resourceSpecification;
+  }
+
   public void terminate() {
     if (isTerminated) {
       throw new IllegalStateException("Cannot terminate twice");
diff --git 
a/tests/src/test/java/edu/snu/nemo/tests/runtime/master/BlockManagerMasterTest.java
 
b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/BlockManagerMasterTest.java
similarity index 99%
rename from 
tests/src/test/java/edu/snu/nemo/tests/runtime/master/BlockManagerMasterTest.java
rename to 
runtime/master/src/test/java/edu/snu/nemo/runtime/master/BlockManagerMasterTest.java
index c08815c2..af8371b2 100644
--- 
a/tests/src/test/java/edu/snu/nemo/tests/runtime/master/BlockManagerMasterTest.java
+++ 
b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/BlockManagerMasterTest.java
@@ -13,7 +13,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.snu.nemo.tests.runtime.master;
+package edu.snu.nemo.runtime.master;
 
 import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
 import edu.snu.nemo.runtime.common.exception.AbsentBlockException;
diff --git 
a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/ContainerManagerTest.java
 
b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/ContainerManagerTest.java
new file mode 100644
index 00000000..86d1ad04
--- /dev/null
+++ 
b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/ContainerManagerTest.java
@@ -0,0 +1,173 @@
+/*
+ * Copyright (C) 2017 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.runtime.master;
+
+import 
edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty;
+import edu.snu.nemo.conf.JobConf;
+import edu.snu.nemo.runtime.common.message.MessageEnvironment;
+import edu.snu.nemo.runtime.master.resource.ContainerManager;
+import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
+import edu.snu.nemo.runtime.master.resource.ResourceSpecification;
+import org.apache.reef.driver.catalog.NodeDescriptor;
+import org.apache.reef.driver.context.ActiveContext;
+import org.apache.reef.driver.evaluator.AllocatedEvaluator;
+import org.apache.reef.driver.evaluator.EvaluatorDescriptor;
+import org.apache.reef.driver.evaluator.EvaluatorRequestor;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Injector;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.exceptions.InjectionException;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.IntStream;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests {@link edu.snu.nemo.runtime.master.resource.ContainerManager}.
+ */
+public final class ContainerManagerTest {
+  private static final ResourceSpecification RESOURCE_SPEC_A =
+      new ResourceSpecification(ExecutorPlacementProperty.COMPUTE, 1, 1024);
+  private static final ResourceSpecification RESOURCE_SPEC_B =
+      new ResourceSpecification(ExecutorPlacementProperty.TRANSIENT, 2, 2048);
+  private static final ResourceSpecification RESOURCE_SPEC_C =
+      new ResourceSpecification(ExecutorPlacementProperty.RESERVED, 3, 3072);
+
+  private ContainerManager containerManager;
+  private AtomicInteger testIdNumber = new AtomicInteger(0);
+
+  private String getNodeName() {
+    return "NODE-" + testIdNumber.incrementAndGet();
+  }
+
+  private String getEvaluatorId() {
+    return "EVALUATOR-" + testIdNumber.incrementAndGet();
+  }
+
+  private String getExecutorId() {
+    return "EXECUTOR-" + testIdNumber.incrementAndGet();
+  }
+
+  @Before
+  public void setUp() throws InjectionException {
+
+    final MessageEnvironment mockMsgEnv = mock(MessageEnvironment.class);
+    when(mockMsgEnv.asyncConnect(anyString(), 
anyString())).thenReturn(mock(Future.class));
+    final Configuration configuration = 
Tang.Factory.getTang().newConfigurationBuilder()
+        .bindNamedParameter(JobConf.ScheduleSerThread.class, "1")
+        .build();
+    final Injector injector = 
Tang.Factory.getTang().newInjector(configuration);
+    injector.bindVolatileInstance(EvaluatorRequestor.class, 
mock(EvaluatorRequestor.class));
+    injector.bindVolatileInstance(MessageEnvironment.class, mockMsgEnv);
+    containerManager = injector.getInstance(ContainerManager.class);
+  }
+
+  @Test
+  public void testRequestAllocateLaunch() {
+    // Create 2 of A, 2 of B and 1 of C.
+    final Map<Integer, ResourceSpecification> numToSpec = new HashMap();
+    numToSpec.put(2, RESOURCE_SPEC_A);
+    numToSpec.put(2, RESOURCE_SPEC_B);
+    numToSpec.put(1, RESOURCE_SPEC_C);
+
+    // Request -> Allocate -> Launch
+    for (final Map.Entry<Integer, ResourceSpecification> entry : 
numToSpec.entrySet()) {
+      final int num = entry.getKey();
+      final ResourceSpecification spec = entry.getValue();
+      containerManager.requestContainer(num, spec);
+
+      for (int i = 0; i < num; i++) {
+        final String evaluatorId = getEvaluatorId();
+        final String executorId = getExecutorId();
+        final EvaluatorDescriptor descriptor = createDescriptor(spec);
+
+        containerManager.onContainerAllocated(
+            executorId,
+            createMockEvaluator(evaluatorId, descriptor),
+            mock(Configuration.class));
+        final ExecutorRepresenter executorRepresenter =
+            containerManager.onContainerLaunched(createMockContext(executorId, 
descriptor)).get();
+        assertEquals(spec.getContainerType(), 
executorRepresenter.getContainerType());
+        assertEquals(spec.getCapacity(), 
executorRepresenter.getExecutorCapacity());
+        assertEquals(descriptor.getNodeDescriptor().getName(), 
executorRepresenter.getNodeName());
+      }
+    }
+  }
+
+  @Test
+  public void testFailureBeforeLaunch() {
+    containerManager.requestContainer(1, RESOURCE_SPEC_A);
+    final String evaluatorId = getEvaluatorId();
+
+    containerManager.onContainerAllocated(
+        getExecutorId(),
+        createMockEvaluator(evaluatorId, createDescriptor(RESOURCE_SPEC_A)),
+        mock(Configuration.class));
+    assertEquals(RESOURCE_SPEC_A, 
containerManager.onContainerFailed(evaluatorId));
+  }
+
+  @Test
+  public void testFailureAfterLaunch() {
+    containerManager.requestContainer(1, RESOURCE_SPEC_A);
+    final String evaluatorId = getEvaluatorId();
+    final String executorId = getExecutorId();
+    final EvaluatorDescriptor descriptor = createDescriptor(RESOURCE_SPEC_A);
+
+    containerManager.onContainerAllocated(
+        executorId,
+        createMockEvaluator(evaluatorId, descriptor),
+        mock(Configuration.class));
+    containerManager.onContainerLaunched(createMockContext(executorId, 
descriptor));
+    assertEquals(RESOURCE_SPEC_A, 
containerManager.onContainerFailed(evaluatorId));
+  }
+
+  private EvaluatorDescriptor createDescriptor(final ResourceSpecification 
spec) {
+    final EvaluatorDescriptor descriptor = mock(EvaluatorDescriptor.class);
+    when(descriptor.getMemory()).thenReturn(spec.getMemory());
+    when(descriptor.getNumberOfCores()).thenReturn(spec.getCapacity());
+
+    final NodeDescriptor node = mock(NodeDescriptor.class);
+    when(node.getName()).thenReturn(getNodeName());
+    when(descriptor.getNodeDescriptor()).thenReturn(node);
+    return descriptor;
+  }
+
+  private AllocatedEvaluator createMockEvaluator(final String id,
+                                                 final EvaluatorDescriptor 
descriptor) {
+    final AllocatedEvaluator evaluator = mock(AllocatedEvaluator.class);
+    when(evaluator.getId()).thenReturn(id);
+    when(evaluator.getEvaluatorDescriptor()).thenReturn(descriptor);
+    return evaluator;
+  }
+
+  private ActiveContext createMockContext(final String id,
+                                          final EvaluatorDescriptor 
descriptor) {
+    final ActiveContext mockedContext = mock(ActiveContext.class);
+    when(mockedContext.getId()).thenReturn(id);
+    when(mockedContext.getEvaluatorDescriptor()).thenReturn(descriptor);
+    return mockedContext;
+  }
+}
diff --git 
a/tests/src/test/java/edu/snu/nemo/tests/runtime/master/JobStateManagerTest.java
 
b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/JobStateManagerTest.java
similarity index 68%
rename from 
tests/src/test/java/edu/snu/nemo/tests/runtime/master/JobStateManagerTest.java
rename to 
runtime/master/src/test/java/edu/snu/nemo/runtime/master/JobStateManagerTest.java
index 5af24765..2f5473b4 100644
--- 
a/tests/src/test/java/edu/snu/nemo/tests/runtime/master/JobStateManagerTest.java
+++ 
b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/JobStateManagerTest.java
@@ -13,17 +13,10 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.snu.nemo.tests.runtime.master;
+package edu.snu.nemo.runtime.master;
 
-import edu.snu.nemo.common.coder.Coder;
 import edu.snu.nemo.common.ir.edge.IREdge;
-import 
edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
-import edu.snu.nemo.common.ir.vertex.OperatorVertex;
-import edu.snu.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
-import edu.snu.nemo.compiler.frontend.beam.transform.DoTransform;
-import edu.snu.nemo.common.ir.vertex.transform.Transform;
-import edu.snu.nemo.compiler.optimizer.CompiletimeOptimizer;
 import edu.snu.nemo.conf.JobConf;
 import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
 import edu.snu.nemo.runtime.common.message.MessageEnvironment;
@@ -35,10 +28,7 @@
 import edu.snu.nemo.runtime.common.state.TaskState;
 import edu.snu.nemo.common.dag.DAG;
 import edu.snu.nemo.common.dag.DAGBuilder;
-import edu.snu.nemo.runtime.master.JobStateManager;
-import edu.snu.nemo.runtime.master.MetricMessageHandler;
-import edu.snu.nemo.runtime.master.BlockManagerMaster;
-import edu.snu.nemo.tests.compiler.optimizer.policy.TestPolicy;
+import edu.snu.nemo.runtime.plangenerator.TestPlanGenerator;
 import org.apache.reef.tang.Injector;
 import org.apache.reef.tang.Tang;
 import org.junit.Before;
@@ -88,49 +78,14 @@ public void setUp() throws Exception {
    */
   @Test
   public void testPhysicalPlanStateChanges() throws Exception {
-    final Transform t = mock(Transform.class);
-    final IRVertex v1 = new OperatorVertex(t);
-    v1.setProperty(ParallelismProperty.of(3));
-    irDAGBuilder.addVertex(v1);
-
-    final IRVertex v2 = new OperatorVertex(t);
-    v2.setProperty(ParallelismProperty.of(2));
-    irDAGBuilder.addVertex(v2);
-
-    final IRVertex v3 = new OperatorVertex(t);
-    v3.setProperty(ParallelismProperty.of(3));
-    irDAGBuilder.addVertex(v3);
-
-    final IRVertex v4 = new OperatorVertex(t);
-    v4.setProperty(ParallelismProperty.of(2));
-    irDAGBuilder.addVertex(v4);
-
-    final IRVertex v5 = new OperatorVertex(new DoTransform(null, null));
-    v5.setProperty(ParallelismProperty.of(2));
-    irDAGBuilder.addVertex(v5);
-
-    final IREdge e1 = new 
IREdge(DataCommunicationPatternProperty.Value.Shuffle, v1, v2, 
Coder.DUMMY_CODER);
-    irDAGBuilder.connectVertices(e1);
-
-    final IREdge e2 = new 
IREdge(DataCommunicationPatternProperty.Value.Shuffle, v3, v2, 
Coder.DUMMY_CODER);
-    irDAGBuilder.connectVertices(e2);
-
-    final IREdge e4 = new 
IREdge(DataCommunicationPatternProperty.Value.Shuffle, v2, v4, 
Coder.DUMMY_CODER);
-    irDAGBuilder.connectVertices(e4);
-
-    final IREdge e5 = new 
IREdge(DataCommunicationPatternProperty.Value.Shuffle, v2, v5, 
Coder.DUMMY_CODER);
-    irDAGBuilder.connectVertices(e5);
-
-    final DAG<IRVertex, IREdge> irDAG = 
CompiletimeOptimizer.optimize(irDAGBuilder.buildWithoutSourceSinkCheck(),
-        new TestPolicy(), "");
-    final DAG<PhysicalStage, PhysicalStageEdge> physicalDAG = 
irDAG.convert(physicalPlanGenerator);
-    final JobStateManager jobStateManager = new JobStateManager(
-        new PhysicalPlan("TestPlan", physicalDAG, 
physicalPlanGenerator.getIdToIRVertex()),
-        blockManagerMaster, metricMessageHandler, MAX_SCHEDULE_ATTEMPT);
+    final PhysicalPlan physicalPlan =
+        
TestPlanGenerator.generatePhysicalPlan(TestPlanGenerator.PlanType.TwoVerticesJoined,
 false);
+    final JobStateManager jobStateManager =
+        new JobStateManager(physicalPlan, blockManagerMaster, 
metricMessageHandler, MAX_SCHEDULE_ATTEMPT);
 
     assertEquals(jobStateManager.getJobId(), "TestPlan");
 
-    final List<PhysicalStage> stageList = physicalDAG.getTopologicalSort();
+    final List<PhysicalStage> stageList = 
physicalPlan.getStageDAG().getTopologicalSort();
 
     for (int stageIdx = 0; stageIdx < stageList.size(); stageIdx++) {
       final PhysicalStage physicalStage = stageList.get(stageIdx);
diff --git 
a/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/BatchSingleJobSchedulerTest.java
 
b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobSchedulerTest.java
similarity index 100%
rename from 
runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/BatchSingleJobSchedulerTest.java
rename to 
runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobSchedulerTest.java
diff --git 
a/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/ContainerTypeAwareSchedulingPolicyTest.java
 
b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingPolicyTest.java
similarity index 100%
rename from 
runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/ContainerTypeAwareSchedulingPolicyTest.java
rename to 
runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingPolicyTest.java
diff --git 
a/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/FaultToleranceTest.java
 
b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/FaultToleranceTest.java
similarity index 100%
rename from 
runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/FaultToleranceTest.java
rename to 
runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/FaultToleranceTest.java
diff --git 
a/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/FreeSlotSchedulingPolicyTest.java
 
b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingPolicyTest.java
similarity index 100%
rename from 
runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/FreeSlotSchedulingPolicyTest.java
rename to 
runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingPolicyTest.java
diff --git 
a/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/RoundRobinSchedulingPolicyTest.java
 
b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/RoundRobinSchedulingPolicyTest.java
similarity index 100%
rename from 
runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/RoundRobinSchedulingPolicyTest.java
rename to 
runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/RoundRobinSchedulingPolicyTest.java
diff --git 
a/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/SchedulerTestUtil.java
 
b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SchedulerTestUtil.java
similarity index 100%
rename from 
runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/SchedulerTestUtil.java
rename to 
runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SchedulerTestUtil.java
diff --git 
a/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/SingleTaskQueueTest.java
 
b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SingleTaskQueueTest.java
similarity index 100%
rename from 
runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/SingleTaskQueueTest.java
rename to 
runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SingleTaskQueueTest.java
diff --git 
a/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/SourceLocationAwareSchedulingPolicyTest.java
 
b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingPolicyTest.java
similarity index 100%
rename from 
runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/SourceLocationAwareSchedulingPolicyTest.java
rename to 
runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingPolicyTest.java
diff --git 
a/runtime/plangenerator/src/main/java/edu/snu/nemo/runtime/plangenerator/TestPlanGenerator.java
 
b/runtime/plangenerator/src/main/java/edu/snu/nemo/runtime/plangenerator/TestPlanGenerator.java
index 631f8373..f9d20bd1 100644
--- 
a/runtime/plangenerator/src/main/java/edu/snu/nemo/runtime/plangenerator/TestPlanGenerator.java
+++ 
b/runtime/plangenerator/src/main/java/edu/snu/nemo/runtime/plangenerator/TestPlanGenerator.java
@@ -101,7 +101,7 @@ private static PhysicalPlan convertIRToPhysical(final 
DAG<IRVertex, IREdge> irDA
                                                   final Policy policy) throws 
Exception {
     final DAG<IRVertex, IREdge> optimized = 
CompiletimeOptimizer.optimize(irDAG, policy, EMPTY_DAG_DIRECTORY);
     final DAG<PhysicalStage, PhysicalStageEdge> physicalDAG = 
optimized.convert(PLAN_GENERATOR);
-    return new PhysicalPlan("Plan", physicalDAG, 
PLAN_GENERATOR.getIdToIRVertex());
+    return new PhysicalPlan("TestPlan", physicalDAG, 
PLAN_GENERATOR.getIdToIRVertex());
   }
 
   /**
diff --git 
a/tests/src/test/java/edu/snu/nemo/tests/runtime/master/ContainerManagerTest.java
 
b/tests/src/test/java/edu/snu/nemo/tests/runtime/master/ContainerManagerTest.java
deleted file mode 100644
index b23c2332..00000000
--- 
a/tests/src/test/java/edu/snu/nemo/tests/runtime/master/ContainerManagerTest.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Copyright (C) 2017 Seoul National University
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.snu.nemo.tests.runtime.master;
-
-import 
edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty;
-import edu.snu.nemo.conf.JobConf;
-import edu.snu.nemo.runtime.common.message.MessageEnvironment;
-import edu.snu.nemo.runtime.master.resource.ContainerManager;
-import edu.snu.nemo.runtime.master.resource.ResourceSpecification;
-import org.apache.reef.driver.catalog.NodeDescriptor;
-import org.apache.reef.driver.context.ActiveContext;
-import org.apache.reef.driver.evaluator.AllocatedEvaluator;
-import org.apache.reef.driver.evaluator.EvaluatorDescriptor;
-import org.apache.reef.driver.evaluator.EvaluatorRequestor;
-import org.apache.reef.tang.Configuration;
-import org.apache.reef.tang.Injector;
-import org.apache.reef.tang.Tang;
-import org.apache.reef.tang.exceptions.InjectionException;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.util.concurrent.*;
-
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-/**
- * Tests {@link edu.snu.nemo.runtime.master.resource.ContainerManager}.
- */
-public final class ContainerManagerTest {
-  private ContainerManager containerManager;
-  private int testIdNumber = 0;
-  private final ExecutorService containerAllocationPool = 
Executors.newFixedThreadPool(5);
-  private final BlockingDeque<ActiveContext> mockResourceAllocationQueue = new 
LinkedBlockingDeque<>();
-
-  private final int DEFAULT_CAPACITY = 4;
-  private final int DEFAULT_MEMORY = 10240;
-
-  @Before
-  public void setUp() throws InjectionException {
-
-    final MessageEnvironment mockMsgEnv = mock(MessageEnvironment.class);
-    when(mockMsgEnv.asyncConnect(anyString(), 
anyString())).thenReturn(mock(Future.class));
-    final Configuration configuration = 
Tang.Factory.getTang().newConfigurationBuilder()
-        .bindNamedParameter(JobConf.ScheduleSerThread.class, "1")
-        .build();
-    final Injector injector = 
Tang.Factory.getTang().newInjector(configuration);
-    injector.bindVolatileInstance(EvaluatorRequestor.class, 
mock(EvaluatorRequestor.class));
-    injector.bindVolatileInstance(MessageEnvironment.class, mockMsgEnv);
-    containerManager = injector.getInstance(ContainerManager.class);
-  }
-
-  @Test(timeout=5000)
-  public void testAllocationAfterJobCompletion() {
-    // Create 3 resource specifications, {A, B, C}.
-    final ResourceSpecification a =
-        new ResourceSpecification(ExecutorPlacementProperty.COMPUTE, 
DEFAULT_CAPACITY, DEFAULT_MEMORY);
-    final ResourceSpecification b =
-        new ResourceSpecification(ExecutorPlacementProperty.TRANSIENT, 
DEFAULT_CAPACITY, DEFAULT_MEMORY);
-    final ResourceSpecification c =
-        new ResourceSpecification(ExecutorPlacementProperty.RESERVED, 
DEFAULT_CAPACITY, DEFAULT_MEMORY);
-
-    // Create 2 of A, 2 of B and 1 of C.
-    containerManager.requestContainer(2, a);
-    containerManager.requestContainer(2, b);
-    containerManager.requestContainer(1, c);
-
-    // We allocate 4 containers and start 4 executors.
-    allocateResource(createMockContext());
-    allocateResource(createMockContext());
-    allocateResource(createMockContext());
-    allocateResource(createMockContext());
-  }
-
-  private AllocatedEvaluator createMockEvaluator() {
-    return mock(AllocatedEvaluator.class);
-  }
-
-  private ActiveContext createMockContext() {
-    final String name = "TestContext" + testIdNumber++;
-    final NodeDescriptor mockedNodeDescriptor = mock(NodeDescriptor.class);
-    when(mockedNodeDescriptor.getName()).thenReturn(name);
-    final EvaluatorDescriptor mockedEvaluatorDescriptor = 
mock(EvaluatorDescriptor.class);
-    
when(mockedEvaluatorDescriptor.getNodeDescriptor()).thenReturn(mockedNodeDescriptor);
-    final ActiveContext mockedContext = mock(ActiveContext.class);
-    when(mockedContext.getId()).thenReturn(name);
-    
when(mockedContext.getEvaluatorDescriptor()).thenReturn(mockedEvaluatorDescriptor);
-
-    return mockedContext;
-  }
-
-  private void allocateResource(final ActiveContext mockContext) {
-    containerManager.onContainerAllocated(mockContext.getId(), 
createMockEvaluator(), null);
-    containerManager.onContainerLaunched(mockContext);
-  }
-}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to