This is an automated email from the ASF dual-hosted git repository.
jt2594838 pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 30b4d49fa6b [to dev/1.3] Pipe: account page decode memory in scan
parser (#17807) (#17833)
30b4d49fa6b is described below
commit 30b4d49fa6b7dd1626533a2e06aaf657f300f8e7
Author: Caideyipi <[email protected]>
AuthorDate: Wed Jun 3 18:27:44 2026 +0800
[to dev/1.3] Pipe: account page decode memory in scan parser (#17807)
(#17833)
* Pipe: account page decode memory in scan parser (#17807)
* Pipe: account page decode memory in scan parser
* Fix pipe scan parser single page row count
* Fix pipe scan parser page memory test
* Fix tsfile writer API in pipe test
---
.../scan/AlignedSinglePageWholeChunkReader.java | 55 +++++-
.../scan/MemoryControlledChunkReader.java | 71 +++++++
.../container/scan/SinglePageWholeChunkReader.java | 193 +++++++++++++++++-
.../scan/TsFileInsertionScanDataContainer.java | 55 ++++--
.../event/TsFileInsertionDataContainerTest.java | 219 +++++++++++++++++++++
5 files changed, 568 insertions(+), 25 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/AlignedSinglePageWholeChunkReader.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/AlignedSinglePageWholeChunkReader.java
index a9de04cbe64..9c3b3514c83 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/AlignedSinglePageWholeChunkReader.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/AlignedSinglePageWholeChunkReader.java
@@ -37,6 +37,7 @@ import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
+import java.util.Objects;
/**
* The {@link AlignedSinglePageWholeChunkReader} is used to read a whole
single page aligned chunk
@@ -64,7 +65,7 @@ public class AlignedSinglePageWholeChunkReader extends
AbstractChunkReader
this.timeChunkHeader = timeChunk.getHeader();
this.timeChunkDataBuffer = timeChunk.getData();
this.pageEstimatedMemoryUsageInBytes =
- calculatePageEstimatedMemoryUsageInBytes(timeChunk, valueChunkList);
+ calculateMaxPageEstimatedMemoryUsageInBytesWithBatchData(timeChunk,
valueChunkList);
valueChunkList.forEach(
chunk -> {
@@ -206,4 +207,56 @@ public class AlignedSinglePageWholeChunkReader extends
AbstractChunkReader
return estimatedMemoryUsageInBytes;
}
+
+ public static long calculateMaxPageEstimatedMemoryUsageInBytesWithBatchData(
+ final Chunk timeChunk, final List<Chunk> valueChunkList) throws
IOException {
+ final List<Long> pageEstimatedMemoryUsageInBytesList =
+ calculatePageEstimatedMemoryUsageInBytesWithBatchDataList(timeChunk,
valueChunkList);
+ return pageEstimatedMemoryUsageInBytesList.isEmpty()
+ ? 0
+ : pageEstimatedMemoryUsageInBytesList.get(0);
+ }
+
+ public static List<Long>
calculatePageEstimatedMemoryUsageInBytesWithBatchDataList(
+ final Chunk timeChunk, final List<Chunk> valueChunkList) throws
IOException {
+ final ByteBuffer timeChunkDataBuffer = timeChunk.getData().duplicate();
+ final List<ByteBuffer> valueChunkDataBufferList = new
ArrayList<>(valueChunkList.size());
+ for (final Chunk valueChunk : valueChunkList) {
+ valueChunkDataBufferList.add(
+ Objects.isNull(valueChunk) ? null :
valueChunk.getData().duplicate());
+ }
+
+ final List<Long> pageEstimatedMemoryUsageInBytesList = new ArrayList<>();
+ while (timeChunkDataBuffer.remaining() > 0) {
+ long pageUncompressedSizeInBytes = 0;
+ final PageHeader timePageHeader =
+ SinglePageWholeChunkReader.deserializePageHeader(
+ timeChunkDataBuffer, timeChunk.getHeader());
+ pageUncompressedSizeInBytes += timePageHeader.getUncompressedSize();
+ SinglePageWholeChunkReader.skipCompressedPageData(timeChunkDataBuffer,
timePageHeader);
+
+ final List<TSDataType> valueDataTypeList = new
ArrayList<>(valueChunkList.size());
+ for (int i = 0; i < valueChunkList.size(); ++i) {
+ final Chunk valueChunk = valueChunkList.get(i);
+ final ByteBuffer valueChunkDataBuffer =
valueChunkDataBufferList.get(i);
+ if (Objects.isNull(valueChunk) ||
Objects.isNull(valueChunkDataBuffer)) {
+ valueDataTypeList.add(null);
+ continue;
+ }
+
+ final PageHeader valuePageHeader =
+ SinglePageWholeChunkReader.deserializePageHeader(
+ valueChunkDataBuffer, valueChunk.getHeader());
+ pageUncompressedSizeInBytes += valuePageHeader.getUncompressedSize();
+ valueDataTypeList.add(valueChunk.getHeader().getDataType());
+ SinglePageWholeChunkReader.skipCompressedPageData(valueChunkDataBuffer, valuePageHeader);
+ }
+ pageEstimatedMemoryUsageInBytesList.add(
+ SinglePageWholeChunkReader.estimatePageMemoryUsageInBytesWithBatchData(
+ pageUncompressedSizeInBytes,
+ SinglePageWholeChunkReader.getPageRowCount(timePageHeader,
timeChunk),
+ valueDataTypeList));
+ }
+ return
SinglePageWholeChunkReader.toSuffixMaxList(pageEstimatedMemoryUsageInBytesList);
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/MemoryControlledChunkReader.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/MemoryControlledChunkReader.java
new file mode 100644
index 00000000000..6a74ffc54fc
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/MemoryControlledChunkReader.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.event.common.tsfile.container.scan;
+
+import org.apache.tsfile.read.common.BatchData;
+import org.apache.tsfile.read.reader.IChunkReader;
+import org.apache.tsfile.read.reader.IPageReader;
+
+import java.io.IOException;
+import java.util.List;
+
+class MemoryControlledChunkReader implements IChunkReader,
EstimatedMemoryChunkReader {
+
+ private final IChunkReader delegate;
+ private final List<Long> pageEstimatedMemoryUsageInBytesList;
+ private int pageIndex;
+
+ MemoryControlledChunkReader(
+ final IChunkReader delegate, final List<Long>
pageEstimatedMemoryUsageInBytesList) {
+ this.delegate = delegate;
+ this.pageEstimatedMemoryUsageInBytesList =
pageEstimatedMemoryUsageInBytesList;
+ }
+
+ @Override
+ public long getCurrentPageEstimatedMemoryUsageInBytes() {
+ return pageIndex < pageEstimatedMemoryUsageInBytesList.size()
+ ? pageEstimatedMemoryUsageInBytesList.get(pageIndex)
+ : 0;
+ }
+
+ @Override
+ public boolean hasNextSatisfiedPage() throws IOException {
+ return delegate.hasNextSatisfiedPage();
+ }
+
+ @Override
+ public BatchData nextPageData() throws IOException {
+ try {
+ return delegate.nextPageData();
+ } finally {
+ ++pageIndex;
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ delegate.close();
+ }
+
+ @Override
+ public List<IPageReader> loadPageReaderList() throws IOException {
+ return delegate.loadPageReaderList();
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/SinglePageWholeChunkReader.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/SinglePageWholeChunkReader.java
index ade50012903..f41a6861120 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/SinglePageWholeChunkReader.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/SinglePageWholeChunkReader.java
@@ -21,6 +21,8 @@ package
org.apache.iotdb.db.pipe.event.common.tsfile.container.scan;
import org.apache.tsfile.compress.IUnCompressor;
import org.apache.tsfile.encoding.decoder.Decoder;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.MetaMarker;
import org.apache.tsfile.file.header.ChunkHeader;
import org.apache.tsfile.file.header.PageHeader;
import org.apache.tsfile.file.metadata.statistics.Statistics;
@@ -28,10 +30,15 @@ import org.apache.tsfile.read.common.Chunk;
import org.apache.tsfile.read.reader.chunk.AbstractChunkReader;
import org.apache.tsfile.read.reader.page.LazyLoadPageData;
import org.apache.tsfile.read.reader.page.PageReader;
+import org.apache.tsfile.utils.RamUsageEstimator;
import java.io.IOException;
import java.io.Serializable;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
public class SinglePageWholeChunkReader extends AbstractChunkReader
implements EstimatedMemoryChunkReader {
@@ -44,7 +51,8 @@ public class SinglePageWholeChunkReader extends
AbstractChunkReader
this.chunkHeader = chunk.getHeader();
this.chunkDataBuffer = chunk.getData();
- this.pageEstimatedMemoryUsageInBytes =
calculatePageEstimatedMemoryUsageInBytes(chunk);
+ this.pageEstimatedMemoryUsageInBytes =
+ calculateMaxPageEstimatedMemoryUsageInBytesWithBatchData(chunk);
initAllPageReaders();
}
@@ -81,11 +89,190 @@ public class SinglePageWholeChunkReader extends
AbstractChunkReader
public static long calculatePageEstimatedMemoryUsageInBytes(final Chunk
chunk)
throws IOException {
final ByteBuffer chunkDataBuffer = chunk.getData().duplicate();
- final PageHeader pageHeader =
- PageHeader.deserializeFrom(chunkDataBuffer, (Statistics<? extends
Serializable>) null);
+ final PageHeader pageHeader = deserializePageHeader(chunkDataBuffer,
chunk.getHeader());
return pageHeader.getUncompressedSize();
}
+ public static long calculateMaxPageEstimatedMemoryUsageInBytes(final Chunk
chunk)
+ throws IOException {
+ final ByteBuffer chunkDataBuffer = chunk.getData().duplicate();
+ long maxPageEstimatedMemoryUsageInBytes = 0;
+ while (chunkDataBuffer.remaining() > 0) {
+ final PageHeader pageHeader = deserializePageHeader(chunkDataBuffer,
chunk.getHeader());
+ maxPageEstimatedMemoryUsageInBytes =
+ Math.max(maxPageEstimatedMemoryUsageInBytes,
pageHeader.getUncompressedSize());
+ skipCompressedPageData(chunkDataBuffer, pageHeader);
+ }
+ return maxPageEstimatedMemoryUsageInBytes;
+ }
+
+ public static long
calculateMaxPageEstimatedMemoryUsageInBytesWithBatchData(final Chunk chunk)
+ throws IOException {
+ final List<Long> pageEstimatedMemoryUsageInBytesList =
+ calculatePageEstimatedMemoryUsageInBytesWithBatchDataList(chunk);
+ return pageEstimatedMemoryUsageInBytesList.isEmpty()
+ ? 0
+ : pageEstimatedMemoryUsageInBytesList.get(0);
+ }
+
+ public static List<Long>
calculatePageEstimatedMemoryUsageInBytesWithBatchDataList(
+ final Chunk chunk) throws IOException {
+ final ByteBuffer chunkDataBuffer = chunk.getData().duplicate();
+ final List<Long> pageEstimatedMemoryUsageInBytesList = new ArrayList<>();
+ while (chunkDataBuffer.remaining() > 0) {
+ final PageHeader pageHeader = deserializePageHeader(chunkDataBuffer,
chunk.getHeader());
+ pageEstimatedMemoryUsageInBytesList.add(
+ estimatePageMemoryUsageInBytesWithBatchData(
+ pageHeader, chunk,
Collections.singletonList(chunk.getHeader().getDataType())));
+ skipCompressedPageData(chunkDataBuffer, pageHeader);
+ }
+ return toSuffixMaxList(pageEstimatedMemoryUsageInBytesList);
+ }
+
+ static PageHeader deserializePageHeader(
+ final ByteBuffer chunkDataBuffer, final ChunkHeader chunkHeader) throws
IOException {
+ return isSinglePageChunk(chunkHeader)
+ ? PageHeader.deserializeFrom(chunkDataBuffer, (Statistics<? extends
Serializable>) null)
+ : PageHeader.deserializeFrom(chunkDataBuffer,
chunkHeader.getDataType());
+ }
+
+ static boolean isSinglePageChunk(final ChunkHeader chunkHeader) {
+ return (chunkHeader.getChunkType() & 0x3F) ==
MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER;
+ }
+
+ static void skipCompressedPageData(
+ final ByteBuffer chunkDataBuffer, final PageHeader pageHeader) {
+ chunkDataBuffer.position(chunkDataBuffer.position() +
pageHeader.getCompressedSize());
+ }
+
+ static List<Long> toSuffixMaxList(final List<Long>
pageEstimatedMemoryUsageInBytesList) {
+ long suffixMaxPageEstimatedMemoryUsageInBytes = 0;
+ for (int i = pageEstimatedMemoryUsageInBytesList.size() - 1; i >= 0; --i) {
+ suffixMaxPageEstimatedMemoryUsageInBytes =
+ Math.max(
+ suffixMaxPageEstimatedMemoryUsageInBytes,
pageEstimatedMemoryUsageInBytesList.get(i));
+ pageEstimatedMemoryUsageInBytesList.set(i,
suffixMaxPageEstimatedMemoryUsageInBytes);
+ }
+ return pageEstimatedMemoryUsageInBytesList;
+ }
+
+ static long estimatePageMemoryUsageInBytesWithBatchData(
+ final PageHeader timePageHeader,
+ final Chunk timeChunk,
+ final List<TSDataType> valueDataTypeList) {
+ return estimatePageMemoryUsageInBytesWithBatchData(
+ timePageHeader.getUncompressedSize(),
+ getPageRowCount(timePageHeader, timeChunk),
+ valueDataTypeList);
+ }
+
+ static int getPageRowCount(final PageHeader pageHeader, final Chunk chunk) {
+ if (isSinglePageChunk(chunk.getHeader())) {
+ return Objects.isNull(chunk.getChunkStatistic())
+ ? 0
+ : saturateToInt(chunk.getChunkStatistic().getCount());
+ }
+ return saturateToInt(pageHeader.getNumOfValues());
+ }
+
+ private static int saturateToInt(final long value) {
+ return (int) Math.min(Integer.MAX_VALUE, value);
+ }
+
+ static long estimatePageMemoryUsageInBytesWithBatchData(
+ final long pageUncompressedSizeInBytes,
+ final int rowCount,
+ final List<TSDataType> valueDataTypeList) {
+ return pageUncompressedSizeInBytes
+ + estimateBatchDataMemoryUsageInBytes(rowCount, valueDataTypeList);
+ }
+
+ private static long estimateBatchDataMemoryUsageInBytes(
+ final int rowCount, final List<TSDataType> valueDataTypeList) {
+ final int valueCount = valueDataTypeList.size();
+ final long segmentCount = Math.max(1, (rowCount + 15L) / 16);
+ long estimatedMemoryUsageInBytes = RamUsageEstimator.sizeOfLongArray(16) *
segmentCount;
+
+ if (valueCount == 1) {
+ estimatedMemoryUsageInBytes +=
+ estimateSingleValueArrayMemoryUsageInBytes(rowCount,
valueDataTypeList.get(0));
+ } else if (valueCount > 1) {
+ estimatedMemoryUsageInBytes += RamUsageEstimator.sizeOfObjectArray(16) *
segmentCount;
+ estimatedMemoryUsageInBytes +=
+ (long) rowCount
+ * (RamUsageEstimator.sizeOfObjectArray(valueCount)
+ + estimateVectorValueMemoryUsageInBytes(valueDataTypeList));
+ }
+
+ return estimatedMemoryUsageInBytes;
+ }
+
+ private static long estimateSingleValueArrayMemoryUsageInBytes(
+ final int rowCount, final TSDataType dataType) {
+ final long segmentCount = Math.max(1, (rowCount + 15L) / 16);
+ if (Objects.isNull(dataType)) {
+ return 0;
+ }
+
+ switch (dataType) {
+ case BOOLEAN:
+ return RamUsageEstimator.sizeOfBooleanArray(16) * segmentCount;
+ case INT32:
+ case DATE:
+ return RamUsageEstimator.sizeOfIntArray(16) * segmentCount;
+ case INT64:
+ case TIMESTAMP:
+ return RamUsageEstimator.sizeOfLongArray(16) * segmentCount;
+ case FLOAT:
+ return RamUsageEstimator.sizeOfFloatArray(16) * segmentCount;
+ case DOUBLE:
+ return RamUsageEstimator.sizeOfDoubleArray(16) * segmentCount;
+ case TEXT:
+ case BLOB:
+ case STRING:
+ return RamUsageEstimator.sizeOfObjectArray(16) * segmentCount;
+ default:
+ return 0;
+ }
+ }
+
+ private static long estimateVectorValueMemoryUsageInBytes(
+ final List<TSDataType> valueDataTypeList) {
+ long estimatedMemoryUsageInBytes = 0;
+ for (final TSDataType dataType : valueDataTypeList) {
+ if (Objects.isNull(dataType)) {
+ continue;
+ }
+
+ estimatedMemoryUsageInBytes +=
+ RamUsageEstimator.alignObjectSize(
+ RamUsageEstimator.NUM_BYTES_OBJECT_HEADER
+ + estimateTsPrimitiveTypeValueMemoryUsageInBytes(dataType));
+ }
+ return estimatedMemoryUsageInBytes;
+ }
+
+ private static long estimateTsPrimitiveTypeValueMemoryUsageInBytes(final
TSDataType dataType) {
+ switch (dataType) {
+ case BOOLEAN:
+ return 1;
+ case INT32:
+ case DATE:
+ case FLOAT:
+ return Integer.BYTES;
+ case INT64:
+ case TIMESTAMP:
+ case DOUBLE:
+ return Long.BYTES;
+ case TEXT:
+ case BLOB:
+ case STRING:
+ return RamUsageEstimator.NUM_BYTES_OBJECT_REF;
+ default:
+ return 0;
+ }
+ }
+
/////////////////////////////////////////////////////////////////////////////////////////////////
// util methods
/////////////////////////////////////////////////////////////////////////////////////////////////
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java
index e903c7340e4..ac1fdb94db7 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java
@@ -307,7 +307,7 @@ public class TsFileInsertionScanDataContainer extends
TsFileInsertionDataContain
data.next();
while (!data.hasCurrent() && chunkReader.hasNextSatisfiedPage()) {
- data = chunkReader.nextPageData();
+ data = nextPageData();
}
if (tablet != null && tablet.rowSize == tablet.getMaxRowNumber()) {
@@ -343,16 +343,18 @@ public class TsFileInsertionScanDataContainer extends
TsFileInsertionDataContain
}
do {
- resizePageDataMemoryForCurrentPageIfNeeded();
- data = chunkReader.nextPageData();
- long size = PipeMemoryWeightUtil.calculateBatchDataRamBytesUsed(data);
- if (allocatedMemoryBlockForBatchData.getMemoryUsageInBytes() < size) {
- PipeDataNodeResourceManager.memory().forceResize(allocatedMemoryBlockForBatchData, size);
- }
+ data = nextPageData();
} while (!data.hasCurrent() && chunkReader.hasNextSatisfiedPage());
} while (!data.hasCurrent());
}
+ private BatchData nextPageData() throws IOException {
+ resizePageDataMemoryForCurrentPageIfNeeded();
+ final BatchData nextData = chunkReader.nextPageData();
+ resizePageDataMemoryIfNeeded(PipeMemoryWeightUtil.calculateBatchDataRamBytesUsed(nextData));
+ return nextData;
+ }
+
private void resizePageDataMemoryForCurrentPageIfNeeded() {
if (!(chunkReader instanceof EstimatedMemoryChunkReader)) {
return;
@@ -523,10 +525,8 @@ public class TsFileInsertionScanDataContainer extends
TsFileInsertionDataContain
timeChunkList.add(timeChunk);
isMultiPageList.add(isMultiPage);
timeChunkPageMemorySizeList.add(
- isMultiPage
- ? 0
- :
SinglePageWholeChunkReader.calculatePageEstimatedMemoryUsageInBytes(
- timeChunk));
+ SinglePageWholeChunkReader.calculateMaxPageEstimatedMemoryUsageInBytes(
+ timeChunk));
break;
}
@@ -572,10 +572,14 @@ public class TsFileInsertionScanDataContainer extends
TsFileInsertionDataContain
Chunk chunk =
new Chunk(
chunkHeader, tsFileSequenceReader.readChunk(-1,
chunkHeader.getDataSize()));
+ final List<Long> pageEstimatedMemoryUsageInBytesList =
+ SinglePageWholeChunkReader
+ .calculatePageEstimatedMemoryUsageInBytesWithBatchDataList(chunk);
chunkReader =
currentIsMultiPage
- ? new ChunkReader(chunk, filter)
+ ? new MemoryControlledChunkReader(
+ new ChunkReader(chunk, filter),
pageEstimatedMemoryUsageInBytesList)
: new SinglePageWholeChunkReader(chunk);
currentIsAligned = false;
final String measurementID =
@@ -649,8 +653,7 @@ public class TsFileInsertionScanDataContainer extends
TsFileInsertionDataContain
chunk =
new Chunk(
chunkHeader, tsFileSequenceReader.readChunk(-1,
chunkHeader.getDataSize()));
- currentValueChunkPageMemorySize =
- calculatePageMemorySizeIfSinglePageValueChunk(chunk);
+ currentValueChunkPageMemorySize =
calculateMaxPageMemorySize(chunk);
boolean needReturn = false;
final long timeChunkSize =
lastIndex >= 0
@@ -687,8 +690,7 @@ public class TsFileInsertionScanDataContainer extends
TsFileInsertionDataContain
chunk = firstChunk4NextSequentialValueChunks;
chunkHeader = chunk.getHeader();
firstChunk4NextSequentialValueChunks = null;
- currentValueChunkPageMemorySize =
- calculatePageMemorySizeIfSinglePageValueChunk(chunk);
+ currentValueChunkPageMemorySize =
calculateMaxPageMemorySize(chunk);
resizeChunkMemoryBlockIfFirstValueChunkExceedsLimit(valueChunkList,
chunkHeader);
resizePageDataMemoryBlockIfFirstValueChunkExceedsLimit(
valueChunkList, currentValueChunkPageMemorySize);
@@ -759,9 +761,22 @@ public class TsFileInsertionScanDataContainer extends
TsFileInsertionDataContain
AlignedSinglePageWholeChunkReader.calculatePageEstimatedMemoryUsageInBytes(
timeChunk, valueChunkList));
}
+ final List<Long> pageEstimatedMemoryUsageInBytesList =
+ currentIsMultiPage
+ ? AlignedSinglePageWholeChunkReader
+ .calculatePageEstimatedMemoryUsageInBytesWithBatchDataList(
+ timeChunk, valueChunkList)
+ : Collections.emptyList();
+ final long maxPageEstimatedMemoryUsageInBytes =
+ pageEstimatedMemoryUsageInBytesList.isEmpty()
+ ? 0
+ : pageEstimatedMemoryUsageInBytesList.get(0);
+ resizePageDataMemoryIfNeeded(maxPageEstimatedMemoryUsageInBytes);
chunkReader =
currentIsMultiPage
- ? new AlignedChunkReader(timeChunk, valueChunkList, filter)
+ ? new MemoryControlledChunkReader(
+ new AlignedChunkReader(timeChunk, valueChunkList, filter),
+ pageEstimatedMemoryUsageInBytesList)
: new AlignedSinglePageWholeChunkReader(timeChunk,
valueChunkList);
currentIsAligned = true;
lastMarker = marker;
@@ -802,10 +817,8 @@ public class TsFileInsertionScanDataContainer extends
TsFileInsertionDataContain
}
}
- private long calculatePageMemorySizeIfSinglePageValueChunk(final Chunk
chunk) throws IOException {
- return isSinglePageValueChunk(chunk.getHeader())
- ?
SinglePageWholeChunkReader.calculatePageEstimatedMemoryUsageInBytes(chunk)
- : 0;
+ private long calculateMaxPageMemorySize(final Chunk chunk) throws
IOException {
+ return
SinglePageWholeChunkReader.calculateMaxPageEstimatedMemoryUsageInBytes(chunk);
}
private boolean isSinglePageValueChunk(final ChunkHeader chunkHeader) {
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java
index 7fe514b277e..36f56e0e606 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java
@@ -27,6 +27,7 @@ import
org.apache.iotdb.commons.pipe.datastructure.pattern.PrefixPipePattern;
import
org.apache.iotdb.db.pipe.event.common.tsfile.container.TsFileInsertionDataContainer;
import
org.apache.iotdb.db.pipe.event.common.tsfile.container.query.TsFileInsertionQueryDataContainer;
import
org.apache.iotdb.db.pipe.event.common.tsfile.container.scan.AlignedSinglePageWholeChunkReader;
+import
org.apache.iotdb.db.pipe.event.common.tsfile.container.scan.SinglePageWholeChunkReader;
import
org.apache.iotdb.db.pipe.event.common.tsfile.container.scan.TsFileInsertionScanDataContainer;
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil;
@@ -130,6 +131,85 @@ public class TsFileInsertionDataContainerTest {
System.out.println(System.currentTimeMillis() - startTime);
}
+ @Test
+ public void
testScanParserSplitNonAlignedSinglePageChunkByEstimatedPageMemory() throws
Exception {
+ final long originalPipeMaxReaderChunkSize =
+ CommonDescriptor.getInstance().getConfig().getPipeMaxReaderChunkSize();
+ final int originalPageSizeInByte =
+ TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
+ final int originalMaxNumberOfPointsInPage =
+ TSFileDescriptor.getInstance().getConfig().getMaxNumberOfPointsInPage();
+
+ try {
+ TSFileDescriptor.getInstance().getConfig().setPageSizeInByte(64 * 1024);
+ TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(10000);
+
+ final int measurementCount = 16;
+ final int rowCount = 64;
+ final List<MeasurementSchema> schemaList = new ArrayList<>();
+ for (int i = 0; i < measurementCount; ++i) {
+ schemaList.add(
+ new MeasurementSchema(
+ "s" + i, TSDataType.STRING, TSEncoding.PLAIN,
CompressionType.LZ4));
+ }
+
+ nonalignedTsFile = new
File("nonaligned-single-page-high-compression.tsfile");
+ final Tablet tablet = new Tablet("root.sg.d", schemaList, rowCount);
+ final Binary value =
+ new Binary(new String(new char[512]).replace('\0', 'a'),
TSFileConfig.STRING_CHARSET);
+ for (int row = 0; row < rowCount; ++row) {
+ tablet.addTimestamp(row, row);
+ for (int measurementIndex = 0; measurementIndex < measurementCount;
++measurementIndex) {
+ tablet.addValue("s" + measurementIndex, row, value);
+ }
+ }
+ tablet.rowSize = rowCount;
+
+ try (final TsFileWriter writer = new TsFileWriter(nonalignedTsFile)) {
+ writer.registerTimeseries(new Path("root.sg.d"), schemaList);
+ writer.write(tablet);
+ }
+
+ CommonDescriptor.getInstance()
+ .getConfig()
+ .setPipeMaxReaderChunkSize(
+ calculatePipeMaxReaderChunkSizeForSinglePageNonAlignedChunk(nonalignedTsFile));
+
+ int tabletCount = 0;
+ int maxMeasurementCount = 0;
+ int pointCount = 0;
+ try (final TsFileInsertionScanDataContainer parser =
+ new TsFileInsertionScanDataContainer(
+ nonalignedTsFile,
+ new PrefixPipePattern("root"),
+ Long.MIN_VALUE,
+ Long.MAX_VALUE,
+ null,
+ null,
+ false)) {
+ for (final Pair<Tablet, Boolean> tabletWithIsAligned :
parser.toTabletWithIsAligneds()) {
+ Assert.assertFalse(tabletWithIsAligned.getRight());
+ final Tablet parsedTablet = tabletWithIsAligned.getLeft();
+ tabletCount++;
+ maxMeasurementCount = Math.max(maxMeasurementCount,
parsedTablet.getSchemas().size());
+ pointCount += getNonNullSize(parsedTablet);
+ }
+ }
+
+ Assert.assertTrue(tabletCount > 1);
+ Assert.assertTrue(maxMeasurementCount < measurementCount);
+ Assert.assertEquals(measurementCount * rowCount, pointCount);
+ } finally {
+ CommonDescriptor.getInstance()
+ .getConfig()
+ .setPipeMaxReaderChunkSize(originalPipeMaxReaderChunkSize);
+ TSFileDescriptor.getInstance().getConfig().setPageSizeInByte(originalPageSizeInByte);
+ TSFileDescriptor.getInstance()
+ .getConfig()
+ .setMaxNumberOfPointsInPage(originalMaxNumberOfPointsInPage);
+ }
+ }
+
@Test
public void testScanParserSplitAlignedSinglePageChunkByEstimatedPageMemory()
throws Exception {
final long originalPipeMaxReaderChunkSize =
@@ -209,6 +289,85 @@ public class TsFileInsertionDataContainerTest {
}
}
+ @Test
+ public void testScanParserSplitAlignedMultiPageChunkByEstimatedPageMemory()
throws Exception {
+ final long originalPipeMaxReaderChunkSize =
+ CommonDescriptor.getInstance().getConfig().getPipeMaxReaderChunkSize();
+ final int originalPageSizeInByte =
+ TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
+ final int originalMaxNumberOfPointsInPage =
+ TSFileDescriptor.getInstance().getConfig().getMaxNumberOfPointsInPage();
+
+ try {
+ TSFileDescriptor.getInstance().getConfig().setPageSizeInByte(64 * 1024);
+ TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(32);
+
+ final int measurementCount = 16;
+ final int rowCount = 64;
+ final List<MeasurementSchema> schemaList = new ArrayList<>();
+ for (int i = 0; i < measurementCount; ++i) {
+ schemaList.add(
+ new MeasurementSchema(
+ "s" + i, TSDataType.STRING, TSEncoding.PLAIN,
CompressionType.LZ4));
+ }
+
+ alignedTsFile = new File("aligned-multi-page-high-compression.tsfile");
+ final Tablet tablet = new Tablet("root.sg.d", schemaList, rowCount);
+ final Binary value =
+ new Binary(new String(new char[512]).replace('\0', 'a'),
TSFileConfig.STRING_CHARSET);
+ for (int row = 0; row < rowCount; ++row) {
+ tablet.addTimestamp(row, row);
+ for (int measurementIndex = 0; measurementIndex < measurementCount;
++measurementIndex) {
+ tablet.addValue("s" + measurementIndex, row, value);
+ }
+ }
+ tablet.rowSize = rowCount;
+
+ try (final TsFileWriter writer = new TsFileWriter(alignedTsFile)) {
+ writer.registerAlignedTimeseries(new Path("root.sg.d"), schemaList);
+ writer.writeAligned(tablet);
+ }
+
+ CommonDescriptor.getInstance()
+ .getConfig()
+ .setPipeMaxReaderChunkSize(
+ calculatePipeMaxReaderChunkSizeForMultiPageAlignedChunk(alignedTsFile));
+
+ int tabletCount = 0;
+ int maxMeasurementCount = 0;
+ int pointCount = 0;
+ try (final TsFileInsertionScanDataContainer parser =
+ new TsFileInsertionScanDataContainer(
+ alignedTsFile,
+ new PrefixPipePattern("root"),
+ Long.MIN_VALUE,
+ Long.MAX_VALUE,
+ null,
+ null,
+ false)) {
+ for (final Pair<Tablet, Boolean> tabletWithIsAligned :
parser.toTabletWithIsAligneds()) {
+ Assert.assertTrue(tabletWithIsAligned.getRight());
+ final Tablet parsedTablet = tabletWithIsAligned.getLeft();
+ tabletCount++;
+ maxMeasurementCount = Math.max(maxMeasurementCount,
parsedTablet.getSchemas().size());
+ pointCount += getNonNullSize(parsedTablet);
+ }
+ }
+
+ Assert.assertTrue(tabletCount > 1);
+ Assert.assertTrue(maxMeasurementCount < measurementCount);
+ Assert.assertEquals(measurementCount * rowCount, pointCount);
+ } finally {
+ CommonDescriptor.getInstance()
+ .getConfig()
+ .setPipeMaxReaderChunkSize(originalPipeMaxReaderChunkSize);
+ TSFileDescriptor.getInstance().getConfig().setPageSizeInByte(originalPageSizeInByte);
+ TSFileDescriptor.getInstance()
+ .getConfig()
+ .setMaxNumberOfPointsInPage(originalMaxNumberOfPointsInPage);
+ }
+ }
+
@Test
public void testScanParserResizesChunkMemoryForFirstAlignedValueChunk()
throws Exception {
final long originalPipeMaxReaderChunkSize =
@@ -840,4 +999,64 @@ public class TsFileInsertionDataContainerTest {
return chunkSizeLimit;
}
}
+
+ private long
calculatePipeMaxReaderChunkSizeForSinglePageNonAlignedChunk(final File tsFile)
+ throws Exception {
+ try (final TsFileSequenceReader reader = new
TsFileSequenceReader(tsFile.getAbsolutePath())) {
+ final IDeviceID deviceID =
reader.getDeviceMeasurementsMap().keySet().iterator().next();
+ final List<String> measurements =
reader.getDeviceMeasurementsMap().get(deviceID);
+ Assert.assertFalse(measurements.isEmpty());
+
+ long chunkSizeLimit = 0;
+ long estimatedPageMemorySize = 0;
+ for (final String measurement : measurements) {
+ final List<ChunkMetadata> chunkMetadataList =
+ reader.getChunkMetadataList(new Path(deviceID, measurement,
false));
+ Assert.assertEquals(1, chunkMetadataList.size());
+
+ final Chunk chunk = reader.readMemChunk(chunkMetadataList.get(0));
+ Assert.assertEquals(
+ MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER,
chunk.getHeader().getChunkType() & 0x3F);
+ chunkSizeLimit += chunk.getHeader().getDataSize();
+ estimatedPageMemorySize +=
+ SinglePageWholeChunkReader.calculateMaxPageEstimatedMemoryUsageInBytesWithBatchData(
+ chunk);
+ }
+
+ Assert.assertTrue(estimatedPageMemorySize > chunkSizeLimit);
+ return chunkSizeLimit;
+ }
+ }
+
+ private long calculatePipeMaxReaderChunkSizeForMultiPageAlignedChunk(final
File tsFile)
+ throws Exception {
+ try (final TsFileSequenceReader reader = new
TsFileSequenceReader(tsFile.getAbsolutePath())) {
+ final List<IDeviceID> deviceIDList = reader.getAllDevices();
+ Assert.assertEquals(1, deviceIDList.size());
+ final IDeviceID deviceID = deviceIDList.get(0);
+ final List<AlignedChunkMetadata> alignedChunkMetadataList =
+ reader.getAlignedChunkMetadata(deviceID);
+ Assert.assertEquals(1, alignedChunkMetadataList.size());
+
+ final AlignedChunkMetadata alignedChunkMetadata =
alignedChunkMetadataList.get(0);
+ final Chunk timeChunk =
+ reader.readMemChunk((ChunkMetadata)
alignedChunkMetadata.getTimeChunkMetadata());
+ Assert.assertEquals(MetaMarker.CHUNK_HEADER,
timeChunk.getHeader().getChunkType() & 0x3F);
+
+ long chunkSizeLimit =
PipeMemoryWeightUtil.calculateChunkRamBytesUsed(timeChunk);
+ long estimatedMaxPageMemorySize =
+ SinglePageWholeChunkReader.calculateMaxPageEstimatedMemoryUsageInBytes(timeChunk);
+ for (final IChunkMetadata valueChunkMetadata :
+ alignedChunkMetadata.getValueChunkMetadataList()) {
+ final Chunk valueChunk = reader.readMemChunk((ChunkMetadata)
valueChunkMetadata);
+ Assert.assertEquals(MetaMarker.CHUNK_HEADER,
valueChunk.getHeader().getChunkType() & 0x3F);
+ chunkSizeLimit += valueChunk.getHeader().getDataSize();
+ estimatedMaxPageMemorySize +=
+ SinglePageWholeChunkReader.calculateMaxPageEstimatedMemoryUsageInBytes(valueChunk);
+ }
+
+ Assert.assertTrue(estimatedMaxPageMemorySize > chunkSizeLimit);
+ return chunkSizeLimit;
+ }
+ }
}