http://git-wip-us.apache.org/repos/asf/flink/blob/8f78824b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowAggregationsITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowAggregationsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowAggregationsITCase.scala new file mode 100644 index 0000000..2c027a9 --- /dev/null +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowAggregationsITCase.scala @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.api.scala.stream.table + +import org.apache.flink.api.scala._ +import org.apache.flink.streaming.api.TimeCharacteristic +import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment +import org.apache.flink.streaming.api.watermark.Watermark +import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase +import org.apache.flink.table.api.TableEnvironment +import org.apache.flink.table.api.java.utils.UserDefinedAggFunctions.{WeightedAvg, WeightedAvgWithMerge} +import org.apache.flink.table.api.scala._ +import org.apache.flink.table.api.scala.stream.table.GroupWindowAggregationsITCase.TimestampAndWatermarkWithOffset +import org.apache.flink.table.api.scala.stream.utils.StreamITCase +import org.apache.flink.table.functions.aggfunctions.CountAggFunction +import org.apache.flink.types.Row +import org.junit.Assert._ +import org.junit.Test + +import scala.collection.mutable + +/** + * We only test some aggregations until better testing of constructed DataStream + * programs is possible. 
+ *
+ * The tests below cover sliding, session and tumbling group windows, defined either over
+ * processing time (row-count windows) or over event time (millisecond windows).
+ */
+class GroupWindowAggregationsITCase extends StreamingMultipleProgramsTestBase {
+
+  /** Shared input rows `(long, int, string)`, in ascending order of the `long` field. */
+  val data = List(
+    (1L, 1, "Hi"),
+    (2L, 2, "Hello"),
+    (4L, 2, "Hello"),
+    (8L, 3, "Hello world"),
+    (16L, 3, "Hello world"))
+
+  /**
+   * Sliding row-count window (size 2 rows, slide 1 row) on processing time, grouped by `string`.
+   * The expected output has one result per input row, aggregated over at most the two most
+   * recent rows of that row's group.
+   */
+  @Test
+  def testProcessingTimeSlidingGroupWindowOverCount(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setParallelism(1)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+
+    val stream = env.fromCollection(data)
+    val table = stream.toTable(tEnv, 'long, 'int, 'string, 'proctime.proctime)
+
+    val countFun = new CountAggFunction
+    val weightAvgFun = new WeightedAvg
+
+    val windowedTable = table
+      .window(Slide over 2.rows every 1.rows on 'proctime as 'w)
+      .groupBy('w, 'string)
+      .select('string, countFun('int), 'int.avg,
+        weightAvgFun('long, 'int), weightAvgFun('int, 'int))
+
+    val results = windowedTable.toDataStream[Row]
+    results.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = Seq("Hello world,1,3,8,3", "Hello world,2,3,12,3", "Hello,1,2,2,2",
+      "Hello,2,2,3,2", "Hi,1,1,1,1")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  /**
+   * Event-time session window (gap 5 ms) grouped by `string`, using the mergeable
+   * `WeightedAvgWithMerge` aggregate so that merging of session windows is exercised.
+   */
+  @Test
+  def testEventTimeSessionGroupWindowOverTime(): Unit = {
+    // To verify the "merge" functionality, this test is built with the following characteristics:
+    //  1. parallelism is set to 1 and the test data is out of order (4L arrives after 8L and 9L);
+    //  2. watermarks lag the event timestamps by 10 ms, which delays window emission by 10 ms so
+    //     the out-of-order element can still join (and merge) the open "Hello" session.
+    val sessionWindowTestdata = List(
+      (1L, 1, "Hello"),
+      (2L, 2, "Hello"),
+      (8L, 8, "Hello"),
+      (9L, 9, "Hello World"),
+      (4L, 4, "Hello"),
+      (16L, 16, "Hello"))
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    env.setParallelism(1)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+
+    val countFun = new CountAggFunction
+    val weightAvgFun = new WeightedAvgWithMerge
+
+    val stream = env
+      .fromCollection(sessionWindowTestdata)
+      .assignTimestampsAndWatermarks(new TimestampAndWatermarkWithOffset(10L))
+    val table = stream.toTable(tEnv, 'long, 'int, 'string, 'rowtime.rowtime)
+
+    val windowedTable = table
+      .window(Session withGap 5.milli on 'rowtime as 'w)
+      .groupBy('w, 'string)
+      .select('string, countFun('int), 'int.avg,
+        weightAvgFun('long, 'int), weightAvgFun('int, 'int))
+
+    val results = windowedTable.toDataStream[Row]
+    results.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = Seq("Hello World,1,9,9,9", "Hello,1,16,16,16", "Hello,4,3,5,5")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  /**
+   * Non-keyed tumbling row-count window of 2 rows on processing time. With five input rows
+   * only two windows complete; the trailing fifth row produces no result.
+   */
+  @Test
+  def testAllProcessingTimeTumblingGroupWindowOverCount(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setParallelism(1)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+
+    val stream = env.fromCollection(data)
+    val table = stream.toTable(tEnv, 'long, 'int, 'string, 'proctime.proctime)
+    val countFun = new CountAggFunction
+    val weightAvgFun = new WeightedAvg
+
+    val windowedTable = table
+      .window(Tumble over 2.rows on 'proctime as 'w)
+      .groupBy('w)
+      .select(countFun('string), 'int.avg,
+        weightAvgFun('long, 'int), weightAvgFun('int, 'int))
+
+    val results = windowedTable.toDataStream[Row]
+    results.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = Seq("2,1,1,1", "2,2,6,2")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  /**
+   * Event-time tumbling window of 5 ms grouped by `string`. Also checks the built-in
+   * min/max/sum aggregates and the window start/end properties.
+   */
+  @Test
+  def testEventTimeTumblingWindow(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    // Pin parallelism to 1 like the other tests, so a single watermark assigner instance
+    // (with zero offset over the in-order `data`) drives watermark progress.
+    env.setParallelism(1)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+
+    val stream = env
+      .fromCollection(data)
+      .assignTimestampsAndWatermarks(new TimestampAndWatermarkWithOffset(0L))
+    val table = stream.toTable(tEnv, 'long, 'int, 'string, 'rowtime.rowtime)
+    val countFun = new CountAggFunction
+    val weightAvgFun = new WeightedAvg
+
+    val windowedTable = table
+      .window(Tumble over 5.milli on 'rowtime as 'w)
+      .groupBy('w, 'string)
+      .select('string, countFun('string), 'int.avg, weightAvgFun('long, 'int),
+        weightAvgFun('int, 'int), 'int.min, 'int.max, 'int.sum, 'w.start, 'w.end)
+
+    val results = windowedTable.toDataStream[Row]
+    results.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = Seq(
+      "Hello world,1,3,8,3,3,3,3,1970-01-01 00:00:00.005,1970-01-01 00:00:00.01",
+      "Hello world,1,3,16,3,3,3,3,1970-01-01 00:00:00.015,1970-01-01 00:00:00.02",
+      "Hello,2,2,3,2,2,2,4,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005",
+      "Hi,1,1,1,1,1,1,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+}
+
+object GroupWindowAggregationsITCase {
+
+  /**
+   * Timestamp and watermark assigner for the `(Long, Int, String)` test rows.
+   *
+   * The event timestamp is taken from the first tuple field. For every element, a watermark
+   * of `extractedTimestamp - offset` is returned, so watermarks trail the element timestamps
+   * by `offset` milliseconds.
+   *
+   * @param offset how far (in ms) the emitted watermarks lag behind element timestamps
+   */
+  class TimestampAndWatermarkWithOffset(
+    offset: Long) extends AssignerWithPunctuatedWatermarks[(Long, Int, String)] {
+
+    // NOTE(review): for out-of-order input this returns a watermark lower than an earlier one;
+    // presumably the runtime only forwards advancing watermarks — confirm.
+    override def checkAndGetNextWatermark(
+        lastElement: (Long, Int, String),
+        extractedTimestamp: Long)
+      : Watermark = {
+      new Watermark(extractedTimestamp - offset)
+    }
+
+    override def extractTimestamp(
+        element: (Long, Int, String),
+        previousElementTimestamp: Long): Long = {
+      element._1
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/8f78824b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowTest.scala index de6cbfa..0573ff3 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowTest.scala @@ -185,11 +185,11 @@ class GroupWindowTest extends TableTestBase { .select('string.count) val expected = unaryNode( - "DataStreamAggregate", + "DataStreamGroupWindowAggregate", unaryNode( "DataStreamCalc", unaryNode( - "DataStreamAggregate", + "DataStreamGroupWindowAggregate", unaryNode( "DataStreamCalc", streamTableNode(0), @@ -229,7 +229,7 @@ class GroupWindowTest extends TableTestBase { .select('string, 'int.count) val expected = unaryNode( - "DataStreamAggregate", + "DataStreamGroupWindowAggregate", unaryNode( "DataStreamCalc", streamTableNode(0), @@ -259,7 +259,7 @@ class GroupWindowTest extends TableTestBase { .select('string, 'int.count) val expected = unaryNode( - "DataStreamAggregate", + "DataStreamGroupWindowAggregate", unaryNode( "DataStreamCalc", streamTableNode(0), @@ -268,10 +268,7 @@ class GroupWindowTest extends TableTestBase { term("groupBy", "string"), term( "window", - TumblingGroupWindow( - WindowReference("w"), - 'proctime, - 2.rows)), + TumblingGroupWindow(WindowReference("w"), 'proctime, 2.rows)), term("select", "string", "COUNT(int) AS TMP_0") ) @@ -289,7 +286,7 @@ class GroupWindowTest extends TableTestBase { .select('string, 'int.count) val expected = unaryNode( - "DataStreamAggregate", + 
"DataStreamGroupWindowAggregate", streamTableNode(0), term("groupBy", "string"), term( @@ -317,7 +314,7 @@ class GroupWindowTest extends TableTestBase { .select('string, weightedAvg('long, 'int)) val expected = unaryNode( - "DataStreamAggregate", + "DataStreamGroupWindowAggregate", streamTableNode(0), term("groupBy", "string"), term( @@ -343,7 +340,7 @@ class GroupWindowTest extends TableTestBase { .select('string, 'int.count) val expected = unaryNode( - "DataStreamAggregate", + "DataStreamGroupWindowAggregate", unaryNode( "DataStreamCalc", streamTableNode(0), @@ -374,7 +371,7 @@ class GroupWindowTest extends TableTestBase { .select('string, 'int.count) val expected = unaryNode( - "DataStreamAggregate", + "DataStreamGroupWindowAggregate", unaryNode( "DataStreamCalc", streamTableNode(0), @@ -397,6 +394,32 @@ class GroupWindowTest extends TableTestBase { @Test def testEventTimeSlidingGroupWindowOverTime(): Unit = { val util = streamTestUtil() + val table = util.addTable[(Long, Int, String)]('long, 'int, 'string, 'rowtime.rowtime) + + val windowedTable = table + .window(Slide over 8.milli every 10.milli on 'rowtime as 'w) + .groupBy('w, 'string) + .select('string, 'int.count) + + val expected = unaryNode( + "DataStreamGroupWindowAggregate", + unaryNode( + "DataStreamCalc", + streamTableNode(0), + term("select", "string", "int", "rowtime") + ), + term("groupBy", "string"), + term("window", SlidingGroupWindow(WindowReference("w"), 'rowtime, 8.milli, 10.milli)), + term("select", "string", "COUNT(int) AS TMP_0") + ) + + util.verifyTable(windowedTable, expected) + } + + @Test + @Ignore // see comments in DataStreamGroupWindowAggregate + def testEventTimeSlidingGroupWindowOverCount(): Unit = { + val util = streamTestUtil() val table = util.addTable[(Long, Int, String)]('long.rowtime, 'int, 'string) val windowedTable = table @@ -405,7 +428,7 @@ class GroupWindowTest extends TableTestBase { .select('string, 'int.count) val expected = unaryNode( - "DataStreamAggregate", + 
"DataStreamGroupWindowAggregate", streamTableNode(0), term("groupBy", "string"), term( @@ -434,12 +457,10 @@ class GroupWindowTest extends TableTestBase { .select('string, weightedAvg('long, 'int)) val expected = unaryNode( - "DataStreamAggregate", + "DataStreamGroupWindowAggregate", streamTableNode(0), term("groupBy", "string"), - term( - "window", - SlidingGroupWindow(WindowReference("w"), 'rowtime, 8.milli, 10.milli)), + term("window", SlidingGroupWindow(WindowReference("w"), 'rowtime, 8.milli, 10.milli)), term("select", "string", "WeightedAvgWithMerge(long, int) AS TMP_0") ) @@ -457,15 +478,10 @@ class GroupWindowTest extends TableTestBase { .select('string, 'int.count) val expected = unaryNode( - "DataStreamAggregate", + "DataStreamGroupWindowAggregate", streamTableNode(0), term("groupBy", "string"), - term( - "window", - SessionGroupWindow( - WindowReference("w"), - 'long, - 7.milli)), + term("window", SessionGroupWindow(WindowReference("w"), 'long, 7.milli)), term("select", "string", "COUNT(int) AS TMP_0") ) @@ -485,12 +501,10 @@ class GroupWindowTest extends TableTestBase { .select('string, weightedAvg('long, 'int)) val expected = unaryNode( - "DataStreamAggregate", + "DataStreamGroupWindowAggregate", streamTableNode(0), term("groupBy", "string"), - term( - "window", - SessionGroupWindow(WindowReference("w"), 'rowtime, 7.milli)), + term("window", SessionGroupWindow(WindowReference("w"), 'rowtime, 7.milli)), term("select", "string", "WeightedAvgWithMerge(long, int) AS TMP_0") ) @@ -508,7 +522,7 @@ class GroupWindowTest extends TableTestBase { .select('string, 'int.count) val expected = unaryNode( - "DataStreamAggregate", + "DataStreamGroupWindowAggregate", unaryNode( "DataStreamCalc", streamTableNode(0), @@ -538,7 +552,7 @@ class GroupWindowTest extends TableTestBase { .select('int.count) val expected = unaryNode( - "DataStreamAggregate", + "DataStreamGroupWindowAggregate", unaryNode( "DataStreamCalc", streamTableNode(0), @@ -559,6 +573,31 @@ class 
GroupWindowTest extends TableTestBase { @Test def testAllEventTimeTumblingGroupWindowOverTime(): Unit = { val util = streamTestUtil() + val table = util.addTable[(Long, Int, String)]('long, 'int, 'string, 'rowtime.rowtime) + + val windowedTable = table + .window(Tumble over 5.milli on 'rowtime as 'w) + .groupBy('w) + .select('int.count) + + val expected = unaryNode( + "DataStreamGroupWindowAggregate", + unaryNode( + "DataStreamCalc", + streamTableNode(0), + term("select", "int", "rowtime") + ), + term("window", TumblingGroupWindow(WindowReference("w"), 'rowtime, 5.milli)), + term("select", "COUNT(int) AS TMP_0") + ) + + util.verifyTable(windowedTable, expected) + } + + @Test + @Ignore // see comments in DataStreamGroupWindowAggregate + def testAllEventTimeTumblingGroupWindowOverCount(): Unit = { + val util = streamTestUtil() val table = util.addTable[(Long, Int, String)]('long.rowtime, 'int, 'string) val windowedTable = table @@ -567,7 +606,7 @@ class GroupWindowTest extends TableTestBase { .select('int.count) val expected = unaryNode( - "DataStreamAggregate", + "DataStreamGroupWindowAggregate", unaryNode( "DataStreamCalc", streamTableNode(0), @@ -596,7 +635,7 @@ class GroupWindowTest extends TableTestBase { .select('int.count) val expected = unaryNode( - "DataStreamAggregate", + "DataStreamGroupWindowAggregate", unaryNode( "DataStreamCalc", streamTableNode(0), @@ -626,7 +665,7 @@ class GroupWindowTest extends TableTestBase { .select('int.count) val expected = unaryNode( - "DataStreamAggregate", + "DataStreamGroupWindowAggregate", unaryNode( "DataStreamCalc", streamTableNode(0), @@ -648,6 +687,31 @@ class GroupWindowTest extends TableTestBase { @Test def testAllEventTimeSlidingGroupWindowOverTime(): Unit = { val util = streamTestUtil() + val table = util.addTable[(Long, Int, String)]('long, 'int, 'string, 'rowtime.rowtime) + + val windowedTable = table + .window(Slide over 8.milli every 10.milli on 'rowtime as 'w) + .groupBy('w) + .select('int.count) + + val 
expected = unaryNode( + "DataStreamGroupWindowAggregate", + unaryNode( + "DataStreamCalc", + streamTableNode(0), + term("select", "int", "rowtime") + ), + term("window", SlidingGroupWindow(WindowReference("w"), 'rowtime, 8.milli, 10.milli)), + term("select", "COUNT(int) AS TMP_0") + ) + + util.verifyTable(windowedTable, expected) + } + + @Test +// @Ignore // see comments in DataStreamGroupWindowAggregate + def testAllEventTimeSlidingGroupWindowOverCount(): Unit = { + val util = streamTestUtil() val table = util.addTable[(Long, Int, String)]('long.rowtime, 'int, 'string) val windowedTable = table @@ -656,19 +720,13 @@ class GroupWindowTest extends TableTestBase { .select('int.count) val expected = unaryNode( - "DataStreamAggregate", + "DataStreamGroupWindowAggregate", unaryNode( "DataStreamCalc", streamTableNode(0), term("select", "int", "long") ), - term( - "window", - SlidingGroupWindow( - WindowReference("w"), - 'long, - 8.milli, - 10.milli)), + term("window", SlidingGroupWindow(WindowReference("w"), 'long, 8.milli, 10.milli)), term("select", "COUNT(int) AS TMP_0") ) @@ -686,7 +744,7 @@ class GroupWindowTest extends TableTestBase { .select('int.count) val expected = unaryNode( - "DataStreamAggregate", + "DataStreamGroupWindowAggregate", unaryNode( "DataStreamCalc", streamTableNode(0), @@ -707,22 +765,22 @@ class GroupWindowTest extends TableTestBase { @Test def testTumbleWindowStartEnd(): Unit = { val util = streamTestUtil() - val table = util.addTable[(Long, Int, String)]('long.rowtime, 'int, 'string) + val table = util.addTable[(Long, Int, String)]('long, 'int, 'string, 'rowtime.rowtime) val windowedTable = table - .window(Tumble over 5.milli on 'long as 'w) + .window(Tumble over 5.milli on 'rowtime as 'w) .groupBy('w, 'string) .select('string, 'int.count, 'w.start, 'w.end) val expected = unaryNode( - "DataStreamAggregate", - streamTableNode(0), + "DataStreamGroupWindowAggregate", + unaryNode( + "DataStreamCalc", + streamTableNode(0), + term("select", "string", 
"int", "rowtime") + ), term("groupBy", "string"), - term("window", - TumblingGroupWindow( - WindowReference("w"), - 'long, - 5.milli)), + term("window", TumblingGroupWindow(WindowReference("w"), 'rowtime, 5.milli)), term("select", "string", "COUNT(int) AS TMP_0", @@ -736,23 +794,22 @@ class GroupWindowTest extends TableTestBase { @Test def testSlideWindowStartEnd(): Unit = { val util = streamTestUtil() - val table = util.addTable[(Long, Int, String)]('long.rowtime, 'int, 'string) + val table = util.addTable[(Long, Int, String)]('long, 'int, 'string, 'rowtime.rowtime) val windowedTable = table - .window(Slide over 10.milli every 5.milli on 'long as 'w) + .window(Slide over 10.milli every 5.milli on 'rowtime as 'w) .groupBy('w, 'string) .select('string, 'int.count, 'w.start, 'w.end) val expected = unaryNode( - "DataStreamAggregate", - streamTableNode(0), + "DataStreamGroupWindowAggregate", + unaryNode( + "DataStreamCalc", + streamTableNode(0), + term("select", "string", "int", "rowtime") + ), term("groupBy", "string"), - term("window", - SlidingGroupWindow( - WindowReference("w"), - 'long, - 10.milli, - 5.milli)), + term("window", SlidingGroupWindow(WindowReference("w"), 'rowtime, 10.milli, 5.milli)), term("select", "string", "COUNT(int) AS TMP_0", @@ -776,14 +833,10 @@ class GroupWindowTest extends TableTestBase { val expected = unaryNode( "DataStreamCalc", unaryNode( - "DataStreamAggregate", + "DataStreamGroupWindowAggregate", streamTableNode(0), term("groupBy", "string"), - term("window", - SessionGroupWindow( - WindowReference("w"), - 'long, - 3.milli)), + term("window", SessionGroupWindow(WindowReference("w"), 'long, 3.milli)), term("select", "string", "COUNT(int) AS TMP_1", @@ -810,14 +863,10 @@ class GroupWindowTest extends TableTestBase { val expected = unaryNode( "DataStreamCalc", unaryNode( - "DataStreamAggregate", + "DataStreamGroupWindowAggregate", streamTableNode(0), term("groupBy", "string"), - term("window", - TumblingGroupWindow( - 
WindowReference("w"), - 'long, - 5.millis)), + term("window", TumblingGroupWindow(WindowReference("w"), 'long, 5.millis)), term("select", "string", "SUM(int) AS TMP_0", http://git-wip-us.apache.org/repos/asf/flink/blob/8f78824b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/UnsupportedOpsTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/UnsupportedOpsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/UnsupportedOpsTest.scala index b6a6660..c72249a 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/UnsupportedOpsTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/UnsupportedOpsTest.scala @@ -28,13 +28,6 @@ import org.junit.Test class UnsupportedOpsTest extends StreamingMultipleProgramsTestBase { @Test(expected = classOf[ValidationException]) - def testSelectWithAggregation(): Unit = { - val env = StreamExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env) - StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).select('_1.min) - } - - @Test(expected = classOf[ValidationException]) def testDistinct(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment val tEnv = TableEnvironment.getTableEnvironment(env)