This is an automated email from the ASF dual-hosted git repository.
xuyangzhong pushed a commit to branch release-2.1
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-2.1 by this push:
new acb998e94a8 [FLINK-38586][table-runtime] Fix ClassCastException when using Delta Join with lookup cache (#27193)
acb998e94a8 is described below
commit acb998e94a8624c912e32ba902fac1cda88d186c
Author: Stepan Stepanishchev <[email protected]>
AuthorDate: Fri Nov 7 13:47:02 2025 +0700
[FLINK-38586][table-runtime] Fix ClassCastException when using Delta Join with lookup cache (#27193)
---
.../table/planner/plan/utils/DeltaJoinUtil.java | 4 +--
.../planner/plan/stream/sql/DeltaJoinTest.xml | 29 ++++++++++++++++++++++
.../planner/plan/stream/sql/DeltaJoinTest.scala | 27 ++++++++++++++++++++
3 files changed, 58 insertions(+), 2 deletions(-)
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/DeltaJoinUtil.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/DeltaJoinUtil.java
index d7dfb3d4035..288de3287c8 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/DeltaJoinUtil.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/DeltaJoinUtil.java
@@ -167,13 +167,13 @@ public class DeltaJoinUtil {
while (changed) {
// unwrap cache delegator
if (lookupFunction instanceof CachingAsyncLookupFunction) {
- lookupFunction = ((CachingAsyncLookupFunction) temporalTable).getDelegate();
+ lookupFunction = ((CachingAsyncLookupFunction) lookupFunction).getDelegate();
continue;
}
// unwrap retryable delegator
if (lookupFunction instanceof RetryableAsyncLookupFunctionDelegator) {
lookupFunction =
- ((RetryableAsyncLookupFunctionDelegator) temporalTable)
+ ((RetryableAsyncLookupFunctionDelegator) lookupFunction)
.getUserLookupFunction();
continue;
}
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DeltaJoinTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DeltaJoinTest.xml
index d6d7054f731..9f12a320c0a 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DeltaJoinTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DeltaJoinTest.xml
@@ -346,6 +346,35 @@ Sink(table=[default_catalog.default_database.snk],
fields=[a0, a1, a2, a3, b0, b
: +- TableSourceScan(table=[[default_catalog, default_database, src1]],
fields=[a0, a1, a2, a3])
+- Exchange(distribution=[hash[b2]])
+- TableSourceScan(table=[[default_catalog, default_database, src2]],
fields=[b0, b2, b1])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testLookupTableWithCache">
+ <Resource name="explain">
+ <![CDATA[== Abstract Syntax Tree ==
+LogicalSink(table=[default_catalog.default_database.snk], fields=[a0, a1, a2,
a3, b0, b2, b1])
++- LogicalProject(a0=[$0], a1=[$1], a2=[$2], a3=[$3], b0=[$4], b2=[$5],
b1=[$6])
+ +- LogicalJoin(condition=[AND(=($1, $6), =($2, $5))], joinType=[inner])
+ :- LogicalTableScan(table=[[default_catalog, default_database, lc_src]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, src2]])
+
+== Optimized Physical Plan With Advice ==
+Sink(table=[default_catalog.default_database.snk], fields=[a0, a1, a2, a3, b0,
b2, b1])
++- DeltaJoin(joinType=[InnerJoin], where=[AND(=(a1, b1), =(a2, b2))],
select=[a0, a1, a2, a3, b0, b2, b1])
+ :- Exchange(distribution=[hash[a1, a2]])
+ : +- TableSourceScan(table=[[default_catalog, default_database, lc_src]],
fields=[a0, a1, a2, a3])
+ +- Exchange(distribution=[hash[b1, b2]])
+ +- TableSourceScan(table=[[default_catalog, default_database, src2]],
fields=[b0, b2, b1])
+
+No available advice...
+
+== Optimized Execution Plan ==
+Sink(table=[default_catalog.default_database.snk], fields=[a0, a1, a2, a3, b0,
b2, b1])
++- DeltaJoin(joinType=[InnerJoin], where=[((a1 = b1) AND (a2 = b2))],
select=[a0, a1, a2, a3, b0, b2, b1])
+ :- Exchange(distribution=[hash[a1, a2]])
+ : +- TableSourceScan(table=[[default_catalog, default_database, lc_src]],
fields=[a0, a1, a2, a3])
+ +- Exchange(distribution=[hash[b1, b2]])
+ +- TableSourceScan(table=[[default_catalog, default_database, src2]],
fields=[b0, b2, b1])
]]>
</Resource>
</TestCase>
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/DeltaJoinTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/DeltaJoinTest.scala
index b9af0e70c7d..7f97e59c6b2 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/DeltaJoinTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/DeltaJoinTest.scala
@@ -283,6 +283,33 @@ class DeltaJoinTest extends TableTestBase {
ExplainDetail.PLAN_ADVICE)
}
+ @Test
+ def testLookupTableWithCache(): Unit = {
+ val lookupOptions = new JHashMap[String, String]()
+ lookupOptions.put("lookup.cache", "partial")
+ lookupOptions.put("lookup.partial-cache.max-rows", "1000")
+
+ addTable(
+ "lc_src",
+ Schema
+ .newBuilder()
+ .column("a0", DataTypes.INT.notNull)
+ .column("a1", DataTypes.DOUBLE.notNull)
+ .column("a2", DataTypes.STRING)
+ .column("a3", DataTypes.INT)
+ .index("a1", "a2")
+ .index("a3")
+ .build(),
+ lookupOptions
+ )
+
+ util.verifyExplainInsert(
+ "insert into snk select * from lc_src join src2 " +
+ "on lc_src.a1 = src2.b1 " +
+ "and lc_src.a2 = src2.b2",
+ ExplainDetail.PLAN_ADVICE)
+ }
+
@Test
def testWithWatermarkAssigner(): Unit = {
addTable(