This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch release-2.3
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 2283c216d3694f268163c30bd1d2628b4270af58
Author: Ramin Gharib <[email protected]>
AuthorDate: Mon Apr 20 10:19:55 2026 +0200

    [FLINK-39261][table] Add FROM_CHANGELOG built-in process table function
    
    This closes #27901.
    
    (cherry picked from commit 4997ca40a3f757088be05f117612066cc3382a4f)
---
 .../docs/sql/reference/queries/changelog.md        | 112 ++++++++-
 .../pyflink/table/tests/test_table_completeness.py |   1 +
 .../java/org/apache/flink/table/api/Table.java     |  27 +++
 .../apache/flink/table/api/internal/TableImpl.java |   5 +
 .../table/functions/BuiltInFunctionDefinition.java |  28 ++-
 .../functions/BuiltInFunctionDefinitions.java      |  27 +++
 .../table/functions/ChangelogModeStrategy.java     |  37 +++
 .../strategies/FromChangelogTypeStrategy.java      | 253 +++++++++++++++++++++
 .../strategies/SpecificInputTypeStrategies.java    |   4 +
 .../strategies/SpecificTypeStrategies.java         |   4 +
 .../ValidationOnlyInputTypeStrategy.java           |  49 ++++
 .../FromChangelogInputTypeStrategyTest.java        | 167 ++++++++++++++
 .../FlinkChangelogModeInferenceProgram.scala       |  32 ++-
 .../exec/stream/FromChangelogSemanticTests.java    |  49 ++++
 .../exec/stream/FromChangelogTestPrograms.java     | 210 +++++++++++++++++
 .../exec/stream/ProcessTableFunctionTestUtils.java |  15 ++
 .../planner/plan/stream/sql/FromChangelogTest.java |  77 +++++++
 .../plan/stream/sql/ProcessTableFunctionTest.java  |  14 +-
 .../planner/plan/stream/sql/FromChangelogTest.xml  |  57 +++++
 .../plan/stream/sql/ProcessTableFunctionTest.xml   |  20 ++
 .../functions/ptf/FromChangelogFunction.java       | 147 ++++++++++++
 21 files changed, 1324 insertions(+), 11 deletions(-)

diff --git a/docs/content/docs/sql/reference/queries/changelog.md 
b/docs/content/docs/sql/reference/queries/changelog.md
index 61843ab5c2f..f4449299388 100644
--- a/docs/content/docs/sql/reference/queries/changelog.md
+++ b/docs/content/docs/sql/reference/queries/changelog.md
@@ -30,9 +30,119 @@ Flink SQL provides built-in process table functions (PTFs) 
for working with chan
 
 | Function | Description |
 |:---------|:------------|
+| [FROM_CHANGELOG](#from_changelog) | Converts an append-only table with 
operation codes into a (potentially updating) dynamic table |
 | [TO_CHANGELOG](#to_changelog) | Converts a dynamic table into an append-only 
table with explicit operation codes |
 
-<!-- Placeholder for future FROM_CHANGELOG function -->
+## FROM_CHANGELOG
+
+The `FROM_CHANGELOG` PTF converts an append-only table with an explicit 
operation code column into a (potentially updating) dynamic table. Each input 
row is expected to have a string column that indicates the change operation. 
The operation column is interpreted by the engine and removed from the output.
+
+This is useful when consuming Change Data Capture (CDC) streams from systems 
like Debezium where events arrive as flat append-only records with an explicit 
operation field. It is also useful in combination with the TO_CHANGELOG 
function, for converting the append-only table back into an updating table 
after applying transformations to the events.
+
+Note: This version requires that your CDC data encodes updates using a full 
image (i.e. providing separate events for before and after the update). Please 
double-check whether your source provides both UPDATE_BEFORE and UPDATE_AFTER 
events. FROM_CHANGELOG is a powerful function, but if it is not configured 
correctly it might produce incorrect results in downstream operations and tables.
+
+### Syntax
+
+```sql
+SELECT * FROM FROM_CHANGELOG(
+  input => TABLE source_table,
+  [op => DESCRIPTOR(op_column_name),]
+  [op_mapping => MAP[
+      'c, r', 'INSERT',
+      'ub', 'UPDATE_BEFORE',
+      'ua', 'UPDATE_AFTER',
+      'd', 'DELETE'
+  ]]
+)
+```
+
+### Parameters
+
+| Parameter    | Required | Description                                        
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
+|:-------------|:---------|:--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 [...]
+| `input`      | Yes      | The input table. Must be append-only.              
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
+| `op`         | No       | A `DESCRIPTOR` with a single column name for the 
operation code column. Defaults to `op`. The column must exist in the input 
table and be of type STRING.                                                    
                                                                                
                                                                                
                                                                                
                    [...]
+| `op_mapping` | No       | A `MAP<STRING, STRING>` mapping user-defined codes 
to Flink change operation names. Keys are user-defined codes (e.g., `'c'`, 
`'u'`, `'d'`), values are Flink change operation names (`INSERT`, 
`UPDATE_BEFORE`, `UPDATE_AFTER`, `DELETE`). Keys can contain comma-separated 
codes to map multiple codes to the same operation (e.g., `'c, r'`). When 
provided, only mapped codes are forwarded - unmapped codes are dropped. Each 
change operation may appear at most once acro [...]
+
+#### Default op_mapping
+
+When `op_mapping` is omitted, the following standard names are used. They 
allow a reverse conversion from TO_CHANGELOG by default.
+
+| Input code         | Change operation  |
+|:-------------------|:------------------|
+| `'INSERT'`         | INSERT            |
+| `'UPDATE_BEFORE'`  | UPDATE_BEFORE     |
+| `'UPDATE_AFTER'`   | UPDATE_AFTER      |
+| `'DELETE'`         | DELETE            |
+
+### Output Schema
+
+The output contains all input columns except the operation code (e.g., op) 
column, which is interpreted by Flink's SQL engine and removed. Each output row 
carries the appropriate change operation (INSERT, UPDATE_BEFORE, UPDATE_AFTER, 
or DELETE).
+
+```
+[all_input_columns_without_op]
+```
+
+### Examples
+
+#### Basic usage with standard op names
+
+```sql
+-- Input (append-only):
+-- +I[id:1, op:'INSERT',        name:'Alice']
+-- +I[id:1, op:'UPDATE_BEFORE', name:'Alice']
+-- +I[id:1, op:'UPDATE_AFTER',  name:'Alice2']
+-- +I[id:2, op:'DELETE',        name:'Bob']
+
+SELECT * FROM FROM_CHANGELOG(
+  input => TABLE cdc_stream
+)
+
+-- Output (updating table):
+-- +I[id:1, name:'Alice']
+-- -U[id:1, name:'Alice']
+-- +U[id:1, name:'Alice2']
+-- -D[id:2, name:'Bob']
+
+-- Table state after all events:
+-- | id | name   |
+-- |----|--------|
+-- | 1  | Alice2 |
+```
+
+#### Custom operation column name
+
+```sql
+-- Source schema: id INT, operation STRING, name STRING
+SELECT * FROM FROM_CHANGELOG(
+  input => TABLE cdc_stream,
+  op => DESCRIPTOR(operation)
+)
+-- The operation column named 'operation' is used instead of 'op'
+```
+
+#### Table API
+
+```java
+Table cdcStream = ...;
+
+// Default: reads 'op' column with standard change operation names
+Table result = cdcStream.fromChangelog();
+
+// With custom op column name
+Table result = cdcStream.fromChangelog(
+    descriptor("operation").asArgument("op")
+);
+
+// With custom op_mapping
+Table result = cdcStream.fromChangelog(
+    descriptor("op").asArgument("op"),
+    map("c, r", "INSERT",
+        "ub", "UPDATE_BEFORE",
+        "ua", "UPDATE_AFTER",
+        "d", "DELETE").asArgument("op_mapping")
+);
+```
 
 ## TO_CHANGELOG
 
diff --git a/flink-python/pyflink/table/tests/test_table_completeness.py 
b/flink-python/pyflink/table/tests/test_table_completeness.py
index feca7e63b1a..1f2b920474f 100644
--- a/flink-python/pyflink/table/tests/test_table_completeness.py
+++ b/flink-python/pyflink/table/tests/test_table_completeness.py
@@ -42,6 +42,7 @@ class 
TableAPICompletenessTests(PythonAPICompletenessTestCase, PyFlinkTestCase):
             'asArgument',
             'process',
             'partitionBy',
+            'fromChangelog',
         }
 
     @classmethod
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java
index ef1785aecdd..172adfe1354 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java
@@ -1454,4 +1454,31 @@ public interface Table extends Explainable<Table>, 
Executable {
      * @return an append-only {@link Table} with an {@code op} column 
prepended to the input columns
      */
     Table toChangelog(Expression... arguments);
+
+    /**
+     * Converts this append-only table with an explicit operation code column 
into a dynamic table
+     * using the built-in {@code FROM_CHANGELOG} process table function.
+     *
+     * <p>Each input row is expected to have a string operation code column 
(default: {@code "op"})
+     * that indicates the change operation (e.g., INSERT, UPDATE_AFTER, 
UPDATE_BEFORE, DELETE). The
+     * output table is a dynamic table backed by a changelog stream.
+     *
+     * <p>Optional arguments can be passed using named expressions:
+     *
+     * <pre>{@code
+     * // Default: reads 'op' column with standard change operation names
+     * table.fromChangelog();
+     *
+     * // Custom op column name and code mapping (UPDATE_BEFORE must accompany UPDATE_AFTER)
+     * table.fromChangelog(
+     *     descriptor("__op").asArgument("op"),
+     *     map("c, r", "INSERT", "ub", "UPDATE_BEFORE", "ua", "UPDATE_AFTER", "d", "DELETE")
+     *         .asArgument("op_mapping"));
+     * }</pre>
+     *
+     * @param arguments optional named arguments for {@code op} and {@code 
op_mapping}
+     * @return a dynamic {@link Table} with the op column removed and proper 
change operation
+     *     semantics
+     */
+    Table fromChangelog(Expression... arguments);
 }
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java
index f63d4032067..a539977532f 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java
@@ -497,6 +497,11 @@ public class TableImpl implements Table {
         return new PartitionedTableImpl(this, Arrays.asList(fields));
     }
 
+    @Override
+    public Table fromChangelog(Expression... arguments) {
+        return process(BuiltInFunctionDefinitions.FROM_CHANGELOG.getName(), 
(Object[]) arguments);
+    }
+
     @Override
     public ApiExpression asArgument(String name) {
         return createArgumentExpression(operationTree, tableEnvironment, name);
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinition.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinition.java
index 77cfaf71667..03e4780e018 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinition.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinition.java
@@ -72,6 +72,8 @@ public final class BuiltInFunctionDefinition implements 
SpecializedFunction {
 
     private final SqlCallSyntax sqlCallSyntax;
 
+    private final @Nullable ChangelogModeStrategy changelogModeStrategy;
+
     private final String sqlName;
 
     private BuiltInFunctionDefinition(
@@ -84,7 +86,8 @@ public final class BuiltInFunctionDefinition implements 
SpecializedFunction {
             boolean isDeterministic,
             boolean isRuntimeProvided,
             String runtimeClass,
-            boolean isInternal) {
+            boolean isInternal,
+            @Nullable ChangelogModeStrategy changelogModeStrategy) {
         this.name = checkNotNull(name, "Name must not be null.");
         this.sqlName = sqlName;
         this.version = isInternal ? null : version;
@@ -95,6 +98,7 @@ public final class BuiltInFunctionDefinition implements 
SpecializedFunction {
         this.runtimeClass = runtimeClass;
         this.isInternal = isInternal;
         this.sqlCallSyntax = sqlCallSyntax;
+        this.changelogModeStrategy = changelogModeStrategy;
         validateFunction(this.name, this.version, this.isInternal);
     }
 
@@ -131,6 +135,14 @@ public final class BuiltInFunctionDefinition implements 
SpecializedFunction {
         return isInternal;
     }
 
+    /**
+     * Returns the optional {@link ChangelogModeStrategy} for built-in PTFs 
that emit updates (e.g.,
+     * FROM_CHANGELOG). The planner uses this to determine the output 
changelog mode.
+     */
+    public Optional<ChangelogModeStrategy> getChangelogModeStrategy() {
+        return Optional.ofNullable(changelogModeStrategy);
+    }
+
     public String getQualifiedName() {
         if (isInternal) {
             return name;
@@ -253,6 +265,8 @@ public final class BuiltInFunctionDefinition implements 
SpecializedFunction {
 
         private SqlCallSyntax sqlCallSyntax = SqlCallSyntax.FUNCTION;
 
+        private @Nullable ChangelogModeStrategy changelogModeStrategy;
+
         public Builder() {
             // default constructor to allow a fluent definition
         }
@@ -399,6 +413,15 @@ public final class BuiltInFunctionDefinition implements 
SpecializedFunction {
             return this;
         }
 
+        /**
+         * Sets the {@link ChangelogModeStrategy} that determines the output 
changelog mode for this
+         * built-in PTF. Only needed for PTFs that emit updates (e.g., 
FROM_CHANGELOG).
+         */
+        public Builder changelogModeStrategy(ChangelogModeStrategy 
changelogModeStrategy) {
+            this.changelogModeStrategy = changelogModeStrategy;
+            return this;
+        }
+
         public BuiltInFunctionDefinition build() {
             return new BuiltInFunctionDefinition(
                     name,
@@ -410,7 +433,8 @@ public final class BuiltInFunctionDefinition implements 
SpecializedFunction {
                     isDeterministic,
                     isRuntimeProvided,
                     runtimeClass,
-                    isInternal);
+                    isInternal,
+                    changelogModeStrategy);
         }
     }
 }
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
index 0f2f35a056a..0ee62f1116a 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
@@ -27,6 +27,7 @@ import org.apache.flink.table.api.JsonQueryWrapper;
 import org.apache.flink.table.api.JsonType;
 import org.apache.flink.table.api.JsonValueOnEmptyOrError;
 import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.expressions.TimeIntervalUnit;
 import org.apache.flink.table.expressions.TimePointUnit;
 import org.apache.flink.table.expressions.ValueLiteralExpression;
@@ -106,6 +107,7 @@ import static 
org.apache.flink.table.types.inference.TypeStrategies.nullableIfAr
 import static 
org.apache.flink.table.types.inference.TypeStrategies.varyingString;
 import static 
org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies.ARRAY_ELEMENT_ARG;
 import static 
org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies.ARRAY_FULLY_COMPARABLE;
+import static 
org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies.FROM_CHANGELOG_INPUT_TYPE_STRATEGY;
 import static 
org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies.INDEX;
 import static 
org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies.JSON_ARGUMENT;
 import static 
org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies.ML_PREDICT_INPUT_TYPE_STRATEGY;
@@ -115,6 +117,7 @@ import static 
org.apache.flink.table.types.inference.strategies.SpecificInputTyp
 import static 
org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies.percentage;
 import static 
org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies.percentageArray;
 import static 
org.apache.flink.table.types.inference.strategies.SpecificTypeStrategies.ARRAY_APPEND_PREPEND;
+import static 
org.apache.flink.table.types.inference.strategies.SpecificTypeStrategies.FROM_CHANGELOG_OUTPUT_TYPE_STRATEGY;
 import static 
org.apache.flink.table.types.inference.strategies.SpecificTypeStrategies.ML_PREDICT_OUTPUT_TYPE_STRATEGY;
 import static 
org.apache.flink.table.types.inference.strategies.SpecificTypeStrategies.TO_CHANGELOG_OUTPUT_TYPE_STRATEGY;
 
@@ -809,6 +812,30 @@ public final class BuiltInFunctionDefinitions {
                             
"org.apache.flink.table.runtime.functions.ptf.ToChangelogFunction")
                     .build();
 
+    public static final BuiltInFunctionDefinition FROM_CHANGELOG =
+            BuiltInFunctionDefinition.newBuilder()
+                    .name("FROM_CHANGELOG")
+                    .kind(PROCESS_TABLE)
+                    .staticArguments(
+                            StaticArgument.table(
+                                    "input",
+                                    Row.class,
+                                    false,
+                                    EnumSet.of(
+                                            StaticArgumentTrait.TABLE,
+                                            
StaticArgumentTrait.ROW_SEMANTIC_TABLE)),
+                            StaticArgument.scalar("op", 
DataTypes.DESCRIPTOR(), true),
+                            StaticArgument.scalar(
+                                    "op_mapping",
+                                    DataTypes.MAP(DataTypes.STRING(), 
DataTypes.STRING()),
+                                    true))
+                    .changelogModeStrategy(ctx -> ChangelogMode.all())
+                    .inputTypeStrategy(FROM_CHANGELOG_INPUT_TYPE_STRATEGY)
+                    .outputTypeStrategy(FROM_CHANGELOG_OUTPUT_TYPE_STRATEGY)
+                    .runtimeClass(
+                            
"org.apache.flink.table.runtime.functions.ptf.FromChangelogFunction")
+                    .build();
+
     public static final BuiltInFunctionDefinition GREATEST =
             BuiltInFunctionDefinition.newBuilder()
                     .name("GREATEST")
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/ChangelogModeStrategy.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/ChangelogModeStrategy.java
new file mode 100644
index 00000000000..53a561c74a4
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/ChangelogModeStrategy.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.functions.ChangelogFunction.ChangelogContext;
+import org.apache.flink.table.types.inference.TypeStrategy;
+
+/**
+ * Strategy for determining the output {@link ChangelogMode} of a built-in 
process table function.
+ *
+ * <p>Similar to {@link TypeStrategy}, this is used to declare changelog 
semantics in the function
+ * definition rather than implementing the {@link ChangelogFunction} interface.
+ */
+@Internal
+public interface ChangelogModeStrategy {
+
+    /** Infers the output {@link ChangelogMode} based on the given {@link 
ChangelogContext}. */
+    ChangelogMode inferChangelogMode(ChangelogContext changelogContext);
+}
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/FromChangelogTypeStrategy.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/FromChangelogTypeStrategy.java
new file mode 100644
index 00000000000..4d7b25fbd60
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/FromChangelogTypeStrategy.java
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.inference.strategies;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.DataTypes.Field;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.functions.TableSemantics;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.inference.CallContext;
+import org.apache.flink.table.types.inference.InputTypeStrategy;
+import org.apache.flink.table.types.inference.TypeStrategy;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.types.ColumnList;
+import org.apache.flink.types.RowKind;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/** Type strategies for the {@code FROM_CHANGELOG} process table function. */
+@Internal
+public final class FromChangelogTypeStrategy {
+
+    private static final String DEFAULT_OP_COLUMN_NAME = "op";
+
+    private static final Set<String> VALID_ROW_KIND_NAMES =
+            Set.of("INSERT", "UPDATE_BEFORE", "UPDATE_AFTER", "DELETE");
+
+    // 
--------------------------------------------------------------------------------------------
+    // Input validation
+    // 
--------------------------------------------------------------------------------------------
+
+    public static final InputTypeStrategy INPUT_TYPE_STRATEGY =
+            new ValidationOnlyInputTypeStrategy() {
+                @Override
+                public Optional<List<DataType>> inferInputTypes(
+                        final CallContext callContext, final boolean 
throwOnFailure) {
+                    return validateInputs(callContext, throwOnFailure);
+                }
+            };
+
+    // 
--------------------------------------------------------------------------------------------
+    // Output type inference
+    // 
--------------------------------------------------------------------------------------------
+
+    public static final TypeStrategy OUTPUT_TYPE_STRATEGY =
+            callContext -> {
+                final TableSemantics tableSemantics =
+                        callContext
+                                .getTableSemantics(0)
+                                .orElseThrow(
+                                        () ->
+                                                new ValidationException(
+                                                        "First argument must 
be a table for FROM_CHANGELOG."));
+
+                final String opColumnName = resolveOpColumnName(callContext);
+
+                final List<Field> outputFields = 
buildOutputFields(tableSemantics, opColumnName);
+
+                return Optional.of(DataTypes.ROW(outputFields).notNull());
+            };
+    private static final String UPDATE_BEFORE = RowKind.UPDATE_BEFORE.name();
+    private static final String UPDATE_AFTER = RowKind.UPDATE_AFTER.name();
+
+    // 
--------------------------------------------------------------------------------------------
+    // Helpers
+    // 
--------------------------------------------------------------------------------------------
+
+    @SuppressWarnings("rawtypes")
+    private static Optional<List<DataType>> validateInputs(
+            final CallContext callContext, final boolean throwOnFailure) {
+        Optional<List<DataType>> error;
+
+        error = validateTableArg(callContext, throwOnFailure);
+        if (error.isPresent()) {
+            return error;
+        }
+
+        error = validateOpDescriptor(callContext, throwOnFailure);
+        if (error.isPresent()) {
+            return error;
+        }
+
+        error = validateOpColumn(callContext, throwOnFailure);
+        if (error.isPresent()) {
+            return error;
+        }
+
+        error = validateOpMapping(callContext, throwOnFailure);
+        if (error.isPresent()) {
+            return error;
+        }
+
+        return Optional.of(callContext.getArgumentDataTypes());
+    }
+
+    private static Optional<List<DataType>> validateTableArg(
+            final CallContext callContext, final boolean throwOnFailure) {
+        if (callContext.getTableSemantics(0).isEmpty()) {
+            return callContext.fail(
+                    throwOnFailure, "First argument must be a table for 
FROM_CHANGELOG.");
+        }
+        return Optional.empty();
+    }
+
+    private static Optional<List<DataType>> validateOpDescriptor(
+            final CallContext callContext, final boolean throwOnFailure) {
+        final Optional<ColumnList> opDescriptor = 
callContext.getArgumentValue(1, ColumnList.class);
+        if (opDescriptor.isPresent() && opDescriptor.get().getNames().size() 
!= 1) {
+            return callContext.fail(
+                    throwOnFailure,
+                    "The descriptor for argument 'op' must contain exactly one 
column name.");
+        }
+        return Optional.empty();
+    }
+
+    /** Validates that the op column exists in the input schema and is of 
STRING type. */
+    private static Optional<List<DataType>> validateOpColumn(
+            final CallContext callContext, final boolean throwOnFailure) {
+
+        final TableSemantics tableSemantics = 
callContext.getTableSemantics(0).get();
+        final String opColumnName = resolveOpColumnName(callContext);
+        final List<Field> inputFields = 
DataType.getFields(tableSemantics.dataType());
+        final Optional<Field> opField =
+                inputFields.stream().filter(f -> 
f.getName().equals(opColumnName)).findFirst();
+        if (opField.isEmpty()) {
+            return callContext.fail(
+                    throwOnFailure,
+                    String.format(
+                            "The op column '%s' does not exist in the input 
schema.",
+                            opColumnName));
+        }
+        if 
(!opField.get().getDataType().getLogicalType().is(LogicalTypeFamily.CHARACTER_STRING))
 {
+            return callContext.fail(
+                    throwOnFailure,
+                    String.format(
+                            "The op column '%s' must be of STRING type, but 
was '%s'.",
+                            opColumnName, 
opField.get().getDataType().getLogicalType()));
+        }
+        return Optional.empty();
+    }
+
+    /** Validates op_mapping is a literal and its values are valid change 
operation names. */
+    @SuppressWarnings("unchecked")
+    private static Optional<List<DataType>> validateOpMapping(
+            final CallContext callContext, final boolean throwOnFailure) {
+        final boolean hasMappingArgProvided = !callContext.isArgumentNull(2);
+        final boolean isMappingArgLiteral = callContext.isArgumentLiteral(2);
+        if (hasMappingArgProvided && !isMappingArgLiteral) {
+            return callContext.fail(
+                    throwOnFailure, "The 'op_mapping' argument must be a 
constant MAP literal.");
+        }
+
+        final Optional<Map> opMapping = callContext.getArgumentValue(2, 
Map.class);
+        if (opMapping.isPresent()) {
+            final Map<String, String> mapping = opMapping.get();
+            final Optional<List<DataType>> validationError =
+                    validateOpMappingValues(callContext, mapping, 
throwOnFailure);
+            if (validationError.isPresent()) {
+                return validationError;
+            }
+
+            final boolean hasUpdateBefore =
+                    mapping.values().stream().anyMatch(v -> 
UPDATE_BEFORE.equals(v.trim()));
+            final boolean hasUpdateAfter =
+                    mapping.values().stream().anyMatch(v -> 
UPDATE_AFTER.equals(v.trim()));
+            if (hasUpdateAfter && !hasUpdateBefore) {
+                return callContext.fail(
+                        throwOnFailure,
+                        "The 'op_mapping' must include UPDATE_BEFORE for 
retract mode. "
+                                + "Upsert mode (without UPDATE_BEFORE) is not 
supported "
+                                + "in this version.");
+            }
+        }
+        return Optional.empty();
+    }
+
+    /**
+     * Validates op_mapping values. Values must be valid Flink change 
operation names. Each name
+     * must appear at most once across all entries.
+     */
+    private static Optional<List<DataType>> validateOpMappingValues(
+            final CallContext callContext,
+            final Map<String, String> opMapping,
+            final boolean throwOnFailure) {
+        final Set<String> allRowKindsSeen = new HashSet<>();
+
+        for (final String value : opMapping.values()) {
+            final String rowKindName = value.trim();
+            if (!VALID_ROW_KIND_NAMES.contains(rowKindName)) {
+                return callContext.fail(
+                        throwOnFailure,
+                        String.format(
+                                "Invalid target mapping for argument 
'op_mapping'. "
+                                        + "Unknown change operation: '%s'. 
Valid values are: %s.",
+                                rowKindName, VALID_ROW_KIND_NAMES));
+            }
+            final boolean isDuplicate = !allRowKindsSeen.add(rowKindName);
+            if (isDuplicate) {
+                return callContext.fail(
+                        throwOnFailure,
+                        String.format(
+                                "Invalid target mapping for argument 
'op_mapping'. "
+                                        + "Duplicate change operation: '%s'. "
+                                        + "Use comma-separated keys to map 
multiple codes to the same operation "
+                                        + "(e.g., MAP['c, r', 'INSERT']).",
+                                rowKindName));
+            }
+        }
+        return Optional.empty();
+    }
+
+    private static String resolveOpColumnName(final CallContext callContext) {
+        return callContext
+                .getArgumentValue(1, ColumnList.class)
+                .filter(cl -> !cl.getNames().isEmpty())
+                .map(cl -> cl.getNames().get(0))
+                .orElse(DEFAULT_OP_COLUMN_NAME);
+    }
+
+    private static List<Field> buildOutputFields(
+            final TableSemantics tableSemantics, final String opColumnName) {
+        final List<Field> inputFields = 
DataType.getFields(tableSemantics.dataType());
+
+        // Exclude the op column (becomes RowKind), keep all other columns
+        return inputFields.stream()
+                .filter(f -> !f.getName().equals(opColumnName))
+                .collect(Collectors.toList());
+    }
+
+    private FromChangelogTypeStrategy() {}
+}
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificInputTypeStrategies.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificInputTypeStrategies.java
index 4455b083ce2..b4cd96e0349 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificInputTypeStrategies.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificInputTypeStrategies.java
@@ -126,6 +126,10 @@ public final class SpecificInputTypeStrategies {
     public static final InputTypeStrategy TO_CHANGELOG_INPUT_TYPE_STRATEGY =
             ToChangelogTypeStrategy.INPUT_TYPE_STRATEGY;
 
+    /** Input strategy for {@link BuiltInFunctionDefinitions#FROM_CHANGELOG}. */
+    public static final InputTypeStrategy FROM_CHANGELOG_INPUT_TYPE_STRATEGY =
+            FromChangelogTypeStrategy.INPUT_TYPE_STRATEGY;
+
     /** See {@link ExtractInputTypeStrategy}. */
     public static final InputTypeStrategy EXTRACT = new 
ExtractInputTypeStrategy();
 
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificTypeStrategies.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificTypeStrategies.java
index c1b854cad7c..06b6cbb2d3b 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificTypeStrategies.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificTypeStrategies.java
@@ -202,6 +202,10 @@ public final class SpecificTypeStrategies {
     public static final TypeStrategy TO_CHANGELOG_OUTPUT_TYPE_STRATEGY =
             ToChangelogTypeStrategy.OUTPUT_TYPE_STRATEGY;
 
+    /** Type strategy specific for {@link BuiltInFunctionDefinitions#FROM_CHANGELOG}. */
+    public static final TypeStrategy FROM_CHANGELOG_OUTPUT_TYPE_STRATEGY =
+            FromChangelogTypeStrategy.OUTPUT_TYPE_STRATEGY;
+
     private SpecificTypeStrategies() {
         // no instantiation
     }
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ValidationOnlyInputTypeStrategy.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ValidationOnlyInputTypeStrategy.java
new file mode 100644
index 00000000000..0744076c164
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ValidationOnlyInputTypeStrategy.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.inference.strategies;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.types.inference.ArgumentCount;
+import org.apache.flink.table.types.inference.ConstantArgumentCount;
+import org.apache.flink.table.types.inference.InputTypeStrategy;
+import org.apache.flink.table.types.inference.Signature;
+import org.apache.flink.table.types.inference.StaticArgument;
+
+import java.util.List;
+
+/**
+ * Base class for input type strategies that only perform validation. Argument count and signatures
+ * are handled by {@link StaticArgument}s in the function definition.
+ *
+ * <p>Subclasses only need to implement {@link #inferInputTypes} for custom validation logic.
+ */
+@Internal
+public abstract class ValidationOnlyInputTypeStrategy implements 
InputTypeStrategy {
+
+    @Override
+    public ArgumentCount getArgumentCount() {
+        return ConstantArgumentCount.any();
+    }
+
+    @Override
+    public List<Signature> getExpectedSignatures(FunctionDefinition 
definition) {
+        return List.of(Signature.of());
+    }
+}
diff --git 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/FromChangelogInputTypeStrategyTest.java
 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/FromChangelogInputTypeStrategyTest.java
new file mode 100644
index 00000000000..c059598df66
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/FromChangelogInputTypeStrategyTest.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.inference.strategies;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.inference.InputTypeStrategiesTestBase;
+import org.apache.flink.table.types.inference.utils.TableSemanticsMock;
+import org.apache.flink.types.ColumnList;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Stream;
+
+import static 
org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies.FROM_CHANGELOG_INPUT_TYPE_STRATEGY;
+
+/** Tests for {@link FromChangelogTypeStrategy#INPUT_TYPE_STRATEGY}. */
+class FromChangelogInputTypeStrategyTest extends InputTypeStrategiesTestBase {
+
+    private static final DataType TABLE_TYPE =
+            DataTypes.ROW(
+                    DataTypes.FIELD("id", DataTypes.INT()),
+                    DataTypes.FIELD("op", DataTypes.STRING()),
+                    DataTypes.FIELD("name", DataTypes.STRING()));
+
+    private static final DataType DESCRIPTOR_TYPE = DataTypes.DESCRIPTOR();
+
+    private static final DataType MAP_TYPE = DataTypes.MAP(DataTypes.STRING(), 
DataTypes.STRING());
+
+    @Override
+    protected Stream<TestSpec> testData() {
+        return Stream.of(
+                // Valid: custom mapping with all change operations
+                TestSpec.forStrategy(
+                                "Valid with custom mapping", 
FROM_CHANGELOG_INPUT_TYPE_STRATEGY)
+                        .calledWithArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE, 
MAP_TYPE)
+                        .calledWithTableSemanticsAt(0, new 
TableSemanticsMock(TABLE_TYPE))
+                        .calledWithLiteralAt(1, ColumnList.of(List.of("op")))
+                        .calledWithLiteralAt(
+                                2,
+                                Map.of(
+                                        "c", "INSERT",
+                                        "ub", "UPDATE_BEFORE",
+                                        "ua", "UPDATE_AFTER",
+                                        "d", "DELETE"))
+                        .expectArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE, 
MAP_TYPE),
+
+                // Valid: retract-style mapping with UPDATE_BEFORE
+                TestSpec.forStrategy("Valid with UPDATE_BEFORE", 
FROM_CHANGELOG_INPUT_TYPE_STRATEGY)
+                        .calledWithArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE, 
MAP_TYPE)
+                        .calledWithTableSemanticsAt(0, new 
TableSemanticsMock(TABLE_TYPE))
+                        .calledWithLiteralAt(1, ColumnList.of(List.of("op")))
+                        .calledWithLiteralAt(
+                                2,
+                                Map.of(
+                                        "c", "INSERT",
+                                        "ub", "UPDATE_BEFORE",
+                                        "ua", "UPDATE_AFTER",
+                                        "d", "DELETE"))
+                        .expectArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE, 
MAP_TYPE),
+
+                // Error: op column not found
+                TestSpec.forStrategy(
+                                "Op column not found in schema", 
FROM_CHANGELOG_INPUT_TYPE_STRATEGY)
+                        .calledWithArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE)
+                        .calledWithTableSemanticsAt(0, new 
TableSemanticsMock(TABLE_TYPE))
+                        .calledWithLiteralAt(1, 
ColumnList.of(List.of("nonexistent")))
+                        .expectErrorMessage("The op column 'nonexistent' does 
not exist"),
+
+                // Error: op column is not STRING
+                TestSpec.forStrategy("Op column wrong type", 
FROM_CHANGELOG_INPUT_TYPE_STRATEGY)
+                        .calledWithArgumentTypes(
+                                DataTypes.ROW(
+                                        DataTypes.FIELD("id", DataTypes.INT()),
+                                        DataTypes.FIELD("op", DataTypes.INT()),
+                                        DataTypes.FIELD("name", 
DataTypes.STRING())),
+                                DESCRIPTOR_TYPE)
+                        .calledWithTableSemanticsAt(
+                                0,
+                                new TableSemanticsMock(
+                                        DataTypes.ROW(
+                                                DataTypes.FIELD("id", 
DataTypes.INT()),
+                                                DataTypes.FIELD("op", 
DataTypes.INT()),
+                                                DataTypes.FIELD("name", 
DataTypes.STRING()))))
+                        .calledWithLiteralAt(1, ColumnList.of(List.of("op")))
+                        .expectErrorMessage("must be of STRING type"),
+
+                // Error: multi-column descriptor
+                TestSpec.forStrategy(
+                                "Descriptor with multiple columns",
+                                FROM_CHANGELOG_INPUT_TYPE_STRATEGY)
+                        .calledWithArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE)
+                        .calledWithTableSemanticsAt(0, new 
TableSemanticsMock(TABLE_TYPE))
+                        .calledWithLiteralAt(1, ColumnList.of(List.of("a", 
"b")))
+                        .expectErrorMessage("must contain exactly one column 
name"),
+
+                // Error: invalid RowKind in op_mapping value
+                TestSpec.forStrategy(
+                                "Invalid RowKind in mapping value",
+                                FROM_CHANGELOG_INPUT_TYPE_STRATEGY)
+                        .calledWithArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE, 
MAP_TYPE)
+                        .calledWithTableSemanticsAt(0, new 
TableSemanticsMock(TABLE_TYPE))
+                        .calledWithLiteralAt(1, ColumnList.of(List.of("op")))
+                        .calledWithLiteralAt(2, Map.of("c", "INVALID_KIND"))
+                        .expectErrorMessage("Unknown change operation: 
'INVALID_KIND'"),
+
+                // Error: duplicate RowKind across entries
+                TestSpec.forStrategy(
+                                "Duplicate RowKind in mapping values",
+                                FROM_CHANGELOG_INPUT_TYPE_STRATEGY)
+                        .calledWithArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE, 
MAP_TYPE)
+                        .calledWithTableSemanticsAt(0, new 
TableSemanticsMock(TABLE_TYPE))
+                        .calledWithLiteralAt(1, ColumnList.of(List.of("op")))
+                        .calledWithLiteralAt(2, Map.of("c", "INSERT", "r", 
"INSERT"))
+                        .expectErrorMessage("Duplicate change operation: 
'INSERT'"),
+
+                // Valid: INSERT-only mapping (append mode, no updates)
+                TestSpec.forStrategy(
+                                "Valid INSERT-only mapping", 
FROM_CHANGELOG_INPUT_TYPE_STRATEGY)
+                        .calledWithArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE, 
MAP_TYPE)
+                        .calledWithTableSemanticsAt(0, new 
TableSemanticsMock(TABLE_TYPE))
+                        .calledWithLiteralAt(1, ColumnList.of(List.of("op")))
+                        .calledWithLiteralAt(2, Map.of("c, r", "INSERT"))
+                        .expectArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE, 
MAP_TYPE),
+
+                // Valid: INSERT + DELETE mapping (no updates)
+                TestSpec.forStrategy(
+                                "Valid INSERT and DELETE mapping",
+                                FROM_CHANGELOG_INPUT_TYPE_STRATEGY)
+                        .calledWithArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE, 
MAP_TYPE)
+                        .calledWithTableSemanticsAt(0, new 
TableSemanticsMock(TABLE_TYPE))
+                        .calledWithLiteralAt(1, ColumnList.of(List.of("op")))
+                        .calledWithLiteralAt(2, Map.of("c", "INSERT", "d", 
"DELETE"))
+                        .expectArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE, 
MAP_TYPE),
+
+                // Error: UPDATE_AFTER without UPDATE_BEFORE not supported
+                TestSpec.forStrategy(
+                                "UPDATE_AFTER requires UPDATE_BEFORE",
+                                FROM_CHANGELOG_INPUT_TYPE_STRATEGY)
+                        .calledWithArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE, 
MAP_TYPE)
+                        .calledWithTableSemanticsAt(0, new 
TableSemanticsMock(TABLE_TYPE))
+                        .calledWithLiteralAt(1, ColumnList.of(List.of("op")))
+                        .calledWithLiteralAt(
+                                2,
+                                Map.of(
+                                        "c", "INSERT",
+                                        "u", "UPDATE_AFTER",
+                                        "d", "DELETE"))
+                        .expectErrorMessage("must include UPDATE_BEFORE"));
+    }
+}
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
index 56c9cb1262a..b6767e68ac6 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
@@ -23,7 +23,7 @@ import 
org.apache.flink.table.api.InsertConflictStrategy.ConflictBehavior
 import org.apache.flink.table.api.config.ExecutionConfigOptions
 import 
org.apache.flink.table.api.config.ExecutionConfigOptions.UpsertMaterialize
 import org.apache.flink.table.connector.ChangelogMode
-import org.apache.flink.table.functions.ChangelogFunction
+import org.apache.flink.table.functions.{BuiltInFunctionDefinition, 
ChangelogFunction}
 import org.apache.flink.table.functions.ChangelogFunction.ChangelogContext
 import org.apache.flink.table.planner.calcite.{FlinkTypeFactory, 
RexTableArgCall}
 import org.apache.flink.table.planner.plan.`trait`._
@@ -1711,16 +1711,36 @@ class FlinkChangelogModeInferenceProgram extends 
FlinkOptimizeProgram[StreamOpti
         val changelogContext =
           toPtfChangelogContext(process, inputChangelogModes, 
requiredChangelogMode)
         val changelogMode = 
changelogFunction.getChangelogMode(changelogContext)
-        if (!changelogMode.containsOnly(RowKind.INSERT)) {
-          verifyPtfTableArgsForUpdates(call)
-        }
+        verifyPtfTableArgsForUpdates(call, changelogMode)
+        toTraitSet(changelogMode)
+      case builtIn: BuiltInFunctionDefinition if 
builtIn.getChangelogModeStrategy.isPresent =>
+        val inputChangelogModes = children.map(toChangelogMode(_, None, None))
+        val changelogContext =
+          toPtfChangelogContext(process, inputChangelogModes, 
requiredChangelogMode)
+        val changelogMode =
+          
builtIn.getChangelogModeStrategy.get().inferChangelogMode(changelogContext)
+        verifyPtfTableArgsForUpdates(call, changelogMode)
         toTraitSet(changelogMode)
       case _ =>
         defaultTraitSet
     }
   }
 
-  private def verifyPtfTableArgsForUpdates(call: RexCall): Unit = {
+  /**
+   * Verifies that PTFs with upsert output (without UPDATE_BEFORE) use set semantics.
+   *
+   * Retract mode (with UPDATE_BEFORE) is self-describing — each update carries both the old and
+   * the new value, so downstream can process it without a key. Row semantics is safe.
+   *
+   * Upsert mode (without UPDATE_BEFORE) requires a key to look up previous values, so set semantics
+   * with PARTITION BY is required.
+   */
+  private def verifyPtfTableArgsForUpdates(call: RexCall, changelogMode: 
ChangelogMode): Unit = {
+    if (
+      changelogMode.containsOnly(RowKind.INSERT) || 
changelogMode.contains(RowKind.UPDATE_BEFORE)
+    ) {
+      return
+    }
     StreamPhysicalProcessTableFunction
       .getProvidedInputArgs(call)
       .map(_.e)
@@ -1728,7 +1748,7 @@ class FlinkChangelogModeInferenceProgram extends 
FlinkOptimizeProgram[StreamOpti
         tableArg =>
           if (tableArg.is(StaticArgumentTrait.ROW_SEMANTIC_TABLE)) {
             throw new ValidationException(
-              s"PTFs that take table arguments with row semantics don't 
support updating output. " +
+              s"PTFs that take table arguments with row semantics don't 
support upsert output. " +
                 s"Table argument '${tableArg.getName}' of function 
'${call.getOperator.toString}' " +
                 s"must use set semantics.")
           }
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogSemanticTests.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogSemanticTests.java
new file mode 100644
index 00000000000..022a8d75450
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogSemanticTests.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
+import 
org.apache.flink.table.planner.plan.nodes.exec.testutils.SemanticTestBase;
+import org.apache.flink.table.test.program.TableTestProgram;
+
+import java.util.List;
+
+/** Semantic tests for the built-in FROM_CHANGELOG process table function. */
+public class FromChangelogSemanticTests extends SemanticTestBase {
+
+    @Override
+    protected void applyDefaultEnvironmentOptions(TableConfig config) {
+        super.applyDefaultEnvironmentOptions(config);
+        config.set(
+                
OptimizerConfigOptions.TABLE_OPTIMIZER_NONDETERMINISTIC_UPDATE_STRATEGY,
+                OptimizerConfigOptions.NonDeterministicUpdateStrategy.IGNORE);
+    }
+
+    @Override
+    public List<TableTestProgram> programs() {
+        return List.of(
+                FromChangelogTestPrograms.DEFAULT_OP_MAPPING,
+                FromChangelogTestPrograms.CUSTOM_OP_MAPPING,
+                FromChangelogTestPrograms.UNMAPPED_CODES_DROPPED,
+                FromChangelogTestPrograms.CUSTOM_OP_NAME,
+                FromChangelogTestPrograms.TABLE_API_DEFAULT,
+                FromChangelogTestPrograms.ROUND_TRIP);
+    }
+}
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogTestPrograms.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogTestPrograms.java
new file mode 100644
index 00000000000..d5554b12124
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogTestPrograms.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.test.program.SinkTestStep;
+import org.apache.flink.table.test.program.SourceTestStep;
+import org.apache.flink.table.test.program.TableTestProgram;
+import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
+
+/** {@link TableTestProgram} definitions for testing the built-in FROM_CHANGELOG PTF. */
+public class FromChangelogTestPrograms {
+
+    private static final String[] SIMPLE_CDC_SCHEMA = {"id INT", "op STRING", 
"name STRING"};
+
+    // 
--------------------------------------------------------------------------------------------
+    // SQL tests
+    // 
--------------------------------------------------------------------------------------------
+
+    public static final TableTestProgram DEFAULT_OP_MAPPING =
+            TableTestProgram.of(
+                            "from-changelog-default-op-mapping",
+                            "default mapping with standard op names")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("cdc_stream")
+                                    .addSchema(SIMPLE_CDC_SCHEMA)
+                                    .producedValues(
+                                            Row.of(1, "INSERT", "Alice"),
+                                            Row.of(2, "INSERT", "Bob"),
+                                            Row.of(1, "UPDATE_BEFORE", 
"Alice"),
+                                            Row.of(1, "UPDATE_AFTER", 
"Alice2"),
+                                            Row.of(2, "DELETE", "Bob"))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink")
+                                    .addSchema("id INT", "name STRING")
+                                    .consumedValues(
+                                            Row.ofKind(RowKind.INSERT, 1, 
"Alice"),
+                                            Row.ofKind(RowKind.INSERT, 2, 
"Bob"),
+                                            Row.ofKind(RowKind.UPDATE_BEFORE, 
1, "Alice"),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
1, "Alice2"),
+                                            Row.ofKind(RowKind.DELETE, 2, 
"Bob"))
+                                    .build())
+                    .runSql(
+                            "INSERT INTO sink SELECT * FROM FROM_CHANGELOG("
+                                    + "input => TABLE cdc_stream)")
+                    .build();
+
+    public static final TableTestProgram CUSTOM_OP_MAPPING =
+            TableTestProgram.of(
+                            "from-changelog-custom-op-mapping",
+                            "custom op_mapping with comma-separated keys")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("cdc_stream")
+                                    .addSchema(SIMPLE_CDC_SCHEMA)
+                                    .producedValues(
+                                            Row.of(1, "c", "Alice"),
+                                            Row.of(2, "r", "Bob"),
+                                            Row.of(1, "ub", "Alice"),
+                                            Row.of(1, "ua", "Alice2"),
+                                            Row.of(2, "d", "Bob"))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink")
+                                    .addSchema("id INT", "name STRING")
+                                    .consumedValues(
+                                            Row.ofKind(RowKind.INSERT, 1, 
"Alice"),
+                                            Row.ofKind(RowKind.INSERT, 2, 
"Bob"),
+                                            Row.ofKind(RowKind.UPDATE_BEFORE, 
1, "Alice"),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
1, "Alice2"),
+                                            Row.ofKind(RowKind.DELETE, 2, 
"Bob"))
+                                    .build())
+                    .runSql(
+                            "INSERT INTO sink SELECT * FROM FROM_CHANGELOG("
+                                    + "input => TABLE cdc_stream, "
+                                    + "op_mapping => MAP['c, r', 'INSERT', 
'ub', 'UPDATE_BEFORE', 'ua', 'UPDATE_AFTER', 'd', 'DELETE'])")
+                    .build();
+
+    public static final TableTestProgram UNMAPPED_CODES_DROPPED =
+            TableTestProgram.of(
+                            "from-changelog-unmapped-codes-dropped",
+                            "unmapped op codes are silently dropped")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("cdc_stream")
+                                    .addSchema(SIMPLE_CDC_SCHEMA)
+                                    .producedValues(
+                                            Row.of(1, "INSERT", "Alice"),
+                                            Row.of(2, "INSERT", "Bob"),
+                                            Row.of(1, "UNKNOWN", "Alice2"),
+                                            Row.of(2, "DELETE", "Bob"))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink")
+                                    .addSchema("id INT", "name STRING")
+                                    .consumedValues(
+                                            Row.ofKind(RowKind.INSERT, 1, 
"Alice"),
+                                            Row.ofKind(RowKind.INSERT, 2, 
"Bob"),
+                                            Row.ofKind(RowKind.DELETE, 2, 
"Bob"))
+                                    .build())
+                    .runSql(
+                            "INSERT INTO sink SELECT * FROM FROM_CHANGELOG("
+                                    + "input => TABLE cdc_stream)")
+                    .build();
+
+    /** Custom op column name via DESCRIPTOR. */
+    public static final TableTestProgram CUSTOM_OP_NAME =
+            TableTestProgram.of(
+                            "from-changelog-custom-op-name", "custom op column 
name via DESCRIPTOR")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("cdc_stream")
+                                    .addSchema("id INT", "operation STRING", 
"name STRING")
+                                    .producedValues(
+                                            Row.of(1, "INSERT", "Alice"),
+                                            Row.of(1, "UPDATE_BEFORE", 
"Alice"),
+                                            Row.of(1, "UPDATE_AFTER", 
"Alice2"))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink")
+                                    .addSchema("id INT", "name STRING")
+                                    .consumedValues(
+                                            Row.ofKind(RowKind.INSERT, 1, 
"Alice"),
+                                            Row.ofKind(RowKind.UPDATE_BEFORE, 
1, "Alice"),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
1, "Alice2"))
+                                    .build())
+                    .runSql(
+                            "INSERT INTO sink SELECT * FROM FROM_CHANGELOG("
+                                    + "input => TABLE cdc_stream, "
+                                    + "op => DESCRIPTOR(operation))")
+                    .build();
+
+    // 
--------------------------------------------------------------------------------------------
+    // Table API test
+    // 
--------------------------------------------------------------------------------------------
+
+    public static final TableTestProgram TABLE_API_DEFAULT =
+            TableTestProgram.of(
+                            "from-changelog-table-api-default",
+                            "Table.fromChangelog() convenience method")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("cdc_stream")
+                                    .addSchema(SIMPLE_CDC_SCHEMA)
+                                    .producedValues(
+                                            Row.of(1, "INSERT", "Alice"),
+                                            Row.of(2, "INSERT", "Bob"))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink")
+                                    .addSchema("id INT", "name STRING")
+                                    .consumedValues(
+                                            Row.ofKind(RowKind.INSERT, 1, 
"Alice"),
+                                            Row.ofKind(RowKind.INSERT, 2, 
"Bob"))
+                                    .build())
+                    .runTableApi(env -> 
env.from("cdc_stream").fromChangelog(), "sink")
+                    .build();
+
+    // 
--------------------------------------------------------------------------------------------
+    // Round-trip test: FROM_CHANGELOG(TO_CHANGELOG(table))
+    // 
--------------------------------------------------------------------------------------------
+
+    /** Verifies that FROM_CHANGELOG(TO_CHANGELOG(table)) recovers the original dynamic table. */
+    public static final TableTestProgram ROUND_TRIP =
+            TableTestProgram.of(
+                            "from-changelog-round-trip",
+                            "FROM_CHANGELOG(TO_CHANGELOG(table)) recovers 
original table")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("orders")
+                                    .addSchema("id INT", "name STRING")
+                                    .addMode(ChangelogMode.all())
+                                    .producedValues(
+                                            Row.ofKind(RowKind.INSERT, 1, 
"Alice"),
+                                            Row.ofKind(RowKind.INSERT, 2, 
"Bob"),
+                                            Row.ofKind(RowKind.UPDATE_BEFORE, 
1, "Alice"),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
1, "Alice2"),
+                                            Row.ofKind(RowKind.DELETE, 2, 
"Bob"))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink")
+                                    .addSchema("id INT", "name STRING")
+                                    .consumedValues(
+                                            Row.ofKind(RowKind.INSERT, 1, 
"Alice"),
+                                            Row.ofKind(RowKind.INSERT, 2, 
"Bob"),
+                                            Row.ofKind(RowKind.UPDATE_BEFORE, 
1, "Alice"),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
1, "Alice2"),
+                                            Row.ofKind(RowKind.DELETE, 2, 
"Bob"))
+                                    .build())
+                    .setupSql(
+                            "CREATE VIEW changelog_view AS "
+                                    + "SELECT * FROM TO_CHANGELOG(input => 
TABLE orders)")
+                    .runSql(
+                            "INSERT INTO sink SELECT * FROM FROM_CHANGELOG("
+                                    + "input => TABLE changelog_view)")
+                    .build();
+}
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestUtils.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestUtils.java
index 13ca08280b6..be6902a0b17 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestUtils.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestUtils.java
@@ -953,6 +953,21 @@ public class ProcessTableFunctionTestUtils {
             collectUpdate(ctx, r);
         }
 
+        /**
+         * Reports the changelog mode of this testing function's output, which is always
+         * {@code ChangelogMode.upsert(false)}; the passed context is not consulted.
+         */
+        @Override
+        public ChangelogMode getChangelogMode(ChangelogContext 
changelogContext) {
+            return ChangelogMode.upsert(false);
+        }
+    }
+
+    /** Testing function that uses row semantics with retract mode (valid). */
+    public static class UpdatingRetractRowSemanticFunction
+            extends ChangelogProcessTableFunctionBase {
+        /**
+         * Forwards every incoming row to {@code collectUpdate(ctx, r)}. The table argument is
+         * annotated with the {@code ROW_SEMANTIC_TABLE}, {@code SUPPORT_UPDATES} and
+         * {@code REQUIRE_UPDATE_BEFORE} traits.
+         */
+        public void eval(
+                Context ctx,
+                @ArgumentHint({ROW_SEMANTIC_TABLE, SUPPORT_UPDATES, 
REQUIRE_UPDATE_BEFORE}) Row r) {
+            collectUpdate(ctx, r);
+        }
+
         @Override
         public ChangelogMode getChangelogMode(ChangelogContext 
changelogContext) {
             return ChangelogMode.all();
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/FromChangelogTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/FromChangelogTest.java
new file mode 100644
index 00000000000..33e163d9f41
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/FromChangelogTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.stream.sql;
+
+import org.apache.flink.table.api.ExplainDetail;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.planner.utils.TableTestBase;
+import org.apache.flink.table.planner.utils.TableTestUtil;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Plan tests covering the FROM_CHANGELOG built-in process table function. Each query is explained
+ * with {@link ExplainDetail#CHANGELOG_MODE} so that the changelog mode of every plan node becomes
+ * part of the verified plan.
+ */
+public class FromChangelogTest extends TableTestBase {
+
+    private static final List<ExplainDetail> EXPLAIN_CHANGELOG_MODE =
+            Collections.singletonList(ExplainDetail.CHANGELOG_MODE);
+
+    private TableTestUtil util;
+
+    @BeforeEach
+    void setup() {
+        util = streamTestUtil(TableConfig.getDefault());
+    }
+
+    /** Calls FROM_CHANGELOG with only the table argument, on a table that has an 'op' column. */
+    @Test
+    void testInsertOnlySource() {
+        registerCdcStream("op");
+        final String query = "SELECT * FROM FROM_CHANGELOG(input => TABLE cdc_stream)";
+        util.verifyRelPlan(query, EXPLAIN_CHANGELOG_MODE);
+    }
+
+    /** Calls FROM_CHANGELOG with an explicit op column descriptor and an explicit op mapping. */
+    @Test
+    void testCustomOpMapping() {
+        registerCdcStream("__op");
+        final String query =
+                "SELECT * FROM FROM_CHANGELOG("
+                        + "input => TABLE cdc_stream, "
+                        + "op => DESCRIPTOR(__op), "
+                        + "op_mapping => MAP['c, r', 'INSERT', 'ub', 'UPDATE_BEFORE', 'ua', 'UPDATE_AFTER', 'd', 'DELETE'])";
+        util.verifyRelPlan(query, EXPLAIN_CHANGELOG_MODE);
+    }
+
+    /**
+     * Creates the {@code cdc_stream} table on the 'values' connector with the columns {@code id
+     * INT}, a STRING column named {@code opColumn}, and {@code name STRING}.
+     */
+    private void registerCdcStream(final String opColumn) {
+        final String ddl =
+                "CREATE TABLE cdc_stream ("
+                        + "  id INT,"
+                        + "  "
+                        + opColumn
+                        + " STRING,"
+                        + "  name STRING"
+                        + ") WITH ('connector' = 'values')";
+        util.tableEnv().executeSql(ddl);
+    }
+}
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ProcessTableFunctionTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ProcessTableFunctionTest.java
index c00f76fbb66..17a303b93f8 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ProcessTableFunctionTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ProcessTableFunctionTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.planner.plan.stream.sql;
 
 import org.apache.flink.table.annotation.ArgumentHint;
 import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.ExplainDetail;
 import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.catalog.DataTypeFactory;
 import org.apache.flink.table.functions.ProcessTableFunction;
@@ -42,6 +43,7 @@ import 
org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctio
 import 
org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.SetSemanticTablePassThroughFunction;
 import 
org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.TypedRowSemanticTableFunction;
 import 
org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.TypedSetSemanticTableFunction;
+import 
org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.UpdatingRetractRowSemanticFunction;
 import 
org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.UpdatingUpsertFunction;
 import 
org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.User;
 import org.apache.flink.table.planner.utils.TableTestBase;
@@ -60,6 +62,7 @@ import java.util.EnumSet;
 import java.util.Optional;
 import java.util.stream.Stream;
 
+import static java.util.Collections.singletonList;
 import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches;
 import static 
org.apache.flink.table.annotation.ArgumentTrait.OPTIONAL_PARTITION_BY;
 import static 
org.apache.flink.table.annotation.ArgumentTrait.PASS_COLUMNS_THROUGH;
@@ -282,6 +285,13 @@ public class ProcessTableFunctionTest extends 
TableTestBase {
                         .addInsertSql("INSERT INTO t_sink SELECT * FROM f(r => 
TABLE t, i => 42)"));
     }
 
+    @Test
+    void testRetractModeWithRowSemantics() {
+        util.addTemporarySystemFunction("f", 
UpdatingRetractRowSemanticFunction.class);
+        util.verifyRelPlan(
+                "SELECT * FROM f(r => TABLE t)", 
singletonList(ExplainDetail.CHANGELOG_MODE));
+    }
+
     @ParameterizedTest
     @MethodSource("errorSpecs")
     void testErrorBehavior(ErrorSpec spec) {
@@ -455,10 +465,10 @@ public class ProcessTableFunctionTest extends 
TableTestBase {
                         "SELECT * FROM f()",
                         "Table arguments must not be optional."),
                 ErrorSpec.ofSelect(
-                        "no changelog support for tables with row semantics",
+                        "no upsert support for tables with row semantics",
                         InvalidUpdatingSemanticsFunction.class,
                         "SELECT * FROM f(r => TABLE t)",
-                        "PTFs that take table arguments with row semantics 
don't support updating output. "
+                        "PTFs that take table arguments with row semantics 
don't support upsert output. "
                                 + "Table argument 'r' of function 'f' must use 
set semantics."),
                 ErrorSpec.ofSelect(
                         "on_time is not supported on updating output",
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/FromChangelogTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/FromChangelogTest.xml
new file mode 100644
index 00000000000..2eed936fc6e
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/FromChangelogTest.xml
@@ -0,0 +1,57 @@
+<?xml version="1.0" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<Root>
+  <TestCase name="testCustomOpMapping">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM FROM_CHANGELOG(input => TABLE cdc_stream, op => 
DESCRIPTOR(__op), op_mapping => MAP['c, r', 'INSERT', 'ub', 'UPDATE_BEFORE', 
'ua', 'UPDATE_AFTER', 'd', 'DELETE'])]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(id=[$0], name=[$1])
++- LogicalTableFunctionScan(invocation=[FROM_CHANGELOG(TABLE(#0), 
DESCRIPTOR(_UTF-16LE'__op'), MAP(_UTF-16LE'c, r':VARCHAR(4) CHARACTER SET 
"UTF-16LE", _UTF-16LE'INSERT':VARCHAR(13) CHARACTER SET "UTF-16LE", 
_UTF-16LE'ub':VARCHAR(4) CHARACTER SET "UTF-16LE", 
_UTF-16LE'UPDATE_BEFORE':VARCHAR(13) CHARACTER SET "UTF-16LE", 
_UTF-16LE'ua':VARCHAR(4) CHARACTER SET "UTF-16LE", 
_UTF-16LE'UPDATE_AFTER':VARCHAR(13) CHARACTER SET "UTF-16LE", 
_UTF-16LE'd':VARCHAR(4) CHARACTER SET "UTF-16LE", _UTF-16 [...]
+   +- LogicalProject(id=[$0], __op=[$1], name=[$2])
+      +- LogicalTableScan(table=[[default_catalog, default_database, 
cdc_stream]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+ProcessTableFunction(invocation=[FROM_CHANGELOG(TABLE(#0), 
DESCRIPTOR(_UTF-16LE'__op'), MAP(_UTF-16LE'c, r':VARCHAR(4) CHARACTER SET 
"UTF-16LE", _UTF-16LE'INSERT':VARCHAR(13) CHARACTER SET "UTF-16LE", 
_UTF-16LE'ub':VARCHAR(4) CHARACTER SET "UTF-16LE", 
_UTF-16LE'UPDATE_BEFORE':VARCHAR(13) CHARACTER SET "UTF-16LE", 
_UTF-16LE'ua':VARCHAR(4) CHARACTER SET "UTF-16LE", 
_UTF-16LE'UPDATE_AFTER':VARCHAR(13) CHARACTER SET "UTF-16LE", 
_UTF-16LE'd':VARCHAR(4) CHARACTER SET "UTF-16LE", _UTF-16LE'DELE [...]
++- TableSourceScan(table=[[default_catalog, default_database, cdc_stream]], 
fields=[id, __op, name], changelogMode=[I])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testInsertOnlySource">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM FROM_CHANGELOG(input => TABLE cdc_stream)]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(id=[$0], name=[$1])
++- LogicalTableFunctionScan(invocation=[FROM_CHANGELOG(TABLE(#0), DEFAULT(), 
DEFAULT(), DEFAULT(), DEFAULT())], rowType=[RecordType(INTEGER id, 
VARCHAR(2147483647) name)])
+   +- LogicalProject(id=[$0], op=[$1], name=[$2])
+      +- LogicalTableScan(table=[[default_catalog, default_database, 
cdc_stream]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+ProcessTableFunction(invocation=[FROM_CHANGELOG(TABLE(#0), DEFAULT(), 
DEFAULT(), DEFAULT(), DEFAULT())], uid=[null], select=[id,name], 
rowType=[RecordType(INTEGER id, VARCHAR(2147483647) name)], 
changelogMode=[I,UB,UA,D])
++- TableSourceScan(table=[[default_catalog, default_database, cdc_stream]], 
fields=[id, op, name], changelogMode=[I])
+]]>
+    </Resource>
+  </TestCase>
+</Root>
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ProcessTableFunctionTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ProcessTableFunctionTest.xml
index d89f766b84a..f0f340f4eb7 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ProcessTableFunctionTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ProcessTableFunctionTest.xml
@@ -146,6 +146,26 @@ Calc(select=[out, rowtime])
    +- Exchange(distribution=[hash[name]])
       +- WatermarkAssigner(rowtime=[ts], watermark=[ts])
          +- TableSourceScan(table=[[default_catalog, default_database, 
t_watermarked]], fields=[name, score, ts])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testRetractModeWithRowSemantics">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM f(r => TABLE t)]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(name=[$0], count=[$1], mode=[$2])
++- LogicalTableFunctionScan(invocation=[f(TABLE(#0), DEFAULT(), DEFAULT())], 
rowType=[RecordType(VARCHAR(2147483647) name, BIGINT count, VARCHAR(2147483647) 
mode)])
+   +- LogicalProject(name=[$0], score=[$1])
+      +- LogicalProject(name=[$0], score=[$1])
+         +- LogicalValues(tuples=[[{ _UTF-16LE'Bob', 12 }, { _UTF-16LE'Alice', 
42 }]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+ProcessTableFunction(invocation=[f(TABLE(#0), DEFAULT(), DEFAULT())], 
uid=[null], select=[name,count,mode], rowType=[RecordType(VARCHAR(2147483647) 
name, BIGINT count, VARCHAR(2147483647) mode)], changelogMode=[I,UB,UA,D])
++- Values(tuples=[[{ _UTF-16LE'Bob', 12 }, { _UTF-16LE'Alice', 42 }]], 
changelogMode=[I])
 ]]>
     </Resource>
   </TestCase>
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/FromChangelogFunction.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/FromChangelogFunction.java
new file mode 100644
index 00000000000..38249aa02bd
--- /dev/null
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/FromChangelogFunction.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.ptf;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.utils.ProjectedRowData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.SpecializedFunction.SpecializedContext;
+import org.apache.flink.table.functions.TableSemantics;
+import org.apache.flink.table.types.inference.CallContext;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.ColumnList;
+import org.apache.flink.types.RowKind;
+
+import javax.annotation.Nullable;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * Runtime implementation of {@link BuiltInFunctionDefinitions#FROM_CHANGELOG}.
+ *
+ * <p>Converts each append-only input row (which contains an operation code 
column) back into a
+ * changelog stream with proper {@link RowKind} annotations. The output schema 
excludes the
+ * operation code column and partition key columns (which are prepended by the 
framework
+ * automatically).
+ *
+ * <p>This is the reverse operation of {@link ToChangelogFunction}.
+ */
+@Internal
+public class FromChangelogFunction extends 
BuiltInProcessTableFunction<RowData> {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final String DEFAULT_OP_COLUMN_NAME = "op";
+    private static final Map<String, RowKind> DEFAULT_OP_MAPPING =
+            Map.of(
+                    "INSERT", RowKind.INSERT,
+                    "UPDATE_BEFORE", RowKind.UPDATE_BEFORE,
+                    "UPDATE_AFTER", RowKind.UPDATE_AFTER,
+                    "DELETE", RowKind.DELETE);
+
+    private final Map<String, RowKind> rawOpMap;
+    private final int opColumnIndex;
+    private final int[] outputIndices;
+
+    private transient HashMap<StringData, RowKind> opMap;
+    private transient ProjectedRowData projectedOutput;
+
+    public FromChangelogFunction(final SpecializedContext context) {
+        super(BuiltInFunctionDefinitions.FROM_CHANGELOG, context);
+        final CallContext callContext = context.getCallContext();
+
+        final TableSemantics tableSemantics =
+                callContext
+                        .getTableSemantics(0)
+                        .orElseThrow(() -> new IllegalStateException("Table 
argument expected."));
+
+        final RowType inputType = (RowType) 
tableSemantics.dataType().getLogicalType();
+        final String opColumnName = resolveOpColumnName(callContext);
+        this.opColumnIndex = inputType.getFieldNames().indexOf(opColumnName);
+
+        // Exclude only the op column from output — all other columns pass 
through
+        this.outputIndices =
+                IntStream.range(0, inputType.getFieldCount())
+                        .filter(i -> i != opColumnIndex)
+                        .toArray();
+
+        this.rawOpMap = buildOpMap(callContext);
+    }
+
+    @Override
+    public void open(final FunctionContext context) throws Exception {
+        super.open(context);
+        opMap = new HashMap<>();
+        rawOpMap.forEach((code, kind) -> 
opMap.put(StringData.fromString(code), kind));
+        projectedOutput = ProjectedRowData.from(outputIndices);
+    }
+
+    private static String resolveOpColumnName(final CallContext callContext) {
+        return callContext
+                .getArgumentValue(1, ColumnList.class)
+                .map(cl -> cl.getNames().get(0))
+                .orElse(DEFAULT_OP_COLUMN_NAME);
+    }
+
+    /**
+     * Builds a String-to-RowKind map. Keys in the provided mapping may be 
comma-separated (e.g.,
+     * "INSERT, UPDATE_AFTER") to map multiple input codes to the same RowKind.
+     */
+    private static Map<String, RowKind> buildOpMap(CallContext callContext) {
+        return callContext
+                .getArgumentValue(2, Map.class)
+                .map(FromChangelogFunction::parseOpMapping)
+                .orElse(DEFAULT_OP_MAPPING);
+    }
+
+    private static Map<String, RowKind> parseOpMapping(Map<String, String> 
opMapping) {
+        return opMapping.entrySet().stream()
+                .flatMap(
+                        e -> {
+                            final RowKind kind = 
RowKind.valueOf(e.getValue().trim());
+                            return Arrays.stream(e.getKey().split(","))
+                                    .map(code -> Map.entry(code.trim(), kind));
+                        })
+                .collect(Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue));
+    }
+
+    public void eval(
+            final Context ctx,
+            final RowData input,
+            @Nullable final ColumnList op,
+            @Nullable final MapData opMapping) {
+        final StringData opCode = input.getString(opColumnIndex);
+        final RowKind rowKind = opMap.get(opCode);
+        if (rowKind == null) {
+            return;
+        }
+
+        projectedOutput.replaceRow(input);
+        projectedOutput.setRowKind(rowKind);
+        collect(projectedOutput);
+    }
+}


Reply via email to