This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 9d92095e077 [FLINK-39261][table] Add `error_handling` parameter to
FROM_CHANGELOG PTF
9d92095e077 is described below
commit 9d92095e0775cae6e18fcb562e29a7217d6b5b12
Author: Ramin Gharib <[email protected]>
AuthorDate: Thu Apr 23 16:46:32 2026 +0200
[FLINK-39261][table] Add `error_handling` parameter to FROM_CHANGELOG PTF
This closes #27994.
---
.../docs/sql/reference/queries/changelog.md | 27 ++++++-
flink-python/pyflink/table/table.py | 14 +++-
.../java/org/apache/flink/table/api/Table.java | 12 ++-
.../functions/BuiltInFunctionDefinitions.java | 3 +-
.../inference/strategies/ErrorHandlingMode.java | 55 +++++++++++++
.../strategies/FromChangelogTypeStrategy.java | 64 ++++++++++++---
.../FromChangelogInputTypeStrategyTest.java | 91 ++++++++++++++++++----
.../exec/stream/FromChangelogSemanticTests.java | 2 +
.../exec/stream/FromChangelogTestPrograms.java | 52 +++++++++++++
.../planner/plan/stream/sql/FromChangelogTest.java | 3 +-
.../planner/plan/stream/sql/FromChangelogTest.xml | 10 +--
.../functions/ptf/FromChangelogFunction.java | 40 ++++++++--
12 files changed, 322 insertions(+), 51 deletions(-)
diff --git a/docs/content/docs/sql/reference/queries/changelog.md
b/docs/content/docs/sql/reference/queries/changelog.md
index c4361e4cb05..60773af2b27 100644
--- a/docs/content/docs/sql/reference/queries/changelog.md
+++ b/docs/content/docs/sql/reference/queries/changelog.md
@@ -52,7 +52,8 @@ SELECT * FROM FROM_CHANGELOG(
'ub', 'UPDATE_BEFORE',
'ua', 'UPDATE_AFTER',
'd', 'DELETE'
- ]]
+ ],]
+ [error_handling => 'FAIL' | 'SKIP']
)
```
@@ -61,8 +62,9 @@ SELECT * FROM FROM_CHANGELOG(
| Parameter | Required | Description
[...]
|:-------------|:---------|:--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
[...]
| `input` | Yes | The input table. Must be append-only.
[...]
-| `op` | No | A `DESCRIPTOR` with a single column name for the
operation code column. Defaults to `op`. The column must exist in the input
table and be of type STRING. The column may be declared nullable, but a NULL
value at runtime fails the job with a `TableRuntimeException` — every changelog
row must carry an operation code.
[...]
-| `op_mapping` | No | A `MAP<STRING, STRING>` mapping user-defined codes
to Flink change operation names. Keys are user-defined codes (e.g., `'c'`,
`'u'`, `'d'`), values are Flink change operation names (`INSERT`,
`UPDATE_BEFORE`, `UPDATE_AFTER`, `DELETE`). Keys can contain comma-separated
codes to map multiple codes to the same operation (e.g., `'c, r'`). Receiving
an op code not present in the mapping fails the job at runtime with a
`TableRuntimeException`. Each change operation [...]
+| `op` | No | A `DESCRIPTOR` with a single column name for the
operation code column. Defaults to `op`. The column must exist in the input
table and be of type STRING.
[...]
+| `op_mapping` | No | A `MAP<STRING, STRING>` mapping user-defined codes
to Flink change operation names. Keys are user-defined codes (e.g., `'c'`,
`'u'`, `'d'`), values are Flink change operation names (`INSERT`,
`UPDATE_BEFORE`, `UPDATE_AFTER`, `DELETE`). Keys can contain comma-separated
codes to map multiple codes to the same operation (e.g., `'c, r'`). Each change
operation may appear at most once across all entries. |
+| `error_handling` | No | Controls behavior when an input row's operation code
is `NULL` or not present in the `op_mapping`. Valid values (case-sensitive,
upper case): `FAIL` (default) — throw a `TableRuntimeException`; `SKIP` —
silently drop the row. |
#### Default op_mapping
@@ -75,7 +77,7 @@ When `op_mapping` is omitted, the following standard names
are used. They allow
| `'UPDATE_AFTER'` | UPDATE_AFTER |
| `'DELETE'` | DELETE |
-Any input row whose op code is not present in the active mapping (default or
user-defined) fails the job at runtime with a `TableRuntimeException`.
+By default, any input row whose op code is `NULL` or not present in the active
mapping (default or user-defined) fails the job at runtime with a
`TableRuntimeException`. Set `error_handling => 'SKIP'` to silently drop those
rows instead.
### Output Schema
@@ -125,6 +127,23 @@ SELECT * FROM FROM_CHANGELOG(
-- The operation column named 'operation' is used instead of 'op'
```
+#### Invalid operation code handling
+
+Two `error_handling` modes are supported: `FAIL` (the default) fails the job
when a row's op code is `NULL` or not present in the mapping, while `SKIP`
drops such rows and continues processing.
+
+```sql
+-- Fail on NULL or unknown op codes (default behavior)
+SELECT * FROM FROM_CHANGELOG(
+ input => TABLE cdc_stream
+)
+
+-- Silently skip rows with NULL or unknown op codes
+SELECT * FROM FROM_CHANGELOG(
+ input => TABLE cdc_stream,
+ error_handling => 'SKIP'
+)
+```
+
#### Table API
```java
diff --git a/flink-python/pyflink/table/table.py
b/flink-python/pyflink/table/table.py
index ac54022a91a..bebf1cafa29 100644
--- a/flink-python/pyflink/table/table.py
+++ b/flink-python/pyflink/table/table.py
@@ -1229,12 +1229,15 @@ class Table(object):
The operation code column defaults to ``op``. By default, the codes
``INSERT``,
``UPDATE_BEFORE``, ``UPDATE_AFTER``, and ``DELETE`` are recognized;
pass
- ``op_mapping`` to use custom codes.
+ ``op_mapping`` to use custom codes. By default, the job fails at
runtime with a
+ ``TableRuntimeException`` when an input row's op code is ``NULL`` or
not present
+ in the mapping; pass ``error_handling => 'SKIP'`` to silently drop
those
+ rows instead.
Example:
::
- >>> from pyflink.table.expressions import descriptor, map_
+ >>> from pyflink.table.expressions import descriptor, lit, map_
>>> # Default: reads 'op' column with standard change operation
names
>>> result = cdc_stream.from_changelog()
>>> # With custom op column name
@@ -1249,8 +1252,13 @@ class Table(object):
... "ua", "UPDATE_AFTER",
... "d", "DELETE").as_argument("op_mapping")
... )
+ >>> # Silently skip rows with NULL or unmapped op codes instead of
failing
+ >>> result = cdc_stream.from_changelog(
+ ... lit("SKIP").as_argument("error_handling")
+ ... )
- :param arguments: Optional named arguments for ``op`` and
``op_mapping``.
+ :param arguments: Optional named arguments for ``op``, ``op_mapping``,
and
+ ``error_handling``.
:return: A dynamic :class:`~pyflink.table.Table` with the ``op``
column removed and
proper change operation semantics.
"""
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java
index 5189f60dfe8..eb61371329c 100644
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java
@@ -1463,7 +1463,9 @@ public interface Table extends Explainable<Table>,
Executable {
*
* <p>The operation code column defaults to {@code op}. By default, the
codes {@code INSERT},
* {@code UPDATE_BEFORE}, {@code UPDATE_AFTER}, and {@code DELETE} are
recognized; pass {@code
- * op_mapping} to use custom codes.
+ * op_mapping} to use custom codes. By default, the job fails at runtime
with a {@code
+ * TableRuntimeException} when an input row's op code is {@code NULL} or
not present in the
+ * mapping; pass {@code error_handling => 'SKIP'} to silently drop those
rows instead.
*
* <p>Optional arguments can be passed using named expressions:
*
@@ -1484,9 +1486,15 @@ public interface Table extends Explainable<Table>,
Executable {
* "ua", "UPDATE_AFTER",
* "d", "DELETE").asArgument("op_mapping")
* );
+ *
+ * // Silently skip rows with NULL or unmapped op codes instead of failing
+ * Table result = cdcStream.fromChangelog(
+ * lit("SKIP").asArgument("error_handling")
+ * );
* }</pre>
*
- * @param arguments optional named arguments for {@code op} and {@code
op_mapping}
+ * @param arguments optional named arguments for {@code op}, {@code
op_mapping}, and {@code
+ * error_handling}
* @return a dynamic {@link Table} with the op column removed and proper
change operation
* semantics
*/
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
index 0ee62f1116a..bbe38ca13d5 100644
---
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
@@ -828,7 +828,8 @@ public final class BuiltInFunctionDefinitions {
StaticArgument.scalar(
"op_mapping",
DataTypes.MAP(DataTypes.STRING(),
DataTypes.STRING()),
- true))
+ true),
+ StaticArgument.scalar("error_handling",
DataTypes.STRING(), true))
.changelogModeStrategy(ctx -> ChangelogMode.all())
.inputTypeStrategy(FROM_CHANGELOG_INPUT_TYPE_STRATEGY)
.outputTypeStrategy(FROM_CHANGELOG_OUTPUT_TYPE_STRATEGY)
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ErrorHandlingMode.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ErrorHandlingMode.java
new file mode 100644
index 00000000000..f41181a3d5f
--- /dev/null
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ErrorHandlingMode.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.inference.strategies;
+
+import org.apache.flink.annotation.Internal;
+
+import java.util.Arrays;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/**
+ * Shared {@code error_handling} parameter values for built-in process table
functions. Mode names
+ * are case-sensitive — values must be spelled in upper case.
+ *
+ * <ul>
+ * <li>{@code FAIL} — throw an exception when an error condition is
encountered (default, strict
+ * mode)
+ * <li>{@code SKIP} — silently drop the offending row
+ * </ul>
+ */
+@Internal
+public enum ErrorHandlingMode {
+ FAIL,
+ SKIP;
+
+ public static final ErrorHandlingMode DEFAULT_MODE = FAIL;
+
+ /**
+ * Returns the mode whose name exactly matches {@code name}, or {@link
Optional#empty()} if none
+ * matches. Matching is case-sensitive.
+ */
+ public static Optional<ErrorHandlingMode> fromName(final String name) {
+ return Arrays.stream(values()).filter(v ->
v.name().equals(name)).findFirst();
+ }
+
+ public static String validNames() {
+ return
Arrays.stream(values()).map(Enum::name).collect(Collectors.joining(", "));
+ }
+}
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/FromChangelogTypeStrategy.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/FromChangelogTypeStrategy.java
index 4d7b25fbd60..d1d3c1c61da 100644
---
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/FromChangelogTypeStrategy.java
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/FromChangelogTypeStrategy.java
@@ -42,11 +42,23 @@ import java.util.stream.Collectors;
@Internal
public final class FromChangelogTypeStrategy {
- private static final String DEFAULT_OP_COLUMN_NAME = "op";
+ // Positional argument indexes for FROM_CHANGELOG. Must match the order of
StaticArguments
+ // registered in BuiltInFunctionDefinitions#FROM_CHANGELOG; changing one
without the other
+ // silently breaks argument resolution.
+ public static final int ARG_TABLE = 0;
+ public static final int ARG_OP = 1;
+ public static final int ARG_OP_MAPPING = 2;
+ public static final int ARG_ERROR_HANDLING = 3;
+
+ public static final String DEFAULT_OP_COLUMN_NAME = "op";
private static final Set<String> VALID_ROW_KIND_NAMES =
Set.of("INSERT", "UPDATE_BEFORE", "UPDATE_AFTER", "DELETE");
+ private static final String UPDATE_BEFORE = RowKind.UPDATE_BEFORE.name();
+
+ private static final String UPDATE_AFTER = RowKind.UPDATE_AFTER.name();
+
//
--------------------------------------------------------------------------------------------
// Input validation
//
--------------------------------------------------------------------------------------------
@@ -68,7 +80,7 @@ public final class FromChangelogTypeStrategy {
callContext -> {
final TableSemantics tableSemantics =
callContext
- .getTableSemantics(0)
+ .getTableSemantics(ARG_TABLE)
.orElseThrow(
() ->
new ValidationException(
@@ -80,8 +92,6 @@ public final class FromChangelogTypeStrategy {
return Optional.of(DataTypes.ROW(outputFields).notNull());
};
- private static final String UPDATE_BEFORE = RowKind.UPDATE_BEFORE.name();
- private static final String UPDATE_AFTER = RowKind.UPDATE_AFTER.name();
//
--------------------------------------------------------------------------------------------
// Helpers
@@ -112,12 +122,17 @@ public final class FromChangelogTypeStrategy {
return error;
}
+ error = validateErrorHandling(callContext, throwOnFailure);
+ if (error.isPresent()) {
+ return error;
+ }
+
return Optional.of(callContext.getArgumentDataTypes());
}
private static Optional<List<DataType>> validateTableArg(
final CallContext callContext, final boolean throwOnFailure) {
- if (callContext.getTableSemantics(0).isEmpty()) {
+ if (callContext.getTableSemantics(ARG_TABLE).isEmpty()) {
return callContext.fail(
throwOnFailure, "First argument must be a table for
FROM_CHANGELOG.");
}
@@ -126,7 +141,8 @@ public final class FromChangelogTypeStrategy {
private static Optional<List<DataType>> validateOpDescriptor(
final CallContext callContext, final boolean throwOnFailure) {
- final Optional<ColumnList> opDescriptor =
callContext.getArgumentValue(1, ColumnList.class);
+ final Optional<ColumnList> opDescriptor =
+ callContext.getArgumentValue(ARG_OP, ColumnList.class);
if (opDescriptor.isPresent() && opDescriptor.get().getNames().size()
!= 1) {
return callContext.fail(
throwOnFailure,
@@ -139,7 +155,7 @@ public final class FromChangelogTypeStrategy {
private static Optional<List<DataType>> validateOpColumn(
final CallContext callContext, final boolean throwOnFailure) {
- final TableSemantics tableSemantics =
callContext.getTableSemantics(0).get();
+ final TableSemantics tableSemantics =
callContext.getTableSemantics(ARG_TABLE).get();
final String opColumnName = resolveOpColumnName(callContext);
final List<Field> inputFields =
DataType.getFields(tableSemantics.dataType());
final Optional<Field> opField =
@@ -165,14 +181,14 @@ public final class FromChangelogTypeStrategy {
@SuppressWarnings("unchecked")
private static Optional<List<DataType>> validateOpMapping(
final CallContext callContext, final boolean throwOnFailure) {
- final boolean hasMappingArgProvided = !callContext.isArgumentNull(2);
- final boolean isMappingArgLiteral = callContext.isArgumentLiteral(2);
+ final boolean hasMappingArgProvided =
!callContext.isArgumentNull(ARG_OP_MAPPING);
+ final boolean isMappingArgLiteral =
callContext.isArgumentLiteral(ARG_OP_MAPPING);
if (hasMappingArgProvided && !isMappingArgLiteral) {
return callContext.fail(
throwOnFailure, "The 'op_mapping' argument must be a
constant MAP literal.");
}
- final Optional<Map> opMapping = callContext.getArgumentValue(2,
Map.class);
+ final Optional<Map> opMapping =
callContext.getArgumentValue(ARG_OP_MAPPING, Map.class);
if (opMapping.isPresent()) {
final Map<String, String> mapping = opMapping.get();
final Optional<List<DataType>> validationError =
@@ -231,9 +247,35 @@ public final class FromChangelogTypeStrategy {
return Optional.empty();
}
+ private static Optional<List<DataType>> validateErrorHandling(
+ final CallContext callContext, final boolean throwOnFailure) {
+ final boolean hasErrorHandlingArgProvided =
!callContext.isArgumentNull(ARG_ERROR_HANDLING);
+ final boolean isErrorHandlingArgLiteral =
callContext.isArgumentLiteral(ARG_ERROR_HANDLING);
+ if (hasErrorHandlingArgProvided && !isErrorHandlingArgLiteral) {
+ return callContext.fail(
+ throwOnFailure,
+ "The 'error_handling' argument must be a constant STRING
literal.");
+ }
+
+ final Optional<String> optionalErrorHandlingArg =
+ callContext.getArgumentValue(ARG_ERROR_HANDLING, String.class);
+ if (optionalErrorHandlingArg.isPresent()) {
+ final String errorHandlingMode = optionalErrorHandlingArg.get();
+ if (ErrorHandlingMode.fromName(errorHandlingMode).isEmpty()) {
+ return callContext.fail(
+ throwOnFailure,
+ String.format(
+ "Invalid value for argument 'error_handling':
'%s'. Valid values are: %s.",
+ errorHandlingMode,
ErrorHandlingMode.validNames()));
+ }
+ }
+
+ return Optional.empty();
+ }
+
private static String resolveOpColumnName(final CallContext callContext) {
return callContext
- .getArgumentValue(1, ColumnList.class)
+ .getArgumentValue(ARG_OP, ColumnList.class)
.filter(cl -> !cl.getNames().isEmpty())
.map(cl -> cl.getNames().get(0))
.orElse(DEFAULT_OP_COLUMN_NAME);
diff --git
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/FromChangelogInputTypeStrategyTest.java
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/FromChangelogInputTypeStrategyTest.java
index d1b2a792182..322cf94a49e 100644
---
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/FromChangelogInputTypeStrategyTest.java
+++
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/FromChangelogInputTypeStrategyTest.java
@@ -24,7 +24,6 @@ import
org.apache.flink.table.types.inference.InputTypeStrategiesTestBase;
import org.apache.flink.table.types.inference.utils.TableSemanticsMock;
import org.apache.flink.types.ColumnList;
-import java.util.List;
import java.util.Map;
import java.util.stream.Stream;
@@ -43,15 +42,17 @@ class FromChangelogInputTypeStrategyTest extends
InputTypeStrategiesTestBase {
private static final DataType MAP_TYPE = DataTypes.MAP(DataTypes.STRING(),
DataTypes.STRING());
+ private static final DataType STRING_TYPE = DataTypes.STRING();
+
@Override
protected Stream<TestSpec> testData() {
return Stream.of(
// Valid: custom mapping with all change operations
TestSpec.forStrategy(
"Valid with custom mapping",
FROM_CHANGELOG_INPUT_TYPE_STRATEGY)
- .calledWithArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE,
MAP_TYPE)
+ .calledWithArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE,
MAP_TYPE, STRING_TYPE)
.calledWithTableSemanticsAt(0, new
TableSemanticsMock(TABLE_TYPE))
- .calledWithLiteralAt(1, ColumnList.of(List.of("op")))
+ .calledWithLiteralAt(1, ColumnList.of("op"))
.calledWithLiteralAt(
2,
Map.of(
@@ -59,14 +60,15 @@ class FromChangelogInputTypeStrategyTest extends
InputTypeStrategiesTestBase {
"ub", "UPDATE_BEFORE",
"ua", "UPDATE_AFTER",
"d", "DELETE"))
- .expectArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE,
MAP_TYPE),
+ .calledWithLiteralAt(3, "FAIL")
+ .expectArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE,
MAP_TYPE, STRING_TYPE),
// Error: op column not found
TestSpec.forStrategy(
"Op column not found in schema",
FROM_CHANGELOG_INPUT_TYPE_STRATEGY)
.calledWithArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE)
.calledWithTableSemanticsAt(0, new
TableSemanticsMock(TABLE_TYPE))
- .calledWithLiteralAt(1,
ColumnList.of(List.of("nonexistent")))
+ .calledWithLiteralAt(1, ColumnList.of("nonexistent"))
.expectErrorMessage("The op column 'nonexistent' does
not exist"),
// Error: op column is not STRING
@@ -84,7 +86,7 @@ class FromChangelogInputTypeStrategyTest extends
InputTypeStrategiesTestBase {
DataTypes.FIELD("id",
DataTypes.INT()),
DataTypes.FIELD("op",
DataTypes.INT()),
DataTypes.FIELD("name",
DataTypes.STRING()))))
- .calledWithLiteralAt(1, ColumnList.of(List.of("op")))
+ .calledWithLiteralAt(1, ColumnList.of("op"))
.expectErrorMessage("must be of STRING type"),
// Error: multi-column descriptor
@@ -93,7 +95,7 @@ class FromChangelogInputTypeStrategyTest extends
InputTypeStrategiesTestBase {
FROM_CHANGELOG_INPUT_TYPE_STRATEGY)
.calledWithArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE)
.calledWithTableSemanticsAt(0, new
TableSemanticsMock(TABLE_TYPE))
- .calledWithLiteralAt(1, ColumnList.of(List.of("a",
"b")))
+ .calledWithLiteralAt(1, ColumnList.of("a", "b"))
.expectErrorMessage("must contain exactly one column
name"),
// Error: invalid RowKind in op_mapping value
@@ -102,7 +104,7 @@ class FromChangelogInputTypeStrategyTest extends
InputTypeStrategiesTestBase {
FROM_CHANGELOG_INPUT_TYPE_STRATEGY)
.calledWithArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE,
MAP_TYPE)
.calledWithTableSemanticsAt(0, new
TableSemanticsMock(TABLE_TYPE))
- .calledWithLiteralAt(1, ColumnList.of(List.of("op")))
+ .calledWithLiteralAt(1, ColumnList.of("op"))
.calledWithLiteralAt(2, Map.of("c", "INVALID_KIND"))
.expectErrorMessage("Unknown change operation:
'INVALID_KIND'"),
@@ -112,28 +114,30 @@ class FromChangelogInputTypeStrategyTest extends
InputTypeStrategiesTestBase {
FROM_CHANGELOG_INPUT_TYPE_STRATEGY)
.calledWithArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE,
MAP_TYPE)
.calledWithTableSemanticsAt(0, new
TableSemanticsMock(TABLE_TYPE))
- .calledWithLiteralAt(1, ColumnList.of(List.of("op")))
+ .calledWithLiteralAt(1, ColumnList.of("op"))
.calledWithLiteralAt(2, Map.of("c", "INSERT", "r",
"INSERT"))
.expectErrorMessage("Duplicate change operation:
'INSERT'"),
// Valid: INSERT-only mapping (append mode, no updates)
TestSpec.forStrategy(
"Valid INSERT-only mapping",
FROM_CHANGELOG_INPUT_TYPE_STRATEGY)
- .calledWithArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE,
MAP_TYPE)
+ .calledWithArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE,
MAP_TYPE, STRING_TYPE)
.calledWithTableSemanticsAt(0, new
TableSemanticsMock(TABLE_TYPE))
- .calledWithLiteralAt(1, ColumnList.of(List.of("op")))
+ .calledWithLiteralAt(1, ColumnList.of("op"))
.calledWithLiteralAt(2, Map.of("c, r", "INSERT"))
- .expectArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE,
MAP_TYPE),
+ .calledWithLiteralAt(3, "FAIL")
+ .expectArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE,
MAP_TYPE, STRING_TYPE),
// Valid: INSERT + DELETE mapping (no updates)
TestSpec.forStrategy(
"Valid INSERT and DELETE mapping",
FROM_CHANGELOG_INPUT_TYPE_STRATEGY)
- .calledWithArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE,
MAP_TYPE)
+ .calledWithArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE,
MAP_TYPE, STRING_TYPE)
.calledWithTableSemanticsAt(0, new
TableSemanticsMock(TABLE_TYPE))
- .calledWithLiteralAt(1, ColumnList.of(List.of("op")))
+ .calledWithLiteralAt(1, ColumnList.of("op"))
.calledWithLiteralAt(2, Map.of("c", "INSERT", "d",
"DELETE"))
- .expectArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE,
MAP_TYPE),
+ .calledWithLiteralAt(3, "FAIL")
+ .expectArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE,
MAP_TYPE, STRING_TYPE),
// Error: UPDATE_AFTER without UPDATE_BEFORE not supported
TestSpec.forStrategy(
@@ -141,13 +145,66 @@ class FromChangelogInputTypeStrategyTest extends
InputTypeStrategiesTestBase {
FROM_CHANGELOG_INPUT_TYPE_STRATEGY)
.calledWithArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE,
MAP_TYPE)
.calledWithTableSemanticsAt(0, new
TableSemanticsMock(TABLE_TYPE))
- .calledWithLiteralAt(1, ColumnList.of(List.of("op")))
+ .calledWithLiteralAt(1, ColumnList.of("op"))
.calledWithLiteralAt(
2,
Map.of(
"c", "INSERT",
"u", "UPDATE_AFTER",
"d", "DELETE"))
- .expectErrorMessage("must include UPDATE_BEFORE"));
+ .expectErrorMessage("must include UPDATE_BEFORE"),
+
+ // Error: Invalid error_handling mode
+ TestSpec.forStrategy(
+ "Invalid error_handling mode",
FROM_CHANGELOG_INPUT_TYPE_STRATEGY)
+ .calledWithArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE,
MAP_TYPE, STRING_TYPE)
+ .calledWithTableSemanticsAt(0, new
TableSemanticsMock(TABLE_TYPE))
+ .calledWithLiteralAt(1, ColumnList.of("op"))
+ .calledWithLiteralAt(
+ 2,
+ Map.of(
+ "c", "INSERT",
+ "ub", "UPDATE_BEFORE",
+ "ua", "UPDATE_AFTER",
+ "d", "DELETE"))
+ .calledWithLiteralAt(3, "INVALID_MODE")
+ .expectErrorMessage(
+ "Invalid value for argument 'error_handling':
'INVALID_MODE'. Valid values are: FAIL, SKIP."),
+
+ // Error: error_handling is case-sensitive — lowercase is
rejected
+ TestSpec.forStrategy(
+ "Lowercase error_handling mode is rejected",
+ FROM_CHANGELOG_INPUT_TYPE_STRATEGY)
+ .calledWithArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE,
MAP_TYPE, STRING_TYPE)
+ .calledWithTableSemanticsAt(0, new
TableSemanticsMock(TABLE_TYPE))
+ .calledWithLiteralAt(1, ColumnList.of("op"))
+ .calledWithLiteralAt(
+ 2,
+ Map.of(
+ "c", "INSERT",
+ "ub", "UPDATE_BEFORE",
+ "ua", "UPDATE_AFTER",
+ "d", "DELETE"))
+ .calledWithLiteralAt(3, "skip")
+ .expectErrorMessage(
+ "Invalid value for argument 'error_handling':
'skip'. Valid values are: FAIL, SKIP."),
+
+ // Error: error_handling is case-sensitive — mixed case is
rejected
+ TestSpec.forStrategy(
+ "Mixed-case error_handling mode is rejected",
+ FROM_CHANGELOG_INPUT_TYPE_STRATEGY)
+ .calledWithArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE,
MAP_TYPE, STRING_TYPE)
+ .calledWithTableSemanticsAt(0, new
TableSemanticsMock(TABLE_TYPE))
+ .calledWithLiteralAt(1, ColumnList.of("op"))
+ .calledWithLiteralAt(
+ 2,
+ Map.of(
+ "c", "INSERT",
+ "ub", "UPDATE_BEFORE",
+ "ua", "UPDATE_AFTER",
+ "d", "DELETE"))
+ .calledWithLiteralAt(3, "Fail")
+ .expectErrorMessage(
+ "Invalid value for argument 'error_handling':
'Fail'. Valid values are: FAIL, SKIP."));
}
}
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogSemanticTests.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogSemanticTests.java
index 6e936fb6cec..4643a51be52 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogSemanticTests.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogSemanticTests.java
@@ -42,6 +42,8 @@ public class FromChangelogSemanticTests extends
SemanticTestBase {
FromChangelogTestPrograms.DEFAULT_OP_MAPPING,
FromChangelogTestPrograms.CUSTOM_OP_MAPPING,
FromChangelogTestPrograms.CUSTOM_OP_NAME,
+ FromChangelogTestPrograms.SKIP_INVALID_OP_HANDLING,
+ FromChangelogTestPrograms.SKIP_NULL_OP_CODE,
FromChangelogTestPrograms.TABLE_API_DEFAULT,
FromChangelogTestPrograms.ROUND_TRIP,
FromChangelogTestPrograms.INVALID_OP_CODE,
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogTestPrograms.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogTestPrograms.java
index 725b4a3d2d0..1858aabeed0 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogTestPrograms.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogTestPrograms.java
@@ -94,6 +94,58 @@ public class FromChangelogTestPrograms {
+ "op_mapping => MAP['c, r', 'INSERT',
'ub', 'UPDATE_BEFORE', 'ua', 'UPDATE_AFTER', 'd', 'DELETE'])")
.build();
+ public static final TableTestProgram SKIP_INVALID_OP_HANDLING =
+ TableTestProgram.of(
+ "from-changelog-unmapped-codes-dropped",
+ "unmapped op codes are silently dropped when
configured")
+ .setupTableSource(
+ SourceTestStep.newBuilder("cdc_stream")
+ .addSchema(SIMPLE_CDC_SCHEMA)
+ .producedValues(
+ Row.of(1, "INSERT", "Alice"),
+ Row.of(2, "INSERT", "Bob"),
+ Row.of(1, "UNKNOWN", "Alice2"),
+ Row.of(2, "DELETE", "Bob"))
+ .build())
+ .setupTableSink(
+ SinkTestStep.newBuilder("sink")
+ .addSchema("id INT", "name STRING")
+ .consumedValues(
+ Row.ofKind(RowKind.INSERT, 1,
"Alice"),
+ Row.ofKind(RowKind.INSERT, 2,
"Bob"),
+ Row.ofKind(RowKind.DELETE, 2,
"Bob"))
+ .build())
+ .runSql(
+ "INSERT INTO sink SELECT * FROM FROM_CHANGELOG("
+ + "input => TABLE cdc_stream,"
+ + "error_handling => 'SKIP')")
+ .build();
+
+ public static final TableTestProgram SKIP_NULL_OP_CODE =
+ TableTestProgram.of(
+ "from-changelog-null-op-code-dropped",
+ "NULL op codes are silently dropped when
configured")
+ .setupTableSource(
+ SourceTestStep.newBuilder("cdc_stream")
+ .addSchema(SIMPLE_CDC_SCHEMA)
+ .producedValues(
+ Row.of(1, "INSERT", "Alice"),
+ Row.of(2, null, "Bob"),
+ Row.of(3, "INSERT", "Carol"))
+ .build())
+ .setupTableSink(
+ SinkTestStep.newBuilder("sink")
+ .addSchema("id INT", "name STRING")
+ .consumedValues(
+ Row.ofKind(RowKind.INSERT, 1,
"Alice"),
+ Row.ofKind(RowKind.INSERT, 3,
"Carol"))
+ .build())
+ .runSql(
+ "INSERT INTO sink SELECT * FROM FROM_CHANGELOG("
+ + "input => TABLE cdc_stream,"
+ + "error_handling => 'SKIP')")
+ .build();
+
/** Custom op column name via DESCRIPTOR. */
public static final TableTestProgram CUSTOM_OP_NAME =
TableTestProgram.of(
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/FromChangelogTest.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/FromChangelogTest.java
index 33e163d9f41..ba2c1a5690c 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/FromChangelogTest.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/FromChangelogTest.java
@@ -71,7 +71,8 @@ public class FromChangelogTest extends TableTestBase {
"SELECT * FROM FROM_CHANGELOG("
+ "input => TABLE cdc_stream, "
+ "op => DESCRIPTOR(__op), "
- + "op_mapping => MAP['c, r', 'INSERT', 'ub',
'UPDATE_BEFORE', 'ua', 'UPDATE_AFTER', 'd', 'DELETE'])",
+ + "op_mapping => MAP['c, r', 'INSERT', 'ub',
'UPDATE_BEFORE', 'ua', 'UPDATE_AFTER', 'd', 'DELETE'], "
+ + "error_handling => 'SKIP')",
CHANGELOG_MODE);
}
}
diff --git
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/FromChangelogTest.xml
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/FromChangelogTest.xml
index 2eed936fc6e..614eb5456b9 100644
---
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/FromChangelogTest.xml
+++
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/FromChangelogTest.xml
@@ -18,19 +18,19 @@ limitations under the License.
<Root>
<TestCase name="testCustomOpMapping">
<Resource name="sql">
- <![CDATA[SELECT * FROM FROM_CHANGELOG(input => TABLE cdc_stream, op =>
DESCRIPTOR(__op), op_mapping => MAP['c, r', 'INSERT', 'ub', 'UPDATE_BEFORE',
'ua', 'UPDATE_AFTER', 'd', 'DELETE'])]]>
+ <![CDATA[SELECT * FROM FROM_CHANGELOG(input => TABLE cdc_stream, op =>
DESCRIPTOR(__op), op_mapping => MAP['c, r', 'INSERT', 'ub', 'UPDATE_BEFORE',
'ua', 'UPDATE_AFTER', 'd', 'DELETE'], error_handling => 'SKIP')]]>
</Resource>
<Resource name="ast">
<![CDATA[
LogicalProject(id=[$0], name=[$1])
-+- LogicalTableFunctionScan(invocation=[FROM_CHANGELOG(TABLE(#0),
DESCRIPTOR(_UTF-16LE'__op'), MAP(_UTF-16LE'c, r':VARCHAR(4) CHARACTER SET
"UTF-16LE", _UTF-16LE'INSERT':VARCHAR(13) CHARACTER SET "UTF-16LE",
_UTF-16LE'ub':VARCHAR(4) CHARACTER SET "UTF-16LE",
_UTF-16LE'UPDATE_BEFORE':VARCHAR(13) CHARACTER SET "UTF-16LE",
_UTF-16LE'ua':VARCHAR(4) CHARACTER SET "UTF-16LE",
_UTF-16LE'UPDATE_AFTER':VARCHAR(13) CHARACTER SET "UTF-16LE",
_UTF-16LE'd':VARCHAR(4) CHARACTER SET "UTF-16LE", _UTF-16 [...]
++- LogicalTableFunctionScan(invocation=[FROM_CHANGELOG(TABLE(#0),
DESCRIPTOR(_UTF-16LE'__op'), MAP(_UTF-16LE'c, r':VARCHAR(4) CHARACTER SET
"UTF-16LE", _UTF-16LE'INSERT':VARCHAR(13) CHARACTER SET "UTF-16LE",
_UTF-16LE'ub':VARCHAR(4) CHARACTER SET "UTF-16LE",
_UTF-16LE'UPDATE_BEFORE':VARCHAR(13) CHARACTER SET "UTF-16LE",
_UTF-16LE'ua':VARCHAR(4) CHARACTER SET "UTF-16LE",
_UTF-16LE'UPDATE_AFTER':VARCHAR(13) CHARACTER SET "UTF-16LE",
_UTF-16LE'd':VARCHAR(4) CHARACTER SET "UTF-16LE", _UTF-16 [...]
+- LogicalProject(id=[$0], __op=[$1], name=[$2])
+- LogicalTableScan(table=[[default_catalog, default_database,
cdc_stream]])
]]>
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
-ProcessTableFunction(invocation=[FROM_CHANGELOG(TABLE(#0),
DESCRIPTOR(_UTF-16LE'__op'), MAP(_UTF-16LE'c, r':VARCHAR(4) CHARACTER SET
"UTF-16LE", _UTF-16LE'INSERT':VARCHAR(13) CHARACTER SET "UTF-16LE",
_UTF-16LE'ub':VARCHAR(4) CHARACTER SET "UTF-16LE",
_UTF-16LE'UPDATE_BEFORE':VARCHAR(13) CHARACTER SET "UTF-16LE",
_UTF-16LE'ua':VARCHAR(4) CHARACTER SET "UTF-16LE",
_UTF-16LE'UPDATE_AFTER':VARCHAR(13) CHARACTER SET "UTF-16LE",
_UTF-16LE'd':VARCHAR(4) CHARACTER SET "UTF-16LE", _UTF-16LE'DELE [...]
+ProcessTableFunction(invocation=[FROM_CHANGELOG(TABLE(#0),
DESCRIPTOR(_UTF-16LE'__op'), MAP(_UTF-16LE'c, r':VARCHAR(4) CHARACTER SET
"UTF-16LE", _UTF-16LE'INSERT':VARCHAR(13) CHARACTER SET "UTF-16LE",
_UTF-16LE'ub':VARCHAR(4) CHARACTER SET "UTF-16LE",
_UTF-16LE'UPDATE_BEFORE':VARCHAR(13) CHARACTER SET "UTF-16LE",
_UTF-16LE'ua':VARCHAR(4) CHARACTER SET "UTF-16LE",
_UTF-16LE'UPDATE_AFTER':VARCHAR(13) CHARACTER SET "UTF-16LE",
_UTF-16LE'd':VARCHAR(4) CHARACTER SET "UTF-16LE", _UTF-16LE'DELE [...]
+- TableSourceScan(table=[[default_catalog, default_database, cdc_stream]],
fields=[id, __op, name], changelogMode=[I])
]]>
</Resource>
@@ -42,14 +42,14 @@ ProcessTableFunction(invocation=[FROM_CHANGELOG(TABLE(#0),
DESCRIPTOR(_UTF-16LE'
<Resource name="ast">
<![CDATA[
LogicalProject(id=[$0], name=[$1])
-+- LogicalTableFunctionScan(invocation=[FROM_CHANGELOG(TABLE(#0), DEFAULT(),
DEFAULT(), DEFAULT(), DEFAULT())], rowType=[RecordType(INTEGER id,
VARCHAR(2147483647) name)])
++- LogicalTableFunctionScan(invocation=[FROM_CHANGELOG(TABLE(#0), DEFAULT(),
DEFAULT(), DEFAULT(), DEFAULT(), DEFAULT())], rowType=[RecordType(INTEGER id,
VARCHAR(2147483647) name)])
+- LogicalProject(id=[$0], op=[$1], name=[$2])
+- LogicalTableScan(table=[[default_catalog, default_database,
cdc_stream]])
]]>
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
-ProcessTableFunction(invocation=[FROM_CHANGELOG(TABLE(#0), DEFAULT(),
DEFAULT(), DEFAULT(), DEFAULT())], uid=[null], select=[id,name],
rowType=[RecordType(INTEGER id, VARCHAR(2147483647) name)],
changelogMode=[I,UB,UA,D])
+ProcessTableFunction(invocation=[FROM_CHANGELOG(TABLE(#0), DEFAULT(),
DEFAULT(), DEFAULT(), DEFAULT(), DEFAULT())], uid=[null], select=[id,name],
rowType=[RecordType(INTEGER id, VARCHAR(2147483647) name)],
changelogMode=[I,UB,UA,D])
+- TableSourceScan(table=[[default_catalog, default_database, cdc_stream]],
fields=[id, op, name], changelogMode=[I])
]]>
</Resource>
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/FromChangelogFunction.java
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/FromChangelogFunction.java
index bb932773f73..cc1f9deb21d 100644
---
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/FromChangelogFunction.java
+++
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/FromChangelogFunction.java
@@ -29,6 +29,7 @@ import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.SpecializedFunction.SpecializedContext;
import org.apache.flink.table.functions.TableSemantics;
import org.apache.flink.table.types.inference.CallContext;
+import org.apache.flink.table.types.inference.strategies.ErrorHandlingMode;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.ColumnList;
import org.apache.flink.types.RowKind;
@@ -41,6 +42,12 @@ import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
+import static
org.apache.flink.table.types.inference.strategies.FromChangelogTypeStrategy.ARG_ERROR_HANDLING;
+import static
org.apache.flink.table.types.inference.strategies.FromChangelogTypeStrategy.ARG_OP;
+import static
org.apache.flink.table.types.inference.strategies.FromChangelogTypeStrategy.ARG_OP_MAPPING;
+import static
org.apache.flink.table.types.inference.strategies.FromChangelogTypeStrategy.ARG_TABLE;
+import static
org.apache.flink.table.types.inference.strategies.FromChangelogTypeStrategy.DEFAULT_OP_COLUMN_NAME;
+
/**
* Runtime implementation of {@link BuiltInFunctionDefinitions#FROM_CHANGELOG}.
*
@@ -56,7 +63,6 @@ public class FromChangelogFunction extends
BuiltInProcessTableFunction<RowData>
private static final long serialVersionUID = 1L;
- private static final String DEFAULT_OP_COLUMN_NAME = "op";
private static final Map<String, RowKind> DEFAULT_OP_MAPPING =
Map.of(
"INSERT", RowKind.INSERT,
@@ -67,6 +73,7 @@ public class FromChangelogFunction extends
BuiltInProcessTableFunction<RowData>
private final Map<String, RowKind> rawOpMap;
private final int opColumnIndex;
private final int[] outputIndices;
+ private final ErrorHandlingMode errorHandlingMode;
private transient HashMap<StringData, RowKind> opMap;
private transient ProjectedRowData projectedOutput;
@@ -77,7 +84,7 @@ public class FromChangelogFunction extends
BuiltInProcessTableFunction<RowData>
final TableSemantics tableSemantics =
callContext
- .getTableSemantics(0)
+ .getTableSemantics(ARG_TABLE)
.orElseThrow(() -> new IllegalStateException("Table
argument expected."));
final RowType inputType = (RowType)
tableSemantics.dataType().getLogicalType();
@@ -91,6 +98,12 @@ public class FromChangelogFunction extends
BuiltInProcessTableFunction<RowData>
.toArray();
this.rawOpMap = buildOpMap(callContext);
+
+ this.errorHandlingMode =
+ callContext
+ .getArgumentValue(ARG_ERROR_HANDLING, String.class)
+ .flatMap(ErrorHandlingMode::fromName)
+ .orElse(ErrorHandlingMode.DEFAULT_MODE);
}
@Override
@@ -103,7 +116,7 @@ public class FromChangelogFunction extends
BuiltInProcessTableFunction<RowData>
private static String resolveOpColumnName(final CallContext callContext) {
return callContext
- .getArgumentValue(1, ColumnList.class)
+ .getArgumentValue(ARG_OP, ColumnList.class)
.map(cl -> cl.getNames().get(0))
.orElse(DEFAULT_OP_COLUMN_NAME);
}
@@ -114,7 +127,7 @@ public class FromChangelogFunction extends
BuiltInProcessTableFunction<RowData>
*/
private static Map<String, RowKind> buildOpMap(CallContext callContext) {
return callContext
- .getArgumentValue(2, Map.class)
+ .getArgumentValue(ARG_OP_MAPPING, Map.class)
.map(FromChangelogFunction::parseOpMapping)
.orElse(DEFAULT_OP_MAPPING);
}
@@ -134,22 +147,35 @@ public class FromChangelogFunction extends
BuiltInProcessTableFunction<RowData>
final Context ctx,
final RowData input,
@Nullable final ColumnList op,
- @Nullable final MapData opMapping) {
+ @Nullable final MapData opMapping,
+ @Nullable final StringData errorHandling) {
if (input.isNullAt(opColumnIndex)) {
- throw new TableRuntimeException(
+ handleInvalidOp(
"Received NULL op code. Every changelog row must carry an
operation code.");
+ return;
}
final StringData opCode = input.getString(opColumnIndex);
final RowKind rowKind = opMap.get(opCode);
if (rowKind == null) {
- throw new TableRuntimeException(
+ handleInvalidOp(
String.format(
"Received invalid op code '%s'. Defined op codes
are: %s.",
opCode, opMap.keySet()));
+ return;
}
projectedOutput.replaceRow(input);
projectedOutput.setRowKind(rowKind);
collect(projectedOutput);
}
+
+ /**
+  * Applies the configured {@code error_handling} mode to a row whose op code is NULL or unmapped.
+  *
+  * <p>{@code FAIL} raises a {@link TableRuntimeException} carrying {@code failureMessage};
+  * {@code SKIP} returns normally so that the caller can drop the row.
+  *
+  * @param failureMessage description of the invalid op code, used as the exception message
+  * @throws TableRuntimeException if the mode is {@code FAIL}
+  * @throws IllegalStateException if the mode is not handled by this method
+  */
+ private void handleInvalidOp(final String failureMessage) {
+     switch (errorHandlingMode) {
+         case FAIL:
+             throw new TableRuntimeException(failureMessage);
+         case SKIP:
+             // silently drop the row
+             break;
+         default:
+             // Guard against modes added later: without this branch they would fall out of
+             // the switch and drop rows exactly like SKIP, without anyone opting in.
+             throw new IllegalStateException(
+                     "Unsupported error handling mode: " + errorHandlingMode);
+     }
+ }
}