This is an automated email from the ASF dual-hosted git repository.

gustavodemorais pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit e400c2c221368e98e80d5589cacbcb0b1585ba66
Author: Ramin Gharib <[email protected]>
AuthorDate: Thu May 28 15:25:50 2026 +0200

    [FLINK-39636][table] Remove unnecessary validation in TO_CHANGELOG

    Drop two checks that protected against user mistakes the engine can already handle silently:

    - op_mapping entries that reference change operations the input cannot produce no longer throw. The extra entries are unreachable at runtime, not harmful.
    - produces_full_deletes=true against an append-only input no longer throws. The argument is a no-op there, equivalent to omitting it.

    The remaining check covers the only case where the argument changes semantics in a way the function cannot honor: produces_full_deletes=false with no upsert key and no PARTITION BY, leaving nothing to preserve when nulling non-key columns.

    Rename validateProducesFullDeletes to validateProducesPartialDeletes to reflect the surviving case. Clean up the now-redundant test cases, javadocs, and docs sections.
---
 .../docs/sql/reference/queries/changelog.md        | 32 +++++-----
 .../apache/flink/table/api/PartitionedTable.java   |  4 +-
 .../java/org/apache/flink/table/api/Table.java     |  4 +-
 .../flink/table/functions/TableSemantics.java      | 29 +++++----
 .../strategies/ToChangelogTypeStrategy.java        | 39 -------------
 .../ToChangelogInputTypeStrategyTest.java          | 14 -----
 .../stream/StreamExecProcessTableFunction.java     |  8 +--
 .../exec/stream/ToChangelogSemanticTests.java      |  2 -
 .../nodes/exec/stream/ToChangelogTestPrograms.java | 26 ---------
 .../runtime/functions/ptf/ToChangelogFunction.java | 68 ++++------------------
 10 files changed, 48 insertions(+), 178 deletions(-)

diff --git a/docs/content/docs/sql/reference/queries/changelog.md b/docs/content/docs/sql/reference/queries/changelog.md
index c91107b2df2..641bb7245f7 100644
--- a/docs/content/docs/sql/reference/queries/changelog.md
+++ b/docs/content/docs/sql/reference/queries/changelog.md
@@ -308,18 +308,18 @@ SELECT * FROM TO_CHANGELOG(
 | `input` | Yes | The input table. With `PARTITION BY`, rows with the same key are co-located and run in the same operator instance. Without `PARTITION BY`, each row is processed independently. Accepts insert-only, retract, and upsert tables. For upsert tables, the provided `PARTITION BY` key should match or be a subset of the upsert key of the subquery. |
 | `op` | No | A `DESCRIPTOR` with a single column name for the operation code column. Defaults to `op`. |
 | `op_mapping` | No | A `MAP<STRING, STRING>` mapping change operation names to custom output codes. Keys can contain comma-separated names to map multiple operations to the same code (e.g., `'INSERT, UPDATE_AFTER'`). When provided, only mapped operations are forwarded - unmapped events are dropped. Each change operation may appear at most once across all entries. |
-| `produces_full_deletes` | No | A `BOOLEAN` literal that controls how DELETE rows are emitted. When `true` (default), DELETE rows carry all columns, the full image. When `false`, only the identifying key columns are preserved and the rest are nulled. See [Full vs partial deletes](#full-vs-partial-deletes) for more details. |
+| `produces_full_deletes` | No | A `BOOLEAN` literal that controls how DELETE rows are emitted. When `true` (default), DELETE rows carry all columns, the full image. When `false`, only the identifying key columns are preserved and the rest are nulled. See [Full vs partial deletes](#full-vs-partial-deletes) for more details. |
 
 #### Default op_mapping
 
 When `op_mapping` is omitted, all four change operations are mapped to their standard names:
 
 | Change Operation | Output value |
-|:----------------|:------------------|
-| INSERT | `'INSERT'` |
-| UPDATE_BEFORE | `'UPDATE_BEFORE'` |
-| UPDATE_AFTER | `'UPDATE_AFTER'` |
-| DELETE | `'DELETE'` |
+|:-----------------|:------------------|
+| INSERT | `'INSERT'` |
+| UPDATE_BEFORE | `'UPDATE_BEFORE'` |
+| UPDATE_AFTER | `'UPDATE_AFTER'` |
+| DELETE | `'DELETE'` |
 
 ### Output Schema
 
@@ -406,26 +406,22 @@ An **upsert key** is a column or set of columns that uniquely identifies a row a
 The planner derives the upsert key from the input table:
 
 * A declared `PRIMARY KEY` on the source table when reading directly.
-* The grouping columns of an upstream `GROUP BY <key>`.
+* The grouping columns of an upstream `GROUP BY <key>` or `PARTITION BY <key>`.
 * The keys propagated by operators that preserve them (e.g. lookup joins, calc-projections that keep the key columns).
 
-When no upsert key can be derived (e.g. a plain append-only source with no key constraint and no grouping upstream), the input has no row identity and downstream operators must treat it as append-only or fall back to retract semantics.
-
-`TO_CHANGELOG` consumes the upsert key to decide which columns to preserve when emitting partial DELETE rows. See [Full vs partial deletes](#full-vs-partial-deletes) below.
-
 #### Full vs partial deletes
 
 The `produces_full_deletes` argument controls how DELETE rows are emitted and what the planner requires from the input. The matrix below shows each combination with `PARTITION BY` (set semantics) and without (row semantics). When `false`, the function relies on the input table's [upsert key](#upsert-key) to decide which columns to preserve.
 
 ##### `produces_full_deletes => true` (default)
 
-The planner requires fully-populated DELETE rows on the input. For upsert sources that emit key-only deletes, a `ChangelogNormalize` operator is inserted upstream to materialize the full pre-image from state. For sources that already emit a full pre-image (e.g. retract), the flag is a no-op. The function then passes the input row through unchanged on DELETE.
+The planner requires fully-populated DELETE rows on the input. For upsert sources that emit key-only deletes, a `ChangelogNormalize` operator is inserted upstream to calculate the full pre-image from state. For sources that already emit a full pre-image (e.g. retract), the flag is a no-op. The function then passes the input row through unchanged on DELETE.
 
 **Row semantics** (no `PARTITION BY`):
 
 ```sql
--- Upsert source: -D[id:5] (key-only).
--- ChangelogNormalize materializes the full pre-image from state.
+-- Upsert source: -D[id:5, name: null] (key-only).
+-- ChangelogNormalize calculates the full pre-image from state.
 -- Output: +I[op:'DELETE', id:5, name:'Alice']
 SELECT * FROM TO_CHANGELOG(input => TABLE upsert_source)
 
@@ -438,8 +434,8 @@ SELECT * FROM TO_CHANGELOG(input => TABLE retract_source)
 **Set semantics** (`PARTITION BY`):
 
 ```sql
--- Upsert source: -D[id:5] (key-only).
--- ChangelogNormalize materializes the full pre-image from state.
+-- Upsert source: -D[id:5, name: null] (key-only).
+-- ChangelogNormalize calculates the full pre-image from state.
 -- Output: +I[id:5, op:'DELETE', name:'Alice']
 SELECT * FROM TO_CHANGELOG(input => TABLE upsert_source PARTITION BY id)
 
@@ -451,7 +447,7 @@ SELECT * FROM TO_CHANGELOG(input => TABLE retract_source PARTITION BY id)
 
 ##### `produces_full_deletes => false`
 
-The planner skips `ChangelogNormalize` and the function emits partial DELETE rows. This avoids the stateful normalization operator for upsert sources (e.g. Kafka compacted topics) where the full pre-image is not needed downstream. Requires an [upsert key](#upsert-key) to be present for the input table (row semantics) or `PARTITION BY` (set semantics); otherwise the call is rejected with a validation error.
+The planner skips `ChangelogNormalize` and the function emits partial DELETE rows. This avoids the stateful normalization operator for upsert sources (e.g. Kafka compacted topics) where the full pre-image is not needed downstream. This requires an [upsert key](#upsert-key) to be present for the input table (row semantics) or `PARTITION BY` (set semantics); otherwise the call is rejected with a validation error.
 
 **Row semantics** (no `PARTITION BY`): the function preserves the planner-derived upsert key columns on DELETE rows and nulls the rest. The upsert key is typically a declared `PRIMARY KEY` when directly reading from a source or the key provided in a `GROUP BY <key>`.
 
@@ -465,7 +461,7 @@ SELECT * FROM TO_CHANGELOG(input => TABLE upsert_source, produces_full_deletes =
 SELECT * FROM TO_CHANGELOG(input => TABLE retract_source, produces_full_deletes => false)
 ```
 
-**Set semantics** (`PARTITION BY`): the function preserves the partition key and nulls every non-partition-key column on DELETE rows. The key used as the partition-key column should be the unique key that will be used as the record identifier. This matches the shape expected by upsert sinks and Kafka compacted topics.
+**Set semantics** (`PARTITION BY`): the function preserves the partition key and nulls every non-partition-key column on DELETE rows. The key used as the partition-key column should be the unique key that will be used as the record identifier. This matches the shape expected by upsert sinks such as Kafka compacted topics.
 
 ```sql
 -- Upsert source: -D[id:5] (key-only).
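The `produces_full_deletes => false` rules in the docs diff above (keep the `PARTITION BY` columns under set semantics, keep the upsert key under row semantics, reject the call when neither exists), combined with the stable tie-break that the `TableSemantics` javadoc in this commit recommends when several upsert-key candidates exist, can be sketched in plain Java. This is a minimal, hypothetical illustration and not Flink's `ToChangelogFunction`. Rows are modeled as `Object[]`, the class and method names (`PartialDeleteSketch`, `chooseUpsertKey`, `preservedColumns`, `toPartialDelete`) are invented, and `IllegalArgumentException` stands in for Flink's `ValidationException`.

```java
import java.util.Arrays;
import java.util.List;

/**
 * Hypothetical sketch of partial-DELETE shaping for produces_full_deletes => false.
 * Not Flink's ToChangelogFunction: rows are plain Object[] and all names are invented.
 */
final class PartialDeleteSketch {

    private PartialDeleteSketch() {}

    /**
     * Picks one upsert-key candidate deterministically: fewest columns first, ties broken by
     * comparing the sorted column indices lexicographically. Returns null when there is none.
     */
    static int[] chooseUpsertKey(List<int[]> candidates) {
        int[] best = null;
        for (int[] candidate : candidates) {
            // Sort a copy so the result does not depend on how each candidate was ordered.
            int[] sorted = candidate.clone();
            Arrays.sort(sorted);
            if (best == null
                    || sorted.length < best.length
                    || (sorted.length == best.length && Arrays.compare(sorted, best) < 0)) {
                best = sorted;
            }
        }
        return best;
    }

    /**
     * Columns kept on a partial DELETE row. With PARTITION BY (set semantics) the partition
     * columns are kept; otherwise the chosen upsert key (row semantics) is kept. With neither,
     * the call is rejected, mirroring the one check this commit keeps.
     */
    static int[] preservedColumns(int[] partitionByColumns, List<int[]> upsertKeys) {
        if (partitionByColumns.length > 0) {
            return partitionByColumns;
        }
        int[] upsertKey = chooseUpsertKey(upsertKeys);
        if (upsertKey == null) {
            // Stand-in for Flink's ValidationException.
            throw new IllegalArgumentException(
                    "produces_full_deletes => false requires an upsert key or PARTITION BY");
        }
        return upsertKey;
    }

    /** Copies a DELETE row, keeping only the preserved columns and nulling all others. */
    static Object[] toPartialDelete(Object[] row, int[] preserved) {
        Object[] partial = new Object[row.length];
        for (int index : preserved) {
            partial[index] = row[index];
        }
        return partial;
    }
}
```

For example, `toPartialDelete(new Object[] {5, "Alice"}, preservedColumns(new int[0], List.of(new int[] {0})))` returns `[5, null]`, the `-D[id:5, name: null]` shape used in the docs. Sorting each candidate before comparing keeps the choice independent of the order of the returned candidate list, which the javadoc says is not part of the contract.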
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PartitionedTable.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PartitionedTable.java
index 401317d125d..ecf58fbe9b3 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PartitionedTable.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PartitionedTable.java
@@ -207,9 +207,7 @@ public interface PartitionedTable {
      *
      * // Opt out of full-delete semantics. When `true` (default), DELETE rows carry the full
      * // pre-image. When `false`, only the identifying key columns are preserved and the rest
-     * // are nulled. See [Full vs partial deletes](
-     * // https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/changelog/#full-vs-partial-deletes)
-     * // for more details.
+     * // are nulled. See in documentation for more details.
      * Table result = table
      *     .partitionBy($("id"))
      *     .toChangelog(lit(false).asArgument("produces_full_deletes"));
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java
index ab4a617a2df..9d8183d7e4d 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java
@@ -1462,9 +1462,7 @@ public interface Table extends Explainable<Table>, Executable {
      *
      * // Opt out of full-delete semantics. When `true` (default), DELETE rows carry the full
      * // pre-image. When `false`, only the identifying key columns are preserved and the rest
-     * // are nulled. See [Full vs partial deletes](
-     * // https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/changelog/#full-vs-partial-deletes)
-     * // for more details.
+     * // are nulled. See in documentation for more details.
      * Table result = table.toChangelog(
      *     lit(false).asArgument("produces_full_deletes")
      * );
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/TableSemantics.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/TableSemantics.java
index 3d17401c5ef..1289d054ff6 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/TableSemantics.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/TableSemantics.java
@@ -133,20 +133,29 @@ public interface TableSemantics {
      * Upsert key candidates derived from the passed table's metadata.
      *
      * <p>Returns a list of 0-based column index arrays that uniquely identify a row for upsert
-     * semantics. This is distinct from {@link #partitionByColumns()}: partition keys describe
-     * distribution and co-location, upsert keys describe row identity. Useful for functions that
-     * need to emit key-only deletes, match UPDATE_BEFORE / UPDATE_AFTER pairs, or want to have a
-     * unique identifier to interact with state.
+     * semantics. Useful for functions that need to emit key-only deletes, match UPDATE_BEFORE /
+     * UPDATE_AFTER pairs, or use a stable identifier to interact with state.
+     *
+     * <p>The upsert key describes row identity and is distinct from {@link #partitionByColumns()},
+     * which describes distribution and co-location. They are independent and frequently disagree:
+     *
+     * <pre>{@code
+     * -- Source declares PRIMARY KEY (id); the call partitions by region.
+     * -- partitionByColumns() = [region] (chosen by the caller)
+     * -- upsertKeyColumns() = [[id]] (derived from the source's PK)
+     * TO_CHANGELOG(input => TABLE source PARTITION BY region)
+     * }</pre>
      *
      * <p>Returns an empty list when no upsert key is derivable, or when the planner has not yet
      * computed metadata (during type inference).
      *
-     * <p>When the planner derives multiple candidate upsert keys for the same input (e.g., a table
-     * with several primary key constraints), all of them are returned. Picking which candidate to
-     * use is the function's responsibility, and the choice must be stable across releases to keep
-     * PTF state consistent after job restarts and upgrades. The order of the returned list is not
-     * part of the contract; PTF authors should not depend on it. A typical choice is the smallest
-     * candidate by cardinality, with ties broken by the column indices in ascending order.
+     * <p>The planner may derive more than one candidate for the same input. For example, an inner
+     * join of two tables each carrying their own primary key produces a result with both keys as
+     * separate candidates. Picking which candidate to use is the function's responsibility, and the
+     * choice must be stable across releases so PTF state stays consistent after job restarts and
+     * upgrades. The order of the returned list is not part of the contract; PTF authors should not
+     * depend on it. A typical stable choice is the smallest candidate by cardinality, with ties
+     * broken by the column indices in ascending order.
      *
      * @return Candidate upsert keys of the passed table, or an empty list if none.
      */
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToChangelogTypeStrategy.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToChangelogTypeStrategy.java
index 4c27aae4723..1434b221077 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToChangelogTypeStrategy.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToChangelogTypeStrategy.java
@@ -113,11 +113,6 @@ public final class ToChangelogTypeStrategy {
             return error;
         }
 
-        error = validateProducesFullDeletes(callContext, throwOnFailure);
-        if (error.isPresent()) {
-            return error;
-        }
-
         return Optional.of(callContext.getArgumentDataTypes());
     }
 
@@ -198,40 +193,6 @@ public final class ToChangelogTypeStrategy {
         return Optional.empty();
     }
 
-    @SuppressWarnings("rawtypes")
-    private static Optional<List<DataType>> validateProducesFullDeletes(
-            final CallContext callContext, final boolean throwOnFailure) {
-        final boolean isExplicit = !callContext.isArgumentNull(ARG_PRODUCES_FULL_DELETES);
-        if (!isExplicit) {
-            return Optional.empty();
-        }
-        if (!callContext.isArgumentLiteral(ARG_PRODUCES_FULL_DELETES)) {
-            return callContext.fail(
-                    throwOnFailure,
-                    "The 'produces_full_deletes' argument must be a constant BOOLEAN literal.");
-        }
-        final boolean producesFullDeletes =
-                callContext.getArgumentValue(ARG_PRODUCES_FULL_DELETES, Boolean.class).orElse(true);
-        if (!producesFullDeletes) {
-            return Optional.empty();
-        }
-        // The check against the input changelog mode lives in the function constructor since
-        // TableSemantics#changelogMode() returns empty here at type-inference time. The mapping
-        // check below only needs the literal op_mapping argument, so it lives here. Only runs
Only runs - // when the user explicitly set produces_full_deletes=true; the default true is not - // validated since it is a safe no-op for any input. - final Optional<Map> opMapping = callContext.getArgumentValue(ARG_OP_MAPPING, Map.class); - if (opMapping.isPresent() && !mapsDelete((Map<String, String>) opMapping.get())) { - return callContext.fail( - throwOnFailure, - "Invalid 'produces_full_deletes' for TO_CHANGELOG: the active 'op_mapping' " - + "does not map DELETE rows, so no DELETE rows are emitted. Remove " - + "the 'produces_full_deletes' argument or add a DELETE entry to " - + "'op_mapping'."); - } - return Optional.empty(); - } - /** * Returns {@code true} when at least one {@code op_mapping} key references {@code DELETE}. Keys * may be comma-separated (e.g., {@code "INSERT, DELETE"}) per the user-facing contract. diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/ToChangelogInputTypeStrategyTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/ToChangelogInputTypeStrategyTest.java index 6b5773949db..36607885bd8 100644 --- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/ToChangelogInputTypeStrategyTest.java +++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/ToChangelogInputTypeStrategyTest.java @@ -98,20 +98,6 @@ class ToChangelogInputTypeStrategyTest extends InputTypeStrategiesTestBase { .calledWithLiteralAt(ARG_PRODUCES_FULL_DELETES, false) .expectArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE, MAP_TYPE, BOOLEAN_TYPE), - // Error: produces_full_deletes=true with op_mapping that strips DELETE - TestSpec.forStrategy( - "produces_full_deletes=true rejected when op_mapping omits DELETE", - TO_CHANGELOG_INPUT_TYPE_STRATEGY) - .calledWithArgumentTypes( - TABLE_TYPE, DESCRIPTOR_TYPE, MAP_TYPE, BOOLEAN_TYPE) - .calledWithTableSemanticsAt(ARG_TABLE, new 
TableSemanticsMock(TABLE_TYPE)) - .calledWithLiteralAt(ARG_OP, ColumnList.of("op")) - .calledWithLiteralAt(ARG_OP_MAPPING, Map.of("INSERT, UPDATE_AFTER", "X")) - .calledWithLiteralAt(ARG_PRODUCES_FULL_DELETES, true) - .expectErrorMessage( - "Invalid 'produces_full_deletes' for TO_CHANGELOG: the active " - + "'op_mapping' does not map DELETE rows"), - // Error: multi-column descriptor for `op` TestSpec.forStrategy( "Descriptor with multiple columns", diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecProcessTableFunction.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecProcessTableFunction.java index e0cf5f09187..8c00acb8e57 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecProcessTableFunction.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecProcessTableFunction.java @@ -314,9 +314,9 @@ public class StreamExecProcessTableFunction extends ExecNodeBase<RowData> private RuntimeTableSemantics createRuntimeTableSemantics( StaticArgument tableArg, RexTableArgCall tableArgCall, List<Integer> inputTimeColumns) { + final int inputIndex = tableArgCall.getInputIndex(); final RuntimeChangelogMode consumedChangelogMode = - RuntimeChangelogMode.serialize( - inputChangelogModes.get(tableArgCall.getInputIndex())); + RuntimeChangelogMode.serialize(inputChangelogModes.get(inputIndex)); final DataType dataType; if (tableArg.getDataType().isPresent()) { dataType = tableArg.getDataType().get(); @@ -324,9 +324,7 @@ public class StreamExecProcessTableFunction extends ExecNodeBase<RowData> dataType = DataTypes.of(FlinkTypeFactory.toLogicalRowType(tableArgCall.type)); } - final int timeColumn = inputTimeColumns.get(tableArgCall.getInputIndex()); - - final int inputIndex = tableArgCall.getInputIndex(); + final 
int timeColumn = inputTimeColumns.get(inputIndex); final List<int[]> upsertKeys = inputIndex < inputUpsertKeys.size() ? inputUpsertKeys.get(inputIndex) : List.of(); return new RuntimeTableSemantics( diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogSemanticTests.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogSemanticTests.java index 41c53979067..c35394c7694 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogSemanticTests.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogSemanticTests.java @@ -54,9 +54,7 @@ public class ToChangelogSemanticTests extends SemanticTestBase { ToChangelogTestPrograms.DELETION_FLAG, ToChangelogTestPrograms.INVALID_DESCRIPTOR, ToChangelogTestPrograms.INVALID_OP_MAPPING, - ToChangelogTestPrograms.OP_MAPPING_REFERENCES_UNSUPPORTED_KIND, ToChangelogTestPrograms.DUPLICATE_ROW_KIND, - ToChangelogTestPrograms.INVALID_PRODUCES_FULL_DELETES_FOR_APPEND_ONLY, ToChangelogTestPrograms.RETRACT_PRODUCES_PARTIAL_DELETES, ToChangelogTestPrograms.UPSERT_PRODUCES_FULL_DELETES, ToChangelogTestPrograms.UPSERT_PRODUCES_PARTIAL_DELETES, diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java index 909e2f4d1ef..71731109eda 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java @@ -573,19 +573,6 @@ public class ToChangelogTestPrograms { "Unknown 
change operation: 'INVALID_KIND'") .build(); - public static final TableTestProgram OP_MAPPING_REFERENCES_UNSUPPORTED_KIND = - TableTestProgram.of( - "to-changelog-op-mapping-references-unsupported-kind", - "fails when op_mapping references a change operation the input cannot produce") - .setupTableSource(SIMPLE_SOURCE) - .runFailingSql( - "SELECT * FROM TO_CHANGELOG(" - + "input => TABLE t, " - + "op_mapping => MAP['INSERT', 'I', 'DELETE', 'D'])", - ValidationException.class, - "the input table only produces [INSERT] and does not produce [DELETE]") - .build(); - public static final TableTestProgram DUPLICATE_ROW_KIND = TableTestProgram.of( "to-changelog-duplicate-rowkind", @@ -599,19 +586,6 @@ public class ToChangelogTestPrograms { "Duplicate change operation: 'DELETE'") .build(); - public static final TableTestProgram INVALID_PRODUCES_FULL_DELETES_FOR_APPEND_ONLY = - TableTestProgram.of( - "to-changelog-invalid-produces-full-deletes-for-append-only", - "fails when produces_full_deletes=true on an input that never emits DELETE rows") - .setupTableSource(SIMPLE_SOURCE) - .runFailingSql( - "SELECT * FROM TO_CHANGELOG(" - + "input => TABLE t, " - + "produces_full_deletes => true)", - ValidationException.class, - "the input table only produces [INSERT] and never emits DELETE rows") - .build(); - // -------------------------------------------------------------------------------------------- // Full vs partial deletes matrix (input kind x PARTITION BY x produces_full_deletes) // -------------------------------------------------------------------------------------------- diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/ToChangelogFunction.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/ToChangelogFunction.java index 66162e205b9..b2e88dee1dd 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/ToChangelogFunction.java 
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/ToChangelogFunction.java
@@ -20,7 +20,6 @@ package org.apache.flink.table.runtime.functions.ptf;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.api.ValidationException;
-import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.MapData;
 import org.apache.flink.table.data.RowData;
@@ -46,7 +45,6 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.stream.Collectors;
 
 import static org.apache.flink.table.types.inference.strategies.ToChangelogTypeStrategy.ARG_OP_MAPPING;
 import static org.apache.flink.table.types.inference.strategies.ToChangelogTypeStrategy.ARG_PRODUCES_FULL_DELETES;
@@ -96,17 +94,14 @@ public class ToChangelogFunction extends BuiltInProcessTableFunction<RowData> {
         final Map<String, String> opMapping =
                 callContext.getArgumentValue(ARG_OP_MAPPING, Map.class).orElse(null);
         this.rawOpMap = buildOpMap(opMapping);
-        if (opMapping != null) {
-            validateOpMap(this.rawOpMap, tableSemantics);
-        }
-        final boolean producesFullDeletesArg =
+
+        this.producesFullDelete =
                 callContext.getArgumentValue(ARG_PRODUCES_FULL_DELETES, Boolean.class).orElse(true);
         final boolean isExplicit = !callContext.isArgumentNull(ARG_PRODUCES_FULL_DELETES);
-        validateProducesFullDeletes(producesFullDeletesArg, isExplicit, tableSemantics);
+        validateProducesPartialDeletes(producesFullDelete, isExplicit, tableSemantics);
 
         this.outputIndices = ChangelogTypeStrategyUtils.computeOutputIndices(tableSemantics);
         this.inputRowType = (RowType) tableSemantics.dataType().getLogicalType();
-        this.producesFullDelete = producesFullDeletesArg;
         this.upsertKeyColumn =
                 computeUpsertKeyColumn(
                         this.outputIndices,
@@ -164,70 +159,27 @@ public class ToChangelogFunction extends BuiltInProcessTableFunction<RowData> {
         return result;
     }
 
-    /**
-     * Rejects user-provided mappings that reference change operations the input changelog cannot
-     * produce. Without this check the extra entries are dead code: the corresponding rows never
-     * arrive. E.g., if the input is INSERT-only, then UPDATE_BEFORE and DELETE mappings are
-     * ignored, which is likely a user mistake.
-     *
-     * <p>Lives here rather than in the input type strategy because {@link
-     * TableSemantics#changelogMode()} returns empty during type inference and is only populated at
-     * specialization time, which is when this constructor runs.
-     */
-    private static void validateOpMap(
-            final Map<RowKind, String> mapping, final TableSemantics tableSemantics) {
-        final ChangelogMode inputMode = tableSemantics.changelogMode().orElse(null);
-        if (inputMode == null) {
-            return;
-        }
-        final List<RowKind> unsupported =
-                mapping.keySet().stream()
-                        .filter(kind -> !inputMode.contains(kind))
-                        .collect(Collectors.toList());
-        if (!unsupported.isEmpty()) {
-            throw new ValidationException(
-                    String.format(
-                            "Invalid 'op_mapping' for TO_CHANGELOG: the input table only produces "
-                                    + "%s and does not produce %s. Remove those entries from the "
-                                    + "mapping.",
-                            inputMode.getContainedKinds(), unsupported));
-        }
-    }
-
     /**
      * Validates an explicit {@code produces_full_deletes} argument against the input.
      *
-     * <p>For {@code produces_full_deletes=true}, the input changelog must emit DELETE rows;
-     * otherwise the parameter is dead. For {@code produces_full_deletes=false}, the input must
-     * declare an upsert key or the call must use {@code PARTITION BY}; otherwise the function has
-     * no identifying columns to preserve when nulling the rest.
+     * <p>For {@code produces_full_deletes=false}, the input must declare an upsert key or the call
+     * must use {@code PARTITION BY}; otherwise the function has no identifying columns to preserve
+     * when nulling the rest.
      *
-     * <p>No validation runs when the argument is absent, since the default (full deletes) is safe
-     * for any input.
+     * <p>No validation runs when the argument is absent or is set to true (default).
      *
      * <p>Lives here rather than in the input type strategy because {@link
      * TableSemantics#changelogMode()} and {@link TableSemantics#upsertKeyColumns()} are only
      * populated at specialization time.
      */
-    private static void validateProducesFullDeletes(
+    private static void validateProducesPartialDeletes(
             final boolean producesFullDeletesArg,
             final boolean isExplicit,
             final TableSemantics tableSemantics) {
-        if (!isExplicit) {
-            return;
-        }
-        if (producesFullDeletesArg) {
-            final ChangelogMode inputMode = tableSemantics.changelogMode().orElse(null);
-            if (inputMode != null && !inputMode.contains(RowKind.DELETE)) {
-                throw new ValidationException(
-                        String.format(
-                                "Invalid 'produces_full_deletes' for TO_CHANGELOG: the input "
-                                        + "table only produces %s and never emits DELETE rows. "
-                                        + "Remove the 'produces_full_deletes' argument.",
-                                inputMode.getContainedKinds()));
-            }
+        if (!isExplicit || producesFullDeletesArg) {
             return;
         }
+
         final boolean hasPartitionBy = tableSemantics.partitionByColumns().length > 0;
         final boolean hasUpsertKey = !tableSemantics.upsertKeyColumns().isEmpty();
         if (!hasPartitionBy && !hasUpsertKey) {
