[ 
https://issues.apache.org/jira/browse/FLINK-5963?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15895558#comment-15895558
 ] 

ASF GitHub Bot commented on FLINK-5963:
---------------------------------------

Github user shaoxuan-wang commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3472#discussion_r104282131
  
    --- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetAggregate.scala
 ---
    @@ -87,47 +89,67 @@ class DataSetAggregate(
     
       override def translateToPlan(tableEnv: BatchTableEnvironment): 
DataSet[Row] = {
     
    -    val groupingKeys = grouping.indices.toArray
    -
    -    val mapFunction = AggregateUtil.createPrepareMapFunction(
    -      namedAggregates,
    -      grouping,
    -      inputType)
    -
    -    val groupReduceFunction = 
AggregateUtil.createAggregateGroupReduceFunction(
    -      namedAggregates,
    -      inputType,
    -      rowRelDataType,
    -      grouping,
    -      inGroupingSet)
    +    val (preAgg: Option[DataSetPreAggFunction],
    +          preAggType: Option[TypeInformation[Row]],
    +          finalAgg: GroupReduceFunction[Row, Row]) =
    +      AggregateUtil.createDataSetAggregateFunctions(
    +        namedAggregates,
    +        inputType,
    +        rowRelDataType,
    +        grouping,
    +        inGroupingSet)
     
         val inputDS = 
getInput.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
     
         val aggString = aggregationToString(inputType, grouping, getRowType, 
namedAggregates, Nil)
    -    val prepareOpName = s"prepare select: ($aggString)"
    -    val mappedInput = inputDS.map(mapFunction).name(prepareOpName)
     
         val rowTypeInfo = 
FlinkTypeFactory.toInternalRowTypeInfo(getRowType).asInstanceOf[RowTypeInfo]
     
    -    if (groupingKeys.length > 0) {
    +    if (grouping.length > 0) {
           // grouped aggregation
           val aggOpName = s"groupBy: (${groupingToString(inputType, 
grouping)}), " +
             s"select: ($aggString)"
     
    -      mappedInput.asInstanceOf[DataSet[Row]]
    -        .groupBy(groupingKeys: _*)
    -        .reduceGroup(groupReduceFunction)
    -        .returns(rowTypeInfo)
    -        .name(aggOpName)
    +      if (preAgg.isDefined) {
    +        inputDS
    +          // pre-aggregation
    +          .groupBy(grouping: _*)
    +          .combineGroup(preAgg.get)
    +          .returns(preAggType.get)
    +          .name(aggOpName)
    +          // final aggregation
    +          .groupBy(grouping.indices: _*)
    --- End diff --
    
    I played around with this PR in different test modes, and everything works 
very well. I am just curious: why do you use grouping.indices as the grouping key here?


> Remove preparation mapper of DataSetAggregate
> ---------------------------------------------
>
>                 Key: FLINK-5963
>                 URL: https://issues.apache.org/jira/browse/FLINK-5963
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>    Affects Versions: 1.3.0
>            Reporter: Fabian Hueske
>            Assignee: Fabian Hueske
>            Priority: Minor
>
> With the new UDAGG interface we do not need the preparation mapper anymore. 
> It adds overhead because 
> - it is another operator
> - it prevents the use of {{AggregateFunction.accumulate()}} in a combiner or 
> reducer.
> Hence, it should be removed.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to