wanglijie95 commented on code in PR #22966:
URL: https://github.com/apache/flink/pull/22966#discussion_r1271379553


##########
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/optimize/program/FlinkRuntimeFilterProgramTest.xml:
##########
@@ -0,0 +1,1022 @@
+<?xml version="1.0" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<Root>
+       <TestCase name="testSimpleInnerJoin">
+               <Resource name="ast">
+                       <![CDATA[
+LogicalProject(id=[$0], name=[$1], amount=[$2], price=[$3], fact_date_sk=[$4], 
id0=[$5], male=[$6], amount0=[$7], price0=[$8], dim_date_sk=[$9])
++- LogicalFilter(condition=[AND(=($2, $7), <($8, 500))])
+   +- LogicalJoin(condition=[true], joinType=[inner])
+      :- LogicalTableScan(table=[[testCatalog, test_database, fact]])
+      +- LogicalTableScan(table=[[testCatalog, test_database, dim]])
+]]>
+               </Resource>
+               <Resource name="optimized rel plan">
+                       <![CDATA[
+HashJoin(joinType=[InnerJoin], where=[=(amount, amount0)], select=[id, name, 
amount, price, fact_date_sk, id0, male, amount0, price0, dim_date_sk], 
build=[right])
+:- Exchange(distribution=[hash[amount]])
+:  +- RuntimeFilter(select=[amount], estimatedFilterRatio=[0.99993896484375])
+:     :- Exchange(distribution=[broadcast])
+:     :  +- GlobalRuntimeFilterBuilder(select=[amount], 
estimatedRowCount=[65536], maxRowCount=[218454])
+:     :     +- Exchange(distribution=[single])
+:     :        +- LocalRuntimeFilterBuilder(select=[amount], 
estimatedRowCount=[65536], maxRowCount=[218454])
+:     :           +- Calc(select=[id, male, amount, price, dim_date_sk], 
where=[<(price, 500)])
+:     :              +- TableSourceScan(table=[[testCatalog, test_database, 
dim, filter=[]]], fields=[id, male, amount, price, dim_date_sk])
+:     +- TableSourceScan(table=[[testCatalog, test_database, fact]], 
fields=[id, name, amount, price, fact_date_sk])
++- Exchange(distribution=[hash[amount]])
+   +- Calc(select=[id, male, amount, price, dim_date_sk], where=[<(price, 
500)])
+      +- TableSourceScan(table=[[testCatalog, test_database, dim, filter=[]]], 
fields=[id, male, amount, price, dim_date_sk])
+]]>
+               </Resource>
+               <Resource name="optimized exec plan">
+                       <![CDATA[
+HashJoin(joinType=[InnerJoin], where=[(amount = amount0)], select=[id, name, 
amount, price, fact_date_sk, id0, male, amount0, price0, dim_date_sk], 
build=[right])
+:- Exchange(distribution=[hash[amount]])
+:  +- MultipleInput(readOrder=[0,1], members=[\nRuntimeFilter(select=[amount], 
estimatedFilterRatio=[0.99993896484375])\n:- [#1] 
Exchange(distribution=[broadcast])\n+- [#2] 
TableSourceScan(table=[[testCatalog, test_database, fact]], fields=[id, name, 
amount, price, fact_date_sk])\n])
+:     :- Exchange(distribution=[broadcast])
+:     :  +- GlobalRuntimeFilterBuilder(select=[amount], 
estimatedRowCount=[65536], maxRowCount=[218454])
+:     :     +- Exchange(distribution=[single])
+:     :        +- LocalRuntimeFilterBuilder(select=[amount], 
estimatedRowCount=[65536], maxRowCount=[218454])
+:     :           +- Calc(select=[id, male, amount, price, dim_date_sk], 
where=[(price < 500)])(reuse_id=[1])
+:     :              +- TableSourceScan(table=[[testCatalog, test_database, 
dim, filter=[]]], fields=[id, male, amount, price, dim_date_sk])
+:     +- TableSourceScan(table=[[testCatalog, test_database, fact]], 
fields=[id, name, amount, price, fact_date_sk])
++- Exchange(distribution=[hash[amount]])
+   +- Reused(reference_id=[1])
+]]>
+               </Resource>
+       </TestCase>
+
+       <TestCase name="testSemiJoin">
+               <Resource name="ast">
+                       <![CDATA[
+LogicalProject(id=[$0], name=[$1], amount=[$2], price=[$3], fact_date_sk=[$4])
++- LogicalFilter(condition=[IN($4, {
+LogicalProject(dim_date_sk=[$4])
+  LogicalFilter(condition=[<($3, 500)])
+    LogicalTableScan(table=[[testCatalog, test_database, dim]])
+})])
+   +- LogicalTableScan(table=[[testCatalog, test_database, fact]])
+]]>
+               </Resource>
+               <Resource name="optimized rel plan">
+                       <![CDATA[
+HashJoin(joinType=[LeftSemiJoin], where=[=(fact_date_sk, dim_date_sk)], 
select=[id, name, amount, price, fact_date_sk], isBroadcast=[true], 
build=[right])
+:- RuntimeFilter(select=[fact_date_sk], 
estimatedFilterRatio=[0.99993896484375])
+:  :- Exchange(distribution=[broadcast])
+:  :  +- GlobalRuntimeFilterBuilder(select=[dim_date_sk], 
estimatedRowCount=[65536], maxRowCount=[655360])
+:  :     +- Exchange(distribution=[single])
+:  :        +- LocalRuntimeFilterBuilder(select=[dim_date_sk], 
estimatedRowCount=[65536], maxRowCount=[655360])
+:  :           +- Calc(select=[dim_date_sk], where=[<(price, 500)])
+:  :              +- TableSourceScan(table=[[testCatalog, test_database, dim, 
filter=[], project=[price, dim_date_sk], metadata=[]]], fields=[price, 
dim_date_sk])
+:  +- TableSourceScan(table=[[testCatalog, test_database, fact]], fields=[id, 
name, amount, price, fact_date_sk])
++- Exchange(distribution=[broadcast])
+   +- Calc(select=[dim_date_sk], where=[<(price, 500)])
+      +- TableSourceScan(table=[[testCatalog, test_database, dim, filter=[], 
project=[price, dim_date_sk], metadata=[]]], fields=[price, dim_date_sk])
+]]>
+               </Resource>
+               <Resource name="optimized exec plan">
+                       <![CDATA[
+MultipleInput(readOrder=[0,0,1], members=[\nHashJoin(joinType=[LeftSemiJoin], 
where=[(fact_date_sk = dim_date_sk)], select=[id, name, amount, price, 
fact_date_sk], isBroadcast=[true], build=[right])\n:- 
RuntimeFilter(select=[fact_date_sk], 
estimatedFilterRatio=[0.99993896484375])\n:  :- [#2] 
Exchange(distribution=[broadcast])\n:  +- [#3] 
TableSourceScan(table=[[testCatalog, test_database, fact]], fields=[id, name, 
amount, price, fact_date_sk])\n+- [#1] Exchange(distribution=[broadcast])\n])
+:- Exchange(distribution=[broadcast])
+:  +- Calc(select=[dim_date_sk], where=[(price < 500)])(reuse_id=[1])
+:     +- TableSourceScan(table=[[testCatalog, test_database, dim, filter=[], 
project=[price, dim_date_sk], metadata=[]]], fields=[price, dim_date_sk])
+:- Exchange(distribution=[broadcast])
+:  +- GlobalRuntimeFilterBuilder(select=[dim_date_sk], 
estimatedRowCount=[65536], maxRowCount=[655360])
+:     +- Exchange(distribution=[single])
+:        +- LocalRuntimeFilterBuilder(select=[dim_date_sk], 
estimatedRowCount=[65536], maxRowCount=[655360])
+:           +- Reused(reference_id=[1])
++- TableSourceScan(table=[[testCatalog, test_database, fact]], fields=[id, 
name, amount, price, fact_date_sk])
+]]>
+               </Resource>
+       </TestCase>
+
+       <TestCase name="testLeftOuterJoinWithLeftBuild">
+               <Resource name="ast">
+                       <![CDATA[
+LogicalProject(id=[$0], male=[$1], amount=[$2], price=[$3], dim_date_sk=[$4], 
id0=[$5], name=[$6], amount0=[$7], price0=[$8], fact_date_sk=[$9])
++- LogicalJoin(condition=[AND(=($7, $2), <($3, 500))], joinType=[left])
+   :- LogicalTableScan(table=[[testCatalog, test_database, dim]])
+   +- LogicalTableScan(table=[[testCatalog, test_database, fact]])
+]]>
+               </Resource>
+               <Resource name="optimized rel plan">
+                       <![CDATA[
+HashJoin(joinType=[LeftOuterJoin], where=[AND(=(amount0, amount), <(price, 
500))], select=[id, male, amount, price, dim_date_sk, id0, name, amount0, 
price0, fact_date_sk], build=[left])
+:- Exchange(distribution=[hash[amount]])
+:  +- TableSourceScan(table=[[testCatalog, test_database, dim]], fields=[id, 
male, amount, price, dim_date_sk])
++- Exchange(distribution=[hash[amount]])
+   +- RuntimeFilter(select=[amount], estimatedFilterRatio=[0.9998779296875])
+      :- Exchange(distribution=[broadcast])
+      :  +- GlobalRuntimeFilterBuilder(select=[amount], 
estimatedRowCount=[131072], maxRowCount=[218454])
+      :     +- Exchange(distribution=[single])
+      :        +- LocalRuntimeFilterBuilder(select=[amount], 
estimatedRowCount=[131072], maxRowCount=[218454])
+      :           +- TableSourceScan(table=[[testCatalog, test_database, 
dim]], fields=[id, male, amount, price, dim_date_sk])
+      +- TableSourceScan(table=[[testCatalog, test_database, fact]], 
fields=[id, name, amount, price, fact_date_sk])
+]]>
+               </Resource>
+               <Resource name="optimized exec plan">
+                       <![CDATA[
+HashJoin(joinType=[LeftOuterJoin], where=[((amount0 = amount) AND (price < 
500))], select=[id, male, amount, price, dim_date_sk, id0, name, amount0, 
price0, fact_date_sk], build=[left])
+:- Exchange(distribution=[hash[amount]])
+:  +- TableSourceScan(table=[[testCatalog, test_database, dim]], fields=[id, 
male, amount, price, dim_date_sk])(reuse_id=[1])
++- Exchange(distribution=[hash[amount]])
+   +- MultipleInput(readOrder=[0,1], members=[\nRuntimeFilter(select=[amount], 
estimatedFilterRatio=[0.9998779296875])\n:- [#1] 
Exchange(distribution=[broadcast])\n+- [#2] 
TableSourceScan(table=[[testCatalog, test_database, fact]], fields=[id, name, 
amount, price, fact_date_sk])\n])
+      :- Exchange(distribution=[broadcast])
+      :  +- GlobalRuntimeFilterBuilder(select=[amount], 
estimatedRowCount=[131072], maxRowCount=[218454])
+      :     +- Exchange(distribution=[single])
+      :        +- LocalRuntimeFilterBuilder(select=[amount], 
estimatedRowCount=[131072], maxRowCount=[218454])
+      :           +- Reused(reference_id=[1])
+      +- TableSourceScan(table=[[testCatalog, test_database, fact]], 
fields=[id, name, amount, price, fact_date_sk])
+]]>
+               </Resource>
+       </TestCase>
+
+       <TestCase name="testLeftOuterJoinWithRightBuild">
+               <Resource name="ast">
+                       <![CDATA[
+LogicalProject(id=[$0], name=[$1], amount=[$2], price=[$3], fact_date_sk=[$4], 
id0=[$5], male=[$6], amount0=[$7], price0=[$8], dim_date_sk=[$9])
++- LogicalJoin(condition=[AND(=($2, $7), <($8, 500))], joinType=[left])
+   :- LogicalTableScan(table=[[testCatalog, test_database, fact]])
+   +- LogicalTableScan(table=[[testCatalog, test_database, dim]])
+]]>
+               </Resource>
+               <Resource name="optimized rel plan">
+                       <![CDATA[
+HashJoin(joinType=[LeftOuterJoin], where=[=(amount, amount0)], select=[id, 
name, amount, price, fact_date_sk, id0, male, amount0, price0, dim_date_sk], 
build=[right])
+:- Exchange(distribution=[hash[amount]])
+:  +- TableSourceScan(table=[[testCatalog, test_database, fact]], fields=[id, 
name, amount, price, fact_date_sk])
++- Exchange(distribution=[hash[amount]])
+   +- Calc(select=[id, male, amount, price, dim_date_sk], where=[<(price, 
500)])
+      +- TableSourceScan(table=[[testCatalog, test_database, dim, filter=[]]], 
fields=[id, male, amount, price, dim_date_sk])
+]]>
+               </Resource>
+               <Resource name="optimized exec plan">
+                       <![CDATA[
+HashJoin(joinType=[LeftOuterJoin], where=[(amount = amount0)], select=[id, 
name, amount, price, fact_date_sk, id0, male, amount0, price0, dim_date_sk], 
build=[right])
+:- Exchange(distribution=[hash[amount]])
+:  +- TableSourceScan(table=[[testCatalog, test_database, fact]], fields=[id, 
name, amount, price, fact_date_sk])
++- Exchange(distribution=[hash[amount]])
+   +- Calc(select=[id, male, amount, price, dim_date_sk], where=[(price < 
500)])
+      +- TableSourceScan(table=[[testCatalog, test_database, dim, filter=[]]], 
fields=[id, male, amount, price, dim_date_sk])
+]]>
+               </Resource>
+       </TestCase>
+
+       <TestCase name="testFullOuterJoin">
+               <Resource name="ast">
+                       <![CDATA[
+LogicalProject(id=[$0], name=[$1], amount=[$2], price=[$3], fact_date_sk=[$4], 
id0=[$5], male=[$6], amount0=[$7], price0=[$8], dim_date_sk=[$9])
++- LogicalJoin(condition=[=($4, $9)], joinType=[full])
+   :- LogicalTableScan(table=[[testCatalog, test_database, fact]])
+   +- LogicalProject(id=[$0], male=[$1], amount=[$2], price=[$3], 
dim_date_sk=[$4])
+      +- LogicalFilter(condition=[<($3, 500)])
+         +- LogicalTableScan(table=[[testCatalog, test_database, dim]])
+]]>
+               </Resource>
+               <Resource name="optimized rel plan">
+                       <![CDATA[
+HashJoin(joinType=[FullOuterJoin], where=[=(fact_date_sk, dim_date_sk)], 
select=[id, name, amount, price, fact_date_sk, id0, male, amount0, price0, 
dim_date_sk], build=[right])
+:- Exchange(distribution=[hash[fact_date_sk]])
+:  +- TableSourceScan(table=[[testCatalog, test_database, fact]], fields=[id, 
name, amount, price, fact_date_sk])
++- Exchange(distribution=[hash[dim_date_sk]])
+   +- Calc(select=[id, male, amount, price, dim_date_sk], where=[<(price, 
500)])
+      +- TableSourceScan(table=[[testCatalog, test_database, dim, filter=[]]], 
fields=[id, male, amount, price, dim_date_sk])
+]]>
+               </Resource>
+               <Resource name="optimized exec plan">
+                       <![CDATA[
+HashJoin(joinType=[FullOuterJoin], where=[(fact_date_sk = dim_date_sk)], 
select=[id, name, amount, price, fact_date_sk, id0, male, amount0, price0, 
dim_date_sk], build=[right])
+:- Exchange(distribution=[hash[fact_date_sk]])
+:  +- TableSourceScan(table=[[testCatalog, test_database, fact]], fields=[id, 
name, amount, price, fact_date_sk])
++- Exchange(distribution=[hash[dim_date_sk]])
+   +- Calc(select=[id, male, amount, price, dim_date_sk], where=[(price < 
500)])
+      +- TableSourceScan(table=[[testCatalog, test_database, dim, filter=[]]], 
fields=[id, male, amount, price, dim_date_sk])
+]]>
+               </Resource>
+       </TestCase>
+
+       <TestCase name="testAntiJoin">
+               <Resource name="ast">
+                       <![CDATA[
+LogicalProject(id=[$0], name=[$1], amount=[$2], price=[$3], fact_date_sk=[$4])
++- LogicalFilter(condition=[NOT(IN($4, {
+LogicalProject(dim_date_sk=[$4])
+  LogicalFilter(condition=[<($3, 500)])
+    LogicalTableScan(table=[[testCatalog, test_database, dim]])
+}))])
+   +- LogicalTableScan(table=[[testCatalog, test_database, fact]])
+]]>
+               </Resource>
+               <Resource name="optimized rel plan">
+                       <![CDATA[
+NestedLoopJoin(joinType=[LeftAntiJoin], where=[OR(IS NULL(fact_date_sk), IS 
NULL(dim_date_sk), =(fact_date_sk, dim_date_sk))], select=[id, name, amount, 
price, fact_date_sk], build=[right])
+:- TableSourceScan(table=[[testCatalog, test_database, fact]], fields=[id, 
name, amount, price, fact_date_sk])
++- Exchange(distribution=[broadcast])
+   +- Calc(select=[dim_date_sk], where=[<(price, 500)])
+      +- TableSourceScan(table=[[testCatalog, test_database, dim, filter=[], 
project=[price, dim_date_sk], metadata=[]]], fields=[price, dim_date_sk])
+]]>
+               </Resource>
+               <Resource name="optimized exec plan">
+                       <![CDATA[
+MultipleInput(readOrder=[1,0], 
members=[\nNestedLoopJoin(joinType=[LeftAntiJoin], where=[(fact_date_sk IS NULL 
OR dim_date_sk IS NULL OR (fact_date_sk = dim_date_sk))], select=[id, name, 
amount, price, fact_date_sk], build=[right])\n:- [#1] 
TableSourceScan(table=[[testCatalog, test_database, fact]], fields=[id, name, 
amount, price, fact_date_sk])\n+- [#2] Exchange(distribution=[broadcast])\n])
+:- TableSourceScan(table=[[testCatalog, test_database, fact]], fields=[id, 
name, amount, price, fact_date_sk])
++- Exchange(distribution=[broadcast])
+   +- Calc(select=[dim_date_sk], where=[(price < 500)])
+      +- TableSourceScan(table=[[testCatalog, test_database, dim, filter=[], 
project=[price, dim_date_sk], metadata=[]]], fields=[price, dim_date_sk])
+]]>
+               </Resource>
+       </TestCase>
+
+       <TestCase name="testNestedLoopJoin">
+               <Resource name="ast">
+                       <![CDATA[
+LogicalProject(id=[$0], name=[$1], amount=[$2], price=[$3], fact_date_sk=[$4], 
id0=[$5], male=[$6], amount0=[$7], price0=[$8], dim_date_sk=[$9])
++- LogicalFilter(condition=[AND(=($2, $7), <($8, 500))])
+   +- LogicalJoin(condition=[true], joinType=[inner])
+      :- LogicalTableScan(table=[[testCatalog, test_database, fact]])
+      +- LogicalTableScan(table=[[testCatalog, test_database, dim]])
+]]>
+               </Resource>
+               <Resource name="optimized rel plan">
+                       <![CDATA[
+NestedLoopJoin(joinType=[InnerJoin], where=[=(amount, amount0)], select=[id, 
name, amount, price, fact_date_sk, id0, male, amount0, price0, dim_date_sk], 
build=[right])
+:- TableSourceScan(table=[[testCatalog, test_database, fact]], fields=[id, 
name, amount, price, fact_date_sk])
++- Exchange(distribution=[broadcast])
+   +- Calc(select=[id, male, amount, price, dim_date_sk], where=[<(price, 
500)])
+      +- TableSourceScan(table=[[testCatalog, test_database, dim, filter=[]]], 
fields=[id, male, amount, price, dim_date_sk])
+]]>
+               </Resource>
+               <Resource name="optimized exec plan">
+                       <![CDATA[
+MultipleInput(readOrder=[1,0], members=[\nNestedLoopJoin(joinType=[InnerJoin], 
where=[(amount = amount0)], select=[id, name, amount, price, fact_date_sk, id0, 
male, amount0, price0, dim_date_sk], build=[right])\n:- [#1] 
TableSourceScan(table=[[testCatalog, test_database, fact]], fields=[id, name, 
amount, price, fact_date_sk])\n+- [#2] Exchange(distribution=[broadcast])\n])
+:- TableSourceScan(table=[[testCatalog, test_database, fact]], fields=[id, 
name, amount, price, fact_date_sk])
++- Exchange(distribution=[broadcast])
+   +- Calc(select=[id, male, amount, price, dim_date_sk], where=[(price < 
500)])
+      +- TableSourceScan(table=[[testCatalog, test_database, dim, filter=[]]], 
fields=[id, male, amount, price, dim_date_sk])
+]]>
+               </Resource>
+       </TestCase>
+
+       <TestCase name="testProbeSideIsTooSmall">
+               <Resource name="ast">
+                       <![CDATA[
+LogicalProject(id=[$0], name=[$1], amount=[$2], price=[$3], fact_date_sk=[$4], 
id0=[$5], male=[$6], amount0=[$7], price0=[$8], dim_date_sk=[$9])
++- LogicalFilter(condition=[AND(=($2, $7), <($8, 500))])
+   +- LogicalJoin(condition=[true], joinType=[inner])
+      :- LogicalTableScan(table=[[testCatalog, test_database, fact]])
+      +- LogicalTableScan(table=[[testCatalog, test_database, dim]])
+]]>
+               </Resource>
+               <Resource name="optimized rel plan">
+                       <![CDATA[
+HashJoin(joinType=[InnerJoin], where=[=(amount, amount0)], select=[id, name, 
amount, price, fact_date_sk, id0, male, amount0, price0, dim_date_sk], 
build=[right])
+:- Exchange(distribution=[hash[amount]])
+:  +- TableSourceScan(table=[[testCatalog, test_database, fact]], fields=[id, 
name, amount, price, fact_date_sk])
++- Exchange(distribution=[hash[amount]])
+   +- Calc(select=[id, male, amount, price, dim_date_sk], where=[<(price, 
500)])
+      +- TableSourceScan(table=[[testCatalog, test_database, dim, filter=[]]], 
fields=[id, male, amount, price, dim_date_sk])
+]]>
+               </Resource>
+               <Resource name="optimized exec plan">
+                       <![CDATA[
+HashJoin(joinType=[InnerJoin], where=[(amount = amount0)], select=[id, name, 
amount, price, fact_date_sk, id0, male, amount0, price0, dim_date_sk], 
build=[right])
+:- Exchange(distribution=[hash[amount]])
+:  +- TableSourceScan(table=[[testCatalog, test_database, fact]], fields=[id, 
name, amount, price, fact_date_sk])
++- Exchange(distribution=[hash[amount]])
+   +- Calc(select=[id, male, amount, price, dim_date_sk], where=[(price < 
500)])
+      +- TableSourceScan(table=[[testCatalog, test_database, dim, filter=[]]], 
fields=[id, male, amount, price, dim_date_sk])
+]]>
+               </Resource>
+       </TestCase>
+
+       <TestCase name="testBuildSideIsTooLarge">
+               <Resource name="ast">
+                       <![CDATA[
+LogicalProject(id=[$0], name=[$1], amount=[$2], price=[$3], fact_date_sk=[$4], 
id0=[$5], male=[$6], amount0=[$7], price0=[$8], dim_date_sk=[$9])
++- LogicalFilter(condition=[AND(=($2, $7), <($8, 500))])
+   +- LogicalJoin(condition=[true], joinType=[inner])
+      :- LogicalTableScan(table=[[testCatalog, test_database, fact]])
+      +- LogicalTableScan(table=[[testCatalog, test_database, dim]])
+]]>
+               </Resource>
+               <Resource name="optimized rel plan">
+                       <![CDATA[
+HashJoin(joinType=[InnerJoin], where=[=(amount, amount0)], select=[id, name, 
amount, price, fact_date_sk, id0, male, amount0, price0, dim_date_sk], 
build=[right])
+:- Exchange(distribution=[hash[amount]])
+:  +- TableSourceScan(table=[[testCatalog, test_database, fact]], fields=[id, 
name, amount, price, fact_date_sk])
++- Exchange(distribution=[hash[amount]])
+   +- Calc(select=[id, male, amount, price, dim_date_sk], where=[<(price, 
500)])
+      +- TableSourceScan(table=[[testCatalog, test_database, dim, filter=[]]], 
fields=[id, male, amount, price, dim_date_sk])
+]]>
+               </Resource>
+               <Resource name="optimized exec plan">
+                       <![CDATA[
+HashJoin(joinType=[InnerJoin], where=[(amount = amount0)], select=[id, name, 
amount, price, fact_date_sk, id0, male, amount0, price0, dim_date_sk], 
build=[right])
+:- Exchange(distribution=[hash[amount]])
+:  +- TableSourceScan(table=[[testCatalog, test_database, fact]], fields=[id, 
name, amount, price, fact_date_sk])
++- Exchange(distribution=[hash[amount]])
+   +- Calc(select=[id, male, amount, price, dim_date_sk], where=[(price < 
500)])
+      +- TableSourceScan(table=[[testCatalog, test_database, dim, filter=[]]], 
fields=[id, male, amount, price, dim_date_sk])
+]]>
+               </Resource>
+       </TestCase>
+
+
+       <TestCase name="testFilterRatioIsTooSmall">
+               <Resource name="ast">
+                       <![CDATA[
+LogicalProject(id=[$0], name=[$1], amount=[$2], price=[$3], fact_date_sk=[$4], 
id0=[$5], male=[$6], amount0=[$7], price0=[$8], dim_date_sk=[$9])
++- LogicalFilter(condition=[AND(=($2, $7), <($8, 500))])
+   +- LogicalJoin(condition=[true], joinType=[inner])
+      :- LogicalTableScan(table=[[testCatalog, test_database, fact]])
+      +- LogicalTableScan(table=[[testCatalog, test_database, dim]])
+]]>
+               </Resource>
+               <Resource name="optimized rel plan">
+                       <![CDATA[
+HashJoin(joinType=[InnerJoin], where=[=(amount, amount0)], select=[id, name, 
amount, price, fact_date_sk, id0, male, amount0, price0, dim_date_sk], 
build=[right])
+:- Exchange(distribution=[hash[amount]])
+:  +- TableSourceScan(table=[[testCatalog, test_database, fact]], fields=[id, 
name, amount, price, fact_date_sk])
++- Exchange(distribution=[hash[amount]])
+   +- Calc(select=[id, male, amount, price, dim_date_sk], where=[<(price, 
500)])
+      +- TableSourceScan(table=[[testCatalog, test_database, dim, filter=[]]], 
fields=[id, male, amount, price, dim_date_sk])
+]]>
+               </Resource>
+               <Resource name="optimized exec plan">
+                       <![CDATA[
+HashJoin(joinType=[InnerJoin], where=[(amount = amount0)], select=[id, name, 
amount, price, fact_date_sk, id0, male, amount0, price0, dim_date_sk], 
build=[right])
+:- Exchange(distribution=[hash[amount]])
+:  +- TableSourceScan(table=[[testCatalog, test_database, fact]], fields=[id, 
name, amount, price, fact_date_sk])
++- Exchange(distribution=[hash[amount]])
+   +- Calc(select=[id, male, amount, price, dim_date_sk], where=[(price < 
500)])
+      +- TableSourceScan(table=[[testCatalog, test_database, dim, filter=[]]], 
fields=[id, male, amount, price, dim_date_sk])
+]]>
+               </Resource>
+       </TestCase>
+
+       <TestCase name="testBuildSideIsJoinWithoutExchange">
+               <Resource name="ast">
+                       <![CDATA[
+LogicalProject(id=[$0], male=[$1], amount=[$2], price=[$3], dim_date_sk=[$4], 
id0=[$5], name=[$6], amount0=[$7], price0=[$8], fact_date_sk=[$9], id1=[$10], 
amount1=[$11], price1=[$12])
++- LogicalFilter(condition=[AND(=($7, $11), =($7, $2), <($3, 500))])
+   +- LogicalJoin(condition=[true], joinType=[inner])
+      :- LogicalJoin(condition=[true], joinType=[inner])
+      :  :- LogicalTableScan(table=[[testCatalog, test_database, dim]])
+      :  +- LogicalTableScan(table=[[testCatalog, test_database, fact]])
+      +- LogicalTableScan(table=[[testCatalog, test_database, fact2]])
+]]>
+               </Resource>
+               <Resource name="optimized rel plan">
+                       <![CDATA[
+HashJoin(joinType=[InnerJoin], where=[=(amount0, amount1)], select=[id, male, 
amount, price, dim_date_sk, id0, name, amount0, price0, fact_date_sk, id1, 
amount1, price1], build=[left])
+:- HashJoin(joinType=[InnerJoin], where=[=(amount0, amount)], select=[id, 
male, amount, price, dim_date_sk, id0, name, amount0, price0, fact_date_sk], 
build=[left])
+:  :- Exchange(distribution=[hash[amount]])
+:  :  +- Calc(select=[id, male, amount, price, dim_date_sk], where=[<(price, 
500)])
+:  :     +- TableSourceScan(table=[[testCatalog, test_database, dim, 
filter=[]]], fields=[id, male, amount, price, dim_date_sk])
+:  +- Exchange(distribution=[hash[amount]])
+:     +- RuntimeFilter(select=[amount], 
estimatedFilterRatio=[0.99993896484375])
+:        :- Exchange(distribution=[broadcast])
+:        :  +- GlobalRuntimeFilterBuilder(select=[amount], 
estimatedRowCount=[65536], maxRowCount=[218454])
+:        :     +- Exchange(distribution=[single])
+:        :        +- LocalRuntimeFilterBuilder(select=[amount], 
estimatedRowCount=[65536], maxRowCount=[218454])
+:        :           +- Calc(select=[id, male, amount, price, dim_date_sk], 
where=[<(price, 500)])
+:        :              +- TableSourceScan(table=[[testCatalog, test_database, 
dim, filter=[]]], fields=[id, male, amount, price, dim_date_sk])
+:        +- TableSourceScan(table=[[testCatalog, test_database, fact]], 
fields=[id, name, amount, price, fact_date_sk])
++- Exchange(distribution=[hash[amount]])
+   +- RuntimeFilter(select=[amount], estimatedFilterRatio=[0.99993896484375])
+      :- Exchange(distribution=[broadcast])
+      :  +- GlobalRuntimeFilterBuilder(select=[amount], 
estimatedRowCount=[65536], maxRowCount=[218454])
+      :     +- Exchange(distribution=[single])
+      :        +- LocalRuntimeFilterBuilder(select=[amount], 
estimatedRowCount=[65536], maxRowCount=[218454])
+      :           +- Calc(select=[id, male, amount, price, dim_date_sk], 
where=[<(price, 500)])
+      :              +- TableSourceScan(table=[[testCatalog, test_database, 
dim, filter=[]]], fields=[id, male, amount, price, dim_date_sk])
+      +- TableSourceScan(table=[[testCatalog, test_database, fact2]], 
fields=[id, amount, price])
+]]>
+               </Resource>
+               <Resource name="optimized exec plan">
+                       <![CDATA[
+MultipleInput(readOrder=[2,0,1], members=[\nHashJoin(joinType=[InnerJoin], 
where=[(amount0 = amount1)], select=[id, male, amount, price, dim_date_sk, id0, 
name, amount0, price0, fact_date_sk, id1, amount1, price1], build=[left])\n:- 
HashJoin(joinType=[InnerJoin], where=[(amount0 = amount)], select=[id, male, 
amount, price, dim_date_sk, id0, name, amount0, price0, fact_date_sk], 
build=[left])\n:  :- [#2] Exchange(distribution=[hash[amount]])\n:  +- [#3] 
Exchange(distribution=[hash[amount]])\n+- [#1] 
Exchange(distribution=[hash[amount]])\n])
+:- Exchange(distribution=[hash[amount]])
+:  +- MultipleInput(readOrder=[0,1], members=[\nRuntimeFilter(select=[amount], 
estimatedFilterRatio=[0.99993896484375])\n:- [#1] 
Exchange(distribution=[broadcast])\n+- [#2] 
TableSourceScan(table=[[testCatalog, test_database, fact2]], fields=[id, 
amount, price])\n])
+:     :- Exchange(distribution=[broadcast])(reuse_id=[2])
+:     :  +- GlobalRuntimeFilterBuilder(select=[amount], 
estimatedRowCount=[65536], maxRowCount=[218454])
+:     :     +- Exchange(distribution=[single])
+:     :        +- LocalRuntimeFilterBuilder(select=[amount], 
estimatedRowCount=[65536], maxRowCount=[218454])
+:     :           +- Calc(select=[id, male, amount, price, dim_date_sk], 
where=[(price < 500)])(reuse_id=[1])
+:     :              +- TableSourceScan(table=[[testCatalog, test_database, 
dim, filter=[]]], fields=[id, male, amount, price, dim_date_sk])
+:     +- TableSourceScan(table=[[testCatalog, test_database, fact2]], 
fields=[id, amount, price])
+:- Exchange(distribution=[hash[amount]])
+:  +- Reused(reference_id=[1])
++- Exchange(distribution=[hash[amount]])
+   +- MultipleInput(readOrder=[0,1], members=[\nRuntimeFilter(select=[amount], 
estimatedFilterRatio=[0.99993896484375])\n:- [#1] 
Exchange(distribution=[broadcast])\n+- [#2] 
TableSourceScan(table=[[testCatalog, test_database, fact]], fields=[id, name, 
amount, price, fact_date_sk])\n])
+      :- Reused(reference_id=[2])
+      +- TableSourceScan(table=[[testCatalog, test_database, fact]], 
fields=[id, name, amount, price, fact_date_sk])
+]]>
+               </Resource>
+       </TestCase>
+
+       <TestCase name="testBuildSideIsAggWithoutExchange">
+               <Resource name="ast">
+                       <![CDATA[
+LogicalProject(id=[$0], name=[$1], amount=[$2], price=[$3], fact_date_sk=[$4], 
dim_date_sk=[$5], EXPR$1=[$6])
++- LogicalJoin(condition=[=($4, $5)], joinType=[inner])
+   :- LogicalTableScan(table=[[testCatalog, test_database, fact]])
+   +- LogicalAggregate(group=[{0}], EXPR$1=[SUM($1)])
+      +- LogicalProject(dim_date_sk=[$4], price=[$3])
+         +- LogicalFilter(condition=[<($3, 500)])
+            +- LogicalTableScan(table=[[testCatalog, test_database, dim]])
+]]>
+               </Resource>
+               <Resource name="optimized rel plan">
+                       <![CDATA[
+HashJoin(joinType=[InnerJoin], where=[=(fact_date_sk, dim_date_sk)], 
select=[id, name, amount, price, fact_date_sk, dim_date_sk, EXPR$1], 
build=[right])
+:- Exchange(distribution=[hash[fact_date_sk]])
+:  +- RuntimeFilter(select=[fact_date_sk], 
estimatedFilterRatio=[0.99993896484375])
+:     :- Exchange(distribution=[broadcast])
+:     :  +- GlobalRuntimeFilterBuilder(select=[dim_date_sk], 
estimatedRowCount=[65536], maxRowCount=[436907])
+:     :     +- Exchange(distribution=[single])
+:     :        +- LocalRuntimeFilterBuilder(select=[dim_date_sk], 
estimatedRowCount=[65536], maxRowCount=[436907])
+:     :           +- Calc(select=[price, dim_date_sk], where=[<(price, 500)])
+:     :              +- TableSourceScan(table=[[testCatalog, test_database, 
dim, filter=[], project=[price, dim_date_sk], metadata=[]]], fields=[price, 
dim_date_sk])
+:     +- TableSourceScan(table=[[testCatalog, test_database, fact]], 
fields=[id, name, amount, price, fact_date_sk])
++- HashAggregate(isMerge=[false], groupBy=[dim_date_sk], select=[dim_date_sk, 
SUM(price) AS EXPR$1])
+   +- Exchange(distribution=[hash[dim_date_sk]])
+      +- Calc(select=[price, dim_date_sk], where=[<(price, 500)])
+         +- TableSourceScan(table=[[testCatalog, test_database, dim, 
filter=[], project=[price, dim_date_sk], metadata=[]]], fields=[price, 
dim_date_sk])
+]]>
+               </Resource>
+               <Resource name="optimized exec plan">
+                       <![CDATA[
+MultipleInput(readOrder=[1,0], members=[\nHashJoin(joinType=[InnerJoin], 
where=[(fact_date_sk = dim_date_sk)], select=[id, name, amount, price, 
fact_date_sk, dim_date_sk, EXPR$1], build=[right])\n:- [#1] 
Exchange(distribution=[hash[fact_date_sk]])\n+- HashAggregate(isMerge=[false], 
groupBy=[dim_date_sk], select=[dim_date_sk, SUM(price) AS EXPR$1])\n   +- [#2] 
Exchange(distribution=[hash[dim_date_sk]])\n])
+:- Exchange(distribution=[hash[fact_date_sk]])
+:  +- MultipleInput(readOrder=[0,1], 
members=[\nRuntimeFilter(select=[fact_date_sk], 
estimatedFilterRatio=[0.99993896484375])\n:- [#1] 
Exchange(distribution=[broadcast])\n+- [#2] 
TableSourceScan(table=[[testCatalog, test_database, fact]], fields=[id, name, 
amount, price, fact_date_sk])\n])
+:     :- Exchange(distribution=[broadcast])
+:     :  +- GlobalRuntimeFilterBuilder(select=[dim_date_sk], 
estimatedRowCount=[65536], maxRowCount=[436907])
+:     :     +- Exchange(distribution=[single])
+:     :        +- LocalRuntimeFilterBuilder(select=[dim_date_sk], 
estimatedRowCount=[65536], maxRowCount=[436907])
+:     :           +- Calc(select=[price, dim_date_sk], where=[(price < 
500)])(reuse_id=[1])
+:     :              +- TableSourceScan(table=[[testCatalog, test_database, 
dim, filter=[], project=[price, dim_date_sk], metadata=[]]], fields=[price, 
dim_date_sk])
+:     +- TableSourceScan(table=[[testCatalog, test_database, fact]], 
fields=[id, name, amount, price, fact_date_sk])
++- Exchange(distribution=[hash[dim_date_sk]])
+   +- Reused(reference_id=[1])
+]]>
+               </Resource>
+       </TestCase>
+
+       <TestCase name="testBuildSideIsCalcWithoutExchange">
+               <Resource name="ast">
+                       <![CDATA[
+LogicalProject(id=[$0], name=[$1], amount=[$2], price=[$3], fact_date_sk=[$4], 
dim_date_sk=[$5], sum_price=[$6])
++- LogicalJoin(condition=[=($4, $5)], joinType=[inner])
+   :- LogicalTableScan(table=[[testCatalog, test_database, fact]])
+   +- LogicalProject(dim_date_sk=[$0], sum_price=[+($1, 1)])
+      +- LogicalAggregate(group=[{0}], agg#0=[SUM($1)])
+         +- LogicalProject(dim_date_sk=[$4], price=[$3])
+            +- LogicalFilter(condition=[<($3, 500)])
+               +- LogicalTableScan(table=[[testCatalog, test_database, dim]])
+]]>
+               </Resource>
+               <Resource name="optimized rel plan">
+                       <![CDATA[
+HashJoin(joinType=[InnerJoin], where=[=(fact_date_sk, dim_date_sk)], 
select=[id, name, amount, price, fact_date_sk, dim_date_sk, sum_price], 
build=[right])
+:- Exchange(distribution=[hash[fact_date_sk]])
+:  +- RuntimeFilter(select=[fact_date_sk], 
estimatedFilterRatio=[0.99993896484375])
+:     :- Exchange(distribution=[broadcast])
+:     :  +- GlobalRuntimeFilterBuilder(select=[dim_date_sk], 
estimatedRowCount=[65536], maxRowCount=[436907])
+:     :     +- Exchange(distribution=[single])
+:     :        +- LocalRuntimeFilterBuilder(select=[dim_date_sk], 
estimatedRowCount=[65536], maxRowCount=[436907])
+:     :           +- Calc(select=[price, dim_date_sk], where=[<(price, 500)])
+:     :              +- TableSourceScan(table=[[testCatalog, test_database, 
dim, filter=[], project=[price, dim_date_sk], metadata=[]]], fields=[price, 
dim_date_sk])
+:     +- TableSourceScan(table=[[testCatalog, test_database, fact]], 
fields=[id, name, amount, price, fact_date_sk])
++- Calc(select=[dim_date_sk, +($f1, 1) AS sum_price])
+   +- HashAggregate(isMerge=[false], groupBy=[dim_date_sk], 
select=[dim_date_sk, SUM(price) AS $f1])
+      +- Exchange(distribution=[hash[dim_date_sk]])
+         +- Calc(select=[price, dim_date_sk], where=[<(price, 500)])
+            +- TableSourceScan(table=[[testCatalog, test_database, dim, 
filter=[], project=[price, dim_date_sk], metadata=[]]], fields=[price, 
dim_date_sk])
+]]>
+               </Resource>
+               <Resource name="optimized exec plan">
+                       <![CDATA[
+MultipleInput(readOrder=[1,0], members=[\nHashJoin(joinType=[InnerJoin], 
where=[(fact_date_sk = dim_date_sk)], select=[id, name, amount, price, 
fact_date_sk, dim_date_sk, sum_price], build=[right])\n:- [#1] 
Exchange(distribution=[hash[fact_date_sk]])\n+- Calc(select=[dim_date_sk, ($f1 
+ 1) AS sum_price])\n   +- HashAggregate(isMerge=[false], 
groupBy=[dim_date_sk], select=[dim_date_sk, SUM(price) AS $f1])\n      +- [#2] 
Exchange(distribution=[hash[dim_date_sk]])\n])
+:- Exchange(distribution=[hash[fact_date_sk]])
+:  +- MultipleInput(readOrder=[0,1], 
members=[\nRuntimeFilter(select=[fact_date_sk], 
estimatedFilterRatio=[0.99993896484375])\n:- [#1] 
Exchange(distribution=[broadcast])\n+- [#2] 
TableSourceScan(table=[[testCatalog, test_database, fact]], fields=[id, name, 
amount, price, fact_date_sk])\n])
+:     :- Exchange(distribution=[broadcast])
+:     :  +- GlobalRuntimeFilterBuilder(select=[dim_date_sk], 
estimatedRowCount=[65536], maxRowCount=[436907])
+:     :     +- Exchange(distribution=[single])
+:     :        +- LocalRuntimeFilterBuilder(select=[dim_date_sk], 
estimatedRowCount=[65536], maxRowCount=[436907])
+:     :           +- Calc(select=[price, dim_date_sk], where=[(price < 
500)])(reuse_id=[1])
+:     :              +- TableSourceScan(table=[[testCatalog, test_database, 
dim, filter=[], project=[price, dim_date_sk], metadata=[]]], fields=[price, 
dim_date_sk])
+:     +- TableSourceScan(table=[[testCatalog, test_database, fact]], 
fields=[id, name, amount, price, fact_date_sk])
++- Exchange(distribution=[hash[dim_date_sk]])
+   +- Reused(reference_id=[1])
+]]>
+               </Resource>
+       </TestCase>
+
+       <TestCase name="testCannotInjectMoreThanOneRuntimeFilterInSamePlace">
+               <Resource name="ast">
+                       <![CDATA[
+LogicalProject(id=[$0], name=[$1], amount=[$2], price=[$3], fact_date_sk=[$4], 
id0=[$5], male=[$6], amount0=[$7], price0=[$8], dim_date_sk=[$9], id1=[$10], 
amount1=[$11], price1=[$12])
++- LogicalFilter(condition=[AND(=($2, $7), =($2, $11), <($8, 500))])
+   +- LogicalJoin(condition=[true], joinType=[inner])
+      :- LogicalJoin(condition=[true], joinType=[inner])
+      :  :- LogicalTableScan(table=[[testCatalog, test_database, fact]])
+      :  +- LogicalTableScan(table=[[testCatalog, test_database, dim]])
+      +- LogicalTableScan(table=[[testCatalog, test_database, dim2]])
+]]>
+               </Resource>
+               <Resource name="optimized rel plan">
+                       <![CDATA[
+HashJoin(joinType=[InnerJoin], where=[=(amount, amount1)], select=[id, name, 
amount, price, fact_date_sk, id0, male, amount0, price0, dim_date_sk, id1, 
amount1, price1], build=[right])
+:- HashJoin(joinType=[InnerJoin], where=[=(amount, amount0)], select=[id, 
name, amount, price, fact_date_sk, id0, male, amount0, price0, dim_date_sk], 
build=[right])
+:  :- Exchange(distribution=[hash[amount]])
+:  :  +- RuntimeFilter(select=[amount], 
estimatedFilterRatio=[0.99993896484375])
+:  :     :- Exchange(distribution=[broadcast])
+:  :     :  +- GlobalRuntimeFilterBuilder(select=[amount], 
estimatedRowCount=[65536], maxRowCount=[218454])
+:  :     :     +- Exchange(distribution=[single])
+:  :     :        +- LocalRuntimeFilterBuilder(select=[amount], 
estimatedRowCount=[65536], maxRowCount=[218454])
+:  :     :           +- Calc(select=[id, male, amount, price, dim_date_sk], 
where=[<(price, 500)])
+:  :     :              +- TableSourceScan(table=[[testCatalog, test_database, 
dim, filter=[]]], fields=[id, male, amount, price, dim_date_sk])
+:  :     +- TableSourceScan(table=[[testCatalog, test_database, fact]], 
fields=[id, name, amount, price, fact_date_sk])
+:  +- Exchange(distribution=[hash[amount]])
+:     +- Calc(select=[id, male, amount, price, dim_date_sk], where=[<(price, 
500)])
+:        +- TableSourceScan(table=[[testCatalog, test_database, dim, 
filter=[]]], fields=[id, male, amount, price, dim_date_sk])
++- Exchange(distribution=[hash[amount]])
+   +- TableSourceScan(table=[[testCatalog, test_database, dim2]], fields=[id, 
amount, price])
+]]>
+               </Resource>
+               <Resource name="optimized exec plan">
+                       <![CDATA[
+MultipleInput(readOrder=[0,1,0], members=[\nHashJoin(joinType=[InnerJoin], 
where=[(amount = amount1)], select=[id, name, amount, price, fact_date_sk, id0, 
male, amount0, price0, dim_date_sk, id1, amount1, price1], build=[right])\n:- 
HashJoin(joinType=[InnerJoin], where=[(amount = amount0)], select=[id, name, 
amount, price, fact_date_sk, id0, male, amount0, price0, dim_date_sk], 
build=[right])\n:  :- [#2] Exchange(distribution=[hash[amount]])\n:  +- [#3] 
Exchange(distribution=[hash[amount]])\n+- [#1] 
Exchange(distribution=[hash[amount]])\n])
+:- Exchange(distribution=[hash[amount]])
+:  +- TableSourceScan(table=[[testCatalog, test_database, dim2]], fields=[id, 
amount, price])
+:- Exchange(distribution=[hash[amount]])
+:  +- MultipleInput(readOrder=[0,1], members=[\nRuntimeFilter(select=[amount], 
estimatedFilterRatio=[0.99993896484375])\n:- [#1] 
Exchange(distribution=[broadcast])\n+- [#2] 
TableSourceScan(table=[[testCatalog, test_database, fact]], fields=[id, name, 
amount, price, fact_date_sk])\n])
+:     :- Exchange(distribution=[broadcast])
+:     :  +- GlobalRuntimeFilterBuilder(select=[amount], 
estimatedRowCount=[65536], maxRowCount=[218454])
+:     :     +- Exchange(distribution=[single])
+:     :        +- LocalRuntimeFilterBuilder(select=[amount], 
estimatedRowCount=[65536], maxRowCount=[218454])
+:     :           +- Calc(select=[id, male, amount, price, dim_date_sk], 
where=[(price < 500)])(reuse_id=[1])
+:     :              +- TableSourceScan(table=[[testCatalog, test_database, 
dim, filter=[]]], fields=[id, male, amount, price, dim_date_sk])
+:     +- TableSourceScan(table=[[testCatalog, test_database, fact]], 
fields=[id, name, amount, price, fact_date_sk])
++- Exchange(distribution=[hash[amount]])
+   +- Reused(reference_id=[1])
+]]>
+               </Resource>
+       </TestCase>
+
+       <TestCase name="testPushDownProbeSideWithCalc">
+               <Resource name="ast">
+                       <![CDATA[
+LogicalProject(id=[$0], male=[$1], amount=[$2], price=[$3], dim_date_sk=[$4], 
id0=[$5], name=[$6], amount0=[$7], price0=[$8], fact_date_sk=[$9])
++- LogicalFilter(condition=[AND(=($2, $7), <($3, 500), >($8, 600))])
+   +- LogicalJoin(condition=[true], joinType=[inner])
+      :- LogicalTableScan(table=[[testCatalog, test_database, dim]])
+      +- LogicalTableScan(table=[[testCatalog, test_database, fact]])
+]]>
+               </Resource>
+               <Resource name="optimized rel plan">
+                       <![CDATA[
+HashJoin(joinType=[InnerJoin], where=[=(amount, amount0)], select=[id, male, 
amount, price, dim_date_sk, id0, name, amount0, price0, fact_date_sk], 
build=[left])
+:- Exchange(distribution=[hash[amount]])
+:  +- Calc(select=[id, male, amount, price, dim_date_sk], where=[<(price, 
500)])
+:     +- TableSourceScan(table=[[testCatalog, test_database, dim, filter=[]]], 
fields=[id, male, amount, price, dim_date_sk])
++- Exchange(distribution=[hash[amount]])
+   +- Calc(select=[id, name, amount, price, fact_date_sk], where=[>(price, 
600)])
+      +- RuntimeFilter(select=[amount], 
estimatedFilterRatio=[0.99993896484375])
+         :- Exchange(distribution=[broadcast])
+         :  +- GlobalRuntimeFilterBuilder(select=[amount], 
estimatedRowCount=[65536], maxRowCount=[218454])
+         :     +- Exchange(distribution=[single])
+         :        +- LocalRuntimeFilterBuilder(select=[amount], 
estimatedRowCount=[65536], maxRowCount=[218454])
+         :           +- Calc(select=[id, male, amount, price, dim_date_sk], 
where=[<(price, 500)])
+         :              +- TableSourceScan(table=[[testCatalog, test_database, 
dim, filter=[]]], fields=[id, male, amount, price, dim_date_sk])
+         +- TableSourceScan(table=[[testCatalog, test_database, fact, 
filter=[]]], fields=[id, name, amount, price, fact_date_sk])
+]]>
+               </Resource>
+               <Resource name="optimized exec plan">
+                       <![CDATA[
+HashJoin(joinType=[InnerJoin], where=[(amount = amount0)], select=[id, male, 
amount, price, dim_date_sk, id0, name, amount0, price0, fact_date_sk], 
build=[left])
+:- Exchange(distribution=[hash[amount]])
+:  +- Calc(select=[id, male, amount, price, dim_date_sk], where=[(price < 
500)])(reuse_id=[1])
+:     +- TableSourceScan(table=[[testCatalog, test_database, dim, filter=[]]], 
fields=[id, male, amount, price, dim_date_sk])
++- Exchange(distribution=[hash[amount]])
+   +- Calc(select=[id, name, amount, price, fact_date_sk], where=[(price > 
600)])
+      +- MultipleInput(readOrder=[0,1], 
members=[\nRuntimeFilter(select=[amount], 
estimatedFilterRatio=[0.99993896484375])\n:- [#1] 
Exchange(distribution=[broadcast])\n+- [#2] 
TableSourceScan(table=[[testCatalog, test_database, fact, filter=[]]], 
fields=[id, name, amount, price, fact_date_sk])\n])
+         :- Exchange(distribution=[broadcast])
+         :  +- GlobalRuntimeFilterBuilder(select=[amount], 
estimatedRowCount=[65536], maxRowCount=[218454])
+         :     +- Exchange(distribution=[single])
+         :        +- LocalRuntimeFilterBuilder(select=[amount], 
estimatedRowCount=[65536], maxRowCount=[218454])
+         :           +- Reused(reference_id=[1])
+         +- TableSourceScan(table=[[testCatalog, test_database, fact, 
filter=[]]], fields=[id, name, amount, price, fact_date_sk])
+]]>
+               </Resource>
+       </TestCase>
+
+       <TestCase name="testCannotPushDownProbeSideWithCalc">
+               <Resource name="ast">
+                       <![CDATA[
+LogicalProject(id=[$0], male=[$1], amount=[$2], price=[$3], dim_date_sk=[$4], 
fact_date_sk=[$5], random=[$6])
++- LogicalProject(id=[$0], male=[$1], amount=[$2], price=[$3], 
dim_date_sk=[$4], fact_date_sk=[$6], random=[$7])
+   +- LogicalJoin(condition=[AND(=($5, $7), <($3, 500))], joinType=[inner])
+      :- LogicalProject(id=[$0], male=[$1], amount=[$2], price=[$3], 
dim_date_sk=[$4], amount0=[CAST($2):DOUBLE])
+      :  +- LogicalTableScan(table=[[testCatalog, test_database, dim]])
+      +- LogicalProject(fact_date_sk=[$4], random=[RAND(10)])
+         +- LogicalTableScan(table=[[testCatalog, test_database, fact]])
+]]>
+               </Resource>
+               <Resource name="optimized rel plan">
+                       <![CDATA[
+Calc(select=[id, male, amount, price, dim_date_sk, fact_date_sk, random])
++- HashJoin(joinType=[InnerJoin], where=[=(amount0, random)], select=[id, 
male, amount, price, dim_date_sk, amount0, fact_date_sk, random], build=[left])
+   :- Exchange(distribution=[hash[amount0]])
+   :  +- Calc(select=[id, male, amount, price, dim_date_sk, CAST(amount AS 
DOUBLE) AS amount0], where=[<(price, 500)])
+   :     +- TableSourceScan(table=[[testCatalog, test_database, dim, 
filter=[]]], fields=[id, male, amount, price, dim_date_sk])
+   +- Exchange(distribution=[hash[random]])
+      +- RuntimeFilter(select=[random], 
estimatedFilterRatio=[0.99993896484375])
+         :- Exchange(distribution=[broadcast])
+         :  +- GlobalRuntimeFilterBuilder(select=[amount0], 
estimatedRowCount=[65536], maxRowCount=[187246])
+         :     +- Exchange(distribution=[single])
+         :        +- LocalRuntimeFilterBuilder(select=[amount0], 
estimatedRowCount=[65536], maxRowCount=[187246])
+         :           +- Calc(select=[id, male, amount, price, dim_date_sk, 
CAST(amount AS DOUBLE) AS amount0], where=[<(price, 500)])
+         :              +- TableSourceScan(table=[[testCatalog, test_database, 
dim, filter=[]]], fields=[id, male, amount, price, dim_date_sk])
+         +- Calc(select=[fact_date_sk, RAND(10) AS random])
+            +- TableSourceScan(table=[[testCatalog, test_database, fact, 
project=[fact_date_sk], metadata=[]]], fields=[fact_date_sk])
+]]>
+               </Resource>
+               <Resource name="optimized exec plan">
+                       <![CDATA[
+Calc(select=[id, male, amount, price, dim_date_sk, fact_date_sk, random])
++- HashJoin(joinType=[InnerJoin], where=[(amount0 = random)], select=[id, 
male, amount, price, dim_date_sk, amount0, fact_date_sk, random], build=[left])
+   :- Exchange(distribution=[hash[amount0]])
+   :  +- Calc(select=[id, male, amount, price, dim_date_sk, CAST(amount AS 
DOUBLE) AS amount0], where=[(price < 500)])(reuse_id=[1])
+   :     +- TableSourceScan(table=[[testCatalog, test_database, dim, 
filter=[]]], fields=[id, male, amount, price, dim_date_sk])
+   +- Exchange(distribution=[hash[random]])
+      +- MultipleInput(readOrder=[0,1], 
members=[\nRuntimeFilter(select=[random], 
estimatedFilterRatio=[0.99993896484375])\n:- [#1] 
Exchange(distribution=[broadcast])\n+- Calc(select=[fact_date_sk, RAND(10) AS 
random])\n   +- [#2] TableSourceScan(table=[[testCatalog, test_database, fact, 
project=[fact_date_sk], metadata=[]]], fields=[fact_date_sk])\n])
+         :- Exchange(distribution=[broadcast])
+         :  +- GlobalRuntimeFilterBuilder(select=[amount0], 
estimatedRowCount=[65536], maxRowCount=[187246])
+         :     +- Exchange(distribution=[single])
+         :        +- LocalRuntimeFilterBuilder(select=[amount0], 
estimatedRowCount=[65536], maxRowCount=[187246])
+         :           +- Reused(reference_id=[1])
+         +- TableSourceScan(table=[[testCatalog, test_database, fact, 
project=[fact_date_sk], metadata=[]]], fields=[fact_date_sk])
+]]>
+               </Resource>
+       </TestCase>
+
+
+       <TestCase name="testPushDownProbeSideToAllInputsOfJoin">
+               <Resource name="ast">
+                       <![CDATA[
+LogicalProject(id=[$0], name=[$1], amount=[$2], price=[$3], fact_date_sk=[$4], 
id0=[$5], amount0=[$6], price0=[$7], id1=[$8], male=[$9], amount1=[$10], 
price1=[$11], dim_date_sk=[$12])
++- LogicalFilter(condition=[AND(=($2, $6), =($2, $10), <($11, 500))])
+   +- LogicalJoin(condition=[true], joinType=[inner])
+      :- LogicalJoin(condition=[true], joinType=[inner])
+      :  :- LogicalTableScan(table=[[testCatalog, test_database, fact]])
+      :  +- LogicalTableScan(table=[[testCatalog, test_database, fact2]])
+      +- LogicalTableScan(table=[[testCatalog, test_database, dim]])
+]]>
+               </Resource>
+               <Resource name="optimized rel plan">
+                       <![CDATA[
+HashJoin(joinType=[InnerJoin], where=[=(amount, amount1)], select=[id, name, 
amount, price, fact_date_sk, id0, amount0, price0, id1, male, amount1, price1, 
dim_date_sk], build=[right])
+:- HashJoin(joinType=[InnerJoin], where=[=(amount, amount0)], select=[id, 
name, amount, price, fact_date_sk, id0, amount0, price0], build=[right])
+:  :- Exchange(distribution=[hash[amount]])
+:  :  +- RuntimeFilter(select=[amount], 
estimatedFilterRatio=[0.99993896484375])
+:  :     :- Exchange(distribution=[broadcast])
+:  :     :  +- GlobalRuntimeFilterBuilder(select=[amount], 
estimatedRowCount=[65536], maxRowCount=[218454])
+:  :     :     +- Exchange(distribution=[single])
+:  :     :        +- LocalRuntimeFilterBuilder(select=[amount], 
estimatedRowCount=[65536], maxRowCount=[218454])
+:  :     :           +- Calc(select=[id, male, amount, price, dim_date_sk], 
where=[<(price, 500)])
+:  :     :              +- TableSourceScan(table=[[testCatalog, test_database, 
dim, filter=[]]], fields=[id, male, amount, price, dim_date_sk])
+:  :     +- TableSourceScan(table=[[testCatalog, test_database, fact]], 
fields=[id, name, amount, price, fact_date_sk])
+:  +- Exchange(distribution=[hash[amount]])
+:     +- RuntimeFilter(select=[amount], 
estimatedFilterRatio=[0.99993896484375])
+:        :- Exchange(distribution=[broadcast])
+:        :  +- GlobalRuntimeFilterBuilder(select=[amount], 
estimatedRowCount=[65536], maxRowCount=[218454])
+:        :     +- Exchange(distribution=[single])
+:        :        +- LocalRuntimeFilterBuilder(select=[amount], 
estimatedRowCount=[65536], maxRowCount=[218454])
+:        :           +- Calc(select=[id, male, amount, price, dim_date_sk], 
where=[<(price, 500)])
+:        :              +- TableSourceScan(table=[[testCatalog, test_database, 
dim, filter=[]]], fields=[id, male, amount, price, dim_date_sk])
+:        +- TableSourceScan(table=[[testCatalog, test_database, fact2]], 
fields=[id, amount, price])
++- Exchange(distribution=[hash[amount]])
+   +- Calc(select=[id, male, amount, price, dim_date_sk], where=[<(price, 
500)])
+      +- TableSourceScan(table=[[testCatalog, test_database, dim, filter=[]]], 
fields=[id, male, amount, price, dim_date_sk])
+]]>
+               </Resource>
+               <Resource name="optimized exec plan">
+                       <![CDATA[
+MultipleInput(readOrder=[0,1,0], members=[\nHashJoin(joinType=[InnerJoin], 
where=[(amount = amount1)], select=[id, name, amount, price, fact_date_sk, id0, 
amount0, price0, id1, male, amount1, price1, dim_date_sk], build=[right])\n:- 
HashJoin(joinType=[InnerJoin], where=[(amount = amount0)], select=[id, name, 
amount, price, fact_date_sk, id0, amount0, price0], build=[right])\n:  :- [#2] 
Exchange(distribution=[hash[amount]])\n:  +- [#3] 
Exchange(distribution=[hash[amount]])\n+- [#1] 
Exchange(distribution=[hash[amount]])\n])
+:- Exchange(distribution=[hash[amount]])
+:  +- Calc(select=[id, male, amount, price, dim_date_sk], where=[(price < 
500)])(reuse_id=[1])
+:     +- TableSourceScan(table=[[testCatalog, test_database, dim, filter=[]]], 
fields=[id, male, amount, price, dim_date_sk])
+:- Exchange(distribution=[hash[amount]])
+:  +- MultipleInput(readOrder=[0,1], members=[\nRuntimeFilter(select=[amount], 
estimatedFilterRatio=[0.99993896484375])\n:- [#1] 
Exchange(distribution=[broadcast])\n+- [#2] 
TableSourceScan(table=[[testCatalog, test_database, fact]], fields=[id, name, 
amount, price, fact_date_sk])\n])
+:     :- Exchange(distribution=[broadcast])(reuse_id=[2])
+:     :  +- GlobalRuntimeFilterBuilder(select=[amount], 
estimatedRowCount=[65536], maxRowCount=[218454])
+:     :     +- Exchange(distribution=[single])
+:     :        +- LocalRuntimeFilterBuilder(select=[amount], 
estimatedRowCount=[65536], maxRowCount=[218454])
+:     :           +- Reused(reference_id=[1])
+:     +- TableSourceScan(table=[[testCatalog, test_database, fact]], 
fields=[id, name, amount, price, fact_date_sk])
++- Exchange(distribution=[hash[amount]])
+   +- MultipleInput(readOrder=[0,1], members=[\nRuntimeFilter(select=[amount], 
estimatedFilterRatio=[0.99993896484375])\n:- [#1] 
Exchange(distribution=[broadcast])\n+- [#2] 
TableSourceScan(table=[[testCatalog, test_database, fact2]], fields=[id, 
amount, price])\n])
+      :- Reused(reference_id=[2])
+      +- TableSourceScan(table=[[testCatalog, test_database, fact2]], 
fields=[id, amount, price])
+]]>
+               </Resource>
+       </TestCase>
+
+       <TestCase name="testPushDownProbeSideToOneInputOfJoin">
+               <Resource name="ast">
+                       <![CDATA[
+LogicalProject(id=[$0], name=[$1], amount=[$2], price=[$3], fact_date_sk=[$4], 
id0=[$5], amount0=[$6], price0=[$7], id1=[$8], male=[$9], amount1=[$10], 
price1=[$11], dim_date_sk=[$12])
++- LogicalFilter(condition=[AND(=($3, $7), =($2, $10), <($11, 500))])
+   +- LogicalJoin(condition=[true], joinType=[inner])
+      :- LogicalJoin(condition=[true], joinType=[inner])
+      :  :- LogicalTableScan(table=[[testCatalog, test_database, fact]])
+      :  +- LogicalTableScan(table=[[testCatalog, test_database, fact2]])
+      +- LogicalTableScan(table=[[testCatalog, test_database, dim]])
+]]>
+               </Resource>
+               <Resource name="optimized rel plan">
+                       <![CDATA[
+HashJoin(joinType=[InnerJoin], where=[=(amount, amount1)], select=[id, name, 
amount, price, fact_date_sk, id0, amount0, price0, id1, male, amount1, price1, 
dim_date_sk], build=[right])
+:- Exchange(distribution=[hash[amount]])
+:  +- HashJoin(joinType=[InnerJoin], where=[=(price, price0)], select=[id, 
name, amount, price, fact_date_sk, id0, amount0, price0], build=[right])
+:     :- Exchange(distribution=[hash[price]])
+:     :  +- RuntimeFilter(select=[amount], 
estimatedFilterRatio=[0.99993896484375])
+:     :     :- Exchange(distribution=[broadcast])
+:     :     :  +- GlobalRuntimeFilterBuilder(select=[amount], 
estimatedRowCount=[65536], maxRowCount=[218454])
+:     :     :     +- Exchange(distribution=[single])
+:     :     :        +- LocalRuntimeFilterBuilder(select=[amount], 
estimatedRowCount=[65536], maxRowCount=[218454])
+:     :     :           +- Calc(select=[id, male, amount, price, dim_date_sk], 
where=[<(price, 500)])
+:     :     :              +- TableSourceScan(table=[[testCatalog, 
test_database, dim, filter=[]]], fields=[id, male, amount, price, dim_date_sk])
+:     :     +- TableSourceScan(table=[[testCatalog, test_database, fact]], 
fields=[id, name, amount, price, fact_date_sk])
+:     +- Exchange(distribution=[hash[price]])
+:        +- TableSourceScan(table=[[testCatalog, test_database, fact2]], 
fields=[id, amount, price])
++- Exchange(distribution=[hash[amount]])
+   +- Calc(select=[id, male, amount, price, dim_date_sk], where=[<(price, 
500)])
+      +- TableSourceScan(table=[[testCatalog, test_database, dim, filter=[]]], 
fields=[id, male, amount, price, dim_date_sk])
+]]>
+               </Resource>
+               <Resource name="optimized exec plan">
+                       <![CDATA[
+HashJoin(joinType=[InnerJoin], where=[(amount = amount1)], select=[id, name, 
amount, price, fact_date_sk, id0, amount0, price0, id1, male, amount1, price1, 
dim_date_sk], build=[right])
+:- Exchange(distribution=[hash[amount]])
+:  +- HashJoin(joinType=[InnerJoin], where=[(price = price0)], select=[id, 
name, amount, price, fact_date_sk, id0, amount0, price0], build=[right])
+:     :- Exchange(distribution=[hash[price]])
+:     :  +- MultipleInput(readOrder=[0,1], 
members=[\nRuntimeFilter(select=[amount], 
estimatedFilterRatio=[0.99993896484375])\n:- [#1] 
Exchange(distribution=[broadcast])\n+- [#2] 
TableSourceScan(table=[[testCatalog, test_database, fact]], fields=[id, name, 
amount, price, fact_date_sk])\n])
+:     :     :- Exchange(distribution=[broadcast])
+:     :     :  +- GlobalRuntimeFilterBuilder(select=[amount], 
estimatedRowCount=[65536], maxRowCount=[218454])
+:     :     :     +- Exchange(distribution=[single])
+:     :     :        +- LocalRuntimeFilterBuilder(select=[amount], 
estimatedRowCount=[65536], maxRowCount=[218454])
+:     :     :           +- Calc(select=[id, male, amount, price, dim_date_sk], 
where=[(price < 500)])(reuse_id=[1])
+:     :     :              +- TableSourceScan(table=[[testCatalog, 
test_database, dim, filter=[]]], fields=[id, male, amount, price, dim_date_sk])
+:     :     +- TableSourceScan(table=[[testCatalog, test_database, fact]], 
fields=[id, name, amount, price, fact_date_sk])
+:     +- Exchange(distribution=[hash[price]])
+:        +- TableSourceScan(table=[[testCatalog, test_database, fact2]], 
fields=[id, amount, price])
++- Exchange(distribution=[hash[amount]])
+   +- Reused(reference_id=[1])
+]]>
+               </Resource>
+       </TestCase>
+
+       <TestCase name="testCannotPushDownProbeSideWithJoin">
+               <Resource name="ast">
+                       <![CDATA[
+LogicalProject(id=[$0], name=[$1], amount=[$2], price=[$3], fact_date_sk=[$4], 
id2=[$5], amount2=[$6], price2=[$7], id0=[$8], male=[$9], amount0=[$10], 
price0=[$11], dim_date_sk=[$12])
++- LogicalJoin(condition=[AND(=($2, $10), =($7, $11), <($11, 500))], 
joinType=[inner])
+   :- LogicalProject(id=[$0], name=[$1], amount=[$2], price=[$3], 
fact_date_sk=[$4], id2=[$5], amount2=[$6], price2=[$7])
+   :  +- LogicalJoin(condition=[=($0, $5)], joinType=[inner])
+   :     :- LogicalTableScan(table=[[testCatalog, test_database, fact]])
+   :     +- LogicalTableScan(table=[[testCatalog, test_database, fact2]])
+   +- LogicalTableScan(table=[[testCatalog, test_database, dim]])
+]]>
+               </Resource>
+               <Resource name="optimized rel plan">
+                       <![CDATA[
+HashJoin(joinType=[InnerJoin], where=[AND(=(amount, amount0), =(price2, 
price0))], select=[id, name, amount, price, fact_date_sk, id2, amount2, price2, 
id0, male, amount0, price0, dim_date_sk], build=[right])
+:- Exchange(distribution=[hash[amount, price2]])
+:  +- RuntimeFilter(select=[amount, price2], 
estimatedFilterRatio=[0.9997287326388888])
+:     :- Exchange(distribution=[broadcast])
+:     :  +- GlobalRuntimeFilterBuilder(select=[amount, price], 
estimatedRowCount=[65536], maxRowCount=[218454])
+:     :     +- Exchange(distribution=[single])
+:     :        +- LocalRuntimeFilterBuilder(select=[amount, price], 
estimatedRowCount=[65536], maxRowCount=[218454])
+:     :           +- Calc(select=[id, male, amount, price, dim_date_sk], 
where=[<(price, 500)])
+:     :              +- TableSourceScan(table=[[testCatalog, test_database, 
dim, filter=[]]], fields=[id, male, amount, price, dim_date_sk])
+:     +- HashJoin(joinType=[InnerJoin], where=[=(id, id2)], select=[id, name, 
amount, price, fact_date_sk, id2, amount2, price2], build=[right])
+:        :- Exchange(distribution=[hash[id]])
+:        :  +- TableSourceScan(table=[[testCatalog, test_database, fact]], 
fields=[id, name, amount, price, fact_date_sk])
+:        +- Exchange(distribution=[hash[id2]])
+:           +- Calc(select=[id2, amount2, price2], where=[<(price2, 500)])
+:              +- TableSourceScan(table=[[testCatalog, test_database, fact2, 
filter=[]]], fields=[id2, amount2, price2])
++- Exchange(distribution=[hash[amount, price]])
+   +- Calc(select=[id, male, amount, price, dim_date_sk], where=[<(price, 
500)])
+      +- TableSourceScan(table=[[testCatalog, test_database, dim, filter=[]]], 
fields=[id, male, amount, price, dim_date_sk])
+]]>
+               </Resource>
+               <Resource name="optimized exec plan">
+                       <![CDATA[
+HashJoin(joinType=[InnerJoin], where=[((amount = amount0) AND (price2 = 
price0))], select=[id, name, amount, price, fact_date_sk, id2, amount2, price2, 
id0, male, amount0, price0, dim_date_sk], build=[right])
+:- Exchange(distribution=[hash[amount, price2]])
+:  +- MultipleInput(readOrder=[0,1,0], 
members=[\nRuntimeFilter(select=[amount, price2], 
estimatedFilterRatio=[0.9997287326388888])\n:- [#1] 
Exchange(distribution=[broadcast])\n+- HashJoin(joinType=[InnerJoin], 
where=[(id = id2)], select=[id, name, amount, price, fact_date_sk, id2, 
amount2, price2], build=[right])\n   :- [#2] 
Exchange(distribution=[hash[id]])\n   +- [#3] 
Exchange(distribution=[hash[id2]])\n])
+:     :- Exchange(distribution=[broadcast])
+:     :  +- GlobalRuntimeFilterBuilder(select=[amount, price], 
estimatedRowCount=[65536], maxRowCount=[218454])
+:     :     +- Exchange(distribution=[single])
+:     :        +- LocalRuntimeFilterBuilder(select=[amount, price], 
estimatedRowCount=[65536], maxRowCount=[218454])
+:     :           +- Calc(select=[id, male, amount, price, dim_date_sk], 
where=[(price < 500)])(reuse_id=[1])
+:     :              +- TableSourceScan(table=[[testCatalog, test_database, 
dim, filter=[]]], fields=[id, male, amount, price, dim_date_sk])
+:     :- Exchange(distribution=[hash[id]])
+:     :  +- TableSourceScan(table=[[testCatalog, test_database, fact]], 
fields=[id, name, amount, price, fact_date_sk])
+:     +- Exchange(distribution=[hash[id2]])
+:        +- Calc(select=[id2, amount2, price2], where=[(price2 < 500)])
+:           +- TableSourceScan(table=[[testCatalog, test_database, fact2, 
filter=[]]], fields=[id2, amount2, price2])
++- Exchange(distribution=[hash[amount, price]])
+   +- Reused(reference_id=[1])
+]]>
+               </Resource>
+       </TestCase>
+
+
+       <TestCase name="testPushDownProbeSideWithAgg">
+               <Resource name="ast">
+                       <![CDATA[
+LogicalProject(id=[$0], male=[$1], amount=[$2], price=[$3], dim_date_sk=[$4], 
id0=[$5], fact_date_sk=[$6], EXPR$2=[$7])
++- LogicalJoin(condition=[AND(=($4, $6), <($3, 500))], joinType=[inner])
+   :- LogicalTableScan(table=[[testCatalog, test_database, dim]])
+   +- LogicalAggregate(group=[{0, 1}], EXPR$2=[SUM($2)])
+      +- LogicalProject(id=[$0], fact_date_sk=[$4], price=[$3])
+         +- LogicalTableScan(table=[[testCatalog, test_database, fact]])
+]]>
+               </Resource>
+               <Resource name="optimized rel plan">
+                       <![CDATA[
+HashJoin(joinType=[InnerJoin], where=[=(dim_date_sk, fact_date_sk)], 
select=[id, male, amount, price, dim_date_sk, id0, fact_date_sk, EXPR$2], 
build=[left])
+:- Exchange(distribution=[hash[dim_date_sk]])
+:  +- Calc(select=[id, male, amount, price, dim_date_sk], where=[<(price, 
500)])
+:     +- TableSourceScan(table=[[testCatalog, test_database, dim, filter=[]]], 
fields=[id, male, amount, price, dim_date_sk])
++- Exchange(distribution=[hash[fact_date_sk]])
+   +- HashAggregate(isMerge=[false], groupBy=[id, fact_date_sk], select=[id, 
fact_date_sk, SUM(price) AS EXPR$2])
+      +- Exchange(distribution=[hash[id, fact_date_sk]])
+         +- RuntimeFilter(select=[fact_date_sk], 
estimatedFilterRatio=[0.9999999403953552])
+            :- Exchange(distribution=[broadcast])
+            :  +- GlobalRuntimeFilterBuilder(select=[dim_date_sk], 
estimatedRowCount=[65536], maxRowCount=[218454])
+            :     +- Exchange(distribution=[single])
+            :        +- LocalRuntimeFilterBuilder(select=[dim_date_sk], 
estimatedRowCount=[65536], maxRowCount=[218454])
+            :           +- Calc(select=[id, male, amount, price, dim_date_sk], 
where=[<(price, 500)])
+            :              +- TableSourceScan(table=[[testCatalog, 
test_database, dim, filter=[]]], fields=[id, male, amount, price, dim_date_sk])
+            +- TableSourceScan(table=[[testCatalog, test_database, fact, 
project=[id, fact_date_sk, price], metadata=[]]], fields=[id, fact_date_sk, 
price])
+]]>
+               </Resource>
+               <Resource name="optimized exec plan">
+                       <![CDATA[
+HashJoin(joinType=[InnerJoin], where=[(dim_date_sk = fact_date_sk)], 
select=[id, male, amount, price, dim_date_sk, id0, fact_date_sk, EXPR$2], 
build=[left])
+:- Exchange(distribution=[hash[dim_date_sk]])
+:  +- Calc(select=[id, male, amount, price, dim_date_sk], where=[(price < 
500)])(reuse_id=[1])
+:     +- TableSourceScan(table=[[testCatalog, test_database, dim, filter=[]]], 
fields=[id, male, amount, price, dim_date_sk])
++- Exchange(distribution=[hash[fact_date_sk]])
+   +- HashAggregate(isMerge=[false], groupBy=[id, fact_date_sk], select=[id, 
fact_date_sk, SUM(price) AS EXPR$2])
+      +- Exchange(distribution=[hash[id, fact_date_sk]])
+         +- MultipleInput(readOrder=[0,1], 
members=[\nRuntimeFilter(select=[fact_date_sk], 
estimatedFilterRatio=[0.9999999403953552])\n:- [#1] 
Exchange(distribution=[broadcast])\n+- [#2] 
TableSourceScan(table=[[testCatalog, test_database, fact, project=[id, 
fact_date_sk, price], metadata=[]]], fields=[id, fact_date_sk, price])\n])
+            :- Exchange(distribution=[broadcast])
+            :  +- GlobalRuntimeFilterBuilder(select=[dim_date_sk], 
estimatedRowCount=[65536], maxRowCount=[218454])
+            :     +- Exchange(distribution=[single])
+            :        +- LocalRuntimeFilterBuilder(select=[dim_date_sk], 
estimatedRowCount=[65536], maxRowCount=[218454])
+            :           +- Reused(reference_id=[1])
+            +- TableSourceScan(table=[[testCatalog, test_database, fact, 
project=[id, fact_date_sk, price], metadata=[]]], fields=[id, fact_date_sk, 
price])
+]]>
+               </Resource>
+       </TestCase>
+
+       <TestCase name="testCannotPushDownProbeSideWithAgg">
+               <Resource name="ast">
+                       <![CDATA[
+LogicalProject(id=[$0], male=[$1], amount=[$2], price=[$3], dim_date_sk=[$4], 
id0=[$5], fact_date_sk=[$6], sum_price=[$7])
++- LogicalJoin(condition=[AND(=($3, $7), <($3, 500))], joinType=[inner])
+   :- LogicalTableScan(table=[[testCatalog, test_database, dim]])
+   +- LogicalAggregate(group=[{0, 1}], sum_price=[SUM($2)])
+      +- LogicalProject(id=[$0], fact_date_sk=[$4], price=[$3])
+         +- LogicalTableScan(table=[[testCatalog, test_database, fact]])
+]]>
+               </Resource>
+               <Resource name="optimized rel plan">
+                       <![CDATA[
+HashJoin(joinType=[InnerJoin], where=[=(price, sum_price)], select=[id, male, 
amount, price, dim_date_sk, id0, fact_date_sk, sum_price], build=[left])
+:- Exchange(distribution=[hash[price]])
+:  +- Calc(select=[id, male, amount, price, dim_date_sk], where=[<(price, 
500)])
+:     +- TableSourceScan(table=[[testCatalog, test_database, dim, filter=[]]], 
fields=[id, male, amount, price, dim_date_sk])
++- Exchange(distribution=[hash[sum_price]])
+   +- Calc(select=[id, fact_date_sk, sum_price], where=[<(sum_price, 500)])
+      +- RuntimeFilter(select=[sum_price], 
estimatedFilterRatio=[0.9999996711817046])
+         :- Exchange(distribution=[broadcast])
+         :  +- GlobalRuntimeFilterBuilder(select=[price], 
estimatedRowCount=[65536], maxRowCount=[218454])
+         :     +- Exchange(distribution=[single])
+         :        +- LocalRuntimeFilterBuilder(select=[price], 
estimatedRowCount=[65536], maxRowCount=[218454])
+         :           +- Calc(select=[id, male, amount, price, dim_date_sk], 
where=[<(price, 500)])
+         :              +- TableSourceScan(table=[[testCatalog, test_database, 
dim, filter=[]]], fields=[id, male, amount, price, dim_date_sk])
+         +- HashAggregate(isMerge=[false], groupBy=[id, fact_date_sk], 
select=[id, fact_date_sk, SUM(price) AS sum_price])
+            +- Exchange(distribution=[hash[id, fact_date_sk]])
+               +- TableSourceScan(table=[[testCatalog, test_database, fact, 
project=[id, fact_date_sk, price], metadata=[]]], fields=[id, fact_date_sk, 
price])
+]]>
+               </Resource>
+               <Resource name="optimized exec plan">
+                       <![CDATA[
+HashJoin(joinType=[InnerJoin], where=[(price = sum_price)], select=[id, male, 
amount, price, dim_date_sk, id0, fact_date_sk, sum_price], build=[left])
+:- Exchange(distribution=[hash[price]])
+:  +- Calc(select=[id, male, amount, price, dim_date_sk], where=[(price < 
500)])(reuse_id=[1])
+:     +- TableSourceScan(table=[[testCatalog, test_database, dim, filter=[]]], 
fields=[id, male, amount, price, dim_date_sk])
++- Exchange(distribution=[hash[sum_price]])
+   +- Calc(select=[id, fact_date_sk, sum_price], where=[(sum_price < 500)])
+      +- MultipleInput(readOrder=[0,1], 
members=[\nRuntimeFilter(select=[sum_price], 
estimatedFilterRatio=[0.9999996711817046])\n:- [#1] 
Exchange(distribution=[broadcast])\n+- HashAggregate(isMerge=[false], 
groupBy=[id, fact_date_sk], select=[id, fact_date_sk, SUM(price) AS 
sum_price])\n   +- [#2] Exchange(distribution=[hash[id, fact_date_sk]])\n])
+         :- Exchange(distribution=[broadcast])
+         :  +- GlobalRuntimeFilterBuilder(select=[price], 
estimatedRowCount=[65536], maxRowCount=[218454])
+         :     +- Exchange(distribution=[single])
+         :        +- LocalRuntimeFilterBuilder(select=[price], 
estimatedRowCount=[65536], maxRowCount=[218454])
+         :           +- Reused(reference_id=[1])
+         +- Exchange(distribution=[hash[id, fact_date_sk]])
+            +- TableSourceScan(table=[[testCatalog, test_database, fact, 
project=[id, fact_date_sk, price], metadata=[]]], fields=[id, fact_date_sk, 
price])
+]]>
+               </Resource>
+       </TestCase>
+
+       <TestCase name="testPushDownProbeSideWithUnion">
+               <Resource name="ast">
+                       <![CDATA[
+LogicalProject(id=[$0], fact_date_sk=[$1], amount1=[$2], id0=[$3], male=[$4], 
amount=[$5], price=[$6], dim_date_sk=[$7])
++- LogicalFilter(condition=[AND(=($1, $7), <($6, 500))])
+   +- LogicalJoin(condition=[true], joinType=[inner])
+      :- LogicalUnion(all=[true])
+      :  :- LogicalProject(id=[$0], fact_date_sk=[$4], amount1=[$2])
+      :  :  +- LogicalFilter(condition=[<($3, 500)])
+      :  :     +- LogicalTableScan(table=[[testCatalog, test_database, fact]])
+      :  +- LogicalProject(id=[$0], fact_date_sk=[$4], amount=[$2])
+      :     +- LogicalFilter(condition=[>($3, 600)])
+      :        +- LogicalTableScan(table=[[testCatalog, test_database, fact]])
+      +- LogicalTableScan(table=[[testCatalog, test_database, dim]])
+]]>
+               </Resource>
+               <Resource name="optimized rel plan">
+                       <![CDATA[
+HashJoin(joinType=[InnerJoin], where=[=(fact_date_sk, dim_date_sk)], 
select=[id, fact_date_sk, amount1, id0, male, amount, price, dim_date_sk], 
build=[right])
+:- Exchange(distribution=[hash[fact_date_sk]])
+:  +- Union(all=[true], union=[id, fact_date_sk, amount1])
+:     :- Calc(select=[id, fact_date_sk, amount AS amount1], where=[<(price, 
500)])
+:     :  +- RuntimeFilter(select=[fact_date_sk], 
estimatedFilterRatio=[0.99993896484375])
+:     :     :- Exchange(distribution=[broadcast])
+:     :     :  +- GlobalRuntimeFilterBuilder(select=[dim_date_sk], 
estimatedRowCount=[65536], maxRowCount=[218454])
+:     :     :     +- Exchange(distribution=[single])
+:     :     :        +- LocalRuntimeFilterBuilder(select=[dim_date_sk], 
estimatedRowCount=[65536], maxRowCount=[218454])
+:     :     :           +- Calc(select=[id, male, amount, price, dim_date_sk], 
where=[<(price, 500)])
+:     :     :              +- TableSourceScan(table=[[testCatalog, 
test_database, dim, filter=[]]], fields=[id, male, amount, price, dim_date_sk])
+:     :     +- TableSourceScan(table=[[testCatalog, test_database, fact, 
filter=[], project=[id, amount, price, fact_date_sk], metadata=[]]], 
fields=[id, amount, price, fact_date_sk])
+:     +- Calc(select=[id, fact_date_sk, amount], where=[>(price, 600)])
+:        +- RuntimeFilter(select=[fact_date_sk], 
estimatedFilterRatio=[0.99993896484375])
+:           :- Exchange(distribution=[broadcast])
+:           :  +- GlobalRuntimeFilterBuilder(select=[dim_date_sk], 
estimatedRowCount=[65536], maxRowCount=[218454])
+:           :     +- Exchange(distribution=[single])
+:           :        +- LocalRuntimeFilterBuilder(select=[dim_date_sk], 
estimatedRowCount=[65536], maxRowCount=[218454])
+:           :           +- Calc(select=[id, male, amount, price, dim_date_sk], 
where=[<(price, 500)])
+:           :              +- TableSourceScan(table=[[testCatalog, 
test_database, dim, filter=[]]], fields=[id, male, amount, price, dim_date_sk])
+:           +- TableSourceScan(table=[[testCatalog, test_database, fact, 
filter=[], project=[id, amount, price, fact_date_sk], metadata=[]]], 
fields=[id, amount, price, fact_date_sk])
++- Exchange(distribution=[hash[dim_date_sk]])
+   +- Calc(select=[id, male, amount, price, dim_date_sk], where=[<(price, 
500)])
+      +- TableSourceScan(table=[[testCatalog, test_database, dim, filter=[]]], 
fields=[id, male, amount, price, dim_date_sk])
+]]>
+               </Resource>
+               <Resource name="optimized exec plan">
+                       <![CDATA[
+HashJoin(joinType=[InnerJoin], where=[(fact_date_sk = dim_date_sk)], 
select=[id, fact_date_sk, amount1, id0, male, amount, price, dim_date_sk], 
build=[right])
+:- Exchange(distribution=[hash[fact_date_sk]])
+:  +- MultipleInput(readOrder=[0,1], members=[\nUnion(all=[true], union=[id, 
fact_date_sk, amount1])\n:- Calc(select=[id, fact_date_sk, amount AS amount1], 
where=[(price < 500)])\n:  +- RuntimeFilter(select=[fact_date_sk], 
estimatedFilterRatio=[0.99993896484375])(reuse_id=[1])\n:     :- [#1] 
Exchange(distribution=[broadcast])\n:     +- [#2] 
TableSourceScan(table=[[testCatalog, test_database, fact, filter=[], 
project=[id, amount, price, fact_date_sk], metadata=[]]], fields=[id, amount, 
price, fact_date_sk])\n+- Calc(select=[id, fact_date_sk, amount], where=[(price 
> 600)])\n   +- Reused(reference_id=[1])\n])
+:     :- Exchange(distribution=[broadcast])
+:     :  +- GlobalRuntimeFilterBuilder(select=[dim_date_sk], 
estimatedRowCount=[65536], maxRowCount=[218454])
+:     :     +- Exchange(distribution=[single])
+:     :        +- LocalRuntimeFilterBuilder(select=[dim_date_sk], 
estimatedRowCount=[65536], maxRowCount=[218454])
+:     :           +- Calc(select=[id, male, amount, price, dim_date_sk], 
where=[(price < 500)])(reuse_id=[1])
+:     :              +- TableSourceScan(table=[[testCatalog, test_database, 
dim, filter=[]]], fields=[id, male, amount, price, dim_date_sk])
+:     +- TableSourceScan(table=[[testCatalog, test_database, fact, filter=[], 
project=[id, amount, price, fact_date_sk], metadata=[]]], fields=[id, amount, 
price, fact_date_sk])
++- Exchange(distribution=[hash[dim_date_sk]])
+   +- Reused(reference_id=[1])
+]]>
+               </Resource>
+       </TestCase>
+
+

Review Comment:
   Deleted



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/program/FlinkRuntimeFilterProgram.java:
##########
@@ -0,0 +1,678 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.optimize.program;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
+import org.apache.flink.table.planner.plan.nodes.FlinkConventions;
+import 
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalExchange;
+import 
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalGroupAggregateBase;
+import 
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalHashJoin;
+import 
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalSortMergeJoin;
+import 
org.apache.flink.table.planner.plan.nodes.physical.batch.runtimefilter.BatchPhysicalGlobalRuntimeFilterBuilder;
+import 
org.apache.flink.table.planner.plan.nodes.physical.batch.runtimefilter.BatchPhysicalLocalRuntimeFilterBuilder;
+import 
org.apache.flink.table.planner.plan.nodes.physical.batch.runtimefilter.BatchPhysicalRuntimeFilter;
+import org.apache.flink.table.planner.plan.trait.FlinkRelDistribution;
+import org.apache.flink.table.planner.plan.utils.DefaultRelShuttle;
+import org.apache.flink.table.planner.plan.utils.FlinkRelMdUtil;
+import org.apache.flink.table.planner.plan.utils.JoinUtil;
+
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Calc;
+import org.apache.calcite.rel.core.Exchange;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.JoinInfo;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.core.Union;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexProgram;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.calcite.util.ImmutableIntList;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.BiFunction;
+import java.util.stream.Collectors;
+
+import static 
org.apache.flink.table.planner.utils.ShortcutUtils.unwrapTableConfig;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Planner program that tries to inject runtime filter for suitable join to 
improve join
+ * performance.
+ *
+ * <p>We build the runtime filter in a two-phase manner: First, each subtask 
on the build side
+ * builds a local filter based on its local data, and sends the built filter 
to a global aggregation
+ * node. Then the global aggregation node aggregates the received filters into 
a global filter, and
+ * sends the global filter to all probe side subtasks. Therefore, we will add 
{@link
+ * BatchPhysicalLocalRuntimeFilterBuilder}, {@link 
BatchPhysicalGlobalRuntimeFilterBuilder} and
+ * {@link BatchPhysicalRuntimeFilter} into the physical plan.
+ *
+ * <p>For example, for the following query:
+ *
+ * <pre>{@code SELECT * FROM fact, dim WHERE x = a AND z = 2}</pre>
+ *
+ * <p>The original physical plan:
+ *
+ * <pre>{@code
+ * Calc(select=[a, b, c, x, y, CAST(2 AS BIGINT) AS z])
+ * +- HashJoin(joinType=[InnerJoin], where=[=(x, a)], select=[a, b, c, x, y], 
build=[right])
+ *    :- Exchange(distribution=[hash[a]])
+ *    :  +- TableSourceScan(table=[[fact]], fields=[a, b, c])
+ *    +- Exchange(distribution=[hash[x]])
+ *       +- Calc(select=[x, y], where=[=(z, 2)])
+ *          +- TableSourceScan(table=[[dim, filter=[]]], fields=[x, y, z])
+ * }</pre>
+ *
+ * <p>This optimized physical plan:
+ *
+ * <pre>{@code
+ * Calc(select=[a, b, c, x, y, CAST(2 AS BIGINT) AS z])
+ * +- HashJoin(joinType=[InnerJoin], where=[=(x, a)], select=[a, b, c, x, y], 
build=[right])
+ *    :- Exchange(distribution=[hash[a]])
+ *    :  +- RuntimeFilter(select=[a])
+ *    :     :- Exchange(distribution=[broadcast])
+ *    :     :  +- GlobalRuntimeFilterBuilder
+ *    :     :     +- Exchange(distribution=[single])
+ *    :     :        +- LocalRuntimeFilterBuilder(select=[x])
+ *    :     :           +- Calc(select=[x, y], where=[=(z, 2)])
+ *    :     :              +- TableSourceScan(table=[[dim, filter=[]]], 
fields=[x, y, z])
+ *    :     +- TableSourceScan(table=[[fact]], fields=[a, b, c])
+ *    +- Exchange(distribution=[hash[x]])
+ *       +- Calc(select=[x, y], where=[=(z, 2)])
+ *          +- TableSourceScan(table=[[dim, filter=[]]], fields=[x, y, z])
+ *
+ * }</pre>
+ */
+public class FlinkRuntimeFilterProgram implements 
FlinkOptimizeProgram<BatchOptimizeContext> {
+
+    @Override
+    public RelNode optimize(RelNode root, BatchOptimizeContext context) {
+        if (!isRuntimeFilterEnabled(root)) {
+            return root;
+        }
+
+        // To avoid that one side can be used both as a build side and as a 
probe side
+        checkState(
+                getMinProbeDataSize(root) > getMaxBuildDataSize(root),
+                "The min probe data size should be larger than the max build 
data size.");
+
+        DefaultRelShuttle shuttle =
+                new DefaultRelShuttle() {
+                    @Override
+                    public RelNode visit(RelNode rel) {
+                        if (!(rel instanceof Join)) {
+                            List<RelNode> newInputs = new ArrayList<>();
+                            for (RelNode input : rel.getInputs()) {
+                                RelNode newInput = input.accept(this);
+                                newInputs.add(newInput);
+                            }
+                            return rel.copy(rel.getTraitSet(), newInputs);
+                        }
+
+                        Join join = (Join) rel;
+                        RelNode newLeft = join.getLeft().accept(this);
+                        RelNode newRight = join.getRight().accept(this);
+
+                        return tryInjectRuntimeFilter(
+                                join.copy(join.getTraitSet(), 
Arrays.asList(newLeft, newRight)));
+                    }
+                };
+        return shuttle.visit(root);
+    }
+
+    /**
+     * Judge whether the join is suitable, and try to inject runtime filter 
for it.
+     *
+     * @param join the join node
+     * @return the new join node with runtime filter.
+     */
+    private static Join tryInjectRuntimeFilter(Join join) {
+
+        // check supported join type
+        if (join.getJoinType() != JoinRelType.INNER
+                && join.getJoinType() != JoinRelType.SEMI
+                && join.getJoinType() != JoinRelType.LEFT
+                && join.getJoinType() != JoinRelType.RIGHT) {
+            return join;
+        }
+
+        // check supported join implementation
+        if (!(join instanceof BatchPhysicalHashJoin)
+                && !(join instanceof BatchPhysicalSortMergeJoin)) {
+            return join;
+        }
+
+        boolean leftIsBuild;
+        if (canBeProbeSide(join.getLeft())) {
+            leftIsBuild = false;
+        } else if (canBeProbeSide(join.getRight())) {
+            leftIsBuild = true;
+        } else {
+            return join;
+        }
+
+        // check left join + left build
+        if (join.getJoinType() == JoinRelType.LEFT && !leftIsBuild) {
+            return join;
+        }
+
+        // check right join + right build
+        if (join.getJoinType() == JoinRelType.RIGHT && leftIsBuild) {
+            return join;
+        }
+
+        JoinInfo joinInfo = join.analyzeCondition();
+        RelNode buildSide;
+        RelNode probeSide;
+        ImmutableIntList buildIndices;
+        ImmutableIntList probeIndices;
+        if (leftIsBuild) {
+            buildSide = join.getLeft();
+            probeSide = join.getRight();
+            buildIndices = joinInfo.leftKeys;
+            probeIndices = joinInfo.rightKeys;
+        } else {
+            buildSide = join.getRight();
+            probeSide = join.getLeft();
+            buildIndices = joinInfo.rightKeys;
+            probeIndices = joinInfo.leftKeys;
+        }
+
+        Optional<BuildSideInfo> suitableBuildOpt =
+                findSuitableBuildSide(
+                        buildSide,
+                        buildIndices,
+                        (build, indices) ->
+                                isSuitableDataSize(build, probeSide, indices, 
probeIndices));
+
+        if (suitableBuildOpt.isPresent()) {
+            BuildSideInfo suitableBuildInfo = suitableBuildOpt.get();
+            RelNode newProbe =
+                    tryPushDownProbeAndInjectRuntimeFilter(
+                            probeSide, probeIndices, suitableBuildInfo);
+            if (leftIsBuild) {
+                return join.copy(join.getTraitSet(), Arrays.asList(buildSide, 
newProbe));
+            } else {
+                return join.copy(join.getTraitSet(), Arrays.asList(newProbe, 
buildSide));
+            }
+        }
+
+        return join;
+    }
+
+    /**
+     * Inject runtime filter and return the new probe side (without exchange).
+     *
+     * @param buildSide the build side
+     * @param probeSide the probe side
+     * @param buildIndices the build projection
+     * @param probeIndices the probe projection
+     * @return the new probe side
+     */
+    private static RelNode createNewProbeWithRuntimeFilter(
+            RelNode buildSide,
+            RelNode probeSide,
+            ImmutableIntList buildIndices,
+            ImmutableIntList probeIndices) {
+        Optional<Double> buildRowCountOpt = getEstimatedRowCount(buildSide);
+        checkState(buildRowCountOpt.isPresent());
+        int buildRowCount = buildRowCountOpt.get().intValue();
+        int maxRowCount =
+                (int)
+                        Math.ceil(
+                                getMaxBuildDataSize(buildSide)
+                                        / 
FlinkRelMdUtil.binaryRowAverageSize(buildSide));
+        double filterRatio = computeFilterRatio(buildSide, probeSide, 
buildIndices, probeIndices);
+
+        String[] buildFiledNames =
+                buildIndices.stream()
+                        .map(buildSide.getRowType().getFieldNames()::get)
+                        .toArray(String[]::new);
+        RelNode localBuilder =
+                new BatchPhysicalLocalRuntimeFilterBuilder(
+                        buildSide.getCluster(),
+                        buildSide.getTraitSet(),
+                        buildSide,
+                        buildIndices.toIntArray(),
+                        buildFiledNames,
+                        buildRowCount,
+                        maxRowCount);
+        RelNode globalBuilder =
+                new BatchPhysicalGlobalRuntimeFilterBuilder(
+                        localBuilder.getCluster(),
+                        localBuilder.getTraitSet(),
+                        createExchange(localBuilder, 
FlinkRelDistribution.SINGLETON()),
+                        buildFiledNames,
+                        buildRowCount,
+                        maxRowCount);
+        RelNode runtimeFilter =
+                new BatchPhysicalRuntimeFilter(
+                        probeSide.getCluster(),
+                        probeSide.getTraitSet(),
+                        createExchange(globalBuilder, 
FlinkRelDistribution.BROADCAST_DISTRIBUTED()),
+                        probeSide,
+                        probeIndices.toIntArray(),
+                        filterRatio);
+
+        return runtimeFilter;
+    }
+
+    /**
+     * Find a suitable build side. In order not to affect MultiInput, when the 
original build side
+     * of runtime filter is not an {@link Exchange}, we need to push down the 
builder, until we find
+     * an exchange and inject the builder there.
+     *
+     * @param rel the original build side
+     * @param buildIndices build indices
+     * @param buildSideChecker check whether current build side is suitable
+     * @return An optional info of the suitable build side.It will be empty if 
we cannot find the
+     *     suitable build side.
+     */
+    private static Optional<BuildSideInfo> findSuitableBuildSide(
+            RelNode rel,
+            ImmutableIntList buildIndices,
+            BiFunction<RelNode, ImmutableIntList, Boolean> buildSideChecker) {
+        if (rel instanceof Exchange) {
+            // found the desired exchange, inject builder here
+            Exchange exchange = (Exchange) rel;
+            if (!(exchange.getInput() instanceof BatchPhysicalRuntimeFilter)
+                    && buildSideChecker.apply(exchange.getInput(), 
buildIndices)) {
+                return Optional.of(new BuildSideInfo(exchange.getInput(), 
buildIndices));
+            }
+        } else if (rel instanceof BatchPhysicalRuntimeFilter) {
+            // runtime filter should not as build side
+            return Optional.empty();
+        } else if (rel instanceof Calc) {
+            // try to push the builder to input of projection
+            Calc calc = ((Calc) rel);
+            RexProgram program = calc.getProgram();
+            List<RexNode> projects =
+                    program.getProjectList().stream()
+                            .map(program::expandLocalRef)
+                            .collect(Collectors.toList());
+            ImmutableIntList inputIndices = getInputIndices(projects, 
buildIndices);
+            if (inputIndices.isEmpty()) {
+                return Optional.empty();
+            }
+            return findSuitableBuildSide(calc.getInput(), inputIndices, 
buildSideChecker);
+
+        } else if (rel instanceof Join) {
+            // try to push the builder to one input of join
+            Join join = (Join) rel;
+            if (!(join.getLeft() instanceof Exchange) && !(join.getRight() 
instanceof Exchange)) {
+                return Optional.empty();
+            }
+
+            Tuple2<ImmutableIntList, ImmutableIntList> tuple2 = 
getInputIndices(join, buildIndices);
+            ImmutableIntList leftIndices = tuple2.f0;
+            ImmutableIntList rightIndices = tuple2.f1;
+
+            if (leftIndices.isEmpty() && rightIndices.isEmpty()) {
+                return Optional.empty();
+            }
+
+            boolean firstCheckLeft = !leftIndices.isEmpty() && join.getLeft() 
instanceof Exchange;
+            Optional<BuildSideInfo> buildSideInfoOpt = Optional.empty();
+            if (firstCheckLeft) {
+                buildSideInfoOpt =
+                        findSuitableBuildSide(join.getLeft(), leftIndices, 
buildSideChecker);
+                if (!buildSideInfoOpt.isPresent() && !rightIndices.isEmpty()) {
+                    buildSideInfoOpt =
+                            findSuitableBuildSide(join.getRight(), 
rightIndices, buildSideChecker);
+                }
+                return buildSideInfoOpt;
+            } else {
+                if (!rightIndices.isEmpty()) {
+                    buildSideInfoOpt =
+                            findSuitableBuildSide(join.getRight(), 
rightIndices, buildSideChecker);
+                    if (!buildSideInfoOpt.isPresent() && 
!leftIndices.isEmpty()) {
+                        buildSideInfoOpt =
+                                findSuitableBuildSide(
+                                        join.getLeft(), leftIndices, 
buildSideChecker);
+                    }
+                }
+            }
+            return buildSideInfoOpt;
+        } else if (rel instanceof BatchPhysicalGroupAggregateBase) {
+            // try to push the builder to input of agg, iff the indices are 
all in grouping keys.
+            BatchPhysicalGroupAggregateBase agg = 
(BatchPhysicalGroupAggregateBase) rel;
+            int[] grouping = agg.grouping();
+
+            for (int k : buildIndices) {
+                if (k >= grouping.length) {
+                    return Optional.empty();
+                }
+            }
+
+            return findSuitableBuildSide(
+                    agg.getInput(),
+                    ImmutableIntList.copyOf(
+                            buildIndices.stream()
+                                    .map(index -> agg.grouping()[index])
+                                    .collect(Collectors.toList())),
+                    buildSideChecker);
+
+        } else {
+            // the above cases can cover all cases of TPC-DS test
+            // we may find more cases later
+        }
+
+        return Optional.empty();
+    }
+
+    /**
+     * Try to push down the probe side of runtime filter, and inject the 
runtime filter.
+     *
+     * @param rel the original probe side
+     * @param probeIndices the probe indices
+     * @param buildSideInfo the build side info
+     * @return the new probe side wit runtime filter
+     */
+    private static RelNode tryPushDownProbeAndInjectRuntimeFilter(
+            RelNode rel, ImmutableIntList probeIndices, BuildSideInfo 
buildSideInfo) {
+        if (rel instanceof BatchPhysicalRuntimeFilter) {
+            // do nothing, return current probe side directly. Because we 
don't inject more than
+            // once runtime filter at the same place
+            return rel;
+        } else if (rel instanceof Exchange) {
+            // try to push the probe side to the input of exchange
+            Exchange exchange = (Exchange) rel;
+            return exchange.copy(
+                    exchange.getTraitSet(),
+                    Collections.singletonList(
+                            tryPushDownProbeAndInjectRuntimeFilter(
+                                    exchange.getInput(), probeIndices, 
buildSideInfo)));
+        } else if (rel instanceof Calc) {
+            // try to push the probe side to the input of projection
+            Calc calc = ((Calc) rel);
+            RexProgram program = calc.getProgram();
+            List<RexNode> projects =
+                    program.getProjectList().stream()
+                            .map(program::expandLocalRef)
+                            .collect(Collectors.toList());
+            ImmutableIntList inputIndices = getInputIndices(projects, 
probeIndices);
+            if (!inputIndices.isEmpty()) {
+                return calc.copy(
+                        calc.getTraitSet(),
+                        Collections.singletonList(
+                                tryPushDownProbeAndInjectRuntimeFilter(
+                                        calc.getInput(), inputIndices, 
buildSideInfo)));
+            }
+        } else if (rel instanceof Join) {
+            // try to push the probe side to the all inputs of join
+            Join join = (Join) rel;
+            Tuple2<ImmutableIntList, ImmutableIntList> tuple2 = 
getInputIndices(join, probeIndices);
+            ImmutableIntList leftIndices = tuple2.f0;
+            ImmutableIntList rightIndices = tuple2.f1;
+
+            if (!leftIndices.isEmpty() || !rightIndices.isEmpty()) {
+                RelNode leftSide = join.getLeft();
+                RelNode rightSide = join.getRight();
+
+                if (!leftIndices.isEmpty()) {
+                    leftSide =
+                            tryPushDownProbeAndInjectRuntimeFilter(
+                                    leftSide, leftIndices, buildSideInfo);
+                }
+
+                if (!rightIndices.isEmpty()) {
+                    rightSide =
+                            tryPushDownProbeAndInjectRuntimeFilter(
+                                    rightSide, rightIndices, buildSideInfo);
+                }
+
+                return join.copy(join.getTraitSet(), Arrays.asList(leftSide, 
rightSide));
+            }
+        } else if (rel instanceof BatchPhysicalGroupAggregateBase) {
+            // try to push the probe side to input of agg, iff the indices are 
all in grouping keys.

Review Comment:
   Fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to