This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new ff99bec75 [core] Support drop delete in rewriteChangelogCompaction (#2875)
ff99bec75 is described below

commit ff99bec75d3687fdbe0b45efe8fe09eba10516e0
Author: Zouxxyy <[email protected]>
AuthorDate: Thu Feb 22 13:27:16 2024 +0800

    [core] Support drop delete in rewriteChangelogCompaction (#2875)
---
 .../apache/paimon/mergetree/DropDeleteReader.java  |  8 ++---
 .../compact/ChangelogMergeTreeRewriter.java        | 14 +++++---
 .../paimon/table/PrimaryKeyFileStoreTableTest.java | 39 ++++++++++++++++++++++
 3 files changed, 52 insertions(+), 9 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/DropDeleteReader.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/DropDeleteReader.java
index 7bb50d177..980a6a1c7 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/DropDeleteReader.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/DropDeleteReader.java
@@ -27,8 +27,8 @@ import javax.annotation.Nullable;
 import java.io.IOException;
 
 /**
- * A {@link RecordReader} which drops {@link KeyValue} of {@link RowKind#DELETE} kind from the
- * wrapped reader.
+ * A {@link RecordReader} which drops {@link KeyValue} that does not meet {@link RowKind#isAdd} from
+ * the wrapped reader.
  */
 public class DropDeleteReader implements RecordReader<KeyValue> {
 
@@ -55,9 +55,7 @@ public class DropDeleteReader implements RecordReader<KeyValue> {
                     if (kv == null) {
                         return null;
                     }
-
-                    if (kv.valueKind() == RowKind.INSERT
-                            || kv.valueKind() == RowKind.UPDATE_AFTER) {
+                    if (kv.isAdd()) {
                         return kv;
                     }
                 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java
index 7be5fc741..c57087eea 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java
@@ -93,7 +93,7 @@ public abstract class ChangelogMergeTreeRewriter extends MergeTreeCompactRewrite
     public CompactResult rewrite(
             int outputLevel, boolean dropDelete, List<List<SortedRun>> 
sections) throws Exception {
         if (rewriteChangelog(outputLevel, dropDelete, sections)) {
-            return rewriteChangelogCompaction(outputLevel, sections, true);
+            return rewriteChangelogCompaction(outputLevel, sections, 
dropDelete, true);
         } else {
             return rewriteCompaction(outputLevel, dropDelete, sections);
         }
@@ -102,10 +102,14 @@ public abstract class ChangelogMergeTreeRewriter extends MergeTreeCompactRewrite
     /**
      * Rewrite and produce changelog at the same time.
      *
+     * @param dropDelete whether to drop delete when rewrite compact file
      * @param rewriteCompactFile whether to rewrite compact file
      */
     private CompactResult rewriteChangelogCompaction(
-            int outputLevel, List<List<SortedRun>> sections, boolean 
rewriteCompactFile)
+            int outputLevel,
+            List<List<SortedRun>> sections,
+            boolean dropDelete,
+            boolean rewriteCompactFile)
             throws Exception {
         List<ConcatRecordReader.ReaderSupplier<ChangelogResult>> 
sectionReaders = new ArrayList<>();
         for (List<SortedRun> section : sections) {
@@ -132,8 +136,9 @@ public abstract class ChangelogMergeTreeRewriter extends MergeTreeCompactRewrite
 
             while (iterator.hasNext()) {
                 ChangelogResult result = iterator.next();
-                if (rewriteCompactFile && result.result() != null) {
-                    compactFileWriter.write(result.result());
+                KeyValue keyValue = result.result();
+                if (rewriteCompactFile && keyValue != null && (!dropDelete || 
keyValue.isAdd())) {
+                    compactFileWriter.write(keyValue);
                 }
                 for (KeyValue kv : result.changelogs()) {
                     changelogFileWriter.write(kv);
@@ -170,6 +175,7 @@ public abstract class ChangelogMergeTreeRewriter extends MergeTreeCompactRewrite
                     outputLevel,
                     Collections.singletonList(
                             
Collections.singletonList(SortedRun.fromSingle(file))),
+                    false,
                     strategy.rewrite);
         } else {
             return super.upgrade(outputLevel, file);
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
index 2412e4b7c..2bd3fe1a4 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
@@ -21,6 +21,7 @@ package org.apache.paimon.table;
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.CoreOptions.ChangelogProducer;
 import org.apache.paimon.KeyValue;
+import org.apache.paimon.Snapshot;
 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
@@ -80,6 +81,7 @@ import java.util.function.Function;
 import static org.apache.paimon.CoreOptions.BUCKET;
 import static org.apache.paimon.CoreOptions.CHANGELOG_PRODUCER;
 import static org.apache.paimon.CoreOptions.ChangelogProducer.LOOKUP;
+import static org.apache.paimon.Snapshot.CommitKind.COMPACT;
 import static org.apache.paimon.data.DataFormatTestUtil.internalRowToString;
 import static org.apache.paimon.io.DataFileTestUtils.row;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -1233,6 +1235,43 @@ public class PrimaryKeyFileStoreTableTest extends FileStoreTableTestBase {
         innerTestTableQuery(table);
     }
 
+    @Test
+    public void testLookupWithDropDelete() throws Exception {
+        FileStoreTable table =
+                createFileStoreTable(
+                        conf -> {
+                            conf.set(CHANGELOG_PRODUCER, LOOKUP);
+                            conf.set("num-levels", "2");
+                        });
+        IOManager ioManager = IOManager.create(tablePath.toString());
+        StreamTableWrite write = 
table.newWrite(commitUser).withIOManager(ioManager);
+        StreamTableCommit commit = table.newCommit(commitUser);
+        write.write(rowData(1, 1, 100L));
+        write.write(rowData(1, 2, 200L));
+        commit.commit(0, write.prepareCommit(true, 0));
+
+        // set num-levels = 2 to make sure that this delete can trigger 
compaction with drop delete
+        write.write(rowDataWithKind(RowKind.DELETE, 1, 1, 100L));
+        commit.commit(1, write.prepareCommit(true, 0));
+        write.close();
+        commit.close();
+
+        Snapshot latestSnapshot = 
table.newSnapshotReader().snapshotManager().latestSnapshot();
+        assertThat(latestSnapshot.commitKind()).isEqualTo(COMPACT);
+        assertThat(latestSnapshot.totalRecordCount()).isEqualTo(1);
+
+        assertThat(
+                        getResult(
+                                table.newRead(),
+                                
toSplits(table.newSnapshotReader().read().dataSplits()),
+                                binaryRow(1),
+                                0,
+                                BATCH_ROW_TO_STRING))
+                .isEqualTo(
+                        Collections.singletonList(
+                                
"1|2|200|binary|varbinary|mapKey:mapVal|multiset"));
+    }
+
     private void innerTestTableQuery(FileStoreTable table) throws Exception {
         IOManager ioManager = IOManager.create(tablePath.toString());
         StreamTableWrite write = 
table.newWrite(commitUser).withIOManager(ioManager);

Reply via email to