This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch release-2.3 in repository https://gitbox.apache.org/repos/asf/flink.git
commit d510e4eac13c49a4e785a7caac9b5dd51781471c Author: Ramin Gharib <[email protected]> AuthorDate: Tue Apr 21 17:01:09 2026 +0200 [FLINK-39495][table] Fix FROM_CHANGELOG silently dropping rows with unmapped operation codes This closes #27973. (cherry picked from commit b12302d87e53df5bd191918faad4dda80fbcba85) --- .../docs/sql/reference/queries/changelog.md | 8 ++- .../FromChangelogInputTypeStrategyTest.java | 14 ----- .../exec/stream/FromChangelogSemanticTests.java | 5 +- .../exec/stream/FromChangelogTestPrograms.java | 73 ++++++++++++++-------- .../functions/ptf/FromChangelogFunction.java | 10 ++- 5 files changed, 65 insertions(+), 45 deletions(-) diff --git a/docs/content/docs/sql/reference/queries/changelog.md b/docs/content/docs/sql/reference/queries/changelog.md index f4449299388..c4361e4cb05 100644 --- a/docs/content/docs/sql/reference/queries/changelog.md +++ b/docs/content/docs/sql/reference/queries/changelog.md @@ -61,8 +61,8 @@ SELECT * FROM FROM_CHANGELOG( | Parameter | Required | Description [...] |:-------------|:---------|:-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- [...] | `input` | Yes | The input table. Must be append-only. [...] -| `op` | No | A `DESCRIPTOR` with a single column name for the operation code column. Defaults to `op`. The column must exist in the input table and be of type STRING. [...] -| `op_mapping` | No | A `MAP<STRING, STRING>` mapping user-defined codes to Flink change operation names. Keys are user-defined codes (e.g., `'c'`, `'u'`, `'d'`), values are Flink change operation names (`INSERT`, `UPDATE_BEFORE`, `UPDATE_AFTER`, `DELETE`). Keys can contain comma-separated codes to map multiple codes to the same operation (e.g., `'c, r'`). When provided, only mapped codes are forwarded - unmapped codes are dropped. Each change operation may appear at most once acro [...] +| `op` | No | A `DESCRIPTOR` with a single column name for the operation code column. Defaults to `op`. The column must exist in the input table and be of type STRING. The column may be declared nullable, but a NULL value at runtime fails the job with a `TableRuntimeException` — every changelog row must carry an operation code. [...] +| `op_mapping` | No | A `MAP<STRING, STRING>` mapping user-defined codes to Flink change operation names. Keys are user-defined codes (e.g., `'c'`, `'u'`, `'d'`), values are Flink change operation names (`INSERT`, `UPDATE_BEFORE`, `UPDATE_AFTER`, `DELETE`). Keys can contain comma-separated codes to map multiple codes to the same operation (e.g., `'c, r'`). Receiving an op code not present in the mapping fails the job at runtime with a `TableRuntimeException`. Each change operation [...] #### Default op_mapping @@ -75,6 +75,8 @@ When `op_mapping` is omitted, the following standard names are used. They allow | `'UPDATE_AFTER'` | UPDATE_AFTER | | `'DELETE'` | DELETE | +Any input row whose op code is not present in the active mapping (default or user-defined) fails the job at runtime with a `TableRuntimeException`. + ### Output Schema The output contains all input columns except the operation code (e.g., op) column, which is interpreted by Flink's SQL engine and removed. Each output row carries the appropriate change operation (INSERT, UPDATE_BEFORE, UPDATE_AFTER, or DELETE). @@ -90,6 +92,7 @@ The output contains all input columns except the operation code (e.g., op) colum ```sql -- Input (append-only): -- +I[id:1, op:'INSERT', name:'Alice'] +-- +I[id:2, op:'INSERT', name:'Bob'] -- +I[id:1, op:'UPDATE_BEFORE', name:'Alice'] -- +I[id:1, op:'UPDATE_AFTER', name:'Alice2'] -- +I[id:2, op:'DELETE', name:'Bob'] @@ -100,6 +103,7 @@ SELECT * FROM FROM_CHANGELOG( -- Output (updating table): -- +I[id:1, name:'Alice'] +-- +I[id:2, name:'Bob'] -- -U[id:1, name:'Alice'] -- +U[id:1, name:'Alice2'] -- -D[id:2, name:'Bob'] diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/FromChangelogInputTypeStrategyTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/FromChangelogInputTypeStrategyTest.java index c059598df66..d1b2a792182 100644 --- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/FromChangelogInputTypeStrategyTest.java +++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/FromChangelogInputTypeStrategyTest.java @@ -61,20 +61,6 @@ class FromChangelogInputTypeStrategyTest extends InputTypeStrategiesTestBase { "d", "DELETE")) .expectArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE, MAP_TYPE), - // Valid: retract-style mapping with UPDATE_BEFORE - TestSpec.forStrategy("Valid with UPDATE_BEFORE", FROM_CHANGELOG_INPUT_TYPE_STRATEGY) - .calledWithArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE, MAP_TYPE) - .calledWithTableSemanticsAt(0, new TableSemanticsMock(TABLE_TYPE)) - .calledWithLiteralAt(1, ColumnList.of(List.of("op"))) - .calledWithLiteralAt( - 2, - Map.of( - "c", "INSERT", - "ub", "UPDATE_BEFORE", - "ua", "UPDATE_AFTER", - "d", "DELETE")) - .expectArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE, MAP_TYPE), - // Error: op column not found TestSpec.forStrategy( "Op column not found in schema", FROM_CHANGELOG_INPUT_TYPE_STRATEGY) diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogSemanticTests.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogSemanticTests.java index 022a8d75450..6e936fb6cec 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogSemanticTests.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogSemanticTests.java @@ -41,9 +41,10 @@ public class FromChangelogSemanticTests extends SemanticTestBase { return List.of( FromChangelogTestPrograms.DEFAULT_OP_MAPPING, FromChangelogTestPrograms.CUSTOM_OP_MAPPING, - FromChangelogTestPrograms.UNMAPPED_CODES_DROPPED, FromChangelogTestPrograms.CUSTOM_OP_NAME, FromChangelogTestPrograms.TABLE_API_DEFAULT, - FromChangelogTestPrograms.ROUND_TRIP); + FromChangelogTestPrograms.ROUND_TRIP, + FromChangelogTestPrograms.INVALID_OP_CODE, + FromChangelogTestPrograms.NULL_OP_CODE); } } diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogTestPrograms.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogTestPrograms.java index d5554b12124..56f2c422b1a 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogTestPrograms.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogTestPrograms.java @@ -18,6 +18,7 @@ package org.apache.flink.table.planner.plan.nodes.exec.stream; +import org.apache.flink.table.api.TableRuntimeException; import org.apache.flink.table.connector.ChangelogMode; import org.apache.flink.table.test.program.SinkTestStep; import org.apache.flink.table.test.program.SourceTestStep; @@ -93,32 +94,6 @@ public class FromChangelogTestPrograms { + "op_mapping => MAP['c, r', 'INSERT', 'ub', 'UPDATE_BEFORE', 'ua', 'UPDATE_AFTER', 'd', 'DELETE'])") .build(); - public static final TableTestProgram UNMAPPED_CODES_DROPPED = - TableTestProgram.of( - "from-changelog-unmapped-codes-dropped", - "unmapped op codes are silently dropped") - .setupTableSource( - SourceTestStep.newBuilder("cdc_stream") - .addSchema(SIMPLE_CDC_SCHEMA) - .producedValues( - Row.of(1, "INSERT", "Alice"), - Row.of(2, "INSERT", "Bob"), - Row.of(1, "UNKNOWN", "Alice2"), - Row.of(2, "DELETE", "Bob")) - .build()) - .setupTableSink( - SinkTestStep.newBuilder("sink") - .addSchema("id INT", "name STRING") - .consumedValues( - Row.ofKind(RowKind.INSERT, 1, "Alice"), - Row.ofKind(RowKind.INSERT, 2, "Bob"), - Row.ofKind(RowKind.DELETE, 2, "Bob")) - .build()) - .runSql( - "INSERT INTO sink SELECT * FROM FROM_CHANGELOG(" - + "input => TABLE cdc_stream)") - .build(); - /** Custom op column name via DESCRIPTOR. */ public static final TableTestProgram CUSTOM_OP_NAME = TableTestProgram.of( @@ -207,4 +182,50 @@ public class FromChangelogTestPrograms { "INSERT INTO sink SELECT * FROM FROM_CHANGELOG(" + "input => TABLE changelog_view)") .build(); + + // -------------------------------------------------------------------------------------------- + // Error validation tests + // -------------------------------------------------------------------------------------------- + + public static final TableTestProgram INVALID_OP_CODE = + TableTestProgram.of( + "from-changelog-invalid-op-code", + "fails when input contains an op code not in the mapping") + .setupTableSource( + SourceTestStep.newBuilder("cdc_stream") + .addSchema(SIMPLE_CDC_SCHEMA) + .producedValues(Row.of(1, "UNKNOWN", "Alice")) + .build()) + .setupTableSink( + SinkTestStep.newBuilder("sink") + .addSchema("id INT", "name STRING") + .consumedValues(new Row[0]) + .build()) + .runFailingSql( + "INSERT INTO sink SELECT * FROM FROM_CHANGELOG(" + + "input => TABLE cdc_stream)", + TableRuntimeException.class, + "Received invalid op code 'UNKNOWN'") + .build(); + + public static final TableTestProgram NULL_OP_CODE = + TableTestProgram.of( + "from-changelog-null-op-code", + "fails when input contains a NULL op code") + .setupTableSource( + SourceTestStep.newBuilder("cdc_stream") + .addSchema(SIMPLE_CDC_SCHEMA) + .producedValues(Row.of(1, null, "Alice")) + .build()) + .setupTableSink( + SinkTestStep.newBuilder("sink") + .addSchema("id INT", "name STRING") + .consumedValues(new Row[0]) + .build()) + .runFailingSql( + "INSERT INTO sink SELECT * FROM FROM_CHANGELOG(" + + "input => TABLE cdc_stream)", + TableRuntimeException.class, + "Received NULL op code") + .build(); } diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/FromChangelogFunction.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/FromChangelogFunction.java index 38249aa02bd..bb932773f73 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/FromChangelogFunction.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/FromChangelogFunction.java @@ -19,6 +19,7 @@ package org.apache.flink.table.runtime.functions.ptf; import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.TableRuntimeException; import org.apache.flink.table.data.MapData; import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.StringData; @@ -134,10 +135,17 @@ public class FromChangelogFunction extends BuiltInProcessTableFunction<RowData> final RowData input, @Nullable final ColumnList op, @Nullable final MapData opMapping) { + if (input.isNullAt(opColumnIndex)) { + throw new TableRuntimeException( + "Received NULL op code. Every changelog row must carry an operation code."); + } final StringData opCode = input.getString(opColumnIndex); final RowKind rowKind = opMap.get(opCode); if (rowKind == null) { - return; + throw new TableRuntimeException( + String.format( + "Received invalid op code '%s'. Defined op codes are: %s.", + opCode, opMap.keySet())); } projectedOutput.replaceRow(input);
