This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit b1c708b60a0e376ba6c510d8f86b417ee78184a7
Author: Etienne Chauchot <echauc...@apache.org>
AuthorDate: Tue Nov 9 15:35:44 2021 +0100

    [FLINK-21407][doc][formats] Update hadoop doc
---
 .../docs/connectors/datastream/formats/hadoop.md   | 71 +++-------------------
 docs/content/docs/dev/dataset/hadoop_map_reduce.md | 13 +---
 2 files changed, 11 insertions(+), 73 deletions(-)

diff --git a/docs/content/docs/connectors/datastream/formats/hadoop.md b/docs/content/docs/connectors/datastream/formats/hadoop.md
index 0756f02..56553ca 100644
--- a/docs/content/docs/connectors/datastream/formats/hadoop.md
+++ b/docs/content/docs/connectors/datastream/formats/hadoop.md
@@ -30,19 +30,10 @@ under the License.
 
 ## Project Configuration
 
-Support for Hadoop input/output formats is part of the `flink-java` and
-`flink-scala` Maven modules that are always required when writing Flink jobs.
-The code is located in `org.apache.flink.api.java.hadoop` and
-`org.apache.flink.api.scala.hadoop` in an additional sub-package for the
-`mapred` and `mapreduce` API.
-
-Support for Hadoop Mappers and Reducers is contained in the `flink-hadoop-compatibility`
+Support for Hadoop is contained in the `flink-hadoop-compatibility`
 Maven module.
-This code resides in the `org.apache.flink.hadoopcompatibility`
-package.
-Add the following dependency to your `pom.xml` if you want to reuse Mappers
-and Reducers.
+Add the following dependency to your `pom.xml` to use hadoop
 
 ```xml
 <dependency>
@@ -75,7 +66,7 @@ input formats.
 
 The resulting `InputFormat` can be used to create a data source by using
 `ExecutionEnvironmen#createInput`.
-The resulting `DataSet` contains 2-tuples where the first field
+The resulting `DataStream` contains 2-tuples where the first field
 is the key and the second field is the value retrieved from the Hadoop
 InputFormat.
 
@@ -85,9 +76,9 @@ The following example shows how to use Hadoop's `TextInputFormat`.
 {{< tab "Java" >}}
 
 ```java
-ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
-DataSet<Tuple2<LongWritable, Text>> input =
+DataStream<Tuple2<LongWritable, Text>> input =
     env.createInput(HadoopInputs.readHadoopFile(new TextInputFormat(),
                         LongWritable.class, Text.class, textPath));
 
@@ -99,9 +90,9 @@ DataSet<Tuple2<LongWritable, Text>> input =
 {{< tab "Scala" >}}
 
 ```scala
-val env = ExecutionEnvironment.getExecutionEnvironment
+val env = StreamExecutionEnvironment.getExecutionEnvironment
 
-val input: DataSet[(LongWritable, Text)] =
+val input: DataStream[(LongWritable, Text)] =
   env.createInput(HadoopInputs.readHadoopFile(
                     new TextInputFormat, classOf[LongWritable], classOf[Text], textPath))
 
@@ -127,7 +118,7 @@ The following example shows how to use Hadoop's `TextOutputFormat`.
 
 ```java
 // Obtain the result we want to emit
-DataSet<Tuple2<Text, IntWritable>> hadoopResult = [...]
+DataStream<Tuple2<Text, IntWritable>> hadoopResult = [...]
 
 // Set up the Hadoop TextOutputFormat.
 HadoopOutputFormat<Text, IntWritable> hadoopOF =
@@ -148,7 +139,7 @@ hadoopResult.output(hadoopOF);
 
 ```scala
 // Obtain your result to emit.
-val hadoopResult: DataSet[(Text, IntWritable)] = [...]
+val hadoopResult: DataStream[(Text, IntWritable)] = [...]
 
 val hadoopOF = new HadoopOutputFormat[Text,IntWritable](
   new TextOutputFormat[Text, IntWritable],
@@ -165,48 +156,4 @@ hadoopResult.output(hadoopOF)
 {{< /tab >}}
 {{< /tabs >}}
 
-## Complete Hadoop WordCount Example
-
-The following example shows a complete WordCount implementation using Hadoop data types, Input- and OutputFormats, and Mapper and Reducer implementations.
-
-```java
-ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-// Set up the Hadoop TextInputFormat.
-Job job = Job.getInstance();
-HadoopInputFormat<LongWritable, Text> hadoopIF =
-  new HadoopInputFormat<LongWritable, Text>(
-    new TextInputFormat(), LongWritable.class, Text.class, job
-  );
-TextInputFormat.addInputPath(job, new Path(inputPath));
-
-// Read data using the Hadoop TextInputFormat.
-DataSet<Tuple2<LongWritable, Text>> text = env.createInput(hadoopIF);
-
-DataSet<Tuple2<Text, LongWritable>> result = text
-  // use Hadoop Mapper (Tokenizer) as MapFunction
-  .flatMap(new HadoopMapFunction<LongWritable, Text, Text, LongWritable>(
-    new Tokenizer()
-  ))
-  .groupBy(0)
-  // use Hadoop Reducer (Counter) as Reduce- and CombineFunction
-  .reduceGroup(new HadoopReduceCombineFunction<Text, LongWritable, Text, LongWritable>(
-    new Counter(), new Counter()
-  ));
-
-// Set up the Hadoop TextOutputFormat.
-HadoopOutputFormat<Text, LongWritable> hadoopOF =
-  new HadoopOutputFormat<Text, LongWritable>(
-    new TextOutputFormat<Text, LongWritable>(), job
-  );
-hadoopOF.getConfiguration().set("mapreduce.output.textoutputformat.separator", " ");
-TextOutputFormat.setOutputPath(job, new Path(outputPath));
-
-// Emit data using the Hadoop TextOutputFormat.
-result.output(hadoopOF);
-
-// Execute Program
-env.execute("Hadoop WordCount");
-```
-
 {{< top >}}

diff --git a/docs/content/docs/dev/dataset/hadoop_map_reduce.md b/docs/content/docs/dev/dataset/hadoop_map_reduce.md
index c75c3c5..193217f 100644
--- a/docs/content/docs/dev/dataset/hadoop_map_reduce.md
+++ b/docs/content/docs/dev/dataset/hadoop_map_reduce.md
@@ -42,19 +42,10 @@ This document shows how to use existing Hadoop MapReduce code with Flink. Please
 
 ## Project Configuration
 
-Support for Hadoop input/output formats is part of the `flink-java` and
-`flink-scala` Maven modules that are always required when writing Flink jobs.
-The code is located in `org.apache.flink.api.java.hadoop` and
-`org.apache.flink.api.scala.hadoop` in an additional sub-package for the
-`mapred` and `mapreduce` API.
-
-Support for Hadoop Mappers and Reducers is contained in the `flink-hadoop-compatibility`
+Support for Hadoop is contained in the `flink-hadoop-compatibility`
 Maven module.
-This code resides in the `org.apache.flink.hadoopcompatibility`
-package.
-Add the following dependency to your `pom.xml` if you want to reuse Mappers
-and Reducers.
+Add the following dependency to your `pom.xml` to use hadoop
 
 ```xml
 <dependency>
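Since the updated page keeps the separate read and write snippets but drops the end-to-end WordCount listing, here is a minimal sketch of how the two halves might be wired together on the DataStream API. It assumes the `flink-hadoop-compatibility` dependency shown above; the class name and the `inputPath`/`outputPath` arguments are hypothetical placeholders, and the sink uses `writeUsingOutputFormat` because `DataStream` has no `output(...)` method.

```java
import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hadoopcompatibility.HadoopInputs;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class HadoopFormatsSketch {

    public static void main(String[] args) throws Exception {
        // Hypothetical input/output locations, taken from the command line for this sketch.
        final String inputPath = args[0];
        final String outputPath = args[1];

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Source: read <LongWritable, Text> pairs through Hadoop's TextInputFormat
        // (mapreduce API), wrapped by the flink-hadoop-compatibility helper.
        DataStream<Tuple2<LongWritable, Text>> input =
                env.createInput(HadoopInputs.readHadoopFile(
                        new TextInputFormat(), LongWritable.class, Text.class, inputPath));

        // Sink: wrap Hadoop's TextOutputFormat and point it at the output directory.
        Job job = Job.getInstance();
        HadoopOutputFormat<LongWritable, Text> hadoopOF =
                new HadoopOutputFormat<LongWritable, Text>(
                        new TextOutputFormat<LongWritable, Text>(), job);
        TextOutputFormat.setOutputPath(job, new Path(outputPath));

        // Emit the tuples through the wrapped Hadoop OutputFormat.
        input.writeUsingOutputFormat(hadoopOF);

        env.execute("Hadoop formats sketch");
    }
}
```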