This is an automated email from the ASF dual-hosted git repository.

gustavodemorais pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit bca2c0a6170adbbf39cfaf4327ec368f003e4f40 Author: Ramin Gharib <[email protected]> AuthorDate: Thu May 21 17:57:55 2026 +0200 [FLINK-39636][table] Add produces_full_deletes parameter to TO_CHANGELOG Introduces the optional produces_full_deletes boolean argument on the built-in TO_CHANGELOG PTF. Controls whether DELETE rows downstream carry the full pre-image. Only the partition-key path is covered here; row-semantic partial deletes via the upsert key follow in a later commit. --- .../docs/sql/reference/queries/changelog.md | 58 +++++- flink-python/pyflink/table/table.py | 18 +- .../apache/flink/table/api/PartitionedTable.java | 10 +- .../java/org/apache/flink/table/api/Table.java | 10 +- .../functions/BuiltInFunctionDefinitions.java | 40 ++-- .../table/types/inference/BuiltInCondition.java | 1 + .../table/types/inference/TraitCondition.java | 11 ++ .../strategies/ToChangelogTypeStrategy.java | 16 +- .../exec/stream/ToChangelogSemanticTests.java | 10 +- .../nodes/exec/stream/ToChangelogTestPrograms.java | 216 ++++++++++++++++++++- .../planner/plan/stream/sql/ToChangelogTest.java | 38 ++++ .../planner/plan/stream/sql/ToChangelogTest.xml | 66 +++++-- .../plan/to-changelog-retract-restore.json | 7 +- .../runtime/functions/ptf/ToChangelogFunction.java | 72 ++++++- 14 files changed, 527 insertions(+), 46 deletions(-) diff --git a/docs/content/docs/sql/reference/queries/changelog.md b/docs/content/docs/sql/reference/queries/changelog.md index 78f61e1ce10..7a1598ee850 100644 --- a/docs/content/docs/sql/reference/queries/changelog.md +++ b/docs/content/docs/sql/reference/queries/changelog.md @@ -296,17 +296,19 @@ This is useful when you need to materialize changelog events into a downstream s SELECT * FROM TO_CHANGELOG( input => TABLE source_table [PARTITION BY key_col], [op => DESCRIPTOR(op_column_name),] - [op_mapping => MAP['INSERT', 'I', 'DELETE', 'D', ...]] + [op_mapping => MAP['INSERT', 'I', 'DELETE', 'D', ...],] + [produces_full_deletes => BOOLEAN] ) ``` ### 
Parameters -| Parameter | Required | Description | -|:-------------|:---------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| `input` | Yes | The input table. With `PARTITION BY`, rows with the same key are co-located and run in the same operator instance. Without `PARTITION BY`, each row is processed independently. Accepts insert-only, retract, and upsert tables. For upsert tables, the provided `PARTITION BY` key should match or be a subset of the upsert key of the subquery. | -| `op` | No | A `DESCRIPTOR` with a single column name for the operation code column. Defaults to `op`. | -| `op_mapping` | No | A `MAP<STRING, STRING>` mapping change operation names to custom output codes. Keys can contain comma-separated names to map multiple operations to the same code (e.g., `'INSERT, UPDATE_AFTER'`). When provided, only mapped operations are forwarded - unmapped events are dropped. Each change operation may appear at most once across all entries. | +| Parameter | Required | Description | +|:------------------------|:---------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| `input` | Yes | The input table. With `PARTITION BY`, rows with the same key are co-located and run in the same operator instance. Without `PARTITION BY`, each row is processed independently. Accepts insert-only, retract, and upsert tables. 
For upsert tables, the provided `PARTITION BY` key should match or be a subset of the upsert key of the subquery. | +| `op` | No | A `DESCRIPTOR` with a single column name for the operation code column. Defaults to `op`. | +| `op_mapping` | No | A `MAP<STRING, STRING>` mapping change operation names to custom output codes. Keys can contain comma-separated names to map multiple operations to the same code (e.g., `'INSERT, UPDATE_AFTER'`). When provided, only mapped operations are forwarded - unmapped events are dropped. Each change operation may appear at most once across all entries. | +| `produces_full_deletes` | No | A `BOOLEAN` literal that controls how DELETE rows are emitted. When `true`, the function requires fully-populated DELETE rows from the input. The planner inserts a `ChangelogNormalize` operator for upsert sources that emit key-only deletes, so downstream sees the full pre-image on DELETE. When `false` (default), no full-delete requirement is enforced. Partial DELETE rows from the input pass through unchanged. With `PARTITION BY` (set semantics), the [...] #### Default op_mapping @@ -397,6 +399,42 @@ SELECT * FROM TO_CHANGELOG( -- UPDATE_BEFORE is dropped (not in the mapping) ``` +#### Delete handling + +The `produces_full_deletes` argument controls how DELETE rows are emitted and what the planner requires from the input. The behavior depends on whether `PARTITION BY` is used (set semantics) or not (row semantics). + +**With `produces_full_deletes => true`.** The planner requires the input to produce DELETE rows with all columns populated. For upsert sources, a `ChangelogNormalize` operator is inserted to materialize the full pre-image from state. The function then emits fully-populated DELETE rows downstream. + +```sql +-- Upsert source: -D[id:5] (key-only). +-- ChangelogNormalize materializes the full pre-image from state. 
+-- Output: +I[op:'DELETE', id:5, name:'Alice']
+SELECT * FROM TO_CHANGELOG(
+    input => TABLE upsert_source,
+    produces_full_deletes => true
+)
+```
+
+**With `produces_full_deletes => false` (default).** The planner does not require fully-populated DELETE rows on the input. For upsert sources that emit key-only deletes (e.g. Kafka compacted topics), this avoids the stateful `ChangelogNormalize` operator that would otherwise materialize the full pre-image of each deleted row.
+
+In **row semantics** (no `PARTITION BY`) the function passes the input row through unchanged. If the source emits partial DELETE rows they remain partial downstream; if it emits full DELETE rows they remain full.
+
+```sql
+-- Source emits -D[id:5] (key-only).
+-- Output: +I[op:'DELETE', id:5, name:null]
+SELECT * FROM TO_CHANGELOG(input => TABLE upsert_source)
+```
+
+In **set semantics** (`PARTITION BY`) the function additionally nulls every non-partition-key column on DELETE rows. This forces the output to carry only the partition key on DELETE even when the input row was fully populated, which matches the shape expected by upsert sinks and Kafka compacted topics.
+
+```sql
+-- Source emits -D[id:5, name:'Alice'] (full pre-image, e.g. from a retract source).
+-- Output: +I[id:5, op:'DELETE', name:null]
+SELECT * FROM TO_CHANGELOG(input => TABLE retract_source PARTITION BY id)
+```
+
+There is no way to derive a partial DELETE in row semantics when the input emits a full pre-image, since the function has no key column to preserve. Use `PARTITION BY` for that case.
+
 #### Partitioning by a key
 
 ```sql
@@ -434,6 +472,14 @@ Table result = myTable.toChangelog(
     map("INSERT, UPDATE_AFTER", "false", "DELETE", "true").asArgument("op_mapping")
 );
 
+// Require fully-populated DELETE rows from the input (inserts a ChangelogNormalize for
+// upsert sources). When false (default), no full-delete requirement is enforced; in row
+// semantics the input passes through unchanged, in set semantics non-partition-key columns
+// are nulled on DELETE.
+Table result = myTable.toChangelog(
+    lit(true).asArgument("produces_full_deletes")
+);
+
 // Set semantics: co-locate rows with the same key in the same parallel operator instance.
 // Equivalent to PARTITION BY in SQL. The partition keys are prepended to the output columns.
 Table result = myTable.partitionBy($("id")).toChangelog();
diff --git a/flink-python/pyflink/table/table.py b/flink-python/pyflink/table/table.py
index 583167fa478..bb06642db9e 100644
--- a/flink-python/pyflink/table/table.py
+++ b/flink-python/pyflink/table/table.py
@@ -1196,10 +1196,18 @@ class Table(object):
         INSERT-only row with a string ``op`` column indicating the original operation
         (INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE).
 
+        The optional ``produces_full_deletes`` boolean controls how DELETE rows are
+        emitted. When ``True``, the planner inserts a ``ChangelogNormalize`` operator
+        for upsert sources that emit key-only deletes so the function emits fully
+        populated DELETE rows downstream. When ``False`` (default), no full-delete
+        requirement is enforced. In row semantics the input is passed through unchanged,
+        and in set semantics (``PARTITION BY``) non-partition-key columns are nulled on
+        DELETE rows.
+
         Example:
         ::
 
-            >>> from pyflink.table.expressions import descriptor, map_
+            >>> from pyflink.table.expressions import descriptor, map_, lit, col
             >>> # Default: adds 'op' column with standard change operation names
             >>> result = table.to_changelog()
             >>> # Custom op column name and mapping
@@ -1213,8 +1221,14 @@ class Table(object):
             ...     map_("INSERT, UPDATE_AFTER", "false",
             ...          "DELETE", "true").as_argument("op_mapping")
             ... )
+            >>> # Require fully populated DELETE rows from the input. Inserts a
+            >>> # ChangelogNormalize for upsert sources.
+            >>> result = table.to_changelog(
+            ...     lit(True).as_argument("produces_full_deletes")
+            ... )
 
-        :param arguments: Optional named arguments for ``op`` and ``op_mapping``.
+        :param arguments: Optional named arguments for ``op``, ``op_mapping``, and
+            ``produces_full_deletes``.
         :return: An append-only :class:`~pyflink.table.Table` with an ``op`` column prepended to the input columns.
         """
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PartitionedTable.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PartitionedTable.java
index 51086f1edfe..074bdfdbaef 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PartitionedTable.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PartitionedTable.java
@@ -204,9 +204,17 @@ public interface PartitionedTable {
      *     descriptor("deleted").asArgument("op"),
      *     map("INSERT, UPDATE_AFTER", "false", "DELETE", "true").asArgument("op_mapping")
      * );
+     *
+     * // Require fully-populated DELETE rows from the input (inserts a ChangelogNormalize for
+     * // upsert sources). When false (default), DELETE rows on upsert inputs may omit non-key
+     * // columns, which avoids the stateful normalization operator upstream.
+     * Table result = table
+     *     .partitionBy($("id"))
+     *     .toChangelog(lit(true).asArgument("produces_full_deletes"));
      * }</pre>
      *
-     * @param arguments optional named arguments for {@code op} and {@code op_mapping}
+     * @param arguments optional named arguments for {@code op}, {@code op_mapping}, and {@code
+     *     produces_full_deletes}
      * @return an append-only {@link Table} with output schema {@code [partition_keys, op,
      *     non_partition_input_columns]}
      * @see Table#toChangelog(Expression...)
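The delete-handling rules in the documentation hunk above can be summarized as a small decision model: in row semantics a DELETE row passes through unchanged; in set semantics with the default `produces_full_deletes => false`, every non-partition-key column is nulled; with `produces_full_deletes => true`, the full pre-image that the planner guarantees upstream is kept. The following is a hypothetical Java sketch of those rules only, not the code in `ToChangelogFunction`. The class `DeleteShapingSketch`, the method `shapeDelete`, and the use of plain `Object[]` rows are illustrative assumptions; the model ignores the `op` column and the reordering that prepends partition keys, and it reflects only this commit (row-semantic partial deletes via the upsert key are deferred to a later commit).

```java
import java.util.Arrays;
import java.util.Set;

/**
 * Hypothetical model of the TO_CHANGELOG delete-handling rules described in the docs.
 * Not Flink's ToChangelogFunction: rows are plain Object[] arrays, and the op column and
 * output column reordering are ignored.
 */
public final class DeleteShapingSketch {

    private DeleteShapingSketch() {}

    /**
     * Returns the payload emitted for a DELETE row.
     *
     * @param input the DELETE row as it reaches the function (full or key-only)
     * @param partitionKeys column indices listed in PARTITION BY; empty means row semantics
     * @param producesFullDeletes value of the produces_full_deletes argument
     */
    public static Object[] shapeDelete(
            Object[] input, Set<Integer> partitionKeys, boolean producesFullDeletes) {
        boolean setSemantics = !partitionKeys.isEmpty();
        if (!setSemantics || producesFullDeletes) {
            // Row semantics: the row passes through unchanged, whether partial or full.
            // produces_full_deletes => true: the planner is assumed to have materialized the
            // full pre-image upstream (e.g. via ChangelogNormalize), so the row is kept as-is.
            return input.clone();
        }
        // Set semantics with the default (false): keep only the partition key columns.
        Object[] out = new Object[input.length];
        for (int i = 0; i < input.length; i++) {
            out[i] = partitionKeys.contains(i) ? input[i] : null;
        }
        return out;
    }

    public static void main(String[] args) {
        Object[] fullDelete = {5, "Alice"}; // columns: (id, name)
        // Row semantics: prints [5, Alice]
        System.out.println(Arrays.toString(shapeDelete(fullDelete, Set.of(), false)));
        // Set semantics, PARTITION BY id, default false: prints [5, null]
        System.out.println(Arrays.toString(shapeDelete(fullDelete, Set.of(0), false)));
        // Set semantics, PARTITION BY id, produces_full_deletes => true: prints [5, Alice]
        System.out.println(Arrays.toString(shapeDelete(fullDelete, Set.of(0), true)));
    }
}
```

Because row semantics has no key column to preserve, the sketch (like the documented behavior) cannot turn a full pre-image into a partial DELETE without `PARTITION BY`.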
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java index 0ffb90c5fcc..de6ce9d3e5e 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java @@ -1459,9 +1459,17 @@ public interface Table extends Explainable<Table>, Executable { * descriptor("deleted").asArgument("op"), * map("INSERT, UPDATE_AFTER", "false", "DELETE", "true").asArgument("op_mapping") * ); + * + * // Require fully-populated DELETE rows from the input (inserts a ChangelogNormalize for + * // upsert sources). When false (default), no full-delete requirement is enforced and partial + * // DELETE rows from the input pass through unchanged. + * Table result = table.toChangelog( + * lit(true).asArgument("produces_full_deletes") + * ); * }</pre> * - * @param arguments optional named arguments for {@code op} and {@code op_mapping} + * @param arguments optional named arguments for {@code op}, {@code op_mapping}, and {@code + * produces_full_deletes} * @return an append-only {@link Table} with an {@code op} column prepended to the input columns */ Table toChangelog(Expression... 
arguments); diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java index 4d0c9b28dac..19b8219e98b 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java @@ -839,25 +839,35 @@ public final class BuiltInFunctionDefinitions { "UPDATE_BEFORE")))) .withConditionalTrait( StaticArgumentTrait.REQUIRE_FULL_DELETE, - TraitCondition.or( - // op_mapping omitted: default mapping includes - // DELETE. - TraitCondition.not( - TraitCondition.argIsPresent( - "op_mapping")), - TraitCondition.argMatches( - "op_mapping", - Map.class, - mapping -> - opMappingContainsKey( - (Map<String, String>) - mapping, - "DELETE")))), + // Require full deletes only when the user explicitly + // asks for them via produces_full_deletes=TRUE *and* + // the active op_mapping includes DELETE. Otherwise the + // planner can skip ChangelogNormalize for upsert + // sources that emit key-only deletes. 
+ TraitCondition.and( + TraitCondition.argIsEqualTo( + "produces_full_deletes", Boolean.TRUE), + TraitCondition.or( + TraitCondition.not( + TraitCondition.argIsPresent( + "op_mapping")), + TraitCondition.argMatches( + "op_mapping", + Map.class, + mapping -> + opMappingContainsKey( + (Map< + String, + String>) + mapping, + "DELETE"))))), StaticArgument.scalar("op", DataTypes.DESCRIPTOR(), true), StaticArgument.scalar( "op_mapping", DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()), - true)) + true), + StaticArgument.scalar( + "produces_full_deletes", DataTypes.BOOLEAN(), true)) .inputTypeStrategy(TO_CHANGELOG_INPUT_TYPE_STRATEGY) .outputTypeStrategy(TO_CHANGELOG_OUTPUT_TYPE_STRATEGY) .runtimeClass( diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/BuiltInCondition.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/BuiltInCondition.java index ba87cc0559d..4c0149f6533 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/BuiltInCondition.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/BuiltInCondition.java @@ -40,6 +40,7 @@ final class BuiltInCondition implements TraitCondition { ARG_IS_PRESENT, NOT, OR, + AND, ARG_MATCHES } diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TraitCondition.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TraitCondition.java index 14734db11f4..8b86b37cc9f 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TraitCondition.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TraitCondition.java @@ -84,6 +84,17 @@ public interface TraitCondition { ctx -> left.test(ctx) || right.test(ctx)); } + /** + * True when both the {@code left} and the {@code right} {@link TraitCondition} evaluate to 
+ * true. + */ + static TraitCondition and(final TraitCondition left, final TraitCondition right) { + return new BuiltInCondition( + BuiltInCondition.Kind.AND, + List.of(left, right), + ctx -> left.test(ctx) && right.test(ctx)); + } + /** True when the named scalar argument was provided by the caller. */ static TraitCondition argIsPresent(final String argName) { return new BuiltInCondition( diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToChangelogTypeStrategy.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToChangelogTypeStrategy.java index d977deab4a9..8ff8101dc5e 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToChangelogTypeStrategy.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToChangelogTypeStrategy.java @@ -59,7 +59,7 @@ public final class ToChangelogTypeStrategy { new InputTypeStrategy() { @Override public ArgumentCount getArgumentCount() { - return ConstantArgumentCount.between(1, 3); + return ConstantArgumentCount.between(1, 4); } @Override @@ -77,7 +77,12 @@ public final class ToChangelogTypeStrategy { Signature.of( Argument.of("input", "TABLE"), Argument.of("op", "DESCRIPTOR"), - Argument.of("op_mapping", "MAP<STRING, STRING>"))); + Argument.of("op_mapping", "MAP<STRING, STRING>")), + Signature.of( + Argument.of("input", "TABLE"), + Argument.of("op", "DESCRIPTOR"), + Argument.of("op_mapping", "MAP<STRING, STRING>"), + Argument.of("produces_full_deletes", "BOOLEAN"))); } }; @@ -144,6 +149,13 @@ public final class ToChangelogTypeStrategy { } } + final boolean hasProducesFullDeletesArg = !callContext.isArgumentNull(3); + if (hasProducesFullDeletesArg && !callContext.isArgumentLiteral(3)) { + return callContext.fail( + throwOnFailure, + "The 'produces_full_deletes' argument must be a constant BOOLEAN literal."); + } + return 
Optional.of(callContext.getArgumentDataTypes()); } diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogSemanticTests.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogSemanticTests.java index bfffa8bd67d..762e08b3538 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogSemanticTests.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogSemanticTests.java @@ -55,6 +55,14 @@ public class ToChangelogSemanticTests extends SemanticTestBase { ToChangelogTestPrograms.INVALID_DESCRIPTOR, ToChangelogTestPrograms.INVALID_OP_MAPPING, ToChangelogTestPrograms.OP_MAPPING_REFERENCES_UNSUPPORTED_KIND, - ToChangelogTestPrograms.DUPLICATE_ROW_KIND); + ToChangelogTestPrograms.DUPLICATE_ROW_KIND, + ToChangelogTestPrograms.PRODUCES_FULL_DELETES_ON_APPEND_ONLY_INPUT, + ToChangelogTestPrograms.PRODUCES_FULL_DELETES_WITHOUT_DELETE_IN_OP_MAPPING, + ToChangelogTestPrograms.ROW_SEM_PARTIAL_DELETES, + ToChangelogTestPrograms.ROW_SEM_FORCE_FULL_DELETES, + ToChangelogTestPrograms.SET_SEM_PARTIAL_DELETES, + ToChangelogTestPrograms.SET_SEM_FULL_DELETES, + ToChangelogTestPrograms.SET_SEM_FORCE_FULL_DELETES, + ToChangelogTestPrograms.SET_SEM_FORCE_PARTIAL_DELETES); } } diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java index 250ad1f1d8f..f04d30d1d99 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java +++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java @@ -157,7 +157,7 @@ public class ToChangelogTestPrograms { public static final TableTestProgram UPSERT = TableTestProgram.of( "to-changelog-upsert-input", - "upsert input gets ChangelogNormalize for UPDATE_BEFORE and full deletes") + "upsert input in row semantics gets ChangelogNormalize for UPDATE_BEFORE and emits full deletes") .setupTableSource( SourceTestStep.newBuilder("t") .addSchema( @@ -187,7 +187,8 @@ public class ToChangelogTestPrograms { public static final TableTestProgram UPSERT_PARTITION_BY = TableTestProgram.of( "to-changelog-upsert-partition-by", - "PARTITION BY upsert key + mapping without UB skips ChangelogNormalize") + "PARTITION BY upsert key + mapping without UB skips ChangelogNormalize; " + + "default produces_full_deletes=false nulls non-key columns on DELETE") .setupTableSource( SourceTestStep.newBuilder("t") .addSchema( @@ -206,7 +207,7 @@ public class ToChangelogTestPrograms { "+I[Alice, C, 10]", "+I[Bob, C, 20]", "+I[Alice, C, 30]", - "+I[Bob, D, 20]") + "+I[Bob, D, null]") .build()) .runSql( "INSERT INTO sink SELECT * FROM TO_CHANGELOG(" @@ -598,4 +599,213 @@ public class ToChangelogTestPrograms { ValidationException.class, "Duplicate change operation: 'DELETE'") .build(); + + public static final TableTestProgram PRODUCES_FULL_DELETES_ON_APPEND_ONLY_INPUT = + TableTestProgram.of( + "to-changelog-produces-full-deletes-on-append-only-input", + "fails when produces_full_deletes=true on an input that never emits DELETE rows") + .setupTableSource(SIMPLE_SOURCE) + .runFailingSql( + "SELECT * FROM TO_CHANGELOG(" + + "input => TABLE t, " + + "produces_full_deletes => true)", + ValidationException.class, + "the input table only produces [INSERT] and never emits DELETE rows") + .build(); + + public static final TableTestProgram PRODUCES_FULL_DELETES_WITHOUT_DELETE_IN_OP_MAPPING = + TableTestProgram.of( + 
"to-changelog-produces-full-deletes-without-delete-in-op-mapping", + "fails when produces_full_deletes=true but the active op_mapping strips DELETE") + .setupTableSource( + SourceTestStep.newBuilder("t") + .addSchema( + "name STRING PRIMARY KEY NOT ENFORCED", "score BIGINT") + .addMode(ChangelogMode.all()) + .producedValues(Row.ofKind(RowKind.INSERT, "Alice", 10L)) + .build()) + .runFailingSql( + "SELECT * FROM TO_CHANGELOG(" + + "input => TABLE t, " + + "op_mapping => MAP['INSERT, UPDATE_AFTER', 'X'], " + + "produces_full_deletes => true)", + ValidationException.class, + "the active 'op_mapping' does not map DELETE rows") + .build(); + + // -------------------------------------------------------------------------------------------- + // Row semantics x delete handling matrix + // -------------------------------------------------------------------------------------------- + + public static final TableTestProgram ROW_SEM_PARTIAL_DELETES = + TableTestProgram.of( + "to-changelog-row-sem-partial-deletes", + "row semantics: produces_full_deletes=false skips ChangelogNormalize and a partial DELETE row from the input passes through unchanged") + .setupTableSource( + SourceTestStep.newBuilder("t") + .addSchema( + "name STRING PRIMARY KEY NOT ENFORCED", "score BIGINT") + .addMode(ChangelogMode.all()) + .producedValues( + Row.ofKind(RowKind.INSERT, "Alice", 10L), + Row.ofKind(RowKind.INSERT, "Bob", 20L), + Row.ofKind(RowKind.UPDATE_BEFORE, "Alice", 10L), + Row.ofKind(RowKind.UPDATE_AFTER, "Alice", 30L), + Row.ofKind(RowKind.DELETE, "Bob", null)) + .build()) + .setupTableSink( + SinkTestStep.newBuilder("sink") + .addSchema("op STRING", "name STRING", "score BIGINT") + .consumedValues( + "+I[INSERT, Alice, 10]", + "+I[INSERT, Bob, 20]", + "+I[UPDATE_BEFORE, Alice, 10]", + "+I[UPDATE_AFTER, Alice, 30]", + "+I[DELETE, Bob, null]") + .build()) + .runSql( + "INSERT INTO sink SELECT * FROM TO_CHANGELOG(" + + "input => TABLE t, " + + "produces_full_deletes => false)") + .build(); + 
+ public static final TableTestProgram ROW_SEM_FORCE_FULL_DELETES = + TableTestProgram.of( + "to-changelog-row-sem-force-full-deletes", + "row semantics: produces_full_deletes=true forces ChangelogNormalize to materialize the full DELETE row from an upsert source emitting key-only deletes") + .setupTableSource( + SourceTestStep.newBuilder("t") + .addSchema( + "name STRING PRIMARY KEY NOT ENFORCED", "score BIGINT") + .addMode(ChangelogMode.upsert()) + .producedValues( + Row.ofKind(RowKind.INSERT, "Alice", 10L), + // Key-only delete: ChangelogNormalize fills the row. + Row.ofKind(RowKind.DELETE, "Alice", null)) + .build()) + .setupTableSink( + SinkTestStep.newBuilder("sink") + .addSchema("op STRING", "name STRING", "score BIGINT") + .consumedValues( + "+I[INSERT, Alice, 10]", "+I[DELETE, Alice, 10]") + .build()) + .runSql( + "INSERT INTO sink SELECT * FROM TO_CHANGELOG(" + + "input => TABLE t, " + + "produces_full_deletes => true)") + .build(); + + // -------------------------------------------------------------------------------------------- + // Set semantics x delete handling matrix + // -------------------------------------------------------------------------------------------- + + public static final TableTestProgram SET_SEM_FORCE_PARTIAL_DELETES = + TableTestProgram.of( + "to-changelog-set-sem-force-partial-deletes", + "set semantics: produces_full_deletes=false nulls non-partition-key columns on DELETE even when the input row is fully populated") + .setupTableSource( + SourceTestStep.newBuilder("t") + .addSchema( + "name STRING PRIMARY KEY NOT ENFORCED", "score BIGINT") + .addMode(ChangelogMode.all()) + .producedValues( + Row.ofKind(RowKind.INSERT, "Alice", 10L), + Row.ofKind(RowKind.INSERT, "Bob", 20L), + Row.ofKind(RowKind.UPDATE_AFTER, "Alice", 30L), + Row.ofKind(RowKind.DELETE, "Bob", 20L)) + .build()) + .setupTableSink( + SinkTestStep.newBuilder("sink") + .addSchema("name STRING", "op STRING", "score BIGINT") + .consumedValues( + "+I[Alice, INSERT, 10]", 
+ "+I[Bob, INSERT, 20]", + "+I[Alice, UPDATE_AFTER, 30]", + "+I[Bob, DELETE, null]") + .build()) + .runSql( + "INSERT INTO sink SELECT * FROM TO_CHANGELOG(" + + "input => TABLE t PARTITION BY name," + + "produces_full_deletes => false)") + .build(); + + public static final TableTestProgram SET_SEM_PARTIAL_DELETES = + TableTestProgram.of( + "to-changelog-set-sem-partial-deletes", + "set semantics: produces_full_deletes=false (default) lets a partial DELETE row from the input pass through with non-partition-key columns null") + .setupTableSource( + SourceTestStep.newBuilder("t") + .addSchema( + "name STRING PRIMARY KEY NOT ENFORCED", "score BIGINT") + .addMode(ChangelogMode.all()) + .producedValues( + Row.ofKind(RowKind.INSERT, "Alice", 10L), + Row.ofKind(RowKind.DELETE, "Alice", null)) + .build()) + .setupTableSink( + SinkTestStep.newBuilder("sink") + .addSchema("name STRING", "op STRING", "score BIGINT") + .consumedValues( + "+I[Alice, INSERT, 10]", "+I[Alice, DELETE, null]") + .build()) + .runSql( + "INSERT INTO sink SELECT * FROM TO_CHANGELOG(" + + "input => TABLE t PARTITION BY name)") + .build(); + + public static final TableTestProgram SET_SEM_FULL_DELETES = + TableTestProgram.of( + "to-changelog-set-sem-full-deletes", + "set semantics: produces_full_deletes=true on an input that already emits full deletes is a no-op for the planner and the full DELETE row reaches the output") + .setupTableSource( + SourceTestStep.newBuilder("t") + .addSchema( + "name STRING PRIMARY KEY NOT ENFORCED", "score BIGINT") + .addMode(ChangelogMode.all()) + .producedValues( + Row.ofKind(RowKind.INSERT, "Alice", 10L), + Row.ofKind(RowKind.DELETE, "Alice", 10L)) + .build()) + .setupTableSink( + SinkTestStep.newBuilder("sink") + .addSchema("name STRING", "op STRING", "score BIGINT") + .consumedValues( + "+I[Alice, INSERT, 10]", "+I[Alice, DELETE, 10]") + .build()) + .runSql( + "INSERT INTO sink SELECT * FROM TO_CHANGELOG(" + + "input => TABLE t PARTITION BY name, " + + 
"produces_full_deletes => true)") + .build(); + + public static final TableTestProgram SET_SEM_FORCE_FULL_DELETES = + TableTestProgram.of( + "to-changelog-set-sem-force-full-deletes", + "set semantics: produces_full_deletes=true forces ChangelogNormalize to materialize the full DELETE row from an upsert source emitting key-only deletes") + .setupTableSource( + SourceTestStep.newBuilder("t") + .addSchema( + "name STRING PRIMARY KEY NOT ENFORCED", "score BIGINT") + .addMode(ChangelogMode.upsert()) + .producedValues( + Row.ofKind(RowKind.INSERT, "Alice", 10L), + Row.ofKind(RowKind.INSERT, "Bob", 20L), + Row.ofKind(RowKind.UPDATE_AFTER, "Alice", 30L), + Row.ofKind(RowKind.DELETE, "Bob", null)) + .build()) + .setupTableSink( + SinkTestStep.newBuilder("sink") + .addSchema("name STRING", "op STRING", "score BIGINT") + .consumedValues( + "+I[Alice, INSERT, 10]", + "+I[Bob, INSERT, 20]", + "+I[Alice, UPDATE_BEFORE, 10]", + "+I[Alice, UPDATE_AFTER, 30]", + "+I[Bob, DELETE, 20]") + .build()) + .runSql( + "INSERT INTO sink SELECT * FROM TO_CHANGELOG(" + + "input => TABLE t PARTITION BY name, " + + "produces_full_deletes => true)") + .build(); } diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ToChangelogTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ToChangelogTest.java index 7f6f93849c2..1f7347b027e 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ToChangelogTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ToChangelogTest.java @@ -144,4 +144,42 @@ public class ToChangelogTest extends TableTestBase { + "op_mapping => MAP['INSERT,UPDATE_AFTER', 'C'])", CHANGELOG_MODE); } + + @Test + void testUpsertSourceProducesFullDeletes() { + util.tableEnv() + .executeSql( + "CREATE TABLE upsert_source (" + + " id INT," + + " name STRING," + + " PRIMARY KEY (id) 
NOT ENFORCED" + + ") WITH (" + + " 'connector' = 'values'," + + " 'changelog-mode' = 'I,UA,D'" + + ")"); + util.verifyRelPlan( + "SELECT * FROM TO_CHANGELOG(" + + "input => TABLE upsert_source, " + + "produces_full_deletes => true)", + CHANGELOG_MODE); + } + + @Test + void testUpsertSourceKeyOnlyDeletes() { + util.tableEnv() + .executeSql( + "CREATE TABLE upsert_source (" + + " id INT," + + " name STRING," + + " PRIMARY KEY (id) NOT ENFORCED" + + ") WITH (" + + " 'connector' = 'values'," + + " 'changelog-mode' = 'I,UA,D'" + + ")"); + util.verifyRelPlan( + "SELECT * FROM TO_CHANGELOG(" + + "input => TABLE upsert_source, " + + "produces_full_deletes => false)", + CHANGELOG_MODE); + } } diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ToChangelogTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ToChangelogTest.xml index 7cea1058e5c..174addbf418 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ToChangelogTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ToChangelogTest.xml @@ -23,14 +23,14 @@ limitations under the License. 
<Resource name="ast"> <![CDATA[ LogicalProject(op=[$0], id=[$1], name=[$2]) -+- LogicalTableFunctionScan(invocation=[TO_CHANGELOG(TABLE(#0), DEFAULT(), DEFAULT(), DEFAULT(), DEFAULT())], rowType=[RecordType(VARCHAR(2147483647) op, INTEGER id, VARCHAR(2147483647) name)]) ++- LogicalTableFunctionScan(invocation=[TO_CHANGELOG(TABLE(#0), DEFAULT(), DEFAULT(), DEFAULT(), DEFAULT(), DEFAULT())], rowType=[RecordType(VARCHAR(2147483647) op, INTEGER id, VARCHAR(2147483647) name)]) +- LogicalProject(id=[$0], name=[$1]) +- LogicalTableScan(table=[[default_catalog, default_database, insert_only_source]]) ]]> </Resource> <Resource name="optimized rel plan"> <![CDATA[ -ProcessTableFunction(invocation=[TO_CHANGELOG(TABLE(#0), DEFAULT(), DEFAULT(), DEFAULT(), DEFAULT())], uid=[null], select=[op,id,name], rowType=[RecordType(VARCHAR(2147483647) op, INTEGER id, VARCHAR(2147483647) name)], changelogMode=[I]) +ProcessTableFunction(invocation=[TO_CHANGELOG(TABLE(#0), DEFAULT(), DEFAULT(), DEFAULT(), DEFAULT(), DEFAULT())], uid=[null], select=[op,id,name], rowType=[RecordType(VARCHAR(2147483647) op, INTEGER id, VARCHAR(2147483647) name)], changelogMode=[I]) +- TableSourceScan(table=[[default_catalog, default_database, insert_only_source]], fields=[id, name], changelogMode=[I]) ]]> </Resource> @@ -42,14 +42,14 @@ ProcessTableFunction(invocation=[TO_CHANGELOG(TABLE(#0), DEFAULT(), DEFAULT(), D <Resource name="ast"> <![CDATA[ LogicalProject(id=[$0], op=[$1], name=[$2]) -+- LogicalTableFunctionScan(invocation=[TO_CHANGELOG(TABLE(#0) PARTITION BY($0), DEFAULT(), DEFAULT(), DEFAULT(), DEFAULT())], rowType=[RecordType(INTEGER id, VARCHAR(2147483647) op, VARCHAR(2147483647) name)]) ++- LogicalTableFunctionScan(invocation=[TO_CHANGELOG(TABLE(#0) PARTITION BY($0), DEFAULT(), DEFAULT(), DEFAULT(), DEFAULT(), DEFAULT())], rowType=[RecordType(INTEGER id, VARCHAR(2147483647) op, VARCHAR(2147483647) name)]) +- LogicalProject(id=[$0], name=[$1]) +- LogicalTableScan(table=[[default_catalog, 
default_database, retract_source]]) ]]> </Resource> <Resource name="optimized rel plan"> <![CDATA[ -ProcessTableFunction(invocation=[TO_CHANGELOG(TABLE(#0) PARTITION BY($0), DEFAULT(), DEFAULT(), DEFAULT(), DEFAULT())], uid=[TO_CHANGELOG], select=[id,op,name], rowType=[RecordType(INTEGER id, VARCHAR(2147483647) op, VARCHAR(2147483647) name)], changelogMode=[I]) +ProcessTableFunction(invocation=[TO_CHANGELOG(TABLE(#0) PARTITION BY($0), DEFAULT(), DEFAULT(), DEFAULT(), DEFAULT(), DEFAULT())], uid=[TO_CHANGELOG], select=[id,op,name], rowType=[RecordType(INTEGER id, VARCHAR(2147483647) op, VARCHAR(2147483647) name)], changelogMode=[I]) +- Exchange(distribution=[hash[id]], changelogMode=[I,UB,UA,D]) +- TableSourceScan(table=[[default_catalog, default_database, retract_source]], fields=[id, name], changelogMode=[I,UB,UA,D]) ]]> @@ -62,14 +62,14 @@ ProcessTableFunction(invocation=[TO_CHANGELOG(TABLE(#0) PARTITION BY($0), DEFAUL <Resource name="ast"> <![CDATA[ LogicalProject(op=[$0], id=[$1], name=[$2]) -+- LogicalTableFunctionScan(invocation=[TO_CHANGELOG(TABLE(#0), DEFAULT(), DEFAULT(), DEFAULT(), DEFAULT())], rowType=[RecordType(VARCHAR(2147483647) op, INTEGER id, VARCHAR(2147483647) name)]) ++- LogicalTableFunctionScan(invocation=[TO_CHANGELOG(TABLE(#0), DEFAULT(), DEFAULT(), DEFAULT(), DEFAULT(), DEFAULT())], rowType=[RecordType(VARCHAR(2147483647) op, INTEGER id, VARCHAR(2147483647) name)]) +- LogicalProject(id=[$0], name=[$1]) +- LogicalTableScan(table=[[default_catalog, default_database, retract_source]]) ]]> </Resource> <Resource name="optimized rel plan"> <![CDATA[ -ProcessTableFunction(invocation=[TO_CHANGELOG(TABLE(#0), DEFAULT(), DEFAULT(), DEFAULT(), DEFAULT())], uid=[null], select=[op,id,name], rowType=[RecordType(VARCHAR(2147483647) op, INTEGER id, VARCHAR(2147483647) name)], changelogMode=[I]) +ProcessTableFunction(invocation=[TO_CHANGELOG(TABLE(#0), DEFAULT(), DEFAULT(), DEFAULT(), DEFAULT(), DEFAULT())], uid=[null], select=[op,id,name], 
rowType=[RecordType(VARCHAR(2147483647) op, INTEGER id, VARCHAR(2147483647) name)], changelogMode=[I]) +- TableSourceScan(table=[[default_catalog, default_database, retract_source]], fields=[id, name], changelogMode=[I,UB,UA,D]) ]]> </Resource> @@ -81,14 +81,14 @@ ProcessTableFunction(invocation=[TO_CHANGELOG(TABLE(#0), DEFAULT(), DEFAULT(), D <Resource name="ast"> <![CDATA[ LogicalProject(id=[$0], op=[$1], name=[$2]) -+- LogicalTableFunctionScan(invocation=[TO_CHANGELOG(TABLE(#0) PARTITION BY($0), DESCRIPTOR(_UTF-16LE'op'), MAP(_UTF-16LE'INSERT,UPDATE_AFTER':VARCHAR(19) CHARACTER SET "UTF-16LE", _UTF-16LE'C', _UTF-16LE'DELETE':VARCHAR(19) CHARACTER SET "UTF-16LE", _UTF-16LE'D'), DEFAULT(), DEFAULT())], rowType=[RecordType(INTEGER id, VARCHAR(2147483647) op, VARCHAR(2147483647) name)]) ++- LogicalTableFunctionScan(invocation=[TO_CHANGELOG(TABLE(#0) PARTITION BY($0), DESCRIPTOR(_UTF-16LE'op'), MAP(_UTF-16LE'INSERT,UPDATE_AFTER':VARCHAR(19) CHARACTER SET "UTF-16LE", _UTF-16LE'C', _UTF-16LE'DELETE':VARCHAR(19) CHARACTER SET "UTF-16LE", _UTF-16LE'D'), DEFAULT(), DEFAULT(), DEFAULT())], rowType=[RecordType(INTEGER id, VARCHAR(2147483647) op, VARCHAR(2147483647) name)]) +- LogicalProject(id=[$0], name=[$1]) +- LogicalTableScan(table=[[default_catalog, default_database, upsert_source]]) ]]> </Resource> <Resource name="optimized rel plan"> <![CDATA[ -ProcessTableFunction(invocation=[TO_CHANGELOG(TABLE(#0) PARTITION BY($0), DESCRIPTOR(_UTF-16LE'op'), MAP(_UTF-16LE'INSERT,UPDATE_AFTER':VARCHAR(19) CHARACTER SET "UTF-16LE", _UTF-16LE'C', _UTF-16LE'DELETE':VARCHAR(19) CHARACTER SET "UTF-16LE", _UTF-16LE'D'), DEFAULT(), DEFAULT())], uid=[TO_CHANGELOG], select=[id,op,name], rowType=[RecordType(INTEGER id, VARCHAR(2147483647) op, VARCHAR(2147483647) name)], changelogMode=[I]) +ProcessTableFunction(invocation=[TO_CHANGELOG(TABLE(#0) PARTITION BY($0), DESCRIPTOR(_UTF-16LE'op'), MAP(_UTF-16LE'INSERT,UPDATE_AFTER':VARCHAR(19) CHARACTER SET "UTF-16LE", _UTF-16LE'C', 
_UTF-16LE'DELETE':VARCHAR(19) CHARACTER SET "UTF-16LE", _UTF-16LE'D'), DEFAULT(), DEFAULT(), DEFAULT())], uid=[TO_CHANGELOG], select=[id,op,name], rowType=[RecordType(INTEGER id, VARCHAR(2147483647) op, VARCHAR(2147483647) name)], changelogMode=[I]) +- Exchange(distribution=[hash[id]], changelogMode=[I,UA,D]) +- TableSourceScan(table=[[default_catalog, default_database, upsert_source]], fields=[id, name], changelogMode=[I,UA,D]) ]]> @@ -101,14 +101,14 @@ ProcessTableFunction(invocation=[TO_CHANGELOG(TABLE(#0) PARTITION BY($0), DESCRI <Resource name="ast"> <![CDATA[ LogicalProject(id=[$0], op=[$1], name=[$2]) -+- LogicalTableFunctionScan(invocation=[TO_CHANGELOG(TABLE(#0) PARTITION BY($0), DESCRIPTOR(_UTF-16LE'op'), MAP(_UTF-16LE'INSERT,UPDATE_AFTER', _UTF-16LE'C'), DEFAULT(), DEFAULT())], rowType=[RecordType(INTEGER id, VARCHAR(2147483647) op, VARCHAR(2147483647) name)]) ++- LogicalTableFunctionScan(invocation=[TO_CHANGELOG(TABLE(#0) PARTITION BY($0), DESCRIPTOR(_UTF-16LE'op'), MAP(_UTF-16LE'INSERT,UPDATE_AFTER', _UTF-16LE'C'), DEFAULT(), DEFAULT(), DEFAULT())], rowType=[RecordType(INTEGER id, VARCHAR(2147483647) op, VARCHAR(2147483647) name)]) +- LogicalProject(id=[$0], name=[$1]) +- LogicalTableScan(table=[[default_catalog, default_database, upsert_source]]) ]]> </Resource> <Resource name="optimized rel plan"> <![CDATA[ -ProcessTableFunction(invocation=[TO_CHANGELOG(TABLE(#0) PARTITION BY($0), DESCRIPTOR(_UTF-16LE'op'), MAP(_UTF-16LE'INSERT,UPDATE_AFTER', _UTF-16LE'C'), DEFAULT(), DEFAULT())], uid=[TO_CHANGELOG], select=[id,op,name], rowType=[RecordType(INTEGER id, VARCHAR(2147483647) op, VARCHAR(2147483647) name)], changelogMode=[I]) +ProcessTableFunction(invocation=[TO_CHANGELOG(TABLE(#0) PARTITION BY($0), DESCRIPTOR(_UTF-16LE'op'), MAP(_UTF-16LE'INSERT,UPDATE_AFTER', _UTF-16LE'C'), DEFAULT(), DEFAULT(), DEFAULT())], uid=[TO_CHANGELOG], select=[id,op,name], rowType=[RecordType(INTEGER id, VARCHAR(2147483647) op, VARCHAR(2147483647) name)], changelogMode=[I]) +- 
Exchange(distribution=[hash[id]], changelogMode=[I,UA,D]) +- TableSourceScan(table=[[default_catalog, default_database, upsert_source]], fields=[id, name], changelogMode=[I,UA,D]) ]]> @@ -121,14 +121,56 @@ ProcessTableFunction(invocation=[TO_CHANGELOG(TABLE(#0) PARTITION BY($0), DESCRI <Resource name="ast"> <![CDATA[ LogicalProject(op=[$0], id=[$1], name=[$2]) -+- LogicalTableFunctionScan(invocation=[TO_CHANGELOG(TABLE(#0), DEFAULT(), DEFAULT(), DEFAULT(), DEFAULT())], rowType=[RecordType(VARCHAR(2147483647) op, INTEGER id, VARCHAR(2147483647) name)]) ++- LogicalTableFunctionScan(invocation=[TO_CHANGELOG(TABLE(#0), DEFAULT(), DEFAULT(), DEFAULT(), DEFAULT(), DEFAULT())], rowType=[RecordType(VARCHAR(2147483647) op, INTEGER id, VARCHAR(2147483647) name)]) +- LogicalProject(id=[$0], name=[$1]) +- LogicalTableScan(table=[[default_catalog, default_database, upsert_source]]) ]]> </Resource> <Resource name="optimized rel plan"> <![CDATA[ -ProcessTableFunction(invocation=[TO_CHANGELOG(TABLE(#0), DEFAULT(), DEFAULT(), DEFAULT(), DEFAULT())], uid=[null], select=[op,id,name], rowType=[RecordType(VARCHAR(2147483647) op, INTEGER id, VARCHAR(2147483647) name)], changelogMode=[I]) +ProcessTableFunction(invocation=[TO_CHANGELOG(TABLE(#0), DEFAULT(), DEFAULT(), DEFAULT(), DEFAULT(), DEFAULT())], uid=[null], select=[op,id,name], rowType=[RecordType(VARCHAR(2147483647) op, INTEGER id, VARCHAR(2147483647) name)], changelogMode=[I]) ++- ChangelogNormalize(key=[id], changelogMode=[I,UB,UA,D]) + +- Exchange(distribution=[hash[id]], changelogMode=[I,UA,D]) + +- TableSourceScan(table=[[default_catalog, default_database, upsert_source]], fields=[id, name], changelogMode=[I,UA,D]) +]]> + </Resource> + </TestCase> + <TestCase name="testUpsertSourceKeyOnlyDeletes"> + <Resource name="sql"> + <![CDATA[SELECT * FROM TO_CHANGELOG(input => TABLE upsert_source, produces_full_deletes => false)]]> + </Resource> + <Resource name="ast"> + <![CDATA[ +LogicalProject(op=[$0], id=[$1], name=[$2]) ++- 
LogicalTableFunctionScan(invocation=[TO_CHANGELOG(TABLE(#0), DEFAULT(), DEFAULT(), false, DEFAULT(), DEFAULT())], rowType=[RecordType(VARCHAR(2147483647) op, INTEGER id, VARCHAR(2147483647) name)]) + +- LogicalProject(id=[$0], name=[$1]) + +- LogicalTableScan(table=[[default_catalog, default_database, upsert_source]]) +]]> + </Resource> + <Resource name="optimized rel plan"> + <![CDATA[ +ProcessTableFunction(invocation=[TO_CHANGELOG(TABLE(#0), DEFAULT(), DEFAULT(), false, DEFAULT(), DEFAULT())], uid=[null], select=[op,id,name], rowType=[RecordType(VARCHAR(2147483647) op, INTEGER id, VARCHAR(2147483647) name)], changelogMode=[I]) ++- ChangelogNormalize(key=[id], changelogMode=[I,UB,UA,D]) + +- Exchange(distribution=[hash[id]], changelogMode=[I,UA,D]) + +- TableSourceScan(table=[[default_catalog, default_database, upsert_source]], fields=[id, name], changelogMode=[I,UA,D]) +]]> + </Resource> + </TestCase> + <TestCase name="testUpsertSourceProducesFullDeletes"> + <Resource name="sql"> + <![CDATA[SELECT * FROM TO_CHANGELOG(input => TABLE upsert_source, produces_full_deletes => true)]]> + </Resource> + <Resource name="ast"> + <![CDATA[ +LogicalProject(op=[$0], id=[$1], name=[$2]) ++- LogicalTableFunctionScan(invocation=[TO_CHANGELOG(TABLE(#0), DEFAULT(), DEFAULT(), true, DEFAULT(), DEFAULT())], rowType=[RecordType(VARCHAR(2147483647) op, INTEGER id, VARCHAR(2147483647) name)]) + +- LogicalProject(id=[$0], name=[$1]) + +- LogicalTableScan(table=[[default_catalog, default_database, upsert_source]]) +]]> + </Resource> + <Resource name="optimized rel plan"> + <![CDATA[ +ProcessTableFunction(invocation=[TO_CHANGELOG(TABLE(#0), DEFAULT(), DEFAULT(), true, DEFAULT(), DEFAULT())], uid=[null], select=[op,id,name], rowType=[RecordType(VARCHAR(2147483647) op, INTEGER id, VARCHAR(2147483647) name)], changelogMode=[I]) +- ChangelogNormalize(key=[id], changelogMode=[I,UB,UA,D]) +- Exchange(distribution=[hash[id]], changelogMode=[I,UA,D]) +- TableSourceScan(table=[[default_catalog, 
default_database, upsert_source]], fields=[id, name], changelogMode=[I,UA,D]) diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/to-changelog-retract-restore/plan/to-changelog-retract-restore.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/to-changelog-retract-restore/plan/to-changelog-retract-restore.json index 959c9473118..ea961c665ca 100644 --- a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/to-changelog-retract-restore/plan/to-changelog-retract-restore.json +++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/to-changelog-retract-restore/plan/to-changelog-retract-restore.json @@ -37,7 +37,7 @@ "priority" : 0 } ], "outputType" : "ROW<`op` VARCHAR(2147483647), `name` VARCHAR(2147483647) NOT NULL, `score` BIGINT>", - "description" : "ProcessTableFunction(invocation=[TO_CHANGELOG(TABLE(#0), DEFAULT(), DEFAULT(), DEFAULT(), DEFAULT())], uid=[null], select=[op,name,score], rowType=[RecordType(VARCHAR(2147483647) op, VARCHAR(2147483647) name, BIGINT score)])", + "description" : "ProcessTableFunction(invocation=[TO_CHANGELOG(TABLE(#0), DEFAULT(), DEFAULT(), DEFAULT(), DEFAULT(), DEFAULT())], uid=[null], select=[op,name,score], rowType=[RecordType(VARCHAR(2147483647) op, VARCHAR(2147483647) name, BIGINT score)])", "uid" : null, "functionCall" : { "kind" : "CALL", @@ -59,6 +59,11 @@ "syntax" : "SPECIAL", "internalName" : "$DEFAULT$1", "type" : "MAP<VARCHAR(2147483647), VARCHAR(2147483647)>" + }, { + "kind" : "CALL", + "syntax" : "SPECIAL", + "internalName" : "$DEFAULT$1", + "type" : "BOOLEAN" }, { "kind" : "CALL", "syntax" : "SPECIAL", diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/ToChangelogFunction.java 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/ToChangelogFunction.java index 0b6c6ca55b8..0110475ef8d 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/ToChangelogFunction.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/ToChangelogFunction.java @@ -65,11 +65,13 @@ public class ToChangelogFunction extends BuiltInProcessTableFunction<RowData> { private final Map<RowKind, String> rawOpMap; private final int[] outputIndices; + private final boolean producesFullDelete; private transient Map<RowKind, StringData> opMap; private transient GenericRowData opRow; private transient JoinedRowData output; private transient ProjectedRowData projectedOutput; + private transient GenericRowData nullPayloadRow; @SuppressWarnings("unchecked") public ToChangelogFunction(final SpecializedContext context) { @@ -84,7 +86,30 @@ public class ToChangelogFunction extends BuiltInProcessTableFunction<RowData> { if (opMapping != null) { validateOpMap(this.rawOpMap, tableSemantics); } + final boolean producesFullDeletesArg = + callContext.getArgumentValue(3, Boolean.class).orElse(false); + validateProducesFullDeletes(producesFullDeletesArg, this.rawOpMap, tableSemantics); + this.outputIndices = ChangelogTypeStrategyUtils.computeOutputIndices(tableSemantics); + this.producesFullDelete = resolveProducesFullDelete(producesFullDeletesArg, tableSemantics); + } + + /** + * Decides whether this function emits full DELETE rows (input passed through unchanged) or + * partial DELETE rows (only identifying columns preserved, rest nulled). + * + * <p>The framework prepends partition-key columns to the output without consulting this + * function, so in set semantics partition keys are preserved on DELETE rows for free. 
In row + * semantics there is no key column to preserve, so the function passes the input through + * unchanged regardless of {@code produces_full_deletes}. + */ + private static boolean resolveProducesFullDelete( + final boolean producesFullDeletesArg, final TableSemantics tableSemantics) { + if (producesFullDeletesArg) { + return true; + } + final boolean hasPartitionBy = tableSemantics.partitionByColumns().length > 0; + return !hasPartitionBy; } @Override @@ -95,6 +120,7 @@ public class ToChangelogFunction extends BuiltInProcessTableFunction<RowData> { opRow = new GenericRowData(1); output = new JoinedRowData(); projectedOutput = ProjectedRowData.from(outputIndices); + nullPayloadRow = new GenericRowData(outputIndices.length); } /** @@ -145,17 +171,59 @@ public class ToChangelogFunction extends BuiltInProcessTableFunction<RowData> { } } + /** + * Rejects {@code produces_full_deletes=true} when the input changelog cannot produce DELETE + * rows (either the input mode does not contain DELETE, or the active {@code op_mapping} strips + * it). In that case the argument has no effect and is most likely a user mistake. + * + * <p>Lives here rather than in the input type strategy because {@link + * TableSemantics#changelogMode()} returns empty during type inference and is only populated at + * specialization time, which is when this constructor runs. + */ + private static void validateProducesFullDeletes( + final boolean producesFullDeletesArg, + final Map<RowKind, String> mapping, + final TableSemantics tableSemantics) { + if (!producesFullDeletesArg) { + return; + } + final ChangelogMode inputMode = tableSemantics.changelogMode().orElse(null); + if (inputMode == null) { + return; + } + if (!inputMode.contains(RowKind.DELETE)) { + throw new ValidationException( + String.format( + "Invalid 'produces_full_deletes' for TO_CHANGELOG: the input table " + + "only produces %s and never emits DELETE rows.
Remove the " + + "'produces_full_deletes' argument.", + inputMode.getContainedKinds())); + } + if (!mapping.containsKey(RowKind.DELETE)) { + throw new ValidationException( + "Invalid 'produces_full_deletes' for TO_CHANGELOG: the active 'op_mapping' " + + "does not map DELETE rows, so no DELETE rows are emitted. Remove " + + "the 'produces_full_deletes' argument or add a DELETE entry to " + + "'op_mapping'."); + } + } + public void eval( final Context ctx, final RowData input, @Nullable final ColumnList op, - @Nullable final MapData opMapping) { + @Nullable final MapData opMapping, + @Nullable final Boolean producesFullDeletes) { final StringData opCode = opMap.get(input.getRowKind()); if (opCode == null) { return; } opRow.setField(0, opCode); - collect(output.replace(opRow, projectedOutput.replaceRow(input))); + final RowData payload = + (input.getRowKind() == RowKind.DELETE && !producesFullDelete) + ? nullPayloadRow + : projectedOutput.replaceRow(input); + collect(output.replace(opRow, payload)); } }
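
Reviewer note: the DELETE handling that this patch adds to ToChangelogFunction (resolving full vs. partial deletes, validating the argument, and choosing the payload in eval) can be modelled outside Flink as a small decision table. The following is a minimal sketch under stated assumptions: Flink's RowKind is replaced by a hypothetical Kind enum, RowData by Object[], and ValidationException by IllegalArgumentException. ToChangelogDeleteModel and its methods are illustrative names only, not Flink API.

```java
import java.util.Arrays;
import java.util.Set;

/**
 * Hypothetical, standalone model of the DELETE-payload logic in ToChangelogFunction.
 * Kind stands in for RowKind, Object[] for RowData; neither is Flink API.
 */
public class ToChangelogDeleteModel {

    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    /** Mirrors resolveProducesFullDelete: partial deletes only when PARTITION BY keys exist. */
    static boolean resolveFullDeletes(boolean producesFullDeletesArg, int partitionKeyCount) {
        if (producesFullDeletesArg) {
            return true;
        }
        // Row semantics (no PARTITION BY): no key to preserve, so the row passes through.
        return partitionKeyCount == 0;
    }

    /** Mirrors validateProducesFullDeletes; IllegalArgumentException replaces ValidationException. */
    static void validate(boolean producesFullDeletesArg, Set<Kind> inputKinds, Set<Kind> mappedKinds) {
        if (!producesFullDeletesArg || inputKinds == null) {
            return; // argument not set, or changelog mode not known yet (null models an empty Optional)
        }
        if (!inputKinds.contains(Kind.DELETE)) {
            throw new IllegalArgumentException("input never emits DELETE rows: " + inputKinds);
        }
        if (!mappedKinds.contains(Kind.DELETE)) {
            throw new IllegalArgumentException("op_mapping does not map DELETE rows");
        }
    }

    /** Mirrors the payload choice in eval(): DELETE rows are all-null unless full deletes. */
    static Object[] payload(Kind kind, boolean fullDeletes, Object[] projectedRow) {
        if (kind == Kind.DELETE && !fullDeletes) {
            return new Object[projectedRow.length]; // like nullPayloadRow: every field null
        }
        return projectedRow;
    }

    public static void main(String[] args) {
        // Non-key columns only; in set semantics the framework prepends the partition key.
        Object[] row = {"Alice", 42L};
        boolean full = resolveFullDeletes(false, 1); // PARTITION BY one key, argument false
        System.out.println(Arrays.toString(payload(Kind.DELETE, full, row)));       // [null, null]
        System.out.println(Arrays.toString(payload(Kind.UPDATE_AFTER, full, row))); // [Alice, 42]
    }
}
```

With a PARTITION BY key and produces_full_deletes left false, only the DELETE payload is nulled while the key column still identifies the row; without PARTITION BY the model passes every row through unchanged, matching the row-semantics limitation described in the commit message.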
