Repository: flink
Updated Branches:
  refs/heads/master 84b39dcb5 -> 91675296e


[FLINK-2353] Respect JobConfigurable interface in Hadoop mapred wrappers

This closes #908


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/91675296
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/91675296
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/91675296

Branch: refs/heads/master
Commit: 91675296e1d0fcc503ab3af9c5da7fdb83b78fc5
Parents: 84b39dc
Author: Fabian Hueske <fhue...@apache.org>
Authored: Tue Jul 14 00:59:20 2015 +0200
Committer: Fabian Hueske <fhue...@apache.org>
Committed: Tue Jul 14 01:00:24 2015 +0200

----------------------------------------------------------------------
 .../flink/api/java/hadoop/mapred/HadoopInputFormatBase.java | 9 ++++++++-
 .../api/java/hadoop/mapred/HadoopOutputFormatBase.java      | 7 ++++++-
 .../api/java/hadoop/mapred/wrapper/HadoopInputSplit.java    | 4 ++++
 3 files changed, 18 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/91675296/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatBase.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatBase.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatBase.java
index 40f6631..d5dbf38 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatBase.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatBase.java
@@ -34,6 +34,7 @@ import org.apache.flink.core.io.InputSplitAssigner;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobConfigurable;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.slf4j.Logger;
@@ -82,7 +83,13 @@ public abstract class HadoopInputFormatBase<K, V, T> implements InputFormat<T, H
        
        @Override
        public void configure(Configuration parameters) {
-               // nothing to do
+               // configure MR InputFormat if necessary
+               if(this.mapredInputFormat instanceof Configurable) {
+                       ((Configurable)this.mapredInputFormat).setConf(this.jobConf);
+               }
+               else if(this.mapredInputFormat instanceof JobConfigurable) {
+                       ((JobConfigurable)this.mapredInputFormat).configure(this.jobConf);
+               }
        }
        
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/91675296/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormatBase.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormatBase.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormatBase.java
index a6a318c..d6dfc2e 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormatBase.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormatBase.java
@@ -28,6 +28,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.mapred.FileOutputCommitter;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobConfigurable;
 import org.apache.hadoop.mapred.JobContext;
 import org.apache.hadoop.mapred.JobID;
 import org.apache.hadoop.mapred.RecordWriter;
@@ -67,9 +68,13 @@ public abstract class HadoopOutputFormatBase<K, V, T> implements OutputFormat<T>
 
        @Override
        public void configure(Configuration parameters) {
-               if(this.mapredOutputFormat instanceof Configurable){
+               // configure MR OutputFormat if necessary
+               if(this.mapredOutputFormat instanceof Configurable) {
                        ((Configurable)this.mapredOutputFormat).setConf(this.jobConf);
                }
+               else if(this.mapredOutputFormat instanceof JobConfigurable) {
+                       ((JobConfigurable)this.mapredOutputFormat).configure(this.jobConf);
+               }
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/91675296/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/wrapper/HadoopInputSplit.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/wrapper/HadoopInputSplit.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/wrapper/HadoopInputSplit.java
index dee5452..d949dfd 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/wrapper/HadoopInputSplit.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/wrapper/HadoopInputSplit.java
@@ -28,6 +28,7 @@ import org.apache.flink.core.io.LocatableInputSplit;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.io.WritableFactories;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobConfigurable;
 
 /**
  * A wrapper that represents an input split from the Hadoop mapred API as
@@ -113,6 +114,9 @@ public class HadoopInputSplit extends LocatableInputSplit {
                if (hadoopInputSplit instanceof Configurable) {
                        ((Configurable) hadoopInputSplit).setConf(this.jobConf);
                }
+               else if (hadoopInputSplit instanceof JobConfigurable) {
+                       ((JobConfigurable) hadoopInputSplit).configure(this.jobConf);
+               }
                hadoopInputSplit.readFields(in);
        }
 }

Reply via email to