[FLINK-1443 [api-breaking] Extended split assigner interface by parallel task id.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7452802b Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7452802b Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7452802b Branch: refs/heads/master Commit: 7452802bc25c0915b7347d4faf2d60adcfc27644 Parents: 27a479f Author: Fabian Hueske <fhue...@apache.org> Authored: Tue Feb 3 15:15:52 2015 +0100 Committer: Fabian Hueske <fhue...@apache.org> Committed: Thu Feb 5 11:18:03 2015 +0100 ---------------------------------------------------------------------- .../common/io/DefaultInputSplitAssigner.java | 2 +- .../common/io/LocatableInputSplitAssigner.java | 2 +- .../flink/core/io/InputSplitAssigner.java | 6 ++- .../flink/core/io/DefaultSplitAssignerTest.java | 8 ++-- .../core/io/LocatableSplitAssignerTest.java | 40 ++++++++++---------- .../flink/runtime/jobmanager/JobManager.scala | 4 +- 6 files changed, 33 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/7452802b/flink-core/src/main/java/org/apache/flink/api/common/io/DefaultInputSplitAssigner.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/DefaultInputSplitAssigner.java b/flink-core/src/main/java/org/apache/flink/api/common/io/DefaultInputSplitAssigner.java index 379fc4e..d69aa4f 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/io/DefaultInputSplitAssigner.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/io/DefaultInputSplitAssigner.java @@ -51,7 +51,7 @@ public class DefaultInputSplitAssigner implements InputSplitAssigner { @Override - public InputSplit getNextInputSplit(String host) { + public InputSplit getNextInputSplit(String host, int taskId) { InputSplit next = null; // keep the synchronized part short http://git-wip-us.apache.org/repos/asf/flink/blob/7452802b/flink-core/src/main/java/org/apache/flink/api/common/io/LocatableInputSplitAssigner.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/LocatableInputSplitAssigner.java b/flink-core/src/main/java/org/apache/flink/api/common/io/LocatableInputSplitAssigner.java index 92fbdca..c038da6 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/io/LocatableInputSplitAssigner.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/io/LocatableInputSplitAssigner.java @@ -71,7 +71,7 @@ public final class LocatableInputSplitAssigner implements InputSplitAssigner { // -------------------------------------------------------------------------------------------- @Override - public LocatableInputSplit getNextInputSplit(String host) { + public LocatableInputSplit getNextInputSplit(String host, int taskId) { // for a null host, we return a remote split if (host == null) { http://git-wip-us.apache.org/repos/asf/flink/blob/7452802b/flink-core/src/main/java/org/apache/flink/core/io/InputSplitAssigner.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/core/io/InputSplitAssigner.java b/flink-core/src/main/java/org/apache/flink/core/io/InputSplitAssigner.java index b4f72e9..3999c13 100644 --- a/flink-core/src/main/java/org/apache/flink/core/io/InputSplitAssigner.java +++ b/flink-core/src/main/java/org/apache/flink/core/io/InputSplitAssigner.java @@ -28,8 +28,10 @@ public interface InputSplitAssigner { * Returns the next input split that shall be consumed. The consumer's host is passed as a parameter * to allow localized assignments. * - * @param host The address of the host to assign the split to. + * @param host The host address of split requesting task. + * @param taskId The id of the split requesting task. * @return the next input split to be consumed, or <code>null</code> if no more splits remain. */ - InputSplit getNextInputSplit(String host); + InputSplit getNextInputSplit(String host, int taskId); + } http://git-wip-us.apache.org/repos/asf/flink/blob/7452802b/flink-core/src/test/java/org/apache/flink/core/io/DefaultSplitAssignerTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/core/io/DefaultSplitAssignerTest.java b/flink-core/src/test/java/org/apache/flink/core/io/DefaultSplitAssignerTest.java index d7d7cc8..f92b6af 100644 --- a/flink-core/src/test/java/org/apache/flink/core/io/DefaultSplitAssignerTest.java +++ b/flink-core/src/test/java/org/apache/flink/core/io/DefaultSplitAssignerTest.java @@ -44,12 +44,12 @@ public class DefaultSplitAssignerTest { DefaultInputSplitAssigner ia = new DefaultInputSplitAssigner(splits); InputSplit is = null; - while ((is = ia.getNextInputSplit("")) != null) { + while ((is = ia.getNextInputSplit("", 0)) != null) { assertTrue(splits.remove(is)); } assertTrue(splits.isEmpty()); - assertNull(ia.getNextInputSplit("")); + assertNull(ia.getNextInputSplit("", 0)); } catch (Exception e) { e.printStackTrace(); @@ -80,7 +80,7 @@ public class DefaultSplitAssignerTest { public void run() { String host = ""; GenericInputSplit split; - while ((split = (GenericInputSplit) ia.getNextInputSplit(host)) != null) { + while ((split = (GenericInputSplit) ia.getNextInputSplit(host, 0)) != null) { splitsRetrieved.incrementAndGet(); sumOfIds.addAndGet(split.getSplitNumber()); } @@ -115,7 +115,7 @@ public class DefaultSplitAssignerTest { assertEquals(SUM_OF_IDS, sumOfIds.get()); // nothing left - assertNull(ia.getNextInputSplit("")); + assertNull(ia.getNextInputSplit("", 0)); } catch (Exception e) { e.printStackTrace(); http://git-wip-us.apache.org/repos/asf/flink/blob/7452802b/flink-core/src/test/java/org/apache/flink/core/io/LocatableSplitAssignerTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/core/io/LocatableSplitAssignerTest.java b/flink-core/src/test/java/org/apache/flink/core/io/LocatableSplitAssignerTest.java index 7edad43..b30d209 100644 --- a/flink-core/src/test/java/org/apache/flink/core/io/LocatableSplitAssignerTest.java +++ b/flink-core/src/test/java/org/apache/flink/core/io/LocatableSplitAssignerTest.java @@ -55,13 +55,13 @@ public class LocatableSplitAssignerTest { // get all available splits LocatableInputSplitAssigner ia = new LocatableInputSplitAssigner(splits); InputSplit is = null; - while ((is = ia.getNextInputSplit(null)) != null) { + while ((is = ia.getNextInputSplit(null, 0)) != null) { assertTrue(splits.remove(is)); } // check we had all assertTrue(splits.isEmpty()); - assertNull(ia.getNextInputSplit("")); + assertNull(ia.getNextInputSplit("", 0)); assertEquals(NUM_SPLITS, ia.getNumberOfRemoteAssignments()); assertEquals(0, ia.getNumberOfLocalAssignments()); } @@ -85,13 +85,13 @@ public class LocatableSplitAssignerTest { // get all available splits LocatableInputSplitAssigner ia = new LocatableInputSplitAssigner(splits); InputSplit is = null; - while ((is = ia.getNextInputSplit("testhost")) != null) { + while ((is = ia.getNextInputSplit("testhost", 0)) != null) { assertTrue(splits.remove(is)); } // check we had all assertTrue(splits.isEmpty()); - assertNull(ia.getNextInputSplit("")); + assertNull(ia.getNextInputSplit("", 0)); assertEquals(0, ia.getNumberOfRemoteAssignments()); assertEquals(NUM_SPLITS, ia.getNumberOfLocalAssignments()); @@ -117,13 +117,13 @@ public class LocatableSplitAssignerTest { // get all available splits LocatableInputSplitAssigner ia = new LocatableInputSplitAssigner(splits); InputSplit is = null; - while ((is = ia.getNextInputSplit("testhost")) != null) { + while ((is = ia.getNextInputSplit("testhost", 0)) != null) { assertTrue(splits.remove(is)); } // check we had all assertTrue(splits.isEmpty()); - assertNull(ia.getNextInputSplit("anotherHost")); + assertNull(ia.getNextInputSplit("anotherHost", 0)); assertEquals(NUM_SPLITS, ia.getNumberOfRemoteAssignments()); assertEquals(0, ia.getNumberOfLocalAssignments()); @@ -167,13 +167,13 @@ public class LocatableSplitAssignerTest { LocatableInputSplitAssigner ia = new LocatableInputSplitAssigner(splits); InputSplit is = null; int i = 0; - while ((is = ia.getNextInputSplit(hosts[i++ % hosts.length])) != null) { + while ((is = ia.getNextInputSplit(hosts[i++ % hosts.length], 0)) != null) { assertTrue(splits.remove(is)); } // check we had all assertTrue(splits.isEmpty()); - assertNull(ia.getNextInputSplit("anotherHost")); + assertNull(ia.getNextInputSplit("anotherHost", 0)); assertEquals(NUM_REMOTE_SPLITS, ia.getNumberOfRemoteAssignments()); assertEquals(NUM_LOCAL_SPLITS, ia.getNumberOfLocalAssignments()); @@ -228,7 +228,7 @@ public class LocatableSplitAssignerTest { LocatableInputSplit is = null; for (int i = 0; i < NUM_SPLITS; i++) { String host = requestingHosts[i % requestingHosts.length]; - is = ia.getNextInputSplit(host); + is = ia.getNextInputSplit(host, 0); // check valid split assertTrue(is != null); // check unassigned split @@ -246,7 +246,7 @@ public class LocatableSplitAssignerTest { } // check we had all assertTrue(splits.isEmpty()); - assertNull(ia.getNextInputSplit("anotherHost")); + assertNull(ia.getNextInputSplit("anotherHost", 0)); assertEquals(NUM_REMOTE_SPLITS, ia.getNumberOfRemoteAssignments()); assertEquals(NUM_LOCAL_SPLITS, ia.getNumberOfLocalAssignments()); @@ -273,13 +273,13 @@ public class LocatableSplitAssignerTest { LocatableInputSplitAssigner ia = new LocatableInputSplitAssigner(splits); InputSplit is = null; int i = 0; - while ((is = ia.getNextInputSplit(hosts[i++ % hosts.length])) != null) { + while ((is = ia.getNextInputSplit(hosts[i++ % hosts.length], 0)) != null) { assertTrue(splits.remove(is)); } // check we had all assertTrue(splits.isEmpty()); - assertNull(ia.getNextInputSplit("anotherHost")); + assertNull(ia.getNextInputSplit("anotherHost", 0)); assertEquals(0, ia.getNumberOfRemoteAssignments()); assertEquals(NUM_SPLITS, ia.getNumberOfLocalAssignments()); @@ -319,7 +319,7 @@ public class LocatableSplitAssignerTest { @Override public void run() { LocatableInputSplit split; - while ((split = ia.getNextInputSplit(null)) != null) { + while ((split = ia.getNextInputSplit(null, 0)) != null) { splitsRetrieved.incrementAndGet(); sumOfIds.addAndGet(split.getSplitNumber()); } @@ -354,7 +354,7 @@ public class LocatableSplitAssignerTest { assertEquals(SUM_OF_IDS, sumOfIds.get()); // nothing left - assertNull(ia.getNextInputSplit("")); + assertNull(ia.getNextInputSplit("", 0)); assertEquals(NUM_SPLITS, ia.getNumberOfRemoteAssignments()); assertEquals(0, ia.getNumberOfLocalAssignments()); @@ -388,7 +388,7 @@ public class LocatableSplitAssignerTest { @Override public void run() { LocatableInputSplit split; - while ((split = ia.getNextInputSplit("testhost")) != null) { + while ((split = ia.getNextInputSplit("testhost", 0)) != null) { splitsRetrieved.incrementAndGet(); sumOfIds.addAndGet(split.getSplitNumber()); } @@ -423,7 +423,7 @@ public class LocatableSplitAssignerTest { assertEquals(SUM_OF_IDS, sumOfIds.get()); // nothing left - assertNull(ia.getNextInputSplit("testhost")); + assertNull(ia.getNextInputSplit("testhost", 0)); assertEquals(0, ia.getNumberOfRemoteAssignments()); assertEquals(NUM_SPLITS, ia.getNumberOfLocalAssignments()); @@ -461,7 +461,7 @@ public class LocatableSplitAssignerTest { final String threadHost = hosts[(int) (Math.random() * hosts.length)]; LocatableInputSplit split; - while ((split = ia.getNextInputSplit(threadHost)) != null) { + while ((split = ia.getNextInputSplit(threadHost, 0)) != null) { splitsRetrieved.incrementAndGet(); sumOfIds.addAndGet(split.getSplitNumber()); } @@ -496,7 +496,7 @@ public class LocatableSplitAssignerTest { assertEquals(SUM_OF_IDS, sumOfIds.get()); // nothing left - assertNull(ia.getNextInputSplit("testhost")); + assertNull(ia.getNextInputSplit("testhost", 0)); // at least one fraction of hosts needs be local, no matter how bad the thread races assertTrue(ia.getNumberOfLocalAssignments() >= NUM_SPLITS / hosts.length); @@ -542,12 +542,12 @@ public class LocatableSplitAssignerTest { final LocatableInputSplitAssigner ia = new LocatableInputSplitAssigner(splits); for (int i = 0; i < NUM_SPLITS; i++) { - LocatableInputSplit split = ia.getNextInputSplit(requestingHosts[rand.nextInt(requestingHosts.length)]); + LocatableInputSplit split = ia.getNextInputSplit(requestingHosts[rand.nextInt(requestingHosts.length)], 0); assertTrue(split != null); assertTrue(splits.remove(split)); } assertTrue(splits.isEmpty()); - assertNull(ia.getNextInputSplit("testHost")); + assertNull(ia.getNextInputSplit("testHost", 0)); } } http://git-wip-us.apache.org/repos/asf/flink/blob/7452802b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index c4bb793..f6dbab3 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -298,6 +298,7 @@ class JobManager(val configuration: Configuration) null }else{ val slot = execution.getAssignedResource + val taskId = execution.getVertex.getParallelSubtaskIndex val host = if(slot != null){ slot.getInstance().getInstanceConnectionInfo.getHostname @@ -307,7 +308,8 @@ class JobManager(val configuration: Configuration) executionGraph.getJobVertex(vertexID) match { case vertex: ExecutionJobVertex => vertex.getSplitAssigner match { - case splitAssigner: InputSplitAssigner => splitAssigner.getNextInputSplit(host) + case splitAssigner: InputSplitAssigner => + splitAssigner.getNextInputSplit(host, taskId) case _ => log.error("No InputSplitAssigner for vertex ID {}.", vertexID) null