This is an automated email from the ASF dual-hosted git repository. yuzelin pushed a commit to branch release-1.3 in repository https://gitbox.apache.org/repos/asf/paimon.git
commit fff9bfe5bb7c1c2e80460a3e8746100b95c9991e Author: Jingsong Lee <[email protected]> AuthorDate: Wed Nov 12 13:43:38 2025 +0800 [core] Fix spillToBinary in KeyValueBuffer (#6586) (cherry picked from commit 3c3ba1c52253d887017e84e79c10c060938c2c84) --- .../paimon/mergetree/compact/KeyValueBuffer.java | 31 +++++++-- .../mergetree/compact/LookupMergeFunction.java | 11 +-- .../mergetree/compact/KeyValueBufferTest.java | 81 +++++++++++++++------- 3 files changed, 84 insertions(+), 39 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/KeyValueBuffer.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/KeyValueBuffer.java index 597e2847b5..7e40ddaa86 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/KeyValueBuffer.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/KeyValueBuffer.java @@ -20,6 +20,7 @@ package org.apache.paimon.mergetree.compact; import org.apache.paimon.CoreOptions; import org.apache.paimon.KeyValue; +import org.apache.paimon.annotation.VisibleForTesting; import org.apache.paimon.data.serializer.InternalRowSerializer; import org.apache.paimon.disk.ExternalBuffer; import org.apache.paimon.disk.IOManager; @@ -40,6 +41,7 @@ import java.util.ArrayList; import java.util.Comparator; import java.util.List; import java.util.NoSuchElementException; +import java.util.function.Supplier; /** A buffer to cache {@link KeyValue}s. */ public interface KeyValueBuffer { @@ -65,6 +67,12 @@ public interface KeyValueBuffer { this.lazyBinaryBuffer = lazyBinaryBuffer; } + @Nullable + @VisibleForTesting + BinaryBuffer binaryBuffer() { + return binaryBuffer; + } + @Override public void reset() { listBuffer.reset(); @@ -89,7 +97,9 @@ public interface KeyValueBuffer { private void spillToBinary() { BinaryBuffer binaryBuffer = lazyBinaryBuffer.get(); try (CloseableIterator<KeyValue> iterator = listBuffer.iterator()) { - binaryBuffer.put(iterator.next()); + while (iterator.hasNext()) { + binaryBuffer.put(iterator.next()); + } } catch (Exception e) { throw new RuntimeException(e); } @@ -162,20 +172,20 @@ public interface KeyValueBuffer { return new CloseableIterator<KeyValue>() { private boolean hasNextWasCalled = false; - private boolean hasNext = false; + private boolean nextResult = false; @Override public boolean hasNext() { if (!hasNextWasCalled) { - hasNext = iterator.advanceNext(); + nextResult = iterator.advanceNext(); hasNextWasCalled = true; } - return hasNext; + return nextResult; } @Override public KeyValue next() { - if (!hasNext) { + if (!hasNext()) { throw new NoSuchElementException(); } hasNextWasCalled = false; @@ -215,6 +225,17 @@ public interface KeyValueBuffer { return new BinaryBuffer(buffer, kvSerializer); } + static HybridBuffer createHybridBuffer( + CoreOptions options, + RowType keyType, + RowType valueType, + @Nullable IOManager ioManager) { + Supplier<BinaryBuffer> binarySupplier = + () -> createBinaryBuffer(options, keyType, valueType, ioManager); + int threshold = options == null ? 1024 : options.lookupMergeRecordsThreshold(); + return new HybridBuffer(threshold, new LazyField<>(binarySupplier)); + } + static void insertInto( KeyValueBuffer buffer, KeyValue highLevel, Comparator<KeyValue> comparator) { List<KeyValue> newCandidates = new ArrayList<>(); diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeFunction.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeFunction.java index bcd0692757..1bd9aaa843 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeFunction.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeFunction.java @@ -22,17 +22,12 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.KeyValue; import org.apache.paimon.data.InternalRow; import org.apache.paimon.disk.IOManager; -import org.apache.paimon.mergetree.compact.KeyValueBuffer.BinaryBuffer; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.CloseableIterator; -import org.apache.paimon.utils.LazyField; import javax.annotation.Nullable; import java.util.Comparator; -import java.util.function.Supplier; - -import static org.apache.paimon.mergetree.compact.KeyValueBuffer.createBinaryBuffer; /** * A {@link MergeFunction} for lookup, this wrapper only considers the latest high level record, @@ -54,11 +49,7 @@ public class LookupMergeFunction implements MergeFunction<KeyValue> { RowType valueType, @Nullable IOManager ioManager) { this.mergeFunction = mergeFunction; - Supplier<BinaryBuffer> binarySupplier = - () -> createBinaryBuffer(options, keyType, valueType, ioManager); - int threshold = options == null ? 1024 : options.lookupMergeRecordsThreshold(); - this.candidates = - new KeyValueBuffer.HybridBuffer(threshold, new LazyField<>(binarySupplier)); + this.candidates = KeyValueBuffer.createHybridBuffer(options, keyType, valueType, ioManager); } @Override diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/KeyValueBufferTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/KeyValueBufferTest.java index e3f485f765..d04015b370 100644 --- a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/KeyValueBufferTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/KeyValueBufferTest.java @@ -25,10 +25,11 @@ import org.apache.paimon.data.BinaryRowWriter; import org.apache.paimon.disk.IOManager; import org.apache.paimon.disk.IOManagerImpl; import org.apache.paimon.mergetree.compact.KeyValueBuffer.BinaryBuffer; +import org.apache.paimon.mergetree.compact.KeyValueBuffer.HybridBuffer; import org.apache.paimon.options.MemorySize; import org.apache.paimon.options.Options; import org.apache.paimon.types.DataField; -import org.apache.paimon.types.IntType; +import org.apache.paimon.types.DataTypes; import org.apache.paimon.types.RowKind; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.CloseableIterator; @@ -40,7 +41,10 @@ import org.junit.jupiter.api.io.TempDir; import java.util.ArrayList; import java.util.List; +import java.util.NoSuchElementException; +import static java.util.Collections.singletonList; +import static org.apache.paimon.CoreOptions.LOOKUP_MERGE_RECORDS_THRESHOLD; import static org.assertj.core.api.Assertions.assertThat; /** Test for {@link KeyValueBuffer}. */ @@ -55,30 +59,14 @@ public class KeyValueBufferTest { @BeforeEach public void beforeEach() { this.ioManager = new IOManagerImpl(tempDir.toString()); - this.keyType = - new RowType( - new ArrayList<DataField>() { - { - add(new DataField(0, "key", new IntType())); - } - }); - this.valueType = - new RowType( - new ArrayList<DataField>() { - { - add(new DataField(0, "value", new IntType())); - } - }); + this.keyType = new RowType(singletonList(new DataField(0, "key", DataTypes.INT()))); + this.valueType = new RowType(singletonList(new DataField(0, "value", DataTypes.INT()))); } @AfterEach - public void afterEach() { + public void afterEach() throws Exception { if (ioManager != null) { - try { - ioManager.close(); - } catch (Exception e) { - // Ignore exception during close - } + ioManager.close(); } } @@ -114,10 +102,35 @@ public class KeyValueBufferTest { BinaryBuffer binaryBuffer = KeyValueBuffer.createBinaryBuffer( new CoreOptions(options), keyType, valueType, ioManager); + innerTestBuffer(binaryBuffer, 10); + } + + @Test + public void testHybridBufferWithoutFallback() throws Exception { + innerTestHybridBuffer(false); + } + + @Test + public void testHybridBufferWithFallback() throws Exception { + innerTestHybridBuffer(true); + } + + private void innerTestHybridBuffer(boolean fallbackToBinary) throws Exception { + Options options = new Options(); + if (fallbackToBinary) { + options.set(LOOKUP_MERGE_RECORDS_THRESHOLD, 100); + } + HybridBuffer buffer = + KeyValueBuffer.createHybridBuffer( + new CoreOptions(options), keyType, valueType, ioManager); + innerTestBuffer(buffer, 200); + assertThat(buffer.binaryBuffer() != null).isEqualTo(fallbackToBinary); + } + private void innerTestBuffer(KeyValueBuffer buffer, int recordNumber) throws Exception { // Create test data List<KeyValue> testData = new ArrayList<>(); - for (int i = 0; i < 10; i++) { + for (int i = 0; i < recordNumber; i++) { // Create key as BinaryRow BinaryRow key = new BinaryRow(1); BinaryRowWriter keyWriter = new BinaryRowWriter(key); @@ -135,11 +148,11 @@ public class KeyValueBufferTest { // Put data into buffer for (KeyValue kv : testData) { - binaryBuffer.put(kv); + buffer.put(kv); } // Verify data through iterator - try (CloseableIterator<KeyValue> iterator = binaryBuffer.iterator()) { + try (CloseableIterator<KeyValue> iterator = buffer.iterator()) { int count = 0; while (iterator.hasNext()) { KeyValue kv = iterator.next(); @@ -152,5 +165,25 @@ public class KeyValueBufferTest { } assertThat(count).isEqualTo(testData.size()); } + + // Verify data through iterator without hasNext + try (CloseableIterator<KeyValue> iterator = buffer.iterator()) { + int count = 0; + while (true) { + KeyValue kv; + try { + kv = iterator.next(); + } catch (NoSuchElementException e) { + break; + } + KeyValue expected = testData.get(count); + assertThat(kv.key().getInt(0)).isEqualTo(expected.key().getInt(0)); + assertThat(kv.value().getInt(0)).isEqualTo(expected.value().getInt(0)); + assertThat(kv.sequenceNumber()).isEqualTo(expected.sequenceNumber()); + assertThat(kv.valueKind()).isEqualTo(expected.valueKind()); + count++; + } + assertThat(count).isEqualTo(testData.size()); + } } }
