This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git

The following commit(s) were added to refs/heads/release-1.12 by this push:
     new 1345c0f  [FLINK-21225][table-planner-blink] Support OVER window distinct aggregates in Table API
1345c0f is described below

commit 1345c0f9a606a6e5ccffda59bb28a6ccfe054263
Author: Jane <55568005+ladyfor...@users.noreply.github.com>
AuthorDate: Fri Feb 12 22:54:59 2021 +0800

    [FLINK-21225][table-planner-blink] Support OVER window distinct aggregates in Table API

    This closes #14917
---
 .../expressions/converter/OverConvertRule.java     |  16 +-
 .../planner/plan/stream/table/OverWindowTest.xml   |  68 +++++++
 .../planner/plan/stream/table/OverWindowTest.scala |  49 +++++
 .../runtime/stream/table/OverWindowITCase.scala    | 220 +++++++++++++++++++++
 4 files changed, 351 insertions(+), 2 deletions(-)

diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/converter/OverConvertRule.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/converter/OverConvertRule.java
index 220f19f..62df20d 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/converter/OverConvertRule.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/converter/OverConvertRule.java
@@ -70,6 +70,9 @@ public class OverConvertRule implements CallExpressionConvertRule {
         if (call.getFunctionDefinition() == BuiltInFunctionDefinitions.OVER) {
             FlinkTypeFactory typeFactory = context.getTypeFactory();
             Expression agg = children.get(0);
+            FunctionDefinition def = ((CallExpression) agg).getFunctionDefinition();
+            boolean isDistinct = BuiltInFunctionDefinitions.DISTINCT == def;
+
             SqlAggFunction aggFunc = agg.accept(new SqlAggFunctionVisitor(context.getRelBuilder()));
             RelDataType aggResultType =
                     typeFactory.createFieldTypeFromLogicalType(
@@ -78,7 +81,16 @@ public class OverConvertRule implements CallExpressionConvertRule {
 
             // assemble exprs by agg children
             List<RexNode> aggExprs =
-                    agg.getChildren().stream().map(context::toRexNode).collect(Collectors.toList());
+                    agg.getChildren().stream()
+                            .map(
+                                    child -> {
+                                        if (isDistinct) {
+                                            return context.toRexNode(child.getChildren().get(0));
+                                        } else {
+                                            return context.toRexNode(child);
+                                        }
+                                    })
+                            .collect(Collectors.toList());
 
             // assemble order by key
             Expression orderKeyExpr = children.get(1);
@@ -123,7 +135,7 @@ public class OverConvertRule implements CallExpressionConvertRule {
                                 isPhysical,
                                 true,
                                 false,
-                                false));
+                                isDistinct));
         }
         return Optional.empty();
     }
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/OverWindowTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/OverWindowTest.xml
index 2f3f38d..1df07c7 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/OverWindowTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/OverWindowTest.xml
@@ -152,6 +152,40 @@ Calc(select=[c, w0$o0 AS _c1, w0$o1 AS _c2])
 ]]>
     </Resource>
   </TestCase>
+  <TestCase name="testRowTimeBoundedDistinctWithPartitionedRangeOver">
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(c=[$2], _c1=[AS(COUNT(DISTINCT $0) OVER (PARTITION BY $2 ORDER BY $4 NULLS FIRST RANGE 7200000 PRECEDING), _UTF-16LE'_c1')], _c2=[AS(SUM(DISTINCT $0) OVER (PARTITION BY $2 ORDER BY $4 NULLS FIRST RANGE 7200000 PRECEDING), _UTF-16LE'_c2')], _c3=[AS(AVG(DISTINCT AS(CAST($0):FLOAT, _UTF-16LE'a')) OVER (PARTITION BY $2 ORDER BY $4 NULLS FIRST RANGE 7200000 PRECEDING), _UTF-16LE'_c3')])
++- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Calc(select=[c, w0$o0 AS _c1, w0$o1 AS _c2, w0$o2 AS _c3])
++- OverAggregate(partitionBy=[c], orderBy=[rowtime ASC], window=[ RANG BETWEEN 7200000 PRECEDING AND CURRENT ROW], select=[a, c, rowtime, $3, COUNT(DISTINCT a) AS w0$o0, SUM(DISTINCT a) AS w0$o1, AVG(DISTINCT $3) AS w0$o2])
+   +- Exchange(distribution=[hash[c]])
+      +- Calc(select=[a, c, rowtime, CAST(a) AS $3])
+         +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testRowTimeBoundedDistinctWithPartitionedRowsOver">
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(c=[$2], _c1=[AS(COUNT(DISTINCT $0) OVER (PARTITION BY $2 ORDER BY $4 NULLS FIRST ROWS 2 PRECEDING), _UTF-16LE'_c1')], _c2=[AS(SUM(DISTINCT $0) OVER (PARTITION BY $2 ORDER BY $4 NULLS FIRST ROWS 2 PRECEDING), _UTF-16LE'_c2')], _c3=[AS(AVG(DISTINCT AS(CAST($0):FLOAT, _UTF-16LE'a')) OVER (PARTITION BY $2 ORDER BY $4 NULLS FIRST ROWS 2 PRECEDING), _UTF-16LE'_c3')])
++- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Calc(select=[c, w0$o0 AS _c1, w0$o1 AS _c2, w0$o2 AS _c3])
++- OverAggregate(partitionBy=[c], orderBy=[rowtime ASC], window=[ ROWS BETWEEN 2 PRECEDING AND CURRENT ROW], select=[a, c, rowtime, $3, COUNT(DISTINCT a) AS w0$o0, SUM(DISTINCT a) AS w0$o1, AVG(DISTINCT $3) AS w0$o2])
+   +- Exchange(distribution=[hash[c]])
+      +- Calc(select=[a, c, rowtime, CAST(a) AS $3])
+         +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
+]]>
+    </Resource>
+  </TestCase>
   <TestCase name="testRowTimeBoundedNonPartitionedRangeOver">
     <Resource name="planBefore">
       <![CDATA[
@@ -220,6 +254,40 @@ Calc(select=[c, w0$o0 AS _c1, w0$o1 AS wAvg])
 ]]>
     </Resource>
   </TestCase>
+  <TestCase name="testRowTimeUnboundedDistinctWithPartitionedRangeOver">
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(c=[$2], _c1=[AS(COUNT(DISTINCT $0) OVER (PARTITION BY $2 ORDER BY $4 NULLS FIRST), _UTF-16LE'_c1')], _c2=[AS(SUM(DISTINCT $0) OVER (PARTITION BY $2 ORDER BY $4 NULLS FIRST), _UTF-16LE'_c2')], _c3=[AS(AVG(DISTINCT AS(CAST($0):FLOAT, _UTF-16LE'a')) OVER (PARTITION BY $2 ORDER BY $4 NULLS FIRST), _UTF-16LE'_c3')])
++- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Calc(select=[c, w0$o0 AS _c1, w0$o1 AS _c2, w0$o2 AS _c3])
++- OverAggregate(partitionBy=[c], orderBy=[rowtime ASC], window=[ RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, c, rowtime, $3, COUNT(DISTINCT a) AS w0$o0, SUM(DISTINCT a) AS w0$o1, AVG(DISTINCT $3) AS w0$o2])
+   +- Exchange(distribution=[hash[c]])
+      +- Calc(select=[a, c, rowtime, CAST(a) AS $3])
+         +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testRowTimeUnboundedDistinctWithPartitionedRowsOver">
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(c=[$2], _c1=[AS(COUNT(DISTINCT $0) OVER (PARTITION BY $2 ORDER BY $4 NULLS FIRST ROWS UNBOUNDED PRECEDING), _UTF-16LE'_c1')], _c2=[AS(SUM(DISTINCT $0) OVER (PARTITION BY $2 ORDER BY $4 NULLS FIRST ROWS UNBOUNDED PRECEDING), _UTF-16LE'_c2')], _c3=[AS(AVG(DISTINCT AS(CAST($0):FLOAT, _UTF-16LE'a')) OVER (PARTITION BY $2 ORDER BY $4 NULLS FIRST ROWS UNBOUNDED PRECEDING), _UTF-16LE'_c3')])
++- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Calc(select=[c, w0$o0 AS _c1, w0$o1 AS _c2, w0$o2 AS _c3])
++- OverAggregate(partitionBy=[c], orderBy=[rowtime ASC], window=[ ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, c, rowtime, $3, COUNT(DISTINCT a) AS w0$o0, SUM(DISTINCT a) AS w0$o1, AVG(DISTINCT $3) AS w0$o2])
+   +- Exchange(distribution=[hash[c]])
+      +- Calc(select=[a, c, rowtime, CAST(a) AS $3])
+         +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
+]]>
+    </Resource>
+  </TestCase>
   <TestCase name="testRowTimeUnboundedNonPartitionedRangeOver">
     <Resource name="planBefore">
       <![CDATA[
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/OverWindowTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/OverWindowTest.scala
index cec69be..4466091 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/OverWindowTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/OverWindowTest.scala
@@ -189,6 +189,55 @@ class OverWindowTest extends TableTestBase {
   }
 
   @Test
+  def testRowTimeBoundedDistinctWithPartitionedRangeOver(): Unit = {
+    val result = table
+      .window(Over partitionBy 'c orderBy 'rowtime preceding 2.hours following CURRENT_RANGE as 'w)
+      .select('c,
+        'a.count.distinct over 'w,
+        'a.sum.distinct over 'w,
+        ('a.cast(DataTypes.FLOAT) as 'a).avg.distinct over 'w)
+
+    streamUtil.verifyPlan(result)
+  }
+
+  @Test
+  def testRowTimeUnboundedDistinctWithPartitionedRangeOver(): Unit = {
+    val result = table
+      .window(Over partitionBy 'c orderBy 'rowtime preceding UNBOUNDED_RANGE as 'w)
+      .select('c,
+        'a.count.distinct over 'w,
+        'a.sum.distinct over 'w,
+        ('a.cast(DataTypes.FLOAT) as 'a).avg.distinct over 'w)
+
+    streamUtil.verifyPlan(result)
+  }
+
+  @Test
+  def testRowTimeBoundedDistinctWithPartitionedRowsOver(): Unit = {
+    val result = table
+      .window(Over partitionBy 'c orderBy 'rowtime preceding 2.rows following CURRENT_ROW as 'w)
+      .select('c,
+        'a.count.distinct over 'w,
+        'a.sum.distinct over 'w,
+        ('a.cast(DataTypes.FLOAT) as 'a).avg.distinct over 'w)
+
+    streamUtil.verifyPlan(result)
+  }
+
+  @Test
+  def testRowTimeUnboundedDistinctWithPartitionedRowsOver(): Unit = {
+    val result = table
+      .window(Over partitionBy 'c orderBy 'rowtime preceding UNBOUNDED_ROW following
+        CURRENT_ROW as 'w)
+      .select('c,
+        'a.count.distinct over 'w,
+        'a.sum.distinct over 'w,
+        ('a.cast(DataTypes.FLOAT) as 'a).avg.distinct over 'w)
+
+    streamUtil.verifyPlan(result)
+  }
+
+  @Test
   def testRowTimeUnboundedPartitionedRowsOver() = {
     val weightedAvg = new WeightedAvgWithRetract
 
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/OverWindowITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/OverWindowITCase.scala
index 8ed3692..2104f75 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/OverWindowITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/OverWindowITCase.scala
@@ -194,6 +194,226 @@ class OverWindowITCase(mode: StateBackendMode) extends StreamingWithStateTestBas
   }
 
   @Test
+  def testRowTimeBoundedDistinctPartitionedRangeOver(): Unit = {
+    val data: Seq[Either[(Long, (Int, Long, String)), Long]] = Seq(
+      Left(14000005L, (1, 1L, "Hi")),
+      Left(14000000L, (2, 1L, "Hello")),
+      Left(14000001L, (1, 1L, "Hello")),
+      Left(14000002L, (1, 2L, "Hello")),
+      Left(14000002L, (1, 3L, "Hello world")),
+      Left(14000003L, (2, 2L, "Hello world")),
+      Left(14000003L, (2, 3L, "Hello world")),
+      Right(14000020L),
+      Left(14000021L, (1, 4L, "Hello world")),
+      Left(14000022L, (1, 5L, "Hello world")),
+      Left(14000022L, (1, 6L, "Hello world")),
+      Left(14000022L, (1, 7L, "Hello world")),
+      Left(14000023L, (2, 4L, "Hello world")),
+      Left(14000023L, (2, 5L, "Hello world")),
+      Right(14000030L)
+    )
+
+    val source = failingDataSource(data)
+    val table = source.transform("TimeAssigner", new EventTimeProcessOperator[(Int, Long, String)])
+      .setParallelism(source.parallelism)
+      .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
+
+    val windowedTable = table
+      .window(Over partitionBy 'c orderBy 'rowtime
+        preceding 1.seconds following CURRENT_RANGE as 'w)
+      .select(
+        'c,
+        'b.count.distinct over 'w,
+        'b.sum.distinct over 'w,
+        ('b.cast(DataTypes.FLOAT) as 'b).avg.distinct over 'w)
+
+    val sink = new TestingAppendSink
+    windowedTable.toAppendStream[Row].addSink(sink)
+    env.execute()
+    val expected = Seq(
+      "Hello,1,1,1.0",
+      "Hello,1,1,1.0",
+      "Hello,2,3,1.5",
+      "Hello world,1,3,3.0",
+      "Hello world,2,5,2.5",
+      "Hello world,2,5,2.5",
+      "Hi,1,1,1.0",
+      "Hello world,3,9,3.0",
+      "Hello world,6,27,4.5",
+      "Hello world,6,27,4.5",
+      "Hello world,6,27,4.5",
+      "Hello world,6,27,4.5",
+      "Hello world,6,27,4.5"
+    )
+    assertEquals(expected.sorted, sink.getAppendResults.sorted)
+  }
+
+  @Test
+  def testRowTimeUnBoundedDistinctPartitionedRangeOver(): Unit = {
+    val data: Seq[Either[(Long, (Int, Long, String)), Long]] = Seq(
+      Left(14000005L, (1, 1L, "Hi")),
+      Left(14000000L, (2, 1L, "Hello")),
+      Left(14000001L, (1, 1L, "Hello")),
+      Left(14000002L, (1, 2L, "Hello")),
+      Left(14000002L, (1, 3L, "Hello world")),
+      Left(14000003L, (2, 2L, "Hello world")),
+      Left(14000003L, (2, 3L, "Hello world")),
+      Right(14000020L),
+      Left(14000021L, (1, 4L, "Hello world")),
+      Left(14000022L, (1, 5L, "Hello world")),
+      Left(14000022L, (1, 6L, "Hello world")),
+      Left(14000022L, (1, 7L, "Hello world")),
+      Left(14000023L, (2, 4L, "Hello world")),
+      Left(14000023L, (2, 5L, "Hello world")),
+      Right(14000030L)
+    )
+
+    val source = failingDataSource(data)
+    val table = source.transform("TimeAssigner", new EventTimeProcessOperator[(Int, Long, String)])
+      .setParallelism(source.parallelism)
+      .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
+
+    val windowedTable = table
+      .window(Over partitionBy 'c orderBy 'rowtime preceding UNBOUNDED_RANGE as 'w)
+      .select(
+        'c,
+        'b.count.distinct over 'w,
+        'b.sum.distinct over 'w,
+        ('b.cast(DataTypes.FLOAT) as 'b).avg.distinct over 'w
+      )
+
+    val sink = new TestingAppendSink
+    windowedTable.toAppendStream[Row].addSink(sink)
+    env.execute()
+    val expected = Seq(
+      "Hello,1,1,1.0",
+      "Hello,1,1,1.0",
+      "Hello,2,3,1.5",
+      "Hello world,1,3,3.0",
+      "Hello world,2,5,2.5",
+      "Hello world,2,5,2.5",
+      "Hi,1,1,1.0",
+      "Hello world,3,9,3.0",
+      "Hello world,6,27,4.5",
+      "Hello world,6,27,4.5",
+      "Hello world,6,27,4.5",
+      "Hello world,6,27,4.5",
+      "Hello world,6,27,4.5"
+    )
+    assertEquals(expected.sorted, sink.getAppendResults.sorted)
+  }
+
+  @Test
+  def testRowTimeBoundedDistinctPartitionedRowsOver(): Unit = {
+    val data: Seq[Either[(Long, (Int, Long, String)), Long]] = Seq(
+      Left(14000005L, (1, 1L, "Hi")),
+      Left(14000000L, (2, 1L, "Hello")),
+      Left(14000001L, (1, 1L, "Hello")),
+      Left(14000002L, (1, 2L, "Hello")),
+      Left(14000002L, (1, 3L, "Hello world")),
+      Left(14000003L, (2, 2L, "Hello world")),
+      Left(14000003L, (2, 3L, "Hello world")),
+      Right(14000020L),
+      Left(14000021L, (1, 4L, "Hello world")),
+      Left(14000022L, (1, 5L, "Hello world")),
+      Left(14000022L, (1, 6L, "Hello world")),
+      Left(14000022L, (1, 7L, "Hello world")),
+      Left(14000023L, (2, 4L, "Hello world")),
+      Left(14000023L, (2, 5L, "Hello world")),
+      Right(14000030L)
+    )
+
+    val source = failingDataSource(data)
+    val table = source.transform("TimeAssigner", new EventTimeProcessOperator[(Int, Long, String)])
+      .setParallelism(source.parallelism)
+      .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
+
+    val windowedTable = table
+      .window(Over partitionBy 'c orderBy 'rowtime preceding 2.rows following CURRENT_ROW as 'w)
+      .select(
+        'c,
+        'b.count.distinct over 'w,
+        'b.sum.distinct over 'w,
+        ('b.cast(DataTypes.FLOAT) as 'b).avg.distinct over 'w)
+
+    val sink = new TestingAppendSink
+    windowedTable.toAppendStream[Row].addSink(sink)
+    env.execute()
+    val expected = Seq(
+      "Hello,1,1,1.0",
+      "Hello,1,1,1.0",
+      "Hello,2,3,1.5",
+      "Hello world,1,3,3.0",
+      "Hello world,2,5,2.5",
+      "Hello world,2,5,2.5",
+      "Hi,1,1,1.0",
+      "Hello world,3,9,3.0",
+      "Hello world,3,12,4.0",
+      "Hello world,3,15,5.0",
+      "Hello world,3,16,5.3333335",
+      "Hello world,3,17,5.6666665",
+      "Hello world,3,18,6.0"
+    )
+    assertEquals(expected.sorted, sink.getAppendResults.sorted)
+  }
+
+  @Test
+  def testRowTimeUnBoundedDistinctPartitionedRowsOver(): Unit = {
+    val data: Seq[Either[(Long, (Int, Long, String)), Long]] = Seq(
+      Left(14000005L, (1, 1L, "Hi")),
+      Left(14000000L, (2, 1L, "Hello")),
+      Left(14000001L, (1, 1L, "Hello")),
+      Left(14000002L, (1, 2L, "Hello")),
+      Left(14000002L, (1, 3L, "Hello world")),
+      Left(14000003L, (2, 2L, "Hello world")),
+      Left(14000003L, (2, 3L, "Hello world")),
+      Right(14000020L),
+      Left(14000021L, (1, 4L, "Hello world")),
+      Left(14000022L, (1, 5L, "Hello world")),
+      Left(14000022L, (1, 6L, "Hello world")),
+      Left(14000022L, (1, 7L, "Hello world")),
+      Left(14000023L, (2, 4L, "Hello world")),
+      Left(14000023L, (2, 5L, "Hello world")),
+      Right(14000030L)
+    )
+
+    val source = failingDataSource(data)
+    val table = source.transform("TimeAssigner", new EventTimeProcessOperator[(Int, Long, String)])
+      .setParallelism(source.parallelism)
+      .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
+
+    val windowedTable = table
+      .window(Over partitionBy 'c orderBy 'rowtime preceding UNBOUNDED_ROW following
+        CURRENT_ROW as 'w)
+      .select(
+        'c,
+        'b.count.distinct over 'w,
+        'b.sum.distinct over 'w,
+        ('b.cast(DataTypes.FLOAT) as 'b).avg.distinct over 'w)
+
+    val sink = new TestingAppendSink
+    windowedTable.toAppendStream[Row].addSink(sink)
+    env.execute()
+    val expected = Seq(
+      "Hello,1,1,1.0",
+      "Hello,1,1,1.0",
+      "Hello,2,3,1.5",
+      "Hello world,1,3,3.0",
+      "Hello world,2,5,2.5",
+      "Hello world,2,5,2.5",
+      "Hi,1,1,1.0",
+      "Hello world,3,9,3.0",
+      "Hello world,4,14,3.5",
+      "Hello world,5,20,4.0",
+      "Hello world,6,27,4.5",
+      "Hello world,6,27,4.5",
+      "Hello world,6,27,4.5"
+    )
+    assertEquals(expected.sorted, sink.getAppendResults.sorted)
+  }
+
+
+  @Test
   def testProcTimeBoundedPartitionedRowsOver(): Unit = {
     val data = List(
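
For illustration, the Table API usage this commit enables boils down to the
following pattern, distilled from the tests above. This is a minimal sketch,
not part of the commit: the "Orders" table and its fields 'user, 'amount, and
the event-time attribute 'rowtime are assumed names, and the table is assumed
to be registered elsewhere.

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.table.api._
    import org.apache.flink.table.api.bridge.scala._

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = StreamTableEnvironment.create(env)
    // Hypothetical table with fields 'user, 'amount and rowtime attribute 'rowtime.
    val orders = tEnv.from("Orders")

    val result = orders
      // OVER window: partition by user, order by event time, spanning all rows
      // from the start of the partition up to the current row.
      .window(Over partitionBy 'user orderBy 'rowtime preceding UNBOUNDED_RANGE as 'w)
      .select(
        'user,
        // Distinct aggregates over the window, previously unsupported in the
        // Table API. OverConvertRule now unwraps the DISTINCT call's child and
        // sets the distinct flag on the generated window aggregate, so these
        // translate to COUNT(DISTINCT ...) and SUM(DISTINCT ...).
        'amount.count.distinct over 'w,
        'amount.sum.distinct over 'w)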