Repository: flink
Updated Branches:
  refs/heads/master d62ab4753 -> ba7a19c10
[FLINK-1271] [hadoop] Remove Writable limitation from Hadoop format and function wrappers

This closes #287


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ba7a19c1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ba7a19c1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ba7a19c1

Branch: refs/heads/master
Commit: ba7a19c10df9f2abb5fe9828e57f46a49cbcfd18
Parents: d62ab47
Author: FelixNeutatz <neut...@googlemail.com>
Authored: Tue Jan 6 20:47:00 2015 +0100
Committer: Fabian Hueske <fhue...@apache.org>
Committed: Thu Jan 15 17:34:22 2015 +0100

----------------------------------------------------------------------
 .../mapred/HadoopInputFormat.java              |   7 +-
 .../mapred/HadoopMapFunction.java              |  10 +-
 .../mapred/HadoopOutputFormat.java             |   3 +-
 .../mapred/HadoopReduceCombineFunction.java    |  12 +--
 .../mapred/HadoopReduceFunction.java           |  12 +--
 .../mapred/wrapper/HadoopOutputCollector.java  |   6 +-
 .../wrapper/HadoopTupleUnwrappingIterator.java |  12 +--
 .../mapreduce/HadoopInputFormat.java           | 107 +++++++++----------
 .../mapreduce/HadoopOutputFormat.java          |   3 +-
 .../mapred/HadoopInputFormatTest.java          |  82 ++++++++++++++
 .../mapreduce/HadoopInputFormatTest.java       |  84 +++++++++++++++
 11 files changed, 242 insertions(+), 96 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ba7a19c1/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopInputFormat.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopInputFormat.java
index d116cdc..326a1c4 100644
--- a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopInputFormat.java
+++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopInputFormat.java
@@ -25,6 +25,7 @@ import java.io.ObjectOutputStream;
 import java.util.ArrayList;
 
 import org.apache.flink.api.common.io.LocatableInputSplitAssigner;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.flink.api.common.io.InputFormat;
@@ -34,7 +35,6 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.api.java.typeutils.WritableTypeInfo;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.FileSystem;
@@ -44,13 +44,12 @@ import org.apache.flink.hadoopcompatibility.mapred.utils.HadoopUtils;
 import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopDummyReporter;
 import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopInputSplit;
 import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.util.ReflectionUtils;
 
-public class HadoopInputFormat<K extends Writable, V extends Writable> implements InputFormat<Tuple2<K,V>, HadoopInputSplit>, ResultTypeQueryable<Tuple2<K,V>> {
+public class HadoopInputFormat<K, V> implements InputFormat<Tuple2<K,V>, HadoopInputSplit>, ResultTypeQueryable<Tuple2<K,V>> {
 
 	private static final long serialVersionUID = 1L;
 
@@ -293,6 +292,6 @@ public class HadoopInputFormat<K extends Writable, V extends Writable> implement
 
 	@Override
 	public TypeInformation<Tuple2<K,V>> getProducedType() {
-		return new TupleTypeInfo<Tuple2<K,V>>(new WritableTypeInfo<K>((Class<K>) keyClass), new WritableTypeInfo<V>((Class<V>) valueClass));
+		return new TupleTypeInfo<Tuple2<K,V>>(TypeExtractor.createTypeInfo(keyClass), TypeExtractor.createTypeInfo(valueClass));
 	}
 }
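With the Writable bound gone, the produced type of the wrapper above is derived through Flink's TypeExtractor, the same call the commit introduces. A minimal sketch of what that yields (the Text/Long examples are illustrative, not part of the commit):

    // TypeExtractor.createTypeInfo(...) still resolves Writable classes to a
    // WritableTypeInfo and plain Java classes to a BasicTypeInfo, so Writable
    // and non-Writable key/value types both work with the wrapper:
    TypeInformation<?> writableType = TypeExtractor.createTypeInfo(org.apache.hadoop.io.Text.class); // WritableTypeInfo<Text>
    TypeInformation<?> basicType = TypeExtractor.createTypeInfo(Long.class); // BasicTypeInfo.LONG_TYPE_INFO
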

http://git-wip-us.apache.org/repos/asf/flink/blob/ba7a19c1/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopMapFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopMapFunction.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopMapFunction.java
index 9bc36f3..dfe0067 100644
--- a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopMapFunction.java
+++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopMapFunction.java
@@ -29,14 +29,11 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.api.java.typeutils.WritableTypeInfo;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopDummyReporter;
 import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopOutputCollector;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.InstantiationUtil;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Mapper;
 import org.apache.hadoop.mapred.Reporter;
@@ -45,8 +42,7 @@
  * This wrapper maps a Hadoop Mapper (mapred API) to a Flink FlatMapFunction.
  */
 @SuppressWarnings("rawtypes")
-public final class HadoopMapFunction<KEYIN extends WritableComparable, VALUEIN extends Writable,
-									KEYOUT extends WritableComparable, VALUEOUT extends Writable>
+public final class HadoopMapFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
 					extends RichFlatMapFunction<Tuple2<KEYIN,VALUEIN>, Tuple2<KEYOUT,VALUEOUT>>
 					implements ResultTypeQueryable<Tuple2<KEYOUT,VALUEOUT>>, Serializable {
 
@@ -108,8 +104,8 @@ public final class HadoopMapFunction<KEYIN extends WritableComparable, VALUEIN e
 		Class<KEYOUT> outKeyClass = (Class<KEYOUT>) TypeExtractor.getParameterType(Mapper.class, mapper.getClass(), 2);
 		Class<VALUEOUT> outValClass = (Class<VALUEOUT>)TypeExtractor.getParameterType(Mapper.class, mapper.getClass(), 3);
 
-		final WritableTypeInfo<KEYOUT> keyTypeInfo = new WritableTypeInfo<KEYOUT>(outKeyClass);
-		final WritableTypeInfo<VALUEOUT> valueTypleInfo = new WritableTypeInfo<VALUEOUT>(outValClass);
+		final TypeInformation<KEYOUT> keyTypeInfo = TypeExtractor.getForClass((Class<KEYOUT>) outKeyClass);
+		final TypeInformation<VALUEOUT> valueTypleInfo = TypeExtractor.getForClass((Class<VALUEOUT>) outValClass);
 		return new TupleTypeInfo<Tuple2<KEYOUT,VALUEOUT>>(keyTypeInfo, valueTypleInfo);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ba7a19c1/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopOutputFormat.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopOutputFormat.java
index 64c539b..f3abfcd 100644
--- a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopOutputFormat.java
+++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopOutputFormat.java
@@ -30,7 +30,6 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.hadoopcompatibility.mapred.utils.HadoopUtils;
 import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopDummyProgressable;
 import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopDummyReporter;
-import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.FileOutputCommitter;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.JobContext;
@@ -41,7 +40,7 @@ import org.apache.hadoop.mapred.TaskAttemptID;
 import org.apache.hadoop.util.ReflectionUtils;
 
 
-public class HadoopOutputFormat<K extends Writable,V extends Writable> implements OutputFormat<Tuple2<K, V>>, FinalizeOnMaster {
+public class HadoopOutputFormat<K,V> implements OutputFormat<Tuple2<K,V>>, FinalizeOnMaster {
 
 	private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ba7a19c1/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceCombineFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceCombineFunction.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceCombineFunction.java
index 5d83bad..aa9f048 100644
--- a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceCombineFunction.java
+++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceCombineFunction.java
@@ -29,15 +29,12 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.api.java.typeutils.WritableTypeInfo;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopDummyReporter;
 import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopOutputCollector;
 import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopTupleUnwrappingIterator;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.InstantiationUtil;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Reducer;
 import org.apache.hadoop.mapred.Reporter;
@@ -47,8 +44,7 @@ import org.apache.hadoop.mapred.Reporter;
  */
 @SuppressWarnings("rawtypes")
 @org.apache.flink.api.common.functions.RichGroupReduceFunction.Combinable
-public final class HadoopReduceCombineFunction<KEYIN extends WritableComparable, VALUEIN extends Writable,
-												KEYOUT extends WritableComparable, VALUEOUT extends Writable>
+public final class HadoopReduceCombineFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
 					extends RichGroupReduceFunction<Tuple2<KEYIN,VALUEIN>,Tuple2<KEYOUT,VALUEOUT>>
 					implements ResultTypeQueryable<Tuple2<KEYOUT,VALUEOUT>>, Serializable {
 
@@ -132,9 +128,9 @@ public final class HadoopReduceCombineFunction<KEYIN extends WritableComparable,
 	public TypeInformation<Tuple2<KEYOUT,VALUEOUT>> getProducedType() {
 		Class<KEYOUT> outKeyClass = (Class<KEYOUT>) TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 2);
 		Class<VALUEOUT> outValClass = (Class<VALUEOUT>)TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 3);
-		
+
-		final WritableTypeInfo<KEYOUT> keyTypeInfo = new WritableTypeInfo<KEYOUT>(outKeyClass);
-		final WritableTypeInfo<VALUEOUT> valueTypleInfo = new WritableTypeInfo<VALUEOUT>(outValClass);
+		final TypeInformation<KEYOUT> keyTypeInfo = TypeExtractor.getForClass((Class<KEYOUT>) outKeyClass);
+		final TypeInformation<VALUEOUT> valueTypleInfo = TypeExtractor.getForClass((Class<VALUEOUT>) outValClass);
 		return new TupleTypeInfo<Tuple2<KEYOUT,VALUEOUT>>(keyTypeInfo, valueTypleInfo);
 	}
 
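With the Writable bounds removed from the wrappers above, existing Hadoop functions plug into a Flink program as before. A usage sketch, where Tokenizer and Counter stand for arbitrary user-provided implementations of org.apache.hadoop.mapred.Mapper and Reducer (both names are assumptions, not part of this commit):

    // input: a DataSet<Tuple2<LongWritable, Text>>, e.g. read via the mapred HadoopInputFormat
    DataSet<Tuple2<Text, LongWritable>> counts = input
        .flatMap(new HadoopMapFunction<LongWritable, Text, Text, LongWritable>(new Tokenizer()))
        .groupBy(0)
        .reduceGroup(new HadoopReduceCombineFunction<Text, LongWritable, Text, LongWritable>(
                new Counter(), new Counter()));
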

http://git-wip-us.apache.org/repos/asf/flink/blob/ba7a19c1/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceFunction.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceFunction.java
index 1f0aedd..d9797c3 100644
--- a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceFunction.java
+++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceFunction.java
@@ -29,15 +29,12 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.api.java.typeutils.WritableTypeInfo;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopDummyReporter;
 import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopOutputCollector;
 import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopTupleUnwrappingIterator;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.InstantiationUtil;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Reducer;
 import org.apache.hadoop.mapred.Reporter;
@@ -46,8 +43,7 @@
  * This wrapper maps a Hadoop Reducer (mapred API) to a non-combinable Flink GroupReduceFunction.
  */
 @SuppressWarnings("rawtypes")
-public final class HadoopReduceFunction<KEYIN extends WritableComparable, VALUEIN extends Writable,
-										KEYOUT extends WritableComparable, VALUEOUT extends Writable>
+public final class HadoopReduceFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
 					extends RichGroupReduceFunction<Tuple2<KEYIN,VALUEIN>,Tuple2<KEYOUT,VALUEOUT>>
 					implements ResultTypeQueryable<Tuple2<KEYOUT,VALUEOUT>>, Serializable {
 
@@ -113,9 +109,9 @@ public final class HadoopReduceFunction<KEYIN extends WritableComparable, VALUEI
 	public TypeInformation<Tuple2<KEYOUT,VALUEOUT>> getProducedType() {
 		Class<KEYOUT> outKeyClass = (Class<KEYOUT>) TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 2);
 		Class<VALUEOUT> outValClass = (Class<VALUEOUT>)TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 3);
-		
+
-		final WritableTypeInfo<KEYOUT> keyTypeInfo = new WritableTypeInfo<KEYOUT>(outKeyClass);
-		final WritableTypeInfo<VALUEOUT> valueTypleInfo = new WritableTypeInfo<VALUEOUT>(outValClass);
+		final TypeInformation<KEYOUT> keyTypeInfo = TypeExtractor.getForClass((Class<KEYOUT>) outKeyClass);
+		final TypeInformation<VALUEOUT> valueTypleInfo = TypeExtractor.getForClass((Class<VALUEOUT>) outValClass);
 		return new TupleTypeInfo<Tuple2<KEYOUT,VALUEOUT>>(keyTypeInfo, valueTypleInfo);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ba7a19c1/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopOutputCollector.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopOutputCollector.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopOutputCollector.java
index 280708f..fcb6841 100644
--- a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopOutputCollector.java
+++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopOutputCollector.java
@@ -20,8 +20,6 @@ package org.apache.flink.hadoopcompatibility.mapred.wrapper;
 
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.util.Collector;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.OutputCollector;
 
 import java.io.IOException;
@@ -32,7 +30,7 @@ import java.io.IOException;
  *
 */
 @SuppressWarnings("rawtypes")
-public final class HadoopOutputCollector<KEY extends WritableComparable, VALUE extends Writable>
+public final class HadoopOutputCollector<KEY,VALUE>
 		implements OutputCollector<KEY,VALUE> {
 
 	private Collector<Tuple2<KEY,VALUE>> flinkCollector;
@@ -63,4 +61,4 @@ public final class HadoopOutputCollector<KEY extends WritableComparable, VALUE e
 		this.flinkCollector.collect(outTuple);
 	}
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ba7a19c1/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIterator.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIterator.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIterator.java
index 83afe39..5ecac2e 100644
--- a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIterator.java
+++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIterator.java
@@ -20,32 +20,30 @@ package org.apache.flink.hadoopcompatibility.mapred.wrapper;
 
 import java.util.Iterator;
 
+import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.operators.translation.TupleUnwrappingIterator;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.runtime.WritableSerializer;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
+import org.apache.flink.api.java.typeutils.TypeExtractor;
 
 /**
  * Wraps a Flink Tuple2 (key-value-pair) iterator into an iterator over the second (value) field.
  */
 @SuppressWarnings("rawtypes")
-public class HadoopTupleUnwrappingIterator<KEY extends WritableComparable, VALUE extends Writable>
+public class HadoopTupleUnwrappingIterator<KEY,VALUE>
 	extends TupleUnwrappingIterator<VALUE, KEY> implements java.io.Serializable {
 
 	private static final long serialVersionUID = 1L;
 
 	private Iterator<Tuple2<KEY,VALUE>> iterator;
 
-	private final WritableSerializer<KEY> keySerializer;
+	private final TypeSerializer<KEY> keySerializer;
 
 	private boolean atFirst = false;
 	private KEY curKey = null;
 	private VALUE firstValue = null;
 
 	public HadoopTupleUnwrappingIterator(Class<KEY> keyClass) {
-		this.keySerializer = new WritableSerializer<KEY>(keyClass);
+		this.keySerializer = TypeExtractor.getForClass((Class<KEY>) keyClass).createSerializer();
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/ba7a19c1/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopInputFormat.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopInputFormat.java
index 23e8aae..20006b8 100644
--- a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopInputFormat.java
+++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopInputFormat.java
@@ -35,7 +35,6 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.api.java.typeutils.WritableTypeInfo;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.FileSystem;
@@ -43,7 +42,6 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.io.InputSplitAssigner;
 import org.apache.flink.hadoopcompatibility.mapreduce.utils.HadoopUtils;
 import org.apache.flink.hadoopcompatibility.mapreduce.wrapper.HadoopInputSplit;
-import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.JobID;
@@ -51,26 +49,27 @@ import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+
+public class HadoopInputFormat<K, V> implements InputFormat<Tuple2<K,V>, HadoopInputSplit>, ResultTypeQueryable<Tuple2<K,V>> {
 
-public class HadoopInputFormat<K extends Writable, V extends Writable> implements InputFormat<Tuple2<K,V>, HadoopInputSplit>, ResultTypeQueryable<Tuple2<K,V>> {
-	
 	private static final long serialVersionUID = 1L;
-	
+
 	private static final Logger LOG = LoggerFactory.getLogger(HadoopInputFormat.class);
-	
+
 	private org.apache.hadoop.mapreduce.InputFormat<K, V> mapreduceInputFormat;
 	private Class<K> keyClass;
 	private Class<V> valueClass;
 	private org.apache.hadoop.conf.Configuration configuration;
-	
+
 	private transient RecordReader<K, V> recordReader;
 	private boolean fetched = false;
 	private boolean hasNext;
-	
+
 	public HadoopInputFormat() {
 		super();
 	}
-	
+
 	public HadoopInputFormat(org.apache.hadoop.mapreduce.InputFormat<K,V> mapreduceInputFormat, Class<K> key, Class<V> value, Job job) {
 		super();
 		this.mapreduceInputFormat = mapreduceInputFormat;
@@ -79,46 +78,46 @@ public class HadoopInputFormat<K extends Writable, V extends Writable> implement
 		this.configuration = job.getConfiguration();
 		HadoopUtils.mergeHadoopConf(configuration);
 	}
-	
+
 	public void setConfiguration(org.apache.hadoop.conf.Configuration configuration) {
 		this.configuration = configuration;
 	}
-	
+
 	public org.apache.hadoop.mapreduce.InputFormat<K,V> getHadoopInputFormat() {
 		return this.mapreduceInputFormat;
 	}
-	
+
 	public void setHadoopInputFormat(org.apache.hadoop.mapreduce.InputFormat<K,V> mapreduceInputFormat) {
 		this.mapreduceInputFormat = mapreduceInputFormat;
 	}
-	
+
 	public org.apache.hadoop.conf.Configuration getConfiguration() {
 		return this.configuration;
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
 	//  InputFormat
 	// --------------------------------------------------------------------------------------------
-	
+
 	@Override
 	public void configure(Configuration parameters) {
 		// nothing to do
 	}
-	
+
 	@Override
 	public BaseStatistics getStatistics(BaseStatistics cachedStats) throws IOException {
 		// only gather base statistics for FileInputFormats
 		if(!(mapreduceInputFormat instanceof FileInputFormat)) {
 			return null;
 		}
-	
+
 		JobContext jobContext = null;
 		try {
 			jobContext = HadoopUtils.instantiateJobContext(configuration, null);
 		} catch (Exception e) {
 			throw new RuntimeException(e);
 		}
-	
+
 		final FileBaseStatistics cachedFileStats = (cachedStats != null && cachedStats instanceof FileBaseStatistics) ?
 				(FileBaseStatistics) cachedStats : null;
@@ -127,7 +126,7 @@ public class HadoopInputFormat<K extends Writable, V extends Writable> implement
 			return getFileStats(cachedFileStats, paths, new ArrayList<FileStatus>(1));
 		} catch (IOException ioex) {
 			if (LOG.isWarnEnabled()) {
-				LOG.warn("Could not determine statistics due to an io error: " 
+				LOG.warn("Could not determine statistics due to an io error: "
 						+ ioex.getMessage());
 			}
 		} catch (Throwable t) {
@@ -140,19 +139,19 @@ public class HadoopInputFormat<K extends Writable, V extends Writable> implement
 
 		// no statistics available
 		return null;
 	}
-	
+
 	@Override
 	public HadoopInputSplit[] createInputSplits(int minNumSplits) throws IOException {
 		configuration.setInt("mapreduce.input.fileinputformat.split.minsize", minNumSplits);
-	
+
 		JobContext jobContext = null;
 		try {
 			jobContext = HadoopUtils.instantiateJobContext(configuration, new JobID());
 		} catch (Exception e) {
 			throw new RuntimeException(e);
 		}
-	
+
 		List<org.apache.hadoop.mapreduce.InputSplit> splits;
 		try {
 			splits = this.mapreduceInputFormat.getSplits(jobContext);
@@ -160,18 +159,18 @@ public class HadoopInputFormat<K extends Writable, V extends Writable> implement
 			throw new IOException("Could not get Splits.", e);
 		}
 		HadoopInputSplit[] hadoopInputSplits = new HadoopInputSplit[splits.size()];
-	
+
 		for(int i = 0; i < hadoopInputSplits.length; i++){
 			hadoopInputSplits[i] = new HadoopInputSplit(i, splits.get(i), jobContext);
 		}
 		return hadoopInputSplits;
 	}
-	
+
 	@Override
 	public InputSplitAssigner getInputSplitAssigner(HadoopInputSplit[] inputSplits) {
 		return new LocatableInputSplitAssigner(inputSplits);
 	}
-	
+
 	@Override
 	public void open(HadoopInputSplit split) throws IOException {
 		TaskAttemptContext context = null;
@@ -180,7 +179,7 @@ public class HadoopInputFormat<K extends Writable, V extends Writable> implement
 		} catch(Exception e) {
 			throw new RuntimeException(e);
 		}
-	
+
 		try {
 			this.recordReader = this.mapreduceInputFormat
 					.createRecordReader(split.getHadoopInputSplit(), context);
@@ -191,7 +190,7 @@ public class HadoopInputFormat<K extends Writable, V extends Writable> implement
 			this.fetched = false;
 		}
 	}
-	
+
 	@Override
 	public boolean reachedEnd() throws IOException {
 		if(!this.fetched) {
@@ -199,7 +198,7 @@ public class HadoopInputFormat<K extends Writable, V extends Writable> implement
 		}
 		return !this.hasNext;
 	}
-	
+
 	private void fetchNext() throws IOException {
 		try {
 			this.hasNext = this.recordReader.nextKeyValue();
@@ -209,7 +208,7 @@ public class HadoopInputFormat<K extends Writable, V extends Writable> implement
 			this.fetched = true;
 		}
 	}
-	
+
 	@Override
 	public Tuple2<K, V> nextRecord(Tuple2<K, V> record) throws IOException {
 		if(!this.fetched) {
@@ -225,38 +224,38 @@ public class HadoopInputFormat<K extends Writable, V extends Writable> implement
 			throw new IOException("Could not get KeyValue pair.", e);
 		}
 		this.fetched = false;
-	
+
 		return record;
 	}
-	
+
 	@Override
 	public void close() throws IOException {
 		this.recordReader.close();
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
 	//  Helper methods
 	// --------------------------------------------------------------------------------------------
-	
-	private FileBaseStatistics getFileStats(FileBaseStatistics cachedStats, org.apache.hadoop.fs.Path[] hadoopFilePaths, 
-			ArrayList<FileStatus> files) throws IOException {
-	
+
+	private FileBaseStatistics getFileStats(FileBaseStatistics cachedStats, org.apache.hadoop.fs.Path[] hadoopFilePaths,
+			ArrayList<FileStatus> files) throws IOException {
+
 		long latestModTime = 0L;
-	
+
 		// get the file info and check whether the cached statistics are still valid.
 		for(org.apache.hadoop.fs.Path hadoopPath : hadoopFilePaths) {
-	
+
 			final Path filePath = new Path(hadoopPath.toUri());
 			final FileSystem fs = FileSystem.get(filePath.toUri());
-	
+
 			final FileStatus file = fs.getFileStatus(filePath);
 			latestModTime = Math.max(latestModTime, file.getModificationTime());
-	
+
 			// enumerate all files and check their modification time stamp.
 			if (file.isDir()) {
 				FileStatus[] fss = fs.listStatus(filePath);
 				files.ensureCapacity(files.size() + fss.length);
-	
+
 				for (FileStatus s : fss) {
 					if (!s.isDir()) {
 						files.add(s);
@@ -267,50 +266,50 @@ public class HadoopInputFormat<K extends Writable, V extends Writable> implement
 				files.add(file);
 			}
 		}
-	
+
 		// check whether the cached statistics are still valid, if we have any
 		if (cachedStats != null && latestModTime <= cachedStats.getLastModificationTime()) {
 			return cachedStats;
 		}
-	
+
 		// calculate the whole length
 		long len = 0;
 		for (FileStatus s : files) {
 			len += s.getLen();
 		}
-	
+
 		// sanity check
 		if (len <= 0) {
 			len = BaseStatistics.SIZE_UNKNOWN;
 		}
-	
+
 		return new FileBaseStatistics(latestModTime, len, BaseStatistics.AVG_RECORD_BYTES_UNKNOWN);
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
 	//  Custom serialization methods
 	// --------------------------------------------------------------------------------------------
-	
+
 	private void writeObject(ObjectOutputStream out) throws IOException {
 		out.writeUTF(this.mapreduceInputFormat.getClass().getName());
 		out.writeUTF(this.keyClass.getName());
 		out.writeUTF(this.valueClass.getName());
 		this.configuration.write(out);
 	}
-	
+
 	@SuppressWarnings("unchecked")
 	private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
 		String hadoopInputFormatClassName = in.readUTF();
 		String keyClassName = in.readUTF();
 		String valueClassName = in.readUTF();
-	
+
 		org.apache.hadoop.conf.Configuration configuration = new org.apache.hadoop.conf.Configuration();
 		configuration.readFields(in);
-	
+
 		if(this.configuration == null) {
 			this.configuration = configuration;
 		}
-	
+
 		try {
 			this.mapreduceInputFormat = (org.apache.hadoop.mapreduce.InputFormat<K,V>) Class.forName(hadoopInputFormatClassName, true, Thread.currentThread().getContextClassLoader()).newInstance();
 		} catch (Exception e) {
@@ -327,13 +326,13 @@ public class HadoopInputFormat<K extends Writable, V extends Writable> implement
 			throw new RuntimeException("Unable to find value class.", e);
 		}
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
 	//  ResultTypeQueryable
 	// --------------------------------------------------------------------------------------------
-	
+
 	@Override
 	public TypeInformation<Tuple2<K,V>> getProducedType() {
-		return new TupleTypeInfo<Tuple2<K,V>>(new WritableTypeInfo<K>((Class<K>) keyClass), new WritableTypeInfo<V>((Class<V>) valueClass));
+		return new TupleTypeInfo<Tuple2<K,V>>(TypeExtractor.createTypeInfo(keyClass), TypeExtractor.createTypeInfo(valueClass));
 	}
 }
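The mapreduce-API HadoopInputFormat above is now usable with any key/value classes the TypeExtractor can handle. A usage sketch (the TextInputFormat choice and the input path are illustrative, not part of the commit):

    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    Job job = Job.getInstance();
    HadoopInputFormat<LongWritable, Text> hadoopIF = new HadoopInputFormat<LongWritable, Text>(
            new TextInputFormat(), LongWritable.class, Text.class, job);
    TextInputFormat.addInputPath(job, new Path("hdfs:///input"));
    // the wrapper's getProducedType() tells Flink the element type of the DataSet
    DataSet<Tuple2<LongWritable, Text>> lines = env.createInput(hadoopIF);
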

http://git-wip-us.apache.org/repos/asf/flink/blob/ba7a19c1/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopOutputFormat.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopOutputFormat.java
index 402372c..696e1be 100644
--- a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopOutputFormat.java
+++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopOutputFormat.java
@@ -30,7 +30,6 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.hadoopcompatibility.mapreduce.utils.HadoopUtils;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.JobID;
@@ -40,7 +39,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
 
 
-public class HadoopOutputFormat<K extends Writable,V extends Writable> implements OutputFormat<Tuple2<K, V>>, FinalizeOnMaster {
+public class HadoopOutputFormat<K,V> implements OutputFormat<Tuple2<K,V>>, FinalizeOnMaster {
 
 	private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ba7a19c1/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopInputFormatTest.java b/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopInputFormatTest.java
new file mode 100644
index 0000000..00fd1f9
--- /dev/null
+++ b/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopInputFormatTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.test.hadoopcompatibility.mapred;
+
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.hadoopcompatibility.mapred.HadoopInputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.junit.Assert.fail;
+
+
+public class HadoopInputFormatTest {
+
+
+	public class DummyVoidKeyInputFormat<T> extends FileInputFormat<Void, T> {
+
+		public DummyVoidKeyInputFormat() {
+		}
+
+		@Override
+		public org.apache.hadoop.mapred.RecordReader<Void, T> getRecordReader(org.apache.hadoop.mapred.InputSplit inputSplit, JobConf jobConf, Reporter reporter) throws IOException {
+			return null;
+		}
+	}
+
+
+	@Test
+	public void checkTypeInformation() {
+		try {
+			final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+			// Set up the Hadoop Input Format
+			Job job = Job.getInstance();
+			HadoopInputFormat<Void, Long> hadoopInputFormat = new HadoopInputFormat<Void, Long>( new DummyVoidKeyInputFormat(), Void.class, Long.class, new JobConf());
+
+			TypeInformation<Tuple2<Void,Long>> tupleType = hadoopInputFormat.getProducedType();
+			TypeInformation<Tuple2<Void,Long>> testTupleType = new TupleTypeInfo<Tuple2<Void,Long>>(BasicTypeInfo.VOID_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO);
+
+			if(tupleType.isTupleType()) {
+				if(!((TupleTypeInfo)tupleType).equals(testTupleType)) {
+					fail("Tuple type information was not set correctly!");
+				}
+			} else {
+				fail("Type information was not set to tuple type information!");
+			}
+
+		}
+		catch (Exception ex) {
+			fail("Test failed due to a " + ex.getClass().getSimpleName() + ": " + ex.getMessage());
+		}
+
+	}
+
+}
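The test above checks that a non-Writable pair (Void, Long) now yields plain BasicTypeInfo from getProducedType(). The analogous expectation for a Writable value type would be a WritableTypeInfo; a sketch of that comparison value (illustrative, not part of the commit):

    TypeInformation<Tuple2<Void, Text>> expected = new TupleTypeInfo<Tuple2<Void, Text>>(
            BasicTypeInfo.VOID_TYPE_INFO, new WritableTypeInfo<Text>(Text.class));
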

http://git-wip-us.apache.org/repos/asf/flink/blob/ba7a19c1/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputFormatTest.java b/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputFormatTest.java
new file mode 100644
index 0000000..d79afaa
--- /dev/null
+++ b/flink-addons/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputFormatTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.test.hadoopcompatibility.mapreduce;
+
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.hadoopcompatibility.mapreduce.HadoopInputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.junit.Assert.fail;
+
+
+
+public class HadoopInputFormatTest {
+
+
+	public class DummyVoidKeyInputFormat<T> extends FileInputFormat<Void, T> {
+
+		public DummyVoidKeyInputFormat() {
+		}
+
+		@Override
+		public RecordReader<Void, T> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
+			return null;
+		}
+	}
+
+
+	@Test
+	public void checkTypeInformation() {
+		try {
+			final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+			// Set up the Hadoop Input Format
+			Job job = Job.getInstance();
+			HadoopInputFormat<Void, Long> hadoopInputFormat = new HadoopInputFormat<Void, Long>( new DummyVoidKeyInputFormat(), Void.class, Long.class, job);
+
+			TypeInformation<Tuple2<Void,Long>> tupleType = hadoopInputFormat.getProducedType();
+			TypeInformation<Tuple2<Void,Long>> testTupleType = new TupleTypeInfo<Tuple2<Void,Long>>(BasicTypeInfo.VOID_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO);
+
+			if(tupleType.isTupleType()) {
+				if(!((TupleTypeInfo)tupleType).equals(testTupleType)) {
+					fail("Tuple type information was not set correctly!");
+				}
+			} else {
+				fail("Type information was not set to tuple type information!");
+			}
+
+		}
+		catch (Exception ex) {
+			fail("Test failed due to a " + ex.getClass().getSimpleName() + ": " + ex.getMessage());
+		}
+
+	}
+
+}
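Writing back through the now-generic HadoopOutputFormat (mapreduce API) follows the same pattern; a sketch, where counts is a DataSet<Tuple2<Text, LongWritable>> as in the earlier mapred example, and the TextOutputFormat choice and output path are illustrative:

    Job outputJob = Job.getInstance();
    HadoopOutputFormat<Text, LongWritable> hadoopOF = new HadoopOutputFormat<Text, LongWritable>(
            new TextOutputFormat<Text, LongWritable>(), outputJob);
    TextOutputFormat.setOutputPath(outputJob, new Path("hdfs:///output"));
    counts.output(hadoopOF);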