This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e76ccdc1fd8b15a5aac4968fd89643b0b17e1a48
Author: bvarghese1 <bvargh...@confluent.io>
AuthorDate: Fri Jan 26 15:04:19 2024 -0800

    [FLINK-34248] Implement restore tests for changelog normalize node
---
 .../exec/stream/ChangelogNormalizeRestoreTest.java |  41 +++++
 .../stream/ChangelogNormalizeTestPrograms.java     | 167 +++++++++++++++++++
 .../changelog-normalize-source-mini-batch.json     | 178 +++++++++++++++++++++
 .../savepoint/_metadata                            | Bin 0 -> 11960 bytes
 .../plan/changelog-normalize-source.json           | 155 ++++++++++++++++++
 .../changelog-normalize-source/savepoint/_metadata | Bin 0 -> 13487 bytes
 .../plan/changelog-normalize-upsert.json           | 136 ++++++++++++++++
 .../changelog-normalize-upsert/savepoint/_metadata | Bin 0 -> 13436 bytes
 8 files changed, 677 insertions(+)

diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ChangelogNormalizeRestoreTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ChangelogNormalizeRestoreTest.java
new file mode 100644
index 00000000000..c7e1f5e36ce
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ChangelogNormalizeRestoreTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import 
org.apache.flink.table.planner.plan.nodes.exec.testutils.RestoreTestBase;
+import org.apache.flink.table.test.program.TableTestProgram;
+
+import java.util.Arrays;
+import java.util.List;
+
+/** Restore tests for {@link StreamExecChangelogNormalize}. */
+public class ChangelogNormalizeRestoreTest extends RestoreTestBase {
+
+    public ChangelogNormalizeRestoreTest() {
+        super(StreamExecChangelogNormalize.class);
+    }
+
+    @Override
+    public List<TableTestProgram> programs() {
+        return Arrays.asList(
+                ChangelogNormalizeTestPrograms.CHANGELOG_SOURCE,
+                ChangelogNormalizeTestPrograms.CHANGELOG_SOURCE_MINI_BATCH,
+                ChangelogNormalizeTestPrograms.UPSERT_SOURCE);
+    }
+}
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ChangelogNormalizeTestPrograms.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ChangelogNormalizeTestPrograms.java
new file mode 100644
index 00000000000..e4a3cd1825e
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ChangelogNormalizeTestPrograms.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.test.program.SinkTestStep;
+import org.apache.flink.table.test.program.SourceTestStep;
+import org.apache.flink.table.test.program.TableTestProgram;
+import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
+
+import java.time.Duration;
+
+/** {@link TableTestProgram} definitions for testing {@link 
StreamExecChangelogNormalize}. */
+public class ChangelogNormalizeTestPrograms {
+
+    static final String[] SOURCE_SCHEMA = {
+        "a VARCHAR", "b INT NOT NULL", "c VARCHAR", "PRIMARY KEY(a) NOT 
ENFORCED"
+    };
+
+    static final String[] SINK_SCHEMA = {"a VARCHAR", "b INT", "c VARCHAR"};
+
+    static final Row[] BEFORE_DATA = {
+        Row.ofKind(RowKind.INSERT, "one", 1, "a"),
+        Row.ofKind(RowKind.INSERT, "two", 2, "b"),
+        Row.ofKind(RowKind.UPDATE_BEFORE, "one", 1, "a"),
+        Row.ofKind(RowKind.UPDATE_AFTER, "one", 1, "aa"),
+        Row.ofKind(RowKind.INSERT, "three", 3, "c"),
+        Row.ofKind(RowKind.DELETE, "two", 2, "b"),
+        Row.ofKind(RowKind.UPDATE_BEFORE, "three", 3, "c"),
+        Row.ofKind(RowKind.UPDATE_AFTER, "three", 3, "cc"),
+    };
+
+    static final Row[] AFTER_DATA = {
+        Row.ofKind(RowKind.INSERT, "four", 4, "d"),
+        Row.ofKind(RowKind.INSERT, "five", 5, "e"),
+        Row.ofKind(RowKind.UPDATE_BEFORE, "four", 4, "d"),
+        Row.ofKind(RowKind.UPDATE_AFTER, "four", 4, "dd"),
+        Row.ofKind(RowKind.INSERT, "six", 6, "f"),
+        Row.ofKind(RowKind.DELETE, "six", 6, "f")
+    };
+
+    static final String[] BEFORE_OUTPUT = {
+        "+I[one, 1, a]",
+        "+I[two, 2, b]",
+        "-U[one, 1, a]",
+        "+U[one, 1, aa]",
+        "+I[three, 3, c]",
+        "-D[two, 2, b]",
+        "-U[three, 3, c]",
+        "+U[three, 3, cc]"
+    };
+
+    static final String[] AFTER_OUTPUT = {
+        "+I[four, 4, d]",
+        "+I[five, 5, e]",
+        "-U[four, 4, d]",
+        "+U[four, 4, dd]",
+        "+I[six, 6, f]",
+        "-D[six, 6, f]"
+    };
+
+    static final TableTestProgram CHANGELOG_SOURCE =
+            TableTestProgram.of(
+                            "changelog-normalize-source", "validates changelog 
normalize source")
+                    .setupConfig(
+                            
ExecutionConfigOptions.TABLE_EXEC_SOURCE_CDC_EVENTS_DUPLICATE, true)
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("source_t")
+                                    .addOption("changelog-mode", "I,UA,UB,D")
+                                    .addSchema(SOURCE_SCHEMA)
+                                    .producedBeforeRestore(BEFORE_DATA)
+                                    .producedAfterRestore(AFTER_DATA)
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink_t")
+                                    .addSchema(SINK_SCHEMA)
+                                    .consumedBeforeRestore(BEFORE_OUTPUT)
+                                    .consumedAfterRestore(AFTER_OUTPUT)
+                                    .build())
+                    .runSql("INSERT INTO sink_t SELECT a, b, c FROM source_t")
+                    .build();
+
+    static final TableTestProgram CHANGELOG_SOURCE_MINI_BATCH =
+            TableTestProgram.of(
+                            "changelog-normalize-source-mini-batch",
+                            "validates changelog normalize source with mini 
batch")
+                    .setupConfig(
+                            
ExecutionConfigOptions.TABLE_EXEC_SOURCE_CDC_EVENTS_DUPLICATE, true)
+                    
.setupConfig(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, true)
+                    .setupConfig(
+                            
ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY,
+                            Duration.ofSeconds(10))
+                    
.setupConfig(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, 2L)
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("source_t")
+                                    .addOption("changelog-mode", "I,UA,UB,D")
+                                    .addSchema(SOURCE_SCHEMA)
+                                    .producedBeforeRestore(BEFORE_DATA)
+                                    .producedAfterRestore(AFTER_DATA)
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink_t")
+                                    .addSchema(SINK_SCHEMA)
+                                    .consumedBeforeRestore(BEFORE_OUTPUT)
+                                    .consumedAfterRestore(AFTER_OUTPUT)
+                                    .build())
+                    .runSql("INSERT INTO sink_t SELECT a, b, c FROM source_t")
+                    .build();
+
+    static final TableTestProgram UPSERT_SOURCE =
+            TableTestProgram.of(
+                            "changelog-normalize-upsert", "validates changelog 
normalize upsert")
+                    .setupConfig(
+                            
ExecutionConfigOptions.TABLE_EXEC_SOURCE_CDC_EVENTS_DUPLICATE, true)
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("source_t")
+                                    .addOption("changelog-mode", "I,UA,D")
+                                    .addSchema(SOURCE_SCHEMA)
+                                    .producedBeforeRestore(
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
"one", 1, "a"),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
"two", 2, "b"),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
"one", 1, "aa"),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
"three", 3, "c"),
+                                            Row.ofKind(RowKind.DELETE, "two", 
2, "b"),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
"three", 3, "cc"))
+                                    .producedAfterRestore(
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
"four", 4, "d"),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
"five", 5, "e"),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
"six", 6, "f"),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
"five", 5, "ee"),
+                                            Row.ofKind(RowKind.DELETE, "six", 
6, "f"),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
"four", 4, "dd"))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink_t")
+                                    .addSchema(SINK_SCHEMA)
+                                    .consumedBeforeRestore(BEFORE_OUTPUT)
+                                    .consumedAfterRestore(
+                                            "+I[four, 4, d]",
+                                            "+I[five, 5, e]",
+                                            "+I[six, 6, f]",
+                                            "-U[five, 5, e]",
+                                            "+U[five, 5, ee]",
+                                            "-D[six, 6, f]",
+                                            "-U[four, 4, d]",
+                                            "+U[four, 4, dd]")
+                                    .build())
+                    .runSql("INSERT INTO sink_t SELECT a, b, c FROM source_t")
+                    .build();
+}
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-changelog-normalize_1/changelog-normalize-source-mini-batch/plan/changelog-normalize-source-mini-batch.json
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-changelog-normalize_1/changelog-normalize-source-mini-batch/plan/changelog-normalize-source-mini-batch.json
new file mode 100644
index 00000000000..864379b0884
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-changelog-normalize_1/changelog-normalize-source-mini-batch/plan/changelog-normalize-source-mini-batch.json
@@ -0,0 +1,178 @@
+{
+  "flinkVersion" : "1.19",
+  "nodes" : [ {
+    "id" : 6,
+    "type" : "stream-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`source_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "VARCHAR(2147483647) NOT NULL"
+            }, {
+              "name" : "b",
+              "dataType" : "INT NOT NULL"
+            }, {
+              "name" : "c",
+              "dataType" : "VARCHAR(2147483647)"
+            } ],
+            "watermarkSpecs" : [ ],
+            "primaryKey" : {
+              "name" : "PK_a",
+              "type" : "PRIMARY_KEY",
+              "columns" : [ "a" ]
+            }
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "outputType" : "ROW<`a` VARCHAR(2147483647) NOT NULL, `b` INT NOT NULL, 
`c` VARCHAR(2147483647)>",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, source_t]], fields=[a, b, c])",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 7,
+    "type" : "stream-exec-mini-batch-assigner_1",
+    "miniBatchInterval" : {
+      "interval" : 10000,
+      "mode" : "ProcTime"
+    },
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` VARCHAR(2147483647) NOT NULL, `b` INT NOT NULL, 
`c` VARCHAR(2147483647)>",
+    "description" : "MiniBatchAssigner(interval=[10000ms], mode=[ProcTime])"
+  }, {
+    "id" : 8,
+    "type" : "stream-exec-drop-update-before_1",
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` VARCHAR(2147483647) NOT NULL, `b` INT NOT NULL, 
`c` VARCHAR(2147483647)>",
+    "description" : "DropUpdateBefore"
+  }, {
+    "id" : 9,
+    "type" : "stream-exec-exchange_1",
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "HASH",
+        "keys" : [ 0 ]
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` VARCHAR(2147483647) NOT NULL, `b` INT NOT NULL, 
`c` VARCHAR(2147483647)>",
+    "description" : "Exchange(distribution=[hash[a]])"
+  }, {
+    "id" : 10,
+    "type" : "stream-exec-changelog-normalize_1",
+    "configuration" : {
+      "table.exec.mini-batch.enabled" : "true",
+      "table.exec.mini-batch.size" : "2"
+    },
+    "uniqueKeys" : [ 0 ],
+    "generateUpdateBefore" : true,
+    "state" : [ {
+      "index" : 0,
+      "ttl" : "0 ms",
+      "name" : "changelogNormalizeState"
+    } ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` VARCHAR(2147483647) NOT NULL, `b` INT NOT NULL, 
`c` VARCHAR(2147483647)>",
+    "description" : "ChangelogNormalize(key=[a])"
+  }, {
+    "id" : 11,
+    "type" : "stream-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.keyed-shuffle" : "AUTO",
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.rowtime-inserter" : "ENABLED",
+      "table.exec.sink.type-length-enforcer" : "IGNORE",
+      "table.exec.sink.upsert-materialize" : "AUTO"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`sink_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "b",
+              "dataType" : "INT"
+            }, {
+              "name" : "c",
+              "dataType" : "VARCHAR(2147483647)"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "inputChangelogMode" : [ "INSERT", "UPDATE_BEFORE", "UPDATE_AFTER", 
"DELETE" ],
+    "inputUpsertKey" : [ 0 ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` VARCHAR(2147483647) NOT NULL, `b` INT NOT NULL, 
`c` VARCHAR(2147483647)>",
+    "description" : "Sink(table=[default_catalog.default_database.sink_t], 
fields=[a, b, c])"
+  } ],
+  "edges" : [ {
+    "source" : 6,
+    "target" : 7,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 7,
+    "target" : 8,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 8,
+    "target" : 9,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 9,
+    "target" : 10,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 10,
+    "target" : 11,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-changelog-normalize_1/changelog-normalize-source-mini-batch/savepoint/_metadata
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-changelog-normalize_1/changelog-normalize-source-mini-batch/savepoint/_metadata
new file mode 100644
index 00000000000..6962eafb7aa
Binary files /dev/null and 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-changelog-normalize_1/changelog-normalize-source-mini-batch/savepoint/_metadata
 differ
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-changelog-normalize_1/changelog-normalize-source/plan/changelog-normalize-source.json
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-changelog-normalize_1/changelog-normalize-source/plan/changelog-normalize-source.json
new file mode 100644
index 00000000000..a2ef3ec51b4
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-changelog-normalize_1/changelog-normalize-source/plan/changelog-normalize-source.json
@@ -0,0 +1,155 @@
+{
+  "flinkVersion" : "1.19",
+  "nodes" : [ {
+    "id" : 1,
+    "type" : "stream-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`source_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "VARCHAR(2147483647) NOT NULL"
+            }, {
+              "name" : "b",
+              "dataType" : "INT NOT NULL"
+            }, {
+              "name" : "c",
+              "dataType" : "VARCHAR(2147483647)"
+            } ],
+            "watermarkSpecs" : [ ],
+            "primaryKey" : {
+              "name" : "PK_a",
+              "type" : "PRIMARY_KEY",
+              "columns" : [ "a" ]
+            }
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "outputType" : "ROW<`a` VARCHAR(2147483647) NOT NULL, `b` INT NOT NULL, 
`c` VARCHAR(2147483647)>",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, source_t]], fields=[a, b, c])",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 2,
+    "type" : "stream-exec-drop-update-before_1",
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` VARCHAR(2147483647) NOT NULL, `b` INT NOT NULL, 
`c` VARCHAR(2147483647)>",
+    "description" : "DropUpdateBefore"
+  }, {
+    "id" : 3,
+    "type" : "stream-exec-exchange_1",
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "HASH",
+        "keys" : [ 0 ]
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` VARCHAR(2147483647) NOT NULL, `b` INT NOT NULL, 
`c` VARCHAR(2147483647)>",
+    "description" : "Exchange(distribution=[hash[a]])"
+  }, {
+    "id" : 4,
+    "type" : "stream-exec-changelog-normalize_1",
+    "configuration" : {
+      "table.exec.mini-batch.enabled" : "false",
+      "table.exec.mini-batch.size" : "-1"
+    },
+    "uniqueKeys" : [ 0 ],
+    "generateUpdateBefore" : true,
+    "state" : [ {
+      "index" : 0,
+      "ttl" : "0 ms",
+      "name" : "changelogNormalizeState"
+    } ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` VARCHAR(2147483647) NOT NULL, `b` INT NOT NULL, 
`c` VARCHAR(2147483647)>",
+    "description" : "ChangelogNormalize(key=[a])"
+  }, {
+    "id" : 5,
+    "type" : "stream-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.keyed-shuffle" : "AUTO",
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.rowtime-inserter" : "ENABLED",
+      "table.exec.sink.type-length-enforcer" : "IGNORE",
+      "table.exec.sink.upsert-materialize" : "AUTO"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`sink_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "b",
+              "dataType" : "INT"
+            }, {
+              "name" : "c",
+              "dataType" : "VARCHAR(2147483647)"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "inputChangelogMode" : [ "INSERT", "UPDATE_BEFORE", "UPDATE_AFTER", 
"DELETE" ],
+    "inputUpsertKey" : [ 0 ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` VARCHAR(2147483647) NOT NULL, `b` INT NOT NULL, 
`c` VARCHAR(2147483647)>",
+    "description" : "Sink(table=[default_catalog.default_database.sink_t], 
fields=[a, b, c])"
+  } ],
+  "edges" : [ {
+    "source" : 1,
+    "target" : 2,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 2,
+    "target" : 3,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 3,
+    "target" : 4,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 4,
+    "target" : 5,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-changelog-normalize_1/changelog-normalize-source/savepoint/_metadata
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-changelog-normalize_1/changelog-normalize-source/savepoint/_metadata
new file mode 100644
index 00000000000..a8b4a5285ea
Binary files /dev/null and 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-changelog-normalize_1/changelog-normalize-source/savepoint/_metadata
 differ
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-changelog-normalize_1/changelog-normalize-upsert/plan/changelog-normalize-upsert.json
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-changelog-normalize_1/changelog-normalize-upsert/plan/changelog-normalize-upsert.json
new file mode 100644
index 00000000000..af41a0b15cd
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-changelog-normalize_1/changelog-normalize-upsert/plan/changelog-normalize-upsert.json
@@ -0,0 +1,136 @@
+{
+  "flinkVersion" : "1.19",
+  "nodes" : [ {
+    "id" : 12,
+    "type" : "stream-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`source_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "VARCHAR(2147483647) NOT NULL"
+            }, {
+              "name" : "b",
+              "dataType" : "INT NOT NULL"
+            }, {
+              "name" : "c",
+              "dataType" : "VARCHAR(2147483647)"
+            } ],
+            "watermarkSpecs" : [ ],
+            "primaryKey" : {
+              "name" : "PK_a",
+              "type" : "PRIMARY_KEY",
+              "columns" : [ "a" ]
+            }
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "outputType" : "ROW<`a` VARCHAR(2147483647) NOT NULL, `b` INT NOT NULL, 
`c` VARCHAR(2147483647)>",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, source_t]], fields=[a, b, c])",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 13,
+    "type" : "stream-exec-exchange_1",
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "HASH",
+        "keys" : [ 0 ]
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` VARCHAR(2147483647) NOT NULL, `b` INT NOT NULL, 
`c` VARCHAR(2147483647)>",
+    "description" : "Exchange(distribution=[hash[a]])"
+  }, {
+    "id" : 14,
+    "type" : "stream-exec-changelog-normalize_1",
+    "configuration" : {
+      "table.exec.mini-batch.enabled" : "false",
+      "table.exec.mini-batch.size" : "-1"
+    },
+    "uniqueKeys" : [ 0 ],
+    "generateUpdateBefore" : true,
+    "state" : [ {
+      "index" : 0,
+      "ttl" : "0 ms",
+      "name" : "changelogNormalizeState"
+    } ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` VARCHAR(2147483647) NOT NULL, `b` INT NOT NULL, 
`c` VARCHAR(2147483647)>",
+    "description" : "ChangelogNormalize(key=[a])"
+  }, {
+    "id" : 15,
+    "type" : "stream-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.keyed-shuffle" : "AUTO",
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.rowtime-inserter" : "ENABLED",
+      "table.exec.sink.type-length-enforcer" : "IGNORE",
+      "table.exec.sink.upsert-materialize" : "AUTO"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`sink_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "b",
+              "dataType" : "INT"
+            }, {
+              "name" : "c",
+              "dataType" : "VARCHAR(2147483647)"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "inputChangelogMode" : [ "INSERT", "UPDATE_BEFORE", "UPDATE_AFTER", 
"DELETE" ],
+    "inputUpsertKey" : [ 0 ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` VARCHAR(2147483647) NOT NULL, `b` INT NOT NULL, 
`c` VARCHAR(2147483647)>",
+    "description" : "Sink(table=[default_catalog.default_database.sink_t], 
fields=[a, b, c])"
+  } ],
+  "edges" : [ {
+    "source" : 12,
+    "target" : 13,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 13,
+    "target" : 14,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 14,
+    "target" : 15,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-changelog-normalize_1/changelog-normalize-upsert/savepoint/_metadata
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-changelog-normalize_1/changelog-normalize-upsert/savepoint/_metadata
new file mode 100644
index 00000000000..6f1c9e13ccf
Binary files /dev/null and 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-changelog-normalize_1/changelog-normalize-upsert/savepoint/_metadata
 differ

Reply via email to