This is an automated email from the ASF dual-hosted git repository.

gustavodemorais pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 4490b3074e6 [FLINK-39258][table] Extend restore tests for TO_CHANGELOG 
and FROM_CHANGELOG
4490b3074e6 is described below

commit 4490b3074e67a6197177cacddde4149bd6597aee
Author: Gustavo de Morais <[email protected]>
AuthorDate: Wed Jun 3 17:03:29 2026 +0200

    [FLINK-39258][table] Extend restore tests for TO_CHANGELOG and 
FROM_CHANGELOG
    
    This closes #28278.
---
 .../ToChangelogOutputTypeStrategyTest.java         |  60 +++++-
 .../exec/stream/FromChangelogRestoreTest.java      |   5 +-
 .../exec/stream/FromChangelogTestPrograms.java     |  65 +++++++
 .../nodes/exec/stream/ToChangelogRestoreTest.java  |   7 +-
 .../nodes/exec/stream/ToChangelogTestPrograms.java | 132 +++++++++++++
 ...rom-changelog-retract-partition-by-restore.json | 152 +++++++++++++++
 .../savepoint/_metadata                            | Bin 0 -> 12564 bytes
 ...from-changelog-upsert-partition-by-restore.json | 183 ++++++++++++++++++
 .../savepoint/_metadata                            | Bin 0 -> 9805 bytes
 .../to-changelog-retract-partition-by-restore.json | 157 ++++++++++++++++
 .../savepoint/_metadata                            | Bin 0 -> 13814 bytes
 ...g-retract-produces-partial-deletes-restore.json | 135 ++++++++++++++
 .../savepoint/_metadata                            | Bin 0 -> 8271 bytes
 .../to-changelog-upsert-partition-by-restore.json  | 207 +++++++++++++++++++++
 .../savepoint/_metadata                            | Bin 0 -> 18497 bytes
 .../plan/to-changelog-upsert-restore.json          | 185 ++++++++++++++++++
 .../savepoint/_metadata                            | Bin 0 -> 12994 bytes
 17 files changed, 1285 insertions(+), 3 deletions(-)

diff --git 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/ToChangelogOutputTypeStrategyTest.java
 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/ToChangelogOutputTypeStrategyTest.java
index e912370ef00..8c06c26d7bb 100644
--- 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/ToChangelogOutputTypeStrategyTest.java
+++ 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/ToChangelogOutputTypeStrategyTest.java
@@ -33,7 +33,12 @@ import static 
org.apache.flink.table.types.inference.strategies.ToChangelogTypeS
 import static 
org.apache.flink.table.types.inference.strategies.ToChangelogTypeStrategy.ARG_PRODUCES_FULL_DELETES;
 import static 
org.apache.flink.table.types.inference.strategies.ToChangelogTypeStrategy.ARG_TABLE;
 
-/** Tests for {@link ToChangelogTypeStrategy#OUTPUT_TYPE_STRATEGY}. */
+/**
+ * Tests for {@link ToChangelogTypeStrategy#OUTPUT_TYPE_STRATEGY}.
+ *
+ * <p>For background on row vs. set semantics and how PARTITION BY columns are 
handled, see the
+ * Process Table Functions page in the Flink documentation.
+ */
 class ToChangelogOutputTypeStrategyTest extends TypeStrategiesTestBase {
 
     private static final DataType TABLE_TYPE_NOT_NULL_SCORE =
@@ -47,6 +52,10 @@ class ToChangelogOutputTypeStrategyTest extends 
TypeStrategiesTestBase {
 
     @Override
     protected Stream<TestSpec> testData() {
+        return Stream.concat(rowSemantics(), setSemantics());
+    }
+
+    private static Stream<TestSpec> rowSemantics() {
         return Stream.of(
                 TestSpec.forStrategy(
                                 "produces_full_deletes=true preserves NOT NULL 
on input columns",
@@ -107,4 +116,53 @@ class ToChangelogOutputTypeStrategyTest extends 
TypeStrategiesTestBase {
                                                 DataTypes.FIELD("score", 
DataTypes.BIGINT()))
                                         .notNull()));
     }
+
+    private static Stream<TestSpec> setSemantics() {
+        return Stream.of(
+                TestSpec.forStrategy(
+                                "produces_full_deletes=true in set semantics 
preserves NOT NULL on non-partition columns",
+                                TO_CHANGELOG_OUTPUT_TYPE_STRATEGY)
+                        .inputTypes(
+                                TABLE_TYPE_NOT_NULL_SCORE, DESCRIPTOR_TYPE, 
MAP_TYPE, BOOLEAN_TYPE)
+                        .calledWithTableSemanticsAt(
+                                ARG_TABLE,
+                                new TableSemanticsMock(
+                                        TABLE_TYPE_NOT_NULL_SCORE,
+                                        new int[] {0},
+                                        new int[0],
+                                        -1,
+                                        null,
+                                        List.of(new int[] {0})))
+                        .calledWithLiteralAt(ARG_OP, ColumnList.of("op"))
+                        .calledWithLiteralAt(ARG_OP_MAPPING, null)
+                        .calledWithLiteralAt(ARG_PRODUCES_FULL_DELETES, true)
+                        .expectDataType(
+                                DataTypes.ROW(
+                                                DataTypes.FIELD("op", 
DataTypes.STRING()),
+                                                DataTypes.FIELD(
+                                                        "score", 
DataTypes.BIGINT().notNull()))
+                                        .notNull()),
+                TestSpec.forStrategy(
+                                "produces_full_deletes=false in set semantics 
widens non-partition-key columns",
+                                TO_CHANGELOG_OUTPUT_TYPE_STRATEGY)
+                        .inputTypes(
+                                TABLE_TYPE_NOT_NULL_SCORE, DESCRIPTOR_TYPE, 
MAP_TYPE, BOOLEAN_TYPE)
+                        .calledWithTableSemanticsAt(
+                                ARG_TABLE,
+                                new TableSemanticsMock(
+                                        TABLE_TYPE_NOT_NULL_SCORE,
+                                        new int[] {0},
+                                        new int[0],
+                                        -1,
+                                        null,
+                                        List.of(new int[] {0})))
+                        .calledWithLiteralAt(ARG_OP, ColumnList.of("op"))
+                        .calledWithLiteralAt(ARG_OP_MAPPING, null)
+                        .calledWithLiteralAt(ARG_PRODUCES_FULL_DELETES, false)
+                        .expectDataType(
+                                DataTypes.ROW(
+                                                DataTypes.FIELD("op", 
DataTypes.STRING()),
+                                                DataTypes.FIELD("score", 
DataTypes.BIGINT()))
+                                        .notNull()));
+    }
 }
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogRestoreTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogRestoreTest.java
index 7cd25c35ab5..96fd1450928 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogRestoreTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogRestoreTest.java
@@ -35,6 +35,9 @@ public class FromChangelogRestoreTest extends RestoreTestBase 
{
 
     @Override
     public List<TableTestProgram> programs() {
-        return List.of(FromChangelogTestPrograms.RETRACT_RESTORE);
+        return List.of(
+                FromChangelogTestPrograms.RETRACT_RESTORE,
+                FromChangelogTestPrograms.RETRACT_PARTITION_BY_RESTORE,
+                FromChangelogTestPrograms.UPSERT_PARTITION_BY_RESTORE);
     }
 }
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogTestPrograms.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogTestPrograms.java
index 2ab92f052d7..5fa3956de3b 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogTestPrograms.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogTestPrograms.java
@@ -391,6 +391,71 @@ public class FromChangelogTestPrograms {
                                     + "input => TABLE cdc_stream)")
                     .build();
 
+    public static final TableTestProgram RETRACT_PARTITION_BY_RESTORE =
+            TableTestProgram.of(
+                            "from-changelog-retract-partition-by-restore",
+                            "FROM_CHANGELOG with PARTITION BY producing a 
retract changelog")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("cdc_stream")
+                                    .addSchema("name STRING", "id INT", "op 
STRING")
+                                    .producedBeforeRestore(
+                                            Row.of("Alice", 1, "INSERT"),
+                                            Row.of("Bob", 2, "INSERT"))
+                                    .producedAfterRestore(
+                                            Row.of("Alice", 1, 
"UPDATE_BEFORE"),
+                                            Row.of("Alice2", 1, 
"UPDATE_AFTER"),
+                                            Row.of("Bob", 2, "DELETE"))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink")
+                                    .addSchema("id INT", "name STRING")
+                                    .consumedBeforeRestore(
+                                            Row.ofKind(RowKind.INSERT, 1, 
"Alice"),
+                                            Row.ofKind(RowKind.INSERT, 2, 
"Bob"))
+                                    .consumedAfterRestore(
+                                            Row.ofKind(RowKind.UPDATE_BEFORE, 
1, "Alice"),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
1, "Alice2"),
+                                            Row.ofKind(RowKind.DELETE, 2, 
"Bob"))
+                                    .build())
+                    .runSql(
+                            "INSERT INTO sink SELECT * FROM FROM_CHANGELOG("
+                                    + "input => TABLE cdc_stream PARTITION BY 
id)")
+                    .build();
+
+    public static final TableTestProgram UPSERT_PARTITION_BY_RESTORE =
+            TableTestProgram.of(
+                            "from-changelog-upsert-partition-by-restore",
+                            "FROM_CHANGELOG with PARTITION BY producing an 
upsert changelog "
+                                    + "(op_mapping without UPDATE_BEFORE)")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("cdc_stream")
+                                    .addSchema("name STRING", "id INT", "op 
STRING")
+                                    .producedBeforeRestore(
+                                            Row.of("Alice", 1, "INSERT"),
+                                            Row.of("Bob", 2, "INSERT"))
+                                    .producedAfterRestore(
+                                            Row.of("Alice2", 1, 
"UPDATE_AFTER"),
+                                            Row.of("Bob", 2, "DELETE"))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink")
+                                    .addSchema("id INT PRIMARY KEY NOT 
ENFORCED", "name STRING")
+                                    .consumedBeforeRestore(
+                                            Row.ofKind(RowKind.INSERT, 1, 
"Alice"),
+                                            Row.ofKind(RowKind.INSERT, 2, 
"Bob"))
+                                    .consumedAfterRestore(
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
1, "Alice2"),
+                                            Row.ofKind(RowKind.DELETE, 2, 
"Bob"))
+                                    .build())
+                    .runSql(
+                            "INSERT INTO sink SELECT * FROM FROM_CHANGELOG("
+                                    + "input => TABLE cdc_stream PARTITION BY 
id, "
+                                    + "op_mapping => MAP["
+                                    + "'INSERT', 'INSERT', "
+                                    + "'UPDATE_AFTER', 'UPDATE_AFTER', "
+                                    + "'DELETE', 'DELETE'])")
+                    .build();
+
     // 
--------------------------------------------------------------------------------------------
     // Error validation tests
     // 
--------------------------------------------------------------------------------------------
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogRestoreTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogRestoreTest.java
index 5b7258946d7..00205798b65 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogRestoreTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogRestoreTest.java
@@ -35,6 +35,11 @@ public class ToChangelogRestoreTest extends RestoreTestBase {
 
     @Override
     public List<TableTestProgram> programs() {
-        return List.of(ToChangelogTestPrograms.RETRACT_RESTORE);
+        return List.of(
+                ToChangelogTestPrograms.RETRACT_RESTORE,
+                ToChangelogTestPrograms.UPSERT_RESTORE,
+                ToChangelogTestPrograms.RETRACT_PARTITION_BY_RESTORE,
+                ToChangelogTestPrograms.UPSERT_PARTITION_BY_RESTORE,
+                
ToChangelogTestPrograms.RETRACT_PRODUCES_PARTIAL_DELETES_RESTORE);
     }
 }
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java
index 71731109eda..a14b259b068 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java
@@ -123,6 +123,138 @@ public class ToChangelogTestPrograms {
                     .runSql("INSERT INTO sink SELECT * FROM TO_CHANGELOG(input 
=> TABLE t)")
                     .build();
 
+    public static final TableTestProgram UPSERT_RESTORE =
+            TableTestProgram.of(
+                            "to-changelog-upsert-restore",
+                            "TO_CHANGELOG over an upsert source, including 
ChangelogNormalize state")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("t")
+                                    .addSchema(
+                                            "name STRING PRIMARY KEY NOT 
ENFORCED", "score BIGINT")
+                                    .addMode(ChangelogMode.upsert())
+                                    .producedBeforeRestore(
+                                            Row.ofKind(RowKind.INSERT, 
"Alice", 10L),
+                                            Row.ofKind(RowKind.INSERT, "Bob", 
20L))
+                                    .producedAfterRestore(
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
"Alice", 30L),
+                                            // Key-only delete: 
ChangelogNormalize restores
+                                            // Bob's pre-image from state.
+                                            Row.ofKind(RowKind.DELETE, "Bob", 
null))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink")
+                                    .addSchema("op STRING", "name STRING", 
"score BIGINT")
+                                    .consumedBeforeRestore(
+                                            "+I[INSERT, Alice, 10]", 
"+I[INSERT, Bob, 20]")
+                                    .consumedAfterRestore(
+                                            "+I[UPDATE_BEFORE, Alice, 10]",
+                                            "+I[UPDATE_AFTER, Alice, 30]",
+                                            "+I[DELETE, Bob, 20]")
+                                    .build())
+                    .runSql("INSERT INTO sink SELECT * FROM TO_CHANGELOG(input 
=> TABLE t)")
+                    .build();
+
+    public static final TableTestProgram RETRACT_PARTITION_BY_RESTORE =
+            TableTestProgram.of(
+                            "to-changelog-retract-partition-by-restore",
+                            "TO_CHANGELOG over a retract source with PARTITION 
BY")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("t")
+                                    .addSchema(
+                                            "name STRING PRIMARY KEY NOT 
ENFORCED", "score BIGINT")
+                                    .addMode(ChangelogMode.all())
+                                    .producedBeforeRestore(
+                                            Row.ofKind(RowKind.INSERT, 
"Alice", 10L),
+                                            Row.ofKind(RowKind.INSERT, "Bob", 
20L))
+                                    .producedAfterRestore(
+                                            Row.ofKind(RowKind.UPDATE_BEFORE, 
"Alice", 10L),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
"Alice", 30L),
+                                            Row.ofKind(RowKind.DELETE, "Bob", 
20L))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink")
+                                    .addSchema("name STRING NOT NULL", "op 
STRING", "score BIGINT")
+                                    .consumedBeforeRestore(
+                                            "+I[Alice, INSERT, 10]", "+I[Bob, 
INSERT, 20]")
+                                    .consumedAfterRestore(
+                                            "+I[Alice, UPDATE_BEFORE, 10]",
+                                            "+I[Alice, UPDATE_AFTER, 30]",
+                                            "+I[Bob, DELETE, 20]")
+                                    .build())
+                    .runSql(
+                            "INSERT INTO sink SELECT * FROM TO_CHANGELOG("
+                                    + "input => TABLE t PARTITION BY name)")
+                    .build();
+
+    public static final TableTestProgram UPSERT_PARTITION_BY_RESTORE =
+            TableTestProgram.of(
+                            "to-changelog-upsert-partition-by-restore",
+                            "TO_CHANGELOG over an upsert source with PARTITION 
BY, including "
+                                    + "ChangelogNormalize state")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("t")
+                                    .addSchema(
+                                            "name STRING PRIMARY KEY NOT 
ENFORCED", "score BIGINT")
+                                    .addMode(ChangelogMode.upsert())
+                                    .producedBeforeRestore(
+                                            Row.ofKind(RowKind.INSERT, 
"Alice", 10L),
+                                            Row.ofKind(RowKind.INSERT, "Bob", 
20L))
+                                    .producedAfterRestore(
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
"Alice", 30L),
+                                            // Key-only delete: 
ChangelogNormalize restores
+                                            // Bob's pre-image from state.
+                                            Row.ofKind(RowKind.DELETE, "Bob", 
null))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink")
+                                    .addSchema("name STRING NOT NULL", "op 
STRING", "score BIGINT")
+                                    .consumedBeforeRestore(
+                                            "+I[Alice, INSERT, 10]", "+I[Bob, 
INSERT, 20]")
+                                    .consumedAfterRestore(
+                                            "+I[Alice, UPDATE_BEFORE, 10]",
+                                            "+I[Alice, UPDATE_AFTER, 30]",
+                                            "+I[Bob, DELETE, 20]")
+                                    .build())
+                    .runSql(
+                            "INSERT INTO sink SELECT * FROM TO_CHANGELOG("
+                                    + "input => TABLE t PARTITION BY name)")
+                    .build();
+
+    public static final TableTestProgram 
RETRACT_PRODUCES_PARTIAL_DELETES_RESTORE =
+            TableTestProgram.of(
+                            
"to-changelog-retract-produces-partial-deletes-restore",
+                            "TO_CHANGELOG with produces_full_deletes=false 
over a NOT NULL input "
+                                    + "column; non-key columns are widened to 
nullable on DELETE")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("t")
+                                    .addSchema(
+                                            "name STRING PRIMARY KEY NOT 
ENFORCED",
+                                            "score BIGINT NOT NULL")
+                                    .addMode(ChangelogMode.all())
+                                    .producedBeforeRestore(
+                                            Row.ofKind(RowKind.INSERT, 
"Alice", 10L),
+                                            Row.ofKind(RowKind.INSERT, "Bob", 
20L))
+                                    .producedAfterRestore(
+                                            Row.ofKind(RowKind.UPDATE_BEFORE, 
"Alice", 10L),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
"Alice", 30L),
+                                            Row.ofKind(RowKind.DELETE, "Bob", 
20L))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink")
+                                    .addSchema("op STRING", "name STRING", 
"score BIGINT")
+                                    .consumedBeforeRestore(
+                                            "+I[INSERT, Alice, 10]", 
"+I[INSERT, Bob, 20]")
+                                    .consumedAfterRestore(
+                                            "+I[UPDATE_BEFORE, Alice, 10]",
+                                            "+I[UPDATE_AFTER, Alice, 30]",
+                                            "+I[DELETE, Bob, null]")
+                                    .build())
+                    .runSql(
+                            "INSERT INTO sink SELECT * FROM TO_CHANGELOG("
+                                    + "input => TABLE t, "
+                                    + "produces_full_deletes => false)")
+                    .build();
+
     /** Partitions by a non-leading column ({@code id}, the middle column of 
three). */
     public static final TableTestProgram RETRACT_PARTITION_BY =
             TableTestProgram.of(
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/from-changelog-retract-partition-by-restore/plan/from-changelog-retract-partition-by-restore.json
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/from-changelog-retract-partition-by-restore/plan/from-changelog-retract-partition-by-restore.json
new file mode 100644
index 00000000000..4de3e05d438
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/from-changelog-retract-partition-by-restore/plan/from-changelog-retract-partition-by-restore.json
@@ -0,0 +1,152 @@
+{
+  "flinkVersion" : "2.4",
+  "nodes" : [ {
+    "id" : 4,
+    "type" : "stream-exec-table-source-scan_2",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`cdc_stream`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "name",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "id",
+              "dataType" : "INT"
+            }, {
+              "name" : "op",
+              "dataType" : "VARCHAR(2147483647)"
+            } ]
+          }
+        }
+      }
+    },
+    "outputType" : "ROW<`name` VARCHAR(2147483647), `id` INT, `op` 
VARCHAR(2147483647)>",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, cdc_stream]], fields=[name, id, op])"
+  }, {
+    "id" : 5,
+    "type" : "stream-exec-exchange_1",
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "HASH",
+        "keys" : [ 1 ]
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`name` VARCHAR(2147483647), `id` INT, `op` 
VARCHAR(2147483647)>",
+    "description" : "Exchange(distribution=[hash[id]])"
+  }, {
+    "id" : 6,
+    "type" : "stream-exec-process-table-function_1",
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`id` INT, `name` VARCHAR(2147483647)>",
+    "description" : "ProcessTableFunction(invocation=[FROM_CHANGELOG(TABLE(#0) 
PARTITION BY($1), DEFAULT(), DEFAULT(), DEFAULT(), DEFAULT(), DEFAULT())], 
uid=[FROM_CHANGELOG], select=[id,name], rowType=[RecordType(INTEGER id, 
VARCHAR(2147483647) name)])",
+    "uid" : "FROM_CHANGELOG",
+    "functionCall" : {
+      "kind" : "CALL",
+      "internalName" : "$FROM_CHANGELOG$1",
+      "operands" : [ {
+        "kind" : "TABLE_ARG_CALL",
+        "inputIndex" : 0,
+        "partitionKeys" : [ 1 ],
+        "type" : "ROW<`name` VARCHAR(2147483647), `id` INT, `op` 
VARCHAR(2147483647)> NOT NULL"
+      }, {
+        "kind" : "CALL",
+        "syntax" : "SPECIAL",
+        "internalName" : "$DEFAULT$1",
+        "type" : "DESCRIPTOR"
+      }, {
+        "kind" : "CALL",
+        "syntax" : "SPECIAL",
+        "internalName" : "$DEFAULT$1",
+        "type" : "MAP<VARCHAR(2147483647), VARCHAR(2147483647)>"
+      }, {
+        "kind" : "CALL",
+        "syntax" : "SPECIAL",
+        "internalName" : "$DEFAULT$1",
+        "type" : "VARCHAR(2147483647)"
+      }, {
+        "kind" : "CALL",
+        "syntax" : "SPECIAL",
+        "internalName" : "$DEFAULT$1",
+        "type" : "DESCRIPTOR"
+      }, {
+        "kind" : "CALL",
+        "syntax" : "SPECIAL",
+        "internalName" : "$DEFAULT$1",
+        "type" : "VARCHAR(2147483647)"
+      } ],
+      "type" : "ROW<`id` INT, `name` VARCHAR(2147483647)> NOT NULL"
+    },
+    "inputChangelogModes" : [ [ "INSERT" ] ],
+    "outputChangelogMode" : [ "INSERT", "UPDATE_BEFORE", "UPDATE_AFTER", 
"DELETE" ],
+    "inputUpsertKeys" : [ [ ] ]
+  }, {
+    "id" : 7,
+    "type" : "stream-exec-sink_2",
+    "configuration" : {
+      "table.exec.sink.keyed-shuffle" : "AUTO",
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.rowtime-inserter" : "ENABLED",
+      "table.exec.sink.type-length-enforcer" : "IGNORE",
+      "table.exec.sink.upsert-materialize" : "AUTO"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`sink`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "id",
+              "dataType" : "INT"
+            }, {
+              "name" : "name",
+              "dataType" : "VARCHAR(2147483647)"
+            } ]
+          }
+        }
+      }
+    },
+    "inputChangelogMode" : [ "INSERT", "UPDATE_BEFORE", "UPDATE_AFTER", 
"DELETE" ],
+    "upsertMaterializeStrategy" : "ADAPTIVE",
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`id` INT, `name` VARCHAR(2147483647)>",
+    "description" : "Sink(table=[default_catalog.default_database.sink], 
fields=[id, name])"
+  } ],
+  "edges" : [ {
+    "source" : 4,
+    "target" : 5,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 5,
+    "target" : 6,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 6,
+    "target" : 7,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/from-changelog-retract-partition-by-restore/savepoint/_metadata
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/from-changelog-retract-partition-by-restore/savepoint/_metadata
new file mode 100644
index 00000000000..422977ab9b2
Binary files /dev/null and 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/from-changelog-retract-partition-by-restore/savepoint/_metadata
 differ
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/from-changelog-upsert-partition-by-restore/plan/from-changelog-upsert-partition-by-restore.json
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/from-changelog-upsert-partition-by-restore/plan/from-changelog-upsert-partition-by-restore.json
new file mode 100644
index 00000000000..d771bad2480
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/from-changelog-upsert-partition-by-restore/plan/from-changelog-upsert-partition-by-restore.json
@@ -0,0 +1,183 @@
+{
+  "flinkVersion" : "2.4",
+  "nodes" : [ {
+    "id" : 8,
+    "type" : "stream-exec-table-source-scan_2",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`cdc_stream`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "name",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "id",
+              "dataType" : "INT"
+            }, {
+              "name" : "op",
+              "dataType" : "VARCHAR(2147483647)"
+            } ]
+          }
+        }
+      }
+    },
+    "outputType" : "ROW<`name` VARCHAR(2147483647), `id` INT, `op` 
VARCHAR(2147483647)>",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, cdc_stream]], fields=[name, id, op])"
+  }, {
+    "id" : 9,
+    "type" : "stream-exec-exchange_1",
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "HASH",
+        "keys" : [ 1 ]
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`name` VARCHAR(2147483647), `id` INT, `op` 
VARCHAR(2147483647)>",
+    "description" : "Exchange(distribution=[hash[id]])"
+  }, {
+    "id" : 10,
+    "type" : "stream-exec-process-table-function_1",
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`id` INT, `name` VARCHAR(2147483647)>",
+    "description" : "ProcessTableFunction(invocation=[FROM_CHANGELOG(TABLE(#0) 
PARTITION BY($1), DEFAULT(), MAP(_UTF-16LE'INSERT':VARCHAR(12) CHARACTER SET 
\"UTF-16LE\", _UTF-16LE'INSERT':VARCHAR(12) CHARACTER SET \"UTF-16LE\", 
_UTF-16LE'UPDATE_AFTER':VARCHAR(12) CHARACTER SET \"UTF-16LE\", 
_UTF-16LE'UPDATE_AFTER':VARCHAR(12) CHARACTER SET \"UTF-16LE\", 
_UTF-16LE'DELETE':VARCHAR(12) CHARACTER SET \"UTF-16LE\", 
_UTF-16LE'DELETE':VARCHAR(12) CHARACTER SET \"UTF-16LE\"), DEFAULT(), DEFAULT( 
[...]
+    "uid" : "FROM_CHANGELOG",
+    "functionCall" : {
+      "kind" : "CALL",
+      "internalName" : "$FROM_CHANGELOG$1",
+      "operands" : [ {
+        "kind" : "TABLE_ARG_CALL",
+        "inputIndex" : 0,
+        "partitionKeys" : [ 1 ],
+        "type" : "ROW<`name` VARCHAR(2147483647), `id` INT, `op` 
VARCHAR(2147483647)> NOT NULL"
+      }, {
+        "kind" : "CALL",
+        "syntax" : "SPECIAL",
+        "internalName" : "$DEFAULT$1",
+        "type" : "DESCRIPTOR"
+      }, {
+        "kind" : "CALL",
+        "syntax" : "SPECIAL",
+        "internalName" : "$MAP$1",
+        "operands" : [ {
+          "kind" : "LITERAL",
+          "value" : "INSERT",
+          "type" : "VARCHAR(12) NOT NULL"
+        }, {
+          "kind" : "LITERAL",
+          "value" : "INSERT",
+          "type" : "VARCHAR(12) NOT NULL"
+        }, {
+          "kind" : "LITERAL",
+          "value" : "UPDATE_AFTER",
+          "type" : "VARCHAR(12) NOT NULL"
+        }, {
+          "kind" : "LITERAL",
+          "value" : "UPDATE_AFTER",
+          "type" : "VARCHAR(12) NOT NULL"
+        }, {
+          "kind" : "LITERAL",
+          "value" : "DELETE",
+          "type" : "VARCHAR(12) NOT NULL"
+        }, {
+          "kind" : "LITERAL",
+          "value" : "DELETE",
+          "type" : "VARCHAR(12) NOT NULL"
+        } ],
+        "type" : "MAP<VARCHAR(12) NOT NULL, VARCHAR(12) NOT NULL> NOT NULL"
+      }, {
+        "kind" : "CALL",
+        "syntax" : "SPECIAL",
+        "internalName" : "$DEFAULT$1",
+        "type" : "VARCHAR(2147483647)"
+      }, {
+        "kind" : "CALL",
+        "syntax" : "SPECIAL",
+        "internalName" : "$DEFAULT$1",
+        "type" : "DESCRIPTOR"
+      }, {
+        "kind" : "CALL",
+        "syntax" : "SPECIAL",
+        "internalName" : "$DEFAULT$1",
+        "type" : "VARCHAR(2147483647)"
+      } ],
+      "type" : "ROW<`id` INT, `name` VARCHAR(2147483647)> NOT NULL"
+    },
+    "inputChangelogModes" : [ [ "INSERT" ] ],
+    "outputChangelogMode" : [ "INSERT", "UPDATE_AFTER", "DELETE" ],
+    "inputUpsertKeys" : [ [ ] ]
+  }, {
+    "id" : 11,
+    "type" : "stream-exec-sink_2",
+    "configuration" : {
+      "table.exec.sink.keyed-shuffle" : "AUTO",
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.rowtime-inserter" : "ENABLED",
+      "table.exec.sink.type-length-enforcer" : "IGNORE",
+      "table.exec.sink.upsert-materialize" : "AUTO"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`sink`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "id",
+              "dataType" : "INT NOT NULL"
+            }, {
+              "name" : "name",
+              "dataType" : "VARCHAR(2147483647)"
+            } ],
+            "primaryKey" : {
+              "name" : "PK_id",
+              "type" : "PRIMARY_KEY",
+              "columns" : [ "id" ]
+            }
+          }
+        }
+      }
+    },
+    "inputChangelogMode" : [ "INSERT", "UPDATE_AFTER", "DELETE" ],
+    "upsertMaterializeStrategy" : "ADAPTIVE",
+    "inputUpsertKey" : [ 0 ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`id` INT, `name` VARCHAR(2147483647)>",
+    "description" : "Sink(table=[default_catalog.default_database.sink], 
fields=[id, name])"
+  } ],
+  "edges" : [ {
+    "source" : 8,
+    "target" : 9,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 9,
+    "target" : 10,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 10,
+    "target" : 11,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/from-changelog-upsert-partition-by-restore/savepoint/_metadata
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/from-changelog-upsert-partition-by-restore/savepoint/_metadata
new file mode 100644
index 00000000000..2914912731c
Binary files /dev/null and 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/from-changelog-upsert-partition-by-restore/savepoint/_metadata
 differ
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/to-changelog-retract-partition-by-restore/plan/to-changelog-retract-partition-by-restore.json
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/to-changelog-retract-partition-by-restore/plan/to-changelog-retract-partition-by-restore.json
new file mode 100644
index 00000000000..76741ae6d82
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/to-changelog-retract-partition-by-restore/plan/to-changelog-retract-partition-by-restore.json
@@ -0,0 +1,157 @@
+{
+  "flinkVersion" : "2.4",
+  "nodes" : [ {
+    "id" : 9,
+    "type" : "stream-exec-table-source-scan_2",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "name",
+              "dataType" : "VARCHAR(2147483647) NOT NULL"
+            }, {
+              "name" : "score",
+              "dataType" : "BIGINT"
+            } ],
+            "primaryKey" : {
+              "name" : "PK_name",
+              "type" : "PRIMARY_KEY",
+              "columns" : [ "name" ]
+            }
+          }
+        }
+      }
+    },
+    "outputType" : "ROW<`name` VARCHAR(2147483647) NOT NULL, `score` BIGINT>",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, t]], fields=[name, score])"
+  }, {
+    "id" : 10,
+    "type" : "stream-exec-exchange_1",
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "HASH",
+        "keys" : [ 0 ]
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`name` VARCHAR(2147483647) NOT NULL, `score` BIGINT>",
+    "description" : "Exchange(distribution=[hash[name]])"
+  }, {
+    "id" : 11,
+    "type" : "stream-exec-process-table-function_1",
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`name` VARCHAR(2147483647) NOT NULL, `op` 
VARCHAR(2147483647), `score` BIGINT>",
+    "description" : "ProcessTableFunction(invocation=[TO_CHANGELOG(TABLE(#0) 
PARTITION BY($0), DEFAULT(), DEFAULT(), DEFAULT(), DEFAULT(), DEFAULT())], 
uid=[TO_CHANGELOG], select=[name,op,score], 
rowType=[RecordType(VARCHAR(2147483647) name, VARCHAR(2147483647) op, BIGINT 
score)])",
+    "uid" : "TO_CHANGELOG",
+    "functionCall" : {
+      "kind" : "CALL",
+      "internalName" : "$TO_CHANGELOG$1",
+      "operands" : [ {
+        "kind" : "TABLE_ARG_CALL",
+        "inputIndex" : 0,
+        "partitionKeys" : [ 0 ],
+        "type" : "ROW<`name` VARCHAR(2147483647) NOT NULL, `score` BIGINT> NOT 
NULL"
+      }, {
+        "kind" : "CALL",
+        "syntax" : "SPECIAL",
+        "internalName" : "$DEFAULT$1",
+        "type" : "DESCRIPTOR"
+      }, {
+        "kind" : "CALL",
+        "syntax" : "SPECIAL",
+        "internalName" : "$DEFAULT$1",
+        "type" : "MAP<VARCHAR(2147483647), VARCHAR(2147483647)>"
+      }, {
+        "kind" : "CALL",
+        "syntax" : "SPECIAL",
+        "internalName" : "$DEFAULT$1",
+        "type" : "BOOLEAN"
+      }, {
+        "kind" : "CALL",
+        "syntax" : "SPECIAL",
+        "internalName" : "$DEFAULT$1",
+        "type" : "DESCRIPTOR"
+      }, {
+        "kind" : "CALL",
+        "syntax" : "SPECIAL",
+        "internalName" : "$DEFAULT$1",
+        "type" : "VARCHAR(2147483647)"
+      } ],
+      "type" : "ROW<`name` VARCHAR(2147483647) NOT NULL, `op` 
VARCHAR(2147483647), `score` BIGINT> NOT NULL"
+    },
+    "inputChangelogModes" : [ [ "INSERT", "UPDATE_BEFORE", "UPDATE_AFTER", 
"DELETE" ] ],
+    "outputChangelogMode" : [ "INSERT" ],
+    "inputUpsertKeys" : [ [ [ 0 ] ] ]
+  }, {
+    "id" : 12,
+    "type" : "stream-exec-sink_2",
+    "configuration" : {
+      "table.exec.sink.keyed-shuffle" : "AUTO",
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.rowtime-inserter" : "ENABLED",
+      "table.exec.sink.type-length-enforcer" : "IGNORE",
+      "table.exec.sink.upsert-materialize" : "AUTO"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`sink`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "name",
+              "dataType" : "VARCHAR(2147483647) NOT NULL"
+            }, {
+              "name" : "op",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "score",
+              "dataType" : "BIGINT"
+            } ]
+          }
+        }
+      }
+    },
+    "inputChangelogMode" : [ "INSERT" ],
+    "upsertMaterializeStrategy" : "ADAPTIVE",
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`name` VARCHAR(2147483647) NOT NULL, `op` 
VARCHAR(2147483647), `score` BIGINT>",
+    "description" : "Sink(table=[default_catalog.default_database.sink], 
fields=[name, op, score])"
+  } ],
+  "edges" : [ {
+    "source" : 9,
+    "target" : 10,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 10,
+    "target" : 11,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 11,
+    "target" : 12,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/to-changelog-retract-partition-by-restore/savepoint/_metadata
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/to-changelog-retract-partition-by-restore/savepoint/_metadata
new file mode 100644
index 00000000000..6c2826c7177
Binary files /dev/null and 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/to-changelog-retract-partition-by-restore/savepoint/_metadata
 differ
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/to-changelog-retract-produces-partial-deletes-restore/plan/to-changelog-retract-produces-partial-deletes-restore.json
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/to-changelog-retract-produces-partial-deletes-restore/plan/to-changelog-retract-produces-partial-deletes-restore.json
new file mode 100644
index 00000000000..83e5963980c
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/to-changelog-retract-produces-partial-deletes-restore/plan/to-changelog-retract-produces-partial-deletes-restore.json
@@ -0,0 +1,135 @@
+{
+  "flinkVersion" : "2.4",
+  "nodes" : [ {
+    "id" : 19,
+    "type" : "stream-exec-table-source-scan_2",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "name",
+              "dataType" : "VARCHAR(2147483647) NOT NULL"
+            }, {
+              "name" : "score",
+              "dataType" : "BIGINT NOT NULL"
+            } ],
+            "primaryKey" : {
+              "name" : "PK_name",
+              "type" : "PRIMARY_KEY",
+              "columns" : [ "name" ]
+            }
+          }
+        }
+      }
+    },
+    "outputType" : "ROW<`name` VARCHAR(2147483647) NOT NULL, `score` BIGINT 
NOT NULL>",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, t]], fields=[name, score])"
+  }, {
+    "id" : 20,
+    "type" : "stream-exec-process-table-function_1",
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`op` VARCHAR(2147483647), `name` VARCHAR(2147483647), 
`score` BIGINT>",
+    "description" : "ProcessTableFunction(invocation=[TO_CHANGELOG(TABLE(#0), 
DEFAULT(), DEFAULT(), false, DEFAULT(), DEFAULT())], uid=[null], 
select=[op,name,score], rowType=[RecordType(VARCHAR(2147483647) op, 
VARCHAR(2147483647) name, BIGINT score)])",
+    "uid" : null,
+    "functionCall" : {
+      "kind" : "CALL",
+      "internalName" : "$TO_CHANGELOG$1",
+      "operands" : [ {
+        "kind" : "TABLE_ARG_CALL",
+        "inputIndex" : 0,
+        "type" : "ROW<`name` VARCHAR(2147483647) NOT NULL, `score` BIGINT NOT 
NULL> NOT NULL"
+      }, {
+        "kind" : "CALL",
+        "syntax" : "SPECIAL",
+        "internalName" : "$DEFAULT$1",
+        "type" : "DESCRIPTOR"
+      }, {
+        "kind" : "CALL",
+        "syntax" : "SPECIAL",
+        "internalName" : "$DEFAULT$1",
+        "type" : "MAP<VARCHAR(2147483647), VARCHAR(2147483647)>"
+      }, {
+        "kind" : "LITERAL",
+        "value" : false,
+        "type" : "BOOLEAN NOT NULL"
+      }, {
+        "kind" : "CALL",
+        "syntax" : "SPECIAL",
+        "internalName" : "$DEFAULT$1",
+        "type" : "DESCRIPTOR"
+      }, {
+        "kind" : "CALL",
+        "syntax" : "SPECIAL",
+        "internalName" : "$DEFAULT$1",
+        "type" : "VARCHAR(2147483647)"
+      } ],
+      "type" : "ROW<`op` VARCHAR(2147483647), `name` VARCHAR(2147483647), 
`score` BIGINT> NOT NULL"
+    },
+    "inputChangelogModes" : [ [ "INSERT", "UPDATE_BEFORE", "UPDATE_AFTER", 
"DELETE" ] ],
+    "outputChangelogMode" : [ "INSERT" ],
+    "inputUpsertKeys" : [ [ [ 0 ] ] ]
+  }, {
+    "id" : 21,
+    "type" : "stream-exec-sink_2",
+    "configuration" : {
+      "table.exec.sink.keyed-shuffle" : "AUTO",
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.rowtime-inserter" : "ENABLED",
+      "table.exec.sink.type-length-enforcer" : "IGNORE",
+      "table.exec.sink.upsert-materialize" : "AUTO"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`sink`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "op",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "name",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "score",
+              "dataType" : "BIGINT"
+            } ]
+          }
+        }
+      }
+    },
+    "inputChangelogMode" : [ "INSERT" ],
+    "upsertMaterializeStrategy" : "ADAPTIVE",
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`op` VARCHAR(2147483647), `name` VARCHAR(2147483647), 
`score` BIGINT>",
+    "description" : "Sink(table=[default_catalog.default_database.sink], 
fields=[op, name, score])"
+  } ],
+  "edges" : [ {
+    "source" : 19,
+    "target" : 20,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 20,
+    "target" : 21,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/to-changelog-retract-produces-partial-deletes-restore/savepoint/_metadata
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/to-changelog-retract-produces-partial-deletes-restore/savepoint/_metadata
new file mode 100644
index 00000000000..fad2d14492d
Binary files /dev/null and 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/to-changelog-retract-produces-partial-deletes-restore/savepoint/_metadata
 differ
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/to-changelog-upsert-partition-by-restore/plan/to-changelog-upsert-partition-by-restore.json
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/to-changelog-upsert-partition-by-restore/plan/to-changelog-upsert-partition-by-restore.json
new file mode 100644
index 00000000000..1d3991ca886
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/to-changelog-upsert-partition-by-restore/plan/to-changelog-upsert-partition-by-restore.json
@@ -0,0 +1,207 @@
+{
+  "flinkVersion" : "2.4",
+  "nodes" : [ {
+    "id" : 13,
+    "type" : "stream-exec-table-source-scan_2",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "name",
+              "dataType" : "VARCHAR(2147483647) NOT NULL"
+            }, {
+              "name" : "score",
+              "dataType" : "BIGINT"
+            } ],
+            "primaryKey" : {
+              "name" : "PK_name",
+              "type" : "PRIMARY_KEY",
+              "columns" : [ "name" ]
+            }
+          }
+        }
+      }
+    },
+    "outputType" : "ROW<`name` VARCHAR(2147483647) NOT NULL, `score` BIGINT>",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, t]], fields=[name, score])"
+  }, {
+    "id" : 14,
+    "type" : "stream-exec-exchange_1",
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "HASH",
+        "keys" : [ 0 ]
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`name` VARCHAR(2147483647) NOT NULL, `score` BIGINT>",
+    "description" : "Exchange(distribution=[hash[name]])"
+  }, {
+    "id" : 15,
+    "type" : "stream-exec-changelog-normalize_1",
+    "configuration" : {
+      "table.exec.mini-batch.enabled" : "false",
+      "table.exec.mini-batch.size" : "-1"
+    },
+    "uniqueKeys" : [ 0 ],
+    "generateUpdateBefore" : true,
+    "state" : [ {
+      "index" : 0,
+      "ttl" : "0 ms",
+      "name" : "changelogNormalizeState"
+    } ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`name` VARCHAR(2147483647) NOT NULL, `score` BIGINT>",
+    "description" : "ChangelogNormalize(key=[name])"
+  }, {
+    "id" : 16,
+    "type" : "stream-exec-exchange_1",
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "HASH",
+        "keys" : [ 0 ]
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`name` VARCHAR(2147483647) NOT NULL, `score` BIGINT>",
+    "description" : "Exchange(distribution=[hash[name]])"
+  }, {
+    "id" : 17,
+    "type" : "stream-exec-process-table-function_1",
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`name` VARCHAR(2147483647) NOT NULL, `op` 
VARCHAR(2147483647), `score` BIGINT>",
+    "description" : "ProcessTableFunction(invocation=[TO_CHANGELOG(TABLE(#0) 
PARTITION BY($0), DEFAULT(), DEFAULT(), DEFAULT(), DEFAULT(), DEFAULT())], 
uid=[TO_CHANGELOG], select=[name,op,score], 
rowType=[RecordType(VARCHAR(2147483647) name, VARCHAR(2147483647) op, BIGINT 
score)])",
+    "uid" : "TO_CHANGELOG",
+    "functionCall" : {
+      "kind" : "CALL",
+      "internalName" : "$TO_CHANGELOG$1",
+      "operands" : [ {
+        "kind" : "TABLE_ARG_CALL",
+        "inputIndex" : 0,
+        "partitionKeys" : [ 0 ],
+        "type" : "ROW<`name` VARCHAR(2147483647) NOT NULL, `score` BIGINT> NOT 
NULL"
+      }, {
+        "kind" : "CALL",
+        "syntax" : "SPECIAL",
+        "internalName" : "$DEFAULT$1",
+        "type" : "DESCRIPTOR"
+      }, {
+        "kind" : "CALL",
+        "syntax" : "SPECIAL",
+        "internalName" : "$DEFAULT$1",
+        "type" : "MAP<VARCHAR(2147483647), VARCHAR(2147483647)>"
+      }, {
+        "kind" : "CALL",
+        "syntax" : "SPECIAL",
+        "internalName" : "$DEFAULT$1",
+        "type" : "BOOLEAN"
+      }, {
+        "kind" : "CALL",
+        "syntax" : "SPECIAL",
+        "internalName" : "$DEFAULT$1",
+        "type" : "DESCRIPTOR"
+      }, {
+        "kind" : "CALL",
+        "syntax" : "SPECIAL",
+        "internalName" : "$DEFAULT$1",
+        "type" : "VARCHAR(2147483647)"
+      } ],
+      "type" : "ROW<`name` VARCHAR(2147483647) NOT NULL, `op` 
VARCHAR(2147483647), `score` BIGINT> NOT NULL"
+    },
+    "inputChangelogModes" : [ [ "INSERT", "UPDATE_BEFORE", "UPDATE_AFTER", 
"DELETE" ] ],
+    "outputChangelogMode" : [ "INSERT" ],
+    "inputUpsertKeys" : [ [ [ 0 ] ] ]
+  }, {
+    "id" : 18,
+    "type" : "stream-exec-sink_2",
+    "configuration" : {
+      "table.exec.sink.keyed-shuffle" : "AUTO",
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.rowtime-inserter" : "ENABLED",
+      "table.exec.sink.type-length-enforcer" : "IGNORE",
+      "table.exec.sink.upsert-materialize" : "AUTO"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`sink`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "name",
+              "dataType" : "VARCHAR(2147483647) NOT NULL"
+            }, {
+              "name" : "op",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "score",
+              "dataType" : "BIGINT"
+            } ]
+          }
+        }
+      }
+    },
+    "inputChangelogMode" : [ "INSERT" ],
+    "upsertMaterializeStrategy" : "ADAPTIVE",
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`name` VARCHAR(2147483647) NOT NULL, `op` 
VARCHAR(2147483647), `score` BIGINT>",
+    "description" : "Sink(table=[default_catalog.default_database.sink], 
fields=[name, op, score])"
+  } ],
+  "edges" : [ {
+    "source" : 13,
+    "target" : 14,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 14,
+    "target" : 15,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 15,
+    "target" : 16,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 16,
+    "target" : 17,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 17,
+    "target" : 18,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/to-changelog-upsert-partition-by-restore/savepoint/_metadata
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/to-changelog-upsert-partition-by-restore/savepoint/_metadata
new file mode 100644
index 00000000000..50eef475775
Binary files /dev/null and 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/to-changelog-upsert-partition-by-restore/savepoint/_metadata
 differ
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/to-changelog-upsert-restore/plan/to-changelog-upsert-restore.json
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/to-changelog-upsert-restore/plan/to-changelog-upsert-restore.json
new file mode 100644
index 00000000000..aa61aa93f01
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/to-changelog-upsert-restore/plan/to-changelog-upsert-restore.json
@@ -0,0 +1,185 @@
+{
+  "flinkVersion" : "2.4",
+  "nodes" : [ {
+    "id" : 4,
+    "type" : "stream-exec-table-source-scan_2",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "name",
+              "dataType" : "VARCHAR(2147483647) NOT NULL"
+            }, {
+              "name" : "score",
+              "dataType" : "BIGINT"
+            } ],
+            "primaryKey" : {
+              "name" : "PK_name",
+              "type" : "PRIMARY_KEY",
+              "columns" : [ "name" ]
+            }
+          }
+        }
+      }
+    },
+    "outputType" : "ROW<`name` VARCHAR(2147483647) NOT NULL, `score` BIGINT>",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, t]], fields=[name, score])"
+  }, {
+    "id" : 5,
+    "type" : "stream-exec-exchange_1",
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "HASH",
+        "keys" : [ 0 ]
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`name` VARCHAR(2147483647) NOT NULL, `score` BIGINT>",
+    "description" : "Exchange(distribution=[hash[name]])"
+  }, {
+    "id" : 6,
+    "type" : "stream-exec-changelog-normalize_1",
+    "configuration" : {
+      "table.exec.mini-batch.enabled" : "false",
+      "table.exec.mini-batch.size" : "-1"
+    },
+    "uniqueKeys" : [ 0 ],
+    "generateUpdateBefore" : true,
+    "state" : [ {
+      "index" : 0,
+      "ttl" : "0 ms",
+      "name" : "changelogNormalizeState"
+    } ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`name` VARCHAR(2147483647) NOT NULL, `score` BIGINT>",
+    "description" : "ChangelogNormalize(key=[name])"
+  }, {
+    "id" : 7,
+    "type" : "stream-exec-process-table-function_1",
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`op` VARCHAR(2147483647), `name` VARCHAR(2147483647) 
NOT NULL, `score` BIGINT>",
+    "description" : "ProcessTableFunction(invocation=[TO_CHANGELOG(TABLE(#0), 
DEFAULT(), DEFAULT(), DEFAULT(), DEFAULT(), DEFAULT())], uid=[null], 
select=[op,name,score], rowType=[RecordType(VARCHAR(2147483647) op, 
VARCHAR(2147483647) name, BIGINT score)])",
+    "uid" : null,
+    "functionCall" : {
+      "kind" : "CALL",
+      "internalName" : "$TO_CHANGELOG$1",
+      "operands" : [ {
+        "kind" : "TABLE_ARG_CALL",
+        "inputIndex" : 0,
+        "type" : "ROW<`name` VARCHAR(2147483647) NOT NULL, `score` BIGINT> NOT 
NULL"
+      }, {
+        "kind" : "CALL",
+        "syntax" : "SPECIAL",
+        "internalName" : "$DEFAULT$1",
+        "type" : "DESCRIPTOR"
+      }, {
+        "kind" : "CALL",
+        "syntax" : "SPECIAL",
+        "internalName" : "$DEFAULT$1",
+        "type" : "MAP<VARCHAR(2147483647), VARCHAR(2147483647)>"
+      }, {
+        "kind" : "CALL",
+        "syntax" : "SPECIAL",
+        "internalName" : "$DEFAULT$1",
+        "type" : "BOOLEAN"
+      }, {
+        "kind" : "CALL",
+        "syntax" : "SPECIAL",
+        "internalName" : "$DEFAULT$1",
+        "type" : "DESCRIPTOR"
+      }, {
+        "kind" : "CALL",
+        "syntax" : "SPECIAL",
+        "internalName" : "$DEFAULT$1",
+        "type" : "VARCHAR(2147483647)"
+      } ],
+      "type" : "ROW<`op` VARCHAR(2147483647), `name` VARCHAR(2147483647) NOT 
NULL, `score` BIGINT> NOT NULL"
+    },
+    "inputChangelogModes" : [ [ "INSERT", "UPDATE_BEFORE", "UPDATE_AFTER", 
"DELETE" ] ],
+    "outputChangelogMode" : [ "INSERT" ],
+    "inputUpsertKeys" : [ [ [ 0 ] ] ]
+  }, {
+    "id" : 8,
+    "type" : "stream-exec-sink_2",
+    "configuration" : {
+      "table.exec.sink.keyed-shuffle" : "AUTO",
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.rowtime-inserter" : "ENABLED",
+      "table.exec.sink.type-length-enforcer" : "IGNORE",
+      "table.exec.sink.upsert-materialize" : "AUTO"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`sink`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "op",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "name",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "score",
+              "dataType" : "BIGINT"
+            } ]
+          }
+        }
+      }
+    },
+    "inputChangelogMode" : [ "INSERT" ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`op` VARCHAR(2147483647), `name` VARCHAR(2147483647) 
NOT NULL, `score` BIGINT>",
+    "description" : "Sink(table=[default_catalog.default_database.sink], 
fields=[op, name, score])"
+  } ],
+  "edges" : [ {
+    "source" : 4,
+    "target" : 5,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 5,
+    "target" : 6,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 6,
+    "target" : 7,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 7,
+    "target" : 8,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/to-changelog-upsert-restore/savepoint/_metadata
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/to-changelog-upsert-restore/savepoint/_metadata
new file mode 100644
index 00000000000..af7f44744e7
Binary files /dev/null and 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/to-changelog-upsert-restore/savepoint/_metadata
 differ

Reply via email to