This is an automated email from the ASF dual-hosted git repository.

lincoln pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit f57ebac39b5b97db0bb41519803a6bde0dfb84db Author: lincoln lee <[email protected]> AuthorDate: Mon Aug 19 22:26:48 2024 +0800 [hotfix][table] Fix KeyedUpsertingSinkFunction's recovery logic and add restore test --- .../factories/TestValuesRuntimeFunctions.java | 16 ++- .../nodes/exec/common/TableSinkTestPrograms.java | 31 ++++- .../nodes/exec/stream/TableSinkRestoreTest.java | 3 +- .../sink-upsert/plan/sink-upsert.json | 141 +++++++++++++++++++++ .../sink-upsert/savepoint/_metadata | Bin 0 -> 10027 bytes 5 files changed, 188 insertions(+), 3 deletions(-) diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesRuntimeFunctions.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesRuntimeFunctions.java index cb12026b4a0..52dd722ee3b 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesRuntimeFunctions.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesRuntimeFunctions.java @@ -508,11 +508,25 @@ final class TestValuesRuntimeFunctions { super.initializeState(context); synchronized (LOCK) { - // always store in a single map, global upsert + // always store in a single map, global upsert similar to external database this.localUpsertResult = globalUpsertResult .computeIfAbsent(tableName, k -> new HashMap<>()) .computeIfAbsent(0, k -> new HashMap<>()); + // load all data from global raw result + globalRawResult.computeIfAbsent(tableName, k -> new HashMap<>()).values().stream() + .flatMap(List::stream) + .forEach( + row -> { + boolean isDelete = row.getKind() == RowKind.DELETE; + Row key = Row.project(row, keyIndices); + key.setKind(RowKind.INSERT); + if (isDelete) { + localUpsertResult.remove(key); + } else { + localUpsertResult.put(key, row); + } + }); } } diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/TableSinkTestPrograms.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/TableSinkTestPrograms.java index 3b8c043b4c8..7939a672415 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/TableSinkTestPrograms.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/TableSinkTestPrograms.java @@ -24,10 +24,11 @@ import org.apache.flink.table.test.program.SinkTestStep; import org.apache.flink.table.test.program.SourceTestStep; import org.apache.flink.table.test.program.TableTestProgram; import org.apache.flink.types.Row; +import org.apache.flink.types.RowKind; import java.util.Arrays; -/** {@link TableTestProgram} definitions for testing {@link StreamExecDeduplicate}. */ +/** {@link TableTestProgram} definitions for testing {@link StreamExecSink}. 
*/ public class TableSinkTestPrograms { public static final Row[] BEFORE_DATA = { @@ -116,6 +117,7 @@ public class TableSinkTestPrograms { .build()) .runSql("INSERT OVERWRITE sink_t SELECT * FROM source_t") .build(); + public static final TableTestProgram SINK_WRITING_METADATA = TableTestProgram.of("sink-writing-metadata", "validates writing metadata to sink") .setupTableSource( @@ -193,4 +195,31 @@ public class TableSinkTestPrograms { .build()) .runSql("INSERT INTO sink_t (a, b, c) SELECT a, b, c FROM source_t") .build(); + + public static final TableTestProgram SINK_UPSERT = + TableTestProgram.of("sink-upsert", "validates sink with primary key") + .setupTableSource( + SourceTestStep.newBuilder("source_t") + .addOption("changelog-mode", "I,UA,D") + .addSchema(SOURCE_SCHEMA) + .addSchema("PRIMARY KEY (a) NOT ENFORCED") + .producedBeforeRestore(BEFORE_DATA) + .producedAfterRestore( + new Row[] { + Row.ofKind(RowKind.DELETE, 1, 1L, "hi"), + Row.of(4, 4L, "foo") + }) + .build()) + .setupTableSink( + SinkTestStep.newBuilder("sink_t") + .addSchema("a INT", "b BIGINT", "c VARCHAR") + .addSchema("PRIMARY KEY (a) NOT ENFORCED") + .consumedBeforeRestore( + "+I[1, 1, hi]", + "+I[2, 2, hello]", + "+I[3, 2, hello world]") + .consumedAfterRestore("-D[1, 1, hi]", "+I[4, 4, foo]") + .build()) + .runSql("INSERT INTO sink_t SELECT * FROM source_t") + .build(); } diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSinkRestoreTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSinkRestoreTest.java index 7c52680a27a..1ea01803717 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSinkRestoreTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSinkRestoreTest.java @@ -43,6 +43,7 @@ public class TableSinkRestoreTest extends 
RestoreTestBase { TableSinkTestPrograms.SINK_OVERWRITE, TableSinkTestPrograms.SINK_WRITING_METADATA, TableSinkTestPrograms.SINK_NDF_PRIMARY_KEY, - TableSinkTestPrograms.SINK_PARTIAL_INSERT); + TableSinkTestPrograms.SINK_PARTIAL_INSERT, + TableSinkTestPrograms.SINK_UPSERT); } } diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_1/sink-upsert/plan/sink-upsert.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_1/sink-upsert/plan/sink-upsert.json new file mode 100644 index 00000000000..905142cee35 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_1/sink-upsert/plan/sink-upsert.json @@ -0,0 +1,141 @@ +{ + "flinkVersion" : "2.0", + "nodes" : [ { + "id" : 22, + "type" : "stream-exec-table-source-scan_1", + "scanTableSource" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`source_t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "a", + "dataType" : "INT NOT NULL" + }, { + "name" : "b", + "dataType" : "BIGINT" + }, { + "name" : "c", + "dataType" : "VARCHAR(2147483647)" + } ], + "watermarkSpecs" : [ ], + "primaryKey" : { + "name" : "PK_a", + "type" : "PRIMARY_KEY", + "columns" : [ "a" ] + } + }, + "partitionKeys" : [ ] + } + } + }, + "outputType" : "ROW<`a` INT NOT NULL, `b` BIGINT, `c` VARCHAR(2147483647)>", + "description" : "TableSourceScan(table=[[default_catalog, default_database, source_t]], fields=[a, b, c])", + "inputProperties" : [ ] + }, { + "id" : 23, + "type" : "stream-exec-exchange_1", + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "HASH", + "keys" : [ 0 ] + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`a` INT NOT NULL, `b` BIGINT, `c` VARCHAR(2147483647)>", + "description" : "Exchange(distribution=[hash[a]])" + }, { + "id" : 24, + "type" : "stream-exec-changelog-normalize_1", + "configuration" : { + "table.exec.mini-batch.enabled" 
: "false", + "table.exec.mini-batch.size" : "-1" + }, + "uniqueKeys" : [ 0 ], + "generateUpdateBefore" : false, + "state" : [ { + "index" : 0, + "ttl" : "0 ms", + "name" : "changelogNormalizeState" + } ], + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`a` INT NOT NULL, `b` BIGINT, `c` VARCHAR(2147483647)>", + "description" : "ChangelogNormalize(key=[a])" + }, { + "id" : 25, + "type" : "stream-exec-sink_1", + "configuration" : { + "table.exec.sink.keyed-shuffle" : "AUTO", + "table.exec.sink.not-null-enforcer" : "ERROR", + "table.exec.sink.rowtime-inserter" : "ENABLED", + "table.exec.sink.type-length-enforcer" : "IGNORE", + "table.exec.sink.upsert-materialize" : "AUTO" + }, + "dynamicTableSink" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`sink_t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "a", + "dataType" : "INT NOT NULL" + }, { + "name" : "b", + "dataType" : "BIGINT" + }, { + "name" : "c", + "dataType" : "VARCHAR(2147483647)" + } ], + "watermarkSpecs" : [ ], + "primaryKey" : { + "name" : "PK_a", + "type" : "PRIMARY_KEY", + "columns" : [ "a" ] + } + }, + "partitionKeys" : [ ] + } + } + }, + "inputChangelogMode" : [ "INSERT", "UPDATE_AFTER", "DELETE" ], + "inputUpsertKey" : [ 0 ], + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : "ROW<`a` INT NOT NULL, `b` BIGINT, `c` VARCHAR(2147483647)>", + "description" : "Sink(table=[default_catalog.default_database.sink_t], fields=[a, b, c])" + } ], + "edges" : [ { + "source" : 22, + "target" : 23, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 23, + "target" : 24, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 24, + "target" : 25, + "shuffle" : { + "type" : "FORWARD" + }, + 
"shuffleMode" : "PIPELINED" + } ] +} \ No newline at end of file diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_1/sink-upsert/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_1/sink-upsert/savepoint/_metadata new file mode 100644 index 00000000000..66cc69855df Binary files /dev/null and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_1/sink-upsert/savepoint/_metadata differ
