This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 385be6551 [core][flink] Introduce HashMapLocalMerger to 
'local-merge-buffer-size' (#4492)
385be6551 is described below

commit 385be6551ed7a77e5cee42989b275e46184ab2eb
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Nov 11 19:45:13 2024 +0800

    [core][flink] Introduce HashMapLocalMerger to 'local-merge-buffer-size' 
(#4492)
---
 .../java/org/apache/paimon/data/BinaryRow.java     |  32 ++
 .../java/org/apache/paimon/data/InternalRow.java   |  94 +++++
 .../data/serializer/BinaryRowSerializer.java       |   5 +
 .../data/serializer/InternalRowSerializer.java     |   5 +
 .../data/serializer/PagedTypeSerializer.java       |   3 +
 .../java/org/apache/paimon/hash/BytesHashMap.java  | 362 ++++++++++++++++++++
 .../main/java/org/apache/paimon/hash/BytesMap.java | 379 +++++++++++++++++++++
 .../mergetree/localmerge/HashMapLocalMerger.java   | 133 ++++++++
 .../paimon/mergetree/localmerge/LocalMerger.java   |  38 +++
 .../localmerge/SortBufferLocalMerger.java          |  78 +++++
 .../org/apache/paimon/hash/BytesHashMapTest.java   |  44 +++
 .../apache/paimon/hash/BytesHashMapTestBase.java   | 373 ++++++++++++++++++++
 .../org/apache/paimon/hash/BytesMapTestBase.java   | 118 +++++++
 .../paimon/flink/sink/LocalMergeOperator.java      | 108 +++---
 .../org/apache/paimon/flink/FirstRowITCase.java    |   2 +-
 .../apache/paimon/flink/PartialUpdateITCase.java   |   4 +-
 .../apache/paimon/flink/PreAggregationITCase.java  |   2 +-
 .../flink/PrimaryKeyFileStoreTableITCase.java      |   2 +-
 .../paimon/flink/sink/LocalMergeOperatorTest.java  | 212 ++++++++++++
 19 files changed, 1948 insertions(+), 46 deletions(-)

diff --git a/paimon-common/src/main/java/org/apache/paimon/data/BinaryRow.java 
b/paimon-common/src/main/java/org/apache/paimon/data/BinaryRow.java
index 7068e25e6..d08c580be 100644
--- a/paimon-common/src/main/java/org/apache/paimon/data/BinaryRow.java
+++ b/paimon-common/src/main/java/org/apache/paimon/data/BinaryRow.java
@@ -21,7 +21,11 @@ package org.apache.paimon.data;
 import org.apache.paimon.annotation.Public;
 import org.apache.paimon.memory.MemorySegment;
 import org.apache.paimon.memory.MemorySegmentUtils;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DecimalType;
+import org.apache.paimon.types.LocalZonedTimestampType;
 import org.apache.paimon.types.RowKind;
+import org.apache.paimon.types.TimestampType;
 
 import javax.annotation.Nullable;
 
@@ -442,4 +446,32 @@ public final class BinaryRow extends BinarySection 
implements InternalRow, DataS
         writer.complete();
         return row;
     }
+
+    /**
+     * Tells whether a field of the given type can be updated in place through this BinaryRow's
+     * setXX methods. That is only possible for fixed-length fields; a variable-length field
+     * can't be updated this way, because the underlying data is stored contiguously.
+     *
+     * @param type the field type to inspect
+     * @return {@code true} if the type is handled as a fixed-length field
+     */
+    public static boolean isInFixedLengthPart(DataType type) {
+        switch (type.getTypeRoot()) {
+            case DECIMAL:
+                DecimalType decimalType = (DecimalType) type;
+                return Decimal.isCompact(decimalType.getPrecision());
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+                TimestampType timestampType = (TimestampType) type;
+                return Timestamp.isCompact(timestampType.getPrecision());
+            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                LocalZonedTimestampType zonedType = (LocalZonedTimestampType) type;
+                return Timestamp.isCompact(zonedType.getPrecision());
+            case BOOLEAN:
+            case TINYINT:
+            case SMALLINT:
+            case INTEGER:
+            case DATE:
+            case TIME_WITHOUT_TIME_ZONE:
+            case BIGINT:
+            case FLOAT:
+            case DOUBLE:
+                return true;
+            default:
+                return false;
+        }
+    }
 }
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/data/InternalRow.java 
b/paimon-common/src/main/java/org/apache/paimon/data/InternalRow.java
index 4c4f3f978..a83cbaec7 100644
--- a/paimon-common/src/main/java/org/apache/paimon/data/InternalRow.java
+++ b/paimon-common/src/main/java/org/apache/paimon/data/InternalRow.java
@@ -244,4 +244,98 @@ public interface InternalRow extends DataGetters {
         @Nullable
         Object getFieldOrNull(InternalRow row);
     }
+
+    /**
+     * Creates a {@link FieldSetter} that copies the field at {@code fieldPos} from a source row
+     * into the same position of a target row.
+     *
+     * <p>For nullable types a null source field is propagated with {@code setNullAt}. For
+     * nullable decimals and timestamps whose precision is not compact, the typed setter is
+     * additionally invoked with a {@code null} value after {@code setNullAt}.
+     *
+     * @param fieldType the element type of the row
+     * @param fieldPos the element position of the row
+     * @throws IllegalArgumentException if {@code fieldType} is not one of the types handled below
+     */
+    static FieldSetter createFieldSetter(DataType fieldType, int fieldPos) {
+        final FieldSetter valueSetter;
+        switch (fieldType.getTypeRoot()) {
+            case BOOLEAN:
+                valueSetter = (src, dst) -> dst.setBoolean(fieldPos, src.getBoolean(fieldPos));
+                break;
+            case TINYINT:
+                valueSetter = (src, dst) -> dst.setByte(fieldPos, src.getByte(fieldPos));
+                break;
+            case SMALLINT:
+                valueSetter = (src, dst) -> dst.setShort(fieldPos, src.getShort(fieldPos));
+                break;
+            case INTEGER:
+            case DATE:
+            case TIME_WITHOUT_TIME_ZONE:
+                valueSetter = (src, dst) -> dst.setInt(fieldPos, src.getInt(fieldPos));
+                break;
+            case BIGINT:
+                valueSetter = (src, dst) -> dst.setLong(fieldPos, src.getLong(fieldPos));
+                break;
+            case FLOAT:
+                valueSetter = (src, dst) -> dst.setFloat(fieldPos, src.getFloat(fieldPos));
+                break;
+            case DOUBLE:
+                valueSetter = (src, dst) -> dst.setDouble(fieldPos, src.getDouble(fieldPos));
+                break;
+            case DECIMAL:
+                {
+                    final int precision = getPrecision(fieldType);
+                    final int scale = getScale(fieldType);
+                    valueSetter =
+                            (src, dst) -> {
+                                Decimal decimal = src.getDecimal(fieldPos, precision, scale);
+                                dst.setDecimal(fieldPos, decimal, precision);
+                            };
+                    if (fieldType.isNullable() && !Decimal.isCompact(precision)) {
+                        // non-compact decimals: mark null, then also pass null to setDecimal
+                        return (src, dst) -> {
+                            if (!src.isNullAt(fieldPos)) {
+                                valueSetter.setFieldFrom(src, dst);
+                                return;
+                            }
+                            dst.setNullAt(fieldPos);
+                            dst.setDecimal(fieldPos, null, precision);
+                        };
+                    }
+                    break;
+                }
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                {
+                    final int precision = getPrecision(fieldType);
+                    valueSetter =
+                            (src, dst) -> {
+                                Timestamp timestamp = src.getTimestamp(fieldPos, precision);
+                                dst.setTimestamp(fieldPos, timestamp, precision);
+                            };
+                    if (fieldType.isNullable() && !Timestamp.isCompact(precision)) {
+                        // non-compact timestamps: mark null, then also pass null to setTimestamp
+                        return (src, dst) -> {
+                            if (!src.isNullAt(fieldPos)) {
+                                valueSetter.setFieldFrom(src, dst);
+                                return;
+                            }
+                            dst.setNullAt(fieldPos);
+                            dst.setTimestamp(fieldPos, null, precision);
+                        };
+                    }
+                    break;
+                }
+            default:
+                throw new IllegalArgumentException(
+                        String.format("type %s not support for setting", fieldType));
+        }
+        if (fieldType.isNullable()) {
+            // generic null handling for every remaining nullable type
+            return (src, dst) -> {
+                if (src.isNullAt(fieldPos)) {
+                    dst.setNullAt(fieldPos);
+                } else {
+                    valueSetter.setFieldFrom(src, dst);
+                }
+            };
+        }
+        return valueSetter;
+    }
+
+    /**
+     * Accessor for setting the field of a row during runtime.
+     *
+     * <p>An implementation reads one field from {@code from} and writes it into {@code to}; see
+     * {@link InternalRow#createFieldSetter(DataType, int)} for the factory.
+     */
+    interface FieldSetter extends Serializable {
+        void setFieldFrom(DataGetters from, DataSetters to);
+    }
 }
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/data/serializer/BinaryRowSerializer.java
 
b/paimon-common/src/main/java/org/apache/paimon/data/serializer/BinaryRowSerializer.java
index e5f3c9e7d..8773e734d 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/data/serializer/BinaryRowSerializer.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/data/serializer/BinaryRowSerializer.java
@@ -107,6 +107,11 @@ public class BinaryRowSerializer extends 
AbstractRowDataSerializer<BinaryRow> {
 
     // ============================ Page related operations 
===================================
 
+    @Override
+    public BinaryRow createReuseInstance() {
+        // a newly allocated row sized for this serializer's numFields
+        return new BinaryRow(numFields);
+    }
+
     @Override
     public int serializeToPages(BinaryRow record, AbstractPagedOutputView 
headerLessView)
             throws IOException {
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalRowSerializer.java
 
b/paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalRowSerializer.java
index 8a32a222c..ac8cc34e0 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalRowSerializer.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalRowSerializer.java
@@ -167,6 +167,11 @@ public class InternalRowSerializer extends 
AbstractRowDataSerializer<InternalRow
         return reuseRow;
     }
 
+    @Override
+    public InternalRow createReuseInstance() {
+        // delegates to the wrapped binary serializer
+        return binarySerializer.createReuseInstance();
+    }
+
     @Override
     public int serializeToPages(InternalRow row, AbstractPagedOutputView 
target)
             throws IOException {
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/data/serializer/PagedTypeSerializer.java
 
b/paimon-common/src/main/java/org/apache/paimon/data/serializer/PagedTypeSerializer.java
index f916d01b5..ede6c9b10 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/data/serializer/PagedTypeSerializer.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/data/serializer/PagedTypeSerializer.java
@@ -27,6 +27,9 @@ import java.io.IOException;
 /** A type serializer which provides paged serialize and deserialize methods. 
*/
 public interface PagedTypeSerializer<T> extends Serializer<T> {
 
+    /**
+     * Creates a new instance for reusing.
+     *
+     * @return a new instance of {@code T} that callers can hand back as the reuse argument,
+     *     e.g. of {@code mapFromPages}
+     */
+    T createReuseInstance();
+
     /**
      * Serializes the given record to the given target paged output view. Some 
implementations may
      * skip some bytes if current page does not have enough space left, .e.g 
{@link BinaryRow}.
diff --git a/paimon-core/src/main/java/org/apache/paimon/hash/BytesHashMap.java 
b/paimon-core/src/main/java/org/apache/paimon/hash/BytesHashMap.java
new file mode 100644
index 000000000..d739289e6
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/hash/BytesHashMap.java
@@ -0,0 +1,362 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.hash;
+
+import org.apache.paimon.annotation.VisibleForTesting;
+import org.apache.paimon.data.AbstractPagedInputView;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.RandomAccessInputView;
+import org.apache.paimon.data.SimpleCollectingOutputView;
+import org.apache.paimon.data.serializer.BinaryRowSerializer;
+import org.apache.paimon.data.serializer.PagedTypeSerializer;
+import org.apache.paimon.memory.MemorySegment;
+import org.apache.paimon.memory.MemorySegmentPool;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.utils.KeyValueIterator;
+import org.apache.paimon.utils.MathUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Bytes based hash map. It can be used for performing aggregations where the 
aggregated values are
+ * fixed-width, because the data is stored in continuous memory, AggBuffer of 
variable length cannot
+ * be applied to this HashMap. The KeyValue form in hash map is designed to 
reduce the cost of key
+ * fetching in lookup. The memory is divided into two areas:
+ *
+ * <p>Bucket area: pointer + hashcode.
+ *
+ * <ul>
+ *   <li>Bytes 0 to 4: a pointer to the record in the record area
+ *   <li>Bytes 4 to 8: key's full 32-bit hashcode
+ * </ul>
+ *
+ * <p>Record area: the actual data in linked list records, a record has four 
parts:
+ *
+ * <ul>
+ *   <li>Bytes 0 to 4: len(k)
+ *   <li>Bytes 4 to 4 + len(k): key data
+ *   <li>Bytes 4 + len(k) to 8 + len(k): len(v)
+ *   <li>Bytes 8 + len(k) to 8 + len(k) + len(v): value data
+ * </ul>
+ *
+ * <p>{@code BytesHashMap} is influenced by Apache Spark's BytesToBytesMap.
+ */
+public class BytesHashMap<K> extends BytesMap<K, BinaryRow> {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(BytesHashMap.class);
+
+    /**
+     * Set to true when valueArity == 0. Usually in this case the BytesHashMap will be used as a
+     * HashSet. The value from {@link BytesHashMap#append(LookupInfo info, BinaryRow value)} will
+     * be ignored when hashSetMode is set. The reusedValue will always point to an 8 bytes long
+     * MemorySegment acting as each BytesHashMap entry's value part when appended, to keep the
+     * BytesHashMap's spilling work compatible.
+     */
+    private final boolean hashSetMode;
+
+    /** Used to serialize map key into RecordArea's MemorySegments. */
+    protected final PagedTypeSerializer<K> keySerializer;
+
+    /** Used to serialize hash map value into RecordArea's MemorySegments. */
+    private final BinaryRowSerializer valueSerializer;
+
+    private volatile RecordArea.EntryIterator destructiveIterator = null;
+
+    /**
+     * Creates a hash map whose bucket area and record area take their memory segments from
+     * {@code memoryPool}.
+     *
+     * @param memoryPool pool the bucket and record segments are requested from
+     * @param keySerializer serializer used to write keys into, and read keys from, the record area
+     * @param valueArity number of fields of the value row; {@code 0} switches to hash-set mode
+     */
+    public BytesHashMap(
+            MemorySegmentPool memoryPool, PagedTypeSerializer<K> keySerializer, int valueArity) {
+        super(memoryPool, keySerializer);
+
+        this.recordArea = new RecordArea();
+
+        this.keySerializer = keySerializer;
+        this.valueSerializer = new BinaryRowSerializer(valueArity);
+        if (valueArity == 0) {
+            this.hashSetMode = true;
+            // zero-field placeholder row that stands in for every appended value
+            this.reusedValue = new BinaryRow(0);
+            this.reusedValue.pointTo(MemorySegment.wrap(new byte[8]), 0, 8);
+            LOG.info("BytesHashMap with hashSetMode = true.");
+        } else {
+            this.hashSetMode = false;
+            this.reusedValue = this.valueSerializer.createInstance();
+        }
+
+        final int initBucketSegmentNum =
+                MathUtils.roundDownToPowerOf2((int) (INIT_BUCKET_MEMORY_IN_BYTES / segmentSize));
+
+        // allocate and initialize MemorySegments for bucket area
+        initBucketSegments(initBucketSegmentNum);
+
+        LOG.info(
+                "BytesHashMap with initial memory segments {}, {} in bytes, init allocating {} for bucket area.",
+                reservedNumBuffers,
+                // widen before multiplying: an int product overflows for pools of 2 GiB or more
+                (long) reservedNumBuffers * segmentSize,
+                initBucketSegmentNum);
+    }
+
+    // ----------------------- Abstract Interface -----------------------
+
+    @Override
+    public long getNumKeys() {
+        // numElements is incremented once per successful append(...)
+        return numElements;
+    }
+
+    // ----------------------- Public interface -----------------------
+
+    /**
+     * Appends a value into the hash map's record area and stores its pointer and the key's hash
+     * code in the bucket described by {@code lookupInfo}.
+     *
+     * @param lookupInfo the lookup result for the key being inserted
+     * @param value the value to store; replaced by the reused placeholder in hash-set mode
+     * @return a BinaryRow mapped onto the memory segments in the map's record area that belong
+     *     to the newly appended value
+     * @throws EOFException if the map can't allocate any more memory
+     */
+    public BinaryRow append(LookupInfo<K, BinaryRow> lookupInfo, BinaryRow value)
+            throws IOException {
+        try {
+            if (numElements >= growthThreshold) {
+                growAndRehash();
+                // update info's bucketSegmentIndex and bucketOffset
+                lookup(lookupInfo.key);
+            }
+            final BinaryRow record;
+            if (hashSetMode) {
+                record = reusedValue;
+            } else {
+                record = value;
+            }
+            final int recordPointer = recordArea.appendRecord(lookupInfo, record);
+
+            // bucket layout: [pointer to record][full hash code of the key]
+            final MemorySegment bucket = bucketSegments.get(lookupInfo.bucketSegmentIndex);
+            bucket.putInt(lookupInfo.bucketOffset, recordPointer);
+            bucket.putInt(lookupInfo.bucketOffset + ELEMENT_POINT_LENGTH, lookupInfo.keyHashCode);
+            numElements++;
+
+            // re-read the stored value so the returned row points into the record area
+            recordArea.setReadPosition(recordPointer);
+            ((RecordArea) recordArea).skipKey();
+            return recordArea.readValue(reusedValue);
+        } catch (EOFException e) {
+            numSpillFiles++;
+            spillInBytes += recordArea.getSegmentsSize();
+            throw e;
+        }
+    }
+
+    /** Returns how many times {@code append} has failed with an {@link EOFException}. */
+    @Override
+    public long getNumSpillFiles() {
+        return numSpillFiles;
+    }
+
+    /** Returns the bytes currently held by the bucket segments plus the record area. */
+    public long getUsedMemoryInBytes() {
+        return bucketSegments.size() * ((long) segmentSize) + recordArea.getSegmentsSize();
+    }
+
+    /** Returns the record-area bytes accumulated each time {@code append} hit an EOF. */
+    @Override
+    public long getSpillInBytes() {
+        return spillInBytes;
+    }
+
+    /** Returns the number of entries appended since the last reset or free. */
+    @Override
+    public int getNumElements() {
+        return numElements;
+    }
+
+    /**
+     * Returns an iterator for iterating over the entries of this map. Creating the iterator
+     * registers it as the destructive iterator, so a second call fails until {@code reset()} or
+     * {@code free()} clears it.
+     *
+     * @param requiresCopy whether keys and values handed out by the iterator are copies
+     * @throws IllegalArgumentException if an iterator has already been created
+     */
+    @SuppressWarnings("WeakerAccess")
+    public KeyValueIterator<K, BinaryRow> getEntryIterator(boolean requiresCopy) {
+        if (destructiveIterator == null) {
+            RecordArea area = (RecordArea) recordArea;
+            return area.entryIterator(requiresCopy);
+        }
+        throw new IllegalArgumentException(
+                "DestructiveIterator is not null, so this method can't be invoke!");
+    }
+
+    /**
+     * @return the underlying memory segments of the hash map's record area; this is the record
+     *     area's own list, not a copy
+     */
+    @SuppressWarnings("WeakerAccess")
+    public ArrayList<MemorySegment> getRecordAreaMemorySegments() {
+        return ((RecordArea) recordArea).segments;
+    }
+
+    /** @return the memory segments of the bucket area; this is the map's own list, not a copy */
+    @SuppressWarnings("WeakerAccess")
+    public List<MemorySegment> getBucketAreaMemorySegments() {
+        return bucketSegments;
+    }
+
+    /**
+     * Releases the record area, clears the registered iterator and then frees the bucket area
+     * through the superclass.
+     */
+    @Override
+    public void free() {
+        recordArea.release();
+        destructiveIterator = null;
+        super.free();
+    }
+
+    /**
+     * Resets the map's record and bucket area's memory segments for reusing. Also clears the
+     * registered iterator, so {@code getEntryIterator} can be called again.
+     */
+    @Override
+    public void reset() {
+        // reset the record segments.
+        recordArea.reset();
+        destructiveIterator = null;
+        super.reset();
+    }
+
+    /**
+     * @return true when this BytesHashMap was created with valueArity == 0. Any appended value
+     *     will be ignored and replaced with a reusedValue as a present tag.
+     */
+    @VisibleForTesting
+    boolean isHashSetMode() {
+        return hashSetMode;
+    }
+
+    // ----------------------- Private methods -----------------------
+
+    /**
+     * Estimates the variable-length size of a row with the given field types by charging a fixed
+     * 16 bytes for every field that is not in the fixed-length part.
+     *
+     * @param types the field types of the row
+     * @return the estimated number of variable-length bytes
+     */
+    static int getVariableLength(DataType[] types) {
+        // find a better way of computing generic type field variable-length
+        // right now we use a small value assumption
+        final int assumedBytesPerField = 16;
+        int variableLengthFields = 0;
+        for (int i = 0; i < types.length; i++) {
+            if (BinaryRow.isInFixedLengthPart(types[i])) {
+                continue;
+            }
+            variableLengthFields++;
+        }
+        return variableLengthFields * assumedBytesPerField;
+    }
+
+    // ----------------------- Record Area -----------------------
+
+    /**
+     * Record area of this map: each record is a key followed by its value, serialized into
+     * memory segments requested from the map's memory pool.
+     */
+    private final class RecordArea implements BytesMap.RecordArea<K, BinaryRow> {
+        private final ArrayList<MemorySegment> segments = new ArrayList<>();
+
+        private final RandomAccessInputView inView;
+        private final SimpleCollectingOutputView outView;
+
+        RecordArea() {
+            this.outView = new SimpleCollectingOutputView(segments, memoryPool, segmentSize);
+            this.inView = new RandomAccessInputView(segments, segmentSize);
+        }
+
+        /** Hands all record segments to {@code returnSegments} and clears the segment list. */
+        @Override
+        public void release() {
+            returnSegments(segments);
+            segments.clear();
+        }
+
+        /** Releases the current segments and rewinds both the output and the input view. */
+        @Override
+        public void reset() {
+            release();
+            // request a new memory segment from freeMemorySegments
+            // reset segmentNum and positionInSegment
+            outView.reset();
+            inView.setReadPosition(0);
+        }
+
+        // ----------------------- Append -----------------------
+
+        /**
+         * Serializes the key of {@code lookupInfo} followed by {@code value} at the end of the
+         * record area.
+         *
+         * @return the position at which the appended key starts, usable as a record pointer
+         * @throws EOFException if that position does not fit into an int
+         */
+        @Override
+        public int appendRecord(LookupInfo<K, BinaryRow> lookupInfo, BinaryRow value)
+                throws IOException {
+            final long oldLastPosition = outView.getCurrentOffset();
+            // serialize the key into the BytesHashMap record area
+            int skip = keySerializer.serializeToPages(lookupInfo.getKey(), outView);
+            // the serializer may skip bytes before the record; the pointer must account for them
+            long offset = oldLastPosition + skip;
+
+            // serialize the value into the BytesHashMap record area
+            valueSerializer.serializeToPages(value, outView);
+            if (offset > Integer.MAX_VALUE) {
+                LOG.warn(
+                        "We can't handle key area with more than Integer.MAX_VALUE bytes,"
+                                + " because the pointer is a integer.");
+                throw new EOFException();
+            }
+            return (int) offset;
+        }
+
+        @Override
+        public long getSegmentsSize() {
+            return segments.size() * ((long) segmentSize);
+        }
+
+        // ----------------------- Read -----------------------
+
+        /** Moves the read cursor to the given record pointer. */
+        @Override
+        public void setReadPosition(int position) {
+            inView.setReadPosition(position);
+        }
+
+        /** Maps the key at the read cursor into {@code reusedKey} and compares it. */
+        @Override
+        public boolean readKeyAndEquals(K lookupKey) throws IOException {
+            reusedKey = keySerializer.mapFromPages(reusedKey, inView);
+            return lookupKey.equals(reusedKey);
+        }
+
+        /** @throws IOException when invalid memory address visited. */
+        void skipKey() throws IOException {
+            keySerializer.skipRecordFromPages(inView);
+        }
+
+        /** Maps the value at the read cursor onto {@code reuse}. */
+        @Override
+        public BinaryRow readValue(BinaryRow reuse) throws IOException {
+            // depends on BinaryRowSerializer to check writing skip
+            // and to find the real start offset of the data
+            return valueSerializer.mapFromPages(reuse, inView);
+        }
+
+        // ----------------------- Iterator -----------------------
+
+        private KeyValueIterator<K, BinaryRow> entryIterator(boolean requiresCopy) {
+            return new EntryIterator(requiresCopy);
+        }
+
+        /**
+         * Sequential iterator over the records of the record area. Creating it registers the
+         * iterator as the map's destructive iterator.
+         */
+        private final class EntryIterator extends AbstractPagedInputView
+                implements KeyValueIterator<K, BinaryRow> {
+
+            private int count = 0;
+            private int currentSegmentIndex = 0;
+            private final boolean requiresCopy;
+
+            private EntryIterator(boolean requiresCopy) {
+                super(segments.get(0), segmentSize);
+                destructiveIterator = this;
+                this.requiresCopy = requiresCopy;
+            }
+
+            @Override
+            public boolean advanceNext() throws IOException {
+                if (count < numElements) {
+                    count++;
+                    // keep the mapped instances, consistent with readKeyAndEquals and lookup,
+                    // in case a serializer returns a different object than the one passed in
+                    reusedKey = keySerializer.mapFromPages(reusedKey, this);
+                    reusedValue = valueSerializer.mapFromPages(reusedValue, this);
+                    return true;
+                }
+                return false;
+            }
+
+            @Override
+            public K getKey() {
+                return requiresCopy ? keySerializer.copy(reusedKey) : reusedKey;
+            }
+
+            @Override
+            public BinaryRow getValue() {
+                return requiresCopy ? reusedValue.copy() : reusedValue;
+            }
+
+            public boolean hasNext() {
+                return count < numElements;
+            }
+
+            @Override
+            protected int getLimitForSegment(MemorySegment segment) {
+                return segmentSize;
+            }
+
+            @Override
+            protected MemorySegment nextSegment(MemorySegment current) {
+                return segments.get(++currentSegmentIndex);
+            }
+        }
+    }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/hash/BytesMap.java 
b/paimon-core/src/main/java/org/apache/paimon/hash/BytesMap.java
new file mode 100644
index 000000000..215cfface
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/hash/BytesMap.java
@@ -0,0 +1,379 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.hash;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.serializer.PagedTypeSerializer;
+import org.apache.paimon.memory.MemorySegment;
+import org.apache.paimon.memory.MemorySegmentPool;
+import org.apache.paimon.utils.MathUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Base class for {@code BytesHashMap}.
+ *
+ * @param <K> type of the map key.
+ * @param <V> type of the map value.
+ */
+public abstract class BytesMap<K, V> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(BytesMap.class);
+
+    public static final int BUCKET_SIZE = 8;
+    protected static final int END_OF_LIST = Integer.MAX_VALUE;
+    protected static final int STEP_INCREMENT = 1;
+    protected static final int ELEMENT_POINT_LENGTH = 4;
+    public static final int RECORD_EXTRA_LENGTH = 8;
+    protected static final int BUCKET_SIZE_BITS = 3;
+
+    protected final int numBucketsPerSegment;
+    protected final int numBucketsPerSegmentBits;
+    protected final int numBucketsPerSegmentMask;
+    protected final int lastBucketPosition;
+
+    protected final int segmentSize;
+    protected final MemorySegmentPool memoryPool;
+    protected List<MemorySegment> bucketSegments;
+
+    protected final int reservedNumBuffers;
+
+    protected int numElements = 0;
+    protected int numBucketsMask;
+    // get the second hashcode based log2NumBuckets and numBucketsMask2
+    protected int log2NumBuckets;
+    protected int numBucketsMask2;
+
+    protected static final double LOAD_FACTOR = 0.75;
+    // a smaller bucket can make the best of l1/l2/l3 cache.
+    protected static final long INIT_BUCKET_MEMORY_IN_BYTES = 1024 * 1024L;
+
+    /** The map will be expanded once the number of elements reaches this threshold. */
+    protected int growthThreshold;
+
+    /** The segments where the actual data is stored. */
+    protected RecordArea<K, V> recordArea;
+
+    /** Used as a reused object when lookup and iteration. */
+    protected K reusedKey;
+
+    /** Used as a reused object when retrieve the map's value by key and 
iteration. */
+    protected V reusedValue;
+
+    /** Used as a reused object which lookup returned. */
+    private final LookupInfo<K, V> reuseLookupInfo;
+
+    // metric
+    protected long numSpillFiles;
+    protected long spillInBytes;
+
+    /**
+     * Captures the pool's page size and free page count and derives the per-segment bucket
+     * layout from the page size.
+     *
+     * @param memoryPool pool the map's memory segments are taken from
+     * @param keySerializer serializer used here to create the reused key instance
+     */
+    public BytesMap(MemorySegmentPool memoryPool, PagedTypeSerializer<K> keySerializer) {
+        final int pageSize = memoryPool.pageSize();
+        final int freePages = memoryPool.freePages();
+        final int bucketsPerSegment = pageSize / BUCKET_SIZE;
+        final int bucketsPerSegmentBits = MathUtils.log2strict(bucketsPerSegment);
+
+        this.memoryPool = memoryPool;
+        this.segmentSize = pageSize;
+        this.reservedNumBuffers = freePages;
+        this.numBucketsPerSegment = bucketsPerSegment;
+        this.numBucketsPerSegmentBits = bucketsPerSegmentBits;
+        this.numBucketsPerSegmentMask = (1 << bucketsPerSegmentBits) - 1;
+        // offset of the last bucket inside a segment
+        this.lastBucketPosition = (bucketsPerSegment - 1) * BUCKET_SIZE;
+
+        this.reusedKey = keySerializer.createReuseInstance();
+        this.reuseLookupInfo = new LookupInfo<>();
+    }
+
+    /** Returns the number of keys in this map. */
+    public abstract long getNumKeys();
+
+    /**
+     * Requests {@code numBucketSegments} segments from the memory pool for the bucket area,
+     * marks every bucket as empty and derives the masks and the growth threshold from the
+     * resulting number of buckets.
+     *
+     * @param numBucketSegments number of segments to allocate, at least 1
+     * @throws RuntimeException if fewer than one segment is requested or the pool runs out
+     */
+    protected void initBucketSegments(int numBucketSegments) {
+        if (numBucketSegments < 1) {
+            throw new RuntimeException("Too small memory allocated for BytesHashMap");
+        }
+        final List<MemorySegment> allocated = new ArrayList<>(numBucketSegments);
+        this.bucketSegments = allocated;
+        while (allocated.size() < numBucketSegments) {
+            final MemorySegment next = memoryPool.nextSegment();
+            if (next == null) {
+                throw new RuntimeException("Memory for hash map is too small.");
+            }
+            allocated.add(next);
+        }
+
+        resetBucketSegments(allocated);
+        final int numBuckets = numBucketSegments * numBucketsPerSegment;
+        final int numBucketsLog2 = MathUtils.log2strict(numBuckets);
+        this.log2NumBuckets = numBucketsLog2;
+        this.numBucketsMask = (1 << numBucketsLog2) - 1;
+        this.numBucketsMask2 = (1 << MathUtils.log2strict(numBuckets >> 1)) - 1;
+        this.growthThreshold = (int) (numBuckets * LOAD_FACTOR);
+    }
+
+    /**
+     * Marks every bucket of the given segments as empty by writing {@code END_OF_LIST} into its
+     * pointer slot.
+     *
+     * @param resetBucketSegs the bucket segments to clear
+     */
+    protected void resetBucketSegments(List<MemorySegment> resetBucketSegs) {
+        for (MemorySegment bucketSegment : resetBucketSegs) {
+            int offset = 0;
+            while (offset <= lastBucketPosition) {
+                bucketSegment.putInt(offset, END_OF_LIST);
+                offset += BUCKET_SIZE;
+            }
+        }
+    }
+
+    /** Returns the value of the {@code numSpillFiles} metric. */
+    public long getNumSpillFiles() {
+        return numSpillFiles;
+    }
+
+    /** Returns the value of the {@code spillInBytes} metric. */
+    public long getSpillInBytes() {
+        return spillInBytes;
+    }
+
+    /** Returns the current element count of the map. */
+    public int getNumElements() {
+        return numElements;
+    }
+
+    /** Hands the bucket segments to {@code returnSegments}, clears the list and zeroes the count. */
+    public void free() {
+        returnSegments(this.bucketSegments);
+        this.bucketSegments.clear();
+        numElements = 0;
+    }
+
+    /**
+     * Resets the map's bucket area for reusing: recomputes the bucket variables, marks every
+     * bucket as empty and zeroes the element count.
+     */
+    public void reset() {
+        setBucketVariables(bucketSegments);
+        resetBucketSegments(bucketSegments);
+        numElements = 0;
+        final int freePages = memoryPool.freePages();
+        LOG.debug(
+                "reset BytesHashMap with record memory segments {}, {} in bytes, init allocating {} for bucket area.",
+                freePages,
+                // widen before multiplying: an int product overflows for pools of 2 GiB or more
+                (long) freePages * segmentSize,
+                bucketSegments.size());
+    }
+
+    /**
+     * Looks up {@code key} in the bucket area using open addressing with a key-derived probe
+     * stride.
+     *
+     * <p>Probing starts at {@code hashCode & numBucketsMask}. A bucket holding {@code END_OF_LIST}
+     * ends the search as "not found". A bucket whose stored hash code equals the key's hash code is
+     * confirmed by comparing the key stored in the record area. Otherwise the next candidate is
+     * {@code (hashCode1 + step * hashCode2) & numBucketsMask}.
+     *
+     * <p>The returned {@link LookupInfo} is one instance that every call overwrites, so it must be
+     * consumed before the next lookup. Its value is only refreshed when the key is found.
+     *
+     * @param key by which looking up the value in the hash map. NOTE(review): the previous text
+     *     said only keys in "BinaryRowData" form backed by a single MemorySegment are supported;
+     *     presumably that means {@code BinaryRow} in this code base - confirm.
+     * @return the shared {@link LookupInfo}, carrying the found flag, the key, its hash code and
+     *     the bucket (segment index and offset) at which probing stopped
+     * @throws RuntimeException wrapping any {@link IOException} raised while reading a record
+     */
+    public LookupInfo<K, V> lookup(K key) {
+        final int hashCode1 = key.hashCode();
+        int newPos = hashCode1 & numBucketsMask;
+        // which segment contains the bucket
+        int bucketSegmentIndex = newPos >>> numBucketsPerSegmentBits;
+        // offset of the bucket in the segment
+        int bucketOffset = (newPos & numBucketsPerSegmentMask) << 
BUCKET_SIZE_BITS;
+
+        boolean found = false;
+        int step = STEP_INCREMENT;
+        int hashCode2 = 0;
+        int findElementPtr;
+        try {
+            do {
+                // first int of a bucket: pointer into the record area, or END_OF_LIST if empty
+                findElementPtr = 
bucketSegments.get(bucketSegmentIndex).getInt(bucketOffset);
+                if (findElementPtr == END_OF_LIST) {
+                    // This is a new key.
+                    break;
+                } else {
+                    // the full hash code is stored right after the pointer
+                    final int storedHashCode =
+                            bucketSegments
+                                    .get(bucketSegmentIndex)
+                                    .getInt(bucketOffset + 
ELEMENT_POINT_LENGTH);
+                    if (hashCode1 == storedHashCode) {
+                        recordArea.setReadPosition(findElementPtr);
+                        if (recordArea.readKeyAndEquals(key)) {
+                            // we found an element with a matching key, and not just a hash
+                            // collision
+                            found = true;
+                            reusedValue = recordArea.readValue(reusedValue);
+                            break;
+                        }
+                    }
+                }
+                // the second hash code is computed once, while step still equals 1
+                // (assumes STEP_INCREMENT is 1 - TODO confirm)
+                if (step == 1) {
+                    hashCode2 = calcSecondHashCode(hashCode1);
+                }
+                newPos = (hashCode1 + step * hashCode2) & numBucketsMask;
+                // which segment contains the bucket
+                bucketSegmentIndex = newPos >>> numBucketsPerSegmentBits;
+                // offset of the bucket in the segment
+                bucketOffset = (newPos & numBucketsPerSegmentMask) << 
BUCKET_SIZE_BITS;
+                step += STEP_INCREMENT;
+            } while (true);
+        } catch (IOException ex) {
+            throw new RuntimeException(
+                    "Error reading record from the aggregate map: " + 
ex.getMessage(), ex);
+        }
+        // on a miss, the bucket position recorded here is the empty bucket where probing stopped
+        reuseLookupInfo.set(found, hashCode1, key, reusedValue, 
bucketSegmentIndex, bucketOffset);
+        return reuseLookupInfo;
+    }
+
+    /**
+     * Doubles the number of bucket segments and re-inserts every occupied bucket into the new
+     * bucket area. Records are not moved: only the (record pointer, hash code) pairs are copied.
+     *
+     * <p>On success the old bucket segments are returned to the memory pool and replaced by the
+     * new ones.
+     *
+     * @throws EOFException if the doubled bucket count would exceed {@code Integer.MAX_VALUE}, or
+     *     if the memory pool has fewer free pages than the doubled bucket area needs
+     */
+    protected void growAndRehash() throws EOFException {
+        // allocate the new data structures
+        int required = 2 * bucketSegments.size();
+        if (required * (long) numBucketsPerSegment > Integer.MAX_VALUE) {
+            LOG.warn(
+                    "We can't handle more than Integer.MAX_VALUE buckets (eg. 
because hash functions return int)");
+            throw new EOFException();
+        }
+
+        // despite its name, this is the shortfall: pages needed beyond what the pool has free
+        int numAllocatedSegments = required - memoryPool.freePages();
+        if (numAllocatedSegments > 0) {
+            LOG.warn(
+                    "BytesHashMap can't allocate {} pages, and now used {} 
pages",
+                    required,
+                    reservedNumBuffers);
+            throw new EOFException();
+        }
+
+        List<MemorySegment> newBucketSegments = new ArrayList<>(required);
+        for (int i = 0; i < required; i++) {
+            newBucketSegments.add(memoryPool.nextSegment());
+        }
+        // switch masks and threshold to the new size before rehashing, so the positions and
+        // second hash codes computed below refer to the new bucket area
+        setBucketVariables(newBucketSegments);
+
+        long reHashStartTime = System.currentTimeMillis();
+        resetBucketSegments(newBucketSegments);
+        // Re-mask (we don't recompute the hashcode because we stored all 32 bits of it)
+        for (MemorySegment memorySegment : bucketSegments) {
+            for (int j = 0; j < numBucketsPerSegment; j++) {
+                final int recordPointer = memorySegment.getInt(j * 
BUCKET_SIZE);
+                if (recordPointer != END_OF_LIST) {
+                    final int hashCode1 =
+                            memorySegment.getInt(j * BUCKET_SIZE + 
ELEMENT_POINT_LENGTH);
+                    int newPos = hashCode1 & numBucketsMask;
+                    int bucketSegmentIndex = newPos >>> 
numBucketsPerSegmentBits;
+                    int bucketOffset = (newPos & numBucketsPerSegmentMask) << 
BUCKET_SIZE_BITS;
+                    int step = STEP_INCREMENT;
+                    long hashCode2 = 0;
+                    // probe until a free bucket is found in the new bucket area
+                    while 
(newBucketSegments.get(bucketSegmentIndex).getInt(bucketOffset)
+                            != END_OF_LIST) {
+                        if (step == 1) {
+                            hashCode2 = calcSecondHashCode(hashCode1);
+                        }
+                        newPos = (int) ((hashCode1 + step * hashCode2) & 
numBucketsMask);
+                        // which segment contains the bucket
+                        bucketSegmentIndex = newPos >>> 
numBucketsPerSegmentBits;
+                        // offset of the bucket in the segment
+                        bucketOffset = (newPos & numBucketsPerSegmentMask) << 
BUCKET_SIZE_BITS;
+                        step += STEP_INCREMENT;
+                    }
+                    
newBucketSegments.get(bucketSegmentIndex).putInt(bucketOffset, recordPointer);
+                    newBucketSegments
+                            .get(bucketSegmentIndex)
+                            .putInt(bucketOffset + ELEMENT_POINT_LENGTH, 
hashCode1);
+                }
+            }
+        }
+        LOG.info(
+                "The rehash take {} ms for {} segments",
+                (System.currentTimeMillis() - reHashStartTime),
+                required);
+        // the old bucket area is released only after everything was copied over
+        this.memoryPool.returnAll(this.bucketSegments);
+        this.bucketSegments = newBucketSegments;
+    }
+
+    /**
+     * Gives the passed segments back to the memory pool.
+     *
+     * @param segments segments to return
+     */
+    protected void returnSegments(List<MemorySegment> segments) {
+        this.memoryPool.returnAll(segments);
+    }
+
+    /**
+     * Derives the fields that depend on the bucket count (log2, both masks and the growth
+     * threshold) from the size of the given bucket segment list.
+     *
+     * @param bucketSegments the bucket area whose size determines the new values
+     */
+    private void setBucketVariables(List<MemorySegment> bucketSegments) {
+        final int bucketCount = bucketSegments.size() * numBucketsPerSegment;
+        final int log2Buckets = MathUtils.log2strict(bucketCount);
+        this.log2NumBuckets = log2Buckets;
+        this.numBucketsMask = (1 << log2Buckets) - 1;
+        // mask over half the bucket count, used when deriving the second hash code
+        this.numBucketsMask2 = (1 << MathUtils.log2strict(bucketCount >> 1)) - 1;
+        this.growthThreshold = (int) (bucketCount * LOAD_FACTOR);
+    }
+
+    // M (the number of buckets) is a power of 2, so the second hash code must be odd, and always is
+    // H2(K) = 1 + 2 * ((H1(K)/M) mod (M-1))
+    // The code below takes the bits of H1 above the bucket index, masks them with
+    // numBucketsMask2, doubles the result and adds one.
+    protected int calcSecondHashCode(final int firstHashCode) {
+        final int upperBits = firstHashCode >> log2NumBuckets;
+        final int reduced = upperBits & numBucketsMask2;
+        return (reduced << 1) + 1;
+    }
+
+    /**
+     * Record area: the storage that holds the keys and values the bucket area points to.
+     *
+     * <p>{@code lookup} positions the area with {@link #setReadPosition(int)}, compares the stored
+     * key with {@link #readKeyAndEquals(Object)} and, on a match, reads the value with {@link
+     * #readValue(Object)}.
+     */
+    interface RecordArea<K, V> {
+
+        /** Moves the read cursor to {@code position}, a record pointer taken from a bucket. */
+        void setReadPosition(int position);
+
+        /** Reads the key at the current read position and reports whether it equals the argument. */
+        boolean readKeyAndEquals(K lookupKey) throws IOException;
+
+        /** Reads the value at the current read position; {@code reuse} may be used as target. */
+        V readValue(V reuse) throws IOException;
+
+        // presumably stores the looked-up key together with value and returns the record's
+        // pointer - TODO confirm against the implementations
+        int appendRecord(LookupInfo<K, V> lookupInfo, BinaryRow value) throws 
IOException;
+
+        // presumably the total size of the segments held by the area - TODO confirm
+        long getSegmentsSize();
+
+        // NOTE(review): release/reset semantics are defined by the implementations, not shown here
+        void release();
+
+        void reset();
+    }
+
+    /** Outcome of a key lookup; a mutable holder that is refilled through {@code set}. */
+    public static final class LookupInfo<K, V> {
+        boolean found;
+        K key;
+        V value;
+
+        /**
+         * Hash code of the key that was passed to {@code BytesMap#lookup}. Keeping it here avoids
+         * hashing the key again when a value is inserted for it; {@code bucketSegmentIndex} and
+         * {@code bucketOffset} are cached for the same reason.
+         */
+        int keyHashCode;
+
+        int bucketSegmentIndex;
+        int bucketOffset;
+
+        /** Creates an empty result: not found, no key or value, all positions set to -1. */
+        LookupInfo() {
+            set(false, -1, null, null, -1, -1);
+        }
+
+        /** Overwrites every field of this holder with the given lookup outcome. */
+        void set(
+                boolean found,
+                int keyHashCode,
+                K key,
+                V value,
+                int bucketSegmentIndex,
+                int bucketOffset) {
+            this.key = key;
+            this.value = value;
+            this.found = found;
+            this.keyHashCode = keyHashCode;
+            this.bucketOffset = bucketOffset;
+            this.bucketSegmentIndex = bucketSegmentIndex;
+        }
+
+        /** @return whether the looked-up key was present. */
+        public boolean isFound() {
+            return this.found;
+        }
+
+        /** @return the key that was looked up. */
+        public K getKey() {
+            return this.key;
+        }
+
+        /** @return the value handed to the last {@code set} call. */
+        public V getValue() {
+            return this.value;
+        }
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/localmerge/HashMapLocalMerger.java
 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/localmerge/HashMapLocalMerger.java
new file mode 100644
index 000000000..1a395fb36
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/localmerge/HashMapLocalMerger.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.mergetree.localmerge;
+
+import org.apache.paimon.KeyValue;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.InternalRow.FieldSetter;
+import org.apache.paimon.data.serializer.BinaryRowSerializer;
+import org.apache.paimon.data.serializer.InternalRowSerializer;
+import org.apache.paimon.hash.BytesHashMap;
+import org.apache.paimon.hash.BytesMap.LookupInfo;
+import org.apache.paimon.memory.MemorySegmentPool;
+import org.apache.paimon.mergetree.compact.MergeFunction;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.RowKind;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.FieldsComparator;
+import org.apache.paimon.utils.KeyValueIterator;
+
+import javax.annotation.Nullable;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Consumer;
+
+import static org.apache.paimon.data.InternalRow.createFieldSetter;
+
+/**
+ * A {@link LocalMerger} which stores records in {@link BytesHashMap}: one stored row per key, with
+ * later records for the same key merged into that row in place.
+ */
+public class HashMapLocalMerger implements LocalMerger {
+
+    private final InternalRowSerializer valueSerializer;
+    private final MergeFunction<KeyValue> mergeFunction;
+    @Nullable private final FieldsComparator udsComparator;
+    private final BytesHashMap<BinaryRow> buffer;
+    private final List<FieldSetter> nonKeySetters;
+
+    public HashMapLocalMerger(
+            RowType rowType,
+            List<String> primaryKeys,
+            MemorySegmentPool memoryPool,
+            MergeFunction<KeyValue> mergeFunction,
+            @Nullable FieldsComparator userDefinedSeqComparator) {
+        this.valueSerializer = new InternalRowSerializer(rowType);
+        this.mergeFunction = mergeFunction;
+        this.udsComparator = userDefinedSeqComparator;
+
+        BinaryRowSerializer keySerializer = new BinaryRowSerializer(primaryKeys.size());
+        this.buffer = new BytesHashMap<>(memoryPool, keySerializer, rowType.getFieldCount());
+
+        // one setter per field that is not a primary key; used to write merge results back
+        List<DataField> fields = rowType.getFields();
+        List<FieldSetter> setters = new ArrayList<>();
+        for (int pos = 0; pos < rowType.getFieldCount(); pos++) {
+            DataField field = fields.get(pos);
+            if (!primaryKeys.contains(field.name())) {
+                setters.add(createFieldSetter(field.type(), pos));
+            }
+        }
+        this.nonKeySetters = setters;
+    }
+
+    /**
+     * Appends the record if its key is new, otherwise merges it into the stored row.
+     *
+     * @return {@code false} only when appending a new key fails with an {@link EOFException}
+     */
+    @Override
+    public boolean put(RowKind rowKind, BinaryRow key, InternalRow value) throws IOException {
+        // we store row kind in value
+        value.setRowKind(rowKind);
+
+        LookupInfo<BinaryRow, BinaryRow> lookup = buffer.lookup(key);
+        if (lookup.isFound()) {
+            mergeIntoStored(key, lookup.getValue(), value);
+            return true;
+        }
+
+        try {
+            buffer.append(lookup, valueSerializer.toBinaryRow(value));
+        } catch (EOFException eof) {
+            return false;
+        }
+        return true;
+    }
+
+    /** Runs the merge function over the stored and incoming rows and writes the result back. */
+    private void mergeIntoStored(BinaryRow key, BinaryRow stored, InternalRow value)
+            throws IOException {
+        mergeFunction.reset();
+        KeyValue previousKv = new KeyValue().replace(key, stored.getRowKind(), stored);
+        KeyValue newKv = new KeyValue().replace(key, value.getRowKind(), value);
+
+        // with a user-defined sequence comparator, the row that compares greater is added last
+        boolean storedGoesLast = udsComparator != null && udsComparator.compare(stored, value) > 0;
+        mergeFunction.add(storedGoesLast ? newKv : previousKv);
+        mergeFunction.add(storedGoesLast ? previousKv : newKv);
+
+        KeyValue merged = mergeFunction.getResult();
+        stored.setRowKind(merged.valueKind());
+        InternalRow mergedRow = merged.value();
+        for (FieldSetter setter : nonKeySetters) {
+            setter.setFieldFrom(mergedRow, stored);
+        }
+    }
+
+    @Override
+    public int size() {
+        return buffer.getNumElements();
+    }
+
+    @Override
+    public void forEach(Consumer<InternalRow> consumer) throws IOException {
+        KeyValueIterator<BinaryRow, BinaryRow> entries = buffer.getEntryIterator(false);
+        while (entries.advanceNext()) {
+            consumer.accept(entries.getValue());
+        }
+    }
+
+    @Override
+    public void clear() {
+        buffer.reset();
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/localmerge/LocalMerger.java
 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/localmerge/LocalMerger.java
new file mode 100644
index 000000000..bec71808a
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/localmerge/LocalMerger.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.mergetree.localmerge;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.types.RowKind;
+
+import java.io.IOException;
+import java.util.function.Consumer;
+
+/** Local merger to merge in memory. */
+public interface LocalMerger {
+
+    /**
+     * Adds one record to the merger.
+     *
+     * @param rowKind kind of the record
+     * @param key key of the record
+     * @param value the record itself
+     * @return {@code false} if the record could not be stored. NOTE(review): callers presumably
+     *     flush via {@link #forEach}, call {@link #clear()} and retry - confirm against the caller.
+     */
+    boolean put(RowKind rowKind, BinaryRow key, InternalRow value) throws 
IOException;
+
+    /** Returns the number of entries currently held, as reported by the underlying buffer. */
+    int size();
+
+    /** Passes every buffered row to {@code consumer}. */
+    void forEach(Consumer<InternalRow> consumer) throws IOException;
+
+    /** Discards the buffered content so the merger can be filled again. */
+    void clear();
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/localmerge/SortBufferLocalMerger.java
 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/localmerge/SortBufferLocalMerger.java
new file mode 100644
index 000000000..198e6c67d
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/localmerge/SortBufferLocalMerger.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.mergetree.localmerge;
+
+import org.apache.paimon.KeyValue;
+import org.apache.paimon.codegen.RecordComparator;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.mergetree.SortBufferWriteBuffer;
+import org.apache.paimon.mergetree.compact.MergeFunction;
+import org.apache.paimon.types.RowKind;
+
+import java.io.IOException;
+import java.util.function.Consumer;
+
+/** A {@link LocalMerger} which stores records in {@link SortBufferWriteBuffer}. */
+public class SortBufferLocalMerger implements LocalMerger {
+
+    private final SortBufferWriteBuffer sortBuffer;
+    private final RecordComparator keyComparator;
+    private final MergeFunction<KeyValue> mergeFunction;
+
+    /** Sequence number passed to the sort buffer with each record; grows by one per put. */
+    private long recordCount;
+
+    public SortBufferLocalMerger(
+            SortBufferWriteBuffer sortBuffer,
+            RecordComparator keyComparator,
+            MergeFunction<KeyValue> mergeFunction) {
+        this.recordCount = 0;
+        this.mergeFunction = mergeFunction;
+        this.keyComparator = keyComparator;
+        this.sortBuffer = sortBuffer;
+    }
+
+    @Override
+    public boolean put(RowKind rowKind, BinaryRow key, InternalRow value) throws IOException {
+        final long sequenceNumber = recordCount;
+        recordCount = sequenceNumber + 1;
+        return sortBuffer.put(sequenceNumber, rowKind, key, value);
+    }
+
+    @Override
+    public int size() {
+        return sortBuffer.size();
+    }
+
+    @Override
+    public void forEach(Consumer<InternalRow> consumer) throws IOException {
+        // the merged row kind is copied onto the value row before it is handed out
+        sortBuffer.forEach(
+                keyComparator,
+                mergeFunction,
+                null,
+                merged -> {
+                    InternalRow mergedRow = merged.value();
+                    mergedRow.setRowKind(merged.valueKind());
+                    consumer.accept(mergedRow);
+                });
+    }
+
+    @Override
+    public void clear() {
+        sortBuffer.clear();
+    }
+}
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/hash/BytesHashMapTest.java 
b/paimon-core/src/test/java/org/apache/paimon/hash/BytesHashMapTest.java
new file mode 100644
index 000000000..4020e3d48
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/hash/BytesHashMapTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.hash;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.serializer.BinaryRowSerializer;
+import org.apache.paimon.memory.MemorySegmentPool;
+import org.apache.paimon.types.DataType;
+
+/** Test case for {@link BytesHashMap} with {@link BinaryRow} keys. */
+public class BytesHashMapTest extends BytesHashMapTestBase<BinaryRow> {
+
+    public BytesHashMapTest() {
+        super(new BinaryRowSerializer(KEY_TYPES.length));
+    }
+
+    @Override
+    public BytesHashMap<BinaryRow> createBytesHashMap(
+            MemorySegmentPool pool, DataType[] keyTypes, DataType[] valueTypes) {
+        BinaryRowSerializer serializer = new BinaryRowSerializer(keyTypes.length);
+        int valueArity = valueTypes.length;
+        return new BytesHashMap<>(pool, serializer, valueArity);
+    }
+
+    @Override
+    public BinaryRow[] generateRandomKeys(int num) {
+        return getRandomizedInputs(num);
+    }
+}
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/hash/BytesHashMapTestBase.java 
b/paimon-core/src/test/java/org/apache/paimon/hash/BytesHashMapTestBase.java
new file mode 100644
index 000000000..01bbd46e6
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/hash/BytesHashMapTestBase.java
@@ -0,0 +1,373 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.hash;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.RandomAccessInputView;
+import org.apache.paimon.data.serializer.BinaryRowSerializer;
+import org.apache.paimon.data.serializer.PagedTypeSerializer;
+import org.apache.paimon.memory.HeapMemorySegmentPool;
+import org.apache.paimon.memory.MemorySegment;
+import org.apache.paimon.memory.MemorySegmentPool;
+import org.apache.paimon.types.BigIntType;
+import org.apache.paimon.types.BooleanType;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DoubleType;
+import org.apache.paimon.types.FloatType;
+import org.apache.paimon.types.IntType;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.types.SmallIntType;
+import org.apache.paimon.types.VarCharType;
+import org.apache.paimon.utils.KeyValueIterator;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Base test class for {@link BytesHashMap}. */
+abstract class BytesHashMapTestBase<K> extends BytesMapTestBase {
+
+    private static final int NUM_REWRITES = 10;
+
+    static final DataType[] KEY_TYPES =
+            new DataType[] {
+                new IntType(),
+                VarCharType.STRING_TYPE,
+                new DoubleType(),
+                new BigIntType(),
+                new BooleanType(),
+                new FloatType(),
+                new SmallIntType()
+            };
+
+    static final DataType[] VALUE_TYPES =
+            new DataType[] {
+                new DoubleType(),
+                new BigIntType(),
+                new BooleanType(),
+                new FloatType(),
+                new SmallIntType()
+            };
+
+    protected final BinaryRow defaultValue;
+    protected final PagedTypeSerializer<K> keySerializer;
+    protected final BinaryRowSerializer valueSerializer;
+
+    /**
+     * Stores the key serializer and prepares {@code defaultValue}: a value row pointing at a fresh
+     * byte array that is exactly as long as the row's fixed-length part.
+     */
+    public BytesHashMapTestBase(PagedTypeSerializer<K> keySerializer) {
+        this.keySerializer = keySerializer;
+        this.valueSerializer = new BinaryRowSerializer(VALUE_TYPES.length);
+
+        BinaryRow emptyValue = valueSerializer.createInstance();
+        int fixedPartSize = emptyValue.getFixedLengthPartSize();
+        emptyValue.pointTo(MemorySegment.wrap(new byte[fixedPartSize]), 0, fixedPartSize);
+        this.defaultValue = emptyValue;
+    }
+
+    /**
+     * Creates the {@link BytesHashMap} under test.
+     *
+     * @param memorySegmentPool pool the map takes its memory from
+     * @param keyTypes types of the key fields
+     * @param valueTypes types of the value fields
+     */
+    public abstract BytesHashMap<K> createBytesHashMap(
+            MemorySegmentPool memorySegmentPool, DataType[] keyTypes, 
DataType[] valueTypes);
+
+    /**
+     * Generates {@code num} random keys. The types of the key fields are defined in {@link
+     * #KEY_TYPES}.
+     *
+     * @param num number of keys to generate
+     */
+    public abstract K[] generateRandomKeys(int num);
+
+    // 
------------------------------------------------------------------------------------------
+    // Tests
+    // 
------------------------------------------------------------------------------------------
+
+    /** A map created with no value types must report hash-set mode and still hold all keys. */
+    @Test
+    void testHashSetMode() throws IOException {
+        RowType valueRowType = RowType.of(VALUE_TYPES);
+        RowType keyRowType = RowType.of(KEY_TYPES);
+        final int numMemSegments =
+                needNumMemSegments(
+                        NUM_ENTRIES, rowLength(valueRowType), rowLength(keyRowType), PAGE_SIZE);
+        MemorySegmentPool pool = new HeapMemorySegmentPool(numMemSegments * PAGE_SIZE, PAGE_SIZE);
+
+        BytesHashMap<K> table = createBytesHashMap(pool, KEY_TYPES, new DataType[0]);
+        assertThat(table.isHashSetMode()).isTrue();
+
+        K[] keys = generateRandomKeys(NUM_ENTRIES);
+        verifyKeyInsert(keys, table);
+        verifyKeyPresent(keys, table);
+        table.free();
+    }
+
+    /** Fills a map through {@code verifyInsert} and reads everything back. */
+    @Test
+    void testBuildAndRetrieve() throws Exception {
+        RowType valueRowType = RowType.of(VALUE_TYPES);
+        RowType keyRowType = RowType.of(KEY_TYPES);
+        final int numMemSegments =
+                needNumMemSegments(
+                        NUM_ENTRIES, rowLength(valueRowType), rowLength(keyRowType), PAGE_SIZE);
+        MemorySegmentPool pool = new HeapMemorySegmentPool(numMemSegments * PAGE_SIZE, PAGE_SIZE);
+        BytesHashMap<K> table = createBytesHashMap(pool, KEY_TYPES, VALUE_TYPES);
+
+        K[] keys = generateRandomKeys(NUM_ENTRIES);
+        List<BinaryRow> expected = new ArrayList<>(NUM_ENTRIES);
+        verifyInsert(keys, expected, table);
+        verifyRetrieve(table, keys, expected);
+        table.free();
+    }
+
+    /** Fills a map through {@code verifyInsertAndUpdate} and reads everything back. */
+    @Test
+    void testBuildAndUpdate() throws Exception {
+        RowType valueRowType = RowType.of(VALUE_TYPES);
+        RowType keyRowType = RowType.of(KEY_TYPES);
+        final int numMemSegments =
+                needNumMemSegments(
+                        NUM_ENTRIES, rowLength(valueRowType), rowLength(keyRowType), PAGE_SIZE);
+        MemorySegmentPool pool = new HeapMemorySegmentPool(numMemSegments * PAGE_SIZE, PAGE_SIZE);
+        BytesHashMap<K> table = createBytesHashMap(pool, KEY_TYPES, VALUE_TYPES);
+
+        K[] keys = generateRandomKeys(NUM_ENTRIES);
+        List<BinaryRow> expected = new ArrayList<>(NUM_ENTRIES);
+        verifyInsertAndUpdate(keys, expected, table);
+        verifyRetrieve(table, keys, expected);
+        table.free();
+    }
+
+    /**
+     * Fills the map, resets it, checks that it is empty with a single record-area segment, and
+     * fills it again with the same keys. NOTE(review): the method name looks like a typo of
+     * "testReset".
+     */
+    @Test
+    void testRest() throws Exception {
+        RowType valueRowType = RowType.of(VALUE_TYPES);
+        RowType keyRowType = RowType.of(KEY_TYPES);
+        final int numMemSegments =
+                needNumMemSegments(
+                        NUM_ENTRIES, rowLength(valueRowType), rowLength(keyRowType), PAGE_SIZE);
+        MemorySegmentPool pool = new HeapMemorySegmentPool(numMemSegments * PAGE_SIZE, PAGE_SIZE);
+        BytesHashMap<K> table = createBytesHashMap(pool, KEY_TYPES, VALUE_TYPES);
+
+        final K[] keys = generateRandomKeys(NUM_ENTRIES);
+        List<BinaryRow> expected = new ArrayList<>(NUM_ENTRIES);
+
+        // first fill
+        verifyInsertAndUpdate(keys, expected, table);
+        verifyRetrieve(table, keys, expected);
+
+        table.reset();
+        assertThat(table.getNumElements()).isEqualTo(0);
+        assertThat(table.getRecordAreaMemorySegments()).hasSize(1);
+
+        // second fill after the reset
+        expected.clear();
+        verifyInsertAndUpdate(keys, expected, table);
+        verifyRetrieve(table, keys, expected);
+        table.free();
+    }
+
+    /**
+     * Inserts all keys into a map backed by a small fixed pool (64 pages). Whenever an append
+     * fails, the entries stored so far are read back from the record area, the map is reset and
+     * the failed key is inserted again. At the end the collected entries plus the ones still in
+     * the map must match everything that was inserted, in order.
+     */
+    @Test
+    void testResetAndOutput() throws Exception {
+        final Random rnd = new Random(RANDOM_SEED);
+        final int reservedMemSegments = 64;
+
+        int minMemorySize = reservedMemSegments * PAGE_SIZE;
+        MemorySegmentPool pool = new HeapMemorySegmentPool(minMemorySize, 
PAGE_SIZE);
+
+        BytesHashMap<K> table = createBytesHashMap(pool, KEY_TYPES, 
VALUE_TYPES);
+
+        K[] keys = generateRandomKeys(NUM_ENTRIES);
+        List<BinaryRow> expected = new ArrayList<>(NUM_ENTRIES);
+        List<BinaryRow> actualValues = new ArrayList<>(NUM_ENTRIES);
+        List<K> actualKeys = new ArrayList<>(NUM_ENTRIES);
+        for (int i = 0; i < NUM_ENTRIES; i++) {
+            K groupKey = keys[i];
+            // look up and insert
+            BytesMap.LookupInfo<K, BinaryRow> lookupInfo = 
table.lookup(groupKey);
+            assertThat(lookupInfo.isFound()).isFalse();
+            try {
+                BinaryRow entry = table.append(lookupInfo, defaultValue);
+                assertThat(entry).isNotNull();
+                // mock multiple updates
+                for (int j = 0; j < NUM_REWRITES; j++) {
+                    updateOutputBuffer(entry, rnd);
+                }
+                expected.add(entry.copy());
+            } catch (Exception e) {
+                // NOTE(review): presumably the append ran out of memory; any other exception
+                // type is handled the same way here - consider narrowing the catch.
+                // Drain the map: read key/value pairs sequentially from the record area.
+                ArrayList<MemorySegment> segments = 
table.getRecordAreaMemorySegments();
+                RandomAccessInputView inView =
+                        new RandomAccessInputView(segments, 
segments.get(0).size());
+                K reuseKey = keySerializer.createReuseInstance();
+                BinaryRow reuseValue = valueSerializer.createInstance();
+                for (int index = 0; index < table.getNumElements(); index++) {
+                    reuseKey = keySerializer.mapFromPages(reuseKey, inView);
+                    reuseValue = valueSerializer.mapFromPages(reuseValue, 
inView);
+                    actualKeys.add(keySerializer.copy(reuseKey));
+                    actualValues.add(reuseValue.copy());
+                }
+                table.reset();
+                // retry
+                lookupInfo = table.lookup(groupKey);
+                BinaryRow entry = table.append(lookupInfo, defaultValue);
+                assertThat(entry).isNotNull();
+                // mock multiple updates
+                for (int j = 0; j < NUM_REWRITES; j++) {
+                    updateOutputBuffer(entry, rnd);
+                }
+                expected.add(entry.copy());
+            }
+        }
+        // collect whatever is still held by the map after the last reset
+        KeyValueIterator<K, BinaryRow> iter = table.getEntryIterator(false);
+        while (iter.advanceNext()) {
+            actualKeys.add(keySerializer.copy(iter.getKey()));
+            actualValues.add(iter.getValue().copy());
+        }
+        assertThat(expected).hasSize(NUM_ENTRIES);
+        assertThat(actualKeys).hasSize(NUM_ENTRIES);
+        assertThat(actualValues).hasSize(NUM_ENTRIES);
+        assertThat(actualValues).isEqualTo(expected);
+        table.free();
+    }
+
+    /**
+     * Looking up an absent key repeatedly must never report it as found; once the key has been
+     * appended, every further lookup must find it with a non-null value.
+     */
+    @Test
+    void testSingleKeyMultipleOps() throws Exception {
+        RowType valueRowType = RowType.of(VALUE_TYPES);
+        RowType keyRowType = RowType.of(KEY_TYPES);
+        final int numMemSegments =
+                needNumMemSegments(
+                        NUM_ENTRIES, rowLength(valueRowType), rowLength(keyRowType), PAGE_SIZE);
+        MemorySegmentPool pool = new HeapMemorySegmentPool(numMemSegments * PAGE_SIZE, PAGE_SIZE);
+        BytesHashMap<K> table = createBytesHashMap(pool, KEY_TYPES, VALUE_TYPES);
+
+        final K key = generateRandomKeys(1)[0];
+        for (int attempt = 0; attempt < 3; attempt++) {
+            assertThat(table.lookup(key).isFound()).isFalse();
+        }
+
+        // first operation: the key is still absent, so append it
+        BytesMap.LookupInfo<K, BinaryRow> firstLookup = table.lookup(key);
+        assertThat(firstLookup.isFound()).isFalse();
+        BinaryRow appended = table.append(firstLookup, defaultValue);
+        assertThat(appended).isNotNull();
+
+        // two more operations: the key must now be found
+        for (int attempt = 1; attempt < 3; attempt++) {
+            BytesMap.LookupInfo<K, BinaryRow> lookupInfo = table.lookup(key);
+            BinaryRow entry = lookupInfo.getValue();
+            assertThat(lookupInfo.isFound()).isTrue();
+            assertThat(entry).isNotNull();
+        }
+        table.free();
+    }
+
+    // ----------------------------------------------
+    /**
+     * Overwrites fields 2 (double), 3 (long) and 4 (boolean) of {@code reuse} with values drawn
+     * from {@code rnd}. This mocks the update that, according to the original note, is
+     * code-generated in HashAggExec.
+     */
+    private void updateOutputBuffer(BinaryRow reuse, Random rnd) {
+        // the long is drawn before the double; callers rely on a seeded, repeatable sequence
+        final long drawnLong = rnd.nextLong();
+        reuse.setDouble(2, rnd.nextDouble());
+        reuse.setLong(3, drawnLong);
+        reuse.setBoolean(4, (drawnLong & 1L) == 0L);
+    }
+
+    // ----------------------- Utilities  -----------------------
+
+    /**
+     * Asserts that {@code table} holds {@code NUM_ENTRIES} elements and that looking up {@code
+     * keys[i]} finds a non-null value equal to {@code expected.get(i)}.
+     */
+    private void verifyRetrieve(BytesHashMap<K> table, K[] keys, List<BinaryRow> expected) {
+        assertThat(table.getNumElements()).isEqualTo(NUM_ENTRIES);
+        for (int position = 0; position < NUM_ENTRIES; position++) {
+            BytesMap.LookupInfo<K, BinaryRow> hit = table.lookup(keys[position]);
+            assertThat(hit.isFound()).isTrue();
+            assertThat(hit.getValue()).isNotNull().isEqualTo(expected.get(position));
+        }
+    }
+
+    /**
+     * Inserts every key with {@code defaultValue} and records a copy of each stored entry.
+     *
+     * <p>Each key must be absent before the append, the appended entry must equal {@code
+     * defaultValue}, and the table must hold {@code NUM_ENTRIES} elements afterwards.
+     *
+     * @param keys keys to insert; the first {@code NUM_ENTRIES} are used
+     * @param inserted receives a copy of every appended entry, in key order
+     * @param table map under test
+     * @throws IOException propagated from the map operations
+     */
+    private void verifyInsert(K[] keys, List<BinaryRow> inserted, BytesHashMap<K> table)
+            throws IOException {
+        for (int i = 0; i < NUM_ENTRIES; i++) {
+            K groupKey = keys[i];
+            // look up and insert
+            BytesMap.LookupInfo<K, BinaryRow> lookupInfo = table.lookup(groupKey);
+            assertThat(lookupInfo.isFound()).isFalse();
+            BinaryRow entry = table.append(lookupInfo, defaultValue);
+            assertThat(entry).isNotNull();
+            // actual value first, expected second, so a failure labels the two correctly
+            assertThat(entry).isEqualTo(defaultValue);
+            inserted.add(entry.copy());
+        }
+        assertThat(table.getNumElements()).isEqualTo(NUM_ENTRIES);
+    }
+
+    /**
+     * Appends {@code defaultValue} for every key, rewrites the stored entry {@code NUM_REWRITES}
+     * times in place, and adds a copy of the final entry to {@code inserted}.
+     *
+     * <p>The rewrites draw from a generator seeded with {@code RANDOM_SEED}, so the recorded rows
+     * are the same on every run.
+     */
+    private void verifyInsertAndUpdate(K[] keys, List<BinaryRow> inserted, BytesHashMap<K> table)
+            throws IOException {
+        final Random updateSource = new Random(RANDOM_SEED);
+        for (int index = 0; index < NUM_ENTRIES; index++) {
+            // the key must be new, then it is inserted with the default value
+            BytesMap.LookupInfo<K, BinaryRow> probe = table.lookup(keys[index]);
+            assertThat(probe.isFound()).isFalse();
+            BinaryRow stored = table.append(probe, defaultValue);
+            assertThat(stored).isNotNull();
+
+            // mutate the stored entry several times before taking the snapshot
+            int rewritesLeft = NUM_REWRITES;
+            while (rewritesLeft > 0) {
+                updateOutputBuffer(stored, updateSource);
+                rewritesLeft--;
+            }
+            inserted.add(stored.copy());
+        }
+        assertThat(table.getNumElements()).isEqualTo(NUM_ENTRIES);
+    }
+
+    /**
+     * Asserts that {@code table} holds {@code NUM_ENTRIES} elements and that every key is found
+     * with a value equal to a zero-field row backed by eight zero bytes.
+     */
+    private void verifyKeyPresent(K[] keys, BytesHashMap<K> table) {
+        assertThat(table.getNumElements()).isEqualTo(NUM_ENTRIES);
+
+        final BinaryRow emptyRow = new BinaryRow(0);
+        emptyRow.pointTo(MemorySegment.wrap(new byte[8]), 0, 8);
+
+        for (int position = 0; position < NUM_ENTRIES; position++) {
+            BytesMap.LookupInfo<K, BinaryRow> hit = table.lookup(keys[position]);
+            assertThat(hit.isFound()).isTrue();
+            assertThat(hit.getValue()).isNotNull().isEqualTo(emptyRow);
+        }
+    }
+
+    /**
+     * Inserts every key and checks the entry returned by the append.
+     *
+     * <p>Each key must be absent before the append. Although {@code defaultValue} is passed to
+     * the append, the returned entry is expected to equal a zero-field row backed by eight zero
+     * bytes. The table must hold {@code NUM_ENTRIES} elements afterwards.
+     *
+     * @param keys keys to insert; the first {@code NUM_ENTRIES} are used
+     * @param table map under test
+     * @throws IOException propagated from the map operations
+     */
+    private void verifyKeyInsert(K[] keys, BytesHashMap<K> table) throws IOException {
+        BinaryRow present = new BinaryRow(0);
+        present.pointTo(MemorySegment.wrap(new byte[8]), 0, 8);
+        for (int i = 0; i < NUM_ENTRIES; i++) {
+            K groupKey = keys[i];
+            // look up and insert
+            BytesMap.LookupInfo<K, BinaryRow> lookupInfo = table.lookup(groupKey);
+            assertThat(lookupInfo.isFound()).isFalse();
+            BinaryRow entry = table.append(lookupInfo, defaultValue);
+            assertThat(entry).isNotNull();
+            // actual value first, expected second, so a failure labels the two correctly
+            assertThat(entry).isEqualTo(present);
+        }
+        assertThat(table.getNumElements()).isEqualTo(NUM_ENTRIES);
+    }
+}
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/hash/BytesMapTestBase.java 
b/paimon-core/src/test/java/org/apache/paimon/hash/BytesMapTestBase.java
new file mode 100644
index 000000000..acde6d9a1
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/hash/BytesMapTestBase.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.hash;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BinaryRowWriter;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.RowType;
+
+import java.util.Random;
+
+/** Shared constants and row-building helpers for {@link BytesMap} test cases. */
+public class BytesMapTestBase {
+
+    protected static final long RANDOM_SEED = 76518743207143L;
+    protected static final int PAGE_SIZE = 32 * 1024;
+    protected static final int NUM_ENTRIES = 10000;
+
+    /**
+     * Builds {@code num} rows from a generator seeded with {@link #RANDOM_SEED}, with null fields
+     * allowed.
+     */
+    protected BinaryRow[] getRandomizedInputs(int num) {
+        final Random rnd = new Random(RANDOM_SEED);
+        return getRandomizedInputs(num, rnd, true);
+    }
+
+    /**
+     * Builds {@code num} seven-field rows from values drawn from {@code rnd}.
+     *
+     * <p>When {@code nullable} is set and the drawn long is even, the string and float fields of
+     * that row are null; in that case no float is drawn from {@code rnd}.
+     *
+     * @param num number of rows to create
+     * @param rnd source of the random field values
+     * @param nullable whether the string and float fields may be null
+     * @return the generated rows, in generation order
+     */
+    protected BinaryRow[] getRandomizedInputs(int num, Random rnd, boolean nullable) {
+        BinaryRow[] lists = new BinaryRow[num];
+        for (int i = 0; i < num; i++) {
+            int intVal = rnd.nextInt(Integer.MAX_VALUE);
+            long longVal = -rnd.nextLong();
+            boolean boolVal = longVal % 2 == 0;
+            // the row index is appended to the generated string
+            String strVal = nullable && boolVal ? null : getString(intVal, intVal % 1024) + i;
+            Double doubleVal = rnd.nextDouble();
+            Short shortVal = (short) intVal;
+            Float floatVal = nullable && boolVal ? null : rnd.nextFloat();
+            lists[i] = createRow(intVal, strVal, doubleVal, longVal, boolVal, floatVal, shortVal);
+        }
+        return lists;
+    }
+
+    /**
+     * Writes the given values into a new seven-field {@link BinaryRow}; a {@code null} argument
+     * marks the corresponding field as null.
+     */
+    protected BinaryRow createRow(
+            Integer f0, String f1, Double f2, Long f3, Boolean f4, Float f5, Short f6) {
+
+        BinaryRow row = new BinaryRow(7);
+        BinaryRowWriter writer = new BinaryRowWriter(row);
+
+        // int, string, double, long, boolean, float, short
+        if (f0 == null) {
+            writer.setNullAt(0);
+        } else {
+            writer.writeInt(0, f0);
+        }
+        if (f1 == null) {
+            writer.setNullAt(1);
+        } else {
+            writer.writeString(1, BinaryString.fromString(f1));
+        }
+        if (f2 == null) {
+            writer.setNullAt(2);
+        } else {
+            writer.writeDouble(2, f2);
+        }
+        if (f3 == null) {
+            writer.setNullAt(3);
+        } else {
+            writer.writeLong(3, f3);
+        }
+        if (f4 == null) {
+            writer.setNullAt(4);
+        } else {
+            writer.writeBoolean(4, f4);
+        }
+        if (f5 == null) {
+            writer.setNullAt(5);
+        } else {
+            writer.writeFloat(5, f5);
+        }
+        if (f6 == null) {
+            writer.setNullAt(6);
+        } else {
+            writer.writeShort(6, f6);
+        }
+        writer.complete();
+        return row;
+    }
+
+    /**
+     * Estimates how many pages a test map needs for {@code numEntries} entries.
+     *
+     * <p>Each entry is budgeted at {@code valLen + keyLen} plus a fixed allowance of {@code 1024 *
+     * 3 + 4 + 8 + 8} bytes, and the total is doubled. The product is computed in {@code long} so
+     * that large entry counts cannot silently wrap around.
+     *
+     * @throws ArithmeticException if the resulting page count does not fit into an {@code int}
+     */
+    protected int needNumMemSegments(int numEntries, int valLen, int keyLen, int pageSize) {
+        long bytesPerEntry = (long) valLen + keyLen + 1024 * 3 + 4 + 8 + 8;
+        return Math.toIntExact(2L * bytesPerEntry * numEntries / pageSize);
+    }
+
+    /** Returns the fixed-part size of a row of type {@code tpe} plus its variable-length size. */
+    protected int rowLength(RowType tpe) {
+        return BinaryRow.calculateFixPartSizeInBytes(tpe.getFieldCount())
+                + BytesHashMap.getVariableLength(tpe.getFieldTypes().toArray(new DataType[0]));
+    }
+
+    /** Concatenates the decimal form of {@code count} with itself {@code length} times. */
+    private String getString(int count, int length) {
+        StringBuilder builder = new StringBuilder();
+        for (int i = 0; i < length; i++) {
+            builder.append(count);
+        }
+        return builder.toString();
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java
index aba891e44..6931fe907 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java
@@ -20,13 +20,17 @@ package org.apache.paimon.flink.sink;
 
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.KeyValue;
+import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.codegen.CodeGenUtils;
 import org.apache.paimon.codegen.Projection;
-import org.apache.paimon.codegen.RecordComparator;
+import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.memory.HeapMemorySegmentPool;
 import org.apache.paimon.mergetree.SortBufferWriteBuffer;
 import org.apache.paimon.mergetree.compact.MergeFunction;
+import org.apache.paimon.mergetree.localmerge.HashMapLocalMerger;
+import org.apache.paimon.mergetree.localmerge.LocalMerger;
+import org.apache.paimon.mergetree.localmerge.SortBufferLocalMerger;
 import org.apache.paimon.options.MemorySize;
 import org.apache.paimon.schema.KeyValueFieldsExtractor;
 import org.apache.paimon.schema.TableSchema;
@@ -43,6 +47,7 @@ import 
org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.BoundedOneInput;
 import org.apache.flink.streaming.api.operators.ChainingStrategy;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
@@ -63,13 +68,10 @@ public class LocalMergeOperator extends 
AbstractStreamOperator<InternalRow>
     private final boolean ignoreDelete;
 
     private transient Projection keyProjection;
-    private transient RecordComparator keyComparator;
 
-    private transient long recordCount;
     private transient RowKindGenerator rowKindGenerator;
-    private transient MergeFunction<KeyValue> mergeFunction;
 
-    private transient SortBufferWriteBuffer buffer;
+    private transient LocalMerger merger;
     private transient long currentWatermark;
 
     private transient boolean endOfInput;
@@ -87,17 +89,14 @@ public class LocalMergeOperator extends 
AbstractStreamOperator<InternalRow>
     public void open() throws Exception {
         super.open();
 
-        RowType keyType = 
PrimaryKeyTableUtils.addKeyNamePrefix(schema.logicalPrimaryKeysType());
+        List<String> primaryKeys = schema.primaryKeys();
         RowType valueType = schema.logicalRowType();
         CoreOptions options = new CoreOptions(schema.options());
 
-        keyProjection =
-                CodeGenUtils.newProjection(valueType, 
schema.projection(schema.primaryKeys()));
-        keyComparator = new KeyComparatorSupplier(keyType).get();
+        keyProjection = CodeGenUtils.newProjection(valueType, 
schema.projection(primaryKeys));
 
-        recordCount = 0;
         rowKindGenerator = RowKindGenerator.create(schema, options);
-        mergeFunction =
+        MergeFunction<KeyValue> mergeFunction =
                 PrimaryKeyTableUtils.createMergeFunctionFactory(
                                 schema,
                                 new KeyValueFieldsExtractor() {
@@ -117,26 +116,51 @@ public class LocalMergeOperator extends 
AbstractStreamOperator<InternalRow>
                                 })
                         .create();
 
-        buffer =
-                new SortBufferWriteBuffer(
-                        keyType,
-                        valueType,
-                        UserDefinedSeqComparator.create(valueType, options),
-                        new HeapMemorySegmentPool(
-                                options.localMergeBufferSize(), 
options.pageSize()),
-                        false,
-                        MemorySize.MAX_VALUE,
-                        options.localSortMaxNumFileHandles(),
-                        options.spillCompressOptions(),
-                        null);
-        currentWatermark = Long.MIN_VALUE;
+        boolean canHashMerger = true;
+        for (DataField field : valueType.getFields()) {
+            if (primaryKeys.contains(field.name())) {
+                continue;
+            }
+
+            if (!BinaryRow.isInFixedLengthPart(field.type())) {
+                canHashMerger = false;
+                break;
+            }
+        }
+
+        HeapMemorySegmentPool pool =
+                new HeapMemorySegmentPool(options.localMergeBufferSize(), 
options.pageSize());
+        UserDefinedSeqComparator udsComparator =
+                UserDefinedSeqComparator.create(valueType, options);
+        if (canHashMerger) {
+            merger =
+                    new HashMapLocalMerger(
+                            valueType, primaryKeys, pool, mergeFunction, 
udsComparator);
+        } else {
+            RowType keyType =
+                    
PrimaryKeyTableUtils.addKeyNamePrefix(schema.logicalPrimaryKeysType());
+            SortBufferWriteBuffer sortBuffer =
+                    new SortBufferWriteBuffer(
+                            keyType,
+                            valueType,
+                            udsComparator,
+                            pool,
+                            false,
+                            MemorySize.MAX_VALUE,
+                            options.localSortMaxNumFileHandles(),
+                            options.spillCompressOptions(),
+                            null);
+            merger =
+                    new SortBufferLocalMerger(
+                            sortBuffer, new 
KeyComparatorSupplier(keyType).get(), mergeFunction);
+        }
 
+        currentWatermark = Long.MIN_VALUE;
         endOfInput = false;
     }
 
     @Override
     public void processElement(StreamRecord<InternalRow> record) throws 
Exception {
-        recordCount++;
         InternalRow row = record.getValue();
 
         RowKind rowKind = RowKindGenerator.getRowKind(rowKindGenerator, row);
@@ -147,10 +171,10 @@ public class LocalMergeOperator extends 
AbstractStreamOperator<InternalRow>
         // row kind must be INSERT when it is divided into key and value
         row.setRowKind(RowKind.INSERT);
 
-        InternalRow key = keyProjection.apply(row);
-        if (!buffer.put(recordCount, rowKind, key, row)) {
+        BinaryRow key = keyProjection.apply(row);
+        if (!merger.put(rowKind, key, row)) {
             flushBuffer();
-            if (!buffer.put(recordCount, rowKind, key, row)) {
+            if (!merger.put(rowKind, key, row)) {
                 // change row kind back
                 row.setRowKind(rowKind);
                 output.collect(record);
@@ -180,28 +204,20 @@ public class LocalMergeOperator extends 
AbstractStreamOperator<InternalRow>
 
     @Override
     public void close() throws Exception {
-        if (buffer != null) {
-            buffer.clear();
+        if (merger != null) {
+            merger.clear();
         }
 
         super.close();
     }
 
     private void flushBuffer() throws Exception {
-        if (buffer.size() == 0) {
+        if (merger.size() == 0) {
             return;
         }
 
-        buffer.forEach(
-                keyComparator,
-                mergeFunction,
-                null,
-                kv -> {
-                    InternalRow row = kv.value();
-                    row.setRowKind(kv.valueKind());
-                    output.collect(new StreamRecord<>(row));
-                });
-        buffer.clear();
+        merger.forEach(row -> output.collect(new StreamRecord<>(row)));
+        merger.clear();
 
         if (currentWatermark != Long.MIN_VALUE) {
             super.processWatermark(new Watermark(currentWatermark));
@@ -209,4 +225,14 @@ public class LocalMergeOperator extends 
AbstractStreamOperator<InternalRow>
             currentWatermark = Long.MIN_VALUE;
         }
     }
+
+    /** Returns the {@link LocalMerger} currently held by this operator; exposed for tests. */
+    @VisibleForTesting
+    LocalMerger merger() {
+        return merger;
+    }
+
+    /** Replaces this operator's {@code output} with the given one; exposed for tests. */
+    @VisibleForTesting
+    void setOutput(Output<StreamRecord<InternalRow>> output) {
+        this.output = output;
+    }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FirstRowITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FirstRowITCase.java
index e44aa4c63..dc7bf397e 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FirstRowITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FirstRowITCase.java
@@ -95,7 +95,7 @@ public class FirstRowITCase extends CatalogITCaseBase {
         sql(
                 "CREATE TABLE IF NOT EXISTS T1 ("
                         + "a INT, b INT, c STRING, PRIMARY KEY (a, b) NOT 
ENFORCED)"
-                        + " PARTITIONED BY (b) WITH 
('merge-engine'='first-row', 'local-merge-buffer-size' = '1m',"
+                        + " PARTITIONED BY (b) WITH 
('merge-engine'='first-row', 'local-merge-buffer-size' = '5m',"
                         + " 'file.format'='avro', 'changelog-producer' = 
'lookup');");
         batchSql("INSERT INTO T1 VALUES (1, 1, '1'), (1, 1, '2'), (2, 3, 
'3')");
         List<Row> result = batchSql("SELECT * FROM T1");
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java
index 0bedbfd02..76ee8309e 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java
@@ -351,7 +351,7 @@ public class PartialUpdateITCase extends CatalogITCaseBase {
                         + "d INT,"
                         + "PRIMARY KEY (k, d) NOT ENFORCED) PARTITIONED BY (d) 
"
                         + " WITH ('merge-engine'='partial-update', "
-                        + "'local-merge-buffer-size'='1m'"
+                        + "'local-merge-buffer-size'='5m'"
                         + ");");
 
         sql("INSERT INTO T1 VALUES (1, CAST(NULL AS INT), 1), (2, 1, 1), (1, 
2, 1)");
@@ -588,7 +588,7 @@ public class PartialUpdateITCase extends CatalogITCaseBase {
                         + " 'changelog-producer' = 'lookup'"
                         + ")");
         if (localMerge) {
-            sql("ALTER TABLE ignore_delete SET ('local-merge-buffer-size' = 
'256 kb')");
+            sql("ALTER TABLE ignore_delete SET ('local-merge-buffer-size' = 
'5m')");
         }
 
         sql("INSERT INTO ignore_delete VALUES (1, CAST (NULL AS STRING), 
'apple')");
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PreAggregationITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PreAggregationITCase.java
index e4c90695b..b8dfd8f6a 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PreAggregationITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PreAggregationITCase.java
@@ -1172,7 +1172,7 @@ public class PreAggregationITCase {
                             + "PRIMARY KEY (k, d) NOT ENFORCED) PARTITIONED BY 
(d) "
                             + " WITH ('merge-engine'='aggregation', "
                             + "'fields.v.aggregate-function'='sum',"
-                            + "'local-merge-buffer-size'='1m'"
+                            + "'local-merge-buffer-size'='5m'"
                             + ");");
         }
 
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
index 027eada92..3fa95edb8 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
@@ -72,7 +72,7 @@ public class PrimaryKeyFileStoreTableITCase extends 
AbstractTestBase {
         ThreadLocalRandom random = ThreadLocalRandom.current();
         tableDefaultProperties = new HashMap<>();
         if (random.nextBoolean()) {
-            
tableDefaultProperties.put(CoreOptions.LOCAL_MERGE_BUFFER_SIZE.key(), "256 kb");
+            
tableDefaultProperties.put(CoreOptions.LOCAL_MERGE_BUFFER_SIZE.key(), "5m");
         }
     }
 
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/LocalMergeOperatorTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/LocalMergeOperatorTest.java
new file mode 100644
index 000000000..1162e20b1
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/LocalMergeOperatorTest.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink;
+
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.mergetree.localmerge.HashMapLocalMerger;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowKind;
+import org.apache.paimon.types.RowType;
+
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
+import org.apache.flink.util.OutputTag;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.function.Consumer;
+
+import static org.apache.paimon.CoreOptions.LOCAL_MERGE_BUFFER_SIZE;
+import static org.apache.paimon.CoreOptions.SEQUENCE_FIELD;
+import static org.apache.paimon.data.BinaryString.fromString;
+import static org.apache.paimon.types.RowKind.DELETE;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link LocalMergeOperator} when it runs on a {@link HashMapLocalMerger}. */
+class LocalMergeOperatorTest {
+
+    private LocalMergeOperator operator;
+
+    /** Merges small hand-written batches and one large random batch, flushing after each. */
+    @Test
+    public void testHashNormal() throws Exception {
+        prepareHashOperator();
+        List<String> result = new ArrayList<>();
+        setOutput(result);
+
+        // first test
+        processElement("a", 1);
+        processElement("b", 1);
+        processElement("a", 2);
+        processElement(DELETE, "b", 2);
+        operator.prepareSnapshotPreBarrier(0);
+        assertThat(result).containsExactlyInAnyOrder("+I:a->2", "-D:b->2");
+        result.clear();
+
+        // second test
+        processElement("c", 1);
+        processElement("d", 1);
+        operator.prepareSnapshotPreBarrier(0);
+        assertThat(result).containsExactlyInAnyOrder("+I:c->1", "+I:d->1");
+        result.clear();
+
+        // large records
+        Map<String, String> expected = new HashMap<>();
+        // keep the keys random, but remember the seed so that a failing run can be replayed
+        long seed = new Random().nextLong();
+        Random rnd = new Random(seed);
+        int records = 10_000;
+        for (int i = 0; i < records; i++) {
+            String key = rnd.nextInt(records) + "";
+            // the last value written for a key is the one expected after merging
+            expected.put(key, "+I:" + key + "->" + i);
+            processElement(key, i);
+        }
+
+        operator.prepareSnapshotPreBarrier(0);
+        assertThat(result)
+                .as("random seed: %s", seed)
+                .containsExactlyInAnyOrderElementsOf(expected.values());
+        result.clear();
+    }
+
+    /** With {@code f1} as sequence field, the record with the larger {@code f1} must win. */
+    @Test
+    public void testUserDefineSequence() throws Exception {
+        Map<String, String> options = new HashMap<>();
+        options.put(SEQUENCE_FIELD.key(), "f1");
+        prepareHashOperator(options);
+
+        List<String> result = new ArrayList<>();
+        setOutput(result);
+
+        processElement("a", 2);
+        processElement("b", 1);
+        processElement("a", 1);
+        operator.prepareSnapshotPreBarrier(0);
+        assertThat(result).containsExactlyInAnyOrder("+I:a->2", "+I:b->1");
+        result.clear();
+    }
+
+    /**
+     * Feeds 30,000 distinct keys through a 2 MB buffer and expects every record to be emitted
+     * exactly once.
+     */
+    @Test
+    public void testHashSpill() throws Exception {
+        Map<String, String> options = new HashMap<>();
+        // NOTE(review): presumably small enough to force flushes before the barrier - confirm
+        options.put(LOCAL_MERGE_BUFFER_SIZE.key(), "2 m");
+        prepareHashOperator(options);
+        List<String> result = new ArrayList<>();
+        setOutput(result);
+
+        Map<String, String> expected = new HashMap<>();
+        for (int i = 0; i < 30_000; i++) {
+            String key = i + "";
+            expected.put(key, "+I:" + key + "->" + i);
+            processElement(key, i);
+        }
+
+        operator.prepareSnapshotPreBarrier(0);
+        assertThat(result).containsExactlyInAnyOrderElementsOf(expected.values());
+        result.clear();
+    }
+
+    /** Opens the operator with default options. */
+    private void prepareHashOperator() throws Exception {
+        prepareHashOperator(new HashMap<>());
+    }
+
+    /**
+     * Creates and opens the operator for a five-column table (one STRING, four INT) and checks
+     * that it picked the {@link HashMapLocalMerger}.
+     *
+     * @param options table options; a 10 MB local merge buffer is added when none is configured
+     */
+    private void prepareHashOperator(Map<String, String> options) throws Exception {
+        if (!options.containsKey(LOCAL_MERGE_BUFFER_SIZE.key())) {
+            options.put(LOCAL_MERGE_BUFFER_SIZE.key(), "10 m");
+        }
+        RowType rowType =
+                RowType.of(
+                        DataTypes.STRING(),
+                        DataTypes.INT(),
+                        DataTypes.INT(),
+                        DataTypes.INT(),
+                        DataTypes.INT());
+        TableSchema schema =
+                new TableSchema(
+                        0L,
+                        rowType.getFields(),
+                        rowType.getFieldCount(),
+                        Collections.emptyList(),
+                        Collections.singletonList("f0"),
+                        options,
+                        null);
+        operator = new LocalMergeOperator(schema);
+        operator.open();
+        assertThat(operator.merger()).isInstanceOf(HashMapLocalMerger.class);
+    }
+
+    /** Redirects operator output into {@code result}, one {@code kind:key->value} per row. */
+    private void setOutput(List<String> result) {
+        operator.setOutput(
+                new TestOutput(
+                        row ->
+                                result.add(
+                                        row.getRowKind().shortString()
+                                                + ":"
+                                                + row.getString(0)
+                                                + "->"
+                                                + row.getInt(1))));
+    }
+
+    /** Sends an INSERT record for {@code key} whose four INT columns all hold {@code value}. */
+    private void processElement(String key, int value) throws Exception {
+        processElement(RowKind.INSERT, key, value);
+    }
+
+    /** Sends a record of the given kind whose four INT columns all hold {@code value}. */
+    private void processElement(RowKind rowKind, String key, int value) throws Exception {
+        operator.processElement(
+                new StreamRecord<>(
+                        GenericRow.ofKind(rowKind, fromString(key), value, value, value, value)));
+    }
+
+    /** {@link Output} that hands every collected row to a consumer and ignores everything else. */
+    private static class TestOutput implements Output<StreamRecord<InternalRow>> {
+
+        private final Consumer<InternalRow> consumer;
+
+        private TestOutput(Consumer<InternalRow> consumer) {
+            this.consumer = consumer;
+        }
+
+        @Override
+        public void emitWatermark(Watermark mark) {}
+
+        @Override
+        public void emitWatermarkStatus(WatermarkStatus watermarkStatus) {}
+
+        @Override
+        public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {}
+
+        @Override
+        public void emitLatencyMarker(LatencyMarker latencyMarker) {}
+
+        @Override
+        public void emitRecordAttributes(RecordAttributes recordAttributes) {}
+
+        @Override
+        public void collect(StreamRecord<InternalRow> record) {
+            consumer.accept(record.getValue());
+        }
+
+        @Override
+        public void close() {}
+    }
+}


Reply via email to