This is an automated email from the ASF dual-hosted git repository. kurt pushed a commit to branch release-1.10 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 85d17b13b5da3a6627f81096ff382580134516ac Author: Benoit Hanotte <b.hano...@criteo.com> AuthorDate: Tue Jan 14 11:19:09 2020 +0100 [FLINK-15577][table-planner-blink] Add different windows tests to blink planner The Blink planner doesn't seem to be subject to the bug described in FLINK-15577. For safety, we also add the tests to ensure no regression is possible that would introduce the issue in the Blink planner. (cherry picked from commit fdc10141418205c520ee4667285ed857d92c3740) --- .../plan/batch/sql/agg/WindowAggregateTest.xml | 148 +++++++++++++++++++++ .../plan/stream/sql/agg/WindowAggregateTest.xml | 89 +++++++++++++ .../planner/plan/stream/table/GroupWindowTest.xml | 25 ++++ .../plan/batch/sql/agg/WindowAggregateTest.scala | 27 ++++ .../plan/stream/sql/agg/WindowAggregateTest.scala | 27 ++++ .../plan/stream/table/GroupWindowTest.scala | 31 ++++- 6 files changed, 345 insertions(+), 2 deletions(-) diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/WindowAggregateTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/WindowAggregateTest.xml index b91d909..07b7b71 100644 --- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/WindowAggregateTest.xml +++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/WindowAggregateTest.xml @@ -111,6 +111,154 @@ Calc(select=[CAST(/(-($f0, /(*($f1, $f1), $f2)), $f2)) AS EXPR$0, CAST(/(-($f0, ]]> </Resource> </TestCase> + <TestCase name="testWindowAggregateWithDifferentWindows[aggStrategy=AUTO]"> + <Resource name="sql"> + <![CDATA[ +WITH window_1h AS ( + SELECT 1 + FROM MyTable2 + GROUP BY HOP(`ts`, INTERVAL '1' HOUR, INTERVAL '1' HOUR) +), + +window_2h AS ( + SELECT 1 + FROM MyTable2 + GROUP BY HOP(`ts`, INTERVAL '1' HOUR, INTERVAL '2' HOUR) +) + +(SELECT * FROM window_1h) 
+UNION ALL +(SELECT * FROM window_2h) +]]> + </Resource> + <Resource name="planBefore"> + <![CDATA[ +LogicalUnion(all=[true]) +:- LogicalProject(EXPR$0=[1]) +: +- LogicalAggregate(group=[{0}]) +: +- LogicalProject($f0=[HOP($4, 3600000:INTERVAL HOUR, 3600000:INTERVAL HOUR)]) +: +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(a, b, c, d, ts)]]]) ++- LogicalProject(EXPR$0=[1]) + +- LogicalAggregate(group=[{0}]) + +- LogicalProject($f0=[HOP($4, 3600000:INTERVAL HOUR, 7200000:INTERVAL HOUR)]) + +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(a, b, c, d, ts)]]]) +]]> + </Resource> + <Resource name="planAfter"> + <![CDATA[ +Union(all=[true], union=[EXPR$0]) +:- Calc(select=[1 AS EXPR$0]) +: +- HashWindowAggregate(window=[SlidingGroupWindow('w$, ts, 3600000, 3600000)], select=[]) +: +- Exchange(distribution=[single]) +: +- LocalHashWindowAggregate(window=[SlidingGroupWindow('w$, ts, 3600000, 3600000)], select=[]) +: +- Calc(select=[ts], reuse_id=[1]) +: +- TableSourceScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(a, b, c, d, ts)]]], fields=[a, b, c, d, ts]) ++- Calc(select=[1 AS EXPR$0]) + +- HashWindowAggregate(window=[SlidingGroupWindow('w$, ts, 7200000, 3600000)], select=[]) + +- Exchange(distribution=[single]) + +- LocalHashWindowAggregate(window=[SlidingGroupWindow('w$, ts, 7200000, 3600000)], select=[]) + +- Reused(reference_id=[1]) +]]> + </Resource> + </TestCase> + <TestCase name="testWindowAggregateWithDifferentWindows[aggStrategy=ONE_PHASE]"> + <Resource name="sql"> + <![CDATA[ +WITH window_1h AS ( + SELECT 1 + FROM MyTable2 + GROUP BY HOP(`ts`, INTERVAL '1' HOUR, INTERVAL '1' HOUR) +), + +window_2h AS ( + SELECT 1 + FROM MyTable2 + GROUP BY HOP(`ts`, INTERVAL '1' HOUR, INTERVAL '2' HOUR) +) + +(SELECT * FROM window_1h) +UNION ALL +(SELECT * FROM window_2h) +]]> + </Resource> + <Resource name="planBefore"> + <![CDATA[ 
+LogicalUnion(all=[true]) +:- LogicalProject(EXPR$0=[1]) +: +- LogicalAggregate(group=[{0}]) +: +- LogicalProject($f0=[HOP($4, 3600000:INTERVAL HOUR, 3600000:INTERVAL HOUR)]) +: +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(a, b, c, d, ts)]]]) ++- LogicalProject(EXPR$0=[1]) + +- LogicalAggregate(group=[{0}]) + +- LogicalProject($f0=[HOP($4, 3600000:INTERVAL HOUR, 7200000:INTERVAL HOUR)]) + +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(a, b, c, d, ts)]]]) +]]> + </Resource> + <Resource name="planAfter"> + <![CDATA[ +Union(all=[true], union=[EXPR$0]) +:- Calc(select=[1 AS EXPR$0]) +: +- HashWindowAggregate(window=[SlidingGroupWindow('w$, ts, 3600000, 3600000)], select=[]) +: +- Exchange(distribution=[single], reuse_id=[1]) +: +- Calc(select=[ts]) +: +- TableSourceScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(a, b, c, d, ts)]]], fields=[a, b, c, d, ts]) ++- Calc(select=[1 AS EXPR$0]) + +- SortWindowAggregate(window=[SlidingGroupWindow('w$, ts, 7200000, 3600000)], select=[]) + +- Sort(orderBy=[ts ASC]) + +- Reused(reference_id=[1]) +]]> + </Resource> + </TestCase> + <TestCase name="testWindowAggregateWithDifferentWindows[aggStrategy=TWO_PHASE]"> + <Resource name="sql"> + <![CDATA[ +WITH window_1h AS ( + SELECT 1 + FROM MyTable2 + GROUP BY HOP(`ts`, INTERVAL '1' HOUR, INTERVAL '1' HOUR) +), + +window_2h AS ( + SELECT 1 + FROM MyTable2 + GROUP BY HOP(`ts`, INTERVAL '1' HOUR, INTERVAL '2' HOUR) +) + +(SELECT * FROM window_1h) +UNION ALL +(SELECT * FROM window_2h) +]]> + </Resource> + <Resource name="planBefore"> + <![CDATA[ +LogicalUnion(all=[true]) +:- LogicalProject(EXPR$0=[1]) +: +- LogicalAggregate(group=[{0}]) +: +- LogicalProject($f0=[HOP($4, 3600000:INTERVAL HOUR, 3600000:INTERVAL HOUR)]) +: +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(a, b, c, d, ts)]]]) ++- 
LogicalProject(EXPR$0=[1]) + +- LogicalAggregate(group=[{0}]) + +- LogicalProject($f0=[HOP($4, 3600000:INTERVAL HOUR, 7200000:INTERVAL HOUR)]) + +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(a, b, c, d, ts)]]]) +]]> + </Resource> + <Resource name="planAfter"> + <![CDATA[ +Union(all=[true], union=[EXPR$0]) +:- Calc(select=[1 AS EXPR$0]) +: +- HashWindowAggregate(window=[SlidingGroupWindow('w$, ts, 3600000, 3600000)], select=[]) +: +- Exchange(distribution=[single]) +: +- LocalHashWindowAggregate(window=[SlidingGroupWindow('w$, ts, 3600000, 3600000)], select=[]) +: +- Calc(select=[ts], reuse_id=[1]) +: +- TableSourceScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(a, b, c, d, ts)]]], fields=[a, b, c, d, ts]) ++- Calc(select=[1 AS EXPR$0]) + +- HashWindowAggregate(window=[SlidingGroupWindow('w$, ts, 7200000, 3600000)], select=[]) + +- Exchange(distribution=[single]) + +- LocalHashWindowAggregate(window=[SlidingGroupWindow('w$, ts, 7200000, 3600000)], select=[]) + +- Reused(reference_id=[1]) +]]> + </Resource> + </TestCase> <TestCase name="testExpressionOnWindowHavingFunction[aggStrategy=AUTO]"> <Resource name="sql"> <![CDATA[ diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml index bf0f787..db86af3 100644 --- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml +++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml @@ -48,6 +48,53 @@ Calc(select=[/(-($f0, /(*($f1, $f1), $f2)), $f2) AS EXPR$0, /(-($f0, /(*($f1, $f ]]> </Resource> </TestCase> + <TestCase name="testWindowAggregateWithDifferentWindows"> 
+ <Resource name="sql"> + <![CDATA[ +WITH window_1h AS ( + SELECT 1 + FROM MyTable + GROUP BY HOP(`rowtime`, INTERVAL '1' HOUR, INTERVAL '1' HOUR) +), + +window_2h AS ( + SELECT 1 + FROM MyTable + GROUP BY HOP(`rowtime`, INTERVAL '1' HOUR, INTERVAL '2' HOUR) +) + +(SELECT * FROM window_1h) +UNION ALL +(SELECT * FROM window_2h) +]]> + </Resource> + <Resource name="planBefore"> + <![CDATA[ +LogicalUnion(all=[true]) +:- LogicalProject(EXPR$0=[1]) +: +- LogicalAggregate(group=[{0}]) +: +- LogicalProject($f0=[HOP($4, 3600000:INTERVAL HOUR, 3600000:INTERVAL HOUR)]) +: +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]]) ++- LogicalProject(EXPR$0=[1]) + +- LogicalAggregate(group=[{0}]) + +- LogicalProject($f0=[HOP($4, 3600000:INTERVAL HOUR, 7200000:INTERVAL HOUR)]) + +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]]) +]]> + </Resource> + <Resource name="planAfter"> + <![CDATA[ +Union(all=[true], union=[EXPR$0]) +:- Calc(select=[1 AS EXPR$0]) +: +- GroupWindowAggregate(window=[SlidingGroupWindow('w$, rowtime, 3600000, 3600000)], select=[]) +: +- Exchange(distribution=[single], reuse_id=[1]) +: +- Calc(select=[rowtime]) +: +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime]) ++- Calc(select=[1 AS EXPR$0]) + +- GroupWindowAggregate(window=[SlidingGroupWindow('w$, rowtime, 7200000, 3600000)], select=[]) + +- Reused(reference_id=[1]) +]]> + </Resource> + </TestCase> <TestCase name="testExpressionOnWindowAuxFunction"> <Resource name="sql"> <![CDATA[ @@ -591,4 +638,46 @@ GroupWindowAggregate(window=[TumblingGroupWindow('w$, $f2, 1000)], select=[SUM(a ]]> </Resource> </TestCase> + <TestCase name="testWindowAggregateWithDifferentWindows"> + <Resource name="sql"> + <![CDATA[ +SELECT + SUM(correct) AS s, + AVG(correct) AS a, + TUMBLE_START(rowtime, INTERVAL '15' MINUTE) AS wStart +FROM ( + SELECT CASE a + WHEN 1 THEN 1 + ELSE 99 + END AS correct, rowtime + FROM MyTable +) +GROUP BY 
TUMBLE(rowtime, INTERVAL '15' MINUTE) + ]]> + </Resource> + <Resource name="planBefore"> + <![CDATA[ +LogicalProject(s=[$1], a=[$2], wStart=[TUMBLE_START($0)]) ++- LogicalAggregate(group=[{0}], s=[SUM($1)], a=[AVG($1)]) + +- LogicalProject($f0=[TUMBLE($4, 900000:INTERVAL MINUTE)], correct=[CASE(=($0, 1), 1, 99)]) + +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]]) +]]> + </Resource> + <Resource name="planAfter"> + <![CDATA[ +Union(all=[true], union=[EXPR$0]) +:- Calc(select=[1 AS EXPR$0]) +: +- HashWindowAggregate(window=[SlidingGroupWindow('w$, ts, 3600000, 3600000)], select=[]) +: +- Exchange(distribution=[single]) +: +- LocalHashWindowAggregate(window=[SlidingGroupWindow('w$, ts, 3600000, 3600000)], select=[]) +: +- Calc(select=[ts], reuse_id=[1]) +: +- TableSourceScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(a, b, c, d, ts)]]], fields=[a, b, c, d, ts]) ++- Calc(select=[1 AS EXPR$0]) + +- HashWindowAggregate(window=[SlidingGroupWindow('w$, ts, 7200000, 3600000)], select=[]) + +- Exchange(distribution=[single]) + +- LocalHashWindowAggregate(window=[SlidingGroupWindow('w$, ts, 7200000, 3600000)], select=[]) + +- Reused(reference_id=[1]) +]]> + </Resource> + </TestCase> </Root> diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/GroupWindowTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/GroupWindowTest.xml index 165c523..36a9d63 100644 --- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/GroupWindowTest.xml +++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/GroupWindowTest.xml @@ -194,6 +194,31 @@ GroupWindowAggregate(groupBy=[string], window=[SessionGroupWindow('w, rowtime, 7 ]]> </Resource> </TestCase> + <TestCase 
name="testWindowAggregateWithDifferentWindows"> + <Resource name="planBefore"> + <![CDATA[ +LogicalUnion(all=[true]) +:- LogicalProject(_c0=[AS(1, _UTF-16LE'_c0')]) +: +- LogicalWindowTableAggregate(group=[{}], tableAggregate=[[EmptyTableAggFunc($1, $2)]], window=[SlidingGroupWindow('w1, ts, 3600000, 3600000)], properties=[]) +: +- LogicalTableScan(table=[[default_catalog, default_database, Table1, source: [TestTableSource(ts, a, b)]]]) ++- LogicalProject(_c0=[AS(1, _UTF-16LE'_c0')]) + +- LogicalWindowTableAggregate(group=[{}], tableAggregate=[[EmptyTableAggFunc($1, $2)]], window=[SlidingGroupWindow('w1, ts, 7200000, 3600000)], properties=[]) + +- LogicalTableScan(table=[[default_catalog, default_database, Table1, source: [TestTableSource(ts, a, b)]]]) +]]> + </Resource> + <Resource name="planAfter"> + <![CDATA[ +Union(all=[true], union=[_c0]) +:- Calc(select=[1 AS _c0]) +: +- GroupWindowTableAggregate(window=[SlidingGroupWindow('w1, ts, 3600000, 3600000)], select=[EmptyTableAggFunc(a, b) AS (f0, f1)]) +: +- Exchange(distribution=[single], reuse_id=[1]) +: +- TableSourceScan(table=[[default_catalog, default_database, Table1, source: [TestTableSource(ts, a, b)]]], fields=[ts, a, b]) ++- Calc(select=[1 AS _c0]) + +- GroupWindowTableAggregate(window=[SlidingGroupWindow('w1, ts, 7200000, 3600000)], select=[EmptyTableAggFunc(a, b) AS (f0, f1)]) + +- Reused(reference_id=[1]) +]]> + </Resource> + </TestCase> <TestCase name="testEventTimeSessionGroupWindowWithUdAgg"> <Resource name="planBefore"> <![CDATA[ diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/agg/WindowAggregateTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/agg/WindowAggregateTest.scala index 50c418e..fba8d95 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/agg/WindowAggregateTest.scala +++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/agg/WindowAggregateTest.scala @@ -386,6 +386,33 @@ class WindowAggregateTest(aggStrategy: AggregatePhaseStrategy) extends TableTest util.verifyPlan(sql) } + + @Test + def testWindowAggregateWithDifferentWindows(): Unit = { + // This test ensures that the LogicalWindowAggregate node's digest contains the window specs. + // This allows the planner to make the distinction between similar aggregations using different + // windows (see FLINK-15577). + val sql = + """ + |WITH window_1h AS ( + | SELECT 1 + | FROM MyTable2 + | GROUP BY HOP(`ts`, INTERVAL '1' HOUR, INTERVAL '1' HOUR) + |), + | + |window_2h AS ( + | SELECT 1 + | FROM MyTable2 + | GROUP BY HOP(`ts`, INTERVAL '1' HOUR, INTERVAL '2' HOUR) + |) + | + |(SELECT * FROM window_1h) + |UNION ALL + |(SELECT * FROM window_2h) + |""".stripMargin + + util.verifyPlan(sql) + } } object WindowAggregateTest { diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.scala index 1da9694..83d5544 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.scala @@ -387,4 +387,31 @@ class WindowAggregateTest extends TableTestBase { util.verifyPlan(sql) } + + @Test + def testWindowAggregateWithDifferentWindows(): Unit = { + // This test ensures that the LogicalWindowAggregate node's digest contains the window specs. + // This allows the planner to make the distinction between similar aggregations using different + // windows (see FLINK-15577). 
+ val sql = + """ + |WITH window_1h AS ( + | SELECT 1 + | FROM MyTable + | GROUP BY HOP(`rowtime`, INTERVAL '1' HOUR, INTERVAL '1' HOUR) + |), + | + |window_2h AS ( + | SELECT 1 + | FROM MyTable + | GROUP BY HOP(`rowtime`, INTERVAL '1' HOUR, INTERVAL '2' HOUR) + |) + | + |(SELECT * FROM window_1h) + |UNION ALL + |(SELECT * FROM window_2h) + |""".stripMargin + + util.verifyPlan(sql) + } } diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/GroupWindowTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/GroupWindowTest.scala index f3de4fa..b416170 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/GroupWindowTest.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/GroupWindowTest.scala @@ -18,12 +18,13 @@ package org.apache.flink.table.planner.plan.stream.table +import java.sql.Timestamp + import org.apache.flink.api.scala._ import org.apache.flink.table.api.scala._ import org.apache.flink.table.api.{Session, Slide, Tumble} import org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions.{WeightedAvg, WeightedAvgWithMerge} -import org.apache.flink.table.planner.utils.TableTestBase - +import org.apache.flink.table.planner.utils.{EmptyTableAggFunc, TableTestBase} import org.junit.Test class GroupWindowTest extends TableTestBase { @@ -406,4 +407,30 @@ class GroupWindowTest extends TableTestBase { util.verifyPlan(windowedTable) } + + @Test + def testWindowAggregateWithDifferentWindows(): Unit = { + // This test ensures that the LogicalWindowTableAggregate node's digest contains the window + // specs. This allows the planner to make the distinction between similar aggregations using + // different windows (see FLINK-15577). 
+ val util = streamTestUtil() + val table = util.addTableSource[(Timestamp, Long, Int)]('ts.rowtime, 'a, 'b) + val emptyFunc = new EmptyTableAggFunc + + val tableWindow1hr = table + .window(Slide over 1.hour every 1.hour on 'ts as 'w1) + .groupBy('w1) + .flatAggregate(emptyFunc('a, 'b)) + .select(1) + + val tableWindow2hr = table + .window(Slide over 2.hour every 1.hour on 'ts as 'w1) + .groupBy('w1) + .flatAggregate(emptyFunc('a, 'b)) + .select(1) + + val unionTable = tableWindow1hr.unionAll(tableWindow2hr) + + util.verifyPlan(unionTable) + } }