This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f362dcc9d4e14cfa30a27881158ec9431dd9e274
Author: bvarghese1 <bvargh...@confluent.io>
AuthorDate: Thu Nov 2 12:33:56 2023 -0700

    [FLINK-33441] Implement restore tests for ExecUnion node
---
 .../nodes/exec/testutils/UnionRestoreTest.java     |  41 ++++
 .../nodes/exec/testutils/UnionTestPrograms.java    | 158 ++++++++++++++
 .../plan/union-all-two-sources.json                | 145 +++++++++++++
 .../union-all-two-sources/savepoint/_metadata      | Bin 0 -> 7626 bytes
 .../plan/union-all-with-filter.json                | 241 +++++++++++++++++++++
 .../union-all-with-filter/savepoint/_metadata      | Bin 0 -> 8740 bytes
 .../union-two-sources/plan/union-two-sources.json  | 199 +++++++++++++++++
 .../union-two-sources/savepoint/_metadata          | Bin 0 -> 12445 bytes
 8 files changed, 784 insertions(+)
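
Note on the test pattern: a restore test pins down savepoint compatibility for one
ExecNode. Each TableTestProgram below declares the rows its sources emit before a
savepoint is taken ("producedBeforeRestore") and after the job is restored from it
("producedAfterRestore"), together with the rows the sink must observe in each phase.
The serialized plan and savepoint committed under restore-tests/stream-exec-union_1/
are the artifacts the test restores from. The shape of the pattern, distilled from the
two Java files in this commit (the class name here is illustrative, not part of the
change):

    // Minimal restore-test skeleton, assuming the RestoreTestBase/TableTestProgram
    // APIs used in this commit; "MyNodeRestoreTest" is a hypothetical name.
    package org.apache.flink.table.planner.plan.nodes.exec.testutils;

    import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecUnion;
    import org.apache.flink.table.test.program.TableTestProgram;

    import java.util.Collections;
    import java.util.List;

    public class MyNodeRestoreTest extends RestoreTestBase {

        public MyNodeRestoreTest() {
            super(StreamExecUnion.class); // the ExecNode class under test
        }

        @Override
        public List<TableTestProgram> programs() {
            return Collections.singletonList(UnionTestPrograms.UNION_TWO_SOURCES);
        }
    }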

diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/UnionRestoreTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/UnionRestoreTest.java
new file mode 100644
index 00000000000..ca27c175fc6
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/UnionRestoreTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.testutils;
+
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecUnion;
+import org.apache.flink.table.test.program.TableTestProgram;
+
+import java.util.Arrays;
+import java.util.List;
+
+/** Restore tests for {@link StreamExecUnion}. */
+public class UnionRestoreTest extends RestoreTestBase {
+
+    public UnionRestoreTest() {
+        super(StreamExecUnion.class);
+    }
+
+    @Override
+    public List<TableTestProgram> programs() {
+        return Arrays.asList(
+                UnionTestPrograms.UNION_TWO_SOURCES,
+                UnionTestPrograms.UNION_ALL_TWO_SOURCES,
+                UnionTestPrograms.UNION_ALL_WITH_FILTER);
+    }
+}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/UnionTestPrograms.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/UnionTestPrograms.java
new file mode 100644
index 00000000000..562199588b5
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/UnionTestPrograms.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.testutils;
+
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecUnion;
+import org.apache.flink.table.test.program.SinkTestStep;
+import org.apache.flink.table.test.program.SourceTestStep;
+import org.apache.flink.table.test.program.TableTestProgram;
+import org.apache.flink.types.Row;
+
+import java.time.LocalDateTime;
+
+/** {@link TableTestProgram} definitions for testing {@link StreamExecUnion}. */
+public class UnionTestPrograms {
+
+    static final TableTestProgram UNION_TWO_SOURCES =
+            TableTestProgram.of("union-two-sources", "validates union of 2 tables")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("source_t1")
+                                    .addSchema(
+                                            "a BIGINT",
+                                            "b INT NOT NULL",
+                                            "c VARCHAR",
+                                            "d TIMESTAMP(3)")
+                                    .producedBeforeRestore(
+                                            Row.of(
+                                                    420L,
+                                                    1,
+                                                    "hello",
+                                                    LocalDateTime.of(
+                                                            2023, 12, 16, 01, 01, 01, 123)))
+                                    .producedAfterRestore(
+                                            Row.of(
+                                                    420L,
+                                                    1,
+                                                    "hello",
+                                                    LocalDateTime.of(
+                                                            2023, 12, 16, 01, 01, 01, 123)),
+                                            Row.of(
+                                                    600L,
+                                                    6,
+                                                    "hello there",
+                                                    LocalDateTime.of(
+                                                            2023, 12, 19, 01, 01, 01, 123)))
+                                    .build())
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("source_t2")
+                                    .addSchema("d BIGINT", "e INT NOT NULL")
+                                    .producedBeforeRestore(Row.of(420L, 1), Row.of(421L, 2))
+                                    .producedAfterRestore(Row.of(500L, 3), Row.of(420L, 1))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink_t1_union_t2")
+                                    .addSchema("a BIGINT", "b INT")
+                                    .consumedBeforeRestore(Row.of(420L, 1), Row.of(421L, 2))
+                                    .consumedAfterRestore(Row.of(600L, 6), Row.of(500L, 3))
+                                    .build())
+                    .runSql(
+                            "INSERT INTO sink_t1_union_t2 SELECT * FROM (SELECT a, b FROM source_t1) UNION (SELECT d, e FROM source_t2)")
+                    .build();
+
+    static final TableTestProgram UNION_ALL_TWO_SOURCES =
+            TableTestProgram.of("union-all-two-sources", "validates union all of 2 tables")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("source_t1")
+                                    .addSchema(
+                                            "a BIGINT",
+                                            "b INT NOT NULL",
+                                            "c VARCHAR",
+                                            "d TIMESTAMP(3)")
+                                    .producedBeforeRestore(
+                                            Row.of(
+                                                    420L,
+                                                    1,
+                                                    "hello",
+                                                    LocalDateTime.of(
+                                                            2023, 12, 16, 01, 01, 01, 123)))
+                                    .producedAfterRestore(
+                                            Row.of(
+                                                    600L,
+                                                    6,
+                                                    "hello there",
+                                                    LocalDateTime.of(
+                                                            2023, 12, 19, 01, 01, 01, 123)))
+                                    .build())
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("source_t2")
+                                    .addSchema("d BIGINT", "e INT NOT NULL")
+                                    .producedBeforeRestore(Row.of(420L, 1), Row.of(421L, 2))
+                                    .producedAfterRestore(Row.of(500L, 3), Row.of(421L, 2))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink_t1_union_all_t2")
+                                    .addSchema("a BIGINT", "b INT")
+                                    .consumedBeforeRestore(
+                                            Row.of(420L, 1), Row.of(420L, 1), Row.of(421L, 2))
+                                    .consumedAfterRestore(
+                                            Row.of(600L, 6), Row.of(500L, 3), Row.of(421L, 2))
+                                    .build())
+                    .runSql(
+                            "INSERT INTO sink_t1_union_all_t2 SELECT * FROM (SELECT a, b FROM source_t1) UNION ALL (SELECT d, e FROM source_t2)")
+                    .build();
+
+    static final TableTestProgram UNION_ALL_WITH_FILTER =
+            TableTestProgram.of(
+                            "union-all-with-filter", "validates union all of 2 tables with filters")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("source_t1")
+                                    .addSchema("a INT", "b VARCHAR", "c INT")
+                                    .producedBeforeRestore(
+                                            Row.of(2, "a", 6),
+                                            Row.of(4, "b", 8),
+                                            Row.of(6, "c", 10))
+                                    .producedAfterRestore(
+                                            Row.of(1, "a", 5), Row.of(3, "b", 7), Row.of(5, "c", 9))
+                                    .build())
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("source_t2")
+                                    .addSchema("a INT", "b VARCHAR", "c INT")
+                                    .producedBeforeRestore(
+                                            Row.of(0, "a", 6),
+                                            Row.of(7, "b", 8),
+                                            Row.of(8, "c", 10))
+                                    .producedAfterRestore(
+                                            Row.of(1, "a", 5),
+                                            Row.of(13, "b", 7),
+                                            Row.of(50, "c", 9))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink_t1_union_all_t2")
+                                    .addSchema("a INT", "b VARCHAR", "c INT")
+                                    .consumedBeforeRestore(
+                                            Row.of(0, "a", 6),
+                                            Row.of(4, "b", 8),
+                                            Row.of(6, "c", 10))
+                                    .consumedAfterRestore(
+                                            Row.of(1, "a", 5), Row.of(3, "b", 7), Row.of(5, "c", 9))
+                                    .build())
+                    .runSql(
+                            "INSERT INTO sink_t1_union_all_t2 (SELECT * FROM source_t1 where a >=3) UNION ALL (select * from source_t2 where a <= 3)")
+                    .build();
+}
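
The expected rows above encode the semantic difference these programs pin down: UNION
deduplicates (in UNION_TWO_SOURCES the replayed Row.of(420L, 1) produces no second sink
row after restore), while UNION ALL forwards every row from both inputs. A
self-contained way to observe both behaviors outside the test harness, as a sketch (the
class and the inline VALUES data are illustrative, not part of this commit):

    // Illustrative only: compare UNION and UNION ALL in a local TableEnvironment.
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class UnionSemanticsSketch {
        public static void main(String[] args) {
            TableEnvironment env =
                    TableEnvironment.create(EnvironmentSettings.inStreamingMode());
            // UNION deduplicates: (420, 1) is printed once despite arriving from
            // both branches.
            env.executeSql(
                            "SELECT * FROM (VALUES (420, 1), (421, 2)) AS t1(a, b) "
                                    + "UNION "
                                    + "SELECT * FROM (VALUES (420, 1)) AS t2(d, e)")
                    .print();
            // UNION ALL is a stateless concatenation: (420, 1) is printed twice.
            env.executeSql(
                            "SELECT * FROM (VALUES (420, 1), (421, 2)) AS t1(a, b) "
                                    + "UNION ALL "
                                    + "SELECT * FROM (VALUES (420, 1)) AS t2(d, e)")
                    .print();
        }
    }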
diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-union_1/union-all-two-sources/plan/union-all-two-sources.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-union_1/union-all-two-sources/plan/union-all-two-sources.json
new file mode 100644
index 00000000000..2349006532d
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-union_1/union-all-two-sources/plan/union-all-two-sources.json
@@ -0,0 +1,145 @@
+{
+  "flinkVersion" : "1.19",
+  "nodes" : [ {
+    "id" : 7,
+    "type" : "stream-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`source_t1`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "b",
+              "dataType" : "INT NOT NULL"
+            }, {
+              "name" : "c",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "d",
+              "dataType" : "TIMESTAMP(3)"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      },
+      "abilities" : [ {
+        "type" : "ProjectPushDown",
+        "projectedFields" : [ [ 0 ], [ 1 ] ],
+        "producedType" : "ROW<`a` BIGINT, `b` INT NOT NULL> NOT NULL"
+      }, {
+        "type" : "ReadingMetadata",
+        "metadataKeys" : [ ],
+        "producedType" : "ROW<`a` BIGINT, `b` INT NOT NULL> NOT NULL"
+      } ]
+    },
+    "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL>",
+    "description" : "TableSourceScan(table=[[default_catalog, default_database, source_t1, project=[a, b], metadata=[]]], fields=[a, b])",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 8,
+    "type" : "stream-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`source_t2`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "d",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "e",
+              "dataType" : "INT NOT NULL"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "outputType" : "ROW<`d` BIGINT, `e` INT NOT NULL>",
+    "description" : "TableSourceScan(table=[[default_catalog, default_database, source_t2]], fields=[d, e])",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 9,
+    "type" : "stream-exec-union_1",
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    }, {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL>",
+    "description" : "Union(all=[true], union=[a, b])"
+  }, {
+    "id" : 10,
+    "type" : "stream-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.keyed-shuffle" : "AUTO",
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.rowtime-inserter" : "ENABLED",
+      "table.exec.sink.type-length-enforcer" : "IGNORE",
+      "table.exec.sink.upsert-materialize" : "AUTO"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`sink_t1_union_all_t2`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "b",
+              "dataType" : "INT"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "inputChangelogMode" : [ "INSERT" ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL>",
+    "description" : "Sink(table=[default_catalog.default_database.sink_t1_union_all_t2], fields=[a, b])"
+  } ],
+  "edges" : [ {
+    "source" : 7,
+    "target" : 9,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 8,
+    "target" : 9,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 9,
+    "target" : 10,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file
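
The plan JSON above is a compiled plan: "nodes" lists versioned ExecNodes (e.g.
stream-exec-union_1, where the suffix is the node version the restore test is pinned
to) and "edges" wires them together with their shuffle modes. Both union inputs are
forwarded straight to the sink, so this plan holds no state. Outside the harness, a
plan file like this would be restored roughly as follows (a sketch; these test plans
rely on the harness to register their backing test connectors, so the path and setup
are illustrative):

    // Sketch: loading and executing a compiled plan JSON, assuming the catalog
    // tables it references are backed by real connectors.
    import org.apache.flink.table.api.CompiledPlan;
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.PlanReference;
    import org.apache.flink.table.api.TableEnvironment;

    public class LoadPlanSketch {
        public static void main(String[] args) {
            TableEnvironment env =
                    TableEnvironment.create(EnvironmentSettings.inStreamingMode());
            CompiledPlan plan =
                    env.loadPlan(PlanReference.fromFile("union-all-two-sources.json"));
            plan.execute(); // submits the restored pipeline as a job
        }
    }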
diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-union_1/union-all-two-sources/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-union_1/union-all-two-sources/savepoint/_metadata
new file mode 100644
index 00000000000..df56312c5c1
Binary files /dev/null and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-union_1/union-all-two-sources/savepoint/_metadata differ
diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-union_1/union-all-with-filter/plan/union-all-with-filter.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-union_1/union-all-with-filter/plan/union-all-with-filter.json
new file mode 100644
index 00000000000..9abee8a51fb
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-union_1/union-all-with-filter/plan/union-all-with-filter.json
@@ -0,0 +1,241 @@
+{
+  "flinkVersion" : "1.19",
+  "nodes" : [ {
+    "id" : 11,
+    "type" : "stream-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`source_t1`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "INT"
+            }, {
+              "name" : "b",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "c",
+              "dataType" : "INT"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      },
+      "abilities" : [ {
+        "type" : "FilterPushDown",
+        "predicates" : [ ]
+      } ]
+    },
+    "outputType" : "ROW<`a` INT, `b` VARCHAR(2147483647), `c` INT>",
+    "description" : "TableSourceScan(table=[[default_catalog, default_database, source_t1, filter=[]]], fields=[a, b, c])",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 12,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "INT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 1,
+      "type" : "VARCHAR(2147483647)"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 2,
+      "type" : "INT"
+    } ],
+    "condition" : {
+      "kind" : "CALL",
+      "syntax" : "BINARY",
+      "internalName" : "$>=$1",
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 0,
+        "type" : "INT"
+      }, {
+        "kind" : "LITERAL",
+        "value" : 3,
+        "type" : "INT NOT NULL"
+      } ],
+      "type" : "BOOLEAN"
+    },
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT, `b` VARCHAR(2147483647), `c` INT>",
+    "description" : "Calc(select=[a, b, c], where=[(a >= 3)])"
+  }, {
+    "id" : 13,
+    "type" : "stream-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`source_t2`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "INT"
+            }, {
+              "name" : "b",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "c",
+              "dataType" : "INT"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      },
+      "abilities" : [ {
+        "type" : "FilterPushDown",
+        "predicates" : [ ]
+      } ]
+    },
+    "outputType" : "ROW<`a` INT, `b` VARCHAR(2147483647), `c` INT>",
+    "description" : "TableSourceScan(table=[[default_catalog, default_database, source_t2, filter=[]]], fields=[a, b, c])",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 14,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "INT"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 1,
+      "type" : "VARCHAR(2147483647)"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 2,
+      "type" : "INT"
+    } ],
+    "condition" : {
+      "kind" : "CALL",
+      "syntax" : "BINARY",
+      "internalName" : "$<=$1",
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 0,
+        "type" : "INT"
+      }, {
+        "kind" : "LITERAL",
+        "value" : 3,
+        "type" : "INT NOT NULL"
+      } ],
+      "type" : "BOOLEAN"
+    },
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT, `b` VARCHAR(2147483647), `c` INT>",
+    "description" : "Calc(select=[a, b, c], where=[(a <= 3)])"
+  }, {
+    "id" : 15,
+    "type" : "stream-exec-union_1",
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    }, {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT, `b` VARCHAR(2147483647), `c` INT>",
+    "description" : "Union(all=[true], union=[a, b, c])"
+  }, {
+    "id" : 16,
+    "type" : "stream-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.keyed-shuffle" : "AUTO",
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.rowtime-inserter" : "ENABLED",
+      "table.exec.sink.type-length-enforcer" : "IGNORE",
+      "table.exec.sink.upsert-materialize" : "AUTO"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`sink_t1_union_all_t2`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "INT"
+            }, {
+              "name" : "b",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "c",
+              "dataType" : "INT"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "inputChangelogMode" : [ "INSERT" ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT, `b` VARCHAR(2147483647), `c` INT>",
+    "description" : "Sink(table=[default_catalog.default_database.sink_t1_union_all_t2], fields=[a, b, c])"
+  } ],
+  "edges" : [ {
+    "source" : 11,
+    "target" : 12,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 13,
+    "target" : 14,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 12,
+    "target" : 15,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 14,
+    "target" : 15,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 15,
+    "target" : 16,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file
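
Worth noting in the plan above: both sources accept the FilterPushDown ability but with
empty "predicates", i.e. nothing was actually pushed into the scans (filter=[] in the
scan descriptions), so the a >= 3 and a <= 3 conditions survive as separate
stream-exec-calc_1 nodes between each scan and the union.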
diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-union_1/union-all-with-filter/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-union_1/union-all-with-filter/savepoint/_metadata
new file mode 100644
index 00000000000..de328432dc3
Binary files /dev/null and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-union_1/union-all-with-filter/savepoint/_metadata differ
diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-union_1/union-two-sources/plan/union-two-sources.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-union_1/union-two-sources/plan/union-two-sources.json
new file mode 100644
index 00000000000..afe61e6ac62
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-union_1/union-two-sources/plan/union-two-sources.json
@@ -0,0 +1,199 @@
+{
+  "flinkVersion" : "1.19",
+  "nodes" : [ {
+    "id" : 1,
+    "type" : "stream-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`source_t1`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "b",
+              "dataType" : "INT NOT NULL"
+            }, {
+              "name" : "c",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "d",
+              "dataType" : "TIMESTAMP(3)"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      },
+      "abilities" : [ {
+        "type" : "ProjectPushDown",
+        "projectedFields" : [ [ 0 ], [ 1 ] ],
+        "producedType" : "ROW<`a` BIGINT, `b` INT NOT NULL> NOT NULL"
+      }, {
+        "type" : "ReadingMetadata",
+        "metadataKeys" : [ ],
+        "producedType" : "ROW<`a` BIGINT, `b` INT NOT NULL> NOT NULL"
+      } ]
+    },
+    "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL>",
+    "description" : "TableSourceScan(table=[[default_catalog, default_database, source_t1, project=[a, b], metadata=[]]], fields=[a, b])",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 2,
+    "type" : "stream-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`source_t2`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "d",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "e",
+              "dataType" : "INT NOT NULL"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "outputType" : "ROW<`d` BIGINT, `e` INT NOT NULL>",
+    "description" : "TableSourceScan(table=[[default_catalog, default_database, source_t2]], fields=[d, e])",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 3,
+    "type" : "stream-exec-union_1",
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    }, {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL>",
+    "description" : "Union(all=[true], union=[a, b])"
+  }, {
+    "id" : 4,
+    "type" : "stream-exec-exchange_1",
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "HASH",
+        "keys" : [ 0, 1 ]
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL>",
+    "description" : "Exchange(distribution=[hash[a, b]])"
+  }, {
+    "id" : 5,
+    "type" : "stream-exec-group-aggregate_1",
+    "configuration" : {
+      "table.exec.mini-batch.enabled" : "false",
+      "table.exec.mini-batch.size" : "-1"
+    },
+    "grouping" : [ 0, 1 ],
+    "aggCalls" : [ ],
+    "aggCallNeedRetractions" : [ ],
+    "generateUpdateBefore" : true,
+    "needRetraction" : false,
+    "state" : [ {
+      "index" : 0,
+      "ttl" : "0 ms",
+      "name" : "groupAggregateState"
+    } ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL>",
+    "description" : "GroupAggregate(groupBy=[a, b], select=[a, b])"
+  }, {
+    "id" : 6,
+    "type" : "stream-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.keyed-shuffle" : "AUTO",
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.rowtime-inserter" : "ENABLED",
+      "table.exec.sink.type-length-enforcer" : "IGNORE",
+      "table.exec.sink.upsert-materialize" : "AUTO"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`sink_t1_union_t2`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "b",
+              "dataType" : "INT"
+            } ],
+            "watermarkSpecs" : [ ]
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "inputChangelogMode" : [ "INSERT", "UPDATE_BEFORE", "UPDATE_AFTER" ],
+    "inputUpsertKey" : [ 0, 1 ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL>",
+    "description" : "Sink(table=[default_catalog.default_database.sink_t1_union_t2], fields=[a, b])"
+  } ],
+  "edges" : [ {
+    "source" : 1,
+    "target" : 3,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 2,
+    "target" : 3,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 3,
+    "target" : 4,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 4,
+    "target" : 5,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 5,
+    "target" : 6,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file
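
Unlike the two UNION ALL plans, this plan is stateful: plain UNION is planned as a
union-all followed by a hash Exchange on [a, b] and a GroupAggregate that keeps one
state entry per distinct (a, b) pair. That state is what the savepoint captures (hence
the larger _metadata, 12445 bytes vs. 7626/8740 above) and why the sink's
inputChangelogMode also carries UPDATE_BEFORE/UPDATE_AFTER.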
diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-union_1/union-two-sources/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-union_1/union-two-sources/savepoint/_metadata
new file mode 100644
index 00000000000..34ee7652915
Binary files /dev/null and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-union_1/union-two-sources/savepoint/_metadata differ
