raminqaf opened a new pull request, #27994:
URL: https://github.com/apache/flink/pull/27994
## What is the purpose of the change
Add an optional `invalid_op_handling` parameter to the `FROM_CHANGELOG`
built-in process table function. Until now, `FROM_CHANGELOG` fails the job
whenever a row carries an operation code that is `NULL` or absent from the
active `op_mapping`. With this parameter, users can opt into silently dropping
such rows instead. This is useful when a CDC source occasionally emits unmapped
or sentinel events that should not tear the pipeline down.
## Brief change log
- Added an optional `invalid_op_handling` STRING parameter to
`BuiltInFunctionDefinitions.FROM_CHANGELOG` with two case-sensitive values:
`FAIL` (default) and `SKIP`.
- `FAIL` preserves prior behavior — throw a `TableRuntimeException` on
NULL or unmapped op codes.
- `SKIP` silently drops the offending row and continues processing.
- Both NULL op codes and unmapped op codes now go through the same
`invalid_op_handling` switch. Previously the NULL path was hard-coded to throw,
so `SKIP` users would still see the job fail on a NULL op.
- Validation is strict: the value must be exactly `FAIL` or `SKIP`
(matching the runtime `Enum.valueOf` behavior), and validation errors surface
the user's literal input verbatim instead of an upper-cased copy.
- Updated SQL reference docs
(`docs/content/docs/sql/reference/queries/changelog.md`), the public
`Table.fromChangelog` Javadoc, the Python `from_changelog` docstring, and the
`InvalidOpHandlingMode` enum Javadoc to describe the new parameter and its
case-sensitive contract.
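
The contract described in the change log above can be sketched in plain Java. This is a minimal illustration under stated assumptions, not the code in this PR: the `parse` method, the `OpCodeResolver` class, and the wording of the error prefix are hypothetical, and a JDK exception stands in for `TableRuntimeException` so the example compiles without Flink on the classpath.

```java
import java.util.Map;
import java.util.Optional;

/** Hypothetical stand-in for the enum named in this PR; not the actual Flink source. */
enum InvalidOpHandlingMode {
    FAIL,
    SKIP;

    /** Strict, case-sensitive parse that reports the caller's literal input. */
    static InvalidOpHandlingMode parse(String literal) {
        try {
            return valueOf(literal);
        } catch (IllegalArgumentException | NullPointerException e) {
            // The message prefix is an assumption; only the "Valid values are" part is quoted in the PR.
            throw new IllegalArgumentException(
                    "Invalid invalid_op_handling mode '" + literal
                            + "'. Valid values are: FAIL, SKIP.");
        }
    }
}

/** Hypothetical helper: one decision point for both NULL and unmapped op codes. */
final class OpCodeResolver {
    private final Map<String, String> opMapping;
    private final InvalidOpHandlingMode mode;

    OpCodeResolver(Map<String, String> opMapping, InvalidOpHandlingMode mode) {
        this.opMapping = opMapping;
        this.mode = mode;
    }

    /** Returns the mapped row kind, or empty when the row should be dropped. */
    Optional<String> resolve(String opCode) {
        String rowKind = (opCode == null) ? null : opMapping.get(opCode);
        if (rowKind != null) {
            return Optional.of(rowKind);
        }
        switch (mode) {
            case SKIP:
                return Optional.empty();
            case FAIL:
            default:
                // The PR throws TableRuntimeException here; a JDK exception keeps the sketch standalone.
                throw new IllegalStateException("Invalid op code: " + opCode);
        }
    }
}

final class InvalidOpHandlingDemo {
    public static void main(String[] args) {
        OpCodeResolver skip =
                new OpCodeResolver(Map.of("c", "INSERT"), InvalidOpHandlingMode.parse("SKIP"));
        System.out.println(skip.resolve("c"));  // prints Optional[INSERT]
        System.out.println(skip.resolve(null)); // prints Optional.empty
    }
}
```

In this sketch, `parse("skip")` is rejected and the message contains `'skip'` exactly as typed, mirroring the "literal input verbatim" behavior described above.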
## Verifying this change
Please make sure both new and modified tests in this PR follow [the
conventions for tests defined in our code quality
guide](https://flink.apache.org/how-to-contribute/code-style-and-quality-common/#7-testing).
This change added tests and can be verified as follows:
- Added input-strategy unit tests in `FromChangelogInputTypeStrategyTest`
covering the case-sensitivity contract (`Lowercase invalid_op_handling
mode is rejected`, `Mixed-case invalid_op_handling mode is rejected`) and
the existing `INVALID_MODE` rejection (now asserting `Valid values are:
FAIL, SKIP.`).
- Added a semantic test program `SKIP_NULL_OP_CODE` in
`FromChangelogTestPrograms` that interleaves a NULL op code between two valid
`INSERT` rows and asserts only the two valid rows reach the sink under
`invalid_op_handling => 'SKIP'`. Registered in `FromChangelogSemanticTests`.
- Existing semantic tests continue to cover FAIL-on-NULL (`NULL_OP_CODE`),
FAIL-on-unmapped (`INVALID_OP_CODE`), and SKIP-on-unmapped
(`SKIP_INVALID_OP_HANDLING`).
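
For readers unfamiliar with the semantic test programs, the shape of the `SKIP_NULL_OP_CODE` scenario can be imitated in plain Java. This is only a rough sketch and does not use Flink's test harness: the `ChangelogRow` class, the sample payloads, and the `+I[...]` sink rendering are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Plain-Java imitation of the SKIP_NULL_OP_CODE scenario; not the actual test program. */
final class SkipNullOpCodeSketch {

    /** Hypothetical changelog row: an op code plus a payload. */
    static final class ChangelogRow {
        final String op;
        final String payload;

        ChangelogRow(String op, String payload) {
            this.op = op;
            this.payload = payload;
        }
    }

    /** A NULL op code interleaved between two valid INSERT rows. */
    static List<ChangelogRow> sampleInput() {
        return Arrays.asList(
                new ChangelogRow("INSERT", "Alice"),
                new ChangelogRow(null, "ignored"),
                new ChangelogRow("INSERT", "Bob"));
    }

    /** Forwards INSERT rows and drops every other row (NULL or unmapped), as SKIP would. */
    static List<String> runWithSkip(List<ChangelogRow> input) {
        List<String> sink = new ArrayList<>();
        for (ChangelogRow row : input) {
            if (!"INSERT".equals(row.op)) {
                continue; // SKIP: drop the row instead of failing the job
            }
            sink.add("+I[" + row.payload + "]");
        }
        return sink;
    }

    public static void main(String[] args) {
        System.out.println(runWithSkip(sampleInput())); // prints [+I[Alice], +I[Bob]]
    }
}
```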
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with
`@Public(Evolving)`: yes — adds an optional named argument to the public
`Table.fromChangelog` API and its Python equivalent.
- The serializers: no
- The runtime per-record code paths (performance sensitive): yes —
`FromChangelogFunction.eval()` runs per record. The refactor consolidates
two duplicated `switch` blocks into a single helper without adding
per-record allocations; the happy path now executes one fewer branch.
- Anything that affects deployment or recovery: JobManager (and its
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
- The S3 file system connector: no
## Documentation
- Does this pull request introduce a new feature? yes
- If yes, how is the feature documented? docs + JavaDocs (Java
`Table.fromChangelog`, `InvalidOpHandlingMode`) + Python docstring
(`Table.from_changelog`)
---
##### Was generative AI tooling used to co-author this PR?
- [X] Yes (Claude Code, Opus 4.7)
<!--
Generated-by: Claude Code (Opus 4.7)
-->