This is an automated email from the ASF dual-hosted git repository.
yuzelin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 3c3ba1c522 [core] Fix spillToBinary in KeyValueBuffer (#6586)
3c3ba1c522 is described below
commit 3c3ba1c52253d887017e84e79c10c060938c2c84
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Nov 12 13:43:38 2025 +0800
[core] Fix spillToBinary in KeyValueBuffer (#6586)
---
.../paimon/mergetree/compact/KeyValueBuffer.java | 31 +++++++--
.../mergetree/compact/LookupMergeFunction.java | 11 +--
.../mergetree/compact/KeyValueBufferTest.java | 81 +++++++++++++++-------
3 files changed, 84 insertions(+), 39 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/KeyValueBuffer.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/KeyValueBuffer.java
index 597e2847b5..7e40ddaa86 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/KeyValueBuffer.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/KeyValueBuffer.java
@@ -20,6 +20,7 @@ package org.apache.paimon.mergetree.compact;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.KeyValue;
+import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.data.serializer.InternalRowSerializer;
import org.apache.paimon.disk.ExternalBuffer;
import org.apache.paimon.disk.IOManager;
@@ -40,6 +41,7 @@ import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.NoSuchElementException;
+import java.util.function.Supplier;
/** A buffer to cache {@link KeyValue}s. */
public interface KeyValueBuffer {
@@ -65,6 +67,12 @@ public interface KeyValueBuffer {
this.lazyBinaryBuffer = lazyBinaryBuffer;
}
+ @Nullable
+ @VisibleForTesting
+ BinaryBuffer binaryBuffer() {
+ return binaryBuffer;
+ }
+
@Override
public void reset() {
listBuffer.reset();
@@ -89,7 +97,9 @@ public interface KeyValueBuffer {
private void spillToBinary() {
BinaryBuffer binaryBuffer = lazyBinaryBuffer.get();
try (CloseableIterator<KeyValue> iterator = listBuffer.iterator())
{
- binaryBuffer.put(iterator.next());
+ while (iterator.hasNext()) {
+ binaryBuffer.put(iterator.next());
+ }
} catch (Exception e) {
throw new RuntimeException(e);
}
@@ -162,20 +172,20 @@ public interface KeyValueBuffer {
return new CloseableIterator<KeyValue>() {
private boolean hasNextWasCalled = false;
- private boolean hasNext = false;
+ private boolean nextResult = false;
@Override
public boolean hasNext() {
if (!hasNextWasCalled) {
- hasNext = iterator.advanceNext();
+ nextResult = iterator.advanceNext();
hasNextWasCalled = true;
}
- return hasNext;
+ return nextResult;
}
@Override
public KeyValue next() {
- if (!hasNext) {
+ if (!hasNext()) {
throw new NoSuchElementException();
}
hasNextWasCalled = false;
@@ -215,6 +225,17 @@ public interface KeyValueBuffer {
return new BinaryBuffer(buffer, kvSerializer);
}
+ static HybridBuffer createHybridBuffer(
+ CoreOptions options,
+ RowType keyType,
+ RowType valueType,
+ @Nullable IOManager ioManager) {
+ Supplier<BinaryBuffer> binarySupplier =
+ () -> createBinaryBuffer(options, keyType, valueType,
ioManager);
+ int threshold = options == null ? 1024 :
options.lookupMergeRecordsThreshold();
+ return new HybridBuffer(threshold, new LazyField<>(binarySupplier));
+ }
+
static void insertInto(
KeyValueBuffer buffer, KeyValue highLevel, Comparator<KeyValue>
comparator) {
List<KeyValue> newCandidates = new ArrayList<>();
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeFunction.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeFunction.java
index bcd0692757..1bd9aaa843 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeFunction.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeFunction.java
@@ -22,17 +22,12 @@ import org.apache.paimon.CoreOptions;
import org.apache.paimon.KeyValue;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.disk.IOManager;
-import org.apache.paimon.mergetree.compact.KeyValueBuffer.BinaryBuffer;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.CloseableIterator;
-import org.apache.paimon.utils.LazyField;
import javax.annotation.Nullable;
import java.util.Comparator;
-import java.util.function.Supplier;
-
-import static
org.apache.paimon.mergetree.compact.KeyValueBuffer.createBinaryBuffer;
/**
* A {@link MergeFunction} for lookup, this wrapper only considers the latest
high level record,
@@ -54,11 +49,7 @@ public class LookupMergeFunction implements
MergeFunction<KeyValue> {
RowType valueType,
@Nullable IOManager ioManager) {
this.mergeFunction = mergeFunction;
- Supplier<BinaryBuffer> binarySupplier =
- () -> createBinaryBuffer(options, keyType, valueType,
ioManager);
- int threshold = options == null ? 1024 :
options.lookupMergeRecordsThreshold();
- this.candidates =
- new KeyValueBuffer.HybridBuffer(threshold, new
LazyField<>(binarySupplier));
+ this.candidates = KeyValueBuffer.createHybridBuffer(options, keyType,
valueType, ioManager);
}
@Override
diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/KeyValueBufferTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/KeyValueBufferTest.java
index e3f485f765..d04015b370 100644
--- a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/KeyValueBufferTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/KeyValueBufferTest.java
@@ -25,10 +25,11 @@ import org.apache.paimon.data.BinaryRowWriter;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.disk.IOManagerImpl;
import org.apache.paimon.mergetree.compact.KeyValueBuffer.BinaryBuffer;
+import org.apache.paimon.mergetree.compact.KeyValueBuffer.HybridBuffer;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.options.Options;
import org.apache.paimon.types.DataField;
-import org.apache.paimon.types.IntType;
+import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.CloseableIterator;
@@ -40,7 +41,10 @@ import org.junit.jupiter.api.io.TempDir;
import java.util.ArrayList;
import java.util.List;
+import java.util.NoSuchElementException;
+import static java.util.Collections.singletonList;
+import static org.apache.paimon.CoreOptions.LOOKUP_MERGE_RECORDS_THRESHOLD;
import static org.assertj.core.api.Assertions.assertThat;
/** Test for {@link KeyValueBuffer}. */
@@ -55,30 +59,14 @@ public class KeyValueBufferTest {
@BeforeEach
public void beforeEach() {
this.ioManager = new IOManagerImpl(tempDir.toString());
- this.keyType =
- new RowType(
- new ArrayList<DataField>() {
- {
- add(new DataField(0, "key", new IntType()));
- }
- });
- this.valueType =
- new RowType(
- new ArrayList<DataField>() {
- {
- add(new DataField(0, "value", new IntType()));
- }
- });
+ this.keyType = new RowType(singletonList(new DataField(0, "key",
DataTypes.INT())));
+ this.valueType = new RowType(singletonList(new DataField(0, "value",
DataTypes.INT())));
}
@AfterEach
- public void afterEach() {
+ public void afterEach() throws Exception {
if (ioManager != null) {
- try {
- ioManager.close();
- } catch (Exception e) {
- // Ignore exception during close
- }
+ ioManager.close();
}
}
@@ -114,10 +102,35 @@ public class KeyValueBufferTest {
BinaryBuffer binaryBuffer =
KeyValueBuffer.createBinaryBuffer(
new CoreOptions(options), keyType, valueType,
ioManager);
+ innerTestBuffer(binaryBuffer, 10);
+ }
+
+ @Test
+ public void testHybridBufferWithoutFallback() throws Exception {
+ innerTestHybridBuffer(false);
+ }
+
+ @Test
+ public void testHybridBufferWithFallback() throws Exception {
+ innerTestHybridBuffer(true);
+ }
+
+ private void innerTestHybridBuffer(boolean fallbackToBinary) throws
Exception {
+ Options options = new Options();
+ if (fallbackToBinary) {
+ options.set(LOOKUP_MERGE_RECORDS_THRESHOLD, 100);
+ }
+ HybridBuffer buffer =
+ KeyValueBuffer.createHybridBuffer(
+ new CoreOptions(options), keyType, valueType,
ioManager);
+ innerTestBuffer(buffer, 200);
+ assertThat(buffer.binaryBuffer() != null).isEqualTo(fallbackToBinary);
+ }
+ private void innerTestBuffer(KeyValueBuffer buffer, int recordNumber)
throws Exception {
// Create test data
List<KeyValue> testData = new ArrayList<>();
- for (int i = 0; i < 10; i++) {
+ for (int i = 0; i < recordNumber; i++) {
// Create key as BinaryRow
BinaryRow key = new BinaryRow(1);
BinaryRowWriter keyWriter = new BinaryRowWriter(key);
@@ -135,11 +148,11 @@ public class KeyValueBufferTest {
// Put data into buffer
for (KeyValue kv : testData) {
- binaryBuffer.put(kv);
+ buffer.put(kv);
}
// Verify data through iterator
- try (CloseableIterator<KeyValue> iterator = binaryBuffer.iterator()) {
+ try (CloseableIterator<KeyValue> iterator = buffer.iterator()) {
int count = 0;
while (iterator.hasNext()) {
KeyValue kv = iterator.next();
@@ -152,5 +165,25 @@ public class KeyValueBufferTest {
}
assertThat(count).isEqualTo(testData.size());
}
+
+ // Verify data through iterator without hasNext
+ try (CloseableIterator<KeyValue> iterator = buffer.iterator()) {
+ int count = 0;
+ while (true) {
+ KeyValue kv;
+ try {
+ kv = iterator.next();
+ } catch (NoSuchElementException e) {
+ break;
+ }
+ KeyValue expected = testData.get(count);
+
assertThat(kv.key().getInt(0)).isEqualTo(expected.key().getInt(0));
+
assertThat(kv.value().getInt(0)).isEqualTo(expected.value().getInt(0));
+
assertThat(kv.sequenceNumber()).isEqualTo(expected.sequenceNumber());
+ assertThat(kv.valueKind()).isEqualTo(expected.valueKind());
+ count++;
+ }
+ assertThat(count).isEqualTo(testData.size());
+ }
}
}