This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new bdb4f71fcba [FLINK-39259][table] Support basic TO_CHANGELOG built-in process table function
bdb4f71fcba is described below

commit bdb4f71fcba92a59e5f74f9e2362063328343e88
Author: Gustavo de Morais <[email protected]>
AuthorDate: Fri Mar 20 09:18:24 2026 +0100

    [FLINK-39259][table] Support basic TO_CHANGELOG built-in process table function
    
    This closes #27777.
---
 .../docs/sql/reference/queries/changelog.md        | 138 ++++++++
 .../apache/flink/table/api/PartitionedTable.java   |  28 ++
 .../apache/flink/table/api/internal/TableImpl.java |   5 +
 .../functions/BuiltInFunctionDefinitions.java      |  27 ++
 .../strategies/SpecificInputTypeStrategies.java    |   4 +
 .../strategies/SpecificTypeStrategies.java         |   4 +
 .../strategies/ToChangelogTypeStrategy.java        | 171 ++++++++++
 .../inference/CallBindingCallContext.java          |  58 +++-
 .../inference/OperatorBindingCallContext.java      |  55 +++-
 .../exec/stream/ToChangelogSemanticTests.java      |  53 +++
 .../nodes/exec/stream/ToChangelogTestPrograms.java | 365 +++++++++++++++++++++
 .../planner/plan/stream/sql/ToChangelogTest.java   |  93 ++++++
 .../planner/plan/stream/sql/ToChangelogTest.xml    |  57 ++++
 .../flink/table/planner/utils/TableTestBase.scala  |   5 +
 .../functions/ptf/BuiltInProcessTableFunction.java | 133 ++++++++
 .../runtime/functions/ptf/ToChangelogFunction.java | 135 ++++++++
 16 files changed, 1314 insertions(+), 17 deletions(-)

diff --git a/docs/content/docs/sql/reference/queries/changelog.md 
b/docs/content/docs/sql/reference/queries/changelog.md
new file mode 100644
index 00000000000..d7e6e29668a
--- /dev/null
+++ b/docs/content/docs/sql/reference/queries/changelog.md
@@ -0,0 +1,138 @@
+---
+title: "Changelog Conversion"
+weight: 8
+type: docs
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Changelog Conversion
+
+{{< label Streaming >}}
+
+Flink SQL provides built-in process table functions (PTFs) for working with changelog streams.
+
+| Function | Description |
+|:---------|:------------|
+| [TO_CHANGELOG](#to_changelog) | Converts a dynamic table into an append-only table with explicit operation codes |
+
+<!-- Placeholder for future FROM_CHANGELOG function -->
+
+## TO_CHANGELOG
+
+The `TO_CHANGELOG` PTF converts a dynamic table (for example, an updating table) into an append-only table with an explicit operation code column. Each input row, regardless of its original `RowKind` (INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE), is emitted as an INSERT row with a string column indicating the original operation.
+
+This is useful when you need to materialize changelog events into a downstream system that only supports appends (e.g., a message queue, log store, or append-only file sink). It is also useful for filtering out certain kinds of changes, for example DELETEs.
+
+### Syntax
+
+```sql
+SELECT * FROM TO_CHANGELOG(
+  input => TABLE source_table PARTITION BY key_col,
+  [op => DESCRIPTOR(op_column_name),]
+  [op_mapping => MAP['INSERT', 'I', 'DELETE', 'D', ...]]
+)
+```
+
+### Parameters
+
+| Parameter    | Required | Description |
+|:-------------|:---------|:------------|
+| `input`      | Yes      | The input table. Must include `PARTITION BY` for parallel execution. Accepts insert-only, retract, and upsert tables. |
+| `op`         | No       | A `DESCRIPTOR` with a single column name for the operation code column. Defaults to `op`. |
+| `op_mapping` | No       | A `MAP<STRING, STRING>` mapping `RowKind` names to custom output codes. When provided, only mapped RowKinds are forwarded - unmapped events are dropped. |
+
+#### Default op_mapping
+
+When `op_mapping` is omitted, all four RowKinds are mapped to their standard names:
+
+| RowKind         | Output value      |
+|:----------------|:------------------|
+| INSERT          | `'INSERT'`        |
+| UPDATE_BEFORE   | `'UPDATE_BEFORE'` |
+| UPDATE_AFTER    | `'UPDATE_AFTER'`  |
+| DELETE          | `'DELETE'`        |
+
+### Output Schema
+
+The output columns are ordered as:
+
+```
+[partition_key_columns, op_column, remaining_columns]
+```
+
+All output rows have RowKind `INSERT`, so the resulting table is always append-only.
+
+### Examples
+
+#### Basic usage
+
+```sql
+-- Input: retract table from an aggregation
+-- +I[id:1, name:'Alice', cnt:1]
+-- +U[id:1, name:'Alice', cnt:2]
+-- -D[id:2, name:'Bob',   cnt:1]
+
+SELECT * FROM TO_CHANGELOG(
+  input => TABLE my_aggregation PARTITION BY id
+)
+
+-- Output (append-only):
+-- +I[id:1, op:'INSERT',       name:'Alice', cnt:1]
+-- +I[id:1, op:'UPDATE_AFTER', name:'Alice', cnt:2]
+-- +I[id:2, op:'DELETE',       name:'Bob',   cnt:1]
+```
+
+#### Custom operation column name
+
+```sql
+SELECT * FROM TO_CHANGELOG(
+  input => TABLE my_aggregation PARTITION BY id,
+  op => DESCRIPTOR(operation)
+)
+-- The op column is now named 'operation' instead of 'op'
+```
+
+#### Custom operation codes with filtering
+
+```sql
+SELECT * FROM TO_CHANGELOG(
+  input => TABLE my_aggregation PARTITION BY id,
+  op => DESCRIPTOR(op_code),
+  op_mapping => MAP['INSERT', 'I', 'UPDATE_AFTER', 'U']
+)
+-- Only INSERT and UPDATE_AFTER events are forwarded
+-- DELETE events are dropped (not in the mapping)
+-- op_code values are 'I' and 'U' instead of full names
+```
+
+#### Table API
+
+```java
+// Default: adds 'op' column and supports all changelog modes
+Table result = myTable.partitionBy($("id")).toChangelog();
+
+// With custom parameters
+Table result = myTable.partitionBy($("id")).toChangelog(
+    descriptor("op_code").asArgument("op"),
+    map("INSERT", "I", "UPDATE_AFTER", "U").asArgument("op_mapping")
+);
+```
+
+{{< top >}}
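The `op_mapping` rules documented above (default identity mapping, and dropping of unmapped RowKinds when a mapping is given) can be modelled in a few lines of plain Java. This is only an illustrative sketch, not the code added by this commit: the class `OpMappingSketch` and its methods are hypothetical names, and RowKinds are represented as plain strings rather than Flink's `RowKind` enum.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Hypothetical plain-Java model of the documented op_mapping behaviour; not Flink code. */
final class OpMappingSketch {

    /** Default mapping as documented: every RowKind name maps to itself. */
    static Map<String, String> defaultMapping() {
        Map<String, String> mapping = new LinkedHashMap<>();
        for (String kind : List.of("INSERT", "UPDATE_BEFORE", "UPDATE_AFTER", "DELETE")) {
            mapping.put(kind, kind);
        }
        return mapping;
    }

    /**
     * Resolves the op code for a row. A null mapping stands for "op_mapping omitted".
     * An empty result means the event is dropped because its RowKind is unmapped.
     */
    static Optional<String> opCode(String rowKind, Map<String, String> opMapping) {
        Map<String, String> effective = opMapping == null ? defaultMapping() : opMapping;
        return Optional.ofNullable(effective.get(rowKind));
    }

    public static void main(String[] args) {
        Map<String, String> custom = Map.of("INSERT", "I", "UPDATE_AFTER", "U");
        System.out.println(opCode("INSERT", custom)); // mapped to the custom code
        System.out.println(opCode("DELETE", custom)); // unmapped, so dropped
        System.out.println(opCode("DELETE", null));   // default mapping keeps the name
    }
}
```

With the custom mapping from the "Custom operation codes with filtering" example, only INSERT and UPDATE_AFTER resolve to a code; every other RowKind yields an empty result, which corresponds to the event being dropped.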
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PartitionedTable.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PartitionedTable.java
index 32180e51fb9..8d5f1c91b28 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PartitionedTable.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PartitionedTable.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.api;
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.table.annotation.ArgumentTrait;
+import org.apache.flink.table.expressions.Expression;
 import org.apache.flink.table.functions.ProcessTableFunction;
 import org.apache.flink.table.functions.UserDefinedFunction;
 
@@ -138,4 +139,31 @@ public interface PartitionedTable {
      * @see ProcessTableFunction
      */
     Table process(Class<? extends UserDefinedFunction> function, Object... 
arguments);
+
+    /**
+     * Converts this dynamic table into an append-only table with an explicit 
operation code column
+     * using the built-in {@code TO_CHANGELOG} process table function.
+     *
+     * <p>Each input row - regardless of its original RowKind - is emitted as 
an INSERT-only row
+     * with a string {@code "op"} column indicating the original operation 
(INSERT, UPDATE_AFTER,
+     * DELETE, etc.).
+     *
+     * <p>Optional arguments can be passed using named expressions:
+     *
+     * <pre>{@code
+     * // Default: adds 'op' column and supports all changelog modes
+     * table.partitionBy($("id")).toChangelog();
+     *
+     * // Custom op column name and mapping
+     * table.partitionBy($("id")).toChangelog(
+     *     descriptor("op_code").asArgument("op"),
+     *     map("INSERT", "I", "UPDATE_AFTER", "U").asArgument("op_mapping")
+     * );
+     * }</pre>
+     *
+     * @param arguments optional named arguments for {@code op} and {@code 
op_mapping}
+     * @return an append-only {@link Table} with an {@code op} column 
prepended to the non-partition
+     *     columns
+     */
+    Table toChangelog(Expression... arguments);
 }
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java
index 581dadba4da..335f28f2f8e 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java
@@ -901,6 +901,11 @@ public class TableImpl implements Table {
                             createPartitionQueryOperation(), 
table.tableEnvironment, arguments));
         }
 
+        @Override
+        public Table toChangelog(Expression... arguments) {
+            return process(BuiltInFunctionDefinitions.TO_CHANGELOG.getName(), 
(Object[]) arguments);
+        }
+
         private QueryOperation createPartitionQueryOperation() {
             return table.operationTreeBuilder.partition(partitionKeys, 
table.operationTree);
         }
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
index 6685be8fdbd..b03988321e1 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
@@ -109,12 +109,14 @@ import static 
org.apache.flink.table.types.inference.strategies.SpecificInputTyp
 import static 
org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies.INDEX;
 import static 
org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies.JSON_ARGUMENT;
 import static 
org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies.ML_PREDICT_INPUT_TYPE_STRATEGY;
+import static 
org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies.TO_CHANGELOG_INPUT_TYPE_STRATEGY;
 import static 
org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies.TWO_EQUALS_COMPARABLE;
 import static 
org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies.TWO_FULLY_COMPARABLE;
 import static 
org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies.percentage;
 import static 
org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies.percentageArray;
 import static 
org.apache.flink.table.types.inference.strategies.SpecificTypeStrategies.ARRAY_APPEND_PREPEND;
 import static 
org.apache.flink.table.types.inference.strategies.SpecificTypeStrategies.ML_PREDICT_OUTPUT_TYPE_STRATEGY;
+import static 
org.apache.flink.table.types.inference.strategies.SpecificTypeStrategies.TO_CHANGELOG_OUTPUT_TYPE_STRATEGY;
 
 /** Dictionary of function definitions for all built-in functions. */
 @PublicEvolving
@@ -755,6 +757,31 @@ public final class BuiltInFunctionDefinitions {
                     .runtimeProvided()
                     .build();
 
+    public static final BuiltInFunctionDefinition TO_CHANGELOG =
+            BuiltInFunctionDefinition.newBuilder()
+                    .name("TO_CHANGELOG")
+                    .kind(PROCESS_TABLE)
+                    .staticArguments(
+                            StaticArgument.table(
+                                    "input",
+                                    Row.class,
+                                    false,
+                                    EnumSet.of(
+                                            StaticArgumentTrait.TABLE,
+                                            
StaticArgumentTrait.SET_SEMANTIC_TABLE,
+                                            
StaticArgumentTrait.SUPPORT_UPDATES,
+                                            
StaticArgumentTrait.REQUIRE_UPDATE_BEFORE)),
+                            StaticArgument.scalar("op", 
DataTypes.DESCRIPTOR(), true),
+                            StaticArgument.scalar(
+                                    "op_mapping",
+                                    DataTypes.MAP(DataTypes.STRING(), 
DataTypes.STRING()),
+                                    true))
+                    .inputTypeStrategy(TO_CHANGELOG_INPUT_TYPE_STRATEGY)
+                    .outputTypeStrategy(TO_CHANGELOG_OUTPUT_TYPE_STRATEGY)
+                    .runtimeClass(
+                            
"org.apache.flink.table.runtime.functions.ptf.ToChangelogFunction")
+                    .build();
+
     public static final BuiltInFunctionDefinition GREATEST =
             BuiltInFunctionDefinition.newBuilder()
                     .name("GREATEST")
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificInputTypeStrategies.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificInputTypeStrategies.java
index 9206985aa13..4455b083ce2 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificInputTypeStrategies.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificInputTypeStrategies.java
@@ -122,6 +122,10 @@ public final class SpecificInputTypeStrategies {
     public static final InputTypeStrategy ML_PREDICT_INPUT_TYPE_STRATEGY =
             MLPredictTypeStrategy.ML_PREDICT_INPUT_TYPE_STRATEGY;
 
+    /** Input strategy for {@link BuiltInFunctionDefinitions#TO_CHANGELOG}. */
+    public static final InputTypeStrategy TO_CHANGELOG_INPUT_TYPE_STRATEGY =
+            ToChangelogTypeStrategy.INPUT_TYPE_STRATEGY;
+
     /** See {@link ExtractInputTypeStrategy}. */
     public static final InputTypeStrategy EXTRACT = new 
ExtractInputTypeStrategy();
 
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificTypeStrategies.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificTypeStrategies.java
index 79e49de78cc..c1b854cad7c 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificTypeStrategies.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificTypeStrategies.java
@@ -198,6 +198,10 @@ public final class SpecificTypeStrategies {
     public static final TypeStrategy ML_PREDICT_OUTPUT_TYPE_STRATEGY =
             MLPredictTypeStrategy.ML_PREDICT_OUTPUT_TYPE_STRATEGY;
 
+    /** Output type strategy for {@link BuiltInFunctionDefinitions#TO_CHANGELOG}. */
+    public static final TypeStrategy TO_CHANGELOG_OUTPUT_TYPE_STRATEGY =
+            ToChangelogTypeStrategy.OUTPUT_TYPE_STRATEGY;
+
     private SpecificTypeStrategies() {
         // no instantiation
     }
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToChangelogTypeStrategy.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToChangelogTypeStrategy.java
new file mode 100644
index 00000000000..cb04e720209
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToChangelogTypeStrategy.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.inference.strategies;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.DataTypes.Field;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.functions.TableSemantics;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.inference.ArgumentCount;
+import org.apache.flink.table.types.inference.CallContext;
+import org.apache.flink.table.types.inference.ConstantArgumentCount;
+import org.apache.flink.table.types.inference.InputTypeStrategy;
+import org.apache.flink.table.types.inference.Signature;
+import org.apache.flink.table.types.inference.Signature.Argument;
+import org.apache.flink.table.types.inference.TypeStrategy;
+import org.apache.flink.types.ColumnList;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/** Type strategies for the {@code TO_CHANGELOG} process table function. */
+@Internal
+public final class ToChangelogTypeStrategy {
+
+    private static final String DEFAULT_OP_COLUMN_NAME = "op";
+
+    private static final Set<String> VALID_ROW_KIND_NAMES =
+            Set.of("INSERT", "UPDATE_BEFORE", "UPDATE_AFTER", "DELETE");
+
+    // 
--------------------------------------------------------------------------------------------
+    // Input validation
+    // 
--------------------------------------------------------------------------------------------
+
+    public static final InputTypeStrategy INPUT_TYPE_STRATEGY =
+            new InputTypeStrategy() {
+                @Override
+                public ArgumentCount getArgumentCount() {
+                    return ConstantArgumentCount.between(1, 3);
+                }
+
+                @Override
+                public Optional<List<DataType>> inferInputTypes(
+                        final CallContext callContext, final boolean 
throwOnFailure) {
+                    return validateInputs(callContext, throwOnFailure);
+                }
+
+                @Override
+                public List<Signature> getExpectedSignatures(final 
FunctionDefinition definition) {
+                    return List.of(
+                            Signature.of(Argument.of("input", "TABLE")),
+                            Signature.of(
+                                    Argument.of("input", "TABLE"), 
Argument.of("op", "DESCRIPTOR")),
+                            Signature.of(
+                                    Argument.of("input", "TABLE"),
+                                    Argument.of("op", "DESCRIPTOR"),
+                                    Argument.of("op_mapping", "MAP<STRING, 
STRING>")));
+                }
+            };
+
+    // 
--------------------------------------------------------------------------------------------
+    // Output type inference
+    // 
--------------------------------------------------------------------------------------------
+
+    public static final TypeStrategy OUTPUT_TYPE_STRATEGY =
+            callContext -> {
+                final TableSemantics semantics =
+                        callContext
+                                .getTableSemantics(0)
+                                .orElseThrow(
+                                        () ->
+                                                new ValidationException(
+                                                        "First argument must 
be a table for TO_CHANGELOG."));
+
+                final String opColumnName = resolveOpColumnName(callContext);
+                final List<Field> inputFields = 
DataType.getFields(semantics.dataType());
+                final Set<Integer> partitionKeys = 
intArrayToSet(semantics.partitionByColumns());
+
+                final List<Field> outputFields = new ArrayList<>();
+                outputFields.add(DataTypes.FIELD(opColumnName, 
DataTypes.STRING()));
+                for (int i = 0; i < inputFields.size(); i++) {
+                    if (!partitionKeys.contains(i)) {
+                        outputFields.add(inputFields.get(i));
+                    }
+                }
+
+                return Optional.of(DataTypes.ROW(outputFields).notNull());
+            };
+
+    // 
--------------------------------------------------------------------------------------------
+    // Helpers
+    // 
--------------------------------------------------------------------------------------------
+
+    private static Optional<List<DataType>> validateInputs(
+            final CallContext callContext, final boolean throwOnFailure) {
+        final boolean isMissingTableArg = 
callContext.getTableSemantics(0).isEmpty();
+        if (isMissingTableArg) {
+            return callContext.fail(
+                    throwOnFailure, "First argument must be a table for 
TO_CHANGELOG.");
+        }
+
+        final Optional<ColumnList> opDescriptor = 
callContext.getArgumentValue(1, ColumnList.class);
+        final boolean hasInvalidOpDescriptor =
+                opDescriptor.isPresent() && 
opDescriptor.get().getNames().size() != 1;
+        if (hasInvalidOpDescriptor) {
+            return callContext.fail(
+                    throwOnFailure,
+                    "The descriptor for argument 'op' must contain exactly one 
column name.");
+        }
+
+        final boolean hasMappingArgProvided = !callContext.isArgumentNull(2);
+        final boolean isMappingArgLiteral = callContext.isArgumentLiteral(2);
+        if (hasMappingArgProvided && !isMappingArgLiteral) {
+            return callContext.fail(
+                    throwOnFailure, "The 'op_mapping' argument must be a 
constant MAP literal.");
+        }
+
+        final Optional<Map> opMapping = callContext.getArgumentValue(2, 
Map.class);
+        if (opMapping.isPresent()) {
+            final boolean hasInvalidMappingKey =
+                    opMapping.get().keySet().stream()
+                            .anyMatch(
+                                    key ->
+                                            !(key instanceof String)
+                                                    || 
!VALID_ROW_KIND_NAMES.contains(key));
+            if (hasInvalidMappingKey) {
+                return callContext.fail(
+                        throwOnFailure, "Invalid target mapping for argument 
'op_mapping'.");
+            }
+        }
+
+        return Optional.of(callContext.getArgumentDataTypes());
+    }
+
+    private static String resolveOpColumnName(final CallContext callContext) {
+        return callContext
+                .getArgumentValue(1, ColumnList.class)
+                .filter(cl -> !cl.getNames().isEmpty())
+                .map(cl -> cl.getNames().get(0))
+                .orElse(DEFAULT_OP_COLUMN_NAME);
+    }
+
+    private static Set<Integer> intArrayToSet(final int[] array) {
+        return IntStream.of(array).boxed().collect(Collectors.toSet());
+    }
+
+    private ToChangelogTypeStrategy() {}
+}
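Note that `OUTPUT_TYPE_STRATEGY` above emits only the op column followed by the non-partition input fields. The leading partition key columns shown in the documentation's output schema are presumably contributed by the PTF framework for set-semantic table arguments; that is an assumption here, not something this file shows. The field selection itself can be sketched in plain Java, with field names standing in for `DataTypes.Field` and a hypothetical class name:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

/** Hypothetical stand-alone model of the output field selection; not Flink code. */
final class OutputFieldsSketch {

    /** Returns the op column followed by every input field that is not a partition key. */
    static List<String> outputFields(
            List<String> inputFields, int[] partitionByColumns, String opColumnName) {
        // Partition keys are given as positions into the input field list.
        Set<Integer> partitionKeys =
                IntStream.of(partitionByColumns).boxed().collect(Collectors.toSet());
        List<String> out = new ArrayList<>();
        out.add(opColumnName);
        for (int i = 0; i < inputFields.size(); i++) {
            if (!partitionKeys.contains(i)) {
                out.add(inputFields.get(i));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(outputFields(List.of("id", "name", "cnt"), new int[] {0}, "op"));
    }
}
```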
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/CallBindingCallContext.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/CallBindingCallContext.java
index 4aac8bcea18..e5968ce21fa 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/CallBindingCallContext.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/CallBindingCallContext.java
@@ -49,7 +49,9 @@ import org.apache.calcite.sql.type.SqlTypeName;
 import javax.annotation.Nullable;
 
 import java.util.AbstractList;
+import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.function.Function;
 import java.util.stream.Collectors;
@@ -114,9 +116,10 @@ public final class CallBindingCallContext extends 
AbstractSqlCallContext {
     @Override
     public boolean isArgumentLiteral(int pos) {
         final SqlNode sqlNode = adaptedArguments.get(pos);
-        // Semantically a descriptor can be considered a literal,
-        // however, Calcite represents them as a call
-        return SqlUtil.isLiteral(sqlNode, false) || sqlNode.getKind() == 
SqlKind.DESCRIPTOR;
+        // Calcite represents DESCRIPTOR and MAP constructors as calls, not literals
+        return SqlUtil.isLiteral(sqlNode, false)
+                || sqlNode.getKind() == SqlKind.DESCRIPTOR
+                || isLiteralMap(sqlNode);
     }
 
     @Override
@@ -136,6 +139,8 @@ public final class CallBindingCallContext extends 
AbstractSqlCallContext {
         }
         try {
             final SqlNode sqlNode = adaptedArguments.get(pos);
+            // Calcite represents DESCRIPTOR and MAP constructors as SqlCall, 
not SqlLiteral.
+            // Handle them explicitly by extracting values from the call 
operands.
             if (sqlNode.getKind() == SqlKind.DESCRIPTOR && clazz == 
ColumnList.class) {
                 final List<SqlNode> columns = ((SqlCall) 
sqlNode).getOperandList();
                 if (columns.stream()
@@ -147,6 +152,9 @@ public final class CallBindingCallContext extends 
AbstractSqlCallContext {
                 }
                 return Optional.of((T) convertColumnList(columns));
             }
+            if (sqlNode.getKind() == SqlKind.MAP_VALUE_CONSTRUCTOR && clazz == 
Map.class) {
+                return Optional.ofNullable((T) convertMap((SqlCall) sqlNode));
+            }
             final SqlLiteral literal = SqlLiteral.unchain(sqlNode);
             return Optional.ofNullable(getLiteralValueAs(literal::getValueAs, 
clazz));
         } catch (IllegalArgumentException e) {
@@ -354,4 +362,48 @@ public final class CallBindingCallContext extends 
AbstractSqlCallContext {
                         .collect(Collectors.toList());
         return ColumnList.of(names);
     }
+
+    private static @Nullable Map<String, String> convertMap(SqlCall mapCall) {
+        final List<SqlNode> operands = mapCall.getOperandList();
+        final Map<String, String> map = new LinkedHashMap<>();
+        try {
+            for (int i = 0; i < operands.size(); i += 2) {
+                final String key = 
SqlLiteral.unchain(operands.get(i)).getValueAs(String.class);
+                final String value =
+                        SqlLiteral.unchain(operands.get(i + 
1)).getValueAs(String.class);
+                map.put(key, value);
+            }
+        } catch (Exception e) {
+            // Not all children are literals
+            return null;
+        }
+        return map;
+    }
+
+    /** A MAP constructor counts as a literal if all of its operands are string literals. */
+    private static boolean isLiteralMap(SqlNode sqlNode) {
+        if (sqlNode.getKind() != SqlKind.MAP_VALUE_CONSTRUCTOR) {
+            return false;
+        }
+        return ((SqlCall) sqlNode)
+                
.getOperandList().stream().allMatch(CallBindingCallContext::isStringLiteral);
+    }
+
+    /**
+     * Checks if a node is a string literal. Also accepts a string literal wrapped in an implicit
+     * CAST, because Calcite adds such CASTs inside MAP constructors when the literals have
+     * different lengths (e.g. in MAP['INSERT', 'I', 'UPDATE_AFTER', 'U'], 'INSERT' becomes
+     * CAST('INSERT' AS VARCHAR(12))).
+     */
+    private static boolean isStringLiteral(final SqlNode node) {
+        if (node instanceof SqlLiteral) {
+            return SqlTypeName.CHAR_TYPES.contains(((SqlLiteral) 
node).getTypeName());
+        }
+        if (node.getKind() == SqlKind.CAST && node instanceof SqlCall) {
+            final SqlNode inner = ((SqlCall) node).operand(0);
+            return inner instanceof SqlLiteral
+                    && SqlTypeName.CHAR_TYPES.contains(((SqlLiteral) 
inner).getTypeName());
+        }
+        return false;
+    }
 }
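The `convertMap` helper above walks the MAP constructor's operands as alternating key/value pairs and gives up with `null` as soon as one operand is not a string literal. A minimal stand-alone sketch of that pattern follows, using plain Java objects in place of Calcite `SqlNode`s. The class and method names are hypothetical, and the explicit odd-length check is an addition of this sketch, not taken from the commit.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of reading alternating key/value operands; not Calcite or Flink code. */
final class AlternatingPairsSketch {

    /**
     * Converts [k1, v1, k2, v2, ...] into an insertion-ordered map. Returns null if the
     * operand count is odd or if any operand is not a String.
     */
    static Map<String, String> toMap(List<?> operands) {
        if (operands.size() % 2 != 0) {
            return null;
        }
        Map<String, String> map = new LinkedHashMap<>();
        for (int i = 0; i < operands.size(); i += 2) {
            Object key = operands.get(i);
            Object value = operands.get(i + 1);
            if (!(key instanceof String) || !(value instanceof String)) {
                return null;
            }
            map.put((String) key, (String) value);
        }
        return map;
    }

    public static void main(String[] args) {
        System.out.println(toMap(List.of("INSERT", "I", "UPDATE_AFTER", "U")));
        System.out.println(toMap(List.of("INSERT", 1)));
    }
}
```

A `LinkedHashMap` keeps the pairs in the order they were written, the same choice `convertMap` makes.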
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/OperatorBindingCallContext.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/OperatorBindingCallContext.java
index cd226f9b417..f1afecd4ace 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/OperatorBindingCallContext.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/OperatorBindingCallContext.java
@@ -46,7 +46,9 @@ import org.apache.calcite.sql.SqlOperatorBinding;
 import javax.annotation.Nullable;
 
 import java.util.AbstractList;
+import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.stream.Collectors;
 
@@ -114,7 +116,7 @@ public final class OperatorBindingCallContext extends 
AbstractSqlCallContext {
     public boolean isArgumentLiteral(int pos) {
         // Semantically a descriptor can be considered a literal,
         // however, Calcite represents them as a call
-        return binding.isOperandLiteral(pos, false) || isDescriptor(pos);
+        return binding.isOperandLiteral(pos, false) || isDescriptor(pos) || 
isMapConstructor(pos);
     }
 
     @Override
@@ -131,11 +133,14 @@ public final class OperatorBindingCallContext extends 
AbstractSqlCallContext {
         if (isArgumentNull(pos)) {
             return Optional.empty();
         }
-        // Semantically a descriptor can be considered a literal,
-        // Calcite represents them as a call
+        // Calcite represents DESCRIPTOR and MAP constructors as RexCall, not 
RexLiteral.
+        // Handle them explicitly by extracting values from the call operands.
         if (isDescriptor(pos) && clazz == ColumnList.class) {
             return Optional.ofNullable((T) convertColumnList(pos));
         }
+        if (isMapConstructor(pos) && clazz == Map.class) {
+            return Optional.ofNullable((T) convertMap(pos));
+        }
         try {
             return Optional.ofNullable(
                     getLiteralValueAs(
@@ -194,21 +199,23 @@ public final class OperatorBindingCallContext extends 
AbstractSqlCallContext {
     }
 
     private boolean isDescriptor(int pos) {
+        return isRexCallOfKind(pos, SqlKind.DESCRIPTOR);
+    }
+
+    private boolean isMapConstructor(int pos) {
+        return isRexCallOfKind(pos, SqlKind.MAP_VALUE_CONSTRUCTOR);
+    }
+
+    private boolean isRexCallOfKind(int pos, SqlKind kind) {
         if (binding instanceof RexCallBinding) {
-            final List<RexNode> operands = ((RexCallBinding) binding).operands();
-            final RexNode operand = operands.get(pos);
-            return operand.getKind() == SqlKind.DESCRIPTOR;
+            final RexNode operand = ((RexCallBinding) binding).operands().get(pos);
+            return operand.getKind() == kind;
         }
         return false;
     }
 
     private boolean isDefault(int pos) {
-        if (binding instanceof RexCallBinding) {
-            final List<RexNode> operands = ((RexCallBinding) binding).operands();
-            final RexNode operand = operands.get(pos);
-            return operand.getKind() == SqlKind.DEFAULT;
-        }
-        return false;
+        return isRexCallOfKind(pos, SqlKind.DEFAULT);
     }
 
     private @Nullable RexTableArgCall getTableArgCall(int pos) {
@@ -222,8 +229,7 @@ public final class OperatorBindingCallContext extends AbstractSqlCallContext {
 
     private ColumnList convertColumnList(int pos) {
         if (binding instanceof RexCallBinding) {
-            final List<RexNode> operands = ((RexCallBinding) binding).operands();
-            final RexCall call = (RexCall) operands.get(pos);
+            final RexCall call = (RexCall) ((RexCallBinding) binding).operands().get(pos);
             final List<String> names =
                     call.getOperands().stream()
                             .map(RexLiteral::stringValue)
@@ -233,6 +239,27 @@ public final class OperatorBindingCallContext extends AbstractSqlCallContext {
         return null;
     }
 
+    /**
+     * Extracts a {@code Map<String, String>} from a MAP_VALUE_CONSTRUCTOR RexCall whose operands
+     * are alternating key-value literal pairs.
+     */
+    private @Nullable Map<String, String> convertMap(int pos) {
+        if (!(binding instanceof RexCallBinding)) {
+            return null;
+        }
+        final RexCall call = (RexCall) ((RexCallBinding) binding).operands().get(pos);
+        final List<RexNode> operands = call.getOperands();
+        final Map<String, String> map = new LinkedHashMap<>();
+        for (int i = 0; i < operands.size(); i += 2) {
+            final @Nullable String key = RexLiteral.stringValue(operands.get(i));
+            final @Nullable String value = RexLiteral.stringValue(operands.get(i + 1));
+            if (key != null) {
+                map.put(key, value);
+            }
+        }
+        return map;
+    }
+
     private @Nullable StaticArgument getStaticArg(int pos) {
         final SqlOperator operator = binding.getOperator();
         if (!(operator instanceof BridgingSqlFunction)) {
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogSemanticTests.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogSemanticTests.java
new file mode 100644
index 00000000000..798044effa7
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogSemanticTests.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
+import org.apache.flink.table.planner.plan.nodes.exec.testutils.SemanticTestBase;
+import org.apache.flink.table.test.program.TableTestProgram;
+
+import java.util.List;
+
+/** Semantic tests for the built-in TO_CHANGELOG process table function. */
+public class ToChangelogSemanticTests extends SemanticTestBase {
+
+    @Override
+    protected void applyDefaultEnvironmentOptions(TableConfig config) {
+        super.applyDefaultEnvironmentOptions(config);
+        config.set(
+                OptimizerConfigOptions.TABLE_OPTIMIZER_NONDETERMINISTIC_UPDATE_STRATEGY,
+                OptimizerConfigOptions.NonDeterministicUpdateStrategy.IGNORE);
+    }
+
+    @Override
+    public List<TableTestProgram> programs() {
+        return List.of(
+                ToChangelogTestPrograms.INSERT_ONLY_INPUT,
+                ToChangelogTestPrograms.UPDATING_INPUT,
+                ToChangelogTestPrograms.CUSTOM_OP_MAPPING,
+                ToChangelogTestPrograms.CUSTOM_OP_NAME,
+                ToChangelogTestPrograms.TABLE_API_DEFAULT,
+                ToChangelogTestPrograms.LAG_ON_UPSERT_VIA_CHANGELOG,
+                ToChangelogTestPrograms.LAG_ON_RETRACT_VIA_CHANGELOG,
+                ToChangelogTestPrograms.MISSING_PARTITION_BY,
+                ToChangelogTestPrograms.INVALID_DESCRIPTOR,
+                ToChangelogTestPrograms.INVALID_OP_MAPPING);
+    }
+}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java
new file mode 100644
index 00000000000..22602bcf38a
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java
@@ -0,0 +1,365 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.test.program.SinkTestStep;
+import org.apache.flink.table.test.program.SourceTestStep;
+import org.apache.flink.table.test.program.TableTestProgram;
+import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
+
+import java.time.Instant;
+
+import static org.apache.flink.table.api.Expressions.$;
+
+/** {@link TableTestProgram} definitions for testing the built-in TO_CHANGELOG PTF. */
+public class ToChangelogTestPrograms {
+
+    private static final SourceTestStep SIMPLE_SOURCE =
+            SourceTestStep.newBuilder("t")
+                    .addSchema("id INT", "name STRING")
+                    .addMode(ChangelogMode.insertOnly())
+                    .producedValues(Row.ofKind(RowKind.INSERT, 1, "Alice"))
+                    .build();
+
+    // --------------------------------------------------------------------------------------------
+    // SQL tests
+    // --------------------------------------------------------------------------------------------
+
+    public static final TableTestProgram INSERT_ONLY_INPUT =
+            TableTestProgram.of("to-changelog-insert-only", "insert-only input produces op=INSERT")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("t")
+                                    .addSchema("id INT", "name STRING")
+                                    .addMode(ChangelogMode.insertOnly())
+                                    .producedValues(
+                                            Row.ofKind(RowKind.INSERT, 1, "Alice"),
+                                            Row.ofKind(RowKind.INSERT, 2, "Bob"))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink")
+                                    .addSchema("id INT", "op STRING", "name STRING")
+                                    .consumedValues("+I[1, INSERT, Alice]", "+I[2, INSERT, Bob]")
+                                    .build())
+                    .runSql(
+                            "INSERT INTO sink SELECT * FROM TO_CHANGELOG(input => TABLE t PARTITION BY id)")
+                    .build();
+
+    public static final TableTestProgram UPDATING_INPUT =
+            TableTestProgram.of(
+                            "to-changelog-updating-input",
+                            "retract input produces all op codes including UPDATE_BEFORE")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("t")
+                                    .addSchema(
+                                            "name STRING PRIMARY KEY NOT ENFORCED", "score BIGINT")
+                                    .addMode(ChangelogMode.all())
+                                    .producedValues(
+                                            Row.ofKind(RowKind.INSERT, "Alice", 10L),
+                                            Row.ofKind(RowKind.INSERT, "Bob", 20L),
+                                            Row.ofKind(RowKind.UPDATE_BEFORE, "Alice", 10L),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, "Alice", 30L),
+                                            Row.ofKind(RowKind.DELETE, "Bob", 20L))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink")
+                                    .addSchema("name STRING", "op STRING", "score BIGINT")
+                                    .consumedValues(
+                                            "+I[Alice, INSERT, 10]",
+                                            "+I[Bob, INSERT, 20]",
+                                            "+I[Alice, UPDATE_BEFORE, 10]",
+                                            "+I[Alice, UPDATE_AFTER, 30]",
+                                            "+I[Bob, DELETE, 20]")
+                                    .build())
+                    .runSql(
+                            "INSERT INTO sink SELECT * FROM TO_CHANGELOG(input => TABLE t PARTITION BY name)")
+                    .build();
+
+    public static final TableTestProgram CUSTOM_OP_MAPPING =
+            TableTestProgram.of(
+                            "to-changelog-custom-op-mapping",
+                            "custom op_mapping maps RowKinds to user-defined codes and drops unmapped")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("t")
+                                    .addSchema(
+                                            "name STRING PRIMARY KEY NOT ENFORCED", "score BIGINT")
+                                    .addMode(ChangelogMode.all())
+                                    .producedValues(
+                                            Row.ofKind(RowKind.INSERT, "Alice", 10L),
+                                            Row.ofKind(RowKind.INSERT, "Bob", 20L),
+                                            Row.ofKind(RowKind.UPDATE_BEFORE, "Alice", 10L),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, "Alice", 30L),
+                                            Row.ofKind(RowKind.DELETE, "Bob", 20L))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink")
+                                    .addSchema("name STRING", "op_code STRING", "score BIGINT")
+                                    .consumedValues(
+                                            "+I[Alice, I, 10]",
+                                            "+I[Bob, I, 20]",
+                                            "+I[Alice, U, 30]")
+                                    .build())
+                    .runSql(
+                            "INSERT INTO sink SELECT * FROM TO_CHANGELOG("
+                                    + "input => TABLE t PARTITION BY name, "
+                                    + "op => DESCRIPTOR(op_code), "
+                                    + "op_mapping => MAP['INSERT','I', 'UPDATE_AFTER','U'])")
+                    .build();
+
+    public static final TableTestProgram CUSTOM_OP_NAME =
+            TableTestProgram.of(
+                            "to-changelog-custom-op-name", "custom op column name via DESCRIPTOR")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("t")
+                                    .addSchema("id INT", "name STRING")
+                                    .addMode(ChangelogMode.insertOnly())
+                                    .producedValues(Row.ofKind(RowKind.INSERT, 1, "Alice"))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink")
+                                    .addSchema("id INT", "operation STRING", "name STRING")
+                                    .consumedValues("+I[1, INSERT, Alice]")
+                                    .build())
+                    .runSql(
+                            "INSERT INTO sink SELECT * FROM TO_CHANGELOG("
+                                    + "input => TABLE t PARTITION BY id, "
+                                    + "op => DESCRIPTOR(operation))")
+                    .build();
+
+    // --------------------------------------------------------------------------------------------
+    // Table API test
+    // --------------------------------------------------------------------------------------------
+
+    public static final TableTestProgram TABLE_API_DEFAULT =
+            TableTestProgram.of(
+                            "to-changelog-table-api-default",
+                            "PartitionedTable.toChangelog() convenience method")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("t")
+                                    .addSchema("id INT", "name STRING")
+                                    .addMode(ChangelogMode.insertOnly())
+                                    .producedValues(
+                                            Row.ofKind(RowKind.INSERT, 1, "Alice"),
+                                            Row.ofKind(RowKind.INSERT, 2, "Bob"))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink")
+                                    .addSchema("id INT", "op STRING", "name STRING")
+                                    .consumedValues("+I[1, INSERT, Alice]", "+I[2, INSERT, Bob]")
+                                    .build())
+                    .runTableApi(env -> env.from("t").partitionBy($("id")).toChangelog(), "sink")
+                    .build();
+
+    // --------------------------------------------------------------------------------------------
+    // Use case: LAG on updating streams via TO_CHANGELOG
+    // --------------------------------------------------------------------------------------------
+
+    /**
+     * An upsert source produces INSERT, UPDATE_AFTER, and DELETE events. TO_CHANGELOG converts them
+     * to append-only rows with explicit op codes, enabling LAG to track status transitions -
+     * something that fails directly on upsert streams.
+     */
+    public static final TableTestProgram LAG_ON_UPSERT_VIA_CHANGELOG =
+            TableTestProgram.of(
+                            "to-changelog-lag-on-upsert",
+                            "enables LAG on upsert stream with INSERT, UPDATE_AFTER, and DELETE")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("orders")
+                                    .addSchema(
+                                            "order_id INT NOT NULL",
+                                            "status STRING",
+                                            "ts TIMESTAMP_LTZ(3)",
+                                            "PRIMARY KEY (order_id) NOT ENFORCED",
+                                            "WATERMARK FOR ts AS ts - INTERVAL '1' SECOND")
+                                    .addMode(ChangelogMode.upsert())
+                                    .producedValues(
+                                            Row.ofKind(
+                                                    RowKind.INSERT,
+                                                    1,
+                                                    "CREATED",
+                                                    Instant.ofEpochMilli(1000)),
+                                            Row.ofKind(
+                                                    RowKind.INSERT,
+                                                    2,
+                                                    "CREATED",
+                                                    Instant.ofEpochMilli(2000)),
+                                            Row.ofKind(
+                                                    RowKind.UPDATE_AFTER,
+                                                    1,
+                                                    "SHIPPED",
+                                                    Instant.ofEpochMilli(3000)),
+                                            Row.ofKind(
+                                                    RowKind.UPDATE_AFTER,
+                                                    1,
+                                                    "DELIVERED",
+                                                    Instant.ofEpochMilli(4000)),
+                                            Row.ofKind(
+                                                    RowKind.DELETE,
+                                                    2,
+                                                    "CREATED",
+                                                    Instant.ofEpochMilli(5000)))
+                                    .build())
+                    .setupSql(
+                            "CREATE VIEW orders_changelog AS "
+                                    + "SELECT order_id, op, status, ts FROM TO_CHANGELOG("
+                                    + "  input => TABLE orders PARTITION BY order_id, "
+                                    + "  op_mapping => MAP['INSERT', 'INSERT', 'UPDATE_AFTER', 'UPDATE_AFTER'])")
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink")
+                                    .addSchema(
+                                            "order_id INT",
+                                            "op STRING",
+                                            "cur_status STRING",
+                                            "prev_status STRING")
+                                    .consumedValues(
+                                            "+I[1, INSERT, CREATED, null]",
+                                            "+I[2, INSERT, CREATED, null]",
+                                            "+I[1, UPDATE_AFTER, SHIPPED, CREATED]",
+                                            "+I[1, UPDATE_AFTER, DELIVERED, SHIPPED]")
+                                    .build())
+                    .runSql(
+                            "INSERT INTO sink "
+                                    + "SELECT order_id, op, status, "
+                                    + "  LAG(status) OVER (PARTITION BY order_id ORDER BY ts) AS prev_status "
+                                    + "FROM orders_changelog")
+                    .build();
+
+    /**
+     * A retract source produces UPDATE_BEFORE + UPDATE_AFTER pairs and DELETE events. TO_CHANGELOG
+     * drops UPDATE_BEFORE and DELETE via op_mapping, keeping only forward-looking transitions for
+     * LAG to track.
+     */
+    public static final TableTestProgram LAG_ON_RETRACT_VIA_CHANGELOG =
+            TableTestProgram.of(
+                            "to-changelog-lag-on-retract",
+                            "enables LAG on retract stream, dropping UB and DELETE via op_mapping")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("orders")
+                                    .addSchema(
+                                            "order_id INT NOT NULL",
+                                            "status STRING",
+                                            "ts TIMESTAMP_LTZ(3)",
+                                            "PRIMARY KEY (order_id) NOT ENFORCED",
+                                            "WATERMARK FOR ts AS ts - INTERVAL '1' SECOND")
+                                    .addMode(ChangelogMode.all())
+                                    .producedValues(
+                                            Row.ofKind(
+                                                    RowKind.INSERT,
+                                                    1,
+                                                    "CREATED",
+                                                    Instant.ofEpochMilli(1000)),
+                                            Row.ofKind(
+                                                    RowKind.INSERT,
+                                                    2,
+                                                    "CREATED",
+                                                    Instant.ofEpochMilli(2000)),
+                                            Row.ofKind(
+                                                    RowKind.UPDATE_BEFORE,
+                                                    1,
+                                                    "CREATED",
+                                                    Instant.ofEpochMilli(3000)),
+                                            Row.ofKind(
+                                                    RowKind.UPDATE_AFTER,
+                                                    1,
+                                                    "SHIPPED",
+                                                    Instant.ofEpochMilli(3000)),
+                                            Row.ofKind(
+                                                    RowKind.UPDATE_BEFORE,
+                                                    1,
+                                                    "SHIPPED",
+                                                    Instant.ofEpochMilli(4000)),
+                                            Row.ofKind(
+                                                    RowKind.UPDATE_AFTER,
+                                                    1,
+                                                    "DELIVERED",
+                                                    Instant.ofEpochMilli(4000)),
+                                            Row.ofKind(
+                                                    RowKind.DELETE,
+                                                    2,
+                                                    "CREATED",
+                                                    Instant.ofEpochMilli(5000)))
+                                    .build())
+                    .setupSql(
+                            "CREATE VIEW orders_changelog AS "
+                                    + "SELECT order_id, op, status, ts FROM TO_CHANGELOG("
+                                    + "  input => TABLE orders PARTITION BY order_id, "
+                                    + "  op_mapping => MAP['INSERT', 'INSERT', 'UPDATE_AFTER', 'UPDATE_AFTER'])")
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink")
+                                    .addSchema(
+                                            "order_id INT",
+                                            "op STRING",
+                                            "cur_status STRING",
+                                            "prev_status STRING")
+                                    .consumedValues(
+                                            "+I[1, INSERT, CREATED, null]",
+                                            "+I[2, INSERT, CREATED, null]",
+                                            "+I[1, UPDATE_AFTER, SHIPPED, CREATED]",
+                                            "+I[1, UPDATE_AFTER, DELIVERED, SHIPPED]")
+                                    .build())
+                    .runSql(
+                            "INSERT INTO sink "
+                                    + "SELECT order_id, op, status, "
+                                    + "  LAG(status) OVER (PARTITION BY order_id ORDER BY ts) AS prev_status "
+                                    + "FROM orders_changelog")
+                    .build();
+
+    // --------------------------------------------------------------------------------------------
+    // Error validation tests
+    // --------------------------------------------------------------------------------------------
+
+    public static final TableTestProgram MISSING_PARTITION_BY =
+            TableTestProgram.of(
+                            "to-changelog-missing-partition-by",
+                            "fails when PARTITION BY is missing")
+                    .setupTableSource(SIMPLE_SOURCE)
+                    .runFailingSql(
+                            "SELECT * FROM TO_CHANGELOG(input => TABLE t)",
+                            ValidationException.class,
+                            "Table argument 'input' requires a PARTITION BY clause for parallel processing.")
+                    .build();
+
+    public static final TableTestProgram INVALID_DESCRIPTOR =
+            TableTestProgram.of(
+                            "to-changelog-invalid-descriptor",
+                            "fails when DESCRIPTOR has multiple columns")
+                    .setupTableSource(SIMPLE_SOURCE)
+                    .runFailingSql(
+                            "SELECT * FROM TO_CHANGELOG("
+                                    + "input => TABLE t PARTITION BY id, "
+                                    + "op => DESCRIPTOR(a, b))",
+                            ValidationException.class,
+                            "The descriptor for argument 'op' must contain exactly one column name.")
+                    .build();
+
+    public static final TableTestProgram INVALID_OP_MAPPING =
+            TableTestProgram.of(
+                            "to-changelog-invalid-op-mapping",
+                            "fails when op_mapping has invalid RowKind name")
+                    .setupTableSource(SIMPLE_SOURCE)
+                    .runFailingSql(
+                            "SELECT * FROM TO_CHANGELOG("
+                                    + "input => TABLE t PARTITION BY id, "
+                                    + "op_mapping => MAP['INVALID_KIND', 'X'])",
+                            ValidationException.class,
+                            "Invalid target mapping for argument 'op_mapping'.")
+                    .build();
+}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ToChangelogTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ToChangelogTest.java
new file mode 100644
index 00000000000..0d4380bcb2e
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ToChangelogTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.stream.sql;
+
+import org.apache.flink.table.api.ExplainDetail;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.planner.utils.TableTestBase;
+import org.apache.flink.table.planner.utils.TableTestUtil;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+
+/**
+ * Plan tests for the TO_CHANGELOG built-in process table function. Uses {@link
+ * ExplainDetail#CHANGELOG_MODE} to verify changelog mode propagation through the plan.
+ */
+public class ToChangelogTest extends TableTestBase {
+
+    private static final java.util.List<ExplainDetail> CHANGELOG_MODE =
+            Collections.singletonList(ExplainDetail.CHANGELOG_MODE);
+
+    private TableTestUtil util;
+
+    @BeforeEach
+    void setup() {
+        util = streamTestUtil(TableConfig.getDefault());
+    }
+
+    @Test
+    void testRetractSource() {
+        util.tableEnv()
+                .executeSql(
+                        "CREATE TABLE retract_source ("
+                                + "  id INT,"
+                                + "  name STRING,"
+                                + "  PRIMARY KEY (id) NOT ENFORCED"
+                                + ") WITH ("
+                                + "  'connector' = 'values',"
+                                + "  'changelog-mode' = 'I,UB,UA,D'"
+                                + ")");
+        util.verifyRelPlan(
+                "SELECT * FROM TO_CHANGELOG(input => TABLE retract_source PARTITION BY id)",
+                CHANGELOG_MODE);
+    }
+
+    @Test
+    void testUpsertSource() {
+        util.tableEnv()
+                .executeSql(
+                        "CREATE TABLE upsert_source ("
+                                + "  id INT,"
+                                + "  name STRING,"
+                                + "  PRIMARY KEY (id) NOT ENFORCED"
+                                + ") WITH ("
+                                + "  'connector' = 'values',"
+                                + "  'changelog-mode' = 'I,UA,D'"
+                                + ")");
+        util.verifyRelPlan(
+                "SELECT * FROM TO_CHANGELOG(input => TABLE upsert_source PARTITION BY id)",
+                CHANGELOG_MODE);
+    }
+
+    @Test
+    void testInsertOnlySource() {
+        util.tableEnv()
+                .executeSql(
+                        "CREATE TABLE insert_only_source ("
+                                + "  id INT,"
+                                + "  name STRING"
+                                + ") WITH ('connector' = 'values')");
+        util.verifyRelPlan(
+                "SELECT * FROM TO_CHANGELOG(input => TABLE insert_only_source PARTITION BY id)",
+                CHANGELOG_MODE);
+    }
+}
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ToChangelogTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ToChangelogTest.xml
new file mode 100644
index 00000000000..a3a15dc8ef4
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ToChangelogTest.xml
@@ -0,0 +1,57 @@
+<?xml version="1.0" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<Root>
+  <TestCase name="testInsertOnlySource">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM TO_CHANGELOG(input => TABLE insert_only_source PARTITION BY id)]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+ProcessTableFunction(invocation=[TO_CHANGELOG(TABLE(#0) PARTITION BY($0), DEFAULT(), DEFAULT(), DEFAULT(), DEFAULT())], uid=[TO_CHANGELOG], select=[id,op,name], rowType=[RecordType(INTEGER id, VARCHAR(2147483647) op, VARCHAR(2147483647) name)], changelogMode=[I])
++- Exchange(distribution=[hash[id]], changelogMode=[I])
+   +- TableSourceScan(table=[[default_catalog, default_database, insert_only_source]], fields=[id, name], changelogMode=[I])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testRetractSource">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM TO_CHANGELOG(input => TABLE retract_source PARTITION BY id)]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+ProcessTableFunction(invocation=[TO_CHANGELOG(TABLE(#0) PARTITION BY($0), DEFAULT(), DEFAULT(), DEFAULT(), DEFAULT())], uid=[TO_CHANGELOG], select=[id,op,name], rowType=[RecordType(INTEGER id, VARCHAR(2147483647) op, VARCHAR(2147483647) name)], changelogMode=[I])
++- Exchange(distribution=[hash[id]], changelogMode=[I,UB,UA,D])
+   +- TableSourceScan(table=[[default_catalog, default_database, retract_source]], fields=[id, name], changelogMode=[I,UB,UA,D])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testUpsertSource">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM TO_CHANGELOG(input => TABLE upsert_source PARTITION BY id)]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+ProcessTableFunction(invocation=[TO_CHANGELOG(TABLE(#0) PARTITION BY($0), DEFAULT(), DEFAULT(), DEFAULT(), DEFAULT())], uid=[TO_CHANGELOG], select=[id,op,name], rowType=[RecordType(INTEGER id, VARCHAR(2147483647) op, VARCHAR(2147483647) name)], changelogMode=[I])
++- Exchange(distribution=[hash[id]], changelogMode=[I,UB,UA,D])
+   +- ChangelogNormalize(key=[id], changelogMode=[I,UB,UA,D])
+      +- Exchange(distribution=[hash[id]], changelogMode=[I,UA,D])
+         +- TableSourceScan(table=[[default_catalog, default_database, upsert_source]], fields=[id, name], changelogMode=[I,UA,D])
+]]>
+    </Resource>
+  </TestCase>
+</Root>
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
index b194d08637c..3e8d5729ee7 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
@@ -612,6 +612,11 @@ abstract class TableTestUtilBase(test: TableTestBase, isStreamingMode: Boolean)
       withQueryBlockAlias = false)
   }
 
+  /** Java-friendly overload that accepts a list of [[ExplainDetail]]s. */
+  def verifyRelPlan(query: String, extraDetails: java.util.List[ExplainDetail]): Unit = {
+    verifyRelPlan(query, extraDetails.asScala.toSeq: _*)
+  }
+
   /**
    * Verify the AST (abstract syntax tree) and the optimized rel plan for the given INSERT
    * statement.
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/BuiltInProcessTableFunction.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/BuiltInProcessTableFunction.java
new file mode 100644
index 00000000000..1ac2e2e03e7
--- /dev/null
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/BuiltInProcessTableFunction.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.ptf;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.catalog.DataTypeFactory;
+import org.apache.flink.table.functions.BuiltInFunctionDefinition;
+import org.apache.flink.table.functions.FunctionRequirement;
+import org.apache.flink.table.functions.ProcessTableFunction;
+import org.apache.flink.table.functions.SpecializedFunction.SpecializedContext;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.inference.CallContext;
+import org.apache.flink.table.types.inference.StateTypeStrategy;
+import org.apache.flink.table.types.inference.TypeInference;
+import org.apache.flink.table.types.inference.TypeStrategies;
+import org.apache.flink.table.types.utils.DataTypeUtils;
+
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Base class for built-in process table functions that are constructed from {@link
+ * BuiltInFunctionDefinition#specialize(SpecializedContext)}.
+ *
+ * <p>Subclasses must offer a constructor that takes {@link SpecializedContext}.
+ *
+ * <p>By default, all built-in PTFs work on internal data structures. All argument types, output
+ * type, and state types are converted to internal representation (e.g. {@code RowData} instead of
+ * {@code Row}, {@code MapData} instead of {@code Map}).
+ */
+@Internal
+public abstract class BuiltInProcessTableFunction<T> extends ProcessTableFunction<T> {
+
+    private final transient BuiltInFunctionDefinition definition;
+
+    private final transient List<DataType> argumentDataTypes;
+
+    private final transient DataType internalOutputDataType;
+
+    private final transient LinkedHashMap<String, StateTypeStrategy> internalStateStrategies;
+
+    protected BuiltInProcessTableFunction(
+            final BuiltInFunctionDefinition definition, final SpecializedContext context) {
+        this.definition = definition;
+        final CallContext callContext = context.getCallContext();
+        final TypeInference definitionTypeInference =
+                definition.getTypeInference(callContext.getDataTypeFactory());
+
+        this.argumentDataTypes =
+                callContext.getArgumentDataTypes().stream()
+                        .map(DataTypeUtils::toInternalDataType)
+                        .collect(Collectors.toList());
+
+        this.internalOutputDataType =
+                callContext
+                        .getOutputDataType()
+                        .map(DataTypeUtils::toInternalDataType)
+                        .orElseThrow(IllegalStateException::new);
+
+        this.internalStateStrategies =
+                toInternalStateStrategies(definitionTypeInference, callContext);
+    }
+
+    // Uses deprecated typedArguments() because staticArguments() does not apply the conversion
+    // class to TABLE arguments in TypeInferenceUtil.inferInputTypes, and we want to use internal
+    // types in our built in functions.
+    @SuppressWarnings("deprecation")
+    @Override
+    public TypeInference getTypeInference(final DataTypeFactory typeFactory) {
+        final TypeInference definitionTypeInference = definition.getTypeInference(typeFactory);
+
+        return TypeInference.newBuilder()
+                .typedArguments(argumentDataTypes)
+                .stateTypeStrategies(internalStateStrategies)
+                .outputTypeStrategy(TypeStrategies.explicit(internalOutputDataType))
+                .disableSystemArguments(definitionTypeInference.disableSystemArguments())
+                .build();
+    }
+
+    /** Wraps each state type strategy to produce internal data types, preserving TTL. */
+    private static LinkedHashMap<String, StateTypeStrategy> toInternalStateStrategies(
+            final TypeInference definitionTypeInference, final CallContext callContext) {
+        final LinkedHashMap<String, StateTypeStrategy> result = new LinkedHashMap<>();
+        definitionTypeInference
+                .getStateTypeStrategies()
+                .forEach(
+                        (name, strategy) ->
+                                result.put(
+                                        name,
+                                        StateTypeStrategy.of(
+                                                ctx ->
+                                                        strategy.inferType(ctx)
+                                                                .map(
+                                                                        DataTypeUtils
+                                                                                ::toInternalDataType),
+                                                strategy.getTimeToLive(callContext).orElse(null))));
+        return result;
+    }
+
+    @Override
+    public Set<FunctionRequirement> getRequirements() {
+        if (definition != null) {
+            return definition.getRequirements();
+        }
+        return super.getRequirements();
+    }
+
+    @Override
+    public boolean isDeterministic() {
+        if (definition != null) {
+            return definition.isDeterministic();
+        }
+        return super.isDeterministic();
+    }
+}
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/ToChangelogFunction.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/ToChangelogFunction.java
new file mode 100644
index 00000000000..75211e164f5
--- /dev/null
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/ToChangelogFunction.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.ptf;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.utils.JoinedRowData;
+import org.apache.flink.table.data.utils.ProjectedRowData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.SpecializedFunction.SpecializedContext;
+import org.apache.flink.table.functions.TableSemantics;
+import org.apache.flink.table.types.inference.CallContext;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.ColumnList;
+import org.apache.flink.types.RowKind;
+
+import javax.annotation.Nullable;
+
+import java.util.EnumMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * Runtime implementation of {@link BuiltInFunctionDefinitions#TO_CHANGELOG}.
+ *
+ * <p>Converts each input row into an INSERT-only output row with an operation code column. The
+ * output schema is {@code [op_column, ...non_partition_columns...]} - the framework prepends
+ * partition key columns automatically.
+ *
+ * <p>Uses {@link ProjectedRowData} for zero-copy projection of non-partition columns and {@link
+ * JoinedRowData} to combine the op column with the projected input.
+ */
+@Internal
+public class ToChangelogFunction extends BuiltInProcessTableFunction<RowData> {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final Map<RowKind, String> DEFAULT_OP_MAPPING =
+            Map.of(
+                    RowKind.INSERT, "INSERT",
+                    RowKind.UPDATE_BEFORE, "UPDATE_BEFORE",
+                    RowKind.UPDATE_AFTER, "UPDATE_AFTER",
+                    RowKind.DELETE, "DELETE");
+
+    private final Map<RowKind, String> rawOpMap;
+    private final int[] nonPartitionIndices;
+
+    private transient Map<RowKind, StringData> opMap;
+    private transient ProjectedRowData projectedInput;
+    private transient GenericRowData opRow;
+    private transient JoinedRowData output;
+
+    @SuppressWarnings("unchecked")
+    public ToChangelogFunction(final SpecializedContext context) {
+        super(BuiltInFunctionDefinitions.TO_CHANGELOG, context);
+        final CallContext callContext = context.getCallContext();
+
+        final TableSemantics semantics =
+                callContext
+                        .getTableSemantics(0)
+                        .orElseThrow(() -> new IllegalStateException("Table argument expected."));
+        final int[] partitionKeys = semantics.partitionByColumns();
+        final Set<Integer> partitionKeySet =
+                IntStream.of(partitionKeys).boxed().collect(Collectors.toSet());
+
+        final RowType inputType = (RowType) semantics.dataType().getLogicalType();
+        this.nonPartitionIndices =
+                buildNonPartitionIndices(inputType.getFieldCount(), partitionKeySet);
+
+        final Map<String, String> opMapping =
+                callContext.getArgumentValue(2, Map.class).orElse(null);
+        this.rawOpMap = buildOpMap(opMapping);
+    }
+
+    @Override
+    public void open(final FunctionContext context) throws Exception {
+        super.open(context);
+        opMap = new EnumMap<>(RowKind.class);
+        rawOpMap.forEach((kind, code) -> opMap.put(kind, StringData.fromString(code)));
+        projectedInput = ProjectedRowData.from(nonPartitionIndices);
+        opRow = new GenericRowData(1);
+        output = new JoinedRowData();
+    }
+
+    private static int[] buildNonPartitionIndices(
+            final int fieldCount, final Set<Integer> partitionKeySet) {
+        return IntStream.range(0, fieldCount).filter(i -> !partitionKeySet.contains(i)).toArray();
+    }
+
+    private static Map<RowKind, String> buildOpMap(@Nullable final Map<String, String> opMapping) {
+        if (opMapping == null) {
+            return new EnumMap<>(DEFAULT_OP_MAPPING);
+        }
+        final Map<RowKind, String> map = new EnumMap<>(RowKind.class);
+        opMapping.forEach((name, code) -> map.put(RowKind.valueOf(name), code));
+        return map;
+    }
+
+    public void eval(
+            final Context ctx,
+            final RowData input,
+            @Nullable final ColumnList op,
+            @Nullable final MapData opMapping) {
+        final StringData opCode = opMap.get(input.getRowKind());
+        if (opCode == null) {
+            return;
+        }
+
+        opRow.setField(0, opCode);
+        projectedInput.replaceRow(input);
+        collect(output.replace(opRow, projectedInput));
+    }
+}
