This is an automated email from the ASF dual-hosted git repository.
junhao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 6e0abf79f8 [core] Introduce external buffer in lookup merging (#6219)
6e0abf79f8 is described below
commit 6e0abf79f834aee3f624d142dd5fb25a1694d533
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Sep 9 14:51:48 2025 +0800
[core] Introduce external buffer in lookup merging (#6219)
---
.../shortcodes/generated/core_configuration.html | 12 +
.../main/java/org/apache/paimon/CoreOptions.java | 20 ++
.../java/org/apache/paimon/options/MemorySize.java | 4 +
.../apache/paimon/memory/UnlimitedSegmentPool.java | 52 +++++
.../paimon/crosspartition/GlobalIndexAssigner.java | 1 -
.../org/apache/paimon/disk/ExternalBuffer.java | 16 +-
.../org/apache/paimon/disk/InMemoryBuffer.java | 6 +-
.../java/org/apache/paimon/disk/RowBuffer.java | 2 -
.../paimon/mergetree/compact/KeyValueBuffer.java | 243 +++++++++++++++++++++
.../LookupChangelogMergeFunctionWrapper.java | 34 +--
.../mergetree/compact/LookupMergeFunction.java | 120 +++++++---
.../paimon/operation/KeyValueFileStoreWrite.java | 13 +-
.../paimon/operation/MergeFileSplitRead.java | 4 +
.../postpone/PostponeBucketFileStoreWrite.java | 11 +
.../paimon/table/PrimaryKeyFileStoreTable.java | 6 +-
.../java/org/apache/paimon/utils/SinkWriter.java | 1 -
.../org/apache/paimon/disk/ExternalBufferTest.java | 1 -
.../org/apache/paimon/disk/InMemoryBufferTest.java | 1 -
.../mergetree/SortBufferWriteBufferTestBase.java | 2 +-
.../compact/KeyValueBufferInsertIntoTest.java | 233 ++++++++++++++++++++
.../mergetree/compact/KeyValueBufferTest.java | 156 +++++++++++++
.../LookupChangelogMergeFunctionWrapperTest.java | 19 +-
.../mergetree/compact/LookupMergeFunctionTest.java | 8 +-
.../compact/LookupMergeFunctionUnitTest.java | 183 ++++++++++++++++
.../mergetree/compact/SortMergeReaderTestBase.java | 3 +-
.../apache/paimon/flink/DeletionVectorITCase.java | 14 ++
26 files changed, 1067 insertions(+), 98 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html
b/docs/layouts/shortcodes/generated/core_configuration.html
index 80f6d51046..81a4c9990f 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -633,6 +633,18 @@ Mainly to resolve data skew on primary keys. We recommend
starting with 64 mb wh
<td><p>Enum</p></td>
<td>The local file type for lookup.<br /><br />Possible
values:<ul><li>"sort": Construct a sorted file for lookup.</li><li>"hash":
Construct a hash file for lookup.</li></ul></td>
</tr>
+ <tr>
+ <td><h5>lookup.merge-buffer-size</h5></td>
+ <td style="word-wrap: break-word;">8 mb</td>
+ <td>MemorySize</td>
+ <td>Buffer memory size for one key merging in lookup.</td>
+ </tr>
+ <tr>
+ <td><h5>lookup.merge-records-threshold</h5></td>
+ <td style="word-wrap: break-word;">1024</td>
+ <td>Integer</td>
+ <td>Threshold for merging records to binary buffer in lookup.</td>
+ </tr>
<tr>
<td><h5>manifest.compression</h5></td>
<td style="word-wrap: break-word;">"zstd"</td>
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index 5d1b168100..914c250151 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1950,6 +1950,18 @@ public class CoreOptions implements Serializable {
.defaultValue(false)
.withDescription("Whether index file in data file
directory.");
+ public static final ConfigOption<MemorySize> LOOKUP_MERGE_BUFFER_SIZE =
+ key("lookup.merge-buffer-size")
+ .memoryType()
+ .defaultValue(MemorySize.VALUE_8_MB)
+ .withDescription("Buffer memory size for one key merging
in lookup.");
+
+ public static final ConfigOption<Integer> LOOKUP_MERGE_RECORDS_THRESHOLD =
+ key("lookup.merge-records-threshold")
+ .intType()
+ .defaultValue(1024)
+ .withDescription("Threshold for merging records to binary
buffer in lookup.");
+
private final Options options;
public CoreOptions(Map<String, String> options) {
@@ -2989,6 +3001,14 @@ public class CoreOptions implements Serializable {
}
}
+ public long lookupMergeBufferSize() {
+ return options.get(LOOKUP_MERGE_BUFFER_SIZE).getBytes();
+ }
+
+ public int lookupMergeRecordsThreshold() {
+ return options.get(LOOKUP_MERGE_RECORDS_THRESHOLD);
+ }
+
/** Specifies the merge engine for table with primary key. */
public enum MergeEngine implements DescribedEnum {
DEDUPLICATE("deduplicate", "De-duplicate and keep the last row."),
diff --git a/paimon-api/src/main/java/org/apache/paimon/options/MemorySize.java
b/paimon-api/src/main/java/org/apache/paimon/options/MemorySize.java
index 9b5d7d7ae4..3f041776a8 100644
--- a/paimon-api/src/main/java/org/apache/paimon/options/MemorySize.java
+++ b/paimon-api/src/main/java/org/apache/paimon/options/MemorySize.java
@@ -56,6 +56,10 @@ public class MemorySize implements java.io.Serializable,
Comparable<MemorySize>
public static final MemorySize MAX_VALUE = new MemorySize(Long.MAX_VALUE);
+ public static final MemorySize VALUE_32_KB = MemorySize.ofKibiBytes(32);
+
+ public static final MemorySize VALUE_8_MB = MemorySize.ofMebiBytes(8);
+
public static final MemorySize VALUE_128_MB = MemorySize.ofMebiBytes(128);
public static final MemorySize VALUE_256_MB = MemorySize.ofMebiBytes(256);
diff --git
a/paimon-common/src/main/java/org/apache/paimon/memory/UnlimitedSegmentPool.java
b/paimon-common/src/main/java/org/apache/paimon/memory/UnlimitedSegmentPool.java
new file mode 100644
index 0000000000..fb99cd987c
--- /dev/null
+++
b/paimon-common/src/main/java/org/apache/paimon/memory/UnlimitedSegmentPool.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.memory;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+
+/** MemorySegment pool from heap with no limit. */
+public class UnlimitedSegmentPool implements MemorySegmentPool {
+
+ private final int pageSize;
+
+ public UnlimitedSegmentPool(int pageSize) {
+ this.pageSize = pageSize;
+ }
+
+ @Override
+ public int pageSize() {
+ return pageSize;
+ }
+
+ @Override
+ public void returnAll(List<MemorySegment> memory) {}
+
+ @Override
+ public int freePages() {
+ return Integer.MAX_VALUE;
+ }
+
+ @Nullable
+ @Override
+ public MemorySegment nextSegment() {
+ return MemorySegment.allocateHeapMemory(pageSize);
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/crosspartition/GlobalIndexAssigner.java
b/paimon-core/src/main/java/org/apache/paimon/crosspartition/GlobalIndexAssigner.java
index fe14048310..cfda37ea83 100644
---
a/paimon-core/src/main/java/org/apache/paimon/crosspartition/GlobalIndexAssigner.java
+++
b/paimon-core/src/main/java/org/apache/paimon/crosspartition/GlobalIndexAssigner.java
@@ -207,7 +207,6 @@ public class GlobalIndexAssigner implements Serializable,
Closeable {
public CloseableIterator<BinaryRow> endBoostrapWithoutEmit(boolean
isEndInput)
throws Exception {
bootstrap = false;
- bootstrapRecords.complete();
boolean isEmpty = true;
if (bootstrapKeys.size() > 0) {
RocksDBBulkLoader bulkLoader = keyIndex.createBulkLoader();
diff --git
a/paimon-core/src/main/java/org/apache/paimon/disk/ExternalBuffer.java
b/paimon-core/src/main/java/org/apache/paimon/disk/ExternalBuffer.java
index 638b152802..138061cc6c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/disk/ExternalBuffer.java
+++ b/paimon-core/src/main/java/org/apache/paimon/disk/ExternalBuffer.java
@@ -38,8 +38,6 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
-import static org.apache.paimon.utils.Preconditions.checkState;
-
/** An external buffer for storing rows, it will spill the data to disk when
the memory is full. */
public class ExternalBuffer implements RowBuffer {
@@ -58,9 +56,7 @@ public class ExternalBuffer implements RowBuffer {
private final List<ChannelWithMeta> spilledChannelIDs;
private int numRows;
- private boolean addCompleted;
-
- ExternalBuffer(
+ public ExternalBuffer(
IOManager ioManager,
MemorySegmentPool pool,
AbstractRowDataSerializer<?> serializer,
@@ -83,8 +79,6 @@ public class ExternalBuffer implements RowBuffer {
this.numRows = 0;
- this.addCompleted = false;
-
//noinspection unchecked
this.inMemoryBuffer =
new InMemoryBuffer(pool,
(AbstractRowDataSerializer<InternalRow>) serializer);
@@ -95,7 +89,6 @@ public class ExternalBuffer implements RowBuffer {
clearChannels();
inMemoryBuffer.reset();
numRows = 0;
- addCompleted = false;
}
@Override
@@ -120,7 +113,6 @@ public class ExternalBuffer implements RowBuffer {
@Override
public boolean put(InternalRow row) throws IOException {
- checkState(!addCompleted, "This buffer has add completed.");
if (!inMemoryBuffer.put(row)) {
// Check if record is too big.
if (inMemoryBuffer.getCurrentDataBufferOffset() == 0) {
@@ -136,14 +128,8 @@ public class ExternalBuffer implements RowBuffer {
return true;
}
- @Override
- public void complete() {
- addCompleted = true;
- }
-
@Override
public RowBufferIterator newIterator() {
- checkState(addCompleted, "This buffer has not add completed.");
return new BufferIterator();
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/disk/InMemoryBuffer.java
b/paimon-core/src/main/java/org/apache/paimon/disk/InMemoryBuffer.java
index 83c4e423b0..b0c5bf413f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/disk/InMemoryBuffer.java
+++ b/paimon-core/src/main/java/org/apache/paimon/disk/InMemoryBuffer.java
@@ -50,7 +50,8 @@ public class InMemoryBuffer implements RowBuffer {
private boolean isInitialized;
- InMemoryBuffer(MemorySegmentPool pool,
AbstractRowDataSerializer<InternalRow> serializer) {
+ public InMemoryBuffer(
+ MemorySegmentPool pool, AbstractRowDataSerializer<InternalRow>
serializer) {
// serializer has states, so we must duplicate
this.serializer = (AbstractRowDataSerializer<InternalRow>)
serializer.duplicate();
this.pool = pool;
@@ -109,9 +110,6 @@ public class InMemoryBuffer implements RowBuffer {
return numRecords;
}
- @Override
- public void complete() {}
-
@Override
public long memoryOccupancy() {
return currentDataBufferOffset;
diff --git a/paimon-core/src/main/java/org/apache/paimon/disk/RowBuffer.java
b/paimon-core/src/main/java/org/apache/paimon/disk/RowBuffer.java
index 7a0e3e2d49..61bfa4ca06 100644
--- a/paimon-core/src/main/java/org/apache/paimon/disk/RowBuffer.java
+++ b/paimon-core/src/main/java/org/apache/paimon/disk/RowBuffer.java
@@ -37,8 +37,6 @@ public interface RowBuffer {
long memoryOccupancy();
- void complete();
-
void reset();
boolean flushMemory() throws IOException;
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/KeyValueBuffer.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/KeyValueBuffer.java
new file mode 100644
index 0000000000..597e2847b5
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/KeyValueBuffer.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.mergetree.compact;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.KeyValue;
+import org.apache.paimon.data.serializer.InternalRowSerializer;
+import org.apache.paimon.disk.ExternalBuffer;
+import org.apache.paimon.disk.IOManager;
+import org.apache.paimon.disk.InMemoryBuffer;
+import org.apache.paimon.disk.RowBuffer;
+import org.apache.paimon.memory.HeapMemorySegmentPool;
+import org.apache.paimon.memory.MemorySegmentPool;
+import org.apache.paimon.memory.UnlimitedSegmentPool;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.CloseableIterator;
+import org.apache.paimon.utils.KeyValueWithLevelNoReusingSerializer;
+import org.apache.paimon.utils.LazyField;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+/** A buffer to cache {@link KeyValue}s. */
+public interface KeyValueBuffer {
+
+ void reset();
+
+ void put(KeyValue kv);
+
+ CloseableIterator<KeyValue> iterator();
+
+ /** A {@link KeyValueBuffer} implemented by hybrid. */
+ class HybridBuffer implements KeyValueBuffer {
+
+ private final int threshold;
+ private final ListBuffer listBuffer;
+ private final LazyField<BinaryBuffer> lazyBinaryBuffer;
+
+ private @Nullable BinaryBuffer binaryBuffer;
+
+ public HybridBuffer(int threshold, LazyField<BinaryBuffer>
lazyBinaryBuffer) {
+ this.threshold = threshold;
+ this.listBuffer = new ListBuffer();
+ this.lazyBinaryBuffer = lazyBinaryBuffer;
+ }
+
+ @Override
+ public void reset() {
+ listBuffer.reset();
+ if (binaryBuffer != null) {
+ binaryBuffer.reset();
+ binaryBuffer = null;
+ }
+ }
+
+ @Override
+ public void put(KeyValue kv) {
+ if (binaryBuffer != null) {
+ binaryBuffer.put(kv);
+ } else {
+ listBuffer.put(kv);
+ if (listBuffer.list.size() > threshold) {
+ spillToBinary();
+ }
+ }
+ }
+
+ private void spillToBinary() {
+ BinaryBuffer binaryBuffer = lazyBinaryBuffer.get();
+ try (CloseableIterator<KeyValue> iterator = listBuffer.iterator())
{
+ binaryBuffer.put(iterator.next());
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ this.listBuffer.reset();
+ this.binaryBuffer = binaryBuffer;
+ }
+
+ @Override
+ public CloseableIterator<KeyValue> iterator() {
+ if (binaryBuffer != null) {
+ return binaryBuffer.iterator();
+ }
+ return listBuffer.iterator();
+ }
+ }
+
+ /** A {@link KeyValueBuffer} implemented by a list. */
+ class ListBuffer implements KeyValueBuffer {
+
+ private final List<KeyValue> list = new ArrayList<>();
+
+ @Override
+ public CloseableIterator<KeyValue> iterator() {
+ return CloseableIterator.adapterForIterator(list.iterator());
+ }
+
+ @Override
+ public void reset() {
+ list.clear();
+ }
+
+ @Override
+ public void put(KeyValue kv) {
+ list.add(kv);
+ }
+ }
+
+ /** A {@link KeyValueBuffer} implemented by binary with spilling. */
+ class BinaryBuffer implements KeyValueBuffer {
+
+ private final RowBuffer buffer;
+ private final KeyValueWithLevelNoReusingSerializer kvSerializer;
+
+ public BinaryBuffer(RowBuffer buffer,
KeyValueWithLevelNoReusingSerializer kvSerializer) {
+ this.buffer = buffer;
+ this.kvSerializer = kvSerializer;
+ }
+
+ @Override
+ public void reset() {
+ buffer.reset();
+ }
+
+ @Override
+ public void put(KeyValue kv) {
+ try {
+ boolean success = buffer.put(kvSerializer.toRow(kv));
+ if (!success) {
+ throw new RuntimeException("This is a bug!");
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public CloseableIterator<KeyValue> iterator() {
+ @SuppressWarnings("resource")
+ RowBuffer.RowBufferIterator iterator = buffer.newIterator();
+ return new CloseableIterator<KeyValue>() {
+
+ private boolean hasNextWasCalled = false;
+ private boolean hasNext = false;
+
+ @Override
+ public boolean hasNext() {
+ if (!hasNextWasCalled) {
+ hasNext = iterator.advanceNext();
+ hasNextWasCalled = true;
+ }
+ return hasNext;
+ }
+
+ @Override
+ public KeyValue next() {
+ if (!hasNext) {
+ throw new NoSuchElementException();
+ }
+ hasNextWasCalled = false;
+ return kvSerializer.fromRow(iterator.getRow().copy());
+ }
+
+ @Override
+ public void close() {
+ iterator.close();
+ }
+ };
+ }
+ }
+
+ static BinaryBuffer createBinaryBuffer(
+ CoreOptions options,
+ RowType keyType,
+ RowType valueType,
+ @Nullable IOManager ioManager) {
+ KeyValueWithLevelNoReusingSerializer kvSerializer =
+ new KeyValueWithLevelNoReusingSerializer(keyType, valueType);
+ MemorySegmentPool pool =
+ ioManager == null
+ ? new UnlimitedSegmentPool(options.pageSize())
+ : new HeapMemorySegmentPool(
+ options.lookupMergeBufferSize(),
options.pageSize());
+ InternalRowSerializer serializer = new
InternalRowSerializer(kvSerializer.fieldTypes());
+ RowBuffer buffer =
+ ioManager == null
+ ? new InMemoryBuffer(pool, serializer)
+ : new ExternalBuffer(
+ ioManager,
+ pool,
+ serializer,
+ options.writeBufferSpillDiskSize(),
+ options.spillCompressOptions());
+ return new BinaryBuffer(buffer, kvSerializer);
+ }
+
+ static void insertInto(
+ KeyValueBuffer buffer, KeyValue highLevel, Comparator<KeyValue>
comparator) {
+ List<KeyValue> newCandidates = new ArrayList<>();
+ try (CloseableIterator<KeyValue> iterator = buffer.iterator()) {
+ while (iterator.hasNext()) {
+ KeyValue candidate = iterator.next();
+ if (highLevel != null && comparator.compare(highLevel,
candidate) < 0) {
+ newCandidates.add(highLevel);
+ newCandidates.add(candidate);
+ highLevel = null;
+ } else {
+ newCandidates.add(candidate);
+ }
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ if (highLevel != null) {
+ newCandidates.add(highLevel);
+ }
+ buffer.reset();
+ for (KeyValue kv : newCandidates) {
+ buffer.put(kv);
+ }
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapper.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapper.java
index 6c9376ae33..3fccca9b5a 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapper.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapper.java
@@ -30,10 +30,7 @@ import org.apache.paimon.utils.UserDefinedSeqComparator;
import javax.annotation.Nullable;
-import java.util.ArrayList;
import java.util.Comparator;
-import java.util.LinkedList;
-import java.util.List;
import java.util.function.Function;
import static org.apache.paimon.utils.Preconditions.checkArgument;
@@ -106,7 +103,7 @@ public class LookupChangelogMergeFunctionWrapper<T>
public ChangelogResult getResult() {
// 1. Find the latest high level record and compute containLevel0
KeyValue highLevel = mergeFunction.pickHighLevel();
- boolean containLevel0 = containLevel0();
+ boolean containLevel0 = mergeFunction.containLevel0();
// 2. Lookup if latest high level record is absent
if (highLevel == null) {
@@ -122,7 +119,7 @@ public class LookupChangelogMergeFunctionWrapper<T>
}
}
if (highLevel != null) {
- insertInto(mergeFunction.candidates(), highLevel);
+ mergeFunction.insertInto(highLevel, comparator);
}
}
@@ -138,33 +135,6 @@ public class LookupChangelogMergeFunctionWrapper<T>
return reusedResult.setResult(result);
}
- public boolean containLevel0() {
- for (KeyValue kv : mergeFunction.candidates()) {
- if (kv.level() == 0) {
- return true;
- }
- }
- return false;
- }
-
- private void insertInto(LinkedList<KeyValue> candidates, KeyValue
highLevel) {
- List<KeyValue> newCandidates = new ArrayList<>();
- for (KeyValue candidate : candidates) {
- if (highLevel != null && comparator.compare(highLevel, candidate)
< 0) {
- newCandidates.add(highLevel);
- newCandidates.add(candidate);
- highLevel = null;
- } else {
- newCandidates.add(candidate);
- }
- }
- if (highLevel != null) {
- newCandidates.add(highLevel);
- }
- candidates.clear();
- candidates.addAll(newCandidates);
- }
-
private void setChangelog(@Nullable KeyValue before, KeyValue after) {
if (before == null || !before.isAdd()) {
if (after.isAdd()) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeFunction.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeFunction.java
index 1537677ffc..bcd0692757 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeFunction.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeFunction.java
@@ -18,12 +18,21 @@
package org.apache.paimon.mergetree.compact;
+import org.apache.paimon.CoreOptions;
import org.apache.paimon.KeyValue;
import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.disk.IOManager;
+import org.apache.paimon.mergetree.compact.KeyValueBuffer.BinaryBuffer;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.CloseableIterator;
+import org.apache.paimon.utils.LazyField;
import javax.annotation.Nullable;
-import java.util.LinkedList;
+import java.util.Comparator;
+import java.util.function.Supplier;
+
+import static
org.apache.paimon.mergetree.compact.KeyValueBuffer.createBinaryBuffer;
/**
* A {@link MergeFunction} for lookup, this wrapper only considers the latest
high level record,
@@ -33,58 +42,91 @@ import java.util.LinkedList;
public class LookupMergeFunction implements MergeFunction<KeyValue> {
private final MergeFunction<KeyValue> mergeFunction;
- private final LinkedList<KeyValue> candidates = new LinkedList<>();
- public LookupMergeFunction(MergeFunction<KeyValue> mergeFunction) {
+ private final KeyValueBuffer candidates;
+ private boolean containLevel0;
+ private InternalRow currentKey;
+
+ public LookupMergeFunction(
+ MergeFunction<KeyValue> mergeFunction,
+ CoreOptions options,
+ RowType keyType,
+ RowType valueType,
+ @Nullable IOManager ioManager) {
this.mergeFunction = mergeFunction;
+ Supplier<BinaryBuffer> binarySupplier =
+ () -> createBinaryBuffer(options, keyType, valueType,
ioManager);
+ int threshold = options == null ? 1024 :
options.lookupMergeRecordsThreshold();
+ this.candidates =
+ new KeyValueBuffer.HybridBuffer(threshold, new
LazyField<>(binarySupplier));
}
@Override
public void reset() {
- candidates.clear();
+ candidates.reset();
+ currentKey = null;
+ containLevel0 = false;
}
@Override
public void add(KeyValue kv) {
- candidates.add(kv);
+ currentKey = kv.key();
+ if (kv.level() == 0) {
+ containLevel0 = true;
+ }
+ candidates.put(kv);
+ }
+
+ public boolean containLevel0() {
+ return containLevel0;
}
@Nullable
public KeyValue pickHighLevel() {
KeyValue highLevel = null;
- for (KeyValue kv : candidates) {
- // records that has not been stored on the disk yet, such as the
data in the write
- // buffer being at level -1
- if (kv.level() <= 0) {
- continue;
- }
- // For high-level comparison logic (not involving Level 0), only
the value of the
- // minimum Level should be selected
- if (highLevel == null || kv.level() < highLevel.level()) {
- highLevel = kv;
+ try (CloseableIterator<KeyValue> iterator = candidates.iterator()) {
+ while (iterator.hasNext()) {
+ KeyValue kv = iterator.next();
+ // records that has not been stored on the disk yet, such as
the data in the write
+ // buffer being at level -1
+ if (kv.level() <= 0) {
+ continue;
+ }
+ // For high-level comparison logic (not involving Level 0),
only the value of the
+ // minimum Level should be selected
+ if (highLevel == null || kv.level() < highLevel.level()) {
+ highLevel = kv;
+ }
}
+ } catch (Exception e) {
+ throw new RuntimeException(e);
}
return highLevel;
}
public InternalRow key() {
- return candidates.get(0).key();
+ return currentKey;
}
- public LinkedList<KeyValue> candidates() {
- return candidates;
+ public void insertInto(KeyValue highLevel, Comparator<KeyValue>
comparator) {
+ KeyValueBuffer.insertInto(candidates, highLevel, comparator);
}
@Override
public KeyValue getResult() {
mergeFunction.reset();
KeyValue highLevel = pickHighLevel();
- for (KeyValue kv : candidates) {
- // records that has not been stored on the disk yet, such as the
data in the write
- // buffer being at level -1
- if (kv.level() <= 0 || kv == highLevel) {
- mergeFunction.add(kv);
+ try (CloseableIterator<KeyValue> iterator = candidates.iterator()) {
+ while (iterator.hasNext()) {
+ KeyValue kv = iterator.next();
+ // records that has not been stored on the disk yet, such as
the data in the write
+ // buffer being at level -1
+ if (kv.level() <= 0 || kv == highLevel) {
+ mergeFunction.add(kv);
+ }
}
+ } catch (Exception e) {
+ throw new RuntimeException(e);
}
return mergeFunction.getResult();
}
@@ -94,28 +136,50 @@ public class LookupMergeFunction implements
MergeFunction<KeyValue> {
return true;
}
- public static MergeFunctionFactory<KeyValue>
wrap(MergeFunctionFactory<KeyValue> wrapped) {
+ public static MergeFunctionFactory<KeyValue> wrap(
+ MergeFunctionFactory<KeyValue> wrapped,
+ CoreOptions options,
+ RowType keyType,
+ RowType valueType) {
if (wrapped.create() instanceof FirstRowMergeFunction) {
// don't wrap first row, it is already OK
return wrapped;
}
- return new Factory(wrapped);
+ return new Factory(wrapped, options, keyType, valueType);
}
- private static class Factory implements MergeFunctionFactory<KeyValue> {
+ /** Factory to create {@link LookupMergeFunction}. */
+ public static class Factory implements MergeFunctionFactory<KeyValue> {
private static final long serialVersionUID = 1L;
private final MergeFunctionFactory<KeyValue> wrapped;
+ private final CoreOptions options;
+ private final RowType keyType;
+ private final RowType valueType;
+
+ private @Nullable IOManager ioManager;
- private Factory(MergeFunctionFactory<KeyValue> wrapped) {
+ private Factory(
+ MergeFunctionFactory<KeyValue> wrapped,
+ CoreOptions options,
+ RowType keyType,
+ RowType valueType) {
this.wrapped = wrapped;
+ this.options = options;
+ this.keyType = keyType;
+ this.valueType = valueType;
+ }
+
+ public void withIOManager(@Nullable IOManager ioManager) {
+ this.ioManager = ioManager;
}
@Override
public MergeFunction<KeyValue> create(@Nullable int[][] projection) {
- return new LookupMergeFunction(wrapped.create(projection));
+ return new LookupMergeFunction(
+ wrapped.create(projection), options, keyType, valueType,
ioManager);
}
@Override
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
index afd3fb906f..24b08f06c1 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
@@ -31,6 +31,7 @@ import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.serializer.RowCompactedSerializer;
import org.apache.paimon.deletionvectors.BucketedDvMaintainer;
import org.apache.paimon.deletionvectors.DeletionVector;
+import org.apache.paimon.disk.IOManager;
import org.apache.paimon.format.FileFormatDiscover;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.index.DynamicBucketIndexMaintainer;
@@ -54,6 +55,7 @@ import org.apache.paimon.mergetree.compact.CompactStrategy;
import org.apache.paimon.mergetree.compact.ForceUpLevel0Compaction;
import
org.apache.paimon.mergetree.compact.FullChangelogMergeTreeCompactRewriter;
import org.apache.paimon.mergetree.compact.FullCompactTrigger;
+import org.apache.paimon.mergetree.compact.LookupMergeFunction;
import org.apache.paimon.mergetree.compact.LookupMergeTreeCompactRewriter;
import
org.apache.paimon.mergetree.compact.LookupMergeTreeCompactRewriter.FirstRowMergeFunctionWrapperFactory;
import
org.apache.paimon.mergetree.compact.LookupMergeTreeCompactRewriter.LookupMergeFunctionWrapperFactory;
@@ -106,7 +108,6 @@ public class KeyValueFileStoreWrite extends
MemoryFileStoreWrite<KeyValue> {
private final Supplier<RecordEqualiser> logDedupEqualSupplier;
private final MergeFunctionFactory<KeyValue> mfFactory;
private final CoreOptions options;
- private final FileIO fileIO;
private final RowType keyType;
private final RowType valueType;
private final RowType partitionType;
@@ -143,7 +144,6 @@ public class KeyValueFileStoreWrite extends
MemoryFileStoreWrite<KeyValue> {
dbMaintainerFactory,
dvMaintainerFactory,
tableName);
- this.fileIO = fileIO;
this.partitionType = partitionType;
this.keyType = keyType;
this.valueType = valueType;
@@ -177,6 +177,15 @@ public class KeyValueFileStoreWrite extends
MemoryFileStoreWrite<KeyValue> {
this.options = options;
}
+ @Override
+ public KeyValueFileStoreWrite withIOManager(IOManager ioManager) {
+ super.withIOManager(ioManager);
+ if (mfFactory instanceof LookupMergeFunction.Factory) {
+ ((LookupMergeFunction.Factory) mfFactory).withIOManager(ioManager);
+ }
+ return this;
+ }
+
@Override
protected MergeTreeWriter createWriter(
BinaryRow partition,
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/MergeFileSplitRead.java
b/paimon-core/src/main/java/org/apache/paimon/operation/MergeFileSplitRead.java
index 07c9215f7e..08344c08b5 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/MergeFileSplitRead.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/MergeFileSplitRead.java
@@ -33,6 +33,7 @@ import org.apache.paimon.mergetree.MergeTreeReaders;
import org.apache.paimon.mergetree.SortedRun;
import org.apache.paimon.mergetree.compact.ConcatRecordReader;
import org.apache.paimon.mergetree.compact.IntervalPartition;
+import org.apache.paimon.mergetree.compact.LookupMergeFunction;
import org.apache.paimon.mergetree.compact.MergeFunctionFactory;
import
org.apache.paimon.mergetree.compact.MergeFunctionFactory.AdjustedProjection;
import org.apache.paimon.mergetree.compact.MergeFunctionWrapper;
@@ -186,6 +187,9 @@ public class MergeFileSplitRead implements
SplitRead<KeyValue> {
@Override
public MergeFileSplitRead withIOManager(IOManager ioManager) {
this.mergeSorter.setIOManager(ioManager);
+ if (mfFactory instanceof LookupMergeFunction.Factory) {
+ ((LookupMergeFunction.Factory) mfFactory).withIOManager(ioManager);
+ }
return this;
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/postpone/PostponeBucketFileStoreWrite.java
b/paimon-core/src/main/java/org/apache/paimon/postpone/PostponeBucketFileStoreWrite.java
index 4978ec6b3d..a6ba8febe5 100644
---
a/paimon-core/src/main/java/org/apache/paimon/postpone/PostponeBucketFileStoreWrite.java
+++
b/paimon-core/src/main/java/org/apache/paimon/postpone/PostponeBucketFileStoreWrite.java
@@ -22,6 +22,7 @@ import org.apache.paimon.CoreOptions;
import org.apache.paimon.KeyValue;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.deletionvectors.BucketedDvMaintainer;
+import org.apache.paimon.disk.IOManager;
import org.apache.paimon.format.FileFormat;
import org.apache.paimon.format.avro.AvroSchemaConverter;
import org.apache.paimon.fs.FileIO;
@@ -29,6 +30,7 @@ import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.KeyValueFileReaderFactory;
import org.apache.paimon.io.KeyValueFileWriterFactory;
import org.apache.paimon.mergetree.compact.ConcatRecordReader;
+import org.apache.paimon.mergetree.compact.LookupMergeFunction;
import org.apache.paimon.mergetree.compact.MergeFunctionFactory;
import org.apache.paimon.operation.FileStoreScan;
import org.apache.paimon.operation.FileStoreWrite;
@@ -147,6 +149,15 @@ public class PostponeBucketFileStoreWrite extends
MemoryFileStoreWrite<KeyValue>
withIgnorePreviousFiles(true);
}
+ @Override
+ public PostponeBucketFileStoreWrite withIOManager(IOManager ioManager) {
+ super.withIOManager(ioManager);
+ if (mfFactory instanceof LookupMergeFunction.Factory) {
+ ((LookupMergeFunction.Factory) mfFactory).withIOManager(ioManager);
+ }
+ return this;
+ }
+
@Override
protected void forceBufferSpill() throws Exception {
if (ioManager == null) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
index 4901638b65..b17f307f8d 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
@@ -76,10 +76,12 @@ public class PrimaryKeyFileStoreTable extends
AbstractFileStoreTable {
KeyValueFieldsExtractor extractor =
PrimaryKeyTableUtils.PrimaryKeyFieldsExtractor.EXTRACTOR;
+ RowType keyType = new RowType(extractor.keyFields(tableSchema));
+
MergeFunctionFactory<KeyValue> mfFactory =
PrimaryKeyTableUtils.createMergeFunctionFactory(tableSchema, extractor);
if (options.needLookup()) {
- mfFactory = LookupMergeFunction.wrap(mfFactory);
+ mfFactory = LookupMergeFunction.wrap(mfFactory, options, keyType, rowType);
}
lazyStore =
@@ -92,7 +94,7 @@ public class PrimaryKeyFileStoreTable extends
AbstractFileStoreTable {
tableSchema.logicalPartitionType(),
PrimaryKeyTableUtils.addKeyNamePrefix(
tableSchema.logicalBucketKeyType()),
- new RowType(extractor.keyFields(tableSchema)),
+ keyType,
rowType,
extractor,
mfFactory,
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/SinkWriter.java
b/paimon-core/src/main/java/org/apache/paimon/utils/SinkWriter.java
index c4596a1df4..439eea707c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/SinkWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/SinkWriter.java
@@ -172,7 +172,6 @@ public interface SinkWriter<T> {
public List<DataFileMeta> flush() throws IOException {
List<DataFileMeta> flushedFiles = new ArrayList<>();
if (writeBuffer != null) {
- writeBuffer.complete();
RollingFileWriter<T, DataFileMeta> writer =
writerSupplier.get();
IOException exception = null;
try (RowBuffer.RowBufferIterator iterator =
writeBuffer.newIterator()) {
diff --git
a/paimon-core/src/test/java/org/apache/paimon/disk/ExternalBufferTest.java
b/paimon-core/src/test/java/org/apache/paimon/disk/ExternalBufferTest.java
index 130cf32e92..6265258d10 100644
--- a/paimon-core/src/test/java/org/apache/paimon/disk/ExternalBufferTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/disk/ExternalBufferTest.java
@@ -214,7 +214,6 @@ public class ExternalBufferTest {
private List<Long> insertMulti(ExternalBuffer buffer, int cnt) throws
IOException {
ArrayList<Long> expected = new ArrayList<>(cnt);
insertMulti(buffer, cnt, expected);
- buffer.complete();
return expected;
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/disk/InMemoryBufferTest.java
b/paimon-core/src/test/java/org/apache/paimon/disk/InMemoryBufferTest.java
index 039be29398..ad84ae2f8c 100644
--- a/paimon-core/src/test/java/org/apache/paimon/disk/InMemoryBufferTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/disk/InMemoryBufferTest.java
@@ -198,7 +198,6 @@ public class InMemoryBufferTest {
@Override
public void flushMemory() {
- inMemoryBuffer.complete();
// emulate real-world flushing data to disk, we need to call
newIterator method
inMemoryBuffer.newIterator();
inMemoryBuffer.reset();
diff --git
a/paimon-core/src/test/java/org/apache/paimon/mergetree/SortBufferWriteBufferTestBase.java
b/paimon-core/src/test/java/org/apache/paimon/mergetree/SortBufferWriteBufferTestBase.java
index ee34ac2e71..19af07dda6 100644
---
a/paimon-core/src/test/java/org/apache/paimon/mergetree/SortBufferWriteBufferTestBase.java
+++
b/paimon-core/src/test/java/org/apache/paimon/mergetree/SortBufferWriteBufferTestBase.java
@@ -266,7 +266,7 @@ public abstract class SortBufferWriteBufferTestBase {
Arrays.asList("f0", "f1"),
Arrays.asList(DataTypes.INT().notNull(),
DataTypes.BIGINT()),
Collections.singletonList("f0"));
- return LookupMergeFunction.wrap(aggMergeFunction).create();
+ return LookupMergeFunction.wrap(aggMergeFunction, null, null, null).create();
}
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/KeyValueBufferInsertIntoTest.java
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/KeyValueBufferInsertIntoTest.java
new file mode 100644
index 0000000000..cf85164d95
--- /dev/null
+++
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/KeyValueBufferInsertIntoTest.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.mergetree.compact;
+
+import org.apache.paimon.KeyValue;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BinaryRowWriter;
+import org.apache.paimon.data.serializer.InternalRowSerializer;
+import org.apache.paimon.disk.InMemoryBuffer;
+import org.apache.paimon.disk.RowBuffer;
+import org.apache.paimon.memory.HeapMemorySegmentPool;
+import org.apache.paimon.mergetree.compact.KeyValueBuffer.BinaryBuffer;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.IntType;
+import org.apache.paimon.types.RowKind;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.CloseableIterator;
+import org.apache.paimon.utils.KeyValueWithLevelNoReusingSerializer;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link KeyValueBuffer#insertInto(KeyValueBuffer, KeyValue, Comparator)}. */
+public class KeyValueBufferInsertIntoTest {
+
+ private RowType keyType;
+ private RowType valueType;
+
+ @BeforeEach
+ public void beforeEach() {
+ this.keyType =
+ new RowType(
+ new ArrayList<DataField>() {
+ {
+ add(new DataField(0, "key", new IntType()));
+ }
+ });
+ this.valueType =
+ new RowType(
+ new ArrayList<DataField>() {
+ {
+ add(new DataField(0, "value", new IntType()));
+ }
+ });
+ }
+
+ private KeyValueBuffer createBuffer() {
+ ThreadLocalRandom rnd = ThreadLocalRandom.current();
+ KeyValueWithLevelNoReusingSerializer kvSerializer =
+ new KeyValueWithLevelNoReusingSerializer(keyType, valueType);
+ RowBuffer rowBuffer =
+ new InMemoryBuffer(
+ new HeapMemorySegmentPool(8 * 1024, 1024),
+ new InternalRowSerializer(kvSerializer.fieldTypes()));
+        // NOTE(review): the recursive `createBuffer()` call here could only terminate by
+        // eventually taking the BinaryBuffer branch, so the random choice never exercised a
+        // second implementation; construct the list-backed buffer on the other branch instead
+        // — TODO confirm ListBuffer's name/constructor against KeyValueBuffer in this commit.
+        return rnd.nextBoolean()
+                ? new KeyValueBuffer.ListBuffer()
+                : new BinaryBuffer(rowBuffer, kvSerializer);
+ }
+
+ /**
+ * Test insertInto method with an empty buffer. It should add the
highLevel KeyValue to the
+ * buffer.
+ */
+ @Test
+ public void testInsertIntoEmptyBuffer() {
+ KeyValueBuffer buffer = createBuffer();
+ KeyValue highLevel = createKeyValue(1, 10);
+ Comparator<KeyValue> comparator = Comparator.comparingInt(kv ->
kv.key().getInt(0));
+
+ KeyValueBuffer.insertInto(buffer, highLevel, comparator);
+
+ List<KeyValue> result = collectKeyValues(buffer);
+ assertThat(result).hasSize(1);
+ assertKeyValueEquals(result.get(0), highLevel);
+ }
+
+ /**
+ * Test insertInto method when highLevel should be inserted at the
beginning. The highLevel
+ * KeyValue has a smaller key than all existing entries.
+ */
+ @Test
+ public void testInsertIntoBeginning() {
+ KeyValueBuffer buffer = createBuffer();
+ // Add existing entries to buffer
+ buffer.put(createKeyValue(3, 30));
+ buffer.put(createKeyValue(5, 50));
+
+ KeyValue highLevel = createKeyValue(1, 10);
+ Comparator<KeyValue> comparator = Comparator.comparingInt(kv ->
kv.key().getInt(0));
+
+ KeyValueBuffer.insertInto(buffer, highLevel, comparator);
+
+ List<KeyValue> result = collectKeyValues(buffer);
+ assertThat(result).hasSize(3);
+ assertKeyValueEquals(result.get(0), highLevel); // Should be first
+ assertKeyValueEquals(result.get(1), createKeyValue(3, 30));
+ assertKeyValueEquals(result.get(2), createKeyValue(5, 50));
+ }
+
+ /**
+ * Test insertInto method when highLevel should be inserted in the middle.
The highLevel
+ * KeyValue has a key that fits between existing entries.
+ */
+ @Test
+ public void testInsertIntoMiddle() {
+ KeyValueBuffer buffer = createBuffer();
+ // Add existing entries to buffer
+ buffer.put(createKeyValue(1, 10));
+ buffer.put(createKeyValue(5, 50));
+
+ KeyValue highLevel = createKeyValue(3, 30);
+ Comparator<KeyValue> comparator = Comparator.comparingInt(kv ->
kv.key().getInt(0));
+
+ KeyValueBuffer.insertInto(buffer, highLevel, comparator);
+
+ List<KeyValue> result = collectKeyValues(buffer);
+ assertThat(result).hasSize(3);
+ assertKeyValueEquals(result.get(0), createKeyValue(1, 10));
+ assertKeyValueEquals(result.get(1), highLevel); // Should be in the
middle
+ assertKeyValueEquals(result.get(2), createKeyValue(5, 50));
+ }
+
+ /**
+ * Test insertInto method when highLevel should be inserted at the end.
The highLevel KeyValue
+ * has a larger key than all existing entries.
+ */
+ @Test
+ public void testInsertIntoEnd() {
+ KeyValueBuffer buffer = createBuffer();
+ // Add existing entries to buffer
+ buffer.put(createKeyValue(1, 10));
+ buffer.put(createKeyValue(3, 30));
+
+ KeyValue highLevel = createKeyValue(5, 50);
+ Comparator<KeyValue> comparator = Comparator.comparingInt(kv ->
kv.key().getInt(0));
+
+ KeyValueBuffer.insertInto(buffer, highLevel, comparator);
+
+ List<KeyValue> result = collectKeyValues(buffer);
+ assertThat(result).hasSize(3);
+ assertKeyValueEquals(result.get(0), createKeyValue(1, 10));
+ assertKeyValueEquals(result.get(1), createKeyValue(3, 30));
+ assertKeyValueEquals(result.get(2), highLevel); // Should be last
+ }
+
+ /**
+ * Test insertInto method with a buffer that has multiple entries with the
same key. It should
+ * insert highLevel in the correct position based on the comparator.
+ */
+ @Test
+ public void testInsertIntoWithDuplicateKeys() {
+ KeyValueBuffer buffer = createBuffer();
+ // Add existing entries to buffer
+ buffer.put(createKeyValue(1, 10));
+ buffer.put(createKeyValue(3, 30));
+ buffer.put(createKeyValue(3, 35));
+ buffer.put(createKeyValue(5, 50));
+
+ KeyValue highLevel = createKeyValue(3, 25); // Same key as existing
entries
+ Comparator<KeyValue> comparator =
+ Comparator.comparingInt((KeyValue kv) -> kv.key().getInt(0))
+ .thenComparingLong(KeyValue::sequenceNumber);
+
+ KeyValueBuffer.insertInto(buffer, highLevel, comparator);
+
+ List<KeyValue> result = collectKeyValues(buffer);
+ assertThat(result).hasSize(5);
+ assertKeyValueEquals(result.get(0), createKeyValue(1, 10));
+ assertKeyValueEquals(result.get(1), createKeyValue(3, 30));
+ assertKeyValueEquals(result.get(2), createKeyValue(3, 35));
+ assertKeyValueEquals(result.get(3), highLevel);
+ assertKeyValueEquals(result.get(4), createKeyValue(5, 50));
+ }
+
+ // Helper method to create a KeyValue for testing
+ private KeyValue createKeyValue(int key, int value) {
+ // Create key as BinaryRow
+ BinaryRow binaryKey = new BinaryRow(1);
+ BinaryRowWriter keyWriter = new BinaryRowWriter(binaryKey);
+ keyWriter.writeInt(0, key);
+ keyWriter.complete();
+
+ // Create value as BinaryRow
+ BinaryRow binaryValue = new BinaryRow(1);
+ BinaryRowWriter valueWriter = new BinaryRowWriter(binaryValue);
+ valueWriter.writeInt(0, value);
+ valueWriter.complete();
+
+ return new KeyValue().replace(binaryKey, key, RowKind.INSERT,
binaryValue);
+ }
+
+ // Helper method to collect all KeyValues from a buffer
+ private List<KeyValue> collectKeyValues(KeyValueBuffer buffer) {
+ List<KeyValue> result = new ArrayList<>();
+ try (CloseableIterator<KeyValue> iterator = buffer.iterator()) {
+ while (iterator.hasNext()) {
+ result.add(iterator.next());
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ return result;
+ }
+
+ // Helper method to assert that two KeyValues are equal
+ private void assertKeyValueEquals(KeyValue actual, KeyValue expected) {
+ assertThat(actual.key().getInt(0)).isEqualTo(expected.key().getInt(0));
+        assertThat(actual.value().getInt(0)).isEqualTo(expected.value().getInt(0));
+        assertThat(actual.sequenceNumber()).isEqualTo(expected.sequenceNumber());
+ assertThat(actual.valueKind()).isEqualTo(expected.valueKind());
+ }
+}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/KeyValueBufferTest.java
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/KeyValueBufferTest.java
new file mode 100644
index 0000000000..e3f485f765
--- /dev/null
+++
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/KeyValueBufferTest.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.mergetree.compact;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.KeyValue;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BinaryRowWriter;
+import org.apache.paimon.disk.IOManager;
+import org.apache.paimon.disk.IOManagerImpl;
+import org.apache.paimon.mergetree.compact.KeyValueBuffer.BinaryBuffer;
+import org.apache.paimon.options.MemorySize;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.IntType;
+import org.apache.paimon.types.RowKind;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.CloseableIterator;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link KeyValueBuffer}. */
+public class KeyValueBufferTest {
+
+ @TempDir java.nio.file.Path tempDir;
+
+ private IOManager ioManager;
+ private RowType keyType;
+ private RowType valueType;
+
+ @BeforeEach
+ public void beforeEach() {
+ this.ioManager = new IOManagerImpl(tempDir.toString());
+ this.keyType =
+ new RowType(
+ new ArrayList<DataField>() {
+ {
+ add(new DataField(0, "key", new IntType()));
+ }
+ });
+ this.valueType =
+ new RowType(
+ new ArrayList<DataField>() {
+ {
+ add(new DataField(0, "value", new IntType()));
+ }
+ });
+ }
+
+ @AfterEach
+ public void afterEach() {
+ if (ioManager != null) {
+ try {
+ ioManager.close();
+ } catch (Exception e) {
+ // Ignore exception during close
+ }
+ }
+ }
+
+ @Test
+ public void testCreateBinaryBufferWithIOManager() {
+ Options options = new Options();
+ options.set(CoreOptions.LOOKUP_MERGE_BUFFER_SIZE,
MemorySize.ofMebiBytes(1L));
+
+ BinaryBuffer binaryBuffer =
+ KeyValueBuffer.createBinaryBuffer(
+ new CoreOptions(options), keyType, valueType,
ioManager);
+
+ assertThat(binaryBuffer).isNotNull();
+ }
+
+ @Test
+ public void testCreateBinaryBufferWithoutIOManager() {
+ Options options = new Options();
+ options.set(CoreOptions.LOOKUP_MERGE_BUFFER_SIZE,
MemorySize.ofMebiBytes(1L));
+
+ BinaryBuffer binaryBuffer =
+ KeyValueBuffer.createBinaryBuffer(
+ new CoreOptions(options), keyType, valueType, null);
+
+ assertThat(binaryBuffer).isNotNull();
+ }
+
+ @Test
+ public void testBinaryBufferPutAndIterator() throws Exception {
+ Options options = new Options();
+ options.set(CoreOptions.LOOKUP_MERGE_BUFFER_SIZE,
MemorySize.ofMebiBytes(1L));
+
+ BinaryBuffer binaryBuffer =
+ KeyValueBuffer.createBinaryBuffer(
+ new CoreOptions(options), keyType, valueType,
ioManager);
+
+ // Create test data
+ List<KeyValue> testData = new ArrayList<>();
+ for (int i = 0; i < 10; i++) {
+ // Create key as BinaryRow
+ BinaryRow key = new BinaryRow(1);
+ BinaryRowWriter keyWriter = new BinaryRowWriter(key);
+ keyWriter.writeInt(0, i);
+ keyWriter.complete();
+
+ // Create value as BinaryRow
+ BinaryRow value = new BinaryRow(1);
+ BinaryRowWriter valueWriter = new BinaryRowWriter(value);
+ valueWriter.writeInt(0, i * 2);
+ valueWriter.complete();
+
+ testData.add(new KeyValue().replace(key, i, RowKind.INSERT,
value));
+ }
+
+ // Put data into buffer
+ for (KeyValue kv : testData) {
+ binaryBuffer.put(kv);
+ }
+
+ // Verify data through iterator
+ try (CloseableIterator<KeyValue> iterator = binaryBuffer.iterator()) {
+ int count = 0;
+ while (iterator.hasNext()) {
+ KeyValue kv = iterator.next();
+ KeyValue expected = testData.get(count);
+
assertThat(kv.key().getInt(0)).isEqualTo(expected.key().getInt(0));
+
assertThat(kv.value().getInt(0)).isEqualTo(expected.value().getInt(0));
+
assertThat(kv.sequenceNumber()).isEqualTo(expected.sequenceNumber());
+ assertThat(kv.valueKind()).isEqualTo(expected.valueKind());
+ count++;
+ }
+ assertThat(count).isEqualTo(testData.size());
+ }
+ }
+}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java
index 5341c6db69..57d99557ca 100644
---
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java
@@ -66,7 +66,8 @@ public class LookupChangelogMergeFunctionWrapperTest {
Map<InternalRow, KeyValue> highLevel = new HashMap<>();
LookupChangelogMergeFunctionWrapper function =
new LookupChangelogMergeFunctionWrapper(
-
LookupMergeFunction.wrap(DeduplicateMergeFunction.factory()),
+ LookupMergeFunction.wrap(
+ DeduplicateMergeFunction.factory(), null,
null, null),
highLevel::get,
changelogRowDeduplicate ? EQUALISER : null,
LookupStrategy.from(false, true, false, false),
@@ -228,7 +229,8 @@ public class LookupChangelogMergeFunctionWrapperTest {
ValueEqualiserSupplier.fromIgnoreFields(valueType,
ignoreFields);
LookupChangelogMergeFunctionWrapper function =
new LookupChangelogMergeFunctionWrapper(
-
LookupMergeFunction.wrap(DeduplicateMergeFunction.factory()),
+ LookupMergeFunction.wrap(
+ DeduplicateMergeFunction.factory(), null,
null, null),
highLevel::get,
logDedupEqualSupplier.get(),
LookupStrategy.from(false, true, false, false),
@@ -288,7 +290,10 @@ public class LookupChangelogMergeFunctionWrapperTest {
.create(DataTypes.INT(), null, null)
},
false,
- null)),
+ null),
+ null,
+ null,
+ null),
key -> null,
changelogRowDeduplicate ? EQUALISER : null,
LookupStrategy.from(false, true, false, false),
@@ -377,7 +382,10 @@ public class LookupChangelogMergeFunctionWrapperTest {
.create(DataTypes.INT(), null, null)
},
false,
- null)),
+ null),
+ null,
+ null,
+ null),
highLevel::get,
null,
LookupStrategy.from(false, true, false, false),
@@ -514,7 +522,8 @@ public class LookupChangelogMergeFunctionWrapperTest {
Map<InternalRow, KeyValue> highLevel = new HashMap<>();
LookupChangelogMergeFunctionWrapper function =
new LookupChangelogMergeFunctionWrapper(
-
LookupMergeFunction.wrap(DeduplicateMergeFunction.factory()),
+ LookupMergeFunction.wrap(
+ DeduplicateMergeFunction.factory(), null,
null, null),
highLevel::get,
null,
LookupStrategy.from(false, true, false, false),
diff --git
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupMergeFunctionTest.java
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupMergeFunctionTest.java
index 03b201ca2f..71ac78259d 100644
---
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupMergeFunctionTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupMergeFunctionTest.java
@@ -32,7 +32,9 @@ class LookupMergeFunctionTest {
public void testKeepLowestHighLevel() {
LookupMergeFunction function =
(LookupMergeFunction)
-
LookupMergeFunction.wrap(DeduplicateMergeFunction.factory()).create();
+ LookupMergeFunction.wrap(
+ DeduplicateMergeFunction.factory(),
null, null, null)
+ .create();
function.reset();
function.add(new KeyValue().replace(row(1), 1, INSERT,
row(2)).setLevel(1));
function.add(new KeyValue().replace(row(1), 1, INSERT,
row(1)).setLevel(2));
@@ -45,7 +47,9 @@ class LookupMergeFunctionTest {
public void testLevelNegative() {
LookupMergeFunction function =
(LookupMergeFunction)
-
LookupMergeFunction.wrap(DeduplicateMergeFunction.factory()).create();
+ LookupMergeFunction.wrap(
+ DeduplicateMergeFunction.factory(),
null, null, null)
+ .create();
function.reset();
function.add(new KeyValue().replace(row(1), 1, INSERT,
row(2)).setLevel(-1));
function.add(new KeyValue().replace(row(1), 1, INSERT,
row(1)).setLevel(-1));
diff --git
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupMergeFunctionUnitTest.java
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupMergeFunctionUnitTest.java
new file mode 100644
index 0000000000..57fb209c7d
--- /dev/null
+++
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupMergeFunctionUnitTest.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.mergetree.compact;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.KeyValue;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.disk.IOManager;
+import org.apache.paimon.disk.IOManagerImpl;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import static java.util.Collections.singletonList;
+import static org.apache.paimon.CoreOptions.LOOKUP_MERGE_RECORDS_THRESHOLD;
+import static org.apache.paimon.io.DataFileTestUtils.row;
+import static org.apache.paimon.types.RowKind.INSERT;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Unit test for {@link LookupMergeFunction}. */
+public class LookupMergeFunctionUnitTest {
+
+ @TempDir java.nio.file.Path tempDir;
+
+ private IOManager ioManager;
+
+ @BeforeEach
+ public void beforeEach() {
+ ioManager = new IOManagerImpl(tempDir.toString());
+ }
+
+ @AfterEach
+ public void afterEach() throws Exception {
+ if (ioManager != null) {
+ ioManager.close();
+ }
+ }
+
+ // Test that the function correctly identifies when level 0 records are
present
+ @Test
+ public void testContainLevel0() {
+ LookupMergeFunction function = create();
+ function.reset();
+ function.add(new KeyValue().replace(row(1), 1, INSERT,
row(1)).setLevel(1));
+ assertThat(function.containLevel0()).isFalse();
+
+ function.add(new KeyValue().replace(row(1), 1, INSERT,
row(2)).setLevel(0));
+ assertThat(function.containLevel0()).isTrue();
+ }
+
+ // Test that the function correctly picks the highest level record (lowest
level number)
+ @Test
+ public void testPickHighLevel() {
+ LookupMergeFunction function = create();
+ function.reset();
+ function.add(new KeyValue().replace(row(1), 1, INSERT,
row(3)).setLevel(3));
+ function.add(new KeyValue().replace(row(1), 1, INSERT,
row(2)).setLevel(2));
+ function.add(new KeyValue().replace(row(1), 1, INSERT,
row(1)).setLevel(1));
+
+ KeyValue highLevel = function.pickHighLevel();
+ assertThat(highLevel).isNotNull();
+ assertThat(highLevel.level()).isEqualTo(1);
+ assertThat(highLevel.value().getInt(0)).isEqualTo(1);
+ }
+
+ // Test that level 0 and negative level records are ignored when picking
high level records
+ @Test
+ public void testPickHighLevelIgnoreLevel0AndNegative() {
+ LookupMergeFunction function = create();
+ function.reset();
+ function.add(new KeyValue().replace(row(1), 1, INSERT,
row(4)).setLevel(0));
+ function.add(new KeyValue().replace(row(1), 1, INSERT,
row(3)).setLevel(-1));
+ function.add(new KeyValue().replace(row(1), 1, INSERT,
row(2)).setLevel(2));
+
+ KeyValue highLevel = function.pickHighLevel();
+ assertThat(highLevel).isNotNull();
+ assertThat(highLevel.level()).isEqualTo(2);
+ assertThat(highLevel.value().getInt(0)).isEqualTo(2);
+ }
+
+ // Test getResult with a mix of level records including level 0
+ @Test
+ public void testGetResultWithLevel0() {
+ LookupMergeFunction function = create();
+ function.reset();
+ function.add(new KeyValue().replace(row(1), 1, INSERT,
row(1)).setLevel(1));
+ function.add(new KeyValue().replace(row(1), 1, INSERT,
row(2)).setLevel(0));
+ function.add(new KeyValue().replace(row(1), 1, INSERT,
row(3)).setLevel(2));
+
+ KeyValue result = function.getResult();
+ assertThat(result).isNotNull();
+ // Should merge level 0 and level 1 (highest level record)
+ assertThat(result.value().getInt(0)).isEqualTo(2); // Value from level 0
+ }
+
+ // Test getResult with only negative level records
+ @Test
+ public void testGetResultWithNegativeLevels() {
+ LookupMergeFunction function = create();
+ function.reset();
+ function.add(new KeyValue().replace(row(1), 1, INSERT,
row(2)).setLevel(-1));
+ function.add(new KeyValue().replace(row(1), 1, INSERT,
row(1)).setLevel(-1));
+
+ KeyValue result = function.getResult();
+ assertThat(result).isNotNull();
+ // Should merge all negative level records
+ assertThat(result.value().getInt(0)).isEqualTo(1); // Value from the
last added record
+ }
+
+ // Test that the function correctly stores and returns the current key
+ @Test
+ public void testKey() {
+ LookupMergeFunction function = create();
+ function.reset();
+ BinaryRow key = row(1);
+ function.add(new KeyValue().replace(key, 1, INSERT,
row(1)).setLevel(1));
+
+ BinaryRow currentKey = (BinaryRow) function.key();
+ assertThat(currentKey).isNotNull();
+ assertThat(currentKey.getInt(0)).isEqualTo(1);
+ }
+
+ @Test
+ public void testBinaryBuffer() {
+ innerTestBinaryBuffer(false);
+ }
+
+ @Test
+ public void testSpillBinaryBuffer() {
+ innerTestBinaryBuffer(true);
+ }
+
+ private void innerTestBinaryBuffer(boolean spill) {
+ Options options = new Options();
+ options.set(LOOKUP_MERGE_RECORDS_THRESHOLD, 3);
+ LookupMergeFunction function = create(options, spill ? ioManager :
null);
+ function.reset();
+ for (int i = 5; i >= 0; i--) {
+ function.add(new KeyValue().replace(row(1), 1, INSERT,
row(i)).setLevel(i));
+ }
+ KeyValue result = function.getResult();
+ assertThat(result).isNotNull();
+ assertThat(result.value().getInt(0)).isEqualTo(0);
+ }
+
+ private LookupMergeFunction create() {
+ return create(new Options(), null);
+ }
+
+ private LookupMergeFunction create(Options options, IOManager ioManager) {
+ LookupMergeFunction.Factory factory =
+ (LookupMergeFunction.Factory)
+ LookupMergeFunction.wrap(
+ DeduplicateMergeFunction.factory(),
+ new CoreOptions(options),
+ new RowType(singletonList(new DataField(0,
"k", DataTypes.INT()))),
+ new RowType(singletonList(new DataField(0,
"v", DataTypes.INT()))));
+ factory.withIOManager(ioManager);
+ return (LookupMergeFunction) factory.create();
+ }
+}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/SortMergeReaderTestBase.java
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/SortMergeReaderTestBase.java
index 50eebd5007..83c9cd54eb 100644
---
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/SortMergeReaderTestBase.java
+++
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/SortMergeReaderTestBase.java
@@ -122,7 +122,8 @@ public abstract class SortMergeReaderTestBase extends
CombiningRecordReaderTestB
@Override
protected MergeFunction<KeyValue> createMergeFunction() {
- return new LookupMergeFunction(new FirstRowMergeFunction(false));
+ return new LookupMergeFunction(
+ new FirstRowMergeFunction(false), null, null, null, null);
}
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DeletionVectorITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DeletionVectorITCase.java
index a524c3633d..36e82a782c 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DeletionVectorITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DeletionVectorITCase.java
@@ -480,4 +480,18 @@ public class DeletionVectorITCase extends
CatalogITCaseBase {
assertThat(inExternalPath).contains("bucket-0/index-");
assertThat(inExternalPath).doesNotContain("index/index-");
}
+
+ @Test
+ public void testLookupMergeBufferSize() {
+ sql(
+ "CREATE TABLE T (id INT PRIMARY KEY NOT ENFORCED, name STRING)
"
+ + "WITH ('deletion-vectors.enabled' = 'true',
'lookup.merge-records-threshold' = '2')");
+ for (int i = 0; i < 5; i++) {
+ sql(
+ String.format(
+ "INSERT INTO T /*+ OPTIONS('write-only' = '%s') */
VALUES (1, '%s')",
+ i != 4, i));
+ }
+ assertThat(sql("SELECT * FROM T")).containsExactly(Row.of(1,
String.valueOf(4)));
+ }
}