[
https://issues.apache.org/jira/browse/FLINK-39430?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Timo Walther closed FLINK-39430.
--------------------------------
Fix Version/s: 2.4.0
Resolution: Fixed
Fixed in master: 9d92095e0775cae6e18fcb562e29a7217d6b5b12
> Add `invalid_op_handling` parameter to FROM_CHANGELOG PTF
> ----------------------------------------------------------
>
> Key: FLINK-39430
> URL: https://issues.apache.org/jira/browse/FLINK-39430
> Project: Flink
> Issue Type: Sub-task
> Components: Table SQL / API
> Reporter: Ramin Gharib
> Assignee: Ramin Gharib
> Priority: Major
> Fix For: 2.4.0
>
>
> Add an `invalid_op_handling` parameter to the `FROM_CHANGELOG` built-in
> process table function as specified in FLIP-564.
> Currently, when FROM_CHANGELOG encounters an operation code in the input that
> is not present in the `op_mapping`, the row is silently dropped. The
> `invalid_op_handling` parameter would allow users to control this behavior
> with three modes:
> * `FAIL` — throw an exception when an unmapped operation code is encountered
> (strict mode)
> * `LOG` — log a warning and drop the row
> * `SKIP` — silently drop the row (current default behavior)
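For illustration only, the three modes above could be sketched in plain Java as follows. This is not the Flink implementation: the class name `InvalidOpHandlingSketch`, the `resolve` helper, and the in-memory `WARNINGS` list (a stand-in for a real logger) are all hypothetical names chosen for this sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;

final class InvalidOpHandlingSketch {

    /** The three modes named in the issue description. */
    enum InvalidOpHandling { FAIL, LOG, SKIP }

    /** Hypothetical stand-in for a logger, so the sketch stays self-contained. */
    static final List<String> WARNINGS = new ArrayList<>();

    /**
     * Looks up the changelog kind for an operation code.
     * Returns an empty Optional when the row should be dropped.
     */
    static Optional<String> resolve(
            String opCode, Map<String, String> opMapping, InvalidOpHandling mode) {
        String kind = opMapping.get(opCode);
        if (kind != null) {
            return Optional.of(kind);
        }
        switch (mode) {
            case FAIL:
                // Strict mode: surface the unmapped code to the caller.
                throw new IllegalStateException(
                        "Unmapped operation code '" + opCode + "'");
            case LOG:
                // Record a warning, then drop the row.
                WARNINGS.add("Dropping row with unmapped operation code '" + opCode + "'");
                return Optional.empty();
            case SKIP:
            default:
                // Silently drop the row.
                return Optional.empty();
        }
    }

    public static void main(String[] args) {
        Map<String, String> mapping =
                Map.of("c", "INSERT", "u", "UPDATE_AFTER", "d", "DELETE");
        System.out.println(resolve("u", mapping, InvalidOpHandling.SKIP)); // Optional[UPDATE_AFTER]
        System.out.println(resolve("x", mapping, InvalidOpHandling.LOG));  // Optional.empty
        System.out.println(WARNINGS.size());                               // 1
    }
}
```

Under `SKIP` and `LOG` the caller sees the same result (no row emitted); the only difference is the recorded warning.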
> h3. Example
> {code:sql}
> SELECT * FROM FROM_CHANGELOG(
>   input => TABLE cdc_stream PARTITION BY id,
>   op => DESCRIPTOR(__op),
>   op_mapping => MAP['c, r', 'INSERT', 'u', 'UPDATE_AFTER', 'd', 'DELETE'],
>   invalid_op_handling => 'FAIL'
> ){code}
>
> h3. Scope
> * Add `invalid_op_handling` as a new optional scalar argument
> (`StaticArgument.scalar`) of type `STRING` to the FROM_CHANGELOG definition
> in `BuiltInFunctionDefinitions`
> * Add input validation in `FromChangelogTypeStrategy` to check that the value
> is one of `FAIL`, `LOG`, `SKIP`
> * Implement the three modes in `FromChangelogFunction.eval()`:
> ** `FAIL`: throw a runtime exception with details about the unmapped code
> ** `LOG`: log a warning via `FunctionContext` and skip the row
> ** `SKIP`: current behavior (return without collecting)
> * Update documentation in `changelog.md`
> * Add semantic tests for each mode
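As a rough sketch of the validation step listed in the scope, the check that the argument is one of `FAIL`, `LOG`, `SKIP` might look like the following in plain Java. The class `InvalidOpHandlingValidation` and the `parse` method are hypothetical and do not reflect the actual `FromChangelogTypeStrategy` code. Two assumptions are made here: an omitted (null) argument falls back to `SKIP`, since the issue calls that the current default behavior, and matching is case-sensitive.

```java
import java.util.Arrays;

final class InvalidOpHandlingValidation {

    /** Allowed values, as listed in the issue. */
    enum Mode { FAIL, LOG, SKIP }

    /**
     * Parses the user-supplied string into a Mode.
     * Assumption: a missing argument means SKIP (the current default behavior).
     */
    static Mode parse(String value) {
        if (value == null) {
            return Mode.SKIP;
        }
        try {
            // Assumption: exact, case-sensitive match against the enum names.
            return Mode.valueOf(value);
        } catch (IllegalArgumentException e) {
            throw new IllegalArgumentException(
                    "Invalid value '" + value + "' for invalid_op_handling. "
                            + "Expected one of: " + Arrays.toString(Mode.values()));
        }
    }

    public static void main(String[] args) {
        System.out.println(parse("FAIL")); // FAIL
        System.out.println(parse(null));   // SKIP
        try {
            parse("IGNORE");
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Rejecting unknown values up front, rather than at the first unmapped row, means a typo in the argument is reported before any data is processed.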
--
This message was sent by Atlassian Jira
(v8.20.10#820010)