This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 47349536b09 [FLINK-39349][table] Support deletion flag in TO_CHANGELOG
47349536b09 is described below

commit 47349536b09ef9c0b8731ed6d1ec4ebd0ce886b8
Author: Gustavo de Morais <[email protected]>
AuthorDate: Mon Mar 30 16:05:27 2026 +0200

    [FLINK-39349][table] Support deletion flag in TO_CHANGELOG
    
    This closes #27847.
---
 .../docs/sql/reference/queries/changelog.md        | 27 ++++++++--
 .../apache/flink/table/api/PartitionedTable.java   | 12 +++--
 .../strategies/ToChangelogTypeStrategy.java        | 54 ++++++++++++++++----
 .../exec/stream/ToChangelogSemanticTests.java      |  4 +-
 .../nodes/exec/stream/ToChangelogTestPrograms.java | 59 ++++++++++++++++++++--
 .../runtime/functions/ptf/ToChangelogFunction.java | 15 ++++--
 6 files changed, 148 insertions(+), 23 deletions(-)

diff --git a/docs/content/docs/sql/reference/queries/changelog.md 
b/docs/content/docs/sql/reference/queries/changelog.md
index d7e6e29668a..15ad336f4d0 100644
--- a/docs/content/docs/sql/reference/queries/changelog.md
+++ b/docs/content/docs/sql/reference/queries/changelog.md
@@ -36,7 +36,7 @@ Flink SQL provides built-in process table functions (PTFs) 
for working with chan
 
 ## TO_CHANGELOG
 
-The `TO_CHANGELOG` PTF converts a dynamic table (i.e. an updating table) into 
an append-only table with an explicit operation code column. Each input row - 
regardless of its original `RowKind` (INSERT, UPDATE_BEFORE, UPDATE_AFTER, 
DELETE) - is emitted as an INSERT-only row with a string column indicating the 
original operation.
+The `TO_CHANGELOG` PTF converts a dynamic table (i.e. an updating table) into 
an append-only table with an explicit operation code column. Each input row - 
regardless of its original change operation (INSERT, UPDATE_BEFORE, 
UPDATE_AFTER, DELETE) - is emitted as an INSERT-only row with a string column 
indicating the original operation.
 
 This is useful when you need to materialize changelog events into a downstream 
system that only supports appends (e.g., a message queue, log store, or 
append-only file sink). It is also useful to filter out certain types of 
updates, for example DELETEs.
 
@@ -56,13 +56,13 @@ SELECT * FROM TO_CHANGELOG(
 |:-------------|:---------|:------------|
 | `input`      | Yes      | The input table. Must include `PARTITION BY` for 
parallel execution. Accepts insert-only, retract, and upsert tables. |
 | `op`         | No       | A `DESCRIPTOR` with a single column name for the 
operation code column. Defaults to `op`. |
-| `op_mapping` | No       | A `MAP<STRING, STRING>` mapping `RowKind` names to 
custom output codes. When provided, only mapped RowKinds are forwarded - 
unmapped events are dropped. |
+| `op_mapping` | No       | A `MAP<STRING, STRING>` mapping change operation 
names to custom output codes. Keys can contain comma-separated names to map 
multiple operations to the same code (e.g., `'INSERT, UPDATE_AFTER'`). When 
provided, only mapped operations are forwarded - unmapped events are dropped. 
Each change operation may appear at most once across all entries. |
 
 #### Default op_mapping
 
-When `op_mapping` is omitted, all four RowKinds are mapped to their standard 
names:
+When `op_mapping` is omitted, all four change operations are mapped to their 
standard names:
 
-| RowKind         | Output value      |
+| Change Operation | Output value      |
 |:----------------|:------------------|
 | INSERT          | `'INSERT'`        |
 | UPDATE_BEFORE   | `'UPDATE_BEFORE'` |
@@ -122,6 +122,19 @@ SELECT * FROM TO_CHANGELOG(
 -- op_code values are 'I' and 'U' instead of full names
 ```
 
+#### Deletion flag pattern
+
+```sql
+SELECT * FROM TO_CHANGELOG(
+  input => TABLE my_aggregation PARTITION BY id,
+  op => DESCRIPTOR(deleted),
+  op_mapping => MAP['INSERT, UPDATE_AFTER', 'false', 'DELETE', 'true']
+)
+-- INSERT and UPDATE_AFTER produce deleted='false'
+-- DELETE produces deleted='true'
+-- UPDATE_BEFORE is dropped (not in the mapping)
+```
+
 #### Table API
 
 ```java
@@ -133,6 +146,12 @@ Table result = myTable.partitionBy($("id")).toChangelog(
     descriptor("op_code").asArgument("op"),
     map("INSERT", "I", "UPDATE_AFTER", "U").asArgument("op_mapping")
 );
+
+// Deletion flag pattern: comma-separated keys map multiple change operations 
to the same code
+Table result = myTable.partitionBy($("id")).toChangelog(
+    descriptor("deleted").asArgument("op"),
+    map("INSERT, UPDATE_AFTER", "false", "DELETE", 
"true").asArgument("op_mapping")
+);
 ```
 
 {{< top >}}
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PartitionedTable.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PartitionedTable.java
index 8d5f1c91b28..7db750a296d 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PartitionedTable.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PartitionedTable.java
@@ -144,9 +144,9 @@ public interface PartitionedTable {
      * Converts this dynamic table into an append-only table with an explicit 
operation code column
      * using the built-in {@code TO_CHANGELOG} process table function.
      *
-     * <p>Each input row - regardless of its original RowKind - is emitted as 
an INSERT-only row
-     * with a string {@code "op"} column indicating the original operation 
(INSERT, UPDATE_AFTER,
-     * DELETE, etc.).
+     * <p>Each input row - regardless of its original change operation - is 
emitted as an
+     * INSERT-only row with a string {@code "op"} column indicating the 
original operation (INSERT,
+     * UPDATE_AFTER, DELETE, etc.).
      *
      * <p>Optional arguments can be passed using named expressions:
      *
@@ -159,6 +159,12 @@ public interface PartitionedTable {
      *     descriptor("op_code").asArgument("op"),
      *     map("INSERT", "I", "UPDATE_AFTER", "U").asArgument("op_mapping")
      * );
+     *
+     * // Deletion flag pattern: comma-separated keys map multiple change 
operations to the same code
+     * table.partitionBy($("id")).toChangelog(
+     *     descriptor("deleted").asArgument("op"),
+     *     map("INSERT, UPDATE_AFTER", "false", "DELETE", 
"true").asArgument("op_mapping")
+     * );
      * }</pre>
      *
      * @param arguments optional named arguments for {@code op} and {@code 
op_mapping}
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToChangelogTypeStrategy.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToChangelogTypeStrategy.java
index cb04e720209..d13d52a49c7 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToChangelogTypeStrategy.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToChangelogTypeStrategy.java
@@ -35,6 +35,7 @@ import org.apache.flink.table.types.inference.TypeStrategy;
 import org.apache.flink.types.ColumnList;
 
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -140,21 +141,56 @@ public final class ToChangelogTypeStrategy {
 
         final Optional<Map> opMapping = callContext.getArgumentValue(2, 
Map.class);
         if (opMapping.isPresent()) {
-            final boolean hasInvalidMappingKey =
-                    opMapping.get().keySet().stream()
-                            .anyMatch(
-                                    key ->
-                                            !(key instanceof String)
-                                                    || 
!VALID_ROW_KIND_NAMES.contains(key));
-            if (hasInvalidMappingKey) {
-                return callContext.fail(
-                        throwOnFailure, "Invalid target mapping for argument 
'op_mapping'.");
+            final Optional<List<DataType>> validationError =
+                    validateOpMappingKeys(callContext, opMapping.get(), 
throwOnFailure);
+            if (validationError.isPresent()) {
+                return validationError;
             }
         }
 
         return Optional.of(callContext.getArgumentDataTypes());
     }
 
+    /**
+     * Validates op_mapping keys. Keys may be comma-separated (e.g., {@code "INSERT, UPDATE_AFTER"})
+     * to map multiple change operations to the same output code. Whitespace around names is
+     * trimmed; empty names (e.g., {@code ","} or a trailing comma) are rejected. Each name
+     * is case-sensitive ({@code INSERT}, not {@code insert}) and may appear only once overall.
+     */
+    @SuppressWarnings("rawtypes")
+    private static Optional<List<DataType>> validateOpMappingKeys(
+            final CallContext callContext, final Map opMapping, final boolean throwOnFailure) {
+        final Set<String> allRowKindsSeen = new HashSet<>();
+        for (final Object key : opMapping.keySet()) {
+            if (!(key instanceof String)) {
+                return callContext.fail(
+                        throwOnFailure, "Invalid target mapping for argument 'op_mapping'.");
+            }
+            final String[] rowKindNames = ((String) key).split(",", -1); // -1 keeps empty names
+            for (final String rawName : rowKindNames) {
+                final String rowKindName = rawName.trim();
+                if (!VALID_ROW_KIND_NAMES.contains(rowKindName)) {
+                    return callContext.fail(
+                            throwOnFailure,
+                            String.format(
+                                    "Invalid target mapping for argument 'op_mapping'. "
+                                            + "Unknown change operation: '%s'. Valid values are: %s.",
+                                    rowKindName, VALID_ROW_KIND_NAMES));
+                }
+                final boolean isDuplicate = !allRowKindsSeen.add(rowKindName);
+                if (isDuplicate) {
+                    return callContext.fail(
+                            throwOnFailure,
+                            String.format(
+                                    "Invalid target mapping for argument 'op_mapping'. "
+                                            + "Duplicate change operation: '%s'.",
+                                    rowKindName));
+                }
+            }
+        }
+        return Optional.empty();
+    }
+
     private static String resolveOpColumnName(final CallContext callContext) {
         return callContext
                 .getArgumentValue(1, ColumnList.class)
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogSemanticTests.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogSemanticTests.java
index 798044effa7..706f7abd1d3 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogSemanticTests.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogSemanticTests.java
@@ -46,8 +46,10 @@ public class ToChangelogSemanticTests extends 
SemanticTestBase {
                 ToChangelogTestPrograms.TABLE_API_DEFAULT,
                 ToChangelogTestPrograms.LAG_ON_UPSERT_VIA_CHANGELOG,
                 ToChangelogTestPrograms.LAG_ON_RETRACT_VIA_CHANGELOG,
+                ToChangelogTestPrograms.DELETION_FLAG,
                 ToChangelogTestPrograms.MISSING_PARTITION_BY,
                 ToChangelogTestPrograms.INVALID_DESCRIPTOR,
-                ToChangelogTestPrograms.INVALID_OP_MAPPING);
+                ToChangelogTestPrograms.INVALID_OP_MAPPING,
+                ToChangelogTestPrograms.DUPLICATE_ROW_KIND);
     }
 }
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java
index 22602bcf38a..bfc67097112 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java
@@ -96,7 +96,7 @@ public class ToChangelogTestPrograms {
     public static final TableTestProgram CUSTOM_OP_MAPPING =
             TableTestProgram.of(
                             "to-changelog-custom-op-mapping",
-                            "custom op_mapping maps RowKinds to user-defined 
codes and drops unmapped")
+                            "custom op_mapping maps change operations to 
user-defined codes and drops unmapped")
                     .setupTableSource(
                             SourceTestStep.newBuilder("t")
                                     .addSchema(
@@ -322,6 +322,46 @@ public class ToChangelogTestPrograms {
                                     + "FROM orders_changelog")
                     .build();
 
+    // 
--------------------------------------------------------------------------------------------
+    // Use case: deletion flag pattern (comma-separated change operation keys)
+    // 
--------------------------------------------------------------------------------------------
+
+    /**
+     * Kafka Connect style deletion flag: INSERT and UPDATE_AFTER both produce 
deleted='false' and
+     * DELETE produces deleted='true'. UPDATE_BEFORE is silently dropped.
+     */
+    public static final TableTestProgram DELETION_FLAG =
+            TableTestProgram.of(
+                            "to-changelog-deletion-flag",
+                            "comma-separated change operations produce 
deletion flag output")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("t")
+                                    .addSchema(
+                                            "name STRING PRIMARY KEY NOT 
ENFORCED", "score BIGINT")
+                                    .addMode(ChangelogMode.all())
+                                    .producedValues(
+                                            Row.ofKind(RowKind.INSERT, 
"Alice", 10L),
+                                            Row.ofKind(RowKind.INSERT, "Bob", 
20L),
+                                            Row.ofKind(RowKind.UPDATE_BEFORE, 
"Alice", 10L),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
"Alice", 30L),
+                                            Row.ofKind(RowKind.DELETE, "Bob", 
20L))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink")
+                                    .addSchema("name STRING", "deleted 
STRING", "score BIGINT")
+                                    .consumedValues(
+                                            "+I[Alice, false, 10]",
+                                            "+I[Bob, false, 20]",
+                                            "+I[Alice, false, 30]",
+                                            "+I[Bob, true, 20]")
+                                    .build())
+                    .runSql(
+                            "INSERT INTO sink SELECT * FROM TO_CHANGELOG("
+                                    + "input => TABLE t PARTITION BY name, "
+                                    + "op => DESCRIPTOR(deleted), "
+                                    + "op_mapping => MAP['INSERT, 
UPDATE_AFTER', 'false', 'DELETE', 'true'])")
+                    .build();
+
     // 
--------------------------------------------------------------------------------------------
     // Error validation tests
     // 
--------------------------------------------------------------------------------------------
@@ -353,13 +393,26 @@ public class ToChangelogTestPrograms {
     public static final TableTestProgram INVALID_OP_MAPPING =
             TableTestProgram.of(
                             "to-changelog-invalid-op-mapping",
-                            "fails when op_mapping has invalid RowKind name")
+                            "fails when op_mapping has invalid change 
operation name")
                     .setupTableSource(SIMPLE_SOURCE)
                     .runFailingSql(
                             "SELECT * FROM TO_CHANGELOG("
                                     + "input => TABLE t PARTITION BY id, "
                                     + "op_mapping => MAP['INVALID_KIND', 
'X'])",
                             ValidationException.class,
-                            "Invalid target mapping for argument 
'op_mapping'.")
+                            "Unknown change operation: 'INVALID_KIND'")
+                    .build();
+
+    public static final TableTestProgram DUPLICATE_ROW_KIND =
+            TableTestProgram.of(
+                            "to-changelog-duplicate-rowkind",
+                            "fails when a change operation appears in multiple 
op_mapping entries")
+                    .setupTableSource(SIMPLE_SOURCE)
+                    .runFailingSql(
+                            "SELECT * FROM TO_CHANGELOG("
+                                    + "input => TABLE t PARTITION BY id, "
+                                    + "op_mapping => MAP['INSERT, DELETE', 
'A', 'DELETE', 'B'])",
+                            ValidationException.class,
+                            "Duplicate change operation: 'DELETE'")
                     .build();
 }
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/ToChangelogFunction.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/ToChangelogFunction.java
index 75211e164f5..f16ea33375f 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/ToChangelogFunction.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/ToChangelogFunction.java
@@ -109,13 +109,22 @@ public class ToChangelogFunction extends 
BuiltInProcessTableFunction<RowData> {
         return IntStream.range(0, fieldCount).filter(i -> 
!partitionKeySet.contains(i)).toArray();
     }
 
+    /**
+     * Builds a RowKind-to-output-code map. Keys may be comma-separated (e.g., 
"INSERT,
+     * UPDATE_AFTER") to map multiple RowKinds to the same output code.
+     */
     private static Map<RowKind, String> buildOpMap(@Nullable final Map<String, 
String> opMapping) {
         if (opMapping == null) {
             return new EnumMap<>(DEFAULT_OP_MAPPING);
         }
-        final Map<RowKind, String> map = new EnumMap<>(RowKind.class);
-        opMapping.forEach((name, code) -> map.put(RowKind.valueOf(name), 
code));
-        return map;
+        final Map<RowKind, String> result = new EnumMap<>(RowKind.class);
+        opMapping.forEach(
+                (commaSeparatedRowKinds, outputCode) -> {
+                    for (final String rawName : 
commaSeparatedRowKinds.split(",")) {
+                        result.put(RowKind.valueOf(rawName.trim()), 
outputCode);
+                    }
+                });
+        return result;
     }
 
     public void eval(

Reply via email to