raminqaf commented on code in PR #27901:
URL: https://github.com/apache/flink/pull/27901#discussion_r3058546967
##########
docs/content/docs/sql/reference/queries/changelog.md:
##########
@@ -30,9 +30,108 @@ Flink SQL provides built-in process table functions (PTFs) for working with chan

 | Function | Description |
 |:---------|:------------|
+| [FROM_CHANGELOG](#from_changelog) | Converts an append-only table with operation codes into a dynamic table |
 | [TO_CHANGELOG](#to_changelog) | Converts a dynamic table into an append-only table with explicit operation codes |

-<!-- Placeholder for future FROM_CHANGELOG function -->
+## FROM_CHANGELOG
+
+The `FROM_CHANGELOG` PTF converts an append-only table with an explicit operation code column into a dynamic table (i.e. an updating table). Each input row is expected to have a string column that indicates the change operation. The op column is removed from the output and the row is emitted with the corresponding `RowKind`.
+
+This is useful when consuming Change Data Capture (CDC) streams from systems like Debezium, Maxwell, or Canal, where events arrive as flat append-only records with an explicit operation field.
+
+### Syntax
+
+```sql
+SELECT * FROM FROM_CHANGELOG(
+  input => TABLE source_table PARTITION BY key_col,
+  [op => DESCRIPTOR(op_column_name),]
+  [op_mapping => MAP['c, r', 'INSERT', 'u', 'UPDATE_AFTER', 'd', 'DELETE']]
+)
+```
+
+### Parameters
+
+| Parameter | Required | Description |
+|:-------------|:---------|:------------|
+| `input` | Yes | The input table. Must be append-only and include `PARTITION BY` for parallel execution. |
+| `op` | No | A `DESCRIPTOR` with a single column name for the operation code column. Defaults to `op`. The column must exist in the input table and be of type STRING. |
+| `op_mapping` | No | A `MAP<STRING, STRING>` mapping user-defined operation codes to change operation names. Keys are user codes (e.g., `'c'`, `'u'`, `'d'`), values are change operation names (`INSERT`, `UPDATE_AFTER`, `DELETE`). Keys can contain comma-separated codes to map multiple codes to the same operation (e.g., `'c, r'`). When provided, only mapped codes are forwarded - unmapped codes are dropped. Each change operation may appear at most once across all entries. |
+
+#### Default op_mapping
+
+When `op_mapping` is omitted, the following standard names are used:
+
+| Input code | Change operation |
+|:-----------------|:------------------|
+| `'INSERT'` | INSERT |
+| `'UPDATE_AFTER'` | UPDATE_AFTER |
+| `'DELETE'` | DELETE |
+
+### Output Schema
+
+The output columns are ordered as:
+
+```
+[partition_key_columns, remaining_columns_without_op]
+```
+
+The op column is removed from the output. Output rows carry the appropriate `RowKind` (INSERT, UPDATE_AFTER, or DELETE).
+
+### Examples
+
+#### Basic usage with standard op names
+
+```sql
+-- Input (append-only):
+-- +I[id:1, op:'INSERT', name:'Alice']
+-- +I[id:1, op:'UPDATE_AFTER', name:'Alice2']
+-- +I[id:2, op:'DELETE', name:'Bob']
+
+SELECT * FROM FROM_CHANGELOG(
+  input => TABLE cdc_stream PARTITION BY id
+)
+
+-- Output (upsert stream):

Review Comment:
   addressed

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
