This is an automated email from the ASF dual-hosted git repository.

kurt pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 85d17b13b5da3a6627f81096ff382580134516ac
Author: Benoit Hanotte <b.hano...@criteo.com>
AuthorDate: Tue Jan 14 11:19:09 2020 +0100

    [FLINK-15577][table-planner-blink] Add different windows tests to blink 
planner
    
    The Blink planner doesn't seem to be subject to the bug described in
    FLINK-15577. For safety, we also add the tests to ensure no regression
    is possible that would introduce the issue in the Blink planner.
    
    (cherry picked from commit fdc10141418205c520ee4667285ed857d92c3740)
---
 .../plan/batch/sql/agg/WindowAggregateTest.xml     | 148 +++++++++++++++++++++
 .../plan/stream/sql/agg/WindowAggregateTest.xml    |  89 +++++++++++++
 .../planner/plan/stream/table/GroupWindowTest.xml  |  25 ++++
 .../plan/batch/sql/agg/WindowAggregateTest.scala   |  27 ++++
 .../plan/stream/sql/agg/WindowAggregateTest.scala  |  27 ++++
 .../plan/stream/table/GroupWindowTest.scala        |  31 ++++-
 6 files changed, 345 insertions(+), 2 deletions(-)

diff --git 
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/WindowAggregateTest.xml
 
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/WindowAggregateTest.xml
index b91d909..07b7b71 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/WindowAggregateTest.xml
+++ 
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/WindowAggregateTest.xml
@@ -111,6 +111,154 @@ Calc(select=[CAST(/(-($f0, /(*($f1, $f1), $f2)), $f2)) AS 
EXPR$0, CAST(/(-($f0,
 ]]>
     </Resource>
   </TestCase>
+  <TestCase name="testWindowAggregateWithDifferentWindows[aggStrategy=AUTO]">
+    <Resource name="sql">
+      <![CDATA[
+WITH window_1h AS (
+    SELECT 1
+    FROM MyTable2
+    GROUP BY HOP(`ts`, INTERVAL '1' HOUR, INTERVAL '1' HOUR)
+),
+
+window_2h AS (
+    SELECT 1
+    FROM MyTable2
+    GROUP BY HOP(`ts`, INTERVAL '1' HOUR, INTERVAL '2' HOUR)
+)
+
+(SELECT * FROM window_1h)
+UNION ALL
+(SELECT * FROM window_2h)
+]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalUnion(all=[true])
+:- LogicalProject(EXPR$0=[1])
+:  +- LogicalAggregate(group=[{0}])
+:     +- LogicalProject($f0=[HOP($4, 3600000:INTERVAL HOUR, 3600000:INTERVAL 
HOUR)])
+:        +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable2, source: [TestTableSource(a, b, c, d, ts)]]])
++- LogicalProject(EXPR$0=[1])
+   +- LogicalAggregate(group=[{0}])
+      +- LogicalProject($f0=[HOP($4, 3600000:INTERVAL HOUR, 7200000:INTERVAL 
HOUR)])
+         +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable2, source: [TestTableSource(a, b, c, d, ts)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Union(all=[true], union=[EXPR$0])
+:- Calc(select=[1 AS EXPR$0])
+:  +- HashWindowAggregate(window=[SlidingGroupWindow('w$, ts, 3600000, 
3600000)], select=[])
+:     +- Exchange(distribution=[single])
+:        +- LocalHashWindowAggregate(window=[SlidingGroupWindow('w$, ts, 
3600000, 3600000)], select=[])
+:           +- Calc(select=[ts], reuse_id=[1])
+:              +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable2, source: [TestTableSource(a, b, c, d, ts)]]], fields=[a, b, c, d, ts])
++- Calc(select=[1 AS EXPR$0])
+   +- HashWindowAggregate(window=[SlidingGroupWindow('w$, ts, 7200000, 
3600000)], select=[])
+      +- Exchange(distribution=[single])
+         +- LocalHashWindowAggregate(window=[SlidingGroupWindow('w$, ts, 
7200000, 3600000)], select=[])
+            +- Reused(reference_id=[1])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase 
name="testWindowAggregateWithDifferentWindows[aggStrategy=ONE_PHASE]">
+    <Resource name="sql">
+      <![CDATA[
+WITH window_1h AS (
+    SELECT 1
+    FROM MyTable2
+    GROUP BY HOP(`ts`, INTERVAL '1' HOUR, INTERVAL '1' HOUR)
+),
+
+window_2h AS (
+    SELECT 1
+    FROM MyTable2
+    GROUP BY HOP(`ts`, INTERVAL '1' HOUR, INTERVAL '2' HOUR)
+)
+
+(SELECT * FROM window_1h)
+UNION ALL
+(SELECT * FROM window_2h)
+]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalUnion(all=[true])
+:- LogicalProject(EXPR$0=[1])
+:  +- LogicalAggregate(group=[{0}])
+:     +- LogicalProject($f0=[HOP($4, 3600000:INTERVAL HOUR, 3600000:INTERVAL 
HOUR)])
+:        +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable2, source: [TestTableSource(a, b, c, d, ts)]]])
++- LogicalProject(EXPR$0=[1])
+   +- LogicalAggregate(group=[{0}])
+      +- LogicalProject($f0=[HOP($4, 3600000:INTERVAL HOUR, 7200000:INTERVAL 
HOUR)])
+         +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable2, source: [TestTableSource(a, b, c, d, ts)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Union(all=[true], union=[EXPR$0])
+:- Calc(select=[1 AS EXPR$0])
+:  +- HashWindowAggregate(window=[SlidingGroupWindow('w$, ts, 3600000, 
3600000)], select=[])
+:     +- Exchange(distribution=[single], reuse_id=[1])
+:        +- Calc(select=[ts])
+:           +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable2, source: [TestTableSource(a, b, c, d, ts)]]], fields=[a, b, c, d, ts])
++- Calc(select=[1 AS EXPR$0])
+   +- SortWindowAggregate(window=[SlidingGroupWindow('w$, ts, 7200000, 
3600000)], select=[])
+      +- Sort(orderBy=[ts ASC])
+         +- Reused(reference_id=[1])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase 
name="testWindowAggregateWithDifferentWindows[aggStrategy=TWO_PHASE]">
+    <Resource name="sql">
+      <![CDATA[
+WITH window_1h AS (
+    SELECT 1
+    FROM MyTable2
+    GROUP BY HOP(`ts`, INTERVAL '1' HOUR, INTERVAL '1' HOUR)
+),
+
+window_2h AS (
+    SELECT 1
+    FROM MyTable2
+    GROUP BY HOP(`ts`, INTERVAL '1' HOUR, INTERVAL '2' HOUR)
+)
+
+(SELECT * FROM window_1h)
+UNION ALL
+(SELECT * FROM window_2h)
+]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalUnion(all=[true])
+:- LogicalProject(EXPR$0=[1])
+:  +- LogicalAggregate(group=[{0}])
+:     +- LogicalProject($f0=[HOP($4, 3600000:INTERVAL HOUR, 3600000:INTERVAL 
HOUR)])
+:        +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable2, source: [TestTableSource(a, b, c, d, ts)]]])
++- LogicalProject(EXPR$0=[1])
+   +- LogicalAggregate(group=[{0}])
+      +- LogicalProject($f0=[HOP($4, 3600000:INTERVAL HOUR, 7200000:INTERVAL 
HOUR)])
+         +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable2, source: [TestTableSource(a, b, c, d, ts)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Union(all=[true], union=[EXPR$0])
+:- Calc(select=[1 AS EXPR$0])
+:  +- HashWindowAggregate(window=[SlidingGroupWindow('w$, ts, 3600000, 
3600000)], select=[])
+:     +- Exchange(distribution=[single])
+:        +- LocalHashWindowAggregate(window=[SlidingGroupWindow('w$, ts, 
3600000, 3600000)], select=[])
+:           +- Calc(select=[ts], reuse_id=[1])
+:              +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable2, source: [TestTableSource(a, b, c, d, ts)]]], fields=[a, b, c, d, ts])
++- Calc(select=[1 AS EXPR$0])
+   +- HashWindowAggregate(window=[SlidingGroupWindow('w$, ts, 7200000, 
3600000)], select=[])
+      +- Exchange(distribution=[single])
+         +- LocalHashWindowAggregate(window=[SlidingGroupWindow('w$, ts, 
7200000, 3600000)], select=[])
+            +- Reused(reference_id=[1])
+]]>
+    </Resource>
+  </TestCase>
   <TestCase name="testExpressionOnWindowHavingFunction[aggStrategy=AUTO]">
     <Resource name="sql">
       <![CDATA[
diff --git 
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml
 
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml
index bf0f787..db86af3 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml
+++ 
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml
@@ -48,6 +48,53 @@ Calc(select=[/(-($f0, /(*($f1, $f1), $f2)), $f2) AS EXPR$0, 
/(-($f0, /(*($f1, $f
 ]]>
     </Resource>
   </TestCase>
+  <TestCase name="testWindowAggregateWithDifferentWindows">
+    <Resource name="sql">
+      <![CDATA[
+WITH window_1h AS (
+    SELECT 1
+    FROM MyTable
+    GROUP BY HOP(`rowtime`, INTERVAL '1' HOUR, INTERVAL '1' HOUR)
+),
+
+window_2h AS (
+    SELECT 1
+    FROM MyTable
+    GROUP BY HOP(`rowtime`, INTERVAL '1' HOUR, INTERVAL '2' HOUR)
+)
+
+(SELECT * FROM window_1h)
+UNION ALL
+(SELECT * FROM window_2h)
+]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalUnion(all=[true])
+:- LogicalProject(EXPR$0=[1])
+:  +- LogicalAggregate(group=[{0}])
+:     +- LogicalProject($f0=[HOP($4, 3600000:INTERVAL HOUR, 3600000:INTERVAL 
HOUR)])
+:        +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable]])
++- LogicalProject(EXPR$0=[1])
+   +- LogicalAggregate(group=[{0}])
+      +- LogicalProject($f0=[HOP($4, 3600000:INTERVAL HOUR, 7200000:INTERVAL 
HOUR)])
+         +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Union(all=[true], union=[EXPR$0])
+:- Calc(select=[1 AS EXPR$0])
+:  +- GroupWindowAggregate(window=[SlidingGroupWindow('w$, rowtime, 3600000, 
3600000)], select=[])
+:     +- Exchange(distribution=[single], reuse_id=[1])
+:        +- Calc(select=[rowtime])
+:           +- DataStreamScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, proctime, rowtime])
++- Calc(select=[1 AS EXPR$0])
+   +- GroupWindowAggregate(window=[SlidingGroupWindow('w$, rowtime, 7200000, 
3600000)], select=[])
+      +- Reused(reference_id=[1])
+]]>
+    </Resource>
+  </TestCase>
   <TestCase name="testExpressionOnWindowAuxFunction">
     <Resource name="sql">
       <![CDATA[
@@ -591,4 +638,46 @@ GroupWindowAggregate(window=[TumblingGroupWindow('w$, $f2, 
1000)], select=[SUM(a
 ]]>
     </Resource>
   </TestCase>
+  <TestCase name="testWindowAggregateWithDifferentWindows">
+    <Resource name="sql">
+      <![CDATA[
+SELECT
+  SUM(correct) AS s,
+  AVG(correct) AS a,
+  TUMBLE_START(rowtime, INTERVAL '15' MINUTE) AS wStart
+FROM (
+  SELECT CASE a
+      WHEN 1 THEN 1
+      ELSE 99
+    END AS correct, rowtime
+  FROM MyTable
+)
+GROUP BY TUMBLE(rowtime, INTERVAL '15' MINUTE)
+      ]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(s=[$1], a=[$2], wStart=[TUMBLE_START($0)])
++- LogicalAggregate(group=[{0}], s=[SUM($1)], a=[AVG($1)])
+   +- LogicalProject($f0=[TUMBLE($4, 900000:INTERVAL MINUTE)], 
correct=[CASE(=($0, 1), 1, 99)])
+      +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Union(all=[true], union=[EXPR$0])
+:- Calc(select=[1 AS EXPR$0])
+:  +- HashWindowAggregate(window=[SlidingGroupWindow('w$, ts, 3600000, 
3600000)], select=[])
+:     +- Exchange(distribution=[single])
+:        +- LocalHashWindowAggregate(window=[SlidingGroupWindow('w$, ts, 
3600000, 3600000)], select=[])
+:           +- Calc(select=[ts], reuse_id=[1])
+:              +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable2, source: [TestTableSource(a, b, c, d, ts)]]], fields=[a, b, c, d, ts])
++- Calc(select=[1 AS EXPR$0])
+   +- HashWindowAggregate(window=[SlidingGroupWindow('w$, ts, 7200000, 
3600000)], select=[])
+      +- Exchange(distribution=[single])
+         +- LocalHashWindowAggregate(window=[SlidingGroupWindow('w$, ts, 
7200000, 3600000)], select=[])
+            +- Reused(reference_id=[1])
+]]>
+    </Resource>
+  </TestCase>
 </Root>
diff --git 
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/GroupWindowTest.xml
 
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/GroupWindowTest.xml
index 165c523..36a9d63 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/GroupWindowTest.xml
+++ 
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/GroupWindowTest.xml
@@ -194,6 +194,31 @@ GroupWindowAggregate(groupBy=[string], 
window=[SessionGroupWindow('w, rowtime, 7
 ]]>
     </Resource>
   </TestCase>
+  <TestCase name="testWindowAggregateWithDifferentWindows">
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalUnion(all=[true])
+:- LogicalProject(_c0=[AS(1, _UTF-16LE'_c0')])
+:  +- LogicalWindowTableAggregate(group=[{}], 
tableAggregate=[[EmptyTableAggFunc($1, $2)]], window=[SlidingGroupWindow('w1, 
ts, 3600000, 3600000)], properties=[])
+:     +- LogicalTableScan(table=[[default_catalog, default_database, Table1, 
source: [TestTableSource(ts, a, b)]]])
++- LogicalProject(_c0=[AS(1, _UTF-16LE'_c0')])
+   +- LogicalWindowTableAggregate(group=[{}], 
tableAggregate=[[EmptyTableAggFunc($1, $2)]], window=[SlidingGroupWindow('w1, 
ts, 7200000, 3600000)], properties=[])
+      +- LogicalTableScan(table=[[default_catalog, default_database, Table1, 
source: [TestTableSource(ts, a, b)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Union(all=[true], union=[_c0])
+:- Calc(select=[1 AS _c0])
+:  +- GroupWindowTableAggregate(window=[SlidingGroupWindow('w1, ts, 3600000, 
3600000)], select=[EmptyTableAggFunc(a, b) AS (f0, f1)])
+:     +- Exchange(distribution=[single], reuse_id=[1])
+:        +- TableSourceScan(table=[[default_catalog, default_database, Table1, 
source: [TestTableSource(ts, a, b)]]], fields=[ts, a, b])
++- Calc(select=[1 AS _c0])
+   +- GroupWindowTableAggregate(window=[SlidingGroupWindow('w1, ts, 7200000, 
3600000)], select=[EmptyTableAggFunc(a, b) AS (f0, f1)])
+      +- Reused(reference_id=[1])
+]]>
+    </Resource>
+  </TestCase>
   <TestCase name="testEventTimeSessionGroupWindowWithUdAgg">
     <Resource name="planBefore">
       <![CDATA[
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/agg/WindowAggregateTest.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/agg/WindowAggregateTest.scala
index 50c418e..fba8d95 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/agg/WindowAggregateTest.scala
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/agg/WindowAggregateTest.scala
@@ -386,6 +386,33 @@ class WindowAggregateTest(aggStrategy: 
AggregatePhaseStrategy) extends TableTest
 
     util.verifyPlan(sql)
   }
+
+  @Test
+  def testWindowAggregateWithDifferentWindows(): Unit = {
+    // This test ensures that the LogicalWindowAggregate node' digest contains 
the window specs.
+    // This allows the planner to make the distinction between similar 
aggregations using different
+    // windows (see FLINK-15577).
+    val sql =
+      """
+        |WITH window_1h AS (
+        |    SELECT 1
+        |    FROM MyTable2
+        |    GROUP BY HOP(`ts`, INTERVAL '1' HOUR, INTERVAL '1' HOUR)
+        |),
+        |
+        |window_2h AS (
+        |    SELECT 1
+        |    FROM MyTable2
+        |    GROUP BY HOP(`ts`, INTERVAL '1' HOUR, INTERVAL '2' HOUR)
+        |)
+        |
+        |(SELECT * FROM window_1h)
+        |UNION ALL
+        |(SELECT * FROM window_2h)
+        |""".stripMargin
+
+    util.verifyPlan(sql)
+  }
 }
 
 object WindowAggregateTest {
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.scala
index 1da9694..83d5544 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.scala
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.scala
@@ -387,4 +387,31 @@ class WindowAggregateTest extends TableTestBase {
 
     util.verifyPlan(sql)
   }
+
+  @Test
+  def testWindowAggregateWithDifferentWindows(): Unit = {
+    // This test ensures that the LogicalWindowAggregate node' digest contains 
the window specs.
+    // This allows the planner to make the distinction between similar 
aggregations using different
+    // windows (see FLINK-15577).
+    val sql =
+    """
+      |WITH window_1h AS (
+      |    SELECT 1
+      |    FROM MyTable
+      |    GROUP BY HOP(`rowtime`, INTERVAL '1' HOUR, INTERVAL '1' HOUR)
+      |),
+      |
+      |window_2h AS (
+      |    SELECT 1
+      |    FROM MyTable
+      |    GROUP BY HOP(`rowtime`, INTERVAL '1' HOUR, INTERVAL '2' HOUR)
+      |)
+      |
+      |(SELECT * FROM window_1h)
+      |UNION ALL
+      |(SELECT * FROM window_2h)
+      |""".stripMargin
+
+    util.verifyPlan(sql)
+  }
 }
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/GroupWindowTest.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/GroupWindowTest.scala
index f3de4fa..b416170 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/GroupWindowTest.scala
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/GroupWindowTest.scala
@@ -18,12 +18,13 @@
 
 package org.apache.flink.table.planner.plan.stream.table
 
+import java.sql.Timestamp
+
 import org.apache.flink.api.scala._
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.api.{Session, Slide, Tumble}
 import 
org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions.{WeightedAvg,
 WeightedAvgWithMerge}
-import org.apache.flink.table.planner.utils.TableTestBase
-
+import org.apache.flink.table.planner.utils.{EmptyTableAggFunc, TableTestBase}
 import org.junit.Test
 
 class GroupWindowTest extends TableTestBase {
@@ -406,4 +407,30 @@ class GroupWindowTest extends TableTestBase {
 
     util.verifyPlan(windowedTable)
   }
+
+  @Test
+  def testWindowAggregateWithDifferentWindows(): Unit = {
+    // This test ensures that the LogicalWindowTableAggregate node's digest 
contains the window
+    // specs. This allows the planner to make the distinction between similar 
aggregations using
+    // different windows (see FLINK-15577).
+    val util = streamTestUtil()
+    val table = util.addTableSource[(Timestamp, Long, Int)]('ts.rowtime, 'a, 
'b)
+    val emptyFunc = new EmptyTableAggFunc
+
+    val tableWindow1hr = table
+      .window(Slide over 1.hour every 1.hour on 'ts as 'w1)
+      .groupBy('w1)
+      .flatAggregate(emptyFunc('a, 'b))
+      .select(1)
+
+    val tableWindow2hr = table
+      .window(Slide over 2.hour every 1.hour on 'ts as 'w1)
+      .groupBy('w1)
+      .flatAggregate(emptyFunc('a, 'b))
+      .select(1)
+
+    val unionTable = tableWindow1hr.unionAll(tableWindow2hr)
+
+    util.verifyPlan(unionTable)
+  }
 }

Reply via email to