Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/3191#discussion_r97823809

--- Diff: docs/dev/windows.md ---
@@ -204,72 +221,120 @@ input
 {% highlight scala %}
 val input: DataStream[T] = ...
 
-// tumbling event-time windows
-input
-    .keyBy(<key selector>)
-    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
-    .<windowed transformation>(<window function>)
-
 // sliding event-time windows
 input
     .keyBy(<key selector>)
     .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
     .<windowed transformation>(<window function>)
 
-// event-time session windows
+// sliding processing-time windows
 input
     .keyBy(<key selector>)
-    .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
+    .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
     .<windowed transformation>(<window function>)
 
-// tumbling processing-time windows
+// sliding processing-time windows offset by -8 hours
 input
     .keyBy(<key selector>)
-    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
+    .window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8)))
     .<windowed transformation>(<window function>)
+{% endhighlight %}
+</div>
+</div>
 
-// sliding processing-time windows
+Time intervals can be specified by using one of `Time.milliseconds(x)`, `Time.seconds(x)`,
+`Time.minutes(x)`, and so on.
+
+As shown in the last example, sliding window assigners also take an optional `offset` parameter
+that can be used to change the alignment of windows. For example, without offsets hourly windows
+sliding by 30 minutes are aligned with epoch, that is you will get windows such as
+`1:00:00.000 - 1:59:59.999`, `1:30:00.000 - 2:29:59.999` and so on. If you want to change that
+you can give an offset. With an offset of 15 minutes you would, for example, get
+`1:15:00.000 - 2:14:59.999`, `1:45:00.000 - 2:44:59.999` etc.
+An important use case for offsets is to adjust windows to timezones other than UTC-0.
+For example, in China you would have to specify an offset of `Time.hours(-8)`.
+
+### Session Windows
+
+The *session windows* assigner groups elements by sessions of activity. Session windows do not overlap and
+do not have a fixed start and end time in contrast to *tumbling windows* and *sliding windows*. Instead a
+session window assigner closes a window when it does not receive elements for a certain period
+of time, i.e., when a gap of inactivity occurred. A session window assigner is configured with the *session gap* which
+defines how long the assigner waits until it closes the current session window and assigns following elements
+to a new session window.
+
+<img src="{{ site.baseurl }}/fig/session-windows.svg" class="center" style="width: 80%;" />
+
+The following code snippets show how to use session windows.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+DataStream<T> input = ...;
+
+// event-time session windows
 input
     .keyBy(<key selector>)
-    .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
-    .<windowed transformation>(<window function>)
+    .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
+    .<windowed transformation>(<window function>);
 
 // processing-time session windows
 input
     .keyBy(<key selector>)
     .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
+    .<windowed transformation>(<window function>);
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val input: DataStream[T] = ...
+
+// event-time session windows
+input
+    .keyBy(<key selector>)
+    .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
     .<windowed transformation>(<window function>)
 
-// global windows
+// processing-time session windows
 input
     .keyBy(<key selector>)
-    .window(GlobalWindows.create())
+    .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
+    .<windowed transformation>(<window function>)
 {% endhighlight %}
 </div>
 </div>
 
-Note, how we can specify a time interval by using one of `Time.milliseconds(x)`, `Time.seconds(x)`,
+Time intervals can be specified by using one of `Time.milliseconds(x)`, `Time.seconds(x)`,
 `Time.minutes(x)`, and so on.
 
-The time-based window assigners also take an optional `offset` parameter that can be used to
-change the alignment of windows. For example, without offsets hourly windows are aligned
-with epoch, that is you will get windows such as `1:00 - 1:59`, `2:00 - 2:59` and so on. If you
-want to change that you can give an offset. With an offset of 15 minutes you would, for example,
-get `1:15 - 2:14`, `2:15 - 3:14` etc. Another important use case for offsets is when you
-want to have daily windows and live in a timezone other than UTC-0. For example, in China
-you would have to specify an offset of `Time.hours(-8)`.
+<span class="label label-danger">Attention</span> Since session windows do not have a fixed start and end,
+they are evaluated differently than tumbling and sliding windows. Internally, a session window operator
+creates a new window for each arriving record and merges windows together if their are closer to each other
+than the defined gap.
+In order to be mergable, a session window operator requires a mergable [Trigger](#triggers) and a mergable
--- End diff --

I think it should be `mergeable` instead of `mergable`, appears several times in the text.
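
For readers following the quoted "Attention" note, the merge step it describes (one window per arriving record, merged when windows are closer than the gap) can be sketched in plain Java. This is only an illustration under stated assumptions, not Flink's implementation: `SessionMergeSketch` and `mergeSessions` are hypothetical names, timestamps are plain millisecond values, each record starts out as the window `[t, t + gap)`, and a record is merged into the current session only when its timestamp is strictly less than that session's end.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of session-window merging; not Flink code.
public class SessionMergeSketch {

    /** Returns merged sessions as {start, end} pairs, with end exclusive. */
    public static List<long[]> mergeSessions(long[] timestamps, long gapMillis) {
        long[] sorted = timestamps.clone();
        Arrays.sort(sorted);

        List<long[]> sessions = new ArrayList<>();
        for (long t : sorted) {
            // Assumption: every record initially gets its own window [t, t + gap).
            long[] candidate = {t, t + gapMillis};
            if (!sessions.isEmpty()) {
                long[] last = sessions.get(sessions.size() - 1);
                // Merge when the record arrives before the current session's end.
                if (candidate[0] < last[1]) {
                    last[1] = Math.max(last[1], candidate[1]);
                    continue;
                }
            }
            sessions.add(candidate);
        }
        return sessions;
    }

    public static void main(String[] args) {
        long gap = 10 * 60 * 1000L; // 10 minutes, as in the withGap examples
        long[] timestamps = {0L, 60_000L, 300_000L, 1_500_000L};
        for (long[] s : mergeSessions(timestamps, gap)) {
            System.out.println(s[0] + " - " + s[1]);
        }
    }
}
```

With the sample input, the first three records fall within 10 minutes of each other and collapse into one session, while the record at 25 minutes starts a second one.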