This is an automated email from the ASF dual-hosted git repository.

jt2594838 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new f96fc5824f6 Pipe: merge batched aligned chunks in scan parser (#18010)
f96fc5824f6 is described below

commit f96fc5824f62b4c637c0d9b5e9ea4adc9f8b1853
Author: Caideyipi <[email protected]>
AuthorDate: Fri Jun 26 14:19:52 2026 +0800

    Pipe: merge batched aligned chunks in scan parser (#18010)
    
    * Pipe: merge batched aligned chunks in scan parser
    
    * Test pipe batched aligned chunk memory boundaries
    
    * Pipe: fix batched aligned scan parser memory split
    
    * Update TsFileInsertionEventParserTest.java
    
    * Rename pending aligned chunk consumer
---
 .../parser/scan/SinglePageWholeChunkReader.java    |  38 +-
 .../scan/TsFileInsertionEventScanParser.java       | 383 +++++++++++++++------
 .../pipe/resource/memory/PipeMemoryWeightUtil.java |  18 +-
 .../pipe/event/TsFileInsertionEventParserTest.java | 343 +++++++++++++++++-
 4 files changed, 662 insertions(+), 120 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/SinglePageWholeChunkReader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/SinglePageWholeChunkReader.java
index 1be5565b394..075e47296ce 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/SinglePageWholeChunkReader.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/SinglePageWholeChunkReader.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.pipe.event.common.tsfile.parser.scan;
 
 import org.apache.iotdb.db.i18n.DataNodePipeMessages;
 
+import org.apache.tsfile.common.constant.TsFileConstant;
 import org.apache.tsfile.compress.IUnCompressor;
 import org.apache.tsfile.encoding.decoder.Decoder;
 import org.apache.tsfile.encrypt.EncryptParameter;
@@ -169,22 +170,49 @@ public class SinglePageWholeChunkReader extends AbstractChunkReader
   static long estimatePageMemoryUsageInBytesWithBatchData(
       final PageHeader timePageHeader,
       final Chunk timeChunk,
-      final List<TSDataType> valueDataTypeList) {
+      final List<TSDataType> valueDataTypeList)
+      throws IOException {
     return estimatePageMemoryUsageInBytesWithBatchData(
         timePageHeader.getUncompressedSize(),
         getPageRowCount(timePageHeader, timeChunk),
         valueDataTypeList);
   }
 
-  static int getPageRowCount(final PageHeader pageHeader, final Chunk chunk) {
+  static int getPageRowCount(final PageHeader pageHeader, final Chunk chunk) 
throws IOException {
     if (isSinglePageChunk(chunk.getHeader())) {
-      return Objects.isNull(chunk.getChunkStatistic())
-          ? 0
-          : saturateToInt(chunk.getChunkStatistic().getCount());
+      if (Objects.nonNull(chunk.getChunkStatistic())) {
+        return saturateToInt(chunk.getChunkStatistic().getCount());
+      }
+      return isTimeChunk(chunk.getHeader()) ? countSinglePageTimeValues(chunk) 
: 0;
     }
     return saturateToInt(pageHeader.getNumOfValues());
   }
 
+  private static int countSinglePageTimeValues(final Chunk chunk) throws 
IOException {
+    final ByteBuffer chunkDataBuffer = chunk.getData().duplicate();
+    final PageHeader pageHeader = deserializePageHeader(chunkDataBuffer, 
chunk.getHeader());
+    final ByteBuffer pageData =
+        deserializePageData(
+            pageHeader,
+            chunkDataBuffer,
+            chunk.getHeader(),
+            IDecryptor.getDecryptor(chunk.getEncryptParam()));
+    final Decoder decoder =
+        Decoder.getDecoderByType(chunk.getHeader().getEncodingType(), 
TSDataType.INT64);
+
+    int rowCount = 0;
+    while (decoder.hasNext(pageData)) {
+      decoder.readLong(pageData);
+      ++rowCount;
+    }
+    return rowCount;
+  }
+
+  private static boolean isTimeChunk(final ChunkHeader chunkHeader) {
+    return (chunkHeader.getChunkType() & TsFileConstant.TIME_COLUMN_MASK)
+        == TsFileConstant.TIME_COLUMN_MASK;
+  }
+
   private static int saturateToInt(final long value) {
     return (int) Math.min(Integer.MAX_VALUE, value);
   }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java
index 4c5cd75d4c1..e3af5aaa0c1 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java
@@ -72,6 +72,7 @@ import org.apache.tsfile.write.schema.MeasurementSchema;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -103,11 +104,11 @@ public class TsFileInsertionEventScanParser extends TsFileInsertionEventParser {
   // Cached time chunk
   private final List<Chunk> timeChunkList = new ArrayList<>();
   private final List<Boolean> isMultiPageList = new ArrayList<>();
-  private final List<Long> timeChunkPageMemorySizeList = new ArrayList<>();
 
   private final Map<String, Integer> measurementIndexMap = new HashMap<>();
-  private int lastIndex = -1;
-  private Chunk firstChunk4NextSequentialValueChunks;
+  private final List<PendingAlignedChunkGroup> pendingAlignedChunkGroups = new 
ArrayList<>();
+  private long pendingAlignedChunkSize;
+  private CachedAlignedValueChunk cachedAlignedValueChunk;
 
   private byte lastMarker = Byte.MIN_VALUE;
 
@@ -588,14 +589,14 @@ public class TsFileInsertionEventScanParser extends TsFileInsertionEventParser {
   private void moveToNextChunkReader()
       throws IOException, IllegalStateException, IllegalPathException {
     ChunkHeader chunkHeader;
-    long valueChunkSize = 0;
-    long valueChunkPageMemorySize = 0;
-    final List<Chunk> valueChunkList = new ArrayList<>();
     currentMeasurements.clear();
     modsInfos.clear();
 
     if (lastMarker == MetaMarker.SEPARATOR) {
-      chunkReader = null;
+      if (!useNextPendingAlignedChunk(lastMarker)) {
+        clearCachedAlignedChunkData();
+        chunkReader = null;
+      }
       return;
     }
 
@@ -603,8 +604,8 @@ public class TsFileInsertionEventScanParser extends TsFileInsertionEventParser {
     while ((marker =
             lastMarker != Byte.MIN_VALUE
                 ? lastMarker
-                : Objects.nonNull(firstChunk4NextSequentialValueChunks)
-                    ? 
toValueChunkMarker(firstChunk4NextSequentialValueChunks.getHeader())
+                : Objects.nonNull(cachedAlignedValueChunk)
+                    ? 
toValueChunkMarker(cachedAlignedValueChunk.chunk.getHeader())
                     : tsFileSequenceReader.readMarker())
         != MetaMarker.SEPARATOR) {
       lastMarker = Byte.MIN_VALUE;
@@ -658,9 +659,8 @@ public class TsFileInsertionEventScanParser extends TsFileInsertionEventParser {
         case MetaMarker.VALUE_CHUNK_HEADER:
         case MetaMarker.ONLY_ONE_PAGE_VALUE_CHUNK_HEADER:
           {
-            Chunk chunk;
-            long currentValueChunkPageMemorySize = 0;
-            if (Objects.isNull(firstChunk4NextSequentialValueChunks)) {
+            CachedAlignedValueChunk valueChunk = cachedAlignedValueChunk;
+            if (Objects.isNull(valueChunk)) {
               final long currentChunkHeaderOffset = 
tsFileSequenceReader.position() - 1;
               chunkHeader = tsFileSequenceReader.readChunkHeader(marker);
 
@@ -668,7 +668,7 @@ public class TsFileInsertionEventScanParser extends TsFileInsertionEventParser {
                 break;
               }
 
-              // Increase value index
+              // Increase value index.
               final String measurementID =
                   
tabletStringInternPool.intern(chunkHeader.getMeasurementID());
               final int valueIndex =
@@ -676,86 +676,32 @@ public class TsFileInsertionEventScanParser extends TsFileInsertionEventParser {
                       measurementID,
                       (measurement, index) -> Objects.nonNull(index) ? index + 
1 : 0);
 
-              // Emit when encountered non-sequential value chunk, or the 
chunk size exceeds
-              // certain value to avoid OOM
-              // Do not record or end current value chunks when there are 
empty chunks
+              // Do not record or end current value chunks when there are 
empty chunks.
               if (chunkHeader.getDataSize() == 0) {
                 break;
               }
-              chunk =
+              final Chunk chunk =
                   new Chunk(
                       chunkHeader, tsFileSequenceReader.readChunk(-1, 
chunkHeader.getDataSize()));
-              currentValueChunkPageMemorySize = 
calculateMaxPageMemorySize(chunk);
-              boolean needReturn = false;
-              final long timeChunkSize =
-                  lastIndex >= 0
-                      ? PipeMemoryWeightUtil.calculateChunkRamBytesUsed(
-                          timeChunkList.get(lastIndex))
-                      : 0;
-              final long timeChunkPageMemorySize =
-                  lastIndex >= 0 ? timeChunkPageMemorySizeList.get(lastIndex) 
: 0;
-              if (lastIndex >= 0) {
-                if (valueIndex != lastIndex) {
-                  needReturn = recordAlignedChunk(valueChunkList, marker);
-                } else {
-                  final long chunkSize = timeChunkSize + valueChunkSize;
-                  final long pageMemorySize = timeChunkPageMemorySize + 
valueChunkPageMemorySize;
-                  if (chunkSize + chunkHeader.getDataSize()
-                          > 
allocatedMemoryBlockForChunk.getMemoryUsageInBytes()
-                      || timeChunkPageMemorySize > 0
-                          && currentValueChunkPageMemorySize > 0
-                          && pageMemorySize + currentValueChunkPageMemorySize
-                              > getPageDataMemoryLimitInBytes()) {
-                    needReturn = recordAlignedChunk(valueChunkList, marker);
-                  }
-                }
-              }
-              lastIndex = valueIndex;
-              if (needReturn) {
-                firstChunk4NextSequentialValueChunks = chunk;
-                return;
-              }
-              
resizeChunkMemoryBlockIfFirstValueChunkExceedsLimit(valueChunkList, 
chunkHeader);
-              resizePageDataMemoryBlockIfFirstValueChunkExceedsLimit(
-                  valueChunkList, currentValueChunkPageMemorySize);
+              valueChunk =
+                  new CachedAlignedValueChunk(valueIndex, chunk, 
chunkHeader.getDataSize());
             } else {
-              chunk = firstChunk4NextSequentialValueChunks;
-              chunkHeader = chunk.getHeader();
-              firstChunk4NextSequentialValueChunks = null;
-              currentValueChunkPageMemorySize = 
calculateMaxPageMemorySize(chunk);
-              
resizeChunkMemoryBlockIfFirstValueChunkExceedsLimit(valueChunkList, 
chunkHeader);
-              resizePageDataMemoryBlockIfFirstValueChunkExceedsLimit(
-                  valueChunkList, currentValueChunkPageMemorySize);
+              cachedAlignedValueChunk = null;
             }
 
-            valueChunkSize += chunkHeader.getDataSize();
-            valueChunkPageMemorySize += currentValueChunkPageMemorySize;
-            valueChunkList.add(chunk);
-            final String measurementID =
-                tabletStringInternPool.intern(chunkHeader.getMeasurementID());
-            currentMeasurements.add(
-                new MeasurementSchema(
-                    measurementID,
-                    chunkHeader.getDataType(),
-                    chunkHeader.getEncodingType(),
-                    chunkHeader.getCompressionType()));
-            modsInfos.addAll(
-                ModsOperationUtil.initializeMeasurementMods(
-                    currentDevice, Collections.singletonList(measurementID), 
currentModifications));
+            if (returnPendingAlignedChunkBeforeCaching(valueChunk)) {
+              return;
+            }
+            cacheAlignedValueChunk(valueChunk);
             break;
           }
         case MetaMarker.CHUNK_GROUP_HEADER:
           {
-            // Return before "currentDevice" changes
-            if (recordAlignedChunk(valueChunkList, marker)) {
+            // Return before "currentDevice" changes.
+            if (useNextPendingAlignedChunk(marker)) {
               return;
             }
-            // Clear because the cached data will never be used in the next 
chunk group
-            lastIndex = -1;
-            timeChunkList.clear();
-            isMultiPageList.clear();
-            timeChunkPageMemorySizeList.clear();
-            measurementIndexMap.clear();
+            clearCachedAlignedChunkData();
             final IDeviceID deviceID = 
tsFileSequenceReader.readChunkGroupHeader().getDeviceID();
             currentDevice = treePattern.mayOverlapWithDevice(deviceID) ? 
deviceID : null;
             currentDeviceString =
@@ -775,7 +721,8 @@ public class TsFileInsertionEventScanParser extends TsFileInsertionEventParser {
     }
 
     lastMarker = marker;
-    if (!recordAlignedChunk(valueChunkList, marker)) {
+    if (!useNextPendingAlignedChunk(marker)) {
+      clearCachedAlignedChunkData();
       chunkReader = null;
     }
   }
@@ -784,6 +731,10 @@ public class TsFileInsertionEventScanParser extends TsFileInsertionEventParser {
     return PipeConfig.getInstance().getPipeMaxReaderChunkSize();
   }
 
+  private long getChunkMemoryLimitInBytes() {
+    return PipeConfig.getInstance().getPipeMaxReaderChunkSize();
+  }
+
   private boolean filterChunk(
       final long currentChunkHeaderOffset,
       final ChunkHeader chunkHeader,
@@ -805,8 +756,6 @@ public class TsFileInsertionEventScanParser extends TsFileInsertionEventParser {
         timeChunkList.add(timeChunk);
         final boolean isMultiPage = marker == MetaMarker.TIME_CHUNK_HEADER;
         isMultiPageList.add(isMultiPage);
-        timeChunkPageMemorySizeList.add(
-            
SinglePageWholeChunkReader.calculateMaxPageEstimatedMemoryUsageInBytes(timeChunk));
         return true;
       }
     }
@@ -859,22 +808,38 @@ public class TsFileInsertionEventScanParser extends TsFileInsertionEventParser {
     return false;
   }
 
-  private boolean recordAlignedChunk(final List<Chunk> valueChunkList, final 
byte marker)
-      throws IOException {
-    if (!valueChunkList.isEmpty()) {
-      final Chunk timeChunk = timeChunkList.get(lastIndex);
+  private boolean useNextPendingAlignedChunk(final byte marker) throws 
IOException {
+    while (!pendingAlignedChunkGroups.isEmpty()) {
+      final PendingAlignedChunkGroup pendingAlignedChunkGroup = 
pendingAlignedChunkGroups.remove(0);
+      pendingAlignedChunkSize =
+          Math.max(0, pendingAlignedChunkSize - 
pendingAlignedChunkGroup.chunkSize);
+
+      if (pendingAlignedChunkGroup.valueChunkList.isEmpty()) {
+        continue;
+      }
+
+      final Chunk timeChunk = 
timeChunkList.get(pendingAlignedChunkGroup.timeChunkIndex);
       timeChunk.getData().rewind();
-      currentIsMultiPage = isMultiPageList.get(lastIndex);
+      for (final Chunk valueChunk : pendingAlignedChunkGroup.valueChunkList) {
+        valueChunk.getData().rewind();
+      }
+
+      currentMeasurements.clear();
+      currentMeasurements.addAll(pendingAlignedChunkGroup.measurements);
+      modsInfos.clear();
+      modsInfos.addAll(pendingAlignedChunkGroup.modsInfos);
+
+      currentIsMultiPage = 
isMultiPageList.get(pendingAlignedChunkGroup.timeChunkIndex);
       if (!currentIsMultiPage) {
         resizePageDataMemoryIfNeeded(
             
AlignedSinglePageWholeChunkReader.calculatePageEstimatedMemoryUsageInBytes(
-                timeChunk, valueChunkList));
+                timeChunk, pendingAlignedChunkGroup.valueChunkList));
       }
       final List<Long> pageEstimatedMemoryUsageInBytesList =
           currentIsMultiPage
               ? AlignedSinglePageWholeChunkReader
                   .calculatePageEstimatedMemoryUsageInBytesWithBatchDataList(
-                      timeChunk, valueChunkList)
+                      timeChunk, pendingAlignedChunkGroup.valueChunkList)
               : Collections.emptyList();
       final long maxPageEstimatedMemoryUsageInBytes =
           pageEstimatedMemoryUsageInBytesList.isEmpty()
@@ -884,50 +849,226 @@ public class TsFileInsertionEventScanParser extends TsFileInsertionEventParser {
       chunkReader =
           currentIsMultiPage
               ? new MemoryControlledChunkReader(
-                  new AlignedChunkReader(timeChunk, valueChunkList, filter),
+                  new AlignedChunkReader(
+                      timeChunk, pendingAlignedChunkGroup.valueChunkList, 
filter),
                   pageEstimatedMemoryUsageInBytesList)
-              : new AlignedSinglePageWholeChunkReader(timeChunk, 
valueChunkList, null);
+              : new AlignedSinglePageWholeChunkReader(
+                  timeChunk, pendingAlignedChunkGroup.valueChunkList, null);
       currentIsAligned = true;
-      lastMarker = marker;
+      if (marker != Byte.MIN_VALUE) {
+        lastMarker = marker;
+      }
+      return true;
+    }
+    return false;
+  }
+
+  private boolean shouldReturnPendingAlignedChunkBeforeCaching(
+      final CachedAlignedValueChunk valueChunk) throws IOException {
+    validateAlignedValueChunkTimeIndex(valueChunk.timeChunkIndex);
+
+    final PendingAlignedChunkGroup pendingAlignedChunkGroup =
+        findPendingAlignedChunkGroup(valueChunk.timeChunkIndex);
+    final boolean isFirstValueChunkInGroup =
+        Objects.isNull(pendingAlignedChunkGroup)
+            || pendingAlignedChunkGroup.valueChunkList.isEmpty();
+    final long timeChunkSize =
+        Objects.isNull(pendingAlignedChunkGroup)
+            ? PipeMemoryWeightUtil.calculateChunkRamBytesUsed(
+                timeChunkList.get(valueChunk.timeChunkIndex))
+            : 0;
+    final long chunkSizeAfterCaching =
+        pendingAlignedChunkSize + timeChunkSize + valueChunk.valueChunkSize;
+
+    if (isFirstValueChunkInGroup) {
+      final long firstValueChunkGroupSize =
+          timeChunkSize
+              + (Objects.isNull(pendingAlignedChunkGroup) ? 0 : 
pendingAlignedChunkGroup.chunkSize)
+              + valueChunk.valueChunkSize;
+      if (firstValueChunkGroupSize > getChunkMemoryLimitInBytes()) {
+        return !pendingAlignedChunkGroups.isEmpty();
+      }
+    }
+
+    if (!pendingAlignedChunkGroups.isEmpty()
+        && chunkSizeAfterCaching > getChunkMemoryLimitInBytes()) {
+      return true;
+    }
+
+    final long pageMemorySizeAfterCaching =
+        calculateMaxAlignedPageMemorySizeWithBatchData(
+            valueChunk.timeChunkIndex, pendingAlignedChunkGroup, valueChunk);
+    return pageMemorySizeAfterCaching > getPageDataMemoryLimitInBytes()
+        && (!isFirstValueChunkInGroup || !pendingAlignedChunkGroups.isEmpty());
+  }
+
+  private boolean returnPendingAlignedChunkBeforeCaching(final 
CachedAlignedValueChunk valueChunk)
+      throws IOException {
+    if (!shouldReturnPendingAlignedChunkBeforeCaching(valueChunk)) {
+      return false;
+    }
+
+    cachedAlignedValueChunk = valueChunk;
+    if (useNextPendingAlignedChunk(Byte.MIN_VALUE)) {
       return true;
     }
+    cachedAlignedValueChunk = null;
     return false;
   }
 
+  private void cacheAlignedValueChunk(final CachedAlignedValueChunk 
valueChunk) throws IOException {
+    validateAlignedValueChunkTimeIndex(valueChunk.timeChunkIndex);
+
+    final PendingAlignedChunkGroup pendingAlignedChunkGroup =
+        getOrCreatePendingAlignedChunkGroup(valueChunk.timeChunkIndex);
+    
resizeChunkMemoryBlockIfFirstValueChunkExceedsLimit(pendingAlignedChunkGroup, 
valueChunk);
+    
resizePageDataMemoryBlockIfFirstValueChunkExceedsLimit(pendingAlignedChunkGroup,
 valueChunk);
+
+    pendingAlignedChunkGroup.valueChunkList.add(valueChunk.chunk);
+    pendingAlignedChunkGroup.chunkSize += valueChunk.valueChunkSize;
+    pendingAlignedChunkSize += valueChunk.valueChunkSize;
+
+    final ChunkHeader chunkHeader = valueChunk.chunk.getHeader();
+    final String measurementID = 
tabletStringInternPool.intern(chunkHeader.getMeasurementID());
+    pendingAlignedChunkGroup.measurements.add(
+        new MeasurementSchema(
+            measurementID,
+            chunkHeader.getDataType(),
+            chunkHeader.getEncodingType(),
+            chunkHeader.getCompressionType()));
+    pendingAlignedChunkGroup.modsInfos.addAll(
+        ModsOperationUtil.initializeMeasurementMods(
+            currentDevice, Collections.singletonList(measurementID), 
currentModifications));
+  }
+
+  private PendingAlignedChunkGroup getOrCreatePendingAlignedChunkGroup(final 
int timeChunkIndex) {
+    final PendingAlignedChunkGroup pendingAlignedChunkGroup =
+        findPendingAlignedChunkGroup(timeChunkIndex);
+    if (Objects.nonNull(pendingAlignedChunkGroup)) {
+      return pendingAlignedChunkGroup;
+    }
+
+    final PendingAlignedChunkGroup newPendingAlignedChunkGroup =
+        new PendingAlignedChunkGroup(
+            timeChunkIndex,
+            
PipeMemoryWeightUtil.calculateChunkRamBytesUsed(timeChunkList.get(timeChunkIndex)));
+    pendingAlignedChunkSize += newPendingAlignedChunkGroup.chunkSize;
+
+    for (int i = 0; i < pendingAlignedChunkGroups.size(); ++i) {
+      if (pendingAlignedChunkGroups.get(i).timeChunkIndex > timeChunkIndex) {
+        pendingAlignedChunkGroups.add(i, newPendingAlignedChunkGroup);
+        return newPendingAlignedChunkGroup;
+      }
+    }
+    pendingAlignedChunkGroups.add(newPendingAlignedChunkGroup);
+    return newPendingAlignedChunkGroup;
+  }
+
+  private PendingAlignedChunkGroup findPendingAlignedChunkGroup(final int 
timeChunkIndex) {
+    for (final PendingAlignedChunkGroup pendingAlignedChunkGroup : 
pendingAlignedChunkGroups) {
+      if (pendingAlignedChunkGroup.timeChunkIndex == timeChunkIndex) {
+        return pendingAlignedChunkGroup;
+      }
+    }
+    return null;
+  }
+
+  private void validateAlignedValueChunkTimeIndex(final int timeChunkIndex) 
throws IOException {
+    if (timeChunkIndex < 0 || timeChunkIndex >= timeChunkList.size()) {
+      throw new IOException(
+          String.format(
+              "Invalid aligned value chunk index %d, while there are %d time 
chunks.",
+              timeChunkIndex, timeChunkList.size()));
+    }
+  }
+
+  private void clearCachedAlignedChunkData() {
+    pendingAlignedChunkGroups.clear();
+    pendingAlignedChunkSize = 0;
+    cachedAlignedValueChunk = null;
+    timeChunkList.clear();
+    isMultiPageList.clear();
+    measurementIndexMap.clear();
+  }
+
   private void resizeChunkMemoryBlockIfFirstValueChunkExceedsLimit(
-      final List<Chunk> valueChunkList, final ChunkHeader valueChunkHeader) {
-    if (!valueChunkList.isEmpty() || lastIndex < 0) {
+      final PendingAlignedChunkGroup pendingAlignedChunkGroup,
+      final CachedAlignedValueChunk valueChunk) {
+    if (!pendingAlignedChunkGroup.valueChunkList.isEmpty()) {
       return;
     }
 
-    final long chunkSize =
-        
PipeMemoryWeightUtil.calculateChunkRamBytesUsed(timeChunkList.get(lastIndex))
-            + valueChunkHeader.getDataSize();
+    final long chunkSize = pendingAlignedChunkGroup.chunkSize + 
valueChunk.valueChunkSize;
     if (chunkSize > allocatedMemoryBlockForChunk.getMemoryUsageInBytes()) {
       
PipeDataNodeResourceManager.memory().forceResize(allocatedMemoryBlockForChunk, 
chunkSize);
     }
   }
 
   private void resizePageDataMemoryBlockIfFirstValueChunkExceedsLimit(
-      final List<Chunk> valueChunkList, final long valueChunkPageMemorySize) {
-    if (!valueChunkList.isEmpty() || lastIndex < 0 || valueChunkPageMemorySize 
<= 0) {
-      return;
-    }
-
-    final long timeChunkPageMemorySize = 
timeChunkPageMemorySizeList.get(lastIndex);
-    if (timeChunkPageMemorySize <= 0) {
+      final PendingAlignedChunkGroup pendingAlignedChunkGroup,
+      final CachedAlignedValueChunk valueChunk)
+      throws IOException {
+    if (!pendingAlignedChunkGroup.valueChunkList.isEmpty()) {
       return;
     }
 
-    final long pageMemorySize = timeChunkPageMemorySize + 
valueChunkPageMemorySize;
+    final long pageMemorySize =
+        calculateMaxAlignedPageMemorySizeWithBatchData(
+            pendingAlignedChunkGroup.timeChunkIndex, pendingAlignedChunkGroup, 
valueChunk);
     if (pageMemorySize > getPageDataMemoryLimitInBytes()) {
       PipeDataNodeResourceManager.memory()
           .forceResize(allocatedMemoryBlockForBatchData, pageMemorySize);
     }
   }
 
-  private long calculateMaxPageMemorySize(final Chunk chunk) throws 
IOException {
-    return 
SinglePageWholeChunkReader.calculateMaxPageEstimatedMemoryUsageInBytes(chunk);
+  private long calculateMaxAlignedPageMemorySizeWithBatchData(
+      final int timeChunkIndex,
+      final PendingAlignedChunkGroup pendingAlignedChunkGroup,
+      final CachedAlignedValueChunk valueChunk)
+      throws IOException {
+    final List<Chunk> valueChunkList =
+        new ArrayList<>(
+            (Objects.isNull(pendingAlignedChunkGroup)
+                    ? 0
+                    : pendingAlignedChunkGroup.valueChunkList.size())
+                + 1);
+    if (Objects.nonNull(pendingAlignedChunkGroup)) {
+      valueChunkList.addAll(pendingAlignedChunkGroup.valueChunkList);
+    }
+    valueChunkList.add(valueChunk.chunk);
+
+    final Chunk timeChunk = timeChunkList.get(timeChunkIndex);
+    final int timeChunkDataPosition = timeChunk.getData().position();
+    final List<Integer> valueChunkDataPositions = new 
ArrayList<>(valueChunkList.size());
+    for (final Chunk chunk : valueChunkList) {
+      valueChunkDataPositions.add(Objects.isNull(chunk) ? 0 : 
chunk.getData().position());
+    }
+
+    rewindChunkData(timeChunk);
+    valueChunkList.forEach(this::rewindChunkData);
+    try {
+      return AlignedSinglePageWholeChunkReader
+          .calculateMaxPageEstimatedMemoryUsageInBytesWithBatchData(timeChunk, 
valueChunkList);
+    } finally {
+      timeChunk.getData().position(timeChunkDataPosition);
+      for (int i = 0; i < valueChunkList.size(); ++i) {
+        final Chunk chunk = valueChunkList.get(i);
+        if (Objects.nonNull(chunk)) {
+          chunk.getData().position(valueChunkDataPositions.get(i));
+        }
+      }
+    }
+  }
+
+  private void rewindChunkData(final Chunk chunk) {
+    if (Objects.isNull(chunk)) {
+      return;
+    }
+
+    final ByteBuffer chunkData = chunk.getData();
+    if (Objects.nonNull(chunkData)) {
+      chunkData.rewind();
+    }
   }
 
   private boolean isSinglePageValueChunk(final ChunkHeader chunkHeader) {
@@ -980,4 +1121,32 @@ public class TsFileInsertionEventScanParser extends TsFileInsertionEventParser {
     }
     return null;
   }
+
+  private static class PendingAlignedChunkGroup {
+
+    private final int timeChunkIndex;
+    private final List<Chunk> valueChunkList = new ArrayList<>();
+    private final List<IMeasurementSchema> measurements = new ArrayList<>();
+    private final List<ModsOperationUtil.ModsInfo> modsInfos = new 
ArrayList<>();
+    private long chunkSize;
+
+    private PendingAlignedChunkGroup(final int timeChunkIndex, final long 
timeChunkSize) {
+      this.timeChunkIndex = timeChunkIndex;
+      this.chunkSize = timeChunkSize;
+    }
+  }
+
+  private static class CachedAlignedValueChunk {
+
+    private final int timeChunkIndex;
+    private final Chunk chunk;
+    private final long valueChunkSize;
+
+    private CachedAlignedValueChunk(
+        final int timeChunkIndex, final Chunk chunk, final long 
valueChunkSize) {
+      this.timeChunkIndex = timeChunkIndex;
+      this.chunk = chunk;
+      this.valueChunkSize = valueChunkSize;
+    }
+  }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java
index 04eff0067e5..33ceacf278c 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java
@@ -191,6 +191,14 @@ public class PipeMemoryWeightUtil {
       return new Pair<>(1, 0);
     }
 
+    final int configuredTabletRowSize =
+        PipeConfig.getInstance().getPipeDataStructureTabletRowSize();
+    final boolean hasTabletRowSizeLimit = configuredTabletRowSize > 0;
+    final double inputSizeLimit =
+        hasTabletRowSizeLimit && inputNum > 0
+            ? 100 + inputNum * (double) rowBytesUsed * 1.2
+            : Integer.MAX_VALUE;
+
     // Calculate row number according to the max size of a pipe tablet. "100" 
is the estimated size
     // of other data structures in a pipe tablet.
     // "*8" converts bytes to bits, because the bitmap size is 1 bit per 
schema.
@@ -198,17 +206,15 @@ public class PipeMemoryWeightUtil {
         (int)
             Math.min(
                 
IoTDBDescriptor.getInstance().getConfig().getPipeDataStructureTabletSizeInBytes(),
-                Math.min(Integer.MAX_VALUE, 100 + inputNum * (double) 
rowBytesUsed * 1.2));
+                Math.min(Integer.MAX_VALUE, inputSizeLimit));
 
     int rowNumber = 8 * (sizeLimit - 100) / (8 * rowBytesUsed + schemaCount);
     rowNumber = Math.max(1, rowNumber);
 
-    if ( // This means the row number is larger than the max row count of a 
pipe tablet
-    rowNumber > PipeConfig.getInstance().getPipeDataStructureTabletRowSize()) {
+    // This means the row number is larger than the max row count of a pipe 
tablet.
+    if (hasTabletRowSizeLimit && rowNumber > configuredTabletRowSize) {
       // Bound the row number, the memory cost is rowSize * rowNumber
-      return new Pair<>(
-          PipeConfig.getInstance().getPipeDataStructureTabletRowSize(),
-          rowBytesUsed * 
PipeConfig.getInstance().getPipeDataStructureTabletRowSize());
+      return new Pair<>(configuredTabletRowSize, rowBytesUsed * 
configuredTabletRowSize);
     } else {
       return new Pair<>(rowNumber, sizeLimit);
     }
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java
index 50109b935c3..91b6d7d56c5 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java
@@ -36,6 +36,8 @@ import org.apache.iotdb.db.pipe.event.common.tsfile.parser.scan.TsFileInsertionE
 import 
org.apache.iotdb.db.pipe.event.common.tsfile.parser.table.TsFileInsertionEventTableParser;
 import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
 import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil;
+import 
org.apache.iotdb.db.storageengine.dataregion.compaction.io.CompactionTsFileWriter;
+import 
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.constant.CompactionType;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.utils.CompactionTestFileWriter;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
 import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
@@ -55,6 +57,7 @@ import org.apache.tsfile.file.metadata.TableSchema;
 import org.apache.tsfile.file.metadata.enums.CompressionType;
 import org.apache.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.tsfile.read.TsFileSequenceReader;
+import org.apache.tsfile.read.common.BatchData;
 import org.apache.tsfile.read.common.Chunk;
 import org.apache.tsfile.read.common.Path;
 import org.apache.tsfile.read.common.TimeRange;
@@ -63,9 +66,11 @@ import org.apache.tsfile.utils.BitMap;
 import org.apache.tsfile.utils.Pair;
 import org.apache.tsfile.utils.TsFileGeneratorUtils;
 import org.apache.tsfile.write.TsFileWriter;
+import org.apache.tsfile.write.chunk.AlignedChunkWriterImpl;
 import org.apache.tsfile.write.record.Tablet;
 import org.apache.tsfile.write.schema.IMeasurementSchema;
 import org.apache.tsfile.write.schema.MeasurementSchema;
+import org.apache.tsfile.write.writer.TsFileIOWriter;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Assume;
@@ -84,6 +89,7 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Locale;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -436,6 +442,172 @@ public class TsFileInsertionEventParserTest {
     }
   }
 
+  @Test
+  public void testScanParserMergesBatchedAlignedValueChunkGroups() throws 
Exception {
+    final long originalPipeMaxReaderChunkSize =
+        CommonDescriptor.getInstance().getConfig().getPipeMaxReaderChunkSize();
+    final int originalPipeDataStructureTabletRowSize =
+        
CommonDescriptor.getInstance().getConfig().getPipeDataStructureTabletRowSize();
+
+    final int measurementCount = 20;
+    final int batchSize = 10;
+    final int rowCount = 4;
+    final File sourceTsFile = new 
File("aligned-source-for-batched-layout.tsfile");
+    alignedTsFile = new File("aligned-batched-layout.tsfile");
+
+    try {
+      
CommonDescriptor.getInstance().getConfig().setPipeMaxReaderChunkSize(1024 * 
1024L);
+      
CommonDescriptor.getInstance().getConfig().setPipeDataStructureTabletRowSize(0);
+
+      final List<IMeasurementSchema> schemaList = new ArrayList<>();
+      for (int i = 0; i < measurementCount; ++i) {
+        schemaList.add(new MeasurementSchema("s" + i, TSDataType.INT64));
+      }
+
+      writeAlignedSourceTsFile(sourceTsFile, schemaList, rowCount);
+      rewriteAlignedTsFileWithBatchedValueChunks(
+          sourceTsFile, alignedTsFile, measurementCount, batchSize);
+
+      int tabletCount = 0;
+      int pointCount = 0;
+      try (final TsFileInsertionEventScanParser parser =
+          new TsFileInsertionEventScanParser(
+              alignedTsFile,
+              new PrefixTreePattern("root"),
+              Long.MIN_VALUE,
+              Long.MAX_VALUE,
+              null,
+              null,
+              false)) {
+        for (final Pair<Tablet, Boolean> tabletWithIsAligned : 
parser.toTabletWithIsAligneds()) {
+          Assert.assertTrue(tabletWithIsAligned.getRight());
+          final Tablet tablet = tabletWithIsAligned.getLeft();
+          ++tabletCount;
+          Assert.assertEquals(measurementCount, tablet.getSchemas().size());
+          Assert.assertEquals(rowCount / 2, tablet.getRowSize());
+          pointCount += getNonNullSize(tablet);
+        }
+      }
+
+      Assert.assertEquals(measurementCount * rowCount, pointCount);
+      Assert.assertEquals(2, tabletCount);
+    } finally {
+      CommonDescriptor.getInstance()
+          .getConfig()
+          .setPipeMaxReaderChunkSize(originalPipeMaxReaderChunkSize);
+      CommonDescriptor.getInstance()
+          .getConfig()
+          
.setPipeDataStructureTabletRowSize(originalPipeDataStructureTabletRowSize);
+      sourceTsFile.delete();
+    }
+  }
+
+  /**
+   * Verifies that the scan parser emits a batched aligned chunk group in pieces no wider than one
+   * value-chunk batch when the reader chunk budget is sized to the first batch only.
+   *
+   * <p>The budget comes from {@link #calculateFirstBatchedAlignedValueChunkGroupMemoryLimit},
+   * computed on the rewritten file, and the tablet row cap is disabled (0). No tablet may hold more
+   * than {@code batchSize} measurements, at least one must hold exactly {@code batchSize}, every
+   * point must still be emitted, and {@code measurementCount / batchSize * 2} tablets are expected.
+   */
+  @Test
+  public void testScanParserFlushesBatchedAlignedValueChunkGroupsByMemoryLimit() throws Exception {
+    final long savedReaderChunkSize =
+        CommonDescriptor.getInstance().getConfig().getPipeMaxReaderChunkSize();
+    final int savedTabletRowSize =
+        CommonDescriptor.getInstance().getConfig().getPipeDataStructureTabletRowSize();
+
+    final int measurementNum = 20;
+    final int valueChunkBatch = 10;
+    final int totalRows = 4;
+    final File originTsFile = new File("aligned-source-for-batched-layout-memory-limit.tsfile");
+    alignedTsFile = new File("aligned-batched-layout-memory-limit.tsfile");
+
+    try {
+      // A non-positive value disables the per-tablet row cap, so the row cap does not split tablets.
+      CommonDescriptor.getInstance().getConfig().setPipeDataStructureTabletRowSize(0);
+
+      final List<IMeasurementSchema> schemas = new ArrayList<>();
+      for (int m = 0; m < measurementNum; m++) {
+        schemas.add(new MeasurementSchema("s" + m, TSDataType.INT64));
+      }
+      writeAlignedSourceTsFile(originTsFile, schemas, totalRows);
+      rewriteAlignedTsFileWithBatchedValueChunks(
+          originTsFile, alignedTsFile, measurementNum, valueChunkBatch);
+
+      // The budget is measured from the rewritten file, so it can only be set after the rewrite.
+      final long oneBatchBudget =
+          calculateFirstBatchedAlignedValueChunkGroupMemoryLimit(alignedTsFile, valueChunkBatch);
+      CommonDescriptor.getInstance().getConfig().setPipeMaxReaderChunkSize(oneBatchBudget);
+
+      int tablets = 0;
+      int widestTablet = 0;
+      int points = 0;
+      try (final TsFileInsertionEventScanParser scanParser =
+          new TsFileInsertionEventScanParser(
+              alignedTsFile,
+              new PrefixTreePattern("root"),
+              Long.MIN_VALUE,
+              Long.MAX_VALUE,
+              null,
+              null,
+              false)) {
+        for (final Pair<Tablet, Boolean> parsed : scanParser.toTabletWithIsAligneds()) {
+          final Tablet tablet = parsed.getLeft();
+          Assert.assertTrue(parsed.getRight());
+          tablets++;
+          final int columns = tablet.getSchemas().size();
+          widestTablet = Math.max(widestTablet, columns);
+          Assert.assertTrue(columns <= valueChunkBatch);
+          Assert.assertEquals(totalRows / 2, tablet.getRowSize());
+          points += getNonNullSize(tablet);
+        }
+      }
+
+      Assert.assertEquals(valueChunkBatch, widestTablet);
+      Assert.assertEquals(measurementNum * totalRows, points);
+      Assert.assertEquals(measurementNum / valueChunkBatch * 2, tablets);
+    } finally {
+      // Restore the shared global configuration for subsequent tests.
+      CommonDescriptor.getInstance()
+          .getConfig()
+          .setPipeMaxReaderChunkSize(savedReaderChunkSize);
+      CommonDescriptor.getInstance()
+          .getConfig()
+          .setPipeDataStructureTabletRowSize(savedTabletRowSize);
+      originTsFile.delete();
+    }
+  }
+
+  /**
+   * Verifies that a non-positive {@code pipeDataStructureTabletRowSize} disables the row cap in
+   * {@link PipeMemoryWeightUtil#calculateTabletRowCountAndMemory}. A cap of 0 and a cap of -1 must
+   * give the same row count for a 1000-row INT64 batch. A positive cap of 2 must be honored and must
+   * be smaller than the uncapped count.
+   */
+  @Test
+  public void testPipeTabletRowSizeCanBeDisabledByNonPositiveValue() {
+    final int savedTabletRowSize =
+        CommonDescriptor.getInstance().getConfig().getPipeDataStructureTabletRowSize();
+    final int savedTabletSizeInBytes =
+        CommonDescriptor.getInstance().getConfig().getPipeDataStructureTabletSizeInBytes();
+
+    try {
+      // Fixed 1 MiB tablet byte budget, so only the row-size setting varies below.
+      CommonDescriptor.getInstance()
+          .getConfig()
+          .setPipeDataStructureTabletSizeInBytes(1024 * 1024);
+
+      final BatchData rows = new BatchData(TSDataType.INT64);
+      for (long t = 0; t < 1000; t++) {
+        rows.putAnObject(t, t);
+      }
+
+      CommonDescriptor.getInstance().getConfig().setPipeDataStructureTabletRowSize(2);
+      final int cappedRows =
+          PipeMemoryWeightUtil.calculateTabletRowCountAndMemory(rows).getLeft();
+
+      CommonDescriptor.getInstance().getConfig().setPipeDataStructureTabletRowSize(0);
+      final int zeroCapRows =
+          PipeMemoryWeightUtil.calculateTabletRowCountAndMemory(rows).getLeft();
+
+      CommonDescriptor.getInstance().getConfig().setPipeDataStructureTabletRowSize(-1);
+      final int negativeCapRows =
+          PipeMemoryWeightUtil.calculateTabletRowCountAndMemory(rows).getLeft();
+
+      Assert.assertEquals(zeroCapRows, negativeCapRows);
+      Assert.assertEquals(2, cappedRows);
+      Assert.assertTrue(zeroCapRows > cappedRows);
+    } finally {
+      // Restore the shared global configuration for subsequent tests.
+      CommonDescriptor.getInstance()
+          .getConfig()
+          .setPipeDataStructureTabletRowSize(savedTabletRowSize);
+      CommonDescriptor.getInstance()
+          .getConfig()
+          .setPipeDataStructureTabletSizeInBytes(savedTabletSizeInBytes);
+    }
+  }
+
   @Test
   public void testQueryParserSkipsUnnecessaryBitMaps() throws Exception {
     testTreeParserSkipsUnnecessaryBitMaps(true);
@@ -1340,8 +1512,8 @@ public class TsFileInsertionEventParserTest {
         Assert.assertTrue(iterator.hasNext());
         Tablet parsedTablet = ((PipeRawTabletInsertionEvent) 
iterator.next()).convertToTablet();
         if (parsedTablet.getSchemas().size() > 1) {
-          assertBitMapExistence(parsedTablet, false, true);
-          Assert.assertTrue(parsedTablet.isNull(1, 1));
+          assertBitMapExistenceByMeasurement(parsedTablet, Map.of("dense", 
false, "sparse", true));
+          Assert.assertTrue(parsedTablet.isNull(1, 
getColumnIndex(parsedTablet, "sparse")));
           Assert.assertFalse(iterator.hasNext());
         } else {
           Assert.assertNull(parsedTablet.getBitMaps());
@@ -1373,6 +1545,33 @@ public class TsFileInsertionEventParserTest {
     }
   }
 
+  /**
+   * Asserts which columns of {@code tablet} carry a bitmap. Columns are looked up by measurement
+   * name rather than by position, so the check does not depend on column order.
+   *
+   * @param tablet tablet under test; its bitmap array must be non-null and sized to its schemas
+   * @param expectedMeasurementHasBitMap for each measurement in the tablet, whether its bitmap is
+   *     expected to be non-null; must name exactly the tablet's measurements
+   */
+  private void assertBitMapExistenceByMeasurement(
+      final Tablet tablet, final Map<String, Boolean> expectedMeasurementHasBitMap) {
+    final BitMap[] bitMaps = tablet.getBitMaps();
+    Assert.assertNotNull(bitMaps);
+    final int columnCount = tablet.getSchemas().size();
+    Assert.assertEquals(columnCount, bitMaps.length);
+    Assert.assertEquals(expectedMeasurementHasBitMap.size(), columnCount);
+
+    for (int column = 0; column < columnCount; column++) {
+      final String name = tablet.getSchemas().get(column).getMeasurementName();
+      Assert.assertTrue(expectedMeasurementHasBitMap.containsKey(name));
+      final boolean shouldHaveBitMap = expectedMeasurementHasBitMap.get(name);
+      final BitMap actual = bitMaps[column];
+      if (!shouldHaveBitMap) {
+        Assert.assertNull(actual);
+        continue;
+      }
+      Assert.assertNotNull(actual);
+    }
+  }
+
+  /**
+   * Returns the position of {@code measurement} in {@code tablet}'s schema list.
+   *
+   * @throws AssertionError if no column of the tablet has that measurement name
+   */
+  private int getColumnIndex(final Tablet tablet, final String measurement) {
+    int column = 0;
+    while (column < tablet.getSchemas().size()) {
+      if (tablet.getSchemas().get(column).getMeasurementName().equals(measurement)) {
+        return column;
+      }
+      column++;
+    }
+    // Same exception and message that Assert.fail(message) would raise; no sentinel return needed.
+    throw new AssertionError(
+        String.format("Measurement %s does not exist in tablet.", measurement));
+  }
+
   private void generateLargeAlignedTsFile(
       final File tsFile,
       final List<IMeasurementSchema> schemaList,
@@ -1887,6 +2086,146 @@ public class TsFileInsertionEventParserTest {
     }
   }
 
+  /**
+   * Computes a reader chunk-size budget sized to the first {@code batchSize} value chunks of the
+   * first aligned chunk of the first device in {@code tsFile}.
+   *
+   * <p>Two estimates are computed for that first batch. The chunk-size estimate is {@link
+   * PipeMemoryWeightUtil#calculateChunkRamBytesUsed} of the time chunk plus each value chunk's
+   * header data size. The page-memory estimate comes from {@link
+   * AlignedSinglePageWholeChunkReader#calculateMaxPageEstimatedMemoryUsageInBytesWithBatchData}.
+   * The larger of the two is returned. Both estimates are also computed for the first two batches,
+   * and the method asserts that each is strictly larger than its one-batch counterpart.
+   *
+   * <p>NOTE(review): the chunk-size estimate mixes RAM usage (time chunk) with raw data size (value
+   * chunks); presumably this mirrors the scan parser's accounting — confirm against the parser.
+   */
+  private long calculateFirstBatchedAlignedValueChunkGroupMemoryLimit(
+      final File tsFile, final int batchSize) throws Exception {
+    try (final TsFileSequenceReader reader = new TsFileSequenceReader(tsFile.getAbsolutePath())) {
+      final IDeviceID device = reader.getDeviceMeasurementsMap().keySet().iterator().next();
+      final List<AbstractAlignedChunkMetadata> alignedChunks =
+          reader.getAlignedChunkMetadata(device, true);
+      Assert.assertEquals(2, alignedChunks.size());
+
+      final AbstractAlignedChunkMetadata firstAlignedChunk = alignedChunks.get(0);
+      final Chunk timeChunk =
+          reader.readMemChunk((ChunkMetadata) firstAlignedChunk.getTimeChunkMetadata());
+      final List<IChunkMetadata> valueChunkMetas = firstAlignedChunk.getValueChunkMetadataList();
+      Assert.assertTrue(valueChunkMetas.size() >= batchSize * 2);
+
+      final long timeChunkRamBytes = PipeMemoryWeightUtil.calculateChunkRamBytesUsed(timeChunk);
+      // Each chunk is read once; the one-batch list shares its Chunk objects with the two-batch
+      // list.
+      final List<Chunk> oneBatch = new ArrayList<>();
+      final List<Chunk> twoBatches = new ArrayList<>();
+      long oneBatchDataSize = 0;
+      long twoBatchesDataSize = 0;
+      for (int i = 0; i < batchSize * 2; i++) {
+        final Chunk valueChunk = reader.readMemChunk((ChunkMetadata) valueChunkMetas.get(i));
+        final long dataSize = valueChunk.getHeader().getDataSize();
+        if (i < batchSize) {
+          oneBatch.add(valueChunk);
+          oneBatchDataSize += dataSize;
+        }
+        twoBatches.add(valueChunk);
+        twoBatchesDataSize += dataSize;
+      }
+      final long firstBatchChunkSize = timeChunkRamBytes + oneBatchDataSize;
+      final long firstTwoBatchChunkSize = timeChunkRamBytes + twoBatchesDataSize;
+
+      final long firstBatchPageMemorySize =
+          AlignedSinglePageWholeChunkReader
+              .calculateMaxPageEstimatedMemoryUsageInBytesWithBatchData(timeChunk, oneBatch);
+      final long firstTwoBatchPageMemorySize =
+          AlignedSinglePageWholeChunkReader
+              .calculateMaxPageEstimatedMemoryUsageInBytesWithBatchData(timeChunk, twoBatches);
+
+      Assert.assertTrue(firstTwoBatchChunkSize > firstBatchChunkSize);
+      Assert.assertTrue(firstTwoBatchPageMemorySize > firstBatchPageMemorySize);
+      return Math.max(firstBatchChunkSize, firstBatchPageMemorySize);
+    }
+  }
+
+  /**
+   * Writes {@code tsFile} with one chunk group for device {@code root.sg.d}. The group holds two
+   * aligned chunks of {@code rowCount / 2} rows each, with {@code schemaList} as the value columns.
+   * Timestamps run from 0 to {@code rowCount - 1}. The value at time {@code t} in column {@code i}
+   * is {@code t * 100 + i}, written with the null flag set to {@code false}. Any existing file at
+   * {@code tsFile} is deleted first.
+   *
+   * @param tsFile destination file
+   * @param schemaList aligned measurement schemas used for the value columns
+   * @param rowCount total number of rows; asserted to be even
+   * @throws IOException if writing the file fails
+   */
+  private void writeAlignedSourceTsFile(
+      final File tsFile, final List<IMeasurementSchema> schemaList, final int rowCount)
+      throws IOException {
+    if (tsFile.exists()) {
+      Assert.assertTrue(tsFile.delete());
+    }
+    Assert.assertEquals(0, rowCount % 2);
+
+    final IDeviceID device = IDeviceID.Factory.DEFAULT_FACTORY.create("root.sg.d");
+    final int rowsPerChunk = rowCount / 2;
+    try (final TsFileIOWriter fileWriter = new TsFileIOWriter(tsFile)) {
+      fileWriter.startChunkGroup(device);
+      for (int chunk = 0; chunk < 2; chunk++) {
+        final AlignedChunkWriterImpl chunkWriter = new AlignedChunkWriterImpl(schemaList);
+        final long firstTime = (long) chunk * rowsPerChunk;
+        for (long time = firstTime; time < firstTime + rowsPerChunk; time++) {
+          chunkWriter.getTimeChunkWriter().write(time);
+          for (int column = 0; column < schemaList.size(); column++) {
+            chunkWriter
+                .getValueChunkWriterByIndex(column)
+                .write(time, time * 100 + column, false);
+          }
+        }
+        chunkWriter.writeToFileWriter(fileWriter);
+      }
+      fileWriter.endChunkGroup();
+      fileWriter.endFile();
+    }
+  }
+
+  /**
+   * Copies the aligned chunk group of {@code sourceTsFile} into {@code targetTsFile} with value
+   * chunks grouped into measurement batches. The time chunks of every aligned chunk are written
+   * first. Then, for each consecutive range of {@code batchSize} measurements, the value chunks in
+   * that range are written for every aligned chunk in turn.
+   *
+   * <p>Only the first device of the source is read, and it is asserted to have exactly two aligned
+   * chunks with {@code measurementCount} value columns each. Any existing target file is deleted
+   * first.
+   */
+  private void rewriteAlignedTsFileWithBatchedValueChunks(
+      final File sourceTsFile,
+      final File targetTsFile,
+      final int measurementCount,
+      final int batchSize)
+      throws Exception {
+    if (targetTsFile.exists()) {
+      Assert.assertTrue(targetTsFile.delete());
+    }
+
+    try (final TsFileSequenceReader sourceReader =
+        new TsFileSequenceReader(sourceTsFile.getAbsolutePath())) {
+      final IDeviceID device =
+          sourceReader.getDeviceMeasurementsMap().keySet().iterator().next();
+      final List<AbstractAlignedChunkMetadata> alignedChunks =
+          sourceReader.getAlignedChunkMetadata(device, true);
+      Assert.assertEquals(2, alignedChunks.size());
+      alignedChunks.forEach(
+          aligned ->
+              Assert.assertEquals(measurementCount, aligned.getValueChunkMetadataList().size()));
+
+      try (final CompactionTsFileWriter targetWriter =
+          new CompactionTsFileWriter(
+              targetTsFile, Long.MAX_VALUE, CompactionType.INNER_SEQ_COMPACTION)) {
+        targetWriter.startChunkGroup(device);
+        targetWriter.markStartingWritingAligned();
+
+        // All time chunks first, in source order.
+        for (final AbstractAlignedChunkMetadata aligned : alignedChunks) {
+          final ChunkMetadata timeMeta = (ChunkMetadata) aligned.getTimeChunkMetadata();
+          targetWriter.writeChunk(sourceReader.readMemChunk(timeMeta), timeMeta);
+        }
+
+        // Then the value chunks, one measurement batch at a time.
+        int batchStart = 0;
+        while (batchStart < measurementCount) {
+          final int batchEnd = Math.min(batchStart + batchSize, measurementCount);
+          writeValueChunkBatch(sourceReader, targetWriter, alignedChunks, batchStart, batchEnd);
+          batchStart += batchSize;
+        }
+
+        targetWriter.markEndingWritingAligned();
+        targetWriter.endChunkGroup();
+        targetWriter.endFile();
+      }
+    }
+  }
+
+  /**
+   * Copies value chunks at measurement indices {@code [start, end)} from {@code reader} to {@code
+   * writer}. It does this for each entry of {@code alignedChunkMetadataList}, in list order. Nothing
+   * is written when {@code start >= end}.
+   *
+   * @throws IOException if reading or writing a chunk fails
+   */
+  private void writeValueChunkBatch(
+      final TsFileSequenceReader reader,
+      final CompactionTsFileWriter writer,
+      final List<AbstractAlignedChunkMetadata> alignedChunkMetadataList,
+      final int start,
+      final int end)
+      throws IOException {
+    for (final AbstractAlignedChunkMetadata aligned : alignedChunkMetadataList) {
+      final List<IChunkMetadata> valueMetas = aligned.getValueChunkMetadataList();
+      int measurementIndex = start;
+      while (measurementIndex < end) {
+        final ChunkMetadata valueMeta = (ChunkMetadata) valueMetas.get(measurementIndex);
+        writer.writeChunk(reader.readMemChunk(valueMeta), valueMeta);
+        measurementIndex++;
+      }
+    }
+  }
+
   private static class ParserPerformanceStats {
 
     private long pointCount;

Reply via email to