This is an automated email from the ASF dual-hosted git repository.

lincoln pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f57ebac39b5b97db0bb41519803a6bde0dfb84db
Author: lincoln lee <[email protected]>
AuthorDate: Mon Aug 19 22:26:48 2024 +0800

    [hotfix][table] Fix KeyedUpsertingSinkFunction's recovery logic and add restore test
---
 .../factories/TestValuesRuntimeFunctions.java      |  16 ++-
 .../nodes/exec/common/TableSinkTestPrograms.java   |  31 ++++-
 .../nodes/exec/stream/TableSinkRestoreTest.java    |   3 +-
 .../sink-upsert/plan/sink-upsert.json              | 141 +++++++++++++++++++++
 .../sink-upsert/savepoint/_metadata                | Bin 0 -> 10027 bytes
 5 files changed, 188 insertions(+), 3 deletions(-)

diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesRuntimeFunctions.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesRuntimeFunctions.java
index cb12026b4a0..52dd722ee3b 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesRuntimeFunctions.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesRuntimeFunctions.java
@@ -508,11 +508,25 @@ final class TestValuesRuntimeFunctions {
             super.initializeState(context);
 
             synchronized (LOCK) {
-                // always store in a single map, global upsert
+                // always store in a single map, global upsert similar to external database
                 this.localUpsertResult =
                         globalUpsertResult
                                 .computeIfAbsent(tableName, k -> new HashMap<>())
                                 .computeIfAbsent(0, k -> new HashMap<>());
+                // load all data from global raw result
+                globalRawResult.computeIfAbsent(tableName, k -> new HashMap<>()).values().stream()
+                        .flatMap(List::stream)
+                        .forEach(
+                                row -> {
+                                    boolean isDelete = row.getKind() == RowKind.DELETE;
+                                    Row key = Row.project(row, keyIndices);
+                                    key.setKind(RowKind.INSERT);
+                                    if (isDelete) {
+                                        localUpsertResult.remove(key);
+                                    } else {
+                                        localUpsertResult.put(key, row);
+                                    }
+                                });
             }
         }
 
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/TableSinkTestPrograms.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/TableSinkTestPrograms.java
index 3b8c043b4c8..7939a672415 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/TableSinkTestPrograms.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/TableSinkTestPrograms.java
@@ -24,10 +24,11 @@ import org.apache.flink.table.test.program.SinkTestStep;
 import org.apache.flink.table.test.program.SourceTestStep;
 import org.apache.flink.table.test.program.TableTestProgram;
 import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
 
 import java.util.Arrays;
 
-/** {@link TableTestProgram} definitions for testing {@link StreamExecDeduplicate}. */
+/** {@link TableTestProgram} definitions for testing {@link StreamExecSink}. */
 public class TableSinkTestPrograms {
 
     public static final Row[] BEFORE_DATA = {
@@ -116,6 +117,7 @@ public class TableSinkTestPrograms {
                                     .build())
                     .runSql("INSERT OVERWRITE sink_t SELECT * FROM source_t")
                     .build();
+
     public static final TableTestProgram SINK_WRITING_METADATA =
             TableTestProgram.of("sink-writing-metadata", "validates writing metadata to sink")
                     .setupTableSource(
@@ -193,4 +195,31 @@ public class TableSinkTestPrograms {
                                     .build())
                     .runSql("INSERT INTO sink_t (a, b, c) SELECT a, b, c FROM source_t")
                     .build();
+
+    public static final TableTestProgram SINK_UPSERT =
+            TableTestProgram.of("sink-upsert", "validates sink with primary key")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("source_t")
+                                    .addOption("changelog-mode", "I,UA,D")
+                                    .addSchema(SOURCE_SCHEMA)
+                                    .addSchema("PRIMARY KEY (a) NOT ENFORCED")
+                                    .producedBeforeRestore(BEFORE_DATA)
+                                    .producedAfterRestore(
+                                            new Row[] {
+                                                Row.ofKind(RowKind.DELETE, 1, 1L, "hi"),
+                                                Row.of(4, 4L, "foo")
+                                            })
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink_t")
+                                    .addSchema("a INT", "b BIGINT", "c VARCHAR")
+                                    .addSchema("PRIMARY KEY (a) NOT ENFORCED")
+                                    .consumedBeforeRestore(
+                                            "+I[1, 1, hi]",
+                                            "+I[2, 2, hello]",
+                                            "+I[3, 2, hello world]")
+                                    .consumedAfterRestore("-D[1, 1, hi]", "+I[4, 4, foo]")
+                                    .build())
+                    .runSql("INSERT INTO sink_t SELECT * FROM source_t")
+                    .build();
 }
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSinkRestoreTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSinkRestoreTest.java
index 7c52680a27a..1ea01803717 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSinkRestoreTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSinkRestoreTest.java
@@ -43,6 +43,7 @@ public class TableSinkRestoreTest extends RestoreTestBase {
                 TableSinkTestPrograms.SINK_OVERWRITE,
                 TableSinkTestPrograms.SINK_WRITING_METADATA,
                 TableSinkTestPrograms.SINK_NDF_PRIMARY_KEY,
-                TableSinkTestPrograms.SINK_PARTIAL_INSERT);
+                TableSinkTestPrograms.SINK_PARTIAL_INSERT,
+                TableSinkTestPrograms.SINK_UPSERT);
     }
 }
diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_1/sink-upsert/plan/sink-upsert.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_1/sink-upsert/plan/sink-upsert.json
new file mode 100644
index 00000000000..905142cee35
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_1/sink-upsert/plan/sink-upsert.json
@@ -0,0 +1,141 @@
+{
+  "flinkVersion" : "2.0",
+  "nodes" : [ {
+    "id" : 22,
+    "type" : "stream-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`source_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "INT NOT NULL"
+            }, {
+              "name" : "b",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "c",
+              "dataType" : "VARCHAR(2147483647)"
+            } ],
+            "watermarkSpecs" : [ ],
+            "primaryKey" : {
+              "name" : "PK_a",
+              "type" : "PRIMARY_KEY",
+              "columns" : [ "a" ]
+            }
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "outputType" : "ROW<`a` INT NOT NULL, `b` BIGINT, `c` VARCHAR(2147483647)>",
+    "description" : "TableSourceScan(table=[[default_catalog, default_database, source_t]], fields=[a, b, c])",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 23,
+    "type" : "stream-exec-exchange_1",
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "HASH",
+        "keys" : [ 0 ]
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT NOT NULL, `b` BIGINT, `c` VARCHAR(2147483647)>",
+    "description" : "Exchange(distribution=[hash[a]])"
+  }, {
+    "id" : 24,
+    "type" : "stream-exec-changelog-normalize_1",
+    "configuration" : {
+      "table.exec.mini-batch.enabled" : "false",
+      "table.exec.mini-batch.size" : "-1"
+    },
+    "uniqueKeys" : [ 0 ],
+    "generateUpdateBefore" : false,
+    "state" : [ {
+      "index" : 0,
+      "ttl" : "0 ms",
+      "name" : "changelogNormalizeState"
+    } ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT NOT NULL, `b` BIGINT, `c` VARCHAR(2147483647)>",
+    "description" : "ChangelogNormalize(key=[a])"
+  }, {
+    "id" : 25,
+    "type" : "stream-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.keyed-shuffle" : "AUTO",
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.rowtime-inserter" : "ENABLED",
+      "table.exec.sink.type-length-enforcer" : "IGNORE",
+      "table.exec.sink.upsert-materialize" : "AUTO"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`sink_t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "a",
+              "dataType" : "INT NOT NULL"
+            }, {
+              "name" : "b",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "c",
+              "dataType" : "VARCHAR(2147483647)"
+            } ],
+            "watermarkSpecs" : [ ],
+            "primaryKey" : {
+              "name" : "PK_a",
+              "type" : "PRIMARY_KEY",
+              "columns" : [ "a" ]
+            }
+          },
+          "partitionKeys" : [ ]
+        }
+      }
+    },
+    "inputChangelogMode" : [ "INSERT", "UPDATE_AFTER", "DELETE" ],
+    "inputUpsertKey" : [ 0 ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`a` INT NOT NULL, `b` BIGINT, `c` VARCHAR(2147483647)>",
+    "description" : "Sink(table=[default_catalog.default_database.sink_t], fields=[a, b, c])"
+  } ],
+  "edges" : [ {
+    "source" : 22,
+    "target" : 23,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 23,
+    "target" : 24,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 24,
+    "target" : 25,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file
diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_1/sink-upsert/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_1/sink-upsert/savepoint/_metadata
new file mode 100644
index 00000000000..66cc69855df
Binary files /dev/null and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-sink_1/sink-upsert/savepoint/_metadata differ

Reply via email to