[FLINK-6479] [table] Fix IndexOutOfBoundsException (IOOBE) in DataStreamGroupWindowAggregate. This closes #3841.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f26a9116 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f26a9116 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f26a9116 Branch: refs/heads/master Commit: f26a911627204519a78ccc57b1f12b387d85e43b Parents: e0ab5f5 Author: sunjincheng121 <sunjincheng...@gmail.com> Authored: Mon May 8 16:41:31 2017 +0800 Committer: Fabian Hueske <fhue...@apache.org> Committed: Tue May 9 18:50:20 2017 +0200 ---------------------------------------------------------------------- .../DataStreamGroupWindowAggregate.scala | 2 +- .../table/GroupWindowAggregationsITCase.scala | 32 +++++++++++++++ .../scala/stream/table/GroupWindowTest.scala | 41 +++++++++++++++++++- 3 files changed, 73 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/f26a9116/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala index 1be1896..c38e5af 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala @@ -134,7 +134,7 @@ class DataStreamGroupWindowAggregate( namedAggregates, namedProperties) - val keyedAggOpName = s"groupBy: (${groupingToString(schema.logicalType, grouping)}), " + + val keyedAggOpName = s"groupBy: (${groupingToString(inputSchema.logicalType, grouping)}), " + 
s"window: ($window), " + s"select: ($aggString)" val nonKeyedAggOpName = s"window: ($window), select: ($aggString)" http://git-wip-us.apache.org/repos/asf/flink/blob/f26a9116/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowAggregationsITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowAggregationsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowAggregationsITCase.scala index 2c027a9..846fe3e 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowAggregationsITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowAggregationsITCase.scala @@ -175,6 +175,38 @@ class GroupWindowAggregationsITCase extends StreamingMultipleProgramsTestBase { "Hi,1,1,1,1,1,1,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005") assertEquals(expected.sorted, StreamITCase.testResults.sorted) } + + @Test + def testGroupWindowWithoutKeyInProjection(): Unit = { + val data = List( + (1L, 1, "Hi", 1, 1), + (2L, 2, "Hello", 2, 2), + (4L, 2, "Hello", 2, 2), + (8L, 3, "Hello world", 3, 3), + (16L, 3, "Hello world", 3, 3)) + + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setParallelism(1) + val tEnv = TableEnvironment.getTableEnvironment(env) + StreamITCase.testResults = mutable.MutableList() + + val stream = env.fromCollection(data) + val table = stream.toTable(tEnv, 'long, 'int, 'string, 'int2, 'int3, 'proctime.proctime) + + val weightAvgFun = new WeightedAvg + + val windowedTable = table + .window(Slide over 2.rows every 1.rows on 'proctime as 'w) + .groupBy('w, 'int2, 'int3, 'string) + .select(weightAvgFun('long, 'int)) + + val results = windowedTable.toDataStream[Row] + results.addSink(new 
StreamITCase.StringSink) + env.execute() + + val expected = Seq("12", "8", "2", "3", "1") + assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } } object GroupWindowAggregationsITCase { http://git-wip-us.apache.org/repos/asf/flink/blob/f26a9116/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowTest.scala index 0573ff3..ef071b7 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowTest.scala @@ -19,7 +19,7 @@ package org.apache.flink.table.api.scala.stream.table import org.apache.flink.api.scala._ -import org.apache.flink.table.api.java.utils.UserDefinedAggFunctions.WeightedAvgWithMerge +import org.apache.flink.table.api.java.utils.UserDefinedAggFunctions.{WeightedAvg, WeightedAvgWithMerge} import org.apache.flink.table.api.ValidationException import org.apache.flink.table.api.scala._ import org.apache.flink.table.expressions.WindowReference @@ -792,6 +792,45 @@ class GroupWindowTest extends TableTestBase { } @Test + def testSlidingWindowWithUDAF(): Unit = { + val util = streamTestUtil() + val table = util.addTable[(Long, Int, String, Int, Int)]( + 'long, + 'int, + 'string, + 'int2, + 'int3, + 'proctime.proctime) + + val weightAvgFun = new WeightedAvg + + val windowedTable = table + .window(Slide over 2.rows every 1.rows on 'proctime as 'w) + .groupBy('w, 'int2, 'int3, 'string) + .select(weightAvgFun('long, 'int)) + + val expected = + unaryNode( + "DataStreamCalc", + unaryNode( + "DataStreamGroupWindowAggregate", + 
streamTableNode(0), + term("groupBy", "string, int2, int3"), + term("window", SlidingGroupWindow(WindowReference("w"), 'proctime, 2.rows, 1.rows)), + term( + "select", + "string", + "int2", + "int3", + "WeightedAvg(long, int) AS TMP_0") + ), + term("select","TMP_0") + ) + + util.verifyTable(windowedTable, expected) + } + + @Test def testSlideWindowStartEnd(): Unit = { val util = streamTestUtil() val table = util.addTable[(Long, Int, String)]('long, 'int, 'string, 'rowtime.rowtime)