Repository: flink Updated Branches: refs/heads/release-0.8 9f18cbb3a -> 095301e02
[FLINK-1683] [jobmanager]Â Fix scheduling preference choice for non-unary execution tasks. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/095301e0 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/095301e0 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/095301e0 Branch: refs/heads/release-0.8 Commit: 095301e025a87608f3b8785d02580da7f87f1e76 Parents: 9f18cbb Author: Fabian Hueske <fhue...@apache.org> Authored: Tue Mar 10 19:19:27 2015 +0100 Committer: Fabian Hueske <fhue...@apache.org> Committed: Wed Mar 11 17:49:31 2015 +0100 ---------------------------------------------------------------------- .../runtime/executiongraph/ExecutionVertex.java | 31 ++++++++++++++++---- 1 file changed, 25 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/095301e0/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java index 57e441e..5240845 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java @@ -23,7 +23,6 @@ import static org.apache.flink.runtime.execution.ExecutionState.FAILED; import static org.apache.flink.runtime.execution.ExecutionState.FINISHED; import java.util.ArrayList; -import java.util.HashSet; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; @@ -44,6 +43,9 @@ import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup; import org.apache.flink.runtime.jobmanager.scheduler.Scheduler; import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException; +import java.util.HashSet; +import java.util.Set; + /** * The ExecutionVertex is a parallel subtask of the execution. It may be executed once, or several times, each of * which time it spawns an {@link Execution}. @@ -285,22 +287,39 @@ public class ExecutionVertex { * @return The preferred locations for this vertex execution, or null, if there is no preference. */ public Iterable<Instance> getPreferredLocations() { - HashSet<Instance> locations = new HashSet<Instance>(); - + + Set<Instance> locations = new HashSet<Instance>(); + Set<Instance> inputLocations = new HashSet<Instance>(); + + // go over all inputs for (int i = 0; i < inputEdges.length; i++) { + inputLocations.clear(); ExecutionEdge[] sources = inputEdges[i]; if (sources != null) { + // go over all input sources for (int k = 0; k < sources.length; k++) { + // look-up assigned slot of input source SimpleSlot sourceSlot = sources[k].getSource().getProducer().getCurrentAssignedResource(); if (sourceSlot != null) { - locations.add(sourceSlot.getInstance()); - if (locations.size() > MAX_DISTINCT_LOCATIONS_TO_CONSIDER) { - return null; + // add input location + inputLocations.add(sourceSlot.getInstance()); + // inputs which have too many distinct sources are not considered + if (inputLocations.size() > MAX_DISTINCT_LOCATIONS_TO_CONSIDER) { + inputLocations.clear(); + break; } } } } + // keep the locations of the input with the least preferred locations + if(locations.isEmpty() || // nothing assigned yet + (!inputLocations.isEmpty() && inputLocations.size() < locations.size())) { + // current input has fewer preferred locations + locations.clear(); + locations.addAll(inputLocations); + } } + return locations; }