This is an automated email from the ASF dual-hosted git repository.

xuyangzhong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 2e49048680e452c3d0ecb981fe3e4fd685d6aacf
Author: xuyang <[email protected]>
AuthorDate: Tue Mar 24 19:30:54 2026 +0800

    [FLINK-39314][table-planner] Allow input to drop update before if filter is applied on any of upsert keys
---
 .../FlinkChangelogModeInferenceProgram.scala       |  10 +-
 .../physical/stream/ChangelogModeInferenceTest.xml | 124 ++++++++++++++++
 .../stream/ChangelogModeInferenceTest.scala        | 162 +++++++++++++++++++++
 3 files changed, 291 insertions(+), 5 deletions(-)

diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
index c660f00ad5a..b11f9edc7c3 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
@@ -19,7 +19,6 @@ package org.apache.flink.table.planner.plan.optimize.program
 
 import org.apache.flink.legacy.table.sinks.{AppendStreamTableSink, RetractStreamTableSink, StreamTableSink, UpsertStreamTableSink}
 import org.apache.flink.table.api.{TableException, ValidationException}
-import org.apache.flink.table.api.InsertConflictStrategy
 import org.apache.flink.table.api.InsertConflictStrategy.ConflictBehavior
 import org.apache.flink.table.api.config.ExecutionConfigOptions
 import org.apache.flink.table.api.config.ExecutionConfigOptions.UpsertMaterialize
@@ -1559,10 +1558,11 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti
       // there are no upsert keys, so all columns are non-primary key columns
       true
     } else {
-      val upsertKey = upsertKeys.head
-      RexNodeExtractor
-        .extractRefInputFields(JavaScalaConversionUtil.toJava(Seq(condition)))
-        .exists(i => !upsertKey.get(i))
+      val inputRefIndices =
+        RexNodeExtractor
+          .extractRefInputFields(JavaScalaConversionUtil.toJava(Seq(condition)))
+      val inputRefSet = ImmutableBitSet.of(inputRefIndices: _*)
+      !upsertKeys.stream().anyMatch(uk => uk.contains(inputRefSet))
     }
   }
 
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/stream/ChangelogModeInferenceTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/stream/ChangelogModeInferenceTest.xml
index 72ac683e619..071900e4bad 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/stream/ChangelogModeInferenceTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/stream/ChangelogModeInferenceTest.xml
@@ -16,6 +16,130 @@ See the License for the specific language governing permissions and
 limitations under the License.
 -->
 <Root>
+  <TestCase name="testFilterNotContainedByAnyUpsertKey">
+    <Resource name="sql">
+      <![CDATA[
+select * from src where name <> 'Tom' and score > 90
+]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(score=[$0], note=[$1], name=[$2], id=[$3])
++- LogicalFilter(condition=[AND(<>($2, _UTF-16LE'Tom'), >($0, 90))])
+   +- LogicalTableScan(table=[[default_catalog, default_database, src]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Calc(select=[score, note, name, id], where=[AND(<>(name, 'Tom'), >(score, 90))], changelogMode=[I,UB,UA])
++- TableSourceScan(table=[[default_catalog, default_database, src, filter=[]]], fields=[score, note, name, id], changelogMode=[I,UB,UA])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testFilterOnNonUpsertKeyColOnly">
+    <Resource name="sql">
+      <![CDATA[
+select * from src where name <> 'Tom'
+]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(name=[$0], score=[$1], id=[$2])
++- LogicalFilter(condition=[<>($0, _UTF-16LE'Tom')])
+   +- LogicalTableScan(table=[[default_catalog, default_database, src]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Calc(select=[name, score, id], where=[<>(name, 'Tom')], changelogMode=[I,UB,UA])
++- TableSourceScan(table=[[default_catalog, default_database, src, filter=[]]], fields=[name, score, id], changelogMode=[I,UB,UA])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testFilterOnPrimaryKeyOnly">
+    <Resource name="sql">
+      <![CDATA[
+select * from src where id > 5
+]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(name=[$0], score=[$1], id=[$2])
++- LogicalFilter(condition=[>($2, 5)])
+   +- LogicalTableScan(table=[[default_catalog, default_database, src]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Calc(select=[name, score, id], where=[>(id, 5)], changelogMode=[I,UA])
++- DropUpdateBefore(changelogMode=[I,UA])
+   +- TableSourceScan(table=[[default_catalog, default_database, src, filter=[]]], fields=[name, score, id], changelogMode=[I,UB,UA])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testFilterOneEntireUpsertKey">
+    <Resource name="sql">
+      <![CDATA[
+select * from src where score > 90 and name = 'Tom' and id > 0
+]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(score=[$0], note=[$1], name=[$2], id=[$3])
++- LogicalFilter(condition=[AND(>($0, 90), =($2, _UTF-16LE'Tom'), >($3, 0))])
+   +- LogicalTableScan(table=[[default_catalog, default_database, src]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Calc(select=[score, note, 'Tom' AS name, id], where=[AND(>(score, 90), =(name, 'Tom'), >(id, 0))], changelogMode=[I,UA])
++- DropUpdateBefore(changelogMode=[I,UA])
+   +- TableSourceScan(table=[[default_catalog, default_database, src, filter=[]]], fields=[score, note, name, id], changelogMode=[I,UB,UA])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testFilterSubsetOfUpsertKey">
+    <Resource name="sql">
+      <![CDATA[
+select * from src where name <> 'Tom'
+]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(name=[$0], score=[$1], id=[$2])
++- LogicalFilter(condition=[<>($0, _UTF-16LE'Tom')])
+   +- LogicalTableScan(table=[[default_catalog, default_database, src]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Calc(select=[name, score, id], where=[<>(name, 'Tom')], changelogMode=[I,UA])
++- DropUpdateBefore(changelogMode=[I,UA])
+   +- TableSourceScan(table=[[default_catalog, default_database, src, filter=[]]], fields=[name, score, id], changelogMode=[I,UB,UA])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testFilterSubsetOfUpsertKey2">
+    <Resource name="sql">
+      <![CDATA[
+select * from src where name <> 'Tom' and score > 90
+]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(score=[$0], note=[$1], name=[$2], id=[$3])
++- LogicalFilter(condition=[AND(<>($2, _UTF-16LE'Tom'), >($0, 90))])
+   +- LogicalTableScan(table=[[default_catalog, default_database, src]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Calc(select=[score, note, name, id], where=[AND(<>(name, 'Tom'), >(score, 90))], changelogMode=[I,UA])
++- DropUpdateBefore(changelogMode=[I,UA])
+   +- TableSourceScan(table=[[default_catalog, default_database, src, filter=[]]], fields=[score, note, name, id], changelogMode=[I,UB,UA])
+]]>
+    </Resource>
+  </TestCase>
   <TestCase name="testGroupByWithUnion">
     <Resource name="sql">
       <![CDATA[
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/physical/stream/ChangelogModeInferenceTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/physical/stream/ChangelogModeInferenceTest.scala
index b37bf494dcc..156890a9473 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/physical/stream/ChangelogModeInferenceTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/physical/stream/ChangelogModeInferenceTest.scala
@@ -20,6 +20,7 @@ package org.apache.flink.table.planner.plan.rules.physical.stream
 import org.apache.flink.table.api.ExplainDetail
 import org.apache.flink.table.api.config.{AggregatePhaseStrategy, OptimizerConfigOptions}
 import org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram
+import org.apache.flink.table.planner.utils.ImmutableColConstraintTestUtils.addImmutableColConstraint
 import org.apache.flink.table.planner.utils.TableTestBase
 
 import org.junit.jupiter.api.{BeforeEach, Test}
@@ -265,4 +266,165 @@ class ChangelogModeInferenceTest extends TableTestBase {
     util.verifyRelPlan(statementSet, ExplainDetail.CHANGELOG_MODE)
   }
 
+  @Test
+  def testFilterSubsetOfUpsertKey(): Unit = {
+    util.tableEnv.executeSql("""
+                               |create table src (
+                               | name string,
+                               | score int,
+                               | id int primary key not enforced
+                               |) with (
+                               | 'connector' = 'values',
+                               | 'changelog-mode' = 'I,UA,UB'
+                               |)
+                               |""".stripMargin)
+
+    val catalog = util.tableEnv.getCatalog(util.tableEnv.getCurrentCatalog).get()
+    addImmutableColConstraint(catalog, util.tableEnv.getCurrentDatabase, "src", "name")
+
+    val sql =
+      """
+        |select * from src where name <> 'Tom'
+        |""".stripMargin
+
+    // upsert keys: {{name, id}, {id}}
+    // filter references: {name}
+    // upsert key {name, id} contains {name}, so UB can be dropped
+    util.verifyRelPlan(sql, ExplainDetail.CHANGELOG_MODE)
+  }
+
+  @Test
+  def testFilterSubsetOfUpsertKey2(): Unit = {
+    util.tableEnv.executeSql("""
+                               |create table src (
+                               | score int,
+                               | note string,
+                               | name string,
+                               | id int,
+                               | primary key (name, id) not enforced
+                               |) with (
+                               | 'connector' = 'values',
+                               | 'changelog-mode' = 'I,UA,UB'
+                               |)
+                               |""".stripMargin)
+
+    val catalog = util.tableEnv.getCatalog(util.tableEnv.getCurrentCatalog).get()
+    addImmutableColConstraint(catalog, util.tableEnv.getCurrentDatabase, "src", "score")
+
+    val sql =
+      """
+        |select * from src where name <> 'Tom' and score > 90
+        |""".stripMargin
+
+    // upsert keys: {{score, name, id}, {name, id}}
+    // filter references: {score, name}
+    // upsert key {score, name, id} contains {score, name}, so UB can be dropped
+    util.verifyRelPlan(sql, ExplainDetail.CHANGELOG_MODE)
+  }
+
+  @Test
+  def testFilterNotContainedByAnyUpsertKey(): Unit = {
+    util.tableEnv.executeSql("""
+                               |create table src (
+                               | score int,
+                               | note string,
+                               | name string,
+                               | id int,
+                               | primary key (id) not enforced
+                               |) with (
+                               | 'connector' = 'values',
+                               | 'changelog-mode' = 'I,UA,UB'
+                               |)
+                               |""".stripMargin)
+
+    val catalog = util.tableEnv.getCatalog(util.tableEnv.getCurrentCatalog).get()
+    addImmutableColConstraint(catalog, util.tableEnv.getCurrentDatabase, "src", "score")
+
+    val sql =
+      """
+        |select * from src where name <> 'Tom' and score > 90
+        |""".stripMargin
+
+    // upsert keys: {{score, id}, {id}}
+    // filter references: {score, name}
+    // upsert key {score, id} or {id} does not contain {score, name}, so UB cannot be dropped
+    util.verifyRelPlan(sql, ExplainDetail.CHANGELOG_MODE)
+  }
+
+  @Test
+  def testFilterOneEntireUpsertKey(): Unit = {
+    util.tableEnv.executeSql("""
+                               |create table src (
+                               | score int,
+                               | note string,
+                               | name string,
+                               | id int,
+                               | primary key (name, id) not enforced
+                               |) with (
+                               | 'connector' = 'values',
+                               | 'changelog-mode' = 'I,UA,UB'
+                               |)
+                               |""".stripMargin)
+
+    val catalog = util.tableEnv.getCatalog(util.tableEnv.getCurrentCatalog).get()
+    addImmutableColConstraint(catalog, util.tableEnv.getCurrentDatabase, "src", "score")
+
+    val sql =
+      """
+        |select * from src where score > 90 and name = 'Tom' and id > 0
+        |""".stripMargin
+
+    // upsert keys: {{score, name, id}, {name, id}}
+    // filter references: {score, name, id}
+    // upsert key {score, name, id} fully contains {score, name, id}, so UB can be dropped
+    util.verifyRelPlan(sql, ExplainDetail.CHANGELOG_MODE)
+  }
+
+  @Test
+  def testFilterOnPrimaryKeyOnly(): Unit = {
+    util.tableEnv.executeSql("""
+                               |create table src (
+                               | name string,
+                               | score int,
+                               | id int primary key not enforced
+                               |) with (
+                               | 'connector' = 'values',
+                               | 'changelog-mode' = 'I,UA,UB'
+                               |)
+                               |""".stripMargin)
+
+    val sql =
+      """
+        |select * from src where id > 5
+        |""".stripMargin
+
+    // upsert keys: {{id}}
+    // filter references: {id}
+    // upsert key {id} fully contains {id}, so UB can be dropped
+    util.verifyRelPlan(sql, ExplainDetail.CHANGELOG_MODE)
+  }
+
+  @Test
+  def testFilterOnNonUpsertKeyColOnly(): Unit = {
+    util.tableEnv.executeSql("""
+                               |create table src (
+                               | name string,
+                               | score int,
+                               | id int primary key not enforced
+                               |) with (
+                               | 'connector' = 'values',
+                               | 'changelog-mode' = 'I,UA,UB'
+                               |)
+                               |""".stripMargin)
+
+    val sql =
+      """
+        |select * from src where name <> 'Tom'
+        |""".stripMargin
+
+    // upsert keys: {{id}}
+    // filter references: {name}
+    // upsert key {id} does not contain {name}, so UB cannot be dropped
+    util.verifyRelPlan(sql, ExplainDetail.CHANGELOG_MODE)
+  }
 }

Reply via email to