[FLINK-1236] [hadoop compatibility] Add local split assignment support for HadoopInputFormats

This closes #267


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/92ceacd2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/92ceacd2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/92ceacd2

Branch: refs/heads/master
Commit: 92ceacd23749fa4113a2f778ff68fa5881313b25
Parents: 93eadca
Author: Fabian Hueske <[email protected]>
Authored: Mon Dec 15 15:18:27 2014 +0100
Committer: Stephan Ewen <[email protected]>
Committed: Tue Jan 6 13:13:42 2015 +0100

----------------------------------------------------------------------
 .../mapred/HadoopInputFormat.java              |  6 ++---
 .../mapred/record/HadoopRecordInputFormat.java |  2 +-
 .../mapred/wrapper/HadoopInputSplit.java       | 25 +++++++++++---------
 .../mapreduce/HadoopInputFormat.java           |  6 ++---
 .../mapreduce/wrapper/HadoopInputSplit.java    | 22 +++++++++++------
 5 files changed, 36 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/92ceacd2/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopInputFormat.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopInputFormat.java
index 8dfda67..d116cdc 100644
--- a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopInputFormat.java
+++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopInputFormat.java
@@ -24,9 +24,9 @@ import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.util.ArrayList;
 
+import org.apache.flink.api.common.io.LocatableInputSplitAssigner;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
 import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.api.common.io.FileInputFormat.FileBaseStatistics;
 import org.apache.flink.api.common.io.statistics.BaseStatistics;
@@ -143,14 +143,14 @@ public class HadoopInputFormat<K extends Writable, V extends Writable> implement
 		org.apache.hadoop.mapred.InputSplit[] splitArray = mapredInputFormat.getSplits(jobConf, minNumSplits);
 		HadoopInputSplit[] hiSplit = new HadoopInputSplit[splitArray.length];
 		for(int i=0;i<splitArray.length;i++){
-			hiSplit[i] = new HadoopInputSplit(splitArray[i], jobConf);
+			hiSplit[i] = new HadoopInputSplit(i, splitArray[i], jobConf);
 		}
 		return hiSplit;
 	}
 
 	@Override
 	public InputSplitAssigner getInputSplitAssigner(HadoopInputSplit[] inputSplits) {
-		return new DefaultInputSplitAssigner(inputSplits);
+		return new LocatableInputSplitAssigner(inputSplits);
 	}
 
 	@Override


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/92ceacd2/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopRecordInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopRecordInputFormat.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopRecordInputFormat.java
index f8153a2..275fd4c 100644
--- a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopRecordInputFormat.java
+++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopRecordInputFormat.java
@@ -80,7 +80,7 @@ public class HadoopRecordInputFormat<K, V> implements InputFormat<Record, Hadoop
 		org.apache.hadoop.mapred.InputSplit[] splitArray = hadoopInputFormat.getSplits(jobConf, minNumSplits);
 		HadoopInputSplit[] hiSplit = new HadoopInputSplit[splitArray.length];
 		for(int i=0;i<splitArray.length;i++){
-			hiSplit[i] = new HadoopInputSplit(splitArray[i], jobConf);
+			hiSplit[i] = new HadoopInputSplit(i, splitArray[i], jobConf);
 		}
 		return hiSplit;
 	}


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/92ceacd2/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopInputSplit.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopInputSplit.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopInputSplit.java
index 77c40f5..aa2155d 100644
--- a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopInputSplit.java
+++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopInputSplit.java
@@ -23,7 +23,7 @@ import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 
-import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.core.io.LocatableInputSplit;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.hadoop.conf.Configurable;
@@ -31,7 +31,7 @@ import org.apache.hadoop.io.WritableFactories;
 import org.apache.hadoop.mapred.JobConf;
 
 
-public class HadoopInputSplit implements InputSplit {
+public class HadoopInputSplit extends LocatableInputSplit {
 
 	private static final long serialVersionUID = 1L;
@@ -52,10 +52,13 @@ public class HadoopInputSplit implements InputSplit {
 		super();
 	}
 
-	public HadoopInputSplit(org.apache.hadoop.mapred.InputSplit hInputSplit, JobConf jobconf) {
+	public HadoopInputSplit(int splitNumber, org.apache.hadoop.mapred.InputSplit hInputSplit, JobConf jobconf) {
+
+		this.splitNumber = splitNumber;
 		this.hadoopInputSplit = hInputSplit;
 		this.hadoopInputSplitTypeName = hInputSplit.getClass().getName();
 		this.jobConf = jobconf;
+
 	}
 
 	@Override
@@ -68,7 +71,7 @@ public void read(DataInputView in) throws IOException {
-		this.splitNumber=in.readInt();
+		this.splitNumber = in.readInt();
 		this.hadoopInputSplitTypeName = in.readUTF();
 		if(hadoopInputSplit == null) {
 			try {
@@ -124,12 +127,12 @@ public class HadoopInputSplit implements InputSplit {
 		return this.splitNumber;
 	}
 
-	public void setSplitNumber(int splitNumber) {
-		this.splitNumber = splitNumber;
-	}
-
-	public void setHadoopInputSplit(
-			org.apache.hadoop.mapred.InputSplit hadoopInputSplit) {
-		this.hadoopInputSplit = hadoopInputSplit;
+	@Override
+	public String[] getHostnames() {
+		try {
+			return this.hadoopInputSplit.getLocations();
+		} catch(IOException ioe) {
+			return new String[0];
+		}
 	}
 }


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/92ceacd2/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopInputFormat.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopInputFormat.java
index 280aaf9..23e8aae 100644
--- a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopInputFormat.java
+++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopInputFormat.java
@@ -25,9 +25,9 @@ import java.io.ObjectOutputStream;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.flink.api.common.io.LocatableInputSplitAssigner;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
 import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.api.common.io.FileInputFormat.FileBaseStatistics;
 import org.apache.flink.api.common.io.statistics.BaseStatistics;
@@ -162,14 +162,14 @@ public class HadoopInputFormat<K extends Writable, V extends Writable> implement
 		HadoopInputSplit[] hadoopInputSplits = new HadoopInputSplit[splits.size()];
 
 		for(int i = 0; i < hadoopInputSplits.length; i++){
-			hadoopInputSplits[i] = new HadoopInputSplit(splits.get(i), jobContext);
+			hadoopInputSplits[i] = new HadoopInputSplit(i, splits.get(i), jobContext);
 		}
 		return hadoopInputSplits;
 	}
 
 	@Override
 	public InputSplitAssigner getInputSplitAssigner(HadoopInputSplit[] inputSplits) {
-		return new DefaultInputSplitAssigner(inputSplits);
+		return new LocatableInputSplitAssigner(inputSplits);
 	}
 
 	@Override


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/92ceacd2/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/wrapper/HadoopInputSplit.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/wrapper/HadoopInputSplit.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/wrapper/HadoopInputSplit.java
index 5afd89b..7477c28 100644
--- a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/wrapper/HadoopInputSplit.java
+++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/wrapper/HadoopInputSplit.java
@@ -23,7 +23,7 @@ import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 
-import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.core.io.LocatableInputSplit;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.hadoop.io.Writable;
@@ -31,12 +31,12 @@ import org.apache.hadoop.io.WritableFactories;
 import org.apache.hadoop.mapreduce.JobContext;
 
 
-public class HadoopInputSplit implements InputSplit {
+public class HadoopInputSplit extends LocatableInputSplit {
 
 	public transient org.apache.hadoop.mapreduce.InputSplit mapreduceInputSplit;
 	public transient JobContext jobContext;
 
-	private int splitNumber;
+	private int splitNumber;
 
 	public org.apache.hadoop.mapreduce.InputSplit getHadoopInputSplit() {
 		return mapreduceInputSplit;
@@ -48,7 +48,8 @@ public class HadoopInputSplit implements InputSplit {
 	}
 
-	public HadoopInputSplit(org.apache.hadoop.mapreduce.InputSplit mapreduceInputSplit, JobContext jobContext) {
+	public HadoopInputSplit(int splitNumber, org.apache.hadoop.mapreduce.InputSplit mapreduceInputSplit, JobContext jobContext) {
+		this.splitNumber = splitNumber;
 		if(!(mapreduceInputSplit instanceof Writable)) {
 			throw new IllegalArgumentException("InputSplit must implement Writable interface.");
 		}
@@ -66,7 +67,7 @@ public class HadoopInputSplit implements InputSplit {
 	@Override
 	public void read(DataInputView in) throws IOException {
-		this.splitNumber=in.readInt();
+		this.splitNumber = in.readInt();
 		String className = in.readUTF();
 
 		if(this.mapreduceInputSplit == null) {
@@ -111,7 +112,14 @@ public class HadoopInputSplit implements InputSplit {
 		return this.splitNumber;
 	}
 
-	public void setSplitNumber(int splitNumber) {
-		this.splitNumber = splitNumber;
+	@Override
+	public String[] getHostnames() {
+		try {
+			return this.mapreduceInputSplit.getLocations();
+		} catch (IOException e) {
+			return new String[0];
+		} catch (InterruptedException e) {
+			return new String[0];
+		}
 	}
 }
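
----------------------------------------------------------------------

For illustration only (not part of this commit): a minimal sketch of how the rewritten mapred wrapper surfaces split locality to the LocatableInputSplitAssigner. The file path, split size, class name, and host names below are assumed example values; the HadoopInputSplit constructor and getHostnames() method are the ones introduced in the diff above.

import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopInputSplit;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.JobConf;

public class SplitLocalityExample {

	public static void main(String[] args) {
		// A mapred FileSplit knows on which hosts the replicas of its HDFS block live.
		// The path, length, and host names here are made-up example values.
		FileSplit fileSplit = new FileSplit(
				new Path("hdfs:///example/input.txt"), 0L, 64L * 1024 * 1024,
				new String[] { "worker-1", "worker-2" });

		// The wrapper now takes an explicit split number and extends
		// LocatableInputSplit instead of implementing the plain InputSplit.
		HadoopInputSplit split = new HadoopInputSplit(0, fileSplit, new JobConf());

		// getHostnames() delegates to the wrapped Hadoop split's getLocations(),
		// which is the information the LocatableInputSplitAssigner works with.
		for (String host : split.getHostnames()) {
			System.out.println(host); // prints worker-1 and worker-2
		}
	}
}

Because getInputSplitAssigner() now returns a LocatableInputSplitAssigner instead of a DefaultInputSplitAssigner, splits whose host names match a requesting task manager can be handed out locally rather than in arbitrary order, which is the local split assignment this commit adds.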
