This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit bfcf13b86aa25a2b787e0de812fd0f881fb84975
Author: Dawid Wysakowicz <[email protected]>
AuthorDate: Tue Dec 2 15:26:19 2025 +0100

    [FLINK-38765] Fix persisted metadata handling in sink
---
 .../table/tests/jsonplan/testGetJsonPlan.out       |   2 +-
 .../flink/table/test/program/TableTestProgram.java |  15 +++
 .../table/planner/connectors/DynamicSinkUtils.java |   2 +-
 .../plan/nodes/exec/batch/BatchExecSink.java       |  26 ++--
 .../plan/nodes/exec/common/CommonExecSink.java     |  23 +++-
 .../plan/nodes/exec/stream/StreamExecSink.java     |  29 ++++
 .../plan/nodes/exec/TransformationsTest.java       |   6 +-
 .../nodes/exec/common/TableSinkTestPrograms.java   |  75 +++++++++++
 .../nodes/exec/stream/TableSinkRestoreTest.java    |  13 +-
 .../plan/nodes/exec/testutils/RestoreTestBase.java |  25 +++-
 .../test/resources/jsonplan/testGetJsonPlan.out    |   2 +-
 .../jsonplan/testInvalidTypeJsonPlan.json          |   2 +-
 .../resources/jsonplan/testSinkTableWithHints.out  |   2 +-
 .../jsonplan/testSourceTableWithHints.out          |   2 +-
 .../testPythonCalc.out                             |   2 +-
 .../testPythonFunctionInWhereClause.out            |   2 +-
 .../testJoinWithFilter.out                         |   2 +-
 .../testPythonTableFunction.out                    |   2 +-
 .../tesPythonAggCallsWithGroupBy.out               |   2 +-
 .../testEventTimeHopWindow.out                     |   2 +-
 .../testEventTimeSessionWindow.out                 |   2 +-
 .../testEventTimeTumbleWindow.out                  |   2 +-
 .../testProcTimeHopWindow.out                      |   2 +-
 .../testProcTimeSessionWindow.out                  |   2 +-
 .../testProcTimeTumbleWindow.out                   |   2 +-
 .../testProcTimeBoundedNonPartitionedRangeOver.out |   2 +-
 .../testProcTimeBoundedPartitionedRangeOver.out    |   2 +-
 ...undedPartitionedRowsOverWithBuiltinProctime.out |   2 +-
 .../testProcTimeUnboundedPartitionedRangeOver.out  |   2 +-
 .../testRowTimeBoundedPartitionedRowsOver.out      |   2 +-
 .../testJsonPlanWithTableHints.out                 |   2 +-
 ...k-upsert-materializer-writable-metadata-v1.json | 149 +++++++++++++++++++++
 .../savepoint/_metadata                            | Bin 0 -> 8665 bytes
 ...k-upsert-materializer-writable-metadata-v1.json | 149 +++++++++++++++++++++
 .../savepoint/_metadata                            | Bin 0 -> 9499 bytes
 ...sink-upsert-materializer-writable-metadata.json | 149 +++++++++++++++++++++
 .../savepoint/_metadata                            | Bin 0 -> 9502 bytes
 .../sink-bucketing_hash-with-keys-with-count.json  |  51 +++----
 .../savepoint/_metadata                            | Bin 0 -> 8439 bytes
 ...nk-bucketing_range_with_keys_without_count.json |  50 +++----
 .../savepoint/_metadata                            | Bin 0 -> 8437 bytes
 .../plan/sink-bucketing_with-count.json}           |  44 +++---
 .../sink-bucketing_with-count/savepoint/_metadata  | Bin 0 -> 8435 bytes
 .../plan/sink-bucketing_with-keys-and-count.json   |  51 +++----
 .../savepoint/_metadata                            | Bin 0 -> 8437 bytes
 .../plan/sink-ndf-primary-key.json}                |  94 ++++++-------
 .../sink-ndf-primary-key/savepoint/_metadata       | Bin 0 -> 5212 bytes
 .../sink-overwrite/plan/sink-overwrite.json        |  46 +++----
 .../sink-overwrite/savepoint/_metadata             | Bin 0 -> 8441 bytes
 .../plan/sink-partial-insert.json}                 | 111 ++++++++-------
 .../sink-partial-insert/savepoint/_metadata        | Bin 0 -> 11115 bytes
 .../sink-partition/plan/sink-partition.json}       | 106 +++++++--------
 .../sink-partition/savepoint/_metadata             | Bin 0 -> 9561 bytes
 .../sink-upsert/plan/sink-upsert.json              |  57 ++++----
 .../sink-upsert/savepoint/_metadata                | Bin 0 -> 5102 bytes
 .../plan/sink-writing-metadata.json                |  51 +++----
 .../sink-writing-metadata/savepoint/_metadata      | Bin 0 -> 8393 bytes
 57 files changed, 992 insertions(+), 374 deletions(-)

diff --git a/flink-python/pyflink/table/tests/jsonplan/testGetJsonPlan.out 
b/flink-python/pyflink/table/tests/jsonplan/testGetJsonPlan.out
index 880fb2cf11d..171e8fefad6 100644
--- a/flink-python/pyflink/table/tests/jsonplan/testGetJsonPlan.out
+++ b/flink-python/pyflink/table/tests/jsonplan/testGetJsonPlan.out
@@ -30,7 +30,7 @@
     "description" : "TableSourceScan(table=[[default_catalog, 
default_database, MyTable]], fields=[a, b, c])"
   }, {
     "id" : 0,
-    "type" : "stream-exec-sink_1",
+    "type" : "stream-exec-sink_2",
     "configuration" : {
       "table.exec.sink.keyed-shuffle" : "AUTO",
       "table.exec.sink.not-null-enforcer" : "ERROR",
diff --git 
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TableTestProgram.java
 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TableTestProgram.java
index ea3925ab06a..37dbfbb5db9 100644
--- 
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TableTestProgram.java
+++ 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/test/program/TableTestProgram.java
@@ -114,6 +114,21 @@ public class TableTestProgram {
         return id;
     }
 
+    @Override
+    public boolean equals(Object o) {
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        TableTestProgram that = (TableTestProgram) o;
+        return id.equals(that.id);
+    }
+
+    @Override
+    public int hashCode() {
+        return id.hashCode();
+    }
+
     /**
      * Entrypoint for a {@link TableTestProgram} that forces an identifier and 
description of the
      * test program.
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java
index 411470f5dbc..3733e7a24df 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java
@@ -1326,7 +1326,7 @@ public final class DynamicSinkUtils {
      *
      * <p>The format looks as follows: {@code PHYSICAL COLUMNS + PERSISTED 
METADATA COLUMNS}
      */
-    private static RowType createConsumedType(ResolvedSchema schema, 
DynamicTableSink sink) {
+    public static RowType createConsumedType(ResolvedSchema schema, 
DynamicTableSink sink) {
         final Map<String, DataType> metadataMap = extractMetadataMap(sink);
 
         final Stream<RowField> physicalFields =
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSink.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSink.java
index c9a6e8c3c63..8a1dd04bcfe 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSink.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSink.java
@@ -124,7 +124,7 @@ public class BatchExecSink extends CommonExecSink 
implements BatchExecNode<Objec
     }
 
     @Override
-    protected RowType getPhysicalRowType(ResolvedSchema schema) {
+    protected final RowType getPersistedRowType(ResolvedSchema schema, 
DynamicTableSink tableSink) {
         // row-level modification may only write partial columns,
         // so we try to prune the RowType to get the real RowType containing
         // the physical columns to be written
@@ -132,16 +132,20 @@ public class BatchExecSink extends CommonExecSink 
implements BatchExecNode<Objec
             for (SinkAbilitySpec sinkAbilitySpec : 
tableSinkSpec.getSinkAbilities()) {
                 if (sinkAbilitySpec instanceof RowLevelUpdateSpec) {
                     RowLevelUpdateSpec rowLevelUpdateSpec = 
(RowLevelUpdateSpec) sinkAbilitySpec;
-                    return getPhysicalRowType(
-                            schema, 
rowLevelUpdateSpec.getRequiredPhysicalColumnIndices());
+                    return getPersistedRowType(
+                            schema,
+                            
rowLevelUpdateSpec.getRequiredPhysicalColumnIndices(),
+                            tableSink);
                 } else if (sinkAbilitySpec instanceof RowLevelDeleteSpec) {
                     RowLevelDeleteSpec rowLevelDeleteSpec = 
(RowLevelDeleteSpec) sinkAbilitySpec;
-                    return getPhysicalRowType(
-                            schema, 
rowLevelDeleteSpec.getRequiredPhysicalColumnIndices());
+                    return getPersistedRowType(
+                            schema,
+                            
rowLevelDeleteSpec.getRequiredPhysicalColumnIndices(),
+                            tableSink);
                 }
             }
         }
-        return (RowType) schema.toPhysicalRowDataType().getLogicalType();
+        return super.getPersistedRowType(schema, tableSink);
     }
 
     @Override
@@ -183,12 +187,18 @@ public class BatchExecSink extends CommonExecSink 
implements BatchExecNode<Objec
     }
 
     /** Get the physical row type with given column indices. */
-    private RowType getPhysicalRowType(ResolvedSchema schema, int[] 
columnIndices) {
+    private RowType getPersistedRowType(
+            ResolvedSchema schema, int[] columnIndices, DynamicTableSink sink) 
{
         List<Column> columns = schema.getColumns();
         List<Column> requireColumns = new ArrayList<>();
         for (int columnIndex : columnIndices) {
             requireColumns.add(columns.get(columnIndex));
         }
-        return (RowType) 
ResolvedSchema.of(requireColumns).toPhysicalRowDataType().getLogicalType();
+        return super.getPersistedRowType(ResolvedSchema.of(requireColumns), 
sink);
+    }
+
+    @Override
+    protected final boolean legacyPhysicalTypeEnabled() {
+        return false;
     }
 }
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
index 86866a98559..ad333589232 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
@@ -52,6 +52,7 @@ import 
org.apache.flink.table.connector.sink.abilities.SupportsRowLevelDelete;
 import org.apache.flink.table.connector.sink.abilities.SupportsRowLevelUpdate;
 import org.apache.flink.table.connector.sink.legacy.SinkFunctionProvider;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.connectors.DynamicSinkUtils;
 import org.apache.flink.table.planner.lineage.TableLineageUtils;
 import org.apache.flink.table.planner.lineage.TableSinkLineageVertex;
 import org.apache.flink.table.planner.lineage.TableSinkLineageVertexImpl;
@@ -151,8 +152,8 @@ public abstract class CommonExecSink extends 
ExecNodeBase<Object>
                 tableSink.getSinkRuntimeProvider(
                         new SinkRuntimeProviderContext(
                                 isBounded, tableSinkSpec.getTargetColumns()));
-        final RowType physicalRowType = getPhysicalRowType(schema);
-        final int[] primaryKeys = getPrimaryKeyIndices(physicalRowType, 
schema);
+        final RowType persistedRowType = getPersistedRowType(schema, 
tableSink);
+        final int[] primaryKeys = getPrimaryKeyIndices(persistedRowType, 
schema);
         final int sinkParallelism = deriveSinkParallelism(inputTransform, 
runtimeProvider);
         sinkParallelismConfigured = isParallelismConfigured(runtimeProvider);
         final int inputParallelism = inputTransform.getParallelism();
@@ -190,7 +191,7 @@ public abstract class CommonExecSink extends 
ExecNodeBase<Object>
         final boolean needMaterialization = !inputInsertOnly && 
upsertMaterialize;
 
         Transformation<RowData> sinkTransform =
-                applyConstraintValidations(inputTransform, config, 
physicalRowType);
+                applyConstraintValidations(inputTransform, config, 
persistedRowType);
 
         if (hasPk) {
             sinkTransform =
@@ -212,7 +213,7 @@ public abstract class CommonExecSink extends 
ExecNodeBase<Object>
                             sinkParallelism,
                             config,
                             classLoader,
-                            physicalRowType,
+                            persistedRowType,
                             inputUpsertKey);
         }
 
@@ -545,8 +546,16 @@ public abstract class CommonExecSink extends 
ExecNodeBase<Object>
                 .orElse(new int[0]);
     }
 
-    protected RowType getPhysicalRowType(ResolvedSchema schema) {
-        return (RowType) schema.toPhysicalRowDataType().getLogicalType();
+    /**
+     * The method recreates the type of the incoming record from the sink's 
schema. It puts the
+     * physical columns first, followed by persisted metadata columns.
+     */
+    protected RowType getPersistedRowType(ResolvedSchema schema, 
DynamicTableSink sink) {
+        if (legacyPhysicalTypeEnabled()) {
+            return (RowType) schema.toPhysicalRowDataType().getLogicalType();
+        } else {
+            return DynamicSinkUtils.createConsumedType(schema, sink);
+        }
     }
 
     /**
@@ -574,4 +583,6 @@ public abstract class CommonExecSink extends 
ExecNodeBase<Object>
         }
         return Optional.empty();
     }
+
+    protected abstract boolean legacyPhysicalTypeEnabled();
 }
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecSink.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecSink.java
index e1052a96533..c6947403d31 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecSink.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecSink.java
@@ -104,6 +104,30 @@ import static 
org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXE
         },
         minPlanVersion = FlinkVersion.v1_15,
         minStateVersion = FlinkVersion.v1_15)
+// Version 2: Fixed the data type used for creating constraint enforcer and 
sink upsert
+// materializer. Since this version the sink works correctly with persisted 
metadata columns.
+// We introduced a new version, because statements that were never rolling 
back to a value from
// state could run successfully. We allow those jobs to be upgraded. Without a 
new version, such jobs
+// would fail on restore, because the state serializer would differ
+@ExecNodeMetadata(
+        name = "stream-exec-sink",
+        version = 2,
+        consumedOptions = {
+            "table.exec.sink.not-null-enforcer",
+            "table.exec.sink.type-length-enforcer",
+            "table.exec.sink.upsert-materialize",
+            "table.exec.sink.keyed-shuffle",
+            "table.exec.sink.rowtime-inserter"
+        },
+        producedTransformations = {
+            CommonExecSink.CONSTRAINT_VALIDATOR_TRANSFORMATION,
+            CommonExecSink.PARTITIONER_TRANSFORMATION,
+            CommonExecSink.UPSERT_MATERIALIZE_TRANSFORMATION,
+            CommonExecSink.TIMESTAMP_INSERTER_TRANSFORMATION,
+            CommonExecSink.SINK_TRANSFORMATION
+        },
+        minPlanVersion = FlinkVersion.v2_3,
+        minStateVersion = FlinkVersion.v2_3)
 public class StreamExecSink extends CommonExecSink implements 
StreamExecNode<Object> {
     private static final Logger LOG = 
LoggerFactory.getLogger(StreamExecSink.class);
 
@@ -391,4 +415,9 @@ public class StreamExecSink extends CommonExecSink 
implements StreamExecNode<Obj
                 throw new IllegalArgumentException("Unsupported strategy: " + 
strategy);
         }
     }
+
+    @Override
+    protected final boolean legacyPhysicalTypeEnabled() {
+        return getVersion() == 1;
+    }
 }
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/TransformationsTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/TransformationsTest.java
index 007bfef4734..49609c1d439 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/TransformationsTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/TransformationsTest.java
@@ -188,8 +188,8 @@ class TransformationsTest {
                         config.set(TABLE_EXEC_UID_FORMAT, 
"<id>_<type>_<version>_<transformation>"),
                 json -> {},
                 env -> planFromCurrentFlinkValues(env).asJsonString(),
-                "\\d+_stream-exec-sink_1_sink",
-                "\\d+_stream-exec-sink_1_constraint-validator",
+                "\\d+_stream-exec-sink_2_sink",
+                "\\d+_stream-exec-sink_2_constraint-validator",
                 "\\d+_stream-exec-values_1_values");
     }
 
@@ -200,7 +200,7 @@ class TransformationsTest {
                 json ->
                         JsonTestUtils.setExecNodeConfig(
                                 json,
-                                "stream-exec-sink_1",
+                                "stream-exec-sink_2",
                                 TABLE_EXEC_UID_FORMAT.key(),
                                 "my_custom_<transformation>_<id>"),
                 env -> planFromCurrentFlinkValues(env).asJsonString(),
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/TableSinkTestPrograms.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/TableSinkTestPrograms.java
index 7939a672415..b1248a2bb1b 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/TableSinkTestPrograms.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/TableSinkTestPrograms.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.planner.plan.nodes.exec.common;
 
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import org.apache.flink.table.catalog.TableDistribution;
 import 
org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions;
 import org.apache.flink.table.test.program.SinkTestStep;
@@ -28,6 +29,9 @@ import org.apache.flink.types.RowKind;
 
 import java.util.Arrays;
 
+import static org.apache.flink.configuration.StateBackendOptions.STATE_BACKEND;
+import static 
org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_SINK_UPSERT_MATERIALIZE_STRATEGY;
+
 /** {@link TableTestProgram} definitions for testing {@link StreamExecSink}. */
 public class TableSinkTestPrograms {
 
@@ -222,4 +226,75 @@ public class TableSinkTestPrograms {
                                     .build())
                     .runSql("INSERT INTO sink_t SELECT * FROM source_t")
                     .build();
+
+    // The queries could run as long as the value was never rolled back to one 
from state, which is
+    // possible. We validate that those jobs can restore and still run.
+    public static final TableTestProgram 
INSERT_RETRACT_WITH_WRITABLE_METADATA_FOR_LEGACY_TYPE =
+            getInsertRetractWithWritableMetadata(true);
+
+    public static final TableTestProgram INSERT_RETRACT_WITH_WRITABLE_METADATA 
=
+            getInsertRetractWithWritableMetadata(false);
+
+    private static TableTestProgram getInsertRetractWithWritableMetadata(
+            boolean forLegacyPhysicalType) {
+        final Row producedAfterRestore;
+        final String consumedAfterRestore;
+        if (forLegacyPhysicalType) {
+            producedAfterRestore = Row.ofKind(RowKind.INSERT, "Bob", 7);
+            consumedAfterRestore = "+U[BOB, 7, Bob, 7]";
+        } else {
+            // retract the last record, which should roll back to
+            // the previous state
+            producedAfterRestore = Row.ofKind(RowKind.DELETE, "Bob", 6);
+            consumedAfterRestore = "+U[BOB, 5, Bob, 5]";
+        }
+        return TableTestProgram.of(
+                        
"insert-into-upsert-with-sink-upsert-materializer-writable-metadata"
+                                + (forLegacyPhysicalType ? "-v1" : ""),
+                        "The query requires a sink upsert materializer and the 
sink"
+                                + " uses writable metadata columns. The 
scenario showcases a"
+                                + " bug where a wrong type was used in sinks 
which did not"
+                                + " consider metadata columns. There needs to 
be multiple"
+                                + " requirements for the bug to show up. 1. We 
need to use "
+                                + " rocksdb, so that we use a serializer when 
putting records"
+                                + " into state in SinkUpsertMaterializer. 2. 
We need to retract"
+                                + " to a previous value taken from the state, 
otherwise we"
+                                + " forward the incoming record. 3. There need 
to be persisted"
+                                + " metadata columns.")
+                .setupConfig(
+                        TABLE_EXEC_SINK_UPSERT_MATERIALIZE_STRATEGY,
+                        
ExecutionConfigOptions.SinkUpsertMaterializeStrategy.LEGACY)
+                .setupConfig(STATE_BACKEND, "rocksdb")
+                .setupTableSource(
+                        SourceTestStep.newBuilder("source_t")
+                                .addSchema("name STRING", "score INT")
+                                .addOption("changelog-mode", "I,UB,UA,D")
+                                .producedBeforeRestore(
+                                        Row.ofKind(RowKind.INSERT, "Bob", 5),
+                                        Row.ofKind(RowKind.INSERT, "Bob", 6))
+                                .producedAfterRestore(producedAfterRestore)
+                                .build())
+                .setupTableSink(
+                        SinkTestStep.newBuilder("sink_t")
+                                .addSchema(
+                                        "name STRING PRIMARY KEY NOT ENFORCED",
+                                        "scoreMetadata BIGINT METADATA",
+                                        "score BIGINT",
+                                        "nameMetadata STRING METADATA")
+                                .addOption("sink-changelog-mode-enforced", 
"I,UA,D")
+                                // The test sink lists metadata columns
+                                // 
(SupportsWritingMetadata#listWritableMetadata) in
+                                // alphabetical order, this is also the order 
in the record of
+                                // a sink, irrespective of the table schema
+                                .addOption(
+                                        "writable-metadata",
+                                        
"nameMetadata:STRING,scoreMetadata:BIGINT")
+                                // physical columns first, then metadata 
columns, sorted
+                                // alphabetically by column name (test sink 
property)
+                                .consumedBeforeRestore("+I[BOB, 5, Bob, 5]", 
"+U[BOB, 6, Bob, 6]")
+                                .consumedAfterRestore(consumedAfterRestore)
+                                .build())
+                .runSql("INSERT INTO sink_t SELECT UPPER(name), score, score, 
name FROM source_t")
+                .build();
+    }
 }
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSinkRestoreTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSinkRestoreTest.java
index 1ea01803717..2e7197525f7 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSinkRestoreTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSinkRestoreTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.table.test.program.TableTestProgram;
 
 import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
 
 /** Restore tests for {@link StreamExecSink}. */
 public class TableSinkRestoreTest extends RestoreTestBase {
@@ -44,6 +45,16 @@ public class TableSinkRestoreTest extends RestoreTestBase {
                 TableSinkTestPrograms.SINK_WRITING_METADATA,
                 TableSinkTestPrograms.SINK_NDF_PRIMARY_KEY,
                 TableSinkTestPrograms.SINK_PARTIAL_INSERT,
-                TableSinkTestPrograms.SINK_UPSERT);
+                TableSinkTestPrograms.SINK_UPSERT,
+                
TableSinkTestPrograms.INSERT_RETRACT_WITH_WRITABLE_METADATA_FOR_LEGACY_TYPE,
+                TableSinkTestPrograms.INSERT_RETRACT_WITH_WRITABLE_METADATA);
+    }
+
+    @Override
+    protected Map<Integer, List<TableTestProgram>> programsToIgnore() {
+        return Map.of(
+                // Disable the writable metadata test for sink node with 
version 1. It fails after
+                // the restore.
+                1, 
List.of(TableSinkTestPrograms.INSERT_RETRACT_WITH_WRITABLE_METADATA));
     }
 }
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/RestoreTestBase.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/RestoreTestBase.java
index 13f91ef9a52..f6ec45fb5a0 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/RestoreTestBase.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/RestoreTestBase.java
@@ -205,6 +205,18 @@ public abstract class RestoreTestBase implements 
TableTestProgramRunner {
                                                                                
         savepointPath))));
     }
 
+    // 
====================================================================================
+    // Extension points for adjusting test combinations
+    // 
====================================================================================
+
+    /**
+     * Can be overridden with a collection of programs that should be ignored 
for a particular
+     * version of the node under test.
+     */
+    protected Map<Integer, List<TableTestProgram>> programsToIgnore() {
+        return Collections.emptyMap();
+    }
+
     /**
      * The method can be overridden in a subclass to test multiple savepoint 
files for a given
      * program and a node in a particular version. This can be useful e.g. to 
test a node against
@@ -212,9 +224,16 @@ public abstract class RestoreTestBase implements 
TableTestProgramRunner {
      */
     protected Stream<String> getSavepointPaths(
             TableTestProgram program, ExecNodeMetadata metadata) {
-        return Stream.of(getSavepointPath(program, metadata, null));
+        if (programsToIgnore()
+                .getOrDefault(metadata.version(), Collections.emptyList())
+                .contains(program)) {
+            return Stream.empty();
+        } else {
+            return Stream.of(getSavepointPath(program, metadata, null));
+        }
     }
 
+    /** Can be used in {@link #getSavepointPaths(TableTestProgram, 
ExecNodeMetadata)}. */
     protected final String getSavepointPath(
             TableTestProgram program,
             ExecNodeMetadata metadata,
@@ -229,6 +248,10 @@ public abstract class RestoreTestBase implements 
TableTestProgramRunner {
         return builder.toString();
     }
 
+    // 
====================================================================================
+    // End of extension points
+    // 
====================================================================================
+
     private void registerSinkObserver(
             final List<CompletableFuture<?>> futures,
             final SinkTestStep sinkTestStep,
diff --git 
a/flink-table/flink-table-planner/src/test/resources/jsonplan/testGetJsonPlan.out
 
b/flink-table/flink-table-planner/src/test/resources/jsonplan/testGetJsonPlan.out
index 19cd2a29e6c..176a1eb36b4 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/jsonplan/testGetJsonPlan.out
+++ 
b/flink-table/flink-table-planner/src/test/resources/jsonplan/testGetJsonPlan.out
@@ -30,7 +30,7 @@
     "description" : "TableSourceScan(table=[[default_catalog, 
default_database, MyTable]], fields=[a, b, c])"
   }, {
     "id" : 2,
-    "type" : "stream-exec-sink_1",
+    "type" : "stream-exec-sink_2",
     "configuration" : {
       "table.exec.sink.keyed-shuffle" : "AUTO",
       "table.exec.sink.not-null-enforcer" : "ERROR",
diff --git 
a/flink-table/flink-table-planner/src/test/resources/jsonplan/testInvalidTypeJsonPlan.json
 
b/flink-table/flink-table-planner/src/test/resources/jsonplan/testInvalidTypeJsonPlan.json
index 957899f9eb1..f04ebbd701d 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/jsonplan/testInvalidTypeJsonPlan.json
+++ 
b/flink-table/flink-table-planner/src/test/resources/jsonplan/testInvalidTypeJsonPlan.json
@@ -129,7 +129,7 @@
     },
     {
       "id": 5,
-      "type": "stream-exec-sink_1",
+      "type": "stream-exec-sink_2",
       "configuration": {
         "table.exec.sink.keyed-shuffle": "AUTO",
         "table.exec.sink.not-null-enforcer": "ERROR",
diff --git 
a/flink-table/flink-table-planner/src/test/resources/jsonplan/testSinkTableWithHints.out
 
b/flink-table/flink-table-planner/src/test/resources/jsonplan/testSinkTableWithHints.out
index 5fa8f04dbda..6018fa8f081 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/jsonplan/testSinkTableWithHints.out
+++ 
b/flink-table/flink-table-planner/src/test/resources/jsonplan/testSinkTableWithHints.out
@@ -30,7 +30,7 @@
     "description" : "TableSourceScan(table=[[default_catalog, 
default_database, MyTable]], fields=[a, b, c])"
   }, {
     "id" : 0,
-    "type" : "stream-exec-sink_1",
+    "type" : "stream-exec-sink_2",
     "configuration" : {
       "table.exec.sink.keyed-shuffle" : "AUTO",
       "table.exec.sink.not-null-enforcer" : "ERROR",
diff --git 
a/flink-table/flink-table-planner/src/test/resources/jsonplan/testSourceTableWithHints.out
 
b/flink-table/flink-table-planner/src/test/resources/jsonplan/testSourceTableWithHints.out
index 4080cb14712..4b1ddd9ad3e 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/jsonplan/testSourceTableWithHints.out
+++ 
b/flink-table/flink-table-planner/src/test/resources/jsonplan/testSourceTableWithHints.out
@@ -31,7 +31,7 @@
     "description" : "TableSourceScan(table=[[default_catalog, 
default_database, MyTable]], fields=[a, b, c], hints=[[[OPTIONS 
options:{scan.parallelism=2, bounded=true}]]])"
   }, {
     "id" : 0,
-    "type" : "stream-exec-sink_1",
+    "type" : "stream-exec-sink_2",
     "configuration" : {
       "table.exec.sink.keyed-shuffle" : "AUTO",
       "table.exec.sink.not-null-enforcer" : "ERROR",
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCalcJsonPlanTest_jsonplan/testPythonCalc.out
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCalcJsonPlanTest_jsonplan/testPythonCalc.out
index 9ebcad236f8..989225dbed2 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCalcJsonPlanTest_jsonplan/testPythonCalc.out
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCalcJsonPlanTest_jsonplan/testPythonCalc.out
@@ -72,7 +72,7 @@
     "description" : "PythonCalc(select=[a, pyFunc(b, b) AS EXPR$1])"
   }, {
     "id" : 3,
-    "type" : "stream-exec-sink_1",
+    "type" : "stream-exec-sink_2",
     "configuration" : {
       "table.exec.sink.keyed-shuffle" : "AUTO",
       "table.exec.sink.not-null-enforcer" : "ERROR",
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCalcJsonPlanTest_jsonplan/testPythonFunctionInWhereClause.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCalcJsonPlanTest_jsonplan/testPythonFunctionInWhereClause.out
index c3169f69f9d..11a1628fbca 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCalcJsonPlanTest_jsonplan/testPythonFunctionInWhereClause.out
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCalcJsonPlanTest_jsonplan/testPythonFunctionInWhereClause.out
@@ -141,7 +141,7 @@
     "description" : "Calc(select=[a, b], where=[f0])"
   }, {
     "id" : 5,
-    "type" : "stream-exec-sink_1",
+    "type" : "stream-exec-sink_2",
     "configuration" : {
       "table.exec.sink.keyed-shuffle" : "AUTO",
       "table.exec.sink.not-null-enforcer" : "ERROR",
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCorrelateJsonPlanTest_jsonplan/testJoinWithFilter.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCorrelateJsonPlanTest_jsonplan/testJoinWithFilter.out
index 195be418c0d..71ecfdb9335 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCorrelateJsonPlanTest_jsonplan/testJoinWithFilter.out
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCorrelateJsonPlanTest_jsonplan/testJoinWithFilter.out
@@ -189,7 +189,7 @@
     "description" : "Calc(select=[x, y], where=[(((y + 1) = (y * y)) AND (x = 
a))])"
   }, {
     "id" : 5,
-    "type" : "stream-exec-sink_1",
+    "type" : "stream-exec-sink_2",
     "configuration" : {
       "table.exec.sink.keyed-shuffle" : "AUTO",
       "table.exec.sink.not-null-enforcer" : "ERROR",
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCorrelateJsonPlanTest_jsonplan/testPythonTableFunction.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCorrelateJsonPlanTest_jsonplan/testPythonTableFunction.out
index 73f72972634..3d3d35aaff2 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCorrelateJsonPlanTest_jsonplan/testPythonTableFunction.out
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCorrelateJsonPlanTest_jsonplan/testPythonTableFunction.out
@@ -135,7 +135,7 @@
     "description" : "Calc(select=[x, y])"
   }, {
     "id" : 5,
-    "type" : "stream-exec-sink_1",
+    "type" : "stream-exec-sink_2",
     "configuration" : {
       "table.exec.sink.keyed-shuffle" : "AUTO",
       "table.exec.sink.not-null-enforcer" : "ERROR",
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupAggregateJsonPlanTest_jsonplan/tesPythonAggCallsWithGroupBy.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupAggregateJsonPlanTest_jsonplan/tesPythonAggCallsWithGroupBy.out
index 77a97c2d42a..fd7538d2991 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupAggregateJsonPlanTest_jsonplan/tesPythonAggCallsWithGroupBy.out
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupAggregateJsonPlanTest_jsonplan/tesPythonAggCallsWithGroupBy.out
@@ -149,7 +149,7 @@
     "description" : "Calc(select=[CAST(b AS BIGINT) AS a, EXPR$1 AS b])"
   }, {
     "id" : 6,
-    "type" : "stream-exec-sink_1",
+    "type" : "stream-exec-sink_2",
     "configuration" : {
       "table.exec.sink.keyed-shuffle" : "AUTO",
       "table.exec.sink.not-null-enforcer" : "ERROR",
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testEventTimeHopWindow.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testEventTimeHopWindow.out
index c788c49e306..711d77a9810 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testEventTimeHopWindow.out
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testEventTimeHopWindow.out
@@ -303,7 +303,7 @@
     "description" : "PythonGroupWindowAggregate(groupBy=[b], 
window=[SlidingGroupWindow('w$, rowtime, 10000, 5000)], select=[b, pyFunc(a, 
$f3) AS EXPR$1])"
   }, {
     "id" : 7,
-    "type" : "stream-exec-sink_1",
+    "type" : "stream-exec-sink_2",
     "configuration" : {
       "table.exec.sink.keyed-shuffle" : "AUTO",
       "table.exec.sink.not-null-enforcer" : "ERROR",
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testEventTimeSessionWindow.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testEventTimeSessionWindow.out
index 892f3eb3b9a..80528083d31 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testEventTimeSessionWindow.out
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testEventTimeSessionWindow.out
@@ -301,7 +301,7 @@
     "description" : "PythonGroupWindowAggregate(groupBy=[b], 
window=[SessionGroupWindow('w$, rowtime, 10000)], select=[b, pyFunc(a, $f3) AS 
EXPR$1])"
   }, {
     "id" : 7,
-    "type" : "stream-exec-sink_1",
+    "type" : "stream-exec-sink_2",
     "configuration" : {
       "table.exec.sink.keyed-shuffle" : "AUTO",
       "table.exec.sink.not-null-enforcer" : "ERROR",
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testEventTimeTumbleWindow.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testEventTimeTumbleWindow.out
index 1bde1c99e43..625404e3afc 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testEventTimeTumbleWindow.out
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testEventTimeTumbleWindow.out
@@ -413,7 +413,7 @@
     "description" : "Calc(select=[b, w$start AS window_start, w$end AS 
window_end, EXPR$3])"
   }, {
     "id" : 8,
-    "type" : "stream-exec-sink_1",
+    "type" : "stream-exec-sink_2",
     "configuration" : {
       "table.exec.sink.keyed-shuffle" : "AUTO",
       "table.exec.sink.not-null-enforcer" : "ERROR",
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testProcTimeHopWindow.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testProcTimeHopWindow.out
index 66562e9f1f2..e9092e8a395 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testProcTimeHopWindow.out
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testProcTimeHopWindow.out
@@ -356,7 +356,7 @@
     "description" : "PythonGroupWindowAggregate(groupBy=[b], 
window=[SlidingGroupWindow('w$, proctime, 600000, 300000)], select=[b, 
pyFunc(a, $f3) AS EXPR$1])"
   }, {
     "id" : 7,
-    "type" : "stream-exec-sink_1",
+    "type" : "stream-exec-sink_2",
     "configuration" : {
       "table.exec.sink.keyed-shuffle" : "AUTO",
       "table.exec.sink.not-null-enforcer" : "ERROR",
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testProcTimeSessionWindow.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testProcTimeSessionWindow.out
index 11ee30655dd..ca4a4cbd1a8 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testProcTimeSessionWindow.out
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testProcTimeSessionWindow.out
@@ -354,7 +354,7 @@
     "description" : "PythonGroupWindowAggregate(groupBy=[b], 
window=[SessionGroupWindow('w$, proctime, 600000)], select=[b, pyFunc(a, $f3) 
AS EXPR$1])"
   }, {
     "id" : 7,
-    "type" : "stream-exec-sink_1",
+    "type" : "stream-exec-sink_2",
     "configuration" : {
       "table.exec.sink.keyed-shuffle" : "AUTO",
       "table.exec.sink.not-null-enforcer" : "ERROR",
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testProcTimeTumbleWindow.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testProcTimeTumbleWindow.out
index 28630581efe..1be4cb3d7a4 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testProcTimeTumbleWindow.out
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testProcTimeTumbleWindow.out
@@ -445,7 +445,7 @@
     "description" : "Calc(select=[b, w$end AS window_end, EXPR$2])"
   }, {
     "id" : 8,
-    "type" : "stream-exec-sink_1",
+    "type" : "stream-exec-sink_2",
     "configuration" : {
       "table.exec.sink.keyed-shuffle" : "AUTO",
       "table.exec.sink.not-null-enforcer" : "ERROR",
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonOverAggregateJsonPlanTest_jsonplan/testProcTimeBoundedNonPartitionedRangeOver.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonOverAggregateJsonPlanTest_jsonplan/testProcTimeBoundedNonPartitionedRangeOver.out
index 260321275c3..44dae4c0af9 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonOverAggregateJsonPlanTest_jsonplan/testProcTimeBoundedNonPartitionedRangeOver.out
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonOverAggregateJsonPlanTest_jsonplan/testProcTimeBoundedNonPartitionedRangeOver.out
@@ -352,7 +352,7 @@
     "description" : "Calc(select=[$2 AS $0, w0$o0 AS $1])"
   }, {
     "id" : 8,
-    "type" : "stream-exec-sink_1",
+    "type" : "stream-exec-sink_2",
     "configuration" : {
       "table.exec.sink.keyed-shuffle" : "AUTO",
       "table.exec.sink.not-null-enforcer" : "ERROR",
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonOverAggregateJsonPlanTest_jsonplan/testProcTimeBoundedPartitionedRangeOver.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonOverAggregateJsonPlanTest_jsonplan/testProcTimeBoundedPartitionedRangeOver.out
index 5cad90f407c..eba924fca5e 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonOverAggregateJsonPlanTest_jsonplan/testProcTimeBoundedPartitionedRangeOver.out
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonOverAggregateJsonPlanTest_jsonplan/testProcTimeBoundedPartitionedRangeOver.out
@@ -366,7 +366,7 @@
     "description" : "Calc(select=[$3 AS $0, w0$o0 AS $1])"
   }, {
     "id" : 8,
-    "type" : "stream-exec-sink_1",
+    "type" : "stream-exec-sink_2",
     "configuration" : {
       "table.exec.sink.keyed-shuffle" : "AUTO",
       "table.exec.sink.not-null-enforcer" : "ERROR",
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonOverAggregateJsonPlanTest_jsonplan/testProcTimeBoundedPartitionedRowsOverWithBuiltinProctime.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonOverAggregateJsonPlanTest_jsonplan/testProcTimeBoundedPartitionedRowsOverWithBuiltinProctime.out
index afc22d4037a..1d7a1d15ce9 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonOverAggregateJsonPlanTest_jsonplan/testProcTimeBoundedPartitionedRowsOverWithBuiltinProctime.out
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonOverAggregateJsonPlanTest_jsonplan/testProcTimeBoundedPartitionedRowsOverWithBuiltinProctime.out
@@ -303,7 +303,7 @@
     "description" : "Calc(select=[$2 AS $0, w0$o0 AS $1])"
   }, {
     "id" : 7,
-    "type" : "stream-exec-sink_1",
+    "type" : "stream-exec-sink_2",
     "configuration" : {
       "table.exec.sink.keyed-shuffle" : "AUTO",
       "table.exec.sink.not-null-enforcer" : "ERROR",
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonOverAggregateJsonPlanTest_jsonplan/testProcTimeUnboundedPartitionedRangeOver.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonOverAggregateJsonPlanTest_jsonplan/testProcTimeUnboundedPartitionedRangeOver.out
index b792d066f59..3252699022c 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonOverAggregateJsonPlanTest_jsonplan/testProcTimeUnboundedPartitionedRangeOver.out
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonOverAggregateJsonPlanTest_jsonplan/testProcTimeUnboundedPartitionedRangeOver.out
@@ -356,7 +356,7 @@
     "description" : "Calc(select=[$3 AS $0, w0$o0 AS $1])"
   }, {
     "id" : 8,
-    "type" : "stream-exec-sink_1",
+    "type" : "stream-exec-sink_2",
     "configuration" : {
       "table.exec.sink.keyed-shuffle" : "AUTO",
       "table.exec.sink.not-null-enforcer" : "ERROR",
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonOverAggregateJsonPlanTest_jsonplan/testRowTimeBoundedPartitionedRowsOver.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonOverAggregateJsonPlanTest_jsonplan/testRowTimeBoundedPartitionedRowsOver.out
index 1f25a552ba3..d44f5a0aac4 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonOverAggregateJsonPlanTest_jsonplan/testRowTimeBoundedPartitionedRowsOver.out
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonOverAggregateJsonPlanTest_jsonplan/testRowTimeBoundedPartitionedRowsOver.out
@@ -298,7 +298,7 @@
     "description" : "Calc(select=[$3 AS $0, w0$o0 AS $1])"
   }, {
     "id" : 7,
-    "type" : "stream-exec-sink_1",
+    "type" : "stream-exec-sink_2",
     "configuration" : {
       "table.exec.sink.keyed-shuffle" : "AUTO",
       "table.exec.sink.not-null-enforcer" : "ERROR",
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DeltaJoinTest_jsonplan/testJsonPlanWithTableHints.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DeltaJoinTest_jsonplan/testJsonPlanWithTableHints.out
index cc6ba9c8b6e..b3db697e6ab 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DeltaJoinTest_jsonplan/testJsonPlanWithTableHints.out
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DeltaJoinTest_jsonplan/testJsonPlanWithTableHints.out
@@ -263,7 +263,7 @@
     }
   }, {
     "id" : 6,
-    "type" : "stream-exec-sink_1",
+    "type" : "stream-exec-sink_2",
     "configuration" : {
       "table.exec.sink.keyed-shuffle" : "AUTO",
       "table.exec.sink.not-null-enforcer" : "ERROR",
diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_1/insert-into-upsert-with-sink-upsert-materializer-writable-metadata-v1/plan/insert-into-upsert-with-sink-upsert-materializer-writable-metadata-v1.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_1/insert-into-upsert-with-sink-upsert-materializer-writable-metadata-v1/plan/insert-into-upsert-with-sink-upsert-materializer-writable-metadata-v1.json
new file mode 100644
index 00000000000..2959e98ad25
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_1/insert-into-upsert-with-sink-upsert-materializer-writable-metadata-v1/plan/insert-into-upsert-with-sink-upsert-materializer-writable-metadata-v1.json
@@ -0,0 +1,149 @@
+{
+  "flinkVersion" : "2.3",
+  "nodes" : [ {
+    "id" : 1,
+    "type" : "stream-exec-table-source-scan_2",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`source_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "name",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "score",
+              "dataType" : "INT"
+            } ]
+          }
+        }
+      }
+    },
+    "outputType" : "ROW<`name` VARCHAR(2147483647), `score` INT>",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, source_t]], fields=[name, score])"
+  }, {
+    "id" : 2,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "CALL",
+      "internalName" : "$UPPER$1",
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 0,
+        "type" : "VARCHAR(2147483647)"
+      } ],
+      "type" : "VARCHAR(2147483647)"
+    }, {
+      "kind" : "CALL",
+      "syntax" : "SPECIAL",
+      "internalName" : "$CAST$1",
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 1,
+        "type" : "INT"
+      } ],
+      "type" : "BIGINT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "VARCHAR(2147483647)"
+    }, {
+      "kind" : "CALL",
+      "syntax" : "SPECIAL",
+      "internalName" : "$CAST$1",
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 1,
+        "type" : "INT"
+      } ],
+      "type" : "BIGINT"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`name` VARCHAR(2147483647), `score` BIGINT, 
`nameMetadata` VARCHAR(2147483647), `scoreMetadata` BIGINT>",
+    "description" : "Calc(select=[UPPER(name) AS name, CAST(score AS BIGINT) 
AS score, name AS nameMetadata, CAST(score AS BIGINT) AS scoreMetadata])"
+  }, {
+    "id" : 3,
+    "type" : "stream-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.keyed-shuffle" : "AUTO",
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.rowtime-inserter" : "ENABLED",
+      "table.exec.sink.type-length-enforcer" : "IGNORE",
+      "table.exec.sink.upsert-materialize" : "AUTO"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`sink_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "name",
+              "dataType" : "VARCHAR(2147483647) NOT NULL"
+            }, {
+              "name" : "scoreMetadata",
+              "kind" : "METADATA",
+              "dataType" : "BIGINT",
+              "isVirtual" : false
+            }, {
+              "name" : "score",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "nameMetadata",
+              "kind" : "METADATA",
+              "dataType" : "VARCHAR(2147483647)",
+              "isVirtual" : false
+            } ],
+            "primaryKey" : {
+              "name" : "PK_name",
+              "type" : "PRIMARY_KEY",
+              "columns" : [ "name" ]
+            }
+          }
+        }
+      },
+      "abilities" : [ {
+        "type" : "WritingMetadata",
+        "metadataKeys" : [ "nameMetadata", "scoreMetadata" ],
+        "consumedType" : "ROW<`name` VARCHAR(2147483647) NOT NULL, `score` 
BIGINT, `nameMetadata` VARCHAR(2147483647), `scoreMetadata` BIGINT> NOT NULL"
+      } ]
+    },
+    "inputChangelogMode" : [ "INSERT", "UPDATE_BEFORE", "UPDATE_AFTER", 
"DELETE" ],
+    "requireUpsertMaterialize" : true,
+    "state" : [ {
+      "index" : 0,
+      "ttl" : "0 ms",
+      "name" : "sinkMaterializeState"
+    } ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`name` VARCHAR(2147483647), `score` BIGINT, 
`nameMetadata` VARCHAR(2147483647), `scoreMetadata` BIGINT>",
+    "description" : "Sink(table=[default_catalog.default_database.sink_t], 
fields=[name, score, nameMetadata, scoreMetadata], upsertMaterialize=[true])"
+  } ],
+  "edges" : [ {
+    "source" : 1,
+    "target" : 2,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 2,
+    "target" : 3,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file
diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_1/insert-into-upsert-with-sink-upsert-materializer-writable-metadata-v1/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_1/insert-into-upsert-with-sink-upsert-materializer-writable-metadata-v1/savepoint/_metadata
new file mode 100644
index 00000000000..52dbca2634c
Binary files /dev/null and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_1/insert-into-upsert-with-sink-upsert-materializer-writable-metadata-v1/savepoint/_metadata differ
diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/insert-into-upsert-with-sink-upsert-materializer-writable-metadata-v1/plan/insert-into-upsert-with-sink-upsert-materializer-writable-metadata-v1.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/insert-into-upsert-with-sink-upsert-materializer-writable-metadata-v1/plan/insert-into-upsert-with-sink-upsert-materializer-writable-metadata-v1.json
new file mode 100644
index 00000000000..02743978262
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/insert-into-upsert-with-sink-upsert-materializer-writable-metadata-v1/plan/insert-into-upsert-with-sink-upsert-materializer-writable-metadata-v1.json
@@ -0,0 +1,149 @@
+{
+  "flinkVersion" : "2.3",
+  "nodes" : [ {
+    "id" : 1,
+    "type" : "stream-exec-table-source-scan_2",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`source_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "name",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "score",
+              "dataType" : "INT"
+            } ]
+          }
+        }
+      }
+    },
+    "outputType" : "ROW<`name` VARCHAR(2147483647), `score` INT>",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, source_t]], fields=[name, score])"
+  }, {
+    "id" : 2,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "CALL",
+      "internalName" : "$UPPER$1",
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 0,
+        "type" : "VARCHAR(2147483647)"
+      } ],
+      "type" : "VARCHAR(2147483647)"
+    }, {
+      "kind" : "CALL",
+      "syntax" : "SPECIAL",
+      "internalName" : "$CAST$1",
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 1,
+        "type" : "INT"
+      } ],
+      "type" : "BIGINT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "VARCHAR(2147483647)"
+    }, {
+      "kind" : "CALL",
+      "syntax" : "SPECIAL",
+      "internalName" : "$CAST$1",
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 1,
+        "type" : "INT"
+      } ],
+      "type" : "BIGINT"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`name` VARCHAR(2147483647), `score` BIGINT, 
`nameMetadata` VARCHAR(2147483647), `scoreMetadata` BIGINT>",
+    "description" : "Calc(select=[UPPER(name) AS name, CAST(score AS BIGINT) 
AS score, name AS nameMetadata, CAST(score AS BIGINT) AS scoreMetadata])"
+  }, {
+    "id" : 3,
+    "type" : "stream-exec-sink_2",
+    "configuration" : {
+      "table.exec.sink.keyed-shuffle" : "AUTO",
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.rowtime-inserter" : "ENABLED",
+      "table.exec.sink.type-length-enforcer" : "IGNORE",
+      "table.exec.sink.upsert-materialize" : "AUTO"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`sink_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "name",
+              "dataType" : "VARCHAR(2147483647) NOT NULL"
+            }, {
+              "name" : "scoreMetadata",
+              "kind" : "METADATA",
+              "dataType" : "BIGINT",
+              "isVirtual" : false
+            }, {
+              "name" : "score",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "nameMetadata",
+              "kind" : "METADATA",
+              "dataType" : "VARCHAR(2147483647)",
+              "isVirtual" : false
+            } ],
+            "primaryKey" : {
+              "name" : "PK_name",
+              "type" : "PRIMARY_KEY",
+              "columns" : [ "name" ]
+            }
+          }
+        }
+      },
+      "abilities" : [ {
+        "type" : "WritingMetadata",
+        "metadataKeys" : [ "nameMetadata", "scoreMetadata" ],
+        "consumedType" : "ROW<`name` VARCHAR(2147483647) NOT NULL, `score` 
BIGINT, `nameMetadata` VARCHAR(2147483647), `scoreMetadata` BIGINT> NOT NULL"
+      } ]
+    },
+    "inputChangelogMode" : [ "INSERT", "UPDATE_BEFORE", "UPDATE_AFTER", 
"DELETE" ],
+    "requireUpsertMaterialize" : true,
+    "state" : [ {
+      "index" : 0,
+      "ttl" : "0 ms",
+      "name" : "sinkMaterializeState"
+    } ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`name` VARCHAR(2147483647), `score` BIGINT, 
`nameMetadata` VARCHAR(2147483647), `scoreMetadata` BIGINT>",
+    "description" : "Sink(table=[default_catalog.default_database.sink_t], 
fields=[name, score, nameMetadata, scoreMetadata], upsertMaterialize=[true])"
+  } ],
+  "edges" : [ {
+    "source" : 1,
+    "target" : 2,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 2,
+    "target" : 3,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file
diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/insert-into-upsert-with-sink-upsert-materializer-writable-metadata-v1/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/insert-into-upsert-with-sink-upsert-materializer-writable-metadata-v1/savepoint/_metadata
new file mode 100644
index 00000000000..be3b51c2def
Binary files /dev/null and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/insert-into-upsert-with-sink-upsert-materializer-writable-metadata-v1/savepoint/_metadata differ
diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/insert-into-upsert-with-sink-upsert-materializer-writable-metadata/plan/insert-into-upsert-with-sink-upsert-materializer-writable-metadata.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/insert-into-upsert-with-sink-upsert-materializer-writable-metadata/plan/insert-into-upsert-with-sink-upsert-materializer-writable-metadata.json
new file mode 100644
index 00000000000..43168424eec
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/insert-into-upsert-with-sink-upsert-materializer-writable-metadata/plan/insert-into-upsert-with-sink-upsert-materializer-writable-metadata.json
@@ -0,0 +1,149 @@
+{
+  "flinkVersion" : "2.3",
+  "nodes" : [ {
+    "id" : 4,
+    "type" : "stream-exec-table-source-scan_2",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`source_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "name",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "score",
+              "dataType" : "INT"
+            } ]
+          }
+        }
+      }
+    },
+    "outputType" : "ROW<`name` VARCHAR(2147483647), `score` INT>",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, source_t]], fields=[name, score])"
+  }, {
+    "id" : 5,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "CALL",
+      "internalName" : "$UPPER$1",
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 0,
+        "type" : "VARCHAR(2147483647)"
+      } ],
+      "type" : "VARCHAR(2147483647)"
+    }, {
+      "kind" : "CALL",
+      "syntax" : "SPECIAL",
+      "internalName" : "$CAST$1",
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 1,
+        "type" : "INT"
+      } ],
+      "type" : "BIGINT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "VARCHAR(2147483647)"
+    }, {
+      "kind" : "CALL",
+      "syntax" : "SPECIAL",
+      "internalName" : "$CAST$1",
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 1,
+        "type" : "INT"
+      } ],
+      "type" : "BIGINT"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`name` VARCHAR(2147483647), `score` BIGINT, 
`nameMetadata` VARCHAR(2147483647), `scoreMetadata` BIGINT>",
+    "description" : "Calc(select=[UPPER(name) AS name, CAST(score AS BIGINT) 
AS score, name AS nameMetadata, CAST(score AS BIGINT) AS scoreMetadata])"
+  }, {
+    "id" : 6,
+    "type" : "stream-exec-sink_2",
+    "configuration" : {
+      "table.exec.sink.keyed-shuffle" : "AUTO",
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.rowtime-inserter" : "ENABLED",
+      "table.exec.sink.type-length-enforcer" : "IGNORE",
+      "table.exec.sink.upsert-materialize" : "AUTO"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`sink_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "name",
+              "dataType" : "VARCHAR(2147483647) NOT NULL"
+            }, {
+              "name" : "scoreMetadata",
+              "kind" : "METADATA",
+              "dataType" : "BIGINT",
+              "isVirtual" : false
+            }, {
+              "name" : "score",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "nameMetadata",
+              "kind" : "METADATA",
+              "dataType" : "VARCHAR(2147483647)",
+              "isVirtual" : false
+            } ],
+            "primaryKey" : {
+              "name" : "PK_name",
+              "type" : "PRIMARY_KEY",
+              "columns" : [ "name" ]
+            }
+          }
+        }
+      },
+      "abilities" : [ {
+        "type" : "WritingMetadata",
+        "metadataKeys" : [ "nameMetadata", "scoreMetadata" ],
+        "consumedType" : "ROW<`name` VARCHAR(2147483647) NOT NULL, `score` 
BIGINT, `nameMetadata` VARCHAR(2147483647), `scoreMetadata` BIGINT> NOT NULL"
+      } ]
+    },
+    "inputChangelogMode" : [ "INSERT", "UPDATE_BEFORE", "UPDATE_AFTER", 
"DELETE" ],
+    "requireUpsertMaterialize" : true,
+    "state" : [ {
+      "index" : 0,
+      "ttl" : "0 ms",
+      "name" : "sinkMaterializeState"
+    } ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`name` VARCHAR(2147483647), `score` BIGINT, 
`nameMetadata` VARCHAR(2147483647), `scoreMetadata` BIGINT>",
+    "description" : "Sink(table=[default_catalog.default_database.sink_t], 
fields=[name, score, nameMetadata, scoreMetadata], upsertMaterialize=[true])"
+  } ],
+  "edges" : [ {
+    "source" : 4,
+    "target" : 5,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 5,
+    "target" : 6,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/insert-into-upsert-with-sink-upsert-materializer-writable-metadata/savepoint/_metadata
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/insert-into-upsert-with-sink-upsert-materializer-writable-metadata/savepoint/_metadata
new file mode 100644
index 00000000000..b2c30bf6e50
Binary files /dev/null and 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/insert-into-upsert-with-sink-upsert-materializer-writable-metadata/savepoint/_metadata
 differ
diff --git a/flink-python/pyflink/table/tests/jsonplan/testGetJsonPlan.out 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-bucketing_hash-with-keys-with-count/plan/sink-bucketing_hash-with-keys-with-count.json
similarity index 68%
copy from flink-python/pyflink/table/tests/jsonplan/testGetJsonPlan.out
copy to 
flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-bucketing_hash-with-keys-with-count/plan/sink-bucketing_hash-with-keys-with-count.json
index 880fb2cf11d..338e923a20e 100644
--- a/flink-python/pyflink/table/tests/jsonplan/testGetJsonPlan.out
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-bucketing_hash-with-keys-with-count/plan/sink-bucketing_hash-with-keys-with-count.json
@@ -1,36 +1,32 @@
 {
-  "flinkVersion" : "",
+  "flinkVersion" : "2.3",
   "nodes" : [ {
-    "id" : 0,
+    "id" : 5,
     "type" : "stream-exec-table-source-scan_2",
     "scanTableSource" : {
       "table" : {
-        "identifier" : "`default_catalog`.`default_database`.`MyTable`",
+        "identifier" : "`default_catalog`.`default_database`.`source_t`",
         "resolvedTable" : {
           "schema" : {
             "columns" : [ {
               "name" : "a",
-              "dataType" : "BIGINT"
+              "dataType" : "INT"
             }, {
               "name" : "b",
-              "dataType" : "INT"
+              "dataType" : "BIGINT"
             }, {
               "name" : "c",
               "dataType" : "VARCHAR(2147483647)"
             } ]
-          },
-          "options" : {
-            "connector" : "datagen",
-            "number-of-rows" : "5"
           }
         }
       }
     },
-    "outputType" : "ROW<`a` BIGINT, `b` INT, `c` VARCHAR(2147483647)>",
-    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, MyTable]], fields=[a, b, c])"
+    "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647)>",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, source_t]], fields=[a, b, c])"
   }, {
-    "id" : 0,
-    "type" : "stream-exec-sink_1",
+    "id" : 6,
+    "type" : "stream-exec-sink_2",
     "configuration" : {
       "table.exec.sink.keyed-shuffle" : "AUTO",
       "table.exec.sink.not-null-enforcer" : "ERROR",
@@ -40,27 +36,34 @@
     },
     "dynamicTableSink" : {
       "table" : {
-        "identifier" : "`default_catalog`.`default_database`.`MySink`",
+        "identifier" : "`default_catalog`.`default_database`.`sink_t`",
         "resolvedTable" : {
           "schema" : {
             "columns" : [ {
               "name" : "a",
-              "dataType" : "BIGINT"
+              "dataType" : "INT"
             }, {
               "name" : "b",
-              "dataType" : "INT"
+              "dataType" : "BIGINT"
             }, {
               "name" : "c",
               "dataType" : "VARCHAR(2147483647)"
             } ]
           },
-          "options" : {
-            "connector" : "blackhole"
-          }
+          "distribution" : {
+            "kind" : "HASH",
+            "bucketCount" : 3,
+            "bucketKeys" : [ "a" ]
+          },
+          "partitionKeys" : [ "b" ]
         }
-      }
+      },
+      "abilities" : [ {
+        "type" : "Bucketing"
+      } ]
     },
     "inputChangelogMode" : [ "INSERT" ],
+    "upsertMaterializeStrategy" : "MAP",
     "inputProperties" : [ {
       "requiredDistribution" : {
         "type" : "UNKNOWN"
@@ -68,12 +71,12 @@
       "damBehavior" : "PIPELINED",
       "priority" : 0
     } ],
-    "outputType" : "ROW<`a` BIGINT, `b` INT, `c` VARCHAR(2147483647)>",
-    "description" : "Sink(table=[default_catalog.default_database.MySink], 
fields=[a, b, c])"
+    "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647)>",
+    "description" : "Sink(table=[default_catalog.default_database.sink_t], 
fields=[a, b, c])"
   } ],
   "edges" : [ {
-    "source" : 0,
-    "target" : 0,
+    "source" : 5,
+    "target" : 6,
     "shuffle" : {
       "type" : "FORWARD"
     },
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-bucketing_hash-with-keys-with-count/savepoint/_metadata
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-bucketing_hash-with-keys-with-count/savepoint/_metadata
new file mode 100644
index 00000000000..6998265c31e
Binary files /dev/null and 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-bucketing_hash-with-keys-with-count/savepoint/_metadata
 differ
diff --git a/flink-python/pyflink/table/tests/jsonplan/testGetJsonPlan.out 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-bucketing_range_with_keys_without_count/plan/sink-bucketing_range_with_keys_without_count.json
similarity index 69%
copy from flink-python/pyflink/table/tests/jsonplan/testGetJsonPlan.out
copy to 
flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-bucketing_range_with_keys_without_count/plan/sink-bucketing_range_with_keys_without_count.json
index 880fb2cf11d..47ce499372a 100644
--- a/flink-python/pyflink/table/tests/jsonplan/testGetJsonPlan.out
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-bucketing_range_with_keys_without_count/plan/sink-bucketing_range_with_keys_without_count.json
@@ -1,36 +1,32 @@
 {
-  "flinkVersion" : "",
+  "flinkVersion" : "2.3",
   "nodes" : [ {
-    "id" : 0,
+    "id" : 7,
     "type" : "stream-exec-table-source-scan_2",
     "scanTableSource" : {
       "table" : {
-        "identifier" : "`default_catalog`.`default_database`.`MyTable`",
+        "identifier" : "`default_catalog`.`default_database`.`source_t`",
         "resolvedTable" : {
           "schema" : {
             "columns" : [ {
               "name" : "a",
-              "dataType" : "BIGINT"
+              "dataType" : "INT"
             }, {
               "name" : "b",
-              "dataType" : "INT"
+              "dataType" : "BIGINT"
             }, {
               "name" : "c",
               "dataType" : "VARCHAR(2147483647)"
             } ]
-          },
-          "options" : {
-            "connector" : "datagen",
-            "number-of-rows" : "5"
           }
         }
       }
     },
-    "outputType" : "ROW<`a` BIGINT, `b` INT, `c` VARCHAR(2147483647)>",
-    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, MyTable]], fields=[a, b, c])"
+    "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647)>",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, source_t]], fields=[a, b, c])"
   }, {
-    "id" : 0,
-    "type" : "stream-exec-sink_1",
+    "id" : 8,
+    "type" : "stream-exec-sink_2",
     "configuration" : {
       "table.exec.sink.keyed-shuffle" : "AUTO",
       "table.exec.sink.not-null-enforcer" : "ERROR",
@@ -40,27 +36,33 @@
     },
     "dynamicTableSink" : {
       "table" : {
-        "identifier" : "`default_catalog`.`default_database`.`MySink`",
+        "identifier" : "`default_catalog`.`default_database`.`sink_t`",
         "resolvedTable" : {
           "schema" : {
             "columns" : [ {
               "name" : "a",
-              "dataType" : "BIGINT"
+              "dataType" : "INT"
             }, {
               "name" : "b",
-              "dataType" : "INT"
+              "dataType" : "BIGINT"
             }, {
               "name" : "c",
               "dataType" : "VARCHAR(2147483647)"
             } ]
           },
-          "options" : {
-            "connector" : "blackhole"
-          }
+          "distribution" : {
+            "kind" : "HASH",
+            "bucketKeys" : [ "a" ]
+          },
+          "partitionKeys" : [ "b" ]
         }
-      }
+      },
+      "abilities" : [ {
+        "type" : "Bucketing"
+      } ]
     },
     "inputChangelogMode" : [ "INSERT" ],
+    "upsertMaterializeStrategy" : "MAP",
     "inputProperties" : [ {
       "requiredDistribution" : {
         "type" : "UNKNOWN"
@@ -68,12 +70,12 @@
       "damBehavior" : "PIPELINED",
       "priority" : 0
     } ],
-    "outputType" : "ROW<`a` BIGINT, `b` INT, `c` VARCHAR(2147483647)>",
-    "description" : "Sink(table=[default_catalog.default_database.MySink], 
fields=[a, b, c])"
+    "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647)>",
+    "description" : "Sink(table=[default_catalog.default_database.sink_t], 
fields=[a, b, c])"
   } ],
   "edges" : [ {
-    "source" : 0,
-    "target" : 0,
+    "source" : 7,
+    "target" : 8,
     "shuffle" : {
       "type" : "FORWARD"
     },
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-bucketing_range_with_keys_without_count/savepoint/_metadata
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-bucketing_range_with_keys_without_count/savepoint/_metadata
new file mode 100644
index 00000000000..0fd79331c10
Binary files /dev/null and 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-bucketing_range_with_keys_without_count/savepoint/_metadata
 differ
diff --git 
a/flink-table/flink-table-planner/src/test/resources/jsonplan/testGetJsonPlan.out
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-bucketing_with-count/plan/sink-bucketing_with-count.json
similarity index 71%
copy from 
flink-table/flink-table-planner/src/test/resources/jsonplan/testGetJsonPlan.out
copy to 
flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-bucketing_with-count/plan/sink-bucketing_with-count.json
index 19cd2a29e6c..1df0a88d5f2 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/jsonplan/testGetJsonPlan.out
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-bucketing_with-count/plan/sink-bucketing_with-count.json
@@ -1,36 +1,32 @@
 {
-  "flinkVersion" : "",
+  "flinkVersion" : "2.3",
   "nodes" : [ {
     "id" : 1,
     "type" : "stream-exec-table-source-scan_2",
     "scanTableSource" : {
       "table" : {
-        "identifier" : "`default_catalog`.`default_database`.`MyTable`",
+        "identifier" : "`default_catalog`.`default_database`.`source_t`",
         "resolvedTable" : {
           "schema" : {
             "columns" : [ {
               "name" : "a",
-              "dataType" : "BIGINT"
+              "dataType" : "INT"
             }, {
               "name" : "b",
-              "dataType" : "INT"
+              "dataType" : "BIGINT"
             }, {
               "name" : "c",
               "dataType" : "VARCHAR(2147483647)"
             } ]
-          },
-          "options" : {
-            "bounded" : "false",
-            "connector" : "values"
           }
         }
       }
     },
-    "outputType" : "ROW<`a` BIGINT, `b` INT, `c` VARCHAR(2147483647)>",
-    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, MyTable]], fields=[a, b, c])"
+    "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647)>",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, source_t]], fields=[a, b, c])"
   }, {
     "id" : 2,
-    "type" : "stream-exec-sink_1",
+    "type" : "stream-exec-sink_2",
     "configuration" : {
       "table.exec.sink.keyed-shuffle" : "AUTO",
       "table.exec.sink.not-null-enforcer" : "ERROR",
@@ -40,28 +36,34 @@
     },
     "dynamicTableSink" : {
       "table" : {
-        "identifier" : "`default_catalog`.`default_database`.`MySink`",
+        "identifier" : "`default_catalog`.`default_database`.`sink_t`",
         "resolvedTable" : {
           "schema" : {
             "columns" : [ {
               "name" : "a",
-              "dataType" : "BIGINT"
+              "dataType" : "INT"
             }, {
               "name" : "b",
-              "dataType" : "INT"
+              "dataType" : "BIGINT"
             }, {
               "name" : "c",
               "dataType" : "VARCHAR(2147483647)"
             } ]
           },
-          "options" : {
-            "connector" : "values",
-            "table-sink-class" : "DEFAULT"
-          }
+          "distribution" : {
+            "kind" : "UNKNOWN",
+            "bucketCount" : 3,
+            "bucketKeys" : [ ]
+          },
+          "partitionKeys" : [ "b" ]
         }
-      }
+      },
+      "abilities" : [ {
+        "type" : "Bucketing"
+      } ]
     },
     "inputChangelogMode" : [ "INSERT" ],
+    "upsertMaterializeStrategy" : "MAP",
     "inputProperties" : [ {
       "requiredDistribution" : {
         "type" : "UNKNOWN"
@@ -69,8 +71,8 @@
       "damBehavior" : "PIPELINED",
       "priority" : 0
     } ],
-    "outputType" : "ROW<`a` BIGINT, `b` INT, `c` VARCHAR(2147483647)>",
-    "description" : "Sink(table=[default_catalog.default_database.MySink], 
fields=[a, b, c])"
+    "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647)>",
+    "description" : "Sink(table=[default_catalog.default_database.sink_t], 
fields=[a, b, c])"
   } ],
   "edges" : [ {
     "source" : 1,
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-bucketing_with-count/savepoint/_metadata
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-bucketing_with-count/savepoint/_metadata
new file mode 100644
index 00000000000..ef824265bbb
Binary files /dev/null and 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-bucketing_with-count/savepoint/_metadata
 differ
diff --git a/flink-python/pyflink/table/tests/jsonplan/testGetJsonPlan.out 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-bucketing_with-keys-and-count/plan/sink-bucketing_with-keys-and-count.json
similarity index 68%
copy from flink-python/pyflink/table/tests/jsonplan/testGetJsonPlan.out
copy to 
flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-bucketing_with-keys-and-count/plan/sink-bucketing_with-keys-and-count.json
index 880fb2cf11d..9e95055e3ca 100644
--- a/flink-python/pyflink/table/tests/jsonplan/testGetJsonPlan.out
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-bucketing_with-keys-and-count/plan/sink-bucketing_with-keys-and-count.json
@@ -1,36 +1,32 @@
 {
-  "flinkVersion" : "",
+  "flinkVersion" : "2.3",
   "nodes" : [ {
-    "id" : 0,
+    "id" : 3,
     "type" : "stream-exec-table-source-scan_2",
     "scanTableSource" : {
       "table" : {
-        "identifier" : "`default_catalog`.`default_database`.`MyTable`",
+        "identifier" : "`default_catalog`.`default_database`.`source_t`",
         "resolvedTable" : {
           "schema" : {
             "columns" : [ {
               "name" : "a",
-              "dataType" : "BIGINT"
+              "dataType" : "INT"
             }, {
               "name" : "b",
-              "dataType" : "INT"
+              "dataType" : "BIGINT"
             }, {
               "name" : "c",
               "dataType" : "VARCHAR(2147483647)"
             } ]
-          },
-          "options" : {
-            "connector" : "datagen",
-            "number-of-rows" : "5"
           }
         }
       }
     },
-    "outputType" : "ROW<`a` BIGINT, `b` INT, `c` VARCHAR(2147483647)>",
-    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, MyTable]], fields=[a, b, c])"
+    "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647)>",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, source_t]], fields=[a, b, c])"
   }, {
-    "id" : 0,
-    "type" : "stream-exec-sink_1",
+    "id" : 4,
+    "type" : "stream-exec-sink_2",
     "configuration" : {
       "table.exec.sink.keyed-shuffle" : "AUTO",
       "table.exec.sink.not-null-enforcer" : "ERROR",
@@ -40,27 +36,34 @@
     },
     "dynamicTableSink" : {
       "table" : {
-        "identifier" : "`default_catalog`.`default_database`.`MySink`",
+        "identifier" : "`default_catalog`.`default_database`.`sink_t`",
         "resolvedTable" : {
           "schema" : {
             "columns" : [ {
               "name" : "a",
-              "dataType" : "BIGINT"
+              "dataType" : "INT"
             }, {
               "name" : "b",
-              "dataType" : "INT"
+              "dataType" : "BIGINT"
             }, {
               "name" : "c",
               "dataType" : "VARCHAR(2147483647)"
             } ]
           },
-          "options" : {
-            "connector" : "blackhole"
-          }
+          "distribution" : {
+            "kind" : "UNKNOWN",
+            "bucketCount" : 3,
+            "bucketKeys" : [ "a" ]
+          },
+          "partitionKeys" : [ "b" ]
         }
-      }
+      },
+      "abilities" : [ {
+        "type" : "Bucketing"
+      } ]
     },
     "inputChangelogMode" : [ "INSERT" ],
+    "upsertMaterializeStrategy" : "MAP",
     "inputProperties" : [ {
       "requiredDistribution" : {
         "type" : "UNKNOWN"
@@ -68,12 +71,12 @@
       "damBehavior" : "PIPELINED",
       "priority" : 0
     } ],
-    "outputType" : "ROW<`a` BIGINT, `b` INT, `c` VARCHAR(2147483647)>",
-    "description" : "Sink(table=[default_catalog.default_database.MySink], 
fields=[a, b, c])"
+    "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647)>",
+    "description" : "Sink(table=[default_catalog.default_database.sink_t], 
fields=[a, b, c])"
   } ],
   "edges" : [ {
-    "source" : 0,
-    "target" : 0,
+    "source" : 3,
+    "target" : 4,
     "shuffle" : {
       "type" : "FORWARD"
     },
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-bucketing_with-keys-and-count/savepoint/_metadata
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-bucketing_with-keys-and-count/savepoint/_metadata
new file mode 100644
index 00000000000..023fa406bf3
Binary files /dev/null and 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-bucketing_with-keys-and-count/savepoint/_metadata
 differ
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCalcJsonPlanTest_jsonplan/testPythonCalc.out
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-ndf-primary-key/plan/sink-ndf-primary-key.json
similarity index 54%
copy from 
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCalcJsonPlanTest_jsonplan/testPythonCalc.out
copy to 
flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-ndf-primary-key/plan/sink-ndf-primary-key.json
index 9ebcad236f8..192ef2898a9 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCalcJsonPlanTest_jsonplan/testPythonCalc.out
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-ndf-primary-key/plan/sink-ndf-primary-key.json
@@ -1,66 +1,51 @@
 {
-  "flinkVersion" : "",
+  "flinkVersion" : "2.3",
   "nodes" : [ {
-    "id" : 1,
+    "id" : 16,
     "type" : "stream-exec-table-source-scan_2",
     "scanTableSource" : {
       "table" : {
-        "identifier" : "`default_catalog`.`default_database`.`MyTable`",
+        "identifier" : "`default_catalog`.`default_database`.`source_t`",
         "resolvedTable" : {
           "schema" : {
             "columns" : [ {
               "name" : "a",
-              "dataType" : "BIGINT"
+              "dataType" : "INT"
             }, {
               "name" : "b",
-              "dataType" : "INT NOT NULL"
+              "dataType" : "BIGINT"
             }, {
               "name" : "c",
               "dataType" : "VARCHAR(2147483647)"
-            }, {
-              "name" : "d",
-              "dataType" : "TIMESTAMP(3)"
             } ]
-          },
-          "options" : {
-            "bounded" : "false",
-            "connector" : "values"
           }
         }
-      },
-      "abilities" : [ {
-        "type" : "ProjectPushDown",
-        "projectedFields" : [ [ 0 ], [ 1 ] ],
-        "producedType" : "ROW<`a` BIGINT, `b` INT NOT NULL> NOT NULL"
-      }, {
-        "type" : "ReadingMetadata",
-        "metadataKeys" : [ ],
-        "producedType" : "ROW<`a` BIGINT, `b` INT NOT NULL> NOT NULL"
-      } ]
+      }
     },
-    "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL>",
-    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, MyTable, project=[a, b], metadata=[]]], fields=[a, b])"
+    "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647)>",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, source_t]], fields=[a, b, c])"
   }, {
-    "id" : 2,
-    "type" : "stream-exec-python-calc_1",
+    "id" : 17,
+    "type" : "stream-exec-calc_1",
     "projection" : [ {
       "kind" : "INPUT_REF",
       "inputIndex" : 0,
+      "type" : "INT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 1,
       "type" : "BIGINT"
     }, {
       "kind" : "CALL",
-      "catalogName" : "`default_catalog`.`default_database`.`pyFunc`",
+      "catalogName" : "`default_catalog`.`default_database`.`ndf`",
       "operands" : [ {
         "kind" : "INPUT_REF",
-        "inputIndex" : 1,
-        "type" : "INT NOT NULL"
-      }, {
-        "kind" : "INPUT_REF",
-        "inputIndex" : 1,
-        "type" : "INT NOT NULL"
+        "inputIndex" : 2,
+        "type" : "VARCHAR(2147483647)"
       } ],
-      "type" : "INT NOT NULL"
+      "type" : "VARCHAR(2147483647)"
     } ],
+    "condition" : null,
     "inputProperties" : [ {
       "requiredDistribution" : {
         "type" : "UNKNOWN"
@@ -68,11 +53,11 @@
       "damBehavior" : "PIPELINED",
       "priority" : 0
     } ],
-    "outputType" : "ROW<`a` BIGINT, `EXPR$1` INT NOT NULL>",
-    "description" : "PythonCalc(select=[a, pyFunc(b, b) AS EXPR$1])"
+    "outputType" : "ROW<`a` INT, `b` BIGINT, `EXPR$2` VARCHAR(2147483647)>",
+    "description" : "Calc(select=[a, b, ndf(c) AS EXPR$2])"
   }, {
-    "id" : 3,
-    "type" : "stream-exec-sink_1",
+    "id" : 18,
+    "type" : "stream-exec-sink_2",
     "configuration" : {
       "table.exec.sink.keyed-shuffle" : "AUTO",
       "table.exec.sink.not-null-enforcer" : "ERROR",
@@ -82,25 +67,30 @@
     },
     "dynamicTableSink" : {
       "table" : {
-        "identifier" : "`default_catalog`.`default_database`.`MySink`",
+        "identifier" : "`default_catalog`.`default_database`.`sink_t`",
         "resolvedTable" : {
           "schema" : {
             "columns" : [ {
               "name" : "a",
-              "dataType" : "BIGINT"
+              "dataType" : "INT"
             }, {
               "name" : "b",
-              "dataType" : "INT"
-            } ]
-          },
-          "options" : {
-            "connector" : "values",
-            "table-sink-class" : "DEFAULT"
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "c",
+              "dataType" : "VARCHAR(2147483647) NOT NULL"
+            } ],
+            "primaryKey" : {
+              "name" : "PK_c",
+              "type" : "PRIMARY_KEY",
+              "columns" : [ "c" ]
+            }
           }
         }
       }
     },
     "inputChangelogMode" : [ "INSERT" ],
+    "upsertMaterializeStrategy" : "MAP",
     "inputProperties" : [ {
       "requiredDistribution" : {
         "type" : "UNKNOWN"
@@ -108,19 +98,19 @@
       "damBehavior" : "PIPELINED",
       "priority" : 0
     } ],
-    "outputType" : "ROW<`a` BIGINT, `EXPR$1` INT NOT NULL>",
-    "description" : "Sink(table=[default_catalog.default_database.MySink], 
fields=[a, EXPR$1])"
+    "outputType" : "ROW<`a` INT, `b` BIGINT, `EXPR$2` VARCHAR(2147483647)>",
+    "description" : "Sink(table=[default_catalog.default_database.sink_t], 
fields=[a, b, EXPR$2])"
   } ],
   "edges" : [ {
-    "source" : 1,
-    "target" : 2,
+    "source" : 16,
+    "target" : 17,
     "shuffle" : {
       "type" : "FORWARD"
     },
     "shuffleMode" : "PIPELINED"
   }, {
-    "source" : 2,
-    "target" : 3,
+    "source" : 17,
+    "target" : 18,
     "shuffle" : {
       "type" : "FORWARD"
     },
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-ndf-primary-key/savepoint/_metadata
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-ndf-primary-key/savepoint/_metadata
new file mode 100644
index 00000000000..fb6418009ee
Binary files /dev/null and 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-ndf-primary-key/savepoint/_metadata
 differ
diff --git a/flink-python/pyflink/table/tests/jsonplan/testGetJsonPlan.out 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-overwrite/plan/sink-overwrite.json
similarity index 71%
copy from flink-python/pyflink/table/tests/jsonplan/testGetJsonPlan.out
copy to 
flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-overwrite/plan/sink-overwrite.json
index 880fb2cf11d..1be7ade9d61 100644
--- a/flink-python/pyflink/table/tests/jsonplan/testGetJsonPlan.out
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-overwrite/plan/sink-overwrite.json
@@ -1,36 +1,32 @@
 {
-  "flinkVersion" : "",
+  "flinkVersion" : "2.3",
   "nodes" : [ {
-    "id" : 0,
+    "id" : 12,
     "type" : "stream-exec-table-source-scan_2",
     "scanTableSource" : {
       "table" : {
-        "identifier" : "`default_catalog`.`default_database`.`MyTable`",
+        "identifier" : "`default_catalog`.`default_database`.`source_t`",
         "resolvedTable" : {
           "schema" : {
             "columns" : [ {
               "name" : "a",
-              "dataType" : "BIGINT"
+              "dataType" : "INT"
             }, {
               "name" : "b",
-              "dataType" : "INT"
+              "dataType" : "BIGINT"
             }, {
               "name" : "c",
               "dataType" : "VARCHAR(2147483647)"
             } ]
-          },
-          "options" : {
-            "connector" : "datagen",
-            "number-of-rows" : "5"
           }
         }
       }
     },
-    "outputType" : "ROW<`a` BIGINT, `b` INT, `c` VARCHAR(2147483647)>",
-    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, MyTable]], fields=[a, b, c])"
+    "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647)>",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, source_t]], fields=[a, b, c])"
   }, {
-    "id" : 0,
-    "type" : "stream-exec-sink_1",
+    "id" : 13,
+    "type" : "stream-exec-sink_2",
     "configuration" : {
       "table.exec.sink.keyed-shuffle" : "AUTO",
       "table.exec.sink.not-null-enforcer" : "ERROR",
@@ -40,27 +36,29 @@
     },
     "dynamicTableSink" : {
       "table" : {
-        "identifier" : "`default_catalog`.`default_database`.`MySink`",
+        "identifier" : "`default_catalog`.`default_database`.`sink_t`",
         "resolvedTable" : {
           "schema" : {
             "columns" : [ {
               "name" : "a",
-              "dataType" : "BIGINT"
+              "dataType" : "INT"
             }, {
               "name" : "b",
-              "dataType" : "INT"
+              "dataType" : "BIGINT"
             }, {
               "name" : "c",
               "dataType" : "VARCHAR(2147483647)"
             } ]
-          },
-          "options" : {
-            "connector" : "blackhole"
           }
         }
-      }
+      },
+      "abilities" : [ {
+        "type" : "Overwrite",
+        "overwrite" : true
+      } ]
     },
     "inputChangelogMode" : [ "INSERT" ],
+    "upsertMaterializeStrategy" : "MAP",
     "inputProperties" : [ {
       "requiredDistribution" : {
         "type" : "UNKNOWN"
@@ -68,12 +66,12 @@
       "damBehavior" : "PIPELINED",
       "priority" : 0
     } ],
-    "outputType" : "ROW<`a` BIGINT, `b` INT, `c` VARCHAR(2147483647)>",
-    "description" : "Sink(table=[default_catalog.default_database.MySink], 
fields=[a, b, c])"
+    "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647)>",
+    "description" : "Sink(table=[default_catalog.default_database.sink_t], 
fields=[a, b, c])"
   } ],
   "edges" : [ {
-    "source" : 0,
-    "target" : 0,
+    "source" : 12,
+    "target" : 13,
     "shuffle" : {
       "type" : "FORWARD"
     },
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-overwrite/savepoint/_metadata
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-overwrite/savepoint/_metadata
new file mode 100644
index 00000000000..874462a05b3
Binary files /dev/null and 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-overwrite/savepoint/_metadata
 differ
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCalcJsonPlanTest_jsonplan/testPythonCalc.out
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-partial-insert/plan/sink-partial-insert.json
similarity index 53%
copy from 
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCalcJsonPlanTest_jsonplan/testPythonCalc.out
copy to 
flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-partial-insert/plan/sink-partial-insert.json
index 9ebcad236f8..ddd0df467ae 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCalcJsonPlanTest_jsonplan/testPythonCalc.out
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-partial-insert/plan/sink-partial-insert.json
@@ -1,66 +1,54 @@
 {
-  "flinkVersion" : "",
+  "flinkVersion" : "2.3",
   "nodes" : [ {
-    "id" : 1,
+    "id" : 19,
     "type" : "stream-exec-table-source-scan_2",
     "scanTableSource" : {
       "table" : {
-        "identifier" : "`default_catalog`.`default_database`.`MyTable`",
+        "identifier" : "`default_catalog`.`default_database`.`source_t`",
         "resolvedTable" : {
           "schema" : {
             "columns" : [ {
               "name" : "a",
-              "dataType" : "BIGINT"
+              "dataType" : "INT"
             }, {
               "name" : "b",
-              "dataType" : "INT NOT NULL"
+              "dataType" : "BIGINT"
             }, {
               "name" : "c",
               "dataType" : "VARCHAR(2147483647)"
-            }, {
-              "name" : "d",
-              "dataType" : "TIMESTAMP(3)"
             } ]
-          },
-          "options" : {
-            "bounded" : "false",
-            "connector" : "values"
           }
         }
-      },
-      "abilities" : [ {
-        "type" : "ProjectPushDown",
-        "projectedFields" : [ [ 0 ], [ 1 ] ],
-        "producedType" : "ROW<`a` BIGINT, `b` INT NOT NULL> NOT NULL"
-      }, {
-        "type" : "ReadingMetadata",
-        "metadataKeys" : [ ],
-        "producedType" : "ROW<`a` BIGINT, `b` INT NOT NULL> NOT NULL"
-      } ]
+      }
     },
-    "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL>",
-    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, MyTable, project=[a, b], metadata=[]]], fields=[a, b])"
+    "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647)>",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, source_t]], fields=[a, b, c])"
   }, {
-    "id" : 2,
-    "type" : "stream-exec-python-calc_1",
+    "id" : 20,
+    "type" : "stream-exec-calc_1",
     "projection" : [ {
       "kind" : "INPUT_REF",
       "inputIndex" : 0,
+      "type" : "INT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 1,
       "type" : "BIGINT"
     }, {
-      "kind" : "CALL",
-      "catalogName" : "`default_catalog`.`default_database`.`pyFunc`",
-      "operands" : [ {
-        "kind" : "INPUT_REF",
-        "inputIndex" : 1,
-        "type" : "INT NOT NULL"
-      }, {
-        "kind" : "INPUT_REF",
-        "inputIndex" : 1,
-        "type" : "INT NOT NULL"
-      } ],
-      "type" : "INT NOT NULL"
+      "kind" : "INPUT_REF",
+      "inputIndex" : 2,
+      "type" : "VARCHAR(2147483647)"
+    }, {
+      "kind" : "LITERAL",
+      "value" : null,
+      "type" : "DECIMAL(10, 2)"
+    }, {
+      "kind" : "LITERAL",
+      "value" : null,
+      "type" : "DOUBLE"
     } ],
+    "condition" : null,
     "inputProperties" : [ {
       "requiredDistribution" : {
         "type" : "UNKNOWN"
@@ -68,11 +56,11 @@
       "damBehavior" : "PIPELINED",
       "priority" : 0
     } ],
-    "outputType" : "ROW<`a` BIGINT, `EXPR$1` INT NOT NULL>",
-    "description" : "PythonCalc(select=[a, pyFunc(b, b) AS EXPR$1])"
+    "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647), `EXPR$3` 
DECIMAL(10, 2), `EXPR$4` DOUBLE>",
+    "description" : "Calc(select=[a, b, c, null:DECIMAL(10, 2) AS EXPR$3, 
null:DOUBLE AS EXPR$4])"
   }, {
-    "id" : 3,
-    "type" : "stream-exec-sink_1",
+    "id" : 21,
+    "type" : "stream-exec-sink_2",
     "configuration" : {
       "table.exec.sink.keyed-shuffle" : "AUTO",
       "table.exec.sink.not-null-enforcer" : "ERROR",
@@ -82,25 +70,36 @@
     },
     "dynamicTableSink" : {
       "table" : {
-        "identifier" : "`default_catalog`.`default_database`.`MySink`",
+        "identifier" : "`default_catalog`.`default_database`.`sink_t`",
         "resolvedTable" : {
           "schema" : {
             "columns" : [ {
               "name" : "a",
-              "dataType" : "BIGINT"
+              "dataType" : "INT"
             }, {
               "name" : "b",
-              "dataType" : "INT"
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "c",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "d",
+              "dataType" : "DECIMAL(10, 2)"
+            }, {
+              "name" : "e",
+              "dataType" : "DOUBLE"
             } ]
-          },
-          "options" : {
-            "connector" : "values",
-            "table-sink-class" : "DEFAULT"
           }
         }
-      }
+      },
+      "abilities" : [ {
+        "type" : "TargetColumnWriting",
+        "targetColumns" : [ [ 0 ], [ 1 ], [ 2 ] ]
+      } ],
+      "targetColumns" : [ [ 0 ], [ 1 ], [ 2 ] ]
     },
     "inputChangelogMode" : [ "INSERT" ],
+    "upsertMaterializeStrategy" : "MAP",
     "inputProperties" : [ {
       "requiredDistribution" : {
         "type" : "UNKNOWN"
@@ -108,19 +107,19 @@
       "damBehavior" : "PIPELINED",
       "priority" : 0
     } ],
-    "outputType" : "ROW<`a` BIGINT, `EXPR$1` INT NOT NULL>",
-    "description" : "Sink(table=[default_catalog.default_database.MySink], 
fields=[a, EXPR$1])"
+    "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647), `EXPR$3` 
DECIMAL(10, 2), `EXPR$4` DOUBLE>",
+    "description" : "Sink(table=[default_catalog.default_database.sink_t], 
targetColumns=[[0],[1],[2]], fields=[a, b, c, EXPR$3, EXPR$4])"
   } ],
   "edges" : [ {
-    "source" : 1,
-    "target" : 2,
+    "source" : 19,
+    "target" : 20,
     "shuffle" : {
       "type" : "FORWARD"
     },
     "shuffleMode" : "PIPELINED"
   }, {
-    "source" : 2,
-    "target" : 3,
+    "source" : 20,
+    "target" : 21,
     "shuffle" : {
       "type" : "FORWARD"
     },
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-partial-insert/savepoint/_metadata
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-partial-insert/savepoint/_metadata
new file mode 100644
index 00000000000..5b02515c833
Binary files /dev/null and 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-partial-insert/savepoint/_metadata
 differ
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCalcJsonPlanTest_jsonplan/testPythonCalc.out
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-partition/plan/sink-partition.json
similarity index 53%
copy from 
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCalcJsonPlanTest_jsonplan/testPythonCalc.out
copy to 
flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-partition/plan/sink-partition.json
index 9ebcad236f8..79f725b5f5c 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonCalcJsonPlanTest_jsonplan/testPythonCalc.out
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-partition/plan/sink-partition.json
@@ -1,66 +1,50 @@
 {
-  "flinkVersion" : "",
+  "flinkVersion" : "2.3",
   "nodes" : [ {
-    "id" : 1,
+    "id" : 9,
     "type" : "stream-exec-table-source-scan_2",
     "scanTableSource" : {
       "table" : {
-        "identifier" : "`default_catalog`.`default_database`.`MyTable`",
+        "identifier" : "`default_catalog`.`default_database`.`source_t`",
         "resolvedTable" : {
           "schema" : {
             "columns" : [ {
               "name" : "a",
-              "dataType" : "BIGINT"
+              "dataType" : "INT"
             }, {
               "name" : "b",
-              "dataType" : "INT NOT NULL"
+              "dataType" : "BIGINT"
             }, {
               "name" : "c",
               "dataType" : "VARCHAR(2147483647)"
-            }, {
-              "name" : "d",
-              "dataType" : "TIMESTAMP(3)"
             } ]
-          },
-          "options" : {
-            "bounded" : "false",
-            "connector" : "values"
           }
         }
-      },
-      "abilities" : [ {
-        "type" : "ProjectPushDown",
-        "projectedFields" : [ [ 0 ], [ 1 ] ],
-        "producedType" : "ROW<`a` BIGINT, `b` INT NOT NULL> NOT NULL"
-      }, {
-        "type" : "ReadingMetadata",
-        "metadataKeys" : [ ],
-        "producedType" : "ROW<`a` BIGINT, `b` INT NOT NULL> NOT NULL"
-      } ]
+      }
     },
-    "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL>",
-    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, MyTable, project=[a, b], metadata=[]]], fields=[a, b])"
+    "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647)>",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, source_t]], fields=[a, b, c])"
   }, {
-    "id" : 2,
-    "type" : "stream-exec-python-calc_1",
+    "id" : 10,
+    "type" : "stream-exec-calc_1",
     "projection" : [ {
       "kind" : "INPUT_REF",
       "inputIndex" : 0,
+      "type" : "INT"
+    }, {
+      "kind" : "LITERAL",
+      "value" : 2,
+      "type" : "BIGINT NOT NULL"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 1,
       "type" : "BIGINT"
     }, {
-      "kind" : "CALL",
-      "catalogName" : "`default_catalog`.`default_database`.`pyFunc`",
-      "operands" : [ {
-        "kind" : "INPUT_REF",
-        "inputIndex" : 1,
-        "type" : "INT NOT NULL"
-      }, {
-        "kind" : "INPUT_REF",
-        "inputIndex" : 1,
-        "type" : "INT NOT NULL"
-      } ],
-      "type" : "INT NOT NULL"
+      "kind" : "INPUT_REF",
+      "inputIndex" : 2,
+      "type" : "VARCHAR(2147483647)"
     } ],
+    "condition" : null,
     "inputProperties" : [ {
       "requiredDistribution" : {
         "type" : "UNKNOWN"
@@ -68,11 +52,11 @@
       "damBehavior" : "PIPELINED",
       "priority" : 0
     } ],
-    "outputType" : "ROW<`a` BIGINT, `EXPR$1` INT NOT NULL>",
-    "description" : "PythonCalc(select=[a, pyFunc(b, b) AS EXPR$1])"
+    "outputType" : "ROW<`a` INT, `EXPR$1` BIGINT NOT NULL, `b` BIGINT, `c` 
VARCHAR(2147483647)>",
+    "description" : "Calc(select=[a, 2 AS EXPR$1, b, c])"
   }, {
-    "id" : 3,
-    "type" : "stream-exec-sink_1",
+    "id" : 11,
+    "type" : "stream-exec-sink_2",
     "configuration" : {
       "table.exec.sink.keyed-shuffle" : "AUTO",
       "table.exec.sink.not-null-enforcer" : "ERROR",
@@ -82,25 +66,35 @@
     },
     "dynamicTableSink" : {
       "table" : {
-        "identifier" : "`default_catalog`.`default_database`.`MySink`",
+        "identifier" : "`default_catalog`.`default_database`.`sink_t`",
         "resolvedTable" : {
           "schema" : {
             "columns" : [ {
               "name" : "a",
-              "dataType" : "BIGINT"
+              "dataType" : "INT"
             }, {
               "name" : "b",
-              "dataType" : "INT"
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "p",
+              "dataType" : "BIGINT NOT NULL"
+            }, {
+              "name" : "c",
+              "dataType" : "VARCHAR(2147483647)"
             } ]
           },
-          "options" : {
-            "connector" : "values",
-            "table-sink-class" : "DEFAULT"
-          }
+          "partitionKeys" : [ "b" ]
         }
-      }
+      },
+      "abilities" : [ {
+        "type" : "Partitioning",
+        "partition" : {
+          "b" : "2"
+        }
+      } ]
     },
     "inputChangelogMode" : [ "INSERT" ],
+    "upsertMaterializeStrategy" : "MAP",
     "inputProperties" : [ {
       "requiredDistribution" : {
         "type" : "UNKNOWN"
@@ -108,19 +102,19 @@
       "damBehavior" : "PIPELINED",
       "priority" : 0
     } ],
-    "outputType" : "ROW<`a` BIGINT, `EXPR$1` INT NOT NULL>",
-    "description" : "Sink(table=[default_catalog.default_database.MySink], 
fields=[a, EXPR$1])"
+    "outputType" : "ROW<`a` INT, `EXPR$1` BIGINT NOT NULL, `b` BIGINT, `c` 
VARCHAR(2147483647)>",
+    "description" : "Sink(table=[default_catalog.default_database.sink_t], 
fields=[a, EXPR$1, b, c])"
   } ],
   "edges" : [ {
-    "source" : 1,
-    "target" : 2,
+    "source" : 9,
+    "target" : 10,
     "shuffle" : {
       "type" : "FORWARD"
     },
     "shuffleMode" : "PIPELINED"
   }, {
-    "source" : 2,
-    "target" : 3,
+    "source" : 10,
+    "target" : 11,
     "shuffle" : {
       "type" : "FORWARD"
     },
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-partition/savepoint/_metadata
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-partition/savepoint/_metadata
new file mode 100644
index 00000000000..32a8e76dbe0
Binary files /dev/null and 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-partition/savepoint/_metadata
 differ
diff --git a/flink-python/pyflink/table/tests/jsonplan/testGetJsonPlan.out 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-upsert/plan/sink-upsert.json
similarity index 58%
copy from flink-python/pyflink/table/tests/jsonplan/testGetJsonPlan.out
copy to 
flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-upsert/plan/sink-upsert.json
index 880fb2cf11d..53aa33b7e10 100644
--- a/flink-python/pyflink/table/tests/jsonplan/testGetJsonPlan.out
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-upsert/plan/sink-upsert.json
@@ -1,36 +1,37 @@
 {
-  "flinkVersion" : "",
+  "flinkVersion" : "2.3",
   "nodes" : [ {
-    "id" : 0,
+    "id" : 22,
     "type" : "stream-exec-table-source-scan_2",
     "scanTableSource" : {
       "table" : {
-        "identifier" : "`default_catalog`.`default_database`.`MyTable`",
+        "identifier" : "`default_catalog`.`default_database`.`source_t`",
         "resolvedTable" : {
           "schema" : {
             "columns" : [ {
               "name" : "a",
-              "dataType" : "BIGINT"
+              "dataType" : "INT NOT NULL"
             }, {
               "name" : "b",
-              "dataType" : "INT"
+              "dataType" : "BIGINT"
             }, {
               "name" : "c",
               "dataType" : "VARCHAR(2147483647)"
-            } ]
-          },
-          "options" : {
-            "connector" : "datagen",
-            "number-of-rows" : "5"
+            } ],
+            "primaryKey" : {
+              "name" : "PK_a",
+              "type" : "PRIMARY_KEY",
+              "columns" : [ "a" ]
+            }
           }
         }
       }
     },
-    "outputType" : "ROW<`a` BIGINT, `b` INT, `c` VARCHAR(2147483647)>",
-    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, MyTable]], fields=[a, b, c])"
+    "outputType" : "ROW<`a` INT NOT NULL, `b` BIGINT, `c` 
VARCHAR(2147483647)>",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, source_t]], fields=[a, b, c])"
   }, {
-    "id" : 0,
-    "type" : "stream-exec-sink_1",
+    "id" : 23,
+    "type" : "stream-exec-sink_2",
     "configuration" : {
       "table.exec.sink.keyed-shuffle" : "AUTO",
       "table.exec.sink.not-null-enforcer" : "ERROR",
@@ -40,27 +41,31 @@
     },
     "dynamicTableSink" : {
       "table" : {
-        "identifier" : "`default_catalog`.`default_database`.`MySink`",
+        "identifier" : "`default_catalog`.`default_database`.`sink_t`",
         "resolvedTable" : {
           "schema" : {
             "columns" : [ {
               "name" : "a",
-              "dataType" : "BIGINT"
+              "dataType" : "INT NOT NULL"
             }, {
               "name" : "b",
-              "dataType" : "INT"
+              "dataType" : "BIGINT"
             }, {
               "name" : "c",
               "dataType" : "VARCHAR(2147483647)"
-            } ]
-          },
-          "options" : {
-            "connector" : "blackhole"
+            } ],
+            "primaryKey" : {
+              "name" : "PK_a",
+              "type" : "PRIMARY_KEY",
+              "columns" : [ "a" ]
+            }
           }
         }
       }
     },
-    "inputChangelogMode" : [ "INSERT" ],
+    "inputChangelogMode" : [ "INSERT", "UPDATE_AFTER", "DELETE" ],
+    "upsertMaterializeStrategy" : "MAP",
+    "inputUpsertKey" : [ 0 ],
     "inputProperties" : [ {
       "requiredDistribution" : {
         "type" : "UNKNOWN"
@@ -68,12 +73,12 @@
       "damBehavior" : "PIPELINED",
       "priority" : 0
     } ],
-    "outputType" : "ROW<`a` BIGINT, `b` INT, `c` VARCHAR(2147483647)>",
-    "description" : "Sink(table=[default_catalog.default_database.MySink], 
fields=[a, b, c])"
+    "outputType" : "ROW<`a` INT NOT NULL, `b` BIGINT, `c` 
VARCHAR(2147483647)>",
+    "description" : "Sink(table=[default_catalog.default_database.sink_t], 
fields=[a, b, c])"
   } ],
   "edges" : [ {
-    "source" : 0,
-    "target" : 0,
+    "source" : 22,
+    "target" : 23,
     "shuffle" : {
       "type" : "FORWARD"
     },
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-upsert/savepoint/_metadata
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-upsert/savepoint/_metadata
new file mode 100644
index 00000000000..e9567fea0c2
Binary files /dev/null and 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-upsert/savepoint/_metadata
 differ
diff --git a/flink-python/pyflink/table/tests/jsonplan/testGetJsonPlan.out 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-writing-metadata/plan/sink-writing-metadata.json
similarity index 65%
copy from flink-python/pyflink/table/tests/jsonplan/testGetJsonPlan.out
copy to 
flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-writing-metadata/plan/sink-writing-metadata.json
index 880fb2cf11d..405453d3f43 100644
--- a/flink-python/pyflink/table/tests/jsonplan/testGetJsonPlan.out
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-writing-metadata/plan/sink-writing-metadata.json
@@ -1,36 +1,32 @@
 {
-  "flinkVersion" : "",
+  "flinkVersion" : "2.3",
   "nodes" : [ {
-    "id" : 0,
+    "id" : 14,
     "type" : "stream-exec-table-source-scan_2",
     "scanTableSource" : {
       "table" : {
-        "identifier" : "`default_catalog`.`default_database`.`MyTable`",
+        "identifier" : "`default_catalog`.`default_database`.`source_t`",
         "resolvedTable" : {
           "schema" : {
             "columns" : [ {
               "name" : "a",
-              "dataType" : "BIGINT"
+              "dataType" : "INT"
             }, {
               "name" : "b",
-              "dataType" : "INT"
+              "dataType" : "BIGINT"
             }, {
               "name" : "c",
               "dataType" : "VARCHAR(2147483647)"
             } ]
-          },
-          "options" : {
-            "connector" : "datagen",
-            "number-of-rows" : "5"
           }
         }
       }
     },
-    "outputType" : "ROW<`a` BIGINT, `b` INT, `c` VARCHAR(2147483647)>",
-    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, MyTable]], fields=[a, b, c])"
+    "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647)>",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, source_t]], fields=[a, b, c])"
   }, {
-    "id" : 0,
-    "type" : "stream-exec-sink_1",
+    "id" : 15,
+    "type" : "stream-exec-sink_2",
     "configuration" : {
       "table.exec.sink.keyed-shuffle" : "AUTO",
       "table.exec.sink.not-null-enforcer" : "ERROR",
@@ -40,27 +36,32 @@
     },
     "dynamicTableSink" : {
       "table" : {
-        "identifier" : "`default_catalog`.`default_database`.`MySink`",
+        "identifier" : "`default_catalog`.`default_database`.`sink_t`",
         "resolvedTable" : {
           "schema" : {
             "columns" : [ {
               "name" : "a",
-              "dataType" : "BIGINT"
+              "dataType" : "INT"
             }, {
               "name" : "b",
-              "dataType" : "INT"
+              "dataType" : "BIGINT"
             }, {
               "name" : "c",
-              "dataType" : "VARCHAR(2147483647)"
+              "kind" : "METADATA",
+              "dataType" : "VARCHAR(2147483647)",
+              "isVirtual" : false
             } ]
-          },
-          "options" : {
-            "connector" : "blackhole"
           }
         }
-      }
+      },
+      "abilities" : [ {
+        "type" : "WritingMetadata",
+        "metadataKeys" : [ "c" ],
+        "consumedType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647)> 
NOT NULL"
+      } ]
     },
     "inputChangelogMode" : [ "INSERT" ],
+    "upsertMaterializeStrategy" : "MAP",
     "inputProperties" : [ {
       "requiredDistribution" : {
         "type" : "UNKNOWN"
@@ -68,12 +69,12 @@
       "damBehavior" : "PIPELINED",
       "priority" : 0
     } ],
-    "outputType" : "ROW<`a` BIGINT, `b` INT, `c` VARCHAR(2147483647)>",
-    "description" : "Sink(table=[default_catalog.default_database.MySink], 
fields=[a, b, c])"
+    "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647)>",
+    "description" : "Sink(table=[default_catalog.default_database.sink_t], 
fields=[a, b, c])"
   } ],
   "edges" : [ {
-    "source" : 0,
-    "target" : 0,
+    "source" : 14,
+    "target" : 15,
     "shuffle" : {
       "type" : "FORWARD"
     },
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-writing-metadata/savepoint/_metadata
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-writing-metadata/savepoint/_metadata
new file mode 100644
index 00000000000..75224ac197c
Binary files /dev/null and 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_2/sink-writing-metadata/savepoint/_metadata
 differ

Reply via email to