johnyangk closed pull request #86: [NEMO-51] Intermediate data location aware scheduling
URL: https://github.com/apache/incubator-nemo/pull/86
This is a PR merged from a forked repository.
Because GitHub hides the original diff of a foreign (forked) pull request
on merge, the diff is reproduced below for the sake of provenance:
diff --git a/common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/ResourceLocalityProperty.java b/common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/ResourceLocalityProperty.java
index 869644e4c..a94f25e43 100644
--- a/common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/ResourceLocalityProperty.java
+++ b/common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/ResourceLocalityProperty.java
@@ -18,7 +18,8 @@
import edu.snu.nemo.common.ir.executionproperty.VertexExecutionProperty;
/**
- * This property decides whether or not to schedule this vertex only on
executors where source data reside.
+ * This property decides whether or not to schedule this vertex only on
executors where
+ * source (including intermediate) data reside.
*/
public final class ResourceLocalityProperty extends
VertexExecutionProperty<Boolean> {
private static final ResourceLocalityProperty SOURCE_TRUE = new
ResourceLocalityProperty(true);
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingConstraint.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingConstraint.java
index f446362b6..d355aa73d 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingConstraint.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingConstraint.java
@@ -16,28 +16,68 @@
package edu.snu.nemo.runtime.master.scheduler;
import edu.snu.nemo.common.ir.Readable;
+import
edu.snu.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty;
import edu.snu.nemo.common.ir.executionproperty.AssociatedProperty;
import
edu.snu.nemo.common.ir.vertex.executionproperty.ResourceLocalityProperty;
+import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
+import edu.snu.nemo.runtime.common.plan.StageEdge;
import edu.snu.nemo.runtime.common.plan.Task;
+import edu.snu.nemo.runtime.master.BlockManagerMaster;
import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
import org.apache.reef.annotations.audience.DriverSide;
import javax.annotation.concurrent.ThreadSafe;
import javax.inject.Inject;
import java.util.*;
+import java.util.concurrent.ExecutionException;
/**
- * This policy is same as {@link MinOccupancyFirstSchedulingPolicy}, however
for Tasks
- * with {@link edu.snu.nemo.common.ir.vertex.SourceVertex}, it tries to pick
one of the executors
- * where the corresponding data reside.
+ * This policy tries to pick the executors where the corresponding source or
intermediate data for a task reside.
*/
@ThreadSafe
@DriverSide
@AssociatedProperty(ResourceLocalityProperty.class)
public final class SourceLocationAwareSchedulingConstraint implements
SchedulingConstraint {
+ private final BlockManagerMaster blockManagerMaster;
@Inject
- private SourceLocationAwareSchedulingConstraint() {
+ private SourceLocationAwareSchedulingConstraint(final BlockManagerMaster
blockManagerMaster) {
+ this.blockManagerMaster = blockManagerMaster;
+ }
+
+ /**
+ * Find the location of the intermediate data for a task.
+ * It is only possible if the task receives only one input edge with
One-to-One communication pattern, and
+ * the location of the input data is known.
+ *
+ * @param task the task to schedule.
+ * @return the intermediate data location.
+ */
+ private Optional<String> getIntermediateDataLocation(final Task task) {
+ if (task.getTaskIncomingEdges().size() == 1) {
+ final StageEdge physicalStageEdge = task.getTaskIncomingEdges().get(0);
+ if (CommunicationPatternProperty.Value.OneToOne.equals(
+
physicalStageEdge.getPropertyValue(CommunicationPatternProperty.class)
+ .orElseThrow(() -> new RuntimeException("No comm pattern!")))) {
+ final String blockIdToRead =
+ RuntimeIdGenerator.generateBlockId(physicalStageEdge.getId(),
+ RuntimeIdGenerator.getIndexFromTaskId(task.getTaskId()));
+ final BlockManagerMaster.BlockLocationRequestHandler locationHandler =
+ blockManagerMaster.getBlockLocationHandler(blockIdToRead);
+ if (locationHandler.getLocationFuture().isDone()) { // if the location
is known.
+ try {
+ final String location = locationHandler.getLocationFuture().get();
+ return Optional.of(location);
+ } catch (final InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ } catch (final ExecutionException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ }
+ return Optional.empty();
}
/**
@@ -55,19 +95,29 @@ private SourceLocationAwareSchedulingConstraint() {
@Override
public boolean testSchedulability(final ExecutorRepresenter executor, final
Task task) {
- final Set<String> sourceLocations;
- try {
- sourceLocations =
getSourceLocations(task.getIrVertexIdToReadable().values());
- } catch (final UnsupportedOperationException e) {
- return true;
- } catch (final Exception e) {
- throw new RuntimeException(e);
- }
+ if (task.getTaskIncomingEdges().isEmpty()) { // Source task
+ final Set<String> sourceLocations;
+ try {
+ sourceLocations =
getSourceLocations(task.getIrVertexIdToReadable().values());
+ } catch (final UnsupportedOperationException e) {
+ return true;
+ } catch (final Exception e) {
+ throw new RuntimeException(e);
+ }
- if (sourceLocations.size() == 0) {
- return true;
- }
+ if (sourceLocations.size() == 0) {
+ return true;
+ }
- return sourceLocations.contains(executor.getNodeName());
+ return sourceLocations.contains(executor.getNodeName());
+ } else { // Non-source task.
+ final Optional<String> optionalIntermediateLoc =
getIntermediateDataLocation(task);
+
+ if (getIntermediateDataLocation(task).isPresent()) {
+ return optionalIntermediateLoc.get().equals(executor.getExecutorId());
+ } else {
+ return true;
+ }
+ }
}
}
diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SchedulingConstraintnRegistryTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SchedulingConstraintnRegistryTest.java
index f7fb98278..64abefe04 100644
--- a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SchedulingConstraintnRegistryTest.java
+++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SchedulingConstraintnRegistryTest.java
@@ -19,20 +19,30 @@
import
edu.snu.nemo.common.ir.vertex.executionproperty.ResourcePriorityProperty;
import edu.snu.nemo.common.ir.vertex.executionproperty.ResourceSlotProperty;
import
edu.snu.nemo.common.ir.vertex.executionproperty.ResourceLocalityProperty;
+import edu.snu.nemo.runtime.master.BlockManagerMaster;
+import org.apache.reef.tang.Injector;
import org.apache.reef.tang.Tang;
import org.apache.reef.tang.exceptions.InjectionException;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
/**
* Tests {@link SchedulingConstraintRegistry}.
*/
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({BlockManagerMaster.class})
public final class SchedulingConstraintnRegistryTest {
@Test
public void testSchedulingConstraintRegistry() throws InjectionException {
- final SchedulingConstraintRegistry registry =
Tang.Factory.getTang().newInjector()
- .getInstance(SchedulingConstraintRegistry.class);
+ final Injector injector = Tang.Factory.getTang().newInjector();
+ injector.bindVolatileInstance(BlockManagerMaster.class,
mock(BlockManagerMaster.class));
+ final SchedulingConstraintRegistry registry =
+ injector.getInstance(SchedulingConstraintRegistry.class);
assertEquals(FreeSlotSchedulingConstraint.class,
getConstraintOf(ResourceSlotProperty.class, registry));
assertEquals(ContainerTypeAwareSchedulingConstraint.class,
getConstraintOf(ResourcePriorityProperty.class, registry));
diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingConstraintTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingConstraintTest.java
index 1beb8f953..ab1bf0bfe 100644
--- a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingConstraintTest.java
+++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingConstraintTest.java
@@ -18,9 +18,12 @@
import
edu.snu.nemo.common.ir.vertex.executionproperty.ResourceLocalityProperty;
import edu.snu.nemo.runtime.common.plan.Task;
import edu.snu.nemo.common.ir.Readable;
+import edu.snu.nemo.runtime.master.BlockManagerMaster;
import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
+import org.apache.reef.tang.Injector;
import org.apache.reef.tang.Tang;
import org.apache.reef.tang.exceptions.InjectionException;
+import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.core.classloader.annotations.PrepareForTest;
@@ -38,8 +41,9 @@
* Test cases for {@link SourceLocationAwareSchedulingConstraint}.
*/
@RunWith(PowerMockRunner.class)
-@PrepareForTest({ExecutorRepresenter.class, Task.class, Readable.class})
+@PrepareForTest({ExecutorRepresenter.class, Task.class, Readable.class,
BlockManagerMaster.class})
public final class SourceLocationAwareSchedulingConstraintTest {
+ private Injector injector;
private static final String SITE_0 = "SEOUL";
private static final String SITE_1 = "JINJU";
private static final String SITE_2 = "BUSAN";
@@ -49,6 +53,12 @@ private static ExecutorRepresenter mockExecutorRepresenter(final String executor
when(executorRepresenter.getNodeName()).thenReturn(executorId);
return executorRepresenter;
}
+
+ @Before
+ public void setUp() throws Exception {
+ injector = Tang.Factory.getTang().newInjector();
+ injector.bindVolatileInstance(BlockManagerMaster.class,
mock(BlockManagerMaster.class));
+ }
/**
* {@link SourceLocationAwareSchedulingConstraint} should fail to schedule a
{@link Task} when
@@ -56,7 +66,7 @@ private static ExecutorRepresenter mockExecutorRepresenter(final String executor
*/
@Test
public void testSourceLocationAwareSchedulingNotAvailable() throws
InjectionException {
- final SchedulingConstraint schedulingConstraint =
Tang.Factory.getTang().newInjector()
+ final SchedulingConstraint schedulingConstraint = injector
.getInstance(SourceLocationAwareSchedulingConstraint.class);
// Prepare test scenario
@@ -76,7 +86,7 @@ public void testSourceLocationAwareSchedulingNotAvailable() throws InjectionExce
*/
@Test
public void testSourceLocationAwareSchedulingWithMultiSource() throws
InjectionException {
- final SchedulingConstraint schedulingConstraint =
Tang.Factory.getTang().newInjector()
+ final SchedulingConstraint schedulingConstraint = injector
.getInstance(SourceLocationAwareSchedulingConstraint.class);
// Prepare test scenario
final Task task0 = CreateTask.withReadablesWithSourceLocations(
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services