This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new bd5d999148e release chunk or page memory when deserializing chunk or page (#12340)
bd5d999148e is described below
commit bd5d999148e3951335ed8b8992649e4efa59f0f1
Author: 周沛辰 <[email protected]>
AuthorDate: Fri Apr 19 10:50:11 2024 +0800
release chunk or page memory when deserializing chunk or page (#12340)
---
.../fast/AlignedSeriesCompactionExecutor.java | 68 +++++-----------------
.../fast/NonAlignedSeriesCompactionExecutor.java | 21 ++-----
.../executor/fast/element/AlignedPageElement.java | 7 ++-
.../fast/element/NonAlignedPageElement.java | 3 +-
.../fast/reader/CompactionChunkReader.java | 33 ++++++++++-
5 files changed, 60 insertions(+), 72 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/AlignedSeriesCompactionExecutor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/AlignedSeriesCompactionExecutor.java
index 9cfd7cee948..f403fb9fc25 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/AlignedSeriesCompactionExecutor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/AlignedSeriesCompactionExecutor.java
@@ -36,7 +36,6 @@ import
org.apache.iotdb.db.storageengine.dataregion.modification.Modification;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import org.apache.iotdb.db.utils.ModificationUtils;
import org.apache.iotdb.tsfile.exception.write.PageException;
-import org.apache.iotdb.tsfile.file.MetaMarker;
import org.apache.iotdb.tsfile.file.header.ChunkHeader;
import org.apache.iotdb.tsfile.file.header.PageHeader;
import org.apache.iotdb.tsfile.file.metadata.AlignedChunkMetadata;
@@ -265,88 +264,53 @@ public class AlignedSeriesCompactionExecutor extends
SeriesCompactionExecutor {
@SuppressWarnings("squid:S3776")
void deserializeChunkIntoPageQueue(ChunkMetadataElement
chunkMetadataElement) throws IOException {
updateSummary(chunkMetadataElement, ChunkStatus.DESERIALIZE_CHUNK);
- List<PageHeader> timePageHeaders = new ArrayList<>();
- List<ByteBuffer> compressedTimePageDatas = new ArrayList<>();
- List<List<PageHeader>> valuePageHeaders = new ArrayList<>();
- List<List<ByteBuffer>> compressedValuePageDatas = new ArrayList<>();
// deserialize time chunk
Chunk timeChunk = chunkMetadataElement.chunk;
CompactionChunkReader chunkReader = new CompactionChunkReader(timeChunk);
- ByteBuffer chunkDataBuffer = timeChunk.getData();
- ChunkHeader chunkHeader = timeChunk.getHeader();
- while (chunkDataBuffer.remaining() > 0) {
- // deserialize a PageHeader from chunkDataBuffer
- PageHeader pageHeader;
- if (((byte) (chunkHeader.getChunkType() & 0x3F)) ==
MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER) {
- pageHeader = PageHeader.deserializeFrom(chunkDataBuffer,
timeChunk.getChunkStatistic());
- } else {
- pageHeader = PageHeader.deserializeFrom(chunkDataBuffer,
chunkHeader.getDataType());
- }
- ByteBuffer compressedPageData =
chunkReader.readPageDataWithoutUncompressing(pageHeader);
- timePageHeaders.add(pageHeader);
- compressedTimePageDatas.add(compressedPageData);
- }
+ List<Pair<PageHeader, ByteBuffer>> timePages =
chunkReader.readPageDataWithoutUncompressing();
// deserialize value chunks
+ List<List<Pair<PageHeader, ByteBuffer>>> valuePagesList = new
ArrayList<>();
List<Chunk> valueChunks = chunkMetadataElement.valueChunks;
for (int i = 0; i < valueChunks.size(); i++) {
Chunk valueChunk = valueChunks.get(i);
if (valueChunk == null) {
// value chunk has been deleted completely
- valuePageHeaders.add(null);
- compressedValuePageDatas.add(null);
+ valuePagesList.add(null);
continue;
}
+
chunkReader = new CompactionChunkReader(valueChunk);
- chunkDataBuffer = valueChunk.getData();
- chunkHeader = valueChunk.getHeader();
-
- valuePageHeaders.add(new ArrayList<>());
- compressedValuePageDatas.add(new ArrayList<>());
- while (chunkDataBuffer.remaining() > 0) {
- // deserialize a PageHeader from chunkDataBuffer
- PageHeader pageHeader;
- if (((byte) (chunkHeader.getChunkType() & 0x3F)) ==
MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER) {
- pageHeader = PageHeader.deserializeFrom(chunkDataBuffer,
valueChunk.getChunkStatistic());
- } else {
- pageHeader = PageHeader.deserializeFrom(chunkDataBuffer,
chunkHeader.getDataType());
- }
- if (pageHeader.getCompressedSize() == 0) {
- // empty value page
- valuePageHeaders.get(i).add(null);
- compressedValuePageDatas.get(i).add(null);
- } else {
- ByteBuffer compressedPageData =
chunkReader.readPageDataWithoutUncompressing(pageHeader);
- valuePageHeaders.get(i).add(pageHeader);
- compressedValuePageDatas.get(i).add(compressedPageData);
- }
- }
+ List<Pair<PageHeader, ByteBuffer>> valuesPages =
+ chunkReader.readPageDataWithoutUncompressing();
+ valuePagesList.add(valuesPages);
}
// add aligned pages into page queue
- for (int i = 0; i < timePageHeaders.size(); i++) {
+ for (int i = 0; i < timePages.size(); i++) {
List<PageHeader> alignedPageHeaders = new ArrayList<>();
List<ByteBuffer> alignedPageDatas = new ArrayList<>();
- for (int j = 0; j < valuePageHeaders.size(); j++) {
- if (valuePageHeaders.get(j) == null) {
+ for (int j = 0; j < valuePagesList.size(); j++) {
+ if (valuePagesList.get(j) == null) {
alignedPageHeaders.add(null);
alignedPageDatas.add(null);
continue;
}
- alignedPageHeaders.add(valuePageHeaders.get(j).get(i));
- alignedPageDatas.add(compressedValuePageDatas.get(j).get(i));
+ Pair<PageHeader, ByteBuffer> valuePage = valuePagesList.get(j).get(i);
+ alignedPageHeaders.add(valuePage == null ? null : valuePage.left);
+ alignedPageDatas.add(valuePage == null ? null : valuePage.right);
}
pageQueue.add(
new AlignedPageElement(
- timePageHeaders.get(i),
+ timePages.get(i).left,
alignedPageHeaders,
- compressedTimePageDatas.get(i),
+ timePages.get(i).right,
alignedPageDatas,
new CompactionAlignedChunkReader(timeChunk, valueChunks),
chunkMetadataElement,
- i == timePageHeaders.size() - 1,
+ i == timePages.size() - 1,
chunkMetadataElement.priority));
}
chunkMetadataElement.clearChunks();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/NonAlignedSeriesCompactionExecutor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/NonAlignedSeriesCompactionExecutor.java
index c164a5ae82e..bcf5e3c24e9 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/NonAlignedSeriesCompactionExecutor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/NonAlignedSeriesCompactionExecutor.java
@@ -34,7 +34,6 @@ import
org.apache.iotdb.db.storageengine.dataregion.modification.Modification;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import org.apache.iotdb.db.utils.ModificationUtils;
import org.apache.iotdb.tsfile.exception.write.PageException;
-import org.apache.iotdb.tsfile.file.MetaMarker;
import org.apache.iotdb.tsfile.file.header.ChunkHeader;
import org.apache.iotdb.tsfile.file.header.PageHeader;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
@@ -172,23 +171,13 @@ public class NonAlignedSeriesCompactionExecutor extends
SeriesCompactionExecutor
updateSummary(chunkMetadataElement, ChunkStatus.DESERIALIZE_CHUNK);
Chunk chunk = chunkMetadataElement.chunk;
CompactionChunkReader chunkReader = new CompactionChunkReader(chunk);
- ByteBuffer chunkDataBuffer = chunk.getData();
- ChunkHeader chunkHeader = chunk.getHeader();
- while (chunkDataBuffer.remaining() > 0) {
- // deserialize a PageHeader from chunkDataBuffer
- PageHeader pageHeader;
- if (((byte) (chunkHeader.getChunkType() & 0x3F)) ==
MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER) {
- pageHeader = PageHeader.deserializeFrom(chunkDataBuffer,
chunk.getChunkStatistic());
- } else {
- pageHeader = PageHeader.deserializeFrom(chunkDataBuffer,
chunkHeader.getDataType());
- }
- ByteBuffer compressedPageData =
chunkReader.readPageDataWithoutUncompressing(pageHeader);
-
- boolean isLastPage = chunkDataBuffer.remaining() <= 0;
+ List<Pair<PageHeader, ByteBuffer>> pages =
chunkReader.readPageDataWithoutUncompressing();
+ for (int i = 0; i < pages.size(); i++) {
+ boolean isLastPage = i == pages.size() - 1;
pageQueue.add(
new NonAlignedPageElement(
- pageHeader,
- compressedPageData,
+ pages.get(i).left,
+ pages.get(i).right,
chunkReader,
chunkMetadataElement,
isLastPage,
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/element/AlignedPageElement.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/element/AlignedPageElement.java
index 83b03cd095f..e1b5c2e49f6 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/element/AlignedPageElement.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/element/AlignedPageElement.java
@@ -33,9 +33,9 @@ public class AlignedPageElement extends PageElement {
private final List<PageHeader> valuePageHeaders;
// compressed page data
- private final ByteBuffer timePageData;
+ private ByteBuffer timePageData;
- private final List<ByteBuffer> valuePageDataList;
+ private List<ByteBuffer> valuePageDataList;
private final CompactionAlignedChunkReader chunkReader;
@@ -64,6 +64,9 @@ public class AlignedPageElement extends PageElement {
pointReader =
chunkReader.getPagePointReader(
timePageHeader, valuePageHeaders, timePageData, valuePageDataList);
+ // friendly for gc
+ timePageData = null;
+ valuePageDataList = null;
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/element/NonAlignedPageElement.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/element/NonAlignedPageElement.java
index 315c40c2459..81b0b6afa0b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/element/NonAlignedPageElement.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/element/NonAlignedPageElement.java
@@ -31,7 +31,7 @@ public class NonAlignedPageElement extends PageElement {
private final PageHeader pageHeader;
// compressed page data
- private final ByteBuffer pageData;
+ private ByteBuffer pageData;
private final CompactionChunkReader chunkReader;
@@ -52,6 +52,7 @@ public class NonAlignedPageElement extends PageElement {
public void deserializePage() throws IOException {
TsBlock batchData = chunkReader.readPageData(pageHeader, pageData);
this.pointReader = batchData.getTsBlockSingleColumnIterator();
+ pageData = null;
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/reader/CompactionChunkReader.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/reader/CompactionChunkReader.java
index 26f3f9b92e6..67268166ae4 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/reader/CompactionChunkReader.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/reader/CompactionChunkReader.java
@@ -22,17 +22,21 @@ package
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.ex
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.compress.IUnCompressor;
import org.apache.iotdb.tsfile.encoding.decoder.Decoder;
+import org.apache.iotdb.tsfile.file.MetaMarker;
import org.apache.iotdb.tsfile.file.header.ChunkHeader;
import org.apache.iotdb.tsfile.file.header.PageHeader;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.read.common.Chunk;
import org.apache.iotdb.tsfile.read.common.TimeRange;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.read.reader.page.PageReader;
+import org.apache.iotdb.tsfile.utils.Pair;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.List;
import static
org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader.readCompressedPageData;
@@ -41,13 +45,15 @@ import static
org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader.uncompressPa
public class CompactionChunkReader {
private final ChunkHeader chunkHeader;
- private final ByteBuffer chunkDataBuffer;
+ private ByteBuffer chunkDataBuffer;
private final IUnCompressor unCompressor;
private final Decoder timeDecoder =
Decoder.getDecoderByType(
TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder()),
TSDataType.INT64);
+ private final Statistics chunkStatistic;
+
// A list of deleted intervals.
private final List<TimeRange> deleteIntervalList;
@@ -60,6 +66,7 @@ public class CompactionChunkReader {
this.chunkDataBuffer = chunk.getData();
this.unCompressor =
IUnCompressor.getUnCompressor(chunkHeader.getCompressionType());
this.deleteIntervalList = chunk.getDeleteIntervalList();
+ this.chunkStatistic = chunk.getChunkStatistic();
}
/**
@@ -72,6 +79,30 @@ public class CompactionChunkReader {
return readCompressedPageData(pageHeader, chunkDataBuffer);
}
+ public List<Pair<PageHeader, ByteBuffer>> readPageDataWithoutUncompressing()
throws IOException {
+ List<Pair<PageHeader, ByteBuffer>> pages = new ArrayList<>();
+ while (chunkDataBuffer.remaining() > 0) {
+ // deserialize a PageHeader from chunkDataBuffer
+ PageHeader pageHeader;
+ if (((byte) (chunkHeader.getChunkType() & 0x3F)) ==
MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER) {
+ pageHeader = PageHeader.deserializeFrom(chunkDataBuffer,
chunkStatistic);
+ } else {
+ pageHeader = PageHeader.deserializeFrom(chunkDataBuffer,
chunkHeader.getDataType());
+ }
+ if (pageHeader.getCompressedSize() == 0) {
+ // empty value page
+ pages.add(null);
+ } else {
+ ByteBuffer compressedPageData = readCompressedPageData(pageHeader,
chunkDataBuffer);
+ Pair<PageHeader, ByteBuffer> page = new Pair<>(pageHeader,
compressedPageData);
+ pages.add(page);
+ }
+ }
+ // clear chunk data to release memory
+ chunkDataBuffer = null;
+ return pages;
+ }
+
/**
* Read data from compressed page data. Uncompress the page and decode it to
batch data.
*