[ https://issues.apache.org/jira/browse/FLINK-4062?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15352896#comment-15352896 ]
ASF GitHub Bot commented on FLINK-4062:
---------------------------------------
Github user kl0u commented on a diff in the pull request:
https://github.com/apache/flink/pull/2154#discussion_r68744858
--- Diff: docs/apis/streaming/windows.md ---
@@ -24,1023 +24,593 @@ specific language governing permissions and limitations
under the License.
-->
+Flink uses a concept called *windows* to divide a (potentially) infinite `DataStream` into slices
+based on the timestamps of elements or other criteria. This division is required when working
+with infinite streams of data and performing transformations that aggregate elements.
+
+<span class="label label-info">Info</span> We will mostly talk about *keyed windowing* here, which
+means that the elements are subdivided based on both window and key before being given to
+a user function. Keyed windows have the advantage that work can be distributed across the cluster
+because the elements for different keys can be processed in isolation. If you absolutely need
+non-keyed windows, see [non-keyed windowing](#non-keyed-windowing), where we describe how they
+work.
+
* This will be replaced by the TOC
{:toc}
-## Windows on Keyed Data Streams
-
-Flink offers a variety of methods for defining windows on a `KeyedStream`. All of these group elements *per key*,
-i.e., each window will contain elements with the same key value.
+## Basics
-### Basic Window Constructs
+For a windowed transformation you must at least specify a *key* (usually in the form of a
+`KeySelector`), a *window assigner* and a *window function*. The *key* specifies how elements are
+put into groups. The *window assigner* specifies how the infinite stream is divided into windows.
+Finally, the *window function* is used to process the elements of each window.
-Flink offers a general window mechanism that provides flexibility, as well as a number of pre-defined windows
-for common use cases. See first if your use case can be served by the pre-defined windows below before moving
-to defining your own windows.
+The basic structure of a windowed transformation is thus as follows:
<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
+{% highlight java %}
+DataStream<T> input = ...;
-<br />
-
-<table class="table table-bordered">
- <thead>
- <tr>
- <th class="text-left" style="width: 25%">Transformation</th>
- <th class="text-center">Description</th>
- </tr>
- </thead>
- <tbody>
- <tr>
- <td><strong>Tumbling time window</strong><br>KeyedStream → WindowedStream</td>
- <td>
- <p>
- Defines a window of 5 seconds, that "tumbles". This means that elements are
- grouped according to their timestamp in groups of 5 second duration, and every element belongs to exactly one window.
- The notion of time is specified by the selected TimeCharacteristic (see <a href="{{ site.baseurl }}/apis/streaming/event_time.html">time</a>).
- {% highlight java %}
-keyedStream.timeWindow(Time.seconds(5));
- {% endhighlight %}
- </p>
- </td>
- </tr>
- <tr>
- <td><strong>Sliding time window</strong><br>KeyedStream → WindowedStream</td>
- <td>
- <p>
- Defines a window of 5 seconds, that "slides" by 1 second. This means that elements are
- grouped according to their timestamp in groups of 5 second duration, and elements can belong to more than
- one window (since windows overlap by at most 4 seconds)
- The notion of time is specified by the selected TimeCharacteristic (see <a href="{{ site.baseurl }}/apis/streaming/event_time.html">time</a>).
- {% highlight java %}
-keyedStream.timeWindow(Time.seconds(5), Time.seconds(1));
- {% endhighlight %}
- </p>
- </td>
- </tr>
- <tr>
- <td><strong>Tumbling count window</strong><br>KeyedStream → WindowedStream</td>
- <td>
- <p>
- Defines a window of 1000 elements, that "tumbles". This means that elements are
- grouped according to their arrival time (equivalent to processing time) in groups of 1000 elements,
- and every element belongs to exactly one window.
- {% highlight java %}
-keyedStream.countWindow(1000);
- {% endhighlight %}
- </p>
- </td>
- </tr>
- <tr>
- <td><strong>Sliding count window</strong><br>KeyedStream → WindowedStream</td>
- <td>
- <p>
- Defines a window of 1000 elements, that "slides" every 100 elements. This means that elements are
- grouped according to their arrival time (equivalent to processing time) in groups of 1000 elements,
- and every element can belong to more than one window (as windows overlap by at most 900 elements).
- {% highlight java %}
-keyedStream.countWindow(1000, 100)
- {% endhighlight %}
- </p>
- </td>
- </tr>
- </tbody>
-</table>
-
+input
+ .keyBy(<key selector>)
+ .window(<window assigner>)
+ .<windowed transformation>(<window function>);
+{% endhighlight %}
</div>
<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val input: DataStream[T] = ...
-<br />
-
-<table class="table table-bordered">
- <thead>
- <tr>
- <th class="text-left" style="width: 25%">Transformation</th>
- <th class="text-center">Description</th>
- </tr>
- </thead>
- <tbody>
- <tr>
- <td><strong>Tumbling time window</strong><br>KeyedStream → WindowedStream</td>
- <td>
- <p>
- Defines a window of 5 seconds, that "tumbles". This means that elements are
- grouped according to their timestamp in groups of 5 second duration, and every element belongs to exactly one window.
- The notion of time is specified by the selected TimeCharacteristic (see <a href="{{ site.baseurl }}/apis/streaming/event_time.html">time</a>).
- {% highlight scala %}
-keyedStream.timeWindow(Time.seconds(5))
- {% endhighlight %}
- </p>
- </td>
- </tr>
- <tr>
- <td><strong>Sliding time window</strong><br>KeyedStream → WindowedStream</td>
- <td>
- <p>
- Defines a window of 5 seconds, that "slides" by 1 second. This means that elements are
- grouped according to their timestamp in groups of 5 second duration, and elements can belong to more than
- one window (since windows overlap by at most 4 seconds)
- The notion of time is specified by the selected TimeCharacteristic (see <a href="{{ site.baseurl }}/apis/streaming/event_time.html">time</a>).
- {% highlight scala %}
-keyedStream.timeWindow(Time.seconds(5), Time.seconds(1))
- {% endhighlight %}
- </p>
- </td>
- </tr>
- <tr>
- <td><strong>Tumbling count window</strong><br>KeyedStream → WindowedStream</td>
- <td>
- <p>
- Defines a window of 1000 elements, that "tumbles". This means that elements are
- grouped according to their arrival time (equivalent to processing time) in groups of 1000 elements,
- and every element belongs to exactly one window.
- {% highlight scala %}
-keyedStream.countWindow(1000)
- {% endhighlight %}
- </p>
- </td>
- </tr>
- <tr>
- <td><strong>Sliding count window</strong><br>KeyedStream → WindowedStream</td>
- <td>
- <p>
- Defines a window of 1000 elements, that "slides" every 100 elements. This means that elements are
- grouped according to their arrival time (equivalent to processing time) in groups of 1000 elements,
- and every element can belong to more than one window (as windows overlap by at most 900 elements).
- {% highlight scala %}
-keyedStream.countWindow(1000, 100)
- {% endhighlight %}
- </p>
- </td>
- </tr>
- </tbody>
-</table>
-
+input
+ .keyBy(<key selector>)
+ .window(<window assigner>)
+ .<windowed transformation>(<window function>)
+{% endhighlight %}
</div>
</div>
-### Advanced Window Constructs
+We will cover the different window assigners in [window assigners](#window-assigners).
+
+The windowed transformation can be one of `reduce()`, `fold()` or `apply()`, which take a
+`ReduceFunction`, `FoldFunction` or `WindowFunction`, respectively. We describe each of these ways
+of specifying a windowed transformation in detail in [window functions](#window-functions).
+
+For more advanced use cases you can also specify a `Trigger` that determines when exactly a window
+is considered *ready for processing*. Triggers are covered in more detail in
+[triggers](#triggers).
+
+## Window Assigners
+
+The window assigner specifies how elements of the stream are divided into slices. You can provide
+a custom window assigner by implementing `WindowAssigner`, but Flink comes with
+window assigners for typical use cases: *tumbling windows*, *sliding windows*, *session windows*
+and *global windows*. Except for the last, all of these assign windows based on time, which
+can be either processing time or event time.
+
+Let's first look at how each of these window assigners works before looking at how they can be used
+in a Flink program. We will be using abstract figures to visualize the workings of each assigner.
+In these figures, the purple circles are elements of the stream, partitioned
+by some key (in this case *user 1*, *user 2* and *user 3*), and the x-axis shows the progress
+of time.
+
+### Global Windows
+
+Global windows are a way of specifying that we don't want to subdivide our elements into windows.
+Each element is assigned to one single *global window* (still separate for each key, of course).
+This is only useful if you also specify a custom [trigger](#triggers); otherwise no data will ever
+be processed, because the global window does not have a natural end at which the aggregated
+elements could be processed.
+
+<img src="non-windowed.svg" class="center" style="width: 80%;" />
-The general mechanism can define more powerful windows at the cost of more verbose syntax. For example,
-below is a window definition where windows hold elements of the last 5 seconds and slides every 1 second,
-but the execution of the window function is triggered when 100 elements have been added to the
-window, and every time execution is triggered, 10 elements are retained in the window:
+### Tumbling Windows
+
+The *tumbling windows* assigner assigns elements to fixed, non-overlapping time buckets of a specified
+*window size*. For example, if you specify a window size of 5 minutes, the window function will get
+5 minutes' worth of elements in each invocation.
+
+<img src="tumbling-windows.svg" class="center" style="width: 80%;" />
+
+### Sliding Windows
+
+The *sliding windows* assigner is very similar to the *tumbling windows* assigner, but it can assign
+an element to more than one window, based on a *window size* and a *window slide*. For example,
+you could have windows of size 10 minutes that slide by 5 minutes. Each invocation of the window
+function then gets 10 minutes' worth of elements, and the function is invoked for every
+5 minutes of data.
+
+<img src="sliding-windows.svg" class="center" style="width: 80%;" />
+
+### Session Windows
+
+The *session windows* assigner can be used if windows need to dynamically adapt to the data.
+Both the *tumbling windows* and *sliding windows* assigners assign elements to windows that start
+at fixed time points. With session windows it is possible to have windows that start at
--- End diff --
time points, and have fixed duration.
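To make the contrast in the diff concrete (tumbling and sliding windows start at fixed time points, while session windows adapt to the data), here is a minimal standalone Java sketch of how timestamps could be mapped to windows. It is not Flink's `WindowAssigner` API: `WindowAssignmentSketch`, `Window`, `tumbling`, `sliding` and `sessions` are hypothetical names, windows are modelled as half-open `[start, end)` ranges in milliseconds, window offsets are ignored, and sessions are formed by merging per-element `[t, t + gap)` windows that overlap or touch.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
 * Hypothetical, simplified sketch of time-based window assignment.
 * Not Flink's WindowAssigner API: windows are half-open [start, end)
 * ranges in milliseconds and window offsets are ignored.
 */
public class WindowAssignmentSketch {

    /** A half-open time window [start, end). */
    record Window(long start, long end) {}

    /** Tumbling: align the timestamp down to a multiple of the size; exactly one window. */
    static Window tumbling(long timestamp, long size) {
        long start = timestamp - Math.floorMod(timestamp, size);
        return new Window(start, start + size);
    }

    /** Sliding: all windows whose start is a multiple of the slide and that contain the timestamp. */
    static List<Window> sliding(long timestamp, long size, long slide) {
        List<Window> windows = new ArrayList<>();
        // Latest window start at or before the timestamp.
        long lastStart = timestamp - Math.floorMod(timestamp, slide);
        // Walk backwards while [start, start + size) still contains the timestamp.
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            windows.add(new Window(start, start + size));
        }
        return windows;
    }

    /** Sessions: give each element the window [t, t + gap) and merge windows that overlap or touch. */
    static List<Window> sessions(List<Long> timestamps, long gap) {
        List<Long> sorted = new ArrayList<>(timestamps);
        Collections.sort(sorted);
        List<Window> merged = new ArrayList<>();
        for (long t : sorted) {
            Window current = new Window(t, t + gap);
            if (!merged.isEmpty() && merged.get(merged.size() - 1).end() >= current.start()) {
                // Extend the previous session instead of starting a new one.
                Window last = merged.remove(merged.size() - 1);
                merged.add(new Window(last.start(), Math.max(last.end(), current.end())));
            } else {
                merged.add(current);
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        System.out.println(tumbling(7_000, 5_000));
        System.out.println(sliding(7_000, 10_000, 5_000));
        System.out.println(sessions(List.of(1_000L, 3_000L, 12_000L, 14_000L, 30_000L), 5_000));
    }
}
```

With a size of 10 s and a slide of 5 s, each timestamp falls into size / slide = 2 windows, which is why the sliding variant returns a list while the tumbling variant always returns exactly one window. The session windows, by contrast, have boundaries that depend only on where the gaps in the data are.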
> Update Windowing Documentation
> ------------------------------
>
> Key: FLINK-4062
> URL: https://issues.apache.org/jira/browse/FLINK-4062
> Project: Flink
> Issue Type: Sub-task
> Components: Documentation
> Affects Versions: 1.1.0
> Reporter: Aljoscha Krettek
> Assignee: Aljoscha Krettek
>
> The window documentation could be a bit more principled and also needs
> updating with the new allowed lateness setting.
> There is also essentially no documentation about how to write a custom
> trigger.
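The issue notes that there is essentially no documentation on writing a custom trigger, and the diff says global windows are only useful together with one. The following standalone Java sketch illustrates the idea with a count-based trigger on one key's global window. `SketchTrigger`, `Result`, `CountTrigger` and `run` are hypothetical names that only loosely mirror Flink's `Trigger` and `TriggerResult`; they are not Flink's API.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch of a count trigger on a single key's global window.
 * SketchTrigger and Result only loosely mirror Flink's Trigger/TriggerResult.
 */
public class CountTriggerSketch {

    /** What the trigger asks the window to do after seeing an element. */
    enum Result { CONTINUE, FIRE_AND_PURGE }

    /** Hypothetical trigger interface, consulted once per element. */
    interface SketchTrigger<T> {
        Result onElement(T element);
    }

    /** Fires (and clears the window) every maxCount elements. */
    static final class CountTrigger<T> implements SketchTrigger<T> {
        private final long maxCount;
        private long count; // in a real system this would be per-key, per-window state

        CountTrigger(long maxCount) {
            this.maxCount = maxCount;
        }

        @Override
        public Result onElement(T element) {
            count++;
            if (count >= maxCount) {
                count = 0;
                return Result.FIRE_AND_PURGE;
            }
            return Result.CONTINUE;
        }
    }

    /** Buffers elements of one global window and emits a batch whenever the trigger fires. */
    static <T> List<List<T>> run(List<T> input, SketchTrigger<T> trigger) {
        List<List<T>> emitted = new ArrayList<>();
        List<T> buffer = new ArrayList<>();
        for (T element : input) {
            buffer.add(element);
            if (trigger.onElement(element) == Result.FIRE_AND_PURGE) {
                emitted.add(new ArrayList<>(buffer));
                buffer.clear();
            }
        }
        // Anything left in the buffer is never emitted: the global window has no natural end.
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of(1, 2, 3, 4, 5, 6, 7), new CountTrigger<>(3)));
    }
}
```

With a count of 3 and seven input elements, the sketch emits two batches and the seventh element stays buffered until two more elements arrive. A trigger that never fires emits nothing at all, matching the diff's point that a global window without a custom trigger never processes any data.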