This is an automated email from the ASF dual-hosted git repository.
dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new f76cd88f16d [FLINK-38554][table] Fix rowCount cost for
FlinkLogicalMultiJoin
f76cd88f16d is described below
commit f76cd88f16ded975a067025c07e274d657d3fea7
Author: Gustavo de Morais <[email protected]>
AuthorDate: Fri Oct 24 11:57:15 2025 +0200
[FLINK-38554][table] Fix rowCount cost for FlinkLogicalMultiJoin
---
.../plan/nodes/logical/FlinkLogicalMultiJoin.java | 2 +-
.../planner/plan/stream/sql/MultiJoinTest.java | 39 ++++++++++++++
.../planner/plan/stream/sql/MultiJoinTest.xml | 62 ++++++++++++++++++++++
3 files changed, 102 insertions(+), 1 deletion(-)
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalMultiJoin.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalMultiJoin.java
index fb41fe8dbab..f9cff7f951e 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalMultiJoin.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalMultiJoin.java
@@ -155,7 +155,7 @@ public class FlinkLogicalMultiJoin extends AbstractRelNode
implements FlinkLogic
final Double averageRowSize = mq.getAverageRowSize(input);
final double dAverageRowSize = averageRowSize == null ? 100.0 :
averageRowSize;
- rowCount *= inputRowCount;
+ rowCount += inputRowCount;
cpu += inputRowCount;
io += inputRowCount * dAverageRowSize;
}
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/MultiJoinTest.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/MultiJoinTest.java
index ece24f9d95c..a832696fbda 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/MultiJoinTest.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/MultiJoinTest.java
@@ -230,6 +230,45 @@ public class MultiJoinTest extends TableTestBase {
+ "LEFT JOIN Payments p ON u.user_id_0 = p.user_id_2");
}
+ @Test
+ void testTwoWayJoinWithUnion() {
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE Orders2 ("
+ + " order_id STRING PRIMARY KEY NOT ENFORCED,"
+ + " user_id_1 STRING,"
+ + " product STRING"
+ + ") WITH ('connector' = 'values',
'changelog-mode' = 'I,D')");
+
+ util.verifyRelPlan(
+ "WITH OrdersUnion as ("
+ + "SELECT * FROM Orders "
+ + "UNION ALL "
+ + "SELECT * FROM Orders2"
+ + ") "
+ + "SELECT * FROM OrdersUnion o "
+ + "LEFT JOIN Users u "
+ + "ON o.user_id_1 = u.user_id_0");
+ }
+
+ @Test
+ void testTwoWayJoinWithRank() {
+ util.getTableEnv()
+ .getConfig()
+
.set(OptimizerConfigOptions.TABLE_OPTIMIZER_MULTI_JOIN_ENABLED, true);
+
+ util.verifyRelPlan(
+ "WITH JoinedEvents as ("
+ + "SELECT e1.id as id, e1.val, e1.rowtime as
`rowtime`, e2.price "
+ + "FROM EventTable1 e1 "
+ + "JOIN EventTable2 e2 ON e1.id = e2.id) "
+ + "SELECT id, val, `rowtime` FROM ("
+ + "SELECT *, "
+ + "ROW_NUMBER() OVER (PARTITION BY id ORDER BY
`rowtime` DESC) as ts "
+ + "FROM JoinedEvents) "
+ + "WHERE ts = 1");
+ }
+
@Test
void testFourWayComplexJoinRelPlan() {
util.verifyRelPlan(
diff --git
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MultiJoinTest.xml
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MultiJoinTest.xml
index f9568b3b6e5..149ed09e911 100644
---
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MultiJoinTest.xml
+++
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MultiJoinTest.xml
@@ -946,6 +946,68 @@ Calc(select=[user_id_0, CAST('Gus' AS VARCHAR(2147483647))
AS name, order_id, CA
+- Exchange(distribution=[hash[user_id_2]])
+- Calc(select=[payment_id, user_id_2], where=[(price > 10)])
+- TableSourceScan(table=[[default_catalog, default_database,
Payments, filter=[]]], fields=[payment_id, price, user_id_2])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testTwoWayJoinWithUnion">
+ <Resource name="sql">
+ <![CDATA[WITH OrdersUnion as (SELECT * FROM Orders UNION ALL SELECT *
FROM Orders2) SELECT * FROM OrdersUnion o LEFT JOIN Users u ON o.user_id_1 =
u.user_id_0]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(order_id=[$0], user_id_1=[$1], product=[$2], user_id_0=[$3],
name=[$4], cash=[$5])
++- LogicalJoin(condition=[=($1, $3)], joinType=[left])
+ :- LogicalUnion(all=[true])
+ : :- LogicalProject(order_id=[$0], user_id_1=[$1], product=[$2])
+ : : +- LogicalTableScan(table=[[default_catalog, default_database,
Orders]])
+ : +- LogicalProject(order_id=[$0], user_id_1=[$1], product=[$2])
+ : +- LogicalTableScan(table=[[default_catalog, default_database,
Orders2]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, Users]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+MultiJoin(commonJoinKey=[user_id_1], joinTypes=[LEFT],
inputUniqueKeys=[noUniqueKey, (user_id_0)], joinConditions=[=(user_id_1,
user_id_0)], select=[order_id,user_id_1,product,user_id_0,name,cash],
rowType=[RecordType(VARCHAR(2147483647) order_id, VARCHAR(2147483647)
user_id_1, VARCHAR(2147483647) product, VARCHAR(2147483647) user_id_0,
VARCHAR(2147483647) name, INTEGER cash)])
+:- Exchange(distribution=[hash[user_id_1]])
+: +- Union(all=[true], union=[order_id, user_id_1, product])
+: :- TableSourceScan(table=[[default_catalog, default_database, Orders]],
fields=[order_id, user_id_1, product])
+: +- TableSourceScan(table=[[default_catalog, default_database, Orders2]],
fields=[order_id, user_id_1, product])
++- Exchange(distribution=[hash[user_id_0]])
+ +- TableSourceScan(table=[[default_catalog, default_database, Users]],
fields=[user_id_0, name, cash])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testTwoWayJoinWithRank">
+ <Resource name="sql">
+ <![CDATA[WITH JoinedEvents as (SELECT e1.id as id, e1.val, e1.rowtime as
`rowtime`, e2.price FROM EventTable1 e1 JOIN EventTable2 e2 ON e1.id = e2.id)
SELECT id, val, `rowtime` FROM (SELECT *, ROW_NUMBER() OVER (PARTITION BY id
ORDER BY `rowtime` DESC) as ts FROM JoinedEvents) WHERE ts = 1]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(id=[$0], val=[$1], rowtime=[$2])
++- LogicalFilter(condition=[=($4, 1)])
+ +- LogicalProject(id=[$0], val=[$1], rowtime=[$2], price=[$3],
ts=[ROW_NUMBER() OVER (PARTITION BY $0 ORDER BY $2 DESC NULLS LAST)])
+ +- LogicalProject(id=[$0], val=[$1], rowtime=[$2], price=[$4])
+ +- LogicalJoin(condition=[=($0, $3)], joinType=[inner])
+ :- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($2,
5000:INTERVAL SECOND)])
+ : +- LogicalTableScan(table=[[default_catalog, default_database,
EventTable1]])
+ +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($2,
5000:INTERVAL SECOND)])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
EventTable2]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER],
rankRange=[rankStart=1, rankEnd=1], partitionBy=[id], orderBy=[rowtime DESC],
select=[id, val, rowtime])
++- Exchange(distribution=[hash[id]])
+ +- Calc(select=[id, val, rowtime])
+ +- MultiJoin(commonJoinKey=[id], joinTypes=[INNER],
inputUniqueKeys=[noUniqueKey, noUniqueKey], joinConditions=[=(id, id0)],
select=[id,val,rowtime,id0,price,rowtime0],
rowType=[RecordType(VARCHAR(2147483647) id, INTEGER val, TIMESTAMP(3) rowtime,
VARCHAR(2147483647) id0, DOUBLE price, TIMESTAMP(3) rowtime0)])
+ :- Exchange(distribution=[hash[id]])
+ : +- Calc(select=[id, val, CAST(rowtime AS TIMESTAMP(3)) AS rowtime])
+ : +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime,
5000:INTERVAL SECOND)])
+ : +- TableSourceScan(table=[[default_catalog,
default_database, EventTable1]], fields=[id, val, rowtime])
+ +- Exchange(distribution=[hash[id]])
+ +- Calc(select=[id, price, CAST(rowtime AS TIMESTAMP(3)) AS
rowtime])
+ +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime,
5000:INTERVAL SECOND)])
+ +- TableSourceScan(table=[[default_catalog,
default_database, EventTable2]], fields=[id, price, rowtime])
]]>
</Resource>
</TestCase>