This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit e76ccdc1fd8b15a5aac4968fd89643b0b17e1a48 Author: bvarghese1 <bvargh...@confluent.io> AuthorDate: Fri Jan 26 15:04:19 2024 -0800 [FLINK-34248] Implement restore tests for changelog normalize node --- .../exec/stream/ChangelogNormalizeRestoreTest.java | 41 +++++ .../stream/ChangelogNormalizeTestPrograms.java | 167 +++++++++++++++++++ .../changelog-normalize-source-mini-batch.json | 178 +++++++++++++++++++++ .../savepoint/_metadata | Bin 0 -> 11960 bytes .../plan/changelog-normalize-source.json | 155 ++++++++++++++++++ .../changelog-normalize-source/savepoint/_metadata | Bin 0 -> 13487 bytes .../plan/changelog-normalize-upsert.json | 136 ++++++++++++++++ .../changelog-normalize-upsert/savepoint/_metadata | Bin 0 -> 13436 bytes 8 files changed, 677 insertions(+) diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ChangelogNormalizeRestoreTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ChangelogNormalizeRestoreTest.java new file mode 100644 index 00000000000..c7e1f5e36ce --- /dev/null +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ChangelogNormalizeRestoreTest.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.nodes.exec.stream; + +import org.apache.flink.table.planner.plan.nodes.exec.testutils.RestoreTestBase; +import org.apache.flink.table.test.program.TableTestProgram; + +import java.util.Arrays; +import java.util.List; + +/** Restore tests for {@link StreamExecChangelogNormalize}. */ +public class ChangelogNormalizeRestoreTest extends RestoreTestBase { + + public ChangelogNormalizeRestoreTest() { + super(StreamExecChangelogNormalize.class); + } + + @Override + public List<TableTestProgram> programs() { + return Arrays.asList( + ChangelogNormalizeTestPrograms.CHANGELOG_SOURCE, + ChangelogNormalizeTestPrograms.CHANGELOG_SOURCE_MINI_BATCH, + ChangelogNormalizeTestPrograms.UPSERT_SOURCE); + } +} diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ChangelogNormalizeTestPrograms.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ChangelogNormalizeTestPrograms.java new file mode 100644 index 00000000000..e4a3cd1825e --- /dev/null +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ChangelogNormalizeTestPrograms.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.nodes.exec.stream; + +import org.apache.flink.table.api.config.ExecutionConfigOptions; +import org.apache.flink.table.test.program.SinkTestStep; +import org.apache.flink.table.test.program.SourceTestStep; +import org.apache.flink.table.test.program.TableTestProgram; +import org.apache.flink.types.Row; +import org.apache.flink.types.RowKind; + +import java.time.Duration; + +/** {@link TableTestProgram} definitions for testing {@link StreamExecChangelogNormalize}. */ +public class ChangelogNormalizeTestPrograms { + + static final String[] SOURCE_SCHEMA = { + "a VARCHAR", "b INT NOT NULL", "c VARCHAR", "PRIMARY KEY(a) NOT ENFORCED" + }; + + static final String[] SINK_SCHEMA = {"a VARCHAR", "b INT", "c VARCHAR"}; + + static final Row[] BEFORE_DATA = { + Row.ofKind(RowKind.INSERT, "one", 1, "a"), + Row.ofKind(RowKind.INSERT, "two", 2, "b"), + Row.ofKind(RowKind.UPDATE_BEFORE, "one", 1, "a"), + Row.ofKind(RowKind.UPDATE_AFTER, "one", 1, "aa"), + Row.ofKind(RowKind.INSERT, "three", 3, "c"), + Row.ofKind(RowKind.DELETE, "two", 2, "b"), + Row.ofKind(RowKind.UPDATE_BEFORE, "three", 3, "c"), + Row.ofKind(RowKind.UPDATE_AFTER, "three", 3, "cc"), + }; + + static final Row[] AFTER_DATA = { + Row.ofKind(RowKind.INSERT, "four", 4, "d"), + Row.ofKind(RowKind.INSERT, "five", 5, "e"), + Row.ofKind(RowKind.UPDATE_BEFORE, "four", 4, "d"), + Row.ofKind(RowKind.UPDATE_AFTER, "four", 4, "dd"), + Row.ofKind(RowKind.INSERT, "six", 6, "f"), + Row.ofKind(RowKind.DELETE, "six", 6, "f") + }; + + static final String[] BEFORE_OUTPUT = { + "+I[one, 1, a]", + "+I[two, 2, b]", + "-U[one, 1, a]", + "+U[one, 1, aa]", + "+I[three, 3, c]", + "-D[two, 2, b]", + "-U[three, 3, c]", + "+U[three, 3, cc]" + }; + + static final String[] AFTER_OUTPUT = { + "+I[four, 4, d]", + "+I[five, 5, e]", + "-U[four, 4, d]", + "+U[four, 4, dd]", + "+I[six, 6, f]", + "-D[six, 6, f]" + }; + + static final TableTestProgram CHANGELOG_SOURCE = + TableTestProgram.of( + "changelog-normalize-source", "validates changelog normalize source") + .setupConfig( + ExecutionConfigOptions.TABLE_EXEC_SOURCE_CDC_EVENTS_DUPLICATE, true) + .setupTableSource( + SourceTestStep.newBuilder("source_t") + .addOption("changelog-mode", "I,UA,UB,D") + .addSchema(SOURCE_SCHEMA) + .producedBeforeRestore(BEFORE_DATA) + .producedAfterRestore(AFTER_DATA) + .build()) + .setupTableSink( + SinkTestStep.newBuilder("sink_t") + .addSchema(SINK_SCHEMA) + .consumedBeforeRestore(BEFORE_OUTPUT) + .consumedAfterRestore(AFTER_OUTPUT) + .build()) + .runSql("INSERT INTO sink_t SELECT a, b, c FROM source_t") + .build(); + + static final TableTestProgram CHANGELOG_SOURCE_MINI_BATCH = + TableTestProgram.of( + "changelog-normalize-source-mini-batch", + "validates changelog normalize source with mini batch") + .setupConfig( + ExecutionConfigOptions.TABLE_EXEC_SOURCE_CDC_EVENTS_DUPLICATE, true) + .setupConfig(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, true) + .setupConfig( + ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, + Duration.ofSeconds(10)) + .setupConfig(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, 2L) + .setupTableSource( + SourceTestStep.newBuilder("source_t") + .addOption("changelog-mode", "I,UA,UB,D") + .addSchema(SOURCE_SCHEMA) + .producedBeforeRestore(BEFORE_DATA) + .producedAfterRestore(AFTER_DATA) + .build()) + .setupTableSink( + SinkTestStep.newBuilder("sink_t") + .addSchema(SINK_SCHEMA) + .consumedBeforeRestore(BEFORE_OUTPUT) + .consumedAfterRestore(AFTER_OUTPUT) + .build()) + .runSql("INSERT INTO sink_t SELECT a, b, c FROM source_t") + .build(); + + static final TableTestProgram UPSERT_SOURCE = + TableTestProgram.of( + "changelog-normalize-upsert", "validates changelog normalize upsert") + .setupConfig( + ExecutionConfigOptions.TABLE_EXEC_SOURCE_CDC_EVENTS_DUPLICATE, true) + .setupTableSource( + SourceTestStep.newBuilder("source_t") + .addOption("changelog-mode", "I,UA,D") + .addSchema(SOURCE_SCHEMA) + .producedBeforeRestore( + Row.ofKind(RowKind.UPDATE_AFTER, "one", 1, "a"), + Row.ofKind(RowKind.UPDATE_AFTER, "two", 2, "b"), + Row.ofKind(RowKind.UPDATE_AFTER, "one", 1, "aa"), + Row.ofKind(RowKind.UPDATE_AFTER, "three", 3, "c"), + Row.ofKind(RowKind.DELETE, "two", 2, "b"), + Row.ofKind(RowKind.UPDATE_AFTER, "three", 3, "cc")) + .producedAfterRestore( + Row.ofKind(RowKind.UPDATE_AFTER, "four", 4, "d"), + Row.ofKind(RowKind.UPDATE_AFTER, "five", 5, "e"), + Row.ofKind(RowKind.UPDATE_AFTER, "six", 6, "f"), + Row.ofKind(RowKind.UPDATE_AFTER, "five", 5, "ee"), + Row.ofKind(RowKind.DELETE, "six", 6, "f"), + Row.ofKind(RowKind.UPDATE_AFTER, "four", 4, "dd")) + .build()) + .setupTableSink( + SinkTestStep.newBuilder("sink_t") + .addSchema(SINK_SCHEMA) + .consumedBeforeRestore(BEFORE_OUTPUT) + .consumedAfterRestore( + "+I[four, 4, d]", + "+I[five, 5, e]", + "+I[six, 6, f]", + "-U[five, 5, e]", + "+U[five, 5, ee]", + "-D[six, 6, f]", + "-U[four, 4, d]", + "+U[four, 4, dd]") + .build()) + .runSql("INSERT INTO sink_t SELECT a, b, c FROM source_t") + .build(); +} diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-changelog-normalize_1/changelog-normalize-source-mini-batch/plan/changelog-normalize-source-mini-batch.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-changelog-normalize_1/changelog-normalize-source-mini-batch/plan/changelog-normalize-source-mini-batch.json new file mode 100644 index 00000000000..864379b0884 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-changelog-normalize_1/changelog-normalize-source-mini-batch/plan/changelog-normalize-source-mini-batch.json @@ -0,0 +1,178 @@ +{ + "flinkVersion" : "1.19", + "nodes" : [ { + "id" : 6, + "type" : "stream-exec-table-source-scan_1", + "scanTableSource" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`source_t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "a", + "dataType" : "VARCHAR(2147483647) NOT NULL" + }, { + "name" : "b", + "dataType" : "INT NOT NULL" + }, { + "name" : "c", + "dataType" : "VARCHAR(2147483647)" + } ], + "watermarkSpecs" : [ ], + "primaryKey" : { + "name" : "PK_a", + "type" : "PRIMARY_KEY", + "columns" : [ "a" ] + } + }, + "partitionKeys" : [ ] + } + } + }, + "outputType" : "ROW<`a` VARCHAR(2147483647) NOT NULL, `b` INT NOT NULL, `c` VARCHAR(2147483647)>", + "description" : "TableSourceScan(table=[[default_catalog, default_database, source_t]], fields=[a, b, c])", + "inputProperties" : [ ] + }, { + "id" : 7, + "type" : "stream-exec-mini-batch-assigner_1", + "miniBatchInterval" : { + "interval" : 10000, + "mode" : "ProcTime" + }, + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`a` VARCHAR(2147483647) NOT NULL, `b` INT NOT NULL, `c` VARCHAR(2147483647)>", + "description" : "MiniBatchAssigner(interval=[10000ms], mode=[ProcTime])" + }, { + "id" : 8, + "type" : "stream-exec-drop-update-before_1", + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`a` VARCHAR(2147483647) NOT NULL, `b` INT NOT NULL, `c` VARCHAR(2147483647)>", + "description" : "DropUpdateBefore" + }, { + "id" : 9, + "type" : "stream-exec-exchange_1", + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "HASH", + "keys" : [ 0 ] + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`a` VARCHAR(2147483647) NOT NULL, `b` INT NOT NULL, `c` VARCHAR(2147483647)>", + "description" : "Exchange(distribution=[hash[a]])" + }, { + "id" : 10, + "type" : "stream-exec-changelog-normalize_1", + "configuration" : { + "table.exec.mini-batch.enabled" : "true", + "table.exec.mini-batch.size" : "2" + }, + "uniqueKeys" : [ 0 ], + "generateUpdateBefore" : true, + "state" : [ { + "index" : 0, + "ttl" : "0 ms", + "name" : "changelogNormalizeState" + } ], + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`a` VARCHAR(2147483647) NOT NULL, `b` INT NOT NULL, `c` VARCHAR(2147483647)>", + "description" : "ChangelogNormalize(key=[a])" + }, { + "id" : 11, + "type" : "stream-exec-sink_1", + "configuration" : { + "table.exec.sink.keyed-shuffle" : "AUTO", + "table.exec.sink.not-null-enforcer" : "ERROR", + "table.exec.sink.rowtime-inserter" : "ENABLED", + "table.exec.sink.type-length-enforcer" : "IGNORE", + "table.exec.sink.upsert-materialize" : "AUTO" + }, + "dynamicTableSink" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`sink_t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "a", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "b", + "dataType" : "INT" + }, { + "name" : "c", + "dataType" : "VARCHAR(2147483647)" + } ], + "watermarkSpecs" : [ ] + }, + "partitionKeys" : [ ] + } + } + }, + "inputChangelogMode" : [ "INSERT", "UPDATE_BEFORE", "UPDATE_AFTER", "DELETE" ], + "inputUpsertKey" : [ 0 ], + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`a` VARCHAR(2147483647) NOT NULL, `b` INT NOT NULL, `c` VARCHAR(2147483647)>", + "description" : "Sink(table=[default_catalog.default_database.sink_t], fields=[a, b, c])" + } ], + "edges" : [ { + "source" : 6, + "target" : 7, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 7, + "target" : 8, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 8, + "target" : 9, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 9, + "target" : 10, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 10, + "target" : 11, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + } ] +} \ No newline at end of file diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-changelog-normalize_1/changelog-normalize-source-mini-batch/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-changelog-normalize_1/changelog-normalize-source-mini-batch/savepoint/_metadata new file mode 100644 index 00000000000..6962eafb7aa Binary files /dev/null and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-changelog-normalize_1/changelog-normalize-source-mini-batch/savepoint/_metadata differ diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-changelog-normalize_1/changelog-normalize-source/plan/changelog-normalize-source.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-changelog-normalize_1/changelog-normalize-source/plan/changelog-normalize-source.json new file mode 100644 index 00000000000..a2ef3ec51b4 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-changelog-normalize_1/changelog-normalize-source/plan/changelog-normalize-source.json @@ -0,0 +1,155 @@ +{ + "flinkVersion" : "1.19", + "nodes" : [ { + "id" : 1, + "type" : "stream-exec-table-source-scan_1", + "scanTableSource" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`source_t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "a", + "dataType" : "VARCHAR(2147483647) NOT NULL" + }, { + "name" : "b", + "dataType" : "INT NOT NULL" + }, { + "name" : "c", + "dataType" : "VARCHAR(2147483647)" + } ], + "watermarkSpecs" : [ ], + "primaryKey" : { + "name" : "PK_a", + "type" : "PRIMARY_KEY", + "columns" : [ "a" ] + } + }, + "partitionKeys" : [ ] + } + } + }, + "outputType" : "ROW<`a` VARCHAR(2147483647) NOT NULL, `b` INT NOT NULL, `c` VARCHAR(2147483647)>", + "description" : "TableSourceScan(table=[[default_catalog, default_database, source_t]], fields=[a, b, c])", + "inputProperties" : [ ] + }, { + "id" : 2, + "type" : "stream-exec-drop-update-before_1", + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`a` VARCHAR(2147483647) NOT NULL, `b` INT NOT NULL, `c` VARCHAR(2147483647)>", + "description" : "DropUpdateBefore" + }, { + "id" : 3, + "type" : "stream-exec-exchange_1", + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "HASH", + "keys" : [ 0 ] + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`a` VARCHAR(2147483647) NOT NULL, `b` INT NOT NULL, `c` VARCHAR(2147483647)>", + "description" : "Exchange(distribution=[hash[a]])" + }, { + "id" : 4, + "type" : "stream-exec-changelog-normalize_1", + "configuration" : { + "table.exec.mini-batch.enabled" : "false", + "table.exec.mini-batch.size" : "-1" + }, + "uniqueKeys" : [ 0 ], + "generateUpdateBefore" : true, + "state" : [ { + "index" : 0, + "ttl" : "0 ms", + "name" : "changelogNormalizeState" + } ], + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`a` VARCHAR(2147483647) NOT NULL, `b` INT NOT NULL, `c` VARCHAR(2147483647)>", + "description" : "ChangelogNormalize(key=[a])" + }, { + "id" : 5, + "type" : "stream-exec-sink_1", + "configuration" : { + "table.exec.sink.keyed-shuffle" : "AUTO", + "table.exec.sink.not-null-enforcer" : "ERROR", + "table.exec.sink.rowtime-inserter" : "ENABLED", + "table.exec.sink.type-length-enforcer" : "IGNORE", + "table.exec.sink.upsert-materialize" : "AUTO" + }, + "dynamicTableSink" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`sink_t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "a", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "b", + "dataType" : "INT" + }, { + "name" : "c", + "dataType" : "VARCHAR(2147483647)" + } ], + "watermarkSpecs" : [ ] + }, + "partitionKeys" : [ ] + } + } + }, + "inputChangelogMode" : [ "INSERT", "UPDATE_BEFORE", "UPDATE_AFTER", "DELETE" ], + "inputUpsertKey" : [ 0 ], + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`a` VARCHAR(2147483647) NOT NULL, `b` INT NOT NULL, `c` VARCHAR(2147483647)>", + "description" : "Sink(table=[default_catalog.default_database.sink_t], fields=[a, b, c])" + } ], + "edges" : [ { + "source" : 1, + "target" : 2, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 2, + "target" : 3, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 3, + "target" : 4, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 4, + "target" : 5, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + } ] +} \ No newline at end of file diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-changelog-normalize_1/changelog-normalize-source/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-changelog-normalize_1/changelog-normalize-source/savepoint/_metadata new file mode 100644 index 00000000000..a8b4a5285ea Binary files /dev/null and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-changelog-normalize_1/changelog-normalize-source/savepoint/_metadata differ diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-changelog-normalize_1/changelog-normalize-upsert/plan/changelog-normalize-upsert.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-changelog-normalize_1/changelog-normalize-upsert/plan/changelog-normalize-upsert.json new file mode 100644 index 00000000000..af41a0b15cd --- /dev/null +++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-changelog-normalize_1/changelog-normalize-upsert/plan/changelog-normalize-upsert.json @@ -0,0 +1,136 @@ +{ + "flinkVersion" : "1.19", + "nodes" : [ { + "id" : 12, + "type" : "stream-exec-table-source-scan_1", + "scanTableSource" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`source_t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "a", + "dataType" : "VARCHAR(2147483647) NOT NULL" + }, { + "name" : "b", + "dataType" : "INT NOT NULL" + }, { + "name" : "c", + "dataType" : "VARCHAR(2147483647)" + } ], + "watermarkSpecs" : [ ], + "primaryKey" : { + "name" : "PK_a", + "type" : "PRIMARY_KEY", + "columns" : [ "a" ] + } + }, + "partitionKeys" : [ ] + } + } + }, + "outputType" : "ROW<`a` VARCHAR(2147483647) NOT NULL, `b` INT NOT NULL, `c` VARCHAR(2147483647)>", + "description" : "TableSourceScan(table=[[default_catalog, default_database, source_t]], fields=[a, b, c])", + "inputProperties" : [ ] + }, { + "id" : 13, + "type" : "stream-exec-exchange_1", + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "HASH", + "keys" : [ 0 ] + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`a` VARCHAR(2147483647) NOT NULL, `b` INT NOT NULL, `c` VARCHAR(2147483647)>", + "description" : "Exchange(distribution=[hash[a]])" + }, { + "id" : 14, + "type" : "stream-exec-changelog-normalize_1", + "configuration" : { + "table.exec.mini-batch.enabled" : "false", + "table.exec.mini-batch.size" : "-1" + }, + "uniqueKeys" : [ 0 ], + "generateUpdateBefore" : true, + "state" : [ { + "index" : 0, + "ttl" : "0 ms", + "name" : "changelogNormalizeState" + } ], + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`a` VARCHAR(2147483647) NOT NULL, `b` INT NOT NULL, `c` VARCHAR(2147483647)>", + "description" : "ChangelogNormalize(key=[a])" + }, { + "id" : 15, + "type" : "stream-exec-sink_1", + "configuration" : { + "table.exec.sink.keyed-shuffle" : "AUTO", + "table.exec.sink.not-null-enforcer" : "ERROR", + "table.exec.sink.rowtime-inserter" : "ENABLED", + "table.exec.sink.type-length-enforcer" : "IGNORE", + "table.exec.sink.upsert-materialize" : "AUTO" + }, + "dynamicTableSink" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`sink_t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "a", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "b", + "dataType" : "INT" + }, { + "name" : "c", + "dataType" : "VARCHAR(2147483647)" + } ], + "watermarkSpecs" : [ ] + }, + "partitionKeys" : [ ] + } + } + }, + "inputChangelogMode" : [ "INSERT", "UPDATE_BEFORE", "UPDATE_AFTER", "DELETE" ], + "inputUpsertKey" : [ 0 ], + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`a` VARCHAR(2147483647) NOT NULL, `b` INT NOT NULL, `c` VARCHAR(2147483647)>", + "description" : "Sink(table=[default_catalog.default_database.sink_t], fields=[a, b, c])" + } ], + "edges" : [ { + "source" : 12, + "target" : 13, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 13, + "target" : 14, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 14, + "target" : 15, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + } ] +} \ No newline at end of file diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-changelog-normalize_1/changelog-normalize-upsert/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-changelog-normalize_1/changelog-normalize-upsert/savepoint/_metadata new file mode 100644 index 00000000000..6f1c9e13ccf Binary files /dev/null and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-changelog-normalize_1/changelog-normalize-upsert/savepoint/_metadata differ