http://git-wip-us.apache.org/repos/asf/flink/blob/ad267a4b/docs/apis/batch/index.md
----------------------------------------------------------------------
diff --git a/docs/apis/batch/index.md b/docs/apis/batch/index.md
new file mode 100644
index 0000000..2c4f429
--- /dev/null
+++ b/docs/apis/batch/index.md
@@ -0,0 +1,3411 @@
+---
+title: "Flink DataSet API Programming Guide"
+
+# Top-level navigation
+top-nav-group: apis
+top-nav-pos: 2
+top-nav-title: <strong>Batch Guide</strong> (DataSet API)
+
+# Sub-level navigation
+sub-nav-group: batch
+sub-nav-group-title: Batch Guide
+sub-nav-id: dataset_api
+sub-nav-pos: 1
+sub-nav-title: DataSet API
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+DataSet programs in Flink are regular programs that implement transformations on data sets
+(e.g., filtering, mapping, joining, grouping). The data sets are initially created from certain
+sources (e.g., by reading files, or from local collections). Results are returned via sinks, which may for
+example write the data to (distributed) files, or to standard output (for example the command line
+terminal). Flink programs run in a variety of contexts: standalone, or embedded in other programs.
+The execution can happen in a local JVM, or on clusters of many machines.
+
+In order to create your own Flink DataSet program, we encourage you to start with the
+[program skeleton](#program-skeleton) and gradually add your own
+[transformations](#transformations). The remaining sections act as references for additional
+operations and advanced features.
+
+* This will be replaced by the TOC
+{:toc}
+
+Example Program
+---------------
+
+The following program is a complete, working example of WordCount. You can copy &amp; paste the code
+to run it locally. You only have to include the correct Flink library in your project
+(see Section [Linking with Flink](#linking-with-flink)) and add the necessary imports. Then you are ready
+to go!
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+
+{% highlight java %}
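+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.util.Collector;
+
+public class WordCountExample {
+    public static void main(String[] args) throws Exception {
+        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+        DataSet<String> text = env.fromElements(
+            "Who's there?",
+            "I think I hear them. Stand, ho! Who's there?");
+
+        DataSet<Tuple2<String, Integer>> wordCounts = text
+            .flatMap(new LineSplitter())
+            .groupBy(0)
+            .sum(1);
+
+        // print() triggers the execution, so no explicit env.execute() call is needed.
+        wordCounts.print();
+    }
+
+    // Splits each line into words and emits a (word, 1) pair per word.
+    public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
+        @Override
+        public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
+            for (String word : line.split(" ")) {
+                out.collect(new Tuple2<String, Integer>(word, 1));
+            }
+        }
+    }
+}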
+{% endhighlight %}
+
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+import org.apache.flink.api.scala._
+
+object WordCount {
+  def main(args: Array[String]) {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val text = env.fromElements(
+      "Who's there?",
+      "I think I hear them. Stand, ho! Who's there?")
+
+    val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
+      .map { (_, 1) }
+      .groupBy(0)
+      .sum(1)
+
+    counts.print()
+  }
+}
+{% endhighlight %}
+</div>
+
+</div>
+
+{% top %}
+
+
+Linking with Flink
+------------------
+
+To write programs with Flink, you need to include the Flink library corresponding to
+your programming language in your project.
+
+The simplest way to do this is to use one of the quickstart scripts: either for
+[Java]({{ site.baseurl }}/quickstart/java_api_quickstart.html) or for [Scala]({{ site.baseurl }}/quickstart/scala_api_quickstart.html). They
+create a blank project from a template (a Maven Archetype), which sets up everything for you. To
+manually create the project, you can use the archetype and create a project by calling:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight bash %}
+mvn archetype:generate \
+    -DarchetypeGroupId=org.apache.flink \
+    -DarchetypeArtifactId=flink-quickstart-java \
+    -DarchetypeVersion={{site.version }}
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight bash %}
+mvn archetype:generate \
+    -DarchetypeGroupId=org.apache.flink \
+    -DarchetypeArtifactId=flink-quickstart-scala \
+    -DarchetypeVersion={{site.version }}
+{% endhighlight %}
+</div>
+</div>
+
+The archetypes work for stable releases and preview versions (`-SNAPSHOT`).
+
+If you want to add Flink to an existing Maven project, add the following entries to the
+*dependencies* section in the *pom.xml* file of your project:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight xml %}
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-java</artifactId>
+  <version>{{site.version }}</version>
+</dependency>
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-clients</artifactId>
+  <version>{{site.version }}</version>
+</dependency>
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight xml %}
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-scala</artifactId>
+  <version>{{site.version }}</version>
+</dependency>
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-clients</artifactId>
+  <version>{{site.version }}</version>
+</dependency>
+{% endhighlight %}
+
+**Important:** When working with the Scala API you must have one of these two imports:
+{% highlight scala %}
+import org.apache.flink.api.scala._
+{% endhighlight %}
+
+or
+
+{% highlight scala %}
+import org.apache.flink.api.scala.createTypeInformation
+{% endhighlight %}
+
+The reason is that Flink analyzes the types that are used in a program and generates serializers
+and comparators for them. By having either of those imports you enable an implicit conversion
+that creates the type information for Flink operations.
+</div>
+</div>
+
+#### Scala Dependency Versions
+
+Because Scala 2.10 and Scala 2.11 are not binary compatible, we provide multiple artifacts
+to support both Scala versions.
+
+Starting from the 0.10 line, we cross-build all Flink modules for both 2.10 and 2.11. If you want
+to run your program on Flink with Scala 2.11, you need to add a `_2.11` suffix to the `artifactId`
+values of the Flink modules in your dependencies section.
+
+If you want to build Flink with Scala 2.11, please check the
+[build guide]({{ site.baseurl }}/setup/building.html#scala-versions).
+
+#### Hadoop Dependency Versions
+
+If you are using Flink together with Hadoop, the version of the dependency may vary depending on the
+version of Hadoop (or more specifically, HDFS) that you want to use Flink with. Please refer to the
+[downloads page](http://flink.apache.org/downloads.html) for a list of available versions and instructions
+on how to link with custom versions of Hadoop.
+
+In order to link against the latest SNAPSHOT versions of the code, please follow
+[this guide](http://flink.apache.org/how-to-contribute.html#snapshots-nightly-builds).
+
+The *flink-clients* dependency is only necessary to invoke the Flink program locally (for example to
+run it standalone for testing and debugging). If you intend to only export the program as a JAR
+file and [run it on a cluster]({{ site.baseurl }}/apis/cluster_execution.html), you can skip that dependency.
+
+{% top %}
+
+Program Skeleton
+----------------
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+
+As we already saw in the example, Flink DataSet programs look like regular Java
+programs with a `main()` method. Each program consists of the same basic parts:
+
+1. Obtain an `ExecutionEnvironment`,
+2. Load/create the initial data,
+3. Specify transformations on this data,
+4. Specify where to put the results of your computations,
+5. Trigger the program execution
+
+We will now give an overview of each of those steps; please refer to the respective sections for
+more details. Note that all core classes of the Java API are found in the package {% gh_link /flink-java/src/main/java/org/apache/flink/api/java "org.apache.flink.api.java" %}.
+
+The `ExecutionEnvironment` is the basis for all Flink DataSet programs. You can
+obtain one using these static methods on class `ExecutionEnvironment`:
+
+{% highlight java %}
+getExecutionEnvironment()
+
+createCollectionsEnvironment()
+
+createLocalEnvironment()
+createLocalEnvironment(int parallelism)
+createLocalEnvironment(Configuration customConfiguration)
+
+createRemoteEnvironment(String host, int port, String... jarFiles)
+createRemoteEnvironment(String host, int port, int parallelism, String... jarFiles)
+{% endhighlight %}
+
+Typically, you only need to use `getExecutionEnvironment()`, since this
+will do the right thing depending on the context: if you are executing
+your program inside an IDE or as a regular Java program it will create
+a local environment that will execute your program on your local machine. If
+you created a JAR file from your program, and invoke it through the [command line]({{ site.baseurl }}/apis/cli.html)
+or the [web interface]({{ site.baseurl }}/apis/web_client.html),
+the Flink cluster manager will execute your main method and `getExecutionEnvironment()` will return
+an execution environment for executing your program on a cluster.
+
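The difference between the context-dependent environment and an explicitly created local one can be sketched as follows. This is a minimal illustration under stated assumptions: the class `EnvironmentChoice` and its helper method are hypothetical names, and the parallelism of `2` is an arbitrary value chosen for the example.

```java
import org.apache.flink.api.java.ExecutionEnvironment;

// Hypothetical example class (not part of Flink).
class EnvironmentChoice {

    // Explicitly creates a local environment whose default parallelism is 2.
    static ExecutionEnvironment localWithParallelismTwo() {
        return ExecutionEnvironment.createLocalEnvironment(2);
    }

    public static void main(String[] args) {
        // Local when started from an IDE, cluster-backed when submitted via the CLI or web interface.
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        System.out.println("context environment: " + env.getClass().getSimpleName());

        ExecutionEnvironment local = localWithParallelismTwo();
        System.out.println("local parallelism: " + local.getParallelism());
    }
}
```

When `main()` is started from an IDE, the first line should name a local environment. The exact class name printed may differ between Flink versions.
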
+To specify data sources, the execution environment offers several methods
+to read from files: you can read them line by line,
+as CSV files, or using completely custom data input formats. To read
+a text file as a sequence of lines, you can use:
+
+{% highlight java %}
+final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+DataSet<String> text = env.readTextFile("file:///path/to/file");
+{% endhighlight %}
+
+This will give you a DataSet on which you can then apply transformations. For
+more information on data sources and input formats, please refer to
+[Data Sources](#data-sources).
+
+Once you have a DataSet you can apply transformations to create a new
+DataSet which you can then write to a file, transform again, or
+combine with other DataSets. You apply transformations by calling
+methods on DataSet with your own custom transformation functions. For example,
+a map transformation looks like this:
+
+{% highlight java %}
+DataSet<String> input = ...;
+
+DataSet<Integer> tokenized = input.map(new MapFunction<String, Integer>() {
+    @Override
+    public Integer map(String value) {
+        return Integer.parseInt(value);
+    }
+});
+{% endhighlight %}
+
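The following sketch wraps the same map transformation in a small self-contained program. The class `ParseIntegers`, its `parse()` helper, and the extra `trim()` call are illustrative additions and not part of Flink. `fromElements()` and `collect()` are regular `ExecutionEnvironment` and `DataSet` methods.

```java
import java.util.List;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

// Hypothetical class name; a runnable version of the map example above.
class ParseIntegers {

    static List<Integer> parse(ExecutionEnvironment env, String... lines) throws Exception {
        // Create the initial DataSet from local values.
        DataSet<String> input = env.fromElements(lines);

        // Map: each String becomes exactly one Integer.
        DataSet<Integer> parsed = input.map(new MapFunction<String, Integer>() {
            @Override
            public Integer map(String value) {
                return Integer.parseInt(value.trim());
            }
        });

        // collect() triggers execution and ships the results back to the client.
        return parsed.collect();
    }

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        System.out.println(parse(env, "1", " 2", "3 "));
    }
}
```

With a parallelism greater than one, the order of the collected elements is not guaranteed.
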
+This will create a new DataSet by converting every String in the original
+set to an Integer. For more information and a list of all the transformations,
+please refer to [Transformations](#transformations).
+
+Once you have a DataSet containing your final results, you can either write the result
+to a file system (HDFS or local) or print it.
+
+{% highlight java %}
+writeAsText(String path)
+writeAsCsv(String path)
+write(FileOutputFormat<T> outputFormat, String filePath)
+
+print()
+printOnTaskManager(String prefix)
+
+collect()
+{% endhighlight %}
+
+</div>
+<div data-lang="scala" markdown="1">
+
+As we already saw in the example, Flink programs look like regular Scala
+programs with a `main()` method. Each program consists of the same basic parts:
+
+1. Obtain an `ExecutionEnvironment`,
+2. Load/create the initial data,
+3. Specify transformations on this data,
+4. Specify where to put the results of your computations,
+5. Trigger the program execution
+
+We will now give an overview of each of those steps; please refer to the respective sections for
+more details. Note that all core classes of the Scala API are found in the package
+{% gh_link /flink-scala/src/main/scala/org/apache/flink/api/scala "org.apache.flink.api.scala" %}.
+
+
+The `ExecutionEnvironment` is the basis for all Flink programs. You can
+obtain one using these static methods on class `ExecutionEnvironment`:
+
+{% highlight scala %}
+def getExecutionEnvironment
+
+def createLocalEnvironment(parallelism: Int = Runtime.getRuntime.availableProcessors())
+def createLocalEnvironment(customConfiguration: Configuration)
+
+def createCollectionsEnvironment
+
+def createRemoteEnvironment(host: String, port: Int, jarFiles: String*)
+def createRemoteEnvironment(host: String, port: Int, parallelism: Int, jarFiles: String*)
+{% endhighlight %}
+
+Typically, you only need to use `getExecutionEnvironment()`, since this
+will do the right thing depending on the context: if you are executing
+your program inside an IDE or as a regular Scala program it will create
+a local environment that will execute your program on your local machine. If
+you created a JAR file from your program, and invoke it through the [command line]({{ site.baseurl }}/apis/cli.html)
+or the [web interface]({{ site.baseurl }}/apis/web_client.html),
+the Flink cluster manager will execute your main method and `getExecutionEnvironment()` will return
+an execution environment for executing your program on a cluster.
+
+To specify data sources, the execution environment offers several methods
+to read from files: you can read them line by line,
+as CSV files, or using completely custom data input formats. To read
+a text file as a sequence of lines, you can use:
+
+{% highlight scala %}
+val env = ExecutionEnvironment.getExecutionEnvironment
+
+val text = env.readTextFile("file:///path/to/file")
+{% endhighlight %}
+
+This will give you a DataSet on which you can then apply transformations. For
+more information on data sources and input formats, please refer to
+[Data Sources](#data-sources).
+
+Once you have a DataSet you can apply transformations to create a new
+DataSet which you can then write to a file, transform again, or
+combine with other DataSets. You apply transformations by calling
+methods on DataSet with your own custom transformation function. For example,
+a map transformation looks like this:
+
+{% highlight scala %}
+val input: DataSet[String] = ...
+
+val mapped = input.map { x => x.toInt }
+{% endhighlight %}
+
+This will create a new DataSet by converting every String in the original
+set to an Integer. For more information and a list of all the transformations,
+please refer to [Transformations](#transformations).
+
+Once you have a DataSet containing your final results, you can either write the result
+to a file system (HDFS or local) or print it.
+
+{% highlight scala %}
+def writeAsText(path: String, writeMode: WriteMode = WriteMode.NO_OVERWRITE)
+def writeAsCsv(
+    filePath: String,
+    rowDelimiter: String = "\n",
+    fieldDelimiter: String = ",",
+    writeMode: WriteMode = WriteMode.NO_OVERWRITE)
+def write(outputFormat: FileOutputFormat[T],
+    path: String,
+    writeMode: WriteMode = WriteMode.NO_OVERWRITE)
+
+def printOnTaskManager(prefix: String)
+
+def print()
+
+def collect(): Seq[T]
+{% endhighlight %}
+
+</div>
+</div>
+
+
+The first two methods (`writeAsText()` and `writeAsCsv()`) do what their names suggest; the third one
+can be used to specify a custom data output format. Please refer to [Data Sinks](#data-sinks) for
+more information on writing to files and also about custom data output formats.
+
+The `print()` method is useful for developing/debugging. It will output the contents of the DataSet
+to standard output (on the JVM starting the Flink execution). **NOTE** The behavior of the `print()`
+method changed with Flink 0.9.x. Previously, it printed to the log files of the workers; now it
+sends the DataSet results to the client and prints them there.
+
+`collect()` retrieves the DataSet from the cluster to the local JVM. The `collect()` method
+will return a `List` containing the elements.
+
+Both `print()` and `collect()` will trigger the execution of the program. You do not need to call `execute()` afterwards.
+
+
+**NOTE** `print()` and `collect()` retrieve the data from the cluster to the client. Currently,
+the data sizes you can retrieve with `collect()` are limited due to our RPC system. It is not advised
+to collect DataSets larger than 10 MB.
+
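Sometimes you only need a small sample of a large result on the client. In that case, one option is to shrink the DataSet with `first(n)` before calling `collect()`, so that only `n` elements pass through the RPC system. This is a sketch that assumes an arbitrary sample is acceptable. The class name `CollectSample` is hypothetical, and the generated sequence stands in for your own data.

```java
import java.util.List;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

// Hypothetical example class: limit what is shipped to the client.
class CollectSample {

    // Returns n arbitrary elements of a large DataSet instead of the whole DataSet.
    static List<Long> sample(ExecutionEnvironment env, int n) throws Exception {
        DataSet<Long> large = env.generateSequence(1, 1_000_000);
        // first(n) keeps n arbitrary elements; only those are collected.
        return large.first(n).collect();
    }

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        System.out.println(sample(env, 10));
    }
}
```
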
+There is also a `printOnTaskManager()` method which will print the DataSet contents on the TaskManager
+(so you have to get them from the log file). The `printOnTaskManager()` method will not trigger a
+program execution.
+
+Once you have specified the complete program, you need to **trigger the program execution**. You can call
+`execute()` directly on the `ExecutionEnvironment` or implicitly trigger the execution with
+`collect()` or `print()`.
+Depending on the type of the `ExecutionEnvironment`, the program is executed on your local
+machine or submitted for execution on a cluster.
+
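+Note that you cannot call both `print()` (or `collect()`) and `execute()` at the end of the program:
+`print()` and `collect()` already execute the program, so a subsequent `execute()` call without newly
+defined sinks fails.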
+
+The `execute()` method returns a `JobExecutionResult`, which includes execution times and
+accumulator results. `print()` and `collect()` do not return this result, but it can be
+accessed afterwards with the `getLastJobExecutionResult()` method of the `ExecutionEnvironment`.
+
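The two ways of triggering execution can be sketched as follows. This is a minimal example, not a prescribed pattern. The class name `TriggerExecution` is hypothetical, and the output path passed as `args[0]` is an assumption you would replace with your own location.

```java
import java.util.List;

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

// Hypothetical example class showing implicit and explicit execution triggers.
class TriggerExecution {

    // Implicit trigger: collect() runs the job, no execute() call is needed.
    static List<Long> collectEvens(ExecutionEnvironment env) throws Exception {
        DataSet<Long> evens = env.generateSequence(1, 10)
            .filter(new FilterFunction<Long>() {
                @Override
                public boolean filter(Long value) {
                    return value % 2 == 0;
                }
            });
        List<Long> result = evens.collect();

        // The JobExecutionResult of the job started by collect() is still accessible.
        JobExecutionResult last = env.getLastJobExecutionResult();
        System.out.println("collect() job took " + last.getNetRuntime() + " ms");
        return result;
    }

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        System.out.println(collectEvens(env));

        // Explicit trigger: define a sink, then call execute() once.
        // args[0] is an output path of your choice (an assumption of this sketch).
        if (args.length > 0) {
            env.generateSequence(1, 10).writeAsText(args[0]);
            JobExecutionResult result = env.execute("write sequence");
            System.out.println("execute() job took " + result.getNetRuntime() + " ms");
        }
    }
}
```

The sink is defined after `collect()` has run, so the later `execute()` call has a new sink to execute.
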
+
+{% top %}
+
+
+DataSet Abstraction
+-------------------
+
+A `DataSet` is an abstract representation of a finite immutable collection of data of the same type that may contain duplicates.
+
+Note that Flink does not always physically create (materialize) each DataSet at runtime. This
+depends on the runtime in use, the configuration, and optimizer decisions. DataSets may be "streamed through"
+operations during execution, as under the hood Flink uses a streaming data processing engine.
+
+Some DataSets are materialized automatically to avoid distributed deadlocks (at points where the data flow graph branches
+out and joins again later) or if the execution mode has explicitly been set to blocking execution.
+
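For the explicit case, here is a minimal sketch. It assumes that the execution mode of the `ExecutionConfig` is the setting you want to change. `ExecutionMode.BATCH` asks Flink to run shuffles and broadcasts in a blocking (batch) fashion instead of pipelining them. The class `BlockingExecution` is a hypothetical name.

```java
import org.apache.flink.api.common.ExecutionMode;
import org.apache.flink.api.java.ExecutionEnvironment;

// Hypothetical example class: request blocking data exchanges.
class BlockingExecution {

    static ExecutionEnvironment blockingEnvironment() {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // BATCH: shuffles and broadcasts are exchanged in a blocking way.
        env.getConfig().setExecutionMode(ExecutionMode.BATCH);
        return env;
    }

    public static void main(String[] args) {
        ExecutionEnvironment env = blockingEnvironment();
        System.out.println("execution mode: " + env.getConfig().getExecutionMode());
    }
}
```
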
+{% top %}
+
+
+Lazy Evaluation
+---------------
+
+All Flink DataSet programs are executed lazily: When the program's main method is executed, the data loading
+and transformations do not happen directly. Rather, each operation is created and added to the
+program's plan. The operations are actually executed when the execution is explicitly triggered by
+an `execute()` call on the ExecutionEnvironment object. Also, `collect()` and `print()` will trigger
+the job execution. Whether the program is executed locally or on a cluster depends
+on the environment of the program.
+
+The lazy evaluation lets you construct sophisticated programs that Flink executes as one
+holistically planned unit.
+
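Lazy evaluation can be made visible by counting function calls. The following sketch assumes the collection environment (`createCollectionsEnvironment()`), which runs all functions inside the client JVM. That makes a static counter meaningful here, but it is not a technique to rely on in a cluster. The class `LazyEvaluationDemo` is hypothetical.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

// Hypothetical example class that makes lazy evaluation observable.
class LazyEvaluationDemo {

    // Counts map() calls; only meaningful because the collection environment runs in this JVM.
    static final AtomicInteger MAP_CALLS = new AtomicInteger();

    // Returns {calls before collect(), calls after collect(), result size}.
    static int[] run() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.createCollectionsEnvironment();
        int start = MAP_CALLS.get();

        DataSet<Integer> doubled = env.fromElements(1, 2, 3)
            .map(new MapFunction<Integer, Integer>() {
                @Override
                public Integer map(Integer value) {
                    MAP_CALLS.incrementAndGet();
                    return value * 2;
                }
            });

        // The plan has been built, but map() has not been called yet.
        int before = MAP_CALLS.get() - start;

        // collect() triggers the execution of the whole plan.
        List<Integer> result = doubled.collect();
        int after = MAP_CALLS.get() - start;

        return new int[] {before, after, result.size()};
    }

    public static void main(String[] args) throws Exception {
        int[] r = run();
        System.out.println("map() calls before collect(): " + r[0] + ", after: " + r[1]);
    }
}
```

The program should report no `map()` calls before `collect()` and one call per element afterwards.
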
+{% top %}
+
+
+Transformations
+---------------
+
+Data transformations transform one or more DataSets into a new DataSet. Programs can combine
+multiple transformations into sophisticated assemblies.
+
+This section gives a brief overview of the available transformations. The [transformations documentation](dataset_transformations.html)
+has a full description of all transformations with examples.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+
+<br />
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 20%">Transformation</th>
+      <th class="text-center">Description</th>
+    </tr>
+  </thead>
+
+  <tbody>
+    <tr>
+      <td><strong>Map</strong></td>
+      <td>
+        <p>Takes one element and produces one element.</p>
+{% highlight java %}
+data.map(new MapFunction<String, Integer>() {
+  public Integer map(String value) { return Integer.parseInt(value); }
+});
+{% endhighlight %}
+      </td>
+    </tr>
+
+    <tr>
+      <td><strong>FlatMap</strong></td>
+      <td>
+        <p>Takes one element and produces zero, one, or more elements. </p>
+{% highlight java %}
+data.flatMap(new FlatMapFunction<String, String>() {
+  public void flatMap(String value, Collector<String> out) {
+    for (String s : value.split(" ")) {
+      out.collect(s);
+    }
+  }
+});
+{% endhighlight %}
+      </td>
+    </tr>
+
+    <tr>
+      <td><strong>MapPartition</strong></td>
+      <td>
+        <p>Transforms a parallel partition in a single function call. The function gets the partition
+        as an `Iterable` stream and can produce an arbitrary number of result values. The number of
+        elements in each partition depends on the parallelism and previous operations.</p>
+{% highlight java %}
+data.mapPartition(new MapPartitionFunction<String, Long>() {
+  public void mapPartition(Iterable<String> values, Collector<Long> out) {
+    long c = 0;
+    for (String s : values) {
+      c++;
+    }
+    out.collect(c);
+  }
+});
+{% endhighlight %}
+      </td>
+    </tr>
+
+    <tr>
+      <td><strong>Filter</strong></td>
+      <td>
+        <p>Evaluates a boolean function for each element and retains those for which the function
+        returns true.<br/>
+
+        <strong>IMPORTANT:</strong> The system assumes that the function does not modify the elements on which the predicate is applied. Violating this assumption
+        can lead to incorrect results.
+        </p>
+{% highlight java %}
+data.filter(new FilterFunction<Integer>() {
+  public boolean filter(Integer value) { return value > 1000; }
+});
+{% endhighlight %}
+      </td>
+    </tr>
+
+    <tr>
+      <td><strong>Reduce</strong></td>
+      <td>
+        <p>Combines a group of elements into a single element by repeatedly combining two elements
+        into one. Reduce may be applied on a full data set, or on a grouped data set.</p>
+{% highlight java %}
+data.reduce(new ReduceFunction<Integer>() {
+  public Integer reduce(Integer a, Integer b) { return a + b; }
+});
+{% endhighlight %}
+      </td>
+    </tr>
+
+    <tr>
+      <td><strong>ReduceGroup</strong></td>
+      <td>
+        <p>Combines a group of elements into one or more elements. ReduceGroup may be applied on a
+        full data set, or on a grouped data set.</p>
+{% highlight java %}
+data.reduceGroup(new GroupReduceFunction<Integer, Integer>() {
+  public void reduce(Iterable<Integer> values, Collector<Integer> out) {
+    int prefixSum = 0;
+    for (Integer i : values) {
+      prefixSum += i;
+      out.collect(prefixSum);
+    }
+  }
+});
+{% endhighlight %}
+      </td>
+    </tr>
+
+    <tr>
+      <td><strong>Aggregate</strong></td>
+      <td>
+        <p>Aggregates a group of values into a single value. Aggregation functions can be thought of
+        as built-in reduce functions. Aggregate may be applied on a full data set, or on a grouped
+        data set.</p>
+{% highlight java %}
+DataSet<Tuple3<Integer, String, Double>> input = // [...]
+DataSet<Tuple3<Integer, String, Double>> output = input.aggregate(SUM, 0).and(MIN, 2);
+{% endhighlight %}
+       <p>You can also use short-hand syntax for minimum, maximum, and sum aggregations.</p>
+{% highlight java %}
+DataSet<Tuple3<Integer, String, Double>> input = // [...]
+DataSet<Tuple3<Integer, String, Double>> output = input.sum(0).andMin(2);
+{% endhighlight %}
+      </td>
+    </tr>
+
+    <tr>
+      <td><strong>Distinct</strong></td>
+      <td>
+        <p>Returns the distinct elements of a data set. It removes the duplicate entries
+        from the input DataSet, with respect to all fields of the elements, or a subset of fields.</p>
+{% highlight java %}
+data.distinct();
+{% endhighlight %}
+      </td>
+    </tr>
+
+    <tr>
+      <td><strong>Join</strong></td>
+      <td>
+        Joins two data sets by creating all pairs of elements that are equal on their keys.
+        Optionally uses a JoinFunction to turn the pair of elements into a single element, or a
+        FlatJoinFunction to turn the pair of elements into arbitrarily many (including none)
+        elements. See the <a href="#specifying-keys">keys section</a> to learn how to define join keys.
+{% highlight java %}
+result = input1.join(input2)
+               .where(0)       // key of the first input (tuple field 0)
+               .equalTo(1);    // key of the second input (tuple field 1)
+{% endhighlight %}
+        You can specify the way that the runtime executes the join via <i>Join Hints</i>. The hints
+        describe whether the join happens through partitioning or broadcasting, and whether it uses
+        a sort-based or a hash-based algorithm. Please refer to the
+        <a href="dataset_transformations.html#join-algorithm-hints">Transformations Guide</a> for
+        a list of possible hints and an example.<br/>
+        If no hint is specified, the system will try to make an estimate of the input sizes and
+        pick the best strategy according to those estimates.
+{% highlight java %}
+// This executes a join by broadcasting the first data set
+// using a hash table for the broadcasted data
+result = input1.join(input2, JoinHint.BROADCAST_HASH_FIRST)
+               .where(0).equalTo(1);
+{% endhighlight %}
+        Note that the join transformation works only for equi-joins. Other join types need to be expressed using OuterJoin or CoGroup.
+      </td>
+    </tr>
+
+    <tr>
+      <td><strong>OuterJoin</strong></td>
+      <td>
+        Performs a left, right, or full outer join on two data sets. Outer joins are similar to regular (inner) joins and create all pairs of elements that are equal on their keys. In addition, records of the "outer" side (left, right, or both in case of full) are preserved if no matching key is found in the other side. Matching pairs of elements (or one element and a `null` value for the other input) are given to a JoinFunction to turn the pair of elements into a single element, or to a FlatJoinFunction to turn the pair of elements into arbitrarily many (including none) elements. See the <a href="#specifying-keys">keys section</a> to learn how to define join keys.
+{% highlight java %}
+input1.leftOuterJoin(input2) // rightOuterJoin or fullOuterJoin for right or full outer joins
+      .where(0)              // key of the first input (tuple field 0)
+      .equalTo(1)            // key of the second input (tuple field 1)
+      .with(new JoinFunction<String, String, String>() {
+          public String join(String v1, String v2) {
+             // NOTE:
+             // - v2 might be null for leftOuterJoin
+             // - v1 might be null for rightOuterJoin
+             // - v1 OR v2 might be null for fullOuterJoin
+          }
+      });
+{% endhighlight %}
+      </td>
+    </tr>
+
+    <tr>
+      <td><strong>CoGroup</strong></td>
+      <td>
+        <p>The two-dimensional variant of the reduce operation. Groups each input on one or more
+        fields and then joins the groups. The transformation function is called per pair of groups.
+        See the <a href="#specifying-keys">keys section</a> to learn how to define coGroup keys.</p>
+{% highlight java %}
+data1.coGroup(data2)
+     .where(0)
+     .equalTo(1)
+     .with(new CoGroupFunction<String, String, String>() {
+         public void coGroup(Iterable<String> in1, Iterable<String> in2, Collector<String> out) {
+           out.collect(...);
+         }
+      });
+{% endhighlight %}
+      </td>
+    </tr>
+
+    <tr>
+      <td><strong>Cross</strong></td>
+      <td>
+        <p>Builds the Cartesian product (cross product) of two inputs, creating all pairs of
+        elements. Optionally uses a CrossFunction to turn the pair of elements into a single
+        element.</p>
+{% highlight java %}
+DataSet<Integer> data1 = // [...]
+DataSet<String> data2 = // [...]
+DataSet<Tuple2<Integer, String>> result = data1.cross(data2);
+{% endhighlight %}
+      <p>Note: Cross is potentially a <b>very</b> compute-intensive operation which can challenge even large compute clusters! It is advised to hint the system with the DataSet sizes by using <i>crossWithTiny()</i> and <i>crossWithHuge()</i>.</p>
+      </td>
+    </tr>
+    <tr>
+      <td><strong>Union</strong></td>
+      <td>
+        <p>Produces the union of two data sets. This operation happens implicitly if more than one
+        data set is used for a specific function input.</p>
+{% highlight java %}
+DataSet<String> data1 = // [...]
+DataSet<String> data2 = // [...]
+DataSet<String> result = data1.union(data2);
+{% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td><strong>Rebalance</strong></td>
+      <td>
+        <p>Evenly rebalances the parallel partitions of a data set to eliminate data skew. Only Map-like transformations may follow a rebalance transformation.</p>
+{% highlight java %}
+DataSet<String> in = // [...]
+DataSet<String> result = in.rebalance()
+                           .map(new Mapper());
+{% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td><strong>Hash-Partition</strong></td>
+      <td>
+        <p>Hash-partitions a data set on a given key. Keys can be specified as position keys, expression keys, and key selector functions.</p>
+{% highlight java %}
+DataSet<Tuple2<String,Integer>> in = // [...]
+DataSet<Integer> result = in.partitionByHash(0)
+                            .mapPartition(new PartitionMapper());
+{% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td><strong>Range-Partition</strong></td>
+      <td>
+        <p>Range-partitions a data set on a given key. Keys can be specified as position keys, expression keys, and key selector functions.</p>
+{% highlight java %}
+DataSet<Tuple2<String,Integer>> in = // [...]
+DataSet<Integer> result = in.partitionByRange(0)
+                            .mapPartition(new PartitionMapper());
+{% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td><strong>Custom Partitioning</strong></td>
+      <td>
+        <p>Manually specify a partitioning over the data.
+          <br/>
+          <i>Note</i>: This method works only on single field keys.</p>
+{% highlight java %}
+DataSet<Tuple2<String,Integer>> in = // [...]
+DataSet<Integer> result = in.partitionCustom(Partitioner<K> partitioner, key)
+{% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td><strong>Sort Partition</strong></td>
+      <td>
+        <p>Locally sorts all partitions of a data set on a specified field in a specified order.
+          Fields can be specified as tuple positions or field expressions.
+          Sorting on multiple fields is done by chaining sortPartition() calls.</p>
+{% highlight java %}
+DataSet<Tuple2<String,Integer>> in = // [...]
+DataSet<Integer> result = in.sortPartition(1, Order.ASCENDING)
+                            .mapPartition(new PartitionMapper());
+{% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td><strong>First-n</strong></td>
+      <td>
+        <p>Returns the first n (arbitrary) elements of a data set. First-n can be applied on a regular data set, a grouped data set, or a grouped-sorted data set. Grouping keys can be specified as key-selector functions or field position keys.</p>
+{% highlight java %}
+DataSet<Tuple2<String,Integer>> in = // [...]
+// regular data set
+DataSet<Tuple2<String,Integer>> result1 = in.first(3);
+// grouped data set
+DataSet<Tuple2<String,Integer>> result2 = in.groupBy(0)
+                                            .first(3);
+// grouped-sorted data set
+DataSet<Tuple2<String,Integer>> result3 = in.groupBy(0)
+                                            .sortGroup(1, Order.ASCENDING)
+                                            .first(3);
+{% endhighlight %}
+      </td>
+    </tr>
+  </tbody>
+</table>
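
To make the difference between hash- and range-partitioning more concrete, the following plain-Java sketch models how a key could be assigned to one of `n` parallel partitions. It runs without Flink and is not Flink's implementation: `PartitionModel` and its methods are hypothetical, Flink may transform the key's hash code further before taking it modulo the number of partitions, and the range boundaries are simply passed in here, whereas a real system has to determine them from the data.

```java
import java.util.Arrays;

/**
 * Standalone model of assigning keys to parallel partitions.
 * Hypothetical helper for illustration; this is not Flink's implementation.
 */
final class PartitionModel {

    private PartitionModel() {}

    /** Hash partitioning: equal keys always end up in the same partition. */
    static int hashPartition(Object key, int numPartitions) {
        // floorMod keeps the result in [0, numPartitions) even for negative hash codes
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    /**
     * Range partitioning over sorted boundaries: boundaries[i] is the smallest key
     * of partition i + 1, so there are boundaries.length + 1 partitions and all keys
     * in partition i are smaller than all keys in partition i + 1.
     */
    static int rangePartition(int key, int[] boundaries) {
        int pos = Arrays.binarySearch(boundaries, key);
        // A key equal to a boundary belongs to the partition that starts there;
        // otherwise binarySearch returns -(insertionPoint) - 1.
        return pos >= 0 ? pos + 1 : -(pos + 1);
    }

    public static void main(String[] args) {
        int[] boundaries = {10, 20}; // hypothetical boundaries for 3 partitions
        for (int key : new int[] {3, 10, 15, 42}) {
            System.out.println(key + " -> hash partition " + hashPartition(key, 3)
                    + ", range partition " + rangePartition(key, boundaries));
        }
    }
}
```

Both schemes send equal keys to the same partition; range partitioning additionally orders the partitions by key, which is what the boundaries encode.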
+
+----------
+
+The following transformations are available on data sets of Tuples:
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 20%">Transformation</th>
+      <th class="text-center">Description</th>
+    </tr>
+  </thead>
+  <tbody>
+   <tr>
+      <td><strong>Project</strong></td>
+      <td>
+        <p>Selects a subset of fields from the tuples.</p>
+{% highlight java %}
+DataSet<Tuple3<Integer, Double, String>> in = // [...]
+DataSet<Tuple2<String, Integer>> out = in.project(2,0);
+{% endhighlight %}
+      </td>
+    </tr>
+  </tbody>
+</table>
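
`project()` picks fields by their 0-based positions, and the order of the positions determines the field order of the result. A minimal model of this in plain Java, assuming rows are represented as `Object[]` arrays instead of Flink tuples (`ProjectionModel` is a hypothetical name):

```java
import java.util.Arrays;

/**
 * Standalone model of tuple projection. Rows are plain Object[] arrays here
 * (a hypothetical stand-in for Flink tuples).
 */
final class ProjectionModel {

    private ProjectionModel() {}

    /** Returns a new row with the fields at the given 0-based positions, in that order. */
    static Object[] project(Object[] row, int... positions) {
        Object[] out = new Object[positions.length];
        for (int i = 0; i < positions.length; i++) {
            out[i] = row[positions[i]];
        }
        return out;
    }

    public static void main(String[] args) {
        Object[] in = {42, 3.14, "flink"};        // shaped like a Tuple3<Integer, Double, String>
        Object[] out = project(in, 2, 0);         // shaped like a Tuple2<String, Integer>
        System.out.println(Arrays.toString(out)); // prints [flink, 42]
    }
}
```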
+
+</div>
+<div data-lang="scala" markdown="1">
+<br />
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 20%">Transformation</th>
+      <th class="text-center">Description</th>
+    </tr>
+  </thead>
+
+  <tbody>
+    <tr>
+      <td><strong>Map</strong></td>
+      <td>
+        <p>Takes one element and produces one element.</p>
+{% highlight scala %}
+data.map { x => x.toInt }
+{% endhighlight %}
+      </td>
+    </tr>
+
+    <tr>
+      <td><strong>FlatMap</strong></td>
+      <td>
+        <p>Takes one element and produces zero, one, or more elements. </p>
+{% highlight scala %}
+data.flatMap { str => str.split(" ") }
+{% endhighlight %}
+      </td>
+    </tr>
+
+    <tr>
+      <td><strong>MapPartition</strong></td>
+      <td>
+        <p>Transforms a parallel partition in a single function call. The function gets the partition
+        as an `Iterator` and can produce an arbitrary number of result values. The number of
+        elements in each partition depends on the parallelism and previous operations.</p>
+{% highlight scala %}
+data.mapPartition { in => in map { (_, 1) } }
+{% endhighlight %}
+      </td>
+    </tr>
+
+    <tr>
+      <td><strong>Filter</strong></td>
+      <td>
+        <p>Evaluates a boolean function for each element and retains those for which the function
+        returns true.<br/>
+        <strong>IMPORTANT:</strong> The system assumes that the function does not modify the element on which the predicate is applied.
+        Violating this assumption can lead to incorrect results.</p>
+{% highlight scala %}
+data.filter { _ > 1000 }
+{% endhighlight %}
+      </td>
+    </tr>
+
+    <tr>
+      <td><strong>Reduce</strong></td>
+      <td>
+        <p>Combines a group of elements into a single element by repeatedly combining two elements
+        into one. Reduce may be applied on a full data set, or on a grouped data set.</p>
+{% highlight scala %}
+data.reduce { _ + _ }
+{% endhighlight %}
+      </td>
+    </tr>
+
+    <tr>
+      <td><strong>ReduceGroup</strong></td>
+      <td>
+        <p>Combines a group of elements into one or more elements. ReduceGroup may be applied on a
+        full data set, or on a grouped data set.</p>
+{% highlight scala %}
+data.reduceGroup { elements => elements.sum }
+{% endhighlight %}
+      </td>
+    </tr>
+
+    <tr>
+      <td><strong>Aggregate</strong></td>
+      <td>
+        <p>Aggregates a group of values into a single value. Aggregation functions can be thought of
+        as built-in reduce functions. Aggregate may be applied on a full data set, or on a grouped
+        data set.</p>
+{% highlight scala %}
+val input: DataSet[(Int, String, Double)] = // [...]
+val output: DataSet[(Int, String, Double)] = input.aggregate(SUM, 0).aggregate(MIN, 2)
+{% endhighlight %}
+        <p>You can also use short-hand syntax for minimum, maximum, and sum aggregations.</p>
+{% highlight scala %}
+val input: DataSet[(Int, String, Double)] = // [...]
+val output: DataSet[(Int, String, Double)] = input.sum(0).min(2)
+{% endhighlight %}
+      </td>
+    </tr>
+
+    <tr>
+      <td><strong>Distinct</strong></td>
+      <td>
+        <p>Returns the distinct elements of a data set. It removes the duplicate entries
+        from the input DataSet, with respect to all fields of the elements, or a subset of fields.</p>
+{% highlight scala %}
+data.distinct()
+{% endhighlight %}
+      </td>
+    </tr>
+
+    <tr>
+      <td><strong>Join</strong></td>
+      <td>
+        Joins two data sets by creating all pairs of elements that are equal on their keys.
+        Optionally uses a JoinFunction to turn the pair of elements into a single element, or a
+        FlatJoinFunction to turn the pair of elements into arbitrarily many (including none)
+        elements. See the <a href="#specifying-keys">keys section</a> to learn how to define join keys.
+{% highlight scala %}
+// In this case tuple fields are used as keys. "0" is the join field on the first tuple,
+// "1" is the join field on the second tuple.
+val result = input1.join(input2).where(0).equalTo(1)
+{% endhighlight %}
+        You can specify the way that the runtime executes the join via <i>Join Hints</i>. The hints
+        describe whether the join happens through partitioning or broadcasting, and whether it uses
+        a sort-based or a hash-based algorithm. Please refer to the
+        <a href="dataset_transformations.html#join-algorithm-hints">Transformations Guide</a> for
+        a list of possible hints and an example.<br/>
+        If no hint is specified, the system will try to make an estimate of the input sizes and
+        pick the best strategy according to those estimates.
+{% highlight scala %}
+// This executes a join by broadcasting the first data set
+// using a hash table for the broadcasted data
+val result = input1.join(input2, JoinHint.BROADCAST_HASH_FIRST)
+                   .where(0).equalTo(1)
+{% endhighlight %}
+          Note that the join transformation works only for equi-joins. Other join types need to be expressed using OuterJoin or CoGroup.
+      </td>
+    </tr>
+
+    <tr>
+      <td><strong>OuterJoin</strong></td>
+      <td>
+        Performs a left, right, or full outer join on two data sets. Outer joins are similar to regular (inner) joins and create all pairs of elements that are equal on their keys. In addition, records of the "outer" side (left, right, or both in case of full) are preserved if no matching key is found in the other side. Matching pairs of elements (or one element and a `null` value for the other input) are given to a JoinFunction to turn the pair of elements into a single element, or to a FlatJoinFunction to turn the pair of elements into arbitrarily many (including none) elements. See the <a href="#specifying-keys">keys section</a> to learn how to define join keys.
+{% highlight scala %}
+val joined = left.leftOuterJoin(right).where(0).equalTo(1) {
+   (left, right) =>
+     val a = if (left == null) "none" else left._1
+     (a, right)
+  }
+{% endhighlight %}
+      </td>
+    </tr>
+
+    <tr>
+      <td><strong>CoGroup</strong></td>
+      <td>
+        <p>The two-dimensional variant of the reduce operation. Groups each input on one or more
+        fields and then joins the groups. The transformation function is called per pair of groups.
+        See the <a href="#specifying-keys">keys section</a> to learn how to define coGroup keys.</p>
+{% highlight scala %}
+data1.coGroup(data2).where(0).equalTo(1)
+{% endhighlight %}
+      </td>
+    </tr>
+
+    <tr>
+      <td><strong>Cross</strong></td>
+      <td>
+        <p>Builds the Cartesian product (cross product) of two inputs, creating all pairs of
+        elements. Optionally uses a CrossFunction to turn the pair of elements into a single
+        element.</p>
+{% highlight scala %}
+val data1: DataSet[Int] = // [...]
+val data2: DataSet[String] = // [...]
+val result: DataSet[(Int, String)] = data1.cross(data2)
+{% endhighlight %}
+        <p>Note: Cross is potentially a <b>very</b> compute-intensive operation which can challenge even large compute clusters! It is advisable to hint the system with the DataSet sizes by using <i>crossWithTiny()</i> and <i>crossWithHuge()</i>.</p>
+      </td>
+    </tr>
+    <tr>
+      <td><strong>Union</strong></td>
+      <td>
+        <p>Produces the union of two data sets.</p>
+{% highlight scala %}
+data.union(data2)
+{% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td><strong>Rebalance</strong></td>
+      <td>
+        <p>Evenly rebalances the parallel partitions of a data set to eliminate data skew. Only Map-like transformations may follow a rebalance transformation.</p>
+{% highlight scala %}
+val data1: DataSet[Int] = // [...]
+val result: DataSet[(Int, String)] = data1.rebalance().map(...)
+{% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td><strong>Hash-Partition</strong></td>
+      <td>
+        <p>Hash-partitions a data set on a given key. Keys can be specified as position keys, expression keys, and key selector functions.</p>
+{% highlight scala %}
+val in: DataSet[(Int, String)] = // [...]
+val result = in.partitionByHash(0).mapPartition { ... }
+{% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td><strong>Range-Partition</strong></td>
+      <td>
+        <p>Range-partitions a data set on a given key. Keys can be specified as position keys, expression keys, and key selector functions.</p>
+{% highlight scala %}
+val in: DataSet[(Int, String)] = // [...]
+val result = in.partitionByRange(0).mapPartition { ... }
+{% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td><strong>Custom Partitioning</strong></td>
+      <td>
+        <p>Manually specify a partitioning over the data.
+          <br/>
+          <i>Note</i>: This method works only on single field keys.</p>
+{% highlight scala %}
+val in: DataSet[(Int, String)] = // [...]
+// 'partitioner' is a user-defined Partitioner[Int] for the Int key in field 0
+val result = in.partitionCustom(partitioner, 0)
+{% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td><strong>Sort Partition</strong></td>
+      <td>
+        <p>Locally sorts all partitions of a data set on a specified field in a specified order.
+          Fields can be specified as tuple positions or field expressions.
+          Sorting on multiple fields is done by chaining sortPartition() calls.</p>
+{% highlight scala %}
+val in: DataSet[(Int, String)] = // [...]
+val result = in.sortPartition(1, Order.ASCENDING).mapPartition { ... }
+{% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td><strong>First-n</strong></td>
+      <td>
+        <p>Returns the first n (arbitrary) elements of a data set. First-n can be applied on a regular data set, a grouped data set, or a grouped-sorted data set. Grouping keys can be specified as key-selector functions,
+        tuple positions or case class fields.</p>
+{% highlight scala %}
+val in: DataSet[(Int, String)] = // [...]
+// regular data set
+val result1 = in.first(3)
+// grouped data set
+val result2 = in.groupBy(0).first(3)
+// grouped-sorted data set
+val result3 = in.groupBy(0).sortGroup(1, Order.ASCENDING).first(3)
+{% endhighlight %}
+      </td>
+    </tr>
+  </tbody>
+</table>
+
+</div>
+</div>
+
+The [parallelism](#parallel-execution) of a transformation can be defined by `setParallelism(int)` while
+`name(String)` assigns a custom name to a transformation which is helpful for debugging. The same is
+possible for [Data Sources](#data-sources) and [Data Sinks](#data-sinks).
+
+`withParameters(Configuration)` passes Configuration objects, which can be accessed from the `open()` method inside the user function.
+
+{% top %}
+
+
+Specifying Keys
+-------------
+
+
+
+Some transformations (join, coGroup) require that a key is defined on
+their argument DataSets, and other transformations (Reduce, GroupReduce,
+Aggregate) allow the DataSet to be grouped on a key before they are
+applied.
+
+A DataSet is grouped as follows:
+{% highlight java %}
+DataSet<...> input = // [...]
+DataSet<...> reduced = input
+       .groupBy(/*define key here*/)
+       .reduceGroup(/*do something*/);
+{% endhighlight %}
+
+The data model of Flink is not based on key-value pairs. Therefore,
+you do not need to physically pack the data set types into keys and
+values. Keys are "virtual": they are defined as functions over the
+actual data to guide the grouping operator.
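
The idea of virtual keys can be sketched in plain Java without Flink: records stay whole, and a key-extraction function decides the grouping only when the grouping is performed. `VirtualKeys`, `Click`, and `sumDurationBy` are hypothetical names chosen for this illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;

/** Plain-Java illustration of "virtual" keys: the key is computed from the whole record. */
final class VirtualKeys {

    private VirtualKeys() {}

    /** A record that is not split into a key part and a value part. */
    static final class Click {
        private final String user;
        private final String url;
        private final int durationMs;

        Click(String user, String url, int durationMs) {
            this.user = user;
            this.url = url;
            this.durationMs = durationMs;
        }

        String user() { return user; }
        String url() { return url; }
        int durationMs() { return durationMs; }
    }

    /** Groups by a key extracted on the fly and sums the duration per group. */
    static <K extends Comparable<? super K>> Map<K, Integer> sumDurationBy(
            List<Click> clicks, Function<Click, K> keyOf) {
        return clicks.stream().collect(Collectors.groupingBy(
                keyOf, TreeMap::new, Collectors.summingInt(Click::durationMs)));
    }

    public static void main(String[] args) {
        List<Click> clicks = Arrays.asList(
                new Click("alice", "a.com", 100),
                new Click("bob", "a.com", 50),
                new Click("alice", "b.com", 30));
        // The same records can be grouped by different keys without reshaping them.
        System.out.println(sumDurationBy(clicks, Click::user)); // prints {alice=130, bob=50}
        System.out.println(sumDurationBy(clicks, Click::url));  // prints {a.com=150, b.com=30}
    }
}
```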
+
+### Define keys for Tuples
+{:.no_toc}
+
+The simplest case is grouping a data set of Tuples on one or more
+fields of the Tuple:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+DataSet<Tuple3<Integer,String,Long>> input = // [...]
+DataSet<Tuple3<Integer,String,Long>> grouped = input
+       .groupBy(0)
+       .reduceGroup(/*do something*/);
+{% endhighlight %}
+
+The data set is grouped on the first field of the tuples (the one of
+Integer type). The GroupReduce function will thus receive groups of tuples with
+the same value in the first field.
+
+{% highlight java %}
+DataSet<Tuple3<Integer,String,Long>> input = // [...]
+DataSet<Tuple3<Integer,String,Long>> grouped = input
+       .groupBy(0,1)
+       .reduce(/*do something*/);
+{% endhighlight %}
+
+The data set is grouped on the composite key consisting of the first and the
+second field. Therefore, the reduce function only combines elements that have
+the same value in both fields.
+
+A note on nested Tuples: If you have a DataSet with a nested tuple, such as:
+
+{% highlight java %}
+DataSet<Tuple3<Tuple2<Integer, Float>,String,Long>> ds;
+{% endhighlight %}
+
+Specifying `groupBy(0)` will cause the system to use the full `Tuple2` as a key (with the Integer and Float being the key). If you want to "navigate" into the nested `Tuple2`, you have to use field expression keys which are explained below.
+
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val input: DataSet[(Int, String, Long)] = // [...]
+val grouped = input
+  .groupBy(0)
+  .reduceGroup(/*do something*/)
+{% endhighlight %}
+
+The data set is grouped on the first field of the tuples (the one of
+Integer type). The GroupReduce function will thus receive groups of tuples with
+the same value in the first field.
+
+{% highlight scala %}
+val input: DataSet[(Int, String, Long)] = // [...]
+val grouped = input
+  .groupBy(0,1)
+  .reduce(/*do something*/)
+{% endhighlight %}
+
+The data set is grouped on the composite key consisting of the first and the
+second field. Therefore, the reduce function only combines elements that have
+the same value in both fields.
+
+A note on nested Tuples: If you have a DataSet with a nested tuple, such as:
+
+{% highlight scala %}
+val ds: DataSet[((Int, Float), String, Long)]
+{% endhighlight %}
+
+Specifying `groupBy(0)` will cause the system to use the full `Tuple2` as a key (with the Int and
+Float being the key). If you want to "navigate" into the nested `Tuple2`, you have to use field expression keys which are explained below.
+
+</div>
+</div>
+
+### Define keys using Field Expressions
+{:.no_toc}
+
+Starting from release 0.7-incubating, you can use String-based field expressions to reference nested fields and define keys for grouping, sorting, joining, or coGrouping. In addition, field expressions can be used to define [semantic function annotations](#semantic-annotations).
+
+Field expressions make it very easy to select fields in (nested) composite types such as [Tuple](#tuples-and-case-classes) and [POJO](#pojos) types.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+
+In the example below, we have a `WC` POJO with two fields "word" and "count". To group by the field `word`, we just pass its name to the `groupBy()` function.
+{% highlight java %}
+// some ordinary POJO (Plain old Java Object)
+public class WC {
+  public String word;
+  public int count;
+}
+DataSet<WC> words = // [...]
+DataSet<WC> wordCounts = words.groupBy("word").reduce(/*do something*/);
+{% endhighlight %}
+
+**Field Expression Syntax**:
+
+- Select POJO fields by their field name. For example `"user"` refers to the "user" field of a POJO type.
+
+- Select Tuple fields by their field name or 0-offset field index. For example `"f0"` and `"5"` refer to the first and sixth field of a Java Tuple type, respectively.
+
+- You can select nested fields in POJOs and Tuples. For example `"user.zip"` refers to the "zip" field of a POJO which is stored in the "user" field of a POJO type. Arbitrary nesting and mixing of POJOs and Tuples is supported, such as `"f1.user.zip"` or `"user.f3.1.zip"`.
+
+- You can select the full type using the `"*"` wildcard expression. This also works for types which are not Tuple or POJO types.
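
As a rough illustration of how a dotted expression such as `"user.zip"` walks through nested objects, the following hypothetical resolver follows public fields via Java reflection. It is not Flink's field-expression parser: it ignores getters and setters, Tuple field names and positions, and the `"*"` wildcard.

```java
/**
 * Toy resolver for dotted field expressions such as "user.address.zip".
 * Hypothetical illustration: follows public fields only and ignores getters,
 * Tuple positions, and the "*" wildcard supported by Flink.
 */
final class FieldExpressions {

    private FieldExpressions() {}

    static Object resolve(Object root, String expression) {
        Object current = root;
        for (String name : expression.split("\\.")) {
            if (current == null) {
                throw new IllegalArgumentException("Null value before '" + name + "' in " + expression);
            }
            try {
                // getField only finds public fields (including inherited ones)
                current = current.getClass().getField(name).get(current);
            } catch (NoSuchFieldException | IllegalAccessException e) {
                throw new IllegalArgumentException("Cannot resolve '" + name + "' in " + expression, e);
            }
        }
        return current;
    }

    // Hypothetical nested POJOs for the example
    public static class Address { public String zip = "10115"; }
    public static class User { public String name = "alice"; public Address address = new Address(); }
    public static class Event { public User user = new User(); }

    public static void main(String[] args) {
        Event event = new Event();
        System.out.println(resolve(event, "user.name"));        // prints alice
        System.out.println(resolve(event, "user.address.zip")); // prints 10115
    }
}
```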
+
+**Field Expression Example**:
+
+{% highlight java %}
+public static class WC {
+  public ComplexNestedClass complex; //nested POJO
+  private int count;
+  // getter / setter for private field (count)
+  public int getCount() {
+    return count;
+  }
+  public void setCount(int c) {
+    this.count = c;
+  }
+}
+public static class ComplexNestedClass {
+  public Integer someNumber;
+  public float someFloat;
+  public Tuple3<Long, Long, String> word;
+  public IntWritable hadoopCitizen;
+}
+{% endhighlight %}
+
+These are valid field expressions for the example code above:
+
+- `"count"`: The count field in the `WC` class.
+
+- `"complex"`: Recursively selects all fields of the field `complex` of POJO type `ComplexNestedClass`.
+
+- `"complex.word.f2"`: Selects the last field of the nested `Tuple3`.
+
+- `"complex.hadoopCitizen"`: Selects the Hadoop `IntWritable` type.
+
+</div>
+<div data-lang="scala" markdown="1">
+
+In the example below, we have a `WC` POJO with two fields "word" and "count". To group by the field `word`, we just pass its name to the `groupBy()` function.
+{% highlight scala %}
+// some ordinary POJO (Plain old Java Object)
+class WC(var word: String, var count: Int) {
+  def this() { this("", 0) }
+}
+val words: DataSet[WC] = // [...]
+val wordCounts = words.groupBy("word").reduce(/*do something*/)
+
+// or, as a case class, which is less typing
+case class WC(word: String, count: Int)
+val words: DataSet[WC] = // [...]
+val wordCounts = words.groupBy("word").reduce(/*do something*/)
+{% endhighlight %}
+
+**Field Expression Syntax**:
+
+- Select POJO fields by their field name. For example `"user"` refers to the "user" field of a POJO type.
+
+- Select Tuple fields by their 1-offset field name or 0-offset field index. For example `"_1"` and `"5"` refer to the first and sixth field of a Scala Tuple type, respectively.
+
+- You can select nested fields in POJOs and Tuples. For example `"user.zip"` refers to the "zip" field of a POJO which is stored in the "user" field of a POJO type. Arbitrary nesting and mixing of POJOs and Tuples is supported, such as `"_2.user.zip"` or `"user._4.1.zip"`.
+
+- You can select the full type using the `"_"` wildcard expression. This also works for types which are not Tuple or POJO types.
+
+**Field Expression Example**:
+
+{% highlight scala %}
+class WC(var complex: ComplexNestedClass, var count: Int) {
+  def this() { this(null, 0) }
+}
+
+class ComplexNestedClass(
+    var someNumber: Int,
+    var someFloat: Float,
+    var word: (Long, Long, String),
+    var hadoopCitizen: IntWritable) {
+  def this() { this(0, 0f, (0L, 0L, ""), new IntWritable(0)) }
+}
+{% endhighlight %}
+
+These are valid field expressions for the example code above:
+
+- `"count"`: The count field in the `WC` class.
+
+- `"complex"`: Recursively selects all fields of the field `complex` of POJO type `ComplexNestedClass`.
+
+- `"complex.word._3"`: Selects the last field of the nested `Tuple3`.
+
+- `"complex.hadoopCitizen"`: Selects the Hadoop `IntWritable` type.
+
+</div>
+</div>
+
+### Define keys using Key Selector Functions
+{:.no_toc}
+
+Another way to define keys is to use "key selector" functions. A key selector function
+takes a single dataset element as input and returns the key for the element. The key can be of any type and be derived from arbitrary computations.
+
+The following example shows a key selector function that simply returns the field of an object:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+// some ordinary POJO
+public class WC {public String word; public int count;}
+DataSet<WC> words = // [...]
+DataSet<WC> wordCounts = words
+                         .groupBy(
+                           new KeySelector<WC, String>() {
+                             public String getKey(WC wc) { return wc.word; }
+                           })
+                         .reduce(/*do something*/);
+{% endhighlight %}
+
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+// some ordinary case class
+case class WC(word: String, count: Int)
+val words: DataSet[WC] = // [...]
+val wordCounts = words
+  .groupBy( _.word ).reduce(/*do something*/)
+{% endhighlight %}
+</div>
+</div>
+
+
+{% top %}
+
+
+Passing Functions to Flink
+--------------------------
+
+Operations require user-defined functions. This section lists several ways of doing this.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+
+#### Implementing an interface
+
+The most basic way is to implement one of the provided interfaces:
+
+{% highlight java %}
+class MyMapFunction implements MapFunction<String, Integer> {
+  public Integer map(String value) { return Integer.parseInt(value); }
+}
+data.map (new MyMapFunction());
+{% endhighlight %}
+
+#### Anonymous classes
+
+You can pass a function as an anonymous class:
+{% highlight java %}
+data.map(new MapFunction<String, Integer> () {
+  public Integer map(String value) { return Integer.parseInt(value); }
+});
+{% endhighlight %}
+
+#### Java 8 Lambdas
+
+Flink also supports Java 8 Lambdas in the Java API. Please see the full [Java 8 Guide]({{ site.baseurl }}/apis/java8.html).
+
+{% highlight java %}
+DataSet<String> data = // [...]
+data.filter(s -> s.startsWith("http://"));
+{% endhighlight %}
+
+{% highlight java %}
+DataSet<Integer> data = // [...]
+data.reduce((i1,i2) -> i1 + i2);
+{% endhighlight %}
+
+#### Rich functions
+
+All transformations that take as argument a user-defined function can
+instead take as argument a *rich* function. For example, instead of
+
+{% highlight java %}
+class MyMapFunction implements MapFunction<String, Integer> {
+  public Integer map(String value) { return Integer.parseInt(value); }
+}
+{% endhighlight %}
+
+you can write
+
+{% highlight java %}
+class MyMapFunction extends RichMapFunction<String, Integer> {
+  public Integer map(String value) { return Integer.parseInt(value); }
+}
+{% endhighlight %}
+
+and pass the function as usual to a `map` transformation:
+
+{% highlight java %}
+data.map(new MyMapFunction());
+{% endhighlight %}
+
+Rich functions can also be defined as an anonymous class:
+{% highlight java %}
+data.map (new RichMapFunction<String, Integer>() {
+  public Integer map(String value) { return Integer.parseInt(value); }
+});
+{% endhighlight %}
+
+</div>
+<div data-lang="scala" markdown="1">
+
+
+#### Lambda Functions
+
+As already seen in previous examples, all operations accept lambda functions for describing
+the operation:
+{% highlight scala %}
+val data: DataSet[String] = // [...]
+data.filter { _.startsWith("http://") }
+{% endhighlight %}
+
+{% highlight scala %}
+val data: DataSet[Int] = // [...]
+data.reduce { (i1,i2) => i1 + i2 }
+// or
+data.reduce { _ + _ }
+{% endhighlight %}
+
+#### Rich functions
+
+All transformations that take as argument a lambda function can
+instead take as argument a *rich* function. For example, instead of
+
+{% highlight scala %}
+data.map { x => x.toInt }
+{% endhighlight %}
+
+you can write
+
+{% highlight scala %}
+class MyMapFunction extends RichMapFunction[String, Int] {
+  def map(in: String):Int = { in.toInt }
+}
+{% endhighlight %}
+
+and pass the function to a `map` transformation:
+
+{% highlight scala %}
+data.map(new MyMapFunction())
+{% endhighlight %}
+
+Rich functions can also be defined as an anonymous class:
+{% highlight scala %}
+data.map (new RichMapFunction[String, Int] {
+  def map(in: String):Int = { in.toInt }
+})
+{% endhighlight %}
+</div>
+
+</div>
+
+Rich functions provide, in addition to the user-defined function (map,
+reduce, etc.), four methods: `open`, `close`, `getRuntimeContext`, and
+`setRuntimeContext`. These are useful for parameterizing the function
+(see [Passing Parameters to Functions](#passing-parameters-to-functions)),
+creating and finalizing local state, accessing broadcast variables (see
+[Broadcast Variables](#broadcast-variables)), accessing runtime
+information such as accumulators and counters (see
+[Accumulators and Counters](#accumulators--counters)), and accessing information
+on iterations (see [Iterations](iterations.html)).
+
+In particular for the `reduceGroup` transformation, using a rich
+function is the only way to define an optional `combine` function. See
+the
+[transformations documentation](dataset_transformations.html)
+for a complete example.
+
+{% top %}
+
+
+Data Types
+----------
+
+Flink places some restrictions on the type of elements that are used in DataSets and in results
+of transformations. The reason for this is that the system analyzes the types to determine
+efficient execution strategies.
+
+There are seven different categories of data types:
+
+1. **Java Tuples** and **Scala Case Classes**
+2. **Java POJOs**
+3. **Primitive Types**
+4. **Regular Classes**
+5. **Values**
+6. **Hadoop Writables**
+7. **Special Types**
+
+#### Tuples and Case Classes
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+
+Tuples are composite types that contain a fixed number of fields with various types.
+The Java API provides classes from `Tuple1` up to `Tuple25`. Every field of a tuple
+can be an arbitrary Flink type including further tuples, resulting in nested tuples. Fields of a
+tuple can be accessed directly using the field's name as `tuple.f4`, or using the generic getter method
+`tuple.getField(int position)`. The field indices start at 0. Note that this stands in contrast
+to the Scala tuples, but it is more consistent with Java's general indexing.
+
+{% highlight java %}
+DataSet<Tuple2<String, Integer>> wordCounts = env.fromElements(
+    new Tuple2<String, Integer>("hello", 1),
+    new Tuple2<String, Integer>("world", 2));
+
+wordCounts.map(new MapFunction<Tuple2<String, Integer>, Integer>() {
+    @Override
+    public Integer map(Tuple2<String, Integer> value) throws Exception {
+        return value.f1;
+    }
+});
+{% endhighlight %}
+
+When grouping, sorting, or joining a data set of tuples, keys can be specified as field positions or field expressions. See the [key definition section](#specifying-keys) or [data transformation section](#transformations) for details.
+
+{% highlight java %}
+wordCounts
+    .groupBy(0) // also valid .groupBy("f0")
+    .reduce(new MyReduceFunction());
+{% endhighlight %}
+
+</div>
+<div data-lang="scala" markdown="1">
+
+Scala case classes (and Scala tuples, which are a special case of case classes) are composite types that contain a fixed number of fields with various types. Tuple fields are addressed by their 1-offset names such as `_1` for the first field. Case class fields are accessed by their name.
+
+{% highlight scala %}
+case class WordCount(word: String, count: Int)
+val input = env.fromElements(
+    WordCount("hello", 1),
+    WordCount("world", 2)) // Case Class Data Set
+
+input.groupBy("word").reduce(...) // group by field expression "word"
+
+val input2 = env.fromElements(("hello", 1), ("world", 2)) // Tuple2 Data Set
+
+input2.groupBy(0, 1).reduce(...) // group by field positions 0 and 1
+{% endhighlight %}
+
+When grouping, sorting, or joining a data set of tuples, keys can be specified as field positions or field expressions. See the [key definition section](#specifying-keys) or [data transformation section](#transformations) for details.
+
+</div>
+</div>
+
+#### POJOs
+
+Java and Scala classes are treated by Flink as a special POJO data type if they fulfill the following requirements:
+
+- The class must be public.
+
+- It must have a public constructor without arguments (default constructor).
+
+- All fields are either public or must be accessible through getter and setter functions. For a field called `foo` the getter and setter methods must be named `getFoo()` and `setFoo()`.
+
+- The type of a field must be supported by Flink. At the moment, Flink uses [Avro](http://avro.apache.org) to serialize arbitrary objects (such as `Date`).
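
The first three requirements above can be approximated with Java reflection. The checker below is a hypothetical sketch for illustration only: `PojoRules` and `looksLikePojo` are made-up names, it skips static and synthetic fields, and it does not check field types, so Flink's own type analysis may accept or reject classes differently.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;

/**
 * Hypothetical reflection-based approximation of the POJO rules.
 * Not Flink's type analysis: field types are not checked.
 */
final class PojoRules {

    private PojoRules() {}

    static boolean looksLikePojo(Class<?> clazz) {
        // Rule 1: the class must be public.
        if (!Modifier.isPublic(clazz.getModifiers())) {
            return false;
        }
        // Rule 2: public no-argument constructor (getConstructor only returns public ones).
        try {
            clazz.getConstructor();
        } catch (NoSuchMethodException e) {
            return false;
        }
        // Rule 3: every remaining field is public or has getFoo() and setFoo(...).
        for (Field field : clazz.getDeclaredFields()) {
            int mod = field.getModifiers();
            if (Modifier.isStatic(mod) || field.isSynthetic() || Modifier.isPublic(mod)) {
                continue;
            }
            String name = field.getName();
            String suffix = Character.toUpperCase(name.charAt(0)) + name.substring(1);
            if (!hasPublicMethod(clazz, "get" + suffix, 0) || !hasPublicMethod(clazz, "set" + suffix, 1)) {
                return false;
            }
        }
        return true;
    }

    private static boolean hasPublicMethod(Class<?> clazz, String name, int parameterCount) {
        for (Method method : clazz.getMethods()) { // public methods only
            if (method.getName().equals(name) && method.getParameterCount() == parameterCount) {
                return true;
            }
        }
        return false;
    }

    /** Satisfies the rules: a public field plus a private field with getter and setter. */
    public static class WordWithCount {
        public String word;
        private int count;
        public WordWithCount() {}
        public int getCount() { return count; }
        public void setCount(int count) { this.count = count; }
    }

    /** Violates rule 3: private field without a setter. */
    public static class NoSetter {
        private int count;
        public int getCount() { return count; }
    }

    public static void main(String[] args) {
        System.out.println(looksLikePojo(WordWithCount.class)); // prints true
        System.out.println(looksLikePojo(NoSetter.class));      // prints false
    }
}
```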
+
+Flink analyzes the structure of POJO types, i.e., it learns about the fields of a POJO. As a result, POJO types are easier to use than general types. Moreover, Flink can process POJOs more efficiently than general types.
+
+The following example shows a simple POJO with two public fields.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+public class WordWithCount {
+
+    public String word;
+    public int count;
+
+    public WordWithCount() {}
+
+    public WordWithCount(String word, int count) {
+        this.word = word;
+        this.count = count;
+    }
+}
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+class WordWithCount(var word: String, var count: Int) {
+    def this() {
+      this(null, -1)
+    }
+}
+{% endhighlight %}
+</div>
+</div>
+
+When grouping, sorting, or joining a data set of POJO types, keys can be specified with field expressions. See the [key definition section](#specifying-keys) or [data transformation section](#transformations) for details.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+wordCounts
+    .groupBy("word")                    // group by field expression "word"
+    .reduce(new MyReduceFunction());
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+wordCounts groupBy { _.word } reduce(new MyReduceFunction())
+{% endhighlight %}
+</div>
+</div>
+
+#### Primitive Types
+
+Flink supports all Java and Scala primitive types such as `Integer`, `String`, and `Double`.
+
+#### General Class Types
+
+Flink supports most Java and Scala classes (API and custom).
+Restrictions apply to classes containing fields that cannot be serialized, like file pointers, I/O streams, or other native
+resources. Classes that follow the Java Beans conventions work well in general.
+
+All classes that are not identified as POJO types (see POJO requirements above) are handled by Flink as general class types.
+Flink treats these data types as black boxes and is not able to access their content (e.g., for efficient sorting). General types are de/serialized using the serialization framework [Kryo](https://github.com/EsotericSoftware/kryo).
+
+When grouping, sorting, or joining a data set of generic types, keys must be specified with key selector functions. See the [key definition section](#specifying-keys) or [data transformation section](#transformations) for details.
+
+
+#### Values
+
+*Value* types describe their serialization and deserialization manually. Instead of going through a
+general purpose serialization framework, they provide custom code for those operations by means of
+implementing the `org.apache.flink.types.Value` interface with the methods `read` and `write`. Using
+a Value type is reasonable when general purpose serialization would be highly inefficient. An
+example would be a data type that implements a sparse vector of elements as an array. Knowing that
+the array is mostly zero, one can use a special encoding for the non-zero elements, while the
+general purpose serialization would simply write all array elements.
+
+The `org.apache.flinktypes.CopyableValue` interface supports manual internal 
cloning logic in a
+similar way.
+
+Flink comes with pre-defined Value types that correspond to basic data types. 
(`ByteValue`,
+`ShortValue`, `IntValue`, `LongValue`, `FloatValue`, `DoubleValue`, 
`StringValue`, `CharValue`,
+`BooleanValue`). These Value types act as mutable variants of the basic data 
types: Their value can
+be altered, allowing programmers to reuse objects and take pressure off the 
garbage collector.
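As an illustration of the sparse-vector case, the sketch below hand-writes an encoding that stores only the non-zero entries. It assumes a hypothetical `SparseVector` class; Flink's `Value` methods take `DataOutputView` and `DataInputView`, while this sketch uses `java.io.DataOutput` and `DataInput` so that it runs without Flink. For a 1,000-element vector with two non-zero entries, this encoding takes 32 bytes (two `int` headers plus two `int`/`double` pairs) instead of the at least 8,000 bytes needed to write every `double`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical mutable sparse vector with hand-written serialization,
// modeled on the write/read pair of Flink's Value interface.
final class SparseVector {
    private double[] values;

    SparseVector(double[] values) {
        this.values = values.clone();
    }

    double[] toArray() {
        return values.clone();
    }

    // Encoding: length, number of non-zero entries, then (index, value) pairs.
    void write(DataOutput out) throws IOException {
        int nonZero = 0;
        for (double v : values) {
            if (v != 0.0) {
                nonZero++;
            }
        }
        out.writeInt(values.length);
        out.writeInt(nonZero);
        for (int i = 0; i < values.length; i++) {
            if (values[i] != 0.0) {
                out.writeInt(i);
                out.writeDouble(values[i]);
            }
        }
    }

    // Replaces this instance's contents in place, so one object can be reused.
    void read(DataInput in) throws IOException {
        double[] restored = new double[in.readInt()];
        int nonZero = in.readInt();
        for (int k = 0; k < nonZero; k++) {
            int index = in.readInt();
            restored[index] = in.readDouble();
        }
        this.values = restored;
    }

    static byte[] serialize(SparseVector vector) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            vector.write(out);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    static SparseVector deserialize(byte[] data, SparseVector reuse) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            reuse.read(in);
            return reuse;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```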
+
+
+#### Hadoop Writables
+
+You can use types that implement the `org.apache.hadoop.io.Writable` interface. The serialization logic
+defined in the `write()` and `readFields()` methods will be used for serialization.
+
+#### Special Types
+
+You can use special types, including Scala's `Either`, `Option`, and `Try`.
+The Java API has its own custom implementation of `Either`.
+Similarly to Scala's `Either`, it represents a value of one of two possible types, *Left* or *Right*.
+`Either` can be useful for error handling or operators that need to output two different types of records.
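A minimal Java sketch of the error-handling use case: records that fail to parse become a *Left* carrying an error message, and successfully parsed records become a *Right*. `SimpleEither` and `EitherParsing` are hypothetical classes written for this example; they are not Flink's `Either`, whose method names may differ.

```java
// Hypothetical minimal Either written for this sketch (not Flink's Either class).
abstract class SimpleEither<L, R> {
    static <L, R> SimpleEither<L, R> left(L value) {
        return new Left<L, R>(value);
    }

    static <L, R> SimpleEither<L, R> right(R value) {
        return new Right<L, R>(value);
    }

    abstract boolean isLeft();

    abstract L getLeft();

    abstract R getRight();

    private static final class Left<L, R> extends SimpleEither<L, R> {
        private final L value;

        Left(L value) {
            this.value = value;
        }

        boolean isLeft() {
            return true;
        }

        L getLeft() {
            return value;
        }

        R getRight() {
            throw new IllegalStateException("Left has no right value");
        }
    }

    private static final class Right<L, R> extends SimpleEither<L, R> {
        private final R value;

        Right(R value) {
            this.value = value;
        }

        boolean isLeft() {
            return false;
        }

        L getLeft() {
            throw new IllegalStateException("Right has no left value");
        }

        R getRight() {
            return value;
        }
    }
}

final class EitherParsing {
    // Left carries an error message, Right carries the parsed number.
    static SimpleEither<String, Integer> parse(String raw) {
        try {
            return SimpleEither.right(Integer.valueOf(raw.trim()));
        } catch (NumberFormatException e) {
            return SimpleEither.left("not a number: " + raw);
        }
    }
}
```

A downstream step can then route `isLeft()` records to an error sink and process the rest normally, so one operator emits both kinds of output.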
+
+#### Type Erasure & Type Inference
+
+*Note: This section is only relevant for Java.*
+
+The Java compiler throws away much of the generic type information after compilation. This is
+known as *type erasure* in Java. It means that at runtime, an instance of an object does not know
+its generic type any more. For example, instances of `DataSet<String>` and `DataSet<Long>` look the
+same to the JVM.
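The following plain-Java sketch shows both halves of the problem: two lists with different type arguments share one runtime class, yet a type argument fixed in a subclass declaration survives in the class file and can be read back through reflection. Reading the generic signatures of user classes is the general kind of mechanism a type extractor can rely on; the `TypeCapture` helper here is hypothetical and not part of Flink.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.List;

final class ErasureDemo {
    // At runtime both lists share one Class object: the type argument is gone.
    static boolean sameRuntimeClass() {
        List<String> strings = new ArrayList<>();
        List<Long> longs = new ArrayList<>();
        return strings.getClass() == longs.getClass();
    }

    // Hypothetical helper: a subclass that fixes T records it in its generic
    // superclass signature, which reflection can read back.
    abstract static class TypeCapture<T> {
        Type capturedType() {
            ParameterizedType superType =
                    (ParameterizedType) getClass().getGenericSuperclass();
            return superType.getActualTypeArguments()[0];
        }
    }

    static Type captureListOfString() {
        // The anonymous subclass fixes T = List<String> in its declaration.
        return new TypeCapture<List<String>>() { }.capturedType();
    }
}
```

`ErasureDemo.sameRuntimeClass()` returns `true`, and `ErasureDemo.captureListOfString().getTypeName()` returns `java.util.List<java.lang.String>`.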
+
+Flink requires type information at the time when it prepares the program for execution (when the
+main method of the program is called). The Flink Java API tries to reconstruct the type information
+that was thrown away in various ways and store it explicitly in the data sets and operators. You can
+retrieve the type via `DataSet.getType()`. The method returns an instance of `TypeInformation`,
+which is Flink's internal way of representing types.
+
+The type inference has its limits and needs the "cooperation" of the programmer in some cases.
+Examples are methods that create data sets from collections, such as
+`ExecutionEnvironment.fromCollection()`, where you can pass an argument that describes the type.
+Generic functions like `MapFunction<I, O>` may also need extra type information.
+
+The
+{% gh_link /flink-java/src/main/java/org/apache/flink/api/java/typeutils/ResultTypeQueryable.java "ResultTypeQueryable" %}
+interface can be implemented by input formats and functions to tell the API
+explicitly about their return type. The *input types* that the functions are invoked with can
+usually be inferred from the result types of the previous operations.
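To see where inference breaks down, consider a generic function whose output type is a type variable of the class itself. The plain-Java sketch below shows that reflection on such an instance only reports the variable name `O`, while a function that declares its output type explicitly can always answer. `DeclaresOutputType` and `ParseFunction` are hypothetical; Flink's `ResultTypeQueryable<T>` declares `getProducedType()`, which returns a `TypeInformation<T>` rather than a `Class`.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.function.Function;

// Hypothetical analog of ResultTypeQueryable: the function states its output type.
interface DeclaresOutputType<O> {
    Class<O> getProducedClass();
}

// A generic function: O is a type variable of the class declaration, so
// reflection on an instance cannot see which concrete type was chosen.
final class ParseFunction<O> implements Function<String, O>, DeclaresOutputType<O> {
    private final Class<O> outputClass;
    private final Function<String, O> parser;

    ParseFunction(Class<O> outputClass, Function<String, O> parser) {
        this.outputClass = outputClass;
        this.parser = parser;
    }

    @Override
    public O apply(String input) {
        return parser.apply(input);
    }

    @Override
    public Class<O> getProducedClass() {
        return outputClass;
    }

    // Returns what reflection reports as the output type argument of Function.
    static String reflectedOutputType(Object function) {
        for (Type t : function.getClass().getGenericInterfaces()) {
            if (t instanceof ParameterizedType
                    && ((ParameterizedType) t).getRawType() == Function.class) {
                return ((ParameterizedType) t).getActualTypeArguments()[1].getTypeName();
            }
        }
        return "unknown";
    }
}
```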
+
+
+#### Object reuse behavior
+
+Apache Flink tries to reduce the number of object allocations for better performance.
+
+By default, user-defined functions (like `map()` or `reduceGroup()`) receive new objects on each call (or through an iterator). So it is possible to keep references to the objects inside the function (for example, in a `List`).
+
+User-defined functions are often chained, for example when two mappers with the same parallelism are defined one after another. In the chaining case, the functions in the chain receive the same object instances, so the second `map()` function receives the objects that the first `map()` returns.
+This behavior can lead to errors when the first `map()` function keeps a list of all objects and the second mapper modifies objects. In that case, the user has to manually create copies of the objects before putting them into the list.
+
+Also note that the system assumes that the user does not modify the incoming objects in the `filter()` function.
+
+There is a switch in the `ExecutionConfig` which allows users to enable the object reuse mode (`enableObjectReuse()`). For mutable types, Flink will reuse object instances. In practice that means that a `map()` function will always receive the same object instance (with its fields set to new values). The object reuse mode leads to better performance because fewer objects are created, but the user has to manually take care of what they are doing with the object references.
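The pitfall described above can be reproduced without Flink. In the sketch below, a hypothetical "runtime" loop refills a single `Reading` instance for every record, the way object reuse mode may hand the same instance to a function repeatedly. Storing the reference makes every list entry alias the last record, while storing a copy keeps each record intact. `Reading` and `ObjectReuseDemo` are illustrative names only.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical mutable record, standing in for a mutable Flink data type.
final class Reading {
    String sensor;
    double value;

    Reading copy() {
        Reading c = new Reading();
        c.sensor = sensor;
        c.value = value;
        return c;
    }
}

final class ObjectReuseDemo {
    // Simulates a runtime that reuses one instance and refills it per record.
    static List<Reading> collect(String[] sensors, boolean copyBeforeStoring) {
        Reading reused = new Reading();
        List<Reading> kept = new ArrayList<>();
        for (int i = 0; i < sensors.length; i++) {
            // the "runtime" overwrites the fields of the same object...
            reused.sensor = sensors[i];
            reused.value = i;
            // ...and the "user function" keeps either the reference or a copy
            kept.add(copyBeforeStoring ? reused.copy() : reused);
        }
        return kept;
    }
}
```

With `copyBeforeStoring` set to `false`, all three entries of `collect(new String[] {"a", "b", "c"}, false)` report sensor `"c"`.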
+
+
+
+{% top %}
+
+
+Data Sources
+------------
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+
+Data sources create the initial data sets, such as from files or from Java collections. The general
+mechanism of creating data sets is abstracted behind an
+{% gh_link /flink-core/src/main/java/org/apache/flink/api/common/io/InputFormat.java "InputFormat"%}.
+Flink comes with several built-in formats to create data sets from common file formats. Many of them have
+shortcut methods on the *ExecutionEnvironment*.
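The life cycle behind an input format can be sketched as a pull loop: the input is divided into splits, and for each split the reader is opened, polled until it reports the end, and closed. The interface below is a hypothetical, heavily simplified analog of Flink's `InputFormat`, which additionally handles configuration, statistics, the assignment of splits to parallel instances, and record reuse (its `nextRecord` takes a reuse object). In this sketch the splits are simply read one after another.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified analog of an input format life cycle.
interface SimpleInputFormat<T, S> {
    List<S> createInputSplits(int minNumSplits);

    void open(S split);

    boolean reachedEnd();

    T nextRecord();

    void close();
}

// Reads lines from an in-memory "file"; each split is a contiguous chunk of lines.
final class InMemoryLineFormat implements SimpleInputFormat<String, List<String>> {
    private final List<String> lines;
    private List<String> current;
    private int position;

    InMemoryLineFormat(List<String> lines) {
        this.lines = lines;
    }

    @Override
    public List<List<String>> createInputSplits(int minNumSplits) {
        int wanted = Math.max(1, minNumSplits);
        int chunk = Math.max(1, (lines.size() + wanted - 1) / wanted);
        List<List<String>> splits = new ArrayList<>();
        for (int start = 0; start < lines.size(); start += chunk) {
            splits.add(lines.subList(start, Math.min(lines.size(), start + chunk)));
        }
        return splits;
    }

    @Override
    public void open(List<String> split) {
        current = split;
        position = 0;
    }

    @Override
    public boolean reachedEnd() {
        return position >= current.size();
    }

    @Override
    public String nextRecord() {
        return current.get(position++);
    }

    @Override
    public void close() {
        current = null;
    }
}

final class InputFormatRunner {
    // The pull loop a source task runs for each split, executed sequentially here.
    static <T, S> List<T> readAll(SimpleInputFormat<T, S> format, int parallelism) {
        List<T> out = new ArrayList<>();
        for (S split : format.createInputSplits(parallelism)) {
            format.open(split);
            while (!format.reachedEnd()) {
                out.add(format.nextRecord());
            }
            format.close();
        }
        return out;
    }
}
```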
+
+File-based:
+
+- `readTextFile(path)` / `TextInputFormat` - Reads files line-wise and returns them as Strings.
+
+- `readTextFileWithValue(path)` / `TextValueInputFormat` - Reads files line-wise and returns them as
+  StringValues. StringValues are mutable strings.
+
+- `readCsvFile(path)` / `CsvInputFormat` - Parses files of comma (or another char) delimited fields.
+  Returns a DataSet of tuples or POJOs. Supports the basic Java types and their Value counterparts as field
+  types.
+
+- `readFileOfPrimitives(path, Class)` / `PrimitiveInputFormat` - Parses files of new-line (or another char sequence)
+  delimited primitive data types such as `String` or `Integer`.
+
+- `readFileOfPrimitives(path, delimiter, Class)` / `PrimitiveInputFormat` - Parses files of new-line (or another char sequence)
+  delimited primitive data types such as `String` or `Integer` using the given delimiter.
+
+- `readHadoopFile(FileInputFormat, Key, Value, path)` / `FileInputFormat` - Creates a JobConf and reads files from the specified
+  path with the specified FileInputFormat, Key class, and Value class, and returns them as `Tuple2<Key, Value>`.
+
+- `readSequenceFile(Key, Value, path)` / `SequenceFileInputFormat` - Creates a JobConf and reads files from the specified path with
+  type SequenceFileInputFormat, Key class, and Value class, and returns them as `Tuple2<Key, Value>`.
+
+
+Collection-based:
+
+- `fromCollection(Collection)` - Creates a data set from a `java.util.Collection`. All elements
+  in the collection must be of the same type.
+
+- `fromCollection(Iterator, Class)` - Creates a data set from an iterator. The class specifies the
+  data type of the elements returned by the iterator.
+
+- `fromElements(T ...)` - Creates a data set from the given sequence of objects. All objects must be
+  of the same type.
+
+- `fromParallelCollection(SplittableIterator, Class)` - Creates a data set from an iterator, in
+  parallel. The class specifies the data type of the elements returned by the iterator.
+
+- `generateSequence(from, to)` - Generates the sequence of numbers in the given interval, in
+  parallel.
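One way to generate a number sequence in parallel is to cut the interval into contiguous ranges, one per parallel instance, so that each instance produces its own range independently. The sketch below assumes inclusive bounds and near-equal range sizes; `SequenceSplits` is a hypothetical helper, and Flink's own splitting of the sequence may differ in detail.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical splitting of the inclusive interval [from, to] into contiguous,
// near-equal ranges; each long[] holds {first, last} of one range.
final class SequenceSplits {
    static List<long[]> split(long from, long to, int parallelism) {
        if (to < from || parallelism < 1) {
            throw new IllegalArgumentException("need from <= to and parallelism >= 1");
        }
        long total = to - from + 1;          // number of elements (bounds inclusive)
        long base = total / parallelism;
        long remainder = total % parallelism;
        List<long[]> ranges = new ArrayList<>();
        long start = from;
        for (int i = 0; i < parallelism; i++) {
            // the first `remainder` ranges take one extra element
            long size = base + (i < remainder ? 1 : 0);
            if (size == 0) {
                break;                       // fewer elements than parallel instances
            }
            ranges.add(new long[] {start, start + size - 1});
            start += size;
        }
        return ranges;
    }
}
```

For example, `split(1, 10, 3)` yields the ranges [1, 4], [5, 7], and [8, 10].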
+
+Generic:
+
+- `readFile(inputFormat, path)` / `FileInputFormat` - Accepts a file input format.
+
+- `createInput(inputFormat)` / `InputFormat` - Accepts a generic input format.
+
+**Examples**
+
+{% highlight java %}
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+// read text file from local file system
+DataSet<String> localLines = env.readTextFile("file:///path/to/my/textfile");
+
+// read text file from an HDFS running at nnHost:nnPort
+DataSet<String> hdfsLines = env.readTextFile("hdfs://nnHost:nnPort/path/to/my/textfile");
+
+// read a CSV file with three fields
+DataSet<Tuple3<Integer, String, Double>> csvInput = env.readCsvFile("hdfs:///the/CSV/file")
+                               .types(Integer.class, String.class, Double.class);
+
+// read a CSV file with five fields, taking only two of them
+DataSet<Tuple2<String, Double>> csvSubset = env.readCsvFile("hdfs:///the/CSV/file")
+                               .includeFields("10010")  // take the first and the fourth field
+                               .types(String.class, Double.class);
+
+// read a CSV file with three fields into a POJO (Person.class) with corresponding fields
+DataSet<Person> csvPojos = env.readCsvFile("hdfs:///the/CSV/file")
+                               .pojoType(Person.class, "name", "age", "zipcode");
+
+// read a file from the specified path of type TextInputFormat
+DataSet<Tuple2<LongWritable, Text>> hadoopTuples =
+    env.readHadoopFile(new TextInputFormat(), LongWritable.class, Text.class, "hdfs://nnHost:nnPort/path/to/file");
+
+// read a file from the specified path of type SequenceFileInputFormat
+DataSet<Tuple2<IntWritable, Text>> sequenceTuples =
+    env.readSequenceFile(IntWritable.class, Text.class, "hdfs://nnHost:nnPort/path/to/file");
+
+// create a set from some given elements
+DataSet<String> values = env.fromElements("Foo", "bar", "foobar", "fubar");
+
+// generate a number sequence
+DataSet<Long> numbers = env.generateSequence(1, 10000000);
+
+// read data from a relational database using the JDBC input format
+DataSet<Tuple2<String, Integer>> dbData =
+    env.createInput(
+      // create and configure input format
+      JDBCInputFormat.buildJDBCInputFormat()
+                     .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
+                     .setDBUrl("jdbc:derby:memory:persons")
+                     .setQuery("select name, age from persons")
+                     .finish(),
+      // specify type information for DataSet
+      new TupleTypeInfo(Tuple2.class, STRING_TYPE_INFO, INT_TYPE_INFO)
+    );
+
+// Note: Flink's program compiler needs to infer the data types of the data items which are returned
+// by an InputFormat. If this information cannot be automatically inferred, it is necessary to
+// manually provide the type information as shown in the examples above.
+{% endhighlight %}
+
+#### Configuring CSV Parsing
+
+Flink offers a number of configuration options for CSV parsing:
+
+- `types(Class ... types)` specifies the types of the fields to parse. **It is mandatory to configure the types of the parsed fields.**
+  In case of the type class `Boolean.class`, "True" (case-insensitive), "False" (case-insensitive), "1", and "0" are treated as booleans.
+
+- `lineDelimiter(String del)` specifies the delimiter of individual records. The default line delimiter is the new-line character `'\n'`.
+
+- `fieldDelimiter(String del)` specifies the delimiter that separates fields of a record. The default field delimiter is the comma character `','`.
+
+- `includeFields(boolean ... flag)`, `includeFields(String mask)`, or `includeFields(long bitMask)` defines which fields to read from the input file (and which to ignore). By default the first *n* fields (as defined by the number of types in the `types()` call) are parsed.
+
+- `parseQuotedStrings(char quoteChar)` enables quoted string parsing. Strings are parsed as quoted strings if the first character of the string field is the quote character (leading or trailing whitespace is *not* trimmed). Field delimiters within quoted strings are ignored. Quoted string parsing fails if the last character of a quoted string field is not the quote character or if the quote character appears at some point which is not the start or the end of the quoted string field (unless the quote character is escaped using `\`). If quoted string parsing is enabled and the first character of the field is *not* the quote character, the string is parsed as an unquoted string. By default, quoted string parsing is disabled.
+
+- `ignoreComments(String commentPrefix)` specifies a comment prefix. All lines that start with the specified comment prefix are ignored and not parsed. By default, no lines are ignored.
+
+- `ignoreInvalidLines()` enables lenient parsing, i.e., lines that cannot be correctly parsed are ignored. By default, lenient parsing is disabled and invalid lines raise an exception.
+
+- `ignoreFirstLine()` configures the InputFormat to ignore the first line of the input file. By default no line is ignored.
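Two of the rules above, the string form of `includeFields` and the accepted boolean spellings, can be sketched in plain Java. `CsvMaskSketch` is a hypothetical helper and not Flink's `CsvInputFormat`; it ignores quoting, comments, and type conversion, and it assumes that fields beyond the end of the mask are skipped.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical helper illustrating two CSV parsing rules; not Flink's CsvInputFormat.
final class CsvMaskSketch {
    // '1' keeps the field at that position, '0' skips it; in this sketch,
    // fields past the end of the mask are skipped as well.
    static List<String> selectFields(String line, String fieldDelimiter, String mask) {
        String[] fields = line.split(Pattern.quote(fieldDelimiter), -1);
        List<String> kept = new ArrayList<>();
        for (int i = 0; i < fields.length && i < mask.length(); i++) {
            if (mask.charAt(i) == '1') {
                kept.add(fields[i]);
            }
        }
        return kept;
    }

    // "true"/"false" in any letter case, or "1"/"0"; anything else is rejected.
    static boolean parseBoolean(String field) {
        if (field.equalsIgnoreCase("true") || field.equals("1")) {
            return true;
        }
        if (field.equalsIgnoreCase("false") || field.equals("0")) {
            return false;
        }
        throw new IllegalArgumentException("not a boolean: " + field);
    }
}
```

With the mask `"10010"`, the line `alice,30,x,1.5,y` yields `[alice, 1.5]`, which matches the "first and fourth field" selection in the Java examples above.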
+
+
+#### Recursive Traversal of the Input Path Directory
+
+For file-based inputs, when the input path is a directory, nested files are not enumerated by default. Instead, only the files inside the base directory are read, while nested files are ignored. Recursive enumeration of nested files can be enabled through the `recursive.file.enumeration` configuration parameter, as in the following example.
+
+{% highlight java %}
+// enable recursive enumeration of nested input files
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+// create a configuration object
+Configuration parameters = new Configuration();
+
+// set the recursive enumeration parameter
+parameters.setBoolean("recursive.file.enumeration", true);
+
+// pass the configuration to the data source
+DataSet<String> logs = env.readTextFile("file:///path/with.nested/files")
+                         .withParameters(parameters);
+{% endhighlight %}
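The difference between the default and the recursive mode resembles listing only the direct children of a directory versus walking the whole tree. This `java.nio` sketch is only an analogy: `FileEnumeration` is a hypothetical helper, and Flink enumerates files through its own file system abstraction rather than `java.nio.file`.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper contrasting flat and recursive enumeration of regular files.
final class FileEnumeration {
    static List<Path> listFiles(Path dir, boolean recursive) {
        // Files.list sees direct children only; Files.walk descends into subdirectories.
        try (Stream<Path> entries = recursive ? Files.walk(dir) : Files.list(dir)) {
            return entries.filter(Files::isRegularFile).sorted().collect(Collectors.toList());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```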
+
+</div>
+<div data-lang="scala" markdown="1">
+
+Data sources create the initial data sets, such as from files or from Java collections. The general
+mechanism of creating data sets is abstracted behind an
+{% gh_link /flink-core/src/main/java/org/apache/flink/api/common/io/InputFormat.java "InputFormat"%}.
+Flink comes with several built-in formats to create data sets from common file formats. Many of them have
+shortcut methods on the *ExecutionEnvironment*.
+
+File-based:
+
+- `readTextFile(path)` / `TextInputFormat` - Reads files line-wise and returns them as Strings.
+
+- `readTextFileWithValue(path)` / `TextValueInputFormat` - Reads files line-wise and returns them as
+  StringValues. StringValues are mutable strings.
+
+- `readCsvFile(path)` / `CsvInputFormat` - Parses files of comma (or another char) delimited fields.
+  Returns a DataSet of tuples, case class objects, or POJOs. Supports the basic Java types and their Value counterparts as field
+  types.
+
+- `readFileOfPrimitives(path, delimiter)` / `PrimitiveInputFormat` - Parses files of new-line (or another char sequence)
+  delimited primitive data types such as `String` or `Integer` using the given delimiter.
+
+- `readHadoopFile(FileInputFormat, Key, Value, path)` / `FileInputFormat` - Creates a JobConf and reads files from the specified
+  path with the specified FileInputFormat, Key class, and Value class, and returns them as `Tuple2<Key, Value>`.
+
+- `readSequenceFile(Key, Value, path)` / `SequenceFileInputFormat` - Creates a JobConf and reads files from the specified path with
+  type SequenceFileInputFormat, Key class, and Value class, and returns them as `Tuple2<Key, Value>`.
+
+Collection-based:
+
+- `fromCollection(Seq)` - Creates a data set from a Seq. All elements
+  in the collection must be of the same type.
+
+- `fromCollection(Iterator)` - Creates a data set from an Iterator. The data type of the elements is
+  taken from the type parameter of the iterator.
+
+- `fromElements(elements: _*)` - Creates a data set from the given sequence of objects. All objects
+  must be of the same type.
+
+- `fromParallelCollection(SplittableIterator)` - Creates a data set from an iterator, in
+  parallel. The data type of the elements is taken from the type parameter of the iterator.
+
+- `generateSequence(from, to)` - Generates the sequence of numbers in the given interval, in
+  parallel.
+
+Generic:
+
+- `readFile(inputFormat, path)` / `FileInputFormat` - Accepts a file input format.
+
+- `createInput(inputFormat)` / `InputFormat` - Accepts a generic input format.
+
+**Examples**
+
+{% highlight scala %}
+val env = ExecutionEnvironment.getExecutionEnvironment
+
+// read text file from local file system
+val localLines = env.readTextFile("file:///path/to/my/textfile")
+
+// read text file from an HDFS running at nnHost:nnPort
+val hdfsLines = env.readTextFile("hdfs://nnHost:nnPort/path/to/my/textfile")
+
+// read a CSV file with three fields
+val csvInput = env.readCsvFile[(Int, String, Double)]("hdfs:///the/CSV/file")
+
+// read a CSV file with five fields, taking only two of them
+val csvSubset = env.readCsvFile[(String, Double)](
+  "hdfs:///the/CSV/file",
+  includedFields = Array(0, 3)) // take the first and the fourth field
+
+// CSV input can also be used with Case Classes
+case class MyCaseClass(str: String, dbl: Double)
+val csvCaseClasses = env.readCsvFile[MyCaseClass](
+  "hdfs:///the/CSV/file",
+  includedFields = Array(0, 3)) // take the first and the fourth field
+
+// read a CSV file with three fields into a POJO (Person) with corresponding fields
+val csvPojos = env.readCsvFile[Person](
+  "hdfs:///the/CSV/file",
+  pojoFields = Array("name", "age", "zipcode"))
+
+// create a set from some given elements
+val values = env.fromElements("Foo", "bar", "foobar", "fubar")
+
+// generate a number sequence
+val numbers = env.generateSequence(1, 10000000)
+
+// read a file from the specified path of type TextInputFormat
+val hadoopTuples = env.readHadoopFile(new TextInputFormat, classOf[LongWritable],
+  classOf[Text], "hdfs://nnHost:nnPort/path/to/file")
+
+// read a file from the specified path of type SequenceFileInputFormat
+val sequenceTuples = env.readSequenceFile(classOf[IntWritable], classOf[Text],
+  "hdfs://nnHost:nnPort/path/to/file")
+
+{% endhighlight %}
+
+#### Configuring CSV Parsing
+
+Flink offers a number of configuration options for CSV parsing:
+
+- `lineDelimiter: String` specifies the delimiter of individual records. The default line delimiter is the new-line character `'\n'`.
+
+- `fieldDelimiter: String` specifies the delimiter that separates fields of a record. The default field delimiter is the comma character `','`.
+
+- `includedFields: Array[Int]` defines which fields to read from the input file (and which to ignore). By default the first *n* fields (as defined by the number of fields of the target type) are parsed.
+
+- `pojoFields: Array[String]` specifies the fields of a POJO that are mapped to CSV fields. Parsers for CSV fields are automatically initialized based on the type and order of the POJO fields.
+
+- `quoteCharacter: Character` enables quoted string parsing. Strings are parsed as quoted strings if the first character of the string field is the quote character (leading or trailing whitespace is *not* trimmed). Field delimiters within quoted strings are ignored. Quoted string parsing fails if the last character of a quoted string field is not the quote character. If quoted string parsing is enabled and the first character of the field is *not* the quote character, the string is parsed as an unquoted string. By default, quoted string parsing is disabled.
+
+- `ignoreComments: String` specifies a comment prefix. All lines that start with the specified comment prefix are ignored and not parsed. By default, no lines are ignored.
+
+- `lenient: Boolean` enables lenient parsing, i.e., lines that cannot be correctly parsed are ignored. By default, lenient parsing is disabled and invalid lines raise an exception.
+
+- `ignoreFirstLine: Boolean` configures the InputFormat to ignore the first line of the input file. By default no line is ignored.
+
+#### Recursive Traversal of the Input Path Directory
+
+For file-based inputs, when the input path is a directory, nested files are not enumerated by default. Instead, only the files inside the base directory are read, while nested files are ignored. Recursive enumeration of nested files can be enabled through the `recursive.file.enumeration` configuration parameter, as in the following example.
+
+{% highlight scala %}
+// enable recursive enumeration of nested input files
+val env = ExecutionEnvironment.getExecutionEnvironment
+
+// create a configuration object
+val parameters = new Configuration
+
+// set the recursive enumeration parameter
+parameters.setBoolean("recursive.file.enumeration", true)
+
+// pass the configuration to the data source
+val logs = env.readTextFile("file:///path/with.nested/files").withParameters(parameters)
+{% endhighlight %}
+
+</div>
+</div>
+
+### Read Compressed Files
+
+Flink currently supports transparent decompression of input files if these are marked with an appropriate file extension. In particular, this means that no further configuration of the input formats is necessary and any `FileInputFormat` supports the compression, including custom input formats. Note that compressed files might not be read in parallel, thus impacting job scalability.
+
+The following table lists the currently supported compression methods.
+
+<br />
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 20%">Compression method</th>
+      <th class="text-left">File extensions</th>
+      <th class="text-left" style="width: 20%">Parallelizable</th>
+    </tr>
+  </thead>
+
+  <tbody>
+    <tr>
+      <td><strong>DEFLATE</strong></td>
+      <td><code>.deflate</code></td>
+      <td>no</td>
+    </tr>
+    <tr>
+      <td><strong>GZip</strong></td>
+      <td><code>.gz</code>, <code>.gzip</code></td>
+      <td>no</td>
+    </tr>
+  </tbody>
+</table>
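Extension-based decompression can be sketched with the JDK's `java.util.zip` streams: the file name selects a decompressing wrapper, and the reader consumes the wrapped stream as if it were uncompressed. `ExtensionDecompression` is a hypothetical helper, and treating `.deflate` files as zlib-wrapped DEFLATE data (the format `InflaterInputStream` expects by default) is an assumption of this sketch.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.util.Locale;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
import java.util.zip.InflaterInputStream;

// Hypothetical helper: picks a decompressor from the file extension only.
final class ExtensionDecompression {
    static InputStream wrap(String fileName, InputStream raw) throws IOException {
        String name = fileName.toLowerCase(Locale.ROOT);
        if (name.endsWith(".gz") || name.endsWith(".gzip")) {
            return new GZIPInputStream(raw);
        }
        if (name.endsWith(".deflate")) {
            return new InflaterInputStream(raw); // assumes zlib-wrapped DEFLATE data
        }
        return raw; // unknown extension: read the bytes as they are
    }

    // Reads a whole "file" through the wrapper chosen for its name.
    static byte[] readAll(String fileName, byte[] stored) {
        try (InputStream in = wrap(fileName, new ByteArrayInputStream(stored))) {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] buffer = new byte[4096];
            int n;
            while ((n = in.read(buffer)) != -1) {
                out.write(buffer, 0, n);
            }
            return out.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Helpers that produce compressed bytes in the two formats.
    static byte[] gzip(byte[] data) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (OutputStream out = new GZIPOutputStream(bytes)) {
            out.write(data);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    static byte[] deflate(byte[] data) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (OutputStream out = new DeflaterOutputStream(bytes)) {
            out.write(data);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }
}
```

Because such a stream can only be decoded from its beginning, a single reader has to process the whole file, which is consistent with the "Parallelizable: no" column in the table above.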
+
+
+{% top %}
+
+
+Execution Configuration
+----------
+
+The `ExecutionEnvironment` also contains the `ExecutionConfig`, which allows setting job-specific configuration values for the runtime.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ExecutionConfig executionConfig = env.getConfig();
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env = ExecutionEnvironment.getExecutionEnvironment
+val executionConfig = env.getConfig
+{% endhighlight %}
+</div>
+</div>
+
+The following configuration options are available (the default is in bold):
+
+- **`enableClosureCleaner()`** / `disableClosureCleaner()`. The closure cleaner is enabled by default. The closure cleaner removes unneeded references to the surrounding class of anonymous functions inside Flink programs.
+With the closure cleaner disabled, it might happen that an anonymous user function is referencing the surrounding class, which is usually not Serializable. This leads to exceptions from the serializer.
+
+- `getParallelism()` / `setParallelism(int parallelism)` Sets the default parallelism for the job.
+
+- `getNumberOfExecutionRetries()` / `setNumberOfExecutionRetries(int numberOfExecutionRetries)` Sets the number of times that failed tasks are re-executed. A value of zero effectively disables fault tolerance. A value of `-1` indicates that the system default value (as defined in the configuration) should be used.
+
+- `getExecutionRetryDelay()` / `setExecutionRetryDelay(long executionRetryDelay)` Sets the delay in milliseconds that the system waits after a job has failed, before re-executing it. The delay starts after all tasks have been successfully stopped on the TaskManagers, and once the delay is past, the tasks are restarted. This parameter is useful to delay re-execution in order to let certain time-out related failures surface fully (like broken connections that have not fully timed out), before attempting a re-execution and immediately failing again due to the same problem. This parameter only has an effect if the number of execution retries is one or more.
+
+- `getExecutionMode()` / `setExecutionMode()`. The default execution mode is PIPELINED. Sets the mode in which the program is executed. The execution mode defines whether data exchanges are performed in a batched or in a pipelined manner.
+
+- `enableForceKryo()` / **`disableForceKryo()`**. Kryo is not forced by default. Forces the GenericTypeInformation to use the Kryo serializer for POJOs even though Flink could analyze them as POJOs. In some cases this might be preferable, for example, when Flink's internal serializers fail to handle a POJO properly.
+
+- `enableForceAvro()` / **`disableForceAvro()`**. Avro is not forced by default. Forces the Flink AvroTypeInformation to use the Avro serializer instead of Kryo for serializing Avro POJOs.
+
+- `enableObjectReuse()` / **`disableObjectReuse()`** By default, objects are not reused in Flink. Enabling the [object reuse mode](#object-reuse-behavior) will instruct the runtime to reuse user objects for better performance. Keep in mind that this can lead to bugs when the user-code function of an operation is not aware of this behavior.
+
+- **`enableSysoutLogging()`** / `disableSysoutLogging()` JobManager status updates are printed to `System.out` by default. This setting allows disabling this behavior.
+
+- `getGlobalJobParameters()` / `setGlobalJobParameters()` This method allows users to set custom objects as a global configuration for the job. Since the `ExecutionConfig` is accessible in all user-defined functions, this is an easy method for making configuration globally available in a job.
+
+- `addDefaultKryoSerializer(Class<?> type, Serializer<?> serializer)` Registers a Kryo serializer instance for the given `type`.
+
+- `addDefaultKryoSerializer(Class<?> type, Class<? extends Serializer<?>> serializerClass)` Registers a Kryo serializer class for the given `type`.
+
+- `registerTypeWithKryoSerializer(Class<?> type, Serializer<?> serializer)` Registers the given type with Kryo and specifies a serializer for it. By registering a type with Kryo, the serialization of the type will be much more efficient.
+
+- `registerKryoType(Class<?> type)` If the type ends up being serialized with Kryo, then it will be registered at Kryo to make sure that only tags (integer IDs) are written. If a type is not registered with Kryo, its entire class name will be serialized with every instance, leading to much higher I/O costs.
+
+- `registerPojoType(Class<?> type)` Registers the given type with the ser

<TRUNCATED>
