This is an automated email from the ASF dual-hosted git repository.

Caideyipi pushed a commit to branch fix/pipe-merge-batched-aligned-chunks
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 66b19a52a6832865df767ba8dcc31ca77bdf86ba
Author: Caideyipi <[email protected]>
AuthorDate: Tue Jun 23 17:19:43 2026 +0800

    Pipe: merge batched aligned chunks in scan parser
---
 .../scan/TsFileInsertionEventScanParser.java       | 359 +++++++++++++++------
 .../pipe/resource/memory/PipeMemoryWeightUtil.java |  18 +-
 .../pipe/event/TsFileInsertionEventParserTest.java | 195 +++++++++++
 3 files changed, 464 insertions(+), 108 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java
index 4c5cd75d4c1..1251127379e 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java
@@ -106,8 +106,9 @@ public class TsFileInsertionEventScanParser extends TsFileInsertionEventParser {
   private final List<Long> timeChunkPageMemorySizeList = new ArrayList<>();
 
   private final Map<String, Integer> measurementIndexMap = new HashMap<>();
-  private int lastIndex = -1;
-  private Chunk firstChunk4NextSequentialValueChunks;
+  private final List<PendingAlignedChunkGroup> pendingAlignedChunkGroups = new 
ArrayList<>();
+  private long pendingAlignedChunkSize;
+  private CachedAlignedValueChunk cachedAlignedValueChunk;
 
   private byte lastMarker = Byte.MIN_VALUE;
 
@@ -588,14 +589,14 @@ public class TsFileInsertionEventScanParser extends TsFileInsertionEventParser {
   private void moveToNextChunkReader()
       throws IOException, IllegalStateException, IllegalPathException {
     ChunkHeader chunkHeader;
-    long valueChunkSize = 0;
-    long valueChunkPageMemorySize = 0;
-    final List<Chunk> valueChunkList = new ArrayList<>();
     currentMeasurements.clear();
     modsInfos.clear();
 
     if (lastMarker == MetaMarker.SEPARATOR) {
-      chunkReader = null;
+      if (!recordPendingAlignedChunk(lastMarker)) {
+        clearCachedAlignedChunkData();
+        chunkReader = null;
+      }
       return;
     }
 
@@ -603,8 +604,8 @@ public class TsFileInsertionEventScanParser extends TsFileInsertionEventParser {
     while ((marker =
             lastMarker != Byte.MIN_VALUE
                 ? lastMarker
-                : Objects.nonNull(firstChunk4NextSequentialValueChunks)
-                    ? 
toValueChunkMarker(firstChunk4NextSequentialValueChunks.getHeader())
+                : Objects.nonNull(cachedAlignedValueChunk)
+                    ? 
toValueChunkMarker(cachedAlignedValueChunk.chunk.getHeader())
                     : tsFileSequenceReader.readMarker())
         != MetaMarker.SEPARATOR) {
       lastMarker = Byte.MIN_VALUE;
@@ -658,9 +659,8 @@ public class TsFileInsertionEventScanParser extends TsFileInsertionEventParser {
         case MetaMarker.VALUE_CHUNK_HEADER:
         case MetaMarker.ONLY_ONE_PAGE_VALUE_CHUNK_HEADER:
           {
-            Chunk chunk;
-            long currentValueChunkPageMemorySize = 0;
-            if (Objects.isNull(firstChunk4NextSequentialValueChunks)) {
+            CachedAlignedValueChunk valueChunk = cachedAlignedValueChunk;
+            if (Objects.isNull(valueChunk)) {
               final long currentChunkHeaderOffset = 
tsFileSequenceReader.position() - 1;
               chunkHeader = tsFileSequenceReader.readChunkHeader(marker);
 
@@ -668,7 +668,7 @@ public class TsFileInsertionEventScanParser extends TsFileInsertionEventParser {
                 break;
               }
 
-              // Increase value index
+              // Increase value index.
               final String measurementID =
                   
tabletStringInternPool.intern(chunkHeader.getMeasurementID());
               final int valueIndex =
@@ -676,86 +676,36 @@ public class TsFileInsertionEventScanParser extends TsFileInsertionEventParser {
                       measurementID,
                       (measurement, index) -> Objects.nonNull(index) ? index + 
1 : 0);
 
-              // Emit when encountered non-sequential value chunk, or the 
chunk size exceeds
-              // certain value to avoid OOM
-              // Do not record or end current value chunks when there are 
empty chunks
+              // Do not record or end current value chunks when there are 
empty chunks.
               if (chunkHeader.getDataSize() == 0) {
                 break;
               }
-              chunk =
+              final Chunk chunk =
                   new Chunk(
                       chunkHeader, tsFileSequenceReader.readChunk(-1, 
chunkHeader.getDataSize()));
-              currentValueChunkPageMemorySize = 
calculateMaxPageMemorySize(chunk);
-              boolean needReturn = false;
-              final long timeChunkSize =
-                  lastIndex >= 0
-                      ? PipeMemoryWeightUtil.calculateChunkRamBytesUsed(
-                          timeChunkList.get(lastIndex))
-                      : 0;
-              final long timeChunkPageMemorySize =
-                  lastIndex >= 0 ? timeChunkPageMemorySizeList.get(lastIndex) 
: 0;
-              if (lastIndex >= 0) {
-                if (valueIndex != lastIndex) {
-                  needReturn = recordAlignedChunk(valueChunkList, marker);
-                } else {
-                  final long chunkSize = timeChunkSize + valueChunkSize;
-                  final long pageMemorySize = timeChunkPageMemorySize + 
valueChunkPageMemorySize;
-                  if (chunkSize + chunkHeader.getDataSize()
-                          > 
allocatedMemoryBlockForChunk.getMemoryUsageInBytes()
-                      || timeChunkPageMemorySize > 0
-                          && currentValueChunkPageMemorySize > 0
-                          && pageMemorySize + currentValueChunkPageMemorySize
-                              > getPageDataMemoryLimitInBytes()) {
-                    needReturn = recordAlignedChunk(valueChunkList, marker);
-                  }
-                }
-              }
-              lastIndex = valueIndex;
-              if (needReturn) {
-                firstChunk4NextSequentialValueChunks = chunk;
-                return;
-              }
-              
resizeChunkMemoryBlockIfFirstValueChunkExceedsLimit(valueChunkList, 
chunkHeader);
-              resizePageDataMemoryBlockIfFirstValueChunkExceedsLimit(
-                  valueChunkList, currentValueChunkPageMemorySize);
+              valueChunk =
+                  new CachedAlignedValueChunk(
+                      valueIndex,
+                      chunk,
+                      chunkHeader.getDataSize(),
+                      calculateMaxPageMemorySize(chunk));
             } else {
-              chunk = firstChunk4NextSequentialValueChunks;
-              chunkHeader = chunk.getHeader();
-              firstChunk4NextSequentialValueChunks = null;
-              currentValueChunkPageMemorySize = 
calculateMaxPageMemorySize(chunk);
-              
resizeChunkMemoryBlockIfFirstValueChunkExceedsLimit(valueChunkList, 
chunkHeader);
-              resizePageDataMemoryBlockIfFirstValueChunkExceedsLimit(
-                  valueChunkList, currentValueChunkPageMemorySize);
+              cachedAlignedValueChunk = null;
             }
 
-            valueChunkSize += chunkHeader.getDataSize();
-            valueChunkPageMemorySize += currentValueChunkPageMemorySize;
-            valueChunkList.add(chunk);
-            final String measurementID =
-                tabletStringInternPool.intern(chunkHeader.getMeasurementID());
-            currentMeasurements.add(
-                new MeasurementSchema(
-                    measurementID,
-                    chunkHeader.getDataType(),
-                    chunkHeader.getEncodingType(),
-                    chunkHeader.getCompressionType()));
-            modsInfos.addAll(
-                ModsOperationUtil.initializeMeasurementMods(
-                    currentDevice, Collections.singletonList(measurementID), 
currentModifications));
+            if (returnPendingAlignedChunkBeforeCaching(valueChunk)) {
+              return;
+            }
+            cacheAlignedValueChunk(valueChunk);
             break;
           }
         case MetaMarker.CHUNK_GROUP_HEADER:
           {
-            // Return before "currentDevice" changes
-            if (recordAlignedChunk(valueChunkList, marker)) {
+            // Return before "currentDevice" changes.
+            if (recordPendingAlignedChunk(marker)) {
               return;
             }
-            // Clear because the cached data will never be used in the next 
chunk group
-            lastIndex = -1;
-            timeChunkList.clear();
-            isMultiPageList.clear();
-            timeChunkPageMemorySizeList.clear();
-            measurementIndexMap.clear();
+            clearCachedAlignedChunkData();
             final IDeviceID deviceID = 
tsFileSequenceReader.readChunkGroupHeader().getDeviceID();
             currentDevice = treePattern.mayOverlapWithDevice(deviceID) ? 
deviceID : null;
             currentDeviceString =
@@ -775,7 +725,8 @@ public class TsFileInsertionEventScanParser extends TsFileInsertionEventParser {
     }
 
     lastMarker = marker;
-    if (!recordAlignedChunk(valueChunkList, marker)) {
+    if (!recordPendingAlignedChunk(marker)) {
+      clearCachedAlignedChunkData();
       chunkReader = null;
     }
   }
@@ -859,22 +810,38 @@ public class TsFileInsertionEventScanParser extends TsFileInsertionEventParser {
     return false;
   }
 
-  private boolean recordAlignedChunk(final List<Chunk> valueChunkList, final 
byte marker)
-      throws IOException {
-    if (!valueChunkList.isEmpty()) {
-      final Chunk timeChunk = timeChunkList.get(lastIndex);
+  private boolean recordPendingAlignedChunk(final byte marker) throws 
IOException {
+    while (!pendingAlignedChunkGroups.isEmpty()) {
+      final PendingAlignedChunkGroup pendingAlignedChunkGroup = 
pendingAlignedChunkGroups.remove(0);
+      pendingAlignedChunkSize =
+          Math.max(0, pendingAlignedChunkSize - 
pendingAlignedChunkGroup.chunkSize);
+
+      if (pendingAlignedChunkGroup.valueChunkList.isEmpty()) {
+        continue;
+      }
+
+      final Chunk timeChunk = 
timeChunkList.get(pendingAlignedChunkGroup.timeChunkIndex);
       timeChunk.getData().rewind();
-      currentIsMultiPage = isMultiPageList.get(lastIndex);
+      for (final Chunk valueChunk : pendingAlignedChunkGroup.valueChunkList) {
+        valueChunk.getData().rewind();
+      }
+
+      currentMeasurements.clear();
+      currentMeasurements.addAll(pendingAlignedChunkGroup.measurements);
+      modsInfos.clear();
+      modsInfos.addAll(pendingAlignedChunkGroup.modsInfos);
+
+      currentIsMultiPage = 
isMultiPageList.get(pendingAlignedChunkGroup.timeChunkIndex);
       if (!currentIsMultiPage) {
         resizePageDataMemoryIfNeeded(
             
AlignedSinglePageWholeChunkReader.calculatePageEstimatedMemoryUsageInBytes(
-                timeChunk, valueChunkList));
+                timeChunk, pendingAlignedChunkGroup.valueChunkList));
       }
       final List<Long> pageEstimatedMemoryUsageInBytesList =
           currentIsMultiPage
               ? AlignedSinglePageWholeChunkReader
                   .calculatePageEstimatedMemoryUsageInBytesWithBatchDataList(
-                      timeChunk, valueChunkList)
+                      timeChunk, pendingAlignedChunkGroup.valueChunkList)
               : Collections.emptyList();
       final long maxPageEstimatedMemoryUsageInBytes =
           pageEstimatedMemoryUsageInBytesList.isEmpty()
@@ -884,42 +851,193 @@ public class TsFileInsertionEventScanParser extends TsFileInsertionEventParser {
       chunkReader =
           currentIsMultiPage
               ? new MemoryControlledChunkReader(
-                  new AlignedChunkReader(timeChunk, valueChunkList, filter),
+                  new AlignedChunkReader(
+                      timeChunk, pendingAlignedChunkGroup.valueChunkList, 
filter),
                   pageEstimatedMemoryUsageInBytesList)
-              : new AlignedSinglePageWholeChunkReader(timeChunk, 
valueChunkList, null);
+              : new AlignedSinglePageWholeChunkReader(
+                  timeChunk, pendingAlignedChunkGroup.valueChunkList, null);
       currentIsAligned = true;
-      lastMarker = marker;
+      if (marker != Byte.MIN_VALUE) {
+        lastMarker = marker;
+      }
       return true;
     }
     return false;
   }
 
+  private boolean shouldReturnPendingAlignedChunkBeforeCaching(
+      final CachedAlignedValueChunk valueChunk) throws IOException {
+    validateAlignedValueChunkTimeIndex(valueChunk.timeChunkIndex);
+
+    final PendingAlignedChunkGroup pendingAlignedChunkGroup =
+        findPendingAlignedChunkGroup(valueChunk.timeChunkIndex);
+    final boolean isFirstValueChunkInGroup =
+        Objects.isNull(pendingAlignedChunkGroup)
+            || pendingAlignedChunkGroup.valueChunkList.isEmpty();
+    final long timeChunkSize =
+        Objects.isNull(pendingAlignedChunkGroup)
+            ? PipeMemoryWeightUtil.calculateChunkRamBytesUsed(
+                timeChunkList.get(valueChunk.timeChunkIndex))
+            : 0;
+    final long chunkSizeAfterCaching =
+        pendingAlignedChunkSize + timeChunkSize + valueChunk.valueChunkSize;
+
+    if (isFirstValueChunkInGroup) {
+      final long firstValueChunkGroupSize =
+          timeChunkSize
+              + (Objects.isNull(pendingAlignedChunkGroup) ? 0 : 
pendingAlignedChunkGroup.chunkSize)
+              + valueChunk.valueChunkSize;
+      if (firstValueChunkGroupSize > 
allocatedMemoryBlockForChunk.getMemoryUsageInBytes()) {
+        return !pendingAlignedChunkGroups.isEmpty();
+      }
+    }
+
+    if (!pendingAlignedChunkGroups.isEmpty()
+        && chunkSizeAfterCaching > 
allocatedMemoryBlockForChunk.getMemoryUsageInBytes()) {
+      return true;
+    }
+
+    final long timeChunkPageMemorySize = 
timeChunkPageMemorySizeList.get(valueChunk.timeChunkIndex);
+    if (timeChunkPageMemorySize <= 0 || valueChunk.valueChunkPageMemorySize <= 
0) {
+      return false;
+    }
+
+    final long pageMemorySizeAfterCaching =
+        timeChunkPageMemorySize
+            + (Objects.isNull(pendingAlignedChunkGroup)
+                ? 0
+                : pendingAlignedChunkGroup.valueChunkPageMemorySize)
+            + valueChunk.valueChunkPageMemorySize;
+    if (pageMemorySizeAfterCaching <= getPageDataMemoryLimitInBytes()) {
+      return false;
+    }
+
+    if (isFirstValueChunkInGroup) {
+      final long firstValueChunkPageMemorySize =
+          timeChunkPageMemorySize + valueChunk.valueChunkPageMemorySize;
+      return firstValueChunkPageMemorySize <= getPageDataMemoryLimitInBytes()
+          || !pendingAlignedChunkGroups.isEmpty();
+    }
+
+    return true;
+  }
+
+  private boolean returnPendingAlignedChunkBeforeCaching(final 
CachedAlignedValueChunk valueChunk)
+      throws IOException {
+    if (!shouldReturnPendingAlignedChunkBeforeCaching(valueChunk)) {
+      return false;
+    }
+
+    cachedAlignedValueChunk = valueChunk;
+    if (recordPendingAlignedChunk(Byte.MIN_VALUE)) {
+      return true;
+    }
+    cachedAlignedValueChunk = null;
+    return false;
+  }
+
+  private void cacheAlignedValueChunk(final CachedAlignedValueChunk 
valueChunk) throws IOException {
+    validateAlignedValueChunkTimeIndex(valueChunk.timeChunkIndex);
+
+    final PendingAlignedChunkGroup pendingAlignedChunkGroup =
+        getOrCreatePendingAlignedChunkGroup(valueChunk.timeChunkIndex);
+    
resizeChunkMemoryBlockIfFirstValueChunkExceedsLimit(pendingAlignedChunkGroup, 
valueChunk);
+    
resizePageDataMemoryBlockIfFirstValueChunkExceedsLimit(pendingAlignedChunkGroup,
 valueChunk);
+
+    pendingAlignedChunkGroup.valueChunkList.add(valueChunk.chunk);
+    pendingAlignedChunkGroup.chunkSize += valueChunk.valueChunkSize;
+    pendingAlignedChunkGroup.valueChunkPageMemorySize += 
valueChunk.valueChunkPageMemorySize;
+    pendingAlignedChunkSize += valueChunk.valueChunkSize;
+
+    final ChunkHeader chunkHeader = valueChunk.chunk.getHeader();
+    final String measurementID = 
tabletStringInternPool.intern(chunkHeader.getMeasurementID());
+    pendingAlignedChunkGroup.measurements.add(
+        new MeasurementSchema(
+            measurementID,
+            chunkHeader.getDataType(),
+            chunkHeader.getEncodingType(),
+            chunkHeader.getCompressionType()));
+    pendingAlignedChunkGroup.modsInfos.addAll(
+        ModsOperationUtil.initializeMeasurementMods(
+            currentDevice, Collections.singletonList(measurementID), 
currentModifications));
+  }
+
+  private PendingAlignedChunkGroup getOrCreatePendingAlignedChunkGroup(final 
int timeChunkIndex) {
+    final PendingAlignedChunkGroup pendingAlignedChunkGroup =
+        findPendingAlignedChunkGroup(timeChunkIndex);
+    if (Objects.nonNull(pendingAlignedChunkGroup)) {
+      return pendingAlignedChunkGroup;
+    }
+
+    final PendingAlignedChunkGroup newPendingAlignedChunkGroup =
+        new PendingAlignedChunkGroup(
+            timeChunkIndex,
+            
PipeMemoryWeightUtil.calculateChunkRamBytesUsed(timeChunkList.get(timeChunkIndex)),
+            timeChunkPageMemorySizeList.get(timeChunkIndex));
+    pendingAlignedChunkSize += newPendingAlignedChunkGroup.chunkSize;
+
+    for (int i = 0; i < pendingAlignedChunkGroups.size(); ++i) {
+      if (pendingAlignedChunkGroups.get(i).timeChunkIndex > timeChunkIndex) {
+        pendingAlignedChunkGroups.add(i, newPendingAlignedChunkGroup);
+        return newPendingAlignedChunkGroup;
+      }
+    }
+    pendingAlignedChunkGroups.add(newPendingAlignedChunkGroup);
+    return newPendingAlignedChunkGroup;
+  }
+
+  private PendingAlignedChunkGroup findPendingAlignedChunkGroup(final int 
timeChunkIndex) {
+    for (final PendingAlignedChunkGroup pendingAlignedChunkGroup : 
pendingAlignedChunkGroups) {
+      if (pendingAlignedChunkGroup.timeChunkIndex == timeChunkIndex) {
+        return pendingAlignedChunkGroup;
+      }
+    }
+    return null;
+  }
+
+  private void validateAlignedValueChunkTimeIndex(final int timeChunkIndex) 
throws IOException {
+    if (timeChunkIndex < 0 || timeChunkIndex >= timeChunkList.size()) {
+      throw new IOException(
+          String.format(
+              "Invalid aligned value chunk index %d, while there are %d time 
chunks.",
+              timeChunkIndex, timeChunkList.size()));
+    }
+  }
+
+  private void clearCachedAlignedChunkData() {
+    pendingAlignedChunkGroups.clear();
+    pendingAlignedChunkSize = 0;
+    cachedAlignedValueChunk = null;
+    timeChunkList.clear();
+    isMultiPageList.clear();
+    timeChunkPageMemorySizeList.clear();
+    measurementIndexMap.clear();
+  }
+
   private void resizeChunkMemoryBlockIfFirstValueChunkExceedsLimit(
-      final List<Chunk> valueChunkList, final ChunkHeader valueChunkHeader) {
-    if (!valueChunkList.isEmpty() || lastIndex < 0) {
+      final PendingAlignedChunkGroup pendingAlignedChunkGroup,
+      final CachedAlignedValueChunk valueChunk) {
+    if (!pendingAlignedChunkGroup.valueChunkList.isEmpty()) {
       return;
     }
 
-    final long chunkSize =
-        
PipeMemoryWeightUtil.calculateChunkRamBytesUsed(timeChunkList.get(lastIndex))
-            + valueChunkHeader.getDataSize();
+    final long chunkSize = pendingAlignedChunkGroup.chunkSize + 
valueChunk.valueChunkSize;
     if (chunkSize > allocatedMemoryBlockForChunk.getMemoryUsageInBytes()) {
       
PipeDataNodeResourceManager.memory().forceResize(allocatedMemoryBlockForChunk, 
chunkSize);
     }
   }
 
   private void resizePageDataMemoryBlockIfFirstValueChunkExceedsLimit(
-      final List<Chunk> valueChunkList, final long valueChunkPageMemorySize) {
-    if (!valueChunkList.isEmpty() || lastIndex < 0 || valueChunkPageMemorySize 
<= 0) {
+      final PendingAlignedChunkGroup pendingAlignedChunkGroup,
+      final CachedAlignedValueChunk valueChunk) {
+    if (!pendingAlignedChunkGroup.valueChunkList.isEmpty()
+        || pendingAlignedChunkGroup.timeChunkPageMemorySize <= 0
+        || valueChunk.valueChunkPageMemorySize <= 0) {
       return;
     }
 
-    final long timeChunkPageMemorySize = 
timeChunkPageMemorySizeList.get(lastIndex);
-    if (timeChunkPageMemorySize <= 0) {
-      return;
-    }
-
-    final long pageMemorySize = timeChunkPageMemorySize + 
valueChunkPageMemorySize;
+    final long pageMemorySize =
+        pendingAlignedChunkGroup.timeChunkPageMemorySize + 
valueChunk.valueChunkPageMemorySize;
     if (pageMemorySize > getPageDataMemoryLimitInBytes()) {
       PipeDataNodeResourceManager.memory()
           .forceResize(allocatedMemoryBlockForBatchData, pageMemorySize);
@@ -980,4 +1098,41 @@ public class TsFileInsertionEventScanParser extends TsFileInsertionEventParser {
     }
     return null;
   }
+
+  private static class PendingAlignedChunkGroup {
+
+    private final int timeChunkIndex;
+    private final List<Chunk> valueChunkList = new ArrayList<>();
+    private final List<IMeasurementSchema> measurements = new ArrayList<>();
+    private final List<ModsOperationUtil.ModsInfo> modsInfos = new 
ArrayList<>();
+    private final long timeChunkPageMemorySize;
+    private long chunkSize;
+    private long valueChunkPageMemorySize;
+
+    private PendingAlignedChunkGroup(
+        final int timeChunkIndex, final long timeChunkSize, final long 
timeChunkPageMemorySize) {
+      this.timeChunkIndex = timeChunkIndex;
+      this.chunkSize = timeChunkSize;
+      this.timeChunkPageMemorySize = timeChunkPageMemorySize;
+    }
+  }
+
+  private static class CachedAlignedValueChunk {
+
+    private final int timeChunkIndex;
+    private final Chunk chunk;
+    private final long valueChunkSize;
+    private final long valueChunkPageMemorySize;
+
+    private CachedAlignedValueChunk(
+        final int timeChunkIndex,
+        final Chunk chunk,
+        final long valueChunkSize,
+        final long valueChunkPageMemorySize) {
+      this.timeChunkIndex = timeChunkIndex;
+      this.chunk = chunk;
+      this.valueChunkSize = valueChunkSize;
+      this.valueChunkPageMemorySize = valueChunkPageMemorySize;
+    }
+  }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java
index 04eff0067e5..33ceacf278c 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java
@@ -191,6 +191,14 @@ public class PipeMemoryWeightUtil {
       return new Pair<>(1, 0);
     }
 
+    final int configuredTabletRowSize =
+        PipeConfig.getInstance().getPipeDataStructureTabletRowSize();
+    final boolean hasTabletRowSizeLimit = configuredTabletRowSize > 0;
+    final double inputSizeLimit =
+        hasTabletRowSizeLimit && inputNum > 0
+            ? 100 + inputNum * (double) rowBytesUsed * 1.2
+            : Integer.MAX_VALUE;
+
     // Calculate row number according to the max size of a pipe tablet. "100" 
is the estimated size
     // of other data structures in a pipe tablet.
     // "*8" converts bytes to bits, because the bitmap size is 1 bit per 
schema.
@@ -198,17 +206,15 @@ public class PipeMemoryWeightUtil {
         (int)
             Math.min(
                 
IoTDBDescriptor.getInstance().getConfig().getPipeDataStructureTabletSizeInBytes(),
-                Math.min(Integer.MAX_VALUE, 100 + inputNum * (double) 
rowBytesUsed * 1.2));
+                Math.min(Integer.MAX_VALUE, inputSizeLimit));
 
     int rowNumber = 8 * (sizeLimit - 100) / (8 * rowBytesUsed + schemaCount);
     rowNumber = Math.max(1, rowNumber);
 
-    if ( // This means the row number is larger than the max row count of a 
pipe tablet
-    rowNumber > PipeConfig.getInstance().getPipeDataStructureTabletRowSize()) {
+    // This means the row number is larger than the max row count of a pipe 
tablet.
+    if (hasTabletRowSizeLimit && rowNumber > configuredTabletRowSize) {
       // Bound the row number, the memory cost is rowSize * rowNumber
-      return new Pair<>(
-          PipeConfig.getInstance().getPipeDataStructureTabletRowSize(),
-          rowBytesUsed * 
PipeConfig.getInstance().getPipeDataStructureTabletRowSize());
+      return new Pair<>(configuredTabletRowSize, rowBytesUsed * 
configuredTabletRowSize);
     } else {
       return new Pair<>(rowNumber, sizeLimit);
     }
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java
index 50109b935c3..626b01c84e0 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java
@@ -36,6 +36,8 @@ import org.apache.iotdb.db.pipe.event.common.tsfile.parser.scan.TsFileInsertionE
 import 
org.apache.iotdb.db.pipe.event.common.tsfile.parser.table.TsFileInsertionEventTableParser;
 import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
 import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil;
+import 
org.apache.iotdb.db.storageengine.dataregion.compaction.io.CompactionTsFileWriter;
+import 
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.constant.CompactionType;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.utils.CompactionTestFileWriter;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
 import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
@@ -55,6 +57,7 @@ import org.apache.tsfile.file.metadata.TableSchema;
 import org.apache.tsfile.file.metadata.enums.CompressionType;
 import org.apache.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.tsfile.read.TsFileSequenceReader;
+import org.apache.tsfile.read.common.BatchData;
 import org.apache.tsfile.read.common.Chunk;
 import org.apache.tsfile.read.common.Path;
 import org.apache.tsfile.read.common.TimeRange;
@@ -63,9 +66,11 @@ import org.apache.tsfile.utils.BitMap;
 import org.apache.tsfile.utils.Pair;
 import org.apache.tsfile.utils.TsFileGeneratorUtils;
 import org.apache.tsfile.write.TsFileWriter;
+import org.apache.tsfile.write.chunk.AlignedChunkWriterImpl;
 import org.apache.tsfile.write.record.Tablet;
 import org.apache.tsfile.write.schema.IMeasurementSchema;
 import org.apache.tsfile.write.schema.MeasurementSchema;
+import org.apache.tsfile.write.writer.TsFileIOWriter;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Assume;
@@ -436,6 +441,100 @@ public class TsFileInsertionEventParserTest {
     }
   }
 
+  @Test
+  public void testScanParserMergesBatchedAlignedValueChunkGroups() throws 
Exception {
+    final long originalPipeMaxReaderChunkSize =
+        CommonDescriptor.getInstance().getConfig().getPipeMaxReaderChunkSize();
+    final int originalPipeDataStructureTabletRowSize =
+        
CommonDescriptor.getInstance().getConfig().getPipeDataStructureTabletRowSize();
+
+    final int measurementCount = 20;
+    final int batchSize = 10;
+    final int rowCount = 4;
+    final File sourceTsFile = new 
File("aligned-source-for-batched-layout.tsfile");
+    alignedTsFile = new File("aligned-batched-layout.tsfile");
+
+    try {
+      
CommonDescriptor.getInstance().getConfig().setPipeMaxReaderChunkSize(1024 * 
1024L);
+      
CommonDescriptor.getInstance().getConfig().setPipeDataStructureTabletRowSize(0);
+
+      final List<IMeasurementSchema> schemaList = new ArrayList<>();
+      for (int i = 0; i < measurementCount; ++i) {
+        schemaList.add(new MeasurementSchema("s" + i, TSDataType.INT64));
+      }
+
+      writeAlignedSourceTsFile(sourceTsFile, schemaList, rowCount);
+      rewriteAlignedTsFileWithBatchedValueChunks(
+          sourceTsFile, alignedTsFile, measurementCount, batchSize);
+
+      int tabletCount = 0;
+      int pointCount = 0;
+      try (final TsFileInsertionEventScanParser parser =
+          new TsFileInsertionEventScanParser(
+              alignedTsFile,
+              new PrefixTreePattern("root"),
+              Long.MIN_VALUE,
+              Long.MAX_VALUE,
+              null,
+              null,
+              false)) {
+        for (final Pair<Tablet, Boolean> tabletWithIsAligned : 
parser.toTabletWithIsAligneds()) {
+          Assert.assertTrue(tabletWithIsAligned.getRight());
+          final Tablet tablet = tabletWithIsAligned.getLeft();
+          ++tabletCount;
+          Assert.assertEquals(measurementCount, tablet.getSchemas().size());
+          pointCount += getNonNullSize(tablet);
+        }
+      }
+
+      Assert.assertEquals(measurementCount * rowCount, pointCount);
+      Assert.assertTrue(tabletCount > 0);
+    } finally {
+      CommonDescriptor.getInstance()
+          .getConfig()
+          .setPipeMaxReaderChunkSize(originalPipeMaxReaderChunkSize);
+      CommonDescriptor.getInstance()
+          .getConfig()
+          
.setPipeDataStructureTabletRowSize(originalPipeDataStructureTabletRowSize);
+      sourceTsFile.delete();
+    }
+  }
+
+  @Test
+  public void testPipeTabletRowSizeCanBeDisabledByNonPositiveValue() {
+    final int originalPipeDataStructureTabletRowSize =
+        
CommonDescriptor.getInstance().getConfig().getPipeDataStructureTabletRowSize();
+    final int originalPipeDataStructureTabletSizeInBytes =
+        
CommonDescriptor.getInstance().getConfig().getPipeDataStructureTabletSizeInBytes();
+
+    try {
+      
CommonDescriptor.getInstance().getConfig().setPipeDataStructureTabletSizeInBytes(1024
 * 1024);
+
+      final BatchData batchData = new BatchData(TSDataType.INT64);
+      for (int i = 0; i < 1000; ++i) {
+        batchData.putAnObject(i, (long) i);
+      }
+
+      
CommonDescriptor.getInstance().getConfig().setPipeDataStructureTabletRowSize(2);
+      final int rowCountWithLimit =
+          
PipeMemoryWeightUtil.calculateTabletRowCountAndMemory(batchData).getLeft();
+
+      
CommonDescriptor.getInstance().getConfig().setPipeDataStructureTabletRowSize(0);
+      final int rowCountWithoutLimit =
+          
PipeMemoryWeightUtil.calculateTabletRowCountAndMemory(batchData).getLeft();
+
+      Assert.assertEquals(2, rowCountWithLimit);
+      Assert.assertTrue(rowCountWithoutLimit > rowCountWithLimit);
+    } finally {
+      CommonDescriptor.getInstance()
+          .getConfig()
+          
.setPipeDataStructureTabletRowSize(originalPipeDataStructureTabletRowSize);
+      CommonDescriptor.getInstance()
+          .getConfig()
+          
.setPipeDataStructureTabletSizeInBytes(originalPipeDataStructureTabletSizeInBytes);
+    }
+  }
+
   @Test
   public void testQueryParserSkipsUnnecessaryBitMaps() throws Exception {
     testTreeParserSkipsUnnecessaryBitMaps(true);
@@ -1887,6 +1986,102 @@ public class TsFileInsertionEventParserTest {
     }
   }
 
+  private void writeAlignedSourceTsFile(
+      final File tsFile, final List<IMeasurementSchema> schemaList, final int 
rowCount)
+      throws IOException {
+    if (tsFile.exists()) {
+      Assert.assertTrue(tsFile.delete());
+    }
+    Assert.assertEquals(0, rowCount % 2);
+
+    final IDeviceID deviceID = 
IDeviceID.Factory.DEFAULT_FACTORY.create("root.sg.d");
+    try (final TsFileIOWriter writer = new TsFileIOWriter(tsFile)) {
+      writer.startChunkGroup(deviceID);
+      final int rowCountPerChunk = rowCount / 2;
+      for (int chunkIndex = 0; chunkIndex < 2; ++chunkIndex) {
+        final AlignedChunkWriterImpl alignedChunkWriter = new 
AlignedChunkWriterImpl(schemaList);
+        for (int row = 0; row < rowCountPerChunk; ++row) {
+          final long time = (long) chunkIndex * rowCountPerChunk + row;
+          alignedChunkWriter.getTimeChunkWriter().write(time);
+          for (int measurementIndex = 0; measurementIndex < schemaList.size(); 
++measurementIndex) {
+            alignedChunkWriter
+                .getValueChunkWriterByIndex(measurementIndex)
+                .write(time, time * 100 + measurementIndex, false);
+          }
+        }
+        alignedChunkWriter.writeToFileWriter(writer);
+      }
+      writer.endChunkGroup();
+      writer.endFile();
+    }
+  }
+
+  private void rewriteAlignedTsFileWithBatchedValueChunks(
+      final File sourceTsFile,
+      final File targetTsFile,
+      final int measurementCount,
+      final int batchSize)
+      throws Exception {
+    if (targetTsFile.exists()) {
+      Assert.assertTrue(targetTsFile.delete());
+    }
+
+    try (final TsFileSequenceReader reader =
+        new TsFileSequenceReader(sourceTsFile.getAbsolutePath())) {
+      final IDeviceID deviceID = 
reader.getDeviceMeasurementsMap().keySet().iterator().next();
+      final List<AbstractAlignedChunkMetadata> sourceAlignedChunkMetadataList =
+          reader.getAlignedChunkMetadata(deviceID, true);
+      Assert.assertEquals(2, sourceAlignedChunkMetadataList.size());
+      for (final AbstractAlignedChunkMetadata sourceAlignedChunkMetadata :
+          sourceAlignedChunkMetadataList) {
+        Assert.assertEquals(
+            measurementCount, 
sourceAlignedChunkMetadata.getValueChunkMetadataList().size());
+      }
+
+      try (final CompactionTsFileWriter writer =
+          new CompactionTsFileWriter(
+              targetTsFile, Long.MAX_VALUE, 
CompactionType.INNER_SEQ_COMPACTION)) {
+        writer.startChunkGroup(deviceID);
+        writer.markStartingWritingAligned();
+        for (final AbstractAlignedChunkMetadata sourceAlignedChunkMetadata :
+            sourceAlignedChunkMetadataList) {
+          final ChunkMetadata timeChunkMetadata =
+              (ChunkMetadata) 
sourceAlignedChunkMetadata.getTimeChunkMetadata();
+          writer.writeChunk(reader.readMemChunk(timeChunkMetadata), 
timeChunkMetadata);
+        }
+
+        for (int start = 0; start < measurementCount; start += batchSize) {
+          writeValueChunkBatch(
+              reader,
+              writer,
+              sourceAlignedChunkMetadataList,
+              start,
+              Math.min(start + batchSize, measurementCount));
+        }
+        writer.markEndingWritingAligned();
+        writer.endChunkGroup();
+        writer.endFile();
+      }
+    }
+  }
+
+  private void writeValueChunkBatch(
+      final TsFileSequenceReader reader,
+      final CompactionTsFileWriter writer,
+      final List<AbstractAlignedChunkMetadata> alignedChunkMetadataList,
+      final int start,
+      final int end)
+      throws IOException {
+    for (final AbstractAlignedChunkMetadata alignedChunkMetadata : 
alignedChunkMetadataList) {
+      final List<IChunkMetadata> valueChunkMetadataList =
+          alignedChunkMetadata.getValueChunkMetadataList();
+      for (int index = start; index < end; ++index) {
+        final ChunkMetadata valueChunkMetadata = (ChunkMetadata) 
valueChunkMetadataList.get(index);
+        writer.writeChunk(reader.readMemChunk(valueChunkMetadata), 
valueChunkMetadata);
+      }
+    }
+  }
+
   private static class ParserPerformanceStats {
 
     private long pointCount;


Reply via email to