This is an automated email from the ASF dual-hosted git repository.
xuyangzhong pushed a commit to branch release-2.1
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-2.1 by this push:
new 39d20cf1991 [FLINK-38399][table-planner] Convert join to delta join if join key contains one of the indexes in the source
39d20cf1991 is described below
commit 39d20cf199172153bd955233c22e7259b76c2f4b
Author: Xuyang <[email protected]>
AuthorDate: Fri Oct 24 11:20:23 2025 +0800
[FLINK-38399][table-planner] Convert join to delta join if join key contains one of the indexes in the source
---
.../table/planner/plan/utils/DeltaJoinUtil.java | 17 +++++-----
.../planner/plan/stream/sql/DeltaJoinTest.xml | 33 +++++++++++++++++++
.../planner/plan/stream/sql/DeltaJoinTest.scala | 38 ++++++++++++++++++++++
3 files changed, 80 insertions(+), 8 deletions(-)
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/DeltaJoinUtil.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/DeltaJoinUtil.java
index 8c6a9a8542f..d7dfb3d4035 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/DeltaJoinUtil.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/DeltaJoinUtil.java
@@ -276,14 +276,15 @@ public class DeltaJoinUtil {
// the source must have at least one index, and the join key contains
one index
Set<Integer> lookupKeysSet =
Arrays.stream(lookupKeys).boxed().collect(Collectors.toSet());
- for (int[] idxsOfIndex : idxsOfAllIndexes) {
- Preconditions.checkState(idxsOfIndex.length > 0);
-
- // ignore the field order of the index
- boolean containsIndex =
Arrays.stream(idxsOfIndex).allMatch(lookupKeysSet::contains);
- if (!containsIndex) {
- return false;
- }
+ boolean lookupKeyContainsOneIndex =
+ Arrays.stream(idxsOfAllIndexes)
+ .peek(idxsOfIndex ->
Preconditions.checkState(idxsOfIndex.length > 0))
+ .anyMatch(
+ idxsOfIndex ->
+ Arrays.stream(idxsOfIndex)
+
.allMatch(lookupKeysSet::contains));
+ if (!lookupKeyContainsOneIndex) {
+ return false;
}
// the lookup source must support async lookup
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DeltaJoinTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DeltaJoinTest.xml
index d689955746b..d6d7054f731 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DeltaJoinTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DeltaJoinTest.xml
@@ -346,6 +346,39 @@ Sink(table=[default_catalog.default_database.snk],
fields=[a0, a1, a2, a3, b0, b
: +- TableSourceScan(table=[[default_catalog, default_database, src1]],
fields=[a0, a1, a2, a3])
+- Exchange(distribution=[hash[b2]])
+- TableSourceScan(table=[[default_catalog, default_database, src2]],
fields=[b0, b2, b1])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testMultiIndexesInSourceWhileJoinKeyContainsOneOfThem">
+ <Resource name="ast">
+ <![CDATA[
+LogicalSink(table=[default_catalog.default_database.snk], fields=[a0, a1, a2,
a3, b0, b2, b1])
++- LogicalProject(a0=[$0], a1=[$1], a2=[$2], a3=[$3], b0=[$4], b2=[$5],
b1=[$6])
+ +- LogicalJoin(condition=[=($2, $5)], joinType=[inner])
+ :- LogicalTableScan(table=[[default_catalog, default_database,
tmp_src11]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, src2]])
+
+LogicalSink(table=[default_catalog.default_database.snk], fields=[a0, a1, a2,
a3, b0, b2, b1])
++- LogicalProject(a0=[$0], a1=[$1], a2=[$2], a3=[$3], b0=[$4], b2=[$5],
b1=[$6])
+ +- LogicalJoin(condition=[=($2, $5)], joinType=[inner])
+ :- LogicalTableScan(table=[[default_catalog, default_database,
tmp_src12]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, src2]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Sink(table=[default_catalog.default_database.snk], fields=[a0, a1, a2, a3, b0,
b2, b1])
++- Union(all=[true], union=[a0, a1, a2, a3, b0, b2, b1])
+ :- DeltaJoin(joinType=[InnerJoin], where=[=(a2, b2)], select=[a0, a1, a2,
a3, b0, b2, b1])
+ : :- Exchange(distribution=[hash[a2]])
+ : : +- TableSourceScan(table=[[default_catalog, default_database,
tmp_src11]], fields=[a0, a1, a2, a3])
+ : +- Exchange(distribution=[hash[b2]])
+ : +- TableSourceScan(table=[[default_catalog, default_database, src2]],
fields=[b0, b2, b1])
+ +- DeltaJoin(joinType=[InnerJoin], where=[=(a2, b2)], select=[a0, a1, a2,
a3, b0, b2, b1])
+ :- Exchange(distribution=[hash[a2]])
+ : +- TableSourceScan(table=[[default_catalog, default_database,
tmp_src12]], fields=[a0, a1, a2, a3])
+ +- Exchange(distribution=[hash[b2]])
+ +- TableSourceScan(table=[[default_catalog, default_database, src2]],
fields=[b0, b2, b1])
]]>
</Resource>
</TestCase>
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/DeltaJoinTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/DeltaJoinTest.scala
index 8c92844dec2..b9af0e70c7d 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/DeltaJoinTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/DeltaJoinTest.scala
@@ -545,6 +545,44 @@ class DeltaJoinTest extends TableTestBase {
util.verifyRelPlanInsert("insert into tmp_snk select a0, a1 from src1")
}
+ @Test
+ def testMultiIndexesInSourceWhileJoinKeyContainsOneOfThem(): Unit = {
+ addTable(
+ "tmp_src11",
+ Schema
+ .newBuilder()
+ .column("a0", DataTypes.INT.notNull)
+ .column("a1", DataTypes.DOUBLE.notNull)
+ .column("a2", DataTypes.STRING)
+ .column("a3", DataTypes.INT)
+ .index("a1", "a2")
+ .index("a2")
+ .build()
+ )
+
+ addTable(
+ "tmp_src12",
+ Schema
+ .newBuilder()
+ .column("a0", DataTypes.INT.notNull)
+ .column("a1", DataTypes.DOUBLE.notNull)
+ .column("a2", DataTypes.STRING)
+ .column("a3", DataTypes.INT)
+ .index("a2")
+ .index("a1", "a2")
+ .build()
+ )
+
+ val stmt = tEnv.createStatementSet()
+ stmt.addInsertSql(
+ "insert into snk select * from tmp_src11 join src2 " +
+ "on tmp_src11.a2 = src2.b2")
+ stmt.addInsertSql(
+ "insert into snk select * from tmp_src12 join src2 " +
+ "on tmp_src12.a2 = src2.b2")
+ util.verifyRelPlan(stmt)
+ }
+
private def addTable(
tableName: String,
schema: Schema,