This is an automated email from the ASF dual-hosted git repository.

gustavodemorais pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 32f9e60eb9d7c33bac4fb3f91dc0200f03507c0c
Author: Ramin Gharib <[email protected]>
AuthorDate: Tue May 26 12:59:44 2026 +0200

    [FLINK-39636][table] Emit partial DELETE rows in TO_CHANGELOG via upsert key
    
    Wires TO_CHANGELOG to consume the upsert key surfaced by TableSemantics so
    row-semantics partial DELETE rows preserve identifying columns without
    requiring PARTITION BY. Also flips the produces_full_deletes default to true
    so the safe full-pre-image behavior is the default and false is the explicit
    opt-in to partial deletes.
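
A minimal sketch of the two behaviors described above, written as a plain Python model rather than Flink code. The names `requires_full_delete`, `maps_delete`, and `partial_delete` are hypothetical and used only for illustration. The sketch assumes that an omitted `op_mapping` maps DELETE by default and that the key columns are supplied by the caller, standing in for the planner-derived upsert key or the `PARTITION BY` key.

```python
from typing import Any, Dict, Mapping, Optional, Set


def maps_delete(op_mapping: Mapping[str, str]) -> bool:
    """Return True when any op_mapping key names DELETE (keys may be comma-separated)."""
    return any(
        name.strip() == "DELETE"
        for key in op_mapping
        for name in key.split(",")
    )


def requires_full_delete(
    produces_full_deletes: Optional[bool] = None,
    op_mapping: Optional[Mapping[str, str]] = None,
) -> bool:
    """Model of when the planner would require full DELETE rows (illustrative only)."""
    # After this commit an omitted argument behaves like an explicit true.
    wants_full = produces_full_deletes is None or produces_full_deletes
    # Assumption: with no op_mapping, the default mapping forwards DELETE.
    delete_reaches_function = op_mapping is None or maps_delete(op_mapping)
    return wants_full and delete_reaches_function


def partial_delete(row: Dict[str, Any], key_columns: Set[str]) -> Dict[str, Any]:
    """Keep the identifying key columns of a DELETE row and null the rest."""
    return {col: (val if col in key_columns else None) for col, val in row.items()}
```

For example, `requires_full_delete()` is true under the new default, `requires_full_delete(False)` is false, and `partial_delete({"id": 5, "name": "Alice"}, {"id"})` yields `{"id": 5, "name": None}`, which mirrors the `+I[op:'DELETE', id:5, name:null]` rows shown in the documentation changes below.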
---
 .../docs/sql/reference/queries/changelog.md        |  91 +++++++++----
 flink-python/pyflink/table/table.py                |  19 +--
 .../apache/flink/table/api/PartitionedTable.java   |  10 +-
 .../java/org/apache/flink/table/api/Table.java     |  10 +-
 .../functions/BuiltInFunctionDefinitions.java      |  20 ++-
 .../strategies/ToChangelogTypeStrategy.java        |  54 ++++----
 .../ToChangelogInputTypeStrategyTest.java          |  75 +++++------
 .../inference/OperatorBindingCallContext.java      |   7 +-
 .../exec/stream/ToChangelogSemanticTests.java      |  14 +-
 .../nodes/exec/stream/ToChangelogTestPrograms.java |  81 ++++++------
 .../runtime/functions/ptf/ToChangelogFunction.java | 144 +++++++++++++++------
 11 files changed, 321 insertions(+), 204 deletions(-)

diff --git a/docs/content/docs/sql/reference/queries/changelog.md 
b/docs/content/docs/sql/reference/queries/changelog.md
index 7a1598ee850..c91107b2df2 100644
--- a/docs/content/docs/sql/reference/queries/changelog.md
+++ b/docs/content/docs/sql/reference/queries/changelog.md
@@ -308,7 +308,7 @@ SELECT * FROM TO_CHANGELOG(
 | `input`                 | Yes      | The input table. With `PARTITION BY`, 
rows with the same key are co-located and run in the same operator instance. 
Without `PARTITION BY`, each row is processed independently. Accepts 
insert-only, retract, and upsert tables. For upsert tables, the provided 
`PARTITION BY` key should match or be a subset of the upsert key of the 
subquery.       |
 | `op`                    | No       | A `DESCRIPTOR` with a single column 
name for the operation code column. Defaults to `op`. |
 | `op_mapping`            | No       | A `MAP<STRING, STRING>` mapping change 
operation names to custom output codes. Keys can contain comma-separated names 
to map multiple operations to the same code (e.g., `'INSERT, UPDATE_AFTER'`). 
When provided, only mapped operations are forwarded - unmapped events are 
dropped. Each change operation may appear at most once across all entries. |
-| `produces_full_deletes` | No       | A `BOOLEAN` literal that controls how 
DELETE rows are emitted. When `true`, the function requires fully-populated 
DELETE rows from the input. The planner inserts a `ChangelogNormalize` operator 
for upsert sources that emit key-only deletes, so downstream sees the full 
pre-image on DELETE. When `false` (default), no full-delete requirement is 
enforced. Partial DELETE rows from the input pass through unchanged. With 
`PARTITION BY` (set semantics), the [...]
+| `produces_full_deletes` | No       | A `BOOLEAN` literal that controls how 
DELETE rows are emitted. When `true` (default), DELETE rows carry all columns, 
the full image. When `false`, only the identifying key columns are preserved 
and the rest are nulled. See [Full vs partial 
deletes](#full-vs-partial-deletes) for more details. |
 
 #### Default op_mapping
 
@@ -399,41 +399,89 @@ SELECT * FROM TO_CHANGELOG(
 -- UPDATE_BEFORE is dropped (not in the mapping)
 ```
 
-#### Delete handling
+#### Upsert key
 
-The `produces_full_deletes` argument controls how DELETE rows are emitted and 
what the planner requires from the input. The behavior depends on whether 
`PARTITION BY` is used (set semantics) or not (row semantics).
+An **upsert key** is a column or set of columns that uniquely identifies a row 
across its lifecycle in a changelog. It is what downstream operators and sinks 
use to decide which earlier row a new INSERT, UPDATE_AFTER, or DELETE refers to.
 
-**With `produces_full_deletes => true`.** The planner requires the input to 
produce DELETE rows with all columns populated. For upsert sources, a 
`ChangelogNormalize` operator is inserted to materialize the full pre-image 
from state. The function then emits fully-populated DELETE rows downstream.
+The planner derives the upsert key from the input table:
+
+* A declared `PRIMARY KEY` on the source table when reading directly.
+* The grouping columns of an upstream `GROUP BY <key>`.
+* The keys propagated by operators that preserve them (e.g. lookup joins, 
calc-projections that keep the key columns).
+
+When no upsert key can be derived (e.g. a plain append-only source with no key 
constraint and no grouping upstream), the input has no row identity and 
downstream operators must treat it as append-only or fall back to retract 
semantics.
+
+`TO_CHANGELOG` consumes the upsert key to decide which columns to preserve 
when emitting partial DELETE rows. See [Full vs partial 
deletes](#full-vs-partial-deletes) below.
+
+#### Full vs partial deletes
+
+The `produces_full_deletes` argument controls how DELETE rows are emitted and 
what the planner requires from the input. The subsections below cover each 
combination with `PARTITION BY` (set semantics) and without (row semantics). 
When `false`, the function relies on the input table's [upsert 
key](#upsert-key) to decide which columns to preserve.
+
+##### `produces_full_deletes => true` (default)
+
+The planner requires fully-populated DELETE rows on the input. For upsert 
sources that emit key-only deletes, a `ChangelogNormalize` operator is inserted 
upstream to materialize the full pre-image from state. For sources that already 
emit a full pre-image (e.g. retract), the flag is a no-op. The function then 
passes the input row through unchanged on DELETE.
+
+**Row semantics** (no `PARTITION BY`):
 
 ```sql
 -- Upsert source: -D[id:5] (key-only).
 -- ChangelogNormalize materializes the full pre-image from state.
 -- Output: +I[op:'DELETE', id:5, name:'Alice']
-SELECT * FROM TO_CHANGELOG(
-  input => TABLE upsert_source,
-  produces_full_deletes => true
-)
+SELECT * FROM TO_CHANGELOG(input => TABLE upsert_source)
+
+-- Retract source: -D[id:5, name:'Alice'] (full pre-image).
+-- No ChangelogNormalize inserted; the input row is passed through unchanged.
+-- Output: +I[op:'DELETE', id:5, name:'Alice']
+SELECT * FROM TO_CHANGELOG(input => TABLE retract_source)
 ```
 
-**With `produces_full_deletes => false` (default).** The planner does not 
require fully-populated DELETE rows on the input. For upsert sources that emit 
key-only deletes (e.g. Kafka compacted topics), this avoids the stateful 
`ChangelogNormalize` operator that would otherwise materialize the full 
pre-image of each deleted row.
+**Set semantics** (`PARTITION BY`):
 
-In **row semantics** (no `PARTITION BY`) the function passes the input row 
through unchanged. If the source emits partial DELETE rows they remain partial 
downstream; if it emits full DELETE rows they remain full.
+```sql
+-- Upsert source: -D[id:5] (key-only).
+-- ChangelogNormalize materializes the full pre-image from state.
+-- Output: +I[id:5, op:'DELETE', name:'Alice']
+SELECT * FROM TO_CHANGELOG(input => TABLE upsert_source PARTITION BY id)
+
+-- Retract source: -D[id:5, name:'Alice'] (full pre-image).
+-- No ChangelogNormalize inserted; the input row is passed through unchanged.
+-- Output: +I[id:5, op:'DELETE', name:'Alice']
+SELECT * FROM TO_CHANGELOG(input => TABLE retract_source PARTITION BY id)
+```
+
+##### `produces_full_deletes => false`
+
+The planner skips `ChangelogNormalize` and the function emits partial DELETE 
rows. This avoids the stateful normalization operator for upsert sources (e.g. 
Kafka compacted topics) where the full pre-image is not needed downstream. 
This mode requires an [upsert key](#upsert-key) on the input table (row 
semantics) or a `PARTITION BY` clause (set semantics); otherwise the call is 
rejected with a validation error.
+
+**Row semantics** (no `PARTITION BY`): the function preserves the 
planner-derived upsert key columns on DELETE rows and nulls the rest. The 
upsert key is typically a declared `PRIMARY KEY` when reading directly from a 
source, or the grouping key of an upstream `GROUP BY <key>`.
 
 ```sql
--- Source emits -D[id:5] (key-only).
+-- Upsert source with PRIMARY KEY (id): -D[id:5] (key-only).
 -- Output: +I[op:'DELETE', id:5, name:null]
-SELECT * FROM TO_CHANGELOG(input => TABLE upsert_source)
+SELECT * FROM TO_CHANGELOG(input => TABLE upsert_source, produces_full_deletes 
=> false)
+
+-- Retract source with PRIMARY KEY (id): -D[id:5, name:'Alice'] (full 
pre-image).
+-- Output: +I[op:'DELETE', id:5, name:null]
+SELECT * FROM TO_CHANGELOG(input => TABLE retract_source, 
produces_full_deletes => false)
 ```
 
-In **set semantics** (`PARTITION BY`) the function additionally nulls every 
non-partition-key column on DELETE rows. This forces the output to carry only 
the partition key on DELETE even when the input row was fully populated, which 
matches the shape expected by upsert sinks and Kafka compacted topics.
+**Set semantics** (`PARTITION BY`): the function preserves the partition key 
and nulls every non-partition-key column on DELETE rows. The partition key 
should be the unique key that identifies the record downstream. This matches 
the shape expected by upsert sinks and Kafka compacted topics.
 
 ```sql
--- Source emits -D[id:5, name:'Alice'] (full pre-image, e.g. from a retract 
source).
+-- Upsert source: -D[id:5] (key-only).
 -- Output: +I[id:5, op:'DELETE', name:null]
-SELECT * FROM TO_CHANGELOG(input => TABLE retract_source PARTITION BY id)
-```
+SELECT * FROM TO_CHANGELOG(
+  input => TABLE upsert_source PARTITION BY id,
+  produces_full_deletes => false
+)
 
-There is no way to derive a partial DELETE in row semantics when the input 
emits a full pre-image, since the function has no key column to preserve. Use 
`PARTITION BY` for that case.
+-- Retract source: -D[id:5, name:'Alice'] (full pre-image).
+-- Output: +I[id:5, op:'DELETE', name:null]
+SELECT * FROM TO_CHANGELOG(
+  input => TABLE retract_source PARTITION BY id,
+  produces_full_deletes => false
+)
+```
 
 #### Partitioning by a key
 
@@ -472,12 +520,11 @@ Table result = myTable.toChangelog(
     map("INSERT, UPDATE_AFTER", "false", "DELETE", 
"true").asArgument("op_mapping")
 );
 
-// Require fully-populated DELETE rows from the input (inserts a 
ChangelogNormalize for
-// upsert sources). When false (default), no full-delete requirement is 
enforced; in row
-// semantics the input passes through unchanged, in set semantics 
non-partition-key columns
-// are nulled on DELETE.
+// Opt out of full-delete semantics. When `true` (default), DELETE rows carry 
the full
+// pre-image. When `false`, only the identifying key columns are preserved and 
the rest
+// are nulled. See [Full vs partial deletes](#full-vs-partial-deletes) for 
more details.
 Table result = myTable.toChangelog(
-    lit(true).asArgument("produces_full_deletes")
+    lit(false).asArgument("produces_full_deletes")
 );
 
 // Set semantics: co-locate rows with the same key in the same parallel 
operator instance.
diff --git a/flink-python/pyflink/table/table.py 
b/flink-python/pyflink/table/table.py
index bb06642db9e..e7898e8752d 100644
--- a/flink-python/pyflink/table/table.py
+++ b/flink-python/pyflink/table/table.py
@@ -1197,12 +1197,13 @@ class Table(object):
         (INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE).
 
         The optional ``produces_full_deletes`` boolean controls how DELETE 
rows are
-        emitted. When ``True``, the planner inserts a ``ChangelogNormalize`` 
operator
-        for upsert sources that emit key-only deletes so the function emits 
fully
-        populated DELETE rows downstream. When ``False`` (default), no 
full-delete
-        requirement is enforced. In row semantics the input is passed through 
unchanged,
-        and in set semantics (``PARTITION BY``) non-partition-key columns are 
nulled on
-        DELETE rows.
+        emitted. When ``True`` (default), the planner inserts a 
``ChangelogNormalize``
+        operator for upsert sources that emit key-only deletes so the function 
emits
+        fully populated DELETE rows downstream. When ``False``, the function 
emits
+        partial DELETE rows: row semantics preserves the planner-derived 
upsert key
+        columns and nulls the rest, set semantics (``PARTITION BY``) preserves 
the
+        partition key and nulls the rest. Requires an upsert key or 
``PARTITION BY``;
+        otherwise the call is rejected.
 
         Example:
         ::
@@ -1221,10 +1222,10 @@ class Table(object):
             ...     map_("INSERT, UPDATE_AFTER", "false",
             ...          "DELETE", "true").as_argument("op_mapping")
             ... )
-            >>> # Require fully populated DELETE rows from the input. Inserts a
-            >>> # ChangelogNormalize for upsert sources.
+            >>> # Opt out of full-delete semantics to emit partial DELETE rows.
+            >>> # Requires an upsert key or PARTITION BY; otherwise rejected.
             >>> result = table.to_changelog(
-            ...     lit(True).as_argument("produces_full_deletes")
+            ...     lit(False).as_argument("produces_full_deletes")
             ... )
 
         :param arguments: Optional named arguments for ``op``, ``op_mapping``, 
and
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PartitionedTable.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PartitionedTable.java
index 074bdfdbaef..401317d125d 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PartitionedTable.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PartitionedTable.java
@@ -205,12 +205,14 @@ public interface PartitionedTable {
      *         map("INSERT, UPDATE_AFTER", "false", "DELETE", 
"true").asArgument("op_mapping")
      *     );
      *
-     * // Require fully-populated DELETE rows from the input (inserts a 
ChangelogNormalize for
-     * // upsert sources). When false (default), DELETE rows on upsert inputs 
may omit non-key
-     * // columns, which avoids the stateful normalization operator upstream.
+     * // Opt out of full-delete semantics. When `true` (default), DELETE rows 
carry the full
+     * // pre-image. When `false`, only the identifying key columns are 
preserved and the rest
+     * // are nulled. See [Full vs partial deletes](
+     * // 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/changelog/#full-vs-partial-deletes)
+     * // for more details.
      * Table result = table
      *     .partitionBy($("id"))
-     *     .toChangelog(lit(true).asArgument("produces_full_deletes"));
+     *     .toChangelog(lit(false).asArgument("produces_full_deletes"));
      * }</pre>
      *
      * @param arguments optional named arguments for {@code op}, {@code 
op_mapping}, and {@code
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java
index de6ce9d3e5e..ab4a617a2df 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java
@@ -1460,11 +1460,13 @@ public interface Table extends Explainable<Table>, 
Executable {
      *     map("INSERT, UPDATE_AFTER", "false", "DELETE", 
"true").asArgument("op_mapping")
      * );
      *
-     * // Require fully-populated DELETE rows from the input (inserts a 
ChangelogNormalize for
-     * // upsert sources). When false (default), no full-delete requirement is 
enforced and partial
-     * // DELETE rows from the input pass through unchanged.
+     * // Opt out of full-delete semantics. When `true` (default), DELETE rows 
carry the full
+     * // pre-image. When `false`, only the identifying key columns are 
preserved and the rest
+     * // are nulled. See [Full vs partial deletes](
+     * // 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/changelog/#full-vs-partial-deletes)
+     * // for more details.
      * Table result = table.toChangelog(
-     *     lit(true).asArgument("produces_full_deletes")
+     *     lit(false).asArgument("produces_full_deletes")
      * );
      * }</pre>
      *
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
index 19b8219e98b..0b9ea76b175 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
@@ -839,14 +839,20 @@ public final class BuiltInFunctionDefinitions {
                                                                             
"UPDATE_BEFORE"))))
                                     .withConditionalTrait(
                                             
StaticArgumentTrait.REQUIRE_FULL_DELETE,
-                                            // Require full deletes only when 
the user explicitly
-                                            // asks for them via 
produces_full_deletes=TRUE *and*
-                                            // the active op_mapping includes 
DELETE. Otherwise the
-                                            // planner can skip 
ChangelogNormalize for upsert
-                                            // sources that emit key-only 
deletes.
+                                            // Require full deletes by 
default. The user can opt
+                                            // out via 
produces_full_deletes=FALSE.
+                                            // REQUIRE_FULL_DELETE
+                                            // still gates on the active 
op_mapping mapping DELETE;
+                                            // otherwise no DELETE rows reach 
the function and there
+                                            // is no point inserting 
ChangelogNormalize upstream.
                                             TraitCondition.and(
-                                                    
TraitCondition.argIsEqualTo(
-                                                            
"produces_full_deletes", Boolean.TRUE),
+                                                    TraitCondition.or(
+                                                            TraitCondition.not(
+                                                                    
TraitCondition.argIsPresent(
+                                                                            
"produces_full_deletes")),
+                                                            
TraitCondition.argIsEqualTo(
+                                                                    
"produces_full_deletes",
+                                                                    
Boolean.TRUE)),
                                                     TraitCondition.or(
                                                             TraitCondition.not(
                                                                     
TraitCondition.argIsPresent(
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToChangelogTypeStrategy.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToChangelogTypeStrategy.java
index d406875aa2f..4c27aae4723 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToChangelogTypeStrategy.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToChangelogTypeStrategy.java
@@ -28,6 +28,7 @@ import org.apache.flink.table.types.inference.CallContext;
 import org.apache.flink.table.types.inference.InputTypeStrategy;
 import org.apache.flink.table.types.inference.TypeStrategy;
 import org.apache.flink.types.ColumnList;
+import org.apache.flink.types.RowKind;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -52,6 +53,8 @@ public final class ToChangelogTypeStrategy {
     private static final Set<String> VALID_ROW_KIND_NAMES =
             Set.of("INSERT", "UPDATE_BEFORE", "UPDATE_AFTER", "DELETE");
 
+    private static final String DELETE = RowKind.DELETE.name();
+
     // 
--------------------------------------------------------------------------------------------
     // Input validation
     // 
--------------------------------------------------------------------------------------------
@@ -152,7 +155,8 @@ public final class ToChangelogTypeStrategy {
 
         final Optional<Map> opMapping = 
callContext.getArgumentValue(ARG_OP_MAPPING, Map.class);
         if (opMapping.isPresent()) {
-            return validateOpMappingKeys(callContext, opMapping.get(), 
throwOnFailure);
+            return validateOpMappingKeys(
+                    callContext, (Map<String, String>) opMapping.get(), 
throwOnFailure);
         }
         return Optional.empty();
     }
@@ -163,16 +167,13 @@ public final class ToChangelogTypeStrategy {
      * trimmed. Names are case-sensitive and must match exactly (e.g., {@code 
INSERT}, not {@code
      * insert}). Each name must be valid and appear at most once across all 
entries.
      */
-    @SuppressWarnings("rawtypes")
     private static Optional<List<DataType>> validateOpMappingKeys(
-            final CallContext callContext, final Map opMapping, final boolean 
throwOnFailure) {
+            final CallContext callContext,
+            final Map<String, String> opMapping,
+            final boolean throwOnFailure) {
         final Set<String> allRowKindsSeen = new HashSet<>();
-        for (final Object key : opMapping.keySet()) {
-            if (!(key instanceof String)) {
-                return callContext.fail(
-                        throwOnFailure, "Invalid target mapping for argument 
'op_mapping'.");
-            }
-            final String[] rowKindNames = ((String) key).split(",");
+        for (final String key : opMapping.keySet()) {
+            final String[] rowKindNames = key.split(",");
             for (final String rawName : rowKindNames) {
                 final String rowKindName = rawName.trim();
                 if (!VALID_ROW_KIND_NAMES.contains(rowKindName)) {
@@ -180,7 +181,7 @@ public final class ToChangelogTypeStrategy {
                             throwOnFailure,
                             String.format(
                                     "Invalid target mapping for argument 
'op_mapping'. "
-                                            + "Unknown change operation: '%s'. 
Valid values are: %s.",
+                                            + "Unknown change operation: '%s'. 
Operations are case-sensitive. Valid values are: %s.",
                                     rowKindName, VALID_ROW_KIND_NAMES));
                 }
                 final boolean isDuplicate = !allRowKindsSeen.add(rowKindName);
@@ -200,24 +201,27 @@ public final class ToChangelogTypeStrategy {
     @SuppressWarnings("rawtypes")
     private static Optional<List<DataType>> validateProducesFullDeletes(
             final CallContext callContext, final boolean throwOnFailure) {
-        final boolean hasArgProvided = 
!callContext.isArgumentNull(ARG_PRODUCES_FULL_DELETES);
-        if (hasArgProvided && 
!callContext.isArgumentLiteral(ARG_PRODUCES_FULL_DELETES)) {
+        final boolean isExplicit = 
!callContext.isArgumentNull(ARG_PRODUCES_FULL_DELETES);
+        if (!isExplicit) {
+            return Optional.empty();
+        }
+        if (!callContext.isArgumentLiteral(ARG_PRODUCES_FULL_DELETES)) {
             return callContext.fail(
                     throwOnFailure,
                     "The 'produces_full_deletes' argument must be a constant 
BOOLEAN literal.");
         }
         final boolean producesFullDeletes =
-                callContext
-                        .getArgumentValue(ARG_PRODUCES_FULL_DELETES, 
Boolean.class)
-                        .orElse(false);
+                callContext.getArgumentValue(ARG_PRODUCES_FULL_DELETES, 
Boolean.class).orElse(true);
         if (!producesFullDeletes) {
             return Optional.empty();
         }
         // The check against the input changelog mode lives in the function 
constructor since
         // TableSemantics#changelogMode() returns empty here at type-inference 
time. The mapping
-        // check below only needs the literal op_mapping argument, so it lives 
here.
+        // check below only needs the literal op_mapping argument, so it lives 
here. Only runs
+        // when the user explicitly set produces_full_deletes=true; the 
default true is not
+        // validated since it is a safe no-op for any input.
         final Optional<Map> opMapping = 
callContext.getArgumentValue(ARG_OP_MAPPING, Map.class);
-        if (opMapping.isPresent() && !mapsDelete(opMapping.get())) {
+        if (opMapping.isPresent() && !mapsDelete((Map<String, String>) 
opMapping.get())) {
             return callContext.fail(
                     throwOnFailure,
                     "Invalid 'produces_full_deletes' for TO_CHANGELOG: the 
active 'op_mapping' "
@@ -229,17 +233,13 @@ public final class ToChangelogTypeStrategy {
     }
 
     /**
-     * Returns {@code true} when at least one {@code op_mapping} key 
references {@code DELETE}.
-     * Keys may be comma-separated (e.g., {@code "INSERT, DELETE"}) per the 
user-facing contract.
+     * Returns {@code true} when at least one {@code op_mapping} key 
references {@code DELETE}. Keys
+     * may be comma-separated (e.g., {@code "INSERT, DELETE"}) per the 
user-facing contract.
      */
-    @SuppressWarnings("rawtypes")
-    private static boolean mapsDelete(final Map opMapping) {
-        for (final Object key : opMapping.keySet()) {
-            if (!(key instanceof String)) {
-                continue;
-            }
-            for (final String rawName : ((String) key).split(",")) {
-                if ("DELETE".equals(rawName.trim())) {
+    private static boolean mapsDelete(final Map<String, String> opMapping) {
+        for (final String key : opMapping.keySet()) {
+            for (final String rawName : key.split(",")) {
+                if (DELETE.equals(rawName.trim())) {
                     return true;
                 }
             }
diff --git 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/ToChangelogInputTypeStrategyTest.java
 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/ToChangelogInputTypeStrategyTest.java
index e9b20a7a5ba..6b5773949db 100644
--- 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/ToChangelogInputTypeStrategyTest.java
+++ 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/ToChangelogInputTypeStrategyTest.java
@@ -28,6 +28,10 @@ import java.util.Map;
 import java.util.stream.Stream;
 
 import static 
org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies.TO_CHANGELOG_INPUT_TYPE_STRATEGY;
+import static 
org.apache.flink.table.types.inference.strategies.ToChangelogTypeStrategy.ARG_OP;
+import static 
org.apache.flink.table.types.inference.strategies.ToChangelogTypeStrategy.ARG_OP_MAPPING;
+import static 
org.apache.flink.table.types.inference.strategies.ToChangelogTypeStrategy.ARG_PRODUCES_FULL_DELETES;
+import static 
org.apache.flink.table.types.inference.strategies.ToChangelogTypeStrategy.ARG_TABLE;
 
 /** Tests for {@link ToChangelogTypeStrategy#INPUT_TYPE_STRATEGY}. */
 class ToChangelogInputTypeStrategyTest extends InputTypeStrategiesTestBase {
@@ -52,12 +56,11 @@ class ToChangelogInputTypeStrategyTest extends 
InputTypeStrategiesTestBase {
                                 TO_CHANGELOG_INPUT_TYPE_STRATEGY)
                         .calledWithArgumentTypes(
                                 TABLE_TYPE, DESCRIPTOR_TYPE, MAP_TYPE, 
BOOLEAN_TYPE)
-                        .calledWithTableSemanticsAt(0, new 
TableSemanticsMock(TABLE_TYPE))
-                        .calledWithLiteralAt(1, ColumnList.of("op"))
-                        .calledWithLiteralAt(2, null)
-                        .calledWithLiteralAt(3, true)
-                        .expectArgumentTypes(
-                                TABLE_TYPE, DESCRIPTOR_TYPE, MAP_TYPE, 
BOOLEAN_TYPE),
+                        .calledWithTableSemanticsAt(ARG_TABLE, new 
TableSemanticsMock(TABLE_TYPE))
+                        .calledWithLiteralAt(ARG_OP, ColumnList.of("op"))
+                        .calledWithLiteralAt(ARG_OP_MAPPING, null)
+                        .calledWithLiteralAt(ARG_PRODUCES_FULL_DELETES, true)
+                        .expectArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE, 
MAP_TYPE, BOOLEAN_TYPE),
 
                 // Valid: produces_full_deletes=true with op_mapping that 
includes DELETE
                 TestSpec.forStrategy(
@@ -65,12 +68,11 @@ class ToChangelogInputTypeStrategyTest extends 
InputTypeStrategiesTestBase {
                                 TO_CHANGELOG_INPUT_TYPE_STRATEGY)
                         .calledWithArgumentTypes(
                                 TABLE_TYPE, DESCRIPTOR_TYPE, MAP_TYPE, 
BOOLEAN_TYPE)
-                        .calledWithTableSemanticsAt(0, new 
TableSemanticsMock(TABLE_TYPE))
-                        .calledWithLiteralAt(1, ColumnList.of("op"))
-                        .calledWithLiteralAt(2, Map.of("INSERT", "I", 
"DELETE", "D"))
-                        .calledWithLiteralAt(3, true)
-                        .expectArgumentTypes(
-                                TABLE_TYPE, DESCRIPTOR_TYPE, MAP_TYPE, 
BOOLEAN_TYPE),
+                        .calledWithTableSemanticsAt(ARG_TABLE, new 
TableSemanticsMock(TABLE_TYPE))
+                        .calledWithLiteralAt(ARG_OP, ColumnList.of("op"))
+                        .calledWithLiteralAt(ARG_OP_MAPPING, Map.of("INSERT", 
"I", "DELETE", "D"))
+                        .calledWithLiteralAt(ARG_PRODUCES_FULL_DELETES, true)
+                        .expectArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE, 
MAP_TYPE, BOOLEAN_TYPE),
 
                 // Valid: produces_full_deletes=true with comma-separated 
DELETE key
                 TestSpec.forStrategy(
@@ -78,12 +80,11 @@ class ToChangelogInputTypeStrategyTest extends 
InputTypeStrategiesTestBase {
                                 TO_CHANGELOG_INPUT_TYPE_STRATEGY)
                         .calledWithArgumentTypes(
                                 TABLE_TYPE, DESCRIPTOR_TYPE, MAP_TYPE, BOOLEAN_TYPE)
-                        .calledWithTableSemanticsAt(0, new TableSemanticsMock(TABLE_TYPE))
-                        .calledWithLiteralAt(1, ColumnList.of("op"))
-                        .calledWithLiteralAt(2, Map.of("INSERT, DELETE", "X"))
-                        .calledWithLiteralAt(3, true)
-                        .expectArgumentTypes(
-                                TABLE_TYPE, DESCRIPTOR_TYPE, MAP_TYPE, BOOLEAN_TYPE),
+                        .calledWithTableSemanticsAt(ARG_TABLE, new TableSemanticsMock(TABLE_TYPE))
+                        .calledWithLiteralAt(ARG_OP, ColumnList.of("op"))
+                        .calledWithLiteralAt(ARG_OP_MAPPING, Map.of("INSERT, DELETE", "X"))
+                        .calledWithLiteralAt(ARG_PRODUCES_FULL_DELETES, true)
+                        .expectArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE, MAP_TYPE, BOOLEAN_TYPE),
 
                 // Valid: produces_full_deletes=false with op_mapping that omits DELETE
                 TestSpec.forStrategy(
@@ -91,12 +92,11 @@ class ToChangelogInputTypeStrategyTest extends InputTypeStrategiesTestBase {
                                 TO_CHANGELOG_INPUT_TYPE_STRATEGY)
                         .calledWithArgumentTypes(
                                 TABLE_TYPE, DESCRIPTOR_TYPE, MAP_TYPE, BOOLEAN_TYPE)
-                        .calledWithTableSemanticsAt(0, new TableSemanticsMock(TABLE_TYPE))
-                        .calledWithLiteralAt(1, ColumnList.of("op"))
-                        .calledWithLiteralAt(2, Map.of("INSERT, UPDATE_AFTER", "X"))
-                        .calledWithLiteralAt(3, false)
-                        .expectArgumentTypes(
-                                TABLE_TYPE, DESCRIPTOR_TYPE, MAP_TYPE, BOOLEAN_TYPE),
+                        .calledWithTableSemanticsAt(ARG_TABLE, new TableSemanticsMock(TABLE_TYPE))
+                        .calledWithLiteralAt(ARG_OP, ColumnList.of("op"))
+                        .calledWithLiteralAt(ARG_OP_MAPPING, Map.of("INSERT, UPDATE_AFTER", "X"))
+                        .calledWithLiteralAt(ARG_PRODUCES_FULL_DELETES, false)
+                        .expectArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE, MAP_TYPE, BOOLEAN_TYPE),
 
                 // Error: produces_full_deletes=true with op_mapping that strips DELETE
                 TestSpec.forStrategy(
@@ -104,22 +104,22 @@ class ToChangelogInputTypeStrategyTest extends InputTypeStrategiesTestBase {
                                 TO_CHANGELOG_INPUT_TYPE_STRATEGY)
                         .calledWithArgumentTypes(
                                 TABLE_TYPE, DESCRIPTOR_TYPE, MAP_TYPE, BOOLEAN_TYPE)
-                        .calledWithTableSemanticsAt(0, new TableSemanticsMock(TABLE_TYPE))
-                        .calledWithLiteralAt(1, ColumnList.of("op"))
-                        .calledWithLiteralAt(2, Map.of("INSERT, UPDATE_AFTER", "X"))
-                        .calledWithLiteralAt(3, true)
+                        .calledWithTableSemanticsAt(ARG_TABLE, new TableSemanticsMock(TABLE_TYPE))
+                        .calledWithLiteralAt(ARG_OP, ColumnList.of("op"))
+                        .calledWithLiteralAt(ARG_OP_MAPPING, Map.of("INSERT, UPDATE_AFTER", "X"))
+                        .calledWithLiteralAt(ARG_PRODUCES_FULL_DELETES, true)
                         .expectErrorMessage(
                                 "Invalid 'produces_full_deletes' for TO_CHANGELOG: the active "
                                         + "'op_mapping' does not map DELETE rows"),
 
-                // Error: multi-column descriptor
+                // Error: multi-column descriptor for `op`
                 TestSpec.forStrategy(
                                 "Descriptor with multiple columns",
                                 TO_CHANGELOG_INPUT_TYPE_STRATEGY)
                         .calledWithArgumentTypes(
                                 TABLE_TYPE, DESCRIPTOR_TYPE, MAP_TYPE, BOOLEAN_TYPE)
-                        .calledWithTableSemanticsAt(0, new TableSemanticsMock(TABLE_TYPE))
-                        .calledWithLiteralAt(1, ColumnList.of("a", "b"))
+                        .calledWithTableSemanticsAt(ARG_TABLE, new TableSemanticsMock(TABLE_TYPE))
+                        .calledWithLiteralAt(ARG_OP, ColumnList.of("a", "b"))
                         .expectErrorMessage("must contain exactly one column name"),
 
                 // Error: invalid RowKind in op_mapping key
@@ -127,9 +127,9 @@ class ToChangelogInputTypeStrategyTest extends InputTypeStrategiesTestBase {
                                 "Invalid RowKind in mapping key", TO_CHANGELOG_INPUT_TYPE_STRATEGY)
                         .calledWithArgumentTypes(
                                 TABLE_TYPE, DESCRIPTOR_TYPE, MAP_TYPE, BOOLEAN_TYPE)
-                        .calledWithTableSemanticsAt(0, new TableSemanticsMock(TABLE_TYPE))
-                        .calledWithLiteralAt(1, ColumnList.of("op"))
-                        .calledWithLiteralAt(2, Map.of("INVALID_KIND", "X"))
+                        .calledWithTableSemanticsAt(ARG_TABLE, new TableSemanticsMock(TABLE_TYPE))
+                        .calledWithLiteralAt(ARG_OP, ColumnList.of("op"))
+                        .calledWithLiteralAt(ARG_OP_MAPPING, Map.of("INVALID_KIND", "X"))
                         .expectErrorMessage("Unknown change operation: 'INVALID_KIND'"),
 
                 // Error: duplicate RowKind across entries
@@ -138,9 +138,10 @@ class ToChangelogInputTypeStrategyTest extends InputTypeStrategiesTestBase {
                                 TO_CHANGELOG_INPUT_TYPE_STRATEGY)
                         .calledWithArgumentTypes(
                                 TABLE_TYPE, DESCRIPTOR_TYPE, MAP_TYPE, BOOLEAN_TYPE)
-                        .calledWithTableSemanticsAt(0, new TableSemanticsMock(TABLE_TYPE))
-                        .calledWithLiteralAt(1, ColumnList.of("op"))
-                        .calledWithLiteralAt(2, Map.of("INSERT, DELETE", "A", "DELETE", "B"))
+                        .calledWithTableSemanticsAt(ARG_TABLE, new TableSemanticsMock(TABLE_TYPE))
+                        .calledWithLiteralAt(ARG_OP, ColumnList.of("op"))
+                        .calledWithLiteralAt(
+                                ARG_OP_MAPPING, Map.of("INSERT, DELETE", "A", "DELETE", "B"))
                         .expectErrorMessage("Duplicate change operation: 'DELETE'"));
     }
 }
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/OperatorBindingCallContext.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/OperatorBindingCallContext.java
index 7d1d3940899..8f05d7e2718 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/OperatorBindingCallContext.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/OperatorBindingCallContext.java
@@ -195,10 +195,11 @@ public final class OperatorBindingCallContext extends AbstractSqlCallContext {
                 Optional.ofNullable(inputChangelogModes)
                         .map(m -> m.get(tableArgCall.getInputIndex()))
                         .orElse(null);
+        final int inputIndex = tableArgCall.getInputIndex();
         final List<int[]> upsertKeys =
-                Optional.ofNullable(inputUpsertKeys)
-                        .map(m -> m.get(tableArgCall.getInputIndex()))
-                        .orElse(List.of());
+                inputUpsertKeys != null && inputIndex < inputUpsertKeys.size()
+                        ? inputUpsertKeys.get(inputIndex)
+                        : List.of();
         return Optional.of(
                 OperatorBindingTableSemantics.create(
                         argumentDataTypes.get(pos),
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogSemanticTests.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogSemanticTests.java
index b3376b3a346..41c53979067 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogSemanticTests.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogSemanticTests.java
@@ -56,12 +56,12 @@ public class ToChangelogSemanticTests extends SemanticTestBase {
                 ToChangelogTestPrograms.INVALID_OP_MAPPING,
                 ToChangelogTestPrograms.OP_MAPPING_REFERENCES_UNSUPPORTED_KIND,
                 ToChangelogTestPrograms.DUPLICATE_ROW_KIND,
-                ToChangelogTestPrograms.PRODUCES_FULL_DELETES_ON_APPEND_ONLY_INPUT,
-                ToChangelogTestPrograms.ROW_SEM_PARTIAL_DELETES,
-                ToChangelogTestPrograms.ROW_SEM_FORCE_FULL_DELETES,
-                ToChangelogTestPrograms.SET_SEM_PARTIAL_DELETES,
-                ToChangelogTestPrograms.SET_SEM_FULL_DELETES,
-                ToChangelogTestPrograms.SET_SEM_FORCE_FULL_DELETES,
-                ToChangelogTestPrograms.SET_SEM_FORCE_PARTIAL_DELETES);
+                ToChangelogTestPrograms.INVALID_PRODUCES_FULL_DELETES_FOR_APPEND_ONLY,
+                ToChangelogTestPrograms.RETRACT_PRODUCES_PARTIAL_DELETES,
+                ToChangelogTestPrograms.UPSERT_PRODUCES_FULL_DELETES,
+                ToChangelogTestPrograms.UPSERT_PRODUCES_PARTIAL_DELETES,
+                ToChangelogTestPrograms.RETRACT_PARTITION_BY_PRODUCES_PARTIAL_DELETES,
+                ToChangelogTestPrograms.RETRACT_PARTITION_BY_PRODUCES_FULL_DELETES,
+                ToChangelogTestPrograms.UPSERT_PARTITION_BY_PRODUCES_FULL_DELETES);
     }
 }
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java
index 5126014b9a3..909e2f4d1ef 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java
@@ -187,8 +187,7 @@ public class ToChangelogTestPrograms {
     public static final TableTestProgram UPSERT_PARTITION_BY =
             TableTestProgram.of(
                             "to-changelog-upsert-partition-by",
-                            "PARTITION BY upsert key + mapping without UB skips ChangelogNormalize; "
-                                    + "default produces_full_deletes=false nulls non-key columns on DELETE")
+                            "PARTITION BY upsert key + mapping without UB skips ChangelogNormalize")
                     .setupTableSource(
                             SourceTestStep.newBuilder("t")
                                     .addSchema(
@@ -207,7 +206,7 @@ public class ToChangelogTestPrograms {
                                             "+I[Alice, C, 10]",
                                             "+I[Bob, C, 20]",
                                             "+I[Alice, C, 30]",
-                                            "+I[Bob, D, null]")
+                                            "+I[Bob, D, 20]")
                                     .build())
                     .runSql(
                             "INSERT INTO sink SELECT * FROM TO_CHANGELOG("
@@ -600,9 +599,9 @@ public class ToChangelogTestPrograms {
                             "Duplicate change operation: 'DELETE'")
                     .build();
 
-    public static final TableTestProgram PRODUCES_FULL_DELETES_ON_APPEND_ONLY_INPUT =
+    public static final TableTestProgram INVALID_PRODUCES_FULL_DELETES_FOR_APPEND_ONLY =
             TableTestProgram.of(
-                            "to-changelog-produces-full-deletes-on-append-only-input",
+                            "to-changelog-invalid-produces-full-deletes-for-append-only",
                             "fails when produces_full_deletes=true on an input that never emits DELETE rows")
                     .setupTableSource(SIMPLE_SOURCE)
                     .runFailingSql(
@@ -613,15 +612,14 @@ public class ToChangelogTestPrograms {
                             "the input table only produces [INSERT] and never emits DELETE rows")
                     .build();
 
-
     // --------------------------------------------------------------------------------------------
-    // Row semantics x delete handling matrix
+    // Full vs partial deletes matrix (input kind x PARTITION BY x produces_full_deletes)
     // --------------------------------------------------------------------------------------------
 
-    public static final TableTestProgram ROW_SEM_PARTIAL_DELETES =
+    public static final TableTestProgram RETRACT_PRODUCES_PARTIAL_DELETES =
             TableTestProgram.of(
-                            "to-changelog-row-sem-partial-deletes",
-                            "row semantics: produces_full_deletes=false skips ChangelogNormalize and a partial DELETE row from the input passes through unchanged")
+                            "to-changelog-retract-produces-partial-deletes",
+                            "retract input in row semantics with produces_full_deletes=false: skips ChangelogNormalize and the partial DELETE row from the input passes through unchanged")
                     .setupTableSource(
                             SourceTestStep.newBuilder("t")
                                     .addSchema(
@@ -650,10 +648,10 @@ public class ToChangelogTestPrograms {
                                     + "produces_full_deletes => false)")
                     .build();
 
-    public static final TableTestProgram ROW_SEM_FORCE_FULL_DELETES =
+    public static final TableTestProgram UPSERT_PRODUCES_FULL_DELETES =
             TableTestProgram.of(
-                            "to-changelog-row-sem-force-full-deletes",
-                            "row semantics: produces_full_deletes=true forces ChangelogNormalize to materialize the full DELETE row from an upsert source emitting key-only deletes")
+                            "to-changelog-upsert-produces-full-deletes",
+                            "upsert input in row semantics with produces_full_deletes=true: ChangelogNormalize materializes the full DELETE row from a key-only delete")
                     .setupTableSource(
                             SourceTestStep.newBuilder("t")
                                     .addSchema(
@@ -676,44 +674,37 @@ public class ToChangelogTestPrograms {
                                     + "produces_full_deletes => true)")
                     .build();
 
-    // --------------------------------------------------------------------------------------------
-    // Set semantics x delete handling matrix
-    // --------------------------------------------------------------------------------------------
-
-    public static final TableTestProgram SET_SEM_FORCE_PARTIAL_DELETES =
+    public static final TableTestProgram UPSERT_PRODUCES_PARTIAL_DELETES =
             TableTestProgram.of(
-                            "to-changelog-set-sem-force-partial-deletes",
-                            "set semantics: produces_full_deletes=false nulls non-partition-key columns on DELETE even when the input row is fully populated")
+                            "to-changelog-upsert-produces-partial-deletes",
+                            "upsert input in row semantics with single-column upsert key + "
+                                    + "produces_full_deletes=false: DELETE preserves the upsert key "
+                                    + "column and nulls the rest without requiring PARTITION BY")
                     .setupTableSource(
                             SourceTestStep.newBuilder("t")
                                     .addSchema(
                                             "name STRING PRIMARY KEY NOT ENFORCED", "score BIGINT")
-                                    .addMode(ChangelogMode.all())
+                                    .addMode(ChangelogMode.upsert())
                                     .producedValues(
                                             Row.ofKind(RowKind.INSERT, "Alice", 10L),
-                                            Row.ofKind(RowKind.INSERT, "Bob", 20L),
-                                            Row.ofKind(RowKind.UPDATE_AFTER, "Alice", 30L),
-                                            Row.ofKind(RowKind.DELETE, "Bob", 20L))
+                                            Row.ofKind(RowKind.DELETE, "Alice", 10L))
                                     .build())
                     .setupTableSink(
                             SinkTestStep.newBuilder("sink")
-                                    .addSchema("name STRING", "op STRING", "score BIGINT")
+                                    .addSchema("op STRING", "name STRING", "score BIGINT")
                                     .consumedValues(
-                                            "+I[Alice, INSERT, 10]",
-                                            "+I[Bob, INSERT, 20]",
-                                            "+I[Alice, UPDATE_AFTER, 30]",
-                                            "+I[Bob, DELETE, null]")
+                                            "+I[INSERT, Alice, 10]", "+I[DELETE, Alice, null]")
                                     .build())
                     .runSql(
                             "INSERT INTO sink SELECT * FROM TO_CHANGELOG("
-                                    + "input => TABLE t PARTITION BY name,"
+                                    + "input => TABLE t, "
                                     + "produces_full_deletes => false)")
                     .build();
 
-    public static final TableTestProgram SET_SEM_PARTIAL_DELETES =
+    public static final TableTestProgram RETRACT_PARTITION_BY_PRODUCES_PARTIAL_DELETES =
             TableTestProgram.of(
-                            "to-changelog-set-sem-partial-deletes",
-                            "set semantics: produces_full_deletes=false (default) lets a partial DELETE row from the input pass through with non-partition-key columns null")
+                            "to-changelog-retract-partition-by-produces-partial-deletes",
+                            "retract input in set semantics with produces_full_deletes=false: nulls non-partition-key columns on DELETE even when the input row is fully populated")
                     .setupTableSource(
                             SourceTestStep.newBuilder("t")
                                     .addSchema(
@@ -721,23 +712,29 @@ public class ToChangelogTestPrograms {
                                     .addMode(ChangelogMode.all())
                                     .producedValues(
                                             Row.ofKind(RowKind.INSERT, "Alice", 10L),
-                                            Row.ofKind(RowKind.DELETE, "Alice", null))
+                                            Row.ofKind(RowKind.INSERT, "Bob", 20L),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, "Alice", 30L),
+                                            Row.ofKind(RowKind.DELETE, "Bob", 20L))
                                     .build())
                     .setupTableSink(
                             SinkTestStep.newBuilder("sink")
                                     .addSchema("name STRING", "op STRING", "score BIGINT")
                                     .consumedValues(
-                                            "+I[Alice, INSERT, 10]", "+I[Alice, DELETE, null]")
+                                            "+I[Alice, INSERT, 10]",
+                                            "+I[Bob, INSERT, 20]",
+                                            "+I[Alice, UPDATE_AFTER, 30]",
+                                            "+I[Bob, DELETE, null]")
                                     .build())
                     .runSql(
                             "INSERT INTO sink SELECT * FROM TO_CHANGELOG("
-                                    + "input => TABLE t PARTITION BY name)")
+                                    + "input => TABLE t PARTITION BY name,"
+                                    + "produces_full_deletes => false)")
                     .build();
 
-    public static final TableTestProgram SET_SEM_FULL_DELETES =
+    public static final TableTestProgram RETRACT_PARTITION_BY_PRODUCES_FULL_DELETES =
             TableTestProgram.of(
-                            "to-changelog-set-sem-full-deletes",
-                            "set semantics: produces_full_deletes=true on an input that already emits full deletes is a no-op for the planner and the full DELETE row reaches the output")
+                            "to-changelog-retract-partition-by-produces-full-deletes",
+                            "retract input in set semantics with produces_full_deletes=true (default): the input row passes through unchanged, full DELETE pre-image reaches the output")
                     .setupTableSource(
                             SourceTestStep.newBuilder("t")
                                     .addSchema(
@@ -759,10 +756,10 @@ public class ToChangelogTestPrograms {
                                     + "produces_full_deletes => true)")
                     .build();
 
-    public static final TableTestProgram SET_SEM_FORCE_FULL_DELETES =
+    public static final TableTestProgram UPSERT_PARTITION_BY_PRODUCES_FULL_DELETES =
             TableTestProgram.of(
-                            "to-changelog-set-sem-force-full-deletes",
-                            "set semantics: produces_full_deletes=true forces ChangelogNormalize to materialize the full DELETE row from an upsert source emitting key-only deletes")
+                            "to-changelog-upsert-partition-by-produces-full-deletes",
+                            "upsert input in set semantics with produces_full_deletes=true: ChangelogNormalize materializes the full DELETE row from a key-only delete")
                     .setupTableSource(
                             SourceTestStep.newBuilder("t")
                                     .addSchema(
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/ToChangelogFunction.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/ToChangelogFunction.java
index d8874a2be16..66162e205b9 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/ToChangelogFunction.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/ToChangelogFunction.java
@@ -33,21 +33,31 @@ import org.apache.flink.table.functions.SpecializedFunction.SpecializedContext;
 import org.apache.flink.table.functions.TableSemantics;
 import org.apache.flink.table.types.inference.CallContext;
 import org.apache.flink.table.types.inference.strategies.ChangelogTypeStrategyUtils;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.utils.UpsertKeyUtils;
 import org.apache.flink.types.ColumnList;
 import org.apache.flink.types.RowKind;
 
 import javax.annotation.Nullable;
 
 import java.util.EnumMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.stream.Collectors;
 
+import static org.apache.flink.table.types.inference.strategies.ToChangelogTypeStrategy.ARG_OP_MAPPING;
+import static org.apache.flink.table.types.inference.strategies.ToChangelogTypeStrategy.ARG_PRODUCES_FULL_DELETES;
+import static org.apache.flink.table.types.inference.strategies.ToChangelogTypeStrategy.ARG_TABLE;
+
 /**
  * Runtime implementation of {@link BuiltInFunctionDefinitions#TO_CHANGELOG}.
  *
- * <p>Converts each input row into an INSERT-only output row with an operation code column. The
- * output schema is {@code [op_column, ...all_input_columns...]}.
+ * <p>Converts each input row into an INSERT-only output row with an operation code column. Output
+ * schema is {@code [op_column, ...projected_input_columns...]}. Partition columns are prepended by
+ * the framework outside this function and are not part of the projection.
  *
  * <p>Uses {@link JoinedRowData} to combine the op column with the full input row.
  */
@@ -65,51 +75,55 @@ public class ToChangelogFunction extends BuiltInProcessTableFunction<RowData> {
 
     private final Map<RowKind, String> rawOpMap;
     private final int[] outputIndices;
+    private final RowType inputRowType;
     private final boolean producesFullDelete;
+    private final boolean[] upsertKeyColumn;
 
     private transient Map<RowKind, StringData> opMap;
     private transient GenericRowData opRow;
     private transient JoinedRowData output;
     private transient ProjectedRowData projectedOutput;
-    private transient GenericRowData nullPayloadRow;
+    private transient GenericRowData partialDeletePayload;
+    private transient RowData.FieldGetter[] preservedFieldGetters;
 
     @SuppressWarnings("unchecked")
     public ToChangelogFunction(final SpecializedContext context) {
         super(BuiltInFunctionDefinitions.TO_CHANGELOG, context);
         final CallContext callContext = context.getCallContext();
         // Table argument is guaranteed by the type strategy's validation phase.
-        final TableSemantics tableSemantics = callContext.getTableSemantics(0).get();
+        final TableSemantics tableSemantics = callContext.getTableSemantics(ARG_TABLE).get();
 
         final Map<String, String> opMapping =
-                callContext.getArgumentValue(2, Map.class).orElse(null);
+                callContext.getArgumentValue(ARG_OP_MAPPING, Map.class).orElse(null);
         this.rawOpMap = buildOpMap(opMapping);
         if (opMapping != null) {
             validateOpMap(this.rawOpMap, tableSemantics);
         }
         final boolean producesFullDeletesArg =
-                callContext.getArgumentValue(3, Boolean.class).orElse(false);
-        validateProducesFullDeletes(producesFullDeletesArg, tableSemantics);
+                callContext.getArgumentValue(ARG_PRODUCES_FULL_DELETES, Boolean.class).orElse(true);
+        final boolean isExplicit = !callContext.isArgumentNull(ARG_PRODUCES_FULL_DELETES);
+        validateProducesFullDeletes(producesFullDeletesArg, isExplicit, tableSemantics);
 
         this.outputIndices = ChangelogTypeStrategyUtils.computeOutputIndices(tableSemantics);
-        this.producesFullDelete = resolveProducesFullDelete(producesFullDeletesArg, tableSemantics);
+        this.inputRowType = (RowType) tableSemantics.dataType().getLogicalType();
+        this.producesFullDelete = producesFullDeletesArg;
+        this.upsertKeyColumn =
+                computeUpsertKeyColumn(
+                        this.outputIndices,
+                        UpsertKeyUtils.smallestKey(tableSemantics.upsertKeyColumns()));
     }
 
-    /**
-     * Decides whether this function emits full DELETE rows (input passed through unchanged) or
-     * partial DELETE rows (only identifying columns preserved, rest nulled).
-     *
-     * <p>The framework prepends partition-key columns to the output without consulting this
-     * function, so in set semantics partition keys are preserved on DELETE rows for free. In row
-     * semantics there is no key column to preserve, so the function passes the input through
-     * unchanged regardless of {@code produces_full_deletes}.
-     */
-    private static boolean resolveProducesFullDelete(
-            final boolean producesFullDeletesArg, final TableSemantics tableSemantics) {
-        if (producesFullDeletesArg) {
-            return true;
+    private static boolean[] computeUpsertKeyColumn(
+            final int[] outputIndices, final int[] upsertKey) {
+        final Set<Integer> keepInputIndices = new HashSet<>();
+        for (final int key : upsertKey) {
+            keepInputIndices.add(key);
         }
-        final boolean hasPartitionBy = tableSemantics.partitionByColumns().length > 0;
-        return !hasPartitionBy;
+        final boolean[] mask = new boolean[outputIndices.length];
+        for (int i = 0; i < outputIndices.length; i++) {
+            mask[i] = keepInputIndices.contains(outputIndices[i]);
+        }
+        return mask;
     }
 
     @Override
@@ -120,7 +134,16 @@ public class ToChangelogFunction extends BuiltInProcessTableFunction<RowData> {
         opRow = new GenericRowData(1);
         output = new JoinedRowData();
         projectedOutput = ProjectedRowData.from(outputIndices);
-        nullPayloadRow = new GenericRowData(outputIndices.length);
+        partialDeletePayload = new GenericRowData(outputIndices.length);
+        preservedFieldGetters = new RowData.FieldGetter[outputIndices.length];
+        final List<LogicalType> inputFieldTypes = inputRowType.getChildren();
+        for (int i = 0; i < outputIndices.length; i++) {
+            if (upsertKeyColumn[i]) {
+                preservedFieldGetters[i] =
+                        RowData.createFieldGetter(
+                                inputFieldTypes.get(outputIndices[i]), outputIndices[i]);
+            }
+        }
     }
 
     /**
@@ -172,29 +195,48 @@ public class ToChangelogFunction extends BuiltInProcessTableFunction<RowData> {
     }
 
     /**
-     * Rejects {@code produces_full_deletes=true} when the input changelog never emits DELETE rows.
+     * Validates an explicit {@code produces_full_deletes} argument against the input.
+     *
+     * <p>For {@code produces_full_deletes=true}, the input changelog must emit DELETE rows;
+     * otherwise the parameter is dead. For {@code produces_full_deletes=false}, the input must
+     * declare an upsert key or the call must use {@code PARTITION BY}; otherwise the function has
+     * no identifying columns to preserve when nulling the rest.
+     *
+     * <p>No validation runs when the argument is absent, since the default (full deletes) is safe
+     * for any input.
      *
      * <p>Lives here rather than in the input type strategy because {@link
-     * TableSemantics#changelogMode()} returns empty during type inference and is only populated at
-     * specialization time. The complementary check against the literal {@code op_mapping}
-     * argument runs earlier in {@code ToChangelogTypeStrategy}.
+     * TableSemantics#changelogMode()} and {@link TableSemantics#upsertKeyColumns()} are only
+     * populated at specialization time.
      */
     private static void validateProducesFullDeletes(
-            final boolean producesFullDeletesArg, final TableSemantics tableSemantics) {
-        if (!producesFullDeletesArg) {
+            final boolean producesFullDeletesArg,
+            final boolean isExplicit,
+            final TableSemantics tableSemantics) {
+        if (!isExplicit) {
             return;
         }
-        final ChangelogMode inputMode = tableSemantics.changelogMode().orElse(null);
-        if (inputMode == null) {
+        if (producesFullDeletesArg) {
+            final ChangelogMode inputMode = tableSemantics.changelogMode().orElse(null);
+            if (inputMode != null && !inputMode.contains(RowKind.DELETE)) {
+                throw new ValidationException(
+                        String.format(
+                                "Invalid 'produces_full_deletes' for TO_CHANGELOG: the input "
+                                        + "table only produces %s and never emits DELETE rows. "
+                                        + "Remove the 'produces_full_deletes' argument.",
+                                inputMode.getContainedKinds()));
+            }
             return;
         }
-        if (!inputMode.contains(RowKind.DELETE)) {
+        final boolean hasPartitionBy = 
tableSemantics.partitionByColumns().length > 0;
+        final boolean hasUpsertKey = 
!tableSemantics.upsertKeyColumns().isEmpty();
+        if (!hasPartitionBy && !hasUpsertKey) {
             throw new ValidationException(
-                    String.format(
-                            "Invalid 'produces_full_deletes' for TO_CHANGELOG: the input table "
-                                    + "only produces %s and never emits DELETE rows. Remove the "
-                                    + "'produces_full_deletes' argument.",
-                            inputMode.getContainedKinds()));
+                    "Invalid 'produces_full_deletes=false' for TO_CHANGELOG: the input has no "
+                            + "upsert key and the call has no PARTITION BY, so the function has "
+                            + "no identifying columns to preserve on DELETE rows. Remove the "
+                            + "argument (the default emits full DELETE rows) or add a "
+                            + "PARTITION BY.");
         }
     }
 
@@ -210,10 +252,28 @@ public class ToChangelogFunction extends BuiltInProcessTableFunction<RowData> {
         }
 
         opRow.setField(0, opCode);
-        final RowData payload =
-                (input.getRowKind() == RowKind.DELETE && !producesFullDelete)
-                        ? nullPayloadRow
-                        : projectedOutput.replaceRow(input);
+        final RowData payload;
+        if (input.getRowKind() == RowKind.DELETE && !producesFullDelete) {
+            payload = buildPartialDeletePayload(input);
+        } else {
+            payload = projectedOutput.replaceRow(input);
+        }
         collect(output.replace(opRow, payload));
     }
+
+    /**
+     * Builds the payload for a partial DELETE row: upsert-key columns are copied from the input,
+     * all other columns are emitted as {@code null}. Partition-key columns are not included here
+     * since the framework prepends them outside the function's projected output.
+     */
+    private RowData buildPartialDeletePayload(final RowData input) {
+        for (int i = 0; i < outputIndices.length; i++) {
+            if (upsertKeyColumn[i]) {
+                partialDeletePayload.setField(i, preservedFieldGetters[i].getFieldOrNull(input));
+            } else {
+                partialDeletePayload.setField(i, null);
+            }
+        }
+        return partialDeletePayload;
+    }
 }
