This is an automated email from the ASF dual-hosted git repository.

gustavodemorais pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e400c2c221368e98e80d5589cacbcb0b1585ba66
Author: Ramin Gharib <[email protected]>
AuthorDate: Thu May 28 15:25:50 2026 +0200

    [FLINK-39636][table] Remove unnecessary validation in TO_CHANGELOG
    
    Drop two checks that protected against user mistakes the engine can already handle silently:
    
    - op_mapping entries that reference change operations the input cannot produce no longer throw. The extra entries are unreachable at runtime, not harmful.
    - produces_full_deletes=true against an append-only input no longer throws. The argument is a no-op there, equivalent to omitting it.
    
    The remaining check covers the only case where the argument changes semantics in a way the function cannot honor:
    produces_full_deletes=false with no upsert key and no PARTITION BY, leaving nothing to preserve when nulling non-key columns. Rename validateProducesFullDeletes to validateProducesPartialDeletes to reflect the surviving case.
    
    Clean up the now-redundant test cases, javadocs, and docs sections.
---
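The rule that survives this commit can be sketched as a standalone Java snippet. This is only an illustration under stated assumptions: `PartialDeleteCheckSketch` and `check` are hypothetical names, the three parameters stand in for what the function reads from its call context and `TableSemantics`, and a JDK exception replaces Flink's `ValidationException` to keep the snippet self-contained. The error text is illustrative, not the message Flink emits.

```java
import java.util.List;

/** Hypothetical stand-in for illustration; not a Flink class. */
public class PartialDeleteCheckSketch {

    /**
     * @param producesFullDeletes null when the argument is omitted
     * @param partitionByColumns column indices of PARTITION BY, empty if none
     * @param upsertKeyColumns candidate upsert keys, empty if none is derivable
     */
    public static void check(
            Boolean producesFullDeletes, int[] partitionByColumns, List<int[]> upsertKeyColumns) {
        final boolean isExplicit = producesFullDeletes != null;
        // Omitted or explicit true: nothing is validated after this commit.
        if (!isExplicit || producesFullDeletes) {
            return;
        }
        final boolean hasPartitionBy = partitionByColumns.length > 0;
        final boolean hasUpsertKey = !upsertKeyColumns.isEmpty();
        if (!hasPartitionBy && !hasUpsertKey) {
            // Assumption: a JDK exception and an illustrative message.
            throw new IllegalArgumentException(
                    "produces_full_deletes=false needs an upsert key or PARTITION BY");
        }
    }

    public static void main(String[] args) {
        check(true, new int[0], List.of()); // explicit true on a keyless input: accepted
        check(false, new int[] {0}, List.of()); // PARTITION BY supplies the key: accepted
        try {
            check(false, new int[0], List.of()); // no key at all: rejected
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Only the last call in `main` fails, which matches the single case the commit message describes as still rejected.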
 .../docs/sql/reference/queries/changelog.md        | 32 +++++-----
 .../apache/flink/table/api/PartitionedTable.java   |  4 +-
 .../java/org/apache/flink/table/api/Table.java     |  4 +-
 .../flink/table/functions/TableSemantics.java      | 29 +++++----
 .../strategies/ToChangelogTypeStrategy.java        | 39 -------------
 .../ToChangelogInputTypeStrategyTest.java          | 14 -----
 .../stream/StreamExecProcessTableFunction.java     |  8 +--
 .../exec/stream/ToChangelogSemanticTests.java      |  2 -
 .../nodes/exec/stream/ToChangelogTestPrograms.java | 26 ---------
 .../runtime/functions/ptf/ToChangelogFunction.java | 68 ++++------------------
 10 files changed, 48 insertions(+), 178 deletions(-)
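
The updated `TableSemantics` javadoc in this patch suggests a stable way to pick among several upsert key candidates: smallest by cardinality, ties broken by column indices in ascending order. A minimal sketch of that rule follows, assuming the tie-break means a lexicographic comparison of the index arrays. `UpsertKeyChoiceSketch` and `choose` are hypothetical names, not Flink API.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

/** Hypothetical helper for illustration; not a Flink class. */
public class UpsertKeyChoiceSketch {

    /** Picks one candidate independently of the order of the input list. */
    public static Optional<int[]> choose(List<int[]> candidates) {
        // Fewest columns first.
        final Comparator<int[]> bySize = Comparator.comparingInt(key -> key.length);
        // Assumption: ties are resolved by lexicographic order of the indices.
        final Comparator<int[]> order = bySize.thenComparing((a, b) -> Arrays.compare(a, b));
        return candidates.stream().min(order);
    }

    public static void main(String[] args) {
        final List<int[]> candidates = List.of(new int[] {2, 3}, new int[] {0, 5});
        choose(candidates).ifPresent(key -> System.out.println(Arrays.toString(key)));
    }
}
```

Because the result depends only on the contents of the candidates, shuffling the list returned by `upsertKeyColumns()` does not change the choice, which is the property the javadoc asks PTF authors to preserve.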

diff --git a/docs/content/docs/sql/reference/queries/changelog.md b/docs/content/docs/sql/reference/queries/changelog.md
index c91107b2df2..641bb7245f7 100644
--- a/docs/content/docs/sql/reference/queries/changelog.md
+++ b/docs/content/docs/sql/reference/queries/changelog.md
@@ -308,18 +308,18 @@ SELECT * FROM TO_CHANGELOG(
 | `input`                 | Yes      | The input table. With `PARTITION BY`, 
rows with the same key are co-located and run in the same operator instance. 
Without `PARTITION BY`, each row is processed independently. Accepts 
insert-only, retract, and upsert tables. For upsert tables, the provided 
`PARTITION BY` key should match or be a subset of the upsert key of the 
subquery.       |
 | `op`                    | No       | A `DESCRIPTOR` with a single column 
name for the operation code column. Defaults to `op`.                           
                                                                                
                                                                                
                                                                     |
 | `op_mapping`            | No       | A `MAP<STRING, STRING>` mapping change 
operation names to custom output codes. Keys can contain comma-separated names 
to map multiple operations to the same code (e.g., `'INSERT, UPDATE_AFTER'`). 
When provided, only mapped operations are forwarded - unmapped events are 
dropped. Each change operation may appear at most once across all entries. |
-| `produces_full_deletes` | No       | A `BOOLEAN` literal that controls how 
DELETE rows are emitted. When `true` (default), DELETE rows carry all columns, 
the full image. When `false`, only the identifying key columns are preserved 
and the rest are nulled. See [Full vs partial 
deletes](#full-vs-partial-deletes) for more details. |
+| `produces_full_deletes` | No       | A `BOOLEAN` literal that controls how 
DELETE rows are emitted. When `true` (default), DELETE rows carry all columns, 
the full image. When `false`, only the identifying key columns are preserved 
and the rest are nulled. See [Full vs partial 
deletes](#full-vs-partial-deletes) for more details.                            
                         |
 
 #### Default op_mapping
 
 When `op_mapping` is omitted, all four change operations are mapped to their standard names:
 
 | Change Operation | Output value      |
-|:----------------|:------------------|
-| INSERT          | `'INSERT'`        |
-| UPDATE_BEFORE   | `'UPDATE_BEFORE'` |
-| UPDATE_AFTER    | `'UPDATE_AFTER'`  |
-| DELETE          | `'DELETE'`        |
+|:-----------------|:------------------|
+| INSERT           | `'INSERT'`        |
+| UPDATE_BEFORE    | `'UPDATE_BEFORE'` |
+| UPDATE_AFTER     | `'UPDATE_AFTER'`  |
+| DELETE           | `'DELETE'`        |
 
 ### Output Schema
 
@@ -406,26 +406,22 @@ An **upsert key** is a column or set of columns that uniquely identifies a row a
 The planner derives the upsert key from the input table:
 
 * A declared `PRIMARY KEY` on the source table when reading directly.
-* The grouping columns of an upstream `GROUP BY <key>`.
+* The grouping columns of an upstream `GROUP BY <key>` or `PARTITION BY <key>`.
 * The keys propagated by operators that preserve them (e.g. lookup joins, calc-projections that keep the key columns).
 
-When no upsert key can be derived (e.g. a plain append-only source with no key constraint and no grouping upstream), the input has no row identity and downstream operators must treat it as append-only or fall back to retract semantics.
-
-`TO_CHANGELOG` consumes the upsert key to decide which columns to preserve when emitting partial DELETE rows. See [Full vs partial deletes](#full-vs-partial-deletes) below.
-
 #### Full vs partial deletes
 
 The `produces_full_deletes` argument controls how DELETE rows are emitted and what the planner requires from the input. The matrix below shows each combination with `PARTITION BY` (set semantics) and without (row semantics). When `false`, the function relies on the input table's [upsert key](#upsert-key) to decide which columns to preserve.
 
 ##### `produces_full_deletes => true` (default)
 
-The planner requires fully-populated DELETE rows on the input. For upsert sources that emit key-only deletes, a `ChangelogNormalize` operator is inserted upstream to materialize the full pre-image from state. For sources that already emit a full pre-image (e.g. retract), the flag is a no-op. The function then passes the input row through unchanged on DELETE.
+The planner requires fully-populated DELETE rows on the input. For upsert sources that emit key-only deletes, a `ChangelogNormalize` operator is inserted upstream to calculate the full pre-image from state. For sources that already emit a full pre-image (e.g. retract), the flag is a no-op. The function then passes the input row through unchanged on DELETE.
 
 **Row semantics** (no `PARTITION BY`):
 
 ```sql
--- Upsert source: -D[id:5] (key-only).
--- ChangelogNormalize materializes the full pre-image from state.
+-- Upsert source: -D[id:5, name: null] (key-only).
+-- ChangelogNormalize calculates the full pre-image from state.
 -- Output: +I[op:'DELETE', id:5, name:'Alice']
 SELECT * FROM TO_CHANGELOG(input => TABLE upsert_source)
 
@@ -438,8 +434,8 @@ SELECT * FROM TO_CHANGELOG(input => TABLE retract_source)
 **Set semantics** (`PARTITION BY`):
 
 ```sql
--- Upsert source: -D[id:5] (key-only).
--- ChangelogNormalize materializes the full pre-image from state.
+-- Upsert source: -D[id:5, name: null] (key-only).
+-- ChangelogNormalize calculates the full pre-image from state.
 -- Output: +I[id:5, op:'DELETE', name:'Alice']
 SELECT * FROM TO_CHANGELOG(input => TABLE upsert_source PARTITION BY id)
 
@@ -451,7 +447,7 @@ SELECT * FROM TO_CHANGELOG(input => TABLE retract_source PARTITION BY id)
 
 ##### `produces_full_deletes => false`
 
-The planner skips `ChangelogNormalize` and the function emits partial DELETE rows. This avoids the stateful normalization operator for upsert sources (e.g. Kafka compacted topics) where the full pre-image is not needed downstream. Requires an [upsert key](#upsert-key) to be present for the input table (row semantics) or `PARTITION BY` (set semantics); otherwise the call is rejected with a validation error.
+The planner skips `ChangelogNormalize` and the function emits partial DELETE rows. This avoids the stateful normalization operator for upsert sources (e.g. Kafka compacted topics) where the full pre-image is not needed downstream. This requires an [upsert key](#upsert-key) to be present for the input table (row semantics) or `PARTITION BY` (set semantics); otherwise the call is rejected with a validation error.
 
 **Row semantics** (no `PARTITION BY`): the function preserves the planner-derived upsert key columns on DELETE rows and nulls the rest. The upsert key is typically a declared `PRIMARY KEY` when directly reading from a source or the key provided in a `GROUP BY <key>`.
 
@@ -465,7 +461,7 @@ SELECT * FROM TO_CHANGELOG(input => TABLE upsert_source, produces_full_deletes =
 SELECT * FROM TO_CHANGELOG(input => TABLE retract_source, produces_full_deletes => false)
 ```
 
-**Set semantics** (`PARTITION BY`): the function preserves the partition key and nulls every non-partition-key column on DELETE rows. The key used as the partition-key column should be the unique key that will be used as the record identifier. This matches the shape expected by upsert sinks and Kafka compacted topics.
+**Set semantics** (`PARTITION BY`): the function preserves the partition key and nulls every non-partition-key column on DELETE rows. The key used as the partition-key column should be the unique key that will be used as the record identifier. This matches the shape expected by upsert sinks such as Kafka compacted topics.
 
 ```sql
 -- Upsert source: -D[id:5] (key-only).
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PartitionedTable.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PartitionedTable.java
index 401317d125d..ecf58fbe9b3 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PartitionedTable.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PartitionedTable.java
@@ -207,9 +207,7 @@ public interface PartitionedTable {
      *
      * // Opt out of full-delete semantics. When `true` (default), DELETE rows carry the full
      * // pre-image. When `false`, only the identifying key columns are preserved and the rest
-     * // are nulled. See [Full vs partial deletes](
-     * // https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/changelog/#full-vs-partial-deletes)
-     * // for more details.
+     * // are nulled. See in documentation for more details.
      * Table result = table
      *     .partitionBy($("id"))
      *     .toChangelog(lit(false).asArgument("produces_full_deletes"));
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java
index ab4a617a2df..9d8183d7e4d 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java
@@ -1462,9 +1462,7 @@ public interface Table extends Explainable<Table>, Executable {
      *
      * // Opt out of full-delete semantics. When `true` (default), DELETE rows carry the full
      * // pre-image. When `false`, only the identifying key columns are preserved and the rest
-     * // are nulled. See [Full vs partial deletes](
-     * // https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/changelog/#full-vs-partial-deletes)
-     * // for more details.
+     * // are nulled. See in documentation for more details.
      * Table result = table.toChangelog(
      *     lit(false).asArgument("produces_full_deletes")
      * );
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/TableSemantics.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/TableSemantics.java
index 3d17401c5ef..1289d054ff6 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/TableSemantics.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/TableSemantics.java
@@ -133,20 +133,29 @@ public interface TableSemantics {
      * Upsert key candidates derived from the passed table's metadata.
      *
      * <p>Returns a list of 0-based column index arrays that uniquely identify a row for upsert
-     * semantics. This is distinct from {@link #partitionByColumns()}: partition keys describe
-     * distribution and co-location, upsert keys describe row identity. Useful for functions that
-     * need to emit key-only deletes, match UPDATE_BEFORE / UPDATE_AFTER pairs, or want to have a
-     * unique identifier to interact with state.
+     * semantics. Useful for functions that need to emit key-only deletes, match UPDATE_BEFORE /
+     * UPDATE_AFTER pairs, or use a stable identifier to interact with state.
+     *
+     * <p>The upsert key describes row identity and is distinct from {@link #partitionByColumns()},
+     * which describes distribution and co-location. They are independent and frequently disagree:
+     *
+     * <pre>{@code
+     * -- Source declares PRIMARY KEY (id); the call partitions by region.
+     * -- partitionByColumns() = [region]   (chosen by the caller)
+     * -- upsertKeyColumns()   = [[id]]     (derived from the source's PK)
+     * TO_CHANGELOG(input => TABLE source PARTITION BY region)
+     * }</pre>
      *
      * <p>Returns an empty list when no upsert key is derivable, or when the planner has not yet
      * computed metadata (during type inference).
      *
-     * <p>When the planner derives multiple candidate upsert keys for the same input (e.g., a table
-     * with several primary key constraints), all of them are returned. Picking which candidate to
-     * use is the function's responsibility, and the choice must be stable across releases to keep
-     * PTF state consistent after job restarts and upgrades. The order of the returned list is not
-     * part of the contract; PTF authors should not depend on it. A typical choice is the smallest
-     * candidate by cardinality, with ties broken by the column indices in ascending order.
+     * <p>The planner may derive more than one candidate for the same input. For example, an inner
+     * join of two tables each carrying their own primary key produces a result with both keys as
+     * separate candidates. Picking which candidate to use is the function's responsibility, and the
+     * choice must be stable across releases so PTF state stays consistent after job restarts and
+     * upgrades. The order of the returned list is not part of the contract; PTF authors should not
+     * depend on it. A typical stable choice is the smallest candidate by cardinality, with ties
+     * broken by the column indices in ascending order.
      *
      * @return Candidate upsert keys of the passed table, or an empty list if none.
      */
      */
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToChangelogTypeStrategy.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToChangelogTypeStrategy.java
index 4c27aae4723..1434b221077 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToChangelogTypeStrategy.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToChangelogTypeStrategy.java
@@ -113,11 +113,6 @@ public final class ToChangelogTypeStrategy {
             return error;
         }
 
-        error = validateProducesFullDeletes(callContext, throwOnFailure);
-        if (error.isPresent()) {
-            return error;
-        }
-
         return Optional.of(callContext.getArgumentDataTypes());
     }
 
@@ -198,40 +193,6 @@ public final class ToChangelogTypeStrategy {
         return Optional.empty();
     }
 
-    @SuppressWarnings("rawtypes")
-    private static Optional<List<DataType>> validateProducesFullDeletes(
-            final CallContext callContext, final boolean throwOnFailure) {
-        final boolean isExplicit = !callContext.isArgumentNull(ARG_PRODUCES_FULL_DELETES);
-        if (!isExplicit) {
-            return Optional.empty();
-        }
-        if (!callContext.isArgumentLiteral(ARG_PRODUCES_FULL_DELETES)) {
-            return callContext.fail(
-                    throwOnFailure,
-                    "The 'produces_full_deletes' argument must be a constant BOOLEAN literal.");
-        }
-        final boolean producesFullDeletes =
-                callContext.getArgumentValue(ARG_PRODUCES_FULL_DELETES, Boolean.class).orElse(true);
-        if (!producesFullDeletes) {
-            return Optional.empty();
-        }
-        // The check against the input changelog mode lives in the function constructor since
-        // TableSemantics#changelogMode() returns empty here at type-inference time. The mapping
-        // check below only needs the literal op_mapping argument, so it lives here. Only runs
-        // when the user explicitly set produces_full_deletes=true; the default true is not
-        // validated since it is a safe no-op for any input.
-        final Optional<Map> opMapping = callContext.getArgumentValue(ARG_OP_MAPPING, Map.class);
-        if (opMapping.isPresent() && !mapsDelete((Map<String, String>) opMapping.get())) {
-            return callContext.fail(
-                    throwOnFailure,
-                    "Invalid 'produces_full_deletes' for TO_CHANGELOG: the active 'op_mapping' "
-                            + "does not map DELETE rows, so no DELETE rows are emitted. Remove "
-                            + "the 'produces_full_deletes' argument or add a DELETE entry to "
-                            + "'op_mapping'.");
-        }
-        return Optional.empty();
-    }
-
     /**
      * Returns {@code true} when at least one {@code op_mapping} key references {@code DELETE}. Keys
      * may be comma-separated (e.g., {@code "INSERT, DELETE"}) per the user-facing contract.
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/ToChangelogInputTypeStrategyTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/ToChangelogInputTypeStrategyTest.java
index 6b5773949db..36607885bd8 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/ToChangelogInputTypeStrategyTest.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/ToChangelogInputTypeStrategyTest.java
@@ -98,20 +98,6 @@ class ToChangelogInputTypeStrategyTest extends InputTypeStrategiesTestBase {
                         .calledWithLiteralAt(ARG_PRODUCES_FULL_DELETES, false)
                         .expectArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE, MAP_TYPE, BOOLEAN_TYPE),
 
-                // Error: produces_full_deletes=true with op_mapping that strips DELETE
-                TestSpec.forStrategy(
-                                "produces_full_deletes=true rejected when op_mapping omits DELETE",
-                                TO_CHANGELOG_INPUT_TYPE_STRATEGY)
-                        .calledWithArgumentTypes(
-                                TABLE_TYPE, DESCRIPTOR_TYPE, MAP_TYPE, BOOLEAN_TYPE)
-                        .calledWithTableSemanticsAt(ARG_TABLE, new TableSemanticsMock(TABLE_TYPE))
-                        .calledWithLiteralAt(ARG_OP, ColumnList.of("op"))
-                        .calledWithLiteralAt(ARG_OP_MAPPING, Map.of("INSERT, UPDATE_AFTER", "X"))
-                        .calledWithLiteralAt(ARG_PRODUCES_FULL_DELETES, true)
-                        .expectErrorMessage(
-                                "Invalid 'produces_full_deletes' for TO_CHANGELOG: the active "
-                                        + "'op_mapping' does not map DELETE rows"),
-
                 // Error: multi-column descriptor for `op`
                 TestSpec.forStrategy(
                                 "Descriptor with multiple columns",
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecProcessTableFunction.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecProcessTableFunction.java
index e0cf5f09187..8c00acb8e57 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecProcessTableFunction.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecProcessTableFunction.java
@@ -314,9 +314,9 @@ public class StreamExecProcessTableFunction extends ExecNodeBase<RowData>
 
     private RuntimeTableSemantics createRuntimeTableSemantics(
             StaticArgument tableArg, RexTableArgCall tableArgCall, List<Integer> inputTimeColumns) {
+        final int inputIndex = tableArgCall.getInputIndex();
         final RuntimeChangelogMode consumedChangelogMode =
-                RuntimeChangelogMode.serialize(
-                        inputChangelogModes.get(tableArgCall.getInputIndex()));
+                RuntimeChangelogMode.serialize(inputChangelogModes.get(inputIndex));
         final DataType dataType;
         if (tableArg.getDataType().isPresent()) {
             dataType = tableArg.getDataType().get();
@@ -324,9 +324,7 @@ public class StreamExecProcessTableFunction extends ExecNodeBase<RowData>
             dataType = DataTypes.of(FlinkTypeFactory.toLogicalRowType(tableArgCall.type));
         }
 
-        final int timeColumn = inputTimeColumns.get(tableArgCall.getInputIndex());
-
-        final int inputIndex = tableArgCall.getInputIndex();
+        final int timeColumn = inputTimeColumns.get(inputIndex);
         final List<int[]> upsertKeys =
                 inputIndex < inputUpsertKeys.size() ? inputUpsertKeys.get(inputIndex) : List.of();
         return new RuntimeTableSemantics(
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogSemanticTests.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogSemanticTests.java
index 41c53979067..c35394c7694 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogSemanticTests.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogSemanticTests.java
@@ -54,9 +54,7 @@ public class ToChangelogSemanticTests extends SemanticTestBase {
                 ToChangelogTestPrograms.DELETION_FLAG,
                 ToChangelogTestPrograms.INVALID_DESCRIPTOR,
                 ToChangelogTestPrograms.INVALID_OP_MAPPING,
-                ToChangelogTestPrograms.OP_MAPPING_REFERENCES_UNSUPPORTED_KIND,
                 ToChangelogTestPrograms.DUPLICATE_ROW_KIND,
-                ToChangelogTestPrograms.INVALID_PRODUCES_FULL_DELETES_FOR_APPEND_ONLY,
                 ToChangelogTestPrograms.RETRACT_PRODUCES_PARTIAL_DELETES,
                 ToChangelogTestPrograms.UPSERT_PRODUCES_FULL_DELETES,
                 ToChangelogTestPrograms.UPSERT_PRODUCES_PARTIAL_DELETES,
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java
index 909e2f4d1ef..71731109eda 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java
@@ -573,19 +573,6 @@ public class ToChangelogTestPrograms {
                             "Unknown change operation: 'INVALID_KIND'")
                     .build();
 
-    public static final TableTestProgram OP_MAPPING_REFERENCES_UNSUPPORTED_KIND =
-            TableTestProgram.of(
-                            "to-changelog-op-mapping-references-unsupported-kind",
-                            "fails when op_mapping references a change operation the input cannot produce")
-                    .setupTableSource(SIMPLE_SOURCE)
-                    .runFailingSql(
-                            "SELECT * FROM TO_CHANGELOG("
-                                    + "input => TABLE t, "
-                                    + "op_mapping => MAP['INSERT', 'I', 'DELETE', 'D'])",
-                            ValidationException.class,
-                            "the input table only produces [INSERT] and does not produce [DELETE]")
-                    .build();
-
     public static final TableTestProgram DUPLICATE_ROW_KIND =
             TableTestProgram.of(
                             "to-changelog-duplicate-rowkind",
@@ -599,19 +586,6 @@ public class ToChangelogTestPrograms {
                             "Duplicate change operation: 'DELETE'")
                     .build();
 
-    public static final TableTestProgram INVALID_PRODUCES_FULL_DELETES_FOR_APPEND_ONLY =
-            TableTestProgram.of(
-                            "to-changelog-invalid-produces-full-deletes-for-append-only",
-                            "fails when produces_full_deletes=true on an input that never emits DELETE rows")
-                    .setupTableSource(SIMPLE_SOURCE)
-                    .runFailingSql(
-                            "SELECT * FROM TO_CHANGELOG("
-                                    + "input => TABLE t, "
-                                    + "produces_full_deletes => true)",
-                            ValidationException.class,
-                            "the input table only produces [INSERT] and never emits DELETE rows")
-                    .build();
-
     // --------------------------------------------------------------------------------------------
     // Full vs partial deletes matrix (input kind x PARTITION BY x produces_full_deletes)
     // --------------------------------------------------------------------------------------------
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/ToChangelogFunction.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/ToChangelogFunction.java
index 66162e205b9..b2e88dee1dd 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/ToChangelogFunction.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/ToChangelogFunction.java
@@ -20,7 +20,6 @@ package org.apache.flink.table.runtime.functions.ptf;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.api.ValidationException;
-import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.MapData;
 import org.apache.flink.table.data.RowData;
@@ -46,7 +45,6 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.stream.Collectors;
 
 import static org.apache.flink.table.types.inference.strategies.ToChangelogTypeStrategy.ARG_OP_MAPPING;
 import static org.apache.flink.table.types.inference.strategies.ToChangelogTypeStrategy.ARG_PRODUCES_FULL_DELETES;
@@ -96,17 +94,14 @@ public class ToChangelogFunction extends BuiltInProcessTableFunction<RowData> {
         final Map<String, String> opMapping =
                 callContext.getArgumentValue(ARG_OP_MAPPING, Map.class).orElse(null);
         this.rawOpMap = buildOpMap(opMapping);
-        if (opMapping != null) {
-            validateOpMap(this.rawOpMap, tableSemantics);
-        }
-        final boolean producesFullDeletesArg =
+
+        this.producesFullDelete =
                 callContext.getArgumentValue(ARG_PRODUCES_FULL_DELETES, Boolean.class).orElse(true);
         final boolean isExplicit = !callContext.isArgumentNull(ARG_PRODUCES_FULL_DELETES);
-        validateProducesFullDeletes(producesFullDeletesArg, isExplicit, tableSemantics);
+        validateProducesPartialDeletes(producesFullDelete, isExplicit, tableSemantics);
 
         this.outputIndices = ChangelogTypeStrategyUtils.computeOutputIndices(tableSemantics);
         this.inputRowType = (RowType) tableSemantics.dataType().getLogicalType();
-        this.producesFullDelete = producesFullDeletesArg;
         this.upsertKeyColumn =
                 computeUpsertKeyColumn(
                         this.outputIndices,
@@ -164,70 +159,27 @@ public class ToChangelogFunction extends BuiltInProcessTableFunction<RowData> {
         return result;
     }
 
-    /**
-     * Rejects user-provided mappings that reference change operations the input changelog cannot
-     * produce. Without this check the extra entries are dead code: the corresponding rows never
-     * arrive. E.g., if the input is INSERT-only, then UPDATE_BEFORE and DELETE mappings are
-     * ignored, which is likely a user mistake.
-     *
-     * <p>Lives here rather than in the input type strategy because {@link
-     * TableSemantics#changelogMode()} returns empty during type inference and is only populated at
-     * specialization time, which is when this constructor runs.
-     */
-    private static void validateOpMap(
-            final Map<RowKind, String> mapping, final TableSemantics tableSemantics) {
-        final ChangelogMode inputMode = tableSemantics.changelogMode().orElse(null);
-        if (inputMode == null) {
-            return;
-        }
-        final List<RowKind> unsupported =
-                mapping.keySet().stream()
-                        .filter(kind -> !inputMode.contains(kind))
-                        .collect(Collectors.toList());
-        if (!unsupported.isEmpty()) {
-            throw new ValidationException(
-                    String.format(
-                            "Invalid 'op_mapping' for TO_CHANGELOG: the input table only produces "
-                                    + "%s and does not produce %s. Remove those entries from the "
-                                    + "mapping.",
-                            inputMode.getContainedKinds(), unsupported));
-        }
-    }
-
     /**
      * Validates an explicit {@code produces_full_deletes} argument against the input.
      *
-     * <p>For {@code produces_full_deletes=true}, the input changelog must emit DELETE rows;
-     * otherwise the parameter is dead. For {@code produces_full_deletes=false}, the input must
-     * declare an upsert key or the call must use {@code PARTITION BY}; otherwise the function has
-     * no identifying columns to preserve when nulling the rest.
+     * <p>For {@code produces_full_deletes=false}, the input must declare an upsert key or the call
+     * must use {@code PARTITION BY}; otherwise the function has no identifying columns to preserve
+     * when nulling the rest.
      *
-     * <p>No validation runs when the argument is absent, since the default (full deletes) is safe
-     * for any input.
+     * <p>No validation runs when the argument is absent or is set to true (default).
      *
      * <p>Lives here rather than in the input type strategy because {@link
      * TableSemantics#changelogMode()} and {@link TableSemantics#upsertKeyColumns()} are only
      * populated at specialization time.
      */
-    private static void validateProducesFullDeletes(
+    private static void validateProducesPartialDeletes(
             final boolean producesFullDeletesArg,
             final boolean isExplicit,
             final TableSemantics tableSemantics) {
-        if (!isExplicit) {
-            return;
-        }
-        if (producesFullDeletesArg) {
-            final ChangelogMode inputMode = tableSemantics.changelogMode().orElse(null);
-            if (inputMode != null && !inputMode.contains(RowKind.DELETE)) {
-                throw new ValidationException(
-                        String.format(
-                                "Invalid 'produces_full_deletes' for TO_CHANGELOG: the input "
-                                        + "table only produces %s and never emits DELETE rows. "
-                                        + "Remove the 'produces_full_deletes' argument.",
-                                inputMode.getContainedKinds()));
-            }
+        if (!isExplicit || producesFullDeletesArg) {
             return;
         }
+
         final boolean hasPartitionBy = tableSemantics.partitionByColumns().length > 0;
         final boolean hasUpsertKey = !tableSemantics.upsertKeyColumns().isEmpty();
         if (!hasPartitionBy && !hasUpsertKey) {
