This is an automated email from the ASF dual-hosted git repository. gustavodemorais pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 32f9e60eb9d7c33bac4fb3f91dc0200f03507c0c Author: Ramin Gharib <[email protected]> AuthorDate: Tue May 26 12:59:44 2026 +0200 [FLINK-39636][table] Emit partial DELETE rows in TO_CHANGELOG via upsert key Wires TO_CHANGELOG to consume the upsert key surfaced by TableSemantics so row-semantic partial DELETE rows preserve identifying columns without requiring PARTITION BY. Also flips the produces_full_deletes default to true so the safe full-pre-image behavior is the default, and false is the explicit opt-in to partial deletes. --- .../docs/sql/reference/queries/changelog.md | 91 +++++++++---- flink-python/pyflink/table/table.py | 19 +-- .../apache/flink/table/api/PartitionedTable.java | 10 +- .../java/org/apache/flink/table/api/Table.java | 10 +- .../functions/BuiltInFunctionDefinitions.java | 20 ++- .../strategies/ToChangelogTypeStrategy.java | 54 ++++---- .../ToChangelogInputTypeStrategyTest.java | 75 +++++------ .../inference/OperatorBindingCallContext.java | 7 +- .../exec/stream/ToChangelogSemanticTests.java | 14 +- .../nodes/exec/stream/ToChangelogTestPrograms.java | 81 ++++++------ .../runtime/functions/ptf/ToChangelogFunction.java | 144 +++++++++++++++------ 11 files changed, 321 insertions(+), 204 deletions(-) diff --git a/docs/content/docs/sql/reference/queries/changelog.md b/docs/content/docs/sql/reference/queries/changelog.md index 7a1598ee850..c91107b2df2 100644 --- a/docs/content/docs/sql/reference/queries/changelog.md +++ b/docs/content/docs/sql/reference/queries/changelog.md @@ -308,7 +308,7 @@ SELECT * FROM TO_CHANGELOG( | `input` | Yes | The input table. With `PARTITION BY`, rows with the same key are co-located and run in the same operator instance. Without `PARTITION BY`, each row is processed independently. Accepts insert-only, retract, and upsert tables. For upsert tables, the provided `PARTITION BY` key should match or be a subset of the upsert key of the subquery. 
| | `op` | No | A `DESCRIPTOR` with a single column name for the operation code column. Defaults to `op`. | | `op_mapping` | No | A `MAP<STRING, STRING>` mapping change operation names to custom output codes. Keys can contain comma-separated names to map multiple operations to the same code (e.g., `'INSERT, UPDATE_AFTER'`). When provided, only mapped operations are forwarded - unmapped events are dropped. Each change operation may appear at most once across all entries. | -| `produces_full_deletes` | No | A `BOOLEAN` literal that controls how DELETE rows are emitted. When `true`, the function requires fully-populated DELETE rows from the input. The planner inserts a `ChangelogNormalize` operator for upsert sources that emit key-only deletes, so downstream sees the full pre-image on DELETE. When `false` (default), no full-delete requirement is enforced. Partial DELETE rows from the input pass through unchanged. With `PARTITION BY` (set semantics), the [...] +| `produces_full_deletes` | No | A `BOOLEAN` literal that controls how DELETE rows are emitted. When `true` (default), DELETE rows carry all columns, the full image. When `false`, only the identifying key columns are preserved and the rest are nulled. See [Full vs partial deletes](#full-vs-partial-deletes) for more details. | #### Default op_mapping @@ -399,41 +399,89 @@ SELECT * FROM TO_CHANGELOG( -- UPDATE_BEFORE is dropped (not in the mapping) ``` -#### Delete handling +#### Upsert key -The `produces_full_deletes` argument controls how DELETE rows are emitted and what the planner requires from the input. The behavior depends on whether `PARTITION BY` is used (set semantics) or not (row semantics). +An **upsert key** is a column or set of columns that uniquely identifies a row across its lifecycle in a changelog. It is what downstream operators and sinks use to decide which earlier row a new INSERT, UPDATE_AFTER, or DELETE refers to. 
-**With `produces_full_deletes => true`.** The planner requires the input to produce DELETE rows with all columns populated. For upsert sources, a `ChangelogNormalize` operator is inserted to materialize the full pre-image from state. The function then emits fully-populated DELETE rows downstream. +The planner derives the upsert key from the input table: + +* A declared `PRIMARY KEY` on the source table when reading directly. +* The grouping columns of an upstream `GROUP BY <key>`. +* The keys propagated by operators that preserve them (e.g. lookup joins, calc-projections that keep the key columns). + +When no upsert key can be derived (e.g. a plain append-only source with no key constraint and no grouping upstream), the input has no row identity and downstream operators must treat it as append-only or fall back to retract semantics. + +`TO_CHANGELOG` consumes the upsert key to decide which columns to preserve when emitting partial DELETE rows. See [Full vs partial deletes](#full-vs-partial-deletes) below. + +#### Full vs partial deletes + +The `produces_full_deletes` argument controls how DELETE rows are emitted and what the planner requires from the input. The subsections below cover each value of the argument, both with `PARTITION BY` (set semantics) and without it (row semantics). When `false`, the function relies on the input table's [upsert key](#upsert-key) to decide which columns to preserve. + +##### `produces_full_deletes => true` (default) + +The planner requires fully-populated DELETE rows on the input. For upsert sources that emit key-only deletes, a `ChangelogNormalize` operator is inserted upstream to materialize the full pre-image from state. For sources that already emit a full pre-image (e.g. retract), the flag is a no-op. The function then passes the input row through unchanged on DELETE. + +**Row semantics** (no `PARTITION BY`): ```sql -- Upsert source: -D[id:5] (key-only). -- ChangelogNormalize materializes the full pre-image from state.
-- Output: +I[op:'DELETE', id:5, name:'Alice'] -SELECT * FROM TO_CHANGELOG( - input => TABLE upsert_source, - produces_full_deletes => true -) +SELECT * FROM TO_CHANGELOG(input => TABLE upsert_source) + +-- Retract source: -D[id:5, name:'Alice'] (full pre-image). +-- No ChangelogNormalize inserted; the input row is passed through unchanged. +-- Output: +I[op:'DELETE', id:5, name:'Alice'] +SELECT * FROM TO_CHANGELOG(input => TABLE retract_source) ``` -**With `produces_full_deletes => false` (default).** The planner does not require fully-populated DELETE rows on the input. For upsert sources that emit key-only deletes (e.g. Kafka compacted topics), this avoids the stateful `ChangelogNormalize` operator that would otherwise materialize the full pre-image of each deleted row. +**Set semantics** (`PARTITION BY`): -In **row semantics** (no `PARTITION BY`) the function passes the input row through unchanged. If the source emits partial DELETE rows they remain partial downstream; if it emits full DELETE rows they remain full. +```sql +-- Upsert source: -D[id:5] (key-only). +-- ChangelogNormalize materializes the full pre-image from state. +-- Output: +I[id:5, op:'DELETE', name:'Alice'] +SELECT * FROM TO_CHANGELOG(input => TABLE upsert_source PARTITION BY id) + +-- Retract source: -D[id:5, name:'Alice'] (full pre-image). +-- No ChangelogNormalize inserted; the input row is passed through unchanged. +-- Output: +I[id:5, op:'DELETE', name:'Alice'] +SELECT * FROM TO_CHANGELOG(input => TABLE retract_source PARTITION BY id) +``` + +##### `produces_full_deletes => false` + +The planner skips `ChangelogNormalize` and the function emits partial DELETE rows. This avoids the stateful normalization operator for upsert sources (e.g. Kafka compacted topics) where the full pre-image is not needed downstream. Requires an [upsert key](#upsert-key) to be present for the input table (row semantics) or `PARTITION BY` (set semantics); otherwise the call is rejected with a validation error. 
+ +**Row semantics** (no `PARTITION BY`): the function preserves the planner-derived upsert key columns on DELETE rows and nulls the rest. The upsert key is typically either a declared `PRIMARY KEY` when reading directly from a source, or the grouping columns of an upstream `GROUP BY <key>`. ```sql --- Source emits -D[id:5] (key-only). +-- Upsert source with PRIMARY KEY (id): -D[id:5] (key-only). -- Output: +I[op:'DELETE', id:5, name:null] -SELECT * FROM TO_CHANGELOG(input => TABLE upsert_source) +SELECT * FROM TO_CHANGELOG(input => TABLE upsert_source, produces_full_deletes => false) + +-- Retract source with PRIMARY KEY (id): -D[id:5, name:'Alice'] (full pre-image). +-- Output: +I[op:'DELETE', id:5, name:null] +SELECT * FROM TO_CHANGELOG(input => TABLE retract_source, produces_full_deletes => false) ``` -In **set semantics** (`PARTITION BY`) the function additionally nulls every non-partition-key column on DELETE rows. This forces the output to carry only the partition key on DELETE even when the input row was fully populated, which matches the shape expected by upsert sinks and Kafka compacted topics. +**Set semantics** (`PARTITION BY`): the function preserves the partition key and nulls every non-partition-key column on DELETE rows. Partition by the unique key that identifies each record downstream. This matches the shape expected by upsert sinks and Kafka compacted topics. ```sql --- Source emits -D[id:5, name:'Alice'] (full pre-image, e.g. from a retract source). +-- Upsert source: -D[id:5] (key-only). -- Output: +I[id:5, op:'DELETE', name:null] -SELECT * FROM TO_CHANGELOG(input => TABLE retract_source PARTITION BY id) -``` +SELECT * FROM TO_CHANGELOG( + input => TABLE upsert_source PARTITION BY id, + produces_full_deletes => false +) -There is no way to derive a partial DELETE in row semantics when the input emits a full pre-image, since the function has no key column to preserve. Use `PARTITION BY` for that case.
+-- Retract source: -D[id:5, name:'Alice'] (full pre-image). +-- Output: +I[id:5, op:'DELETE', name:null] +SELECT * FROM TO_CHANGELOG( + input => TABLE retract_source PARTITION BY id, + produces_full_deletes => false +) +``` #### Partitioning by a key @@ -472,12 +520,11 @@ Table result = myTable.toChangelog( map("INSERT, UPDATE_AFTER", "false", "DELETE", "true").asArgument("op_mapping") ); -// Require fully-populated DELETE rows from the input (inserts a ChangelogNormalize for -// upsert sources). When false (default), no full-delete requirement is enforced; in row -// semantics the input passes through unchanged, in set semantics non-partition-key columns -// are nulled on DELETE. +// Opt out of full-delete semantics. When `true` (default), DELETE rows carry the full +// pre-image. When `false`, only the identifying key columns are preserved and the rest +// are nulled. See [Full vs partial deletes](#full-vs-partial-deletes) for more details. Table result = myTable.toChangelog( - lit(true).asArgument("produces_full_deletes") + lit(false).asArgument("produces_full_deletes") ); // Set semantics: co-locate rows with the same key in the same parallel operator instance. diff --git a/flink-python/pyflink/table/table.py b/flink-python/pyflink/table/table.py index bb06642db9e..e7898e8752d 100644 --- a/flink-python/pyflink/table/table.py +++ b/flink-python/pyflink/table/table.py @@ -1197,12 +1197,13 @@ class Table(object): (INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE). The optional ``produces_full_deletes`` boolean controls how DELETE rows are - emitted. When ``True``, the planner inserts a ``ChangelogNormalize`` operator - for upsert sources that emit key-only deletes so the function emits fully - populated DELETE rows downstream. When ``False`` (default), no full-delete - requirement is enforced. In row semantics the input is passed through unchanged, - and in set semantics (``PARTITION BY``) non-partition-key columns are nulled on - DELETE rows. + emitted. 
When ``True`` (default), the planner inserts a ``ChangelogNormalize`` + operator for upsert sources that emit key-only deletes so the function emits + fully populated DELETE rows downstream. When ``False``, the function emits + partial DELETE rows: row semantics preserves the planner-derived upsert key + columns and nulls the rest, set semantics (``PARTITION BY``) preserves the + partition key and nulls the rest. Requires an upsert key or ``PARTITION BY``; + otherwise the call is rejected. Example: :: @@ -1221,10 +1222,10 @@ class Table(object): ... map_("INSERT, UPDATE_AFTER", "false", ... "DELETE", "true").as_argument("op_mapping") ... ) - >>> # Require fully populated DELETE rows from the input. Inserts a - >>> # ChangelogNormalize for upsert sources. + >>> # Opt out of full-delete semantics to emit partial DELETE rows. + >>> # Requires an upsert key or PARTITION BY; otherwise rejected. >>> result = table.to_changelog( - ... lit(True).as_argument("produces_full_deletes") + ... lit(False).as_argument("produces_full_deletes") ... ) :param arguments: Optional named arguments for ``op``, ``op_mapping``, and diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PartitionedTable.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PartitionedTable.java index 074bdfdbaef..401317d125d 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PartitionedTable.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PartitionedTable.java @@ -205,12 +205,14 @@ public interface PartitionedTable { * map("INSERT, UPDATE_AFTER", "false", "DELETE", "true").asArgument("op_mapping") * ); * - * // Require fully-populated DELETE rows from the input (inserts a ChangelogNormalize for - * // upsert sources). When false (default), DELETE rows on upsert inputs may omit non-key - * // columns, which avoids the stateful normalization operator upstream. 
+ * // Opt out of full-delete semantics. When `true` (default), DELETE rows carry the full + * // pre-image. When `false`, only the identifying key columns are preserved and the rest + * // are nulled. See [Full vs partial deletes]( + * // https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/changelog/#full-vs-partial-deletes) + * // for more details. * Table result = table * .partitionBy($("id")) - * .toChangelog(lit(true).asArgument("produces_full_deletes")); + * .toChangelog(lit(false).asArgument("produces_full_deletes")); * }</pre> * * @param arguments optional named arguments for {@code op}, {@code op_mapping}, and {@code diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java index de6ce9d3e5e..ab4a617a2df 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java @@ -1460,11 +1460,13 @@ public interface Table extends Explainable<Table>, Executable { * map("INSERT, UPDATE_AFTER", "false", "DELETE", "true").asArgument("op_mapping") * ); * - * // Require fully-populated DELETE rows from the input (inserts a ChangelogNormalize for - * // upsert sources). When false (default), no full-delete requirement is enforced and partial - * // DELETE rows from the input pass through unchanged. + * // Opt out of full-delete semantics. When `true` (default), DELETE rows carry the full + * // pre-image. When `false`, only the identifying key columns are preserved and the rest + * // are nulled. See [Full vs partial deletes]( + * // https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/changelog/#full-vs-partial-deletes) + * // for more details. 
* Table result = table.toChangelog( - * lit(true).asArgument("produces_full_deletes") + * lit(false).asArgument("produces_full_deletes") * ); * }</pre> * diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java index 19b8219e98b..0b9ea76b175 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java @@ -839,14 +839,20 @@ public final class BuiltInFunctionDefinitions { "UPDATE_BEFORE")))) .withConditionalTrait( StaticArgumentTrait.REQUIRE_FULL_DELETE, - // Require full deletes only when the user explicitly - // asks for them via produces_full_deletes=TRUE *and* - // the active op_mapping includes DELETE. Otherwise the - // planner can skip ChangelogNormalize for upsert - // sources that emit key-only deletes. + // Require full deletes by default. The user can opt + // out via produces_full_deletes=FALSE. + // The trait still applies only when the active + // op_mapping maps DELETE (or is absent); + // otherwise no DELETE rows reach the function and there + // is no point inserting ChangelogNormalize upstream.
TraitCondition.and( - TraitCondition.argIsEqualTo( - "produces_full_deletes", Boolean.TRUE), + TraitCondition.or( + TraitCondition.not( + TraitCondition.argIsPresent( + "produces_full_deletes")), + TraitCondition.argIsEqualTo( + "produces_full_deletes", + Boolean.TRUE)), TraitCondition.or( TraitCondition.not( TraitCondition.argIsPresent( diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToChangelogTypeStrategy.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToChangelogTypeStrategy.java index d406875aa2f..4c27aae4723 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToChangelogTypeStrategy.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToChangelogTypeStrategy.java @@ -28,6 +28,7 @@ import org.apache.flink.table.types.inference.CallContext; import org.apache.flink.table.types.inference.InputTypeStrategy; import org.apache.flink.table.types.inference.TypeStrategy; import org.apache.flink.types.ColumnList; +import org.apache.flink.types.RowKind; import java.util.ArrayList; import java.util.Arrays; @@ -52,6 +53,8 @@ public final class ToChangelogTypeStrategy { private static final Set<String> VALID_ROW_KIND_NAMES = Set.of("INSERT", "UPDATE_BEFORE", "UPDATE_AFTER", "DELETE"); + private static final String DELETE = RowKind.DELETE.name(); + // -------------------------------------------------------------------------------------------- // Input validation // -------------------------------------------------------------------------------------------- @@ -152,7 +155,8 @@ public final class ToChangelogTypeStrategy { final Optional<Map> opMapping = callContext.getArgumentValue(ARG_OP_MAPPING, Map.class); if (opMapping.isPresent()) { - return validateOpMappingKeys(callContext, opMapping.get(), throwOnFailure); + return validateOpMappingKeys( + 
callContext, (Map<String, String>) opMapping.get(), throwOnFailure); } return Optional.empty(); } @@ -163,16 +167,13 @@ public final class ToChangelogTypeStrategy { * trimmed. Names are case-sensitive and must match exactly (e.g., {@code INSERT}, not {@code * insert}). Each name must be valid and appear at most once across all entries. */ - @SuppressWarnings("rawtypes") private static Optional<List<DataType>> validateOpMappingKeys( - final CallContext callContext, final Map opMapping, final boolean throwOnFailure) { + final CallContext callContext, + final Map<String, String> opMapping, + final boolean throwOnFailure) { final Set<String> allRowKindsSeen = new HashSet<>(); - for (final Object key : opMapping.keySet()) { - if (!(key instanceof String)) { - return callContext.fail( - throwOnFailure, "Invalid target mapping for argument 'op_mapping'."); - } - final String[] rowKindNames = ((String) key).split(","); + for (final String key : opMapping.keySet()) { + final String[] rowKindNames = key.split(","); for (final String rawName : rowKindNames) { final String rowKindName = rawName.trim(); if (!VALID_ROW_KIND_NAMES.contains(rowKindName)) { @@ -180,7 +181,7 @@ public final class ToChangelogTypeStrategy { throwOnFailure, String.format( "Invalid target mapping for argument 'op_mapping'. " - + "Unknown change operation: '%s'. Valid values are: %s.", + + "Unknown change operation: '%s'. Operations are case-sensitive. 
Valid values are: %s.", rowKindName, VALID_ROW_KIND_NAMES)); } final boolean isDuplicate = !allRowKindsSeen.add(rowKindName); @@ -200,24 +201,27 @@ public final class ToChangelogTypeStrategy { @SuppressWarnings("rawtypes") private static Optional<List<DataType>> validateProducesFullDeletes( final CallContext callContext, final boolean throwOnFailure) { - final boolean hasArgProvided = !callContext.isArgumentNull(ARG_PRODUCES_FULL_DELETES); - if (hasArgProvided && !callContext.isArgumentLiteral(ARG_PRODUCES_FULL_DELETES)) { + final boolean isExplicit = !callContext.isArgumentNull(ARG_PRODUCES_FULL_DELETES); + if (!isExplicit) { + return Optional.empty(); + } + if (!callContext.isArgumentLiteral(ARG_PRODUCES_FULL_DELETES)) { return callContext.fail( throwOnFailure, "The 'produces_full_deletes' argument must be a constant BOOLEAN literal."); } final boolean producesFullDeletes = - callContext - .getArgumentValue(ARG_PRODUCES_FULL_DELETES, Boolean.class) - .orElse(false); + callContext.getArgumentValue(ARG_PRODUCES_FULL_DELETES, Boolean.class).orElse(true); if (!producesFullDeletes) { return Optional.empty(); } // The check against the input changelog mode lives in the function constructor since // TableSemantics#changelogMode() returns empty here at type-inference time. The mapping - // check below only needs the literal op_mapping argument, so it lives here. + // check below only needs the literal op_mapping argument, so it lives here. Only runs + // when the user explicitly set produces_full_deletes=true; the default true is not + // validated since it is a safe no-op for any input. 
final Optional<Map> opMapping = callContext.getArgumentValue(ARG_OP_MAPPING, Map.class); - if (opMapping.isPresent() && !mapsDelete(opMapping.get())) { + if (opMapping.isPresent() && !mapsDelete((Map<String, String>) opMapping.get())) { return callContext.fail( throwOnFailure, "Invalid 'produces_full_deletes' for TO_CHANGELOG: the active 'op_mapping' " @@ -229,17 +233,13 @@ public final class ToChangelogTypeStrategy { } /** - * Returns {@code true} when at least one {@code op_mapping} key references {@code DELETE}. - * Keys may be comma-separated (e.g., {@code "INSERT, DELETE"}) per the user-facing contract. + * Returns {@code true} when at least one {@code op_mapping} key references {@code DELETE}. Keys + * may be comma-separated (e.g., {@code "INSERT, DELETE"}) per the user-facing contract. */ - @SuppressWarnings("rawtypes") - private static boolean mapsDelete(final Map opMapping) { - for (final Object key : opMapping.keySet()) { - if (!(key instanceof String)) { - continue; - } - for (final String rawName : ((String) key).split(",")) { - if ("DELETE".equals(rawName.trim())) { + private static boolean mapsDelete(final Map<String, String> opMapping) { + for (final String key : opMapping.keySet()) { + for (final String rawName : key.split(",")) { + if (DELETE.equals(rawName.trim())) { return true; } } diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/ToChangelogInputTypeStrategyTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/ToChangelogInputTypeStrategyTest.java index e9b20a7a5ba..6b5773949db 100644 --- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/ToChangelogInputTypeStrategyTest.java +++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/ToChangelogInputTypeStrategyTest.java @@ -28,6 +28,10 @@ import java.util.Map; import java.util.stream.Stream; import 
static org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies.TO_CHANGELOG_INPUT_TYPE_STRATEGY; +import static org.apache.flink.table.types.inference.strategies.ToChangelogTypeStrategy.ARG_OP; +import static org.apache.flink.table.types.inference.strategies.ToChangelogTypeStrategy.ARG_OP_MAPPING; +import static org.apache.flink.table.types.inference.strategies.ToChangelogTypeStrategy.ARG_PRODUCES_FULL_DELETES; +import static org.apache.flink.table.types.inference.strategies.ToChangelogTypeStrategy.ARG_TABLE; /** Tests for {@link ToChangelogTypeStrategy#INPUT_TYPE_STRATEGY}. */ class ToChangelogInputTypeStrategyTest extends InputTypeStrategiesTestBase { @@ -52,12 +56,11 @@ class ToChangelogInputTypeStrategyTest extends InputTypeStrategiesTestBase { TO_CHANGELOG_INPUT_TYPE_STRATEGY) .calledWithArgumentTypes( TABLE_TYPE, DESCRIPTOR_TYPE, MAP_TYPE, BOOLEAN_TYPE) - .calledWithTableSemanticsAt(0, new TableSemanticsMock(TABLE_TYPE)) - .calledWithLiteralAt(1, ColumnList.of("op")) - .calledWithLiteralAt(2, null) - .calledWithLiteralAt(3, true) - .expectArgumentTypes( - TABLE_TYPE, DESCRIPTOR_TYPE, MAP_TYPE, BOOLEAN_TYPE), + .calledWithTableSemanticsAt(ARG_TABLE, new TableSemanticsMock(TABLE_TYPE)) + .calledWithLiteralAt(ARG_OP, ColumnList.of("op")) + .calledWithLiteralAt(ARG_OP_MAPPING, null) + .calledWithLiteralAt(ARG_PRODUCES_FULL_DELETES, true) + .expectArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE, MAP_TYPE, BOOLEAN_TYPE), // Valid: produces_full_deletes=true with op_mapping that includes DELETE TestSpec.forStrategy( @@ -65,12 +68,11 @@ class ToChangelogInputTypeStrategyTest extends InputTypeStrategiesTestBase { TO_CHANGELOG_INPUT_TYPE_STRATEGY) .calledWithArgumentTypes( TABLE_TYPE, DESCRIPTOR_TYPE, MAP_TYPE, BOOLEAN_TYPE) - .calledWithTableSemanticsAt(0, new TableSemanticsMock(TABLE_TYPE)) - .calledWithLiteralAt(1, ColumnList.of("op")) - .calledWithLiteralAt(2, Map.of("INSERT", "I", "DELETE", "D")) - .calledWithLiteralAt(3, true) - 
.expectArgumentTypes( - TABLE_TYPE, DESCRIPTOR_TYPE, MAP_TYPE, BOOLEAN_TYPE), + .calledWithTableSemanticsAt(ARG_TABLE, new TableSemanticsMock(TABLE_TYPE)) + .calledWithLiteralAt(ARG_OP, ColumnList.of("op")) + .calledWithLiteralAt(ARG_OP_MAPPING, Map.of("INSERT", "I", "DELETE", "D")) + .calledWithLiteralAt(ARG_PRODUCES_FULL_DELETES, true) + .expectArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE, MAP_TYPE, BOOLEAN_TYPE), // Valid: produces_full_deletes=true with comma-separated DELETE key TestSpec.forStrategy( @@ -78,12 +80,11 @@ class ToChangelogInputTypeStrategyTest extends InputTypeStrategiesTestBase { TO_CHANGELOG_INPUT_TYPE_STRATEGY) .calledWithArgumentTypes( TABLE_TYPE, DESCRIPTOR_TYPE, MAP_TYPE, BOOLEAN_TYPE) - .calledWithTableSemanticsAt(0, new TableSemanticsMock(TABLE_TYPE)) - .calledWithLiteralAt(1, ColumnList.of("op")) - .calledWithLiteralAt(2, Map.of("INSERT, DELETE", "X")) - .calledWithLiteralAt(3, true) - .expectArgumentTypes( - TABLE_TYPE, DESCRIPTOR_TYPE, MAP_TYPE, BOOLEAN_TYPE), + .calledWithTableSemanticsAt(ARG_TABLE, new TableSemanticsMock(TABLE_TYPE)) + .calledWithLiteralAt(ARG_OP, ColumnList.of("op")) + .calledWithLiteralAt(ARG_OP_MAPPING, Map.of("INSERT, DELETE", "X")) + .calledWithLiteralAt(ARG_PRODUCES_FULL_DELETES, true) + .expectArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE, MAP_TYPE, BOOLEAN_TYPE), // Valid: produces_full_deletes=false with op_mapping that omits DELETE TestSpec.forStrategy( @@ -91,12 +92,11 @@ class ToChangelogInputTypeStrategyTest extends InputTypeStrategiesTestBase { TO_CHANGELOG_INPUT_TYPE_STRATEGY) .calledWithArgumentTypes( TABLE_TYPE, DESCRIPTOR_TYPE, MAP_TYPE, BOOLEAN_TYPE) - .calledWithTableSemanticsAt(0, new TableSemanticsMock(TABLE_TYPE)) - .calledWithLiteralAt(1, ColumnList.of("op")) - .calledWithLiteralAt(2, Map.of("INSERT, UPDATE_AFTER", "X")) - .calledWithLiteralAt(3, false) - .expectArgumentTypes( - TABLE_TYPE, DESCRIPTOR_TYPE, MAP_TYPE, BOOLEAN_TYPE), + .calledWithTableSemanticsAt(ARG_TABLE, new 
TableSemanticsMock(TABLE_TYPE)) + .calledWithLiteralAt(ARG_OP, ColumnList.of("op")) + .calledWithLiteralAt(ARG_OP_MAPPING, Map.of("INSERT, UPDATE_AFTER", "X")) + .calledWithLiteralAt(ARG_PRODUCES_FULL_DELETES, false) + .expectArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE, MAP_TYPE, BOOLEAN_TYPE), // Error: produces_full_deletes=true with op_mapping that strips DELETE TestSpec.forStrategy( @@ -104,22 +104,22 @@ class ToChangelogInputTypeStrategyTest extends InputTypeStrategiesTestBase { TO_CHANGELOG_INPUT_TYPE_STRATEGY) .calledWithArgumentTypes( TABLE_TYPE, DESCRIPTOR_TYPE, MAP_TYPE, BOOLEAN_TYPE) - .calledWithTableSemanticsAt(0, new TableSemanticsMock(TABLE_TYPE)) - .calledWithLiteralAt(1, ColumnList.of("op")) - .calledWithLiteralAt(2, Map.of("INSERT, UPDATE_AFTER", "X")) - .calledWithLiteralAt(3, true) + .calledWithTableSemanticsAt(ARG_TABLE, new TableSemanticsMock(TABLE_TYPE)) + .calledWithLiteralAt(ARG_OP, ColumnList.of("op")) + .calledWithLiteralAt(ARG_OP_MAPPING, Map.of("INSERT, UPDATE_AFTER", "X")) + .calledWithLiteralAt(ARG_PRODUCES_FULL_DELETES, true) .expectErrorMessage( "Invalid 'produces_full_deletes' for TO_CHANGELOG: the active " + "'op_mapping' does not map DELETE rows"), - // Error: multi-column descriptor + // Error: multi-column descriptor for `op` TestSpec.forStrategy( "Descriptor with multiple columns", TO_CHANGELOG_INPUT_TYPE_STRATEGY) .calledWithArgumentTypes( TABLE_TYPE, DESCRIPTOR_TYPE, MAP_TYPE, BOOLEAN_TYPE) - .calledWithTableSemanticsAt(0, new TableSemanticsMock(TABLE_TYPE)) - .calledWithLiteralAt(1, ColumnList.of("a", "b")) + .calledWithTableSemanticsAt(ARG_TABLE, new TableSemanticsMock(TABLE_TYPE)) + .calledWithLiteralAt(ARG_OP, ColumnList.of("a", "b")) .expectErrorMessage("must contain exactly one column name"), // Error: invalid RowKind in op_mapping key @@ -127,9 +127,9 @@ class ToChangelogInputTypeStrategyTest extends InputTypeStrategiesTestBase { "Invalid RowKind in mapping key", TO_CHANGELOG_INPUT_TYPE_STRATEGY) 
.calledWithArgumentTypes( TABLE_TYPE, DESCRIPTOR_TYPE, MAP_TYPE, BOOLEAN_TYPE) - .calledWithTableSemanticsAt(0, new TableSemanticsMock(TABLE_TYPE)) - .calledWithLiteralAt(1, ColumnList.of("op")) - .calledWithLiteralAt(2, Map.of("INVALID_KIND", "X")) + .calledWithTableSemanticsAt(ARG_TABLE, new TableSemanticsMock(TABLE_TYPE)) + .calledWithLiteralAt(ARG_OP, ColumnList.of("op")) + .calledWithLiteralAt(ARG_OP_MAPPING, Map.of("INVALID_KIND", "X")) .expectErrorMessage("Unknown change operation: 'INVALID_KIND'"), // Error: duplicate RowKind across entries @@ -138,9 +138,10 @@ class ToChangelogInputTypeStrategyTest extends InputTypeStrategiesTestBase { TO_CHANGELOG_INPUT_TYPE_STRATEGY) .calledWithArgumentTypes( TABLE_TYPE, DESCRIPTOR_TYPE, MAP_TYPE, BOOLEAN_TYPE) - .calledWithTableSemanticsAt(0, new TableSemanticsMock(TABLE_TYPE)) - .calledWithLiteralAt(1, ColumnList.of("op")) - .calledWithLiteralAt(2, Map.of("INSERT, DELETE", "A", "DELETE", "B")) + .calledWithTableSemanticsAt(ARG_TABLE, new TableSemanticsMock(TABLE_TYPE)) + .calledWithLiteralAt(ARG_OP, ColumnList.of("op")) + .calledWithLiteralAt( + ARG_OP_MAPPING, Map.of("INSERT, DELETE", "A", "DELETE", "B")) .expectErrorMessage("Duplicate change operation: 'DELETE'")); } } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/OperatorBindingCallContext.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/OperatorBindingCallContext.java index 7d1d3940899..8f05d7e2718 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/OperatorBindingCallContext.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/OperatorBindingCallContext.java @@ -195,10 +195,11 @@ public final class OperatorBindingCallContext extends AbstractSqlCallContext { Optional.ofNullable(inputChangelogModes) .map(m -> 
m.get(tableArgCall.getInputIndex())) .orElse(null); + final int inputIndex = tableArgCall.getInputIndex(); final List<int[]> upsertKeys = - Optional.ofNullable(inputUpsertKeys) - .map(m -> m.get(tableArgCall.getInputIndex())) - .orElse(List.of()); + inputUpsertKeys != null && inputIndex < inputUpsertKeys.size() + ? inputUpsertKeys.get(inputIndex) + : List.of(); return Optional.of( OperatorBindingTableSemantics.create( argumentDataTypes.get(pos), diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogSemanticTests.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogSemanticTests.java index b3376b3a346..41c53979067 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogSemanticTests.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogSemanticTests.java @@ -56,12 +56,12 @@ public class ToChangelogSemanticTests extends SemanticTestBase { ToChangelogTestPrograms.INVALID_OP_MAPPING, ToChangelogTestPrograms.OP_MAPPING_REFERENCES_UNSUPPORTED_KIND, ToChangelogTestPrograms.DUPLICATE_ROW_KIND, - ToChangelogTestPrograms.PRODUCES_FULL_DELETES_ON_APPEND_ONLY_INPUT, - ToChangelogTestPrograms.ROW_SEM_PARTIAL_DELETES, - ToChangelogTestPrograms.ROW_SEM_FORCE_FULL_DELETES, - ToChangelogTestPrograms.SET_SEM_PARTIAL_DELETES, - ToChangelogTestPrograms.SET_SEM_FULL_DELETES, - ToChangelogTestPrograms.SET_SEM_FORCE_FULL_DELETES, - ToChangelogTestPrograms.SET_SEM_FORCE_PARTIAL_DELETES); + ToChangelogTestPrograms.INVALID_PRODUCES_FULL_DELETES_FOR_APPEND_ONLY, + ToChangelogTestPrograms.RETRACT_PRODUCES_PARTIAL_DELETES, + ToChangelogTestPrograms.UPSERT_PRODUCES_FULL_DELETES, + ToChangelogTestPrograms.UPSERT_PRODUCES_PARTIAL_DELETES, + ToChangelogTestPrograms.RETRACT_PARTITION_BY_PRODUCES_PARTIAL_DELETES, + 
ToChangelogTestPrograms.RETRACT_PARTITION_BY_PRODUCES_FULL_DELETES, + ToChangelogTestPrograms.UPSERT_PARTITION_BY_PRODUCES_FULL_DELETES); } } diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java index 5126014b9a3..909e2f4d1ef 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java @@ -187,8 +187,7 @@ public class ToChangelogTestPrograms { public static final TableTestProgram UPSERT_PARTITION_BY = TableTestProgram.of( "to-changelog-upsert-partition-by", - "PARTITION BY upsert key + mapping without UB skips ChangelogNormalize; " - + "default produces_full_deletes=false nulls non-key columns on DELETE") + "PARTITION BY upsert key + mapping without UB skips ChangelogNormalize") .setupTableSource( SourceTestStep.newBuilder("t") .addSchema( @@ -207,7 +206,7 @@ public class ToChangelogTestPrograms { "+I[Alice, C, 10]", "+I[Bob, C, 20]", "+I[Alice, C, 30]", - "+I[Bob, D, null]") + "+I[Bob, D, 20]") .build()) .runSql( "INSERT INTO sink SELECT * FROM TO_CHANGELOG(" @@ -600,9 +599,9 @@ public class ToChangelogTestPrograms { "Duplicate change operation: 'DELETE'") .build(); - public static final TableTestProgram PRODUCES_FULL_DELETES_ON_APPEND_ONLY_INPUT = + public static final TableTestProgram INVALID_PRODUCES_FULL_DELETES_FOR_APPEND_ONLY = TableTestProgram.of( - "to-changelog-produces-full-deletes-on-append-only-input", + "to-changelog-invalid-produces-full-deletes-for-append-only", "fails when produces_full_deletes=true on an input that never emits DELETE rows") .setupTableSource(SIMPLE_SOURCE) .runFailingSql( @@ -613,15 +612,14 @@ public 
class ToChangelogTestPrograms { "the input table only produces [INSERT] and never emits DELETE rows") .build(); - // -------------------------------------------------------------------------------------------- - // Row semantics x delete handling matrix + // Full vs partial deletes matrix (input kind x PARTITION BY x produces_full_deletes) // -------------------------------------------------------------------------------------------- - public static final TableTestProgram ROW_SEM_PARTIAL_DELETES = + public static final TableTestProgram RETRACT_PRODUCES_PARTIAL_DELETES = TableTestProgram.of( - "to-changelog-row-sem-partial-deletes", - "row semantics: produces_full_deletes=false skips ChangelogNormalize and a partial DELETE row from the input passes through unchanged") + "to-changelog-retract-produces-partial-deletes", + "retract input in row semantics with produces_full_deletes=false: skips ChangelogNormalize and the partial DELETE row from the input passes through unchanged") .setupTableSource( SourceTestStep.newBuilder("t") .addSchema( @@ -650,10 +648,10 @@ public class ToChangelogTestPrograms { + "produces_full_deletes => false)") .build(); - public static final TableTestProgram ROW_SEM_FORCE_FULL_DELETES = + public static final TableTestProgram UPSERT_PRODUCES_FULL_DELETES = TableTestProgram.of( - "to-changelog-row-sem-force-full-deletes", - "row semantics: produces_full_deletes=true forces ChangelogNormalize to materialize the full DELETE row from an upsert source emitting key-only deletes") + "to-changelog-upsert-produces-full-deletes", + "upsert input in row semantics with produces_full_deletes=true: ChangelogNormalize materializes the full DELETE row from a key-only delete") .setupTableSource( SourceTestStep.newBuilder("t") .addSchema( @@ -676,44 +674,37 @@ public class ToChangelogTestPrograms { + "produces_full_deletes => true)") .build(); - // -------------------------------------------------------------------------------------------- - // Set semantics x 
delete handling matrix - // -------------------------------------------------------------------------------------------- - - public static final TableTestProgram SET_SEM_FORCE_PARTIAL_DELETES = + public static final TableTestProgram UPSERT_PRODUCES_PARTIAL_DELETES = TableTestProgram.of( - "to-changelog-set-sem-force-partial-deletes", - "set semantics: produces_full_deletes=false nulls non-partition-key columns on DELETE even when the input row is fully populated") + "to-changelog-upsert-produces-partial-deletes", + "upsert input in row semantics with single-column upsert key + " + + "produces_full_deletes=false: DELETE preserves the upsert key " + + "column and nulls the rest without requiring PARTITION BY") .setupTableSource( SourceTestStep.newBuilder("t") .addSchema( "name STRING PRIMARY KEY NOT ENFORCED", "score BIGINT") - .addMode(ChangelogMode.all()) + .addMode(ChangelogMode.upsert()) .producedValues( Row.ofKind(RowKind.INSERT, "Alice", 10L), - Row.ofKind(RowKind.INSERT, "Bob", 20L), - Row.ofKind(RowKind.UPDATE_AFTER, "Alice", 30L), - Row.ofKind(RowKind.DELETE, "Bob", 20L)) + Row.ofKind(RowKind.DELETE, "Alice", 10L)) .build()) .setupTableSink( SinkTestStep.newBuilder("sink") - .addSchema("name STRING", "op STRING", "score BIGINT") + .addSchema("op STRING", "name STRING", "score BIGINT") .consumedValues( - "+I[Alice, INSERT, 10]", - "+I[Bob, INSERT, 20]", - "+I[Alice, UPDATE_AFTER, 30]", - "+I[Bob, DELETE, null]") + "+I[INSERT, Alice, 10]", "+I[DELETE, Alice, null]") .build()) .runSql( "INSERT INTO sink SELECT * FROM TO_CHANGELOG(" - + "input => TABLE t PARTITION BY name," + + "input => TABLE t, " + "produces_full_deletes => false)") .build(); - public static final TableTestProgram SET_SEM_PARTIAL_DELETES = + public static final TableTestProgram RETRACT_PARTITION_BY_PRODUCES_PARTIAL_DELETES = TableTestProgram.of( - "to-changelog-set-sem-partial-deletes", - "set semantics: produces_full_deletes=false (default) lets a partial DELETE row from the input pass 
through with non-partition-key columns null") + "to-changelog-retract-partition-by-produces-partial-deletes", + "retract input in set semantics with produces_full_deletes=false: nulls non-partition-key columns on DELETE even when the input row is fully populated") .setupTableSource( SourceTestStep.newBuilder("t") .addSchema( @@ -721,23 +712,29 @@ public class ToChangelogTestPrograms { .addMode(ChangelogMode.all()) .producedValues( Row.ofKind(RowKind.INSERT, "Alice", 10L), - Row.ofKind(RowKind.DELETE, "Alice", null)) + Row.ofKind(RowKind.INSERT, "Bob", 20L), + Row.ofKind(RowKind.UPDATE_AFTER, "Alice", 30L), + Row.ofKind(RowKind.DELETE, "Bob", 20L)) .build()) .setupTableSink( SinkTestStep.newBuilder("sink") .addSchema("name STRING", "op STRING", "score BIGINT") .consumedValues( - "+I[Alice, INSERT, 10]", "+I[Alice, DELETE, null]") + "+I[Alice, INSERT, 10]", + "+I[Bob, INSERT, 20]", + "+I[Alice, UPDATE_AFTER, 30]", + "+I[Bob, DELETE, null]") .build()) .runSql( "INSERT INTO sink SELECT * FROM TO_CHANGELOG(" - + "input => TABLE t PARTITION BY name)") + + "input => TABLE t PARTITION BY name," + + "produces_full_deletes => false)") .build(); - public static final TableTestProgram SET_SEM_FULL_DELETES = + public static final TableTestProgram RETRACT_PARTITION_BY_PRODUCES_FULL_DELETES = TableTestProgram.of( - "to-changelog-set-sem-full-deletes", - "set semantics: produces_full_deletes=true on an input that already emits full deletes is a no-op for the planner and the full DELETE row reaches the output") + "to-changelog-retract-partition-by-produces-full-deletes", + "retract input in set semantics with produces_full_deletes=true (default): the input row passes through unchanged, full DELETE pre-image reaches the output") .setupTableSource( SourceTestStep.newBuilder("t") .addSchema( @@ -759,10 +756,10 @@ public class ToChangelogTestPrograms { + "produces_full_deletes => true)") .build(); - public static final TableTestProgram SET_SEM_FORCE_FULL_DELETES = + public static final 
TableTestProgram UPSERT_PARTITION_BY_PRODUCES_FULL_DELETES = TableTestProgram.of( - "to-changelog-set-sem-force-full-deletes", - "set semantics: produces_full_deletes=true forces ChangelogNormalize to materialize the full DELETE row from an upsert source emitting key-only deletes") + "to-changelog-upsert-partition-by-produces-full-deletes", + "upsert input in set semantics with produces_full_deletes=true: ChangelogNormalize materializes the full DELETE row from a key-only delete") .setupTableSource( SourceTestStep.newBuilder("t") .addSchema( diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/ToChangelogFunction.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/ToChangelogFunction.java index d8874a2be16..66162e205b9 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/ToChangelogFunction.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/ToChangelogFunction.java @@ -33,21 +33,31 @@ import org.apache.flink.table.functions.SpecializedFunction.SpecializedContext; import org.apache.flink.table.functions.TableSemantics; import org.apache.flink.table.types.inference.CallContext; import org.apache.flink.table.types.inference.strategies.ChangelogTypeStrategyUtils; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.utils.UpsertKeyUtils; import org.apache.flink.types.ColumnList; import org.apache.flink.types.RowKind; import javax.annotation.Nullable; import java.util.EnumMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.stream.Collectors; +import static org.apache.flink.table.types.inference.strategies.ToChangelogTypeStrategy.ARG_OP_MAPPING; +import static 
org.apache.flink.table.types.inference.strategies.ToChangelogTypeStrategy.ARG_PRODUCES_FULL_DELETES; +import static org.apache.flink.table.types.inference.strategies.ToChangelogTypeStrategy.ARG_TABLE; + /** * Runtime implementation of {@link BuiltInFunctionDefinitions#TO_CHANGELOG}. * - * <p>Converts each input row into an INSERT-only output row with an operation code column. The - * output schema is {@code [op_column, ...all_input_columns...]}. + * <p>Converts each input row into an INSERT-only output row with an operation code column. Output + * schema is {@code [op_column, ...projected_input_columns...]}. Partition columns are prepended by + * the framework outside this function and are not part of the projection. * * <p>Uses {@link JoinedRowData} to combine the op column with the full input row. */ @@ -65,51 +75,55 @@ public class ToChangelogFunction extends BuiltInProcessTableFunction<RowData> { private final Map<RowKind, String> rawOpMap; private final int[] outputIndices; + private final RowType inputRowType; private final boolean producesFullDelete; + private final boolean[] upsertKeyColumn; private transient Map<RowKind, StringData> opMap; private transient GenericRowData opRow; private transient JoinedRowData output; private transient ProjectedRowData projectedOutput; - private transient GenericRowData nullPayloadRow; + private transient GenericRowData partialDeletePayload; + private transient RowData.FieldGetter[] preservedFieldGetters; @SuppressWarnings("unchecked") public ToChangelogFunction(final SpecializedContext context) { super(BuiltInFunctionDefinitions.TO_CHANGELOG, context); final CallContext callContext = context.getCallContext(); // Table argument is guaranteed by the type strategy's validation phase. 
- final TableSemantics tableSemantics = callContext.getTableSemantics(0).get(); + final TableSemantics tableSemantics = callContext.getTableSemantics(ARG_TABLE).get(); final Map<String, String> opMapping = - callContext.getArgumentValue(2, Map.class).orElse(null); + callContext.getArgumentValue(ARG_OP_MAPPING, Map.class).orElse(null); this.rawOpMap = buildOpMap(opMapping); if (opMapping != null) { validateOpMap(this.rawOpMap, tableSemantics); } final boolean producesFullDeletesArg = - callContext.getArgumentValue(3, Boolean.class).orElse(false); - validateProducesFullDeletes(producesFullDeletesArg, tableSemantics); + callContext.getArgumentValue(ARG_PRODUCES_FULL_DELETES, Boolean.class).orElse(true); + final boolean isExplicit = !callContext.isArgumentNull(ARG_PRODUCES_FULL_DELETES); + validateProducesFullDeletes(producesFullDeletesArg, isExplicit, tableSemantics); this.outputIndices = ChangelogTypeStrategyUtils.computeOutputIndices(tableSemantics); - this.producesFullDelete = resolveProducesFullDelete(producesFullDeletesArg, tableSemantics); + this.inputRowType = (RowType) tableSemantics.dataType().getLogicalType(); + this.producesFullDelete = producesFullDeletesArg; + this.upsertKeyColumn = + computeUpsertKeyColumn( + this.outputIndices, + UpsertKeyUtils.smallestKey(tableSemantics.upsertKeyColumns())); } - /** - * Decides whether this function emits full DELETE rows (input passed through unchanged) or - * partial DELETE rows (only identifying columns preserved, rest nulled). - * - * <p>The framework prepends partition-key columns to the output without consulting this - * function, so in set semantics partition keys are preserved on DELETE rows for free. In row - * semantics there is no key column to preserve, so the function passes the input through - * unchanged regardless of {@code produces_full_deletes}. 
- */ - private static boolean resolveProducesFullDelete( - final boolean producesFullDeletesArg, final TableSemantics tableSemantics) { - if (producesFullDeletesArg) { - return true; + private static boolean[] computeUpsertKeyColumn( + final int[] outputIndices, final int[] upsertKey) { + final Set<Integer> keepInputIndices = new HashSet<>(); + for (final int key : upsertKey) { + keepInputIndices.add(key); } - final boolean hasPartitionBy = tableSemantics.partitionByColumns().length > 0; - return !hasPartitionBy; + final boolean[] mask = new boolean[outputIndices.length]; + for (int i = 0; i < outputIndices.length; i++) { + mask[i] = keepInputIndices.contains(outputIndices[i]); + } + return mask; } @Override @@ -120,7 +134,16 @@ public class ToChangelogFunction extends BuiltInProcessTableFunction<RowData> { opRow = new GenericRowData(1); output = new JoinedRowData(); projectedOutput = ProjectedRowData.from(outputIndices); - nullPayloadRow = new GenericRowData(outputIndices.length); + partialDeletePayload = new GenericRowData(outputIndices.length); + preservedFieldGetters = new RowData.FieldGetter[outputIndices.length]; + final List<LogicalType> inputFieldTypes = inputRowType.getChildren(); + for (int i = 0; i < outputIndices.length; i++) { + if (upsertKeyColumn[i]) { + preservedFieldGetters[i] = + RowData.createFieldGetter( + inputFieldTypes.get(outputIndices[i]), outputIndices[i]); + } + } } /** @@ -172,29 +195,48 @@ public class ToChangelogFunction extends BuiltInProcessTableFunction<RowData> { } /** - * Rejects {@code produces_full_deletes=true} when the input changelog never emits DELETE rows. + * Validates an explicit {@code produces_full_deletes} argument against the input. + * + * <p>For {@code produces_full_deletes=true}, the input changelog must emit DELETE rows; + * otherwise the parameter is dead. 
For {@code produces_full_deletes=false}, the input must + * declare an upsert key or the call must use {@code PARTITION BY}; otherwise the function has + * no identifying columns to preserve when nulling the rest. + * + * <p>No validation runs when the argument is absent, since the default (full deletes) is safe + * for any input. * * <p>Lives here rather than in the input type strategy because {@link - * TableSemantics#changelogMode()} returns empty during type inference and is only populated at - * specialization time. The complementary check against the literal {@code op_mapping} - * argument runs earlier in {@code ToChangelogTypeStrategy}. + * TableSemantics#changelogMode()} and {@link TableSemantics#upsertKeyColumns()} are only + * populated at specialization time. */ private static void validateProducesFullDeletes( - final boolean producesFullDeletesArg, final TableSemantics tableSemantics) { - if (!producesFullDeletesArg) { + final boolean producesFullDeletesArg, + final boolean isExplicit, + final TableSemantics tableSemantics) { + if (!isExplicit) { return; } - final ChangelogMode inputMode = tableSemantics.changelogMode().orElse(null); - if (inputMode == null) { + if (producesFullDeletesArg) { + final ChangelogMode inputMode = tableSemantics.changelogMode().orElse(null); + if (inputMode != null && !inputMode.contains(RowKind.DELETE)) { + throw new ValidationException( + String.format( + "Invalid 'produces_full_deletes' for TO_CHANGELOG: the input " + + "table only produces %s and never emits DELETE rows. 
" + + "Remove the 'produces_full_deletes' argument.", + inputMode.getContainedKinds())); + } return; } - if (!inputMode.contains(RowKind.DELETE)) { + final boolean hasPartitionBy = tableSemantics.partitionByColumns().length > 0; + final boolean hasUpsertKey = !tableSemantics.upsertKeyColumns().isEmpty(); + if (!hasPartitionBy && !hasUpsertKey) { throw new ValidationException( - String.format( - "Invalid 'produces_full_deletes' for TO_CHANGELOG: the input table " - + "only produces %s and never emits DELETE rows. Remove the " - + "'produces_full_deletes' argument.", - inputMode.getContainedKinds())); + "Invalid 'produces_full_deletes=false' for TO_CHANGELOG: the input has no " + + "upsert key and the call has no PARTITION BY, so the function has " + + "no identifying columns to preserve on DELETE rows. Remove the " + + "argument (the default emits full DELETE rows) or add a " + + "PARTITION BY."); } } @@ -210,10 +252,28 @@ public class ToChangelogFunction extends BuiltInProcessTableFunction<RowData> { } opRow.setField(0, opCode); - final RowData payload = - (input.getRowKind() == RowKind.DELETE && !producesFullDelete) - ? nullPayloadRow - : projectedOutput.replaceRow(input); + final RowData payload; + if (input.getRowKind() == RowKind.DELETE && !producesFullDelete) { + payload = buildPartialDeletePayload(input); + } else { + payload = projectedOutput.replaceRow(input); + } collect(output.replace(opRow, payload)); } + + /** + * Builds the payload for a partial DELETE row: upsert-key columns are copied from the input, + * all other columns are emitted as {@code null}. Partition-key columns are not included here + * since the framework prepends them outside the function's projected output. 
+ */ + private RowData buildPartialDeletePayload(final RowData input) { + for (int i = 0; i < outputIndices.length; i++) { + if (upsertKeyColumn[i]) { + partialDeletePayload.setField(i, preservedFieldGetters[i].getFieldOrNull(input)); + } else { + partialDeletePayload.setField(i, null); + } + } + return partialDeletePayload; + } }
