This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 91a89b4da9 [core] Refactor LookupChangelogMergeFunctionWrapper to 
better readability (#5645)
91a89b4da9 is described below

commit 91a89b4da99bc58f980f158c33c1f3c3b5e16e7b
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed May 21 22:44:53 2025 +0800

    [core] Refactor LookupChangelogMergeFunctionWrapper to better readability 
(#5645)
---
 .../LookupChangelogMergeFunctionWrapper.java       | 62 +++++++++++-----------
 .../mergetree/compact/LookupMergeFunction.java     | 49 +++++++++++------
 .../mergetree/compact/LookupMergeFunctionTest.java | 56 +++++++++++++++++++
 3 files changed, 118 insertions(+), 49 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapper.java
 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapper.java
index bbaa354401..ba427f7e92 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapper.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapper.java
@@ -30,8 +30,8 @@ import org.apache.paimon.utils.UserDefinedSeqComparator;
 
 import javax.annotation.Nullable;
 
+import java.util.ArrayList;
 import java.util.Comparator;
-import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.function.Function;
@@ -56,7 +56,7 @@ import static 
org.apache.paimon.utils.Preconditions.checkArgument;
 public class LookupChangelogMergeFunctionWrapper<T>
         implements MergeFunctionWrapper<ChangelogResult> {
 
-    private final MergeFunction<KeyValue> mergeFunction;
+    private final LookupMergeFunction mergeFunction;
     private final Function<InternalRow, T> lookup;
 
     private final ChangelogResult reusedResult = new ChangelogResult();
@@ -67,8 +67,6 @@ public class LookupChangelogMergeFunctionWrapper<T>
     private final @Nullable DeletionVectorsMaintainer 
deletionVectorsMaintainer;
     private final Comparator<KeyValue> comparator;
 
-    private final LinkedList<KeyValue> candidates = new LinkedList<>();
-
     public LookupChangelogMergeFunctionWrapper(
             MergeFunctionFactory<KeyValue> mergeFunctionFactory,
             Function<InternalRow, T> lookup,
@@ -86,7 +84,7 @@ public class LookupChangelogMergeFunctionWrapper<T>
                     deletionVectorsMaintainer != null,
                     "deletionVectorsMaintainer should not be null, there is a 
bug.");
         }
-        this.mergeFunction = mergeFunctionFactory.create();
+        this.mergeFunction = (LookupMergeFunction) mergeFunction;
         this.lookup = lookup;
         this.valueEqualiser = valueEqualiser;
         this.lookupStrategy = lookupStrategy;
@@ -96,36 +94,23 @@ public class LookupChangelogMergeFunctionWrapper<T>
 
     @Override
     public void reset() {
-        candidates.clear();
+        mergeFunction.reset();
     }
 
     @Override
     public void add(KeyValue kv) {
-        candidates.add(kv);
+        mergeFunction.add(kv);
     }
 
     @Override
     public ChangelogResult getResult() {
-        // 1. Compute the latest high level record and containLevel0 of 
candidates
-        Iterator<KeyValue> descending = candidates.descendingIterator();
-        KeyValue highLevel = null;
-        boolean containLevel0 = false;
-        while (descending.hasNext()) {
-            KeyValue kv = descending.next();
-            if (kv.level() > 0) {
-                descending.remove();
-                if (highLevel == null || kv.level() < highLevel.level()) {
-                    highLevel = kv;
-                }
-            } else {
-                containLevel0 = true;
-            }
-        }
+        // 1. Find the latest high level record and compute containLevel0
+        KeyValue highLevel = mergeFunction.pickHighLevel();
+        boolean containLevel0 = containLevel0();
 
         // 2. Lookup if latest high level record is absent
         if (highLevel == null) {
-            InternalRow lookupKey = candidates.get(0).key();
-            T lookupResult = lookup.apply(lookupKey);
+            T lookupResult = lookup.apply(mergeFunction.key());
             if (lookupResult != null) {
                 if (lookupStrategy.deletionVector) {
                     PositionedKeyValue positionedKeyValue = 
(PositionedKeyValue) lookupResult;
@@ -136,10 +121,13 @@ public class LookupChangelogMergeFunctionWrapper<T>
                     highLevel = (KeyValue) lookupResult;
                 }
             }
+            if (highLevel != null) {
+                insertInto(mergeFunction.candidates(), highLevel);
+            }
         }
 
         // 3. Calculate result
-        KeyValue result = calculateResult(candidates, highLevel);
+        KeyValue result = mergeFunction.getResult();
 
         // 4. Set changelog when there's level-0 records
         reusedResult.reset();
@@ -150,21 +138,31 @@ public class LookupChangelogMergeFunctionWrapper<T>
         return reusedResult.setResult(result);
     }
 
-    private KeyValue calculateResult(List<KeyValue> candidates, @Nullable 
KeyValue highLevel) {
-        mergeFunction.reset();
+    public boolean containLevel0() {
+        for (KeyValue kv : mergeFunction.candidates()) {
+            if (kv.level() == 0) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    private void insertInto(LinkedList<KeyValue> candidates, KeyValue 
highLevel) {
+        List<KeyValue> newCandidates = new ArrayList<>();
         for (KeyValue candidate : candidates) {
             if (highLevel != null && comparator.compare(highLevel, candidate) 
< 0) {
-                mergeFunction.add(highLevel);
-                mergeFunction.add(candidate);
+                newCandidates.add(highLevel);
+                newCandidates.add(candidate);
                 highLevel = null;
             } else {
-                mergeFunction.add(candidate);
+                newCandidates.add(candidate);
             }
         }
         if (highLevel != null) {
-            mergeFunction.add(highLevel);
+            newCandidates.add(highLevel);
         }
-        return mergeFunction.getResult();
+        candidates.clear();
+        candidates.addAll(newCandidates);
     }
 
     private void setChangelog(@Nullable KeyValue before, KeyValue after) {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeFunction.java
 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeFunction.java
index 2aadbff49a..1537677ffc 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeFunction.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeFunction.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.mergetree.compact;
 
 import org.apache.paimon.KeyValue;
+import org.apache.paimon.data.InternalRow;
 
 import javax.annotation.Nullable;
 
@@ -48,29 +49,43 @@ public class LookupMergeFunction implements 
MergeFunction<KeyValue> {
         candidates.add(kv);
     }
 
-    @Override
-    public KeyValue getResult() {
-        // 1. Find the latest high level record
-        KeyValue keptHighLevel = null;
-        LinkedList<KeyValue> highLevels = new LinkedList<>();
-
+    @Nullable
+    public KeyValue pickHighLevel() {
+        KeyValue highLevel = null;
         for (KeyValue kv : candidates) {
-            if (kv.level() > 0) {
-                highLevels.add(kv);
-                if (keptHighLevel == null || kv.level() < 
keptHighLevel.level()) {
-                    keptHighLevel = kv;
-                }
+            // Only records at level > 0 are high-level. Records not yet stored on disk,
+            // such as data in the write buffer, are at level -1 and are skipped too.
+            if (kv.level() <= 0) {
+                continue;
+            }
+            // Among high-level records (level > 0), only the one with the lowest level
+            // is selected.
+            if (highLevel == null || kv.level() < highLevel.level()) {
+                highLevel = kv;
             }
         }
+        return highLevel;
+    }
 
-        if (highLevels.size() > 1) {
-            highLevels.remove(keptHighLevel);
-            candidates.removeAll(highLevels);
-        }
+    public InternalRow key() {
+        return candidates.get(0).key();
+    }
 
-        // 2. Do the merge for inputs
+    public LinkedList<KeyValue> candidates() {
+        return candidates;
+    }
+
+    @Override
+    public KeyValue getResult() {
         mergeFunction.reset();
-        candidates.forEach(mergeFunction::add);
+        KeyValue highLevel = pickHighLevel();
+        for (KeyValue kv : candidates) {
+            // Merge every record at level <= 0 (records not yet stored on disk, such as
+            // data in the write buffer, are at level -1) plus the picked high-level one.
+            if (kv.level() <= 0 || kv == highLevel) {
+                mergeFunction.add(kv);
+            }
+        }
         return mergeFunction.getResult();
     }
 
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupMergeFunctionTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupMergeFunctionTest.java
new file mode 100644
index 0000000000..03b201ca2f
--- /dev/null
+++ 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupMergeFunctionTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.mergetree.compact;
+
+import org.apache.paimon.KeyValue;
+
+import org.junit.jupiter.api.Test;
+
+import static org.apache.paimon.io.DataFileTestUtils.row;
+import static org.apache.paimon.types.RowKind.INSERT;
+import static org.assertj.core.api.Assertions.assertThat;
+
+class LookupMergeFunctionTest {
+
+    @Test
+    public void testKeepLowestHighLevel() {
+        LookupMergeFunction function =
+                (LookupMergeFunction)
+                        
LookupMergeFunction.wrap(DeduplicateMergeFunction.factory()).create();
+        function.reset();
+        function.add(new KeyValue().replace(row(1), 1, INSERT, 
row(2)).setLevel(1));
+        function.add(new KeyValue().replace(row(1), 1, INSERT, 
row(1)).setLevel(2));
+        KeyValue kv = function.getResult();
+        assertThat(kv).isNotNull();
+        assertThat(kv.value().getInt(0)).isEqualTo(2);
+    }
+
+    @Test
+    public void testLevelNegative() {
+        LookupMergeFunction function =
+                (LookupMergeFunction)
+                        
LookupMergeFunction.wrap(DeduplicateMergeFunction.factory()).create();
+        function.reset();
+        function.add(new KeyValue().replace(row(1), 1, INSERT, 
row(2)).setLevel(-1));
+        function.add(new KeyValue().replace(row(1), 1, INSERT, 
row(1)).setLevel(-1));
+        KeyValue kv = function.getResult();
+        assertThat(kv).isNotNull();
+        assertThat(kv.value().getInt(0)).isEqualTo(1);
+    }
+}

Reply via email to