gustavodemorais commented on code in PR #27994:
URL: https://github.com/apache/flink/pull/27994#discussion_r3123392576
##########
docs/content/docs/sql/reference/queries/changelog.md:
##########
@@ -61,8 +62,9 @@ SELECT * FROM FROM_CHANGELOG(
| Parameter | Required | Description
|
|:-------------|:---------|:------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `input` | Yes | The input table. Must be append-only.
|
-| `op` | No | A `DESCRIPTOR` with a single column name for the
operation code column. Defaults to `op`. The column must exist in the input
table and be of type STRING. The column may be declared nullable, but a NULL
value at runtime fails the job with a `TableRuntimeException` — every changelog
row must carry an operation code.
|
-| `op_mapping` | No | A `MAP<STRING, STRING>` mapping user-defined codes
to Flink change operation names. Keys are user-defined codes (e.g., `'c'`,
`'u'`, `'d'`), values are Flink change operation names (`INSERT`,
`UPDATE_BEFORE`, `UPDATE_AFTER`, `DELETE`). Keys can contain comma-separated
codes to map multiple codes to the same operation (e.g., `'c, r'`). Receiving
an op code not present in the mapping fails the job at runtime with a
`TableRuntimeException`. Each change operation may appear at most once across
all entries. |
+| `op` | No | A `DESCRIPTOR` with a single column name for the
operation code column. Defaults to `op`. The column must exist in the input
table and be of type STRING. The column may be declared nullable; runtime
behavior for `NULL` op codes is governed by `invalid_op_handling`.
|
Review Comment:
```suggestion
| `op` | No | A `DESCRIPTOR` with a single column name for the operation code column. Defaults to `op`. The column must exist in the input table and be of type STRING. |
```
The `invalid_op_handling` description already covers this, so that should be enough.
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/FromChangelogTypeStrategy.java:
##########
@@ -47,6 +48,40 @@ public final class FromChangelogTypeStrategy {
private static final Set<String> VALID_ROW_KIND_NAMES =
Set.of("INSERT", "UPDATE_BEFORE", "UPDATE_AFTER", "DELETE");
+ private static final String UPDATE_BEFORE = RowKind.UPDATE_BEFORE.name();
+
+ private static final String UPDATE_AFTER = RowKind.UPDATE_AFTER.name();
+
+ /**
+ * Controls behavior when {@code FROM_CHANGELOG} encounters an operation
code in the input that
+ * is not present in the {@code op_mapping}. Mode names are case-sensitive
— the value must be
+ * spelled in upper case.
+ *
+ * <ul>
+ * <li>{@code FAIL} — throw an exception when an unmapped operation code
is encountered
+ * (default, strict mode)
+ * <li>{@code SKIP} — silently drop the row
+ * </ul>
+ */
+ public enum InvalidOpHandlingMode {
Review Comment:
We could move it to its own top-level class in the same package - it is a
domain concept used by both validation and runtime, wdyt?
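A minimal sketch of what the extracted type might look like, assuming it keeps the two modes and the case-sensitive, upper-case spelling described in the Javadoc above. The `fromName` helper is hypothetical and not part of the PR. The enum is declared package-private here only so the snippet compiles in any file; in Flink it would presumably be `public` in its own `InvalidOpHandlingMode.java`.

```java
import java.util.Arrays;
import java.util.Optional;

/** Illustrative top-level version of the nested enum from the diff above. */
enum InvalidOpHandlingMode {
    /** Throw an exception when an unmapped operation code is encountered (default). */
    FAIL,
    /** Silently drop the row. */
    SKIP;

    /**
     * Hypothetical helper: case-sensitive lookup by name.
     * Returns an empty Optional instead of throwing for unknown or null names.
     */
    static Optional<InvalidOpHandlingMode> fromName(String name) {
        return Arrays.stream(values())
                .filter(mode -> mode.name().equals(name))
                .findFirst();
    }
}
```

With this shape, both the type strategy and the runtime function could reference the same type without one depending on the other's class.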
##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/FromChangelogFunction.java:
##########
@@ -91,6 +93,12 @@ public FromChangelogFunction(final SpecializedContext
context) {
.toArray();
this.rawOpMap = buildOpMap(callContext);
+
+ this.invalidOpHandlingMode =
+ callContext
+ .getArgumentValue(3, String.class)
Review Comment:
Does it make sense for us to store these indexes in a nicer way so we can
just reuse them and not write them everywhere?
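One way this could look, as a rough sketch: a small holder of named constants that both the type strategy and the runtime function reference. The class name `FromChangelogArgs` and the `literal` helper are hypothetical. Only index `3` appears in the diff above; positions `0` to `2` are assumed from the order of the parameter table in the docs. The `List<String>` is a stand-in for the call context so the snippet runs without Flink on the classpath.

```java
import java.util.List;
import java.util.Optional;

/** Hypothetical holder for FROM_CHANGELOG argument positions. */
final class FromChangelogArgs {
    // Assumed from the docs table order; only INVALID_OP_HANDLING = 3 is
    // confirmed by the diff under review.
    static final int INPUT = 0;
    static final int OP = 1;
    static final int OP_MAPPING = 2;
    static final int INVALID_OP_HANDLING = 3;

    private FromChangelogArgs() {}

    /**
     * Stand-in for a literal-argument lookup: returns the value at the given
     * position, or empty when the argument is absent or null.
     */
    static Optional<String> literal(List<String> args, int index) {
        if (index < 0 || index >= args.size()) {
            return Optional.empty();
        }
        return Optional.ofNullable(args.get(index));
    }
}
```

A call site would then read `literal(args, FromChangelogArgs.INVALID_OP_HANDLING).orElse("FAIL")` instead of repeating the bare `3`.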
##########
docs/content/docs/sql/reference/queries/changelog.md:
##########
@@ -125,6 +127,21 @@ SELECT * FROM FROM_CHANGELOG(
-- The operation column named 'operation' is used instead of 'op'
```
+#### Invalid operation code handling
Review Comment:
Nit suggestion
```suggestion
#### Invalid operation code handling
We currently support two configurations for `invalid_op_handling`: the job can
either fail on an unmapped operation code or skip the row and continue processing.
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.