This is an automated email from the ASF dual-hosted git repository.
Caideyipi pushed a commit to branch chunk-fix
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/chunk-fix by this push:
new 435ea419e58 shop
435ea419e58 is described below
commit 435ea419e58c2774ef2e7829c7a458bf9364f9b2
Author: Caideyipi <[email protected]>
AuthorDate: Tue May 12 10:48:40 2026 +0800
shop
---
.../scan/TsFileInsertionEventScanParser.java | 58 +++++++++++-----------
.../pipe/event/TsFileInsertionEventParserTest.java | 5 +-
2 files changed, 32 insertions(+), 31 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java
index 492bd4f51e4..7ed3bf5af33 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java
@@ -373,7 +373,7 @@ public class TsFileInsertionEventScanParser extends
TsFileInsertionEventParser {
}
do {
- resizeBatchDataMemoryForCurrentPageIfNeeded();
+ resizePageDataMemoryForCurrentPageIfNeeded();
data = chunkReader.nextPageData();
long size = PipeMemoryWeightUtil.calculateBatchDataRamBytesUsed(data);
if (allocatedMemoryBlockForBatchData.getMemoryUsageInBytes() < size) {
@@ -383,13 +383,17 @@ public class TsFileInsertionEventScanParser extends
TsFileInsertionEventParser {
} while (!data.hasCurrent());
}
- private void resizeBatchDataMemoryForCurrentPageIfNeeded() {
+ private void resizePageDataMemoryForCurrentPageIfNeeded() {
if (!(chunkReader instanceof EstimatedMemoryChunkReader)) {
return;
}
final long estimatedMemoryUsageInBytes =
((EstimatedMemoryChunkReader)
chunkReader).getCurrentPageEstimatedMemoryUsageInBytes();
+ resizePageDataMemoryIfNeeded(estimatedMemoryUsageInBytes);
+ }
+
+ private void resizePageDataMemoryIfNeeded(final long
estimatedMemoryUsageInBytes) {
if (allocatedMemoryBlockForBatchData.getMemoryUsageInBytes() <
estimatedMemoryUsageInBytes) {
PipeDataNodeResourceManager.memory()
.forceResize(allocatedMemoryBlockForBatchData,
estimatedMemoryUsageInBytes);
@@ -529,15 +533,6 @@ public class TsFileInsertionEventScanParser extends
TsFileInsertionEventParser {
Chunk chunk =
new Chunk(
chunkHeader, tsFileSequenceReader.readChunk(-1,
chunkHeader.getDataSize()));
- if (!currentIsMultiPage) {
- final long pageEstimatedMemoryUsageInBytes =
-
SinglePageWholeChunkReader.calculatePageEstimatedMemoryUsageInBytes(chunk);
- if (pageEstimatedMemoryUsageInBytes
- > allocatedMemoryBlockForChunk.getMemoryUsageInBytes()) {
- PipeDataNodeResourceManager.memory()
- .forceResize(allocatedMemoryBlockForChunk,
pageEstimatedMemoryUsageInBytes);
- }
- }
chunkReader =
currentIsMultiPage
@@ -607,22 +602,20 @@ public class TsFileInsertionEventScanParser extends
TsFileInsertionEventParser {
|| timeChunkPageMemorySize > 0
&& chunkPageMemorySize > 0
&& pageMemorySize + chunkPageMemorySize
- >
allocatedMemoryBlockForChunk.getMemoryUsageInBytes()) {
- if (valueChunkList.size() == 1
- && Math.max(
- chunkSize,
- timeChunkPageMemorySize > 0 &&
chunkPageMemorySize > 0
- ? pageMemorySize
- : 0)
- >
allocatedMemoryBlockForChunk.getMemoryUsageInBytes()) {
- PipeDataNodeResourceManager.memory()
- .forceResize(
- allocatedMemoryBlockForChunk,
- Math.max(
- chunkSize,
- timeChunkPageMemorySize > 0 &&
chunkPageMemorySize > 0
- ? pageMemorySize
- : 0));
+ > getPageDataMemoryLimitInBytes()) {
+ if (valueChunkList.size() == 1) {
+ if (chunkSize >
allocatedMemoryBlockForChunk.getMemoryUsageInBytes()) {
+ PipeDataNodeResourceManager.memory()
+ .forceResize(allocatedMemoryBlockForChunk,
chunkSize);
+ }
+ final long currentPageMemorySize =
+ timeChunkPageMemorySize > 0 &&
valueChunkPageMemorySize > 0
+ ? pageMemorySize
+ : 0;
+ if (currentPageMemorySize >
getPageDataMemoryLimitInBytes()) {
+ PipeDataNodeResourceManager.memory()
+ .forceResize(allocatedMemoryBlockForBatchData,
currentPageMemorySize);
+ }
}
needReturn = recordAlignedChunk(valueChunkList, marker);
}
@@ -690,6 +683,10 @@ public class TsFileInsertionEventScanParser extends
TsFileInsertionEventParser {
}
}
+ private long getPageDataMemoryLimitInBytes() {
+ return PipeConfig.getInstance().getPipeMaxReaderChunkSize();
+ }
+
private boolean filterChunk(
final long currentChunkHeaderOffset,
final ChunkHeader chunkHeader,
@@ -773,6 +770,11 @@ public class TsFileInsertionEventScanParser extends
TsFileInsertionEventParser {
final Chunk timeChunk = timeChunkList.get(lastIndex);
timeChunk.getData().rewind();
currentIsMultiPage = isMultiPageList.get(lastIndex);
+ if (!currentIsMultiPage) {
+ resizePageDataMemoryIfNeeded(
+
AlignedSinglePageWholeChunkReader.calculatePageEstimatedMemoryUsageInBytes(
+ timeChunk, valueChunkList));
+ }
chunkReader =
currentIsMultiPage
? new AlignedChunkReader(timeChunk, valueChunkList, filter)
@@ -785,7 +787,7 @@ public class TsFileInsertionEventScanParser extends
TsFileInsertionEventParser {
}
private boolean isSinglePageValueChunk(final ChunkHeader chunkHeader) {
- return (chunkHeader.getChunkType() & 0x3F) ==
MetaMarker.ONLY_ONE_PAGE_VALUE_CHUNK_HEADER;
+ return (chunkHeader.getChunkType() & 0x3F) ==
MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER;
}
private byte toValueChunkMarker(final ChunkHeader chunkHeader) {
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java
index 5d3cf6a74ea..91bfa9c5d3b 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java
@@ -766,7 +766,7 @@ public class TsFileInsertionEventParserTest {
final Chunk timeChunk =
reader.readMemChunk((ChunkMetadata)
alignedChunkMetadata.getTimeChunkMetadata());
Assert.assertEquals(
- MetaMarker.ONLY_ONE_PAGE_TIME_CHUNK_HEADER,
timeChunk.getHeader().getChunkType() & 0x3F);
+ MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER,
timeChunk.getHeader().getChunkType() & 0x3F);
final List<Chunk> valueChunkList = new ArrayList<>();
long chunkSizeLimit =
PipeMemoryWeightUtil.calculateChunkRamBytesUsed(timeChunk);
@@ -774,8 +774,7 @@ public class TsFileInsertionEventParserTest {
alignedChunkMetadata.getValueChunkMetadataList()) {
final Chunk valueChunk = reader.readMemChunk((ChunkMetadata)
valueChunkMetadata);
Assert.assertEquals(
- MetaMarker.ONLY_ONE_PAGE_VALUE_CHUNK_HEADER,
- valueChunk.getHeader().getChunkType() & 0x3F);
+ MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER,
valueChunk.getHeader().getChunkType() & 0x3F);
valueChunkList.add(valueChunk);
chunkSizeLimit += valueChunk.getHeader().getDataSize();
}