http://git-wip-us.apache.org/repos/asf/flink/blob/ad267a4b/docs/apis/streaming/index.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming/index.md b/docs/apis/streaming/index.md
new file mode 100644
index 0000000..06c0014
--- /dev/null
+++ b/docs/apis/streaming/index.md
@@ -0,0 +1,3306 @@
+---
+title: "Flink DataStream API Programming Guide"
+
+# Top-level navigation
+top-nav-group: apis
+top-nav-pos: 1
+top-nav-title: <strong>Streaming Guide</strong> (DataStream API)
+
+# Sub-level navigation
+sub-nav-group: streaming
+sub-nav-group-title: Streaming Guide
+sub-nav-pos: 1
+sub-nav-title: DataStream API
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+DataStream programs in Flink are regular programs that implement 
transformations on data streams
+(e.g., filtering, updating state, defining windows, aggregating). The data 
streams are initially created from various
+sources (e.g., message queues, socket streams, files). Results are returned 
via sinks, which may for
+example write the data to files, or to standard output (for example the 
command line
+terminal). Flink programs run in a variety of contexts, standalone, or 
embedded in other programs.
+The execution can happen in a local JVM, or on clusters of many machines.
+
+In order to create your own Flink DataStream program, we encourage you to 
start with the
+[program skeleton](#program-skeleton) and gradually add your own
+[transformations](#transformations). The remaining sections act as references 
for additional
+operations and advanced features.
+
+
+* This will be replaced by the TOC
+{:toc}
+
+
+Example Program
+---------------
+
+The following program is a complete, working example of streaming window word 
count application, that counts the
+words coming from a web socket in 5 second windows. You can copy &amp; paste 
the code to run it locally.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+
+{% highlight java %}
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.util.Collector;
+
+public class WindowWordCount {
+
+    public static void main(String[] args) throws Exception {
+
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+        DataStream<Tuple2<String, Integer>> dataStream = env
+                .socketTextStream("localhost", 9999)
+                .flatMap(new Splitter())
+                .keyBy(0)
+                .timeWindow(Time.of(5, TimeUnit.SECONDS))
+                .sum(1);
+
+        dataStream.print();
+
+        env.execute("Window WordCount");
+    }
+
+    public static class Splitter implements FlatMapFunction<String, 
Tuple2<String, Integer>> {
+        @Override
+        public void flatMap(String sentence, Collector<Tuple2<String, 
Integer>> out) throws Exception {
+            for (String word: sentence.split(" ")) {
+                out.collect(new Tuple2<String, Integer>(word, 1));
+            }
+        }
+    }
+
+}
+{% endhighlight %}
+
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+import java.util.concurrent.TimeUnit
+
+import org.apache.flink.streaming.api.scala._
+import org.apache.flink.streaming.api.windowing.time.Time
+
+object WindowWordCount {
+  def main(args: Array[String]) {
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val text = env.socketTextStream("localhost", 9999)
+
+    val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { 
_.nonEmpty } }
+      .map { (_, 1) }
+      .keyBy(0)
+      .timeWindow(Time.of(5, TimeUnit.SECONDS))
+      .sum(1)
+
+    counts.print
+
+    env.execute("Window Stream WordCount")
+  }
+}
+{% endhighlight %}
+</div>
+
+</div>
+
+To run the example program, start the input stream with netcat first from a 
terminal:
+
+~~~bash
+nc -lk 9999
+~~~
+
+Just type some words hitting return for a new word. These will be the input to 
the
+word count program. If you want to see counts greater than 1, type the same 
word again and again within
+5 seconds (increase the window size from 5 seconds if you cannot type that 
fast &#9786;).
+
+{% top %}
+
+
+Linking with Flink
+------------------
+
+To write programs with Flink, you need to include the Flink DataStream library 
corresponding to
+your programming language in your project.
+
+The simplest way to do this is to use one of the quickstart scripts: either for
+[Java]({{ site.baseurl }}/quickstart/java_api_quickstart.html) or for 
[Scala]({{ site.baseurl }}/quickstart/scala_api_quickstart.html). They
+create a blank project from a template (a Maven Archetype), which sets up 
everything for you. To
+manually create the project, you can use the archetype and create a project by 
calling:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight bash %}
+mvn archetype:generate /
+    -DarchetypeGroupId=org.apache.flink/
+    -DarchetypeArtifactId=flink-quickstart-java /
+    -DarchetypeVersion={{site.version }}
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight bash %}
+mvn archetype:generate /
+    -DarchetypeGroupId=org.apache.flink/
+    -DarchetypeArtifactId=flink-quickstart-scala /
+    -DarchetypeVersion={{site.version }}
+{% endhighlight %}
+</div>
+</div>
+
+The archetypes are working for stable releases and preview versions 
(`-SNAPSHOT`).
+
+If you want to add Flink to an existing Maven project, add the following entry 
to your
+*dependencies* section in the *pom.xml* file of your project:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight xml %}
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-streaming-java</artifactId>
+  <version>{{site.version }}</version>
+</dependency>
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-clients</artifactId>
+  <version>{{site.version }}</version>
+</dependency>
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight xml %}
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-streaming-scala</artifactId>
+  <version>{{site.version }}</version>
+</dependency>
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-clients</artifactId>
+  <version>{{site.version }}</version>
+</dependency>
+{% endhighlight %}
+</div>
+</div>
+
+In order to create your own Flink program, we encourage you to start with the
+[program skeleton](#program-skeleton) and gradually add your own
+[transformations](#transformations).
+
+{% top %}
+
+Program Skeleton
+----------------
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+
+<br />
+
+As presented in the [example](#example-program), Flink DataStream programs 
look like regular Java
+programs with a `main()` method. Each program consists of the same basic parts:
+
+1. Obtaining a `StreamExecutionEnvironment`,
+2. Connecting to data stream sources,
+3. Specifying transformations on the data streams,
+4. Specifying output for the processed data,
+5. Executing the program.
+
+We will now give an overview of each of those steps, please refer to the 
respective sections for
+more details.
+
+The `StreamExecutionEnvironment` is the basis for all Flink DataStream 
programs. You can
+obtain one using these static methods on class `StreamExecutionEnvironment`:
+
+{% highlight java %}
+getExecutionEnvironment()
+
+createLocalEnvironment()
+createLocalEnvironment(int parallelism)
+createLocalEnvironment(int parallelism, Configuration customConfiguration)
+
+createRemoteEnvironment(String host, int port, String... jarFiles)
+createRemoteEnvironment(String host, int port, int parallelism, String... 
jarFiles)
+{% endhighlight %}
+
+Typically, you only need to use `getExecutionEnvironment()`, since this
+will do the right thing depending on the context: if you are executing
+your program inside an IDE or as a regular Java program it will create
+a local environment that will execute your program on your local machine. If
+you created a JAR file from your program, and invoke it through the [command 
line]({{ site.baseurl }}/apis/cli.html)
+or the [web interface]({{ site.baseurl }}/apis/web_client.html),
+the Flink cluster manager will execute your main method and 
`getExecutionEnvironment()` will return
+an execution environment for executing your program on a cluster.
+
+For specifying data sources the execution environment has several methods
+to read from files, sockets, and external systems using various methods. To 
just read
+data from a socket (useful also for debugging), you can use:
+
+{% highlight java %}
+StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+DataStream<String> lines = env.socketTextStream("localhost", 9999)
+{% endhighlight %}
+
+This will give you a DataStream on which you can then apply transformations. 
For
+more information on data sources and input formats, please refer to
+[Data Sources](#data-sources).
+
+Once you have a DataStream you can apply transformations to create a new
+DataStream which you can then write to a socket, transform again,
+combine with other DataStreams, or push to an external system (e.g., a message 
queue, or a file system).
+You apply transformations by calling
+methods on DataStream with your own custom transformation functions. For 
example,
+a map transformation looks like this:
+
+{% highlight java %}
+DataStream<String> input = ...;
+
+DataStream<Integer> intValues = input.map(new MapFunction<String, Integer>() {
+    @Override
+    public Integer map(String value) {
+        return Integer.parseInt(value);
+    }
+});
+{% endhighlight %}
+
+This will create a new DataStream by converting every String in the original
+stream to an Integer. For more information and a list of all the 
transformations,
+please refer to [Transformations](#transformations).
+
+Once you have a DataStream containing your final results, you can push the 
result
+to an external system (HDFS, Kafka, Elasticsearch), write it to a socket, 
write to a file,
+or print it.
+
+{% highlight java %}
+writeAsText(String path, ...)
+writeAsCsv(String path, ...)
+writeToSocket(String hostname, int port, ...)
+
+print()
+
+addSink(...)
+{% endhighlight %}
+
+Once you specified the complete program you need to **trigger the program 
execution** by
+calling `execute()` on `StreamExecutionEnvironment`. This will either execute 
on
+the local machine or submit the program for execution on a cluster, depending 
on the chosen execution environment.
+
+{% highlight java %}
+env.execute();
+{% endhighlight %}
+
+</div>
+<div data-lang="scala" markdown="1">
+
+<br />
+
+As presented in the [example](#example-program), Flink DataStream programs 
look like regular Scala
+programs with a `main()` method. Each program consists of the same basic parts:
+
+1. Obtaining a `StreamExecutionEnvironment`,
+2. Connecting to data stream sources,
+3. Specifying transformations on the data streams,
+4. Specifying output for the processed data,
+5. Executing the program.
+
+We will now give an overview of each of those steps, please refer to the 
respective sections for
+more details.
+
+The `StreamExecutionEnvironment` is the basis for all Flink DataStream 
programs. You can
+obtain one using these static methods on class `StreamExecutionEnvironment`:
+
+{% highlight scala %}
+def getExecutionEnvironment
+
+def createLocalEnvironment(parallelism: Int =  
Runtime.getRuntime.availableProcessors())
+
+def createRemoteEnvironment(host: String, port: Int, jarFiles: String*)
+def createRemoteEnvironment(host: String, port: Int, parallelism: Int, 
jarFiles: String*)
+{% endhighlight %}
+
+Typically, you only need to use `getExecutionEnvironment`, since this
+will do the right thing depending on the context: if you are executing
+your program inside an IDE or as a regular Java program it will create
+a local environment that will execute your program on your local machine. If
+you created a JAR file from your program, and invoke it through the [command 
line](cli.html)
+or the [web interface](web_client.html),
+the Flink cluster manager will execute your main method and 
`getExecutionEnvironment()` will return
+an execution environment for executing your program on a cluster.
+
+For specifying data sources the execution environment has several methods
+to read from files, sockets, and external systems using various methods. To 
just read
+data from a socket (useful also for debugging), you can use:
+
+{% highlight scala %}
+StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment
+
+DataStream<String> lines = env.socketTextStream("localhost", 9999)
+{% endhighlight %}
+
+This will give you a DataStream on which you can then apply transformations. 
For
+more information on data sources and input formats, please refer to
+[Data Sources](#data-sources).
+
+Once you have a DataStream you can apply transformations to create a new
+DataStream which you can then write to a file, transform again,
+combine with other DataStreams, or push to an external system.
+You apply transformations by calling
+methods on DataStream with your own custom transformation function. For 
example,
+a map transformation looks like this:
+
+{% highlight scala %}
+val input: DataStream[String] = ...
+
+val mapped = input.map { x => x.toInt }
+{% endhighlight %}
+
+This will create a new DataStream by converting every String in the original
+set to an Integer. For more information and a list of all the transformations,
+please refer to [Transformations](#transformations).
+
+Once you have a DataStream containing your final results, you can push the 
result
+to an external system (HDFS, Kafka, Elasticsearch), write it to a socket, 
write to a file,
+or print it.
+
+{% highlight scala %}
+writeAsText(path: String, ...)
+writeAsCsv(path: String, ...)
+writeToSocket(hostname: String, port: Int, ...)
+
+print()
+
+addSink(...)
+{% endhighlight %}
+
+Once you specified the complete program you need to **trigger the program 
execution** by
+calling `execute` on `StreamExecutionEnvironment`. This will either execute on
+the local machine or submit the program for execution on a cluster, depending 
on the chosen execution environment.
+
+{% highlight scala %}
+env.execute()
+{% endhighlight %}
+
+</div>
+</div>
+
+{% top %}
+
+DataStream Abstraction
+----------------------
+
+A `DataStream` is a possibly unbounded immutable collection of data items of a 
the same type.
+
+Transformations may return different subtypes of `DataStream` allowing 
specialized transformations.
+For example the `keyBy(…)` method returns a `KeyedDataStream` which is a 
stream of data that
+is logically partitioned by a certain key, and can be further windowed.
+
+{% top %}
+
+Lazy Evaluation
+---------------
+
+All Flink DataStream programs are executed lazily: When the program's main 
method is executed, the data loading
+and transformations do not happen directly. Rather, each operation is created 
and added to the
+program's plan. The operations are actually executed when the execution is 
explicitly triggered by
+an `execute()` call on the `StreamExecutionEnvironment` object. Whether the 
program is executed locally
+or on a cluster depends on the type of `StreamExecutionEnvironment`.
+
+The lazy evaluation lets you construct sophisticated programs that Flink 
executes as one
+holistically planned unit.
+
+{% top %}
+
+
+Transformations
+---------------
+
+Data transformations transform one or more DataStreams into a new DataStream. 
Programs can combine
+multiple transformations into sophisticated topologies.
+
+This section gives a description of all the available transformations.
+
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+
+<br />
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 25%">Transformation</th>
+      <th class="text-center">Description</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+          <td><strong>Map</strong><br>DataStream &rarr; DataStream</td>
+          <td>
+            <p>Takes one element and produces one element. A map function that 
doubles the values of the input stream:</p>
+    {% highlight java %}
+DataStream<Integer> dataStream = //...
+dataStream.map(new MapFunction<Integer, Integer>() {
+    @Override
+    public Integer map(Integer value) throws Exception {
+        return 2 * value;
+    }
+});
+    {% endhighlight %}
+          </td>
+        </tr>
+
+        <tr>
+          <td><strong>FlatMap</strong><br>DataStream &rarr; DataStream</td>
+          <td>
+            <p>Takes one element and produces zero, one, or more elements. A 
flatmap function that splits sentences to words:</p>
+    {% highlight java %}
+dataStream.flatMap(new FlatMapFunction<String, String>() {
+    @Override
+    public void flatMap(String value, Collector<String> out)
+        throws Exception {
+        for(String word: value.split(" ")){
+            out.collect(word);
+        }
+    }
+});
+    {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Filter</strong><br>DataStream &rarr; DataStream</td>
+          <td>
+            <p>Evaluates a boolean function for each element and retains those 
for which the function returns true.
+            A filter that filters out zero values:
+            </p>
+    {% highlight java %}
+dataStream.filter(new FilterFunction<Integer>() {
+    @Override
+    public boolean filter(Integer value) throws Exception {
+        return value != 0;
+    }
+});
+    {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
+          <td><strong>KeyBy</strong><br>DataStream &rarr; KeyedStream</td>
+          <td>
+            <p>Logically partitions a stream into disjoint partitions, each 
partition containing elements of the same key.
+            Internally, this is implemented with hash partitioning. See <a 
href="#specifying-keys">keys</a> on how to specify keys.
+            This transformation returns a KeyedDataStream.</p>
+    {% highlight java %}
+dataStream.keyBy("someKey") // Key by field "someKey"
+dataStream.keyBy(0) // Key by the first element of a Tuple
+    {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Reduce</strong><br>KeyedStream &rarr; DataStream</td>
+          <td>
+            <p>A "rolling" reduce on a keyed data stream. Combines the current 
element with the last reduced value and
+            emits the new value.
+                    <br/>
+               <br/>
+            A reduce function that creates a stream of partial sums:</p>
+            {% highlight java %}
+keyedStream.reduce(new ReduceFunction<Integer>() {
+    @Override
+    public Integer reduce(Integer value1, Integer value2)
+    throws Exception {
+        return value1 + value2;
+    }
+});
+            {% endhighlight %}
+            </p>
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Fold</strong><br>KeyedStream &rarr; DataStream</td>
+          <td>
+          <p>A "rolling" fold on a keyed data stream with an initial value.
+          Combines the current element with the last folded value and
+          emits the new value.
+          <br/>
+          <br/>
+          <p>A fold function that, when applied on the sequence (1,2,3,4,5),
+          emits the sequence "start-1", "start-1-2", "start-1-2-3", ...</p>
+          {% highlight java %}
+DataStream<String> result =
+  keyedStream.fold("start", new FoldFunction<Integer, String>() {
+    @Override
+    public String fold(String current, Integer value) {
+        return current + "-" + value;
+    }
+  });
+          {% endhighlight %}
+          </p>
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Aggregations</strong><br>KeyedStream &rarr; 
DataStream</td>
+          <td>
+            <p>Rolling aggregations on a keyed data stream. The difference 
between min
+           and minBy is that min returns the minimun value, whereas minBy 
returns
+           the element that has the minimum value in this field (same for max 
and maxBy).</p>
+    {% highlight java %}
+keyedStream.sum(0);
+keyedStream.sum("key");
+keyedStream.min(0);
+keyedStream.min("key");
+keyedStream.max(0);
+keyedStream.max("key");
+keyedStream.minBy(0);
+keyedStream.minBy("key");
+keyedStream.maxBy(0);
+keyedStream.maxBy("key");
+    {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Window</strong><br>KeyedStream &rarr; WindowedStream</td>
+          <td>
+            <p>Windows can be defined on already partitioned KeyedStreams. 
Windows group the data in each
+            key according to some characteristic (e.g., the data that arrived 
within the last 5 seconds).
+            See <a href="#windows">windows</a> for a complete description of 
windows.
+    {% highlight java %}
+dataStream.keyBy(0).window(TumblingTimeWindows.of(Time.of(5, 
TimeUnit.SECONDS))); // Last 5 seconds of data
+    {% endhighlight %}
+        </p>
+          </td>
+        </tr>
+        <tr>
+          <td><strong>WindowAll</strong><br>DataStream &rarr; 
AllWindowedStream</td>
+          <td>
+              <p>Windows can be defined on regular DataStreams. Windows group 
all the stream events
+              according to some characteristic (e.g., the data that arrived 
within the last 5 seconds).
+              See <a href="#windows">windows</a> for a complete description of 
windows.</p>
+              <p><strong>WARNING:</strong> This is in many cases a 
<strong>non-parallel</strong> transformation. All records will be
+               gathered in one task for the windowAll operator.</p>
+  {% highlight java %}
+dataStream.windowAll(TumblingTimeWindows.of(Time.of(5, TimeUnit.SECONDS))); // 
Last 5 seconds of data
+  {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Window Apply</strong><br>WindowedStream &rarr; 
DataStream<br>AllWindowedStream &rarr; DataStream</td>
+          <td>
+            <p>Applies a general function to the window as a whole. Below is a 
function that manually sums the elements of a window.</p>
+            <p><strong>Note:</strong> If you are using a windowAll 
transformation, you need to use an AllWindowFunction instead.</p>
+    {% highlight java %}
+windowedStream.apply (new WindowFunction<Tuple2<String,Integer>, Integer, 
Tuple, Window>() {
+    public void apply (Tuple tuple,
+            Window window,
+            Iterable<Tuple2<String, Integer>> values,
+            Collector<Integer> out) throws Exception {
+        int sum = 0;
+        for (value t: values) {
+            sum += t.f1;
+        }
+        out.collect (new Integer(sum));
+    }
+});
+
+// applying an AllWindowFunction on non-keyed window stream
+allWindowedStream.apply (new AllWindowFunction<Tuple2<String,Integer>, 
Integer, Window>() {
+    public void apply (Window window,
+            Iterable<Tuple2<String, Integer>> values,
+            Collector<Integer> out) throws Exception {
+        int sum = 0;
+        for (value t: values) {
+            sum += t.f1;
+        }
+        out.collect (new Integer(sum));
+    }
+});
+    {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Window Reduce</strong><br>WindowedStream &rarr; 
DataStream</td>
+          <td>
+            <p>Applies a functional reduce function to the window and returns 
the reduced value.</p>
+    {% highlight java %}
+windowedStream.reduce (new ReduceFunction<Tuple2<String,Integer>() {
+    public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, 
Tuple2<String, Integer> value2) throws Exception {
+        return new Tuple2<String,Integer>(value1.f0, value1.f1 + value2.f1);
+    }
+};
+    {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Window Fold</strong><br>WindowedStream &rarr; 
DataStream</td>
+          <td>
+            <p>Applies a functional fold function to the window and returns 
the folded value.
+               The example function, when applied on the sequence (1,2,3,4,5),
+               folds the sequence into the string "start-1-2-3-4-5":</p>
+    {% highlight java %}
+windowedStream.fold("start-", new FoldFunction<Integer, String>() {
+    public String fold(String current, Integer value) {
+        return current + "-" + value;
+    }
+};
+    {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Aggregations on windows</strong><br>WindowedStream 
&rarr; DataStream</td>
+          <td>
+            <p>Aggregates the contents of a window. The difference between min
+           and minBy is that min returns the minimun value, whereas minBy 
returns
+           the element that has the minimum value in this field (same for max 
and maxBy).</p>
+    {% highlight java %}
+windowedStream.sum(0);
+windowedStream.sum("key");
+windowedStream.min(0);
+windowedStream.min("key");
+windowedStream.max(0);
+windowedStream.max("key");
+windowedStream.minBy(0);
+windowedStream.minBy("key");
+windowedStream.maxBy(0);
+windowedStream.maxBy("key");
+    {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Union</strong><br>DataStream* &rarr; DataStream</td>
+          <td>
+            <p>Union of two or more data streams creating a new stream 
containing all the elements from all the streams. Node: If you union a data 
stream
+            with itself you will get each element twice in the resulting 
stream.</p>
+    {% highlight java %}
+dataStream.union(otherStream1, otherStream2, ...);
+    {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Window Join</strong><br>DataStream,DataStream &rarr; 
DataStream</td>
+          <td>
+            <p>Join two data streams on a given key and a common window.</p>
+    {% highlight java %}
+dataStream.join(otherStream)
+    .where(0).equalTo(1)
+    .window(TumblingTimeWindows.of(Time.of(3, TimeUnit.SECONDS)))
+    .apply (new JoinFunction () {...});
+    {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Window CoGroup</strong><br>DataStream,DataStream &rarr; 
DataStream</td>
+          <td>
+            <p>Cogroups two data streams on a given key and a common 
window.</p>
+    {% highlight java %}
+dataStream.coGroup(otherStream)
+    .where(0).equalTo(1)
+    .window(TumblingTimeWindows.of(Time.of(3, TimeUnit.SECONDS)))
+    .apply (new CoGroupFunction () {...});
+    {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Connect</strong><br>DataStream,DataStream &rarr; 
ConnectedStreams</td>
+          <td>
+            <p>"Connects" two data streams retaining their types. Connect 
allowing for shared state between
+            the two streams.</p>
+    {% highlight java %}
+DataStream<Integer> someStream = //...
+DataStream<String> otherStream = //...
+
+ConnectedStreams<Integer, String> connectedStreams = 
someStream.connect(otherStream);
+    {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
+          <td><strong>CoMap, CoFlatMap</strong><br>ConnectedStreams &rarr; 
DataStream</td>
+          <td>
+            <p>Similar to map and flatMap on a connected data stream</p>
+    {% highlight java %}
+connectedStreams.map(new CoMapFunction<Integer, String, Boolean>() {
+    @Override
+    public Boolean map1(Integer value) {
+        return true;
+    }
+
+    @Override
+    public Boolean map2(String value) {
+        return false;
+    }
+});
+connectedStreams.flatMap(new CoFlatMapFunction<Integer, String, String>() {
+
+   @Override
+   public void flatMap1(Integer value, Collector<String> out) {
+       out.collect(value.toString());
+   }
+
+   @Override
+   public void flatMap2(String value, Collector<String> out) {
+       for (String word: value.split(" ")) {
+         out.collect(word);
+       }
+   }
+});
+    {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Split</strong><br>DataStream &rarr; SplitStream</td>
+          <td>
+            <p>
+                Split the stream into two or more streams according to some 
criterion.
+                {% highlight java %}
+SplitStream<Integer> split = someDataStream.split(new 
OutputSelector<Integer>() {
+    @Override
+    public Iterable<String> select(Integer value) {
+        List<String> output = new ArrayList<String>();
+        if (value % 2 == 0) {
+            output.add("even");
+        }
+        else {
+            output.add("odd");
+        }
+        return output;
+    }
+});
+                {% endhighlight %}
+            </p>
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Select</strong><br>SplitStream &rarr; DataStream</td>
+          <td>
+            <p>
+                Select one or more streams from a split stream.
+                {% highlight java %}
+SplitStream<Integer> split;
+DataStream<Integer> even = split.select("even");
+DataStream<Integer> odd = split.select("odd");
+DataStream<Integer> all = split.select("even","odd");
+                {% endhighlight %}
+            </p>
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Iterate</strong><br>DataStream &rarr; IterativeStream 
&rarr; DataStream</td>
+          <td>
+            <p>
+                Creates a "feedback" loop in the flow, by redirecting the 
output of one operator
+                to some previous operator. This is especially useful for 
defining algorithms that
+                continuously update a model. The following code starts with a 
stream and applies
+               the iteration body continuously. Elements that are greater than 
0 are sent back
+               to the feedback channel, and the rest of the elements are 
forwarded downstream.
+               See <a href="#iterations">iterations</a> for a complete 
description.
+                {% highlight java %}
+IterativeStream<Long> iteration = initialStream.iterate();
+DataStream<Long> iterationBody = iteration.map (/*do something*/);
+DataStream<Long> feedback = iterationBody.filter(new FilterFunction<Long>(){
+    @Override
+    public boolean filter(Integer value) throws Exception {
+        return value > 0;
+    }
+});
+iteration.closeWith(feedback);
+DataStream<Long> output = iterationBody.filter(new FilterFunction<Long>(){
+    @Override
+    public boolean filter(Integer value) throws Exception {
+        return value <= 0;
+    }
+});
+                {% endhighlight %}
+            </p>
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Extract Timestamps</strong><br>DataStream &rarr; 
DataStream</td>
+          <td>
+            <p>
+                Extracts timestamps from records in order to work with windows
+                that use event time semantics. See <a 
href="#working-with-time">working with time</a>.
+                {% highlight java %}
+stream.assignTimestamps (new TimeStampExtractor() {...});
+                {% endhighlight %}
+            </p>
+          </td>
+        </tr>
+  </tbody>
+</table>
+
+</div>
+
+<div data-lang="scala" markdown="1">
+
+<br />
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 25%">Transformation</th>
+      <th class="text-center">Description</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+          <td><strong>Map</strong><br>DataStream &rarr; DataStream</td>
+          <td>
+            <p>Takes one element and produces one element. A map function that 
doubles the values of the input stream:</p>
+    {% highlight scala %}
+dataStream.map { x => x * 2 }
+    {% endhighlight %}
+          </td>
+        </tr>
+
+        <tr>
+          <td><strong>FlatMap</strong><br>DataStream &rarr; DataStream</td>
+          <td>
+            <p>Takes one element and produces zero, one, or more elements. A 
flatmap function that splits sentences to words:</p>
+    {% highlight scala %}
+dataStream.flatMap { str => str.split(" ") }
+    {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Filter</strong><br>DataStream &rarr; DataStream</td>
+          <td>
+            <p>Evaluates a boolean function for each element and retains those 
for which the function returns true.
+            A filter that filters out zero values:
+            </p>
+    {% highlight scala %}
+dataStream.filter { _ != 0 }
+    {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
+          <td><strong>KeyBy</strong><br>DataStream &rarr; KeyedStream</td>
+          <td>
+            <p>Logically partitions a stream into disjoint partitions, each 
partition containing elements of the same key.
+            Internally, this is implemented with hash partitioning. See <a 
href="#specifying-keys">keys</a> on how to specify keys.
+            This transformation returns a KeyedDataStream.</p>
+    {% highlight scala %}
+dataStream.keyBy("someKey") // Key by field "someKey"
+dataStream.keyBy(0) // Key by the first element of a Tuple
+    {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Reduce</strong><br>KeyedStream &rarr; DataStream</td>
+          <td>
+            <p>A "rolling" reduce on a keyed data stream. Combines the current 
element with the last reduced value and
+            emits the new value.
+                    <br/>
+               <br/>
+            A reduce function that creates a stream of partial sums:</p>
+            {% highlight scala %}
+keyedStream.reduce { _ + _ }
+            {% endhighlight %}
+            </p>
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Fold</strong><br>KeyedStream &rarr; DataStream</td>
+          <td>
+          <p>A "rolling" fold on a keyed data stream with an initial value.
+          Combines the current element with the last folded value and
+          emits the new value.
+          <br/>
+          <br/>
+          <p>A fold function that, when applied on the sequence (1,2,3,4,5),
+          emits the sequence "start-1", "start-1-2", "start-1-2-3", ...</p>
+          {% highlight scala %}
+val result: DataStream[String] =
+    keyedStream.fold("start", (str, i) => { str + "-" + i })
+          {% endhighlight %}
+          </p>
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Aggregations</strong><br>KeyedStream &rarr; 
DataStream</td>
+          <td>
+            <p>Rolling aggregations on a keyed data stream. The difference 
between min
+           and minBy is that min returns the minimun value, whereas minBy 
returns
+           the element that has the minimum value in this field (same for max 
and maxBy).</p>
+    {% highlight scala %}
+keyedStream.sum(0)
+keyedStream.sum("key")
+keyedStream.min(0)
+keyedStream.min("key")
+keyedStream.max(0)
+keyedStream.max("key")
+keyedStream.minBy(0)
+keyedStream.minBy("key")
+keyedStream.maxBy(0)
+keyedStream.maxBy("key")
+    {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Window</strong><br>KeyedStream &rarr; WindowedStream</td>
+          <td>
+            <p>Windows can be defined on already partitioned KeyedStreams. 
Windows group the data in each
+            key according to some characteristic (e.g., the data that arrived 
within the last 5 seconds).
+            See <a href="#windows">windows</a> for a description of windows.
+    {% highlight scala %}
+dataStream.keyBy(0).window(TumblingTimeWindows.of(Time.of(5, 
TimeUnit.SECONDS))) // Last 5 seconds of data
+    {% endhighlight %}
+        </p>
+          </td>
+        </tr>
+        <tr>
+          <td><strong>WindowAll</strong><br>DataStream &rarr; 
AllWindowedStream</td>
+          <td>
+              <p>Windows can be defined on regular DataStreams. Windows group 
all the stream events
+              according to some characteristic (e.g., the data that arrived 
within the last 5 seconds).
+              See <a href="#windows">windows</a> for a complete description of 
windows.</p>
+              <p><strong>WARNING:</strong> This is in many cases a 
<strong>non-parallel</strong> transformation. All records will be
+               gathered in one task for the windowAll operator.</p>
+  {% highlight scala %}
+dataStream.windowAll(TumblingTimeWindows.of(Time.of(5, TimeUnit.SECONDS))) // 
Last 5 seconds of data
+  {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Window Apply</strong><br>WindowedStream &rarr; 
DataStream<br>AllWindowedStream &rarr; DataStream</td>
+          <td>
+            <p>Applies a general function to the window as a whole. Below is a 
function that manually sums the elements of a window.</p>
+            <p><strong>Note:</strong> If you are using a windowAll 
transformation, you need to use an AllWindowFunction instead.</p>
+    {% highlight scala %}
+windowedStream.apply { WindowFunction }
+
+// applying an AllWindowFunction on non-keyed window stream
+allWindowedStream.apply { AllWindowFunction }
+
+    {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Window Reduce</strong><br>WindowedStream &rarr; 
DataStream</td>
+          <td>
+            <p>Applies a functional reduce function to the window and returns 
the reduced value.</p>
+    {% highlight scala %}
+windowedStream.reduce { _ + _ }
+    {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Window Fold</strong><br>WindowedStream &rarr; 
DataStream</td>
+          <td>
+            <p>Applies a functional fold function to the window and returns 
the folded value.
+               The example function, when applied on the sequence (1,2,3,4,5),
+               folds the sequence into the string "start-1-2-3-4-5":</p>
+          {% highlight scala %}
+val result: DataStream[String] =
+    windowedStream.fold("start", (str, i) => { str + "-" + i })
+          {% endhighlight %}
+          </td>
+       </tr>
+        <tr>
+          <td><strong>Aggregations on windows</strong><br>WindowedStream 
&rarr; DataStream</td>
+          <td>
+            <p>Aggregates the contents of a window. The difference between min
+           and minBy is that min returns the minimun value, whereas minBy 
returns
+           the element that has the minimum value in this field (same for max 
and maxBy).</p>
+    {% highlight scala %}
+windowedStream.sum(0)
+windowedStream.sum("key")
+windowedStream.min(0)
+windowedStream.min("key")
+windowedStream.max(0)
+windowedStream.max("key")
+windowedStream.minBy(0)
+windowedStream.minBy("key")
+windowedStream.maxBy(0)
+windowedStream.maxBy("key")
+    {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Union</strong><br>DataStream* &rarr; DataStream</td>
+          <td>
+            <p>Union of two or more data streams creating a new stream 
containing all the elements from all the streams. Node: If you union a data 
stream
+            with itself you will get each element twice in the resulting 
stream.</p>
+    {% highlight scala %}
+dataStream.union(otherStream1, otherStream2, ...)
+    {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Window Join</strong><br>DataStream,DataStream &rarr; 
DataStream</td>
+          <td>
+            <p>Join two data streams on a given key and a common window.</p>
+    {% highlight scala %}
+dataStream.join(otherStream)
+    .where(0).equalTo(1)
+    .window(TumblingTimeWindows.of(Time.of(3, TimeUnit.SECONDS)))
+    .apply { ... }
+    {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Window CoGroup</strong><br>DataStream,DataStream &rarr; 
DataStream</td>
+          <td>
+            <p>Cogroups two data streams on a given key and a common 
window.</p>
+    {% highlight scala %}
+dataStream.coGroup(otherStream)
+    .where(0).equalTo(1)
+    .window(TumblingTimeWindows.of(Time.of(3, TimeUnit.SECONDS)))
+    .apply {}
+    {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Connect</strong><br>DataStream,DataStream &rarr; 
ConnectedStreams</td>
+          <td>
+            <p>"Connects" two data streams retaining their types, allowing for 
shared state between
+            the two streams.</p>
+    {% highlight scala %}
+someStream : DataStream[Int] = ...
+otherStream : DataStream[String] = ...
+
+val connectedStreams = someStream.connect(otherStream)
+    {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
+          <td><strong>CoMap, CoFlatMap</strong><br>ConnectedStreams &rarr; 
DataStream</td>
+          <td>
+            <p>Similar to map and flatMap on a connected data stream</p>
+    {% highlight scala %}
+connectedStreams.map(
+    (_ : Int) => true,
+    (_ : String) => false
+)
+connectedStreams.flatMap(
+    (_ : Int) => true,
+    (_ : String) => false
+)
+    {% endhighlight %}
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Split</strong><br>DataStream &rarr; SplitStream</td>
+          <td>
+            <p>
+                Split the stream into two or more streams according to some 
criterion.
+                {% highlight scala %}
+val split = someDataStream.split(
+  (num: Int) =>
+    (num % 2) match {
+      case 0 => List("even")
+      case 1 => List("odd")
+    }
+)
+                {% endhighlight %}
+            </p>
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Select</strong><br>SplitStream &rarr; DataStream</td>
+          <td>
+            <p>
+                Select one or more streams from a split stream.
+                {% highlight scala %}
+
+val even = split select "even"
+val odd = split select "odd"
+val all = split.select("even","odd")
+                {% endhighlight %}
+            </p>
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Iterate</strong><br>DataStream &rarr; IterativeStream  
&rarr; DataStream</td>
+          <td>
+            <p>
+                Creates a "feedback" loop in the flow, by redirecting the 
output of one operator
+                to some previous operator. This is especially useful for 
defining algorithms that
+                continuously update a model. The following code starts with a 
stream and applies
+               the iteration body continuously. Elements that are greater than 
0 are sent back
+               to the feedback channel, and the rest of the elements are 
forwarded downstream.
+               See <a href="#iterations">iterations</a> for a complete 
description.
+                {% highlight java %}
+initialStream. iterate {
+  iteration => {
+    val iterationBody = iteration.map {/*do something*/}
+    (iterationBody.filter(_ > 0), iterationBody.filter(_ <= 0))
+  }
+}
+IterativeStream<Long> iteration = initialStream.iterate();
+DataStream<Long> iterationBody = iteration.map (/*do something*/);
+DataStream<Long> feedback = iterationBody.filter ( _ > 0);
+iteration.closeWith(feedback);
+                {% endhighlight %}
+            </p>
+          </td>
+        </tr>
+        <tr>
+          <td><strong>Extract Timestamps</strong><br>DataStream &rarr; 
DataStream</td>
+          <td>
+            <p>
+                Extracts timestamps from records in order to work with windows
+                that use event time semantics.
+                See <a href="#working-with-time">working with time</a>.
+                {% highlight scala %}
+stream.assignTimestamps { timestampExtractor }
+                {% endhighlight %}
+            </p>
+          </td>
+        </tr>
+  </tbody>
+</table>
+
+</div>
+</div>
+
+The following transformations are available on data streams of Tuples:
+
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+
+<br />
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 20%">Transformation</th>
+      <th class="text-center">Description</th>
+    </tr>
+  </thead>
+  <tbody>
+   <tr>
+      <td><strong>Project</strong><br>DataStream &rarr; DataStream</td>
+      <td>
+        <p>Selects a subset of fields from the tuples
+{% highlight java %}
+DataStream<Tuple3<Integer, Double, String>> in = // [...]
+DataStream<Tuple2<String, Integer>> out = in.project(2,0);
+{% endhighlight %}
+        </p>
+      </td>
+    </tr>
+  </tbody>
+</table>
+
+</div>
+
+<div data-lang="scala" markdown="1">
+
+<br />
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 20%">Transformation</th>
+      <th class="text-center">Description</th>
+    </tr>
+  </thead>
+  <tbody>
+   <tr>
+      <td><strong>Project</strong><br>DataStream &rarr; DataStream</td>
+      <td>
+        <p>Selects a subset of fields from the tuples
+{% highlight scala %}
+val in : DataStream[(Int,Double,String)] = // [...]
+val out = in.project(2,0)
+{% endhighlight %}
+        </p>
+      </td>
+    </tr>
+  </tbody>
+</table>
+
+</div>
+</div>
+
+
+### Physical partitioning
+
+Flink also gives low-level control (if desired) on the exact stream 
partitioning after a transformation,
+via the following functions.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+
+<br />
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 20%">Transformation</th>
+      <th class="text-center">Description</th>
+    </tr>
+  </thead>
+  <tbody>
+   <tr>
+      <td><strong>Hash partitioning</strong><br>DataStream &rarr; 
DataStream</td>
+      <td>
+        <p>
+            Identical to keyBy but returns a DataStream instead of a 
KeyedStream.
+            {% highlight java %}
+dataStream.partitionByHash("someKey");
+dataStream.partitionByHash(0);
+            {% endhighlight %}
+        </p>
+      </td>
+    </tr>
+   <tr>
+      <td><strong>Custom partitioning</strong><br>DataStream &rarr; 
DataStream</td>
+      <td>
+        <p>
+            Uses a user-defined Partitioner to select the target task for each 
element.
+            {% highlight java %}
+dataStream.partitionCustom(new Partitioner(){...}, "someKey");
+dataStream.partitionCustom(new Partitioner(){...}, 0);
+            {% endhighlight %}
+        </p>
+      </td>
+    </tr>
+   <tr>
+     <td><strong>Random partitioning</strong><br>DataStream &rarr; 
DataStream</td>
+     <td>
+       <p>
+            Partitions elements randomly according to a uniform distribution.
+            {% highlight java %}
+dataStream.partitionRandom();
+            {% endhighlight %}
+       </p>
+     </td>
+   </tr>
+   <tr>
+      <td><strong>Rebalancing (Round-robin 
partitioning)</strong><br>DataStream &rarr; DataStream</td>
+      <td>
+        <p>
+            Partitions elements round-robin, creating equal load per 
partition. Useful for performance
+            optimization in the presence of data skew.
+            {% highlight java %}
+dataStream.rebalance();
+            {% endhighlight %}
+        </p>
+      </td>
+    </tr>
+   <tr>
+      <td><strong>Broadcasting</strong><br>DataStream &rarr; DataStream</td>
+      <td>
+        <p>
+            Broadcasts elements to every partition.
+            {% highlight java %}
+dataStream.broadcast();
+            {% endhighlight %}
+        </p>
+      </td>
+    </tr>
+  </tbody>
+</table>
+
+</div>
+
+<div data-lang="scala" markdown="1">
+
+<br />
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 20%">Transformation</th>
+      <th class="text-center">Description</th>
+    </tr>
+  </thead>
+  <tbody>
+   <tr>
+      <td><strong>Hash partitioning</strong><br>DataStream &rarr; 
DataStream</td>
+      <td>
+        <p>
+            Identical to keyBy but returns a DataStream instead of a 
KeyedStream.
+            {% highlight scala %}
+dataStream.partitionByHash("someKey")
+dataStream.partitionByHash(0)
+            {% endhighlight %}
+        </p>
+      </td>
+    </tr>
+   <tr>
+      <td><strong>Custom partitioning</strong><br>DataStream &rarr; 
DataStream</td>
+      <td>
+        <p>
+            Uses a user-defined Partitioner to select the target task for each 
element.
+            {% highlight scala %}
+dataStream.partitionCustom(partitioner, "someKey")
+dataStream.partitionCustom(partitioner, 0)
+            {% endhighlight %}
+        </p>
+      </td>
+    </tr>
+   <tr>
+     <td><strong>Random partitioning</strong><br>DataStream &rarr; 
DataStream</td>
+     <td>
+       <p>
+            Partitions elements randomly according to a uniform distribution.
+            {% highlight scala %}
+dataStream.partitionRandom()
+            {% endhighlight %}
+       </p>
+     </td>
+   </tr>
+   <tr>
+      <td><strong>Rebalancing (Round-robin 
partitioning)</strong><br>DataStream &rarr; DataStream</td>
+      <td>
+        <p>
+            Partitions elements round-robin, creating equal load per 
partition. Useful for performance
+            optimization in the presence of data skew.
+            {% highlight scala %}
+dataStream.rebalance()
+            {% endhighlight %}
+        </p>
+      </td>
+    </tr>
+   <tr>
+      <td><strong>Broadcasting</strong><br>DataStream &rarr; DataStream</td>
+      <td>
+        <p>
+            Broadcasts elements to every partition.
+            {% highlight scala %}
+dataStream.broadcast()
+            {% endhighlight %}
+        </p>
+      </td>
+    </tr>
+  </tbody>
+</table>
+
+</div>
+</div>
+
+### Task chaining and resource groups
+
+Chaining two subsequent transformations means co-locating them within the same 
thread for better
+performance. Flink by default chains operators if this is possible (e.g., two 
subsequent map
+transformations). The API gives fine-grained control over chaining if desired:
+
+Use `StreamExecutionEnvironment.disableOperatorChaining()` if you want to 
disable chaining in
+the whole job. For more fine grained control, the following functions are 
available. Note that
+these functions can only be used right after a DataStream transformation as 
they refer to the
+previous transformation. For example, you can use 
`someStream.map(...).startNewChain()`, but
+you cannot use `someStream.startNewChain()`.
+
+A resource group is a slot in Flink, see
+[slots]({{site.baseurl}}/setup/config.html#configuring-taskmanager-processing-slots).
 You can
+manually isolate operators in separate slots if desired.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+
+<br />
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 20%">Transformation</th>
+      <th class="text-center">Description</th>
+    </tr>
+  </thead>
+  <tbody>
+   <tr>
+      <td>Start new chain</td>
+      <td>
+        <p>Begin a new chain, starting with this operator. The two
+       mappers will be chained, and filter will not be chained to
+       the first mapper.
+{% highlight java %}
+someStream.filter(...).map(...).startNewChain().map(...);
+{% endhighlight %}
+        </p>
+      </td>
+    </tr>
+   <tr>
+      <td>Disable chaining</td>
+      <td>
+        <p>Do not chain the map operator
+{% highlight java %}
+someStream.map(...).disableChaining();
+{% endhighlight %}
+        </p>
+      </td>
+    </tr>
+   <tr>
+      <td>Start a new resource group</td>
+      <td>
+        <p>Start a new resource group containing the map and the subsequent 
operators.
+{% highlight java %}
+someStream.filter(...).startNewResourceGroup();
+{% endhighlight %}
+        </p>
+      </td>
+    </tr>
+   <tr>
+      <td>Isolate resources</td>
+      <td>
+        <p>Isolate the operator in its own slot.
+{% highlight java %}
+someStream.map(...).isolateResources();
+{% endhighlight %}
+        </p>
+      </td>
+    </tr>
+  </tbody>
+</table>
+
+</div>
+
+<div data-lang="scala" markdown="1">
+
+<br />
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 20%">Transformation</th>
+      <th class="text-center">Description</th>
+    </tr>
+  </thead>
+  <tbody>
+   <tr>
+      <td>Start new chain</td>
+      <td>
+        <p>Begin a new chain, starting with this operator. The two
+       mappers will be chained, and filter will not be chained to
+       the first mapper.
+{% highlight scala %}
+someStream.filter(...).map(...).startNewChain().map(...)
+{% endhighlight %}
+        </p>
+      </td>
+    </tr>
+   <tr>
+      <td>Disable chaining</td>
+      <td>
+        <p>Do not chain the map operator
+{% highlight scala %}
+someStream.map(...).disableChaining()
+{% endhighlight %}
+        </p>
+      </td>
+    </tr>
+   <tr>
+      <td>Start a new resource group</td>
+      <td>
+        <p>Start a new resource group containing the map and the subsequent 
operators.
+{% highlight scala %}
+someStream.filter(...).startNewResourceGroup()
+{% endhighlight %}
+        </p>
+      </td>
+    </tr>
+   <tr>
+      <td>Isolate resources</td>
+      <td>
+        <p>Isolate the operator in its own slot.
+{% highlight scala %}
+someStream.map(...).isolateResources()
+{% endhighlight %}
+        </p>
+      </td>
+    </tr>
+  </tbody>
+</table>
+
+</div>
+</div>
+
+
+{% top %}
+
+Specifying Keys
+----------------
+
+The `keyBy` transformation requires that a key is defined on
+its argument DataStream.
+
+A DataStream is keyed as
+{% highlight java %}
+DataStream<...> input = // [...]
+DataStream<...> windowed = input
+       .keyBy(/*define key here*/)
+       .window(/*define window here*/);
+{% endhighlight %}
+
+The data model of Flink is not based on key-value pairs. Therefore,
+you do not need to physically pack the data stream types into keys and
+values. Keys are "virtual": they are defined as functions over the
+actual data to guide the grouping operator.
+
+See [the relevant section of the DataSet API documentation]({{ site.baseurl 
}}/apis/batch/index.html#specifying-keys) on how to specify keys.
+Just replace `DataSet` with `DataStream`, and `groupBy` with `keyBy`.
+
+
+
+Passing Functions to Flink
+--------------------------
+
+Some transformations take user-defined functions as arguments.
+
+See [the relevant section of the DataSet API documentation]({{ site.baseurl 
}}/apis/batch/index.html#passing-functions-to-flink).
+
+
+{% top %}
+
+
+Data Types
+----------
+
+Flink places some restrictions on the type of elements that are used in 
DataStreams and in results
+of transformations. The reason for this is that the system analyzes the types 
to determine
+efficient execution strategies.
+
+See [the relevant section of the DataSet API documentation]({{ site.baseurl 
}}/apis/batch/index.html#data-types).
+
+{% top %}
+
+
+Data Sources
+------------
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+
+<br />
+
+Sources can by created by using 
`StreamExecutionEnvironment.addSource(sourceFunction)`.
+You can either use one of the source functions that come with Flink or write a 
custom source
+by implementing the `SourceFunction` for non-parallel sources, or by 
implementing the
+`ParallelSourceFunction` interface or extending `RichParallelSourceFunction` 
for parallel sources.
+
+There are several predefined stream sources accessible from the 
`StreamExecutionEnvironment`:
+
+File-based:
+
+- `readTextFile(path)` / `TextInputFormat` - Reads files line wise and returns 
them as Strings.
+
+- `readTextFileWithValue(path)` / `TextValueInputFormat` - Reads files line 
wise and returns them as
+  StringValues. StringValues are mutable strings.
+
+- `readFile(path)` / Any input format - Reads files as dictated by the input 
format.
+
+- `readFileOfPrimitives(path, Class)` / `PrimitiveInputFormat` - Parses files 
of new-line (or another char sequence) delimited primitive data types such as 
`String` or `Integer`.
+
+- `readFileStream` - create a stream by appending elements when there are 
changes to a file
+
+Socket-based:
+
+- `socketTextStream` - Reads from a socket. Elements can be separated by a 
delimiter.
+
+Collection-based:
+
+- `fromCollection(Collection)` - Creates a data stream from the Java 
Java.util.Collection. All elements
+  in the collection must be of the same type.
+
+- `fromCollection(Iterator, Class)` - Creates a data stream from an iterator. 
The class specifies the
+  data type of the elements returned by the iterator.
+
+- `fromElements(T ...)` - Creates a data stream from the given sequence of 
objects. All objects must be
+  of the same type.
+
+- `fromParallelCollection(SplittableIterator, Class)` - Creates a data stream 
from an iterator, in
+  parallel. The class specifies the data type of the elements returned by the 
iterator.
+
+- `generateSequence(from, to)` - Generates the sequence of numbers in the 
given interval, in
+  parallel.
+
+Custom:
+
+- `addSource` - Attache a new source function. For example, to read from 
Apache Kafka you can use
+    `addSource(new FlinkKafkaConsumer082<>(...))`. See [connectors]({{ 
site.baseurl }}/apis/streaming/connectors/) for more details.
+
+</div>
+
+<div data-lang="scala" markdown="1">
+
+<br />
+
+Sources can by created by using 
`StreamExecutionEnvironment.addSource(sourceFunction)`.
+You can either use one of the source functions that come with Flink or write a 
custom source
+by implementing the `SourceFunction` for non-parallel sources, or by 
implementing the
+`ParallelSourceFunction` interface or extending `RichParallelSourceFunction` 
for parallel sources.
+
+There are several predefined stream sources accessible from the 
`StreamExecutionEnvironment`:
+
+File-based:
+
+- `readTextFile(path)` / `TextInputFormat` - Reads files line wise and returns 
them as Strings.
+
+- `readTextFileWithValue(path)` / `TextValueInputFormat` - Reads files line 
wise and returns them as
+  StringValues. StringValues are mutable strings.
+
+- `readFile(path)` / Any input format - Reads files as dictated by the input 
format.
+
+- `readFileOfPrimitives(path, Class)` / `PrimitiveInputFormat` - Parses files 
of new-line (or another char sequence) delimited primitive data types such as 
`String` or `Integer`.
+
+- `readFileStream` - create a stream by appending elements when there are 
changes to a file
+
+Socket-based:
+
+- `socketTextStream` - Reads from a socket. Elements can be separated by a 
delimiter.
+
+Collection-based:
+
+- `fromCollection(Seq)` - Creates a data stream from the Java 
Java.util.Collection. All elements
+  in the collection must be of the same type.
+
+- `fromCollection(Iterator)` - Creates a data stream from an iterator. The 
class specifies the
+  data type of the elements returned by the iterator.
+
+- `fromElements(elements: _*)` - Creates a data stream from the given sequence 
of objects. All objects must be
+  of the same type.
+
+- `fromParallelCollection(SplittableIterator)` - Creates a data stream from an 
iterator, in
+  parallel. The class specifies the data type of the elements returned by the 
iterator.
+
+- `generateSequence(from, to)` - Generates the sequence of numbers in the 
given interval, in
+  parallel.
+
+Custom:
+
+- `addSource` - Attache a new source function. For example, to read from 
Apache Kafka you can use
+    `addSource(new FlinkKafkaConsumer082<>(...))`. See [connectors]({{ 
site.baseurl }}/apis/streaming/connectors/) for more details.
+
+</div>
+</div>
+
+{% top %}
+
+
+Execution Configuration
+----------
+
+The `StreamExecutionEnvironment` also contains the `ExecutionConfig` which 
allows to set job specific configuration values for the runtime.
+
+See [the relevant section of the DataSet API documentation]({{ site.baseurl 
}}/apis/batch/index.html#execution-configuration).
+
+Parameters in the `ExecutionConfig` that pertain specifically to the 
DataStream API are:
+
+- `enableTimestamps()` / **`disableTimestamps()`**: Attach a timestamp to each 
event emitted from a source.
+    `areTimestampsEnabled()` returns the current value.
+
+- `setAutoWatermarkInterval(long milliseconds)`: Set the interval for 
automatic watermark emission. You can
+    get the current value with `long getAutoWatermarkInterval()`
+
+{% top %}
+
+Data Sinks
+----------
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+
+<br />
+
+Data sinks consume DataStreams and forward them to files, sockets, external 
systems, or print them.
+Flink comes with a variety of built-in output formats that are encapsulated 
behind operations on the
+DataStreams:
+
+- `writeAsText()` / `TextOuputFormat` - Writes elements line-wise as Strings. 
The Strings are
+  obtained by calling the *toString()* method of each element.
+
+- `writeAsCsv(...)` / `CsvOutputFormat` - Writes tuples as comma-separated 
value files. Row and field
+  delimiters are configurable. The value for each field comes from the 
*toString()* method of the objects.
+
+- `print()` / `printToErr()`  - Prints the *toString()* value
+of each element on the standard out / strandard error stream. Optionally, a 
prefix (msg) can be provided which is
+prepended to the output. This can help to distinguish between different calls 
to *print*. If the parallelism is
+greater than 1, the output will also be prepended with the identifier of the 
task which produced the output.
+
+- `write()` / `FileOutputFormat` - Method and base class for custom file 
outputs. Supports
+  custom object-to-bytes conversion.
+
+- `writeToSocket` - Writes elements to a socket according to a 
`SerializationSchema`
+
+- `addSink` - Invokes a custom sink function. Flink comes bundled with 
connectors to other systems (such as
+    Apache Kafka) that are implemented as sink functions.
+
+</div>
+<div data-lang="scala" markdown="1">
+
+<br />
+
+Data sinks consume DataStreams and forward them to files, sockets, external 
systems, or print them.
+Flink comes with a variety of built-in output formats that are encapsulated 
behind operations on the
+DataStreams:
+
+- `writeAsText()` / `TextOuputFormat` - Writes elements line-wise as Strings. 
The Strings are
+  obtained by calling the *toString()* method of each element.
+
+- `writeAsCsv(...)` / `CsvOutputFormat` - Writes tuples as comma-separated 
value files. Row and field
+  delimiters are configurable. The value for each field comes from the 
*toString()* method of the objects.
+
+- `print()` / `printToErr()`  - Prints the *toString()* value
+of each element on the standard out / strandard error stream. Optionally, a 
prefix (msg) can be provided which is
+prepended to the output. This can help to distinguish between different calls 
to *print*. If the parallelism is
+greater than 1, the output will also be prepended with the identifier of the 
task which produced the output.
+
+- `write()` / `FileOutputFormat` - Method and base class for custom file 
outputs. Supports
+  custom object-to-bytes conversion.
+
+- `writeToSocket` - Writes elements to a socket according to a 
`SerializationSchema`
+
+- `addSink` - Invokes a custom sink function. Flink comes bundled with 
connectors to other systems (such as
+    Apache Kafka) that are implemented as sink functions.
+
+</div>
+</div>
+
+
+{% top %}
+
+Debugging
+---------
+
+Before running a streaming program in a distributed cluster, it is a good
+idea to make sure that the implemented algorithm works as desired. Hence, 
implementing data analysis
+programs is usually an incremental process of checking results, debugging, and 
improving.
+
+Flink provides features to significantly ease the development process of data 
analysis
+programs by supporting local debugging from within an IDE, injection of test 
data, and collection of
+result data. This section give some hints how to ease the development of Flink 
programs.
+
+### Local Execution Environment
+
+A `LocalStreamEnvironment` starts a Flink system within the same JVM process 
it was created in. If you
+start the LocalEnvironement from an IDE, you can set breakpoints in your code 
and easily debug your
+program.
+
+A LocalEnvironment is created and used as follows:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createLocalEnvironment();
+
+DataStream<String> lines = env.addSource(/* some source */);
+// build your program
+
+env.execute();
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+
+{% highlight scala %}
+val env = StreamExecutionEnvironment.createLocalEnvironment()
+
+val lines = env.addSource(/* some source */)
+// build your program
+
+env.execute()
+{% endhighlight %}
+</div>
+</div>
+
+### Collection Data Sources
+
+Flink provides special data sources which are backed
+by Java collections to ease testing. Once a program has been tested, the 
sources and sinks can be
+easily replaced by sources and sinks that read from / write to external 
systems.
+
+Collection data sources can be used as follows:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createLocalEnvironment();
+
+// Create a DataStream from a list of elements
+DataStream<Integer> myInts = env.fromElements(1, 2, 3, 4, 5);
+
+// Create a DataStream from any Java collection
+List<Tuple2<String, Integer>> data = ...
+DataStream<Tuple2<String, Integer>> myTuples = env.fromCollection(data);
+
+// Create a DataStream from an Iterator
+Iterator<Long> longIt = ...
+DataStream<Long> myLongs = env.fromCollection(longIt, Long.class);
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env = StreamExecutionEnvironment.createLocalEnvironment()
+
+// Create a DataStream from a list of elements
+val myInts = env.fromElements(1, 2, 3, 4, 5)
+
+// Create a DataStream from any Collection
+val data: Seq[(String, Int)] = ...
+val myTuples = env.fromCollection(data)
+
+// Create a DataStream from an Iterator
+val longIt: Iterator[Long] = ...
+val myLongs = env.fromCollection(longIt)
+{% endhighlight %}
+</div>
+</div>
+
+**Note:** Currently, the collection data source requires that data types and 
iterators implement
+`Serializable`. Furthermore, collection data sources can not be executed in 
parallel (
+parallelism = 1).
+
+### Iterator Data Sink
+
+Flink also provides a sink to collect DataStream results for testing and 
debugging purposes. It can be used as follows:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+import org.apache.flink.contrib.streaming.DataStreamUtils
+
+DataStream<Tuple2<String, Integer>> myResult = ...
+Iterator<Tuple2<String, Integer>> myOutput = DataStreamUtils.collect(myResult)
+{% endhighlight %}
+
+</div>
+<div data-lang="scala" markdown="1">
+
+{% highlight scala %}
+import org.apache.flink.contrib.streaming.DataStreamUtils
+import scala.collection.JavaConverters.asScalaIteratorConverter
+
+val myResult: DataStream[(String, Int)] = ...
+val myOutput: Iterator[(String, Int)] = 
DataStreamUtils.collect(myResult.getJavaStream).asScala
+{% endhighlight %}
+</div>
+</div>
+
+
+{% top %}
+
+
+Windows
+-------
+
+### Working with Time
+
+Windows are typically groups of events within a certain time period. Reasoning 
about time and windows assumes
+a definition of time. Flink has support for three kinds of time:
+
+- *Processing time:* Processing time is simply the wall clock time of the 
machine that happens to be
+    executing the transformation. Processing time is the simplest notion of 
time and provides the best
+    performance. However, in distributed and asynchronous environments 
processing time does not provide
+    determinism.
+
+- *Event time:* Event time is the time that each individual event occurred. 
This time is
+    typically embedded within the records before they enter Flink or can be 
extracted from their contents.
+    When using event time, out-of-order events can be properly handled. For 
example, an event with a lower
+    timestamp may arrive after an event with a higher timestamp, but 
transformations will handle these events
+    correctly. Event time processing provides predictable results, but incurs 
more latency, as out-of-order
+    events need to be buffered
+
+- *Ingestion time:* Ingestion time is the time that events enter Flink. In 
particular, the timestamp of
+    an event is assigned by the source operator as the current wall clock time 
of the machine that executes
+    the source task at the time the records enter the Flink source. Ingestion 
time is more predictable
+    than processing time, and gives lower latencies than event time as the 
latency does not depend on
+    external systems. Ingestion time provides thus a middle ground between 
processing time and event time.
+    Ingestion time is a special case of event time (and indeed, it is treated 
by Flink identically to
+    event time).
+
+When dealing with event time, transformations need to avoid indefinite
+wait times for events to arrive. *Watermarks* provide the mechanism to control 
the event time-processing time skew. Watermarks
+are emitted by the sources. A watermark with a certain timestamp denotes the 
knowledge that no event
+with timestamp lower than the timestamp of the watermark will ever arrive.
+
+You can specify the semantics of time in a Flink DataStream program using 
`StreamExecutionEnviroment`, as
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight java %}
+env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
+env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+{% endhighlight %}
+</div>
+</div>
+
+The default value is `TimeCharacteristic.ProcessingTime`, so in order to write 
a program with processing
+time semantics nothing needs to be specified (e.g., the first 
[example](#example-program) in this guide follows processing
+time semantics).
+
+In order to work with event time semantics, you need to follow four steps:
+
+- Set `env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)`
+
+- Use `DataStream.assignTimestamps(...)` in order to tell Flink how timestamps 
relate to events (e.g., which
+    record field is the timestamp)
+
+- Set `enableTimestamps()`, as well the interval for watermark emission 
(`setAutoWatermarkInterval(long milliseconds)`)
+    in `ExecutionConfig`.
+
+For example, assume that we have a data stream of tuples, in which the first 
field is the timestamp (assigned
+by the system that generates these data streams), and we know that the lag 
between the current processing
+time and the timestamp of an event is never more than 1 second:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+DataStream<Tuple4<Long,Integer,Double,String>> stream = //...
+stream.assignTimestamps(new 
TimestampExtractor<Tuple4<Long,Integer,Double,String>>{
+    @Override
+    public long extractTimestamp(Tuple4<Long,Integer,Double,String> element, 
long currentTimestamp) {
+        return element.f0;
+    }
+
+    @Override
+    public long extractWatermark(Tuple4<Long,Integer,Double,String> element, 
long currentTimestamp) {
+        return element.f0 - 1000;
+    }
+
+    @Override
+    public long getCurrentWatermark() {
+        return Long.MIN_VALUE;
+    }
+});
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val stream: DataStream[(Long,Int,Double,String)] = null;
+stream.assignTimestampts(new TimestampExtractor[(Long, Int, Double, String)] {
+  override def extractTimestamp(element: (Long, Int, Double, String), 
currentTimestamp: Long): Long = element._1
+
+  override def extractWatermark(element: (Long, Int, Double, String), 
currentTimestamp: Long): Long = element._1 - 1000
+
+  override def getCurrentWatermark: Long = Long.MinValue
+})
+{% endhighlight %}
+</div>
+</div>
+
+If you know that timestamps of events are always ascending, i.e., elements 
arrive in order, you can use
+the `AscendingTimestampExtractor`, and the system generates watermarks 
automatically:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+DataStream<Tuple4<Long,Integer,Double,String>> stream = //...
+stream.assignTimestamps(new 
AscendingTimestampExtractor<Tuple4<Long,Integer,Double,String>>{
+    @Override
+    public long extractAscendingTimestamp(Tuple4<Long,Integer,Double,String> 
element, long currentTimestamp) {
+        return element.f0;
+    }
+});
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+stream.extractAscendingTimestamp(record => record._1)
+{% endhighlight %}
+</div>
+</div>
+
+In order to write a program with ingestion time semantics, you need to
+set `env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)`. You 
can think of this setting as a
+shortcut for writing a `TimestampExtractor` which assignes timestamps to 
events at the sources
+based on the current source wall-clock time. Flink injects this timestamp 
extractor automatically.
+
+
+### Windows on Keyed Data Streams
+
+Flink offers a variety of methods for defining windows on a `KeyedStream`. All 
of these group elements *per key*,
+i.e., each window will contain elements with the same key value.
+
+#### Basic Window Constructs
+
+Flink offers a general window mechanism that provides flexibility, as well as 
a number of pre-defined windows
+for common use cases. See first if your use case can be served by the 
pre-defined windows below before moving
+to defining your own windows.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+
+<br />
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 25%">Transformation</th>
+      <th class="text-center">Description</th>
+    </tr>
+  </thead>
+  <tbody>
+      <tr>
+        <td><strong>Tumbling time window</strong><br>KeyedStream &rarr; 
WindowedStream</td>
+        <td>
+          <p>
+          Defines a window of 5 seconds, that "tumbles". This means that 
elements are
+          grouped according to their timestamp in groups of 5 second duration, 
and every element belongs to exactly one window.
+         The notion of time is specified by the selected TimeCharacteristic 
(see <a href="#working-with-time">time</a>).
+    {% highlight java %}
+keyedStream.timeWindow(Time.of(5, TimeUnit.SECONDS));
+    {% endhighlight %}
+          </p>
+        </td>
+      </tr>
+      <tr>
+          <td><strong>Sliding time window</strong><br>KeyedStream &rarr; 
WindowedStream</td>
+          <td>
+            <p>
+             Defines a window of 5 seconds, that "slides" by 1 seconds. This 
means that elements are
+             grouped according to their timestamp in groups of 5 second 
duration, and elements can belong to more than
+             one window (since windows overlap by at most 4 seconds)
+             The notion of time is specified by the selected 
TimeCharacteristic (see <a href="#working-with-time">time</a>).
+      {% highlight java %}
+keyedStream.timeWindow(Time.of(5, TimeUnit.SECONDS), Time.of(1, 
TimeUnit.SECONDS));
+      {% endhighlight %}
+            </p>
+          </td>
+        </tr>
+      <tr>
+        <td><strong>Tumbling count window</strong><br>KeyedStream &rarr; 
WindowedStream</td>
+        <td>
+          <p>
+          Defines a window of 1000 elements, that "tumbles". This means that 
elements are
+          grouped according to their arrival time (equivalent to processing 
time) in groups of 1000 elements,
+          and every element belongs to exactly one window.
+    {% highlight java %}
+keyedStream.countWindow(1000);
+    {% endhighlight %}
+        </p>
+        </td>
+      </tr>
+      <tr>
+      <td><strong>Sliding count window</strong><br>KeyedStream &rarr; 
WindowedStream</td>
+      <td>
+        <p>
+          Defines a window of 1000 elements, that "slides" every 100 elements. 
This means that elements are
+          grouped according to their arrival time (equivalent to processing 
time) in groups of 1000 elements,
+          and every element can belong to more than one window (as windows 
overlap by at most 900 elements).
+  {% highlight java %}
+keyedStream.countWindow(1000, 100)
+  {% endhighlight %}
+        </p>
+      </td>
+    </tr>
+  </tbody>
+</table>
+
+</div>
+
+<div data-lang="scala" markdown="1">
+
+<br />
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 25%">Transformation</th>
+      <th class="text-center">Description</th>
+    </tr>
+  </thead>
+  <tbody>
+      <tr>
+        <td><strong>Tumbling time window</strong><br>KeyedStream &rarr; 
WindowedStream</td>
+        <td>
+          <p>
+          Defines a window of 5 seconds, that "tumbles". This means that 
elements are
+          grouped according to their timestamp in groups of 5 second duration, 
and every element belongs to exactly one window.
+          The notion of time is specified by the selected TimeCharacteristic 
(see <a href="#working-with-time">time</a>).
+    {% highlight scala %}
+keyedStream.timeWindow(Time.of(5, TimeUnit.SECONDS))
+    {% endhighlight %}
+          </p>
+        </td>
+      </tr>
+      <tr>
+          <td><strong>Sliding time window</strong><br>KeyedStream &rarr; 
WindowedStream</td>
+          <td>
+            <p>
+             Defines a window of 5 seconds, that "slides" by 1 seconds. This 
means that elements are
+             grouped according to their timestamp in groups of 5 second 
duration, and elements can belong to more than
+             one window (since windows overlap by at most 4 seconds)
+             The notion of time is specified by the selected 
TimeCharacteristic (see <a href="#working-with-time">time</a>).
+      {% highlight scala %}
+keyedStream.timeWindow(Time.of(5, TimeUnit.SECONDS), Time.of(1, 
TimeUnit.SECONDS))
+      {% endhighlight %}
+            </p>
+          </td>
+        </tr>
+      <tr>
+        <td><strong>Tumbling count window</strong><br>KeyedStream &rarr; 
WindowedStream</td>
+        <td>
+          <p>
+          Defines a window of 1000 elements, that "tumbles". This means that 
elements are
+          grouped according to their arrival time (equivalent to processing 
time) in groups of 1000 elements,
+          and every element belongs to exactly one window.
+    {% highlight scala %}
+keyedStream.countWindow(1000)
+    {% endhighlight %}
+        </p>
+        </td>
+      </tr>
+      <tr>
+      <td><strong>Sliding count window</strong><br>KeyedStream &rarr; 
WindowedStream</td>
+      <td>
+        <p>
+          Defines a window of 1000 elements, that "slides" every 100 elements. 
This means that elements are
+          grouped according to their arrival time (equivalent to processing 
time) in groups of 1000 elements,
+          and every element can belong to more than one window (as windows 
overlap by at most 900 elements).
+  {% highlight scala %}
+keyedStream.countWindow(1000, 100)
+  {% endhighlight %}
+        </p>
+      </td>
+    </tr>
+  </tbody>
+</table>
+
+</div>
+</div>
+
+#### Advanced Window Constructs
+
+The general mechanism can define more powerful windows at the cost of more 
verbose syntax. For example,
+below is a window definition where windows hold elements of the last 5 seconds 
and slides every 1 second,
+but the execution of the window function is triggered when 100 elements have 
been added to the
+window, and every time execution is triggered, 10 elements are retained in the 
window:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+keyedStream
+    .window(SlidingTimeWindows.of(Time.of(5, TimeUnit.SECONDS), Time.of(1, 
TimeUnit.SECONDS))
+    .trigger(CountTrigger.of(100))
+    .evictor(CountEvictor.of(10));
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+keyedStream
+    .window(SlidingTimeWindows.of(Time.of(5, TimeUnit.SECONDS), Time.of(1, 
TimeUnit.SECONDS))
+    .trigger(CountTrigger.of(100))
+    .evictor(CountEvictor.of(10))
+{% endhighlight %}
+</div>
+</div>
+
+The general recipe for building a custom window is to specify (1) a 
`WindowAssigner`, (2) a `Trigger` (optionally),
+and (3) an `Evictor` (optionally).
+
+The `WindowAssigner` defines how incoming elements are assigned to windows. A 
window is a logical group of elements
+that has a begin-value, and an end-value corresponding to a begin-time and 
end-time. Elements with timestamp (according
+to some notion of time described above within these values are part of the 
window).
+
+For example, the `SlidingTimeWindows`
+assigner in the code above defines a window of size 5 seconds, and a slide of 
1 second. Assume that
+time starts from 0 and is measured in milliseconds. Then, we have 6 windows
+that overlap: [0,5000], [1000,6000], [2000,7000], [3000, 8000], [4000, 9000], 
and [5000, 10000]. Each incoming
+element is assigned to the windows according to its timestamp. For example, an 
element with timestamp 2000 will be
+assigned to the first three windows. Flink comes bundled with window assigners 
that cover the most common use cases. You can write your
+own window types by extending the `WindowAssigner` class.
+
+<div class="codetabs" markdown="1">
+
+<div data-lang="java" markdown="1">
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 25%">Transformation</th>
+      <th class="text-center">Description</th>
+    </tr>
+  </thead>
+  <tbody>
+      <tr>
+        <td><strong>Global window</strong><br>KeyedStream &rarr; 
WindowedStream</td>
+        <td>
+          <p>
+           All incoming elements of a given key are assigned to the same 
window.
+           The window does not contain a default trigger, hence it will never 
be triggered
+           if a trigger is not explicitly specified.
+          </p>
+    {% highlight java %}
+stream.window(GlobalWindows.create());
+    {% endhighlight %}
+        </td>
+      </tr>
+      <tr>
+          <td><strong>Tumbling time windows</strong><br>KeyedStream &rarr; 
WindowedStream</td>
+          <td>
+            <p>
+              Incoming elements are assigned to a window of a certain size (1 
second below) based on
+              their timestamp. Windows do not overlap, i.e., each element is 
assigned to exactly one window.
+             The notion of time is picked from the specified 
TimeCharacteristic (see <a href="#working-with-time">time</a>).
+             The window comes with a default trigger. For event/ingestion 
time, a window is triggered when a
+             watermark with value higher than its end-value is received, 
whereas for processing time
+             when the current processing time exceeds its current end value.
+            </p>
+      {% highlight java %}
+stream.window(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)));
+      {% endhighlight %}
+          </td>
+        </tr>
+      <tr>
+        <td><strong>Sliding time windows</strong><br>KeyedStream &rarr; 
WindowedStream</td>
+        <td>
+          <p>
+            Incoming elements are assigned to a window of a certain size (5 
seconds below) based on
+            their timestamp. Windows "slide" by the provided value (1 second 
in the example), and hence
+            overlap. The window comes with a default trigger. For 
event/ingestion time, a window is triggered when a
+           watermark with value higher than its end-value is received, whereas 
for processing time
+           when the current processing time exceeds its current end value.
+          </p>
+    {% highlight java %}
+stream.window(SlidingTimeWindows.of(Time.of(5, TimeUnit.SECONDS), Time.of(1, 
TimeUnit.SECONDS)));
+    {% endhighlight %}
+        </td>
+      </tr>
+  </tbody>
+</table>
+</div>
+
+<div data-lang="scala" markdown="1">
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 25%">Transformation</th>
+      <th class="text-center">Description</th>
+    </tr>
+  </thead>
+  <tbody>
+      <tr>
+        <td><strong>Global window</strong><br>KeyedStream &rarr; 
WindowedStream</td>
+        <td>
+          <p>
+            All incoming elements of a given key are assigned to the same 
window.
+           The window does not contain a default trigger, hence it will never 
be triggered
+           if a trigger is not explicitly specified.
+          </p>
+    {% highlight scala %}
+stream.window(GlobalWindows.create)
+    {% endhighlight %}
+        </td>
+      </tr>
+      <tr>
+          <td><strong>Tumbling time windows</strong><br>KeyedStream &rarr; 
WindowedStream</td>
+          <td>
+            <p>
+              Incoming elements are assigned to a window of a certain size (1 
second below) based on
+              their timestamp. Windows do not overlap, i.e., each element is 
assigned to exactly one window.
+             The notion of time is specified by the selected 
TimeCharacteristic (see <a href="#working-with-time">time</a>).
+             The window comes with a default trigger. For event/ingestion 
time, a window is triggered when a
+             watermark with value higher than its end-value is received, 
whereas for processing time
+             when the current processing time exceeds its current end value.
+            </p>
+      {% highlight scala %}
+stream.window(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+      {% endhighlight %}
+          </td>
+        </tr>
+      <tr>
+        <td><strong>Sliding time windows</strong><br>KeyedStream &rarr; 
WindowedStream</td>
+        <td>
+          <p>
+            Incoming elements are assigned to a window of a certain size (5 
seconds below) based on
+            their timestamp. Windows "slide" by the provided value (1 second 
in the example), and hence
+            overlap. The window comes with a default trigger. For 
event/ingestion time, a window is triggered when a
+           watermark with value higher than its end-value is received, whereas 
for processing time
+           when the current processing time exceeds its current end value.
+          </p>
+    {% highlight scala %}
+stream.window(SlidingTimeWindows.of(Time.of(5, TimeUnit.SECONDS), Time.of(1, 
TimeUnit.SECONDS)))
+    {% endhighlight %}
+        </td>
+      </tr>
+  </tbody>
+</table>
+</div>
+
+</div>
+
+The `Trigger` specifies when the function that comes after the window clause 
(e.g., `sum`, `count`) is evaluated ("fires")
+for each window. If a trigger is not specified, a default trigger for each 
window type is used (that is part of the
+definition of the `WindowAssigner`). Flink comes bundled with a set of 
triggers if the ones that windows use by
+default do not fit the application. You can write your own trigger by 
implementing the `Trigger` interface. Note that
+specifying a trigger will override the default trigger of the window assigner.
+
+<div class="codetabs" markdown="1">
+
+<div data-lang="java" markdown="1">
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 25%">Transformation</th>
+      <th class="text-center">Description</th>
+    </tr>
+  </thead>
+  <tbody>
+  <tr>
+    <td><strong>Processing time trigger</strong></td>
+    <td>
+      <p>
+        A window is fired when the current processing time exceeds its 
end-value.
+        The elements on the triggered window are henceforth discarded.
+      </p>
+{% highlight java %}
+windowedStream.trigger(ProcessingTimeTrigger.create());
+{% endhighlight %}
+    </td>
+  </tr>
+  <tr>
+    <td><strong>Watermark trigger</strong></td>
+    <td>
+      <p>
+        A window is fired when a watermark with value that exceeds the 
window's end-value has been received.
+        The elements on the triggered window are henceforth discarded.
+      </p>
+{% highlight java %}
+windowedStream.trigger(EventTimeTrigger.create());
+{% endhighlight %}
+    </td>
+  </tr>
+  <tr>
+    <td><strong>Continuous processing time trigger</strong></td>
+    <td>
+      <p>
+        A window is periodically considered for being fired (every 5 seconds 
in the example).
+        The window is actually fired only when the current processing time 
exceeds its end-value.
+        The elements on the triggered window are retained.
+      </p>
+{% highlight java %}
+windowedStream.trigger(ContinuousProcessingTimeTrigger.of(Time.of(5, 
TimeUnit.SECONDS)));
+{% endhigh

<TRUNCATED>

Reply via email to