GitHub user uce commented on a diff in the pull request:
https://github.com/apache/flink/pull/2154#discussion_r69157148
--- Diff: docs/apis/streaming/windows.md ---
@@ -24,1023 +24,608 @@ specific language governing permissions and
limitations
under the License.
-->
+Flink uses a concept called *windows* to divide a (potentially) infinite
`DataStream` into finite
+slices based on the timestamps of elements or other criteria. This
division is required when working
+with infinite streams of data and performing transformations that
aggregate elements.
+
+<span class="label label-info">Info</span> We will mostly talk about
*keyed windowing* here, i.e.
+windows that are applied on a `KeyedStream`. Keyed windows have the
advantage that elements are
+subdivided based on both window and key before being given to
+a user function. The work can thus be distributed across the cluster
+because the elements for different keys can be processed independently. If
you absolutely have to,
+you can check out [non-keyed windowing](#non-keyed-windowing) where we
describe how non-keyed
+windows work.
+
* This will be replaced by the TOC
{:toc}
-## Windows on Keyed Data Streams
-
-Flink offers a variety of methods for defining windows on a `KeyedStream`.
All of these group elements *per key*,
-i.e., each window will contain elements with the same key value.
+## Basics
-### Basic Window Constructs
+For a windowed transformation you must at least specify a *key*
+(see [specifying keys](apis/common/index.html#specifying-keys)),
+a *window assigner* and a *window function*. The *key* divides the
infinite, non-keyed, stream
+into logical keyed streams while the *window assigner* assigns elements to
finite per-key windows.
+Finally, the *window function* is used to process the elements of each
window.
-Flink offers a general window mechanism that provides flexibility, as well
as a number of pre-defined windows
-for common use cases. See first if your use case can be served by the
pre-defined windows below before moving
-to defining your own windows.
+The basic structure of a windowed transformation is thus as follows:
<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
+{% highlight java %}
+DataStream<T> input = ...;
-<br />
-
-<table class="table table-bordered">
- <thead>
- <tr>
- <th class="text-left" style="width: 25%">Transformation</th>
- <th class="text-center">Description</th>
- </tr>
- </thead>
- <tbody>
- <tr>
- <td><strong>Tumbling time window</strong><br>KeyedStream →
WindowedStream</td>
- <td>
- <p>
- Defines a window of 5 seconds, that "tumbles". This means that
elements are
- grouped according to their timestamp in groups of 5 second
duration, and every element belongs to exactly one window.
- The notion of time is specified by the selected TimeCharacteristic
(see <a href="{{ site.baseurl }}/apis/streaming/event_time.html">time</a>).
- {% highlight java %}
-keyedStream.timeWindow(Time.seconds(5));
- {% endhighlight %}
- </p>
- </td>
- </tr>
- <tr>
- <td><strong>Sliding time window</strong><br>KeyedStream →
WindowedStream</td>
- <td>
- <p>
- Defines a window of 5 seconds, that "slides" by 1 second.
This means that elements are
- grouped according to their timestamp in groups of 5 second
duration, and elements can belong to more than
- one window (since windows overlap by at most 4 seconds)
- The notion of time is specified by the selected
TimeCharacteristic (see <a href="{{ site.baseurl
}}/apis/streaming/event_time.html">time</a>).
- {% highlight java %}
-keyedStream.timeWindow(Time.seconds(5), Time.seconds(1));
- {% endhighlight %}
- </p>
- </td>
- </tr>
- <tr>
- <td><strong>Tumbling count window</strong><br>KeyedStream →
WindowedStream</td>
- <td>
- <p>
- Defines a window of 1000 elements, that "tumbles". This means
that elements are
- grouped according to their arrival time (equivalent to
processing time) in groups of 1000 elements,
- and every element belongs to exactly one window.
- {% highlight java %}
-keyedStream.countWindow(1000);
- {% endhighlight %}
- </p>
- </td>
- </tr>
- <tr>
- <td><strong>Sliding count window</strong><br>KeyedStream →
WindowedStream</td>
- <td>
- <p>
- Defines a window of 1000 elements, that "slides" every 100
elements. This means that elements are
- grouped according to their arrival time (equivalent to
processing time) in groups of 1000 elements,
- and every element can belong to more than one window (as windows
overlap by at most 900 elements).
- {% highlight java %}
-keyedStream.countWindow(1000, 100)
- {% endhighlight %}
- </p>
- </td>
- </tr>
- </tbody>
-</table>
-
+input
+ .keyBy(<key selector>)
+ .window(<window assigner>)
+ .<windowed transformation>(<window function>);
+{% endhighlight %}
</div>
<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val input: DataStream[T] = ...
-<br />
-
-<table class="table table-bordered">
- <thead>
- <tr>
- <th class="text-left" style="width: 25%">Transformation</th>
- <th class="text-center">Description</th>
- </tr>
- </thead>
- <tbody>
- <tr>
- <td><strong>Tumbling time window</strong><br>KeyedStream →
WindowedStream</td>
- <td>
- <p>
- Defines a window of 5 seconds, that "tumbles". This means that
elements are
- grouped according to their timestamp in groups of 5 second
duration, and every element belongs to exactly one window.
- The notion of time is specified by the selected
TimeCharacteristic (see <a href="{{ site.baseurl
}}/apis/streaming/event_time.html">time</a>).
- {% highlight scala %}
-keyedStream.timeWindow(Time.seconds(5))
- {% endhighlight %}
- </p>
- </td>
- </tr>
- <tr>
- <td><strong>Sliding time window</strong><br>KeyedStream →
WindowedStream</td>
- <td>
- <p>
- Defines a window of 5 seconds, that "slides" by 1 second.
This means that elements are
- grouped according to their timestamp in groups of 5 second
duration, and elements can belong to more than
- one window (since windows overlap by at most 4 seconds)
- The notion of time is specified by the selected
TimeCharacteristic (see <a href="{{ site.baseurl
}}/apis/streaming/event_time.html">time</a>).
- {% highlight scala %}
-keyedStream.timeWindow(Time.seconds(5), Time.seconds(1))
- {% endhighlight %}
- </p>
- </td>
- </tr>
- <tr>
- <td><strong>Tumbling count window</strong><br>KeyedStream →
WindowedStream</td>
- <td>
- <p>
- Defines a window of 1000 elements, that "tumbles". This means
that elements are
- grouped according to their arrival time (equivalent to
processing time) in groups of 1000 elements,
- and every element belongs to exactly one window.
- {% highlight scala %}
-keyedStream.countWindow(1000)
- {% endhighlight %}
- </p>
- </td>
- </tr>
- <tr>
- <td><strong>Sliding count window</strong><br>KeyedStream →
WindowedStream</td>
- <td>
- <p>
- Defines a window of 1000 elements, that "slides" every 100
elements. This means that elements are
- grouped according to their arrival time (equivalent to
processing time) in groups of 1000 elements,
- and every element can belong to more than one window (as windows
overlap by at most 900 elements).
- {% highlight scala %}
-keyedStream.countWindow(1000, 100)
- {% endhighlight %}
- </p>
- </td>
- </tr>
- </tbody>
-</table>
-
+input
+ .keyBy(<key selector>)
+ .window(<window assigner>)
+ .<windowed transformation>(<window function>)
+{% endhighlight %}
</div>
</div>
-### Advanced Window Constructs
+We will cover [window assigners](#window-assigners) in a separate section
below.
+
+The window transformation can be one of `reduce()`, `fold()` or `apply()`, which respectively
+take a `ReduceFunction`, `FoldFunction` or `WindowFunction`. We describe each of these ways
+of specifying a windowed transformation in detail below: [window
functions](#window-functions).
+
+For more advanced use cases you can also specify a `Trigger` that determines exactly when a window
+is considered *ready for processing*. Triggers are covered in more detail in
+[triggers](#triggers).
+
+## Window Assigners
+
+The window assigner specifies how elements of the stream are divided into
finite slices. Flink comes
+with pre-implemented window assigners for the most typical use cases,
namely *tumbling windows*,
+*sliding windows*, *session windows* and *global windows* but you can
implement your own by
+extending the `WindowAssigner` class. All the built-in window assigners,
except for the global
+windows one, assign elements to windows based on time, which can either be
processing time or event
+time.
+
+Let's first look at how each of these window assigners works before
looking at how they can be used
+in a Flink program. We will be using abstract figures to visualize the workings of each assigner.
+In these figures, the purple circles are elements of the stream, which are partitioned
+by some key (in this case *user 1*, *user 2* and *user 3*), and the x-axis shows the progress
+of time.
+
+### Global Windows
+
+Global windows are a way of specifying that we don't want to subdivide our
elements into windows.
+Each element is assigned to one single per-key *global window*.
+This windowing scheme is only useful if you also specify a custom
[trigger](#triggers). Otherwise,
+no computation is ever going to be performed, as the global window does
not have a natural end at
+which we could process the aggregated elements.
+
+<img src="non-windowed.svg" class="center" style="width: 80%;" />
+
+### Tumbling Windows
+
+A *tumbling windows* assigner assigns elements to fixed-length, non-overlapping windows of a
+specified *window size*. For example, if you specify a window size of 5 minutes, the window
+function will get 5 minutes' worth of elements in each invocation.
-The general mechanism can define more powerful windows at the cost of more
verbose syntax. For example,
-below is a window definition where windows hold elements of the last 5
seconds and slides every 1 second,
-but the execution of the window function is triggered when 100 elements
have been added to the
-window, and every time execution is triggered, 10 elements are retained in
the window:
+<img src="tumbling-windows.svg" class="center" style="width: 80%;" />
+
+### Sliding Windows
+
+The *sliding windows* assigner assigns elements to windows of fixed length equal to *window size*,
+like the tumbling windows assigner, but in this case windows can overlap. How often a new
+window starts is defined by the user-specified parameter *window slide*, so windows overlap
+whenever the slide is smaller than the window size. Because windows can overlap, an
+element can be assigned to multiple windows.
+
+For example, you could have windows of size 10 minutes that slide by 5
minutes. With this you get 10
+minutes worth of elements in each invocation of the window function and it
will be invoked for every
+5 minutes of data.
+
+<img src="sliding-windows.svg" class="center" style="width: 80%;" />
+
+### Session Windows
+
+The *session windows* assigner is ideal for cases where the window
boundaries need to adjust to the
+incoming data. Both the *tumbling windows* and *sliding windows* assigners assign elements to windows
+that start at fixed time points and have a fixed *window size*. With
session windows it is possible
+to have windows that start at individual points in time for each key and
that end once there has
+been a certain period of inactivity. The configuration parameter is the
*session gap* that specifies
+how long to wait for new data before considering a session as closed.
+
+<img src="session-windows.svg" class="center" style="width: 80%;" />
+
+### Specifying a Window Assigner
+
+The built-in window assigners (except `GlobalWindows`) come in two versions: one for processing-time
+windowing and one for event-time windowing. The processing-time assigners assign elements to
+windows based on the current clock of the worker machines, while the event-time assigners assign
+elements to windows based on the timestamps of elements. Please have a look at
+[event time](/apis/streaming/event_time.html) to learn about the
difference between processing time
+and event time and about how timestamps can be assigned to elements.
+
+The following code snippets show how each of the window assigners can be
used in a program:
<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
-keyedStream
- .window(SlidingEventTimeWindows.of(Time.seconds(5), Time.seconds(1))
- .trigger(CountTrigger.of(100))
- .evictor(CountEvictor.of(10));
+DataStream<T> input = ...;
+
+// tumbling event-time windows
+input
+ .keyBy(<key selector>)
+ .window(TumblingEventTimeWindows.of(Time.seconds(5)))
+ .<windowed transformation>(<window function>);
+
+// sliding event-time windows
+input
+ .keyBy(<key selector>)
+ .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
+ .<windowed transformation>(<window function>);
+
+// event-time session windows
+input
+ .keyBy(<key selector>)
+ .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
+ .<windowed transformation>(<window function>);
+
+// tumbling processing-time windows
+input
+ .keyBy(<key selector>)
+ .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
+ .<windowed transformation>(<window function>);
+
+// sliding processing-time windows
+input
+ .keyBy(<key selector>)
+ .window(SlidingProcessingTimeWindows.of(Time.seconds(10),
Time.seconds(5)))
+ .<windowed transformation>(<window function>);
+
+// processing-time session windows
+input
+ .keyBy(<key selector>)
+ .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
+ .<windowed transformation>(<window function>);
+
+// global windows
+input
+ .keyBy(<key selector>)
+ .window(GlobalWindows.create())
+ .<windowed transformation>(<window function>);
{% endhighlight %}
</div>
<div data-lang="scala" markdown="1">
{% highlight scala %}
-keyedStream
- .window(SlidingEventTimeWindows.of(Time.seconds(5), Time.seconds(1))
- .trigger(CountTrigger.of(100))
- .evictor(CountEvictor.of(10))
+val input: DataStream[T] = ...
+
+// tumbling event-time windows
+input
+ .keyBy(<key selector>)
+ .window(TumblingEventTimeWindows.of(Time.seconds(5)))
+ .<windowed transformation>(<window function>)
+
+// sliding event-time windows
+input
+ .keyBy(<key selector>)
+ .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
+ .<windowed transformation>(<window function>)
+
+// event-time session windows
+input
+ .keyBy(<key selector>)
+ .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
+ .<windowed transformation>(<window function>)
+
+// tumbling processing-time windows
+input
+ .keyBy(<key selector>)
+ .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
+ .<windowed transformation>(<window function>)
+
+// sliding processing-time windows
+input
+ .keyBy(<key selector>)
+ .window(SlidingProcessingTimeWindows.of(Time.seconds(10),
Time.seconds(5)))
+ .<windowed transformation>(<window function>)
+
+// processing-time session windows
+input
+ .keyBy(<key selector>)
+ .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
+ .<windowed transformation>(<window function>)
+
+// global windows
+input
+ .keyBy(<key selector>)
+ .window(GlobalWindows.create())
+ .<windowed transformation>(<window function>)
{% endhighlight %}
</div>
</div>
-The general recipe for building a custom window is to specify (1) a
`WindowAssigner`, (2) a `Trigger` (optionally),
-and (3) an `Evictor` (optionally).
+## Window Functions
-The `WindowAssigner` defines how incoming elements are assigned to
windows. A window is a logical group of elements
-that has a begin-value, and an end-value corresponding to a begin-time and
end-time. Elements with timestamp (according
-to some notion of time described above within these values are part of the
window).
+The *window function* is used to process the elements of each window (and
key) once the system
+determines that a window is ready for processing (see
[triggers](#triggers) for how the system
+determines when a window is ready).
-For example, the `SlidingEventTimeWindows`
-assigner in the code above defines a window of size 5 seconds, and a slide
of 1 second. Assume that
-time starts from 0 and is measured in milliseconds. Then, we have 6 windows
-that overlap: [0,5000], [1000,6000], [2000,7000], [3000, 8000], [4000,
9000], and [5000, 10000]. Each incoming
-element is assigned to the windows according to its timestamp. For
example, an element with timestamp 2000 will be
-assigned to the first three windows. Flink comes bundled with window
assigners that cover the most common use cases. You can write your
-own window types by extending the `WindowAssigner` class.
+The window function can be one of `ReduceFunction`, `FoldFunction` or
`WindowFunction`. The former
+two can be executed more efficiently because Flink can incrementally
aggregate the elements for each
+window as they arrive. A `WindowFunction` gets an `Iterable` for all the
elements contained in a
+window and additional meta information about the window to which the
elements belong.
-<div class="codetabs" markdown="1">
+A windowed transformation with a `WindowFunction` cannot be executed as
efficiently as the other
+cases because Flink has to buffer all elements for a window internally
before invoking the function.
--- End diff --
Maybe emphasize "all" here, e.g. `*all*`?
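
To illustrate why the word matters, here is a rough plain-Java sketch of the difference the paragraph describes. It does not use any Flink classes: `IncrementalWindow`, `BufferingWindow` and `sumAll` are hypothetical names made up for this comment, and the real operator state handling is certainly more involved. The point is only that a reduce-style window can hold a single running aggregate, while a `WindowFunction`-style window has to hold every element until it fires.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;
import java.util.function.Function;

/** Plain-Java illustration only; none of these classes are Flink APIs. */
public class WindowStateSketch {

    /** Reduce-style window: keeps a single running aggregate. */
    static final class IncrementalWindow<T> {
        private final BinaryOperator<T> reducer;
        private T aggregate; // null until the first element arrives

        IncrementalWindow(BinaryOperator<T> reducer) {
            this.reducer = reducer;
        }

        void add(T element) {
            // Combine eagerly, so the element itself never needs to be stored.
            aggregate = (aggregate == null) ? element : reducer.apply(aggregate, element);
        }

        int bufferedElements() {
            return aggregate == null ? 0 : 1;
        }

        T fire() {
            return aggregate;
        }
    }

    /** WindowFunction-style window: keeps every element until the window fires. */
    static final class BufferingWindow<T> {
        private final List<T> buffer = new ArrayList<>();

        void add(T element) {
            buffer.add(element);
        }

        int bufferedElements() {
            return buffer.size();
        }

        /** Hands the complete window contents to the function as an Iterable. */
        <R> R fire(Function<Iterable<T>, R> windowFunction) {
            return windowFunction.apply(buffer);
        }
    }

    /** A stand-in for a user function that needs to see the whole window. */
    static long sumAll(Iterable<Long> elements) {
        long sum = 0;
        for (long e : elements) {
            sum += e;
        }
        return sum;
    }

    public static void main(String[] args) {
        IncrementalWindow<Long> incremental = new IncrementalWindow<>(Long::sum);
        BufferingWindow<Long> buffering = new BufferingWindow<>();
        for (long v = 1; v <= 1000; v++) {
            incremental.add(v);
            buffering.add(v);
        }
        System.out.println("incremental: result=" + incremental.fire()
                + ", buffered=" + incremental.bufferedElements());
        System.out.println("buffering:   result=" + buffering.fire(WindowStateSketch::sumAll)
                + ", buffered=" + buffering.bufferedElements());
    }
}
```

Both variants compute the same sum, but the buffering one holds on to every element of the window, which is the cost the sentence is warning about.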