flink git commit: [FLINK-1683] [jobmanager] Fix scheduling preference choice for non-unary execution tasks.

2015-03-13 Thread fhueske
Repository: flink
Updated Branches:
  refs/heads/master 81ebe980a - 07a9a56c1


[FLINK-1683] [jobmanager] Fix scheduling preference choice for non-unary 
execution tasks.

This closes #476


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/07a9a56c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/07a9a56c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/07a9a56c

Branch: refs/heads/master
Commit: 07a9a56c1726b4a7aeb3e682b887d77ab1b0e440
Parents: 81ebe98
Author: Fabian Hueske fhue...@apache.org
Authored: Tue Mar 10 19:19:27 2015 +0100
Committer: Fabian Hueske fhue...@apache.org
Committed: Fri Mar 13 14:20:08 2015 +0100

--
 .../runtime/executiongraph/ExecutionVertex.java | 28 
 1 file changed, 23 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/07a9a56c/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
--
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index 41b78f8..794ca21 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -47,6 +47,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.CopyOnWriteArrayList;
 
 import static com.google.common.base.Preconditions.checkElementIndex;
@@ -348,22 +349,39 @@ public class ExecutionVertex implements Serializable {
return Collections.emptySet();
}
else {
-   HashSetInstance locations = new HashSetInstance();
-   
+
+   SetInstance locations = new HashSetInstance();
+   SetInstance inputLocations = new HashSetInstance();
+
+   // go over all inputs
for (int i = 0; i  inputEdges.length; i++) {
+   inputLocations.clear();
ExecutionEdge[] sources = inputEdges[i];
if (sources != null) {
+   // go over all input sources
for (int k = 0; k  sources.length; 
k++) {
+   // look-up assigned slot of 
input source
SimpleSlot sourceSlot = 
sources[k].getSource().getProducer().getCurrentAssignedResource();
if (sourceSlot != null) {
-   
locations.add(sourceSlot.getInstance());
-   if (locations.size()  
MAX_DISTINCT_LOCATIONS_TO_CONSIDER) {
-   return null;
+   // add input location
+   
inputLocations.add(sourceSlot.getInstance());
+   // inputs which have 
too many distinct sources are not considered
+   if 
(inputLocations.size()  MAX_DISTINCT_LOCATIONS_TO_CONSIDER) {
+   
inputLocations.clear();
+   break;
}
}
}
}
+   // keep the locations of the input with the 
least preferred locations
+   if(locations.isEmpty() || // nothing assigned 
yet
+   (!inputLocations.isEmpty()  
inputLocations.size()  locations.size())) {
+   // current input has fewer preferred 
locations
+   locations.clear();
+   locations.addAll(inputLocations);
+   }
}
+
return locations;
}
}



flink git commit: [FLINK-1683] [jobmanager] Fix scheduling preference choice for non-unary execution tasks.

2015-03-13 Thread fhueske
Repository: flink
Updated Branches:
  refs/heads/release-0.8 9f18cbb3a - 095301e02


[FLINK-1683] [jobmanager] Fix scheduling preference choice for non-unary 
execution tasks.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/095301e0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/095301e0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/095301e0

Branch: refs/heads/release-0.8
Commit: 095301e025a87608f3b8785d02580da7f87f1e76
Parents: 9f18cbb
Author: Fabian Hueske fhue...@apache.org
Authored: Tue Mar 10 19:19:27 2015 +0100
Committer: Fabian Hueske fhue...@apache.org
Committed: Wed Mar 11 17:49:31 2015 +0100

--
 .../runtime/executiongraph/ExecutionVertex.java | 31 
 1 file changed, 25 insertions(+), 6 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/095301e0/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
--
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index 57e441e..5240845 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -23,7 +23,6 @@ import static 
org.apache.flink.runtime.execution.ExecutionState.FAILED;
 import static org.apache.flink.runtime.execution.ExecutionState.FINISHED;
 
 import java.util.ArrayList;
-import java.util.HashSet;
 import java.util.List;
 import java.util.concurrent.CopyOnWriteArrayList;
 
@@ -44,6 +43,9 @@ import 
org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import 
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
 
+import java.util.HashSet;
+import java.util.Set;
+
 /**
  * The ExecutionVertex is a parallel subtask of the execution. It may be 
executed once, or several times, each of
  * which time it spawns an {@link Execution}.
@@ -285,22 +287,39 @@ public class ExecutionVertex {
 * @return The preferred locations for this vertex execution, or null, 
if there is no preference.
 */
public IterableInstance getPreferredLocations() {
-   HashSetInstance locations = new HashSetInstance();
-   
+
+   SetInstance locations = new HashSetInstance();
+   SetInstance inputLocations = new HashSetInstance();
+
+   // go over all inputs
for (int i = 0; i  inputEdges.length; i++) {
+   inputLocations.clear();
ExecutionEdge[] sources = inputEdges[i];
if (sources != null) {
+   // go over all input sources
for (int k = 0; k  sources.length; k++) {
+   // look-up assigned slot of input source
SimpleSlot sourceSlot = 
sources[k].getSource().getProducer().getCurrentAssignedResource();
if (sourceSlot != null) {
-   
locations.add(sourceSlot.getInstance());
-   if (locations.size()  
MAX_DISTINCT_LOCATIONS_TO_CONSIDER) {
-   return null;
+   // add input location
+   
inputLocations.add(sourceSlot.getInstance());
+   // inputs which have too many 
distinct sources are not considered
+   if (inputLocations.size()  
MAX_DISTINCT_LOCATIONS_TO_CONSIDER) {
+   inputLocations.clear();
+   break;
}
}
}
}
+   // keep the locations of the input with the least 
preferred locations
+   if(locations.isEmpty() || // nothing assigned yet
+   (!inputLocations.isEmpty()  
inputLocations.size()  locations.size())) {
+   // current input has fewer preferred locations
+   locations.clear();
+   locations.addAll(inputLocations);
+   }
}
+
return