[
https://issues.apache.org/jira/browse/FLINK-3899?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15433883#comment-15433883
]
ASF GitHub Bot commented on FLINK-3899:
---------------------------------------
Github user danielblazevski commented on a diff in the pull request:
https://github.com/apache/flink/pull/2368#discussion_r75973261
--- Diff: docs/apis/streaming/windows.md ---
@@ -459,42 +459,106 @@ ready for processing. This allows to get the benefit
of incremental window compu
the additional meta information that writing a `WindowFunction` provides.
This is an example that shows how incremental aggregation functions can be
combined with
-a `WindowFunction`.
+a `WindowFunction`. The `FoldFunction`/`WindowFunction` example shows how
to extract the
+ending event-time of a window of sensor readings that contain a timestamp,
+and the `ReduceFunction`/`WindowFunctions` example shows how to do eager
window
+aggregation (only a single element is kept in the window).
<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
-DataStream<Tuple2<String, Long>> input = ...;
+DataStream<SensorReading> input = ...;
// for folding incremental computation
input
.keyBy(<key selector>)
.window(<window assigner>)
- .apply(<initial value>, new MyFoldFunction(), new MyWindowFunction());
+ .apply(Long.MIN_VALUE, new MyFoldFunction(), new MyWindowFunction());
+
+/* ... */
+
+private static class myFoldFunction implements
FoldFunction<SensorReading, Long> {
+
+ public Long fold(Long acc, SensorReading s) {
+ return Math.max(acc, s.timestamp());
+ }
+}
+
+private static class MyWindowFunction implements WindowFunction<Long,
Long, String, TimeWindow> {
+
+ public void apply(String key, TimeWindow window, Iterable<Long>
timestamps, Collector<Long> out) {
+ out.collect(timestamps.iterator().next());
--- End diff --
@fhueske does this look OK for this case? If so, I'll finish things up by
adding the Reduce example and add both corresponding Scala examples
```java
// for folding incremental computation
input
.keyBy(<key selector>)
.window(<window assigner>)
.apply(new Tuple3<String, Long, Integer>("",0L, 0), new
MyFoldFunction(), new MyWindowFunction())
/* ... */
private static class MyFoldFunction implements FoldFunction<SensorReading,
Tuple3<String, Long, Integer> > {
public Tuple3<String, Long, Integer> fold(Tuple3<String, Long, Integer>
acc, SensorReading s) {
Integer cur = acc.getField(2);
return new Tuple3<String, Long, Integer> (acc.getField(0),
acc.getField(1), cur + 1);
}
}
private static class MyWindowFunction implements
WindowFunction<Tuple3<String, Long, Integer>,
Tuple3<String, Long, Integer>, String, TimeWindow> {
public void apply(String s,
TimeWindow window,
Iterable<Tuple3<String, Long, Integer>> counts,
Collector<Tuple3<String, Long, Integer>> out) {
out.collect(new Tuple3<String, Long, Integer>(s, window.getEnd(),
counts.iterator().next().getField(2));
}
}
```
I found that I had to have the `FoldFunction` include `Tuple3` in its
signature since the `WindowFunction` must be of the form `WindowFunction<ACC,
ACC, K, W>` according to
[here](https://github.com/apache/flink/blob/b8299bf92d8e3dbe140dd89602699394019b783d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyWindowFunction.java)
> Document window processing with Reduce/FoldFunction + WindowFunction
> --------------------------------------------------------------------
>
> Key: FLINK-3899
> URL: https://issues.apache.org/jira/browse/FLINK-3899
> Project: Flink
> Issue Type: Improvement
> Components: Documentation, Streaming
> Affects Versions: 1.1.0
> Reporter: Fabian Hueske
>
> The streaming documentation does not describe how windows can be processed
> with FoldFunction or ReduceFunction and a subsequent WindowFunction. This
> combination allows for eager window aggregation (only a single element is
> kept in the window) and access of the Window object, e.g., to have access to
> the window's start and end time.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)