This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 9d92095e077 [FLINK-39261][table] Add `error_handling` parameter to 
FROM_CHANGELOG PTF
9d92095e077 is described below

commit 9d92095e0775cae6e18fcb562e29a7217d6b5b12
Author: Ramin Gharib <[email protected]>
AuthorDate: Thu Apr 23 16:46:32 2026 +0200

    [FLINK-39261][table] Add `error_handling` parameter to FROM_CHANGELOG PTF
    
    This closes #27994.
---
 .../docs/sql/reference/queries/changelog.md        | 27 ++++++-
 flink-python/pyflink/table/table.py                | 14 +++-
 .../java/org/apache/flink/table/api/Table.java     | 12 ++-
 .../functions/BuiltInFunctionDefinitions.java      |  3 +-
 .../inference/strategies/ErrorHandlingMode.java    | 55 +++++++++++++
 .../strategies/FromChangelogTypeStrategy.java      | 64 ++++++++++++---
 .../FromChangelogInputTypeStrategyTest.java        | 91 ++++++++++++++++++----
 .../exec/stream/FromChangelogSemanticTests.java    |  2 +
 .../exec/stream/FromChangelogTestPrograms.java     | 52 +++++++++++++
 .../planner/plan/stream/sql/FromChangelogTest.java |  3 +-
 .../planner/plan/stream/sql/FromChangelogTest.xml  | 10 +--
 .../functions/ptf/FromChangelogFunction.java       | 40 ++++++++--
 12 files changed, 322 insertions(+), 51 deletions(-)

diff --git a/docs/content/docs/sql/reference/queries/changelog.md 
b/docs/content/docs/sql/reference/queries/changelog.md
index c4361e4cb05..60773af2b27 100644
--- a/docs/content/docs/sql/reference/queries/changelog.md
+++ b/docs/content/docs/sql/reference/queries/changelog.md
@@ -52,7 +52,8 @@ SELECT * FROM FROM_CHANGELOG(
       'ub', 'UPDATE_BEFORE',
       'ua', 'UPDATE_AFTER',
       'd', 'DELETE'
-  ]]
+  ],]
+  [error_handling => 'FAIL' | 'SKIP']
 )
 ```
 
@@ -61,8 +62,9 @@ SELECT * FROM FROM_CHANGELOG(
 | Parameter    | Required | Description                                        
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
 
|:-------------|:---------|:--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 [...]
 | `input`      | Yes      | The input table. Must be append-only.              
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
-| `op`         | No       | A `DESCRIPTOR` with a single column name for the 
operation code column. Defaults to `op`. The column must exist in the input 
table and be of type STRING. The column may be declared nullable, but a NULL 
value at runtime fails the job with a `TableRuntimeException` — every changelog 
row must carry an operation code.                                               
                                                                                
                       [...]
-| `op_mapping` | No       | A `MAP<STRING, STRING>` mapping user-defined codes 
to Flink change operation names. Keys are user-defined codes (e.g., `'c'`, 
`'u'`, `'d'`), values are Flink change operation names (`INSERT`, 
`UPDATE_BEFORE`, `UPDATE_AFTER`, `DELETE`). Keys can contain comma-separated 
codes to map multiple codes to the same operation (e.g., `'c, r'`). Receiving 
an op code not present in the mapping fails the job at runtime with a 
`TableRuntimeException`. Each change operation  [...]
+| `op`         | No       | A `DESCRIPTOR` with a single column name for the 
operation code column. Defaults to `op`. The column must exist in the input 
table and be of type STRING.                                                    
                                                                                
                                                                                
                                                                                
                    [...]
+| `op_mapping` | No       | A `MAP<STRING, STRING>` mapping user-defined codes 
to Flink change operation names. Keys are user-defined codes (e.g., `'c'`, 
`'u'`, `'d'`), values are Flink change operation names (`INSERT`, 
`UPDATE_BEFORE`, `UPDATE_AFTER`, `DELETE`). Keys can contain comma-separated 
codes to map multiple codes to the same operation (e.g., `'c, r'`). Each change 
operation may appear at most once across all entries. |
+| `error_handling` | No | Controls behavior when an input row's operation code 
is `NULL` or not present in the `op_mapping`. Valid values: `FAIL` (default) — 
throw a `TableRuntimeException`, `SKIP` — silently drop the row. |
 
 #### Default op_mapping
 
@@ -75,7 +77,7 @@ When `op_mapping` is omitted, the following standard names 
are used. They allow
 | `'UPDATE_AFTER'`   | UPDATE_AFTER      |
 | `'DELETE'`         | DELETE            |
 
-Any input row whose op code is not present in the active mapping (default or 
user-defined) fails the job at runtime with a `TableRuntimeException`.
+By default, any input row whose op code is `NULL` or not present in the active 
mapping (default or user-defined) fails the job at runtime with a 
`TableRuntimeException`. Set `error_handling => 'SKIP'` to silently drop those 
rows instead.
 
 ### Output Schema
 
@@ -125,6 +127,23 @@ SELECT * FROM FROM_CHANGELOG(
 -- The operation column named 'operation' is used instead of 'op'
 ```
 
+#### Invalid operation code handling
+
+Two `error_handling` modes are supported. The job can either fail upon an 
invalid or unknown op code, or skip the row and continue processing.
+
+```sql
+-- Fail on unknown op codes (default behavior)
+SELECT * FROM FROM_CHANGELOG(
+  input => TABLE cdc_stream
+)
+
+-- Silently skip rows with NULL or unknown op codes
+SELECT * FROM FROM_CHANGELOG(
+  input => TABLE cdc_stream,
+  error_handling => 'SKIP'
+)
+```
+
 #### Table API
 
 ```java
diff --git a/flink-python/pyflink/table/table.py 
b/flink-python/pyflink/table/table.py
index ac54022a91a..bebf1cafa29 100644
--- a/flink-python/pyflink/table/table.py
+++ b/flink-python/pyflink/table/table.py
@@ -1229,12 +1229,15 @@ class Table(object):
 
         The operation code column defaults to ``op``. By default, the codes 
``INSERT``,
         ``UPDATE_BEFORE``, ``UPDATE_AFTER``, and ``DELETE`` are recognized; 
pass
-        ``op_mapping`` to use custom codes.
+        ``op_mapping`` to use custom codes. By default, the job fails at 
runtime with a
+        ``TableRuntimeException`` when an input row's op code is ``NULL`` or 
not present
+        in the mapping; pass ``error_handling => 'SKIP'`` to silently drop 
those
+        rows instead.
 
         Example:
         ::
 
-            >>> from pyflink.table.expressions import descriptor, map_
+            >>> from pyflink.table.expressions import descriptor, lit, map_
             >>> # Default: reads 'op' column with standard change operation 
names
             >>> result = cdc_stream.from_changelog()
             >>> # With custom op column name
@@ -1249,8 +1252,13 @@ class Table(object):
             ...          "ua", "UPDATE_AFTER",
             ...          "d", "DELETE").as_argument("op_mapping")
             ... )
+            >>> # Silently skip rows with NULL or unmapped op codes instead of 
failing
+            >>> result = cdc_stream.from_changelog(
+            ...     lit("SKIP").as_argument("error_handling")
+            ... )
 
-        :param arguments: Optional named arguments for ``op`` and 
``op_mapping``.
+        :param arguments: Optional named arguments for ``op``, ``op_mapping``, 
and
+                          ``error_handling``.
         :return: A dynamic :class:`~pyflink.table.Table` with the ``op`` 
column removed and
                  proper change operation semantics.
         """
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java
index 5189f60dfe8..eb61371329c 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java
@@ -1463,7 +1463,9 @@ public interface Table extends Explainable<Table>, 
Executable {
      *
      * <p>The operation code column defaults to {@code op}. By default, the 
codes {@code INSERT},
      * {@code UPDATE_BEFORE}, {@code UPDATE_AFTER}, and {@code DELETE} are 
recognized; pass {@code
-     * op_mapping} to use custom codes.
+     * op_mapping} to use custom codes. By default, the job fails at runtime 
with a {@code
+     * TableRuntimeException} when an input row's op code is {@code NULL} or 
not present in the
+     * mapping; pass {@code error_handling => 'SKIP'} to silently drop those 
rows instead.
      *
      * <p>Optional arguments can be passed using named expressions:
      *
@@ -1484,9 +1486,15 @@ public interface Table extends Explainable<Table>, 
Executable {
      *         "ua", "UPDATE_AFTER",
      *         "d", "DELETE").asArgument("op_mapping")
      * );
+     *
+     * // Silently skip rows with NULL or unmapped op codes instead of failing
+     * Table result = cdcStream.fromChangelog(
+     *     lit("SKIP").asArgument("error_handling")
+     * );
      * }</pre>
      *
-     * @param arguments optional named arguments for {@code op} and {@code 
op_mapping}
+     * @param arguments optional named arguments for {@code op}, {@code 
op_mapping}, and {@code
+     *     error_handling}
      * @return a dynamic {@link Table} with the op column removed and proper 
change operation
      *     semantics
      */
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
index 0ee62f1116a..bbe38ca13d5 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
@@ -828,7 +828,8 @@ public final class BuiltInFunctionDefinitions {
                             StaticArgument.scalar(
                                     "op_mapping",
                                     DataTypes.MAP(DataTypes.STRING(), 
DataTypes.STRING()),
-                                    true))
+                                    true),
+                            StaticArgument.scalar("error_handling", 
DataTypes.STRING(), true))
                     .changelogModeStrategy(ctx -> ChangelogMode.all())
                     .inputTypeStrategy(FROM_CHANGELOG_INPUT_TYPE_STRATEGY)
                     .outputTypeStrategy(FROM_CHANGELOG_OUTPUT_TYPE_STRATEGY)
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ErrorHandlingMode.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ErrorHandlingMode.java
new file mode 100644
index 00000000000..f41181a3d5f
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ErrorHandlingMode.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.inference.strategies;
+
+import org.apache.flink.annotation.Internal;
+
+import java.util.Arrays;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/**
+ * Shared {@code error_handling} parameter values for built-in process table 
functions. Mode names
+ * are case-sensitive — values must be spelled in upper case.
+ *
+ * <ul>
+ *   <li>{@code FAIL} — throw an exception when an error condition is 
encountered (default, strict
+ *       mode)
+ *   <li>{@code SKIP} — silently drop the offending row
+ * </ul>
+ */
+@Internal
+public enum ErrorHandlingMode {
+    FAIL,
+    SKIP;
+
+    public static final ErrorHandlingMode DEFAULT_MODE = FAIL;
+
+    /**
+     * Returns the mode whose name exactly matches {@code name}, or {@link 
Optional#empty()} if none
+     * matches. Matching is case-sensitive.
+     */
+    public static Optional<ErrorHandlingMode> fromName(final String name) {
+        return Arrays.stream(values()).filter(v -> 
v.name().equals(name)).findFirst();
+    }
+
+    public static String validNames() {
+        return 
Arrays.stream(values()).map(Enum::name).collect(Collectors.joining(", "));
+    }
+}
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/FromChangelogTypeStrategy.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/FromChangelogTypeStrategy.java
index 4d7b25fbd60..d1d3c1c61da 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/FromChangelogTypeStrategy.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/FromChangelogTypeStrategy.java
@@ -42,11 +42,23 @@ import java.util.stream.Collectors;
 @Internal
 public final class FromChangelogTypeStrategy {
 
-    private static final String DEFAULT_OP_COLUMN_NAME = "op";
+    // Positional argument indexes for FROM_CHANGELOG. Must match the order of 
StaticArguments
+    // registered in BuiltInFunctionDefinitions#FROM_CHANGELOG; changing one 
without the other
+    // silently breaks argument resolution.
+    public static final int ARG_TABLE = 0;
+    public static final int ARG_OP = 1;
+    public static final int ARG_OP_MAPPING = 2;
+    public static final int ARG_ERROR_HANDLING = 3;
+
+    public static final String DEFAULT_OP_COLUMN_NAME = "op";
 
     private static final Set<String> VALID_ROW_KIND_NAMES =
             Set.of("INSERT", "UPDATE_BEFORE", "UPDATE_AFTER", "DELETE");
 
+    private static final String UPDATE_BEFORE = RowKind.UPDATE_BEFORE.name();
+
+    private static final String UPDATE_AFTER = RowKind.UPDATE_AFTER.name();
+
     // 
--------------------------------------------------------------------------------------------
     // Input validation
     // 
--------------------------------------------------------------------------------------------
@@ -68,7 +80,7 @@ public final class FromChangelogTypeStrategy {
             callContext -> {
                 final TableSemantics tableSemantics =
                         callContext
-                                .getTableSemantics(0)
+                                .getTableSemantics(ARG_TABLE)
                                 .orElseThrow(
                                         () ->
                                                 new ValidationException(
@@ -80,8 +92,6 @@ public final class FromChangelogTypeStrategy {
 
                 return Optional.of(DataTypes.ROW(outputFields).notNull());
             };
-    private static final String UPDATE_BEFORE = RowKind.UPDATE_BEFORE.name();
-    private static final String UPDATE_AFTER = RowKind.UPDATE_AFTER.name();
 
     // 
--------------------------------------------------------------------------------------------
     // Helpers
@@ -112,12 +122,17 @@ public final class FromChangelogTypeStrategy {
             return error;
         }
 
+        error = validateErrorHandling(callContext, throwOnFailure);
+        if (error.isPresent()) {
+            return error;
+        }
+
         return Optional.of(callContext.getArgumentDataTypes());
     }
 
     private static Optional<List<DataType>> validateTableArg(
             final CallContext callContext, final boolean throwOnFailure) {
-        if (callContext.getTableSemantics(0).isEmpty()) {
+        if (callContext.getTableSemantics(ARG_TABLE).isEmpty()) {
             return callContext.fail(
                     throwOnFailure, "First argument must be a table for 
FROM_CHANGELOG.");
         }
@@ -126,7 +141,8 @@ public final class FromChangelogTypeStrategy {
 
     private static Optional<List<DataType>> validateOpDescriptor(
             final CallContext callContext, final boolean throwOnFailure) {
-        final Optional<ColumnList> opDescriptor = 
callContext.getArgumentValue(1, ColumnList.class);
+        final Optional<ColumnList> opDescriptor =
+                callContext.getArgumentValue(ARG_OP, ColumnList.class);
         if (opDescriptor.isPresent() && opDescriptor.get().getNames().size() 
!= 1) {
             return callContext.fail(
                     throwOnFailure,
@@ -139,7 +155,7 @@ public final class FromChangelogTypeStrategy {
     private static Optional<List<DataType>> validateOpColumn(
             final CallContext callContext, final boolean throwOnFailure) {
 
-        final TableSemantics tableSemantics = 
callContext.getTableSemantics(0).get();
+        final TableSemantics tableSemantics = 
callContext.getTableSemantics(ARG_TABLE).get();
         final String opColumnName = resolveOpColumnName(callContext);
         final List<Field> inputFields = 
DataType.getFields(tableSemantics.dataType());
         final Optional<Field> opField =
@@ -165,14 +181,14 @@ public final class FromChangelogTypeStrategy {
     @SuppressWarnings("unchecked")
     private static Optional<List<DataType>> validateOpMapping(
             final CallContext callContext, final boolean throwOnFailure) {
-        final boolean hasMappingArgProvided = !callContext.isArgumentNull(2);
-        final boolean isMappingArgLiteral = callContext.isArgumentLiteral(2);
+        final boolean hasMappingArgProvided = 
!callContext.isArgumentNull(ARG_OP_MAPPING);
+        final boolean isMappingArgLiteral = 
callContext.isArgumentLiteral(ARG_OP_MAPPING);
         if (hasMappingArgProvided && !isMappingArgLiteral) {
             return callContext.fail(
                     throwOnFailure, "The 'op_mapping' argument must be a 
constant MAP literal.");
         }
 
-        final Optional<Map> opMapping = callContext.getArgumentValue(2, 
Map.class);
+        final Optional<Map> opMapping = 
callContext.getArgumentValue(ARG_OP_MAPPING, Map.class);
         if (opMapping.isPresent()) {
             final Map<String, String> mapping = opMapping.get();
             final Optional<List<DataType>> validationError =
@@ -231,9 +247,35 @@ public final class FromChangelogTypeStrategy {
         return Optional.empty();
     }
 
+    private static Optional<List<DataType>> validateErrorHandling(
+            final CallContext callContext, final boolean throwOnFailure) {
+        final boolean hasErrorHandlingArgProvided = 
!callContext.isArgumentNull(ARG_ERROR_HANDLING);
+        final boolean isErrorHandlingArgLiteral = 
callContext.isArgumentLiteral(ARG_ERROR_HANDLING);
+        if (hasErrorHandlingArgProvided && !isErrorHandlingArgLiteral) {
+            return callContext.fail(
+                    throwOnFailure,
+                    "The 'error_handling' argument must be a constant STRING 
literal.");
+        }
+
+        final Optional<String> optionalErrorHandlingArg =
+                callContext.getArgumentValue(ARG_ERROR_HANDLING, String.class);
+        if (optionalErrorHandlingArg.isPresent()) {
+            final String errorHandlingMode = optionalErrorHandlingArg.get();
+            if (ErrorHandlingMode.fromName(errorHandlingMode).isEmpty()) {
+                return callContext.fail(
+                        throwOnFailure,
+                        String.format(
+                                "Invalid value for argument 'error_handling': 
'%s'. Valid values are: %s.",
+                                errorHandlingMode, 
ErrorHandlingMode.validNames()));
+            }
+        }
+
+        return Optional.empty();
+    }
+
     private static String resolveOpColumnName(final CallContext callContext) {
         return callContext
-                .getArgumentValue(1, ColumnList.class)
+                .getArgumentValue(ARG_OP, ColumnList.class)
                 .filter(cl -> !cl.getNames().isEmpty())
                 .map(cl -> cl.getNames().get(0))
                 .orElse(DEFAULT_OP_COLUMN_NAME);
diff --git 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/FromChangelogInputTypeStrategyTest.java
 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/FromChangelogInputTypeStrategyTest.java
index d1b2a792182..322cf94a49e 100644
--- 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/FromChangelogInputTypeStrategyTest.java
+++ 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/FromChangelogInputTypeStrategyTest.java
@@ -24,7 +24,6 @@ import 
org.apache.flink.table.types.inference.InputTypeStrategiesTestBase;
 import org.apache.flink.table.types.inference.utils.TableSemanticsMock;
 import org.apache.flink.types.ColumnList;
 
-import java.util.List;
 import java.util.Map;
 import java.util.stream.Stream;
 
@@ -43,15 +42,17 @@ class FromChangelogInputTypeStrategyTest extends 
InputTypeStrategiesTestBase {
 
     private static final DataType MAP_TYPE = DataTypes.MAP(DataTypes.STRING(), 
DataTypes.STRING());
 
+    private static final DataType STRING_TYPE = DataTypes.STRING();
+
     @Override
     protected Stream<TestSpec> testData() {
         return Stream.of(
                 // Valid: custom mapping with all change operations
                 TestSpec.forStrategy(
                                 "Valid with custom mapping", 
FROM_CHANGELOG_INPUT_TYPE_STRATEGY)
-                        .calledWithArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE, 
MAP_TYPE)
+                        .calledWithArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE, 
MAP_TYPE, STRING_TYPE)
                         .calledWithTableSemanticsAt(0, new 
TableSemanticsMock(TABLE_TYPE))
-                        .calledWithLiteralAt(1, ColumnList.of(List.of("op")))
+                        .calledWithLiteralAt(1, ColumnList.of("op"))
                         .calledWithLiteralAt(
                                 2,
                                 Map.of(
@@ -59,14 +60,15 @@ class FromChangelogInputTypeStrategyTest extends 
InputTypeStrategiesTestBase {
                                         "ub", "UPDATE_BEFORE",
                                         "ua", "UPDATE_AFTER",
                                         "d", "DELETE"))
-                        .expectArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE, 
MAP_TYPE),
+                        .calledWithLiteralAt(3, "FAIL")
+                        .expectArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE, 
MAP_TYPE, STRING_TYPE),
 
                 // Error: op column not found
                 TestSpec.forStrategy(
                                 "Op column not found in schema", 
FROM_CHANGELOG_INPUT_TYPE_STRATEGY)
                         .calledWithArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE)
                         .calledWithTableSemanticsAt(0, new 
TableSemanticsMock(TABLE_TYPE))
-                        .calledWithLiteralAt(1, 
ColumnList.of(List.of("nonexistent")))
+                        .calledWithLiteralAt(1, ColumnList.of("nonexistent"))
                         .expectErrorMessage("The op column 'nonexistent' does 
not exist"),
 
                 // Error: op column is not STRING
@@ -84,7 +86,7 @@ class FromChangelogInputTypeStrategyTest extends 
InputTypeStrategiesTestBase {
                                                 DataTypes.FIELD("id", 
DataTypes.INT()),
                                                 DataTypes.FIELD("op", 
DataTypes.INT()),
                                                 DataTypes.FIELD("name", 
DataTypes.STRING()))))
-                        .calledWithLiteralAt(1, ColumnList.of(List.of("op")))
+                        .calledWithLiteralAt(1, ColumnList.of("op"))
                         .expectErrorMessage("must be of STRING type"),
 
                 // Error: multi-column descriptor
@@ -93,7 +95,7 @@ class FromChangelogInputTypeStrategyTest extends 
InputTypeStrategiesTestBase {
                                 FROM_CHANGELOG_INPUT_TYPE_STRATEGY)
                         .calledWithArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE)
                         .calledWithTableSemanticsAt(0, new 
TableSemanticsMock(TABLE_TYPE))
-                        .calledWithLiteralAt(1, ColumnList.of(List.of("a", 
"b")))
+                        .calledWithLiteralAt(1, ColumnList.of("a", "b"))
                         .expectErrorMessage("must contain exactly one column 
name"),
 
                 // Error: invalid RowKind in op_mapping value
@@ -102,7 +104,7 @@ class FromChangelogInputTypeStrategyTest extends 
InputTypeStrategiesTestBase {
                                 FROM_CHANGELOG_INPUT_TYPE_STRATEGY)
                         .calledWithArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE, 
MAP_TYPE)
                         .calledWithTableSemanticsAt(0, new 
TableSemanticsMock(TABLE_TYPE))
-                        .calledWithLiteralAt(1, ColumnList.of(List.of("op")))
+                        .calledWithLiteralAt(1, ColumnList.of("op"))
                         .calledWithLiteralAt(2, Map.of("c", "INVALID_KIND"))
                         .expectErrorMessage("Unknown change operation: 
'INVALID_KIND'"),
 
@@ -112,28 +114,30 @@ class FromChangelogInputTypeStrategyTest extends 
InputTypeStrategiesTestBase {
                                 FROM_CHANGELOG_INPUT_TYPE_STRATEGY)
                         .calledWithArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE, 
MAP_TYPE)
                         .calledWithTableSemanticsAt(0, new 
TableSemanticsMock(TABLE_TYPE))
-                        .calledWithLiteralAt(1, ColumnList.of(List.of("op")))
+                        .calledWithLiteralAt(1, ColumnList.of("op"))
                         .calledWithLiteralAt(2, Map.of("c", "INSERT", "r", 
"INSERT"))
                         .expectErrorMessage("Duplicate change operation: 
'INSERT'"),
 
                 // Valid: INSERT-only mapping (append mode, no updates)
                 TestSpec.forStrategy(
                                 "Valid INSERT-only mapping", 
FROM_CHANGELOG_INPUT_TYPE_STRATEGY)
-                        .calledWithArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE, 
MAP_TYPE)
+                        .calledWithArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE, 
MAP_TYPE, STRING_TYPE)
                         .calledWithTableSemanticsAt(0, new 
TableSemanticsMock(TABLE_TYPE))
-                        .calledWithLiteralAt(1, ColumnList.of(List.of("op")))
+                        .calledWithLiteralAt(1, ColumnList.of("op"))
                         .calledWithLiteralAt(2, Map.of("c, r", "INSERT"))
-                        .expectArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE, 
MAP_TYPE),
+                        .calledWithLiteralAt(3, "FAIL")
+                        .expectArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE, 
MAP_TYPE, STRING_TYPE),
 
                 // Valid: INSERT + DELETE mapping (no updates)
                 TestSpec.forStrategy(
                                 "Valid INSERT and DELETE mapping",
                                 FROM_CHANGELOG_INPUT_TYPE_STRATEGY)
-                        .calledWithArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE, 
MAP_TYPE)
+                        .calledWithArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE, 
MAP_TYPE, STRING_TYPE)
                         .calledWithTableSemanticsAt(0, new 
TableSemanticsMock(TABLE_TYPE))
-                        .calledWithLiteralAt(1, ColumnList.of(List.of("op")))
+                        .calledWithLiteralAt(1, ColumnList.of("op"))
                         .calledWithLiteralAt(2, Map.of("c", "INSERT", "d", 
"DELETE"))
-                        .expectArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE, 
MAP_TYPE),
+                        .calledWithLiteralAt(3, "FAIL")
+                        .expectArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE, 
MAP_TYPE, STRING_TYPE),
 
                 // Error: UPDATE_AFTER without UPDATE_BEFORE not supported
                 TestSpec.forStrategy(
@@ -141,13 +145,66 @@ class FromChangelogInputTypeStrategyTest extends 
InputTypeStrategiesTestBase {
                                 FROM_CHANGELOG_INPUT_TYPE_STRATEGY)
                         .calledWithArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE, 
MAP_TYPE)
                         .calledWithTableSemanticsAt(0, new 
TableSemanticsMock(TABLE_TYPE))
-                        .calledWithLiteralAt(1, ColumnList.of(List.of("op")))
+                        .calledWithLiteralAt(1, ColumnList.of("op"))
                         .calledWithLiteralAt(
                                 2,
                                 Map.of(
                                         "c", "INSERT",
                                         "u", "UPDATE_AFTER",
                                         "d", "DELETE"))
-                        .expectErrorMessage("must include UPDATE_BEFORE"));
+                        .expectErrorMessage("must include UPDATE_BEFORE"),
+
+                // Error: Invalid error_handling mode
+                TestSpec.forStrategy(
+                                "Invalid error_handling mode", 
FROM_CHANGELOG_INPUT_TYPE_STRATEGY)
+                        .calledWithArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE, 
MAP_TYPE, STRING_TYPE)
+                        .calledWithTableSemanticsAt(0, new 
TableSemanticsMock(TABLE_TYPE))
+                        .calledWithLiteralAt(1, ColumnList.of("op"))
+                        .calledWithLiteralAt(
+                                2,
+                                Map.of(
+                                        "c", "INSERT",
+                                        "ub", "UPDATE_BEFORE",
+                                        "ua", "UPDATE_AFTER",
+                                        "d", "DELETE"))
+                        .calledWithLiteralAt(3, "INVALID_MODE")
+                        .expectErrorMessage(
+                                "Invalid value for argument 'error_handling': 
'INVALID_MODE'. Valid values are: FAIL, SKIP."),
+
+                // Error: error_handling is case-sensitive — lowercase is 
rejected
+                TestSpec.forStrategy(
+                                "Lowercase error_handling mode is rejected",
+                                FROM_CHANGELOG_INPUT_TYPE_STRATEGY)
+                        .calledWithArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE, 
MAP_TYPE, STRING_TYPE)
+                        .calledWithTableSemanticsAt(0, new 
TableSemanticsMock(TABLE_TYPE))
+                        .calledWithLiteralAt(1, ColumnList.of("op"))
+                        .calledWithLiteralAt(
+                                2,
+                                Map.of(
+                                        "c", "INSERT",
+                                        "ub", "UPDATE_BEFORE",
+                                        "ua", "UPDATE_AFTER",
+                                        "d", "DELETE"))
+                        .calledWithLiteralAt(3, "skip")
+                        .expectErrorMessage(
+                                "Invalid value for argument 'error_handling': 
'skip'. Valid values are: FAIL, SKIP."),
+
+                // Error: error_handling is case-sensitive — mixed case is 
rejected
+                TestSpec.forStrategy(
+                                "Mixed-case error_handling mode is rejected",
+                                FROM_CHANGELOG_INPUT_TYPE_STRATEGY)
+                        .calledWithArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE, 
MAP_TYPE, STRING_TYPE)
+                        .calledWithTableSemanticsAt(0, new 
TableSemanticsMock(TABLE_TYPE))
+                        .calledWithLiteralAt(1, ColumnList.of("op"))
+                        .calledWithLiteralAt(
+                                2,
+                                Map.of(
+                                        "c", "INSERT",
+                                        "ub", "UPDATE_BEFORE",
+                                        "ua", "UPDATE_AFTER",
+                                        "d", "DELETE"))
+                        .calledWithLiteralAt(3, "Fail")
+                        .expectErrorMessage(
+                                "Invalid value for argument 'error_handling': 
'Fail'. Valid values are: FAIL, SKIP."));
     }
 }
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogSemanticTests.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogSemanticTests.java
index 6e936fb6cec..4643a51be52 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogSemanticTests.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogSemanticTests.java
@@ -42,6 +42,8 @@ public class FromChangelogSemanticTests extends 
SemanticTestBase {
                 FromChangelogTestPrograms.DEFAULT_OP_MAPPING,
                 FromChangelogTestPrograms.CUSTOM_OP_MAPPING,
                 FromChangelogTestPrograms.CUSTOM_OP_NAME,
+                FromChangelogTestPrograms.SKIP_INVALID_OP_HANDLING,
+                FromChangelogTestPrograms.SKIP_NULL_OP_CODE,
                 FromChangelogTestPrograms.TABLE_API_DEFAULT,
                 FromChangelogTestPrograms.ROUND_TRIP,
                 FromChangelogTestPrograms.INVALID_OP_CODE,
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogTestPrograms.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogTestPrograms.java
index 725b4a3d2d0..1858aabeed0 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogTestPrograms.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogTestPrograms.java
@@ -94,6 +94,58 @@ public class FromChangelogTestPrograms {
                                     + "op_mapping => MAP['c, r', 'INSERT', 
'ub', 'UPDATE_BEFORE', 'ua', 'UPDATE_AFTER', 'd', 'DELETE'])")
                     .build();
 
+    public static final TableTestProgram SKIP_INVALID_OP_HANDLING =
+            TableTestProgram.of(
+                            "from-changelog-unmapped-codes-dropped",
+                            "unmapped op codes are silently dropped when 
configured")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("cdc_stream")
+                                    .addSchema(SIMPLE_CDC_SCHEMA)
+                                    .producedValues(
+                                            Row.of(1, "INSERT", "Alice"),
+                                            Row.of(2, "INSERT", "Bob"),
+                                            Row.of(1, "UNKNOWN", "Alice2"),
+                                            Row.of(2, "DELETE", "Bob"))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink")
+                                    .addSchema("id INT", "name STRING")
+                                    .consumedValues(
+                                            Row.ofKind(RowKind.INSERT, 1, 
"Alice"),
+                                            Row.ofKind(RowKind.INSERT, 2, 
"Bob"),
+                                            Row.ofKind(RowKind.DELETE, 2, 
"Bob"))
+                                    .build())
+                    .runSql(
+                            "INSERT INTO sink SELECT * FROM FROM_CHANGELOG("
+                                    + "input => TABLE cdc_stream,"
+                                    + "error_handling => 'SKIP')")
+                    .build();
+
+    public static final TableTestProgram SKIP_NULL_OP_CODE =
+            TableTestProgram.of(
+                            "from-changelog-null-op-code-dropped",
+                            "NULL op codes are silently dropped when 
configured")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("cdc_stream")
+                                    .addSchema(SIMPLE_CDC_SCHEMA)
+                                    .producedValues(
+                                            Row.of(1, "INSERT", "Alice"),
+                                            Row.of(2, null, "Bob"),
+                                            Row.of(3, "INSERT", "Carol"))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink")
+                                    .addSchema("id INT", "name STRING")
+                                    .consumedValues(
+                                            Row.ofKind(RowKind.INSERT, 1, 
"Alice"),
+                                            Row.ofKind(RowKind.INSERT, 3, 
"Carol"))
+                                    .build())
+                    .runSql(
+                            "INSERT INTO sink SELECT * FROM FROM_CHANGELOG("
+                                    + "input => TABLE cdc_stream,"
+                                    + "error_handling => 'SKIP')")
+                    .build();
+
     /** Custom op column name via DESCRIPTOR. */
     public static final TableTestProgram CUSTOM_OP_NAME =
             TableTestProgram.of(
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/FromChangelogTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/FromChangelogTest.java
index 33e163d9f41..ba2c1a5690c 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/FromChangelogTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/FromChangelogTest.java
@@ -71,7 +71,8 @@ public class FromChangelogTest extends TableTestBase {
                 "SELECT * FROM FROM_CHANGELOG("
                         + "input => TABLE cdc_stream, "
                         + "op => DESCRIPTOR(__op), "
-                        + "op_mapping => MAP['c, r', 'INSERT', 'ub', 
'UPDATE_BEFORE', 'ua', 'UPDATE_AFTER', 'd', 'DELETE'])",
+                        + "op_mapping => MAP['c, r', 'INSERT', 'ub', 
'UPDATE_BEFORE', 'ua', 'UPDATE_AFTER', 'd', 'DELETE'], "
+                        + "error_handling => 'SKIP')",
                 CHANGELOG_MODE);
     }
 }
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/FromChangelogTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/FromChangelogTest.xml
index 2eed936fc6e..614eb5456b9 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/FromChangelogTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/FromChangelogTest.xml
@@ -18,19 +18,19 @@ limitations under the License.
 <Root>
   <TestCase name="testCustomOpMapping">
     <Resource name="sql">
-      <![CDATA[SELECT * FROM FROM_CHANGELOG(input => TABLE cdc_stream, op => 
DESCRIPTOR(__op), op_mapping => MAP['c, r', 'INSERT', 'ub', 'UPDATE_BEFORE', 
'ua', 'UPDATE_AFTER', 'd', 'DELETE'])]]>
+      <![CDATA[SELECT * FROM FROM_CHANGELOG(input => TABLE cdc_stream, op => 
DESCRIPTOR(__op), op_mapping => MAP['c, r', 'INSERT', 'ub', 'UPDATE_BEFORE', 
'ua', 'UPDATE_AFTER', 'd', 'DELETE'], error_handling => 'SKIP')]]>
     </Resource>
     <Resource name="ast">
       <![CDATA[
 LogicalProject(id=[$0], name=[$1])
-+- LogicalTableFunctionScan(invocation=[FROM_CHANGELOG(TABLE(#0), 
DESCRIPTOR(_UTF-16LE'__op'), MAP(_UTF-16LE'c, r':VARCHAR(4) CHARACTER SET 
"UTF-16LE", _UTF-16LE'INSERT':VARCHAR(13) CHARACTER SET "UTF-16LE", 
_UTF-16LE'ub':VARCHAR(4) CHARACTER SET "UTF-16LE", 
_UTF-16LE'UPDATE_BEFORE':VARCHAR(13) CHARACTER SET "UTF-16LE", 
_UTF-16LE'ua':VARCHAR(4) CHARACTER SET "UTF-16LE", 
_UTF-16LE'UPDATE_AFTER':VARCHAR(13) CHARACTER SET "UTF-16LE", 
_UTF-16LE'd':VARCHAR(4) CHARACTER SET "UTF-16LE", _UTF-16 [...]
++- LogicalTableFunctionScan(invocation=[FROM_CHANGELOG(TABLE(#0), 
DESCRIPTOR(_UTF-16LE'__op'), MAP(_UTF-16LE'c, r':VARCHAR(4) CHARACTER SET 
"UTF-16LE", _UTF-16LE'INSERT':VARCHAR(13) CHARACTER SET "UTF-16LE", 
_UTF-16LE'ub':VARCHAR(4) CHARACTER SET "UTF-16LE", 
_UTF-16LE'UPDATE_BEFORE':VARCHAR(13) CHARACTER SET "UTF-16LE", 
_UTF-16LE'ua':VARCHAR(4) CHARACTER SET "UTF-16LE", 
_UTF-16LE'UPDATE_AFTER':VARCHAR(13) CHARACTER SET "UTF-16LE", 
_UTF-16LE'd':VARCHAR(4) CHARACTER SET "UTF-16LE", _UTF-16 [...]
    +- LogicalProject(id=[$0], __op=[$1], name=[$2])
       +- LogicalTableScan(table=[[default_catalog, default_database, 
cdc_stream]])
 ]]>
     </Resource>
     <Resource name="optimized rel plan">
       <![CDATA[
-ProcessTableFunction(invocation=[FROM_CHANGELOG(TABLE(#0), 
DESCRIPTOR(_UTF-16LE'__op'), MAP(_UTF-16LE'c, r':VARCHAR(4) CHARACTER SET 
"UTF-16LE", _UTF-16LE'INSERT':VARCHAR(13) CHARACTER SET "UTF-16LE", 
_UTF-16LE'ub':VARCHAR(4) CHARACTER SET "UTF-16LE", 
_UTF-16LE'UPDATE_BEFORE':VARCHAR(13) CHARACTER SET "UTF-16LE", 
_UTF-16LE'ua':VARCHAR(4) CHARACTER SET "UTF-16LE", 
_UTF-16LE'UPDATE_AFTER':VARCHAR(13) CHARACTER SET "UTF-16LE", 
_UTF-16LE'd':VARCHAR(4) CHARACTER SET "UTF-16LE", _UTF-16LE'DELE [...]
+ProcessTableFunction(invocation=[FROM_CHANGELOG(TABLE(#0), 
DESCRIPTOR(_UTF-16LE'__op'), MAP(_UTF-16LE'c, r':VARCHAR(4) CHARACTER SET 
"UTF-16LE", _UTF-16LE'INSERT':VARCHAR(13) CHARACTER SET "UTF-16LE", 
_UTF-16LE'ub':VARCHAR(4) CHARACTER SET "UTF-16LE", 
_UTF-16LE'UPDATE_BEFORE':VARCHAR(13) CHARACTER SET "UTF-16LE", 
_UTF-16LE'ua':VARCHAR(4) CHARACTER SET "UTF-16LE", 
_UTF-16LE'UPDATE_AFTER':VARCHAR(13) CHARACTER SET "UTF-16LE", 
_UTF-16LE'd':VARCHAR(4) CHARACTER SET "UTF-16LE", _UTF-16LE'DELE [...]
 +- TableSourceScan(table=[[default_catalog, default_database, cdc_stream]], 
fields=[id, __op, name], changelogMode=[I])
 ]]>
     </Resource>
@@ -42,14 +42,14 @@ ProcessTableFunction(invocation=[FROM_CHANGELOG(TABLE(#0), 
DESCRIPTOR(_UTF-16LE'
     <Resource name="ast">
       <![CDATA[
 LogicalProject(id=[$0], name=[$1])
-+- LogicalTableFunctionScan(invocation=[FROM_CHANGELOG(TABLE(#0), DEFAULT(), 
DEFAULT(), DEFAULT(), DEFAULT())], rowType=[RecordType(INTEGER id, 
VARCHAR(2147483647) name)])
++- LogicalTableFunctionScan(invocation=[FROM_CHANGELOG(TABLE(#0), DEFAULT(), 
DEFAULT(), DEFAULT(), DEFAULT(), DEFAULT())], rowType=[RecordType(INTEGER id, 
VARCHAR(2147483647) name)])
    +- LogicalProject(id=[$0], op=[$1], name=[$2])
       +- LogicalTableScan(table=[[default_catalog, default_database, 
cdc_stream]])
 ]]>
     </Resource>
     <Resource name="optimized rel plan">
       <![CDATA[
-ProcessTableFunction(invocation=[FROM_CHANGELOG(TABLE(#0), DEFAULT(), 
DEFAULT(), DEFAULT(), DEFAULT())], uid=[null], select=[id,name], 
rowType=[RecordType(INTEGER id, VARCHAR(2147483647) name)], 
changelogMode=[I,UB,UA,D])
+ProcessTableFunction(invocation=[FROM_CHANGELOG(TABLE(#0), DEFAULT(), 
DEFAULT(), DEFAULT(), DEFAULT(), DEFAULT())], uid=[null], select=[id,name], 
rowType=[RecordType(INTEGER id, VARCHAR(2147483647) name)], 
changelogMode=[I,UB,UA,D])
 +- TableSourceScan(table=[[default_catalog, default_database, cdc_stream]], 
fields=[id, op, name], changelogMode=[I])
 ]]>
     </Resource>
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/FromChangelogFunction.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/FromChangelogFunction.java
index bb932773f73..cc1f9deb21d 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/FromChangelogFunction.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/FromChangelogFunction.java
@@ -29,6 +29,7 @@ import org.apache.flink.table.functions.FunctionContext;
 import org.apache.flink.table.functions.SpecializedFunction.SpecializedContext;
 import org.apache.flink.table.functions.TableSemantics;
 import org.apache.flink.table.types.inference.CallContext;
+import org.apache.flink.table.types.inference.strategies.ErrorHandlingMode;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.types.ColumnList;
 import org.apache.flink.types.RowKind;
@@ -41,6 +42,12 @@ import java.util.Map;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
+import static 
org.apache.flink.table.types.inference.strategies.FromChangelogTypeStrategy.ARG_ERROR_HANDLING;
+import static 
org.apache.flink.table.types.inference.strategies.FromChangelogTypeStrategy.ARG_OP;
+import static 
org.apache.flink.table.types.inference.strategies.FromChangelogTypeStrategy.ARG_OP_MAPPING;
+import static 
org.apache.flink.table.types.inference.strategies.FromChangelogTypeStrategy.ARG_TABLE;
+import static 
org.apache.flink.table.types.inference.strategies.FromChangelogTypeStrategy.DEFAULT_OP_COLUMN_NAME;
+
 /**
  * Runtime implementation of {@link BuiltInFunctionDefinitions#FROM_CHANGELOG}.
  *
@@ -56,7 +63,6 @@ public class FromChangelogFunction extends 
BuiltInProcessTableFunction<RowData>
 
     private static final long serialVersionUID = 1L;
 
-    private static final String DEFAULT_OP_COLUMN_NAME = "op";
     private static final Map<String, RowKind> DEFAULT_OP_MAPPING =
             Map.of(
                     "INSERT", RowKind.INSERT,
@@ -67,6 +73,7 @@ public class FromChangelogFunction extends 
BuiltInProcessTableFunction<RowData>
     private final Map<String, RowKind> rawOpMap;
     private final int opColumnIndex;
     private final int[] outputIndices;
+    private final ErrorHandlingMode errorHandlingMode;
 
     private transient HashMap<StringData, RowKind> opMap;
     private transient ProjectedRowData projectedOutput;
@@ -77,7 +84,7 @@ public class FromChangelogFunction extends 
BuiltInProcessTableFunction<RowData>
 
         final TableSemantics tableSemantics =
                 callContext
-                        .getTableSemantics(0)
+                        .getTableSemantics(ARG_TABLE)
                         .orElseThrow(() -> new IllegalStateException("Table 
argument expected."));
 
         final RowType inputType = (RowType) 
tableSemantics.dataType().getLogicalType();
@@ -91,6 +98,12 @@ public class FromChangelogFunction extends 
BuiltInProcessTableFunction<RowData>
                         .toArray();
 
         this.rawOpMap = buildOpMap(callContext);
+
+        this.errorHandlingMode =
+                callContext
+                        .getArgumentValue(ARG_ERROR_HANDLING, String.class)
+                        .flatMap(ErrorHandlingMode::fromName)
+                        .orElse(ErrorHandlingMode.DEFAULT_MODE);
     }
 
     @Override
@@ -103,7 +116,7 @@ public class FromChangelogFunction extends 
BuiltInProcessTableFunction<RowData>
 
     private static String resolveOpColumnName(final CallContext callContext) {
         return callContext
-                .getArgumentValue(1, ColumnList.class)
+                .getArgumentValue(ARG_OP, ColumnList.class)
                 .map(cl -> cl.getNames().get(0))
                 .orElse(DEFAULT_OP_COLUMN_NAME);
     }
@@ -114,7 +127,7 @@ public class FromChangelogFunction extends 
BuiltInProcessTableFunction<RowData>
      */
     private static Map<String, RowKind> buildOpMap(CallContext callContext) {
         return callContext
-                .getArgumentValue(2, Map.class)
+                .getArgumentValue(ARG_OP_MAPPING, Map.class)
                 .map(FromChangelogFunction::parseOpMapping)
                 .orElse(DEFAULT_OP_MAPPING);
     }
@@ -134,22 +147,35 @@ public class FromChangelogFunction extends 
BuiltInProcessTableFunction<RowData>
             final Context ctx,
             final RowData input,
             @Nullable final ColumnList op,
-            @Nullable final MapData opMapping) {
+            @Nullable final MapData opMapping,
+            @Nullable final StringData errorHandling) {
         if (input.isNullAt(opColumnIndex)) {
-            throw new TableRuntimeException(
+            handleInvalidOp(
                     "Received NULL op code. Every changelog row must carry an 
operation code.");
+            return;
         }
         final StringData opCode = input.getString(opColumnIndex);
         final RowKind rowKind = opMap.get(opCode);
         if (rowKind == null) {
-            throw new TableRuntimeException(
+            handleInvalidOp(
                     String.format(
                             "Received invalid op code '%s'. Defined op codes 
are: %s.",
                             opCode, opMap.keySet()));
+            return;
         }
 
         projectedOutput.replaceRow(input);
         projectedOutput.setRowKind(rowKind);
         collect(projectedOutput);
     }
+
+    private void handleInvalidOp(final String failureMessage) {
+        switch (errorHandlingMode) {
+            case FAIL:
+                throw new TableRuntimeException(failureMessage);
+            case SKIP:
+                // silently drop the row
+                break;
+        }
+    }
 }

Reply via email to