This is an automated email from the ASF dual-hosted git repository.

jt2594838 pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new ea45c4f9415 [To dev/1.3] Pipe: Fixed the first-chunk calculation bug for scan parser (#17597) (#17622)
ea45c4f9415 is described below

commit ea45c4f9415fa2f6c90ef55463ed6675e01e5adb
Author: Caideyipi <[email protected]>
AuthorDate: Sat May 9 19:21:24 2026 +0800

    [To dev/1.3] Pipe: Fixed the first-chunk calculation bug for scan parser (#17597) (#17622)
    
    * Pipe: Fixed the first-chunk calculation bug for scan parser (#17597)
    
    * Update TsFileInsertionDataContainerTest.java
    
    * Update TsFileInsertionDataContainerTest.java
---
 .../scan/TsFileInsertionScanDataContainer.java     | 21 ++++++---
 .../event/TsFileInsertionDataContainerTest.java    | 51 ++++++++++++++++++++++
 2 files changed, 67 insertions(+), 5 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java
index 8ed9bdcd662..07b91cd68af 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java
@@ -605,11 +605,6 @@ public class TsFileInsertionScanDataContainer extends TsFileInsertionDataContain
                   final long chunkSize = timeChunkSize + valueChunkSize;
                   if (chunkSize + chunkHeader.getDataSize()
                       > allocatedMemoryBlockForChunk.getMemoryUsageInBytes()) {
-                    if (valueChunkList.size() == 1
-                        && chunkSize > allocatedMemoryBlockForChunk.getMemoryUsageInBytes()) {
-                      PipeDataNodeResourceManager.memory()
-                          .forceResize(allocatedMemoryBlockForChunk, chunkSize);
-                    }
                     needReturn = recordAlignedChunk(valueChunkList, marker);
                   }
                 }
@@ -619,9 +614,11 @@ public class TsFileInsertionScanDataContainer extends TsFileInsertionDataContain
                 firstChunkHeader4NextSequentialValueChunks = chunkHeader;
                 return;
               }
+              resizeChunkMemoryBlockIfFirstValueChunkExceedsLimit(valueChunkList, chunkHeader);
             } else {
               chunkHeader = firstChunkHeader4NextSequentialValueChunks;
               firstChunkHeader4NextSequentialValueChunks = null;
+              resizeChunkMemoryBlockIfFirstValueChunkExceedsLimit(valueChunkList, chunkHeader);
             }
 
             Chunk chunk =
@@ -690,6 +687,20 @@ public class TsFileInsertionScanDataContainer extends TsFileInsertionDataContain
     return false;
   }
 
+  private void resizeChunkMemoryBlockIfFirstValueChunkExceedsLimit(
+      final List<Chunk> valueChunkList, final ChunkHeader valueChunkHeader) {
+    if (!valueChunkList.isEmpty() || lastIndex < 0) {
+      return;
+    }
+
+    final long chunkSize =
+        PipeMemoryWeightUtil.calculateChunkRamBytesUsed(timeChunkList.get(lastIndex))
+            + valueChunkHeader.getDataSize();
+    if (chunkSize > allocatedMemoryBlockForChunk.getMemoryUsageInBytes()) {
+      PipeDataNodeResourceManager.memory().forceResize(allocatedMemoryBlockForChunk, chunkSize);
+    }
+  }
+
   @Override
   public void close() {
     super.close();
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java
index bd4e3923815..51fc1f3a556 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.commons.pipe.datastructure.pattern.PrefixPipePattern;
 import org.apache.iotdb.db.pipe.event.common.tsfile.container.TsFileInsertionDataContainer;
 import org.apache.iotdb.db.pipe.event.common.tsfile.container.query.TsFileInsertionQueryDataContainer;
 import org.apache.iotdb.db.pipe.event.common.tsfile.container.scan.TsFileInsertionScanDataContainer;
+import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
 import org.apache.iotdb.db.storageengine.dataregion.compaction.utils.CompactionTestFileWriter;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
@@ -56,6 +57,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
+import java.lang.reflect.Field;
 import java.time.LocalDate;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -119,6 +121,47 @@ public class TsFileInsertionDataContainerTest {
     System.out.println(System.currentTimeMillis() - startTime);
   }
 
+  @Test
+  public void testScanParserResizesChunkMemoryForFirstAlignedValueChunk() throws Exception {
+    final long originalPipeMaxReaderChunkSize =
+        PipeConfig.getInstance().getPipeMaxReaderChunkSize();
+    CommonDescriptor.getInstance().getConfig().setPipeMaxReaderChunkSize(0);
+
+    alignedTsFile = new File("single-aligned-value-chunk.tsfile");
+    final List<MeasurementSchema> schemaList = new ArrayList<>();
+    schemaList.add(new MeasurementSchema("s1", TSDataType.INT64));
+
+    final Tablet tablet = new Tablet("root.sg.d", schemaList, 2);
+    tablet.rowSize = 2;
+    tablet.addTimestamp(0, 1);
+    tablet.addValue("s1", 0, 1L);
+    tablet.addTimestamp(1, 2);
+    tablet.addValue("s1", 1, 2L);
+
+    try {
+      try (final TsFileWriter writer = new TsFileWriter(alignedTsFile)) {
+        writer.registerAlignedTimeseries(new Path("root.sg.d"), schemaList);
+        writer.writeAligned(tablet);
+      }
+
+      try (final TsFileInsertionScanDataContainer parser =
+          new TsFileInsertionScanDataContainer(
+              alignedTsFile,
+              new PrefixPipePattern("root"),
+              Long.MIN_VALUE,
+              Long.MAX_VALUE,
+              null,
+              null,
+              false)) {
+        Assert.assertTrue(getAllocatedChunkMemory(parser).getMemoryUsageInBytes() > 0);
+      }
+    } finally {
+      CommonDescriptor.getInstance()
+          .getConfig()
+          .setPipeMaxReaderChunkSize(originalPipeMaxReaderChunkSize);
+    }
+  }
+
   public void testToTabletInsertionEvents(final boolean isQuery) throws Exception {
     // Test empty chunk
     testMixedTsFileWithEmptyChunk(isQuery);
@@ -645,4 +688,12 @@ public class TsFileInsertionDataContainerTest {
     }
     return count;
   }
+
+  private PipeMemoryBlock getAllocatedChunkMemory(final TsFileInsertionScanDataContainer parser)
+      throws NoSuchFieldException, IllegalAccessException {
+    final Field field =
+        TsFileInsertionScanDataContainer.class.getDeclaredField("allocatedMemoryBlockForChunk");
+    field.setAccessible(true);
+    return (PipeMemoryBlock) field.get(parser);
+  }
 }

Reply via email to