dianfu commented on a change in pull request #8311: [FLINK-10976][table] Add 
Aggregate operator to Table API
URL: https://github.com/apache/flink/pull/8311#discussion_r281000946
 
 

 ##########
 File path: docs/dev/table/tableApi.md
 ##########
 @@ -1957,6 +2008,52 @@ class MyFlatMapFunction extends TableFunction[Row] {
 val func = new MyFlatMapFunction
 val table = input
   .flatMap(func('c)).as('a, 'b)
+{% endhighlight %}
+      </td>
+    </tr>
+    
+    <tr>
+      <td>
+        <strong>Aggregate</strong><br>
+        <span class="label label-primary">Batch</span> <span class="label 
label-primary">Streaming</span>
+      </td>
+      <td>
+        <p>Performs an aggregate operation with an aggregate function. You 
have to close the "aggregate" with a select statement. The output of aggregate 
will be flattened if the output type is a composite type.</p>
+{% highlight scala %}
+case class MyMinMaxAcc(var min: Int, var max: Int)
+
+class MyMinMax extends AggregateFunction[Row, MyMinMaxAcc] {
+
+  def accumulate(acc: MyMinMaxAcc, value: Int): Unit = {
+    if (value < acc.min) {
+      acc.min = value
+    }
+    if (value > acc.max) {
+      acc.max = value
+    }
+  }
+
+  def resetAccumulator(acc: MyMinMaxAcc): Unit = {
 
 Review comment:
   The Scala example has resetAccumulator, while the Java example has not. 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to