This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit f362dcc9d4e14cfa30a27881158ec9431dd9e274 Author: bvarghese1 <bvargh...@confluent.io> AuthorDate: Thu Nov 2 12:33:56 2023 -0700 [FLINK-33441] Implement restore tests for ExecUnion node --- .../nodes/exec/testutils/UnionRestoreTest.java | 41 ++++ .../nodes/exec/testutils/UnionTestPrograms.java | 158 ++++++++++++++ .../plan/union-all-two-sources.json | 145 +++++++++++++ .../union-all-two-sources/savepoint/_metadata | Bin 0 -> 7626 bytes .../plan/union-all-with-filter.json | 241 +++++++++++++++++++++ .../union-all-with-filter/savepoint/_metadata | Bin 0 -> 8740 bytes .../union-two-sources/plan/union-two-sources.json | 199 +++++++++++++++++ .../union-two-sources/savepoint/_metadata | Bin 0 -> 12445 bytes 8 files changed, 784 insertions(+) diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/UnionRestoreTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/UnionRestoreTest.java new file mode 100644 index 00000000000..ca27c175fc6 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/UnionRestoreTest.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.nodes.exec.testutils; + +import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecUnion; +import org.apache.flink.table.test.program.TableTestProgram; + +import java.util.Arrays; +import java.util.List; + +/** Restore tests for {@link StreamExecUnion}. */ +public class UnionRestoreTest extends RestoreTestBase { + + public UnionRestoreTest() { + super(StreamExecUnion.class); + } + + @Override + public List<TableTestProgram> programs() { + return Arrays.asList( + UnionTestPrograms.UNION_TWO_SOURCES, + UnionTestPrograms.UNION_ALL_TWO_SOURCES, + UnionTestPrograms.UNION_ALL_WITH_FILTER); + } +} diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/UnionTestPrograms.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/UnionTestPrograms.java new file mode 100644 index 00000000000..562199588b5 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/UnionTestPrograms.java @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.nodes.exec.testutils; + +import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecUnion; +import org.apache.flink.table.test.program.SinkTestStep; +import org.apache.flink.table.test.program.SourceTestStep; +import org.apache.flink.table.test.program.TableTestProgram; +import org.apache.flink.types.Row; + +import java.time.LocalDateTime; + +/** {@link TableTestProgram} definitions for testing {@link StreamExecUnion}. */ +public class UnionTestPrograms { + + static final TableTestProgram UNION_TWO_SOURCES = + TableTestProgram.of("union-two-sources", "validates union of 2 tables") + .setupTableSource( + SourceTestStep.newBuilder("source_t1") + .addSchema( + "a BIGINT", + "b INT NOT NULL", + "c VARCHAR", + "d TIMESTAMP(3)") + .producedBeforeRestore( + Row.of( + 420L, + 1, + "hello", + LocalDateTime.of( + 2023, 12, 16, 01, 01, 01, 123))) + .producedAfterRestore( + Row.of( + 420L, + 1, + "hello", + LocalDateTime.of( + 2023, 12, 16, 01, 01, 01, 123)), + Row.of( + 600L, + 6, + "hello there", + LocalDateTime.of( + 2023, 12, 19, 01, 01, 01, 123))) + .build()) + .setupTableSource( + SourceTestStep.newBuilder("source_t2") + .addSchema("d BIGINT", "e INT NOT NULL") + .producedBeforeRestore(Row.of(420L, 1), Row.of(421L, 2)) + .producedAfterRestore(Row.of(500L, 3), Row.of(420L, 1)) + .build()) + .setupTableSink( + SinkTestStep.newBuilder("sink_t1_union_t2") + .addSchema("a BIGINT", "b INT") + .consumedBeforeRestore(Row.of(420L, 1), Row.of(421L, 2)) + .consumedAfterRestore(Row.of(600L, 6), Row.of(500L, 3)) + .build()) + .runSql( + "INSERT INTO sink_t1_union_t2 SELECT * FROM (SELECT a, b FROM source_t1) UNION (SELECT d, e FROM source_t2)") + .build(); + + static final TableTestProgram UNION_ALL_TWO_SOURCES = + TableTestProgram.of("union-all-two-sources", "validates union all of 2 tables") + .setupTableSource( + SourceTestStep.newBuilder("source_t1") + .addSchema( + "a BIGINT", + "b INT NOT NULL", + "c VARCHAR", + "d TIMESTAMP(3)") + .producedBeforeRestore( + Row.of( + 420L, + 1, + "hello", + LocalDateTime.of( + 2023, 12, 16, 01, 01, 01, 123))) + .producedAfterRestore( + Row.of( + 600L, + 6, + "hello there", + LocalDateTime.of( + 2023, 12, 19, 01, 01, 01, 123))) + .build()) + .setupTableSource( + SourceTestStep.newBuilder("source_t2") + .addSchema("d BIGINT", "e INT NOT NULL") + .producedBeforeRestore(Row.of(420L, 1), Row.of(421L, 2)) + .producedAfterRestore(Row.of(500L, 3), Row.of(421L, 2)) + .build()) + .setupTableSink( + SinkTestStep.newBuilder("sink_t1_union_all_t2") + .addSchema("a BIGINT", "b INT") + .consumedBeforeRestore( + Row.of(420L, 1), Row.of(420L, 1), Row.of(421L, 2)) + .consumedAfterRestore( + Row.of(600L, 6), Row.of(500L, 3), Row.of(421L, 2)) + .build()) + .runSql( + "INSERT INTO sink_t1_union_all_t2 SELECT * FROM (SELECT a, b FROM source_t1) UNION ALL (SELECT d, e FROM source_t2)") + .build(); + + static final TableTestProgram UNION_ALL_WITH_FILTER = + TableTestProgram.of( + "union-all-with-filter", "validates union all of 2 tables with filters") + .setupTableSource( + SourceTestStep.newBuilder("source_t1") + .addSchema("a INT", "b VARCHAR", "c INT") + .producedBeforeRestore( + Row.of(2, "a", 6), + Row.of(4, "b", 8), + Row.of(6, "c", 10)) + .producedAfterRestore( + Row.of(1, "a", 5), Row.of(3, "b", 7), Row.of(5, "c", 9)) + .build()) + .setupTableSource( + SourceTestStep.newBuilder("source_t2") + .addSchema("a INT", "b VARCHAR", "c INT") + .producedBeforeRestore( + Row.of(0, "a", 6), + Row.of(7, "b", 8), + Row.of(8, "c", 10)) + .producedAfterRestore( + Row.of(1, "a", 5), + Row.of(13, "b", 7), + Row.of(50, "c", 9)) + .build()) + .setupTableSink( + SinkTestStep.newBuilder("sink_t1_union_all_t2") + .addSchema("a INT", "b VARCHAR", "c INT") + .consumedBeforeRestore( + Row.of(0, "a", 6), + Row.of(4, "b", 8), + Row.of(6, "c", 10)) + .consumedAfterRestore( + Row.of(1, "a", 5), Row.of(3, "b", 7), Row.of(5, "c", 9)) + .build()) + .runSql( + "INSERT INTO sink_t1_union_all_t2 (SELECT * FROM source_t1 where a >=3) UNION ALL (select * from source_t2 where a <= 3)") + .build(); +} diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-union_1/union-all-two-sources/plan/union-all-two-sources.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-union_1/union-all-two-sources/plan/union-all-two-sources.json new file mode 100644 index 00000000000..2349006532d --- /dev/null +++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-union_1/union-all-two-sources/plan/union-all-two-sources.json @@ -0,0 +1,145 @@ +{ + "flinkVersion" : "1.19", + "nodes" : [ { + "id" : 7, + "type" : "stream-exec-table-source-scan_1", + "scanTableSource" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`source_t1`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "a", + "dataType" : "BIGINT" + }, { + "name" : "b", + "dataType" : "INT NOT NULL" + }, { + "name" : "c", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "d", + "dataType" : "TIMESTAMP(3)" + } ], + "watermarkSpecs" : [ ] + }, + "partitionKeys" : [ ] + } + }, + "abilities" : [ { + "type" : "ProjectPushDown", + "projectedFields" : [ [ 0 ], [ 1 ] ], + "producedType" : "ROW<`a` BIGINT, `b` INT NOT NULL> NOT NULL" + }, { + "type" : "ReadingMetadata", + "metadataKeys" : [ ], + "producedType" : "ROW<`a` BIGINT, `b` INT NOT NULL> NOT NULL" + } ] + }, + "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL>", + "description" : "TableSourceScan(table=[[default_catalog, default_database, source_t1, project=[a, b], metadata=[]]], fields=[a, b])", + "inputProperties" : [ ] + }, { + "id" : 8, + "type" : "stream-exec-table-source-scan_1", + "scanTableSource" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`source_t2`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "d", + "dataType" : "BIGINT" + }, { + "name" : "e", + "dataType" : "INT NOT NULL" + } ], + "watermarkSpecs" : [ ] + }, + "partitionKeys" : [ ] + } + } + }, + "outputType" : "ROW<`d` BIGINT, `e` INT NOT NULL>", + "description" : "TableSourceScan(table=[[default_catalog, default_database, source_t2]], fields=[d, e])", + "inputProperties" : [ ] + }, { + "id" : 9, + "type" : "stream-exec-union_1", + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + }, { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL>", + "description" : "Union(all=[true], union=[a, b])" + }, { + "id" : 10, + "type" : "stream-exec-sink_1", + "configuration" : { + "table.exec.sink.keyed-shuffle" : "AUTO", + "table.exec.sink.not-null-enforcer" : "ERROR", + "table.exec.sink.rowtime-inserter" : "ENABLED", + "table.exec.sink.type-length-enforcer" : "IGNORE", + "table.exec.sink.upsert-materialize" : "AUTO" + }, + "dynamicTableSink" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`sink_t1_union_all_t2`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "a", + "dataType" : "BIGINT" + }, { + "name" : "b", + "dataType" : "INT" + } ], + "watermarkSpecs" : [ ] + }, + "partitionKeys" : [ ] + } + } + }, + "inputChangelogMode" : [ "INSERT" ], + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL>", + "description" : "Sink(table=[default_catalog.default_database.sink_t1_union_all_t2], fields=[a, b])" + } ], + "edges" : [ { + "source" : 7, + "target" : 9, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 8, + "target" : 9, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 9, + "target" : 10, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + } ] +} \ No newline at end of file diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-union_1/union-all-two-sources/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-union_1/union-all-two-sources/savepoint/_metadata new file mode 100644 index 00000000000..df56312c5c1 Binary files /dev/null and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-union_1/union-all-two-sources/savepoint/_metadata differ diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-union_1/union-all-with-filter/plan/union-all-with-filter.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-union_1/union-all-with-filter/plan/union-all-with-filter.json new file mode 100644 index 00000000000..9abee8a51fb --- /dev/null +++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-union_1/union-all-with-filter/plan/union-all-with-filter.json @@ -0,0 +1,241 @@ +{ + "flinkVersion" : "1.19", + "nodes" : [ { + "id" : 11, + "type" : "stream-exec-table-source-scan_1", + "scanTableSource" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`source_t1`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "a", + "dataType" : "INT" + }, { + "name" : "b", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "c", + "dataType" : "INT" + } ], + "watermarkSpecs" : [ ] + }, + "partitionKeys" : [ ] + } + }, + "abilities" : [ { + "type" : "FilterPushDown", + "predicates" : [ ] + } ] + }, + "outputType" : "ROW<`a` INT, `b` VARCHAR(2147483647), `c` INT>", + "description" : "TableSourceScan(table=[[default_catalog, default_database, source_t1, filter=[]]], fields=[a, b, c])", + "inputProperties" : [ ] + }, { + "id" : 12, + "type" : "stream-exec-calc_1", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : "INT" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : "VARCHAR(2147483647)" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : "INT" + } ], + "condition" : { + "kind" : "CALL", + "syntax" : "BINARY", + "internalName" : "$>=$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : "INT" + }, { + "kind" : "LITERAL", + "value" : 3, + "type" : "INT NOT NULL" + } ], + "type" : "BOOLEAN" + }, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`a` INT, `b` VARCHAR(2147483647), `c` INT>", + "description" : "Calc(select=[a, b, c], where=[(a >= 3)])" + }, { + "id" : 13, + "type" : "stream-exec-table-source-scan_1", + "scanTableSource" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`source_t2`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "a", + "dataType" : "INT" + }, { + "name" : "b", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "c", + "dataType" : "INT" + } ], + "watermarkSpecs" : [ ] + }, + "partitionKeys" : [ ] + } + }, + "abilities" : [ { + "type" : "FilterPushDown", + "predicates" : [ ] + } ] + }, + "outputType" : "ROW<`a` INT, `b` VARCHAR(2147483647), `c` INT>", + "description" : "TableSourceScan(table=[[default_catalog, default_database, source_t2, filter=[]]], fields=[a, b, c])", + "inputProperties" : [ ] + }, { + "id" : 14, + "type" : "stream-exec-calc_1", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : "INT" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : "VARCHAR(2147483647)" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : "INT" + } ], + "condition" : { + "kind" : "CALL", + "syntax" : "BINARY", + "internalName" : "$<=$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : "INT" + }, { + "kind" : "LITERAL", + "value" : 3, + "type" : "INT NOT NULL" + } ], + "type" : "BOOLEAN" + }, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`a` INT, `b` VARCHAR(2147483647), `c` INT>", + "description" : "Calc(select=[a, b, c], where=[(a <= 3)])" + }, { + "id" : 15, + "type" : "stream-exec-union_1", + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + }, { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`a` INT, `b` VARCHAR(2147483647), `c` INT>", + "description" : "Union(all=[true], union=[a, b, c])" + }, { + "id" : 16, + "type" : "stream-exec-sink_1", + "configuration" : { + "table.exec.sink.keyed-shuffle" : "AUTO", + "table.exec.sink.not-null-enforcer" : "ERROR", + "table.exec.sink.rowtime-inserter" : "ENABLED", + "table.exec.sink.type-length-enforcer" : "IGNORE", + "table.exec.sink.upsert-materialize" : "AUTO" + }, + "dynamicTableSink" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`sink_t1_union_all_t2`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "a", + "dataType" : "INT" + }, { + "name" : "b", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "c", + "dataType" : "INT" + } ], + "watermarkSpecs" : [ ] + }, + "partitionKeys" : [ ] + } + } + }, + "inputChangelogMode" : [ "INSERT" ], + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`a` INT, `b` VARCHAR(2147483647), `c` INT>", + "description" : "Sink(table=[default_catalog.default_database.sink_t1_union_all_t2], fields=[a, b, c])" + } ], + "edges" : [ { + "source" : 11, + "target" : 12, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 13, + "target" : 14, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 12, + "target" : 15, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 14, + "target" : 15, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 15, + "target" : 16, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + } ] +} \ No newline at end of file diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-union_1/union-all-with-filter/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-union_1/union-all-with-filter/savepoint/_metadata new file mode 100644 index 00000000000..de328432dc3 Binary files /dev/null and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-union_1/union-all-with-filter/savepoint/_metadata differ diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-union_1/union-two-sources/plan/union-two-sources.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-union_1/union-two-sources/plan/union-two-sources.json new file mode 100644 index 00000000000..afe61e6ac62 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-union_1/union-two-sources/plan/union-two-sources.json @@ -0,0 +1,199 @@ +{ + "flinkVersion" : "1.19", + "nodes" : [ { + "id" : 1, + "type" : "stream-exec-table-source-scan_1", + "scanTableSource" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`source_t1`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "a", + "dataType" : "BIGINT" + }, { + "name" : "b", + "dataType" : "INT NOT NULL" + }, { + "name" : "c", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "d", + "dataType" : "TIMESTAMP(3)" + } ], + "watermarkSpecs" : [ ] + }, + "partitionKeys" : [ ] + } + }, + "abilities" : [ { + "type" : "ProjectPushDown", + "projectedFields" : [ [ 0 ], [ 1 ] ], + "producedType" : "ROW<`a` BIGINT, `b` INT NOT NULL> NOT NULL" + }, { + "type" : "ReadingMetadata", + "metadataKeys" : [ ], + "producedType" : "ROW<`a` BIGINT, `b` INT NOT NULL> NOT NULL" + } ] + }, + "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL>", + "description" : "TableSourceScan(table=[[default_catalog, default_database, source_t1, project=[a, b], metadata=[]]], fields=[a, b])", + "inputProperties" : [ ] + }, { + "id" : 2, + "type" : "stream-exec-table-source-scan_1", + "scanTableSource" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`source_t2`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "d", + "dataType" : "BIGINT" + }, { + "name" : "e", + "dataType" : "INT NOT NULL" + } ], + "watermarkSpecs" : [ ] + }, + "partitionKeys" : [ ] + } + } + }, + "outputType" : "ROW<`d` BIGINT, `e` INT NOT NULL>", + "description" : "TableSourceScan(table=[[default_catalog, default_database, source_t2]], fields=[d, e])", + "inputProperties" : [ ] + }, { + "id" : 3, + "type" : "stream-exec-union_1", + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + }, { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL>", + "description" : "Union(all=[true], union=[a, b])" + }, { + "id" : 4, + "type" : "stream-exec-exchange_1", + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "HASH", + "keys" : [ 0, 1 ] + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL>", + "description" : "Exchange(distribution=[hash[a, b]])" + }, { + "id" : 5, + "type" : "stream-exec-group-aggregate_1", + "configuration" : { + "table.exec.mini-batch.enabled" : "false", + "table.exec.mini-batch.size" : "-1" + }, + "grouping" : [ 0, 1 ], + "aggCalls" : [ ], + "aggCallNeedRetractions" : [ ], + "generateUpdateBefore" : true, + "needRetraction" : false, + "state" : [ { + "index" : 0, + "ttl" : "0 ms", + "name" : "groupAggregateState" + } ], + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL>", + "description" : "GroupAggregate(groupBy=[a, b], select=[a, b])" + }, { + "id" : 6, + "type" : "stream-exec-sink_1", + "configuration" : { + "table.exec.sink.keyed-shuffle" : "AUTO", + "table.exec.sink.not-null-enforcer" : "ERROR", + "table.exec.sink.rowtime-inserter" : "ENABLED", + "table.exec.sink.type-length-enforcer" : "IGNORE", + "table.exec.sink.upsert-materialize" : "AUTO" + }, + "dynamicTableSink" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`sink_t1_union_t2`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "a", + "dataType" : "BIGINT" + }, { + "name" : "b", + "dataType" : "INT" + } ], + "watermarkSpecs" : [ ] + }, + "partitionKeys" : [ ] + } + } + }, + "inputChangelogMode" : [ "INSERT", "UPDATE_BEFORE", "UPDATE_AFTER" ], + "inputUpsertKey" : [ 0, 1 ], + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL>", + "description" : "Sink(table=[default_catalog.default_database.sink_t1_union_t2], fields=[a, b])" + } ], + "edges" : [ { + "source" : 1, + "target" : 3, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 2, + "target" : 3, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 3, + "target" : 4, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 4, + "target" : 5, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 5, + "target" : 6, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + } ] +} \ No newline at end of file diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-union_1/union-two-sources/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-union_1/union-two-sources/savepoint/_metadata new file mode 100644 index 00000000000..34ee7652915 Binary files /dev/null and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-union_1/union-two-sources/savepoint/_metadata differ