This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new bcb3682605e [FLINK-39419][table] Switch TO_CHANGELOG to row semantics with full deletes + require update before
bcb3682605e is described below

commit bcb3682605e46c23c074fe2d31f0639cc22ff212
Author: Gustavo de Morais <[email protected]>
AuthorDate: Tue Apr 14 09:38:44 2026 +0200

    [FLINK-39419][table] Switch TO_CHANGELOG to row semantics with full deletes + require update before
    
    This closes #27911.
---
 .../docs/sql/reference/queries/changelog.md        |  32 +++---
 .../pyflink/table/tests/test_table_completeness.py |   1 +
 .../apache/flink/table/api/PartitionedTable.java   |  34 ------
 .../java/org/apache/flink/table/api/Table.java     |  32 ++++++
 .../apache/flink/table/api/internal/TableImpl.java |  10 +-
 .../functions/BuiltInFunctionDefinitions.java      |  11 +-
 .../strategies/ToChangelogTypeStrategy.java        |  13 +--
 .../inference/CallBindingCallContext.java          |  18 +++-
 .../exec/stream/ToChangelogSemanticTests.java      |   2 +-
 .../nodes/exec/stream/ToChangelogTestPrograms.java | 115 ++++++++++++---------
 .../planner/plan/stream/sql/ToChangelogTest.java   |   9 +-
 .../planner/plan/stream/sql/ToChangelogTest.xml    |  49 ++++++---
 .../runtime/functions/ptf/ToChangelogFunction.java |  35 +------
 13 files changed, 187 insertions(+), 174 deletions(-)

diff --git a/docs/content/docs/sql/reference/queries/changelog.md 
b/docs/content/docs/sql/reference/queries/changelog.md
index 15ad336f4d0..61843ab5c2f 100644
--- a/docs/content/docs/sql/reference/queries/changelog.md
+++ b/docs/content/docs/sql/reference/queries/changelog.md
@@ -44,7 +44,7 @@ This is useful when you need to materialize changelog events 
into a downstream s
 
 ```sql
 SELECT * FROM TO_CHANGELOG(
-  input => TABLE source_table PARTITION BY key_col,
+  input => TABLE source_table,
   [op => DESCRIPTOR(op_column_name),]
   [op_mapping => MAP['INSERT', 'I', 'DELETE', 'D', ...]]
 )
@@ -54,7 +54,7 @@ SELECT * FROM TO_CHANGELOG(
 
 | Parameter    | Required | Description |
 |:-------------|:---------|:------------|
-| `input`      | Yes      | The input table. Must include `PARTITION BY` for 
parallel execution. Accepts insert-only, retract, and upsert tables. |
+| `input`      | Yes      | The input table. Accepts insert-only, retract, and 
upsert tables. |
 | `op`         | No       | A `DESCRIPTOR` with a single column name for the 
operation code column. Defaults to `op`. |
 | `op_mapping` | No       | A `MAP<STRING, STRING>` mapping change operation 
names to custom output codes. Keys can contain comma-separated names to map 
multiple operations to the same code (e.g., `'INSERT, UPDATE_AFTER'`). When 
provided, only mapped operations are forwarded - unmapped events are dropped. 
Each change operation may appear at most once across all entries. |
 
@@ -74,7 +74,7 @@ When `op_mapping` is omitted, all four change operations are 
mapped to their sta
 The output columns are ordered as:
 
 ```
-[partition_key_columns, op_column, remaining_columns]
+[op_column, all_input_columns]
 ```
 
 All output rows have `INSERT` - the table is always append-only.
@@ -85,25 +85,27 @@ All output rows have `INSERT` - the table is always append-only.
 
 ```sql
 -- Input: retract table from an aggregation
--- +I[id:1, name:'Alice', cnt:1]
--- +U[id:1, name:'Alice', cnt:2]
--- -D[id:2, name:'Bob',   cnt:1]
+-- +I[name:'Alice', cnt:1]
+-- -U[name:'Alice', cnt:1]
+-- +U[name:'Alice', cnt:2]
+-- -D[name:'Bob',   cnt:1]
 
 SELECT * FROM TO_CHANGELOG(
-  input => TABLE my_aggregation PARTITION BY id
+  input => TABLE my_aggregation
 )
 
 -- Output (append-only):
--- +I[id:1, op:'INSERT',       name:'Alice', cnt:1]
--- +I[id:1, op:'UPDATE_AFTER', name:'Alice', cnt:2]
--- +I[id:2, op:'DELETE',       name:'Bob',   cnt:1]
+-- +I[op:'INSERT',        name:'Alice', cnt:1]
+-- +I[op:'UPDATE_BEFORE', name:'Alice', cnt:1]
+-- +I[op:'UPDATE_AFTER',  name:'Alice', cnt:2]
+-- +I[op:'DELETE',        name:'Bob',   cnt:1]
 ```
 
 #### Custom operation column name
 
 ```sql
 SELECT * FROM TO_CHANGELOG(
-  input => TABLE my_aggregation PARTITION BY id,
+  input => TABLE my_aggregation,
   op => DESCRIPTOR(operation)
 )
 -- The op column is now named 'operation' instead of 'op'
@@ -113,7 +113,7 @@ SELECT * FROM TO_CHANGELOG(
 
 ```sql
 SELECT * FROM TO_CHANGELOG(
-  input => TABLE my_aggregation PARTITION BY id,
+  input => TABLE my_aggregation,
   op => DESCRIPTOR(op_code),
   op_mapping => MAP['INSERT', 'I', 'UPDATE_AFTER', 'U']
 )
@@ -126,7 +126,7 @@ SELECT * FROM TO_CHANGELOG(
 
 ```sql
 SELECT * FROM TO_CHANGELOG(
-  input => TABLE my_aggregation PARTITION BY id,
+  input => TABLE my_aggregation,
   op => DESCRIPTOR(deleted),
   op_mapping => MAP['INSERT, UPDATE_AFTER', 'false', 'DELETE', 'true']
 )
@@ -139,16 +139,16 @@ SELECT * FROM TO_CHANGELOG(
 
 ```java
 // Default: adds 'op' column and supports all changelog modes
-Table result = myTable.partitionBy($("id")).toChangelog();
+Table result = myTable.toChangelog();
 
 // With custom parameters
-Table result = myTable.partitionBy($("id")).toChangelog(
+Table result = myTable.toChangelog(
     descriptor("op_code").asArgument("op"),
     map("INSERT", "I", "UPDATE_AFTER", "U").asArgument("op_mapping")
 );
 
 // Deletion flag pattern: comma-separated keys map multiple change operations 
to the same code
-Table result = myTable.partitionBy($("id")).toChangelog(
+Table result = myTable.toChangelog(
     descriptor("deleted").asArgument("op"),
     map("INSERT, UPDATE_AFTER", "false", "DELETE", 
"true").asArgument("op_mapping")
 );
diff --git a/flink-python/pyflink/table/tests/test_table_completeness.py 
b/flink-python/pyflink/table/tests/test_table_completeness.py
index feca7e63b1a..21bb038aef3 100644
--- a/flink-python/pyflink/table/tests/test_table_completeness.py
+++ b/flink-python/pyflink/table/tests/test_table_completeness.py
@@ -42,6 +42,7 @@ class 
TableAPICompletenessTests(PythonAPICompletenessTestCase, PyFlinkTestCase):
             'asArgument',
             'process',
             'partitionBy',
+            'toChangelog',
         }
 
     @classmethod
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PartitionedTable.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PartitionedTable.java
index 7db750a296d..32180e51fb9 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PartitionedTable.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PartitionedTable.java
@@ -20,7 +20,6 @@ package org.apache.flink.table.api;
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.table.annotation.ArgumentTrait;
-import org.apache.flink.table.expressions.Expression;
 import org.apache.flink.table.functions.ProcessTableFunction;
 import org.apache.flink.table.functions.UserDefinedFunction;
 
@@ -139,37 +138,4 @@ public interface PartitionedTable {
      * @see ProcessTableFunction
      */
     Table process(Class<? extends UserDefinedFunction> function, Object... 
arguments);
-
-    /**
-     * Converts this dynamic table into an append-only table with an explicit 
operation code column
-     * using the built-in {@code TO_CHANGELOG} process table function.
-     *
-     * <p>Each input row - regardless of its original change operation - is 
emitted as an
-     * INSERT-only row with a string {@code "op"} column indicating the 
original operation (INSERT,
-     * UPDATE_AFTER, DELETE, etc.).
-     *
-     * <p>Optional arguments can be passed using named expressions:
-     *
-     * <pre>{@code
-     * // Default: adds 'op' column and supports all changelog modes
-     * table.partitionBy($("id")).toChangelog();
-     *
-     * // Custom op column name and mapping
-     * table.partitionBy($("id")).toChangelog(
-     *     descriptor("op_code").asArgument("op"),
-     *     map("INSERT", "I", "UPDATE_AFTER", "U").asArgument("op_mapping")
-     * );
-     *
-     * // Deletion flag pattern: comma-separated keys map multiple change 
operations to the same code
-     * table.partitionBy($("id")).toChangelog(
-     *     descriptor("deleted").asArgument("op"),
-     *     map("INSERT, UPDATE_AFTER", "false", "DELETE", 
"true").asArgument("op_mapping")
-     * );
-     * }</pre>
-     *
-     * @param arguments optional named arguments for {@code op} and {@code 
op_mapping}
-     * @return an append-only {@link Table} with an {@code op} column 
prepended to the non-partition
-     *     columns
-     */
-    Table toChangelog(Expression... arguments);
 }
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java
index 2471b91227d..ef1785aecdd 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java
@@ -1422,4 +1422,36 @@ public interface Table extends Explainable<Table>, 
Executable {
      * @see ProcessTableFunction
      */
     Table process(Class<? extends UserDefinedFunction> function, Object... 
arguments);
+
+    /**
+     * Converts this dynamic table into an append-only table with an explicit 
operation code column
+     * using the built-in {@code TO_CHANGELOG} process table function.
+     *
+     * <p>Each input row - regardless of its original change operation - is 
emitted as an
+     * INSERT-only row with a string {@code "op"} column indicating the 
original operation (INSERT,
+     * UPDATE_AFTER, DELETE, etc.).
+     *
+     * <p>Optional arguments can be passed using named expressions:
+     *
+     * <pre>{@code
+     * // Default: adds 'op' column and supports all changelog modes
+     * table.toChangelog();
+     *
+     * // Custom op column name and mapping
+     * table.toChangelog(
+     *     descriptor("op_code").asArgument("op"),
+     *     map("INSERT", "I", "UPDATE_AFTER", "U").asArgument("op_mapping")
+     * );
+     *
+     * // Deletion flag pattern: comma-separated keys map multiple change 
operations to the same code
+     * table.toChangelog(
+     *     descriptor("deleted").asArgument("op"),
+     *     map("INSERT, UPDATE_AFTER", "false", "DELETE", 
"true").asArgument("op_mapping")
+     * );
+     * }</pre>
+     *
+     * @param arguments optional named arguments for {@code op} and {@code 
op_mapping}
+     * @return an append-only {@link Table} with an {@code op} column 
prepended to the input columns
+     */
+    Table toChangelog(Expression... arguments);
 }
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java
index 335f28f2f8e..73d70a83dfd 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java
@@ -514,6 +514,11 @@ public class TableImpl implements Table {
                 function, unionTableAndArguments(operationTree, 
tableEnvironment, arguments));
     }
 
+    @Override
+    public Table toChangelog(Expression... arguments) {
+        return process(BuiltInFunctionDefinitions.TO_CHANGELOG.getName(), 
(Object[]) arguments);
+    }
+
     private TablePipeline insertInto(
             ContextResolvedTable contextResolvedTable,
             @Nullable InsertConflictStrategy conflictStrategy,
@@ -901,11 +906,6 @@ public class TableImpl implements Table {
                             createPartitionQueryOperation(), 
table.tableEnvironment, arguments));
         }
 
-        @Override
-        public Table toChangelog(Expression... arguments) {
-            return process(BuiltInFunctionDefinitions.TO_CHANGELOG.getName(), 
(Object[]) arguments);
-        }
-
         private QueryOperation createPartitionQueryOperation() {
             return table.operationTreeBuilder.partition(partitionKeys, 
table.operationTree);
         }
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
index e8783ba51c8..0f2f35a056a 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
@@ -782,15 +782,22 @@ public final class BuiltInFunctionDefinitions {
                     .name("TO_CHANGELOG")
                     .kind(PROCESS_TABLE)
                     .staticArguments(
+                            // Row semantics (no PARTITION BY). Accepts 
updating
+                            // inputs. The planner inserts ChangelogNormalize 
for
+                            // upsert sources to produce UPDATE_BEFORE and full
+                            // DELETE rows.
                             StaticArgument.table(
                                     "input",
                                     Row.class,
                                     false,
                                     EnumSet.of(
                                             StaticArgumentTrait.TABLE,
-                                            
StaticArgumentTrait.SET_SEMANTIC_TABLE,
+                                            
StaticArgumentTrait.ROW_SEMANTIC_TABLE,
                                             
StaticArgumentTrait.SUPPORT_UPDATES,
-                                            
StaticArgumentTrait.REQUIRE_UPDATE_BEFORE)),
+                                            
StaticArgumentTrait.REQUIRE_UPDATE_BEFORE,
+                                            // Not strictly necessary but 
explicitly state that
+                                            // we require full deletes.
+                                            
StaticArgumentTrait.REQUIRE_FULL_DELETE)),
                             StaticArgument.scalar("op", 
DataTypes.DESCRIPTOR(), true),
                             StaticArgument.scalar(
                                     "op_mapping",
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToChangelogTypeStrategy.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToChangelogTypeStrategy.java
index d13d52a49c7..c910885d69e 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToChangelogTypeStrategy.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToChangelogTypeStrategy.java
@@ -40,8 +40,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
 
 /** Type strategies for the {@code TO_CHANGELOG} process table function. */
 @Internal
@@ -98,15 +96,10 @@ public final class ToChangelogTypeStrategy {
 
                 final String opColumnName = resolveOpColumnName(callContext);
                 final List<Field> inputFields = 
DataType.getFields(semantics.dataType());
-                final Set<Integer> partitionKeys = 
intArrayToSet(semantics.partitionByColumns());
 
                 final List<Field> outputFields = new ArrayList<>();
                 outputFields.add(DataTypes.FIELD(opColumnName, 
DataTypes.STRING()));
-                for (int i = 0; i < inputFields.size(); i++) {
-                    if (!partitionKeys.contains(i)) {
-                        outputFields.add(inputFields.get(i));
-                    }
-                }
+                outputFields.addAll(inputFields);
 
                 return Optional.of(DataTypes.ROW(outputFields).notNull());
             };
@@ -199,9 +192,5 @@ public final class ToChangelogTypeStrategy {
                 .orElse(DEFAULT_OP_COLUMN_NAME);
     }
 
-    private static Set<Integer> intArrayToSet(final int[] array) {
-        return IntStream.of(array).boxed().collect(Collectors.toSet());
-    }
-
     private ToChangelogTypeStrategy() {}
 }
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/CallBindingCallContext.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/CallBindingCallContext.java
index e5968ce21fa..01c569a19ef 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/CallBindingCallContext.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/CallBindingCallContext.java
@@ -368,9 +368,11 @@ public final class CallBindingCallContext extends 
AbstractSqlCallContext {
         final Map<String, String> map = new LinkedHashMap<>();
         try {
             for (int i = 0; i < operands.size(); i += 2) {
-                final String key = 
SqlLiteral.unchain(operands.get(i)).getValueAs(String.class);
+                final String key =
+                        
SqlLiteral.unchain(unwrapCast(operands.get(i))).getValueAs(String.class);
                 final String value =
-                        SqlLiteral.unchain(operands.get(i + 
1)).getValueAs(String.class);
+                        SqlLiteral.unchain(unwrapCast(operands.get(i + 1)))
+                                .getValueAs(String.class);
                 map.put(key, value);
             }
         } catch (Exception e) {
@@ -380,6 +382,18 @@ public final class CallBindingCallContext extends 
AbstractSqlCallContext {
         return map;
     }
 
+    /** Unwraps implicit CHAR-type CASTs added by Calcite for length 
normalization. */
+    private static SqlNode unwrapCast(final SqlNode node) {
+        if (node.getKind() == SqlKind.CAST && node instanceof SqlCall) {
+            final SqlNode inner = ((SqlCall) node).operand(0);
+            if (inner instanceof SqlLiteral
+                    && SqlTypeName.CHAR_TYPES.contains(((SqlLiteral) 
inner).getTypeName())) {
+                return inner;
+            }
+        }
+        return node;
+    }
+
     /** A MAP constructor is a string literal if all its key-value children 
are string literals. */
     private static boolean isLiteralMap(SqlNode sqlNode) {
         if (sqlNode.getKind() != SqlKind.MAP_VALUE_CONSTRUCTOR) {
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogSemanticTests.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogSemanticTests.java
index 706f7abd1d3..da3c9270eba 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogSemanticTests.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogSemanticTests.java
@@ -41,13 +41,13 @@ public class ToChangelogSemanticTests extends 
SemanticTestBase {
         return List.of(
                 ToChangelogTestPrograms.INSERT_ONLY_INPUT,
                 ToChangelogTestPrograms.UPDATING_INPUT,
+                ToChangelogTestPrograms.UPSERT_INPUT,
                 ToChangelogTestPrograms.CUSTOM_OP_MAPPING,
                 ToChangelogTestPrograms.CUSTOM_OP_NAME,
                 ToChangelogTestPrograms.TABLE_API_DEFAULT,
                 ToChangelogTestPrograms.LAG_ON_UPSERT_VIA_CHANGELOG,
                 ToChangelogTestPrograms.LAG_ON_RETRACT_VIA_CHANGELOG,
                 ToChangelogTestPrograms.DELETION_FLAG,
-                ToChangelogTestPrograms.MISSING_PARTITION_BY,
                 ToChangelogTestPrograms.INVALID_DESCRIPTOR,
                 ToChangelogTestPrograms.INVALID_OP_MAPPING,
                 ToChangelogTestPrograms.DUPLICATE_ROW_KIND);
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java
index bfc67097112..4056b652a79 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java
@@ -28,8 +28,6 @@ import org.apache.flink.types.RowKind;
 
 import java.time.Instant;
 
-import static org.apache.flink.table.api.Expressions.$;
-
 /** {@link TableTestProgram} definitions for testing the built-in TO_CHANGELOG 
PTF. */
 public class ToChangelogTestPrograms {
 
@@ -56,11 +54,10 @@ public class ToChangelogTestPrograms {
                                     .build())
                     .setupTableSink(
                             SinkTestStep.newBuilder("sink")
-                                    .addSchema("id INT", "op STRING", "name 
STRING")
-                                    .consumedValues("+I[1, INSERT, Alice]", 
"+I[2, INSERT, Bob]")
+                                    .addSchema("op STRING", "id INT", "name 
STRING")
+                                    .consumedValues("+I[INSERT, 1, Alice]", 
"+I[INSERT, 2, Bob]")
                                     .build())
-                    .runSql(
-                            "INSERT INTO sink SELECT * FROM TO_CHANGELOG(input 
=> TABLE t PARTITION BY id)")
+                    .runSql("INSERT INTO sink SELECT * FROM TO_CHANGELOG(input 
=> TABLE t)")
                     .build();
 
     public static final TableTestProgram UPDATING_INPUT =
@@ -81,16 +78,45 @@ public class ToChangelogTestPrograms {
                                     .build())
                     .setupTableSink(
                             SinkTestStep.newBuilder("sink")
-                                    .addSchema("name STRING", "op STRING", 
"score BIGINT")
+                                    .addSchema("op STRING", "name STRING", 
"score BIGINT")
                                     .consumedValues(
-                                            "+I[Alice, INSERT, 10]",
-                                            "+I[Bob, INSERT, 20]",
-                                            "+I[Alice, UPDATE_BEFORE, 10]",
-                                            "+I[Alice, UPDATE_AFTER, 30]",
-                                            "+I[Bob, DELETE, 20]")
+                                            "+I[INSERT, Alice, 10]",
+                                            "+I[INSERT, Bob, 20]",
+                                            "+I[UPDATE_BEFORE, Alice, 10]",
+                                            "+I[UPDATE_AFTER, Alice, 30]",
+                                            "+I[DELETE, Bob, 20]")
                                     .build())
-                    .runSql(
-                            "INSERT INTO sink SELECT * FROM TO_CHANGELOG(input 
=> TABLE t PARTITION BY name)")
+                    .runSql("INSERT INTO sink SELECT * FROM TO_CHANGELOG(input 
=> TABLE t)")
+                    .build();
+
+    public static final TableTestProgram UPSERT_INPUT =
+            TableTestProgram.of(
+                            "to-changelog-upsert-input",
+                            "upsert input gets ChangelogNormalize for 
UPDATE_BEFORE and full deletes")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("t")
+                                    .addSchema(
+                                            "name STRING PRIMARY KEY NOT 
ENFORCED", "score BIGINT")
+                                    .addMode(ChangelogMode.upsert())
+                                    .producedValues(
+                                            Row.ofKind(RowKind.INSERT, 
"Alice", 10L),
+                                            Row.ofKind(RowKind.INSERT, "Bob", 
20L),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
"Alice", 30L),
+                                            // Key-only delete: 
ChangelogNormalize fills
+                                            // in the full row from state
+                                            Row.ofKind(RowKind.DELETE, "Bob", 
null))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink")
+                                    .addSchema("op STRING", "name STRING", 
"score BIGINT")
+                                    .consumedValues(
+                                            "+I[INSERT, Alice, 10]",
+                                            "+I[INSERT, Bob, 20]",
+                                            "+I[UPDATE_BEFORE, Alice, 10]",
+                                            "+I[UPDATE_AFTER, Alice, 30]",
+                                            "+I[DELETE, Bob, 20]")
+                                    .build())
+                    .runSql("INSERT INTO sink SELECT * FROM TO_CHANGELOG(input 
=> TABLE t)")
                     .build();
 
     public static final TableTestProgram CUSTOM_OP_MAPPING =
@@ -111,15 +137,15 @@ public class ToChangelogTestPrograms {
                                     .build())
                     .setupTableSink(
                             SinkTestStep.newBuilder("sink")
-                                    .addSchema("name STRING", "op_code 
STRING", "score BIGINT")
+                                    .addSchema("op_code STRING", "name 
STRING", "score BIGINT")
                                     .consumedValues(
-                                            "+I[Alice, I, 10]",
-                                            "+I[Bob, I, 20]",
-                                            "+I[Alice, U, 30]")
+                                            "+I[I, Alice, 10]",
+                                            "+I[I, Bob, 20]",
+                                            "+I[U, Alice, 30]")
                                     .build())
                     .runSql(
                             "INSERT INTO sink SELECT * FROM TO_CHANGELOG("
-                                    + "input => TABLE t PARTITION BY name, "
+                                    + "input => TABLE t, "
                                     + "op => DESCRIPTOR(op_code), "
                                     + "op_mapping => MAP['INSERT','I', 
'UPDATE_AFTER','U'])")
                     .build();
@@ -135,12 +161,12 @@ public class ToChangelogTestPrograms {
                                     .build())
                     .setupTableSink(
                             SinkTestStep.newBuilder("sink")
-                                    .addSchema("id INT", "operation STRING", 
"name STRING")
-                                    .consumedValues("+I[1, INSERT, Alice]")
+                                    .addSchema("operation STRING", "id INT", 
"name STRING")
+                                    .consumedValues("+I[INSERT, 1, Alice]")
                                     .build())
                     .runSql(
                             "INSERT INTO sink SELECT * FROM TO_CHANGELOG("
-                                    + "input => TABLE t PARTITION BY id, "
+                                    + "input => TABLE t, "
                                     + "op => DESCRIPTOR(operation))")
                     .build();
 
@@ -151,7 +177,7 @@ public class ToChangelogTestPrograms {
     public static final TableTestProgram TABLE_API_DEFAULT =
             TableTestProgram.of(
                             "to-changelog-table-api-default",
-                            "PartitionedTable.toChangelog() convenience 
method")
+                            "Table.toChangelog() convenience method")
                     .setupTableSource(
                             SourceTestStep.newBuilder("t")
                                     .addSchema("id INT", "name STRING")
@@ -162,10 +188,10 @@ public class ToChangelogTestPrograms {
                                     .build())
                     .setupTableSink(
                             SinkTestStep.newBuilder("sink")
-                                    .addSchema("id INT", "op STRING", "name 
STRING")
-                                    .consumedValues("+I[1, INSERT, Alice]", 
"+I[2, INSERT, Bob]")
+                                    .addSchema("op STRING", "id INT", "name 
STRING")
+                                    .consumedValues("+I[INSERT, 1, Alice]", 
"+I[INSERT, 2, Bob]")
                                     .build())
-                    .runTableApi(env -> 
env.from("t").partitionBy($("id")).toChangelog(), "sink")
+                    .runTableApi(env -> env.from("t").toChangelog(), "sink")
                     .build();
 
     // 
--------------------------------------------------------------------------------------------
@@ -219,8 +245,8 @@ public class ToChangelogTestPrograms {
                                     .build())
                     .setupSql(
                             "CREATE VIEW orders_changelog AS "
-                                    + "SELECT order_id, op, status, ts FROM 
TO_CHANGELOG("
-                                    + "  input => TABLE orders PARTITION BY 
order_id, "
+                                    + "SELECT op, order_id, status, ts FROM 
TO_CHANGELOG("
+                                    + "  input => TABLE orders, "
                                     + "  op_mapping => MAP['INSERT', 'INSERT', 
'UPDATE_AFTER', 'UPDATE_AFTER'])")
                     .setupTableSink(
                             SinkTestStep.newBuilder("sink")
@@ -299,8 +325,8 @@ public class ToChangelogTestPrograms {
                                     .build())
                     .setupSql(
                             "CREATE VIEW orders_changelog AS "
-                                    + "SELECT order_id, op, status, ts FROM 
TO_CHANGELOG("
-                                    + "  input => TABLE orders PARTITION BY 
order_id, "
+                                    + "SELECT op, order_id, status, ts FROM 
TO_CHANGELOG("
+                                    + "  input => TABLE orders, "
                                     + "  op_mapping => MAP['INSERT', 'INSERT', 
'UPDATE_AFTER', 'UPDATE_AFTER'])")
                     .setupTableSink(
                             SinkTestStep.newBuilder("sink")
@@ -348,16 +374,16 @@ public class ToChangelogTestPrograms {
                                     .build())
                     .setupTableSink(
                             SinkTestStep.newBuilder("sink")
-                                    .addSchema("name STRING", "deleted 
STRING", "score BIGINT")
+                                    .addSchema("deleted STRING", "name 
STRING", "score BIGINT")
                                     .consumedValues(
-                                            "+I[Alice, false, 10]",
-                                            "+I[Bob, false, 20]",
-                                            "+I[Alice, false, 30]",
-                                            "+I[Bob, true, 20]")
+                                            "+I[false, Alice, 10]",
+                                            "+I[false, Bob, 20]",
+                                            "+I[false, Alice, 30]",
+                                            "+I[true, Bob, 20]")
                                     .build())
                     .runSql(
                             "INSERT INTO sink SELECT * FROM TO_CHANGELOG("
-                                    + "input => TABLE t PARTITION BY name, "
+                                    + "input => TABLE t, "
                                     + "op => DESCRIPTOR(deleted), "
                                     + "op_mapping => MAP['INSERT, 
UPDATE_AFTER', 'false', 'DELETE', 'true'])")
                     .build();
@@ -366,17 +392,6 @@ public class ToChangelogTestPrograms {
     // Error validation tests
     // 
--------------------------------------------------------------------------------------------
 
-    public static final TableTestProgram MISSING_PARTITION_BY =
-            TableTestProgram.of(
-                            "to-changelog-missing-partition-by",
-                            "fails when PARTITION BY is missing")
-                    .setupTableSource(SIMPLE_SOURCE)
-                    .runFailingSql(
-                            "SELECT * FROM TO_CHANGELOG(input => TABLE t)",
-                            ValidationException.class,
-                            "Table argument 'input' requires a PARTITION BY 
clause for parallel processing.")
-                    .build();
-
     public static final TableTestProgram INVALID_DESCRIPTOR =
             TableTestProgram.of(
                             "to-changelog-invalid-descriptor",
@@ -384,7 +399,7 @@ public class ToChangelogTestPrograms {
                     .setupTableSource(SIMPLE_SOURCE)
                     .runFailingSql(
                             "SELECT * FROM TO_CHANGELOG("
-                                    + "input => TABLE t PARTITION BY id, "
+                                    + "input => TABLE t, "
                                     + "op => DESCRIPTOR(a, b))",
                             ValidationException.class,
                             "The descriptor for argument 'op' must contain 
exactly one column name.")
@@ -397,7 +412,7 @@ public class ToChangelogTestPrograms {
                     .setupTableSource(SIMPLE_SOURCE)
                     .runFailingSql(
                             "SELECT * FROM TO_CHANGELOG("
-                                    + "input => TABLE t PARTITION BY id, "
+                                    + "input => TABLE t, "
                                     + "op_mapping => MAP['INVALID_KIND', 
'X'])",
                             ValidationException.class,
                             "Unknown change operation: 'INVALID_KIND'")
@@ -410,7 +425,7 @@ public class ToChangelogTestPrograms {
                     .setupTableSource(SIMPLE_SOURCE)
                     .runFailingSql(
                             "SELECT * FROM TO_CHANGELOG("
-                                    + "input => TABLE t PARTITION BY id, "
+                                    + "input => TABLE t, "
                                     + "op_mapping => MAP['INSERT, DELETE', 
'A', 'DELETE', 'B'])",
                             ValidationException.class,
                             "Duplicate change operation: 'DELETE'")
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ToChangelogTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ToChangelogTest.java
index 0d4380bcb2e..ee9576b2e75 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ToChangelogTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ToChangelogTest.java
@@ -57,8 +57,7 @@ public class ToChangelogTest extends TableTestBase {
                                 + "  'changelog-mode' = 'I,UB,UA,D'"
                                 + ")");
         util.verifyRelPlan(
-                "SELECT * FROM TO_CHANGELOG(input => TABLE retract_source 
PARTITION BY id)",
-                CHANGELOG_MODE);
+                "SELECT * FROM TO_CHANGELOG(input => TABLE retract_source)", 
CHANGELOG_MODE);
     }
 
     @Test
@@ -74,8 +73,7 @@ public class ToChangelogTest extends TableTestBase {
                                 + "  'changelog-mode' = 'I,UA,D'"
                                 + ")");
         util.verifyRelPlan(
-                "SELECT * FROM TO_CHANGELOG(input => TABLE upsert_source 
PARTITION BY id)",
-                CHANGELOG_MODE);
+                "SELECT * FROM TO_CHANGELOG(input => TABLE upsert_source)", 
CHANGELOG_MODE);
     }
 
     @Test
@@ -87,7 +85,6 @@ public class ToChangelogTest extends TableTestBase {
                                 + "  name STRING"
                                 + ") WITH ('connector' = 'values')");
         util.verifyRelPlan(
-                "SELECT * FROM TO_CHANGELOG(input => TABLE insert_only_source 
PARTITION BY id)",
-                CHANGELOG_MODE);
+                "SELECT * FROM TO_CHANGELOG(input => TABLE 
insert_only_source)", CHANGELOG_MODE);
     }
 }
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ToChangelogTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ToChangelogTest.xml
index a3a15dc8ef4..91c73153bee 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ToChangelogTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ToChangelogTest.xml
@@ -18,39 +18,60 @@ limitations under the License.
 <Root>
   <TestCase name="testInsertOnlySource">
     <Resource name="sql">
-      <![CDATA[SELECT * FROM TO_CHANGELOG(input => TABLE insert_only_source 
PARTITION BY id)]]>
+      <![CDATA[SELECT * FROM TO_CHANGELOG(input => TABLE insert_only_source)]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(op=[$0], id=[$1], name=[$2])
++- LogicalTableFunctionScan(invocation=[TO_CHANGELOG(TABLE(#0), DEFAULT(), 
DEFAULT(), DEFAULT(), DEFAULT())], rowType=[RecordType(VARCHAR(2147483647) op, 
INTEGER id, VARCHAR(2147483647) name)])
+   +- LogicalProject(id=[$0], name=[$1])
+      +- LogicalTableScan(table=[[default_catalog, default_database, 
insert_only_source]])
+]]>
     </Resource>
     <Resource name="optimized rel plan">
       <![CDATA[
-ProcessTableFunction(invocation=[TO_CHANGELOG(TABLE(#0) PARTITION BY($0), 
DEFAULT(), DEFAULT(), DEFAULT(), DEFAULT())], uid=[TO_CHANGELOG], 
select=[id,op,name], rowType=[RecordType(INTEGER id, VARCHAR(2147483647) op, 
VARCHAR(2147483647) name)], changelogMode=[I])
-+- Exchange(distribution=[hash[id]], changelogMode=[I])
-   +- TableSourceScan(table=[[default_catalog, default_database, 
insert_only_source]], fields=[id, name], changelogMode=[I])
+ProcessTableFunction(invocation=[TO_CHANGELOG(TABLE(#0), DEFAULT(), DEFAULT(), 
DEFAULT(), DEFAULT())], uid=[null], select=[op,id,name], 
rowType=[RecordType(VARCHAR(2147483647) op, INTEGER id, VARCHAR(2147483647) 
name)], changelogMode=[I])
++- TableSourceScan(table=[[default_catalog, default_database, 
insert_only_source]], fields=[id, name], changelogMode=[I])
 ]]>
     </Resource>
   </TestCase>
   <TestCase name="testRetractSource">
     <Resource name="sql">
-      <![CDATA[SELECT * FROM TO_CHANGELOG(input => TABLE retract_source 
PARTITION BY id)]]>
+      <![CDATA[SELECT * FROM TO_CHANGELOG(input => TABLE retract_source)]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(op=[$0], id=[$1], name=[$2])
++- LogicalTableFunctionScan(invocation=[TO_CHANGELOG(TABLE(#0), DEFAULT(), 
DEFAULT(), DEFAULT(), DEFAULT())], rowType=[RecordType(VARCHAR(2147483647) op, 
INTEGER id, VARCHAR(2147483647) name)])
+   +- LogicalProject(id=[$0], name=[$1])
+      +- LogicalTableScan(table=[[default_catalog, default_database, 
retract_source]])
+]]>
     </Resource>
     <Resource name="optimized rel plan">
       <![CDATA[
-ProcessTableFunction(invocation=[TO_CHANGELOG(TABLE(#0) PARTITION BY($0), 
DEFAULT(), DEFAULT(), DEFAULT(), DEFAULT())], uid=[TO_CHANGELOG], 
select=[id,op,name], rowType=[RecordType(INTEGER id, VARCHAR(2147483647) op, 
VARCHAR(2147483647) name)], changelogMode=[I])
-+- Exchange(distribution=[hash[id]], changelogMode=[I,UB,UA,D])
-   +- TableSourceScan(table=[[default_catalog, default_database, 
retract_source]], fields=[id, name], changelogMode=[I,UB,UA,D])
+ProcessTableFunction(invocation=[TO_CHANGELOG(TABLE(#0), DEFAULT(), DEFAULT(), 
DEFAULT(), DEFAULT())], uid=[null], select=[op,id,name], 
rowType=[RecordType(VARCHAR(2147483647) op, INTEGER id, VARCHAR(2147483647) 
name)], changelogMode=[I])
++- TableSourceScan(table=[[default_catalog, default_database, 
retract_source]], fields=[id, name], changelogMode=[I,UB,UA,D])
 ]]>
     </Resource>
   </TestCase>
   <TestCase name="testUpsertSource">
     <Resource name="sql">
-      <![CDATA[SELECT * FROM TO_CHANGELOG(input => TABLE upsert_source 
PARTITION BY id)]]>
+      <![CDATA[SELECT * FROM TO_CHANGELOG(input => TABLE upsert_source)]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(op=[$0], id=[$1], name=[$2])
++- LogicalTableFunctionScan(invocation=[TO_CHANGELOG(TABLE(#0), DEFAULT(), 
DEFAULT(), DEFAULT(), DEFAULT())], rowType=[RecordType(VARCHAR(2147483647) op, 
INTEGER id, VARCHAR(2147483647) name)])
+   +- LogicalProject(id=[$0], name=[$1])
+      +- LogicalTableScan(table=[[default_catalog, default_database, 
upsert_source]])
+]]>
     </Resource>
     <Resource name="optimized rel plan">
       <![CDATA[
-ProcessTableFunction(invocation=[TO_CHANGELOG(TABLE(#0) PARTITION BY($0), 
DEFAULT(), DEFAULT(), DEFAULT(), DEFAULT())], uid=[TO_CHANGELOG], 
select=[id,op,name], rowType=[RecordType(INTEGER id, VARCHAR(2147483647) op, 
VARCHAR(2147483647) name)], changelogMode=[I])
-+- Exchange(distribution=[hash[id]], changelogMode=[I,UB,UA,D])
-   +- ChangelogNormalize(key=[id], changelogMode=[I,UB,UA,D])
-      +- Exchange(distribution=[hash[id]], changelogMode=[I,UA,D])
-         +- TableSourceScan(table=[[default_catalog, default_database, 
upsert_source]], fields=[id, name], changelogMode=[I,UA,D])
+ProcessTableFunction(invocation=[TO_CHANGELOG(TABLE(#0), DEFAULT(), DEFAULT(), 
DEFAULT(), DEFAULT())], uid=[null], select=[op,id,name], 
rowType=[RecordType(VARCHAR(2147483647) op, INTEGER id, VARCHAR(2147483647) 
name)], changelogMode=[I])
++- ChangelogNormalize(key=[id], changelogMode=[I,UB,UA,D])
+   +- Exchange(distribution=[hash[id]], changelogMode=[I,UA,D])
+      +- TableSourceScan(table=[[default_catalog, default_database, 
upsert_source]], fields=[id, name], changelogMode=[I,UA,D])
 ]]>
     </Resource>
   </TestCase>
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/ToChangelogFunction.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/ToChangelogFunction.java
index f16ea33375f..247f565228b 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/ToChangelogFunction.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/ToChangelogFunction.java
@@ -24,13 +24,10 @@ import org.apache.flink.table.data.MapData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.StringData;
 import org.apache.flink.table.data.utils.JoinedRowData;
-import org.apache.flink.table.data.utils.ProjectedRowData;
 import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
 import org.apache.flink.table.functions.FunctionContext;
 import org.apache.flink.table.functions.SpecializedFunction.SpecializedContext;
-import org.apache.flink.table.functions.TableSemantics;
 import org.apache.flink.table.types.inference.CallContext;
-import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.types.ColumnList;
 import org.apache.flink.types.RowKind;
 
@@ -38,19 +35,14 @@ import javax.annotation.Nullable;
 
 import java.util.EnumMap;
 import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
 
 /**
  * Runtime implementation of {@link BuiltInFunctionDefinitions#TO_CHANGELOG}.
  *
  * <p>Converts each input row into an INSERT-only output row with an operation 
code column. The
- * output schema is {@code [op_column, ...non_partition_columns...]} - the 
framework prepends
- * partition key columns automatically.
+ * output schema is {@code [op_column, ...all_input_columns...]}.
  *
- * <p>Uses {@link ProjectedRowData} for zero-copy projection of non-partition 
columns and {@link
- * JoinedRowData} to combine the op column with the projected input.
+ * <p>Uses {@link JoinedRowData} to combine the op column with the full input 
row.
  */
 @Internal
 public class ToChangelogFunction extends BuiltInProcessTableFunction<RowData> {
@@ -65,10 +57,8 @@ public class ToChangelogFunction extends 
BuiltInProcessTableFunction<RowData> {
                     RowKind.DELETE, "DELETE");
 
     private final Map<RowKind, String> rawOpMap;
-    private final int[] nonPartitionIndices;
 
     private transient Map<RowKind, StringData> opMap;
-    private transient ProjectedRowData projectedInput;
     private transient GenericRowData opRow;
     private transient JoinedRowData output;
 
@@ -77,18 +67,6 @@ public class ToChangelogFunction extends 
BuiltInProcessTableFunction<RowData> {
         super(BuiltInFunctionDefinitions.TO_CHANGELOG, context);
         final CallContext callContext = context.getCallContext();
 
-        final TableSemantics semantics =
-                callContext
-                        .getTableSemantics(0)
-                        .orElseThrow(() -> new IllegalStateException("Table 
argument expected."));
-        final int[] partitionKeys = semantics.partitionByColumns();
-        final Set<Integer> partitionKeySet =
-                
IntStream.of(partitionKeys).boxed().collect(Collectors.toSet());
-
-        final RowType inputType = (RowType) 
semantics.dataType().getLogicalType();
-        this.nonPartitionIndices =
-                buildNonPartitionIndices(inputType.getFieldCount(), 
partitionKeySet);
-
         final Map<String, String> opMapping =
                 callContext.getArgumentValue(2, Map.class).orElse(null);
         this.rawOpMap = buildOpMap(opMapping);
@@ -99,16 +77,10 @@ public class ToChangelogFunction extends 
BuiltInProcessTableFunction<RowData> {
         super.open(context);
         opMap = new EnumMap<>(RowKind.class);
         rawOpMap.forEach((kind, code) -> opMap.put(kind, 
StringData.fromString(code)));
-        projectedInput = ProjectedRowData.from(nonPartitionIndices);
         opRow = new GenericRowData(1);
         output = new JoinedRowData();
     }
 
-    private static int[] buildNonPartitionIndices(
-            final int fieldCount, final Set<Integer> partitionKeySet) {
-        return IntStream.range(0, fieldCount).filter(i -> 
!partitionKeySet.contains(i)).toArray();
-    }
-
     /**
      * Builds a RowKind-to-output-code map. Keys may be comma-separated (e.g., 
"INSERT,
      * UPDATE_AFTER") to map multiple RowKinds to the same output code.
@@ -138,7 +110,6 @@ public class ToChangelogFunction extends 
BuiltInProcessTableFunction<RowData> {
         }
 
         opRow.setField(0, opCode);
-        projectedInput.replaceRow(input);
-        collect(output.replace(opRow, projectedInput));
+        collect(output.replace(opRow, input));
     }
 }


Reply via email to