This is an automated email from the ASF dual-hosted git repository. xuyangzhong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 2e49048680e452c3d0ecb981fe3e4fd685d6aacf Author: xuyang <[email protected]> AuthorDate: Tue Mar 24 19:30:54 2026 +0800 [FLINK-39314][table-planner] Allow input to drop update before if filter is applied on any of upsert keys --- .../FlinkChangelogModeInferenceProgram.scala | 10 +- .../physical/stream/ChangelogModeInferenceTest.xml | 124 ++++++++++++++++ .../stream/ChangelogModeInferenceTest.scala | 162 +++++++++++++++++++++ 3 files changed, 291 insertions(+), 5 deletions(-) diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala index c660f00ad5a..b11f9edc7c3 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala @@ -19,7 +19,6 @@ package org.apache.flink.table.planner.plan.optimize.program import org.apache.flink.legacy.table.sinks.{AppendStreamTableSink, RetractStreamTableSink, StreamTableSink, UpsertStreamTableSink} import org.apache.flink.table.api.{TableException, ValidationException} -import org.apache.flink.table.api.InsertConflictStrategy import org.apache.flink.table.api.InsertConflictStrategy.ConflictBehavior import org.apache.flink.table.api.config.ExecutionConfigOptions import org.apache.flink.table.api.config.ExecutionConfigOptions.UpsertMaterialize @@ -1559,10 +1558,11 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti // there are no upsert keys, so all columns are non-primary key columns true } else { - val upsertKey = upsertKeys.head - RexNodeExtractor - .extractRefInputFields(JavaScalaConversionUtil.toJava(Seq(condition))) - .exists(i => !upsertKey.get(i)) + val inputRefIndices = + RexNodeExtractor + .extractRefInputFields(JavaScalaConversionUtil.toJava(Seq(condition))) + val inputRefSet = ImmutableBitSet.of(inputRefIndices: _*) + !upsertKeys.stream().anyMatch(uk => uk.contains(inputRefSet)) } } diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/stream/ChangelogModeInferenceTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/stream/ChangelogModeInferenceTest.xml index 72ac683e619..071900e4bad 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/stream/ChangelogModeInferenceTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/stream/ChangelogModeInferenceTest.xml @@ -16,6 +16,130 @@ See the License for the specific language governing permissions and limitations under the License. --> <Root> + <TestCase name="testFilterNotContainedByAnyUpsertKey"> + <Resource name="sql"> + <![CDATA[ +select * from src where name <> 'Tom' and score > 90 +]]> + </Resource> + <Resource name="ast"> + <![CDATA[ +LogicalProject(score=[$0], note=[$1], name=[$2], id=[$3]) ++- LogicalFilter(condition=[AND(<>($2, _UTF-16LE'Tom'), >($0, 90))]) + +- LogicalTableScan(table=[[default_catalog, default_database, src]]) +]]> + </Resource> + <Resource name="optimized rel plan"> + <![CDATA[ +Calc(select=[score, note, name, id], where=[AND(<>(name, 'Tom'), >(score, 90))], changelogMode=[I,UB,UA]) ++- TableSourceScan(table=[[default_catalog, default_database, src, filter=[]]], fields=[score, note, name, id], changelogMode=[I,UB,UA]) +]]> + </Resource> + </TestCase> + <TestCase name="testFilterOnNonUpsertKeyColOnly"> + <Resource name="sql"> + <![CDATA[ +select * from src where name <> 'Tom' +]]> + </Resource> + <Resource name="ast"> + <![CDATA[ +LogicalProject(name=[$0], score=[$1], id=[$2]) ++- LogicalFilter(condition=[<>($0, _UTF-16LE'Tom')]) + +- LogicalTableScan(table=[[default_catalog, default_database, src]]) +]]> + </Resource> + <Resource name="optimized rel plan"> + <![CDATA[ +Calc(select=[name, score, id], where=[<>(name, 'Tom')], changelogMode=[I,UB,UA]) ++- TableSourceScan(table=[[default_catalog, default_database, src, filter=[]]], fields=[name, score, id], changelogMode=[I,UB,UA]) +]]> + </Resource> + </TestCase> + <TestCase name="testFilterOnPrimaryKeyOnly"> + <Resource name="sql"> + <![CDATA[ +select * from src where id > 5 +]]> + </Resource> + <Resource name="ast"> + <![CDATA[ +LogicalProject(name=[$0], score=[$1], id=[$2]) ++- LogicalFilter(condition=[>($2, 5)]) + +- LogicalTableScan(table=[[default_catalog, default_database, src]]) +]]> + </Resource> + <Resource name="optimized rel plan"> + <![CDATA[ +Calc(select=[name, score, id], where=[>(id, 5)], changelogMode=[I,UA]) ++- DropUpdateBefore(changelogMode=[I,UA]) + +- TableSourceScan(table=[[default_catalog, default_database, src, filter=[]]], fields=[name, score, id], changelogMode=[I,UB,UA]) +]]> + </Resource> + </TestCase> + <TestCase name="testFilterOneEntireUpsertKey"> + <Resource name="sql"> + <![CDATA[ +select * from src where score > 90 and name = 'Tom' and id > 0 +]]> + </Resource> + <Resource name="ast"> + <![CDATA[ +LogicalProject(score=[$0], note=[$1], name=[$2], id=[$3]) ++- LogicalFilter(condition=[AND(>($0, 90), =($2, _UTF-16LE'Tom'), >($3, 0))]) + +- LogicalTableScan(table=[[default_catalog, default_database, src]]) +]]> + </Resource> + <Resource name="optimized rel plan"> + <![CDATA[ +Calc(select=[score, note, 'Tom' AS name, id], where=[AND(>(score, 90), =(name, 'Tom'), >(id, 0))], changelogMode=[I,UA]) ++- DropUpdateBefore(changelogMode=[I,UA]) + +- TableSourceScan(table=[[default_catalog, default_database, src, filter=[]]], fields=[score, note, name, id], changelogMode=[I,UB,UA]) +]]> + </Resource> + </TestCase> + <TestCase name="testFilterSubsetOfUpsertKey"> + <Resource name="sql"> + <![CDATA[ +select * from src where name <> 'Tom' +]]> + </Resource> + <Resource name="ast"> + <![CDATA[ +LogicalProject(name=[$0], score=[$1], id=[$2]) ++- LogicalFilter(condition=[<>($0, _UTF-16LE'Tom')]) + +- LogicalTableScan(table=[[default_catalog, default_database, src]]) +]]> + </Resource> + <Resource name="optimized rel plan"> + <![CDATA[ +Calc(select=[name, score, id], where=[<>(name, 'Tom')], changelogMode=[I,UA]) ++- DropUpdateBefore(changelogMode=[I,UA]) + +- TableSourceScan(table=[[default_catalog, default_database, src, filter=[]]], fields=[name, score, id], changelogMode=[I,UB,UA]) +]]> + </Resource> + </TestCase> + <TestCase name="testFilterSubsetOfUpsertKey2"> + <Resource name="sql"> + <![CDATA[ +select * from src where name <> 'Tom' and score > 90 +]]> + </Resource> + <Resource name="ast"> + <![CDATA[ +LogicalProject(score=[$0], note=[$1], name=[$2], id=[$3]) ++- LogicalFilter(condition=[AND(<>($2, _UTF-16LE'Tom'), >($0, 90))]) + +- LogicalTableScan(table=[[default_catalog, default_database, src]]) +]]> + </Resource> + <Resource name="optimized rel plan"> + <![CDATA[ +Calc(select=[score, note, name, id], where=[AND(<>(name, 'Tom'), >(score, 90))], changelogMode=[I,UA]) ++- DropUpdateBefore(changelogMode=[I,UA]) + +- TableSourceScan(table=[[default_catalog, default_database, src, filter=[]]], fields=[score, note, name, id], changelogMode=[I,UB,UA]) +]]> + </Resource> + </TestCase> <TestCase name="testGroupByWithUnion"> <Resource name="sql"> <