This is an automated email from the ASF dual-hosted git repository. kurt pushed a commit to branch release-1.10 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 98d209eab2f6e6cad0bc678876a36090c774082b Author: Benoit Hanotte <b.hano...@criteo.com> AuthorDate: Mon Jan 13 10:48:25 2020 +0100 [FLINK-15577][table-planner] Add Window specs to WindowAggregate nodes' digests The RelNode's digest is used by the Calcite HepPlanner to avoid adding duplicate vertices to the graph. If an equivalent vertex is already present in the graph, that vertex is used in place of the newly generated one. This means that the digest must contain all the information necessary to identify a vertex and to distinguish it from similar, but not equivalent, vertices. In the case of the `WindowAggregate` nodes, the window specs are currently not included in the digest. As a result, two aggregations with the same signatures and expressions but different windows are considered equivalent by the planner, which is incorrect and leads to an invalid physical plan. This commit fixes this issue and adds tests ensuring that the window specs are included in the digest and that similar aggregations on two different windows are not considered equivalent. 
This closes #10854 (cherry picked from commit 244718553742c086eefc95f927d7b26af597d40a) --- .../plan/logical/rel/LogicalWindowAggregate.scala | 2 +- .../logical/rel/LogicalWindowTableAggregate.scala | 2 +- .../logical/FlinkLogicalWindowAggregate.scala | 10 +++- .../logical/FlinkLogicalWindowTableAggregate.scala | 10 +++- .../table/api/batch/sql/GroupWindowTest.scala | 59 ++++++++++++++++++++ .../table/api/stream/sql/GroupWindowTest.scala | 64 ++++++++++++++++++++++ .../table/GroupWindowTableAggregateTest.scala | 59 ++++++++++++++++++++ 7 files changed, 202 insertions(+), 4 deletions(-) diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/logical/rel/LogicalWindowAggregate.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/logical/rel/LogicalWindowAggregate.scala index b87afd6..ee456c4 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/logical/rel/LogicalWindowAggregate.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/logical/rel/LogicalWindowAggregate.scala @@ -49,7 +49,7 @@ class LogicalWindowAggregate( for (property <- namedProperties) { pw.item(property.name, property.property) } - pw + pw.item("window", window.toString) } override def copy( diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/logical/rel/LogicalWindowTableAggregate.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/logical/rel/LogicalWindowTableAggregate.scala index 02db874..3c722f5 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/logical/rel/LogicalWindowTableAggregate.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/logical/rel/LogicalWindowTableAggregate.scala @@ -50,7 +50,7 @@ class LogicalWindowTableAggregate( for (property <- namedProperties) { pw.item(property.name, property.property) } - pw + 
pw.item("window", window.toString) } override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): TableAggregate = { diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalWindowAggregate.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalWindowAggregate.scala index 0c289c1..26deb4a 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalWindowAggregate.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalWindowAggregate.scala @@ -25,7 +25,7 @@ import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.rel.convert.ConverterRule import org.apache.calcite.rel.core.{Aggregate, AggregateCall} import org.apache.calcite.rel.metadata.RelMetadataQuery -import org.apache.calcite.rel.{RelNode, RelShuttle} +import org.apache.calcite.rel.{RelNode, RelShuttle, RelWriter} import org.apache.calcite.sql.SqlKind import org.apache.calcite.util.ImmutableBitSet import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty @@ -52,6 +52,14 @@ class FlinkLogicalWindowAggregate( def getNamedProperties: Seq[NamedWindowProperty] = namedProperties + override def explainTerms(pw: RelWriter): RelWriter = { + super.explainTerms(pw) + for (property <- namedProperties) { + pw.item(property.name, property.property) + } + pw.item("window", window.toString) + } + override def copy( traitSet: RelTraitSet, input: RelNode, diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalWindowTableAggregate.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalWindowTableAggregate.scala index f80cc2d..6ba272f 100644 --- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalWindowTableAggregate.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalWindowTableAggregate.scala @@ -25,7 +25,7 @@ import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.rel.convert.ConverterRule import org.apache.calcite.rel.core.AggregateCall import org.apache.calcite.rel.metadata.RelMetadataQuery -import org.apache.calcite.rel.{RelNode, RelShuttle} +import org.apache.calcite.rel.{RelNode, RelShuttle, RelWriter} import org.apache.calcite.util.ImmutableBitSet import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty import org.apache.flink.table.calcite.FlinkTypeFactory @@ -50,6 +50,14 @@ class FlinkLogicalWindowTableAggregate( def getNamedProperties: Seq[NamedWindowProperty] = namedProperties + override def explainTerms(pw: RelWriter): RelWriter = { + super.explainTerms(pw) + for (property <- namedProperties) { + pw.item(property.name, property.property) + } + pw.item("window", window.toString) + } + override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): TableAggregate = { new FlinkLogicalWindowTableAggregate( window, diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/sql/GroupWindowTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/sql/GroupWindowTest.scala index b5091ee..77a5a83 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/sql/GroupWindowTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/sql/GroupWindowTest.scala @@ -387,4 +387,63 @@ class GroupWindowTest extends TableTestBase { util.verifySql(sqlQuery, expected) } + + @Test + def testWindowAggregateWithDifferentWindows() = { + // This test ensures that the LogicalWindowAggregate and FlinkLogicalWindowAggregate nodes' + // 
digests contain the window specs. This allows the planner to make the distinction between + // similar aggregations using different windows (see FLINK-15577). + val util = batchTestUtil() + val table = util.addTable[(Timestamp)]("MyTable", 'rowtime) + + val sql = + """ + |WITH window_1h AS ( + | SELECT 1 + | FROM MyTable + | GROUP BY HOP(`rowtime`, INTERVAL '1' HOUR, INTERVAL '1' HOUR) + |), + | + |window_2h AS ( + | SELECT 1 + | FROM MyTable + | GROUP BY HOP(`rowtime`, INTERVAL '1' HOUR, INTERVAL '2' HOUR) + |) + | + |(SELECT * FROM window_1h) + |UNION ALL + |(SELECT * FROM window_2h) + |""".stripMargin + + val expected = + binaryNode( + "DataSetUnion", + unaryNode( + "DataSetCalc", + unaryNode( + "DataSetWindowAggregate", + batchTableNode(table), + // This window is the 1hr window + term("window", "SlidingGroupWindow('w$, 'rowtime, 3600000.millis, 3600000.millis)"), + term("select") + ), + term("select", "1 AS EXPR$0") + ), + unaryNode( + "DataSetCalc", + unaryNode( + "DataSetWindowAggregate", + batchTableNode(table), + // This window is the 2hr window + term("window", "SlidingGroupWindow('w$, 'rowtime, 7200000.millis, 3600000.millis)"), + term("select") + ), + term("select", "1 AS EXPR$0") + ), + term("all", "true"), + term("union", "EXPR$0") + ) + + util.verifySql(sql, expected) + } } diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/sql/GroupWindowTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/sql/GroupWindowTest.scala index 5acef08..c7c4aeb 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/sql/GroupWindowTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/sql/GroupWindowTest.scala @@ -344,4 +344,68 @@ class GroupWindowTest extends TableTestBase { ) streamUtil.verifySql(sql, expected) } + + @Test + def testWindowAggregateWithDifferentWindows() = { + // This test ensures that the 
LogicalWindowAggregate and FlinkLogicalWindowAggregate nodes' + // digests contain the window specs. This allows the planner to make the distinction between + // similar aggregations using different windows (see FLINK-15577). + val sql = + """ + |WITH window_1h AS ( + | SELECT 1 + | FROM MyTable + | GROUP BY HOP(`rowtime`, INTERVAL '1' HOUR, INTERVAL '1' HOUR) + |), + | + |window_2h AS ( + | SELECT 1 + | FROM MyTable + | GROUP BY HOP(`rowtime`, INTERVAL '1' HOUR, INTERVAL '2' HOUR) + |) + | + |(SELECT * FROM window_1h) + |UNION ALL + |(SELECT * FROM window_2h) + |""".stripMargin + + val expected = + binaryNode( + "DataStreamUnion", + unaryNode( + "DataStreamCalc", + unaryNode( + "DataStreamGroupWindowAggregate", + unaryNode( + "DataStreamCalc", + streamTableNode(table), + term("select", "rowtime") + ), + // This window is the 1hr window + term("window", "SlidingGroupWindow('w$, 'rowtime, 3600000.millis, 3600000.millis)"), + term("select") + ), + term("select", "1 AS EXPR$0") + ), + unaryNode( + "DataStreamCalc", + unaryNode( + "DataStreamGroupWindowAggregate", + unaryNode( + "DataStreamCalc", + streamTableNode(table), + term("select", "rowtime") + ), + // This window is the 2hr window + term("window", "SlidingGroupWindow('w$, 'rowtime, 7200000.millis, 3600000.millis)"), + term("select") + ), + term("select", "1 AS EXPR$0") + ), + term("all", "true"), + term("union all", "EXPR$0") + ) + + streamUtil.verifySql(sql, expected) + } } diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/GroupWindowTableAggregateTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/GroupWindowTableAggregateTest.scala index d5f59ee..db869ae 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/GroupWindowTableAggregateTest.scala +++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/GroupWindowTableAggregateTest.scala @@ -549,4 +549,63 @@ class GroupWindowTableAggregateTest extends TableTestBase { util.verifyTable(windowedTable, expected) } + + @Test + def testWindowAggregateWithDifferentWindows(): Unit = { + // This test ensures that the LogicalWindowTableAggregate and FlinkLogicalWindowTableAggregate + // nodes'v digests contain the window specs. This allows the planner to make the distinction + // between similar aggregations using different windows (see FLINK-15577). + val tableWindow1hr = table + .window(Slide over 1.hour every 1.hour on 'd as 'w1) + .groupBy('w1) + .flatAggregate(emptyFunc('a, 'b)) + .select(1 as 'a) + + val tableWindow2hr = table + .window(Slide over 2.hour every 1.hour on 'd as 'w1) + .groupBy('w1) + .flatAggregate(emptyFunc('a, 'b)) + .select(1 as 'b) + + val joinTable = tableWindow1hr.fullOuterJoin(tableWindow2hr, 'a === 'b) + + val expected = + binaryNode( + "DataStreamJoin", + unaryNode( + "DataStreamCalc", + unaryNode( + "DataStreamGroupWindowTableAggregate", + unaryNode( + "DataStreamCalc", + streamTableNode(table), + term("select", "a", "b", "d") + ), + // This window is the 1hr window + term("window", "SlidingGroupWindow('w1, 'd, 3600000.millis, 3600000.millis)"), + term("select", "EmptyTableAggFunc(a, b) AS (f0, f1)") + ), + term("select", "1 AS a") + ), + unaryNode( + "DataStreamCalc", + unaryNode( + "DataStreamGroupWindowTableAggregate", + unaryNode( + "DataStreamCalc", + streamTableNode(table), + term("select", "a", "b", "d") + ), + // This window is the 2hr window + term("window", "SlidingGroupWindow('w1, 'd, 7200000.millis, 3600000.millis)"), + term("select", "EmptyTableAggFunc(a, b) AS (f0, f1)") + ), + term("select", "1 AS b") + ), + term("where", "=(a, b)"), + term("join", "a", "b"), + term("joinType", "FullOuterJoin") + ) + util.verifyTable(joinTable, expected) + } }