Repository: flink Updated Branches: refs/heads/master 28ab73750 -> f37988c19
http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowTest.scala index 084ee14..de6cbfa 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowTest.scala @@ -20,9 +20,9 @@ package org.apache.flink.table.api.scala.stream.table import org.apache.flink.api.scala._ import org.apache.flink.table.api.java.utils.UserDefinedAggFunctions.WeightedAvgWithMerge +import org.apache.flink.table.api.ValidationException import org.apache.flink.table.api.scala._ -import org.apache.flink.table.api.{TableException, ValidationException} -import org.apache.flink.table.expressions.{RowtimeAttribute, WindowReference} +import org.apache.flink.table.expressions.WindowReference import org.apache.flink.table.plan.logical._ import org.apache.flink.table.utils.TableTestBase import org.apache.flink.table.utils.TableTestUtil.{streamTableNode, term, unaryNode} @@ -40,46 +40,10 @@ class GroupWindowTest extends TableTestBase { .select('string, 'string.start) // property in non windowed table } - @Test(expected = classOf[TableException]) - def testInvalidRowtime1(): Unit = { - val util = streamTestUtil() - // rowtime attribute must not be a field name - util.addTable[(Long, Int, String)]('rowtime, 'long, 'int, 'string) - } - - @Test(expected = classOf[ValidationException]) - def testInvalidRowtime2(): Unit = { - val util = streamTestUtil() - val table = util.addTable[(Long, Int, String)]('long, 'int, 'string) - - table - .select('string, 'int as 'rowtime) // rowtime attribute must not be an alias - } - - @Test(expected = classOf[ValidationException]) - def testInvalidRowtime3(): Unit = { - val util = streamTestUtil() - val table = util.addTable[(Long, Int, String)]('long, 'int, 'string) - - table.as('rowtime, 'myint, 'mystring) // rowtime attribute must not be an alias - } - - @Test(expected = classOf[ValidationException]) - def testInvalidRowtime4(): Unit = { - val util = streamTestUtil() - val table = util.addTable[(Long, Int, String)]('long, 'int, 'string) - - table - // only rowtime is a valid time attribute in a stream environment - .window(Tumble over 50.milli on 'string as 'w) - .groupBy('w, 'string) - .select('string, 'int.count) - } - @Test(expected = classOf[ValidationException]) def testGroupByWithoutWindowAlias(): Unit = { val util = streamTestUtil() - val table = util.addTable[(Long, Int, String)]('long, 'int, 'string) + val table = util.addTable[(Long, Int, String)]('long.rowtime, 'int, 'string) table .window(Tumble over 5.milli on 'long as 'w) @@ -90,7 +54,7 @@ class GroupWindowTest extends TableTestBase { @Test(expected = classOf[ValidationException]) def testInvalidRowTimeRef(): Unit = { val util = streamTestUtil() - val table = util.addTable[(Long, Int, String)]('long, 'int, 'string) + val table = util.addTable[(Long, Int, String)]('long.rowtime, 'int, 'string) table .window(Tumble over 5.milli on 'long as 'w) @@ -104,10 +68,10 @@ class GroupWindowTest extends TableTestBase { @Test(expected = classOf[ValidationException]) def testInvalidTumblingSize(): Unit = { val util = streamTestUtil() - val table = util.addTable[(Long, Int, String)]('long, 'int, 'string) + val table = util.addTable[(Long, Int, String)]('long.rowtime, 'int, 'string) table - .window(Tumble over "WRONG" as 'w) // string is not a valid interval + .window(Tumble over "WRONG" on 'long as 'w) // string is not a valid interval .groupBy('w, 'string) .select('string, 'int.count) } @@ -127,10 +91,10 @@ class GroupWindowTest extends TableTestBase { @Test(expected = classOf[ValidationException]) def testInvalidSlidingSize(): Unit = { val util = streamTestUtil() - val table = util.addTable[(Long, Int, String)]('long, 'int, 'string) + val table = util.addTable[(Long, Int, String)]('long.rowtime, 'int, 'string) table - .window(Slide over "WRONG" every "WRONG" as 'w) // string is not a valid interval + .window(Slide over "WRONG" every "WRONG" on 'long as 'w) // string is not a valid interval .groupBy('w, 'string) .select('string, 'int.count) } @@ -138,10 +102,11 @@ class GroupWindowTest extends TableTestBase { @Test(expected = classOf[ValidationException]) def testInvalidSlidingSlide(): Unit = { val util = streamTestUtil() - val table = util.addTable[(Long, Int, String)]('long, 'int, 'string) + val table = util.addTable[(Long, Int, String)]('long.rowtime, 'int, 'string) table - .window(Slide over 12.rows every 1.minute as 'w) // row and time intervals may not be mixed + // row and time intervals may not be mixed + .window(Slide over 12.rows every 1.minute on 'long as 'w) .groupBy('w, 'string) .select('string, 'int.count) } @@ -161,10 +126,11 @@ class GroupWindowTest extends TableTestBase { @Test(expected = classOf[ValidationException]) def testInvalidSessionGap(): Unit = { val util = streamTestUtil() - val table = util.addTable[(Long, Int, String)]('long, 'int, 'string) + val table = util.addTable[(Long, Int, String)]('long.rowtime, 'int, 'string) table - .window(Session withGap 10.rows as 'w) // row interval is not valid for session windows + // row interval is not valid for session windows + .window(Session withGap 10.rows on 'long as 'w) .groupBy('w, 'string) .select('string, 'int.count) } @@ -172,10 +138,10 @@ class GroupWindowTest extends TableTestBase { @Test(expected = classOf[ValidationException]) def testInvalidWindowAlias1(): Unit = { val util = streamTestUtil() - val table = util.addTable[(Long, Int, String)]('long, 'int, 'string) + val table = util.addTable[(Long, Int, String)]('long.rowtime, 'int, 'string) table - .window(Session withGap 100.milli as 1 + 1) // expression instead of a symbol + .window(Session withGap 100.milli on 'long as 1 + 1) // expression instead of a symbol .groupBy('string) .select('string, 'int.count) } @@ -183,10 +149,11 @@ class GroupWindowTest extends TableTestBase { @Test(expected = classOf[ValidationException]) def testInvalidWindowAlias2(): Unit = { val util = streamTestUtil() - val table = util.addTable[(Long, Int, String)]('long, 'int, 'string) + val table = util.addTable[(Long, Int, String)]('long.rowtime, 'int, 'string) table - .window(Session withGap 100.milli as 'string) // field name "string" is already present + // field name "string" is already present + .window(Session withGap 100.milli on 'long as 'string) .groupBy('string) .select('string, 'int.count) } @@ -195,7 +162,7 @@ class GroupWindowTest extends TableTestBase { def testSessionUdAggWithInvalidArgs(): Unit = { val util = streamTestUtil() val weightedAvg = new WeightedAvgWithMerge - val table = util.addTable[(Long, Int, String)]('long, 'int, 'string) + val table = util.addTable[(Long, Int, String)]('long, 'int, 'string, 'rowtime.rowtime) table .window(Session withGap 2.hours on 'rowtime as 'w) @@ -203,16 +170,17 @@ class GroupWindowTest extends TableTestBase { .select('string, weightedAvg('string, 'int)) // invalid UDAGG args } + @Ignore // TODO @Test def testMultiWindow(): Unit = { val util = streamTestUtil() - val table = util.addTable[(Long, Int, String)]('long, 'int, 'string) + val table = util.addTable[(Long, Int, String)]('long, 'int, 'string, 'proctime.proctime) val windowedTable = table - .window(Tumble over 50.milli as 'w1) + .window(Tumble over 50.milli on 'proctime as 'w1) .groupBy('w1, 'string) - .select('string, 'int.count) - .window(Slide over 20.milli every 10.milli as 'w2) + .select('w.end as 'proctime, 'string, 'int.count) + .window(Slide over 20.milli every 10.milli on 'proctime as 'w2) .groupBy('w2) .select('string.count) @@ -230,8 +198,9 @@ class GroupWindowTest extends TableTestBase { term("groupBy", "string"), term( "window", - ProcessingTimeTumblingGroupWindow( + TumblingGroupWindow( WindowReference("w1"), + 'proctime, 50.milli)), term("select", "string", "COUNT(int) AS TMP_0") ), @@ -239,9 +208,11 @@ class GroupWindowTest extends TableTestBase { ), term( "window", - ProcessingTimeSlidingGroupWindow( + SlidingGroupWindow( WindowReference("w2"), - 20.milli, 10.milli)), + 'proctime, + 20.milli, + 10.milli)), term("select", "COUNT(string) AS TMP_1") ) util.verifyTable(windowedTable, expected) @@ -250,10 +221,10 @@ class GroupWindowTest extends TableTestBase { @Test def testProcessingTimeTumblingGroupWindowOverTime(): Unit = { val util = streamTestUtil() - val table = util.addTable[(Long, Int, String)]('long, 'int, 'string) + val table = util.addTable[(Long, Int, String)]('long, 'int, 'string, 'proctime.proctime) val windowedTable = table - .window(Tumble over 50.milli as 'w) + .window(Tumble over 50.milli on 'proctime as 'w) .groupBy('w, 'string) .select('string, 'int.count) @@ -262,13 +233,14 @@ class GroupWindowTest extends TableTestBase { unaryNode( "DataStreamCalc", streamTableNode(0), - term("select", "string", "int") + term("select", "string", "int", "proctime") ), term("groupBy", "string"), term( "window", - ProcessingTimeTumblingGroupWindow( + TumblingGroupWindow( WindowReference("w"), + 'proctime, 50.milli)), term("select", "string", "COUNT(int) AS TMP_0") ) @@ -279,38 +251,10 @@ class GroupWindowTest extends TableTestBase { @Test def testProcessingTimeTumblingGroupWindowOverCount(): Unit = { val util = streamTestUtil() - val table = util.addTable[(Long, Int, String)]('long, 'int, 'string) - - val windowedTable = table - .window(Tumble over 2.rows as 'w) - .groupBy('w, 'string) - .select('string, 'int.count) - - val expected = unaryNode( - "DataStreamAggregate", - unaryNode( - "DataStreamCalc", - streamTableNode(0), - term("select", "string", "int") - ), - term("groupBy", "string"), - term( - "window", - ProcessingTimeTumblingGroupWindow( - WindowReference("w"), 2.rows)), - term("select", "string", "COUNT(int) AS TMP_0") - ) - - util.verifyTable(windowedTable, expected) - } - - @Test - def testEventTimeTumblingGroupWindowOverTime(): Unit = { - val util = streamTestUtil() - val table = util.addTable[(Long, Int, String)]('long, 'int, 'string) + val table = util.addTable[(Long, Int, String)]('long, 'int, 'string, 'proctime.proctime) val windowedTable = table - .window(Tumble over 5.milli on 'rowtime as 'w) + .window(Tumble over 2.rows on 'proctime as 'w) .groupBy('w, 'string) .select('string, 'int.count) @@ -319,15 +263,15 @@ class GroupWindowTest extends TableTestBase { unaryNode( "DataStreamCalc", streamTableNode(0), - term("select", "string", "int") + term("select", "string", "int", "proctime") ), term("groupBy", "string"), term( "window", - EventTimeTumblingGroupWindow( + TumblingGroupWindow( WindowReference("w"), - RowtimeAttribute(), - 5.milli)), + 'proctime, + 2.rows)), term("select", "string", "COUNT(int) AS TMP_0") ) @@ -335,13 +279,12 @@ class GroupWindowTest extends TableTestBase { } @Test - @Ignore // see comments in DataStreamAggregate - def testEventTimeTumblingGroupWindowOverCount(): Unit = { + def testEventTimeTumblingGroupWindowOverTime(): Unit = { val util = streamTestUtil() - val table = util.addTable[(Long, Int, String)]('long, 'int, 'string) + val table = util.addTable[(Long, Int, String)]('long.rowtime, 'int, 'string) val windowedTable = table - .window(Tumble over 2.rows on 'rowtime as 'w) + .window(Tumble over 5.milli on 'long as 'w) .groupBy('w, 'string) .select('string, 'int.count) @@ -351,9 +294,10 @@ class GroupWindowTest extends TableTestBase { term("groupBy", "string"), term( "window", - EventTimeTumblingGroupWindow( + TumblingGroupWindow( WindowReference("w"), - RowtimeAttribute(), 2.rows)), + 'long, + 5.milli)), term("select", "string", "COUNT(int) AS TMP_0") ) @@ -363,7 +307,7 @@ class GroupWindowTest extends TableTestBase { @Test def testEventTimeTumblingGroupWindowWithUdAgg(): Unit = { val util = streamTestUtil() - val table = util.addTable[(Long, Int, String)]('long, 'int, 'string) + val table = util.addTable[(Long, Int, String)]('long, 'int, 'string, 'rowtime.rowtime) val weightedAvg = new WeightedAvgWithMerge @@ -378,9 +322,9 @@ class GroupWindowTest extends TableTestBase { term("groupBy", "string"), term( "window", - EventTimeTumblingGroupWindow( + TumblingGroupWindow( WindowReference("w"), - RowtimeAttribute(), + 'rowtime, 5.milli)), term("select", "string", "WeightedAvgWithMerge(long, int) AS TMP_0") ) @@ -391,10 +335,10 @@ class GroupWindowTest extends TableTestBase { @Test def testProcessingTimeSlidingGroupWindowOverTime(): Unit = { val util = streamTestUtil() - val table = util.addTable[(Long, Int, String)]('long, 'int, 'string) + val table = util.addTable[(Long, Int, String)]('long, 'int, 'string, 'proctime.proctime) val windowedTable = table - .window(Slide over 50.milli every 50.milli as 'w) + .window(Slide over 50.milli every 50.milli on 'proctime as 'w) .groupBy('w, 'string) .select('string, 'int.count) @@ -403,14 +347,16 @@ class GroupWindowTest extends TableTestBase { unaryNode( "DataStreamCalc", streamTableNode(0), - term("select", "string", "int") + term("select", "string", "int", "proctime") ), term("groupBy", "string"), term( "window", - ProcessingTimeSlidingGroupWindow( + SlidingGroupWindow( WindowReference("w"), - 50.milli, 50.milli)), + 'proctime, + 50.milli, + 50.milli)), term("select", "string", "COUNT(int) AS TMP_0") ) @@ -420,10 +366,10 @@ class GroupWindowTest extends TableTestBase { @Test def testProcessingTimeSlidingGroupWindowOverCount(): Unit = { val util = streamTestUtil() - val table = util.addTable[(Long, Int, String)]('long, 'int, 'string) + val table = util.addTable[(Long, Int, String)]('long, 'int, 'string, 'proctime.proctime) val windowedTable = table - .window(Slide over 2.rows every 1.rows as 'w) + .window(Slide over 2.rows every 1.rows on 'proctime as 'w) .groupBy('w, 'string) .select('string, 'int.count) @@ -432,14 +378,16 @@ class GroupWindowTest extends TableTestBase { unaryNode( "DataStreamCalc", streamTableNode(0), - term("select", "string", "int") + term("select", "string", "int", "proctime") ), term("groupBy", "string"), term( "window", - ProcessingTimeSlidingGroupWindow( + SlidingGroupWindow( WindowReference("w"), - 2.rows, 1.rows)), + 'proctime, + 2.rows, + 1.rows)), term("select", "string", "COUNT(int) AS TMP_0") ) @@ -449,40 +397,10 @@ class GroupWindowTest extends TableTestBase { @Test def testEventTimeSlidingGroupWindowOverTime(): Unit = { val util = streamTestUtil() - val table = util.addTable[(Long, Int, String)]('long, 'int, 'string) - - val windowedTable = table - .window(Slide over 8.milli every 10.milli on 'rowtime as 'w) - .groupBy('w, 'string) - .select('string, 'int.count) - - val expected = unaryNode( - "DataStreamAggregate", - unaryNode( - "DataStreamCalc", - streamTableNode(0), - term("select", "string", "int") - ), - term("groupBy", "string"), - term( - "window", - EventTimeSlidingGroupWindow( - WindowReference("w"), - RowtimeAttribute(), 8.milli, 10.milli)), - term("select", "string", "COUNT(int) AS TMP_0") - ) - - util.verifyTable(windowedTable, expected) - } - - @Test - @Ignore // see comments in DataStreamAggregate - def testEventTimeSlidingGroupWindowOverCount(): Unit = { - val util = streamTestUtil() - val table = util.addTable[(Long, Int, String)]('long, 'int, 'string) + val table = util.addTable[(Long, Int, String)]('long.rowtime, 'int, 'string) val windowedTable = table - .window(Slide over 2.rows every 1.rows on 'rowtime as 'w) + .window(Slide over 8.milli every 10.milli on 'long as 'w) .groupBy('w, 'string) .select('string, 'int.count) @@ -492,9 +410,11 @@ class GroupWindowTest extends TableTestBase { term("groupBy", "string"), term( "window", - EventTimeSlidingGroupWindow( + SlidingGroupWindow( WindowReference("w"), - RowtimeAttribute(), 2.rows, 1.rows)), + 'long, + 8.milli, + 10.milli)), term("select", "string", "COUNT(int) AS TMP_0") ) @@ -504,7 +424,7 @@ class GroupWindowTest extends TableTestBase { @Test def testEventTimeSlidingGroupWindowWithUdAgg(): Unit = { val util = streamTestUtil() - val table = util.addTable[(Long, Int, String)]('long, 'int, 'string) + val table = util.addTable[(Long, Int, String)]('long, 'int, 'string, 'rowtime.rowtime) val weightedAvg = new WeightedAvgWithMerge @@ -519,9 +439,7 @@ class GroupWindowTest extends TableTestBase { term("groupBy", "string"), term( "window", - EventTimeSlidingGroupWindow( - WindowReference("w"), - RowtimeAttribute(), 8.milli, 10.milli)), + SlidingGroupWindow(WindowReference("w"), 'rowtime, 8.milli, 10.milli)), term("select", "string", "WeightedAvgWithMerge(long, int) AS TMP_0") ) @@ -531,26 +449,23 @@ class GroupWindowTest extends TableTestBase { @Test def testEventTimeSessionGroupWindowOverTime(): Unit = { val util = streamTestUtil() - val table = util.addTable[(Long, Int, String)]('long, 'int, 'string) + val table = util.addTable[(Long, Int, String)]('long.rowtime, 'int, 'string) val windowedTable = table - .window(Session withGap 7.milli on 'rowtime as 'w) + .window(Session withGap 7.milli on 'long as 'w) .groupBy('w, 'string) .select('string, 'int.count) val expected = unaryNode( "DataStreamAggregate", - unaryNode( - "DataStreamCalc", - streamTableNode(0), - term("select", "string", "int") - ), + streamTableNode(0), term("groupBy", "string"), term( "window", - EventTimeSessionGroupWindow( + SessionGroupWindow( WindowReference("w"), - RowtimeAttribute(), 7.milli)), + 'long, + 7.milli)), term("select", "string", "COUNT(int) AS TMP_0") ) @@ -560,7 +475,7 @@ class GroupWindowTest extends TableTestBase { @Test def testEventTimeSessionGroupWindowWithUdAgg(): Unit = { val util = streamTestUtil() - val table = util.addTable[(Long, Int, String)]('long, 'int, 'string) + val table = util.addTable[(Long, Int, String)]('long, 'int, 'string, 'rowtime.rowtime) val weightedAvg = new WeightedAvgWithMerge @@ -575,9 +490,7 @@ class GroupWindowTest extends TableTestBase { term("groupBy", "string"), term( "window", - EventTimeSessionGroupWindow( - WindowReference("w"), - RowtimeAttribute(), 7.milli)), + SessionGroupWindow(WindowReference("w"), 'rowtime, 7.milli)), term("select", "string", "WeightedAvgWithMerge(long, int) AS TMP_0") ) @@ -587,10 +500,10 @@ class GroupWindowTest extends TableTestBase { @Test def testAllProcessingTimeTumblingGroupWindowOverTime(): Unit = { val util = streamTestUtil() - val table = util.addTable[(Long, Int, String)]('long, 'int, 'string) + val table = util.addTable[(Long, Int, String)]('long, 'int, 'string, 'proctime.proctime) val windowedTable = table - .window(Tumble over 50.milli as 'w) + .window(Tumble over 50.milli on 'proctime as 'w) .groupBy('w, 'string) .select('string, 'int.count) @@ -599,13 +512,14 @@ class GroupWindowTest extends TableTestBase { unaryNode( "DataStreamCalc", streamTableNode(0), - term("select", "string", "int") + term("select", "string", "int", "proctime") ), term("groupBy", "string"), term( "window", - ProcessingTimeTumblingGroupWindow( + TumblingGroupWindow( WindowReference("w"), + 'proctime, 50.milli)), term("select", "string", "COUNT(int) AS TMP_0") ) @@ -616,10 +530,10 @@ class GroupWindowTest extends TableTestBase { @Test def testAllProcessingTimeTumblingGroupWindowOverCount(): Unit = { val util = streamTestUtil() - val table = util.addTable[(Long, Int, String)]('long, 'int, 'string) + val table = util.addTable[(Long, Int, String)]('long, 'int, 'string, 'proctime.proctime) val windowedTable = table - .window(Tumble over 2.rows as 'w) + .window(Tumble over 2.rows on 'proctime as 'w) .groupBy('w) .select('int.count) @@ -628,12 +542,13 @@ class GroupWindowTest extends TableTestBase { unaryNode( "DataStreamCalc", streamTableNode(0), - term("select", "int") + term("select", "int", "proctime") ), term( "window", - ProcessingTimeTumblingGroupWindow( + TumblingGroupWindow( WindowReference("w"), + 'proctime, 2.rows)), term("select", "COUNT(int) AS TMP_0") ) @@ -644,39 +559,10 @@ class GroupWindowTest extends TableTestBase { @Test def testAllEventTimeTumblingGroupWindowOverTime(): Unit = { val util = streamTestUtil() - val table = util.addTable[(Long, Int, String)]('long, 'int, 'string) - - val windowedTable = table - .window(Tumble over 5.milli on 'rowtime as 'w) - .groupBy('w) - .select('int.count) - - val expected = unaryNode( - "DataStreamAggregate", - unaryNode( - "DataStreamCalc", - streamTableNode(0), - term("select", "int") - ), - term( - "window", - EventTimeTumblingGroupWindow( - WindowReference("w"), - RowtimeAttribute(), 5.milli)), - term("select", "COUNT(int) AS TMP_0") - ) - - util.verifyTable(windowedTable, expected) - } - - @Test - @Ignore // see comments in DataStreamAggregate - def testAllEventTimeTumblingGroupWindowOverCount(): Unit = { - val util = streamTestUtil() - val table = util.addTable[(Long, Int, String)]('long, 'int, 'string) + val table = util.addTable[(Long, Int, String)]('long.rowtime, 'int, 'string) val windowedTable = table - .window(Tumble over 2.rows on 'rowtime as 'w) + .window(Tumble over 5.milli on 'long as 'w) .groupBy('w) .select('int.count) @@ -685,27 +571,27 @@ class GroupWindowTest extends TableTestBase { unaryNode( "DataStreamCalc", streamTableNode(0), - term("select", "int") + term("select", "int", "long") ), term( "window", - EventTimeTumblingGroupWindow( + TumblingGroupWindow( WindowReference("w"), - RowtimeAttribute(), 2.rows)), + 'long, + 5.milli)), term("select", "COUNT(int) AS TMP_0") ) util.verifyTable(windowedTable, expected) } - @Test def testAllProcessingTimeSlidingGroupWindowOverTime(): Unit = { val util = streamTestUtil() - val table = util.addTable[(Long, Int, String)]('long, 'int, 'string) + val table = util.addTable[(Long, Int, String)]('long, 'int, 'string, 'proctime.proctime) val windowedTable = table - .window(Slide over 50.milli every 50.milli as 'w) + .window(Slide over 50.milli every 50.milli on 'proctime as 'w) .groupBy('w) .select('int.count) @@ -714,13 +600,15 @@ class GroupWindowTest extends TableTestBase { unaryNode( "DataStreamCalc", streamTableNode(0), - term("select", "int") + term("select", "int", "proctime") ), term( "window", - ProcessingTimeSlidingGroupWindow( + SlidingGroupWindow( WindowReference("w"), - 50.milli, 50.milli)), + 'proctime, + 50.milli, + 50.milli)), term("select", "COUNT(int) AS TMP_0") ) @@ -730,10 +618,10 @@ class GroupWindowTest extends TableTestBase { @Test def testAllProcessingTimeSlidingGroupWindowOverCount(): Unit = { val util = streamTestUtil() - val table = util.addTable[(Long, Int, String)]('long, 'int, 'string) + val table = util.addTable[(Long, Int, String)]('long, 'int, 'string, 'proctime.proctime) val windowedTable = table - .window(Slide over 2.rows every 1.rows as 'w) + .window(Slide over 2.rows every 1.rows on 'proctime as 'w) .groupBy('w) .select('int.count) @@ -742,13 +630,15 @@ class GroupWindowTest extends TableTestBase { unaryNode( "DataStreamCalc", streamTableNode(0), - term("select", "int") + term("select", "int", "proctime") ), term( "window", - ProcessingTimeSlidingGroupWindow( + SlidingGroupWindow( WindowReference("w"), - 2.rows, 1.rows)), + 'proctime, + 2.rows, + 1.rows)), term("select", "COUNT(int) AS TMP_0") ) @@ -758,39 +648,10 @@ class GroupWindowTest extends TableTestBase { @Test def testAllEventTimeSlidingGroupWindowOverTime(): Unit = { val util = streamTestUtil() - val table = util.addTable[(Long, Int, String)]('long, 'int, 'string) - - val windowedTable = table - .window(Slide over 8.milli every 10.milli on 'rowtime as 'w) - .groupBy('w) - .select('int.count) - - val expected = unaryNode( - "DataStreamAggregate", - unaryNode( - "DataStreamCalc", - streamTableNode(0), - term("select", "int") - ), - term( - "window", - EventTimeSlidingGroupWindow( - WindowReference("w"), - RowtimeAttribute(), 8.milli, 10.milli)), - term("select", "COUNT(int) AS TMP_0") - ) - - util.verifyTable(windowedTable, expected) - } - - @Test - @Ignore // see comments in DataStreamAggregate - def testAllEventTimeSlidingGroupWindowOverCount(): Unit = { - val util = streamTestUtil() - val table = util.addTable[(Long, Int, String)]('long, 'int, 'string) + val table = util.addTable[(Long, Int, String)]('long.rowtime, 'int, 'string) val windowedTable = table - .window(Slide over 2.rows every 1.rows on 'rowtime as 'w) + .window(Slide over 8.milli every 10.milli on 'long as 'w) .groupBy('w) .select('int.count) @@ -799,13 +660,15 @@ class GroupWindowTest extends TableTestBase { unaryNode( "DataStreamCalc", streamTableNode(0), - term("select", "int") + term("select", "int", "long") ), term( "window", - EventTimeSlidingGroupWindow( + SlidingGroupWindow( WindowReference("w"), - RowtimeAttribute(), 2.rows, 1.rows)), + 'long, + 8.milli, + 10.milli)), term("select", "COUNT(int) AS TMP_0") ) @@ -815,10 +678,10 @@ class GroupWindowTest extends TableTestBase { @Test def testAllEventTimeSessionGroupWindowOverTime(): Unit = { val util = streamTestUtil() - val table = util.addTable[(Long, Int, String)]('long, 'int, 'string) + val table = util.addTable[(Long, Int, String)]('long.rowtime, 'int, 'string) val windowedTable = table - .window(Session withGap 7.milli on 'rowtime as 'w) + .window(Session withGap 7.milli on 'long as 'w) .groupBy('w) .select('int.count) @@ -827,13 +690,14 @@ class GroupWindowTest extends TableTestBase { unaryNode( "DataStreamCalc", streamTableNode(0), - term("select", "int") + term("select", "int", "long") ), term( "window", - EventTimeSessionGroupWindow( + SessionGroupWindow( WindowReference("w"), - RowtimeAttribute(), 7.milli)), + 'long, + 7.milli)), term("select", "COUNT(int) AS TMP_0") ) @@ -843,25 +707,21 @@ class GroupWindowTest extends TableTestBase { @Test def testTumbleWindowStartEnd(): Unit = { val util = streamTestUtil() - val table = util.addTable[(Long, Int, String)]('long, 'int, 'string) + val table = util.addTable[(Long, Int, String)]('long.rowtime, 'int, 'string) val windowedTable = table - .window(Tumble over 5.milli on 'rowtime as 'w) + .window(Tumble over 5.milli on 'long as 'w) .groupBy('w, 'string) .select('string, 'int.count, 'w.start, 'w.end) val expected = unaryNode( "DataStreamAggregate", - unaryNode( - "DataStreamCalc", - streamTableNode(0), - term("select", "string", "int") - ), + streamTableNode(0), term("groupBy", "string"), term("window", - EventTimeTumblingGroupWindow( + TumblingGroupWindow( WindowReference("w"), - RowtimeAttribute(), + 'long, 5.milli)), term("select", "string", @@ -876,25 +736,21 @@ class GroupWindowTest extends TableTestBase { @Test def testSlideWindowStartEnd(): Unit = { val util = streamTestUtil() - val table = util.addTable[(Long, Int, String)]('long, 'int, 'string) + val table = util.addTable[(Long, Int, String)]('long.rowtime, 'int, 'string) val windowedTable = table - .window(Slide over 10.milli every 5.milli on 'rowtime as 'w) + .window(Slide over 10.milli every 5.milli on 'long as 'w) .groupBy('w, 'string) .select('string, 'int.count, 'w.start, 'w.end) val expected = unaryNode( "DataStreamAggregate", - unaryNode( - "DataStreamCalc", - streamTableNode(0), - term("select", "string", "int") - ), + streamTableNode(0), term("groupBy", "string"), term("window", - EventTimeSlidingGroupWindow( + SlidingGroupWindow( WindowReference("w"), - RowtimeAttribute(), + 'long, 10.milli, 5.milli)), term("select", @@ -910,10 +766,10 @@ class GroupWindowTest extends TableTestBase { @Test def testSessionWindowStartWithTwoEnd(): Unit = { val util = streamTestUtil() - val table = util.addTable[(Long, Int, String)]('long, 'int, 'string) + val table = util.addTable[(Long, Int, String)]('long.rowtime, 'int, 'string) val windowedTable = table - .window(Session withGap 3.milli on 'rowtime as 'w) + .window(Session withGap 3.milli on 'long as 'w) .groupBy('w, 'string) .select('w.end as 'we1, 'string, 'int.count as 'cnt, 'w.start as 'ws, 'w.end as 'we2) @@ -921,16 +777,12 @@ class GroupWindowTest extends TableTestBase { "DataStreamCalc", unaryNode( "DataStreamAggregate", - unaryNode( - "DataStreamCalc", - streamTableNode(0), - term("select", "string", "int") - ), + streamTableNode(0), term("groupBy", "string"), term("window", - EventTimeSessionGroupWindow( + SessionGroupWindow( WindowReference("w"), - RowtimeAttribute(), + 'long, 3.milli)), term("select", "string", @@ -947,10 +799,10 @@ class GroupWindowTest extends TableTestBase { @Test def testTumbleWindowWithDuplicateAggsAndProps(): Unit = { val util = streamTestUtil() - val table = util.addTable[(Long, Int, String)]('long, 'int, 'string) + val table = util.addTable[(Long, Int, String)]('long.rowtime, 'int, 'string) val windowedTable = table - .window(Tumble over 5.millis on 'rowtime as 'w) + .window(Tumble over 5.millis on 'long as 'w) .groupBy('w, 'string) .select('string, 'int.sum + 1 as 's1, 'int.sum + 3 as 's2, 'w.start as 'x, 'w.start as 'x2, 'w.end as 'x3, 'w.end) @@ -959,16 +811,12 @@ class GroupWindowTest extends TableTestBase { "DataStreamCalc", unaryNode( "DataStreamAggregate", - unaryNode( - "DataStreamCalc", - streamTableNode(0), - term("select", "string", "int") - ), + streamTableNode(0), term("groupBy", "string"), term("window", - EventTimeTumblingGroupWindow( + TumblingGroupWindow( WindowReference("w"), - RowtimeAttribute(), + 'long, 5.millis)), term("select", "string", http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/OverWindowITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/OverWindowITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/OverWindowITCase.scala index 4c0fea7..b097767 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/OverWindowITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/OverWindowITCase.scala @@ -58,7 +58,7 @@ class OverWindowITCase extends StreamingWithStateTestBase { StreamITCase.testResults = mutable.MutableList() StreamITCase.clear val stream = env.fromCollection(data) - val table = stream.toTable(tEnv, 'a, 'b, 'c) + val table = stream.toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime) val countFun = new CountAggFunction val weightAvgFun = new WeightedAvg @@ -107,7 +107,7 @@ class OverWindowITCase extends StreamingWithStateTestBase { ) val table = env .addSource(new RowTimeSourceFunction[(Int, Long, String)](data)) - .toTable(tEnv).as('a, 'b, 'c) + .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime) val countFun = new CountAggFunction val weightAvgFun = new WeightedAvg @@ -173,7 +173,7 @@ class OverWindowITCase extends StreamingWithStateTestBase { StreamITCase.testResults = mutable.MutableList() val stream = env.fromCollection(data) - val table = stream.toTable(tEnv).as('a, 'b, 'c, 'd, 'e) + val table = stream.toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime) val windowedTable = table .window(Over partitionBy 'a orderBy 'proctime preceding 4.rows following CURRENT_ROW as 'w) @@ -234,7 +234,8 @@ class OverWindowITCase extends StreamingWithStateTestBase { StreamITCase.clear val table = env.addSource[(Long, Int, String)]( - new RowTimeSourceFunction[(Long, Int, String)](data)).toTable(tEnv).as('a, 'b, 'c) + new RowTimeSourceFunction[(Long, Int, String)](data)) + .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime) val windowedTable = table .window(Over partitionBy 'c orderBy 'rowtime preceding 2.rows following CURRENT_ROW as 'w) @@ -295,7 +296,8 @@ class OverWindowITCase extends StreamingWithStateTestBase { StreamITCase.clear val table = env.addSource[(Long, Int, String)]( - new RowTimeSourceFunction[(Long, Int, String)](data)).toTable(tEnv).as('a, 'b, 'c) + new RowTimeSourceFunction[(Long, Int, String)](data)) + .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime) val windowedTable = table .window( http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/OverWindowTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/OverWindowTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/OverWindowTest.scala index 7dea521..ea3ab22 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/OverWindowTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/OverWindowTest.scala @@ -18,7 +18,7 @@ package org.apache.flink.table.api.scala.stream.table import org.apache.flink.api.scala._ -import org.apache.flink.table.api.java.utils.UserDefinedAggFunctions.{WeightedAvgWithRetract} +import org.apache.flink.table.api.java.utils.UserDefinedAggFunctions.WeightedAvgWithRetract import org.apache.flink.table.api.{Table, ValidationException} import org.apache.flink.table.api.scala._ import org.apache.flink.table.utils.TableTestUtil._ @@ -27,7 +27,8 @@ import org.junit.Test class OverWindowTest extends TableTestBase { private val streamUtil: StreamTableTestUtil = streamTestUtil() - val table: Table = streamUtil.addTable[(Int, String, Long)]("MyTable", 'a, 'b, 'c) + val table: Table = streamUtil.addTable[(Int, String, Long)]("MyTable", + 'a, 'b, 'c, 'proctime.proctime, 'rowtime.rowtime) @Test(expected = classOf[ValidationException]) def testInvalidWindowAlias(): Unit = { @@ -121,12 +122,12 @@ class OverWindowTest extends TableTestBase { unaryNode( "DataStreamCalc", streamTableNode(0), - term("select", "a", "b", "c", "PROCTIME() AS $3") + term("select", "a", "b", "c", "proctime") ), term("partitionBy", "b"), - term("orderBy", "PROCTIME"), + term("orderBy", "proctime"), term("rows", "BETWEEN 2 PRECEDING AND CURRENT ROW"), - term("select", "a", "b", "c", "PROCTIME", "WeightedAvgWithRetract(c, a) AS w0$o0") + term("select", "a", "b", "c", "proctime", "WeightedAvgWithRetract(c, a) AS w0$o0") ), term("select", "c", "w0$o0 AS _c1") ) @@ -150,16 +151,16 @@ class OverWindowTest extends TableTestBase { unaryNode( "DataStreamCalc", streamTableNode(0), - term("select", "a", "c", "PROCTIME() AS $2") + term("select", "a", "c", "proctime") ), term("partitionBy", "a"), - term("orderBy", "PROCTIME"), + term("orderBy", "proctime"), term("range", "BETWEEN 7200000 PRECEDING AND CURRENT ROW"), term( "select", "a", "c", - "PROCTIME", + "proctime", "WeightedAvgWithRetract(c, a) AS w0$o0" ) ), @@ -183,11 +184,11 @@ class OverWindowTest extends TableTestBase { unaryNode( "DataStreamCalc", streamTableNode(0), - term("select", "a", "c", "PROCTIME() AS $2") + term("select", "a", "c", "proctime") ), - term("orderBy", "PROCTIME"), + term("orderBy", "proctime"), term("range", "BETWEEN 10000 PRECEDING AND CURRENT ROW"), - term("select", "a", "c", "PROCTIME", "COUNT(c) AS w0$o0") + term("select", "a", "c", "proctime", "COUNT(c) AS w0$o0") ), term("select", "a", "w0$o0 AS _c1") ) @@ -209,11 +210,11 @@ class OverWindowTest extends TableTestBase { unaryNode( "DataStreamCalc", streamTableNode(0), - term("select", "a", "c", "PROCTIME() AS $2") + term("select", "a", "c", "proctime") ), - term("orderBy", "PROCTIME"), + term("orderBy", "proctime"), term("rows", "BETWEEN 2 PRECEDING AND CURRENT ROW"), - term("select", "a", "c", "PROCTIME", "COUNT(a) AS w0$o0") + term("select", "a", "c", "proctime", "COUNT(a) AS w0$o0") ), term("select", "c", "w0$o0 AS _c1") ) @@ -238,16 +239,16 @@ class OverWindowTest extends TableTestBase { unaryNode( "DataStreamCalc", streamTableNode(0), - term("select", "a", "c", "PROCTIME() AS $2") + term("select", "a", "c", "proctime") ), term("partitionBy", "c"), - term("orderBy", "PROCTIME"), + term("orderBy", "proctime"), term("range", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"), term( "select", "a", "c", - "PROCTIME", + "proctime", "COUNT(a) AS w0$o0", "WeightedAvgWithRetract(c, a) AS w0$o1" ) @@ -280,12 +281,12 @@ class OverWindowTest extends TableTestBase { unaryNode( "DataStreamCalc", streamTableNode(0), - term("select", "a", "c", "PROCTIME() AS $2") + term("select", "a", "c", "proctime") ), term("partitionBy", "c"), - term("orderBy", "PROCTIME"), + term("orderBy", "proctime"), term("rows", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"), - term("select", "a", "c", "PROCTIME", + term("select", "a", "c", "proctime", "COUNT(a) AS w0$o0", "WeightedAvgWithRetract(c, a) AS w0$o1") ), @@ -310,15 +311,15 @@ class OverWindowTest extends TableTestBase { unaryNode( "DataStreamCalc", streamTableNode(0), - term("select", "a", "c", "PROCTIME() AS $2") + term("select", "a", "c", "proctime") ), - term("orderBy", "PROCTIME"), + term("orderBy", "proctime"), term("range", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"), term( "select", "a", "c", - "PROCTIME", + "proctime", "COUNT(a) AS w0$o0", "SUM(a) AS w0$o1" ) @@ -349,11 +350,11 @@ class OverWindowTest extends TableTestBase { unaryNode( "DataStreamCalc", streamTableNode(0), - term("select", "a", "c", "PROCTIME() AS $2") + term("select", "a", "c", "proctime") ), - term("orderBy", "PROCTIME"), + term("orderBy", "proctime"), term("rows", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"), - term("select", "a", "c", "PROCTIME", "COUNT(a) AS w0$o0") + term("select", "a", "c", "proctime", "COUNT(a) AS w0$o0") ), term("select", "c", "w0$o0 AS _c1") ) @@ -378,12 +379,12 @@ class OverWindowTest extends TableTestBase { unaryNode( "DataStreamCalc", streamTableNode(0), - term("select", "a", "b", "c", "ROWTIME() AS $3") + term("select", "a", "b", "c", "rowtime") ), term("partitionBy", "b"), - term("orderBy", "ROWTIME"), + term("orderBy", "rowtime"), term("rows", "BETWEEN 2 PRECEDING AND CURRENT ROW"), - term("select", "a", "b", "c", "ROWTIME", + term("select", "a", "b", "c", "rowtime", "COUNT(b) AS w0$o0", "WeightedAvgWithRetract(c, a) AS w0$o1") ), @@ -410,16 +411,16 @@ class OverWindowTest extends TableTestBase { unaryNode( "DataStreamCalc", streamTableNode(0), - term("select", "a", "c", "ROWTIME() AS $2") + term("select", "a", "c", "rowtime") ), term("partitionBy", "a"), - term("orderBy", "ROWTIME"), + term("orderBy", "rowtime"), term("range", "BETWEEN 7200000 PRECEDING AND CURRENT ROW"), term( "select", "a", "c", - "ROWTIME", + "rowtime", "AVG(c) AS w0$o0", "WeightedAvgWithRetract(c, a) AS w0$o1" ) @@ -444,11 +445,11 @@ class OverWindowTest extends TableTestBase { unaryNode( "DataStreamCalc", streamTableNode(0), - term("select", "a", "c", "ROWTIME() AS $2") + term("select", "a", "c", "rowtime") ), - term("orderBy", "ROWTIME"), + term("orderBy", "rowtime"), term("range", "BETWEEN 10000 PRECEDING AND CURRENT ROW"), - term("select", "a", "c", "ROWTIME", "COUNT(c) AS w0$o0") + term("select", "a", "c", "rowtime", "COUNT(c) AS w0$o0") ), term("select", "a", "w0$o0 AS _c1") ) @@ -470,11 +471,11 @@ class OverWindowTest extends TableTestBase { unaryNode( "DataStreamCalc", streamTableNode(0), - term("select", "a", "c", "ROWTIME() AS $2") + term("select", "a", "c", "rowtime") ), - term("orderBy", "ROWTIME"), + term("orderBy", "rowtime"), term("rows", "BETWEEN 2 PRECEDING AND CURRENT ROW"), - term("select", "a", "c", "ROWTIME", "COUNT(a) AS w0$o0") + term("select", "a", "c", "rowtime", "COUNT(a) AS w0$o0") ), term("select", "c", "w0$o0 AS _c1") ) @@ -499,16 +500,16 @@ class OverWindowTest extends TableTestBase { unaryNode( "DataStreamCalc", streamTableNode(0), - term("select", "a", "c", "ROWTIME() AS $2") + term("select", "a", "c", "rowtime") ), term("partitionBy", "c"), - term("orderBy", "ROWTIME"), + term("orderBy", "rowtime"), term("range", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"), term( "select", "a", "c", - "ROWTIME", + "rowtime", "COUNT(a) AS w0$o0", "WeightedAvgWithRetract(c, a) AS w0$o1" ) @@ -542,12 +543,12 @@ class OverWindowTest extends TableTestBase { unaryNode( "DataStreamCalc", streamTableNode(0), - term("select", "a", "c", "ROWTIME() AS $2") + term("select", "a", "c", "rowtime") ), term("partitionBy", "c"), - term("orderBy", "ROWTIME"), + term("orderBy", "rowtime"), term("rows", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"), - term("select", "a", "c", "ROWTIME", + term("select", "a", "c", "rowtime", "COUNT(a) AS w0$o0", "WeightedAvgWithRetract(c, a) AS w0$o1") ), @@ -572,15 +573,15 @@ class OverWindowTest extends TableTestBase { unaryNode( "DataStreamCalc", streamTableNode(0), - term("select", "a", "c", "ROWTIME() AS $2") + term("select", "a", "c", "rowtime") ), - term("orderBy", "ROWTIME"), + term("orderBy", "rowtime"), term("range", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"), term( "select", "a", "c", - "ROWTIME", + "rowtime", "COUNT(a) AS w0$o0", "SUM(a) AS w0$o1" ) @@ -611,11 +612,11 @@ class OverWindowTest extends TableTestBase { unaryNode( "DataStreamCalc", streamTableNode(0), - term("select", "a", "c", "ROWTIME() AS $2") + term("select", "a", "c", "rowtime") ), - term("orderBy", "ROWTIME"), + term("orderBy", "rowtime"), term("rows", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"), - term("select", "a", "c", "ROWTIME", "COUNT(a) AS w0$o0") + term("select", "a", "c", "rowtime", "COUNT(a) AS w0$o0") ), term("select", "c", "w0$o0 AS _c1") ) http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/stringexpr/GroupWindowStringExpressionTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/stringexpr/GroupWindowStringExpressionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/stringexpr/GroupWindowStringExpressionTest.scala index d314c9a..d261e36 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/stringexpr/GroupWindowStringExpressionTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/stringexpr/GroupWindowStringExpressionTest.scala @@ -31,7 +31,7 @@ class GroupWindowStringExpressionTest extends TableTestBase { @Test def testJavaScalaTableAPIEquality(): Unit = { val util = streamTestUtil() - val t = util.addTable[(Int, Long, String)]('int, 'long, 'string) + val t = util.addTable[(Int, Long, String)]('int, 'long, 'string, 'rowtime.rowtime) val myCountFun = new CountAggFunction util.tEnv.registerFunction("myCountFun", myCountFun) @@ -40,7 +40,7 @@ class GroupWindowStringExpressionTest extends TableTestBase { // Expression / Scala API val resScala = t - .window(Slide over 4.rows every 2.rows as 'w) + .window(Slide over 4.hours every 2.hours on 'rowtime as 'w) .groupBy('w, 'string) .select( 'string, @@ -51,7 +51,7 @@ class GroupWindowStringExpressionTest extends TableTestBase { // String / Java API val resJava = t - .window(JSlide.over("4.rows").every("2.rows").as("w")) + .window(JSlide.over("4.hours").every("2.hours").on("rowtime").as("w")) .groupBy("w, string") .select( "string, " + http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/stringexpr/OverWindowStringExpressionTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/stringexpr/OverWindowStringExpressionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/stringexpr/OverWindowStringExpressionTest.scala index 0a5e001..04016f1 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/stringexpr/OverWindowStringExpressionTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/stringexpr/OverWindowStringExpressionTest.scala @@ -29,7 +29,7 @@ class OverWindowStringExpressionTest extends TableTestBase { @Test def testPartitionedUnboundedOverRow(): Unit = { val util = streamTestUtil() - val t = util.addTable[(Long, Int, String, Int, Long)]('a, 'b, 'c, 'd, 'e) + val t = util.addTable[(Long, Int, String, Int, Long)]('a, 'b, 'c, 'd, 'e, 'rowtime.rowtime) val resScala = t .window(SOver partitionBy 'a orderBy 'rowtime preceding UNBOUNDED_ROW as 'w) @@ -44,7 +44,7 @@ class OverWindowStringExpressionTest extends TableTestBase { @Test def testUnboundedOverRow(): Unit = { val util = streamTestUtil() - val t = util.addTable[(Long, Int, String, Int, Long)]('a, 'b, 'c, 'd, 'e) + val t = util.addTable[(Long, Int, String, Int, Long)]('a, 'b, 'c, 'd, 'e, 'rowtime.rowtime) val resScala = t .window(SOver orderBy 'rowtime preceding UNBOUNDED_ROW following CURRENT_ROW as 'w) @@ -59,7 +59,7 @@ class OverWindowStringExpressionTest extends TableTestBase { @Test def testPartitionedBoundedOverRow(): Unit = { val util = streamTestUtil() - val t = util.addTable[(Long, Int, String, Int, Long)]('a, 'b, 'c, 'd, 'e) + val t = util.addTable[(Long, Int, String, Int, Long)]('a, 'b, 'c, 'd, 'e, 'rowtime.rowtime) val resScala = t .window(SOver partitionBy('a, 'd) orderBy 'rowtime preceding 10.rows as 'w) @@ -74,7 +74,7 @@ class OverWindowStringExpressionTest extends TableTestBase { @Test def testBoundedOverRow(): Unit = { val util = streamTestUtil() - val t = util.addTable[(Long, Int, String, Int, Long)]('a, 'b, 'c, 'd, 'e) + val t = util.addTable[(Long, Int, String, Int, Long)]('a, 'b, 'c, 'd, 'e, 'rowtime.rowtime) val resScala = t .window(SOver orderBy 'rowtime preceding 10.rows following CURRENT_ROW as 'w) @@ -89,7 +89,7 @@ class OverWindowStringExpressionTest extends TableTestBase { @Test def testPartitionedUnboundedOverRange(): Unit = { val util = streamTestUtil() - val t = util.addTable[(Long, Int, String, Int, Long)]('a, 'b, 'c, 'd, 'e) + val t = util.addTable[(Long, Int, String, Int, Long)]('a, 'b, 'c, 'd, 'e, 'rowtime.rowtime) val resScala = t .window(SOver partitionBy 'a orderBy 'rowtime preceding UNBOUNDED_RANGE as 'w) @@ -104,7 +104,7 @@ class OverWindowStringExpressionTest extends TableTestBase { @Test def testUnboundedOverRange(): Unit = { val util = streamTestUtil() - val t = util.addTable[(Long, Int, String, Int, Long)]('a, 'b, 'c, 'd, 'e) + val t = util.addTable[(Long, Int, String, Int, Long)]('a, 'b, 'c, 'd, 'e, 'rowtime.rowtime) val resScala = t .window(SOver orderBy 'rowtime preceding UNBOUNDED_RANGE following CURRENT_RANGE as 'w) @@ -120,7 +120,7 @@ class OverWindowStringExpressionTest extends TableTestBase { @Test def testPartitionedBoundedOverRange(): Unit = { val util = streamTestUtil() - val t = util.addTable[(Long, Int, String, Int, Long)]('a, 'b, 'c, 'd, 'e) + val t = util.addTable[(Long, Int, String, Int, Long)]('a, 'b, 'c, 'd, 'e, 'rowtime.rowtime) val resScala = t .window(SOver partitionBy('a, 'c) orderBy 'rowtime preceding 10.minutes as 'w) @@ -135,7 +135,7 @@ class OverWindowStringExpressionTest extends TableTestBase { @Test def testBoundedOverRange(): Unit = { val util = streamTestUtil() - val t = util.addTable[(Long, Int, String, Int, Long)]('a, 'b, 'c, 'd, 'e) + val t = util.addTable[(Long, Int, String, Int, Long)]('a, 'b, 'c, 'd, 'e, 'rowtime.rowtime) val resScala = t .window(SOver orderBy 'rowtime preceding 4.hours following CURRENT_RANGE as 'w) http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamAggregateITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamAggregateITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamAggregateITCase.scala index dcd3c6c..05e1892 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamAggregateITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamAggregateITCase.scala @@ -62,10 +62,10 @@ class DataStreamAggregateITCase extends StreamingMultipleProgramsTestBase { val stream = env .fromCollection(data) .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark()) - val table = stream.toTable(tEnv, 'long, 'int, 'double, 'float, 'bigdec, 'string) + val table = stream.toTable(tEnv, 'long.rowtime, 'int, 'double, 'float, 'bigdec, 'string) val windowedTable = table - .window(Slide over 5.milli every 2.milli on 'rowtime as 'w) + .window(Slide over 5.milli every 2.milli on 'long as 'w) .groupBy('w) .select('int.count, 'w.start, 'w.end) @@ -97,10 +97,10 @@ class DataStreamAggregateITCase extends StreamingMultipleProgramsTestBase { val stream = env .fromCollection(data) .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark()) - val table = stream.toTable(tEnv, 'long, 'int, 'double, 'float, 'bigdec, 'string) + val table = stream.toTable(tEnv, 'long.rowtime, 'int, 'double, 'float, 'bigdec, 'string) val windowedTable = table - .window(Slide over 10.milli every 5.milli on 'rowtime as 'w) + .window(Slide over 10.milli every 5.milli on 'long as 'w) .groupBy('w, 'string) .select('string, 'int.count, 'w.start, 'w.end) @@ -134,10 +134,10 @@ class DataStreamAggregateITCase extends StreamingMultipleProgramsTestBase { val stream = env .fromCollection(data) .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark()) - val table = stream.toTable(tEnv, 'long, 'int, 'double, 'float, 'bigdec, 'string) + val table = stream.toTable(tEnv, 'long.rowtime, 'int, 'double, 'float, 'bigdec, 'string) val windowedTable = table - .window(Slide over 5.milli every 4.milli on 'rowtime as 'w) + .window(Slide over 5.milli every 4.milli on 'long as 'w) .groupBy('w, 'string) .select('string, 'int.count, 'w.start, 'w.end) @@ -168,10 +168,10 @@ class DataStreamAggregateITCase extends StreamingMultipleProgramsTestBase { val stream = env .fromCollection(data) .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark()) - val table = stream.toTable(tEnv, 'long, 'int, 'double, 'float, 'bigdec, 'string) + val table = stream.toTable(tEnv, 'long.rowtime, 'int, 'double, 'float, 'bigdec, 'string) val windowedTable = table - .window(Slide over 5.milli every 10.milli on 'rowtime as 'w) + .window(Slide over 5.milli every 10.milli on 'long as 'w) .groupBy('w, 'string) .select('string, 'int.count, 'w.start, 'w.end) @@ -197,10 +197,10 @@ class DataStreamAggregateITCase extends StreamingMultipleProgramsTestBase { val stream = env .fromCollection(data) .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark()) - val table = stream.toTable(tEnv, 'long, 'int, 'double, 'float, 'bigdec, 'string) + val table = stream.toTable(tEnv, 'long.rowtime, 'int, 'double, 'float, 'bigdec, 'string) val windowedTable = table - .window(Slide over 3.milli every 10.milli on 'rowtime as 'w) + .window(Slide over 3.milli every 10.milli on 'long as 'w) .groupBy('w, 'string) .select('string, 'int.count, 'w.start, 'w.end) @@ -225,7 +225,7 @@ class DataStreamAggregateITCase extends StreamingMultipleProgramsTestBase { .fromCollection(data) .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark()) .map(t => (t._2, t._6)) - val table = stream.toTable(tEnv, 'int, 'string) + val table = stream.toTable(tEnv, 'int, 'string, 'rowtime.rowtime) val windowedTable = table .window(Slide over 3.milli every 10.milli on 'rowtime as 'w)