http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala index 63dc1ae..31ad558 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala @@ -24,7 +24,7 @@ import org.apache.calcite.sql.{SqlFunction, SqlOperator, SqlOperatorTable} import org.apache.flink.table.api.ValidationException import org.apache.flink.table.expressions._ import org.apache.flink.table.functions.utils.{AggSqlFunction, ScalarSqlFunction, TableSqlFunction} -import org.apache.flink.table.functions.{AggregateFunction, EventTimeExtractor, RowTime, ScalarFunction, TableFunction, _} +import org.apache.flink.table.functions.{AggregateFunction, ScalarFunction, TableFunction} import scala.collection.JavaConversions._ import scala.collection.mutable @@ -242,15 +242,11 @@ object FunctionCatalog { // array "cardinality" -> classOf[ArrayCardinality], "at" -> classOf[ArrayElementAt], - "element" -> classOf[ArrayElement], + "element" -> classOf[ArrayElement] // TODO implement function overloading here // "floor" -> classOf[TemporalFloor] // "ceil" -> classOf[TemporalCeil] - - // extensions to support streaming query - "rowtime" -> classOf[RowTime], - "proctime" -> classOf[ProcTime] ) /** @@ -392,8 +388,6 @@ class BasicOperatorTable extends ReflectiveSqlOperatorTable { SqlStdOperatorTable.ROUND, SqlStdOperatorTable.PI, // EXTENSIONS - EventTimeExtractor, - ProcTimeExtractor, SqlStdOperatorTable.TUMBLE, SqlStdOperatorTable.TUMBLE_START, SqlStdOperatorTable.TUMBLE_END,
http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableEnvironmentITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableEnvironmentITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableEnvironmentITCase.java index cab3855..81c60b4 100644 --- a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableEnvironmentITCase.java +++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableEnvironmentITCase.java @@ -406,15 +406,6 @@ public class TableEnvironmentITCase extends TableProgramsCollectionTestBase { } @Test(expected = TableException.class) - public void testAsWithToFewFields() throws Exception { - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config()); - - // Must fail. Not enough field names specified. - tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a, b"); - } - - @Test(expected = TableException.class) public void testAsWithToManyFields() throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config()); http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala index 9939a9c..faacc54 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala @@ -93,7 +93,8 @@ class TableEnvironmentTest extends TableTestBase { UnresolvedFieldReference("name1"), UnresolvedFieldReference("name2"), UnresolvedFieldReference("name3") - )) + ), + ignoreTimeAttributes = true) fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1)) fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1)) @@ -107,7 +108,8 @@ class TableEnvironmentTest extends TableTestBase { UnresolvedFieldReference("name1"), UnresolvedFieldReference("name2"), UnresolvedFieldReference("name3") - )) + ), + ignoreTimeAttributes = true) fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1)) fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1)) @@ -121,7 +123,8 @@ class TableEnvironmentTest extends TableTestBase { UnresolvedFieldReference("name1"), UnresolvedFieldReference("name2"), UnresolvedFieldReference("name3") - )) + ), + ignoreTimeAttributes = true) } @Test @@ -132,7 +135,8 @@ class TableEnvironmentTest extends TableTestBase { UnresolvedFieldReference("pf3"), UnresolvedFieldReference("pf1"), UnresolvedFieldReference("pf2") - )) + ), + ignoreTimeAttributes = true) fieldInfo._1.zip(Array("pf3", "pf1", "pf2")).foreach(x => assertEquals(x._2, x._1)) fieldInfo._2.zip(Array(2, 0, 1)).foreach(x => assertEquals(x._2, x._1)) @@ -142,7 +146,8 @@ class TableEnvironmentTest extends TableTestBase { def testGetFieldInfoAtomicName1(): Unit = { val fieldInfo = tEnv.getFieldInfo( atomicType, - Array(UnresolvedFieldReference("name")) + Array(UnresolvedFieldReference("name")), + ignoreTimeAttributes = true ) fieldInfo._1.zip(Array("name")).foreach(x => assertEquals(x._2, x._1)) @@ -156,7 +161,8 @@ class TableEnvironmentTest extends TableTestBase { Array( UnresolvedFieldReference("name1"), UnresolvedFieldReference("name2") - )) + ), + ignoreTimeAttributes = true) } @Test @@ -167,7 +173,8 @@ class TableEnvironmentTest extends TableTestBase { Alias(UnresolvedFieldReference("f0"), "name1"), Alias(UnresolvedFieldReference("f1"), "name2"), Alias(UnresolvedFieldReference("f2"), "name3") - )) + ), + ignoreTimeAttributes = true) fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1)) fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1)) @@ -181,7 +188,8 @@ class TableEnvironmentTest extends TableTestBase { Alias(UnresolvedFieldReference("f2"), "name1"), Alias(UnresolvedFieldReference("f0"), "name2"), Alias(UnresolvedFieldReference("f1"), "name3") - )) + ), + ignoreTimeAttributes = true) fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1)) fieldInfo._2.zip(Array(2, 0, 1)).foreach(x => assertEquals(x._2, x._1)) @@ -195,7 +203,8 @@ class TableEnvironmentTest extends TableTestBase { Alias(UnresolvedFieldReference("xxx"), "name1"), Alias(UnresolvedFieldReference("yyy"), "name2"), Alias(UnresolvedFieldReference("zzz"), "name3") - )) + ), + ignoreTimeAttributes = true) } @Test @@ -206,7 +215,8 @@ class TableEnvironmentTest extends TableTestBase { Alias(UnresolvedFieldReference("cf1"), "name1"), Alias(UnresolvedFieldReference("cf2"), "name2"), Alias(UnresolvedFieldReference("cf3"), "name3") - )) + ), + ignoreTimeAttributes = true) fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1)) fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1)) @@ -220,7 +230,8 @@ class TableEnvironmentTest extends TableTestBase { Alias(UnresolvedFieldReference("cf3"), "name1"), Alias(UnresolvedFieldReference("cf1"), "name2"), Alias(UnresolvedFieldReference("cf2"), "name3") - )) + ), + ignoreTimeAttributes = true) fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1)) fieldInfo._2.zip(Array(2, 0, 1)).foreach(x => assertEquals(x._2, x._1)) @@ -234,7 +245,8 @@ class TableEnvironmentTest extends TableTestBase { Alias(UnresolvedFieldReference("xxx"), "name1"), Alias(UnresolvedFieldReference("yyy"), "name2"), Alias(UnresolvedFieldReference("zzz"), "name3") - )) + ), + ignoreTimeAttributes = true) } @Test @@ -245,7 +257,8 @@ class TableEnvironmentTest extends TableTestBase { Alias(UnresolvedFieldReference("pf1"), "name1"), Alias(UnresolvedFieldReference("pf2"), "name2"), Alias(UnresolvedFieldReference("pf3"), "name3") - )) + ), + ignoreTimeAttributes = true) fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1)) fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1)) @@ -259,7 +272,8 @@ class TableEnvironmentTest extends TableTestBase { Alias(UnresolvedFieldReference("pf3"), "name1"), Alias(UnresolvedFieldReference("pf1"), "name2"), Alias(UnresolvedFieldReference("pf2"), "name3") - )) + ), + ignoreTimeAttributes = true) fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1)) fieldInfo._2.zip(Array(2, 0, 1)).foreach(x => assertEquals(x._2, x._1)) @@ -272,8 +286,9 @@ class TableEnvironmentTest extends TableTestBase { Array( Alias(UnresolvedFieldReference("xxx"), "name1"), Alias(UnresolvedFieldReference("yyy"), "name2"), - Alias( UnresolvedFieldReference("zzz"), "name3") - )) + Alias(UnresolvedFieldReference("zzz"), "name3") + ), + ignoreTimeAttributes = true) } @Test(expected = classOf[TableException]) @@ -282,12 +297,16 @@ class TableEnvironmentTest extends TableTestBase { atomicType, Array( Alias(UnresolvedFieldReference("name1"), "name2") - )) + ), + ignoreTimeAttributes = true) } @Test(expected = classOf[TableException]) def testGetFieldInfoGenericRowAlias(): Unit = { - tEnv.getFieldInfo(genericRowType, Array(UnresolvedFieldReference("first"))) + tEnv.getFieldInfo( + genericRowType, + Array(UnresolvedFieldReference("first")), + ignoreTimeAttributes = true) } @Test http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableEnvironmentITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableEnvironmentITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableEnvironmentITCase.scala index e61e190..57ee3b3 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableEnvironmentITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableEnvironmentITCase.scala @@ -208,16 +208,6 @@ class TableEnvironmentITCase( } @Test(expected = classOf[TableException]) - def testToTableWithToFewFields(): Unit = { - val env = ExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env, config) - - CollectionDataSets.get3TupleDataSet(env) - // Must fail. Number of fields does not match. - .toTable(tEnv, 'a, 'b) - } - - @Test(expected = classOf[TableException]) def testToTableWithToManyFields(): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment val tEnv = TableEnvironment.getTableEnvironment(env, config) http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/WindowAggregateTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/WindowAggregateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/WindowAggregateTest.scala index 0ccb557..71d0002 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/WindowAggregateTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/WindowAggregateTest.scala @@ -47,7 +47,7 @@ class WindowAggregateTest extends TableTestBase { batchTableNode(0), term("select", "ts, a, b") ), - term("window", EventTimeTumblingGroupWindow('w$, 'ts, 7200000.millis)), + term("window", TumblingGroupWindow('w$, 'ts, 7200000.millis)), term("select", "SUM(a) AS sumA, COUNT(b) AS cntB") ) @@ -76,7 +76,7 @@ class WindowAggregateTest extends TableTestBase { "DataSetWindowAggregate", batchTableNode(0), term("groupBy", "c"), - term("window", EventTimeTumblingGroupWindow('w$, 'ts, 240000.millis)), + term("window", TumblingGroupWindow('w$, 'ts, 240000.millis)), term("select", "c, SUM(a) AS sumA, MIN(b) AS minB, " + "start('w$) AS w$start, end('w$) AS w$end") ), @@ -106,7 +106,7 @@ class WindowAggregateTest extends TableTestBase { batchTableNode(0), term("select", "ts, b, a") ), - term("window", EventTimeTumblingGroupWindow('w$, 'ts, 240000.millis)), + term("window", TumblingGroupWindow('w$, 'ts, 240000.millis)), term("select", "weightedAvg(b, a) AS wAvg") ) @@ -132,7 +132,7 @@ class WindowAggregateTest extends TableTestBase { term("select", "ts, a, b") ), term("window", - EventTimeSlidingGroupWindow('w$, 'ts, 5400000.millis, 900000.millis)), + SlidingGroupWindow('w$, 'ts, 5400000.millis, 900000.millis)), term("select", "SUM(a) AS sumA, COUNT(b) AS cntB") ) @@ -162,7 +162,7 @@ class WindowAggregateTest extends TableTestBase { batchTableNode(0), term("groupBy", "c, d"), term("window", - EventTimeSlidingGroupWindow('w$, 'ts, 10800000.millis, 3600000.millis)), + SlidingGroupWindow('w$, 'ts, 10800000.millis, 3600000.millis)), term("select", "c, d, SUM(a) AS sumA, AVG(b) AS avgB, " + "start('w$) AS w$start, end('w$) AS w$end") ), @@ -188,7 +188,7 @@ class WindowAggregateTest extends TableTestBase { batchTableNode(0), term("select", "ts") ), - term("window", EventTimeSessionGroupWindow('w$, 'ts, 1800000.millis)), + term("window", SessionGroupWindow('w$, 'ts, 1800000.millis)), term("select", "COUNT(*) AS cnt") ) @@ -217,7 +217,7 @@ class WindowAggregateTest extends TableTestBase { "DataSetWindowAggregate", batchTableNode(0), term("groupBy", "c, d"), - term("window", EventTimeSessionGroupWindow('w$, 'ts, 43200000.millis)), + term("window", SessionGroupWindow('w$, 'ts, 43200000.millis)), term("select", "c, d, SUM(a) AS sumA, MIN(b) AS minB, " + "start('w$) AS w$start, end('w$) AS w$end") ), @@ -249,7 +249,7 @@ class WindowAggregateTest extends TableTestBase { term("select", "ts, c") ), term("groupBy", "c"), - term("window", EventTimeTumblingGroupWindow('w$, 'ts, 240000.millis)), + term("window", TumblingGroupWindow('w$, 'ts, 240000.millis)), term("select", "c, start('w$) AS w$start, end('w$) AS w$end") ), term("select", "CAST(w$end) AS w$end") @@ -304,7 +304,7 @@ class WindowAggregateTest extends TableTestBase { val sql = "SELECT COUNT(*) " + "FROM T " + - "GROUP BY TUMBLE(proctime(), b * INTERVAL '1' MINUTE)" + "GROUP BY TUMBLE(ts, b * INTERVAL '1' MINUTE)" util.verifySql(sql, "n/a") } http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/FieldProjectionTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/FieldProjectionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/FieldProjectionTest.scala index 6ebfec0..b484293 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/FieldProjectionTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/FieldProjectionTest.scala @@ -19,14 +19,12 @@ package org.apache.flink.table.api.scala.batch.table import org.apache.flink.api.scala._ import org.apache.flink.table.api.scala._ -import org.apache.flink.table.api.ValidationException -import org.apache.flink.table.expressions.{RowtimeAttribute, Upper, WindowReference} -import org.apache.flink.table.functions.ScalarFunction import org.apache.flink.table.api.scala.batch.table.FieldProjectionTest._ -import org.apache.flink.table.plan.logical.EventTimeTumblingGroupWindow -import org.apache.flink.table.utils._ -import org.apache.flink.table.utils.TableTestBase +import org.apache.flink.table.expressions.{Upper, WindowReference} +import org.apache.flink.table.functions.ScalarFunction +import org.apache.flink.table.plan.logical.TumblingGroupWindow import org.apache.flink.table.utils.TableTestUtil._ +import org.apache.flink.table.utils.{TableTestBase, _} import org.junit.Test /** @@ -223,7 +221,8 @@ class FieldProjectionTest extends TableTestBase { @Test def testSelectFromStreamingWindow(): Unit = { - val sourceTable = streamUtil.addTable[(Int, Long, String, Double)]("MyTable", 'a, 'b, 'c, 'd) + val sourceTable = streamUtil + .addTable[(Int, Long, String, Double)]("MyTable", 'a, 'b, 'c, 'd, 'rowtime.rowtime) val resultTable = sourceTable .window(Tumble over 5.millis on 'rowtime as 'w) .groupBy('w) @@ -235,14 +234,14 @@ class FieldProjectionTest extends TableTestBase { unaryNode( "DataStreamCalc", streamTableNode(0), - term("select", "c", "a", "UPPER(c) AS $f2") + term("select", "c", "a", "rowtime", "UPPER(c) AS $f3") ), term("window", - EventTimeTumblingGroupWindow( - WindowReference("w"), - RowtimeAttribute(), + TumblingGroupWindow( + WindowReference("w"), + 'rowtime, 5.millis)), - term("select", "COUNT($f2) AS TMP_0", "SUM(a) AS TMP_1") + term("select", "COUNT($f3) AS TMP_0", "SUM(a) AS TMP_1") ) streamUtil.verifyTable(resultTable, expected) @@ -250,7 +249,8 @@ class FieldProjectionTest extends TableTestBase { @Test def testSelectFromStreamingGroupedWindow(): Unit = { - val sourceTable = streamUtil.addTable[(Int, Long, String, Double)]("MyTable", 'a, 'b, 'c, 'd) + val sourceTable = streamUtil + .addTable[(Int, Long, String, Double)]("MyTable", 'a, 'b, 'c, 'd, 'rowtime.rowtime) val resultTable = sourceTable .window(Tumble over 5.millis on 'rowtime as 'w) .groupBy('w, 'b) @@ -263,15 +263,15 @@ class FieldProjectionTest extends TableTestBase { unaryNode( "DataStreamCalc", streamTableNode(0), - term("select", "c", "a", "b", "UPPER(c) AS $f3") + term("select", "c", "a", "b", "rowtime", "UPPER(c) AS $f4") ), term("groupBy", "b"), term("window", - EventTimeTumblingGroupWindow( - WindowReference("w"), - RowtimeAttribute(), + TumblingGroupWindow( + WindowReference("w"), + 'rowtime, 5.millis)), - term("select", "b", "COUNT($f3) AS TMP_0", "SUM(a) AS TMP_1") + term("select", "b", "COUNT($f4) AS TMP_0", "SUM(a) AS TMP_1") ), term("select", "TMP_0", "TMP_1", "b") ) http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/GroupWindowTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/GroupWindowTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/GroupWindowTest.scala index 8a20f6d..c481105 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/GroupWindowTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/GroupWindowTest.scala @@ -63,25 +63,24 @@ class GroupWindowTest extends TableTestBase { //=============================================================================================== @Test(expected = classOf[ValidationException]) - def testProcessingTimeTumblingGroupWindowOverTime(): Unit = { + def testInvalidProcessingTimeDefinition(): Unit = { val util = batchTestUtil() - val table = util.addTable[(Long, Int, String)]('long, 'int, 'string) - - table - .window(Tumble over 50.milli as 'w) // require a time attribute - .groupBy('w, 'string) - .select('string, 'int.count) + // proctime is not allowed + util.addTable[(Long, Int, String)]('long.proctime, 'int, 'string) } @Test(expected = classOf[ValidationException]) - def testProcessingTimeTumblingGroupWindowOverCount(): Unit = { + def testInvalidProcessingTimeDefinition2(): Unit = { val util = batchTestUtil() - val table = util.addTable[(Long, Int, String)]('long, 'int, 'string) + // proctime is not allowed + util.addTable[(Long, Int, String)]('long, 'int, 'string, 'proctime.proctime) + } - table - .window(Tumble over 2.rows as 'w) // require a time attribute - .groupBy('w, 'string) - .select('string, 'int.count) + @Test(expected = classOf[ValidationException]) + def testInvalidEventTimeDefinition(): Unit = { + val util = batchTestUtil() + // definition must not extend schema + util.addTable[(Long, Int, String)]('long, 'int, 'string, 'rowtime.rowtime) } @Test(expected = classOf[ValidationException]) @@ -101,7 +100,7 @@ class GroupWindowTest extends TableTestBase { @Test def testEventTimeTumblingGroupWindowOverCount(): Unit = { val util = batchTestUtil() - val table = util.addTable[(Long, Int, String)]('long, 'int, 'string) + val table = util.addTable[(Long, Int, String)]('long.rowtime, 'int, 'string) val windowedTable = table .window(Tumble over 2.rows on 'long as 'w) @@ -112,7 +111,7 @@ class GroupWindowTest extends TableTestBase { "DataSetWindowAggregate", batchTableNode(0), term("groupBy", "string"), - term("window", EventTimeTumblingGroupWindow(WindowReference("w"), 'long, 2.rows)), + term("window", TumblingGroupWindow(WindowReference("w"), 'long, 2.rows)), term("select", "string", "COUNT(int) AS TMP_0") ) @@ -135,7 +134,7 @@ class GroupWindowTest extends TableTestBase { "DataSetWindowAggregate", batchTableNode(0), term("groupBy", "string"), - term("window", EventTimeTumblingGroupWindow(WindowReference("w"), 'long, 5.milli)), + term("window", TumblingGroupWindow(WindowReference("w"), 'long, 5.milli)), term("select", "string", "WeightedAvgWithMerge(long, int) AS TMP_0") ) @@ -145,7 +144,7 @@ class GroupWindowTest extends TableTestBase { @Test def testEventTimeTumblingGroupWindowOverTime(): Unit = { val util = batchTestUtil() - val table = util.addTable[(Long, Int, String)]('long, 'int, 'string) + val table = util.addTable[(Long, Int, String)]('long.rowtime, 'int, 'string) val windowedTable = table .window(Tumble over 5.milli on 'long as 'w) @@ -156,7 +155,7 @@ class GroupWindowTest extends TableTestBase { "DataSetWindowAggregate", batchTableNode(0), term("groupBy", "string"), - term("window", EventTimeTumblingGroupWindow(WindowReference("w"), 'long, 5.milli)), + term("window", TumblingGroupWindow(WindowReference("w"), 'long, 5.milli)), term("select", "string", "COUNT(int) AS TMP_0") ) @@ -164,28 +163,6 @@ class GroupWindowTest extends TableTestBase { } @Test(expected = classOf[ValidationException]) - def testAllProcessingTimeTumblingGroupWindowOverTime(): Unit = { - val util = batchTestUtil() - val table = util.addTable[(Long, Int, String)]('long, 'int, 'string) - - table - .window(Tumble over 50.milli as 'w) // require a time attribute - .groupBy('w) - .select('string, 'int.count) - } - - @Test(expected = classOf[ValidationException]) - def testAllProcessingTimeTumblingGroupWindowOverCount(): Unit = { - val util = batchTestUtil() - val table = util.addTable[(Long, Int, String)]('long, 'int, 'string) - - table - .window(Tumble over 2.rows as 'w) // require a time attribute - .groupBy('w) - .select('int.count) - } - - @Test(expected = classOf[ValidationException]) def testAllTumblingGroupWindowWithInvalidUdAggArgs(): Unit = { val util = batchTestUtil() val table = util.addTable[(Long, Int, String)]('long, 'int, 'string) @@ -216,7 +193,7 @@ class GroupWindowTest extends TableTestBase { batchTableNode(0), term("select", "int", "long") ), - term("window", EventTimeTumblingGroupWindow(WindowReference("w"), 'long, 5.milli)), + term("window", TumblingGroupWindow(WindowReference("w"), 'long, 5.milli)), term("select", "COUNT(int) AS TMP_0") ) @@ -240,7 +217,7 @@ class GroupWindowTest extends TableTestBase { batchTableNode(0), term("select", "int", "long") ), - term("window", EventTimeTumblingGroupWindow(WindowReference("w"), 'long, 2.rows)), + term("window", TumblingGroupWindow(WindowReference("w"), 'long, 2.rows)), term("select", "COUNT(int) AS TMP_0") ) @@ -252,28 +229,6 @@ class GroupWindowTest extends TableTestBase { //=============================================================================================== @Test(expected = classOf[ValidationException]) - def testProcessingTimeSlidingGroupWindowOverTime(): Unit = { - val util = batchTestUtil() - val table = util.addTable[(Long, Int, String)]('long, 'int, 'string) - - table - .window(Slide over 50.milli every 50.milli as 'w) // require on a time attribute - .groupBy('w, 'string) - .select('string, 'int.count) - } - - @Test(expected = classOf[ValidationException]) - def testProcessingTimeSlidingGroupWindowOverCount(): Unit = { - val util = batchTestUtil() - val table = util.addTable[(Long, Int, String)]('long, 'int, 'string) - - table - .window(Slide over 10.rows every 5.rows as 'w) // require on a time attribute - .groupBy('w, 'string) - .select('string, 'int.count) - } - - @Test(expected = classOf[ValidationException]) def testSlidingGroupWindowWithInvalidUdAggArgs(): Unit = { val util = batchTestUtil() val table = util.addTable[(Long, Int, String)]('long, 'int, 'string) @@ -302,7 +257,7 @@ class GroupWindowTest extends TableTestBase { batchTableNode(0), term("groupBy", "string"), term("window", - EventTimeSlidingGroupWindow(WindowReference("w"), 'long, 8.milli, 10.milli)), + SlidingGroupWindow(WindowReference("w"), 'long, 8.milli, 10.milli)), term("select", "string", "COUNT(int) AS TMP_0") ) @@ -324,7 +279,7 @@ class GroupWindowTest extends TableTestBase { batchTableNode(0), term("groupBy", "string"), term("window", - EventTimeSlidingGroupWindow(WindowReference("w"), 'long, 2.rows, 1.rows)), + SlidingGroupWindow(WindowReference("w"), 'long, 2.rows, 1.rows)), term("select", "string", "COUNT(int) AS TMP_0") ) @@ -348,7 +303,7 @@ class GroupWindowTest extends TableTestBase { batchTableNode(0), term("groupBy", "string"), term("window", - EventTimeSlidingGroupWindow(WindowReference("w"), 'long, 8.milli, 10.milli)), + SlidingGroupWindow(WindowReference("w"), 'long, 8.milli, 10.milli)), term("select", "string", "WeightedAvgWithMerge(long, int) AS TMP_0") ) @@ -356,17 +311,6 @@ class GroupWindowTest extends TableTestBase { } @Test(expected = classOf[ValidationException]) - def testAllProcessingTimeSlidingGroupWindowOverCount(): Unit = { - val util = batchTestUtil() - val table = util.addTable[(Long, Int, String)]('long, 'int, 'string) - - table - .window(Slide over 2.rows every 1.rows as 'w) // require on a time attribute - .groupBy('w) - .select('int.count) - } - - @Test(expected = classOf[ValidationException]) def testAllSlidingGroupWindowWithInvalidUdAggArgs(): Unit = { val util = batchTestUtil() val table = util.addTable[(Long, Int, String)]('long, 'int, 'string) @@ -374,7 +318,7 @@ class GroupWindowTest extends TableTestBase { val myWeightedAvg = new WeightedAvgWithMerge table - .window(Slide over 2.minutes every 1.minute on 'rowtime as 'w) + .window(Slide over 2.minutes every 1.minute on 'long as 'w) .groupBy('w) // invalid function arguments .select(myWeightedAvg('int, 'string)) @@ -398,7 +342,7 @@ class GroupWindowTest extends TableTestBase { term("select", "int", "long") ), term("window", - EventTimeSlidingGroupWindow(WindowReference("w"), 'long, 8.milli, 10.milli)), + SlidingGroupWindow(WindowReference("w"), 'long, 8.milli, 10.milli)), term("select", "COUNT(int) AS TMP_0") ) @@ -423,7 +367,7 @@ class GroupWindowTest extends TableTestBase { term("select", "int", "long") ), term("window", - EventTimeSlidingGroupWindow(WindowReference("w"), 'long, 2.rows, 1.rows)), + SlidingGroupWindow(WindowReference("w"), 'long, 2.rows, 1.rows)), term("select", "COUNT(int) AS TMP_0") ) @@ -448,7 +392,7 @@ class GroupWindowTest extends TableTestBase { "DataSetWindowAggregate", batchTableNode(0), term("groupBy", "string"), - term("window", EventTimeSessionGroupWindow(WindowReference("w"), 'long, 7.milli)), + term("window", SessionGroupWindow(WindowReference("w"), 'long, 7.milli)), term("select", "string", "COUNT(int) AS TMP_0") ) @@ -471,7 +415,7 @@ class GroupWindowTest extends TableTestBase { "DataSetWindowAggregate", batchTableNode(0), term("groupBy", "string"), - term("window", EventTimeSessionGroupWindow(WindowReference("w"), 'long, 7.milli)), + term("window", SessionGroupWindow(WindowReference("w"), 'long, 7.milli)), term("select", "string", "WeightedAvgWithMerge(long, int) AS TMP_0") ) @@ -479,17 +423,6 @@ class GroupWindowTest extends TableTestBase { } @Test(expected = classOf[ValidationException]) - def testProcessingTimeSessionGroupWindowOverTime(): Unit = { - val util = batchTestUtil() - val table = util.addTable[(Long, Int, String)]('long, 'int, 'string) - - val windowedTable = table - .window(Session withGap 7.milli as 'w) // require on a time attribute - .groupBy('string, 'w) - .select('string, 'int.count) - } - - @Test(expected = classOf[ValidationException]) def testSessionGroupWindowWithInvalidUdAggArgs(): Unit = { val util = batchTestUtil() val table = util.addTable[(Long, Int, String)]('long, 'int, 'string) http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala index 67d13b0..6bab4b3 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala @@ -197,14 +197,14 @@ class SqlITCase extends StreamingWithStateTestBase { // for sum aggregation ensure that every time the order of each element is consistent env.setParallelism(1) - val t1 = env.fromCollection(data).toTable(tEnv).as('a, 'b, 'c) + val t1 = env.fromCollection(data).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime) tEnv.registerTable("T1", t1) val sqlQuery = "SELECT " + "c, " + - "count(a) OVER (PARTITION BY c ORDER BY ProcTime() RANGE UNBOUNDED preceding) as cnt1, " + - "sum(a) OVER (PARTITION BY c ORDER BY ProcTime() RANGE UNBOUNDED preceding) as cnt2 " + + "count(a) OVER (PARTITION BY c ORDER BY proctime RANGE UNBOUNDED preceding) as cnt1, " + + "sum(a) OVER (PARTITION BY c ORDER BY proctime RANGE UNBOUNDED preceding) as cnt2 " + "from T1" val result = tEnv.sql(sqlQuery).toDataStream[Row] @@ -224,13 +224,13 @@ class SqlITCase extends StreamingWithStateTestBase { val tEnv = TableEnvironment.getTableEnvironment(env) StreamITCase.testResults = mutable.MutableList() - val t1 = env.fromCollection(data).toTable(tEnv).as('a, 'b, 'c) + val t1 = env.fromCollection(data).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime) tEnv.registerTable("T1", t1) val sqlQuery = "SELECT " + "c, " + - "count(a) OVER (PARTITION BY c ORDER BY ProcTime() ROWS BETWEEN UNBOUNDED preceding AND " + + "count(a) OVER (PARTITION BY c ORDER BY proctime ROWS BETWEEN UNBOUNDED preceding AND " + "CURRENT ROW)" + "from T1" @@ -254,14 +254,14 @@ class SqlITCase extends StreamingWithStateTestBase { // for sum aggregation ensure that every time the order of each element is consistent env.setParallelism(1) - val t1 = env.fromCollection(data).toTable(tEnv).as('a, 'b, 'c) + val t1 = env.fromCollection(data).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime) tEnv.registerTable("T1", t1) val sqlQuery = "SELECT " + "c, " + - "count(a) OVER (ORDER BY ProcTime() RANGE UNBOUNDED preceding) as cnt1, " + - "sum(a) OVER (ORDER BY ProcTime() RANGE UNBOUNDED preceding) as cnt2 " + + "count(a) OVER (ORDER BY proctime RANGE UNBOUNDED preceding) as cnt1, " + + "sum(a) OVER (ORDER BY proctime RANGE UNBOUNDED preceding) as cnt2 " + "from T1" val result = tEnv.sql(sqlQuery).toDataStream[Row] @@ -281,12 +281,12 @@ class SqlITCase extends StreamingWithStateTestBase { val tEnv = TableEnvironment.getTableEnvironment(env) StreamITCase.testResults = mutable.MutableList() - val t1 = env.fromCollection(data).toTable(tEnv).as('a, 'b, 'c) + val t1 = env.fromCollection(data).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime) tEnv.registerTable("T1", t1) val sqlQuery = "SELECT " + - "count(a) OVER (ORDER BY ProcTime() ROWS BETWEEN UNBOUNDED preceding AND CURRENT ROW)" + + "count(a) OVER (ORDER BY proctime ROWS BETWEEN UNBOUNDED preceding AND CURRENT ROW)" + "from T1" val result = tEnv.sql(sqlQuery).toDataStream[Row] @@ -328,14 +328,14 @@ class SqlITCase extends StreamingWithStateTestBase { val t1 = env .addSource[(Long, Int, String)](new EventTimeSourceFunction[(Long, Int, String)](data)) - .toTable(tEnv).as('a, 'b, 'c) + .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime) tEnv.registerTable("T1", t1) val sqlQuery = "SELECT " + "c, a, " + - "count(a) OVER (PARTITION BY c ORDER BY RowTime() ROWS BETWEEN 2 preceding AND CURRENT ROW)" + - ", sum(a) OVER (PARTITION BY c ORDER BY RowTime() ROWS BETWEEN 2 preceding AND CURRENT ROW)" + + "count(a) OVER (PARTITION BY c ORDER BY rowtime ROWS BETWEEN 2 preceding AND CURRENT ROW)" + + ", sum(a) OVER (PARTITION BY c ORDER BY rowtime ROWS BETWEEN 2 preceding AND CURRENT ROW)" + " from T1" val result = tEnv.sql(sqlQuery).toDataStream[Row] @@ -385,14 +385,14 @@ class SqlITCase extends StreamingWithStateTestBase { val t1 = env .addSource[(Long, Int, String)](new EventTimeSourceFunction[(Long, Int, String)](data)) - .toTable(tEnv).as('a, 'b, 'c) + .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime) tEnv.registerTable("T1", t1) val sqlQuery = "SELECT " + "c, a, " + - "count(a) OVER (ORDER BY RowTime() ROWS BETWEEN 2 preceding AND CURRENT ROW)," + - "sum(a) OVER (ORDER BY RowTime() ROWS BETWEEN 2 preceding AND CURRENT ROW)" + + "count(a) OVER (ORDER BY rowtime ROWS BETWEEN 2 preceding AND CURRENT ROW)," + + "sum(a) OVER (ORDER BY rowtime ROWS BETWEEN 2 preceding AND CURRENT ROW)" + "from T1" val result = tEnv.sql(sqlQuery).toDataStream[Row] @@ -453,15 +453,15 @@ class SqlITCase extends StreamingWithStateTestBase { val t1 = env .addSource[(Long, Int, String)](new EventTimeSourceFunction[(Long, Int, String)](data)) - .toTable(tEnv).as('a, 'b, 'c) + .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime) tEnv.registerTable("T1", t1) val sqlQuery = "SELECT " + "c, b, " + - "count(a) OVER (PARTITION BY c ORDER BY RowTime() RANGE BETWEEN INTERVAL '1' SECOND " + + "count(a) OVER (PARTITION BY c ORDER BY rowtime RANGE BETWEEN INTERVAL '1' SECOND " + "preceding AND CURRENT ROW)" + - ", sum(a) OVER (PARTITION BY c ORDER BY RowTime() RANGE BETWEEN INTERVAL '1' SECOND " + + ", sum(a) OVER (PARTITION BY c ORDER BY rowtime RANGE BETWEEN INTERVAL '1' SECOND " + " preceding AND CURRENT ROW)" + " from T1" @@ -525,15 +525,15 @@ class SqlITCase extends StreamingWithStateTestBase { val t1 = env .addSource[(Long, Int, String)](new EventTimeSourceFunction[(Long, Int, String)](data)) - .toTable(tEnv).as('a, 'b, 'c) + .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime) tEnv.registerTable("T1", t1) val sqlQuery = "SELECT " + "c, b, " + - "count(a) OVER (ORDER BY RowTime() RANGE BETWEEN INTERVAL '1' SECOND " + + "count(a) OVER (ORDER BY rowtime RANGE BETWEEN INTERVAL '1' SECOND " + "preceding AND CURRENT ROW)" + - ", sum(a) OVER (ORDER BY RowTime() RANGE BETWEEN INTERVAL '1' SECOND " + + ", sum(a) OVER (ORDER BY rowtime RANGE BETWEEN INTERVAL '1' SECOND " + " preceding AND CURRENT ROW)" + " from T1" @@ -565,14 +565,14 @@ class SqlITCase extends StreamingWithStateTestBase { val tEnv = TableEnvironment.getTableEnvironment(env) StreamITCase.testResults = mutable.MutableList() - val t1 = env.fromCollection(data).toTable(tEnv).as('a, 'b, 'c) + val t1 = env.fromCollection(data).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime) tEnv.registerTable("T1", t1) val sqlQuery = "SELECT " + "c, " + - "count(a) OVER (PARTITION BY c ORDER BY ProcTime() RANGE UNBOUNDED preceding) as cnt1, " + - "sum(a) OVER (PARTITION BY b ORDER BY ProcTime() RANGE UNBOUNDED preceding) as cnt2 " + + "count(a) OVER (PARTITION BY c ORDER BY proctime RANGE UNBOUNDED preceding) as cnt1, " + + "sum(a) OVER (PARTITION BY b ORDER BY proctime RANGE UNBOUNDED preceding) as cnt2 " + "from T1" val result = tEnv.sql(sqlQuery).toDataStream[Row] @@ -592,15 +592,15 @@ class SqlITCase extends StreamingWithStateTestBase { val sqlQuery = "SELECT a, b, c, " + "SUM(b) over (" + - "partition by a order by rowtime() rows between unbounded preceding and current row), " + + "partition by a order by rowtime rows between unbounded preceding and current row), " + "count(b) over (" + - "partition by a order by rowtime() rows between unbounded preceding and current row), " + + "partition by a order by rowtime rows between unbounded preceding and current row), " + "avg(b) over (" + - "partition by a order by rowtime() rows between unbounded preceding and current row), " + + "partition by a order by rowtime rows between unbounded preceding and current row), " + "max(b) over (" + - "partition by a order by rowtime() rows between unbounded preceding and current row), " + + "partition by a order by rowtime rows between unbounded preceding and current row), " + "min(b) over (" + - "partition by a order by rowtime() rows between unbounded preceding and current row) " + + "partition by a order by rowtime rows between unbounded preceding and current row) " + "from T1" val data = Seq( @@ -632,7 +632,7 @@ class SqlITCase extends StreamingWithStateTestBase { ) val t1 = env.addSource(new EventTimeSourceFunction[(Int, Long, String)](data)) - .toTable(tEnv).as('a, 'b, 'c) + .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime) tEnv.registerTable("T1", t1) @@ -670,15 +670,15 @@ class SqlITCase extends StreamingWithStateTestBase { val sqlQuery = "SELECT a, b, c, " + "SUM(b) over (" + - "partition by a order by rowtime() rows between unbounded preceding and current row), " + + "partition by a order by rowtime rows between unbounded preceding and current row), " + "count(b) over (" + - "partition by a order by rowtime() rows between unbounded preceding and current row), " + + "partition by a order by rowtime rows between unbounded preceding and current row), " + "avg(b) over (" + - "partition by a order by rowtime() rows between unbounded preceding and current row), " + + "partition by a order by rowtime rows between unbounded preceding and current row), " + "max(b) over (" + - "partition by a order by rowtime() rows between unbounded preceding and current row), " + + "partition by a order by rowtime rows between unbounded preceding and current row), " + "min(b) over (" + - "partition by a order by rowtime() rows between unbounded preceding and current row) " + + "partition by a order by rowtime rows between unbounded preceding and current row) " + "from T1" val data = Seq( @@ -702,7 +702,7 @@ class SqlITCase extends StreamingWithStateTestBase { ) val t1 = env.addSource(new EventTimeSourceFunction[(Int, Long, String)](data)) - .toTable(tEnv).as('a, 'b, 'c) + .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime) tEnv.registerTable("T1", t1) @@ -740,11 +740,11 @@ class SqlITCase extends StreamingWithStateTestBase { env.setParallelism(1) val sqlQuery = "SELECT a, b, c, " + - "SUM(b) over (order by rowtime() rows between unbounded preceding and current row), " + - "count(b) over (order by rowtime() rows between unbounded preceding and current row), " + - "avg(b) over (order by rowtime() rows between unbounded preceding and current row), " + - "max(b) over (order by rowtime() rows between unbounded preceding and current row), " + - "min(b) over (order by rowtime() rows between unbounded preceding and current row) " + + "SUM(b) over (order by rowtime rows between unbounded preceding and current row), " + + "count(b) over (order by rowtime rows between unbounded preceding and current row), " + + "avg(b) over (order by rowtime rows between unbounded preceding and current row), " + + "max(b) over (order by rowtime rows between unbounded preceding and current row), " + + "min(b) over (order by rowtime rows between unbounded preceding and current row) " + "from T1" val data = Seq( @@ -764,7 +764,7 @@ class SqlITCase extends StreamingWithStateTestBase { ) val t1 = env.addSource(new EventTimeSourceFunction[(Int, Long, String)](data)) - .toTable(tEnv).as('a, 'b, 'c) + .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime) tEnv.registerTable("T1", t1) @@ -795,11 +795,11 @@ class SqlITCase extends StreamingWithStateTestBase { env.setParallelism(1) val sqlQuery = "SELECT a, b, c, " + - "SUM(b) over (order by rowtime() rows between unbounded preceding and current row), " + - "count(b) over (order by rowtime() rows between unbounded preceding and current row), " + - "avg(b) over (order by rowtime() rows between unbounded preceding and current row), " + - "max(b) over (order by rowtime() rows between unbounded preceding and current row), " + - "min(b) over (order by rowtime() rows between unbounded preceding and current row) " + + "SUM(b) over (order by rowtime rows between unbounded preceding and current row), " + + "count(b) over (order by rowtime rows between unbounded preceding and current row), " + + "avg(b) over (order by rowtime rows between unbounded preceding and current row), " + + "max(b) over (order by rowtime rows between unbounded preceding and current row), " + + "min(b) over (order by rowtime rows between unbounded preceding and current row) " + "from T1" val data = Seq( @@ -820,7 +820,7 @@ class SqlITCase extends StreamingWithStateTestBase { ) val t1 = env.addSource(new EventTimeSourceFunction[(Int, Long, String)](data)) - .toTable(tEnv).as('a, 'b, 'c) + .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime) tEnv.registerTable("T1", t1) @@ -852,11 +852,11 @@ class SqlITCase extends StreamingWithStateTestBase { env.setParallelism(1) val sqlQuery = "SELECT a, b, c, " + - "SUM(b) over (order by rowtime() range between unbounded preceding and current row), " + - "count(b) over (order by rowtime() range between unbounded preceding and current row), " + - "avg(b) over (order by rowtime() range between unbounded preceding and current row), " + - "max(b) over (order by rowtime() range between unbounded preceding and current row), " + - "min(b) over (order by rowtime() range between unbounded preceding and current row) " + + "SUM(b) over (order by rowtime range between unbounded preceding and current row), " + + "count(b) over (order by rowtime range between unbounded preceding and current row), " + + "avg(b) over (order by rowtime range between unbounded preceding and current row), " + + "max(b) over (order by rowtime range between unbounded preceding and current row), " + + "min(b) over (order by rowtime range between unbounded preceding and current row) " + "from T1" val data = Seq( @@ -878,7 +878,7 @@ class SqlITCase extends StreamingWithStateTestBase { ) val t1 = env.addSource(new EventTimeSourceFunction[(Int, Long, String)](data)) - .toTable(tEnv).as('a, 'b, 'c) + .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime) tEnv.registerTable("T1", t1) @@ -916,15 +916,15 @@ class SqlITCase extends StreamingWithStateTestBase { val sqlQuery = "SELECT a, b, c, " + "SUM(b) over (" + - "partition by a order by rowtime() range between unbounded preceding and current row), " + + "partition by a order by rowtime range between unbounded preceding and current row), " + "count(b) over (" + - "partition by a order by rowtime() range between unbounded preceding and current row), " + + "partition by a order by rowtime range between unbounded preceding and current row), " + "avg(b) over (" + - "partition by a order by rowtime() range between unbounded preceding and current row), " + + "partition by a order by rowtime range between unbounded preceding and current row), " + "max(b) over (" + - "partition by a order by rowtime() range between unbounded preceding and current row), " + + "partition by a order by rowtime range between unbounded preceding and current row), " + "min(b) over (" + - "partition by a order by rowtime() range between unbounded preceding and current row) " + + "partition by a order by rowtime range between unbounded preceding and current row) " + "from T1" val data = Seq( @@ -946,7 +946,7 @@ class SqlITCase extends StreamingWithStateTestBase { ) val t1 = env.addSource(new EventTimeSourceFunction[(Int, Long, String)](data)) - .toTable(tEnv).as('a, 'b, 'c) + .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime) tEnv.registerTable("T1", t1) @@ -981,14 +981,15 @@ class SqlITCase extends StreamingWithStateTestBase { env.setParallelism(1) StreamITCase.testResults = mutable.MutableList() - val t = StreamTestData.get5TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c, 'd, 'e) + val t = StreamTestData.get5TupleDataStream(env) + .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime) tEnv.registerTable("MyTable", t) val sqlQuery = "SELECT a, " + " SUM(c) OVER (" + - " PARTITION BY a ORDER BY procTime() ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS sumC , " + + " PARTITION BY a ORDER BY proctime ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS sumC , " + " MIN(c) OVER (" + - " PARTITION BY a ORDER BY procTime() ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS minC " + + " PARTITION BY a ORDER BY proctime ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS minC " + " FROM MyTable" val result = tEnv.sql(sqlQuery).toDataStream[Row] @@ -1023,14 +1024,15 @@ class SqlITCase extends StreamingWithStateTestBase { env.setParallelism(1) StreamITCase.testResults = mutable.MutableList() - val t = StreamTestData.get5TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c, 'd, 'e) + val t = StreamTestData.get5TupleDataStream(env) + .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime) tEnv.registerTable("MyTable", t) val sqlQuery = "SELECT a, " + " SUM(c) OVER (" + - " PARTITION BY a ORDER BY procTime() ROWS BETWEEN 4 PRECEDING AND CURRENT ROW) AS sumC , " + + " PARTITION BY a ORDER BY proctime ROWS BETWEEN 4 PRECEDING AND CURRENT ROW) AS sumC , " + " MIN(c) OVER (" + - " PARTITION BY a ORDER BY procTime() ROWS BETWEEN 4 PRECEDING AND CURRENT ROW) AS minC " + + " PARTITION BY a ORDER BY proctime ROWS BETWEEN 4 PRECEDING AND CURRENT ROW) AS minC " + " FROM MyTable" val result = tEnv.sql(sqlQuery).toDataStream[Row] @@ -1066,14 +1068,15 @@ class SqlITCase extends StreamingWithStateTestBase { env.setParallelism(1) StreamITCase.testResults = mutable.MutableList() - val t = StreamTestData.get5TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c, 'd, 'e) + val t = StreamTestData.get5TupleDataStream(env) + .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime) tEnv.registerTable("MyTable", t) val sqlQuery = "SELECT a, " + " SUM(c) OVER (" + - " ORDER BY procTime() ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS sumC , " + + " ORDER BY proctime ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS sumC , " + " MIN(c) OVER (" + - " ORDER BY procTime() ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS minC " + + " ORDER BY proctime ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS minC " + " FROM MyTable" val result = tEnv.sql(sqlQuery).toDataStream[Row] @@ -1108,14 +1111,15 @@ class SqlITCase extends StreamingWithStateTestBase { env.setParallelism(1) StreamITCase.testResults = mutable.MutableList() - val t = StreamTestData.get5TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c, 'd, 'e) + val t = StreamTestData.get5TupleDataStream(env) + .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime) tEnv.registerTable("MyTable", t) val sqlQuery = "SELECT a, " + " SUM(c) OVER (" + - " ORDER BY procTime() ROWS BETWEEN 10 PRECEDING AND CURRENT ROW) AS sumC , " + + " ORDER BY proctime ROWS BETWEEN 10 PRECEDING AND CURRENT ROW) AS sumC , " + " MIN(c) OVER (" + - " ORDER BY procTime() ROWS BETWEEN 10 PRECEDING AND CURRENT ROW) AS minC " + + " ORDER BY proctime ROWS BETWEEN 10 PRECEDING AND CURRENT ROW) AS minC " + " FROM MyTable" val result = tEnv.sql(sqlQuery).toDataStream[Row] result.addSink(new StreamITCase.StringSink) http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala index cef2665..edf7b1d 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala @@ -21,22 +21,23 @@ import org.apache.flink.api.scala._ import org.apache.flink.table.api.{TableException, ValidationException} import org.apache.flink.table.api.java.utils.UserDefinedAggFunctions.WeightedAvgWithMerge import org.apache.flink.table.api.scala._ -import org.apache.flink.table.plan.logical.{EventTimeTumblingGroupWindow, ProcessingTimeSessionGroupWindow, ProcessingTimeSlidingGroupWindow} +import org.apache.flink.table.plan.logical._ import org.apache.flink.table.utils.TableTestUtil._ import org.apache.flink.table.utils.{StreamTableTestUtil, TableTestBase} -import org.junit.Test +import org.junit.{Ignore, Test} class WindowAggregateTest extends TableTestBase { private val streamUtil: StreamTableTestUtil = streamTestUtil() - streamUtil.addTable[(Int, String, Long)]("MyTable", 'a, 'b, 'c) + streamUtil.addTable[(Int, String, Long)]( + "MyTable", 'a, 'b, 'c, 'proctime.proctime, 'rowtime.rowtime) @Test def testNonPartitionedProcessingTimeBoundedWindow() = { - val sqlQuery = "SELECT a, Count(c) OVER (ORDER BY procTime()" + - "RANGE BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW) AS countA FROM MyTable" - - val expected = + val sqlQuery = "SELECT a, Count(c) OVER (ORDER BY proctime " + + "RANGE BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW) AS countA " + + "FROM MyTable" + val expected = unaryNode( "DataStreamCalc", unaryNode( @@ -44,11 +45,11 @@ class WindowAggregateTest extends TableTestBase { unaryNode( "DataStreamCalc", streamTableNode(0), - term("select", "a", "c", "PROCTIME() AS $2") + term("select", "a", "c", "proctime") ), - term("orderBy", "PROCTIME"), + term("orderBy", "proctime"), term("range", "BETWEEN 10000 PRECEDING AND CURRENT ROW"), - term("select", "a", "c", "PROCTIME", "COUNT(c) AS w0$o0") + term("select", "a", "c", "proctime", "COUNT(c) AS w0$o0") ), term("select", "a", "w0$o0 AS $1") ) @@ -59,7 +60,7 @@ class WindowAggregateTest extends TableTestBase { @Test def testPartitionedProcessingTimeBoundedWindow() = { - val sqlQuery = "SELECT a, AVG(c) OVER (PARTITION BY a ORDER BY procTime()" + + val sqlQuery = "SELECT a, AVG(c) OVER (PARTITION BY a ORDER BY proctime " + "RANGE BETWEEN INTERVAL '2' HOUR PRECEDING AND CURRENT ROW) AS avgA " + "FROM MyTable" val expected = @@ -70,12 +71,12 @@ class WindowAggregateTest extends TableTestBase { unaryNode( "DataStreamCalc", streamTableNode(0), - term("select", "a", "c", "PROCTIME() AS $2") + term("select", "a", "c", "proctime") ), term("partitionBy","a"), - term("orderBy", "PROCTIME"), + term("orderBy", "proctime"), term("range", "BETWEEN 7200000 PRECEDING AND CURRENT ROW"), - term("select", "a", "c", "PROCTIME", "COUNT(c) AS w0$o0", "$SUM0(c) AS w0$o1") + term("select", "a", "c", "proctime", "COUNT(c) AS w0$o0", "$SUM0(c) AS w0$o1") ), term("select", "a", "/(CASE(>(w0$o0, 0)", "CAST(w0$o1), null), w0$o0) AS avgA") ) @@ -84,23 +85,24 @@ class WindowAggregateTest extends TableTestBase { } @Test + @Ignore // TODO enable once CALCITE-1761 is fixed def testTumbleFunction() = { streamUtil.tEnv.registerFunction("weightedAvg", new WeightedAvgWithMerge) val sql = "SELECT " + " COUNT(*), weightedAvg(c, a) AS wAvg, " + - " TUMBLE_START(rowtime(), INTERVAL '15' MINUTE), " + - " TUMBLE_END(rowtime(), INTERVAL '15' MINUTE)" + + " TUMBLE_START(rowtime, INTERVAL '15' MINUTE), " + + " TUMBLE_END(rowtime, INTERVAL '15' MINUTE)" + "FROM MyTable " + - "GROUP BY TUMBLE(rowtime(), INTERVAL '15' MINUTE)" + "GROUP BY TUMBLE(rowtime, INTERVAL '15' MINUTE)" val expected = unaryNode( "DataStreamCalc", unaryNode( "DataStreamAggregate", streamTableNode(0), - term("window", EventTimeTumblingGroupWindow('w$, 'rowtime, 900000.millis)), + term("window", TumblingGroupWindow('w$, 'rowtime, 900000.millis)), term("select", "COUNT(*) AS EXPR$0, " + "weightedAvg(c, a) AS wAvg, " + @@ -113,23 +115,23 @@ class WindowAggregateTest extends TableTestBase { } @Test + @Ignore // TODO enable once CALCITE-1761 is fixed def testHoppingFunction() = { streamUtil.tEnv.registerFunction("weightedAvg", new WeightedAvgWithMerge) val sql = "SELECT COUNT(*), weightedAvg(c, a) AS wAvg, " + - " HOP_START(proctime(), INTERVAL '15' MINUTE, INTERVAL '1' HOUR), " + - " HOP_END(proctime(), INTERVAL '15' MINUTE, INTERVAL '1' HOUR) " + + " HOP_START(proctime, INTERVAL '15' MINUTE, INTERVAL '1' HOUR), " + + " HOP_END(proctime, INTERVAL '15' MINUTE, INTERVAL '1' HOUR) " + "FROM MyTable " + - "GROUP BY HOP(proctime(), INTERVAL '15' MINUTE, INTERVAL '1' HOUR)" + "GROUP BY HOP(proctime, INTERVAL '15' MINUTE, INTERVAL '1' HOUR)" val expected = unaryNode( "DataStreamCalc", unaryNode( "DataStreamAggregate", streamTableNode(0), - term("window", ProcessingTimeSlidingGroupWindow('w$, - 3600000.millis, 900000.millis)), + term("window", SlidingGroupWindow('w$, 'rowtime, 3600000.millis, 900000.millis)), term("select", "COUNT(*) AS EXPR$0, " + "weightedAvg(c, a) AS wAvg, " + @@ -142,23 +144,24 @@ class WindowAggregateTest extends TableTestBase { } @Test + @Ignore // TODO enable once CALCITE-1761 is fixed def testSessionFunction() = { streamUtil.tEnv.registerFunction("weightedAvg", new WeightedAvgWithMerge) val sql = "SELECT " + " COUNT(*), weightedAvg(c, a) AS wAvg, " + - " SESSION_START(proctime(), INTERVAL '15' MINUTE), " + - " SESSION_END(proctime(), INTERVAL '15' MINUTE) " + + " SESSION_START(proctime, INTERVAL '15' MINUTE), " + + " SESSION_END(proctime, INTERVAL '15' MINUTE) " + "FROM MyTable " + - "GROUP BY SESSION(proctime(), INTERVAL '15' MINUTE)" + "GROUP BY SESSION(proctime, INTERVAL '15' MINUTE)" val expected = unaryNode( "DataStreamCalc", unaryNode( "DataStreamAggregate", streamTableNode(0), - term("window", ProcessingTimeSessionGroupWindow('w$, 900000.millis)), + term("window", SessionGroupWindow('w$, 'rowtime, 900000.millis)), term("select", "COUNT(*) AS EXPR$0, " + "weightedAvg(c, a) AS wAvg, " + @@ -175,7 +178,7 @@ class WindowAggregateTest extends TableTestBase { val sqlQuery = "SELECT SUM(a) AS sumA, COUNT(b) AS cntB " + "FROM MyTable " + - "GROUP BY TUMBLE(proctime(), INTERVAL '2' HOUR, TIME '10:00:00')" + "GROUP BY TUMBLE(proctime, INTERVAL '2' HOUR, TIME '10:00:00')" streamUtil.verifySql(sqlQuery, "n/a") } @@ -185,7 +188,7 @@ class WindowAggregateTest extends TableTestBase { val sqlQuery = "SELECT SUM(a) AS sumA, COUNT(b) AS cntB " + "FROM MyTable " + - "GROUP BY HOP(proctime(), INTERVAL '1' HOUR, INTERVAL '2' HOUR, TIME '10:00:00')" + "GROUP BY HOP(proctime, INTERVAL '1' HOUR, INTERVAL '2' HOUR, TIME '10:00:00')" streamUtil.verifySql(sqlQuery, "n/a") } @@ -195,21 +198,21 @@ class WindowAggregateTest extends TableTestBase { val sqlQuery = "SELECT SUM(a) AS sumA, COUNT(b) AS cntB " + "FROM MyTable " + - "GROUP BY SESSION(proctime(), INTERVAL '2' HOUR, TIME '10:00:00')" + "GROUP BY SESSION(proctime, INTERVAL '2' HOUR, TIME '10:00:00')" streamUtil.verifySql(sqlQuery, "n/a") } @Test(expected = classOf[TableException]) def testVariableWindowSize() = { - val sql = "SELECT COUNT(*) FROM MyTable GROUP BY TUMBLE(proctime(), c * INTERVAL '1' MINUTE)" + val sql = "SELECT COUNT(*) FROM MyTable GROUP BY TUMBLE(proctime, c * INTERVAL '1' MINUTE)" streamUtil.verifySql(sql, "n/a") } @Test(expected = classOf[TableException]) def testMultiWindow() = { val sql = "SELECT COUNT(*) FROM MyTable GROUP BY " + - "FLOOR(rowtime() TO HOUR), FLOOR(rowtime() TO MINUTE)" + "FLOOR(rowtime TO HOUR), FLOOR(rowtime TO MINUTE)" val expected = "" streamUtil.verifySql(sql, expected) } @@ -237,8 +240,8 @@ class WindowAggregateTest extends TableTestBase { def testUnboundPartitionedProcessingWindowWithRange() = { val sql = "SELECT " + "c, " + - "count(a) OVER (PARTITION BY c ORDER BY ProcTime() RANGE UNBOUNDED preceding) as cnt1, " + - "sum(a) OVER (PARTITION BY c ORDER BY ProcTime() RANGE UNBOUNDED preceding) as cnt2 " + + "count(a) OVER (PARTITION BY c ORDER BY proctime RANGE UNBOUNDED preceding) as cnt1, " + + "sum(a) OVER (PARTITION BY c ORDER BY proctime RANGE UNBOUNDED preceding) as cnt2 " + "from MyTable" val expected = @@ -249,12 +252,12 @@ class WindowAggregateTest extends TableTestBase { unaryNode( "DataStreamCalc", streamTableNode(0), - term("select", "a", "c", "PROCTIME() AS $2") + term("select", "a", "c", "proctime") ), term("partitionBy", "c"), - term("orderBy", "PROCTIME"), + term("orderBy", "proctime"), term("range", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"), - term("select", "a", "c", "PROCTIME", "COUNT(a) AS w0$o0", "$SUM0(a) AS w0$o1") + term("select", "a", "c", "proctime", "COUNT(a) AS w0$o0", "$SUM0(a) AS w0$o1") ), term("select", "c", "w0$o0 AS cnt1", "CASE(>(w0$o0, 0)", "CAST(w0$o1), null) AS cnt2") ) @@ -265,7 +268,7 @@ class WindowAggregateTest extends TableTestBase { def testUnboundPartitionedProcessingWindowWithRow() = { val sql = "SELECT " + "c, " + - "count(a) OVER (PARTITION BY c ORDER BY ProcTime() ROWS BETWEEN UNBOUNDED preceding AND " + + "count(a) OVER (PARTITION BY c ORDER BY proctime ROWS BETWEEN UNBOUNDED preceding AND " + "CURRENT ROW) as cnt1 " + "from MyTable" @@ -274,15 +277,11 @@ class WindowAggregateTest extends TableTestBase { "DataStreamCalc", unaryNode( "DataStreamOverAggregate", - unaryNode( - "DataStreamCalc", - streamTableNode(0), - term("select", "a", "c", "PROCTIME() AS $2") - ), + streamTableNode(0), term("partitionBy", "c"), - term("orderBy", "PROCTIME"), + term("orderBy", "proctime"), term("rows", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"), - term("select", "a", "c", "PROCTIME", "COUNT(a) AS w0$o0") + term("select", "a", "b", "c", "proctime", "rowtime", "COUNT(a) AS w0$o0") ), term("select", "c", "w0$o0 AS $1") ) @@ -293,8 +292,8 @@ class WindowAggregateTest extends TableTestBase { def testUnboundNonPartitionedProcessingWindowWithRange() = { val sql = "SELECT " + "c, " + - "count(a) OVER (ORDER BY ProcTime() RANGE UNBOUNDED preceding) as cnt1, " + - "sum(a) OVER (ORDER BY ProcTime() RANGE UNBOUNDED preceding) as cnt2 " + + "count(a) OVER (ORDER BY proctime RANGE UNBOUNDED preceding) as cnt1, " + + "sum(a) OVER (ORDER BY proctime RANGE UNBOUNDED preceding) as cnt2 " + "from MyTable" val expected = @@ -305,11 +304,11 @@ class WindowAggregateTest extends TableTestBase { unaryNode( "DataStreamCalc", streamTableNode(0), - term("select", "a", "c", "PROCTIME() AS $2") + term("select", "a", "c", "proctime") ), - term("orderBy", "PROCTIME"), + term("orderBy", "proctime"), term("range", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"), - term("select", "a", "c", "PROCTIME", "COUNT(a) AS w0$o0", "$SUM0(a) AS w0$o1") + term("select", "a", "c", "proctime", "COUNT(a) AS w0$o0", "$SUM0(a) AS w0$o1") ), term("select", "c", "w0$o0 AS cnt1", "CASE(>(w0$o0, 0)", "CAST(w0$o1), null) AS cnt2") ) @@ -320,7 +319,7 @@ class WindowAggregateTest extends TableTestBase { def testUnboundNonPartitionedProcessingWindowWithRow() = { val sql = "SELECT " + "c, " + - "count(a) OVER (ORDER BY ProcTime() ROWS BETWEEN UNBOUNDED preceding AND " + + "count(a) OVER (ORDER BY proctime ROWS BETWEEN UNBOUNDED preceding AND " + "CURRENT ROW) as cnt1 " + "from MyTable" @@ -329,14 +328,10 @@ class WindowAggregateTest extends TableTestBase { "DataStreamCalc", unaryNode( "DataStreamOverAggregate", - unaryNode( - "DataStreamCalc", - streamTableNode(0), - term("select", "a", "c", "PROCTIME() AS $2") - ), - term("orderBy", "PROCTIME"), + streamTableNode(0), + term("orderBy", "proctime"), term("rows", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"), - term("select", "a", "c", "PROCTIME", "COUNT(a) AS w0$o0") + term("select", "a", "b", "c", "proctime", "rowtime", "COUNT(a) AS w0$o0") ), term("select", "c", "w0$o0 AS $1") ) @@ -347,8 +342,8 @@ class WindowAggregateTest extends TableTestBase { def testUnboundNonPartitionedEventTimeWindowWithRange() = { val sql = "SELECT " + "c, " + - "count(a) OVER (ORDER BY RowTime() RANGE UNBOUNDED preceding) as cnt1, " + - "sum(a) OVER (ORDER BY RowTime() RANGE UNBOUNDED preceding) as cnt2 " + + "count(a) OVER (ORDER BY rowtime RANGE UNBOUNDED preceding) as cnt1, " + + "sum(a) OVER (ORDER BY rowtime RANGE UNBOUNDED preceding) as cnt2 " + "from MyTable" val expected = @@ -359,11 +354,11 @@ class WindowAggregateTest extends TableTestBase { unaryNode( "DataStreamCalc", streamTableNode(0), - term("select", "a", "c", "ROWTIME() AS $2") + term("select", "a", "c", "rowtime") ), - term("orderBy", "ROWTIME"), + term("orderBy", "rowtime"), term("range", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"), - term("select", "a", "c", "ROWTIME", "COUNT(a) AS w0$o0", "$SUM0(a) AS w0$o1") + term("select", "a", "c", "rowtime", "COUNT(a) AS w0$o0", "$SUM0(a) AS w0$o1") ), term("select", "c", "w0$o0 AS cnt1", "CASE(>(w0$o0, 0)", "CAST(w0$o1), null) AS cnt2") ) @@ -374,8 +369,8 @@ class WindowAggregateTest extends TableTestBase { def testUnboundPartitionedEventTimeWindowWithRange() = { val sql = "SELECT " + "c, " + - "count(a) OVER (PARTITION BY c ORDER BY RowTime() RANGE UNBOUNDED preceding) as cnt1, " + - "sum(a) OVER (PARTITION BY c ORDER BY RowTime() RANGE UNBOUNDED preceding) as cnt2 " + + "count(a) OVER (PARTITION BY c ORDER BY rowtime RANGE UNBOUNDED preceding) as cnt1, " + + "sum(a) OVER (PARTITION BY c ORDER BY rowtime RANGE UNBOUNDED preceding) as cnt2 " + "from MyTable" val expected = @@ -386,12 +381,12 @@ class WindowAggregateTest extends TableTestBase { unaryNode( "DataStreamCalc", streamTableNode(0), - term("select", "a", "c", "ROWTIME() AS $2") + term("select", "a", "c", "rowtime") ), term("partitionBy", "c"), - term("orderBy", "ROWTIME"), + term("orderBy", "rowtime"), term("range", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"), - term("select", "a", "c", "ROWTIME", "COUNT(a) AS w0$o0", "$SUM0(a) AS w0$o1") + term("select", "a", "c", "rowtime", "COUNT(a) AS w0$o0", "$SUM0(a) AS w0$o1") ), term("select", "c", "w0$o0 AS cnt1", "CASE(>(w0$o0, 0)", "CAST(w0$o1), null) AS cnt2") ) @@ -402,7 +397,7 @@ class WindowAggregateTest extends TableTestBase { def testBoundPartitionedRowTimeWindowWithRow() = { val sql = "SELECT " + "c, " + - "count(a) OVER (PARTITION BY c ORDER BY RowTime() ROWS BETWEEN 5 preceding AND " + + "count(a) OVER (PARTITION BY c ORDER BY rowtime ROWS BETWEEN 5 preceding AND " + "CURRENT ROW) as cnt1 " + "from MyTable" @@ -414,12 +409,12 @@ class WindowAggregateTest extends TableTestBase { unaryNode( "DataStreamCalc", streamTableNode(0), - term("select", "a", "c", "ROWTIME() AS $2") + term("select", "a", "c", "rowtime") ), term("partitionBy", "c"), - term("orderBy", "ROWTIME"), + term("orderBy", "rowtime"), term("rows", "BETWEEN 5 PRECEDING AND CURRENT ROW"), - term("select", "a", "c", "ROWTIME", "COUNT(a) AS w0$o0") + term("select", "a", "c", "rowtime", "COUNT(a) AS w0$o0") ), term("select", "c", "w0$o0 AS $1") ) @@ -430,7 +425,7 @@ class WindowAggregateTest extends TableTestBase { def testBoundNonPartitionedRowTimeWindowWithRow() = { val sql = "SELECT " + "c, " + - "count(a) OVER (ORDER BY RowTime() ROWS BETWEEN 5 preceding AND " + + "count(a) OVER (ORDER BY rowtime ROWS BETWEEN 5 preceding AND " + "CURRENT ROW) as cnt1 " + "from MyTable" @@ -442,11 +437,11 @@ class WindowAggregateTest extends TableTestBase { unaryNode( "DataStreamCalc", streamTableNode(0), - term("select", "a", "c", "ROWTIME() AS $2") + term("select", "a", "c", "rowtime") ), - term("orderBy", "ROWTIME"), + term("orderBy", "rowtime"), term("rows", "BETWEEN 5 PRECEDING AND CURRENT ROW"), - term("select", "a", "c", "ROWTIME", "COUNT(a) AS w0$o0") + term("select", "a", "c", "rowtime", "COUNT(a) AS w0$o0") ), term("select", "c", "w0$o0 AS $1") ) @@ -457,7 +452,7 @@ class WindowAggregateTest extends TableTestBase { def testBoundPartitionedRowTimeWindowWithRange() = { val sql = "SELECT " + "c, " + - "count(a) OVER (PARTITION BY c ORDER BY RowTime() " + + "count(a) OVER (PARTITION BY c ORDER BY rowtime " + "RANGE BETWEEN INTERVAL '1' SECOND preceding AND CURRENT ROW) as cnt1 " + "from MyTable" @@ -469,12 +464,12 @@ class WindowAggregateTest extends TableTestBase { unaryNode( "DataStreamCalc", streamTableNode(0), - term("select", "a", "c", "ROWTIME() AS $2") + term("select", "a", "c", "rowtime") ), term("partitionBy", "c"), - term("orderBy", "ROWTIME"), + term("orderBy", "rowtime"), term("range", "BETWEEN 1000 PRECEDING AND CURRENT ROW"), - term("select", "a", "c", "ROWTIME", "COUNT(a) AS w0$o0") + term("select", "a", "c", "rowtime", "COUNT(a) AS w0$o0") ), term("select", "c", "w0$o0 AS $1") ) @@ -485,7 +480,7 @@ class WindowAggregateTest extends TableTestBase { def testBoundNonPartitionedRowTimeWindowWithRange() = { val sql = "SELECT " + "c, " + - "count(a) OVER (ORDER BY RowTime() " + + "count(a) OVER (ORDER BY rowtime " + "RANGE BETWEEN INTERVAL '1' SECOND preceding AND CURRENT ROW) as cnt1 " + "from MyTable" @@ -497,11 +492,11 @@ class WindowAggregateTest extends TableTestBase { unaryNode( "DataStreamCalc", streamTableNode(0), - term("select", "a", "c", "ROWTIME() AS $2") + term("select", "a", "c", "rowtime") ), - term("orderBy", "ROWTIME"), + term("orderBy", "rowtime"), term("range", "BETWEEN 1000 PRECEDING AND CURRENT ROW"), - term("select", "a", "c", "ROWTIME", "COUNT(a) AS w0$o0") + term("select", "a", "c", "rowtime", "COUNT(a) AS w0$o0") ), term("select", "c", "w0$o0 AS $1") ) @@ -512,7 +507,7 @@ class WindowAggregateTest extends TableTestBase { def testBoundNonPartitionedProcTimeWindowWithRowRange() = { val sql = "SELECT " + "c, " + - "count(a) OVER (ORDER BY procTime() ROWS BETWEEN 2 preceding AND " + + "count(a) OVER (ORDER BY proctime ROWS BETWEEN 2 preceding AND " + "CURRENT ROW) as cnt1 " + "from MyTable" @@ -524,11 +519,11 @@ class WindowAggregateTest extends TableTestBase { unaryNode( "DataStreamCalc", streamTableNode(0), - term("select", "a", "c", "PROCTIME() AS $2") + term("select", "a", "c", "proctime") ), - term("orderBy", "PROCTIME"), + term("orderBy", "proctime"), term("rows", "BETWEEN 2 PRECEDING AND CURRENT ROW"), - term("select", "a", "c", "PROCTIME", "COUNT(a) AS w0$o0") + term("select", "a", "c", "proctime", "COUNT(a) AS w0$o0") ), term("select", "c", "w0$o0 AS $1") ) @@ -539,7 +534,7 @@ class WindowAggregateTest extends TableTestBase { def testBoundPartitionedProcTimeWindowWithRowRange() = { val sql = "SELECT " + "c, " + - "count(a) OVER (PARTITION BY c ORDER BY procTime() ROWS BETWEEN 2 preceding AND " + + "count(a) OVER (PARTITION BY c ORDER BY proctime ROWS BETWEEN 2 preceding AND " + "CURRENT ROW) as cnt1 " + "from MyTable" @@ -551,12 +546,12 @@ class WindowAggregateTest extends TableTestBase { unaryNode( "DataStreamCalc", streamTableNode(0), - term("select", "a", "c", "PROCTIME() AS $2") + term("select", "a", "c", "proctime") ), term("partitionBy", "c"), - term("orderBy", "PROCTIME"), + term("orderBy", "proctime"), term("rows", "BETWEEN 2 PRECEDING AND CURRENT ROW"), - term("select", "a", "c", "PROCTIME", "COUNT(a) AS w0$o0") + term("select", "a", "c", "proctime", "COUNT(a) AS w0$o0") ), term("select", "c", "w0$o0 AS $1") ) http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/AggregationsITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/AggregationsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/AggregationsITCase.scala index 3651749..4a6a616 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/AggregationsITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/AggregationsITCase.scala @@ -57,13 +57,13 @@ class AggregationsITCase extends StreamingMultipleProgramsTestBase { StreamITCase.testResults = mutable.MutableList() val stream = env.fromCollection(data) - val table = stream.toTable(tEnv, 'long, 'int, 'string) + val table = stream.toTable(tEnv, 'long, 'int, 'string, 'proctime.proctime) val countFun = new CountAggFunction val weightAvgFun = new WeightedAvg val windowedTable = table - .window(Slide over 2.rows every 1.rows as 'w) + .window(Slide over 2.rows every 1.rows on 'proctime as 'w) .groupBy('w, 'string) .select('string, countFun('int), 'int.avg, weightAvgFun('long, 'int), weightAvgFun('int, 'int)) @@ -102,7 +102,7 @@ class AggregationsITCase extends StreamingMultipleProgramsTestBase { val stream = env .fromCollection(sessionWindowTestdata) .assignTimestampsAndWatermarks(new TimestampAndWatermarkWithOffset(10L)) - val table = stream.toTable(tEnv, 'long, 'int, 'string) + val table = stream.toTable(tEnv, 'long, 'int, 'string, 'rowtime.rowtime) val windowedTable = table .window(Session withGap 5.milli on 'rowtime as 'w) @@ -126,12 +126,12 @@ class AggregationsITCase extends StreamingMultipleProgramsTestBase { StreamITCase.testResults = mutable.MutableList() val stream = env.fromCollection(data) - val table = stream.toTable(tEnv, 'long, 'int, 'string) + val table = stream.toTable(tEnv, 'long, 'int, 'string, 'proctime.proctime) val countFun = new CountAggFunction val weightAvgFun = new WeightedAvg val windowedTable = table - .window(Tumble over 2.rows as 'w) + .window(Tumble over 2.rows on 'proctime as 'w) .groupBy('w) .select(countFun('string), 'int.avg, weightAvgFun('long, 'int), weightAvgFun('int, 'int)) @@ -154,7 +154,7 @@ class AggregationsITCase extends StreamingMultipleProgramsTestBase { val stream = env .fromCollection(data) .assignTimestampsAndWatermarks(new TimestampAndWatermarkWithOffset(0L)) - val table = stream.toTable(tEnv, 'long, 'int, 'string) + val table = stream.toTable(tEnv, 'long, 'int, 'string, 'rowtime.rowtime) val countFun = new CountAggFunction val weightAvgFun = new WeightedAvg http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/CalcITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/CalcITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/CalcITCase.scala index 5969e91..1114cf0 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/CalcITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/CalcITCase.scala @@ -111,22 +111,6 @@ class CalcITCase extends StreamingMultipleProgramsTestBase { } @Test(expected = classOf[TableException]) - def testAsWithToFewFields(): Unit = { - - val env = StreamExecutionEnvironment.getExecutionEnvironment - val tEnv = TableEnvironment.getTableEnvironment(env) - StreamITCase.testResults = mutable.MutableList() - val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b) - - val results = ds.toDataStream[Row] - results.addSink(new StreamITCase.StringSink) - env.execute() - - val expected = mutable.MutableList("no") - assertEquals(expected.sorted, StreamITCase.testResults.sorted) - } - - @Test(expected = classOf[TableException]) def testAsWithToManyFields(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment