wanglijie95 commented on code in PR #23216:
URL: https://github.com/apache/flink/pull/23216#discussion_r1302694349


##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/optimize/program/FlinkRuntimeFilterProgramTest.java:
##########
@@ -202,6 +202,64 @@ public void testBuildSideIsJoinWithoutExchange() throws Exception {
         util.verifyPlan(query);
     }
 
+    @Test
+    public void testBuildSideIsJoinWithTwoAggInputs() throws Exception {

Review Comment:
   The test `testBuildSideIsAggWithoutExchange` already covers this case.



##########
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/optimize/program/FlinkRuntimeFilterProgramTest.xml:
##########
@@ -430,6 +430,104 @@ MultipleInput(readOrder=[2,0,1], members=[\nHashJoin(joinType=[InnerJoin], where
                </Resource>
        </TestCase>
 
+       <TestCase name="testBuildSideIsJoinWithTwoAggInputs">
+               <Resource name="ast">
+                       <![CDATA[
+LogicalProject(id=[$0], name=[$1], amount=[$2], price=[$3], fact_date_sk=[$4], 
dim_date_sk=[$5], EXPR$1=[$6], dim_date_sk0=[$7], EXPR$10=[$8])
++- LogicalJoin(condition=[=($4, $5)], joinType=[inner])
+   :- LogicalTableScan(table=[[testCatalog, test_database, fact]])
+   +- LogicalProject(dim_date_sk=[$0], EXPR$1=[$1], dim_date_sk0=[$2], 
EXPR$10=[$3])
+      +- LogicalJoin(condition=[=($0, $2)], joinType=[inner])
+         :- LogicalAggregate(group=[{0}], EXPR$1=[SUM($1)])
+         :  +- LogicalProject(dim_date_sk=[$4], price=[$3])
+         :     +- LogicalTableScan(table=[[testCatalog, test_database, dim]])
+         +- LogicalAggregate(group=[{0}], EXPR$1=[SUM($1)])
+            +- LogicalProject(dim_date_sk=[$4], amount=[$2])
+               +- LogicalTableScan(table=[[testCatalog, test_database, dim]])
+]]>
+               </Resource>
+               <Resource name="optimized rel plan">
+                       <![CDATA[
+HashJoin(joinType=[InnerJoin], where=[=(fact_date_sk, dim_date_sk)], 
select=[id, name, amount, price, fact_date_sk, dim_date_sk, EXPR$1, 
dim_date_sk0, EXPR$10], build=[right])
+:- Exchange(distribution=[hash[fact_date_sk]])
+:  +- RuntimeFilter(select=[fact_date_sk], 
estimatedFilterRatio=[0.9998779296875])
+:     :- Exchange(distribution=[broadcast])
+:     :  +- GlobalRuntimeFilterBuilder(select=[dim_date_sk], 
estimatedRowCount=[131072], maxRowCount=[436907])
+:     :     +- Exchange(distribution=[single])
+:     :        +- LocalRuntimeFilterBuilder(select=[dim_date_sk], 
estimatedRowCount=[131072], maxRowCount=[436907])
+:     :           +- TableSourceScan(table=[[testCatalog, test_database, dim, 
project=[dim_date_sk, amount], metadata=[]]], fields=[dim_date_sk, amount])
+:     +- TableSourceScan(table=[[testCatalog, test_database, fact]], 
fields=[id, name, amount, price, fact_date_sk])
++- HashJoin(joinType=[InnerJoin], where=[=(dim_date_sk, dim_date_sk0)], 
select=[dim_date_sk, EXPR$1, dim_date_sk0, EXPR$10], build=[right])
+   :- HashAggregate(isMerge=[false], groupBy=[dim_date_sk], 
select=[dim_date_sk, SUM(price) AS EXPR$1])
+   :  +- Exchange(distribution=[hash[dim_date_sk]])
+   :     +- Calc(select=[dim_date_sk, price])
+   :        +- TableSourceScan(table=[[testCatalog, test_database, dim, 
project=[amount, price, dim_date_sk], metadata=[]]], fields=[amount, price, 
dim_date_sk])
+   +- HashAggregate(isMerge=[false], groupBy=[dim_date_sk], 
select=[dim_date_sk, SUM(amount) AS EXPR$1])
+      +- Exchange(distribution=[hash[dim_date_sk]])
+         +- Calc(select=[dim_date_sk, amount])
+            +- TableSourceScan(table=[[testCatalog, test_database, dim, 
project=[amount, price, dim_date_sk], metadata=[]]], fields=[amount, price, 
dim_date_sk])
+]]>
+               </Resource>
+               <Resource name="optimized exec plan">
+                       <![CDATA[
+MultipleInput(readOrder=[2,1,0], members=[\nHashJoin(joinType=[InnerJoin], 
where=[(fact_date_sk = dim_date_sk)], select=[id, name, amount, price, 
fact_date_sk, dim_date_sk, EXPR$1, dim_date_sk0, EXPR$10], build=[right])\n:- 
[#1] Exchange(distribution=[hash[fact_date_sk]])\n+- 
HashJoin(joinType=[InnerJoin], where=[(dim_date_sk = dim_date_sk0)], 
select=[dim_date_sk, EXPR$1, dim_date_sk0, EXPR$10], build=[right])\n   :- 
HashAggregate(isMerge=[false], groupBy=[dim_date_sk], select=[dim_date_sk, 
SUM(price) AS EXPR$1])\n   :  +- [#2] 
Exchange(distribution=[hash[dim_date_sk]])\n   +- 
HashAggregate(isMerge=[false], groupBy=[dim_date_sk], select=[dim_date_sk, 
SUM(amount) AS EXPR$1])\n      +- [#3] 
Exchange(distribution=[hash[dim_date_sk]])\n])
+:- Exchange(distribution=[hash[fact_date_sk]])
+:  +- MultipleInput(readOrder=[0,1], 
members=[\nRuntimeFilter(select=[fact_date_sk], 
estimatedFilterRatio=[0.9998779296875])\n:- [#1] 
Exchange(distribution=[broadcast])\n+- [#2] 
TableSourceScan(table=[[testCatalog, test_database, fact]], fields=[id, name, 
amount, price, fact_date_sk])\n])
+:     :- Exchange(distribution=[broadcast])
+:     :  +- GlobalRuntimeFilterBuilder(select=[dim_date_sk], 
estimatedRowCount=[131072], maxRowCount=[436907])
+:     :     +- Exchange(distribution=[single])
+:     :        +- LocalRuntimeFilterBuilder(select=[dim_date_sk], 
estimatedRowCount=[131072], maxRowCount=[436907])
+:     :           +- TableSourceScan(table=[[testCatalog, test_database, dim, 
project=[dim_date_sk, amount], metadata=[]]], fields=[dim_date_sk, amount])

Review Comment:
   Fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
