[
https://issues.apache.org/jira/browse/FLINK-3899?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15435452#comment-15435452
]
ASF GitHub Bot commented on FLINK-3899:
---------------------------------------
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/2368#discussion_r76115561
--- Diff: docs/apis/streaming/windows.md ---
@@ -459,42 +459,106 @@ ready for processing. This allows to get the benefit
of incremental window compu
the additional meta information that writing a `WindowFunction` provides.
This is an example that shows how incremental aggregation functions can be combined with
-a `WindowFunction`.
+a `WindowFunction`. The `FoldFunction`/`WindowFunction` example shows how to extract the
+ending event-time of a window of sensor readings that contain a timestamp,
+and the `ReduceFunction`/`WindowFunctions` example shows how to do eager window
+aggregation (only a single element is kept in the window).
<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
-DataStream<Tuple2<String, Long>> input = ...;
+DataStream<SensorReading> input = ...;
// for folding incremental computation
input
.keyBy(<key selector>)
.window(<window assigner>)
- .apply(<initial value>, new MyFoldFunction(), new MyWindowFunction());
+ .apply(Long.MIN_VALUE, new MyFoldFunction(), new MyWindowFunction());
+
+/* ... */
+
+private static class myFoldFunction implements FoldFunction<SensorReading, Long> {
+
+ public Long fold(Long acc, SensorReading s) {
+ return Math.max(acc, s.timestamp());
+ }
+}
+
+private static class MyWindowFunction implements WindowFunction<Long, Long, String, TimeWindow> {
+
+ public void apply(String key, TimeWindow window, Iterable<Long> timestamps, Collector<Long> out) {
+ out.collect(timestamps.iterator().next());
--- End diff --
Thanks for the update.
The following Scala code does not show an error in my IntelliJ:
```
val readings: DataStream[SensorReading] = ???
val result: DataStream[(String, Long, Int)] = readings
  .keyBy(_.sensorId)
  .timeWindow(Time.minutes(1), Time.seconds(10))
  .apply(
    ("", 0L, 0),
    (acc: (String, Long, Int), r: SensorReading) => {
      ("", 0L, acc._3 + 1)
    },
    (k: String, w: TimeWindow, cnts: Iterable[(String, Long, Int)], out: Collector[(String, Long, Int)]) => {
      val cnt = cnts.iterator.next()
      out.collect((k, w.getEnd, cnt._3))
    }
  )
```
Thanks, Fabian
> Document window processing with Reduce/FoldFunction + WindowFunction
> --------------------------------------------------------------------
>
> Key: FLINK-3899
> URL: https://issues.apache.org/jira/browse/FLINK-3899
> Project: Flink
> Issue Type: Improvement
> Components: Documentation, Streaming
> Affects Versions: 1.1.0
> Reporter: Fabian Hueske
>
> The streaming documentation does not describe how windows can be processed
> with FoldFunction or ReduceFunction and a subsequent WindowFunction. This
> combination allows for eager window aggregation (only a single element is
> kept in the window) and access of the Window object, e.g., to have access to
> the window's start and end time.
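
The idea in the issue description can be sketched without any Flink dependency. The following is a minimal plain-Java illustration, not Flink code: `EagerWindowSketch`, `SensorReading`, `Window`, and `FoldingWindow` are hypothetical stand-ins invented for this sketch. It assumes a window that stores a single accumulator (here the maximum timestamp, starting from `Long.MIN_VALUE` as in the diff) and hands that one value, together with the key and the window bounds, to a final step that plays the role of the `WindowFunction`.

```java
// Plain-Java sketch; none of these types are Flink classes.
class EagerWindowSketch {

    // Hypothetical sensor reading carrying an event timestamp (milliseconds).
    static final class SensorReading {
        final String sensorId;
        final long timestamp;

        SensorReading(String sensorId, long timestamp) {
            this.sensorId = sensorId;
            this.timestamp = timestamp;
        }
    }

    // Hypothetical window metadata: the interval [start, end).
    static final class Window {
        final long start;
        final long end;

        Window(long start, long end) {
            this.start = start;
            this.end = end;
        }
    }

    // Keeps exactly one accumulator per window instead of buffering elements.
    static final class FoldingWindow {
        private final String key;
        private final Window window;
        private long acc = Long.MIN_VALUE; // initial value of the fold

        FoldingWindow(String key, Window window) {
            this.key = key;
            this.window = window;
        }

        // Fold step: applied eagerly as each element arrives.
        void add(SensorReading reading) {
            acc = Math.max(acc, reading.timestamp);
        }

        // Window-function step: sees the key, the window bounds,
        // and the single pre-aggregated value.
        String fire() {
            return key + " window=[" + window.start + "," + window.end
                    + ") maxTimestamp=" + acc;
        }
    }

    public static void main(String[] args) {
        FoldingWindow w = new FoldingWindow("sensor-1", new Window(0L, 60_000L));
        w.add(new SensorReading("sensor-1", 1_000L));
        w.add(new SensorReading("sensor-1", 42_000L));
        w.add(new SensorReading("sensor-1", 17_000L));
        System.out.println(w.fire());
    }
}
```

In this sketch the window state stays constant in size no matter how many readings arrive, while the final step can still report the window's start and end, which is the combination the issue asks the documentation to describe.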