This is an automated email from the ASF dual-hosted git repository.

jt2594838 pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new 65d2582cb05 Pipe: merge batched aligned chunks in scan parser (#18010) (#18041)
65d2582cb05 is described below

commit 65d2582cb05d979d2d32d772db6d7059b9dbbd21
Author: Caideyipi <[email protected]>
AuthorDate: Tue Jun 30 10:18:03 2026 +0800

    Pipe: merge batched aligned chunks in scan parser (#18010) (#18041)
    
    * Pipe: merge batched aligned chunks in scan parser
    
    * Test pipe batched aligned chunk memory boundaries
    
    * Pipe: fix batched aligned scan parser memory split
    
    * Update TsFileInsertionEventParserTest.java
    
    * Rename pending aligned chunk consumer
    
    (cherry picked from commit f96fc5824f62b4c637c0d9b5e9ea4adc9f8b1853)
---
 .../container/scan/SinglePageWholeChunkReader.java |  33 +-
 .../scan/TsFileInsertionScanDataContainer.java     | 527 +++++++++++++--------
 .../pipe/resource/memory/PipeMemoryWeightUtil.java |  18 +-
 .../event/TsFileInsertionDataContainerTest.java    | 312 ++++++++++++
 4 files changed, 691 insertions(+), 199 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/SinglePageWholeChunkReader.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/SinglePageWholeChunkReader.java
index f41a6861120..1f741ddfc70 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/SinglePageWholeChunkReader.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/SinglePageWholeChunkReader.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.pipe.event.common.tsfile.container.scan;
 
+import org.apache.tsfile.common.constant.TsFileConstant;
 import org.apache.tsfile.compress.IUnCompressor;
 import org.apache.tsfile.encoding.decoder.Decoder;
 import org.apache.tsfile.enums.TSDataType;
@@ -159,22 +160,44 @@ public class SinglePageWholeChunkReader extends 
AbstractChunkReader
   static long estimatePageMemoryUsageInBytesWithBatchData(
       final PageHeader timePageHeader,
       final Chunk timeChunk,
-      final List<TSDataType> valueDataTypeList) {
+      final List<TSDataType> valueDataTypeList)
+      throws IOException {
     return estimatePageMemoryUsageInBytesWithBatchData(
         timePageHeader.getUncompressedSize(),
         getPageRowCount(timePageHeader, timeChunk),
         valueDataTypeList);
   }
 
-  static int getPageRowCount(final PageHeader pageHeader, final Chunk chunk) {
+  static int getPageRowCount(final PageHeader pageHeader, final Chunk chunk) 
throws IOException {
     if (isSinglePageChunk(chunk.getHeader())) {
-      return Objects.isNull(chunk.getChunkStatistic())
-          ? 0
-          : saturateToInt(chunk.getChunkStatistic().getCount());
+      if (Objects.nonNull(chunk.getChunkStatistic())) {
+        return saturateToInt(chunk.getChunkStatistic().getCount());
+      }
+      return isTimeChunk(chunk.getHeader()) ? countSinglePageTimeValues(chunk) 
: 0;
     }
     return saturateToInt(pageHeader.getNumOfValues());
   }
 
+  private static int countSinglePageTimeValues(final Chunk chunk) throws 
IOException {
+    final ByteBuffer chunkDataBuffer = chunk.getData().duplicate();
+    final PageHeader pageHeader = deserializePageHeader(chunkDataBuffer, 
chunk.getHeader());
+    final ByteBuffer pageData = deserializePageData(pageHeader, 
chunkDataBuffer, chunk.getHeader());
+    final Decoder decoder =
+        Decoder.getDecoderByType(chunk.getHeader().getEncodingType(), 
TSDataType.INT64);
+
+    int rowCount = 0;
+    while (decoder.hasNext(pageData)) {
+      decoder.readLong(pageData);
+      ++rowCount;
+    }
+    return rowCount;
+  }
+
+  private static boolean isTimeChunk(final ChunkHeader chunkHeader) {
+    return (chunkHeader.getChunkType() & TsFileConstant.TIME_COLUMN_MASK)
+        == TsFileConstant.TIME_COLUMN_MASK;
+  }
+
   private static int saturateToInt(final long value) {
     return (int) Math.min(Integer.MAX_VALUE, value);
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java
index 6c3d341f774..6a0cfa4f3c3 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java
@@ -67,6 +67,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.time.LocalDate;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -103,11 +104,11 @@ public class TsFileInsertionScanDataContainer extends 
TsFileInsertionDataContain
   // Cached time chunk
   private final List<Chunk> timeChunkList = new ArrayList<>();
   private final List<Boolean> isMultiPageList = new ArrayList<>();
-  private final List<Long> timeChunkPageMemorySizeList = new ArrayList<>();
 
   private final Map<String, Integer> measurementIndexMap = new HashMap<>();
-  private int lastIndex = -1;
-  private Chunk firstChunk4NextSequentialValueChunks;
+  private final List<PendingAlignedChunkGroup> pendingAlignedChunkGroups = new 
ArrayList<>();
+  private long pendingAlignedChunkSize;
+  private CachedAlignedValueChunk cachedAlignedValueChunk;
 
   private byte lastMarker = Byte.MIN_VALUE;
 
@@ -521,14 +522,14 @@ public class TsFileInsertionScanDataContainer extends 
TsFileInsertionDataContain
 
   private void moveToNextChunkReader() throws IOException, 
IllegalStateException {
     ChunkHeader chunkHeader;
-    long valueChunkSize = 0;
-    long valueChunkPageMemorySize = 0;
-    final List<Chunk> valueChunkList = new ArrayList<>();
     currentMeasurements.clear();
     modsInfos.clear();
 
     if (lastMarker == MetaMarker.SEPARATOR) {
-      chunkReader = null;
+      if (!useNextPendingAlignedChunk(lastMarker)) {
+        clearCachedAlignedChunkData();
+        chunkReader = null;
+      }
       return;
     }
 
@@ -536,8 +537,8 @@ public class TsFileInsertionScanDataContainer extends 
TsFileInsertionDataContain
     while ((marker =
             lastMarker != Byte.MIN_VALUE
                 ? lastMarker
-                : Objects.nonNull(firstChunk4NextSequentialValueChunks)
-                    ? 
toValueChunkMarker(firstChunk4NextSequentialValueChunks.getHeader())
+                : Objects.nonNull(cachedAlignedValueChunk)
+                    ? 
toValueChunkMarker(cachedAlignedValueChunk.chunk.getHeader())
                     : tsFileSequenceReader.readMarker())
         != MetaMarker.SEPARATOR) {
       lastMarker = Byte.MIN_VALUE;
@@ -550,70 +551,19 @@ public class TsFileInsertionScanDataContainer extends 
TsFileInsertionDataContain
             // Notice that the data in one chunk group is either aligned or 
non-aligned
             // There is no need to consider non-aligned chunks when there are 
value chunks
             currentIsMultiPage = marker == MetaMarker.CHUNK_HEADER;
-            long currentChunkHeaderOffset = tsFileSequenceReader.position() - 
1;
+            final long currentChunkHeaderOffset = 
tsFileSequenceReader.position() - 1;
             chunkHeader = tsFileSequenceReader.readChunkHeader(marker);
-            final long nextMarkerOffset =
-                tsFileSequenceReader.position() + chunkHeader.getDataSize();
-
-            if (Objects.isNull(currentDevice)) {
-              tsFileSequenceReader.position(nextMarkerOffset);
-              break;
-            }
-
-            if ((chunkHeader.getChunkType() & TsFileConstant.TIME_COLUMN_MASK)
-                == TsFileConstant.TIME_COLUMN_MASK) {
-              final Chunk timeChunk =
-                  new Chunk(
-                      chunkHeader, tsFileSequenceReader.readChunk(-1, 
chunkHeader.getDataSize()));
-              final boolean isMultiPage = marker == 
MetaMarker.TIME_CHUNK_HEADER;
-              timeChunkList.add(timeChunk);
-              isMultiPageList.add(isMultiPage);
-              timeChunkPageMemorySizeList.add(
-                  
SinglePageWholeChunkReader.calculateMaxPageEstimatedMemoryUsageInBytes(
-                      timeChunk));
-              break;
-            }
 
-            if (!pattern.matchesMeasurement(currentDevice, 
chunkHeader.getMeasurementID())) {
-              tsFileSequenceReader.position(nextMarkerOffset);
+            if (filterChunk(currentChunkHeaderOffset, chunkHeader, false, 
marker)) {
               break;
             }
 
-            // Skip the chunk if it is fully deleted by mods
-            if (!currentModifications.isEmpty()) {
-              Statistics statistics = null;
-              try {
-                statistics =
-                    findNonAlignedChunkStatistics(
-                        tsFileSequenceReader.getIChunkMetadataList(
-                            CompactionPathUtils.getPath(
-                                currentDevice, 
chunkHeader.getMeasurementID())),
-                        currentChunkHeaderOffset);
-              } catch (IllegalPathException ignore) {
-                LOGGER.warn(
-                    "Failed to get chunk metadata for {}.{}",
-                    currentDevice,
-                    chunkHeader.getMeasurementID());
-              }
-
-              if (statistics != null
-                  && ModsOperationUtil.isAllDeletedByMods(
-                      currentDevice,
-                      chunkHeader.getMeasurementID(),
-                      statistics.getStartTime(),
-                      statistics.getEndTime(),
-                      currentModifications)) {
-                tsFileSequenceReader.position(nextMarkerOffset);
-                break;
-              }
-            }
-
             if (chunkHeader.getDataSize() > 
allocatedMemoryBlockForChunk.getMemoryUsageInBytes()) {
               PipeDataNodeResourceManager.memory()
                   .forceResize(allocatedMemoryBlockForChunk, 
chunkHeader.getDataSize());
             }
 
-            Chunk chunk =
+            final Chunk chunk =
                 new Chunk(
                     chunkHeader, tsFileSequenceReader.readChunk(-1, 
chunkHeader.getDataSize()));
             final List<Long> pageEstimatedMemoryUsageInBytesList =
@@ -638,49 +588,16 @@ public class TsFileInsertionScanDataContainer extends 
TsFileInsertionDataContain
         case MetaMarker.VALUE_CHUNK_HEADER:
         case MetaMarker.ONLY_ONE_PAGE_VALUE_CHUNK_HEADER:
           {
-            Chunk chunk;
-            long currentValueChunkPageMemorySize = 0;
-            if (Objects.isNull(firstChunk4NextSequentialValueChunks)) {
+            CachedAlignedValueChunk valueChunk = cachedAlignedValueChunk;
+            if (Objects.isNull(valueChunk)) {
               final long currentChunkHeaderOffset = 
tsFileSequenceReader.position() - 1;
               chunkHeader = tsFileSequenceReader.readChunkHeader(marker);
 
-              final long nextMarkerOffset =
-                  tsFileSequenceReader.position() + chunkHeader.getDataSize();
-              if (Objects.isNull(currentDevice)
-                  || !pattern.matchesMeasurement(currentDevice, 
chunkHeader.getMeasurementID())) {
-                tsFileSequenceReader.position(nextMarkerOffset);
+              if (filterChunk(currentChunkHeaderOffset, chunkHeader, true, 
marker)) {
                 break;
               }
 
-              if (!currentModifications.isEmpty()) {
-                // Skip the chunk if it is fully deleted by mods
-                Statistics statistics = null;
-                try {
-                  statistics =
-                      findAlignedChunkStatistics(
-                          tsFileSequenceReader.getIChunkMetadataList(
-                              CompactionPathUtils.getPath(
-                                  currentDevice, 
chunkHeader.getMeasurementID())),
-                          currentChunkHeaderOffset);
-                } catch (IllegalPathException ignore) {
-                  LOGGER.warn(
-                      "Failed to get chunk metadata for {}.{}",
-                      currentDevice,
-                      chunkHeader.getMeasurementID());
-                }
-                if (statistics != null
-                    && ModsOperationUtil.isAllDeletedByMods(
-                        currentDevice,
-                        chunkHeader.getMeasurementID(),
-                        statistics.getStartTime(),
-                        statistics.getEndTime(),
-                        currentModifications)) {
-                  tsFileSequenceReader.position(nextMarkerOffset);
-                  break;
-                }
-              }
-
-              // Increase value index
+              // Increase value index.
               final String measurementID =
                   
tabletStringInternPool.intern(chunkHeader.getMeasurementID());
               final int valueIndex =
@@ -688,86 +605,35 @@ public class TsFileInsertionScanDataContainer extends 
TsFileInsertionDataContain
                       measurementID,
                       (measurement, index) -> Objects.nonNull(index) ? index + 
1 : 0);
 
-              // Emit when encountered non-sequential value chunk, or the 
chunk size exceeds
-              // certain value to avoid OOM
-              // Do not record or end current value chunks when there are 
empty chunks
+              // Do not record or end current value chunks when there are 
empty chunks.
               if (chunkHeader.getDataSize() == 0) {
                 break;
               }
-              chunk =
+              final Chunk chunk =
                   new Chunk(
                       chunkHeader, tsFileSequenceReader.readChunk(-1, 
chunkHeader.getDataSize()));
-              currentValueChunkPageMemorySize = 
calculateMaxPageMemorySize(chunk);
-              boolean needReturn = false;
-              final long timeChunkSize =
-                  lastIndex >= 0
-                      ? PipeMemoryWeightUtil.calculateChunkRamBytesUsed(
-                          timeChunkList.get(lastIndex))
-                      : 0;
-              final long timeChunkPageMemorySize =
-                  lastIndex >= 0 ? timeChunkPageMemorySizeList.get(lastIndex) 
: 0;
-              if (lastIndex >= 0) {
-                if (valueIndex != lastIndex) {
-                  needReturn = recordAlignedChunk(valueChunkList, marker);
-                } else {
-                  final long chunkSize = timeChunkSize + valueChunkSize;
-                  final long pageMemorySize = timeChunkPageMemorySize + 
valueChunkPageMemorySize;
-                  if (chunkSize + chunkHeader.getDataSize()
-                          > 
allocatedMemoryBlockForChunk.getMemoryUsageInBytes()
-                      || timeChunkPageMemorySize > 0
-                          && currentValueChunkPageMemorySize > 0
-                          && pageMemorySize + currentValueChunkPageMemorySize
-                              > getPageDataMemoryLimitInBytes()) {
-                    needReturn = recordAlignedChunk(valueChunkList, marker);
-                  }
-                }
-              }
-              lastIndex = valueIndex;
-              if (needReturn) {
-                firstChunk4NextSequentialValueChunks = chunk;
-                return;
-              }
-              
resizeChunkMemoryBlockIfFirstValueChunkExceedsLimit(valueChunkList, 
chunkHeader);
-              resizePageDataMemoryBlockIfFirstValueChunkExceedsLimit(
-                  valueChunkList, currentValueChunkPageMemorySize);
+              valueChunk =
+                  new CachedAlignedValueChunk(valueIndex, chunk, 
chunkHeader.getDataSize());
             } else {
-              chunk = firstChunk4NextSequentialValueChunks;
-              chunkHeader = chunk.getHeader();
-              firstChunk4NextSequentialValueChunks = null;
-              currentValueChunkPageMemorySize = 
calculateMaxPageMemorySize(chunk);
-              
resizeChunkMemoryBlockIfFirstValueChunkExceedsLimit(valueChunkList, 
chunkHeader);
-              resizePageDataMemoryBlockIfFirstValueChunkExceedsLimit(
-                  valueChunkList, currentValueChunkPageMemorySize);
+              cachedAlignedValueChunk = null;
             }
 
-            valueChunkSize += chunkHeader.getDataSize();
-            valueChunkPageMemorySize += currentValueChunkPageMemorySize;
-            valueChunkList.add(chunk);
-            final String measurementID =
-                tabletStringInternPool.intern(chunkHeader.getMeasurementID());
-            currentMeasurements.add(
-                new MeasurementSchema(measurementID, 
chunkHeader.getDataType()));
-            modsInfos.addAll(
-                ModsOperationUtil.initializeMeasurementMods(
-                    currentDevice, Collections.singletonList(measurementID), 
currentModifications));
+            if (returnPendingAlignedChunkBeforeCaching(valueChunk)) {
+              return;
+            }
+            cacheAlignedValueChunk(valueChunk);
             break;
           }
         case MetaMarker.CHUNK_GROUP_HEADER:
           {
-            // Return before "currentDevice" changes
-            if (recordAlignedChunk(valueChunkList, marker)) {
+            // Return before "currentDevice" changes.
+            if (useNextPendingAlignedChunk(marker)) {
               return;
             }
+            clearCachedAlignedChunkData();
             final String deviceID =
                 ((PlainDeviceID) 
tsFileSequenceReader.readChunkGroupHeader().getDeviceID())
                     .toStringID();
-            // Clear because the cached data will never be used in the next 
chunk group
-            lastIndex = -1;
-            timeChunkList.clear();
-            isMultiPageList.clear();
-            timeChunkPageMemorySizeList.clear();
-            measurementIndexMap.clear();
-
             currentDevice =
                 pattern.mayOverlapWithDevice(deviceID)
                     ? tabletStringInternPool.intern(deviceID)
@@ -785,7 +651,8 @@ public class TsFileInsertionScanDataContainer extends 
TsFileInsertionDataContain
     }
 
     lastMarker = marker;
-    if (!recordAlignedChunk(valueChunkList, marker)) {
+    if (!useNextPendingAlignedChunk(marker)) {
+      clearCachedAlignedChunkData();
       chunkReader = null;
     }
   }
@@ -794,22 +661,106 @@ public class TsFileInsertionScanDataContainer extends 
TsFileInsertionDataContain
     return PipeConfig.getInstance().getPipeMaxReaderChunkSize();
   }
 
-  private boolean recordAlignedChunk(final List<Chunk> valueChunkList, final 
byte marker)
+  private long getChunkMemoryLimitInBytes() {
+    return PipeConfig.getInstance().getPipeMaxReaderChunkSize();
+  }
+
+  private boolean filterChunk(
+      final long currentChunkHeaderOffset,
+      final ChunkHeader chunkHeader,
+      final boolean isAlignedValueChunk,
+      final byte marker)
       throws IOException {
-    if (!valueChunkList.isEmpty()) {
-      final Chunk timeChunk = timeChunkList.get(lastIndex);
+    final long nextMarkerOffset = tsFileSequenceReader.position() + 
chunkHeader.getDataSize();
+
+    if (Objects.isNull(currentDevice)) {
+      tsFileSequenceReader.position(nextMarkerOffset);
+      return true;
+    }
+
+    if (!isAlignedValueChunk) {
+      if ((chunkHeader.getChunkType() & TsFileConstant.TIME_COLUMN_MASK)
+          == TsFileConstant.TIME_COLUMN_MASK) {
+        final Chunk timeChunk =
+            new Chunk(chunkHeader, tsFileSequenceReader.readChunk(-1, 
chunkHeader.getDataSize()));
+        timeChunkList.add(timeChunk);
+        final boolean isMultiPage = marker == MetaMarker.TIME_CHUNK_HEADER;
+        isMultiPageList.add(isMultiPage);
+        return true;
+      }
+    }
+
+    if (!pattern.matchesMeasurement(currentDevice, 
chunkHeader.getMeasurementID())) {
+      tsFileSequenceReader.position(nextMarkerOffset);
+      return true;
+    }
+
+    // Skip the chunk if it is fully deleted by mods
+    if (!currentModifications.isEmpty()) {
+      Statistics statistics = null;
+      try {
+        statistics =
+            isAlignedValueChunk
+                ? findAlignedChunkStatistics(
+                    tsFileSequenceReader.getIChunkMetadataList(
+                        CompactionPathUtils.getPath(currentDevice, 
chunkHeader.getMeasurementID())),
+                    currentChunkHeaderOffset)
+                : findNonAlignedChunkStatistics(
+                    tsFileSequenceReader.getIChunkMetadataList(
+                        CompactionPathUtils.getPath(currentDevice, 
chunkHeader.getMeasurementID())),
+                    currentChunkHeaderOffset);
+      } catch (IllegalPathException ignore) {
+        LOGGER.warn(
+            "Failed to get chunk metadata for {}.",
+            currentDevice + "." + chunkHeader.getMeasurementID());
+      }
+
+      if (statistics != null
+          && ModsOperationUtil.isAllDeletedByMods(
+              currentDevice,
+              chunkHeader.getMeasurementID(),
+              statistics.getStartTime(),
+              statistics.getEndTime(),
+              currentModifications)) {
+        tsFileSequenceReader.position(nextMarkerOffset);
+        return true;
+      }
+    }
+    return false;
+  }
+
+  private boolean useNextPendingAlignedChunk(final byte marker) throws 
IOException {
+    while (!pendingAlignedChunkGroups.isEmpty()) {
+      final PendingAlignedChunkGroup pendingAlignedChunkGroup = 
pendingAlignedChunkGroups.remove(0);
+      pendingAlignedChunkSize =
+          Math.max(0, pendingAlignedChunkSize - 
pendingAlignedChunkGroup.chunkSize);
+
+      if (pendingAlignedChunkGroup.valueChunkList.isEmpty()) {
+        continue;
+      }
+
+      final Chunk timeChunk = 
timeChunkList.get(pendingAlignedChunkGroup.timeChunkIndex);
       timeChunk.getData().rewind();
-      currentIsMultiPage = isMultiPageList.get(lastIndex);
+      for (final Chunk valueChunk : pendingAlignedChunkGroup.valueChunkList) {
+        valueChunk.getData().rewind();
+      }
+
+      currentMeasurements.clear();
+      currentMeasurements.addAll(pendingAlignedChunkGroup.measurements);
+      modsInfos.clear();
+      modsInfos.addAll(pendingAlignedChunkGroup.modsInfos);
+
+      currentIsMultiPage = 
isMultiPageList.get(pendingAlignedChunkGroup.timeChunkIndex);
       if (!currentIsMultiPage) {
         resizePageDataMemoryIfNeeded(
             
AlignedSinglePageWholeChunkReader.calculatePageEstimatedMemoryUsageInBytes(
-                timeChunk, valueChunkList));
+                timeChunk, pendingAlignedChunkGroup.valueChunkList));
       }
       final List<Long> pageEstimatedMemoryUsageInBytesList =
           currentIsMultiPage
               ? AlignedSinglePageWholeChunkReader
                   .calculatePageEstimatedMemoryUsageInBytesWithBatchDataList(
-                      timeChunk, valueChunkList)
+                      timeChunk, pendingAlignedChunkGroup.valueChunkList)
               : Collections.emptyList();
       final long maxPageEstimatedMemoryUsageInBytes =
           pageEstimatedMemoryUsageInBytesList.isEmpty()
@@ -819,50 +770,222 @@ public class TsFileInsertionScanDataContainer extends 
TsFileInsertionDataContain
       chunkReader =
           currentIsMultiPage
               ? new MemoryControlledChunkReader(
-                  new AlignedChunkReader(timeChunk, valueChunkList, filter),
+                  new AlignedChunkReader(
+                      timeChunk, pendingAlignedChunkGroup.valueChunkList, 
filter),
                   pageEstimatedMemoryUsageInBytesList)
-              : new AlignedSinglePageWholeChunkReader(timeChunk, 
valueChunkList);
+              : new AlignedSinglePageWholeChunkReader(
+                  timeChunk, pendingAlignedChunkGroup.valueChunkList);
       currentIsAligned = true;
-      lastMarker = marker;
+      if (marker != Byte.MIN_VALUE) {
+        lastMarker = marker;
+      }
+      return true;
+    }
+    return false;
+  }
+
+  private boolean shouldReturnPendingAlignedChunkBeforeCaching(
+      final CachedAlignedValueChunk valueChunk) throws IOException {
+    validateAlignedValueChunkTimeIndex(valueChunk.timeChunkIndex);
+
+    final PendingAlignedChunkGroup pendingAlignedChunkGroup =
+        findPendingAlignedChunkGroup(valueChunk.timeChunkIndex);
+    final boolean isFirstValueChunkInGroup =
+        Objects.isNull(pendingAlignedChunkGroup)
+            || pendingAlignedChunkGroup.valueChunkList.isEmpty();
+    final long timeChunkSize =
+        Objects.isNull(pendingAlignedChunkGroup)
+            ? PipeMemoryWeightUtil.calculateChunkRamBytesUsed(
+                timeChunkList.get(valueChunk.timeChunkIndex))
+            : 0;
+    final long chunkSizeAfterCaching =
+        pendingAlignedChunkSize + timeChunkSize + valueChunk.valueChunkSize;
+
+    if (isFirstValueChunkInGroup) {
+      final long firstValueChunkGroupSize =
+          timeChunkSize
+              + (Objects.isNull(pendingAlignedChunkGroup) ? 0 : 
pendingAlignedChunkGroup.chunkSize)
+              + valueChunk.valueChunkSize;
+      if (firstValueChunkGroupSize > getChunkMemoryLimitInBytes()) {
+        return !pendingAlignedChunkGroups.isEmpty();
+      }
+    }
+
+    if (!pendingAlignedChunkGroups.isEmpty()
+        && chunkSizeAfterCaching > getChunkMemoryLimitInBytes()) {
+      return true;
+    }
+
+    final long pageMemorySizeAfterCaching =
+        calculateMaxAlignedPageMemorySizeWithBatchData(
+            valueChunk.timeChunkIndex, pendingAlignedChunkGroup, valueChunk);
+    return pageMemorySizeAfterCaching > getPageDataMemoryLimitInBytes()
+        && (!isFirstValueChunkInGroup || !pendingAlignedChunkGroups.isEmpty());
+  }
+
+  private boolean returnPendingAlignedChunkBeforeCaching(final 
CachedAlignedValueChunk valueChunk)
+      throws IOException {
+    if (!shouldReturnPendingAlignedChunkBeforeCaching(valueChunk)) {
+      return false;
+    }
+
+    cachedAlignedValueChunk = valueChunk;
+    if (useNextPendingAlignedChunk(Byte.MIN_VALUE)) {
       return true;
     }
+    cachedAlignedValueChunk = null;
     return false;
   }
 
+  private void cacheAlignedValueChunk(final CachedAlignedValueChunk 
valueChunk) throws IOException {
+    validateAlignedValueChunkTimeIndex(valueChunk.timeChunkIndex);
+
+    final PendingAlignedChunkGroup pendingAlignedChunkGroup =
+        getOrCreatePendingAlignedChunkGroup(valueChunk.timeChunkIndex);
+    
resizeChunkMemoryBlockIfFirstValueChunkExceedsLimit(pendingAlignedChunkGroup, 
valueChunk);
+    
resizePageDataMemoryBlockIfFirstValueChunkExceedsLimit(pendingAlignedChunkGroup,
 valueChunk);
+
+    pendingAlignedChunkGroup.valueChunkList.add(valueChunk.chunk);
+    pendingAlignedChunkGroup.chunkSize += valueChunk.valueChunkSize;
+    pendingAlignedChunkSize += valueChunk.valueChunkSize;
+
+    final ChunkHeader chunkHeader = valueChunk.chunk.getHeader();
+    final String measurementID = 
tabletStringInternPool.intern(chunkHeader.getMeasurementID());
+    pendingAlignedChunkGroup.measurements.add(
+        new MeasurementSchema(measurementID, chunkHeader.getDataType()));
+    pendingAlignedChunkGroup.modsInfos.addAll(
+        ModsOperationUtil.initializeMeasurementMods(
+            currentDevice, Collections.singletonList(measurementID), 
currentModifications));
+  }
+
+  private PendingAlignedChunkGroup getOrCreatePendingAlignedChunkGroup(final 
int timeChunkIndex) {
+    final PendingAlignedChunkGroup pendingAlignedChunkGroup =
+        findPendingAlignedChunkGroup(timeChunkIndex);
+    if (Objects.nonNull(pendingAlignedChunkGroup)) {
+      return pendingAlignedChunkGroup;
+    }
+
+    final PendingAlignedChunkGroup newPendingAlignedChunkGroup =
+        new PendingAlignedChunkGroup(
+            timeChunkIndex,
+            
PipeMemoryWeightUtil.calculateChunkRamBytesUsed(timeChunkList.get(timeChunkIndex)));
+    pendingAlignedChunkSize += newPendingAlignedChunkGroup.chunkSize;
+
+    for (int i = 0; i < pendingAlignedChunkGroups.size(); ++i) {
+      if (pendingAlignedChunkGroups.get(i).timeChunkIndex > timeChunkIndex) {
+        pendingAlignedChunkGroups.add(i, newPendingAlignedChunkGroup);
+        return newPendingAlignedChunkGroup;
+      }
+    }
+    pendingAlignedChunkGroups.add(newPendingAlignedChunkGroup);
+    return newPendingAlignedChunkGroup;
+  }
+
+  private PendingAlignedChunkGroup findPendingAlignedChunkGroup(final int 
timeChunkIndex) {
+    for (final PendingAlignedChunkGroup pendingAlignedChunkGroup : 
pendingAlignedChunkGroups) {
+      if (pendingAlignedChunkGroup.timeChunkIndex == timeChunkIndex) {
+        return pendingAlignedChunkGroup;
+      }
+    }
+    return null;
+  }
+
+  private void validateAlignedValueChunkTimeIndex(final int timeChunkIndex) 
throws IOException {
+    if (timeChunkIndex < 0 || timeChunkIndex >= timeChunkList.size()) {
+      throw new IOException(
+          String.format(
+              "Invalid aligned value chunk index %d, while there are %d time 
chunks.",
+              timeChunkIndex, timeChunkList.size()));
+    }
+  }
+
+  private void clearCachedAlignedChunkData() {
+    pendingAlignedChunkGroups.clear();
+    pendingAlignedChunkSize = 0;
+    cachedAlignedValueChunk = null;
+    timeChunkList.clear();
+    isMultiPageList.clear();
+    measurementIndexMap.clear();
+  }
+
   private void resizeChunkMemoryBlockIfFirstValueChunkExceedsLimit(
-      final List<Chunk> valueChunkList, final ChunkHeader valueChunkHeader) {
-    if (!valueChunkList.isEmpty() || lastIndex < 0) {
+      final PendingAlignedChunkGroup pendingAlignedChunkGroup,
+      final CachedAlignedValueChunk valueChunk) {
+    if (!pendingAlignedChunkGroup.valueChunkList.isEmpty()) {
       return;
     }
 
-    final long chunkSize =
-        
PipeMemoryWeightUtil.calculateChunkRamBytesUsed(timeChunkList.get(lastIndex))
-            + valueChunkHeader.getDataSize();
+    final long chunkSize = pendingAlignedChunkGroup.chunkSize + 
valueChunk.valueChunkSize;
     if (chunkSize > allocatedMemoryBlockForChunk.getMemoryUsageInBytes()) {
       
PipeDataNodeResourceManager.memory().forceResize(allocatedMemoryBlockForChunk, 
chunkSize);
     }
   }
 
   private void resizePageDataMemoryBlockIfFirstValueChunkExceedsLimit(
-      final List<Chunk> valueChunkList, final long valueChunkPageMemorySize) {
-    if (!valueChunkList.isEmpty() || lastIndex < 0 || valueChunkPageMemorySize 
<= 0) {
-      return;
-    }
-
-    final long timeChunkPageMemorySize = 
timeChunkPageMemorySizeList.get(lastIndex);
-    if (timeChunkPageMemorySize <= 0) {
+      final PendingAlignedChunkGroup pendingAlignedChunkGroup,
+      final CachedAlignedValueChunk valueChunk)
+      throws IOException {
+    if (!pendingAlignedChunkGroup.valueChunkList.isEmpty()) {
       return;
     }
 
-    final long pageMemorySize = timeChunkPageMemorySize + 
valueChunkPageMemorySize;
+    final long pageMemorySize =
+        calculateMaxAlignedPageMemorySizeWithBatchData(
+            pendingAlignedChunkGroup.timeChunkIndex, pendingAlignedChunkGroup, 
valueChunk);
     if (pageMemorySize > getPageDataMemoryLimitInBytes()) {
       PipeDataNodeResourceManager.memory()
           .forceResize(allocatedMemoryBlockForBatchData, pageMemorySize);
     }
   }
 
-  private long calculateMaxPageMemorySize(final Chunk chunk) throws 
IOException {
-    return 
SinglePageWholeChunkReader.calculateMaxPageEstimatedMemoryUsageInBytes(chunk);
+  private long calculateMaxAlignedPageMemorySizeWithBatchData(
+      final int timeChunkIndex,
+      final PendingAlignedChunkGroup pendingAlignedChunkGroup,
+      final CachedAlignedValueChunk valueChunk)
+      throws IOException {
+    final List<Chunk> valueChunkList =
+        new ArrayList<>(
+            (Objects.isNull(pendingAlignedChunkGroup)
+                    ? 0
+                    : pendingAlignedChunkGroup.valueChunkList.size())
+                + 1);
+    if (Objects.nonNull(pendingAlignedChunkGroup)) {
+      valueChunkList.addAll(pendingAlignedChunkGroup.valueChunkList);
+    }
+    valueChunkList.add(valueChunk.chunk);
+
+    final Chunk timeChunk = timeChunkList.get(timeChunkIndex);
+    final int timeChunkDataPosition = timeChunk.getData().position();
+    final List<Integer> valueChunkDataPositions = new 
ArrayList<>(valueChunkList.size());
+    for (final Chunk chunk : valueChunkList) {
+      valueChunkDataPositions.add(Objects.isNull(chunk) ? 0 : 
chunk.getData().position());
+    }
+
+    rewindChunkData(timeChunk);
+    valueChunkList.forEach(this::rewindChunkData);
+    try {
+      return AlignedSinglePageWholeChunkReader
+          .calculateMaxPageEstimatedMemoryUsageInBytesWithBatchData(timeChunk, 
valueChunkList);
+    } finally {
+      timeChunk.getData().position(timeChunkDataPosition);
+      for (int i = 0; i < valueChunkList.size(); ++i) {
+        final Chunk chunk = valueChunkList.get(i);
+        if (Objects.nonNull(chunk)) {
+          chunk.getData().position(valueChunkDataPositions.get(i));
+        }
+      }
+    }
+  }
+
+  private void rewindChunkData(final Chunk chunk) {
+    if (Objects.isNull(chunk)) {
+      return;
+    }
+
+    final ByteBuffer chunkData = chunk.getData();
+    if (Objects.nonNull(chunkData)) {
+      chunkData.rewind();
+    }
   }
 
   private boolean isSinglePageValueChunk(final ChunkHeader chunkHeader) {
@@ -915,4 +1038,32 @@ public class TsFileInsertionScanDataContainer extends TsFileInsertionDataContain
     }
     return null;
   }
+
+  /**
+   * Mutable holder for value chunks that are paired with one time chunk, together with their
+   * measurement schemas, modification infos and a running size.
+   */
+  private static class PendingAlignedChunkGroup {
+
+    /** Index of the time chunk shared by this group (presumably into timeChunkList). */
+    private final int timeChunkIndex;
+
+    // Parallel per-value-chunk collections.
+    private final List<Chunk> valueChunkList = new ArrayList<>();
+    private final List<MeasurementSchema> measurements = new ArrayList<>();
+    private final List<ModsOperationUtil.ModsInfo> modsInfos = new ArrayList<>();
+
+    /** Starts at the time chunk's size; non-final, presumably grown as value chunks are added. */
+    private long chunkSize;
+
+    private PendingAlignedChunkGroup(final int groupTimeChunkIndex, final long sizeOfTimeChunk) {
+      timeChunkIndex = groupTimeChunkIndex;
+      chunkSize = sizeOfTimeChunk;
+    }
+  }
+
+  /** Immutable record of one value chunk, the time chunk it is paired with, and its size. */
+  private static class CachedAlignedValueChunk {
+
+    /** Index of the paired time chunk (presumably into timeChunkList). */
+    private final int timeChunkIndex;
+
+    private final Chunk chunk;
+
+    private final long valueChunkSize;
+
+    private CachedAlignedValueChunk(
+        final int owningTimeChunkIndex, final Chunk valueChunk, final long sizeOfValueChunk) {
+      timeChunkIndex = owningTimeChunkIndex;
+      chunk = valueChunk;
+      valueChunkSize = sizeOfValueChunk;
+    }
+  }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java
index a22522666c2..b02cff26256 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java
@@ -179,6 +179,14 @@ public class PipeMemoryWeightUtil {
       return new Pair<>(1, 0);
     }
 
+    final int configuredTabletRowSize =
+        PipeConfig.getInstance().getPipeDataStructureTabletRowSize();
+    final boolean hasTabletRowSizeLimit = configuredTabletRowSize > 0;
+    final double inputSizeLimit =
+        hasTabletRowSizeLimit && inputNum > 0
+            ? 100 + inputNum * (double) rowBytesUsed * 1.2
+            : Integer.MAX_VALUE;
+
     // Calculate row number according to the max size of a pipe tablet. "100" 
is the estimated size
     // of other data structures in a pipe tablet.
     // "*8" converts bytes to bits, because the bitmap size is 1 bit per 
schema.
@@ -186,17 +194,15 @@ public class PipeMemoryWeightUtil {
         (int)
             Math.min(
                 
IoTDBDescriptor.getInstance().getConfig().getPipeDataStructureTabletSizeInBytes(),
-                Math.min(Integer.MAX_VALUE, 100 + inputNum * (double) 
rowBytesUsed * 1.2));
+                Math.min(Integer.MAX_VALUE, inputSizeLimit));
 
     int rowNumber = 8 * (sizeLimit - 100) / (8 * rowBytesUsed + schemaCount);
     rowNumber = Math.max(1, rowNumber);
 
-    if ( // This means the row number is larger than the max row count of a 
pipe tablet
-    rowNumber > PipeConfig.getInstance().getPipeDataStructureTabletRowSize()) {
+    // This means the row number is larger than the max row count of a pipe 
tablet.
+    if (hasTabletRowSizeLimit && rowNumber > configuredTabletRowSize) {
       // Bound the row number, the memory cost is rowSize * rowNumber
-      return new Pair<>(
-          PipeConfig.getInstance().getPipeDataStructureTabletRowSize(),
-          rowBytesUsed * 
PipeConfig.getInstance().getPipeDataStructureTabletRowSize());
+      return new Pair<>(configuredTabletRowSize, rowBytesUsed * 
configuredTabletRowSize);
     } else {
       return new Pair<>(rowNumber, sizeLimit);
     }
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java
index 36f56e0e606..55a27348e3e 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java
@@ -31,6 +31,8 @@ import 
org.apache.iotdb.db.pipe.event.common.tsfile.container.scan.SinglePageWho
 import 
org.apache.iotdb.db.pipe.event.common.tsfile.container.scan.TsFileInsertionScanDataContainer;
 import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
 import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil;
+import 
org.apache.iotdb.db.storageengine.dataregion.compaction.io.CompactionTsFileWriter;
+import 
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.constant.CompactionType;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.utils.CompactionTestFileWriter;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
 import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
@@ -49,6 +51,7 @@ import org.apache.tsfile.file.metadata.PlainDeviceID;
 import org.apache.tsfile.file.metadata.enums.CompressionType;
 import org.apache.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.tsfile.read.TsFileSequenceReader;
+import org.apache.tsfile.read.common.BatchData;
 import org.apache.tsfile.read.common.Chunk;
 import org.apache.tsfile.read.common.Path;
 import org.apache.tsfile.read.common.TimeRange;
@@ -56,8 +59,11 @@ import org.apache.tsfile.utils.Binary;
 import org.apache.tsfile.utils.Pair;
 import org.apache.tsfile.utils.TsFileGeneratorUtils;
 import org.apache.tsfile.write.TsFileWriter;
+import org.apache.tsfile.write.chunk.AlignedChunkWriterImpl;
 import org.apache.tsfile.write.record.Tablet;
+import org.apache.tsfile.write.schema.IMeasurementSchema;
 import org.apache.tsfile.write.schema.MeasurementSchema;
+import org.apache.tsfile.write.writer.TsFileIOWriter;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -422,6 +428,172 @@ public class TsFileInsertionDataContainerTest {
     }
   }
 
+  @Test
+  public void testScanParserMergesBatchedAlignedValueChunkGroups() throws Exception {
+    // Snapshot the global settings this test overrides so the finally block can restore them.
+    final long savedMaxReaderChunkSize =
+        CommonDescriptor.getInstance().getConfig().getPipeMaxReaderChunkSize();
+    final int savedTabletRowSize =
+        CommonDescriptor.getInstance().getConfig().getPipeDataStructureTabletRowSize();
+
+    final int columns = 20;
+    final int columnsPerBatch = 10;
+    final int rows = 4;
+    final File sourceFile = new File("aligned-source-for-batched-layout.tsfile");
+    alignedTsFile = new File("aligned-batched-layout.tsfile");
+
+    try {
+      // The assertions below expect a 1 MiB reader budget to merge all value-chunk batches;
+      // a row size of 0 disables the tablet row cap.
+      CommonDescriptor.getInstance().getConfig().setPipeMaxReaderChunkSize(1024 * 1024L);
+      CommonDescriptor.getInstance().getConfig().setPipeDataStructureTabletRowSize(0);
+
+      final List<MeasurementSchema> schemas = new ArrayList<>();
+      for (int column = 0; column < columns; ++column) {
+        schemas.add(new MeasurementSchema("s" + column, TSDataType.INT64));
+      }
+
+      writeAlignedSourceTsFile(sourceFile, schemas, rows);
+      rewriteAlignedTsFileWithBatchedValueChunks(
+          sourceFile, alignedTsFile, columns, columnsPerBatch);
+
+      int parsedTablets = 0;
+      int parsedPoints = 0;
+      try (final TsFileInsertionScanDataContainer parser =
+          new TsFileInsertionScanDataContainer(
+              alignedTsFile,
+              new PrefixPipePattern("root"),
+              Long.MIN_VALUE,
+              Long.MAX_VALUE,
+              null,
+              null,
+              false)) {
+        for (final Pair<Tablet, Boolean> parsed : parser.toTabletWithIsAligneds()) {
+          Assert.assertTrue(parsed.getRight());
+          final Tablet tablet = parsed.getLeft();
+          ++parsedTablets;
+          // Every tablet must carry all columns, i.e. the batches were merged back together.
+          Assert.assertEquals(columns, tablet.getSchemas().size());
+          Assert.assertEquals(rows / 2, tablet.rowSize);
+          parsedPoints += getNonNullSize(tablet);
+        }
+      }
+
+      Assert.assertEquals(columns * rows, parsedPoints);
+      // One tablet per aligned chunk written by writeAlignedSourceTsFile.
+      Assert.assertEquals(2, parsedTablets);
+    } finally {
+      CommonDescriptor.getInstance().getConfig().setPipeMaxReaderChunkSize(savedMaxReaderChunkSize);
+      CommonDescriptor.getInstance()
+          .getConfig()
+          .setPipeDataStructureTabletRowSize(savedTabletRowSize);
+      sourceFile.delete();
+    }
+  }
+
+  @Test
+  public void testScanParserFlushesBatchedAlignedValueChunkGroupsByMemoryLimit() throws Exception {
+    // Snapshot the global settings this test overrides so the finally block can restore them.
+    final long savedMaxReaderChunkSize =
+        CommonDescriptor.getInstance().getConfig().getPipeMaxReaderChunkSize();
+    final int savedTabletRowSize =
+        CommonDescriptor.getInstance().getConfig().getPipeDataStructureTabletRowSize();
+
+    final int columns = 20;
+    final int columnsPerBatch = 10;
+    final int rows = 4;
+    final File sourceFile = new File("aligned-source-for-batched-layout-memory-limit.tsfile");
+    alignedTsFile = new File("aligned-batched-layout-memory-limit.tsfile");
+
+    try {
+      CommonDescriptor.getInstance().getConfig().setPipeDataStructureTabletRowSize(0);
+
+      final List<MeasurementSchema> schemas = new ArrayList<>();
+      for (int column = 0; column < columns; ++column) {
+        schemas.add(new MeasurementSchema("s" + column, TSDataType.INT64));
+      }
+
+      writeAlignedSourceTsFile(sourceFile, schemas, rows);
+      rewriteAlignedTsFileWithBatchedValueChunks(
+          sourceFile, alignedTsFile, columns, columnsPerBatch);
+      // The budget is derived from the rewritten file, so it can only be set after writing it.
+      final long readerBudget =
+          calculateFirstBatchedAlignedValueChunkGroupMemoryLimit(alignedTsFile, columnsPerBatch);
+      CommonDescriptor.getInstance().getConfig().setPipeMaxReaderChunkSize(readerBudget);
+
+      int parsedTablets = 0;
+      int widestTablet = 0;
+      int parsedPoints = 0;
+      try (final TsFileInsertionScanDataContainer parser =
+          new TsFileInsertionScanDataContainer(
+              alignedTsFile,
+              new PrefixPipePattern("root"),
+              Long.MIN_VALUE,
+              Long.MAX_VALUE,
+              null,
+              null,
+              false)) {
+        for (final Pair<Tablet, Boolean> parsed : parser.toTabletWithIsAligneds()) {
+          Assert.assertTrue(parsed.getRight());
+          final Tablet tablet = parsed.getLeft();
+          ++parsedTablets;
+          final int tabletColumns = tablet.getSchemas().size();
+          widestTablet = Math.max(widestTablet, tabletColumns);
+          // With the tight budget, no tablet may span more than one value-chunk batch.
+          Assert.assertTrue(tabletColumns <= columnsPerBatch);
+          Assert.assertEquals(rows / 2, tablet.rowSize);
+          parsedPoints += getNonNullSize(tablet);
+        }
+      }
+
+      Assert.assertEquals(columnsPerBatch, widestTablet);
+      Assert.assertEquals(columns * rows, parsedPoints);
+      Assert.assertEquals(columns / columnsPerBatch * 2, parsedTablets);
+    } finally {
+      CommonDescriptor.getInstance().getConfig().setPipeMaxReaderChunkSize(savedMaxReaderChunkSize);
+      CommonDescriptor.getInstance()
+          .getConfig()
+          .setPipeDataStructureTabletRowSize(savedTabletRowSize);
+      sourceFile.delete();
+    }
+  }
+
+  @Test
+  public void testPipeTabletRowSizeCanBeDisabledByNonPositiveValue() {
+    // Snapshot the global settings this test overrides so the finally block can restore them.
+    final int savedTabletRowSize =
+        CommonDescriptor.getInstance().getConfig().getPipeDataStructureTabletRowSize();
+    final int savedTabletSizeInBytes =
+        CommonDescriptor.getInstance().getConfig().getPipeDataStructureTabletSizeInBytes();
+
+    try {
+      // NOTE(review): the row-count calculation in PipeMemoryWeightUtil, as shown in this commit,
+      // reads the tablet byte limit from IoTDBDescriptor's config rather than CommonDescriptor's.
+      // Confirm that this setter actually reaches that code path.
+      CommonDescriptor.getInstance().getConfig().setPipeDataStructureTabletSizeInBytes(1024 * 1024);
+
+      // 1000 INT64 points at timestamps 0..999.
+      final BatchData points = new BatchData(TSDataType.INT64);
+      for (long t = 0; t < 1000; ++t) {
+        points.putAnObject(t, t);
+      }
+
+      // A positive row size caps the computed row count.
+      CommonDescriptor.getInstance().getConfig().setPipeDataStructureTabletRowSize(2);
+      final int cappedRows =
+          PipeMemoryWeightUtil.calculateTabletRowCountAndMemory(points).getLeft();
+
+      // Zero and negative row sizes both disable the cap.
+      CommonDescriptor.getInstance().getConfig().setPipeDataStructureTabletRowSize(0);
+      final int rowsWithZeroLimit =
+          PipeMemoryWeightUtil.calculateTabletRowCountAndMemory(points).getLeft();
+
+      CommonDescriptor.getInstance().getConfig().setPipeDataStructureTabletRowSize(-1);
+      final int rowsWithNegativeLimit =
+          PipeMemoryWeightUtil.calculateTabletRowCountAndMemory(points).getLeft();
+
+      Assert.assertEquals(rowsWithZeroLimit, rowsWithNegativeLimit);
+      Assert.assertEquals(2, cappedRows);
+      Assert.assertTrue(rowsWithZeroLimit > cappedRows);
+    } finally {
+      CommonDescriptor.getInstance()
+          .getConfig()
+          .setPipeDataStructureTabletRowSize(savedTabletRowSize);
+      CommonDescriptor.getInstance()
+          .getConfig()
+          .setPipeDataStructureTabletSizeInBytes(savedTabletSizeInBytes);
+    }
+  }
+
   public void testToTabletInsertionEvents(final boolean isQuery) throws 
Exception {
     // Test empty chunk
     testMixedTsFileWithEmptyChunk(isQuery);
@@ -1059,4 +1231,144 @@ public class TsFileInsertionDataContainerTest {
       return chunkSizeLimit;
     }
   }
+
+  /**
+   * Computes a reader memory limit sized for the first batch of value chunks of the first aligned
+   * chunk of the first device in {@code tsFile}.
+   *
+   * <p>The time chunk is measured together with (a) the first {@code batchSize} value chunks and
+   * (b) the first {@code 2 * batchSize} value chunks. Each combination is measured two ways: as a
+   * summed chunk size, and as the page-memory estimate from {@link
+   * AlignedSinglePageWholeChunkReader}. The method asserts that two batches cost strictly more than
+   * one under both measures, then returns the larger of the two one-batch measures.
+   *
+   * <p>Chunk data buffers are rewound before each estimate, as the scan parser does before the same
+   * call. This keeps the second estimate from depending on where the first estimate left buffers
+   * that both calls share.
+   *
+   * @param tsFile file written by rewriteAlignedTsFileWithBatchedValueChunks
+   * @param batchSize number of value chunks per batch
+   * @return the memory limit for one batch
+   */
+  private long calculateFirstBatchedAlignedValueChunkGroupMemoryLimit(
+      final File tsFile, final int batchSize) throws Exception {
+    try (final TsFileSequenceReader reader = new TsFileSequenceReader(tsFile.getAbsolutePath())) {
+      final IDeviceID deviceID = reader.getDeviceMeasurementsMap().keySet().iterator().next();
+      final List<AlignedChunkMetadata> alignedChunkMetadataList =
+          reader.getAlignedChunkMetadata(deviceID);
+      Assert.assertEquals(2, alignedChunkMetadataList.size());
+
+      final AlignedChunkMetadata alignedChunkMetadata = alignedChunkMetadataList.get(0);
+      final Chunk timeChunk =
+          reader.readMemChunk((ChunkMetadata) alignedChunkMetadata.getTimeChunkMetadata());
+      final List<IChunkMetadata> valueChunkMetadataList =
+          alignedChunkMetadata.getValueChunkMetadataList();
+      Assert.assertTrue(valueChunkMetadataList.size() >= batchSize * 2);
+
+      final List<Chunk> firstValueChunkBatch = new ArrayList<>();
+      final List<Chunk> firstTwoValueChunkBatches = new ArrayList<>();
+      long firstBatchChunkSize = PipeMemoryWeightUtil.calculateChunkRamBytesUsed(timeChunk);
+      long firstTwoBatchChunkSize = firstBatchChunkSize;
+      for (int index = 0; index < batchSize * 2; ++index) {
+        final Chunk valueChunk =
+            reader.readMemChunk((ChunkMetadata) valueChunkMetadataList.get(index));
+        if (index < batchSize) {
+          firstValueChunkBatch.add(valueChunk);
+          firstBatchChunkSize += valueChunk.getHeader().getDataSize();
+        }
+        firstTwoValueChunkBatches.add(valueChunk);
+        firstTwoBatchChunkSize += valueChunk.getHeader().getDataSize();
+      }
+
+      rewindChunkBuffers(timeChunk, firstValueChunkBatch);
+      final long firstBatchPageMemorySize =
+          AlignedSinglePageWholeChunkReader
+              .calculateMaxPageEstimatedMemoryUsageInBytesWithBatchData(
+                  timeChunk, firstValueChunkBatch);
+      // The time chunk and the first batch are reused here; reset them before estimating again.
+      rewindChunkBuffers(timeChunk, firstTwoValueChunkBatches);
+      final long firstTwoBatchPageMemorySize =
+          AlignedSinglePageWholeChunkReader
+              .calculateMaxPageEstimatedMemoryUsageInBytesWithBatchData(
+                  timeChunk, firstTwoValueChunkBatches);
+      Assert.assertTrue(firstTwoBatchChunkSize > firstBatchChunkSize);
+      Assert.assertTrue(firstTwoBatchPageMemorySize > firstBatchPageMemorySize);
+      return Math.max(firstBatchChunkSize, firstBatchPageMemorySize);
+    }
+  }
+
+  /** Rewinds the data buffers of {@code timeChunk} and of every chunk in {@code valueChunks}. */
+  private static void rewindChunkBuffers(final Chunk timeChunk, final List<Chunk> valueChunks) {
+    timeChunk.getData().rewind();
+    for (final Chunk valueChunk : valueChunks) {
+      valueChunk.getData().rewind();
+    }
+  }
+
+  /**
+   * Writes a fresh TsFile holding device {@code root.sg.d}, with {@code rowCount} rows split evenly
+   * across two aligned chunks. Timestamps run from 0, and the value of column {@code c} at time
+   * {@code t} is {@code t * 100 + c}.
+   */
+  private void writeAlignedSourceTsFile(
+      final File tsFile, final List<MeasurementSchema> schemaList, final int rowCount)
+      throws IOException {
+    if (tsFile.exists()) {
+      Assert.assertTrue(tsFile.delete());
+    }
+    // Rows are divided between two chunks, so the count must be even.
+    Assert.assertEquals(0, rowCount % 2);
+
+    final IDeviceID device = new PlainDeviceID("root.sg.d");
+    final int rowsPerChunk = rowCount / 2;
+    try (final TsFileIOWriter fileWriter = new TsFileIOWriter(tsFile)) {
+      fileWriter.startChunkGroup(device);
+      long time = 0;
+      for (int part = 0; part < 2; ++part) {
+        final AlignedChunkWriterImpl chunkWriter =
+            new AlignedChunkWriterImpl(new ArrayList<IMeasurementSchema>(schemaList));
+        for (int row = 0; row < rowsPerChunk; ++row, ++time) {
+          chunkWriter.getTimeChunkWriter().write(time);
+          for (int column = 0; column < schemaList.size(); ++column) {
+            chunkWriter.getValueChunkWriterByIndex(column).write(time, time * 100 + column, false);
+          }
+        }
+        chunkWriter.writeToFileWriter(fileWriter);
+      }
+      fileWriter.endChunkGroup();
+      fileWriter.endFile();
+    }
+  }
+
+  /**
+   * Copies the single aligned device of {@code sourceTsFile} into {@code targetTsFile}. All time
+   * chunks are written first, followed by the value chunks grouped into column batches of at most
+   * {@code batchSize}. The source must contain exactly two aligned chunks, each with {@code
+   * measurementCount} value columns.
+   */
+  private void rewriteAlignedTsFileWithBatchedValueChunks(
+      final File sourceTsFile,
+      final File targetTsFile,
+      final int measurementCount,
+      final int batchSize)
+      throws Exception {
+    if (targetTsFile.exists()) {
+      Assert.assertTrue(targetTsFile.delete());
+    }
+
+    try (final TsFileSequenceReader sourceReader =
+        new TsFileSequenceReader(sourceTsFile.getAbsolutePath())) {
+      final IDeviceID device = sourceReader.getDeviceMeasurementsMap().keySet().iterator().next();
+      final List<AlignedChunkMetadata> alignedChunks = sourceReader.getAlignedChunkMetadata(device);
+      Assert.assertEquals(2, alignedChunks.size());
+      alignedChunks.forEach(
+          aligned ->
+              Assert.assertEquals(measurementCount, aligned.getValueChunkMetadataList().size()));
+
+      try (final CompactionTsFileWriter targetWriter =
+          new CompactionTsFileWriter(
+              targetTsFile, Long.MAX_VALUE, CompactionType.INNER_SEQ_COMPACTION)) {
+        targetWriter.startChunkGroup(device);
+        targetWriter.markStartingWritingAligned();
+        // Every time chunk first...
+        for (final AlignedChunkMetadata aligned : alignedChunks) {
+          final ChunkMetadata timeMetadata = (ChunkMetadata) aligned.getTimeChunkMetadata();
+          targetWriter.writeChunk(sourceReader.readMemChunk(timeMetadata), timeMetadata);
+        }
+        // ...then the value chunks, one column batch at a time.
+        int batchStart = 0;
+        while (batchStart < measurementCount) {
+          final int batchEnd = Math.min(batchStart + batchSize, measurementCount);
+          writeValueChunkBatch(sourceReader, targetWriter, alignedChunks, batchStart, batchEnd);
+          batchStart += batchSize;
+        }
+        targetWriter.markEndingWritingAligned();
+        targetWriter.endChunkGroup();
+        targetWriter.endFile();
+      }
+    }
+  }
+
+  /**
+   * For each aligned chunk, in list order, copies the value chunks of columns {@code [start, end)}
+   * from {@code reader} to {@code writer}.
+   */
+  private void writeValueChunkBatch(
+      final TsFileSequenceReader reader,
+      final CompactionTsFileWriter writer,
+      final List<AlignedChunkMetadata> alignedChunkMetadataList,
+      final int start,
+      final int end)
+      throws IOException {
+    for (int chunkIdx = 0; chunkIdx < alignedChunkMetadataList.size(); ++chunkIdx) {
+      final List<IChunkMetadata> columnMetadata =
+          alignedChunkMetadataList.get(chunkIdx).getValueChunkMetadataList();
+      int column = start;
+      while (column < end) {
+        final ChunkMetadata metadata = (ChunkMetadata) columnMetadata.get(column++);
+        writer.writeChunk(reader.readMemChunk(metadata), metadata);
+      }
+    }
+  }
 }

Reply via email to