gustavodemorais commented on code in PR #27994:
URL: https://github.com/apache/flink/pull/27994#discussion_r3123392576


##########
docs/content/docs/sql/reference/queries/changelog.md:
##########
@@ -61,8 +62,9 @@ SELECT * FROM FROM_CHANGELOG(
 | Parameter    | Required | Description |
 |:-------------|:---------|:---------------------------------------------------|
 | `input`      | Yes      | The input table. Must be append-only. |
-| `op`         | No       | A `DESCRIPTOR` with a single column name for the operation code column. Defaults to `op`. The column must exist in the input table and be of type STRING. The column may be declared nullable, but a NULL value at runtime fails the job with a `TableRuntimeException` — every changelog row must carry an operation code. |
-| `op_mapping` | No       | A `MAP<STRING, STRING>` mapping user-defined codes to Flink change operation names. Keys are user-defined codes (e.g., `'c'`, `'u'`, `'d'`), values are Flink change operation names (`INSERT`, `UPDATE_BEFORE`, `UPDATE_AFTER`, `DELETE`). Keys can contain comma-separated codes to map multiple codes to the same operation (e.g., `'c, r'`). Receiving an op code not present in the mapping fails the job at runtime with a `TableRuntimeException`. Each change operation may appear at most once across all entries. |
+| `op`         | No       | A `DESCRIPTOR` with a single column name for the operation code column. Defaults to `op`. The column must exist in the input table and be of type STRING. The column may be declared nullable; runtime behavior for `NULL` op codes is governed by `invalid_op_handling`. |

Review Comment:
   ```suggestion
   | `op`         | No       | A `DESCRIPTOR` with a single column name for the operation code column. Defaults to `op`. The column must exist in the input table and be of type STRING. |
   ```
   
   This is already covered by the `invalid_op_handling` description, so that should be enough.
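For reference, a minimal sketch of the `op_mapping` semantics described in the removed row above: comma-separated keys expand to one entry per code, and each change operation may appear in at most one entry. It is self-contained, so `ChangeOp` is a hypothetical stand-in for Flink's `RowKind`, and `OpMappingSketch`/`expandOpMapping` are illustrative names, not the PR's actual `buildOpMap` implementation.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

final class OpMappingSketch {

    // Hypothetical stand-in for Flink's RowKind, using the same four names.
    enum ChangeOp { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    /**
     * Expands keys such as "c, r" into one entry per code and rejects a change
     * operation that appears in more than one entry.
     */
    static Map<String, ChangeOp> expandOpMapping(Map<String, String> opMapping) {
        Map<String, ChangeOp> codeToOp = new HashMap<>();
        Set<ChangeOp> seenOps = new HashSet<>();
        for (Map.Entry<String, String> entry : opMapping.entrySet()) {
            // valueOf throws IllegalArgumentException for names outside the four above.
            ChangeOp op = ChangeOp.valueOf(entry.getValue());
            if (!seenOps.add(op)) {
                throw new IllegalArgumentException(
                        "Change operation mapped more than once: " + op);
            }
            // A key like "c, r" maps both "c" and "r" to the same operation.
            for (String code : entry.getKey().split(",")) {
                String trimmed = code.trim();
                if (!trimmed.isEmpty()) {
                    codeToOp.put(trimmed, op);
                }
            }
        }
        return codeToOp;
    }

    private OpMappingSketch() {}
}
```

Rejecting a repeated operation up front keeps the lookup unambiguous, which matches the "at most once across all entries" rule in the docs.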



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/FromChangelogTypeStrategy.java:
##########
@@ -47,6 +48,40 @@ public final class FromChangelogTypeStrategy {
     private static final Set<String> VALID_ROW_KIND_NAMES =
             Set.of("INSERT", "UPDATE_BEFORE", "UPDATE_AFTER", "DELETE");
 
+    private static final String UPDATE_BEFORE = RowKind.UPDATE_BEFORE.name();
+
+    private static final String UPDATE_AFTER = RowKind.UPDATE_AFTER.name();
+
+    /**
+     * Controls behavior when {@code FROM_CHANGELOG} encounters an operation code in the input that
+     * is not present in the {@code op_mapping}. Mode names are case-sensitive — the value must be
+     * spelled in upper case.
+     *
+     * <ul>
+     *   <li>{@code FAIL} — throw an exception when an unmapped operation code is encountered
+     *       (default, strict mode)
+     *   <li>{@code SKIP} — silently drop the row
+     * </ul>
+     */
+    public enum InvalidOpHandlingMode {

Review Comment:
   We could move it to its own top-level class in the same package, since it is a domain concept used by both validation and runtime. wdyt?
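A minimal sketch of what the standalone enum could look like, assuming it keeps the `FAIL`/`SKIP` values and the case-sensitive, upper-case-only spelling from the Javadoc above. The `fromString` helper and its error message are hypothetical, not existing Flink API. The enum is package-private here only so the snippet compiles on its own; the real class would presumably be `public` in its own file.

```java
import java.util.Arrays;

/**
 * Hypothetical top-level version of the enum currently nested in
 * FromChangelogTypeStrategy, so validation and runtime can share it.
 */
enum InvalidOpHandlingMode {
    /** Throw when an unmapped operation code is encountered (default, strict mode). */
    FAIL,
    /** Silently drop the row. */
    SKIP;

    /** Case-sensitive lookup: only the exact upper-case names are accepted. */
    static InvalidOpHandlingMode fromString(String value) {
        for (InvalidOpHandlingMode mode : values()) {
            if (mode.name().equals(value)) {
                return mode;
            }
        }
        throw new IllegalArgumentException(
                "Invalid value for invalid_op_handling: '" + value
                        + "'. Supported values: " + Arrays.toString(values()));
    }
}
```

Keeping the parsing next to the enum would let the type strategy (validation) and `FromChangelogFunction` (runtime) share one definition of what a valid mode string is.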



##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/FromChangelogFunction.java:
##########
@@ -91,6 +93,12 @@ public FromChangelogFunction(final SpecializedContext context) {
                         .toArray();
 
         this.rawOpMap = buildOpMap(callContext);
+
+        this.invalidOpHandlingMode =
+                callContext
+                        .getArgumentValue(3, String.class)

Review Comment:
   Does it make sense to store these argument indexes in one place (e.g., as named constants) so we can reuse them instead of hard-coding them everywhere?
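One possible shape, as a sketch: named constants for the argument positions plus a single helper that reads `invalid_op_handling` with a `FAIL` default (the default comes from the Javadoc). The positions for `input`, `op`, and `op_mapping` are assumed from the documented parameter order, since only index 3 appears in the hunk, and `ArgumentSource` is a hypothetical stand-in for the `getArgumentValue(int, Class)` accessor called on `callContext` above.

```java
import java.util.Optional;

final class FromChangelogArgsSketch {

    // Hypothetical named positions; assumes the order input, op, op_mapping,
    // invalid_op_handling (index 3 matches the getArgumentValue call in the hunk).
    static final int ARG_INPUT = 0;
    static final int ARG_OP = 1;
    static final int ARG_OP_MAPPING = 2;
    static final int ARG_INVALID_OP_HANDLING = 3;

    /** Hypothetical stand-in for the argument accessor used on callContext. */
    interface ArgumentSource {
        <T> Optional<T> getArgumentValue(int pos, Class<T> clazz);
    }

    enum Mode { FAIL, SKIP }

    /** Reads invalid_op_handling by name instead of by a literal index. */
    static Mode readInvalidOpHandling(ArgumentSource args) {
        return args.getArgumentValue(ARG_INVALID_OP_HANDLING, String.class)
                .map(Mode::valueOf) // case-sensitive: "skip" throws IllegalArgumentException
                .orElse(Mode.FAIL); // default when the argument is omitted
    }

    private FromChangelogArgsSketch() {}
}
```

The same constants could then be referenced from the type strategy, so the positions are defined once rather than repeated in validation and runtime code.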



##########
docs/content/docs/sql/reference/queries/changelog.md:
##########
@@ -125,6 +127,21 @@ SELECT * FROM FROM_CHANGELOG(
 -- The operation column named 'operation' is used instead of 'op'
 ```
 
+#### Invalid operation code handling

Review Comment:
   Nit suggestion
   
   ```suggestion
   #### Invalid operation code handling
   
   We currently support two configurations for `invalid_op_handling`: the job can either fail on an unknown operation code, or skip the row and continue processing.
   
   ```
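To illustrate the two modes in that suggestion, here is a minimal per-row sketch. It assumes a `NULL` op code is handled like an unmapped one (as the updated `op` description implies) and throws `IllegalStateException` in place of Flink's `TableRuntimeException` so it stays self-contained; `InvalidOpHandlingSketch`, `resolve`, `Mode`, and `ChangeOp` are hypothetical names.

```java
import java.util.Map;
import java.util.Optional;

final class InvalidOpHandlingSketch {

    enum Mode { FAIL, SKIP }

    // Hypothetical stand-in for Flink's RowKind.
    enum ChangeOp { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    /**
     * Resolves the change operation for one input row. An empty result means the
     * row is dropped; a NULL code is treated the same as an unmapped code.
     */
    static Optional<ChangeOp> resolve(String opCode, Map<String, ChangeOp> opMap, Mode mode) {
        // Check for null first: some Map implementations (e.g. Map.of) reject get(null).
        ChangeOp op = opCode == null ? null : opMap.get(opCode);
        if (op != null) {
            return Optional.of(op);
        }
        if (mode == Mode.SKIP) {
            return Optional.empty();
        }
        // FAIL mode: Flink would raise a TableRuntimeException here.
        throw new IllegalStateException("Received unmapped operation code: " + opCode);
    }

    private InvalidOpHandlingSketch() {}
}
```

With `SKIP`, the caller simply emits nothing for an empty result, so the job keeps processing; with `FAIL`, the first bad row stops the job.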



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
