[ https://issues.apache.org/jira/browse/FLINK-4937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15686708#comment-15686708 ]
ASF GitHub Bot commented on FLINK-4937:
---------------------------------------
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/2792#discussion_r89086106
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala ---
@@ -39,66 +45,69 @@ object AggregateUtil {
type JavaList[T] = java.util.List[T]
/**
- * Create Flink operator functions for aggregates. It includes 2 implementations of Flink
- * operator functions:
- * [[org.apache.flink.api.common.functions.MapFunction]] and
- * [[org.apache.flink.api.common.functions.GroupReduceFunction]](if it's partial aggregate,
- * should also implement [[org.apache.flink.api.common.functions.CombineFunction]] as well).
- * The output of [[org.apache.flink.api.common.functions.MapFunction]] contains the
- * intermediate aggregate values of all aggregate function, it's stored in Row by the following
- * format:
- *
- * {{{
- *                           avg(x) aggOffsetInRow = 2   count(z) aggOffsetInRow = 5
- *                           |                           |
- *                           v                           v
- *     +---------+---------+--------+--------+--------+--------+
- *     |groupKey1|groupKey2|  sum1  | count1 |  sum2  | count2 |
- *     +---------+---------+--------+--------+--------+--------+
- *                                               ^
- *                                               |
- *                                               sum(y) aggOffsetInRow = 4
- * }}}
- *
- */
- def createOperatorFunctionsForAggregates(
- namedAggregates: Seq[CalcitePair[AggregateCall, String]],
- inputType: RelDataType,
- outputType: RelDataType,
- groupings: Array[Int])
- : (MapFunction[Any, Row], RichGroupReduceFunction[Row, Row]) = {
-
- val aggregateFunctionsAndFieldIndexes =
- transformToAggregateFunctions(namedAggregates.map(_.getKey), inputType, groupings.length)
- // store the aggregate fields of each aggregate function, by the same order of aggregates.
- val aggFieldIndexes = aggregateFunctionsAndFieldIndexes._1
- val aggregates = aggregateFunctionsAndFieldIndexes._2
+ * Create prepare MapFunction for aggregates.
+ * The output of [[org.apache.flink.api.common.functions.MapFunction]] contains the
+ * intermediate aggregate values of all aggregate function, it's stored in Row by the following
+ * format:
+ *
+ * {{{
+ *                           avg(x) aggOffsetInRow = 2   count(z) aggOffsetInRow = 5
+ *                           |                           |
+ *                           v                           v
+ *     +---------+---------+--------+--------+--------+--------+
+ *     |groupKey1|groupKey2|  sum1  | count1 |  sum2  | count2 |
+ *     +---------+---------+--------+--------+--------+--------+
+ *                                               ^
+ *                                               |
+ *                                               sum(y) aggOffsetInRow = 4
+ * }}}
+ *
+ */
+ private[flink] def createPrepareMapFunction(
+ aggregates: Array[Aggregate[_ <: Any]],
+ aggFieldIndexes: Array[Int],
+ groupings: Array[Int],
+ inputType:
+ RelDataType): MapFunction[Any, Row] = {
--- End diff --
Remove this line break; `inputType: RelDataType): MapFunction[Any, Row] = {` can go on a single line.
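The doc comment in the diff explains the intermediate Row layout only through one example. As a rough illustration (plain Scala, not Flink code), the sketch below computes `aggOffsetInRow` for that example and builds the row that a prepare step might emit for a single input record. `Agg`, `aggOffsets` and `prepare` are hypothetical names, a `Vector[Any]` stands in for Flink's `Row`, and the sketch assumes, based on the diagram, that avg keeps (sum, count) while sum and count keep one field each. Null handling is ignored.

```scala
// Hypothetical, simplified model of the intermediate Row layout described in the
// createPrepareMapFunction doc comment. NOT Flink's API: Agg, aggOffsets and
// prepare are illustrative names, and a Vector[Any] stands in for Row.
object IntermediateLayout {

  // Number of intermediate fields each aggregate stores (assumption taken from
  // the diagram: avg keeps sum and count, sum and count keep one field each).
  sealed trait Agg { def intermediateArity: Int }
  case object Avg extends Agg { val intermediateArity = 2 }
  case object Sum extends Agg { val intermediateArity = 1 }
  case object Count extends Agg { val intermediateArity = 1 }

  // aggOffsetInRow of each aggregate: the group keys come first, followed by
  // the intermediate fields of every aggregate in declaration order.
  def aggOffsets(numGroupKeys: Int, aggs: Seq[Agg]): Seq[Int] =
    aggs.scanLeft(numGroupKeys)(_ + _.intermediateArity).init

  // "Prepare" step for one record: emit the group keys followed by the
  // initial intermediate values of every aggregate (nulls are not handled).
  def prepare(groupKeys: Seq[Any], aggs: Seq[Agg], inputs: Seq[Long]): Vector[Any] =
    groupKeys.toVector ++ aggs.zip(inputs).flatMap {
      case (Avg, x)   => Seq(x, 1L) // sum = x, count = 1
      case (Sum, y)   => Seq(y)     // sum = y
      case (Count, _) => Seq(1L)    // count = 1
    }
}
```

With two group keys and the aggregates `avg(x)`, `sum(y)`, `count(z)`, `aggOffsets(2, Seq(Avg, Sum, Count))` yields offsets 2, 4 and 5, which matches the diagram.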
> Add incremental group window aggregation for streaming Table API
> ----------------------------------------------------------------
>
> Key: FLINK-4937
> URL: https://issues.apache.org/jira/browse/FLINK-4937
> Project: Flink
> Issue Type: Sub-task
> Components: Table API & SQL
> Affects Versions: 1.2.0
> Reporter: Fabian Hueske
> Assignee: sunjincheng
>
> Group-window aggregates for streaming tables are currently not done in an
> incremental fashion. This means that the window collects all records and
> performs the aggregation when the window is closed instead of eagerly
> updating a partial aggregate for every added record. Since records are
> buffered, non-incremental aggregation requires more storage space than
> incremental aggregation.
> The DataStream API, which is used under the hood of the streaming Table API,
> features [incremental aggregation|https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/windows.html#windowfunction-with-incremental-aggregation]
> using a {{ReduceFunction}}.
> We should add support for incremental aggregation in group-windows.
> This is a follow-up task of FLINK-4691.
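The issue contrasts buffering every record of a window with eagerly updating a partial aggregate. The following is a minimal plain-Scala sketch of that difference for `avg(x)`. It assumes the (sum, count) intermediate representation from the diff above. `Partial`, `merge`, `bufferedAvg` and `incrementalAvg` are hypothetical names and are not part of the DataStream or Table API. Here `merge` plays the role that a `ReduceFunction` plays in an incrementally aggregated window. It has to be associative, and because a `ReduceFunction` has no zero element, the sketch uses `reduceOption`.

```scala
// Illustrative sketch (not Flink code) of buffered vs. incremental window
// aggregation for avg(x), using a (sum, count) partial aggregate.
object IncrementalAvg {

  // Partial aggregate with the same shape as (sum, count) in the Row layout.
  final case class Partial(sum: Long, count: Long)

  // Associative merge of two partials: the role a ReduceFunction plays in an
  // incrementally aggregated window.
  def merge(a: Partial, b: Partial): Partial =
    Partial(a.sum + b.sum, a.count + b.count)

  // Non-incremental: all records of the window are kept and aggregated only
  // when the window fires, so the state grows with the number of records.
  def bufferedAvg(records: Seq[Long]): Double =
    records.sum.toDouble / records.size

  // Incremental: each arriving record becomes a partial and is merged into the
  // running partial right away, so only one Partial needs to be kept.
  // reduceOption mirrors a ReduceFunction, which has no zero element.
  def incrementalAvg(records: Iterator[Long]): Double =
    records
      .map(x => Partial(x, 1L))
      .reduceOption(merge(_, _))
      .map(p => p.sum.toDouble / p.count)
      .getOrElse(Double.NaN) // empty window: no result
}
```

Both functions return the same average. The incremental version never needs to hold more than one `Partial`, and that is the storage saving the issue describes.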
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)