This is an automated email from the ASF dual-hosted git repository.

jt2594838 pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new 30b4d49fa6b [to dev/1.3] Pipe: account page decode memory in scan parser (#17807) (#17833)
30b4d49fa6b is described below

commit 30b4d49fa6b7dd1626533a2e06aaf657f300f8e7
Author: Caideyipi <[email protected]>
AuthorDate: Wed Jun 3 18:27:44 2026 +0800

    [to dev/1.3] Pipe: account page decode memory in scan parser (#17807) (#17833)
    
    * Pipe: account page decode memory in scan parser (#17807)
    
    * Pipe: account page decode memory in scan parser
    
    * Fix pipe scan parser single page row count
    
    * Fix pipe scan parser page memory test
    
    * Fix tsfile writer API in pipe test
---
 .../scan/AlignedSinglePageWholeChunkReader.java    |  55 +++++-
 .../scan/MemoryControlledChunkReader.java          |  71 +++++++
 .../container/scan/SinglePageWholeChunkReader.java | 193 +++++++++++++++++-
 .../scan/TsFileInsertionScanDataContainer.java     |  55 ++++--
 .../event/TsFileInsertionDataContainerTest.java    | 219 +++++++++++++++++++++
 5 files changed, 568 insertions(+), 25 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/AlignedSinglePageWholeChunkReader.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/AlignedSinglePageWholeChunkReader.java
index a9de04cbe64..9c3b3514c83 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/AlignedSinglePageWholeChunkReader.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/AlignedSinglePageWholeChunkReader.java
@@ -37,6 +37,7 @@ import java.io.Serializable;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Objects;
 
 /**
  * The {@link AlignedSinglePageWholeChunkReader} is used to read a whole 
single page aligned chunk
@@ -64,7 +65,7 @@ public class AlignedSinglePageWholeChunkReader extends 
AbstractChunkReader
     this.timeChunkHeader = timeChunk.getHeader();
     this.timeChunkDataBuffer = timeChunk.getData();
     this.pageEstimatedMemoryUsageInBytes =
-        calculatePageEstimatedMemoryUsageInBytes(timeChunk, valueChunkList);
+        calculateMaxPageEstimatedMemoryUsageInBytesWithBatchData(timeChunk, 
valueChunkList);
 
     valueChunkList.forEach(
         chunk -> {
@@ -206,4 +207,56 @@ public class AlignedSinglePageWholeChunkReader extends 
AbstractChunkReader
 
     return estimatedMemoryUsageInBytes;
   }
+
+  public static long calculateMaxPageEstimatedMemoryUsageInBytesWithBatchData(
+      final Chunk timeChunk, final List<Chunk> valueChunkList) throws 
IOException {
+    final List<Long> pageEstimatedMemoryUsageInBytesList =
+        calculatePageEstimatedMemoryUsageInBytesWithBatchDataList(timeChunk, 
valueChunkList);
+    return pageEstimatedMemoryUsageInBytesList.isEmpty()
+        ? 0
+        : pageEstimatedMemoryUsageInBytesList.get(0);
+  }
+
+  public static List<Long> 
calculatePageEstimatedMemoryUsageInBytesWithBatchDataList(
+      final Chunk timeChunk, final List<Chunk> valueChunkList) throws 
IOException {
+    final ByteBuffer timeChunkDataBuffer = timeChunk.getData().duplicate();
+    final List<ByteBuffer> valueChunkDataBufferList = new 
ArrayList<>(valueChunkList.size());
+    for (final Chunk valueChunk : valueChunkList) {
+      valueChunkDataBufferList.add(
+          Objects.isNull(valueChunk) ? null : 
valueChunk.getData().duplicate());
+    }
+
+    final List<Long> pageEstimatedMemoryUsageInBytesList = new ArrayList<>();
+    while (timeChunkDataBuffer.remaining() > 0) {
+      long pageUncompressedSizeInBytes = 0;
+      final PageHeader timePageHeader =
+          SinglePageWholeChunkReader.deserializePageHeader(
+              timeChunkDataBuffer, timeChunk.getHeader());
+      pageUncompressedSizeInBytes += timePageHeader.getUncompressedSize();
+      SinglePageWholeChunkReader.skipCompressedPageData(timeChunkDataBuffer, 
timePageHeader);
+
+      final List<TSDataType> valueDataTypeList = new 
ArrayList<>(valueChunkList.size());
+      for (int i = 0; i < valueChunkList.size(); ++i) {
+        final Chunk valueChunk = valueChunkList.get(i);
+        final ByteBuffer valueChunkDataBuffer = 
valueChunkDataBufferList.get(i);
+        if (Objects.isNull(valueChunk) || 
Objects.isNull(valueChunkDataBuffer)) {
+          valueDataTypeList.add(null);
+          continue;
+        }
+
+        final PageHeader valuePageHeader =
+            SinglePageWholeChunkReader.deserializePageHeader(
+                valueChunkDataBuffer, valueChunk.getHeader());
+        pageUncompressedSizeInBytes += valuePageHeader.getUncompressedSize();
+        valueDataTypeList.add(valueChunk.getHeader().getDataType());
+        
SinglePageWholeChunkReader.skipCompressedPageData(valueChunkDataBuffer, 
valuePageHeader);
+      }
+      pageEstimatedMemoryUsageInBytesList.add(
+          
SinglePageWholeChunkReader.estimatePageMemoryUsageInBytesWithBatchData(
+              pageUncompressedSizeInBytes,
+              SinglePageWholeChunkReader.getPageRowCount(timePageHeader, 
timeChunk),
+              valueDataTypeList));
+    }
+    return 
SinglePageWholeChunkReader.toSuffixMaxList(pageEstimatedMemoryUsageInBytesList);
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/MemoryControlledChunkReader.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/MemoryControlledChunkReader.java
new file mode 100644
index 00000000000..6a74ffc54fc
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/MemoryControlledChunkReader.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.event.common.tsfile.container.scan;
+
+import org.apache.tsfile.read.common.BatchData;
+import org.apache.tsfile.read.reader.IChunkReader;
+import org.apache.tsfile.read.reader.IPageReader;
+
+import java.io.IOException;
+import java.util.List;
+
+class MemoryControlledChunkReader implements IChunkReader, 
EstimatedMemoryChunkReader {
+
+  private final IChunkReader delegate;
+  private final List<Long> pageEstimatedMemoryUsageInBytesList;
+  private int pageIndex;
+
+  MemoryControlledChunkReader(
+      final IChunkReader delegate, final List<Long> 
pageEstimatedMemoryUsageInBytesList) {
+    this.delegate = delegate;
+    this.pageEstimatedMemoryUsageInBytesList = 
pageEstimatedMemoryUsageInBytesList;
+  }
+
+  @Override
+  public long getCurrentPageEstimatedMemoryUsageInBytes() {
+    return pageIndex < pageEstimatedMemoryUsageInBytesList.size()
+        ? pageEstimatedMemoryUsageInBytesList.get(pageIndex)
+        : 0;
+  }
+
+  @Override
+  public boolean hasNextSatisfiedPage() throws IOException {
+    return delegate.hasNextSatisfiedPage();
+  }
+
+  @Override
+  public BatchData nextPageData() throws IOException {
+    try {
+      return delegate.nextPageData();
+    } finally {
+      ++pageIndex;
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    delegate.close();
+  }
+
+  @Override
+  public List<IPageReader> loadPageReaderList() throws IOException {
+    return delegate.loadPageReaderList();
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/SinglePageWholeChunkReader.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/SinglePageWholeChunkReader.java
index ade50012903..f41a6861120 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/SinglePageWholeChunkReader.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/SinglePageWholeChunkReader.java
@@ -21,6 +21,8 @@ package 
org.apache.iotdb.db.pipe.event.common.tsfile.container.scan;
 
 import org.apache.tsfile.compress.IUnCompressor;
 import org.apache.tsfile.encoding.decoder.Decoder;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.MetaMarker;
 import org.apache.tsfile.file.header.ChunkHeader;
 import org.apache.tsfile.file.header.PageHeader;
 import org.apache.tsfile.file.metadata.statistics.Statistics;
@@ -28,10 +30,15 @@ import org.apache.tsfile.read.common.Chunk;
 import org.apache.tsfile.read.reader.chunk.AbstractChunkReader;
 import org.apache.tsfile.read.reader.page.LazyLoadPageData;
 import org.apache.tsfile.read.reader.page.PageReader;
+import org.apache.tsfile.utils.RamUsageEstimator;
 
 import java.io.IOException;
 import java.io.Serializable;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
 
 public class SinglePageWholeChunkReader extends AbstractChunkReader
     implements EstimatedMemoryChunkReader {
@@ -44,7 +51,8 @@ public class SinglePageWholeChunkReader extends 
AbstractChunkReader
 
     this.chunkHeader = chunk.getHeader();
     this.chunkDataBuffer = chunk.getData();
-    this.pageEstimatedMemoryUsageInBytes = 
calculatePageEstimatedMemoryUsageInBytes(chunk);
+    this.pageEstimatedMemoryUsageInBytes =
+        calculateMaxPageEstimatedMemoryUsageInBytesWithBatchData(chunk);
     initAllPageReaders();
   }
 
@@ -81,11 +89,190 @@ public class SinglePageWholeChunkReader extends 
AbstractChunkReader
   public static long calculatePageEstimatedMemoryUsageInBytes(final Chunk 
chunk)
       throws IOException {
     final ByteBuffer chunkDataBuffer = chunk.getData().duplicate();
-    final PageHeader pageHeader =
-        PageHeader.deserializeFrom(chunkDataBuffer, (Statistics<? extends 
Serializable>) null);
+    final PageHeader pageHeader = deserializePageHeader(chunkDataBuffer, 
chunk.getHeader());
     return pageHeader.getUncompressedSize();
   }
 
+  public static long calculateMaxPageEstimatedMemoryUsageInBytes(final Chunk 
chunk)
+      throws IOException {
+    final ByteBuffer chunkDataBuffer = chunk.getData().duplicate();
+    long maxPageEstimatedMemoryUsageInBytes = 0;
+    while (chunkDataBuffer.remaining() > 0) {
+      final PageHeader pageHeader = deserializePageHeader(chunkDataBuffer, 
chunk.getHeader());
+      maxPageEstimatedMemoryUsageInBytes =
+          Math.max(maxPageEstimatedMemoryUsageInBytes, 
pageHeader.getUncompressedSize());
+      skipCompressedPageData(chunkDataBuffer, pageHeader);
+    }
+    return maxPageEstimatedMemoryUsageInBytes;
+  }
+
+  public static long 
calculateMaxPageEstimatedMemoryUsageInBytesWithBatchData(final Chunk chunk)
+      throws IOException {
+    final List<Long> pageEstimatedMemoryUsageInBytesList =
+        calculatePageEstimatedMemoryUsageInBytesWithBatchDataList(chunk);
+    return pageEstimatedMemoryUsageInBytesList.isEmpty()
+        ? 0
+        : pageEstimatedMemoryUsageInBytesList.get(0);
+  }
+
+  public static List<Long> 
calculatePageEstimatedMemoryUsageInBytesWithBatchDataList(
+      final Chunk chunk) throws IOException {
+    final ByteBuffer chunkDataBuffer = chunk.getData().duplicate();
+    final List<Long> pageEstimatedMemoryUsageInBytesList = new ArrayList<>();
+    while (chunkDataBuffer.remaining() > 0) {
+      final PageHeader pageHeader = deserializePageHeader(chunkDataBuffer, 
chunk.getHeader());
+      pageEstimatedMemoryUsageInBytesList.add(
+          estimatePageMemoryUsageInBytesWithBatchData(
+              pageHeader, chunk, 
Collections.singletonList(chunk.getHeader().getDataType())));
+      skipCompressedPageData(chunkDataBuffer, pageHeader);
+    }
+    return toSuffixMaxList(pageEstimatedMemoryUsageInBytesList);
+  }
+
+  static PageHeader deserializePageHeader(
+      final ByteBuffer chunkDataBuffer, final ChunkHeader chunkHeader) throws 
IOException {
+    return isSinglePageChunk(chunkHeader)
+        ? PageHeader.deserializeFrom(chunkDataBuffer, (Statistics<? extends 
Serializable>) null)
+        : PageHeader.deserializeFrom(chunkDataBuffer, 
chunkHeader.getDataType());
+  }
+
+  static boolean isSinglePageChunk(final ChunkHeader chunkHeader) {
+    return (chunkHeader.getChunkType() & 0x3F) == 
MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER;
+  }
+
+  static void skipCompressedPageData(
+      final ByteBuffer chunkDataBuffer, final PageHeader pageHeader) {
+    chunkDataBuffer.position(chunkDataBuffer.position() + 
pageHeader.getCompressedSize());
+  }
+
+  static List<Long> toSuffixMaxList(final List<Long> 
pageEstimatedMemoryUsageInBytesList) {
+    long suffixMaxPageEstimatedMemoryUsageInBytes = 0;
+    for (int i = pageEstimatedMemoryUsageInBytesList.size() - 1; i >= 0; --i) {
+      suffixMaxPageEstimatedMemoryUsageInBytes =
+          Math.max(
+              suffixMaxPageEstimatedMemoryUsageInBytes, 
pageEstimatedMemoryUsageInBytesList.get(i));
+      pageEstimatedMemoryUsageInBytesList.set(i, 
suffixMaxPageEstimatedMemoryUsageInBytes);
+    }
+    return pageEstimatedMemoryUsageInBytesList;
+  }
+
+  static long estimatePageMemoryUsageInBytesWithBatchData(
+      final PageHeader timePageHeader,
+      final Chunk timeChunk,
+      final List<TSDataType> valueDataTypeList) {
+    return estimatePageMemoryUsageInBytesWithBatchData(
+        timePageHeader.getUncompressedSize(),
+        getPageRowCount(timePageHeader, timeChunk),
+        valueDataTypeList);
+  }
+
+  static int getPageRowCount(final PageHeader pageHeader, final Chunk chunk) {
+    if (isSinglePageChunk(chunk.getHeader())) {
+      return Objects.isNull(chunk.getChunkStatistic())
+          ? 0
+          : saturateToInt(chunk.getChunkStatistic().getCount());
+    }
+    return saturateToInt(pageHeader.getNumOfValues());
+  }
+
+  private static int saturateToInt(final long value) {
+    return (int) Math.min(Integer.MAX_VALUE, value);
+  }
+
+  static long estimatePageMemoryUsageInBytesWithBatchData(
+      final long pageUncompressedSizeInBytes,
+      final int rowCount,
+      final List<TSDataType> valueDataTypeList) {
+    return pageUncompressedSizeInBytes
+        + estimateBatchDataMemoryUsageInBytes(rowCount, valueDataTypeList);
+  }
+
+  private static long estimateBatchDataMemoryUsageInBytes(
+      final int rowCount, final List<TSDataType> valueDataTypeList) {
+    final int valueCount = valueDataTypeList.size();
+    final long segmentCount = Math.max(1, (rowCount + 15L) / 16);
+    long estimatedMemoryUsageInBytes = RamUsageEstimator.sizeOfLongArray(16) * 
segmentCount;
+
+    if (valueCount == 1) {
+      estimatedMemoryUsageInBytes +=
+          estimateSingleValueArrayMemoryUsageInBytes(rowCount, 
valueDataTypeList.get(0));
+    } else if (valueCount > 1) {
+      estimatedMemoryUsageInBytes += RamUsageEstimator.sizeOfObjectArray(16) * 
segmentCount;
+      estimatedMemoryUsageInBytes +=
+          (long) rowCount
+              * (RamUsageEstimator.sizeOfObjectArray(valueCount)
+                  + estimateVectorValueMemoryUsageInBytes(valueDataTypeList));
+    }
+
+    return estimatedMemoryUsageInBytes;
+  }
+
+  private static long estimateSingleValueArrayMemoryUsageInBytes(
+      final int rowCount, final TSDataType dataType) {
+    final long segmentCount = Math.max(1, (rowCount + 15L) / 16);
+    if (Objects.isNull(dataType)) {
+      return 0;
+    }
+
+    switch (dataType) {
+      case BOOLEAN:
+        return RamUsageEstimator.sizeOfBooleanArray(16) * segmentCount;
+      case INT32:
+      case DATE:
+        return RamUsageEstimator.sizeOfIntArray(16) * segmentCount;
+      case INT64:
+      case TIMESTAMP:
+        return RamUsageEstimator.sizeOfLongArray(16) * segmentCount;
+      case FLOAT:
+        return RamUsageEstimator.sizeOfFloatArray(16) * segmentCount;
+      case DOUBLE:
+        return RamUsageEstimator.sizeOfDoubleArray(16) * segmentCount;
+      case TEXT:
+      case BLOB:
+      case STRING:
+        return RamUsageEstimator.sizeOfObjectArray(16) * segmentCount;
+      default:
+        return 0;
+    }
+  }
+
+  private static long estimateVectorValueMemoryUsageInBytes(
+      final List<TSDataType> valueDataTypeList) {
+    long estimatedMemoryUsageInBytes = 0;
+    for (final TSDataType dataType : valueDataTypeList) {
+      if (Objects.isNull(dataType)) {
+        continue;
+      }
+
+      estimatedMemoryUsageInBytes +=
+          RamUsageEstimator.alignObjectSize(
+              RamUsageEstimator.NUM_BYTES_OBJECT_HEADER
+                  + estimateTsPrimitiveTypeValueMemoryUsageInBytes(dataType));
+    }
+    return estimatedMemoryUsageInBytes;
+  }
+
+  private static long estimateTsPrimitiveTypeValueMemoryUsageInBytes(final 
TSDataType dataType) {
+    switch (dataType) {
+      case BOOLEAN:
+        return 1;
+      case INT32:
+      case DATE:
+      case FLOAT:
+        return Integer.BYTES;
+      case INT64:
+      case TIMESTAMP:
+      case DOUBLE:
+        return Long.BYTES;
+      case TEXT:
+      case BLOB:
+      case STRING:
+        return RamUsageEstimator.NUM_BYTES_OBJECT_REF;
+      default:
+        return 0;
+    }
+  }
+
   
/////////////////////////////////////////////////////////////////////////////////////////////////
   // util methods
   
/////////////////////////////////////////////////////////////////////////////////////////////////
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java
index e903c7340e4..ac1fdb94db7 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java
@@ -307,7 +307,7 @@ public class TsFileInsertionScanDataContainer extends 
TsFileInsertionDataContain
 
         data.next();
         while (!data.hasCurrent() && chunkReader.hasNextSatisfiedPage()) {
-          data = chunkReader.nextPageData();
+          data = nextPageData();
         }
 
         if (tablet != null && tablet.rowSize == tablet.getMaxRowNumber()) {
@@ -343,16 +343,18 @@ public class TsFileInsertionScanDataContainer extends 
TsFileInsertionDataContain
       }
 
       do {
-        resizePageDataMemoryForCurrentPageIfNeeded();
-        data = chunkReader.nextPageData();
-        long size = PipeMemoryWeightUtil.calculateBatchDataRamBytesUsed(data);
-        if (allocatedMemoryBlockForBatchData.getMemoryUsageInBytes() < size) {
-          
PipeDataNodeResourceManager.memory().forceResize(allocatedMemoryBlockForBatchData,
 size);
-        }
+        data = nextPageData();
       } while (!data.hasCurrent() && chunkReader.hasNextSatisfiedPage());
     } while (!data.hasCurrent());
   }
 
+  private BatchData nextPageData() throws IOException {
+    resizePageDataMemoryForCurrentPageIfNeeded();
+    final BatchData nextData = chunkReader.nextPageData();
+    
resizePageDataMemoryIfNeeded(PipeMemoryWeightUtil.calculateBatchDataRamBytesUsed(nextData));
+    return nextData;
+  }
+
   private void resizePageDataMemoryForCurrentPageIfNeeded() {
     if (!(chunkReader instanceof EstimatedMemoryChunkReader)) {
       return;
@@ -523,10 +525,8 @@ public class TsFileInsertionScanDataContainer extends 
TsFileInsertionDataContain
               timeChunkList.add(timeChunk);
               isMultiPageList.add(isMultiPage);
               timeChunkPageMemorySizeList.add(
-                  isMultiPage
-                      ? 0
-                      : 
SinglePageWholeChunkReader.calculatePageEstimatedMemoryUsageInBytes(
-                          timeChunk));
+                  
SinglePageWholeChunkReader.calculateMaxPageEstimatedMemoryUsageInBytes(
+                      timeChunk));
               break;
             }
 
@@ -572,10 +572,14 @@ public class TsFileInsertionScanDataContainer extends 
TsFileInsertionDataContain
             Chunk chunk =
                 new Chunk(
                     chunkHeader, tsFileSequenceReader.readChunk(-1, 
chunkHeader.getDataSize()));
+            final List<Long> pageEstimatedMemoryUsageInBytesList =
+                SinglePageWholeChunkReader
+                    
.calculatePageEstimatedMemoryUsageInBytesWithBatchDataList(chunk);
 
             chunkReader =
                 currentIsMultiPage
-                    ? new ChunkReader(chunk, filter)
+                    ? new MemoryControlledChunkReader(
+                        new ChunkReader(chunk, filter), 
pageEstimatedMemoryUsageInBytesList)
                     : new SinglePageWholeChunkReader(chunk);
             currentIsAligned = false;
             final String measurementID =
@@ -649,8 +653,7 @@ public class TsFileInsertionScanDataContainer extends 
TsFileInsertionDataContain
               chunk =
                   new Chunk(
                       chunkHeader, tsFileSequenceReader.readChunk(-1, 
chunkHeader.getDataSize()));
-              currentValueChunkPageMemorySize =
-                  calculatePageMemorySizeIfSinglePageValueChunk(chunk);
+              currentValueChunkPageMemorySize = 
calculateMaxPageMemorySize(chunk);
               boolean needReturn = false;
               final long timeChunkSize =
                   lastIndex >= 0
@@ -687,8 +690,7 @@ public class TsFileInsertionScanDataContainer extends 
TsFileInsertionDataContain
               chunk = firstChunk4NextSequentialValueChunks;
               chunkHeader = chunk.getHeader();
               firstChunk4NextSequentialValueChunks = null;
-              currentValueChunkPageMemorySize =
-                  calculatePageMemorySizeIfSinglePageValueChunk(chunk);
+              currentValueChunkPageMemorySize = 
calculateMaxPageMemorySize(chunk);
               
resizeChunkMemoryBlockIfFirstValueChunkExceedsLimit(valueChunkList, 
chunkHeader);
               resizePageDataMemoryBlockIfFirstValueChunkExceedsLimit(
                   valueChunkList, currentValueChunkPageMemorySize);
@@ -759,9 +761,22 @@ public class TsFileInsertionScanDataContainer extends 
TsFileInsertionDataContain
             
AlignedSinglePageWholeChunkReader.calculatePageEstimatedMemoryUsageInBytes(
                 timeChunk, valueChunkList));
       }
+      final List<Long> pageEstimatedMemoryUsageInBytesList =
+          currentIsMultiPage
+              ? AlignedSinglePageWholeChunkReader
+                  .calculatePageEstimatedMemoryUsageInBytesWithBatchDataList(
+                      timeChunk, valueChunkList)
+              : Collections.emptyList();
+      final long maxPageEstimatedMemoryUsageInBytes =
+          pageEstimatedMemoryUsageInBytesList.isEmpty()
+              ? 0
+              : pageEstimatedMemoryUsageInBytesList.get(0);
+      resizePageDataMemoryIfNeeded(maxPageEstimatedMemoryUsageInBytes);
       chunkReader =
           currentIsMultiPage
-              ? new AlignedChunkReader(timeChunk, valueChunkList, filter)
+              ? new MemoryControlledChunkReader(
+                  new AlignedChunkReader(timeChunk, valueChunkList, filter),
+                  pageEstimatedMemoryUsageInBytesList)
               : new AlignedSinglePageWholeChunkReader(timeChunk, 
valueChunkList);
       currentIsAligned = true;
       lastMarker = marker;
@@ -802,10 +817,8 @@ public class TsFileInsertionScanDataContainer extends 
TsFileInsertionDataContain
     }
   }
 
-  private long calculatePageMemorySizeIfSinglePageValueChunk(final Chunk 
chunk) throws IOException {
-    return isSinglePageValueChunk(chunk.getHeader())
-        ? 
SinglePageWholeChunkReader.calculatePageEstimatedMemoryUsageInBytes(chunk)
-        : 0;
+  private long calculateMaxPageMemorySize(final Chunk chunk) throws 
IOException {
+    return 
SinglePageWholeChunkReader.calculateMaxPageEstimatedMemoryUsageInBytes(chunk);
   }
 
   private boolean isSinglePageValueChunk(final ChunkHeader chunkHeader) {
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java
index 7fe514b277e..36f56e0e606 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java
@@ -27,6 +27,7 @@ import 
org.apache.iotdb.commons.pipe.datastructure.pattern.PrefixPipePattern;
 import 
org.apache.iotdb.db.pipe.event.common.tsfile.container.TsFileInsertionDataContainer;
 import 
org.apache.iotdb.db.pipe.event.common.tsfile.container.query.TsFileInsertionQueryDataContainer;
 import 
org.apache.iotdb.db.pipe.event.common.tsfile.container.scan.AlignedSinglePageWholeChunkReader;
+import 
org.apache.iotdb.db.pipe.event.common.tsfile.container.scan.SinglePageWholeChunkReader;
 import 
org.apache.iotdb.db.pipe.event.common.tsfile.container.scan.TsFileInsertionScanDataContainer;
 import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
 import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil;
@@ -130,6 +131,85 @@ public class TsFileInsertionDataContainerTest {
     System.out.println(System.currentTimeMillis() - startTime);
   }
 
+  @Test
+  public void 
testScanParserSplitNonAlignedSinglePageChunkByEstimatedPageMemory() throws 
Exception {
+    final long originalPipeMaxReaderChunkSize =
+        CommonDescriptor.getInstance().getConfig().getPipeMaxReaderChunkSize();
+    final int originalPageSizeInByte =
+        TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
+    final int originalMaxNumberOfPointsInPage =
+        
TSFileDescriptor.getInstance().getConfig().getMaxNumberOfPointsInPage();
+
+    try {
+      TSFileDescriptor.getInstance().getConfig().setPageSizeInByte(64 * 1024);
+      
TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(10000);
+
+      final int measurementCount = 16;
+      final int rowCount = 64;
+      final List<MeasurementSchema> schemaList = new ArrayList<>();
+      for (int i = 0; i < measurementCount; ++i) {
+        schemaList.add(
+            new MeasurementSchema(
+                "s" + i, TSDataType.STRING, TSEncoding.PLAIN, 
CompressionType.LZ4));
+      }
+
+      nonalignedTsFile = new 
File("nonaligned-single-page-high-compression.tsfile");
+      final Tablet tablet = new Tablet("root.sg.d", schemaList, rowCount);
+      final Binary value =
+          new Binary(new String(new char[512]).replace('\0', 'a'), 
TSFileConfig.STRING_CHARSET);
+      for (int row = 0; row < rowCount; ++row) {
+        tablet.addTimestamp(row, row);
+        for (int measurementIndex = 0; measurementIndex < measurementCount; 
++measurementIndex) {
+          tablet.addValue("s" + measurementIndex, row, value);
+        }
+      }
+      tablet.rowSize = rowCount;
+
+      try (final TsFileWriter writer = new TsFileWriter(nonalignedTsFile)) {
+        writer.registerTimeseries(new Path("root.sg.d"), schemaList);
+        writer.write(tablet);
+      }
+
+      CommonDescriptor.getInstance()
+          .getConfig()
+          .setPipeMaxReaderChunkSize(
+              
calculatePipeMaxReaderChunkSizeForSinglePageNonAlignedChunk(nonalignedTsFile));
+
+      int tabletCount = 0;
+      int maxMeasurementCount = 0;
+      int pointCount = 0;
+      try (final TsFileInsertionScanDataContainer parser =
+          new TsFileInsertionScanDataContainer(
+              nonalignedTsFile,
+              new PrefixPipePattern("root"),
+              Long.MIN_VALUE,
+              Long.MAX_VALUE,
+              null,
+              null,
+              false)) {
+        for (final Pair<Tablet, Boolean> tabletWithIsAligned : 
parser.toTabletWithIsAligneds()) {
+          Assert.assertFalse(tabletWithIsAligned.getRight());
+          final Tablet parsedTablet = tabletWithIsAligned.getLeft();
+          tabletCount++;
+          maxMeasurementCount = Math.max(maxMeasurementCount, 
parsedTablet.getSchemas().size());
+          pointCount += getNonNullSize(parsedTablet);
+        }
+      }
+
+      Assert.assertTrue(tabletCount > 1);
+      Assert.assertTrue(maxMeasurementCount < measurementCount);
+      Assert.assertEquals(measurementCount * rowCount, pointCount);
+    } finally {
+      CommonDescriptor.getInstance()
+          .getConfig()
+          .setPipeMaxReaderChunkSize(originalPipeMaxReaderChunkSize);
+      
TSFileDescriptor.getInstance().getConfig().setPageSizeInByte(originalPageSizeInByte);
+      TSFileDescriptor.getInstance()
+          .getConfig()
+          .setMaxNumberOfPointsInPage(originalMaxNumberOfPointsInPage);
+    }
+  }
+
   @Test
   public void testScanParserSplitAlignedSinglePageChunkByEstimatedPageMemory() 
throws Exception {
     final long originalPipeMaxReaderChunkSize =
@@ -209,6 +289,85 @@ public class TsFileInsertionDataContainerTest {
     }
   }
 
+  @Test
+  public void testScanParserSplitAlignedMultiPageChunkByEstimatedPageMemory() 
throws Exception {
+    final long originalPipeMaxReaderChunkSize =
+        CommonDescriptor.getInstance().getConfig().getPipeMaxReaderChunkSize();
+    final int originalPageSizeInByte =
+        TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
+    final int originalMaxNumberOfPointsInPage =
+        
TSFileDescriptor.getInstance().getConfig().getMaxNumberOfPointsInPage();
+
+    try {
+      TSFileDescriptor.getInstance().getConfig().setPageSizeInByte(64 * 1024);
+      
TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(32);
+
+      final int measurementCount = 16;
+      final int rowCount = 64;
+      final List<MeasurementSchema> schemaList = new ArrayList<>();
+      for (int i = 0; i < measurementCount; ++i) {
+        schemaList.add(
+            new MeasurementSchema(
+                "s" + i, TSDataType.STRING, TSEncoding.PLAIN, 
CompressionType.LZ4));
+      }
+
+      alignedTsFile = new File("aligned-multi-page-high-compression.tsfile");
+      final Tablet tablet = new Tablet("root.sg.d", schemaList, rowCount);
+      final Binary value =
+          new Binary(new String(new char[512]).replace('\0', 'a'), 
TSFileConfig.STRING_CHARSET);
+      for (int row = 0; row < rowCount; ++row) {
+        tablet.addTimestamp(row, row);
+        for (int measurementIndex = 0; measurementIndex < measurementCount; 
++measurementIndex) {
+          tablet.addValue("s" + measurementIndex, row, value);
+        }
+      }
+      tablet.rowSize = rowCount;
+
+      try (final TsFileWriter writer = new TsFileWriter(alignedTsFile)) {
+        writer.registerAlignedTimeseries(new Path("root.sg.d"), schemaList);
+        writer.writeAligned(tablet);
+      }
+
+      CommonDescriptor.getInstance()
+          .getConfig()
+          .setPipeMaxReaderChunkSize(
+              
calculatePipeMaxReaderChunkSizeForMultiPageAlignedChunk(alignedTsFile));
+
+      int tabletCount = 0;
+      int maxMeasurementCount = 0;
+      int pointCount = 0;
+      try (final TsFileInsertionScanDataContainer parser =
+          new TsFileInsertionScanDataContainer(
+              alignedTsFile,
+              new PrefixPipePattern("root"),
+              Long.MIN_VALUE,
+              Long.MAX_VALUE,
+              null,
+              null,
+              false)) {
+        for (final Pair<Tablet, Boolean> tabletWithIsAligned : 
parser.toTabletWithIsAligneds()) {
+          Assert.assertTrue(tabletWithIsAligned.getRight());
+          final Tablet parsedTablet = tabletWithIsAligned.getLeft();
+          tabletCount++;
+          maxMeasurementCount = Math.max(maxMeasurementCount, 
parsedTablet.getSchemas().size());
+          pointCount += getNonNullSize(parsedTablet);
+        }
+      }
+
+      Assert.assertTrue(tabletCount > 1);
+      Assert.assertTrue(maxMeasurementCount < measurementCount);
+      Assert.assertEquals(measurementCount * rowCount, pointCount);
+    } finally {
+      CommonDescriptor.getInstance()
+          .getConfig()
+          .setPipeMaxReaderChunkSize(originalPipeMaxReaderChunkSize);
+      
TSFileDescriptor.getInstance().getConfig().setPageSizeInByte(originalPageSizeInByte);
+      TSFileDescriptor.getInstance()
+          .getConfig()
+          .setMaxNumberOfPointsInPage(originalMaxNumberOfPointsInPage);
+    }
+  }
+
   @Test
   public void testScanParserResizesChunkMemoryForFirstAlignedValueChunk() 
throws Exception {
     final long originalPipeMaxReaderChunkSize =
@@ -840,4 +999,64 @@ public class TsFileInsertionDataContainerTest {
       return chunkSizeLimit;
     }
   }
+
+  /**
+   * Computes a {@code pipeMaxReaderChunkSize} for a TsFile whose chunks each contain exactly one
+   * page.
+   *
+   * <p>The returned limit is the sum of the chunk headers' data sizes for the inspected device.
+   * Before returning, the method asserts that the summed page-memory estimate from {@code
+   * SinglePageWholeChunkReader.calculateMaxPageEstimatedMemoryUsageInBytesWithBatchData} exceeds
+   * that limit. With this limit configured, the page-decode estimate alone is therefore larger than
+   * the raw chunk budget.
+   *
+   * @param tsFile the TsFile to inspect; only the first device key returned by the reader is used
+   * @return the sum of {@code getDataSize()} over the inspected device's chunk headers
+   * @throws Exception if the file cannot be read or an expectation about its layout fails
+   */
+  private long calculatePipeMaxReaderChunkSizeForSinglePageNonAlignedChunk(final File tsFile)
+      throws Exception {
+    try (final TsFileSequenceReader reader = new TsFileSequenceReader(tsFile.getAbsolutePath())) {
+      // Fetch the device -> measurements map once instead of asking the reader for it twice.
+      final java.util.Map<IDeviceID, List<String>> deviceMeasurementsMap =
+          reader.getDeviceMeasurementsMap();
+      final IDeviceID deviceID = deviceMeasurementsMap.keySet().iterator().next();
+      final List<String> measurements = deviceMeasurementsMap.get(deviceID);
+      Assert.assertFalse(measurements.isEmpty());
+
+      long chunkSizeLimit = 0;
+      long estimatedPageMemorySize = 0;
+      for (final String measurement : measurements) {
+        final List<ChunkMetadata> chunkMetadataList =
+            reader.getChunkMetadataList(new Path(deviceID, measurement, false));
+        Assert.assertEquals(1, chunkMetadataList.size());
+
+        final Chunk chunk = reader.readMemChunk(chunkMetadataList.get(0));
+        // Compare only the low 6 bits of the chunk type marker (higher bits presumably flag
+        // aligned time/value columns -- verify against MetaMarker).
+        Assert.assertEquals(
+            MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER, chunk.getHeader().getChunkType() & 0x3F);
+        chunkSizeLimit += chunk.getHeader().getDataSize();
+        estimatedPageMemorySize +=
+            SinglePageWholeChunkReader.calculateMaxPageEstimatedMemoryUsageInBytesWithBatchData(
+                chunk);
+      }
+
+      Assert.assertTrue(estimatedPageMemorySize > chunkSizeLimit);
+      return chunkSizeLimit;
+    }
+  }
+
+  /**
+   * Computes a {@code pipeMaxReaderChunkSize} for a TsFile holding a single device with a single
+   * aligned chunk whose time and value columns are all multi-page chunks.
+   *
+   * <p>The limit is seeded with {@code PipeMemoryWeightUtil.calculateChunkRamBytesUsed} of the time
+   * chunk. Each value chunk then adds its header data size. Before returning, the method asserts
+   * that the summed maximum per-page estimate from {@code
+   * SinglePageWholeChunkReader.calculateMaxPageEstimatedMemoryUsageInBytes} exceeds the limit.
+   *
+   * @param tsFile the TsFile to inspect; must contain exactly one device with one aligned chunk
+   * @return the computed chunk size limit in bytes
+   * @throws Exception if the file cannot be read or an expectation about its layout fails
+   */
+  private long calculatePipeMaxReaderChunkSizeForMultiPageAlignedChunk(final File tsFile)
+      throws Exception {
+    try (final TsFileSequenceReader sequenceReader =
+        new TsFileSequenceReader(tsFile.getAbsolutePath())) {
+      final List<IDeviceID> devices = sequenceReader.getAllDevices();
+      Assert.assertEquals(1, devices.size());
+
+      final List<AlignedChunkMetadata> alignedMetadataList =
+          sequenceReader.getAlignedChunkMetadata(devices.get(0));
+      Assert.assertEquals(1, alignedMetadataList.size());
+      final AlignedChunkMetadata alignedMetadata = alignedMetadataList.get(0);
+
+      // Time column: its in-memory footprint seeds the limit, its page estimate seeds the estimate.
+      final Chunk timeColumnChunk =
+          sequenceReader.readMemChunk((ChunkMetadata) alignedMetadata.getTimeChunkMetadata());
+      Assert.assertEquals(
+          MetaMarker.CHUNK_HEADER, timeColumnChunk.getHeader().getChunkType() & 0x3F);
+      long sizeLimit = PipeMemoryWeightUtil.calculateChunkRamBytesUsed(timeColumnChunk);
+      long maxPageMemoryEstimate =
+          SinglePageWholeChunkReader.calculateMaxPageEstimatedMemoryUsageInBytes(timeColumnChunk);
+
+      // Value columns: raw data size goes into the limit, max page estimate into the estimate.
+      for (final IChunkMetadata valueMetadata : alignedMetadata.getValueChunkMetadataList()) {
+        final Chunk valueColumnChunk = sequenceReader.readMemChunk((ChunkMetadata) valueMetadata);
+        Assert.assertEquals(
+            MetaMarker.CHUNK_HEADER, valueColumnChunk.getHeader().getChunkType() & 0x3F);
+        sizeLimit += valueColumnChunk.getHeader().getDataSize();
+        maxPageMemoryEstimate +=
+            SinglePageWholeChunkReader.calculateMaxPageEstimatedMemoryUsageInBytes(
+                valueColumnChunk);
+      }
+
+      Assert.assertTrue(maxPageMemoryEstimate > sizeLimit);
+      return sizeLimit;
+    }
+  }
 }


Reply via email to