Repository: flink Updated Branches: refs/heads/release-0.8 944e2e3d5 -> cd2f88afd
http://git-wip-us.apache.org/repos/asf/flink/blob/cd2f88af/flink-java/src/test/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatTest.java new file mode 100644 index 0000000..236d149 --- /dev/null +++ b/flink-java/src/test/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatTest.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +package org.apache.flink.api.java.hadoop.mapreduce; + + +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.junit.Test; + +import java.io.IOException; + +import static org.junit.Assert.fail; + + + +public class HadoopInputFormatTest { + + + public class DummyVoidKeyInputFormat<T> extends FileInputFormat<Void, T> { + + public DummyVoidKeyInputFormat() { + } + + @Override + public RecordReader<Void, T> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { + return null; + } + } + + + @Test + public void checkTypeInformation() { + try { + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + // Set up the Hadoop Input Format + Job job = Job.getInstance(); + HadoopInputFormat<Void, Long> hadoopInputFormat = new HadoopInputFormat<Void, Long>( new DummyVoidKeyInputFormat(), Void.class, Long.class, job); + + TypeInformation<Tuple2<Void,Long>> tupleType = hadoopInputFormat.getProducedType(); + TypeInformation<Tuple2<Void,Long>> testTupleType = new TupleTypeInfo<Tuple2<Void,Long>>(BasicTypeInfo.VOID_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO); + + if(tupleType.isTupleType()) { + if(!((TupleTypeInfo)tupleType).equals(testTupleType)) { + fail("Tuple type information was not set correctly!"); + } + } else { + fail("Type information was not set to tuple type information!"); + } + + } + catch (Exception ex) { + 
fail("Test failed due to a " + ex.getClass().getSimpleName() + ": " + ex.getMessage()); + } + + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/cd2f88af/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala index 43f8609..089412e 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala @@ -25,16 +25,23 @@ import org.apache.flink.api.java.io._ import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.common.typeinfo.BasicTypeInfo import org.apache.flink.api.java.typeutils.{ValueTypeInfo, TupleTypeInfoBase} +import org.apache.flink.api.scala.hadoop.mapred +import org.apache.flink.api.scala.hadoop.mapreduce import org.apache.flink.api.scala.operators.ScalaCsvInputFormat import org.apache.flink.core.fs.Path -import org.apache.flink.api.java.{ExecutionEnvironment => JavaEnv, -CollectionEnvironment} +import org.apache.flink.api.java.{ExecutionEnvironment => JavaEnv, CollectionEnvironment} import org.apache.flink.api.common.io.{InputFormat, FileInputFormat} import org.apache.flink.api.java.operators.DataSource import org.apache.flink.types.StringValue import org.apache.flink.util.{NumberSequenceIterator, SplittableIterator} +import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => MapreduceFileInputFormat} +import org.apache.hadoop.mapreduce.{InputFormat => MapreduceInputFormat, Job} +import org.apache.hadoop.mapred.{FileInputFormat => MapredFileInputFormat, +InputFormat => MapredInputFormat, JobConf} +import org.apache.hadoop.fs.{Path => HadoopPath} + import scala.collection.JavaConverters._ @@ -269,6 +276,92 @@ class 
ExecutionEnvironment(javaEnv: JavaEnv) {
   }
 
   /**
+   * Creates a [[DataSet]] from the given [[org.apache.hadoop.mapred.FileInputFormat]]. The
+   * given inputPath is added to the input paths of the given job.
+   */
+  def readHadoopFile[K, V](
+      mapredInputFormat: MapredFileInputFormat[K, V],
+      key: Class[K],
+      value: Class[V],
+      inputPath: String,
+      job: JobConf)
+      (implicit tpe: TypeInformation[(K, V)]): DataSet[(K, V)] = {
+    // Register the path first so the job is fully configured before the wrapper receives it.
+    MapredFileInputFormat.addInputPath(job, new HadoopPath(inputPath))
+    createHadoopInput(mapredInputFormat, key, value, job)
+  }
+
+  /**
+   * Creates a [[DataSet]] from the given [[org.apache.hadoop.mapred.FileInputFormat]]. A
+   * [[org.apache.hadoop.mapred.JobConf]] with the given inputPath is created.
+   */
+  def readHadoopFile[K, V](
+      mapredInputFormat: MapredFileInputFormat[K, V],
+      key: Class[K],
+      value: Class[V],
+      inputPath: String)
+      (implicit tpe: TypeInformation[(K, V)]): DataSet[(K, V)] = {
+    readHadoopFile(mapredInputFormat, key, value, inputPath, new JobConf)
+  }
+
+  /**
+   * Creates a [[DataSet]] from the given [[org.apache.hadoop.mapred.InputFormat]].
+   */
+  def createHadoopInput[K, V](
+      mapredInputFormat: MapredInputFormat[K, V],
+      key: Class[K],
+      value: Class[V],
+      job: JobConf)
+      (implicit tpe: TypeInformation[(K, V)]): DataSet[(K, V)] = {
+    val hadoopInputFormat = new mapred.HadoopInputFormat[K, V](mapredInputFormat, key, value, job)
+    createInput(hadoopInputFormat)
+  }
+
+  /**
+   * Creates a [[DataSet]] from the given [[org.apache.hadoop.mapreduce.lib.input.FileInputFormat]].
+   * The given inputPath is added to the input paths of the given job.
+   */
+  def readHadoopFile[K, V](
+      mapredInputFormat: MapreduceFileInputFormat[K, V],
+      key: Class[K],
+      value: Class[V],
+      inputPath: String,
+      job: Job)
+      (implicit tpe: TypeInformation[(K, V)]): DataSet[(K, V)] = {
+    // Register the path first so the job is fully configured before the wrapper receives it.
+    MapreduceFileInputFormat.addInputPath(job, new HadoopPath(inputPath))
+    createHadoopInput(mapredInputFormat, key, value, job)
+  }
+
+  /**
+   * Creates a [[DataSet]] from the given
+   * [[org.apache.hadoop.mapreduce.lib.input.FileInputFormat]]. A
+   * [[org.apache.hadoop.mapreduce.Job]] with the given inputPath will be created.
+   */
+  def readHadoopFile[K, V](
+      mapredInputFormat: MapreduceFileInputFormat[K, V],
+      key: Class[K],
+      value: Class[V],
+      inputPath: String)
+      (implicit tpe: TypeInformation[(K, V)]): DataSet[(K, V)] = {
+    readHadoopFile(mapredInputFormat, key, value, inputPath, Job.getInstance)
+  }
+
+  /**
+   * Creates a [[DataSet]] from the given [[org.apache.hadoop.mapreduce.InputFormat]].
+   */
+  def createHadoopInput[K, V](
+      mapredInputFormat: MapreduceInputFormat[K, V],
+      key: Class[K],
+      value: Class[V],
+      job: Job)
+      (implicit tpe: TypeInformation[(K, V)]): DataSet[(K, V)] = {
+    val hadoopInputFormat =
+      new mapreduce.HadoopInputFormat[K, V](mapredInputFormat, key, value, job)
+    createInput(hadoopInputFormat)
+  }
+
+  /**
    * Creates a DataSet from the given non-empty [[Seq]]. The elements need to be serializable
    * because the framework may move the elements into the cluster if needed.
* http://git-wip-us.apache.org/repos/asf/flink/blob/cd2f88af/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapred/HadoopInputFormat.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapred/HadoopInputFormat.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapred/HadoopInputFormat.scala new file mode 100644 index 0000000..5170d14 --- /dev/null +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapred/HadoopInputFormat.scala @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.api.scala.hadoop.mapred + +import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormatBase +import org.apache.hadoop.mapred.{JobConf, InputFormat} + +class HadoopInputFormat[K, V]( + mapredInputFormat: InputFormat[K, V], + keyClass: Class[K], + valueClass: Class[V], + job: JobConf) + extends HadoopInputFormatBase[K, V, (K, V)](mapredInputFormat, keyClass, valueClass, job) { + + def nextRecord(reuse: (K, V)): (K, V) = { + if (!fetched) { + fetchNext() + } + if (!hasNext) { + return null + } + fetched = false + (key, value) + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/cd2f88af/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapred/HadoopOutputFormat.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapred/HadoopOutputFormat.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapred/HadoopOutputFormat.scala new file mode 100644 index 0000000..180a8bf --- /dev/null +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapred/HadoopOutputFormat.scala @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.api.scala.hadoop.mapred + +import org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormatBase +import org.apache.hadoop.mapred.{JobConf, OutputFormat} + +class HadoopOutputFormat[K, V](mapredOutputFormat: OutputFormat[K, V], job: JobConf) + extends HadoopOutputFormatBase[K, V, (K, V)](mapredOutputFormat, job) { + + def writeRecord(record: (K, V)) { + this.recordWriter.write(record._1, record._2) + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/cd2f88af/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapreduce/HadoopInputFormat.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapreduce/HadoopInputFormat.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapreduce/HadoopInputFormat.scala new file mode 100644 index 0000000..cafbdcb --- /dev/null +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapreduce/HadoopInputFormat.scala @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.scala.hadoop.mapreduce + +import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormatBase +import org.apache.hadoop.mapreduce.{InputFormat, Job} + +class HadoopInputFormat[K, V]( + mapredInputFormat: InputFormat[K, V], + keyClass: Class[K], + valueClass: Class[V], + job: Job) + extends HadoopInputFormatBase[K, V, (K, V)](mapredInputFormat, keyClass, valueClass, job) { + + def nextRecord(reuse: (K, V)): (K, V) = { + if (!fetched) { + fetchNext() + } + if (!hasNext) { + return null + } + fetched = false + (recordReader.getCurrentKey, recordReader.getCurrentValue) + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/cd2f88af/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapreduce/HadoopOutputFormat.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapreduce/HadoopOutputFormat.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapreduce/HadoopOutputFormat.scala new file mode 100644 index 0000000..51db9de --- /dev/null +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapreduce/HadoopOutputFormat.scala @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.scala.hadoop.mapreduce + +import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormatBase +import org.apache.hadoop.mapreduce.{Job, OutputFormat} + +class HadoopOutputFormat[K, V](mapredOutputFormat: OutputFormat[K, V], job: Job) + extends HadoopOutputFormatBase[K, V, (K, V)](mapredOutputFormat, job) { + + def writeRecord(record: (K, V)) { + this.recordWriter.write(record._1, record._2) + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/cd2f88af/flink-tests/pom.xml ---------------------------------------------------------------------- diff --git a/flink-tests/pom.xml b/flink-tests/pom.xml index 3ee7a26..b94283d 100644 --- a/flink-tests/pom.xml +++ b/flink-tests/pom.xml @@ -116,12 +116,12 @@ under the License. <scope>test</scope> </dependency> - <dependency> - <groupId>com.google.guava</groupId> - <artifactId>guava</artifactId> - <version>${guava.version}</version> - <scope>test</scope> - </dependency> + <!--<dependency>--> + <!--<groupId>com.google.guava</groupId>--> + <!--<artifactId>guava</artifactId>--> + <!--<version>${guava.version}</version>--> + <!--<scope>test</scope>--> + <!--</dependency>--> <dependency> <groupId>org.scalatest</groupId> http://git-wip-us.apache.org/repos/asf/flink/blob/cd2f88af/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapred/WordCountMapredITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapred/WordCountMapredITCase.java b/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapred/WordCountMapredITCase.java new file mode 100644 index 0000000..037610e --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapred/WordCountMapredITCase.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor 
license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.hadoop.mapred;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.test.testdata.WordCountData;
+import org.apache.flink.test.util.JavaProgramTestBase;
+import org.apache.flink.util.Collector;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hadoop.mapred.TextOutputFormat;
+
+import java.util.Locale;
+
+/**
+ * Word count that reads its input with {@code ExecutionEnvironment#readHadoopFile} and the
+ * Hadoop mapred {@link TextInputFormat}, writes the counts through a mapred
+ * {@link HadoopOutputFormat} wrapping {@link TextOutputFormat}, and compares the
+ * written lines with {@link WordCountData#COUNTS}.
+ */
+public class WordCountMapredITCase extends JavaProgramTestBase {
+
+	protected String textPath;
+	protected String resultPath;
+
+	@Override
+	protected void preSubmit() throws Exception {
+		textPath = createTempFile("text.txt", WordCountData.TEXT);
+		resultPath = getTempDirPath("result");
+	}
+
+	@Override
+	protected void postSubmit() throws Exception {
+		compareResultsByLinesInMemory(WordCountData.COUNTS, resultPath, new String[] {".", "_"});
+	}
+
+	@Override
+	protected void testProgram() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		// Read the input through the Hadoop mapred TextInputFormat; only the Text value is used.
+		DataSet<Tuple2<LongWritable, Text>> input = env.readHadoopFile(new TextInputFormat(),
+				LongWritable.class, Text.class, textPath);
+
+		DataSet<String> text = input.map(new MapFunction<Tuple2<LongWritable, Text>, String>() {
+			@Override
+			public String map(Tuple2<LongWritable, Text> value) throws Exception {
+				return value.f1.toString();
+			}
+		});
+
+		DataSet<Tuple2<String, Integer>> counts =
+				// split up the lines in pairs (2-tuples) containing: (word,1)
+				text.flatMap(new Tokenizer())
+				// group by the tuple field "0" and sum up tuple field "1"
+				.groupBy(0)
+				.sum(1);
+
+		DataSet<Tuple2<Text, LongWritable>> words = counts.map(new MapFunction<Tuple2<String, Integer>, Tuple2<Text, LongWritable>>() {
+
+
+			@Override
+			public Tuple2<Text, LongWritable> map(Tuple2<String, Integer> value) throws Exception {
+				return new Tuple2<Text, LongWritable>(new Text(value.f0), new LongWritable(value.f1));
+			}
+		});
+
+		// Set up Hadoop Output Format
+		HadoopOutputFormat<Text, LongWritable> hadoopOutputFormat =
+				new HadoopOutputFormat<Text, LongWritable>(new TextOutputFormat<Text, LongWritable>(), new JobConf());
+		hadoopOutputFormat.getJobConf().set("mapred.textoutputformat.separator", " ");
+		TextOutputFormat.setOutputPath(hadoopOutputFormat.getJobConf(), new Path(resultPath));
+
+		// Output & Execute
+		words.output(hadoopOutputFormat);
+		env.execute("Hadoop Compat WordCount");
+	}
+
+	public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
+
+		@Override
+		public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
+			// normalize and split the line; Locale.ROOT keeps lower-casing independent of the JVM default locale
+			String[] tokens = value.toLowerCase(Locale.ROOT).split("\\W+");
+
+			// emit the pairs
+			for (String token : tokens) {
+				if (token.length() > 0) {
+					out.collect(new Tuple2<String, Integer>(token, 1));
+				}
+			}
+		}
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/cd2f88af/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapreduce/WordCountMapreduceITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapreduce/WordCountMapreduceITCase.java b/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapreduce/WordCountMapreduceITCase.java new file mode 100644 index 0000000..3bdaa22 --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapreduce/WordCountMapreduceITCase.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.test.hadoop.mapreduce; + +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.test.testdata.WordCountData; +import org.apache.flink.test.util.JavaProgramTestBase; +import org.apache.flink.util.Collector; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; +import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; + +public class WordCountMapreduceITCase extends JavaProgramTestBase { + + protected String textPath; + protected String resultPath; + + public WordCountMapreduceITCase(){ +// setDegreeOfParallelism(4); +// setNumTaskManagers(2); +// setTaskManagerNumSlots(2); + } + + @Override + protected void preSubmit() throws Exception { + textPath = createTempFile("text.txt", WordCountData.TEXT); + resultPath = getTempDirPath("result"); + } + + @Override + protected void postSubmit() throws Exception { + compareResultsByLinesInMemory(WordCountData.COUNTS, resultPath, new String[] {".", "_"}); + } + + @Override + protected void testProgram() throws Exception { + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + + DataSet<Tuple2<LongWritable, Text>> input = env.readHadoopFile(new TextInputFormat(), + LongWritable.class, Text.class, textPath); + + DataSet<String> text = input.map(new MapFunction<Tuple2<LongWritable, Text>, String>() { + @Override + public String map(Tuple2<LongWritable, Text> value) throws Exception { + return value.f1.toString(); + } + }); + + + DataSet<Tuple2<String, Integer>> counts = + // split up the lines in 
pairs (2-tuples) containing: (word,1) + text.flatMap(new Tokenizer()) + // group by the tuple field "0" and sum up tuple field "1" + .groupBy(0) + .sum(1); + + DataSet<Tuple2<Text, LongWritable>> words = counts.map(new MapFunction<Tuple2<String, Integer>, Tuple2<Text, LongWritable>>() { + + + @Override + public Tuple2<Text, LongWritable> map(Tuple2<String, Integer> value) throws Exception { + return new Tuple2<Text, LongWritable>(new Text(value.f0), new LongWritable(value.f1)); + } + }); + + // Set up Hadoop Output Format + Job job = Job.getInstance(); + HadoopOutputFormat<Text, LongWritable> hadoopOutputFormat = + new HadoopOutputFormat<Text, LongWritable>(new TextOutputFormat<Text, LongWritable>(), job); + job.getConfiguration().set("mapred.textoutputformat.separator", " "); + TextOutputFormat.setOutputPath(job, new Path(resultPath)); + + // Output & Execute + words.output(hadoopOutputFormat); + env.execute("Hadoop Compat WordCount"); + } + + public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> { + + @Override + public void flatMap(String value, Collector<Tuple2<String, Integer>> out) { + // normalize and split the line + String[] tokens = value.toLowerCase().split("\\W+"); + + // emit the pairs + for (String token : tokens) { + if (token.length() > 0) { + out.collect(new Tuple2<String, Integer>(token, 1)); + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/cd2f88af/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapred/WordCountMapredITCase.scala ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapred/WordCountMapredITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapred/WordCountMapredITCase.scala new file mode 100644 index 0000000..25c878f --- /dev/null +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapred/WordCountMapredITCase.scala @@ -0,0 +1,67 
@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.hadoop.mapred
+
+import org.apache.flink.api.scala._
+
+import org.apache.flink.test.testdata.WordCountData
+import org.apache.flink.test.util.JavaProgramTestBase
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.io.{Text, LongWritable}
+import org.apache.hadoop.mapred.{FileOutputFormat, JobConf, TextOutputFormat, TextInputFormat}
+
+/** Word count on the Scala API, reading and writing through Hadoop `mapred` formats. */
+class WordCountMapredITCase extends JavaProgramTestBase {
+  protected var textPath: String = null
+  protected var resultPath: String = null
+
+  protected override def preSubmit(): Unit = {
+    textPath = createTempFile("text.txt", WordCountData.TEXT)
+    resultPath = getTempDirPath("result")
+  }
+
+  protected override def postSubmit(): Unit = {
+    compareResultsByLinesInMemory(WordCountData.COUNTS, resultPath, Array[String](".", "_"))
+  }
+
+  protected def testProgram(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    // Keep only the value (the Text) of each record read through the mapred TextInputFormat.
+    val text = env
+      .readHadoopFile(new TextInputFormat, classOf[LongWritable], classOf[Text], textPath)
+      .map { _._2.toString }
+
+    val words = text
+      .flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
+      .map { (_, 1) }
+      .groupBy(0)
+      .sum(1)
+      .map { t => (new Text(t._1), new LongWritable(t._2)) }
+
+    val hadoopOutputFormat = new HadoopOutputFormat[Text, LongWritable](
+      new TextOutputFormat[Text, LongWritable], new JobConf)
+    val conf = hadoopOutputFormat.getJobConf
+    conf.set("mapred.textoutputformat.separator", " ")
+    FileOutputFormat.setOutputPath(conf, new Path(resultPath))
+
+    words.output(hadoopOutputFormat)
+    env.execute("Hadoop Compat WordCount")
+  }
+}
+
http://git-wip-us.apache.org/repos/asf/flink/blob/cd2f88af/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapreduce/WordCountMapreduceITCase.scala ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapreduce/WordCountMapreduceITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapreduce/WordCountMapreduceITCase.scala new file mode 100644 index 0000000..4178267 --- /dev/null +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapreduce/WordCountMapreduceITCase.scala @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.api.scala.hadoop.mapreduce + +import org.apache.flink.api.scala._ +import org.apache.flink.test.testdata.WordCountData +import org.apache.flink.test.util.JavaProgramTestBase +import org.apache.hadoop.fs.Path +import org.apache.hadoop.io.{Text, LongWritable} +import org.apache.hadoop.mapreduce.Job +import org.apache.hadoop.mapreduce.lib.input.TextInputFormat +import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat, TextOutputFormat} + +class WordCountMapreduceITCase extends JavaProgramTestBase { + protected var textPath: String = null + protected var resultPath: String = null + + protected override def preSubmit() { + textPath = createTempFile("text.txt", WordCountData.TEXT) + resultPath = getTempDirPath("result") + } + + protected override def postSubmit() { + compareResultsByLinesInMemory(WordCountData.COUNTS, resultPath, Array[String](".", "_")) + } + + protected def testProgram() { + val env = ExecutionEnvironment.getExecutionEnvironment + + val input = + env.readHadoopFile(new TextInputFormat, classOf[LongWritable], classOf[Text], textPath) + + val text = input map { _._2.toString } + val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } } + .map { (_, 1) } + .groupBy(0) + .sum(1) + + val words = counts map { t => (new Text(t._1), new LongWritable(t._2)) } + + val job = Job.getInstance() + val hadoopOutputFormat = new HadoopOutputFormat[Text,LongWritable]( + new TextOutputFormat[Text, LongWritable], + job) + hadoopOutputFormat.getConfiguration.set("mapred.textoutputformat.separator", " ") + + FileOutputFormat.setOutputPath(job, new Path(resultPath)) + + words.output(hadoopOutputFormat) + + env.execute("Hadoop Compat WordCount") + } +} +
