dianfu commented on a change in pull request #8311: [FLINK-10976][table] Add Aggregate operator to Table API
URL: https://github.com/apache/flink/pull/8311#discussion_r281000946

 ##########
 File path: docs/dev/table/tableApi.md
 ##########
 @@ -1957,6 +2008,52 @@ class MyFlatMapFunction extends TableFunction[Row] {
  val func = new MyFlatMapFunction
  val table = input
    .flatMap(func('c)).as('a, 'b)
 +{% endhighlight %}
 +      </td>
 +    </tr>
 +
 +    <tr>
 +      <td>
 +        <strong>Aggregate</strong><br>
 +        <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span>
 +      </td>
 +      <td>
 +        <p>Performs an aggregate operation with an aggregate function. You have to close the "aggregate" with a select statement. The output of aggregate will be flattened if the output type is a composite type.</p>
 +{% highlight scala %}
 +case class MyMinMaxAcc(var min: Int, var max: Int)
 +
 +class MyMinMax extends AggregateFunction[Row, MyMinMaxAcc] {
 +
 +  def accumulate(acc: MyMinMaxAcc, value: Int): Unit = {
 +    if (value < acc.min) {
 +      acc.min = value
 +    }
 +    if (value > acc.max) {
 +      acc.max = value
 +    }
 +  }
 +
 +  def resetAccumulator(acc: MyMinMaxAcc): Unit = {

 Review comment:
 The Scala example defines resetAccumulator, while the Java example does not.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

With regards,
Apache Git Services