twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r241741665
 
 

 ##########
 File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ##########
 @@ -168,27 +170,23 @@ object AggregateUtil {
       generateRetraction: Boolean,
       consumeRetraction: Boolean): ProcessFunction[CRow, CRow] = {
 
-    val (aggFields, aggregates, isDistinctAggs, accTypes, accSpecs) =
-      transformToAggregateFunctions(
+    val aggregateMetadata = extractAggregateMetadata(
         namedAggregates.map(_.getKey),
         inputRowType,
         consumeRetraction,
         tableConfig,
         isStateBackedDataViews = true)
 
-    val aggMapping = aggregates.indices.map(_ + groupings.length).toArray
-
-    val outputArity = groupings.length + aggregates.length
-
-    val aggregationStateType: RowTypeInfo = new RowTypeInfo(accTypes: _*)
+    val aggMapping = getAdjustedMapping(aggregateMetadata.getAggregateCallsCount, groupings.length)
 
 Review comment:
   Actually, I thought this function could be part of `AggregateMetadata`, since you always apply it to the aggregate calls count.
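
   A minimal sketch of what moving the helper into the metadata class might look like. `AggregateMetadataSketch` is a hypothetical, simplified stand-in for the PR's `AggregateMetadata` (the real class carries more state), and the body of `getAdjustedMapping` is an assumption inferred from the removed line `aggregates.indices.map(_ + groupings.length).toArray`:

```scala
// Hypothetical stand-in for the PR's AggregateMetadata; only the call count is modelled.
final case class AggregateMetadataSketch(aggregateCallsCount: Int) {

  def getAggregateCallsCount: Int = aggregateCallsCount

  // Assumed behaviour: shift each aggregate index by `offset`
  // (for example, the number of grouping fields that precede the aggregates).
  def getAdjustedMapping(offset: Int): Array[Int] =
    Array.range(offset, offset + aggregateCallsCount)
}

object AdjustedMappingExample {
  def main(args: Array[String]): Unit = {
    val metadata = AggregateMetadataSketch(aggregateCallsCount = 3)
    val groupingsLength = 2 // illustrative value, not taken from the PR

    // The call site no longer has to pass the calls count explicitly.
    val aggMapping = metadata.getAdjustedMapping(groupingsLength)
    println(aggMapping.mkString(", "))
  }
}
```

   With this shape the call site would shrink to something like `aggregateMetadata.getAdjustedMapping(groupings.length)`.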

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services
