GitHub user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/3735#discussion_r112002408
--- Diff:
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideTimeWindowAggReduceGroupFunction.scala
---
@@ -38,74 +38,58 @@ import org.apache.flink.util.{Collector, Preconditions}
* it does no final aggregate evaluation. It also includes the logic of
* [[DataSetSlideTimeWindowAggFlatMapFunction]].
*
- * @param aggregates aggregate functions
- * @param groupingKeysLength number of grouping keys
- * @param timeFieldPos position of aligned time field
+ * @param genAggregations Code-generated [[GeneratedAggregations]]
+ * @param keysAndAggregatesArity The total arity of keys and aggregates
* @param windowSize window size of the sliding window
* @param windowSlide window slide of the sliding window
* @param returnType return type of this function
*/
class DataSetSlideTimeWindowAggReduceGroupFunction(
- private val aggregates: Array[AggregateFunction[_ <: Any]],
- private val groupingKeysLength: Int,
- private val timeFieldPos: Int,
+ private val genAggregations: GeneratedAggregationsFunction,
+ private val keysAndAggregatesArity: Int,
private val windowSize: Long,
private val windowSlide: Long,
@transient private val returnType: TypeInformation[Row])
extends RichGroupReduceFunction[Row, Row]
with CombineFunction[Row, Row]
- with ResultTypeQueryable[Row] {
+ with ResultTypeQueryable[Row]
+ with Compiler[GeneratedAggregations] {
- Preconditions.checkNotNull(aggregates)
+ private val timeFieldPos = returnType.getArity - 1
+ private val intermediateWindowStartPos = keysAndAggregatesArity
protected var intermediateRow: Row = _
- // add one field to store window start
- protected val intermediateRowArity: Int = groupingKeysLength +
aggregates.length + 1
- protected val accumulatorList: Array[JArrayList[Accumulator]] =
Array.fill(aggregates.length) {
- new JArrayList[Accumulator](2)
- }
- private val intermediateWindowStartPos: Int = intermediateRowArity - 1
+ private var accumulators: Row = _
+
+ val LOG = LoggerFactory.getLogger(this.getClass)
+ private var function: GeneratedAggregations = _
override def open(config: Configuration) {
- intermediateRow = new Row(intermediateRowArity)
-
- // init lists with two empty accumulators
- var i = 0
- while (i < aggregates.length) {
- val accumulator = aggregates(i).createAccumulator()
- accumulatorList(i).add(accumulator)
- accumulatorList(i).add(accumulator)
- i += 1
- }
+ LOG.debug(s"Compiling AggregateHelper: $genAggregations.name \n\n " +
+ s"Code:\n$genAggregations.code")
+ val clazz = compile(
+ getClass.getClassLoader,
+ genAggregations.name,
+ genAggregations.code)
+ LOG.debug("Instantiating AggregateHelper.")
+ function = clazz.newInstance()
+
+ accumulators = function.createAccumulators()
+ intermediateRow = function.createOutputRow()
}
override def reduce(records: Iterable[Row], out: Collector[Row]): Unit =
{
// reset first accumulator
- var i = 0
- while (i < aggregates.length) {
- val accumulator = aggregates(i).createAccumulator()
- accumulatorList(i).set(0, accumulator)
- i += 1
- }
+ function.resetAccumulator(accumulators)
val iterator = records.iterator()
while (iterator.hasNext) {
val record = iterator.next()
// accumulate
- i = 0
- while (i < aggregates.length) {
- // insert received accumulator into acc list
- val newAcc = record.getField(groupingKeysLength +
i).asInstanceOf[Accumulator]
- accumulatorList(i).set(1, newAcc)
- // merge acc list
- val retAcc = aggregates(i).merge(accumulatorList(i))
- // insert result into acc list
- accumulatorList(i).set(0, retAcc)
- i += 1
- }
+ function.mergeAccumulatorsPairWithKeyOffset(accumulators, record)
// trigger tumbling evaluation
if (!iterator.hasNext) {
--- End diff --
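Please move this check behind (i.e., after) the `while` loop: once the loop has finished, `iterator.hasNext` is false, so the `if (!iterator.hasNext)` condition is no longer needed.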
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes to have it, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---