This is an automated email from the ASF dual-hosted git repository.

junhao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 6e0abf79f8 [core] Introduce external buffer in lookup merging (#6219)
6e0abf79f8 is described below

commit 6e0abf79f834aee3f624d142dd5fb25a1694d533
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Sep 9 14:51:48 2025 +0800

    [core] Introduce external buffer in lookup merging (#6219)
---
 .../shortcodes/generated/core_configuration.html   |  12 +
 .../main/java/org/apache/paimon/CoreOptions.java   |  20 ++
 .../java/org/apache/paimon/options/MemorySize.java |   4 +
 .../apache/paimon/memory/UnlimitedSegmentPool.java |  52 +++++
 .../paimon/crosspartition/GlobalIndexAssigner.java |   1 -
 .../org/apache/paimon/disk/ExternalBuffer.java     |  16 +-
 .../org/apache/paimon/disk/InMemoryBuffer.java     |   6 +-
 .../java/org/apache/paimon/disk/RowBuffer.java     |   2 -
 .../paimon/mergetree/compact/KeyValueBuffer.java   | 243 +++++++++++++++++++++
 .../LookupChangelogMergeFunctionWrapper.java       |  34 +--
 .../mergetree/compact/LookupMergeFunction.java     | 120 +++++++---
 .../paimon/operation/KeyValueFileStoreWrite.java   |  13 +-
 .../paimon/operation/MergeFileSplitRead.java       |   4 +
 .../postpone/PostponeBucketFileStoreWrite.java     |  11 +
 .../paimon/table/PrimaryKeyFileStoreTable.java     |   6 +-
 .../java/org/apache/paimon/utils/SinkWriter.java   |   1 -
 .../org/apache/paimon/disk/ExternalBufferTest.java |   1 -
 .../org/apache/paimon/disk/InMemoryBufferTest.java |   1 -
 .../mergetree/SortBufferWriteBufferTestBase.java   |   2 +-
 .../compact/KeyValueBufferInsertIntoTest.java      | 233 ++++++++++++++++++++
 .../mergetree/compact/KeyValueBufferTest.java      | 156 +++++++++++++
 .../LookupChangelogMergeFunctionWrapperTest.java   |  19 +-
 .../mergetree/compact/LookupMergeFunctionTest.java |   8 +-
 .../compact/LookupMergeFunctionUnitTest.java       | 183 ++++++++++++++++
 .../mergetree/compact/SortMergeReaderTestBase.java |   3 +-
 .../apache/paimon/flink/DeletionVectorITCase.java  |  14 ++
 26 files changed, 1067 insertions(+), 98 deletions(-)
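
For quick orientation, the two options added by this commit are ordinary core options; a rough sketch of setting and reading them through the Options/CoreOptions API used elsewhere in this diff (the values and variable names here are illustrative, not part of the commit):

    Options conf = new Options();
    // Defaults per the docs hunk below: 8 mb buffer, 1024-record threshold.
    conf.set(CoreOptions.LOOKUP_MERGE_BUFFER_SIZE, MemorySize.ofMebiBytes(16));
    conf.set(CoreOptions.LOOKUP_MERGE_RECORDS_THRESHOLD, 2048);
    CoreOptions options = new CoreOptions(conf);
    long bufferBytes = options.lookupMergeBufferSize();          // buffer size in bytes
    int recordsThreshold = options.lookupMergeRecordsThreshold(); // 2048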

diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index 80f6d51046..81a4c9990f 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -633,6 +633,18 @@ Mainly to resolve data skew on primary keys. We recommend starting with 64 mb wh
             <td><p>Enum</p></td>
             <td>The local file type for lookup.<br /><br />Possible values:<ul><li>"sort": Construct a sorted file for lookup.</li><li>"hash": Construct a hash file for lookup.</li></ul></td>
         </tr>
+        <tr>
+            <td><h5>lookup.merge-buffer-size</h5></td>
+            <td style="word-wrap: break-word;">8 mb</td>
+            <td>MemorySize</td>
+            <td>Buffer memory size for one key merging in lookup.</td>
+        </tr>
+        <tr>
+            <td><h5>lookup.merge-records-threshold</h5></td>
+            <td style="word-wrap: break-word;">1024</td>
+            <td>Integer</td>
+            <td>Threshold for merging records to binary buffer in lookup.</td>
+        </tr>
         <tr>
             <td><h5>manifest.compression</h5></td>
             <td style="word-wrap: break-word;">"zstd"</td>
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index 5d1b168100..914c250151 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1950,6 +1950,18 @@ public class CoreOptions implements Serializable {
                     .defaultValue(false)
                     .withDescription("Whether index file in data file 
directory.");
 
+    public static final ConfigOption<MemorySize> LOOKUP_MERGE_BUFFER_SIZE =
+            key("lookup.merge-buffer-size")
+                    .memoryType()
+                    .defaultValue(MemorySize.VALUE_8_MB)
+                    .withDescription("Buffer memory size for one key merging in lookup.");
+
+    public static final ConfigOption<Integer> LOOKUP_MERGE_RECORDS_THRESHOLD =
+            key("lookup.merge-records-threshold")
+                    .intType()
+                    .defaultValue(1024)
+                    .withDescription("Threshold for merging records to binary buffer in lookup.");
+
     private final Options options;
 
     public CoreOptions(Map<String, String> options) {
@@ -2989,6 +3001,14 @@ public class CoreOptions implements Serializable {
         }
     }
 
+    public long lookupMergeBufferSize() {
+        return options.get(LOOKUP_MERGE_BUFFER_SIZE).getBytes();
+    }
+
+    public int lookupMergeRecordsThreshold() {
+        return options.get(LOOKUP_MERGE_RECORDS_THRESHOLD);
+    }
+
     /** Specifies the merge engine for table with primary key. */
     public enum MergeEngine implements DescribedEnum {
         DEDUPLICATE("deduplicate", "De-duplicate and keep the last row."),
diff --git a/paimon-api/src/main/java/org/apache/paimon/options/MemorySize.java b/paimon-api/src/main/java/org/apache/paimon/options/MemorySize.java
index 9b5d7d7ae4..3f041776a8 100644
--- a/paimon-api/src/main/java/org/apache/paimon/options/MemorySize.java
+++ b/paimon-api/src/main/java/org/apache/paimon/options/MemorySize.java
@@ -56,6 +56,10 @@ public class MemorySize implements java.io.Serializable, Comparable<MemorySize>
 
     public static final MemorySize MAX_VALUE = new MemorySize(Long.MAX_VALUE);
 
+    public static final MemorySize VALUE_32_KB = MemorySize.ofKibiBytes(32);
+
+    public static final MemorySize VALUE_8_MB = MemorySize.ofMebiBytes(8);
+
     public static final MemorySize VALUE_128_MB = MemorySize.ofMebiBytes(128);
 
     public static final MemorySize VALUE_256_MB = MemorySize.ofMebiBytes(256);
diff --git a/paimon-common/src/main/java/org/apache/paimon/memory/UnlimitedSegmentPool.java b/paimon-common/src/main/java/org/apache/paimon/memory/UnlimitedSegmentPool.java
new file mode 100644
index 0000000000..fb99cd987c
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/memory/UnlimitedSegmentPool.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.memory;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+
+/** MemorySegment pool from heap with no limit. */
+public class UnlimitedSegmentPool implements MemorySegmentPool {
+
+    private final int pageSize;
+
+    public UnlimitedSegmentPool(int pageSize) {
+        this.pageSize = pageSize;
+    }
+
+    @Override
+    public int pageSize() {
+        return pageSize;
+    }
+
+    @Override
+    public void returnAll(List<MemorySegment> memory) {}
+
+    @Override
+    public int freePages() {
+        return Integer.MAX_VALUE;
+    }
+
+    @Nullable
+    @Override
+    public MemorySegment nextSegment() {
+        return MemorySegment.allocateHeapMemory(pageSize);
+    }
+}
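
A rough usage sketch for the pool above (the page size is illustrative, matching the 32 KB constant added to MemorySize): it hands out a fresh heap segment on every call and never reclaims, so callers must bound their own usage.

    UnlimitedSegmentPool pool = new UnlimitedSegmentPool(32 * 1024);
    MemorySegment segment = pool.nextSegment();                    // always a newly allocated heap segment
    pool.returnAll(java.util.Collections.singletonList(segment)); // no-op: segments are never pooled
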
diff --git a/paimon-core/src/main/java/org/apache/paimon/crosspartition/GlobalIndexAssigner.java b/paimon-core/src/main/java/org/apache/paimon/crosspartition/GlobalIndexAssigner.java
index fe14048310..cfda37ea83 100644
--- a/paimon-core/src/main/java/org/apache/paimon/crosspartition/GlobalIndexAssigner.java
+++ b/paimon-core/src/main/java/org/apache/paimon/crosspartition/GlobalIndexAssigner.java
@@ -207,7 +207,6 @@ public class GlobalIndexAssigner implements Serializable, Closeable {
     public CloseableIterator<BinaryRow> endBoostrapWithoutEmit(boolean isEndInput)
             throws Exception {
         bootstrap = false;
-        bootstrapRecords.complete();
         boolean isEmpty = true;
         if (bootstrapKeys.size() > 0) {
             RocksDBBulkLoader bulkLoader = keyIndex.createBulkLoader();
diff --git a/paimon-core/src/main/java/org/apache/paimon/disk/ExternalBuffer.java b/paimon-core/src/main/java/org/apache/paimon/disk/ExternalBuffer.java
index 638b152802..138061cc6c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/disk/ExternalBuffer.java
+++ b/paimon-core/src/main/java/org/apache/paimon/disk/ExternalBuffer.java
@@ -38,8 +38,6 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
-import static org.apache.paimon.utils.Preconditions.checkState;
-
 /** An external buffer for storing rows, it will spill the data to disk when the memory is full. */
 public class ExternalBuffer implements RowBuffer {
 
@@ -58,9 +56,7 @@ public class ExternalBuffer implements RowBuffer {
     private final List<ChannelWithMeta> spilledChannelIDs;
     private int numRows;
 
-    private boolean addCompleted;
-
-    ExternalBuffer(
+    public ExternalBuffer(
             IOManager ioManager,
             MemorySegmentPool pool,
             AbstractRowDataSerializer<?> serializer,
@@ -83,8 +79,6 @@ public class ExternalBuffer implements RowBuffer {
 
         this.numRows = 0;
 
-        this.addCompleted = false;
-
         //noinspection unchecked
         this.inMemoryBuffer =
                 new InMemoryBuffer(pool, (AbstractRowDataSerializer<InternalRow>) serializer);
@@ -95,7 +89,6 @@ public class ExternalBuffer implements RowBuffer {
         clearChannels();
         inMemoryBuffer.reset();
         numRows = 0;
-        addCompleted = false;
     }
 
     @Override
@@ -120,7 +113,6 @@ public class ExternalBuffer implements RowBuffer {
 
     @Override
     public boolean put(InternalRow row) throws IOException {
-        checkState(!addCompleted, "This buffer has add completed.");
         if (!inMemoryBuffer.put(row)) {
             // Check if record is too big.
             if (inMemoryBuffer.getCurrentDataBufferOffset() == 0) {
@@ -136,14 +128,8 @@ public class ExternalBuffer implements RowBuffer {
         return true;
     }
 
-    @Override
-    public void complete() {
-        addCompleted = true;
-    }
-
     @Override
     public RowBufferIterator newIterator() {
-        checkState(addCompleted, "This buffer has not add completed.");
         return new BufferIterator();
     }
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/disk/InMemoryBuffer.java b/paimon-core/src/main/java/org/apache/paimon/disk/InMemoryBuffer.java
index 83c4e423b0..b0c5bf413f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/disk/InMemoryBuffer.java
+++ b/paimon-core/src/main/java/org/apache/paimon/disk/InMemoryBuffer.java
@@ -50,7 +50,8 @@ public class InMemoryBuffer implements RowBuffer {
 
     private boolean isInitialized;
 
-    InMemoryBuffer(MemorySegmentPool pool, AbstractRowDataSerializer<InternalRow> serializer) {
+    public InMemoryBuffer(
+            MemorySegmentPool pool, AbstractRowDataSerializer<InternalRow> serializer) {
         // serializer has states, so we must duplicate
         this.serializer = (AbstractRowDataSerializer<InternalRow>) serializer.duplicate();
         this.pool = pool;
@@ -109,9 +110,6 @@ public class InMemoryBuffer implements RowBuffer {
         return numRecords;
     }
 
-    @Override
-    public void complete() {}
-
     @Override
     public long memoryOccupancy() {
         return currentDataBufferOffset;
diff --git a/paimon-core/src/main/java/org/apache/paimon/disk/RowBuffer.java b/paimon-core/src/main/java/org/apache/paimon/disk/RowBuffer.java
index 7a0e3e2d49..61bfa4ca06 100644
--- a/paimon-core/src/main/java/org/apache/paimon/disk/RowBuffer.java
+++ b/paimon-core/src/main/java/org/apache/paimon/disk/RowBuffer.java
@@ -37,8 +37,6 @@ public interface RowBuffer {
 
     long memoryOccupancy();
 
-    void complete();
-
     void reset();
 
     boolean flushMemory() throws IOException;
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/KeyValueBuffer.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/KeyValueBuffer.java
new file mode 100644
index 0000000000..597e2847b5
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/KeyValueBuffer.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.mergetree.compact;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.KeyValue;
+import org.apache.paimon.data.serializer.InternalRowSerializer;
+import org.apache.paimon.disk.ExternalBuffer;
+import org.apache.paimon.disk.IOManager;
+import org.apache.paimon.disk.InMemoryBuffer;
+import org.apache.paimon.disk.RowBuffer;
+import org.apache.paimon.memory.HeapMemorySegmentPool;
+import org.apache.paimon.memory.MemorySegmentPool;
+import org.apache.paimon.memory.UnlimitedSegmentPool;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.CloseableIterator;
+import org.apache.paimon.utils.KeyValueWithLevelNoReusingSerializer;
+import org.apache.paimon.utils.LazyField;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+/** A buffer to cache {@link KeyValue}s. */
+public interface KeyValueBuffer {
+
+    void reset();
+
+    void put(KeyValue kv);
+
+    CloseableIterator<KeyValue> iterator();
+
+    /** A hybrid {@link KeyValueBuffer}: an in-memory list that spills into a {@link BinaryBuffer} past a threshold. */
+    class HybridBuffer implements KeyValueBuffer {
+
+        private final int threshold;
+        private final ListBuffer listBuffer;
+        private final LazyField<BinaryBuffer> lazyBinaryBuffer;
+
+        private @Nullable BinaryBuffer binaryBuffer;
+
+        public HybridBuffer(int threshold, LazyField<BinaryBuffer> lazyBinaryBuffer) {
+            this.threshold = threshold;
+            this.listBuffer = new ListBuffer();
+            this.lazyBinaryBuffer = lazyBinaryBuffer;
+        }
+
+        @Override
+        public void reset() {
+            listBuffer.reset();
+            if (binaryBuffer != null) {
+                binaryBuffer.reset();
+                binaryBuffer = null;
+            }
+        }
+
+        @Override
+        public void put(KeyValue kv) {
+            if (binaryBuffer != null) {
+                binaryBuffer.put(kv);
+            } else {
+                listBuffer.put(kv);
+                if (listBuffer.list.size() > threshold) {
+                    spillToBinary();
+                }
+            }
+        }
+
+        private void spillToBinary() {
+            BinaryBuffer binaryBuffer = lazyBinaryBuffer.get();
+            try (CloseableIterator<KeyValue> iterator = listBuffer.iterator()) {
+                while (iterator.hasNext()) {
+                    binaryBuffer.put(iterator.next());
+                }
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+            this.listBuffer.reset();
+            this.binaryBuffer = binaryBuffer;
+        }
+
+        @Override
+        public CloseableIterator<KeyValue> iterator() {
+            if (binaryBuffer != null) {
+                return binaryBuffer.iterator();
+            }
+            return listBuffer.iterator();
+        }
+    }
+
+    /** A {@link KeyValueBuffer} implemented by a list. */
+    class ListBuffer implements KeyValueBuffer {
+
+        private final List<KeyValue> list = new ArrayList<>();
+
+        @Override
+        public CloseableIterator<KeyValue> iterator() {
+            return CloseableIterator.adapterForIterator(list.iterator());
+        }
+
+        @Override
+        public void reset() {
+            list.clear();
+        }
+
+        @Override
+        public void put(KeyValue kv) {
+            list.add(kv);
+        }
+    }
+
+    /** A {@link KeyValueBuffer} backed by a binary {@link RowBuffer}, which can spill to disk. */
+    class BinaryBuffer implements KeyValueBuffer {
+
+        private final RowBuffer buffer;
+        private final KeyValueWithLevelNoReusingSerializer kvSerializer;
+
+        public BinaryBuffer(RowBuffer buffer, KeyValueWithLevelNoReusingSerializer kvSerializer) {
+            this.buffer = buffer;
+            this.kvSerializer = kvSerializer;
+        }
+
+        @Override
+        public void reset() {
+            buffer.reset();
+        }
+
+        @Override
+        public void put(KeyValue kv) {
+            try {
+                boolean success = buffer.put(kvSerializer.toRow(kv));
+                if (!success) {
+                    throw new RuntimeException("This is a bug!");
+                }
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        }
+
+        @Override
+        public CloseableIterator<KeyValue> iterator() {
+            @SuppressWarnings("resource")
+            RowBuffer.RowBufferIterator iterator = buffer.newIterator();
+            return new CloseableIterator<KeyValue>() {
+
+                private boolean hasNextWasCalled = false;
+                private boolean hasNext = false;
+
+                @Override
+                public boolean hasNext() {
+                    if (!hasNextWasCalled) {
+                        hasNext = iterator.advanceNext();
+                        hasNextWasCalled = true;
+                    }
+                    return hasNext;
+                }
+
+                @Override
+                public KeyValue next() {
+                    if (!hasNext) {
+                        throw new NoSuchElementException();
+                    }
+                    hasNextWasCalled = false;
+                    return kvSerializer.fromRow(iterator.getRow().copy());
+                }
+
+                @Override
+                public void close() {
+                    iterator.close();
+                }
+            };
+        }
+    }
+
+    static BinaryBuffer createBinaryBuffer(
+            CoreOptions options,
+            RowType keyType,
+            RowType valueType,
+            @Nullable IOManager ioManager) {
+        KeyValueWithLevelNoReusingSerializer kvSerializer =
+                new KeyValueWithLevelNoReusingSerializer(keyType, valueType);
+        MemorySegmentPool pool =
+                ioManager == null
+                        ? new UnlimitedSegmentPool(options.pageSize())
+                        : new HeapMemorySegmentPool(
+                                options.lookupMergeBufferSize(), options.pageSize());
+        InternalRowSerializer serializer = new InternalRowSerializer(kvSerializer.fieldTypes());
+        RowBuffer buffer =
+                ioManager == null
+                        ? new InMemoryBuffer(pool, serializer)
+                        : new ExternalBuffer(
+                                ioManager,
+                                pool,
+                                serializer,
+                                options.writeBufferSpillDiskSize(),
+                                options.spillCompressOptions());
+        return new BinaryBuffer(buffer, kvSerializer);
+    }
+
+    static void insertInto(
+            KeyValueBuffer buffer, KeyValue highLevel, Comparator<KeyValue> comparator) {
+        List<KeyValue> newCandidates = new ArrayList<>();
+        try (CloseableIterator<KeyValue> iterator = buffer.iterator()) {
+            while (iterator.hasNext()) {
+                KeyValue candidate = iterator.next();
+                if (highLevel != null && comparator.compare(highLevel, candidate) < 0) {
+                    newCandidates.add(highLevel);
+                    newCandidates.add(candidate);
+                    highLevel = null;
+                } else {
+                    newCandidates.add(candidate);
+                }
+            }
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+        if (highLevel != null) {
+            newCandidates.add(highLevel);
+        }
+        buffer.reset();
+        for (KeyValue kv : newCandidates) {
+            buffer.put(kv);
+        }
+    }
+}
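
A rough sketch of how the pieces above compose, using only names introduced in this commit; the single-int row types, page size, and threshold are illustrative:

    RowType keyType = new RowType(Collections.singletonList(new DataField(0, "key", new IntType())));
    RowType valueType = new RowType(Collections.singletonList(new DataField(0, "value", new IntType())));
    KeyValueWithLevelNoReusingSerializer kvSerializer =
            new KeyValueWithLevelNoReusingSerializer(keyType, valueType);
    RowBuffer rowBuffer =
            new InMemoryBuffer(
                    new UnlimitedSegmentPool(32 * 1024),
                    new InternalRowSerializer(kvSerializer.fieldTypes()));
    KeyValueBuffer.BinaryBuffer binary = new KeyValueBuffer.BinaryBuffer(rowBuffer, kvSerializer);
    // Records stay in a plain list until the threshold is crossed, then spill into the binary buffer.
    KeyValueBuffer buffer = new KeyValueBuffer.HybridBuffer(1024, new LazyField<>(() -> binary));

    // Build one KeyValue the same way the tests below do.
    BinaryRow key = new BinaryRow(1);
    BinaryRowWriter keyWriter = new BinaryRowWriter(key);
    keyWriter.writeInt(0, 1);
    keyWriter.complete();
    BinaryRow value = new BinaryRow(1);
    BinaryRowWriter valueWriter = new BinaryRowWriter(value);
    valueWriter.writeInt(0, 10);
    valueWriter.complete();
    buffer.put(new KeyValue().replace(key, 1, RowKind.INSERT, value));

    try (CloseableIterator<KeyValue> it = buffer.iterator()) {
        while (it.hasNext()) {
            KeyValue kv = it.next(); // consume merged candidates
        }
    }
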
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapper.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapper.java
index 6c9376ae33..3fccca9b5a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapper.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapper.java
@@ -30,10 +30,7 @@ import org.apache.paimon.utils.UserDefinedSeqComparator;
 
 import javax.annotation.Nullable;
 
-import java.util.ArrayList;
 import java.util.Comparator;
-import java.util.LinkedList;
-import java.util.List;
 import java.util.function.Function;
 
 import static org.apache.paimon.utils.Preconditions.checkArgument;
@@ -106,7 +103,7 @@ public class LookupChangelogMergeFunctionWrapper<T>
     public ChangelogResult getResult() {
         // 1. Find the latest high level record and compute containLevel0
         KeyValue highLevel = mergeFunction.pickHighLevel();
-        boolean containLevel0 = containLevel0();
+        boolean containLevel0 = mergeFunction.containLevel0();
 
         // 2. Lookup if latest high level record is absent
         if (highLevel == null) {
@@ -122,7 +119,7 @@ public class LookupChangelogMergeFunctionWrapper<T>
                 }
             }
             if (highLevel != null) {
-                insertInto(mergeFunction.candidates(), highLevel);
+                mergeFunction.insertInto(highLevel, comparator);
             }
         }
 
@@ -138,33 +135,6 @@ public class LookupChangelogMergeFunctionWrapper<T>
         return reusedResult.setResult(result);
     }
 
-    public boolean containLevel0() {
-        for (KeyValue kv : mergeFunction.candidates()) {
-            if (kv.level() == 0) {
-                return true;
-            }
-        }
-        return false;
-    }
-
-    private void insertInto(LinkedList<KeyValue> candidates, KeyValue highLevel) {
-        List<KeyValue> newCandidates = new ArrayList<>();
-        for (KeyValue candidate : candidates) {
-            if (highLevel != null && comparator.compare(highLevel, candidate) < 0) {
-                newCandidates.add(highLevel);
-                newCandidates.add(candidate);
-                highLevel = null;
-            } else {
-                newCandidates.add(candidate);
-            }
-        }
-        if (highLevel != null) {
-            newCandidates.add(highLevel);
-        }
-        candidates.clear();
-        candidates.addAll(newCandidates);
-    }
-
     private void setChangelog(@Nullable KeyValue before, KeyValue after) {
         if (before == null || !before.isAdd()) {
             if (after.isAdd()) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeFunction.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeFunction.java
index 1537677ffc..bcd0692757 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeFunction.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeFunction.java
@@ -18,12 +18,21 @@
 
 package org.apache.paimon.mergetree.compact;
 
+import org.apache.paimon.CoreOptions;
 import org.apache.paimon.KeyValue;
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.disk.IOManager;
+import org.apache.paimon.mergetree.compact.KeyValueBuffer.BinaryBuffer;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.CloseableIterator;
+import org.apache.paimon.utils.LazyField;
 
 import javax.annotation.Nullable;
 
-import java.util.LinkedList;
+import java.util.Comparator;
+import java.util.function.Supplier;
+
+import static org.apache.paimon.mergetree.compact.KeyValueBuffer.createBinaryBuffer;
 
 /**
  * A {@link MergeFunction} for lookup, this wrapper only considers the latest high level record,
@@ -33,58 +42,91 @@ import java.util.LinkedList;
 public class LookupMergeFunction implements MergeFunction<KeyValue> {
 
     private final MergeFunction<KeyValue> mergeFunction;
-    private final LinkedList<KeyValue> candidates = new LinkedList<>();
 
-    public LookupMergeFunction(MergeFunction<KeyValue> mergeFunction) {
+    private final KeyValueBuffer candidates;
+    private boolean containLevel0;
+    private InternalRow currentKey;
+
+    public LookupMergeFunction(
+            MergeFunction<KeyValue> mergeFunction,
+            CoreOptions options,
+            RowType keyType,
+            RowType valueType,
+            @Nullable IOManager ioManager) {
         this.mergeFunction = mergeFunction;
+        Supplier<BinaryBuffer> binarySupplier =
+                () -> createBinaryBuffer(options, keyType, valueType, ioManager);
+        int threshold = options == null ? 1024 : options.lookupMergeRecordsThreshold();
+        this.candidates =
+                new KeyValueBuffer.HybridBuffer(threshold, new LazyField<>(binarySupplier));
     }
 
     @Override
     public void reset() {
-        candidates.clear();
+        candidates.reset();
+        currentKey = null;
+        containLevel0 = false;
     }
 
     @Override
     public void add(KeyValue kv) {
-        candidates.add(kv);
+        currentKey = kv.key();
+        if (kv.level() == 0) {
+            containLevel0 = true;
+        }
+        candidates.put(kv);
+    }
+
+    public boolean containLevel0() {
+        return containLevel0;
     }
 
     @Nullable
     public KeyValue pickHighLevel() {
         KeyValue highLevel = null;
-        for (KeyValue kv : candidates) {
-            // records that has not been stored on the disk yet, such as the data in the write
-            // buffer being at level -1
-            if (kv.level() <= 0) {
-                continue;
-            }
-            // For high-level comparison logic (not involving Level 0), only the value of the
-            // minimum Level should be selected
-            if (highLevel == null || kv.level() < highLevel.level()) {
-                highLevel = kv;
+        try (CloseableIterator<KeyValue> iterator = candidates.iterator()) {
+            while (iterator.hasNext()) {
+                KeyValue kv = iterator.next();
+                // records that has not been stored on the disk yet, such as the data in the write
+                // buffer being at level -1
+                if (kv.level() <= 0) {
+                    continue;
+                }
+                // For high-level comparison logic (not involving Level 0), only the value of the
+                // minimum Level should be selected
+                if (highLevel == null || kv.level() < highLevel.level()) {
+                    highLevel = kv;
+                }
             }
+        } catch (Exception e) {
+            throw new RuntimeException(e);
         }
         return highLevel;
     }
 
     public InternalRow key() {
-        return candidates.get(0).key();
+        return currentKey;
     }
 
-    public LinkedList<KeyValue> candidates() {
-        return candidates;
+    public void insertInto(KeyValue highLevel, Comparator<KeyValue> comparator) {
+        KeyValueBuffer.insertInto(candidates, highLevel, comparator);
     }
 
     @Override
     public KeyValue getResult() {
         mergeFunction.reset();
         KeyValue highLevel = pickHighLevel();
-        for (KeyValue kv : candidates) {
-            // records that has not been stored on the disk yet, such as the data in the write
-            // buffer being at level -1
-            if (kv.level() <= 0 || kv == highLevel) {
-                mergeFunction.add(kv);
+        try (CloseableIterator<KeyValue> iterator = candidates.iterator()) {
+            while (iterator.hasNext()) {
+                KeyValue kv = iterator.next();
+                // records that has not been stored on the disk yet, such as the data in the write
+                // buffer being at level -1
+                if (kv.level() <= 0 || kv == highLevel) {
+                    mergeFunction.add(kv);
+                }
             }
+        } catch (Exception e) {
+            throw new RuntimeException(e);
         }
         return mergeFunction.getResult();
     }
@@ -94,28 +136,50 @@ public class LookupMergeFunction implements MergeFunction<KeyValue> {
         return true;
     }
 
-    public static MergeFunctionFactory<KeyValue> wrap(MergeFunctionFactory<KeyValue> wrapped) {
+    public static MergeFunctionFactory<KeyValue> wrap(
+            MergeFunctionFactory<KeyValue> wrapped,
+            CoreOptions options,
+            RowType keyType,
+            RowType valueType) {
         if (wrapped.create() instanceof FirstRowMergeFunction) {
             // don't wrap first row, it is already OK
             return wrapped;
         }
 
-        return new Factory(wrapped);
+        return new Factory(wrapped, options, keyType, valueType);
     }
 
-    private static class Factory implements MergeFunctionFactory<KeyValue> {
+    /** Factory to create {@link LookupMergeFunction}. */
+    public static class Factory implements MergeFunctionFactory<KeyValue> {
 
         private static final long serialVersionUID = 1L;
 
         private final MergeFunctionFactory<KeyValue> wrapped;
+        private final CoreOptions options;
+        private final RowType keyType;
+        private final RowType valueType;
+
+        private @Nullable IOManager ioManager;
 
-        private Factory(MergeFunctionFactory<KeyValue> wrapped) {
+        private Factory(
+                MergeFunctionFactory<KeyValue> wrapped,
+                CoreOptions options,
+                RowType keyType,
+                RowType valueType) {
             this.wrapped = wrapped;
+            this.options = options;
+            this.keyType = keyType;
+            this.valueType = valueType;
+        }
+
+        public void withIOManager(@Nullable IOManager ioManager) {
+            this.ioManager = ioManager;
         }
 
         @Override
         public MergeFunction<KeyValue> create(@Nullable int[][] projection) {
-            return new LookupMergeFunction(wrapped.create(projection));
+            return new LookupMergeFunction(
+                    wrapped.create(projection), options, keyType, valueType, ioManager);
         }
 
         @Override
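
Taken together with the store-write and read-path changes below, the factory is wired up roughly like this (a hedged sketch; the DeduplicateMergeFunction factory and the local variables are illustrative, not quoted from the commit):

    MergeFunctionFactory<KeyValue> mfFactory =
            LookupMergeFunction.wrap(DeduplicateMergeFunction.factory(), options, keyType, valueType);
    // A writer or reader that owns an IOManager hands it to the factory, so the
    // candidate buffer can spill through ExternalBuffer instead of growing on heap.
    if (mfFactory instanceof LookupMergeFunction.Factory) {
        ((LookupMergeFunction.Factory) mfFactory).withIOManager(ioManager);
    }
    MergeFunction<KeyValue> mergeFunction = mfFactory.create();
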
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
index afd3fb906f..24b08f06c1 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
@@ -31,6 +31,7 @@ import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.serializer.RowCompactedSerializer;
 import org.apache.paimon.deletionvectors.BucketedDvMaintainer;
 import org.apache.paimon.deletionvectors.DeletionVector;
+import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.format.FileFormatDiscover;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.index.DynamicBucketIndexMaintainer;
@@ -54,6 +55,7 @@ import org.apache.paimon.mergetree.compact.CompactStrategy;
 import org.apache.paimon.mergetree.compact.ForceUpLevel0Compaction;
 import org.apache.paimon.mergetree.compact.FullChangelogMergeTreeCompactRewriter;
 import org.apache.paimon.mergetree.compact.FullCompactTrigger;
+import org.apache.paimon.mergetree.compact.LookupMergeFunction;
 import org.apache.paimon.mergetree.compact.LookupMergeTreeCompactRewriter;
 import org.apache.paimon.mergetree.compact.LookupMergeTreeCompactRewriter.FirstRowMergeFunctionWrapperFactory;
 import org.apache.paimon.mergetree.compact.LookupMergeTreeCompactRewriter.LookupMergeFunctionWrapperFactory;
@@ -106,7 +108,6 @@ public class KeyValueFileStoreWrite extends MemoryFileStoreWrite<KeyValue> {
     private final Supplier<RecordEqualiser> logDedupEqualSupplier;
     private final MergeFunctionFactory<KeyValue> mfFactory;
     private final CoreOptions options;
-    private final FileIO fileIO;
     private final RowType keyType;
     private final RowType valueType;
     private final RowType partitionType;
@@ -143,7 +144,6 @@ public class KeyValueFileStoreWrite extends MemoryFileStoreWrite<KeyValue> {
                 dbMaintainerFactory,
                 dvMaintainerFactory,
                 tableName);
-        this.fileIO = fileIO;
         this.partitionType = partitionType;
         this.keyType = keyType;
         this.valueType = valueType;
@@ -177,6 +177,15 @@ public class KeyValueFileStoreWrite extends MemoryFileStoreWrite<KeyValue> {
         this.options = options;
     }
 
+    @Override
+    public KeyValueFileStoreWrite withIOManager(IOManager ioManager) {
+        super.withIOManager(ioManager);
+        if (mfFactory instanceof LookupMergeFunction.Factory) {
+            ((LookupMergeFunction.Factory) mfFactory).withIOManager(ioManager);
+        }
+        return this;
+    }
+
     @Override
     protected MergeTreeWriter createWriter(
             BinaryRow partition,
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/MergeFileSplitRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/MergeFileSplitRead.java
index 07c9215f7e..08344c08b5 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/MergeFileSplitRead.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/MergeFileSplitRead.java
@@ -33,6 +33,7 @@ import org.apache.paimon.mergetree.MergeTreeReaders;
 import org.apache.paimon.mergetree.SortedRun;
 import org.apache.paimon.mergetree.compact.ConcatRecordReader;
 import org.apache.paimon.mergetree.compact.IntervalPartition;
+import org.apache.paimon.mergetree.compact.LookupMergeFunction;
 import org.apache.paimon.mergetree.compact.MergeFunctionFactory;
 import org.apache.paimon.mergetree.compact.MergeFunctionFactory.AdjustedProjection;
 import org.apache.paimon.mergetree.compact.MergeFunctionWrapper;
@@ -186,6 +187,9 @@ public class MergeFileSplitRead implements SplitRead<KeyValue> {
     @Override
     public MergeFileSplitRead withIOManager(IOManager ioManager) {
         this.mergeSorter.setIOManager(ioManager);
+        if (mfFactory instanceof LookupMergeFunction.Factory) {
+            ((LookupMergeFunction.Factory) mfFactory).withIOManager(ioManager);
+        }
         return this;
     }
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/postpone/PostponeBucketFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/postpone/PostponeBucketFileStoreWrite.java
index 4978ec6b3d..a6ba8febe5 100644
--- a/paimon-core/src/main/java/org/apache/paimon/postpone/PostponeBucketFileStoreWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/postpone/PostponeBucketFileStoreWrite.java
@@ -22,6 +22,7 @@ import org.apache.paimon.CoreOptions;
 import org.apache.paimon.KeyValue;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.deletionvectors.BucketedDvMaintainer;
+import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.format.FileFormat;
 import org.apache.paimon.format.avro.AvroSchemaConverter;
 import org.apache.paimon.fs.FileIO;
@@ -29,6 +30,7 @@ import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.io.KeyValueFileReaderFactory;
 import org.apache.paimon.io.KeyValueFileWriterFactory;
 import org.apache.paimon.mergetree.compact.ConcatRecordReader;
+import org.apache.paimon.mergetree.compact.LookupMergeFunction;
 import org.apache.paimon.mergetree.compact.MergeFunctionFactory;
 import org.apache.paimon.operation.FileStoreScan;
 import org.apache.paimon.operation.FileStoreWrite;
@@ -147,6 +149,15 @@ public class PostponeBucketFileStoreWrite extends MemoryFileStoreWrite<KeyValue>
         withIgnorePreviousFiles(true);
     }
 
+    @Override
+    public PostponeBucketFileStoreWrite withIOManager(IOManager ioManager) {
+        super.withIOManager(ioManager);
+        if (mfFactory instanceof LookupMergeFunction.Factory) {
+            ((LookupMergeFunction.Factory) mfFactory).withIOManager(ioManager);
+        }
+        return this;
+    }
+
     @Override
     protected void forceBufferSpill() throws Exception {
         if (ioManager == null) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
index 4901638b65..b17f307f8d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
@@ -76,10 +76,12 @@ public class PrimaryKeyFileStoreTable extends AbstractFileStoreTable {
             KeyValueFieldsExtractor extractor =
                     PrimaryKeyTableUtils.PrimaryKeyFieldsExtractor.EXTRACTOR;
 
+            RowType keyType = new RowType(extractor.keyFields(tableSchema));
+
             MergeFunctionFactory<KeyValue> mfFactory =
                     PrimaryKeyTableUtils.createMergeFunctionFactory(tableSchema, extractor);
             if (options.needLookup()) {
-                mfFactory = LookupMergeFunction.wrap(mfFactory);
+                mfFactory = LookupMergeFunction.wrap(mfFactory, options, keyType, rowType);
             }
 
             lazyStore =
@@ -92,7 +94,7 @@ public class PrimaryKeyFileStoreTable extends AbstractFileStoreTable {
                             tableSchema.logicalPartitionType(),
                             PrimaryKeyTableUtils.addKeyNamePrefix(
                                     tableSchema.logicalBucketKeyType()),
-                            new RowType(extractor.keyFields(tableSchema)),
+                            keyType,
                             rowType,
                             extractor,
                             mfFactory,
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/SinkWriter.java b/paimon-core/src/main/java/org/apache/paimon/utils/SinkWriter.java
index c4596a1df4..439eea707c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/SinkWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/SinkWriter.java
@@ -172,7 +172,6 @@ public interface SinkWriter<T> {
         public List<DataFileMeta> flush() throws IOException {
             List<DataFileMeta> flushedFiles = new ArrayList<>();
             if (writeBuffer != null) {
-                writeBuffer.complete();
                 RollingFileWriter<T, DataFileMeta> writer = writerSupplier.get();
                 IOException exception = null;
                 try (RowBuffer.RowBufferIterator iterator = writeBuffer.newIterator()) {
diff --git a/paimon-core/src/test/java/org/apache/paimon/disk/ExternalBufferTest.java b/paimon-core/src/test/java/org/apache/paimon/disk/ExternalBufferTest.java
index 130cf32e92..6265258d10 100644
--- a/paimon-core/src/test/java/org/apache/paimon/disk/ExternalBufferTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/disk/ExternalBufferTest.java
@@ -214,7 +214,6 @@ public class ExternalBufferTest {
     private List<Long> insertMulti(ExternalBuffer buffer, int cnt) throws IOException {
         ArrayList<Long> expected = new ArrayList<>(cnt);
         insertMulti(buffer, cnt, expected);
-        buffer.complete();
         return expected;
     }
 
diff --git a/paimon-core/src/test/java/org/apache/paimon/disk/InMemoryBufferTest.java b/paimon-core/src/test/java/org/apache/paimon/disk/InMemoryBufferTest.java
index 039be29398..ad84ae2f8c 100644
--- a/paimon-core/src/test/java/org/apache/paimon/disk/InMemoryBufferTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/disk/InMemoryBufferTest.java
@@ -198,7 +198,6 @@ public class InMemoryBufferTest {
 
         @Override
         public void flushMemory() {
-            inMemoryBuffer.complete();
             // emulate real-world flushing data to disk, we need to call newIterator method
             inMemoryBuffer.newIterator();
             inMemoryBuffer.reset();
diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/SortBufferWriteBufferTestBase.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/SortBufferWriteBufferTestBase.java
index ee34ac2e71..19af07dda6 100644
--- a/paimon-core/src/test/java/org/apache/paimon/mergetree/SortBufferWriteBufferTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/SortBufferWriteBufferTestBase.java
@@ -266,7 +266,7 @@ public abstract class SortBufferWriteBufferTestBase {
                             Arrays.asList("f0", "f1"),
                             Arrays.asList(DataTypes.INT().notNull(), DataTypes.BIGINT()),
                             Collections.singletonList("f0"));
-            return LookupMergeFunction.wrap(aggMergeFunction).create();
+            return LookupMergeFunction.wrap(aggMergeFunction, null, null, null).create();
         }
     }
 
diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/KeyValueBufferInsertIntoTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/KeyValueBufferInsertIntoTest.java
new file mode 100644
index 0000000000..cf85164d95
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/KeyValueBufferInsertIntoTest.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.mergetree.compact;
+
+import org.apache.paimon.KeyValue;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BinaryRowWriter;
+import org.apache.paimon.data.serializer.InternalRowSerializer;
+import org.apache.paimon.disk.InMemoryBuffer;
+import org.apache.paimon.disk.RowBuffer;
+import org.apache.paimon.memory.HeapMemorySegmentPool;
+import org.apache.paimon.mergetree.compact.KeyValueBuffer.BinaryBuffer;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.IntType;
+import org.apache.paimon.types.RowKind;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.CloseableIterator;
+import org.apache.paimon.utils.KeyValueWithLevelNoReusingSerializer;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link KeyValueBuffer#insertInto(KeyValueBuffer, KeyValue, Comparator)}. */
+public class KeyValueBufferInsertIntoTest {
+
+    private RowType keyType;
+    private RowType valueType;
+
+    @BeforeEach
+    public void beforeEach() {
+        this.keyType =
+                new RowType(
+                        new ArrayList<DataField>() {
+                            {
+                                add(new DataField(0, "key", new IntType()));
+                            }
+                        });
+        this.valueType =
+                new RowType(
+                        new ArrayList<DataField>() {
+                            {
+                                add(new DataField(0, "value", new IntType()));
+                            }
+                        });
+    }
+
+    private KeyValueBuffer createBuffer() {
+        ThreadLocalRandom rnd = ThreadLocalRandom.current();
+        KeyValueWithLevelNoReusingSerializer kvSerializer =
+                new KeyValueWithLevelNoReusingSerializer(keyType, valueType);
+        RowBuffer rowBuffer =
+                new InMemoryBuffer(
+                        new HeapMemorySegmentPool(8 * 1024, 1024),
+                        new InternalRowSerializer(kvSerializer.fieldTypes()));
+        return rnd.nextBoolean() ? createBuffer() : new BinaryBuffer(rowBuffer, kvSerializer);
+    }
+
+    /**
+     * Test insertInto method with an empty buffer. It should add the highLevel KeyValue to the
+     * buffer.
+     */
+    @Test
+    public void testInsertIntoEmptyBuffer() {
+        KeyValueBuffer buffer = createBuffer();
+        KeyValue highLevel = createKeyValue(1, 10);
+        Comparator<KeyValue> comparator = Comparator.comparingInt(kv -> kv.key().getInt(0));
+
+        KeyValueBuffer.insertInto(buffer, highLevel, comparator);
+
+        List<KeyValue> result = collectKeyValues(buffer);
+        assertThat(result).hasSize(1);
+        assertKeyValueEquals(result.get(0), highLevel);
+    }
+
+    /**
+     * Test insertInto method when highLevel should be inserted at the beginning. The highLevel
+     * KeyValue has a smaller key than all existing entries.
+     */
+    @Test
+    public void testInsertIntoBeginning() {
+        KeyValueBuffer buffer = createBuffer();
+        // Add existing entries to buffer
+        buffer.put(createKeyValue(3, 30));
+        buffer.put(createKeyValue(5, 50));
+
+        KeyValue highLevel = createKeyValue(1, 10);
+        Comparator<KeyValue> comparator = Comparator.comparingInt(kv -> kv.key().getInt(0));
+
+        KeyValueBuffer.insertInto(buffer, highLevel, comparator);
+
+        List<KeyValue> result = collectKeyValues(buffer);
+        assertThat(result).hasSize(3);
+        assertKeyValueEquals(result.get(0), highLevel); // Should be first
+        assertKeyValueEquals(result.get(1), createKeyValue(3, 30));
+        assertKeyValueEquals(result.get(2), createKeyValue(5, 50));
+    }
+
+    /**
+     * Test insertInto method when highLevel should be inserted in the middle. The highLevel
+     * KeyValue has a key that fits between existing entries.
+     */
+    @Test
+    public void testInsertIntoMiddle() {
+        KeyValueBuffer buffer = createBuffer();
+        // Add existing entries to buffer
+        buffer.put(createKeyValue(1, 10));
+        buffer.put(createKeyValue(5, 50));
+
+        KeyValue highLevel = createKeyValue(3, 30);
+        Comparator<KeyValue> comparator = Comparator.comparingInt(kv -> kv.key().getInt(0));
+
+        KeyValueBuffer.insertInto(buffer, highLevel, comparator);
+
+        List<KeyValue> result = collectKeyValues(buffer);
+        assertThat(result).hasSize(3);
+        assertKeyValueEquals(result.get(0), createKeyValue(1, 10));
+        assertKeyValueEquals(result.get(1), highLevel); // Should be in the middle
+        assertKeyValueEquals(result.get(2), createKeyValue(5, 50));
+    }
+
+    /**
+     * Test insertInto method when highLevel should be inserted at the end. The highLevel KeyValue
+     * has a larger key than all existing entries.
+     */
+    @Test
+    public void testInsertIntoEnd() {
+        KeyValueBuffer buffer = createBuffer();
+        // Add existing entries to buffer
+        buffer.put(createKeyValue(1, 10));
+        buffer.put(createKeyValue(3, 30));
+
+        KeyValue highLevel = createKeyValue(5, 50);
+        Comparator<KeyValue> comparator = Comparator.comparingInt(kv -> kv.key().getInt(0));
+
+        KeyValueBuffer.insertInto(buffer, highLevel, comparator);
+
+        List<KeyValue> result = collectKeyValues(buffer);
+        assertThat(result).hasSize(3);
+        assertKeyValueEquals(result.get(0), createKeyValue(1, 10));
+        assertKeyValueEquals(result.get(1), createKeyValue(3, 30));
+        assertKeyValueEquals(result.get(2), highLevel); // Should be last
+    }
+
+    /**
+     * Test insertInto method with a buffer that has multiple entries with the same key. It should
+     * insert highLevel in the correct position based on the comparator.
+     */
+    @Test
+    public void testInsertIntoWithDuplicateKeys() {
+        KeyValueBuffer buffer = createBuffer();
+        // Add existing entries to buffer
+        buffer.put(createKeyValue(1, 10));
+        buffer.put(createKeyValue(3, 30));
+        buffer.put(createKeyValue(3, 35));
+        buffer.put(createKeyValue(5, 50));
+
+        KeyValue highLevel = createKeyValue(3, 25); // Same key as existing entries
+        Comparator<KeyValue> comparator =
+                Comparator.comparingInt((KeyValue kv) -> kv.key().getInt(0))
+                        .thenComparingLong(KeyValue::sequenceNumber);
+
+        KeyValueBuffer.insertInto(buffer, highLevel, comparator);
+
+        List<KeyValue> result = collectKeyValues(buffer);
+        assertThat(result).hasSize(5);
+        assertKeyValueEquals(result.get(0), createKeyValue(1, 10));
+        assertKeyValueEquals(result.get(1), createKeyValue(3, 30));
+        assertKeyValueEquals(result.get(2), createKeyValue(3, 35));
+        assertKeyValueEquals(result.get(3), highLevel);
+        assertKeyValueEquals(result.get(4), createKeyValue(5, 50));
+    }
+
+    // Helper method to create a KeyValue for testing
+    private KeyValue createKeyValue(int key, int value) {
+        // Create key as BinaryRow
+        BinaryRow binaryKey = new BinaryRow(1);
+        BinaryRowWriter keyWriter = new BinaryRowWriter(binaryKey);
+        keyWriter.writeInt(0, key);
+        keyWriter.complete();
+
+        // Create value as BinaryRow
+        BinaryRow binaryValue = new BinaryRow(1);
+        BinaryRowWriter valueWriter = new BinaryRowWriter(binaryValue);
+        valueWriter.writeInt(0, value);
+        valueWriter.complete();
+
+        return new KeyValue().replace(binaryKey, key, RowKind.INSERT, binaryValue);
+    }
+
+    // Helper method to collect all KeyValues from a buffer
+    private List<KeyValue> collectKeyValues(KeyValueBuffer buffer) {
+        List<KeyValue> result = new ArrayList<>();
+        try (CloseableIterator<KeyValue> iterator = buffer.iterator()) {
+            while (iterator.hasNext()) {
+                result.add(iterator.next());
+            }
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+        return result;
+    }
+
+    // Helper method to assert that two KeyValues are equal
+    private void assertKeyValueEquals(KeyValue actual, KeyValue expected) {
+        assertThat(actual.key().getInt(0)).isEqualTo(expected.key().getInt(0));
+        assertThat(actual.value().getInt(0)).isEqualTo(expected.value().getInt(0));
+        assertThat(actual.sequenceNumber()).isEqualTo(expected.sequenceNumber());
+        assertThat(actual.valueKind()).isEqualTo(expected.valueKind());
+    }
+}
diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/KeyValueBufferTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/KeyValueBufferTest.java
new file mode 100644
index 0000000000..e3f485f765
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/KeyValueBufferTest.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.mergetree.compact;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.KeyValue;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BinaryRowWriter;
+import org.apache.paimon.disk.IOManager;
+import org.apache.paimon.disk.IOManagerImpl;
+import org.apache.paimon.mergetree.compact.KeyValueBuffer.BinaryBuffer;
+import org.apache.paimon.options.MemorySize;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.IntType;
+import org.apache.paimon.types.RowKind;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.CloseableIterator;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link KeyValueBuffer}. */
+public class KeyValueBufferTest {
+
+    @TempDir java.nio.file.Path tempDir;
+
+    private IOManager ioManager;
+    private RowType keyType;
+    private RowType valueType;
+
+    @BeforeEach
+    public void beforeEach() {
+        this.ioManager = new IOManagerImpl(tempDir.toString());
+        this.keyType =
+                new RowType(
+                        new ArrayList<DataField>() {
+                            {
+                                add(new DataField(0, "key", new IntType()));
+                            }
+                        });
+        this.valueType =
+                new RowType(
+                        new ArrayList<DataField>() {
+                            {
+                                add(new DataField(0, "value", new IntType()));
+                            }
+                        });
+    }
+
+    @AfterEach
+    public void afterEach() {
+        if (ioManager != null) {
+            try {
+                ioManager.close();
+            } catch (Exception e) {
+                // Ignore exception during close
+            }
+        }
+    }
+
+    @Test
+    public void testCreateBinaryBufferWithIOManager() {
+        Options options = new Options();
+        options.set(CoreOptions.LOOKUP_MERGE_BUFFER_SIZE, MemorySize.ofMebiBytes(1L));
+
+        BinaryBuffer binaryBuffer =
+                KeyValueBuffer.createBinaryBuffer(
+                        new CoreOptions(options), keyType, valueType, ioManager);
+
+        assertThat(binaryBuffer).isNotNull();
+    }
+
+    @Test
+    public void testCreateBinaryBufferWithoutIOManager() {
+        Options options = new Options();
+        options.set(CoreOptions.LOOKUP_MERGE_BUFFER_SIZE, MemorySize.ofMebiBytes(1L));
+
+        BinaryBuffer binaryBuffer =
+                KeyValueBuffer.createBinaryBuffer(
+                        new CoreOptions(options), keyType, valueType, null);
+
+        assertThat(binaryBuffer).isNotNull();
+    }
+
+    @Test
+    public void testBinaryBufferPutAndIterator() throws Exception {
+        Options options = new Options();
+        options.set(CoreOptions.LOOKUP_MERGE_BUFFER_SIZE, MemorySize.ofMebiBytes(1L));
+
+        BinaryBuffer binaryBuffer =
+                KeyValueBuffer.createBinaryBuffer(
+                        new CoreOptions(options), keyType, valueType, ioManager);
+
+        // Create test data
+        List<KeyValue> testData = new ArrayList<>();
+        for (int i = 0; i < 10; i++) {
+            // Create key as BinaryRow
+            BinaryRow key = new BinaryRow(1);
+            BinaryRowWriter keyWriter = new BinaryRowWriter(key);
+            keyWriter.writeInt(0, i);
+            keyWriter.complete();
+
+            // Create value as BinaryRow
+            BinaryRow value = new BinaryRow(1);
+            BinaryRowWriter valueWriter = new BinaryRowWriter(value);
+            valueWriter.writeInt(0, i * 2);
+            valueWriter.complete();
+
+            testData.add(new KeyValue().replace(key, i, RowKind.INSERT, value));
+        }
+
+        // Put data into buffer
+        for (KeyValue kv : testData) {
+            binaryBuffer.put(kv);
+        }
+
+        // Verify data through iterator
+        try (CloseableIterator<KeyValue> iterator = binaryBuffer.iterator()) {
+            int count = 0;
+            while (iterator.hasNext()) {
+                KeyValue kv = iterator.next();
+                KeyValue expected = testData.get(count);
+                assertThat(kv.key().getInt(0)).isEqualTo(expected.key().getInt(0));
+                assertThat(kv.value().getInt(0)).isEqualTo(expected.value().getInt(0));
+                assertThat(kv.sequenceNumber()).isEqualTo(expected.sequenceNumber());
+                assertThat(kv.valueKind()).isEqualTo(expected.valueKind());
+                count++;
+            }
+            assertThat(count).isEqualTo(testData.size());
+        }
+    }
+}
diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java
index 5341c6db69..57d99557ca 100644
--- a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java
@@ -66,7 +66,8 @@ public class LookupChangelogMergeFunctionWrapperTest {
         Map<InternalRow, KeyValue> highLevel = new HashMap<>();
         LookupChangelogMergeFunctionWrapper function =
                 new LookupChangelogMergeFunctionWrapper(
-                        LookupMergeFunction.wrap(DeduplicateMergeFunction.factory()),
+                        LookupMergeFunction.wrap(
+                                DeduplicateMergeFunction.factory(), null, null, null),
                         highLevel::get,
                         changelogRowDeduplicate ? EQUALISER : null,
                         LookupStrategy.from(false, true, false, false),
@@ -228,7 +229,8 @@ public class LookupChangelogMergeFunctionWrapperTest {
                 ValueEqualiserSupplier.fromIgnoreFields(valueType, ignoreFields);
         LookupChangelogMergeFunctionWrapper function =
                 new LookupChangelogMergeFunctionWrapper(
-                        LookupMergeFunction.wrap(DeduplicateMergeFunction.factory()),
+                        LookupMergeFunction.wrap(
+                                DeduplicateMergeFunction.factory(), null, null, null),
                         highLevel::get,
                         logDedupEqualSupplier.get(),
                         LookupStrategy.from(false, true, false, false),
@@ -288,7 +290,10 @@ public class LookupChangelogMergeFunctionWrapperTest {
                                                             .create(DataTypes.INT(), null, null)
                                                 },
                                                 false,
-                                                null)),
+                                                null),
+                                null,
+                                null,
+                                null),
                         key -> null,
                         changelogRowDeduplicate ? EQUALISER : null,
                         LookupStrategy.from(false, true, false, false),
@@ -377,7 +382,10 @@ public class LookupChangelogMergeFunctionWrapperTest {
                                                             .create(DataTypes.INT(), null, null)
                                                 },
                                                 false,
-                                                null)),
+                                                null),
+                                null,
+                                null,
+                                null),
                         highLevel::get,
                         null,
                         LookupStrategy.from(false, true, false, false),
@@ -514,7 +522,8 @@ public class LookupChangelogMergeFunctionWrapperTest {
         Map<InternalRow, KeyValue> highLevel = new HashMap<>();
         LookupChangelogMergeFunctionWrapper function =
                 new LookupChangelogMergeFunctionWrapper(
-                        LookupMergeFunction.wrap(DeduplicateMergeFunction.factory()),
+                        LookupMergeFunction.wrap(
+                                DeduplicateMergeFunction.factory(), null, null, null),
                         highLevel::get,
                         null,
                         LookupStrategy.from(false, true, false, false),
diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupMergeFunctionTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupMergeFunctionTest.java
index 03b201ca2f..71ac78259d 100644
--- a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupMergeFunctionTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupMergeFunctionTest.java
@@ -32,7 +32,9 @@ class LookupMergeFunctionTest {
     public void testKeepLowestHighLevel() {
         LookupMergeFunction function =
                 (LookupMergeFunction)
-                        LookupMergeFunction.wrap(DeduplicateMergeFunction.factory()).create();
+                        LookupMergeFunction.wrap(
+                                        DeduplicateMergeFunction.factory(), null, null, null)
+                                .create();
         function.reset();
         function.add(new KeyValue().replace(row(1), 1, INSERT, row(2)).setLevel(1));
         function.add(new KeyValue().replace(row(1), 1, INSERT, row(1)).setLevel(2));
@@ -45,7 +47,9 @@ class LookupMergeFunctionTest {
     public void testLevelNegative() {
         LookupMergeFunction function =
                 (LookupMergeFunction)
-                        LookupMergeFunction.wrap(DeduplicateMergeFunction.factory()).create();
+                        LookupMergeFunction.wrap(
+                                        DeduplicateMergeFunction.factory(), null, null, null)
+                                .create();
         function.reset();
         function.add(new KeyValue().replace(row(1), 1, INSERT, row(2)).setLevel(-1));
         function.add(new KeyValue().replace(row(1), 1, INSERT, row(1)).setLevel(-1));
diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupMergeFunctionUnitTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupMergeFunctionUnitTest.java
new file mode 100644
index 0000000000..57fb209c7d
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupMergeFunctionUnitTest.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.mergetree.compact;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.KeyValue;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.disk.IOManager;
+import org.apache.paimon.disk.IOManagerImpl;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import static java.util.Collections.singletonList;
+import static org.apache.paimon.CoreOptions.LOOKUP_MERGE_RECORDS_THRESHOLD;
+import static org.apache.paimon.io.DataFileTestUtils.row;
+import static org.apache.paimon.types.RowKind.INSERT;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Unit test for {@link LookupMergeFunction}. */
+public class LookupMergeFunctionUnitTest {
+
+    @TempDir java.nio.file.Path tempDir;
+
+    private IOManager ioManager;
+
+    @BeforeEach
+    public void beforeEach() {
+        ioManager = new IOManagerImpl(tempDir.toString());
+    }
+
+    @AfterEach
+    public void afterEach() throws Exception {
+        if (ioManager != null) {
+            ioManager.close();
+        }
+    }
+
+    // Test that the function correctly identifies when level 0 records are present
+    @Test
+    public void testContainLevel0() {
+        LookupMergeFunction function = create();
+        function.reset();
+        function.add(new KeyValue().replace(row(1), 1, INSERT, row(1)).setLevel(1));
+        assertThat(function.containLevel0()).isFalse();
+
+        function.add(new KeyValue().replace(row(1), 1, INSERT, row(2)).setLevel(0));
+        assertThat(function.containLevel0()).isTrue();
+    }
+
+    // Test that the function correctly picks the highest level record (lowest level number)
+    @Test
+    public void testPickHighLevel() {
+        LookupMergeFunction function = create();
+        function.reset();
+        function.add(new KeyValue().replace(row(1), 1, INSERT, row(3)).setLevel(3));
+        function.add(new KeyValue().replace(row(1), 1, INSERT, row(2)).setLevel(2));
+        function.add(new KeyValue().replace(row(1), 1, INSERT, row(1)).setLevel(1));
+
+        KeyValue highLevel = function.pickHighLevel();
+        assertThat(highLevel).isNotNull();
+        assertThat(highLevel.level()).isEqualTo(1);
+        assertThat(highLevel.value().getInt(0)).isEqualTo(1);
+    }
+
+    // Test that level 0 and negative level records are ignored when picking high level records
+    @Test
+    public void testPickHighLevelIgnoreLevel0AndNegative() {
+        LookupMergeFunction function = create();
+        function.reset();
+        function.add(new KeyValue().replace(row(1), 1, INSERT, row(4)).setLevel(0));
+        function.add(new KeyValue().replace(row(1), 1, INSERT, row(3)).setLevel(-1));
+        function.add(new KeyValue().replace(row(1), 1, INSERT, row(2)).setLevel(2));
+
+        KeyValue highLevel = function.pickHighLevel();
+        assertThat(highLevel).isNotNull();
+        assertThat(highLevel.level()).isEqualTo(2);
+        assertThat(highLevel.value().getInt(0)).isEqualTo(2);
+    }
+
+    // Test getResult with a mix of level records including level 0
+    @Test
+    public void testGetResultWithLevel0() {
+        LookupMergeFunction function = create();
+        function.reset();
+        function.add(new KeyValue().replace(row(1), 1, INSERT, row(1)).setLevel(1));
+        function.add(new KeyValue().replace(row(1), 1, INSERT, row(2)).setLevel(0));
+        function.add(new KeyValue().replace(row(1), 1, INSERT, row(3)).setLevel(2));
+
+        KeyValue result = function.getResult();
+        assertThat(result).isNotNull();
+        // Should merge level 0 and level 1 (highest level record)
+        assertThat(result.value().getInt(0)).isEqualTo(2); // Value from level 0
+    }
+
+    // Test getResult with only negative level records
+    @Test
+    public void testGetResultWithNegativeLevels() {
+        LookupMergeFunction function = create();
+        function.reset();
+        function.add(new KeyValue().replace(row(1), 1, INSERT, row(2)).setLevel(-1));
+        function.add(new KeyValue().replace(row(1), 1, INSERT, row(1)).setLevel(-1));
+
+        KeyValue result = function.getResult();
+        assertThat(result).isNotNull();
+        // Should merge all negative level records
+        assertThat(result.value().getInt(0)).isEqualTo(1); // Value from the last added record
+    }
+
+    // Test that the function correctly stores and returns the current key
+    @Test
+    public void testKey() {
+        LookupMergeFunction function = create();
+        function.reset();
+        BinaryRow key = row(1);
+        function.add(new KeyValue().replace(key, 1, INSERT, row(1)).setLevel(1));
+
+        BinaryRow currentKey = (BinaryRow) function.key();
+        assertThat(currentKey).isNotNull();
+        assertThat(currentKey.getInt(0)).isEqualTo(1);
+    }
+
+    @Test
+    public void testBinaryBuffer() {
+        innerTestBinaryBuffer(false);
+    }
+
+    @Test
+    public void testSpillBinaryBuffer() {
+        innerTestBinaryBuffer(true);
+    }
+
+    private void innerTestBinaryBuffer(boolean spill) {
+        Options options = new Options();
+        options.set(LOOKUP_MERGE_RECORDS_THRESHOLD, 3);
+        LookupMergeFunction function = create(options, spill ? ioManager : null);
+        function.reset();
+        for (int i = 5; i >= 0; i--) {
+            function.add(new KeyValue().replace(row(1), 1, INSERT, row(i)).setLevel(i));
+        }
+        KeyValue result = function.getResult();
+        assertThat(result).isNotNull();
+        assertThat(result.value().getInt(0)).isEqualTo(0);
+    }
+
+    private LookupMergeFunction create() {
+        return create(new Options(), null);
+    }
+
+    private LookupMergeFunction create(Options options, IOManager ioManager) {
+        LookupMergeFunction.Factory factory =
+                (LookupMergeFunction.Factory)
+                        LookupMergeFunction.wrap(
+                                DeduplicateMergeFunction.factory(),
+                                new CoreOptions(options),
+                                new RowType(singletonList(new DataField(0, "k", DataTypes.INT()))),
+                                new RowType(singletonList(new DataField(0, "v", DataTypes.INT()))));
+        factory.withIOManager(ioManager);
+        return (LookupMergeFunction) factory.create();
+    }
+}
diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/SortMergeReaderTestBase.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/SortMergeReaderTestBase.java
index 50eebd5007..83c9cd54eb 100644
--- a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/SortMergeReaderTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/SortMergeReaderTestBase.java
@@ -122,7 +122,8 @@ public abstract class SortMergeReaderTestBase extends CombiningRecordReaderTestB
 
         @Override
         protected MergeFunction<KeyValue> createMergeFunction() {
-            return new LookupMergeFunction(new FirstRowMergeFunction(false));
+            return new LookupMergeFunction(
+                    new FirstRowMergeFunction(false), null, null, null, null);
         }
     }
 }
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DeletionVectorITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DeletionVectorITCase.java
index a524c3633d..36e82a782c 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DeletionVectorITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DeletionVectorITCase.java
@@ -480,4 +480,18 @@ public class DeletionVectorITCase extends CatalogITCaseBase {
         assertThat(inExternalPath).contains("bucket-0/index-");
         assertThat(inExternalPath).doesNotContain("index/index-");
     }
+
+    @Test
+    public void testLookupMergeBufferSize() {
+        sql(
+                "CREATE TABLE T (id INT PRIMARY KEY NOT ENFORCED, name STRING) 
"
+                        + "WITH ('deletion-vectors.enabled' = 'true', 
'lookup.merge-records-threshold' = '2')");
+        for (int i = 0; i < 5; i++) {
+            sql(
+                    String.format(
+                            "INSERT INTO T /*+ OPTIONS('write-only' = '%s') */ 
VALUES (1, '%s')",
+                            i != 4, i));
+        }
+        assertThat(sql("SELECT * FROM T")).containsExactly(Row.of(1, 
String.valueOf(4)));
+    }
 }
