This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

The following commit(s) were added to refs/heads/master by this push:
     new 96fd4c5  [FLINK-11817][docs] Replace fold example in DataStream API Tutorial
96fd4c5 is described below

commit 96fd4c5b6c4c781eb5c91c7d9d66029b9eb64be8
Author: leesf <490081...@qq.com>
AuthorDate: Wed Mar 20 18:40:34 2019 +0800

    [FLINK-11817][docs] Replace fold example in DataStream API Tutorial
---
 docs/tutorials/datastream_api.md | 62 +++++++++++++++++++++++++++++-----------
 1 file changed, 46 insertions(+), 16 deletions(-)

diff --git a/docs/tutorials/datastream_api.md b/docs/tutorials/datastream_api.md
index b0964ee..7f69abe 100644
--- a/docs/tutorials/datastream_api.md
+++ b/docs/tutorials/datastream_api.md
@@ -178,18 +178,33 @@ that we want to aggregate the sum of edited bytes for every five seconds:
 {% highlight java %}
 DataStream<Tuple2<String, Long>> result = keyedEdits
     .timeWindow(Time.seconds(5))
-    .fold(new Tuple2<>("", 0L), new FoldFunction<WikipediaEditEvent, Tuple2<String, Long>>() {
+    .aggregate(new AggregateFunction<WikipediaEditEvent, Tuple2<String, Long>, Tuple2<String, Long>>() {
         @Override
-        public Tuple2<String, Long> fold(Tuple2<String, Long> acc, WikipediaEditEvent event) {
-            acc.f0 = event.getUser();
-            acc.f1 += event.getByteDiff();
-            return acc;
+        public Tuple2<String, Long> createAccumulator() {
+            return new Tuple2<>("", 0L);
+        }
+
+        @Override
+        public Tuple2<String, Long> add(WikipediaEditEvent value, Tuple2<String, Long> accumulator) {
+            accumulator.f0 = value.getUser();
+            accumulator.f1 += value.getByteDiff();
+            return accumulator;
+        }
+
+        @Override
+        public Tuple2<String, Long> getResult(Tuple2<String, Long> accumulator) {
+            return accumulator;
+        }
+
+        @Override
+        public Tuple2<String, Long> merge(Tuple2<String, Long> a, Tuple2<String, Long> b) {
+            return new Tuple2<>(a.f0, a.f1 + b.f1);
         }
     });
 {% endhighlight %}
 
 The first call, `.timeWindow()`, specifies that we want to have tumbling (non-overlapping) windows
-of five seconds. The second call specifies a *Fold transformation* on each window slice for
+of five seconds. The second call specifies a *Aggregate transformation* on each window slice for
 each unique key. In our case we start from an initial value of `("", 0L)` and add to it the byte
 difference of every edit in that time window for a user. The resulting Stream now contains
 a `Tuple2<String, Long>` for every user which gets emitted every five seconds.
@@ -212,7 +227,7 @@ The complete code so far is this:
 {% highlight java %}
 package wikiedits;
 
-import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.api.common.functions.AggregateFunction;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -240,13 +255,28 @@ public class WikipediaAnalysis {
 
     DataStream<Tuple2<String, Long>> result = keyedEdits
       .timeWindow(Time.seconds(5))
-      .fold(new Tuple2<>("", 0L), new FoldFunction<WikipediaEditEvent, Tuple2<String, Long>>() {
+      .aggregate(new AggregateFunction<WikipediaEditEvent, Tuple2<String, Long>, Tuple2<String, Long>>() {
         @Override
-        public Tuple2<String, Long> fold(Tuple2<String, Long> acc, WikipediaEditEvent event) {
-          acc.f0 = event.getUser();
-          acc.f1 += event.getByteDiff();
-          return acc;
-        }
+        public Tuple2<String, Long> createAccumulator() {
+          return new Tuple2<>("", 0L);
+        }
+
+        @Override
+        public Tuple2<String, Long> add(WikipediaEditEvent value, Tuple2<String, Long> accumulator) {
+          accumulator.f0 = value.getUser();
+          accumulator.f1 += value.getByteDiff();
+          return accumulator;
+        }
+
+        @Override
+        public Tuple2<String, Long> getResult(Tuple2<String, Long> accumulator) {
+          return accumulator;
+        }
+
+        @Override
+        public Tuple2<String, Long> merge(Tuple2<String, Long> a, Tuple2<String, Long> b) {
+          return new Tuple2<>(a.f0, a.f1 + b.f1);
+        }
       });
 
     result.print();
@@ -368,9 +398,9 @@ The output of that command should look similar to this, if everything went accor
 03/08/2016 15:09:27 Job execution switched to status RUNNING.
 03/08/2016 15:09:27 Source: Custom Source(1/1) switched to SCHEDULED
 03/08/2016 15:09:27 Source: Custom Source(1/1) switched to DEPLOYING
-03/08/2016 15:09:27 TriggerWindow(TumblingProcessingTimeWindows(5000), FoldingStateDescriptor{name=window-contents, defaultValue=(,0), serializer=null}, ProcessingTimeTrigger(), WindowedStream.fold(WindowedStream.java:207)) -> Map -> Sink: Unnamed(1/1) switched to SCHEDULED
-03/08/2016 15:09:27 TriggerWindow(TumblingProcessingTimeWindows(5000), FoldingStateDescriptor{name=window-contents, defaultValue=(,0), serializer=null}, ProcessingTimeTrigger(), WindowedStream.fold(WindowedStream.java:207)) -> Map -> Sink: Unnamed(1/1) switched to DEPLOYING
-03/08/2016 15:09:27 TriggerWindow(TumblingProcessingTimeWindows(5000), FoldingStateDescriptor{name=window-contents, defaultValue=(,0), serializer=null}, ProcessingTimeTrigger(), WindowedStream.fold(WindowedStream.java:207)) -> Map -> Sink: Unnamed(1/1) switched to RUNNING
+03/08/2016 15:09:27 Window(TumblingProcessingTimeWindows(5000), ProcessingTimeTrigger, AggregateFunction$3, PassThroughWindowFunction) -> Sink: Print to Std. Out (1/1) switched from CREATED to SCHEDULED
+03/08/2016 15:09:27 Window(TumblingProcessingTimeWindows(5000), ProcessingTimeTrigger, AggregateFunction$3, PassThroughWindowFunction) -> Sink: Print to Std. Out (1/1) switched from SCHEDULED to DEPLOYING
+03/08/2016 15:09:27 Window(TumblingProcessingTimeWindows(5000), ProcessingTimeTrigger, AggregateFunction$3, PassThroughWindowFunction) -> Sink: Print to Std. Out (1/1) switched from DEPLOYING to RUNNING
 03/08/2016 15:09:27 Source: Custom Source(1/1) switched to RUNNING
 {% endhighlight %}
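
The four methods that the new example overrides (`createAccumulator`, `add`, `getResult`, `merge`) can be tried out without a Flink dependency. What follows is only a minimal sketch under stated assumptions: `SimpleAggregate`, `Edit`, `UserBytes`, and `sumAll` are hypothetical stand-ins invented for illustration, not Flink's `AggregateFunction`, `WikipediaEditEvent`, or `Tuple2`. It mirrors the accumulator logic from the diff and checks that summing all edits in one pass gives the same total as summing two partial accumulators and merging them.

```java
public class AggregateSketch {

    // Hypothetical stand-in with the same four methods the diff overrides.
    // This is NOT org.apache.flink.api.common.functions.AggregateFunction.
    interface SimpleAggregate<IN, ACC, OUT> {
        ACC createAccumulator();
        ACC add(IN value, ACC accumulator);
        OUT getResult(ACC accumulator);
        ACC merge(ACC a, ACC b);
    }

    // Hypothetical edit event: a user name and a byte difference.
    static final class Edit {
        final String user;
        final long byteDiff;

        Edit(String user, long byteDiff) {
            this.user = user;
            this.byteDiff = byteDiff;
        }
    }

    // Hypothetical mutable (user, bytes) pair, standing in for Tuple2<String, Long>.
    static final class UserBytes {
        String user;
        long bytes;

        UserBytes(String user, long bytes) {
            this.user = user;
            this.bytes = bytes;
        }
    }

    // Same logic as the anonymous class in the diff, written against the stand-in types.
    static final SimpleAggregate<Edit, UserBytes, UserBytes> SUM_BYTES =
        new SimpleAggregate<Edit, UserBytes, UserBytes>() {
            @Override
            public UserBytes createAccumulator() {
                return new UserBytes("", 0L);
            }

            @Override
            public UserBytes add(Edit value, UserBytes accumulator) {
                accumulator.user = value.user;
                accumulator.bytes += value.byteDiff;
                return accumulator;
            }

            @Override
            public UserBytes getResult(UserBytes accumulator) {
                return accumulator;
            }

            @Override
            public UserBytes merge(UserBytes a, UserBytes b) {
                // Keeps the user from the first accumulator, as the diff does.
                return new UserBytes(a.user, a.bytes + b.bytes);
            }
        };

    // Feeds every edit into one fresh accumulator and returns the result.
    static UserBytes sumAll(Edit... edits) {
        UserBytes acc = SUM_BYTES.createAccumulator();
        for (Edit e : edits) {
            acc = SUM_BYTES.add(e, acc);
        }
        return SUM_BYTES.getResult(acc);
    }

    public static void main(String[] args) {
        Edit e1 = new Edit("alice", 10L);
        Edit e2 = new Edit("alice", -3L);
        Edit e3 = new Edit("alice", 25L);

        // One pass over all three edits.
        UserBytes onePass = sumAll(e1, e2, e3);

        // Two partial accumulators combined with merge().
        UserBytes merged = SUM_BYTES.merge(sumAll(e1, e2), sumAll(e3));

        System.out.println(onePass.user + " " + onePass.bytes);
        System.out.println(merged.user + " " + merged.bytes);
    }
}
```

Unlike the removed `fold` call, which took the initial value `("", 0L)` as an argument, the accumulator-based version supplies it from `createAccumulator()` and adds a `merge` step for combining partial results.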