This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new b12302d87e5 [FLINK-39495][table] Fix FROM_CHANGELOG silently dropping
rows with unmapped operation codes
b12302d87e5 is described below
commit b12302d87e53df5bd191918faad4dda80fbcba85
Author: Ramin Gharib <[email protected]>
AuthorDate: Tue Apr 21 17:01:09 2026 +0200
[FLINK-39495][table] Fix FROM_CHANGELOG silently dropping rows with
unmapped operation codes
This closes #27973.
---
.../docs/sql/reference/queries/changelog.md | 8 ++-
.../FromChangelogInputTypeStrategyTest.java | 14 -----
.../exec/stream/FromChangelogSemanticTests.java | 5 +-
.../exec/stream/FromChangelogTestPrograms.java | 73 ++++++++++++++--------
.../functions/ptf/FromChangelogFunction.java | 10 ++-
5 files changed, 65 insertions(+), 45 deletions(-)
diff --git a/docs/content/docs/sql/reference/queries/changelog.md b/docs/content/docs/sql/reference/queries/changelog.md
index f4449299388..c4361e4cb05 100644
--- a/docs/content/docs/sql/reference/queries/changelog.md
+++ b/docs/content/docs/sql/reference/queries/changelog.md
@@ -61,8 +61,8 @@ SELECT * FROM FROM_CHANGELOG(
| Parameter | Required | Description
[...]
|:-------------|:---------|:--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
[...]
| `input` | Yes | The input table. Must be append-only.
[...]
-| `op` | No | A `DESCRIPTOR` with a single column name for the
operation code column. Defaults to `op`. The column must exist in the input
table and be of type STRING.
[...]
-| `op_mapping` | No | A `MAP<STRING, STRING>` mapping user-defined codes
to Flink change operation names. Keys are user-defined codes (e.g., `'c'`,
`'u'`, `'d'`), values are Flink change operation names (`INSERT`,
`UPDATE_BEFORE`, `UPDATE_AFTER`, `DELETE`). Keys can contain comma-separated
codes to map multiple codes to the same operation (e.g., `'c, r'`). When
provided, only mapped codes are forwarded - unmapped codes are dropped. Each
change operation may appear at most once acro [...]
+| `op` | No | A `DESCRIPTOR` with a single column name for the
operation code column. Defaults to `op`. The column must exist in the input
table and be of type STRING. The column may be declared nullable, but a NULL
value at runtime fails the job with a `TableRuntimeException` — every changelog
row must carry an operation code.
[...]
+| `op_mapping` | No | A `MAP<STRING, STRING>` mapping user-defined codes
to Flink change operation names. Keys are user-defined codes (e.g., `'c'`,
`'u'`, `'d'`), values are Flink change operation names (`INSERT`,
`UPDATE_BEFORE`, `UPDATE_AFTER`, `DELETE`). Keys can contain comma-separated
codes to map multiple codes to the same operation (e.g., `'c, r'`). Receiving
an op code not present in the mapping fails the job at runtime with a
`TableRuntimeException`. Each change operation [...]
#### Default op_mapping
@@ -75,6 +75,8 @@ When `op_mapping` is omitted, the following standard names
are used. They allow
| `'UPDATE_AFTER'` | UPDATE_AFTER |
| `'DELETE'` | DELETE |
+Any input row whose op code is not present in the active mapping (default or
user-defined) fails the job at runtime with a `TableRuntimeException`.
+
### Output Schema
The output contains all input columns except the operation code (e.g., op)
column, which is interpreted by Flink's SQL engine and removed. Each output row
carries the appropriate change operation (INSERT, UPDATE_BEFORE, UPDATE_AFTER,
or DELETE).
@@ -90,6 +92,7 @@ The output contains all input columns except the operation
code (e.g., op) colum
```sql
-- Input (append-only):
-- +I[id:1, op:'INSERT', name:'Alice']
+-- +I[id:2, op:'INSERT', name:'Bob']
-- +I[id:1, op:'UPDATE_BEFORE', name:'Alice']
-- +I[id:1, op:'UPDATE_AFTER', name:'Alice2']
-- +I[id:2, op:'DELETE', name:'Bob']
@@ -100,6 +103,7 @@ SELECT * FROM FROM_CHANGELOG(
-- Output (updating table):
-- +I[id:1, name:'Alice']
+-- +I[id:2, name:'Bob']
-- -U[id:1, name:'Alice']
-- +U[id:1, name:'Alice2']
-- -D[id:2, name:'Bob']
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/FromChangelogInputTypeStrategyTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/FromChangelogInputTypeStrategyTest.java
index c059598df66..d1b2a792182 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/FromChangelogInputTypeStrategyTest.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/FromChangelogInputTypeStrategyTest.java
@@ -61,20 +61,6 @@ class FromChangelogInputTypeStrategyTest extends
InputTypeStrategiesTestBase {
"d", "DELETE"))
.expectArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE,
MAP_TYPE),
- // Valid: retract-style mapping with UPDATE_BEFORE
- TestSpec.forStrategy("Valid with UPDATE_BEFORE",
FROM_CHANGELOG_INPUT_TYPE_STRATEGY)
- .calledWithArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE,
MAP_TYPE)
- .calledWithTableSemanticsAt(0, new
TableSemanticsMock(TABLE_TYPE))
- .calledWithLiteralAt(1, ColumnList.of(List.of("op")))
- .calledWithLiteralAt(
- 2,
- Map.of(
- "c", "INSERT",
- "ub", "UPDATE_BEFORE",
- "ua", "UPDATE_AFTER",
- "d", "DELETE"))
- .expectArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE,
MAP_TYPE),
-
// Error: op column not found
TestSpec.forStrategy(
"Op column not found in schema",
FROM_CHANGELOG_INPUT_TYPE_STRATEGY)
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogSemanticTests.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogSemanticTests.java
index 022a8d75450..6e936fb6cec 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogSemanticTests.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogSemanticTests.java
@@ -41,9 +41,10 @@ public class FromChangelogSemanticTests extends
SemanticTestBase {
return List.of(
FromChangelogTestPrograms.DEFAULT_OP_MAPPING,
FromChangelogTestPrograms.CUSTOM_OP_MAPPING,
- FromChangelogTestPrograms.UNMAPPED_CODES_DROPPED,
FromChangelogTestPrograms.CUSTOM_OP_NAME,
FromChangelogTestPrograms.TABLE_API_DEFAULT,
- FromChangelogTestPrograms.ROUND_TRIP);
+ FromChangelogTestPrograms.ROUND_TRIP,
+ FromChangelogTestPrograms.INVALID_OP_CODE,
+ FromChangelogTestPrograms.NULL_OP_CODE);
}
}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogTestPrograms.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogTestPrograms.java
index d5554b12124..56f2c422b1a 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogTestPrograms.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogTestPrograms.java
@@ -18,6 +18,7 @@
package org.apache.flink.table.planner.plan.nodes.exec.stream;
+import org.apache.flink.table.api.TableRuntimeException;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.test.program.SinkTestStep;
import org.apache.flink.table.test.program.SourceTestStep;
@@ -93,32 +94,6 @@ public class FromChangelogTestPrograms {
+ "op_mapping => MAP['c, r', 'INSERT',
'ub', 'UPDATE_BEFORE', 'ua', 'UPDATE_AFTER', 'd', 'DELETE'])")
.build();
- public static final TableTestProgram UNMAPPED_CODES_DROPPED =
- TableTestProgram.of(
- "from-changelog-unmapped-codes-dropped",
- "unmapped op codes are silently dropped")
- .setupTableSource(
- SourceTestStep.newBuilder("cdc_stream")
- .addSchema(SIMPLE_CDC_SCHEMA)
- .producedValues(
- Row.of(1, "INSERT", "Alice"),
- Row.of(2, "INSERT", "Bob"),
- Row.of(1, "UNKNOWN", "Alice2"),
- Row.of(2, "DELETE", "Bob"))
- .build())
- .setupTableSink(
- SinkTestStep.newBuilder("sink")
- .addSchema("id INT", "name STRING")
- .consumedValues(
- Row.ofKind(RowKind.INSERT, 1,
"Alice"),
- Row.ofKind(RowKind.INSERT, 2,
"Bob"),
- Row.ofKind(RowKind.DELETE, 2,
"Bob"))
- .build())
- .runSql(
- "INSERT INTO sink SELECT * FROM FROM_CHANGELOG("
- + "input => TABLE cdc_stream)")
- .build();
-
/** Custom op column name via DESCRIPTOR. */
public static final TableTestProgram CUSTOM_OP_NAME =
TableTestProgram.of(
@@ -207,4 +182,50 @@ public class FromChangelogTestPrograms {
"INSERT INTO sink SELECT * FROM FROM_CHANGELOG("
+ "input => TABLE changelog_view)")
.build();
+
+ // --------------------------------------------------------------------------------------------
+ // Error validation tests
+ // --------------------------------------------------------------------------------------------
+
+ public static final TableTestProgram INVALID_OP_CODE =
+ TableTestProgram.of(
+ "from-changelog-invalid-op-code",
+ "fails when input contains an op code not in the
mapping")
+ .setupTableSource(
+ SourceTestStep.newBuilder("cdc_stream")
+ .addSchema(SIMPLE_CDC_SCHEMA)
+ .producedValues(Row.of(1, "UNKNOWN",
"Alice"))
+ .build())
+ .setupTableSink(
+ SinkTestStep.newBuilder("sink")
+ .addSchema("id INT", "name STRING")
+ .consumedValues(new Row[0])
+ .build())
+ .runFailingSql(
+ "INSERT INTO sink SELECT * FROM FROM_CHANGELOG("
+ + "input => TABLE cdc_stream)",
+ TableRuntimeException.class,
+ "Received invalid op code 'UNKNOWN'")
+ .build();
+
+ public static final TableTestProgram NULL_OP_CODE =
+ TableTestProgram.of(
+ "from-changelog-null-op-code",
+ "fails when input contains a NULL op code")
+ .setupTableSource(
+ SourceTestStep.newBuilder("cdc_stream")
+ .addSchema(SIMPLE_CDC_SCHEMA)
+ .producedValues(Row.of(1, null, "Alice"))
+ .build())
+ .setupTableSink(
+ SinkTestStep.newBuilder("sink")
+ .addSchema("id INT", "name STRING")
+ .consumedValues(new Row[0])
+ .build())
+ .runFailingSql(
+ "INSERT INTO sink SELECT * FROM FROM_CHANGELOG("
+ + "input => TABLE cdc_stream)",
+ TableRuntimeException.class,
+ "Received NULL op code")
+ .build();
}
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/FromChangelogFunction.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/FromChangelogFunction.java
index 38249aa02bd..bb932773f73 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/FromChangelogFunction.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/FromChangelogFunction.java
@@ -19,6 +19,7 @@
package org.apache.flink.table.runtime.functions.ptf;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.TableRuntimeException;
import org.apache.flink.table.data.MapData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
@@ -134,10 +135,17 @@ public class FromChangelogFunction extends
BuiltInProcessTableFunction<RowData>
final RowData input,
@Nullable final ColumnList op,
@Nullable final MapData opMapping) {
+ if (input.isNullAt(opColumnIndex)) {
+ throw new TableRuntimeException(
+ "Received NULL op code. Every changelog row must carry an
operation code.");
+ }
final StringData opCode = input.getString(opColumnIndex);
final RowKind rowKind = opMap.get(opCode);
if (rowKind == null) {
- return;
+ throw new TableRuntimeException(
+ String.format(
+ "Received invalid op code '%s'. Defined op codes
are: %s.",
+ opCode, opMap.keySet()));
}
projectedOutput.replaceRow(input);