Repository: flink Updated Branches: refs/heads/master 84b39dcb5 -> 91675296e
[FLINK-2353] Respect JobConfigurable interface in Hadoop mapred wrappers This closes #908 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/91675296 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/91675296 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/91675296 Branch: refs/heads/master Commit: 91675296e1d0fcc503ab3af9c5da7fdb83b78fc5 Parents: 84b39dc Author: Fabian Hueske <fhue...@apache.org> Authored: Tue Jul 14 00:59:20 2015 +0200 Committer: Fabian Hueske <fhue...@apache.org> Committed: Tue Jul 14 01:00:24 2015 +0200 ---------------------------------------------------------------------- .../flink/api/java/hadoop/mapred/HadoopInputFormatBase.java | 9 ++++++++- .../api/java/hadoop/mapred/HadoopOutputFormatBase.java | 7 ++++++- .../api/java/hadoop/mapred/wrapper/HadoopInputSplit.java | 4 ++++ 3 files changed, 18 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/91675296/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatBase.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatBase.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatBase.java index 40f6631..d5dbf38 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatBase.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatBase.java @@ -34,6 +34,7 @@ import org.apache.flink.core.io.InputSplitAssigner; import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.JobConfigurable; import org.apache.hadoop.mapred.RecordReader; import org.apache.hadoop.util.ReflectionUtils; import org.slf4j.Logger; @@ -82,7 +83,13 @@ public abstract class HadoopInputFormatBase<K, V, T> implements InputFormat<T, H @Override public void configure(Configuration parameters) { - // nothing to do + // configure MR InputFormat if necessary + if(this.mapredInputFormat instanceof Configurable) { + ((Configurable)this.mapredInputFormat).setConf(this.jobConf); + } + else if(this.mapredInputFormat instanceof JobConfigurable) { + ((JobConfigurable)this.mapredInputFormat).configure(this.jobConf); + } } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/91675296/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormatBase.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormatBase.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormatBase.java index a6a318c..d6dfc2e 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormatBase.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormatBase.java @@ -28,6 +28,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.mapred.FileOutputCommitter; import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.JobConfigurable; import org.apache.hadoop.mapred.JobContext; import org.apache.hadoop.mapred.JobID; import org.apache.hadoop.mapred.RecordWriter; @@ -67,9 +68,13 @@ public abstract class HadoopOutputFormatBase<K, V, T> implements OutputFormat<T> @Override public void configure(Configuration parameters) { - if(this.mapredOutputFormat instanceof Configurable){ + // configure MR OutputFormat if necessary + if(this.mapredOutputFormat instanceof Configurable) { ((Configurable)this.mapredOutputFormat).setConf(this.jobConf); } + else if(this.mapredOutputFormat instanceof JobConfigurable) { + ((JobConfigurable)this.mapredOutputFormat).configure(this.jobConf); + } } /** http://git-wip-us.apache.org/repos/asf/flink/blob/91675296/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/wrapper/HadoopInputSplit.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/wrapper/HadoopInputSplit.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/wrapper/HadoopInputSplit.java index dee5452..d949dfd 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/wrapper/HadoopInputSplit.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/wrapper/HadoopInputSplit.java @@ -28,6 +28,7 @@ import org.apache.flink.core.io.LocatableInputSplit; import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.io.WritableFactories; import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.JobConfigurable; /** * A wrapper that represents an input split from the Hadoop mapred API as @@ -113,6 +114,9 @@ public class HadoopInputSplit extends LocatableInputSplit { if (hadoopInputSplit instanceof Configurable) { ((Configurable) hadoopInputSplit).setConf(this.jobConf); } + else if (hadoopInputSplit instanceof JobConfigurable) { + ((JobConfigurable) hadoopInputSplit).configure(this.jobConf); + } hadoopInputSplit.readFields(in); } }