This is an automated email from the ASF dual-hosted git repository.

xuyangzhong pushed a commit to branch release-2.1
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-2.1 by this push:
     new acb998e94a8 [FLINK-38586][table-runtime] Fix ClassCastException when using Delta Join with lookup cache (#27193)
acb998e94a8 is described below

commit acb998e94a8624c912e32ba902fac1cda88d186c
Author: Stepan Stepanishchev <[email protected]>
AuthorDate: Fri Nov 7 13:47:02 2025 +0700

    [FLINK-38586][table-runtime] Fix ClassCastException when using Delta Join with lookup cache (#27193)
---
 .../table/planner/plan/utils/DeltaJoinUtil.java    |  4 +--
 .../planner/plan/stream/sql/DeltaJoinTest.xml      | 29 ++++++++++++++++++++++
 .../planner/plan/stream/sql/DeltaJoinTest.scala    | 27 ++++++++++++++++++++
 3 files changed, 58 insertions(+), 2 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/DeltaJoinUtil.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/DeltaJoinUtil.java
index d7dfb3d4035..288de3287c8 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/DeltaJoinUtil.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/DeltaJoinUtil.java
@@ -167,13 +167,13 @@ public class DeltaJoinUtil {
         while (changed) {
             // unwrap cache delegator
             if (lookupFunction instanceof CachingAsyncLookupFunction) {
-                lookupFunction = ((CachingAsyncLookupFunction) 
temporalTable).getDelegate();
+                lookupFunction = ((CachingAsyncLookupFunction) 
lookupFunction).getDelegate();
                 continue;
             }
             // unwrap retryable delegator
             if (lookupFunction instanceof 
RetryableAsyncLookupFunctionDelegator) {
                 lookupFunction =
-                        ((RetryableAsyncLookupFunctionDelegator) temporalTable)
+                        ((RetryableAsyncLookupFunctionDelegator) 
lookupFunction)
                                 .getUserLookupFunction();
                 continue;
             }
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DeltaJoinTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DeltaJoinTest.xml
index d6d7054f731..9f12a320c0a 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DeltaJoinTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DeltaJoinTest.xml
@@ -346,6 +346,35 @@ Sink(table=[default_catalog.default_database.snk], 
fields=[a0, a1, a2, a3, b0, b
    :  +- TableSourceScan(table=[[default_catalog, default_database, src1]], 
fields=[a0, a1, a2, a3])
    +- Exchange(distribution=[hash[b2]])
       +- TableSourceScan(table=[[default_catalog, default_database, src2]], 
fields=[b0, b2, b1])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testLookupTableWithCache">
+    <Resource name="explain">
+      <![CDATA[== Abstract Syntax Tree ==
+LogicalSink(table=[default_catalog.default_database.snk], fields=[a0, a1, a2, 
a3, b0, b2, b1])
++- LogicalProject(a0=[$0], a1=[$1], a2=[$2], a3=[$3], b0=[$4], b2=[$5], 
b1=[$6])
+   +- LogicalJoin(condition=[AND(=($1, $6), =($2, $5))], joinType=[inner])
+      :- LogicalTableScan(table=[[default_catalog, default_database, lc_src]])
+      +- LogicalTableScan(table=[[default_catalog, default_database, src2]])
+
+== Optimized Physical Plan With Advice ==
+Sink(table=[default_catalog.default_database.snk], fields=[a0, a1, a2, a3, b0, 
b2, b1])
++- DeltaJoin(joinType=[InnerJoin], where=[AND(=(a1, b1), =(a2, b2))], 
select=[a0, a1, a2, a3, b0, b2, b1])
+   :- Exchange(distribution=[hash[a1, a2]])
+   :  +- TableSourceScan(table=[[default_catalog, default_database, lc_src]], 
fields=[a0, a1, a2, a3])
+   +- Exchange(distribution=[hash[b1, b2]])
+      +- TableSourceScan(table=[[default_catalog, default_database, src2]], 
fields=[b0, b2, b1])
+
+No available advice...
+
+== Optimized Execution Plan ==
+Sink(table=[default_catalog.default_database.snk], fields=[a0, a1, a2, a3, b0, 
b2, b1])
++- DeltaJoin(joinType=[InnerJoin], where=[((a1 = b1) AND (a2 = b2))], 
select=[a0, a1, a2, a3, b0, b2, b1])
+   :- Exchange(distribution=[hash[a1, a2]])
+   :  +- TableSourceScan(table=[[default_catalog, default_database, lc_src]], 
fields=[a0, a1, a2, a3])
+   +- Exchange(distribution=[hash[b1, b2]])
+      +- TableSourceScan(table=[[default_catalog, default_database, src2]], 
fields=[b0, b2, b1])
 ]]>
     </Resource>
   </TestCase>
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/DeltaJoinTest.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/DeltaJoinTest.scala
index b9af0e70c7d..7f97e59c6b2 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/DeltaJoinTest.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/DeltaJoinTest.scala
@@ -283,6 +283,33 @@ class DeltaJoinTest extends TableTestBase {
       ExplainDetail.PLAN_ADVICE)
   }
 
+  @Test
+  def testLookupTableWithCache(): Unit = {
+    val lookupOptions = new JHashMap[String, String]()
+    lookupOptions.put("lookup.cache", "partial")
+    lookupOptions.put("lookup.partial-cache.max-rows", "1000")
+
+    addTable(
+      "lc_src",
+      Schema
+        .newBuilder()
+        .column("a0", DataTypes.INT.notNull)
+        .column("a1", DataTypes.DOUBLE.notNull)
+        .column("a2", DataTypes.STRING)
+        .column("a3", DataTypes.INT)
+        .index("a1", "a2")
+        .index("a3")
+        .build(),
+      lookupOptions
+    )
+
+    util.verifyExplainInsert(
+      "insert into snk select * from lc_src join src2 " +
+        "on lc_src.a1 = src2.b1 " +
+        "and lc_src.a2 = src2.b2",
+      ExplainDetail.PLAN_ADVICE)
+  }
+
   @Test
   def testWithWatermarkAssigner(): Unit = {
     addTable(

Reply via email to