http://git-wip-us.apache.org/repos/asf/flink/blob/ad267a4b/docs/apis/streaming_guide.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming_guide.md b/docs/apis/streaming_guide.md
deleted file mode 100644
index eac464f..0000000
--- a/docs/apis/streaming_guide.md
+++ /dev/null
@@ -1,4038 +0,0 @@
----
-title: "Flink DataStream API Programming Guide"
-is_beta: false
----
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-
-<a href="#top"></a>
-
-DataStream programs in Flink are regular programs that implement 
transformations on data streams
-(e.g., filtering, updating state, defining windows, aggregating). The data 
streams are initially created from various
-sources (e.g., message queues, socket streams, files). Results are returned 
via sinks, which may for
-example write the data to files, or to standard output (for example the 
command line
-terminal). Flink programs run in a variety of contexts, standalone, or 
embedded in other programs.
-The execution can happen in a local JVM, or on clusters of many machines.
-
-In order to create your own Flink DataStream program, we encourage you to 
start with the
-[program skeleton](#program-skeleton) and gradually add your own
-[transformations](#transformations). The remaining sections act as references 
for additional
-operations and advanced features.
-
-
-* This will be replaced by the TOC
-{:toc}
-
-
-Example Program
----------------
-
-The following program is a complete, working example of streaming window word 
count application, that counts the
-words coming from a web socket in 5 second windows. You can copy &amp; paste 
the code to run it locally.
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-
-{% highlight java %}
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.windowing.time.Time;
-import org.apache.flink.util.Collector;
-
-public class WindowWordCount {
-
-    public static void main(String[] args) throws Exception {
-
-        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-
-        DataStream<Tuple2<String, Integer>> dataStream = env
-                .socketTextStream("localhost", 9999)
-                .flatMap(new Splitter())
-                .keyBy(0)
-                .timeWindow(Time.of(5, TimeUnit.SECONDS))
-                .sum(1);
-
-        dataStream.print();
-
-        env.execute("Window WordCount");
-    }
-
-    public static class Splitter implements FlatMapFunction<String, 
Tuple2<String, Integer>> {
-        @Override
-        public void flatMap(String sentence, Collector<Tuple2<String, 
Integer>> out) throws Exception {
-            for (String word: sentence.split(" ")) {
-                out.collect(new Tuple2<String, Integer>(word, 1));
-            }
-        }
-    }
-
-}
-{% endhighlight %}
-
-</div>
-
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-import java.util.concurrent.TimeUnit
-
-import org.apache.flink.streaming.api.scala._
-import org.apache.flink.streaming.api.windowing.time.Time
-
-object WindowWordCount {
-  def main(args: Array[String]) {
-
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val text = env.socketTextStream("localhost", 9999)
-
-    val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { 
_.nonEmpty } }
-      .map { (_, 1) }
-      .keyBy(0)
-      .timeWindow(Time.of(5, TimeUnit.SECONDS))
-      .sum(1)
-
-    counts.print
-
-    env.execute("Window Stream WordCount")
-  }
-}
-{% endhighlight %}
-</div>
-
-</div>
-
-To run the example program, start the input stream with netcat first from a 
terminal:
-
-~~~bash
-nc -lk 9999
-~~~
-
-Just type some words hitting return for a new word. These will be the input to 
the
-word count program. If you want to see counts greater than 1, type the same 
word again and again within
-5 seconds (increase the window size from 5 seconds if you cannot type that 
fast &#9786;).
-
-[Back to top](#top)
-
-
-Linking with Flink
-------------------
-
-To write programs with Flink, you need to include the Flink DataStream library 
corresponding to
-your programming language in your project.
-
-The simplest way to do this is to use one of the quickstart scripts: either for
-[Java]({{ site.baseurl }}/quickstart/java_api_quickstart.html) or for 
[Scala]({{ site.baseurl }}/quickstart/scala_api_quickstart.html). They
-create a blank project from a template (a Maven Archetype), which sets up 
everything for you. To
-manually create the project, you can use the archetype and create a project by 
calling:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight bash %}
-mvn archetype:generate /
-    -DarchetypeGroupId=org.apache.flink/
-    -DarchetypeArtifactId=flink-quickstart-java /
-    -DarchetypeVersion={{site.version }}
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight bash %}
-mvn archetype:generate /
-    -DarchetypeGroupId=org.apache.flink/
-    -DarchetypeArtifactId=flink-quickstart-scala /
-    -DarchetypeVersion={{site.version }}
-{% endhighlight %}
-</div>
-</div>
-
-The archetypes are working for stable releases and preview versions 
(`-SNAPSHOT`).
-
-If you want to add Flink to an existing Maven project, add the following entry 
to your
-*dependencies* section in the *pom.xml* file of your project:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight xml %}
-<dependency>
-  <groupId>org.apache.flink</groupId>
-  <artifactId>flink-streaming-java</artifactId>
-  <version>{{site.version }}</version>
-</dependency>
-<dependency>
-  <groupId>org.apache.flink</groupId>
-  <artifactId>flink-clients</artifactId>
-  <version>{{site.version }}</version>
-</dependency>
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight xml %}
-<dependency>
-  <groupId>org.apache.flink</groupId>
-  <artifactId>flink-streaming-scala</artifactId>
-  <version>{{site.version }}</version>
-</dependency>
-<dependency>
-  <groupId>org.apache.flink</groupId>
-  <artifactId>flink-clients</artifactId>
-  <version>{{site.version }}</version>
-</dependency>
-{% endhighlight %}
-</div>
-</div>
-
-In order to create your own Flink program, we encourage you to start with the
-[program skeleton](#program-skeleton) and gradually add your own
-[transformations](#transformations).
-
-[Back to top](#top)
-
-Program Skeleton
-----------------
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-
-<br />
-
-As presented in the [example](#example-program), Flink DataStream programs 
look like regular Java
-programs with a `main()` method. Each program consists of the same basic parts:
-
-1. Obtaining a `StreamExecutionEnvironment`,
-2. Connecting to data stream sources,
-3. Specifying transformations on the data streams,
-4. Specifying output for the processed data,
-5. Executing the program.
-
-We will now give an overview of each of those steps, please refer to the 
respective sections for
-more details.
-
-The `StreamExecutionEnvironment` is the basis for all Flink DataStream 
programs. You can
-obtain one using these static methods on class `StreamExecutionEnvironment`:
-
-{% highlight java %}
-getExecutionEnvironment()
-
-createLocalEnvironment()
-createLocalEnvironment(int parallelism)
-createLocalEnvironment(int parallelism, Configuration customConfiguration)
-
-createRemoteEnvironment(String host, int port, String... jarFiles)
-createRemoteEnvironment(String host, int port, int parallelism, String... 
jarFiles)
-{% endhighlight %}
-
-Typically, you only need to use `getExecutionEnvironment()`, since this
-will do the right thing depending on the context: if you are executing
-your program inside an IDE or as a regular Java program it will create
-a local environment that will execute your program on your local machine. If
-you created a JAR file from your program, and invoke it through the [command 
line](cli.html)
-or the [web interface](web_client.html),
-the Flink cluster manager will execute your main method and 
`getExecutionEnvironment()` will return
-an execution environment for executing your program on a cluster.
-
-For specifying data sources the execution environment has several methods
-to read from files, sockets, and external systems using various methods. To 
just read
-data from a socket (useful also for debugging), you can use:
-
-{% highlight java %}
-StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-
-DataStream<String> lines = env.socketTextStream("localhost", 9999)
-{% endhighlight %}
-
-This will give you a DataStream on which you can then apply transformations. 
For
-more information on data sources and input formats, please refer to
-[Data Sources](#data-sources).
-
-Once you have a DataStream you can apply transformations to create a new
-DataStream which you can then write to a socket, transform again,
-combine with other DataStreams, or push to an external system (e.g., a message 
queue, or a file system).
-You apply transformations by calling
-methods on DataStream with your own custom transformation functions. For 
example,
-a map transformation looks like this:
-
-{% highlight java %}
-DataStream<String> input = ...;
-
-DataStream<Integer> intValues = input.map(new MapFunction<String, Integer>() {
-    @Override
-    public Integer map(String value) {
-        return Integer.parseInt(value);
-    }
-});
-{% endhighlight %}
-
-This will create a new DataStream by converting every String in the original
-stream to an Integer. For more information and a list of all the 
transformations,
-please refer to [Transformations](#transformations).
-
-Once you have a DataStream containing your final results, you can push the 
result
-to an external system (HDFS, Kafka, Elasticsearch), write it to a socket, 
write to a file,
-or print it.
-
-{% highlight java %}
-writeAsText(String path, ...)
-writeAsCsv(String path, ...)
-writeToSocket(String hostname, int port, ...)
-
-print()
-
-addSink(...)
-{% endhighlight %}
-
-Once you specified the complete program you need to **trigger the program 
execution** by
-calling `execute()` on `StreamExecutionEnvironment`. This will either execute 
on
-the local machine or submit the program for execution on a cluster, depending 
on the chosen execution environment.
-
-{% highlight java %}
-env.execute();
-{% endhighlight %}
-
-</div>
-<div data-lang="scala" markdown="1">
-
-<br />
-
-As presented in the [example](#example-program), Flink DataStream programs 
look like regular Scala
-programs with a `main()` method. Each program consists of the same basic parts:
-
-1. Obtaining a `StreamExecutionEnvironment`,
-2. Connecting to data stream sources,
-3. Specifying transformations on the data streams,
-4. Specifying output for the processed data,
-5. Executing the program.
-
-We will now give an overview of each of those steps, please refer to the 
respective sections for
-more details.
-
-The `StreamExecutionEnvironment` is the basis for all Flink DataStream 
programs. You can
-obtain one using these static methods on class `StreamExecutionEnvironment`:
-
-{% highlight scala %}
-def getExecutionEnvironment
-
-def createLocalEnvironment(parallelism: Int =  
Runtime.getRuntime.availableProcessors())
-
-def createRemoteEnvironment(host: String, port: Int, jarFiles: String*)
-def createRemoteEnvironment(host: String, port: Int, parallelism: Int, 
jarFiles: String*)
-{% endhighlight %}
-
-Typically, you only need to use `getExecutionEnvironment`, since this
-will do the right thing depending on the context: if you are executing
-your program inside an IDE or as a regular Java program it will create
-a local environment that will execute your program on your local machine. If
-you created a JAR file from your program, and invoke it through the [command 
line](cli.html)
-or the [web interface](web_client.html),
-the Flink cluster manager will execute your main method and 
`getExecutionEnvironment()` will return
-an execution environment for executing your program on a cluster.
-
-For specifying data sources the execution environment has several methods
-to read from files, sockets, and external systems using various methods. To 
just read
-data from a socket (useful also for debugging), you can use:
-
-{% highlight scala %}
-StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment
-
-DataStream<String> lines = env.socketTextStream("localhost", 9999)
-{% endhighlight %}
-
-This will give you a DataStream on which you can then apply transformations. 
For
-more information on data sources and input formats, please refer to
-[Data Sources](#data-sources).
-
-Once you have a DataStream you can apply transformations to create a new
-DataStream which you can then write to a file, transform again,
-combine with other DataStreams, or push to an external system.
-You apply transformations by calling
-methods on DataStream with your own custom transformation function. For 
example,
-a map transformation looks like this:
-
-{% highlight scala %}
-val input: DataStream[String] = ...
-
-val mapped = input.map { x => x.toInt }
-{% endhighlight %}
-
-This will create a new DataStream by converting every String in the original
-set to an Integer. For more information and a list of all the transformations,
-please refer to [Transformations](#transformations).
-
-Once you have a DataStream containing your final results, you can push the 
result
-to an external system (HDFS, Kafka, Elasticsearch), write it to a socket, 
write to a file,
-or print it.
-
-{% highlight scala %}
-writeAsText(path: String, ...)
-writeAsCsv(path: String, ...)
-writeToSocket(hostname: String, port: Int, ...)
-
-print()
-
-addSink(...)
-{% endhighlight %}
-
-Once you specified the complete program you need to **trigger the program 
execution** by
-calling `execute` on `StreamExecutionEnvironment`. This will either execute on
-the local machine or submit the program for execution on a cluster, depending 
on the chosen execution environment.
-
-{% highlight scala %}
-env.execute()
-{% endhighlight %}
-
-</div>
-</div>
-
-[Back to top](#top)
-
-DataStream Abstraction
-----------------------
-
-A `DataStream` is a possibly unbounded immutable collection of data items of a 
the same type.
-
-Transformations may return different subtypes of `DataStream` allowing 
specialized transformations.
-For example the `keyBy(…)` method returns a `KeyedDataStream` which is a 
stream of data that
-is logically partitioned by a certain key, and can be further windowed.
-
-[Back to top](#top)
-
-Lazy Evaluation
----------------
-
-All Flink DataStream programs are executed lazily: When the program's main 
method is executed, the data loading
-and transformations do not happen directly. Rather, each operation is created 
and added to the
-program's plan. The operations are actually executed when the execution is 
explicitly triggered by
-an `execute()` call on the `StreamExecutionEnvironment` object. Whether the 
program is executed locally
-or on a cluster depends on the type of `StreamExecutionEnvironment`.
-
-The lazy evaluation lets you construct sophisticated programs that Flink 
executes as one
-holistically planned unit.
-
-[Back to top](#top)
-
-
-Transformations
----------------
-
-Data transformations transform one or more DataStreams into a new DataStream. 
Programs can combine
-multiple transformations into sophisticated topologies.
-
-This section gives a description of all the available transformations.
-
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-
-<br />
-
-<table class="table table-bordered">
-  <thead>
-    <tr>
-      <th class="text-left" style="width: 25%">Transformation</th>
-      <th class="text-center">Description</th>
-    </tr>
-  </thead>
-  <tbody>
-    <tr>
-          <td><strong>Map</strong><br>DataStream &rarr; DataStream</td>
-          <td>
-            <p>Takes one element and produces one element. A map function that 
doubles the values of the input stream:</p>
-    {% highlight java %}
-DataStream<Integer> dataStream = //...
-dataStream.map(new MapFunction<Integer, Integer>() {
-    @Override
-    public Integer map(Integer value) throws Exception {
-        return 2 * value;
-    }
-});
-    {% endhighlight %}
-          </td>
-        </tr>
-
-        <tr>
-          <td><strong>FlatMap</strong><br>DataStream &rarr; DataStream</td>
-          <td>
-            <p>Takes one element and produces zero, one, or more elements. A 
flatmap function that splits sentences to words:</p>
-    {% highlight java %}
-dataStream.flatMap(new FlatMapFunction<String, String>() {
-    @Override
-    public void flatMap(String value, Collector<String> out)
-        throws Exception {
-        for(String word: value.split(" ")){
-            out.collect(word);
-        }
-    }
-});
-    {% endhighlight %}
-          </td>
-        </tr>
-        <tr>
-          <td><strong>Filter</strong><br>DataStream &rarr; DataStream</td>
-          <td>
-            <p>Evaluates a boolean function for each element and retains those 
for which the function returns true.
-            A filter that filters out zero values:
-            </p>
-    {% highlight java %}
-dataStream.filter(new FilterFunction<Integer>() {
-    @Override
-    public boolean filter(Integer value) throws Exception {
-        return value != 0;
-    }
-});
-    {% endhighlight %}
-          </td>
-        </tr>
-        <tr>
-          <td><strong>KeyBy</strong><br>DataStream &rarr; KeyedStream</td>
-          <td>
-            <p>Logically partitions a stream into disjoint partitions, each 
partition containing elements of the same key.
-            Internally, this is implemented with hash partitioning. See <a 
href="#specifying-keys">keys</a> on how to specify keys.
-            This transformation returns a KeyedDataStream.</p>
-    {% highlight java %}
-dataStream.keyBy("someKey") // Key by field "someKey"
-dataStream.keyBy(0) // Key by the first element of a Tuple
-    {% endhighlight %}
-          </td>
-        </tr>
-        <tr>
-          <td><strong>Reduce</strong><br>KeyedStream &rarr; DataStream</td>
-          <td>
-            <p>A "rolling" reduce on a keyed data stream. Combines the current 
element with the last reduced value and
-            emits the new value.
-                    <br/>
-               <br/>
-            A reduce function that creates a stream of partial sums:</p>
-            {% highlight java %}
-keyedStream.reduce(new ReduceFunction<Integer>() {
-    @Override
-    public Integer reduce(Integer value1, Integer value2)
-    throws Exception {
-        return value1 + value2;
-    }
-});
-            {% endhighlight %}
-            </p>
-          </td>
-        </tr>
-        <tr>
-          <td><strong>Fold</strong><br>KeyedStream &rarr; DataStream</td>
-          <td>
-          <p>A "rolling" fold on a keyed data stream with an initial value.
-          Combines the current element with the last folded value and
-          emits the new value.
-          <br/>
-          <br/>
-          <p>A fold function that, when applied on the sequence (1,2,3,4,5),
-          emits the sequence "start-1", "start-1-2", "start-1-2-3", ...</p>
-          {% highlight java %}
-DataStream<String> result =
-  keyedStream.fold("start", new FoldFunction<Integer, String>() {
-    @Override
-    public String fold(String current, Integer value) {
-        return current + "-" + value;
-    }
-  });
-          {% endhighlight %}
-          </p>
-          </td>
-        </tr>
-        <tr>
-          <td><strong>Aggregations</strong><br>KeyedStream &rarr; 
DataStream</td>
-          <td>
-            <p>Rolling aggregations on a keyed data stream. The difference 
between min
-           and minBy is that min returns the minimun value, whereas minBy 
returns
-           the element that has the minimum value in this field (same for max 
and maxBy).</p>
-    {% highlight java %}
-keyedStream.sum(0);
-keyedStream.sum("key");
-keyedStream.min(0);
-keyedStream.min("key");
-keyedStream.max(0);
-keyedStream.max("key");
-keyedStream.minBy(0);
-keyedStream.minBy("key");
-keyedStream.maxBy(0);
-keyedStream.maxBy("key");
-    {% endhighlight %}
-          </td>
-        </tr>
-        <tr>
-          <td><strong>Window</strong><br>KeyedStream &rarr; WindowedStream</td>
-          <td>
-            <p>Windows can be defined on already partitioned KeyedStreams. 
Windows group the data in each
-            key according to some characteristic (e.g., the data that arrived 
within the last 5 seconds).
-            See <a href="#windows">windows</a> for a complete description of 
windows.
-    {% highlight java %}
-dataStream.keyBy(0).window(TumblingTimeWindows.of(Time.of(5, 
TimeUnit.SECONDS))); // Last 5 seconds of data
-    {% endhighlight %}
-        </p>
-          </td>
-        </tr>
-        <tr>
-          <td><strong>WindowAll</strong><br>DataStream &rarr; 
AllWindowedStream</td>
-          <td>
-              <p>Windows can be defined on regular DataStreams. Windows group 
all the stream events
-              according to some characteristic (e.g., the data that arrived 
within the last 5 seconds).
-              See <a href="#windows">windows</a> for a complete description of 
windows.</p>
-              <p><strong>WARNING:</strong> This is in many cases a 
<strong>non-parallel</strong> transformation. All records will be
-               gathered in one task for the windowAll operator.</p>
-  {% highlight java %}
-dataStream.windowAll(TumblingTimeWindows.of(Time.of(5, TimeUnit.SECONDS))); // 
Last 5 seconds of data
-  {% endhighlight %}
-          </td>
-        </tr>
-        <tr>
-          <td><strong>Window Apply</strong><br>WindowedStream &rarr; 
DataStream<br>AllWindowedStream &rarr; DataStream</td>
-          <td>
-            <p>Applies a general function to the window as a whole. Below is a 
function that manually sums the elements of a window.</p>
-            <p><strong>Note:</strong> If you are using a windowAll 
transformation, you need to use an AllWindowFunction instead.</p>
-    {% highlight java %}
-windowedStream.apply (new WindowFunction<Tuple2<String,Integer>, Integer, 
Tuple, Window>() {
-    public void apply (Tuple key,
-            Window window,
-            Iterable<Tuple2<String, Integer>> values,
-            Collector<Integer> out) throws Exception {
-        int sum = 0;
-        for (Tuple2<String,Integer> t: values) {
-            sum += t.f1;
-        }
-        out.collect (new Integer(sum));
-    }
-});
-
-// applying an AllWindowFunction on non-keyed window stream
-allWindowedStream.apply (new AllWindowFunction<Tuple2<String,Integer>, 
Integer, Window>() {
-    public void apply (Window window,
-            Iterable<Tuple2<String, Integer>> values,
-            Collector<Integer> out) throws Exception {
-        int sum = 0;
-        for (Tuple2<String,Integer> t: values) {
-            sum += t.f1;
-        }
-        out.collect (new Integer(sum));
-    }
-});
-    {% endhighlight %}
-          </td>
-        </tr>
-        <tr>
-          <td><strong>Window Reduce</strong><br>WindowedStream &rarr; 
DataStream</td>
-          <td>
-            <p>Applies a functional reduce function to the window and returns 
the reduced value.</p>
-    {% highlight java %}
-windowedStream.reduce (new ReduceFunction<Tuple2<String,Integer>() {
-    public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, 
Tuple2<String, Integer> value2) throws Exception {
-        return new Tuple2<String,Integer>(value1.f0, value1.f1 + value2.f1);
-    }
-};
-    {% endhighlight %}
-          </td>
-        </tr>
-        <tr>
-          <td><strong>Window Fold</strong><br>WindowedStream &rarr; 
DataStream</td>
-          <td>
-            <p>Applies a functional fold function to the window and returns 
the folded value.
-               The example function, when applied on the sequence (1,2,3,4,5),
-               folds the sequence into the string "start-1-2-3-4-5":</p>
-    {% highlight java %}
-windowedStream.fold("start-", new FoldFunction<Integer, String>() {
-    public String fold(String current, Integer value) {
-        return current + "-" + value;
-    }
-};
-    {% endhighlight %}
-          </td>
-        </tr>
-        <tr>
-          <td><strong>Aggregations on windows</strong><br>WindowedStream 
&rarr; DataStream</td>
-          <td>
-            <p>Aggregates the contents of a window. The difference between min
-           and minBy is that min returns the minimun value, whereas minBy 
returns
-           the element that has the minimum value in this field (same for max 
and maxBy).</p>
-    {% highlight java %}
-windowedStream.sum(0);
-windowedStream.sum("key");
-windowedStream.min(0);
-windowedStream.min("key");
-windowedStream.max(0);
-windowedStream.max("key");
-windowedStream.minBy(0);
-windowedStream.minBy("key");
-windowedStream.maxBy(0);
-windowedStream.maxBy("key");
-    {% endhighlight %}
-          </td>
-        </tr>
-        <tr>
-          <td><strong>Union</strong><br>DataStream* &rarr; DataStream</td>
-          <td>
-            <p>Union of two or more data streams creating a new stream 
containing all the elements from all the streams. Node: If you union a data 
stream
-            with itself you will get each element twice in the resulting 
stream.</p>
-    {% highlight java %}
-dataStream.union(otherStream1, otherStream2, ...);
-    {% endhighlight %}
-          </td>
-        </tr>
-        <tr>
-          <td><strong>Window Join</strong><br>DataStream,DataStream &rarr; 
DataStream</td>
-          <td>
-            <p>Join two data streams on a given key and a common window.</p>
-    {% highlight java %}
-dataStream.join(otherStream)
-    .where(0).equalTo(1)
-    .window(TumblingTimeWindows.of(Time.of(3, TimeUnit.SECONDS)))
-    .apply (new JoinFunction () {...});
-    {% endhighlight %}
-          </td>
-        </tr>
-        <tr>
-          <td><strong>Window CoGroup</strong><br>DataStream,DataStream &rarr; 
DataStream</td>
-          <td>
-            <p>Cogroups two data streams on a given key and a common 
window.</p>
-    {% highlight java %}
-dataStream.coGroup(otherStream)
-    .where(0).equalTo(1)
-    .window(TumblingTimeWindows.of(Time.of(3, TimeUnit.SECONDS)))
-    .apply (new CoGroupFunction () {...});
-    {% endhighlight %}
-          </td>
-        </tr>
-        <tr>
-          <td><strong>Connect</strong><br>DataStream,DataStream &rarr; 
ConnectedStreams</td>
-          <td>
-            <p>"Connects" two data streams retaining their types. Connect 
allowing for shared state between
-            the two streams.</p>
-    {% highlight java %}
-DataStream<Integer> someStream = //...
-DataStream<String> otherStream = //...
-
-ConnectedStreams<Integer, String> connectedStreams = 
someStream.connect(otherStream);
-    {% endhighlight %}
-          </td>
-        </tr>
-        <tr>
-          <td><strong>CoMap, CoFlatMap</strong><br>ConnectedStreams &rarr; 
DataStream</td>
-          <td>
-            <p>Similar to map and flatMap on a connected data stream</p>
-    {% highlight java %}
-connectedStreams.map(new CoMapFunction<Integer, String, Boolean>() {
-    @Override
-    public Boolean map1(Integer value) {
-        return true;
-    }
-
-    @Override
-    public Boolean map2(String value) {
-        return false;
-    }
-});
-connectedStreams.flatMap(new CoFlatMapFunction<Integer, String, String>() {
-
-   @Override
-   public void flatMap1(Integer value, Collector<String> out) {
-       out.collect(value.toString());
-   }
-
-   @Override
-   public void flatMap2(String value, Collector<String> out) {
-       for (String word: value.split(" ")) {
-         out.collect(word);
-       }
-   }
-});
-    {% endhighlight %}
-          </td>
-        </tr>
-        <tr>
-          <td><strong>Split</strong><br>DataStream &rarr; SplitStream</td>
-          <td>
-            <p>
-                Split the stream into two or more streams according to some 
criterion.
-                {% highlight java %}
-SplitStream<Integer> split = someDataStream.split(new 
OutputSelector<Integer>() {
-    @Override
-    public Iterable<String> select(Integer value) {
-        List<String> output = new ArrayList<String>();
-        if (value % 2 == 0) {
-            output.add("even");
-        }
-        else {
-            output.add("odd");
-        }
-        return output;
-    }
-});
-                {% endhighlight %}
-            </p>
-          </td>
-        </tr>
-        <tr>
-          <td><strong>Select</strong><br>SplitStream &rarr; DataStream</td>
-          <td>
-            <p>
-                Select one or more streams from a split stream.
-                {% highlight java %}
-SplitStream<Integer> split;
-DataStream<Integer> even = split.select("even");
-DataStream<Integer> odd = split.select("odd");
-DataStream<Integer> all = split.select("even","odd");
-                {% endhighlight %}
-            </p>
-          </td>
-        </tr>
-        <tr>
-          <td><strong>Iterate</strong><br>DataStream &rarr; IterativeStream 
&rarr; DataStream</td>
-          <td>
-            <p>
-                Creates a "feedback" loop in the flow, by redirecting the 
output of one operator
-                to some previous operator. This is especially useful for 
defining algorithms that
-                continuously update a model. The following code starts with a 
stream and applies
-               the iteration body continuously. Elements that are greater than 
0 are sent back
-               to the feedback channel, and the rest of the elements are 
forwarded downstream.
-               See <a href="#iterations">iterations</a> for a complete 
description.
-                {% highlight java %}
-IterativeStream<Long> iteration = initialStream.iterate();
-DataStream<Long> iterationBody = iteration.map (/*do something*/);
-DataStream<Long> feedback = iterationBody.filter(new FilterFunction<Long>(){
-    @Override
-    public boolean filter(Integer value) throws Exception {
-        return value > 0;
-    }
-});
-iteration.closeWith(feedback);
-DataStream<Long> output = iterationBody.filter(new FilterFunction<Long>(){
-    @Override
-    public boolean filter(Integer value) throws Exception {
-        return value <= 0;
-    }
-});
-                {% endhighlight %}
-            </p>
-          </td>
-        </tr>
-        <tr>
-          <td><strong>Extract Timestamps</strong><br>DataStream &rarr; 
DataStream</td>
-          <td>
-            <p>
-                Extracts timestamps from records in order to work with windows
-                that use event time semantics. See <a 
href="#working-with-time">working with time</a>.
-                {% highlight java %}
-stream.assignTimestamps (new TimeStampExtractor() {...});
-                {% endhighlight %}
-            </p>
-          </td>
-        </tr>
-  </tbody>
-</table>
-
-</div>
-
-<div data-lang="scala" markdown="1">
-
-<br />
-
-<table class="table table-bordered">
-  <thead>
-    <tr>
-      <th class="text-left" style="width: 25%">Transformation</th>
-      <th class="text-center">Description</th>
-    </tr>
-  </thead>
-  <tbody>
-    <tr>
-          <td><strong>Map</strong><br>DataStream &rarr; DataStream</td>
-          <td>
-            <p>Takes one element and produces one element. A map function that 
doubles the values of the input stream:</p>
-    {% highlight scala %}
-dataStream.map { x => x * 2 }
-    {% endhighlight %}
-          </td>
-        </tr>
-
-        <tr>
-          <td><strong>FlatMap</strong><br>DataStream &rarr; DataStream</td>
-          <td>
-            <p>Takes one element and produces zero, one, or more elements. A 
flatmap function that splits sentences to words:</p>
-    {% highlight scala %}
-dataStream.flatMap { str => str.split(" ") }
-    {% endhighlight %}
-          </td>
-        </tr>
-        <tr>
-          <td><strong>Filter</strong><br>DataStream &rarr; DataStream</td>
-          <td>
-            <p>Evaluates a boolean function for each element and retains those 
for which the function returns true.
-            A filter that filters out zero values:
-            </p>
-    {% highlight scala %}
-dataStream.filter { _ != 0 }
-    {% endhighlight %}
-          </td>
-        </tr>
-        <tr>
-          <td><strong>KeyBy</strong><br>DataStream &rarr; KeyedStream</td>
-          <td>
-            <p>Logically partitions a stream into disjoint partitions, each 
partition containing elements of the same key.
-            Internally, this is implemented with hash partitioning. See <a 
href="#specifying-keys">keys</a> on how to specify keys.
-            This transformation returns a KeyedDataStream.</p>
-    {% highlight scala %}
-dataStream.keyBy("someKey") // Key by field "someKey"
-dataStream.keyBy(0) // Key by the first element of a Tuple
-    {% endhighlight %}
-          </td>
-        </tr>
-        <tr>
-          <td><strong>Reduce</strong><br>KeyedStream &rarr; DataStream</td>
-          <td>
-            <p>A "rolling" reduce on a keyed data stream. Combines the current 
element with the last reduced value and
-            emits the new value.
-                    <br/>
-               <br/>
-            A reduce function that creates a stream of partial sums:</p>
-            {% highlight scala %}
-keyedStream.reduce { _ + _ }
-            {% endhighlight %}
-            </p>
-          </td>
-        </tr>
-        <tr>
-          <td><strong>Fold</strong><br>KeyedStream &rarr; DataStream</td>
-          <td>
-          <p>A "rolling" fold on a keyed data stream with an initial value.
-          Combines the current element with the last folded value and
-          emits the new value.
-          <br/>
-          <br/>
-          <p>A fold function that, when applied on the sequence (1,2,3,4,5),
-          emits the sequence "start-1", "start-1-2", "start-1-2-3", ...</p>
-          {% highlight scala %}
-val result: DataStream[String] =
-    keyedStream.fold("start", (str, i) => { str + "-" + i })
-          {% endhighlight %}
-          </p>
-          </td>
-        </tr>
-        <tr>
-          <td><strong>Aggregations</strong><br>KeyedStream &rarr; 
DataStream</td>
-          <td>
-            <p>Rolling aggregations on a keyed data stream. The difference 
between min
-           and minBy is that min returns the minimun value, whereas minBy 
returns
-           the element that has the minimum value in this field (same for max 
and maxBy).</p>
-    {% highlight scala %}
-keyedStream.sum(0)
-keyedStream.sum("key")
-keyedStream.min(0)
-keyedStream.min("key")
-keyedStream.max(0)
-keyedStream.max("key")
-keyedStream.minBy(0)
-keyedStream.minBy("key")
-keyedStream.maxBy(0)
-keyedStream.maxBy("key")
-    {% endhighlight %}
-          </td>
-        </tr>
-        <tr>
-          <td><strong>Window</strong><br>KeyedStream &rarr; WindowedStream</td>
-          <td>
-            <p>Windows can be defined on already partitioned KeyedStreams. 
Windows group the data in each
-            key according to some characteristic (e.g., the data that arrived 
within the last 5 seconds).
-            See <a href="#windows">windows</a> for a description of windows.
-    {% highlight scala %}
-dataStream.keyBy(0).window(TumblingTimeWindows.of(Time.of(5, 
TimeUnit.SECONDS))) // Last 5 seconds of data
-    {% endhighlight %}
-        </p>
-          </td>
-        </tr>
-        <tr>
-          <td><strong>WindowAll</strong><br>DataStream &rarr; 
AllWindowedStream</td>
-          <td>
-              <p>Windows can be defined on regular DataStreams. Windows group 
all the stream events
-              according to some characteristic (e.g., the data that arrived 
within the last 5 seconds).
-              See <a href="#windows">windows</a> for a complete description of 
windows.</p>
-              <p><strong>WARNING:</strong> This is in many cases a 
<strong>non-parallel</strong> transformation. All records will be
-               gathered in one task for the windowAll operator.</p>
-  {% highlight scala %}
-dataStream.windowAll(TumblingTimeWindows.of(Time.of(5, TimeUnit.SECONDS))) // 
Last 5 seconds of data
-  {% endhighlight %}
-          </td>
-        </tr>
-        <tr>
-          <td><strong>Window Apply</strong><br>WindowedStream &rarr; 
DataStream<br>AllWindowedStream &rarr; DataStream</td>
-          <td>
-            <p>Applies a general function to the window as a whole. Below is a 
function that manually sums the elements of a window.</p>
-            <p><strong>Note:</strong> If you are using a windowAll 
transformation, you need to use an AllWindowFunction instead.</p>
-    {% highlight scala %}
-windowedStream.apply { WindowFunction }
-
-// applying an AllWindowFunction on non-keyed window stream
-allWindowedStream.apply { AllWindowFunction }
-
-    {% endhighlight %}
-          </td>
-        </tr>
-        <tr>
-          <td><strong>Window Reduce</strong><br>WindowedStream &rarr; 
DataStream</td>
-          <td>
-            <p>Applies a functional reduce function to the window and returns 
the reduced value.</p>
-    {% highlight scala %}
-windowedStream.reduce { _ + _ }
-    {% endhighlight %}
-          </td>
-        </tr>
-        <tr>
-          <td><strong>Window Fold</strong><br>WindowedStream &rarr; 
DataStream</td>
-          <td>
-            <p>Applies a functional fold function to the window and returns 
the folded value.
-               The example function, when applied on the sequence (1,2,3,4,5),
-               folds the sequence into the string "start-1-2-3-4-5":</p>
-          {% highlight scala %}
-val result: DataStream[String] =
-    windowedStream.fold("start", (str, i) => { str + "-" + i })
-          {% endhighlight %}
-          </td>
-       </tr>
-        <tr>
-          <td><strong>Aggregations on windows</strong><br>WindowedStream 
&rarr; DataStream</td>
-          <td>
-            <p>Aggregates the contents of a window. The difference between min
-           and minBy is that min returns the minimun value, whereas minBy 
returns
-           the element that has the minimum value in this field (same for max 
and maxBy).</p>
-    {% highlight scala %}
-windowedStream.sum(0)
-windowedStream.sum("key")
-windowedStream.min(0)
-windowedStream.min("key")
-windowedStream.max(0)
-windowedStream.max("key")
-windowedStream.minBy(0)
-windowedStream.minBy("key")
-windowedStream.maxBy(0)
-windowedStream.maxBy("key")
-    {% endhighlight %}
-          </td>
-        </tr>
-        <tr>
-          <td><strong>Union</strong><br>DataStream* &rarr; DataStream</td>
-          <td>
-            <p>Union of two or more data streams creating a new stream 
containing all the elements from all the streams. Node: If you union a data 
stream
-            with itself you will get each element twice in the resulting 
stream.</p>
-    {% highlight scala %}
-dataStream.union(otherStream1, otherStream2, ...)
-    {% endhighlight %}
-          </td>
-        </tr>
-        <tr>
-          <td><strong>Window Join</strong><br>DataStream,DataStream &rarr; 
DataStream</td>
-          <td>
-            <p>Join two data streams on a given key and a common window.</p>
-    {% highlight scala %}
-dataStream.join(otherStream)
-    .where(0).equalTo(1)
-    .window(TumblingTimeWindows.of(Time.of(3, TimeUnit.SECONDS)))
-    .apply { ... }
-    {% endhighlight %}
-          </td>
-        </tr>
-        <tr>
-          <td><strong>Window CoGroup</strong><br>DataStream,DataStream &rarr; 
DataStream</td>
-          <td>
-            <p>Cogroups two data streams on a given key and a common 
window.</p>
-    {% highlight scala %}
-dataStream.coGroup(otherStream)
-    .where(0).equalTo(1)
-    .window(TumblingTimeWindows.of(Time.of(3, TimeUnit.SECONDS)))
-    .apply {}
-    {% endhighlight %}
-          </td>
-        </tr>
-        <tr>
-          <td><strong>Connect</strong><br>DataStream,DataStream &rarr; 
ConnectedStreams</td>
-          <td>
-            <p>"Connects" two data streams retaining their types, allowing for 
shared state between
-            the two streams.</p>
-    {% highlight scala %}
-someStream : DataStream[Int] = ...
-otherStream : DataStream[String] = ...
-
-val connectedStreams = someStream.connect(otherStream)
-    {% endhighlight %}
-          </td>
-        </tr>
-        <tr>
-          <td><strong>CoMap, CoFlatMap</strong><br>ConnectedStreams &rarr; 
DataStream</td>
-          <td>
-            <p>Similar to map and flatMap on a connected data stream</p>
-    {% highlight scala %}
-connectedStreams.map(
-    (_ : Int) => true,
-    (_ : String) => false
-)
-connectedStreams.flatMap(
-    (_ : Int) => true,
-    (_ : String) => false
-)
-    {% endhighlight %}
-          </td>
-        </tr>
-        <tr>
-          <td><strong>Split</strong><br>DataStream &rarr; SplitStream</td>
-          <td>
-            <p>
-                Split the stream into two or more streams according to some 
criterion.
-                {% highlight scala %}
-val split = someDataStream.split(
-  (num: Int) =>
-    (num % 2) match {
-      case 0 => List("even")
-      case 1 => List("odd")
-    }
-)
-                {% endhighlight %}
-            </p>
-          </td>
-        </tr>
-        <tr>
-          <td><strong>Select</strong><br>SplitStream &rarr; DataStream</td>
-          <td>
-            <p>
-                Select one or more streams from a split stream.
-                {% highlight scala %}
-
-val even = split select "even"
-val odd = split select "odd"
-val all = split.select("even","odd")
-                {% endhighlight %}
-            </p>
-          </td>
-        </tr>
-        <tr>
-          <td><strong>Iterate</strong><br>DataStream &rarr; IterativeStream  
&rarr; DataStream</td>
-          <td>
-            <p>
-                Creates a "feedback" loop in the flow, by redirecting the 
output of one operator
-                to some previous operator. This is especially useful for 
defining algorithms that
-                continuously update a model. The following code starts with a 
stream and applies
-               the iteration body continuously. Elements that are greater than 
0 are sent back
-               to the feedback channel, and the rest of the elements are 
forwarded downstream.
-               See <a href="#iterations">iterations</a> for a complete 
description.
-                {% highlight java %}
-initialStream. iterate {
-  iteration => {
-    val iterationBody = iteration.map {/*do something*/}
-    (iterationBody.filter(_ > 0), iterationBody.filter(_ <= 0))
-  }
-}
-IterativeStream<Long> iteration = initialStream.iterate();
-DataStream<Long> iterationBody = iteration.map (/*do something*/);
-DataStream<Long> feedback = iterationBody.filter ( _ > 0);
-iteration.closeWith(feedback);
-                {% endhighlight %}
-            </p>
-          </td>
-        </tr>
-        <tr>
-          <td><strong>Extract Timestamps</strong><br>DataStream &rarr; 
DataStream</td>
-          <td>
-            <p>
-                Extracts timestamps from records in order to work with windows
-                that use event time semantics.
-                See <a href="#working-with-time">working with time</a>.
-                {% highlight scala %}
-stream.assignTimestamps { timestampExtractor }
-                {% endhighlight %}
-            </p>
-          </td>
-        </tr>
-  </tbody>
-</table>
-
-</div>
-</div>
-
-The following transformations are available on data streams of Tuples:
-
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-
-<br />
-
-<table class="table table-bordered">
-  <thead>
-    <tr>
-      <th class="text-left" style="width: 20%">Transformation</th>
-      <th class="text-center">Description</th>
-    </tr>
-  </thead>
-  <tbody>
-   <tr>
-      <td><strong>Project</strong><br>DataStream &rarr; DataStream</td>
-      <td>
-        <p>Selects a subset of fields from the tuples
-{% highlight java %}
-DataStream<Tuple3<Integer, Double, String>> in = // [...]
-DataStream<Tuple2<String, Integer>> out = in.project(2,0);
-{% endhighlight %}
-        </p>
-      </td>
-    </tr>
-  </tbody>
-</table>
-
-</div>
-
-<div data-lang="scala" markdown="1">
-
-<br />
-
-<table class="table table-bordered">
-  <thead>
-    <tr>
-      <th class="text-left" style="width: 20%">Transformation</th>
-      <th class="text-center">Description</th>
-    </tr>
-  </thead>
-  <tbody>
-   <tr>
-      <td><strong>Project</strong><br>DataStream &rarr; DataStream</td>
-      <td>
-        <p>Selects a subset of fields from the tuples
-{% highlight scala %}
-val in : DataStream[(Int,Double,String)] = // [...]
-val out = in.project(2,0)
-{% endhighlight %}
-        </p>
-      </td>
-    </tr>
-  </tbody>
-</table>
-
-</div>
-</div>
-
-
-### Physical partitioning
-
-Flink also gives low-level control (if desired) on the exact stream 
partitioning after a transformation,
-via the following functions.
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-
-<br />
-
-<table class="table table-bordered">
-  <thead>
-    <tr>
-      <th class="text-left" style="width: 20%">Transformation</th>
-      <th class="text-center">Description</th>
-    </tr>
-  </thead>
-  <tbody>
-   <tr>
-      <td><strong>Hash partitioning</strong><br>DataStream &rarr; 
DataStream</td>
-      <td>
-        <p>
-            Identical to keyBy but returns a DataStream instead of a 
KeyedStream.
-            {% highlight java %}
-dataStream.partitionByHash("someKey");
-dataStream.partitionByHash(0);
-            {% endhighlight %}
-        </p>
-      </td>
-    </tr>
-   <tr>
-      <td><strong>Custom partitioning</strong><br>DataStream &rarr; 
DataStream</td>
-      <td>
-        <p>
-            Uses a user-defined Partitioner to select the target task for each 
element.
-            {% highlight java %}
-dataStream.partitionCustom(new Partitioner(){...}, "someKey");
-dataStream.partitionCustom(new Partitioner(){...}, 0);
-            {% endhighlight %}
-        </p>
-      </td>
-    </tr>
-   <tr>
-     <td><strong>Random partitioning</strong><br>DataStream &rarr; 
DataStream</td>
-     <td>
-       <p>
-            Partitions elements randomly according to a uniform distribution.
-            {% highlight java %}
-dataStream.partitionRandom();
-            {% endhighlight %}
-       </p>
-     </td>
-   </tr>
-   <tr>
-      <td><strong>Rebalancing (Round-robin 
partitioning)</strong><br>DataStream &rarr; DataStream</td>
-      <td>
-        <p>
-            Partitions elements round-robin, creating equal load per 
partition. Useful for performance
-            optimization in the presence of data skew.
-            {% highlight java %}
-dataStream.rebalance();
-            {% endhighlight %}
-        </p>
-      </td>
-    </tr>
-   <tr>
-      <td><strong>Broadcasting</strong><br>DataStream &rarr; DataStream</td>
-      <td>
-        <p>
-            Broadcasts elements to every partition.
-            {% highlight java %}
-dataStream.broadcast();
-            {% endhighlight %}
-        </p>
-      </td>
-    </tr>
-  </tbody>
-</table>
-
-</div>
-
-<div data-lang="scala" markdown="1">
-
-<br />
-
-<table class="table table-bordered">
-  <thead>
-    <tr>
-      <th class="text-left" style="width: 20%">Transformation</th>
-      <th class="text-center">Description</th>
-    </tr>
-  </thead>
-  <tbody>
-   <tr>
-      <td><strong>Hash partitioning</strong><br>DataStream &rarr; 
DataStream</td>
-      <td>
-        <p>
-            Identical to keyBy but returns a DataStream instead of a 
KeyedStream.
-            {% highlight scala %}
-dataStream.partitionByHash("someKey")
-dataStream.partitionByHash(0)
-            {% endhighlight %}
-        </p>
-      </td>
-    </tr>
-   <tr>
-      <td><strong>Custom partitioning</strong><br>DataStream &rarr; 
DataStream</td>
-      <td>
-        <p>
-            Uses a user-defined Partitioner to select the target task for each 
element.
-            {% highlight scala %}
-dataStream.partitionCustom(partitioner, "someKey")
-dataStream.partitionCustom(partitioner, 0)
-            {% endhighlight %}
-        </p>
-      </td>
-    </tr>
-   <tr>
-     <td><strong>Random partitioning</strong><br>DataStream &rarr; 
DataStream</td>
-     <td>
-       <p>
-            Partitions elements randomly according to a uniform distribution.
-            {% highlight scala %}
-dataStream.partitionRandom()
-            {% endhighlight %}
-       </p>
-     </td>
-   </tr>
-   <tr>
-      <td><strong>Rebalancing (Round-robin 
partitioning)</strong><br>DataStream &rarr; DataStream</td>
-      <td>
-        <p>
-            Partitions elements round-robin, creating equal load per 
partition. Useful for performance
-            optimization in the presence of data skew.
-            {% highlight scala %}
-dataStream.rebalance()
-            {% endhighlight %}
-        </p>
-      </td>
-    </tr>
-   <tr>
-      <td><strong>Broadcasting</strong><br>DataStream &rarr; DataStream</td>
-      <td>
-        <p>
-            Broadcasts elements to every partition.
-            {% highlight scala %}
-dataStream.broadcast()
-            {% endhighlight %}
-        </p>
-      </td>
-    </tr>
-  </tbody>
-</table>
-
-</div>
-</div>
-
-### Task chaining and resource groups
-
-Chaining two subsequent transformations means co-locating them within the same 
thread for better
-performance. Flink by default chains operators if this is possible (e.g., two 
subsequent map
-transformations). The API gives fine-grained control over chaining if desired:
-
-Use `StreamExecutionEnvironment.disableOperatorChaining()` if you want to 
disable chaining in
-the whole job. For more fine grained control, the following functions are 
available. Note that
-these functions can only be used right after a DataStream transformation as 
they refer to the
-previous transformation. For example, you can use 
`someStream.map(...).startNewChain()`, but
-you cannot use `someStream.startNewChain()`.
-
-A resource group is a slot in Flink, see
-[slots]({{site.baseurl}}/setup/config.html#configuring-taskmanager-processing-slots).
 You can
-manually isolate operators in separate slots if desired.
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-
-<br />
-
-<table class="table table-bordered">
-  <thead>
-    <tr>
-      <th class="text-left" style="width: 20%">Transformation</th>
-      <th class="text-center">Description</th>
-    </tr>
-  </thead>
-  <tbody>
-   <tr>
-      <td>Start new chain</td>
-      <td>
-        <p>Begin a new chain, starting with this operator. The two
-       mappers will be chained, and filter will not be chained to
-       the first mapper.
-{% highlight java %}
-someStream.filter(...).map(...).startNewChain().map(...);
-{% endhighlight %}
-        </p>
-      </td>
-    </tr>
-   <tr>
-      <td>Disable chaining</td>
-      <td>
-        <p>Do not chain the map operator
-{% highlight java %}
-someStream.map(...).disableChaining();
-{% endhighlight %}
-        </p>
-      </td>
-    </tr>
-   <tr>
-      <td>Start a new resource group</td>
-      <td>
-        <p>Start a new resource group containing the filter and the subsequent 
operators.
-{% highlight java %}
-someStream.filter(...).startNewResourceGroup();
-{% endhighlight %}
-        </p>
-      </td>
-    </tr>
-   <tr>
-      <td>Isolate resources</td>
-      <td>
-        <p>Isolate the operator in its own slot.
-{% highlight java %}
-someStream.map(...).isolateResources();
-{% endhighlight %}
-        </p>
-      </td>
-    </tr>
-  </tbody>
-</table>
-
-</div>
-
-<div data-lang="scala" markdown="1">
-
-<br />
-
-<table class="table table-bordered">
-  <thead>
-    <tr>
-      <th class="text-left" style="width: 20%">Transformation</th>
-      <th class="text-center">Description</th>
-    </tr>
-  </thead>
-  <tbody>
-   <tr>
-      <td>Start new chain</td>
-      <td>
-        <p>Begin a new chain, starting with this operator. The two
-       mappers will be chained, and filter will not be chained to
-       the first mapper.
-{% highlight scala %}
-someStream.filter(...).map(...).startNewChain().map(...)
-{% endhighlight %}
-        </p>
-      </td>
-    </tr>
-   <tr>
-      <td>Disable chaining</td>
-      <td>
-        <p>Do not chain the map operator
-{% highlight scala %}
-someStream.map(...).disableChaining()
-{% endhighlight %}
-        </p>
-      </td>
-    </tr>
-   <tr>
-      <td>Start a new resource group</td>
-      <td>
-        <p>Start a new resource group containing the map and the subsequent 
operators.
-{% highlight scala %}
-someStream.filter(...).startNewResourceGroup()
-{% endhighlight %}
-        </p>
-      </td>
-    </tr>
-   <tr>
-      <td>Isolate resources</td>
-      <td>
-        <p>Isolate the operator in its own slot.
-{% highlight scala %}
-someStream.map(...).isolateResources()
-{% endhighlight %}
-        </p>
-      </td>
-    </tr>
-  </tbody>
-</table>
-
-</div>
-</div>
-
-
-[Back to top](#top)
-
-Specifying Keys
-----------------
-
-The `keyBy` transformation requires that a key is defined on
-its argument DataStream.
-
-A DataStream is keyed as
-{% highlight java %}
-DataStream<...> input = // [...]
-DataStream<...> windowed = input
-       .keyBy(/*define key here*/)
-       .window(/*define window here*/);
-{% endhighlight %}
-
-The data model of Flink is not based on key-value pairs. Therefore,
-you do not need to physically pack the data stream types into keys and
-values. Keys are "virtual": they are defined as functions over the
-actual data to guide the grouping operator.
-
-See [the relevant section of the DataSet API 
documentation](programming_guide.html#specifying-keys) on how to specify keys.
-Just replace `DataSet` with `DataStream`, and `groupBy` with `keyBy`.
-
-
-
-Passing Functions to Flink
---------------------------
-
-Some transformations take user-defined functions as arguments.
-
-See [the relevant section of the DataSet API 
documentation](programming_guide.html#passing-functions-to-flink).
-
-
-[Back to top](#top)
-
-
-Data Types
-----------
-
-Flink places some restrictions on the type of elements that are used in 
DataStreams and in results
-of transformations. The reason for this is that the system analyzes the types 
to determine
-efficient execution strategies.
-
-See [the relevant section of the DataSet API 
documentation](programming_guide.html#data-types).
-
-[Back to top](#top)
-
-
-Data Sources
-------------
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-
-<br />
-
-Sources can by created by using 
`StreamExecutionEnvironment.addSource(sourceFunction)`.
-You can either use one of the source functions that come with Flink or write a 
custom source
-by implementing the `SourceFunction` for non-parallel sources, or by 
implementing the
-`ParallelSourceFunction` interface or extending `RichParallelSourceFunction` 
for parallel sources.
-
-There are several predefined stream sources accessible from the 
`StreamExecutionEnvironment`:
-
-File-based:
-
-- `readTextFile(path)` / `TextInputFormat` - Reads files line wise and returns 
them as Strings.
-
-- `readTextFileWithValue(path)` / `TextValueInputFormat` - Reads files line 
wise and returns them as
-  StringValues. StringValues are mutable strings.
-
-- `readFile(path)` / Any input format - Reads files as dictated by the input 
format.
-
-- `readFileOfPrimitives(path, Class)` / `PrimitiveInputFormat` - Parses files 
of new-line (or another char sequence) delimited primitive data types such as 
`String` or `Integer`.
-
-- `readFileStream` - create a stream by appending elements when there are 
changes to a file
-
-Socket-based:
-
-- `socketTextStream` - Reads from a socket. Elements can be separated by a 
delimiter.
-
-Collection-based:
-
-- `fromCollection(Collection)` - Creates a data stream from the Java 
Java.util.Collection. All elements
-  in the collection must be of the same type.
-
-- `fromCollection(Iterator, Class)` - Creates a data stream from an iterator. 
The class specifies the
-  data type of the elements returned by the iterator.
-
-- `fromElements(T ...)` - Creates a data stream from the given sequence of 
objects. All objects must be
-  of the same type.
-
-- `fromParallelCollection(SplittableIterator, Class)` - Creates a data stream 
from an iterator, in
-  parallel. The class specifies the data type of the elements returned by the 
iterator.
-
-- `generateSequence(from, to)` - Generates the sequence of numbers in the 
given interval, in
-  parallel.
-
-Custom:
-
-- `addSource` - Attache a new source function. For example, to read from 
Apache Kafka you can use
-    `addSource(new FlinkKafkaConsumer082<>(...))`. See 
[connectors](#connectors) for more details.
-
-</div>
-
-<div data-lang="scala" markdown="1">
-
-<br />
-
-Sources can by created by using 
`StreamExecutionEnvironment.addSource(sourceFunction)`.
-You can either use one of the source functions that come with Flink or write a 
custom source
-by implementing the `SourceFunction` for non-parallel sources, or by 
implementing the
-`ParallelSourceFunction` interface or extending `RichParallelSourceFunction` 
for parallel sources.
-
-There are several predefined stream sources accessible from the 
`StreamExecutionEnvironment`:
-
-File-based:
-
-- `readTextFile(path)` / `TextInputFormat` - Reads files line wise and returns 
them as Strings.
-
-- `readTextFileWithValue(path)` / `TextValueInputFormat` - Reads files line 
wise and returns them as
-  StringValues. StringValues are mutable strings.
-
-- `readFile(path)` / Any input format - Reads files as dictated by the input 
format.
-
-- `readFileOfPrimitives(path, Class)` / `PrimitiveInputFormat` - Parses files 
of new-line (or another char sequence) delimited primitive data types such as 
`String` or `Integer`.
-
-- `readFileStream` - create a stream by appending elements when there are 
changes to a file
-
-Socket-based:
-
-- `socketTextStream` - Reads from a socket. Elements can be separated by a 
delimiter.
-
-Collection-based:
-
-- `fromCollection(Seq)` - Creates a data stream from the Java 
Java.util.Collection. All elements
-  in the collection must be of the same type.
-
-- `fromCollection(Iterator)` - Creates a data stream from an iterator. The 
class specifies the
-  data type of the elements returned by the iterator.
-
-- `fromElements(elements: _*)` - Creates a data stream from the given sequence 
of objects. All objects must be
-  of the same type.
-
-- `fromParallelCollection(SplittableIterator)` - Creates a data stream from an 
iterator, in
-  parallel. The class specifies the data type of the elements returned by the 
iterator.
-
-- `generateSequence(from, to)` - Generates the sequence of numbers in the 
given interval, in
-  parallel.
-
-Custom:
-
-- `addSource` - Attache a new source function. For example, to read from 
Apache Kafka you can use
-    `addSource(new FlinkKafkaConsumer082<>(...))`. See 
[connectors](#connectors) for more details.
-
-</div>
-</div>
-
-[Back to top](#top)
-
-
-Execution Configuration
-----------
-
-The `StreamExecutionEnvironment` also contains the `ExecutionConfig` which 
allows to set job specific configuration values for the runtime.
-
-See [the relevant section of the DataSet API 
documentation](programming_guide.html#execution-configuration).
-
-Parameters in the `ExecutionConfig` that pertain specifically to the 
DataStream API are:
-
-- `enableTimestamps()` / **`disableTimestamps()`**: Attach a timestamp to each 
event emitted from a source.
-    `areTimestampsEnabled()` returns the current value.
-
-- `setAutoWatermarkInterval(long milliseconds)`: Set the interval for 
automatic watermark emission. You can
-    get the current value with `long getAutoWatermarkInterval()`
-
-[Back to top](#top)
-
-Data Sinks
-----------
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-
-<br />
-
-Data sinks consume DataStreams and forward them to files, sockets, external 
systems, or print them.
-Flink comes with a variety of built-in output formats that are encapsulated 
behind operations on the
-DataStreams:
-
-- `writeAsText()` / `TextOuputFormat` - Writes elements line-wise as Strings. 
The Strings are
-  obtained by calling the *toString()* method of each element.
-
-- `writeAsCsv(...)` / `CsvOutputFormat` - Writes tuples as comma-separated 
value files. Row and field
-  delimiters are configurable. The value for each field comes from the 
*toString()* method of the objects.
-
-- `print()` / `printToErr()`  - Prints the *toString()* value
-of each element on the standard out / strandard error stream. Optionally, a 
prefix (msg) can be provided which is
-prepended to the output. This can help to distinguish between different calls 
to *print*. If the parallelism is
-greater than 1, the output will also be prepended with the identifier of the 
task which produced the output.
-
-- `write()` / `FileOutputFormat` - Method and base class for custom file 
outputs. Supports
-  custom object-to-bytes conversion.
-
-- `writeToSocket` - Writes elements to a socket according to a 
`SerializationSchema`
-
-- `addSink` - Invokes a custom sink function. Flink comes bundled with 
connectors to other systems (such as
-    Apache Kafka) that are implemented as sink functions.
-
-</div>
-<div data-lang="scala" markdown="1">
-
-<br />
-
-Data sinks consume DataStreams and forward them to files, sockets, external 
systems, or print them.
-Flink comes with a variety of built-in output formats that are encapsulated 
behind operations on the
-DataStreams:
-
-- `writeAsText()` / `TextOuputFormat` - Writes elements line-wise as Strings. 
The Strings are
-  obtained by calling the *toString()* method of each element.
-
-- `writeAsCsv(...)` / `CsvOutputFormat` - Writes tuples as comma-separated 
value files. Row and field
-  delimiters are configurable. The value for each field comes from the 
*toString()* method of the objects.
-
-- `print()` / `printToErr()`  - Prints the *toString()* value
-of each element on the standard out / strandard error stream. Optionally, a 
prefix (msg) can be provided which is
-prepended to the output. This can help to distinguish between different calls 
to *print*. If the parallelism is
-greater than 1, the output will also be prepended with the identifier of the 
task which produced the output.
-
-- `write()` / `FileOutputFormat` - Method and base class for custom file 
outputs. Supports
-  custom object-to-bytes conversion.
-
-- `writeToSocket` - Writes elements to a socket according to a 
`SerializationSchema`
-
-- `addSink` - Invokes a custom sink function. Flink comes bundled with 
connectors to other systems (such as
-    Apache Kafka) that are implemented as sink functions.
-
-</div>
-</div>
-
-
-[Back to top](#top)
-
-Debugging
----------
-
-Before running a streaming program in a distributed cluster, it is a good
-idea to make sure that the implemented algorithm works as desired. Hence, 
implementing data analysis
-programs is usually an incremental process of checking results, debugging, and 
improving.
-
-Flink provides features to significantly ease the development process of data 
analysis
-programs by supporting local debugging from within an IDE, injection of test 
data, and collection of
-result data. This section give some hints how to ease the development of Flink 
programs.
-
-### Local Execution Environment
-
-A `LocalStreamEnvironment` starts a Flink system within the same JVM process 
it was created in. If you
-start the LocalEnvironement from an IDE, you can set breakpoints in your code 
and easily debug your
-program.
-
-A LocalEnvironment is created and used as follows:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createLocalEnvironment();
-
-DataStream<String> lines = env.addSource(/* some source */);
-// build your program
-
-env.execute();
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-
-{% highlight scala %}
-val env = StreamExecutionEnvironment.createLocalEnvironment()
-
-val lines = env.addSource(/* some source */)
-// build your program
-
-env.execute()
-{% endhighlight %}
-</div>
-</div>
-
-### Collection Data Sources
-
-Flink provides special data sources which are backed
-by Java collections to ease testing. Once a program has been tested, the 
sources and sinks can be
-easily replaced by sources and sinks that read from / write to external 
systems.
-
-Collection data sources can be used as follows:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createLocalEnvironment();
-
-// Create a DataStream from a list of elements
-DataStream<Integer> myInts = env.fromElements(1, 2, 3, 4, 5);
-
-// Create a DataStream from any Java collection
-List<Tuple2<String, Integer>> data = ...
-DataStream<Tuple2<String, Integer>> myTuples = env.fromCollection(data);
-
-// Create a DataStream from an Iterator
-Iterator<Long> longIt = ...
-DataStream<Long> myLongs = env.fromCollection(longIt, Long.class);
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val env = StreamExecutionEnvironment.createLocalEnvironment()
-
-// Create a DataStream from a list of elements
-val myInts = env.fromElements(1, 2, 3, 4, 5)
-
-// Create a DataStream from any Collection
-val data: Seq[(String, Int)] = ...
-val myTuples = env.fromCollection(data)
-
-// Create a DataStream from an Iterator
-val longIt: Iterator[Long] = ...
-val myLongs = env.fromCollection(longIt)
-{% endhighlight %}
-</div>
-</div>
-
-**Note:** Currently, the collection data source requires that data types and 
iterators implement
-`Serializable`. Furthermore, collection data sources can not be executed in 
parallel (
-parallelism = 1).
-
-### Iterator Data Sink
-
-Flink also provides a sink to collect DataStream results for testing and 
debugging purposes. It can be used as follows:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-import org.apache.flink.contrib.streaming.DataStreamUtils
-
-DataStream<Tuple2<String, Integer>> myResult = ...
-Iterator<Tuple2<String, Integer>> myOutput = DataStreamUtils.collect(myResult)
-{% endhighlight %}
-
-</div>
-<div data-lang="scala" markdown="1">
-
-{% highlight scala %}
-import org.apache.flink.contrib.streaming.DataStreamUtils
-import scala.collection.JavaConverters.asScalaIteratorConverter
-
-val myResult: DataStream[(String, Int)] = ...
-val myOutput: Iterator[(String, Int)] = 
DataStreamUtils.collect(myResult.getJavaStream).asScala
-{% endhighlight %}
-</div>
-</div>
-
-
-[Back to top](#top)
-
-
-Windows
--------
-
-### Working with Time
-
-Windows are typically groups of events within a certain time period. Reasoning 
about time and windows assumes
-a definition of time. Flink has support for three kinds of time:
-
-- *Processing time:* Processing time is simply the wall clock time of the 
machine that happens to be
-    executing the transformation. Processing time is the simplest notion of 
time and provides the best
-    performance. However, in distributed and asynchronous environments 
processing time does not provide
-    determinism.
-
-- *Event time:* Event time is the time that each individual event occurred. 
This time is
-    typically embedded within the records before they enter Flink or can be 
extracted from their contents.
-    When using event time, out-of-order events can be properly handled. For 
example, an event with a lower
-    timestamp may arrive after an event with a higher timestamp, but 
transformations will handle these events
-    correctly. Event time processing provides predictable results, but incurs 
more latency, as out-of-order
-    events need to be buffered
-
-- *Ingestion time:* Ingestion time is the time that events enter Flink. In 
particular, the timestamp of
-    an event is assigned by the source operator as the current wall clock time 
of the machine that executes
-    the source task at the time the records enter the Flink source. Ingestion 
time is more predictable
-    than processing time, and gives lower latencies than event time as the 
latency does not depend on
-    external systems. Ingestion time provides thus a middle ground between 
processing time and event time.
-    Ingestion time is a special case of event time (and indeed, it is treated 
by Flink identically to
-    event time).
-
-When dealing with event time, transformations need to avoid indefinite
-wait times for events to arrive. *Watermarks* provide the mechanism to control 
the event time-processing time skew. Watermarks
-are emitted by the sources. A watermark with a certain timestamp denotes the 
knowledge that no event
-with timestamp lower than the timestamp of the watermark will ever arrive.
-
-You can specify the semantics of time in a Flink DataStream program using 
`StreamExecutionEnviroment`, as
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
-env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
-env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-{% endhighlight %}
-</div>
-
-<div data-lang="scala" markdown="1">
-{% highlight java %}
-env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
-env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
-env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-{% endhighlight %}
-</div>
-</div>
-
-The default value is `TimeCharacteristic.ProcessingTime`, so in order to write 
a program with processing
-time semantics nothing needs to be specified (e.g., the first 
[example](#example-program) in this guide follows processing
-time semantics).
-
-In order to work with event time semantics, you need to follow four steps:
-
-- Set `env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)`
-
-- Use `DataStream.assignTimestamps(...)` in order to tell Flink how timestamps 
relate to events (e.g., which
-    record field is the timestamp)
-
-- Set `enableTimestamps()`, as well the interval for watermark emission 
(`setAutoWatermarkInterval(long milliseconds)`)
-    in `ExecutionConfig`.
-
-For example, assume that we have a data stream of tuples, in which the first 
field is the timestamp (assigned
-by the system that generates these data streams), and we know that the lag 
between the current processing
-time and the timestamp of an event is never more than 1 second:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-DataStream<Tuple4<Long,Integer,Double,String>> stream = //...
-stream.assignTimestamps(new 
TimestampExtractor<Tuple4<Long,Integer,Double,String>>{
-    @Override
-    public long extractTimestamp(Tuple4<Long,Integer,Double,String> element, 
long currentTimestamp) {
-        return element.f0;
-    }
-
-    @Override
-    public long extractWatermark(Tuple4<Long,Integer,Double,String> element, 
long currentTimestamp) {
-        return element.f0 - 1000;
-    }
-
-    @Override
-    public long getCurrentWatermark() {
-        return Long.MIN_VALUE;
-    }
-});
-{% endhighlight %}
-</div>
-
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val stream: DataStream[(Long,Int,Double,String)] = null;
-stream.assignTimestampts(new TimestampExtractor[(Long, Int, Double, String)] {
-  override def extractTimestamp(element: (Long, Int, Double, String), 
currentTimestamp: Long): Long = element._1
-
-  override def extractWatermark(element: (Long, Int, Double, String), 
currentTimestamp: Long): Long = element._1 - 1000
-
-  override def getCurrentWatermark: Long = Long.MinValue
-})
-{% endhighlight %}
-</div>
-</div>
-
-If you know that timestamps of events are always ascending, i.e., elements 
arrive in order, you can use
-the `AscendingTimestampExtractor`, and the system generates watermarks 
automatically:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-DataStream<Tuple4<Long,Integer,Double,String>> stream = //...
-stream.assignTimestamps(new 
AscendingTimestampExtractor<Tuple4<Long,Integer,Double,String>>{
-    @Override
-    public long extractAscendingTimestamp(Tuple4<Long,Integer,Double,String> 
element, long currentTimestamp) {
-        return element.f0;
-    }
-});
-{% endhighlight %}
-</div>
-
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-stream.extractAscendingTimestamp(record => record._1)
-{% endhighlight %}
-</div>
-</div>
-
-In order to write a program with ingestion time semantics, you need to
-set `env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)`. You 
can think of this setting as a
-shortcut for writing a `TimestampExtractor` which assignes timestamps to 
events at the sources
-based on the current source wall-clock time. Flink injects this timestamp 
extractor automatically.
-
-
-### Windows on Keyed Data Streams
-
-Flink offers a variety of methods for defining windows on a `KeyedStream`. All 
of these group elements *per key*,
-i.e., each window will contain elements with the same key value.
-
-#### Basic Window Constructs
-
-Flink offers a general window mechanism that provides flexibility, as well as 
a number of pre-defined windows
-for common use cases. See first if your use case can be served by the 
pre-defined windows below before moving
-to defining your own windows.
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-
-<br />
-
-<table class="table table-bordered">
-  <thead>
-    <tr>
-      <th class="text-left" style="width: 25%">Transformation</th>
-      <th class="text-center">Description</th>
-    </tr>
-  </thead>
-  <tbody>
-      <tr>
-        <td><strong>Tumbling time window</strong><br>KeyedStream &rarr; 
WindowedStream</td>
-        <td>
-          <p>
-          Defines a window of 5 seconds, that "tumbles". This means that 
elements are
-          grouped according to their timestamp in groups of 5 second duration, 
and every element belongs to exactly one window.
-         The notion of time is specified by the selected TimeCharacteristic 
(see <a href="#working-with-time">time</a>).
-    {% highlight java %}
-keyedStream.timeWindow(Time.of(5, TimeUnit.SECONDS));
-    {% endhighlight %}
-          </p>
-        </td>
-      </tr>
-      <tr>
-          <td><strong>Sliding time window</strong><br>KeyedStream &rarr; 
WindowedStream</td>
-          <td>
-            <p>
-             Defines a window of 5 seconds, that "slides" by 1 seconds. This 
means that elements are
-             grouped according to their timestamp in groups of 5 second 
duration, and elements can belong to more than
-             one window (since windows overlap by at most 4 seconds)
-             The notion of time is specified by the selected 
TimeCharacteristic (see <a href="#working-with-time">time</a>).
-      {% highlight java %}
-keyedStream.timeWindow(Time.of(5, TimeUnit.SECONDS), Time.of(1, 
TimeUnit.SECONDS));
-      {% endhighlight %}
-            </p>
-          </td>
-        </tr>
-      <tr>
-        <td><strong>Tumbling count window</strong><br>KeyedStream &rarr; 
WindowedStream</td>
-        <td>
-          <p>
-          Defines a window of 1000 elements, that "tumbles". This means that 
elements are
-          grouped according to their arrival time (equivalent to processing 
time) in groups of 1000 elements,
-          and every element belongs to exactly one window.
-    {% highlight java %}
-keyedStream.countWindow(1000);
-    {% endhighlight %}
-        </p>
-        </td>
-      </tr>
-      <tr>
-      <td><strong>Sliding count window</strong><br>KeyedStream &rarr; 
WindowedStream</td>
-      <td>
-        <p>
-          Defines a window of 1000 elements, that "slides" every 100 elements. 
This means that elements are
-          grouped according to their arrival time (equivalent to processing 
time) in groups of 1000 elements,
-          and every element can belong to more than one window (as windows 
overlap by at most 900 elements).
-  {% highlight java %}
-keyedStream.countWindow(1000, 100)
-  {% endhighlight %}
-        </p>
-      </td>
-    </tr>
-  </tbody>
-</table>
-
-</div>
-
-<div data-lang="scala" markdown="1">
-
-<br />
-
-<table class="table table-bordered">
-  <thead>
-    <tr>
-      <th class="text-left" style="width: 25%">Transformation</th>
-      <th class="text-center">Description</th>
-    </tr>
-  </thead>
-  <tbody>
-      <tr>
-        <td><strong>Tumbling time window</strong><br>KeyedStream &rarr; 
WindowedStream</td>
-        <td>
-          <p>
-          Defines a window of 5 seconds, that "tumbles". This means that 
elements are
-          grouped according to their timestamp in groups of 5 second duration, 
and every element belongs to exactly one window.
-          The notion of time is specified by the selected TimeCharacteristic 
(see <a href="#working-with-time">time</a>).
-    {% highlight scala %}
-keyedStream.timeWindow(Time.of(5, TimeUnit.SECONDS))
-    {% endhighlight %}
-          </p>
-        </td>
-      </tr>
-      <tr>
-          <td><strong>Sliding time window</strong><br>KeyedStream &rarr; 
WindowedStream</td>
-          <td>
-            <p>
-             Defines a window of 5 seconds, that "slides" by 1 seconds. This 
means that elements are
-             grouped according to their timestamp in groups of 5 second 
duration, and elements can belong to more than
-             one window (since windows overlap by at most 4 seconds)
-             The notion of time is specified by the selected 
TimeCharacteristic (see <a href="#working-with-time">time</a>).
-      {% highlight scala %}
-keyedStream.timeWindow(Time.of(5, TimeUnit.SECONDS), Time.of(1, 
TimeUnit.SECONDS))
-      {% endhighlight %}
-            </p>
-          </td>
-        </tr>
-      <tr>
-        <td><strong>Tumbling count window</strong><br>KeyedStream &rarr; 
WindowedStream</td>
-        <td>
-          <p>
-          Defines a window of 1000 elements, that "tumbles". This means that 
elements are
-          grouped according to their arrival time (equivalent to processing 
time) in groups of 1000 elements,
-          and every element belongs to exactly one window.
-    {% highlight scala %}
-keyedStream.countWindow(1000)
-    {% endhighlight %}
-        </p>
-        </td>
-      </tr>
-      <tr>
-      <td><strong>Sliding count window</strong><br>KeyedStream &rarr; 
WindowedStream</td>
-      <td>
-        <p>
-          Defines a window of 1000 elements, that "slides" every 100 elements. 
This means that elements are
-          grouped according to their arrival time (equivalent to processing 
time) in groups of 1000 elements,
-          and every element can belong to more than one window (as windows 
overlap by at most 900 elements).
-  {% highlight scala %}
-keyedStream.countWindow(1000, 100)
-  {% endhighlight %}
-        </p>
-      </td>
-    </tr>
-  </tbody>
-</table>
-
-</div>
-</div>
-
-#### Advanced Window Constructs
-
-The general mechanism can define more powerful windows at the cost of more 
verbose syntax. For example,
-below is a window definition where windows hold elements of the last 5 seconds 
and slides every 1 second,
-but the execution of the window function is triggered when 100 elements have 
been added to the
-window, and every time execution is triggered, 10 elements are retained in the 
window:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-keyedStream
-    .window(SlidingTimeWindows.of(Time.of(5, TimeUnit.SECONDS), Time.of(1, 
TimeUnit.SECONDS))
-    .trigger(CountTrigger.of(100))
-    .evictor(CountEvictor.of(10));
-{% endhighlight %}
-</div>
-
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-keyedStream
-    .window(SlidingTimeWindows.of(Time.of(5, TimeUnit.SECONDS), Time.of(1, 
TimeUnit.SECONDS))
-    .trigger(CountTrigger.of(100))
-    .evictor(CountEvictor.of(10))
-{% endhighlight %}
-</div>
-</div>
-
-The general recipe for building a custom window is to specify (1) a 
`WindowAssigner`, (2) a `Trigger` (optionally),
-and (3) an `Evictor` (optionally).
-
-The `WindowAssigner` defines how incoming elements are assigned to windows. A 
window is a logical group of elements
-that has a begin-value, and an end-value corresponding to a begin-time and 
end-time. Elements with timestamp (according
-to some notion of time described above within these values are part of the 
window).
-
-For example, the `SlidingTimeWindows`
-assigner in the code above defines a window of size 5 seconds, and a slide of 
1 second. Assume that
-time starts from 0 and is measured in milliseconds. Then, we have 6 windows
-that overlap: [0,5000], [1000,6000], [2000,7000], [3000, 8000], [4000, 9000], 
and [5000, 10000]. Each incoming
-element is assigned to the windows according to its timestamp. For example, an 
element with timestamp 2000 will be
-assigned to the first three windows. Flink comes bundled with window assigners 
that cover the most common use cases. You can write your
-own window types by extending the `WindowAssigner` class.
-
-<div class="codetabs" markdown="1">
-
-<div data-lang="java" markdown="1">
-<table class="table table-bordered">
-  <thead>
-    <tr>
-      <th class="text-left" style="width: 25%">Transformation</th>
-      <th class="text-center">Description</th>
-    </tr>
-  </thead>
-  <tbody>
-      <tr>
-        <td><strong>Global window</strong><br>KeyedStream &rarr; 
WindowedStream</td>
-        <td>
-          <p>
-           All incoming elements of a given key are assigned to the same 
window.
-           The window does not contain a default trigger, hence it will never 
be triggered
-           if a trigger is not explicitly specified.
-          </p>
-    {% highlight java %}
-stream.window(GlobalWindows.create());
-    {% endhighlight %}
-        </td>
-      </tr>
-      <tr>
-          <td><strong>Tumbling time windows</strong><br>KeyedStream &rarr; 
WindowedStream</td>
-          <td>
-            <p>
-              Incoming elements are assigned to a window of a certain size (1 
second below) based on
-              their timestamp. Windows do not overlap, i.e., each element is 
assigned to exactly one window.
-             The notion of time is picked from the specified 
TimeCharacteristic (see <a href="#working-with-time">time</a>).
-             The window comes with a default trigger. For event/ingestion 
time, a window is triggered when a
-             watermark with value higher than its end-value is received, 
whereas for processing time
-             when the current processing time exceeds its current end value.
-            </p>
-      {% highlight java %}
-stream.window(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)));
-      {% endhighlight %}
-          </td>
-        </tr>
-      <tr>
-        <td><strong>Sliding time windows</strong><br>KeyedStream &rarr; 
WindowedStream</td>
-        <td>
-          <p>
-            Incoming elements are assigned to a window of a certain size (5 
seconds below) based on
-            their timestamp. Windows "slide" by the provided value (1 second 
in the example), and hence
-            overlap. The window comes with a default trigger. For 
event/ingestion time, a window is triggered when a
-           watermark with value higher than its end-value is received, whereas 
for processing time
-           when the current processing time exceeds its current end value.
-          </p>
-    {% highlight java %}
-stream.window(SlidingTimeWindows.of(Time.of(5, TimeUnit.SECONDS), Time.of(1, 
TimeUnit.SECONDS)));
-    {% endhighlight %}
-        </td>
-      </tr>
-  </tbody>
-</table>
-</div>
-
-<div data-lang="scala" markdown="1">
-<table class="table table-bordered">
-  <thead>
-    <tr>
-      <th class="text-left" style="width: 25%">Transformation</th>
-      <th class="text-center">Description</th>
-    </tr>
-  </thead>
-  <tbody>
-      <tr>
-        <td><strong>Global window</strong><br>KeyedStream &rarr; 
WindowedStream</td>
-        <td>
-          <p>
-            All incoming elements of a given key are assigned to the same 
window.
-           The window does not contain a default trigger, hence it will never 
be triggered
-           if a trigger is not explicitly specified.
-          </p>
-    {% highlight scala %}
-stream.window(GlobalWindows.create)
-    {% endhighlight %}
-        </td>
-      </tr>
-      <tr>
-          <td><strong>Tumbling time windows</strong><br>KeyedStream &rarr; 
WindowedStream</td>
-          <td>
-            <p>
-              Incoming elements are assigned to a window of a certain size (1 
second below) based on
-              their timestamp. Windows do not overlap, i.e., each element is 
assigned to exactly one window.
-             The notion of time is specified by the selected 
TimeCharacteristic (see <a href="#working-with-time">time</a>).
-             The window comes with a default trigger. For event/ingestion 
time, a window is triggered when a
-             watermark with value higher than its end-value is received, 
whereas for processing time
-             when the current processing time exceeds its current end value.
-            </p>
-      {% highlight scala %}
-stream.window(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
-      {% endhighlight %}
-          </td>
-        </tr>
-      <tr>
-        <td><strong>Sliding time windows</strong><br>KeyedStream &rarr; 
WindowedStream</td>
-        <td>
-          <p>
-            Incoming elements are assigned to a window of a certain size (5 
seconds below) based on
-            their timestamp. Windows "slide" by the provided value (1 second 
in the example), and hence
-            overlap. The window comes with a default trigger. For 
event/ingestion time, a window is triggered when a
-           watermark with value higher than its end-value is received, whereas 
for processing time
-           when the current processing time exceeds its current end value.
-          </p>
-    {% highlight scala %}
-stream.window(SlidingTimeWindows.of(Time.of(5, TimeUnit.SECONDS), Time.of(1, 
TimeUnit.SECONDS)))
-    {% endhighlight %}
-        </td>
-      </tr>
-  </tbody>
-</table>
-</div>
-
-</div>
-
-The `Trigger` specifies when the function that comes after the window clause 
(e.g., `sum`, `count`) is evaluated ("fires")
-for each window. If a trigger is not specified, a default trigger for each 
window type is used (that is part of the
-definition of the `WindowAssigner`). Flink comes bundled with a set of 
triggers if the ones that windows use by
-default do not fit the application. You can write your own trigger by 
implementing the `Trigger` interface. Note that
-specifying a trigger will override the default trigger of the window assigner.
-
-<div class="codetabs" markdown="1">
-
-<div data-lang="java" markdown="1">
-<table class="table table-bordered">
-  <thead>
-    <tr>
-      <th class="text-left" style="width: 25%">Transformation</th>
-      <th class="text-center">Description</th>
-    </tr>
-  </thead>
-  <tbody>
-  <tr>
-    <td><strong>Processing time trigger</strong></td>
-    <td>
-      <p>
-        A window is fired when the current processing time exceeds its 
end-value.
-        The elements on the triggered window are henceforth discarded.
-      </p>
-{% highlight java %}
-windowedStream.trigger(ProcessingTimeTrigger.create());
-{% endhighlight %}
-    </td>
-  </tr>
-  <tr>
-    <td><strong>Watermark trigger</strong></td>
-    <td>
-      <p>
-        A window is fired when a watermark with value that exceeds the 
window's end-value has been received.
-        The elements on the triggered window are henceforth discarded.
-      </p>
-{% highlight java %}
-windowedStream.trigger(EventTimeTrigger.create());
-{% endhighlight %}
-    </td>
-  </tr>
-  <tr>
-    <td><strong>Continuous processing time trigger</strong></td>
-    <td>
-      <p>
-        A window is periodically considered for being fired (every 5 seconds 
in the example).
-        The window is actually fired only when the current processing time 
exceeds its end-value.
-        The elements on the triggered window are retained.
-      </p>
-{% highlight java %}
-windowedStream.trigger(ContinuousProcessingTimeTrigger.of(Time.of(5, 
TimeUnit.SECONDS)));
-{% endhighlight %}
-    </td>
-  </tr>
-  <tr>
-    <td><strong>Continuous watermark time trigger</strong></td>
-    <td>
-      <p>
-        A window is periodically considered for being fired (every 5 seconds 
in the example).
-        A window is actually fired wh

<TRUNCATED>

Reply via email to