raminqaf opened a new pull request, #28235:
URL: https://github.com/apache/flink/pull/28235

   ## What is the purpose of the change
   
   Adds the built-in `TO_CHANGELOG` process table function (PTF) introduced in 
FLIP-564. The function converts a table (insert-only, retract, or upsert) into 
a changelog view that exposes each row's kind (`INSERT`, `UPDATE_BEFORE`, 
`UPDATE_AFTER`, `DELETE`) as an explicit `op` column.
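   
   A minimal Java sketch of the row-kind-to-`op` mapping described above, assuming the `op` column is prepended and holds the kind's name. `Kind`, `ChangeRow`, and `OpColumnSketch` are hypothetical stand-ins (not Flink's `RowKind` or the actual `ToChangelogFunction`), used only so the example runs without Flink on the classpath.
   
   ```java
   // Hypothetical stand-in for Flink's RowKind so the sketch runs without Flink.
   enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }
   
   // Hypothetical changelog record: a row kind plus positional field values.
   final class ChangeRow {
       final Kind kind;
       final Object[] fields;
   
       ChangeRow(Kind kind, Object[] fields) {
           this.kind = kind;
           this.fields = fields;
       }
   }
   
   final class OpColumnSketch {
       private OpColumnSketch() {}
   
       // Copies the row's fields behind a new leading "op" column holding the kind's name
       // (assumption: the real function may place or encode the op column differently).
       static Object[] withOpColumn(ChangeRow row) {
           Object[] out = new Object[row.fields.length + 1];
           out[0] = row.kind.name();
           System.arraycopy(row.fields, 0, out, 1, row.fields.length);
           return out;
       }
   }
   ```
   
   For example, an `UPDATE_AFTER` row `(1, "Bob")` becomes `("UPDATE_AFTER", 1, "Bob")` in this sketch.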
   
   A new `produces_full_deletes` boolean parameter controls how DELETE rows are 
emitted. When `false` (default), the function emits partial DELETEs: the 
upsert-key columns of the input table are preserved and the remaining columns 
are nulled. When `true`, full DELETE rows are passed through unchanged. Partial 
deletes match the contract of most upsert sinks and avoid forcing users to 
retain full pre-image state.
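   
   The DELETE handling controlled by `produces_full_deletes` can be sketched as follows. This is an illustration under assumptions, not the PR's code: rows are modeled as `Object[]`, the upsert key as an `int[]` of column indices, and `PartialDeleteSketch.deleteRow` is a hypothetical helper.
   
   ```java
   final class PartialDeleteSketch {
       private PartialDeleteSketch() {}
   
       // Builds the DELETE row to emit (hypothetical helper).
       // producesFullDeletes=true: the incoming row is passed through unchanged.
       // producesFullDeletes=false: only the upsert-key columns are kept; all others become null.
       static Object[] deleteRow(Object[] fullRow, int[] upsertKey, boolean producesFullDeletes) {
           if (producesFullDeletes) {
               return fullRow; // full DELETE: pass-through
           }
           Object[] partial = new Object[fullRow.length]; // every slot starts as null
           for (int idx : upsertKey) {
               partial[idx] = fullRow[idx]; // keep only the key columns
           }
           return partial;
       }
   }
   ```
   
   With upsert key `{0}`, a DELETE of `(42, "Alice", 30)` is emitted as `(42, NULL, NULL)` by default and unchanged when `produces_full_deletes=true`. The key columns are all an upsert sink needs to locate the row to remove.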
   
   ## Brief change log
   
   - Added `BuiltInFunctionDefinitions.TO_CHANGELOG` and the corresponding 
`ToChangelogFunction` runtime implementation in `flink-table-api-java`.
   - Added `StreamPhysicalToChangelog` / `StreamExecToChangelog` exec nodes 
wired through `FlinkRelMdChangelogMode` and `FlinkRelMdUpsertKeys` so the 
planner picks the correct downstream changelog mode.
   - Computed the input upsert key at planning time via 
`FlinkRelMetadataQuery.getUpsertKeys(input)`, collapsed it to a single 
candidate with `UpsertKeyUtil.smallestKey(...)`, and threaded it through the 
codegen path so the runtime knows which columns to keep on DELETE.
   - Implemented row-semantics and set-semantics variants; `PARTITION BY` over 
a set-semantics input forwards the partitioning columns as the upsert key.
   - Added compiled-plan JSON support for `StreamExecProcessTableFunction` (new 
`inputUpsertKeys` field with a per-input empty-array default for backward 
compatibility with older plans).
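   
   Two planning-side details from the list above, collapsing several upsert-key candidates into one and defaulting `inputUpsertKeys` when an older plan lacks the field, are sketched below. Both helpers are hypothetical; in particular, the sketch assumes "smallest" means fewest columns with ties broken by the lexicographically smaller sorted indices, which may differ from what `UpsertKeyUtil.smallestKey(...)` actually does.
   
   ```java
   import java.util.Arrays;
   import java.util.List;
   
   final class UpsertKeySketch {
       private UpsertKeySketch() {}
   
       // Collapses candidate upsert keys (arrays of column indices) to one deterministic choice.
       // Assumption: "smallest" = fewest columns, ties broken by lexicographically smaller sorted indices.
       static int[] smallestKey(List<int[]> candidates) {
           int[] best = null;
           for (int[] key : candidates) {
               int[] sorted = key.clone();
               Arrays.sort(sorted);
               if (best == null || compareKeys(sorted, best) < 0) {
                   best = sorted;
               }
           }
           // Sketch choice: no candidate means no known key, represented as an empty array.
           return best == null ? new int[0] : best;
       }
   
       private static int compareKeys(int[] a, int[] b) {
           if (a.length != b.length) {
               return Integer.compare(a.length, b.length);
           }
           for (int i = 0; i < a.length; i++) {
               if (a[i] != b[i]) {
                   return Integer.compare(a[i], b[i]);
               }
           }
           return 0;
       }
   
       // Default for a hypothetical parsed "inputUpsertKeys" field: plans compiled before the
       // field existed yield null, so every input falls back to an empty key.
       static int[][] inputUpsertKeysOrDefault(int[][] parsedField, int numInputs) {
           return parsedField != null ? parsedField : new int[numInputs][0];
       }
   }
   ```
   
   A deterministic tie-break presumably keeps the projected DELETE columns stable across planning runs, while the empty-array default lets older plans deserialize as "no known upsert key" for every input.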
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
   - New unit tests for the planner pieces in `ToChangelogTest`.
   - New end-to-end semantic tests in `ToChangelogSemanticTests` covering: 
insert-only inputs, retract inputs with a derivable upsert key (partial 
DELETE), retract inputs with `produces_full_deletes=true` (full DELETE 
pass-through), `PARTITION BY` over a non-leading column, single-column upsert 
key from the input PK, and row semantics where the upsert key comes from the PK 
constraint rather than `PARTITION BY`.
   - New restore test in `ToChangelogRestoreTest` validates the compiled-plan + 
savepoint round trip for a retract source.
   - Validation tests for misuse: missing `input` argument, wrong argument 
type, conflicting `PARTITION BY` and upsert key.
   - Run with: `mvn -pl flink-table/flink-table-planner test -Dtest='ToChangelogTest,ToChangelogSemanticTests,ToChangelogRestoreTest'`. 
Result: 50 tests passed, 1 skipped (existing savepoint stub).
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): no
     - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: yes (new `BuiltInFunctionDefinitions.TO_CHANGELOG`; new 
compiled-plan JSON field on `StreamExecProcessTableFunction` with a back-compat 
default)
     - The serializers: no
     - The runtime per-record code paths (performance sensitive): yes (the 
`ToChangelogFunction` operator runs per record, but it only writes the row kind 
into the `op` column and, for partial DELETEs, projects the upsert-key columns; 
it keeps no extra state)
     - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
     - The S3 file system connector: no
   
   ## Documentation
   
     - Does this pull request introduce a new feature? yes
     - If yes, how is the feature documented? docs + JavaDocs (built-in 
function docs page entry plus `BuiltInFunctionDefinitions.TO_CHANGELOG` JavaDoc)
   
   ---
   
   ##### Was generative AI tooling used to co-author this PR?
   
   - [X] Yes
   Generated-by: Claude Opus 4.7
   


-- 
This is an automated message from the Apache Git Service.