This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 47349536b09 [FLINK-39349][table] Support deletion flag in TO_CHANGELOG
47349536b09 is described below
commit 47349536b09ef9c0b8731ed6d1ec4ebd0ce886b8
Author: Gustavo de Morais <[email protected]>
AuthorDate: Mon Mar 30 16:05:27 2026 +0200
[FLINK-39349][table] Support deletion flag in TO_CHANGELOG
This closes #27847.
---
.../docs/sql/reference/queries/changelog.md | 27 ++++++++--
.../apache/flink/table/api/PartitionedTable.java | 12 +++--
.../strategies/ToChangelogTypeStrategy.java | 54 ++++++++++++++++----
.../exec/stream/ToChangelogSemanticTests.java | 4 +-
.../nodes/exec/stream/ToChangelogTestPrograms.java | 59 ++++++++++++++++++++--
.../runtime/functions/ptf/ToChangelogFunction.java | 15 ++++--
6 files changed, 148 insertions(+), 23 deletions(-)
diff --git a/docs/content/docs/sql/reference/queries/changelog.md
b/docs/content/docs/sql/reference/queries/changelog.md
index d7e6e29668a..15ad336f4d0 100644
--- a/docs/content/docs/sql/reference/queries/changelog.md
+++ b/docs/content/docs/sql/reference/queries/changelog.md
@@ -36,7 +36,7 @@ Flink SQL provides built-in process table functions (PTFs)
for working with chan
## TO_CHANGELOG
-The `TO_CHANGELOG` PTF converts a dynamic table (i.e. an updating table) into an append-only table with an explicit operation code column. Each input row - regardless of its original `RowKind` (INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE) - is emitted as an INSERT-only row with a string column indicating the original operation.
+The `TO_CHANGELOG` PTF converts a dynamic table (i.e. an updating table) into an append-only table with an explicit operation code column. Each input row - regardless of its original change operation (INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE) - is emitted as an INSERT-only row with a string column indicating the original operation.
This is useful when you need to materialize changelog events into a downstream
system that only supports appends (e.g., a message queue, log store, or
append-only file sink). It is also useful to filter out certain types of
updates, for example DELETEs.
@@ -56,13 +56,13 @@ SELECT * FROM TO_CHANGELOG(
|:-------------|:---------|:------------|
| `input` | Yes | The input table. Must include `PARTITION BY` for
parallel execution. Accepts insert-only, retract, and upsert tables. |
| `op` | No | A `DESCRIPTOR` with a single column name for the
operation code column. Defaults to `op`. |
-| `op_mapping` | No | A `MAP<STRING, STRING>` mapping `RowKind` names to custom output codes. When provided, only mapped RowKinds are forwarded - unmapped events are dropped. |
+| `op_mapping` | No | A `MAP<STRING, STRING>` mapping change operation names to custom output codes. Keys can contain comma-separated names to map multiple operations to the same code (e.g., `'INSERT, UPDATE_AFTER'`); whitespace around each name is ignored, and names are case-sensitive (`INSERT`, not `insert`). When provided, only mapped operations are forwarded - unmapped events are dropped. Each change operation may appear at most once across all entries. |
#### Default op_mapping
-When `op_mapping` is omitted, all four RowKinds are mapped to their standard names:
+When `op_mapping` is omitted, all four change operations are mapped to their standard names:
-| RowKind | Output value |
+| Change Operation | Output value |
|:----------------|:------------------|
| INSERT | `'INSERT'` |
| UPDATE_BEFORE | `'UPDATE_BEFORE'` |
@@ -122,6 +122,19 @@ SELECT * FROM TO_CHANGELOG(
-- op_code values are 'I' and 'U' instead of full names
```
+#### Deletion flag pattern
+
+```sql
+SELECT * FROM TO_CHANGELOG(
+ input => TABLE my_aggregation PARTITION BY id,
+ op => DESCRIPTOR(deleted),
+ op_mapping => MAP['INSERT, UPDATE_AFTER', 'false', 'DELETE', 'true']
+)
+-- INSERT and UPDATE_AFTER produce deleted='false'
+-- DELETE produces deleted='true'
+-- UPDATE_BEFORE is dropped (not in the mapping)
+```
+
#### Table API
```java
@@ -133,6 +146,12 @@ Table result = myTable.partitionBy($("id")).toChangelog(
descriptor("op_code").asArgument("op"),
map("INSERT", "I", "UPDATE_AFTER", "U").asArgument("op_mapping")
);
+
+// Deletion flag pattern: comma-separated keys map multiple change operations
to the same code
+Table result = myTable.partitionBy($("id")).toChangelog(
+ descriptor("deleted").asArgument("op"),
+ map("INSERT, UPDATE_AFTER", "false", "DELETE",
"true").asArgument("op_mapping")
+);
```
{{< top >}}
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PartitionedTable.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PartitionedTable.java
index 8d5f1c91b28..7db750a296d 100644
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PartitionedTable.java
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PartitionedTable.java
@@ -144,9 +144,9 @@ public interface PartitionedTable {
* Converts this dynamic table into an append-only table with an explicit
operation code column
* using the built-in {@code TO_CHANGELOG} process table function.
*
- * <p>Each input row - regardless of its original RowKind - is emitted as
an INSERT-only row
- * with a string {@code "op"} column indicating the original operation
(INSERT, UPDATE_AFTER,
- * DELETE, etc.).
+ * <p>Each input row - regardless of its original change operation - is
emitted as an
+ * INSERT-only row with a string {@code "op"} column indicating the
original operation (INSERT,
+ * UPDATE_AFTER, DELETE, etc.).
*
* <p>Optional arguments can be passed using named expressions:
*
@@ -159,6 +159,12 @@ public interface PartitionedTable {
* descriptor("op_code").asArgument("op"),
* map("INSERT", "I", "UPDATE_AFTER", "U").asArgument("op_mapping")
* );
+ *
+ * // Deletion flag pattern: comma-separated keys map multiple change
operations to the same code
+ * table.partitionBy($("id")).toChangelog(
+ * descriptor("deleted").asArgument("op"),
+ * map("INSERT, UPDATE_AFTER", "false", "DELETE",
"true").asArgument("op_mapping")
+ * );
* }</pre>
*
* @param arguments optional named arguments for {@code op} and {@code
op_mapping}
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToChangelogTypeStrategy.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToChangelogTypeStrategy.java
index cb04e720209..d13d52a49c7 100644
---
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToChangelogTypeStrategy.java
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToChangelogTypeStrategy.java
@@ -35,6 +35,7 @@ import org.apache.flink.table.types.inference.TypeStrategy;
import org.apache.flink.types.ColumnList;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -140,21 +141,56 @@ public final class ToChangelogTypeStrategy {
final Optional<Map> opMapping = callContext.getArgumentValue(2,
Map.class);
if (opMapping.isPresent()) {
- final boolean hasInvalidMappingKey =
- opMapping.get().keySet().stream()
- .anyMatch(
- key ->
- !(key instanceof String)
- ||
!VALID_ROW_KIND_NAMES.contains(key));
- if (hasInvalidMappingKey) {
- return callContext.fail(
- throwOnFailure, "Invalid target mapping for argument
'op_mapping'.");
+ final Optional<List<DataType>> validationError =
+ validateOpMappingKeys(callContext, opMapping.get(),
throwOnFailure);
+ if (validationError.isPresent()) {
+ return validationError;
}
}
return Optional.of(callContext.getArgumentDataTypes());
}
+ /**
+ * Validates op_mapping keys. Keys may be comma-separated (e.g., {@code
"INSERT, UPDATE_AFTER"})
+ * to map multiple change operations to the same output code. Whitespace
around names is
+ * trimmed. Names are case-sensitive and must match exactly (e.g., {@code
INSERT}, not {@code
+ * insert}). Each name must be valid and appear at most once across all
entries.
+ */
+ @SuppressWarnings("rawtypes")
+ private static Optional<List<DataType>> validateOpMappingKeys(
+ final CallContext callContext, final Map opMapping, final boolean
throwOnFailure) {
+ final Set<String> allRowKindsSeen = new HashSet<>();
+ for (final Object key : opMapping.keySet()) {
+ if (!(key instanceof String)) {
+ return callContext.fail(
+ throwOnFailure, "Invalid target mapping for argument
'op_mapping'.");
+ }
+ final String[] rowKindNames = ((String) key).split(",");
+ for (final String rawName : rowKindNames) {
+ final String rowKindName = rawName.trim();
+ if (!VALID_ROW_KIND_NAMES.contains(rowKindName)) {
+ return callContext.fail(
+ throwOnFailure,
+ String.format(
+ "Invalid target mapping for argument
'op_mapping'. "
+ + "Unknown change operation: '%s'.
Valid values are: %s.",
+ rowKindName, VALID_ROW_KIND_NAMES));
+ }
+ final boolean isDuplicate = !allRowKindsSeen.add(rowKindName);
+ if (isDuplicate) {
+ return callContext.fail(
+ throwOnFailure,
+ String.format(
+ "Invalid target mapping for argument
'op_mapping'. "
+ + "Duplicate change operation:
'%s'.",
+ rowKindName));
+ }
+ }
+ }
+ return Optional.empty();
+ }
+
private static String resolveOpColumnName(final CallContext callContext) {
return callContext
.getArgumentValue(1, ColumnList.class)
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogSemanticTests.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogSemanticTests.java
index 798044effa7..706f7abd1d3 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogSemanticTests.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogSemanticTests.java
@@ -46,8 +46,10 @@ public class ToChangelogSemanticTests extends
SemanticTestBase {
ToChangelogTestPrograms.TABLE_API_DEFAULT,
ToChangelogTestPrograms.LAG_ON_UPSERT_VIA_CHANGELOG,
ToChangelogTestPrograms.LAG_ON_RETRACT_VIA_CHANGELOG,
+ ToChangelogTestPrograms.DELETION_FLAG,
ToChangelogTestPrograms.MISSING_PARTITION_BY,
ToChangelogTestPrograms.INVALID_DESCRIPTOR,
- ToChangelogTestPrograms.INVALID_OP_MAPPING);
+ ToChangelogTestPrograms.INVALID_OP_MAPPING,
+ ToChangelogTestPrograms.DUPLICATE_ROW_KIND);
}
}
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java
index 22602bcf38a..bfc67097112 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java
@@ -96,7 +96,7 @@ public class ToChangelogTestPrograms {
public static final TableTestProgram CUSTOM_OP_MAPPING =
TableTestProgram.of(
"to-changelog-custom-op-mapping",
- "custom op_mapping maps RowKinds to user-defined
codes and drops unmapped")
+ "custom op_mapping maps change operations to
user-defined codes and drops unmapped")
.setupTableSource(
SourceTestStep.newBuilder("t")
.addSchema(
@@ -322,6 +322,46 @@ public class ToChangelogTestPrograms {
+ "FROM orders_changelog")
.build();
+ //
--------------------------------------------------------------------------------------------
+ // Use case: deletion flag pattern (comma-separated change operation keys)
+ //
--------------------------------------------------------------------------------------------
+
+ /**
+ * Kafka Connect style deletion flag: INSERT and UPDATE_AFTER both produce
deleted='false' and
+ * DELETE produces deleted='true'. UPDATE_BEFORE is silently dropped.
+ */
+ public static final TableTestProgram DELETION_FLAG =
+ TableTestProgram.of(
+ "to-changelog-deletion-flag",
+ "comma-separated change operations produce
deletion flag output")
+ .setupTableSource(
+ SourceTestStep.newBuilder("t")
+ .addSchema(
+ "name STRING PRIMARY KEY NOT
ENFORCED", "score BIGINT")
+ .addMode(ChangelogMode.all())
+ .producedValues(
+ Row.ofKind(RowKind.INSERT,
"Alice", 10L),
+ Row.ofKind(RowKind.INSERT, "Bob",
20L),
+ Row.ofKind(RowKind.UPDATE_BEFORE,
"Alice", 10L),
+ Row.ofKind(RowKind.UPDATE_AFTER,
"Alice", 30L),
+ Row.ofKind(RowKind.DELETE, "Bob",
20L))
+ .build())
+ .setupTableSink(
+ SinkTestStep.newBuilder("sink")
+ .addSchema("name STRING", "deleted
STRING", "score BIGINT")
+ .consumedValues(
+ "+I[Alice, false, 10]",
+ "+I[Bob, false, 20]",
+ "+I[Alice, false, 30]",
+ "+I[Bob, true, 20]")
+ .build())
+ .runSql(
+ "INSERT INTO sink SELECT * FROM TO_CHANGELOG("
+ + "input => TABLE t PARTITION BY name, "
+ + "op => DESCRIPTOR(deleted), "
+ + "op_mapping => MAP['INSERT,
UPDATE_AFTER', 'false', 'DELETE', 'true'])")
+ .build();
+
//
--------------------------------------------------------------------------------------------
// Error validation tests
//
--------------------------------------------------------------------------------------------
@@ -353,13 +393,26 @@ public class ToChangelogTestPrograms {
public static final TableTestProgram INVALID_OP_MAPPING =
TableTestProgram.of(
"to-changelog-invalid-op-mapping",
- "fails when op_mapping has invalid RowKind name")
+ "fails when op_mapping has invalid change
operation name")
.setupTableSource(SIMPLE_SOURCE)
.runFailingSql(
"SELECT * FROM TO_CHANGELOG("
+ "input => TABLE t PARTITION BY id, "
+ "op_mapping => MAP['INVALID_KIND',
'X'])",
ValidationException.class,
- "Invalid target mapping for argument
'op_mapping'.")
+ "Unknown change operation: 'INVALID_KIND'")
+ .build();
+
+ public static final TableTestProgram DUPLICATE_ROW_KIND =
+ TableTestProgram.of(
+ "to-changelog-duplicate-rowkind",
+ "fails when a change operation appears in multiple
op_mapping entries")
+ .setupTableSource(SIMPLE_SOURCE)
+ .runFailingSql(
+ "SELECT * FROM TO_CHANGELOG("
+ + "input => TABLE t PARTITION BY id, "
+ + "op_mapping => MAP['INSERT, DELETE',
'A', 'DELETE', 'B'])",
+ ValidationException.class,
+ "Duplicate change operation: 'DELETE'")
.build();
}
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/ToChangelogFunction.java
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/ToChangelogFunction.java
index 75211e164f5..f16ea33375f 100644
---
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/ToChangelogFunction.java
+++
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/ToChangelogFunction.java
@@ -109,13 +109,22 @@ public class ToChangelogFunction extends
BuiltInProcessTableFunction<RowData> {
return IntStream.range(0, fieldCount).filter(i ->
!partitionKeySet.contains(i)).toArray();
}
+ /**
+ * Builds a RowKind-to-output-code map. Keys may be comma-separated (e.g.,
"INSERT,
+ * UPDATE_AFTER") to map multiple RowKinds to the same output code.
+ */
private static Map<RowKind, String> buildOpMap(@Nullable final Map<String,
String> opMapping) {
if (opMapping == null) {
return new EnumMap<>(DEFAULT_OP_MAPPING);
}
- final Map<RowKind, String> map = new EnumMap<>(RowKind.class);
- opMapping.forEach((name, code) -> map.put(RowKind.valueOf(name),
code));
- return map;
+ final Map<RowKind, String> result = new EnumMap<>(RowKind.class);
+ opMapping.forEach(
+ (commaSeparatedRowKinds, outputCode) -> {
+ for (final String rawName :
commaSeparatedRowKinds.split(",")) {
+ result.put(RowKind.valueOf(rawName.trim()),
outputCode);
+ }
+ });
+ return result;
}
public void eval(