GitHub user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2154#discussion_r69157901
  
    --- Diff: docs/apis/streaming/windows.md ---
    @@ -24,1023 +24,608 @@ specific language governing permissions and limitations
     under the License.
     -->
     
    +Flink uses a concept called *windows* to divide a (potentially) infinite `DataStream` into finite
    +slices based on the timestamps of elements or other criteria. This division is required when working
    +with infinite streams of data and performing transformations that aggregate elements.
    +
    +<span class="label label-info">Info</span> We will mostly talk about *keyed windowing* here, i.e.
    +windows that are applied on a `KeyedStream`. Keyed windows have the advantage that elements are
    +subdivided based on both window and key before being given to
    +a user function. The work can thus be distributed across the cluster
    +because the elements for different keys can be processed independently. If you absolutely have to,
    +you can check out [non-keyed windowing](#non-keyed-windowing) where we describe how non-keyed
    +windows work.
    +
     * This will be replaced by the TOC
     {:toc}
     
    -## Windows on Keyed Data Streams
    -
    -Flink offers a variety of methods for defining windows on a `KeyedStream`. All of these group elements *per key*,
    -i.e., each window will contain elements with the same key value.
    +## Basics
     
    -### Basic Window Constructs
    +For a windowed transformation you must at least specify a *key*
    +(see [specifying keys]({{ site.baseurl }}/apis/common/index.html#specifying-keys)),
    +a *window assigner* and a *window function*. The *key* divides the infinite, non-keyed stream
    +into logical keyed streams while the *window assigner* assigns elements to finite per-key windows.
    +Finally, the *window function* is used to process the elements of each window.
     
    -Flink offers a general window mechanism that provides flexibility, as well as a number of pre-defined windows
    -for common use cases. See first if your use case can be served by the pre-defined windows below before moving
    -to defining your own windows.
    +The basic structure of a windowed transformation is thus as follows:
     
     <div class="codetabs" markdown="1">
     <div data-lang="java" markdown="1">
    +{% highlight java %}
    +DataStream<T> input = ...;
     
    -<br />
    -
    -<table class="table table-bordered">
    -  <thead>
    -    <tr>
    -      <th class="text-left" style="width: 25%">Transformation</th>
    -      <th class="text-center">Description</th>
    -    </tr>
    -  </thead>
    -  <tbody>
    -      <tr>
    -        <td><strong>Tumbling time window</strong><br>KeyedStream &rarr; 
WindowedStream</td>
    -        <td>
    -          <p>
    -          Defines a window of 5 seconds, that "tumbles". This means that 
elements are
    -          grouped according to their timestamp in groups of 5 second 
duration, and every element belongs to exactly one window.
    -     The notion of time is specified by the selected TimeCharacteristic 
(see <a href="{{ site.baseurl }}/apis/streaming/event_time.html">time</a>).
    -    {% highlight java %}
    -keyedStream.timeWindow(Time.seconds(5));
    -    {% endhighlight %}
    -          </p>
    -        </td>
    -      </tr>
    -      <tr>
    -          <td><strong>Sliding time window</strong><br>KeyedStream &rarr; 
WindowedStream</td>
    -          <td>
    -            <p>
    -             Defines a window of 5 seconds, that "slides" by 1 second. 
This means that elements are
    -             grouped according to their timestamp in groups of 5 second 
duration, and elements can belong to more than
    -             one window (since windows overlap by at most 4 seconds)
    -             The notion of time is specified by the selected 
TimeCharacteristic (see <a href="{{ site.baseurl 
}}/apis/streaming/event_time.html">time</a>).
    -      {% highlight java %}
    -keyedStream.timeWindow(Time.seconds(5), Time.seconds(1));
    -      {% endhighlight %}
    -            </p>
    -          </td>
    -        </tr>
    -      <tr>
    -        <td><strong>Tumbling count window</strong><br>KeyedStream &rarr; 
WindowedStream</td>
    -        <td>
    -          <p>
    -          Defines a window of 1000 elements, that "tumbles". This means 
that elements are
    -          grouped according to their arrival time (equivalent to 
processing time) in groups of 1000 elements,
    -          and every element belongs to exactly one window.
    -    {% highlight java %}
    -keyedStream.countWindow(1000);
    -    {% endhighlight %}
    -        </p>
    -        </td>
    -      </tr>
    -      <tr>
    -      <td><strong>Sliding count window</strong><br>KeyedStream &rarr; 
WindowedStream</td>
    -      <td>
    -        <p>
    -          Defines a window of 1000 elements, that "slides" every 100 
elements. This means that elements are
    -          grouped according to their arrival time (equivalent to 
processing time) in groups of 1000 elements,
    -          and every element can belong to more than one window (as windows 
overlap by at most 900 elements).
    -  {% highlight java %}
    -keyedStream.countWindow(1000, 100)
    -  {% endhighlight %}
    -        </p>
    -      </td>
    -    </tr>
    -  </tbody>
    -</table>
    -
    +input
    +    .keyBy(<key selector>)
    +    .window(<window assigner>)
    +    .<windowed transformation>(<window function>);
    +{% endhighlight %}
     </div>
     
     <div data-lang="scala" markdown="1">
    +{% highlight scala %}
    +val input: DataStream[T] = ...
     
    -<br />
    -
    -<table class="table table-bordered">
    -  <thead>
    -    <tr>
    -      <th class="text-left" style="width: 25%">Transformation</th>
    -      <th class="text-center">Description</th>
    -    </tr>
    -  </thead>
    -  <tbody>
    -      <tr>
    -        <td><strong>Tumbling time window</strong><br>KeyedStream &rarr; 
WindowedStream</td>
    -        <td>
    -          <p>
    -          Defines a window of 5 seconds, that "tumbles". This means that 
elements are
    -          grouped according to their timestamp in groups of 5 second 
duration, and every element belongs to exactly one window.
    -          The notion of time is specified by the selected 
TimeCharacteristic (see <a href="{{ site.baseurl 
}}/apis/streaming/event_time.html">time</a>).
    -    {% highlight scala %}
    -keyedStream.timeWindow(Time.seconds(5))
    -    {% endhighlight %}
    -          </p>
    -        </td>
    -      </tr>
    -      <tr>
    -          <td><strong>Sliding time window</strong><br>KeyedStream &rarr; 
WindowedStream</td>
    -          <td>
    -            <p>
    -             Defines a window of 5 seconds, that "slides" by 1 second. 
This means that elements are
    -             grouped according to their timestamp in groups of 5 second 
duration, and elements can belong to more than
    -             one window (since windows overlap by at most 4 seconds)
    -             The notion of time is specified by the selected 
TimeCharacteristic (see <a href="{{ site.baseurl 
}}/apis/streaming/event_time.html">time</a>).
    -      {% highlight scala %}
    -keyedStream.timeWindow(Time.seconds(5), Time.seconds(1))
    -      {% endhighlight %}
    -            </p>
    -          </td>
    -        </tr>
    -      <tr>
    -        <td><strong>Tumbling count window</strong><br>KeyedStream &rarr; 
WindowedStream</td>
    -        <td>
    -          <p>
    -          Defines a window of 1000 elements, that "tumbles". This means 
that elements are
    -          grouped according to their arrival time (equivalent to 
processing time) in groups of 1000 elements,
    -          and every element belongs to exactly one window.
    -    {% highlight scala %}
    -keyedStream.countWindow(1000)
    -    {% endhighlight %}
    -        </p>
    -        </td>
    -      </tr>
    -      <tr>
    -      <td><strong>Sliding count window</strong><br>KeyedStream &rarr; 
WindowedStream</td>
    -      <td>
    -        <p>
    -          Defines a window of 1000 elements, that "slides" every 100 
elements. This means that elements are
    -          grouped according to their arrival time (equivalent to 
processing time) in groups of 1000 elements,
    -          and every element can belong to more than one window (as windows 
overlap by at most 900 elements).
    -  {% highlight scala %}
    -keyedStream.countWindow(1000, 100)
    -  {% endhighlight %}
    -        </p>
    -      </td>
    -    </tr>
    -  </tbody>
    -</table>
    -
    +input
    +    .keyBy(<key selector>)
    +    .window(<window assigner>)
    +    .<windowed transformation>(<window function>)
    +{% endhighlight %}
     </div>
     </div>
     
    -### Advanced Window Constructs
    +We will cover [window assigners](#window-assigners) in a separate section below.
    +
    +The windowed transformation can be one of `reduce()`, `fold()` or `apply()`, which take a
    +`ReduceFunction`, `FoldFunction` or `WindowFunction`, respectively. We describe each of these ways
    +of specifying a windowed transformation in detail in [window functions](#window-functions).
    +
    +For more advanced use cases you can also specify a `Trigger` that determines when exactly a window
    +is considered *ready for processing*. Triggers are covered in more detail in
    +[triggers](#triggers).
    +
    +## Window Assigners
    +
    +The window assigner specifies how elements of the stream are divided into finite slices. Flink comes
    +with pre-implemented window assigners for the most typical use cases, namely *tumbling windows*,
    +*sliding windows*, *session windows* and *global windows*, but you can implement your own by
    +extending the `WindowAssigner` class. All the built-in window assigners, except for the global
    +windows assigner, assign elements to windows based on time, which can either be processing time or
    +event time.
    +
    +Let's first look at how each of these window assigners works before looking at how they can be used
    +in a Flink program. We will be using abstract figures to visualize the workings of each assigner:
    +in the following, the purple circles are elements of the stream, which are partitioned
    +by some key (in this case *user 1*, *user 2* and *user 3*), and the x-axis shows the progress
    +of time.
    +
    +### Global Windows
    +
    +Global windows are a way of specifying that we don't want to subdivide our elements into windows.
    +Each element is assigned to a single per-key *global window*.
    +This windowing scheme is only useful if you also specify a custom [trigger](#triggers). Otherwise,
    +no computation is ever going to be performed, as the global window does not have a natural end at
    +which we could process the aggregated elements.
    +
    +<img src="non-windowed.svg" class="center" style="width: 80%;" />
    +
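To make the point about global windows concrete, here is a minimal plain-Java sketch. It does not use Flink classes: `GlobalWindowSketch` and its "fire and clear every N elements" rule are hypothetical stand-ins for a global window combined with a count-based trigger. Each key has exactly one ever-growing window, and results appear only because of the extra firing rule.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;

// Illustrative only: simulates one per-key "global window" plus a hypothetical
// count-based firing rule. This is not Flink's WindowAssigner/Trigger API.
class GlobalWindowSketch {
    private final int fireEvery;                          // hypothetical trigger parameter
    private final Map<String, List<Long>> buffers = new HashMap<>();

    GlobalWindowSketch(int fireEvery) {
        this.fireEvery = fireEvery;
    }

    // Adds an element to the key's single global window. Without the count
    // check below, the buffer would only grow and nothing would ever be emitted.
    OptionalLong add(String key, long value) {
        List<Long> window = buffers.computeIfAbsent(key, k -> new ArrayList<>());
        window.add(value);
        if (window.size() < fireEvery) {
            return OptionalLong.empty();
        }
        long sum = 0;
        for (long v : window) {
            sum += v;
        }
        window.clear();                                   // discard contents after firing
        return OptionalLong.of(sum);
    }

    public static void main(String[] args) {
        GlobalWindowSketch sketch = new GlobalWindowSketch(3);
        sketch.add("user 1", 1);
        sketch.add("user 2", 10);
        sketch.add("user 1", 2);
        System.out.println(sketch.add("user 1", 3));      // prints OptionalLong[6]
    }
}
```

Keys are buffered independently, so the element for *user 2* does not count towards the firing rule of *user 1*.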
    +### Tumbling Windows
    +
    +A *tumbling windows* assigner assigns elements to fixed-length, non-overlapping windows of a
    +specified *window size*. For example, if you specify a window size of 5 minutes, the window
    +function will get 5 minutes' worth of elements in each invocation.
     
    -The general mechanism can define more powerful windows at the cost of more verbose syntax. For example,
    -below is a window definition where windows hold elements of the last 5 seconds and slides every 1 second,
    -but the execution of the window function is triggered when 100 elements have been added to the
    -window, and every time execution is triggered, 10 elements are retained in the window:
    +<img src="tumbling-windows.svg" class="center" style="width: 80%;" />
    +
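The arithmetic behind tumbling windows can be sketched in a few lines of plain Java. This is an illustration, not Flink's implementation; it assumes windows are half-open intervals `[start, start + size)` aligned to multiples of the window size, with timestamps in milliseconds. `TumblingWindowSketch` is a hypothetical name.

```java
// Illustrative arithmetic only (not Flink's implementation): assumes half-open
// windows [start, start + size) aligned to multiples of the size, in milliseconds.
class TumblingWindowSketch {
    static long windowStart(long timestamp, long size) {
        // floorMod keeps the result correct for negative timestamps too
        return timestamp - Math.floorMod(timestamp, size);
    }

    public static void main(String[] args) {
        long size = 5 * 60 * 1000L;                        // 5-minute windows
        long ts = 7 * 60 * 1000L + 30_000L;                // element at 00:07:30
        long start = windowStart(ts, size);
        System.out.println("[" + start + ", " + (start + size) + ")");  // [300000, 600000)
    }
}
```

Because every timestamp maps to exactly one start value, each element lands in exactly one window, which is what makes tumbling windows non-overlapping.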
    +### Sliding Windows
    +
    +The *sliding windows* assigner assigns elements to windows of fixed length equal to *window size*,
    +like the tumbling windows assigner, but in this case windows can overlap. The additional
    +user-specified parameter *window slide* controls how often a new window is started; if the slide
    +is smaller than the window size, windows overlap and an element can be assigned to multiple windows.
    +
    +For example, you could have windows of size 10 minutes that slide by 5 minutes. With this you get 10
    +minutes' worth of elements in each invocation of the window function, and it will be invoked for
    +every 5 minutes of data.
    +
    +<img src="sliding-windows.svg" class="center" style="width: 80%;" />
    +
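The following plain-Java sketch shows which sliding windows a single element falls into. It is an illustration under stated assumptions rather than Flink code: windows are half-open `[start, start + size)` intervals whose start times are multiples of the slide, and `SlidingWindowSketch` is a hypothetical name. Starting from the latest window that contains the element, it steps back by one slide until a window would no longer reach the element.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: assumes slide-aligned, half-open windows [start, start + size).
class SlidingWindowSketch {
    static List<Long> windowStarts(long timestamp, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        // The latest window containing the element starts at the slide boundary at or before it.
        long lastStart = timestamp - Math.floorMod(timestamp, slide);
        // Walk back one slide at a time while the window still covers the timestamp.
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        // 10-minute windows sliding by 5 minutes; element at minute 12
        System.out.println(windowStarts(12 * minute, 10 * minute, 5 * minute));  // [600000, 300000]
    }
}
```

With the 10-minute/5-minute example from the text, an element at minute 12 belongs to the windows starting at minutes 10 and 5, i.e. to `size / slide = 2` windows.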
    +### Session Windows
    +
    +The *session windows* assigner is ideal for cases where the window boundaries need to adjust to the
    +incoming data. Both the *tumbling windows* and *sliding windows* assigners assign elements to windows
    +that start at fixed time points and have a fixed *window size*. With session windows it is possible
    +to have windows that start at individual points in time for each key and that end once there has
    +been a certain period of inactivity. The configuration parameter is the *session gap*, which specifies
    +how long to wait for new data before considering a session as closed.
    +
    +<img src="session-windows.svg" class="center" style="width: 80%;" />
    +
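As a rough illustration of the session gap, the plain-Java sketch below groups the timestamps of one key into sessions offline. It is not how Flink computes session windows; it assumes that two consecutive elements (after sorting) belong to the same session when they are at most `gap` apart, and how exactly the boundary case is treated here is an assumption. `SessionWindowSketch` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustrative only: groups one key's timestamps into sessions offline.
// Assumption: consecutive elements at most `gap` apart share a session.
class SessionWindowSketch {
    static List<List<Long>> sessions(long[] timestamps, long gap) {
        long[] sorted = timestamps.clone();
        Arrays.sort(sorted);                       // real streams may arrive out of order
        List<List<Long>> result = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        for (long ts : sorted) {
            if (!current.isEmpty() && ts - current.get(current.size() - 1) > gap) {
                result.add(current);               // inactivity longer than the gap closes the session
                current = new ArrayList<>();
            }
            current.add(ts);
        }
        if (!current.isEmpty()) {
            result.add(current);
        }
        return result;
    }

    public static void main(String[] args) {
        long[] clicks = {1_000L, 3_000L, 20_000L, 4_000L, 22_000L};
        System.out.println(sessions(clicks, 10_000L));  // [[1000, 3000, 4000], [20000, 22000]]
    }
}
```

Unlike the tumbling and sliding sketches, the window boundaries here depend entirely on the data: the 16-second pause between 4000 and 20000 is what splits the two sessions.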
    +### Specifying a Window Assigner
    +
    +The built-in window assigners (except `GlobalWindows`) come in two versions: one for processing-time
    +windowing and one for event-time windowing. The processing-time assigners assign elements to
    +windows based on the current clock of the worker machines, while the event-time assigners assign
    +elements to windows based on their timestamps. Please have a look at
    +[event time]({{ site.baseurl }}/apis/streaming/event_time.html) to learn about the difference between
    +processing time and event time and about how timestamps can be assigned to elements.
    +
    +The following code snippets show how each of the window assigners can be used in a program:
     
     <div class="codetabs" markdown="1">
     <div data-lang="java" markdown="1">
     {% highlight java %}
    -keyedStream
    -    .window(SlidingEventTimeWindows.of(Time.seconds(5), Time.seconds(1))
    -    .trigger(CountTrigger.of(100))
    -    .evictor(CountEvictor.of(10));
    +DataStream<T> input = ...;
    +
    +// tumbling event-time windows
    +input
    +    .keyBy(<key selector>)
    +    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    +    .<windowed transformation>(<window function>);
    +
    +// sliding event-time windows
    +input
    +    .keyBy(<key selector>)
    +    .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
    +    .<windowed transformation>(<window function>);
    +
    +// event-time session windows
    +input
    +    .keyBy(<key selector>)
    +    .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
    +    .<windowed transformation>(<window function>);
    +
    +// tumbling processing-time windows
    +input
    +    .keyBy(<key selector>)
    +    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    +    .<windowed transformation>(<window function>);
    +
    +// sliding processing-time windows
    +input
    +    .keyBy(<key selector>)
    +    .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
    +    .<windowed transformation>(<window function>);
    +
    +// processing-time session windows
    +input
    +    .keyBy(<key selector>)
    +    .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
    +    .<windowed transformation>(<window function>);
    +
    +// global windows
    +input
    +    .keyBy(<key selector>)
    +    .window(GlobalWindows.create())
    +    .<windowed transformation>(<window function>);
     {% endhighlight %}
     </div>
     
     <div data-lang="scala" markdown="1">
     {% highlight scala %}
    -keyedStream
    -    .window(SlidingEventTimeWindows.of(Time.seconds(5), Time.seconds(1))
    -    .trigger(CountTrigger.of(100))
    -    .evictor(CountEvictor.of(10))
    +val input: DataStream[T] = ...
    +
    +// tumbling event-time windows
    +input
    +    .keyBy(<key selector>)
    +    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    +    .<windowed transformation>(<window function>)
    +
    +// sliding event-time windows
    +input
    +    .keyBy(<key selector>)
    +    .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
    +    .<windowed transformation>(<window function>)
    +
    +// event-time session windows
    +input
    +    .keyBy(<key selector>)
    +    .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
    +    .<windowed transformation>(<window function>)
    +
    +// tumbling processing-time windows
    +input
    +    .keyBy(<key selector>)
    +    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    +    .<windowed transformation>(<window function>)
    +
    +// sliding processing-time windows
    +input
    +    .keyBy(<key selector>)
    +    .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
    +    .<windowed transformation>(<window function>)
    +
    +// processing-time session windows
    +input
    +    .keyBy(<key selector>)
    +    .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
    +    .<windowed transformation>(<window function>)
    +
    +// global windows
    +input
    +    .keyBy(<key selector>)
    +    .window(GlobalWindows.create())
    +    .<windowed transformation>(<window function>)
     {% endhighlight %}
     </div>
     </div>
     
    -The general recipe for building a custom window is to specify (1) a `WindowAssigner`, (2) a `Trigger` (optionally),
    -and (3) an `Evictor` (optionally).
    +## Window Functions
     
    -The `WindowAssigner` defines how incoming elements are assigned to windows. A window is a logical group of elements
    -that has a begin-value, and an end-value corresponding to a begin-time and end-time. Elements with timestamp (according
    -to some notion of time described above within these values are part of the window).
    +The *window function* is used to process the elements of each window (and key) once the system
    +determines that a window is ready for processing (see [triggers](#triggers) for how the system
    +determines when a window is ready).
     
    -For example, the `SlidingEventTimeWindows`
    -assigner in the code above defines a window of size 5 seconds, and a slide of 1 second. Assume that
    -time starts from 0 and is measured in milliseconds. Then, we have 6 windows
    -that overlap: [0,5000], [1000,6000], [2000,7000], [3000, 8000], [4000, 9000], and [5000, 10000]. Each incoming
    -element is assigned to the windows according to its timestamp. For example, an element with timestamp 2000 will be
    -assigned to the first three windows. Flink comes bundled with window assigners that cover the most common use cases. You can write your
    -own window types by extending the `WindowAssigner` class.
    +The window function can be one of `ReduceFunction`, `FoldFunction` or `WindowFunction`. The first
    +two can be executed more efficiently because Flink can incrementally aggregate the elements for each
    +window as they arrive. A `WindowFunction` gets an `Iterable` for all the elements contained in a
    +window and additional meta information about the window to which the elements belong.
     
    -<div class="codetabs" markdown="1">
    +A windowed transformation with a `WindowFunction` cannot be executed as efficiently as the other
    +cases because Flink has to buffer all elements for a window internally before invoking the function.
    +This can be mitigated by combining a `WindowFunction` with a `ReduceFunction` or `FoldFunction` to
    +get both incremental aggregation of window elements and the additional information that the
    +`WindowFunction` receives. We will look at examples for each of these variants.
    +
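The efficiency argument can be illustrated with a small plain-Java sketch that contrasts the state a single window has to keep. Neither class below is Flink API; `WindowStateSketch`, `IncrementalWindow` and `BufferedWindow` are hypothetical names that only mimic the idea behind incremental aggregation (as with a `ReduceFunction`) versus buffering every element until the window fires (as with a plain `WindowFunction`).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;

// Illustrative only: contrasts per-window state. Not Flink API; these classes only
// mimic the idea behind incremental aggregation vs. buffering all elements.
class WindowStateSketch {
    // Incremental: keep one running value and combine each new element immediately.
    static final class IncrementalWindow {
        private final BinaryOperator<Long> reducer;
        private Long acc;                                  // null until the first element arrives

        IncrementalWindow(BinaryOperator<Long> reducer) { this.reducer = reducer; }

        void add(long value) { acc = (acc == null) ? value : reducer.apply(acc, value); }

        Long result() { return acc; }
    }

    // Buffered: keep every element and process them all when the window fires.
    static final class BufferedWindow {
        private final List<Long> elements = new ArrayList<>();

        void add(long value) { elements.add(value); }

        int bufferedCount() { return elements.size(); }

        long fire() {
            long sum = 0;
            for (long v : elements) { sum += v; }
            return sum;
        }
    }

    public static void main(String[] args) {
        IncrementalWindow incremental = new IncrementalWindow(Long::sum);
        BufferedWindow buffered = new BufferedWindow();
        for (long v = 1; v <= 1000; v++) {
            incremental.add(v);
            buffered.add(v);
        }
        // Same sum, but the buffered variant held all 1000 elements in state.
        System.out.println(incremental.result() + " " + buffered.fire() + " " + buffered.bufferedCount());
    }
}
```

Running `main` prints `500500 500500 1000`. In the combined variant described above, a window function would receive only the single pre-aggregated value (here, `result()`) together with the window metadata, instead of the whole buffer.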
    +### ReduceFunction
    +
    +A reduce function specifies how two values can be combined to form one element. Flink can use this
    +to incrementally aggregate the elements in a window.
     
    +A `ReduceFunction` can be used in a program like this:
    +
    +<div class="codetabs" markdown="1">
     <div data-lang="java" markdown="1">
    -<table class="table table-bordered">
    -  <thead>
    -    <tr>
    -      <th class="text-left" style="width: 25%">Transformation</th>
    -      <th class="text-center">Description</th>
    -    </tr>
    -  </thead>
    -  <tbody>
    -      <tr>
    -        <td><strong>Global window</strong><br>KeyedStream &rarr; 
WindowedStream</td>
    -        <td>
    -          <p>
    -       All incoming elements of a given key are assigned to the same 
window.
    -       The window does not contain a default trigger, hence it will never 
be triggered
    -       if a trigger is not explicitly specified.
    -          </p>
    -    {% highlight java %}
    -stream.window(GlobalWindows.create());
    -    {% endhighlight %}
    -        </td>
    -      </tr>
    -      <tr>
    -        <td><strong>Tumbling event-time windows</strong><br>KeyedStream 
&rarr; WindowedStream</td>
    -        <td>
    -          <p>
    -            Incoming elements are assigned to a window of a certain size 
(1 second below) based on
    -            their timestamp. Windows do not overlap, i.e., each element is 
assigned to exactly one window.
    -            This assigner comes with a default trigger that fires for a 
window when a
    -            watermark with value higher than its end-value is received.
    -          </p>
    -      {% highlight java %}
    -stream.window(TumblingEventTimeWindows.of(Time.seconds(1)));
    -      {% endhighlight %}
    -        </td>
    -      </tr>
    -      <tr>
    -        <td><strong>Sliding event-time windows</strong><br>KeyedStream 
&rarr; WindowedStream</td>
    -        <td>
    -          <p>
    -            Incoming elements are assigned to a window of a certain size 
(5 seconds below) based on
    -            their timestamp. Windows "slide" by the provided value (1 
second in the example), and hence
    -            overlap. This assigner comes with a default trigger that fires 
for a window when a
    -             watermark with value higher than its end-value is received.
    -          </p>
    -    {% highlight java %}
    -stream.window(SlidingEventTimeWindows.of(Time.seconds(5), 
Time.seconds(1)));
    -    {% endhighlight %}
    -        </td>
    -      </tr>
    -      <tr>
    -          <td><strong>Tumbling processing time 
windows</strong><br>KeyedStream &rarr; WindowedStream</td>
    -          <td>
    -            <p>
    -              Incoming elements are assigned to a window of a certain size 
(1 second below) based on
    -              the current processing time. Windows do not overlap, i.e., 
each element is assigned to exactly one window.
    -              This assigner comes with a default trigger that fires for a 
window a window when the current
    -              processing time exceeds its end-value.
    -            </p>
    -      {% highlight java %}
    -stream.window(TumblingProcessingTimeWindows.of(Time.seconds(1)));
    -      {% endhighlight %}
    -          </td>
    -        </tr>
    -      <tr>
    -        <td><strong>Sliding processing time 
windows</strong><br>KeyedStream &rarr; WindowedStream</td>
    -        <td>
    -          <p>
    -            Incoming elements are assigned to a window of a certain size 
(5 seconds below) based on
    -            their timestamp. Windows "slide" by the provided value (1 
second in the example), and hence
    -            overlap. This assigner comes with a default trigger that fires 
for a window a window when the current
    -            processing time exceeds its end-value.
    -          </p>
    -    {% highlight java %}
    -stream.window(SlidingProcessingTimeWindows.of(Time.seconds(5), 
Time.seconds(1)));
    -    {% endhighlight %}
    -        </td>
    -      </tr>
    -          <tr>
    -        <td><strong>Event-time Session windows</strong><br>KeyedStream 
&rarr; WindowedStream</td>
    -        <td>
    -          <p>
    -            Incoming elements are assigned to sessions based on a session 
gap interval (5 seconds in the example below).
    -            Elements whose timestamp differs by more than the session gap 
are assigned to different sessions. If there are
    -            consecutive elements which are less than the session gap apart 
then these will also be put into the same session, i.e. elements
    -            can be connected into a session by intermediate elements.
    -          </p>
    -    {% highlight scala %}
    -keyedStream.window(EventTimeSessionWindows.withGap(Time.seconds(5)));
    -    {% endhighlight %}
    -        </td>
    -      </tr>
    -       <tr>
    -        <td><strong>Processing time Session 
windows</strong><br>KeyedStream &rarr; WindowedStream</td>
    -        <td>
    -          <p>
    -           This is similar to event-time session windows but works on the 
current processing
    -           time instead of the timestamp of elements
    -          </p>
    -    {% highlight scala %}
    -keyedStream.window(ProcessingTimeSessionWindows.withGap(Time.seconds(5)));
    -    {% endhighlight %}
    -        </td>
    -      </tr>
    -  </tbody>
    -</table>
    +{% highlight java %}
    +DataStream<Tuple2<String, Long>> input = ...;
    +
    +input
    +    .keyBy(<key selector>)
    +    .window(<window assigner>)
    +    .reduce(new ReduceFunction<Tuple2<String, Long>>() {
    +      public Tuple2<String, Long> reduce(Tuple2<String, Long> v1, Tuple2<String, Long> v2) {
    +        // both elements share the same key, so keep v1's key and add up the counts
    +        return new Tuple2<>(v1.f0, v1.f1 + v2.f1);
    +      }
    +    });
    +{% endhighlight %}
     </div>
     
     <div data-lang="scala" markdown="1">
    -<table class="table table-bordered">
    -  <thead>
    -    <tr>
    -      <th class="text-left" style="width: 25%">Transformation</th>
    -      <th class="text-center">Description</th>
    -    </tr>
    -  </thead>
    -  <tbody>
    -      <tr>
    -        <td><strong>Global window</strong><br>KeyedStream &rarr; 
WindowedStream</td>
    -        <td>
    -          <p>
    -            All incoming elements of a given key are assigned to the same 
window.
    -       The window does not contain a default trigger, hence it will never 
be triggered
    -       if a trigger is not explicitly specified.
    -          </p>
    -    {% highlight scala %}
    -stream.window(GlobalWindows.create)
    -    {% endhighlight %}
    -        </td>
    -      </tr>
    -      <tr>
    -          <td><strong>Tumbling event-time windows</strong><br>KeyedStream 
&rarr; WindowedStream</td>
    -          <td>
    -            <p>
    -             Incoming elements are assigned to a window of a certain size 
(1 second below) based on
    -            their timestamp. Windows do not overlap, i.e., each element is 
assigned to exactly one window.
    -            This assigner comes with a default trigger that fires for a 
window when a
    -            watermark with value higher than its end-value is received.
    -            </p>
    -      {% highlight scala %}
    -stream.window(TumblingEventTimeWindows.of(Time.seconds(1)))
    -      {% endhighlight %}
    -          </td>
    -        </tr>
    -      <tr>
    -        <td><strong>Sliding event-time windows</strong><br>KeyedStream 
&rarr; WindowedStream</td>
    -        <td>
    -          <p>
    -            Incoming elements are assigned to a window of a certain size 
(5 seconds below) based on
    -            their timestamp. Windows "slide" by the provided value (1 
second in the example), and hence
    -            overlap. This assigner comes with a default trigger that fires 
for a window when a
    -            watermark with value higher than its end-value is received.
    -          </p>
    -    {% highlight scala %}
    -stream.window(SlidingEventTimeWindows.of(Time.seconds(5), Time.seconds(1)))
    -    {% endhighlight %}
    -        </td>
    -      </tr>
    -      <tr>
    -          <td><strong>Tumbling processing time 
windows</strong><br>KeyedStream &rarr; WindowedStream</td>
    -          <td>
    -            <p>
    -              Incoming elements are assigned to a window of a certain size 
(1 second below) based on
    -              the current processing time. Windows do not overlap, i.e., 
each element is assigned to exactly one window.
    -              This assigner comes with a default trigger that fires for a 
window a window when the current
    -              processing time exceeds its end-value.
    -
    -            </p>
    -      {% highlight scala %}
    -stream.window(TumblingProcessingTimeWindows.of(Time.seconds(1)))
    -      {% endhighlight %}
    -          </td>
    -        </tr>
    -      <tr>
    -        <td><strong>Sliding processing time 
windows</strong><br>KeyedStream &rarr; WindowedStream</td>
    -        <td>
    -          <p>
    -            Incoming elements are assigned to a window of a certain size 
(5 seconds below) based on
    -            their timestamp. Windows "slide" by the provided value (1 
second in the example), and hence
    -            overlap. This assigner comes with a default trigger that fires 
for a window a window when the current
    -            processing time exceeds its end-value.
    -          </p>
    -    {% highlight scala %}
    -stream.window(SlidingProcessingTimeWindows.of(Time.seconds(5), 
Time.seconds(1)))
    -    {% endhighlight %}
    -        </td>
    -      </tr>
    -         <tr>
    -        <td><strong>Event-time Session windows</strong><br>KeyedStream 
&rarr; WindowedStream</td>
    -        <td>
    -          <p>
    -            Incoming elements are assigned to sessions based on a session 
gap interval (5 seconds in the example below).
    -            Elements whose timestamp differs by more than the session gap 
are assigned to different sessions. If there are
    -            consecutive elements which are less than the session gap apart 
then these will also be put into the same session, i.e. elements
    -            can be connected into a session by intermediate elements.
    -          </p>
    -    {% highlight scala %}
    -keyedStream.window(EventTimeSessionWindows.withGap(Time.seconds(5)))
    -    {% endhighlight %}
    -        </td>
    -      </tr>
    -       <tr>
    -        <td><strong>Processing time Session 
windows</strong><br>KeyedStream &rarr; WindowedStream</td>
    -        <td>
    -          <p>
    -           This is similar to event-time session windows but works on the 
current processing
    -           time instead of the timestamp of elements
    -          </p>
    -    {% highlight scala %}
    -keyedStream.window(ProcessingTimeSessionWindows.withGap(Time.seconds(5)))
    -    {% endhighlight %}
    -        </td>
    -      </tr>
    -  </tbody>
    -</table>
    -</div>
    +{% highlight scala %}
    +val input: DataStream[(String, Long)] = ...
     
    +input
    +    .keyBy(<key selector>)
    +    .window(<window assigner>)
    +    .reduce { (v1, v2) => (v1._1, v1._2 + v2._2) }
    +{% endhighlight %}
    +</div>
     </div>
     
    -The `Trigger` specifies when the function that comes after the window clause (e.g., `sum`, `count`) is evaluated ("fires")
    -for each window. If a trigger is not specified, a default trigger for each window type is used (that is part of the
    -definition of the `WindowAssigner`). Flink comes bundled with a set of triggers if the ones that windows use by
    -default do not fit the application. You can write your own trigger by implementing the `Trigger` interface. Note that
    -specifying a trigger will override the default trigger of the window assigner.
    +A `ReduceFunction` specifies how two elements from the input can be 
combined to produce
    +an output element. This example will sum up the second field of the tuple 
for all elements
    +in a window.
     
    -<div class="codetabs" markdown="1">
    +### FoldFunction
     
    +A fold function can be specified like this:
    +
    +<div class="codetabs" markdown="1">
     <div data-lang="java" markdown="1">
    -<table class="table table-bordered">
    -  <thead>
    -    <tr>
    -      <th class="text-left" style="width: 25%">Transformation</th>
    -      <th class="text-center">Description</th>
    -    </tr>
    -  </thead>
    -  <tbody>
    -  <tr>
    -    <td><strong>Processing time trigger</strong></td>
    -    <td>
    -      <p>
    -        A window is fired when the current processing time exceeds its end-value.
    -        The elements on the triggered window are henceforth discarded.
    -      </p>
    -{% highlight java %}
    -windowedStream.trigger(ProcessingTimeTrigger.create());
    -{% endhighlight %}
    -    </td>
    -  </tr>
    -  <tr>
    -    <td><strong>Watermark trigger</strong></td>
    -    <td>
    -      <p>
    -        A window is fired when a watermark with value that exceeds the window's end-value has been received.
    -        The elements on the triggered window are henceforth discarded.
    -      </p>
     {% highlight java %}
    -windowedStream.trigger(EventTimeTrigger.create());
    +DataStream<Tuple2<String, Long>> input = ...;
    +
    +input
    +    .keyBy(<key selector>)
    +    .window(<window assigner>)
    +    .fold("", new FoldFunction<Tuple2<String, Long>, String>() {
    +       public String fold(String acc, Tuple2<String, Long> value) {
    +         return acc + value.f1;
    +       }
    +    });
     {% endhighlight %}
    -    </td>
    -  </tr>
    -  <tr>
    -    <td><strong>Continuous processing time trigger</strong></td>
    -    <td>
    -      <p>
    -        A window is periodically considered for being fired (every 5 seconds in the example).
    -        The window is actually fired only when the current processing time exceeds its end-value.
    -        The elements on the triggered window are retained.
    -      </p>
    -{% highlight java %}
    -windowedStream.trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(5)));
    -{% endhighlight %}
    -    </td>
    -  </tr>
    -  <tr>
    -    <td><strong>Continuous watermark time trigger</strong></td>
    -    <td>
    -      <p>
    -        A window is periodically considered for being fired (every 5 seconds in the example).
    -        A window is actually fired when a watermark with value that exceeds the window's end-value has been received.
    -        The elements on the triggered window are retained.
    -      </p>
    -{% highlight java %}
    -windowedStream.trigger(ContinuousEventTimeTrigger.of(Time.seconds(5)));
    +</div>
    +
    +<div data-lang="scala" markdown="1">
    +{% highlight scala %}
    +val input: DataStream[(String, Long)] = ...
    +
    +input
    +    .keyBy(<key selector>)
    +    .window(<window assigner>)
    +    .fold("") { (acc, v) => acc + v._2 }
     {% endhighlight %}
    -    </td>
    -  </tr>
    -  <tr>
    -    <td><strong>Count trigger</strong></td>
    -    <td>
    -      <p>
    -        A window is fired when it has more than a certain number of elements (1000 below).
    -        The elements of the triggered window are retained.
    -      </p>
    +</div>
    +</div>
    +
    +A `FoldFunction` specifies how elements from the input will be added to an initial
    +accumulator value (`""`, the empty string, in our example). This example will compute
    +a concatenation of all the `Long` fields of the input.
    +
    +### WindowFunction - The Generic Case
    +
    +Using a `WindowFunction` provides the most flexibility, at the cost of performance. The reason for this
    +is that elements cannot be incrementally aggregated for a window and instead need to be buffered
    +internally until the window is considered ready for processing. A `WindowFunction` gets an
    +`Iterable` containing all the elements of the window being processed. The signature of
    +`WindowFunction` is this:
    +
    +<div class="codetabs" markdown="1">
    +<div data-lang="java" markdown="1">
     {% highlight java %}
    -windowedStream.trigger(CountTrigger.of(1000));
    +public interface WindowFunction<IN, OUT, KEY, W extends Window> extends Function, Serializable {
    +
    +  /**
    +   * Evaluates the window and outputs none or several elements.
    +   *
    +   * @param key The key for which this window is evaluated.
    +   * @param window The window that is being evaluated.
    +   * @param input The elements in the window being evaluated.
    +   * @param out A collector for emitting elements.
    +   *
    +   * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
    +   */
    +  void apply(KEY key, W window, Iterable<IN> input, Collector<OUT> out) throws Exception;
    +}
     {% endhighlight %}
    -    </td>
    -  </tr>
    -  <tr>
    -    <td><strong>Purging trigger</strong></td>
    -    <td>
    -      <p>
    -        Takes any trigger as an argument and forces the triggered window elements to be
    -        "purged" (discarded) after triggering.
    -      </p>
    -{% highlight java %}
    -windowedStream.trigger(PurgingTrigger.of(CountTrigger.of(1000)));
    +</div>
    +
    +<div data-lang="scala" markdown="1">
    +{% highlight scala %}
    +trait WindowFunction[IN, OUT, KEY, W <: Window] extends Function with Serializable {
    +
    +  /**
    +   * Evaluates the window and outputs none or several elements.
    +   *
    +   * @param key The key for which this window is evaluated.
    +   * @param window The window that is being evaluated.
    +   * @param input The elements in the window being evaluated.
    +   * @param out A collector for emitting elements.
    +   *
    +   * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
    +   */
    +  def apply(key: KEY, window: W, input: Iterable[IN], out: Collector[OUT]): Unit
    +}
     {% endhighlight %}
    -    </td>
    -  </tr>
    -  <tr>
    -    <td><strong>Delta trigger</strong></td>
    -    <td>
    -      <p>
    -        A window is periodically considered for being fired (every 5000 milliseconds in the example).
    -        A window is actually fired when the value of the last added element exceeds the value of
    -        the first element inserted in the window according to a `DeltaFunction`.
    -      </p>
    +</div>
    +</div>
    +
    +Here we show an example that uses a `WindowFunction` to count the elements in a window. We do this
    +because we want to access information about the window itself to emit it along with the count.
    +This is very inefficient, however, and should be implemented with a
    +`ReduceFunction` in practice. Below, we will see an example of how a `ReduceFunction` can
    +be combined with a `WindowFunction` to get both incremental aggregation and the added
    +information of a `WindowFunction`.
    +
    +<div class="codetabs" markdown="1">
    +<div data-lang="java" markdown="1">
     {% highlight java %}
    -windowedStream.trigger(new DeltaTrigger.of(5000.0, new DeltaFunction<Double>() {
    -    @Override
    -    public double getDelta (Double old, Double new) {
    -        return (new - old > 0.01);
    +DataStream<Tuple2<String, Long>> input = ...;
    +
    +input
    +    .keyBy(<key selector>)
    +    .window(<window assigner>)
    +    .apply(new MyWindowFunction());
    +
    +/* ... */
    +
    +public class MyWindowFunction implements WindowFunction<Tuple2<String, Long>, String, String, TimeWindow> {
    +
    +  public void apply(String key, TimeWindow window, Iterable<Tuple2<String, Long>> input, Collector<String> out) {
    +    long count = 0;
    +    for (Tuple2<String, Long> in: input) {
    +      count++;
         }
    -}));
    +    out.collect("Window: " + window + " count: " + count);
    +  }
    +}
    +
     {% endhighlight %}
    -    </td>
    -  </tr>
    - </tbody>
    -</table>
     </div>
     
    -
     <div data-lang="scala" markdown="1">
    -<table class="table table-bordered">
    -  <thead>
    -    <tr>
    -      <th class="text-left" style="width: 25%">Transformation</th>
    -      <th class="text-center">Description</th>
    -    </tr>
    -  </thead>
    -  <tbody>
    -  <tr>
    -    <td><strong>Processing time trigger</strong></td>
    -    <td>
    -      <p>
    -        A window is fired when the current processing time exceeds its end-value.
    -        The elements on the triggered window are henceforth discarded.
    -      </p>
    -{% highlight scala %}
    -windowedStream.trigger(ProcessingTimeTrigger.create);
    -{% endhighlight %}
    -    </td>
    -  </tr>
    -  <tr>
    -    <td><strong>Watermark trigger</strong></td>
    -    <td>
    -      <p>
    -        A window is fired when a watermark with value that exceeds the window's end-value has been received.
    -        The elements on the triggered window are henceforth discarded.
    -      </p>
    -{% highlight scala %}
    -windowedStream.trigger(EventTimeTrigger.create);
    -{% endhighlight %}
    -    </td>
    -  </tr>
    -  <tr>
    -    <td><strong>Continuous processing time trigger</strong></td>
    -    <td>
    -      <p>
    -        A window is periodically considered for being fired (every 5 seconds in the example).
    -        The window is actually fired only when the current processing time exceeds its end-value.
    -        The elements on the triggered window are retained.
    -      </p>
    -{% highlight scala %}
    -windowedStream.trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(5)));
    -{% endhighlight %}
    -    </td>
    -  </tr>
    -  <tr>
    -    <td><strong>Continuous watermark time trigger</strong></td>
    -    <td>
    -      <p>
    -        A window is periodically considered for being fired (every 5 seconds in the example).
    -        A window is actually fired when a watermark with value that exceeds the window's end-value has been received.
    -        The elements on the triggered window are retained.
    -      </p>
    -{% highlight scala %}
    -windowedStream.trigger(ContinuousEventTimeTrigger.of(Time.seconds(5)));
    -{% endhighlight %}
    -    </td>
    -  </tr>
    -  <tr>
    -    <td><strong>Count trigger</strong></td>
    -    <td>
    -      <p>
    -        A window is fired when it has more than a certain number of elements (1000 below).
    -        The elements of the triggered window are retained.
    -      </p>
     {% highlight scala %}
    -windowedStream.trigger(CountTrigger.of(1000));
    +val input: DataStream[(String, Long)] = ...
    +
    +input
    +    .keyBy(<key selector>)
    +    .window(<window assigner>)
    +    .apply(new MyWindowFunction())
    +
    +/* ... */
    +
    +class MyWindowFunction extends WindowFunction[(String, Long), String, String, TimeWindow] {
    +
    +  def apply(key: String, window: TimeWindow, input: Iterable[(String, Long)], out: Collector[String]): Unit = {
    +    var count = 0L
    +    for (in <- input) {
    +      count = count + 1
    +    }
    +    out.collect(s"Window $window count: $count")
    +  }
    +}
     {% endhighlight %}
    -    </td>
    -  </tr>
    -  <tr>
    -    <td><strong>Purging trigger</strong></td>
    -    <td>
    -      <p>
    -        Takes any trigger as an argument and forces the triggered window elements to be
    -        "purged" (discarded) after triggering.
    -      </p>
    -{% highlight scala %}
    -windowedStream.trigger(PurgingTrigger.of(CountTrigger.of(1000)));
    +</div>
    +</div>
    +
    +### WindowFunction with Incremental Aggregation
    +
    +A `WindowFunction` can be combined with either a `ReduceFunction` or a `FoldFunction`. This allows
    +you to get the benefit of incremental window computation and also have the additional meta information
    +that writing a `WindowFunction` provides.
    +
    +This is an exampel that shows how incremental aggregation functions can be combined with
    --- End diff --
    
    typo: exampel
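The session-window rule in the removed table above (elements less than the session gap apart share a session, and intermediate elements can chain a session together) can be modelled in a few lines of plain Java. This is a hypothetical sketch, not Flink's `EventTimeSessionWindows` implementation: `SessionSketch`, `Session` and `sessionize` are made-up names, the input is assumed to be sorted by timestamp (Flink does not require that), and a difference exactly equal to the gap is treated as the same session, a boundary case the table leaves open.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-Java sketch (not Flink code): groups ascending event
// timestamps into sessions separated by gaps larger than gapMillis.
final class SessionSketch {

    private SessionSketch() {}

    /** One session: first and last timestamp seen, plus the element count. */
    static final class Session {
        final long firstTimestamp;
        final long lastTimestamp;
        final int count;

        Session(long firstTimestamp, long lastTimestamp, int count) {
            this.firstTimestamp = firstTimestamp;
            this.lastTimestamp = lastTimestamp;
            this.count = count;
        }
    }

    /** Assumes sortedTimestamps is in ascending order. */
    static List<Session> sessionize(long[] sortedTimestamps, long gapMillis) {
        List<Session> sessions = new ArrayList<>();
        if (sortedTimestamps.length == 0) {
            return sessions;
        }
        long first = sortedTimestamps[0];
        long last = first;
        int count = 1;
        for (int i = 1; i < sortedTimestamps.length; i++) {
            long ts = sortedTimestamps[i];
            if (ts - last > gapMillis) {
                // Gap exceeded: close the current session and start a new one at ts.
                sessions.add(new Session(first, last, count));
                first = ts;
                count = 0;
            }
            // Either extends the current session or is the first element of the new one.
            last = ts;
            count++;
        }
        sessions.add(new Session(first, last, count));
        return sessions;
    }
}
```

With a 5000 ms gap, timestamps 0, 3000, 6000 and 20000 yield two sessions: the first holds three elements even though 0 and 6000 are more than 5000 ms apart, because 3000 connects them.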

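The `WindowFunction` section of the diff argues that a `ReduceFunction` can aggregate incrementally, while a plain `WindowFunction` must buffer every element until the window fires. A hypothetical plain-Java model of one window's per-key state makes the difference in stored elements visible. It is not Flink's window operator or state backend; `WindowStateSketch`, `IncrementalState` and `BufferedState` are illustrative names only.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

// Hypothetical model of one window's per-key state; not Flink code.
final class WindowStateSketch {

    private WindowStateSketch() {}

    /** Incremental: one running value per key, combined as elements arrive (ReduceFunction-style). */
    static final class IncrementalState<T> {
        private final Map<String, T> accumulators = new HashMap<>();
        private final BinaryOperator<T> reducer;

        IncrementalState(BinaryOperator<T> reducer) {
            this.reducer = reducer;
        }

        void add(String key, T value) {
            // Stores the value if the key is new, otherwise combines it with the stored one.
            accumulators.merge(key, value, reducer);
        }

        T result(String key) {
            return accumulators.get(key);
        }

        int storedElements() {
            return accumulators.size(); // one value per key, however many elements arrived
        }
    }

    /** Buffered: every element is kept until the window fires (plain WindowFunction-style). */
    static final class BufferedState<T> {
        private final Map<String, List<T>> buffers = new HashMap<>();

        void add(String key, T value) {
            buffers.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
        }

        List<T> elements(String key) {
            return buffers.get(key);
        }

        int storedElements() {
            int total = 0;
            for (List<T> values : buffers.values()) {
                total += values.size();
            }
            return total;
        }
    }
}
```

Feeding the values 1 to 4 for key "a" leaves one stored value (10) in the incremental state but four buffered elements in the other, which is the motivation the diff gives for combining a `ReduceFunction` with a `WindowFunction`.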
