This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 385be6551 [core][flink] Introduce HashMapLocalMerger to 'local-merge-buffer-size' (#4492)
385be6551 is described below
commit 385be6551ed7a77e5cee42989b275e46184ab2eb
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Nov 11 19:45:13 2024 +0800
[core][flink] Introduce HashMapLocalMerger to 'local-merge-buffer-size' (#4492)
---
.../java/org/apache/paimon/data/BinaryRow.java | 32 ++
.../java/org/apache/paimon/data/InternalRow.java | 94 +++++
.../data/serializer/BinaryRowSerializer.java | 5 +
.../data/serializer/InternalRowSerializer.java | 5 +
.../data/serializer/PagedTypeSerializer.java | 3 +
.../java/org/apache/paimon/hash/BytesHashMap.java | 362 ++++++++++++++++++++
.../main/java/org/apache/paimon/hash/BytesMap.java | 379 +++++++++++++++++++++
.../mergetree/localmerge/HashMapLocalMerger.java | 133 ++++++++
.../paimon/mergetree/localmerge/LocalMerger.java | 38 +++
.../localmerge/SortBufferLocalMerger.java | 78 +++++
.../org/apache/paimon/hash/BytesHashMapTest.java | 44 +++
.../apache/paimon/hash/BytesHashMapTestBase.java | 373 ++++++++++++++++++++
.../org/apache/paimon/hash/BytesMapTestBase.java | 118 +++++++
.../paimon/flink/sink/LocalMergeOperator.java | 108 +++---
.../org/apache/paimon/flink/FirstRowITCase.java | 2 +-
.../apache/paimon/flink/PartialUpdateITCase.java | 4 +-
.../apache/paimon/flink/PreAggregationITCase.java | 2 +-
.../flink/PrimaryKeyFileStoreTableITCase.java | 2 +-
.../paimon/flink/sink/LocalMergeOperatorTest.java | 212 ++++++++++++
19 files changed, 1948 insertions(+), 46 deletions(-)
diff --git a/paimon-common/src/main/java/org/apache/paimon/data/BinaryRow.java b/paimon-common/src/main/java/org/apache/paimon/data/BinaryRow.java
index 7068e25e6..d08c580be 100644
--- a/paimon-common/src/main/java/org/apache/paimon/data/BinaryRow.java
+++ b/paimon-common/src/main/java/org/apache/paimon/data/BinaryRow.java
@@ -21,7 +21,11 @@ package org.apache.paimon.data;
import org.apache.paimon.annotation.Public;
import org.apache.paimon.memory.MemorySegment;
import org.apache.paimon.memory.MemorySegmentUtils;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DecimalType;
+import org.apache.paimon.types.LocalZonedTimestampType;
import org.apache.paimon.types.RowKind;
+import org.apache.paimon.types.TimestampType;
import javax.annotation.Nullable;
@@ -442,4 +446,32 @@ public final class BinaryRow extends BinarySection implements InternalRow, DataS
writer.complete();
return row;
}
+
+ /**
+ * If it is a fixed-length field, we can call this BinaryRow's setXX method for in-place
+ * updates. If it is a variable-length field, this method cannot be used, because the
+ * underlying data is stored continuously.
+ */
+ public static boolean isInFixedLengthPart(DataType type) {
+ switch (type.getTypeRoot()) {
+ case BOOLEAN:
+ case TINYINT:
+ case SMALLINT:
+ case INTEGER:
+ case DATE:
+ case TIME_WITHOUT_TIME_ZONE:
+ case BIGINT:
+ case FLOAT:
+ case DOUBLE:
+ return true;
+ case DECIMAL:
+ return Decimal.isCompact(((DecimalType) type).getPrecision());
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ return Timestamp.isCompact(((TimestampType) type).getPrecision());
+ case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+ return Timestamp.isCompact(((LocalZonedTimestampType) type).getPrecision());
+ default:
+ return false;
+ }
+ }
}
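For illustration, a minimal sketch (not part of the commit; the row and types here are hypothetical) of how a caller can use the new check before attempting an in-place update:

    // Only fixed-length fields can be overwritten in place; variable-length
    // fields live in the contiguous variable-length section and need a rebuild.
    BinaryRow row = new BinaryRow(1);
    BinaryRowWriter writer = new BinaryRowWriter(row);
    writer.writeInt(0, 1);
    writer.complete();

    if (BinaryRow.isInFixedLengthPart(new IntType())) {
        row.setInt(0, 42); // safe: the fixed-length slot is updated in place
    }
    // for e.g. VarCharType.STRING_TYPE the check returns false: rewrite the row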
diff --git a/paimon-common/src/main/java/org/apache/paimon/data/InternalRow.java b/paimon-common/src/main/java/org/apache/paimon/data/InternalRow.java
index 4c4f3f978..a83cbaec7 100644
--- a/paimon-common/src/main/java/org/apache/paimon/data/InternalRow.java
+++ b/paimon-common/src/main/java/org/apache/paimon/data/InternalRow.java
@@ -244,4 +244,98 @@ public interface InternalRow extends DataGetters {
@Nullable
Object getFieldOrNull(InternalRow row);
}
+
+ /**
+ * Creates a {@link FieldSetter} for setting the field at the given position of a row,
+ * reading the field from another row.
+ *
+ * @param fieldType the type of the field
+ * @param fieldPos the position of the field in the row
+ */
+ static FieldSetter createFieldSetter(DataType fieldType, int fieldPos) {
+ final FieldSetter fieldSetter;
+ // ordered by type root definition
+ switch (fieldType.getTypeRoot()) {
+ case BOOLEAN:
+ fieldSetter = (from, to) -> to.setBoolean(fieldPos, from.getBoolean(fieldPos));
+ break;
+ case DECIMAL:
+ final int decimalPrecision = getPrecision(fieldType);
+ final int decimalScale = getScale(fieldType);
+ fieldSetter =
+ (from, to) ->
+ to.setDecimal(
+ fieldPos,
+ from.getDecimal(fieldPos, decimalPrecision, decimalScale),
+ decimalPrecision);
+ if (fieldType.isNullable() && !Decimal.isCompact(decimalPrecision)) {
+ return (from, to) -> {
+ if (from.isNullAt(fieldPos)) {
+ to.setNullAt(fieldPos);
+ to.setDecimal(fieldPos, null, decimalPrecision);
+ } else {
+ fieldSetter.setFieldFrom(from, to);
+ }
+ };
+ }
+ break;
+ case TINYINT:
+ fieldSetter = (from, to) -> to.setByte(fieldPos, from.getByte(fieldPos));
+ break;
+ case SMALLINT:
+ fieldSetter = (from, to) -> to.setShort(fieldPos, from.getShort(fieldPos));
+ break;
+ case INTEGER:
+ case DATE:
+ case TIME_WITHOUT_TIME_ZONE:
+ fieldSetter = (from, to) -> to.setInt(fieldPos, from.getInt(fieldPos));
+ break;
+ case BIGINT:
+ fieldSetter = (from, to) -> to.setLong(fieldPos, from.getLong(fieldPos));
+ break;
+ case FLOAT:
+ fieldSetter = (from, to) -> to.setFloat(fieldPos, from.getFloat(fieldPos));
+ break;
+ case DOUBLE:
+ fieldSetter = (from, to) -> to.setDouble(fieldPos, from.getDouble(fieldPos));
+ break;
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+ final int timestampPrecision = getPrecision(fieldType);
+ fieldSetter =
+ (from, to) ->
+ to.setTimestamp(
+ fieldPos,
+ from.getTimestamp(fieldPos, timestampPrecision),
+ timestampPrecision);
+ if (fieldType.isNullable() && !Timestamp.isCompact(timestampPrecision)) {
+ return (from, to) -> {
+ if (from.isNullAt(fieldPos)) {
+ to.setNullAt(fieldPos);
+ to.setTimestamp(fieldPos, null, timestampPrecision);
+ } else {
+ fieldSetter.setFieldFrom(from, to);
+ }
+ };
+ }
+ break;
+ default:
+ throw new IllegalArgumentException(
+ String.format("type %s not support for setting",
fieldType));
+ }
+ if (!fieldType.isNullable()) {
+ return fieldSetter;
+ }
+ return (from, to) -> {
+ if (from.isNullAt(fieldPos)) {
+ to.setNullAt(fieldPos);
+ } else {
+ fieldSetter.setFieldFrom(from, to);
+ }
+ };
+ }
+
+ /** Accessor for setting the field of a row during runtime. */
+ interface FieldSetter extends Serializable {
+ void setFieldFrom(DataGetters from, DataSetters to);
+ }
}
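As a brief usage sketch (hypothetical single-column rows, not from the commit): a FieldSetter is created once per field and then copies that field between arbitrary row pairs, with null handling wrapped in automatically for nullable types:

    BinaryRow from = new BinaryRow(1);
    BinaryRowWriter fromWriter = new BinaryRowWriter(from);
    fromWriter.writeInt(0, 42);
    fromWriter.complete();

    BinaryRow to = new BinaryRow(1);
    BinaryRowWriter toWriter = new BinaryRowWriter(to);
    toWriter.writeInt(0, 0);
    toWriter.complete();

    InternalRow.FieldSetter setter = InternalRow.createFieldSetter(new IntType(), 0);
    setter.setFieldFrom(from, to); // 'to' now holds 42 at position 0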
diff --git a/paimon-common/src/main/java/org/apache/paimon/data/serializer/BinaryRowSerializer.java b/paimon-common/src/main/java/org/apache/paimon/data/serializer/BinaryRowSerializer.java
index e5f3c9e7d..8773e734d 100644
--- a/paimon-common/src/main/java/org/apache/paimon/data/serializer/BinaryRowSerializer.java
+++ b/paimon-common/src/main/java/org/apache/paimon/data/serializer/BinaryRowSerializer.java
@@ -107,6 +107,11 @@ public class BinaryRowSerializer extends AbstractRowDataSerializer<BinaryRow> {
// ============================ Page related operations ===================================
+ @Override
+ public BinaryRow createReuseInstance() {
+ return new BinaryRow(numFields);
+ }
+
@Override
public int serializeToPages(BinaryRow record, AbstractPagedOutputView headerLessView)
throws IOException {
diff --git a/paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalRowSerializer.java b/paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalRowSerializer.java
index 8a32a222c..ac8cc34e0 100644
--- a/paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalRowSerializer.java
+++ b/paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalRowSerializer.java
@@ -167,6 +167,11 @@ public class InternalRowSerializer extends AbstractRowDataSerializer<InternalRow
return reuseRow;
}
+ @Override
+ public InternalRow createReuseInstance() {
+ return binarySerializer.createReuseInstance();
+ }
+
@Override
public int serializeToPages(InternalRow row, AbstractPagedOutputView target)
throws IOException {
diff --git a/paimon-common/src/main/java/org/apache/paimon/data/serializer/PagedTypeSerializer.java b/paimon-common/src/main/java/org/apache/paimon/data/serializer/PagedTypeSerializer.java
index f916d01b5..ede6c9b10 100644
--- a/paimon-common/src/main/java/org/apache/paimon/data/serializer/PagedTypeSerializer.java
+++ b/paimon-common/src/main/java/org/apache/paimon/data/serializer/PagedTypeSerializer.java
@@ -27,6 +27,9 @@ import java.io.IOException;
/** A type serializer which provides paged serialize and deserialize methods. */
public interface PagedTypeSerializer<T> extends Serializer<T> {
+ /** Creates a new instance for reusing. */
+ T createReuseInstance();
+
/**
+ * Serializes the given record to the given target paged output view. Some implementations may
+ * skip some bytes if the current page does not have enough space left, e.g. {@link BinaryRow}.
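The reuse pattern the new method enables, as a short sketch (the 'input' view here is an assumed AbstractPagedInputView positioned at a serialized record):

    // Deserialize many records into one reused row instead of allocating per record.
    BinaryRowSerializer serializer = new BinaryRowSerializer(2);
    BinaryRow reuse = serializer.createReuseInstance();
    reuse = serializer.mapFromPages(reuse, input); // repeat per record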
diff --git a/paimon-core/src/main/java/org/apache/paimon/hash/BytesHashMap.java b/paimon-core/src/main/java/org/apache/paimon/hash/BytesHashMap.java
new file mode 100644
index 000000000..d739289e6
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/hash/BytesHashMap.java
@@ -0,0 +1,362 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.hash;
+
+import org.apache.paimon.annotation.VisibleForTesting;
+import org.apache.paimon.data.AbstractPagedInputView;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.RandomAccessInputView;
+import org.apache.paimon.data.SimpleCollectingOutputView;
+import org.apache.paimon.data.serializer.BinaryRowSerializer;
+import org.apache.paimon.data.serializer.PagedTypeSerializer;
+import org.apache.paimon.memory.MemorySegment;
+import org.apache.paimon.memory.MemorySegmentPool;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.utils.KeyValueIterator;
+import org.apache.paimon.utils.MathUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Bytes based hash map. It can be used for performing aggregations where the aggregated values are
+ * fixed-width; because the data is stored in continuous memory, an AggBuffer of variable length
+ * cannot be applied to this hash map. The KeyValue form in the hash map is designed to reduce the
+ * cost of key fetching during lookup. The memory is divided into two areas:
+ *
+ * <p>Bucket area: pointer + hashcode.
+ *
+ * <ul>
+ * <li>Bytes 0 to 4: a pointer to the record in the record area
+ * <li>Bytes 4 to 8: key's full 32-bit hashcode
+ * </ul>
+ *
+ * <p>Record area: the actual data in linked list records. A record has four parts:
+ *
+ * <ul>
+ * <li>Bytes 0 to 4: len(k)
+ * <li>Bytes 4 to 4 + len(k): key data
+ * <li>Bytes 4 + len(k) to 8 + len(k): len(v)
+ * <li>Bytes 8 + len(k) to 8 + len(k) + len(v): value data
+ * </ul>
+ *
+ * <p>{@code BytesHashMap} is influenced by Apache Spark's BytesToBytesMap.
+ */
+public class BytesHashMap<K> extends BytesMap<K, BinaryRow> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(BytesHashMap.class);
+
+ /**
+ * Set to true when valueTypeInfos.length == 0. Usually in this case the BytesHashMap will be
+ * used as a HashSet. The value from {@link BytesHashMap#append(LookupInfo info, BinaryRow
+ * value)} will be ignored when hashSetMode is set. The reusedValue will always point to a
+ * 16 bytes long MemorySegment acting as each BytesHashMap entry's value part when appended,
+ * to keep the BytesHashMap's spilling work compatible.
+ */
+ private final boolean hashSetMode;
+
+ /** Used to serialize map key into RecordArea's MemorySegments. */
+ protected final PagedTypeSerializer<K> keySerializer;
+
+ /** Used to serialize hash map value into RecordArea's MemorySegments. */
+ private final BinaryRowSerializer valueSerializer;
+
+ private volatile RecordArea.EntryIterator destructiveIterator = null;
+
+ public BytesHashMap(
+ MemorySegmentPool memoryPool, PagedTypeSerializer<K> keySerializer, int valueArity) {
+ super(memoryPool, keySerializer);
+
+ this.recordArea = new RecordArea();
+
+ this.keySerializer = keySerializer;
+ this.valueSerializer = new BinaryRowSerializer(valueArity);
+ if (valueArity == 0) {
+ this.hashSetMode = true;
+ this.reusedValue = new BinaryRow(0);
+ this.reusedValue.pointTo(MemorySegment.wrap(new byte[8]), 0, 8);
+ LOG.info("BytesHashMap with hashSetMode = true.");
+ } else {
+ this.hashSetMode = false;
+ this.reusedValue = this.valueSerializer.createInstance();
+ }
+
+ final int initBucketSegmentNum =
+ MathUtils.roundDownToPowerOf2((int) (INIT_BUCKET_MEMORY_IN_BYTES / segmentSize));
+
+ // allocate and initialize MemorySegments for bucket area
+ initBucketSegments(initBucketSegmentNum);
+
+ LOG.info(
+ "BytesHashMap with initial memory segments {}, {} in bytes,
init allocating {} for bucket area.",
+ reservedNumBuffers,
+ reservedNumBuffers * segmentSize,
+ initBucketSegmentNum);
+ }
+
+ // ----------------------- Abstract Interface -----------------------
+
+ @Override
+ public long getNumKeys() {
+ return numElements;
+ }
+
+ // ----------------------- Public interface -----------------------
+
+ /**
+ * Appends a value into the hash map's record area.
+ *
+ * @return a BinaryRow mapping to the memory segments in the map's record area belonging to the
+ * newly appended value.
+ * @throws EOFException if the map cannot allocate any more memory.
+ */
+ public BinaryRow append(LookupInfo<K, BinaryRow> lookupInfo, BinaryRow value)
+ throws IOException {
+ try {
+ if (numElements >= growthThreshold) {
+ growAndRehash();
+ // update info's bucketSegmentIndex and bucketOffset
+ lookup(lookupInfo.key);
+ }
+ BinaryRow toAppend = hashSetMode ? reusedValue : value;
+ int pointerToAppended = recordArea.appendRecord(lookupInfo, toAppend);
+ bucketSegments
+ .get(lookupInfo.bucketSegmentIndex)
+ .putInt(lookupInfo.bucketOffset, pointerToAppended);
+ bucketSegments
+ .get(lookupInfo.bucketSegmentIndex)
+ .putInt(lookupInfo.bucketOffset + ELEMENT_POINT_LENGTH, lookupInfo.keyHashCode);
+ numElements++;
+ recordArea.setReadPosition(pointerToAppended);
+ ((RecordArea) recordArea).skipKey();
+ return recordArea.readValue(reusedValue);
+ } catch (EOFException e) {
+ numSpillFiles++;
+ spillInBytes += recordArea.getSegmentsSize();
+ throw e;
+ }
+ }
+
+ public long getNumSpillFiles() {
+ return numSpillFiles;
+ }
+
+ public long getUsedMemoryInBytes() {
+ return bucketSegments.size() * ((long) segmentSize) + recordArea.getSegmentsSize();
+ }
+
+ public long getSpillInBytes() {
+ return spillInBytes;
+ }
+
+ public int getNumElements() {
+ return numElements;
+ }
+
+ /** Returns an iterator for iterating over the entries of this map. */
+ @SuppressWarnings("WeakerAccess")
+ public KeyValueIterator<K, BinaryRow> getEntryIterator(boolean requiresCopy) {
+ if (destructiveIterator != null) {
+ throw new IllegalArgumentException(
+ "DestructiveIterator is not null, so this method can't be invoked!");
+ }
+ return ((RecordArea) recordArea).entryIterator(requiresCopy);
+ }
+
+ /** @return the underlying memory segments of the hash map's record area */
+ @SuppressWarnings("WeakerAccess")
+ public ArrayList<MemorySegment> getRecordAreaMemorySegments() {
+ return ((RecordArea) recordArea).segments;
+ }
+
+ @SuppressWarnings("WeakerAccess")
+ public List<MemorySegment> getBucketAreaMemorySegments() {
+ return bucketSegments;
+ }
+
+ public void free() {
+ recordArea.release();
+ destructiveIterator = null;
+ super.free();
+ }
+
+ /** Resets the map's record and bucket area's memory segments for reusing. */
+ public void reset() {
+ // reset the record segments.
+ recordArea.reset();
+ destructiveIterator = null;
+ super.reset();
+ }
+
+ /**
+ * @return true when BytesHashMap's valueTypeInfos.length == 0. Any appended value will be
+ * ignored and replaced with a reusedValue as a presence tag.
+ */
+ @VisibleForTesting
+ boolean isHashSetMode() {
+ return hashSetMode;
+ }
+
+ // ----------------------- Private methods -----------------------
+
+ static int getVariableLength(DataType[] types) {
+ int length = 0;
+ for (DataType type : types) {
+ if (!BinaryRow.isInFixedLengthPart(type)) {
+ // find a better way of computing generic type field variable-length
+ // right now we use a small value assumption
+ length += 16;
+ }
+ }
+ return length;
+ }
+
+ // ----------------------- Record Area -----------------------
+
+ private final class RecordArea implements BytesMap.RecordArea<K, BinaryRow> {
+ private final ArrayList<MemorySegment> segments = new ArrayList<>();
+
+ private final RandomAccessInputView inView;
+ private final SimpleCollectingOutputView outView;
+
+ RecordArea() {
+ this.outView = new SimpleCollectingOutputView(segments, memoryPool, segmentSize);
+ this.inView = new RandomAccessInputView(segments, segmentSize);
+ }
+
+ public void release() {
+ returnSegments(segments);
+ segments.clear();
+ }
+
+ public void reset() {
+ release();
+ // request a new memory segment from freeMemorySegments
+ // reset segmentNum and positionInSegment
+ outView.reset();
+ inView.setReadPosition(0);
+ }
+
+ // ----------------------- Append -----------------------
+ public int appendRecord(LookupInfo<K, BinaryRow> lookupInfo, BinaryRow value)
+ throws IOException {
+ final long oldLastPosition = outView.getCurrentOffset();
+ // serialize the key into the BytesHashMap record area
+ int skip = keySerializer.serializeToPages(lookupInfo.getKey(), outView);
+ long offset = oldLastPosition + skip;
+
+ // serialize the value into the BytesHashMap record area
+ valueSerializer.serializeToPages(value, outView);
+ if (offset > Integer.MAX_VALUE) {
+ LOG.warn(
+ "We can't handle key area with more than
Integer.MAX_VALUE bytes,"
+ + " because the pointer is a integer.");
+ throw new EOFException();
+ }
+ return (int) offset;
+ }
+
+ @Override
+ public long getSegmentsSize() {
+ return segments.size() * ((long) segmentSize);
+ }
+
+ // ----------------------- Read -----------------------
+ public void setReadPosition(int position) {
+ inView.setReadPosition(position);
+ }
+
+ public boolean readKeyAndEquals(K lookupKey) throws IOException {
+ reusedKey = keySerializer.mapFromPages(reusedKey, inView);
+ return lookupKey.equals(reusedKey);
+ }
+
+ /** @throws IOException when an invalid memory address is visited. */
+ void skipKey() throws IOException {
+ keySerializer.skipRecordFromPages(inView);
+ }
+
+ public BinaryRow readValue(BinaryRow reuse) throws IOException {
+ // depends on BinaryRowSerializer to check writing skip
+ // and to find the real start offset of the data
+ return valueSerializer.mapFromPages(reuse, inView);
+ }
+
+ // ----------------------- Iterator -----------------------
+
+ private KeyValueIterator<K, BinaryRow> entryIterator(boolean requiresCopy) {
+ return new EntryIterator(requiresCopy);
+ }
+
+ private final class EntryIterator extends AbstractPagedInputView
+ implements KeyValueIterator<K, BinaryRow> {
+
+ private int count = 0;
+ private int currentSegmentIndex = 0;
+ private final boolean requiresCopy;
+
+ private EntryIterator(boolean requiresCopy) {
+ super(segments.get(0), segmentSize);
+ destructiveIterator = this;
+ this.requiresCopy = requiresCopy;
+ }
+
+ @Override
+ public boolean advanceNext() throws IOException {
+ if (count < numElements) {
+ count++;
+ // the previous segment is no longer needed.
+ keySerializer.mapFromPages(reusedKey, this);
+ valueSerializer.mapFromPages(reusedValue, this);
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public K getKey() {
+ return requiresCopy ? keySerializer.copy(reusedKey) : reusedKey;
+ }
+
+ @Override
+ public BinaryRow getValue() {
+ return requiresCopy ? reusedValue.copy() : reusedValue;
+ }
+
+ public boolean hasNext() {
+ return count < numElements;
+ }
+
+ @Override
+ protected int getLimitForSegment(MemorySegment segment) {
+ return segmentSize;
+ }
+
+ @Override
+ protected MemorySegment nextSegment(MemorySegment current) {
+ return segments.get(++currentSegmentIndex);
+ }
+ }
+ }
+}
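A sketch of the lookup-then-append pattern this map is built for (the pool sizing and the one-INT-key/one-LONG-value schema are assumptions for the example, not from the commit):

    MemorySegmentPool pool = new HeapMemorySegmentPool(32 * 32 * 1024, 32 * 1024);
    BytesHashMap<BinaryRow> map = new BytesHashMap<>(pool, new BinaryRowSerializer(1), 1);

    BinaryRow key = new BinaryRow(1);
    BinaryRowWriter keyWriter = new BinaryRowWriter(key);
    keyWriter.writeInt(0, 7);
    keyWriter.complete();

    BinaryRow init = new BinaryRow(1);
    BinaryRowWriter initWriter = new BinaryRowWriter(init);
    initWriter.writeLong(0, 1L);
    initWriter.complete();

    LookupInfo<BinaryRow, BinaryRow> info = map.lookup(key);
    if (!info.isFound()) {
        map.append(info, init); // first occurrence: store the initial count
    } else {
        BinaryRow stored = info.getValue();
        stored.setLong(0, stored.getLong(0) + 1); // in-place, fixed-width only
    }

When append throws EOFException, the caller is expected to drain the map (e.g. via getEntryIterator) and reset it before retrying.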
diff --git a/paimon-core/src/main/java/org/apache/paimon/hash/BytesMap.java b/paimon-core/src/main/java/org/apache/paimon/hash/BytesMap.java
new file mode 100644
index 000000000..215cfface
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/hash/BytesMap.java
@@ -0,0 +1,379 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.hash;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.serializer.PagedTypeSerializer;
+import org.apache.paimon.memory.MemorySegment;
+import org.apache.paimon.memory.MemorySegmentPool;
+import org.apache.paimon.utils.MathUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Base class for {@code BytesHashMap}.
+ *
+ * @param <K> type of the map key.
+ * @param <V> type of the map value.
+ */
+public abstract class BytesMap<K, V> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(BytesMap.class);
+
+ public static final int BUCKET_SIZE = 8;
+ protected static final int END_OF_LIST = Integer.MAX_VALUE;
+ protected static final int STEP_INCREMENT = 1;
+ protected static final int ELEMENT_POINT_LENGTH = 4;
+ public static final int RECORD_EXTRA_LENGTH = 8;
+ protected static final int BUCKET_SIZE_BITS = 3;
+
+ protected final int numBucketsPerSegment;
+ protected final int numBucketsPerSegmentBits;
+ protected final int numBucketsPerSegmentMask;
+ protected final int lastBucketPosition;
+
+ protected final int segmentSize;
+ protected final MemorySegmentPool memoryPool;
+ protected List<MemorySegment> bucketSegments;
+
+ protected final int reservedNumBuffers;
+
+ protected int numElements = 0;
+ protected int numBucketsMask;
+ // get the second hashcode based on log2NumBuckets and numBucketsMask2
+ protected int log2NumBuckets;
+ protected int numBucketsMask2;
+
+ protected static final double LOAD_FACTOR = 0.75;
+ // a smaller bucket can make the best of l1/l2/l3 cache.
+ protected static final long INIT_BUCKET_MEMORY_IN_BYTES = 1024 * 1024L;
+
+ /** The map will be expanded once the number of elements exceeds this threshold. */
+ protected int growthThreshold;
+
+ /** The segments where the actual data is stored. */
+ protected RecordArea<K, V> recordArea;
+
+ /** Used as a reused object during lookup and iteration. */
+ protected K reusedKey;
+
+ /** Used as a reused object when retrieving the map's value by key and during iteration. */
+ protected V reusedValue;
+
+ /** Used as a reused object returned by lookup. */
+ private final LookupInfo<K, V> reuseLookupInfo;
+
+ // metric
+ protected long numSpillFiles;
+ protected long spillInBytes;
+
+ public BytesMap(MemorySegmentPool memoryPool, PagedTypeSerializer<K> keySerializer) {
+ this.memoryPool = memoryPool;
+ this.segmentSize = memoryPool.pageSize();
+ this.reservedNumBuffers = memoryPool.freePages();
+ this.numBucketsPerSegment = segmentSize / BUCKET_SIZE;
+ this.numBucketsPerSegmentBits = MathUtils.log2strict(this.numBucketsPerSegment);
+ this.numBucketsPerSegmentMask = (1 << this.numBucketsPerSegmentBits) - 1;
+ this.lastBucketPosition = (numBucketsPerSegment - 1) * BUCKET_SIZE;
+
+ this.reusedKey = keySerializer.createReuseInstance();
+ this.reuseLookupInfo = new LookupInfo<>();
+ }
+
+ /** Returns the number of keys in this map. */
+ public abstract long getNumKeys();
+
+ protected void initBucketSegments(int numBucketSegments) {
+ if (numBucketSegments < 1) {
+ throw new RuntimeException("Too small memory allocated for
BytesHashMap");
+ }
+ this.bucketSegments = new ArrayList<>(numBucketSegments);
+ for (int i = 0; i < numBucketSegments; i++) {
+ MemorySegment segment = memoryPool.nextSegment();
+ if (segment == null) {
+ throw new RuntimeException("Memory for hash map is too
small.");
+ }
+ bucketSegments.add(i, segment);
+ }
+
+ resetBucketSegments(this.bucketSegments);
+ int numBuckets = numBucketSegments * numBucketsPerSegment;
+ this.log2NumBuckets = MathUtils.log2strict(numBuckets);
+ this.numBucketsMask = (1 << MathUtils.log2strict(numBuckets)) - 1;
+ this.numBucketsMask2 = (1 << MathUtils.log2strict(numBuckets >> 1)) - 1;
+ this.growthThreshold = (int) (numBuckets * LOAD_FACTOR);
+ }
+
+ protected void resetBucketSegments(List<MemorySegment> resetBucketSegs) {
+ for (MemorySegment segment : resetBucketSegs) {
+ for (int j = 0; j <= lastBucketPosition; j += BUCKET_SIZE) {
+ segment.putInt(j, END_OF_LIST);
+ }
+ }
+ }
+
+ public long getNumSpillFiles() {
+ return numSpillFiles;
+ }
+
+ public long getSpillInBytes() {
+ return spillInBytes;
+ }
+
+ public int getNumElements() {
+ return numElements;
+ }
+
+ public void free() {
+ returnSegments(this.bucketSegments);
+ this.bucketSegments.clear();
+ numElements = 0;
+ }
+
+ /** Resets the map's record and bucket area's memory segments for reusing. */
+ public void reset() {
+ setBucketVariables(bucketSegments);
+ resetBucketSegments(bucketSegments);
+ numElements = 0;
+ LOG.debug(
+ "reset BytesHashMap with record memory segments {}, {} in
bytes, init allocating {} for bucket area.",
+ memoryPool.freePages(),
+ memoryPool.freePages() * segmentSize,
+ bucketSegments.size());
+ }
+
+ /**
+ * @param key the key to look up in the hash map. Only keys in BinaryRow form backed by a
+ * single MemorySegment are supported.
+ * @return {@link LookupInfo}
+ */
+ public LookupInfo<K, V> lookup(K key) {
+ final int hashCode1 = key.hashCode();
+ int newPos = hashCode1 & numBucketsMask;
+ // which segment contains the bucket
+ int bucketSegmentIndex = newPos >>> numBucketsPerSegmentBits;
+ // offset of the bucket in the segment
+ int bucketOffset = (newPos & numBucketsPerSegmentMask) << BUCKET_SIZE_BITS;
+
+ boolean found = false;
+ int step = STEP_INCREMENT;
+ int hashCode2 = 0;
+ int findElementPtr;
+ try {
+ do {
+ findElementPtr = bucketSegments.get(bucketSegmentIndex).getInt(bucketOffset);
+ if (findElementPtr == END_OF_LIST) {
+ // This is a new key.
+ break;
+ } else {
+ final int storedHashCode =
+ bucketSegments
+ .get(bucketSegmentIndex)
+ .getInt(bucketOffset + ELEMENT_POINT_LENGTH);
+ if (hashCode1 == storedHashCode) {
+ recordArea.setReadPosition(findElementPtr);
+ if (recordArea.readKeyAndEquals(key)) {
+ // we found an element with a matching key, and not just a hash
+ // collision
+ found = true;
+ reusedValue = recordArea.readValue(reusedValue);
+ break;
+ }
+ }
+ }
+ if (step == 1) {
+ hashCode2 = calcSecondHashCode(hashCode1);
+ }
+ newPos = (hashCode1 + step * hashCode2) & numBucketsMask;
+ // which segment contains the bucket
+ bucketSegmentIndex = newPos >>> numBucketsPerSegmentBits;
+ // offset of the bucket in the segment
+ bucketOffset = (newPos & numBucketsPerSegmentMask) << BUCKET_SIZE_BITS;
+ step += STEP_INCREMENT;
+ } while (true);
+ } catch (IOException ex) {
+ throw new RuntimeException(
+ "Error reading record from the aggregate map: " +
ex.getMessage(), ex);
+ }
+ reuseLookupInfo.set(found, hashCode1, key, reusedValue, bucketSegmentIndex, bucketOffset);
+ return reuseLookupInfo;
+ }
+
+ /** @throws EOFException if the map cannot allocate any more memory. */
+ protected void growAndRehash() throws EOFException {
+ // allocate the new data structures
+ int required = 2 * bucketSegments.size();
+ if (required * (long) numBucketsPerSegment > Integer.MAX_VALUE) {
+ LOG.warn(
+ "We can't handle more than Integer.MAX_VALUE buckets (eg.
because hash functions return int)");
+ throw new EOFException();
+ }
+
+ int numAllocatedSegments = required - memoryPool.freePages();
+ if (numAllocatedSegments > 0) {
+ LOG.warn(
+ "BytesHashMap can't allocate {} pages, and now used {}
pages",
+ required,
+ reservedNumBuffers);
+ throw new EOFException();
+ }
+
+ List<MemorySegment> newBucketSegments = new ArrayList<>(required);
+ for (int i = 0; i < required; i++) {
+ newBucketSegments.add(memoryPool.nextSegment());
+ }
+ setBucketVariables(newBucketSegments);
+
+ long reHashStartTime = System.currentTimeMillis();
+ resetBucketSegments(newBucketSegments);
+ // Re-mask (we don't recompute the hashcode because we stored all 32 bits of it)
+ for (MemorySegment memorySegment : bucketSegments) {
+ for (int j = 0; j < numBucketsPerSegment; j++) {
+ final int recordPointer = memorySegment.getInt(j * BUCKET_SIZE);
+ if (recordPointer != END_OF_LIST) {
+ final int hashCode1 =
+ memorySegment.getInt(j * BUCKET_SIZE + ELEMENT_POINT_LENGTH);
+ int newPos = hashCode1 & numBucketsMask;
+ int bucketSegmentIndex = newPos >>> numBucketsPerSegmentBits;
+ int bucketOffset = (newPos & numBucketsPerSegmentMask) << BUCKET_SIZE_BITS;
+ int step = STEP_INCREMENT;
+ long hashCode2 = 0;
+ while (newBucketSegments.get(bucketSegmentIndex).getInt(bucketOffset)
+ != END_OF_LIST) {
+ if (step == 1) {
+ hashCode2 = calcSecondHashCode(hashCode1);
+ }
+ newPos = (int) ((hashCode1 + step * hashCode2) & numBucketsMask);
+ // which segment contains the bucket
+ bucketSegmentIndex = newPos >>> numBucketsPerSegmentBits;
+ // offset of the bucket in the segment
+ bucketOffset = (newPos & numBucketsPerSegmentMask) << BUCKET_SIZE_BITS;
+ step += STEP_INCREMENT;
+ }
+ newBucketSegments.get(bucketSegmentIndex).putInt(bucketOffset, recordPointer);
+ newBucketSegments
+ .get(bucketSegmentIndex)
+ .putInt(bucketOffset + ELEMENT_POINT_LENGTH, hashCode1);
+ }
+ }
+ }
+ LOG.info(
+ "The rehash take {} ms for {} segments",
+ (System.currentTimeMillis() - reHashStartTime),
+ required);
+ this.memoryPool.returnAll(this.bucketSegments);
+ this.bucketSegments = newBucketSegments;
+ }
+
+ protected void returnSegments(List<MemorySegment> segments) {
+ memoryPool.returnAll(segments);
+ }
+
+ private void setBucketVariables(List<MemorySegment> bucketSegments) {
+ int numBuckets = bucketSegments.size() * numBucketsPerSegment;
+ this.log2NumBuckets = MathUtils.log2strict(numBuckets);
+ this.numBucketsMask = (1 << MathUtils.log2strict(numBuckets)) - 1;
+ this.numBucketsMask2 = (1 << MathUtils.log2strict(numBuckets >> 1)) - 1;
+ this.growthThreshold = (int) (numBuckets * LOAD_FACTOR);
+ }
+
+ // M (the number of buckets) is a power of 2, so the second hash code must be odd, and is
+ // always H2(K) = 1 + 2 * ((H1(K) / M) mod (M - 1))
+ protected int calcSecondHashCode(final int firstHashCode) {
+ return ((((firstHashCode >> log2NumBuckets)) & numBucketsMask2) << 1) + 1;
+ }
+
+ /** Record area. */
+ interface RecordArea<K, V> {
+
+ void setReadPosition(int position);
+
+ boolean readKeyAndEquals(K lookupKey) throws IOException;
+
+ V readValue(V reuse) throws IOException;
+
+ int appendRecord(LookupInfo<K, V> lookupInfo, BinaryRow value) throws IOException;
+
+ long getSegmentsSize();
+
+ void release();
+
+ void reset();
+ }
+
+ /** Result fetched when looking up a key. */
+ public static final class LookupInfo<K, V> {
+ boolean found;
+ K key;
+ V value;
+
+ /**
+ * The hashcode of the lookup key passed to {@link BytesMap#lookup(K)}. Caching this
+ * hashcode here allows us to avoid re-hashing the key when inserting a value for that key.
+ * bucketSegmentIndex and bucketOffset are cached for the same purpose.
+ */
+ int keyHashCode;
+
+ int bucketSegmentIndex;
+ int bucketOffset;
+
+ LookupInfo() {
+ this.found = false;
+ this.keyHashCode = -1;
+ this.key = null;
+ this.value = null;
+ this.bucketSegmentIndex = -1;
+ this.bucketOffset = -1;
+ }
+
+ void set(
+ boolean found,
+ int keyHashCode,
+ K key,
+ V value,
+ int bucketSegmentIndex,
+ int bucketOffset) {
+ this.found = found;
+ this.keyHashCode = keyHashCode;
+ this.key = key;
+ this.value = value;
+ this.bucketSegmentIndex = bucketSegmentIndex;
+ this.bucketOffset = bucketOffset;
+ }
+
+ public boolean isFound() {
+ return found;
+ }
+
+ public K getKey() {
+ return key;
+ }
+
+ public V getValue() {
+ return value;
+ }
+ }
+}
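To make the bucket arithmetic concrete, a small sketch with assumed numbers (32 KiB pages and 8-byte buckets give 4096 buckets per segment; two bucket segments give a 13-bit mask):

    int numBucketsPerSegmentBits = 12;                        // log2(32768 / 8)
    int numBucketsPerSegmentMask = (1 << 12) - 1;
    int numBucketsMask = (1 << 13) - 1;

    int hashCode1 = 0x5f3759df;                               // arbitrary example hash
    int pos = hashCode1 & numBucketsMask;                     // global bucket index
    int segmentIndex = pos >>> numBucketsPerSegmentBits;      // which segment
    int bucketOffset = (pos & numBucketsPerSegmentMask) << 3; // byte offset = bucket * 8
    // on a collision, probing continues with the odd second hash:
    // pos = (hashCode1 + step * hashCode2) & numBucketsMask; step += STEP_INCREMENT;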
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/localmerge/HashMapLocalMerger.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/localmerge/HashMapLocalMerger.java
new file mode 100644
index 000000000..1a395fb36
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/localmerge/HashMapLocalMerger.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.mergetree.localmerge;
+
+import org.apache.paimon.KeyValue;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.InternalRow.FieldSetter;
+import org.apache.paimon.data.serializer.BinaryRowSerializer;
+import org.apache.paimon.data.serializer.InternalRowSerializer;
+import org.apache.paimon.hash.BytesHashMap;
+import org.apache.paimon.hash.BytesMap.LookupInfo;
+import org.apache.paimon.memory.MemorySegmentPool;
+import org.apache.paimon.mergetree.compact.MergeFunction;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.RowKind;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.FieldsComparator;
+import org.apache.paimon.utils.KeyValueIterator;
+
+import javax.annotation.Nullable;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Consumer;
+
+import static org.apache.paimon.data.InternalRow.createFieldSetter;
+
+/** A {@link LocalMerger} which stores records in {@link BytesHashMap}. */
+public class HashMapLocalMerger implements LocalMerger {
+
+ private final InternalRowSerializer valueSerializer;
+ private final MergeFunction<KeyValue> mergeFunction;
+ @Nullable private final FieldsComparator udsComparator;
+ private final BytesHashMap<BinaryRow> buffer;
+ private final List<FieldSetter> nonKeySetters;
+
+ public HashMapLocalMerger(
+ RowType rowType,
+ List<String> primaryKeys,
+ MemorySegmentPool memoryPool,
+ MergeFunction<KeyValue> mergeFunction,
+ @Nullable FieldsComparator userDefinedSeqComparator) {
+ this.valueSerializer = new InternalRowSerializer(rowType);
+ this.mergeFunction = mergeFunction;
+ this.udsComparator = userDefinedSeqComparator;
+ this.buffer =
+ new BytesHashMap<>(
+ memoryPool,
+ new BinaryRowSerializer(primaryKeys.size()),
+ rowType.getFieldCount());
+
+ this.nonKeySetters = new ArrayList<>();
+ for (int i = 0; i < rowType.getFieldCount(); i++) {
+ DataField field = rowType.getFields().get(i);
+ if (primaryKeys.contains(field.name())) {
+ continue;
+ }
+ nonKeySetters.add(createFieldSetter(field.type(), i));
+ }
+ }
+
+ @Override
+ public boolean put(RowKind rowKind, BinaryRow key, InternalRow value) throws IOException {
+ // we store row kind in value
+ value.setRowKind(rowKind);
+
+ LookupInfo<BinaryRow, BinaryRow> lookup = buffer.lookup(key);
+ if (!lookup.isFound()) {
+ try {
+ buffer.append(lookup, valueSerializer.toBinaryRow(value));
+ return true;
+ } catch (EOFException eof) {
+ return false;
+ }
+ }
+
+ mergeFunction.reset();
+ BinaryRow stored = lookup.getValue();
+ KeyValue previousKv = new KeyValue().replace(key, stored.getRowKind(), stored);
+ KeyValue newKv = new KeyValue().replace(key, value.getRowKind(), value);
+ if (udsComparator != null && udsComparator.compare(stored, value) > 0) {
+ mergeFunction.add(newKv);
+ mergeFunction.add(previousKv);
+ } else {
+ mergeFunction.add(previousKv);
+ mergeFunction.add(newKv);
+ }
+
+ KeyValue result = mergeFunction.getResult();
+ stored.setRowKind(result.valueKind());
+ for (FieldSetter setter : nonKeySetters) {
+ setter.setFieldFrom(result.value(), stored);
+ }
+ return true;
+ }
+
+ @Override
+ public int size() {
+ return buffer.getNumElements();
+ }
+
+ @Override
+ public void forEach(Consumer<InternalRow> consumer) throws IOException {
+ KeyValueIterator<BinaryRow, BinaryRow> iterator = buffer.getEntryIterator(false);
+ while (iterator.advanceNext()) {
+ consumer.accept(iterator.getValue());
+ }
+ }
+
+ @Override
+ public void clear() {
+ buffer.reset();
+ }
+}
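A sketch of how an operator might drive the merger (rowType, primaryKeys, memoryPool, mergeFunction, the rows, and the 'out' collector are assumed to exist in the caller; this is not the LocalMergeOperator code from this commit):

    LocalMerger merger =
            new HashMapLocalMerger(rowType, primaryKeys, memoryPool, mergeFunction, null);

    if (!merger.put(RowKind.INSERT, keyRow, valueRow)) {
        // the hash map ran out of memory: emit what is buffered, clear, retry
        merger.forEach(out::collect);
        merger.clear();
        merger.put(RowKind.INSERT, keyRow, valueRow);
    }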
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/localmerge/LocalMerger.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/localmerge/LocalMerger.java
new file mode 100644
index 000000000..bec71808a
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/localmerge/LocalMerger.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.mergetree.localmerge;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.types.RowKind;
+
+import java.io.IOException;
+import java.util.function.Consumer;
+
+/** Local merger to merge in memory. */
+public interface LocalMerger {
+
+ boolean put(RowKind rowKind, BinaryRow key, InternalRow value) throws IOException;
+
+ int size();
+
+ void forEach(Consumer<InternalRow> consumer) throws IOException;
+
+ void clear();
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/localmerge/SortBufferLocalMerger.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/localmerge/SortBufferLocalMerger.java
new file mode 100644
index 000000000..198e6c67d
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/localmerge/SortBufferLocalMerger.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.mergetree.localmerge;
+
+import org.apache.paimon.KeyValue;
+import org.apache.paimon.codegen.RecordComparator;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.mergetree.SortBufferWriteBuffer;
+import org.apache.paimon.mergetree.compact.MergeFunction;
+import org.apache.paimon.types.RowKind;
+
+import java.io.IOException;
+import java.util.function.Consumer;
+
+/** A {@link LocalMerger} which stores records in {@link SortBufferWriteBuffer}. */
+public class SortBufferLocalMerger implements LocalMerger {
+
+ private final SortBufferWriteBuffer sortBuffer;
+ private final RecordComparator keyComparator;
+ private final MergeFunction<KeyValue> mergeFunction;
+
+ private long recordCount;
+
+ public SortBufferLocalMerger(
+ SortBufferWriteBuffer sortBuffer,
+ RecordComparator keyComparator,
+ MergeFunction<KeyValue> mergeFunction) {
+ this.sortBuffer = sortBuffer;
+ this.keyComparator = keyComparator;
+ this.mergeFunction = mergeFunction;
+ this.recordCount = 0;
+ }
+
+ @Override
+ public boolean put(RowKind rowKind, BinaryRow key, InternalRow value) throws IOException {
+ return sortBuffer.put(recordCount++, rowKind, key, value);
+ }
+
+ @Override
+ public int size() {
+ return sortBuffer.size();
+ }
+
+ @Override
+ public void forEach(Consumer<InternalRow> consumer) throws IOException {
+ sortBuffer.forEach(
+ keyComparator,
+ mergeFunction,
+ null,
+ kv -> {
+ InternalRow row = kv.value();
+ row.setRowKind(kv.valueKind());
+ consumer.accept(row);
+ });
+ }
+
+ @Override
+ public void clear() {
+ sortBuffer.clear();
+ }
+}
diff --git a/paimon-core/src/test/java/org/apache/paimon/hash/BytesHashMapTest.java b/paimon-core/src/test/java/org/apache/paimon/hash/BytesHashMapTest.java
new file mode 100644
index 000000000..4020e3d48
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/hash/BytesHashMapTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.hash;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.serializer.BinaryRowSerializer;
+import org.apache.paimon.memory.MemorySegmentPool;
+import org.apache.paimon.types.DataType;
+
+/** Test case for {@link BytesHashMap}. */
+public class BytesHashMapTest extends BytesHashMapTestBase<BinaryRow> {
+
+ public BytesHashMapTest() {
+ super(new BinaryRowSerializer(KEY_TYPES.length));
+ }
+
+ @Override
+ public BytesHashMap<BinaryRow> createBytesHashMap(
+ MemorySegmentPool pool, DataType[] keyTypes, DataType[] valueTypes) {
+ return new BytesHashMap<>(
+ pool, new BinaryRowSerializer(keyTypes.length), valueTypes.length);
+ }
+
+ @Override
+ public BinaryRow[] generateRandomKeys(int num) {
+ return getRandomizedInputs(num);
+ }
+}
diff --git a/paimon-core/src/test/java/org/apache/paimon/hash/BytesHashMapTestBase.java b/paimon-core/src/test/java/org/apache/paimon/hash/BytesHashMapTestBase.java
new file mode 100644
index 000000000..01bbd46e6
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/hash/BytesHashMapTestBase.java
@@ -0,0 +1,373 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.hash;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.RandomAccessInputView;
+import org.apache.paimon.data.serializer.BinaryRowSerializer;
+import org.apache.paimon.data.serializer.PagedTypeSerializer;
+import org.apache.paimon.memory.HeapMemorySegmentPool;
+import org.apache.paimon.memory.MemorySegment;
+import org.apache.paimon.memory.MemorySegmentPool;
+import org.apache.paimon.types.BigIntType;
+import org.apache.paimon.types.BooleanType;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DoubleType;
+import org.apache.paimon.types.FloatType;
+import org.apache.paimon.types.IntType;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.types.SmallIntType;
+import org.apache.paimon.types.VarCharType;
+import org.apache.paimon.utils.KeyValueIterator;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Base test class for {@link BytesHashMap}. */
+abstract class BytesHashMapTestBase<K> extends BytesMapTestBase {
+
+ private static final int NUM_REWRITES = 10;
+
+ static final DataType[] KEY_TYPES =
+ new DataType[] {
+ new IntType(),
+ VarCharType.STRING_TYPE,
+ new DoubleType(),
+ new BigIntType(),
+ new BooleanType(),
+ new FloatType(),
+ new SmallIntType()
+ };
+
+ static final DataType[] VALUE_TYPES =
+ new DataType[] {
+ new DoubleType(),
+ new BigIntType(),
+ new BooleanType(),
+ new FloatType(),
+ new SmallIntType()
+ };
+
+ protected final BinaryRow defaultValue;
+ protected final PagedTypeSerializer<K> keySerializer;
+ protected final BinaryRowSerializer valueSerializer;
+
+ public BytesHashMapTestBase(PagedTypeSerializer<K> keySerializer) {
+ this.keySerializer = keySerializer;
+ this.valueSerializer = new BinaryRowSerializer(VALUE_TYPES.length);
+ this.defaultValue = valueSerializer.createInstance();
+ int valueSize = defaultValue.getFixedLengthPartSize();
+ this.defaultValue.pointTo(MemorySegment.wrap(new byte[valueSize]), 0, valueSize);
+ }
+
+ /** Creates the specific {@link BytesHashMap} implementation under test. */
+ public abstract BytesHashMap<K> createBytesHashMap(
+ MemorySegmentPool memorySegmentPool, DataType[] keyTypes, DataType[] valueTypes);
+
+ /**
+ * Generates {@code num} random keys, the types of key fields are defined in {@link #KEY_TYPES}.
+ */
+ public abstract K[] generateRandomKeys(int num);
+
+ // ------------------------------------------------------------------------------------------
+ // Tests
+ // ------------------------------------------------------------------------------------------
+
+ @Test
+ void testHashSetMode() throws IOException {
+ final int numMemSegments =
+ needNumMemSegments(
+ NUM_ENTRIES,
+ rowLength(RowType.of(VALUE_TYPES)),
+ rowLength(RowType.of(KEY_TYPES)),
+ PAGE_SIZE);
+ int memorySize = numMemSegments * PAGE_SIZE;
+ MemorySegmentPool pool = new HeapMemorySegmentPool(memorySize, PAGE_SIZE);
+
+ BytesHashMap<K> table = createBytesHashMap(pool, KEY_TYPES, new DataType[] {});
+ assertThat(table.isHashSetMode()).isTrue();
+
+ K[] keys = generateRandomKeys(NUM_ENTRIES);
+ verifyKeyInsert(keys, table);
+ verifyKeyPresent(keys, table);
+ table.free();
+ }
+
+ @Test
+ void testBuildAndRetrieve() throws Exception {
+
+ final int numMemSegments =
+ needNumMemSegments(
+ NUM_ENTRIES,
+ rowLength(RowType.of(VALUE_TYPES)),
+ rowLength(RowType.of(KEY_TYPES)),
+ PAGE_SIZE);
+ int memorySize = numMemSegments * PAGE_SIZE;
+ MemorySegmentPool pool = new HeapMemorySegmentPool(memorySize, PAGE_SIZE);
+
+ BytesHashMap<K> table = createBytesHashMap(pool, KEY_TYPES, VALUE_TYPES);
+
+ K[] keys = generateRandomKeys(NUM_ENTRIES);
+ List<BinaryRow> expected = new ArrayList<>(NUM_ENTRIES);
+ verifyInsert(keys, expected, table);
+ verifyRetrieve(table, keys, expected);
+ table.free();
+ }
+
+ @Test
+ void testBuildAndUpdate() throws Exception {
+ final int numMemSegments =
+ needNumMemSegments(
+ NUM_ENTRIES,
+ rowLength(RowType.of(VALUE_TYPES)),
+ rowLength(RowType.of(KEY_TYPES)),
+ PAGE_SIZE);
+ int memorySize = numMemSegments * PAGE_SIZE;
+ MemorySegmentPool pool = new HeapMemorySegmentPool(memorySize, PAGE_SIZE);
+
+ BytesHashMap<K> table = createBytesHashMap(pool, KEY_TYPES, VALUE_TYPES);
+
+ K[] keys = generateRandomKeys(NUM_ENTRIES);
+ List<BinaryRow> expected = new ArrayList<>(NUM_ENTRIES);
+ verifyInsertAndUpdate(keys, expected, table);
+ verifyRetrieve(table, keys, expected);
+ table.free();
+ }
+
+ @Test
+ void testReset() throws Exception {
+ final int numMemSegments =
+ needNumMemSegments(
+ NUM_ENTRIES,
+ rowLength(RowType.of(VALUE_TYPES)),
+ rowLength(RowType.of(KEY_TYPES)),
+ PAGE_SIZE);
+
+ int memorySize = numMemSegments * PAGE_SIZE;
+
+ MemorySegmentPool pool = new HeapMemorySegmentPool(memorySize, PAGE_SIZE);
+
+ BytesHashMap<K> table = createBytesHashMap(pool, KEY_TYPES, VALUE_TYPES);
+
+ final K[] keys = generateRandomKeys(NUM_ENTRIES);
+ List<BinaryRow> expected = new ArrayList<>(NUM_ENTRIES);
+ verifyInsertAndUpdate(keys, expected, table);
+ verifyRetrieve(table, keys, expected);
+
+ table.reset();
+ assertThat(table.getNumElements()).isEqualTo(0);
+ assertThat(table.getRecordAreaMemorySegments()).hasSize(1);
+
+ expected.clear();
+ verifyInsertAndUpdate(keys, expected, table);
+ verifyRetrieve(table, keys, expected);
+ table.free();
+ }
+
+ @Test
+ void testResetAndOutput() throws Exception {
+ final Random rnd = new Random(RANDOM_SEED);
+ final int reservedMemSegments = 64;
+
+ int minMemorySize = reservedMemSegments * PAGE_SIZE;
+ MemorySegmentPool pool = new HeapMemorySegmentPool(minMemorySize, PAGE_SIZE);
+
+ BytesHashMap<K> table = createBytesHashMap(pool, KEY_TYPES, VALUE_TYPES);
+
+ K[] keys = generateRandomKeys(NUM_ENTRIES);
+ List<BinaryRow> expected = new ArrayList<>(NUM_ENTRIES);
+ List<BinaryRow> actualValues = new ArrayList<>(NUM_ENTRIES);
+ List<K> actualKeys = new ArrayList<>(NUM_ENTRIES);
+ for (int i = 0; i < NUM_ENTRIES; i++) {
+ K groupKey = keys[i];
+ // look up and insert
+ BytesMap.LookupInfo<K, BinaryRow> lookupInfo = table.lookup(groupKey);
+ assertThat(lookupInfo.isFound()).isFalse();
+ try {
+ BinaryRow entry = table.append(lookupInfo, defaultValue);
+ assertThat(entry).isNotNull();
+ // mock multiple updates
+ for (int j = 0; j < NUM_REWRITES; j++) {
+ updateOutputBuffer(entry, rnd);
+ }
+ expected.add(entry.copy());
+ } catch (Exception e) {
+ ArrayList<MemorySegment> segments = table.getRecordAreaMemorySegments();
+ RandomAccessInputView inView =
+ new RandomAccessInputView(segments, segments.get(0).size());
+ K reuseKey = keySerializer.createReuseInstance();
+ BinaryRow reuseValue = valueSerializer.createInstance();
+ for (int index = 0; index < table.getNumElements(); index++) {
+ reuseKey = keySerializer.mapFromPages(reuseKey, inView);
+ reuseValue = valueSerializer.mapFromPages(reuseValue, inView);
+ actualKeys.add(keySerializer.copy(reuseKey));
+ actualValues.add(reuseValue.copy());
+ }
+ table.reset();
+ // retry
+ lookupInfo = table.lookup(groupKey);
+ BinaryRow entry = table.append(lookupInfo, defaultValue);
+ assertThat(entry).isNotNull();
+ // mock multiple updates
+ for (int j = 0; j < NUM_REWRITES; j++) {
+ updateOutputBuffer(entry, rnd);
+ }
+ expected.add(entry.copy());
+ }
+ }
+ KeyValueIterator<K, BinaryRow> iter = table.getEntryIterator(false);
+ while (iter.advanceNext()) {
+ actualKeys.add(keySerializer.copy(iter.getKey()));
+ actualValues.add(iter.getValue().copy());
+ }
+ assertThat(expected).hasSize(NUM_ENTRIES);
+ assertThat(actualKeys).hasSize(NUM_ENTRIES);
+ assertThat(actualValues).hasSize(NUM_ENTRIES);
+ assertThat(actualValues).isEqualTo(expected);
+ table.free();
+ }
+
+ @Test
+ void testSingleKeyMultipleOps() throws Exception {
+ final int numMemSegments =
+ needNumMemSegments(
+ NUM_ENTRIES,
+ rowLength(RowType.of(VALUE_TYPES)),
+ rowLength(RowType.of(KEY_TYPES)),
+ PAGE_SIZE);
+
+ int memorySize = numMemSegments * PAGE_SIZE;
+
+ MemorySegmentPool pool = new HeapMemorySegmentPool(memorySize, PAGE_SIZE);
+
+ BytesHashMap<K> table = createBytesHashMap(pool, KEY_TYPES, VALUE_TYPES);
+ final K key = generateRandomKeys(1)[0];
+ for (int i = 0; i < 3; i++) {
+ BytesMap.LookupInfo<K, BinaryRow> lookupInfo = table.lookup(key);
+ assertThat(lookupInfo.isFound()).isFalse();
+ }
+
+ for (int i = 0; i < 3; i++) {
+ BytesMap.LookupInfo<K, BinaryRow> lookupInfo = table.lookup(key);
+ BinaryRow entry = lookupInfo.getValue();
+ if (i == 0) {
+ assertThat(lookupInfo.isFound()).isFalse();
+ entry = table.append(lookupInfo, defaultValue);
+ } else {
+ assertThat(lookupInfo.isFound()).isTrue();
+ }
+ assertThat(entry).isNotNull();
+ }
+ table.free();
+ }
+
+ // ----------------------------------------------
+ /** This would be codegened in HashAggExec; rnd is used to mock the update/initExprs resultTerm. */
+ private void updateOutputBuffer(BinaryRow reuse, Random rnd) {
+ long longVal = rnd.nextLong();
+ double doubleVal = rnd.nextDouble();
+ boolean boolVal = longVal % 2 == 0;
+ reuse.setDouble(2, doubleVal);
+ reuse.setLong(3, longVal);
+ reuse.setBoolean(4, boolVal);
+ }
+
+ // ----------------------- Utilities -----------------------
+
+ private void verifyRetrieve(BytesHashMap<K> table, K[] keys, List<BinaryRow> expected) {
+ assertThat(table.getNumElements()).isEqualTo(NUM_ENTRIES);
+ for (int i = 0; i < NUM_ENTRIES; i++) {
+ K groupKey = keys[i];
+ // look up and retrieve
+ BytesMap.LookupInfo<K, BinaryRow> lookupInfo = table.lookup(groupKey);
+ assertThat(lookupInfo.isFound()).isTrue();
+ assertThat(lookupInfo.getValue()).isNotNull();
+ assertThat(lookupInfo.getValue()).isEqualTo(expected.get(i));
+ }
+ }
+
+ private void verifyInsert(K[] keys, List<BinaryRow> inserted, BytesHashMap<K> table)
+ throws IOException {
+ for (int i = 0; i < NUM_ENTRIES; i++) {
+ K groupKey = keys[i];
+ // look up and insert
+ BytesMap.LookupInfo<K, BinaryRow> lookupInfo = table.lookup(groupKey);
+ assertThat(lookupInfo.isFound()).isFalse();
+ BinaryRow entry = table.append(lookupInfo, defaultValue);
+ assertThat(entry).isNotNull();
+ assertThat(defaultValue).isEqualTo(entry);
+ inserted.add(entry.copy());
+ }
+ assertThat(table.getNumElements()).isEqualTo(NUM_ENTRIES);
+ }
+
+ private void verifyInsertAndUpdate(K[] keys, List<BinaryRow> inserted, BytesHashMap<K> table)
+ throws IOException {
+ final Random rnd = new Random(RANDOM_SEED);
+ for (int i = 0; i < NUM_ENTRIES; i++) {
+ K groupKey = keys[i];
+ // look up and insert
+ BytesMap.LookupInfo<K, BinaryRow> lookupInfo = table.lookup(groupKey);
+ assertThat(lookupInfo.isFound()).isFalse();
+ BinaryRow entry = table.append(lookupInfo, defaultValue);
+ assertThat(entry).isNotNull();
+ // mock multiple updates
+ for (int j = 0; j < NUM_REWRITES; j++) {
+ updateOutputBuffer(entry, rnd);
+ }
+ inserted.add(entry.copy());
+ }
+ assertThat(table.getNumElements()).isEqualTo(NUM_ENTRIES);
+ }
+
+ private void verifyKeyPresent(K[] keys, BytesHashMap<K> table) {
+ assertThat(table.getNumElements()).isEqualTo(NUM_ENTRIES);
+ BinaryRow present = new BinaryRow(0);
+ present.pointTo(MemorySegment.wrap(new byte[8]), 0, 8);
+ for (int i = 0; i < NUM_ENTRIES; i++) {
+ K groupKey = keys[i];
+ // look up and retrieve
+ BytesMap.LookupInfo<K, BinaryRow> lookupInfo = table.lookup(groupKey);
+ assertThat(lookupInfo.isFound()).isTrue();
+ assertThat(lookupInfo.getValue()).isNotNull();
+ assertThat(lookupInfo.getValue()).isEqualTo(present);
+ }
+ }
+
+ private void verifyKeyInsert(K[] keys, BytesHashMap<K> table) throws IOException {
+ BinaryRow present = new BinaryRow(0);
+ present.pointTo(MemorySegment.wrap(new byte[8]), 0, 8);
+ for (int i = 0; i < NUM_ENTRIES; i++) {
+ K groupKey = keys[i];
+ // look up and insert
+ BytesMap.LookupInfo<K, BinaryRow> lookupInfo = table.lookup(groupKey);
+ assertThat(lookupInfo.isFound()).isFalse();
+ BinaryRow entry = table.append(lookupInfo, defaultValue);
+ assertThat(entry).isNotNull();
+ assertThat(present).isEqualTo(entry);
+ }
+ assertThat(table.getNumElements()).isEqualTo(NUM_ENTRIES);
+ }
+}
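The tests above all drive the map through the same lookup-then-append protocol: a miss yields a LookupInfo that is handed back to append(), while a hit exposes the stored row for in-place updates. Below is a minimal sketch of that protocol, not part of this commit, assuming only the BytesHashMap / BytesMap.LookupInfo / BinaryRow APIs visible in this diff; construction is elided (the tests obtain the table from a createBytesHashMap helper), and key / defaultValue are assumed to be prepared BinaryRows.

    // Sketch only. Assumes org.apache.paimon.hash.BytesHashMap,
    // org.apache.paimon.hash.BytesMap and org.apache.paimon.data.BinaryRow
    // as introduced by this commit.
    static BinaryRow upsert(
            BytesHashMap<BinaryRow> table, BinaryRow key, BinaryRow defaultValue)
            throws IOException {
        BytesMap.LookupInfo<BinaryRow, BinaryRow> lookupInfo = table.lookup(key);
        if (lookupInfo.isFound()) {
            // Hit: the returned row points into map-owned memory, so fields in
            // the fixed-length part can be rewritten in place
            // (cf. BinaryRow.isInFixedLengthPart).
            return lookupInfo.getValue();
        }
        // Miss: append stores a copy of defaultValue and returns the stored row.
        // When the memory pool is exhausted, the caller must flush and retry.
        return table.append(lookupInfo, defaultValue);
    }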
diff --git a/paimon-core/src/test/java/org/apache/paimon/hash/BytesMapTestBase.java b/paimon-core/src/test/java/org/apache/paimon/hash/BytesMapTestBase.java
new file mode 100644
index 000000000..acde6d9a1
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/hash/BytesMapTestBase.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.hash;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BinaryRowWriter;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.RowType;
+
+import java.util.Random;
+
+/** Test base class for {@link BytesMap}. */
+public class BytesMapTestBase {
+
+ protected static final long RANDOM_SEED = 76518743207143L;
+ protected static final int PAGE_SIZE = 32 * 1024;
+ protected static final int NUM_ENTRIES = 10000;
+
+ protected BinaryRow[] getRandomizedInputs(int num) {
+ final Random rnd = new Random(RANDOM_SEED);
+ return getRandomizedInputs(num, rnd, true);
+ }
+
+ protected BinaryRow[] getRandomizedInputs(int num, Random rnd, boolean nullable) {
+ BinaryRow[] lists = new BinaryRow[num];
+ for (int i = 0; i < num; i++) {
+ int intVal = rnd.nextInt(Integer.MAX_VALUE);
+ long longVal = -rnd.nextLong();
+ boolean boolVal = longVal % 2 == 0;
+ String strVal = nullable && boolVal ? null : getString(intVal, intVal % 1024) + i;
+ Double doubleVal = rnd.nextDouble();
+ Short shortVal = (short) intVal;
+ Float floatVal = nullable && boolVal ? null : rnd.nextFloat();
+ lists[i] = createRow(intVal, strVal, doubleVal, longVal, boolVal, floatVal, shortVal);
+ }
+ return lists;
+ }
+
+ protected BinaryRow createRow(
Integer f0, String f1, Double f2, Long f3, Boolean f4, Float f5, Short f6) {
+
+ BinaryRow row = new BinaryRow(7);
+ BinaryRowWriter writer = new BinaryRowWriter(row);
+
+ // int, string, double, long, boolean, float, short
+ if (f0 == null) {
+ writer.setNullAt(0);
+ } else {
+ writer.writeInt(0, f0);
+ }
+ if (f1 == null) {
+ writer.setNullAt(1);
+ } else {
+ writer.writeString(1, BinaryString.fromString(f1));
+ }
+ if (f2 == null) {
+ writer.setNullAt(2);
+ } else {
+ writer.writeDouble(2, f2);
+ }
+ if (f3 == null) {
+ writer.setNullAt(3);
+ } else {
+ writer.writeLong(3, f3);
+ }
+ if (f4 == null) {
+ writer.setNullAt(4);
+ } else {
+ writer.writeBoolean(4, f4);
+ }
+ if (f5 == null) {
+ writer.setNullAt(5);
+ } else {
+ writer.writeFloat(5, f5);
+ }
+ if (f6 == null) {
+ writer.setNullAt(6);
+ } else {
+ writer.writeShort(6, f6);
+ }
+ writer.complete();
+ return row;
+ }
+
+ protected int needNumMemSegments(int numEntries, int valLen, int keyLen, int pageSize) {
+ return 2 * (valLen + keyLen + 1024 * 3 + 4 + 8 + 8) * numEntries / pageSize;
+ }
+
+ protected int rowLength(RowType tpe) {
+ return BinaryRow.calculateFixPartSizeInBytes(tpe.getFieldCount())
+ + BytesHashMap.getVariableLength(tpe.getFieldTypes().toArray(new DataType[0]));
+ }
+
+ private String getString(int count, int length) {
+ StringBuilder builder = new StringBuilder();
+ for (int i = 0; i < length; i++) {
+ builder.append(count);
+ }
+ return builder.toString();
+ }
+}
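The two helpers above size the memory pool for the tests: rowLength() estimates one row as its fixed-length part plus a worst-case variable part, and needNumMemSegments() doubles a generous per-entry estimate and converts it to pages. A worked example with illustrative numbers follows; keyLen and valLen are made up for the arithmetic, while NUM_ENTRIES and PAGE_SIZE are the constants above.

    // Illustrative arithmetic only; 40 and 80 are hypothetical row lengths.
    int keyLen = 40;
    int valLen = 80;
    int pages = needNumMemSegments(NUM_ENTRIES, valLen, keyLen, PAGE_SIZE);
    // 2 * (80 + 40 + 3 * 1024 + 4 + 8 + 8) * 10000 / 32768 = 1960 pages,
    // about 61 MiB for 10k entries -- deliberately over-provisioned so the
    // in-memory tests never exhaust the pool.
    int memorySize = pages * PAGE_SIZE;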
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java
index aba891e44..6931fe907 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java
@@ -20,13 +20,17 @@ package org.apache.paimon.flink.sink;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.KeyValue;
+import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.codegen.CodeGenUtils;
import org.apache.paimon.codegen.Projection;
-import org.apache.paimon.codegen.RecordComparator;
+import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.memory.HeapMemorySegmentPool;
import org.apache.paimon.mergetree.SortBufferWriteBuffer;
import org.apache.paimon.mergetree.compact.MergeFunction;
+import org.apache.paimon.mergetree.localmerge.HashMapLocalMerger;
+import org.apache.paimon.mergetree.localmerge.LocalMerger;
+import org.apache.paimon.mergetree.localmerge.SortBufferLocalMerger;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.schema.KeyValueFieldsExtractor;
import org.apache.paimon.schema.TableSchema;
@@ -43,6 +47,7 @@ import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -63,13 +68,10 @@ public class LocalMergeOperator extends AbstractStreamOperator<InternalRow>
private final boolean ignoreDelete;
private transient Projection keyProjection;
- private transient RecordComparator keyComparator;
- private transient long recordCount;
private transient RowKindGenerator rowKindGenerator;
- private transient MergeFunction<KeyValue> mergeFunction;
- private transient SortBufferWriteBuffer buffer;
+ private transient LocalMerger merger;
private transient long currentWatermark;
private transient boolean endOfInput;
@@ -87,17 +89,14 @@ public class LocalMergeOperator extends AbstractStreamOperator<InternalRow>
public void open() throws Exception {
super.open();
- RowType keyType = PrimaryKeyTableUtils.addKeyNamePrefix(schema.logicalPrimaryKeysType());
+ List<String> primaryKeys = schema.primaryKeys();
RowType valueType = schema.logicalRowType();
CoreOptions options = new CoreOptions(schema.options());
- keyProjection =
- CodeGenUtils.newProjection(valueType, schema.projection(schema.primaryKeys()));
- keyComparator = new KeyComparatorSupplier(keyType).get();
+ keyProjection = CodeGenUtils.newProjection(valueType, schema.projection(primaryKeys));
- recordCount = 0;
rowKindGenerator = RowKindGenerator.create(schema, options);
- mergeFunction =
+ MergeFunction<KeyValue> mergeFunction =
PrimaryKeyTableUtils.createMergeFunctionFactory(
schema,
new KeyValueFieldsExtractor() {
@@ -117,26 +116,51 @@ public class LocalMergeOperator extends AbstractStreamOperator<InternalRow>
})
.create();
- buffer =
- new SortBufferWriteBuffer(
- keyType,
- valueType,
- UserDefinedSeqComparator.create(valueType, options),
- new HeapMemorySegmentPool(
- options.localMergeBufferSize(), options.pageSize()),
- false,
- MemorySize.MAX_VALUE,
- options.localSortMaxNumFileHandles(),
- options.spillCompressOptions(),
- null);
- currentWatermark = Long.MIN_VALUE;
+ boolean canHashMerger = true;
+ for (DataField field : valueType.getFields()) {
+ if (primaryKeys.contains(field.name())) {
+ continue;
+ }
+
+ if (!BinaryRow.isInFixedLengthPart(field.type())) {
+ canHashMerger = false;
+ break;
+ }
+ }
+
+ HeapMemorySegmentPool pool =
+ new HeapMemorySegmentPool(options.localMergeBufferSize(), options.pageSize());
+ UserDefinedSeqComparator udsComparator =
+ UserDefinedSeqComparator.create(valueType, options);
+ if (canHashMerger) {
+ merger =
+ new HashMapLocalMerger(
+ valueType, primaryKeys, pool, mergeFunction, udsComparator);
+ } else {
+ RowType keyType =
+ PrimaryKeyTableUtils.addKeyNamePrefix(schema.logicalPrimaryKeysType());
+ SortBufferWriteBuffer sortBuffer =
+ new SortBufferWriteBuffer(
+ keyType,
+ valueType,
+ udsComparator,
+ pool,
+ false,
+ MemorySize.MAX_VALUE,
+ options.localSortMaxNumFileHandles(),
+ options.spillCompressOptions(),
+ null);
+ merger =
+ new SortBufferLocalMerger(
+ sortBuffer, new KeyComparatorSupplier(keyType).get(), mergeFunction);
+ }
+ currentWatermark = Long.MIN_VALUE;
endOfInput = false;
}
@Override
public void processElement(StreamRecord<InternalRow> record) throws Exception {
- recordCount++;
InternalRow row = record.getValue();
RowKind rowKind = RowKindGenerator.getRowKind(rowKindGenerator, row);
@@ -147,10 +171,10 @@ public class LocalMergeOperator extends AbstractStreamOperator<InternalRow>
// row kind must be INSERT when it is divided into key and value
row.setRowKind(RowKind.INSERT);
- InternalRow key = keyProjection.apply(row);
- if (!buffer.put(recordCount, rowKind, key, row)) {
+ BinaryRow key = keyProjection.apply(row);
+ if (!merger.put(rowKind, key, row)) {
flushBuffer();
- if (!buffer.put(recordCount, rowKind, key, row)) {
+ if (!merger.put(rowKind, key, row)) {
// change row kind back
row.setRowKind(rowKind);
output.collect(record);
@@ -180,28 +204,20 @@ public class LocalMergeOperator extends AbstractStreamOperator<InternalRow>
@Override
public void close() throws Exception {
- if (buffer != null) {
- buffer.clear();
+ if (merger != null) {
+ merger.clear();
}
super.close();
}
private void flushBuffer() throws Exception {
- if (buffer.size() == 0) {
+ if (merger.size() == 0) {
return;
}
- buffer.forEach(
- keyComparator,
- mergeFunction,
- null,
- kv -> {
- InternalRow row = kv.value();
- row.setRowKind(kv.valueKind());
- output.collect(new StreamRecord<>(row));
- });
- buffer.clear();
+ merger.forEach(row -> output.collect(new StreamRecord<>(row)));
+ merger.clear();
if (currentWatermark != Long.MIN_VALUE) {
super.processWatermark(new Watermark(currentWatermark));
@@ -209,4 +225,14 @@ public class LocalMergeOperator extends AbstractStreamOperator<InternalRow>
currentWatermark = Long.MIN_VALUE;
}
}
+
+ @VisibleForTesting
+ LocalMerger merger() {
+ return merger;
+ }
+
+ @VisibleForTesting
+ void setOutput(Output<StreamRecord<InternalRow>> output) {
+ this.output = output;
+ }
}
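The core of the operator change above is merger selection: HashMapLocalMerger rewrites value rows in place inside map memory, so it is only usable when every non-primary-key value field sits in BinaryRow's fixed-length part; otherwise open() falls back to the sort-buffer path. A condensed restatement of that decision (names follow the diff; this adds no behavior beyond what the hunk above shows):

    // Hash merging requires in-place updates, so any variable-length value field
    // (e.g. STRING, BYTES, non-compact DECIMAL/TIMESTAMP) forces the sort buffer.
    boolean canHashMerger = true;
    for (DataField field : valueType.getFields()) {
        if (primaryKeys.contains(field.name())) {
            continue; // key fields are stored as the map key, not in the value row
        }
        if (!BinaryRow.isInFixedLengthPart(field.type())) {
            canHashMerger = false;
            break;
        }
    }

Note that variable-length fields are fine as long as they are primary keys: LocalMergeOperatorTest below uses a STRING key column f0 with INT value columns, which is why it can assert that the HashMapLocalMerger is chosen.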
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FirstRowITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FirstRowITCase.java
index e44aa4c63..dc7bf397e 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FirstRowITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FirstRowITCase.java
@@ -95,7 +95,7 @@ public class FirstRowITCase extends CatalogITCaseBase {
sql(
"CREATE TABLE IF NOT EXISTS T1 ("
+ "a INT, b INT, c STRING, PRIMARY KEY (a, b) NOT
ENFORCED)"
- + " PARTITIONED BY (b) WITH
('merge-engine'='first-row', 'local-merge-buffer-size' = '1m',"
+ + " PARTITIONED BY (b) WITH
('merge-engine'='first-row', 'local-merge-buffer-size' = '5m',"
+ " 'file.format'='avro', 'changelog-producer' =
'lookup');");
batchSql("INSERT INTO T1 VALUES (1, 1, '1'), (1, 1, '2'), (2, 3,
'3')");
List<Row> result = batchSql("SELECT * FROM T1");
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java
index 0bedbfd02..76ee8309e 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java
@@ -351,7 +351,7 @@ public class PartialUpdateITCase extends CatalogITCaseBase {
+ "d INT,"
+ "PRIMARY KEY (k, d) NOT ENFORCED) PARTITIONED BY (d)
"
+ " WITH ('merge-engine'='partial-update', "
- + "'local-merge-buffer-size'='1m'"
+ + "'local-merge-buffer-size'='5m'"
+ ");");
sql("INSERT INTO T1 VALUES (1, CAST(NULL AS INT), 1), (2, 1, 1), (1,
2, 1)");
@@ -588,7 +588,7 @@ public class PartialUpdateITCase extends CatalogITCaseBase {
+ " 'changelog-producer' = 'lookup'"
+ ")");
if (localMerge) {
- sql("ALTER TABLE ignore_delete SET ('local-merge-buffer-size' =
'256 kb')");
+ sql("ALTER TABLE ignore_delete SET ('local-merge-buffer-size' =
'5m')");
}
sql("INSERT INTO ignore_delete VALUES (1, CAST (NULL AS STRING),
'apple')");
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PreAggregationITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PreAggregationITCase.java
index e4c90695b..b8dfd8f6a 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PreAggregationITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PreAggregationITCase.java
@@ -1172,7 +1172,7 @@ public class PreAggregationITCase {
+ "PRIMARY KEY (k, d) NOT ENFORCED) PARTITIONED BY
(d) "
+ " WITH ('merge-engine'='aggregation', "
+ "'fields.v.aggregate-function'='sum',"
- + "'local-merge-buffer-size'='1m'"
+ + "'local-merge-buffer-size'='5m'"
+ ");");
}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
index 027eada92..3fa95edb8 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
@@ -72,7 +72,7 @@ public class PrimaryKeyFileStoreTableITCase extends AbstractTestBase {
ThreadLocalRandom random = ThreadLocalRandom.current();
tableDefaultProperties = new HashMap<>();
if (random.nextBoolean()) {
- tableDefaultProperties.put(CoreOptions.LOCAL_MERGE_BUFFER_SIZE.key(), "256 kb");
+ tableDefaultProperties.put(CoreOptions.LOCAL_MERGE_BUFFER_SIZE.key(), "5m");
}
}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/LocalMergeOperatorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/LocalMergeOperatorTest.java
new file mode 100644
index 000000000..1162e20b1
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/LocalMergeOperatorTest.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink;
+
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.mergetree.localmerge.HashMapLocalMerger;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowKind;
+import org.apache.paimon.types.RowType;
+
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
+import org.apache.flink.util.OutputTag;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.function.Consumer;
+
+import static org.apache.paimon.CoreOptions.LOCAL_MERGE_BUFFER_SIZE;
+import static org.apache.paimon.CoreOptions.SEQUENCE_FIELD;
+import static org.apache.paimon.data.BinaryString.fromString;
+import static org.apache.paimon.types.RowKind.DELETE;
+import static org.assertj.core.api.Assertions.assertThat;
+
+class LocalMergeOperatorTest {
+
+ private LocalMergeOperator operator;
+
+ @Test
+ public void testHashNormal() throws Exception {
+ prepareHashOperator();
+ List<String> result = new ArrayList<>();
+ setOutput(result);
+
+ // first test
+ processElement("a", 1);
+ processElement("b", 1);
+ processElement("a", 2);
+ processElement(DELETE, "b", 2);
+ operator.prepareSnapshotPreBarrier(0);
+ assertThat(result).containsExactlyInAnyOrder("+I:a->2", "-D:b->2");
+ result.clear();
+
+ // second test
+ processElement("c", 1);
+ processElement("d", 1);
+ operator.prepareSnapshotPreBarrier(0);
+ assertThat(result).containsExactlyInAnyOrder("+I:c->1", "+I:d->1");
+ result.clear();
+
+ // large records
+ Map<String, String> expected = new HashMap<>();
+ Random rnd = new Random();
+ int records = 10_000;
+ for (int i = 0; i < records; i++) {
+ String key = rnd.nextInt(records) + "";
+ expected.put(key, "+I:" + key + "->" + i);
+ processElement(key, i);
+ }
+
+ operator.prepareSnapshotPreBarrier(0);
+ assertThat(result).containsExactlyInAnyOrderElementsOf(expected.values());
+ result.clear();
+ }
+
+ @Test
+ public void testUserDefineSequence() throws Exception {
+ Map<String, String> options = new HashMap<>();
+ options.put(SEQUENCE_FIELD.key(), "f1");
+ prepareHashOperator(options);
+
+ List<String> result = new ArrayList<>();
+ setOutput(result);
+
+ processElement("a", 2);
+ processElement("b", 1);
+ processElement("a", 1);
+ operator.prepareSnapshotPreBarrier(0);
+ assertThat(result).containsExactlyInAnyOrder("+I:a->2", "+I:b->1");
+ result.clear();
+ }
+
+ @Test
+ public void testHashSpill() throws Exception {
+ Map<String, String> options = new HashMap<>();
+ options.put(LOCAL_MERGE_BUFFER_SIZE.key(), "2 m");
+ prepareHashOperator(options);
+ List<String> result = new ArrayList<>();
+ setOutput(result);
+
+ Map<String, String> expected = new HashMap<>();
+ for (int i = 0; i < 30_000; i++) {
+ String key = i + "";
+ expected.put(key, "+I:" + key + "->" + i);
+ processElement(key, i);
+ }
+
+ operator.prepareSnapshotPreBarrier(0);
+ assertThat(result).containsExactlyInAnyOrderElementsOf(expected.values());
+ result.clear();
+ }
+
+ private void prepareHashOperator() throws Exception {
+ prepareHashOperator(new HashMap<>());
+ }
+
+ private void prepareHashOperator(Map<String, String> options) throws Exception {
+ if (!options.containsKey(LOCAL_MERGE_BUFFER_SIZE.key())) {
+ options.put(LOCAL_MERGE_BUFFER_SIZE.key(), "10 m");
+ }
+ RowType rowType =
+ RowType.of(
+ DataTypes.STRING(),
+ DataTypes.INT(),
+ DataTypes.INT(),
+ DataTypes.INT(),
+ DataTypes.INT());
+ TableSchema schema =
+ new TableSchema(
+ 0L,
+ rowType.getFields(),
+ rowType.getFieldCount(),
+ Collections.emptyList(),
+ Collections.singletonList("f0"),
+ options,
+ null);
+ operator = new LocalMergeOperator(schema);
+ operator.open();
+ assertThat(operator.merger()).isInstanceOf(HashMapLocalMerger.class);
+ }
+
+ private void setOutput(List<String> result) {
+ operator.setOutput(
+ new TestOutput(
+ row ->
+ result.add(
+ row.getRowKind().shortString()
+ + ":"
+ + row.getString(0)
+ + "->"
+ + row.getInt(1))));
+ }
+
+ private void processElement(String key, int value) throws Exception {
+ processElement(RowKind.INSERT, key, value);
+ }
+
+ private void processElement(RowKind rowKind, String key, int value) throws Exception {
+ operator.processElement(
+ new StreamRecord<>(
+ GenericRow.ofKind(rowKind, fromString(key), value, value, value, value)));
+ }
+
+ private static class TestOutput implements Output<StreamRecord<InternalRow>> {
+
+ private final Consumer<InternalRow> consumer;
+
+ private TestOutput(Consumer<InternalRow> consumer) {
+ this.consumer = consumer;
+ }
+
+ @Override
+ public void emitWatermark(Watermark mark) {}
+
+ @Override
+ public void emitWatermarkStatus(WatermarkStatus watermarkStatus) {}
+
+ @Override
+ public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {}
+
+ @Override
+ public void emitLatencyMarker(LatencyMarker latencyMarker) {}
+
+ @Override
+ public void emitRecordAttributes(RecordAttributes recordAttributes) {}
+
+ @Override
+ public void collect(StreamRecord<InternalRow> record) {
+ consumer.accept(record.getValue());
+ }
+
+ @Override
+ public void close() {}
+ }
+}