This is an automated email from the ASF dual-hosted git repository.

xuyangzhong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 8d8bf958a9af385f244b9231e33ea169d9401ac5
Author: xuyang <[email protected]>
AuthorDate: Thu Mar 26 19:48:30 2026 +0800

    [FLINK-39339][table-planner] Consider immutable cols to infer sink required 
updated mode traits
---
 .../FlinkChangelogModeInferenceProgram.scala       |  70 ++++++++----
 .../physical/stream/ChangelogModeInferenceTest.xml |  46 ++++++++
 .../stream/ChangelogModeInferenceTest.scala        | 119 +++++++++++++++++++++
 3 files changed, 216 insertions(+), 19 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
index b11f9edc7c3..56c9cb1262a 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
@@ -998,8 +998,9 @@ class FlinkChangelogModeInferenceProgram extends 
FlinkOptimizeProgram[StreamOpti
 
     /**
      * Infer sink required traits by the sink node and its input. Sink 
required traits are based on
-     * the sink node's changelog mode, the only exception is when sink's pk(s) 
not exactly the same
-     * as the changeLogUpsertKeys and sink' changelog mode is 
ONLY_UPDATE_AFTER.
+     * the sink node's changelog mode, the only exception is when sink's pk(s) 
are not satisfied by
+     * the input's upsert keys (considering immutable columns) and sink's 
changelog mode is
+     * ONLY_UPDATE_AFTER.
      */
     private def inferSinkRequiredTraits(sink: StreamPhysicalSink): 
Seq[UpdateKindTrait] = {
       val childModifyKindSet = getModifyKindSet(sink.getInput)
@@ -1009,23 +1010,9 @@ class FlinkChangelogModeInferenceProgram extends 
FlinkOptimizeProgram[StreamOpti
         
sink.tableSink.getChangelogMode(childModifyKindSet.toDefaultChangelogMode))
 
       val sinkRequiredTraits = if (sinkTrait.equals(ONLY_UPDATE_AFTER)) {
-        // if sink's pk(s) are not exactly match input changeLogUpsertKeys 
then it will fallback
-        // to beforeAndAfter mode for the correctness
-        var requireBeforeAndAfter: Boolean = false
-        val sinkDefinedPks = 
sink.contextResolvedTable.getResolvedSchema.getPrimaryKeyIndexes
-
-        if (sinkDefinedPks.nonEmpty) {
-          val sinkPks = ImmutableBitSet.of(sinkDefinedPks: _*)
-          val fmq = 
FlinkRelMetadataQuery.reuseOrCreate(sink.getCluster.getMetadataQuery)
-          val changeLogUpsertKeys = fmq.getUpsertKeys(sink.getInput)
-          // if input is UA only, primary key != upsert key (upsert key can be 
null) we should
-          // fallback to beforeAndAfter.
-          // Notice: even sink pk(s) contains input upsert key we cannot 
optimize to UA only,
-          // this differs from batch job's unique key inference
-          if (changeLogUpsertKeys == null || 
!changeLogUpsertKeys.exists(_.equals(sinkPks))) {
-            requireBeforeAndAfter = true
-          }
-        }
+        // if sink's pk(s) are not satisfied by input upsert keys (considering 
immutable columns),
+        // fallback to beforeAndAfter mode for correctness
+        val requireBeforeAndAfter = 
!canUpsertKeysWithImmutableColsSatisfyPk(sink)
         if (requireBeforeAndAfter) {
           Seq(beforeAndAfter)
         } else {
@@ -1039,6 +1026,51 @@ class FlinkChangelogModeInferenceProgram extends 
FlinkOptimizeProgram[StreamOpti
       sinkRequiredTraits
     }
 
+    /**
+     * Check whether input's upsert keys (together with immutable columns) can 
satisfy sink's
+     * primary keys.
+     *
+     * <p>A sink pk is considered "satisfied" when there exists an upsert key 
`uk` such that:
+     *   - `uk` is a subset of sink pk (no extra columns that could cause key 
collision)
+     *   - the remaining sink pk columns not in `uk` are all immutable 
(immutable columns never
+     *     change, so they effectively act as part of the key for upsert 
semantics)
+     *
+     * <p>Example: sink pk = {a, b, c}, uk = {a, b}, immutable columns = {a, 
b, c, d}.
+     *   - Step 1: uk {a, b} ⊆ sink pk {a, b, c} → true
+     *   - Step 2: sink pk \ uk = {c}, immutable columns contain {c} → true
+     *   - Result: satisfied
+     *
+     * <p>Notice: even if sink pk is a subset of the upsert key, the pk is NOT 
considered satisfied
+     * when the upsert key has columns outside sink pk. This differs from 
batch job's unique key
+     * inference.
+     */
+    private def canUpsertKeysWithImmutableColsSatisfyPk(sink: 
StreamPhysicalSink): Boolean = {
+      val sinkDefinedPks = 
sink.contextResolvedTable.getResolvedSchema.getPrimaryKeyIndexes
+      if (sinkDefinedPks.isEmpty) {
+        return true
+      }
+      val sinkPks = ImmutableBitSet.of(sinkDefinedPks: _*)
+      val fmq = 
FlinkRelMetadataQuery.reuseOrCreate(sink.getCluster.getMetadataQuery)
+      val changeLogUpsertKeys = fmq.getUpsertKeys(sink.getInput)
+      // if upsert key is null, pk cannot be satisfied, should fall back to 
beforeAndAfter
+      if (changeLogUpsertKeys == null) {
+        return false
+      }
+      val immutableCols =
+        
Option.apply(fmq.getImmutableColumns(sink.getInput)).getOrElse(ImmutableBitSet.of())
+
+      // when input immutableCols is empty, this degrades to uk.equals(sinkPks)
+      changeLogUpsertKeys.exists(
+        uk => {
+          // 1. uk ⊆ sinkPks
+          val isSinkPkContainsUk = sinkPks.contains(uk)
+          // 2. (sinkPks \ uk) ⊆ immutableCols
+          val extraSinkPkCols = sinkPks.except(uk)
+          val areExtraSinkPkColsImmutable = 
immutableCols.contains(extraSinkPkCols)
+          isSinkPkContainsUk && areExtraSinkPkColsImmutable
+        })
+    }
+
     /**
      * Analyze whether to enable upsertMaterialize or not. In these cases it will 
return true:
      *   1. when `TABLE_EXEC_SINK_UPSERT_MATERIALIZE` set to FORCE and sink's 
primary key nonempty.
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/stream/ChangelogModeInferenceTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/stream/ChangelogModeInferenceTest.xml
index 071900e4bad..03c86424dd1 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/stream/ChangelogModeInferenceTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/stream/ChangelogModeInferenceTest.xml
@@ -281,6 +281,37 @@ LogicalProject(word=[$0], number=[$1])
     <Resource name="optimized rel plan">
       <![CDATA[
 LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, 
source: [CollectionTableSource(word, number)]]], fields=[word, number], 
changelogMode=[I])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testSinkPkCoveredByUpsertKeyAndImmutableCols">
+    <Resource name="ast">
+      <![CDATA[
+LogicalSink(table=[default_catalog.default_database.sink], fields=[name, 
score, detail, id])
++- LogicalProject(name=[$0], score=[$1], detail=[$2], id=[$3])
+   +- LogicalTableScan(table=[[default_catalog, default_database, src]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Sink(table=[default_catalog.default_database.sink], fields=[name, score, 
detail, id], changelogMode=[NONE])
++- DropUpdateBefore(changelogMode=[I,UA])
+   +- TableSourceScan(table=[[default_catalog, default_database, src]], 
fields=[name, score, detail, id], changelogMode=[I,UB,UA])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testSinkPkNotCoveredByUpsertKeyAndImmutableCols">
+    <Resource name="ast">
+      <![CDATA[
+LogicalSink(table=[default_catalog.default_database.sink], fields=[name, 
score, detail, id])
++- LogicalProject(name=[$0], score=[$1], detail=[$2], id=[$3])
+   +- LogicalTableScan(table=[[default_catalog, default_database, src]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Sink(table=[default_catalog.default_database.sink], fields=[name, score, 
detail, id], changelogMode=[NONE])
++- TableSourceScan(table=[[default_catalog, default_database, src]], 
fields=[name, score, detail, id], changelogMode=[I,UB,UA])
 ]]>
     </Resource>
   </TestCase>
@@ -522,6 +553,21 @@ GlobalGroupAggregate(groupBy=[cnt], select=[cnt, 
COUNT_RETRACT(count1$0) AS freq
                +- LocalGroupAggregate(groupBy=[word], select=[word, 
COUNT(number) AS count$0], changelogMode=[I])
                   +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime], 
changelogMode=[I])
                      +- LegacyTableSourceScan(table=[[default_catalog, 
default_database, MyTable, source: [CollectionTableSource(word, number)]]], 
fields=[word, number], changelogMode=[I])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testUpsertKeyExceedsSinkPk">
+    <Resource name="ast">
+      <![CDATA[
+LogicalSink(table=[default_catalog.default_database.sink], fields=[name, 
score, id])
++- LogicalProject(name=[$0], score=[$1], id=[$2])
+   +- LogicalTableScan(table=[[default_catalog, default_database, src]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Sink(table=[default_catalog.default_database.sink], fields=[name, score, id], 
upsertMaterialize=[true], conflictStrategy=[DEDUPLICATE], changelogMode=[NONE])
++- TableSourceScan(table=[[default_catalog, default_database, src]], 
fields=[name, score, id], changelogMode=[I,UB,UA])
 ]]>
     </Resource>
   </TestCase>
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/physical/stream/ChangelogModeInferenceTest.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/physical/stream/ChangelogModeInferenceTest.scala
index 156890a9473..ce2dc3d4468 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/physical/stream/ChangelogModeInferenceTest.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/physical/stream/ChangelogModeInferenceTest.scala
@@ -427,4 +427,123 @@ class ChangelogModeInferenceTest extends TableTestBase {
     // upsert key {id} does not contain {name}, so UB cannot be dropped
     util.verifyRelPlan(sql, ExplainDetail.CHANGELOG_MODE)
   }
+
+  @Test
+  def testSinkPkCoveredByUpsertKeyAndImmutableCols(): Unit = {
+    util.tableEnv.executeSql("""
+                               |create table src (
+                               | name string,
+                               | score int,
+                               | detail string,
+                               | id int primary key not enforced
+                               |) with (
+                               | 'connector' = 'values',
+                               | 'changelog-mode' = 'I,UA,UB'
+                               |)
+                               |""".stripMargin)
+
+    val catalog = 
util.tableEnv.getCatalog(util.tableEnv.getCurrentCatalog).get()
+    addImmutableColConstraint(catalog, util.tableEnv.getCurrentDatabase, 
"src", "name", "score")
+
+    util.tableEnv.executeSql("""
+                               |create table sink (
+                               | name string,
+                               | score int,
+                               | detail string,
+                               | id int,
+                               | primary key (id, name) not enforced
+                               |) with (
+                               | 'connector' = 'values',
+                               | 'sink-insert-only' = 'false'
+                               |)
+                               |""".stripMargin)
+
+    val statementSet = util.tableEnv.createStatementSet()
+    statementSet.addInsertSql("INSERT INTO sink SELECT * FROM src")
+
+    // upsert keys of input: {{id}, {id, name, score}}
+    // immutable cols of input: {id, name, score}
+    // sink pk: {id, name}
+    // upsert key {id} is a subset of sink pk, and the remaining pk col {name} 
is immutable,
+    // so ONLY_UPDATE_AFTER is safe
+    util.verifyRelPlan(statementSet, ExplainDetail.CHANGELOG_MODE)
+  }
+
+  @Test
+  def testSinkPkNotCoveredByUpsertKeyAndImmutableCols(): Unit = {
+    util.tableEnv.executeSql("""
+                               |create table src (
+                               | name string,
+                               | score int,
+                               | detail string,
+                               | id int primary key not enforced
+                               |) with (
+                               | 'connector' = 'values',
+                               | 'changelog-mode' = 'I,UA,UB'
+                               |)
+                               |""".stripMargin)
+
+    val catalog = 
util.tableEnv.getCatalog(util.tableEnv.getCurrentCatalog).get()
+    addImmutableColConstraint(catalog, util.tableEnv.getCurrentDatabase, 
"src", "name")
+
+    util.tableEnv.executeSql("""
+                               |create table sink (
+                               | name string,
+                               | score int,
+                               | detail string,
+                               | id int,
+                               | primary key (id, name, score) not enforced
+                               |) with (
+                               | 'connector' = 'values',
+                               | 'sink-insert-only' = 'false'
+                               |)
+                               |""".stripMargin)
+
+    val statementSet = util.tableEnv.createStatementSet()
+    statementSet.addInsertSql("INSERT INTO sink SELECT * FROM src")
+
+    // upsert keys of input: {{id}, {id, name}}
+    // sink pk: {id, name, score}
+    // {id} union immutable {name} = {id, name}, does NOT cover {id, name, 
score}
+    // so BEFORE_AND_AFTER is required
+    util.verifyRelPlan(statementSet, ExplainDetail.CHANGELOG_MODE)
+  }
+
+  @Test
+  def testUpsertKeyExceedsSinkPk(): Unit = {
+    util.tableEnv.executeSql("""
+                               |create table src (
+                               | name string,
+                               | score int,
+                               | id int,
+                               | primary key (id, name) not enforced
+                               |) with (
+                               | 'connector' = 'values',
+                               | 'changelog-mode' = 'I,UA,UB'
+                               |)
+                               |""".stripMargin)
+
+    val catalog = 
util.tableEnv.getCatalog(util.tableEnv.getCurrentCatalog).get()
+    addImmutableColConstraint(catalog, util.tableEnv.getCurrentDatabase, 
"src", "score")
+
+    util.tableEnv.executeSql("""
+                               |create table sink (
+                               | name string,
+                               | score int,
+                               | id int,
+                               | primary key (id) not enforced
+                               |) with (
+                               | 'connector' = 'values',
+                               | 'sink-insert-only' = 'false'
+                               |)
+                               |""".stripMargin)
+
+    val statementSet = util.tableEnv.createStatementSet()
+    statementSet.addInsertSql("INSERT INTO sink SELECT * FROM src ON CONFLICT 
DO DEDUPLICATE")
+
+    // upsert keys of input: {{id, name}, {id, name, score}}
+    // sink pk: {id}
+    // upsert key {id, name} is NOT a subset of sink pk {id}, so 
BEFORE_AND_AFTER is required
+    util.verifyRelPlan(statementSet, ExplainDetail.CHANGELOG_MODE)
+  }
 }

Reply via email to