This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 32910ff7b73 Load: Fix LoadPieceNode reading aligned Chunk causing OOM (#16132)
32910ff7b73 is described below
commit 32910ff7b73d5f8df3c22af103779b88fa3510bf
Author: Zhenyu Luo <[email protected]>
AuthorDate: Tue Aug 12 15:30:35 2025 +0800
Load: Fix LoadPieceNode reading aligned Chunk causing OOM (#16132)
* Load: Fix LoadPieceNode reading aligned Chunk causing OOM
* update alignedChunkData
* add ByteBufferInputStream
---
.../plan/node/load/LoadTsFilePieceNode.java | 42 +++++++++++++++-
.../iotdb/db/storageengine/StorageEngine.java | 3 +-
.../db/storageengine/load/LoadTsFileManager.java | 6 ++-
.../load/splitter/AlignedChunkData.java | 57 +++++++++++++---------
.../splitter/BatchedAlignedValueChunkData.java | 24 ++-------
.../db/storageengine/load/splitter/ChunkData.java | 2 +-
6 files changed, 85 insertions(+), 49 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFilePieceNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFilePieceNode.java
index f578148e014..1488e354ef1 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFilePieceNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFilePieceNode.java
@@ -34,7 +34,6 @@ import org.apache.tsfile.utils.ReadWriteIOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.File;
@@ -152,7 +151,9 @@ public class LoadTsFilePieceNode extends WritePlanNode {
}
public static PlanNode deserialize(ByteBuffer buffer) {
- InputStream stream = new ByteArrayInputStream(buffer.array());
+ buffer = buffer.duplicate();
+ buffer.position(0);
+ ByteBufferInputStream stream = new ByteBufferInputStream(buffer);
try {
ReadWriteIOUtils.readShort(stream); // read PlanNodeType
final File tsFile = new File(ReadWriteIOUtils.readString(stream));
@@ -193,4 +194,41 @@ public class LoadTsFilePieceNode extends WritePlanNode {
public String toString() {
return "LoadTsFilePieceNode{" + "tsFile=" + tsFile + ", dataSize=" +
dataSize + '}';
}
+
+ public static class ByteBufferInputStream extends InputStream {
+ private final ByteBuffer buffer;
+
+ public ByteBufferInputStream(ByteBuffer buffer) {
+ this.buffer = buffer;
+ }
+
+ @Override
+ public int read() {
+ if (!buffer.hasRemaining()) {
+ return -1;
+ }
+ return buffer.get() & 0xFF;
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) {
+ if (!buffer.hasRemaining()) {
+ return -1;
+ }
+ int toRead = Math.min(len, buffer.remaining());
+ buffer.get(b, off, toRead);
+ return toRead;
+ }
+
+ public ByteBuffer read(int length) {
+ if (length < 0 || length > buffer.remaining()) {
+ throw new IllegalArgumentException("Invalid length for slicing: " +
length);
+ }
+ ByteBuffer slicedBuffer = buffer.slice();
+ slicedBuffer.limit(length);
+
+ buffer.position(buffer.position() + length);
+ return slicedBuffer;
+ }
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
index fb276150956..034d184bd03 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
@@ -81,6 +81,7 @@ import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.commons.io.FileUtils;
+import org.apache.tsfile.exception.write.PageException;
import org.apache.tsfile.utils.FilePathUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -947,7 +948,7 @@ public class StorageEngine implements IService {
try {
loadTsFileManager.writeToDataRegion(dataRegion, pieceNode, uuid);
- } catch (IOException e) {
+ } catch (IOException | PageException e) {
LOGGER.warn(
"IO error when writing piece node of TsFile {} to DataRegion {}.",
pieceNode.getTsFile(),
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java
index 6d9b7fe2e1a..5876a884660 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java
@@ -56,6 +56,7 @@ import org.apache.iotdb.metrics.utils.MetricLevel;
import org.apache.tsfile.common.constant.TsFileConstant;
import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.exception.write.PageException;
import org.apache.tsfile.file.metadata.ChunkGroupMetadata;
import org.apache.tsfile.file.metadata.ChunkMetadata;
import org.apache.tsfile.file.metadata.IDeviceID;
@@ -203,7 +204,7 @@ public class LoadTsFileManager {
}
public void writeToDataRegion(DataRegion dataRegion, LoadTsFilePieceNode
pieceNode, String uuid)
- throws IOException {
+ throws IOException, PageException {
if (!uuid2WriterManager.containsKey(uuid)) {
synchronized (uuid2CleanupTask) {
final CleanupTask cleanupTask =
@@ -414,7 +415,8 @@ public class LoadTsFileManager {
* BatchedAlignedChunkData, it may result in no data for the time column
in the new file.
*/
@SuppressWarnings("squid:S3824")
- private void write(DataPartitionInfo partitionInfo, ChunkData chunkData)
throws IOException {
+ private void write(DataPartitionInfo partitionInfo, ChunkData chunkData)
+ throws IOException, PageException {
if (isClosed) {
throw new
IOException(String.format(MESSAGE_WRITER_MANAGER_HAS_BEEN_CLOSED, taskDir));
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/AlignedChunkData.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/AlignedChunkData.java
index ca7493d6ca0..ec6899d113b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/AlignedChunkData.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/AlignedChunkData.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.storageengine.load.splitter;
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import org.apache.iotdb.commons.utils.TimePartitionUtils;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFilePieceNode;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.exception.write.PageException;
@@ -66,16 +67,15 @@ public class AlignedChunkData implements ChunkData {
protected final IDeviceID device;
protected List<ChunkHeader> chunkHeaderList;
- protected final PublicBAOS byteStream;
- protected final DataOutputStream stream;
+ protected PublicBAOS byteStream;
+ protected DataOutputStream stream;
protected List<long[]> timeBatch;
protected long dataSize;
protected boolean needDecodeChunk;
protected List<Integer> pageNumbers;
protected Queue<Integer> satisfiedLengthQueue;
- private AlignedChunkWriterImpl chunkWriter;
- protected List<Chunk> chunkList;
+ protected ByteBuffer chunkData;
public AlignedChunkData(
@Nonnull final IDeviceID device,
@@ -143,14 +143,8 @@ public class AlignedChunkData implements ChunkData {
}
@Override
- public void writeToFileWriter(final TsFileIOWriter writer) throws
IOException {
- if (chunkList != null) {
- for (final Chunk chunk : chunkList) {
- writer.writeChunk(chunk);
- }
- } else {
- chunkWriter.writeToFileWriter(writer);
- }
+ public void writeToFileWriter(final TsFileIOWriter writer) throws
IOException, PageException {
+ writeTsFileData(writer);
}
public void addValueChunk(final ChunkHeader chunkHeader) {
@@ -167,6 +161,7 @@ public class AlignedChunkData implements ChunkData {
ReadWriteIOUtils.write(getType().ordinal(), stream);
ReadWriteIOUtils.write(isAligned(), stream);
serializeAttr(stream);
+ ReadWriteIOUtils.write(byteStream.size(), stream);
byteStream.writeTo(stream);
close();
}
@@ -291,26 +286,41 @@ public class AlignedChunkData implements ChunkData {
}
}
- protected void deserializeTsFileData(final InputStream stream) throws
IOException, PageException {
+ protected void writeTsFileData(TsFileIOWriter writer) throws IOException,
PageException {
+ final InputStream stream = new
LoadTsFilePieceNode.ByteBufferInputStream(chunkData);
if (needDecodeChunk) {
- buildChunkWriter(stream);
+ writeChunkToWriter(stream, writer);
+ } else {
+ writeEntireChunkToWriter(stream, writer);
+ }
+ }
+
+ protected void deserializeTsFileDataByte(final InputStream stream) throws
IOException {
+ final int size = ReadWriteIOUtils.readInt(stream);
+ if (stream instanceof LoadTsFilePieceNode.ByteBufferInputStream) {
+ this.chunkData = ((LoadTsFilePieceNode.ByteBufferInputStream)
stream).read(size);
} else {
- deserializeEntireChunk(stream);
+ byte[] data = new byte[size];
+ if (size != stream.read(data)) {
+ throw new IOException("TsFileData byte array read error, size
mismatch.");
+ }
+ this.chunkData = ByteBuffer.wrap(data);
}
}
- private void deserializeEntireChunk(final InputStream stream) throws
IOException {
- chunkList = new ArrayList<>();
+ private void writeEntireChunkToWriter(final InputStream stream, final
TsFileIOWriter writer)
+ throws IOException {
for (final ChunkHeader chunkHeader : chunkHeaderList) {
final ByteBuffer chunkData =
ByteBuffer.wrap(ReadWriteIOUtils.readBytesWithSelfDescriptionLength(stream));
final Statistics<? extends Serializable> statistics =
Statistics.deserialize(stream, chunkHeader.getDataType());
- chunkList.add(new Chunk(chunkHeader, chunkData, null, statistics));
+ writer.writeChunk(new Chunk(chunkHeader, chunkData, null, statistics));
}
}
- protected void buildChunkWriter(final InputStream stream) throws
IOException, PageException {
+ protected void writeChunkToWriter(final InputStream stream, final
TsFileIOWriter writer)
+ throws IOException, PageException {
final List<IMeasurementSchema> measurementSchemaList = new ArrayList<>();
IMeasurementSchema timeSchema = null;
for (final ChunkHeader chunkHeader : chunkHeaderList) {
@@ -330,17 +340,20 @@ public class AlignedChunkData implements ChunkData {
chunkHeader.getEncodingType(),
chunkHeader.getCompressionType()));
}
- chunkWriter = new AlignedChunkWriterImpl(timeSchema,
measurementSchemaList);
+ AlignedChunkWriterImpl chunkWriter =
+ new AlignedChunkWriterImpl(timeSchema, measurementSchemaList);
timeBatch = new ArrayList<>();
final int chunkHeaderSize = chunkHeaderList.size();
for (int i = 0; i < chunkHeaderSize; i++) {
- buildChunk(stream, chunkHeaderList.get(i), pageNumbers.get(i), i - 1, i
== 0);
+ buildChunk(chunkWriter, stream, chunkHeaderList.get(i),
pageNumbers.get(i), i - 1, i == 0);
}
timeBatch = null;
+ chunkWriter.writeToFileWriter(writer);
}
@SuppressWarnings({"squid:S6541", "squid:S3776"})
private void buildChunk(
+ final AlignedChunkWriterImpl chunkWriter,
final InputStream stream,
final ChunkHeader chunkHeader,
final int pageNumber,
@@ -463,7 +476,7 @@ public class AlignedChunkData implements ChunkData {
chunkData.needDecodeChunk = needDecodeChunk;
chunkData.chunkHeaderList = chunkHeaderList;
chunkData.pageNumbers = pageNumbers;
- chunkData.deserializeTsFileData(stream);
+ chunkData.deserializeTsFileDataByte(stream);
chunkData.dataSize = dataSize;
chunkData.close();
return chunkData;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/BatchedAlignedValueChunkData.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/BatchedAlignedValueChunkData.java
index 5a8e22aa8e2..be31a03d76a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/BatchedAlignedValueChunkData.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/BatchedAlignedValueChunkData.java
@@ -28,7 +28,6 @@ import org.apache.tsfile.file.header.ChunkHeader;
import org.apache.tsfile.file.header.PageHeader;
import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.file.metadata.statistics.Statistics;
-import org.apache.tsfile.read.common.Chunk;
import org.apache.tsfile.utils.Binary;
import org.apache.tsfile.utils.ReadWriteIOUtils;
import org.apache.tsfile.utils.TsPrimitiveType;
@@ -41,8 +40,6 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
/**
* This class is used to be compatible with the new distribution of aligned
series in chunk group.
@@ -52,8 +49,6 @@ import java.util.List;
*/
public class BatchedAlignedValueChunkData extends AlignedChunkData {
- private List<ValueChunkWriter> valueChunkWriters;
-
// Used for splitter
public BatchedAlignedValueChunkData(AlignedChunkData alignedChunkData) {
super(alignedChunkData);
@@ -62,7 +57,6 @@ public class BatchedAlignedValueChunkData extends
AlignedChunkData {
// Used for deserialize
public BatchedAlignedValueChunkData(IDeviceID device, TTimePartitionSlot
timePartitionSlot) {
super(device, timePartitionSlot);
- valueChunkWriters = new ArrayList<>();
}
@Override
@@ -130,7 +124,8 @@ public class BatchedAlignedValueChunkData extends
AlignedChunkData {
}
@Override
- protected void buildChunkWriter(final InputStream stream) throws
IOException, PageException {
+ protected void writeChunkToWriter(final InputStream stream, final
TsFileIOWriter writer)
+ throws IOException, PageException {
for (int i = 0; i < chunkHeaderList.size(); i++) {
ChunkHeader chunkHeader = chunkHeaderList.get(i);
MeasurementSchema measurementSchema =
@@ -146,8 +141,8 @@ public class BatchedAlignedValueChunkData extends
AlignedChunkData {
measurementSchema.getType(),
measurementSchema.getEncodingType(),
measurementSchema.getValueEncoder());
- valueChunkWriters.add(valueChunkWriter);
buildValueChunkWriter(stream, chunkHeader, pageNumbers.get(i),
valueChunkWriter);
+ valueChunkWriter.writeToFileWriter(writer);
}
}
@@ -226,17 +221,4 @@ public class BatchedAlignedValueChunkData extends
AlignedChunkData {
valueChunkWriter.sealCurrentPage();
}
}
-
- @Override
- public void writeToFileWriter(TsFileIOWriter writer) throws IOException {
- if (chunkList != null) {
- for (final Chunk chunk : chunkList) {
- writer.writeChunk(chunk);
- }
- } else {
- for (ValueChunkWriter valueChunkWriter : valueChunkWriters) {
- valueChunkWriter.writeToFileWriter(writer);
- }
- }
- }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/ChunkData.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/ChunkData.java
index 1e96fac3868..7cc5db02995 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/ChunkData.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/ChunkData.java
@@ -48,7 +48,7 @@ public interface ChunkData extends TsFileData {
void writeDecodePage(long[] times, Object[] values, int satisfiedLength)
throws IOException;
- void writeToFileWriter(TsFileIOWriter writer) throws IOException;
+ void writeToFileWriter(TsFileIOWriter writer) throws IOException,
PageException;
@Override
default TsFileDataType getType() {