[
https://issues.apache.org/jira/browse/FLINK-4937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15686712#comment-15686712
]
ASF GitHub Bot commented on FLINK-4937:
---------------------------------------
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/2792#discussion_r89090109
--- Diff:
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala
---
@@ -115,14 +124,210 @@ object AggregateUtil {
intermediateRowArity,
outputType.getFieldCount)
}
+ groupReduceFunction
+ }
+
+ /**
+ * Create IncrementalAggregateReduceFunction for Incremental
aggregates. It implement
+ * [[org.apache.flink.api.common.functions.ReduceFunction]]
+ *
+ */
+ private[flink] def createIncrementalAggregateReduceFunction(
+ aggregates: Array[Aggregate[_ <: Any]],
+ namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+ inputType: RelDataType,
+ outputType: RelDataType,
+ groupings: Array[Int]): IncrementalAggregateReduceFunction = {
+ val groupingOffsetMapping =
+ getGroupingOffsetAndaggOffsetMapping(
+ namedAggregates,
+ inputType,
+ outputType,
+ groupings)._1
+ val intermediateRowArity = groupings.length +
aggregates.map(_.intermediateDataType.length).sum
+ val reduceFunction = new IncrementalAggregateReduceFunction(
+ aggregates,
+ groupingOffsetMapping,
+ intermediateRowArity)
+ reduceFunction
+ }
+
+ /**
+ * @return groupingOffsetMapping (mapping relation between field index
of intermediate
+ * aggregate Row and output Row.)
+ * and aggOffsetMapping (the mapping relation between aggregate
function index in list
+ * and its corresponding field index in output Row.)
+ */
+ def getGroupingOffsetAndaggOffsetMapping(
+ namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+ inputType: RelDataType,
+ outputType: RelDataType,
+ groupings: Array[Int]): (Array[(Int, Int)], Array[(Int, Int)]) = {
+
+ // the mapping relation between field index of intermediate aggregate
Row and output Row.
+ val groupingOffsetMapping = getGroupKeysMapping(inputType, outputType,
groupings)
+
+ // the mapping relation between aggregate function index in list and
its corresponding
+ // field index in output Row.
+ val aggOffsetMapping = getAggregateMapping(namedAggregates, outputType)
- (mapFunction, reduceGroupFunction)
+ if (groupingOffsetMapping.length != groupings.length ||
+ aggOffsetMapping.length != namedAggregates.length) {
+ throw new TableException(
+ "Could not find output field in input data type " +
+ "or aggregate functions.")
+ }
+ (groupingOffsetMapping, aggOffsetMapping)
+ }
+
+
+ private[flink] def createAllWindowAggregationFunction(
+ window: LogicalWindow,
+ properties: Seq[NamedWindowProperty],
+ aggFunction: RichGroupReduceFunction[Row, Row])
+ : AllWindowFunction[Row, Row, DataStreamWindow] = {
+
+ if (isTimeWindow(window)) {
+ val (startPos, endPos) = computeWindowStartEndPropertyPos(properties)
+ new AggregateAllTimeWindowFunction(aggFunction, startPos, endPos)
+ .asInstanceOf[AllWindowFunction[Row, Row, DataStreamWindow]]
+ } else {
+ new AggregateAllWindowFunction(aggFunction)
+ }
+
+ }
+
+
+ private[flink] def createWindowAggregationFunction(
+ window: LogicalWindow,
+ properties: Seq[NamedWindowProperty],
+ aggFunction: RichGroupReduceFunction[Row, Row])
+ : WindowFunction[Row, Row, Tuple, DataStreamWindow] = {
+
+ if (isTimeWindow(window)) {
+ val (startPos, endPos) = computeWindowStartEndPropertyPos(properties)
+ new AggregateTimeWindowFunction(aggFunction, startPos, endPos)
+ .asInstanceOf[WindowFunction[Row, Row, Tuple, DataStreamWindow]]
+ } else {
+ new AggregateWindowFunction(aggFunction)
+ }
+
+ }
+
+ private[flink] def createAllWindowIncrementalAggregationFunction(
+ window: LogicalWindow,
+ aggregates: Array[Aggregate[_ <: Any]],
+ namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+ inputType: RelDataType,
+ outputType: RelDataType,
+ groupings: Array[Int],
+ properties: Seq[NamedWindowProperty])
+ : AllWindowFunction[Row, Row, DataStreamWindow] = {
+
+ val (groupingOffsetMapping, aggOffsetMapping) =
+ getGroupingOffsetAndaggOffsetMapping(
+ namedAggregates,
+ inputType,
+ outputType,
+ groupings)
+
+ val finalRowArity = outputType.getFieldCount
+
+ if (isTimeWindow(window)) {
+ val (startPos, endPos) = computeWindowStartEndPropertyPos(properties)
+ new IncrementalAggregateAllTimeWindowFunction(
+ aggregates,
+ groupingOffsetMapping,
+ aggOffsetMapping,
+ finalRowArity,
+ startPos,
+ endPos)
+ .asInstanceOf[AllWindowFunction[Row, Row, DataStreamWindow]]
+ } else {
+ new IncrementalAggregateAllWindowFunction(
+ aggregates,
+ groupingOffsetMapping,
+ aggOffsetMapping,
+ finalRowArity)
+ }
+
+ }
+
+ private[flink] def createWindowIncrementalAggregationFunction(
+ window: LogicalWindow,
+ aggregates: Array[Aggregate[_ <: Any]],
+ namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+ inputType: RelDataType,
+ outputType: RelDataType,
+ groupings: Array[Int],
+ properties: Seq[NamedWindowProperty])
+ : WindowFunction[Row, Row, Tuple, DataStreamWindow] = {
+
+ val (groupingOffsetMapping, aggOffsetMapping) =
+ getGroupingOffsetAndaggOffsetMapping(
+ namedAggregates,
+ inputType,
+ outputType,
+ groupings)
+
+ val finalRowArity = outputType.getFieldCount
+
+ if (isTimeWindow(window)) {
+ val (startPos, endPos) = computeWindowStartEndPropertyPos(properties)
+ new IncrementalAggregateTimeWindowFunction(
+ aggregates,
+ groupingOffsetMapping,
+ aggOffsetMapping,
+ finalRowArity,
+ startPos,
+ endPos)
+ .asInstanceOf[WindowFunction[Row, Row, Tuple, DataStreamWindow]]
+ } else {
+ new IncrementalAggregateWindowFunction(
+ aggregates,
+ groupingOffsetMapping,
+ aggOffsetMapping,
+ finalRowArity)
+ }
+
+ }
+
+ private[flink] def isTimeWindow(window: LogicalWindow) = {
+ window match {
+ case ProcessingTimeTumblingGroupWindow(_, size) =>
isTimeInterval(size.resultType)
+ case ProcessingTimeSlidingGroupWindow(_, size, _) =>
isTimeInterval(size.resultType)
+ case ProcessingTimeSessionGroupWindow(_, _) => true
+ case EventTimeTumblingGroupWindow(_, _, size) =>
isTimeInterval(size.resultType)
+ case EventTimeSlidingGroupWindow(_, _, size, _) =>
isTimeInterval(size.resultType)
+ case EventTimeSessionGroupWindow(_, _, _) => true
+ }
+ }
+
+ private[flink] def computeWindowStartEndPropertyPos(properties:
Seq[NamedWindowProperty])
+ : (Option[Int], Option[Int]) = {
+
+ val propPos = properties.foldRight((None: Option[Int], None:
Option[Int], 0)) {
+ (p, x) => p match {
+ case NamedWindowProperty(name, prop) =>
+ prop match {
+ case WindowStart(_) if x._1.isDefined =>
+ throw new TableException("Duplicate WindowStart property
encountered. This is a bug.")
+ case WindowStart(_) =>
+ (Some(x._3), x._2, x._3 - 1)
+ case WindowEnd(_) if x._2.isDefined =>
+ throw new TableException("Duplicate WindowEnd property
encountered. This is a bug.")
+ case WindowEnd(_) =>
+ (x._1, Some(x._3), x._3 - 1)
+ }
+ }
+ }
+ (propPos._1, propPos._2)
}
- private def transformToAggregateFunctions(
- aggregateCalls: Seq[AggregateCall],
- inputType: RelDataType,
- groupKeysCount: Int): (Array[Int], Array[Aggregate[_ <: Any]]) = {
+ private[flink] def transformToAggregateFunctions(
--- End diff --
If we pass only `namedAggregates` as a parameter to the `create*Function`
methods, we can make this method private again, provided we add a method
```
private[flink] def doAllSupportPartialAggregation(aggregateCalls:
Seq[AggregateCall]): Boolean
```
> Add incremental group window aggregation for streaming Table API
> ----------------------------------------------------------------
>
> Key: FLINK-4937
> URL: https://issues.apache.org/jira/browse/FLINK-4937
> Project: Flink
> Issue Type: Sub-task
> Components: Table API & SQL
> Affects Versions: 1.2.0
> Reporter: Fabian Hueske
> Assignee: sunjincheng
>
> Group-window aggregates for streaming tables are currently not done in an
> incremental fashion. This means that the window collects all records and
> performs the aggregation when the window is closed instead of eagerly
> updating a partial aggregate for every added record. Since records are
> buffered, non-incremental aggregation requires more storage space than
> incremental aggregation.
> The DataStream API which is used under the hood of the streaming Table API
> features [incremental
> aggregation|https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/windows.html#windowfunction-with-incremental-aggregation]
> using a {{ReduceFunction}}.
> We should add support for incremental aggregation in group-windows.
> This is a follow-up task of FLINK-4691.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)