snuyanzin commented on code in PR #27994:
URL: https://github.com/apache/flink/pull/27994#discussion_r3122905439


##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/FromChangelogFunction.java:
##########
@@ -134,22 +142,32 @@ public void eval(
             final Context ctx,
             final RowData input,
             @Nullable final ColumnList op,
-            @Nullable final MapData opMapping) {
+            @Nullable final MapData opMapping,
+            @Nullable final StringData invalidOpHandling) {
         if (input.isNullAt(opColumnIndex)) {
-            throw new TableRuntimeException(
+            handleInvalidOp(
                     "Received NULL op code. Every changelog row must carry an operation code.");
+            return;
         }
         final StringData opCode = input.getString(opColumnIndex);
         final RowKind rowKind = opMap.get(opCode);
         if (rowKind == null) {
-            throw new TableRuntimeException(
+            handleInvalidOp(
                     String.format(
                             "Received invalid op code '%s'. Defined op codes are: %s.",
                             opCode, opMap.keySet()));
+            return;
         }
 
         projectedOutput.replaceRow(input);
         projectedOutput.setRowKind(rowKind);
         collect(projectedOutput);
     }
+
+    private void handleInvalidOp(final String failureMessage) {
+        switch (invalidOpHandlingMode) {
+            case FAIL -> throw new TableRuntimeException(failureMessage);
+            case SKIP -> {} // silently drop the row
+        }

Review Comment:
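   Only Java 11 syntax is allowed in OSS Flink. The arrow-form `switch` labels used here (`case FAIL -> ...`) require Java 14 or later, so please rewrite this as a classic `switch` statement with `case FAIL:` / `case SKIP:` labels.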



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to