This is an automated email from the ASF dual-hosted git repository.

gustavodemorais pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit bca2c0a6170adbbf39cfaf4327ec368f003e4f40
Author: Ramin Gharib <[email protected]>
AuthorDate: Thu May 21 17:57:55 2026 +0200

    [FLINK-39636][table] Add produces_full_deletes parameter to TO_CHANGELOG
    
    Introduces the optional produces_full_deletes boolean argument on the 
built-in TO_CHANGELOG PTF. Controls whether DELETE rows downstream carry the 
full pre-image. Only the partition-key path is covered here; row-semantic 
partial deletes via the upsert key follow in a later commit.
---
 .../docs/sql/reference/queries/changelog.md        |  58 +++++-
 flink-python/pyflink/table/table.py                |  18 +-
 .../apache/flink/table/api/PartitionedTable.java   |  10 +-
 .../java/org/apache/flink/table/api/Table.java     |  10 +-
 .../functions/BuiltInFunctionDefinitions.java      |  40 ++--
 .../table/types/inference/BuiltInCondition.java    |   1 +
 .../table/types/inference/TraitCondition.java      |  11 ++
 .../strategies/ToChangelogTypeStrategy.java        |  16 +-
 .../exec/stream/ToChangelogSemanticTests.java      |  10 +-
 .../nodes/exec/stream/ToChangelogTestPrograms.java | 216 ++++++++++++++++++++-
 .../planner/plan/stream/sql/ToChangelogTest.java   |  38 ++++
 .../planner/plan/stream/sql/ToChangelogTest.xml    |  66 +++++--
 .../plan/to-changelog-retract-restore.json         |   7 +-
 .../runtime/functions/ptf/ToChangelogFunction.java |  72 ++++++-
 14 files changed, 527 insertions(+), 46 deletions(-)

diff --git a/docs/content/docs/sql/reference/queries/changelog.md 
b/docs/content/docs/sql/reference/queries/changelog.md
index 78f61e1ce10..7a1598ee850 100644
--- a/docs/content/docs/sql/reference/queries/changelog.md
+++ b/docs/content/docs/sql/reference/queries/changelog.md
@@ -296,17 +296,19 @@ This is useful when you need to materialize changelog 
events into a downstream s
 SELECT * FROM TO_CHANGELOG(
   input => TABLE source_table [PARTITION BY key_col],
   [op => DESCRIPTOR(op_column_name),]
-  [op_mapping => MAP['INSERT', 'I', 'DELETE', 'D', ...]]
+  [op_mapping => MAP['INSERT', 'I', 'DELETE', 'D', ...],]
+  [produces_full_deletes => BOOLEAN]
 )
 ```
 
 ### Parameters
 
-| Parameter    | Required | Description                                        
                                                                                
                                                                                
                                                                                
                                                      |
-|:-------------|:---------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
-| `input`      | Yes      | The input table. With `PARTITION BY`, rows with 
the same key are co-located and run in the same operator instance. Without 
`PARTITION BY`, each row is processed independently. Accepts insert-only, 
retract, and upsert tables. For upsert tables, the provided `PARTITION BY` key 
should match or be a subset of the upsert key of the subquery.                  
                    |
-| `op`         | No       | A `DESCRIPTOR` with a single column name for the 
operation code column. Defaults to `op`.                                        
                                                                                
                                                                                
                                                        |
-| `op_mapping` | No       | A `MAP<STRING, STRING>` mapping change operation 
names to custom output codes. Keys can contain comma-separated names to map 
multiple operations to the same code (e.g., `'INSERT, UPDATE_AFTER'`). When 
provided, only mapped operations are forwarded - unmapped events are dropped. 
Each change operation may appear at most once across all entries. |
+| Parameter               | Required | Description                             
                                                                                
                                                                                
                                                                                
                                                                 |
+|:------------------------|:---------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| `input`                 | Yes      | The input table. With `PARTITION BY`, 
rows with the same key are co-located and run in the same operator instance. 
Without `PARTITION BY`, each row is processed independently. Accepts 
insert-only, retract, and upsert tables. For upsert tables, the provided 
`PARTITION BY` key should match or be a subset of the upsert key of the 
subquery.       |
+| `op`                    | No       | A `DESCRIPTOR` with a single column 
name for the operation code column. Defaults to `op`.                           
                                                                                
                                                                                
                                                                     |
+| `op_mapping`            | No       | A `MAP<STRING, STRING>` mapping change 
operation names to custom output codes. Keys can contain comma-separated names 
to map multiple operations to the same code (e.g., `'INSERT, UPDATE_AFTER'`). 
When provided, only mapped operations are forwarded - unmapped events are 
dropped. Each change operation may appear at most once across all entries. |
+| `produces_full_deletes` | No       | A `BOOLEAN` literal that controls how 
DELETE rows are emitted. When `true`, the function requires fully-populated 
DELETE rows from the input. The planner inserts a `ChangelogNormalize` operator 
for upsert sources that emit key-only deletes, so downstream sees the full 
pre-image on DELETE. When `false` (default), no full-delete requirement is 
enforced. Partial DELETE rows from the input pass through unchanged. With 
`PARTITION BY` (set semantics), the [...]
 
 #### Default op_mapping
 
@@ -397,6 +399,42 @@ SELECT * FROM TO_CHANGELOG(
 -- UPDATE_BEFORE is dropped (not in the mapping)
 ```
 
+#### Delete handling
+
+The `produces_full_deletes` argument controls how DELETE rows are emitted and 
what the planner requires from the input. The behavior depends on whether 
`PARTITION BY` is used (set semantics) or not (row semantics).
+
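+**With `produces_full_deletes => true`.** The planner requires the input to produce DELETE rows with all columns populated. For upsert sources, a `ChangelogNormalize` operator is inserted to materialize the full pre-image from state. The function then emits fully-populated DELETE rows downstream. The argument is rejected with a validation error if the input never emits DELETE rows (e.g. an insert-only table) or if the active `op_mapping` drops `DELETE`.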
+
+```sql
+-- Upsert source: -D[id:5] (key-only).
+-- ChangelogNormalize materializes the full pre-image from state.
+-- Output: +I[op:'DELETE', id:5, name:'Alice']
+SELECT * FROM TO_CHANGELOG(
+  input => TABLE upsert_source,
+  produces_full_deletes => true
+)
+```
+
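+**With `produces_full_deletes => false` (default).** The planner does not require fully-populated DELETE rows on the input. For upsert sources that emit key-only deletes (e.g. Kafka compacted topics), this avoids the stateful `ChangelogNormalize` operator that would otherwise materialize the full pre-image of each deleted row. Note that `ChangelogNormalize` is still inserted when the active `op_mapping` requires `UPDATE_BEFORE` rows, which is the case for the default mapping; use an `op_mapping` without `UPDATE_BEFORE` to skip it.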
+
+In **row semantics** (no `PARTITION BY`) the function passes the input row 
through unchanged. If the source emits partial DELETE rows they remain partial 
downstream; if it emits full DELETE rows they remain full.
+
+```sql
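+-- Source emits -D[id:5] (key-only). The op_mapping omits UPDATE_BEFORE, so no
+-- ChangelogNormalize is inserted and the DELETE row stays partial.
+-- Output: +I[op:'DELETE', id:5, name:null]
+SELECT * FROM TO_CHANGELOG(
+  input => TABLE upsert_source,
+  op_mapping => MAP['INSERT, UPDATE_AFTER', 'UPSERT', 'DELETE', 'DELETE']
+)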
+```
+
+In **set semantics** (`PARTITION BY`) the function additionally nulls every 
non-partition-key column on DELETE rows. This forces the output to carry only 
the partition key on DELETE even when the input row was fully populated, which 
matches the shape expected by upsert sinks and Kafka compacted topics.
+
+```sql
+-- Source emits -D[id:5, name:'Alice'] (full pre-image, e.g. from a retract 
source).
+-- Output: +I[id:5, op:'DELETE', name:null]
+SELECT * FROM TO_CHANGELOG(input => TABLE retract_source PARTITION BY id)
+```
+
+There is no way to derive a partial DELETE in row semantics when the input 
emits a full pre-image, since the function has no key column to preserve. Use 
`PARTITION BY` for that case.
+
 #### Partitioning by a key
 
 ```sql
@@ -434,6 +472,14 @@ Table result = myTable.toChangelog(
     map("INSERT, UPDATE_AFTER", "false", "DELETE", 
"true").asArgument("op_mapping")
 );
 
+// Require fully-populated DELETE rows from the input (inserts a 
ChangelogNormalize for
+// upsert sources). When false (default), no full-delete requirement is 
enforced; in row
+// semantics the input passes through unchanged, in set semantics 
non-partition-key columns
+// are nulled on DELETE.
+Table result = myTable.toChangelog(
+    lit(true).asArgument("produces_full_deletes")
+);
+
 // Set semantics: co-locate rows with the same key in the same parallel 
operator instance.
 // Equivalent to PARTITION BY in SQL. The partition keys are prepended to the 
output columns.
 Table result = myTable.partitionBy($("id")).toChangelog();
diff --git a/flink-python/pyflink/table/table.py 
b/flink-python/pyflink/table/table.py
index 583167fa478..bb06642db9e 100644
--- a/flink-python/pyflink/table/table.py
+++ b/flink-python/pyflink/table/table.py
@@ -1196,10 +1196,18 @@ class Table(object):
         INSERT-only row with a string ``op`` column indicating the original 
operation
         (INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE).
 
+        The optional ``produces_full_deletes`` boolean controls how DELETE 
rows are
+        emitted. When ``True``, the planner inserts a ``ChangelogNormalize`` 
operator
+        for upsert sources that emit key-only deletes so the function emits 
fully
+        populated DELETE rows downstream. When ``False`` (default), no 
full-delete
+        requirement is enforced. In row semantics the input is passed through 
unchanged,
+        and in set semantics (``PARTITION BY``) non-partition-key columns are 
nulled on
+        DELETE rows.
+
         Example:
         ::
 
-            >>> from pyflink.table.expressions import descriptor, map_
+            >>> from pyflink.table.expressions import descriptor, map_, lit, 
col
             >>> # Default: adds 'op' column with standard change operation 
names
             >>> result = table.to_changelog()
             >>> # Custom op column name and mapping
@@ -1213,8 +1221,14 @@ class Table(object):
             ...     map_("INSERT, UPDATE_AFTER", "false",
             ...          "DELETE", "true").as_argument("op_mapping")
             ... )
+            >>> # Require fully populated DELETE rows from the input. Inserts a
+            >>> # ChangelogNormalize for upsert sources.
+            >>> result = table.to_changelog(
+            ...     lit(True).as_argument("produces_full_deletes")
+            ... )
 
-        :param arguments: Optional named arguments for ``op`` and 
``op_mapping``.
+        :param arguments: Optional named arguments for ``op``, ``op_mapping``, 
and
+                          ``produces_full_deletes``.
         :return: An append-only :class:`~pyflink.table.Table` with an ``op`` 
column prepended
                  to the input columns.
         """
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PartitionedTable.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PartitionedTable.java
index 51086f1edfe..074bdfdbaef 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PartitionedTable.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PartitionedTable.java
@@ -204,9 +204,17 @@ public interface PartitionedTable {
      *         descriptor("deleted").asArgument("op"),
      *         map("INSERT, UPDATE_AFTER", "false", "DELETE", 
"true").asArgument("op_mapping")
      *     );
+     *
+     * // Require fully-populated DELETE rows from the input (inserts a 
ChangelogNormalize for
+     * // upsert sources). When false (default), no full-delete requirement is enforced and
+     * // non-partition-key columns are nulled on DELETE rows.
+     * Table result = table
+     *     .partitionBy($("id"))
+     *     .toChangelog(lit(true).asArgument("produces_full_deletes"));
      * }</pre>
      *
-     * @param arguments optional named arguments for {@code op} and {@code 
op_mapping}
+     * @param arguments optional named arguments for {@code op}, {@code 
op_mapping}, and {@code
+     *     produces_full_deletes}
      * @return an append-only {@link Table} with output schema {@code 
[partition_keys, op,
      *     non_partition_input_columns]}
      * @see Table#toChangelog(Expression...)
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java
index 0ffb90c5fcc..de6ce9d3e5e 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java
@@ -1459,9 +1459,17 @@ public interface Table extends Explainable<Table>, 
Executable {
      *     descriptor("deleted").asArgument("op"),
      *     map("INSERT, UPDATE_AFTER", "false", "DELETE", 
"true").asArgument("op_mapping")
      * );
+     *
+     * // Require fully-populated DELETE rows from the input (inserts a 
ChangelogNormalize for
+     * // upsert sources). When false (default), no full-delete requirement is 
enforced and partial
+     * // DELETE rows from the input pass through unchanged.
+     * Table result = table.toChangelog(
+     *     lit(true).asArgument("produces_full_deletes")
+     * );
      * }</pre>
      *
-     * @param arguments optional named arguments for {@code op} and {@code 
op_mapping}
+     * @param arguments optional named arguments for {@code op}, {@code 
op_mapping}, and {@code
+     *     produces_full_deletes}
      * @return an append-only {@link Table} with an {@code op} column 
prepended to the input columns
      */
     Table toChangelog(Expression... arguments);
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
index 4d0c9b28dac..19b8219e98b 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
@@ -839,25 +839,35 @@ public final class BuiltInFunctionDefinitions {
                                                                             
"UPDATE_BEFORE"))))
                                     .withConditionalTrait(
                                             
StaticArgumentTrait.REQUIRE_FULL_DELETE,
-                                            TraitCondition.or(
-                                                    // op_mapping omitted: 
default mapping includes
-                                                    // DELETE.
-                                                    TraitCondition.not(
-                                                            
TraitCondition.argIsPresent(
-                                                                    
"op_mapping")),
-                                                    TraitCondition.argMatches(
-                                                            "op_mapping",
-                                                            Map.class,
-                                                            mapping ->
-                                                                    
opMappingContainsKey(
-                                                                            
(Map<String, String>)
-                                                                               
     mapping,
-                                                                            
"DELETE")))),
+                                            // Require full deletes only when 
the user explicitly
+                                            // asks for them via 
produces_full_deletes=TRUE *and*
+                                            // the active op_mapping includes 
DELETE. Otherwise the
+                                            // planner can skip 
ChangelogNormalize for upsert
+                                            // sources that emit key-only 
deletes.
+                                            TraitCondition.and(
+                                                    
TraitCondition.argIsEqualTo(
+                                                            
"produces_full_deletes", Boolean.TRUE),
+                                                    TraitCondition.or(
+                                                            TraitCondition.not(
+                                                                    
TraitCondition.argIsPresent(
+                                                                            
"op_mapping")),
+                                                            
TraitCondition.argMatches(
+                                                                    
"op_mapping",
+                                                                    Map.class,
+                                                                    mapping ->
+                                                                            
opMappingContainsKey(
+                                                                               
     (Map<
+                                                                               
                     String,
+                                                                               
                     String>)
+                                                                               
             mapping,
+                                                                               
     "DELETE"))))),
                             StaticArgument.scalar("op", 
DataTypes.DESCRIPTOR(), true),
                             StaticArgument.scalar(
                                     "op_mapping",
                                     DataTypes.MAP(DataTypes.STRING(), 
DataTypes.STRING()),
-                                    true))
+                                    true),
+                            StaticArgument.scalar(
+                                    "produces_full_deletes", 
DataTypes.BOOLEAN(), true))
                     .inputTypeStrategy(TO_CHANGELOG_INPUT_TYPE_STRATEGY)
                     .outputTypeStrategy(TO_CHANGELOG_OUTPUT_TYPE_STRATEGY)
                     .runtimeClass(
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/BuiltInCondition.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/BuiltInCondition.java
index ba87cc0559d..4c0149f6533 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/BuiltInCondition.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/BuiltInCondition.java
@@ -40,6 +40,7 @@ final class BuiltInCondition implements TraitCondition {
         ARG_IS_PRESENT,
         NOT,
         OR,
+        AND,
         ARG_MATCHES
     }
 
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TraitCondition.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TraitCondition.java
index 14734db11f4..8b86b37cc9f 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TraitCondition.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TraitCondition.java
@@ -84,6 +84,17 @@ public interface TraitCondition {
                 ctx -> left.test(ctx) || right.test(ctx));
     }
 
+    /**
+     * True when both the {@code left} and the {@code right} {@link 
TraitCondition} evaluate to
+     * true.
+     */
+    static TraitCondition and(final TraitCondition left, final TraitCondition 
right) {
+        return new BuiltInCondition(
+                BuiltInCondition.Kind.AND,
+                List.of(left, right),
+                ctx -> left.test(ctx) && right.test(ctx));
+    }
+
     /** True when the named scalar argument was provided by the caller. */
     static TraitCondition argIsPresent(final String argName) {
         return new BuiltInCondition(
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToChangelogTypeStrategy.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToChangelogTypeStrategy.java
index d977deab4a9..8ff8101dc5e 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToChangelogTypeStrategy.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToChangelogTypeStrategy.java
@@ -59,7 +59,7 @@ public final class ToChangelogTypeStrategy {
             new InputTypeStrategy() {
                 @Override
                 public ArgumentCount getArgumentCount() {
-                    return ConstantArgumentCount.between(1, 3);
+                    return ConstantArgumentCount.between(1, 4);
                 }
 
                 @Override
@@ -77,7 +77,12 @@ public final class ToChangelogTypeStrategy {
                             Signature.of(
                                     Argument.of("input", "TABLE"),
                                     Argument.of("op", "DESCRIPTOR"),
-                                    Argument.of("op_mapping", "MAP<STRING, 
STRING>")));
+                                    Argument.of("op_mapping", "MAP<STRING, 
STRING>")),
+                            Signature.of(
+                                    Argument.of("input", "TABLE"),
+                                    Argument.of("op", "DESCRIPTOR"),
+                                    Argument.of("op_mapping", "MAP<STRING, 
STRING>"),
+                                    Argument.of("produces_full_deletes", 
"BOOLEAN")));
                 }
             };
 
@@ -144,6 +149,13 @@ public final class ToChangelogTypeStrategy {
             }
         }
 
+        final boolean hasProducesFullDeletesArg = 
!callContext.isArgumentNull(3);
+        if (hasProducesFullDeletesArg && !callContext.isArgumentLiteral(3)) {
+            return callContext.fail(
+                    throwOnFailure,
+                    "The 'produces_full_deletes' argument must be a constant 
BOOLEAN literal.");
+        }
+
         return Optional.of(callContext.getArgumentDataTypes());
     }
 
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogSemanticTests.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogSemanticTests.java
index bfffa8bd67d..762e08b3538 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogSemanticTests.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogSemanticTests.java
@@ -55,6 +55,14 @@ public class ToChangelogSemanticTests extends 
SemanticTestBase {
                 ToChangelogTestPrograms.INVALID_DESCRIPTOR,
                 ToChangelogTestPrograms.INVALID_OP_MAPPING,
                 ToChangelogTestPrograms.OP_MAPPING_REFERENCES_UNSUPPORTED_KIND,
-                ToChangelogTestPrograms.DUPLICATE_ROW_KIND);
+                ToChangelogTestPrograms.DUPLICATE_ROW_KIND,
+                
ToChangelogTestPrograms.PRODUCES_FULL_DELETES_ON_APPEND_ONLY_INPUT,
+                
ToChangelogTestPrograms.PRODUCES_FULL_DELETES_WITHOUT_DELETE_IN_OP_MAPPING,
+                ToChangelogTestPrograms.ROW_SEM_PARTIAL_DELETES,
+                ToChangelogTestPrograms.ROW_SEM_FORCE_FULL_DELETES,
+                ToChangelogTestPrograms.SET_SEM_PARTIAL_DELETES,
+                ToChangelogTestPrograms.SET_SEM_FULL_DELETES,
+                ToChangelogTestPrograms.SET_SEM_FORCE_FULL_DELETES,
+                ToChangelogTestPrograms.SET_SEM_FORCE_PARTIAL_DELETES);
     }
 }
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java
index 250ad1f1d8f..f04d30d1d99 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java
@@ -157,7 +157,7 @@ public class ToChangelogTestPrograms {
     public static final TableTestProgram UPSERT =
             TableTestProgram.of(
                             "to-changelog-upsert-input",
-                            "upsert input gets ChangelogNormalize for 
UPDATE_BEFORE and full deletes")
+                            "upsert input in row semantics gets 
ChangelogNormalize for UPDATE_BEFORE and emits full deletes")
                     .setupTableSource(
                             SourceTestStep.newBuilder("t")
                                     .addSchema(
@@ -187,7 +187,8 @@ public class ToChangelogTestPrograms {
     public static final TableTestProgram UPSERT_PARTITION_BY =
             TableTestProgram.of(
                             "to-changelog-upsert-partition-by",
-                            "PARTITION BY upsert key + mapping without UB 
skips ChangelogNormalize")
+                            "PARTITION BY upsert key + mapping without UB 
skips ChangelogNormalize; "
+                                    + "default produces_full_deletes=false 
nulls non-key columns on DELETE")
                     .setupTableSource(
                             SourceTestStep.newBuilder("t")
                                     .addSchema(
@@ -206,7 +207,7 @@ public class ToChangelogTestPrograms {
                                             "+I[Alice, C, 10]",
                                             "+I[Bob, C, 20]",
                                             "+I[Alice, C, 30]",
-                                            "+I[Bob, D, 20]")
+                                            "+I[Bob, D, null]")
                                     .build())
                     .runSql(
                             "INSERT INTO sink SELECT * FROM TO_CHANGELOG("
@@ -598,4 +599,213 @@ public class ToChangelogTestPrograms {
                             ValidationException.class,
                             "Duplicate change operation: 'DELETE'")
                     .build();
+
+    public static final TableTestProgram 
PRODUCES_FULL_DELETES_ON_APPEND_ONLY_INPUT =
+            TableTestProgram.of(
+                            
"to-changelog-produces-full-deletes-on-append-only-input",
+                            "fails when produces_full_deletes=true on an input 
that never emits DELETE rows")
+                    .setupTableSource(SIMPLE_SOURCE)
+                    .runFailingSql(
+                            "SELECT * FROM TO_CHANGELOG("
+                                    + "input => TABLE t, "
+                                    + "produces_full_deletes => true)",
+                            ValidationException.class,
+                            "the input table only produces [INSERT] and never 
emits DELETE rows")
+                    .build();
+
+    public static final TableTestProgram 
PRODUCES_FULL_DELETES_WITHOUT_DELETE_IN_OP_MAPPING =
+            TableTestProgram.of(
+                            
"to-changelog-produces-full-deletes-without-delete-in-op-mapping",
+                            "fails when produces_full_deletes=true but the 
active op_mapping strips DELETE")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("t")
+                                    .addSchema(
+                                            "name STRING PRIMARY KEY NOT 
ENFORCED", "score BIGINT")
+                                    .addMode(ChangelogMode.all())
+                                    .producedValues(Row.ofKind(RowKind.INSERT, 
"Alice", 10L))
+                                    .build())
+                    .runFailingSql(
+                            "SELECT * FROM TO_CHANGELOG("
+                                    + "input => TABLE t, "
+                                    + "op_mapping => MAP['INSERT, 
UPDATE_AFTER', 'X'], "
+                                    + "produces_full_deletes => true)",
+                            ValidationException.class,
+                            "the active 'op_mapping' does not map DELETE rows")
+                    .build();
+
+    // 
--------------------------------------------------------------------------------------------
+    // Row semantics x delete handling matrix
+    // 
--------------------------------------------------------------------------------------------
+
+    public static final TableTestProgram ROW_SEM_PARTIAL_DELETES =
+            TableTestProgram.of(
+                            "to-changelog-row-sem-partial-deletes",
+                            "row semantics: produces_full_deletes=false skips 
ChangelogNormalize and a partial DELETE row from the input passes through 
unchanged")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("t")
+                                    .addSchema(
+                                            "name STRING PRIMARY KEY NOT 
ENFORCED", "score BIGINT")
+                                    .addMode(ChangelogMode.all())
+                                    .producedValues(
+                                            Row.ofKind(RowKind.INSERT, 
"Alice", 10L),
+                                            Row.ofKind(RowKind.INSERT, "Bob", 
20L),
+                                            Row.ofKind(RowKind.UPDATE_BEFORE, 
"Alice", 10L),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
"Alice", 30L),
+                                            Row.ofKind(RowKind.DELETE, "Bob", 
null))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink")
+                                    .addSchema("op STRING", "name STRING", 
"score BIGINT")
+                                    .consumedValues(
+                                            "+I[INSERT, Alice, 10]",
+                                            "+I[INSERT, Bob, 20]",
+                                            "+I[UPDATE_BEFORE, Alice, 10]",
+                                            "+I[UPDATE_AFTER, Alice, 30]",
+                                            "+I[DELETE, Bob, null]")
+                                    .build())
+                    .runSql(
+                            "INSERT INTO sink SELECT * FROM TO_CHANGELOG("
+                                    + "input => TABLE t, "
+                                    + "produces_full_deletes => false)")
+                    .build();
+
+    public static final TableTestProgram ROW_SEM_FORCE_FULL_DELETES =
+            TableTestProgram.of(
+                            "to-changelog-row-sem-force-full-deletes",
+                            "row semantics: produces_full_deletes=true forces 
ChangelogNormalize to materialize the full DELETE row from an upsert source 
emitting key-only deletes")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("t")
+                                    .addSchema(
+                                            "name STRING PRIMARY KEY NOT 
ENFORCED", "score BIGINT")
+                                    .addMode(ChangelogMode.upsert())
+                                    .producedValues(
+                                            Row.ofKind(RowKind.INSERT, 
"Alice", 10L),
+                                            // Key-only delete: 
ChangelogNormalize fills the row.
+                                            Row.ofKind(RowKind.DELETE, 
"Alice", null))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink")
+                                    .addSchema("op STRING", "name STRING", 
"score BIGINT")
+                                    .consumedValues(
+                                            "+I[INSERT, Alice, 10]", 
"+I[DELETE, Alice, 10]")
+                                    .build())
+                    .runSql(
+                            "INSERT INTO sink SELECT * FROM TO_CHANGELOG("
+                                    + "input => TABLE t, "
+                                    + "produces_full_deletes => true)")
+                    .build();
+
+    // 
--------------------------------------------------------------------------------------------
+    // Set semantics x delete handling matrix
+    // 
--------------------------------------------------------------------------------------------
+
+    public static final TableTestProgram SET_SEM_FORCE_PARTIAL_DELETES =
+            TableTestProgram.of(
+                            "to-changelog-set-sem-force-partial-deletes",
+                            "set semantics: produces_full_deletes=false nulls 
non-partition-key columns on DELETE even when the input row is fully populated")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("t")
+                                    .addSchema(
+                                            "name STRING PRIMARY KEY NOT 
ENFORCED", "score BIGINT")
+                                    .addMode(ChangelogMode.all())
+                                    .producedValues(
+                                            Row.ofKind(RowKind.INSERT, 
"Alice", 10L),
+                                            Row.ofKind(RowKind.INSERT, "Bob", 
20L),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
"Alice", 30L),
+                                            Row.ofKind(RowKind.DELETE, "Bob", 
20L))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink")
+                                    .addSchema("name STRING", "op STRING", 
"score BIGINT")
+                                    .consumedValues(
+                                            "+I[Alice, INSERT, 10]",
+                                            "+I[Bob, INSERT, 20]",
+                                            "+I[Alice, UPDATE_AFTER, 30]",
+                                            "+I[Bob, DELETE, null]")
+                                    .build())
+                    .runSql(
+                            "INSERT INTO sink SELECT * FROM TO_CHANGELOG("
+                                    + "input => TABLE t PARTITION BY name,"
+                                    + "produces_full_deletes => false)")
+                    .build();
+
+    public static final TableTestProgram SET_SEM_PARTIAL_DELETES =
+            TableTestProgram.of(
+                            "to-changelog-set-sem-partial-deletes",
+                            "set semantics: produces_full_deletes=false 
(default) lets a partial DELETE row from the input pass through with 
non-partition-key columns null")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("t")
+                                    .addSchema(
+                                            "name STRING PRIMARY KEY NOT 
ENFORCED", "score BIGINT")
+                                    .addMode(ChangelogMode.all())
+                                    .producedValues(
+                                            Row.ofKind(RowKind.INSERT, 
"Alice", 10L),
+                                            Row.ofKind(RowKind.DELETE, 
"Alice", null))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink")
+                                    .addSchema("name STRING", "op STRING", 
"score BIGINT")
+                                    .consumedValues(
+                                            "+I[Alice, INSERT, 10]", 
"+I[Alice, DELETE, null]")
+                                    .build())
+                    .runSql(
+                            "INSERT INTO sink SELECT * FROM TO_CHANGELOG("
+                                    + "input => TABLE t PARTITION BY name)")
+                    .build();
+
+    public static final TableTestProgram SET_SEM_FULL_DELETES =
+            TableTestProgram.of(
+                            "to-changelog-set-sem-full-deletes",
+                            "set semantics: produces_full_deletes=true on an 
input that already emits full deletes is a no-op for the planner and the full 
DELETE row reaches the output")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("t")
+                                    .addSchema(
+                                            "name STRING PRIMARY KEY NOT 
ENFORCED", "score BIGINT")
+                                    .addMode(ChangelogMode.all())
+                                    .producedValues(
+                                            Row.ofKind(RowKind.INSERT, 
"Alice", 10L),
+                                            Row.ofKind(RowKind.DELETE, 
"Alice", 10L))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink")
+                                    .addSchema("name STRING", "op STRING", 
"score BIGINT")
+                                    .consumedValues(
+                                            "+I[Alice, INSERT, 10]", 
"+I[Alice, DELETE, 10]")
+                                    .build())
+                    .runSql(
+                            "INSERT INTO sink SELECT * FROM TO_CHANGELOG("
+                                    + "input => TABLE t PARTITION BY name, "
+                                    + "produces_full_deletes => true)")
+                    .build();
+
+    public static final TableTestProgram SET_SEM_FORCE_FULL_DELETES =
+            TableTestProgram.of(
+                            "to-changelog-set-sem-force-full-deletes",
+                            "set semantics: produces_full_deletes=true forces 
ChangelogNormalize to materialize the full DELETE row from an upsert source 
emitting key-only deletes")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("t")
+                                    .addSchema(
+                                            "name STRING PRIMARY KEY NOT 
ENFORCED", "score BIGINT")
+                                    .addMode(ChangelogMode.upsert())
+                                    .producedValues(
+                                            Row.ofKind(RowKind.INSERT, 
"Alice", 10L),
+                                            Row.ofKind(RowKind.INSERT, "Bob", 
20L),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
"Alice", 30L),
+                                            Row.ofKind(RowKind.DELETE, "Bob", 
null))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink")
+                                    .addSchema("name STRING", "op STRING", 
"score BIGINT")
+                                    .consumedValues(
+                                            "+I[Alice, INSERT, 10]",
+                                            "+I[Bob, INSERT, 20]",
+                                            "+I[Alice, UPDATE_BEFORE, 10]",
+                                            "+I[Alice, UPDATE_AFTER, 30]",
+                                            "+I[Bob, DELETE, 20]")
+                                    .build())
+                    .runSql(
+                            "INSERT INTO sink SELECT * FROM TO_CHANGELOG("
+                                    + "input => TABLE t PARTITION BY name, "
+                                    + "produces_full_deletes => true)")
+                    .build();
 }
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ToChangelogTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ToChangelogTest.java
index 7f6f93849c2..1f7347b027e 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ToChangelogTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ToChangelogTest.java
@@ -144,4 +144,42 @@ public class ToChangelogTest extends TableTestBase {
                         + "op_mapping => MAP['INSERT,UPDATE_AFTER', 'C'])",
                 CHANGELOG_MODE);
     }
+
+    @Test
+    void testUpsertSourceProducesFullDeletes() {
+        util.tableEnv()
+                .executeSql(
+                        "CREATE TABLE upsert_source ("
+                                + "  id INT,"
+                                + "  name STRING,"
+                                + "  PRIMARY KEY (id) NOT ENFORCED"
+                                + ") WITH ("
+                                + "  'connector' = 'values',"
+                                + "  'changelog-mode' = 'I,UA,D'"
+                                + ")");
+        util.verifyRelPlan(
+                "SELECT * FROM TO_CHANGELOG("
+                        + "input => TABLE upsert_source, "
+                        + "produces_full_deletes => true)",
+                CHANGELOG_MODE);
+    }
+
+    @Test
+    void testUpsertSourceKeyOnlyDeletes() {
+        util.tableEnv()
+                .executeSql(
+                        "CREATE TABLE upsert_source ("
+                                + "  id INT,"
+                                + "  name STRING,"
+                                + "  PRIMARY KEY (id) NOT ENFORCED"
+                                + ") WITH ("
+                                + "  'connector' = 'values',"
+                                + "  'changelog-mode' = 'I,UA,D'"
+                                + ")");
+        util.verifyRelPlan(
+                "SELECT * FROM TO_CHANGELOG("
+                        + "input => TABLE upsert_source, "
+                        + "produces_full_deletes => false)",
+                CHANGELOG_MODE);
+    }
 }
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ToChangelogTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ToChangelogTest.xml
index 7cea1058e5c..174addbf418 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ToChangelogTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ToChangelogTest.xml
@@ -23,14 +23,14 @@ limitations under the License.
     <Resource name="ast">
       <![CDATA[
 LogicalProject(op=[$0], id=[$1], name=[$2])
-+- LogicalTableFunctionScan(invocation=[TO_CHANGELOG(TABLE(#0), DEFAULT(), 
DEFAULT(), DEFAULT(), DEFAULT())], rowType=[RecordType(VARCHAR(2147483647) op, 
INTEGER id, VARCHAR(2147483647) name)])
++- LogicalTableFunctionScan(invocation=[TO_CHANGELOG(TABLE(#0), DEFAULT(), 
DEFAULT(), DEFAULT(), DEFAULT(), DEFAULT())], 
rowType=[RecordType(VARCHAR(2147483647) op, INTEGER id, VARCHAR(2147483647) 
name)])
    +- LogicalProject(id=[$0], name=[$1])
       +- LogicalTableScan(table=[[default_catalog, default_database, 
insert_only_source]])
 ]]>
     </Resource>
     <Resource name="optimized rel plan">
       <![CDATA[
-ProcessTableFunction(invocation=[TO_CHANGELOG(TABLE(#0), DEFAULT(), DEFAULT(), 
DEFAULT(), DEFAULT())], uid=[null], select=[op,id,name], 
rowType=[RecordType(VARCHAR(2147483647) op, INTEGER id, VARCHAR(2147483647) 
name)], changelogMode=[I])
+ProcessTableFunction(invocation=[TO_CHANGELOG(TABLE(#0), DEFAULT(), DEFAULT(), 
DEFAULT(), DEFAULT(), DEFAULT())], uid=[null], select=[op,id,name], 
rowType=[RecordType(VARCHAR(2147483647) op, INTEGER id, VARCHAR(2147483647) 
name)], changelogMode=[I])
 +- TableSourceScan(table=[[default_catalog, default_database, 
insert_only_source]], fields=[id, name], changelogMode=[I])
 ]]>
     </Resource>
@@ -42,14 +42,14 @@ ProcessTableFunction(invocation=[TO_CHANGELOG(TABLE(#0), 
DEFAULT(), DEFAULT(), D
     <Resource name="ast">
       <![CDATA[
 LogicalProject(id=[$0], op=[$1], name=[$2])
-+- LogicalTableFunctionScan(invocation=[TO_CHANGELOG(TABLE(#0) PARTITION 
BY($0), DEFAULT(), DEFAULT(), DEFAULT(), DEFAULT())], 
rowType=[RecordType(INTEGER id, VARCHAR(2147483647) op, VARCHAR(2147483647) 
name)])
++- LogicalTableFunctionScan(invocation=[TO_CHANGELOG(TABLE(#0) PARTITION 
BY($0), DEFAULT(), DEFAULT(), DEFAULT(), DEFAULT(), DEFAULT())], 
rowType=[RecordType(INTEGER id, VARCHAR(2147483647) op, VARCHAR(2147483647) 
name)])
    +- LogicalProject(id=[$0], name=[$1])
       +- LogicalTableScan(table=[[default_catalog, default_database, 
retract_source]])
 ]]>
     </Resource>
     <Resource name="optimized rel plan">
       <![CDATA[
-ProcessTableFunction(invocation=[TO_CHANGELOG(TABLE(#0) PARTITION BY($0), 
DEFAULT(), DEFAULT(), DEFAULT(), DEFAULT())], uid=[TO_CHANGELOG], 
select=[id,op,name], rowType=[RecordType(INTEGER id, VARCHAR(2147483647) op, 
VARCHAR(2147483647) name)], changelogMode=[I])
+ProcessTableFunction(invocation=[TO_CHANGELOG(TABLE(#0) PARTITION BY($0), 
DEFAULT(), DEFAULT(), DEFAULT(), DEFAULT(), DEFAULT())], uid=[TO_CHANGELOG], 
select=[id,op,name], rowType=[RecordType(INTEGER id, VARCHAR(2147483647) op, 
VARCHAR(2147483647) name)], changelogMode=[I])
 +- Exchange(distribution=[hash[id]], changelogMode=[I,UB,UA,D])
    +- TableSourceScan(table=[[default_catalog, default_database, 
retract_source]], fields=[id, name], changelogMode=[I,UB,UA,D])
 ]]>
@@ -62,14 +62,14 @@ ProcessTableFunction(invocation=[TO_CHANGELOG(TABLE(#0) 
PARTITION BY($0), DEFAUL
     <Resource name="ast">
       <![CDATA[
 LogicalProject(op=[$0], id=[$1], name=[$2])
-+- LogicalTableFunctionScan(invocation=[TO_CHANGELOG(TABLE(#0), DEFAULT(), 
DEFAULT(), DEFAULT(), DEFAULT())], rowType=[RecordType(VARCHAR(2147483647) op, 
INTEGER id, VARCHAR(2147483647) name)])
++- LogicalTableFunctionScan(invocation=[TO_CHANGELOG(TABLE(#0), DEFAULT(), 
DEFAULT(), DEFAULT(), DEFAULT(), DEFAULT())], 
rowType=[RecordType(VARCHAR(2147483647) op, INTEGER id, VARCHAR(2147483647) 
name)])
    +- LogicalProject(id=[$0], name=[$1])
       +- LogicalTableScan(table=[[default_catalog, default_database, 
retract_source]])
 ]]>
     </Resource>
     <Resource name="optimized rel plan">
       <![CDATA[
-ProcessTableFunction(invocation=[TO_CHANGELOG(TABLE(#0), DEFAULT(), DEFAULT(), 
DEFAULT(), DEFAULT())], uid=[null], select=[op,id,name], 
rowType=[RecordType(VARCHAR(2147483647) op, INTEGER id, VARCHAR(2147483647) 
name)], changelogMode=[I])
+ProcessTableFunction(invocation=[TO_CHANGELOG(TABLE(#0), DEFAULT(), DEFAULT(), 
DEFAULT(), DEFAULT(), DEFAULT())], uid=[null], select=[op,id,name], 
rowType=[RecordType(VARCHAR(2147483647) op, INTEGER id, VARCHAR(2147483647) 
name)], changelogMode=[I])
 +- TableSourceScan(table=[[default_catalog, default_database, 
retract_source]], fields=[id, name], changelogMode=[I,UB,UA,D])
 ]]>
     </Resource>
@@ -81,14 +81,14 @@ ProcessTableFunction(invocation=[TO_CHANGELOG(TABLE(#0), 
DEFAULT(), DEFAULT(), D
     <Resource name="ast">
       <![CDATA[
 LogicalProject(id=[$0], op=[$1], name=[$2])
-+- LogicalTableFunctionScan(invocation=[TO_CHANGELOG(TABLE(#0) PARTITION 
BY($0), DESCRIPTOR(_UTF-16LE'op'), 
MAP(_UTF-16LE'INSERT,UPDATE_AFTER':VARCHAR(19) CHARACTER SET "UTF-16LE", 
_UTF-16LE'C', _UTF-16LE'DELETE':VARCHAR(19) CHARACTER SET "UTF-16LE", 
_UTF-16LE'D'), DEFAULT(), DEFAULT())], rowType=[RecordType(INTEGER id, 
VARCHAR(2147483647) op, VARCHAR(2147483647) name)])
++- LogicalTableFunctionScan(invocation=[TO_CHANGELOG(TABLE(#0) PARTITION 
BY($0), DESCRIPTOR(_UTF-16LE'op'), 
MAP(_UTF-16LE'INSERT,UPDATE_AFTER':VARCHAR(19) CHARACTER SET "UTF-16LE", 
_UTF-16LE'C', _UTF-16LE'DELETE':VARCHAR(19) CHARACTER SET "UTF-16LE", 
_UTF-16LE'D'), DEFAULT(), DEFAULT(), DEFAULT())], rowType=[RecordType(INTEGER 
id, VARCHAR(2147483647) op, VARCHAR(2147483647) name)])
    +- LogicalProject(id=[$0], name=[$1])
       +- LogicalTableScan(table=[[default_catalog, default_database, 
upsert_source]])
 ]]>
     </Resource>
     <Resource name="optimized rel plan">
       <![CDATA[
-ProcessTableFunction(invocation=[TO_CHANGELOG(TABLE(#0) PARTITION BY($0), 
DESCRIPTOR(_UTF-16LE'op'), MAP(_UTF-16LE'INSERT,UPDATE_AFTER':VARCHAR(19) 
CHARACTER SET "UTF-16LE", _UTF-16LE'C', _UTF-16LE'DELETE':VARCHAR(19) CHARACTER 
SET "UTF-16LE", _UTF-16LE'D'), DEFAULT(), DEFAULT())], uid=[TO_CHANGELOG], 
select=[id,op,name], rowType=[RecordType(INTEGER id, VARCHAR(2147483647) op, 
VARCHAR(2147483647) name)], changelogMode=[I])
+ProcessTableFunction(invocation=[TO_CHANGELOG(TABLE(#0) PARTITION BY($0), 
DESCRIPTOR(_UTF-16LE'op'), MAP(_UTF-16LE'INSERT,UPDATE_AFTER':VARCHAR(19) 
CHARACTER SET "UTF-16LE", _UTF-16LE'C', _UTF-16LE'DELETE':VARCHAR(19) CHARACTER 
SET "UTF-16LE", _UTF-16LE'D'), DEFAULT(), DEFAULT(), DEFAULT())], 
uid=[TO_CHANGELOG], select=[id,op,name], rowType=[RecordType(INTEGER id, 
VARCHAR(2147483647) op, VARCHAR(2147483647) name)], changelogMode=[I])
 +- Exchange(distribution=[hash[id]], changelogMode=[I,UA,D])
    +- TableSourceScan(table=[[default_catalog, default_database, 
upsert_source]], fields=[id, name], changelogMode=[I,UA,D])
 ]]>
@@ -101,14 +101,14 @@ ProcessTableFunction(invocation=[TO_CHANGELOG(TABLE(#0) 
PARTITION BY($0), DESCRI
     <Resource name="ast">
       <![CDATA[
 LogicalProject(id=[$0], op=[$1], name=[$2])
-+- LogicalTableFunctionScan(invocation=[TO_CHANGELOG(TABLE(#0) PARTITION 
BY($0), DESCRIPTOR(_UTF-16LE'op'), MAP(_UTF-16LE'INSERT,UPDATE_AFTER', 
_UTF-16LE'C'), DEFAULT(), DEFAULT())], rowType=[RecordType(INTEGER id, 
VARCHAR(2147483647) op, VARCHAR(2147483647) name)])
++- LogicalTableFunctionScan(invocation=[TO_CHANGELOG(TABLE(#0) PARTITION 
BY($0), DESCRIPTOR(_UTF-16LE'op'), MAP(_UTF-16LE'INSERT,UPDATE_AFTER', 
_UTF-16LE'C'), DEFAULT(), DEFAULT(), DEFAULT())], rowType=[RecordType(INTEGER 
id, VARCHAR(2147483647) op, VARCHAR(2147483647) name)])
    +- LogicalProject(id=[$0], name=[$1])
       +- LogicalTableScan(table=[[default_catalog, default_database, 
upsert_source]])
 ]]>
     </Resource>
     <Resource name="optimized rel plan">
       <![CDATA[
-ProcessTableFunction(invocation=[TO_CHANGELOG(TABLE(#0) PARTITION BY($0), 
DESCRIPTOR(_UTF-16LE'op'), MAP(_UTF-16LE'INSERT,UPDATE_AFTER', _UTF-16LE'C'), 
DEFAULT(), DEFAULT())], uid=[TO_CHANGELOG], select=[id,op,name], 
rowType=[RecordType(INTEGER id, VARCHAR(2147483647) op, VARCHAR(2147483647) 
name)], changelogMode=[I])
+ProcessTableFunction(invocation=[TO_CHANGELOG(TABLE(#0) PARTITION BY($0), 
DESCRIPTOR(_UTF-16LE'op'), MAP(_UTF-16LE'INSERT,UPDATE_AFTER', _UTF-16LE'C'), 
DEFAULT(), DEFAULT(), DEFAULT())], uid=[TO_CHANGELOG], select=[id,op,name], 
rowType=[RecordType(INTEGER id, VARCHAR(2147483647) op, VARCHAR(2147483647) 
name)], changelogMode=[I])
 +- Exchange(distribution=[hash[id]], changelogMode=[I,UA,D])
    +- TableSourceScan(table=[[default_catalog, default_database, 
upsert_source]], fields=[id, name], changelogMode=[I,UA,D])
 ]]>
@@ -121,14 +121,56 @@ ProcessTableFunction(invocation=[TO_CHANGELOG(TABLE(#0) 
PARTITION BY($0), DESCRI
     <Resource name="ast">
       <![CDATA[
 LogicalProject(op=[$0], id=[$1], name=[$2])
-+- LogicalTableFunctionScan(invocation=[TO_CHANGELOG(TABLE(#0), DEFAULT(), 
DEFAULT(), DEFAULT(), DEFAULT())], rowType=[RecordType(VARCHAR(2147483647) op, 
INTEGER id, VARCHAR(2147483647) name)])
++- LogicalTableFunctionScan(invocation=[TO_CHANGELOG(TABLE(#0), DEFAULT(), 
DEFAULT(), DEFAULT(), DEFAULT(), DEFAULT())], 
rowType=[RecordType(VARCHAR(2147483647) op, INTEGER id, VARCHAR(2147483647) 
name)])
    +- LogicalProject(id=[$0], name=[$1])
       +- LogicalTableScan(table=[[default_catalog, default_database, 
upsert_source]])
 ]]>
     </Resource>
     <Resource name="optimized rel plan">
       <![CDATA[
-ProcessTableFunction(invocation=[TO_CHANGELOG(TABLE(#0), DEFAULT(), DEFAULT(), 
DEFAULT(), DEFAULT())], uid=[null], select=[op,id,name], 
rowType=[RecordType(VARCHAR(2147483647) op, INTEGER id, VARCHAR(2147483647) 
name)], changelogMode=[I])
+ProcessTableFunction(invocation=[TO_CHANGELOG(TABLE(#0), DEFAULT(), DEFAULT(), 
DEFAULT(), DEFAULT(), DEFAULT())], uid=[null], select=[op,id,name], 
rowType=[RecordType(VARCHAR(2147483647) op, INTEGER id, VARCHAR(2147483647) 
name)], changelogMode=[I])
++- ChangelogNormalize(key=[id], changelogMode=[I,UB,UA,D])
+   +- Exchange(distribution=[hash[id]], changelogMode=[I,UA,D])
+      +- TableSourceScan(table=[[default_catalog, default_database, 
upsert_source]], fields=[id, name], changelogMode=[I,UA,D])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testUpsertSourceKeyOnlyDeletes">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM TO_CHANGELOG(input => TABLE upsert_source, 
produces_full_deletes => false)]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(op=[$0], id=[$1], name=[$2])
++- LogicalTableFunctionScan(invocation=[TO_CHANGELOG(TABLE(#0), DEFAULT(), 
DEFAULT(), false, DEFAULT(), DEFAULT())], 
rowType=[RecordType(VARCHAR(2147483647) op, INTEGER id, VARCHAR(2147483647) 
name)])
+   +- LogicalProject(id=[$0], name=[$1])
+      +- LogicalTableScan(table=[[default_catalog, default_database, 
upsert_source]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+ProcessTableFunction(invocation=[TO_CHANGELOG(TABLE(#0), DEFAULT(), DEFAULT(), 
false, DEFAULT(), DEFAULT())], uid=[null], select=[op,id,name], 
rowType=[RecordType(VARCHAR(2147483647) op, INTEGER id, VARCHAR(2147483647) 
name)], changelogMode=[I])
++- ChangelogNormalize(key=[id], changelogMode=[I,UB,UA,D])
+   +- Exchange(distribution=[hash[id]], changelogMode=[I,UA,D])
+      +- TableSourceScan(table=[[default_catalog, default_database, 
upsert_source]], fields=[id, name], changelogMode=[I,UA,D])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testUpsertSourceProducesFullDeletes">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM TO_CHANGELOG(input => TABLE upsert_source, 
produces_full_deletes => true)]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(op=[$0], id=[$1], name=[$2])
++- LogicalTableFunctionScan(invocation=[TO_CHANGELOG(TABLE(#0), DEFAULT(), 
DEFAULT(), true, DEFAULT(), DEFAULT())], 
rowType=[RecordType(VARCHAR(2147483647) op, INTEGER id, VARCHAR(2147483647) 
name)])
+   +- LogicalProject(id=[$0], name=[$1])
+      +- LogicalTableScan(table=[[default_catalog, default_database, 
upsert_source]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+ProcessTableFunction(invocation=[TO_CHANGELOG(TABLE(#0), DEFAULT(), DEFAULT(), 
true, DEFAULT(), DEFAULT())], uid=[null], select=[op,id,name], 
rowType=[RecordType(VARCHAR(2147483647) op, INTEGER id, VARCHAR(2147483647) 
name)], changelogMode=[I])
 +- ChangelogNormalize(key=[id], changelogMode=[I,UB,UA,D])
    +- Exchange(distribution=[hash[id]], changelogMode=[I,UA,D])
       +- TableSourceScan(table=[[default_catalog, default_database, 
upsert_source]], fields=[id, name], changelogMode=[I,UA,D])
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/to-changelog-retract-restore/plan/to-changelog-retract-restore.json
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/to-changelog-retract-restore/plan/to-changelog-retract-restore.json
index 959c9473118..ea961c665ca 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/to-changelog-retract-restore/plan/to-changelog-retract-restore.json
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/to-changelog-retract-restore/plan/to-changelog-retract-restore.json
@@ -37,7 +37,7 @@
       "priority" : 0
     } ],
     "outputType" : "ROW<`op` VARCHAR(2147483647), `name` VARCHAR(2147483647) 
NOT NULL, `score` BIGINT>",
-    "description" : "ProcessTableFunction(invocation=[TO_CHANGELOG(TABLE(#0), 
DEFAULT(), DEFAULT(), DEFAULT(), DEFAULT())], uid=[null], 
select=[op,name,score], rowType=[RecordType(VARCHAR(2147483647) op, 
VARCHAR(2147483647) name, BIGINT score)])",
+    "description" : "ProcessTableFunction(invocation=[TO_CHANGELOG(TABLE(#0), 
DEFAULT(), DEFAULT(), DEFAULT(), DEFAULT(), DEFAULT())], uid=[null], 
select=[op,name,score], rowType=[RecordType(VARCHAR(2147483647) op, 
VARCHAR(2147483647) name, BIGINT score)])",
     "uid" : null,
     "functionCall" : {
       "kind" : "CALL",
@@ -59,6 +59,11 @@
         "syntax" : "SPECIAL",
         "internalName" : "$DEFAULT$1",
         "type" : "MAP<VARCHAR(2147483647), VARCHAR(2147483647)>"
+      }, {
+        "kind" : "CALL",
+        "syntax" : "SPECIAL",
+        "internalName" : "$DEFAULT$1",
+        "type" : "BOOLEAN"
       }, {
         "kind" : "CALL",
         "syntax" : "SPECIAL",
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/ToChangelogFunction.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/ToChangelogFunction.java
index 0b6c6ca55b8..0110475ef8d 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/ToChangelogFunction.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/ToChangelogFunction.java
@@ -65,11 +65,13 @@ public class ToChangelogFunction extends 
BuiltInProcessTableFunction<RowData> {
 
     private final Map<RowKind, String> rawOpMap;
     private final int[] outputIndices;
+    private final boolean producesFullDelete;
 
     private transient Map<RowKind, StringData> opMap;
     private transient GenericRowData opRow;
     private transient JoinedRowData output;
     private transient ProjectedRowData projectedOutput;
+    private transient GenericRowData nullPayloadRow;
 
     @SuppressWarnings("unchecked")
     public ToChangelogFunction(final SpecializedContext context) {
@@ -84,7 +86,30 @@ public class ToChangelogFunction extends 
BuiltInProcessTableFunction<RowData> {
         if (opMapping != null) {
             validateOpMap(this.rawOpMap, tableSemantics);
         }
+        final boolean producesFullDeletesArg =
+                callContext.getArgumentValue(3, Boolean.class).orElse(false);
+        validateProducesFullDeletes(producesFullDeletesArg, this.rawOpMap, 
tableSemantics);
+
         this.outputIndices = 
ChangelogTypeStrategyUtils.computeOutputIndices(tableSemantics);
+        this.producesFullDelete = 
resolveProducesFullDelete(producesFullDeletesArg, tableSemantics);
+    }
+
+    /**
+     * Decides whether this function emits full DELETE rows (input passed 
through unchanged) or
+     * partial DELETE rows (only identifying columns preserved, rest nulled).
+     *
+     * <p>The framework prepends partition-key columns to the output without 
consulting this
+     * function, so in set semantics partition keys are preserved on DELETE 
rows for free. In row
+     * semantics there is no key column to preserve, so the function passes 
the input through
+     * unchanged regardless of {@code produces_full_deletes}.
+     */
+    private static boolean resolveProducesFullDelete(
+            final boolean producesFullDeletesArg, final TableSemantics 
tableSemantics) {
+        if (producesFullDeletesArg) { // explicit opt-in always keeps the full pre-image
+            return true;
+        }
+        final boolean hasPartitionBy = 
tableSemantics.partitionByColumns().length > 0;
+        return !hasPartitionBy; // row semantics (no PARTITION BY): no key to keep, pass through
     }
 
     @Override
@@ -95,6 +120,7 @@ public class ToChangelogFunction extends 
BuiltInProcessTableFunction<RowData> {
         opRow = new GenericRowData(1);
         output = new JoinedRowData();
         projectedOutput = ProjectedRowData.from(outputIndices);
+        nullPayloadRow = new GenericRowData(outputIndices.length);
     }
 
     /**
@@ -145,17 +171,59 @@ public class ToChangelogFunction extends 
BuiltInProcessTableFunction<RowData> {
         }
     }
 
+    /**
+     * Rejects {@code produces_full_deletes=true} when the input changelog 
cannot produce DELETE
+     * rows (either the input mode does not contain DELETE, or the active 
{@code op_mapping} strips
+     * it). The parameter is then dead and likely a user mistake.
+     *
+     * <p>Lives here rather than in the input type strategy because {@link
+     * TableSemantics#changelogMode()} returns empty during type inference and 
is only populated at
+     * specialization time, which is when this constructor runs.
+     */
+    private static void validateProducesFullDeletes(
+            final boolean producesFullDeletesArg,
+            final Map<RowKind, String> mapping,
+            final TableSemantics tableSemantics) {
+        if (!producesFullDeletesArg) { // only an explicit 'true' can be a dead argument
+            return;
+        }
+        final ChangelogMode inputMode = 
tableSemantics.changelogMode().orElse(null);
+        if (inputMode == null) { // mode not populated yet: nothing to validate against
+            return;
+        }
+        if (!inputMode.contains(RowKind.DELETE)) { // input can never produce DELETE rows
+            throw new ValidationException(
+                    String.format(
+                            "Invalid 'produces_full_deletes' for TO_CHANGELOG: 
the input table "
+                                    + "only produces %s and never emits DELETE 
rows. Remove the "
+                                    + "'produces_full_deletes' argument.",
+                            inputMode.getContainedKinds()));
+        }
+        if (!mapping.containsKey(RowKind.DELETE)) { // op_mapping has no DELETE entry
+            throw new ValidationException(
+                    "Invalid 'produces_full_deletes' for TO_CHANGELOG: the 
active 'op_mapping' "
+                            + "does not map DELETE rows, so no DELETE rows are 
emitted. Remove "
+                            + "the 'produces_full_deletes' argument or add a 
DELETE entry to "
+                            + "'op_mapping'.");
+        }
+    }
+
     public void eval(
             final Context ctx,
             final RowData input,
             @Nullable final ColumnList op,
-            @Nullable final MapData opMapping) {
+            @Nullable final MapData opMapping,
+            @Nullable final Boolean producesFullDeletes) { // read once in the constructor; unused here
         final StringData opCode = opMap.get(input.getRowKind());
         if (opCode == null) {
             return;
         }
 
         opRow.setField(0, opCode);
-        collect(output.replace(opRow, projectedOutput.replaceRow(input)));
+        final RowData payload = // NOTE(review): all-null payload assumes nullable output columns; confirm in ToChangelogTypeStrategy
+                (input.getRowKind() == RowKind.DELETE && !producesFullDelete) // partial DELETE
+                        ? nullPayloadRow // every projected column null; partition keys are prepended by the framework
+                        : projectedOutput.replaceRow(input); // full row: projected input columns
+        collect(output.replace(opRow, payload));
     }
 }

Reply via email to