This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new bd5d999148e release chunk or page memory when deserializing chunk or page (#12340)
bd5d999148e is described below

commit bd5d999148e3951335ed8b8992649e4efa59f0f1
Author: 周沛辰 <[email protected]>
AuthorDate: Fri Apr 19 10:50:11 2024 +0800

    release chunk or page memory when deserializing chunk or page (#12340)
---
 .../fast/AlignedSeriesCompactionExecutor.java      | 68 +++++-----------------
 .../fast/NonAlignedSeriesCompactionExecutor.java   | 21 ++-----
 .../executor/fast/element/AlignedPageElement.java  |  7 ++-
 .../fast/element/NonAlignedPageElement.java        |  3 +-
 .../fast/reader/CompactionChunkReader.java         | 33 ++++++++++-
 5 files changed, 60 insertions(+), 72 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/AlignedSeriesCompactionExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/AlignedSeriesCompactionExecutor.java
index 9cfd7cee948..f403fb9fc25 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/AlignedSeriesCompactionExecutor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/AlignedSeriesCompactionExecutor.java
@@ -36,7 +36,6 @@ import 
org.apache.iotdb.db.storageengine.dataregion.modification.Modification;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
 import org.apache.iotdb.db.utils.ModificationUtils;
 import org.apache.iotdb.tsfile.exception.write.PageException;
-import org.apache.iotdb.tsfile.file.MetaMarker;
 import org.apache.iotdb.tsfile.file.header.ChunkHeader;
 import org.apache.iotdb.tsfile.file.header.PageHeader;
 import org.apache.iotdb.tsfile.file.metadata.AlignedChunkMetadata;
@@ -265,88 +264,53 @@ public class AlignedSeriesCompactionExecutor extends 
SeriesCompactionExecutor {
   @SuppressWarnings("squid:S3776")
   void deserializeChunkIntoPageQueue(ChunkMetadataElement 
chunkMetadataElement) throws IOException {
     updateSummary(chunkMetadataElement, ChunkStatus.DESERIALIZE_CHUNK);
-    List<PageHeader> timePageHeaders = new ArrayList<>();
-    List<ByteBuffer> compressedTimePageDatas = new ArrayList<>();
-    List<List<PageHeader>> valuePageHeaders = new ArrayList<>();
-    List<List<ByteBuffer>> compressedValuePageDatas = new ArrayList<>();
 
     // deserialize time chunk
     Chunk timeChunk = chunkMetadataElement.chunk;
 
     CompactionChunkReader chunkReader = new CompactionChunkReader(timeChunk);
-    ByteBuffer chunkDataBuffer = timeChunk.getData();
-    ChunkHeader chunkHeader = timeChunk.getHeader();
-    while (chunkDataBuffer.remaining() > 0) {
-      // deserialize a PageHeader from chunkDataBuffer
-      PageHeader pageHeader;
-      if (((byte) (chunkHeader.getChunkType() & 0x3F)) == 
MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER) {
-        pageHeader = PageHeader.deserializeFrom(chunkDataBuffer, 
timeChunk.getChunkStatistic());
-      } else {
-        pageHeader = PageHeader.deserializeFrom(chunkDataBuffer, 
chunkHeader.getDataType());
-      }
-      ByteBuffer compressedPageData = 
chunkReader.readPageDataWithoutUncompressing(pageHeader);
-      timePageHeaders.add(pageHeader);
-      compressedTimePageDatas.add(compressedPageData);
-    }
+    List<Pair<PageHeader, ByteBuffer>> timePages = 
chunkReader.readPageDataWithoutUncompressing();
 
     // deserialize value chunks
+    List<List<Pair<PageHeader, ByteBuffer>>> valuePagesList = new 
ArrayList<>();
     List<Chunk> valueChunks = chunkMetadataElement.valueChunks;
     for (int i = 0; i < valueChunks.size(); i++) {
       Chunk valueChunk = valueChunks.get(i);
       if (valueChunk == null) {
         // value chunk has been deleted completely
-        valuePageHeaders.add(null);
-        compressedValuePageDatas.add(null);
+        valuePagesList.add(null);
         continue;
       }
+
       chunkReader = new CompactionChunkReader(valueChunk);
-      chunkDataBuffer = valueChunk.getData();
-      chunkHeader = valueChunk.getHeader();
-
-      valuePageHeaders.add(new ArrayList<>());
-      compressedValuePageDatas.add(new ArrayList<>());
-      while (chunkDataBuffer.remaining() > 0) {
-        // deserialize a PageHeader from chunkDataBuffer
-        PageHeader pageHeader;
-        if (((byte) (chunkHeader.getChunkType() & 0x3F)) == 
MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER) {
-          pageHeader = PageHeader.deserializeFrom(chunkDataBuffer, 
valueChunk.getChunkStatistic());
-        } else {
-          pageHeader = PageHeader.deserializeFrom(chunkDataBuffer, 
chunkHeader.getDataType());
-        }
-        if (pageHeader.getCompressedSize() == 0) {
-          // empty value page
-          valuePageHeaders.get(i).add(null);
-          compressedValuePageDatas.get(i).add(null);
-        } else {
-          ByteBuffer compressedPageData = 
chunkReader.readPageDataWithoutUncompressing(pageHeader);
-          valuePageHeaders.get(i).add(pageHeader);
-          compressedValuePageDatas.get(i).add(compressedPageData);
-        }
-      }
+      List<Pair<PageHeader, ByteBuffer>> valuesPages =
+          chunkReader.readPageDataWithoutUncompressing();
+      valuePagesList.add(valuesPages);
     }
 
     // add aligned pages into page queue
-    for (int i = 0; i < timePageHeaders.size(); i++) {
+    for (int i = 0; i < timePages.size(); i++) {
       List<PageHeader> alignedPageHeaders = new ArrayList<>();
       List<ByteBuffer> alignedPageDatas = new ArrayList<>();
-      for (int j = 0; j < valuePageHeaders.size(); j++) {
-        if (valuePageHeaders.get(j) == null) {
+      for (int j = 0; j < valuePagesList.size(); j++) {
+        if (valuePagesList.get(j) == null) {
           alignedPageHeaders.add(null);
           alignedPageDatas.add(null);
           continue;
         }
-        alignedPageHeaders.add(valuePageHeaders.get(j).get(i));
-        alignedPageDatas.add(compressedValuePageDatas.get(j).get(i));
+        Pair<PageHeader, ByteBuffer> valuePage = valuePagesList.get(j).get(i);
+        alignedPageHeaders.add(valuePage == null ? null : valuePage.left);
+        alignedPageDatas.add(valuePage == null ? null : valuePage.right);
       }
       pageQueue.add(
           new AlignedPageElement(
-              timePageHeaders.get(i),
+              timePages.get(i).left,
               alignedPageHeaders,
-              compressedTimePageDatas.get(i),
+              timePages.get(i).right,
               alignedPageDatas,
               new CompactionAlignedChunkReader(timeChunk, valueChunks),
               chunkMetadataElement,
-              i == timePageHeaders.size() - 1,
+              i == timePages.size() - 1,
               chunkMetadataElement.priority));
     }
     chunkMetadataElement.clearChunks();
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/NonAlignedSeriesCompactionExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/NonAlignedSeriesCompactionExecutor.java
index c164a5ae82e..bcf5e3c24e9 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/NonAlignedSeriesCompactionExecutor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/NonAlignedSeriesCompactionExecutor.java
@@ -34,7 +34,6 @@ import 
org.apache.iotdb.db.storageengine.dataregion.modification.Modification;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
 import org.apache.iotdb.db.utils.ModificationUtils;
 import org.apache.iotdb.tsfile.exception.write.PageException;
-import org.apache.iotdb.tsfile.file.MetaMarker;
 import org.apache.iotdb.tsfile.file.header.ChunkHeader;
 import org.apache.iotdb.tsfile.file.header.PageHeader;
 import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
@@ -172,23 +171,13 @@ public class NonAlignedSeriesCompactionExecutor extends 
SeriesCompactionExecutor
     updateSummary(chunkMetadataElement, ChunkStatus.DESERIALIZE_CHUNK);
     Chunk chunk = chunkMetadataElement.chunk;
     CompactionChunkReader chunkReader = new CompactionChunkReader(chunk);
-    ByteBuffer chunkDataBuffer = chunk.getData();
-    ChunkHeader chunkHeader = chunk.getHeader();
-    while (chunkDataBuffer.remaining() > 0) {
-      // deserialize a PageHeader from chunkDataBuffer
-      PageHeader pageHeader;
-      if (((byte) (chunkHeader.getChunkType() & 0x3F)) == 
MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER) {
-        pageHeader = PageHeader.deserializeFrom(chunkDataBuffer, 
chunk.getChunkStatistic());
-      } else {
-        pageHeader = PageHeader.deserializeFrom(chunkDataBuffer, 
chunkHeader.getDataType());
-      }
-      ByteBuffer compressedPageData = 
chunkReader.readPageDataWithoutUncompressing(pageHeader);
-
-      boolean isLastPage = chunkDataBuffer.remaining() <= 0;
+    List<Pair<PageHeader, ByteBuffer>> pages = 
chunkReader.readPageDataWithoutUncompressing();
+    for (int i = 0; i < pages.size(); i++) {
+      boolean isLastPage = i == pages.size() - 1;
       pageQueue.add(
           new NonAlignedPageElement(
-              pageHeader,
-              compressedPageData,
+              pages.get(i).left,
+              pages.get(i).right,
               chunkReader,
               chunkMetadataElement,
               isLastPage,
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/element/AlignedPageElement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/element/AlignedPageElement.java
index 83b03cd095f..e1b5c2e49f6 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/element/AlignedPageElement.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/element/AlignedPageElement.java
@@ -33,9 +33,9 @@ public class AlignedPageElement extends PageElement {
   private final List<PageHeader> valuePageHeaders;
 
   // compressed page data
-  private final ByteBuffer timePageData;
+  private ByteBuffer timePageData;
 
-  private final List<ByteBuffer> valuePageDataList;
+  private List<ByteBuffer> valuePageDataList;
 
   private final CompactionAlignedChunkReader chunkReader;
 
@@ -64,6 +64,9 @@ public class AlignedPageElement extends PageElement {
     pointReader =
         chunkReader.getPagePointReader(
             timePageHeader, valuePageHeaders, timePageData, valuePageDataList);
+    // friendly for gc
+    timePageData = null;
+    valuePageDataList = null;
   }
 
   @Override
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/element/NonAlignedPageElement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/element/NonAlignedPageElement.java
index 315c40c2459..81b0b6afa0b 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/element/NonAlignedPageElement.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/element/NonAlignedPageElement.java
@@ -31,7 +31,7 @@ public class NonAlignedPageElement extends PageElement {
   private final PageHeader pageHeader;
 
   // compressed page data
-  private final ByteBuffer pageData;
+  private ByteBuffer pageData;
 
   private final CompactionChunkReader chunkReader;
 
@@ -52,6 +52,7 @@ public class NonAlignedPageElement extends PageElement {
   public void deserializePage() throws IOException {
     TsBlock batchData = chunkReader.readPageData(pageHeader, pageData);
     this.pointReader = batchData.getTsBlockSingleColumnIterator();
+    pageData = null;
   }
 
   @Override
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/reader/CompactionChunkReader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/reader/CompactionChunkReader.java
index 26f3f9b92e6..67268166ae4 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/reader/CompactionChunkReader.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/reader/CompactionChunkReader.java
@@ -22,17 +22,21 @@ package 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.ex
 import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
 import org.apache.iotdb.tsfile.compress.IUnCompressor;
 import org.apache.iotdb.tsfile.encoding.decoder.Decoder;
+import org.apache.iotdb.tsfile.file.MetaMarker;
 import org.apache.iotdb.tsfile.file.header.ChunkHeader;
 import org.apache.iotdb.tsfile.file.header.PageHeader;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
 import org.apache.iotdb.tsfile.read.common.Chunk;
 import org.apache.iotdb.tsfile.read.common.TimeRange;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.reader.page.PageReader;
+import org.apache.iotdb.tsfile.utils.Pair;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.List;
 
 import static 
org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader.readCompressedPageData;
@@ -41,13 +45,15 @@ import static 
org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader.uncompressPa
 public class CompactionChunkReader {
 
   private final ChunkHeader chunkHeader;
-  private final ByteBuffer chunkDataBuffer;
+  private ByteBuffer chunkDataBuffer;
   private final IUnCompressor unCompressor;
   private final Decoder timeDecoder =
       Decoder.getDecoderByType(
           
TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder()),
           TSDataType.INT64);
 
+  private final Statistics chunkStatistic;
+
   // A list of deleted intervals.
   private final List<TimeRange> deleteIntervalList;
 
@@ -60,6 +66,7 @@ public class CompactionChunkReader {
     this.chunkDataBuffer = chunk.getData();
     this.unCompressor = 
IUnCompressor.getUnCompressor(chunkHeader.getCompressionType());
     this.deleteIntervalList = chunk.getDeleteIntervalList();
+    this.chunkStatistic = chunk.getChunkStatistic();
   }
 
   /**
@@ -72,6 +79,30 @@ public class CompactionChunkReader {
     return readCompressedPageData(pageHeader, chunkDataBuffer);
   }
 
+  public List<Pair<PageHeader, ByteBuffer>> readPageDataWithoutUncompressing() 
throws IOException {
+    List<Pair<PageHeader, ByteBuffer>> pages = new ArrayList<>();
+    while (chunkDataBuffer.remaining() > 0) {
+      // deserialize a PageHeader from chunkDataBuffer
+      PageHeader pageHeader;
+      if (((byte) (chunkHeader.getChunkType() & 0x3F)) == 
MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER) {
+        pageHeader = PageHeader.deserializeFrom(chunkDataBuffer, 
chunkStatistic);
+      } else {
+        pageHeader = PageHeader.deserializeFrom(chunkDataBuffer, 
chunkHeader.getDataType());
+      }
+      if (pageHeader.getCompressedSize() == 0) {
+        // empty value page
+        pages.add(null);
+      } else {
+        ByteBuffer compressedPageData = readCompressedPageData(pageHeader, 
chunkDataBuffer);
+        Pair<PageHeader, ByteBuffer> page = new Pair<>(pageHeader, 
compressedPageData);
+        pages.add(page);
+      }
+    }
+    // clear chunk data to release memory
+    chunkDataBuffer = null;
+    return pages;
+  }
+
   /**
    * Read data from compressed page data. Uncompress the page and decode it to 
batch data.
    *

Reply via email to