raminqaf commented on code in PR #27901:
URL: https://github.com/apache/flink/pull/27901#discussion_r3079981993
##########
docs/content/docs/sql/reference/queries/changelog.md:
##########
@@ -30,9 +30,117 @@ Flink SQL provides built-in process table functions (PTFs)
for working with chan
| Function | Description |
|:---------|:------------|
+| [FROM_CHANGELOG](#from_changelog) | Converts an append-only table with operation codes into a dynamic table |
| [TO_CHANGELOG](#to_changelog) | Converts a dynamic table into an append-only table with explicit operation codes |
-<!-- Placeholder for future FROM_CHANGELOG function -->
+## FROM_CHANGELOG
+
+The `FROM_CHANGELOG` PTF converts an append-only table with an explicit
operation code column into a dynamic table (i.e. an updating table). Each input
row is expected to have a string column that indicates the change operation.
The op column is removed from the output and the row is emitted with the
corresponding change operation.
+
+This is useful when consuming Change Data Capture (CDC) streams from systems
like Debezium, Maxwell, or Canal, where events arrive as flat append-only
records with an explicit operation field. It can also be combined with the
`TO_CHANGELOG` function to turn an append-only table back into an updating
table after applying a transformation to the events.
+
+### Syntax
+
+```sql
+SELECT * FROM FROM_CHANGELOG(
+ input => TABLE source_table [PARTITION BY key_col],
+ [op => DESCRIPTOR(op_column_name),]
+ [op_mapping => MAP['c, r', 'INSERT', 'u', 'UPDATE_AFTER', 'd', 'DELETE']]
+)
+```
+
+`PARTITION BY` is optional when the mapping includes `UPDATE_BEFORE` (retract
mode). It is required when the mapping produces upsert mode (no
`UPDATE_BEFORE`), because downstream operators need a key for updates and
deletes. When provided, records are distributed by the partition key for
parallel processing.
+
+### Parameters
+
+| Parameter | Required | Description |
+|:-------------|:---------|:------------|
+| `input` | Yes | The input table. Must be append-only. `PARTITION BY` is optional for retract mode (with `UPDATE_BEFORE`) and required for upsert mode (without `UPDATE_BEFORE`). |
+| `op` | No | A `DESCRIPTOR` with a single column name for the operation code column. Defaults to `op`. The column must exist in the input table and be of type STRING. |
+| `op_mapping` | No | A `MAP<STRING, STRING>` mapping user-defined operation codes to change operation names. Keys are user codes (e.g., `'c'`, `'u'`, `'d'`); values are change operation names (`INSERT`, `UPDATE_BEFORE`, `UPDATE_AFTER`, `DELETE`). Keys can contain comma-separated codes to map multiple codes to the same operation (e.g., `'c, r'`). When provided, only mapped codes are forwarded; unmapped codes are dropped. Each change operation may appear at most once across all entries. |
+
+#### Default op_mapping
+
+When `op_mapping` is omitted, the following standard names are used:
+
+| Input code | Change operation |
+|:-------------------|:------------------|
+| `'INSERT'` | INSERT |
+| `'UPDATE_BEFORE'` | UPDATE_BEFORE |
+| `'UPDATE_AFTER'` | UPDATE_AFTER |
+| `'DELETE'` | DELETE |
+
+### Output Schema
+
+The output columns are ordered as:
+
+```
+[all_input_columns_without_op]
+```
+
+The op column is removed from the output. Output rows carry the appropriate
change operation (INSERT, UPDATE_BEFORE, UPDATE_AFTER, or DELETE).
+
+### Examples
+
+#### Basic usage with standard op names
+
+```sql
+-- Input (append-only):
+-- +I[id:1, op:'INSERT', name:'Alice']
+-- +I[id:1, op:'UPDATE_BEFORE', name:'Alice']
+-- +I[id:1, op:'UPDATE_AFTER', name:'Alice2']
+-- +I[id:2, op:'DELETE', name:'Bob']
+
+SELECT * FROM FROM_CHANGELOG(
+ input => TABLE cdc_stream
+)
+
+-- Output (updating table):
+-- +I[id:1, name:'Alice']
+-- -U[id:1, name:'Alice']
+-- +U[id:1, name:'Alice2']
+-- -D[id:2, name:'Bob']
+
+-- Table state after all events:
+-- | id | name |
+-- |----|--------|
+-- | 1 | Alice2 |
+```
+
+#### Debezium-style CDC codes
+
+```sql
+SELECT * FROM FROM_CHANGELOG(
+ input => TABLE cdc_stream PARTITION BY id,
+ op => DESCRIPTOR(__op),
+ op_mapping => MAP['c, r', 'INSERT', 'u', 'UPDATE_AFTER', 'd', 'DELETE']
+)
+-- 'c' (create) and 'r' (read/snapshot) both produce INSERT
+-- 'u' produces UPDATE_AFTER
+-- 'd' produces DELETE
+```
+
+#### Custom operation column name
+
+```sql
+SELECT * FROM FROM_CHANGELOG(
+ input => TABLE cdc_stream,
+ op => DESCRIPTOR(operation)
+)
+-- The operation column named 'operation' is used instead of 'op'
+```
+
+#### Table API
+
+```java
+// Default (retract mode): reads 'op' column with standard change operation names
+Table result = cdcStream.fromChangelog();
+
+// Upsert mode requires PARTITION BY — use the generic process() method
+Table result = cdcStream.partitionBy($("id")).process("FROM_CHANGELOG",
Review Comment:
Removed `partitioneBy`