This is an automated email from the ASF dual-hosted git repository.

xuyangzhong pushed a commit to branch release-2.1
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-2.1 by this push:
     new 39d20cf1991 [FLINK-38399][table-planner] Convert join to delta join if join key contains one of the indexes in the source
39d20cf1991 is described below

commit 39d20cf199172153bd955233c22e7259b76c2f4b
Author: Xuyang <[email protected]>
AuthorDate: Fri Oct 24 11:20:23 2025 +0800

    [FLINK-38399][table-planner] Convert join to delta join if join key contains one of the indexes in the source
---
 .../table/planner/plan/utils/DeltaJoinUtil.java    | 17 +++++-----
 .../planner/plan/stream/sql/DeltaJoinTest.xml      | 33 +++++++++++++++++++
 .../planner/plan/stream/sql/DeltaJoinTest.scala    | 38 ++++++++++++++++++++++
 3 files changed, 80 insertions(+), 8 deletions(-)

diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/DeltaJoinUtil.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/DeltaJoinUtil.java
index 8c6a9a8542f..d7dfb3d4035 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/DeltaJoinUtil.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/DeltaJoinUtil.java
@@ -276,14 +276,15 @@ public class DeltaJoinUtil {
         // the source must have at least one index, and the join key contains 
one index
         Set<Integer> lookupKeysSet = 
Arrays.stream(lookupKeys).boxed().collect(Collectors.toSet());
 
-        for (int[] idxsOfIndex : idxsOfAllIndexes) {
-            Preconditions.checkState(idxsOfIndex.length > 0);
-
-            // ignore the field order of the index
-            boolean containsIndex = 
Arrays.stream(idxsOfIndex).allMatch(lookupKeysSet::contains);
-            if (!containsIndex) {
-                return false;
-            }
+        boolean lookupKeyContainsOneIndex =
+                Arrays.stream(idxsOfAllIndexes)
+                        .peek(idxsOfIndex -> 
Preconditions.checkState(idxsOfIndex.length > 0))
+                        .anyMatch(
+                                idxsOfIndex ->
+                                        Arrays.stream(idxsOfIndex)
+                                                
.allMatch(lookupKeysSet::contains));
+        if (!lookupKeyContainsOneIndex) {
+            return false;
         }
 
         // the lookup source must support async lookup
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DeltaJoinTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DeltaJoinTest.xml
index d689955746b..d6d7054f731 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DeltaJoinTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DeltaJoinTest.xml
@@ -346,6 +346,39 @@ Sink(table=[default_catalog.default_database.snk], fields=[a0, a1, a2, a3, b0, b
    :  +- TableSourceScan(table=[[default_catalog, default_database, src1]], 
fields=[a0, a1, a2, a3])
    +- Exchange(distribution=[hash[b2]])
       +- TableSourceScan(table=[[default_catalog, default_database, src2]], 
fields=[b0, b2, b1])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testMultiIndexesInSourceWhileJoinKeyContainsOneOfThem">
+    <Resource name="ast">
+      <![CDATA[
+LogicalSink(table=[default_catalog.default_database.snk], fields=[a0, a1, a2, 
a3, b0, b2, b1])
++- LogicalProject(a0=[$0], a1=[$1], a2=[$2], a3=[$3], b0=[$4], b2=[$5], 
b1=[$6])
+   +- LogicalJoin(condition=[=($2, $5)], joinType=[inner])
+      :- LogicalTableScan(table=[[default_catalog, default_database, 
tmp_src11]])
+      +- LogicalTableScan(table=[[default_catalog, default_database, src2]])
+
+LogicalSink(table=[default_catalog.default_database.snk], fields=[a0, a1, a2, 
a3, b0, b2, b1])
++- LogicalProject(a0=[$0], a1=[$1], a2=[$2], a3=[$3], b0=[$4], b2=[$5], 
b1=[$6])
+   +- LogicalJoin(condition=[=($2, $5)], joinType=[inner])
+      :- LogicalTableScan(table=[[default_catalog, default_database, 
tmp_src12]])
+      +- LogicalTableScan(table=[[default_catalog, default_database, src2]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Sink(table=[default_catalog.default_database.snk], fields=[a0, a1, a2, a3, b0, 
b2, b1])
++- Union(all=[true], union=[a0, a1, a2, a3, b0, b2, b1])
+   :- DeltaJoin(joinType=[InnerJoin], where=[=(a2, b2)], select=[a0, a1, a2, 
a3, b0, b2, b1])
+   :  :- Exchange(distribution=[hash[a2]])
+   :  :  +- TableSourceScan(table=[[default_catalog, default_database, 
tmp_src11]], fields=[a0, a1, a2, a3])
+   :  +- Exchange(distribution=[hash[b2]])
+   :     +- TableSourceScan(table=[[default_catalog, default_database, src2]], 
fields=[b0, b2, b1])
+   +- DeltaJoin(joinType=[InnerJoin], where=[=(a2, b2)], select=[a0, a1, a2, 
a3, b0, b2, b1])
+      :- Exchange(distribution=[hash[a2]])
+      :  +- TableSourceScan(table=[[default_catalog, default_database, 
tmp_src12]], fields=[a0, a1, a2, a3])
+      +- Exchange(distribution=[hash[b2]])
+         +- TableSourceScan(table=[[default_catalog, default_database, src2]], 
fields=[b0, b2, b1])
 ]]>
     </Resource>
   </TestCase>
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/DeltaJoinTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/DeltaJoinTest.scala
index 8c92844dec2..b9af0e70c7d 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/DeltaJoinTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/DeltaJoinTest.scala
@@ -545,6 +545,44 @@ class DeltaJoinTest extends TableTestBase {
     util.verifyRelPlanInsert("insert into tmp_snk select a0, a1 from src1")
   }
 
+  @Test
+  def testMultiIndexesInSourceWhileJoinKeyContainsOneOfThem(): Unit = {
+    addTable(
+      "tmp_src11",
+      Schema
+        .newBuilder()
+        .column("a0", DataTypes.INT.notNull)
+        .column("a1", DataTypes.DOUBLE.notNull)
+        .column("a2", DataTypes.STRING)
+        .column("a3", DataTypes.INT)
+        .index("a1", "a2")
+        .index("a2")
+        .build()
+    )
+
+    addTable(
+      "tmp_src12",
+      Schema
+        .newBuilder()
+        .column("a0", DataTypes.INT.notNull)
+        .column("a1", DataTypes.DOUBLE.notNull)
+        .column("a2", DataTypes.STRING)
+        .column("a3", DataTypes.INT)
+        .index("a2")
+        .index("a1", "a2")
+        .build()
+    )
+
+    val stmt = tEnv.createStatementSet()
+    stmt.addInsertSql(
+      "insert into snk select * from tmp_src11 join src2 " +
+        "on tmp_src11.a2 = src2.b2")
+    stmt.addInsertSql(
+      "insert into snk select * from tmp_src12 join src2 " +
+        "on tmp_src12.a2 = src2.b2")
+    util.verifyRelPlan(stmt)
+  }
+
   private def addTable(
       tableName: String,
       schema: Schema,

Reply via email to