This is an automated email from the ASF dual-hosted git repository.
jt2594838 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 59b79efb17e Pipe: account page decode memory in scan parser (#17807)
59b79efb17e is described below
commit 59b79efb17efcbef4733cab107a15c58f18968ac
Author: Caideyipi <[email protected]>
AuthorDate: Wed Jun 3 15:13:46 2026 +0800
Pipe: account page decode memory in scan parser (#17807)
* Pipe: account page decode memory in scan parser
* Fix pipe scan parser single page row count
* Fix pipe scan parser page memory test
---
.../scan/AlignedSinglePageWholeChunkReader.java | 55 +++++-
.../parser/scan/MemoryControlledChunkReader.java | 76 ++++++++
.../parser/scan/SinglePageWholeChunkReader.java | 193 +++++++++++++++++-
.../scan/TsFileInsertionEventScanParser.java | 53 +++--
.../pipe/event/TsFileInsertionEventParserTest.java | 215 +++++++++++++++++++++
5 files changed, 568 insertions(+), 24 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/AlignedSinglePageWholeChunkReader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/AlignedSinglePageWholeChunkReader.java
index ec6ed5f2da9..f0135f84cb7 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/AlignedSinglePageWholeChunkReader.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/AlignedSinglePageWholeChunkReader.java
@@ -39,6 +39,7 @@ import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
+import java.util.Objects;
import java.util.function.LongConsumer;
/**
@@ -71,7 +72,7 @@ public class AlignedSinglePageWholeChunkReader extends AbstractChunkReader
this.timeChunkDataBuffer = timeChunk.getData();
this.encryptParam = timeChunk.getEncryptParam();
this.pageEstimatedMemoryUsageInBytes =
- calculatePageEstimatedMemoryUsageInBytes(timeChunk, valueChunkList);
+ calculateMaxPageEstimatedMemoryUsageInBytesWithBatchData(timeChunk,
valueChunkList);
valueChunkList.forEach(
chunk -> {
@@ -216,4 +217,56 @@ public class AlignedSinglePageWholeChunkReader extends AbstractChunkReader
return estimatedMemoryUsageInBytes;
}
+
+ public static long calculateMaxPageEstimatedMemoryUsageInBytesWithBatchData(
+ final Chunk timeChunk, final List<Chunk> valueChunkList) throws
IOException {
+ final List<Long> pageEstimatedMemoryUsageInBytesList =
+ calculatePageEstimatedMemoryUsageInBytesWithBatchDataList(timeChunk,
valueChunkList);
+ return pageEstimatedMemoryUsageInBytesList.isEmpty()
+ ? 0
+ : pageEstimatedMemoryUsageInBytesList.get(0);
+ }
+
+ public static List<Long>
calculatePageEstimatedMemoryUsageInBytesWithBatchDataList(
+ final Chunk timeChunk, final List<Chunk> valueChunkList) throws
IOException {
+ final ByteBuffer timeChunkDataBuffer = timeChunk.getData().duplicate();
+ final List<ByteBuffer> valueChunkDataBufferList = new
ArrayList<>(valueChunkList.size());
+ for (final Chunk valueChunk : valueChunkList) {
+ valueChunkDataBufferList.add(
+ Objects.isNull(valueChunk) ? null :
valueChunk.getData().duplicate());
+ }
+
+ final List<Long> pageEstimatedMemoryUsageInBytesList = new ArrayList<>();
+ while (timeChunkDataBuffer.remaining() > 0) {
+ long pageUncompressedSizeInBytes = 0;
+ final PageHeader timePageHeader =
+ SinglePageWholeChunkReader.deserializePageHeader(
+ timeChunkDataBuffer, timeChunk.getHeader());
+ pageUncompressedSizeInBytes += timePageHeader.getUncompressedSize();
+ SinglePageWholeChunkReader.skipCompressedPageData(timeChunkDataBuffer,
timePageHeader);
+
+ final List<TSDataType> valueDataTypeList = new
ArrayList<>(valueChunkList.size());
+ for (int i = 0; i < valueChunkList.size(); ++i) {
+ final Chunk valueChunk = valueChunkList.get(i);
+ final ByteBuffer valueChunkDataBuffer =
valueChunkDataBufferList.get(i);
+ if (Objects.isNull(valueChunk) ||
Objects.isNull(valueChunkDataBuffer)) {
+ valueDataTypeList.add(null);
+ continue;
+ }
+
+ final PageHeader valuePageHeader =
+ SinglePageWholeChunkReader.deserializePageHeader(
+ valueChunkDataBuffer, valueChunk.getHeader());
+ pageUncompressedSizeInBytes += valuePageHeader.getUncompressedSize();
+ valueDataTypeList.add(valueChunk.getHeader().getDataType());
+ SinglePageWholeChunkReader.skipCompressedPageData(valueChunkDataBuffer, valuePageHeader);
+ }
+ pageEstimatedMemoryUsageInBytesList.add(
+ SinglePageWholeChunkReader.estimatePageMemoryUsageInBytesWithBatchData(
+ pageUncompressedSizeInBytes,
+ SinglePageWholeChunkReader.getPageRowCount(timePageHeader,
timeChunk),
+ valueDataTypeList));
+ }
+ return
SinglePageWholeChunkReader.toSuffixMaxList(pageEstimatedMemoryUsageInBytesList);
+ }
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/MemoryControlledChunkReader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/MemoryControlledChunkReader.java
new file mode 100644
index 00000000000..105bd5e8e33
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/MemoryControlledChunkReader.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.event.common.tsfile.parser.scan;
+
+import org.apache.tsfile.read.common.BatchData;
+import org.apache.tsfile.read.reader.IChunkReader;
+import org.apache.tsfile.read.reader.IPageReader;
+
+import java.io.IOException;
+import java.util.List;
+
+class MemoryControlledChunkReader implements IChunkReader,
EstimatedMemoryChunkReader {
+
+ private final IChunkReader delegate;
+ private final List<Long> pageEstimatedMemoryUsageInBytesList;
+ private int pageIndex;
+
+ MemoryControlledChunkReader(
+ final IChunkReader delegate, final List<Long>
pageEstimatedMemoryUsageInBytesList) {
+ this.delegate = delegate;
+ this.pageEstimatedMemoryUsageInBytesList =
pageEstimatedMemoryUsageInBytesList;
+ }
+
+ @Override
+ public long getCurrentPageEstimatedMemoryUsageInBytes() {
+ return pageIndex < pageEstimatedMemoryUsageInBytesList.size()
+ ? pageEstimatedMemoryUsageInBytesList.get(pageIndex)
+ : 0;
+ }
+
+ @Override
+ public boolean hasNextSatisfiedPage() throws IOException {
+ return delegate.hasNextSatisfiedPage();
+ }
+
+ @Override
+ public BatchData nextPageData() throws IOException {
+ try {
+ return delegate.nextPageData();
+ } finally {
+ ++pageIndex;
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ delegate.close();
+ }
+
+ @Override
+ public void markDataTypeModifiedAndCannotUseStatistics() {
+ delegate.markDataTypeModifiedAndCannotUseStatistics();
+ }
+
+ @Override
+ public List<IPageReader> loadPageReaderList() throws IOException {
+ return delegate.loadPageReaderList();
+ }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/SinglePageWholeChunkReader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/SinglePageWholeChunkReader.java
index 2bfa70f1dd0..4d8b35bac4a 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/SinglePageWholeChunkReader.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/SinglePageWholeChunkReader.java
@@ -25,6 +25,8 @@ import org.apache.tsfile.compress.IUnCompressor;
import org.apache.tsfile.encoding.decoder.Decoder;
import org.apache.tsfile.encrypt.EncryptParameter;
import org.apache.tsfile.encrypt.IDecryptor;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.MetaMarker;
import org.apache.tsfile.file.header.ChunkHeader;
import org.apache.tsfile.file.header.PageHeader;
import org.apache.tsfile.file.metadata.enums.EncryptionType;
@@ -33,10 +35,15 @@ import org.apache.tsfile.read.common.Chunk;
import org.apache.tsfile.read.reader.chunk.AbstractChunkReader;
import org.apache.tsfile.read.reader.page.LazyLoadPageData;
import org.apache.tsfile.read.reader.page.PageReader;
+import org.apache.tsfile.utils.RamUsageEstimator;
import java.io.IOException;
import java.io.Serializable;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
import static
org.apache.tsfile.file.metadata.enums.CompressionType.UNCOMPRESSED;
@@ -53,7 +60,8 @@ public class SinglePageWholeChunkReader extends AbstractChunkReader
this.chunkHeader = chunk.getHeader();
this.chunkDataBuffer = chunk.getData();
this.encryptParam = chunk.getEncryptParam();
- this.pageEstimatedMemoryUsageInBytes =
calculatePageEstimatedMemoryUsageInBytes(chunk);
+ this.pageEstimatedMemoryUsageInBytes =
+ calculateMaxPageEstimatedMemoryUsageInBytesWithBatchData(chunk);
initAllPageReaders();
}
@@ -91,11 +99,190 @@ public class SinglePageWholeChunkReader extends AbstractChunkReader
public static long calculatePageEstimatedMemoryUsageInBytes(final Chunk
chunk)
throws IOException {
final ByteBuffer chunkDataBuffer = chunk.getData().duplicate();
- final PageHeader pageHeader =
- PageHeader.deserializeFrom(chunkDataBuffer, (Statistics<? extends
Serializable>) null);
+ final PageHeader pageHeader = deserializePageHeader(chunkDataBuffer,
chunk.getHeader());
return pageHeader.getUncompressedSize();
}
+ public static long calculateMaxPageEstimatedMemoryUsageInBytes(final Chunk
chunk)
+ throws IOException {
+ final ByteBuffer chunkDataBuffer = chunk.getData().duplicate();
+ long maxPageEstimatedMemoryUsageInBytes = 0;
+ while (chunkDataBuffer.remaining() > 0) {
+ final PageHeader pageHeader = deserializePageHeader(chunkDataBuffer,
chunk.getHeader());
+ maxPageEstimatedMemoryUsageInBytes =
+ Math.max(maxPageEstimatedMemoryUsageInBytes,
pageHeader.getUncompressedSize());
+ skipCompressedPageData(chunkDataBuffer, pageHeader);
+ }
+ return maxPageEstimatedMemoryUsageInBytes;
+ }
+
+ public static long
calculateMaxPageEstimatedMemoryUsageInBytesWithBatchData(final Chunk chunk)
+ throws IOException {
+ final List<Long> pageEstimatedMemoryUsageInBytesList =
+ calculatePageEstimatedMemoryUsageInBytesWithBatchDataList(chunk);
+ return pageEstimatedMemoryUsageInBytesList.isEmpty()
+ ? 0
+ : pageEstimatedMemoryUsageInBytesList.get(0);
+ }
+
+ public static List<Long>
calculatePageEstimatedMemoryUsageInBytesWithBatchDataList(
+ final Chunk chunk) throws IOException {
+ final ByteBuffer chunkDataBuffer = chunk.getData().duplicate();
+ final List<Long> pageEstimatedMemoryUsageInBytesList = new ArrayList<>();
+ while (chunkDataBuffer.remaining() > 0) {
+ final PageHeader pageHeader = deserializePageHeader(chunkDataBuffer,
chunk.getHeader());
+ pageEstimatedMemoryUsageInBytesList.add(
+ estimatePageMemoryUsageInBytesWithBatchData(
+ pageHeader, chunk,
Collections.singletonList(chunk.getHeader().getDataType())));
+ skipCompressedPageData(chunkDataBuffer, pageHeader);
+ }
+ return toSuffixMaxList(pageEstimatedMemoryUsageInBytesList);
+ }
+
+ static PageHeader deserializePageHeader(
+ final ByteBuffer chunkDataBuffer, final ChunkHeader chunkHeader) throws
IOException {
+ return isSinglePageChunk(chunkHeader)
+ ? PageHeader.deserializeFrom(chunkDataBuffer, (Statistics<? extends
Serializable>) null)
+ : PageHeader.deserializeFrom(chunkDataBuffer,
chunkHeader.getDataType());
+ }
+
+ static boolean isSinglePageChunk(final ChunkHeader chunkHeader) {
+ return (chunkHeader.getChunkType() & 0x3F) ==
MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER;
+ }
+
+ static void skipCompressedPageData(
+ final ByteBuffer chunkDataBuffer, final PageHeader pageHeader) {
+ chunkDataBuffer.position(chunkDataBuffer.position() +
pageHeader.getCompressedSize());
+ }
+
+ static List<Long> toSuffixMaxList(final List<Long>
pageEstimatedMemoryUsageInBytesList) {
+ long suffixMaxPageEstimatedMemoryUsageInBytes = 0;
+ for (int i = pageEstimatedMemoryUsageInBytesList.size() - 1; i >= 0; --i) {
+ suffixMaxPageEstimatedMemoryUsageInBytes =
+ Math.max(
+ suffixMaxPageEstimatedMemoryUsageInBytes,
pageEstimatedMemoryUsageInBytesList.get(i));
+ pageEstimatedMemoryUsageInBytesList.set(i,
suffixMaxPageEstimatedMemoryUsageInBytes);
+ }
+ return pageEstimatedMemoryUsageInBytesList;
+ }
+
+ static long estimatePageMemoryUsageInBytesWithBatchData(
+ final PageHeader timePageHeader,
+ final Chunk timeChunk,
+ final List<TSDataType> valueDataTypeList) {
+ return estimatePageMemoryUsageInBytesWithBatchData(
+ timePageHeader.getUncompressedSize(),
+ getPageRowCount(timePageHeader, timeChunk),
+ valueDataTypeList);
+ }
+
+ static int getPageRowCount(final PageHeader pageHeader, final Chunk chunk) {
+ if (isSinglePageChunk(chunk.getHeader())) {
+ return Objects.isNull(chunk.getChunkStatistic())
+ ? 0
+ : saturateToInt(chunk.getChunkStatistic().getCount());
+ }
+ return saturateToInt(pageHeader.getNumOfValues());
+ }
+
+ private static int saturateToInt(final long value) {
+ return (int) Math.min(Integer.MAX_VALUE, value);
+ }
+
+ static long estimatePageMemoryUsageInBytesWithBatchData(
+ final long pageUncompressedSizeInBytes,
+ final int rowCount,
+ final List<TSDataType> valueDataTypeList) {
+ return pageUncompressedSizeInBytes
+ + estimateBatchDataMemoryUsageInBytes(rowCount, valueDataTypeList);
+ }
+
+ private static long estimateBatchDataMemoryUsageInBytes(
+ final int rowCount, final List<TSDataType> valueDataTypeList) {
+ final int valueCount = valueDataTypeList.size();
+ final long segmentCount = Math.max(1, (rowCount + 15L) / 16);
+ long estimatedMemoryUsageInBytes = RamUsageEstimator.sizeOfLongArray(16) *
segmentCount;
+
+ if (valueCount == 1) {
+ estimatedMemoryUsageInBytes +=
+ estimateSingleValueArrayMemoryUsageInBytes(rowCount,
valueDataTypeList.get(0));
+ } else if (valueCount > 1) {
+ estimatedMemoryUsageInBytes += RamUsageEstimator.sizeOfObjectArray(16) *
segmentCount;
+ estimatedMemoryUsageInBytes +=
+ (long) rowCount
+ * (RamUsageEstimator.sizeOfObjectArray(valueCount)
+ + estimateVectorValueMemoryUsageInBytes(valueDataTypeList));
+ }
+
+ return estimatedMemoryUsageInBytes;
+ }
+
+ private static long estimateSingleValueArrayMemoryUsageInBytes(
+ final int rowCount, final TSDataType dataType) {
+ final long segmentCount = Math.max(1, (rowCount + 15L) / 16);
+ if (Objects.isNull(dataType)) {
+ return 0;
+ }
+
+ switch (dataType) {
+ case BOOLEAN:
+ return RamUsageEstimator.sizeOfBooleanArray(16) * segmentCount;
+ case INT32:
+ case DATE:
+ return RamUsageEstimator.sizeOfIntArray(16) * segmentCount;
+ case INT64:
+ case TIMESTAMP:
+ return RamUsageEstimator.sizeOfLongArray(16) * segmentCount;
+ case FLOAT:
+ return RamUsageEstimator.sizeOfFloatArray(16) * segmentCount;
+ case DOUBLE:
+ return RamUsageEstimator.sizeOfDoubleArray(16) * segmentCount;
+ case TEXT:
+ case BLOB:
+ case STRING:
+ return RamUsageEstimator.sizeOfObjectArray(16) * segmentCount;
+ default:
+ return 0;
+ }
+ }
+
+ private static long estimateVectorValueMemoryUsageInBytes(
+ final List<TSDataType> valueDataTypeList) {
+ long estimatedMemoryUsageInBytes = 0;
+ for (final TSDataType dataType : valueDataTypeList) {
+ if (Objects.isNull(dataType)) {
+ continue;
+ }
+
+ estimatedMemoryUsageInBytes +=
+ RamUsageEstimator.alignObjectSize(
+ RamUsageEstimator.NUM_BYTES_OBJECT_HEADER
+ + estimateTsPrimitiveTypeValueMemoryUsageInBytes(dataType));
+ }
+ return estimatedMemoryUsageInBytes;
+ }
+
+ private static long estimateTsPrimitiveTypeValueMemoryUsageInBytes(final
TSDataType dataType) {
+ switch (dataType) {
+ case BOOLEAN:
+ return 1;
+ case INT32:
+ case DATE:
+ case FLOAT:
+ return Integer.BYTES;
+ case INT64:
+ case TIMESTAMP:
+ case DOUBLE:
+ return Long.BYTES;
+ case TEXT:
+ case BLOB:
+ case STRING:
+ return RamUsageEstimator.NUM_BYTES_OBJECT_REF;
+ default:
+ return 0;
+ }
+ }
+
/////////////////////////////////////////////////////////////////////////////////////////////////
// util methods
/////////////////////////////////////////////////////////////////////////////////////////////////
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java
index b18bf6255ab..d843ab38ab4 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java
@@ -340,7 +340,7 @@ public class TsFileInsertionEventScanParser extends TsFileInsertionEventParser {
data.next();
while (!data.hasCurrent() && chunkReader.hasNextSatisfiedPage()) {
- data = chunkReader.nextPageData();
+ data = nextPageData();
}
if (tablet != null && tablet.getRowSize() == tablet.getMaxRowNumber())
{
@@ -376,16 +376,18 @@ public class TsFileInsertionEventScanParser extends TsFileInsertionEventParser {
}
do {
- resizePageDataMemoryForCurrentPageIfNeeded();
- data = chunkReader.nextPageData();
- long size = PipeMemoryWeightUtil.calculateBatchDataRamBytesUsed(data);
- if (allocatedMemoryBlockForBatchData.getMemoryUsageInBytes() < size) {
- PipeDataNodeResourceManager.memory().forceResize(allocatedMemoryBlockForBatchData, size);
- }
+ data = nextPageData();
} while (!data.hasCurrent() && chunkReader.hasNextSatisfiedPage());
} while (!data.hasCurrent());
}
+ private BatchData nextPageData() throws IOException {
+ resizePageDataMemoryForCurrentPageIfNeeded();
+ final BatchData nextData = chunkReader.nextPageData();
+ resizePageDataMemoryIfNeeded(PipeMemoryWeightUtil.calculateBatchDataRamBytesUsed(nextData));
+ return nextData;
+ }
+
private void resizePageDataMemoryForCurrentPageIfNeeded() {
if (!(chunkReader instanceof EstimatedMemoryChunkReader)) {
return;
@@ -587,10 +589,14 @@ public class TsFileInsertionEventScanParser extends TsFileInsertionEventParser {
Chunk chunk =
new Chunk(
chunkHeader, tsFileSequenceReader.readChunk(-1,
chunkHeader.getDataSize()));
+ final List<Long> pageEstimatedMemoryUsageInBytesList =
+ SinglePageWholeChunkReader
+ .calculatePageEstimatedMemoryUsageInBytesWithBatchDataList(chunk);
chunkReader =
currentIsMultiPage
- ? new ChunkReader(chunk, filter)
+ ? new MemoryControlledChunkReader(
+ new ChunkReader(chunk, filter),
pageEstimatedMemoryUsageInBytesList)
: new SinglePageWholeChunkReader(chunk);
currentIsAligned = false;
final String measurementID =
@@ -636,8 +642,7 @@ public class TsFileInsertionEventScanParser extends TsFileInsertionEventParser {
chunk =
new Chunk(
chunkHeader, tsFileSequenceReader.readChunk(-1,
chunkHeader.getDataSize()));
- currentValueChunkPageMemorySize =
- calculatePageMemorySizeIfSinglePageValueChunk(chunk);
+ currentValueChunkPageMemorySize =
calculateMaxPageMemorySize(chunk);
boolean needReturn = false;
final long timeChunkSize =
lastIndex >= 0
@@ -674,8 +679,7 @@ public class TsFileInsertionEventScanParser extends TsFileInsertionEventParser {
chunk = firstChunk4NextSequentialValueChunks;
chunkHeader = chunk.getHeader();
firstChunk4NextSequentialValueChunks = null;
- currentValueChunkPageMemorySize =
- calculatePageMemorySizeIfSinglePageValueChunk(chunk);
+ currentValueChunkPageMemorySize =
calculateMaxPageMemorySize(chunk);
resizeChunkMemoryBlockIfFirstValueChunkExceedsLimit(valueChunkList,
chunkHeader);
resizePageDataMemoryBlockIfFirstValueChunkExceedsLimit(
valueChunkList, currentValueChunkPageMemorySize);
@@ -759,9 +763,7 @@ public class TsFileInsertionEventScanParser extends TsFileInsertionEventParser {
final boolean isMultiPage = marker == MetaMarker.TIME_CHUNK_HEADER;
isMultiPageList.add(isMultiPage);
timeChunkPageMemorySizeList.add(
- isMultiPage
- ? 0
- : SinglePageWholeChunkReader.calculatePageEstimatedMemoryUsageInBytes(timeChunk));
+ SinglePageWholeChunkReader.calculateMaxPageEstimatedMemoryUsageInBytes(timeChunk));
return true;
}
}
@@ -825,9 +827,22 @@ public class TsFileInsertionEventScanParser extends TsFileInsertionEventParser {
AlignedSinglePageWholeChunkReader.calculatePageEstimatedMemoryUsageInBytes(
timeChunk, valueChunkList));
}
+ final List<Long> pageEstimatedMemoryUsageInBytesList =
+ currentIsMultiPage
+ ? AlignedSinglePageWholeChunkReader
+ .calculatePageEstimatedMemoryUsageInBytesWithBatchDataList(
+ timeChunk, valueChunkList)
+ : Collections.emptyList();
+ final long maxPageEstimatedMemoryUsageInBytes =
+ pageEstimatedMemoryUsageInBytesList.isEmpty()
+ ? 0
+ : pageEstimatedMemoryUsageInBytesList.get(0);
+ resizePageDataMemoryIfNeeded(maxPageEstimatedMemoryUsageInBytes);
chunkReader =
currentIsMultiPage
- ? new AlignedChunkReader(timeChunk, valueChunkList, filter)
+ ? new MemoryControlledChunkReader(
+ new AlignedChunkReader(timeChunk, valueChunkList, filter),
+ pageEstimatedMemoryUsageInBytesList)
: new AlignedSinglePageWholeChunkReader(timeChunk,
valueChunkList, null);
currentIsAligned = true;
lastMarker = marker;
@@ -868,10 +883,8 @@ public class TsFileInsertionEventScanParser extends TsFileInsertionEventParser {
}
}
- private long calculatePageMemorySizeIfSinglePageValueChunk(final Chunk
chunk) throws IOException {
- return isSinglePageValueChunk(chunk.getHeader())
- ? SinglePageWholeChunkReader.calculatePageEstimatedMemoryUsageInBytes(chunk)
- : 0;
+ private long calculateMaxPageMemorySize(final Chunk chunk) throws
IOException {
+ return
SinglePageWholeChunkReader.calculateMaxPageEstimatedMemoryUsageInBytes(chunk);
}
private boolean isSinglePageValueChunk(final ChunkHeader chunkHeader) {
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java
index 825a5bf6e66..84569bf586c 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java
@@ -31,6 +31,7 @@ import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
import
org.apache.iotdb.db.pipe.event.common.tsfile.parser.TsFileInsertionEventParser;
import
org.apache.iotdb.db.pipe.event.common.tsfile.parser.query.TsFileInsertionEventQueryParser;
import
org.apache.iotdb.db.pipe.event.common.tsfile.parser.scan.AlignedSinglePageWholeChunkReader;
+import
org.apache.iotdb.db.pipe.event.common.tsfile.parser.scan.SinglePageWholeChunkReader;
import
org.apache.iotdb.db.pipe.event.common.tsfile.parser.scan.TsFileInsertionEventScanParser;
import
org.apache.iotdb.db.pipe.event.common.tsfile.parser.table.TsFileInsertionEventTableParser;
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
@@ -148,6 +149,84 @@ public class TsFileInsertionEventParserTest {
System.out.println(System.currentTimeMillis() - startTime);
}
+ @Test
+ public void
testScanParserSplitNonAlignedSinglePageChunkByEstimatedPageMemory() throws
Exception {
+ final long originalPipeMaxReaderChunkSize =
+ CommonDescriptor.getInstance().getConfig().getPipeMaxReaderChunkSize();
+ final int originalPageSizeInByte =
+ TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
+ final int originalMaxNumberOfPointsInPage =
+ TSFileDescriptor.getInstance().getConfig().getMaxNumberOfPointsInPage();
+
+ try {
+ TSFileDescriptor.getInstance().getConfig().setPageSizeInByte(64 * 1024);
+ TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(10000);
+
+ final int measurementCount = 16;
+ final int rowCount = 64;
+ final List<IMeasurementSchema> schemaList = new ArrayList<>();
+ for (int i = 0; i < measurementCount; ++i) {
+ schemaList.add(
+ new MeasurementSchema(
+ "s" + i, TSDataType.STRING, TSEncoding.PLAIN,
CompressionType.LZ4));
+ }
+
+ nonalignedTsFile = new
File("nonaligned-single-page-high-compression.tsfile");
+ final Tablet tablet = new Tablet("root.sg.d", schemaList, rowCount);
+ final Binary value =
+ new Binary(new String(new char[512]).replace('\0', 'a'),
TSFileConfig.STRING_CHARSET);
+ for (int row = 0; row < rowCount; ++row) {
+ tablet.addTimestamp(row, row);
+ for (int measurementIndex = 0; measurementIndex < measurementCount;
++measurementIndex) {
+ tablet.addValue("s" + measurementIndex, row, value);
+ }
+ }
+
+ try (final TsFileWriter writer = new TsFileWriter(nonalignedTsFile)) {
+ writer.registerTimeseries(new PartialPath("root.sg.d"), schemaList);
+ writer.writeTree(tablet);
+ }
+
+ CommonDescriptor.getInstance()
+ .getConfig()
+ .setPipeMaxReaderChunkSize(
+ calculatePipeMaxReaderChunkSizeForSinglePageNonAlignedChunk(nonalignedTsFile));
+
+ int tabletCount = 0;
+ int maxMeasurementCount = 0;
+ int pointCount = 0;
+ try (final TsFileInsertionEventScanParser parser =
+ new TsFileInsertionEventScanParser(
+ nonalignedTsFile,
+ new PrefixTreePattern("root"),
+ Long.MIN_VALUE,
+ Long.MAX_VALUE,
+ null,
+ null,
+ false)) {
+ for (final Pair<Tablet, Boolean> tabletWithIsAligned :
parser.toTabletWithIsAligneds()) {
+ Assert.assertFalse(tabletWithIsAligned.getRight());
+ final Tablet parsedTablet = tabletWithIsAligned.getLeft();
+ tabletCount++;
+ maxMeasurementCount = Math.max(maxMeasurementCount,
parsedTablet.getSchemas().size());
+ pointCount += getNonNullSize(parsedTablet);
+ }
+ }
+
+ Assert.assertTrue(tabletCount > 1);
+ Assert.assertTrue(maxMeasurementCount < measurementCount);
+ Assert.assertEquals(measurementCount * rowCount, pointCount);
+ } finally {
+ CommonDescriptor.getInstance()
+ .getConfig()
+ .setPipeMaxReaderChunkSize(originalPipeMaxReaderChunkSize);
+ TSFileDescriptor.getInstance().getConfig().setPageSizeInByte(originalPageSizeInByte);
+ TSFileDescriptor.getInstance()
+ .getConfig()
+ .setMaxNumberOfPointsInPage(originalMaxNumberOfPointsInPage);
+ }
+ }
+
@Test
public void testScanParserSplitAlignedSinglePageChunkByEstimatedPageMemory()
throws Exception {
final long originalPipeMaxReaderChunkSize =
@@ -226,6 +305,84 @@ public class TsFileInsertionEventParserTest {
}
}
+ @Test
+ public void testScanParserSplitAlignedMultiPageChunkByEstimatedPageMemory()
throws Exception {
+ final long originalPipeMaxReaderChunkSize =
+ CommonDescriptor.getInstance().getConfig().getPipeMaxReaderChunkSize();
+ final int originalPageSizeInByte =
+ TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
+ final int originalMaxNumberOfPointsInPage =
+ TSFileDescriptor.getInstance().getConfig().getMaxNumberOfPointsInPage();
+
+ try {
+ TSFileDescriptor.getInstance().getConfig().setPageSizeInByte(64 * 1024);
+ TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(32);
+
+ final int measurementCount = 16;
+ final int rowCount = 64;
+ final List<IMeasurementSchema> schemaList = new ArrayList<>();
+ for (int i = 0; i < measurementCount; ++i) {
+ schemaList.add(
+ new MeasurementSchema(
+ "s" + i, TSDataType.STRING, TSEncoding.PLAIN,
CompressionType.LZ4));
+ }
+
+ alignedTsFile = new File("aligned-multi-page-high-compression.tsfile");
+ final Tablet tablet = new Tablet("root.sg.d", schemaList, rowCount);
+ final Binary value =
+ new Binary(new String(new char[512]).replace('\0', 'a'),
TSFileConfig.STRING_CHARSET);
+ for (int row = 0; row < rowCount; ++row) {
+ tablet.addTimestamp(row, row);
+ for (int measurementIndex = 0; measurementIndex < measurementCount;
++measurementIndex) {
+ tablet.addValue("s" + measurementIndex, row, value);
+ }
+ }
+
+ try (final TsFileWriter writer = new TsFileWriter(alignedTsFile)) {
+ writer.registerAlignedTimeseries(new PartialPath("root.sg.d"),
schemaList);
+ writer.writeAligned(tablet);
+ }
+
+ CommonDescriptor.getInstance()
+ .getConfig()
+ .setPipeMaxReaderChunkSize(
+ calculatePipeMaxReaderChunkSizeForMultiPageAlignedChunk(alignedTsFile));
+
+ int tabletCount = 0;
+ int maxMeasurementCount = 0;
+ int pointCount = 0;
+ try (final TsFileInsertionEventScanParser parser =
+ new TsFileInsertionEventScanParser(
+ alignedTsFile,
+ new PrefixTreePattern("root"),
+ Long.MIN_VALUE,
+ Long.MAX_VALUE,
+ null,
+ null,
+ false)) {
+ for (final Pair<Tablet, Boolean> tabletWithIsAligned :
parser.toTabletWithIsAligneds()) {
+ Assert.assertTrue(tabletWithIsAligned.getRight());
+ final Tablet parsedTablet = tabletWithIsAligned.getLeft();
+ tabletCount++;
+ maxMeasurementCount = Math.max(maxMeasurementCount,
parsedTablet.getSchemas().size());
+ pointCount += getNonNullSize(parsedTablet);
+ }
+ }
+
+ Assert.assertTrue(tabletCount > 1);
+ Assert.assertTrue(maxMeasurementCount < measurementCount);
+ Assert.assertEquals(measurementCount * rowCount, pointCount);
+ } finally {
+ CommonDescriptor.getInstance()
+ .getConfig()
+ .setPipeMaxReaderChunkSize(originalPipeMaxReaderChunkSize);
+ TSFileDescriptor.getInstance().getConfig().setPageSizeInByte(originalPageSizeInByte);
+ TSFileDescriptor.getInstance()
+ .getConfig()
+ .setMaxNumberOfPointsInPage(originalMaxNumberOfPointsInPage);
+ }
+ }
+
@Test
public void testScanParserResizesChunkMemoryForFirstAlignedValueChunk()
throws Exception {
final long originalPipeMaxReaderChunkSize =
@@ -1584,6 +1741,64 @@ public class TsFileInsertionEventParserTest {
}
}
+ private long
calculatePipeMaxReaderChunkSizeForSinglePageNonAlignedChunk(final File tsFile)
+ throws Exception {
+ try (final TsFileSequenceReader reader = new
TsFileSequenceReader(tsFile.getAbsolutePath())) {
+ final IDeviceID deviceID =
reader.getDeviceMeasurementsMap().keySet().iterator().next();
+ final List<String> measurements =
reader.getDeviceMeasurementsMap().get(deviceID);
+ Assert.assertFalse(measurements.isEmpty());
+
+ long chunkSizeLimit = 0;
+ long estimatedPageMemorySize = 0;
+ for (final String measurement : measurements) {
+ final List<ChunkMetadata> chunkMetadataList =
+ reader.getChunkMetadataList(new Path(deviceID, measurement,
false));
+ Assert.assertEquals(1, chunkMetadataList.size());
+
+ final Chunk chunk = reader.readMemChunk(chunkMetadataList.get(0));
+ Assert.assertEquals(
+ MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER,
chunk.getHeader().getChunkType() & 0x3F);
+ chunkSizeLimit += chunk.getHeader().getDataSize();
+ estimatedPageMemorySize +=
+ SinglePageWholeChunkReader.calculateMaxPageEstimatedMemoryUsageInBytesWithBatchData(
+ chunk);
+ }
+
+ Assert.assertTrue(estimatedPageMemorySize > chunkSizeLimit);
+ return chunkSizeLimit;
+ }
+ }
+
+ private long calculatePipeMaxReaderChunkSizeForMultiPageAlignedChunk(final
File tsFile)
+ throws Exception {
+ try (final TsFileSequenceReader reader = new
TsFileSequenceReader(tsFile.getAbsolutePath())) {
+ final IDeviceID deviceID =
reader.getDeviceMeasurementsMap().keySet().iterator().next();
+ final List<AbstractAlignedChunkMetadata> alignedChunkMetadataList =
+ reader.getAlignedChunkMetadata(deviceID, true);
+ Assert.assertEquals(1, alignedChunkMetadataList.size());
+
+ final AbstractAlignedChunkMetadata alignedChunkMetadata =
alignedChunkMetadataList.get(0);
+ final Chunk timeChunk =
+ reader.readMemChunk((ChunkMetadata)
alignedChunkMetadata.getTimeChunkMetadata());
+ Assert.assertEquals(MetaMarker.CHUNK_HEADER,
timeChunk.getHeader().getChunkType() & 0x3F);
+
+ long chunkSizeLimit =
PipeMemoryWeightUtil.calculateChunkRamBytesUsed(timeChunk);
+ long estimatedMaxPageMemorySize =
+ SinglePageWholeChunkReader.calculateMaxPageEstimatedMemoryUsageInBytes(timeChunk);
+ for (final IChunkMetadata valueChunkMetadata :
+ alignedChunkMetadata.getValueChunkMetadataList()) {
+ final Chunk valueChunk = reader.readMemChunk((ChunkMetadata)
valueChunkMetadata);
+ Assert.assertEquals(MetaMarker.CHUNK_HEADER,
valueChunk.getHeader().getChunkType() & 0x3F);
+ chunkSizeLimit += valueChunk.getHeader().getDataSize();
+ estimatedMaxPageMemorySize +=
+ SinglePageWholeChunkReader.calculateMaxPageEstimatedMemoryUsageInBytes(valueChunk);
+ }
+
+ Assert.assertTrue(estimatedMaxPageMemorySize > chunkSizeLimit);
+ return chunkSizeLimit;
+ }
+ }
+
private static class ParserPerformanceStats {
private long pointCount;