Repository: flink
Updated Branches:
  refs/heads/master d62ab4753 -> ba7a19c10


[FLINK-1271] [hadoop] Remove Writable limitation from Hadoop format and function wrappers

This closes #287
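
For context, a sketch of what this change enables (illustrative only, not part of the commit): the wrappers previously required <K extends Writable, V extends Writable>, and type information was hard-wired to WritableTypeInfo; both are now obtained through Flink's TypeExtractor, so plain Java types work as well. The input format variable below is a hypothetical stand-in, mirroring the DummyVoidKeyInputFormat used by the new tests at the end of this diff.

    // Sketch only: Void/Long would not have compiled against the old Writable bounds.
    import org.apache.flink.hadoopcompatibility.mapreduce.HadoopInputFormat;
    import org.apache.hadoop.mapreduce.Job;

    public class NonWritableTypesSketch {
        public static void main(String[] args) throws Exception {
            // Stand-in for any mapreduce InputFormat<Void, Long>, e.g. a
            // Parquet-style format with a Void key (hypothetical here).
            org.apache.hadoop.mapreduce.InputFormat<Void, Long> voidKeyFormat = null;
            HadoopInputFormat<Void, Long> format = new HadoopInputFormat<Void, Long>(
                    voidKeyFormat, Void.class, Long.class, Job.getInstance());
            // getProducedType() now resolves Tuple2<Void, Long> via TypeExtractor
            // instead of rejecting non-Writable classes at compile time.
            System.out.println(format.getProducedType());
        }
    }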


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ba7a19c1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ba7a19c1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ba7a19c1

Branch: refs/heads/master
Commit: ba7a19c10df9f2abb5fe9828e57f46a49cbcfd18
Parents: d62ab47
Author: FelixNeutatz <neut...@googlemail.com>
Authored: Tue Jan 6 20:47:00 2015 +0100
Committer: Fabian Hueske <fhue...@apache.org>
Committed: Thu Jan 15 17:34:22 2015 +0100

----------------------------------------------------------------------
 .../mapred/HadoopInputFormat.java               |   7 +-
 .../mapred/HadoopMapFunction.java               |  10 +-
 .../mapred/HadoopOutputFormat.java              |   3 +-
 .../mapred/HadoopReduceCombineFunction.java     |  12 +--
 .../mapred/HadoopReduceFunction.java            |  12 +--
 .../mapred/wrapper/HadoopOutputCollector.java   |   6 +-
 .../wrapper/HadoopTupleUnwrappingIterator.java  |  12 +--
 .../mapreduce/HadoopInputFormat.java            | 107 +++++++++----------
 .../mapreduce/HadoopOutputFormat.java           |   3 +-
 .../mapred/HadoopInputFormatTest.java           |  82 ++++++++++++++
 .../mapreduce/HadoopInputFormatTest.java        |  84 +++++++++++++++
 11 files changed, 242 insertions(+), 96 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ba7a19c1/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopInputFormat.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopInputFormat.java
index d116cdc..326a1c4 100644
--- a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopInputFormat.java
+++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopInputFormat.java
@@ -25,6 +25,7 @@ import java.io.ObjectOutputStream;
 import java.util.ArrayList;
 
 import org.apache.flink.api.common.io.LocatableInputSplitAssigner;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.flink.api.common.io.InputFormat;
@@ -34,7 +35,6 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.api.java.typeutils.WritableTypeInfo;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.FileSystem;
@@ -44,13 +44,12 @@ import org.apache.flink.hadoopcompatibility.mapred.utils.HadoopUtils;
 import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopDummyReporter;
 import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopInputSplit;
 import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.util.ReflectionUtils;
 
-public class HadoopInputFormat<K extends Writable, V extends Writable> implements InputFormat<Tuple2<K,V>, HadoopInputSplit>, ResultTypeQueryable<Tuple2<K,V>> {
+public class HadoopInputFormat<K, V> implements InputFormat<Tuple2<K,V>, HadoopInputSplit>, ResultTypeQueryable<Tuple2<K,V>> {
        
        private static final long serialVersionUID = 1L;
        
@@ -293,6 +292,6 @@ public class HadoopInputFormat<K extends Writable, V extends Writable> implement
 	
 	@Override
 	public TypeInformation<Tuple2<K,V>> getProducedType() {
-		return new TupleTypeInfo<Tuple2<K,V>>(new WritableTypeInfo<K>((Class<K>) keyClass), new WritableTypeInfo<V>((Class<V>) valueClass));
+		return new TupleTypeInfo<Tuple2<K,V>>(TypeExtractor.createTypeInfo(keyClass), TypeExtractor.createTypeInfo(valueClass));
        }
 }
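
A brief aside on backward compatibility (my reading of TypeExtractor in this Flink version, hedged accordingly): the extractor dispatches on the class, so Writable implementors should still resolve to a WritableTypeInfo, while ordinary classes resolve to e.g. BasicTypeInfo; either result satisfies the TupleTypeInfo constructor used in getProducedType() above.

    // Sketch, assuming TypeExtractor's class-based dispatch:
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.typeutils.TypeExtractor;
    import org.apache.hadoop.io.Text;

    public class TypeDispatchSketch {
        public static void main(String[] args) {
            // Writable classes keep their Writable-based type information ...
            TypeInformation<Text> writableType = TypeExtractor.getForClass(Text.class);
            // ... while plain Java classes get basic type information.
            TypeInformation<Long> basicType = TypeExtractor.getForClass(Long.class);
            System.out.println(writableType + " / " + basicType);
        }
    }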

http://git-wip-us.apache.org/repos/asf/flink/blob/ba7a19c1/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopMapFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopMapFunction.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopMapFunction.java
index 9bc36f3..dfe0067 100644
--- a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopMapFunction.java
+++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopMapFunction.java
@@ -29,14 +29,11 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.api.java.typeutils.WritableTypeInfo;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopDummyReporter;
 import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopOutputCollector;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.InstantiationUtil;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Mapper;
 import org.apache.hadoop.mapred.Reporter;
@@ -45,8 +42,7 @@ import org.apache.hadoop.mapred.Reporter;
  * This wrapper maps a Hadoop Mapper (mapred API) to a Flink FlatMapFunction. 
  */
 @SuppressWarnings("rawtypes")
-public final class HadoopMapFunction<KEYIN extends WritableComparable, VALUEIN extends Writable, 
-									KEYOUT extends WritableComparable, VALUEOUT extends Writable> 
+public final class HadoopMapFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT> 
 					extends RichFlatMapFunction<Tuple2<KEYIN,VALUEIN>, Tuple2<KEYOUT,VALUEOUT>> 
 					implements ResultTypeQueryable<Tuple2<KEYOUT,VALUEOUT>>, Serializable {
 
@@ -108,8 +104,8 @@ public final class HadoopMapFunction<KEYIN extends WritableComparable, VALUEIN e
 		Class<KEYOUT> outKeyClass = (Class<KEYOUT>) TypeExtractor.getParameterType(Mapper.class, mapper.getClass(), 2);
 		Class<VALUEOUT> outValClass = (Class<VALUEOUT>)TypeExtractor.getParameterType(Mapper.class, mapper.getClass(), 3);
 		
-		final WritableTypeInfo<KEYOUT> keyTypeInfo = new WritableTypeInfo<KEYOUT>(outKeyClass);
-		final WritableTypeInfo<VALUEOUT> valueTypleInfo = new WritableTypeInfo<VALUEOUT>(outValClass);
+		final TypeInformation<KEYOUT> keyTypeInfo = TypeExtractor.getForClass((Class<KEYOUT>) outKeyClass);
+		final TypeInformation<VALUEOUT> valueTypleInfo = TypeExtractor.getForClass((Class<VALUEOUT>) outValClass);
 		return new TupleTypeInfo<Tuple2<KEYOUT,VALUEOUT>>(keyTypeInfo, valueTypleInfo);
 	}
        

http://git-wip-us.apache.org/repos/asf/flink/blob/ba7a19c1/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopOutputFormat.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopOutputFormat.java
index 64c539b..f3abfcd 100644
--- a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopOutputFormat.java
+++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopOutputFormat.java
@@ -30,7 +30,6 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.hadoopcompatibility.mapred.utils.HadoopUtils;
 import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopDummyProgressable;
 import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopDummyReporter;
-import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.FileOutputCommitter;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.JobContext;
@@ -41,7 +40,7 @@ import org.apache.hadoop.mapred.TaskAttemptID;
 import org.apache.hadoop.util.ReflectionUtils;
 
 
-public class HadoopOutputFormat<K extends Writable,V extends Writable> implements OutputFormat<Tuple2<K, V>>, FinalizeOnMaster {
+public class HadoopOutputFormat<K,V> implements OutputFormat<Tuple2<K,V>>, FinalizeOnMaster {
        
        private static final long serialVersionUID = 1L;
        

http://git-wip-us.apache.org/repos/asf/flink/blob/ba7a19c1/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceCombineFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceCombineFunction.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceCombineFunction.java
index 5d83bad..aa9f048 100644
--- a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceCombineFunction.java
+++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceCombineFunction.java
@@ -29,15 +29,12 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.api.java.typeutils.WritableTypeInfo;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopDummyReporter;
 import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopOutputCollector;
 import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopTupleUnwrappingIterator;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.InstantiationUtil;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Reducer;
 import org.apache.hadoop.mapred.Reporter;
@@ -47,8 +44,7 @@ import org.apache.hadoop.mapred.Reporter;
  */
 @SuppressWarnings("rawtypes")
 @org.apache.flink.api.common.functions.RichGroupReduceFunction.Combinable
-public final class HadoopReduceCombineFunction<KEYIN extends WritableComparable, VALUEIN extends Writable,
-												KEYOUT extends WritableComparable, VALUEOUT extends Writable> 
+public final class HadoopReduceCombineFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT> 
 					extends RichGroupReduceFunction<Tuple2<KEYIN,VALUEIN>,Tuple2<KEYOUT,VALUEOUT>> 
 					implements ResultTypeQueryable<Tuple2<KEYOUT,VALUEOUT>>, Serializable {
 
@@ -132,9 +128,9 @@ public final class HadoopReduceCombineFunction<KEYIN extends WritableComparable,
 	public TypeInformation<Tuple2<KEYOUT,VALUEOUT>> getProducedType() {
 		Class<KEYOUT> outKeyClass = (Class<KEYOUT>) TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 2);
 		Class<VALUEOUT> outValClass = (Class<VALUEOUT>)TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 3);
-		
-		final WritableTypeInfo<KEYOUT> keyTypeInfo = new WritableTypeInfo<KEYOUT>(outKeyClass);
-		final WritableTypeInfo<VALUEOUT> valueTypleInfo = new WritableTypeInfo<VALUEOUT>(outValClass);
+
+		final TypeInformation<KEYOUT> keyTypeInfo = TypeExtractor.getForClass((Class<KEYOUT>) outKeyClass);
+		final TypeInformation<VALUEOUT> valueTypleInfo = TypeExtractor.getForClass((Class<VALUEOUT>) outValClass);
 		return new TupleTypeInfo<Tuple2<KEYOUT,VALUEOUT>>(keyTypeInfo, valueTypleInfo);
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ba7a19c1/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceFunction.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceFunction.java
index 1f0aedd..d9797c3 100644
--- a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceFunction.java
+++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceFunction.java
@@ -29,15 +29,12 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.api.java.typeutils.WritableTypeInfo;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopDummyReporter;
 import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopOutputCollector;
 import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopTupleUnwrappingIterator;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.InstantiationUtil;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Reducer;
 import org.apache.hadoop.mapred.Reporter;
@@ -46,8 +43,7 @@ import org.apache.hadoop.mapred.Reporter;
  * This wrapper maps a Hadoop Reducer (mapred API) to a non-combinable Flink GroupReduceFunction. 
  */
 @SuppressWarnings("rawtypes")
-public final class HadoopReduceFunction<KEYIN extends WritableComparable, VALUEIN extends Writable,
-									KEYOUT extends WritableComparable, VALUEOUT extends Writable> 
+public final class HadoopReduceFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT> 
 					extends RichGroupReduceFunction<Tuple2<KEYIN,VALUEIN>,Tuple2<KEYOUT,VALUEOUT>> 
 					implements ResultTypeQueryable<Tuple2<KEYOUT,VALUEOUT>>, Serializable {
 
@@ -113,9 +109,9 @@ public final class HadoopReduceFunction<KEYIN extends WritableComparable, VALUEI
 	public TypeInformation<Tuple2<KEYOUT,VALUEOUT>> getProducedType() {
 		Class<KEYOUT> outKeyClass = (Class<KEYOUT>) TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 2);
 		Class<VALUEOUT> outValClass = (Class<VALUEOUT>)TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 3);
-		
-		final WritableTypeInfo<KEYOUT> keyTypeInfo = new WritableTypeInfo<KEYOUT>(outKeyClass);
-		final WritableTypeInfo<VALUEOUT> valueTypleInfo = new WritableTypeInfo<VALUEOUT>(outValClass);
+
+		final TypeInformation<KEYOUT> keyTypeInfo = TypeExtractor.getForClass((Class<KEYOUT>) outKeyClass);
+		final TypeInformation<VALUEOUT> valueTypleInfo = TypeExtractor.getForClass((Class<VALUEOUT>) outValClass);
 		return new TupleTypeInfo<Tuple2<KEYOUT,VALUEOUT>>(keyTypeInfo, valueTypleInfo);
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ba7a19c1/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopOutputCollector.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopOutputCollector.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopOutputCollector.java
index 280708f..fcb6841 100644
--- a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopOutputCollector.java
+++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopOutputCollector.java
@@ -20,8 +20,6 @@ package org.apache.flink.hadoopcompatibility.mapred.wrapper;
 
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.util.Collector;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.OutputCollector;
 
 import java.io.IOException;
@@ -32,7 +30,7 @@ import java.io.IOException;
  * 
  */
 @SuppressWarnings("rawtypes")
-public final class HadoopOutputCollector<KEY extends WritableComparable, VALUE extends Writable>
+public final class HadoopOutputCollector<KEY,VALUE>
                implements OutputCollector<KEY,VALUE> {
 
        private Collector<Tuple2<KEY,VALUE>> flinkCollector;
@@ -63,4 +61,4 @@ public final class HadoopOutputCollector<KEY extends WritableComparable, VALUE e
                this.flinkCollector.collect(outTuple);
        }
        
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ba7a19c1/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIterator.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIterator.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIterator.java
index 83afe39..5ecac2e 100644
--- a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIterator.java
+++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIterator.java
@@ -20,32 +20,30 @@ package org.apache.flink.hadoopcompatibility.mapred.wrapper;
 
 import java.util.Iterator;
 
+import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.operators.translation.TupleUnwrappingIterator;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.runtime.WritableSerializer;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
+import org.apache.flink.api.java.typeutils.TypeExtractor;
 
 /**
  * Wraps a Flink Tuple2 (key-value-pair) iterator into an iterator over the second (value) field.
  */
 @SuppressWarnings("rawtypes")
-public class HadoopTupleUnwrappingIterator<KEY extends WritableComparable, VALUE extends Writable> 
+public class HadoopTupleUnwrappingIterator<KEY,VALUE> 
 									extends TupleUnwrappingIterator<VALUE, KEY> implements java.io.Serializable {
 
        private static final long serialVersionUID = 1L;
        
        private Iterator<Tuple2<KEY,VALUE>> iterator;
        
-       private final WritableSerializer<KEY> keySerializer;
+       private final TypeSerializer<KEY> keySerializer;
        
        private boolean atFirst = false;
        private KEY curKey = null;
        private VALUE firstValue = null;
        
        public HadoopTupleUnwrappingIterator(Class<KEY> keyClass) {
-               this.keySerializer = new WritableSerializer<KEY>(keyClass);
+		this.keySerializer = TypeExtractor.getForClass((Class<KEY>) keyClass).createSerializer();
        }
        
        /**

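A note on the serializer change above (a sketch against the API shown in the hunk, where TypeInformation.createSerializer() takes no arguments): the key serializer is now derived generically rather than hard-wired to WritableSerializer, so any key type the extractor supports will do.

    // Sketch only; Long stands in for any extractable key type.
    import org.apache.flink.api.common.typeutils.TypeSerializer;
    import org.apache.flink.api.java.typeutils.TypeExtractor;

    public class KeySerializerSketch {
        public static void main(String[] args) {
            // Previously only new WritableSerializer<KEY>(keyClass) was possible.
            TypeSerializer<Long> keySerializer =
                    TypeExtractor.getForClass(Long.class).createSerializer();
            System.out.println(keySerializer);
        }
    }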
http://git-wip-us.apache.org/repos/asf/flink/blob/ba7a19c1/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopInputFormat.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopInputFormat.java
index 23e8aae..20006b8 100644
--- a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopInputFormat.java
+++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopInputFormat.java
@@ -35,7 +35,6 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.api.java.typeutils.WritableTypeInfo;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.FileSystem;
@@ -43,7 +42,6 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.io.InputSplitAssigner;
 import org.apache.flink.hadoopcompatibility.mapreduce.utils.HadoopUtils;
 import org.apache.flink.hadoopcompatibility.mapreduce.wrapper.HadoopInputSplit;
-import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.JobID;
@@ -51,26 +49,27 @@ import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+
+public class HadoopInputFormat<K, V> implements InputFormat<Tuple2<K,V>, HadoopInputSplit>, ResultTypeQueryable<Tuple2<K,V>> {
 
-public class HadoopInputFormat<K extends Writable, V extends Writable> implements InputFormat<Tuple2<K,V>, HadoopInputSplit>, ResultTypeQueryable<Tuple2<K,V>> {
-	
 	private static final long serialVersionUID = 1L;
-	
+
 	private static final Logger LOG = LoggerFactory.getLogger(HadoopInputFormat.class);
-	
+
 	private org.apache.hadoop.mapreduce.InputFormat<K, V> mapreduceInputFormat;
        private Class<K> keyClass;
        private Class<V> valueClass;
        private org.apache.hadoop.conf.Configuration configuration;
-       
+
        private transient RecordReader<K, V> recordReader;
        private boolean fetched = false;
        private boolean hasNext;
-       
+
        public HadoopInputFormat() {
                super();
        }
-       
+
 	public HadoopInputFormat(org.apache.hadoop.mapreduce.InputFormat<K,V> mapreduceInputFormat, Class<K> key, Class<V> value, Job job) {
                super();
                this.mapreduceInputFormat = mapreduceInputFormat;
@@ -79,46 +78,46 @@ public class HadoopInputFormat<K extends Writable, V 
extends Writable> implement
                this.configuration = job.getConfiguration();
                HadoopUtils.mergeHadoopConf(configuration);
        }
-       
+
 	public void setConfiguration(org.apache.hadoop.conf.Configuration configuration) {
                this.configuration = configuration;
        }
-       
+
 	public org.apache.hadoop.mapreduce.InputFormat<K,V> getHadoopInputFormat() {
 		return this.mapreduceInputFormat;
 	}
-	
+
 	public void setHadoopInputFormat(org.apache.hadoop.mapreduce.InputFormat<K,V> mapreduceInputFormat) {
                this.mapreduceInputFormat = mapreduceInputFormat;
        }
-       
+
        public org.apache.hadoop.conf.Configuration getConfiguration() {
                return this.configuration;
        }
-       
+
 	// --------------------------------------------------------------------------------------------
 	//  InputFormat
 	// --------------------------------------------------------------------------------------------
-       
+
        @Override
        public void configure(Configuration parameters) {
                // nothing to do
        }
-       
+
        @Override
 	public BaseStatistics getStatistics(BaseStatistics cachedStats) throws IOException {
                // only gather base statistics for FileInputFormats
                if(!(mapreduceInputFormat instanceof FileInputFormat)) {
                        return null;
                }
-               
+
                JobContext jobContext = null;
                try {
 			jobContext = HadoopUtils.instantiateJobContext(configuration, null);
                } catch (Exception e) {
                        throw new RuntimeException(e);
                }
-               
+
 		final FileBaseStatistics cachedFileStats = (cachedStats != null && cachedStats instanceof FileBaseStatistics) ?
 				(FileBaseStatistics) cachedStats : null;
 				
@@ -127,7 +126,7 @@ public class HadoopInputFormat<K extends Writable, V extends Writable> implement
 					return getFileStats(cachedFileStats, paths, new ArrayList<FileStatus>(1));
                                } catch (IOException ioex) {
                                        if (LOG.isWarnEnabled()) {
-						LOG.warn("Could not determine statistics due to an io error: "
+						LOG.warn("Could not determine statistics due to an io error: " 
 								+ ioex.getMessage());
                                        }
                                } catch (Throwable t) {
@@ -140,19 +139,19 @@ public class HadoopInputFormat<K extends Writable, V extends Writable> implement
                                // no statistics available
                                return null;
        }
-       
+
        @Override
        public HadoopInputSplit[] createInputSplits(int minNumSplits)
 			throws IOException {
 		configuration.setInt("mapreduce.input.fileinputformat.split.minsize", minNumSplits);
-               
+
                JobContext jobContext = null;
                try {
 			jobContext = HadoopUtils.instantiateJobContext(configuration, new JobID());
                } catch (Exception e) {
                        throw new RuntimeException(e);
                }
-               
+
                List<org.apache.hadoop.mapreduce.InputSplit> splits;
                try {
 			splits = this.mapreduceInputFormat.getSplits(jobContext);
@@ -160,18 +159,18 @@ public class HadoopInputFormat<K extends Writable, V extends Writable> implement
                        throw new IOException("Could not get Splits.", e);
                }
 		HadoopInputSplit[] hadoopInputSplits = new HadoopInputSplit[splits.size()];
-               
+
                for(int i = 0; i < hadoopInputSplits.length; i++){
 			hadoopInputSplits[i] = new HadoopInputSplit(i, splits.get(i), jobContext);
                }
                return hadoopInputSplits;
        }
-       
+
        @Override
 	public InputSplitAssigner getInputSplitAssigner(HadoopInputSplit[] inputSplits) {
                return new LocatableInputSplitAssigner(inputSplits);
        }
-       
+
        @Override
        public void open(HadoopInputSplit split) throws IOException {
                TaskAttemptContext context = null;
@@ -180,7 +179,7 @@ public class HadoopInputFormat<K extends Writable, V extends Writable> implement
                } catch(Exception e) {
                        throw new RuntimeException(e);
                }
-               
+
                try {
 			this.recordReader = this.mapreduceInputFormat
 					.createRecordReader(split.getHadoopInputSplit(), context);
@@ -191,7 +190,7 @@ public class HadoopInputFormat<K extends Writable, V extends Writable> implement
                        this.fetched = false;
                }
        }
-       
+
        @Override
        public boolean reachedEnd() throws IOException {
                if(!this.fetched) {
@@ -199,7 +198,7 @@ public class HadoopInputFormat<K extends Writable, V extends Writable> implement
                }
                return !this.hasNext;
        }
-       
+
        private void fetchNext() throws IOException {
                try {
                        this.hasNext = this.recordReader.nextKeyValue();
@@ -209,7 +208,7 @@ public class HadoopInputFormat<K extends Writable, V extends Writable> implement
                        this.fetched = true;
                }
        }
-       
+
        @Override
        public Tuple2<K, V> nextRecord(Tuple2<K, V> record) throws IOException {
                if(!this.fetched) {
@@ -225,38 +224,38 @@ public class HadoopInputFormat<K extends Writable, V extends Writable> implement
 			throw new IOException("Could not get KeyValue pair.", e);
                }
                this.fetched = false;
-               
+
                return record;
        }
-       
+
        @Override
        public void close() throws IOException {
                this.recordReader.close();
        }
-       
+
 	// --------------------------------------------------------------------------------------------
 	//  Helper methods
 	// --------------------------------------------------------------------------------------------
-       
-	private FileBaseStatistics getFileStats(FileBaseStatistics cachedStats, org.apache.hadoop.fs.Path[] hadoopFilePaths,
-			ArrayList<FileStatus> files) throws IOException {
-		
+
+	private FileBaseStatistics getFileStats(FileBaseStatistics cachedStats, org.apache.hadoop.fs.Path[] hadoopFilePaths, 
+											ArrayList<FileStatus> files) throws IOException {
+
                long latestModTime = 0L;
-               
+
 		// get the file info and check whether the cached statistics are still valid.
                for(org.apache.hadoop.fs.Path hadoopPath : hadoopFilePaths) {
-                       
+
                        final Path filePath = new Path(hadoopPath.toUri());
                        final FileSystem fs = FileSystem.get(filePath.toUri());
-                       
+
                        final FileStatus file = fs.getFileStatus(filePath);
 			latestModTime = Math.max(latestModTime, file.getModificationTime());
-                       
+
 			// enumerate all files and check their modification time stamp.
                        if (file.isDir()) {
                                FileStatus[] fss = fs.listStatus(filePath);
                                files.ensureCapacity(files.size() + fss.length);
-                               
+
                                for (FileStatus s : fss) {
                                        if (!s.isDir()) {
                                                files.add(s);
@@ -267,50 +266,50 @@ public class HadoopInputFormat<K extends Writable, V extends Writable> implement
                                files.add(file);
                        }
                }
-               
+
 		// check whether the cached statistics are still valid, if we have any
 		if (cachedStats != null && latestModTime <= cachedStats.getLastModificationTime()) {
                        return cachedStats;
                }
-               
+
                // calculate the whole length
                long len = 0;
                for (FileStatus s : files) {
                        len += s.getLen();
                }
-               
+
                // sanity check
                if (len <= 0) {
                        len = BaseStatistics.SIZE_UNKNOWN;
                }
-               
+
 		return new FileBaseStatistics(latestModTime, len, BaseStatistics.AVG_RECORD_BYTES_UNKNOWN);
        }
-       
+
 	// --------------------------------------------------------------------------------------------
 	//  Custom serialization methods
 	// --------------------------------------------------------------------------------------------
-       
+
        private void writeObject(ObjectOutputStream out) throws IOException {
                out.writeUTF(this.mapreduceInputFormat.getClass().getName());
                out.writeUTF(this.keyClass.getName());
                out.writeUTF(this.valueClass.getName());
                this.configuration.write(out);
        }
-       
+
        @SuppressWarnings("unchecked")
 	private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
                String hadoopInputFormatClassName = in.readUTF();
                String keyClassName = in.readUTF();
                String valueClassName = in.readUTF();
-               
+
 		org.apache.hadoop.conf.Configuration configuration = new org.apache.hadoop.conf.Configuration();
                configuration.readFields(in);
-               
+
                if(this.configuration == null) {
                        this.configuration = configuration;
                }
-               
+
                try {
 			this.mapreduceInputFormat = (org.apache.hadoop.mapreduce.InputFormat<K,V>) Class.forName(hadoopInputFormatClassName, true, Thread.currentThread().getContextClassLoader()).newInstance();
                } catch (Exception e) {
@@ -327,13 +326,13 @@ public class HadoopInputFormat<K extends Writable, V extends Writable> implement
 			throw new RuntimeException("Unable to find value class.", e);
                }
        }
-       
+
 	// --------------------------------------------------------------------------------------------
 	//  ResultTypeQueryable
 	// --------------------------------------------------------------------------------------------
-       
+
        @Override
        public TypeInformation<Tuple2<K,V>> getProducedType() {
-		return new TupleTypeInfo<Tuple2<K,V>>(new WritableTypeInfo<K>((Class<K>) keyClass), new WritableTypeInfo<V>((Class<V>) valueClass));
+		return new TupleTypeInfo<Tuple2<K,V>>(TypeExtractor.createTypeInfo(keyClass), TypeExtractor.createTypeInfo(valueClass));
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ba7a19c1/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopOutputFormat.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopOutputFormat.java
index 402372c..696e1be 100644
--- a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopOutputFormat.java
+++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopOutputFormat.java
@@ -30,7 +30,6 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.hadoopcompatibility.mapreduce.utils.HadoopUtils;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.JobID;
@@ -40,7 +39,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
 
 
-public class HadoopOutputFormat<K extends Writable,V extends Writable> implements OutputFormat<Tuple2<K, V>>, FinalizeOnMaster {
+public class HadoopOutputFormat<K,V> implements OutputFormat<Tuple2<K,V>>, FinalizeOnMaster {
        
        private static final long serialVersionUID = 1L;
        

http://git-wip-us.apache.org/repos/asf/flink/blob/ba7a19c1/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopInputFormatTest.java b/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopInputFormatTest.java
new file mode 100644
index 0000000..00fd1f9
--- /dev/null
+++ b/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopInputFormatTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.test.hadoopcompatibility.mapred;
+
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.hadoopcompatibility.mapred.HadoopInputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.junit.Assert.fail;
+
+
+public class HadoopInputFormatTest {
+
+
+	public class DummyVoidKeyInputFormat<T> extends FileInputFormat<Void, T> {
+
+               public DummyVoidKeyInputFormat() {
+               }
+
+               @Override
+		public org.apache.hadoop.mapred.RecordReader<Void, T> getRecordReader(org.apache.hadoop.mapred.InputSplit inputSplit, JobConf jobConf, Reporter reporter) throws IOException {
+                       return null;
+               }
+       }
+       
+       
+       @Test
+       public void checkTypeInformation() {
+               try {
+			final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+                       // Set up the Hadoop Input Format
+                       Job job = Job.getInstance();
+			HadoopInputFormat<Void, Long> hadoopInputFormat = new HadoopInputFormat<Void, Long>( new DummyVoidKeyInputFormat(), Void.class, Long.class, new JobConf());
+
+			TypeInformation<Tuple2<Void,Long>> tupleType = hadoopInputFormat.getProducedType();
+			TypeInformation<Tuple2<Void,Long>> testTupleType = new TupleTypeInfo<Tuple2<Void,Long>>(BasicTypeInfo.VOID_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO);
+                       
+                       if(tupleType.isTupleType()) {
+				if(!((TupleTypeInfo)tupleType).equals(testTupleType)) {
+					fail("Tuple type information was not set correctly!");
+                               }
+                       } else {
+				fail("Type information was not set to tuple type information!");
+                       }
+
+               }
+               catch (Exception ex) {
+			fail("Test failed due to a " + ex.getClass().getSimpleName() + ": " + ex.getMessage());
+               }
+
+       }
+       
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ba7a19c1/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputFormatTest.java b/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputFormatTest.java
new file mode 100644
index 0000000..d79afaa
--- /dev/null
+++ b/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputFormatTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.test.hadoopcompatibility.mapreduce;
+
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.hadoopcompatibility.mapreduce.HadoopInputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.junit.Assert.fail;
+
+
+
+public class HadoopInputFormatTest {
+
+
+	public class DummyVoidKeyInputFormat<T> extends FileInputFormat<Void, T> {
+
+               public DummyVoidKeyInputFormat() {
+               }
+
+               @Override
+		public RecordReader<Void, T> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
+                       return null;
+               }
+       }
+       
+       
+       @Test
+       public void checkTypeInformation() {
+               try {
+			final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+                       // Set up the Hadoop Input Format
+                       Job job = Job.getInstance();
+			HadoopInputFormat<Void, Long> hadoopInputFormat = new HadoopInputFormat<Void, Long>( new DummyVoidKeyInputFormat(), Void.class, Long.class, job);
+
+			TypeInformation<Tuple2<Void,Long>> tupleType = hadoopInputFormat.getProducedType();
+			TypeInformation<Tuple2<Void,Long>> testTupleType = new TupleTypeInfo<Tuple2<Void,Long>>(BasicTypeInfo.VOID_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO);
+                       
+                       if(tupleType.isTupleType()) {
+				if(!((TupleTypeInfo)tupleType).equals(testTupleType)) {
+					fail("Tuple type information was not set correctly!");
+                               }
+                       } else {
+				fail("Type information was not set to tuple type information!");
+                       }
+
+               }
+               catch (Exception ex) {
+			fail("Test failed due to a " + ex.getClass().getSimpleName() + ": " + ex.getMessage());
+               }
+
+       }
+       
+}
