[ 
https://issues.apache.org/jira/browse/FLINK-39430?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ramin Gharib updated FLINK-39430:
---------------------------------
    Description: 
Add an `invalid_op_handling` parameter to the `FROM_CHANGELOG` built-in process 
table function as specified in FLIP-564.

Currently, when FROM_CHANGELOG encounters an operation code in the input that 
is not present in the `op_mapping`, the row is silently dropped. The 
`invalid_op_handling` parameter would allow users to control this behavior with 
three modes:
 * `FAIL` — throw an exception when an unmapped operation code is encountered 
(strict mode)
 * `LOG` — log a warning and drop the row
 * `SKIP` — silently drop the row (current default behavior)

h3. Example
{code:java}
SELECT * FROM FROM_CHANGELOG(
  input => TABLE cdc_stream PARTITION BY id,
  op => DESCRIPTOR(__op),
  op_mapping => MAP['c, r', 'INSERT', 'u', 'UPDATE_AFTER', 'd', 'DELETE'],
  invalid_op_handling => 'FAIL'
){code}
 
h3. Scope

 * Add `invalid_op_handling` as a new optional scalar argument 
(`StaticArgument.scalar`) of type `STRING` to the FROM_CHANGELOG definition in 
`BuiltInFunctionDefinitions`
 * Add input validation in `FromChangelogTypeStrategy` to check that the value is 
one of `FAIL`, `LOG`, `SKIP`
 * Implement the three modes in `FromChangelogFunction.eval()`:
 ** `FAIL`: throw a runtime exception with details about the unmapped code
 ** `LOG`: log a warning via `FunctionContext` and skip the row
 ** `SKIP`: current behavior (return without collecting)
 * Update documentation in `changelog.md`
 * Add semantic tests for each mode
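
For illustration only, the validation and the three modes could look roughly like the standalone sketch below. It is not the actual `FromChangelogFunction` code: the class `InvalidOpHandlingSketch`, the enum, `parse`, `resolve` and the `WARNINGS` list are hypothetical names, the mapping is assumed to be already expanded to one entry per operation code, exact (case-sensitive) matching of the mode is an assumption, and a plain list stands in for real logging.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class InvalidOpHandlingSketch {

    /** The three modes listed in this issue. */
    enum InvalidOpHandling {
        FAIL,
        LOG,
        SKIP;

        /** Validation step: accept only FAIL, LOG or SKIP (exact match is an assumption). */
        static InvalidOpHandling parse(String value) {
            for (InvalidOpHandling mode : values()) {
                if (mode.name().equals(value)) {
                    return mode;
                }
            }
            throw new IllegalArgumentException(
                    "invalid_op_handling must be one of FAIL, LOG, SKIP, but was: " + value);
        }
    }

    /** Stand-in for a real logger so the sketch stays dependency-free. */
    static final List<String> WARNINGS = new ArrayList<>();

    /**
     * Returns the mapped change kind, or null when the row should be dropped.
     * In FAIL mode an unmapped code raises an exception instead.
     */
    static String resolve(String opCode, Map<String, String> opMapping, InvalidOpHandling mode) {
        String kind = opMapping.get(opCode);
        if (kind != null) {
            return kind;
        }
        switch (mode) {
            case FAIL:
                throw new IllegalStateException(
                        "Unmapped operation code '" + opCode
                                + "', known codes: " + opMapping.keySet());
            case LOG:
                WARNINGS.add("Dropping row with unmapped operation code '" + opCode + "'");
                return null;
            default: // SKIP: drop the row silently
                return null;
        }
    }

    public static void main(String[] args) {
        // Hypothetical mapping, already expanded from 'c, r' to one entry per code.
        Map<String, String> mapping =
                Map.of("c", "INSERT", "r", "INSERT", "u", "UPDATE_AFTER", "d", "DELETE");
        System.out.println(resolve("u", mapping, InvalidOpHandling.parse("FAIL")));
        System.out.println(resolve("x", mapping, InvalidOpHandling.parse("LOG")));
        System.out.println(WARNINGS);
    }
}
```

Returning `null` for a dropped row is only a convenience of the sketch; in the function itself this would correspond to returning without collecting.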

  was:
Add an `invalid_op_handling` parameter to the `FROM_CHANGELOG` built-in process 
table function as specified in FLIP-564.

Currently, when FROM_CHANGELOG encounters an operation code in the input that 
is not present in the `op_mapping`, the row is silently dropped. The 
`invalid_op_handling` parameter would allow users to control this behavior with 
three modes:
 * `FAIL` — throw an exception when an unmapped operation code is encountered 
(strict mode)
 * `LOG` — log a warning and drop the row
 * `SKIP` — silently drop the row (current default behavior)

h3. Example
{code:java}
SELECT * FROM FROM_CHANGELOG(
input => TABLE cdc_stream,
op => DESCRIPTOR(__op),
op_mapping => MAP['c, r', 'INSERT', 'u', 'UPDATE_AFTER', 'd', 'DELETE'],
invalid_op_handling => 'FAIL'
){code}
 
h3. Scope

 * Add `invalid_op_handling` as a new optional scalar argument 
(`StaticArgument.scalar`) of type `STRING` to the FROM_CHANGELOG definition in 
`BuiltInFunctionDefinitions`
 * Add input validation in `FromChangelogTypeStrategy` to check that the value is 
one of `FAIL`, `LOG`, `SKIP`
 * Implement the three modes in `FromChangelogFunction.eval()`:
 ** `FAIL`: throw a runtime exception with details about the unmapped code
 ** `LOG`: log a warning via `FunctionContext` and skip the row
 ** `SKIP`: current behavior (return without collecting)
 * Update documentation in `changelog.md`
 * Add semantic tests for each mode


>  Add `invalid_op_handling` parameter to FROM_CHANGELOG PTF
> ----------------------------------------------------------
>
>                 Key: FLINK-39430
>                 URL: https://issues.apache.org/jira/browse/FLINK-39430
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table SQL / API
>            Reporter: Ramin Gharib
>            Priority: Major
>
> Add an `invalid_op_handling` parameter to the `FROM_CHANGELOG` built-in 
> process table function as specified in FLIP-564.
> Currently, when FROM_CHANGELOG encounters an operation code in the input that 
> is not present in the `op_mapping`, the row is silently dropped. The 
> `invalid_op_handling` parameter would allow users to control this behavior 
> with three modes:
>  * `FAIL` — throw an exception when an unmapped operation code is encountered 
> (strict mode)
>  * `LOG` — log a warning and drop the row
>  * `SKIP` — silently drop the row (current default behavior)
> h3. Example
> {code:java}
> SELECT * FROM FROM_CHANGELOG(
>   input => TABLE cdc_stream PARTITION BY id,
>   op => DESCRIPTOR(__op),
>   op_mapping => MAP['c, r', 'INSERT', 'u', 'UPDATE_AFTER', 'd', 'DELETE'],
>   invalid_op_handling => 'FAIL'
> ){code}
>  
> h3. Scope
>  * Add `invalid_op_handling` as a new optional scalar argument 
> (`StaticArgument.scalar`) of type `STRING` to the FROM_CHANGELOG definition 
> in `BuiltInFunctionDefinitions`
>  * Add input validation in `FromChangelogTypeStrategy` to check that the value 
> is one of `FAIL`, `LOG`, `SKIP`
>  * Implement the three modes in `FromChangelogFunction.eval()`:
>  ** `FAIL`: throw a runtime exception with details about the unmapped code
>  ** `LOG`: log a warning via `FunctionContext` and skip the row
>  ** `SKIP`: current behavior (return without collecting)
>  * Update documentation in `changelog.md`
>  * Add semantic tests for each mode



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
