This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new b12302d87e5 [FLINK-39495][table] Fix FROM_CHANGELOG silently dropping rows with unmapped operation codes
b12302d87e5 is described below

commit b12302d87e53df5bd191918faad4dda80fbcba85
Author: Ramin Gharib <[email protected]>
AuthorDate: Tue Apr 21 17:01:09 2026 +0200

    [FLINK-39495][table] Fix FROM_CHANGELOG silently dropping rows with unmapped operation codes
    
    This closes #27973.
---
 .../docs/sql/reference/queries/changelog.md        |  8 ++-
 .../FromChangelogInputTypeStrategyTest.java        | 14 -----
 .../exec/stream/FromChangelogSemanticTests.java    |  5 +-
 .../exec/stream/FromChangelogTestPrograms.java     | 73 ++++++++++++++--------
 .../functions/ptf/FromChangelogFunction.java       | 10 ++-
 5 files changed, 65 insertions(+), 45 deletions(-)

diff --git a/docs/content/docs/sql/reference/queries/changelog.md b/docs/content/docs/sql/reference/queries/changelog.md
index f4449299388..c4361e4cb05 100644
--- a/docs/content/docs/sql/reference/queries/changelog.md
+++ b/docs/content/docs/sql/reference/queries/changelog.md
@@ -61,8 +61,8 @@ SELECT * FROM FROM_CHANGELOG(
 | Parameter    | Required | Description                                        
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
 
|:-------------|:---------|:--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 [...]
 | `input`      | Yes      | The input table. Must be append-only.              
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
-| `op`         | No       | A `DESCRIPTOR` with a single column name for the 
operation code column. Defaults to `op`. The column must exist in the input 
table and be of type STRING.                                                    
                                                                                
                                                                                
                                                                                
                    [...]
-| `op_mapping` | No       | A `MAP<STRING, STRING>` mapping user-defined codes 
to Flink change operation names. Keys are user-defined codes (e.g., `'c'`, 
`'u'`, `'d'`), values are Flink change operation names (`INSERT`, 
`UPDATE_BEFORE`, `UPDATE_AFTER`, `DELETE`). Keys can contain comma-separated 
codes to map multiple codes to the same operation (e.g., `'c, r'`). When 
provided, only mapped codes are forwarded - unmapped codes are dropped. Each 
change operation may appear at most once acro [...]
+| `op`         | No       | A `DESCRIPTOR` with a single column name for the 
operation code column. Defaults to `op`. The column must exist in the input 
table and be of type STRING. The column may be declared nullable, but a NULL 
value at runtime fails the job with a `TableRuntimeException` — every changelog 
row must carry an operation code.                                               
                                                                                
                       [...]
+| `op_mapping` | No       | A `MAP<STRING, STRING>` mapping user-defined codes 
to Flink change operation names. Keys are user-defined codes (e.g., `'c'`, 
`'u'`, `'d'`), values are Flink change operation names (`INSERT`, 
`UPDATE_BEFORE`, `UPDATE_AFTER`, `DELETE`). Keys can contain comma-separated 
codes to map multiple codes to the same operation (e.g., `'c, r'`). Receiving 
an op code not present in the mapping fails the job at runtime with a 
`TableRuntimeException`. Each change operation  [...]
 
 #### Default op_mapping
 
@@ -75,6 +75,8 @@ When `op_mapping` is omitted, the following standard names are used. They allow
 | `'UPDATE_AFTER'`   | UPDATE_AFTER      |
 | `'DELETE'`         | DELETE            |
 
+Any input row whose op code is not present in the active mapping (default or user-defined) fails the job at runtime with a `TableRuntimeException`.
+
 ### Output Schema
 
 The output contains all input columns except the operation code (e.g., op) column, which is interpreted by Flink's SQL engine and removed. Each output row carries the appropriate change operation (INSERT, UPDATE_BEFORE, UPDATE_AFTER, or DELETE).
@@ -90,6 +92,7 @@ The output contains all input columns except the operation code (e.g., op) colum
 ```sql
 -- Input (append-only):
 -- +I[id:1, op:'INSERT',        name:'Alice']
+-- +I[id:2, op:'INSERT',        name:'Bob']
 -- +I[id:1, op:'UPDATE_BEFORE', name:'Alice']
 -- +I[id:1, op:'UPDATE_AFTER',  name:'Alice2']
 -- +I[id:2, op:'DELETE',        name:'Bob']
@@ -100,6 +103,7 @@ SELECT * FROM FROM_CHANGELOG(
 
 -- Output (updating table):
 -- +I[id:1, name:'Alice']
+-- +I[id:2, name:'Bob']
 -- -U[id:1, name:'Alice']
 -- +U[id:1, name:'Alice2']
 -- -D[id:2, name:'Bob']
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/FromChangelogInputTypeStrategyTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/FromChangelogInputTypeStrategyTest.java
index c059598df66..d1b2a792182 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/FromChangelogInputTypeStrategyTest.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/FromChangelogInputTypeStrategyTest.java
@@ -61,20 +61,6 @@ class FromChangelogInputTypeStrategyTest extends InputTypeStrategiesTestBase {
                                         "d", "DELETE"))
                         .expectArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE, 
MAP_TYPE),
 
-                // Valid: retract-style mapping with UPDATE_BEFORE
-                TestSpec.forStrategy("Valid with UPDATE_BEFORE", 
FROM_CHANGELOG_INPUT_TYPE_STRATEGY)
-                        .calledWithArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE, 
MAP_TYPE)
-                        .calledWithTableSemanticsAt(0, new 
TableSemanticsMock(TABLE_TYPE))
-                        .calledWithLiteralAt(1, ColumnList.of(List.of("op")))
-                        .calledWithLiteralAt(
-                                2,
-                                Map.of(
-                                        "c", "INSERT",
-                                        "ub", "UPDATE_BEFORE",
-                                        "ua", "UPDATE_AFTER",
-                                        "d", "DELETE"))
-                        .expectArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE, 
MAP_TYPE),
-
                 // Error: op column not found
                 TestSpec.forStrategy(
                                 "Op column not found in schema", 
FROM_CHANGELOG_INPUT_TYPE_STRATEGY)
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogSemanticTests.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogSemanticTests.java
index 022a8d75450..6e936fb6cec 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogSemanticTests.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogSemanticTests.java
@@ -41,9 +41,10 @@ public class FromChangelogSemanticTests extends SemanticTestBase {
         return List.of(
                 FromChangelogTestPrograms.DEFAULT_OP_MAPPING,
                 FromChangelogTestPrograms.CUSTOM_OP_MAPPING,
-                FromChangelogTestPrograms.UNMAPPED_CODES_DROPPED,
                 FromChangelogTestPrograms.CUSTOM_OP_NAME,
                 FromChangelogTestPrograms.TABLE_API_DEFAULT,
-                FromChangelogTestPrograms.ROUND_TRIP);
+                FromChangelogTestPrograms.ROUND_TRIP,
+                FromChangelogTestPrograms.INVALID_OP_CODE,
+                FromChangelogTestPrograms.NULL_OP_CODE);
     }
 }
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogTestPrograms.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogTestPrograms.java
index d5554b12124..56f2c422b1a 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogTestPrograms.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogTestPrograms.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.planner.plan.nodes.exec.stream;
 
+import org.apache.flink.table.api.TableRuntimeException;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.test.program.SinkTestStep;
 import org.apache.flink.table.test.program.SourceTestStep;
@@ -93,32 +94,6 @@ public class FromChangelogTestPrograms {
                                     + "op_mapping => MAP['c, r', 'INSERT', 
'ub', 'UPDATE_BEFORE', 'ua', 'UPDATE_AFTER', 'd', 'DELETE'])")
                     .build();
 
-    public static final TableTestProgram UNMAPPED_CODES_DROPPED =
-            TableTestProgram.of(
-                            "from-changelog-unmapped-codes-dropped",
-                            "unmapped op codes are silently dropped")
-                    .setupTableSource(
-                            SourceTestStep.newBuilder("cdc_stream")
-                                    .addSchema(SIMPLE_CDC_SCHEMA)
-                                    .producedValues(
-                                            Row.of(1, "INSERT", "Alice"),
-                                            Row.of(2, "INSERT", "Bob"),
-                                            Row.of(1, "UNKNOWN", "Alice2"),
-                                            Row.of(2, "DELETE", "Bob"))
-                                    .build())
-                    .setupTableSink(
-                            SinkTestStep.newBuilder("sink")
-                                    .addSchema("id INT", "name STRING")
-                                    .consumedValues(
-                                            Row.ofKind(RowKind.INSERT, 1, 
"Alice"),
-                                            Row.ofKind(RowKind.INSERT, 2, 
"Bob"),
-                                            Row.ofKind(RowKind.DELETE, 2, 
"Bob"))
-                                    .build())
-                    .runSql(
-                            "INSERT INTO sink SELECT * FROM FROM_CHANGELOG("
-                                    + "input => TABLE cdc_stream)")
-                    .build();
-
     /** Custom op column name via DESCRIPTOR. */
     public static final TableTestProgram CUSTOM_OP_NAME =
             TableTestProgram.of(
@@ -207,4 +182,50 @@ public class FromChangelogTestPrograms {
                             "INSERT INTO sink SELECT * FROM FROM_CHANGELOG("
                                     + "input => TABLE changelog_view)")
                     .build();
+
+    // --------------------------------------------------------------------------------------------
+    // Error validation tests
+    // --------------------------------------------------------------------------------------------
+
+    public static final TableTestProgram INVALID_OP_CODE =
+            TableTestProgram.of(
+                            "from-changelog-invalid-op-code",
+                            "fails when input contains an op code not in the mapping")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("cdc_stream")
+                                    .addSchema(SIMPLE_CDC_SCHEMA)
+                                    .producedValues(Row.of(1, "UNKNOWN", "Alice"))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink")
+                                    .addSchema("id INT", "name STRING")
+                                    .consumedValues(new Row[0])
+                                    .build())
+                    .runFailingSql(
+                            "INSERT INTO sink SELECT * FROM FROM_CHANGELOG("
+                                    + "input => TABLE cdc_stream)",
+                            TableRuntimeException.class,
+                            "Received invalid op code 'UNKNOWN'")
+                    .build();
+
+    public static final TableTestProgram NULL_OP_CODE =
+            TableTestProgram.of(
+                            "from-changelog-null-op-code",
+                            "fails when input contains a NULL op code")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("cdc_stream")
+                                    .addSchema(SIMPLE_CDC_SCHEMA)
+                                    .producedValues(Row.of(1, null, "Alice"))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink")
+                                    .addSchema("id INT", "name STRING")
+                                    .consumedValues(new Row[0])
+                                    .build())
+                    .runFailingSql(
+                            "INSERT INTO sink SELECT * FROM FROM_CHANGELOG("
+                                    + "input => TABLE cdc_stream)",
+                            TableRuntimeException.class,
+                            "Received NULL op code")
+                    .build();
 }
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/FromChangelogFunction.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/FromChangelogFunction.java
index 38249aa02bd..bb932773f73 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/FromChangelogFunction.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/FromChangelogFunction.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.runtime.functions.ptf;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.TableRuntimeException;
 import org.apache.flink.table.data.MapData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.StringData;
@@ -134,10 +135,17 @@ public class FromChangelogFunction extends BuiltInProcessTableFunction<RowData>
             final RowData input,
             @Nullable final ColumnList op,
             @Nullable final MapData opMapping) {
+        if (input.isNullAt(opColumnIndex)) {
+            throw new TableRuntimeException(
+                    "Received NULL op code. Every changelog row must carry an operation code.");
+        }
         final StringData opCode = input.getString(opColumnIndex);
         final RowKind rowKind = opMap.get(opCode);
         if (rowKind == null) {
-            return;
+            throw new TableRuntimeException(
+                    String.format(
+                            "Received invalid op code '%s'. Defined op codes are: %s.",
+                            opCode, opMap.keySet()));
         }
 
         projectedOutput.replaceRow(input);

Reply via email to