This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 32910ff7b73 Load: Fix LoadPieceNode reading aligned Chunk causing OOM (#16132)
32910ff7b73 is described below

commit 32910ff7b73d5f8df3c22af103779b88fa3510bf
Author: Zhenyu Luo <[email protected]>
AuthorDate: Tue Aug 12 15:30:35 2025 +0800

    Load: Fix LoadPieceNode reading aligned Chunk causing OOM (#16132)
    
    * Load: Fix LoadPieceNode reading aligned Chunk causing OOM
    
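    * AlignedChunkData: keep the received chunk bytes in a ByteBuffer and write
      them straight to the TsFileIOWriter, instead of materializing Chunk
      objects or chunk writers when the piece node is deserialized
    
    * Add LoadTsFilePieceNode.ByteBufferInputStream so chunk data can be
      sliced out of the request buffer without copying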
---
 .../plan/node/load/LoadTsFilePieceNode.java        | 42 +++++++++++++++-
 .../iotdb/db/storageengine/StorageEngine.java      |  3 +-
 .../db/storageengine/load/LoadTsFileManager.java   |  6 ++-
 .../load/splitter/AlignedChunkData.java            | 57 +++++++++++++---------
 .../splitter/BatchedAlignedValueChunkData.java     | 24 ++-------
 .../db/storageengine/load/splitter/ChunkData.java  |  2 +-
 6 files changed, 85 insertions(+), 49 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFilePieceNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFilePieceNode.java
index f578148e014..1488e354ef1 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFilePieceNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFilePieceNode.java
@@ -34,7 +34,6 @@ import org.apache.tsfile.utils.ReadWriteIOUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.DataOutputStream;
 import java.io.File;
@@ -152,7 +151,9 @@ public class LoadTsFilePieceNode extends WritePlanNode {
   }
 
   public static PlanNode deserialize(ByteBuffer buffer) {
-    InputStream stream = new ByteArrayInputStream(buffer.array());
+    buffer = buffer.duplicate();
+    buffer.position(0);
+    ByteBufferInputStream stream = new ByteBufferInputStream(buffer);
     try {
       ReadWriteIOUtils.readShort(stream); // read PlanNodeType
       final File tsFile = new File(ReadWriteIOUtils.readString(stream));
@@ -193,4 +194,91 @@ public class LoadTsFilePieceNode extends WritePlanNode {
   public String toString() {
     return "LoadTsFilePieceNode{" + "tsFile=" + tsFile + ", dataSize=" + dataSize + '}';
   }
+
+  /**
+   * Adapts a {@link ByteBuffer} to an {@link InputStream}: bytes are read from the buffer's
+   * current position up to its limit, and the position advances as they are consumed.
+   *
+   * <p>Reading through the buffer (instead of through {@code buffer.array()}) also works for
+   * buffers that are not array-backed and allows zero-copy slicing via {@link #read(int)}.
+   *
+   * <p>Not thread-safe: it mutates the position of the wrapped buffer.
+   */
+  public static class ByteBufferInputStream extends InputStream {
+    private final ByteBuffer buffer;
+
+    public ByteBufferInputStream(ByteBuffer buffer) {
+      this.buffer = buffer;
+    }
+
+    @Override
+    public int read() {
+      if (!buffer.hasRemaining()) {
+        return -1;
+      }
+      return buffer.get() & 0xFF;
+    }
+
+    /**
+     * Reads up to {@code len} bytes into {@code b} starting at {@code off}.
+     *
+     * @return the number of bytes read; {@code 0} when {@code len} is zero; {@code -1} when no
+     *     bytes remain
+     * @throws IndexOutOfBoundsException if {@code off} and {@code len} do not describe a range
+     *     inside {@code b}
+     */
+    @Override
+    public int read(byte[] b, int off, int len) {
+      if (off < 0 || len < 0 || len > b.length - off) {
+        throw new IndexOutOfBoundsException(
+            "off=" + off + ", len=" + len + ", b.length=" + b.length);
+      }
+      if (len == 0) {
+        // InputStream contract: a zero-length read returns 0, even at end of stream.
+        return 0;
+      }
+      if (!buffer.hasRemaining()) {
+        return -1;
+      }
+      int toRead = Math.min(len, buffer.remaining());
+      buffer.get(b, off, toRead);
+      return toRead;
+    }
+
+    /** Returns the exact number of unread bytes left in the buffer. */
+    @Override
+    public int available() {
+      return buffer.remaining();
+    }
+
+    /** Skips up to {@code n} bytes by moving the buffer position, without copying. */
+    @Override
+    public long skip(long n) {
+      if (n <= 0) {
+        return 0;
+      }
+      int toSkip = (int) Math.min(n, buffer.remaining());
+      buffer.position(buffer.position() + toSkip);
+      return toSkip;
+    }
+
+    /**
+     * Returns a view of the next {@code length} bytes that shares content with the wrapped
+     * buffer (no copy), then advances past them. The view has position 0 and limit {@code
+     * length}; because it shares content, it keeps the wrapped buffer's storage reachable.
+     *
+     * @throws IllegalArgumentException if {@code length} is negative or exceeds the remaining
+     *     bytes
+     */
+    public ByteBuffer read(int length) {
+      if (length < 0 || length > buffer.remaining()) {
+        throw new IllegalArgumentException("Invalid length for slicing: " + length);
+      }
+      ByteBuffer slicedBuffer = buffer.slice();
+      slicedBuffer.limit(length);
+
+      buffer.position(buffer.position() + length);
+      return slicedBuffer;
+    }
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
index fb276150956..034d184bd03 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
@@ -81,6 +81,7 @@ import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.apache.commons.io.FileUtils;
+import org.apache.tsfile.exception.write.PageException;
 import org.apache.tsfile.utils.FilePathUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -947,7 +948,7 @@ public class StorageEngine implements IService {
 
     try {
       loadTsFileManager.writeToDataRegion(dataRegion, pieceNode, uuid);
-    } catch (IOException e) {
+    } catch (IOException | PageException e) {
       LOGGER.warn(
           "IO error when writing piece node of TsFile {} to DataRegion {}.",
           pieceNode.getTsFile(),
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java
index 6d9b7fe2e1a..5876a884660 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java
@@ -56,6 +56,7 @@ import org.apache.iotdb.metrics.utils.MetricLevel;
 
 import org.apache.tsfile.common.constant.TsFileConstant;
 import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.exception.write.PageException;
 import org.apache.tsfile.file.metadata.ChunkGroupMetadata;
 import org.apache.tsfile.file.metadata.ChunkMetadata;
 import org.apache.tsfile.file.metadata.IDeviceID;
@@ -203,7 +204,7 @@ public class LoadTsFileManager {
   }
 
   public void writeToDataRegion(DataRegion dataRegion, LoadTsFilePieceNode 
pieceNode, String uuid)
-      throws IOException {
+      throws IOException, PageException {
     if (!uuid2WriterManager.containsKey(uuid)) {
       synchronized (uuid2CleanupTask) {
         final CleanupTask cleanupTask =
@@ -414,7 +415,8 @@ public class LoadTsFileManager {
      * BatchedAlignedChunkData, it may result in no data for the time column 
in the new file.
      */
     @SuppressWarnings("squid:S3824")
-    private void write(DataPartitionInfo partitionInfo, ChunkData chunkData) 
throws IOException {
+    private void write(DataPartitionInfo partitionInfo, ChunkData chunkData)
+        throws IOException, PageException {
       if (isClosed) {
         throw new 
IOException(String.format(MESSAGE_WRITER_MANAGER_HAS_BEEN_CLOSED, taskDir));
       }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/AlignedChunkData.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/AlignedChunkData.java
index ca7493d6ca0..ec6899d113b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/AlignedChunkData.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/AlignedChunkData.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.storageengine.load.splitter;
 
 import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
 import org.apache.iotdb.commons.utils.TimePartitionUtils;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFilePieceNode;
 
 import org.apache.tsfile.enums.TSDataType;
 import org.apache.tsfile.exception.write.PageException;
@@ -66,16 +67,15 @@ public class AlignedChunkData implements ChunkData {
   protected final IDeviceID device;
   protected List<ChunkHeader> chunkHeaderList;
 
-  protected final PublicBAOS byteStream;
-  protected final DataOutputStream stream;
+  protected PublicBAOS byteStream;
+  protected DataOutputStream stream;
   protected List<long[]> timeBatch;
   protected long dataSize;
   protected boolean needDecodeChunk;
   protected List<Integer> pageNumbers;
   protected Queue<Integer> satisfiedLengthQueue;
 
-  private AlignedChunkWriterImpl chunkWriter;
-  protected List<Chunk> chunkList;
+  protected ByteBuffer chunkData;
 
   public AlignedChunkData(
       @Nonnull final IDeviceID device,
@@ -143,14 +143,8 @@ public class AlignedChunkData implements ChunkData {
   }
 
   @Override
-  public void writeToFileWriter(final TsFileIOWriter writer) throws 
IOException {
-    if (chunkList != null) {
-      for (final Chunk chunk : chunkList) {
-        writer.writeChunk(chunk);
-      }
-    } else {
-      chunkWriter.writeToFileWriter(writer);
-    }
+  public void writeToFileWriter(final TsFileIOWriter writer) throws 
IOException, PageException {
+    writeTsFileData(writer);
   }
 
   public void addValueChunk(final ChunkHeader chunkHeader) {
@@ -167,6 +161,7 @@ public class AlignedChunkData implements ChunkData {
     ReadWriteIOUtils.write(getType().ordinal(), stream);
     ReadWriteIOUtils.write(isAligned(), stream);
     serializeAttr(stream);
+    ReadWriteIOUtils.write(byteStream.size(), stream);
     byteStream.writeTo(stream);
     close();
   }
@@ -291,26 +286,66 @@ public class AlignedChunkData implements ChunkData {
     }
   }
 
-  protected void deserializeTsFileData(final InputStream stream) throws IOException, PageException {
+  /**
+   * Writes the buffered chunk data to {@code writer}: when {@code needDecodeChunk} is set the
+   * chunks are rebuilt through a chunk writer, otherwise they are written as-is.
+   *
+   * <p>Reads from a duplicate of {@code chunkData}, so this call does not move the field's
+   * position.
+   */
+  protected void writeTsFileData(TsFileIOWriter writer) throws IOException, PageException {
+    final InputStream stream =
+        new LoadTsFilePieceNode.ByteBufferInputStream(chunkData.duplicate());
     if (needDecodeChunk) {
-      buildChunkWriter(stream);
+      writeChunkToWriter(stream, writer);
+    } else {
+      writeEntireChunkToWriter(stream, writer);
+    }
+  }
+
+  /**
+   * Reads a length-prefixed block of chunk bytes (an int size followed by that many bytes) into
+   * {@code chunkData} without decoding it.
+   *
+   * <p>For a {@link LoadTsFilePieceNode.ByteBufferInputStream} the bytes are kept as a zero-copy
+   * slice of the underlying buffer; any other stream is copied into a new array.
+   *
+   * @throws IOException if reading fails or, for a stream that is not a {@code
+   *     ByteBufferInputStream}, it ends before {@code size} bytes have been read
+   */
+  protected void deserializeTsFileDataByte(final InputStream stream) throws IOException {
+    final int size = ReadWriteIOUtils.readInt(stream);
+    if (stream instanceof LoadTsFilePieceNode.ByteBufferInputStream) {
+      this.chunkData = ((LoadTsFilePieceNode.ByteBufferInputStream) stream).read(size);
     } else {
-      deserializeEntireChunk(stream);
+      // A single InputStream.read may return fewer bytes than requested, so keep reading
+      // until the array is full or the stream ends.
+      final byte[] data = new byte[size];
+      int offset = 0;
+      while (offset < size) {
+        final int read = stream.read(data, offset, size - offset);
+        if (read < 0) {
+          throw new IOException("TsFileData byte array read error, size mismatch.");
+        }
+        offset += read;
+      }
+      this.chunkData = ByteBuffer.wrap(data);
     }
   }
 
-  private void deserializeEntireChunk(final InputStream stream) throws IOException {
-    chunkList = new ArrayList<>();
+  private void writeEntireChunkToWriter(final InputStream stream, final TsFileIOWriter writer)
+      throws IOException {
     for (final ChunkHeader chunkHeader : chunkHeaderList) {
       final ByteBuffer chunkData =
           ByteBuffer.wrap(ReadWriteIOUtils.readBytesWithSelfDescriptionLength(stream));
       final Statistics<? extends Serializable> statistics =
           Statistics.deserialize(stream, chunkHeader.getDataType());
-      chunkList.add(new Chunk(chunkHeader, chunkData, null, statistics));
+      writer.writeChunk(new Chunk(chunkHeader, chunkData, null, statistics));
     }
   }
 
-  protected void buildChunkWriter(final InputStream stream) throws IOException, PageException {
+  protected void writeChunkToWriter(final InputStream stream, final TsFileIOWriter writer)
+      throws IOException, PageException {
     final List<IMeasurementSchema> measurementSchemaList = new ArrayList<>();
     IMeasurementSchema timeSchema = null;
     for (final ChunkHeader chunkHeader : chunkHeaderList) {
@@ -330,17 +340,20 @@ public class AlignedChunkData implements ChunkData {
               chunkHeader.getEncodingType(),
               chunkHeader.getCompressionType()));
     }
-    chunkWriter = new AlignedChunkWriterImpl(timeSchema, 
measurementSchemaList);
+    AlignedChunkWriterImpl chunkWriter =
+        new AlignedChunkWriterImpl(timeSchema, measurementSchemaList);
     timeBatch = new ArrayList<>();
     final int chunkHeaderSize = chunkHeaderList.size();
     for (int i = 0; i < chunkHeaderSize; i++) {
-      buildChunk(stream, chunkHeaderList.get(i), pageNumbers.get(i), i - 1, i 
== 0);
+      buildChunk(chunkWriter, stream, chunkHeaderList.get(i), 
pageNumbers.get(i), i - 1, i == 0);
     }
     timeBatch = null;
+    chunkWriter.writeToFileWriter(writer);
   }
 
   @SuppressWarnings({"squid:S6541", "squid:S3776"})
   private void buildChunk(
+      final AlignedChunkWriterImpl chunkWriter,
       final InputStream stream,
       final ChunkHeader chunkHeader,
       final int pageNumber,
@@ -463,7 +476,7 @@ public class AlignedChunkData implements ChunkData {
     chunkData.needDecodeChunk = needDecodeChunk;
     chunkData.chunkHeaderList = chunkHeaderList;
     chunkData.pageNumbers = pageNumbers;
-    chunkData.deserializeTsFileData(stream);
+    chunkData.deserializeTsFileDataByte(stream);
     chunkData.dataSize = dataSize;
     chunkData.close();
     return chunkData;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/BatchedAlignedValueChunkData.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/BatchedAlignedValueChunkData.java
index 5a8e22aa8e2..be31a03d76a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/BatchedAlignedValueChunkData.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/BatchedAlignedValueChunkData.java
@@ -28,7 +28,6 @@ import org.apache.tsfile.file.header.ChunkHeader;
 import org.apache.tsfile.file.header.PageHeader;
 import org.apache.tsfile.file.metadata.IDeviceID;
 import org.apache.tsfile.file.metadata.statistics.Statistics;
-import org.apache.tsfile.read.common.Chunk;
 import org.apache.tsfile.utils.Binary;
 import org.apache.tsfile.utils.ReadWriteIOUtils;
 import org.apache.tsfile.utils.TsPrimitiveType;
@@ -41,8 +40,6 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.Serializable;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
 
 /**
  * This class is used to be compatible with the new distribution of aligned 
series in chunk group.
@@ -52,8 +49,6 @@ import java.util.List;
  */
 public class BatchedAlignedValueChunkData extends AlignedChunkData {
 
-  private List<ValueChunkWriter> valueChunkWriters;
-
   // Used for splitter
   public BatchedAlignedValueChunkData(AlignedChunkData alignedChunkData) {
     super(alignedChunkData);
@@ -62,7 +57,6 @@ public class BatchedAlignedValueChunkData extends 
AlignedChunkData {
   // Used for deserialize
   public BatchedAlignedValueChunkData(IDeviceID device, TTimePartitionSlot 
timePartitionSlot) {
     super(device, timePartitionSlot);
-    valueChunkWriters = new ArrayList<>();
   }
 
   @Override
@@ -130,7 +124,8 @@ public class BatchedAlignedValueChunkData extends 
AlignedChunkData {
   }
 
   @Override
-  protected void buildChunkWriter(final InputStream stream) throws 
IOException, PageException {
+  protected void writeChunkToWriter(final InputStream stream, final 
TsFileIOWriter writer)
+      throws IOException, PageException {
     for (int i = 0; i < chunkHeaderList.size(); i++) {
       ChunkHeader chunkHeader = chunkHeaderList.get(i);
       MeasurementSchema measurementSchema =
@@ -146,8 +141,8 @@ public class BatchedAlignedValueChunkData extends 
AlignedChunkData {
               measurementSchema.getType(),
               measurementSchema.getEncodingType(),
               measurementSchema.getValueEncoder());
-      valueChunkWriters.add(valueChunkWriter);
       buildValueChunkWriter(stream, chunkHeader, pageNumbers.get(i), 
valueChunkWriter);
+      valueChunkWriter.writeToFileWriter(writer);
     }
   }
 
@@ -226,17 +221,4 @@ public class BatchedAlignedValueChunkData extends 
AlignedChunkData {
       valueChunkWriter.sealCurrentPage();
     }
   }
-
-  @Override
-  public void writeToFileWriter(TsFileIOWriter writer) throws IOException {
-    if (chunkList != null) {
-      for (final Chunk chunk : chunkList) {
-        writer.writeChunk(chunk);
-      }
-    } else {
-      for (ValueChunkWriter valueChunkWriter : valueChunkWriters) {
-        valueChunkWriter.writeToFileWriter(writer);
-      }
-    }
-  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/ChunkData.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/ChunkData.java
index 1e96fac3868..7cc5db02995 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/ChunkData.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/ChunkData.java
@@ -48,7 +48,7 @@ public interface ChunkData extends TsFileData {
 
   void writeDecodePage(long[] times, Object[] values, int satisfiedLength) 
throws IOException;
 
-  void writeToFileWriter(TsFileIOWriter writer) throws IOException;
+  void writeToFileWriter(TsFileIOWriter writer) throws IOException, 
PageException;
 
   @Override
   default TsFileDataType getType() {

Reply via email to