This is an automated email from the ASF dual-hosted git repository.

kurt pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.10 by this push:
     new ce7968e  [FLINK-15466][table-planner-blink] Fix the wrong plan when using distinct aggregations with filter
ce7968e is described below

commit ce7968ecafcde69596897a2243235161cd347b13
Author: Shuo Cheng <mf1533...@smail.nju.edu.cn>
AuthorDate: Thu Jan 9 17:04:24 2020 +0800

    [FLINK-15466][table-planner-blink] Fix the wrong plan when using distinct aggregations with filter
    
    This closes #10760
    
    (cherry picked from commit 6f2e9abffb0b1ef68e4f2cf058a24524b61e88a1)
---
 ...FlinkAggregateExpandDistinctAggregatesRule.java |  26 +-
 .../plan/batch/sql/agg/DistinctAggregateTest.xml   | 561 ++++++++++++++++++---
 ...nkAggregateExpandDistinctAggregatesRuleTest.xml | 195 +++++--
 .../plan/batch/sql/agg/DistinctAggregateTest.scala |  64 +--
 .../DistinctAggregateTestBase.scala}               |  81 ++-
 ...AggregateExpandDistinctAggregatesRuleTest.scala | 143 +-----
 .../sql/agg/DistinctAggregateITCaseBase.scala      |  59 ++-
 7 files changed, 775 insertions(+), 354 deletions(-)

diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkAggregateExpandDistinctAggregatesRule.java
 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkAggregateExpandDistinctAggregatesRule.java
index 8023100..4b44eec 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkAggregateExpandDistinctAggregatesRule.java
+++ 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkAggregateExpandDistinctAggregatesRule.java
@@ -417,14 +417,17 @@ public final class 
FlinkAggregateExpandDistinctAggregatesRule extends RelOptRule
                        Aggregate aggregate) {
                final Set<ImmutableBitSet> groupSetTreeSet =
                                new TreeSet<>(ImmutableBitSet.ORDERING);
+               final Map<ImmutableBitSet, Integer> 
groupSetToDistinctAggCallFilterArg = new HashMap<>();
                for (AggregateCall aggCall : aggregate.getAggCallList()) {
                        if (!aggCall.isDistinct()) {
                                groupSetTreeSet.add(aggregate.getGroupSet());
                        } else {
-                               groupSetTreeSet.add(
+                               ImmutableBitSet groupSet =
                                                
ImmutableBitSet.of(aggCall.getArgList())
                                                                
.setIf(aggCall.filterArg, aggCall.filterArg >= 0)
-                                                               
.union(aggregate.getGroupSet()));
+                                                               
.union(aggregate.getGroupSet());
+                               
groupSetToDistinctAggCallFilterArg.put(groupSet, aggCall.filterArg);
+                               groupSetTreeSet.add(groupSet);
                        }
                }
 
@@ -471,10 +474,21 @@ public final class 
FlinkAggregateExpandDistinctAggregatesRule extends RelOptRule
                        final RexNode nodeZ = nodes.remove(nodes.size() - 1);
                        for (Map.Entry<ImmutableBitSet, Integer> entry : 
filters.entrySet()) {
                                final long v = groupValue(fullGroupSet, 
entry.getKey());
-                               nodes.add(
-                                               relBuilder.alias(
-                                                               
relBuilder.equals(nodeZ, relBuilder.literal(v)),
-                                                               "$g_" + v));
+                               // Get and remap the filterArg of the distinct 
aggregate call.
+                               int distinctAggCallFilterArg = 
remap(fullGroupSet,
+                                       
groupSetToDistinctAggCallFilterArg.getOrDefault(entry.getKey(), -1));
+                               RexNode expr;
+                               if (distinctAggCallFilterArg < 0) {
+                                       expr = relBuilder.equals(nodeZ, 
relBuilder.literal(v));
+                               } else {
+                                       RexBuilder rexBuilder = 
aggregate.getCluster().getRexBuilder();
+                                       // merge the filter of the distinct 
aggregate call itself.
+                                       expr = relBuilder.and(
+                                               relBuilder.equals(nodeZ, 
relBuilder.literal(v)),
+                                               
rexBuilder.makeCall(SqlStdOperatorTable.IS_TRUE,
+                                                       
relBuilder.field(distinctAggCallFilterArg)));
+                               }
+                               nodes.add(relBuilder.alias(expr, "$g_" + v));
                        }
                        relBuilder.project(nodes);
                }
diff --git 
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/DistinctAggregateTest.xml
 
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/DistinctAggregateTest.xml
index e93942a..577f89a 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/DistinctAggregateTest.xml
+++ 
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/DistinctAggregateTest.xml
@@ -16,64 +16,233 @@ See the License for the specific language governing 
permissions and
 limitations under the License.
 -->
 <Root>
-  <TestCase name="testMultiDistinctAggregateOnDifferentColumn">
+  <TestCase name="testDistinctAggWithDuplicateField">
     <Resource name="sql">
-      <![CDATA[SELECT COUNT(DISTINCT a), SUM(DISTINCT b) FROM MyTable]]>
+      <![CDATA[SELECT a, COUNT(a), SUM(b), SUM(DISTINCT b) FROM MyTable GROUP 
BY a]]>
     </Resource>
     <Resource name="planBefore">
       <![CDATA[
-LogicalAggregate(group=[{}], EXPR$0=[COUNT(DISTINCT $0)], EXPR$1=[SUM(DISTINCT 
$1)])
+LogicalAggregate(group=[{0}], EXPR$1=[COUNT($0)], EXPR$2=[SUM($1)], 
EXPR$3=[SUM(DISTINCT $1)])
 +- LogicalProject(a=[$0], b=[$1])
    +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, 
source: [TestTableSource(a, b, c)]]])
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-SortAggregate(isMerge=[true], select=[Final_COUNT(count$0) AS EXPR$0, 
Final_SUM(sum$1) AS EXPR$1])
+Calc(select=[a, CAST(EXPR$1) AS EXPR$1, EXPR$2, EXPR$3])
++- HashAggregate(isMerge=[true], groupBy=[a], select=[a, Final_MIN(min$0) AS 
EXPR$1, Final_MIN(min$1) AS EXPR$2, Final_SUM(sum$2) AS EXPR$3])
+   +- Exchange(distribution=[hash[a]])
+      +- LocalHashAggregate(groupBy=[a], select=[a, Partial_MIN(EXPR$1) FILTER 
$g_1 AS min$0, Partial_MIN(EXPR$2) FILTER $g_1 AS min$1, Partial_SUM(b) FILTER 
$g_0 AS sum$2])
+         +- Calc(select=[a, b, EXPR$1, EXPR$2, =(CASE(=($e, 0:BIGINT), 
0:BIGINT, 1:BIGINT), 0) AS $g_0, =(CASE(=($e, 0:BIGINT), 0:BIGINT, 1:BIGINT), 
1) AS $g_1])
+            +- HashAggregate(isMerge=[true], groupBy=[a, b, $e], select=[a, b, 
$e, Final_COUNT(count$0) AS EXPR$1, Final_SUM(sum$1) AS EXPR$2])
+               +- Exchange(distribution=[hash[a, b, $e]])
+                  +- LocalHashAggregate(groupBy=[a, b, $e], select=[a, b, $e, 
Partial_COUNT(a) AS count$0, Partial_SUM(b_0) AS sum$1])
+                     +- Expand(projects=[{a=[$0], b=[$1], $e=[0], b_0=[$1]}, 
{a=[$0], b=[null], $e=[1], b_0=[$1]}], projects=[{a, b, 0 AS $e, b AS b_0}, {a, 
null AS b, 1 AS $e, b AS b_0}])
+                        +- Calc(select=[a, b])
+                           +- TableSourceScan(table=[[default_catalog, 
default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, 
c])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase 
name="testMultiDifferentDistinctAggWithNonDistinctAggOnDifferentColumnAndGroupBy">
+    <Resource name="sql">
+      <![CDATA[SELECT SUM(DISTINCT a), COUNT(DISTINCT c) FROM MyTable GROUP BY 
b]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(EXPR$0=[$1], EXPR$1=[$2])
++- LogicalAggregate(group=[{0}], EXPR$0=[SUM(DISTINCT $1)], 
EXPR$1=[COUNT(DISTINCT $2)])
+   +- LogicalProject(b=[$1], a=[$0], c=[$2])
+      +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, 
source: [TestTableSource(a, b, c)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Calc(select=[EXPR$0, EXPR$1])
++- HashAggregate(isMerge=[true], groupBy=[b], select=[b, Final_SUM(sum$0) AS 
EXPR$0, Final_COUNT(count$1) AS EXPR$1])
+   +- Exchange(distribution=[hash[b]])
+      +- LocalHashAggregate(groupBy=[b], select=[b, Partial_SUM(a) FILTER $g_1 
AS sum$0, Partial_COUNT(c) FILTER $g_2 AS count$1])
+         +- Calc(select=[b, a, c, =(CASE(=($e, 1:BIGINT), 1:BIGINT, 2:BIGINT), 
1) AS $g_1, =(CASE(=($e, 1:BIGINT), 1:BIGINT, 2:BIGINT), 2) AS $g_2])
+            +- HashAggregate(isMerge=[true], groupBy=[a, b, c, $e], select=[a, 
b, c, $e])
+               +- Exchange(distribution=[hash[a, b, c, $e]])
+                  +- LocalHashAggregate(groupBy=[a, b, c, $e], select=[a, b, 
c, $e])
+                     +- Expand(projects=[{a=[$0], b=[$1], c=[null], $e=[1]}, 
{a=[null], b=[$1], c=[$2], $e=[4]}], projects=[{a, b, null AS c, 1 AS $e}, 
{null AS a, b, c, 4 AS $e}])
+                        +- TableSourceScan(table=[[default_catalog, 
default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, 
c])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testMultiDifferentDistinctAggWithNonDistinctAggOnSameColumn">
+    <Resource name="sql">
+      <![CDATA[SELECT COUNT(DISTINCT a), SUM(DISTINCT b), MAX(a), MIN(a) FROM 
MyTable]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalAggregate(group=[{}], EXPR$0=[COUNT(DISTINCT $0)], EXPR$1=[SUM(DISTINCT 
$1)], EXPR$2=[MAX($0)], EXPR$3=[MIN($0)])
++- LogicalProject(a=[$0], b=[$1])
+   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, 
source: [TestTableSource(a, b, c)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+SortAggregate(isMerge=[true], select=[Final_COUNT(count$0) AS EXPR$0, 
Final_SUM(sum$1) AS EXPR$1, Final_MIN(min$2) AS EXPR$2, Final_MIN(min$3) AS 
EXPR$3])
 +- Exchange(distribution=[single])
-   +- LocalSortAggregate(select=[Partial_COUNT(a) FILTER $g_1 AS count$0, 
Partial_SUM(b) FILTER $g_2 AS sum$1])
-      +- Calc(select=[a, b, =(CASE(=($e, 1:BIGINT), 1:BIGINT, 2:BIGINT), 1) AS 
$g_1, =(CASE(=($e, 1:BIGINT), 1:BIGINT, 2:BIGINT), 2) AS $g_2])
-         +- HashAggregate(isMerge=[true], groupBy=[a, b, $e], select=[a, b, 
$e])
+   +- LocalSortAggregate(select=[Partial_COUNT(a) FILTER $g_1 AS count$0, 
Partial_SUM(b) FILTER $g_2 AS sum$1, Partial_MIN(EXPR$2) FILTER $g_3 AS min$2, 
Partial_MIN(EXPR$3) FILTER $g_3 AS min$3])
+      +- Calc(select=[a, b, EXPR$2, EXPR$3, =(CASE(=($e, 1:BIGINT), 1:BIGINT, 
=($e, 2:BIGINT), 2:BIGINT, 3:BIGINT), 1) AS $g_1, =(CASE(=($e, 1:BIGINT), 
1:BIGINT, =($e, 2:BIGINT), 2:BIGINT, 3:BIGINT), 2) AS $g_2, =(CASE(=($e, 
1:BIGINT), 1:BIGINT, =($e, 2:BIGINT), 2:BIGINT, 3:BIGINT), 3) AS $g_3])
+         +- HashAggregate(isMerge=[true], groupBy=[a, b, $e], select=[a, b, 
$e, Final_MAX(max$0) AS EXPR$2, Final_MIN(min$1) AS EXPR$3])
             +- Exchange(distribution=[hash[a, b, $e]])
-               +- LocalHashAggregate(groupBy=[a, b, $e], select=[a, b, $e])
-                  +- Expand(projects=[{a=[$0], b=[null], $e=[1]}, {a=[null], 
b=[$1], $e=[2]}], projects=[{a, null AS b, 1 AS $e}, {null AS a, b, 2 AS $e}])
+               +- LocalHashAggregate(groupBy=[a, b, $e], select=[a, b, $e, 
Partial_MAX(a_0) AS max$0, Partial_MIN(a_0) AS min$1])
+                  +- Expand(projects=[{a=[$0], b=[null], $e=[1], a_0=[$0]}, 
{a=[null], b=[$1], $e=[2], a_0=[$0]}, {a=[null], b=[null], $e=[3], a_0=[$0]}], 
projects=[{a, null AS b, 1 AS $e, a AS a_0}, {null AS a, b, 2 AS $e, a AS a_0}, 
{null AS a, null AS b, 3 AS $e, a AS a_0}])
                      +- Calc(select=[a, b])
                         +- TableSourceScan(table=[[default_catalog, 
default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, 
c])
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testMultiDistinctAggregateOnSameColumn">
+  <TestCase 
name="testMultiDifferentDistinctAggWithNonDistinctAggOnSameColumnAndGroupBy">
     <Resource name="sql">
-      <![CDATA[SELECT COUNT(DISTINCT a), SUM(DISTINCT a), MAX(DISTINCT a) FROM 
MyTable]]>
+      <![CDATA[SELECT COUNT(DISTINCT a), SUM(DISTINCT b), MAX(a), MIN(a) FROM 
MyTable GROUP BY c]]>
     </Resource>
     <Resource name="planBefore">
       <![CDATA[
-LogicalAggregate(group=[{}], EXPR$0=[COUNT(DISTINCT $0)], EXPR$1=[SUM(DISTINCT 
$0)], EXPR$2=[MAX($0)])
-+- LogicalProject(a=[$0])
+LogicalProject(EXPR$0=[$1], EXPR$1=[$2], EXPR$2=[$3], EXPR$3=[$4])
++- LogicalAggregate(group=[{0}], EXPR$0=[COUNT(DISTINCT $1)], 
EXPR$1=[SUM(DISTINCT $2)], EXPR$2=[MAX($1)], EXPR$3=[MIN($1)])
+   +- LogicalProject(c=[$2], a=[$0], b=[$1])
+      +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, 
source: [TestTableSource(a, b, c)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Calc(select=[EXPR$0, EXPR$1, EXPR$2, EXPR$3])
++- HashAggregate(isMerge=[true], groupBy=[c], select=[c, Final_COUNT(count$0) 
AS EXPR$0, Final_SUM(sum$1) AS EXPR$1, Final_MIN(min$2) AS EXPR$2, 
Final_MIN(min$3) AS EXPR$3])
+   +- Exchange(distribution=[hash[c]])
+      +- LocalHashAggregate(groupBy=[c], select=[c, Partial_COUNT(a) FILTER 
$g_1 AS count$0, Partial_SUM(b) FILTER $g_2 AS sum$1, Partial_MIN(EXPR$2) 
FILTER $g_3 AS min$2, Partial_MIN(EXPR$3) FILTER $g_3 AS min$3])
+         +- Calc(select=[c, a, b, EXPR$2, EXPR$3, =(CASE(=($e, 2:BIGINT), 
1:BIGINT, =($e, 4:BIGINT), 2:BIGINT, 3:BIGINT), 1) AS $g_1, =(CASE(=($e, 
2:BIGINT), 1:BIGINT, =($e, 4:BIGINT), 2:BIGINT, 3:BIGINT), 2) AS $g_2, 
=(CASE(=($e, 2:BIGINT), 1:BIGINT, =($e, 4:BIGINT), 2:BIGINT, 3:BIGINT), 3) AS 
$g_3])
+            +- HashAggregate(isMerge=[true], groupBy=[a, b, c, $e], select=[a, 
b, c, $e, Final_MAX(max$0) AS EXPR$2, Final_MIN(min$1) AS EXPR$3])
+               +- Exchange(distribution=[hash[a, b, c, $e]])
+                  +- LocalHashAggregate(groupBy=[a, b, c, $e], select=[a, b, 
c, $e, Partial_MAX(a_0) AS max$0, Partial_MIN(a_0) AS min$1])
+                     +- Expand(projects=[{a=[$0], b=[null], c=[$2], $e=[2], 
a_0=[$0]}, {a=[null], b=[$1], c=[$2], $e=[4], a_0=[$0]}, {a=[null], b=[null], 
c=[$2], $e=[6], a_0=[$0]}], projects=[{a, null AS b, c, 2 AS $e, a AS a_0}, 
{null AS a, b, c, 4 AS $e, a AS a_0}, {null AS a, null AS b, c, 6 AS $e, a AS 
a_0}])
+                        +- TableSourceScan(table=[[default_catalog, 
default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, 
c])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testMultiDistinctAggOnDifferentColumn">
+    <Resource name="sql">
+      <![CDATA[SELECT COUNT(DISTINCT a), SUM(DISTINCT b),
+COUNT(DISTINCT c) FILTER (WHERE a > 5) FROM MyTable]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalAggregate(group=[{}], EXPR$0=[COUNT(DISTINCT $0)], EXPR$1=[SUM(DISTINCT 
$1)], EXPR$2=[COUNT(DISTINCT $2) FILTER $3])
++- LogicalProject(a=[$0], b=[$1], c=[$2], $f3=[IS TRUE(>($0, 5))])
    +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, 
source: [TestTableSource(a, b, c)]]])
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-SortAggregate(isMerge=[true], select=[Final_COUNT(count$0) AS EXPR$0, 
Final_SUM(sum$1) AS EXPR$1, Final_MAX(max$2) AS EXPR$2])
+SortAggregate(isMerge=[true], select=[Final_COUNT(count$0) AS EXPR$0, 
Final_SUM(sum$1) AS EXPR$1, Final_COUNT(count$2) AS EXPR$2])
 +- Exchange(distribution=[single])
-   +- LocalSortAggregate(select=[Partial_COUNT(a) AS count$0, Partial_SUM(a) 
AS sum$1, Partial_MAX(a) AS max$2])
-      +- HashAggregate(isMerge=[true], groupBy=[a], select=[a])
-         +- Exchange(distribution=[hash[a]])
-            +- LocalHashAggregate(groupBy=[a], select=[a])
-               +- Calc(select=[a])
-                  +- TableSourceScan(table=[[default_catalog, 
default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, 
c])
+   +- LocalSortAggregate(select=[Partial_COUNT(a) FILTER $g_7 AS count$0, 
Partial_SUM(b) FILTER $g_11 AS sum$1, Partial_COUNT(c) FILTER $g_12 AS count$2])
+      +- Calc(select=[a, b, c, CAST($f3) AS $f3, =(CASE(=($e, 7:BIGINT), 
7:BIGINT, =($e, 11:BIGINT), 11:BIGINT, 12:BIGINT), 7) AS $g_7, =(CASE(=($e, 
7:BIGINT), 7:BIGINT, =($e, 11:BIGINT), 11:BIGINT, 12:BIGINT), 11) AS $g_11, 
AND(=(CASE(=($e, 7:BIGINT), 7:BIGINT, =($e, 11:BIGINT), 11:BIGINT, 12:BIGINT), 
12), IS TRUE(CAST($f3))) AS $g_12])
+         +- HashAggregate(isMerge=[true], groupBy=[a, b, c, $f3, $e], 
select=[a, b, c, $f3, $e])
+            +- Exchange(distribution=[hash[a, b, c, $f3, $e]])
+               +- LocalHashAggregate(groupBy=[a, b, c, $f3, $e], select=[a, b, 
c, $f3, $e])
+                  +- Expand(projects=[{a=[$0], b=[null], c=[null], $f3=[null], 
$e=[7]}, {a=[null], b=[$1], c=[null], $f3=[null], $e=[11]}, {a=[null], 
b=[null], c=[$2], $f3=[$3], $e=[12]}], projects=[{a, null AS b, null AS c, null 
AS $f3, 7 AS $e}, {null AS a, b, null AS c, null AS $f3, 11 AS $e}, {null AS a, 
null AS b, c, $f3, 12 AS $e}])
+                     +- Calc(select=[a, b, c, IS TRUE(>(a, 5)) AS $f3])
+                        +- TableSourceScan(table=[[default_catalog, 
default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, 
c])
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testMultiDistinctAndNonDistinctAggregateOnDifferentColumn">
+  <TestCase name="testMultiDistinctAggOnDifferentColumnWithGroupingSets">
     <Resource name="sql">
-      <![CDATA[SELECT COUNT(DISTINCT a), SUM(DISTINCT b), COUNT(c) FROM 
MyTable]]>
+      <![CDATA[SELECT COUNT(DISTINCT a), SUM(DISTINCT b) FROM MyTable2 GROUP 
BY GROUPING SETS (c, d)]]>
     </Resource>
     <Resource name="planBefore">
       <![CDATA[
-LogicalAggregate(group=[{}], EXPR$0=[COUNT(DISTINCT $0)], EXPR$1=[SUM(DISTINCT 
$1)], EXPR$2=[COUNT($2)])
-+- LogicalTableScan(table=[[default_catalog, default_database, MyTable, 
source: [TestTableSource(a, b, c)]]])
+LogicalProject(EXPR$0=[$2], EXPR$1=[$3])
++- LogicalAggregate(group=[{0, 1}], groups=[[{0}, {1}]], 
EXPR$0=[COUNT(DISTINCT $2)], EXPR$1=[SUM(DISTINCT $3)])
+   +- LogicalProject(c=[$2], d=[$3], a=[$0], b=[$1])
+      +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2, 
source: [TestTableSource(a, b, c, d, e)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Calc(select=[EXPR$0, EXPR$1])
++- HashAggregate(isMerge=[true], groupBy=[c, d, $e], select=[c, d, $e, 
Final_COUNT(count$0) AS EXPR$0, Final_SUM(sum$1) AS EXPR$1])
+   +- Exchange(distribution=[hash[c, d, $e]])
+      +- LocalHashAggregate(groupBy=[c, d, $e], select=[c, d, $e, 
Partial_COUNT(a) FILTER $g_2 AS count$0, Partial_SUM(b) FILTER $g_4 AS sum$1])
+         +- Calc(select=[c, d, a, b, $e, =(CASE(=($e_0, 2:BIGINT), 2:BIGINT, 
4:BIGINT), 2) AS $g_2, =(CASE(=($e_0, 2:BIGINT), 2:BIGINT, 4:BIGINT), 4) AS 
$g_4])
+            +- HashAggregate(isMerge=[true], groupBy=[c, d, a, b, $e, $e_0], 
select=[c, d, a, b, $e, $e_0])
+               +- Exchange(distribution=[hash[c, d, a, b, $e, $e_0]])
+                  +- LocalHashAggregate(groupBy=[c, d, a, b, $e, $e_0], 
select=[c, d, a, b, $e, $e_0])
+                     +- Expand(projects=[{c=[$0], d=[$1], a=[$2], b=[null], 
$e=[$4], $e_0=[2]}, {c=[$0], d=[$1], a=[null], b=[$3], $e=[$4], $e_0=[4]}], 
projects=[{c, d, a, null AS b, $e, 2 AS $e_0}, {c, d, null AS a, b, $e, 4 AS 
$e_0}])
+                        +- Expand(projects=[{c=[$0], d=[null], a=[$2], b=[$3], 
$e=[1]}, {c=[null], d=[$1], a=[$2], b=[$3], $e=[2]}], projects=[{c, null AS d, 
a, b, 1 AS $e}, {null AS c, d, a, b, 2 AS $e}])
+                           +- Calc(select=[c, d, a, b])
+                              +- TableSourceScan(table=[[default_catalog, 
default_database, MyTable2, source: [TestTableSource(a, b, c, d, e)]]], 
fields=[a, b, c, d, e])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testMultiDistinctAggOnSameColumn">
+    <Resource name="sql">
+      <![CDATA[SELECT COUNT(DISTINCT a), SUM(DISTINCT a) FILTER (WHERE b > 0),
+MAX(DISTINCT a) FROM MyTable]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalAggregate(group=[{}], EXPR$0=[COUNT(DISTINCT $0)], EXPR$1=[SUM(DISTINCT 
$0) FILTER $1], EXPR$2=[MAX($0)])
++- LogicalProject(a=[$0], $f1=[IS TRUE(>($1, 0))])
+   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, 
source: [TestTableSource(a, b, c)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+SortAggregate(isMerge=[true], select=[Final_COUNT(count$0) AS EXPR$0, 
Final_SUM(sum$1) AS EXPR$1, Final_MIN(min$2) AS EXPR$2])
++- Exchange(distribution=[single])
+   +- LocalSortAggregate(select=[Partial_COUNT(a) FILTER $g_1 AS count$0, 
Partial_SUM(a) FILTER $g_0 AS sum$1, Partial_MIN(EXPR$2) FILTER $g_3 AS min$2])
+      +- Calc(select=[a, CAST($f1) AS $f1, EXPR$2, AND(=(CASE(=($e, 0:BIGINT), 
0:BIGINT, =($e, 1:BIGINT), 1:BIGINT, 3:BIGINT), 0), IS TRUE(CAST($f1))) AS 
$g_0, =(CASE(=($e, 0:BIGINT), 0:BIGINT, =($e, 1:BIGINT), 1:BIGINT, 3:BIGINT), 
1) AS $g_1, =(CASE(=($e, 0:BIGINT), 0:BIGINT, =($e, 1:BIGINT), 1:BIGINT, 
3:BIGINT), 3) AS $g_3])
+         +- HashAggregate(isMerge=[true], groupBy=[a, $f1, $e], select=[a, 
$f1, $e, Final_MAX(max$0) AS EXPR$2])
+            +- Exchange(distribution=[hash[a, $f1, $e]])
+               +- LocalHashAggregate(groupBy=[a, $f1, $e], select=[a, $f1, $e, 
Partial_MAX(a_0) AS max$0])
+                  +- Expand(projects=[{a=[$0], $f1=[$1], $e=[0], a_0=[$0]}, 
{a=[$0], $f1=[null], $e=[1], a_0=[$0]}, {a=[null], $f1=[null], $e=[3], 
a_0=[$0]}], projects=[{a, $f1, 0 AS $e, a AS a_0}, {a, null AS $f1, 1 AS $e, a 
AS a_0}, {null AS a, null AS $f1, 3 AS $e, a AS a_0}])
+                     +- Calc(select=[a, IS TRUE(>(b, 0)) AS $f1])
+                        +- TableSourceScan(table=[[default_catalog, 
default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, 
c])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testMultiDistinctAggOnSameColumnWithGroupingSets">
+    <Resource name="sql">
+      <![CDATA[SELECT COUNT(DISTINCT a), SUM(DISTINCT a), MAX(DISTINCT a) FROM 
MyTable2 GROUP BY GROUPING SETS (b, c)]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(EXPR$0=[$2], EXPR$1=[$3], EXPR$2=[$4])
++- LogicalAggregate(group=[{0, 1}], groups=[[{0}, {1}]], 
EXPR$0=[COUNT(DISTINCT $2)], EXPR$1=[SUM(DISTINCT $2)], EXPR$2=[MAX($2)])
+   +- LogicalProject(b=[$1], c=[$2], a=[$0])
+      +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2, 
source: [TestTableSource(a, b, c, d, e)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Calc(select=[EXPR$0, EXPR$1, EXPR$2])
++- HashAggregate(isMerge=[true], groupBy=[b, c, $e], select=[b, c, $e, 
Final_COUNT(count$0) AS EXPR$0, Final_SUM(sum$1) AS EXPR$1, Final_MAX(max$2) AS 
EXPR$2])
+   +- Exchange(distribution=[hash[b, c, $e]])
+      +- LocalHashAggregate(groupBy=[b, c, $e], select=[b, c, $e, 
Partial_COUNT(a) AS count$0, Partial_SUM(a) AS sum$1, Partial_MAX(a) AS max$2])
+         +- HashAggregate(isMerge=[true], groupBy=[b, c, a, $e], select=[b, c, 
a, $e])
+            +- Exchange(distribution=[hash[b, c, a, $e]])
+               +- LocalHashAggregate(groupBy=[b, c, a, $e], select=[b, c, a, 
$e])
+                  +- Expand(projects=[{b=[$0], c=[null], a=[$2], $e=[1]}, 
{b=[null], c=[$1], a=[$2], $e=[2]}], projects=[{b, null AS c, a, 1 AS $e}, 
{null AS b, c, a, 2 AS $e}])
+                     +- Calc(select=[b, c, a])
+                        +- TableSourceScan(table=[[default_catalog, 
default_database, MyTable2, source: [TestTableSource(a, b, c, d, e)]]], 
fields=[a, b, c, d, e])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testMultiDistinctAndNonDistinctAggOnDifferentColumn">
+    <Resource name="sql">
+      <![CDATA[SELECT COUNT(DISTINCT a) FILTER (WHERE c > 0),
+SUM(DISTINCT b), COUNT(c) FROM MyTable]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalAggregate(group=[{}], EXPR$0=[COUNT(DISTINCT $0) FILTER $1], 
EXPR$1=[SUM(DISTINCT $2)], EXPR$2=[COUNT($3)])
++- LogicalProject(a=[$0], $f1=[IS TRUE(>($2, 0))], b=[$1], c=[$2])
+   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, 
source: [TestTableSource(a, b, c)]]])
 ]]>
     </Resource>
     <Resource name="planAfter">
@@ -81,17 +250,46 @@ LogicalAggregate(group=[{}], EXPR$0=[COUNT(DISTINCT $0)], 
EXPR$1=[SUM(DISTINCT $
 Calc(select=[EXPR$0, EXPR$1, CAST(CASE(IS NOT NULL(EXPR$2), EXPR$2, 0)) AS 
EXPR$2])
 +- HashAggregate(isMerge=[true], select=[Final_COUNT(count$0) AS EXPR$0, 
Final_SUM(sum$1) AS EXPR$1, Final_MIN(min$2) AS EXPR$2])
    +- Exchange(distribution=[single])
-      +- LocalHashAggregate(select=[Partial_COUNT(a) FILTER $g_1 AS count$0, 
Partial_SUM(b) FILTER $g_2 AS sum$1, Partial_MIN(EXPR$2) FILTER $g_3 AS min$2])
-         +- Calc(select=[a, b, EXPR$2, =(CASE(=($e, 1:BIGINT), 1:BIGINT, =($e, 
2:BIGINT), 2:BIGINT, 3:BIGINT), 1) AS $g_1, =(CASE(=($e, 1:BIGINT), 1:BIGINT, 
=($e, 2:BIGINT), 2:BIGINT, 3:BIGINT), 2) AS $g_2, =(CASE(=($e, 1:BIGINT), 
1:BIGINT, =($e, 2:BIGINT), 2:BIGINT, 3:BIGINT), 3) AS $g_3])
-            +- HashAggregate(isMerge=[true], groupBy=[a, b, $e], select=[a, b, 
$e, Final_COUNT(count$0) AS EXPR$2])
-               +- Exchange(distribution=[hash[a, b, $e]])
-                  +- LocalHashAggregate(groupBy=[a, b, $e], select=[a, b, $e, 
Partial_COUNT(c) AS count$0])
-                     +- Expand(projects=[{a=[$0], b=[null], c=[$2], $e=[1]}, 
{a=[null], b=[$1], c=[$2], $e=[2]}, {a=[null], b=[null], c=[$2], $e=[3]}], 
projects=[{a, null AS b, c, 1 AS $e}, {null AS a, b, c, 2 AS $e}, {null AS a, 
null AS b, c, 3 AS $e}])
-                        +- TableSourceScan(table=[[default_catalog, 
default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, 
c])
+      +- LocalHashAggregate(select=[Partial_COUNT(a) FILTER $g_1 AS count$0, 
Partial_SUM(b) FILTER $g_6 AS sum$1, Partial_MIN(EXPR$2) FILTER $g_7 AS min$2])
+         +- Calc(select=[a, CAST($f1) AS $f1, b, EXPR$2, AND(=(CASE(=($e, 
1:BIGINT), 1:BIGINT, =($e, 6:BIGINT), 6:BIGINT, 7:BIGINT), 1), IS 
TRUE(CAST($f1))) AS $g_1, =(CASE(=($e, 1:BIGINT), 1:BIGINT, =($e, 6:BIGINT), 
6:BIGINT, 7:BIGINT), 6) AS $g_6, =(CASE(=($e, 1:BIGINT), 1:BIGINT, =($e, 
6:BIGINT), 6:BIGINT, 7:BIGINT), 7) AS $g_7])
+            +- HashAggregate(isMerge=[true], groupBy=[a, $f1, b, $e], 
select=[a, $f1, b, $e, Final_COUNT(count$0) AS EXPR$2])
+               +- Exchange(distribution=[hash[a, $f1, b, $e]])
+                  +- LocalHashAggregate(groupBy=[a, $f1, b, $e], select=[a, 
$f1, b, $e, Partial_COUNT(c) AS count$0])
+                     +- Expand(projects=[{a=[$0], $f1=[$1], b=[null], c=[$3], 
$e=[1]}, {a=[null], $f1=[null], b=[$2], c=[$3], $e=[6]}, {a=[null], $f1=[null], 
b=[null], c=[$3], $e=[7]}], projects=[{a, $f1, null AS b, c, 1 AS $e}, {null AS 
a, null AS $f1, b, c, 6 AS $e}, {null AS a, null AS $f1, null AS b, c, 7 AS 
$e}])
+                        +- Calc(select=[a, IS TRUE(>(c, 0)) AS $f1, b, c])
+                           +- TableSourceScan(table=[[default_catalog, 
default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, 
c])
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testSingleDistinctAggregate">
+  <TestCase 
name="testMultiDistinctAndNonDistinctAggOnDifferentColumnWithGroupingSets">
+    <Resource name="sql">
+      <![CDATA[SELECT COUNT(DISTINCT a), SUM(DISTINCT b), COUNT(c) FROM 
MyTable2 GROUP BY GROUPING SETS (d, e)]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(EXPR$0=[$2], EXPR$1=[$3], EXPR$2=[$4])
++- LogicalAggregate(group=[{0, 1}], groups=[[{0}, {1}]], 
EXPR$0=[COUNT(DISTINCT $2)], EXPR$1=[SUM(DISTINCT $3)], EXPR$2=[COUNT($4)])
+   +- LogicalProject(d=[$3], e=[$4], a=[$0], b=[$1], c=[$2])
+      +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2, 
source: [TestTableSource(a, b, c, d, e)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Calc(select=[EXPR$0, EXPR$1, CAST(EXPR$2) AS EXPR$2])
++- HashAggregate(isMerge=[true], groupBy=[d, e, $e], select=[d, e, $e, 
Final_COUNT(count$0) AS EXPR$0, Final_SUM(sum$1) AS EXPR$1, Final_MIN(min$2) AS 
EXPR$2])
+   +- Exchange(distribution=[hash[d, e, $e]])
+      +- LocalHashAggregate(groupBy=[d, e, $e], select=[d, e, $e, 
Partial_COUNT(a) FILTER $g_8 AS count$0, Partial_SUM(b) FILTER $g_16 AS sum$1, 
Partial_MIN(EXPR$2) FILTER $g_24 AS min$2])
+         +- Calc(select=[a, b, d, e, $e, EXPR$2, =(CASE(=($e_0, 8:BIGINT), 
8:BIGINT, =($e_0, 16:BIGINT), 16:BIGINT, 24:BIGINT), 8) AS $g_8, =(CASE(=($e_0, 
8:BIGINT), 8:BIGINT, =($e_0, 16:BIGINT), 16:BIGINT, 24:BIGINT), 16) AS $g_16, 
=(CASE(=($e_0, 8:BIGINT), 8:BIGINT, =($e_0, 16:BIGINT), 16:BIGINT, 24:BIGINT), 
24) AS $g_24])
+            +- HashAggregate(isMerge=[true], groupBy=[a, b, d, e, $e, $e_0], 
select=[a, b, d, e, $e, $e_0, Final_COUNT(count$0) AS EXPR$2])
+               +- Exchange(distribution=[hash[a, b, d, e, $e, $e_0]])
+                  +- LocalHashAggregate(groupBy=[a, b, d, e, $e, $e_0], 
select=[a, b, d, e, $e, $e_0, Partial_COUNT(c) AS count$0])
+                     +- Expand(projects=[{a=[$0], b=[null], c=[$2], d=[$3], 
e=[$4], $e=[$5], $e_0=[8]}, {a=[null], b=[$1], c=[$2], d=[$3], e=[$4], $e=[$5], 
$e_0=[16]}, {a=[null], b=[null], c=[$2], d=[$3], e=[$4], $e=[$5], $e_0=[24]}], 
projects=[{a, null AS b, c, d, e, $e, 8 AS $e_0}, {null AS a, b, c, d, e, $e, 
16 AS $e_0}, {null AS a, null AS b, c, d, e, $e, 24 AS $e_0}])
+                        +- Expand(projects=[{a=[$0], b=[$1], c=[$2], d=[$3], 
e=[null], $e=[1]}, {a=[$0], b=[$1], c=[$2], d=[null], e=[$4], $e=[2]}], 
projects=[{a, b, c, d, null AS e, 1 AS $e}, {a, b, c, null AS d, e, 2 AS $e}])
+                           +- TableSourceScan(table=[[default_catalog, 
default_database, MyTable2, source: [TestTableSource(a, b, c, d, e)]]], 
fields=[a, b, c, d, e])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testSingleDistinctAgg">
     <Resource name="sql">
       <![CDATA[SELECT COUNT(DISTINCT a) FROM MyTable]]>
     </Resource>
@@ -115,14 +313,14 @@ SortAggregate(isMerge=[true], 
select=[Final_COUNT(count$0) AS EXPR$0])
 ]]>
     </Resource>
   </TestCase>
-  <TestCase 
name="testSingleDistinctAggregateAndOneOrMultiNonDistinctAggregate">
+  <TestCase name="testSingleDistinctAggAndOneOrMultiNonDistinctAgg1">
     <Resource name="sql">
-      <![CDATA[SELECT COUNT(DISTINCT a), SUM(b) FROM MyTable]]>
+      <![CDATA[SELECT COUNT(DISTINCT a) FILTER (WHERE a > 0), SUM(b) FROM 
MyTable]]>
     </Resource>
     <Resource name="planBefore">
       <![CDATA[
-LogicalAggregate(group=[{}], EXPR$0=[COUNT(DISTINCT $0)], EXPR$1=[SUM($1)])
-+- LogicalProject(a=[$0], b=[$1])
+LogicalAggregate(group=[{}], EXPR$0=[COUNT(DISTINCT $0) FILTER $1], 
EXPR$1=[SUM($2)])
++- LogicalProject(a=[$0], $f1=[IS TRUE(>($0, 0))], b=[$1])
    +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, 
source: [TestTableSource(a, b, c)]]])
 ]]>
     </Resource>
@@ -130,70 +328,130 @@ LogicalAggregate(group=[{}], EXPR$0=[COUNT(DISTINCT 
$0)], EXPR$1=[SUM($1)])
       <![CDATA[
 SortAggregate(isMerge=[true], select=[Final_COUNT(count$0) AS EXPR$0, 
Final_MIN(min$1) AS EXPR$1])
 +- Exchange(distribution=[single])
-   +- LocalSortAggregate(select=[Partial_COUNT(a) FILTER $g_0 AS count$0, 
Partial_MIN(EXPR$1) FILTER $g_1 AS min$1])
-      +- Calc(select=[a, EXPR$1, =(CASE(=($e, 0:BIGINT), 0:BIGINT, 1:BIGINT), 
0) AS $g_0, =(CASE(=($e, 0:BIGINT), 0:BIGINT, 1:BIGINT), 1) AS $g_1])
-         +- HashAggregate(isMerge=[true], groupBy=[a, $e], select=[a, $e, 
Final_SUM(sum$0) AS EXPR$1])
-            +- Exchange(distribution=[hash[a, $e]])
-               +- LocalHashAggregate(groupBy=[a, $e], select=[a, $e, 
Partial_SUM(b) AS sum$0])
-                  +- Expand(projects=[{a=[$0], b=[$1], $e=[0]}, {a=[null], 
b=[$1], $e=[1]}], projects=[{a, b, 0 AS $e}, {null AS a, b, 1 AS $e}])
-                     +- Calc(select=[a, b])
+   +- LocalSortAggregate(select=[Partial_COUNT(a) FILTER $g_0 AS count$0, 
Partial_MIN(EXPR$1) FILTER $g_3 AS min$1])
+      +- Calc(select=[a, CAST($f1) AS $f1, EXPR$1, AND(=(CASE(=($e, 0:BIGINT), 
0:BIGINT, 3:BIGINT), 0), IS TRUE(CAST($f1))) AS $g_0, =(CASE(=($e, 0:BIGINT), 
0:BIGINT, 3:BIGINT), 3) AS $g_3])
+         +- HashAggregate(isMerge=[true], groupBy=[a, $f1, $e], select=[a, 
$f1, $e, Final_SUM(sum$0) AS EXPR$1])
+            +- Exchange(distribution=[hash[a, $f1, $e]])
+               +- LocalHashAggregate(groupBy=[a, $f1, $e], select=[a, $f1, $e, 
Partial_SUM(b) AS sum$0])
+                  +- Expand(projects=[{a=[$0], $f1=[$1], b=[$2], $e=[0]}, 
{a=[null], $f1=[null], b=[$2], $e=[3]}], projects=[{a, $f1, b, 0 AS $e}, {null 
AS a, null AS $f1, b, 3 AS $e}])
+                     +- Calc(select=[a, IS TRUE(>(a, 0)) AS $f1, b])
                         +- TableSourceScan(table=[[default_catalog, 
default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, 
c])
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testTwoDifferentDistinctAggregateWithGroupingAndCountStar">
+  <TestCase name="testSingleDistinctAggAndOneOrMultiNonDistinctAgg2">
     <Resource name="sql">
-      <![CDATA[SELECT a, COUNT(*), SUM(DISTINCT b), COUNT(DISTINCT c) FROM 
MyTable GROUP BY a]]>
+      <![CDATA[SELECT COUNT(a) filter (WHERE a > 0), SUM(DISTINCT b) FROM 
MyTable]]>
     </Resource>
     <Resource name="planBefore">
       <![CDATA[
-LogicalAggregate(group=[{0}], EXPR$1=[COUNT()], EXPR$2=[SUM(DISTINCT $1)], 
EXPR$3=[COUNT(DISTINCT $2)])
-+- LogicalTableScan(table=[[default_catalog, default_database, MyTable, 
source: [TestTableSource(a, b, c)]]])
+LogicalAggregate(group=[{}], EXPR$0=[COUNT($0) FILTER $1], 
EXPR$1=[SUM(DISTINCT $2)])
++- LogicalProject(a=[$0], $f1=[IS TRUE(>($0, 0))], b=[$1])
+   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, 
source: [TestTableSource(a, b, c)]]])
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Calc(select=[a, CAST(EXPR$1) AS EXPR$1, EXPR$2, EXPR$3])
-+- HashAggregate(isMerge=[true], groupBy=[a], select=[a, Final_MIN(min$0) AS 
EXPR$1, Final_SUM(sum$1) AS EXPR$2, Final_COUNT(count$2) AS EXPR$3])
-   +- Exchange(distribution=[hash[a]])
-      +- LocalHashAggregate(groupBy=[a], select=[a, Partial_MIN(EXPR$1) FILTER 
$g_3 AS min$0, Partial_SUM(b) FILTER $g_1 AS sum$1, Partial_COUNT(c) FILTER 
$g_2 AS count$2])
-         +- Calc(select=[a, b, c, EXPR$1, =(CASE(=($e, 1:BIGINT), 1:BIGINT, 
=($e, 2:BIGINT), 2:BIGINT, 3:BIGINT), 1) AS $g_1, =(CASE(=($e, 1:BIGINT), 
1:BIGINT, =($e, 2:BIGINT), 2:BIGINT, 3:BIGINT), 2) AS $g_2, =(CASE(=($e, 
1:BIGINT), 1:BIGINT, =($e, 2:BIGINT), 2:BIGINT, 3:BIGINT), 3) AS $g_3])
-            +- HashAggregate(isMerge=[true], groupBy=[a, b, c, $e], select=[a, 
b, c, $e, Final_COUNT(count1$0) AS EXPR$1])
-               +- Exchange(distribution=[hash[a, b, c, $e]])
-                  +- LocalHashAggregate(groupBy=[a, b, c, $e], select=[a, b, 
c, $e, Partial_COUNT(*) AS count1$0])
-                     +- Expand(projects=[{a=[$0], b=[$1], c=[null], $e=[1]}, 
{a=[$0], b=[null], c=[$2], $e=[2]}, {a=[$0], b=[null], c=[null], $e=[3]}], 
projects=[{a, b, null AS c, 1 AS $e}, {a, null AS b, c, 2 AS $e}, {a, null AS 
b, null AS c, 3 AS $e}])
-                        +- TableSourceScan(table=[[default_catalog, 
default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, 
c])
+Calc(select=[CAST(CASE(IS NOT NULL(EXPR$0), EXPR$0, 0)) AS EXPR$0, EXPR$1])
++- HashAggregate(isMerge=[true], select=[Final_MIN(min$0) AS EXPR$0, 
Final_SUM(sum$1) AS EXPR$1])
+   +- Exchange(distribution=[single])
+      +- LocalHashAggregate(select=[Partial_MIN(EXPR$0) FILTER $g_1 AS min$0, 
Partial_SUM(b) FILTER $g_0 AS sum$1])
+         +- Calc(select=[b, EXPR$0, =(CASE(=($e, 0:BIGINT), 0:BIGINT, 
1:BIGINT), 0) AS $g_0, =(CASE(=($e, 0:BIGINT), 0:BIGINT, 1:BIGINT), 1) AS $g_1])
+            +- HashAggregate(isMerge=[true], groupBy=[b, $e], select=[b, $e, 
Final_COUNT(count$0) FILTER $e AS EXPR$0])
+               +- Exchange(distribution=[hash[b, $e]])
+                  +- LocalHashAggregate(groupBy=[b, $e], select=[b, $e, 
Partial_COUNT(a) FILTER $f1 AS count$0])
+                     +- Expand(projects=[{a=[$0], $f1=[$1], b=[$2], $e=[0]}, 
{a=[$0], $f1=[$1], b=[null], $e=[1]}], projects=[{a, $f1, b, 0 AS $e}, {a, $f1, 
null AS b, 1 AS $e}])
+                        +- Calc(select=[a, IS TRUE(>(a, 0)) AS $f1, b])
+                           +- TableSourceScan(table=[[default_catalog, 
default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, 
c])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase 
name="testSingleDistinctAggAndOneOrMultiNonDistinctAggWithGroupingSets1">
+    <Resource name="sql">
+      <![CDATA[SELECT COUNT(DISTINCT a), SUM(b) FROM MyTable2 GROUP BY 
GROUPING SETS (b, c)]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(EXPR$0=[$2], EXPR$1=[$3])
++- LogicalAggregate(group=[{0, 1}], groups=[[{0}, {1}]], 
EXPR$0=[COUNT(DISTINCT $2)], EXPR$1=[SUM($0)])
+   +- LogicalProject(b=[$1], c=[$2], a=[$0])
+      +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2, 
source: [TestTableSource(a, b, c, d, e)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Calc(select=[EXPR$0, EXPR$1])
++- HashAggregate(isMerge=[true], groupBy=[b, c, $e], select=[b, c, $e, 
Final_COUNT(count$0) AS EXPR$0, Final_MIN(min$1) AS EXPR$1])
+   +- Exchange(distribution=[hash[b, c, $e]])
+      +- LocalHashAggregate(groupBy=[b, c, $e], select=[b, c, $e, 
Partial_COUNT(a) FILTER $g_0 AS count$0, Partial_MIN(EXPR$1) FILTER $g_2 AS 
min$1])
+         +- Calc(select=[b, c, a, $e, EXPR$1, =(CASE(=($e_0, 0:BIGINT), 
0:BIGINT, 2:BIGINT), 0) AS $g_0, =(CASE(=($e_0, 0:BIGINT), 0:BIGINT, 2:BIGINT), 
2) AS $g_2])
+            +- HashAggregate(isMerge=[true], groupBy=[b, c, a, $e, $e_0], 
select=[b, c, a, $e, $e_0, Final_SUM(sum$0) AS EXPR$1])
+               +- Exchange(distribution=[hash[b, c, a, $e, $e_0]])
+                  +- LocalHashAggregate(groupBy=[b, c, a, $e, $e_0], 
select=[b, c, a, $e, $e_0, Partial_SUM(b_0) AS sum$0])
+                     +- Expand(projects=[{b=[$0], c=[$1], a=[$2], $e=[$3], 
b_0=[$4], $e_0=[0]}, {b=[$0], c=[$1], a=[null], $e=[$3], b_0=[$4], $e_0=[2]}], 
projects=[{b, c, a, $e, b_0, 0 AS $e_0}, {b, c, null AS a, $e, b_0, 2 AS $e_0}])
+                        +- Expand(projects=[{b=[$0], c=[null], a=[$2], $e=[1], 
b_0=[$0]}, {b=[null], c=[$1], a=[$2], $e=[2], b_0=[$0]}], projects=[{b, null AS 
c, a, 1 AS $e, b AS b_0}, {null AS b, c, a, 2 AS $e, b AS b_0}])
+                           +- Calc(select=[b, c, a])
+                              +- TableSourceScan(table=[[default_catalog, 
default_database, MyTable2, source: [TestTableSource(a, b, c, d, e)]]], 
fields=[a, b, c, d, e])
 ]]>
     </Resource>
   </TestCase>
-  <TestCase 
name="testSingleDistinctAggregateAndOneOrMultiNonDistinctAggregate2">
+  <TestCase 
name="testSingleDistinctAggAndOneOrMultiNonDistinctAggWithGroupingSets2">
     <Resource name="sql">
-      <![CDATA[SELECT COUNT(a), SUM(DISTINCT b) FROM MyTable]]>
+      <![CDATA[SELECT COUNT(a), SUM(DISTINCT b) FROM MyTable2 GROUP BY 
GROUPING SETS (c, d)]]>
     </Resource>
     <Resource name="planBefore">
       <![CDATA[
-LogicalAggregate(group=[{}], EXPR$0=[COUNT($0)], EXPR$1=[SUM(DISTINCT $1)])
+LogicalProject(EXPR$0=[$2], EXPR$1=[$3])
++- LogicalAggregate(group=[{0, 1}], groups=[[{0}, {1}]], EXPR$0=[COUNT($2)], 
EXPR$1=[SUM(DISTINCT $3)])
+   +- LogicalProject(c=[$2], d=[$3], a=[$0], b=[$1])
+      +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2, 
source: [TestTableSource(a, b, c, d, e)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Calc(select=[CAST(EXPR$0) AS EXPR$0, EXPR$1])
++- HashAggregate(isMerge=[true], groupBy=[c, d, $e], select=[c, d, $e, 
Final_MIN(min$0) AS EXPR$0, Final_SUM(sum$1) AS EXPR$1])
+   +- Exchange(distribution=[hash[c, d, $e]])
+      +- LocalHashAggregate(groupBy=[c, d, $e], select=[c, d, $e, 
Partial_MIN(EXPR$0) FILTER $g_2 AS min$0, Partial_SUM(b) FILTER $g_0 AS sum$1])
+         +- Calc(select=[c, d, b, $e, EXPR$0, =(CASE(=($e_0, 0:BIGINT), 
0:BIGINT, 2:BIGINT), 0) AS $g_0, =(CASE(=($e_0, 0:BIGINT), 0:BIGINT, 2:BIGINT), 
2) AS $g_2])
+            +- HashAggregate(isMerge=[true], groupBy=[c, d, b, $e, $e_0], 
select=[c, d, b, $e, $e_0, Final_COUNT(count$0) AS EXPR$0])
+               +- Exchange(distribution=[hash[c, d, b, $e, $e_0]])
+                  +- LocalHashAggregate(groupBy=[c, d, b, $e, $e_0], 
select=[c, d, b, $e, $e_0, Partial_COUNT(a) AS count$0])
+                     +- Expand(projects=[{c=[$0], d=[$1], a=[$2], b=[$3], 
$e=[$4], $e_0=[0]}, {c=[$0], d=[$1], a=[$2], b=[null], $e=[$4], $e_0=[2]}], 
projects=[{c, d, a, b, $e, 0 AS $e_0}, {c, d, a, null AS b, $e, 2 AS $e_0}])
+                        +- Expand(projects=[{c=[$0], d=[null], a=[$2], b=[$3], 
$e=[1]}, {c=[null], d=[$1], a=[$2], b=[$3], $e=[2]}], projects=[{c, null AS d, 
a, b, 1 AS $e}, {null AS c, d, a, b, 2 AS $e}])
+                           +- Calc(select=[c, d, a, b])
+                              +- TableSourceScan(table=[[default_catalog, 
default_database, MyTable2, source: [TestTableSource(a, b, c, d, e)]]], 
fields=[a, b, c, d, e])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testTwoDistinctAggWithGroupByAndCountStar">
+    <Resource name="sql">
+      <![CDATA[SELECT a, COUNT(*), SUM(DISTINCT b), COUNT(DISTINCT b) FROM 
MyTable GROUP BY a]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalAggregate(group=[{0}], EXPR$1=[COUNT()], EXPR$2=[SUM(DISTINCT $1)], 
EXPR$3=[COUNT(DISTINCT $1)])
 +- LogicalProject(a=[$0], b=[$1])
    +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, 
source: [TestTableSource(a, b, c)]]])
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Calc(select=[CAST(CASE(IS NOT NULL(EXPR$0), EXPR$0, 0)) AS EXPR$0, EXPR$1])
-+- HashAggregate(isMerge=[true], select=[Final_MIN(min$0) AS EXPR$0, 
Final_SUM(sum$1) AS EXPR$1])
-   +- Exchange(distribution=[single])
-      +- LocalHashAggregate(select=[Partial_MIN(EXPR$0) FILTER $g_1 AS min$0, 
Partial_SUM(b) FILTER $g_0 AS sum$1])
-         +- Calc(select=[b, EXPR$0, =(CASE(=($e, 0:BIGINT), 0:BIGINT, 
1:BIGINT), 0) AS $g_0, =(CASE(=($e, 0:BIGINT), 0:BIGINT, 1:BIGINT), 1) AS $g_1])
-            +- HashAggregate(isMerge=[true], groupBy=[b, $e], select=[b, $e, 
Final_COUNT(count$0) AS EXPR$0])
-               +- Exchange(distribution=[hash[b, $e]])
-                  +- LocalHashAggregate(groupBy=[b, $e], select=[b, $e, 
Partial_COUNT(a) AS count$0])
+Calc(select=[a, CAST(EXPR$1) AS EXPR$1, EXPR$2, EXPR$3])
++- HashAggregate(isMerge=[true], groupBy=[a], select=[a, Final_MIN(min$0) AS 
EXPR$1, Final_SUM(sum$1) FILTER count$2 AS EXPR$2, Final_COUNT(count$2) FILTER 
count$2 AS EXPR$3])
+   +- Exchange(distribution=[hash[a]])
+      +- LocalHashAggregate(groupBy=[a], select=[a, Partial_MIN(EXPR$1) FILTER 
$g_1 AS min$0, Partial_SUM(b) FILTER $g_0 AS sum$1, Partial_COUNT(b) FILTER 
$g_0 AS count$2])
+         +- Calc(select=[a, b, EXPR$1, =(CASE(=($e, 0:BIGINT), 0:BIGINT, 
1:BIGINT), 0) AS $g_0, =(CASE(=($e, 0:BIGINT), 0:BIGINT, 1:BIGINT), 1) AS $g_1])
+            +- HashAggregate(isMerge=[true], groupBy=[a, b, $e], select=[a, b, 
$e, Final_COUNT(count1$0) AS EXPR$1])
+               +- Exchange(distribution=[hash[a, b, $e]])
+                  +- LocalHashAggregate(groupBy=[a, b, $e], select=[a, b, $e, 
Partial_COUNT(*) AS count1$0])
                      +- Expand(projects=[{a=[$0], b=[$1], $e=[0]}, {a=[$0], 
b=[null], $e=[1]}], projects=[{a, b, 0 AS $e}, {a, null AS b, 1 AS $e}])
                         +- Calc(select=[a, b])
                            +- TableSourceScan(table=[[default_catalog, 
default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, 
c])
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testSingleDistinctAggregateWithGrouping">
+  <TestCase name="testSingleDistinctAggWithGroupBy">
     <Resource name="sql">
       <![CDATA[SELECT a, COUNT(a), SUM(DISTINCT b) FROM MyTable GROUP BY a]]>
     </Resource>
@@ -220,7 +478,7 @@ Calc(select=[a, CAST(EXPR$1) AS EXPR$1, EXPR$2])
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testSingleDistinctAggregateWithGroupingAndCountStar">
+  <TestCase name="testSingleDistinctAggWithGroupByAndCountStar">
     <Resource name="sql">
       <![CDATA[SELECT a, COUNT(*), SUM(DISTINCT b) FROM MyTable GROUP BY a]]>
     </Resource>
@@ -247,30 +505,165 @@ Calc(select=[a, CAST(EXPR$1) AS EXPR$1, EXPR$2])
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testTwoDistinctAggregateWithGroupingAndCountStar">
+  <TestCase name="testTwoDifferentDistinctAggWithGroupByAndCountStar">
     <Resource name="sql">
-      <![CDATA[SELECT a, COUNT(*), SUM(DISTINCT b), COUNT(DISTINCT b) FROM 
MyTable GROUP BY a]]>
+      <![CDATA[SELECT a, COUNT(*), SUM(DISTINCT b), COUNT(DISTINCT c) FROM 
MyTable GROUP BY a]]>
     </Resource>
     <Resource name="planBefore">
       <![CDATA[
-LogicalAggregate(group=[{0}], EXPR$1=[COUNT()], EXPR$2=[SUM(DISTINCT $1)], 
EXPR$3=[COUNT(DISTINCT $1)])
-+- LogicalProject(a=[$0], b=[$1])
-   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, 
source: [TestTableSource(a, b, c)]]])
+LogicalAggregate(group=[{0}], EXPR$1=[COUNT()], EXPR$2=[SUM(DISTINCT $1)], 
EXPR$3=[COUNT(DISTINCT $2)])
++- LogicalTableScan(table=[[default_catalog, default_database, MyTable, 
source: [TestTableSource(a, b, c)]]])
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
 Calc(select=[a, CAST(EXPR$1) AS EXPR$1, EXPR$2, EXPR$3])
-+- HashAggregate(isMerge=[true], groupBy=[a], select=[a, Final_MIN(min$0) AS 
EXPR$1, Final_SUM(sum$1) FILTER count$2 AS EXPR$2, Final_COUNT(count$2) FILTER 
count$2 AS EXPR$3])
++- HashAggregate(isMerge=[true], groupBy=[a], select=[a, Final_MIN(min$0) AS 
EXPR$1, Final_SUM(sum$1) AS EXPR$2, Final_COUNT(count$2) AS EXPR$3])
    +- Exchange(distribution=[hash[a]])
-      +- LocalHashAggregate(groupBy=[a], select=[a, Partial_MIN(EXPR$1) FILTER 
$g_1 AS min$0, Partial_SUM(b) FILTER $g_0 AS sum$1, Partial_COUNT(b) FILTER 
$g_0 AS count$2])
-         +- Calc(select=[a, b, EXPR$1, =(CASE(=($e, 0:BIGINT), 0:BIGINT, 
1:BIGINT), 0) AS $g_0, =(CASE(=($e, 0:BIGINT), 0:BIGINT, 1:BIGINT), 1) AS $g_1])
-            +- HashAggregate(isMerge=[true], groupBy=[a, b, $e], select=[a, b, 
$e, Final_COUNT(count1$0) AS EXPR$1])
-               +- Exchange(distribution=[hash[a, b, $e]])
-                  +- LocalHashAggregate(groupBy=[a, b, $e], select=[a, b, $e, 
Partial_COUNT(*) AS count1$0])
-                     +- Expand(projects=[{a=[$0], b=[$1], $e=[0]}, {a=[$0], 
b=[null], $e=[1]}], projects=[{a, b, 0 AS $e}, {a, null AS b, 1 AS $e}])
-                        +- Calc(select=[a, b])
-                           +- TableSourceScan(table=[[default_catalog, 
default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, 
c])
+      +- LocalHashAggregate(groupBy=[a], select=[a, Partial_MIN(EXPR$1) FILTER 
$g_3 AS min$0, Partial_SUM(b) FILTER $g_1 AS sum$1, Partial_COUNT(c) FILTER 
$g_2 AS count$2])
+         +- Calc(select=[a, b, c, EXPR$1, =(CASE(=($e, 1:BIGINT), 1:BIGINT, 
=($e, 2:BIGINT), 2:BIGINT, 3:BIGINT), 1) AS $g_1, =(CASE(=($e, 1:BIGINT), 
1:BIGINT, =($e, 2:BIGINT), 2:BIGINT, 3:BIGINT), 2) AS $g_2, =(CASE(=($e, 
1:BIGINT), 1:BIGINT, =($e, 2:BIGINT), 2:BIGINT, 3:BIGINT), 3) AS $g_3])
+            +- HashAggregate(isMerge=[true], groupBy=[a, b, c, $e], select=[a, 
b, c, $e, Final_COUNT(count1$0) AS EXPR$1])
+               +- Exchange(distribution=[hash[a, b, c, $e]])
+                  +- LocalHashAggregate(groupBy=[a, b, c, $e], select=[a, b, 
c, $e, Partial_COUNT(*) AS count1$0])
+                     +- Expand(projects=[{a=[$0], b=[$1], c=[null], $e=[1]}, 
{a=[$0], b=[null], c=[$2], $e=[2]}, {a=[$0], b=[null], c=[null], $e=[3]}], 
projects=[{a, b, null AS c, 1 AS $e}, {a, null AS b, c, 2 AS $e}, {a, null AS 
b, null AS c, 3 AS $e}])
+                        +- TableSourceScan(table=[[default_catalog, 
default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, 
c])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="TestMultiDistinctOnDifferentColumnWithFilter">
+    <Resource name="sql">
+      <![CDATA[SELECT d, COUNT(DISTINCT c), COUNT(DISTINCT c) FILTER (WHERE a 
> 0),
+COUNT(DISTINCT b) FILTER (WHERE b > 1) FROM MyTable2 GROUP BY d]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalAggregate(group=[{0}], EXPR$1=[COUNT(DISTINCT $1)], 
EXPR$2=[COUNT(DISTINCT $1) FILTER $2], EXPR$3=[COUNT(DISTINCT $3) FILTER $4])
++- LogicalProject(d=[$3], c=[$2], $f2=[IS TRUE(>($0, 0))], b=[$1], $f4=[IS 
TRUE(>($1, 1))])
+   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2, 
source: [TestTableSource(a, b, c, d, e)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+HashAggregate(isMerge=[true], groupBy=[d], select=[d, Final_COUNT(count$0) AS 
EXPR$1, Final_COUNT(count$1) AS EXPR$2, Final_COUNT(count$2) AS EXPR$3])
++- Exchange(distribution=[hash[d]])
+   +- LocalHashAggregate(groupBy=[d], select=[d, Partial_COUNT(c) FILTER $g_7 
AS count$0, Partial_COUNT(c) FILTER $g_3 AS count$1, Partial_COUNT(b) FILTER 
$g_12 AS count$2])
+      +- Calc(select=[d, c, CAST($f2) AS $f2, b, CAST($f4) AS $f4, 
AND(=(CASE(=($e, 3:BIGINT), 3:BIGINT, =($e, 7:BIGINT), 7:BIGINT, 12:BIGINT), 
3), IS TRUE(CAST($f2))) AS $g_3, =(CASE(=($e, 3:BIGINT), 3:BIGINT, =($e, 
7:BIGINT), 7:BIGINT, 12:BIGINT), 7) AS $g_7, AND(=(CASE(=($e, 3:BIGINT), 
3:BIGINT, =($e, 7:BIGINT), 7:BIGINT, 12:BIGINT), 12), IS TRUE(CAST($f4))) AS 
$g_12])
+         +- HashAggregate(isMerge=[true], groupBy=[d, c, $f2, b, $f4, $e], 
select=[d, c, $f2, b, $f4, $e])
+            +- Exchange(distribution=[hash[d, c, $f2, b, $f4, $e]])
+               +- LocalHashAggregate(groupBy=[d, c, $f2, b, $f4, $e], 
select=[d, c, $f2, b, $f4, $e])
+                  +- Expand(projects=[{d=[$0], c=[$1], $f2=[$2], b=[null], 
$f4=[null], $e=[3]}, {d=[$0], c=[$1], $f2=[null], b=[null], $f4=[null], 
$e=[7]}, {d=[$0], c=[null], $f2=[null], b=[$3], $f4=[$4], $e=[12]}], 
projects=[{d, c, $f2, null AS b, null AS $f4, 3 AS $e}, {d, c, null AS $f2, 
null AS b, null AS $f4, 7 AS $e}, {d, null AS c, null AS $f2, b, $f4, 12 AS 
$e}])
+                     +- Calc(select=[d, c, IS TRUE(>(a, 0)) AS $f2, b, IS 
TRUE(>(b, 1)) AS $f4])
+                        +- TableSourceScan(table=[[default_catalog, 
default_database, MyTable2, source: [TestTableSource(a, b, c, d, e)]]], 
fields=[a, b, c, d, e])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="TestMultiDistinctWithFilterAndNonDistinctAgg">
+    <Resource name="sql">
+      <![CDATA[SELECT d, COUNT(DISTINCT c), COUNT(DISTINCT c) FILTER (WHERE a 
> 0),
+MAX(e), MIN(e) FROM MyTable2 GROUP BY d]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalAggregate(group=[{0}], EXPR$1=[COUNT(DISTINCT $1)], 
EXPR$2=[COUNT(DISTINCT $1) FILTER $2], EXPR$3=[MAX($3)], EXPR$4=[MIN($3)])
++- LogicalProject(d=[$3], c=[$2], $f2=[IS TRUE(>($0, 0))], e=[$4])
+   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2, 
source: [TestTableSource(a, b, c, d, e)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+SortAggregate(isMerge=[true], groupBy=[d], select=[d, Final_COUNT(count$0) AS 
EXPR$1, Final_COUNT(count$1) AS EXPR$2, Final_MIN(min$2) AS EXPR$3, 
Final_MIN(min$3) AS EXPR$4])
++- Sort(orderBy=[d ASC])
+   +- Exchange(distribution=[hash[d]])
+      +- LocalSortAggregate(groupBy=[d], select=[d, Partial_COUNT(c) FILTER 
$g_1 AS count$0, Partial_COUNT(c) FILTER $g_0 AS count$1, Partial_MIN(EXPR$3) 
FILTER $g_3 AS min$2, Partial_MIN(EXPR$4) FILTER $g_3 AS min$3])
+         +- Calc(select=[d, c, CAST($f2) AS $f2, EXPR$3, EXPR$4, 
AND(=(CASE(=($e, 0:BIGINT), 0:BIGINT, =($e, 1:BIGINT), 1:BIGINT, 3:BIGINT), 0), 
IS TRUE(CAST($f2))) AS $g_0, =(CASE(=($e, 0:BIGINT), 0:BIGINT, =($e, 1:BIGINT), 
1:BIGINT, 3:BIGINT), 1) AS $g_1, =(CASE(=($e, 0:BIGINT), 0:BIGINT, =($e, 
1:BIGINT), 1:BIGINT, 3:BIGINT), 3) AS $g_3])
+            +- Sort(orderBy=[d ASC])
+               +- SortAggregate(isMerge=[false], groupBy=[d, c, $f2, $e], 
select=[d, c, $f2, $e, MAX(e) AS EXPR$3, MIN(e) AS EXPR$4])
+                  +- Sort(orderBy=[d ASC, c ASC, $f2 ASC, $e ASC])
+                     +- Exchange(distribution=[hash[d, c, $f2, $e]])
+                        +- Expand(projects=[{d=[$0], c=[$1], $f2=[$2], e=[$3], 
$e=[0]}, {d=[$0], c=[$1], $f2=[null], e=[$3], $e=[1]}, {d=[$0], c=[null], 
$f2=[null], e=[$3], $e=[3]}], projects=[{d, c, $f2, e, 0 AS $e}, {d, c, null AS 
$f2, e, 1 AS $e}, {d, null AS c, null AS $f2, e, 3 AS $e}])
+                           +- Calc(select=[d, c, IS TRUE(>(a, 0)) AS $f2, e])
+                              +- TableSourceScan(table=[[default_catalog, 
default_database, MyTable2, source: [TestTableSource(a, b, c, d, e)]]], 
fields=[a, b, c, d, e])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testMultiDistinctOnSameColumnWithFilter">
+    <Resource name="sql">
+      <![CDATA[SELECT d, COUNT(DISTINCT c), COUNT(DISTINCT c) FILTER (WHERE a 
> 10),
+COUNT(DISTINCT c) FILTER (WHERE a < 10) FROM MyTable2 GROUP BY d]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalAggregate(group=[{0}], EXPR$1=[COUNT(DISTINCT $1)], 
EXPR$2=[COUNT(DISTINCT $1) FILTER $2], EXPR$3=[COUNT(DISTINCT $1) FILTER $3])
++- LogicalProject(d=[$3], c=[$2], $f2=[IS TRUE(>($0, 10))], $f3=[IS TRUE(<($0, 
10))])
+   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2, 
source: [TestTableSource(a, b, c, d, e)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+HashAggregate(isMerge=[true], groupBy=[d], select=[d, Final_COUNT(count$0) AS 
EXPR$1, Final_COUNT(count$1) AS EXPR$2, Final_COUNT(count$2) AS EXPR$3])
++- Exchange(distribution=[hash[d]])
+   +- LocalHashAggregate(groupBy=[d], select=[d, Partial_COUNT(c) FILTER $g_3 
AS count$0, Partial_COUNT(c) FILTER $g_1 AS count$1, Partial_COUNT(c) FILTER 
$g_2 AS count$2])
+      +- Calc(select=[d, c, CAST($f2) AS $f2, CAST($f3) AS $f3, 
AND(=(CASE(=($e, 1:BIGINT), 1:BIGINT, =($e, 2:BIGINT), 2:BIGINT, 3:BIGINT), 1), 
IS TRUE(CAST($f2))) AS $g_1, AND(=(CASE(=($e, 1:BIGINT), 1:BIGINT, =($e, 
2:BIGINT), 2:BIGINT, 3:BIGINT), 2), IS TRUE(CAST($f3))) AS $g_2, =(CASE(=($e, 
1:BIGINT), 1:BIGINT, =($e, 2:BIGINT), 2:BIGINT, 3:BIGINT), 3) AS $g_3])
+         +- HashAggregate(isMerge=[true], groupBy=[d, c, $f2, $f3, $e], 
select=[d, c, $f2, $f3, $e])
+            +- Exchange(distribution=[hash[d, c, $f2, $f3, $e]])
+               +- LocalHashAggregate(groupBy=[d, c, $f2, $f3, $e], select=[d, 
c, $f2, $f3, $e])
+                  +- Expand(projects=[{d=[$0], c=[$1], $f2=[$2], $f3=[null], 
$e=[1]}, {d=[$0], c=[$1], $f2=[null], $f3=[$3], $e=[2]}, {d=[$0], c=[$1], 
$f2=[null], $f3=[null], $e=[3]}], projects=[{d, c, $f2, null AS $f3, 1 AS $e}, 
{d, c, null AS $f2, $f3, 2 AS $e}, {d, c, null AS $f2, null AS $f3, 3 AS $e}])
+                     +- Calc(select=[d, c, IS TRUE(>(a, 10)) AS $f2, IS 
TRUE(<(a, 10)) AS $f3])
+                        +- TableSourceScan(table=[[default_catalog, 
default_database, MyTable2, source: [TestTableSource(a, b, c, d, e)]]], 
fields=[a, b, c, d, e])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testSingleDistinctWithFilter">
+    <Resource name="sql">
+      <![CDATA[SELECT d, COUNT(DISTINCT c) FILTER (WHERE a > 0) FROM MyTable2 
GROUP BY d]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalAggregate(group=[{0}], EXPR$1=[COUNT(DISTINCT $1) FILTER $2])
++- LogicalProject(d=[$3], c=[$2], $f2=[IS TRUE(>($0, 0))])
+   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2, 
source: [TestTableSource(a, b, c, d, e)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+HashAggregate(isMerge=[true], groupBy=[d], select=[d, Final_COUNT(count$0) AS 
EXPR$1])
++- Exchange(distribution=[hash[d]])
+   +- LocalHashAggregate(groupBy=[d], select=[d, Partial_COUNT(i$c) AS 
count$0])
+      +- HashAggregate(isMerge=[true], groupBy=[d, $f2, i$c], select=[d, $f2, 
i$c])
+         +- Exchange(distribution=[hash[d, $f2, i$c]])
+            +- LocalHashAggregate(groupBy=[d, $f2, i$c], select=[d, $f2, i$c])
+               +- Calc(select=[d, IS TRUE(>(a, 0)) AS $f2, CASE(IS TRUE(>(a, 
0)), c, null:VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS i$c])
+                  +- TableSourceScan(table=[[default_catalog, 
default_database, MyTable2, source: [TestTableSource(a, b, c, d, e)]]], 
fields=[a, b, c, d, e])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testMultiDistinctAndNonDistinctAggWithFilter">
+    <Resource name="sql">
+      <![CDATA[SELECT d, MAX(e), MAX(e) FILTER (WHERE a < 10), COUNT(DISTINCT 
c),
+COUNT(DISTINCT c) FILTER (WHERE a > 5), COUNT(DISTINCT b) FILTER (WHERE b > 3)
+FROM MyTable2 GROUP BY d]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalAggregate(group=[{0}], EXPR$1=[MAX($1)], EXPR$2=[MAX($1) FILTER $2], 
EXPR$3=[COUNT(DISTINCT $3)], EXPR$4=[COUNT(DISTINCT $3) FILTER $4], 
EXPR$5=[COUNT(DISTINCT $5) FILTER $6])
++- LogicalProject(d=[$3], e=[$4], $f2=[IS TRUE(<($0, 10))], c=[$2], $f4=[IS 
TRUE(>($0, 5))], b=[$1], $f6=[IS TRUE(>($1, 3))])
+   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2, 
source: [TestTableSource(a, b, c, d, e)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+SortAggregate(isMerge=[true], groupBy=[d], select=[d, Final_MIN(min$0) AS 
EXPR$1, Final_MIN(min$1) AS EXPR$2, Final_COUNT(count$2) AS EXPR$3, 
Final_COUNT(count$3) AS EXPR$4, Final_COUNT(count$4) AS EXPR$5])
++- Sort(orderBy=[d ASC])
+   +- Exchange(distribution=[hash[d]])
+      +- LocalSortAggregate(groupBy=[d], select=[d, Partial_MIN(EXPR$1) FILTER 
$g_15 AS min$0, Partial_MIN(EXPR$2) FILTER $g_15 AS min$1, Partial_COUNT(c) 
FILTER $g_7 AS count$2, Partial_COUNT(c) FILTER $g_3 AS count$3, 
Partial_COUNT(b) FILTER $g_12 AS count$4])
+         +- Calc(select=[d, c, CAST($f4) AS $f4, b, CAST($f6) AS $f6, EXPR$1, 
EXPR$2, AND(=(CASE(=($e, 3:BIGINT), 3:BIGINT, =($e, 7:BIGINT), 7:BIGINT, =($e, 
12:BIGINT), 12:BIGINT, 15:BIGINT), 3), IS TRUE(CAST($f4))) AS $g_3, 
=(CASE(=($e, 3:BIGINT), 3:BIGINT, =($e, 7:BIGINT), 7:BIGINT, =($e, 12:BIGINT), 
12:BIGINT, 15:BIGINT), 7) AS $g_7, AND(=(CASE(=($e, 3:BIGINT), 3:BIGINT, =($e, 
7:BIGINT), 7:BIGINT, =($e, 12:BIGINT), 12:BIGINT, 15:BIGINT), 12), IS 
TRUE(CAST($f6))) AS $g_12, =(CASE(=($e, [...]
+            +- Sort(orderBy=[d ASC])
+               +- SortAggregate(isMerge=[false], groupBy=[d, c, $f4, b, $f6, 
$e], select=[d, c, $f4, b, $f6, $e, MAX(e) AS EXPR$1, MAX(e) FILTER $f2 AS 
EXPR$2])
+                  +- Sort(orderBy=[d ASC, c ASC, $f4 ASC, b ASC, $f6 ASC, $e 
ASC])
+                     +- Exchange(distribution=[hash[d, c, $f4, b, $f6, $e]])
+                        +- Expand(projects=[{d=[$0], e=[$1], $f2=[$2], c=[$3], 
$f4=[$4], b=[null], $f6=[null], $e=[3]}, {d=[$0], e=[$1], $f2=[$2], c=[$3], 
$f4=[null], b=[null], $f6=[null], $e=[7]}, {d=[$0], e=[$1], $f2=[$2], c=[null], 
$f4=[null], b=[$5], $f6=[$6], $e=[12]}, {d=[$0], e=[$1], $f2=[$2], c=[null], 
$f4=[null], b=[null], $f6=[null], $e=[15]}], projects=[{d, e, $f2, c, $f4, null 
AS b, null AS $f6, 3 AS $e}, {d, e, $f2, c, null AS $f4, null AS b, null AS 
$f6, 7 AS $e}, {d, e, $f [...]
+                           +- Calc(select=[d, e, IS TRUE(<(a, 10)) AS $f2, c, 
IS TRUE(>(a, 5)) AS $f4, b, IS TRUE(>(b, 3)) AS $f6])
+                              +- TableSourceScan(table=[[default_catalog, 
default_database, MyTable2, source: [TestTableSource(a, b, c, d, e)]]], 
fields=[a, b, c, d, e])
 ]]>
     </Resource>
   </TestCase>
diff --git 
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/FlinkAggregateExpandDistinctAggregatesRuleTest.xml
 
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/FlinkAggregateExpandDistinctAggregatesRuleTest.xml
index b0744cf..0268505 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/FlinkAggregateExpandDistinctAggregatesRuleTest.xml
+++ 
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/FlinkAggregateExpandDistinctAggregatesRuleTest.xml
@@ -109,22 +109,23 @@ FlinkLogicalCalc(select=[EXPR$0, EXPR$1, EXPR$2, EXPR$3])
   </TestCase>
   <TestCase name="testMultiDistinctAggOnDifferentColumn">
     <Resource name="sql">
-      <![CDATA[SELECT COUNT(DISTINCT a), SUM(DISTINCT b) FROM MyTable]]>
+      <![CDATA[SELECT COUNT(DISTINCT a), SUM(DISTINCT b),
+COUNT(DISTINCT c) FILTER (WHERE a > 5) FROM MyTable]]>
     </Resource>
     <Resource name="planBefore">
       <![CDATA[
-LogicalAggregate(group=[{}], EXPR$0=[COUNT(DISTINCT $0)], EXPR$1=[SUM(DISTINCT 
$1)])
-+- LogicalProject(a=[$0], b=[$1])
+LogicalAggregate(group=[{}], EXPR$0=[COUNT(DISTINCT $0)], EXPR$1=[SUM(DISTINCT 
$1)], EXPR$2=[COUNT(DISTINCT $2) FILTER $3])
++- LogicalProject(a=[$0], b=[$1], c=[$2], $f3=[IS TRUE(>($0, 5))])
    +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, 
source: [TestTableSource(a, b, c)]]])
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-FlinkLogicalAggregate(group=[{}], EXPR$0=[COUNT($0) FILTER $2], 
EXPR$1=[SUM($1) FILTER $3])
-+- FlinkLogicalCalc(select=[a, b, =(CASE(=($e, 1:BIGINT), 1:BIGINT, 2:BIGINT), 
1) AS $g_1, =(CASE(=($e, 1:BIGINT), 1:BIGINT, 2:BIGINT), 2) AS $g_2])
-   +- FlinkLogicalAggregate(group=[{0, 1, 2}])
-      +- FlinkLogicalExpand(projects=[{a=[$0], b=[null], $e=[1]}, {a=[null], 
b=[$1], $e=[2]}])
-         +- FlinkLogicalCalc(select=[a, b])
+FlinkLogicalAggregate(group=[{}], EXPR$0=[COUNT($0) FILTER $4], 
EXPR$1=[SUM($1) FILTER $5], EXPR$2=[COUNT($2) FILTER $6])
++- FlinkLogicalCalc(select=[a, b, c, CAST($f3) AS $f3, =(CASE(=($e, 7:BIGINT), 
7:BIGINT, =($e, 11:BIGINT), 11:BIGINT, 12:BIGINT), 7) AS $g_7, =(CASE(=($e, 
7:BIGINT), 7:BIGINT, =($e, 11:BIGINT), 11:BIGINT, 12:BIGINT), 11) AS $g_11, 
AND(=(CASE(=($e, 7:BIGINT), 7:BIGINT, =($e, 11:BIGINT), 11:BIGINT, 12:BIGINT), 
12), IS TRUE(CAST($f3))) AS $g_12])
+   +- FlinkLogicalAggregate(group=[{0, 1, 2, 3, 4}])
+      +- FlinkLogicalExpand(projects=[{a=[$0], b=[null], c=[null], $f3=[null], 
$e=[7]}, {a=[null], b=[$1], c=[null], $f3=[null], $e=[11]}, {a=[null], 
b=[null], c=[$2], $f3=[$3], $e=[12]}])
+         +- FlinkLogicalCalc(select=[a, b, c, IS TRUE(>(a, 5)) AS $f3])
             +- FlinkLogicalTableSourceScan(table=[[default_catalog, 
default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, 
c])
 ]]>
     </Resource>
@@ -156,21 +157,24 @@ FlinkLogicalCalc(select=[EXPR$0, EXPR$1])
   </TestCase>
   <TestCase name="testMultiDistinctAggOnSameColumn">
     <Resource name="sql">
-      <![CDATA[SELECT COUNT(DISTINCT a), SUM(DISTINCT a), MAX(DISTINCT a) FROM 
MyTable]]>
+      <![CDATA[SELECT COUNT(DISTINCT a), SUM(DISTINCT a) FILTER (WHERE b > 0),
+MAX(DISTINCT a) FROM MyTable]]>
     </Resource>
     <Resource name="planBefore">
       <![CDATA[
-LogicalAggregate(group=[{}], EXPR$0=[COUNT(DISTINCT $0)], EXPR$1=[SUM(DISTINCT 
$0)], EXPR$2=[MAX($0)])
-+- LogicalProject(a=[$0])
+LogicalAggregate(group=[{}], EXPR$0=[COUNT(DISTINCT $0)], EXPR$1=[SUM(DISTINCT 
$0) FILTER $1], EXPR$2=[MAX($0)])
++- LogicalProject(a=[$0], $f1=[IS TRUE(>($1, 0))])
    +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, 
source: [TestTableSource(a, b, c)]]])
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-FlinkLogicalAggregate(group=[{}], EXPR$0=[COUNT($0)], EXPR$1=[SUM($0)], 
EXPR$2=[MAX($0)])
-+- FlinkLogicalAggregate(group=[{0}])
-   +- FlinkLogicalCalc(select=[a])
-      +- FlinkLogicalTableSourceScan(table=[[default_catalog, 
default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, 
c])
+FlinkLogicalAggregate(group=[{}], EXPR$0=[COUNT($0) FILTER $4], 
EXPR$1=[SUM($0) FILTER $3], EXPR$2=[MIN($2) FILTER $5])
++- FlinkLogicalCalc(select=[a, CAST($f1) AS $f1, EXPR$2, AND(=(CASE(=($e, 
0:BIGINT), 0:BIGINT, =($e, 1:BIGINT), 1:BIGINT, 3:BIGINT), 0), IS 
TRUE(CAST($f1))) AS $g_0, =(CASE(=($e, 0:BIGINT), 0:BIGINT, =($e, 1:BIGINT), 
1:BIGINT, 3:BIGINT), 1) AS $g_1, =(CASE(=($e, 0:BIGINT), 0:BIGINT, =($e, 
1:BIGINT), 1:BIGINT, 3:BIGINT), 3) AS $g_3])
+   +- FlinkLogicalAggregate(group=[{0, 1, 2}], EXPR$2=[MAX($3)])
+      +- FlinkLogicalExpand(projects=[{a=[$0], $f1=[$1], $e=[0], a_0=[$0]}, 
{a=[$0], $f1=[null], $e=[1], a_0=[$0]}, {a=[null], $f1=[null], $e=[3], 
a_0=[$0]}])
+         +- FlinkLogicalCalc(select=[a, IS TRUE(>(b, 0)) AS $f1])
+            +- FlinkLogicalTableSourceScan(table=[[default_catalog, 
default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, 
c])
 ]]>
     </Resource>
   </TestCase>
@@ -199,22 +203,25 @@ FlinkLogicalCalc(select=[EXPR$0, EXPR$1, EXPR$2])
   </TestCase>
   <TestCase name="testMultiDistinctAndNonDistinctAggOnDifferentColumn">
     <Resource name="sql">
-      <![CDATA[SELECT COUNT(DISTINCT a), SUM(DISTINCT b), COUNT(c) FROM 
MyTable]]>
+      <![CDATA[SELECT COUNT(DISTINCT a) FILTER (WHERE c > 0),
+SUM(DISTINCT b), COUNT(c) FROM MyTable]]>
     </Resource>
     <Resource name="planBefore">
       <![CDATA[
-LogicalAggregate(group=[{}], EXPR$0=[COUNT(DISTINCT $0)], EXPR$1=[SUM(DISTINCT 
$1)], EXPR$2=[COUNT($2)])
-+- LogicalTableScan(table=[[default_catalog, default_database, MyTable, 
source: [TestTableSource(a, b, c)]]])
+LogicalAggregate(group=[{}], EXPR$0=[COUNT(DISTINCT $0) FILTER $1], 
EXPR$1=[SUM(DISTINCT $2)], EXPR$2=[COUNT($3)])
++- LogicalProject(a=[$0], $f1=[IS TRUE(>($2, 0))], b=[$1], c=[$2])
+   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, 
source: [TestTableSource(a, b, c)]]])
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
 FlinkLogicalCalc(select=[EXPR$0, EXPR$1, CAST(CASE(IS NOT NULL(EXPR$2), 
EXPR$2, 0)) AS EXPR$2])
-+- FlinkLogicalAggregate(group=[{}], EXPR$0=[COUNT($0) FILTER $3], 
EXPR$1=[SUM($1) FILTER $4], EXPR$2=[MIN($2) FILTER $5])
-   +- FlinkLogicalCalc(select=[a, b, EXPR$2, =(CASE(=($e, 1:BIGINT), 1:BIGINT, 
=($e, 2:BIGINT), 2:BIGINT, 3:BIGINT), 1) AS $g_1, =(CASE(=($e, 1:BIGINT), 
1:BIGINT, =($e, 2:BIGINT), 2:BIGINT, 3:BIGINT), 2) AS $g_2, =(CASE(=($e, 
1:BIGINT), 1:BIGINT, =($e, 2:BIGINT), 2:BIGINT, 3:BIGINT), 3) AS $g_3])
-      +- FlinkLogicalAggregate(group=[{0, 1, 3}], EXPR$2=[COUNT($2)])
-         +- FlinkLogicalExpand(projects=[{a=[$0], b=[null], c=[$2], $e=[1]}, 
{a=[null], b=[$1], c=[$2], $e=[2]}, {a=[null], b=[null], c=[$2], $e=[3]}])
-            +- FlinkLogicalTableSourceScan(table=[[default_catalog, 
default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, 
c])
++- FlinkLogicalAggregate(group=[{}], EXPR$0=[COUNT($0) FILTER $4], 
EXPR$1=[SUM($2) FILTER $5], EXPR$2=[MIN($3) FILTER $6])
+   +- FlinkLogicalCalc(select=[a, CAST($f1) AS $f1, b, EXPR$2, 
AND(=(CASE(=($e, 1:BIGINT), 1:BIGINT, =($e, 6:BIGINT), 6:BIGINT, 7:BIGINT), 1), 
IS TRUE(CAST($f1))) AS $g_1, =(CASE(=($e, 1:BIGINT), 1:BIGINT, =($e, 6:BIGINT), 
6:BIGINT, 7:BIGINT), 6) AS $g_6, =(CASE(=($e, 1:BIGINT), 1:BIGINT, =($e, 
6:BIGINT), 6:BIGINT, 7:BIGINT), 7) AS $g_7])
+      +- FlinkLogicalAggregate(group=[{0, 1, 2, 4}], EXPR$2=[COUNT($3)])
+         +- FlinkLogicalExpand(projects=[{a=[$0], $f1=[$1], b=[null], c=[$3], 
$e=[1]}, {a=[null], $f1=[null], b=[$2], c=[$3], $e=[6]}, {a=[null], $f1=[null], 
b=[null], c=[$3], $e=[7]}])
+            +- FlinkLogicalCalc(select=[a, IS TRUE(>(c, 0)) AS $f1, b, c])
+               +- FlinkLogicalTableSourceScan(table=[[default_catalog, 
default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, 
c])
 ]]>
     </Resource>
   </TestCase>
@@ -264,34 +271,34 @@ FlinkLogicalAggregate(group=[{}], EXPR$0=[COUNT($0)])
   </TestCase>
   <TestCase name="testSingleDistinctAggAndOneOrMultiNonDistinctAgg1">
     <Resource name="sql">
-      <![CDATA[SELECT COUNT(DISTINCT a), SUM(b) FROM MyTable]]>
+      <![CDATA[SELECT COUNT(DISTINCT a) FILTER (WHERE a > 0), SUM(b) FROM 
MyTable]]>
     </Resource>
     <Resource name="planBefore">
       <![CDATA[
-LogicalAggregate(group=[{}], EXPR$0=[COUNT(DISTINCT $0)], EXPR$1=[SUM($1)])
-+- LogicalProject(a=[$0], b=[$1])
+LogicalAggregate(group=[{}], EXPR$0=[COUNT(DISTINCT $0) FILTER $1], 
EXPR$1=[SUM($2)])
++- LogicalProject(a=[$0], $f1=[IS TRUE(>($0, 0))], b=[$1])
    +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, 
source: [TestTableSource(a, b, c)]]])
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-FlinkLogicalAggregate(group=[{}], EXPR$0=[COUNT($0) FILTER $2], 
EXPR$1=[MIN($1) FILTER $3])
-+- FlinkLogicalCalc(select=[a, EXPR$1, =(CASE(=($e, 0:BIGINT), 0:BIGINT, 
1:BIGINT), 0) AS $g_0, =(CASE(=($e, 0:BIGINT), 0:BIGINT, 1:BIGINT), 1) AS $g_1])
-   +- FlinkLogicalAggregate(group=[{0, 2}], EXPR$1=[SUM($1)])
-      +- FlinkLogicalExpand(projects=[{a=[$0], b=[$1], $e=[0]}, {a=[null], 
b=[$1], $e=[1]}])
-         +- FlinkLogicalCalc(select=[a, b])
+FlinkLogicalAggregate(group=[{}], EXPR$0=[COUNT($0) FILTER $3], 
EXPR$1=[MIN($2) FILTER $4])
++- FlinkLogicalCalc(select=[a, CAST($f1) AS $f1, EXPR$1, AND(=(CASE(=($e, 
0:BIGINT), 0:BIGINT, 3:BIGINT), 0), IS TRUE(CAST($f1))) AS $g_0, =(CASE(=($e, 
0:BIGINT), 0:BIGINT, 3:BIGINT), 3) AS $g_3])
+   +- FlinkLogicalAggregate(group=[{0, 1, 3}], EXPR$1=[SUM($2)])
+      +- FlinkLogicalExpand(projects=[{a=[$0], $f1=[$1], b=[$2], $e=[0]}, 
{a=[null], $f1=[null], b=[$2], $e=[3]}])
+         +- FlinkLogicalCalc(select=[a, IS TRUE(>(a, 0)) AS $f1, b])
             +- FlinkLogicalTableSourceScan(table=[[default_catalog, 
default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, 
c])
 ]]>
     </Resource>
   </TestCase>
   <TestCase name="testSingleDistinctAggAndOneOrMultiNonDistinctAgg2">
     <Resource name="sql">
-      <![CDATA[SELECT COUNT(a), SUM(DISTINCT b) FROM MyTable]]>
+      <![CDATA[SELECT COUNT(a) filter (WHERE a > 0), SUM(DISTINCT b) FROM 
MyTable]]>
     </Resource>
     <Resource name="planBefore">
       <![CDATA[
-LogicalAggregate(group=[{}], EXPR$0=[COUNT($0)], EXPR$1=[SUM(DISTINCT $1)])
-+- LogicalProject(a=[$0], b=[$1])
+LogicalAggregate(group=[{}], EXPR$0=[COUNT($0) FILTER $1], 
EXPR$1=[SUM(DISTINCT $2)])
++- LogicalProject(a=[$0], $f1=[IS TRUE(>($0, 0))], b=[$1])
    +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, 
source: [TestTableSource(a, b, c)]]])
 ]]>
     </Resource>
@@ -300,9 +307,9 @@ LogicalAggregate(group=[{}], EXPR$0=[COUNT($0)], 
EXPR$1=[SUM(DISTINCT $1)])
 FlinkLogicalCalc(select=[CAST(CASE(IS NOT NULL(EXPR$0), EXPR$0, 0)) AS EXPR$0, 
EXPR$1])
 +- FlinkLogicalAggregate(group=[{}], EXPR$0=[MIN($1) FILTER $3], 
EXPR$1=[SUM($0) FILTER $2])
    +- FlinkLogicalCalc(select=[b, EXPR$0, =(CASE(=($e, 0:BIGINT), 0:BIGINT, 
1:BIGINT), 0) AS $g_0, =(CASE(=($e, 0:BIGINT), 0:BIGINT, 1:BIGINT), 1) AS $g_1])
-      +- FlinkLogicalAggregate(group=[{1, 2}], EXPR$0=[COUNT($0)])
-         +- FlinkLogicalExpand(projects=[{a=[$0], b=[$1], $e=[0]}, {a=[$0], 
b=[null], $e=[1]}])
-            +- FlinkLogicalCalc(select=[a, b])
+      +- FlinkLogicalAggregate(group=[{2, 3}], EXPR$0=[COUNT($0) FILTER $1])
+         +- FlinkLogicalExpand(projects=[{a=[$0], $f1=[$1], b=[$2], $e=[0]}, 
{a=[$0], $f1=[$1], b=[null], $e=[1]}])
+            +- FlinkLogicalCalc(select=[a, IS TRUE(>(a, 0)) AS $f1, b])
                +- FlinkLogicalTableSourceScan(table=[[default_catalog, 
default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, 
c])
 ]]>
     </Resource>
@@ -490,4 +497,118 @@ FlinkLogicalCalc(select=[a, CAST(EXPR$1) AS EXPR$1, 
EXPR$2, EXPR$3])
 ]]>
     </Resource>
   </TestCase>
+    <TestCase name="TestMultiDistinctOnDifferentColumnWithFilter">
+    <Resource name="sql">
+      <![CDATA[SELECT d, COUNT(DISTINCT c), COUNT(DISTINCT c) FILTER (WHERE a 
> 0),
+COUNT(DISTINCT b) FILTER (WHERE b > 1) FROM MyTable2 GROUP BY d]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalAggregate(group=[{0}], EXPR$1=[COUNT(DISTINCT $1)], 
EXPR$2=[COUNT(DISTINCT $1) FILTER $2], EXPR$3=[COUNT(DISTINCT $3) FILTER $4])
++- LogicalProject(d=[$3], c=[$2], $f2=[IS TRUE(>($0, 0))], b=[$1], $f4=[IS 
TRUE(>($1, 1))])
+   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2, 
source: [TestTableSource(a, b, c, d, e)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+FlinkLogicalAggregate(group=[{0}], EXPR$1=[COUNT($1) FILTER $6], 
EXPR$2=[COUNT($1) FILTER $5], EXPR$3=[COUNT($3) FILTER $7])
++- FlinkLogicalCalc(select=[d, c, CAST($f2) AS $f2, b, CAST($f4) AS $f4, 
AND(=(CASE(=($e, 3:BIGINT), 3:BIGINT, =($e, 7:BIGINT), 7:BIGINT, 12:BIGINT), 
3), IS TRUE(CAST($f2))) AS $g_3, =(CASE(=($e, 3:BIGINT), 3:BIGINT, =($e, 
7:BIGINT), 7:BIGINT, 12:BIGINT), 7) AS $g_7, AND(=(CASE(=($e, 3:BIGINT), 
3:BIGINT, =($e, 7:BIGINT), 7:BIGINT, 12:BIGINT), 12), IS TRUE(CAST($f4))) AS 
$g_12])
+   +- FlinkLogicalAggregate(group=[{0, 1, 2, 3, 4, 5}])
+      +- FlinkLogicalExpand(projects=[{d=[$0], c=[$1], $f2=[$2], b=[null], 
$f4=[null], $e=[3]}, {d=[$0], c=[$1], $f2=[null], b=[null], $f4=[null], 
$e=[7]}, {d=[$0], c=[null], $f2=[null], b=[$3], $f4=[$4], $e=[12]}])
+         +- FlinkLogicalCalc(select=[d, c, IS TRUE(>(a, 0)) AS $f2, b, IS 
TRUE(>(b, 1)) AS $f4])
+            +- FlinkLogicalTableSourceScan(table=[[default_catalog, 
default_database, MyTable2, source: [TestTableSource(a, b, c, d, e)]]], 
fields=[a, b, c, d, e])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="TestMultiDistinctWithFilterAndNonDistinctAgg">
+    <Resource name="sql">
+      <![CDATA[SELECT d, COUNT(DISTINCT c), COUNT(DISTINCT c) FILTER (WHERE a 
> 0),
+MAX(e), MIN(e) FROM MyTable2 GROUP BY d]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalAggregate(group=[{0}], EXPR$1=[COUNT(DISTINCT $1)], 
EXPR$2=[COUNT(DISTINCT $1) FILTER $2], EXPR$3=[MAX($3)], EXPR$4=[MIN($3)])
++- LogicalProject(d=[$3], c=[$2], $f2=[IS TRUE(>($0, 0))], e=[$4])
+   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2, 
source: [TestTableSource(a, b, c, d, e)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+FlinkLogicalAggregate(group=[{0}], EXPR$1=[COUNT($1) FILTER $6], 
EXPR$2=[COUNT($1) FILTER $5], EXPR$3=[MIN($3) FILTER $7], EXPR$4=[MIN($4) 
FILTER $7])
++- FlinkLogicalCalc(select=[d, c, CAST($f2) AS $f2, EXPR$3, EXPR$4, 
AND(=(CASE(=($e, 0:BIGINT), 0:BIGINT, =($e, 1:BIGINT), 1:BIGINT, 3:BIGINT), 0), 
IS TRUE(CAST($f2))) AS $g_0, =(CASE(=($e, 0:BIGINT), 0:BIGINT, =($e, 1:BIGINT), 
1:BIGINT, 3:BIGINT), 1) AS $g_1, =(CASE(=($e, 0:BIGINT), 0:BIGINT, =($e, 
1:BIGINT), 1:BIGINT, 3:BIGINT), 3) AS $g_3])
+   +- FlinkLogicalAggregate(group=[{0, 1, 2, 4}], EXPR$3=[MAX($3)], 
EXPR$4=[MIN($3)])
+      +- FlinkLogicalExpand(projects=[{d=[$0], c=[$1], $f2=[$2], e=[$3], 
$e=[0]}, {d=[$0], c=[$1], $f2=[null], e=[$3], $e=[1]}, {d=[$0], c=[null], 
$f2=[null], e=[$3], $e=[3]}])
+         +- FlinkLogicalCalc(select=[d, c, IS TRUE(>(a, 0)) AS $f2, e])
+            +- FlinkLogicalTableSourceScan(table=[[default_catalog, 
default_database, MyTable2, source: [TestTableSource(a, b, c, d, e)]]], 
fields=[a, b, c, d, e])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testMultiDistinctOnSameColumnWithFilter">
+    <Resource name="sql">
+      <![CDATA[SELECT d, COUNT(DISTINCT c), COUNT(DISTINCT c) FILTER (WHERE a 
> 10),
+COUNT(DISTINCT c) FILTER (WHERE a < 10) FROM MyTable2 GROUP BY d]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalAggregate(group=[{0}], EXPR$1=[COUNT(DISTINCT $1)], 
EXPR$2=[COUNT(DISTINCT $1) FILTER $2], EXPR$3=[COUNT(DISTINCT $1) FILTER $3])
++- LogicalProject(d=[$3], c=[$2], $f2=[IS TRUE(>($0, 10))], $f3=[IS TRUE(<($0, 
10))])
+   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2, 
source: [TestTableSource(a, b, c, d, e)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+FlinkLogicalAggregate(group=[{0}], EXPR$1=[COUNT($1) FILTER $6], 
EXPR$2=[COUNT($1) FILTER $4], EXPR$3=[COUNT($1) FILTER $5])
++- FlinkLogicalCalc(select=[d, c, CAST($f2) AS $f2, CAST($f3) AS $f3, 
AND(=(CASE(=($e, 1:BIGINT), 1:BIGINT, =($e, 2:BIGINT), 2:BIGINT, 3:BIGINT), 1), 
IS TRUE(CAST($f2))) AS $g_1, AND(=(CASE(=($e, 1:BIGINT), 1:BIGINT, =($e, 
2:BIGINT), 2:BIGINT, 3:BIGINT), 2), IS TRUE(CAST($f3))) AS $g_2, =(CASE(=($e, 
1:BIGINT), 1:BIGINT, =($e, 2:BIGINT), 2:BIGINT, 3:BIGINT), 3) AS $g_3])
+   +- FlinkLogicalAggregate(group=[{0, 1, 2, 3, 4}])
+      +- FlinkLogicalExpand(projects=[{d=[$0], c=[$1], $f2=[$2], $f3=[null], 
$e=[1]}, {d=[$0], c=[$1], $f2=[null], $f3=[$3], $e=[2]}, {d=[$0], c=[$1], 
$f2=[null], $f3=[null], $e=[3]}])
+         +- FlinkLogicalCalc(select=[d, c, IS TRUE(>(a, 10)) AS $f2, IS 
TRUE(<(a, 10)) AS $f3])
+            +- FlinkLogicalTableSourceScan(table=[[default_catalog, 
default_database, MyTable2, source: [TestTableSource(a, b, c, d, e)]]], 
fields=[a, b, c, d, e])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testSingleDistinctWithFilter">
+    <Resource name="sql">
+      <![CDATA[SELECT d, COUNT(DISTINCT c) FILTER (WHERE a > 0) FROM MyTable2 
GROUP BY d]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalAggregate(group=[{0}], EXPR$1=[COUNT(DISTINCT $1) FILTER $2])
++- LogicalProject(d=[$3], c=[$2], $f2=[IS TRUE(>($0, 0))])
+   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2, 
source: [TestTableSource(a, b, c, d, e)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+FlinkLogicalAggregate(group=[{0}], EXPR$1=[COUNT($2)])
++- FlinkLogicalAggregate(group=[{0, 1, 2}])
+   +- FlinkLogicalCalc(select=[d, IS TRUE(>(a, 0)) AS $f2, CASE(IS TRUE(>(a, 
0)), c, null:VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS i$c])
+      +- FlinkLogicalTableSourceScan(table=[[default_catalog, 
default_database, MyTable2, source: [TestTableSource(a, b, c, d, e)]]], 
fields=[a, b, c, d, e])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testMultiDistinctAndNonDistinctAggWithFilter">
+    <Resource name="sql">
+      <![CDATA[SELECT d, MAX(e), MAX(e) FILTER (WHERE a < 10), COUNT(DISTINCT 
c),
+COUNT(DISTINCT c) FILTER (WHERE a > 5), COUNT(DISTINCT b) FILTER (WHERE b > 3)
+FROM MyTable2 GROUP BY d]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalAggregate(group=[{0}], EXPR$1=[MAX($1)], EXPR$2=[MAX($1) FILTER $2], 
EXPR$3=[COUNT(DISTINCT $3)], EXPR$4=[COUNT(DISTINCT $3) FILTER $4], 
EXPR$5=[COUNT(DISTINCT $5) FILTER $6])
++- LogicalProject(d=[$3], e=[$4], $f2=[IS TRUE(<($0, 10))], c=[$2], $f4=[IS 
TRUE(>($0, 5))], b=[$1], $f6=[IS TRUE(>($1, 3))])
+   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2, 
source: [TestTableSource(a, b, c, d, e)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+FlinkLogicalAggregate(group=[{0}], EXPR$1=[MIN($5) FILTER $10], 
EXPR$2=[MIN($6) FILTER $10], EXPR$3=[COUNT($1) FILTER $8], EXPR$4=[COUNT($1) 
FILTER $7], EXPR$5=[COUNT($3) FILTER $9])
++- FlinkLogicalCalc(select=[d, c, CAST($f4) AS $f4, b, CAST($f6) AS $f6, 
EXPR$1, EXPR$2, AND(=(CASE(=($e, 3:BIGINT), 3:BIGINT, =($e, 7:BIGINT), 
7:BIGINT, =($e, 12:BIGINT), 12:BIGINT, 15:BIGINT), 3), IS TRUE(CAST($f4))) AS 
$g_3, =(CASE(=($e, 3:BIGINT), 3:BIGINT, =($e, 7:BIGINT), 7:BIGINT, =($e, 
12:BIGINT), 12:BIGINT, 15:BIGINT), 7) AS $g_7, AND(=(CASE(=($e, 3:BIGINT), 
3:BIGINT, =($e, 7:BIGINT), 7:BIGINT, =($e, 12:BIGINT), 12:BIGINT, 15:BIGINT), 
12), IS TRUE(CAST($f6))) AS $g_12, =(CASE(=( [...]
+   +- FlinkLogicalAggregate(group=[{0, 3, 4, 5, 6, 7}], EXPR$1=[MAX($1)], 
EXPR$2=[MAX($1) FILTER $2])
+      +- FlinkLogicalExpand(projects=[{d=[$0], e=[$1], $f2=[$2], c=[$3], 
$f4=[$4], b=[null], $f6=[null], $e=[3]}, {d=[$0], e=[$1], $f2=[$2], c=[$3], 
$f4=[null], b=[null], $f6=[null], $e=[7]}, {d=[$0], e=[$1], $f2=[$2], c=[null], 
$f4=[null], b=[$5], $f6=[$6], $e=[12]}, {d=[$0], e=[$1], $f2=[$2], c=[null], 
$f4=[null], b=[null], $f6=[null], $e=[15]}])
+         +- FlinkLogicalCalc(select=[d, e, IS TRUE(<(a, 10)) AS $f2, c, IS 
TRUE(>(a, 5)) AS $f4, b, IS TRUE(>(b, 3)) AS $f6])
+            +- FlinkLogicalTableSourceScan(table=[[default_catalog, 
default_database, MyTable2, source: [TestTableSource(a, b, c, d, e)]]], 
fields=[a, b, c, d, e])
+]]>
+    </Resource>
+  </TestCase>
+  
 </Root>
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/agg/DistinctAggregateTest.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/agg/DistinctAggregateTest.scala
index 85f4e9c..008b5a8 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/agg/DistinctAggregateTest.scala
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/agg/DistinctAggregateTest.scala
@@ -18,68 +18,8 @@
 
 package org.apache.flink.table.planner.plan.batch.sql.agg
 
-import org.apache.flink.api.scala._
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.planner.utils.TableTestBase
+import org.apache.flink.table.planner.plan.common.DistinctAggregateTestBase
 
-import org.junit.Test
-
-class DistinctAggregateTest extends TableTestBase {
-  private val util = batchTestUtil()
-  util.addTableSource[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-
-  @Test
-  def testSingleDistinctAggregate(): Unit = {
-    util.verifyPlan("SELECT COUNT(DISTINCT a) FROM MyTable")
-  }
-
-  @Test
-  def testMultiDistinctAggregateOnSameColumn(): Unit = {
-    util.verifyPlan("SELECT COUNT(DISTINCT a), SUM(DISTINCT a), MAX(DISTINCT 
a) FROM MyTable")
-  }
-
-  @Test
-  def testSingleDistinctAggregateAndOneOrMultiNonDistinctAggregate(): Unit = {
-    // case 0x00: DISTINCT on COUNT and Non-DISTINCT on others
-    util.verifyPlan("SELECT COUNT(DISTINCT a), SUM(b) FROM MyTable")
-  }
-
-  @Test
-  def testSingleDistinctAggregateAndOneOrMultiNonDistinctAggregate2(): Unit = {
-    // case 0x01: Non-DISTINCT on COUNT and DISTINCT on others
-    util.verifyPlan("SELECT COUNT(a), SUM(DISTINCT b) FROM MyTable")
-  }
-
-  @Test
-  def testMultiDistinctAggregateOnDifferentColumn(): Unit = {
-    util.verifyPlan("SELECT COUNT(DISTINCT a), SUM(DISTINCT b) FROM MyTable")
-  }
-
-  @Test
-  def testMultiDistinctAndNonDistinctAggregateOnDifferentColumn(): Unit = {
-    util.verifyPlan("SELECT COUNT(DISTINCT a), SUM(DISTINCT b), COUNT(c) FROM 
MyTable")
-  }
-
-  @Test
-  def testSingleDistinctAggregateWithGrouping(): Unit = {
-    util.verifyPlan("SELECT a, COUNT(a), SUM(DISTINCT b) FROM MyTable GROUP BY 
a")
-  }
-
-  @Test
-  def testSingleDistinctAggregateWithGroupingAndCountStar(): Unit = {
-    util.verifyPlan("SELECT a, COUNT(*), SUM(DISTINCT b) FROM MyTable GROUP BY 
a")
-  }
-
-  @Test
-  def testTwoDistinctAggregateWithGroupingAndCountStar(): Unit = {
-    val sqlQuery = "SELECT a, COUNT(*), SUM(DISTINCT b), COUNT(DISTINCT b) 
FROM MyTable GROUP BY a"
-    util.verifyPlan(sqlQuery)
-  }
-
-  @Test
-  def testTwoDifferentDistinctAggregateWithGroupingAndCountStar(): Unit = {
-    val sqlQuery = "SELECT a, COUNT(*), SUM(DISTINCT b), COUNT(DISTINCT c) 
FROM MyTable GROUP BY a"
-    util.verifyPlan(sqlQuery)
-  }
+class DistinctAggregateTest extends DistinctAggregateTestBase {
 
 }
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/FlinkAggregateExpandDistinctAggregatesRuleTest.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/common/DistinctAggregateTestBase.scala
similarity index 69%
copy from 
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/FlinkAggregateExpandDistinctAggregatesRuleTest.scala
copy to 
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/common/DistinctAggregateTestBase.scala
index 9552e81..d631892 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/FlinkAggregateExpandDistinctAggregatesRuleTest.scala
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/common/DistinctAggregateTestBase.scala
@@ -16,62 +16,57 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.planner.plan.rules.logical
+package org.apache.flink.table.planner.plan.common
 
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.scala._
 import org.apache.flink.table.api.Types
 import org.apache.flink.table.api.scala._
-import org.apache.flink.table.planner.plan.optimize.program.FlinkBatchProgram
-import org.apache.flink.table.planner.utils.TableTestBase
+import org.apache.flink.table.planner.utils.{BatchTableTestUtil, TableTestBase}
+import org.junit.{Before, Test}
 
-import org.junit.Test
-
-/**
-  * Test for [[FlinkAggregateExpandDistinctAggregatesRule]].
-  */
-class FlinkAggregateExpandDistinctAggregatesRuleTest extends TableTestBase {
-  private val util = batchTestUtil()
-  util.addTableSource[(Int, Long, Int)]("MyTable", 'a, 'b, 'c)
-  util.addTableSource[(Int, Long, String, String, String)]("MyTable2", 'a, 'b, 
'c, 'd, 'e)
-  util.buildBatchProgram(FlinkBatchProgram.PHYSICAL)
+abstract class DistinctAggregateTestBase extends TableTestBase {
+  protected val util: BatchTableTestUtil = batchTestUtil()
 
+  @Before
+  def setup(): Unit = {
+    util.addTableSource[(Int, Long, Int)]("MyTable", 'a, 'b, 'c)
+    util.addTableSource[(Int, Long, String, String, String)]("MyTable2", 'a, 
'b, 'c, 'd, 'e)
+  }
   @Test
   def testSingleDistinctAgg(): Unit = {
     util.verifyPlan("SELECT COUNT(DISTINCT a) FROM MyTable")
   }
 
   @Test
-  def testSingleDistinctAggOnMultiColumns(): Unit = {
-    util.verifyPlan("SELECT COUNT(DISTINCT a, b) FROM MyTable")
-  }
-
-  @Test
   def testMultiDistinctAggOnSameColumn(): Unit = {
-    util.verifyPlan("SELECT COUNT(DISTINCT a), SUM(DISTINCT a), MAX(DISTINCT 
a) FROM MyTable")
+    util.verifyPlan("SELECT COUNT(DISTINCT a), SUM(DISTINCT a) FILTER (WHERE b 
> 0),\n" +
+      "MAX(DISTINCT a) FROM MyTable")
   }
 
   @Test
   def testSingleDistinctAggAndOneOrMultiNonDistinctAgg1(): Unit = {
     // case 0x00: DISTINCT on COUNT and Non-DISTINCT on others
-    util.verifyPlan("SELECT COUNT(DISTINCT a), SUM(b) FROM MyTable")
+    util.verifyPlan("SELECT COUNT(DISTINCT a) FILTER (WHERE a > 0), SUM(b) 
FROM MyTable")
   }
 
   @Test
   def testSingleDistinctAggAndOneOrMultiNonDistinctAgg2(): Unit = {
     // case 0x01: Non-DISTINCT on COUNT and DISTINCT on others
     // when field `a` is non-nullable, count(a) = count(*)
-    util.verifyPlan("SELECT COUNT(a), SUM(DISTINCT b) FROM MyTable")
+    util.verifyPlan("SELECT COUNT(a) filter (WHERE a > 0), SUM(DISTINCT b) 
FROM MyTable")
   }
 
   @Test
   def testMultiDistinctAggOnDifferentColumn(): Unit = {
-    util.verifyPlan("SELECT COUNT(DISTINCT a), SUM(DISTINCT b) FROM MyTable")
+    util.verifyPlan("SELECT COUNT(DISTINCT a), SUM(DISTINCT b),\n" +
+      "COUNT(DISTINCT c) FILTER (WHERE a > 5) FROM MyTable")
   }
 
   @Test
   def testMultiDistinctAndNonDistinctAggOnDifferentColumn(): Unit = {
-    util.verifyPlan("SELECT COUNT(DISTINCT a), SUM(DISTINCT b), COUNT(c) FROM 
MyTable")
+    util.verifyPlan("SELECT COUNT(DISTINCT a) FILTER (WHERE c > 0),\n" +
+      "SUM(DISTINCT b), COUNT(c) FROM MyTable")
   }
 
   @Test
@@ -121,11 +116,6 @@ class FlinkAggregateExpandDistinctAggregatesRuleTest 
extends TableTestBase {
   }
 
   @Test
-  def testSingleDistinctAggOnMultiColumnsWithGroupingSets(): Unit = {
-    util.verifyPlan("SELECT COUNT(DISTINCT a, b) FROM MyTable2 GROUP BY 
GROUPING SETS (c, d)")
-  }
-
-  @Test
   def testMultiDistinctAggOnSameColumnWithGroupingSets(): Unit = {
     val sqlQuery = "SELECT COUNT(DISTINCT a), SUM(DISTINCT a), MAX(DISTINCT a) 
" +
       "FROM MyTable2 GROUP BY GROUPING SETS (b, c)"
@@ -158,6 +148,41 @@ class FlinkAggregateExpandDistinctAggregatesRuleTest 
extends TableTestBase {
     util.verifyPlan(sqlQuery)
   }
 
+  @Test
+  def testSingleDistinctWithFilter(): Unit = {
+    val sqlQuery = "SELECT d, COUNT(DISTINCT c) FILTER (WHERE a > 0) FROM 
MyTable2 GROUP BY d"
+    util.verifyPlan(sqlQuery)
+  }
+
+  @Test
+  def testMultiDistinctOnSameColumnWithFilter(): Unit = {
+    val sqlQuery = "SELECT d, COUNT(DISTINCT c), COUNT(DISTINCT c) FILTER 
(WHERE a > 10),\n" +
+      "COUNT(DISTINCT c) FILTER (WHERE a < 10) FROM MyTable2 GROUP BY d"
+    util.verifyPlan(sqlQuery)
+  }
+
+  @Test
+  def TestMultiDistinctOnDifferentColumnWithFilter(): Unit = {
+    val sqlQuery = "SELECT d, COUNT(DISTINCT c), COUNT(DISTINCT c) FILTER 
(WHERE a > 0),\n" +
+      "COUNT(DISTINCT b) FILTER (WHERE b > 1) FROM MyTable2 GROUP BY d"
+    util.verifyPlan(sqlQuery)
+  }
+
+  @Test
+  def TestMultiDistinctWithFilterAndNonDistinctAgg(): Unit = {
+    val sqlQuery = "SELECT d, COUNT(DISTINCT c), COUNT(DISTINCT c) FILTER 
(WHERE a > 0),\n" +
+      "MAX(e), MIN(e) FROM MyTable2 GROUP BY d"
+    util.verifyPlan(sqlQuery)
+  }
+
+  @Test
+  def testMultiDistinctAndNonDistinctAggWithFilter(): Unit = {
+    val sqlQuery = "SELECT d, MAX(e), MAX(e) FILTER (WHERE a < 10), 
COUNT(DISTINCT c),\n" +
+      "COUNT(DISTINCT c) FILTER (WHERE a > 5), COUNT(DISTINCT b) FILTER (WHERE 
b > 3)\n" +
+      "FROM MyTable2 GROUP BY d"
+    util.verifyPlan(sqlQuery)
+  }
+
   @Test(expected = classOf[RuntimeException])
   def testTooManyDistinctAggOnDifferentColumn(): Unit = {
     // max group count must be less than 64
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/FlinkAggregateExpandDistinctAggregatesRuleTest.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/FlinkAggregateExpandDistinctAggregatesRuleTest.scala
index 9552e81..d80d641 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/FlinkAggregateExpandDistinctAggregatesRuleTest.scala
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/FlinkAggregateExpandDistinctAggregatesRuleTest.scala
@@ -18,27 +18,19 @@
 
 package org.apache.flink.table.planner.plan.rules.logical
 
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.scala._
-import org.apache.flink.table.api.Types
-import org.apache.flink.table.api.scala._
+import org.apache.flink.table.planner.plan.common.DistinctAggregateTestBase
 import org.apache.flink.table.planner.plan.optimize.program.FlinkBatchProgram
-import org.apache.flink.table.planner.utils.TableTestBase
-
-import org.junit.Test
+import org.junit.{Before, Test}
 
 /**
   * Test for [[FlinkAggregateExpandDistinctAggregatesRule]].
   */
-class FlinkAggregateExpandDistinctAggregatesRuleTest extends TableTestBase {
-  private val util = batchTestUtil()
-  util.addTableSource[(Int, Long, Int)]("MyTable", 'a, 'b, 'c)
-  util.addTableSource[(Int, Long, String, String, String)]("MyTable2", 'a, 'b, 
'c, 'd, 'e)
-  util.buildBatchProgram(FlinkBatchProgram.PHYSICAL)
+class FlinkAggregateExpandDistinctAggregatesRuleTest extends 
DistinctAggregateTestBase {
 
-  @Test
-  def testSingleDistinctAgg(): Unit = {
-    util.verifyPlan("SELECT COUNT(DISTINCT a) FROM MyTable")
+  @Before
+  override def setup(): Unit = {
+    util.buildBatchProgram(FlinkBatchProgram.PHYSICAL)
+    super.setup()
   }
 
   @Test
@@ -47,128 +39,7 @@ class FlinkAggregateExpandDistinctAggregatesRuleTest 
extends TableTestBase {
   }
 
   @Test
-  def testMultiDistinctAggOnSameColumn(): Unit = {
-    util.verifyPlan("SELECT COUNT(DISTINCT a), SUM(DISTINCT a), MAX(DISTINCT 
a) FROM MyTable")
-  }
-
-  @Test
-  def testSingleDistinctAggAndOneOrMultiNonDistinctAgg1(): Unit = {
-    // case 0x00: DISTINCT on COUNT and Non-DISTINCT on others
-    util.verifyPlan("SELECT COUNT(DISTINCT a), SUM(b) FROM MyTable")
-  }
-
-  @Test
-  def testSingleDistinctAggAndOneOrMultiNonDistinctAgg2(): Unit = {
-    // case 0x01: Non-DISTINCT on COUNT and DISTINCT on others
-    // when field `a` is non-nullable, count(a) = count(*)
-    util.verifyPlan("SELECT COUNT(a), SUM(DISTINCT b) FROM MyTable")
-  }
-
-  @Test
-  def testMultiDistinctAggOnDifferentColumn(): Unit = {
-    util.verifyPlan("SELECT COUNT(DISTINCT a), SUM(DISTINCT b) FROM MyTable")
-  }
-
-  @Test
-  def testMultiDistinctAndNonDistinctAggOnDifferentColumn(): Unit = {
-    util.verifyPlan("SELECT COUNT(DISTINCT a), SUM(DISTINCT b), COUNT(c) FROM 
MyTable")
-  }
-
-  @Test
-  def testSingleDistinctAggWithGroupBy(): Unit = {
-    // when field `a` is non-nullable, count(a) = count(*)
-    util.verifyPlan("SELECT a, COUNT(a), SUM(DISTINCT b) FROM MyTable GROUP BY 
a")
-  }
-
-  @Test
-  def testSingleDistinctAggWithGroupByAndCountStar(): Unit = {
-    util.verifyPlan("SELECT a, COUNT(*), SUM(DISTINCT b) FROM MyTable GROUP BY 
a")
-  }
-
-  @Test
-  def testTwoDistinctAggWithGroupByAndCountStar(): Unit = {
-    val sqlQuery = "SELECT a, COUNT(*), SUM(DISTINCT b), COUNT(DISTINCT b) 
FROM MyTable GROUP BY a"
-    util.verifyPlan(sqlQuery)
-  }
-
-  @Test
-  def testTwoDifferentDistinctAggWithGroupByAndCountStar(): Unit = {
-    val sqlQuery = "SELECT a, COUNT(*), SUM(DISTINCT b), COUNT(DISTINCT c) 
FROM MyTable GROUP BY a"
-    util.verifyPlan(sqlQuery)
-  }
-
-  @Test
-  def testMultiDifferentDistinctAggWithNonDistinctAggOnSameColumn(): Unit = {
-    util.verifyPlan("SELECT COUNT(DISTINCT a), SUM(DISTINCT b), MAX(a), MIN(a) 
FROM MyTable")
-  }
-
-  @Test
-  def testMultiDifferentDistinctAggWithNonDistinctAggOnSameColumnAndGroupBy(): 
Unit = {
-    val sqlQuery =
-      "SELECT COUNT(DISTINCT a), SUM(DISTINCT b), MAX(a), MIN(a) FROM MyTable 
GROUP BY c"
-    util.verifyPlan(sqlQuery)
-  }
-
-  @Test
-  def 
testMultiDifferentDistinctAggWithNonDistinctAggOnDifferentColumnAndGroupBy(): 
Unit = {
-    util.verifyPlan("SELECT SUM(DISTINCT a), COUNT(DISTINCT c) FROM MyTable 
GROUP BY b")
-  }
-
-  @Test
-  def testDistinctAggWithDuplicateField(): Unit = {
-    // when field `a` is non-nullable, count(a) = count(*)
-    util.verifyPlan("SELECT a, COUNT(a), SUM(b), SUM(DISTINCT b) FROM MyTable 
GROUP BY a")
-  }
-
-  @Test
   def testSingleDistinctAggOnMultiColumnsWithGroupingSets(): Unit = {
     util.verifyPlan("SELECT COUNT(DISTINCT a, b) FROM MyTable2 GROUP BY 
GROUPING SETS (c, d)")
   }
-
-  @Test
-  def testMultiDistinctAggOnSameColumnWithGroupingSets(): Unit = {
-    val sqlQuery = "SELECT COUNT(DISTINCT a), SUM(DISTINCT a), MAX(DISTINCT a) 
" +
-      "FROM MyTable2 GROUP BY GROUPING SETS (b, c)"
-    util.verifyPlan(sqlQuery)
-  }
-
-  @Test
-  def testSingleDistinctAggAndOneOrMultiNonDistinctAggWithGroupingSets1(): 
Unit = {
-    // case 0x00: DISTINCT on COUNT and Non-DISTINCT on others
-    util.verifyPlan("SELECT COUNT(DISTINCT a), SUM(b) FROM MyTable2 GROUP BY 
GROUPING SETS (b, c)")
-  }
-
-  @Test
-  def testSingleDistinctAggAndOneOrMultiNonDistinctAggWithGroupingSets2(): 
Unit = {
-    // case 0x01: Non-DISTINCT on COUNT and DISTINCT on others
-    util.verifyPlan("SELECT COUNT(a), SUM(DISTINCT b) FROM MyTable2 GROUP BY 
GROUPING SETS (c, d)")
-  }
-
-  @Test
-  def testMultiDistinctAggOnDifferentColumnWithGroupingSets(): Unit = {
-    val sqlQuery = "SELECT COUNT(DISTINCT a), SUM(DISTINCT b) FROM MyTable2 " +
-      "GROUP BY GROUPING SETS (c, d)"
-    util.verifyPlan(sqlQuery)
-  }
-
-  @Test
-  def testMultiDistinctAndNonDistinctAggOnDifferentColumnWithGroupingSets(): 
Unit = {
-    val sqlQuery = "SELECT COUNT(DISTINCT a), SUM(DISTINCT b), COUNT(c) FROM 
MyTable2 " +
-      "GROUP BY GROUPING SETS (d, e)"
-    util.verifyPlan(sqlQuery)
-  }
-
-  @Test(expected = classOf[RuntimeException])
-  def testTooManyDistinctAggOnDifferentColumn(): Unit = {
-    // max group count must be less than 64
-    val fieldNames = (0 until 64).map(i => s"f$i").toArray
-    val fieldTypes: Array[TypeInformation[_]] = 
Array.fill(fieldNames.length)(Types.INT)
-    util.addTableSource("MyTable64", fieldTypes, fieldNames)
-
-    val distinctList = fieldNames.map(f => s"COUNT(DISTINCT $f)").mkString(", 
")
-    val maxList = fieldNames.map(f => s"MAX($f)").mkString(", ")
-    val sqlQuery = s"SELECT $distinctList, $maxList FROM MyTable64"
-
-    util.verifyPlan(sqlQuery)
-  }
 }
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/DistinctAggregateITCaseBase.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/DistinctAggregateITCaseBase.scala
index 07016a4..fd0a126 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/DistinctAggregateITCaseBase.scala
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/DistinctAggregateITCaseBase.scala
@@ -24,7 +24,6 @@ import 
org.apache.flink.table.planner.runtime.utils.BatchTestBase
 import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row
 import org.apache.flink.table.planner.runtime.utils.TestData._
 import org.apache.flink.types.Row
-
 import org.junit.{Before, Ignore, Test}
 
 import scala.collection.Seq
@@ -282,6 +281,64 @@ abstract class DistinctAggregateITCaseBase extends 
BatchTestBase {
     )
   }
 
+  @Test
+  def testSingleDistinctWithFilter(): Unit = {
+    checkResult("SELECT e, COUNT(DISTINCT a) FILTER (WHERE c > 0) FROM Table5 
GROUP BY e",
+      Seq(row(1, 3), row(2, 4), row(3, 2))
+    )
+  }
+
+  @Test
+  def testMultiDistinctOnSameColumnWithFilter(): Unit = {
+    checkResult("SELECT e, COUNT(DISTINCT a), COUNT(DISTINCT a) FILTER (WHERE 
c > 0), " +
+      "COUNT(DISTINCT a) FILTER (WHERE c < 10) FROM Table5 GROUP BY e",
+      Seq(row(1, 4, 3, 3), row(2, 4, 4, 3), row(3, 2, 2, 1)))
+  }
+
+  @Test
+  def TestMultiDistinctOnDifferentColumnWithFilter(): Unit = {
+    checkResult("SELECT e, COUNT(DISTINCT a), COUNT(DISTINCT a) FILTER (WHERE 
c > 0), " +
+      "COUNT(DISTINCT b) FILTER (WHERE b > 1) FROM Table5 GROUP BY e",
+      Seq(row(1, 4, 3, 4), row(2, 4, 4, 7), row(3, 2, 2, 3)))
+  }
+
+  @Test
+  def TestMultiDistinctWithFilterAndNonDistinctAgg(): Unit = {
+    checkResult("SELECT e, COUNT(DISTINCT a), COUNT(DISTINCT a) FILTER (WHERE 
c > 0), " +
+      "MAX(c), MIN(c) FROM Table5 GROUP BY e",
+      Seq(row(1, 4, 3, 10, 0), row(2, 4, 4, 14, 1), row (3, 2, 2, 12, 5)))
+  }
+
+  @Test
+  def testMultiDistinctAndNonDistinctAggWithFilter(): Unit = {
+    checkResult("SELECT e, MAX(c), MAX(c) FILTER (WHERE b < 10), 
COUNT(DISTINCT a), " +
+      "COUNT(DISTINCT a) FILTER (WHERE c > 5), COUNT(DISTINCT b) FILTER (WHERE 
b > 3)\n" +
+      "FROM Table5 GROUP BY e",
+      Seq(row(1, 10, 8, 4, 2, 3), row(2, 14, 6, 4, 2, 6), row (3, 12, 5, 2, 1, 
3)))
+  }
+
+  @Test
+  def TestDistinctWithFilterWithoutGroupBy(): Unit = {
+    // single distinct agg with filter.
+    checkResult("SELECT COUNT(DISTINCT a) FILTER (WHERE c > 0) FROM Table5",
+      Seq(row(4)))
+
+    // multi distinct aggs on same column with filter.
+    checkResult("SELECT COUNT(DISTINCT a), COUNT(DISTINCT a) FILTER (WHERE c > 
10),\n" +
+      "COUNT(DISTINCT a) FILTER (WHERE c < 10) FROM Table5",
+      Seq(row(5, 1, 4)))
+
+    // multi distinct aggs on different columns with filter.
+    checkResult("SELECT COUNT(DISTINCT a), COUNT(DISTINCT a) FILTER (WHERE c > 
0),\n" +
+      "COUNT(DISTINCT b) FILTER (WHERE b > 1) FROM Table5",
+      Seq(row(5, 4, 14)))
+
+    // multi distinct aggs with non-distinct agg with filter.
+    checkResult("SELECT MAX(e), MAX(e) FILTER (WHERE c < 10), COUNT(DISTINCT 
a),\n" +
+      "COUNT(DISTINCT a) FILTER (WHERE c > 5), COUNT(DISTINCT b) FILTER (WHERE 
b > 3) FROM Table5",
+      Seq(row(3, 3, 5, 2, 12)))
+  }
+
   // TODO remove Ignore once the generated code can be split into
   //  small classes or methods (the generated code is currently too large)
   @Ignore

Reply via email to