Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3735#discussion_r112002707
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceGroupFunction.scala ---
    @@ -18,111 +18,77 @@
     package org.apache.flink.table.runtime.aggregate
     
     import java.lang.Iterable
    -import java.util.{ArrayList => JArrayList}
     
     import org.apache.flink.api.common.functions.RichGroupReduceFunction
     import org.apache.flink.configuration.Configuration
    -import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
    +import org.apache.flink.table.codegen.{Compiler, 
GeneratedAggregationsFunction}
     import org.apache.flink.types.Row
    -import org.apache.flink.util.{Collector, Preconditions}
    +import org.apache.flink.util.Collector
    +import org.slf4j.LoggerFactory
     
     /**
       * It wraps the aggregate logic inside of
       * [[org.apache.flink.api.java.operators.GroupReduceOperator]].
       *
       * It is used for sliding on batch for both time and count-windows.
       *
    -  * @param aggregates aggregate functions.
    -  * @param groupKeysMapping index mapping of group keys between 
intermediate aggregate Row
    -  *                         and output Row.
    -  * @param aggregateMapping index mapping between aggregate function list 
and aggregated value
    -  *                         index in output Row.
    -  * @param finalRowArity output row field count
    +  * @param genAggregations Code-generated [[GeneratedAggregations]]
    +  * @param keysAndAggregatesArity The total arity of keys and aggregates
       * @param finalRowWindowStartPos relative window-start position to last 
field of output row
       * @param finalRowWindowEndPos relative window-end position to last field 
of output row
       * @param windowSize size of the window, used to determine window-end for 
output row
       */
     class DataSetSlideWindowAggReduceGroupFunction(
    -    aggregates: Array[AggregateFunction[_ <: Any]],
    -    groupKeysMapping: Array[(Int, Int)],
    -    aggregateMapping: Array[(Int, Int)],
    -    finalRowArity: Int,
    +    genAggregations: GeneratedAggregationsFunction,
    +    keysAndAggregatesArity: Int,
         finalRowWindowStartPos: Option[Int],
         finalRowWindowEndPos: Option[Int],
         windowSize: Long)
    -  extends RichGroupReduceFunction[Row, Row] {
    -
    -  Preconditions.checkNotNull(aggregates)
    -  Preconditions.checkNotNull(groupKeysMapping)
    +  extends RichGroupReduceFunction[Row, Row]
    +    with Compiler[GeneratedAggregations] {
     
       private var collector: TimeWindowPropertyCollector = _
    +  protected val windowStartPos: Int = keysAndAggregatesArity
    +
       private var output: Row = _
    -  private val accumulatorStartPos: Int = groupKeysMapping.length
    -  protected val windowStartPos: Int = accumulatorStartPos + 
aggregates.length
    +  protected var accumulators: Row = _
     
    -  val accumulatorList: Array[JArrayList[Accumulator]] = 
Array.fill(aggregates.length) {
    -    new JArrayList[Accumulator](2)
    -  }
    +  val LOG = LoggerFactory.getLogger(this.getClass)
    +  protected var function: GeneratedAggregations = _
     
       override def open(config: Configuration) {
    -    output = new Row(finalRowArity)
    +    LOG.debug(s"Compiling AggregateHelper: $genAggregations.name \n\n " +
    +                s"Code:\n$genAggregations.code")
    +    val clazz = compile(
    +      getClass.getClassLoader,
    +      genAggregations.name,
    +      genAggregations.code)
    +    LOG.debug("Instantiating AggregateHelper.")
    +    function = clazz.newInstance()
    +
    +    output = function.createOutputRow()
    +    accumulators = function.createAccumulators()
         collector = new TimeWindowPropertyCollector(finalRowWindowStartPos, 
finalRowWindowEndPos)
    -
    -    // init lists with two empty accumulators
    -    var i = 0
    -    while (i < aggregates.length) {
    -      val accumulator = aggregates(i).createAccumulator()
    -      accumulatorList(i).add(accumulator)
    -      accumulatorList(i).add(accumulator)
    -      i += 1
    -    }
       }
     
       override def reduce(records: Iterable[Row], out: Collector[Row]): Unit = 
{
     
    -    // reset first accumulator
    -    var i = 0
    -    while (i < aggregates.length) {
    -      aggregates(i).resetAccumulator(accumulatorList(i).get(0))
    -      i += 1
    -    }
    +    // reset accumulator
    +    function.resetAccumulator(accumulators)
     
         val iterator = records.iterator()
         while (iterator.hasNext) {
           val record = iterator.next()
     
    -      // accumulate
    -      i = 0
    -      while (i < aggregates.length) {
    -        // insert received accumulator into acc list
    -        val newAcc = record.getField(accumulatorStartPos + 
i).asInstanceOf[Accumulator]
    -        accumulatorList(i).set(1, newAcc)
    -        // merge acc list
    -        val retAcc = aggregates(i).merge(accumulatorList(i))
    -        // insert result into acc list
    -        accumulatorList(i).set(0, retAcc)
    -        i += 1
    -      }
    +      function.mergeAccumulatorsPairWithKeyOffset(accumulators, record)
     
           // check if this record is the last record
           if (!iterator.hasNext) {
    --- End diff --
    
    Please move this last-record check behind the loop, i.e. after the `while` loop has finished.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to