http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceCombineFunction.java ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceCombineFunction.java b/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceCombineFunction.java new file mode 100644 index 0000000..97b9768 --- /dev/null +++ b/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceCombineFunction.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.hadoopcompatibility.mapred; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.Serializable; + +import org.apache.flink.api.common.functions.RichGroupReduceFunction; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyReporter; +import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopOutputCollector; +import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopTupleUnwrappingIterator; +import org.apache.flink.util.Collector; +import org.apache.flink.util.InstantiationUtil; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.Reducer; +import org.apache.hadoop.mapred.Reporter; + +/** + * This wrapper maps a Hadoop Reducer and Combiner (mapred API) to a combinable Flink GroupReduceFunction. 
+ */
+@SuppressWarnings("rawtypes")
+@org.apache.flink.api.common.functions.RichGroupReduceFunction.Combinable
+public final class HadoopReduceCombineFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
+		extends RichGroupReduceFunction<Tuple2<KEYIN,VALUEIN>,Tuple2<KEYOUT,VALUEOUT>>
+		implements ResultTypeQueryable<Tuple2<KEYOUT,VALUEOUT>>, Serializable {
+
+	private static final long serialVersionUID = 1L;
+
+	private transient Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT> reducer;
+	private transient Reducer<KEYIN,VALUEIN,KEYIN,VALUEIN> combiner;
+	private transient JobConf jobConf;
+
+	private transient HadoopTupleUnwrappingIterator<KEYIN, VALUEIN> valueIterator;
+	private transient HadoopOutputCollector<KEYOUT,VALUEOUT> reduceCollector;
+	private transient HadoopOutputCollector<KEYIN,VALUEIN> combineCollector;
+	private transient Reporter reporter;
+
+	/**
+	 * Maps two Hadoop Reducers (mapred API) to a combinable Flink GroupReduceFunction.
+	 *
+	 * @param hadoopReducer The Hadoop Reducer that is mapped to a GroupReduceFunction.
+	 * @param hadoopCombiner The Hadoop Reducer that is mapped to the combiner function.
+	 */
+	public HadoopReduceCombineFunction(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer,
+										Reducer<KEYIN,VALUEIN,KEYIN,VALUEIN> hadoopCombiner) {
+		this(hadoopReducer, hadoopCombiner, new JobConf());
+	}
+
+	/**
+	 * Maps two Hadoop Reducers (mapred API) to a combinable Flink GroupReduceFunction.
+	 *
+	 * @param hadoopReducer The Hadoop Reducer that is mapped to a GroupReduceFunction.
+	 * @param hadoopCombiner The Hadoop Reducer that is mapped to the combiner function.
+	 * @param conf The JobConf that is used to configure both Hadoop Reducers.
+	 */
+	public HadoopReduceCombineFunction(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer,
+										Reducer<KEYIN,VALUEIN,KEYIN,VALUEIN> hadoopCombiner, JobConf conf) {
+		if(hadoopReducer == null) {
+			throw new NullPointerException("Reducer may not be null.");
+		}
+		if(hadoopCombiner == null) {
+			throw new NullPointerException("Combiner may not be null.");
+		}
+		if(conf == null) {
+			throw new NullPointerException("JobConf may not be null.");
+		}
+
+		this.reducer = hadoopReducer;
+		this.combiner = hadoopCombiner;
+		this.jobConf = conf;
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	public void open(Configuration parameters) throws Exception {
+		super.open(parameters);
+		this.reducer.configure(jobConf);
+		this.combiner.configure(jobConf);
+
+		this.reporter = new HadoopDummyReporter();
+		Class<KEYIN> inKeyClass = (Class<KEYIN>) TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 0);
+		TypeSerializer<KEYIN> keySerializer = TypeExtractor.getForClass(inKeyClass).createSerializer(getRuntimeContext().getExecutionConfig());
+		this.valueIterator = new HadoopTupleUnwrappingIterator<KEYIN, VALUEIN>(keySerializer);
+		this.combineCollector = new HadoopOutputCollector<KEYIN, VALUEIN>();
+		this.reduceCollector = new HadoopOutputCollector<KEYOUT, VALUEOUT>();
+	}
+
+	@Override
+	public void reduce(final Iterable<Tuple2<KEYIN,VALUEIN>> values, final Collector<Tuple2<KEYOUT,VALUEOUT>> out)
+			throws Exception {
+		reduceCollector.setFlinkCollector(out);
+		valueIterator.set(values.iterator());
+		reducer.reduce(valueIterator.getCurrentKey(), valueIterator, reduceCollector, reporter);
+	}
+
+	@Override
+	public void combine(final Iterable<Tuple2<KEYIN,VALUEIN>> values, final Collector<Tuple2<KEYIN,VALUEIN>> out) throws Exception {
+		combineCollector.setFlinkCollector(out);
+		valueIterator.set(values.iterator());
+		combiner.reduce(valueIterator.getCurrentKey(), valueIterator, combineCollector, reporter);
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	public TypeInformation<Tuple2<KEYOUT,VALUEOUT>> getProducedType() {
+		Class<KEYOUT> outKeyClass = (Class<KEYOUT>) TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 2);
+		Class<VALUEOUT> outValClass = (Class<VALUEOUT>) TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 3);
+
+		final TypeInformation<KEYOUT> keyTypeInfo = TypeExtractor.getForClass(outKeyClass);
+		final TypeInformation<VALUEOUT> valueTypeInfo = TypeExtractor.getForClass(outValClass);
+		return new TupleTypeInfo<Tuple2<KEYOUT,VALUEOUT>>(keyTypeInfo, valueTypeInfo);
+	}
+
+	/**
+	 * Custom serialization methods.
+	 * @see <a href="http://docs.oracle.com/javase/7/docs/api/java/io/Serializable.html">http://docs.oracle.com/javase/7/docs/api/java/io/Serializable.html</a>
+	 */
+	private void writeObject(final ObjectOutputStream out) throws IOException {
+
+		out.writeObject(reducer.getClass());
+		out.writeObject(combiner.getClass());
+		jobConf.write(out);
+	}
+
+	@SuppressWarnings("unchecked")
+	private void readObject(final ObjectInputStream in) throws IOException, ClassNotFoundException {
+
+		Class<Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>> reducerClass =
+				(Class<Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>>)in.readObject();
+		reducer = InstantiationUtil.instantiate(reducerClass);
+
+		Class<Reducer<KEYIN,VALUEIN,KEYIN,VALUEIN>> combinerClass =
+				(Class<Reducer<KEYIN,VALUEIN,KEYIN,VALUEIN>>)in.readObject();
+		combiner = InstantiationUtil.instantiate(combinerClass);
+
+		jobConf = new JobConf();
+		jobConf.readFields(in);
+	}
+}
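For orientation, a minimal usage sketch for the wrapper above (not part of this commit): it assumes an input DataSet<Tuple2<Text, LongWritable>> named "pairs" and a hypothetical mapred Reducer<Text, LongWritable, Text, LongWritable> named "SumReducer", comparable to the Counter class in the HadoopMapredCompatWordCount example further down.

    // Hedged sketch: the same mapred Reducer serves as combiner and reducer.
    // "pairs" and "SumReducer" are assumed, not defined in this commit.
    DataSet<Tuple2<Text, LongWritable>> counts = pairs
        .groupBy(0)
        .reduceGroup(new HadoopReduceCombineFunction<Text, LongWritable, Text, LongWritable>(
                new SumReducer(), new SumReducer(), new JobConf()));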
http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceFunction.java ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceFunction.java b/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceFunction.java new file mode 100644 index 0000000..1c47696 --- /dev/null +++ b/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceFunction.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.hadoopcompatibility.mapred; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.Serializable; + +import org.apache.flink.api.common.functions.RichGroupReduceFunction; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyReporter; +import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopOutputCollector; +import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopTupleUnwrappingIterator; +import org.apache.flink.util.Collector; +import org.apache.flink.util.InstantiationUtil; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.Reducer; +import org.apache.hadoop.mapred.Reporter; + +/** + * This wrapper maps a Hadoop Reducer (mapred API) to a non-combinable Flink GroupReduceFunction. + */ +@SuppressWarnings("rawtypes") +public final class HadoopReduceFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT> + extends RichGroupReduceFunction<Tuple2<KEYIN,VALUEIN>,Tuple2<KEYOUT,VALUEOUT>> + implements ResultTypeQueryable<Tuple2<KEYOUT,VALUEOUT>>, Serializable { + + private static final long serialVersionUID = 1L; + + private transient Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT> reducer; + private transient JobConf jobConf; + + private transient HadoopTupleUnwrappingIterator<KEYIN, VALUEIN> valueIterator; + private transient HadoopOutputCollector<KEYOUT,VALUEOUT> reduceCollector; + private transient Reporter reporter; + + /** + * Maps a Hadoop Reducer (mapred API) to a non-combinable Flink GroupReduceFunction. 
+	 *
+	 * @param hadoopReducer The Hadoop Reducer to wrap.
+	 */
+	public HadoopReduceFunction(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer) {
+		this(hadoopReducer, new JobConf());
+	}
+
+	/**
+	 * Maps a Hadoop Reducer (mapred API) to a non-combinable Flink GroupReduceFunction.
+	 *
+	 * @param hadoopReducer The Hadoop Reducer to wrap.
+	 * @param conf The JobConf that is used to configure the Hadoop Reducer.
+	 */
+	public HadoopReduceFunction(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer, JobConf conf) {
+		if(hadoopReducer == null) {
+			throw new NullPointerException("Reducer may not be null.");
+		}
+		if(conf == null) {
+			throw new NullPointerException("JobConf may not be null.");
+		}
+
+		this.reducer = hadoopReducer;
+		this.jobConf = conf;
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	public void open(Configuration parameters) throws Exception {
+		super.open(parameters);
+		this.reducer.configure(jobConf);
+
+		this.reporter = new HadoopDummyReporter();
+		this.reduceCollector = new HadoopOutputCollector<KEYOUT, VALUEOUT>();
+		Class<KEYIN> inKeyClass = (Class<KEYIN>) TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 0);
+		TypeSerializer<KEYIN> keySerializer = TypeExtractor.getForClass(inKeyClass).createSerializer(getRuntimeContext().getExecutionConfig());
+		this.valueIterator = new HadoopTupleUnwrappingIterator<KEYIN, VALUEIN>(keySerializer);
+	}
+
+	@Override
+	public void reduce(final Iterable<Tuple2<KEYIN,VALUEIN>> values, final Collector<Tuple2<KEYOUT,VALUEOUT>> out)
+			throws Exception {
+
+		reduceCollector.setFlinkCollector(out);
+		valueIterator.set(values.iterator());
+		reducer.reduce(valueIterator.getCurrentKey(), valueIterator, reduceCollector, reporter);
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	public TypeInformation<Tuple2<KEYOUT,VALUEOUT>> getProducedType() {
+		Class<KEYOUT> outKeyClass = (Class<KEYOUT>) TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 2);
+		Class<VALUEOUT> outValClass = (Class<VALUEOUT>) TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 3);
+
+		final TypeInformation<KEYOUT> keyTypeInfo = TypeExtractor.getForClass(outKeyClass);
+		final TypeInformation<VALUEOUT> valueTypeInfo = TypeExtractor.getForClass(outValClass);
+		return new TupleTypeInfo<Tuple2<KEYOUT,VALUEOUT>>(keyTypeInfo, valueTypeInfo);
+	}
+
+	/**
+	 * Custom serialization methods.
+	 * @see <a href="http://docs.oracle.com/javase/7/docs/api/java/io/Serializable.html">http://docs.oracle.com/javase/7/docs/api/java/io/Serializable.html</a>
+	 */
+	private void writeObject(final ObjectOutputStream out) throws IOException {
+
+		out.writeObject(reducer.getClass());
+		jobConf.write(out);
+	}
+
+	@SuppressWarnings("unchecked")
+	private void readObject(final ObjectInputStream in) throws IOException, ClassNotFoundException {
+
+		Class<Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>> reducerClass =
+				(Class<Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>>)in.readObject();
+		reducer = InstantiationUtil.instantiate(reducerClass);
+
+		jobConf = new JobConf();
+		jobConf.readFields(in);
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/example/HadoopMapredCompatWordCount.java
----------------------------------------------------------------------
diff --git
a/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/example/HadoopMapredCompatWordCount.java b/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/example/HadoopMapredCompatWordCount.java new file mode 100644 index 0000000..3547e47 --- /dev/null +++ b/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/example/HadoopMapredCompatWordCount.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.hadoopcompatibility.mapred.example; + +import java.io.IOException; +import java.util.Iterator; + +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat; +import org.apache.flink.hadoopcompatibility.mapred.HadoopMapFunction; +import org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormat; +import org.apache.flink.hadoopcompatibility.mapred.HadoopReduceCombineFunction; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.Mapper; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.Reducer; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapred.TextInputFormat; +import org.apache.hadoop.mapred.TextOutputFormat; + + + +/** + * Implements a word count which takes the input file and counts the number of + * occurrences of each word in the file and writes the result back to disk. + * + * This example shows how to use Hadoop Input Formats, how to convert Hadoop Writables to + * common Java types for better usage in a Flink job and how to use Hadoop Output Formats. 
+ */ +public class HadoopMapredCompatWordCount { + + public static void main(String[] args) throws Exception { + if (args.length < 2) { + System.err.println("Usage: WordCount <input path> <result path>"); + return; + } + + final String inputPath = args[0]; + final String outputPath = args[1]; + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + // Set up the Hadoop Input Format + HadoopInputFormat<LongWritable, Text> hadoopInputFormat = new HadoopInputFormat<LongWritable, Text>(new TextInputFormat(), LongWritable.class, Text.class, new JobConf()); + TextInputFormat.addInputPath(hadoopInputFormat.getJobConf(), new Path(inputPath)); + + // Create a Flink job with it + DataSet<Tuple2<LongWritable, Text>> text = env.createInput(hadoopInputFormat); + + DataSet<Tuple2<Text, LongWritable>> words = + text.flatMap(new HadoopMapFunction<LongWritable, Text, Text, LongWritable>(new Tokenizer())) + .groupBy(0).reduceGroup(new HadoopReduceCombineFunction<Text, LongWritable, Text, LongWritable>(new Counter(), new Counter())); + + // Set up Hadoop Output Format + HadoopOutputFormat<Text, LongWritable> hadoopOutputFormat = + new HadoopOutputFormat<Text, LongWritable>(new TextOutputFormat<Text, LongWritable>(), new JobConf()); + hadoopOutputFormat.getJobConf().set("mapred.textoutputformat.separator", " "); + TextOutputFormat.setOutputPath(hadoopOutputFormat.getJobConf(), new Path(outputPath)); + + // Output & Execute + words.output(hadoopOutputFormat).setParallelism(1); + env.execute("Hadoop Compat WordCount"); + } + + + public static final class Tokenizer implements Mapper<LongWritable, Text, Text, LongWritable> { + + @Override + public void map(LongWritable k, Text v, OutputCollector<Text, LongWritable> out, Reporter rep) + throws IOException { + // normalize and split the line + String line = v.toString(); + String[] tokens = line.toLowerCase().split("\\W+"); + + // emit the pairs + for (String token : tokens) { + if (token.length() > 0) { + out.collect(new Text(token), new LongWritable(1l)); + } + } + } + + @Override + public void configure(JobConf arg0) { } + + @Override + public void close() throws IOException { } + + } + + public static final class Counter implements Reducer<Text, LongWritable, Text, LongWritable> { + + @Override + public void reduce(Text k, Iterator<LongWritable> vs, OutputCollector<Text, LongWritable> out, Reporter rep) + throws IOException { + + long cnt = 0; + while(vs.hasNext()) { + cnt += vs.next().get(); + } + out.collect(k, new LongWritable(cnt)); + + } + + @Override + public void configure(JobConf arg0) { } + + @Override + public void close() throws IOException { } + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopOutputCollector.java ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopOutputCollector.java b/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopOutputCollector.java new file mode 100644 index 0000000..fcb6841 --- /dev/null +++ b/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopOutputCollector.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more 
contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.hadoopcompatibility.mapred.wrapper;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.util.Collector;
+import org.apache.hadoop.mapred.OutputCollector;
+
+import java.io.IOException;
+
+/**
+ * A Hadoop OutputCollector that wraps a Flink OutputCollector.
+ * On each call of collect() the data is forwarded to the wrapped Flink collector.
+ */
+@SuppressWarnings("rawtypes")
+public final class HadoopOutputCollector<KEY,VALUE>
+		implements OutputCollector<KEY,VALUE> {
+
+	private Collector<Tuple2<KEY,VALUE>> flinkCollector;
+
+	private final Tuple2<KEY,VALUE> outTuple = new Tuple2<KEY, VALUE>();
+
+	/**
+	 * Set the wrapped Flink collector.
+	 *
+	 * @param flinkCollector The wrapped Flink OutputCollector.
+	 */
+	public void setFlinkCollector(Collector<Tuple2<KEY, VALUE>> flinkCollector) {
+		this.flinkCollector = flinkCollector;
+	}
+
+	/**
+	 * Use the wrapped Flink collector to collect a key-value pair for Flink.
+	 *
+	 * @param key the key to collect
+	 * @param val the value to collect
+	 * @throws IOException if the key-value pair cannot be forwarded to the wrapped Flink collector.
+	 */
+	@Override
+	public void collect(final KEY key, final VALUE val) throws IOException {
+
+		this.outTuple.f0 = key;
+		this.outTuple.f1 = val;
+		this.flinkCollector.collect(outTuple);
+	}
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIterator.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIterator.java b/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIterator.java
new file mode 100644
index 0000000..a063183
--- /dev/null
+++ b/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIterator.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.hadoopcompatibility.mapred.wrapper; + +import java.util.Iterator; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.operators.translation.TupleUnwrappingIterator; +import org.apache.flink.api.java.tuple.Tuple2; + +/** + * Wraps a Flink Tuple2 (key-value-pair) iterator into an iterator over the second (value) field. + */ +@SuppressWarnings("rawtypes") +public class HadoopTupleUnwrappingIterator<KEY,VALUE> + extends TupleUnwrappingIterator<VALUE, KEY> implements java.io.Serializable { + + private static final long serialVersionUID = 1L; + + private Iterator<Tuple2<KEY,VALUE>> iterator; + + private final TypeSerializer<KEY> keySerializer; + + private boolean atFirst = false; + private KEY curKey = null; + private VALUE firstValue = null; + + public HadoopTupleUnwrappingIterator(TypeSerializer<KEY> keySerializer) { + this.keySerializer = keySerializer; + } + + /** + * Set the Flink iterator to wrap. + * + * @param iterator The Flink iterator to wrap. + */ + @Override() + public void set(final Iterator<Tuple2<KEY,VALUE>> iterator) { + this.iterator = iterator; + if(this.hasNext()) { + final Tuple2<KEY, VALUE> tuple = iterator.next(); + this.curKey = keySerializer.copy(tuple.f0); + this.firstValue = tuple.f1; + this.atFirst = true; + } else { + this.atFirst = false; + } + } + + @Override + public boolean hasNext() { + if(this.atFirst) { + return true; + } + return iterator.hasNext(); + } + + @Override + public VALUE next() { + if(this.atFirst) { + this.atFirst = false; + return firstValue; + } + + final Tuple2<KEY, VALUE> tuple = iterator.next(); + return tuple.f1; + } + + public KEY getCurrentKey() { + return this.curKey; + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/example/WordCount.java ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/example/WordCount.java b/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/example/WordCount.java new file mode 100644 index 0000000..f5758eb --- /dev/null +++ b/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/example/WordCount.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.hadoopcompatibility.mapreduce.example; + +import org.apache.flink.api.java.aggregation.Aggregations; +import org.apache.flink.api.common.functions.RichFlatMapFunction; +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.util.Collector; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; +import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat; +import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat; + +/** + * Implements a word count which takes the input file and counts the number of + * occurrences of each word in the file and writes the result back to disk. + * + * This example shows how to use Hadoop Input Formats, how to convert Hadoop Writables to + * common Java types for better usage in a Flink job and how to use Hadoop Output Formats. + */ +@SuppressWarnings("serial") +public class WordCount { + + public static void main(String[] args) throws Exception { + if (args.length < 2) { + System.err.println("Usage: WordCount <input path> <result path>"); + return; + } + + final String inputPath = args[0]; + final String outputPath = args[1]; + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + // Set up the Hadoop Input Format + Job job = Job.getInstance(); + HadoopInputFormat<LongWritable, Text> hadoopInputFormat = new HadoopInputFormat<LongWritable, Text>(new TextInputFormat(), LongWritable.class, Text.class, job); + TextInputFormat.addInputPath(job, new Path(inputPath)); + + // Create a Flink job with it + DataSet<Tuple2<LongWritable, Text>> text = env.createInput(hadoopInputFormat); + + // Tokenize the line and convert from Writable "Text" to String for better handling + DataSet<Tuple2<String, Integer>> words = text.flatMap(new Tokenizer()); + + // Sum up the words + DataSet<Tuple2<String, Integer>> result = words.groupBy(0).aggregate(Aggregations.SUM, 1); + + // Convert String back to Writable "Text" for use with Hadoop Output Format + DataSet<Tuple2<Text, IntWritable>> hadoopResult = result.map(new HadoopDatatypeMapper()); + + // Set up Hadoop Output Format + HadoopOutputFormat<Text, IntWritable> hadoopOutputFormat = new HadoopOutputFormat<Text, IntWritable>(new TextOutputFormat<Text, IntWritable>(), job); + hadoopOutputFormat.getConfiguration().set("mapreduce.output.textoutputformat.separator", " "); + hadoopOutputFormat.getConfiguration().set("mapred.textoutputformat.separator", " "); // set the value for both, since this test + // is being executed with both types (hadoop1 and hadoop2 profile) + TextOutputFormat.setOutputPath(job, new Path(outputPath)); + + // Output & Execute + hadoopResult.output(hadoopOutputFormat); + 
env.execute("Word Count"); + } + + /** + * Splits a line into words and converts Hadoop Writables into normal Java data types. + */ + public static final class Tokenizer extends RichFlatMapFunction<Tuple2<LongWritable, Text>, Tuple2<String, Integer>> { + + @Override + public void flatMap(Tuple2<LongWritable, Text> value, Collector<Tuple2<String, Integer>> out) { + // normalize and split the line + String line = value.f1.toString(); + String[] tokens = line.toLowerCase().split("\\W+"); + + // emit the pairs + for (String token : tokens) { + if (token.length() > 0) { + out.collect(new Tuple2<String, Integer>(token, 1)); + } + } + } + } + + /** + * Converts Java data types to Hadoop Writables. + */ + public static final class HadoopDatatypeMapper extends RichMapFunction<Tuple2<String, Integer>, Tuple2<Text, IntWritable>> { + + @Override + public Tuple2<Text, IntWritable> map(Tuple2<String, Integer> value) throws Exception { + return new Tuple2<Text, IntWritable>(new Text(value.f0), new IntWritable(value.f1)); + } + + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapFunctionITCase.java ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapFunctionITCase.java b/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapFunctionITCase.java new file mode 100644 index 0000000..4d1acb4 --- /dev/null +++ b/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapFunctionITCase.java @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.test.hadoopcompatibility.mapred; + +import java.io.IOException; + +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.hadoopcompatibility.mapred.HadoopMapFunction; +import org.apache.flink.test.util.MultipleProgramsTestBase; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.Mapper; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.Reporter; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +public class HadoopMapFunctionITCase extends MultipleProgramsTestBase { + + public HadoopMapFunctionITCase(TestExecutionMode mode){ + super(mode); + } + + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + + @Test + public void testNonPassingMapper() throws Exception{ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple2<IntWritable, Text>> ds = HadoopTestData.getKVPairDataSet(env); + DataSet<Tuple2<IntWritable, Text>> nonPassingFlatMapDs = ds. + flatMap(new HadoopMapFunction<IntWritable, Text, IntWritable, Text>(new NonPassingMapper())); + + String resultPath = tempFolder.newFile().toURI().toString(); + + nonPassingFlatMapDs.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); + env.execute(); + + compareResultsByLinesInMemory("\n", resultPath); + } + + @Test + public void testDataDuplicatingMapper() throws Exception { + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple2<IntWritable, Text>> ds = HadoopTestData.getKVPairDataSet(env); + DataSet<Tuple2<IntWritable, Text>> duplicatingFlatMapDs = ds. 
+ flatMap(new HadoopMapFunction<IntWritable, Text, IntWritable, Text>(new DuplicatingMapper())); + + String resultPath = tempFolder.newFile().toURI().toString(); + + duplicatingFlatMapDs.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); + env.execute(); + + String expected = "(1,Hi)\n" + "(1,HI)\n" + + "(2,Hello)\n" + "(2,HELLO)\n" + + "(3,Hello world)\n" + "(3,HELLO WORLD)\n" + + "(4,Hello world, how are you?)\n" + "(4,HELLO WORLD, HOW ARE YOU?)\n" + + "(5,I am fine.)\n" + "(5,I AM FINE.)\n" + + "(6,Luke Skywalker)\n" + "(6,LUKE SKYWALKER)\n" + + "(7,Comment#1)\n" + "(7,COMMENT#1)\n" + + "(8,Comment#2)\n" + "(8,COMMENT#2)\n" + + "(9,Comment#3)\n" + "(9,COMMENT#3)\n" + + "(10,Comment#4)\n" + "(10,COMMENT#4)\n" + + "(11,Comment#5)\n" + "(11,COMMENT#5)\n" + + "(12,Comment#6)\n" + "(12,COMMENT#6)\n" + + "(13,Comment#7)\n" + "(13,COMMENT#7)\n" + + "(14,Comment#8)\n" + "(14,COMMENT#8)\n" + + "(15,Comment#9)\n" + "(15,COMMENT#9)\n" + + "(16,Comment#10)\n" + "(16,COMMENT#10)\n" + + "(17,Comment#11)\n" + "(17,COMMENT#11)\n" + + "(18,Comment#12)\n" + "(18,COMMENT#12)\n" + + "(19,Comment#13)\n" + "(19,COMMENT#13)\n" + + "(20,Comment#14)\n" + "(20,COMMENT#14)\n" + + "(21,Comment#15)\n" + "(21,COMMENT#15)\n"; + + compareResultsByLinesInMemory(expected, resultPath); + } + + @Test + public void testConfigurableMapper() throws Exception { + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + JobConf conf = new JobConf(); + conf.set("my.filterPrefix", "Hello"); + + DataSet<Tuple2<IntWritable, Text>> ds = HadoopTestData.getKVPairDataSet(env); + DataSet<Tuple2<IntWritable, Text>> hellos = ds. + flatMap(new HadoopMapFunction<IntWritable, Text, IntWritable, Text>(new ConfigurableMapper(), conf)); + + String resultPath = tempFolder.newFile().toURI().toString(); + + hellos.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE); + env.execute(); + + String expected = "(2,Hello)\n" + + "(3,Hello world)\n" + + "(4,Hello world, how are you?)\n"; + + compareResultsByLinesInMemory(expected, resultPath); + } + + + + public static class NonPassingMapper implements Mapper<IntWritable, Text, IntWritable, Text> { + + @Override + public void map(final IntWritable k, final Text v, + final OutputCollector<IntWritable, Text> out, final Reporter r) throws IOException { + if ( v.toString().contains("bananas") ) { + out.collect(k,v); + } + } + + @Override + public void configure(final JobConf arg0) { } + + @Override + public void close() throws IOException { } + } + + public static class DuplicatingMapper implements Mapper<IntWritable, Text, IntWritable, Text> { + + @Override + public void map(final IntWritable k, final Text v, + final OutputCollector<IntWritable, Text> out, final Reporter r) throws IOException { + out.collect(k, v); + out.collect(k, new Text(v.toString().toUpperCase())); + } + + @Override + public void configure(final JobConf arg0) { } + + @Override + public void close() throws IOException { } + } + + public static class ConfigurableMapper implements Mapper<IntWritable, Text, IntWritable, Text> { + private String filterPrefix; + + @Override + public void map(IntWritable k, Text v, OutputCollector<IntWritable, Text> out, Reporter r) + throws IOException { + if(v.toString().startsWith(filterPrefix)) { + out.collect(k, v); + } + } + + @Override + public void configure(JobConf c) { + filterPrefix = c.get("my.filterPrefix"); + } + + @Override + public void close() throws IOException { } + } +} 
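The tests above exercise the full chain HadoopMapFunction -> wrapped Mapper -> HadoopOutputCollector -> Flink Collector. As a rough sketch of the final hand-off in isolation ("flinkOut" is an assumed Collector<Tuple2<IntWritable, Text>>, e.g. one backed by a list in a unit test):

    // Hedged sketch: a mapred-style collect() call becomes a Tuple2 for Flink.
    // "flinkOut" is assumed; it is not defined in this commit.
    HadoopOutputCollector<IntWritable, Text> out = new HadoopOutputCollector<IntWritable, Text>();
    out.setFlinkCollector(flinkOut);
    out.collect(new IntWritable(1), new Text("Hi")); // flinkOut receives (1,Hi)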
http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapredITCase.java ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapredITCase.java b/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapredITCase.java new file mode 100644 index 0000000..bbb7503 --- /dev/null +++ b/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapredITCase.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.hadoopcompatibility.mapred; + +import org.apache.flink.hadoopcompatibility.mapred.example.HadoopMapredCompatWordCount; +import org.apache.flink.test.testdata.WordCountData; +import org.apache.flink.test.util.JavaProgramTestBase; + +public class HadoopMapredITCase extends JavaProgramTestBase { + + protected String textPath; + protected String resultPath; + + @Override + protected void preSubmit() throws Exception { + textPath = createTempFile("text.txt", WordCountData.TEXT); + resultPath = getTempDirPath("result"); + this.setParallelism(4); + } + + @Override + protected void postSubmit() throws Exception { + compareResultsByLinesInMemory(WordCountData.COUNTS, resultPath, new String[]{".", "_"}); + } + + @Override + protected void testProgram() throws Exception { + HadoopMapredCompatWordCount.main(new String[] { textPath, resultPath }); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopReduceCombineFunctionITCase.java ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopReduceCombineFunctionITCase.java b/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopReduceCombineFunctionITCase.java new file mode 100644 index 0000000..13d971c --- /dev/null +++ b/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopReduceCombineFunctionITCase.java @@ -0,0 +1,265 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.hadoopcompatibility.mapred; + +import java.io.IOException; +import java.util.Iterator; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.hadoopcompatibility.mapred.HadoopReduceCombineFunction; +import org.apache.flink.hadoopcompatibility.mapred.HadoopReduceFunction; +import org.apache.flink.test.util.MultipleProgramsTestBase; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.Reducer; +import org.apache.hadoop.mapred.Reporter; +import org.hamcrest.core.IsEqual; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +public class HadoopReduceCombineFunctionITCase extends MultipleProgramsTestBase { + + public HadoopReduceCombineFunctionITCase(TestExecutionMode mode){ + super(mode); + } + + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + + @Test + public void testStandardCountingWithCombiner() throws Exception{ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple2<IntWritable, IntWritable>> ds = HadoopTestData.getKVPairDataSet(env). + map(new Mapper1()); + + DataSet<Tuple2<IntWritable, IntWritable>> counts = ds. + groupBy(0). + reduceGroup(new HadoopReduceCombineFunction<IntWritable, IntWritable, IntWritable, IntWritable>( + new SumReducer(), new SumReducer())); + + String resultPath = tempFolder.newFile().toURI().toString(); + + counts.writeAsText(resultPath); + env.execute(); + + String expected = "(0,5)\n"+ + "(1,6)\n" + + "(2,6)\n" + + "(3,4)\n"; + + compareResultsByLinesInMemory(expected, resultPath); + } + + @Test + public void testUngroupedHadoopReducer() throws Exception { + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple2<IntWritable, IntWritable>> ds = HadoopTestData.getKVPairDataSet(env). + map(new Mapper2()); + + DataSet<Tuple2<IntWritable, IntWritable>> sum = ds. 
+ reduceGroup(new HadoopReduceCombineFunction<IntWritable, IntWritable, IntWritable, IntWritable>( + new SumReducer(), new SumReducer())); + + String resultPath = tempFolder.newFile().toURI().toString(); + + sum.writeAsText(resultPath); + env.execute(); + + String expected = "(0,231)\n"; + + compareResultsByLinesInMemory(expected, resultPath); + } + + @Test + public void testCombiner() throws Exception { + org.junit.Assume.assumeThat(mode, new IsEqual<TestExecutionMode>(TestExecutionMode.CLUSTER)); + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple2<IntWritable, IntWritable>> ds = HadoopTestData.getKVPairDataSet(env). + map(new Mapper3()); + + DataSet<Tuple2<IntWritable, IntWritable>> counts = ds. + groupBy(0). + reduceGroup(new HadoopReduceCombineFunction<IntWritable, IntWritable, IntWritable, IntWritable>( + new SumReducer(), new KeyChangingReducer())); + + String resultPath = tempFolder.newFile().toURI().toString(); + + counts.writeAsText(resultPath); + env.execute(); + + String expected = "(0,5)\n"+ + "(1,6)\n" + + "(2,5)\n" + + "(3,5)\n"; + + compareResultsByLinesInMemory(expected, resultPath); + } + + @Test + public void testConfigurationViaJobConf() throws Exception { + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + JobConf conf = new JobConf(); + conf.set("my.cntPrefix", "Hello"); + + DataSet<Tuple2<IntWritable, Text>> ds = HadoopTestData.getKVPairDataSet(env). + map(new Mapper4()); + + DataSet<Tuple2<IntWritable, IntWritable>> hellos = ds. + groupBy(0). + reduceGroup(new HadoopReduceFunction<IntWritable, Text, IntWritable, IntWritable>( + new ConfigurableCntReducer(), conf)); + + String resultPath = tempFolder.newFile().toURI().toString(); + + hellos.writeAsText(resultPath); + env.execute(); + + // return expected result + String expected = "(0,0)\n"+ + "(1,0)\n" + + "(2,1)\n" + + "(3,1)\n" + + "(4,1)\n"; + + compareResultsByLinesInMemory(expected, resultPath); + } + + public static class SumReducer implements Reducer<IntWritable, IntWritable, IntWritable, IntWritable> { + + @Override + public void reduce(IntWritable k, Iterator<IntWritable> v, OutputCollector<IntWritable, IntWritable> out, Reporter r) + throws IOException { + + int sum = 0; + while(v.hasNext()) { + sum += v.next().get(); + } + out.collect(k, new IntWritable(sum)); + } + + @Override + public void configure(JobConf arg0) { } + + @Override + public void close() throws IOException { } + } + + public static class KeyChangingReducer implements Reducer<IntWritable, IntWritable, IntWritable, IntWritable> { + + @Override + public void reduce(IntWritable k, Iterator<IntWritable> v, OutputCollector<IntWritable, IntWritable> out, Reporter r) + throws IOException { + while(v.hasNext()) { + out.collect(new IntWritable(k.get() % 4), v.next()); + } + } + + @Override + public void configure(JobConf arg0) { } + + @Override + public void close() throws IOException { } + } + + public static class ConfigurableCntReducer implements Reducer<IntWritable, Text, IntWritable, IntWritable> { + private String countPrefix; + + @Override + public void reduce(IntWritable k, Iterator<Text> vs, OutputCollector<IntWritable, IntWritable> out, Reporter r) + throws IOException { + int commentCnt = 0; + while(vs.hasNext()) { + String v = vs.next().toString(); + if(v.startsWith(this.countPrefix)) { + commentCnt++; + } + } + out.collect(k, new IntWritable(commentCnt)); + } + + @Override + public void configure(final JobConf c) { + this.countPrefix = 
c.get("my.cntPrefix"); + } + + @Override + public void close() throws IOException { } + } + + public static class Mapper1 implements MapFunction<Tuple2<IntWritable, Text>, Tuple2<IntWritable, + IntWritable>> { + private static final long serialVersionUID = 1L; + Tuple2<IntWritable,IntWritable> outT = new Tuple2<IntWritable,IntWritable>(); + @Override + public Tuple2<IntWritable, IntWritable> map(Tuple2<IntWritable, Text> v) + throws Exception { + outT.f0 = new IntWritable(v.f0.get() / 6); + outT.f1 = new IntWritable(1); + return outT; + } + } + + public static class Mapper2 implements MapFunction<Tuple2<IntWritable, Text>, Tuple2<IntWritable, + IntWritable>> { + private static final long serialVersionUID = 1L; + Tuple2<IntWritable,IntWritable> outT = new Tuple2<IntWritable,IntWritable>(); + @Override + public Tuple2<IntWritable, IntWritable> map(Tuple2<IntWritable, Text> v) + throws Exception { + outT.f0 = new IntWritable(0); + outT.f1 = v.f0; + return outT; + } + } + + public static class Mapper3 implements MapFunction<Tuple2<IntWritable, Text>, Tuple2<IntWritable, IntWritable>> { + private static final long serialVersionUID = 1L; + Tuple2<IntWritable,IntWritable> outT = new Tuple2<IntWritable,IntWritable>(); + @Override + public Tuple2<IntWritable, IntWritable> map(Tuple2<IntWritable, Text> v) + throws Exception { + outT.f0 = v.f0; + outT.f1 = new IntWritable(1); + return outT; + } + } + + public static class Mapper4 implements MapFunction<Tuple2<IntWritable, Text>, Tuple2<IntWritable, Text>> { + private static final long serialVersionUID = 1L; + @Override + public Tuple2<IntWritable, Text> map(Tuple2<IntWritable, Text> v) + throws Exception { + v.f0 = new IntWritable(v.f0.get() % 5); + return v; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopReduceFunctionITCase.java ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopReduceFunctionITCase.java b/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopReduceFunctionITCase.java new file mode 100644 index 0000000..abc0e9c --- /dev/null +++ b/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopReduceFunctionITCase.java @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.test.hadoopcompatibility.mapred; + +import java.io.IOException; +import java.util.Iterator; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.hadoopcompatibility.mapred.HadoopReduceFunction; +import org.apache.flink.test.util.MultipleProgramsTestBase; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.Reducer; +import org.apache.hadoop.mapred.Reporter; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +public class HadoopReduceFunctionITCase extends MultipleProgramsTestBase { + + public HadoopReduceFunctionITCase(TestExecutionMode mode){ + super(mode); + } + + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + + @Test + public void testStandardGrouping() throws Exception{ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple2<IntWritable, Text>> ds = HadoopTestData.getKVPairDataSet(env). + map(new Mapper1()); + + DataSet<Tuple2<IntWritable, IntWritable>> commentCnts = ds. + groupBy(0). + reduceGroup(new HadoopReduceFunction<IntWritable, Text, IntWritable, IntWritable>(new CommentCntReducer())); + + String resultPath = tempFolder.newFile().toURI().toString(); + + commentCnts.writeAsText(resultPath); + env.execute(); + + String expected = "(0,0)\n"+ + "(1,3)\n" + + "(2,5)\n" + + "(3,5)\n" + + "(4,2)\n"; + + compareResultsByLinesInMemory(expected, resultPath); + } + + @Test + public void testUngroupedHadoopReducer() throws Exception { + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet<Tuple2<IntWritable, Text>> ds = HadoopTestData.getKVPairDataSet(env); + + DataSet<Tuple2<IntWritable, IntWritable>> commentCnts = ds. + reduceGroup(new HadoopReduceFunction<IntWritable, Text, IntWritable, IntWritable>(new AllCommentCntReducer())); + + String resultPath = tempFolder.newFile().toURI().toString(); + + commentCnts.writeAsText(resultPath); + env.execute(); + + String expected = "(42,15)\n"; + + compareResultsByLinesInMemory(expected, resultPath); + } + + @Test + public void testConfigurationViaJobConf() throws Exception { + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + JobConf conf = new JobConf(); + conf.set("my.cntPrefix", "Hello"); + + DataSet<Tuple2<IntWritable, Text>> ds = HadoopTestData.getKVPairDataSet(env). + map(new Mapper2()); + + DataSet<Tuple2<IntWritable, IntWritable>> helloCnts = ds. + groupBy(0). 
+ reduceGroup(new HadoopReduceFunction<IntWritable, Text, IntWritable, IntWritable>( + new ConfigurableCntReducer(), conf)); + + String resultPath = tempFolder.newFile().toURI().toString(); + + helloCnts.writeAsText(resultPath); + env.execute(); + + String expected = "(0,0)\n"+ + "(1,0)\n" + + "(2,1)\n" + + "(3,1)\n" + + "(4,1)\n"; + + compareResultsByLinesInMemory(expected, resultPath); + } + + public static class CommentCntReducer implements Reducer<IntWritable, Text, IntWritable, IntWritable> { + + @Override + public void reduce(IntWritable k, Iterator<Text> vs, OutputCollector<IntWritable, IntWritable> out, Reporter r) + throws IOException { + int commentCnt = 0; + while(vs.hasNext()) { + String v = vs.next().toString(); + if(v.startsWith("Comment")) { + commentCnt++; + } + } + out.collect(k, new IntWritable(commentCnt)); + } + + @Override + public void configure(final JobConf arg0) { } + + @Override + public void close() throws IOException { } + } + + public static class AllCommentCntReducer implements Reducer<IntWritable, Text, IntWritable, IntWritable> { + + @Override + public void reduce(IntWritable k, Iterator<Text> vs, OutputCollector<IntWritable, IntWritable> out, Reporter r) + throws IOException { + int commentCnt = 0; + while(vs.hasNext()) { + String v = vs.next().toString(); + if(v.startsWith("Comment")) { + commentCnt++; + } + } + out.collect(new IntWritable(42), new IntWritable(commentCnt)); + } + + @Override + public void configure(final JobConf arg0) { } + + @Override + public void close() throws IOException { } + } + + public static class ConfigurableCntReducer implements Reducer<IntWritable, Text, IntWritable, IntWritable> { + private String countPrefix; + + @Override + public void reduce(IntWritable k, Iterator<Text> vs, OutputCollector<IntWritable, IntWritable> out, Reporter r) + throws IOException { + int commentCnt = 0; + while(vs.hasNext()) { + String v = vs.next().toString(); + if(v.startsWith(this.countPrefix)) { + commentCnt++; + } + } + out.collect(k, new IntWritable(commentCnt)); + } + + @Override + public void configure(final JobConf c) { + this.countPrefix = c.get("my.cntPrefix"); + } + + @Override + public void close() throws IOException { } + } + + public static class Mapper1 implements MapFunction<Tuple2<IntWritable, Text>, Tuple2<IntWritable, Text>> { + private static final long serialVersionUID = 1L; + @Override + public Tuple2<IntWritable, Text> map(Tuple2<IntWritable, Text> v) + throws Exception { + v.f0 = new IntWritable(v.f0.get() / 5); + return v; + } + } + + public static class Mapper2 implements MapFunction<Tuple2<IntWritable, Text>, Tuple2<IntWritable, Text>> { + private static final long serialVersionUID = 1L; + @Override + public Tuple2<IntWritable, Text> map(Tuple2<IntWritable, Text> v) + throws Exception { + v.f0 = new IntWritable(v.f0.get() % 5); + return v; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopTestData.java ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopTestData.java b/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopTestData.java new file mode 100644 index 0000000..eed6f8f --- /dev/null +++ 
b/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopTestData.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.hadoopcompatibility.mapred; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.Text; + +public class HadoopTestData { + + public static DataSet<Tuple2<IntWritable, Text>> getKVPairDataSet(ExecutionEnvironment env) { + + List<Tuple2<IntWritable, Text>> data = new ArrayList<Tuple2<IntWritable, Text>>(); + data.add(new Tuple2<IntWritable, Text>(new IntWritable(1),new Text("Hi"))); + data.add(new Tuple2<IntWritable, Text>(new IntWritable(2),new Text("Hello"))); + data.add(new Tuple2<IntWritable, Text>(new IntWritable(3),new Text("Hello world"))); + data.add(new Tuple2<IntWritable, Text>(new IntWritable(4),new Text("Hello world, how are you?"))); + data.add(new Tuple2<IntWritable, Text>(new IntWritable(5),new Text("I am fine."))); + data.add(new Tuple2<IntWritable, Text>(new IntWritable(6),new Text("Luke Skywalker"))); + data.add(new Tuple2<IntWritable, Text>(new IntWritable(7),new Text("Comment#1"))); + data.add(new Tuple2<IntWritable, Text>(new IntWritable(8),new Text("Comment#2"))); + data.add(new Tuple2<IntWritable, Text>(new IntWritable(9),new Text("Comment#3"))); + data.add(new Tuple2<IntWritable, Text>(new IntWritable(10),new Text("Comment#4"))); + data.add(new Tuple2<IntWritable, Text>(new IntWritable(11),new Text("Comment#5"))); + data.add(new Tuple2<IntWritable, Text>(new IntWritable(12),new Text("Comment#6"))); + data.add(new Tuple2<IntWritable, Text>(new IntWritable(13),new Text("Comment#7"))); + data.add(new Tuple2<IntWritable, Text>(new IntWritable(14),new Text("Comment#8"))); + data.add(new Tuple2<IntWritable, Text>(new IntWritable(15),new Text("Comment#9"))); + data.add(new Tuple2<IntWritable, Text>(new IntWritable(16),new Text("Comment#10"))); + data.add(new Tuple2<IntWritable, Text>(new IntWritable(17),new Text("Comment#11"))); + data.add(new Tuple2<IntWritable, Text>(new IntWritable(18),new Text("Comment#12"))); + data.add(new Tuple2<IntWritable, Text>(new IntWritable(19),new Text("Comment#13"))); + data.add(new Tuple2<IntWritable, Text>(new IntWritable(20),new Text("Comment#14"))); + data.add(new Tuple2<IntWritable, Text>(new IntWritable(21),new Text("Comment#15"))); + + Collections.shuffle(data); + + return env.fromCollection(data); + } +} 
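A minimal usage sketch for this fixture (an editorial illustration, not part of the commit): it assumes the public CommentCntReducer defined in HadoopReduceFunctionITCase above, and uses print() only as an illustrative sink.

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    DataSet<Tuple2<IntWritable, Text>> input = HadoopTestData.getKVPairDataSet(env);

    // Group by the IntWritable key (tuple field 0) and apply the wrapped mapred Reducer.
    input.groupBy(0)
        .reduceGroup(new HadoopReduceFunction<IntWritable, Text, IntWritable, IntWritable>(
                new HadoopReduceFunctionITCase.CommentCntReducer()))
        .print();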
http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIteratorTest.java ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIteratorTest.java b/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIteratorTest.java new file mode 100644 index 0000000..524318c --- /dev/null +++ b/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/wrapper/HadoopTupleUnwrappingIteratorTest.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.hadoopcompatibility.mapred.wrapper; + +import java.util.ArrayList; +import java.util.NoSuchElementException; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.runtime.WritableSerializer; +import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopTupleUnwrappingIterator; +import org.apache.hadoop.io.IntWritable; +import org.junit.Assert; +import org.junit.Test; + +public class HadoopTupleUnwrappingIteratorTest { + + @Test + public void testValueIterator() { + + HadoopTupleUnwrappingIterator<IntWritable, IntWritable> valIt = + new HadoopTupleUnwrappingIterator<IntWritable, IntWritable>(new WritableSerializer + <IntWritable>(IntWritable.class)); + + // many values + + ArrayList<Tuple2<IntWritable, IntWritable>> tList = new ArrayList<Tuple2<IntWritable, IntWritable>>(); + tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(1),new IntWritable(1))); + tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(1),new IntWritable(2))); + tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(1),new IntWritable(3))); + tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(1),new IntWritable(4))); + tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(1),new IntWritable(5))); + tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(1),new IntWritable(6))); + tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(1),new IntWritable(7))); + tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(1),new IntWritable(8))); + + int expectedKey = 1; + int[] expectedValues = new int[] {1,2,3,4,5,6,7,8}; + + valIt.set(tList.iterator()); + Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey); + for(int expectedValue : expectedValues) { + Assert.assertTrue(valIt.hasNext()); + 
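+ // hasNext() is intentionally invoked twice throughout this test: it must be idempotent and must not advance the iterator.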
Assert.assertTrue(valIt.hasNext()); + Assert.assertTrue(valIt.next().get() == expectedValue); + Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey); + } + Assert.assertFalse(valIt.hasNext()); + Assert.assertFalse(valIt.hasNext()); + Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey); + + // one value + + tList.clear(); + tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(2),new IntWritable(10))); + + expectedKey = 2; + expectedValues = new int[]{10}; + + valIt.set(tList.iterator()); + Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey); + for(int expectedValue : expectedValues) { + Assert.assertTrue(valIt.hasNext()); + Assert.assertTrue(valIt.hasNext()); + Assert.assertTrue(valIt.next().get() == expectedValue); + Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey); + } + Assert.assertFalse(valIt.hasNext()); + Assert.assertFalse(valIt.hasNext()); + Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey); + + // more values + + tList.clear(); + tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(3),new IntWritable(10))); + tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(3),new IntWritable(4))); + tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(3),new IntWritable(7))); + tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(3),new IntWritable(9))); + tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(4),new IntWritable(21))); + + expectedKey = 3; + expectedValues = new int[]{10,4,7,9,21}; + + valIt.set(tList.iterator()); + Assert.assertTrue(valIt.hasNext()); + Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey); + for(int expectedValue : expectedValues) { + Assert.assertTrue(valIt.hasNext()); + Assert.assertTrue(valIt.hasNext()); + Assert.assertTrue(valIt.next().get() == expectedValue); + Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey); + } + Assert.assertFalse(valIt.hasNext()); + Assert.assertFalse(valIt.hasNext()); + Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey); + + // no hasNext() calls + + tList.clear(); + tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(4),new IntWritable(5))); + tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(4),new IntWritable(8))); + tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(4),new IntWritable(42))); + tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(4),new IntWritable(-1))); + tList.add(new Tuple2<IntWritable, IntWritable>(new IntWritable(4),new IntWritable(0))); + + expectedKey = 4; + expectedValues = new int[]{5,8,42,-1,0}; + + valIt.set(tList.iterator()); + Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey); + for(int expectedValue : expectedValues) { + Assert.assertTrue(valIt.next().get() == expectedValue); + } + try { + valIt.next(); + Assert.fail(); + } catch (NoSuchElementException nsee) { + // expected + } + Assert.assertFalse(valIt.hasNext()); + Assert.assertTrue(valIt.getCurrentKey().get() == expectedKey); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java
b/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java new file mode 100644 index 0000000..9b4aeea --- /dev/null +++ b/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.hadoopcompatibility.mapreduce; + +import org.apache.flink.hadoopcompatibility.mapreduce.example.WordCount; +import org.apache.flink.test.testdata.WordCountData; +import org.apache.flink.test.util.JavaProgramTestBase; + +public class HadoopInputOutputITCase extends JavaProgramTestBase { + + protected String textPath; + protected String resultPath; + + + @Override + protected void preSubmit() throws Exception { + textPath = createTempFile("text.txt", WordCountData.TEXT); + resultPath = getTempDirPath("result"); + this.setParallelism(4); + } + + @Override + protected void postSubmit() throws Exception { + compareResultsByLinesInMemory(WordCountData.COUNTS, resultPath, new String[]{".", "_"}); + } + + @Override + protected void testProgram() throws Exception { + WordCount.main(new String[] { textPath, resultPath }); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-batch-connectors/flink-hadoop-compatibility/src/test/resources/log4j-test.properties ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-hadoop-compatibility/src/test/resources/log4j-test.properties b/flink-batch-connectors/flink-hadoop-compatibility/src/test/resources/log4j-test.properties new file mode 100644 index 0000000..0b686e5 --- /dev/null +++ b/flink-batch-connectors/flink-hadoop-compatibility/src/test/resources/log4j-test.properties @@ -0,0 +1,27 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+################################################################################ + +# Set root logger level to OFF and its only appender to A1. +log4j.rootLogger=OFF, A1 + +# A1 is set to be a ConsoleAppender. +log4j.appender.A1=org.apache.log4j.ConsoleAppender + +# A1 uses PatternLayout. +log4j.appender.A1.layout=org.apache.log4j.PatternLayout +log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-batch-connectors/flink-hadoop-compatibility/src/test/resources/logback-test.xml ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-hadoop-compatibility/src/test/resources/logback-test.xml b/flink-batch-connectors/flink-hadoop-compatibility/src/test/resources/logback-test.xml new file mode 100644 index 0000000..8b3bb27 --- /dev/null +++ b/flink-batch-connectors/flink-hadoop-compatibility/src/test/resources/logback-test.xml @@ -0,0 +1,29 @@ +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> + +<configuration> + <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> + <encoder> + <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern> + </encoder> + </appender> + + <root level="WARN"> + <appender-ref ref="STDOUT"/> + </root> +</configuration> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-batch-connectors/flink-hbase/pom.xml ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-hbase/pom.xml b/flink-batch-connectors/flink-hbase/pom.xml new file mode 100644 index 0000000..ba4cf85 --- /dev/null +++ b/flink-batch-connectors/flink-hbase/pom.xml @@ -0,0 +1,220 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License.
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.flink</groupId> + <artifactId>flink-batch-connectors</artifactId> + <version>1.0-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + + <artifactId>flink-hbase</artifactId> + <name>flink-hbase</name> + <packaging>jar</packaging> + + <properties> + <hbase.hadoop1.version>0.98.11-hadoop1</hbase.hadoop1.version> + <hbase.hadoop2.version>0.98.11-hadoop2</hbase.hadoop2.version> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-core</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-java</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>${shading-artifact.name}</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-java</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-clients</artifactId> + <version>${project.version}</version> + <scope>test</scope> + <exclusions> + <exclusion> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-core</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-hadoop-compatibility</artifactId> + <version>${project.version}</version> + <scope>test</scope> + <exclusions> + <exclusion> + <groupId>org.apache.flink</groupId> + <artifactId>flink-shaded-include-yarn</artifactId> + </exclusion> + </exclusions> + </dependency> + + <!-- HBase server needed for TableOutputFormat --> + <!-- TODO implement bulk output format for HBase --> + <dependency> + <groupId>org.apache.hbase</groupId> + <artifactId>hbase-server</artifactId> + <version>${hbase.version}</version> + <exclusions> + <!-- Remove unneeded dependency, which is conflicting with our jetty-util version. 
--> + <exclusion> + <groupId>org.mortbay.jetty</groupId> + <artifactId>jetty-util</artifactId> + </exclusion> + <exclusion> + <groupId>org.mortbay.jetty</groupId> + <artifactId>jetty</artifactId> + </exclusion> + <exclusion> + <groupId>org.mortbay.jetty</groupId> + <artifactId>jetty-sslengine</artifactId> + </exclusion> + <exclusion> + <groupId>org.mortbay.jetty</groupId> + <artifactId>jsp-2.1</artifactId> + </exclusion> + <exclusion> + <groupId>org.mortbay.jetty</groupId> + <artifactId>jsp-api-2.1</artifactId> + </exclusion> + <exclusion> + <groupId>org.mortbay.jetty</groupId> + <artifactId>servlet-api-2.5</artifactId> + </exclusion> + <!-- The hadoop dependencies are handled through flink-shaded-hadoop --> + <exclusion> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-auth</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-annotations</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-mapreduce-client-core</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-client</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-hdfs</artifactId> + </exclusion> + <!-- Bug in hbase annotations, can be removed when fixed. See FLINK-2153. --> + <exclusion> + <groupId>org.apache.hbase</groupId> + <artifactId>hbase-annotations</artifactId> + </exclusion> + </exclusions> + </dependency> + </dependencies> + + <profiles> + <profile> + <id>hadoop-1</id> + <activation> + <property> + <!-- Please do not remove the 'hadoop1' comment. See ./tools/generate_specific_pom.sh --> + <!--hadoop1--><name>hadoop.profile</name><value>1</value> + </property> + </activation> + <properties> + <hbase.version>${hbase.hadoop1.version}</hbase.version> + </properties> + </profile> + + <profile> + <id>hadoop-2</id> + <repositories> + <repository> + <id>hadoop-2-repo2</id> + <url>https://repo.maven.apache.org/maven2</url> + <releases> + <enabled>true</enabled> + </releases> + <snapshots> + <enabled>false</enabled> + </snapshots> + </repository> + </repositories> + <activation> + <property> + <!-- Please do not remove the 'hadoop2' comment. See ./tools/generate_specific_pom.sh --> + <!--hadoop2--><name>!hadoop.profile</name> + </property> + </activation> + <properties> + <hbase.version>${hbase.hadoop2.version}</hbase.version> + </properties> + </profile> + + <profile> + <id>cdh5.1.3</id> + <properties> + <hadoop.profile>2</hadoop.profile> + <hbase.version>0.98.1-cdh5.1.3</hbase.version> + <hadoop.version>2.3.0-cdh5.1.3</hadoop.version> + <!-- Cloudera uses different versions for hadoop core and commons --> + <!-- This profile could be removed if Cloudera fixes this mismatch! --> + <hadoop.core.version>2.3.0-mr1-cdh5.1.3</hadoop.core.version> + </properties> + <dependencyManagement> + <dependencies> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-core</artifactId> + <version>${hadoop.core.version}</version> + </dependency> + </dependencies> + </dependencyManagement> + </profile> + + </profiles> + +</project>
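A brief usage note on the profiles above: hadoop-2 is active by default, since its activation keys on the absence of the hadoop.profile property, while hadoop-1 must be requested explicitly. Illustrative invocations (editorial, not part of the commit):

    mvn clean install                      # default: hadoop-2 profile, hbase 0.98.11-hadoop2
    mvn clean install -Dhadoop.profile=1   # hadoop-1 profile, hbase 0.98.11-hadoop1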