[ https://issues.apache.org/jira/browse/FLINK-5529?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15838209#comment-15838209 ]
ASF GitHub Bot commented on FLINK-5529:
---------------------------------------

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/3191#discussion_r97832573

--- Diff: docs/dev/windows.md ---
@@ -622,133 +690,138 @@ input
 </div>
 </div>
-## Dealing with Late Data
+## Triggers
-When working with event-time windowing it can happen that elements arrive late, i.e the
-watermark that Flink uses to keep track of the progress of event-time is already past the
-end timestamp of a window to which an element belongs. Please
-see [event time](./event_time.html) and especially
-[late elements](./event_time.html#late-elements) for a more thorough discussion of
-how Flink deals with event time.
+A `Trigger` determines when a window (as formed by the `WindowAssigner`) is ready to be
+processed by the *window function*. Each `WindowAssigner` comes with a default `Trigger`.
+If the default trigger does not fit your needs, you can specify a custom trigger using `trigger(...)`.
-You can specify how a windowed transformation should deal with late elements and how much lateness
-is allowed. The parameter for this is called *allowed lateness*. This specifies by how much time
-elements can be late. Elements that arrive within the allowed lateness are still put into windows
-and are considered when computing window results. If elements arrive after the allowed lateness they
-will be dropped. Flink will also make sure that any state held by the windowing operation is garbage
-collected once the watermark passes the end of a window plus the allowed lateness.
+The trigger interface provides five methods that react to different events:
-<span class="label label-info">Default</span> By default, the allowed lateness is set to
-`0`. That is, elements that arrive behind the watermark will be dropped.
+* The `onElement()` method is called for each element that is added to a window.
+* The `onEventTime()` method is called when a registered event-time timer fires.
+* The `onProcessingTime()` method is called when a registered processing-time timer fires.
+* The `onMerge()` method is relevant for stateful triggers and merges the states of two triggers when their corresponding windows merge, *e.g.* when using session windows.
+* Finally the `clear()` method performs any action needed upon removal of the corresponding window.
-You can specify an allowed lateness like this:
+Any of these methods can be used to register processing- or event-time timers for future actions.
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-DataStream<T> input = ...;
+### Fire and Purge
-input
-    .keyBy(<key selector>)
-    .window(<window assigner>)
-    .allowedLateness(<time>)
-    .<windowed transformation>(<window function>);
-{% endhighlight %}
-</div>
+Once a trigger determines that a window is ready for processing, it fires. This is the signal for the window operator to emit the result of the current window. Given a window with a `WindowFunction`
+all elements are passed to the `WindowFunction` (possibly after passing them to an evictor).
+Windows with `ReduceFunction` of `FoldFunction` simply emit their eagerly aggregated result.
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val input: DataStream[T] = ...
+When a trigger fires, it can either `FIRE` or `FIRE_AND_PURGE`. While `FIRE` keeps the contents of the window, `FIRE_AND_PURGE` removes its content.
+By default, the pre-implemented triggers simply `FIRE` without purging the window state.
-input
-    .keyBy(<key selector>)
-    .window(<window assigner>)
-    .allowedLateness(<time>)
-    .<windowed transformation>(<window function>)
-{% endhighlight %}
-</div>
-</div>
+<span class="label label-danger">Attention</span> When purging, only the contents of the window are cleared. The window itself is not removed and accepts new elements.
-<span class="label label-info">Note</span> When using the `GlobalWindows` window assigner no
-data is ever considered late because the end timestamp of the global window is `Long.MAX_VALUE`.
+### Default Triggers of WindowAssigners
-## Triggers
+The default `Trigger` of a `WindowAssigner` is appropriate for many use cases. For example, all the event-time window assigners have an `EventTimeTrigger` as
+default trigger. This trigger simply fires once the watermark passes the end of a window.
-A `Trigger` determines when a window (as assigned by the `WindowAssigner`) is ready for being
-processed by the *window function*. The trigger observes how elements are added to windows
-and can also keep track of the progress of processing time and event time. Once a trigger
-determines that a window is ready for processing, it fires. This is the signal for the
-window operation to take the elements that are currently in the window and pass them along to
-the window function to produce output for the firing window.
+<span class="label label-danger">Attention</span> The default trigger of the `GlobalWindow` is the `NeverTrigger` which does never fire. Consequently, you always have to define a custom trigger when using a `GlobalWindow`.
-Each `WindowAssigner` (except `GlobalWindows`) comes with a default trigger that should be
-appropriate for most use cases. For example, `TumblingEventTimeWindows` has an `EventTimeTrigger` as
-default trigger. This trigger simply fires once the watermark passes the end of a window.
+<span class="label label-danger">Attention</span> By specifying a trigger using `trigger()` you
+are overwriting the default trigger of a `WindowAssigner`. For example, if you specify a
+`CountTrigger` for `TumblingEventTimeWindows` you will no longer get window firings based on the
+progress of time but only by count. Right now, you have to write your own custom trigger if
+you want to react based on both time and count.
-You can specify the trigger to be used by calling `trigger()` with a given `Trigger`. The
-whole specification of the windowed transformation would then look like this:
+### Built-in and Custom Triggers
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-DataStream<T> input = ...;
+Flink comes with a few built-in triggers.
-input
-    .keyBy(<key selector>)
-    .window(<window assigner>)
-    .trigger(<trigger>)
-    .<windowed transformation>(<window function>);
-{% endhighlight %}
-</div>
+* The (already mentioned) `EventTimeTrigger` fires based on the progress of event-time as measured by watermarks.
+* The `ProcessingTimeTrigger` fires based on processing time.
+* The `CountTrigger` which fires once the number of elements in a window exceeds the given limit.
+* The `PurgingTrigger` takes as argument another trigger and transforms it into a purging one.
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val input: DataStream[T] = ...
+If you need to implement a custom trigger, you should check out the abstract {% gh_link /flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java "Trigger" %} class. Please note that the API is still evolving and might change in future versions of Flink.
-input
-    .keyBy(<key selector>)
-    .window(<window assigner>)
-    .trigger(<trigger>)
-    .<windowed transformation>(<window function>)
-{% endhighlight %}
-</div>
-</div>
-Flink comes with a few triggers out-of-box: there is the already mentioned `EventTimeTrigger` that
-fires based on the progress of event-time as measured by the watermark, the `ProcessingTimeTrigger`
-does the same but based on processing time and the `CountTrigger` fires once the number of elements
-in a window exceeds the given limit.
+## Evictors
-<span class="label label-danger">Attention</span> By specifying a trigger using `trigger()` you
-are overwriting the default trigger of a `WindowAssigner`. For example, if you specify a
-`CountTrigger` for `TumblingEventTimeWindows` you will no longer get window firings based on the
-progress of time but only by count. Right now, you have to write your own custom trigger if
-you want to react based on both time and count.
+Flink's windowing model allows specifying an optional `Evictor` in addition to the `WindowAssigner` and the `Trigger`.
+This can be done using the `evictor(...)` method (shown in the beginning of this document). The evictor has the ability
+to remove elements from a window *after* the trigger fires and *before and/or after* the window function is applied.
+To do so, the `Evictor` interface has two methods:
+
+    /**
+     * Optionally evicts elements. Called before windowing function.
+     *
+     * @param elements The elements currently in the pane.
+     * @param size The current number of elements in the pane.
+     * @param window The {@link Window}
+     * @param evictorContext The context for the Evictor
+     */
+    void evictBefore(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);
+
+    /**
+     * Optionally evicts elements. Called after windowing function.
+     *
+     * @param elements The elements currently in the pane.
+     * @param size The current number of elements in the pane.
+     * @param window The {@link Window}
+     * @param evictorContext The context for the Evictor
+     */
+    void evictAfter(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);
+
+The `evictBefore()` contains the eviction logic to be applied before the window function, while the `evictAfter()`
+contains the one to be applied after the window function. Elements evicted before the application of the window
+function will not be processed by it.
+
+Flink comes with three pre-implemented evictors. These are:
+
+* `CountEvictor`: keeps up to a user-specified number of elements from the window and discards the remaining ones from
+the beginning of the window buffer.
+* `DeltaEvictor`: takes a `DeltaFunction` and a `threshold`, computes the delta between the last element in the
+window buffer and each of the remaining ones, and removes the ones with a delta greater or equal to the threshold.
+* `TimeEvictor`: takes as argument an `interval` in milliseconds and for a given window, it finds the maximum
+timestamp `max_ts` among its elements and removes all the elements with timestamps smaller than `max_ts - interval`.
-The internal `Trigger` API is still considered experimental but you can check out the code
-if you want to write your own custom trigger:
-{% gh_link /flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java "Trigger.java" %}.
+<span class="label label-info">Default</span> By default, all the pre-implemented evictors apply their logic before the
+window function.
-## Non-keyed Windowing
+<span class="label label-danger">Attention</span> Specifying an evictor prevents any pre-aggregation, as all the
+elements of a window have to be passed to the evictor before applying the computation.
-You can also leave out the `keyBy()` when specifying a windowed transformation. This means, however,
-that Flink cannot process windows for different keys in parallel, essentially turning the
-transformation into a non-parallel operation.
+<span class="label label-danger">Attention</span> Flink provides no guarantees about the order of the elements within
+a window. This implies that although an evictor may remove elements from the beginning of the window, these are not
+necessarily the ones that arrive first or last.
-<span class="label label-danger">Warning</span> As mentioned in the introduction, non-keyed
-windows have the disadvantage that work cannot be distributed in the cluster because
-windows cannot be computed independently per key. This can have severe performance implications.
+## Allowed Lateness
-The basic structure of a non-keyed windowed transformation is as follows:
+When working with *event-time* windowing it can happen that elements arrive late, *i.e.* the watermark that Flink uses to
+keep track of the progress of event-time is already past the end timestamp of a window to which an element belongs. See
+[event time](./event_time.html) and especially [late elements](./event_time.html#late-elements) for a more thorough
+discussion of how Flink deals with event time.
+
+By default, late elements are dropped if their associated window was already evaluated. However,
+Flink allows to specify a maximum *allowed lateness* for window operators. Allowed lateness
+specifies by how much time elements can be late before they are dropped. Elements that arrive
+within the allowed lateness of a window are still added to the window and trigger an immediate evaluation of the window which might emit elements.
--- End diff --

"and trigger an immediate evaluation" depends on the `Trigger`.


> Improve / extends windowing documentation
> -----------------------------------------
>
>                 Key: FLINK-5529
>                 URL: https://issues.apache.org/jira/browse/FLINK-5529
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Documentation
>            Reporter: Stephan Ewen
>            Assignee: Kostas Kloudas
>             Fix For: 1.2.0, 1.3.0
>
>
> Suggested Outline:
> {code}
> Windows
> (0) Outline: The anatomy of a window operation
>   stream
>     [.keyBy(...)]          <- keyed versus non-keyed windows
>      .window(...)          <- required: "assigner"
>     [.trigger(...)]        <- optional: "trigger" (else default trigger)
>     [.evictor(...)]        <- optional: "evictor" (else no evictor)
>     [.allowedLateness()]   <- optional, else zero
>      .reduce/fold/apply()  <- required: "function"
> (1) Types of windows
>   - tumble
>   - slide
>   - session
>   - global
> (2) Pre-defined windows
>   timeWindow() (tumble, slide)
>   countWindow() (tumble, slide)
>     - mention that count windows are inherently
>       resource leaky unless limited key space
> (3) Window Functions
>   - apply: most basic, iterates over elements in window
>
>   - aggregating: reduce and fold, can be used with "apply()" which will get one element
>
>   - forward reference to state size section
> (4) Advanced Windows
>   - assigner
>     - simple
>     - merging
>   - trigger
>     - registering timers (processing time, event time)
>     - state in triggers
>   - life cycle of a window
>     - create
>     - state
>     - cleanup
>       - when is window contents purged
>       - when is state dropped
>       - when is metadata (like merging set) dropped
> (5) Late data
>   - picture
>   - fire vs fire_and_purge: late accumulates vs late resurrects (cf discarding mode)
>
> (6) Evictors
>   - TBD
>
> (7) State size: How large will the state be?
> Basic rule: Each element has one copy per window it is assigned to
>   --> num windows * num elements in window
>   --> example: tumbling is one copy, sliding(n,m) is n/m copies
>   --> per key
> Pre-aggregation:
>   - if reduce or fold is set -> one element per window (rather than num elements in window)
>   - evictor voids pre-aggregation from the perspective of state
> Special rules:
>   - fold cannot pre-aggregate on session windows (and other merging windows)
> (8) Non-keyed windows
>   - all elements through the same windows
>   - currently not parallel
>   - possible parallel in the future when having pre-aggregation functions
>   - inherently (by definition) produce a result stream with parallelism one
>   - state similar to one key of keyed windows
> {code}
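The review point is that whether a late element within the allowed lateness "triggers an immediate evaluation" depends on the `Trigger`. A small, Flink-free Java model makes this concrete. It is a sketch under stated assumptions. `SimpleTrigger`, `EventTimeLikeTrigger`, `CountLikeTrigger` and `LateElementSketch` are hypothetical names, not Flink's `Trigger` API. They only mimic two decision rules: an event-time trigger fires once the watermark has reached the end of the window, and a count trigger fires after a fixed number of elements. The model also assumes that an element is dropped once `windowMaxTimestamp + allowedLateness <= watermark`.

```java
import java.util.Optional;

// Hypothetical, Flink-free model: these types only mimic trigger decisions.
enum TriggerResult { CONTINUE, FIRE }

interface SimpleTrigger {
    // Called for every element added to a window whose last timestamp is windowMaxTimestamp.
    TriggerResult onElement(long windowMaxTimestamp, long currentWatermark);
}

// Mimics an event-time trigger: fire once the watermark has reached the end of the window.
// For a late element this already holds, so the window is evaluated again at once.
class EventTimeLikeTrigger implements SimpleTrigger {
    @Override
    public TriggerResult onElement(long windowMaxTimestamp, long currentWatermark) {
        return windowMaxTimestamp <= currentWatermark ? TriggerResult.FIRE : TriggerResult.CONTINUE;
    }
}

// Mimics a count trigger: fire only when maxCount elements have been seen, regardless of
// the watermark, so a single late element does not by itself cause an evaluation.
class CountLikeTrigger implements SimpleTrigger {
    private final long maxCount;
    private long count = 0;

    CountLikeTrigger(long maxCount) {
        this.maxCount = maxCount;
    }

    @Override
    public TriggerResult onElement(long windowMaxTimestamp, long currentWatermark) {
        count++;
        if (count >= maxCount) {
            count = 0; // reset after firing
            return TriggerResult.FIRE;
        }
        return TriggerResult.CONTINUE;
    }
}

class LateElementSketch {
    // Empty result: the window is past its allowed lateness and the element is dropped.
    // Otherwise: whatever the trigger decides for this element.
    static Optional<TriggerResult> onLateElement(SimpleTrigger trigger, long windowMaxTimestamp,
                                                 long allowedLateness, long currentWatermark) {
        if (windowMaxTimestamp + allowedLateness <= currentWatermark) {
            return Optional.empty();
        }
        return Optional.of(trigger.onElement(windowMaxTimestamp, currentWatermark));
    }

    public static void main(String[] args) {
        long windowMax = 999L;   // window [0, 1000)
        long lateness = 500L;    // allowed lateness
        long watermark = 1_200L; // past the window end, but within the allowed lateness

        System.out.println(onLateElement(new EventTimeLikeTrigger(), windowMax, lateness, watermark)); // Optional[FIRE]
        System.out.println(onLateElement(new CountLikeTrigger(3), windowMax, lateness, watermark));    // Optional[CONTINUE]
        System.out.println(onLateElement(new EventTimeLikeTrigger(), windowMax, lateness, 1_600L));    // Optional.empty
    }
}
```

With the event-time-like trigger, the late element fires the window again right away. With the count-like trigger, it is only added to the window until the count is reached.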
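The eviction rules that the diff describes for `CountEvictor` and `TimeEvictor` can be sketched in plain Java, with a `List` standing in for the window buffer. This is not Flink's `Evictor` interface. `Stamped` (a stand-in for `TimestampedValue`) and `EvictorSketch` are hypothetical names. The time rule follows the diff's wording (strictly smaller than `max_ts - interval`), so the exact boundary handling of Flink's own `TimeEvictor` should be checked against its source. Because Flink gives no ordering guarantee within a window, "the beginning of the buffer" here simply means list order.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for Flink's TimestampedValue: a value plus its event timestamp.
final class Stamped<T> {
    final T value;
    final long timestamp;

    Stamped(T value, long timestamp) {
        this.value = value;
        this.timestamp = timestamp;
    }
}

class EvictorSketch {
    // Count rule: keep at most maxCount elements and discard the surplus from the
    // beginning of the buffer (buffer order, which is not necessarily arrival order).
    static <T> void evictByCount(List<Stamped<T>> buffer, int maxCount) {
        int surplus = buffer.size() - maxCount;
        Iterator<Stamped<T>> it = buffer.iterator();
        while (surplus > 0 && it.hasNext()) {
            it.next();
            it.remove();
            surplus--;
        }
    }

    // Time rule as worded in the diff: find max_ts among the elements and remove every
    // element whose timestamp is smaller than max_ts - interval.
    static <T> void evictByTime(List<Stamped<T>> buffer, long interval) {
        if (buffer.isEmpty()) {
            return;
        }
        long maxTs = Long.MIN_VALUE;
        for (Stamped<T> s : buffer) {
            maxTs = Math.max(maxTs, s.timestamp);
        }
        final long cutoff = maxTs - interval;
        buffer.removeIf(s -> s.timestamp < cutoff);
    }

    public static void main(String[] args) {
        List<Stamped<String>> buffer = new ArrayList<>();
        buffer.add(new Stamped<>("a", 1_000L));
        buffer.add(new Stamped<>("b", 1_500L));
        buffer.add(new Stamped<>("c", 2_000L));
        buffer.add(new Stamped<>("d", 2_500L));

        evictByTime(buffer, 1_000L); // max_ts = 2500, cutoff = 1500: removes "a"
        evictByCount(buffer, 2);     // 3 left, keep 2: removes "b" from the beginning

        for (Stamped<String> s : buffer) {
            System.out.println(s.value + "@" + s.timestamp); // prints c@2000, then d@2500
        }
    }
}
```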
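The outline's state-size rule says an element is copied once per window it is assigned to: one copy for tumbling windows and `n/m` copies for `sliding(n,m)`. A short sketch of sliding-window assignment can check this. It assumes windows of length `size` that start at multiples of `slide` (zero offset). `SlidingAssignSketch` is a hypothetical helper, not a Flink class.

```java
import java.util.ArrayList;
import java.util.List;

class SlidingAssignSketch {
    // Returns the [start, end) windows of length `size`, starting at multiples of `slide`,
    // that contain `timestamp`. Each returned window holds its own copy of the element.
    static List<long[]> assign(long timestamp, long size, long slide) {
        List<long[]> windows = new ArrayList<>();
        // Start of the latest window that contains the timestamp.
        long lastStart = timestamp - Math.floorMod(timestamp, slide);
        // Walk backwards while the window [start, start + size) still covers the timestamp.
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            windows.add(new long[] {start, start + size});
        }
        return windows;
    }

    public static void main(String[] args) {
        long ts = 12_345L;
        // Sliding window, size 10s and slide 2s: 10 / 2 = 5 copies, the first being [12000, 22000).
        for (long[] w : assign(ts, 10_000L, 2_000L)) {
            System.out.println("[" + w[0] + ", " + w[1] + ")");
        }
        // Tumbling window (size == slide): exactly one copy.
        System.out.println(assign(ts, 10_000L, 10_000L).size()); // 1
    }
}
```

When the slide does not evenly divide the size, the number of copies is either the floor or the ceiling of `n/m`, depending on the timestamp. For size 10 and slide 3, a timestamp of 0 lands in 4 windows and a timestamp of 2 lands in 3. So `n/m` is exact only for slides that evenly divide the size.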