This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new 436dd370dbb [To dev/1.3] Load: Fix LoadPieceNode reading aligned Chunk causing OOM (#16152)
436dd370dbb is described below

commit 436dd370dbbb948d790af884c90787e7495d2215
Author: Zhenyu Luo <[email protected]>
AuthorDate: Wed Aug 13 10:14:29 2025 +0800

    [To dev/1.3] Load: Fix LoadPieceNode reading aligned Chunk causing OOM (#16152)
---
 .../iotdb/db/storageengine/StorageEngine.java      |  3 +-
 .../db/storageengine/load/LoadTsFileManager.java   |  6 ++-
 .../load/splitter/AlignedChunkData.java            | 59 +++++++++++++---------
 .../splitter/BatchedAlignedValueChunkData.java     | 24 ++-------
 .../db/storageengine/load/splitter/ChunkData.java  |  2 +-
 5 files changed, 45 insertions(+), 49 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
index 6fbf870617a..0419f26d3c4 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
@@ -80,6 +80,7 @@ import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.apache.commons.io.FileUtils;
+import org.apache.tsfile.exception.write.PageException;
 import org.apache.tsfile.utils.FilePathUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -938,7 +939,7 @@ public class StorageEngine implements IService {
 
     try {
       loadTsFileManager.writeToDataRegion(dataRegion, pieceNode, uuid);
-    } catch (IOException e) {
+    } catch (IOException | PageException e) {
       LOGGER.warn(
           "IO error when writing piece node of TsFile {} to DataRegion {}.",
           pieceNode.getTsFile(),
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java
index 28c952b3d72..58d10a87697 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java
@@ -54,6 +54,7 @@ import org.apache.iotdb.metrics.utils.MetricLevel;
 
 import org.apache.tsfile.common.constant.TsFileConstant;
 import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.exception.write.PageException;
 import org.apache.tsfile.file.metadata.ChunkGroupMetadata;
 import org.apache.tsfile.file.metadata.ChunkMetadata;
 import org.apache.tsfile.file.metadata.IDeviceID;
@@ -199,7 +200,7 @@ public class LoadTsFileManager {
   }
 
   public void writeToDataRegion(DataRegion dataRegion, LoadTsFilePieceNode pieceNode, String uuid)
-      throws IOException {
+      throws IOException, PageException {
     if (!uuid2WriterManager.containsKey(uuid)) {
       synchronized (uuid2CleanupTask) {
         final CleanupTask cleanupTask =
@@ -403,7 +404,8 @@ public class LoadTsFileManager {
      * BatchedAlignedChunkData, it may result in no data for the time column in the new file.
      */
     @SuppressWarnings("squid:S3824")
-    private void write(DataPartitionInfo partitionInfo, ChunkData chunkData) throws IOException {
+    private void write(DataPartitionInfo partitionInfo, ChunkData chunkData)
+        throws IOException, PageException {
       if (isClosed) {
         throw new IOException(String.format(MESSAGE_WRITER_MANAGER_HAS_BEEN_CLOSED, taskDir));
       }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/AlignedChunkData.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/AlignedChunkData.java
index 489f7904b9d..d898563fcf2 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/AlignedChunkData.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/AlignedChunkData.java
@@ -41,6 +41,9 @@ import org.apache.tsfile.write.schema.IMeasurementSchema;
 import org.apache.tsfile.write.schema.MeasurementSchema;
 import org.apache.tsfile.write.writer.TsFileIOWriter;
 
+import javax.annotation.Nonnull;
+
+import java.io.ByteArrayInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
@@ -63,16 +66,15 @@ public class AlignedChunkData implements ChunkData {
   protected final String device;
   protected List<ChunkHeader> chunkHeaderList;
 
-  protected final PublicBAOS byteStream;
-  protected final DataOutputStream stream;
+  public PublicBAOS byteStream;
+  public DataOutputStream stream;
   protected List<long[]> timeBatch;
   protected long dataSize;
   protected boolean needDecodeChunk;
   protected List<Integer> pageNumbers;
   protected Queue<Integer> satisfiedLengthQueue;
 
-  private AlignedChunkWriterImpl chunkWriter;
-  protected List<Chunk> chunkList;
+  public byte[] chunkData;
 
   public AlignedChunkData(
       final String device,
@@ -84,14 +86,15 @@ public class AlignedChunkData implements ChunkData {
     addAttrDataSize();
   }
 
-  protected AlignedChunkData(AlignedChunkData alignedChunkData) {
+  protected AlignedChunkData(final AlignedChunkData alignedChunkData) {
     this(alignedChunkData.device, alignedChunkData.timePartitionSlot);
     this.satisfiedLengthQueue = new LinkedList<>(alignedChunkData.satisfiedLengthQueue);
     this.needDecodeChunk = alignedChunkData.needDecodeChunk;
     addAttrDataSize();
   }
 
-  protected AlignedChunkData(String device, TTimePartitionSlot timePartitionSlot) {
+  protected AlignedChunkData(
+      @Nonnull final String device, final TTimePartitionSlot timePartitionSlot) {
     this.dataSize = 0;
     this.device = device;
     this.chunkHeaderList = new ArrayList<>();
@@ -141,14 +144,8 @@ public class AlignedChunkData implements ChunkData {
   }
 
   @Override
-  public void writeToFileWriter(final TsFileIOWriter writer) throws IOException {
-    if (chunkList != null) {
-      for (final Chunk chunk : chunkList) {
-        writer.writeChunk(chunk);
-      }
-    } else {
-      chunkWriter.writeToFileWriter(writer);
-    }
+  public void writeToFileWriter(final TsFileIOWriter writer) throws IOException, PageException {
+    deserializeTsFileData(writer);
   }
 
   public void addValueChunk(final ChunkHeader chunkHeader) {
@@ -165,6 +162,7 @@ public class AlignedChunkData implements ChunkData {
     ReadWriteIOUtils.write(isModification(), stream);
     ReadWriteIOUtils.write(isAligned(), stream);
     serializeAttr(stream);
+    ReadWriteIOUtils.write(byteStream.size(), stream);
     byteStream.writeTo(stream);
     close();
   }
@@ -286,26 +284,36 @@ public class AlignedChunkData implements ChunkData {
     }
   }
 
-  protected void deserializeTsFileData(final InputStream stream) throws IOException, PageException {
+  protected void deserializeTsFileData(TsFileIOWriter writer) throws IOException, PageException {
+    final InputStream stream = new ByteArrayInputStream(chunkData);
     if (needDecodeChunk) {
-      buildChunkWriter(stream);
+      buildChunkWriter(stream, writer);
     } else {
-      deserializeEntireChunk(stream);
+      deserializeEntireChunk(stream, writer);
     }
   }
 
-  private void deserializeEntireChunk(final InputStream stream) throws IOException {
-    chunkList = new ArrayList<>();
+  protected void deserializeTsFileDataByte(final InputStream stream) throws IOException {
+    final int size = ReadWriteIOUtils.readInt(stream);
+    this.chunkData = new byte[size];
+    if (size != stream.read(chunkData)) {
+      throw new IOException("TsFileData byte array read error, size mismatch.");
+    }
+  }
+
+  private void deserializeEntireChunk(final InputStream stream, final TsFileIOWriter writer)
+      throws IOException {
     for (final ChunkHeader chunkHeader : chunkHeaderList) {
       final ByteBuffer chunkData =
           ByteBuffer.wrap(ReadWriteIOUtils.readBytesWithSelfDescriptionLength(stream));
       final Statistics<? extends Serializable> statistics =
           Statistics.deserialize(stream, chunkHeader.getDataType());
-      chunkList.add(new Chunk(chunkHeader, chunkData, null, statistics));
+      writer.writeChunk(new Chunk(chunkHeader, chunkData, null, statistics));
     }
   }
 
-  protected void buildChunkWriter(final InputStream stream) throws IOException, PageException {
+  protected void buildChunkWriter(final InputStream stream, final TsFileIOWriter writer)
+      throws IOException, PageException {
     final List<IMeasurementSchema> measurementSchemaList = new ArrayList<>();
     IMeasurementSchema timeSchema = null;
     for (final ChunkHeader chunkHeader : chunkHeaderList) {
@@ -325,17 +333,20 @@ public class AlignedChunkData implements ChunkData {
               chunkHeader.getEncodingType(),
               chunkHeader.getCompressionType()));
     }
-    chunkWriter = new AlignedChunkWriterImpl(timeSchema, measurementSchemaList);
+    AlignedChunkWriterImpl chunkWriter =
+        new AlignedChunkWriterImpl(timeSchema, measurementSchemaList);
     timeBatch = new ArrayList<>();
     final int chunkHeaderSize = chunkHeaderList.size();
     for (int i = 0; i < chunkHeaderSize; i++) {
-      buildChunk(stream, chunkHeaderList.get(i), pageNumbers.get(i), i - 1, i == 0);
+      buildChunk(chunkWriter, stream, chunkHeaderList.get(i), pageNumbers.get(i), i - 1, i == 0);
     }
     timeBatch = null;
+    chunkWriter.writeToFileWriter(writer);
   }
 
   @SuppressWarnings({"squid:S6541", "squid:S3776"})
   private void buildChunk(
+      final AlignedChunkWriterImpl chunkWriter,
       final InputStream stream,
       final ChunkHeader chunkHeader,
       final int pageNumber,
@@ -454,7 +465,7 @@ public class AlignedChunkData implements ChunkData {
     chunkData.needDecodeChunk = needDecodeChunk;
     chunkData.chunkHeaderList = chunkHeaderList;
     chunkData.pageNumbers = pageNumbers;
-    chunkData.deserializeTsFileData(stream);
+    chunkData.deserializeTsFileDataByte(stream);
     chunkData.dataSize = dataSize;
     chunkData.close();
     return chunkData;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/BatchedAlignedValueChunkData.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/BatchedAlignedValueChunkData.java
index b80c51b9212..c34659399b5 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/BatchedAlignedValueChunkData.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/BatchedAlignedValueChunkData.java
@@ -27,7 +27,6 @@ import org.apache.tsfile.exception.write.PageException;
 import org.apache.tsfile.file.header.ChunkHeader;
 import org.apache.tsfile.file.header.PageHeader;
 import org.apache.tsfile.file.metadata.statistics.Statistics;
-import org.apache.tsfile.read.common.Chunk;
 import org.apache.tsfile.utils.Binary;
 import org.apache.tsfile.utils.ReadWriteIOUtils;
 import org.apache.tsfile.utils.TsPrimitiveType;
@@ -40,8 +39,6 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.Serializable;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
 
 /**
  * This class is used to be compatible with the new distribution of aligned series in chunk group.
@@ -51,8 +48,6 @@ import java.util.List;
  */
 public class BatchedAlignedValueChunkData extends AlignedChunkData {
 
-  private List<ValueChunkWriter> valueChunkWriters;
-
   // Used for splitter
   public BatchedAlignedValueChunkData(AlignedChunkData alignedChunkData) {
     super(alignedChunkData);
@@ -61,7 +56,6 @@ public class BatchedAlignedValueChunkData extends AlignedChunkData {
   // Used for deserialize
   public BatchedAlignedValueChunkData(String device, TTimePartitionSlot timePartitionSlot) {
     super(device, timePartitionSlot);
-    valueChunkWriters = new ArrayList<>();
   }
 
   @Override
@@ -129,7 +123,8 @@ public class BatchedAlignedValueChunkData extends AlignedChunkData {
   }
 
   @Override
-  protected void buildChunkWriter(final InputStream stream) throws IOException, PageException {
+  protected void buildChunkWriter(final InputStream stream, final TsFileIOWriter writer)
+      throws IOException, PageException {
     for (int i = 0; i < chunkHeaderList.size(); i++) {
       ChunkHeader chunkHeader = chunkHeaderList.get(i);
       MeasurementSchema measurementSchema =
@@ -145,8 +140,8 @@ public class BatchedAlignedValueChunkData extends AlignedChunkData {
               measurementSchema.getType(),
               measurementSchema.getEncodingType(),
               measurementSchema.getValueEncoder());
-      valueChunkWriters.add(valueChunkWriter);
       buildValueChunkWriter(stream, chunkHeader, pageNumbers.get(i), valueChunkWriter);
+      valueChunkWriter.writeToFileWriter(writer);
     }
   }
 
@@ -225,17 +220,4 @@ public class BatchedAlignedValueChunkData extends AlignedChunkData {
       valueChunkWriter.sealCurrentPage();
     }
   }
-
-  @Override
-  public void writeToFileWriter(TsFileIOWriter writer) throws IOException {
-    if (chunkList != null) {
-      for (final Chunk chunk : chunkList) {
-        writer.writeChunk(chunk);
-      }
-    } else {
-      for (ValueChunkWriter valueChunkWriter : valueChunkWriters) {
-        valueChunkWriter.writeToFileWriter(writer);
-      }
-    }
-  }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/ChunkData.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/ChunkData.java
index 3b16a9d660c..56988b2cddb 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/ChunkData.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/ChunkData.java
@@ -47,7 +47,7 @@ public interface ChunkData extends TsFileData {
 
   void writeDecodePage(long[] times, Object[] values, int satisfiedLength) throws IOException;
 
-  void writeToFileWriter(TsFileIOWriter writer) throws IOException;
+  void writeToFileWriter(TsFileIOWriter writer) throws IOException, PageException;
 
   @Override
   default boolean isModification() {

Reply via email to