[ 
https://issues.apache.org/jira/browse/FLINK-4937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15661168#comment-15661168
 ] 

ASF GitHub Bot commented on FLINK-4937:
---------------------------------------

Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2792#discussion_r87708645
  
    --- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala
 ---
    @@ -61,25 +61,108 @@ object AggregateUtil {
        * }}}
        *
        */
    -  def createOperatorFunctionsForAggregates(
    +    def createOperatorFunctionsForAggregates(
           namedAggregates: Seq[CalcitePair[AggregateCall, String]],
           inputType: RelDataType,
           outputType: RelDataType,
           groupings: Array[Int])
         : (MapFunction[Any, Row], RichGroupReduceFunction[Row, Row]) = {
     
    -    val aggregateFunctionsAndFieldIndexes =
    -      transformToAggregateFunctions(namedAggregates.map(_.getKey), 
inputType, groupings.length)
    -    // store the aggregate fields of each aggregate function, by the same 
order of aggregates.
    -    val aggFieldIndexes = aggregateFunctionsAndFieldIndexes._1
    -    val aggregates = aggregateFunctionsAndFieldIndexes._2
    +       val (aggFieldIndexes, aggregates)  =
    +           transformToAggregateFunctions(namedAggregates.map(_.getKey),
    +             inputType, groupings.length)
     
    -    val mapReturnType: RowTypeInfo =
    -      createAggregateBufferDataType(groupings, aggregates, inputType)
    +        createOperatorFunctionsForAggregates(namedAggregates,
    +          inputType,
    +          outputType,
    +          groupings,
    +          aggregates,aggFieldIndexes)
    +    }
     
    -    val mapFunction = new AggregateMapFunction[Row, Row](
    -        aggregates, aggFieldIndexes, groupings,
    -        
mapReturnType.asInstanceOf[RowTypeInfo]).asInstanceOf[MapFunction[Any, Row]]
    +    def createOperatorFunctionsForAggregates(
    +        namedAggregates: Seq[CalcitePair[AggregateCall, String]],
    +        inputType: RelDataType,
    +        outputType: RelDataType,
    +        groupings: Array[Int],
    +        aggregates:Array[Aggregate[_ <: Any]],
    +        aggFieldIndexes:Array[Int])
    +    : (MapFunction[Any, Row], RichGroupReduceFunction[Row, Row])= {
    +
    +      val mapFunction = createAggregateMapFunction(aggregates,
    +                        aggFieldIndexes, groupings, inputType)
    +
    +      // the mapping relation between field index of intermediate 
aggregate Row and output Row.
    +      val groupingOffsetMapping = getGroupKeysMapping(inputType, 
outputType, groupings)
    +
    +      // the mapping relation between aggregate function index in list and 
its corresponding
    +      // field index in output Row.
    +      val aggOffsetMapping = getAggregateMapping(namedAggregates, 
outputType)
    +
    +      if (groupingOffsetMapping.length != groupings.length ||
    +        aggOffsetMapping.length != namedAggregates.length) {
    +        throw new TableException("Could not find output field in input 
data type " +
    +          "or aggregate functions.")
    +      }
    +
    +      val allPartialAggregate = aggregates.map(_.supportPartial).forall(x 
=> x)
    +
    +      val intermediateRowArity = groupings.length +
    +                        aggregates.map(_.intermediateDataType.length).sum
    +
    +      val reduceGroupFunction =
    +        if (allPartialAggregate) {
    +          new AggregateReduceCombineFunction(
    +            aggregates,
    +            groupingOffsetMapping,
    +            aggOffsetMapping,
    +            intermediateRowArity,
    +            outputType.getFieldCount)
    +        }
    +        else {
    +          new AggregateReduceGroupFunction(
    +            aggregates,
    +            groupingOffsetMapping,
    +            aggOffsetMapping,
    +            intermediateRowArity,
    +            outputType.getFieldCount)
    +        }
    +
    +      (mapFunction, reduceGroupFunction)
    +  }
    +
    +  /**
    +    * Create Flink operator functions for Incremental aggregates.
    +    * It includes 2 implementations of Flink operator functions:
    +    * [[org.apache.flink.api.common.functions.MapFunction]] and
    +    * [[org.apache.flink.api.common.functions.ReduceFunction]]
    +    * The output of [[org.apache.flink.api.common.functions.MapFunction]] 
contains the
    +    * intermediate aggregate values of all aggregate function, it's stored 
in Row by the following
    +    * format:
    +    *
    +    * {{{
    +    *                   avg(x) aggOffsetInRow = 2          count(z) 
aggOffsetInRow = 5
    +    *                             |                          |
    +    *                             v                          v
    +    *        +---------+---------+--------+--------+--------+--------+
    +    *        |groupKey1|groupKey2|  sum1  | count1 |  sum2  | count2 |
    +    *        +---------+---------+--------+--------+--------+--------+
    +    *                                              ^
    +    *                                              |
    +    *                               sum(y) aggOffsetInRow = 4
    +    * }}}
    +    *
    +    */
    --- End diff --
    
    It would be better to describe the meaning of the return value, especially 
`Array[(Int, Int)],Array[(Int, Int)],Int`.


> Add incremental group window aggregation for streaming Table API
> ----------------------------------------------------------------
>
>                 Key: FLINK-4937
>                 URL: https://issues.apache.org/jira/browse/FLINK-4937
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>    Affects Versions: 1.2.0
>            Reporter: Fabian Hueske
>            Assignee: sunjincheng
>
> Group-window aggregates for streaming tables are currently not done in an 
> incremental fashion. This means that the window collects all records and 
> performs the aggregation when the window is closed instead of eagerly 
> updating a partial aggregate for every added record. Since records are 
> buffered, non-incremental aggregation requires more storage space than 
> incremental aggregation.
> The DataStream API which is used under the hood of the streaming Table API 
> features [incremental 
> aggregation|https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/windows.html#windowfunction-with-incremental-aggregation]
>  using a {{ReduceFunction}}.
> We should add support for incremental aggregation in group-windows.
> This is a follow-up task of FLINK-4691.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to