Repository: flink Updated Branches: refs/heads/master 283f5efd5 -> 9be5cc42c
[FLINK-6240] [table] Add code generation for DataStream group window aggregations. This closes #3694. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9be5cc42 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9be5cc42 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9be5cc42 Branch: refs/heads/master Commit: 9be5cc42c02c258d3843c373b7350240c9570523 Parents: 283f5ef Author: shaoxuan-wang <wshaox...@gmail.com> Authored: Fri Apr 7 16:18:14 2017 +0800 Committer: Fabian Hueske <fhue...@apache.org> Committed: Mon Apr 17 22:45:55 2017 +0200 ---------------------------------------------------------------------- .../flink/table/codegen/CodeGenerator.scala | 82 +++++++++++++++++--- .../nodes/datastream/DataStreamAggregate.scala | 14 +++- .../aggregate/AggregateAggFunction.scala | 74 ++++++++---------- .../table/runtime/aggregate/AggregateUtil.scala | 19 ++++- .../aggregate/GeneratedAggregations.scala | 9 +++ .../scala/stream/table/AggregationsITCase.scala | 31 +++++--- ...ProcessingOverRangeProcessFunctionTest.scala | 23 ++++-- 7 files changed, 175 insertions(+), 77 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/9be5cc42/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala index ee6fced..c6e3c9a 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala @@ -266,7 +266,7 @@ class CodeGenerator( outputArity: Int) : GeneratedAggregationsFunction = { - def generateSetAggregationResults( + def genSetAggregationResults( accTypes: Array[String], aggs: Array[String], aggMapping: Array[Int]): String = { @@ -293,7 +293,7 @@ class CodeGenerator( | }""".stripMargin } - def generateAccumulate( + def genAccumulate( accTypes: Array[String], aggs: Array[String], parameters: Array[String]): String = { @@ -317,7 +317,7 @@ class CodeGenerator( | }""".stripMargin } - def generateRetract( + def genRetract( accTypes: Array[String], aggs: Array[String], parameters: Array[String]): String = { @@ -341,7 +341,7 @@ class CodeGenerator( | }""".stripMargin } - def generateCreateAccumulators( + def genCreateAccumulators( aggs: Array[String]): String = { val sig: String = @@ -373,7 +373,7 @@ class CodeGenerator( | }""".stripMargin } - def generateSetForwardedFields( + def genSetForwardedFields( forwardMapping: Array[(Int, Int)]): String = { val sig: String = @@ -396,13 +396,68 @@ class CodeGenerator( | }""".stripMargin } - def generateCreateOutputRow(outputArity: Int): String = { + def genCreateOutputRow(outputArity: Int): String = { j""" | public org.apache.flink.types.Row createOutputRow() { | return new org.apache.flink.types.Row($outputArity); | }""".stripMargin } + def genMergeAccumulatorsPair( + accTypes: Array[String], + aggs: Array[String]): String = { + + val sig: String = + j""" + | public org.apache.flink.types.Row mergeAccumulatorsPair( + | org.apache.flink.types.Row a, + | org.apache.flink.types.Row b) + """.stripMargin + val merge: String = { + for (i <- aggs.indices) yield + j""" + | ${accTypes(i)} aAcc$i = (${accTypes(i)}) a.getField($i); + | ${accTypes(i)} bAcc$i = (${accTypes(i)}) b.getField($i); + | accList$i.set(0, aAcc$i); + | accList$i.set(1, bAcc$i); + | a.setField( + | $i, + | ${aggs(i)}.merge(accList$i)); + """.stripMargin + }.mkString("\n") + val ret: String = + j""" + | return a; + """.stripMargin + + j"""$sig { + |$merge + |$ret + | }""".stripMargin + } + + def genMergeList(accTypes: Array[String]): String = { + { + for (i <- accTypes.indices) yield + j""" + | java.util.ArrayList<${accTypes(i)}> accList$i; + """.stripMargin + }.mkString("\n") + } + + def initMergeList( + accTypes: Array[String], + aggs: Array[String]): String = { + { + for (i <- accTypes.indices) yield + j""" + | accList$i = new java.util.ArrayList<${accTypes(i)}>(2); + | accList$i.add(${aggs(i)}.createAccumulator()); + | accList$i.add(${aggs(i)}.createAccumulator()); + """.stripMargin + }.mkString("\n") + } + // get unique function name val funcName = newName(name) // register UDAGGs @@ -428,19 +483,22 @@ class CodeGenerator( | extends org.apache.flink.table.runtime.aggregate.GeneratedAggregations { | | ${reuseMemberCode()} + | ${genMergeList(accTypes)} | public $funcName() throws Exception { | ${reuseInitCode()} + | ${initMergeList(accTypes, aggs)} | } | ${reuseConstructorCode(funcName)} | """.stripMargin - funcCode += generateSetAggregationResults(accTypes, aggs, aggMapping) + "\n" - funcCode += generateAccumulate(accTypes, aggs, parameters) + "\n" - funcCode += generateRetract(accTypes, aggs, parameters) + "\n" - funcCode += generateCreateAccumulators(aggs) + "\n" - funcCode += generateSetForwardedFields(fwdMapping) + "\n" - funcCode += generateCreateOutputRow(outputArity) + "\n" + funcCode += genSetAggregationResults(accTypes, aggs, aggMapping) + "\n" + funcCode += genAccumulate(accTypes, aggs, parameters) + "\n" + funcCode += genRetract(accTypes, aggs, parameters) + "\n" + funcCode += genCreateAccumulators(aggs) + "\n" + funcCode += genSetForwardedFields(fwdMapping) + "\n" + funcCode += genCreateOutputRow(outputArity) + "\n" + funcCode += genMergeAccumulatorsPair(accTypes, aggs) + "\n" funcCode += "}" GeneratedAggregationsFunction(funcName, funcCode) http://git-wip-us.apache.org/repos/asf/flink/blob/9be5cc42/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamAggregate.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamAggregate.scala index 50f8281..62bcfc8 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamAggregate.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamAggregate.scala @@ -30,6 +30,7 @@ import org.apache.flink.streaming.api.windowing.windows.{Window => DataStreamWin import org.apache.flink.table.api.StreamTableEnvironment import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty import org.apache.flink.table.calcite.FlinkTypeFactory +import org.apache.flink.table.codegen.CodeGenerator import org.apache.flink.table.expressions._ import org.apache.flink.table.plan.logical._ import org.apache.flink.table.plan.nodes.CommonAggregate @@ -117,6 +118,11 @@ class DataStreamAggregate( s"select: ($aggString)" val nonKeyedAggOpName = s"window: ($window), select: ($aggString)" + val generator = new CodeGenerator( + tableEnv.getConfig, + false, + inputDS.getType) + // grouped / keyed aggregation if (groupingKeys.length > 0) { val windowFunction = AggregateUtil.createAggregationGroupWindowFunction( @@ -133,10 +139,10 @@ class DataStreamAggregate( val (aggFunction, accumulatorRowType, aggResultRowType) = AggregateUtil.createDataStreamAggregateFunction( + generator, namedAggregates, inputType, - rowRelDataType, - grouping) + rowRelDataType) windowedStream .aggregate(aggFunction, windowFunction, accumulatorRowType, aggResultRowType, rowTypeInfo) @@ -155,10 +161,10 @@ class DataStreamAggregate( val (aggFunction, accumulatorRowType, aggResultRowType) = AggregateUtil.createDataStreamAggregateFunction( + generator, namedAggregates, inputType, - rowRelDataType, - grouping) + rowRelDataType) windowedStream .aggregate(aggFunction, windowFunction, accumulatorRowType, aggResultRowType, rowTypeInfo) http://git-wip-us.apache.org/repos/asf/flink/blob/9be5cc42/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala index c608b97..377e0ff 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala @@ -18,69 +18,61 @@ package org.apache.flink.table.runtime.aggregate -import java.util.{ArrayList => JArrayList, List => JList} -import org.apache.flink.api.common.functions.{AggregateFunction => DataStreamAggFunc} -import org.apache.flink.table.functions.{Accumulator, AggregateFunction} +import org.apache.flink.api.common.functions.AggregateFunction +import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction} import org.apache.flink.types.Row +import org.slf4j.LoggerFactory /** * Aggregate Function used for the aggregate operator in * [[org.apache.flink.streaming.api.datastream.WindowedStream]] * - * @param aggregates the list of all [[org.apache.flink.table.functions.AggregateFunction]] - * used for this aggregation - * @param aggFields the position (in the input Row) of the input value for each aggregate + * @param genAggregations Generated aggregate helper function */ -class AggregateAggFunction( - private val aggregates: Array[AggregateFunction[_]], - private val aggFields: Array[Array[Int]]) - extends DataStreamAggFunc[Row, Row, Row] { +class AggregateAggFunction(genAggregations: GeneratedAggregationsFunction) + extends AggregateFunction[Row, Row, Row] with Compiler[GeneratedAggregations] { + + val LOG = LoggerFactory.getLogger(this.getClass) + private var function: GeneratedAggregations = _ override def createAccumulator(): Row = { - val accumulatorRow: Row = new Row(aggregates.length) - var i = 0 - while (i < aggregates.length) { - accumulatorRow.setField(i, aggregates(i).createAccumulator()) - i += 1 + if (function == null) { + initFunction } - accumulatorRow + function.createAccumulators() } override def add(value: Row, accumulatorRow: Row): Unit = { - - var i = 0 - while (i < aggregates.length) { - val acc = accumulatorRow.getField(i).asInstanceOf[Accumulator] - val v = value.getField(aggFields(i)(0)) - aggregates(i).accumulate(acc, v) - i += 1 + if (function == null) { + initFunction } + function.accumulate(accumulatorRow, value) } override def getResult(accumulatorRow: Row): Row = { - val output = new Row(aggFields.length) - - var i = 0 - while (i < aggregates.length) { - val acc = accumulatorRow.getField(i).asInstanceOf[Accumulator] - output.setField(i, aggregates(i).getValue(acc)) - i += 1 + if (function == null) { + initFunction } + val output = function.createOutputRow() + function.setAggregationResults(accumulatorRow, output) output } override def merge(aAccumulatorRow: Row, bAccumulatorRow: Row): Row = { - - var i = 0 - while (i < aggregates.length) { - val aAcc = aAccumulatorRow.getField(i).asInstanceOf[Accumulator] - val bAcc = bAccumulatorRow.getField(i).asInstanceOf[Accumulator] - val accumulators: JList[Accumulator] = new JArrayList[Accumulator]() - accumulators.add(aAcc) - accumulators.add(bAcc) - aAccumulatorRow.setField(i, aggregates(i).merge(accumulators)) - i += 1 + if (function == null) { + initFunction } - aAccumulatorRow + function.mergeAccumulatorsPair(aAccumulatorRow, bAccumulatorRow) + } + + def initFunction(): Unit = { + LOG.debug(s"Compiling AggregateHelper: $genAggregations.name \n\n " + + s"Code:\n$genAggregations.code") + val clazz = compile( + getClass.getClassLoader, + genAggregations.name, + genAggregations.code) + LOG.debug("Instantiating AggregateHelper.") + function = clazz.newInstance() } } http://git-wip-us.apache.org/repos/asf/flink/blob/9be5cc42/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala index 09d1a13..da57153 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala @@ -746,10 +746,10 @@ object AggregateUtil { } private[flink] def createDataStreamAggregateFunction( + generator: CodeGenerator, namedAggregates: Seq[CalcitePair[AggregateCall, String]], inputType: RelDataType, - outputType: RelDataType, - groupKeysIndex: Array[Int]) + outputType: RelDataType) : (DataStreamAggFunction[Row, Row, Row], RowTypeInfo, RowTypeInfo) = { val (aggFields, aggregates) = @@ -758,6 +758,19 @@ object AggregateUtil { inputType, needRetraction = false) + val aggMapping = aggregates.indices.toArray + val outputArity = aggregates.length + + val genFunction = generator.generateAggregations( + "GroupingWindowAggregateHelper", + generator, + inputType, + aggregates, + aggFields, + aggMapping, + Array(), + outputArity) + val aggregateMapping = getAggregateMapping(namedAggregates, outputType) if (aggregateMapping.length != namedAggregates.length) { @@ -769,7 +782,7 @@ object AggregateUtil { val accumulatorRowType = createAccumulatorRowType(aggregates) val aggResultRowType = new RowTypeInfo(aggResultTypes: _*) - val aggFunction = new AggregateAggFunction(aggregates, aggFields) + val aggFunction = new AggregateAggFunction(genFunction) (aggFunction, accumulatorRowType, aggResultRowType) } http://git-wip-us.apache.org/repos/asf/flink/blob/9be5cc42/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GeneratedAggregations.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GeneratedAggregations.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GeneratedAggregations.scala index e28f098..17a1128 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GeneratedAggregations.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GeneratedAggregations.scala @@ -75,4 +75,13 @@ abstract class GeneratedAggregations extends Function { */ def createOutputRow(): Row + /** + * Merges two rows of accumulators into one row + * + * @param a First row of accumulators + * @param b The other row of accumulators + * @return A row with the merged accumulators of both input rows. + */ + def mergeAccumulatorsPair(a: Row, b: Row): Row + } http://git-wip-us.apache.org/repos/asf/flink/blob/9be5cc42/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/AggregationsITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/AggregationsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/AggregationsITCase.scala index 3e7b66b..9f366a8 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/AggregationsITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/AggregationsITCase.scala @@ -26,7 +26,7 @@ import org.apache.flink.streaming.api.watermark.Watermark import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase import org.apache.flink.table.api.TableEnvironment import org.apache.flink.table.api.scala._ -import org.apache.flink.table.api.scala.stream.table.AggregationsITCase.TimestampWithEqualWatermark +import org.apache.flink.table.api.scala.stream.table.AggregationsITCase.TimestampAndWatermarkWithOffset import org.apache.flink.table.api.scala.stream.utils.StreamITCase import org.apache.flink.types.Row import org.junit.Assert._ @@ -72,26 +72,38 @@ class AggregationsITCase extends StreamingMultipleProgramsTestBase { @Test def testEventTimeSessionGroupWindowOverTime(): Unit = { + //To verify the "merge" functionality, we create this test with the following characteristics: + // 1. set the Parallelism to 1, and have the test data out of order + // 2. create a waterMark with 10ms offset to delay the window emission by 10ms + val sessionWindowTestdata = List( + (1L, 1, "Hello"), + (2L, 2, "Hello"), + (8L, 8, "Hello"), + (9L, 9, "Hello World"), + (4L, 4, "Hello"), + (16L, 16, "Hello")) + val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) + env.setParallelism(1) val tEnv = TableEnvironment.getTableEnvironment(env) StreamITCase.testResults = mutable.MutableList() val stream = env - .fromCollection(data) - .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark()) + .fromCollection(sessionWindowTestdata) + .assignTimestampsAndWatermarks(new TimestampAndWatermarkWithOffset(10L)) val table = stream.toTable(tEnv, 'long, 'int, 'string) val windowedTable = table - .window(Session withGap 7.milli on 'rowtime as 'w) + .window(Session withGap 5.milli on 'rowtime as 'w) .groupBy('w, 'string) - .select('string, 'int.count) + .select('string, 'int.count, 'int.sum) val results = windowedTable.toDataStream[Row] results.addSink(new StreamITCase.StringSink) env.execute() - val expected = Seq("Hello world,1", "Hello world,1", "Hello,2", "Hi,1") + val expected = Seq("Hello World,1,9", "Hello,1,16", "Hello,4,15") assertEquals(expected.sorted, StreamITCase.testResults.sorted) } @@ -127,7 +139,7 @@ class AggregationsITCase extends StreamingMultipleProgramsTestBase { val stream = env .fromCollection(data) - .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark()) + .assignTimestampsAndWatermarks(new TimestampAndWatermarkWithOffset(0L)) val table = stream.toTable(tEnv, 'long, 'int, 'string) val windowedTable = table @@ -149,13 +161,14 @@ class AggregationsITCase extends StreamingMultipleProgramsTestBase { } object AggregationsITCase { - class TimestampWithEqualWatermark extends AssignerWithPunctuatedWatermarks[(Long, Int, String)] { + class TimestampAndWatermarkWithOffset( + offset: Long) extends AssignerWithPunctuatedWatermarks[(Long, Int, String)] { override def checkAndGetNextWatermark( lastElement: (Long, Int, String), extractedTimestamp: Long) : Watermark = { - new Watermark(extractedTimestamp) + new Watermark(extractedTimestamp - offset) } override def extractTimestamp( http://git-wip-us.apache.org/repos/asf/flink/blob/9be5cc42/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRangeProcessFunctionTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRangeProcessFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRangeProcessFunctionTest.scala index 3610898..adc84bf 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRangeProcessFunctionTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRangeProcessFunctionTest.scala @@ -59,16 +59,16 @@ class BoundedProcessingOverRangeProcessFunctionTest { |public class BoundedOverAggregateHelper$33 | extends org.apache.flink.table.runtime.aggregate.GeneratedAggregations { | - |transient org.apache.flink.table.functions.aggfunctions.LongMinWithRetractAggFunction - | fmin = null; + | transient org.apache.flink.table.functions.aggfunctions.LongMinWithRetractAggFunction + | fmin = null; | - |transient org.apache.flink.table.functions.aggfunctions.LongMaxWithRetractAggFunction - | fmax = null; + | transient org.apache.flink.table.functions.aggfunctions.LongMaxWithRetractAggFunction + | fmax = null; | | public BoundedOverAggregateHelper$33() throws Exception { | - | fmin = (org.apache.flink.table.functions.aggfunctions.LongMinWithRetractAggFunction) - | org.apache.flink.table.functions.utils.UserDefinedFunctionUtils + | fmin = (org.apache.flink.table.functions.aggfunctions.LongMinWithRetractAggFunction) + | org.apache.flink.table.functions.utils.UserDefinedFunctionUtils | .deserialize("rO0ABXNyAEtvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLmZ1bmN0aW9ucy5hZ2dmdW5jdGlvbn" + | "MuTG9uZ01pbldpdGhSZXRyYWN0QWdnRnVuY3Rpb26oIdX_DaMPxQIAAHhyAEdvcmcuYXBhY2hlLmZsaW5rL" + | "nRhYmxlLmZ1bmN0aW9ucy5hZ2dmdW5jdGlvbnMuTWluV2l0aFJldHJhY3RBZ2dGdW5jdGlvbkDcXxs1apkP" + @@ -77,8 +77,8 @@ class BoundedProcessingOverRangeProcessFunctionTest { | "5mdW5jdGlvbnMuVXNlckRlZmluZWRGdW5jdGlvbi0B91QxuAyTAgAAeHBzcgAZc2NhbGEubWF0aC5PcmRlc" + | "mluZyRMb25nJOda0iCPo2ukAgAAeHA"); | - | fmax = (org.apache.flink.table.functions.aggfunctions.LongMaxWithRetractAggFunction) - | org.apache.flink.table.functions.utils.UserDefinedFunctionUtils + | fmax = (org.apache.flink.table.functions.aggfunctions.LongMaxWithRetractAggFunction) + | org.apache.flink.table.functions.utils.UserDefinedFunctionUtils | .deserialize("rO0ABXNyAEtvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLmZ1bmN0aW9ucy5hZ2dmdW5jdGlvbn" + | "MuTG9uZ01heFdpdGhSZXRyYWN0QWdnRnVuY3Rpb25RmsI8azNGXwIAAHhyAEdvcmcuYXBhY2hlLmZsaW5rL" + | "nRhYmxlLmZ1bmN0aW9ucy5hZ2dmdW5jdGlvbnMuTWF4V2l0aFJldHJhY3RBZ2dGdW5jdGlvbu4_w_gPePlO" + @@ -158,6 +158,13 @@ class BoundedProcessingOverRangeProcessFunctionTest { | public org.apache.flink.types.Row createOutputRow() { | return new org.apache.flink.types.Row(7); | } + | + | //The test won't use this method + | public org.apache.flink.types.Row mergeAccumulatorsPair( + | org.apache.flink.types.Row a, + | org.apache.flink.types.Row b) { + | return null; + | } |} """.stripMargin