johnyangk closed pull request #86: [NEMO-51] Intermediate data location aware scheduling
URL: https://github.com/apache/incubator-nemo/pull/86
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/ResourceLocalityProperty.java b/common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/ResourceLocalityProperty.java
index 869644e4c..a94f25e43 100644
--- a/common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/ResourceLocalityProperty.java
+++ b/common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/ResourceLocalityProperty.java
@@ -18,7 +18,8 @@
 import edu.snu.nemo.common.ir.executionproperty.VertexExecutionProperty;
 
 /**
- * This property decides whether or not to schedule this vertex only on 
executors where source data reside.
+ * This property decides whether or not to schedule this vertex only on 
executors where
+ * source (including intermediate) data reside.
  */
 public final class ResourceLocalityProperty extends 
VertexExecutionProperty<Boolean> {
   private static final ResourceLocalityProperty SOURCE_TRUE = new 
ResourceLocalityProperty(true);
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingConstraint.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingConstraint.java
index f446362b6..d355aa73d 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingConstraint.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingConstraint.java
@@ -16,28 +16,68 @@
 package edu.snu.nemo.runtime.master.scheduler;
 
 import edu.snu.nemo.common.ir.Readable;
+import 
edu.snu.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty;
 import edu.snu.nemo.common.ir.executionproperty.AssociatedProperty;
 import 
edu.snu.nemo.common.ir.vertex.executionproperty.ResourceLocalityProperty;
+import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
+import edu.snu.nemo.runtime.common.plan.StageEdge;
 import edu.snu.nemo.runtime.common.plan.Task;
+import edu.snu.nemo.runtime.master.BlockManagerMaster;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
 import org.apache.reef.annotations.audience.DriverSide;
 
 import javax.annotation.concurrent.ThreadSafe;
 import javax.inject.Inject;
 import java.util.*;
+import java.util.concurrent.ExecutionException;
 
 /**
- * This policy is same as {@link MinOccupancyFirstSchedulingPolicy}, however 
for Tasks
- * with {@link edu.snu.nemo.common.ir.vertex.SourceVertex}, it tries to pick 
one of the executors
- * where the corresponding data reside.
+ * This policy tries to pick the executors where the corresponding source or 
intermediate data for a task reside.
  */
 @ThreadSafe
 @DriverSide
 @AssociatedProperty(ResourceLocalityProperty.class)
 public final class SourceLocationAwareSchedulingConstraint implements 
SchedulingConstraint {
+  private final BlockManagerMaster blockManagerMaster;
 
   @Inject
-  private SourceLocationAwareSchedulingConstraint() {
+  private SourceLocationAwareSchedulingConstraint(final BlockManagerMaster 
blockManagerMaster) {
+    this.blockManagerMaster = blockManagerMaster;
+  }
+
+  /**
+   * Find the location of the intermediate data for a task.
+   * It is only possible if the task receives only one input edge with 
One-to-One communication pattern, and
+   * the location of the input data is known.
+   *
+   * @param task the task to schedule.
+   * @return the intermediate data location.
+   */
+  private Optional<String> getIntermediateDataLocation(final Task task) {
+    if (task.getTaskIncomingEdges().size() == 1) {
+      final StageEdge physicalStageEdge = task.getTaskIncomingEdges().get(0);
+      if (CommunicationPatternProperty.Value.OneToOne.equals(
+          
physicalStageEdge.getPropertyValue(CommunicationPatternProperty.class)
+              .orElseThrow(() -> new RuntimeException("No comm pattern!")))) {
+        final String blockIdToRead =
+            RuntimeIdGenerator.generateBlockId(physicalStageEdge.getId(),
+                RuntimeIdGenerator.getIndexFromTaskId(task.getTaskId()));
+        final BlockManagerMaster.BlockLocationRequestHandler locationHandler =
+            blockManagerMaster.getBlockLocationHandler(blockIdToRead);
+        if (locationHandler.getLocationFuture().isDone()) { // if the location 
is known.
+          try {
+            final String location = locationHandler.getLocationFuture().get();
+            return Optional.of(location);
+          } catch (final InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new RuntimeException(e);
+          } catch (final ExecutionException e) {
+            throw new RuntimeException(e);
+          }
+        }
+      }
+    }
+    return Optional.empty();
   }
 
   /**
@@ -55,19 +95,29 @@ private SourceLocationAwareSchedulingConstraint() {
 
   @Override
   public boolean testSchedulability(final ExecutorRepresenter executor, final 
Task task) {
-    final Set<String> sourceLocations;
-    try {
-      sourceLocations = 
getSourceLocations(task.getIrVertexIdToReadable().values());
-    } catch (final UnsupportedOperationException e) {
-      return true;
-    } catch (final Exception e) {
-      throw new RuntimeException(e);
-    }
+    if (task.getTaskIncomingEdges().isEmpty()) { // Source task
+      final Set<String> sourceLocations;
+      try {
+        sourceLocations = 
getSourceLocations(task.getIrVertexIdToReadable().values());
+      } catch (final UnsupportedOperationException e) {
+        return true;
+      } catch (final Exception e) {
+        throw new RuntimeException(e);
+      }
 
-    if (sourceLocations.size() == 0) {
-      return true;
-    }
+      if (sourceLocations.size() == 0) {
+        return true;
+      }
 
-    return sourceLocations.contains(executor.getNodeName());
+      return sourceLocations.contains(executor.getNodeName());
+    } else { // Non-source task.
+      final Optional<String> optionalIntermediateLoc = 
getIntermediateDataLocation(task);
+
+      if (getIntermediateDataLocation(task).isPresent()) {
+        return optionalIntermediateLoc.get().equals(executor.getExecutorId());
+      } else {
+        return true;
+      }
+    }
   }
 }
diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SchedulingConstraintnRegistryTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SchedulingConstraintnRegistryTest.java
index f7fb98278..64abefe04 100644
--- a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SchedulingConstraintnRegistryTest.java
+++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SchedulingConstraintnRegistryTest.java
@@ -19,20 +19,30 @@
 import 
edu.snu.nemo.common.ir.vertex.executionproperty.ResourcePriorityProperty;
 import edu.snu.nemo.common.ir.vertex.executionproperty.ResourceSlotProperty;
 import 
edu.snu.nemo.common.ir.vertex.executionproperty.ResourceLocalityProperty;
+import edu.snu.nemo.runtime.master.BlockManagerMaster;
+import org.apache.reef.tang.Injector;
 import org.apache.reef.tang.Tang;
 import org.apache.reef.tang.exceptions.InjectionException;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
 
 import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
 
 /**
  * Tests {@link SchedulingConstraintRegistry}.
  */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({BlockManagerMaster.class})
 public final class SchedulingConstraintnRegistryTest {
   @Test
   public void testSchedulingConstraintRegistry() throws InjectionException {
-    final SchedulingConstraintRegistry registry = 
Tang.Factory.getTang().newInjector()
-        .getInstance(SchedulingConstraintRegistry.class);
+    final Injector injector = Tang.Factory.getTang().newInjector();
+    injector.bindVolatileInstance(BlockManagerMaster.class, 
mock(BlockManagerMaster.class));
+    final SchedulingConstraintRegistry registry =
+        injector.getInstance(SchedulingConstraintRegistry.class);
     assertEquals(FreeSlotSchedulingConstraint.class, 
getConstraintOf(ResourceSlotProperty.class, registry));
     assertEquals(ContainerTypeAwareSchedulingConstraint.class,
         getConstraintOf(ResourcePriorityProperty.class, registry));
diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingConstraintTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingConstraintTest.java
index 1beb8f953..ab1bf0bfe 100644
--- a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingConstraintTest.java
+++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingConstraintTest.java
@@ -18,9 +18,12 @@
 import 
edu.snu.nemo.common.ir.vertex.executionproperty.ResourceLocalityProperty;
 import edu.snu.nemo.runtime.common.plan.Task;
 import edu.snu.nemo.common.ir.Readable;
+import edu.snu.nemo.runtime.master.BlockManagerMaster;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
+import org.apache.reef.tang.Injector;
 import org.apache.reef.tang.Tang;
 import org.apache.reef.tang.exceptions.InjectionException;
+import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.powermock.core.classloader.annotations.PrepareForTest;
@@ -38,8 +41,9 @@
  * Test cases for {@link SourceLocationAwareSchedulingConstraint}.
  */
 @RunWith(PowerMockRunner.class)
-@PrepareForTest({ExecutorRepresenter.class, Task.class, Readable.class})
+@PrepareForTest({ExecutorRepresenter.class, Task.class, Readable.class, 
BlockManagerMaster.class})
 public final class SourceLocationAwareSchedulingConstraintTest {
+  private Injector injector;
   private static final String SITE_0 = "SEOUL";
   private static final String SITE_1 = "JINJU";
   private static final String SITE_2 = "BUSAN";
@@ -49,6 +53,12 @@ private static ExecutorRepresenter mockExecutorRepresenter(final String executor
     when(executorRepresenter.getNodeName()).thenReturn(executorId);
     return executorRepresenter;
   }
+  
+  @Before
+  public void setUp() throws Exception {
+    injector = Tang.Factory.getTang().newInjector();
+    injector.bindVolatileInstance(BlockManagerMaster.class, 
mock(BlockManagerMaster.class));
+  }
 
   /**
    * {@link SourceLocationAwareSchedulingConstraint} should fail to schedule a 
{@link Task} when
@@ -56,7 +66,7 @@ private static ExecutorRepresenter mockExecutorRepresenter(final String executor
    */
   @Test
   public void testSourceLocationAwareSchedulingNotAvailable() throws 
InjectionException {
-    final SchedulingConstraint schedulingConstraint = 
Tang.Factory.getTang().newInjector()
+    final SchedulingConstraint schedulingConstraint = injector
         .getInstance(SourceLocationAwareSchedulingConstraint.class);
 
     // Prepare test scenario
@@ -76,7 +86,7 @@ public void testSourceLocationAwareSchedulingNotAvailable() throws InjectionExce
    */
   @Test
   public void testSourceLocationAwareSchedulingWithMultiSource() throws 
InjectionException {
-    final SchedulingConstraint schedulingConstraint = 
Tang.Factory.getTang().newInjector()
+    final SchedulingConstraint schedulingConstraint = injector
         .getInstance(SourceLocationAwareSchedulingConstraint.class);
     // Prepare test scenario
     final Task task0 = CreateTask.withReadablesWithSourceLocations(


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to