This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 1ab90c670c [flink] Flink batch delete supports partial-update.remove-record-on-sequence-group option (#4861)
1ab90c670c is described below
commit 1ab90c670c8eff9a70e41e0d79b79911da837ca9
Author: yuzelin <[email protected]>
AuthorDate: Wed Jan 8 16:22:00 2025 +0800
[flink] Flink batch delete supports partial-update.remove-record-on-sequence-group option (#4861)
---
.../main/java/org/apache/paimon/CoreOptions.java | 4 ---
.../SupportsRowLevelOperationFlinkTableSink.java | 37 ++++++++++++++------
.../apache/paimon/flink/PartialUpdateITCase.java | 40 +++++++++++++++++++++-
3 files changed, 66 insertions(+), 15 deletions(-)
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index 9a59bad356..93bc23a41f 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -2208,10 +2208,6 @@ public class CoreOptions implements Serializable {
return options.get(SEQUENCE_FIELD_SORT_ORDER) == SortOrder.ASCENDING;
}
- public boolean partialUpdateRemoveRecordOnDelete() {
- return options.get(PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE);
- }
-
public Optional<String> rowkindField() {
return options.getOptional(ROWKIND_FIELD);
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/SupportsRowLevelOperationFlinkTableSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/SupportsRowLevelOperationFlinkTableSink.java
index 4e4c2ff2c6..c0d19abff2 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/SupportsRowLevelOperationFlinkTableSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/SupportsRowLevelOperationFlinkTableSink.java
@@ -58,7 +58,10 @@ import java.util.stream.Collectors;
import static org.apache.paimon.CoreOptions.MERGE_ENGINE;
import static org.apache.paimon.CoreOptions.MergeEngine.DEDUPLICATE;
import static org.apache.paimon.CoreOptions.MergeEngine.PARTIAL_UPDATE;
+import static org.apache.paimon.CoreOptions.PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE;
+import static org.apache.paimon.CoreOptions.PARTIAL_UPDATE_REMOVE_RECORD_ON_SEQUENCE_GROUP;
import static org.apache.paimon.CoreOptions.createCommitUser;
+import static org.apache.paimon.mergetree.compact.PartialUpdateMergeFunction.SEQUENCE_GROUP;
import static org.apache.paimon.utils.Preconditions.checkArgument;
/** Flink table sink that supports row level update and delete. */
@@ -185,17 +188,31 @@ public abstract class SupportsRowLevelOperationFlinkTableSink extends FlinkTable
table.getClass().getName()));
}
- CoreOptions coreOptions = CoreOptions.fromMap(table.options());
- if (coreOptions.mergeEngine() == DEDUPLICATE
- || (coreOptions.mergeEngine() == PARTIAL_UPDATE
- && coreOptions.partialUpdateRemoveRecordOnDelete())) {
- return;
- }
+ Options options = Options.fromMap(table.options());
+ MergeEngine mergeEngine = options.get(MERGE_ENGINE);
- throw new UnsupportedOperationException(
- String.format(
- "Merge engine %s can not support batch delete.",
- coreOptions.mergeEngine()));
+ switch (mergeEngine) {
+ case DEDUPLICATE:
+ return;
+ case PARTIAL_UPDATE:
+ if (options.get(PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE)
+ || options.get(PARTIAL_UPDATE_REMOVE_RECORD_ON_SEQUENCE_GROUP) != null) {
+ return;
+ } else {
+ throw new UnsupportedOperationException(
+ String.format(
+ "Merge engine %s doesn't support batch delete by default. To support batch delete, "
+ + "please set %s to true when there is no %s or set %s.",
+ mergeEngine,
+ PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE.key(),
+ SEQUENCE_GROUP,
+ PARTIAL_UPDATE_REMOVE_RECORD_ON_SEQUENCE_GROUP));
+ }
+ default:
+ throw new UnsupportedOperationException(
+ String.format(
+ "Merge engine %s can not support batch delete.", mergeEngine));
+ }
}
private boolean canPushDownDeleteFilter() {
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java
index 76ee8309e8..be2d6b3433 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java
@@ -623,7 +623,7 @@ public class PartialUpdateITCase extends CatalogITCaseBase {
}
@Test
- public void testRemoveRecordOnDelete() {
+ public void testRemoveRecordOnDeleteWithoutSequenceGroup() {
sql(
"CREATE TABLE remove_record_on_delete (pk INT PRIMARY KEY NOT ENFORCED, a STRING, b STRING) WITH ("
+ " 'merge-engine' = 'partial-update',"
@@ -647,6 +647,44 @@ public class PartialUpdateITCase extends CatalogITCaseBase {
.containsExactlyInAnyOrder(Row.of(1, "A", "apache"));
}
+ @Test
+ public void testRemoveRecordOnDeleteWithSequenceGroup() throws Exception {
+ sql(
+ "CREATE TABLE remove_record_on_delete_sequence_group"
+ + " (pk INT PRIMARY KEY NOT ENFORCED, a STRING, seq_a INT, b STRING, seq_b INT) WITH ("
+ + " 'merge-engine' = 'partial-update',"
+ + " 'fields.seq_a.sequence-group' = 'a',"
+ + " 'fields.seq_b.sequence-group' = 'b',"
+ + " 'partial-update.remove-record-on-sequence-group' = 'seq_a'"
+ + ")");
+
+ sql("INSERT INTO remove_record_on_delete_sequence_group VALUES (1, 'apple', 2, 'a', 1)");
+ sql("INSERT INTO remove_record_on_delete_sequence_group VALUES (1, 'banana', 1, 'b', 2)");
+ assertThat(sql("SELECT * FROM remove_record_on_delete_sequence_group"))
+ .containsExactlyInAnyOrder(Row.of(1, "apple", 2, "b", 2));
+
+ // delete with seq_b won't delete record but retract b
+ String id =
+ TestValuesTableFactory.registerData(
+ Collections.singletonList(
+ Row.ofKind(RowKind.DELETE, 1, null, null, "b", 2)));
+ sEnv.executeSql(
+ String.format(
+ "CREATE TEMPORARY TABLE delete_source1 (pk INT, a STRING, seq_a INT, b STRING, seq_b INT) "
+ + "WITH ('connector'='values', 'bounded'='true', 'data-id'='%s', "
+ + "'changelog-mode' = 'I,D,UA,UB')",
+ id));
+ sEnv.executeSql(
+ "INSERT INTO remove_record_on_delete_sequence_group SELECT * FROM delete_source1")
+ .await();
+ assertThat(sql("SELECT * FROM remove_record_on_delete_sequence_group"))
+ .containsExactlyInAnyOrder(Row.of(1, "apple", 2, null, 2));
+
+ // delete record
+ sql("DELETE FROM remove_record_on_delete_sequence_group WHERE pk = 1");
+ assertThat(sql("SELECT * FROM remove_record_on_delete_sequence_group")).isEmpty();
+ }
+
@Test
public void testRemoveRecordOnDeleteLookup() throws Exception {
sql(