sanha closed pull request #82: [NEMO-166] Do not attach NodeNamesProperty to Source Stages. URL: https://github.com/apache/incubator-nemo/pull/82
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/NodeNamesAssignmentPass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/NodeNamesAssignmentPass.java index 8c6d8a15a..f86d35550 100644 --- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/NodeNamesAssignmentPass.java +++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/NodeNamesAssignmentPass.java @@ -81,6 +81,16 @@ public static void setBandwidthSpecificationString(final String value) { bandwidthSpecificationString = value; } + private static HashMap<String, Integer> getEvenShares(final List<String> nodes, final int parallelism) { + final HashMap<String, Integer> shares = new HashMap<>(); + final int defaultShare = parallelism / nodes.size(); + final int remainder = parallelism % nodes.size(); + for (int i = 0; i < nodes.size(); i++) { + shares.put(nodes.get(i), defaultShare + (i < remainder ? 1 : 0)); + } + return shares; + } + private static void assignNodeShares( final DAG<IRVertex, IREdge> dag, final BandwidthSpecification bandwidthSpecification) { @@ -89,26 +99,23 @@ private static void assignNodeShares( final int parallelism = irVertex.getPropertyValue(ParallelismProperty.class) .orElseThrow(() -> new RuntimeException("Parallelism property required")); if (inEdges.size() == 0) { - // The stage is root stage. + // This vertex is root vertex. 
// Fall back to setting even distribution - final HashMap<String, Integer> shares = new HashMap<>(); - final List<String> nodes = bandwidthSpecification.getNodes(); - final int defaultShare = parallelism / nodes.size(); - final int remainder = parallelism % nodes.size(); - for (int i = 0; i < nodes.size(); i++) { - shares.put(nodes.get(i), defaultShare + (i < remainder ? 1 : 0)); - } - irVertex.getExecutionProperties().put(NodeNamesProperty.of(shares)); + irVertex.getExecutionProperties().put(NodeNamesProperty.of(EMPTY_MAP)); } else if (isOneToOneEdge(inEdges)) { - final Optional<NodeNamesProperty> property = dag.getIncomingEdgesOf(irVertex).iterator().next() + final Optional<HashMap<String, Integer>> property = inEdges.iterator().next().getSrc() .getExecutionProperties().get(NodeNamesProperty.class); - irVertex.getExecutionProperties().put(property.get()); + irVertex.getExecutionProperties().put(NodeNamesProperty.of(property.get())); } else { // This IRVertex has shuffle inEdge(s), or has multiple inEdges. final Map<String, Integer> parentLocationShares = new HashMap<>(); for (final IREdge edgeToIRVertex : dag.getIncomingEdgesOf(irVertex)) { final IRVertex parentVertex = edgeToIRVertex.getSrc(); - final Map<String, Integer> shares = parentVertex.getPropertyValue(NodeNamesProperty.class).get(); + final Map<String, Integer> parentShares = parentVertex.getPropertyValue(NodeNamesProperty.class).get(); + final int parentParallelism = parentVertex.getPropertyValue(ParallelismProperty.class) + .orElseThrow(() -> new RuntimeException("Parallelism property required")); + final Map<String, Integer> shares = parentShares.isEmpty() ? 
getEvenShares(bandwidthSpecification.getNodes(), + parentParallelism) : parentShares; for (final Map.Entry<String, Integer> element : shares.entrySet()) { parentLocationShares.putIfAbsent(element.getKey(), 0); parentLocationShares.put(element.getKey(), diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/NodeShareSchedulingConstraint.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/NodeShareSchedulingConstraint.java index 584cd67d8..6fcbc933f 100644 --- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/NodeShareSchedulingConstraint.java +++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/NodeShareSchedulingConstraint.java @@ -39,7 +39,7 @@ private String getNodeName(final Map<String, Integer> propertyValue, final int t Collections.sort(nodeNames, Comparator.naturalOrder()); int index = taskIndex; for (final String nodeName : nodeNames) { - if (index < propertyValue.get(nodeName)) { + if (index >= propertyValue.get(nodeName)) { index -= propertyValue.get(nodeName); } else { return nodeName; @@ -55,7 +55,11 @@ public boolean testSchedulability(final ExecutorRepresenter executor, final Task if (propertyValue.isEmpty()) { return true; } - return executor.getNodeName().equals( - getNodeName(propertyValue, RuntimeIdGenerator.getIndexFromTaskId(task.getTaskId()))); + try { + return executor.getNodeName().equals( + getNodeName(propertyValue, RuntimeIdGenerator.getIndexFromTaskId(task.getTaskId()))); + } catch (final IllegalStateException e) { + throw new RuntimeException(String.format("Cannot schedule %s", task.getTaskId(), e)); + } } } ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services