[
https://issues.apache.org/jira/browse/FLINK-3899?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15434409#comment-15434409
]
ASF GitHub Bot commented on FLINK-3899:
---------------------------------------
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/2368#discussion_r76008397
--- Diff: docs/apis/streaming/windows.md ---
@@ -459,42 +459,106 @@ ready for processing. This allows to get the benefit
of incremental window compu
the additional meta information that writing a `WindowFunction` provides.
This is an example that shows how incremental aggregation functions can be
combined with
-a `WindowFunction`.
+a `WindowFunction`. The `FoldFunction`/`WindowFunction` example shows how
to extract the
+ending event-time of a window of sensor readings that contain a timestamp,
+and the `ReduceFunction`/`WindowFunction` example shows how to do eager
window
+aggregation (only a single element is kept in the window).
<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
-DataStream<Tuple2<String, Long>> input = ...;
+DataStream<SensorReading> input = ...;
// for folding incremental computation
input
.keyBy(<key selector>)
.window(<window assigner>)
- .apply(<initial value>, new MyFoldFunction(), new MyWindowFunction());
+ .apply(Long.MIN_VALUE, new MyFoldFunction(), new MyWindowFunction());
+
+/* ... */
+
+private static class MyFoldFunction implements
FoldFunction<SensorReading, Long> {
+
+ public Long fold(Long acc, SensorReading s) {
+ return Math.max(acc, s.timestamp());
+ }
+}
+
+private static class MyWindowFunction implements WindowFunction<Long,
Long, String, TimeWindow> {
+
+ public void apply(String key, TimeWindow window, Iterable<Long>
timestamps, Collector<Long> out) {
+ out.collect(timestamps.iterator().next());
--- End diff --
The example looks good, thanks! Two minor suggestions: 1) I think we can
omit setting key and time in the `FoldFunction`, 2) the `WindowFunction` could
fetch the count in a separate variable. This would make the `out.collect` line
a bit shorter.
Regarding the type restriction: you discovered a bug that we would like to
fix, but we can't until Flink 2.0.0 because the interface was promoted to
`@Public` and the API must stay stable across the Flink 1.x releases. IMO it
makes sense to point out this accidental restriction in the documentation.
> Document window processing with Reduce/FoldFunction + WindowFunction
> --------------------------------------------------------------------
>
> Key: FLINK-3899
> URL: https://issues.apache.org/jira/browse/FLINK-3899
> Project: Flink
> Issue Type: Improvement
> Components: Documentation, Streaming
> Affects Versions: 1.1.0
> Reporter: Fabian Hueske
>
> The streaming documentation does not describe how windows can be processed
> with FoldFunction or ReduceFunction and a subsequent WindowFunction. This
> combination allows for eager window aggregation (only a single element is
> kept in the window) and access of the Window object, e.g., to have access to
> the window's start and end time.
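
To make the idea in the issue description concrete, here is a minimal plain-Java sketch of eager window aggregation. It deliberately does not use the Flink API: `EagerWindowSketch`, `SensorReading`, `Window`, `WindowState`, and `process` are hypothetical names made up for illustration, and the real `FoldFunction`/`WindowFunction` signatures are the ones shown in the diff above. The sketch only shows the pattern: each element is folded into a single accumulator as it arrives, and the window metadata (start and end) is consulted once, when the window fires.

```java
import java.util.Arrays;
import java.util.List;

public class EagerWindowSketch {

    // Hypothetical stand-in for the SensorReading type used in the diff.
    static final class SensorReading {
        final String sensorId;
        final long timestamp;

        SensorReading(String sensorId, long timestamp) {
            this.sensorId = sensorId;
            this.timestamp = timestamp;
        }
    }

    // Hypothetical stand-in for a time window covering [start, end).
    static final class Window {
        final long start;
        final long end;

        Window(long start, long end) {
            this.start = start;
            this.end = end;
        }
    }

    // Per-window state: a single accumulator, never a buffer of elements.
    static final class WindowState {
        private long maxTimestamp = Long.MIN_VALUE; // initial fold value

        // Fold step, applied as each element arrives.
        void add(SensorReading reading) {
            maxTimestamp = Math.max(maxTimestamp, reading.timestamp);
        }

        // Window-function step, applied once when the window fires.
        // It sees only the pre-aggregated value plus the window metadata.
        String fire(String key, Window window) {
            return key + " window=[" + window.start + "," + window.end
                    + ") maxTs=" + maxTimestamp;
        }
    }

    // Runs the fold step over all readings, then fires the window once.
    static String process(String key, Window window, List<SensorReading> readings) {
        WindowState state = new WindowState();
        for (SensorReading reading : readings) {
            state.add(reading);
        }
        return state.fire(key, window);
    }

    public static void main(String[] args) {
        List<SensorReading> readings = Arrays.asList(
                new SensorReading("s1", 3L),
                new SensorReading("s1", 7L),
                new SensorReading("s1", 5L));
        // prints "s1 window=[0,10) maxTs=7"
        System.out.println(process("s1", new Window(0L, 10L), readings));
    }
}
```

The point of the split is that `add` never needs the window object and `fire` never needs the individual elements, which is why only a single value has to be kept per window.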