[FLINK-1443] [api-breaking] Extended split assigner interface by parallel task id.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7452802b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7452802b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7452802b

Branch: refs/heads/master
Commit: 7452802bc25c0915b7347d4faf2d60adcfc27644
Parents: 27a479f
Author: Fabian Hueske <fhue...@apache.org>
Authored: Tue Feb 3 15:15:52 2015 +0100
Committer: Fabian Hueske <fhue...@apache.org>
Committed: Thu Feb 5 11:18:03 2015 +0100

----------------------------------------------------------------------
 .../common/io/DefaultInputSplitAssigner.java    |  2 +-
 .../common/io/LocatableInputSplitAssigner.java  |  2 +-
 .../flink/core/io/InputSplitAssigner.java       |  6 ++-
 .../flink/core/io/DefaultSplitAssignerTest.java |  8 ++--
 .../core/io/LocatableSplitAssignerTest.java     | 40 ++++++++++----------
 .../flink/runtime/jobmanager/JobManager.scala   |  4 +-
 6 files changed, 33 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7452802b/flink-core/src/main/java/org/apache/flink/api/common/io/DefaultInputSplitAssigner.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/DefaultInputSplitAssigner.java b/flink-core/src/main/java/org/apache/flink/api/common/io/DefaultInputSplitAssigner.java
index 379fc4e..d69aa4f 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/DefaultInputSplitAssigner.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/DefaultInputSplitAssigner.java
@@ -51,7 +51,7 @@ public class DefaultInputSplitAssigner implements InputSplitAssigner {
        
        
        @Override
-       public InputSplit getNextInputSplit(String host) {
+       public InputSplit getNextInputSplit(String host, int taskId) {
                InputSplit next = null;
                
                // keep the synchronized part short

http://git-wip-us.apache.org/repos/asf/flink/blob/7452802b/flink-core/src/main/java/org/apache/flink/api/common/io/LocatableInputSplitAssigner.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/LocatableInputSplitAssigner.java b/flink-core/src/main/java/org/apache/flink/api/common/io/LocatableInputSplitAssigner.java
index 92fbdca..c038da6 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/LocatableInputSplitAssigner.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/LocatableInputSplitAssigner.java
@@ -71,7 +71,7 @@ public final class LocatableInputSplitAssigner implements InputSplitAssigner {
        // 
--------------------------------------------------------------------------------------------
 
        @Override
-       public LocatableInputSplit getNextInputSplit(String host) {
+       public LocatableInputSplit getNextInputSplit(String host, int taskId) {
 
                // for a null host, we return a remote split
                if (host == null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/7452802b/flink-core/src/main/java/org/apache/flink/core/io/InputSplitAssigner.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/io/InputSplitAssigner.java b/flink-core/src/main/java/org/apache/flink/core/io/InputSplitAssigner.java
index b4f72e9..3999c13 100644
--- a/flink-core/src/main/java/org/apache/flink/core/io/InputSplitAssigner.java
+++ b/flink-core/src/main/java/org/apache/flink/core/io/InputSplitAssigner.java
@@ -28,8 +28,10 @@ public interface InputSplitAssigner {
         * Returns the next input split that shall be consumed. The consumer's 
host is passed as a parameter
         * to allow localized assignments.
         * 
-        * @param host The address of the host to assign the split to.
+        * @param host The host address of split requesting task.
+        * @param taskId The id of the split requesting task.
         * @return the next input split to be consumed, or <code>null</code> if 
no more splits remain.
         */
-       InputSplit getNextInputSplit(String host);
+       InputSplit getNextInputSplit(String host, int taskId);
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7452802b/flink-core/src/test/java/org/apache/flink/core/io/DefaultSplitAssignerTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/core/io/DefaultSplitAssignerTest.java b/flink-core/src/test/java/org/apache/flink/core/io/DefaultSplitAssignerTest.java
index d7d7cc8..f92b6af 100644
--- a/flink-core/src/test/java/org/apache/flink/core/io/DefaultSplitAssignerTest.java
+++ b/flink-core/src/test/java/org/apache/flink/core/io/DefaultSplitAssignerTest.java
@@ -44,12 +44,12 @@ public class DefaultSplitAssignerTest {
                        
                        DefaultInputSplitAssigner ia = new 
DefaultInputSplitAssigner(splits);
                        InputSplit is = null;
-                       while ((is = ia.getNextInputSplit("")) != null) {
+                       while ((is = ia.getNextInputSplit("", 0)) != null) {
                                assertTrue(splits.remove(is));
                        }
                        
                        assertTrue(splits.isEmpty());
-                       assertNull(ia.getNextInputSplit(""));
+                       assertNull(ia.getNextInputSplit("", 0));
                }
                catch (Exception e) {
                        e.printStackTrace();
@@ -80,7 +80,7 @@ public class DefaultSplitAssignerTest {
                                public void run() {
                                        String host = "";
                                        GenericInputSplit split;
-                                       while ((split = (GenericInputSplit) 
ia.getNextInputSplit(host)) != null) {
+                                       while ((split = (GenericInputSplit) 
ia.getNextInputSplit(host, 0)) != null) {
                                                
splitsRetrieved.incrementAndGet();
                                                
sumOfIds.addAndGet(split.getSplitNumber());
                                        }
@@ -115,7 +115,7 @@ public class DefaultSplitAssignerTest {
                        assertEquals(SUM_OF_IDS, sumOfIds.get());
                        
                        // nothing left
-                       assertNull(ia.getNextInputSplit(""));
+                       assertNull(ia.getNextInputSplit("", 0));
                }
                catch (Exception e) {
                        e.printStackTrace();

http://git-wip-us.apache.org/repos/asf/flink/blob/7452802b/flink-core/src/test/java/org/apache/flink/core/io/LocatableSplitAssignerTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/core/io/LocatableSplitAssignerTest.java b/flink-core/src/test/java/org/apache/flink/core/io/LocatableSplitAssignerTest.java
index 7edad43..b30d209 100644
--- a/flink-core/src/test/java/org/apache/flink/core/io/LocatableSplitAssignerTest.java
+++ b/flink-core/src/test/java/org/apache/flink/core/io/LocatableSplitAssignerTest.java
@@ -55,13 +55,13 @@ public class LocatableSplitAssignerTest {
                        // get all available splits
                        LocatableInputSplitAssigner ia = new 
LocatableInputSplitAssigner(splits);
                        InputSplit is = null;
-                       while ((is = ia.getNextInputSplit(null)) != null) {
+                       while ((is = ia.getNextInputSplit(null, 0)) != null) {
                                assertTrue(splits.remove(is));
                        }
                        
                        // check we had all
                        assertTrue(splits.isEmpty());
-                       assertNull(ia.getNextInputSplit(""));
+                       assertNull(ia.getNextInputSplit("", 0));
                        assertEquals(NUM_SPLITS, 
ia.getNumberOfRemoteAssignments());
                        assertEquals(0, ia.getNumberOfLocalAssignments());
                }
@@ -85,13 +85,13 @@ public class LocatableSplitAssignerTest {
                        // get all available splits
                        LocatableInputSplitAssigner ia = new 
LocatableInputSplitAssigner(splits);
                        InputSplit is = null;
-                       while ((is = ia.getNextInputSplit("testhost")) != null) 
{
+                       while ((is = ia.getNextInputSplit("testhost", 0)) != 
null) {
                                assertTrue(splits.remove(is));
                        }
                        
                        // check we had all
                        assertTrue(splits.isEmpty());
-                       assertNull(ia.getNextInputSplit(""));
+                       assertNull(ia.getNextInputSplit("", 0));
                        
                        assertEquals(0, ia.getNumberOfRemoteAssignments());
                        assertEquals(NUM_SPLITS, 
ia.getNumberOfLocalAssignments());
@@ -117,13 +117,13 @@ public class LocatableSplitAssignerTest {
                        // get all available splits
                        LocatableInputSplitAssigner ia = new 
LocatableInputSplitAssigner(splits);
                        InputSplit is = null;
-                       while ((is = ia.getNextInputSplit("testhost")) != null) 
{
+                       while ((is = ia.getNextInputSplit("testhost", 0)) != 
null) {
                                assertTrue(splits.remove(is));
                        }
                        
                        // check we had all
                        assertTrue(splits.isEmpty());
-                       assertNull(ia.getNextInputSplit("anotherHost"));
+                       assertNull(ia.getNextInputSplit("anotherHost", 0));
                        
                        assertEquals(NUM_SPLITS, 
ia.getNumberOfRemoteAssignments());
                        assertEquals(0, ia.getNumberOfLocalAssignments());
@@ -167,13 +167,13 @@ public class LocatableSplitAssignerTest {
                        LocatableInputSplitAssigner ia = new 
LocatableInputSplitAssigner(splits);
                        InputSplit is = null;
                        int i = 0;
-                       while ((is = ia.getNextInputSplit(hosts[i++ % 
hosts.length])) != null) {
+                       while ((is = ia.getNextInputSplit(hosts[i++ % 
hosts.length], 0)) != null) {
                                assertTrue(splits.remove(is));
                        }
 
                        // check we had all
                        assertTrue(splits.isEmpty());
-                       assertNull(ia.getNextInputSplit("anotherHost"));
+                       assertNull(ia.getNextInputSplit("anotherHost", 0));
 
                        assertEquals(NUM_REMOTE_SPLITS, 
ia.getNumberOfRemoteAssignments());
                        assertEquals(NUM_LOCAL_SPLITS, 
ia.getNumberOfLocalAssignments());
@@ -228,7 +228,7 @@ public class LocatableSplitAssignerTest {
                        LocatableInputSplit is = null;
                        for (int i = 0; i < NUM_SPLITS; i++) {
                                String host = requestingHosts[i % 
requestingHosts.length];
-                               is = ia.getNextInputSplit(host);
+                               is = ia.getNextInputSplit(host, 0);
                                // check valid split
                                assertTrue(is != null);
                                // check unassigned split
@@ -246,7 +246,7 @@ public class LocatableSplitAssignerTest {
                        }
                        // check we had all
                        assertTrue(splits.isEmpty());
-                       assertNull(ia.getNextInputSplit("anotherHost"));
+                       assertNull(ia.getNextInputSplit("anotherHost", 0));
 
                        assertEquals(NUM_REMOTE_SPLITS, 
ia.getNumberOfRemoteAssignments());
                        assertEquals(NUM_LOCAL_SPLITS, 
ia.getNumberOfLocalAssignments());
@@ -273,13 +273,13 @@ public class LocatableSplitAssignerTest {
                        LocatableInputSplitAssigner ia = new 
LocatableInputSplitAssigner(splits);
                        InputSplit is = null;
                        int i = 0;
-                       while ((is = ia.getNextInputSplit(hosts[i++ % 
hosts.length])) != null) {
+                       while ((is = ia.getNextInputSplit(hosts[i++ % 
hosts.length], 0)) != null) {
                                assertTrue(splits.remove(is));
                        }
 
                        // check we had all
                        assertTrue(splits.isEmpty());
-                       assertNull(ia.getNextInputSplit("anotherHost"));
+                       assertNull(ia.getNextInputSplit("anotherHost", 0));
 
                        assertEquals(0, ia.getNumberOfRemoteAssignments());
                        assertEquals(NUM_SPLITS, 
ia.getNumberOfLocalAssignments());
@@ -319,7 +319,7 @@ public class LocatableSplitAssignerTest {
                                @Override
                                public void run() {
                                        LocatableInputSplit split;
-                                       while ((split = 
ia.getNextInputSplit(null)) != null) {
+                                       while ((split = 
ia.getNextInputSplit(null, 0)) != null) {
                                                
splitsRetrieved.incrementAndGet();
                                                
sumOfIds.addAndGet(split.getSplitNumber());
                                        }
@@ -354,7 +354,7 @@ public class LocatableSplitAssignerTest {
                        assertEquals(SUM_OF_IDS, sumOfIds.get());
                        
                        // nothing left
-                       assertNull(ia.getNextInputSplit(""));
+                       assertNull(ia.getNextInputSplit("", 0));
                        
                        assertEquals(NUM_SPLITS, 
ia.getNumberOfRemoteAssignments());
                        assertEquals(0, ia.getNumberOfLocalAssignments());
@@ -388,7 +388,7 @@ public class LocatableSplitAssignerTest {
                                @Override
                                public void run() {
                                        LocatableInputSplit split;
-                                       while ((split = 
ia.getNextInputSplit("testhost")) != null) {
+                                       while ((split = 
ia.getNextInputSplit("testhost", 0)) != null) {
                                                
splitsRetrieved.incrementAndGet();
                                                
sumOfIds.addAndGet(split.getSplitNumber());
                                        }
@@ -423,7 +423,7 @@ public class LocatableSplitAssignerTest {
                        assertEquals(SUM_OF_IDS, sumOfIds.get());
                        
                        // nothing left
-                       assertNull(ia.getNextInputSplit("testhost"));
+                       assertNull(ia.getNextInputSplit("testhost", 0));
                        
                        assertEquals(0, ia.getNumberOfRemoteAssignments());
                        assertEquals(NUM_SPLITS, 
ia.getNumberOfLocalAssignments());
@@ -461,7 +461,7 @@ public class LocatableSplitAssignerTest {
                                        final String threadHost = hosts[(int) 
(Math.random() * hosts.length)];
                                        
                                        LocatableInputSplit split;
-                                       while ((split = 
ia.getNextInputSplit(threadHost)) != null) {
+                                       while ((split = 
ia.getNextInputSplit(threadHost, 0)) != null) {
                                                
splitsRetrieved.incrementAndGet();
                                                
sumOfIds.addAndGet(split.getSplitNumber());
                                        }
@@ -496,7 +496,7 @@ public class LocatableSplitAssignerTest {
                        assertEquals(SUM_OF_IDS, sumOfIds.get());
                        
                        // nothing left
-                       assertNull(ia.getNextInputSplit("testhost"));
+                       assertNull(ia.getNextInputSplit("testhost", 0));
                        
                        // at least one fraction of hosts needs be local, no 
matter how bad the thread races
                        assertTrue(ia.getNumberOfLocalAssignments() >= 
NUM_SPLITS / hosts.length);
@@ -542,12 +542,12 @@ public class LocatableSplitAssignerTest {
                final LocatableInputSplitAssigner ia = new 
LocatableInputSplitAssigner(splits);
 
                for (int i = 0; i < NUM_SPLITS; i++) {
-                       LocatableInputSplit split = 
ia.getNextInputSplit(requestingHosts[rand.nextInt(requestingHosts.length)]);
+                       LocatableInputSplit split = 
ia.getNextInputSplit(requestingHosts[rand.nextInt(requestingHosts.length)], 0);
                        assertTrue(split != null);
                        assertTrue(splits.remove(split));
                }
 
                assertTrue(splits.isEmpty());
-               assertNull(ia.getNextInputSplit("testHost"));
+               assertNull(ia.getNextInputSplit("testHost", 0));
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7452802b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index c4bb793..f6dbab3 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -298,6 +298,7 @@ class JobManager(val configuration: Configuration)
             null
           }else{
             val slot = execution.getAssignedResource
+            val taskId = execution.getVertex.getParallelSubtaskIndex
 
             val host = if(slot != null){
               slot.getInstance().getInstanceConnectionInfo.getHostname
@@ -307,7 +308,8 @@ class JobManager(val configuration: Configuration)
 
             executionGraph.getJobVertex(vertexID) match {
               case vertex: ExecutionJobVertex => vertex.getSplitAssigner match 
{
-                case splitAssigner: InputSplitAssigner => 
splitAssigner.getNextInputSplit(host)
+                case splitAssigner: InputSplitAssigner =>
+                  splitAssigner.getNextInputSplit(host, taskId)
                 case _ =>
                   log.error("No InputSplitAssigner for vertex ID {}.", 
vertexID)
                   null

Reply via email to