twalthr commented on code in PR #27901:
URL: https://github.com/apache/flink/pull/27901#discussion_r3077878094


##########
docs/content/docs/sql/reference/queries/changelog.md:
##########
@@ -30,9 +30,117 @@ Flink SQL provides built-in process table functions (PTFs) 
for working with chan
 
 | Function | Description |
 |:---------|:------------|
+| [FROM_CHANGELOG](#from_changelog) | Converts an append-only table with 
operation codes into a dynamic table |
 | [TO_CHANGELOG](#to_changelog) | Converts a dynamic table into an append-only 
table with explicit operation codes |
 
-<!-- Placeholder for future FROM_CHANGELOG function -->
+## FROM_CHANGELOG
+
+The `FROM_CHANGELOG` PTF converts an append-only table with an explicit 
operation code column into a dynamic table (i.e. an updating table). Each input 
row is expected to have a string column that indicates the change operation. 
The op column is removed from the output and the row is emitted with the 
corresponding change operation.

Review Comment:
   An append-only table is also "dynamic", so "i.e. an updating table" is misleading here.
   ```suggestion
   The `FROM_CHANGELOG` PTF converts an append-only table with an explicit 
operation code column into a (potentially updating) dynamic table. Each input 
row is expected to have a string column that indicates the change operation. 
The operation column is interpreted by the engine and removed from the output.
   ```



##########
docs/content/docs/sql/reference/queries/changelog.md:
##########
@@ -30,9 +30,117 @@ Flink SQL provides built-in process table functions (PTFs) 
for working with chan
 
 | Function | Description |
 |:---------|:------------|
+| [FROM_CHANGELOG](#from_changelog) | Converts an append-only table with 
operation codes into a dynamic table |
 | [TO_CHANGELOG](#to_changelog) | Converts a dynamic table into an append-only 
table with explicit operation codes |
 
-<!-- Placeholder for future FROM_CHANGELOG function -->
+## FROM_CHANGELOG
+
+The `FROM_CHANGELOG` PTF converts an append-only table with an explicit 
operation code column into a dynamic table (i.e. an updating table). Each input 
row is expected to have a string column that indicates the change operation. 
The op column is removed from the output and the row is emitted with the 
corresponding change operation.
+
+This is useful when consuming Change Data Capture (CDC) streams from systems 
like Debezium, Maxwell, or Canal, where events arrive as flat append-only 
records with an explicit operation field. It's also useful to be used in 
combination with the TO_CHANGELOG function, when the user wants to turn the 
append-only table back into an updating table after doing some specific 
transformation to the events.

Review Comment:
   ```suggestion
   This is useful when consuming Change Data Capture (CDC) streams from systems 
like Debezium, where events arrive as flat append-only records with an explicit 
operation field. It is also useful in combination with the `TO_CHANGELOG` 
function, to convert the append-only table back into an updating table after 
applying transformations to the events.
   ```



##########
docs/content/docs/sql/reference/queries/changelog.md:
##########
@@ -30,9 +30,117 @@ Flink SQL provides built-in process table functions (PTFs) 
for working with chan
 
 | Function | Description |
 |:---------|:------------|
+| [FROM_CHANGELOG](#from_changelog) | Converts an append-only table with 
operation codes into a dynamic table |
 | [TO_CHANGELOG](#to_changelog) | Converts a dynamic table into an append-only 
table with explicit operation codes |
 
-<!-- Placeholder for future FROM_CHANGELOG function -->
+## FROM_CHANGELOG
+
+The `FROM_CHANGELOG` PTF converts an append-only table with an explicit 
operation code column into a dynamic table (i.e. an updating table). Each input 
row is expected to have a string column that indicates the change operation. 
The op column is removed from the output and the row is emitted with the 
corresponding change operation.
+
+This is useful when consuming Change Data Capture (CDC) streams from systems 
like Debezium, Maxwell, or Canal, where events arrive as flat append-only 
records with an explicit operation field. It's also useful to be used in 
combination with the TO_CHANGELOG function, when the user wants to turn the 
append-only table back into an updating table after doing some specific 
transformation to the events.
+
+### Syntax
+
+```sql
+SELECT * FROM FROM_CHANGELOG(
+  input => TABLE source_table [PARTITION BY key_col],
+  [op => DESCRIPTOR(op_column_name),]
+  [op_mapping => MAP['c, r', 'INSERT', 'u', 'UPDATE_AFTER', 'd', 'DELETE']]
+)
+```
+
+`PARTITION BY` is optional when the mapping includes `UPDATE_BEFORE` (retract 
mode). It is required when the mapping produces upsert mode (no 
`UPDATE_BEFORE`), because downstream operators need a key for updates and 
deletes. When provided, records are distributed by the partition key for 
parallel processing.
+
+### Parameters
+
+| Parameter    | Required | Description |
+|:-------------|:---------|:------------|
+| `input`      | Yes      | The input table. Must be append-only. `PARTITION 
BY` is optional for retract mode (with `UPDATE_BEFORE`) and required for upsert 
mode (without `UPDATE_BEFORE`). |
+| `op`         | No       | A `DESCRIPTOR` with a single column name for the 
operation code column. Defaults to `op`. The column must exist in the input 
table and be of type STRING. |
+| `op_mapping` | No       | A `MAP<STRING, STRING>` mapping user-defined 
operation codes to change operation names. Keys are user codes (e.g., `'c'`, 
`'u'`, `'d'`), values are change operation names (`INSERT`, `UPDATE_BEFORE`, 
`UPDATE_AFTER`, `DELETE`). Keys can contain comma-separated codes to map 
multiple codes to the same operation (e.g., `'c, r'`). When provided, only 
mapped codes are forwarded - unmapped codes are dropped. Each change operation 
may appear at most once across all entries. |
+
+#### Default op_mapping
+
+When `op_mapping` is omitted, the following standard names are used:
+
+| Input code         | Change operation  |
+|:-------------------|:------------------|
+| `'INSERT'`         | INSERT            |
+| `'UPDATE_BEFORE'`  | UPDATE_BEFORE     |
+| `'UPDATE_AFTER'`   | UPDATE_AFTER      |
+| `'DELETE'`         | DELETE            |
+
+### Output Schema
+
+The output columns are ordered as:
+
+```
+[all_input_columns_without_op]
+```
+
+The op column is removed from the output. Output rows carry the appropriate 
change operation (INSERT, UPDATE_BEFORE, UPDATE_AFTER, or DELETE).

Review Comment:
   ```suggestion
   Flink's SQL engine interprets the op column as the change operation of each 
row (INSERT, UPDATE_BEFORE, UPDATE_AFTER, or DELETE). The op column is removed 
from the output.
   ```



##########
docs/content/docs/sql/reference/queries/changelog.md:
##########
@@ -30,9 +30,117 @@ Flink SQL provides built-in process table functions (PTFs) 
for working with chan
 
 | Function | Description |
 |:---------|:------------|
+| [FROM_CHANGELOG](#from_changelog) | Converts an append-only table with 
operation codes into a dynamic table |
 | [TO_CHANGELOG](#to_changelog) | Converts a dynamic table into an append-only 
table with explicit operation codes |
 
-<!-- Placeholder for future FROM_CHANGELOG function -->
+## FROM_CHANGELOG
+
+The `FROM_CHANGELOG` PTF converts an append-only table with an explicit 
operation code column into a dynamic table (i.e. an updating table). Each input 
row is expected to have a string column that indicates the change operation. 
The op column is removed from the output and the row is emitted with the 
corresponding change operation.
+
+This is useful when consuming Change Data Capture (CDC) streams from systems 
like Debezium, Maxwell, or Canal, where events arrive as flat append-only 
records with an explicit operation field. It's also useful to be used in 
combination with the TO_CHANGELOG function, when the user wants to turn the 
append-only table back into an updating table after doing some specific 
transformation to the events.
+
+### Syntax
+
+```sql
+SELECT * FROM FROM_CHANGELOG(
+  input => TABLE source_table [PARTITION BY key_col],
+  [op => DESCRIPTOR(op_column_name),]
+  [op_mapping => MAP['c, r', 'INSERT', 'u', 'UPDATE_AFTER', 'd', 'DELETE']]
+)
+```
+
+`PARTITION BY` is optional when the mapping includes `UPDATE_BEFORE` (retract 
mode). It is required when the mapping produces upsert mode (no 
`UPDATE_BEFORE`), because downstream operators need a key for updates and 
deletes. When provided, records are distributed by the partition key for 
parallel processing.
+
+### Parameters
+
+| Parameter    | Required | Description |
+|:-------------|:---------|:------------|
+| `input`      | Yes      | The input table. Must be append-only. `PARTITION 
BY` is optional for retract mode (with `UPDATE_BEFORE`) and required for upsert 
mode (without `UPDATE_BEFORE`). |
+| `op`         | No       | A `DESCRIPTOR` with a single column name for the 
operation code column. Defaults to `op`. The column must exist in the input 
table and be of type STRING. |
+| `op_mapping` | No       | A `MAP<STRING, STRING>` mapping user-defined 
operation codes to change operation names. Keys are user codes (e.g., `'c'`, 
`'u'`, `'d'`), values are change operation names (`INSERT`, `UPDATE_BEFORE`, 
`UPDATE_AFTER`, `DELETE`). Keys can contain comma-separated codes to map 
multiple codes to the same operation (e.g., `'c, r'`). When provided, only 
mapped codes are forwarded - unmapped codes are dropped. Each change operation 
may appear at most once across all entries. |
+
+#### Default op_mapping
+
+When `op_mapping` is omitted, the following standard names are used:

Review Comment:
   ```suggestion
   When `op_mapping` is omitted, the following standard names are used. They 
allow a reverse conversion from TO_CHANGELOG by default.
   ```



##########
docs/content/docs/sql/reference/queries/changelog.md:
##########
@@ -30,9 +30,117 @@ Flink SQL provides built-in process table functions (PTFs) 
for working with chan
 
 | Function | Description |
 |:---------|:------------|
+| [FROM_CHANGELOG](#from_changelog) | Converts an append-only table with 
operation codes into a dynamic table |
 | [TO_CHANGELOG](#to_changelog) | Converts a dynamic table into an append-only 
table with explicit operation codes |
 
-<!-- Placeholder for future FROM_CHANGELOG function -->
+## FROM_CHANGELOG
+
+The `FROM_CHANGELOG` PTF converts an append-only table with an explicit 
operation code column into a dynamic table (i.e. an updating table). Each input 
row is expected to have a string column that indicates the change operation. 
The op column is removed from the output and the row is emitted with the 
corresponding change operation.
+
+This is useful when consuming Change Data Capture (CDC) streams from systems 
like Debezium, Maxwell, or Canal, where events arrive as flat append-only 
records with an explicit operation field. It's also useful to be used in 
combination with the TO_CHANGELOG function, when the user wants to turn the 
append-only table back into an updating table after doing some specific 
transformation to the events.
+
+### Syntax
+
+```sql
+SELECT * FROM FROM_CHANGELOG(
+  input => TABLE source_table [PARTITION BY key_col],
+  [op => DESCRIPTOR(op_column_name),]
+  [op_mapping => MAP['c, r', 'INSERT', 'u', 'UPDATE_AFTER', 'd', 'DELETE']]
+)
+```
+
+`PARTITION BY` is optional when the mapping includes `UPDATE_BEFORE` (retract 
mode). It is required when the mapping produces upsert mode (no 
`UPDATE_BEFORE`), because downstream operators need a key for updates and 
deletes. When provided, records are distributed by the partition key for 
parallel processing.
+
+### Parameters
+
+| Parameter    | Required | Description |
+|:-------------|:---------|:------------|
+| `input`      | Yes      | The input table. Must be append-only. `PARTITION 
BY` is optional for retract mode (with `UPDATE_BEFORE`) and required for upsert 
mode (without `UPDATE_BEFORE`). |
+| `op`         | No       | A `DESCRIPTOR` with a single column name for the 
operation code column. Defaults to `op`. The column must exist in the input 
table and be of type STRING. |
+| `op_mapping` | No       | A `MAP<STRING, STRING>` mapping user-defined 
operation codes to change operation names. Keys are user codes (e.g., `'c'`, 
`'u'`, `'d'`), values are change operation names (`INSERT`, `UPDATE_BEFORE`, 
`UPDATE_AFTER`, `DELETE`). Keys can contain comma-separated codes to map 
multiple codes to the same operation (e.g., `'c, r'`). When provided, only 
mapped codes are forwarded - unmapped codes are dropped. Each change operation 
may appear at most once across all entries. |
+
+#### Default op_mapping
+
+When `op_mapping` is omitted, the following standard names are used:
+
+| Input code         | Change operation  |
+|:-------------------|:------------------|
+| `'INSERT'`         | INSERT            |
+| `'UPDATE_BEFORE'`  | UPDATE_BEFORE     |
+| `'UPDATE_AFTER'`   | UPDATE_AFTER      |
+| `'DELETE'`         | DELETE            |
+
+### Output Schema
+
+The output columns are ordered as:
+
+```
+[all_input_columns_without_op]
+```
+
+The op column is removed from the output. Output rows carry the appropriate 
change operation (INSERT, UPDATE_BEFORE, UPDATE_AFTER, or DELETE).
+
+### Examples
+
+#### Basic usage with standard op names
+
+```sql
+-- Input (append-only):
+-- +I[id:1, op:'INSERT',        name:'Alice']
+-- +I[id:1, op:'UPDATE_BEFORE', name:'Alice']
+-- +I[id:1, op:'UPDATE_AFTER',  name:'Alice2']
+-- +I[id:2, op:'DELETE',        name:'Bob']
+
+SELECT * FROM FROM_CHANGELOG(
+  input => TABLE cdc_stream
+)
+
+-- Output (updating table):
+-- +I[id:1, name:'Alice']
+-- -U[id:1, name:'Alice']
+-- +U[id:1, name:'Alice2']
+-- -D[id:2, name:'Bob']
+
+-- Table state after all events:
+-- | id | name   |
+-- |----|--------|
+-- | 1  | Alice2 |
+```
+
+#### Debezium-style CDC codes
+
+```sql
+SELECT * FROM FROM_CHANGELOG(
+  input => TABLE cdc_stream PARTITION BY id,
+  op => DESCRIPTOR(__op),
+  op_mapping => MAP['c, r', 'INSERT', 'u', 'UPDATE_AFTER', 'd', 'DELETE']
+)
+-- 'c' (create) and 'r' (read/snapshot) both produce INSERT
+-- 'u' produces UPDATE_AFTER
+-- 'd' produces DELETE
+```
+
+#### Custom operation column name
+
+```sql
+SELECT * FROM FROM_CHANGELOG(
+  input => TABLE cdc_stream,
+  op => DESCRIPTOR(operation)
+)
+-- The operation column named 'operation' is used instead of 'op'

Review Comment:
   Add an example schema for `cdc_stream` to this example.



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/FromChangelogTypeStrategy.java:
##########
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.inference.strategies;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.DataTypes.Field;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.functions.ChangelogFunction.ChangelogContext;
+import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.functions.TableSemantics;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.inference.ArgumentCount;
+import org.apache.flink.table.types.inference.CallContext;
+import org.apache.flink.table.types.inference.ConstantArgumentCount;
+import org.apache.flink.table.types.inference.InputTypeStrategy;
+import org.apache.flink.table.types.inference.Signature;
+import org.apache.flink.table.types.inference.Signature.Argument;
+import org.apache.flink.table.types.inference.TypeStrategy;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.types.ColumnList;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/** Type strategies for the {@code FROM_CHANGELOG} process table function. */
+@Internal
+public final class FromChangelogTypeStrategy {
+
+    private static final String DEFAULT_OP_COLUMN_NAME = "op";
+
+    private static final Set<String> VALID_ROW_KIND_NAMES =
+            Set.of("INSERT", "UPDATE_BEFORE", "UPDATE_AFTER", "DELETE");
+
+    // 
--------------------------------------------------------------------------------------------
+    // Input validation
+    // 
--------------------------------------------------------------------------------------------
+
+    public static final InputTypeStrategy INPUT_TYPE_STRATEGY =
+            new InputTypeStrategy() {
+                @Override
+                public ArgumentCount getArgumentCount() {
+                    return ConstantArgumentCount.between(1, 3);
+                }
+
+                @Override
+                public Optional<List<DataType>> inferInputTypes(
+                        final CallContext callContext, final boolean 
throwOnFailure) {
+                    return validateInputs(callContext, throwOnFailure);
+                }
+
+                @Override
+                public List<Signature> getExpectedSignatures(final 
FunctionDefinition definition) {
+                    return List.of(
+                            Signature.of(Argument.of("input", "TABLE")),
+                            Signature.of(
+                                    Argument.of("input", "TABLE"), 
Argument.of("op", "DESCRIPTOR")),
+                            Signature.of(
+                                    Argument.of("input", "TABLE"),
+                                    Argument.of("op", "DESCRIPTOR"),
+                                    Argument.of("op_mapping", "MAP<STRING, 
STRING>")));
+                }

Review Comment:
   This should not be necessary? If static args are used, the InputTypeStrategy 
only serves as a validation layer and can be simplified. Feel free to introduce 
a `ValidationOnlyInputTypeStrategy` for all new PTFs.



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/ChangelogFunction.java:
##########
@@ -105,5 +109,14 @@ interface ChangelogContext {
          * are required and {@link ChangelogMode#keyOnlyDeletes()} are 
supported.
          */
         ChangelogMode getRequiredChangelogMode();
+
+        /**
+         * Returns the value of a scalar argument at the given position.

Review Comment:
   Copy the description of 
`org.apache.flink.table.types.inference.CallContext#getArgumentValue`. I'm fine 
with skipping the `isArgumentLiteral` and `isArgumentNull` methods for now to 
keep the interface simple.



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/FromChangelogTypeStrategy.java:
##########
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.inference.strategies;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.DataTypes.Field;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.functions.ChangelogFunction.ChangelogContext;
+import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.functions.TableSemantics;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.inference.ArgumentCount;
+import org.apache.flink.table.types.inference.CallContext;
+import org.apache.flink.table.types.inference.ConstantArgumentCount;
+import org.apache.flink.table.types.inference.InputTypeStrategy;
+import org.apache.flink.table.types.inference.Signature;
+import org.apache.flink.table.types.inference.Signature.Argument;
+import org.apache.flink.table.types.inference.TypeStrategy;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.types.ColumnList;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/** Type strategies for the {@code FROM_CHANGELOG} process table function. */
+@Internal
+public final class FromChangelogTypeStrategy {
+
+    private static final String DEFAULT_OP_COLUMN_NAME = "op";
+
+    private static final Set<String> VALID_ROW_KIND_NAMES =
+            Set.of("INSERT", "UPDATE_BEFORE", "UPDATE_AFTER", "DELETE");
+
+    // 
--------------------------------------------------------------------------------------------
+    // Input validation
+    // 
--------------------------------------------------------------------------------------------
+
+    public static final InputTypeStrategy INPUT_TYPE_STRATEGY =
+            new InputTypeStrategy() {
+                @Override
+                public ArgumentCount getArgumentCount() {
+                    return ConstantArgumentCount.between(1, 3);
+                }
+
+                @Override
+                public Optional<List<DataType>> inferInputTypes(
+                        final CallContext callContext, final boolean 
throwOnFailure) {
+                    return validateInputs(callContext, throwOnFailure);
+                }
+
+                @Override
+                public List<Signature> getExpectedSignatures(final 
FunctionDefinition definition) {
+                    return List.of(
+                            Signature.of(Argument.of("input", "TABLE")),
+                            Signature.of(
+                                    Argument.of("input", "TABLE"), 
Argument.of("op", "DESCRIPTOR")),
+                            Signature.of(
+                                    Argument.of("input", "TABLE"),
+                                    Argument.of("op", "DESCRIPTOR"),
+                                    Argument.of("op_mapping", "MAP<STRING, 
STRING>")));
+                }
+            };
+
+    // 
--------------------------------------------------------------------------------------------
+    // Output type inference
+    // 
--------------------------------------------------------------------------------------------
+
+    public static final TypeStrategy OUTPUT_TYPE_STRATEGY =
+            callContext -> {
+                final TableSemantics tableSemantics =
+                        callContext
+                                .getTableSemantics(0)
+                                .orElseThrow(
+                                        () ->
+                                                new ValidationException(
+                                                        "First argument must 
be a table for FROM_CHANGELOG."));
+
+                final String opColumnName = resolveOpColumnName(callContext);
+
+                final List<Field> outputFields = 
buildOutputFields(tableSemantics, opColumnName);
+
+                return Optional.of(DataTypes.ROW(outputFields).notNull());
+            };
+
+    // 
--------------------------------------------------------------------------------------------
+    // Helpers
+    // 
--------------------------------------------------------------------------------------------
+
+    @SuppressWarnings("rawtypes")
+    private static Optional<List<DataType>> validateInputs(
+            final CallContext callContext, final boolean throwOnFailure) {
+        final boolean isMissingTableArg = 
callContext.getTableSemantics(0).isEmpty();
+        if (isMissingTableArg) {
+            return callContext.fail(
+                    throwOnFailure, "First argument must be a table for 
FROM_CHANGELOG.");
+        }
+
+        final Optional<ColumnList> opDescriptor = 
callContext.getArgumentValue(1, ColumnList.class);
+        final boolean hasInvalidOpDescriptor =
+                opDescriptor.isPresent() && 
opDescriptor.get().getNames().size() != 1;
+        if (hasInvalidOpDescriptor) {
+            return callContext.fail(
+                    throwOnFailure,
+                    "The descriptor for argument 'op' must contain exactly one 
column name.");
+        }
+
+        // Validate that the op column exists in the input schema and is of 
STRING type
+        final TableSemantics tableSemantics = 
callContext.getTableSemantics(0).get();
+        final String opColumnName = resolveOpColumnName(callContext);
+        final List<Field> inputFields = 
DataType.getFields(tableSemantics.dataType());
+        final Optional<Field> opField =
+                inputFields.stream().filter(f -> 
f.getName().equals(opColumnName)).findFirst();
+        if (opField.isEmpty()) {
+            return callContext.fail(
+                    throwOnFailure,
+                    String.format(
+                            "The op column '%s' does not exist in the input 
schema.",
+                            opColumnName));
+        }
+        if 
(!opField.get().getDataType().getLogicalType().is(LogicalTypeFamily.CHARACTER_STRING))
 {
+            return callContext.fail(
+                    throwOnFailure,
+                    String.format(
+                            "The op column '%s' must be of STRING type, but 
was '%s'.",
+                            opColumnName, 
opField.get().getDataType().getLogicalType()));
+        }
+
+        final boolean hasMappingArgProvided = !callContext.isArgumentNull(2);
+        final boolean isMappingArgLiteral = callContext.isArgumentLiteral(2);
+        if (hasMappingArgProvided && !isMappingArgLiteral) {
+            return callContext.fail(
+                    throwOnFailure, "The 'op_mapping' argument must be a 
constant MAP literal.");
+        }
+
+        final Optional<Map> opMapping = callContext.getArgumentValue(2, 
Map.class);
+        if (opMapping.isPresent()) {
+            final Optional<List<DataType>> validationError =
+                    validateOpMappingValues(callContext, opMapping.get(), 
throwOnFailure);
+            if (validationError.isPresent()) {
+                return validationError;
+            }
+        }
+
+        // Upsert mode (no UPDATE_BEFORE in mapping) requires PARTITION BY for 
key-based routing
+        final boolean isUpsertMode =
+                opMapping.isPresent() && 
!containsUpdateBefore(opMapping.get());
+        final boolean hasPartitionBy = 
tableSemantics.partitionByColumns().length > 0;
+        if (isUpsertMode && !hasPartitionBy) {
+            throw new ValidationException(
+                    "Upsert changelog mode (without UPDATE_BEFORE) requires a 
PARTITION BY clause. "
+                            + "Either add PARTITION BY or include 
UPDATE_BEFORE in the op_mapping.");
+        }
+
+        return Optional.of(callContext.getArgumentDataTypes());
+    }
+
+    /**
+     * Validates op_mapping values. Values must be valid RowKind names from 
{INSERT, UPDATE_AFTER,
+     * DELETE}. Keys are arbitrary user strings (e.g., 'c', 'u', 'd') and may 
be comma-separated to
+     * map multiple user codes to the same RowKind. Each RowKind name must 
appear at most once
+     * across all entries.
+     */
+    private static Optional<List<DataType>> validateOpMappingValues(
+            final CallContext callContext,
+            final Map<?, ?> opMapping,

Review Comment:
   `opMapping` should already be a `Map<String, String>`. This is guaranteed by 
the call context, so the code can be simplified here.
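
   A minimal sketch of what the value validation could look like once the 
mapping is typed as `Map<String, String>`. The class and method names are 
hypothetical, error reporting is reduced to an `Optional<String>` instead of 
`callContext.fail(...)`, and the key checks are an assumption, not something 
the PR is known to do:

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

// Hypothetical, self-contained stand-in for the validation helper in the PR.
final class OpMappingValidationSketch {

    private static final Set<String> VALID_ROW_KIND_NAMES =
            Set.of("INSERT", "UPDATE_BEFORE", "UPDATE_AFTER", "DELETE");

    /** Returns an error message, or an empty Optional if the mapping is valid. */
    public static Optional<String> validateOpMapping(final Map<String, String> opMapping) {
        final Set<String> seenRowKinds = new HashSet<>();
        final Set<String> seenCodes = new HashSet<>();

        for (final Map.Entry<String, String> entry : opMapping.entrySet()) {
            final String rowKind = entry.getValue();
            // Null check first: Set.of(...).contains(null) throws a NullPointerException.
            if (rowKind == null || !VALID_ROW_KIND_NAMES.contains(rowKind)) {
                return Optional.of("Invalid change operation '" + rowKind + "' in op_mapping.");
            }
            // Each change operation may appear at most once across all entries.
            if (!seenRowKinds.add(rowKind)) {
                return Optional.of(
                        "Change operation '" + rowKind + "' appears more than once in op_mapping.");
            }

            // Assumption: keys such as 'c, r' are split on commas and trimmed,
            // and empty or repeated codes are rejected.
            final String key = entry.getKey();
            if (key == null) {
                return Optional.of("Operation codes in op_mapping must not be null.");
            }
            for (final String code : key.split(",")) {
                final String trimmed = code.trim();
                if (trimmed.isEmpty() || !seenCodes.add(trimmed)) {
                    return Optional.of(
                            "Empty or duplicate operation code '" + trimmed + "' in op_mapping.");
                }
            }
        }
        return Optional.empty();
    }
}
```

   With typed entries there is no need for `Map<?, ?>`, `instanceof` checks, or 
casts. For example, 
`validateOpMapping(Map.of("c, r", "INSERT", "u", "UPDATE_AFTER", "d", "DELETE"))` 
returns an empty `Optional`, while a mapping that uses `INSERT` twice returns an 
error message.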



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinition.java:
##########
@@ -52,7 +54,7 @@
  * <p>Equality is defined by reference equality.
  */
 @Internal
-public final class BuiltInFunctionDefinition implements SpecializedFunction {
+public final class BuiltInFunctionDefinition implements SpecializedFunction, 
ChangelogFunction {

Review Comment:
   If possible, I would like to avoid adding `ChangelogFunction` to 
`BuiltInFunctionDefinition` and solve this entirely in the planner, not in the 
common module. A `SpecializedFunction` can be specialized multiple times if 
necessary, but the location where `ChangelogFunction` is used is already 
pretty much final for specialization.



##########
docs/content/docs/sql/reference/queries/changelog.md:
##########
@@ -30,9 +30,117 @@ Flink SQL provides built-in process table functions (PTFs) 
for working with chan
 
 | Function | Description |
 |:---------|:------------|
+| [FROM_CHANGELOG](#from_changelog) | Converts an append-only table with 
operation codes into a dynamic table |

Review Comment:
   ```suggestion
   | [FROM_CHANGELOG](#from_changelog) | Converts an append-only table with 
operation codes into a (potentially updating) dynamic table |
   ```



##########
docs/content/docs/sql/reference/queries/changelog.md:
##########
@@ -30,9 +30,117 @@ Flink SQL provides built-in process table functions (PTFs) 
for working with chan
 
 | Function | Description |
 |:---------|:------------|
+| [FROM_CHANGELOG](#from_changelog) | Converts an append-only table with 
operation codes into a dynamic table |
 | [TO_CHANGELOG](#to_changelog) | Converts a dynamic table into an append-only 
table with explicit operation codes |
 
-<!-- Placeholder for future FROM_CHANGELOG function -->
+## FROM_CHANGELOG
+
+The `FROM_CHANGELOG` PTF converts an append-only table with an explicit 
operation code column into a dynamic table (i.e. an updating table). Each input 
row is expected to have a string column that indicates the change operation. 
The op column is removed from the output and the row is emitted with the 
corresponding change operation.
+
+This is useful when consuming Change Data Capture (CDC) streams from systems 
like Debezium, Maxwell, or Canal, where events arrive as flat append-only 
records with an explicit operation field. It's also useful to be used in 
combination with the TO_CHANGELOG function, when the user wants to turn the 
append-only table back into an updating table after doing some specific 
transformation to the events.
+
+### Syntax
+
+```sql
+SELECT * FROM FROM_CHANGELOG(
+  input => TABLE source_table [PARTITION BY key_col],
+  [op => DESCRIPTOR(op_column_name),]
+  [op_mapping => MAP['c, r', 'INSERT', 'u', 'UPDATE_AFTER', 'd', 'DELETE']]
+)
+```
+
+`PARTITION BY` is optional when the mapping includes `UPDATE_BEFORE` (retract 
mode). It is required when the mapping produces upsert mode (no 
`UPDATE_BEFORE`), because downstream operators need a key for updates and 
deletes. When provided, records are distributed by the partition key for 
parallel processing.
+
+### Parameters
+
+| Parameter    | Required | Description |
+|:-------------|:---------|:------------|
+| `input`      | Yes      | The input table. Must be append-only. `PARTITION 
BY` is optional for retract mode (with `UPDATE_BEFORE`) and required for upsert 
mode (without `UPDATE_BEFORE`). |
+| `op`         | No       | A `DESCRIPTOR` with a single column name for the 
operation code column. Defaults to `op`. The column must exist in the input 
table and be of type STRING. |
+| `op_mapping` | No       | A `MAP<STRING, STRING>` mapping user-defined 
operation codes to change operation names. Keys are user codes (e.g., `'c'`, 
`'u'`, `'d'`), values are change operation names (`INSERT`, `UPDATE_BEFORE`, 
`UPDATE_AFTER`, `DELETE`). Keys can contain comma-separated codes to map 
multiple codes to the same operation (e.g., `'c, r'`). When provided, only 
mapped codes are forwarded - unmapped codes are dropped. Each change operation 
may appear at most once across all entries. |
+
+#### Default op_mapping
+
+When `op_mapping` is omitted, the following standard names are used:
+
+| Input code         | Change operation  |
+|:-------------------|:------------------|
+| `'INSERT'`         | INSERT            |
+| `'UPDATE_BEFORE'`  | UPDATE_BEFORE     |
+| `'UPDATE_AFTER'`   | UPDATE_AFTER      |
+| `'DELETE'`         | DELETE            |
+
+### Output Schema
+
+The output columns are ordered as:
+
+```
+[all_input_columns_without_op]
+```
+
+The op column is removed from the output. Output rows carry the appropriate 
change operation (INSERT, UPDATE_BEFORE, UPDATE_AFTER, or DELETE).
+
+### Examples
+
+#### Basic usage with standard op names
+
+```sql
+-- Input (append-only):
+-- +I[id:1, op:'INSERT',        name:'Alice']
+-- +I[id:1, op:'UPDATE_BEFORE', name:'Alice']
+-- +I[id:1, op:'UPDATE_AFTER',  name:'Alice2']
+-- +I[id:2, op:'DELETE',        name:'Bob']
+
+SELECT * FROM FROM_CHANGELOG(
+  input => TABLE cdc_stream
+)
+
+-- Output (updating table):
+-- +I[id:1, name:'Alice']
+-- -U[id:1, name:'Alice']
+-- +U[id:1, name:'Alice2']
+-- -D[id:2, name:'Bob']
+
+-- Table state after all events:
+-- | id | name   |
+-- |----|--------|
+-- | 1  | Alice2 |
+```
+
+#### Debezium-style CDC codes
+
+```sql
+SELECT * FROM FROM_CHANGELOG(
+  input => TABLE cdc_stream PARTITION BY id,
+  op => DESCRIPTOR(__op),
+  op_mapping => MAP['c, r', 'INSERT', 'u', 'UPDATE_AFTER', 'd', 'DELETE']

Review Comment:
   Debezium supports both full-image and partial-image updates. We should 
provide examples for both common use cases: one with `UPDATE_BEFORE` and one 
without. Two Debezium sections make sense.



##########
docs/content/docs/sql/reference/queries/changelog.md:
##########
@@ -30,9 +30,117 @@ Flink SQL provides built-in process table functions (PTFs) 
for working with chan
 
 | Function | Description |
 |:---------|:------------|
+| [FROM_CHANGELOG](#from_changelog) | Converts an append-only table with 
operation codes into a dynamic table |
 | [TO_CHANGELOG](#to_changelog) | Converts a dynamic table into an append-only 
table with explicit operation codes |
 
-<!-- Placeholder for future FROM_CHANGELOG function -->
+## FROM_CHANGELOG
+
+The `FROM_CHANGELOG` PTF converts an append-only table with an explicit 
operation code column into a dynamic table (i.e. an updating table). Each input 
row is expected to have a string column that indicates the change operation. 
The op column is removed from the output and the row is emitted with the 
corresponding change operation.
+
+This is useful when consuming Change Data Capture (CDC) streams from systems 
like Debezium, Maxwell, or Canal, where events arrive as flat append-only 
records with an explicit operation field. It's also useful to be used in 
combination with the TO_CHANGELOG function, when the user wants to turn the 
append-only table back into an updating table after doing some specific 
transformation to the events.
+
+### Syntax
+
+```sql
+SELECT * FROM FROM_CHANGELOG(
+  input => TABLE source_table [PARTITION BY key_col],
+  [op => DESCRIPTOR(op_column_name),]
+  [op_mapping => MAP['c, r', 'INSERT', 'u', 'UPDATE_AFTER', 'd', 'DELETE']]
+)
+```
+
+`PARTITION BY` is optional when the mapping includes `UPDATE_BEFORE` (retract 
mode). It is required when the mapping produces upsert mode (no 
`UPDATE_BEFORE`), because downstream operators need a key for updates and 
deletes. When provided, records are distributed by the partition key for 
parallel processing.
+
+### Parameters
+
+| Parameter    | Required | Description |
+|:-------------|:---------|:------------|
+| `input`      | Yes      | The input table. Must be append-only. `PARTITION 
BY` is optional for retract mode (with `UPDATE_BEFORE`) and required for upsert 
mode (without `UPDATE_BEFORE`). |
+| `op`         | No       | A `DESCRIPTOR` with a single column name for the 
operation code column. Defaults to `op`. The column must exist in the input 
table and be of type STRING. |
+| `op_mapping` | No       | A `MAP<STRING, STRING>` mapping user-defined 
operation codes to change operation names. Keys are user codes (e.g., `'c'`, 
`'u'`, `'d'`), values are change operation names (`INSERT`, `UPDATE_BEFORE`, 
`UPDATE_AFTER`, `DELETE`). Keys can contain comma-separated codes to map 
multiple codes to the same operation (e.g., `'c, r'`). When provided, only 
mapped codes are forwarded - unmapped codes are dropped. Each change operation 
may appear at most once across all entries. |

Review Comment:
   ```suggestion
   | `op_mapping` | No       | A `MAP<STRING, STRING>` mapping user-defined 
operation codes to change operation names. Keys are user-defined codes (e.g., 
`'c'`, `'u'`, `'d'`), values are Flink change operation names (`INSERT`, 
`UPDATE_BEFORE`, `UPDATE_AFTER`, `DELETE`). Keys can contain comma-separated 
codes to map multiple codes to the same operation (e.g., `'c, r'`). When 
provided, only mapped codes are forwarded - unmapped codes are dropped. Each 
change operation may appear at most once across all entries. |
   ```
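
   To illustrate the rules in this row (comma-separated keys, unmapped codes dropped, each change operation at most once), here is a minimal JDK-only sketch. `OpMappingSketch`, `expand`, and `isUpsert` are hypothetical names for illustration and are not the code in this PR; rejecting a code that is listed twice is also an assumption of the sketch.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical illustration of the documented op_mapping rules; not the PR's implementation. */
final class OpMappingSketch {

    private static final Set<String> VALID_OPERATIONS =
            Set.of("INSERT", "UPDATE_BEFORE", "UPDATE_AFTER", "DELETE");

    /** Expands keys such as 'c, r' into one lookup entry per user-defined code. */
    static Map<String, String> expand(final Map<String, String> opMapping) {
        final Set<String> seenOperations = new HashSet<>();
        final Map<String, String> codeToOperation = new HashMap<>();
        for (final Map.Entry<String, String> entry : opMapping.entrySet()) {
            final String operation = entry.getValue().trim();
            if (!VALID_OPERATIONS.contains(operation)) {
                throw new IllegalArgumentException("Unknown change operation: " + operation);
            }
            // Each change operation may appear at most once across all entries.
            if (!seenOperations.add(operation)) {
                throw new IllegalArgumentException("Duplicate change operation: " + operation);
            }
            for (final String code : entry.getKey().split(",")) {
                // Assumption of this sketch: the same code listed twice is an error.
                if (codeToOperation.put(code.trim(), operation) != null) {
                    throw new IllegalArgumentException("Duplicate code: " + code.trim());
                }
            }
        }
        return codeToOperation;
    }

    /** A mapping without UPDATE_BEFORE corresponds to upsert mode, which needs PARTITION BY. */
    static boolean isUpsert(final Map<String, String> opMapping) {
        return opMapping.values().stream().map(String::trim).noneMatch("UPDATE_BEFORE"::equals);
    }

    public static void main(final String[] args) {
        final Map<String, String> mapping =
                Map.of("c, r", "INSERT", "u", "UPDATE_AFTER", "d", "DELETE");
        final Map<String, String> lookup = expand(mapping);
        System.out.println(lookup.get("r")); // prints INSERT
        System.out.println(lookup.containsKey("x")); // prints false: unmapped codes are dropped
        System.out.println(isUpsert(mapping)); // prints true: no UPDATE_BEFORE in the mapping
    }
}
```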



##########
docs/content/docs/sql/reference/queries/changelog.md:
##########
@@ -30,9 +30,117 @@ Flink SQL provides built-in process table functions (PTFs) 
for working with chan
 
 | Function | Description |
 |:---------|:------------|
+| [FROM_CHANGELOG](#from_changelog) | Converts an append-only table with 
operation codes into a dynamic table |
 | [TO_CHANGELOG](#to_changelog) | Converts a dynamic table into an append-only 
table with explicit operation codes |
 
-<!-- Placeholder for future FROM_CHANGELOG function -->
+## FROM_CHANGELOG
+
+The `FROM_CHANGELOG` PTF converts an append-only table with an explicit 
operation code column into a dynamic table (i.e. an updating table). Each input 
row is expected to have a string column that indicates the change operation. 
The op column is removed from the output and the row is emitted with the 
corresponding change operation.
+
+This is useful when consuming Change Data Capture (CDC) streams from systems 
like Debezium, Maxwell, or Canal, where events arrive as flat append-only 
records with an explicit operation field. It's also useful to be used in 
combination with the TO_CHANGELOG function, when the user wants to turn the 
append-only table back into an updating table after doing some specific 
transformation to the events.
+
+### Syntax
+
+```sql
+SELECT * FROM FROM_CHANGELOG(
+  input => TABLE source_table [PARTITION BY key_col],
+  [op => DESCRIPTOR(op_column_name),]
+  [op_mapping => MAP['c, r', 'INSERT', 'u', 'UPDATE_AFTER', 'd', 'DELETE']]
+)
+```
+
+`PARTITION BY` is optional when the mapping includes `UPDATE_BEFORE` (retract 
mode). It is required when the mapping produces upsert mode (no 
`UPDATE_BEFORE`), because downstream operators need a key for updates and 
deletes. When provided, records are distributed by the partition key for 
parallel processing.
+
+### Parameters
+
+| Parameter    | Required | Description |
+|:-------------|:---------|:------------|
+| `input`      | Yes      | The input table. Must be append-only. `PARTITION 
BY` is optional for retract mode (with `UPDATE_BEFORE`) and required for upsert 
mode (without `UPDATE_BEFORE`). |
+| `op`         | No       | A `DESCRIPTOR` with a single column name for the 
operation code column. Defaults to `op`. The column must exist in the input 
table and be of type STRING. |
+| `op_mapping` | No       | A `MAP<STRING, STRING>` mapping user-defined 
operation codes to change operation names. Keys are user codes (e.g., `'c'`, 
`'u'`, `'d'`), values are change operation names (`INSERT`, `UPDATE_BEFORE`, 
`UPDATE_AFTER`, `DELETE`). Keys can contain comma-separated codes to map 
multiple codes to the same operation (e.g., `'c, r'`). When provided, only 
mapped codes are forwarded - unmapped codes are dropped. Each change operation 
may appear at most once across all entries. |
+
+#### Default op_mapping
+
+When `op_mapping` is omitted, the following standard names are used:
+
+| Input code         | Change operation  |
+|:-------------------|:------------------|
+| `'INSERT'`         | INSERT            |
+| `'UPDATE_BEFORE'`  | UPDATE_BEFORE     |
+| `'UPDATE_AFTER'`   | UPDATE_AFTER      |
+| `'DELETE'`         | DELETE            |
+
+### Output Schema
+
+The output columns are ordered as:
+
+```
+[all_input_columns_without_op]
+```
+
+The op column is removed from the output. Output rows carry the appropriate 
change operation (INSERT, UPDATE_BEFORE, UPDATE_AFTER, or DELETE).
+
+### Examples
+
+#### Basic usage with standard op names
+
+```sql
+-- Input (append-only):
+-- +I[id:1, op:'INSERT',        name:'Alice']
+-- +I[id:1, op:'UPDATE_BEFORE', name:'Alice']
+-- +I[id:1, op:'UPDATE_AFTER',  name:'Alice2']
+-- +I[id:2, op:'DELETE',        name:'Bob']
+
+SELECT * FROM FROM_CHANGELOG(
+  input => TABLE cdc_stream
+)
+
+-- Output (updating table):
+-- +I[id:1, name:'Alice']
+-- -U[id:1, name:'Alice']
+-- +U[id:1, name:'Alice2']
+-- -D[id:2, name:'Bob']
+
+-- Table state after all events:
+-- | id | name   |
+-- |----|--------|
+-- | 1  | Alice2 |
+```
+
+#### Debezium-style CDC codes
+
+```sql
+SELECT * FROM FROM_CHANGELOG(
+  input => TABLE cdc_stream PARTITION BY id,
+  op => DESCRIPTOR(__op),

Review Comment:
   As far as I know, Debezium uses `op` and not `__op`.



##########
docs/content/docs/sql/reference/queries/changelog.md:
##########
@@ -30,9 +30,117 @@ Flink SQL provides built-in process table functions (PTFs) 
for working with chan
 
 | Function | Description |
 |:---------|:------------|
+| [FROM_CHANGELOG](#from_changelog) | Converts an append-only table with 
operation codes into a dynamic table |
 | [TO_CHANGELOG](#to_changelog) | Converts a dynamic table into an append-only 
table with explicit operation codes |
 
-<!-- Placeholder for future FROM_CHANGELOG function -->
+## FROM_CHANGELOG
+
+The `FROM_CHANGELOG` PTF converts an append-only table with an explicit 
operation code column into a dynamic table (i.e. an updating table). Each input 
row is expected to have a string column that indicates the change operation. 
The op column is removed from the output and the row is emitted with the 
corresponding change operation.
+
+This is useful when consuming Change Data Capture (CDC) streams from systems 
like Debezium, Maxwell, or Canal, where events arrive as flat append-only 
records with an explicit operation field. It's also useful to be used in 
combination with the TO_CHANGELOG function, when the user wants to turn the 
append-only table back into an updating table after doing some specific 
transformation to the events.
+
+### Syntax
+
+```sql
+SELECT * FROM FROM_CHANGELOG(
+  input => TABLE source_table [PARTITION BY key_col],
+  [op => DESCRIPTOR(op_column_name),]
+  [op_mapping => MAP['c, r', 'INSERT', 'u', 'UPDATE_AFTER', 'd', 'DELETE']]
+)
+```
+
+`PARTITION BY` is optional when the mapping includes `UPDATE_BEFORE` (retract 
mode). It is required when the mapping produces upsert mode (no 
`UPDATE_BEFORE`), because downstream operators need a key for updates and 
deletes. When provided, records are distributed by the partition key for 
parallel processing.

Review Comment:
   ```suggestion
   `PARTITION BY` is optional when the mapping includes `UPDATE_BEFORE` 
(retract mode). It is required when the mapping produces upsert mode (no 
`UPDATE_BEFORE`), because downstream operators need a key for updates and 
deletes. When provided, records are distributed by the partition key for 
parallel processing.
   
   Note: Please double check whether your CDC data encodes an update using a 
full image (i.e. capturing the state before and after the update) or a partial 
image (i.e. capturing only the state after the update). `FROM_CHANGELOG` is a 
very powerful function but might produce incorrect results in subsequent 
operations and tables if not configured correctly. 
   ```



##########
docs/content/docs/sql/reference/queries/changelog.md:
##########
@@ -30,9 +30,117 @@ Flink SQL provides built-in process table functions (PTFs) 
for working with chan
 
 | Function | Description |
 |:---------|:------------|
+| [FROM_CHANGELOG](#from_changelog) | Converts an append-only table with 
operation codes into a dynamic table |
 | [TO_CHANGELOG](#to_changelog) | Converts a dynamic table into an append-only 
table with explicit operation codes |
 
-<!-- Placeholder for future FROM_CHANGELOG function -->
+## FROM_CHANGELOG
+
+The `FROM_CHANGELOG` PTF converts an append-only table with an explicit 
operation code column into a dynamic table (i.e. an updating table). Each input 
row is expected to have a string column that indicates the change operation. 
The op column is removed from the output and the row is emitted with the 
corresponding change operation.
+
+This is useful when consuming Change Data Capture (CDC) streams from systems 
like Debezium, Maxwell, or Canal, where events arrive as flat append-only 
records with an explicit operation field. It's also useful to be used in 
combination with the TO_CHANGELOG function, when the user wants to turn the 
append-only table back into an updating table after doing some specific 
transformation to the events.
+
+### Syntax
+
+```sql
+SELECT * FROM FROM_CHANGELOG(
+  input => TABLE source_table [PARTITION BY key_col],
+  [op => DESCRIPTOR(op_column_name),]
+  [op_mapping => MAP['c, r', 'INSERT', 'u', 'UPDATE_AFTER', 'd', 'DELETE']]
+)
+```
+
+`PARTITION BY` is optional when the mapping includes `UPDATE_BEFORE` (retract 
mode). It is required when the mapping produces upsert mode (no 
`UPDATE_BEFORE`), because downstream operators need a key for updates and 
deletes. When provided, records are distributed by the partition key for 
parallel processing.
+
+### Parameters
+
+| Parameter    | Required | Description |
+|:-------------|:---------|:------------|
+| `input`      | Yes      | The input table. Must be append-only. `PARTITION 
BY` is optional for retract mode (with `UPDATE_BEFORE`) and required for upsert 
mode (without `UPDATE_BEFORE`). |
+| `op`         | No       | A `DESCRIPTOR` with a single column name for the 
operation code column. Defaults to `op`. The column must exist in the input 
table and be of type STRING. |
+| `op_mapping` | No       | A `MAP<STRING, STRING>` mapping user-defined 
operation codes to change operation names. Keys are user codes (e.g., `'c'`, 
`'u'`, `'d'`), values are change operation names (`INSERT`, `UPDATE_BEFORE`, 
`UPDATE_AFTER`, `DELETE`). Keys can contain comma-separated codes to map 
multiple codes to the same operation (e.g., `'c, r'`). When provided, only 
mapped codes are forwarded - unmapped codes are dropped. Each change operation 
may appear at most once across all entries. |
+
+#### Default op_mapping
+
+When `op_mapping` is omitted, the following standard names are used:
+
+| Input code         | Change operation  |
+|:-------------------|:------------------|
+| `'INSERT'`         | INSERT            |
+| `'UPDATE_BEFORE'`  | UPDATE_BEFORE     |
+| `'UPDATE_AFTER'`   | UPDATE_AFTER      |
+| `'DELETE'`         | DELETE            |
+
+### Output Schema
+
+The output columns are ordered as:
+
+```
+[all_input_columns_without_op]
+```
+
+The op column is removed from the output. Output rows carry the appropriate 
change operation (INSERT, UPDATE_BEFORE, UPDATE_AFTER, or DELETE).
+
+### Examples
+
+#### Basic usage with standard op names
+
+```sql
+-- Input (append-only):
+-- +I[id:1, op:'INSERT',        name:'Alice']
+-- +I[id:1, op:'UPDATE_BEFORE', name:'Alice']
+-- +I[id:1, op:'UPDATE_AFTER',  name:'Alice2']
+-- +I[id:2, op:'DELETE',        name:'Bob']
+
+SELECT * FROM FROM_CHANGELOG(
+  input => TABLE cdc_stream
+)
+
+-- Output (updating table):
+-- +I[id:1, name:'Alice']
+-- -U[id:1, name:'Alice']
+-- +U[id:1, name:'Alice2']
+-- -D[id:2, name:'Bob']
+
+-- Table state after all events:
+-- | id | name   |
+-- |----|--------|
+-- | 1  | Alice2 |
+```
+
+#### Debezium-style CDC codes
+
+```sql
+SELECT * FROM FROM_CHANGELOG(
+  input => TABLE cdc_stream PARTITION BY id,
+  op => DESCRIPTOR(__op),
+  op_mapping => MAP['c, r', 'INSERT', 'u', 'UPDATE_AFTER', 'd', 'DELETE']
+)
+-- 'c' (create) and 'r' (read/snapshot) both produce INSERT
+-- 'u' produces UPDATE_AFTER
+-- 'd' produces DELETE
+```
+
+#### Custom operation column name
+
+```sql
+SELECT * FROM FROM_CHANGELOG(
+  input => TABLE cdc_stream,
+  op => DESCRIPTOR(operation)
+)
+-- The operation column named 'operation' is used instead of 'op'
+```
+
+#### Table API
+
+```java
+// Default (retract mode): reads 'op' column with standard change operation 
names
+Table result = cdcStream.fromChangelog();
+
+// Upsert mode requires PARTITION BY — use the generic process() method
+Table result = cdcStream.partitionBy($("id")).process("FROM_CHANGELOG",

Review Comment:
   Format this more readably, across multiple lines.



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java:
##########
@@ -802,6 +805,31 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL)
                             
"org.apache.flink.table.runtime.functions.ptf.ToChangelogFunction")
                     .build();
 
+    public static final BuiltInFunctionDefinition FROM_CHANGELOG =
+            BuiltInFunctionDefinition.newBuilder()
+                    .name("FROM_CHANGELOG")
+                    .kind(PROCESS_TABLE)
+                    .staticArguments(
+                            StaticArgument.table(
+                                    "input",
+                                    Row.class,
+                                    false,
+                                    EnumSet.of(
+                                            StaticArgumentTrait.TABLE,
+                                            
StaticArgumentTrait.SET_SEMANTIC_TABLE,

Review Comment:
   Are we planning to use ROW SEMANTICS by default once @gustavodemorais's 
conditional traits PR is in?



##########
docs/content/docs/sql/reference/queries/changelog.md:
##########
@@ -30,9 +30,117 @@ Flink SQL provides built-in process table functions (PTFs) 
for working with chan
 
 | Function | Description |
 |:---------|:------------|
+| [FROM_CHANGELOG](#from_changelog) | Converts an append-only table with 
operation codes into a dynamic table |
 | [TO_CHANGELOG](#to_changelog) | Converts a dynamic table into an append-only 
table with explicit operation codes |
 
-<!-- Placeholder for future FROM_CHANGELOG function -->
+## FROM_CHANGELOG
+
+The `FROM_CHANGELOG` PTF converts an append-only table with an explicit 
operation code column into a dynamic table (i.e. an updating table). Each input 
row is expected to have a string column that indicates the change operation. 
The op column is removed from the output and the row is emitted with the 
corresponding change operation.
+
+This is useful when consuming Change Data Capture (CDC) streams from systems 
like Debezium, Maxwell, or Canal, where events arrive as flat append-only 
records with an explicit operation field. It's also useful to be used in 
combination with the TO_CHANGELOG function, when the user wants to turn the 
append-only table back into an updating table after doing some specific 
transformation to the events.
+
+### Syntax
+
+```sql
+SELECT * FROM FROM_CHANGELOG(
+  input => TABLE source_table [PARTITION BY key_col],
+  [op => DESCRIPTOR(op_column_name),]
+  [op_mapping => MAP['c, r', 'INSERT', 'u', 'UPDATE_AFTER', 'd', 'DELETE']]
+)
+```
+
+`PARTITION BY` is optional when the mapping includes `UPDATE_BEFORE` (retract 
mode). It is required when the mapping produces upsert mode (no 
`UPDATE_BEFORE`), because downstream operators need a key for updates and 
deletes. When provided, records are distributed by the partition key for 
parallel processing.

Review Comment:
   Link to a helpful section (maybe above?) that explains what full image and 
partial image mean.



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/FromChangelogTypeStrategy.java:
##########
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.inference.strategies;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.DataTypes.Field;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.functions.ChangelogFunction.ChangelogContext;
+import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.functions.TableSemantics;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.inference.ArgumentCount;
+import org.apache.flink.table.types.inference.CallContext;
+import org.apache.flink.table.types.inference.ConstantArgumentCount;
+import org.apache.flink.table.types.inference.InputTypeStrategy;
+import org.apache.flink.table.types.inference.Signature;
+import org.apache.flink.table.types.inference.Signature.Argument;
+import org.apache.flink.table.types.inference.TypeStrategy;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.types.ColumnList;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/** Type strategies for the {@code FROM_CHANGELOG} process table function. */
+@Internal
+public final class FromChangelogTypeStrategy {
+
+    private static final String DEFAULT_OP_COLUMN_NAME = "op";
+
+    private static final Set<String> VALID_ROW_KIND_NAMES =
+            Set.of("INSERT", "UPDATE_BEFORE", "UPDATE_AFTER", "DELETE");
+
+    // 
--------------------------------------------------------------------------------------------
+    // Input validation
+    // 
--------------------------------------------------------------------------------------------
+
+    public static final InputTypeStrategy INPUT_TYPE_STRATEGY =
+            new InputTypeStrategy() {
+                @Override
+                public ArgumentCount getArgumentCount() {
+                    return ConstantArgumentCount.between(1, 3);
+                }
+
+                @Override
+                public Optional<List<DataType>> inferInputTypes(
+                        final CallContext callContext, final boolean 
throwOnFailure) {
+                    return validateInputs(callContext, throwOnFailure);
+                }
+
+                @Override
+                public List<Signature> getExpectedSignatures(final 
FunctionDefinition definition) {
+                    return List.of(
+                            Signature.of(Argument.of("input", "TABLE")),
+                            Signature.of(
+                                    Argument.of("input", "TABLE"), 
Argument.of("op", "DESCRIPTOR")),
+                            Signature.of(
+                                    Argument.of("input", "TABLE"),
+                                    Argument.of("op", "DESCRIPTOR"),
+                                    Argument.of("op_mapping", "MAP<STRING, 
STRING>")));
+                }
+            };
+
+    // 
--------------------------------------------------------------------------------------------
+    // Output type inference
+    // 
--------------------------------------------------------------------------------------------
+
+    public static final TypeStrategy OUTPUT_TYPE_STRATEGY =
+            callContext -> {
+                final TableSemantics tableSemantics =
+                        callContext
+                                .getTableSemantics(0)
+                                .orElseThrow(
+                                        () ->
+                                                new ValidationException(
+                                                        "First argument must 
be a table for FROM_CHANGELOG."));
+
+                final String opColumnName = resolveOpColumnName(callContext);
+
+                final List<Field> outputFields = 
buildOutputFields(tableSemantics, opColumnName);
+
+                return Optional.of(DataTypes.ROW(outputFields).notNull());
+            };
+
+    // 
--------------------------------------------------------------------------------------------
+    // Helpers
+    // 
--------------------------------------------------------------------------------------------
+
+    @SuppressWarnings("rawtypes")
+    private static Optional<List<DataType>> validateInputs(
+            final CallContext callContext, final boolean throwOnFailure) {
+        final boolean isMissingTableArg = 
callContext.getTableSemantics(0).isEmpty();
+        if (isMissingTableArg) {
+            return callContext.fail(
+                    throwOnFailure, "First argument must be a table for 
FROM_CHANGELOG.");
+        }
+
+        final Optional<ColumnList> opDescriptor = 
callContext.getArgumentValue(1, ColumnList.class);
+        final boolean hasInvalidOpDescriptor =
+                opDescriptor.isPresent() && 
opDescriptor.get().getNames().size() != 1;
+        if (hasInvalidOpDescriptor) {
+            return callContext.fail(
+                    throwOnFailure,
+                    "The descriptor for argument 'op' must contain exactly one 
column name.");
+        }
+
+        // Validate that the op column exists in the input schema and is of 
STRING type
+        final TableSemantics tableSemantics = 
callContext.getTableSemantics(0).get();
+        final String opColumnName = resolveOpColumnName(callContext);
+        final List<Field> inputFields = 
DataType.getFields(tableSemantics.dataType());
+        final Optional<Field> opField =
+                inputFields.stream().filter(f -> 
f.getName().equals(opColumnName)).findFirst();
+        if (opField.isEmpty()) {
+            return callContext.fail(
+                    throwOnFailure,
+                    String.format(
+                            "The op column '%s' does not exist in the input 
schema.",
+                            opColumnName));
+        }
+        if 
(!opField.get().getDataType().getLogicalType().is(LogicalTypeFamily.CHARACTER_STRING))
 {
+            return callContext.fail(
+                    throwOnFailure,
+                    String.format(
+                            "The op column '%s' must be of STRING type, but 
was '%s'.",
+                            opColumnName, 
opField.get().getDataType().getLogicalType()));
+        }
+
+        final boolean hasMappingArgProvided = !callContext.isArgumentNull(2);
+        final boolean isMappingArgLiteral = callContext.isArgumentLiteral(2);
+        if (hasMappingArgProvided && !isMappingArgLiteral) {
+            return callContext.fail(
+                    throwOnFailure, "The 'op_mapping' argument must be a 
constant MAP literal.");
+        }
+
+        final Optional<Map> opMapping = callContext.getArgumentValue(2, 
Map.class);
+        if (opMapping.isPresent()) {
+            final Optional<List<DataType>> validationError =
+                    validateOpMappingValues(callContext, opMapping.get(), 
throwOnFailure);
+            if (validationError.isPresent()) {
+                return validationError;
+            }
+        }
+
+        // Upsert mode (no UPDATE_BEFORE in mapping) requires PARTITION BY for 
key-based routing
+        final boolean isUpsertMode =
+                opMapping.isPresent() && 
!containsUpdateBefore(opMapping.get());
+        final boolean hasPartitionBy = 
tableSemantics.partitionByColumns().length > 0;
+        if (isUpsertMode && !hasPartitionBy) {
+            throw new ValidationException(
+                    "Upsert changelog mode (without UPDATE_BEFORE) requires a 
PARTITION BY clause. "
+                            + "Either add PARTITION BY or include 
UPDATE_BEFORE in the op_mapping.");
+        }
+
+        return Optional.of(callContext.getArgumentDataTypes());
+    }
+
+    /**
+     * Validates op_mapping values. Values must be valid RowKind names from 
{INSERT, UPDATE_AFTER,
+     * DELETE}. Keys are arbitrary user strings (e.g., 'c', 'u', 'd') and may 
be comma-separated to
+     * map multiple user codes to the same RowKind. Each RowKind name must 
appear at most once
+     * across all entries.
+     */
+    private static Optional<List<DataType>> validateOpMappingValues(
+            final CallContext callContext,
+            final Map<?, ?> opMapping,
+            final boolean throwOnFailure) {
+        final Set<String> allRowKindsSeen = new HashSet<>();
+
+        for (final Entry<?, ?> entry : opMapping.entrySet()) {
+            if (!(entry.getKey() instanceof String)) {
+                return callContext.fail(
+                        throwOnFailure, "Invalid target mapping for argument 
'op_mapping'.");
+            }
+            final Object value = entry.getValue();
+            if (!(value instanceof String)) {
+                return callContext.fail(
+                        throwOnFailure, "Invalid target mapping for argument 
'op_mapping'.");
+            }
+            final String rowKindName = ((String) value).trim();
+            if (!VALID_ROW_KIND_NAMES.contains(rowKindName)) {
+                return callContext.fail(
+                        throwOnFailure,
+                        String.format(
+                                "Invalid target mapping for argument 
'op_mapping'. "
+                                        + "Unknown change operation: '%s'. 
Valid values are: %s.",
+                                rowKindName, VALID_ROW_KIND_NAMES));
+            }
+            final boolean isDuplicate = !allRowKindsSeen.add(rowKindName);
+            if (isDuplicate) {
+                return callContext.fail(
+                        throwOnFailure,
+                        String.format(
+                                "Invalid target mapping for argument 
'op_mapping'. "
+                                        + "Duplicate change operation: '%s'. "
+                                        + "Use comma-separated keys to map 
multiple codes to the same operation "
+                                        + "(e.g., MAP['c, r', 'INSERT']).",
+                                rowKindName));
+            }
+        }
+        return Optional.empty();
+    }
+
+    private static String resolveOpColumnName(final CallContext callContext) {
+        return callContext
+                .getArgumentValue(1, ColumnList.class)
+                .filter(cl -> !cl.getNames().isEmpty())
+                .map(cl -> cl.getNames().get(0))
+                .orElse(DEFAULT_OP_COLUMN_NAME);
+    }
+
+    /** Returns true if the op_mapping values contain UPDATE_BEFORE. */
+    @SuppressWarnings("rawtypes")

Review Comment:
   instead of suppressing warnings, fix generics at the root
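
   One possible shape for this, as a JDK-only sketch: convert the untyped map once into a `Map<String, String>` and let the helpers take the typed map, so no raw `Map` and no `@SuppressWarnings` is needed. `TypedMappingSketch` and `toStringMap` are hypothetical names; how the untyped value is obtained from `CallContext` is deliberately left out here.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical sketch of typing the mapping once instead of passing a raw Map around. */
final class TypedMappingSketch {

    /** Returns a typed copy, or empty if any key or value is not a String. */
    static Optional<Map<String, String>> toStringMap(final Map<?, ?> untyped) {
        final Map<String, String> typed = new LinkedHashMap<>();
        for (final Map.Entry<?, ?> entry : untyped.entrySet()) {
            final Object key = entry.getKey();
            final Object value = entry.getValue();
            if (!(key instanceof String) || !(value instanceof String)) {
                return Optional.empty();
            }
            typed.put((String) key, (String) value);
        }
        return Optional.of(typed);
    }

    /** Typed variant of the helper: no raw types, so nothing to suppress. */
    static boolean containsUpdateBefore(final Map<String, String> opMapping) {
        return opMapping.values().stream().map(String::trim).anyMatch("UPDATE_BEFORE"::equals);
    }

    public static void main(final String[] args) {
        final Map<?, ?> untyped = Map.of("c, r", "INSERT", "u", "UPDATE_AFTER");
        final Optional<Map<String, String>> typed = toStringMap(untyped);
        System.out.println(typed.isPresent()); // prints true
        System.out.println(typed.map(TypedMappingSketch::containsUpdateBefore).orElse(false)); // prints false
    }
}
```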



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
