This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 96fd4c5  [FLINK-11817][docs] Replace fold example in DataStream API Tutorial
96fd4c5 is described below

commit 96fd4c5b6c4c781eb5c91c7d9d66029b9eb64be8
Author: leesf <490081...@qq.com>
AuthorDate: Wed Mar 20 18:40:34 2019 +0800

    [FLINK-11817][docs] Replace fold example in DataStream API Tutorial
---
 docs/tutorials/datastream_api.md | 62 +++++++++++++++++++++++++++++-----------
 1 file changed, 46 insertions(+), 16 deletions(-)

diff --git a/docs/tutorials/datastream_api.md b/docs/tutorials/datastream_api.md
index b0964ee..7f69abe 100644
--- a/docs/tutorials/datastream_api.md
+++ b/docs/tutorials/datastream_api.md
@@ -178,18 +178,33 @@ that we want to aggregate the sum of edited bytes for every five seconds:
 {% highlight java %}
 DataStream<Tuple2<String, Long>> result = keyedEdits
     .timeWindow(Time.seconds(5))
-    .fold(new Tuple2<>("", 0L), new FoldFunction<WikipediaEditEvent, Tuple2<String, Long>>() {
+    .aggregate(new AggregateFunction<WikipediaEditEvent, Tuple2<String, Long>, Tuple2<String, Long>>() {
         @Override
-        public Tuple2<String, Long> fold(Tuple2<String, Long> acc, WikipediaEditEvent event) {
-            acc.f0 = event.getUser();
-            acc.f1 += event.getByteDiff();
-            return acc;
+        public Tuple2<String, Long> createAccumulator() {
+            return new Tuple2<>("", 0L);
+        }
+
+        @Override
+        public Tuple2<String, Long> add(WikipediaEditEvent value, Tuple2<String, Long> accumulator) {
+            accumulator.f0 = value.getUser();
+            accumulator.f1 += value.getByteDiff();
+            return accumulator;
+        }
+
+        @Override
+        public Tuple2<String, Long> getResult(Tuple2<String, Long> accumulator) {
+            return accumulator;
+        }
+
+        @Override
+        public Tuple2<String, Long> merge(Tuple2<String, Long> a, Tuple2<String, Long> b) {
+            return new Tuple2<>(a.f0, a.f1 + b.f1);
         }
     });
 {% endhighlight %}
 
 The first call, `.timeWindow()`, specifies that we want to have tumbling (non-overlapping) windows
-of five seconds. The second call specifies a *Fold transformation* on each window slice for
+of five seconds. The second call specifies a *Aggregate transformation* on each window slice for
 each unique key. In our case we start from an initial value of `("", 0L)` and add to it the byte
 difference of every edit in that time window for a user. The resulting Stream now contains
 a `Tuple2<String, Long>` for every user which gets emitted every five seconds.
@@ -212,7 +227,7 @@ The complete code so far is this:
 {% highlight java %}
 package wikiedits;
 
-import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.api.common.functions.AggregateFunction;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -240,13 +255,28 @@ public class WikipediaAnalysis {
 
     DataStream<Tuple2<String, Long>> result = keyedEdits
       .timeWindow(Time.seconds(5))
-      .fold(new Tuple2<>("", 0L), new FoldFunction<WikipediaEditEvent, Tuple2<String, Long>>() {
+      .aggregate(new AggregateFunction<WikipediaEditEvent, Tuple2<String, Long>, Tuple2<String, Long>>() {
         @Override
-        public Tuple2<String, Long> fold(Tuple2<String, Long> acc, WikipediaEditEvent event) {
-          acc.f0 = event.getUser();
-          acc.f1 += event.getByteDiff();
-          return acc;
-        }
+       public Tuple2<String, Long> createAccumulator() {
+         return new Tuple2<>("", 0L);
+       }
+
+       @Override
+       public Tuple2<String, Long> add(WikipediaEditEvent value, Tuple2<String, Long> accumulator) {
+         accumulator.f0 = value.getUser();
+         accumulator.f1 += value.getByteDiff();
+          return accumulator;
+       }
+
+       @Override
+       public Tuple2<String, Long> getResult(Tuple2<String, Long> accumulator) {
+         return accumulator;
+       }
+
+       @Override
+       public Tuple2<String, Long> merge(Tuple2<String, Long> a, Tuple2<String, Long> b) {
+         return new Tuple2<>(a.f0, a.f1 + b.f1);
+       }
       });
 
     result.print();
@@ -368,9 +398,9 @@ The output of that command should look similar to this, if everything went accor
 03/08/2016 15:09:27 Job execution switched to status RUNNING.
 03/08/2016 15:09:27 Source: Custom Source(1/1) switched to SCHEDULED
 03/08/2016 15:09:27 Source: Custom Source(1/1) switched to DEPLOYING
-03/08/2016 15:09:27 TriggerWindow(TumblingProcessingTimeWindows(5000), FoldingStateDescriptor{name=window-contents, defaultValue=(,0), serializer=null}, ProcessingTimeTrigger(), WindowedStream.fold(WindowedStream.java:207)) -> Map -> Sink: Unnamed(1/1) switched to SCHEDULED
-03/08/2016 15:09:27 TriggerWindow(TumblingProcessingTimeWindows(5000), FoldingStateDescriptor{name=window-contents, defaultValue=(,0), serializer=null}, ProcessingTimeTrigger(), WindowedStream.fold(WindowedStream.java:207)) -> Map -> Sink: Unnamed(1/1) switched to DEPLOYING
-03/08/2016 15:09:27 TriggerWindow(TumblingProcessingTimeWindows(5000), FoldingStateDescriptor{name=window-contents, defaultValue=(,0), serializer=null}, ProcessingTimeTrigger(), WindowedStream.fold(WindowedStream.java:207)) -> Map -> Sink: Unnamed(1/1) switched to RUNNING
+03/08/2016 15:09:27 Window(TumblingProcessingTimeWindows(5000), ProcessingTimeTrigger, AggregateFunction$3, PassThroughWindowFunction) -> Sink: Print to Std. Out (1/1) switched from CREATED to SCHEDULED
+03/08/2016 15:09:27 Window(TumblingProcessingTimeWindows(5000), ProcessingTimeTrigger, AggregateFunction$3, PassThroughWindowFunction) -> Sink: Print to Std. Out (1/1) switched from SCHEDULED to DEPLOYING
+03/08/2016 15:09:27 Window(TumblingProcessingTimeWindows(5000), ProcessingTimeTrigger, AggregateFunction$3, PassThroughWindowFunction) -> Sink: Print to Std. Out (1/1) switched from DEPLOYING to RUNNING
 03/08/2016 15:09:27 Source: Custom Source(1/1) switched to RUNNING
 {% endhighlight %}
 

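For readers who want to try the four-method contract from the patch without a Flink dependency, here is a minimal sketch in plain Java. Everything in it is an assumption made for illustration: `Aggregator` is a hypothetical stand-in with the same four method names the commit overrides (it is not Flink's `AggregateFunction`), `Edit` stands in for `WikipediaEditEvent`, `Acc` stands in for `Tuple2<String, Long>`, and `sumPerUser` only imitates what a keyed five-second window would do with one batch of events.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class AggregateSketch {

    /** Hypothetical stand-in with the four methods the commit overrides; not Flink's interface. */
    interface Aggregator<IN, ACC, OUT> {
        ACC createAccumulator();
        ACC add(IN value, ACC accumulator);
        OUT getResult(ACC accumulator);
        ACC merge(ACC a, ACC b);
    }

    /** Hypothetical edit event; stands in for WikipediaEditEvent. */
    static final class Edit {
        final String user;
        final long byteDiff;

        Edit(String user, long byteDiff) {
            this.user = user;
            this.byteDiff = byteDiff;
        }
    }

    /** Mutable (user, bytes) pair; stands in for Tuple2<String, Long>. */
    static final class Acc {
        String user;
        long bytes;

        Acc(String user, long bytes) {
            this.user = user;
            this.bytes = bytes;
        }
    }

    /** Same logic as the anonymous class in the patch, written against the stand-in types. */
    static final Aggregator<Edit, Acc, Acc> SUM_BYTES = new Aggregator<Edit, Acc, Acc>() {
        @Override
        public Acc createAccumulator() {
            return new Acc("", 0L); // the initial value that fold() used to take as an argument
        }

        @Override
        public Acc add(Edit value, Acc accumulator) {
            accumulator.user = value.user;
            accumulator.bytes += value.byteDiff;
            return accumulator;
        }

        @Override
        public Acc getResult(Acc accumulator) {
            return accumulator;
        }

        @Override
        public Acc merge(Acc a, Acc b) {
            return new Acc(a.user, a.bytes + b.bytes);
        }
    };

    /** Imitates one window: one accumulator per user, then one result per user. */
    static Map<String, Long> sumPerUser(List<Edit> window) {
        Map<String, Acc> perKey = new LinkedHashMap<>();
        for (Edit e : window) {
            Acc acc = perKey.get(e.user);
            if (acc == null) {
                acc = SUM_BYTES.createAccumulator();
            }
            perKey.put(e.user, SUM_BYTES.add(e, acc));
        }
        Map<String, Long> result = new LinkedHashMap<>();
        for (Acc acc : perKey.values()) {
            Acc out = SUM_BYTES.getResult(acc);
            result.put(out.user, out.bytes);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Edit> window = Arrays.asList(
                new Edit("alice", 10), new Edit("bob", -4), new Edit("alice", 5));
        System.out.println(sumPerUser(window)); // prints {alice=15, bob=-4}

        // merge() combines two partial accumulators that belong to the same key.
        Acc merged = SUM_BYTES.merge(new Acc("alice", 10), new Acc("alice", 5));
        System.out.println(merged.user + " " + merged.bytes); // prints alice 15
    }
}
```

The sketch shows the main difference from the removed `fold` call: the initial value moves into `createAccumulator()`, and `merge()` states explicitly how two partial sums for the same user are combined.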