This is an automated email from the ASF dual-hosted git repository.

gustavodemorais pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 340fbf879fd3e03dea7b37f7c28c6c404db21f65
Author: Ramin Gharib <[email protected]>
AuthorDate: Fri May 22 17:06:54 2026 +0200

    [FLINK-39636][table] Refactor ToChangelogTypeStrategy
    
    Aligns the structure of ToChangelogTypeStrategy with 
FromChangelogTypeStrategy. Pure refactor, no behavior change.
---
 .../strategies/ToChangelogTypeStrategy.java        | 182 +++++++++++++--------
 .../ToChangelogInputTypeStrategyTest.java          | 146 +++++++++++++++++
 .../exec/stream/ToChangelogSemanticTests.java      |   1 -
 .../nodes/exec/stream/ToChangelogTestPrograms.java |  19 ---
 .../runtime/functions/ptf/ToChangelogFunction.java |  20 +--
 5 files changed, 267 insertions(+), 101 deletions(-)

diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToChangelogTypeStrategy.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToChangelogTypeStrategy.java
index 8ff8101dc5e..d406875aa2f 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToChangelogTypeStrategy.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToChangelogTypeStrategy.java
@@ -22,15 +22,10 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.DataTypes.Field;
 import org.apache.flink.table.api.ValidationException;
-import org.apache.flink.table.functions.FunctionDefinition;
 import org.apache.flink.table.functions.TableSemantics;
 import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.inference.ArgumentCount;
 import org.apache.flink.table.types.inference.CallContext;
-import org.apache.flink.table.types.inference.ConstantArgumentCount;
 import org.apache.flink.table.types.inference.InputTypeStrategy;
-import org.apache.flink.table.types.inference.Signature;
-import org.apache.flink.table.types.inference.Signature.Argument;
 import org.apache.flink.table.types.inference.TypeStrategy;
 import org.apache.flink.types.ColumnList;
 
@@ -46,7 +41,13 @@ import java.util.Set;
 @Internal
 public final class ToChangelogTypeStrategy {
 
-    private static final String DEFAULT_OP_COLUMN_NAME = "op";
+    // Positional argument indexes for TO_CHANGELOG. Must match the order of 
StaticArguments
+    // registered in BuiltInFunctionDefinitions#TO_CHANGELOG; changing one 
without the other
+    // silently breaks argument resolution.
+    public static final int ARG_TABLE = 0;
+    public static final int ARG_OP = 1;
+    public static final int ARG_OP_MAPPING = 2;
+    public static final int ARG_PRODUCES_FULL_DELETES = 3;
 
     private static final Set<String> VALID_ROW_KIND_NAMES =
             Set.of("INSERT", "UPDATE_BEFORE", "UPDATE_AFTER", "DELETE");
@@ -56,34 +57,12 @@ public final class ToChangelogTypeStrategy {
     // 
--------------------------------------------------------------------------------------------
 
     public static final InputTypeStrategy INPUT_TYPE_STRATEGY =
-            new InputTypeStrategy() {
-                @Override
-                public ArgumentCount getArgumentCount() {
-                    return ConstantArgumentCount.between(1, 4);
-                }
-
+            new ValidationOnlyInputTypeStrategy() {
                 @Override
                 public Optional<List<DataType>> inferInputTypes(
                         final CallContext callContext, final boolean 
throwOnFailure) {
                     return validateInputs(callContext, throwOnFailure);
                 }
-
-                @Override
-                public List<Signature> getExpectedSignatures(final 
FunctionDefinition definition) {
-                    return List.of(
-                            Signature.of(Argument.of("input", "TABLE")),
-                            Signature.of(
-                                    Argument.of("input", "TABLE"), 
Argument.of("op", "DESCRIPTOR")),
-                            Signature.of(
-                                    Argument.of("input", "TABLE"),
-                                    Argument.of("op", "DESCRIPTOR"),
-                                    Argument.of("op_mapping", "MAP<STRING, 
STRING>")),
-                            Signature.of(
-                                    Argument.of("input", "TABLE"),
-                                    Argument.of("op", "DESCRIPTOR"),
-                                    Argument.of("op_mapping", "MAP<STRING, 
STRING>"),
-                                    Argument.of("produces_full_deletes", 
"BOOLEAN")));
-                }
             };
 
     // 
--------------------------------------------------------------------------------------------
@@ -94,20 +73,16 @@ public final class ToChangelogTypeStrategy {
             callContext -> {
                 final TableSemantics semantics =
                         callContext
-                                .getTableSemantics(0)
+                                .getTableSemantics(ARG_TABLE)
                                 .orElseThrow(
                                         () ->
                                                 new ValidationException(
                                                         "First argument must 
be a table for TO_CHANGELOG."));
 
-                final String opColumnName = resolveOpColumnName(callContext);
-                final List<Field> inputFields = 
DataType.getFields(semantics.dataType());
-                final int[] outputIndices =
-                        
ChangelogTypeStrategyUtils.computeOutputIndices(semantics);
+                final String opColumnName =
+                        
ChangelogTypeStrategyUtils.resolveOpColumnName(callContext);
 
-                final List<Field> outputFields = new ArrayList<>();
-                outputFields.add(DataTypes.FIELD(opColumnName, 
DataTypes.STRING()));
-                
Arrays.stream(outputIndices).mapToObj(inputFields::get).forEach(outputFields::add);
+                final List<Field> outputFields = buildOutputFields(semantics, 
opColumnName);
 
                 return Optional.of(DataTypes.ROW(outputFields).notNull());
             };
@@ -118,45 +93,68 @@ public final class ToChangelogTypeStrategy {
 
     private static Optional<List<DataType>> validateInputs(
             final CallContext callContext, final boolean throwOnFailure) {
-        final boolean isMissingTableArg = 
callContext.getTableSemantics(0).isEmpty();
-        if (isMissingTableArg) {
+        Optional<List<DataType>> error;
+
+        error = validateTableArg(callContext, throwOnFailure);
+        if (error.isPresent()) {
+            return error;
+        }
+
+        error = validateOpDescriptor(callContext, throwOnFailure);
+        if (error.isPresent()) {
+            return error;
+        }
+
+        error = validateOpMapping(callContext, throwOnFailure);
+        if (error.isPresent()) {
+            return error;
+        }
+
+        error = validateProducesFullDeletes(callContext, throwOnFailure);
+        if (error.isPresent()) {
+            return error;
+        }
+
+        return Optional.of(callContext.getArgumentDataTypes());
+    }
+
+    private static Optional<List<DataType>> validateTableArg(
+            final CallContext callContext, final boolean throwOnFailure) {
+        if (callContext.getTableSemantics(ARG_TABLE).isEmpty()) {
             return callContext.fail(
                     throwOnFailure, "First argument must be a table for 
TO_CHANGELOG.");
         }
+        return Optional.empty();
+    }
 
-        final Optional<ColumnList> opDescriptor = 
callContext.getArgumentValue(1, ColumnList.class);
-        final boolean hasInvalidOpDescriptor =
-                opDescriptor.isPresent() && 
opDescriptor.get().getNames().size() != 1;
-        if (hasInvalidOpDescriptor) {
+    private static Optional<List<DataType>> validateOpDescriptor(
+            final CallContext callContext, final boolean throwOnFailure) {
+        final Optional<ColumnList> opDescriptor =
+                callContext.getArgumentValue(ARG_OP, ColumnList.class);
+        if (opDescriptor.isPresent() && opDescriptor.get().getNames().size() 
!= 1) {
             return callContext.fail(
                     throwOnFailure,
                     "The descriptor for argument 'op' must contain exactly one 
column name.");
         }
+        return Optional.empty();
+    }
 
-        final boolean hasMappingArgProvided = !callContext.isArgumentNull(2);
-        final boolean isMappingArgLiteral = callContext.isArgumentLiteral(2);
+    /** Validates op_mapping is a constant literal and that its keys are 
well-formed. */
+    @SuppressWarnings("rawtypes")
+    private static Optional<List<DataType>> validateOpMapping(
+            final CallContext callContext, final boolean throwOnFailure) {
+        final boolean hasMappingArgProvided = 
!callContext.isArgumentNull(ARG_OP_MAPPING);
+        final boolean isMappingArgLiteral = 
callContext.isArgumentLiteral(ARG_OP_MAPPING);
         if (hasMappingArgProvided && !isMappingArgLiteral) {
             return callContext.fail(
                     throwOnFailure, "The 'op_mapping' argument must be a 
constant MAP literal.");
         }
 
-        final Optional<Map> opMapping = callContext.getArgumentValue(2, 
Map.class);
+        final Optional<Map> opMapping = 
callContext.getArgumentValue(ARG_OP_MAPPING, Map.class);
         if (opMapping.isPresent()) {
-            final Optional<List<DataType>> validationError =
-                    validateOpMappingKeys(callContext, opMapping.get(), 
throwOnFailure);
-            if (validationError.isPresent()) {
-                return validationError;
-            }
+            return validateOpMappingKeys(callContext, opMapping.get(), 
throwOnFailure);
         }
-
-        final boolean hasProducesFullDeletesArg = 
!callContext.isArgumentNull(3);
-        if (hasProducesFullDeletesArg && !callContext.isArgumentLiteral(3)) {
-            return callContext.fail(
-                    throwOnFailure,
-                    "The 'produces_full_deletes' argument must be a constant 
BOOLEAN literal.");
-        }
-
-        return Optional.of(callContext.getArgumentDataTypes());
+        return Optional.empty();
     }
 
     /**
@@ -199,12 +197,64 @@ public final class ToChangelogTypeStrategy {
         return Optional.empty();
     }
 
-    private static String resolveOpColumnName(final CallContext callContext) {
-        return callContext
-                .getArgumentValue(1, ColumnList.class)
-                .filter(cl -> !cl.getNames().isEmpty())
-                .map(cl -> cl.getNames().get(0))
-                .orElse(DEFAULT_OP_COLUMN_NAME);
+    @SuppressWarnings("rawtypes")
+    private static Optional<List<DataType>> validateProducesFullDeletes(
+            final CallContext callContext, final boolean throwOnFailure) {
+        final boolean hasArgProvided = 
!callContext.isArgumentNull(ARG_PRODUCES_FULL_DELETES);
+        if (hasArgProvided && 
!callContext.isArgumentLiteral(ARG_PRODUCES_FULL_DELETES)) {
+            return callContext.fail(
+                    throwOnFailure,
+                    "The 'produces_full_deletes' argument must be a constant 
BOOLEAN literal.");
+        }
+        final boolean producesFullDeletes =
+                callContext
+                        .getArgumentValue(ARG_PRODUCES_FULL_DELETES, 
Boolean.class)
+                        .orElse(false);
+        if (!producesFullDeletes) {
+            return Optional.empty();
+        }
+        // The check against the input changelog mode lives in the function 
constructor since
+        // TableSemantics#changelogMode() returns empty here at type-inference 
time. The mapping
+        // check below only needs the literal op_mapping argument, so it lives 
here.
+        final Optional<Map> opMapping = 
callContext.getArgumentValue(ARG_OP_MAPPING, Map.class);
+        if (opMapping.isPresent() && !mapsDelete(opMapping.get())) {
+            return callContext.fail(
+                    throwOnFailure,
+                    "Invalid 'produces_full_deletes' for TO_CHANGELOG: the 
active 'op_mapping' "
+                            + "does not map DELETE rows, so no DELETE rows are 
emitted. Remove "
+                            + "the 'produces_full_deletes' argument or add a 
DELETE entry to "
+                            + "'op_mapping'.");
+        }
+        return Optional.empty();
+    }
+
+    /**
+     * Returns {@code true} when at least one {@code op_mapping} key 
references {@code DELETE}.
+     * Keys may be comma-separated (e.g., {@code "INSERT, DELETE"}) per the 
user-facing contract.
+     */
+    @SuppressWarnings("rawtypes")
+    private static boolean mapsDelete(final Map opMapping) {
+        for (final Object key : opMapping.keySet()) {
+            if (!(key instanceof String)) {
+                continue;
+            }
+            for (final String rawName : ((String) key).split(",")) {
+                if ("DELETE".equals(rawName.trim())) {
+                    return true;
+                }
+            }
+        }
+        return false;
+    }
+
+    private static List<Field> buildOutputFields(
+            final TableSemantics semantics, final String opColumnName) {
+        final List<Field> inputFields = 
DataType.getFields(semantics.dataType());
+        final int[] outputIndices = 
ChangelogTypeStrategyUtils.computeOutputIndices(semantics);
+        final List<Field> outputFields = new ArrayList<>();
+        outputFields.add(DataTypes.FIELD(opColumnName, DataTypes.STRING()));
+        
Arrays.stream(outputIndices).mapToObj(inputFields::get).forEach(outputFields::add);
+        return outputFields;
     }
 
     private ToChangelogTypeStrategy() {}
diff --git 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/ToChangelogInputTypeStrategyTest.java
 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/ToChangelogInputTypeStrategyTest.java
new file mode 100644
index 00000000000..e9b20a7a5ba
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/ToChangelogInputTypeStrategyTest.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.inference.strategies;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.inference.InputTypeStrategiesTestBase;
+import org.apache.flink.table.types.inference.utils.TableSemanticsMock;
+import org.apache.flink.types.ColumnList;
+
+import java.util.Map;
+import java.util.stream.Stream;
+
+import static 
org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies.TO_CHANGELOG_INPUT_TYPE_STRATEGY;
+
+/** Tests for {@link ToChangelogTypeStrategy#INPUT_TYPE_STRATEGY}. */
+class ToChangelogInputTypeStrategyTest extends InputTypeStrategiesTestBase {
+
+    private static final DataType TABLE_TYPE =
+            DataTypes.ROW(
+                    DataTypes.FIELD("name", DataTypes.STRING()),
+                    DataTypes.FIELD("score", DataTypes.BIGINT()));
+
+    private static final DataType DESCRIPTOR_TYPE = DataTypes.DESCRIPTOR();
+
+    private static final DataType MAP_TYPE = DataTypes.MAP(DataTypes.STRING(), 
DataTypes.STRING());
+
+    private static final DataType BOOLEAN_TYPE = DataTypes.BOOLEAN();
+
+    @Override
+    protected Stream<TestSpec> testData() {
+        return Stream.of(
+                // Valid: produces_full_deletes=true with default op_mapping 
(includes DELETE)
+                TestSpec.forStrategy(
+                                "Valid produces_full_deletes=true with default 
mapping",
+                                TO_CHANGELOG_INPUT_TYPE_STRATEGY)
+                        .calledWithArgumentTypes(
+                                TABLE_TYPE, DESCRIPTOR_TYPE, MAP_TYPE, 
BOOLEAN_TYPE)
+                        .calledWithTableSemanticsAt(0, new 
TableSemanticsMock(TABLE_TYPE))
+                        .calledWithLiteralAt(1, ColumnList.of("op"))
+                        .calledWithLiteralAt(2, null)
+                        .calledWithLiteralAt(3, true)
+                        .expectArgumentTypes(
+                                TABLE_TYPE, DESCRIPTOR_TYPE, MAP_TYPE, 
BOOLEAN_TYPE),
+
+                // Valid: produces_full_deletes=true with op_mapping that 
includes DELETE
+                TestSpec.forStrategy(
+                                "Valid produces_full_deletes=true with 
explicit DELETE mapping",
+                                TO_CHANGELOG_INPUT_TYPE_STRATEGY)
+                        .calledWithArgumentTypes(
+                                TABLE_TYPE, DESCRIPTOR_TYPE, MAP_TYPE, 
BOOLEAN_TYPE)
+                        .calledWithTableSemanticsAt(0, new 
TableSemanticsMock(TABLE_TYPE))
+                        .calledWithLiteralAt(1, ColumnList.of("op"))
+                        .calledWithLiteralAt(2, Map.of("INSERT", "I", 
"DELETE", "D"))
+                        .calledWithLiteralAt(3, true)
+                        .expectArgumentTypes(
+                                TABLE_TYPE, DESCRIPTOR_TYPE, MAP_TYPE, 
BOOLEAN_TYPE),
+
+                // Valid: produces_full_deletes=true with comma-separated 
DELETE key
+                TestSpec.forStrategy(
+                                "Valid produces_full_deletes=true with 
comma-separated DELETE",
+                                TO_CHANGELOG_INPUT_TYPE_STRATEGY)
+                        .calledWithArgumentTypes(
+                                TABLE_TYPE, DESCRIPTOR_TYPE, MAP_TYPE, 
BOOLEAN_TYPE)
+                        .calledWithTableSemanticsAt(0, new 
TableSemanticsMock(TABLE_TYPE))
+                        .calledWithLiteralAt(1, ColumnList.of("op"))
+                        .calledWithLiteralAt(2, Map.of("INSERT, DELETE", "X"))
+                        .calledWithLiteralAt(3, true)
+                        .expectArgumentTypes(
+                                TABLE_TYPE, DESCRIPTOR_TYPE, MAP_TYPE, 
BOOLEAN_TYPE),
+
+                // Valid: produces_full_deletes=false with op_mapping that 
omits DELETE
+                TestSpec.forStrategy(
+                                "Valid produces_full_deletes=false with no 
DELETE in mapping",
+                                TO_CHANGELOG_INPUT_TYPE_STRATEGY)
+                        .calledWithArgumentTypes(
+                                TABLE_TYPE, DESCRIPTOR_TYPE, MAP_TYPE, 
BOOLEAN_TYPE)
+                        .calledWithTableSemanticsAt(0, new 
TableSemanticsMock(TABLE_TYPE))
+                        .calledWithLiteralAt(1, ColumnList.of("op"))
+                        .calledWithLiteralAt(2, Map.of("INSERT, UPDATE_AFTER", 
"X"))
+                        .calledWithLiteralAt(3, false)
+                        .expectArgumentTypes(
+                                TABLE_TYPE, DESCRIPTOR_TYPE, MAP_TYPE, 
BOOLEAN_TYPE),
+
+                // Error: produces_full_deletes=true with op_mapping that 
strips DELETE
+                TestSpec.forStrategy(
+                                "produces_full_deletes=true rejected when 
op_mapping omits DELETE",
+                                TO_CHANGELOG_INPUT_TYPE_STRATEGY)
+                        .calledWithArgumentTypes(
+                                TABLE_TYPE, DESCRIPTOR_TYPE, MAP_TYPE, 
BOOLEAN_TYPE)
+                        .calledWithTableSemanticsAt(0, new 
TableSemanticsMock(TABLE_TYPE))
+                        .calledWithLiteralAt(1, ColumnList.of("op"))
+                        .calledWithLiteralAt(2, Map.of("INSERT, UPDATE_AFTER", 
"X"))
+                        .calledWithLiteralAt(3, true)
+                        .expectErrorMessage(
+                                "Invalid 'produces_full_deletes' for 
TO_CHANGELOG: the active "
+                                        + "'op_mapping' does not map DELETE 
rows"),
+
+                // Error: multi-column descriptor
+                TestSpec.forStrategy(
+                                "Descriptor with multiple columns",
+                                TO_CHANGELOG_INPUT_TYPE_STRATEGY)
+                        .calledWithArgumentTypes(
+                                TABLE_TYPE, DESCRIPTOR_TYPE, MAP_TYPE, 
BOOLEAN_TYPE)
+                        .calledWithTableSemanticsAt(0, new 
TableSemanticsMock(TABLE_TYPE))
+                        .calledWithLiteralAt(1, ColumnList.of("a", "b"))
+                        .expectErrorMessage("must contain exactly one column 
name"),
+
+                // Error: invalid RowKind in op_mapping key
+                TestSpec.forStrategy(
+                                "Invalid RowKind in mapping key", 
TO_CHANGELOG_INPUT_TYPE_STRATEGY)
+                        .calledWithArgumentTypes(
+                                TABLE_TYPE, DESCRIPTOR_TYPE, MAP_TYPE, 
BOOLEAN_TYPE)
+                        .calledWithTableSemanticsAt(0, new 
TableSemanticsMock(TABLE_TYPE))
+                        .calledWithLiteralAt(1, ColumnList.of("op"))
+                        .calledWithLiteralAt(2, Map.of("INVALID_KIND", "X"))
+                        .expectErrorMessage("Unknown change operation: 
'INVALID_KIND'"),
+
+                // Error: duplicate RowKind across entries
+                TestSpec.forStrategy(
+                                "Duplicate RowKind in mapping keys",
+                                TO_CHANGELOG_INPUT_TYPE_STRATEGY)
+                        .calledWithArgumentTypes(
+                                TABLE_TYPE, DESCRIPTOR_TYPE, MAP_TYPE, 
BOOLEAN_TYPE)
+                        .calledWithTableSemanticsAt(0, new 
TableSemanticsMock(TABLE_TYPE))
+                        .calledWithLiteralAt(1, ColumnList.of("op"))
+                        .calledWithLiteralAt(2, Map.of("INSERT, DELETE", "A", 
"DELETE", "B"))
+                        .expectErrorMessage("Duplicate change operation: 
'DELETE'"));
+    }
+}
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogSemanticTests.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogSemanticTests.java
index 762e08b3538..b3376b3a346 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogSemanticTests.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogSemanticTests.java
@@ -57,7 +57,6 @@ public class ToChangelogSemanticTests extends 
SemanticTestBase {
                 ToChangelogTestPrograms.OP_MAPPING_REFERENCES_UNSUPPORTED_KIND,
                 ToChangelogTestPrograms.DUPLICATE_ROW_KIND,
                 
ToChangelogTestPrograms.PRODUCES_FULL_DELETES_ON_APPEND_ONLY_INPUT,
-                
ToChangelogTestPrograms.PRODUCES_FULL_DELETES_WITHOUT_DELETE_IN_OP_MAPPING,
                 ToChangelogTestPrograms.ROW_SEM_PARTIAL_DELETES,
                 ToChangelogTestPrograms.ROW_SEM_FORCE_FULL_DELETES,
                 ToChangelogTestPrograms.SET_SEM_PARTIAL_DELETES,
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java
index f04d30d1d99..5126014b9a3 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java
@@ -613,25 +613,6 @@ public class ToChangelogTestPrograms {
                             "the input table only produces [INSERT] and never 
emits DELETE rows")
                     .build();
 
-    public static final TableTestProgram 
PRODUCES_FULL_DELETES_WITHOUT_DELETE_IN_OP_MAPPING =
-            TableTestProgram.of(
-                            
"to-changelog-produces-full-deletes-without-delete-in-op-mapping",
-                            "fails when produces_full_deletes=true but the 
active op_mapping strips DELETE")
-                    .setupTableSource(
-                            SourceTestStep.newBuilder("t")
-                                    .addSchema(
-                                            "name STRING PRIMARY KEY NOT 
ENFORCED", "score BIGINT")
-                                    .addMode(ChangelogMode.all())
-                                    .producedValues(Row.ofKind(RowKind.INSERT, 
"Alice", 10L))
-                                    .build())
-                    .runFailingSql(
-                            "SELECT * FROM TO_CHANGELOG("
-                                    + "input => TABLE t, "
-                                    + "op_mapping => MAP['INSERT, 
UPDATE_AFTER', 'X'], "
-                                    + "produces_full_deletes => true)",
-                            ValidationException.class,
-                            "the active 'op_mapping' does not map DELETE rows")
-                    .build();
 
     // 
--------------------------------------------------------------------------------------------
     // Row semantics x delete handling matrix
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/ToChangelogFunction.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/ToChangelogFunction.java
index 0110475ef8d..d8874a2be16 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/ToChangelogFunction.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/ToChangelogFunction.java
@@ -88,7 +88,7 @@ public class ToChangelogFunction extends 
BuiltInProcessTableFunction<RowData> {
         }
         final boolean producesFullDeletesArg =
                 callContext.getArgumentValue(3, Boolean.class).orElse(false);
-        validateProducesFullDeletes(producesFullDeletesArg, this.rawOpMap, 
tableSemantics);
+        validateProducesFullDeletes(producesFullDeletesArg, tableSemantics);
 
         this.outputIndices = 
ChangelogTypeStrategyUtils.computeOutputIndices(tableSemantics);
         this.producesFullDelete = 
resolveProducesFullDelete(producesFullDeletesArg, tableSemantics);
@@ -172,18 +172,15 @@ public class ToChangelogFunction extends 
BuiltInProcessTableFunction<RowData> {
     }
 
     /**
-     * Rejects {@code produces_full_deletes=true} when the input changelog 
cannot produce DELETE
-     * rows (either the input mode does not contain DELETE, or the active 
{@code op_mapping} strips
-     * it). The parameter is then dead and likely a user mistake.
+     * Rejects {@code produces_full_deletes=true} when the input changelog 
never emits DELETE rows.
      *
      * <p>Lives here rather than in the input type strategy because {@link
      * TableSemantics#changelogMode()} returns empty during type inference and 
is only populated at
-     * specialization time, which is when this constructor runs.
+     * specialization time. The complementary check against the literal {@code 
op_mapping}
+     * argument runs earlier in {@code ToChangelogTypeStrategy}.
      */
     private static void validateProducesFullDeletes(
-            final boolean producesFullDeletesArg,
-            final Map<RowKind, String> mapping,
-            final TableSemantics tableSemantics) {
+            final boolean producesFullDeletesArg, final TableSemantics 
tableSemantics) {
         if (!producesFullDeletesArg) {
             return;
         }
@@ -199,13 +196,6 @@ public class ToChangelogFunction extends 
BuiltInProcessTableFunction<RowData> {
                                     + "'produces_full_deletes' argument.",
                             inputMode.getContainedKinds()));
         }
-        if (!mapping.containsKey(RowKind.DELETE)) {
-            throw new ValidationException(
-                    "Invalid 'produces_full_deletes' for TO_CHANGELOG: the 
active 'op_mapping' "
-                            + "does not map DELETE rows, so no DELETE rows are 
emitted. Remove "
-                            + "the 'produces_full_deletes' argument or add a 
DELETE entry to "
-                            + "'op_mapping'.");
-        }
     }
 
     public void eval(

Reply via email to