Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3735#discussion_r112007816
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GeneratedAggregations.scala ---
    @@ -36,6 +36,15 @@ abstract class GeneratedAggregations extends Function {
       def setAggregationResults(accumulators: Row, output: Row)
     
       /**
    +    * Calculates the results from accumulators, and set the results to the output (with key offset)
    +    *
    +    * @param accumulators the accumulators (saved in a row) which contains the current
    +    *                     aggregated results
    +    * @param output       output results collected in a row
    +    */
    +  def setAggregationResultsWithKeyOffset(accumulators: Row, output: Row)
    --- End diff --
    
    Actually, I'm not sure if we really need to implement a different code 
generation function. I had a look at the code generation code and think that we 
could just add a few more parameters to the current code gen method. Right now, 
the behavior of most generated methods can be exactly defined:
    
    - `createAccumulators()`: generates a `Row` with the accumulators for each 
provided `AggregateFunction`. Some methods of `GeneratedAggregations` expect 
a Row of accumulators with exactly this layout as one of their input 
parameters. In the following, this parameter is called `accs`.
    - `accumulate(accs, row)`: The `aggFields` parameter controls which fields 
of `row` are accumulated into which accumulator. We should rename this 
parameter to `accFields` though, IMO.
    - `retract(accs, row)`: same as for `accumulate`. We should add a separate 
parameter `retractFields: Array[Int]` though.
    - `setForwardedFields(input, output)`: The `fwdMapping` parameter controls 
which field of the input row is copied to which position of the output row. We 
could add an optional parameter to copy the `groupSetMapping` to the output as 
well.
    - `setAggregationResults(accs, output)`: The `aggMapping` parameter 
controls to which output fields the aggregation results are copied. If we add 
another parameter `partialResults: Boolean`, we can control whether to copy 
final results (`AggregateFunction.getValue()`) or partial results (the 
accumulator).
    - `createOutputRow()`: the `outputArity` parameter specifies the arity of 
the output row.
    - `mergeAccumulatorsPair(accs, other)`: **This is the only inflexible 
method**. We could change the behavior of the method as follows: The method 
expects as first parameter (`accs`) a Row with the same layout as generated by 
`createAccumulators`. The second parameter can be any row with accumulators at 
arbitrary positions. To enable the merging, we add a parameter `mergeMapping: 
Array[Int]` to the code generating function which defines which fields of the 
`other` parameter are merged with the fields in the `accs` Row. The method 
returns a Row with the default layout (as generated by `createAccumulators()`).
    - `resetAccumulator(accs)`: resets a Row of accumulators of the known 
layout.
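    
    To make the proposed `mergeMapping` semantics concrete, here is a minimal sketch in plain Scala. It does not use Flink classes: `Row` is modeled as `Array[Any]`, and `SumAcc` is a hypothetical accumulator invented for this illustration. It assumes that `mergeMapping(i)` names the position in `other` that is merged into accumulator `i` of `accs`.

```scala
object MergeMappingSketch {
  // Hypothetical stand-in for Flink's Row; an assumption made for this sketch.
  type Row = Array[Any]

  // Hypothetical accumulator that keeps a running sum.
  final case class SumAcc(var sum: Long) {
    def merge(other: SumAcc): Unit = sum += other.sum
  }

  // `accs` has the default layout (accumulator i at position i).
  // `other` may hold its accumulators at arbitrary positions;
  // mergeMapping(i) is the position in `other` merged into accs(i).
  def mergeAccumulatorsPair(accs: Row, other: Row, mergeMapping: Array[Int]): Row = {
    var i = 0
    while (i < mergeMapping.length) {
      val target = accs(i).asInstanceOf[SumAcc]
      val source = other(mergeMapping(i)).asInstanceOf[SumAcc]
      target.merge(source)
      i += 1
    }
    accs // result keeps the default layout
  }
}
```

    For example, with `accs = Array(SumAcc(1), SumAcc(10))`, `other = Array("key", SumAcc(2), SumAcc(20))` and `mergeMapping = Array(1, 2)`, the returned row holds `SumAcc(3)` and `SumAcc(30)` at positions 0 and 1.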
    
    I haven't checked this thoroughly, but I think with these parameters, we 
can control the generated code sufficiently to support all aggregation 
operators for DataSet and DataStream, i.e., we can generate the currently 
existing functions such that they behave as the more specialized ones that you 
added. Since all code gen parameters (`accFields`, `retractFields`, 
`fwdMapping`, `groupSetMapping`, `aggMapping`, `partialResults`, `outputArity`, 
`mergeMapping`) can be independently set for each type of operator, this should 
give us the flexibility needed for all types of operators. We only need to 
parameterize the code generation method appropriately. 
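    
    As an illustration of how parameterization could replace a specialized method, the sketch below shows a single `setAggregationResults` whose behavior is driven by `aggMapping` and `partialResults`. Shifting `aggMapping` by the number of key fields would give the effect of `setAggregationResultsWithKeyOffset`. This is plain Scala, not generated code: `Row` as `Array[Any]` and the `SumAcc` accumulator are assumptions made for the example.

```scala
object KeyOffsetSketch {
  // Hypothetical stand-in for Flink's Row; an assumption made for this sketch.
  type Row = Array[Any]

  // Hypothetical accumulator; getValue plays the role of AggregateFunction.getValue().
  final case class SumAcc(sum: Long) {
    def getValue: Long = sum
  }

  // aggMapping(i) is the output position for aggregate i.
  // partialResults = true copies the accumulator itself,
  // partialResults = false copies the final value.
  def setAggregationResults(
      accs: Row,
      output: Row,
      aggMapping: Array[Int],
      partialResults: Boolean): Unit = {
    for (i <- aggMapping.indices) {
      val acc = accs(i).asInstanceOf[SumAcc]
      output(aggMapping(i)) = if (partialResults) acc else acc.getValue
    }
  }
}
```

    With one key field at output position 0, passing `aggMapping = Array(1, 2)` writes the two results behind the key, so no separate key-offset method is required in this model.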
    
    In addition, we could make all parameters `Option` and generate empty 
methods if the parameters for a function are not set. (This could also be a 
follow-up issue, IMO.)
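    
    A rough sketch of the `Option` idea, assuming a string-based generator: when `retractFields` is `None`, the generated method has an empty body. The `genRetract` helper and the emitted Java-like text are hypothetical and only illustrate the shape; they are not the existing code generator.

```scala
object OptionalCodeGenSketch {
  // Emits Java-like source for a hypothetical `retract` method.
  // Some(fields): one placeholder statement per (input field, accumulator) pair.
  // None: an empty method body.
  def genRetract(retractFields: Option[Array[Int]]): String = {
    val body = retractFields match {
      case Some(fields) =>
        fields.zipWithIndex
          .map { case (field, acc) =>
            s"  // retract row.getField($field) from accs.getField($acc)\n"
          }
          .mkString
      case None => ""
    }
    "public void retract(Row accs, Row row) {\n" + body + "}"
  }
}
```

    An operator that never retracts would then pass `None` and get a no-op method, while the other generated methods stay unchanged.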
    
    What do you think @shaoxuan-wang ?



