Repository: flink
Updated Branches:
  refs/heads/master 81ebe980a -> 07a9a56c1


[FLINK-1683] [jobmanager] Fix scheduling preference choice for non-unary execution tasks.

This closes #476


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/07a9a56c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/07a9a56c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/07a9a56c

Branch: refs/heads/master
Commit: 07a9a56c1726b4a7aeb3e682b887d77ab1b0e440
Parents: 81ebe98
Author: Fabian Hueske <fhue...@apache.org>
Authored: Tue Mar 10 19:19:27 2015 +0100
Committer: Fabian Hueske <fhue...@apache.org>
Committed: Fri Mar 13 14:20:08 2015 +0100

----------------------------------------------------------------------
 .../runtime/executiongraph/ExecutionVertex.java | 28 ++++++++++++++++----
 1 file changed, 23 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/07a9a56c/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index 41b78f8..794ca21 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -47,6 +47,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.CopyOnWriteArrayList;
 
 import static com.google.common.base.Preconditions.checkElementIndex;
@@ -348,22 +349,39 @@ public class ExecutionVertex implements Serializable {
                        return Collections.emptySet();
                }
                else {
-                       HashSet<Instance> locations = new HashSet<Instance>();
-               
+
+                       Set<Instance> locations = new HashSet<Instance>();
+                       Set<Instance> inputLocations = new HashSet<Instance>();
+
+                       // go over all inputs
                        for (int i = 0; i < inputEdges.length; i++) {
+                               inputLocations.clear();
                                ExecutionEdge[] sources = inputEdges[i];
                                if (sources != null) {
+                                       // go over all input sources
                                        for (int k = 0; k < sources.length; 
k++) {
+                                               // look-up assigned slot of 
input source
                                                SimpleSlot sourceSlot = 
sources[k].getSource().getProducer().getCurrentAssignedResource();
                                                if (sourceSlot != null) {
-                                                       
locations.add(sourceSlot.getInstance());
-                                                       if (locations.size() > 
MAX_DISTINCT_LOCATIONS_TO_CONSIDER) {
-                                                               return null;
+                                                       // add input location
+                                                       
inputLocations.add(sourceSlot.getInstance());
+                                                       // inputs which have 
too many distinct sources are not considered
+                                                       if 
(inputLocations.size() > MAX_DISTINCT_LOCATIONS_TO_CONSIDER) {
+                                                               
inputLocations.clear();
+                                                               break;
                                                        }
                                                }
                                        }
                                }
+                               // keep the locations of the input with the 
least preferred locations
+                               if(locations.isEmpty() || // nothing assigned 
yet
+                                               (!inputLocations.isEmpty() && 
inputLocations.size() < locations.size())) {
+                                       // current input has fewer preferred 
locations
+                                       locations.clear();
+                                       locations.addAll(inputLocations);
+                               }
                        }
+
                        return locations;
                }
        }

Reply via email to