http://git-wip-us.apache.org/repos/asf/flink/blob/ad267a4b/docs/apis/batch/fault_tolerance.md
----------------------------------------------------------------------
diff --git a/docs/apis/batch/fault_tolerance.md b/docs/apis/batch/fault_tolerance.md
new file mode 100644
index 0000000..51a6b41
--- /dev/null
+++ b/docs/apis/batch/fault_tolerance.md
@@ -0,0 +1,100 @@
+---
+title: "Fault Tolerance"
+
+# Sub-level navigation
+sub-nav-group: batch
+sub-nav-pos: 2
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+Flink's fault tolerance mechanism recovers programs in the presence of failures and
+continues to execute them. Such failures include machine hardware failures, network failures,
+transient program failures, etc.
+
+* This will be replaced by the TOC
+{:toc}
+
+Batch Processing Fault Tolerance (DataSet API)
+----------------------------------------------
+
+Fault tolerance for programs in the *DataSet API* works by retrying failed executions.
+The number of times that Flink retries the execution before the job is declared as failed is configurable
+via the *execution retries* parameter. A value of *0* effectively means that fault tolerance is deactivated.
+
+To activate fault tolerance, set the *execution retries* to a value larger than zero. A common choice is a value
+of three.
+
+This example shows how to configure the execution retries for a Flink DataSet program.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+env.setNumberOfExecutionRetries(3);
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env = ExecutionEnvironment.getExecutionEnvironment()
+env.setNumberOfExecutionRetries(3)
+{% endhighlight %}
+</div>
+</div>
+
+You can also define default values for the number of execution retries and the retry delay in the `flink-conf.yaml`:
+
+~~~
+execution-retries.default: 3
+~~~
+
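+As a minimal, self-contained sketch of what retrying means in practice (the flaky map
+function below is invented purely for illustration), a transient failure in one execution
+attempt causes the job to be re-executed, up to three times in this configuration:
+
+{% highlight java %}
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+env.setNumberOfExecutionRetries(3);
+
+env.fromElements(1, 2, 3, 4, 5)
+    .map(new MapFunction<Integer, Integer>() {
+        @Override
+        public Integer map(Integer value) throws Exception {
+            // Simulated transient failure: a retried execution may succeed.
+            if (Math.random() < 0.01) {
+                throw new RuntimeException("transient failure");
+            }
+            return value * 2;
+        }
+    })
+    .print();
+{% endhighlight %}
+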
+
+Retry Delays
+------------
+
+Execution retries can be configured to be delayed. Delaying the retry means that after a failed execution, the re-execution does not start
+immediately, but only after a certain delay.
+
+Delaying the retries can be helpful when the program interacts with external systems where, for example, connections or pending transactions should reach a timeout before re-execution is attempted.
+
+You can set the retry delay for each program as follows (the sample shows the DataStream API - the DataSet API works similarly):
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+env.getConfig().setExecutionRetryDelay(5000); // 5000 milliseconds delay
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env = StreamExecutionEnvironment.getExecutionEnvironment()
+env.getConfig.setExecutionRetryDelay(5000) // 5000 milliseconds delay
+{% endhighlight %}
+</div>
+</div>
+
+You can also define the default value for the retry delay in the `flink-conf.yaml`:
+
+~~~
+execution-retries.delay: 10 s
+~~~
+
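+Since the retry delay lives in the execution configuration, the DataSet API can be
+configured the same way. A minimal sketch, assuming the same `setExecutionRetryDelay`
+setter is available on the configuration returned by `getConfig()`:
+
+{% highlight java %}
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+env.setNumberOfExecutionRetries(3);
+env.getConfig().setExecutionRetryDelay(5000); // wait 5 seconds between execution attempts
+{% endhighlight %}
+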
+{% top %}

http://git-wip-us.apache.org/repos/asf/flink/blob/ad267a4b/docs/apis/batch/fig/LICENSE.txt
----------------------------------------------------------------------
diff --git a/docs/apis/batch/fig/LICENSE.txt b/docs/apis/batch/fig/LICENSE.txt
new file mode 100644
index 0000000..35b8673
--- /dev/null
+++ b/docs/apis/batch/fig/LICENSE.txt
@@ -0,0 +1,17 @@
+All image files in the folder and its subfolders are
+licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/ad267a4b/docs/apis/batch/fig/iterations_delta_iterate_operator.png
----------------------------------------------------------------------
diff --git a/docs/apis/batch/fig/iterations_delta_iterate_operator.png b/docs/apis/batch/fig/iterations_delta_iterate_operator.png
new file mode 100644
index 0000000..470485a
Binary files /dev/null and b/docs/apis/batch/fig/iterations_delta_iterate_operator.png differ

http://git-wip-us.apache.org/repos/asf/flink/blob/ad267a4b/docs/apis/batch/fig/iterations_delta_iterate_operator_example.png
----------------------------------------------------------------------
diff --git a/docs/apis/batch/fig/iterations_delta_iterate_operator_example.png b/docs/apis/batch/fig/iterations_delta_iterate_operator_example.png
new file mode 100644
index 0000000..15f2b54
Binary files /dev/null and b/docs/apis/batch/fig/iterations_delta_iterate_operator_example.png differ

http://git-wip-us.apache.org/repos/asf/flink/blob/ad267a4b/docs/apis/batch/fig/iterations_iterate_operator.png
----------------------------------------------------------------------
diff --git a/docs/apis/batch/fig/iterations_iterate_operator.png b/docs/apis/batch/fig/iterations_iterate_operator.png
new file mode 100644
index 0000000..aaf4158
Binary files /dev/null and b/docs/apis/batch/fig/iterations_iterate_operator.png differ

http://git-wip-us.apache.org/repos/asf/flink/blob/ad267a4b/docs/apis/batch/fig/iterations_iterate_operator_example.png
----------------------------------------------------------------------
diff --git a/docs/apis/batch/fig/iterations_iterate_operator_example.png b/docs/apis/batch/fig/iterations_iterate_operator_example.png
new file mode 100644
index 0000000..be4841c
Binary files /dev/null and b/docs/apis/batch/fig/iterations_iterate_operator_example.png differ

http://git-wip-us.apache.org/repos/asf/flink/blob/ad267a4b/docs/apis/batch/fig/iterations_supersteps.png
----------------------------------------------------------------------
diff --git a/docs/apis/batch/fig/iterations_supersteps.png b/docs/apis/batch/fig/iterations_supersteps.png
new file mode 100644
index 0000000..331dbc7
Binary files /dev/null and b/docs/apis/batch/fig/iterations_supersteps.png differ

http://git-wip-us.apache.org/repos/asf/flink/blob/ad267a4b/docs/apis/batch/fig/plan_visualizer.png
----------------------------------------------------------------------
diff --git a/docs/apis/batch/fig/plan_visualizer.png b/docs/apis/batch/fig/plan_visualizer.png
new file mode 100644
index 0000000..85b8c55
Binary files /dev/null and b/docs/apis/batch/fig/plan_visualizer.png differ

http://git-wip-us.apache.org/repos/asf/flink/blob/ad267a4b/docs/apis/batch/hadoop_compatibility.md
----------------------------------------------------------------------
diff --git a/docs/apis/batch/hadoop_compatibility.md b/docs/apis/batch/hadoop_compatibility.md
new file mode 100644
index 0000000..187aa6b
--- /dev/null
+++ b/docs/apis/batch/hadoop_compatibility.md
@@ -0,0 +1,249 @@
+---
+title: "Hadoop Compatibility"
+is_beta: true
+# Sub-level navigation
+sub-nav-group: batch
+sub-nav-pos: 7
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+Flink is compatible with Apache Hadoop MapReduce interfaces and therefore allows
+reusing code that was implemented for Hadoop MapReduce.
+
+You can:
+
+- use Hadoop's `Writable` [data types](programming_guide.html#data-types) in Flink programs.
+- use any Hadoop `InputFormat` as a [DataSource](programming_guide.html#data-sources).
+- use any Hadoop `OutputFormat` as a [DataSink](programming_guide.html#data-sinks).
+- use a Hadoop `Mapper` as a [FlatMapFunction](dataset_transformations.html#flatmap).
+- use a Hadoop `Reducer` as a [GroupReduceFunction](dataset_transformations.html#groupreduce-on-grouped-dataset).
+
+This document shows how to use existing Hadoop MapReduce code with Flink. Please refer to the
+[Connecting to other systems]({{ site.baseurl }}/apis/connectors.html) guide for reading from Hadoop supported file systems.
+
+* This will be replaced by the TOC
+{:toc}
+
+### Project Configuration
+
+Support for Hadoop input/output formats is part of the `flink-java` and
+`flink-scala` Maven modules that are always required when writing Flink jobs.
+The code is located in `org.apache.flink.api.java.hadoop` and
+`org.apache.flink.api.scala.hadoop` in an additional sub-package for the
+`mapred` and `mapreduce` APIs.
+
+Support for Hadoop Mappers and Reducers is contained in the `flink-hadoop-compatibility`
+Maven module.
+This code resides in the `org.apache.flink.hadoopcompatibility`
+package.
+
+Add the following dependency to your `pom.xml` if you want to reuse Mappers
+and Reducers.
+
+~~~xml
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-hadoop-compatibility</artifactId>
+  <version>{{site.version}}</version>
+</dependency>
+~~~
+
+### Using Hadoop Data Types
+
+Flink supports all Hadoop `Writable` and `WritableComparable` data types
+out-of-the-box. You do not need to include the Hadoop Compatibility dependency
+if you only want to use your Hadoop data types. See the
+[Programming Guide](programming_guide.html#data-types) for more details.
+
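+As a brief, hypothetical sketch (the words and counts below are invented for illustration),
+Hadoop `Writable` types can be used directly inside Flink tuples and transformations:
+
+~~~java
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+// Hadoop Writables are handled like any other Flink data type.
+DataSet<Tuple2<Text, IntWritable>> input = env.fromElements(
+    new Tuple2<Text, IntWritable>(new Text("flink"), new IntWritable(3)),
+    new Tuple2<Text, IntWritable>(new Text("hadoop"), new IntWritable(1)));
+
+DataSet<Tuple2<Text, IntWritable>> doubled = input.map(
+    new MapFunction<Tuple2<Text, IntWritable>, Tuple2<Text, IntWritable>>() {
+        @Override
+        public Tuple2<Text, IntWritable> map(Tuple2<Text, IntWritable> t) {
+            // Writables are mutable; creating new instances keeps the example simple.
+            return new Tuple2<Text, IntWritable>(t.f0, new IntWritable(t.f1.get() * 2));
+        }
+    });
+~~~
+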
+### Using Hadoop InputFormats
+
+Hadoop input formats can be used to create a data source by using
+one of the methods `readHadoopFile` or `createHadoopInput` of the
+`ExecutionEnvironment`. The former is used for input formats derived
+from `FileInputFormat` while the latter has to be used for general purpose
+input formats (a sketch of the latter follows the example below).
+
+The resulting `DataSet` contains 2-tuples where the first field
+is the key and the second field is the value retrieved from the Hadoop
+InputFormat.
+
+The following example shows how to use Hadoop's `TextInputFormat`.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+
+~~~java
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+DataSet<Tuple2<LongWritable, Text>> input =
+    env.readHadoopFile(new TextInputFormat(), LongWritable.class, Text.class, textPath);
+
+// Do something with the data.
+[...]
+~~~
+
+</div>
+<div data-lang="scala" markdown="1">
+
+~~~scala
+val env = ExecutionEnvironment.getExecutionEnvironment
+
+val input: DataSet[(LongWritable, Text)] =
+  env.readHadoopFile(new TextInputFormat, classOf[LongWritable], classOf[Text], textPath)
+
+// Do something with the data.
+[...]
+~~~
+
+</div>
+
+</div>
+
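+For a general purpose (non-file) input format, `createHadoopInput` is used instead. A
+hypothetical sketch, where `MyGeneralPurposeInputFormat` is a placeholder for any
+`org.apache.hadoop.mapred.InputFormat` implementation, e.g. one reading from a database:
+
+~~~java
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+// Format-specific settings go into the JobConf.
+JobConf jobConf = new JobConf();
+
+DataSet<Tuple2<LongWritable, Text>> input = env.createHadoopInput(
+    new MyGeneralPurposeInputFormat(), LongWritable.class, Text.class, jobConf);
+~~~
+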
+### Using Hadoop OutputFormats
+
+Flink provides a compatibility wrapper for Hadoop `OutputFormats`. Any class
+that implements `org.apache.hadoop.mapred.OutputFormat` or extends
+`org.apache.hadoop.mapreduce.OutputFormat` is supported.
+The OutputFormat wrapper expects its input data to be a DataSet containing
+2-tuples of key and value. These are to be processed by the Hadoop OutputFormat.
+
+The following example shows how to use Hadoop's `TextOutputFormat`.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+
+~~~java
+// Obtain the result we want to emit.
+DataSet<Tuple2<Text, IntWritable>> hadoopResult = [...]
+
+// Set up the Hadoop TextOutputFormat.
+Job job = Job.getInstance();
+HadoopOutputFormat<Text, IntWritable> hadoopOF =
+  // create the Flink wrapper.
+  new HadoopOutputFormat<Text, IntWritable>(
+    // set the Hadoop OutputFormat and specify the job.
+    new TextOutputFormat<Text, IntWritable>(), job
+  );
+hadoopOF.getConfiguration().set("mapreduce.output.textoutputformat.separator", " ");
+TextOutputFormat.setOutputPath(job, new Path(outputPath));
+
+// Emit data using the Hadoop TextOutputFormat.
+hadoopResult.output(hadoopOF);
+~~~
+
+</div>
+<div data-lang="scala" markdown="1">
+
+~~~scala
+// Obtain the result we want to emit.
+val hadoopResult: DataSet[(Text, IntWritable)] = [...]
+
+val hadoopOF = new HadoopOutputFormat[Text, IntWritable](
+  new TextOutputFormat[Text, IntWritable],
+  new JobConf)
+
+hadoopOF.getJobConf.set("mapred.textoutputformat.separator", " ")
+FileOutputFormat.setOutputPath(hadoopOF.getJobConf, new Path(resultPath))
+
+hadoopResult.output(hadoopOF)
+~~~
+
+</div>
+
+</div>
+
+### Using Hadoop Mappers and Reducers
+
+Hadoop Mappers are semantically equivalent to Flink's [FlatMapFunctions](dataset_transformations.html#flatmap) and Hadoop Reducers are equivalent to Flink's [GroupReduceFunctions](dataset_transformations.html#groupreduce-on-grouped-dataset). Flink provides wrappers for implementations of Hadoop MapReduce's `Mapper` and `Reducer` interfaces, i.e., you can reuse your Hadoop Mappers and Reducers in regular Flink programs. At the moment, only the `Mapper` and `Reducer` interfaces of Hadoop's mapred API (`org.apache.hadoop.mapred`) are supported.
+
+The wrappers take a `DataSet<Tuple2<KEYIN,VALUEIN>>` as input and produce a `DataSet<Tuple2<KEYOUT,VALUEOUT>>` as output, where `KEYIN` and `KEYOUT` are the keys and `VALUEIN` and `VALUEOUT` are the values of the Hadoop key-value pairs that are processed by the Hadoop functions. For Reducers, Flink offers a wrapper for a GroupReduceFunction with a Combiner (`HadoopReduceCombineFunction`) and without a Combiner (`HadoopReduceFunction`). The wrappers accept an optional `JobConf` object to configure the Hadoop Mapper or Reducer.
+
+Flink's function wrappers are
+
+- `org.apache.flink.hadoopcompatibility.mapred.HadoopMapFunction`,
+- `org.apache.flink.hadoopcompatibility.mapred.HadoopReduceFunction`, and
+- `org.apache.flink.hadoopcompatibility.mapred.HadoopReduceCombineFunction`,
+
+and can be used as regular Flink [FlatMapFunctions](dataset_transformations.html#flatmap) or [GroupReduceFunctions](dataset_transformations.html#groupreduce-on-grouped-dataset).
+
+The following example shows how to use Hadoop `Mapper` and `Reducer` functions.
+
+~~~java
+// Obtain the data to process somehow.
+DataSet<Tuple2<Text, LongWritable>> text = [...]
+
+DataSet<Tuple2<Text, LongWritable>> result = text
+  // use Hadoop Mapper (Tokenizer) as MapFunction
+  .flatMap(new HadoopMapFunction<LongWritable, Text, Text, LongWritable>(
+    new Tokenizer()
+  ))
+  .groupBy(0)
+  // use Hadoop Reducer (Counter) as Reduce- and CombineFunction
+  .reduceGroup(new HadoopReduceCombineFunction<Text, LongWritable, Text, LongWritable>(
+    new Counter(), new Counter()
+  ));
+~~~
+
+**Please note:** The Reducer wrapper works on groups as defined by Flink's [groupBy()](dataset_transformations.html#transformations-on-grouped-dataset) operation. It does not consider any custom partitioners, sort or grouping comparators you might have set in the `JobConf`.
+
+### Complete Hadoop WordCount Example
+
+The following example shows a complete WordCount implementation using Hadoop data types, Input- and OutputFormats, and Mapper and Reducer implementations.
+
+~~~java
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+// Set up the Hadoop TextInputFormat.
+Job job = Job.getInstance();
+HadoopInputFormat<LongWritable, Text> hadoopIF =
+  new HadoopInputFormat<LongWritable, Text>(
+    new TextInputFormat(), LongWritable.class, Text.class, job
+  );
+TextInputFormat.addInputPath(job, new Path(inputPath));
+
+// Read data using the Hadoop TextInputFormat.
+DataSet<Tuple2<LongWritable, Text>> text = env.createInput(hadoopIF);
+
+DataSet<Tuple2<Text, LongWritable>> result = text
+  // use Hadoop Mapper (Tokenizer) as MapFunction
+  .flatMap(new HadoopMapFunction<LongWritable, Text, Text, LongWritable>(
+    new Tokenizer()
+  ))
+  .groupBy(0)
+  // use Hadoop Reducer (Counter) as Reduce- and CombineFunction
+  .reduceGroup(new HadoopReduceCombineFunction<Text, LongWritable, Text, LongWritable>(
+    new Counter(), new Counter()
+  ));
+
+// Set up the Hadoop TextOutputFormat (the key/value types must match the result DataSet).
+HadoopOutputFormat<Text, LongWritable> hadoopOF =
+  new HadoopOutputFormat<Text, LongWritable>(
+    new TextOutputFormat<Text, LongWritable>(), job
+  );
+hadoopOF.getConfiguration().set("mapreduce.output.textoutputformat.separator", " ");
+TextOutputFormat.setOutputPath(job, new Path(outputPath));
+
+// Emit data using the Hadoop TextOutputFormat.
+result.output(hadoopOF);
+
+// Execute the program.
+env.execute("Hadoop WordCount");
+~~~
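+
+For reference, `Tokenizer` and `Counter` above are ordinary Hadoop `mapred` implementations.
+A hypothetical sketch of what they might look like (these classes are assumptions for
+illustration, not part of the original example):
+
+~~~java
+// A Hadoop Mapper that splits lines into (word, 1) pairs.
+public static final class Tokenizer extends MapReduceBase
+    implements Mapper<LongWritable, Text, Text, LongWritable> {
+
+  @Override
+  public void map(LongWritable key, Text value,
+      OutputCollector<Text, LongWritable> output, Reporter reporter) throws IOException {
+    for (String word : value.toString().toLowerCase().split("\\W+")) {
+      if (!word.isEmpty()) {
+        output.collect(new Text(word), new LongWritable(1L));
+      }
+    }
+  }
+}
+
+// A Hadoop Reducer that sums the counts per word; also usable as a Combiner.
+public static final class Counter extends MapReduceBase
+    implements Reducer<Text, LongWritable, Text, LongWritable> {
+
+  @Override
+  public void reduce(Text key, Iterator<LongWritable> values,
+      OutputCollector<Text, LongWritable> output, Reporter reporter) throws IOException {
+    long sum = 0;
+    while (values.hasNext()) {
+      sum += values.next().get();
+    }
+    output.collect(key, new LongWritable(sum));
+  }
+}
+~~~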