sanha closed pull request #82: [NEMO-166] Do not attach NodeNamesProperty to Source Stages
URL: https://github.com/apache/incubator-nemo/pull/82
 
 
   

This PR was merged from a forked repository (a foreign pull request).
Because GitHub hides the original diff of such a pull request once it is
merged, the diff is reproduced below for the sake of provenance:

diff --git 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/NodeNamesAssignmentPass.java
 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/NodeNamesAssignmentPass.java
index 8c6d8a15a..f86d35550 100644
--- 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/NodeNamesAssignmentPass.java
+++ 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/NodeNamesAssignmentPass.java
@@ -81,6 +81,16 @@ public static void setBandwidthSpecificationString(final 
String value) {
     bandwidthSpecificationString = value;
   }
 
+  private static HashMap<String, Integer> getEvenShares(final List<String> 
nodes, final int parallelism) {
+    final HashMap<String, Integer> shares = new HashMap<>();
+    final int defaultShare = parallelism / nodes.size();
+    final int remainder = parallelism % nodes.size();
+    for (int i = 0; i < nodes.size(); i++) {
+      shares.put(nodes.get(i), defaultShare + (i < remainder ? 1 : 0));
+    }
+    return shares;
+  }
+
   private static void assignNodeShares(
       final DAG<IRVertex, IREdge> dag,
       final BandwidthSpecification bandwidthSpecification) {
@@ -89,26 +99,23 @@ private static void assignNodeShares(
       final int parallelism = 
irVertex.getPropertyValue(ParallelismProperty.class)
           .orElseThrow(() -> new RuntimeException("Parallelism property 
required"));
       if (inEdges.size() == 0) {
-        // The stage is root stage.
+        // This vertex is root vertex.
         // Fall back to setting even distribution
-        final HashMap<String, Integer> shares = new HashMap<>();
-        final List<String> nodes = bandwidthSpecification.getNodes();
-        final int defaultShare = parallelism / nodes.size();
-        final int remainder = parallelism % nodes.size();
-        for (int i = 0; i < nodes.size(); i++) {
-          shares.put(nodes.get(i), defaultShare + (i < remainder ? 1 : 0));
-        }
-        irVertex.getExecutionProperties().put(NodeNamesProperty.of(shares));
+        irVertex.getExecutionProperties().put(NodeNamesProperty.of(EMPTY_MAP));
       } else if (isOneToOneEdge(inEdges)) {
-        final Optional<NodeNamesProperty> property = 
dag.getIncomingEdgesOf(irVertex).iterator().next()
+        final Optional<HashMap<String, Integer>> property = 
inEdges.iterator().next().getSrc()
             .getExecutionProperties().get(NodeNamesProperty.class);
-        irVertex.getExecutionProperties().put(property.get());
+        
irVertex.getExecutionProperties().put(NodeNamesProperty.of(property.get()));
       } else {
         // This IRVertex has shuffle inEdge(s), or has multiple inEdges.
         final Map<String, Integer> parentLocationShares = new HashMap<>();
         for (final IREdge edgeToIRVertex : dag.getIncomingEdgesOf(irVertex)) {
           final IRVertex parentVertex = edgeToIRVertex.getSrc();
-          final Map<String, Integer> shares = 
parentVertex.getPropertyValue(NodeNamesProperty.class).get();
+          final Map<String, Integer> parentShares = 
parentVertex.getPropertyValue(NodeNamesProperty.class).get();
+          final int parentParallelism = 
parentVertex.getPropertyValue(ParallelismProperty.class)
+              .orElseThrow(() -> new RuntimeException("Parallelism property 
required"));
+          final Map<String, Integer> shares = parentShares.isEmpty() ? 
getEvenShares(bandwidthSpecification.getNodes(),
+              parentParallelism) : parentShares;
           for (final Map.Entry<String, Integer> element : shares.entrySet()) {
             parentLocationShares.putIfAbsent(element.getKey(), 0);
             parentLocationShares.put(element.getKey(),
diff --git 
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/NodeShareSchedulingConstraint.java
 
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/NodeShareSchedulingConstraint.java
index 584cd67d8..6fcbc933f 100644
--- 
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/NodeShareSchedulingConstraint.java
+++ 
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/NodeShareSchedulingConstraint.java
@@ -39,7 +39,7 @@ private String getNodeName(final Map<String, Integer> 
propertyValue, final int t
     Collections.sort(nodeNames, Comparator.naturalOrder());
     int index = taskIndex;
     for (final String nodeName : nodeNames) {
-      if (index < propertyValue.get(nodeName)) {
+      if (index >= propertyValue.get(nodeName)) {
         index -= propertyValue.get(nodeName);
       } else {
         return nodeName;
@@ -55,7 +55,11 @@ public boolean testSchedulability(final ExecutorRepresenter 
executor, final Task
     if (propertyValue.isEmpty()) {
       return true;
     }
-    return executor.getNodeName().equals(
-        getNodeName(propertyValue, 
RuntimeIdGenerator.getIndexFromTaskId(task.getTaskId())));
+    try {
+      return executor.getNodeName().equals(
+          getNodeName(propertyValue, 
RuntimeIdGenerator.getIndexFromTaskId(task.getTaskId())));
+    } catch (final IllegalStateException e) {
+      throw new RuntimeException(String.format("Cannot schedule %s", 
task.getTaskId(), e));
+    }
   }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to