This is an automated email from the ASF dual-hosted git repository.
xuyangzhong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 45a04a78eef [FLINK-39174][table-planner] Support lookup join after delta join (#27698)
45a04a78eef is described below
commit 45a04a78eef779352c94da9869946cdf9fa8f8c0
Author: Xuyang <[email protected]>
AuthorDate: Wed Mar 25 19:12:20 2026 +0800
[FLINK-39174][table-planner] Support lookup join after delta join (#27698)
---
.../physical/stream/DuplicateChangesInferRule.java | 4 +-
.../stream/DuplicateChangesInferRuleTest.xml | 6 +-
.../planner/plan/stream/sql/DeltaJoinTest.xml | 75 ++++++++++++++++++++++
.../planner/plan/stream/sql/DeltaJoinTest.scala | 50 +++++++++++++++
.../runtime/stream/sql/DeltaJoinITCase.scala | 73 +++++++++++++++++++++
5 files changed, 204 insertions(+), 4 deletions(-)
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/DuplicateChangesInferRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/DuplicateChangesInferRule.java
index 6688ea1ac22..da74eca6d6b 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/DuplicateChangesInferRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/DuplicateChangesInferRule.java
@@ -32,6 +32,7 @@ import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalI
import
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalJoin;
import
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalLegacySink;
import
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalLegacyTableSourceScan;
+import
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalLookupJoin;
import
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalMiniBatchAssigner;
import
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalRel;
import
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalSink;
@@ -119,7 +120,8 @@ public class DuplicateChangesInferRule extends RelRule<DuplicateChangesInferRule
|| rel instanceof StreamPhysicalTableSourceScan
|| rel instanceof StreamPhysicalDataStreamScan
|| rel instanceof StreamPhysicalLegacyTableSourceScan
- || rel instanceof StreamPhysicalIntermediateTableScan) {
+ || rel instanceof StreamPhysicalIntermediateTableScan
+ || rel instanceof StreamPhysicalLookupJoin) {
// forward parent requirement
requiredTrait = parentTrait;
} else {
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/stream/DuplicateChangesInferRuleTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/stream/DuplicateChangesInferRuleTest.xml
index cb16b2780e3..b7b793aec32 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/stream/DuplicateChangesInferRuleTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/stream/DuplicateChangesInferRuleTest.xml
@@ -540,9 +540,9 @@ LogicalSink(table=[default_catalog.default_database.pk_upsert_snk], fields=[a, b
Sink(table=[default_catalog.default_database.pk_upsert_snk], fields=[a, b, c],
conflictStrategy=[DEDUPLICATE], duplicateChanges=[NONE])
+- Calc(select=[a, b, c], duplicateChanges=[ALLOW])
+- LookupJoin(table=[default_catalog.default_database.dim_src],
joinType=[InnerJoin], lookup=[a=a], select=[a, b, c, a0],
duplicateChanges=[ALLOW])
- +- Calc(select=[a, b, c], duplicateChanges=[DISALLOW])
- +- WatermarkAssigner(rowtime=[rt], watermark=[-(rt, 1000:INTERVAL
SECOND)], duplicateChanges=[DISALLOW])
- +- TableSourceScan(table=[[default_catalog, default_database,
append_src1]], fields=[a, b, c, rt], duplicateChanges=[DISALLOW])
+ +- Calc(select=[a, b, c], duplicateChanges=[ALLOW])
+ +- WatermarkAssigner(rowtime=[rt], watermark=[-(rt, 1000:INTERVAL
SECOND)], duplicateChanges=[ALLOW])
+ +- TableSourceScan(table=[[default_catalog, default_database,
append_src1]], fields=[a, b, c, rt], duplicateChanges=[ALLOW])
]]>
</Resource>
</TestCase>
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DeltaJoinTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DeltaJoinTest.xml
index 4bd96cd7faf..7c04bf6c2aa 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DeltaJoinTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DeltaJoinTest.xml
@@ -956,6 +956,81 @@ Sink(table=[default_catalog.default_database.snk_for_cdc_src], fields=[a0, a1, a
+- Exchange(distribution=[hash[c1, c2]])
+- DropUpdateBefore
+- TableSourceScan(table=[[default_catalog, default_database,
no_delete_src3, project=[c1, c2], metadata=[]]], fields=[c1, c2])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testLookupJoinAfterDeltaJoin">
+ <Resource name="sql">
+ <![CDATA[
+insert into snk
+select a0, a1, a2, a3, b0, b2, b1 from myv
+ join dim for system_time as of pt
+ on a0 = f0
+on conflict do deduplicate
+]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalSink(table=[default_catalog.default_database.snk], fields=[a0, a1, a2,
a3, b0, b2, b1])
++- LogicalProject(a0=[$0], a1=[$1], a2=[$2], a3=[$3], b0=[$4], b2=[$5],
b1=[$6])
+ +- LogicalCorrelate(correlation=[$cor0], joinType=[inner],
requiredColumns=[{0, 7}])
+ :- LogicalProject(a0=[$0], a1=[$1], a2=[$2], a3=[$3], b0=[$4], b2=[$5],
b1=[$6], pt=[PROCTIME()])
+ : +- LogicalJoin(condition=[AND(=($1, $6), =($2, $5))],
joinType=[inner])
+ : :- LogicalTableScan(table=[[default_catalog, default_database,
src1]])
+ : +- LogicalTableScan(table=[[default_catalog, default_database,
src2]])
+ +- LogicalFilter(condition=[=($cor0.a0, $0)])
+ +- LogicalSnapshot(period=[$cor0.pt])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
dim]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Sink(table=[default_catalog.default_database.snk], fields=[a0, a1, a2, a3, b0,
b2, b1], conflictStrategy=[DEDUPLICATE])
++- Calc(select=[a0, a1, a2, a3, b0, b2, b1])
+ +- LookupJoin(table=[default_catalog.default_database.dim],
joinType=[InnerJoin], lookup=[f0=a0], select=[a0, a1, a2, a3, b0, b2, b1, f0],
async=[ORDERED, KEY_ORDERED: false, 180000ms, 100])
+ +- DeltaJoin(joinType=[InnerJoin], where=[AND(=(a1, b1), =(a2, b2))],
leftToRight=[Binary], rightToLeft=[Binary], select=[a0, a1, a2, a3, b0, b2, b1])
+ :- Exchange(distribution=[hash[a1, a2]])
+ : +- TableSourceScan(table=[[default_catalog, default_database,
src1]], fields=[a0, a1, a2, a3])
+ +- Exchange(distribution=[hash[b1, b2]])
+ +- TableSourceScan(table=[[default_catalog, default_database,
src2]], fields=[b0, b2, b1])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testLookupJoinBeforeJoin">
+ <Resource name="sql">
+ <![CDATA[
+insert into snk
+select a0, a1, a2, a3, b0, b2, b1 from lookup_v
+ join src2
+ on a1 = b1 and a2 = b2
+on conflict do deduplicate
+]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalSink(table=[default_catalog.default_database.snk], fields=[a0, a1, a2,
a3, b0, b2, b1])
++- LogicalProject(a0=[$0], a1=[$1], a2=[$2], a3=[$3], b0=[$4], b2=[$5],
b1=[$6])
+ +- LogicalJoin(condition=[AND(=($1, $6), =($2, $5))], joinType=[inner])
+ :- LogicalProject(a0=[$0], a1=[$1], a2=[$2], a3=[$3])
+ : +- LogicalCorrelate(correlation=[$cor1], joinType=[inner],
requiredColumns=[{0, 4}])
+ : :- LogicalProject(a0=[$0], a1=[$1], a2=[$2], a3=[$3],
pt=[PROCTIME()])
+ : : +- LogicalTableScan(table=[[default_catalog, default_database,
src1]])
+ : +- LogicalFilter(condition=[=($cor1.a0, $0)])
+ : +- LogicalSnapshot(period=[$cor1.pt])
+ : +- LogicalTableScan(table=[[default_catalog,
default_database, dim]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, src2]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Sink(table=[default_catalog.default_database.snk], fields=[a0, a1, a2, a3, b0,
b2, b1], conflictStrategy=[DEDUPLICATE])
++- Join(joinType=[InnerJoin], where=[AND(=(a1, b1), =(a2, b2))], select=[a0,
a1, a2, a3, b0, b2, b1], leftInputSpec=[NoUniqueKey],
rightInputSpec=[NoUniqueKey])
+ :- Exchange(distribution=[hash[a1, a2]])
+ : +- Calc(select=[a0, a1, a2, a3])
+ : +- LookupJoin(table=[default_catalog.default_database.dim],
joinType=[InnerJoin], lookup=[f0=a0], select=[a0, a1, a2, a3, f0],
async=[ORDERED, KEY_ORDERED: false, 180000ms, 100])
+ : +- TableSourceScan(table=[[default_catalog, default_database,
src1]], fields=[a0, a1, a2, a3])
+ +- Exchange(distribution=[hash[b1, b2]])
+ +- TableSourceScan(table=[[default_catalog, default_database, src2]],
fields=[b0, b2, b1])
]]>
</Resource>
</TestCase>
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/DeltaJoinTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/DeltaJoinTest.scala
index e1ac227c44c..47414643751 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/DeltaJoinTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/DeltaJoinTest.scala
@@ -162,6 +162,15 @@ class DeltaJoinTest extends TableTestBase {
.primaryKey("l0", "l1", "l2", "r1", "r2")
.build()
)
+
+ addTable(
+ "dim",
+ Schema
+ .newBuilder()
+ .column("f0", DataTypes.INT.notNull)
+ .column("f1", DataTypes.DOUBLE.notNull)
+ .build()
+ )
}
@Test
@@ -1154,6 +1163,47 @@ class DeltaJoinTest extends TableTestBase {
util.verifyRelPlan(stmt)
}
+ @Test
+ def testLookupJoinAfterDeltaJoin(): Unit = {
+ tEnv.executeSql("""
+ |create temporary view myv as
+ |select *, proctime() as pt from src1 join src2
+ | on src1.a1 = src2.b1
+ | and src1.a2 = src2.b2
+ |""".stripMargin)
+
+ util.verifyRelPlanInsert("""
+ |insert into snk
+ |select a0, a1, a2, a3, b0, b2, b1 from myv
+ | join dim for system_time as of pt
+ | on a0 = f0
+ |on conflict do deduplicate
+ |""".stripMargin)
+ }
+
+ @Test
+ def testLookupJoinBeforeJoin(): Unit = {
+ tEnv.executeSql("""
+ |create temporary view src1_v as
+ |select *, proctime() as pt from src1
+ |""".stripMargin)
+
+ tEnv.executeSql("""
+ |create temporary view lookup_v as
+ |select a0, a1, a2, a3 from src1_v
+ | join dim for system_time as of pt
+ | on a0 = f0
+ |""".stripMargin)
+
+ util.verifyRelPlanInsert("""
+ |insert into snk
+ |select a0, a1, a2, a3, b0, b2, b1 from lookup_v
+ | join src2
+ | on a1 = b1 and a2 = b2
+ |on conflict do deduplicate
+ |""".stripMargin)
+ }
+
@Test
def testLHS1(): Unit = {
// DT
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/DeltaJoinITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/DeltaJoinITCase.scala
index 0a769fc15a7..be5f311edbe 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/DeltaJoinITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/DeltaJoinITCase.scala
@@ -775,6 +775,79 @@ class DeltaJoinITCase(enableCache: Boolean) extends StreamingTestBase {
assertThat(AsyncTestValueLookupFunction.invokeCount.get()).isEqualTo(6)
}
+ @TestTemplate
+ def testLookupJoinDimTableWithPkAfterJoin(): Unit = {
+ testLookupJoinAfterJoinInner(true)
+ }
+
+ @TestTemplate
+ def testLookupJoinDimTableWithoutPkAfterJoin(): Unit = {
+ testLookupJoinAfterJoinInner(false)
+ }
+
+ def testLookupJoinAfterJoinInner(dimTableContainsPK: Boolean): Unit = {
+ val data1 = List(
+ changelogRow("+I", Double.box(1.0), Int.box(1), LocalDateTime.of(2021,
1, 1, 1, 1, 1)),
+ changelogRow("+I", Double.box(2.0), Int.box(2), LocalDateTime.of(2022,
2, 2, 2, 2, 2)),
+ // mismatch
+ changelogRow("+I", Double.box(3.0), Int.box(3), LocalDateTime.of(2033,
3, 3, 3, 3, 3))
+ )
+
+ val data2 = List(
+ changelogRow("+I", Int.box(1), Double.box(1.0), LocalDateTime.of(2021,
1, 1, 1, 1, 11)),
+ changelogRow("+I", Int.box(2), Double.box(2.0), LocalDateTime.of(2022,
2, 2, 2, 2, 22)),
+ // mismatch
+ changelogRow("+I", Int.box(99), Double.box(99.0), LocalDateTime.of(2099,
2, 2, 2, 2, 2))
+ )
+
+ prepareTable(List("a0"), List("b0"), data1, data2)
+
+ val dimData = List(
+ changelogRow("+I", Int.box(2), "s2"),
+ // mismatch
+ changelogRow("+I", Int.box(4), "s4")
+ )
+
+ tEnv.executeSql(s"""
+ |create table dim (
+ | id int ${if (dimTableContainsPK) "primary key not
enforced" else ""},
+ | dim_value string
+ |) with (
+ | 'connector' = 'values',
+ | 'data-id' =
'${TestValuesTableFactory.registerData(dimData)}'
+ |)""".stripMargin)
+
+ // TestValuesRuntimeFunctions#KeyedUpsertingSinkFunction will change the
RowKind from
+ // "+U" to "+I"
+ val expected = List(
+ "+I[2.0, 2, 2022-02-02T02:02:02, 2, 2.0, 2022-02-02T02:02:22, s2]"
+ )
+
+ tEnv.executeSql("alter table testSnk add (dim_value string)")
+
+ tEnv
+ .executeSql("""
+ |insert into testSnk
+ | select a1, a0, a2, b0, b1, b2, dim_value
+ | from (
+ | select
+ | *, proctime() as pt
+ | from testLeft
+ | join testRight
+ | on a0 = b0
+ | ) tmp
+ | join dim
+ | for system_time as of pt
+ | on tmp.a0 = dim.id
+ | on conflict do deduplicate
+ |""".stripMargin)
+ .await()
+ val result = TestValuesTableFactory.getResultsAsStrings("testSnk")
+
+ assertThat(result.sorted).isEqualTo(expected.sorted)
+ assertThat(AsyncTestValueLookupFunction.invokeCount.get()).isEqualTo(6)
+ }
+
@TestTemplate
def testFailOverAndRestore(): Unit = {
// enable checkpoint, we are using failing source to force have a complete
checkpoint