[ 
https://issues.apache.org/jira/browse/FLINK-5529?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15838209#comment-15838209
 ] 

ASF GitHub Bot commented on FLINK-5529:
---------------------------------------

Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3191#discussion_r97832573
  
    --- Diff: docs/dev/windows.md ---
    @@ -622,133 +690,138 @@ input
     </div>
     </div>
     
    -## Dealing with Late Data
    +## Triggers
     
    -When working with event-time windowing it can happen that elements arrive 
late, i.e the
    -watermark that Flink uses to keep track of the progress of event-time is 
already past the
    -end timestamp of a window to which an element belongs. Please
    -see [event time](./event_time.html) and especially
    -[late elements](./event_time.html#late-elements) for a more thorough 
discussion of
    -how Flink deals with event time.
    +A `Trigger` determines when a window (as formed by the `WindowAssigner`) 
is ready to be
    +processed by the *window function*. Each `WindowAssigner` comes with a 
default `Trigger`. 
    +If the default trigger does not fit your needs, you can specify a custom 
trigger using `trigger(...)`.
     
    -You can specify how a windowed transformation should deal with late 
elements and how much lateness
    -is allowed. The parameter for this is called *allowed lateness*. This 
specifies by how much time
    -elements can be late. Elements that arrive within the allowed lateness are 
still put into windows
    -and are considered when computing window results. If elements arrive after 
the allowed lateness they
    -will be dropped. Flink will also make sure that any state held by the 
windowing operation is garbage
    -collected once the watermark passes the end of a window plus the allowed 
lateness.
    +The trigger interface provides five methods that react to different 
events: 
     
    -<span class="label label-info">Default</span> By default, the allowed 
lateness is set to
    -`0`. That is, elements that arrive behind the watermark will be dropped.
    +* The `onElement()` method is called for each element that is added to a 
window. 
    +* The `onEventTime()` method is called when  a registered event-time timer 
fires. 
    +* The `onProcessingTime()` method is called when a registered 
processing-time timer fires. 
    +* The `onMerge()` method is relevant for stateful triggers and merges the 
states of two triggers when their corresponding windows merge, *e.g.* when 
using session windows. 
    +* Finally the `clear()` method performs any action needed upon removal of 
the corresponding window. 
     
    -You can specify an allowed lateness like this:
    +Any of these methods can be used to register processing- or event-time 
timers for future actions. 
     
    -<div class="codetabs" markdown="1">
    -<div data-lang="java" markdown="1">
    -{% highlight java %}
    -DataStream<T> input = ...;
    +### Fire and Purge
     
    -input
    -    .keyBy(<key selector>)
    -    .window(<window assigner>)
    -    .allowedLateness(<time>)
    -    .<windowed transformation>(<window function>);
    -{% endhighlight %}
    -</div>
    +Once a trigger determines that a window is ready for processing, it fires. 
This is the signal for the window operator to emit the result of the current 
window. Given a window with a `WindowFunction` 
    +all elements are passed to the `WindowFunction` (possibly after passing 
them to an evictor). 
    +Windows with `ReduceFunction` of `FoldFunction` simply emit their eagerly 
aggregated result. 
     
    -<div data-lang="scala" markdown="1">
    -{% highlight scala %}
    -val input: DataStream[T] = ...
    +When a trigger fires, it can either `FIRE` or `FIRE_AND_PURGE`. While 
`FIRE` keeps the contents of the window, `FIRE_AND_PURGE` removes its content.
    +By default, the pre-implemented triggers simply `FIRE` without purging the 
window state.
     
    -input
    -    .keyBy(<key selector>)
    -    .window(<window assigner>)
    -    .allowedLateness(<time>)
    -    .<windowed transformation>(<window function>)
    -{% endhighlight %}
    -</div>
    -</div>
    +<span class="label label-danger">Attention</span> When purging, only the 
contents of the window are cleared. The window itself is not removed and 
accepts new elements.
     
    -<span class="label label-info">Note</span> When using the `GlobalWindows` 
window assigner no
    -data is ever considered late because the end timestamp of the global 
window is `Long.MAX_VALUE`.
    +### Default Triggers of WindowAssigners
     
    -## Triggers
    +The default `Trigger` of a `WindowAssigner` is appropriate for many use 
cases. For example, all the event-time window assigners have an 
`EventTimeTrigger` as
    +default trigger. This trigger simply fires once the watermark passes the 
end of a window. 
     
    -A `Trigger` determines when a window (as assigned by the `WindowAssigner`) 
is ready for being
    -processed by the *window function*. The trigger observes how elements are 
added to windows
    -and can also keep track of the progress of processing time and event time. 
Once a trigger
    -determines that a window is ready for processing, it fires. This is the 
signal for the
    -window operation to take the elements that are currently in the window and 
pass them along to
    -the window function to produce output for the firing window.
    +<span class="label label-danger">Attention</span> The default trigger of 
the `GlobalWindow` is the `NeverTrigger` which does never fire. Consequently, 
you always have to define a custom trigger when using a `GlobalWindow`.
     
    -Each `WindowAssigner` (except `GlobalWindows`) comes with a default 
trigger that should be
    -appropriate for most use cases. For example, `TumblingEventTimeWindows` 
has an `EventTimeTrigger` as
    -default trigger. This trigger simply fires once the watermark passes the 
end of a window.
    +<span class="label label-danger">Attention</span> By specifying a trigger 
using `trigger()` you
    +are overwriting the default trigger of a `WindowAssigner`. For example, if 
you specify a
    +`CountTrigger` for `TumblingEventTimeWindows` you will no longer get 
window firings based on the
    +progress of time but only by count. Right now, you have to write your own 
custom trigger if
    +you want to react based on both time and count.
     
    -You can specify the trigger to be used by calling `trigger()` with a given 
`Trigger`. The
    -whole specification of the windowed transformation would then look like 
this:
    +### Built-in and Custom Triggers
     
    -<div class="codetabs" markdown="1">
    -<div data-lang="java" markdown="1">
    -{% highlight java %}
    -DataStream<T> input = ...;
    +Flink comes with a few built-in triggers. 
     
    -input
    -    .keyBy(<key selector>)
    -    .window(<window assigner>)
    -    .trigger(<trigger>)
    -    .<windowed transformation>(<window function>);
    -{% endhighlight %}
    -</div>
    +* The (already mentioned) `EventTimeTrigger` fires based on the progress 
of event-time as measured by watermarks. 
    +* The `ProcessingTimeTrigger` fires based on processing time. 
    +* The `CountTrigger` which fires once the number of elements in a window 
exceeds the given limit.
    +* The `PurgingTrigger` takes as argument another trigger and transforms it 
into a purging one. 
     
    -<div data-lang="scala" markdown="1">
    -{% highlight scala %}
    -val input: DataStream[T] = ...
    +If you need to implement a custom trigger, you should check out the 
abstract {% gh_link 
/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
 "Trigger" %} class. Please note that the API is still evolving and might 
change in future versions of Flink.
     
    -input
    -    .keyBy(<key selector>)
    -    .window(<window assigner>)
    -    .trigger(<trigger>)
    -    .<windowed transformation>(<window function>)
    -{% endhighlight %}
    -</div>
    -</div>
     
    -Flink comes with a few triggers out-of-box: there is the already mentioned 
`EventTimeTrigger` that
    -fires based on the progress of event-time as measured by the watermark, 
the `ProcessingTimeTrigger`
    -does the same but based on processing time and the `CountTrigger` fires 
once the number of elements
    -in a window exceeds the given limit.
    +## Evictors
     
    -<span class="label label-danger">Attention</span> By specifying a trigger 
using `trigger()` you
    -are overwriting the default trigger of a `WindowAssigner`. For example, if 
you specify a
    -`CountTrigger` for `TumblingEventTimeWindows` you will no longer get 
window firings based on the
    -progress of time but only by count. Right now, you have to write your own 
custom trigger if
    -you want to react based on both time and count.
    +Flink's windowing model allows specifying an optional `Evictor` in 
addition to the `WindowAssigner` and the `Trigger`. 
    +This can be done using the `evictor(...)` method (shown in the beginning 
of this document). The evictor has the ability 
    +to remove elements from a window *after* the trigger fires and *before 
and/or after* the window function is applied.
    +To do so, the `Evictor` interface has two methods: 
    +
    +    /**
    +    * Optionally evicts elements. Called before windowing function.
    +    *
    +    * @param elements The elements currently in the pane.
    +    * @param size The current number of elements in the pane.
    +    * @param window The {@link Window}
    +    * @param evictorContext The context for the Evictor
    +     */
    +   void evictBefore(Iterable<TimestampedValue<T>> elements, int size, W 
window, EvictorContext evictorContext);
    +
    +   /**
    +    * Optionally evicts elements. Called after windowing function.
    +    *
    +    * @param elements The elements currently in the pane.
    +    * @param size The current number of elements in the pane.
    +    * @param window The {@link Window}
    +    * @param evictorContext The context for the Evictor
    +    */
    +   void evictAfter(Iterable<TimestampedValue<T>> elements, int size, W 
window, EvictorContext evictorContext);
    +
    +The `evictBefore()` contains the eviction logic to be applied before the 
window function, while the `evictAfter()` 
    +contains the one to be applied after the window function. Elements evicted 
before the application of the window 
    +function will not be processed by it.
    +
    +Flink comes with three pre-implemented evictors. These are:
    +
    +* `CountEvictor`: keeps up to a user-specified number of elements from the 
window and discards the remaining ones from 
    +the beginning of the window buffer.
    +* `DeltaEvictor`: takes a `DeltaFunction` and a `threshold`, computes the 
delta between the last element in the 
    +window buffer and each of the remaining ones, and removes the ones with a 
delta greater or equal to the threshold.
    +* `TimeEvictor`: takes as argument an `interval` in milliseconds and for a 
given window, it finds the maximum 
    +timestamp `max_ts` among its elements and removes all the elements with 
timestamps smaller than `max_ts - interval`.
     
    -The internal `Trigger` API is still considered experimental but you can 
check out the code
    -if you want to write your own custom trigger:
    -{% gh_link 
/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
 "Trigger.java" %}.
    +<span class="label label-info">Default</span> By default, all the 
pre-implemented evictors apply their logic before the 
    +window function.
     
    -## Non-keyed Windowing
    +<span class="label label-danger">Attention</span> Specifying an evictor 
prevents any pre-aggregation, as all the 
    +elements of a window have to be passed to the evictor before applying the 
computation.
     
    -You can also leave out the `keyBy()` when specifying a windowed 
transformation. This means, however,
    -that Flink cannot process windows for different keys in parallel, 
essentially turning the
    -transformation into a non-parallel operation.
    +<span class="label label-danger">Attention</span> Flink provides no 
guarantees about the order of the elements within
    +a window. This implies that although an evictor may remove elements from 
the beginning of the window, these are not 
    +necessarily the ones that arrive first or last.
     
    -<span class="label label-danger">Warning</span> As mentioned in the 
introduction, non-keyed
    -windows have the disadvantage that work cannot be distributed in the 
cluster because
    -windows cannot be computed independently per key. This can have severe 
performance implications.
     
    +## Allowed Lateness
     
    -The basic structure of a non-keyed windowed transformation is as follows:
    +When working with *event-time* windowing it can happen that elements 
arrive late, *i.e.* the watermark that Flink uses to 
    +keep track of the progress of event-time is already past the end timestamp 
of a window to which an element belongs. See 
    +[event time](./event_time.html) and especially [late 
elements](./event_time.html#late-elements) for a more thorough 
    +discussion of how Flink deals with event time.
    +
    +By default, late elements are dropped if their associated window was 
already evaluated. However, 
    +Flink allows to specify a maximum *allowed lateness* for window operators. 
Allowed lateness 
    +specifies by how much time elements can be late before they are dropped. 
Elements that arrive 
    +within the allowed lateness of a window are still added to the window and 
trigger an immediate evaluation of the window which might emit elements.
    --- End diff --
    
    Whether late elements "trigger an immediate evaluation" depends on the `Trigger`.
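
To make the dependence on the `Trigger` concrete, here is a minimal, self-contained sketch. It is a toy model and not Flink's API: `LateFiringDemo`, `Result`, `ToyTrigger`, `EventTimeLike`, `CountLike` and `feedLate` are hypothetical names made up for illustration. `EventTimeLike` loosely mirrors an event-time trigger that fires as soon as an element arrives for a window whose end the watermark has already passed. `CountLike` only looks at the element count, so a late element does not by itself cause a firing.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: none of these types are part of Flink.
final class LateFiringDemo {

    /** What a trigger decides after seeing one element. */
    enum Result { CONTINUE, FIRE }

    /** Hypothetical, stripped-down per-element trigger callback. */
    interface ToyTrigger {
        Result onElement(long windowMaxTimestamp, long currentWatermark);
    }

    /** Fires immediately if the watermark is already past the window end. */
    static final class EventTimeLike implements ToyTrigger {
        @Override
        public Result onElement(long windowMaxTimestamp, long currentWatermark) {
            return windowMaxTimestamp <= currentWatermark ? Result.FIRE : Result.CONTINUE;
        }
    }

    /** Fires on every maxCount-th element, regardless of lateness. */
    static final class CountLike implements ToyTrigger {
        private final int maxCount;
        private int seen;

        CountLike(int maxCount) {
            this.maxCount = maxCount;
        }

        @Override
        public Result onElement(long windowMaxTimestamp, long currentWatermark) {
            seen++;
            if (seen >= maxCount) {
                seen = 0; // start counting again after a firing
                return Result.FIRE;
            }
            return Result.CONTINUE;
        }
    }

    /** Feeds `elements` late elements (watermark past window end) to the trigger. */
    static List<Result> feedLate(ToyTrigger trigger, int elements) {
        final long windowMaxTimestamp = 999L;  // assumed window [0, 1000)
        final long currentWatermark = 1_500L;  // watermark already past the window end
        List<Result> results = new ArrayList<>();
        for (int i = 0; i < elements; i++) {
            results.add(trigger.onElement(windowMaxTimestamp, currentWatermark));
        }
        return results;
    }

    public static void main(String[] args) {
        System.out.println("event-time-like: " + feedLate(new EventTimeLike(), 3));
        System.out.println("count-like:      " + feedLate(new CountLike(3), 3));
    }
}
```

Under these assumptions, three late elements give `[FIRE, FIRE, FIRE]` for the event-time-like trigger but `[CONTINUE, CONTINUE, FIRE]` for the count-like one, so the sentence in the docs probably needs a qualifier along the lines of "depending on the trigger".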


> Improve / extends windowing documentation
> -----------------------------------------
>
>                 Key: FLINK-5529
>                 URL: https://issues.apache.org/jira/browse/FLINK-5529
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Documentation
>            Reporter: Stephan Ewen
>            Assignee: Kostas Kloudas
>             Fix For: 1.2.0, 1.3.0
>
>
> Suggested Outline:
> {code}
> Windows
> (0) Outline: The anatomy of a window operation
>   stream
>      [.keyBy(...)]         <-  keyed versus non-keyed windows
>       .window(...)         <-  required: "assigner"
>      [.trigger(...)]       <-  optional: "trigger" (else default trigger)
>      [.evictor(...)]       <-  optional: "evictor" (else no evictor)
>      [.allowedLateness()]  <-  optional, else zero
>       .reduce/fold/apply() <-  required: "function"
> (1) Types of windows
>   - tumble
>   - slide
>   - session
>   - global
> (2) Pre-defined windows
>    timeWindow() (tumble, slide)
>    countWindow() (tumble, slide)
>      - mention that count windows are inherently
>        resource leaky unless limited key space
> (3) Window Functions
>   - apply: most basic, iterates over elements in window
>   
>   - aggregating: reduce and fold, can be used with "apply()" which will get 
> one element
>   
>   - forward reference to state size section
> (4) Advanced Windows
>   - assigner
>     - simple
>     - merging
>   - trigger
>     - registering timers (processing time, event time)
>     - state in triggers
>   - life cycle of a window
>     - create
>     - state
>     - cleanup
>       - when is window contents purged
>       - when is state dropped
>       - when is metadata (like merging set) dropped
> (5) Late data
>   - picture
>   - fire vs fire_and_purge: late accumulates vs late resurrects (cf 
> discarding mode)
>   
> (6) Evictors
>   - TBD
>   
> (7) State size: How large will the state be?
> Basic rule: Each element has one copy per window it is assigned to
>   --> num windows * num elements in window
>   --> example: tumbling is one copy, sliding(n,m) is n/m copies
>   --> per key
> Pre-aggregation:
>   - if reduce or fold is set -> one element per window (rather than num 
> elements in window)
>   - evictor voids pre-aggregation from the perspective of state
> Special rules:
>   - fold cannot pre-aggregate on session windows (and other merging windows)
> (8) Non-keyed windows
>   - all elements through the same windows
>   - currently not parallel
>   - possibly parallel in the future when having pre-aggregation functions
>   - inherently (by definition) produce a result stream with parallelism one
>   - state similar to one key of keyed windows
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
