GitHub user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/3735#discussion_r112007816
--- Diff:
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GeneratedAggregations.scala
---
@@ -36,6 +36,15 @@ abstract class GeneratedAggregations extends Function {
def setAggregationResults(accumulators: Row, output: Row)
/**
+ * Calculates the results from accumulators, and set the results to the output (with key offset)
+ *
+ * @param accumulators the accumulators (saved in a row) which contains the current
+ * aggregated results
+ * @param output output results collected in a row
+ */
+ def setAggregationResultsWithKeyOffset(accumulators: Row, output: Row)
--- End diff --
Actually, I'm not sure if we really need to implement a different code
generation function. I had a look at the code generation code and think that we
could just add a few more parameters to the current code gen method. Right now,
the behavior of most generated methods can be exactly defined:
- `createAccumulators()`: generates a `Row` with the accumulators for each
provided `AggregateFunction`. Several methods of `GeneratedAggregations` expect
a Row of accumulators with exactly this layout as one of their input
parameters. In the following, this parameter is called `accs`.
- `accumulate(accs, row)`: The `aggFields` parameter controls which fields
of `row` are accumulated into which accumulator. We should rename this
parameter to `accFields` though, IMO.
- `retract(accs, row)`: same as for `accumulate`. We should add a separate
parameter `retractFields: Array[Int]` though.
- `setForwardedFields(input, output)`: The `fwdMapping` parameter controls
which field of the input row is copied to which position of the output row. We
could add an optional parameter to copy the `groupSetMapping` to the output as
well.
- `setAggregationResults(accs, output)`: The `aggMapping` parameter
controls to which output fields the aggregation results are copied. If we add
another parameter `partialResults: Boolean`, we can control whether to copy
final results (`AggregateFunction.getValue()`) or partial results (the
accumulator).
- `createOutputRow()`: the `outputArity` parameter specifies the arity of
the output row.
- `mergeAccumulatorsPair(accs, other)`: **This is the only inflexible
method**. We could change the behavior of the method as follows: The method
expects as first parameter (`accs`) a Row with the same layout as generated by
`createAccumulators`. The second parameter can be any row with accumulators at
arbitrary positions. To enable the merging, we add a parameter `mergeMapping:
Array[Int]` to the code generating function which defines which fields of the
`other` parameter are merged with the fields in the `accs` Row. The method
returns a Row with the default layout (as generated by `createAccumulators()`).
- `resetAccumulator(accs)`: resets a Row of accumulators of the known
layout.
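To make the `mergeMapping` proposal more concrete, here is a rough sketch of the behavior the generated `mergeAccumulatorsPair` could have. It uses plain Scala stand-ins rather than Flink classes: `SumAcc`, the `Row` alias for `Array[Any]`, and passing `mergeMapping` as a runtime argument are all assumptions for illustration. In the actual proposal the mapping would be a parameter of the code generating function and baked into the generated method.

```scala
// Hypothetical accumulator; stands in for whatever an AggregateFunction uses.
final case class SumAcc(var sum: Long) {
  def merge(other: SumAcc): Unit = sum += other.sum
}

object MergeMappingSketch {
  // Stand-in for a Row; not Flink's Row class.
  type Row = Array[Any]

  // `accs` has the default layout produced by createAccumulators():
  // one accumulator per position. `other` may hold accumulators at
  // arbitrary positions. mergeMapping(i) is the position in `other`
  // whose accumulator is merged into accs(i).
  def mergeAccumulatorsPair(accs: Row, other: Row, mergeMapping: Array[Int]): Row = {
    var i = 0
    while (i < mergeMapping.length) {
      val target = accs(i).asInstanceOf[SumAcc]
      val source = other(mergeMapping(i)).asInstanceOf[SumAcc]
      target.merge(source)
      i += 1
    }
    // The result keeps the default accumulator layout.
    accs
  }
}
```

With `mergeMapping = Array(1, 3)`, a row such as `(key, acc, someField, acc)` can be merged into a default-layout row of two accumulators without first copying its accumulators into a separate row.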
I haven't checked this thoroughly, but I think with these parameters, we
can control the generated code sufficiently to support all aggregation
operators for DataSet and DataStream, i.e., we can generate the currently
existing functions such that they behave as the more specialized ones that you
added. Since all code gen parameters (`accFields`, `retractFields`,
`fwdMapping`, `groupSetMapping`, `aggMapping`, `partialResults`, `outputArity`,
`mergeMapping`) can be independently set for each type of operator, this should
give us the flexibility for all types of operators. We only need to
parameterize the code generation method appropriately.
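As an illustration of setting the parameters per operator, the following sketch shows how `aggMapping` and `partialResults` could steer `setAggregationResults`. Everything here is a simplified assumption: `CountAcc`, `ResultParams`, and the `Array[Any]` row stand-in are hypothetical, and the parameters are evaluated at runtime instead of being compiled into generated code.

```scala
// Hypothetical accumulator with a getValue method, for illustration only.
final case class CountAcc(var count: Long) {
  def getValue: Long = count
}

// Only the two parameters relevant to setAggregationResults are modeled.
final case class ResultParams(aggMapping: Array[Int], partialResults: Boolean)

object ResultParamsSketch {
  // Stand-in for a Row; not Flink's Row class.
  type Row = Array[Any]

  // Copies the result of accumulator i to output position aggMapping(i).
  // With partialResults = true the accumulator itself is copied,
  // otherwise its final value.
  def setAggregationResults(accs: Row, output: Row, params: ResultParams): Unit = {
    var i = 0
    while (i < params.aggMapping.length) {
      val acc = accs(i).asInstanceOf[CountAcc]
      output(params.aggMapping(i)) =
        if (params.partialResults) acc else acc.getValue
      i += 1
    }
  }
}
```

A pre-aggregating operator would pass `partialResults = true` so that the accumulators are forwarded, while the final operator would pass `false` and emit the values. Both would use the same generated method shape.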
In addition, we could make all parameters `Option` and generate empty
methods if the parameters for a function are not set. (This could also be a
follow up issue, IMO)
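A minimal sketch of the `Option` idea, assuming a hypothetical helper `genAccumulate` that returns the source of one generated method as a string. The emitted Java text (`acc0.accumulate(...)`, `input.getField(...)`) is only illustrative and is not what the current code generator produces.

```scala
object OptionalCodeGenSketch {
  // Returns source text for an accumulate method. If accFields is None,
  // the method is still generated but its body is empty.
  def genAccumulate(accFields: Option[Array[Int]]): String = {
    val body = accFields match {
      case Some(fields) =>
        // One statement per accumulator: accumulator i reads input field `field`.
        fields.zipWithIndex.map { case (field, i) =>
          s"    acc$i.accumulate(input.getField($field));"
        }.mkString("\n")
      case None => ""
    }
    s"public void accumulate(Row accs, Row input) {\n$body\n}"
  }
}
```

An operator that never accumulates would pass `None` and get a no-op method, so the generated class still implements the full `GeneratedAggregations` interface.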
What do you think @shaoxuan-wang ?