Repository: flink
Updated Branches:
  refs/heads/release-0.8 9f18cbb3a -> 095301e02


[FLINK-1683] [jobmanager] Fix scheduling preference choice for non-unary 
execution tasks.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/095301e0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/095301e0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/095301e0

Branch: refs/heads/release-0.8
Commit: 095301e025a87608f3b8785d02580da7f87f1e76
Parents: 9f18cbb
Author: Fabian Hueske <fhue...@apache.org>
Authored: Tue Mar 10 19:19:27 2015 +0100
Committer: Fabian Hueske <fhue...@apache.org>
Committed: Wed Mar 11 17:49:31 2015 +0100

----------------------------------------------------------------------
 .../runtime/executiongraph/ExecutionVertex.java | 31 ++++++++++++++++----
 1 file changed, 25 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/095301e0/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index 57e441e..5240845 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -23,7 +23,6 @@ import static 
org.apache.flink.runtime.execution.ExecutionState.FAILED;
 import static org.apache.flink.runtime.execution.ExecutionState.FINISHED;
 
 import java.util.ArrayList;
-import java.util.HashSet;
 import java.util.List;
 import java.util.concurrent.CopyOnWriteArrayList;
 
@@ -44,6 +43,9 @@ import 
org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import 
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
 
+import java.util.HashSet;
+import java.util.Set;
+
 /**
  * The ExecutionVertex is a parallel subtask of the execution. It may be 
executed once, or several times, each of
  * which time it spawns an {@link Execution}.
@@ -285,22 +287,39 @@ public class ExecutionVertex {
         * @return The preferred locations for this vertex execution, or null, 
if there is no preference.
         */
        public Iterable<Instance> getPreferredLocations() {
-               HashSet<Instance> locations = new HashSet<Instance>();
-               
+
+               Set<Instance> locations = new HashSet<Instance>();
+               Set<Instance> inputLocations = new HashSet<Instance>();
+
+               // go over all inputs
                for (int i = 0; i < inputEdges.length; i++) {
+                       inputLocations.clear();
                        ExecutionEdge[] sources = inputEdges[i];
                        if (sources != null) {
+                               // go over all input sources
                                for (int k = 0; k < sources.length; k++) {
+                                       // look-up assigned slot of input source
                                        SimpleSlot sourceSlot = 
sources[k].getSource().getProducer().getCurrentAssignedResource();
                                        if (sourceSlot != null) {
-                                               
locations.add(sourceSlot.getInstance());
-                                               if (locations.size() > 
MAX_DISTINCT_LOCATIONS_TO_CONSIDER) {
-                                                       return null;
+                                               // add input location
+                                               
inputLocations.add(sourceSlot.getInstance());
+                                               // inputs which have too many 
distinct sources are not considered
+                                               if (inputLocations.size() > 
MAX_DISTINCT_LOCATIONS_TO_CONSIDER) {
+                                                       inputLocations.clear();
+                                                       break;
                                                }
                                        }
                                }
                        }
+                       // keep the locations of the input with the least 
preferred locations
+                       if(locations.isEmpty() || // nothing assigned yet
+                                       (!inputLocations.isEmpty() && 
inputLocations.size() < locations.size())) {
+                               // current input has fewer preferred locations
+                               locations.clear();
+                               locations.addAll(inputLocations);
+                       }
                }
+
                return locations;
        }
        

Reply via email to