[
https://issues.apache.org/jira/browse/FLINK-4937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15686707#comment-15686707
]
ASF GitHub Bot commented on FLINK-4937:
---------------------------------------
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/2792#discussion_r89086247
--- Diff:
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala
---
@@ -39,66 +45,69 @@ object AggregateUtil {
type JavaList[T] = java.util.List[T]
/**
- * Create Flink operator functions for aggregates. It includes 2
implementations of Flink
- * operator functions:
- * [[org.apache.flink.api.common.functions.MapFunction]] and
- * [[org.apache.flink.api.common.functions.GroupReduceFunction]](if it's
partial aggregate,
- * should also implement
[[org.apache.flink.api.common.functions.CombineFunction]] as well).
- * The output of [[org.apache.flink.api.common.functions.MapFunction]]
contains the
- * intermediate aggregate values of all aggregate function, it's stored
in Row by the following
- * format:
- *
- * {{{
- * avg(x) aggOffsetInRow = 2 count(z)
aggOffsetInRow = 5
- * | |
- * v v
- * +---------+---------+--------+--------+--------+--------+
- * |groupKey1|groupKey2| sum1 | count1 | sum2 | count2 |
- * +---------+---------+--------+--------+--------+--------+
- * ^
- * |
- * sum(y) aggOffsetInRow = 4
- * }}}
- *
- */
- def createOperatorFunctionsForAggregates(
- namedAggregates: Seq[CalcitePair[AggregateCall, String]],
- inputType: RelDataType,
- outputType: RelDataType,
- groupings: Array[Int])
- : (MapFunction[Any, Row], RichGroupReduceFunction[Row, Row]) = {
-
- val aggregateFunctionsAndFieldIndexes =
- transformToAggregateFunctions(namedAggregates.map(_.getKey),
inputType, groupings.length)
- // store the aggregate fields of each aggregate function, by the same
order of aggregates.
- val aggFieldIndexes = aggregateFunctionsAndFieldIndexes._1
- val aggregates = aggregateFunctionsAndFieldIndexes._2
+ * Create prepare MapFunction for aggregates.
+ * The output of [[org.apache.flink.api.common.functions.MapFunction]]
contains the
+ * intermediate aggregate values of all aggregate function, it's stored
in Row by the following
+ * format:
+ *
+ * {{{
+ * avg(x) aggOffsetInRow = 2 count(z)
aggOffsetInRow = 5
+ * | |
+ * v v
+ * +---------+---------+--------+--------+--------+--------+
+ * |groupKey1|groupKey2| sum1 | count1 | sum2 | count2 |
+ * +---------+---------+--------+--------+--------+--------+
+ * ^
+ * |
+ * sum(y) aggOffsetInRow = 4
+ * }}}
+ *
+ */
+ private[flink] def createPrepareMapFunction(
+ aggregates: Array[Aggregate[_ <: Any]],
+ aggFieldIndexes: Array[Int],
+ groupings: Array[Int],
+ inputType:
+ RelDataType): MapFunction[Any, Row] = {
val mapReturnType: RowTypeInfo =
createAggregateBufferDataType(groupings, aggregates, inputType)
val mapFunction = new AggregateMapFunction[Row, Row](
- aggregates, aggFieldIndexes, groupings,
-
mapReturnType.asInstanceOf[RowTypeInfo]).asInstanceOf[MapFunction[Any, Row]]
-
- // the mapping relation between field index of intermediate aggregate
Row and output Row.
- val groupingOffsetMapping = getGroupKeysMapping(inputType, outputType,
groupings)
-
- // the mapping relation between aggregate function index in list and
its corresponding
- // field index in output Row.
- val aggOffsetMapping = getAggregateMapping(namedAggregates, outputType)
-
- if (groupingOffsetMapping.length != groupings.length ||
- aggOffsetMapping.length != namedAggregates.length) {
- throw new TableException("Could not find output field in input data
type " +
- "or aggregate functions.")
- }
-
- val allPartialAggregate = aggregates.map(_.supportPartial).forall(x =>
x)
+ aggregates,
+ aggFieldIndexes,
+ groupings,
+
mapReturnType.asInstanceOf[RowTypeInfo]).asInstanceOf[MapFunction[Any, Row]]
- val intermediateRowArity = groupings.length +
aggregates.map(_.intermediateDataType.length).sum
+ mapFunction
+ }
- val reduceGroupFunction =
+ /**
+ * Create AggregateGroupReduceFunction for aggregates. It implement
+ * [[org.apache.flink.api.common.functions.GroupReduceFunction]](if
it's partial aggregate,
--- End diff --
Insert a space after `GroupReduceFunction]]` (before the opening parenthesis).
> Add incremental group window aggregation for streaming Table API
> ----------------------------------------------------------------
>
> Key: FLINK-4937
> URL: https://issues.apache.org/jira/browse/FLINK-4937
> Project: Flink
> Issue Type: Sub-task
> Components: Table API & SQL
> Affects Versions: 1.2.0
> Reporter: Fabian Hueske
> Assignee: sunjincheng
>
> Group-window aggregates for streaming tables are currently not done in an
> incremental fashion. This means that the window collects all records and
> performs the aggregation when the window is closed instead of eagerly
> updating a partial aggregate for every added record. Since records are
> buffered, non-incremental aggregation requires more storage space than
> incremental aggregation.
> The DataStream API, which is used under the hood of the streaming Table API,
> features [incremental
> aggregation|https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/windows.html#windowfunction-with-incremental-aggregation]
> using a {{ReduceFunction}}.
> We should add support for incremental aggregation in group-windows.
> This is a follow-up task of FLINK-4691.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)