This is an automated email from the ASF dual-hosted git repository.

Caideyipi pushed a commit to branch chunk-fix
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/chunk-fix by this push:
     new 435ea419e58 shop
435ea419e58 is described below

commit 435ea419e58c2774ef2e7829c7a458bf9364f9b2
Author: Caideyipi <[email protected]>
AuthorDate: Tue May 12 10:48:40 2026 +0800

    shop
---
 .../scan/TsFileInsertionEventScanParser.java       | 58 +++++++++++-----------
 .../pipe/event/TsFileInsertionEventParserTest.java |  5 +-
 2 files changed, 32 insertions(+), 31 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java
index 492bd4f51e4..7ed3bf5af33 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java
@@ -373,7 +373,7 @@ public class TsFileInsertionEventScanParser extends TsFileInsertionEventParser {
       }
 
       do {
-        resizeBatchDataMemoryForCurrentPageIfNeeded();
+        resizePageDataMemoryForCurrentPageIfNeeded();
         data = chunkReader.nextPageData();
         long size = PipeMemoryWeightUtil.calculateBatchDataRamBytesUsed(data);
         if (allocatedMemoryBlockForBatchData.getMemoryUsageInBytes() < size) {
@@ -383,13 +383,17 @@ public class TsFileInsertionEventScanParser extends TsFileInsertionEventParser {
     } while (!data.hasCurrent());
   }
 
-  private void resizeBatchDataMemoryForCurrentPageIfNeeded() {
+  private void resizePageDataMemoryForCurrentPageIfNeeded() {
     if (!(chunkReader instanceof EstimatedMemoryChunkReader)) {
       return;
     }
 
     final long estimatedMemoryUsageInBytes =
         ((EstimatedMemoryChunkReader) 
chunkReader).getCurrentPageEstimatedMemoryUsageInBytes();
+    resizePageDataMemoryIfNeeded(estimatedMemoryUsageInBytes);
+  }
+
+  private void resizePageDataMemoryIfNeeded(final long 
estimatedMemoryUsageInBytes) {
     if (allocatedMemoryBlockForBatchData.getMemoryUsageInBytes() < 
estimatedMemoryUsageInBytes) {
       PipeDataNodeResourceManager.memory()
           .forceResize(allocatedMemoryBlockForBatchData, 
estimatedMemoryUsageInBytes);
@@ -529,15 +533,6 @@ public class TsFileInsertionEventScanParser extends TsFileInsertionEventParser {
             Chunk chunk =
                 new Chunk(
                     chunkHeader, tsFileSequenceReader.readChunk(-1, 
chunkHeader.getDataSize()));
-            if (!currentIsMultiPage) {
-              final long pageEstimatedMemoryUsageInBytes =
-                  
SinglePageWholeChunkReader.calculatePageEstimatedMemoryUsageInBytes(chunk);
-              if (pageEstimatedMemoryUsageInBytes
-                  > allocatedMemoryBlockForChunk.getMemoryUsageInBytes()) {
-                PipeDataNodeResourceManager.memory()
-                    .forceResize(allocatedMemoryBlockForChunk, 
pageEstimatedMemoryUsageInBytes);
-              }
-            }
 
             chunkReader =
                 currentIsMultiPage
@@ -607,22 +602,20 @@ public class TsFileInsertionEventScanParser extends TsFileInsertionEventParser {
                       || timeChunkPageMemorySize > 0
                           && chunkPageMemorySize > 0
                           && pageMemorySize + chunkPageMemorySize
-                              > 
allocatedMemoryBlockForChunk.getMemoryUsageInBytes()) {
-                    if (valueChunkList.size() == 1
-                        && Math.max(
-                                chunkSize,
-                                timeChunkPageMemorySize > 0 && 
chunkPageMemorySize > 0
-                                    ? pageMemorySize
-                                    : 0)
-                            > 
allocatedMemoryBlockForChunk.getMemoryUsageInBytes()) {
-                      PipeDataNodeResourceManager.memory()
-                          .forceResize(
-                              allocatedMemoryBlockForChunk,
-                              Math.max(
-                                  chunkSize,
-                                  timeChunkPageMemorySize > 0 && 
chunkPageMemorySize > 0
-                                      ? pageMemorySize
-                                      : 0));
+                              > getPageDataMemoryLimitInBytes()) {
+                    if (valueChunkList.size() == 1) {
+                      if (chunkSize > 
allocatedMemoryBlockForChunk.getMemoryUsageInBytes()) {
+                        PipeDataNodeResourceManager.memory()
+                            .forceResize(allocatedMemoryBlockForChunk, 
chunkSize);
+                      }
+                      final long currentPageMemorySize =
+                          timeChunkPageMemorySize > 0 && 
valueChunkPageMemorySize > 0
+                              ? pageMemorySize
+                              : 0;
+                      if (currentPageMemorySize > 
getPageDataMemoryLimitInBytes()) {
+                        PipeDataNodeResourceManager.memory()
+                            .forceResize(allocatedMemoryBlockForBatchData, 
currentPageMemorySize);
+                      }
                     }
                     needReturn = recordAlignedChunk(valueChunkList, marker);
                   }
@@ -690,6 +683,10 @@ public class TsFileInsertionEventScanParser extends TsFileInsertionEventParser {
     }
   }
 
+  private long getPageDataMemoryLimitInBytes() {
+    return PipeConfig.getInstance().getPipeMaxReaderChunkSize();
+  }
+
   private boolean filterChunk(
       final long currentChunkHeaderOffset,
       final ChunkHeader chunkHeader,
@@ -773,6 +770,11 @@ public class TsFileInsertionEventScanParser extends TsFileInsertionEventParser {
       final Chunk timeChunk = timeChunkList.get(lastIndex);
       timeChunk.getData().rewind();
       currentIsMultiPage = isMultiPageList.get(lastIndex);
+      if (!currentIsMultiPage) {
+        resizePageDataMemoryIfNeeded(
+            
AlignedSinglePageWholeChunkReader.calculatePageEstimatedMemoryUsageInBytes(
+                timeChunk, valueChunkList));
+      }
       chunkReader =
           currentIsMultiPage
               ? new AlignedChunkReader(timeChunk, valueChunkList, filter)
@@ -785,7 +787,7 @@ public class TsFileInsertionEventScanParser extends TsFileInsertionEventParser {
   }
 
   private boolean isSinglePageValueChunk(final ChunkHeader chunkHeader) {
-    return (chunkHeader.getChunkType() & 0x3F) == 
MetaMarker.ONLY_ONE_PAGE_VALUE_CHUNK_HEADER;
+    return (chunkHeader.getChunkType() & 0x3F) == 
MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER;
   }
 
   private byte toValueChunkMarker(final ChunkHeader chunkHeader) {
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java
index 5d3cf6a74ea..91bfa9c5d3b 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java
@@ -766,7 +766,7 @@ public class TsFileInsertionEventParserTest {
       final Chunk timeChunk =
           reader.readMemChunk((ChunkMetadata) 
alignedChunkMetadata.getTimeChunkMetadata());
       Assert.assertEquals(
-          MetaMarker.ONLY_ONE_PAGE_TIME_CHUNK_HEADER, 
timeChunk.getHeader().getChunkType() & 0x3F);
+          MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER, 
timeChunk.getHeader().getChunkType() & 0x3F);
 
       final List<Chunk> valueChunkList = new ArrayList<>();
       long chunkSizeLimit = 
PipeMemoryWeightUtil.calculateChunkRamBytesUsed(timeChunk);
@@ -774,8 +774,7 @@ public class TsFileInsertionEventParserTest {
           alignedChunkMetadata.getValueChunkMetadataList()) {
         final Chunk valueChunk = reader.readMemChunk((ChunkMetadata) 
valueChunkMetadata);
         Assert.assertEquals(
-            MetaMarker.ONLY_ONE_PAGE_VALUE_CHUNK_HEADER,
-            valueChunk.getHeader().getChunkType() & 0x3F);
+            MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER, 
valueChunk.getHeader().getChunkType() & 0x3F);
         valueChunkList.add(valueChunk);
         chunkSizeLimit += valueChunk.getHeader().getDataSize();
       }

Reply via email to