This is an automated email from the ASF dual-hosted git repository.

yuzelin pushed a commit to branch release-1.3
in repository https://gitbox.apache.org/repos/asf/paimon.git

commit fff9bfe5bb7c1c2e80460a3e8746100b95c9991e
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Nov 12 13:43:38 2025 +0800

    [core] Fix spillToBinary in KeyValueBuffer (#6586)
    
    (cherry picked from commit 3c3ba1c52253d887017e84e79c10c060938c2c84)
---
 .../paimon/mergetree/compact/KeyValueBuffer.java   | 31 +++++++--
 .../mergetree/compact/LookupMergeFunction.java     | 11 +--
 .../mergetree/compact/KeyValueBufferTest.java      | 81 +++++++++++++++-------
 3 files changed, 84 insertions(+), 39 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/KeyValueBuffer.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/KeyValueBuffer.java
index 597e2847b5..7e40ddaa86 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/KeyValueBuffer.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/KeyValueBuffer.java
@@ -20,6 +20,7 @@ package org.apache.paimon.mergetree.compact;
 
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.KeyValue;
+import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.data.serializer.InternalRowSerializer;
 import org.apache.paimon.disk.ExternalBuffer;
 import org.apache.paimon.disk.IOManager;
@@ -40,6 +41,7 @@ import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.List;
 import java.util.NoSuchElementException;
+import java.util.function.Supplier;
 
 /** A buffer to cache {@link KeyValue}s. */
 public interface KeyValueBuffer {
@@ -65,6 +67,12 @@ public interface KeyValueBuffer {
             this.lazyBinaryBuffer = lazyBinaryBuffer;
         }
 
+        @Nullable
+        @VisibleForTesting
+        BinaryBuffer binaryBuffer() {
+            return binaryBuffer;
+        }
+
         @Override
         public void reset() {
             listBuffer.reset();
@@ -89,7 +97,9 @@ public interface KeyValueBuffer {
         private void spillToBinary() {
             BinaryBuffer binaryBuffer = lazyBinaryBuffer.get();
             try (CloseableIterator<KeyValue> iterator = listBuffer.iterator()) {
-                binaryBuffer.put(iterator.next());
+                while (iterator.hasNext()) {
+                    binaryBuffer.put(iterator.next());
+                }
             } catch (Exception e) {
                 throw new RuntimeException(e);
             }
@@ -162,20 +172,20 @@ public interface KeyValueBuffer {
             return new CloseableIterator<KeyValue>() {
 
                 private boolean hasNextWasCalled = false;
-                private boolean hasNext = false;
+                private boolean nextResult = false;
 
                 @Override
                 public boolean hasNext() {
                     if (!hasNextWasCalled) {
-                        hasNext = iterator.advanceNext();
+                        nextResult = iterator.advanceNext();
                         hasNextWasCalled = true;
                     }
-                    return hasNext;
+                    return nextResult;
                 }
 
                 @Override
                 public KeyValue next() {
-                    if (!hasNext) {
+                    if (!hasNext()) {
                         throw new NoSuchElementException();
                     }
                     hasNextWasCalled = false;
@@ -215,6 +225,17 @@ public interface KeyValueBuffer {
         return new BinaryBuffer(buffer, kvSerializer);
     }
 
+    static HybridBuffer createHybridBuffer(
+            CoreOptions options,
+            RowType keyType,
+            RowType valueType,
+            @Nullable IOManager ioManager) {
+        Supplier<BinaryBuffer> binarySupplier =
+                () -> createBinaryBuffer(options, keyType, valueType, ioManager);
+        int threshold = options == null ? 1024 : options.lookupMergeRecordsThreshold();
+        return new HybridBuffer(threshold, new LazyField<>(binarySupplier));
+    }
+
     static void insertInto(
             KeyValueBuffer buffer, KeyValue highLevel, Comparator<KeyValue> comparator) {
         List<KeyValue> newCandidates = new ArrayList<>();
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeFunction.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeFunction.java
index bcd0692757..1bd9aaa843 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeFunction.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeFunction.java
@@ -22,17 +22,12 @@ import org.apache.paimon.CoreOptions;
 import org.apache.paimon.KeyValue;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.disk.IOManager;
-import org.apache.paimon.mergetree.compact.KeyValueBuffer.BinaryBuffer;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.CloseableIterator;
-import org.apache.paimon.utils.LazyField;
 
 import javax.annotation.Nullable;
 
 import java.util.Comparator;
-import java.util.function.Supplier;
-
-import static org.apache.paimon.mergetree.compact.KeyValueBuffer.createBinaryBuffer;
 
 /**
  * A {@link MergeFunction} for lookup, this wrapper only considers the latest high level record,
@@ -54,11 +49,7 @@ public class LookupMergeFunction implements MergeFunction<KeyValue> {
             RowType valueType,
             @Nullable IOManager ioManager) {
         this.mergeFunction = mergeFunction;
-        Supplier<BinaryBuffer> binarySupplier =
-                () -> createBinaryBuffer(options, keyType, valueType, ioManager);
-        int threshold = options == null ? 1024 : options.lookupMergeRecordsThreshold();
-        this.candidates =
-                new KeyValueBuffer.HybridBuffer(threshold, new LazyField<>(binarySupplier));
+        this.candidates = KeyValueBuffer.createHybridBuffer(options, keyType, valueType, ioManager);
     }
 
     @Override
diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/KeyValueBufferTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/KeyValueBufferTest.java
index e3f485f765..d04015b370 100644
--- a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/KeyValueBufferTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/KeyValueBufferTest.java
@@ -25,10 +25,11 @@ import org.apache.paimon.data.BinaryRowWriter;
 import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.disk.IOManagerImpl;
 import org.apache.paimon.mergetree.compact.KeyValueBuffer.BinaryBuffer;
+import org.apache.paimon.mergetree.compact.KeyValueBuffer.HybridBuffer;
 import org.apache.paimon.options.MemorySize;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.types.DataField;
-import org.apache.paimon.types.IntType;
+import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowKind;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.CloseableIterator;
@@ -40,7 +41,10 @@ import org.junit.jupiter.api.io.TempDir;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.NoSuchElementException;
 
+import static java.util.Collections.singletonList;
+import static org.apache.paimon.CoreOptions.LOOKUP_MERGE_RECORDS_THRESHOLD;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Test for {@link KeyValueBuffer}. */
@@ -55,30 +59,14 @@ public class KeyValueBufferTest {
     @BeforeEach
     public void beforeEach() {
         this.ioManager = new IOManagerImpl(tempDir.toString());
-        this.keyType =
-                new RowType(
-                        new ArrayList<DataField>() {
-                            {
-                                add(new DataField(0, "key", new IntType()));
-                            }
-                        });
-        this.valueType =
-                new RowType(
-                        new ArrayList<DataField>() {
-                            {
-                                add(new DataField(0, "value", new IntType()));
-                            }
-                        });
+        this.keyType = new RowType(singletonList(new DataField(0, "key", DataTypes.INT())));
+        this.valueType = new RowType(singletonList(new DataField(0, "value", DataTypes.INT())));
     }
 
     @AfterEach
-    public void afterEach() {
+    public void afterEach() throws Exception {
         if (ioManager != null) {
-            try {
-                ioManager.close();
-            } catch (Exception e) {
-                // Ignore exception during close
-            }
+            ioManager.close();
         }
     }
 
@@ -114,10 +102,35 @@ public class KeyValueBufferTest {
         BinaryBuffer binaryBuffer =
                 KeyValueBuffer.createBinaryBuffer(
                         new CoreOptions(options), keyType, valueType, ioManager);
+        innerTestBuffer(binaryBuffer, 10);
+    }
+
+    @Test
+    public void testHybridBufferWithoutFallback() throws Exception {
+        innerTestHybridBuffer(false);
+    }
+
+    @Test
+    public void testHybridBufferWithFallback() throws Exception {
+        innerTestHybridBuffer(true);
+    }
+
+    private void innerTestHybridBuffer(boolean fallbackToBinary) throws Exception {
+        Options options = new Options();
+        if (fallbackToBinary) {
+            options.set(LOOKUP_MERGE_RECORDS_THRESHOLD, 100);
+        }
+        HybridBuffer buffer =
+                KeyValueBuffer.createHybridBuffer(
+                        new CoreOptions(options), keyType, valueType, ioManager);
+        innerTestBuffer(buffer, 200);
+        assertThat(buffer.binaryBuffer() != null).isEqualTo(fallbackToBinary);
+    }
 
+    private void innerTestBuffer(KeyValueBuffer buffer, int recordNumber) throws Exception {
         // Create test data
         List<KeyValue> testData = new ArrayList<>();
-        for (int i = 0; i < 10; i++) {
+        for (int i = 0; i < recordNumber; i++) {
             // Create key as BinaryRow
             BinaryRow key = new BinaryRow(1);
             BinaryRowWriter keyWriter = new BinaryRowWriter(key);
@@ -135,11 +148,11 @@ public class KeyValueBufferTest {
 
         // Put data into buffer
         for (KeyValue kv : testData) {
-            binaryBuffer.put(kv);
+            buffer.put(kv);
         }
 
         // Verify data through iterator
-        try (CloseableIterator<KeyValue> iterator = binaryBuffer.iterator()) {
+        try (CloseableIterator<KeyValue> iterator = buffer.iterator()) {
             int count = 0;
             while (iterator.hasNext()) {
                 KeyValue kv = iterator.next();
@@ -152,5 +165,25 @@ public class KeyValueBufferTest {
             }
             assertThat(count).isEqualTo(testData.size());
         }
+
+        // Verify data through iterator without hasNext
+        try (CloseableIterator<KeyValue> iterator = buffer.iterator()) {
+            int count = 0;
+            while (true) {
+                KeyValue kv;
+                try {
+                    kv = iterator.next();
+                } catch (NoSuchElementException e) {
+                    break;
+                }
+                KeyValue expected = testData.get(count);
+                assertThat(kv.key().getInt(0)).isEqualTo(expected.key().getInt(0));
+                assertThat(kv.value().getInt(0)).isEqualTo(expected.value().getInt(0));
+                assertThat(kv.sequenceNumber()).isEqualTo(expected.sequenceNumber());
+                assertThat(kv.valueKind()).isEqualTo(expected.valueKind());
+                count++;
+            }
+            assertThat(count).isEqualTo(testData.size());
+        }
     }
 }

Reply via email to