http://git-wip-us.apache.org/repos/asf/flink/blob/ad267a4b/docs/apis/batch/fault_tolerance.md
----------------------------------------------------------------------
diff --git a/docs/apis/batch/fault_tolerance.md b/docs/apis/batch/fault_tolerance.md
new file mode 100644
index 0000000..51a6b41
--- /dev/null
+++ b/docs/apis/batch/fault_tolerance.md
@@ -0,0 +1,100 @@
+---
+title: "Fault Tolerance"
+
+# Sub-level navigation
+sub-nav-group: batch
+sub-nav-pos: 2
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+Flink's fault tolerance mechanism recovers programs in the presence of failures and
+continues to execute them. Such failures include machine hardware failures, network failures,
+transient program failures, etc.
+
+* This will be replaced by the TOC
+{:toc}
+
+Batch Processing Fault Tolerance (DataSet API)
+----------------------------------------------
+
+Fault tolerance for programs in the *DataSet API* works by retrying failed executions.
+The number of times that Flink retries the execution before the job is declared failed is configurable
+via the *execution retries* parameter. A value of *0* effectively means that fault tolerance is deactivated.
+
+To activate fault tolerance, set the *execution retries* parameter to a value larger than zero. A common choice is a value
+of three.
+
+This example shows how to configure the execution retries for a Flink DataSet program.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+env.setNumberOfExecutionRetries(3);
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env = ExecutionEnvironment.getExecutionEnvironment()
+env.setNumberOfExecutionRetries(3)
+{% endhighlight %}
+</div>
+</div>
+
+
+You can also define default values for the number of execution retries and the retry delay in the `flink-conf.yaml`:
+
+~~~
+execution-retries.default: 3
+~~~
+
+
+Retry Delays
+------------
+
+Execution retries can be configured to be delayed. Delaying the retry means that after a failed execution, the re-execution does not start
+immediately, but only after a certain delay.
+
+Delaying the retries can be helpful when the program interacts with external systems where, for example, connections or pending transactions should reach a timeout before re-execution is attempted.
+
+You can set the retry delay for each program as follows (the sample shows the DataStream API; the DataSet API works similarly, as shown in the sketch after this example):
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+env.getConfig().setExecutionRetryDelay(5000); // 5000 milliseconds delay
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env = StreamExecutionEnvironment.getExecutionEnvironment()
+env.getConfig.setExecutionRetryDelay(5000) // 5000 milliseconds delay
+{% endhighlight %}
+</div>
+</div>
+
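+For a DataSet program, a minimal sketch looks the same, just with the batch `ExecutionEnvironment`
+(it simply applies the `setExecutionRetryDelay` setter shown above to the batch environment's `ExecutionConfig`):
+
+{% highlight java %}
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+env.setNumberOfExecutionRetries(3);
+env.getConfig().setExecutionRetryDelay(5000); // 5000 milliseconds delay
+{% endhighlight %}
+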
+You can also define the default value for the retry delay in the `flink-conf.yaml`:
+
+~~~
+execution-retries.delay: 10 s
+~~~
+
+{% top %}

http://git-wip-us.apache.org/repos/asf/flink/blob/ad267a4b/docs/apis/batch/fig/LICENSE.txt
----------------------------------------------------------------------
diff --git a/docs/apis/batch/fig/LICENSE.txt b/docs/apis/batch/fig/LICENSE.txt
new file mode 100644
index 0000000..35b8673
--- /dev/null
+++ b/docs/apis/batch/fig/LICENSE.txt
@@ -0,0 +1,17 @@
+All image files in the folder and its subfolders are
+licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/ad267a4b/docs/apis/batch/fig/iterations_delta_iterate_operator.png
----------------------------------------------------------------------
diff --git a/docs/apis/batch/fig/iterations_delta_iterate_operator.png b/docs/apis/batch/fig/iterations_delta_iterate_operator.png
new file mode 100644
index 0000000..470485a
Binary files /dev/null and b/docs/apis/batch/fig/iterations_delta_iterate_operator.png differ

http://git-wip-us.apache.org/repos/asf/flink/blob/ad267a4b/docs/apis/batch/fig/iterations_delta_iterate_operator_example.png
----------------------------------------------------------------------
diff --git a/docs/apis/batch/fig/iterations_delta_iterate_operator_example.png b/docs/apis/batch/fig/iterations_delta_iterate_operator_example.png
new file mode 100644
index 0000000..15f2b54
Binary files /dev/null and b/docs/apis/batch/fig/iterations_delta_iterate_operator_example.png differ

http://git-wip-us.apache.org/repos/asf/flink/blob/ad267a4b/docs/apis/batch/fig/iterations_iterate_operator.png
----------------------------------------------------------------------
diff --git a/docs/apis/batch/fig/iterations_iterate_operator.png b/docs/apis/batch/fig/iterations_iterate_operator.png
new file mode 100644
index 0000000..aaf4158
Binary files /dev/null and b/docs/apis/batch/fig/iterations_iterate_operator.png differ

http://git-wip-us.apache.org/repos/asf/flink/blob/ad267a4b/docs/apis/batch/fig/iterations_iterate_operator_example.png
----------------------------------------------------------------------
diff --git a/docs/apis/batch/fig/iterations_iterate_operator_example.png b/docs/apis/batch/fig/iterations_iterate_operator_example.png
new file mode 100644
index 0000000..be4841c
Binary files /dev/null and b/docs/apis/batch/fig/iterations_iterate_operator_example.png differ

http://git-wip-us.apache.org/repos/asf/flink/blob/ad267a4b/docs/apis/batch/fig/iterations_supersteps.png
----------------------------------------------------------------------
diff --git a/docs/apis/batch/fig/iterations_supersteps.png b/docs/apis/batch/fig/iterations_supersteps.png
new file mode 100644
index 0000000..331dbc7
Binary files /dev/null and b/docs/apis/batch/fig/iterations_supersteps.png differ

http://git-wip-us.apache.org/repos/asf/flink/blob/ad267a4b/docs/apis/batch/fig/plan_visualizer.png
----------------------------------------------------------------------
diff --git a/docs/apis/batch/fig/plan_visualizer.png b/docs/apis/batch/fig/plan_visualizer.png
new file mode 100644
index 0000000..85b8c55
Binary files /dev/null and b/docs/apis/batch/fig/plan_visualizer.png differ

http://git-wip-us.apache.org/repos/asf/flink/blob/ad267a4b/docs/apis/batch/hadoop_compatibility.md
----------------------------------------------------------------------
diff --git a/docs/apis/batch/hadoop_compatibility.md b/docs/apis/batch/hadoop_compatibility.md
new file mode 100644
index 0000000..187aa6b
--- /dev/null
+++ b/docs/apis/batch/hadoop_compatibility.md
@@ -0,0 +1,249 @@
+---
+title: "Hadoop Compatibility"
+is_beta: true
+# Sub-level navigation
+sub-nav-group: batch
+sub-nav-pos: 7
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+Flink is compatible with Apache Hadoop MapReduce interfaces and therefore allows
+reusing code that was implemented for Hadoop MapReduce.
+
+You can:
+
+- use Hadoop's `Writable` [data types](programming_guide.html#data-types) in Flink programs.
+- use any Hadoop `InputFormat` as a [DataSource](programming_guide.html#data-sources).
+- use any Hadoop `OutputFormat` as a [DataSink](programming_guide.html#data-sinks).
+- use a Hadoop `Mapper` as [FlatMapFunction](dataset_transformations.html#flatmap).
+- use a Hadoop `Reducer` as [GroupReduceFunction](dataset_transformations.html#groupreduce-on-grouped-dataset).
+
+This document shows how to use existing Hadoop MapReduce code with Flink. Please refer to the
+[Connecting to other systems]({{ site.baseurl }}/apis/connectors.html) guide for reading from Hadoop-supported file systems.
+
+* This will be replaced by the TOC
+{:toc}
+
+### Project Configuration
+
+Support for Hadoop input/output formats is part of the `flink-java` and
+`flink-scala` Maven modules that are always required when writing Flink jobs.
+The code is located in `org.apache.flink.api.java.hadoop` and
+`org.apache.flink.api.scala.hadoop` in additional sub-packages for the
+`mapred` and `mapreduce` APIs.
+
+Support for Hadoop Mappers and Reducers is contained in the `flink-hadoop-compatibility`
+Maven module.
+This code resides in the `org.apache.flink.hadoopcompatibility`
+package.
+
+Add the following dependency to your `pom.xml` if you want to reuse Mappers
+and Reducers.
+
+~~~xml
+<dependency>
+       <groupId>org.apache.flink</groupId>
+       <artifactId>flink-hadoop-compatibility</artifactId>
+       <version>{{site.version}}</version>
+</dependency>
+~~~
+
+### Using Hadoop Data Types
+
+Flink supports all Hadoop `Writable` and `WritableComparable` data types
+out-of-the-box. You do not need to include the Hadoop Compatibility dependency
+if you only want to use your Hadoop data types. See the
+[Programming Guide](programming_guide.html#data-types) for more details.
+
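+A minimal sketch (the element values below are made up purely for illustration) of using Hadoop
+types directly as fields of a Flink `DataSet`:
+
+~~~java
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+// Hadoop Writable types can be used directly as Flink tuple fields.
+DataSet<Tuple2<Text, LongWritable>> data = env.fromElements(
+    new Tuple2<Text, LongWritable>(new Text("flink"), new LongWritable(1L)),
+    new Tuple2<Text, LongWritable>(new Text("hadoop"), new LongWritable(2L)));
+
+// Grouping on the Text field works because Text is a WritableComparable.
+DataSet<Tuple2<Text, LongWritable>> firstPerKey = data.groupBy(0).first(1);
+~~~
+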
+### Using Hadoop InputFormats
+
+Hadoop input formats can be used to create a data source by using
+one of the methods `readHadoopFile` or `createHadoopInput` of the
+`ExecutionEnvironment`. The former is used for input formats derived
+from `FileInputFormat`, while the latter has to be used for general-purpose
+input formats (a sketch of `createHadoopInput` follows the example below).
+
+The resulting `DataSet` contains 2-tuples where the first field
+is the key and the second field is the value retrieved from the Hadoop
+InputFormat.
+
+The following example shows how to use Hadoop's `TextInputFormat`.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+
+~~~java
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+DataSet<Tuple2<LongWritable, Text>> input =
+    env.readHadoopFile(new TextInputFormat(), LongWritable.class, Text.class, textPath);
+
+// Do something with the data.
+[...]
+~~~
+
+</div>
+<div data-lang="scala" markdown="1">
+
+~~~scala
+val env = ExecutionEnvironment.getExecutionEnvironment
+
+val input: DataSet[(LongWritable, Text)] =
+  env.readHadoopFile(new TextInputFormat, classOf[LongWritable], classOf[Text], textPath)
+
+// Do something with the data.
+[...]
+~~~
+
+</div>
+
+</div>
+
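+For a general-purpose input format (one that is not derived from `FileInputFormat`), `createHadoopInput`
+is used instead. The following is only a sketch: `MyNonFileInputFormat` is a hypothetical implementation of
+Hadoop's `org.apache.hadoop.mapred.InputFormat<Text, LongWritable>`, not an actual Hadoop class, and the call
+assumes the same (input format, key class, value class) ordering as `readHadoopFile`, followed by a `JobConf`:
+
+~~~java
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+JobConf jobConf = new JobConf();
+// MyNonFileInputFormat stands in for your own InputFormat that does not
+// extend FileInputFormat, for example one reading from an external store.
+DataSet<Tuple2<Text, LongWritable>> input =
+    env.createHadoopInput(new MyNonFileInputFormat(), Text.class, LongWritable.class, jobConf);
+~~~
+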
+### Using Hadoop OutputFormats
+
+Flink provides a compatibility wrapper for Hadoop `OutputFormats`. Any class
+that implements `org.apache.hadoop.mapred.OutputFormat` or extends
+`org.apache.hadoop.mapreduce.OutputFormat` is supported.
+The OutputFormat wrapper expects its input data to be a DataSet containing
+2-tuples of key and value. These are to be processed by the Hadoop OutputFormat.
+
+The following example shows how to use Hadoop's `TextOutputFormat`.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+
+~~~java
+// Obtain the result we want to emit
+DataSet<Tuple2<Text, IntWritable>> hadoopResult = [...]
+
+// Set up the Hadoop TextOutputFormat.
+HadoopOutputFormat<Text, IntWritable> hadoopOF =
+  // create the Flink wrapper.
+  new HadoopOutputFormat<Text, IntWritable>(
+    // set the Hadoop OutputFormat and specify the job.
+    new TextOutputFormat<Text, IntWritable>(), job
+  );
+hadoopOF.getConfiguration().set("mapreduce.output.textoutputformat.separator", " ");
+TextOutputFormat.setOutputPath(job, new Path(outputPath));
+
+// Emit data using the Hadoop TextOutputFormat.
+hadoopResult.output(hadoopOF);
+~~~
+
+</div>
+<div data-lang="scala" markdown="1">
+
+~~~scala
+// Obtain your result to emit.
+val hadoopResult: DataSet[(Text, IntWritable)] = [...]
+
+val hadoopOF = new HadoopOutputFormat[Text,IntWritable](
+  new TextOutputFormat[Text, IntWritable],
+  new JobConf)
+
+hadoopOF.getJobConf.set("mapred.textoutputformat.separator", " ")
+FileOutputFormat.setOutputPath(hadoopOF.getJobConf, new Path(resultPath))
+
+hadoopResult.output(hadoopOF)
+
+
+~~~
+
+</div>
+
+</div>
+
+### Using Hadoop Mappers and Reducers
+
+Hadoop Mappers are semantically equivalent to Flink's [FlatMapFunctions](dataset_transformations.html#flatmap) and Hadoop Reducers are equivalent to Flink's [GroupReduceFunctions](dataset_transformations.html#groupreduce-on-grouped-dataset). Flink provides wrappers for implementations of Hadoop MapReduce's `Mapper` and `Reducer` interfaces, i.e., you can reuse your Hadoop Mappers and Reducers in regular Flink programs. At the moment, only the `Mapper` and `Reducer` interfaces of Hadoop's mapred API (`org.apache.hadoop.mapred`) are supported.
+
+The wrappers take a `DataSet<Tuple2<KEYIN,VALUEIN>>` as input and produce a `DataSet<Tuple2<KEYOUT,VALUEOUT>>` as output, where `KEYIN` and `KEYOUT` are the keys and `VALUEIN` and `VALUEOUT` are the values of the Hadoop key-value pairs that are processed by the Hadoop functions. For Reducers, Flink offers a GroupReduceFunction wrapper with a Combiner (`HadoopReduceCombineFunction`) and one without a Combiner (`HadoopReduceFunction`). The wrappers accept an optional `JobConf` object to configure the Hadoop Mapper or Reducer.
+
+Flink's function wrappers are:
+
+- `org.apache.flink.hadoopcompatibility.mapred.HadoopMapFunction`,
+- `org.apache.flink.hadoopcompatibility.mapred.HadoopReduceFunction`, and
+- `org.apache.flink.hadoopcompatibility.mapred.HadoopReduceCombineFunction`.
+
+They can be used as regular Flink [FlatMapFunctions](dataset_transformations.html#flatmap) or [GroupReduceFunctions](dataset_transformations.html#groupreduce-on-grouped-dataset).
+
+The following example shows how to use Hadoop `Mapper` and `Reducer` functions.
+
+~~~java
+// Obtain data to process somehow.
+DataSet<Tuple2<Text, LongWritable>> text = [...]
+
+DataSet<Tuple2<Text, LongWritable>> result = text
+  // use Hadoop Mapper (Tokenizer) as MapFunction
+  .flatMap(new HadoopMapFunction<LongWritable, Text, Text, LongWritable>(
+    new Tokenizer()
+  ))
+  .groupBy(0)
+  // use Hadoop Reducer (Counter) as Reduce- and CombineFunction
+  .reduceGroup(new HadoopReduceCombineFunction<Text, LongWritable, Text, LongWritable>(
+    new Counter(), new Counter()
+  ));
+~~~
+
+**Please note:** The Reducer wrapper works on groups as defined by Flink's [groupBy()](dataset_transformations.html#transformations-on-grouped-dataset) operation. It does not consider any custom partitioners, sort or grouping comparators you might have set in the `JobConf`.
+
+### Complete Hadoop WordCount Example
+
+The following example shows a complete WordCount implementation using Hadoop data types, Input- and OutputFormats, and Mapper and Reducer implementations.
+
+~~~java
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+// Set up the Hadoop TextInputFormat.
+Job job = Job.getInstance();
+HadoopInputFormat<LongWritable, Text> hadoopIF =
+  new HadoopInputFormat<LongWritable, Text>(
+    new TextInputFormat(), LongWritable.class, Text.class, job
+  );
+TextInputFormat.addInputPath(job, new Path(inputPath));
+
+// Read data using the Hadoop TextInputFormat.
+DataSet<Tuple2<LongWritable, Text>> text = env.createInput(hadoopIF);
+
+DataSet<Tuple2<Text, LongWritable>> result = text
+  // use Hadoop Mapper (Tokenizer) as MapFunction
+  .flatMap(new HadoopMapFunction<LongWritable, Text, Text, LongWritable>(
+    new Tokenizer()
+  ))
+  .groupBy(0)
+  // use Hadoop Reducer (Counter) as Reduce- and CombineFunction
+  .reduceGroup(new HadoopReduceCombineFunction<Text, LongWritable, Text, LongWritable>(
+    new Counter(), new Counter()
+  ));
+
+// Set up the Hadoop TextOutputFormat.
+HadoopOutputFormat<Text, LongWritable> hadoopOF =
+  new HadoopOutputFormat<Text, LongWritable>(
+    new TextOutputFormat<Text, LongWritable>(), job
+  );
+hadoopOF.getConfiguration().set("mapreduce.output.textoutputformat.separator", " ");
+TextOutputFormat.setOutputPath(job, new Path(outputPath));
+
+// Emit data using the Hadoop TextOutputFormat.
+result.output(hadoopOF);
+
+// Execute Program
+env.execute("Hadoop WordCount");
+~~~
