This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 436dd370dbb [To dev/1.3] Load: Fix LoadPieceNode reading aligned Chunk
causing OOM (#16152)
436dd370dbb is described below
commit 436dd370dbbb948d790af884c90787e7495d2215
Author: Zhenyu Luo <[email protected]>
AuthorDate: Wed Aug 13 10:14:29 2025 +0800
[To dev/1.3] Load: Fix LoadPieceNode reading aligned Chunk causing OOM
(#16152)
---
.../iotdb/db/storageengine/StorageEngine.java | 3 +-
.../db/storageengine/load/LoadTsFileManager.java | 6 ++-
.../load/splitter/AlignedChunkData.java | 59 +++++++++++++---------
.../splitter/BatchedAlignedValueChunkData.java | 24 ++-------
.../db/storageengine/load/splitter/ChunkData.java | 2 +-
5 files changed, 45 insertions(+), 49 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
index 6fbf870617a..0419f26d3c4 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
@@ -80,6 +80,7 @@ import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.commons.io.FileUtils;
+import org.apache.tsfile.exception.write.PageException;
import org.apache.tsfile.utils.FilePathUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -938,7 +939,7 @@ public class StorageEngine implements IService {
try {
loadTsFileManager.writeToDataRegion(dataRegion, pieceNode, uuid);
- } catch (IOException e) {
+ } catch (IOException | PageException e) {
LOGGER.warn(
"IO error when writing piece node of TsFile {} to DataRegion {}.",
pieceNode.getTsFile(),
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java
index 28c952b3d72..58d10a87697 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java
@@ -54,6 +54,7 @@ import org.apache.iotdb.metrics.utils.MetricLevel;
import org.apache.tsfile.common.constant.TsFileConstant;
import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.exception.write.PageException;
import org.apache.tsfile.file.metadata.ChunkGroupMetadata;
import org.apache.tsfile.file.metadata.ChunkMetadata;
import org.apache.tsfile.file.metadata.IDeviceID;
@@ -199,7 +200,7 @@ public class LoadTsFileManager {
}
public void writeToDataRegion(DataRegion dataRegion, LoadTsFilePieceNode
pieceNode, String uuid)
- throws IOException {
+ throws IOException, PageException {
if (!uuid2WriterManager.containsKey(uuid)) {
synchronized (uuid2CleanupTask) {
final CleanupTask cleanupTask =
@@ -403,7 +404,8 @@ public class LoadTsFileManager {
* BatchedAlignedChunkData, it may result in no data for the time column
in the new file.
*/
@SuppressWarnings("squid:S3824")
- private void write(DataPartitionInfo partitionInfo, ChunkData chunkData)
throws IOException {
+ private void write(DataPartitionInfo partitionInfo, ChunkData chunkData)
+ throws IOException, PageException {
if (isClosed) {
throw new
IOException(String.format(MESSAGE_WRITER_MANAGER_HAS_BEEN_CLOSED, taskDir));
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/AlignedChunkData.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/AlignedChunkData.java
index 489f7904b9d..d898563fcf2 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/AlignedChunkData.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/AlignedChunkData.java
@@ -41,6 +41,9 @@ import org.apache.tsfile.write.schema.IMeasurementSchema;
import org.apache.tsfile.write.schema.MeasurementSchema;
import org.apache.tsfile.write.writer.TsFileIOWriter;
+import javax.annotation.Nonnull;
+
+import java.io.ByteArrayInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
@@ -63,16 +66,15 @@ public class AlignedChunkData implements ChunkData {
protected final String device;
protected List<ChunkHeader> chunkHeaderList;
- protected final PublicBAOS byteStream;
- protected final DataOutputStream stream;
+ public PublicBAOS byteStream;
+ public DataOutputStream stream;
protected List<long[]> timeBatch;
protected long dataSize;
protected boolean needDecodeChunk;
protected List<Integer> pageNumbers;
protected Queue<Integer> satisfiedLengthQueue;
- private AlignedChunkWriterImpl chunkWriter;
- protected List<Chunk> chunkList;
+ public byte[] chunkData;
public AlignedChunkData(
final String device,
@@ -84,14 +86,15 @@ public class AlignedChunkData implements ChunkData {
addAttrDataSize();
}
- protected AlignedChunkData(AlignedChunkData alignedChunkData) {
+ protected AlignedChunkData(final AlignedChunkData alignedChunkData) {
this(alignedChunkData.device, alignedChunkData.timePartitionSlot);
this.satisfiedLengthQueue = new
LinkedList<>(alignedChunkData.satisfiedLengthQueue);
this.needDecodeChunk = alignedChunkData.needDecodeChunk;
addAttrDataSize();
}
- protected AlignedChunkData(String device, TTimePartitionSlot
timePartitionSlot) {
+ protected AlignedChunkData(
+ @Nonnull final String device, final TTimePartitionSlot
timePartitionSlot) {
this.dataSize = 0;
this.device = device;
this.chunkHeaderList = new ArrayList<>();
@@ -141,14 +144,8 @@ public class AlignedChunkData implements ChunkData {
}
@Override
- public void writeToFileWriter(final TsFileIOWriter writer) throws
IOException {
- if (chunkList != null) {
- for (final Chunk chunk : chunkList) {
- writer.writeChunk(chunk);
- }
- } else {
- chunkWriter.writeToFileWriter(writer);
- }
+ public void writeToFileWriter(final TsFileIOWriter writer) throws
IOException, PageException {
+ deserializeTsFileData(writer);
}
public void addValueChunk(final ChunkHeader chunkHeader) {
@@ -165,6 +162,7 @@ public class AlignedChunkData implements ChunkData {
ReadWriteIOUtils.write(isModification(), stream);
ReadWriteIOUtils.write(isAligned(), stream);
serializeAttr(stream);
+ ReadWriteIOUtils.write(byteStream.size(), stream);
byteStream.writeTo(stream);
close();
}
@@ -286,26 +284,36 @@ public class AlignedChunkData implements ChunkData {
}
}
- protected void deserializeTsFileData(final InputStream stream) throws
IOException, PageException {
+ protected void deserializeTsFileData(TsFileIOWriter writer) throws
IOException, PageException {
+ final InputStream stream = new ByteArrayInputStream(chunkData);
if (needDecodeChunk) {
- buildChunkWriter(stream);
+ buildChunkWriter(stream, writer);
} else {
- deserializeEntireChunk(stream);
+ deserializeEntireChunk(stream, writer);
}
}
- private void deserializeEntireChunk(final InputStream stream) throws
IOException {
- chunkList = new ArrayList<>();
+ protected void deserializeTsFileDataByte(final InputStream stream) throws
IOException {
+ final int size = ReadWriteIOUtils.readInt(stream);
+ this.chunkData = new byte[size];
+ if (size != stream.read(chunkData)) {
+ throw new IOException("TsFileData byte array read error, size
mismatch.");
+ }
+ }
+
+ private void deserializeEntireChunk(final InputStream stream, final
TsFileIOWriter writer)
+ throws IOException {
for (final ChunkHeader chunkHeader : chunkHeaderList) {
final ByteBuffer chunkData =
ByteBuffer.wrap(ReadWriteIOUtils.readBytesWithSelfDescriptionLength(stream));
final Statistics<? extends Serializable> statistics =
Statistics.deserialize(stream, chunkHeader.getDataType());
- chunkList.add(new Chunk(chunkHeader, chunkData, null, statistics));
+ writer.writeChunk(new Chunk(chunkHeader, chunkData, null, statistics));
}
}
- protected void buildChunkWriter(final InputStream stream) throws
IOException, PageException {
+ protected void buildChunkWriter(final InputStream stream, final
TsFileIOWriter writer)
+ throws IOException, PageException {
final List<IMeasurementSchema> measurementSchemaList = new ArrayList<>();
IMeasurementSchema timeSchema = null;
for (final ChunkHeader chunkHeader : chunkHeaderList) {
@@ -325,17 +333,20 @@ public class AlignedChunkData implements ChunkData {
chunkHeader.getEncodingType(),
chunkHeader.getCompressionType()));
}
- chunkWriter = new AlignedChunkWriterImpl(timeSchema,
measurementSchemaList);
+ AlignedChunkWriterImpl chunkWriter =
+ new AlignedChunkWriterImpl(timeSchema, measurementSchemaList);
timeBatch = new ArrayList<>();
final int chunkHeaderSize = chunkHeaderList.size();
for (int i = 0; i < chunkHeaderSize; i++) {
- buildChunk(stream, chunkHeaderList.get(i), pageNumbers.get(i), i - 1, i
== 0);
+ buildChunk(chunkWriter, stream, chunkHeaderList.get(i),
pageNumbers.get(i), i - 1, i == 0);
}
timeBatch = null;
+ chunkWriter.writeToFileWriter(writer);
}
@SuppressWarnings({"squid:S6541", "squid:S3776"})
private void buildChunk(
+ final AlignedChunkWriterImpl chunkWriter,
final InputStream stream,
final ChunkHeader chunkHeader,
final int pageNumber,
@@ -454,7 +465,7 @@ public class AlignedChunkData implements ChunkData {
chunkData.needDecodeChunk = needDecodeChunk;
chunkData.chunkHeaderList = chunkHeaderList;
chunkData.pageNumbers = pageNumbers;
- chunkData.deserializeTsFileData(stream);
+ chunkData.deserializeTsFileDataByte(stream);
chunkData.dataSize = dataSize;
chunkData.close();
return chunkData;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/BatchedAlignedValueChunkData.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/BatchedAlignedValueChunkData.java
index b80c51b9212..c34659399b5 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/BatchedAlignedValueChunkData.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/BatchedAlignedValueChunkData.java
@@ -27,7 +27,6 @@ import org.apache.tsfile.exception.write.PageException;
import org.apache.tsfile.file.header.ChunkHeader;
import org.apache.tsfile.file.header.PageHeader;
import org.apache.tsfile.file.metadata.statistics.Statistics;
-import org.apache.tsfile.read.common.Chunk;
import org.apache.tsfile.utils.Binary;
import org.apache.tsfile.utils.ReadWriteIOUtils;
import org.apache.tsfile.utils.TsPrimitiveType;
@@ -40,8 +39,6 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
/**
* This class is used to be compatible with the new distribution of aligned
series in chunk group.
@@ -51,8 +48,6 @@ import java.util.List;
*/
public class BatchedAlignedValueChunkData extends AlignedChunkData {
- private List<ValueChunkWriter> valueChunkWriters;
-
// Used for splitter
public BatchedAlignedValueChunkData(AlignedChunkData alignedChunkData) {
super(alignedChunkData);
@@ -61,7 +56,6 @@ public class BatchedAlignedValueChunkData extends
AlignedChunkData {
// Used for deserialize
public BatchedAlignedValueChunkData(String device, TTimePartitionSlot
timePartitionSlot) {
super(device, timePartitionSlot);
- valueChunkWriters = new ArrayList<>();
}
@Override
@@ -129,7 +123,8 @@ public class BatchedAlignedValueChunkData extends
AlignedChunkData {
}
@Override
- protected void buildChunkWriter(final InputStream stream) throws
IOException, PageException {
+ protected void buildChunkWriter(final InputStream stream, final
TsFileIOWriter writer)
+ throws IOException, PageException {
for (int i = 0; i < chunkHeaderList.size(); i++) {
ChunkHeader chunkHeader = chunkHeaderList.get(i);
MeasurementSchema measurementSchema =
@@ -145,8 +140,8 @@ public class BatchedAlignedValueChunkData extends
AlignedChunkData {
measurementSchema.getType(),
measurementSchema.getEncodingType(),
measurementSchema.getValueEncoder());
- valueChunkWriters.add(valueChunkWriter);
buildValueChunkWriter(stream, chunkHeader, pageNumbers.get(i),
valueChunkWriter);
+ valueChunkWriter.writeToFileWriter(writer);
}
}
@@ -225,17 +220,4 @@ public class BatchedAlignedValueChunkData extends
AlignedChunkData {
valueChunkWriter.sealCurrentPage();
}
}
-
- @Override
- public void writeToFileWriter(TsFileIOWriter writer) throws IOException {
- if (chunkList != null) {
- for (final Chunk chunk : chunkList) {
- writer.writeChunk(chunk);
- }
- } else {
- for (ValueChunkWriter valueChunkWriter : valueChunkWriters) {
- valueChunkWriter.writeToFileWriter(writer);
- }
- }
- }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/ChunkData.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/ChunkData.java
index 3b16a9d660c..56988b2cddb 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/ChunkData.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/ChunkData.java
@@ -47,7 +47,7 @@ public interface ChunkData extends TsFileData {
void writeDecodePage(long[] times, Object[] values, int satisfiedLength)
throws IOException;
- void writeToFileWriter(TsFileIOWriter writer) throws IOException;
+ void writeToFileWriter(TsFileIOWriter writer) throws IOException,
PageException;
@Override
default boolean isModification() {