[FLINK-1396][FLINK-1303] Hadoop Input/Output directly in API

This adds methods on ExecutionEnvironment for reading with Hadoop Input/OutputFormats.
This also adds support in the Scala API for Hadoop Input/OutputFormats.

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8b3805ba
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8b3805ba
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8b3805ba

Branch: refs/heads/master
Commit: 8b3805ba5905c3d84f3e0631bc6090a618df8e90
Parents: 7bc78cb
Author: Aljoscha Krettek <[email protected]>
Authored: Wed Jan 28 15:13:30 2015 +0100
Committer: Aljoscha Krettek <[email protected]>
Committed: Mon Feb 9 13:38:10 2015 +0100

----------------------------------------------------------------------
 docs/_includes/sidenav.html                     |   2 +-
 docs/css/main/main.css                          |  12 +-
 docs/hadoop_compatibility.md                    | 112 ++++--
 flink-java/pom.xml                              |  22 ++
 .../java/org/apache/flink/api/java/DataSet.java |   3 +-
 .../flink/api/java/ExecutionEnvironment.java    |  64 ++++
 .../java/hadoop/mapred/HadoopInputFormat.java   |  55 +++
 .../hadoop/mapred/HadoopInputFormatBase.java    | 253 ++++++++++++++
 .../java/hadoop/mapred/HadoopOutputFormat.java  |  37 ++
 .../hadoop/mapred/HadoopOutputFormatBase.java   | 165 +++++++++
 .../java/hadoop/mapred/utils/HadoopUtils.java   | 154 +++++++++
 .../mapred/wrapper/HadoopDummyProgressable.java |  33 ++
 .../mapred/wrapper/HadoopDummyReporter.java     |  70 ++++
 .../hadoop/mapred/wrapper/HadoopInputSplit.java | 138 ++++++++
 .../hadoop/mapreduce/HadoopInputFormat.java     |  60 ++++
 .../hadoop/mapreduce/HadoopInputFormatBase.java | 289 ++++++++++++++++
 .../hadoop/mapreduce/HadoopOutputFormat.java    |  41 +++
 .../mapreduce/HadoopOutputFormatBase.java       | 203 +++++++++++
 .../hadoop/mapreduce/utils/HadoopUtils.java     |  82 +++++
 .../mapreduce/wrapper/HadoopInputSplit.java     | 125 +++++++
 .../hadoop/mapred/HadoopInputFormatTest.java    |  82 +++++
 .../hadoop/mapreduce/HadoopInputFormatTest.java |  84 +++++
 .../flink/api/scala/ExecutionEnvironment.scala  |  97 +++++-
 .../scala/hadoop/mapred/HadoopInputFormat.scala |  41 +++
 .../hadoop/mapred/HadoopOutputFormat.scala      |  29 ++
 .../hadoop/mapreduce/HadoopInputFormat.scala    |  42 +++
 .../hadoop/mapreduce/HadoopOutputFormat.scala   |  30 ++
 .../mapred/HadoopInputFormat.java               | 297 ----------------
 .../mapred/HadoopMapFunction.java               |   2 +-
 .../mapred/HadoopOutputFormat.java              | 183 ----------
 .../mapred/HadoopReduceCombineFunction.java     |   2 +-
 .../mapred/HadoopReduceFunction.java            |   2 +-
 .../example/HadoopMapredCompatWordCount.java    |   4 +-
 .../mapred/record/HadoopRecordInputFormat.java  |   6 +-
 .../mapred/record/HadoopRecordOutputFormat.java |   6 +-
 .../mapred/utils/HadoopUtils.java               |  87 -----
 .../mapred/wrapper/HadoopDummyProgressable.java |  33 --
 .../mapred/wrapper/HadoopDummyReporter.java     |  70 ----
 .../mapred/wrapper/HadoopInputSplit.java        | 138 --------
 .../mapreduce/HadoopInputFormat.java            | 338 -------------------
 .../mapreduce/HadoopOutputFormat.java           | 226 -------------
 .../mapreduce/example/WordCount.java            |   4 +-
 .../mapreduce/utils/HadoopUtils.java            |  83 -----
 .../mapreduce/wrapper/HadoopInputSplit.java     | 125 -------
 .../mapred/HadoopIOFormatsITCase.java           | 221 ------------
 .../mapred/HadoopInputFormatTest.java           |  82 -----
 .../mapreduce/HadoopInputFormatTest.java        |  84 -----
 flink-tests/pom.xml                             |  12 +-
 .../hadoop/mapred/HadoopIOFormatsITCase.java    | 222 ++++++++++++
 .../hadoop/mapred/WordCountMapredITCase.java    | 118 +++++++
 .../mapreduce/WordCountMapreduceITCase.java     | 118 +++++++
 .../hadoop/mapred/WordCountMapredITCase.scala   |  67 ++++
 .../mapreduce/WordCountMapreduceITCase.scala    |  70 ++++
 53 files changed, 2898 insertions(+), 2027 deletions(-)
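For orientation, here is a minimal end-to-end sketch of the API surface this commit introduces (not part of the commit itself). It combines the `readHadoopFile` method added to `ExecutionEnvironment` with the relocated `HadoopOutputFormat` wrapper, following the examples in the updated `hadoop_compatibility.md` below; the class name and the HDFS paths are placeholders.

~~~java
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;

public class HadoopFormatsSketch {

	public static void main(String[] args) throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		// New in this commit: create a data source from a Hadoop (mapred)
		// FileInputFormat directly on the ExecutionEnvironment. A JobConf is
		// created internally; "hdfs:///input" is a placeholder path.
		DataSet<Tuple2<LongWritable, Text>> input = env.readHadoopFile(
				new TextInputFormat(), LongWritable.class, Text.class, "hdfs:///input");

		// The relocated OutputFormat wrapper consumes 2-tuples of key and value.
		HadoopOutputFormat<LongWritable, Text> hadoopOF = new HadoopOutputFormat<LongWritable, Text>(
				new TextOutputFormat<LongWritable, Text>(), new JobConf());
		FileOutputFormat.setOutputPath(hadoopOF.getJobConf(), new Path("hdfs:///output"));

		input.output(hadoopOF);
		env.execute("Hadoop format round trip");
	}
}
~~~

The Scala API gains analogous `readHadoopFile` and `createHadoopInput` overloads on its `ExecutionEnvironment`, as the diff below shows.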
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/8b3805ba/docs/_includes/sidenav.html
----------------------------------------------------------------------
diff --git a/docs/_includes/sidenav.html b/docs/_includes/sidenav.html
index 6a73f8d..3f65a76 100644
--- a/docs/_includes/sidenav.html
+++ b/docs/_includes/sidenav.html
@@ -16,7 +16,7 @@ KIND, either express or implied. See the License for the
 specific language governing permissions and limitations
 under the License.
 -->
-<ul>
+<ul id="flink-doc-sidenav">
   <li><div class="sidenav-category"><a href="faq.html">FAQ</a></div></li>
   <li><div class="sidenav-category">Quickstart</div></li>
   <li><div class="sidenav-item"><a href="setup_quickstart.html">Setup</a></div></li>

http://git-wip-us.apache.org/repos/asf/flink/blob/8b3805ba/docs/css/main/main.css
----------------------------------------------------------------------
diff --git a/docs/css/main/main.css b/docs/css/main/main.css
index 5c74475..e5fd0f5 100755
--- a/docs/css/main/main.css
+++ b/docs/css/main/main.css
@@ -34,7 +34,11 @@ body {
 	font-size: 100%;
 }
 
-ul {
+#markdown-toc li {
+	list-style-type: none;
+}
+
+#flink-doc-sidenav li {
 	list-style-type: none;
 }
 
@@ -281,15 +285,15 @@ ul {
 .sidenav-category {
 	font-weight: bold;
 	font-size: larger;
-	margin-top: 10px;
+	padding-top: 10px;
 }
 
 .sidenav-item {
-	border-top: thin solid #AAAAAA;
+	padding-top: 5px;
 }
 
 .sidenav-item-bottom {
-	border-bottom: thin solid #AAAAAA;
 	border-top: thin solid #AAAAAA;
+	padding-top: 5px;
 }
 
 /*----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/8b3805ba/docs/hadoop_compatibility.md
----------------------------------------------------------------------
diff --git a/docs/hadoop_compatibility.md b/docs/hadoop_compatibility.md
index 9b43022..cacca0f 100644
--- a/docs/hadoop_compatibility.md
+++ b/docs/hadoop_compatibility.md
@@ -23,7 +23,8 @@ under the License.
 * This will be replaced by the TOC
 {:toc}
 
-Flink is compatible with many Apache Hadoop's MapReduce interfaces and allows to reuse a lot of code that was implemented for Hadoop MapReduce.
+Flink is compatible with Apache Hadoop MapReduce interfaces and therefore allows
+reusing code that was implemented for Hadoop MapReduce.
 
 You can:
 
@@ -38,9 +39,19 @@ This document shows how to use existing Hadoop MapReduce code with Flink. Please
 
 ### Project Configuration
 
-The Hadoop Compatibility Layer is part of the `flink-addons` Maven module. All relevant classes are located in the `org.apache.flink.hadoopcompatibility` package. It includes separate packages and classes for the Hadoop `mapred` and `mapreduce` APIs.
+Support for Hadoop input/output formats is part of the `flink-java` and
+`flink-scala` Maven modules that are always required when writing Flink jobs.
+The code is located in `org.apache.flink.api.java.hadoop` and
+`org.apache.flink.api.scala.hadoop` in an additional sub-package for the
+`mapred` and `mapreduce` APIs.
 
-Add the following dependency to your `pom.xml` to use the Hadoop Compatibility Layer.
+Support for Hadoop Mappers and Reducers is contained in the `flink-staging`
+Maven module.
+This code resides in the `org.apache.flink.hadoopcompatibility`
+package.
+
+Add the following dependency to your `pom.xml` if you want to reuse Mappers
+and Reducers.
 
 ~~~xml
 <dependency>
@@ -52,56 +63,70 @@ Add the following dependency to your `pom.xml` to use the Hadoop Compatibility L
 
 ### Using Hadoop Data Types
 
-Flink supports all Hadoop `Writable` and `WritableComparable` data types out-of-the-box. You do not need to include the Hadoop Compatibility dependency, if you only want to use your Hadoop data types. See the [Programming Guide](programming_guide.html#data-types) for more details.
+Flink supports all Hadoop `Writable` and `WritableComparable` data types
+out-of-the-box. You do not need to include the Hadoop Compatibility dependency,
+if you only want to use your Hadoop data types. See the
+[Programming Guide](programming_guide.html#data-types) for more details.
 
 ### Using Hadoop InputFormats
 
-Flink provides a compatibility wrapper for Hadoop `InputFormats`. Any class that implements `org.apache.hadoop.mapred.InputFormat` or extends `org.apache.hadoop.mapreduce.InputFormat` is supported. Thus, Flink can handle Hadoop built-in formats such as `TextInputFormat` as well as external formats such as Hive's `HCatInputFormat`. Data read from Hadoop InputFormats is converted into a `DataSet<Tuple2<KEY,VALUE>>` where `KEY` is the key and `VALUE` is the value of the original Hadoop key-value pair.
-
-Flink's InputFormat wrappers are
-
-- `org.apache.flink.hadoopcompatibility.mapred.HadoopInputFormat` and
-- `org.apache.flink.hadoopcompatibility.mapreduce.HadoopInputFormat`
+Hadoop input formats can be used to create a data source by using
+one of the methods `readHadoopFile` or `createHadoopInput` of the
+`ExecutionEnvironment`. The former is used for input formats derived
+from `FileInputFormat` while the latter has to be used for general purpose
+input formats.
 
-and can be used as regular Flink [InputFormats](programming_guide.html#data-sources).
+The resulting `DataSet` contains 2-tuples where the first field
+is the key and the second field is the value retrieved from the Hadoop
+InputFormat.
 
 The following example shows how to use Hadoop's `TextInputFormat`.
 
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+
 ~~~java
 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-// Set up the Hadoop TextInputFormat.
-Job job = Job.getInstance();
-HadoopInputFormat<LongWritable, Text> hadoopIF =
-  // create the Flink wrapper.
-  new HadoopInputFormat<LongWritable, Text>(
-    // create the Hadoop InputFormat, specify key and value type, and job.
-    new TextInputFormat(), LongWritable.class, Text.class, job
-  );
-TextInputFormat.addInputPath(job, new Path(inputPath));
-
-// Read data using the Hadoop TextInputFormat.
-DataSet<Tuple2<LongWritable, Text>> text = env.createInput(hadoopIF);
+
+DataSet<Tuple2<LongWritable, Text>> input =
+	env.readHadoopFile(new TextInputFormat(), LongWritable.class, Text.class, textPath);
 
 // Do something with the data.
 [...]
 ~~~
 
-### Using Hadoop OutputFormats
+</div>
+<div data-lang="scala" markdown="1">
 
-Flink provides a compatibility wrapper for Hadoop `OutputFormats`. Any class that implements `org.apache.hadoop.mapred.OutputFormat` or extends `org.apache.hadoop.mapreduce.OutputFormat` is supported. The OutputFormat wrapper expects its input data to be a `DataSet<Tuple2<KEY,VALUE>>` where `KEY` is the key and `VALUE` is the value of the Hadoop key-value pair that is processed by the Hadoop OutputFormat.
+~~~scala
+val env = ExecutionEnvironment.getExecutionEnvironment
+
+val input: DataSet[(LongWritable, Text)] =
+  env.readHadoopFile(new TextInputFormat, classOf[LongWritable], classOf[Text], textPath)
 
-Flink's OUtputFormat wrappers are
+// Do something with the data.
+[...]
+~~~
+
+</div>
 
-- `org.apache.flink.hadoopcompatibility.mapred.HadoopOutputFormat` and
-- `org.apache.flink.hadoopcompatibility.mapreduce.HadoopOutputFormat`
+</div>
+
+### Using Hadoop OutputFormats
 
-and can be used as regular Flink [OutputFormats](programming_guide.html#data-sinks).
+Flink provides a compatibility wrapper for Hadoop `OutputFormats`. Any class
+that implements `org.apache.hadoop.mapred.OutputFormat` or extends
+`org.apache.hadoop.mapreduce.OutputFormat` is supported.
+The OutputFormat wrapper expects its input data to be a DataSet containing
+2-tuples of key and value. These are to be processed by the Hadoop OutputFormat.
 
 The following example shows how to use Hadoop's `TextOutputFormat`.
 
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+
 ~~~java
-// Obtain your result to emit.
+// Obtain the result we want to emit
 DataSet<Tuple2<Text, IntWritable>> hadoopResult = [...]
 
 // Set up the Hadoop TextOutputFormat.
@@ -115,9 +140,32 @@ hadoopOF.getConfiguration().set("mapreduce.output.textoutputformat.separator", "
 TextOutputFormat.setOutputPath(job, new Path(outputPath));
 
 // Emit data using the Hadoop TextOutputFormat.
-result.output(hadoopOF);
+hadoopResult.output(hadoopOF);
 ~~~
 
+</div>
+<div data-lang="scala" markdown="1">
+
+~~~scala
+// Obtain your result to emit.
+val hadoopResult: DataSet[(Text, IntWritable)] = [...]
+
+val hadoopOF = new HadoopOutputFormat[Text,IntWritable](
+  new TextOutputFormat[Text, IntWritable],
+  new JobConf)
+
+hadoopOF.getJobConf.set("mapred.textoutputformat.separator", " ")
+FileOutputFormat.setOutputPath(hadoopOF.getJobConf, new Path(resultPath))
+
+hadoopResult.output(hadoopOF)
+
+
+~~~
+
+</div>
+
+</div>
+
 ### Using Hadoop Mappers and Reducers
 
 Hadoop Mappers are semantically equivalent to Flink's [FlatMapFunctions](dataset_transformations.html#flatmap) and Hadoop Reducers are equivalent to Flink's [GroupReduceFunctions](dataset_transformations.html#groupreduce-on-grouped-dataset). Flink provides wrappers for implementations of Hadoop MapReduce's `Mapper` and `Reducer` interfaces, i.e., you can reuse your Hadoop Mappers and Reducers in regular Flink programs. At the moment, only the Mapper and Reduce interfaces of Hadoop's mapred API (`org.apache.hadoop.mapred`) are supported.

http://git-wip-us.apache.org/repos/asf/flink/blob/8b3805ba/flink-java/pom.xml
----------------------------------------------------------------------
diff --git a/flink-java/pom.xml b/flink-java/pom.xml
index fa5a1d6..3f668ce 100644
--- a/flink-java/pom.xml
+++ b/flink-java/pom.xml
@@ -119,6 +119,12 @@ under the License.
 				<dependency>
 					<groupId>org.apache.hadoop</groupId>
 					<artifactId>hadoop-core</artifactId>
+					<!--<exclusions>-->
+						<!--<exclusion>-->
+							<!--<groupId>*</groupId>-->
+							<!--<artifactId>*</artifactId>-->
+						<!--</exclusion>-->
+					<!--</exclusions>-->
 				</dependency>
 			</dependencies>
 		</profile>
@@ -134,6 +140,22 @@ under the License.
 				<dependency>
 					<groupId>org.apache.hadoop</groupId>
 					<artifactId>hadoop-common</artifactId>
+					<!--<exclusions>-->
+						<!--<exclusion>-->
+							<!--<groupId>*</groupId>-->
+							<!--<artifactId>*</artifactId>-->
+						<!--</exclusion>-->
+					<!--</exclusions>-->
+				</dependency>
+				<dependency>
+					<groupId>org.apache.hadoop</groupId>
+					<artifactId>hadoop-mapreduce-client-core</artifactId>
+					<!--<exclusions>-->
+						<!--<exclusion>-->
+							<!--<groupId>*</groupId>-->
+							<!--<artifactId>*</artifactId>-->
+						<!--</exclusion>-->
+					<!--</exclusions>-->
 				</dependency>
 			</dependencies>
 		</profile>

http://git-wip-us.apache.org/repos/asf/flink/blob/8b3805ba/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
index 82e37ac..e5bb1fd 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
@@ -1307,8 +1307,7 @@ public abstract class DataSet<T> {
 		this.context.registerDataSink(sink);
 		return sink;
 	}
-	
-	
+
 	// --------------------------------------------------------------------------------------------
 	//  Utilities
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/8b3805ba/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index 05218e5..af8095c 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -40,6 +40,7 @@ import org.apache.flink.api.common.io.FileInputFormat;
 import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat;
 import org.apache.flink.api.java.io.CollectionInputFormat;
 import org.apache.flink.api.java.io.CsvReader;
 import org.apache.flink.api.java.io.IteratorInputFormat;
@@ -61,6 +62,8 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.types.StringValue;
 import org.apache.flink.util.NumberSequenceIterator;
 import org.apache.flink.util.SplittableIterator;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Job;
 
 /**
  * The ExecutionEnviroment is the context in which a program is executed. A
@@ -458,6 +461,67 @@ public abstract class ExecutionEnvironment {
 		return new DataSource<X>(this, inputFormat, producedType, Utils.getCallLocationName());
 	}
+	
+	// ----------------------------------- Hadoop Input Format ---------------------------------------
+	
+	/**
+	 * Creates a {@link DataSet} from the given {@link org.apache.hadoop.mapred.FileInputFormat}. The
+	 * given inputPath is set on the given job.
+	 */
+	public <K,V> DataSource<Tuple2<K, V>> readHadoopFile(org.apache.hadoop.mapred.FileInputFormat<K,V> mapredInputFormat, Class<K> key, Class<V> value, String inputPath, JobConf job) {
+		DataSource<Tuple2<K, V>> result = createHadoopInput(mapredInputFormat, key, value, job);
+		
+		org.apache.hadoop.mapred.FileInputFormat.addInputPath(job, new org.apache.hadoop.fs.Path(inputPath));
+		
+		return result;
+	}
+	
+	/**
+	 * Creates a {@link DataSet} from the given {@link org.apache.hadoop.mapred.FileInputFormat}. A
+	 * {@link org.apache.hadoop.mapred.JobConf} with the given inputPath is created.
+	 */
+	public <K,V> DataSource<Tuple2<K, V>> readHadoopFile(org.apache.hadoop.mapred.FileInputFormat<K,V> mapredInputFormat, Class<K> key, Class<V> value, String inputPath) {
+		return readHadoopFile(mapredInputFormat, key, value, inputPath, new JobConf());
+	}
+	
+	/**
+	 * Creates a {@link DataSet} from the given {@link org.apache.hadoop.mapred.InputFormat}.
+	 */
+	public <K,V> DataSource<Tuple2<K, V>> createHadoopInput(org.apache.hadoop.mapred.InputFormat<K,V> mapredInputFormat, Class<K> key, Class<V> value, JobConf job) {
+		HadoopInputFormat<K, V> hadoopInputFormat = new HadoopInputFormat<K, V>(mapredInputFormat, key, value, job);
+		
+		return this.createInput(hadoopInputFormat);
+	}
+	
+	/**
+	 * Creates a {@link DataSet} from the given {@link org.apache.hadoop.mapreduce.lib.input.FileInputFormat}. The
+	 * given inputPath is set on the given job.
+	 */
+	public <K,V> DataSource<Tuple2<K, V>> readHadoopFile(org.apache.hadoop.mapreduce.lib.input.FileInputFormat<K,V> mapredInputFormat, Class<K> key, Class<V> value, String inputPath, Job job) throws IOException {
+		DataSource<Tuple2<K, V>> result = createHadoopInput(mapredInputFormat, key, value, job);
+		
+		org.apache.hadoop.mapreduce.lib.input.FileInputFormat.addInputPath(job, new org.apache
+				.hadoop.fs.Path(inputPath));
+		
+		return result;
+	}
+	
+	/**
+	 * Creates a {@link DataSet} from the given {@link org.apache.hadoop.mapreduce.lib.input.FileInputFormat}. A
+	 * {@link org.apache.hadoop.mapreduce.Job} with the given inputPath is created.
+	 */
+	public <K,V> DataSource<Tuple2<K, V>> readHadoopFile(org.apache.hadoop.mapreduce.lib.input.FileInputFormat<K,V> mapredInputFormat, Class<K> key, Class<V> value, String inputPath) throws IOException {
+		return readHadoopFile(mapredInputFormat, key, value, inputPath, Job.getInstance());
+	}
+	
+	/**
+	 * Creates a {@link DataSet} from the given {@link org.apache.hadoop.mapreduce.InputFormat}.
+	 */
+	public <K,V> DataSource<Tuple2<K, V>> createHadoopInput(org.apache.hadoop.mapreduce.InputFormat<K,V> mapredInputFormat, Class<K> key, Class<V> value, Job job) {
+		org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<K, V> hadoopInputFormat = new org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<K, V>(mapredInputFormat, key, value, job);
+		
+		return this.createInput(hadoopInputFormat);
+	}
 
 	// ----------------------------------- Collection ---------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/8b3805ba/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormat.java
new file mode 100644
index 0000000..8b25249
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormat.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + + +package org.apache.flink.api.java.hadoop.mapred; + +import java.io.IOException; + +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.hadoop.mapred.JobConf; + +public class HadoopInputFormat<K, V> extends HadoopInputFormatBase<K, V, Tuple2<K,V>> implements ResultTypeQueryable<Tuple2<K,V>> { + + public HadoopInputFormat(org.apache.hadoop.mapred.InputFormat<K,V> mapredInputFormat, Class<K> key, Class<V> value, JobConf job) { + super(mapredInputFormat, key, value, job); + } + + @Override + public Tuple2<K, V> nextRecord(Tuple2<K, V> record) throws IOException { + if(!fetched) { + fetchNext(); + } + if(!hasNext) { + return null; + } + record.f0 = key; + record.f1 = value; + fetched = false; + return record; + } + + @Override + public TypeInformation<Tuple2<K,V>> getProducedType() { + return new TupleTypeInfo<Tuple2<K,V>>(TypeExtractor.createTypeInfo(keyClass), TypeExtractor.createTypeInfo(valueClass)); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/8b3805ba/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatBase.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatBase.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatBase.java new file mode 100644 index 0000000..40f6631 --- /dev/null +++ b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatBase.java @@ -0,0 +1,253 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +package org.apache.flink.api.java.hadoop.mapred; + +import org.apache.flink.api.common.io.FileInputFormat.FileBaseStatistics; +import org.apache.flink.api.common.io.InputFormat; +import org.apache.flink.api.common.io.LocatableInputSplitAssigner; +import org.apache.flink.api.common.io.statistics.BaseStatistics; +import org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils; +import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyReporter; +import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopInputSplit; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FileStatus; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.core.io.InputSplitAssigner; +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.mapred.FileInputFormat; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.util.ReflectionUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.util.ArrayList; + +public abstract class HadoopInputFormatBase<K, V, T> implements InputFormat<T, HadoopInputSplit> { + + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(HadoopInputFormatBase.class); + + private org.apache.hadoop.mapred.InputFormat<K, V> mapredInputFormat; + protected Class<K> keyClass; + protected Class<V> valueClass; + private JobConf jobConf; + + protected transient K key; + protected transient V value; + + private transient RecordReader<K, V> recordReader; + protected transient boolean fetched = false; + protected transient boolean hasNext; + + public HadoopInputFormatBase(org.apache.hadoop.mapred.InputFormat<K, V> mapredInputFormat, Class<K> key, Class<V> value, JobConf job) { + super(); + this.mapredInputFormat = mapredInputFormat; + this.keyClass = key; + this.valueClass = value; + HadoopUtils.mergeHadoopConf(job); + this.jobConf = job; + ReflectionUtils.setConf(mapredInputFormat, jobConf); + } + + public JobConf getJobConf() { + return jobConf; + } + + // -------------------------------------------------------------------------------------------- + // InputFormat + // -------------------------------------------------------------------------------------------- + + @Override + public void configure(Configuration parameters) { + // nothing to do + } + + @Override + public BaseStatistics getStatistics(BaseStatistics cachedStats) throws IOException { + // only gather base statistics for FileInputFormats + if(!(mapredInputFormat instanceof FileInputFormat)) { + return null; + } + + final FileBaseStatistics cachedFileStats = (cachedStats != null && cachedStats instanceof FileBaseStatistics) ? 
+ (FileBaseStatistics) cachedStats : null; + + try { + final org.apache.hadoop.fs.Path[] paths = FileInputFormat.getInputPaths(this.jobConf); + + return getFileStats(cachedFileStats, paths, new ArrayList<FileStatus>(1)); + } catch (IOException ioex) { + if (LOG.isWarnEnabled()) { + LOG.warn("Could not determine statistics due to an io error: " + + ioex.getMessage()); + } + } catch (Throwable t) { + if (LOG.isErrorEnabled()) { + LOG.error("Unexpected problem while getting the file statistics: " + + t.getMessage(), t); + } + } + + // no statistics available + return null; + } + + @Override + public HadoopInputSplit[] createInputSplits(int minNumSplits) + throws IOException { + org.apache.hadoop.mapred.InputSplit[] splitArray = mapredInputFormat.getSplits(jobConf, minNumSplits); + HadoopInputSplit[] hiSplit = new HadoopInputSplit[splitArray.length]; + for(int i=0;i<splitArray.length;i++){ + hiSplit[i] = new HadoopInputSplit(i, splitArray[i], jobConf); + } + return hiSplit; + } + + @Override + public InputSplitAssigner getInputSplitAssigner(HadoopInputSplit[] inputSplits) { + return new LocatableInputSplitAssigner(inputSplits); + } + + @Override + public void open(HadoopInputSplit split) throws IOException { + this.recordReader = this.mapredInputFormat.getRecordReader(split.getHadoopInputSplit(), jobConf, new HadoopDummyReporter()); + if (this.recordReader instanceof Configurable) { + ((Configurable) this.recordReader).setConf(jobConf); + } + key = this.recordReader.createKey(); + value = this.recordReader.createValue(); + this.fetched = false; + } + + @Override + public boolean reachedEnd() throws IOException { + if(!fetched) { + fetchNext(); + } + return !hasNext; + } + + protected void fetchNext() throws IOException { + hasNext = this.recordReader.next(key, value); + fetched = true; + } + + @Override + public void close() throws IOException { + this.recordReader.close(); + } + + // -------------------------------------------------------------------------------------------- + // Helper methods + // -------------------------------------------------------------------------------------------- + + private FileBaseStatistics getFileStats(FileBaseStatistics cachedStats, org.apache.hadoop.fs.Path[] hadoopFilePaths, + ArrayList<FileStatus> files) throws IOException { + + long latestModTime = 0L; + + // get the file info and check whether the cached statistics are still valid. + for(org.apache.hadoop.fs.Path hadoopPath : hadoopFilePaths) { + + final Path filePath = new Path(hadoopPath.toUri()); + final FileSystem fs = FileSystem.get(filePath.toUri()); + + final FileStatus file = fs.getFileStatus(filePath); + latestModTime = Math.max(latestModTime, file.getModificationTime()); + + // enumerate all files and check their modification time stamp. 
+ if (file.isDir()) { + FileStatus[] fss = fs.listStatus(filePath); + files.ensureCapacity(files.size() + fss.length); + + for (FileStatus s : fss) { + if (!s.isDir()) { + files.add(s); + latestModTime = Math.max(s.getModificationTime(), latestModTime); + } + } + } else { + files.add(file); + } + } + + // check whether the cached statistics are still valid, if we have any + if (cachedStats != null && latestModTime <= cachedStats.getLastModificationTime()) { + return cachedStats; + } + + // calculate the whole length + long len = 0; + for (FileStatus s : files) { + len += s.getLen(); + } + + // sanity check + if (len <= 0) { + len = BaseStatistics.SIZE_UNKNOWN; + } + + return new FileBaseStatistics(latestModTime, len, BaseStatistics.AVG_RECORD_BYTES_UNKNOWN); + } + + // -------------------------------------------------------------------------------------------- + // Custom serialization methods + // -------------------------------------------------------------------------------------------- + + private void writeObject(ObjectOutputStream out) throws IOException { + out.writeUTF(mapredInputFormat.getClass().getName()); + out.writeUTF(keyClass.getName()); + out.writeUTF(valueClass.getName()); + jobConf.write(out); + } + + @SuppressWarnings("unchecked") + private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { + String hadoopInputFormatClassName = in.readUTF(); + String keyClassName = in.readUTF(); + String valueClassName = in.readUTF(); + if(jobConf == null) { + jobConf = new JobConf(); + } + jobConf.readFields(in); + try { + this.mapredInputFormat = (org.apache.hadoop.mapred.InputFormat<K,V>) Class.forName(hadoopInputFormatClassName, true, Thread.currentThread().getContextClassLoader()).newInstance(); + } catch (Exception e) { + throw new RuntimeException("Unable to instantiate the hadoop input format", e); + } + try { + this.keyClass = (Class<K>) Class.forName(keyClassName, true, Thread.currentThread().getContextClassLoader()); + } catch (Exception e) { + throw new RuntimeException("Unable to find key class.", e); + } + try { + this.valueClass = (Class<V>) Class.forName(valueClassName, true, Thread.currentThread().getContextClassLoader()); + } catch (Exception e) { + throw new RuntimeException("Unable to find value class.", e); + } + ReflectionUtils.setConf(mapredInputFormat, jobConf); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/8b3805ba/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormat.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormat.java new file mode 100644 index 0000000..75623e2 --- /dev/null +++ b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormat.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.flink.api.java.hadoop.mapred; + +import java.io.IOException; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.hadoop.mapred.JobConf; + +public class HadoopOutputFormat<K,V> extends HadoopOutputFormatBase<K, V, Tuple2<K, V>> { + + public HadoopOutputFormat(org.apache.hadoop.mapred.OutputFormat<K, V> mapredOutputFormat, JobConf job) { + super(mapredOutputFormat, job); + } + + @Override + public void writeRecord(Tuple2<K, V> record) throws IOException { + this.recordWriter.write(record.f0, record.f1); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/8b3805ba/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormatBase.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormatBase.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormatBase.java new file mode 100644 index 0000000..a59b96f --- /dev/null +++ b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormatBase.java @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +package org.apache.flink.api.java.hadoop.mapred; + +import org.apache.flink.api.common.io.FinalizeOnMaster; +import org.apache.flink.api.common.io.OutputFormat; +import org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils; +import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyProgressable; +import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyReporter; +import org.apache.flink.configuration.Configuration; +import org.apache.hadoop.mapred.FileOutputCommitter; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.JobContext; +import org.apache.hadoop.mapred.JobID; +import org.apache.hadoop.mapred.RecordWriter; +import org.apache.hadoop.mapred.TaskAttemptContext; +import org.apache.hadoop.mapred.TaskAttemptID; +import org.apache.hadoop.util.ReflectionUtils; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; + + +public abstract class HadoopOutputFormatBase<K, V, T> implements OutputFormat<T>, FinalizeOnMaster { + + private static final long serialVersionUID = 1L; + + private JobConf jobConf; + private org.apache.hadoop.mapred.OutputFormat<K,V> mapredOutputFormat; + protected transient RecordWriter<K,V> recordWriter; + private transient FileOutputCommitter fileOutputCommitter; + private transient TaskAttemptContext context; + private transient JobContext jobContext; + + public HadoopOutputFormatBase(org.apache.hadoop.mapred.OutputFormat<K, V> mapredOutputFormat, JobConf job) { + super(); + this.mapredOutputFormat = mapredOutputFormat; + HadoopUtils.mergeHadoopConf(job); + this.jobConf = job; + } + + public JobConf getJobConf() { + return jobConf; + } + + // -------------------------------------------------------------------------------------------- + // OutputFormat + // -------------------------------------------------------------------------------------------- + + @Override + public void configure(Configuration parameters) { + // nothing to do + } + + /** + * create the temporary output file for hadoop RecordWriter. + * @param taskNumber The number of the parallel instance. + * @param numTasks The number of parallel tasks. 
+ * @throws java.io.IOException + */ + @Override + public void open(int taskNumber, int numTasks) throws IOException { + if (Integer.toString(taskNumber + 1).length() > 6) { + throw new IOException("Task id too large."); + } + + TaskAttemptID taskAttemptID = TaskAttemptID.forName("attempt__0000_r_" + + String.format("%" + (6 - Integer.toString(taskNumber + 1).length()) + "s"," ").replace(" ", "0") + + Integer.toString(taskNumber + 1) + + "_0"); + + this.jobConf.set("mapred.task.id", taskAttemptID.toString()); + this.jobConf.setInt("mapred.task.partition", taskNumber + 1); + // for hadoop 2.2 + this.jobConf.set("mapreduce.task.attempt.id", taskAttemptID.toString()); + this.jobConf.setInt("mapreduce.task.partition", taskNumber + 1); + + try { + this.context = HadoopUtils.instantiateTaskAttemptContext(this.jobConf, taskAttemptID); + } catch (Exception e) { + throw new RuntimeException(e); + } + + this.fileOutputCommitter = new FileOutputCommitter(); + + try { + this.jobContext = HadoopUtils.instantiateJobContext(this.jobConf, new JobID()); + } catch (Exception e) { + throw new RuntimeException(e); + } + + this.fileOutputCommitter.setupJob(jobContext); + + this.recordWriter = this.mapredOutputFormat.getRecordWriter(null, this.jobConf, Integer.toString(taskNumber + 1), new HadoopDummyProgressable()); + } + + /** + * commit the task by moving the output file out from the temporary directory. + * @throws java.io.IOException + */ + @Override + public void close() throws IOException { + this.recordWriter.close(new HadoopDummyReporter()); + + if (this.fileOutputCommitter.needsTaskCommit(this.context)) { + this.fileOutputCommitter.commitTask(this.context); + } + } + + @Override + public void finalizeGlobal(int parallelism) throws IOException { + + try { + JobContext jobContext = HadoopUtils.instantiateJobContext(this.jobConf, new JobID()); + FileOutputCommitter fileOutputCommitter = new FileOutputCommitter(); + + // finalize HDFS output format + fileOutputCommitter.commitJob(jobContext); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + // -------------------------------------------------------------------------------------------- + // Custom serialization methods + // -------------------------------------------------------------------------------------------- + + private void writeObject(ObjectOutputStream out) throws IOException { + out.writeUTF(mapredOutputFormat.getClass().getName()); + jobConf.write(out); + } + + @SuppressWarnings("unchecked") + private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { + String hadoopOutputFormatName = in.readUTF(); + if(jobConf == null) { + jobConf = new JobConf(); + } + jobConf.readFields(in); + try { + this.mapredOutputFormat = (org.apache.hadoop.mapred.OutputFormat<K,V>) Class.forName(hadoopOutputFormatName, true, Thread.currentThread().getContextClassLoader()).newInstance(); + } catch (Exception e) { + throw new RuntimeException("Unable to instantiate the hadoop output format", e); + } + ReflectionUtils.setConf(mapredOutputFormat, jobConf); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/8b3805ba/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/utils/HadoopUtils.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/utils/HadoopUtils.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/utils/HadoopUtils.java new file mode 100644 index 0000000..d4dc297 --- 
/dev/null +++ b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/utils/HadoopUtils.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.flink.api.java.hadoop.mapred.utils; + +import java.io.File; +import java.lang.reflect.Constructor; +import java.util.Map; + +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.JobContext; +import org.apache.hadoop.mapred.JobID; +import org.apache.hadoop.mapred.TaskAttemptContext; +import org.apache.hadoop.mapred.TaskAttemptID; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class HadoopUtils { + + private static final Logger LOG = LoggerFactory.getLogger(HadoopUtils.class); + + /** + * Merge HadoopConfiguration into JobConf. This is necessary for the HDFS configuration. + */ + public static void mergeHadoopConf(JobConf jobConf) { + org.apache.hadoop.conf.Configuration hadoopConf = getHadoopConfiguration(); + for (Map.Entry<String, String> e : hadoopConf) { + jobConf.set(e.getKey(), e.getValue()); + } + } + + public static JobContext instantiateJobContext(JobConf jobConf, JobID jobId) throws Exception { + try { + // for Hadoop 1.xx + Class<?> clazz = null; + if(!TaskAttemptContext.class.isInterface()) { + clazz = Class.forName("org.apache.hadoop.mapred.JobContext", true, Thread.currentThread().getContextClassLoader()); + } + // for Hadoop 2.xx + else { + clazz = Class.forName("org.apache.hadoop.mapred.JobContextImpl", true, Thread.currentThread().getContextClassLoader()); + } + Constructor<?> constructor = clazz.getDeclaredConstructor(JobConf.class, org.apache.hadoop.mapreduce.JobID.class); + // for Hadoop 1.xx + constructor.setAccessible(true); + JobContext context = (JobContext) constructor.newInstance(jobConf, jobId); + + return context; + } catch(Exception e) { + throw new Exception("Could not create instance of JobContext.", e); + } + } + + public static TaskAttemptContext instantiateTaskAttemptContext(JobConf jobConf, TaskAttemptID taskAttemptID) throws Exception { + try { + // for Hadoop 1.xx + Class<?> clazz = null; + if(!TaskAttemptContext.class.isInterface()) { + clazz = Class.forName("org.apache.hadoop.mapred.TaskAttemptContext", true, Thread.currentThread().getContextClassLoader()); + } + // for Hadoop 2.xx + else { + clazz = Class.forName("org.apache.hadoop.mapred.TaskAttemptContextImpl", true, Thread.currentThread().getContextClassLoader()); + } + Constructor<?> constructor = clazz.getDeclaredConstructor(JobConf.class, TaskAttemptID.class); + // for Hadoop 1.xx + constructor.setAccessible(true); + TaskAttemptContext context = 
(TaskAttemptContext) constructor.newInstance(jobConf, taskAttemptID); + return context; + } catch(Exception e) { + throw new Exception("Could not create instance of TaskAttemptContext.", e); + } + } + + /** + * Returns a new Hadoop Configuration object using the path to the hadoop conf configured + * in the main configuration (flink-conf.yaml). + * This method is public because its being used in the HadoopDataSource. + */ + public static org.apache.hadoop.conf.Configuration getHadoopConfiguration() { + Configuration retConf = new org.apache.hadoop.conf.Configuration(); + + // We need to load both core-site.xml and hdfs-site.xml to determine the default fs path and + // the hdfs configuration + // Try to load HDFS configuration from Hadoop's own configuration files + // 1. approach: Flink configuration + final String hdfsDefaultPath = GlobalConfiguration.getString(ConfigConstants + .HDFS_DEFAULT_CONFIG, null); + if (hdfsDefaultPath != null) { + retConf.addResource(new org.apache.hadoop.fs.Path(hdfsDefaultPath)); + } else { + LOG.debug("Cannot find hdfs-default configuration file"); + } + + final String hdfsSitePath = GlobalConfiguration.getString(ConfigConstants.HDFS_SITE_CONFIG, null); + if (hdfsSitePath != null) { + retConf.addResource(new org.apache.hadoop.fs.Path(hdfsSitePath)); + } else { + LOG.debug("Cannot find hdfs-site configuration file"); + } + + // 2. Approach environment variables + String[] possibleHadoopConfPaths = new String[4]; + possibleHadoopConfPaths[0] = GlobalConfiguration.getString(ConfigConstants.PATH_HADOOP_CONFIG, null); + possibleHadoopConfPaths[1] = System.getenv("HADOOP_CONF_DIR"); + + if (System.getenv("HADOOP_HOME") != null) { + possibleHadoopConfPaths[2] = System.getenv("HADOOP_HOME")+"/conf"; + possibleHadoopConfPaths[3] = System.getenv("HADOOP_HOME")+"/etc/hadoop"; // hadoop 2.2 + } + + for (String possibleHadoopConfPath : possibleHadoopConfPaths) { + if (possibleHadoopConfPath != null) { + if (new File(possibleHadoopConfPath).exists()) { + if (new File(possibleHadoopConfPath + "/core-site.xml").exists()) { + retConf.addResource(new org.apache.hadoop.fs.Path(possibleHadoopConfPath + "/core-site.xml")); + + if (LOG.isDebugEnabled()) { + LOG.debug("Adding " + possibleHadoopConfPath + "/core-site.xml to hadoop configuration"); + } + } + if (new File(possibleHadoopConfPath + "/hdfs-site.xml").exists()) { + retConf.addResource(new org.apache.hadoop.fs.Path(possibleHadoopConfPath + "/hdfs-site.xml")); + + if (LOG.isDebugEnabled()) { + LOG.debug("Adding " + possibleHadoopConfPath + "/hdfs-site.xml to hadoop configuration"); + } + } + } + } + } + return retConf; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/8b3805ba/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/wrapper/HadoopDummyProgressable.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/wrapper/HadoopDummyProgressable.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/wrapper/HadoopDummyProgressable.java new file mode 100644 index 0000000..215b890 --- /dev/null +++ b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/wrapper/HadoopDummyProgressable.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.flink.api.java.hadoop.mapred.wrapper; + +import org.apache.hadoop.util.Progressable; + +/** + * This is a dummy progress + * + */ +public class HadoopDummyProgressable implements Progressable { + @Override + public void progress() { + + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/8b3805ba/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/wrapper/HadoopDummyReporter.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/wrapper/HadoopDummyReporter.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/wrapper/HadoopDummyReporter.java new file mode 100644 index 0000000..01104ac --- /dev/null +++ b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/wrapper/HadoopDummyReporter.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +package org.apache.flink.api.java.hadoop.mapred.wrapper; + +import org.apache.hadoop.mapred.Counters.Counter; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.Reporter; + +/** + * This is a dummy progress monitor / reporter + * + */ +public class HadoopDummyReporter implements Reporter { + + @Override + public void progress() { + } + + @Override + public void setStatus(String status) { + + } + + @Override + public Counter getCounter(Enum<?> name) { + return null; + } + + @Override + public Counter getCounter(String group, String name) { + return null; + } + + @Override + public void incrCounter(Enum<?> key, long amount) { + + } + + @Override + public void incrCounter(String group, String counter, long amount) { + + } + + @Override + public InputSplit getInputSplit() throws UnsupportedOperationException { + return null; + } + // There should be an @Override, but some CDH4 dependency does not contain this method + public float getProgress() { + return 0; + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/8b3805ba/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/wrapper/HadoopInputSplit.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/wrapper/HadoopInputSplit.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/wrapper/HadoopInputSplit.java new file mode 100644 index 0000000..beef5d7 --- /dev/null +++ b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/wrapper/HadoopInputSplit.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +package org.apache.flink.api.java.hadoop.mapred.wrapper; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; + +import org.apache.flink.core.io.LocatableInputSplit; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.io.WritableFactories; +import org.apache.hadoop.mapred.JobConf; + + +public class HadoopInputSplit extends LocatableInputSplit { + + private static final long serialVersionUID = 1L; + + + private transient org.apache.hadoop.mapred.InputSplit hadoopInputSplit; + + private JobConf jobConf; + + private int splitNumber; + private String hadoopInputSplitTypeName; + + + public org.apache.hadoop.mapred.InputSplit getHadoopInputSplit() { + return hadoopInputSplit; + } + + public HadoopInputSplit() { + super(); + } + + public HadoopInputSplit(int splitNumber, org.apache.hadoop.mapred.InputSplit hInputSplit, JobConf jobconf) { + + this.splitNumber = splitNumber; + this.hadoopInputSplit = hInputSplit; + this.hadoopInputSplitTypeName = hInputSplit.getClass().getName(); + this.jobConf = jobconf; + + } + + @Override + public void write(DataOutputView out) throws IOException { + out.writeInt(splitNumber); + out.writeUTF(hadoopInputSplitTypeName); + jobConf.write(out); + hadoopInputSplit.write(out); + } + + @Override + public void read(DataInputView in) throws IOException { + this.splitNumber = in.readInt(); + this.hadoopInputSplitTypeName = in.readUTF(); + if(hadoopInputSplit == null) { + try { + Class<? extends org.apache.hadoop.io.Writable> inputSplit = + Class.forName(hadoopInputSplitTypeName).asSubclass(org.apache.hadoop.io.Writable.class); + this.hadoopInputSplit = (org.apache.hadoop.mapred.InputSplit) WritableFactories.newInstance( inputSplit ); + } + catch (Exception e) { + throw new RuntimeException("Unable to create InputSplit", e); + } + } + jobConf = new JobConf(); + jobConf.readFields(in); + if (this.hadoopInputSplit instanceof Configurable) { + ((Configurable) this.hadoopInputSplit).setConf(this.jobConf); + } + this.hadoopInputSplit.readFields(in); + + } + + private void writeObject(ObjectOutputStream out) throws IOException { + out.writeInt(splitNumber); + out.writeUTF(hadoopInputSplitTypeName); + jobConf.write(out); + hadoopInputSplit.write(out); + + } + + @SuppressWarnings("unchecked") + private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { + this.splitNumber=in.readInt(); + this.hadoopInputSplitTypeName = in.readUTF(); + if(hadoopInputSplit == null) { + try { + Class<? 
extends org.apache.hadoop.io.Writable> inputSplit = + Class.forName(hadoopInputSplitTypeName).asSubclass(org.apache.hadoop.io.Writable.class); + this.hadoopInputSplit = (org.apache.hadoop.mapred.InputSplit) WritableFactories.newInstance( inputSplit ); + } + catch (Exception e) { + throw new RuntimeException("Unable to create InputSplit", e); + } + } + jobConf = new JobConf(); + jobConf.readFields(in); + if (this.hadoopInputSplit instanceof Configurable) { + ((Configurable) this.hadoopInputSplit).setConf(this.jobConf); + } + this.hadoopInputSplit.readFields(in); + } + + @Override + public int getSplitNumber() { + return this.splitNumber; + } + + @Override + public String[] getHostnames() { + try { + return this.hadoopInputSplit.getLocations(); + } catch(IOException ioe) { + return new String[0]; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/8b3805ba/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormat.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormat.java new file mode 100644 index 0000000..efe97f1 --- /dev/null +++ b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormat.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +package org.apache.flink.api.java.hadoop.mapreduce; + +import java.io.IOException; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.hadoop.mapreduce.Job; +import org.apache.flink.api.java.typeutils.TypeExtractor; + +public class HadoopInputFormat<K, V> extends HadoopInputFormatBase<K, V, Tuple2<K, V>> implements ResultTypeQueryable<Tuple2<K,V>> { + + public HadoopInputFormat(org.apache.hadoop.mapreduce.InputFormat<K,V> mapreduceInputFormat, Class<K> key, Class<V> value, Job job) { + super(mapreduceInputFormat, key, value, job); + } + + @Override + public Tuple2<K, V> nextRecord(Tuple2<K, V> record) throws IOException { + if(!this.fetched) { + fetchNext(); + } + if(!this.hasNext) { + return null; + } + try { + record.f0 = recordReader.getCurrentKey(); + record.f1 = recordReader.getCurrentValue(); + } catch (InterruptedException e) { + throw new IOException("Could not get KeyValue pair.", e); + } + this.fetched = false; + + return record; + } + + @Override + public TypeInformation<Tuple2<K,V>> getProducedType() { + return new TupleTypeInfo<Tuple2<K,V>>(TypeExtractor.createTypeInfo(keyClass), TypeExtractor.createTypeInfo(valueClass)); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/8b3805ba/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatBase.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatBase.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatBase.java new file mode 100644 index 0000000..2a6c0f4 --- /dev/null +++ b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatBase.java @@ -0,0 +1,289 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +package org.apache.flink.api.java.hadoop.mapreduce; + +import org.apache.flink.api.common.io.FileInputFormat.FileBaseStatistics; +import org.apache.flink.api.common.io.InputFormat; +import org.apache.flink.api.common.io.LocatableInputSplitAssigner; +import org.apache.flink.api.common.io.statistics.BaseStatistics; +import org.apache.flink.api.java.hadoop.mapreduce.utils.HadoopUtils; +import org.apache.flink.api.java.hadoop.mapreduce.wrapper.HadoopInputSplit; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FileStatus; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.core.io.InputSplitAssigner; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.JobID; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.util.ArrayList; +import java.util.List; + +public abstract class HadoopInputFormatBase<K, V, T> implements InputFormat<T, HadoopInputSplit> { + + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(HadoopInputFormatBase.class); + + private org.apache.hadoop.mapreduce.InputFormat<K, V> mapreduceInputFormat; + protected Class<K> keyClass; + protected Class<V> valueClass; + private org.apache.hadoop.conf.Configuration configuration; + + protected transient RecordReader<K, V> recordReader; + protected boolean fetched = false; + protected boolean hasNext; + + public HadoopInputFormatBase(org.apache.hadoop.mapreduce.InputFormat<K, V> mapreduceInputFormat, Class<K> key, Class<V> value, Job job) { + super(); + this.mapreduceInputFormat = mapreduceInputFormat; + this.keyClass = key; + this.valueClass = value; + this.configuration = job.getConfiguration(); + HadoopUtils.mergeHadoopConf(configuration); + } + + public org.apache.hadoop.conf.Configuration getConfiguration() { + return this.configuration; + } + + // -------------------------------------------------------------------------------------------- + // InputFormat + // -------------------------------------------------------------------------------------------- + + @Override + public void configure(Configuration parameters) { + // nothing to do + } + + @Override + public BaseStatistics getStatistics(BaseStatistics cachedStats) throws IOException { + // only gather base statistics for FileInputFormats + if(!(mapreduceInputFormat instanceof FileInputFormat)) { + return null; + } + + JobContext jobContext = null; + try { + jobContext = HadoopUtils.instantiateJobContext(configuration, null); + } catch (Exception e) { + throw new RuntimeException(e); + } + + final FileBaseStatistics cachedFileStats = (cachedStats != null && cachedStats instanceof FileBaseStatistics) ? 
+ (FileBaseStatistics) cachedStats : null; + + try { + final org.apache.hadoop.fs.Path[] paths = FileInputFormat.getInputPaths(jobContext); + return getFileStats(cachedFileStats, paths, new ArrayList<FileStatus>(1)); + } catch (IOException ioex) { + if (LOG.isWarnEnabled()) { + LOG.warn("Could not determine statistics due to an io error: " + + ioex.getMessage()); + } + } catch (Throwable t) { + if (LOG.isErrorEnabled()) { + LOG.error("Unexpected problem while getting the file statistics: " + + t.getMessage(), t); + } + } + + // no statistics available + return null; + } + + @Override + public HadoopInputSplit[] createInputSplits(int minNumSplits) + throws IOException { + configuration.setInt("mapreduce.input.fileinputformat.split.minsize", minNumSplits); + + JobContext jobContext = null; + try { + jobContext = HadoopUtils.instantiateJobContext(configuration, new JobID()); + } catch (Exception e) { + throw new RuntimeException(e); + } + + List<org.apache.hadoop.mapreduce.InputSplit> splits; + try { + splits = this.mapreduceInputFormat.getSplits(jobContext); + } catch (InterruptedException e) { + throw new IOException("Could not get Splits.", e); + } + HadoopInputSplit[] hadoopInputSplits = new HadoopInputSplit[splits.size()]; + + for(int i = 0; i < hadoopInputSplits.length; i++){ + hadoopInputSplits[i] = new HadoopInputSplit(i, splits.get(i), jobContext); + } + return hadoopInputSplits; + } + + @Override + public InputSplitAssigner getInputSplitAssigner(HadoopInputSplit[] inputSplits) { + return new LocatableInputSplitAssigner(inputSplits); + } + + @Override + public void open(HadoopInputSplit split) throws IOException { + TaskAttemptContext context = null; + try { + context = HadoopUtils.instantiateTaskAttemptContext(configuration, new TaskAttemptID()); + } catch(Exception e) { + throw new RuntimeException(e); + } + + try { + this.recordReader = this.mapreduceInputFormat + .createRecordReader(split.getHadoopInputSplit(), context); + this.recordReader.initialize(split.getHadoopInputSplit(), context); + } catch (InterruptedException e) { + throw new IOException("Could not create RecordReader.", e); + } finally { + this.fetched = false; + } + } + + @Override + public boolean reachedEnd() throws IOException { + if(!this.fetched) { + fetchNext(); + } + return !this.hasNext; + } + + protected void fetchNext() throws IOException { + try { + this.hasNext = this.recordReader.nextKeyValue(); + } catch (InterruptedException e) { + throw new IOException("Could not fetch next KeyValue pair.", e); + } finally { + this.fetched = true; + } + } + + @Override + public void close() throws IOException { + this.recordReader.close(); + } + + // -------------------------------------------------------------------------------------------- + // Helper methods + // -------------------------------------------------------------------------------------------- + + private FileBaseStatistics getFileStats(FileBaseStatistics cachedStats, org.apache.hadoop.fs.Path[] hadoopFilePaths, + ArrayList<FileStatus> files) throws IOException { + + long latestModTime = 0L; + + // get the file info and check whether the cached statistics are still valid. + for(org.apache.hadoop.fs.Path hadoopPath : hadoopFilePaths) { + + final Path filePath = new Path(hadoopPath.toUri()); + final FileSystem fs = FileSystem.get(filePath.toUri()); + + final FileStatus file = fs.getFileStatus(filePath); + latestModTime = Math.max(latestModTime, file.getModificationTime()); + + // enumerate all files and check their modification time stamp. 
+ if (file.isDir()) { + FileStatus[] fss = fs.listStatus(filePath); + files.ensureCapacity(files.size() + fss.length); + + for (FileStatus s : fss) { + if (!s.isDir()) { + files.add(s); + latestModTime = Math.max(s.getModificationTime(), latestModTime); + } + } + } else { + files.add(file); + } + } + + // check whether the cached statistics are still valid, if we have any + if (cachedStats != null && latestModTime <= cachedStats.getLastModificationTime()) { + return cachedStats; + } + + // calculate the whole length + long len = 0; + for (FileStatus s : files) { + len += s.getLen(); + } + + // sanity check + if (len <= 0) { + len = BaseStatistics.SIZE_UNKNOWN; + } + + return new FileBaseStatistics(latestModTime, len, BaseStatistics.AVG_RECORD_BYTES_UNKNOWN); + } + + // -------------------------------------------------------------------------------------------- + // Custom serialization methods + // -------------------------------------------------------------------------------------------- + + private void writeObject(ObjectOutputStream out) throws IOException { + out.writeUTF(this.mapreduceInputFormat.getClass().getName()); + out.writeUTF(this.keyClass.getName()); + out.writeUTF(this.valueClass.getName()); + this.configuration.write(out); + } + + @SuppressWarnings("unchecked") + private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { + String hadoopInputFormatClassName = in.readUTF(); + String keyClassName = in.readUTF(); + String valueClassName = in.readUTF(); + + org.apache.hadoop.conf.Configuration configuration = new org.apache.hadoop.conf.Configuration(); + configuration.readFields(in); + + if(this.configuration == null) { + this.configuration = configuration; + } + + try { + this.mapreduceInputFormat = (org.apache.hadoop.mapreduce.InputFormat<K,V>) Class.forName(hadoopInputFormatClassName, true, Thread.currentThread().getContextClassLoader()).newInstance(); + } catch (Exception e) { + throw new RuntimeException("Unable to instantiate the hadoop input format", e); + } + try { + this.keyClass = (Class<K>) Class.forName(keyClassName, true, Thread.currentThread().getContextClassLoader()); + } catch (Exception e) { + throw new RuntimeException("Unable to find key class.", e); + } + try { + this.valueClass = (Class<V>) Class.forName(valueClassName, true, Thread.currentThread().getContextClassLoader()); + } catch (Exception e) { + throw new RuntimeException("Unable to find value class.", e); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/8b3805ba/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormat.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormat.java new file mode 100644 index 0000000..7d3675c --- /dev/null +++ b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormat.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.flink.api.java.hadoop.mapreduce; + +import java.io.IOException; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.hadoop.mapreduce.Job; + +public class HadoopOutputFormat<K, V> extends HadoopOutputFormatBase<K, V, Tuple2<K, V>> { + + public HadoopOutputFormat(org.apache.hadoop.mapreduce.OutputFormat<K,V> mapreduceOutputFormat, Job job) { + super(mapreduceOutputFormat, job); + } + + @Override + public void writeRecord(Tuple2<K, V> record) throws IOException { + try { + this.recordWriter.write(record.f0, record.f1); + } catch (InterruptedException e) { + throw new IOException("Could not write Record.", e); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/8b3805ba/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormatBase.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormatBase.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormatBase.java new file mode 100644 index 0000000..a7ae428 --- /dev/null +++ b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormatBase.java @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +package org.apache.flink.api.java.hadoop.mapreduce; + +import org.apache.flink.api.common.io.FinalizeOnMaster; +import org.apache.flink.api.common.io.OutputFormat; +import org.apache.flink.api.java.hadoop.mapreduce.utils.HadoopUtils; +import org.apache.flink.configuration.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.JobID; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; + + +public abstract class HadoopOutputFormatBase<K, V, T> implements OutputFormat<T>, FinalizeOnMaster { + + private static final long serialVersionUID = 1L; + + private org.apache.hadoop.conf.Configuration configuration; + private org.apache.hadoop.mapreduce.OutputFormat<K,V> mapreduceOutputFormat; + protected transient RecordWriter<K,V> recordWriter; + private transient FileOutputCommitter fileOutputCommitter; + private transient TaskAttemptContext context; + private transient int taskNumber; + + public HadoopOutputFormatBase(org.apache.hadoop.mapreduce.OutputFormat<K, V> mapreduceOutputFormat, Job job) { + super(); + this.mapreduceOutputFormat = mapreduceOutputFormat; + this.configuration = job.getConfiguration(); + HadoopUtils.mergeHadoopConf(configuration); + } + + public org.apache.hadoop.conf.Configuration getConfiguration() { + return this.configuration; + } + + // -------------------------------------------------------------------------------------------- + // OutputFormat + // -------------------------------------------------------------------------------------------- + + @Override + public void configure(Configuration parameters) { + // nothing to do + } + + /** + * Creates the temporary output file for the Hadoop RecordWriter. + * @param taskNumber The number of the parallel instance. + * @param numTasks The number of parallel tasks.
+ * @throws java.io.IOException + */ + @Override + public void open(int taskNumber, int numTasks) throws IOException { + if (Integer.toString(taskNumber + 1).length() > 6) { + throw new IOException("Task id too large."); + } + + this.taskNumber = taskNumber + 1; + + // for hadoop 2.2 + this.configuration.set("mapreduce.output.basename", "tmp"); + + // zero-pad the 1-based task number to six digits; %06d also covers the six-digit maximum permitted by the check above, which the former width arithmetic could not format + TaskAttemptID taskAttemptID = TaskAttemptID.forName("attempt__0000_r_" + String.format("%06d", taskNumber + 1) + "_0"); + + this.configuration.set("mapred.task.id", taskAttemptID.toString()); + this.configuration.setInt("mapred.task.partition", taskNumber + 1); + // for hadoop 2.2 + this.configuration.set("mapreduce.task.attempt.id", taskAttemptID.toString()); + this.configuration.setInt("mapreduce.task.partition", taskNumber + 1); + + try { + this.context = HadoopUtils.instantiateTaskAttemptContext(this.configuration, taskAttemptID); + } catch (Exception e) { + throw new RuntimeException(e); + } + + this.fileOutputCommitter = new FileOutputCommitter(new Path(this.configuration.get("mapred.output.dir")), context); + + try { + this.fileOutputCommitter.setupJob(HadoopUtils.instantiateJobContext(this.configuration, new JobID())); + } catch (Exception e) { + throw new RuntimeException(e); + } + + // compatibility with hadoop 2.2.0: the temporary output directory differs from hadoop 1.2.1 + this.configuration.set("mapreduce.task.output.dir", this.fileOutputCommitter.getWorkPath().toString()); + + try { + this.recordWriter = this.mapreduceOutputFormat.getRecordWriter(this.context); + } catch (InterruptedException e) { + throw new IOException("Could not create RecordWriter.", e); + } + } + + /** + * Commits the task by moving the output file out of the temporary directory.
+ * @throws java.io.IOException + */ + @Override + public void close() throws IOException { + try { + this.recordWriter.close(this.context); + } catch (InterruptedException e) { + throw new IOException("Could not close RecordWriter.", e); + } + + if (this.fileOutputCommitter.needsTaskCommit(this.context)) { + this.fileOutputCommitter.commitTask(this.context); + } + + Path outputPath = new Path(this.configuration.get("mapred.output.dir")); + + // rename tmp-file to final name + FileSystem fs = FileSystem.get(outputPath.toUri(), this.configuration); + + String taskNumberStr = Integer.toString(this.taskNumber); + String tmpFileTemplate = "tmp-r-00000"; + String tmpFile = tmpFileTemplate.substring(0, 11 - taskNumberStr.length()) + taskNumberStr; + + if(fs.exists(new Path(outputPath.toString() + "/" + tmpFile))) { + fs.rename(new Path(outputPath.toString() + "/" + tmpFile), new Path(outputPath.toString() + "/" + taskNumberStr)); + } + } + + @Override + public void finalizeGlobal(int parallelism) throws IOException { + + JobContext jobContext; + TaskAttemptContext taskContext; + try { + // the same zero-padded dummy attempt id as in open(), for task number 1 + TaskAttemptID taskAttemptID = TaskAttemptID.forName("attempt__0000_r_" + String.format("%06d", 1) + "_0"); + + jobContext = HadoopUtils.instantiateJobContext(this.configuration, new JobID()); + taskContext = HadoopUtils.instantiateTaskAttemptContext(this.configuration, taskAttemptID); + } catch (Exception e) { + throw new RuntimeException(e); + } + this.fileOutputCommitter = new FileOutputCommitter(new Path(this.configuration.get("mapred.output.dir")), taskContext); + + // finalize HDFS output format + this.fileOutputCommitter.commitJob(jobContext); + } + + // -------------------------------------------------------------------------------------------- + // Custom serialization methods + // -------------------------------------------------------------------------------------------- + + private void writeObject(ObjectOutputStream out) throws IOException { + out.writeUTF(this.mapreduceOutputFormat.getClass().getName()); + this.configuration.write(out); + } + + @SuppressWarnings("unchecked") + private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { + String hadoopOutputFormatClassName = in.readUTF(); + + org.apache.hadoop.conf.Configuration configuration = new org.apache.hadoop.conf.Configuration(); + configuration.readFields(in); + + if(this.configuration == null) { + this.configuration = configuration; + } + + try { + this.mapreduceOutputFormat = (org.apache.hadoop.mapreduce.OutputFormat<K,V>) Class.forName(hadoopOutputFormatClassName, true, Thread.currentThread().getContextClassLoader()).newInstance(); + } catch (Exception e) { + throw new RuntimeException("Unable to instantiate the hadoop output format", e); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/8b3805ba/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/utils/HadoopUtils.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/utils/HadoopUtils.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/utils/HadoopUtils.java new file mode 100644 index 0000000..fe8f8cc --- /dev/null +++ b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/utils/HadoopUtils.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.hadoop.mapreduce.utils; + +import java.lang.reflect.Constructor; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.JobID; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.TaskAttemptID; + +public class HadoopUtils { + + /** + * Merges the Hadoop configuration from the environment into the given configuration. This is necessary to pick up the HDFS settings. + */ + public static void mergeHadoopConf(Configuration configuration) { + Configuration hadoopConf = org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils.getHadoopConfiguration(); + + for (Map.Entry<String, String> e : hadoopConf) { + configuration.set(e.getKey(), e.getValue()); + } + } + + public static JobContext instantiateJobContext(Configuration configuration, JobID jobId) throws Exception { + try { + Class<?> clazz; + // for Hadoop 2.x: JobContext is an interface, so instantiate the separate impl class + if(JobContext.class.isInterface()) { + clazz = Class.forName("org.apache.hadoop.mapreduce.task.JobContextImpl", true, Thread.currentThread().getContextClassLoader()); + } + // for Hadoop 1.x: JobContext is a concrete class + else { + clazz = Class.forName("org.apache.hadoop.mapreduce.JobContext", true, Thread.currentThread().getContextClassLoader()); + } + Constructor<?> constructor = clazz.getConstructor(Configuration.class, JobID.class); + JobContext context = (JobContext) constructor.newInstance(configuration, jobId); + + return context; + } catch(Exception e) { + throw new Exception("Could not create instance of JobContext.", e); + } + } + + public static TaskAttemptContext instantiateTaskAttemptContext(Configuration configuration, TaskAttemptID taskAttemptID) throws Exception { + try { + Class<?> clazz; + // for Hadoop 2.x: TaskAttemptContext is an interface, so instantiate the separate impl class + if(JobContext.class.isInterface()) { + clazz = Class.forName("org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl", true, Thread.currentThread().getContextClassLoader()); + } + // for Hadoop 1.x: TaskAttemptContext is a concrete class + else { + clazz = Class.forName("org.apache.hadoop.mapreduce.TaskAttemptContext", true, Thread.currentThread().getContextClassLoader()); + } + Constructor<?> constructor = clazz.getConstructor(Configuration.class, TaskAttemptID.class); + TaskAttemptContext context = (TaskAttemptContext) constructor.newInstance(configuration, taskAttemptID); + + return context; + } catch(Exception e) { + throw new Exception("Could not create instance of TaskAttemptContext.", e); + } + } +}
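A remark on the reflective instantiation in HadoopUtils: Hadoop 1.x ships org.apache.hadoop.mapreduce.JobContext and TaskAttemptContext as concrete classes, while Hadoop 2.x turns both into interfaces whose implementations live in org.apache.hadoop.mapreduce.task. Probing JobContext.class.isInterface() therefore selects the instantiable class name at runtime, so the same flink-java jar can run against either Hadoop generation without a compile-time dependency on version-specific classes. A minimal, self-contained sketch of the probe (illustrative only, not part of the commit):

import org.apache.hadoop.mapreduce.JobContext;

public class HadoopVersionProbe {
	public static void main(String[] args) throws ClassNotFoundException {
		// In Hadoop 2.x JobContext is an interface, so the separate
		// JobContextImpl class must be instantiated; in 1.x JobContext
		// itself is the concrete class.
		String name = JobContext.class.isInterface()
				? "org.apache.hadoop.mapreduce.task.JobContextImpl" // Hadoop 2.x
				: "org.apache.hadoop.mapreduce.JobContext";         // Hadoop 1.x
		Class<?> clazz = Class.forName(name, true, Thread.currentThread().getContextClassLoader());
		System.out.println("Instantiable JobContext class: " + clazz.getName());
	}
}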

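For reference, a minimal end-to-end sketch of the new mapreduce-API wrappers added above. The input/output paths, the choice of Hadoop's TextInputFormat/TextOutputFormat, and Job.getInstance() (Hadoop 2.x; on 1.x one would construct the Job directly) are illustrative assumptions; the wrapper constructors are the ones shown in this diff, and createInput/output are Flink's standard entry points for custom formats.

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat;
import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class HadoopFormatSketch {

	public static void main(String[] args) throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		// Read (offset, line) pairs through the wrapped Hadoop TextInputFormat;
		// the Job object carries the Hadoop configuration into the wrapper.
		Job inJob = Job.getInstance();
		FileInputFormat.addInputPath(inJob, new Path("hdfs:///tmp/in")); // illustrative path
		DataSet<Tuple2<LongWritable, Text>> lines = env.createInput(
				new HadoopInputFormat<LongWritable, Text>(
						new TextInputFormat(), LongWritable.class, Text.class, inJob));

		// Write the Tuple2 records back out through the wrapped Hadoop TextOutputFormat.
		Job outJob = Job.getInstance();
		FileOutputFormat.setOutputPath(outJob, new Path("hdfs:///tmp/out")); // illustrative path
		lines.output(new HadoopOutputFormat<LongWritable, Text>(
				new TextOutputFormat<LongWritable, Text>(), outJob));

		env.execute("hadoop format sketch");
	}
}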