[FLINK-1236] [hadoop compatibility] Add local split assignment support for HadoopInputFormats

This closes #267


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/92ceacd2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/92ceacd2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/92ceacd2

Branch: refs/heads/master
Commit: 92ceacd23749fa4113a2f778ff68fa5881313b25
Parents: 93eadca
Author: Fabian Hueske <[email protected]>
Authored: Mon Dec 15 15:18:27 2014 +0100
Committer: Stephan Ewen <[email protected]>
Committed: Tue Jan 6 13:13:42 2015 +0100

----------------------------------------------------------------------
 .../mapred/HadoopInputFormat.java               |  6 ++---
 .../mapred/record/HadoopRecordInputFormat.java  |  2 +-
 .../mapred/wrapper/HadoopInputSplit.java        | 25 +++++++++++---------
 .../mapreduce/HadoopInputFormat.java            |  6 ++---
 .../mapreduce/wrapper/HadoopInputSplit.java     | 22 +++++++++++------
 5 files changed, 36 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/92ceacd2/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopInputFormat.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopInputFormat.java
index 8dfda67..d116cdc 100644
--- a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopInputFormat.java
+++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopInputFormat.java
@@ -24,9 +24,9 @@ import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.util.ArrayList;
 
+import org.apache.flink.api.common.io.LocatableInputSplitAssigner;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
 import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.api.common.io.FileInputFormat.FileBaseStatistics;
 import org.apache.flink.api.common.io.statistics.BaseStatistics;
@@ -143,14 +143,14 @@ public class HadoopInputFormat<K extends Writable, V extends Writable> implement
                org.apache.hadoop.mapred.InputSplit[] splitArray = mapredInputFormat.getSplits(jobConf, minNumSplits);
                HadoopInputSplit[] hiSplit = new HadoopInputSplit[splitArray.length];
                for(int i=0;i<splitArray.length;i++){
-                       hiSplit[i] = new HadoopInputSplit(splitArray[i], jobConf);
+                       hiSplit[i] = new HadoopInputSplit(i, splitArray[i], jobConf);
                }
                return hiSplit;
        }
        
        @Override
        public InputSplitAssigner getInputSplitAssigner(HadoopInputSplit[] inputSplits) {
-               return new DefaultInputSplitAssigner(inputSplits);
+               return new LocatableInputSplitAssigner(inputSplits);
        }
        
        @Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/92ceacd2/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopRecordInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopRecordInputFormat.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopRecordInputFormat.java
index f8153a2..275fd4c 100644
--- a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopRecordInputFormat.java
+++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopRecordInputFormat.java
@@ -80,7 +80,7 @@ public class HadoopRecordInputFormat<K, V> implements InputFormat<Record, Hadoop
                org.apache.hadoop.mapred.InputSplit[] splitArray = hadoopInputFormat.getSplits(jobConf, minNumSplits);
                HadoopInputSplit[] hiSplit = new HadoopInputSplit[splitArray.length];
                for(int i=0;i<splitArray.length;i++){
-                       hiSplit[i] = new HadoopInputSplit(splitArray[i], jobConf);
+                       hiSplit[i] = new HadoopInputSplit(i, splitArray[i], jobConf);
                }
                return hiSplit;
        }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/92ceacd2/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopInputSplit.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopInputSplit.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopInputSplit.java
index 77c40f5..aa2155d 100644
--- a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopInputSplit.java
+++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopInputSplit.java
@@ -23,7 +23,7 @@ import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 
-import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.core.io.LocatableInputSplit;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.hadoop.conf.Configurable;
@@ -31,7 +31,7 @@ import org.apache.hadoop.io.WritableFactories;
 import org.apache.hadoop.mapred.JobConf;
 
 
-public class HadoopInputSplit implements InputSplit {
+public class HadoopInputSplit extends LocatableInputSplit {
 
        private static final long serialVersionUID = 1L;
 
@@ -52,10 +52,13 @@ public class HadoopInputSplit implements InputSplit {
                super();
        }
 
-       public HadoopInputSplit(org.apache.hadoop.mapred.InputSplit hInputSplit, JobConf jobconf) {
+       public HadoopInputSplit(int splitNumber, org.apache.hadoop.mapred.InputSplit hInputSplit, JobConf jobconf) {
+
+               this.splitNumber = splitNumber;
                this.hadoopInputSplit = hInputSplit;
                this.hadoopInputSplitTypeName = hInputSplit.getClass().getName();
                this.jobConf = jobconf;
+
        }
 
        @Override
@@ -68,7 +71,7 @@ public class HadoopInputSplit implements InputSplit {
 
        @Override
        public void read(DataInputView in) throws IOException {
-               this.splitNumber=in.readInt();
+               this.splitNumber = in.readInt();
                this.hadoopInputSplitTypeName = in.readUTF();
                if(hadoopInputSplit == null) {
                        try {
@@ -124,12 +127,12 @@ public class HadoopInputSplit implements InputSplit {
                return this.splitNumber;
        }
 
-       public void setSplitNumber(int splitNumber) {
-               this.splitNumber = splitNumber;
-       }
-
-       public void setHadoopInputSplit(
-                       org.apache.hadoop.mapred.InputSplit hadoopInputSplit) {
-               this.hadoopInputSplit = hadoopInputSplit;
+       @Override
+       public String[] getHostnames() {
+               try {
+                       return this.hadoopInputSplit.getLocations();
+               } catch(IOException ioe) {
+                       return new String[0];
+               }
        }
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/92ceacd2/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopInputFormat.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopInputFormat.java
index 280aaf9..23e8aae 100644
--- a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopInputFormat.java
+++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopInputFormat.java
@@ -25,9 +25,9 @@ import java.io.ObjectOutputStream;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.flink.api.common.io.LocatableInputSplitAssigner;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
 import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.api.common.io.FileInputFormat.FileBaseStatistics;
 import org.apache.flink.api.common.io.statistics.BaseStatistics;
@@ -162,14 +162,14 @@ public class HadoopInputFormat<K extends Writable, V extends Writable> implement
                HadoopInputSplit[] hadoopInputSplits = new HadoopInputSplit[splits.size()];
                
                for(int i = 0; i < hadoopInputSplits.length; i++){
-                       hadoopInputSplits[i] = new HadoopInputSplit(splits.get(i), jobContext);
+                       hadoopInputSplits[i] = new HadoopInputSplit(i, splits.get(i), jobContext);
                }
                return hadoopInputSplits;
        }
        
        @Override
        public InputSplitAssigner getInputSplitAssigner(HadoopInputSplit[] inputSplits) {
-               return new DefaultInputSplitAssigner(inputSplits);
+               return new LocatableInputSplitAssigner(inputSplits);
        }
        
        @Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/92ceacd2/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/wrapper/HadoopInputSplit.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/wrapper/HadoopInputSplit.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/wrapper/HadoopInputSplit.java
index 5afd89b..7477c28 100644
--- a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/wrapper/HadoopInputSplit.java
+++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/wrapper/HadoopInputSplit.java
@@ -23,7 +23,7 @@ import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 
-import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.core.io.LocatableInputSplit;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.hadoop.io.Writable;
@@ -31,12 +31,12 @@ import org.apache.hadoop.io.WritableFactories;
 import org.apache.hadoop.mapreduce.JobContext;
 
 
-public class HadoopInputSplit implements InputSplit {
+public class HadoopInputSplit extends LocatableInputSplit {
        
        public transient org.apache.hadoop.mapreduce.InputSplit mapreduceInputSplit;
        public transient JobContext jobContext;
        
-       private int splitNumber;        
+       private int splitNumber;
        
        public org.apache.hadoop.mapreduce.InputSplit getHadoopInputSplit() {
                return mapreduceInputSplit;
@@ -48,7 +48,8 @@ public class HadoopInputSplit implements InputSplit {
        }
        
        
-       public HadoopInputSplit(org.apache.hadoop.mapreduce.InputSplit mapreduceInputSplit, JobContext jobContext) {
+       public HadoopInputSplit(int splitNumber, org.apache.hadoop.mapreduce.InputSplit mapreduceInputSplit, JobContext jobContext) {
+               this.splitNumber = splitNumber;
                if(!(mapreduceInputSplit instanceof Writable)) {
                        throw new IllegalArgumentException("InputSplit must implement Writable interface.");
                }
@@ -66,7 +67,7 @@ public class HadoopInputSplit implements InputSplit {
        
        @Override
        public void read(DataInputView in) throws IOException {
-               this.splitNumber=in.readInt();
+               this.splitNumber = in.readInt();
                String className = in.readUTF();
                
                if(this.mapreduceInputSplit == null) {
@@ -111,7 +112,14 @@ public class HadoopInputSplit implements InputSplit {
                return this.splitNumber;
        }
        
-       public void setSplitNumber(int splitNumber) {
-               this.splitNumber = splitNumber;
+       @Override
+       public String[] getHostnames() {
+               try {
+                       return this.mapreduceInputSplit.getLocations();
+               } catch (IOException e) {
+                       return new String[0];
+               } catch (InterruptedException e) {
+                       return new String[0];
+               }
        }
 }

Reply via email to