Repository: flink
Updated Branches:
  refs/heads/master 283f5efd5 -> 9be5cc42c


[FLINK-6240] [table] Add code generation for DataStream group window 
aggregations.

This closes #3694.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9be5cc42
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9be5cc42
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9be5cc42

Branch: refs/heads/master
Commit: 9be5cc42c02c258d3843c373b7350240c9570523
Parents: 283f5ef
Author: shaoxuan-wang <wshaox...@gmail.com>
Authored: Fri Apr 7 16:18:14 2017 +0800
Committer: Fabian Hueske <fhue...@apache.org>
Committed: Mon Apr 17 22:45:55 2017 +0200

----------------------------------------------------------------------
 .../flink/table/codegen/CodeGenerator.scala     | 82 +++++++++++++++++---
 .../nodes/datastream/DataStreamAggregate.scala  | 14 +++-
 .../aggregate/AggregateAggFunction.scala        | 74 ++++++++----------
 .../table/runtime/aggregate/AggregateUtil.scala | 19 ++++-
 .../aggregate/GeneratedAggregations.scala       |  9 +++
 .../scala/stream/table/AggregationsITCase.scala | 31 +++++---
 ...ProcessingOverRangeProcessFunctionTest.scala | 23 ++++--
 7 files changed, 175 insertions(+), 77 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9be5cc42/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
index ee6fced..c6e3c9a 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
@@ -266,7 +266,7 @@ class CodeGenerator(
      outputArity: Int)
   : GeneratedAggregationsFunction = {
 
-    def generateSetAggregationResults(
+    def genSetAggregationResults(
       accTypes: Array[String],
       aggs: Array[String],
       aggMapping: Array[Int]): String = {
@@ -293,7 +293,7 @@ class CodeGenerator(
          |  }""".stripMargin
     }
 
-    def generateAccumulate(
+    def genAccumulate(
      accTypes: Array[String],
      aggs: Array[String],
      parameters: Array[String]): String = {
@@ -317,7 +317,7 @@ class CodeGenerator(
          |  }""".stripMargin
     }
 
-    def generateRetract(
+    def genRetract(
       accTypes: Array[String],
       aggs: Array[String],
       parameters: Array[String]): String = {
@@ -341,7 +341,7 @@ class CodeGenerator(
          |  }""".stripMargin
     }
 
-    def generateCreateAccumulators(
+    def genCreateAccumulators(
         aggs: Array[String]): String = {
 
       val sig: String =
@@ -373,7 +373,7 @@ class CodeGenerator(
          |  }""".stripMargin
     }
 
-    def generateSetForwardedFields(
+    def genSetForwardedFields(
         forwardMapping: Array[(Int, Int)]): String = {
 
       val sig: String =
@@ -396,13 +396,68 @@ class CodeGenerator(
          |  }""".stripMargin
     }
 
-    def generateCreateOutputRow(outputArity: Int): String = {
+    def genCreateOutputRow(outputArity: Int): String = {
       j"""
          |  public org.apache.flink.types.Row createOutputRow() {
          |    return new org.apache.flink.types.Row($outputArity);
          |  }""".stripMargin
     }
 
+    def genMergeAccumulatorsPair(
+        accTypes: Array[String],
+        aggs: Array[String]): String = {
+
+      val sig: String =
+        j"""
+           |  public org.apache.flink.types.Row mergeAccumulatorsPair(
+           |    org.apache.flink.types.Row a,
+           |    org.apache.flink.types.Row b)
+           """.stripMargin
+      val merge: String = {
+        for (i <- aggs.indices) yield
+          j"""
+             |    ${accTypes(i)} aAcc$i = (${accTypes(i)}) a.getField($i);
+             |    ${accTypes(i)} bAcc$i = (${accTypes(i)}) b.getField($i);
+             |    accList$i.set(0, aAcc$i);
+             |    accList$i.set(1, bAcc$i);
+             |    a.setField(
+             |      $i,
+             |      ${aggs(i)}.merge(accList$i));
+             """.stripMargin
+      }.mkString("\n")
+      val ret: String =
+        j"""
+           |      return a;
+           """.stripMargin
+
+      j"""$sig {
+         |$merge
+         |$ret
+         |  }""".stripMargin
+    }
+
+    def genMergeList(accTypes: Array[String]): String = {
+      {
+        for (i <- accTypes.indices) yield
+          j"""
+             |    java.util.ArrayList<${accTypes(i)}> accList$i;
+             """.stripMargin
+      }.mkString("\n")
+    }
+
+    def initMergeList(
+        accTypes: Array[String],
+        aggs: Array[String]): String = {
+      {
+        for (i <- accTypes.indices) yield
+          j"""
+             |    accList$i = new java.util.ArrayList<${accTypes(i)}>(2);
+             |    accList$i.add(${aggs(i)}.createAccumulator());
+             |    accList$i.add(${aggs(i)}.createAccumulator());
+             """.stripMargin
+      }.mkString("\n")
+    }
+
     // get unique function name
     val funcName = newName(name)
     // register UDAGGs
@@ -428,19 +483,22 @@ class CodeGenerator(
          |  extends 
org.apache.flink.table.runtime.aggregate.GeneratedAggregations {
          |
          |  ${reuseMemberCode()}
+         |  ${genMergeList(accTypes)}
          |  public $funcName() throws Exception {
          |    ${reuseInitCode()}
+         |    ${initMergeList(accTypes, aggs)}
          |  }
          |  ${reuseConstructorCode(funcName)}
          |
          """.stripMargin
 
-    funcCode += generateSetAggregationResults(accTypes, aggs, aggMapping) + 
"\n"
-    funcCode += generateAccumulate(accTypes, aggs, parameters) + "\n"
-    funcCode += generateRetract(accTypes, aggs, parameters) + "\n"
-    funcCode += generateCreateAccumulators(aggs) + "\n"
-    funcCode += generateSetForwardedFields(fwdMapping) + "\n"
-    funcCode += generateCreateOutputRow(outputArity) + "\n"
+    funcCode += genSetAggregationResults(accTypes, aggs, aggMapping) + "\n"
+    funcCode += genAccumulate(accTypes, aggs, parameters) + "\n"
+    funcCode += genRetract(accTypes, aggs, parameters) + "\n"
+    funcCode += genCreateAccumulators(aggs) + "\n"
+    funcCode += genSetForwardedFields(fwdMapping) + "\n"
+    funcCode += genCreateOutputRow(outputArity) + "\n"
+    funcCode += genMergeAccumulatorsPair(accTypes, aggs) + "\n"
     funcCode += "}"
 
     GeneratedAggregationsFunction(funcName, funcCode)

http://git-wip-us.apache.org/repos/asf/flink/blob/9be5cc42/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamAggregate.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamAggregate.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamAggregate.scala
index 50f8281..62bcfc8 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamAggregate.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamAggregate.scala
@@ -30,6 +30,7 @@ import 
org.apache.flink.streaming.api.windowing.windows.{Window => DataStreamWin
 import org.apache.flink.table.api.StreamTableEnvironment
 import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
 import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.codegen.CodeGenerator
 import org.apache.flink.table.expressions._
 import org.apache.flink.table.plan.logical._
 import org.apache.flink.table.plan.nodes.CommonAggregate
@@ -117,6 +118,11 @@ class DataStreamAggregate(
       s"select: ($aggString)"
     val nonKeyedAggOpName = s"window: ($window), select: ($aggString)"
 
+    val generator = new CodeGenerator(
+      tableEnv.getConfig,
+      false,
+      inputDS.getType)
+
     // grouped / keyed aggregation
     if (groupingKeys.length > 0) {
       val windowFunction = AggregateUtil.createAggregationGroupWindowFunction(
@@ -133,10 +139,10 @@ class DataStreamAggregate(
 
       val (aggFunction, accumulatorRowType, aggResultRowType) =
         AggregateUtil.createDataStreamAggregateFunction(
+          generator,
           namedAggregates,
           inputType,
-          rowRelDataType,
-          grouping)
+          rowRelDataType)
 
       windowedStream
         .aggregate(aggFunction, windowFunction, accumulatorRowType, 
aggResultRowType, rowTypeInfo)
@@ -155,10 +161,10 @@ class DataStreamAggregate(
 
       val (aggFunction, accumulatorRowType, aggResultRowType) =
         AggregateUtil.createDataStreamAggregateFunction(
+          generator,
           namedAggregates,
           inputType,
-          rowRelDataType,
-          grouping)
+          rowRelDataType)
 
       windowedStream
         .aggregate(aggFunction, windowFunction, accumulatorRowType, 
aggResultRowType, rowTypeInfo)

http://git-wip-us.apache.org/repos/asf/flink/blob/9be5cc42/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala
index c608b97..377e0ff 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala
@@ -18,69 +18,61 @@
 
 package org.apache.flink.table.runtime.aggregate
 
-import java.util.{ArrayList => JArrayList, List => JList}
-import org.apache.flink.api.common.functions.{AggregateFunction => 
DataStreamAggFunc}
-import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+import org.apache.flink.api.common.functions.AggregateFunction
+import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction}
 import org.apache.flink.types.Row
+import org.slf4j.LoggerFactory
 
 /**
   * Aggregate Function used for the aggregate operator in
   * [[org.apache.flink.streaming.api.datastream.WindowedStream]]
   *
-  * @param aggregates       the list of all 
[[org.apache.flink.table.functions.AggregateFunction]]
-  *                         used for this aggregation
-  * @param aggFields   the position (in the input Row) of the input value for 
each aggregate
+  * @param genAggregations Generated aggregate helper function
   */
-class AggregateAggFunction(
-    private val aggregates: Array[AggregateFunction[_]],
-    private val aggFields: Array[Array[Int]])
-  extends DataStreamAggFunc[Row, Row, Row] {
+class AggregateAggFunction(genAggregations: GeneratedAggregationsFunction)
+  extends AggregateFunction[Row, Row, Row] with 
Compiler[GeneratedAggregations] {
+
+  val LOG = LoggerFactory.getLogger(this.getClass)
+  private var function: GeneratedAggregations = _
 
   override def createAccumulator(): Row = {
-    val accumulatorRow: Row = new Row(aggregates.length)
-    var i = 0
-    while (i < aggregates.length) {
-      accumulatorRow.setField(i, aggregates(i).createAccumulator())
-      i += 1
+    if (function == null) {
+      initFunction
     }
-    accumulatorRow
+    function.createAccumulators()
   }
 
   override def add(value: Row, accumulatorRow: Row): Unit = {
-
-    var i = 0
-    while (i < aggregates.length) {
-      val acc = accumulatorRow.getField(i).asInstanceOf[Accumulator]
-      val v = value.getField(aggFields(i)(0))
-      aggregates(i).accumulate(acc, v)
-      i += 1
+    if (function == null) {
+      initFunction
     }
+    function.accumulate(accumulatorRow, value)
   }
 
   override def getResult(accumulatorRow: Row): Row = {
-    val output = new Row(aggFields.length)
-
-    var i = 0
-    while (i < aggregates.length) {
-      val acc = accumulatorRow.getField(i).asInstanceOf[Accumulator]
-      output.setField(i, aggregates(i).getValue(acc))
-      i += 1
+    if (function == null) {
+      initFunction
     }
+    val output = function.createOutputRow()
+    function.setAggregationResults(accumulatorRow, output)
     output
   }
 
   override def merge(aAccumulatorRow: Row, bAccumulatorRow: Row): Row = {
-
-    var i = 0
-    while (i < aggregates.length) {
-      val aAcc = aAccumulatorRow.getField(i).asInstanceOf[Accumulator]
-      val bAcc = bAccumulatorRow.getField(i).asInstanceOf[Accumulator]
-      val accumulators: JList[Accumulator] = new JArrayList[Accumulator]()
-      accumulators.add(aAcc)
-      accumulators.add(bAcc)
-      aAccumulatorRow.setField(i, aggregates(i).merge(accumulators))
-      i += 1
+    if (function == null) {
+      initFunction
     }
-    aAccumulatorRow
+    function.mergeAccumulatorsPair(aAccumulatorRow, bAccumulatorRow)
+  }
+
+  def initFunction(): Unit = {
+    LOG.debug(s"Compiling AggregateHelper: $genAggregations.name \n\n " +
+                s"Code:\n$genAggregations.code")
+    val clazz = compile(
+      getClass.getClassLoader,
+      genAggregations.name,
+      genAggregations.code)
+    LOG.debug("Instantiating AggregateHelper.")
+    function = clazz.newInstance()
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9be5cc42/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
index 09d1a13..da57153 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
@@ -746,10 +746,10 @@ object AggregateUtil {
   }
 
   private[flink] def createDataStreamAggregateFunction(
+      generator: CodeGenerator,
       namedAggregates: Seq[CalcitePair[AggregateCall, String]],
       inputType: RelDataType,
-      outputType: RelDataType,
-      groupKeysIndex: Array[Int])
+      outputType: RelDataType)
     : (DataStreamAggFunction[Row, Row, Row], RowTypeInfo, RowTypeInfo) = {
 
     val (aggFields, aggregates) =
@@ -758,6 +758,19 @@ object AggregateUtil {
         inputType,
         needRetraction = false)
 
+    val aggMapping = aggregates.indices.toArray
+    val outputArity = aggregates.length
+
+    val genFunction = generator.generateAggregations(
+      "GroupingWindowAggregateHelper",
+      generator,
+      inputType,
+      aggregates,
+      aggFields,
+      aggMapping,
+      Array(),
+      outputArity)
+
     val aggregateMapping = getAggregateMapping(namedAggregates, outputType)
 
     if (aggregateMapping.length != namedAggregates.length) {
@@ -769,7 +782,7 @@ object AggregateUtil {
 
     val accumulatorRowType = createAccumulatorRowType(aggregates)
     val aggResultRowType = new RowTypeInfo(aggResultTypes: _*)
-    val aggFunction = new AggregateAggFunction(aggregates, aggFields)
+    val aggFunction = new AggregateAggFunction(genFunction)
 
     (aggFunction, accumulatorRowType, aggResultRowType)
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/9be5cc42/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GeneratedAggregations.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GeneratedAggregations.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GeneratedAggregations.scala
index e28f098..17a1128 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GeneratedAggregations.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GeneratedAggregations.scala
@@ -75,4 +75,13 @@ abstract class GeneratedAggregations extends Function {
     */
   def createOutputRow(): Row
 
+  /**
+    * Merges two rows of accumulators into one row
+    *
+    * @param a First row of accumulators
+    * @param b The other row of accumulators
+    * @return A row with the merged accumulators of both input rows.
+    */
+  def mergeAccumulatorsPair(a: Row, b: Row): Row
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9be5cc42/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/AggregationsITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/AggregationsITCase.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/AggregationsITCase.scala
index 3e7b66b..9f366a8 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/AggregationsITCase.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/AggregationsITCase.scala
@@ -26,7 +26,7 @@ import org.apache.flink.streaming.api.watermark.Watermark
 import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
 import org.apache.flink.table.api.TableEnvironment
 import org.apache.flink.table.api.scala._
-import 
org.apache.flink.table.api.scala.stream.table.AggregationsITCase.TimestampWithEqualWatermark
+import 
org.apache.flink.table.api.scala.stream.table.AggregationsITCase.TimestampAndWatermarkWithOffset
 import org.apache.flink.table.api.scala.stream.utils.StreamITCase
 import org.apache.flink.types.Row
 import org.junit.Assert._
@@ -72,26 +72,38 @@ class AggregationsITCase extends 
StreamingMultipleProgramsTestBase {
 
   @Test
   def testEventTimeSessionGroupWindowOverTime(): Unit = {
+    //To verify the "merge" functionality, we create this test with the 
following characteristics:
+    // 1. set the Parallelism to 1, and have the test data out of order
+    // 2. create a waterMark with 10ms offset to delay the window emission by 
10ms
+    val sessionWindowTestdata = List(
+      (1L, 1, "Hello"),
+      (2L, 2, "Hello"),
+      (8L, 8, "Hello"),
+      (9L, 9, "Hello World"),
+      (4L, 4, "Hello"),
+      (16L, 16, "Hello"))
+
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    env.setParallelism(1)
     val tEnv = TableEnvironment.getTableEnvironment(env)
     StreamITCase.testResults = mutable.MutableList()
 
     val stream = env
-      .fromCollection(data)
-      .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark())
+      .fromCollection(sessionWindowTestdata)
+      .assignTimestampsAndWatermarks(new TimestampAndWatermarkWithOffset(10L))
     val table = stream.toTable(tEnv, 'long, 'int, 'string)
 
     val windowedTable = table
-      .window(Session withGap 7.milli on 'rowtime as 'w)
+      .window(Session withGap 5.milli on 'rowtime as 'w)
       .groupBy('w, 'string)
-      .select('string, 'int.count)
+      .select('string, 'int.count, 'int.sum)
 
     val results = windowedTable.toDataStream[Row]
     results.addSink(new StreamITCase.StringSink)
     env.execute()
 
-    val expected = Seq("Hello world,1", "Hello world,1", "Hello,2", "Hi,1")
+    val expected = Seq("Hello World,1,9", "Hello,1,16", "Hello,4,15")
     assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
 
@@ -127,7 +139,7 @@ class AggregationsITCase extends 
StreamingMultipleProgramsTestBase {
 
     val stream = env
       .fromCollection(data)
-      .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark())
+      .assignTimestampsAndWatermarks(new TimestampAndWatermarkWithOffset(0L))
     val table = stream.toTable(tEnv, 'long, 'int, 'string)
 
     val windowedTable = table
@@ -149,13 +161,14 @@ class AggregationsITCase extends 
StreamingMultipleProgramsTestBase {
 }
 
 object AggregationsITCase {
-  class TimestampWithEqualWatermark extends 
AssignerWithPunctuatedWatermarks[(Long, Int, String)] {
+  class TimestampAndWatermarkWithOffset(
+    offset: Long) extends AssignerWithPunctuatedWatermarks[(Long, Int, 
String)] {
 
     override def checkAndGetNextWatermark(
         lastElement: (Long, Int, String),
         extractedTimestamp: Long)
       : Watermark = {
-      new Watermark(extractedTimestamp)
+      new Watermark(extractedTimestamp - offset)
     }
 
     override def extractTimestamp(

http://git-wip-us.apache.org/repos/asf/flink/blob/9be5cc42/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRangeProcessFunctionTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRangeProcessFunctionTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRangeProcessFunctionTest.scala
index 3610898..adc84bf 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRangeProcessFunctionTest.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRangeProcessFunctionTest.scala
@@ -59,16 +59,16 @@ class BoundedProcessingOverRangeProcessFunctionTest {
         |public class BoundedOverAggregateHelper$33
         |  extends 
org.apache.flink.table.runtime.aggregate.GeneratedAggregations {
         |
-        |transient 
org.apache.flink.table.functions.aggfunctions.LongMinWithRetractAggFunction
-        |  fmin = null;
+        |  transient 
org.apache.flink.table.functions.aggfunctions.LongMinWithRetractAggFunction
+        |    fmin = null;
         |
-        |transient 
org.apache.flink.table.functions.aggfunctions.LongMaxWithRetractAggFunction
-        |  fmax = null;
+        |  transient 
org.apache.flink.table.functions.aggfunctions.LongMaxWithRetractAggFunction
+        |    fmax = null;
         |
         |  public BoundedOverAggregateHelper$33() throws Exception {
         |
-        |  fmin = 
(org.apache.flink.table.functions.aggfunctions.LongMinWithRetractAggFunction)
-        |  org.apache.flink.table.functions.utils.UserDefinedFunctionUtils
+        |    fmin = 
(org.apache.flink.table.functions.aggfunctions.LongMinWithRetractAggFunction)
+        |    org.apache.flink.table.functions.utils.UserDefinedFunctionUtils
         |    
.deserialize("rO0ABXNyAEtvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLmZ1bmN0aW9ucy5hZ2dmdW5jdGlvbn"
 +
         |    
"MuTG9uZ01pbldpdGhSZXRyYWN0QWdnRnVuY3Rpb26oIdX_DaMPxQIAAHhyAEdvcmcuYXBhY2hlLmZsaW5rL"
 +
         |    
"nRhYmxlLmZ1bmN0aW9ucy5hZ2dmdW5jdGlvbnMuTWluV2l0aFJldHJhY3RBZ2dGdW5jdGlvbkDcXxs1apkP"
 +
@@ -77,8 +77,8 @@ class BoundedProcessingOverRangeProcessFunctionTest {
         |    
"5mdW5jdGlvbnMuVXNlckRlZmluZWRGdW5jdGlvbi0B91QxuAyTAgAAeHBzcgAZc2NhbGEubWF0aC5PcmRlc"
 +
         |    "mluZyRMb25nJOda0iCPo2ukAgAAeHA");
         |
-        |  fmax = 
(org.apache.flink.table.functions.aggfunctions.LongMaxWithRetractAggFunction)
-        |  org.apache.flink.table.functions.utils.UserDefinedFunctionUtils
+        |    fmax = 
(org.apache.flink.table.functions.aggfunctions.LongMaxWithRetractAggFunction)
+        |    org.apache.flink.table.functions.utils.UserDefinedFunctionUtils
         |    
.deserialize("rO0ABXNyAEtvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLmZ1bmN0aW9ucy5hZ2dmdW5jdGlvbn"
 +
         |    
"MuTG9uZ01heFdpdGhSZXRyYWN0QWdnRnVuY3Rpb25RmsI8azNGXwIAAHhyAEdvcmcuYXBhY2hlLmZsaW5rL"
 +
         |    
"nRhYmxlLmZ1bmN0aW9ucy5hZ2dmdW5jdGlvbnMuTWF4V2l0aFJldHJhY3RBZ2dGdW5jdGlvbu4_w_gPePlO"
 +
@@ -158,6 +158,13 @@ class BoundedProcessingOverRangeProcessFunctionTest {
         |  public org.apache.flink.types.Row createOutputRow() {
         |    return new org.apache.flink.types.Row(7);
         |  }
+        |
+        |  //The test won't use this method
+        |  public org.apache.flink.types.Row mergeAccumulatorsPair(
+        |    org.apache.flink.types.Row a,
+        |    org.apache.flink.types.Row b) {
+        |    return null;
+        |  }
         |}
       """.stripMargin
 

Reply via email to