[ https://issues.apache.org/jira/browse/FLINK-4937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15686711#comment-15686711 ]
ASF GitHub Bot commented on FLINK-4937:
---------------------------------------
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/2792#discussion_r89102066
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala ---
@@ -115,14 +124,210 @@ object AggregateUtil {
intermediateRowArity,
outputType.getFieldCount)
}
+ groupReduceFunction
+ }
+
+ /**
+ * Create IncrementalAggregateReduceFunction for Incremental aggregates. It implement
+ * [[org.apache.flink.api.common.functions.ReduceFunction]]
+ *
+ */
+ private[flink] def createIncrementalAggregateReduceFunction(
+ aggregates: Array[Aggregate[_ <: Any]],
+ namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+ inputType: RelDataType,
+ outputType: RelDataType,
+ groupings: Array[Int]): IncrementalAggregateReduceFunction = {
+ val groupingOffsetMapping =
+ getGroupingOffsetAndaggOffsetMapping(
+ namedAggregates,
+ inputType,
+ outputType,
+ groupings)._1
+ val intermediateRowArity = groupings.length + aggregates.map(_.intermediateDataType.length).sum
+ val reduceFunction = new IncrementalAggregateReduceFunction(
+ aggregates,
+ groupingOffsetMapping,
+ intermediateRowArity)
+ reduceFunction
+ }
+
+ /**
+ * @return groupingOffsetMapping (mapping relation between field index of intermediate
+ * aggregate Row and output Row.)
+ * and aggOffsetMapping (the mapping relation between aggregate function index in list
+ * and its corresponding field index in output Row.)
+ */
+ def getGroupingOffsetAndaggOffsetMapping(
+ namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+ inputType: RelDataType,
+ outputType: RelDataType,
+ groupings: Array[Int]): (Array[(Int, Int)], Array[(Int, Int)]) = {
+
+ // the mapping relation between field index of intermediate aggregate Row and output Row.
+ val groupingOffsetMapping = getGroupKeysMapping(inputType, outputType, groupings)
+
+ // the mapping relation between aggregate function index in list and its corresponding
+ // field index in output Row.
+ val aggOffsetMapping = getAggregateMapping(namedAggregates, outputType)
- (mapFunction, reduceGroupFunction)
+ if (groupingOffsetMapping.length != groupings.length ||
+ aggOffsetMapping.length != namedAggregates.length) {
+ throw new TableException(
+ "Could not find output field in input data type " +
+ "or aggregate functions.")
+ }
+ (groupingOffsetMapping, aggOffsetMapping)
+ }
+
+
+ private[flink] def createAllWindowAggregationFunction(
+ window: LogicalWindow,
+ properties: Seq[NamedWindowProperty],
+ aggFunction: RichGroupReduceFunction[Row, Row])
+ : AllWindowFunction[Row, Row, DataStreamWindow] = {
+
+ if (isTimeWindow(window)) {
+ val (startPos, endPos) = computeWindowStartEndPropertyPos(properties)
+ new AggregateAllTimeWindowFunction(aggFunction, startPos, endPos)
+ .asInstanceOf[AllWindowFunction[Row, Row, DataStreamWindow]]
+ } else {
+ new AggregateAllWindowFunction(aggFunction)
+ }
+
+ }
+
+
+ private[flink] def createWindowAggregationFunction(
+ window: LogicalWindow,
+ properties: Seq[NamedWindowProperty],
+ aggFunction: RichGroupReduceFunction[Row, Row])
+ : WindowFunction[Row, Row, Tuple, DataStreamWindow] = {
+
+ if (isTimeWindow(window)) {
+ val (startPos, endPos) = computeWindowStartEndPropertyPos(properties)
+ new AggregateTimeWindowFunction(aggFunction, startPos, endPos)
+ .asInstanceOf[WindowFunction[Row, Row, Tuple, DataStreamWindow]]
+ } else {
+ new AggregateWindowFunction(aggFunction)
+ }
+
+ }
+
+ private[flink] def createAllWindowIncrementalAggregationFunction(
+ window: LogicalWindow,
+ aggregates: Array[Aggregate[_ <: Any]],
+ namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+ inputType: RelDataType,
+ outputType: RelDataType,
+ groupings: Array[Int],
+ properties: Seq[NamedWindowProperty])
+ : AllWindowFunction[Row, Row, DataStreamWindow] = {
+
+ val (groupingOffsetMapping, aggOffsetMapping) =
+ getGroupingOffsetAndaggOffsetMapping(
+ namedAggregates,
+ inputType,
+ outputType,
+ groupings)
+
+ val finalRowArity = outputType.getFieldCount
+
+ if (isTimeWindow(window)) {
+ val (startPos, endPos) = computeWindowStartEndPropertyPos(properties)
+ new IncrementalAggregateAllTimeWindowFunction(
+ aggregates,
+ groupingOffsetMapping,
+ aggOffsetMapping,
+ finalRowArity,
+ startPos,
+ endPos)
+ .asInstanceOf[AllWindowFunction[Row, Row, DataStreamWindow]]
+ } else {
+ new IncrementalAggregateAllWindowFunction(
+ aggregates,
+ groupingOffsetMapping,
+ aggOffsetMapping,
+ finalRowArity)
+ }
+
+ }
+
+ private[flink] def createWindowIncrementalAggregationFunction(
+ window: LogicalWindow,
+ aggregates: Array[Aggregate[_ <: Any]],
--- End diff --
Compute `aggregates` from `namedAggregates`
> Add incremental group window aggregation for streaming Table API
> ----------------------------------------------------------------
>
> Key: FLINK-4937
> URL: https://issues.apache.org/jira/browse/FLINK-4937
> Project: Flink
> Issue Type: Sub-task
> Components: Table API & SQL
> Affects Versions: 1.2.0
> Reporter: Fabian Hueske
> Assignee: sunjincheng
>
> Group-window aggregates for streaming tables are currently not done in an
> incremental fashion. This means that the window collects all records and
> performs the aggregation when the window is closed instead of eagerly
> updating a partial aggregate for every added record. Since records are
> buffered, non-incremental aggregation requires more storage space than
> incremental aggregation.
> The DataStream API, which is used under the hood of the streaming Table API,
> features [incremental aggregation|https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/windows.html#windowfunction-with-incremental-aggregation]
> using a {{ReduceFunction}}.
> We should add support for incremental aggregation in group-windows.
> This is a follow-up task to FLINK-4691.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)