This is an automated email from the ASF dual-hosted git repository.

xuyangzhong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 45a04a78eef [FLINK-39174][table-planner] Support lookup join after delta join (#27698)
45a04a78eef is described below

commit 45a04a78eef779352c94da9869946cdf9fa8f8c0
Author: Xuyang <[email protected]>
AuthorDate: Wed Mar 25 19:12:20 2026 +0800

    [FLINK-39174][table-planner] Support lookup join after delta join (#27698)
---
 .../physical/stream/DuplicateChangesInferRule.java |  4 +-
 .../stream/DuplicateChangesInferRuleTest.xml       |  6 +-
 .../planner/plan/stream/sql/DeltaJoinTest.xml      | 75 ++++++++++++++++++++++
 .../planner/plan/stream/sql/DeltaJoinTest.scala    | 50 +++++++++++++++
 .../runtime/stream/sql/DeltaJoinITCase.scala       | 73 +++++++++++++++++++++
 5 files changed, 204 insertions(+), 4 deletions(-)

diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/DuplicateChangesInferRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/DuplicateChangesInferRule.java
index 6688ea1ac22..da74eca6d6b 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/DuplicateChangesInferRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/DuplicateChangesInferRule.java
@@ -32,6 +32,7 @@ import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalI
 import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalJoin;
 import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalLegacySink;
 import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalLegacyTableSourceScan;
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalLookupJoin;
 import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalMiniBatchAssigner;
 import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalRel;
 import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalSink;
@@ -119,7 +120,8 @@ public class DuplicateChangesInferRule extends RelRule<DuplicateChangesInferRule
                 || rel instanceof StreamPhysicalTableSourceScan
                 || rel instanceof StreamPhysicalDataStreamScan
                 || rel instanceof StreamPhysicalLegacyTableSourceScan
-                || rel instanceof StreamPhysicalIntermediateTableScan) {
+                || rel instanceof StreamPhysicalIntermediateTableScan
+                || rel instanceof StreamPhysicalLookupJoin) {
             // forward parent requirement
             requiredTrait = parentTrait;
         } else {
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/stream/DuplicateChangesInferRuleTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/stream/DuplicateChangesInferRuleTest.xml
index cb16b2780e3..b7b793aec32 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/stream/DuplicateChangesInferRuleTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/stream/DuplicateChangesInferRuleTest.xml
@@ -540,9 +540,9 @@ LogicalSink(table=[default_catalog.default_database.pk_upsert_snk], fields=[a, b
 Sink(table=[default_catalog.default_database.pk_upsert_snk], fields=[a, b, c], conflictStrategy=[DEDUPLICATE], duplicateChanges=[NONE])
 +- Calc(select=[a, b, c], duplicateChanges=[ALLOW])
    +- LookupJoin(table=[default_catalog.default_database.dim_src], joinType=[InnerJoin], lookup=[a=a], select=[a, b, c, a0], duplicateChanges=[ALLOW])
-      +- Calc(select=[a, b, c], duplicateChanges=[DISALLOW])
-         +- WatermarkAssigner(rowtime=[rt], watermark=[-(rt, 1000:INTERVAL SECOND)], duplicateChanges=[DISALLOW])
-            +- TableSourceScan(table=[[default_catalog, default_database, append_src1]], fields=[a, b, c, rt], duplicateChanges=[DISALLOW])
+      +- Calc(select=[a, b, c], duplicateChanges=[ALLOW])
+         +- WatermarkAssigner(rowtime=[rt], watermark=[-(rt, 1000:INTERVAL SECOND)], duplicateChanges=[ALLOW])
+            +- TableSourceScan(table=[[default_catalog, default_database, append_src1]], fields=[a, b, c, rt], duplicateChanges=[ALLOW])
 ]]>
     </Resource>
   </TestCase>
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DeltaJoinTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DeltaJoinTest.xml
index 4bd96cd7faf..7c04bf6c2aa 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DeltaJoinTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DeltaJoinTest.xml
@@ -956,6 +956,81 @@ Sink(table=[default_catalog.default_database.snk_for_cdc_src], fields=[a0, a1, a
       +- Exchange(distribution=[hash[c1, c2]])
          +- DropUpdateBefore
             +- TableSourceScan(table=[[default_catalog, default_database, 
no_delete_src3, project=[c1, c2], metadata=[]]], fields=[c1, c2])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testLookupJoinAfterDeltaJoin">
+    <Resource name="sql">
+      <![CDATA[
+insert into snk
+select a0, a1, a2, a3, b0, b2, b1 from myv
+  join dim for system_time as of pt
+  on a0 = f0
+on conflict do deduplicate
+]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalSink(table=[default_catalog.default_database.snk], fields=[a0, a1, a2, 
a3, b0, b2, b1])
++- LogicalProject(a0=[$0], a1=[$1], a2=[$2], a3=[$3], b0=[$4], b2=[$5], 
b1=[$6])
+   +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], 
requiredColumns=[{0, 7}])
+      :- LogicalProject(a0=[$0], a1=[$1], a2=[$2], a3=[$3], b0=[$4], b2=[$5], 
b1=[$6], pt=[PROCTIME()])
+      :  +- LogicalJoin(condition=[AND(=($1, $6), =($2, $5))], 
joinType=[inner])
+      :     :- LogicalTableScan(table=[[default_catalog, default_database, 
src1]])
+      :     +- LogicalTableScan(table=[[default_catalog, default_database, 
src2]])
+      +- LogicalFilter(condition=[=($cor0.a0, $0)])
+         +- LogicalSnapshot(period=[$cor0.pt])
+            +- LogicalTableScan(table=[[default_catalog, default_database, 
dim]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Sink(table=[default_catalog.default_database.snk], fields=[a0, a1, a2, a3, b0, 
b2, b1], conflictStrategy=[DEDUPLICATE])
++- Calc(select=[a0, a1, a2, a3, b0, b2, b1])
+   +- LookupJoin(table=[default_catalog.default_database.dim], 
joinType=[InnerJoin], lookup=[f0=a0], select=[a0, a1, a2, a3, b0, b2, b1, f0], 
async=[ORDERED, KEY_ORDERED: false, 180000ms, 100])
+      +- DeltaJoin(joinType=[InnerJoin], where=[AND(=(a1, b1), =(a2, b2))], 
leftToRight=[Binary], rightToLeft=[Binary], select=[a0, a1, a2, a3, b0, b2, b1])
+         :- Exchange(distribution=[hash[a1, a2]])
+         :  +- TableSourceScan(table=[[default_catalog, default_database, 
src1]], fields=[a0, a1, a2, a3])
+         +- Exchange(distribution=[hash[b1, b2]])
+            +- TableSourceScan(table=[[default_catalog, default_database, 
src2]], fields=[b0, b2, b1])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testLookupJoinBeforeJoin">
+    <Resource name="sql">
+      <![CDATA[
+insert into snk
+select a0, a1, a2, a3, b0, b2, b1 from lookup_v
+  join src2
+  on a1 = b1 and a2 = b2
+on conflict do deduplicate
+]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalSink(table=[default_catalog.default_database.snk], fields=[a0, a1, a2, 
a3, b0, b2, b1])
++- LogicalProject(a0=[$0], a1=[$1], a2=[$2], a3=[$3], b0=[$4], b2=[$5], 
b1=[$6])
+   +- LogicalJoin(condition=[AND(=($1, $6), =($2, $5))], joinType=[inner])
+      :- LogicalProject(a0=[$0], a1=[$1], a2=[$2], a3=[$3])
+      :  +- LogicalCorrelate(correlation=[$cor1], joinType=[inner], 
requiredColumns=[{0, 4}])
+      :     :- LogicalProject(a0=[$0], a1=[$1], a2=[$2], a3=[$3], 
pt=[PROCTIME()])
+      :     :  +- LogicalTableScan(table=[[default_catalog, default_database, 
src1]])
+      :     +- LogicalFilter(condition=[=($cor1.a0, $0)])
+      :        +- LogicalSnapshot(period=[$cor1.pt])
+      :           +- LogicalTableScan(table=[[default_catalog, 
default_database, dim]])
+      +- LogicalTableScan(table=[[default_catalog, default_database, src2]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Sink(table=[default_catalog.default_database.snk], fields=[a0, a1, a2, a3, b0, 
b2, b1], conflictStrategy=[DEDUPLICATE])
++- Join(joinType=[InnerJoin], where=[AND(=(a1, b1), =(a2, b2))], select=[a0, 
a1, a2, a3, b0, b2, b1], leftInputSpec=[NoUniqueKey], 
rightInputSpec=[NoUniqueKey])
+   :- Exchange(distribution=[hash[a1, a2]])
+   :  +- Calc(select=[a0, a1, a2, a3])
+   :     +- LookupJoin(table=[default_catalog.default_database.dim], 
joinType=[InnerJoin], lookup=[f0=a0], select=[a0, a1, a2, a3, f0], 
async=[ORDERED, KEY_ORDERED: false, 180000ms, 100])
+   :        +- TableSourceScan(table=[[default_catalog, default_database, 
src1]], fields=[a0, a1, a2, a3])
+   +- Exchange(distribution=[hash[b1, b2]])
+      +- TableSourceScan(table=[[default_catalog, default_database, src2]], 
fields=[b0, b2, b1])
 ]]>
     </Resource>
   </TestCase>
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/DeltaJoinTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/DeltaJoinTest.scala
index e1ac227c44c..47414643751 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/DeltaJoinTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/DeltaJoinTest.scala
@@ -162,6 +162,15 @@ class DeltaJoinTest extends TableTestBase {
         .primaryKey("l0", "l1", "l2", "r1", "r2")
         .build()
     )
+
+    addTable(
+      "dim",
+      Schema
+        .newBuilder()
+        .column("f0", DataTypes.INT.notNull)
+        .column("f1", DataTypes.DOUBLE.notNull)
+        .build()
+    )
   }
 
   @Test
@@ -1154,6 +1163,47 @@ class DeltaJoinTest extends TableTestBase {
     util.verifyRelPlan(stmt)
   }
 
+  @Test
+  def testLookupJoinAfterDeltaJoin(): Unit = {
+    tEnv.executeSql("""
+                      |create temporary view myv as
+                      |select *, proctime() as pt from src1 join src2
+                      | on src1.a1 = src2.b1
+                      |   and src1.a2 = src2.b2
+                      |""".stripMargin)
+
+    util.verifyRelPlanInsert("""
+                               |insert into snk
+                               |select a0, a1, a2, a3, b0, b2, b1 from myv
+                               |  join dim for system_time as of pt
+                               |  on a0 = f0
+                               |on conflict do deduplicate
+                               |""".stripMargin)
+  }
+
+  @Test
+  def testLookupJoinBeforeJoin(): Unit = {
+    tEnv.executeSql("""
+                      |create temporary view src1_v as
+                      |select *, proctime() as pt from src1
+                      |""".stripMargin)
+
+    tEnv.executeSql("""
+                      |create temporary view lookup_v as
+                      |select a0, a1, a2, a3 from src1_v
+                      |  join dim for system_time as of pt
+                      |  on a0 = f0
+                      |""".stripMargin)
+
+    util.verifyRelPlanInsert("""
+                               |insert into snk
+                               |select a0, a1, a2, a3, b0, b2, b1 from lookup_v
+                               |  join src2
+                               |  on a1 = b1 and a2 = b2
+                               |on conflict do deduplicate
+                               |""".stripMargin)
+  }
+
   @Test
   def testLHS1(): Unit = {
     //       DT
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/DeltaJoinITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/DeltaJoinITCase.scala
index 0a769fc15a7..be5f311edbe 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/DeltaJoinITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/DeltaJoinITCase.scala
@@ -775,6 +775,79 @@ class DeltaJoinITCase(enableCache: Boolean) extends StreamingTestBase {
     assertThat(AsyncTestValueLookupFunction.invokeCount.get()).isEqualTo(6)
   }
 
+  @TestTemplate
+  def testLookupJoinDimTableWithPkAfterJoin(): Unit = {
+    testLookupJoinAfterJoinInner(true)
+  }
+
+  @TestTemplate
+  def testLookupJoinDimTableWithoutPkAfterJoin(): Unit = {
+    testLookupJoinAfterJoinInner(false)
+  }
+
+  def testLookupJoinAfterJoinInner(dimTableContainsPK: Boolean): Unit = {
+    val data1 = List(
+      changelogRow("+I", Double.box(1.0), Int.box(1), LocalDateTime.of(2021, 
1, 1, 1, 1, 1)),
+      changelogRow("+I", Double.box(2.0), Int.box(2), LocalDateTime.of(2022, 
2, 2, 2, 2, 2)),
+      // mismatch
+      changelogRow("+I", Double.box(3.0), Int.box(3), LocalDateTime.of(2033, 
3, 3, 3, 3, 3))
+    )
+
+    val data2 = List(
+      changelogRow("+I", Int.box(1), Double.box(1.0), LocalDateTime.of(2021, 
1, 1, 1, 1, 11)),
+      changelogRow("+I", Int.box(2), Double.box(2.0), LocalDateTime.of(2022, 
2, 2, 2, 2, 22)),
+      // mismatch
+      changelogRow("+I", Int.box(99), Double.box(99.0), LocalDateTime.of(2099, 
2, 2, 2, 2, 2))
+    )
+
+    prepareTable(List("a0"), List("b0"), data1, data2)
+
+    val dimData = List(
+      changelogRow("+I", Int.box(2), "s2"),
+      // mismatch
+      changelogRow("+I", Int.box(4), "s4")
+    )
+
+    tEnv.executeSql(s"""
+                       |create table dim (
+                       |  id int ${if (dimTableContainsPK) "primary key not 
enforced" else ""},
+                       |  dim_value string
+                       |) with (
+                       |  'connector' = 'values',
+                       |  'data-id' = 
'${TestValuesTableFactory.registerData(dimData)}'
+                       |)""".stripMargin)
+
+    // TestValuesRuntimeFunctions#KeyedUpsertingSinkFunction will change the 
RowKind from
+    // "+U" to "+I"
+    val expected = List(
+      "+I[2.0, 2, 2022-02-02T02:02:02, 2, 2.0, 2022-02-02T02:02:22, s2]"
+    )
+
+    tEnv.executeSql("alter table testSnk add (dim_value string)")
+
+    tEnv
+      .executeSql("""
+                    |insert into testSnk
+                    | select a1, a0, a2, b0, b1, b2, dim_value
+                    | from (
+                    |   select
+                    |     *, proctime() as pt
+                    |   from testLeft
+                    |   join testRight
+                    |     on a0 = b0
+                    | ) tmp
+                    | join dim
+                    |   for system_time as of pt
+                    |     on tmp.a0 = dim.id
+                    | on conflict do deduplicate
+                    |""".stripMargin)
+      .await()
+    val result = TestValuesTableFactory.getResultsAsStrings("testSnk")
+
+    assertThat(result.sorted).isEqualTo(expected.sorted)
+    assertThat(AsyncTestValueLookupFunction.invokeCount.get()).isEqualTo(6)
+  }
+
   @TestTemplate
   def testFailOverAndRestore(): Unit = {
     // enable checkpoint, we are using failing source to force have a complete 
checkpoint

Reply via email to