This is an automated email from the ASF dual-hosted git repository.
czweng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 960dce1a18 [core] Introduce DataPagedOutputSerializer to limit memory
in ManifestEntryCache (#6355)
960dce1a18 is described below
commit 960dce1a18cccb3beac1d5ae0c8a1f59414498ae
Author: Jingsong Lee <[email protected]>
AuthorDate: Sat Oct 11 16:18:31 2025 +0800
[core] Introduce DataPagedOutputSerializer to limit memory in
ManifestEntryCache (#6355)
---
.../data/serializer/BinaryRowSerializer.java | 17 ++-
.../org/apache/paimon/io/DataOutputSerializer.java | 11 --
.../paimon/io/DataPagedOutputSerializer.java | 129 +++++++++++++++++++++
.../paimon/io/DataPagedOutputSerializerTest.java | 112 ++++++++++++++++++
.../apache/paimon/manifest/ManifestEntryCache.java | 24 ++--
5 files changed, 259 insertions(+), 34 deletions(-)
diff --git
a/paimon-common/src/main/java/org/apache/paimon/data/serializer/BinaryRowSerializer.java
b/paimon-common/src/main/java/org/apache/paimon/data/serializer/BinaryRowSerializer.java
index 8773e734d8..49dcee73ef 100644
---
a/paimon-common/src/main/java/org/apache/paimon/data/serializer/BinaryRowSerializer.java
+++
b/paimon-common/src/main/java/org/apache/paimon/data/serializer/BinaryRowSerializer.java
@@ -113,15 +113,14 @@ public class BinaryRowSerializer extends
AbstractRowDataSerializer<BinaryRow> {
}
@Override
- public int serializeToPages(BinaryRow record, AbstractPagedOutputView
headerLessView)
- throws IOException {
- int skip = checkSkipWriteForFixLengthPart(headerLessView);
- headerLessView.writeInt(record.getSizeInBytes());
- serializeWithoutLength(record, headerLessView);
+ public int serializeToPages(BinaryRow record, AbstractPagedOutputView out)
throws IOException {
+ int skip = checkSkipWriteForFixLengthPart(out);
+ out.writeInt(record.getSizeInBytes());
+ serializeWithoutLength(record, out);
return skip;
}
- private static void serializeWithoutLength(BinaryRow record,
MemorySegmentWritable writable)
+ public static void serializeWithoutLength(BinaryRow record,
MemorySegmentWritable writable)
throws IOException {
if (record.getSegments().length == 1) {
writable.write(record.getSegments()[0], record.getOffset(),
record.getSizeInBytes());
@@ -281,11 +280,11 @@ public class BinaryRowSerializer extends
AbstractRowDataSerializer<BinaryRow> {
/** Return fixed part length to serialize one row. */
public int getSerializedRowFixedPartLength() {
- return getFixedLengthPartSize() + LENGTH_SIZE_IN_BYTES;
+ return fixedLengthPartSize + LENGTH_SIZE_IN_BYTES;
}
- public int getFixedLengthPartSize() {
- return fixedLengthPartSize;
+ public static int getSerializedRowLength(BinaryRow row) {
+ return row.getSizeInBytes() + LENGTH_SIZE_IN_BYTES;
}
@Override
diff --git
a/paimon-common/src/main/java/org/apache/paimon/io/DataOutputSerializer.java
b/paimon-common/src/main/java/org/apache/paimon/io/DataOutputSerializer.java
index a0923d94da..33f2a8f0bf 100644
--- a/paimon-common/src/main/java/org/apache/paimon/io/DataOutputSerializer.java
+++ b/paimon-common/src/main/java/org/apache/paimon/io/DataOutputSerializer.java
@@ -26,7 +26,6 @@ import org.apache.paimon.utils.Preconditions;
import java.io.EOFException;
import java.io.IOException;
import java.io.UTFDataFormatException;
-import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Arrays;
@@ -37,8 +36,6 @@ public class DataOutputSerializer implements DataOutputView,
MemorySegmentWritab
private int position;
- private ByteBuffer wrapper;
-
// ------------------------------------------------------------------------
public DataOutputSerializer(int startSize) {
@@ -47,13 +44,6 @@ public class DataOutputSerializer implements DataOutputView,
MemorySegmentWritab
}
this.buffer = new byte[startSize];
- this.wrapper = ByteBuffer.wrap(buffer);
- }
-
- public ByteBuffer wrapAsByteBuffer() {
- this.wrapper.position(0);
- this.wrapper.limit(this.position);
- return this.wrapper;
}
/** @deprecated Replaced by {@link #getSharedBuffer()} for a better, safer
name. */
@@ -329,7 +319,6 @@ public class DataOutputSerializer implements
DataOutputView, MemorySegmentWritab
System.arraycopy(this.buffer, 0, nb, 0, this.position);
this.buffer = nb;
- this.wrapper = ByteBuffer.wrap(this.buffer);
}
@Override
diff --git
a/paimon-common/src/main/java/org/apache/paimon/io/DataPagedOutputSerializer.java
b/paimon-common/src/main/java/org/apache/paimon/io/DataPagedOutputSerializer.java
new file mode 100644
index 0000000000..c7ce3c94ec
--- /dev/null
+++
b/paimon-common/src/main/java/org/apache/paimon/io/DataPagedOutputSerializer.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.io;
+
+import org.apache.paimon.annotation.VisibleForTesting;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.SimpleCollectingOutputView;
+import org.apache.paimon.data.serializer.InternalRowSerializer;
+import org.apache.paimon.memory.MemorySegment;
+import org.apache.paimon.utils.MathUtils;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import static
org.apache.paimon.data.serializer.BinaryRowSerializer.getSerializedRowLength;
+import static
org.apache.paimon.data.serializer.BinaryRowSerializer.serializeWithoutLength;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
+/**
+ * A paged output serializer that efficiently handles serialization of rows
using memory pages.
+ *
+ * <p>This serializer uses a two-phase approach:
+ *
+ * <ol>
+ * <li><strong>Initial phase:</strong> Writes data to an initial buffer
until it reaches the
+ * specified page size.
+ * <li><strong>Paged phase:</strong> Once the initial buffer exceeds the
page size, switches to
+ * using {@link SimpleCollectingOutputView} with allocated memory
segments for efficient
+ * memory management.
+ * </ol>
+ *
+ * <p>The design ensures optimal performance for both small datasets (using
single buffer) and large
+ * datasets (using paged memory allocation).
+ */
+public class DataPagedOutputSerializer {
+
+ private final InternalRowSerializer serializer;
+ private final int pageSize;
+
+ private DataOutputSerializer initialOut;
+ private SimpleCollectingOutputView pagedOut;
+
+ /**
+ * Constructs a new DataPagedOutputSerializer with the specified
parameters.
+ *
+ * @param serializer the internal row serializer used for converting rows
to binary format
+ * @param startSize the initial buffer size for storing serialized data
+ * @param pageSize the size of each memory page; the serializer switches to paged mode once the
initial buffer would grow beyond this size
+ */
+ public DataPagedOutputSerializer(
+ InternalRowSerializer serializer, int startSize, int pageSize) {
+ this.serializer = serializer;
+ this.pageSize = pageSize;
+ this.initialOut = new DataOutputSerializer(startSize);
+ }
+
+ @VisibleForTesting
+ SimpleCollectingOutputView pagedOut() {
+ return pagedOut;
+ }
+
+ /**
+ * Serializes a row to the output.
+ *
+ * <p>Depending on the current state and available space, this method will
either:
+ *
+ * <ul>
+ * <li>Write directly to the initial buffer if there's sufficient space
remaining
+ * <li>Switch to paged mode and write to memory segments once the
initial buffer fills up
+ * </ul>
+ *
+ * @param row the row to serialize
+ * @throws IOException if an I/O error occurs during serialization
+ */
+ public void write(InternalRow row) throws IOException {
+ if (pagedOut != null) {
+ serializer.serializeToPages(row, pagedOut);
+ } else {
+ BinaryRow binaryRow = serializer.toBinaryRow(row);
+ int serializedSize = getSerializedRowLength(binaryRow);
+ if (initialOut.length() + serializedSize > pageSize) {
+ pagedOut = toPagedOutput(initialOut, pageSize);
+ initialOut = null;
+ serializer.serializeToPages(row, pagedOut);
+ } else {
+ initialOut.writeInt(binaryRow.getSizeInBytes());
+ serializeWithoutLength(binaryRow, initialOut);
+ }
+ }
+ }
+
+ private static SimpleCollectingOutputView toPagedOutput(
+ DataOutputSerializer output, int pageSize) throws IOException {
+ checkArgument(output.length() <= pageSize);
+ SimpleCollectingOutputView pagedOut =
+ new SimpleCollectingOutputView(
+ new ArrayList<>(),
+ () -> MemorySegment.allocateHeapMemory(pageSize),
+ pageSize);
+ pagedOut.write(output.getSharedBuffer(), 0, output.length());
+ return pagedOut;
+ }
+
+ public SimpleCollectingOutputView close() throws IOException {
+ if (pagedOut != null) {
+ return pagedOut;
+ }
+
+ int pageSize = MathUtils.roundUpToPowerOf2(initialOut.length());
+ return toPagedOutput(initialOut, pageSize);
+ }
+}
diff --git
a/paimon-common/src/test/java/org/apache/paimon/io/DataPagedOutputSerializerTest.java
b/paimon-common/src/test/java/org/apache/paimon/io/DataPagedOutputSerializerTest.java
new file mode 100644
index 0000000000..1156b8533a
--- /dev/null
+++
b/paimon-common/src/test/java/org/apache/paimon/io/DataPagedOutputSerializerTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.io;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.SimpleCollectingOutputView;
+import org.apache.paimon.data.serializer.InternalRowSerializer;
+import org.apache.paimon.memory.MemorySegment;
+import org.apache.paimon.types.DataTypes;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Unit tests for {@link DataPagedOutputSerializer}. */
+class DataPagedOutputSerializerTest {
+
+ @Test
+ void testSmallDatasetStaysInInitialPhase() throws IOException {
+ InternalRowSerializer serializer = createSimpleSerializer();
+ DataPagedOutputSerializer output = new
DataPagedOutputSerializer(serializer, 256, 8 * 1024);
+ SimpleCollectingOutputView pagedView =
+ new SimpleCollectingOutputView(
+ new ArrayList<>(),
+ () -> MemorySegment.allocateHeapMemory(4 * 1024),
+ 4 * 1024);
+
+ BinaryRow row = createSampleRow();
+
+ // Write a few small rows that fit within the initial buffer
+ for (int i = 0; i < 50; i++) {
+ output.write(row);
+ serializer.serializeToPages(row, pagedView);
+ }
+
+ // Should still be using initial buffer since we haven't exceeded page
size
+ assertThat(output.pagedOut()).isNull();
+
+ // assert result
+ SimpleCollectingOutputView result = output.close();
+ assertEqual(result, pagedView);
+ }
+
+ @Test
+ void testLargeDatasetSwitchesToPagedMode() throws IOException {
+ InternalRowSerializer serializer = createSimpleSerializer();
+ DataPagedOutputSerializer output = new
DataPagedOutputSerializer(serializer, 256, 4 * 1024);
+ SimpleCollectingOutputView pagedView =
+ new SimpleCollectingOutputView(
+ new ArrayList<>(),
+ () -> MemorySegment.allocateHeapMemory(4 * 1024),
+ 4 * 1024);
+
+ BinaryRow row = createSampleRow();
+
+ // Write many rows
+ for (int i = 0; i < 500; i++) {
+ output.write(row);
+ serializer.serializeToPages(row, pagedView);
+ }
+
+ assertThat(output.pagedOut()).isNotNull();
+
+ // assert result
+ SimpleCollectingOutputView result = output.close();
+ assertEqual(result, pagedView);
+ }
+
+ private void assertEqual(SimpleCollectingOutputView view1,
SimpleCollectingOutputView view2) {
+
assertThat(view1.getCurrentOffset()).isEqualTo(view2.getCurrentOffset());
+
assertThat(view1.fullSegments().size()).isEqualTo(view2.fullSegments().size());
+ for (int i = 0; i < view1.fullSegments().size(); i++) {
+ MemorySegment segment1 = view1.fullSegments().get(i);
+ MemorySegment segment2 = view2.fullSegments().get(i);
+ assertThat(segment1.size()).isEqualTo(segment2.size());
+
assertThat(segment1.getHeapMemory()).isEqualTo(segment2.getHeapMemory());
+ }
+ }
+
+ private InternalRowSerializer createSimpleSerializer() {
+ return new InternalRowSerializer(DataTypes.INT(), DataTypes.STRING());
+ }
+
+ private BinaryRow createSampleRow() {
+ GenericRow genericRow =
+ GenericRow.of(
+ 42, BinaryString.fromString("sample-data-" +
System.currentTimeMillis()));
+ InternalRowSerializer tempSerializer = createSimpleSerializer();
+ return tempSerializer.toBinaryRow(genericRow);
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntryCache.java
b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntryCache.java
index 6e43ba01a3..59b2a34650 100644
---
a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntryCache.java
+++
b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntryCache.java
@@ -24,9 +24,8 @@ import org.apache.paimon.data.Segments;
import org.apache.paimon.data.SimpleCollectingOutputView;
import org.apache.paimon.data.serializer.InternalRowSerializer;
import org.apache.paimon.fs.Path;
+import org.apache.paimon.io.DataPagedOutputSerializer;
import org.apache.paimon.manifest.ManifestEntrySegments.RichSegments;
-import org.apache.paimon.memory.MemorySegment;
-import org.apache.paimon.memory.MemorySegmentSource;
import org.apache.paimon.partition.PartitionPredicate;
import
org.apache.paimon.partition.PartitionPredicate.MultiplePartitionPredicate;
import org.apache.paimon.types.RowType;
@@ -77,18 +76,15 @@ public class ManifestEntryCache extends ObjectsCache<Path,
ManifestEntry, Manife
@Override
protected ManifestEntrySegments createSegments(Path path, @Nullable Long
fileSize) {
- Map<Triple<BinaryRow, Integer, Integer>, SimpleCollectingOutputView>
segments =
+ Map<Triple<BinaryRow, Integer, Integer>, DataPagedOutputSerializer>
segments =
new HashMap<>();
Function<InternalRow, BinaryRow> partitionGetter = partitionGetter();
Function<InternalRow, Integer> bucketGetter = bucketGetter();
Function<InternalRow, Integer> totalBucketGetter = totalBucketGetter();
- MemorySegmentSource segmentSource =
- () -> MemorySegment.allocateHeapMemory(cache.pageSize());
- Supplier<SimpleCollectingOutputView> outViewSupplier =
- () ->
- new SimpleCollectingOutputView(
- new ArrayList<>(), segmentSource,
cache.pageSize());
+ int pageSize = cache.pageSize();
InternalRowSerializer formatSerializer = this.formatSerializer.get();
+ Supplier<DataPagedOutputSerializer> outputSupplier =
+ () -> new DataPagedOutputSerializer(formatSerializer, 2048,
pageSize);
try (CloseableIterator<InternalRow> iterator = reader.apply(path,
fileSize)) {
while (iterator.hasNext()) {
InternalRow row = iterator.next();
@@ -96,15 +92,15 @@ public class ManifestEntryCache extends ObjectsCache<Path,
ManifestEntry, Manife
int bucket = bucketGetter.apply(row);
int totalBucket = totalBucketGetter.apply(row);
Triple<BinaryRow, Integer, Integer> key = Triple.of(partition,
bucket, totalBucket);
- SimpleCollectingOutputView view =
- segments.computeIfAbsent(key, k ->
outViewSupplier.get());
- formatSerializer.serializeToPages(row, view);
+ DataPagedOutputSerializer output =
+ segments.computeIfAbsent(key, k ->
outputSupplier.get());
+ output.write(row);
}
List<RichSegments> result = new ArrayList<>();
- for (Map.Entry<Triple<BinaryRow, Integer, Integer>,
SimpleCollectingOutputView> entry :
+ for (Map.Entry<Triple<BinaryRow, Integer, Integer>,
DataPagedOutputSerializer> entry :
segments.entrySet()) {
Triple<BinaryRow, Integer, Integer> key = entry.getKey();
- SimpleCollectingOutputView view = entry.getValue();
+ SimpleCollectingOutputView view = entry.getValue().close();
Segments seg =
Segments.create(view.fullSegments(),
view.getCurrentPositionInSegment());
result.add(new RichSegments(key.f0, key.f1, key.f2, seg));