[ https://issues.apache.org/jira/browse/FLINK-5963?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15895558#comment-15895558 ]
ASF GitHub Bot commented on FLINK-5963: --------------------------------------- Github user shaoxuan-wang commented on a diff in the pull request: https://github.com/apache/flink/pull/3472#discussion_r104282131 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetAggregate.scala --- @@ -87,47 +89,67 @@ class DataSetAggregate( override def translateToPlan(tableEnv: BatchTableEnvironment): DataSet[Row] = { - val groupingKeys = grouping.indices.toArray - - val mapFunction = AggregateUtil.createPrepareMapFunction( - namedAggregates, - grouping, - inputType) - - val groupReduceFunction = AggregateUtil.createAggregateGroupReduceFunction( - namedAggregates, - inputType, - rowRelDataType, - grouping, - inGroupingSet) + val (preAgg: Option[DataSetPreAggFunction], + preAggType: Option[TypeInformation[Row]], + finalAgg: GroupReduceFunction[Row, Row]) = + AggregateUtil.createDataSetAggregateFunctions( + namedAggregates, + inputType, + rowRelDataType, + grouping, + inGroupingSet) val inputDS = getInput.asInstanceOf[DataSetRel].translateToPlan(tableEnv) val aggString = aggregationToString(inputType, grouping, getRowType, namedAggregates, Nil) - val prepareOpName = s"prepare select: ($aggString)" - val mappedInput = inputDS.map(mapFunction).name(prepareOpName) val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType).asInstanceOf[RowTypeInfo] - if (groupingKeys.length > 0) { + if (grouping.length > 0) { // grouped aggregation val aggOpName = s"groupBy: (${groupingToString(inputType, grouping)}), " + s"select: ($aggString)" - mappedInput.asInstanceOf[DataSet[Row]] - .groupBy(groupingKeys: _*) - .reduceGroup(groupReduceFunction) - .returns(rowTypeInfo) - .name(aggOpName) + if (preAgg.isDefined) { + inputDS + // pre-aggregation + .groupBy(grouping: _*) + .combineGroup(preAgg.get) + .returns(preAggType.get) + .name(aggOpName) + // final aggregation + .groupBy(grouping.indices: _*) --- End diff -- I played around this PR with 
different test modes, and everything works very well. I am just curious: why do you use grouping.indices as the grouping key here? > Remove preparation mapper of DataSetAggregate > --------------------------------------------- > > Key: FLINK-5963 > URL: https://issues.apache.org/jira/browse/FLINK-5963 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL > Affects Versions: 1.3.0 > Reporter: Fabian Hueske > Assignee: Fabian Hueske > Priority: Minor > > With the new UDAGG interface we do not need the preparation mapper anymore. > It adds overhead because > - it is another operator > - it prevents the use of {{AggregateFunction.accumulate()}} in a combiner or > reducer. > Hence, it should be removed. -- This message was sent by Atlassian JIRA (v6.3.15#6346)