This is an automated email from the ASF dual-hosted git repository. gustavodemorais pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 340fbf879fd3e03dea7b37f7c28c6c404db21f65 Author: Ramin Gharib <[email protected]> AuthorDate: Fri May 22 17:06:54 2026 +0200 [FLINK-39636][table] Refactor ToChangelogTypeStrategy Aligns the structure of ToChangelogTypeStrategy with FromChangelogTypeStrategy. Pure refactor, no behavior change. --- .../strategies/ToChangelogTypeStrategy.java | 182 +++++++++++++-------- .../ToChangelogInputTypeStrategyTest.java | 146 +++++++++++++++++ .../exec/stream/ToChangelogSemanticTests.java | 1 - .../nodes/exec/stream/ToChangelogTestPrograms.java | 19 --- .../runtime/functions/ptf/ToChangelogFunction.java | 20 +-- 5 files changed, 267 insertions(+), 101 deletions(-) diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToChangelogTypeStrategy.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToChangelogTypeStrategy.java index 8ff8101dc5e..d406875aa2f 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToChangelogTypeStrategy.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToChangelogTypeStrategy.java @@ -22,15 +22,10 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.DataTypes.Field; import org.apache.flink.table.api.ValidationException; -import org.apache.flink.table.functions.FunctionDefinition; import org.apache.flink.table.functions.TableSemantics; import org.apache.flink.table.types.DataType; -import org.apache.flink.table.types.inference.ArgumentCount; import org.apache.flink.table.types.inference.CallContext; -import org.apache.flink.table.types.inference.ConstantArgumentCount; import org.apache.flink.table.types.inference.InputTypeStrategy; -import org.apache.flink.table.types.inference.Signature; -import org.apache.flink.table.types.inference.Signature.Argument; import org.apache.flink.table.types.inference.TypeStrategy; import org.apache.flink.types.ColumnList; @@ -46,7 +41,13 @@ import java.util.Set; @Internal public final class ToChangelogTypeStrategy { - private static final String DEFAULT_OP_COLUMN_NAME = "op"; + // Positional argument indexes for TO_CHANGELOG. Must match the order of StaticArguments + // registered in BuiltInFunctionDefinitions#TO_CHANGELOG; changing one without the other + // silently breaks argument resolution. + public static final int ARG_TABLE = 0; + public static final int ARG_OP = 1; + public static final int ARG_OP_MAPPING = 2; + public static final int ARG_PRODUCES_FULL_DELETES = 3; private static final Set<String> VALID_ROW_KIND_NAMES = Set.of("INSERT", "UPDATE_BEFORE", "UPDATE_AFTER", "DELETE"); @@ -56,34 +57,12 @@ public final class ToChangelogTypeStrategy { // -------------------------------------------------------------------------------------------- public static final InputTypeStrategy INPUT_TYPE_STRATEGY = - new InputTypeStrategy() { - @Override - public ArgumentCount getArgumentCount() { - return ConstantArgumentCount.between(1, 4); - } - + new ValidationOnlyInputTypeStrategy() { @Override public Optional<List<DataType>> inferInputTypes( final CallContext callContext, final boolean throwOnFailure) { return validateInputs(callContext, throwOnFailure); } - - @Override - public List<Signature> getExpectedSignatures(final FunctionDefinition definition) { - return List.of( - Signature.of(Argument.of("input", "TABLE")), - Signature.of( - Argument.of("input", "TABLE"), Argument.of("op", "DESCRIPTOR")), - Signature.of( - Argument.of("input", "TABLE"), - Argument.of("op", "DESCRIPTOR"), - Argument.of("op_mapping", "MAP<STRING, STRING>")), - Signature.of( - Argument.of("input", "TABLE"), - Argument.of("op", "DESCRIPTOR"), - Argument.of("op_mapping", "MAP<STRING, STRING>"), - Argument.of("produces_full_deletes", "BOOLEAN"))); - } }; // -------------------------------------------------------------------------------------------- @@ -94,20 +73,16 @@ public final class ToChangelogTypeStrategy { callContext -> { final TableSemantics semantics = callContext - .getTableSemantics(0) + .getTableSemantics(ARG_TABLE) .orElseThrow( () -> new ValidationException( "First argument must be a table for TO_CHANGELOG.")); - final String opColumnName = resolveOpColumnName(callContext); - final List<Field> inputFields = DataType.getFields(semantics.dataType()); - final int[] outputIndices = - ChangelogTypeStrategyUtils.computeOutputIndices(semantics); + final String opColumnName = + ChangelogTypeStrategyUtils.resolveOpColumnName(callContext); - final List<Field> outputFields = new ArrayList<>(); - outputFields.add(DataTypes.FIELD(opColumnName, DataTypes.STRING())); - Arrays.stream(outputIndices).mapToObj(inputFields::get).forEach(outputFields::add); + final List<Field> outputFields = buildOutputFields(semantics, opColumnName); return Optional.of(DataTypes.ROW(outputFields).notNull()); }; @@ -118,45 +93,68 @@ public final class ToChangelogTypeStrategy { private static Optional<List<DataType>> validateInputs( final CallContext callContext, final boolean throwOnFailure) { - final boolean isMissingTableArg = callContext.getTableSemantics(0).isEmpty(); - if (isMissingTableArg) { + Optional<List<DataType>> error; + + error = validateTableArg(callContext, throwOnFailure); + if (error.isPresent()) { + return error; + } + + error = validateOpDescriptor(callContext, throwOnFailure); + if (error.isPresent()) { + return error; + } + + error = validateOpMapping(callContext, throwOnFailure); + if (error.isPresent()) { + return error; + } + + error = validateProducesFullDeletes(callContext, throwOnFailure); + if (error.isPresent()) { + return error; + } + + return Optional.of(callContext.getArgumentDataTypes()); + } + + private static Optional<List<DataType>> validateTableArg( + final CallContext callContext, final boolean throwOnFailure) { + if (callContext.getTableSemantics(ARG_TABLE).isEmpty()) { return callContext.fail( throwOnFailure, "First argument must be a table for TO_CHANGELOG."); } + return Optional.empty(); + } - final Optional<ColumnList> opDescriptor = callContext.getArgumentValue(1, ColumnList.class); - final boolean hasInvalidOpDescriptor = - opDescriptor.isPresent() && opDescriptor.get().getNames().size() != 1; - if (hasInvalidOpDescriptor) { + private static Optional<List<DataType>> validateOpDescriptor( + final CallContext callContext, final boolean throwOnFailure) { + final Optional<ColumnList> opDescriptor = + callContext.getArgumentValue(ARG_OP, ColumnList.class); + if (opDescriptor.isPresent() && opDescriptor.get().getNames().size() != 1) { return callContext.fail( throwOnFailure, "The descriptor for argument 'op' must contain exactly one column name."); } + return Optional.empty(); + } - final boolean hasMappingArgProvided = !callContext.isArgumentNull(2); - final boolean isMappingArgLiteral = callContext.isArgumentLiteral(2); + /** Validates op_mapping is a constant literal and that its keys are well-formed. */ + @SuppressWarnings("rawtypes") + private static Optional<List<DataType>> validateOpMapping( + final CallContext callContext, final boolean throwOnFailure) { + final boolean hasMappingArgProvided = !callContext.isArgumentNull(ARG_OP_MAPPING); + final boolean isMappingArgLiteral = callContext.isArgumentLiteral(ARG_OP_MAPPING); if (hasMappingArgProvided && !isMappingArgLiteral) { return callContext.fail( throwOnFailure, "The 'op_mapping' argument must be a constant MAP literal."); } - final Optional<Map> opMapping = callContext.getArgumentValue(2, Map.class); + final Optional<Map> opMapping = callContext.getArgumentValue(ARG_OP_MAPPING, Map.class); if (opMapping.isPresent()) { - final Optional<List<DataType>> validationError = - validateOpMappingKeys(callContext, opMapping.get(), throwOnFailure); - if (validationError.isPresent()) { - return validationError; - } + return validateOpMappingKeys(callContext, opMapping.get(), throwOnFailure); } - - final boolean hasProducesFullDeletesArg = !callContext.isArgumentNull(3); - if (hasProducesFullDeletesArg && !callContext.isArgumentLiteral(3)) { - return callContext.fail( - throwOnFailure, - "The 'produces_full_deletes' argument must be a constant BOOLEAN literal."); - } - - return Optional.of(callContext.getArgumentDataTypes()); + return Optional.empty(); } /** @@ -199,12 +197,64 @@ public final class ToChangelogTypeStrategy { return Optional.empty(); } - private static String resolveOpColumnName(final CallContext callContext) { - return callContext - .getArgumentValue(1, ColumnList.class) - .filter(cl -> !cl.getNames().isEmpty()) - .map(cl -> cl.getNames().get(0)) - .orElse(DEFAULT_OP_COLUMN_NAME); + @SuppressWarnings("rawtypes") + private static Optional<List<DataType>> validateProducesFullDeletes( + final CallContext callContext, final boolean throwOnFailure) { + final boolean hasArgProvided = !callContext.isArgumentNull(ARG_PRODUCES_FULL_DELETES); + if (hasArgProvided && !callContext.isArgumentLiteral(ARG_PRODUCES_FULL_DELETES)) { + return callContext.fail( + throwOnFailure, + "The 'produces_full_deletes' argument must be a constant BOOLEAN literal."); + } + final boolean producesFullDeletes = + callContext + .getArgumentValue(ARG_PRODUCES_FULL_DELETES, Boolean.class) + .orElse(false); + if (!producesFullDeletes) { + return Optional.empty(); + } + // The check against the input changelog mode lives in the function constructor since + // TableSemantics#changelogMode() returns empty here at type-inference time. The mapping + // check below only needs the literal op_mapping argument, so it lives here. + final Optional<Map> opMapping = callContext.getArgumentValue(ARG_OP_MAPPING, Map.class); + if (opMapping.isPresent() && !mapsDelete(opMapping.get())) { + return callContext.fail( + throwOnFailure, + "Invalid 'produces_full_deletes' for TO_CHANGELOG: the active 'op_mapping' " + + "does not map DELETE rows, so no DELETE rows are emitted. Remove " + + "the 'produces_full_deletes' argument or add a DELETE entry to " + + "'op_mapping'."); + } + return Optional.empty(); + } + + /** + * Returns {@code true} when at least one {@code op_mapping} key references {@code DELETE}. + * Keys may be comma-separated (e.g., {@code "INSERT, DELETE"}) per the user-facing contract. + */ + @SuppressWarnings("rawtypes") + private static boolean mapsDelete(final Map opMapping) { + for (final Object key : opMapping.keySet()) { + if (!(key instanceof String)) { + continue; + } + for (final String rawName : ((String) key).split(",")) { + if ("DELETE".equals(rawName.trim())) { + return true; + } + } + } + return false; + } + + private static List<Field> buildOutputFields( + final TableSemantics semantics, final String opColumnName) { + final List<Field> inputFields = DataType.getFields(semantics.dataType()); + final int[] outputIndices = ChangelogTypeStrategyUtils.computeOutputIndices(semantics); + final List<Field> outputFields = new ArrayList<>(); + outputFields.add(DataTypes.FIELD(opColumnName, DataTypes.STRING())); + Arrays.stream(outputIndices).mapToObj(inputFields::get).forEach(outputFields::add); + return outputFields; } private ToChangelogTypeStrategy() {} diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/ToChangelogInputTypeStrategyTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/ToChangelogInputTypeStrategyTest.java new file mode 100644 index 00000000000..e9b20a7a5ba --- /dev/null +++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/ToChangelogInputTypeStrategyTest.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.types.inference.strategies; + +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.inference.InputTypeStrategiesTestBase; +import org.apache.flink.table.types.inference.utils.TableSemanticsMock; +import org.apache.flink.types.ColumnList; + +import java.util.Map; +import java.util.stream.Stream; + +import static org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies.TO_CHANGELOG_INPUT_TYPE_STRATEGY; + +/** Tests for {@link ToChangelogTypeStrategy#INPUT_TYPE_STRATEGY}. */ +class ToChangelogInputTypeStrategyTest extends InputTypeStrategiesTestBase { + + private static final DataType TABLE_TYPE = + DataTypes.ROW( + DataTypes.FIELD("name", DataTypes.STRING()), + DataTypes.FIELD("score", DataTypes.BIGINT())); + + private static final DataType DESCRIPTOR_TYPE = DataTypes.DESCRIPTOR(); + + private static final DataType MAP_TYPE = DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()); + + private static final DataType BOOLEAN_TYPE = DataTypes.BOOLEAN(); + + @Override + protected Stream<TestSpec> testData() { + return Stream.of( + // Valid: produces_full_deletes=true with default op_mapping (includes DELETE) + TestSpec.forStrategy( + "Valid produces_full_deletes=true with default mapping", + TO_CHANGELOG_INPUT_TYPE_STRATEGY) + .calledWithArgumentTypes( + TABLE_TYPE, DESCRIPTOR_TYPE, MAP_TYPE, BOOLEAN_TYPE) + .calledWithTableSemanticsAt(0, new TableSemanticsMock(TABLE_TYPE)) + .calledWithLiteralAt(1, ColumnList.of("op")) + .calledWithLiteralAt(2, null) + .calledWithLiteralAt(3, true) + .expectArgumentTypes( + TABLE_TYPE, DESCRIPTOR_TYPE, MAP_TYPE, BOOLEAN_TYPE), + + // Valid: produces_full_deletes=true with op_mapping that includes DELETE + TestSpec.forStrategy( + "Valid produces_full_deletes=true with explicit DELETE mapping", + TO_CHANGELOG_INPUT_TYPE_STRATEGY) + .calledWithArgumentTypes( + TABLE_TYPE, DESCRIPTOR_TYPE, MAP_TYPE, BOOLEAN_TYPE) + .calledWithTableSemanticsAt(0, new TableSemanticsMock(TABLE_TYPE)) + .calledWithLiteralAt(1, ColumnList.of("op")) + .calledWithLiteralAt(2, Map.of("INSERT", "I", "DELETE", "D")) + .calledWithLiteralAt(3, true) + .expectArgumentTypes( + TABLE_TYPE, DESCRIPTOR_TYPE, MAP_TYPE, BOOLEAN_TYPE), + + // Valid: produces_full_deletes=true with comma-separated DELETE key + TestSpec.forStrategy( + "Valid produces_full_deletes=true with comma-separated DELETE", + TO_CHANGELOG_INPUT_TYPE_STRATEGY) + .calledWithArgumentTypes( + TABLE_TYPE, DESCRIPTOR_TYPE, MAP_TYPE, BOOLEAN_TYPE) + .calledWithTableSemanticsAt(0, new TableSemanticsMock(TABLE_TYPE)) + .calledWithLiteralAt(1, ColumnList.of("op")) + .calledWithLiteralAt(2, Map.of("INSERT, DELETE", "X")) + .calledWithLiteralAt(3, true) + .expectArgumentTypes( + TABLE_TYPE, DESCRIPTOR_TYPE, MAP_TYPE, BOOLEAN_TYPE), + + // Valid: produces_full_deletes=false with op_mapping that omits DELETE + TestSpec.forStrategy( + "Valid produces_full_deletes=false with no DELETE in mapping", + TO_CHANGELOG_INPUT_TYPE_STRATEGY) + .calledWithArgumentTypes( + TABLE_TYPE, DESCRIPTOR_TYPE, MAP_TYPE, BOOLEAN_TYPE) + .calledWithTableSemanticsAt(0, new TableSemanticsMock(TABLE_TYPE)) + .calledWithLiteralAt(1, ColumnList.of("op")) + .calledWithLiteralAt(2, Map.of("INSERT, UPDATE_AFTER", "X")) + .calledWithLiteralAt(3, false) + .expectArgumentTypes( + TABLE_TYPE, DESCRIPTOR_TYPE, MAP_TYPE, BOOLEAN_TYPE), + + // Error: produces_full_deletes=true with op_mapping that strips DELETE + TestSpec.forStrategy( + "produces_full_deletes=true rejected when op_mapping omits DELETE", + TO_CHANGELOG_INPUT_TYPE_STRATEGY) + .calledWithArgumentTypes( + TABLE_TYPE, DESCRIPTOR_TYPE, MAP_TYPE, BOOLEAN_TYPE) + .calledWithTableSemanticsAt(0, new TableSemanticsMock(TABLE_TYPE)) + .calledWithLiteralAt(1, ColumnList.of("op")) + .calledWithLiteralAt(2, Map.of("INSERT, UPDATE_AFTER", "X")) + .calledWithLiteralAt(3, true) + .expectErrorMessage( + "Invalid 'produces_full_deletes' for TO_CHANGELOG: the active " + + "'op_mapping' does not map DELETE rows"), + + // Error: multi-column descriptor + TestSpec.forStrategy( + "Descriptor with multiple columns", + TO_CHANGELOG_INPUT_TYPE_STRATEGY) + .calledWithArgumentTypes( + TABLE_TYPE, DESCRIPTOR_TYPE, MAP_TYPE, BOOLEAN_TYPE) + .calledWithTableSemanticsAt(0, new TableSemanticsMock(TABLE_TYPE)) + .calledWithLiteralAt(1, ColumnList.of("a", "b")) + .expectErrorMessage("must contain exactly one column name"), + + // Error: invalid RowKind in op_mapping key + TestSpec.forStrategy( + "Invalid RowKind in mapping key", TO_CHANGELOG_INPUT_TYPE_STRATEGY) + .calledWithArgumentTypes( + TABLE_TYPE, DESCRIPTOR_TYPE, MAP_TYPE, BOOLEAN_TYPE) + .calledWithTableSemanticsAt(0, new TableSemanticsMock(TABLE_TYPE)) + .calledWithLiteralAt(1, ColumnList.of("op")) + .calledWithLiteralAt(2, Map.of("INVALID_KIND", "X")) + .expectErrorMessage("Unknown change operation: 'INVALID_KIND'"), + + // Error: duplicate RowKind across entries + TestSpec.forStrategy( + "Duplicate RowKind in mapping keys", + TO_CHANGELOG_INPUT_TYPE_STRATEGY) + .calledWithArgumentTypes( + TABLE_TYPE, DESCRIPTOR_TYPE, MAP_TYPE, BOOLEAN_TYPE) + .calledWithTableSemanticsAt(0, new TableSemanticsMock(TABLE_TYPE)) + .calledWithLiteralAt(1, ColumnList.of("op")) + .calledWithLiteralAt(2, Map.of("INSERT, DELETE", "A", "DELETE", "B")) + .expectErrorMessage("Duplicate change operation: 'DELETE'")); + } +} diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogSemanticTests.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogSemanticTests.java index 762e08b3538..b3376b3a346 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogSemanticTests.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogSemanticTests.java @@ -57,7 +57,6 @@ public class ToChangelogSemanticTests extends SemanticTestBase { ToChangelogTestPrograms.OP_MAPPING_REFERENCES_UNSUPPORTED_KIND, ToChangelogTestPrograms.DUPLICATE_ROW_KIND, ToChangelogTestPrograms.PRODUCES_FULL_DELETES_ON_APPEND_ONLY_INPUT, - ToChangelogTestPrograms.PRODUCES_FULL_DELETES_WITHOUT_DELETE_IN_OP_MAPPING, ToChangelogTestPrograms.ROW_SEM_PARTIAL_DELETES, ToChangelogTestPrograms.ROW_SEM_FORCE_FULL_DELETES, ToChangelogTestPrograms.SET_SEM_PARTIAL_DELETES, diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java index f04d30d1d99..5126014b9a3 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java @@ -613,25 +613,6 @@ public class ToChangelogTestPrograms { "the input table only produces [INSERT] and never emits DELETE rows") .build(); - public static final TableTestProgram PRODUCES_FULL_DELETES_WITHOUT_DELETE_IN_OP_MAPPING = - TableTestProgram.of( - "to-changelog-produces-full-deletes-without-delete-in-op-mapping", - "fails when produces_full_deletes=true but the active op_mapping strips DELETE") - .setupTableSource( - SourceTestStep.newBuilder("t") - .addSchema( - "name STRING PRIMARY KEY NOT ENFORCED", "score BIGINT") - .addMode(ChangelogMode.all()) - .producedValues(Row.ofKind(RowKind.INSERT, "Alice", 10L)) - .build()) - .runFailingSql( - "SELECT * FROM TO_CHANGELOG(" - + "input => TABLE t, " - + "op_mapping => MAP['INSERT, UPDATE_AFTER', 'X'], " - + "produces_full_deletes => true)", - ValidationException.class, - "the active 'op_mapping' does not map DELETE rows") - .build(); // -------------------------------------------------------------------------------------------- // Row semantics x delete handling matrix diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/ToChangelogFunction.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/ToChangelogFunction.java index 0110475ef8d..d8874a2be16 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/ToChangelogFunction.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/ToChangelogFunction.java @@ -88,7 +88,7 @@ public class ToChangelogFunction extends BuiltInProcessTableFunction<RowData> { } final boolean producesFullDeletesArg = callContext.getArgumentValue(3, Boolean.class).orElse(false); - validateProducesFullDeletes(producesFullDeletesArg, this.rawOpMap, tableSemantics); + validateProducesFullDeletes(producesFullDeletesArg, tableSemantics); this.outputIndices = ChangelogTypeStrategyUtils.computeOutputIndices(tableSemantics); this.producesFullDelete = resolveProducesFullDelete(producesFullDeletesArg, tableSemantics); @@ -172,18 +172,15 @@ public class ToChangelogFunction extends BuiltInProcessTableFunction<RowData> { } /** - * Rejects {@code produces_full_deletes=true} when the input changelog cannot produce DELETE - * rows (either the input mode does not contain DELETE, or the active {@code op_mapping} strips - * it). The parameter is then dead and likely a user mistake. + * Rejects {@code produces_full_deletes=true} when the input changelog never emits DELETE rows. * * <p>Lives here rather than in the input type strategy because {@link * TableSemantics#changelogMode()} returns empty during type inference and is only populated at - * specialization time, which is when this constructor runs. + * specialization time. The complementary check against the literal {@code op_mapping} + * argument runs earlier in {@code ToChangelogTypeStrategy}. */ private static void validateProducesFullDeletes( - final boolean producesFullDeletesArg, - final Map<RowKind, String> mapping, - final TableSemantics tableSemantics) { + final boolean producesFullDeletesArg, final TableSemantics tableSemantics) { if (!producesFullDeletesArg) { return; } @@ -199,13 +196,6 @@ public class ToChangelogFunction extends BuiltInProcessTableFunction<RowData> { + "'produces_full_deletes' argument.", inputMode.getContainedKinds())); } - if (!mapping.containsKey(RowKind.DELETE)) { - throw new ValidationException( - "Invalid 'produces_full_deletes' for TO_CHANGELOG: the active 'op_mapping' " - + "does not map DELETE rows, so no DELETE rows are emitted. Remove " - + "the 'produces_full_deletes' argument or add a DELETE entry to " - + "'op_mapping'."); - } } public void eval(
