raminqaf opened a new pull request, #27901:
URL: https://github.com/apache/flink/pull/27901

   ## What is the purpose of the change
   
   Implement the FROM_CHANGELOG built-in process table function as specified in 
FLIP-564, section 4.1.3.1 (append-only stream to upsert stream, flat mode).
   
   FROM_CHANGELOG converts an append-only table with an explicit operation code 
column (e.g., Debezium's `'c'`, `'u'`, `'d'`) into a dynamic table backed by a 
Flink upsert stream (`{+I, +U, -D}`). This is the reverse of TO_CHANGELOG. The 
implementation is stateless: each input record maps directly to at most one 
output record with the appropriate RowKind, and records whose op code is not 
in `op_mapping` are dropped.
   
   ```sql
   SELECT * FROM FROM_CHANGELOG(
       input => TABLE cdc_stream PARTITION BY id,
       op => DESCRIPTOR(__op),
       op_mapping => MAP['c, r', 'INSERT', 'u', 'UPDATE_AFTER', 'd', 'DELETE']
   )
   ```
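   
   The per-record mapping described above can be sketched in plain Java. This 
   is only an illustration, not the code in this PR: `FromChangelogSketch`, 
   `Kind`, `Change`, `expand` and `convert` are hypothetical names, `Kind` is a 
   stand-in for Flink's `RowKind`, and the sketch copies the row where the PR 
   uses `ProjectedRowData` for zero-copy projection. It assumes that a key such 
   as `'c, r'` lists several op codes separated by commas.
   
   ```java
   import java.util.ArrayList;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;
   
   final class FromChangelogSketch {
   
       /** Stand-in for Flink's RowKind, limited to the kinds of the upsert output. */
       enum Kind { INSERT, UPDATE_AFTER, DELETE }
   
       /** One output record: the change kind plus the input fields without the op column. */
       static final class Change {
           final Kind kind;
           final List<Object> fields;
   
           Change(Kind kind, List<Object> fields) {
               this.kind = kind;
               this.fields = fields;
           }
       }
   
       /** Expands keys such as "c, r" so that every single op code points at its kind. */
       static Map<String, Kind> expand(Map<String, String> opMapping) {
           Map<String, Kind> byCode = new HashMap<>();
           for (Map.Entry<String, String> entry : opMapping.entrySet()) {
               Kind kind = Kind.valueOf(entry.getValue().trim());
               for (String code : entry.getKey().split(",")) {
                   byCode.put(code.trim(), kind);
               }
           }
           return byCode;
       }
   
       /** Stateless conversion of one row; returns null when the op code is unmapped. */
       static Change convert(List<Object> row, int opIndex, Map<String, Kind> byCode) {
           Kind kind = byCode.get(String.valueOf(row.get(opIndex)));
           if (kind == null) {
               return null; // unmapped codes are dropped
           }
           List<Object> fields = new ArrayList<>(row);
           fields.remove(opIndex); // the op column is not part of the output
           return new Change(kind, fields);
       }
   }
   ```
   
   No state is kept between calls to `convert`, which is what lets each input 
   record be handled independently of the ones before it.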
   
   ## Brief change log
   
     - Add optional `ChangelogFunction` delegate to `BuiltInFunctionDefinition` 
so built-in PTFs can declare their output changelog mode
     - Update `FlinkChangelogModeInferenceProgram` to handle the new delegate
     - Add `FromChangelogTypeStrategy` with input validation (op column 
existence, STRING type, op_mapping value validation) and output type inference 
(removes op column from output)
     - Add `FROM_CHANGELOG` built-in function definition with 
`ChangelogMode.upsert(false)` output
     - Add `FromChangelogFunction` runtime implementation using 
`ProjectedRowData` for zero-copy projection
     - Add `fromChangelog()` convenience method to the `PartitionedTable` Table API
     - Add documentation for FROM_CHANGELOG in changelog.md
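   
   The `op_mapping` value checks listed for `FromChangelogTypeStrategy` could 
   look roughly like the following plain-Java sketch. It is not the code in 
   this PR: `OpMappingValidationSketch`, `validate` and the error messages are 
   hypothetical, and the sketch assumes that "duplicate RowKind" means the same 
   RowKind appearing as the value of more than one mapping entry.
   
   ```java
   import java.util.Arrays;
   import java.util.HashSet;
   import java.util.Map;
   import java.util.Optional;
   import java.util.Set;
   
   final class OpMappingValidationSketch {
   
       /** RowKind names accepted as op_mapping values in this sketch. */
       static final Set<String> ALLOWED =
               new HashSet<>(Arrays.asList("INSERT", "UPDATE_AFTER", "DELETE"));
   
       /** Returns an error message for the first invalid entry, or empty if all are valid. */
       static Optional<String> validate(Map<String, String> opMapping) {
           Set<String> seen = new HashSet<>();
           for (Map.Entry<String, String> entry : opMapping.entrySet()) {
               String kind = entry.getValue().trim();
               if (kind.equals("UPDATE_BEFORE")) {
                   // Rejected explicitly: the upsert output carries no UPDATE_BEFORE rows.
                   return Optional.of("UPDATE_BEFORE is not supported: " + entry.getKey());
               }
               if (!ALLOWED.contains(kind)) {
                   return Optional.of("Invalid RowKind: " + kind);
               }
               if (!seen.add(kind)) {
                   return Optional.of("Duplicate RowKind: " + kind);
               }
           }
           return Optional.empty();
       }
   }
   ```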
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
     - Added 7 unit tests for `FromChangelogTypeStrategy` input validation 
(valid mapping, op column not found, wrong type, invalid descriptor, invalid 
RowKind, UPDATE_BEFORE rejected, duplicate RowKind)
     - Added 11 semantic tests covering: default op_mapping, Debezium-style 
mapping, custom op column name, unmapped codes dropped, Table API convenience 
method, and 6 error validation tests
     - Added 2 plan tests verifying changelog mode propagation 
(`changelogMode=[I,UA,D]` for PTF output, `changelogMode=[I]` for source input)
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): no
     - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: yes (`PartitionedTable.fromChangelog()`)
     - The serializers: no
     - The runtime per-record code paths (performance sensitive): no
     - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
     - The S3 file system connector: no
   
   ## Documentation
   
     - Does this pull request introduce a new feature? yes
     - If yes, how is the feature documented? docs / JavaDocs
   

