Repository: flink
Updated Branches:
refs/heads/release-0.8 9f18cbb3a -> 095301e02
[FLINK-1683] [jobmanager]Â Fix scheduling preference choice for non-unary
execution tasks.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/095301e0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/095301e0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/095301e0
Branch: refs/heads/release-0.8
Commit: 095301e025a87608f3b8785d02580da7f87f1e76
Parents: 9f18cbb
Author: Fabian Hueske
Authored: Tue Mar 10 19:19:27 2015 +0100
Committer: Fabian Hueske
Committed: Wed Mar 11 17:49:31 2015 +0100
--
.../runtime/executiongraph/ExecutionVertex.java | 31
1 file changed, 25 insertions(+), 6 deletions(-)
--
http://git-wip-us.apache.org/repos/asf/flink/blob/095301e0/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
--
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index 57e441e..5240845 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -23,7 +23,6 @@ import static
org.apache.flink.runtime.execution.ExecutionState.FAILED;
import static org.apache.flink.runtime.execution.ExecutionState.FINISHED;
import java.util.ArrayList;
-import java.util.HashSet;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
@@ -44,6 +43,9 @@ import
org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
+import java.util.HashSet;
+import java.util.Set;
+
/**
* The ExecutionVertex is a parallel subtask of the execution. It may be
executed once, or several times, each of
* which time it spawns an {@link Execution}.
@@ -285,22 +287,39 @@ public class ExecutionVertex {
* @return The preferred locations for this vertex execution, or null,
if there is no preference.
*/
public Iterable getPreferredLocations() {
- HashSet locations = new HashSet();
-
+
+ Set locations = new HashSet();
+ Set inputLocations = new HashSet();
+
+ // go over all inputs
for (int i = 0; i < inputEdges.length; i++) {
+ inputLocations.clear();
ExecutionEdge[] sources = inputEdges[i];
if (sources != null) {
+ // go over all input sources
for (int k = 0; k < sources.length; k++) {
+ // look-up assigned slot of input source
SimpleSlot sourceSlot =
sources[k].getSource().getProducer().getCurrentAssignedResource();
if (sourceSlot != null) {
-
locations.add(sourceSlot.getInstance());
- if (locations.size() >
MAX_DISTINCT_LOCATIONS_TO_CONSIDER) {
- return null;
+ // add input location
+
inputLocations.add(sourceSlot.getInstance());
+ // inputs which have too many
distinct sources are not considered
+ if (inputLocations.size() >
MAX_DISTINCT_LOCATIONS_TO_CONSIDER) {
+ inputLocations.clear();
+ break;
}
}
}
}
+ // keep the locations of the input with the least
preferred locations
+ if(locations.isEmpty() || // nothing assigned yet
+ (!inputLocations.isEmpty() &&
inputLocations.size() < locations.size())) {
+ // current input has fewer preferred locations
+ locations.clear();
+ locations.addAll(inputLocations);
+ }
}
+
return locations;
}