This is an automated email from the ASF dual-hosted git repository. kurt pushed a commit to branch release-1.10 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.10 by this push: new ce7968e [FLINK-15466][table-planner-blink] Fix the wrong plan when use distinct aggregations with filter ce7968e is described below commit ce7968ecafcde69596897a2243235161cd347b13 Author: Shuo Cheng <mf1533...@smail.nju.edu.cn> AuthorDate: Thu Jan 9 17:04:24 2020 +0800 [FLINK-15466][table-planner-blink] Fix the wrong plan when use distinct aggregations with filter This closes #10760 (cherry picked from commit 6f2e9abffb0b1ef68e4f2cf058a24524b61e88a1) --- ...FlinkAggregateExpandDistinctAggregatesRule.java | 26 +- .../plan/batch/sql/agg/DistinctAggregateTest.xml | 561 ++++++++++++++++++--- ...nkAggregateExpandDistinctAggregatesRuleTest.xml | 195 +++++-- .../plan/batch/sql/agg/DistinctAggregateTest.scala | 64 +-- .../DistinctAggregateTestBase.scala} | 81 ++- ...AggregateExpandDistinctAggregatesRuleTest.scala | 143 +----- .../sql/agg/DistinctAggregateITCaseBase.scala | 59 ++- 7 files changed, 775 insertions(+), 354 deletions(-) diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkAggregateExpandDistinctAggregatesRule.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkAggregateExpandDistinctAggregatesRule.java index 8023100..4b44eec 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkAggregateExpandDistinctAggregatesRule.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkAggregateExpandDistinctAggregatesRule.java @@ -417,14 +417,17 @@ public final class FlinkAggregateExpandDistinctAggregatesRule extends RelOptRule Aggregate aggregate) { final Set<ImmutableBitSet> groupSetTreeSet = new TreeSet<>(ImmutableBitSet.ORDERING); + final Map<ImmutableBitSet, Integer> groupSetToDistinctAggCallFilterArg = new HashMap<>(); for (AggregateCall aggCall : aggregate.getAggCallList()) { if (!aggCall.isDistinct()) { groupSetTreeSet.add(aggregate.getGroupSet()); } else { - groupSetTreeSet.add( + ImmutableBitSet groupSet = ImmutableBitSet.of(aggCall.getArgList()) .setIf(aggCall.filterArg, aggCall.filterArg >= 0) - .union(aggregate.getGroupSet())); + .union(aggregate.getGroupSet()); + groupSetToDistinctAggCallFilterArg.put(groupSet, aggCall.filterArg); + groupSetTreeSet.add(groupSet); } } @@ -471,10 +474,21 @@ public final class FlinkAggregateExpandDistinctAggregatesRule extends RelOptRule final RexNode nodeZ = nodes.remove(nodes.size() - 1); for (Map.Entry<ImmutableBitSet, Integer> entry : filters.entrySet()) { final long v = groupValue(fullGroupSet, entry.getKey()); - nodes.add( - relBuilder.alias( - relBuilder.equals(nodeZ, relBuilder.literal(v)), - "$g_" + v)); + // Get and remap the filterArg of the distinct aggregate call. + int distinctAggCallFilterArg = remap(fullGroupSet, + groupSetToDistinctAggCallFilterArg.getOrDefault(entry.getKey(), -1)); + RexNode expr; + if (distinctAggCallFilterArg < 0) { + expr = relBuilder.equals(nodeZ, relBuilder.literal(v)); + } else { + RexBuilder rexBuilder = aggregate.getCluster().getRexBuilder(); + // merge the filter of the distinct aggregate call itself. + expr = relBuilder.and( + relBuilder.equals(nodeZ, relBuilder.literal(v)), + rexBuilder.makeCall(SqlStdOperatorTable.IS_TRUE, + relBuilder.field(distinctAggCallFilterArg))); + } + nodes.add(relBuilder.alias(expr, "$g_" + v)); } relBuilder.project(nodes); } diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/DistinctAggregateTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/DistinctAggregateTest.xml index e93942a..577f89a 100644 --- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/DistinctAggregateTest.xml +++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/DistinctAggregateTest.xml @@ -16,64 +16,233 @@ See the License for the specific language governing permissions and limitations under the License. --> <Root> - <TestCase name="testMultiDistinctAggregateOnDifferentColumn"> + <TestCase name="testDistinctAggWithDuplicateField"> <Resource name="sql"> - <![CDATA[SELECT COUNT(DISTINCT a), SUM(DISTINCT b) FROM MyTable]]> + <![CDATA[SELECT a, COUNT(a), SUM(b), SUM(DISTINCT b) FROM MyTable GROUP BY a]]> </Resource> <Resource name="planBefore"> <![CDATA[ -LogicalAggregate(group=[{}], EXPR$0=[COUNT(DISTINCT $0)], EXPR$1=[SUM(DISTINCT $1)]) +LogicalAggregate(group=[{0}], EXPR$1=[COUNT($0)], EXPR$2=[SUM($1)], EXPR$3=[SUM(DISTINCT $1)]) +- LogicalProject(a=[$0], b=[$1]) +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]]) ]]> </Resource> <Resource name="planAfter"> <![CDATA[ -SortAggregate(isMerge=[true], select=[Final_COUNT(count$0) AS EXPR$0, Final_SUM(sum$1) AS EXPR$1]) +Calc(select=[a, CAST(EXPR$1) AS EXPR$1, EXPR$2, EXPR$3]) ++- HashAggregate(isMerge=[true], groupBy=[a], select=[a, Final_MIN(min$0) AS EXPR$1, Final_MIN(min$1) AS EXPR$2, Final_SUM(sum$2) AS EXPR$3]) + +- Exchange(distribution=[hash[a]]) + +- LocalHashAggregate(groupBy=[a], select=[a, Partial_MIN(EXPR$1) FILTER $g_1 AS min$0, Partial_MIN(EXPR$2) FILTER $g_1 AS min$1, Partial_SUM(b) FILTER $g_0 AS sum$2]) + +- Calc(select=[a, b, EXPR$1, EXPR$2, =(CASE(=($e, 0:BIGINT), 0:BIGINT, 1:BIGINT), 0) AS $g_0, =(CASE(=($e, 0:BIGINT), 0:BIGINT, 1:BIGINT), 1) AS $g_1]) + +- HashAggregate(isMerge=[true], groupBy=[a, b, $e], select=[a, b, $e, Final_COUNT(count$0) AS EXPR$1, Final_SUM(sum$1) AS EXPR$2]) + +- Exchange(distribution=[hash[a, b, $e]]) + +- LocalHashAggregate(groupBy=[a, b, $e], select=[a, b, $e, Partial_COUNT(a) AS count$0, Partial_SUM(b_0) AS sum$1]) + +- Expand(projects=[{a=[$0], b=[$1], $e=[0], b_0=[$1]}, {a=[$0], b=[null], $e=[1], b_0=[$1]}], projects=[{a, b, 0 AS $e, b AS b_0}, {a, null AS b, 1 AS $e, b AS b_0}]) + +- Calc(select=[a, b]) + +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) +]]> + </Resource> + </TestCase> + <TestCase name="testMultiDifferentDistinctAggWithNonDistinctAggOnDifferentColumnAndGroupBy"> + <Resource name="sql"> + <![CDATA[SELECT SUM(DISTINCT a), COUNT(DISTINCT c) FROM MyTable GROUP BY b]]> + </Resource> + <Resource name="planBefore"> + <![CDATA[ +LogicalProject(EXPR$0=[$1], EXPR$1=[$2]) ++- LogicalAggregate(group=[{0}], EXPR$0=[SUM(DISTINCT $1)], EXPR$1=[COUNT(DISTINCT $2)]) + +- LogicalProject(b=[$1], a=[$0], c=[$2]) + +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]]) +]]> + </Resource> + <Resource name="planAfter"> + <![CDATA[ +Calc(select=[EXPR$0, EXPR$1]) ++- HashAggregate(isMerge=[true], groupBy=[b], select=[b, Final_SUM(sum$0) AS EXPR$0, Final_COUNT(count$1) AS EXPR$1]) + +- Exchange(distribution=[hash[b]]) + +- LocalHashAggregate(groupBy=[b], select=[b, Partial_SUM(a) FILTER $g_1 AS sum$0, Partial_COUNT(c) FILTER $g_2 AS count$1]) + +- Calc(select=[b, a, c, =(CASE(=($e, 1:BIGINT), 1:BIGINT, 2:BIGINT), 1) AS $g_1, =(CASE(=($e, 1:BIGINT), 1:BIGINT, 2:BIGINT), 2) AS $g_2]) + +- HashAggregate(isMerge=[true], groupBy=[a, b, c, $e], select=[a, b, c, $e]) + +- Exchange(distribution=[hash[a, b, c, $e]]) + +- LocalHashAggregate(groupBy=[a, b, c, $e], select=[a, b, c, $e]) + +- Expand(projects=[{a=[$0], b=[$1], c=[null], $e=[1]}, {a=[null], b=[$1], c=[$2], $e=[4]}], projects=[{a, b, null AS c, 1 AS $e}, {null AS a, b, c, 4 AS $e}]) + +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) +]]> + </Resource> + </TestCase> + <TestCase name="testMultiDifferentDistinctAggWithNonDistinctAggOnSameColumn"> + <Resource name="sql"> + <![CDATA[SELECT COUNT(DISTINCT a), SUM(DISTINCT b), MAX(a), MIN(a) FROM MyTable]]> + </Resource> + <Resource name="planBefore"> + <![CDATA[ +LogicalAggregate(group=[{}], EXPR$0=[COUNT(DISTINCT $0)], EXPR$1=[SUM(DISTINCT $1)], EXPR$2=[MAX($0)], EXPR$3=[MIN($0)]) ++- LogicalProject(a=[$0], b=[$1]) + +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]]) +]]> + </Resource> + <Resource name="planAfter"> + <![CDATA[ +SortAggregate(isMerge=[true], select=[Final_COUNT(count$0) AS EXPR$0, Final_SUM(sum$1) AS EXPR$1, Final_MIN(min$2) AS EXPR$2, Final_MIN(min$3) AS EXPR$3]) +- Exchange(distribution=[single]) - +- LocalSortAggregate(select=[Partial_COUNT(a) FILTER $g_1 AS count$0, Partial_SUM(b) FILTER $g_2 AS sum$1]) - +- Calc(select=[a, b, =(CASE(=($e, 1:BIGINT), 1:BIGINT, 2:BIGINT), 1) AS $g_1, =(CASE(=($e, 1:BIGINT), 1:BIGINT, 2:BIGINT), 2) AS $g_2]) - +- HashAggregate(isMerge=[true], groupBy=[a, b, $e], select=[a, b, $e]) + +- LocalSortAggregate(select=[Partial_COUNT(a) FILTER $g_1 AS count$0, Partial_SUM(b) FILTER $g_2 AS sum$1, Partial_MIN(EXPR$2) FILTER $g_3 AS min$2, Partial_MIN(EXPR$3) FILTER $g_3 AS min$3]) + +- Calc(select=[a, b, EXPR$2, EXPR$3, =(CASE(=($e, 1:BIGINT), 1:BIGINT, =($e, 2:BIGINT), 2:BIGINT, 3:BIGINT), 1) AS $g_1, =(CASE(=($e, 1:BIGINT), 1:BIGINT, =($e, 2:BIGINT), 2:BIGINT, 3:BIGINT), 2) AS $g_2, =(CASE(=($e, 1:BIGINT), 1:BIGINT, =($e, 2:BIGINT), 2:BIGINT, 3:BIGINT), 3) AS $g_3]) + +- HashAggregate(isMerge=[true], groupBy=[a, b, $e], select=[a, b, $e, Final_MAX(max$0) AS EXPR$2, Final_MIN(min$1) AS EXPR$3]) +- Exchange(distribution=[hash[a, b, $e]]) - +- LocalHashAggregate(groupBy=[a, b, $e], select=[a, b, $e]) - +- Expand(projects=[{a=[$0], b=[null], $e=[1]}, {a=[null], b=[$1], $e=[2]}], projects=[{a, null AS b, 1 AS $e}, {null AS a, b, 2 AS $e}]) + +- LocalHashAggregate(groupBy=[a, b, $e], select=[a, b, $e, Partial_MAX(a_0) AS max$0, Partial_MIN(a_0) AS min$1]) + +- Expand(projects=[{a=[$0], b=[null], $e=[1], a_0=[$0]}, {a=[null], b=[$1], $e=[2], a_0=[$0]}, {a=[null], b=[null], $e=[3], a_0=[$0]}], projects=[{a, null AS b, 1 AS $e, a AS a_0}, {null AS a, b, 2 AS $e, a AS a_0}, {null AS a, null AS b, 3 AS $e, a AS a_0}]) +- Calc(select=[a, b]) +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) ]]> </Resource> </TestCase> - <TestCase name="testMultiDistinctAggregateOnSameColumn"> + <TestCase name="testMultiDifferentDistinctAggWithNonDistinctAggOnSameColumnAndGroupBy"> <Resource name="sql"> - <![CDATA[SELECT COUNT(DISTINCT a), SUM(DISTINCT a), MAX(DISTINCT a) FROM MyTable]]> + <![CDATA[SELECT COUNT(DISTINCT a), SUM(DISTINCT b), MAX(a), MIN(a) FROM MyTable GROUP BY c]]> </Resource> <Resource name="planBefore"> <![CDATA[ -LogicalAggregate(group=[{}], EXPR$0=[COUNT(DISTINCT $0)], EXPR$1=[SUM(DISTINCT $0)], EXPR$2=[MAX($0)]) -+- LogicalProject(a=[$0]) +LogicalProject(EXPR$0=[$1], EXPR$1=[$2], EXPR$2=[$3], EXPR$3=[$4]) ++- LogicalAggregate(group=[{0}], EXPR$0=[COUNT(DISTINCT $1)], EXPR$1=[SUM(DISTINCT $2)], EXPR$2=[MAX($1)], EXPR$3=[MIN($1)]) + +- LogicalProject(c=[$2], a=[$0], b=[$1]) + +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]]) +]]> + </Resource> + <Resource name="planAfter"> + <![CDATA[ +Calc(select=[EXPR$0, EXPR$1, EXPR$2, EXPR$3]) ++- HashAggregate(isMerge=[true], groupBy=[c], select=[c, Final_COUNT(count$0) AS EXPR$0, Final_SUM(sum$1) AS EXPR$1, Final_MIN(min$2) AS EXPR$2, Final_MIN(min$3) AS EXPR$3]) + +- Exchange(distribution=[hash[c]]) + +- LocalHashAggregate(groupBy=[c], select=[c, Partial_COUNT(a) FILTER $g_1 AS count$0, Partial_SUM(b) FILTER $g_2 AS sum$1, Partial_MIN(EXPR$2) FILTER $g_3 AS min$2, Partial_MIN(EXPR$3) FILTER $g_3 AS min$3]) + +- Calc(select=[c, a, b, EXPR$2, EXPR$3, =(CASE(=($e, 2:BIGINT), 1:BIGINT, =($e, 4:BIGINT), 2:BIGINT, 3:BIGINT), 1) AS $g_1, =(CASE(=($e, 2:BIGINT), 1:BIGINT, =($e, 4:BIGINT), 2:BIGINT, 3:BIGINT), 2) AS $g_2, =(CASE(=($e, 2:BIGINT), 1:BIGINT, =($e, 4:BIGINT), 2:BIGINT, 3:BIGINT), 3) AS $g_3]) + +- HashAggregate(isMerge=[true], groupBy=[a, b, c, $e], select=[a, b, c, $e, Final_MAX(max$0) AS EXPR$2, Final_MIN(min$1) AS EXPR$3]) + +- Exchange(distribution=[hash[a, b, c, $e]]) + +- LocalHashAggregate(groupBy=[a, b, c, $e], select=[a, b, c, $e, Partial_MAX(a_0) AS max$0, Partial_MIN(a_0) AS min$1]) + +- Expand(projects=[{a=[$0], b=[null], c=[$2], $e=[2], a_0=[$0]}, {a=[null], b=[$1], c=[$2], $e=[4], a_0=[$0]}, {a=[null], b=[null], c=[$2], $e=[6], a_0=[$0]}], projects=[{a, null AS b, c, 2 AS $e, a AS a_0}, {null AS a, b, c, 4 AS $e, a AS a_0}, {null AS a, null AS b, c, 6 AS $e, a AS a_0}]) + +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) +]]> + </Resource> + </TestCase> + <TestCase name="testMultiDistinctAggOnDifferentColumn"> + <Resource name="sql"> + <![CDATA[SELECT COUNT(DISTINCT a), SUM(DISTINCT b), +COUNT(DISTINCT c) FILTER (WHERE a > 5) FROM MyTable]]> + </Resource> + <Resource name="planBefore"> + <![CDATA[ +LogicalAggregate(group=[{}], EXPR$0=[COUNT(DISTINCT $0)], EXPR$1=[SUM(DISTINCT $1)], EXPR$2=[COUNT(DISTINCT $2) FILTER $3]) ++- LogicalProject(a=[$0], b=[$1], c=[$2], $f3=[IS TRUE(>($0, 5))]) +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]]) ]]> </Resource> <Resource name="planAfter"> <![CDATA[ -SortAggregate(isMerge=[true], select=[Final_COUNT(count$0) AS EXPR$0, Final_SUM(sum$1) AS EXPR$1, Final_MAX(max$2) AS EXPR$2]) +SortAggregate(isMerge=[true], select=[Final_COUNT(count$0) AS EXPR$0, Final_SUM(sum$1) AS EXPR$1, Final_COUNT(count$2) AS EXPR$2]) +- Exchange(distribution=[single]) - +- LocalSortAggregate(select=[Partial_COUNT(a) AS count$0, Partial_SUM(a) AS sum$1, Partial_MAX(a) AS max$2]) - +- HashAggregate(isMerge=[true], groupBy=[a], select=[a]) - +- Exchange(distribution=[hash[a]]) - +- LocalHashAggregate(groupBy=[a], select=[a]) - +- Calc(select=[a]) - +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) + +- LocalSortAggregate(select=[Partial_COUNT(a) FILTER $g_7 AS count$0, Partial_SUM(b) FILTER $g_11 AS sum$1, Partial_COUNT(c) FILTER $g_12 AS count$2]) + +- Calc(select=[a, b, c, CAST($f3) AS $f3, =(CASE(=($e, 7:BIGINT), 7:BIGINT, =($e, 11:BIGINT), 11:BIGINT, 12:BIGINT), 7) AS $g_7, =(CASE(=($e, 7:BIGINT), 7:BIGINT, =($e, 11:BIGINT), 11:BIGINT, 12:BIGINT), 11) AS $g_11, AND(=(CASE(=($e, 7:BIGINT), 7:BIGINT, =($e, 11:BIGINT), 11:BIGINT, 12:BIGINT), 12), IS TRUE(CAST($f3))) AS $g_12]) + +- HashAggregate(isMerge=[true], groupBy=[a, b, c, $f3, $e], select=[a, b, c, $f3, $e]) + +- Exchange(distribution=[hash[a, b, c, $f3, $e]]) + +- LocalHashAggregate(groupBy=[a, b, c, $f3, $e], select=[a, b, c, $f3, $e]) + +- Expand(projects=[{a=[$0], b=[null], c=[null], $f3=[null], $e=[7]}, {a=[null], b=[$1], c=[null], $f3=[null], $e=[11]}, {a=[null], b=[null], c=[$2], $f3=[$3], $e=[12]}], projects=[{a, null AS b, null AS c, null AS $f3, 7 AS $e}, {null AS a, b, null AS c, null AS $f3, 11 AS $e}, {null AS a, null AS b, c, $f3, 12 AS $e}]) + +- Calc(select=[a, b, c, IS TRUE(>(a, 5)) AS $f3]) + +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) ]]> </Resource> </TestCase> - <TestCase name="testMultiDistinctAndNonDistinctAggregateOnDifferentColumn"> + <TestCase name="testMultiDistinctAggOnDifferentColumnWithGroupingSets"> <Resource name="sql"> - <![CDATA[SELECT COUNT(DISTINCT a), SUM(DISTINCT b), COUNT(c) FROM MyTable]]> + <![CDATA[SELECT COUNT(DISTINCT a), SUM(DISTINCT b) FROM MyTable2 GROUP BY GROUPING SETS (c, d)]]> </Resource> <Resource name="planBefore"> <![CDATA[ -LogicalAggregate(group=[{}], EXPR$0=[COUNT(DISTINCT $0)], EXPR$1=[SUM(DISTINCT $1)], EXPR$2=[COUNT($2)]) -+- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]]) +LogicalProject(EXPR$0=[$2], EXPR$1=[$3]) ++- LogicalAggregate(group=[{0, 1}], groups=[[{0}, {1}]], EXPR$0=[COUNT(DISTINCT $2)], EXPR$1=[SUM(DISTINCT $3)]) + +- LogicalProject(c=[$2], d=[$3], a=[$0], b=[$1]) + +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(a, b, c, d, e)]]]) +]]> + </Resource> + <Resource name="planAfter"> + <![CDATA[ +Calc(select=[EXPR$0, EXPR$1]) ++- HashAggregate(isMerge=[true], groupBy=[c, d, $e], select=[c, d, $e, Final_COUNT(count$0) AS EXPR$0, Final_SUM(sum$1) AS EXPR$1]) + +- Exchange(distribution=[hash[c, d, $e]]) + +- LocalHashAggregate(groupBy=[c, d, $e], select=[c, d, $e, Partial_COUNT(a) FILTER $g_2 AS count$0, Partial_SUM(b) FILTER $g_4 AS sum$1]) + +- Calc(select=[c, d, a, b, $e, =(CASE(=($e_0, 2:BIGINT), 2:BIGINT, 4:BIGINT), 2) AS $g_2, =(CASE(=($e_0, 2:BIGINT), 2:BIGINT, 4:BIGINT), 4) AS $g_4]) + +- HashAggregate(isMerge=[true], groupBy=[c, d, a, b, $e, $e_0], select=[c, d, a, b, $e, $e_0]) + +- Exchange(distribution=[hash[c, d, a, b, $e, $e_0]]) + +- LocalHashAggregate(groupBy=[c, d, a, b, $e, $e_0], select=[c, d, a, b, $e, $e_0]) + +- Expand(projects=[{c=[$0], d=[$1], a=[$2], b=[null], $e=[$4], $e_0=[2]}, {c=[$0], d=[$1], a=[null], b=[$3], $e=[$4], $e_0=[4]}], projects=[{c, d, a, null AS b, $e, 2 AS $e_0}, {c, d, null AS a, b, $e, 4 AS $e_0}]) + +- Expand(projects=[{c=[$0], d=[null], a=[$2], b=[$3], $e=[1]}, {c=[null], d=[$1], a=[$2], b=[$3], $e=[2]}], projects=[{c, null AS d, a, b, 1 AS $e}, {null AS c, d, a, b, 2 AS $e}]) + +- Calc(select=[c, d, a, b]) + +- TableSourceScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(a, b, c, d, e)]]], fields=[a, b, c, d, e]) +]]> + </Resource> + </TestCase> + <TestCase name="testMultiDistinctAggOnSameColumn"> + <Resource name="sql"> + <![CDATA[SELECT COUNT(DISTINCT a), SUM(DISTINCT a) FILTER (WHERE b > 0), +MAX(DISTINCT a) FROM MyTable]]> + </Resource> + <Resource name="planBefore"> + <![CDATA[ +LogicalAggregate(group=[{}], EXPR$0=[COUNT(DISTINCT $0)], EXPR$1=[SUM(DISTINCT $0) FILTER $1], EXPR$2=[MAX($0)]) ++- LogicalProject(a=[$0], $f1=[IS TRUE(>($1, 0))]) + +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]]) +]]> + </Resource> + <Resource name="planAfter"> + <![CDATA[ +SortAggregate(isMerge=[true], select=[Final_COUNT(count$0) AS EXPR$0, Final_SUM(sum$1) AS EXPR$1, Final_MIN(min$2) AS EXPR$2]) ++- Exchange(distribution=[single]) + +- LocalSortAggregate(select=[Partial_COUNT(a) FILTER $g_1 AS count$0, Partial_SUM(a) FILTER $g_0 AS sum$1, Partial_MIN(EXPR$2) FILTER $g_3 AS min$2]) + +- Calc(select=[a, CAST($f1) AS $f1, EXPR$2, AND(=(CASE(=($e, 0:BIGINT), 0:BIGINT, =($e, 1:BIGINT), 1:BIGINT, 3:BIGINT), 0), IS TRUE(CAST($f1))) AS $g_0, =(CASE(=($e, 0:BIGINT), 0:BIGINT, =($e, 1:BIGINT), 1:BIGINT, 3:BIGINT), 1) AS $g_1, =(CASE(=($e, 0:BIGINT), 0:BIGINT, =($e, 1:BIGINT), 1:BIGINT, 3:BIGINT), 3) AS $g_3]) + +- HashAggregate(isMerge=[true], groupBy=[a, $f1, $e], select=[a, $f1, $e, Final_MAX(max$0) AS EXPR$2]) + +- Exchange(distribution=[hash[a, $f1, $e]]) + +- LocalHashAggregate(groupBy=[a, $f1, $e], select=[a, $f1, $e, Partial_MAX(a_0) AS max$0]) + +- Expand(projects=[{a=[$0], $f1=[$1], $e=[0], a_0=[$0]}, {a=[$0], $f1=[null], $e=[1], a_0=[$0]}, {a=[null], $f1=[null], $e=[3], a_0=[$0]}], projects=[{a, $f1, 0 AS $e, a AS a_0}, {a, null AS $f1, 1 AS $e, a AS a_0}, {null AS a, null AS $f1, 3 AS $e, a AS a_0}]) + +- Calc(select=[a, IS TRUE(>(b, 0)) AS $f1]) + +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) +]]> + </Resource> + </TestCase> + <TestCase name="testMultiDistinctAggOnSameColumnWithGroupingSets"> + <Resource name="sql"> + <![CDATA[SELECT COUNT(DISTINCT a), SUM(DISTINCT a), MAX(DISTINCT a) FROM MyTable2 GROUP BY GROUPING SETS (b, c)]]> + </Resource> + <Resource name="planBefore"> + <![CDATA[ +LogicalProject(EXPR$0=[$2], EXPR$1=[$3], EXPR$2=[$4]) ++- LogicalAggregate(group=[{0, 1}], groups=[[{0}, {1}]], EXPR$0=[COUNT(DISTINCT $2)], EXPR$1=[SUM(DISTINCT $2)], EXPR$2=[MAX($2)]) + +- LogicalProject(b=[$1], c=[$2], a=[$0]) + +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(a, b, c, d, e)]]]) +]]> + </Resource> + <Resource name="planAfter"> + <![CDATA[ +Calc(select=[EXPR$0, EXPR$1, EXPR$2]) ++- HashAggregate(isMerge=[true], groupBy=[b, c, $e], select=[b, c, $e, Final_COUNT(count$0) AS EXPR$0, Final_SUM(sum$1) AS EXPR$1, Final_MAX(max$2) AS EXPR$2]) + +- Exchange(distribution=[hash[b, c, $e]]) + +- LocalHashAggregate(groupBy=[b, c, $e], select=[b, c, $e, Partial_COUNT(a) AS count$0, Partial_SUM(a) AS sum$1, Partial_MAX(a) AS max$2]) + +- HashAggregate(isMerge=[true], groupBy=[b, c, a, $e], select=[b, c, a, $e]) + +- Exchange(distribution=[hash[b, c, a, $e]]) + +- LocalHashAggregate(groupBy=[b, c, a, $e], select=[b, c, a, $e]) + +- Expand(projects=[{b=[$0], c=[null], a=[$2], $e=[1]}, {b=[null], c=[$1], a=[$2], $e=[2]}], projects=[{b, null AS c, a, 1 AS $e}, {null AS b, c, a, 2 AS $e}]) + +- Calc(select=[b, c, a]) + +- TableSourceScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(a, b, c, d, e)]]], fields=[a, b, c, d, e]) +]]> + </Resource> + </TestCase> + <TestCase name="testMultiDistinctAndNonDistinctAggOnDifferentColumn"> + <Resource name="sql"> + <![CDATA[SELECT COUNT(DISTINCT a) FILTER (WHERE c > 0), +SUM(DISTINCT b), COUNT(c) FROM MyTable]]> + </Resource> + <Resource name="planBefore"> + <![CDATA[ +LogicalAggregate(group=[{}], EXPR$0=[COUNT(DISTINCT $0) FILTER $1], EXPR$1=[SUM(DISTINCT $2)], EXPR$2=[COUNT($3)]) ++- LogicalProject(a=[$0], $f1=[IS TRUE(>($2, 0))], b=[$1], c=[$2]) + +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]]) ]]> </Resource> <Resource name="planAfter"> @@ -81,17 +250,46 @@ LogicalAggregate(group=[{}], EXPR$0=[COUNT(DISTINCT $0)], EXPR$1=[SUM(DISTINCT $ Calc(select=[EXPR$0, EXPR$1, CAST(CASE(IS NOT NULL(EXPR$2), EXPR$2, 0)) AS EXPR$2]) +- HashAggregate(isMerge=[true], select=[Final_COUNT(count$0) AS EXPR$0, Final_SUM(sum$1) AS EXPR$1, Final_MIN(min$2) AS EXPR$2]) +- Exchange(distribution=[single]) - +- LocalHashAggregate(select=[Partial_COUNT(a) FILTER $g_1 AS count$0, Partial_SUM(b) FILTER $g_2 AS sum$1, Partial_MIN(EXPR$2) FILTER $g_3 AS min$2]) - +- Calc(select=[a, b, EXPR$2, =(CASE(=($e, 1:BIGINT), 1:BIGINT, =($e, 2:BIGINT), 2:BIGINT, 3:BIGINT), 1) AS $g_1, =(CASE(=($e, 1:BIGINT), 1:BIGINT, =($e, 2:BIGINT), 2:BIGINT, 3:BIGINT), 2) AS $g_2, =(CASE(=($e, 1:BIGINT), 1:BIGINT, =($e, 2:BIGINT), 2:BIGINT, 3:BIGINT), 3) AS $g_3]) - +- HashAggregate(isMerge=[true], groupBy=[a, b, $e], select=[a, b, $e, Final_COUNT(count$0) AS EXPR$2]) - +- Exchange(distribution=[hash[a, b, $e]]) - +- LocalHashAggregate(groupBy=[a, b, $e], select=[a, b, $e, Partial_COUNT(c) AS count$0]) - +- Expand(projects=[{a=[$0], b=[null], c=[$2], $e=[1]}, {a=[null], b=[$1], c=[$2], $e=[2]}, {a=[null], b=[null], c=[$2], $e=[3]}], projects=[{a, null AS b, c, 1 AS $e}, {null AS a, b, c, 2 AS $e}, {null AS a, null AS b, c, 3 AS $e}]) - +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) + +- LocalHashAggregate(select=[Partial_COUNT(a) FILTER $g_1 AS count$0, Partial_SUM(b) FILTER $g_6 AS sum$1, Partial_MIN(EXPR$2) FILTER $g_7 AS min$2]) + +- Calc(select=[a, CAST($f1) AS $f1, b, EXPR$2, AND(=(CASE(=($e, 1:BIGINT), 1:BIGINT, =($e, 6:BIGINT), 6:BIGINT, 7:BIGINT), 1), IS TRUE(CAST($f1))) AS $g_1, =(CASE(=($e, 1:BIGINT), 1:BIGINT, =($e, 6:BIGINT), 6:BIGINT, 7:BIGINT), 6) AS $g_6, =(CASE(=($e, 1:BIGINT), 1:BIGINT, =($e, 6:BIGINT), 6:BIGINT, 7:BIGINT), 7) AS $g_7]) + +- HashAggregate(isMerge=[true], groupBy=[a, $f1, b, $e], select=[a, $f1, b, $e, Final_COUNT(count$0) AS EXPR$2]) + +- Exchange(distribution=[hash[a, $f1, b, $e]]) + +- LocalHashAggregate(groupBy=[a, $f1, b, $e], select=[a, $f1, b, $e, Partial_COUNT(c) AS count$0]) + +- Expand(projects=[{a=[$0], $f1=[$1], b=[null], c=[$3], $e=[1]}, {a=[null], $f1=[null], b=[$2], c=[$3], $e=[6]}, {a=[null], $f1=[null], b=[null], c=[$3], $e=[7]}], projects=[{a, $f1, null AS b, c, 1 AS $e}, {null AS a, null AS $f1, b, c, 6 AS $e}, {null AS a, null AS $f1, null AS b, c, 7 AS $e}]) + +- Calc(select=[a, IS TRUE(>(c, 0)) AS $f1, b, c]) + +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) ]]> </Resource> </TestCase> - <TestCase name="testSingleDistinctAggregate"> + <TestCase name="testMultiDistinctAndNonDistinctAggOnDifferentColumnWithGroupingSets"> + <Resource name="sql"> + <![CDATA[SELECT COUNT(DISTINCT a), SUM(DISTINCT b), COUNT(c) FROM MyTable2 GROUP BY GROUPING SETS (d, e)]]> + </Resource> + <Resource name="planBefore"> + <![CDATA[ +LogicalProject(EXPR$0=[$2], EXPR$1=[$3], EXPR$2=[$4]) ++- LogicalAggregate(group=[{0, 1}], groups=[[{0}, {1}]], EXPR$0=[COUNT(DISTINCT $2)], EXPR$1=[SUM(DISTINCT $3)], EXPR$2=[COUNT($4)]) + +- LogicalProject(d=[$3], e=[$4], a=[$0], b=[$1], c=[$2]) + +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(a, b, c, d, e)]]]) +]]> + </Resource> + <Resource name="planAfter"> + <![CDATA[ +Calc(select=[EXPR$0, EXPR$1, CAST(EXPR$2) AS EXPR$2]) ++- HashAggregate(isMerge=[true], groupBy=[d, e, $e], select=[d, e, $e, Final_COUNT(count$0) AS EXPR$0, Final_SUM(sum$1) AS EXPR$1, Final_MIN(min$2) AS EXPR$2]) + +- Exchange(distribution=[hash[d, e, $e]]) + +- LocalHashAggregate(groupBy=[d, e, $e], select=[d, e, $e, Partial_COUNT(a) FILTER $g_8 AS count$0, Partial_SUM(b) FILTER $g_16 AS sum$1, Partial_MIN(EXPR$2) FILTER $g_24 AS min$2]) + +- Calc(select=[a, b, d, e, $e, EXPR$2, =(CASE(=($e_0, 8:BIGINT), 8:BIGINT, =($e_0, 16:BIGINT), 16:BIGINT, 24:BIGINT), 8) AS $g_8, =(CASE(=($e_0, 8:BIGINT), 8:BIGINT, =($e_0, 16:BIGINT), 16:BIGINT, 24:BIGINT), 16) AS $g_16, =(CASE(=($e_0, 8:BIGINT), 8:BIGINT, =($e_0, 16:BIGINT), 16:BIGINT, 24:BIGINT), 24) AS $g_24]) + +- HashAggregate(isMerge=[true], groupBy=[a, b, d, e, $e, $e_0], select=[a, b, d, e, $e, $e_0, Final_COUNT(count$0) AS EXPR$2]) + +- Exchange(distribution=[hash[a, b, d, e, $e, $e_0]]) + +- LocalHashAggregate(groupBy=[a, b, d, e, $e, $e_0], select=[a, b, d, e, $e, $e_0, Partial_COUNT(c) AS count$0]) + +- Expand(projects=[{a=[$0], b=[null], c=[$2], d=[$3], e=[$4], $e=[$5], $e_0=[8]}, {a=[null], b=[$1], c=[$2], d=[$3], e=[$4], $e=[$5], $e_0=[16]}, {a=[null], b=[null], c=[$2], d=[$3], e=[$4], $e=[$5], $e_0=[24]}], projects=[{a, null AS b, c, d, e, $e, 8 AS $e_0}, {null AS a, b, c, d, e, $e, 16 AS $e_0}, {null AS a, null AS b, c, d, e, $e, 24 AS $e_0}]) + +- Expand(projects=[{a=[$0], b=[$1], c=[$2], d=[$3], e=[null], $e=[1]}, {a=[$0], b=[$1], c=[$2], d=[null], e=[$4], $e=[2]}], projects=[{a, b, c, d, null AS e, 1 AS $e}, {a, b, c, null AS d, e, 2 AS $e}]) + +- TableSourceScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(a, b, c, d, e)]]], fields=[a, b, c, d, e]) +]]> + </Resource> + </TestCase> + <TestCase name="testSingleDistinctAgg"> <Resource name="sql"> <![CDATA[SELECT COUNT(DISTINCT a) FROM MyTable]]> </Resource> @@ -115,14 +313,14 @@ SortAggregate(isMerge=[true], select=[Final_COUNT(count$0) AS EXPR$0]) ]]> </Resource> </TestCase> - <TestCase name="testSingleDistinctAggregateAndOneOrMultiNonDistinctAggregate"> + <TestCase name="testSingleDistinctAggAndOneOrMultiNonDistinctAgg1"> <Resource name="sql"> - <![CDATA[SELECT COUNT(DISTINCT a), SUM(b) FROM MyTable]]> + <![CDATA[SELECT COUNT(DISTINCT a) FILTER (WHERE a > 0), SUM(b) FROM MyTable]]> </Resource> <Resource name="planBefore"> <![CDATA[ -LogicalAggregate(group=[{}], EXPR$0=[COUNT(DISTINCT $0)], EXPR$1=[SUM($1)]) -+- LogicalProject(a=[$0], b=[$1]) +LogicalAggregate(group=[{}], EXPR$0=[COUNT(DISTINCT $0) FILTER $1], EXPR$1=[SUM($2)]) ++- LogicalProject(a=[$0], $f1=[IS TRUE(>($0, 0))], b=[$1]) +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]]) ]]> </Resource> @@ -130,70 +328,130 @@ LogicalAggregate(group=[{}], EXPR$0=[COUNT(DISTINCT $0)], EXPR$1=[SUM($1)]) <![CDATA[ SortAggregate(isMerge=[true], select=[Final_COUNT(count$0) AS EXPR$0, Final_MIN(min$1) AS EXPR$1]) +- Exchange(distribution=[single]) - +- LocalSortAggregate(select=[Partial_COUNT(a) FILTER $g_0 AS count$0, Partial_MIN(EXPR$1) FILTER $g_1 AS min$1]) - +- Calc(select=[a, EXPR$1, =(CASE(=($e, 0:BIGINT), 0:BIGINT, 1:BIGINT), 0) AS $g_0, =(CASE(=($e, 0:BIGINT), 0:BIGINT, 1:BIGINT), 1) AS $g_1]) - +- HashAggregate(isMerge=[true], groupBy=[a, $e], select=[a, $e, Final_SUM(sum$0) AS EXPR$1]) - +- Exchange(distribution=[hash[a, $e]]) - +- LocalHashAggregate(groupBy=[a, $e], select=[a, $e, Partial_SUM(b) AS sum$0]) - +- Expand(projects=[{a=[$0], b=[$1], $e=[0]}, {a=[null], b=[$1], $e=[1]}], projects=[{a, b, 0 AS $e}, {null AS a, b, 1 AS $e}]) - +- Calc(select=[a, b]) + +- LocalSortAggregate(select=[Partial_COUNT(a) FILTER $g_0 AS count$0, Partial_MIN(EXPR$1) FILTER $g_3 AS min$1]) + +- Calc(select=[a, CAST($f1) AS $f1, EXPR$1, AND(=(CASE(=($e, 0:BIGINT), 0:BIGINT, 3:BIGINT), 0), IS TRUE(CAST($f1))) AS $g_0, =(CASE(=($e, 0:BIGINT), 0:BIGINT, 3:BIGINT), 3) AS $g_3]) + +- HashAggregate(isMerge=[true], groupBy=[a, $f1, $e], select=[a, $f1, $e, Final_SUM(sum$0) AS EXPR$1]) + +- Exchange(distribution=[hash[a, $f1, $e]]) + +- LocalHashAggregate(groupBy=[a, $f1, $e], select=[a, $f1, $e, Partial_SUM(b) AS sum$0]) + +- Expand(projects=[{a=[$0], $f1=[$1], b=[$2], $e=[0]}, {a=[null], $f1=[null], b=[$2], $e=[3]}], projects=[{a, $f1, b, 0 AS $e}, {null AS a, null AS $f1, b, 3 AS $e}]) + +- Calc(select=[a, IS TRUE(>(a, 0)) AS $f1, b]) +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) ]]> </Resource> </TestCase> - <TestCase name="testTwoDifferentDistinctAggregateWithGroupingAndCountStar"> + <TestCase name="testSingleDistinctAggAndOneOrMultiNonDistinctAgg2"> <Resource name="sql"> - <![CDATA[SELECT a, COUNT(*), SUM(DISTINCT b), COUNT(DISTINCT c) FROM MyTable GROUP BY a]]> + <![CDATA[SELECT COUNT(a) filter (WHERE a > 0), SUM(DISTINCT b) FROM MyTable]]> </Resource> <Resource name="planBefore"> <![CDATA[ -LogicalAggregate(group=[{0}], EXPR$1=[COUNT()], EXPR$2=[SUM(DISTINCT $1)], EXPR$3=[COUNT(DISTINCT $2)]) -+- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]]) +LogicalAggregate(group=[{}], EXPR$0=[COUNT($0) FILTER $1], EXPR$1=[SUM(DISTINCT $2)]) ++- LogicalProject(a=[$0], $f1=[IS TRUE(>($0, 0))], b=[$1]) + +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]]) ]]> </Resource> <Resource name="planAfter"> <![CDATA[ -Calc(select=[a, CAST(EXPR$1) AS EXPR$1, EXPR$2, EXPR$3]) -+- HashAggregate(isMerge=[true], groupBy=[a], select=[a, Final_MIN(min$0) AS EXPR$1, Final_SUM(sum$1) AS EXPR$2, Final_COUNT(count$2) AS EXPR$3]) - +- Exchange(distribution=[hash[a]]) - +- LocalHashAggregate(groupBy=[a], select=[a, Partial_MIN(EXPR$1) FILTER $g_3 AS min$0, Partial_SUM(b) FILTER $g_1 AS sum$1, Partial_COUNT(c) FILTER $g_2 AS count$2]) - +- Calc(select=[a, b, c, EXPR$1, =(CASE(=($e, 1:BIGINT), 1:BIGINT, =($e, 2:BIGINT), 2:BIGINT, 3:BIGINT), 1) AS $g_1, =(CASE(=($e, 1:BIGINT), 1:BIGINT, =($e, 2:BIGINT), 2:BIGINT, 3:BIGINT), 2) AS $g_2, =(CASE(=($e, 1:BIGINT), 1:BIGINT, =($e, 2:BIGINT), 2:BIGINT, 3:BIGINT), 3) AS $g_3]) - +- HashAggregate(isMerge=[true], groupBy=[a, b, c, $e], select=[a, b, c, $e, Final_COUNT(count1$0) AS EXPR$1]) - +- Exchange(distribution=[hash[a, b, c, $e]]) - +- LocalHashAggregate(groupBy=[a, b, c, $e], select=[a, b, c, $e, Partial_COUNT(*) AS count1$0]) - +- Expand(projects=[{a=[$0], b=[$1], c=[null], $e=[1]}, {a=[$0], b=[null], c=[$2], $e=[2]}, {a=[$0], b=[null], c=[null], $e=[3]}], projects=[{a, b, null AS c, 1 AS $e}, {a, null AS b, c, 2 AS $e}, {a, null AS b, null AS c, 3 AS $e}]) - +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) +Calc(select=[CAST(CASE(IS NOT NULL(EXPR$0), EXPR$0, 0)) AS EXPR$0, EXPR$1]) ++- HashAggregate(isMerge=[true], select=[Final_MIN(min$0) AS EXPR$0, Final_SUM(sum$1) AS EXPR$1]) + +- Exchange(distribution=[single]) + +- LocalHashAggregate(select=[Partial_MIN(EXPR$0) FILTER $g_1 AS min$0, Partial_SUM(b) FILTER $g_0 AS sum$1]) + +- Calc(select=[b, EXPR$0, =(CASE(=($e, 0:BIGINT), 0:BIGINT, 1:BIGINT), 0) AS $g_0, =(CASE(=($e, 0:BIGINT), 0:BIGINT, 1:BIGINT), 1) AS $g_1]) + +- HashAggregate(isMerge=[true], groupBy=[b, $e], select=[b, $e, Final_COUNT(count$0) FILTER $e AS EXPR$0]) + +- Exchange(distribution=[hash[b, $e]]) + +- LocalHashAggregate(groupBy=[b, $e], select=[b, $e, Partial_COUNT(a) FILTER $f1 AS count$0]) + +- Expand(projects=[{a=[$0], $f1=[$1], b=[$2], $e=[0]}, {a=[$0], $f1=[$1], b=[null], $e=[1]}], projects=[{a, $f1, b, 0 AS $e}, {a, $f1, null AS b, 1 AS $e}]) + +- Calc(select=[a, IS TRUE(>(a, 0)) AS $f1, b]) + +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) +]]> + </Resource> + </TestCase> + <TestCase name="testSingleDistinctAggAndOneOrMultiNonDistinctAggWithGroupingSets1"> + <Resource name="sql"> + <![CDATA[SELECT COUNT(DISTINCT a), SUM(b) FROM MyTable2 GROUP BY GROUPING SETS (b, c)]]> + </Resource> + <Resource name="planBefore"> + <![CDATA[ +LogicalProject(EXPR$0=[$2], EXPR$1=[$3]) ++- LogicalAggregate(group=[{0, 1}], groups=[[{0}, {1}]], EXPR$0=[COUNT(DISTINCT $2)], EXPR$1=[SUM($0)]) + +- LogicalProject(b=[$1], c=[$2], a=[$0]) + +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(a, b, c, d, e)]]]) +]]> + </Resource> + <Resource name="planAfter"> + <![CDATA[ +Calc(select=[EXPR$0, EXPR$1]) ++- HashAggregate(isMerge=[true], groupBy=[b, c, $e], select=[b, c, $e, Final_COUNT(count$0) AS EXPR$0, Final_MIN(min$1) AS EXPR$1]) + +- Exchange(distribution=[hash[b, c, $e]]) + +- LocalHashAggregate(groupBy=[b, c, $e], select=[b, c, $e, Partial_COUNT(a) FILTER $g_0 AS count$0, Partial_MIN(EXPR$1) FILTER $g_2 AS min$1]) + +- Calc(select=[b, c, a, $e, EXPR$1, =(CASE(=($e_0, 0:BIGINT), 0:BIGINT, 2:BIGINT), 0) AS $g_0, =(CASE(=($e_0, 0:BIGINT), 0:BIGINT, 2:BIGINT), 2) AS $g_2]) + +- HashAggregate(isMerge=[true], groupBy=[b, c, a, $e, $e_0], select=[b, c, a, $e, $e_0, Final_SUM(sum$0) AS EXPR$1]) + +- Exchange(distribution=[hash[b, c, a, $e, $e_0]]) + +- LocalHashAggregate(groupBy=[b, c, a, $e, $e_0], select=[b, c, a, $e, $e_0, Partial_SUM(b_0) AS sum$0]) + +- Expand(projects=[{b=[$0], c=[$1], a=[$2], $e=[$3], b_0=[$4], $e_0=[0]}, {b=[$0], c=[$1], a=[null], $e=[$3], b_0=[$4], $e_0=[2]}], projects=[{b, c, a, $e, b_0, 0 AS $e_0}, {b, c, null AS a, $e, b_0, 2 AS $e_0}]) + +- Expand(projects=[{b=[$0], c=[null], a=[$2], $e=[1], b_0=[$0]}, {b=[null], c=[$1], a=[$2], $e=[2], b_0=[$0]}], projects=[{b, null AS c, a, 1 AS $e, b AS b_0}, {null AS b, c, a, 2 AS $e, b AS b_0}]) + +- Calc(select=[b, c, a]) + +- TableSourceScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(a, b, c, d, e)]]], fields=[a, b, c, d, e]) ]]> </Resource> </TestCase> - <TestCase name="testSingleDistinctAggregateAndOneOrMultiNonDistinctAggregate2"> + <TestCase name="testSingleDistinctAggAndOneOrMultiNonDistinctAggWithGroupingSets2"> <Resource name="sql"> - <![CDATA[SELECT COUNT(a), SUM(DISTINCT b) FROM MyTable]]> + <![CDATA[SELECT COUNT(a), SUM(DISTINCT b) FROM MyTable2 GROUP BY GROUPING SETS (c, d)]]> </Resource> <Resource name="planBefore"> <![CDATA[ -LogicalAggregate(group=[{}], EXPR$0=[COUNT($0)], EXPR$1=[SUM(DISTINCT $1)]) +LogicalProject(EXPR$0=[$2], EXPR$1=[$3]) ++- LogicalAggregate(group=[{0, 1}], groups=[[{0}, {1}]], EXPR$0=[COUNT($2)], EXPR$1=[SUM(DISTINCT $3)]) + +- LogicalProject(c=[$2], d=[$3], a=[$0], b=[$1]) + +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(a, b, c, d, e)]]]) +]]> + </Resource> + <Resource name="planAfter"> + <![CDATA[ +Calc(select=[CAST(EXPR$0) AS EXPR$0, EXPR$1]) ++- HashAggregate(isMerge=[true], groupBy=[c, d, $e], select=[c, d, $e, Final_MIN(min$0) AS EXPR$0, Final_SUM(sum$1) AS EXPR$1]) + +- Exchange(distribution=[hash[c, d, $e]]) + +- LocalHashAggregate(groupBy=[c, d, $e], select=[c, d, $e, Partial_MIN(EXPR$0) FILTER $g_2 AS min$0, Partial_SUM(b) FILTER $g_0 AS sum$1]) + +- Calc(select=[c, d, b, $e, EXPR$0, =(CASE(=($e_0, 0:BIGINT), 0:BIGINT, 2:BIGINT), 0) AS $g_0, =(CASE(=($e_0, 0:BIGINT), 0:BIGINT, 2:BIGINT), 2) AS $g_2]) + +- HashAggregate(isMerge=[true], groupBy=[c, d, b, $e, $e_0], select=[c, d, b, $e, $e_0, Final_COUNT(count$0) AS EXPR$0]) + +- Exchange(distribution=[hash[c, d, b, $e, $e_0]]) + +- LocalHashAggregate(groupBy=[c, d, b, $e, $e_0], select=[c, d, b, $e, $e_0, Partial_COUNT(a) AS count$0]) + +- Expand(projects=[{c=[$0], d=[$1], a=[$2], b=[$3], $e=[$4], $e_0=[0]}, {c=[$0], d=[$1], a=[$2], b=[null], $e=[$4], $e_0=[2]}], projects=[{c, d, a, b, $e, 0 AS $e_0}, {c, d, a, null AS b, $e, 2 AS $e_0}]) + +- Expand(projects=[{c=[$0], d=[null], a=[$2], b=[$3], $e=[1]}, {c=[null], d=[$1], a=[$2], b=[$3], $e=[2]}], projects=[{c, null AS d, a, b, 1 AS $e}, {null AS c, d, a, b, 2 AS $e}]) + +- Calc(select=[c, d, a, b]) + +- TableSourceScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(a, b, c, d, e)]]], fields=[a, b, c, d, e]) +]]> + </Resource> + </TestCase> + <TestCase name="testTwoDistinctAggWithGroupByAndCountStar"> + <Resource name="sql"> + <![CDATA[SELECT a, COUNT(*), SUM(DISTINCT b), COUNT(DISTINCT b) FROM MyTable GROUP BY a]]> + </Resource> + <Resource name="planBefore"> + <![CDATA[ +LogicalAggregate(group=[{0}], EXPR$1=[COUNT()], EXPR$2=[SUM(DISTINCT $1)], EXPR$3=[COUNT(DISTINCT $1)]) +- LogicalProject(a=[$0], b=[$1]) +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]]) ]]> </Resource> <Resource name="planAfter"> <![CDATA[ -Calc(select=[CAST(CASE(IS NOT NULL(EXPR$0), EXPR$0, 0)) AS EXPR$0, EXPR$1]) -+- HashAggregate(isMerge=[true], select=[Final_MIN(min$0) AS EXPR$0, Final_SUM(sum$1) AS EXPR$1]) - +- Exchange(distribution=[single]) - +- LocalHashAggregate(select=[Partial_MIN(EXPR$0) FILTER $g_1 AS min$0, Partial_SUM(b) FILTER $g_0 AS sum$1]) - +- Calc(select=[b, EXPR$0, =(CASE(=($e, 0:BIGINT), 0:BIGINT, 1:BIGINT), 0) AS $g_0, =(CASE(=($e, 0:BIGINT), 0:BIGINT, 1:BIGINT), 1) AS $g_1]) - +- HashAggregate(isMerge=[true], groupBy=[b, $e], select=[b, $e, Final_COUNT(count$0) AS EXPR$0]) - +- Exchange(distribution=[hash[b, $e]]) - +- LocalHashAggregate(groupBy=[b, $e], select=[b, $e, Partial_COUNT(a) AS count$0]) +Calc(select=[a, CAST(EXPR$1) AS EXPR$1, EXPR$2, EXPR$3]) ++- HashAggregate(isMerge=[true], groupBy=[a], select=[a, Final_MIN(min$0) AS EXPR$1, Final_SUM(sum$1) FILTER count$2 AS EXPR$2, Final_COUNT(count$2) FILTER count$2 AS EXPR$3]) + +- Exchange(distribution=[hash[a]]) + +- LocalHashAggregate(groupBy=[a], select=[a, Partial_MIN(EXPR$1) FILTER $g_1 AS min$0, Partial_SUM(b) FILTER $g_0 AS sum$1, Partial_COUNT(b) FILTER $g_0 AS count$2]) + +- Calc(select=[a, b, EXPR$1, =(CASE(=($e, 0:BIGINT), 0:BIGINT, 1:BIGINT), 0) AS $g_0, =(CASE(=($e, 0:BIGINT), 0:BIGINT, 1:BIGINT), 1) AS $g_1]) + +- HashAggregate(isMerge=[true], groupBy=[a, b, $e], select=[a, b, $e, Final_COUNT(count1$0) AS EXPR$1]) + +- Exchange(distribution=[hash[a, b, $e]]) + +- LocalHashAggregate(groupBy=[a, b, $e], select=[a, b, $e, Partial_COUNT(*) AS count1$0]) +- Expand(projects=[{a=[$0], b=[$1], $e=[0]}, {a=[$0], b=[null], $e=[1]}], projects=[{a, b, 0 AS $e}, {a, null AS b, 1 AS $e}]) +- Calc(select=[a, b]) +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) ]]> </Resource> </TestCase> - <TestCase name="testSingleDistinctAggregateWithGrouping"> + <TestCase name="testSingleDistinctAggWithGroupBy"> <Resource name="sql"> <![CDATA[SELECT a, COUNT(a), SUM(DISTINCT b) FROM MyTable GROUP BY a]]> </Resource> @@ -220,7 +478,7 @@ Calc(select=[a, CAST(EXPR$1) AS EXPR$1, EXPR$2]) ]]> </Resource> </TestCase> - <TestCase name="testSingleDistinctAggregateWithGroupingAndCountStar"> + <TestCase name="testSingleDistinctAggWithGroupByAndCountStar"> <Resource name="sql"> <![CDATA[SELECT a, COUNT(*), SUM(DISTINCT b) FROM MyTable GROUP BY a]]> </Resource> @@ -247,30 +505,165 @@ Calc(select=[a, CAST(EXPR$1) AS EXPR$1, EXPR$2]) ]]> </Resource> </TestCase> - <TestCase name="testTwoDistinctAggregateWithGroupingAndCountStar"> + <TestCase name="testTwoDifferentDistinctAggWithGroupByAndCountStar"> <Resource name="sql"> - <![CDATA[SELECT a, COUNT(*), SUM(DISTINCT b), COUNT(DISTINCT b) FROM MyTable GROUP BY a]]> + <![CDATA[SELECT a, COUNT(*), SUM(DISTINCT b), COUNT(DISTINCT c) FROM MyTable GROUP BY a]]> </Resource> <Resource name="planBefore"> <![CDATA[ -LogicalAggregate(group=[{0}], EXPR$1=[COUNT()], EXPR$2=[SUM(DISTINCT $1)], EXPR$3=[COUNT(DISTINCT $1)]) -+- LogicalProject(a=[$0], b=[$1]) - +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]]) +LogicalAggregate(group=[{0}], EXPR$1=[COUNT()], EXPR$2=[SUM(DISTINCT $1)], EXPR$3=[COUNT(DISTINCT $2)]) ++- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]]) ]]> </Resource> <Resource name="planAfter"> <![CDATA[ Calc(select=[a, CAST(EXPR$1) AS EXPR$1, EXPR$2, EXPR$3]) -+- HashAggregate(isMerge=[true], groupBy=[a], select=[a, Final_MIN(min$0) AS EXPR$1, Final_SUM(sum$1) FILTER count$2 AS EXPR$2, Final_COUNT(count$2) FILTER count$2 AS EXPR$3]) ++- HashAggregate(isMerge=[true], groupBy=[a], select=[a, Final_MIN(min$0) AS EXPR$1, Final_SUM(sum$1) AS EXPR$2, Final_COUNT(count$2) AS EXPR$3]) +- Exchange(distribution=[hash[a]]) - +- LocalHashAggregate(groupBy=[a], select=[a, Partial_MIN(EXPR$1) FILTER $g_1 AS min$0, Partial_SUM(b) FILTER $g_0 AS sum$1, Partial_COUNT(b) FILTER $g_0 AS count$2]) - +- Calc(select=[a, b, EXPR$1, =(CASE(=($e, 0:BIGINT), 0:BIGINT, 1:BIGINT), 0) AS $g_0, =(CASE(=($e, 0:BIGINT), 0:BIGINT, 1:BIGINT), 1) AS $g_1]) - +- HashAggregate(isMerge=[true], groupBy=[a, b, $e], select=[a, b, $e, Final_COUNT(count1$0) AS EXPR$1]) - +- Exchange(distribution=[hash[a, b, $e]]) - +- LocalHashAggregate(groupBy=[a, b, $e], select=[a, b, $e, Partial_COUNT(*) AS count1$0]) - +- Expand(projects=[{a=[$0], b=[$1], $e=[0]}, {a=[$0], b=[null], $e=[1]}], projects=[{a, b, 0 AS $e}, {a, null AS b, 1 AS $e}]) - +- Calc(select=[a, b]) - +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) + +- LocalHashAggregate(groupBy=[a], select=[a, Partial_MIN(EXPR$1) FILTER $g_3 AS min$0, Partial_SUM(b) FILTER $g_1 AS sum$1, Partial_COUNT(c) FILTER $g_2 AS count$2]) + +- Calc(select=[a, b, c, EXPR$1, =(CASE(=($e, 1:BIGINT), 1:BIGINT, =($e, 2:BIGINT), 2:BIGINT, 3:BIGINT), 1) AS $g_1, =(CASE(=($e, 1:BIGINT), 1:BIGINT, =($e, 2:BIGINT), 2:BIGINT, 3:BIGINT), 2) AS $g_2, =(CASE(=($e, 1:BIGINT), 1:BIGINT, =($e, 2:BIGINT), 2:BIGINT, 3:BIGINT), 3) AS $g_3]) + +- HashAggregate(isMerge=[true], groupBy=[a, b, c, $e], select=[a, b, c, $e, Final_COUNT(count1$0) AS EXPR$1]) + +- Exchange(distribution=[hash[a, b, c, $e]]) + +- LocalHashAggregate(groupBy=[a, b, c, $e], select=[a, b, c, $e, Partial_COUNT(*) AS count1$0]) + +- Expand(projects=[{a=[$0], b=[$1], c=[null], $e=[1]}, {a=[$0], b=[null], c=[$2], $e=[2]}, {a=[$0], b=[null], c=[null], $e=[3]}], projects=[{a, b, null AS c, 1 AS $e}, {a, null AS b, c, 2 AS $e}, {a, null AS b, null AS c, 3 AS $e}]) + +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) +]]> + </Resource> + </TestCase> + <TestCase name="TestMultiDistinctOnDifferentColumnWithFilter"> + <Resource name="sql"> + <![CDATA[SELECT d, COUNT(DISTINCT c), COUNT(DISTINCT c) FILTER (WHERE a > 0), +COUNT(DISTINCT b) FILTER (WHERE b > 1) FROM MyTable2 GROUP BY d]]> + </Resource> + <Resource name="planBefore"> + <![CDATA[ +LogicalAggregate(group=[{0}], EXPR$1=[COUNT(DISTINCT $1)], EXPR$2=[COUNT(DISTINCT $1) FILTER $2], EXPR$3=[COUNT(DISTINCT $3) FILTER $4]) ++- LogicalProject(d=[$3], c=[$2], $f2=[IS TRUE(>($0, 0))], b=[$1], $f4=[IS TRUE(>($1, 1))]) + +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(a, b, c, d, e)]]]) +]]> + </Resource> + <Resource name="planAfter"> + <![CDATA[ +HashAggregate(isMerge=[true], groupBy=[d], select=[d, Final_COUNT(count$0) AS EXPR$1, Final_COUNT(count$1) AS EXPR$2, Final_COUNT(count$2) AS EXPR$3]) ++- Exchange(distribution=[hash[d]]) + +- LocalHashAggregate(groupBy=[d], select=[d, Partial_COUNT(c) FILTER $g_7 AS count$0, Partial_COUNT(c) FILTER $g_3 AS count$1, Partial_COUNT(b) FILTER $g_12 AS count$2]) + +- Calc(select=[d, c, CAST($f2) AS $f2, b, CAST($f4) AS $f4, AND(=(CASE(=($e, 3:BIGINT), 3:BIGINT, =($e, 7:BIGINT), 7:BIGINT, 12:BIGINT), 3), IS TRUE(CAST($f2))) AS $g_3, =(CASE(=($e, 3:BIGINT), 3:BIGINT, =($e, 7:BIGINT), 7:BIGINT, 12:BIGINT), 7) AS $g_7, AND(=(CASE(=($e, 3:BIGINT), 3:BIGINT, =($e, 7:BIGINT), 7:BIGINT, 12:BIGINT), 12), IS TRUE(CAST($f4))) AS $g_12]) + +- HashAggregate(isMerge=[true], groupBy=[d, c, $f2, b, $f4, $e], select=[d, c, $f2, b, $f4, $e]) + +- Exchange(distribution=[hash[d, c, $f2, b, $f4, $e]]) + +- LocalHashAggregate(groupBy=[d, c, $f2, b, $f4, $e], select=[d, c, $f2, b, $f4, $e]) + +- Expand(projects=[{d=[$0], c=[$1], $f2=[$2], b=[null], $f4=[null], $e=[3]}, {d=[$0], c=[$1], $f2=[null], b=[null], $f4=[null], $e=[7]}, {d=[$0], c=[null], $f2=[null], b=[$3], $f4=[$4], $e=[12]}], projects=[{d, c, $f2, null AS b, null AS $f4, 3 AS $e}, {d, c, null AS $f2, null AS b, null AS $f4, 7 AS $e}, {d, null AS c, null AS $f2, b, $f4, 12 AS $e}]) + +- Calc(select=[d, c, IS TRUE(>(a, 0)) AS $f2, b, IS TRUE(>(b, 1)) AS $f4]) + +- TableSourceScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(a, b, c, d, e)]]], fields=[a, b, c, d, e]) +]]> + </Resource> + </TestCase> + <TestCase name="TestMultiDistinctWithFilterAndNonDistinctAgg"> + <Resource name="sql"> + <![CDATA[SELECT d, COUNT(DISTINCT c), COUNT(DISTINCT c) FILTER (WHERE a > 0), +MAX(e), MIN(e) FROM MyTable2 GROUP BY d]]> + </Resource> + <Resource name="planBefore"> + <![CDATA[ +LogicalAggregate(group=[{0}], EXPR$1=[COUNT(DISTINCT $1)], EXPR$2=[COUNT(DISTINCT $1) FILTER $2], EXPR$3=[MAX($3)], EXPR$4=[MIN($3)]) ++- LogicalProject(d=[$3], c=[$2], $f2=[IS TRUE(>($0, 0))], e=[$4]) + +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(a, b, c, d, e)]]]) +]]> + </Resource> + <Resource name="planAfter"> + <![CDATA[ +SortAggregate(isMerge=[true], groupBy=[d], select=[d, Final_COUNT(count$0) AS EXPR$1, Final_COUNT(count$1) AS EXPR$2, Final_MIN(min$2) AS EXPR$3, Final_MIN(min$3) AS EXPR$4]) ++- Sort(orderBy=[d ASC]) + +- Exchange(distribution=[hash[d]]) + +- LocalSortAggregate(groupBy=[d], select=[d, Partial_COUNT(c) FILTER $g_1 AS count$0, Partial_COUNT(c) FILTER $g_0 AS count$1, Partial_MIN(EXPR$3) FILTER $g_3 AS min$2, Partial_MIN(EXPR$4) FILTER $g_3 AS min$3]) + +- Calc(select=[d, c, CAST($f2) AS $f2, EXPR$3, EXPR$4, AND(=(CASE(=($e, 0:BIGINT), 0:BIGINT, =($e, 1:BIGINT), 1:BIGINT, 3:BIGINT), 0), IS TRUE(CAST($f2))) AS $g_0, =(CASE(=($e, 0:BIGINT), 0:BIGINT, =($e, 1:BIGINT), 1:BIGINT, 3:BIGINT), 1) AS $g_1, =(CASE(=($e, 0:BIGINT), 0:BIGINT, =($e, 1:BIGINT), 1:BIGINT, 3:BIGINT), 3) AS $g_3]) + +- Sort(orderBy=[d ASC]) + +- SortAggregate(isMerge=[false], groupBy=[d, c, $f2, $e], select=[d, c, $f2, $e, MAX(e) AS EXPR$3, MIN(e) AS EXPR$4]) + +- Sort(orderBy=[d ASC, c ASC, $f2 ASC, $e ASC]) + +- Exchange(distribution=[hash[d, c, $f2, $e]]) + +- Expand(projects=[{d=[$0], c=[$1], $f2=[$2], e=[$3], $e=[0]}, {d=[$0], c=[$1], $f2=[null], e=[$3], $e=[1]}, {d=[$0], c=[null], $f2=[null], e=[$3], $e=[3]}], projects=[{d, c, $f2, e, 0 AS $e}, {d, c, null AS $f2, e, 1 AS $e}, {d, null AS c, null AS $f2, e, 3 AS $e}]) + +- Calc(select=[d, c, IS TRUE(>(a, 0)) AS $f2, e]) + +- TableSourceScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(a, b, c, d, e)]]], fields=[a, b, c, d, e]) +]]> + </Resource> + </TestCase> + <TestCase name="testMultiDistinctOnSameColumnWithFilter"> + <Resource name="sql"> + <![CDATA[SELECT d, COUNT(DISTINCT c), COUNT(DISTINCT c) FILTER (WHERE a > 10), +COUNT(DISTINCT c) FILTER (WHERE a < 10) FROM MyTable2 GROUP BY d]]> + </Resource> + <Resource name="planBefore"> + <![CDATA[ +LogicalAggregate(group=[{0}], EXPR$1=[COUNT(DISTINCT $1)], EXPR$2=[COUNT(DISTINCT $1) FILTER $2], EXPR$3=[COUNT(DISTINCT $1) FILTER $3]) ++- LogicalProject(d=[$3], c=[$2], $f2=[IS TRUE(>($0, 10))], $f3=[IS TRUE(<($0, 10))]) + +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(a, b, c, d, e)]]]) +]]> + </Resource> + <Resource name="planAfter"> + <![CDATA[ +HashAggregate(isMerge=[true], groupBy=[d], select=[d, Final_COUNT(count$0) AS EXPR$1, Final_COUNT(count$1) AS EXPR$2, Final_COUNT(count$2) AS EXPR$3]) ++- Exchange(distribution=[hash[d]]) + +- LocalHashAggregate(groupBy=[d], select=[d, Partial_COUNT(c) FILTER $g_3 AS count$0, Partial_COUNT(c) FILTER $g_1 AS count$1, Partial_COUNT(c) FILTER $g_2 AS count$2]) + +- Calc(select=[d, c, CAST($f2) AS $f2, CAST($f3) AS $f3, AND(=(CASE(=($e, 1:BIGINT), 1:BIGINT, =($e, 2:BIGINT), 2:BIGINT, 3:BIGINT), 1), IS TRUE(CAST($f2))) AS $g_1, AND(=(CASE(=($e, 1:BIGINT), 1:BIGINT, =($e, 2:BIGINT), 2:BIGINT, 3:BIGINT), 2), IS TRUE(CAST($f3))) AS $g_2, =(CASE(=($e, 1:BIGINT), 1:BIGINT, =($e, 2:BIGINT), 2:BIGINT, 3:BIGINT), 3) AS $g_3]) + +- HashAggregate(isMerge=[true], groupBy=[d, c, $f2, $f3, $e], select=[d, c, $f2, $f3, $e]) + +- Exchange(distribution=[hash[d, c, $f2, $f3, $e]]) + +- LocalHashAggregate(groupBy=[d, c, $f2, $f3, $e], select=[d, c, $f2, $f3, $e]) + +- Expand(projects=[{d=[$0], c=[$1], $f2=[$2], $f3=[null], $e=[1]}, {d=[$0], c=[$1], $f2=[null], $f3=[$3], $e=[2]}, {d=[$0], c=[$1], $f2=[null], $f3=[null], $e=[3]}], projects=[{d, c, $f2, null AS $f3, 1 AS $e}, {d, c, null AS $f2, $f3, 2 AS $e}, {d, c, null AS $f2, null AS $f3, 3 AS $e}]) + +- Calc(select=[d, c, IS TRUE(>(a, 10)) AS $f2, IS TRUE(<(a, 10)) AS $f3]) + +- TableSourceScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(a, b, c, d, e)]]], fields=[a, b, c, d, e]) +]]> + </Resource> + </TestCase> + <TestCase name="testSingleDistinctWithFilter"> + <Resource name="sql"> + <![CDATA[SELECT d, COUNT(DISTINCT c) FILTER (WHERE a > 0) FROM MyTable2 GROUP BY d]]> + </Resource> + <Resource name="planBefore"> + <![CDATA[ +LogicalAggregate(group=[{0}], EXPR$1=[COUNT(DISTINCT $1) FILTER $2]) ++- LogicalProject(d=[$3], c=[$2], $f2=[IS TRUE(>($0, 0))]) + +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(a, b, c, d, e)]]]) +]]> + </Resource> + <Resource name="planAfter"> + <![CDATA[ +HashAggregate(isMerge=[true], groupBy=[d], select=[d, Final_COUNT(count$0) AS EXPR$1]) ++- Exchange(distribution=[hash[d]]) + +- LocalHashAggregate(groupBy=[d], select=[d, Partial_COUNT(i$c) AS count$0]) + +- HashAggregate(isMerge=[true], groupBy=[d, $f2, i$c], select=[d, $f2, i$c]) + +- Exchange(distribution=[hash[d, $f2, i$c]]) + +- LocalHashAggregate(groupBy=[d, $f2, i$c], select=[d, $f2, i$c]) + +- Calc(select=[d, IS TRUE(>(a, 0)) AS $f2, CASE(IS TRUE(>(a, 0)), c, null:VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS i$c]) + +- TableSourceScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(a, b, c, d, e)]]], fields=[a, b, c, d, e]) +]]> + </Resource> + </TestCase> + <TestCase name="testMultiDistinctAndNonDistinctAggWithFilter"> + <Resource name="sql"> + <![CDATA[SELECT d, MAX(e), MAX(e) FILTER (WHERE a < 10), COUNT(DISTINCT c), +COUNT(DISTINCT c) FILTER (WHERE a > 5), COUNT(DISTINCT b) FILTER (WHERE b > 3) +FROM MyTable2 GROUP BY d]]> + </Resource> + <Resource name="planBefore"> + <![CDATA[ +LogicalAggregate(group=[{0}], EXPR$1=[MAX($1)], EXPR$2=[MAX($1) FILTER $2], EXPR$3=[COUNT(DISTINCT $3)], EXPR$4=[COUNT(DISTINCT $3) FILTER $4], EXPR$5=[COUNT(DISTINCT $5) FILTER $6]) ++- LogicalProject(d=[$3], e=[$4], $f2=[IS TRUE(<($0, 10))], c=[$2], $f4=[IS TRUE(>($0, 5))], b=[$1], $f6=[IS TRUE(>($1, 3))]) + +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(a, b, c, d, e)]]]) +]]> + </Resource> + <Resource name="planAfter"> + <![CDATA[ +SortAggregate(isMerge=[true], groupBy=[d], select=[d, Final_MIN(min$0) AS EXPR$1, Final_MIN(min$1) AS EXPR$2, Final_COUNT(count$2) AS EXPR$3, Final_COUNT(count$3) AS EXPR$4, Final_COUNT(count$4) AS EXPR$5]) ++- Sort(orderBy=[d ASC]) + +- Exchange(distribution=[hash[d]]) + +- LocalSortAggregate(groupBy=[d], select=[d, Partial_MIN(EXPR$1) FILTER $g_15 AS min$0, Partial_MIN(EXPR$2) FILTER $g_15 AS min$1, Partial_COUNT(c) FILTER $g_7 AS count$2, Partial_COUNT(c) FILTER $g_3 AS count$3, Partial_COUNT(b) FILTER $g_12 AS count$4]) + +- Calc(select=[d, c, CAST($f4) AS $f4, b, CAST($f6) AS $f6, EXPR$1, EXPR$2, AND(=(CASE(=($e, 3:BIGINT), 3:BIGINT, =($e, 7:BIGINT), 7:BIGINT, =($e, 12:BIGINT), 12:BIGINT, 15:BIGINT), 3), IS TRUE(CAST($f4))) AS $g_3, =(CASE(=($e, 3:BIGINT), 3:BIGINT, =($e, 7:BIGINT), 7:BIGINT, =($e, 12:BIGINT), 12:BIGINT, 15:BIGINT), 7) AS $g_7, AND(=(CASE(=($e, 3:BIGINT), 3:BIGINT, =($e, 7:BIGINT), 7:BIGINT, =($e, 12:BIGINT), 12:BIGINT, 15:BIGINT), 12), IS TRUE(CAST($f6))) AS $g_12, =(CASE(=($e, [...] + +- Sort(orderBy=[d ASC]) + +- SortAggregate(isMerge=[false], groupBy=[d, c, $f4, b, $f6, $e], select=[d, c, $f4, b, $f6, $e, MAX(e) AS EXPR$1, MAX(e) FILTER $f2 AS EXPR$2]) + +- Sort(orderBy=[d ASC, c ASC, $f4 ASC, b ASC, $f6 ASC, $e ASC]) + +- Exchange(distribution=[hash[d, c, $f4, b, $f6, $e]]) + +- Expand(projects=[{d=[$0], e=[$1], $f2=[$2], c=[$3], $f4=[$4], b=[null], $f6=[null], $e=[3]}, {d=[$0], e=[$1], $f2=[$2], c=[$3], $f4=[null], b=[null], $f6=[null], $e=[7]}, {d=[$0], e=[$1], $f2=[$2], c=[null], $f4=[null], b=[$5], $f6=[$6], $e=[12]}, {d=[$0], e=[$1], $f2=[$2], c=[null], $f4=[null], b=[null], $f6=[null], $e=[15]}], projects=[{d, e, $f2, c, $f4, null AS b, null AS $f6, 3 AS $e}, {d, e, $f2, c, null AS $f4, null AS b, null AS $f6, 7 AS $e}, {d, e, $f [...] + +- Calc(select=[d, e, IS TRUE(<(a, 10)) AS $f2, c, IS TRUE(>(a, 5)) AS $f4, b, IS TRUE(>(b, 3)) AS $f6]) + +- TableSourceScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(a, b, c, d, e)]]], fields=[a, b, c, d, e]) ]]> </Resource> </TestCase> diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/FlinkAggregateExpandDistinctAggregatesRuleTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/FlinkAggregateExpandDistinctAggregatesRuleTest.xml index b0744cf..0268505 100644 --- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/FlinkAggregateExpandDistinctAggregatesRuleTest.xml +++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/FlinkAggregateExpandDistinctAggregatesRuleTest.xml @@ -109,22 +109,23 @@ FlinkLogicalCalc(select=[EXPR$0, EXPR$1, EXPR$2, EXPR$3]) </TestCase> <TestCase name="testMultiDistinctAggOnDifferentColumn"> <Resource name="sql"> - <![CDATA[SELECT COUNT(DISTINCT a), SUM(DISTINCT b) FROM MyTable]]> + <![CDATA[SELECT COUNT(DISTINCT a), SUM(DISTINCT b), +COUNT(DISTINCT c) FILTER (WHERE a > 5) FROM MyTable]]> </Resource> <Resource name="planBefore"> <![CDATA[ -LogicalAggregate(group=[{}], EXPR$0=[COUNT(DISTINCT $0)], EXPR$1=[SUM(DISTINCT $1)]) -+- LogicalProject(a=[$0], b=[$1]) +LogicalAggregate(group=[{}], EXPR$0=[COUNT(DISTINCT $0)], EXPR$1=[SUM(DISTINCT $1)], EXPR$2=[COUNT(DISTINCT $2) FILTER $3]) ++- LogicalProject(a=[$0], b=[$1], c=[$2], $f3=[IS TRUE(>($0, 5))]) +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]]) ]]> </Resource> <Resource name="planAfter"> <![CDATA[ -FlinkLogicalAggregate(group=[{}], EXPR$0=[COUNT($0) FILTER $2], EXPR$1=[SUM($1) FILTER $3]) -+- FlinkLogicalCalc(select=[a, b, =(CASE(=($e, 1:BIGINT), 1:BIGINT, 2:BIGINT), 1) AS $g_1, =(CASE(=($e, 1:BIGINT), 1:BIGINT, 2:BIGINT), 2) AS $g_2]) - +- FlinkLogicalAggregate(group=[{0, 1, 2}]) - +- FlinkLogicalExpand(projects=[{a=[$0], b=[null], $e=[1]}, {a=[null], b=[$1], $e=[2]}]) - +- FlinkLogicalCalc(select=[a, b]) +FlinkLogicalAggregate(group=[{}], EXPR$0=[COUNT($0) FILTER $4], EXPR$1=[SUM($1) FILTER $5], EXPR$2=[COUNT($2) FILTER $6]) ++- FlinkLogicalCalc(select=[a, b, c, CAST($f3) AS $f3, =(CASE(=($e, 7:BIGINT), 7:BIGINT, =($e, 11:BIGINT), 11:BIGINT, 12:BIGINT), 7) AS $g_7, =(CASE(=($e, 7:BIGINT), 7:BIGINT, =($e, 11:BIGINT), 11:BIGINT, 12:BIGINT), 11) AS $g_11, AND(=(CASE(=($e, 7:BIGINT), 7:BIGINT, =($e, 11:BIGINT), 11:BIGINT, 12:BIGINT), 12), IS TRUE(CAST($f3))) AS $g_12]) + +- FlinkLogicalAggregate(group=[{0, 1, 2, 3, 4}]) + +- FlinkLogicalExpand(projects=[{a=[$0], b=[null], c=[null], $f3=[null], $e=[7]}, {a=[null], b=[$1], c=[null], $f3=[null], $e=[11]}, {a=[null], b=[null], c=[$2], $f3=[$3], $e=[12]}]) + +- FlinkLogicalCalc(select=[a, b, c, IS TRUE(>(a, 5)) AS $f3]) +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) ]]> </Resource> @@ -156,21 +157,24 @@ FlinkLogicalCalc(select=[EXPR$0, EXPR$1]) </TestCase> <TestCase name="testMultiDistinctAggOnSameColumn"> <Resource name="sql"> - <![CDATA[SELECT COUNT(DISTINCT a), SUM(DISTINCT a), MAX(DISTINCT a) FROM MyTable]]> + <![CDATA[SELECT COUNT(DISTINCT a), SUM(DISTINCT a) FILTER (WHERE b > 0), +MAX(DISTINCT a) FROM MyTable]]> </Resource> <Resource name="planBefore"> <![CDATA[ -LogicalAggregate(group=[{}], EXPR$0=[COUNT(DISTINCT $0)], EXPR$1=[SUM(DISTINCT $0)], EXPR$2=[MAX($0)]) -+- LogicalProject(a=[$0]) +LogicalAggregate(group=[{}], EXPR$0=[COUNT(DISTINCT $0)], EXPR$1=[SUM(DISTINCT $0) FILTER $1], EXPR$2=[MAX($0)]) ++- LogicalProject(a=[$0], $f1=[IS TRUE(>($1, 0))]) +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]]) ]]> </Resource> <Resource name="planAfter"> <![CDATA[ -FlinkLogicalAggregate(group=[{}], EXPR$0=[COUNT($0)], EXPR$1=[SUM($0)], EXPR$2=[MAX($0)]) -+- FlinkLogicalAggregate(group=[{0}]) - +- FlinkLogicalCalc(select=[a]) - +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) +FlinkLogicalAggregate(group=[{}], EXPR$0=[COUNT($0) FILTER $4], EXPR$1=[SUM($0) FILTER $3], EXPR$2=[MIN($2) FILTER $5]) ++- FlinkLogicalCalc(select=[a, CAST($f1) AS $f1, EXPR$2, AND(=(CASE(=($e, 0:BIGINT), 0:BIGINT, =($e, 1:BIGINT), 1:BIGINT, 3:BIGINT), 0), IS TRUE(CAST($f1))) AS $g_0, =(CASE(=($e, 0:BIGINT), 0:BIGINT, =($e, 1:BIGINT), 1:BIGINT, 3:BIGINT), 1) AS $g_1, =(CASE(=($e, 0:BIGINT), 0:BIGINT, =($e, 1:BIGINT), 1:BIGINT, 3:BIGINT), 3) AS $g_3]) + +- FlinkLogicalAggregate(group=[{0, 1, 2}], EXPR$2=[MAX($3)]) + +- FlinkLogicalExpand(projects=[{a=[$0], $f1=[$1], $e=[0], a_0=[$0]}, {a=[$0], $f1=[null], $e=[1], a_0=[$0]}, {a=[null], $f1=[null], $e=[3], a_0=[$0]}]) + +- FlinkLogicalCalc(select=[a, IS TRUE(>(b, 0)) AS $f1]) + +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) ]]> </Resource> </TestCase> @@ -199,22 +203,25 @@ FlinkLogicalCalc(select=[EXPR$0, EXPR$1, EXPR$2]) </TestCase> <TestCase name="testMultiDistinctAndNonDistinctAggOnDifferentColumn"> <Resource name="sql"> - <![CDATA[SELECT COUNT(DISTINCT a), SUM(DISTINCT b), COUNT(c) FROM MyTable]]> + <![CDATA[SELECT COUNT(DISTINCT a) FILTER (WHERE c > 0), +SUM(DISTINCT b), COUNT(c) FROM MyTable]]> </Resource> <Resource name="planBefore"> <![CDATA[ -LogicalAggregate(group=[{}], EXPR$0=[COUNT(DISTINCT $0)], EXPR$1=[SUM(DISTINCT $1)], EXPR$2=[COUNT($2)]) -+- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]]) +LogicalAggregate(group=[{}], EXPR$0=[COUNT(DISTINCT $0) FILTER $1], EXPR$1=[SUM(DISTINCT $2)], EXPR$2=[COUNT($3)]) ++- LogicalProject(a=[$0], $f1=[IS TRUE(>($2, 0))], b=[$1], c=[$2]) + +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]]) ]]> </Resource> <Resource name="planAfter"> <![CDATA[ FlinkLogicalCalc(select=[EXPR$0, EXPR$1, CAST(CASE(IS NOT NULL(EXPR$2), EXPR$2, 0)) AS EXPR$2]) -+- FlinkLogicalAggregate(group=[{}], EXPR$0=[COUNT($0) FILTER $3], EXPR$1=[SUM($1) FILTER $4], EXPR$2=[MIN($2) FILTER $5]) - +- FlinkLogicalCalc(select=[a, b, EXPR$2, =(CASE(=($e, 1:BIGINT), 1:BIGINT, =($e, 2:BIGINT), 2:BIGINT, 3:BIGINT), 1) AS $g_1, =(CASE(=($e, 1:BIGINT), 1:BIGINT, =($e, 2:BIGINT), 2:BIGINT, 3:BIGINT), 2) AS $g_2, =(CASE(=($e, 1:BIGINT), 1:BIGINT, =($e, 2:BIGINT), 2:BIGINT, 3:BIGINT), 3) AS $g_3]) - +- FlinkLogicalAggregate(group=[{0, 1, 3}], EXPR$2=[COUNT($2)]) - +- FlinkLogicalExpand(projects=[{a=[$0], b=[null], c=[$2], $e=[1]}, {a=[null], b=[$1], c=[$2], $e=[2]}, {a=[null], b=[null], c=[$2], $e=[3]}]) - +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) ++- FlinkLogicalAggregate(group=[{}], EXPR$0=[COUNT($0) FILTER $4], EXPR$1=[SUM($2) FILTER $5], EXPR$2=[MIN($3) FILTER $6]) + +- FlinkLogicalCalc(select=[a, CAST($f1) AS $f1, b, EXPR$2, AND(=(CASE(=($e, 1:BIGINT), 1:BIGINT, =($e, 6:BIGINT), 6:BIGINT, 7:BIGINT), 1), IS TRUE(CAST($f1))) AS $g_1, =(CASE(=($e, 1:BIGINT), 1:BIGINT, =($e, 6:BIGINT), 6:BIGINT, 7:BIGINT), 6) AS $g_6, =(CASE(=($e, 1:BIGINT), 1:BIGINT, =($e, 6:BIGINT), 6:BIGINT, 7:BIGINT), 7) AS $g_7]) + +- FlinkLogicalAggregate(group=[{0, 1, 2, 4}], EXPR$2=[COUNT($3)]) + +- FlinkLogicalExpand(projects=[{a=[$0], $f1=[$1], b=[null], c=[$3], $e=[1]}, {a=[null], $f1=[null], b=[$2], c=[$3], $e=[6]}, {a=[null], $f1=[null], b=[null], c=[$3], $e=[7]}]) + +- FlinkLogicalCalc(select=[a, IS TRUE(>(c, 0)) AS $f1, b, c]) + +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) ]]> </Resource> </TestCase> @@ -264,34 +271,34 @@ FlinkLogicalAggregate(group=[{}], EXPR$0=[COUNT($0)]) </TestCase> <TestCase name="testSingleDistinctAggAndOneOrMultiNonDistinctAgg1"> <Resource name="sql"> - <![CDATA[SELECT COUNT(DISTINCT a), SUM(b) FROM MyTable]]> + <![CDATA[SELECT COUNT(DISTINCT a) FILTER (WHERE a > 0), SUM(b) FROM MyTable]]> </Resource> <Resource name="planBefore"> <![CDATA[ -LogicalAggregate(group=[{}], EXPR$0=[COUNT(DISTINCT $0)], EXPR$1=[SUM($1)]) -+- LogicalProject(a=[$0], b=[$1]) +LogicalAggregate(group=[{}], EXPR$0=[COUNT(DISTINCT $0) FILTER $1], EXPR$1=[SUM($2)]) ++- LogicalProject(a=[$0], $f1=[IS TRUE(>($0, 0))], b=[$1]) +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]]) ]]> </Resource> <Resource name="planAfter"> <![CDATA[ -FlinkLogicalAggregate(group=[{}], EXPR$0=[COUNT($0) FILTER $2], EXPR$1=[MIN($1) FILTER $3]) -+- FlinkLogicalCalc(select=[a, EXPR$1, =(CASE(=($e, 0:BIGINT), 0:BIGINT, 1:BIGINT), 0) AS $g_0, =(CASE(=($e, 0:BIGINT), 0:BIGINT, 1:BIGINT), 1) AS $g_1]) - +- FlinkLogicalAggregate(group=[{0, 2}], EXPR$1=[SUM($1)]) - +- FlinkLogicalExpand(projects=[{a=[$0], b=[$1], $e=[0]}, {a=[null], b=[$1], $e=[1]}]) - +- FlinkLogicalCalc(select=[a, b]) +FlinkLogicalAggregate(group=[{}], EXPR$0=[COUNT($0) FILTER $3], EXPR$1=[MIN($2) FILTER $4]) ++- FlinkLogicalCalc(select=[a, CAST($f1) AS $f1, EXPR$1, AND(=(CASE(=($e, 0:BIGINT), 0:BIGINT, 3:BIGINT), 0), IS TRUE(CAST($f1))) AS $g_0, =(CASE(=($e, 0:BIGINT), 0:BIGINT, 3:BIGINT), 3) AS $g_3]) + +- FlinkLogicalAggregate(group=[{0, 1, 3}], EXPR$1=[SUM($2)]) + +- FlinkLogicalExpand(projects=[{a=[$0], $f1=[$1], b=[$2], $e=[0]}, {a=[null], $f1=[null], b=[$2], $e=[3]}]) + +- FlinkLogicalCalc(select=[a, IS TRUE(>(a, 0)) AS $f1, b]) +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) ]]> </Resource> </TestCase> <TestCase name="testSingleDistinctAggAndOneOrMultiNonDistinctAgg2"> <Resource name="sql"> - <![CDATA[SELECT COUNT(a), SUM(DISTINCT b) FROM MyTable]]> + <![CDATA[SELECT COUNT(a) filter (WHERE a > 0), SUM(DISTINCT b) FROM MyTable]]> </Resource> <Resource name="planBefore"> <![CDATA[ -LogicalAggregate(group=[{}], EXPR$0=[COUNT($0)], EXPR$1=[SUM(DISTINCT $1)]) -+- LogicalProject(a=[$0], b=[$1]) +LogicalAggregate(group=[{}], EXPR$0=[COUNT($0) FILTER $1], EXPR$1=[SUM(DISTINCT $2)]) ++- LogicalProject(a=[$0], $f1=[IS TRUE(>($0, 0))], b=[$1]) +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]]) ]]> </Resource> @@ -300,9 +307,9 @@ LogicalAggregate(group=[{}], EXPR$0=[COUNT($0)], EXPR$1=[SUM(DISTINCT $1)]) FlinkLogicalCalc(select=[CAST(CASE(IS NOT NULL(EXPR$0), EXPR$0, 0)) AS EXPR$0, EXPR$1]) +- FlinkLogicalAggregate(group=[{}], EXPR$0=[MIN($1) FILTER $3], EXPR$1=[SUM($0) FILTER $2]) +- FlinkLogicalCalc(select=[b, EXPR$0, =(CASE(=($e, 0:BIGINT), 0:BIGINT, 1:BIGINT), 0) AS $g_0, =(CASE(=($e, 0:BIGINT), 0:BIGINT, 1:BIGINT), 1) AS $g_1]) - +- FlinkLogicalAggregate(group=[{1, 2}], EXPR$0=[COUNT($0)]) - +- FlinkLogicalExpand(projects=[{a=[$0], b=[$1], $e=[0]}, {a=[$0], b=[null], $e=[1]}]) - +- FlinkLogicalCalc(select=[a, b]) + +- FlinkLogicalAggregate(group=[{2, 3}], EXPR$0=[COUNT($0) FILTER $1]) + +- FlinkLogicalExpand(projects=[{a=[$0], $f1=[$1], b=[$2], $e=[0]}, {a=[$0], $f1=[$1], b=[null], $e=[1]}]) + +- FlinkLogicalCalc(select=[a, IS TRUE(>(a, 0)) AS $f1, b]) +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) ]]> </Resource> @@ -490,4 +497,118 @@ FlinkLogicalCalc(select=[a, CAST(EXPR$1) AS EXPR$1, EXPR$2, EXPR$3]) ]]> </Resource> </TestCase> + <TestCase name="TestMultiDistinctOnDifferentColumnWithFilter"> + <Resource name="sql"> + <![CDATA[SELECT d, COUNT(DISTINCT c), COUNT(DISTINCT c) FILTER (WHERE a > 0), +COUNT(DISTINCT b) FILTER (WHERE b > 1) FROM MyTable2 GROUP BY d]]> + </Resource> + <Resource name="planBefore"> + <![CDATA[ +LogicalAggregate(group=[{0}], EXPR$1=[COUNT(DISTINCT $1)], EXPR$2=[COUNT(DISTINCT $1) FILTER $2], EXPR$3=[COUNT(DISTINCT $3) FILTER $4]) ++- LogicalProject(d=[$3], c=[$2], $f2=[IS TRUE(>($0, 0))], b=[$1], $f4=[IS TRUE(>($1, 1))]) + +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(a, b, c, d, e)]]]) +]]> + </Resource> + <Resource name="planAfter"> + <![CDATA[ +FlinkLogicalAggregate(group=[{0}], EXPR$1=[COUNT($1) FILTER $6], EXPR$2=[COUNT($1) FILTER $5], EXPR$3=[COUNT($3) FILTER $7]) ++- FlinkLogicalCalc(select=[d, c, CAST($f2) AS $f2, b, CAST($f4) AS $f4, AND(=(CASE(=($e, 3:BIGINT), 3:BIGINT, =($e, 7:BIGINT), 7:BIGINT, 12:BIGINT), 3), IS TRUE(CAST($f2))) AS $g_3, =(CASE(=($e, 3:BIGINT), 3:BIGINT, =($e, 7:BIGINT), 7:BIGINT, 12:BIGINT), 7) AS $g_7, AND(=(CASE(=($e, 3:BIGINT), 3:BIGINT, =($e, 7:BIGINT), 7:BIGINT, 12:BIGINT), 12), IS TRUE(CAST($f4))) AS $g_12]) + +- FlinkLogicalAggregate(group=[{0, 1, 2, 3, 4, 5}]) + +- FlinkLogicalExpand(projects=[{d=[$0], c=[$1], $f2=[$2], b=[null], $f4=[null], $e=[3]}, {d=[$0], c=[$1], $f2=[null], b=[null], $f4=[null], $e=[7]}, {d=[$0], c=[null], $f2=[null], b=[$3], $f4=[$4], $e=[12]}]) + +- FlinkLogicalCalc(select=[d, c, IS TRUE(>(a, 0)) AS $f2, b, IS TRUE(>(b, 1)) AS $f4]) + +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(a, b, c, d, e)]]], fields=[a, b, c, d, e]) +]]> + </Resource> + </TestCase> + <TestCase name="TestMultiDistinctWithFilterAndNonDistinctAgg"> + <Resource name="sql"> + <![CDATA[SELECT d, COUNT(DISTINCT c), COUNT(DISTINCT c) FILTER (WHERE a > 0), +MAX(e), MIN(e) FROM MyTable2 GROUP BY d]]> + </Resource> + <Resource name="planBefore"> + <![CDATA[ +LogicalAggregate(group=[{0}], EXPR$1=[COUNT(DISTINCT $1)], EXPR$2=[COUNT(DISTINCT $1) FILTER $2], EXPR$3=[MAX($3)], EXPR$4=[MIN($3)]) ++- LogicalProject(d=[$3], c=[$2], $f2=[IS TRUE(>($0, 0))], e=[$4]) + +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(a, b, c, d, e)]]]) +]]> + </Resource> + <Resource name="planAfter"> + <![CDATA[ +FlinkLogicalAggregate(group=[{0}], EXPR$1=[COUNT($1) FILTER $6], EXPR$2=[COUNT($1) FILTER $5], EXPR$3=[MIN($3) FILTER $7], EXPR$4=[MIN($4) FILTER $7]) ++- FlinkLogicalCalc(select=[d, c, CAST($f2) AS $f2, EXPR$3, EXPR$4, AND(=(CASE(=($e, 0:BIGINT), 0:BIGINT, =($e, 1:BIGINT), 1:BIGINT, 3:BIGINT), 0), IS TRUE(CAST($f2))) AS $g_0, =(CASE(=($e, 0:BIGINT), 0:BIGINT, =($e, 1:BIGINT), 1:BIGINT, 3:BIGINT), 1) AS $g_1, =(CASE(=($e, 0:BIGINT), 0:BIGINT, =($e, 1:BIGINT), 1:BIGINT, 3:BIGINT), 3) AS $g_3]) + +- FlinkLogicalAggregate(group=[{0, 1, 2, 4}], EXPR$3=[MAX($3)], EXPR$4=[MIN($3)]) + +- FlinkLogicalExpand(projects=[{d=[$0], c=[$1], $f2=[$2], e=[$3], $e=[0]}, {d=[$0], c=[$1], $f2=[null], e=[$3], $e=[1]}, {d=[$0], c=[null], $f2=[null], e=[$3], $e=[3]}]) + +- FlinkLogicalCalc(select=[d, c, IS TRUE(>(a, 0)) AS $f2, e]) + +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(a, b, c, d, e)]]], fields=[a, b, c, d, e]) +]]> + </Resource> + </TestCase> + <TestCase name="testMultiDistinctOnSameColumnWithFilter"> + <Resource name="sql"> + <![CDATA[SELECT d, COUNT(DISTINCT c), COUNT(DISTINCT c) FILTER (WHERE a > 10), +COUNT(DISTINCT c) FILTER (WHERE a < 10) FROM MyTable2 GROUP BY d]]> + </Resource> + <Resource name="planBefore"> + <![CDATA[ +LogicalAggregate(group=[{0}], EXPR$1=[COUNT(DISTINCT $1)], EXPR$2=[COUNT(DISTINCT $1) FILTER $2], EXPR$3=[COUNT(DISTINCT $1) FILTER $3]) ++- LogicalProject(d=[$3], c=[$2], $f2=[IS TRUE(>($0, 10))], $f3=[IS TRUE(<($0, 10))]) + +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(a, b, c, d, e)]]]) +]]> + </Resource> + <Resource name="planAfter"> + <![CDATA[ +FlinkLogicalAggregate(group=[{0}], EXPR$1=[COUNT($1) FILTER $6], EXPR$2=[COUNT($1) FILTER $4], EXPR$3=[COUNT($1) FILTER $5]) ++- FlinkLogicalCalc(select=[d, c, CAST($f2) AS $f2, CAST($f3) AS $f3, AND(=(CASE(=($e, 1:BIGINT), 1:BIGINT, =($e, 2:BIGINT), 2:BIGINT, 3:BIGINT), 1), IS TRUE(CAST($f2))) AS $g_1, AND(=(CASE(=($e, 1:BIGINT), 1:BIGINT, =($e, 2:BIGINT), 2:BIGINT, 3:BIGINT), 2), IS TRUE(CAST($f3))) AS $g_2, =(CASE(=($e, 1:BIGINT), 1:BIGINT, =($e, 2:BIGINT), 2:BIGINT, 3:BIGINT), 3) AS $g_3]) + +- FlinkLogicalAggregate(group=[{0, 1, 2, 3, 4}]) + +- FlinkLogicalExpand(projects=[{d=[$0], c=[$1], $f2=[$2], $f3=[null], $e=[1]}, {d=[$0], c=[$1], $f2=[null], $f3=[$3], $e=[2]}, {d=[$0], c=[$1], $f2=[null], $f3=[null], $e=[3]}]) + +- FlinkLogicalCalc(select=[d, c, IS TRUE(>(a, 10)) AS $f2, IS TRUE(<(a, 10)) AS $f3]) + +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(a, b, c, d, e)]]], fields=[a, b, c, d, e]) +]]> + </Resource> + </TestCase> + <TestCase name="testSingleDistinctWithFilter"> + <Resource name="sql"> + <![CDATA[SELECT d, COUNT(DISTINCT c) FILTER (WHERE a > 0) FROM MyTable2 GROUP BY d]]> + </Resource> + <Resource name="planBefore"> + <![CDATA[ +LogicalAggregate(group=[{0}], EXPR$1=[COUNT(DISTINCT $1) FILTER $2]) ++- LogicalProject(d=[$3], c=[$2], $f2=[IS TRUE(>($0, 0))]) + +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(a, b, c, d, e)]]]) +]]> + </Resource> + <Resource name="planAfter"> + <![CDATA[ +FlinkLogicalAggregate(group=[{0}], EXPR$1=[COUNT($2)]) ++- FlinkLogicalAggregate(group=[{0, 1, 2}]) + +- FlinkLogicalCalc(select=[d, IS TRUE(>(a, 0)) AS $f2, CASE(IS TRUE(>(a, 0)), c, null:VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS i$c]) + +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(a, b, c, d, e)]]], fields=[a, b, c, d, e]) +]]> + </Resource> + </TestCase> + <TestCase name="testMultiDistinctAndNonDistinctAggWithFilter"> + <Resource name="sql"> + <![CDATA[SELECT d, MAX(e), MAX(e) FILTER (WHERE a < 10), COUNT(DISTINCT c), +COUNT(DISTINCT c) FILTER (WHERE a > 5), COUNT(DISTINCT b) FILTER (WHERE b > 3) +FROM MyTable2 GROUP BY d]]> + </Resource> + <Resource name="planBefore"> + <![CDATA[ +LogicalAggregate(group=[{0}], EXPR$1=[MAX($1)], EXPR$2=[MAX($1) FILTER $2], EXPR$3=[COUNT(DISTINCT $3)], EXPR$4=[COUNT(DISTINCT $3) FILTER $4], EXPR$5=[COUNT(DISTINCT $5) FILTER $6]) ++- LogicalProject(d=[$3], e=[$4], $f2=[IS TRUE(<($0, 10))], c=[$2], $f4=[IS TRUE(>($0, 5))], b=[$1], $f6=[IS TRUE(>($1, 3))]) + +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(a, b, c, d, e)]]]) +]]> + </Resource> + <Resource name="planAfter"> + <![CDATA[ +FlinkLogicalAggregate(group=[{0}], EXPR$1=[MIN($5) FILTER $10], EXPR$2=[MIN($6) FILTER $10], EXPR$3=[COUNT($1) FILTER $8], EXPR$4=[COUNT($1) FILTER $7], EXPR$5=[COUNT($3) FILTER $9]) ++- FlinkLogicalCalc(select=[d, c, CAST($f4) AS $f4, b, CAST($f6) AS $f6, EXPR$1, EXPR$2, AND(=(CASE(=($e, 3:BIGINT), 3:BIGINT, =($e, 7:BIGINT), 7:BIGINT, =($e, 12:BIGINT), 12:BIGINT, 15:BIGINT), 3), IS TRUE(CAST($f4))) AS $g_3, =(CASE(=($e, 3:BIGINT), 3:BIGINT, =($e, 7:BIGINT), 7:BIGINT, =($e, 12:BIGINT), 12:BIGINT, 15:BIGINT), 7) AS $g_7, AND(=(CASE(=($e, 3:BIGINT), 3:BIGINT, =($e, 7:BIGINT), 7:BIGINT, =($e, 12:BIGINT), 12:BIGINT, 15:BIGINT), 12), IS TRUE(CAST($f6))) AS $g_12, =(CASE(=( [...] + +- FlinkLogicalAggregate(group=[{0, 3, 4, 5, 6, 7}], EXPR$1=[MAX($1)], EXPR$2=[MAX($1) FILTER $2]) + +- FlinkLogicalExpand(projects=[{d=[$0], e=[$1], $f2=[$2], c=[$3], $f4=[$4], b=[null], $f6=[null], $e=[3]}, {d=[$0], e=[$1], $f2=[$2], c=[$3], $f4=[null], b=[null], $f6=[null], $e=[7]}, {d=[$0], e=[$1], $f2=[$2], c=[null], $f4=[null], b=[$5], $f6=[$6], $e=[12]}, {d=[$0], e=[$1], $f2=[$2], c=[null], $f4=[null], b=[null], $f6=[null], $e=[15]}]) + +- FlinkLogicalCalc(select=[d, e, IS TRUE(<(a, 10)) AS $f2, c, IS TRUE(>(a, 5)) AS $f4, b, IS TRUE(>(b, 3)) AS $f6]) + +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(a, b, c, d, e)]]], fields=[a, b, c, d, e]) +]]> + </Resource> + </TestCase> + </Root> diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/agg/DistinctAggregateTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/agg/DistinctAggregateTest.scala index 85f4e9c..008b5a8 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/agg/DistinctAggregateTest.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/agg/DistinctAggregateTest.scala @@ -18,68 +18,8 @@ package org.apache.flink.table.planner.plan.batch.sql.agg -import org.apache.flink.api.scala._ -import org.apache.flink.table.api.scala._ -import org.apache.flink.table.planner.utils.TableTestBase +import org.apache.flink.table.planner.plan.common.DistinctAggregateTestBase -import org.junit.Test - -class DistinctAggregateTest extends TableTestBase { - private val util = batchTestUtil() - util.addTableSource[(Int, Long, String)]("MyTable", 'a, 'b, 'c) - - @Test - def testSingleDistinctAggregate(): Unit = { - util.verifyPlan("SELECT COUNT(DISTINCT a) FROM MyTable") - } - - @Test - def testMultiDistinctAggregateOnSameColumn(): Unit = { - util.verifyPlan("SELECT COUNT(DISTINCT a), SUM(DISTINCT a), MAX(DISTINCT a) FROM MyTable") - } - - @Test - def testSingleDistinctAggregateAndOneOrMultiNonDistinctAggregate(): Unit = { - // case 0x00: DISTINCT on COUNT and Non-DISTINCT on others - util.verifyPlan("SELECT COUNT(DISTINCT a), SUM(b) FROM MyTable") - } - - @Test - def testSingleDistinctAggregateAndOneOrMultiNonDistinctAggregate2(): Unit = { - // case 0x01: Non-DISTINCT on COUNT and DISTINCT on others - util.verifyPlan("SELECT COUNT(a), SUM(DISTINCT b) FROM MyTable") - } - - @Test - def testMultiDistinctAggregateOnDifferentColumn(): Unit = { - util.verifyPlan("SELECT COUNT(DISTINCT a), SUM(DISTINCT b) FROM MyTable") - } - - @Test - def testMultiDistinctAndNonDistinctAggregateOnDifferentColumn(): Unit = { - util.verifyPlan("SELECT COUNT(DISTINCT a), SUM(DISTINCT b), COUNT(c) FROM MyTable") - } - - @Test - def testSingleDistinctAggregateWithGrouping(): Unit = { - util.verifyPlan("SELECT a, COUNT(a), SUM(DISTINCT b) FROM MyTable GROUP BY a") - } - - @Test - def testSingleDistinctAggregateWithGroupingAndCountStar(): Unit = { - util.verifyPlan("SELECT a, COUNT(*), SUM(DISTINCT b) FROM MyTable GROUP BY a") - } - - @Test - def testTwoDistinctAggregateWithGroupingAndCountStar(): Unit = { - val sqlQuery = "SELECT a, COUNT(*), SUM(DISTINCT b), COUNT(DISTINCT b) FROM MyTable GROUP BY a" - util.verifyPlan(sqlQuery) - } - - @Test - def testTwoDifferentDistinctAggregateWithGroupingAndCountStar(): Unit = { - val sqlQuery = "SELECT a, COUNT(*), SUM(DISTINCT b), COUNT(DISTINCT c) FROM MyTable GROUP BY a" - util.verifyPlan(sqlQuery) - } +class DistinctAggregateTest extends DistinctAggregateTestBase { } diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/FlinkAggregateExpandDistinctAggregatesRuleTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/common/DistinctAggregateTestBase.scala similarity index 69% copy from flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/FlinkAggregateExpandDistinctAggregatesRuleTest.scala copy to flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/common/DistinctAggregateTestBase.scala index 9552e81..d631892 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/FlinkAggregateExpandDistinctAggregatesRuleTest.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/common/DistinctAggregateTestBase.scala @@ -16,62 +16,57 @@ * limitations under the License. */ -package org.apache.flink.table.planner.plan.rules.logical +package org.apache.flink.table.planner.plan.common import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.scala._ import org.apache.flink.table.api.Types import org.apache.flink.table.api.scala._ -import org.apache.flink.table.planner.plan.optimize.program.FlinkBatchProgram -import org.apache.flink.table.planner.utils.TableTestBase +import org.apache.flink.table.planner.utils.{BatchTableTestUtil, TableTestBase} +import org.junit.{Before, Test} -import org.junit.Test - -/** - * Test for [[FlinkAggregateExpandDistinctAggregatesRule]]. - */ -class FlinkAggregateExpandDistinctAggregatesRuleTest extends TableTestBase { - private val util = batchTestUtil() - util.addTableSource[(Int, Long, Int)]("MyTable", 'a, 'b, 'c) - util.addTableSource[(Int, Long, String, String, String)]("MyTable2", 'a, 'b, 'c, 'd, 'e) - util.buildBatchProgram(FlinkBatchProgram.PHYSICAL) +abstract class DistinctAggregateTestBase extends TableTestBase { + protected val util: BatchTableTestUtil = batchTestUtil() + @Before + def setup(): Unit = { + util.addTableSource[(Int, Long, Int)]("MyTable", 'a, 'b, 'c) + util.addTableSource[(Int, Long, String, String, String)]("MyTable2", 'a, 'b, 'c, 'd, 'e) + } @Test def testSingleDistinctAgg(): Unit = { util.verifyPlan("SELECT COUNT(DISTINCT a) FROM MyTable") } @Test - def testSingleDistinctAggOnMultiColumns(): Unit = { - util.verifyPlan("SELECT COUNT(DISTINCT a, b) FROM MyTable") - } - - @Test def testMultiDistinctAggOnSameColumn(): Unit = { - util.verifyPlan("SELECT COUNT(DISTINCT a), SUM(DISTINCT a), MAX(DISTINCT a) FROM MyTable") + util.verifyPlan("SELECT COUNT(DISTINCT a), SUM(DISTINCT a) FILTER (WHERE b > 0),\n" + + "MAX(DISTINCT a) FROM MyTable") } @Test def testSingleDistinctAggAndOneOrMultiNonDistinctAgg1(): Unit = { // case 0x00: DISTINCT on COUNT and Non-DISTINCT on others - util.verifyPlan("SELECT COUNT(DISTINCT a), SUM(b) FROM MyTable") + util.verifyPlan("SELECT COUNT(DISTINCT a) FILTER (WHERE a > 0), SUM(b) FROM MyTable") } @Test def testSingleDistinctAggAndOneOrMultiNonDistinctAgg2(): Unit = { // case 0x01: Non-DISTINCT on COUNT and DISTINCT on others // when field `a` is non-nullable, count(a) = count(*) - util.verifyPlan("SELECT COUNT(a), SUM(DISTINCT b) FROM MyTable") + util.verifyPlan("SELECT COUNT(a) filter (WHERE a > 0), SUM(DISTINCT b) FROM MyTable") } @Test def testMultiDistinctAggOnDifferentColumn(): Unit = { - util.verifyPlan("SELECT COUNT(DISTINCT a), SUM(DISTINCT b) FROM MyTable") + util.verifyPlan("SELECT COUNT(DISTINCT a), SUM(DISTINCT b),\n" + + "COUNT(DISTINCT c) FILTER (WHERE a > 5) FROM MyTable") } @Test def testMultiDistinctAndNonDistinctAggOnDifferentColumn(): Unit = { - util.verifyPlan("SELECT COUNT(DISTINCT a), SUM(DISTINCT b), COUNT(c) FROM MyTable") + util.verifyPlan("SELECT COUNT(DISTINCT a) FILTER (WHERE c > 0),\n" + + "SUM(DISTINCT b), COUNT(c) FROM MyTable") } @Test @@ -121,11 +116,6 @@ class FlinkAggregateExpandDistinctAggregatesRuleTest extends TableTestBase { } @Test - def testSingleDistinctAggOnMultiColumnsWithGroupingSets(): Unit = { - util.verifyPlan("SELECT COUNT(DISTINCT a, b) FROM MyTable2 GROUP BY GROUPING SETS (c, d)") - } - - @Test def testMultiDistinctAggOnSameColumnWithGroupingSets(): Unit = { val sqlQuery = "SELECT COUNT(DISTINCT a), SUM(DISTINCT a), MAX(DISTINCT a) " + "FROM MyTable2 GROUP BY GROUPING SETS (b, c)" @@ -158,6 +148,41 @@ class FlinkAggregateExpandDistinctAggregatesRuleTest extends TableTestBase { util.verifyPlan(sqlQuery) } + @Test + def testSingleDistinctWithFilter(): Unit = { + val sqlQuery = "SELECT d, COUNT(DISTINCT c) FILTER (WHERE a > 0) FROM MyTable2 GROUP BY d" + util.verifyPlan(sqlQuery) + } + + @Test + def testMultiDistinctOnSameColumnWithFilter(): Unit = { + val sqlQuery = "SELECT d, COUNT(DISTINCT c), COUNT(DISTINCT c) FILTER (WHERE a > 10),\n" + + "COUNT(DISTINCT c) FILTER (WHERE a < 10) FROM MyTable2 GROUP BY d" + util.verifyPlan(sqlQuery) + } + + @Test + def TestMultiDistinctOnDifferentColumnWithFilter(): Unit = { + val sqlQuery = "SELECT d, COUNT(DISTINCT c), COUNT(DISTINCT c) FILTER (WHERE a > 0),\n" + + "COUNT(DISTINCT b) FILTER (WHERE b > 1) FROM MyTable2 GROUP BY d" + util.verifyPlan(sqlQuery) + } + + @Test + def TestMultiDistinctWithFilterAndNonDistinctAgg(): Unit = { + val sqlQuery = "SELECT d, COUNT(DISTINCT c), COUNT(DISTINCT c) FILTER (WHERE a > 0),\n" + + "MAX(e), MIN(e) FROM MyTable2 GROUP BY d" + util.verifyPlan(sqlQuery) + } + + @Test + def testMultiDistinctAndNonDistinctAggWithFilter(): Unit = { + val sqlQuery = "SELECT d, MAX(e), MAX(e) FILTER (WHERE a < 10), COUNT(DISTINCT c),\n" + + "COUNT(DISTINCT c) FILTER (WHERE a > 5), COUNT(DISTINCT b) FILTER (WHERE b > 3)\n" + + "FROM MyTable2 GROUP BY d" + util.verifyPlan(sqlQuery) + } + @Test(expected = classOf[RuntimeException]) def testTooManyDistinctAggOnDifferentColumn(): Unit = { // max group count must be less than 64 diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/FlinkAggregateExpandDistinctAggregatesRuleTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/FlinkAggregateExpandDistinctAggregatesRuleTest.scala index 9552e81..d80d641 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/FlinkAggregateExpandDistinctAggregatesRuleTest.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/FlinkAggregateExpandDistinctAggregatesRuleTest.scala @@ -18,27 +18,19 @@ package org.apache.flink.table.planner.plan.rules.logical -import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.scala._ -import org.apache.flink.table.api.Types -import org.apache.flink.table.api.scala._ +import org.apache.flink.table.planner.plan.common.DistinctAggregateTestBase import org.apache.flink.table.planner.plan.optimize.program.FlinkBatchProgram -import org.apache.flink.table.planner.utils.TableTestBase - -import org.junit.Test +import org.junit.{Before, Test} /** * Test for [[FlinkAggregateExpandDistinctAggregatesRule]]. */ -class FlinkAggregateExpandDistinctAggregatesRuleTest extends TableTestBase { - private val util = batchTestUtil() - util.addTableSource[(Int, Long, Int)]("MyTable", 'a, 'b, 'c) - util.addTableSource[(Int, Long, String, String, String)]("MyTable2", 'a, 'b, 'c, 'd, 'e) - util.buildBatchProgram(FlinkBatchProgram.PHYSICAL) +class FlinkAggregateExpandDistinctAggregatesRuleTest extends DistinctAggregateTestBase { - @Test - def testSingleDistinctAgg(): Unit = { - util.verifyPlan("SELECT COUNT(DISTINCT a) FROM MyTable") + @Before + override def setup(): Unit = { + util.buildBatchProgram(FlinkBatchProgram.PHYSICAL) + super.setup() } @Test @@ -47,128 +39,7 @@ class FlinkAggregateExpandDistinctAggregatesRuleTest extends TableTestBase { } @Test - def testMultiDistinctAggOnSameColumn(): Unit = { - util.verifyPlan("SELECT COUNT(DISTINCT a), SUM(DISTINCT a), MAX(DISTINCT a) FROM MyTable") - } - - @Test - def testSingleDistinctAggAndOneOrMultiNonDistinctAgg1(): Unit = { - // case 0x00: DISTINCT on COUNT and Non-DISTINCT on others - util.verifyPlan("SELECT COUNT(DISTINCT a), SUM(b) FROM MyTable") - } - - @Test - def testSingleDistinctAggAndOneOrMultiNonDistinctAgg2(): Unit = { - // case 0x01: Non-DISTINCT on COUNT and DISTINCT on others - // when field `a` is non-nullable, count(a) = count(*) - util.verifyPlan("SELECT COUNT(a), SUM(DISTINCT b) FROM MyTable") - } - - @Test - def testMultiDistinctAggOnDifferentColumn(): Unit = { - util.verifyPlan("SELECT COUNT(DISTINCT a), SUM(DISTINCT b) FROM MyTable") - } - - @Test - def testMultiDistinctAndNonDistinctAggOnDifferentColumn(): Unit = { - util.verifyPlan("SELECT COUNT(DISTINCT a), SUM(DISTINCT b), COUNT(c) FROM MyTable") - } - - @Test - def testSingleDistinctAggWithGroupBy(): Unit = { - // when field `a` is non-nullable, count(a) = count(*) - util.verifyPlan("SELECT a, COUNT(a), SUM(DISTINCT b) FROM MyTable GROUP BY a") - } - - @Test - def testSingleDistinctAggWithGroupByAndCountStar(): Unit = { - util.verifyPlan("SELECT a, COUNT(*), SUM(DISTINCT b) FROM MyTable GROUP BY a") - } - - @Test - def testTwoDistinctAggWithGroupByAndCountStar(): Unit = { - val sqlQuery = "SELECT a, COUNT(*), SUM(DISTINCT b), COUNT(DISTINCT b) FROM MyTable GROUP BY a" - util.verifyPlan(sqlQuery) - } - - @Test - def testTwoDifferentDistinctAggWithGroupByAndCountStar(): Unit = { - val sqlQuery = "SELECT a, COUNT(*), SUM(DISTINCT b), COUNT(DISTINCT c) FROM MyTable GROUP BY a" - util.verifyPlan(sqlQuery) - } - - @Test - def testMultiDifferentDistinctAggWithNonDistinctAggOnSameColumn(): Unit = { - util.verifyPlan("SELECT COUNT(DISTINCT a), SUM(DISTINCT b), MAX(a), MIN(a) FROM MyTable") - } - - @Test - def testMultiDifferentDistinctAggWithNonDistinctAggOnSameColumnAndGroupBy(): Unit = { - val sqlQuery = - "SELECT COUNT(DISTINCT a), SUM(DISTINCT b), MAX(a), MIN(a) FROM MyTable GROUP BY c" - util.verifyPlan(sqlQuery) - } - - @Test - def testMultiDifferentDistinctAggWithNonDistinctAggOnDifferentColumnAndGroupBy(): Unit = { - util.verifyPlan("SELECT SUM(DISTINCT a), COUNT(DISTINCT c) FROM MyTable GROUP BY b") - } - - @Test - def testDistinctAggWithDuplicateField(): Unit = { - // when field `a` is non-nullable, count(a) = count(*) - util.verifyPlan("SELECT a, COUNT(a), SUM(b), SUM(DISTINCT b) FROM MyTable GROUP BY a") - } - - @Test def testSingleDistinctAggOnMultiColumnsWithGroupingSets(): Unit = { util.verifyPlan("SELECT COUNT(DISTINCT a, b) FROM MyTable2 GROUP BY GROUPING SETS (c, d)") } - - @Test - def testMultiDistinctAggOnSameColumnWithGroupingSets(): Unit = { - val sqlQuery = "SELECT COUNT(DISTINCT a), SUM(DISTINCT a), MAX(DISTINCT a) " + - "FROM MyTable2 GROUP BY GROUPING SETS (b, c)" - util.verifyPlan(sqlQuery) - } - - @Test - def testSingleDistinctAggAndOneOrMultiNonDistinctAggWithGroupingSets1(): Unit = { - // case 0x00: DISTINCT on COUNT and Non-DISTINCT on others - util.verifyPlan("SELECT COUNT(DISTINCT a), SUM(b) FROM MyTable2 GROUP BY GROUPING SETS (b, c)") - } - - @Test - def testSingleDistinctAggAndOneOrMultiNonDistinctAggWithGroupingSets2(): Unit = { - // case 0x01: Non-DISTINCT on COUNT and DISTINCT on others - util.verifyPlan("SELECT COUNT(a), SUM(DISTINCT b) FROM MyTable2 GROUP BY GROUPING SETS (c, d)") - } - - @Test - def testMultiDistinctAggOnDifferentColumnWithGroupingSets(): Unit = { - val sqlQuery = "SELECT COUNT(DISTINCT a), SUM(DISTINCT b) FROM MyTable2 " + - "GROUP BY GROUPING SETS (c, d)" - util.verifyPlan(sqlQuery) - } - - @Test - def testMultiDistinctAndNonDistinctAggOnDifferentColumnWithGroupingSets(): Unit = { - val sqlQuery = "SELECT COUNT(DISTINCT a), SUM(DISTINCT b), COUNT(c) FROM MyTable2 " + - "GROUP BY GROUPING SETS (d, e)" - util.verifyPlan(sqlQuery) - } - - @Test(expected = classOf[RuntimeException]) - def testTooManyDistinctAggOnDifferentColumn(): Unit = { - // max group count must be less than 64 - val fieldNames = (0 until 64).map(i => s"f$i").toArray - val fieldTypes: Array[TypeInformation[_]] = Array.fill(fieldNames.length)(Types.INT) - util.addTableSource("MyTable64", fieldTypes, fieldNames) - - val distinctList = fieldNames.map(f => s"COUNT(DISTINCT $f)").mkString(", ") - val maxList = fieldNames.map(f => s"MAX($f)").mkString(", ") - val sqlQuery = s"SELECT $distinctList, $maxList FROM MyTable64" - - util.verifyPlan(sqlQuery) - } } diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/DistinctAggregateITCaseBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/DistinctAggregateITCaseBase.scala index 07016a4..fd0a126 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/DistinctAggregateITCaseBase.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/DistinctAggregateITCaseBase.scala @@ -24,7 +24,6 @@ import org.apache.flink.table.planner.runtime.utils.BatchTestBase import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row import org.apache.flink.table.planner.runtime.utils.TestData._ import org.apache.flink.types.Row - import org.junit.{Before, Ignore, Test} import scala.collection.Seq @@ -282,6 +281,64 @@ abstract class DistinctAggregateITCaseBase extends BatchTestBase { ) } + @Test + def testSingleDistinctWithFilter(): Unit = { + checkResult("SELECT e, COUNT(DISTINCT a) FILTER (WHERE c > 0) FROM Table5 GROUP BY e", + Seq(row(1, 3), row(2, 4), row(3, 2)) + ) + } + + @Test + def testMultiDistinctOnSameColumnWithFilter(): Unit = { + checkResult("SELECT e, COUNT(DISTINCT a), COUNT(DISTINCT a) FILTER (WHERE c > 0), " + + "COUNT(DISTINCT a) FILTER (WHERE c < 10) FROM Table5 GROUP BY e", + Seq(row(1, 4, 3, 3), row(2, 4, 4, 3), row(3, 2, 2, 1))) + } + + @Test + def TestMultiDistinctOnDifferentColumnWithFilter(): Unit = { + checkResult("SELECT e, COUNT(DISTINCT a), COUNT(DISTINCT a) FILTER (WHERE c > 0), " + + "COUNT(DISTINCT b) FILTER (WHERE b > 1) FROM Table5 GROUP BY e", + Seq(row(1, 4, 3, 4), row(2, 4, 4, 7), row(3, 2, 2, 3))) + } + + @Test + def TestMultiDistinctWithFilterAndNonDistinctAgg(): Unit = { + checkResult("SELECT e, COUNT(DISTINCT a), COUNT(DISTINCT a) FILTER (WHERE c > 0), " + + "MAX(c), MIN(c) FROM Table5 GROUP BY e", + Seq(row(1, 4, 3, 10, 0), row(2, 4, 4, 14, 1), row (3, 2, 2, 12, 5))) + } + + @Test + def testMultiDistinctAndNonDistinctAggWithFilter(): Unit = { + checkResult("SELECT e, MAX(c), MAX(c) FILTER (WHERE b < 10), COUNT(DISTINCT a), " + + "COUNT(DISTINCT a) FILTER (WHERE c > 5), COUNT(DISTINCT b) FILTER (WHERE b > 3)\n" + + "FROM Table5 GROUP BY e", + Seq(row(1, 10, 8, 4, 2, 3), row(2, 14, 6, 4, 2, 6), row (3, 12, 5, 2, 1, 3))) + } + + @Test + def TestDistinctWithFilterWithoutGroupBy(): Unit = { + // single distinct agg with filter. + checkResult("SELECT COUNT(DISTINCT a) FILTER (WHERE c > 0) FROM Table5", + Seq(row(4))) + + // multi distinct aggs on same column with filter. + checkResult("SELECT COUNT(DISTINCT a), COUNT(DISTINCT a) FILTER (WHERE c > 10),\n" + + "COUNT(DISTINCT a) FILTER (WHERE c < 10) FROM Table5", + Seq(row(5, 1, 4))) + + // multi distinct aggs on different columns with filter. + checkResult("SELECT COUNT(DISTINCT a), COUNT(DISTINCT a) FILTER (WHERE c > 0),\n" + + "COUNT(DISTINCT b) FILTER (WHERE b > 1) FROM Table5", + Seq(row(5, 4, 14))) + + // multi distinct aggs with non-distinct agg with filter. + checkResult("SELECT MAX(e), MAX(e) FILTER (WHERE c < 10), COUNT(DISTINCT a),\n" + + "COUNT(DISTINCT a) FILTER (WHERE c > 5), COUNT(DISTINCT b) FILTER (WHERE b > 3) FROM Table5", + Seq(row(3, 3, 5, 2, 12))) + } + // TODO remove Ignore after supporting generated code cloud be splitted into // small classes or methods due to code is too large @Ignore