raminqaf opened a new pull request, #27901:
URL: https://github.com/apache/flink/pull/27901
## What is the purpose of the change
Implement the FROM_CHANGELOG built-in process table function as specified in
FLIP-564, section 4.1.3.1 (append-only stream to upsert stream, flat mode).
FROM_CHANGELOG converts an append-only table with an explicit operation code
column (e.g., Debezium's `'c'`, `'u'`, `'d'`) into a dynamic table backed by a
Flink upsert stream (`{+I, +U, -D}`). This is the reverse of TO_CHANGELOG. The
implementation is stateless: each input record maps directly to at most one
output record with the appropriate RowKind (records whose op code has no entry
in `op_mapping` are dropped).
```sql
SELECT * FROM FROM_CHANGELOG(
  input => TABLE cdc_stream PARTITION BY id,
  op => DESCRIPTOR(__op),
  op_mapping => MAP['c, r', 'INSERT', 'u', 'UPDATE_AFTER', 'd', 'DELETE']
)
```
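The per-record behavior described above can be sketched in plain Java. This is not the PR's `FromChangelogFunction`. It avoids Flink dependencies, so the hypothetical `Kind` enum stands in for Flink's `RowKind`, a `List<Object>` stands in for `RowData`, and the op column is addressed by a hypothetical `opIndex` instead of being resolved from `DESCRIPTOR(__op)`. It also assumes that a key such as `'c, r'` lists several comma-separated op codes that share one RowKind. Finally, it copies the remaining fields, whereas the PR uses `ProjectedRowData` to avoid that copy.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Hypothetical, dependency-free sketch of FROM_CHANGELOG's per-record logic. */
final class FromChangelogSketch {

    /** Stand-in for Flink's RowKind, limited to the kinds an upsert stream emits. */
    enum Kind { INSERT, UPDATE_AFTER, DELETE }

    /** Output record: a change kind plus the remaining column values. */
    static final class ChangeRow {
        final Kind kind;
        final List<Object> fields;

        ChangeRow(Kind kind, List<Object> fields) {
            this.kind = kind;
            this.fields = fields;
        }
    }

    private final int opIndex; // hypothetical: position of the op column in the input row
    private final Map<String, Kind> codeToKind = new HashMap<>();

    FromChangelogSketch(int opIndex, Map<String, String> opMapping) {
        this.opIndex = opIndex;
        for (Map.Entry<String, String> entry : opMapping.entrySet()) {
            Kind kind = Kind.valueOf(entry.getValue());
            // Assumption: a key such as "c, r" lists several codes sharing one kind.
            for (String code : entry.getKey().split(",")) {
                codeToKind.put(code.trim(), kind);
            }
        }
    }

    /** Maps one input row to at most one output row; no state is kept between calls. */
    Optional<ChangeRow> map(List<Object> input) {
        Kind kind = codeToKind.get((String) input.get(opIndex));
        if (kind == null) {
            return Optional.empty(); // unmapped op codes are dropped
        }
        List<Object> projected = new ArrayList<>(input);
        projected.remove(opIndex); // the op column is not part of the output schema
        return Optional.of(new ChangeRow(kind, projected));
    }
}
```

Because `map` reads nothing but its own argument, each output depends only on the current input row, which is what makes the stateless design possible.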
## Brief change log
- Add optional `ChangelogFunction` delegate to `BuiltInFunctionDefinition`
so built-in PTFs can declare their output changelog mode
- Update `FlinkChangelogModeInferenceProgram` to handle the new delegate
- Add `FromChangelogTypeStrategy` with input validation (op column
existence, STRING type, op_mapping value validation) and output type inference
(removes op column from output)
- Add `FROM_CHANGELOG` built-in function definition with
`ChangelogMode.upsert(false)` output
- Add `FromChangelogFunction` runtime implementation using
`ProjectedRowData` for zero-copy projection
- Add `fromChangelog()` convenience method to `PartitionedTable` Table API
- Add documentation for FROM_CHANGELOG in changelog.md
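The input checks listed for `FromChangelogTypeStrategy` (and exercised by the unit tests below) can be sketched as two plain-Java methods. This is a hypothetical illustration, not the PR's type strategy, which works on Flink's call context and `DataType`s. Here a column-name-to-type-name map and the literal type name `"STRING"` are assumptions, and the `Kind` enum mirrors the four values of Flink's `RowKind`. `UPDATE_BEFORE` is rejected because the upsert output `{+I, +U, -D}` has no `-U`. Each RowKind may also appear only once as a value.

```java
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch of FROM_CHANGELOG's argument validation. */
final class FromChangelogValidationSketch {

    /** Mirrors the four values of Flink's RowKind enum. */
    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    /** Checks that the op column exists and is a string (type names are assumed). */
    static void validateOpColumn(Map<String, String> columnTypes, String opColumn) {
        String type = columnTypes.get(opColumn);
        if (type == null) {
            throw new IllegalArgumentException("op column '" + opColumn + "' not found");
        }
        if (!type.equals("STRING")) {
            throw new IllegalArgumentException("op column must be STRING but was " + type);
        }
    }

    /** Checks that every op_mapping value is a usable, non-repeated RowKind name. */
    static void validateOpMapping(Map<String, String> opMapping) {
        Set<Kind> seen = EnumSet.noneOf(Kind.class);
        for (String value : opMapping.values()) {
            Kind kind;
            try {
                kind = Kind.valueOf(value);
            } catch (IllegalArgumentException e) {
                throw new IllegalArgumentException("invalid RowKind: " + value);
            }
            if (kind == Kind.UPDATE_BEFORE) {
                throw new IllegalArgumentException("UPDATE_BEFORE is not allowed in upsert output");
            }
            if (!seen.add(kind)) {
                throw new IllegalArgumentException("duplicate RowKind: " + kind);
            }
        }
    }
}
```

With each RowKind limited to a single entry, several op codes that should produce the same kind presumably have to share one key, as `'c, r'` does in the SQL example.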
## Verifying this change
This change added tests and can be verified as follows:
- Added 7 unit tests for `FromChangelogTypeStrategy` input validation
(valid mapping, op column not found, wrong type, invalid descriptor, invalid
RowKind, UPDATE_BEFORE rejected, duplicate RowKind)
- Added 11 semantic tests covering: default op_mapping, Debezium-style
mapping, custom op column name, unmapped codes dropped, Table API convenience
method, and 6 error validation tests
- Added 2 plan tests verifying changelog mode propagation
(`changelogMode=[I,UA,D]` for PTF output, `changelogMode=[I]` for source input)
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with
`@Public(Evolving)`: yes (`PartitionedTable.fromChangelog()`)
- The serializers: no
- The runtime per-record code paths (performance sensitive): no
- Anything that affects deployment or recovery: JobManager (and its
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
- The S3 file system connector: no
## Documentation
- Does this pull request introduce a new feature? yes
- If yes, how is the feature documented? docs / JavaDocs