This is an automated email from the ASF dual-hosted git repository.

czweng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 960dce1a18 [core] Introduce DataPagedOutputSerializer to limit memory in ManifestEntryCache (#6355)
960dce1a18 is described below

commit 960dce1a18cccb3beac1d5ae0c8a1f59414498ae
Author: Jingsong Lee <[email protected]>
AuthorDate: Sat Oct 11 16:18:31 2025 +0800

    [core] Introduce DataPagedOutputSerializer to limit memory in ManifestEntryCache (#6355)
---
 .../data/serializer/BinaryRowSerializer.java       |  17 ++-
 .../org/apache/paimon/io/DataOutputSerializer.java |  11 --
 .../paimon/io/DataPagedOutputSerializer.java       | 129 +++++++++++++++++++++
 .../paimon/io/DataPagedOutputSerializerTest.java   | 112 ++++++++++++++++++
 .../apache/paimon/manifest/ManifestEntryCache.java |  24 ++--
 5 files changed, 259 insertions(+), 34 deletions(-)

diff --git 
a/paimon-common/src/main/java/org/apache/paimon/data/serializer/BinaryRowSerializer.java
 
b/paimon-common/src/main/java/org/apache/paimon/data/serializer/BinaryRowSerializer.java
index 8773e734d8..49dcee73ef 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/data/serializer/BinaryRowSerializer.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/data/serializer/BinaryRowSerializer.java
@@ -113,15 +113,14 @@ public class BinaryRowSerializer extends 
AbstractRowDataSerializer<BinaryRow> {
     }
 
     @Override
-    public int serializeToPages(BinaryRow record, AbstractPagedOutputView 
headerLessView)
-            throws IOException {
-        int skip = checkSkipWriteForFixLengthPart(headerLessView);
-        headerLessView.writeInt(record.getSizeInBytes());
-        serializeWithoutLength(record, headerLessView);
+    public int serializeToPages(BinaryRow record, AbstractPagedOutputView out) 
throws IOException {
+        int skip = checkSkipWriteForFixLengthPart(out);
+        out.writeInt(record.getSizeInBytes());
+        serializeWithoutLength(record, out);
         return skip;
     }
 
-    private static void serializeWithoutLength(BinaryRow record, 
MemorySegmentWritable writable)
+    public static void serializeWithoutLength(BinaryRow record, 
MemorySegmentWritable writable)
             throws IOException {
         if (record.getSegments().length == 1) {
             writable.write(record.getSegments()[0], record.getOffset(), 
record.getSizeInBytes());
@@ -281,11 +280,11 @@ public class BinaryRowSerializer extends 
AbstractRowDataSerializer<BinaryRow> {
 
     /** Return fixed part length to serialize one row. */
     public int getSerializedRowFixedPartLength() {
-        return getFixedLengthPartSize() + LENGTH_SIZE_IN_BYTES;
+        return fixedLengthPartSize + LENGTH_SIZE_IN_BYTES;
     }
 
-    public int getFixedLengthPartSize() {
-        return fixedLengthPartSize;
+    public static int getSerializedRowLength(BinaryRow row) {
+        return row.getSizeInBytes() + LENGTH_SIZE_IN_BYTES;
     }
 
     @Override
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/io/DataOutputSerializer.java 
b/paimon-common/src/main/java/org/apache/paimon/io/DataOutputSerializer.java
index a0923d94da..33f2a8f0bf 100644
--- a/paimon-common/src/main/java/org/apache/paimon/io/DataOutputSerializer.java
+++ b/paimon-common/src/main/java/org/apache/paimon/io/DataOutputSerializer.java
@@ -26,7 +26,6 @@ import org.apache.paimon.utils.Preconditions;
 import java.io.EOFException;
 import java.io.IOException;
 import java.io.UTFDataFormatException;
-import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 import java.util.Arrays;
 
@@ -37,8 +36,6 @@ public class DataOutputSerializer implements DataOutputView, 
MemorySegmentWritab
 
     private int position;
 
-    private ByteBuffer wrapper;
-
     // ------------------------------------------------------------------------
 
     public DataOutputSerializer(int startSize) {
@@ -47,13 +44,6 @@ public class DataOutputSerializer implements DataOutputView, 
MemorySegmentWritab
         }
 
         this.buffer = new byte[startSize];
-        this.wrapper = ByteBuffer.wrap(buffer);
-    }
-
-    public ByteBuffer wrapAsByteBuffer() {
-        this.wrapper.position(0);
-        this.wrapper.limit(this.position);
-        return this.wrapper;
     }
 
     /** @deprecated Replaced by {@link #getSharedBuffer()} for a better, safer 
name. */
@@ -329,7 +319,6 @@ public class DataOutputSerializer implements 
DataOutputView, MemorySegmentWritab
 
         System.arraycopy(this.buffer, 0, nb, 0, this.position);
         this.buffer = nb;
-        this.wrapper = ByteBuffer.wrap(this.buffer);
     }
 
     @Override
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/io/DataPagedOutputSerializer.java
 
b/paimon-common/src/main/java/org/apache/paimon/io/DataPagedOutputSerializer.java
new file mode 100644
index 0000000000..c7ce3c94ec
--- /dev/null
+++ 
b/paimon-common/src/main/java/org/apache/paimon/io/DataPagedOutputSerializer.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.io;
+
+import org.apache.paimon.annotation.VisibleForTesting;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.SimpleCollectingOutputView;
+import org.apache.paimon.data.serializer.InternalRowSerializer;
+import org.apache.paimon.memory.MemorySegment;
+import org.apache.paimon.utils.MathUtils;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import static 
org.apache.paimon.data.serializer.BinaryRowSerializer.getSerializedRowLength;
+import static 
org.apache.paimon.data.serializer.BinaryRowSerializer.serializeWithoutLength;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
+/**
+ * Serializes rows into memory pages, starting with a single growable buffer and switching to
+ * paged output only when the data no longer fits into one page.
+ *
+ * <p>This serializer uses a two-phase approach:
+ *
+ * <ol>
+ *   <li><strong>Initial phase:</strong> Each row is appended to a {@link DataOutputSerializer}
+ *       (created with {@code startSize}) as an int length prefix followed by the row bytes.
+ *   <li><strong>Paged phase:</strong> As soon as appending the next row would push the buffered
+ *       size past {@code pageSize}, the buffered bytes are copied into a {@link
+ *       SimpleCollectingOutputView} backed by heap segments of {@code pageSize} bytes; that row
+ *       and all later rows are written through {@link InternalRowSerializer#serializeToPages}.
+ * </ol>
+ *
+ * <p>{@link #close()} returns the collected view. If the serializer is still in the initial phase
+ * at that point, the buffer is copied into a view whose page size is the buffered length rounded
+ * up to a power of two instead of the configured {@code pageSize} (presumably so that small
+ * datasets do not occupy a full page; confirm against the caller's memory accounting).
+ */
+public class DataPagedOutputSerializer {
+
+    private final InternalRowSerializer serializer;
+    private final int pageSize;
+
+    private DataOutputSerializer initialOut; // set to null once paged mode starts
+    private SimpleCollectingOutputView pagedOut; // null until paged mode starts
+
+    /**
+     * Constructs a new DataPagedOutputSerializer that starts in the initial (single buffer) phase.
+     *
+     * @param serializer the internal row serializer used for converting rows to binary format
+     * @param startSize the start size passed to the initial {@link DataOutputSerializer}
+     * @param pageSize the buffered size in bytes beyond which the serializer switches to paged
+     *     mode; also the size of each heap segment allocated in paged mode
+     */
+    public DataPagedOutputSerializer(
+            InternalRowSerializer serializer, int startSize, int pageSize) {
+        this.serializer = serializer;
+        this.pageSize = pageSize;
+        this.initialOut = new DataOutputSerializer(startSize);
+    }
+
+    @VisibleForTesting
+    SimpleCollectingOutputView pagedOut() { // null while still in the initial phase
+        return pagedOut;
+    }
+
+    /**
+     * Serializes a row to the output.
+     *
+     * <p>Depending on the current state and available space, this method will either:
+     *
+     * <ul>
+     *   <li>Append the row (int length prefix followed by the row bytes) to the initial buffer if
+     *       the buffered size plus this row's serialized size does not exceed the page size
+     *   <li>Otherwise copy the initial buffer into a paged view, drop the initial buffer, and
+     *       write this row and all later rows via {@link InternalRowSerializer#serializeToPages}
+     * </ul>
+     *
+     * @param row the row to serialize; in the initial phase it is first converted with {@link
+     *     InternalRowSerializer#toBinaryRow}
+     * @throws IOException if an I/O error occurs during serialization
+     */
+    public void write(InternalRow row) throws IOException {
+        if (pagedOut != null) {
+            serializer.serializeToPages(row, pagedOut);
+        } else {
+            BinaryRow binaryRow = serializer.toBinaryRow(row);
+            int serializedSize = getSerializedRowLength(binaryRow);
+            if (initialOut.length() + serializedSize > pageSize) { // row would overflow the page
+                pagedOut = toPagedOutput(initialOut, pageSize);
+                initialOut = null; // buffered bytes now live in pagedOut
+                serializer.serializeToPages(row, pagedOut);
+            } else {
+                initialOut.writeInt(binaryRow.getSizeInBytes()); // length prefix, then row bytes
+                serializeWithoutLength(binaryRow, initialOut);
+            }
+        }
+    }
+
+    private static SimpleCollectingOutputView toPagedOutput(
+            DataOutputSerializer output, int pageSize) throws IOException {
+        checkArgument(output.length() <= pageSize); // buffered bytes must not exceed one page
+        SimpleCollectingOutputView pagedOut =
+                new SimpleCollectingOutputView(
+                        new ArrayList<>(),
+                        () -> MemorySegment.allocateHeapMemory(pageSize),
+                        pageSize);
+        pagedOut.write(output.getSharedBuffer(), 0, output.length()); // copy only written bytes
+        return pagedOut;
+    }
+
+    public SimpleCollectingOutputView close() throws IOException {
+        if (pagedOut != null) { // already in paged mode: return the collecting view as-is
+            return pagedOut;
+        }
+
+        int pageSize = MathUtils.roundUpToPowerOf2(initialOut.length()); // local; shadows the field
+        return toPagedOutput(initialOut, pageSize);
+    }
+}
diff --git 
a/paimon-common/src/test/java/org/apache/paimon/io/DataPagedOutputSerializerTest.java
 
b/paimon-common/src/test/java/org/apache/paimon/io/DataPagedOutputSerializerTest.java
new file mode 100644
index 0000000000..1156b8533a
--- /dev/null
+++ 
b/paimon-common/src/test/java/org/apache/paimon/io/DataPagedOutputSerializerTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.io;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.SimpleCollectingOutputView;
+import org.apache.paimon.data.serializer.InternalRowSerializer;
+import org.apache.paimon.memory.MemorySegment;
+import org.apache.paimon.types.DataTypes;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Unit tests for {@link DataPagedOutputSerializer}.
+ *
+ * <p>Each test writes the same rows through the serializer under test and directly into a
+ * reference {@link SimpleCollectingOutputView} via {@link InternalRowSerializer#serializeToPages},
+ * then compares the two views segment by segment.
+ */
+class DataPagedOutputSerializerTest {
+
+    @Test
+    void testSmallDatasetStaysInInitialPhase() throws IOException {
+        InternalRowSerializer serializer = createSimpleSerializer();
+        DataPagedOutputSerializer output = new 
DataPagedOutputSerializer(serializer, 256, 8 * 1024);
+        SimpleCollectingOutputView pagedView =
+                new SimpleCollectingOutputView(
+                        new ArrayList<>(),
+                        () -> MemorySegment.allocateHeapMemory(4 * 1024),
+                        4 * 1024);
+
+        BinaryRow row = createSampleRow();
+
+        // Write 50 small rows; they are expected to stay below the 8 KiB page size (asserted below)
+        for (int i = 0; i < 50; i++) {
+            output.write(row);
+            serializer.serializeToPages(row, pagedView);
+        }
+
+        // Should still be using initial buffer since we haven't exceeded page 
size
+        assertThat(output.pagedOut()).isNull();
+
+        // close() must match the reference view. NOTE(review): the segment-size check only passes if close() rounds the buffered length up to the reference's 4 KiB - confirm
+        SimpleCollectingOutputView result = output.close();
+        assertEqual(result, pagedView);
+    }
+
+    @Test
+    void testLargeDatasetSwitchesToPagedMode() throws IOException {
+        InternalRowSerializer serializer = createSimpleSerializer();
+        DataPagedOutputSerializer output = new 
DataPagedOutputSerializer(serializer, 256, 4 * 1024);
+        SimpleCollectingOutputView pagedView =
+                new SimpleCollectingOutputView(
+                        new ArrayList<>(),
+                        () -> MemorySegment.allocateHeapMemory(4 * 1024),
+                        4 * 1024);
+
+        BinaryRow row = createSampleRow();
+
+        // Write 500 rows, expected to exceed the 4 KiB page size and trigger paged mode
+        for (int i = 0; i < 500; i++) {
+            output.write(row);
+            serializer.serializeToPages(row, pagedView);
+        }
+
+        assertThat(output.pagedOut()).isNotNull();
+
+        // the paged result must be identical to the reference view (same 4 KiB page size)
+        SimpleCollectingOutputView result = output.close();
+        assertEqual(result, pagedView);
+    }
+
+    private void assertEqual(SimpleCollectingOutputView view1, 
SimpleCollectingOutputView view2) {
+        
assertThat(view1.getCurrentOffset()).isEqualTo(view2.getCurrentOffset());
+        
assertThat(view1.fullSegments().size()).isEqualTo(view2.fullSegments().size());
+        for (int i = 0; i < view1.fullSegments().size(); i++) {
+            MemorySegment segment1 = view1.fullSegments().get(i);
+            MemorySegment segment2 = view2.fullSegments().get(i);
+            assertThat(segment1.size()).isEqualTo(segment2.size());
+            
assertThat(segment1.getHeapMemory()).isEqualTo(segment2.getHeapMemory());
+        }
+    }
+
+    private InternalRowSerializer createSimpleSerializer() { // row type: (INT, STRING)
+        return new InternalRowSerializer(DataTypes.INT(), DataTypes.STRING());
+    }
+
+    private BinaryRow createSampleRow() { // NOTE(review): payload embeds the current time, so row content differs between runs
+        GenericRow genericRow =
+                GenericRow.of(
+                        42, BinaryString.fromString("sample-data-" + 
System.currentTimeMillis()));
+        InternalRowSerializer tempSerializer = createSimpleSerializer();
+        return tempSerializer.toBinaryRow(genericRow);
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntryCache.java 
b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntryCache.java
index 6e43ba01a3..59b2a34650 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntryCache.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntryCache.java
@@ -24,9 +24,8 @@ import org.apache.paimon.data.Segments;
 import org.apache.paimon.data.SimpleCollectingOutputView;
 import org.apache.paimon.data.serializer.InternalRowSerializer;
 import org.apache.paimon.fs.Path;
+import org.apache.paimon.io.DataPagedOutputSerializer;
 import org.apache.paimon.manifest.ManifestEntrySegments.RichSegments;
-import org.apache.paimon.memory.MemorySegment;
-import org.apache.paimon.memory.MemorySegmentSource;
 import org.apache.paimon.partition.PartitionPredicate;
 import 
org.apache.paimon.partition.PartitionPredicate.MultiplePartitionPredicate;
 import org.apache.paimon.types.RowType;
@@ -77,18 +76,15 @@ public class ManifestEntryCache extends ObjectsCache<Path, 
ManifestEntry, Manife
 
     @Override
     protected ManifestEntrySegments createSegments(Path path, @Nullable Long 
fileSize) {
-        Map<Triple<BinaryRow, Integer, Integer>, SimpleCollectingOutputView> 
segments =
+        Map<Triple<BinaryRow, Integer, Integer>, DataPagedOutputSerializer> 
segments =
                 new HashMap<>();
         Function<InternalRow, BinaryRow> partitionGetter = partitionGetter();
         Function<InternalRow, Integer> bucketGetter = bucketGetter();
         Function<InternalRow, Integer> totalBucketGetter = totalBucketGetter();
-        MemorySegmentSource segmentSource =
-                () -> MemorySegment.allocateHeapMemory(cache.pageSize());
-        Supplier<SimpleCollectingOutputView> outViewSupplier =
-                () ->
-                        new SimpleCollectingOutputView(
-                                new ArrayList<>(), segmentSource, 
cache.pageSize());
+        int pageSize = cache.pageSize();
         InternalRowSerializer formatSerializer = this.formatSerializer.get();
+        Supplier<DataPagedOutputSerializer> outputSupplier =
+                () -> new DataPagedOutputSerializer(formatSerializer, 2048, 
pageSize);
         try (CloseableIterator<InternalRow> iterator = reader.apply(path, 
fileSize)) {
             while (iterator.hasNext()) {
                 InternalRow row = iterator.next();
@@ -96,15 +92,15 @@ public class ManifestEntryCache extends ObjectsCache<Path, 
ManifestEntry, Manife
                 int bucket = bucketGetter.apply(row);
                 int totalBucket = totalBucketGetter.apply(row);
                 Triple<BinaryRow, Integer, Integer> key = Triple.of(partition, 
bucket, totalBucket);
-                SimpleCollectingOutputView view =
-                        segments.computeIfAbsent(key, k -> 
outViewSupplier.get());
-                formatSerializer.serializeToPages(row, view);
+                DataPagedOutputSerializer output =
+                        segments.computeIfAbsent(key, k -> 
outputSupplier.get());
+                output.write(row);
             }
             List<RichSegments> result = new ArrayList<>();
-            for (Map.Entry<Triple<BinaryRow, Integer, Integer>, 
SimpleCollectingOutputView> entry :
+            for (Map.Entry<Triple<BinaryRow, Integer, Integer>, 
DataPagedOutputSerializer> entry :
                     segments.entrySet()) {
                 Triple<BinaryRow, Integer, Integer> key = entry.getKey();
-                SimpleCollectingOutputView view = entry.getValue();
+                SimpleCollectingOutputView view = entry.getValue().close();
                 Segments seg =
                         Segments.create(view.fullSegments(), 
view.getCurrentPositionInSegment());
                 result.add(new RichSegments(key.f0, key.f1, key.f2, seg));

Reply via email to