This is an automated email from the ASF dual-hosted git repository. xuyangzhong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 8d8bf958a9af385f244b9231e33ea169d9401ac5 Author: xuyang <[email protected]> AuthorDate: Thu Mar 26 19:48:30 2026 +0800 [FLINK-39339][table-planner] Consider immutable cols to infer sink required updated mode traits --- .../FlinkChangelogModeInferenceProgram.scala | 70 ++++++++---- .../physical/stream/ChangelogModeInferenceTest.xml | 46 ++++++++ .../stream/ChangelogModeInferenceTest.scala | 119 +++++++++++++++++++++ 3 files changed, 216 insertions(+), 19 deletions(-) diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala index b11f9edc7c3..56c9cb1262a 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala @@ -998,8 +998,9 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti /** * Infer sink required traits by the sink node and its input. Sink required traits is based on - * the sink node's changelog mode, the only exception is when sink's pk(s) not exactly the same - * as the changeLogUpsertKeys and sink' changelog mode is ONLY_UPDATE_AFTER. + * the sink node's changelog mode, the only exception is when sink's pk(s) are not satisfied by + * the input's upsert keys (considering immutable columns) and sink's changelog mode is + * ONLY_UPDATE_AFTER. */ private def inferSinkRequiredTraits(sink: StreamPhysicalSink): Seq[UpdateKindTrait] = { val childModifyKindSet = getModifyKindSet(sink.getInput) @@ -1009,23 +1010,9 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti sink.tableSink.getChangelogMode(childModifyKindSet.toDefaultChangelogMode)) val sinkRequiredTraits = if (sinkTrait.equals(ONLY_UPDATE_AFTER)) { - // if sink's pk(s) are not exactly match input changeLogUpsertKeys then it will fallback - // to beforeAndAfter mode for the correctness - var requireBeforeAndAfter: Boolean = false - val sinkDefinedPks = sink.contextResolvedTable.getResolvedSchema.getPrimaryKeyIndexes - - if (sinkDefinedPks.nonEmpty) { - val sinkPks = ImmutableBitSet.of(sinkDefinedPks: _*) - val fmq = FlinkRelMetadataQuery.reuseOrCreate(sink.getCluster.getMetadataQuery) - val changeLogUpsertKeys = fmq.getUpsertKeys(sink.getInput) - // if input is UA only, primary key != upsert key (upsert key can be null) we should - // fallback to beforeAndAfter. - // Notice: even sink pk(s) contains input upsert key we cannot optimize to UA only, - // this differs from batch job's unique key inference - if (changeLogUpsertKeys == null || !changeLogUpsertKeys.exists(_.equals(sinkPks))) { - requireBeforeAndAfter = true - } - } + // if sink's pk(s) are not satisfied by input upsert keys (considering immutable columns), + // fallback to beforeAndAfter mode for correctness + val requireBeforeAndAfter = !canUpsertKeysWithImmutableColsSatisfyPk(sink) if (requireBeforeAndAfter) { Seq(beforeAndAfter) } else { @@ -1039,6 +1026,51 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti sinkRequiredTraits } + /** + * Check whether input's upsert keys (together with immutable columns) can satisfy sink's + * primary keys. + * + * <p>A sink pk is considered "satisfied" when there exists an upsert key `uk` such that: + * - `uk` is a subset of sink pk (no extra columns that could cause key collision) + * - the remaining sink pk columns not in `uk` are all immutable (immutable columns never + * change, so they effectively act as part of the key for upsert semantics) + * + * <p>Example: sink pk = {a, b, c}, uk = {a, b}, immutable columns = {a, b, c, d}. + * - Step 1: uk {a, b} ⊆ sink pk {a, b, c} → true + * - Step 2: sink pk \ uk = {c}, immutable columns contain {c} → true + * - Result: satisfied + * + * <p>Notice: even if sink pk is a subset of the upsert key, the pk is NOT considered satisfied + * when the upsert key has columns outside sink pk. This differs from batch job's unique key + * inference. + */ + private def canUpsertKeysWithImmutableColsSatisfyPk(sink: StreamPhysicalSink): Boolean = { + val sinkDefinedPks = sink.contextResolvedTable.getResolvedSchema.getPrimaryKeyIndexes + if (sinkDefinedPks.isEmpty) { + return true + } + val sinkPks = ImmutableBitSet.of(sinkDefinedPks: _*) + val fmq = FlinkRelMetadataQuery.reuseOrCreate(sink.getCluster.getMetadataQuery) + val changeLogUpsertKeys = fmq.getUpsertKeys(sink.getInput) + // if upsert key is null, pk cannot be satisfied, should fall back to beforeAndAfter + if (changeLogUpsertKeys == null) { + return false + } + val immutableCols = + Option.apply(fmq.getImmutableColumns(sink.getInput)).getOrElse(ImmutableBitSet.of()) + + // when input immutableCols is empty, this degrades to uk.equals(sinkPks) + changeLogUpsertKeys.exists( + uk => { + // 1. uk ⊆ sinkPks + val isSinkPkContainsUk = sinkPks.contains(uk) + // 2. (sinkPks \ uk) ⊆ immutableCols + val extraSinkPkCols = sinkPks.except(uk) + val areExtraSinkPkColsImmutable = immutableCols.contains(extraSinkPkCols) + isSinkPkContainsUk && areExtraSinkPkColsImmutable + }) + } + /** * Analyze whether to enable upsertMaterialize or not. In these case will return true: * 1. when `TABLE_EXEC_SINK_UPSERT_MATERIALIZE` set to FORCE and sink's primary key nonempty. diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/stream/ChangelogModeInferenceTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/stream/ChangelogModeInferenceTest.xml index 071900e4bad..03c86424dd1 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/stream/ChangelogModeInferenceTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/stream/ChangelogModeInferenceTest.xml @@ -281,6 +281,37 @@ LogicalProject(word=[$0], number=[$1]) <Resource name="optimized rel plan"> <![CDATA[ LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [CollectionTableSource(word, number)]]], fields=[word, number], changelogMode=[I]) +]]> + </Resource> + </TestCase> + <TestCase name="testSinkPkCoveredByUpsertKeyAndImmutableCols"> + <Resource name="ast"> + <![CDATA[ +LogicalSink(table=[default_catalog.default_database.sink], fields=[name, score, detail, id]) ++- LogicalProject(name=[$0], score=[$1], detail=[$2], id=[$3]) + +- LogicalTableScan(table=[[default_catalog, default_database, src]]) +]]> + </Resource> + <Resource name="optimized rel plan"> + <![CDATA[ +Sink(table=[default_catalog.default_database.sink], fields=[name, score, detail, id], changelogMode=[NONE]) ++- DropUpdateBefore(changelogMode=[I,UA]) + +- TableSourceScan(table=[[default_catalog, default_database, src]], fields=[name, score, detail, id], changelogMode=[I,UB,UA]) +]]> + </Resource> + </TestCase> + <TestCase name="testSinkPkNotCoveredByUpsertKeyAndImmutableCols"> + <Resource name="ast"> + <![CDATA[ +LogicalSink(table=[default_catalog.default_database.sink], fields=[name, score, detail, id]) ++- LogicalProject(name=[$0], score=[$1], detail=[$2], id=[$3]) + +- LogicalTableScan(table=[[default_catalog, default_database, src]]) +]]> + </Resource> + <Resource name="optimized rel plan"> + <![CDATA[ +Sink(table=[default_catalog.default_database.sink], fields=[name, score, detail, id], changelogMode=[NONE]) ++- TableSourceScan(table=[[default_catalog, default_database, src]], fields=[name, score, detail, id], changelogMode=[I,UB,UA]) ]]> </Resource> </TestCase> @@ -522,6 +553,21 @@ GlobalGroupAggregate(groupBy=[cnt], select=[cnt, COUNT_RETRACT(count1$0) AS freq +- LocalGroupAggregate(groupBy=[word], select=[word, COUNT(number) AS count$0], changelogMode=[I]) +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime], changelogMode=[I]) +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [CollectionTableSource(word, number)]]], fields=[word, number], changelogMode=[I]) +]]> + </Resource> + </TestCase> + <TestCase name="testUpsertKeyExceedsSinkPk"> + <Resource name="ast"> + <![CDATA[ +LogicalSink(table=[default_catalog.default_database.sink], fields=[name, score, id]) ++- LogicalProject(name=[$0], score=[$1], id=[$2]) + +- LogicalTableScan(table=[[default_catalog, default_database, src]]) +]]> + </Resource> + <Resource name="optimized rel plan"> + <![CDATA[ +Sink(table=[default_catalog.default_database.sink], fields=[name, score, id], upsertMaterialize=[true], conflictStrategy=[DEDUPLICATE], changelogMode=[NONE]) ++- TableSourceScan(table=[[default_catalog, default_database, src]], fields=[name, score, id], changelogMode=[I,UB,UA]) ]]> </Resource> </TestCase> diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/physical/stream/ChangelogModeInferenceTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/physical/stream/ChangelogModeInferenceTest.scala index 156890a9473..ce2dc3d4468 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/physical/stream/ChangelogModeInferenceTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/physical/stream/ChangelogModeInferenceTest.scala @@ -427,4 +427,123 @@ class ChangelogModeInferenceTest extends TableTestBase { // upsert key {id} does not contain {name}, so UB cannot be dropped util.verifyRelPlan(sql, ExplainDetail.CHANGELOG_MODE) } + + @Test + def testSinkPkCoveredByUpsertKeyAndImmutableCols(): Unit = { + util.tableEnv.executeSql(""" + |create table src ( + | name string, + | score int, + | detail string, + | id int primary key not enforced + |) with ( + | 'connector' = 'values', + | 'changelog-mode' = 'I,UA,UB' + |) + |""".stripMargin) + + val catalog = util.tableEnv.getCatalog(util.tableEnv.getCurrentCatalog).get() + addImmutableColConstraint(catalog, util.tableEnv.getCurrentDatabase, "src", "name", "score") + + util.tableEnv.executeSql(""" + |create table sink ( + | name string, + | score int, + | detail string, + | id int, + | primary key (id, name) not enforced + |) with ( + | 'connector' = 'values', + | 'sink-insert-only' = 'false' + |) + |""".stripMargin) + + val statementSet = util.tableEnv.createStatementSet() + statementSet.addInsertSql("INSERT INTO sink SELECT * FROM src") + + // upsert keys of input: {{id}, {id, name, score}} + // immutable cols of input: {id, name , score} + // sink pk: {id, name} + // upsert key {id} is subset of sink pk, and {id} union immutable cols covers sink pk + // so ONLY_UPDATE_AFTER is safe + util.verifyRelPlan(statementSet, ExplainDetail.CHANGELOG_MODE) + } + + @Test + def testSinkPkNotCoveredByUpsertKeyAndImmutableCols(): Unit = { + util.tableEnv.executeSql(""" + |create table src ( + | name string, + | score int, + | detail string, + | id int primary key not enforced + |) with ( + | 'connector' = 'values', + | 'changelog-mode' = 'I,UA,UB' + |) + |""".stripMargin) + + val catalog = util.tableEnv.getCatalog(util.tableEnv.getCurrentCatalog).get() + addImmutableColConstraint(catalog, util.tableEnv.getCurrentDatabase, "src", "name") + + util.tableEnv.executeSql(""" + |create table sink ( + | name string, + | score int, + | detail string, + | id int, + | primary key (id, name, score) not enforced + |) with ( + | 'connector' = 'values', + | 'sink-insert-only' = 'false' + |) + |""".stripMargin) + + val statementSet = util.tableEnv.createStatementSet() + statementSet.addInsertSql("INSERT INTO sink SELECT * FROM src") + + // upsert keys of input: {{id}, {id, name}} + // sink pk: {id, name, score} + // {id} union immutable {name} = {id, name}, does NOT cover {id, name, score} + // so BEFORE_AND_AFTER is required + util.verifyRelPlan(statementSet, ExplainDetail.CHANGELOG_MODE) + } + + @Test + def testUpsertKeyExceedsSinkPk(): Unit = { + util.tableEnv.executeSql(""" + |create table src ( + | name string, + | score int, + | id int, + | primary key (id, name) not enforced + |) with ( + | 'connector' = 'values', + | 'changelog-mode' = 'I,UA,UB' + |) + |""".stripMargin) + + val catalog = util.tableEnv.getCatalog(util.tableEnv.getCurrentCatalog).get() + addImmutableColConstraint(catalog, util.tableEnv.getCurrentDatabase, "src", "score") + + util.tableEnv.executeSql(""" + |create table sink ( + | name string, + | score int, + | id int, + | primary key (id) not enforced + |) with ( + | 'connector' = 'values', + | 'sink-insert-only' = 'false' + |) + |""".stripMargin) + + val statementSet = util.tableEnv.createStatementSet() + statementSet.addInsertSql("INSERT INTO sink SELECT * FROM src ON CONFLICT DO DEDUPLICATE") + + // upsert keys of input: {{id, name}, {id, name, score}} + // sink pk: {id} + // upsert key {id, name} is NOT a subset of sink pk {id}, so BEFORE_AND_AFTER is required + util.verifyRelPlan(statementSet, ExplainDetail.CHANGELOG_MODE) + } }
