Repository: flink
Updated Branches:
  refs/heads/master 89f0ad90b -> 6b69c588d
[FLINK-6760] [table] Fix OverWindowTest alias test error This closes #4007. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6b69c588 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6b69c588 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6b69c588 Branch: refs/heads/master Commit: 6b69c588df866c7b1694a58a433f7957bee456c6 Parents: 89f0ad9 Author: sunjincheng121 <sunjincheng...@gmail.com> Authored: Mon May 29 19:10:58 2017 +0800 Committer: twalthr <twal...@apache.org> Committed: Mon May 29 15:41:30 2017 +0200 ---------------------------------------------------------------------- .../api/scala/stream/sql/OverWindowITCase.scala | 32 +++++++++----------- .../api/scala/stream/sql/OverWindowTest.scala | 25 +++++++-------- 2 files changed, 28 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/6b69c588/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowITCase.scala index 7ba5c16..36eff1e 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowITCase.scala @@ -63,8 +63,8 @@ class OverWindowITCase extends StreamingWithStateTestBase { val sqlQuery = "SELECT " + "c, " + - "count(a) OVER (PARTITION BY c ORDER BY proctime RANGE UNBOUNDED preceding) as cnt1, " + - "sum(a) OVER (PARTITION BY b ORDER BY proctime RANGE UNBOUNDED preceding) as cnt2 " + + "count(a) OVER (PARTITION BY c 
ORDER BY proctime RANGE UNBOUNDED preceding), " + + "sum(a) OVER (PARTITION BY b ORDER BY proctime RANGE UNBOUNDED preceding) " + "from T1" val result = tEnv.sql(sqlQuery).toAppendStream[Row] @@ -87,9 +87,9 @@ class OverWindowITCase extends StreamingWithStateTestBase { val sqlQuery = "SELECT a, " + " SUM(c) OVER (" + - " PARTITION BY a ORDER BY proctime ROWS BETWEEN 4 PRECEDING AND CURRENT ROW) AS sumC, " + + " PARTITION BY a ORDER BY proctime ROWS BETWEEN 4 PRECEDING AND CURRENT ROW), " + " MIN(c) OVER (" + - " PARTITION BY a ORDER BY proctime ROWS BETWEEN 4 PRECEDING AND CURRENT ROW) AS minC " + + " PARTITION BY a ORDER BY proctime ROWS BETWEEN 4 PRECEDING AND CURRENT ROW) " + "FROM MyTable" val result = tEnv.sql(sqlQuery).toAppendStream[Row] @@ -130,9 +130,9 @@ class OverWindowITCase extends StreamingWithStateTestBase { val sqlQuery = "SELECT a, " + " SUM(c) OVER (" + - " ORDER BY proctime ROWS BETWEEN 10 PRECEDING AND CURRENT ROW) AS sumC , " + + " ORDER BY proctime ROWS BETWEEN 10 PRECEDING AND CURRENT ROW), " + " MIN(c) OVER (" + - " ORDER BY proctime ROWS BETWEEN 10 PRECEDING AND CURRENT ROW) AS minC " + + " ORDER BY proctime ROWS BETWEEN 10 PRECEDING AND CURRENT ROW) " + "FROM MyTable" val result = tEnv.sql(sqlQuery).toAppendStream[Row] result.addSink(new StreamITCase.StringSink) @@ -173,8 +173,8 @@ class OverWindowITCase extends StreamingWithStateTestBase { val sqlQuery = "SELECT " + "c, " + - "count(a) OVER (PARTITION BY c ORDER BY proctime RANGE UNBOUNDED preceding) as cnt1, " + - "sum(a) OVER (PARTITION BY c ORDER BY proctime RANGE UNBOUNDED preceding) as cnt2 " + + "count(a) OVER (PARTITION BY c ORDER BY proctime RANGE UNBOUNDED preceding), " + + "sum(a) OVER (PARTITION BY c ORDER BY proctime RANGE UNBOUNDED preceding) " + "from T1" val result = tEnv.sql(sqlQuery).toAppendStream[Row] @@ -198,11 +198,12 @@ class OverWindowITCase extends StreamingWithStateTestBase { tEnv.registerTable("T1", t1) - val sqlQuery = "SELECT " + + val sqlQuery = "SELECT c, 
cnt1 from " + + "(SELECT " + "c, " + "count(a) " + - " OVER (PARTITION BY c ORDER BY proctime ROWS BETWEEN UNBOUNDED preceding AND CURRENT ROW)" + - "from T1" + " OVER (PARTITION BY c ORDER BY proctime ROWS BETWEEN UNBOUNDED preceding AND CURRENT ROW) " + + "as cnt1 from T1)" val result = tEnv.sql(sqlQuery).toAppendStream[Row] result.addSink(new StreamITCase.StringSink) @@ -230,8 +231,8 @@ class OverWindowITCase extends StreamingWithStateTestBase { val sqlQuery = "SELECT " + "c, " + - "count(a) OVER (ORDER BY proctime RANGE UNBOUNDED preceding) as cnt1, " + - "sum(a) OVER (ORDER BY proctime RANGE UNBOUNDED preceding) as cnt2 " + + "count(a) OVER (ORDER BY proctime RANGE UNBOUNDED preceding), " + + "sum(a) OVER (ORDER BY proctime RANGE UNBOUNDED preceding) " + "from T1" val result = tEnv.sql(sqlQuery).toAppendStream[Row] @@ -256,7 +257,7 @@ class OverWindowITCase extends StreamingWithStateTestBase { tEnv.registerTable("T1", t1) val sqlQuery = "SELECT " + - "count(a) OVER (ORDER BY proctime ROWS BETWEEN UNBOUNDED preceding AND CURRENT ROW)" + + "count(a) OVER (ORDER BY proctime ROWS BETWEEN UNBOUNDED preceding AND CURRENT ROW) " + "from T1" val result = tEnv.sql(sqlQuery).toAppendStream[Row] @@ -777,9 +778,6 @@ class OverWindowITCase extends StreamingWithStateTestBase { assertEquals(expected.sorted, StreamITCase.testResults.sorted) } -// <<<<<<< HEAD - - /** test sliding event-time unbounded window with partition by **/ @Test def testRowTimeUnBoundedPartitionedRowsOver2(): Unit = { http://git-wip-us.apache.org/repos/asf/flink/blob/6b69c588/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowTest.scala index 711b31b..a79d48f 
100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowTest.scala @@ -35,7 +35,9 @@ class OverWindowTest extends TableTestBase { val sql = "SELECT " + "c, " + "count(a) OVER (PARTITION BY c ORDER BY proctime ROWS BETWEEN 2 preceding AND " + - "CURRENT ROW) as cnt1 " + + "CURRENT ROW) as cnt1, " + + "sum(a) OVER (PARTITION BY c ORDER BY proctime ROWS BETWEEN 2 preceding AND " + + "CURRENT ROW) as sum1 " + "from MyTable" val expected = @@ -51,9 +53,9 @@ class OverWindowTest extends TableTestBase { term("partitionBy", "c"), term("orderBy", "proctime"), term("rows", "BETWEEN 2 PRECEDING AND CURRENT ROW"), - term("select", "a", "c", "proctime", "COUNT(a) AS w0$o0") + term("select", "a", "c", "proctime", "COUNT(a) AS w0$o0, $SUM0(a) AS w0$o1") ), - term("select", "c", "w0$o0 AS $1") + term("select", "c", "w0$o0 AS cnt1, CASE(>(w0$o0, 0), CAST(w0$o1), null) AS sum1") ) streamUtil.verifySql(sql, expected) } @@ -101,8 +103,8 @@ class OverWindowTest extends TableTestBase { val sqlQuery = "SELECT a, " + " COUNT(c) OVER (ORDER BY proctime " + - " RANGE BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW) AS countA " + - "FROM MyTable" + " RANGE BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW) " + + "FROM MyTable" val expected = unaryNode( @@ -129,7 +131,7 @@ class OverWindowTest extends TableTestBase { val sql = "SELECT " + "c, " + "count(a) OVER (ORDER BY proctime ROWS BETWEEN 2 preceding AND " + - "CURRENT ROW) as cnt1 " + + "CURRENT ROW)" + "from MyTable" val expected = @@ -185,7 +187,7 @@ class OverWindowTest extends TableTestBase { val sql = "SELECT " + "c, " + "count(a) OVER (PARTITION BY c ORDER BY proctime ROWS BETWEEN UNBOUNDED preceding AND " + - "CURRENT ROW) as cnt1 " + + "CURRENT ROW) " + "from MyTable" val expected = @@ -236,7 +238,7 @@ class OverWindowTest extends TableTestBase { val 
sql = "SELECT " + "c, " + "count(a) OVER (ORDER BY proctime ROWS BETWEEN UNBOUNDED preceding AND " + - "CURRENT ROW) as cnt1 " + + "CURRENT ROW) " + "from MyTable" val expected = @@ -259,7 +261,7 @@ class OverWindowTest extends TableTestBase { val sql = "SELECT " + "c, " + "count(a) OVER (PARTITION BY c ORDER BY rowtime ROWS BETWEEN 5 preceding AND " + - "CURRENT ROW) as cnt1 " + + "CURRENT ROW) " + "from MyTable" val expected = @@ -287,7 +289,7 @@ class OverWindowTest extends TableTestBase { val sql = "SELECT " + "c, " + "count(a) OVER (PARTITION BY c ORDER BY rowtime " + - "RANGE BETWEEN INTERVAL '1' SECOND preceding AND CURRENT ROW) as cnt1 " + + "RANGE BETWEEN INTERVAL '1' SECOND preceding AND CURRENT ROW) " + "from MyTable" val expected = @@ -315,7 +317,7 @@ class OverWindowTest extends TableTestBase { val sql = "SELECT " + "c, " + "count(a) OVER (ORDER BY rowtime ROWS BETWEEN 5 preceding AND " + - "CURRENT ROW) as cnt1 " + + "CURRENT ROW) " + "from MyTable" val expected = @@ -499,5 +501,4 @@ class OverWindowTest extends TableTestBase { ) streamUtil.verifySql(sql, expected) } - }