This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 9a9b9ce81ca05398f8891c918c74294402462f5c Author: bvarghese1 <bvargh...@confluent.io> AuthorDate: Wed Jan 3 17:24:19 2024 -0800 [FLINK-33979] Implement restore tests for TableSink node --- .../planner/factories/TestValuesTableFactory.java | 9 +- .../nodes/exec/stream/TableSinkRestoreTest.java | 43 ++++++ .../nodes/exec/stream/TableSinkTestPrograms.java | 158 +++++++++++++++++++++ .../utils/JavaUserDefinedScalarFunctions.java | 2 +- .../plan/sink-ndf-primary-key.json | 123 ++++++++++++++++ .../sink-ndf-primary-key/savepoint/_metadata | Bin 0 -> 5080 bytes .../sink-overwrite/plan/sink-overwrite.json | 84 +++++++++++ .../sink-overwrite/savepoint/_metadata | Bin 0 -> 8381 bytes .../plan/sink-partial-insert.json | 128 +++++++++++++++++ .../sink-partial-insert/savepoint/_metadata | Bin 0 -> 11034 bytes .../sink-partition/plan/sink-partition.json | 126 ++++++++++++++++ .../sink-partition/savepoint/_metadata | Bin 0 -> 9435 bytes .../plan/sink-writing-metadata.json | 87 ++++++++++++ .../sink-writing-metadata/savepoint/_metadata | Bin 0 -> 8331 bytes 14 files changed, 758 insertions(+), 2 deletions(-) diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java index 8511e30ce3c..3dbf4d5b9c0 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java @@ -51,6 +51,7 @@ import org.apache.flink.table.connector.sink.DataStreamSinkProvider; import org.apache.flink.table.connector.sink.DynamicTableSink; import org.apache.flink.table.connector.sink.OutputFormatProvider; import org.apache.flink.table.connector.sink.SinkFunctionProvider; +import org.apache.flink.table.connector.sink.abilities.SupportsOverwrite; 
import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning; import org.apache.flink.table.connector.sink.abilities.SupportsWritingMetadata; import org.apache.flink.table.connector.source.AsyncTableFunctionProvider; @@ -1937,7 +1938,10 @@ public final class TestValuesTableFactory /** Values {@link DynamicTableSink} for testing. */ private static class TestValuesTableSink - implements DynamicTableSink, SupportsWritingMetadata, SupportsPartitioning { + implements DynamicTableSink, + SupportsWritingMetadata, + SupportsPartitioning, + SupportsOverwrite { private DataType consumedDataType; private int[] primaryKeyIndices; @@ -2135,6 +2139,9 @@ public final class TestValuesTableFactory public boolean requiresPartitionGrouping(boolean supportsGrouping) { return supportsGrouping; } + + @Override + public void applyOverwrite(boolean overwrite) {} } /** A TableSink used for testing the implementation of {@link SinkFunction.Context}. */ diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSinkRestoreTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSinkRestoreTest.java new file mode 100644 index 00000000000..1ab3651cf09 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSinkRestoreTest.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.nodes.exec.stream; + +import org.apache.flink.table.planner.plan.nodes.exec.testutils.RestoreTestBase; +import org.apache.flink.table.test.program.TableTestProgram; + +import java.util.Arrays; +import java.util.List; + +/** Restore tests for {@link StreamExecSink}. */ +public class TableSinkRestoreTest extends RestoreTestBase { + + public TableSinkRestoreTest() { + super(StreamExecSink.class); + } + + @Override + public List<TableTestProgram> programs() { + return Arrays.asList( + TableSinkTestPrograms.SINK_PARTITION, + TableSinkTestPrograms.SINK_OVERWRITE, + TableSinkTestPrograms.SINK_WRITING_METADATA, + TableSinkTestPrograms.SINK_NDF_PRIMARY_KEY, + TableSinkTestPrograms.SINK_PARTIAL_INSERT); + } +} diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSinkTestPrograms.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSinkTestPrograms.java new file mode 100644 index 00000000000..3f5a51922fe --- /dev/null +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSinkTestPrograms.java @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.nodes.exec.stream; + +import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions; +import org.apache.flink.table.test.program.SinkTestStep; +import org.apache.flink.table.test.program.SourceTestStep; +import org.apache.flink.table.test.program.TableTestProgram; +import org.apache.flink.types.Row; + +/** {@link TableTestProgram} definitions for testing {@link StreamExecSink}. 
*/ +public class TableSinkTestPrograms { + + static final Row[] BEFORE_DATA = { + Row.of(1, 1L, "hi"), Row.of(2, 2L, "hello"), Row.of(3, 2L, "hello world") + }; + + static final Row[] AFTER_DATA = {Row.of(4, 4L, "foo"), Row.of(5, 2L, "foo bar")}; + + static final String[] SOURCE_SCHEMA = {"a INT", "b BIGINT", "c VARCHAR"}; + + static final TableTestProgram SINK_PARTITION = + TableTestProgram.of("sink-partition", "validates sink partition") + .setupTableSource( + SourceTestStep.newBuilder("source_t") + .addSchema(SOURCE_SCHEMA) + .producedBeforeRestore(BEFORE_DATA) + .producedAfterRestore(AFTER_DATA) + .build()) + .setupTableSink( + SinkTestStep.newBuilder("sink_t") + .addSchema( + "a INT", "b BIGINT", "p BIGINT NOT NULL", "c VARCHAR") + .addPartitionKeys("b") + .addOption("partition-list", "b:1;b:2;b:3;b:4") + .consumedBeforeRestore( + "+I[1, 2, 1, hi]", + "+I[2, 2, 2, hello]", + "+I[3, 2, 2, hello world]") + .consumedAfterRestore( + "+I[4, 2, 4, foo]", "+I[5, 2, 2, foo bar]") + .build()) + .runSql("INSERT INTO sink_t PARTITION (b=2) SELECT * FROM source_t") + .build(); + + static final TableTestProgram SINK_OVERWRITE = + TableTestProgram.of("sink-overwrite", "validates sink with overwrite") + .setupTableSource( + SourceTestStep.newBuilder("source_t") + .addSchema(SOURCE_SCHEMA) + .producedBeforeRestore(BEFORE_DATA) + .producedAfterRestore(AFTER_DATA) + .build()) + .setupTableSink( + SinkTestStep.newBuilder("sink_t") + .addSchema("a INT", "b BIGINT", "c VARCHAR") + .consumedBeforeRestore( + "+I[1, 1, hi]", + "+I[2, 2, hello]", + "+I[3, 2, hello world]") + .consumedAfterRestore("+I[4, 4, foo]", "+I[5, 2, foo bar]") + .build()) + .runSql("INSERT OVERWRITE sink_t SELECT * FROM source_t") + .build(); + static final TableTestProgram SINK_WRITING_METADATA = + TableTestProgram.of("sink-writing-metadata", "validates writing metadata to sink") + .setupTableSource( + SourceTestStep.newBuilder("source_t") + .addSchema(SOURCE_SCHEMA) + .producedBeforeRestore(BEFORE_DATA) + 
.producedAfterRestore(AFTER_DATA) + .build()) + .setupTableSink( + SinkTestStep.newBuilder("sink_t") + .addSchema("a INT", "b BIGINT", "c VARCHAR METADATA") + .addOption("writable-metadata", "c:STRING") + .consumedBeforeRestore( + "+I[1, 1, hi]", + "+I[2, 2, hello]", + "+I[3, 2, hello world]") + .consumedAfterRestore("+I[4, 4, foo]", "+I[5, 2, foo bar]") + .build()) + .runSql("INSERT INTO sink_t SELECT * FROM source_t") + .build(); + + static final TableTestProgram SINK_NDF_PRIMARY_KEY = + TableTestProgram.of( + "sink-ndf-primary-key", + "validates sink with ndf and different primary key") + .setupTemporaryCatalogFunction( + "ndf", JavaUserDefinedScalarFunctions.NonDeterministicUdf.class) + .setupTableSource( + SourceTestStep.newBuilder("source_t") + .addSchema(SOURCE_SCHEMA) + .producedBeforeRestore(BEFORE_DATA) + .producedAfterRestore(AFTER_DATA) + .build()) + .setupTableSink( + SinkTestStep.newBuilder("sink_t") + .addSchema( + "a INT", + "b BIGINT", + "c VARCHAR", + "PRIMARY KEY(c) NOT ENFORCED") + .consumedBeforeRestore( + "+I[1, 1, hi--1170105035]", + "+I[2, 2, hello-234785527]", + "+I[3, 2, hello world--1360544799]") + .consumedAfterRestore( + "+I[4, 4, foo--1170105035]", + "+I[5, 2, foo bar-234785527]") + .build()) + .runSql("INSERT INTO sink_t SELECT a, b, ndf(c) FROM source_t") + .build(); + + static final TableTestProgram SINK_PARTIAL_INSERT = + TableTestProgram.of("sink-partial-insert", "validates sink with partial insert") + .setupTableSource( + SourceTestStep.newBuilder("source_t") + .addSchema(SOURCE_SCHEMA) + .producedBeforeRestore(BEFORE_DATA) + .producedAfterRestore(AFTER_DATA) + .build()) + .setupTableSink( + SinkTestStep.newBuilder("sink_t") + .addSchema( + "a INT", + "b BIGINT", + "c VARCHAR", + "d DECIMAL(10,2)", + "e DOUBLE") + .consumedBeforeRestore( + "+I[1, 1, hi, null, null]", + "+I[2, 2, hello, null, null]", + "+I[3, 2, hello world, null, null]") + .consumedAfterRestore( + "+I[4, 4, foo, null, null]", + "+I[5, 2, foo bar, null, null]") + 
.build()) + .runSql("INSERT INTO sink_t (a, b, c) SELECT a, b, c FROM source_t") + .build(); +} diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/utils/JavaUserDefinedScalarFunctions.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/utils/JavaUserDefinedScalarFunctions.java index bbd0aa2b478..7013cd32083 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/utils/JavaUserDefinedScalarFunctions.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/utils/JavaUserDefinedScalarFunctions.java @@ -121,7 +121,7 @@ public class JavaUserDefinedScalarFunctions { /** Non-deterministic scalar function. */ public static class NonDeterministicUdf extends ScalarFunction { - Random random = new Random(); + Random random = new Random(42); // seed for tests public int eval() { return random.nextInt(); diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_1/sink-ndf-primary-key/plan/sink-ndf-primary-key.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_1/sink-ndf-primary-key/plan/sink-ndf-primary-key.json new file mode 100644 index 00000000000..5c566d9d3e2 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_1/sink-ndf-primary-key/plan/sink-ndf-primary-key.json @@ -0,0 +1,123 @@ +{ + "flinkVersion" : "1.19", + "nodes" : [ { + "id" : 8, + "type" : "stream-exec-table-source-scan_1", + "scanTableSource" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`source_t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "a", + "dataType" : "INT" + }, { + "name" : "b", + "dataType" : "BIGINT" + }, { + "name" : "c", + "dataType" : "VARCHAR(2147483647)" + } ], + "watermarkSpecs" : [ ] + }, + "partitionKeys" : [ ] + } + } + }, + "outputType" : "ROW<`a` INT, 
`b` BIGINT, `c` VARCHAR(2147483647)>", + "description" : "TableSourceScan(table=[[default_catalog, default_database, source_t]], fields=[a, b, c])", + "inputProperties" : [ ] + }, { + "id" : 9, + "type" : "stream-exec-calc_1", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : "INT" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : "BIGINT" + }, { + "kind" : "CALL", + "catalogName" : "`default_catalog`.`default_database`.`ndf`", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : "VARCHAR(2147483647)" + } ], + "type" : "VARCHAR(2147483647)" + } ], + "condition" : null, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`a` INT, `b` BIGINT, `EXPR$2` VARCHAR(2147483647)>", + "description" : "Calc(select=[a, b, ndf(c) AS EXPR$2])" + }, { + "id" : 10, + "type" : "stream-exec-sink_1", + "configuration" : { + "table.exec.sink.keyed-shuffle" : "AUTO", + "table.exec.sink.not-null-enforcer" : "ERROR", + "table.exec.sink.rowtime-inserter" : "ENABLED", + "table.exec.sink.type-length-enforcer" : "IGNORE", + "table.exec.sink.upsert-materialize" : "AUTO" + }, + "dynamicTableSink" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`sink_t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "a", + "dataType" : "INT" + }, { + "name" : "b", + "dataType" : "BIGINT" + }, { + "name" : "c", + "dataType" : "VARCHAR(2147483647) NOT NULL" + } ], + "watermarkSpecs" : [ ], + "primaryKey" : { + "name" : "PK_c", + "type" : "PRIMARY_KEY", + "columns" : [ "c" ] + } + }, + "partitionKeys" : [ ] + } + } + }, + "inputChangelogMode" : [ "INSERT" ], + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`a` INT, `b` BIGINT, `EXPR$2` VARCHAR(2147483647)>", + "description" : 
"Sink(table=[default_catalog.default_database.sink_t], fields=[a, b, EXPR$2])" + } ], + "edges" : [ { + "source" : 8, + "target" : 9, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 9, + "target" : 10, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + } ] +} \ No newline at end of file diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_1/sink-ndf-primary-key/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_1/sink-ndf-primary-key/savepoint/_metadata new file mode 100644 index 00000000000..27c4f41181d Binary files /dev/null and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_1/sink-ndf-primary-key/savepoint/_metadata differ diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_1/sink-overwrite/plan/sink-overwrite.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_1/sink-overwrite/plan/sink-overwrite.json new file mode 100644 index 00000000000..a2a09bc5178 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_1/sink-overwrite/plan/sink-overwrite.json @@ -0,0 +1,84 @@ +{ + "flinkVersion" : "1.19", + "nodes" : [ { + "id" : 4, + "type" : "stream-exec-table-source-scan_1", + "scanTableSource" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`source_t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "a", + "dataType" : "INT" + }, { + "name" : "b", + "dataType" : "BIGINT" + }, { + "name" : "c", + "dataType" : "VARCHAR(2147483647)" + } ], + "watermarkSpecs" : [ ] + }, + "partitionKeys" : [ ] + } + } + }, + "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647)>", + "description" : "TableSourceScan(table=[[default_catalog, default_database, source_t]], fields=[a, b, c])", + "inputProperties" : [ ] + 
}, { + "id" : 5, + "type" : "stream-exec-sink_1", + "configuration" : { + "table.exec.sink.keyed-shuffle" : "AUTO", + "table.exec.sink.not-null-enforcer" : "ERROR", + "table.exec.sink.rowtime-inserter" : "ENABLED", + "table.exec.sink.type-length-enforcer" : "IGNORE", + "table.exec.sink.upsert-materialize" : "AUTO" + }, + "dynamicTableSink" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`sink_t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "a", + "dataType" : "INT" + }, { + "name" : "b", + "dataType" : "BIGINT" + }, { + "name" : "c", + "dataType" : "VARCHAR(2147483647)" + } ], + "watermarkSpecs" : [ ] + }, + "partitionKeys" : [ ] + } + }, + "abilities" : [ { + "type" : "Overwrite", + "overwrite" : true + } ] + }, + "inputChangelogMode" : [ "INSERT" ], + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647)>", + "description" : "Sink(table=[default_catalog.default_database.sink_t], fields=[a, b, c])" + } ], + "edges" : [ { + "source" : 4, + "target" : 5, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + } ] +} \ No newline at end of file diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_1/sink-overwrite/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_1/sink-overwrite/savepoint/_metadata new file mode 100644 index 00000000000..5430483d096 Binary files /dev/null and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_1/sink-overwrite/savepoint/_metadata differ diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_1/sink-partial-insert/plan/sink-partial-insert.json 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_1/sink-partial-insert/plan/sink-partial-insert.json new file mode 100644 index 00000000000..6c1b2784b94 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_1/sink-partial-insert/plan/sink-partial-insert.json @@ -0,0 +1,128 @@ +{ + "flinkVersion" : "1.19", + "nodes" : [ { + "id" : 11, + "type" : "stream-exec-table-source-scan_1", + "scanTableSource" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`source_t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "a", + "dataType" : "INT" + }, { + "name" : "b", + "dataType" : "BIGINT" + }, { + "name" : "c", + "dataType" : "VARCHAR(2147483647)" + } ], + "watermarkSpecs" : [ ] + }, + "partitionKeys" : [ ] + } + } + }, + "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647)>", + "description" : "TableSourceScan(table=[[default_catalog, default_database, source_t]], fields=[a, b, c])", + "inputProperties" : [ ] + }, { + "id" : 12, + "type" : "stream-exec-calc_1", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : "INT" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : "BIGINT" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : "VARCHAR(2147483647)" + }, { + "kind" : "LITERAL", + "value" : null, + "type" : "DECIMAL(10, 2)" + }, { + "kind" : "LITERAL", + "value" : null, + "type" : "DOUBLE" + } ], + "condition" : null, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647), `EXPR$3` DECIMAL(10, 2), `EXPR$4` DOUBLE>", + "description" : "Calc(select=[a, b, c, null:DECIMAL(10, 2) AS EXPR$3, null:DOUBLE AS EXPR$4])" + }, { + "id" : 13, + "type" : "stream-exec-sink_1", + "configuration" : { + "table.exec.sink.keyed-shuffle" : "AUTO", + 
"table.exec.sink.not-null-enforcer" : "ERROR", + "table.exec.sink.rowtime-inserter" : "ENABLED", + "table.exec.sink.type-length-enforcer" : "IGNORE", + "table.exec.sink.upsert-materialize" : "AUTO" + }, + "dynamicTableSink" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`sink_t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "a", + "dataType" : "INT" + }, { + "name" : "b", + "dataType" : "BIGINT" + }, { + "name" : "c", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "d", + "dataType" : "DECIMAL(10, 2)" + }, { + "name" : "e", + "dataType" : "DOUBLE" + } ], + "watermarkSpecs" : [ ] + }, + "partitionKeys" : [ ] + } + }, + "targetColumns" : [ [ 0 ], [ 1 ], [ 2 ] ] + }, + "inputChangelogMode" : [ "INSERT" ], + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647), `EXPR$3` DECIMAL(10, 2), `EXPR$4` DOUBLE>", + "description" : "Sink(table=[default_catalog.default_database.sink_t], targetColumns=[[0],[1],[2]], fields=[a, b, c, EXPR$3, EXPR$4])" + } ], + "edges" : [ { + "source" : 11, + "target" : 12, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 12, + "target" : 13, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + } ] +} \ No newline at end of file diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_1/sink-partial-insert/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_1/sink-partial-insert/savepoint/_metadata new file mode 100644 index 00000000000..1c895c8a2b7 Binary files /dev/null and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_1/sink-partial-insert/savepoint/_metadata differ diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_1/sink-partition/plan/sink-partition.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_1/sink-partition/plan/sink-partition.json new file mode 100644 index 00000000000..cb4ebaa0e94 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_1/sink-partition/plan/sink-partition.json @@ -0,0 +1,126 @@ +{ + "flinkVersion" : "1.19", + "nodes" : [ { + "id" : 1, + "type" : "stream-exec-table-source-scan_1", + "scanTableSource" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`source_t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "a", + "dataType" : "INT" + }, { + "name" : "b", + "dataType" : "BIGINT" + }, { + "name" : "c", + "dataType" : "VARCHAR(2147483647)" + } ], + "watermarkSpecs" : [ ] + }, + "partitionKeys" : [ ] + } + } + }, + "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647)>", + "description" : "TableSourceScan(table=[[default_catalog, default_database, source_t]], fields=[a, b, c])", + "inputProperties" : [ ] + }, { + "id" : 2, + "type" : "stream-exec-calc_1", + "projection" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 0, + "type" : "INT" + }, { + "kind" : "LITERAL", + "value" : 2, + "type" : "BIGINT NOT NULL" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 1, + "type" : "BIGINT" + }, { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : "VARCHAR(2147483647)" + } ], + "condition" : null, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`a` INT, `EXPR$1` BIGINT NOT NULL, `b` BIGINT, `c` VARCHAR(2147483647)>", + "description" : "Calc(select=[a, 2 AS EXPR$1, b, c])" + }, { + "id" : 3, + "type" : "stream-exec-sink_1", + "configuration" : { + "table.exec.sink.keyed-shuffle" : "AUTO", + "table.exec.sink.not-null-enforcer" : 
"ERROR", + "table.exec.sink.rowtime-inserter" : "ENABLED", + "table.exec.sink.type-length-enforcer" : "IGNORE", + "table.exec.sink.upsert-materialize" : "AUTO" + }, + "dynamicTableSink" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`sink_t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "a", + "dataType" : "INT" + }, { + "name" : "b", + "dataType" : "BIGINT" + }, { + "name" : "p", + "dataType" : "BIGINT NOT NULL" + }, { + "name" : "c", + "dataType" : "VARCHAR(2147483647)" + } ], + "watermarkSpecs" : [ ] + }, + "partitionKeys" : [ "b" ] + } + }, + "abilities" : [ { + "type" : "Partitioning", + "partition" : { + "b" : "2" + } + } ] + }, + "inputChangelogMode" : [ "INSERT" ], + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`a` INT, `EXPR$1` BIGINT NOT NULL, `b` BIGINT, `c` VARCHAR(2147483647)>", + "description" : "Sink(table=[default_catalog.default_database.sink_t], fields=[a, EXPR$1, b, c])" + } ], + "edges" : [ { + "source" : 1, + "target" : 2, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 2, + "target" : 3, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + } ] +} \ No newline at end of file diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_1/sink-partition/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_1/sink-partition/savepoint/_metadata new file mode 100644 index 00000000000..24aebfe0308 Binary files /dev/null and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_1/sink-partition/savepoint/_metadata differ diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_1/sink-writing-metadata/plan/sink-writing-metadata.json 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_1/sink-writing-metadata/plan/sink-writing-metadata.json new file mode 100644 index 00000000000..2220fab28ac --- /dev/null +++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_1/sink-writing-metadata/plan/sink-writing-metadata.json @@ -0,0 +1,87 @@ +{ + "flinkVersion" : "1.19", + "nodes" : [ { + "id" : 6, + "type" : "stream-exec-table-source-scan_1", + "scanTableSource" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`source_t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "a", + "dataType" : "INT" + }, { + "name" : "b", + "dataType" : "BIGINT" + }, { + "name" : "c", + "dataType" : "VARCHAR(2147483647)" + } ], + "watermarkSpecs" : [ ] + }, + "partitionKeys" : [ ] + } + } + }, + "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647)>", + "description" : "TableSourceScan(table=[[default_catalog, default_database, source_t]], fields=[a, b, c])", + "inputProperties" : [ ] + }, { + "id" : 7, + "type" : "stream-exec-sink_1", + "configuration" : { + "table.exec.sink.keyed-shuffle" : "AUTO", + "table.exec.sink.not-null-enforcer" : "ERROR", + "table.exec.sink.rowtime-inserter" : "ENABLED", + "table.exec.sink.type-length-enforcer" : "IGNORE", + "table.exec.sink.upsert-materialize" : "AUTO" + }, + "dynamicTableSink" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`sink_t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "a", + "dataType" : "INT" + }, { + "name" : "b", + "dataType" : "BIGINT" + }, { + "name" : "c", + "kind" : "METADATA", + "dataType" : "VARCHAR(2147483647)", + "isVirtual" : false + } ], + "watermarkSpecs" : [ ] + }, + "partitionKeys" : [ ] + } + }, + "abilities" : [ { + "type" : "WritingMetadata", + "metadataKeys" : [ "c" ], + "consumedType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647)> NOT NULL" + } ] + }, + "inputChangelogMode" : [ 
"INSERT" ], + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`a` INT, `b` BIGINT, `c` VARCHAR(2147483647)>", + "description" : "Sink(table=[default_catalog.default_database.sink_t], fields=[a, b, c])" + } ], + "edges" : [ { + "source" : 6, + "target" : 7, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + } ] +} \ No newline at end of file diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_1/sink-writing-metadata/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_1/sink-writing-metadata/savepoint/_metadata new file mode 100644 index 00000000000..b7874e2a2ef Binary files /dev/null and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_1/sink-writing-metadata/savepoint/_metadata differ