Repository: flink Updated Branches: refs/heads/master 81ebe980a -> 07a9a56c1
[FLINK-1683] [jobmanager] Fix scheduling preference choice for non-unary execution tasks. This closes #476 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/07a9a56c Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/07a9a56c Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/07a9a56c Branch: refs/heads/master Commit: 07a9a56c1726b4a7aeb3e682b887d77ab1b0e440 Parents: 81ebe98 Author: Fabian Hueske <fhue...@apache.org> Authored: Tue Mar 10 19:19:27 2015 +0100 Committer: Fabian Hueske <fhue...@apache.org> Committed: Fri Mar 13 14:20:08 2015 +0100 ---------------------------------------------------------------------- .../runtime/executiongraph/ExecutionVertex.java | 28 ++++++++++++++++---- 1 file changed, 23 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/07a9a56c/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java index 41b78f8..794ca21 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java @@ -47,6 +47,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; import java.util.List; +import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; import static com.google.common.base.Preconditions.checkElementIndex; @@ -348,22 +349,39 @@ public class ExecutionVertex implements Serializable { return Collections.emptySet(); } else { - HashSet<Instance> locations = new HashSet<Instance>(); - + + Set<Instance> locations 
= new HashSet<Instance>(); + Set<Instance> inputLocations = new HashSet<Instance>(); + + // go over all inputs for (int i = 0; i < inputEdges.length; i++) { + inputLocations.clear(); ExecutionEdge[] sources = inputEdges[i]; if (sources != null) { + // go over all input sources for (int k = 0; k < sources.length; k++) { + // look-up assigned slot of input source SimpleSlot sourceSlot = sources[k].getSource().getProducer().getCurrentAssignedResource(); if (sourceSlot != null) { - locations.add(sourceSlot.getInstance()); - if (locations.size() > MAX_DISTINCT_LOCATIONS_TO_CONSIDER) { - return null; + // add input location + inputLocations.add(sourceSlot.getInstance()); + // inputs which have too many distinct sources are not considered + if (inputLocations.size() > MAX_DISTINCT_LOCATIONS_TO_CONSIDER) { + inputLocations.clear(); + break; } } } } + // keep the locations of the input with the least preferred locations + if(locations.isEmpty() || // nothing assigned yet + (!inputLocations.isEmpty() && inputLocations.size() < locations.size())) { + // current input has fewer preferred locations + locations.clear(); + locations.addAll(inputLocations); + } } + return locations; } }