This is an automated email from the ASF dual-hosted git repository.
jt2594838 pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new ea45c4f9415 [To dev/1.3] Pipe: Fixed the first-chunk calculation bug for scan parser (#17597) (#17622)
ea45c4f9415 is described below
commit ea45c4f9415fa2f6c90ef55463ed6675e01e5adb
Author: Caideyipi <[email protected]>
AuthorDate: Sat May 9 19:21:24 2026 +0800
[To dev/1.3] Pipe: Fixed the first-chunk calculation bug for scan parser (#17597) (#17622)
* Pipe: Fixed the first-chunk calculation bug for scan parser (#17597)
* Update TsFileInsertionDataContainerTest.java
* Update TsFileInsertionDataContainerTest.java
---
.../scan/TsFileInsertionScanDataContainer.java | 21 ++++++---
.../event/TsFileInsertionDataContainerTest.java | 51 ++++++++++++++++++++++
2 files changed, 67 insertions(+), 5 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java
index 8ed9bdcd662..07b91cd68af 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java
@@ -605,11 +605,6 @@ public class TsFileInsertionScanDataContainer extends TsFileInsertionDataContain
final long chunkSize = timeChunkSize + valueChunkSize;
if (chunkSize + chunkHeader.getDataSize()
> allocatedMemoryBlockForChunk.getMemoryUsageInBytes()) {
- if (valueChunkList.size() == 1
- && chunkSize >
allocatedMemoryBlockForChunk.getMemoryUsageInBytes()) {
- PipeDataNodeResourceManager.memory()
- .forceResize(allocatedMemoryBlockForChunk,
chunkSize);
- }
needReturn = recordAlignedChunk(valueChunkList, marker);
}
}
@@ -619,9 +614,11 @@ public class TsFileInsertionScanDataContainer extends TsFileInsertionDataContain
firstChunkHeader4NextSequentialValueChunks = chunkHeader;
return;
}
+
resizeChunkMemoryBlockIfFirstValueChunkExceedsLimit(valueChunkList,
chunkHeader);
} else {
chunkHeader = firstChunkHeader4NextSequentialValueChunks;
firstChunkHeader4NextSequentialValueChunks = null;
+
resizeChunkMemoryBlockIfFirstValueChunkExceedsLimit(valueChunkList,
chunkHeader);
}
Chunk chunk =
@@ -690,6 +687,20 @@ public class TsFileInsertionScanDataContainer extends TsFileInsertionDataContain
return false;
}
+ private void resizeChunkMemoryBlockIfFirstValueChunkExceedsLimit(
+ final List<Chunk> valueChunkList, final ChunkHeader valueChunkHeader) {
+ if (!valueChunkList.isEmpty() || lastIndex < 0) {
+ return;
+ }
+
+ final long chunkSize =
+
PipeMemoryWeightUtil.calculateChunkRamBytesUsed(timeChunkList.get(lastIndex))
+ + valueChunkHeader.getDataSize();
+ if (chunkSize > allocatedMemoryBlockForChunk.getMemoryUsageInBytes()) {
+
PipeDataNodeResourceManager.memory().forceResize(allocatedMemoryBlockForChunk,
chunkSize);
+ }
+ }
+
@Override
public void close() {
super.close();
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java
index bd4e3923815..51fc1f3a556 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.commons.pipe.datastructure.pattern.PrefixPipePattern;
import
org.apache.iotdb.db.pipe.event.common.tsfile.container.TsFileInsertionDataContainer;
import
org.apache.iotdb.db.pipe.event.common.tsfile.container.query.TsFileInsertionQueryDataContainer;
import
org.apache.iotdb.db.pipe.event.common.tsfile.container.scan.TsFileInsertionScanDataContainer;
+import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.utils.CompactionTestFileWriter;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
@@ -56,6 +57,7 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
+import java.lang.reflect.Field;
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.Arrays;
@@ -119,6 +121,47 @@ public class TsFileInsertionDataContainerTest {
System.out.println(System.currentTimeMillis() - startTime);
}
+ @Test
+ public void testScanParserResizesChunkMemoryForFirstAlignedValueChunk()
throws Exception {
+ final long originalPipeMaxReaderChunkSize =
+ PipeConfig.getInstance().getPipeMaxReaderChunkSize();
+ CommonDescriptor.getInstance().getConfig().setPipeMaxReaderChunkSize(0);
+
+ alignedTsFile = new File("single-aligned-value-chunk.tsfile");
+ final List<MeasurementSchema> schemaList = new ArrayList<>();
+ schemaList.add(new MeasurementSchema("s1", TSDataType.INT64));
+
+ final Tablet tablet = new Tablet("root.sg.d", schemaList, 2);
+ tablet.rowSize = 2;
+ tablet.addTimestamp(0, 1);
+ tablet.addValue("s1", 0, 1L);
+ tablet.addTimestamp(1, 2);
+ tablet.addValue("s1", 1, 2L);
+
+ try {
+ try (final TsFileWriter writer = new TsFileWriter(alignedTsFile)) {
+ writer.registerAlignedTimeseries(new Path("root.sg.d"), schemaList);
+ writer.writeAligned(tablet);
+ }
+
+ try (final TsFileInsertionScanDataContainer parser =
+ new TsFileInsertionScanDataContainer(
+ alignedTsFile,
+ new PrefixPipePattern("root"),
+ Long.MIN_VALUE,
+ Long.MAX_VALUE,
+ null,
+ null,
+ false)) {
+
Assert.assertTrue(getAllocatedChunkMemory(parser).getMemoryUsageInBytes() > 0);
+ }
+ } finally {
+ CommonDescriptor.getInstance()
+ .getConfig()
+ .setPipeMaxReaderChunkSize(originalPipeMaxReaderChunkSize);
+ }
+ }
+
public void testToTabletInsertionEvents(final boolean isQuery) throws
Exception {
// Test empty chunk
testMixedTsFileWithEmptyChunk(isQuery);
@@ -645,4 +688,12 @@ public class TsFileInsertionDataContainerTest {
}
return count;
}
+
+ private PipeMemoryBlock getAllocatedChunkMemory(final
TsFileInsertionScanDataContainer parser)
+ throws NoSuchFieldException, IllegalAccessException {
+ final Field field =
+
TsFileInsertionScanDataContainer.class.getDeclaredField("allocatedMemoryBlockForChunk");
+ field.setAccessible(true);
+ return (PipeMemoryBlock) field.get(parser);
+ }
}