This is an automated email from the ASF dual-hosted git repository. Caideyipi pushed a commit to branch fix/pipe-merge-batched-aligned-chunks in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 66b19a52a6832865df767ba8dcc31ca77bdf86ba Author: Caideyipi <[email protected]> AuthorDate: Tue Jun 23 17:19:43 2026 +0800 Pipe: merge batched aligned chunks in scan parser --- .../scan/TsFileInsertionEventScanParser.java | 359 +++++++++++++++------ .../pipe/resource/memory/PipeMemoryWeightUtil.java | 18 +- .../pipe/event/TsFileInsertionEventParserTest.java | 195 +++++++++++ 3 files changed, 464 insertions(+), 108 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java index 4c5cd75d4c1..1251127379e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java @@ -106,8 +106,9 @@ public class TsFileInsertionEventScanParser extends TsFileInsertionEventParser { private final List<Long> timeChunkPageMemorySizeList = new ArrayList<>(); private final Map<String, Integer> measurementIndexMap = new HashMap<>(); - private int lastIndex = -1; - private Chunk firstChunk4NextSequentialValueChunks; + private final List<PendingAlignedChunkGroup> pendingAlignedChunkGroups = new ArrayList<>(); + private long pendingAlignedChunkSize; + private CachedAlignedValueChunk cachedAlignedValueChunk; private byte lastMarker = Byte.MIN_VALUE; @@ -588,14 +589,14 @@ public class TsFileInsertionEventScanParser extends TsFileInsertionEventParser { private void moveToNextChunkReader() throws IOException, IllegalStateException, IllegalPathException { ChunkHeader chunkHeader; - long valueChunkSize = 0; - long valueChunkPageMemorySize = 0; - final List<Chunk> valueChunkList = new ArrayList<>(); currentMeasurements.clear(); modsInfos.clear(); if (lastMarker == MetaMarker.SEPARATOR) { - chunkReader = null; + if (!recordPendingAlignedChunk(lastMarker)) { + clearCachedAlignedChunkData(); + chunkReader = null; + } return; } @@ -603,8 +604,8 @@ public class TsFileInsertionEventScanParser extends TsFileInsertionEventParser { while ((marker = lastMarker != Byte.MIN_VALUE ? lastMarker - : Objects.nonNull(firstChunk4NextSequentialValueChunks) - ? toValueChunkMarker(firstChunk4NextSequentialValueChunks.getHeader()) + : Objects.nonNull(cachedAlignedValueChunk) + ? toValueChunkMarker(cachedAlignedValueChunk.chunk.getHeader()) : tsFileSequenceReader.readMarker()) != MetaMarker.SEPARATOR) { lastMarker = Byte.MIN_VALUE; @@ -658,9 +659,8 @@ public class TsFileInsertionEventScanParser extends TsFileInsertionEventParser { case MetaMarker.VALUE_CHUNK_HEADER: case MetaMarker.ONLY_ONE_PAGE_VALUE_CHUNK_HEADER: { - Chunk chunk; - long currentValueChunkPageMemorySize = 0; - if (Objects.isNull(firstChunk4NextSequentialValueChunks)) { + CachedAlignedValueChunk valueChunk = cachedAlignedValueChunk; + if (Objects.isNull(valueChunk)) { final long currentChunkHeaderOffset = tsFileSequenceReader.position() - 1; chunkHeader = tsFileSequenceReader.readChunkHeader(marker); @@ -668,7 +668,7 @@ public class TsFileInsertionEventScanParser extends TsFileInsertionEventParser { break; } - // Increase value index + // Increase value index. final String measurementID = tabletStringInternPool.intern(chunkHeader.getMeasurementID()); final int valueIndex = @@ -676,86 +676,36 @@ public class TsFileInsertionEventScanParser extends TsFileInsertionEventParser { measurementID, (measurement, index) -> Objects.nonNull(index) ? index + 1 : 0); - // Emit when encountered non-sequential value chunk, or the chunk size exceeds - // certain value to avoid OOM - // Do not record or end current value chunks when there are empty chunks + // Do not record or end current value chunks when there are empty chunks. if (chunkHeader.getDataSize() == 0) { break; } - chunk = + final Chunk chunk = new Chunk( chunkHeader, tsFileSequenceReader.readChunk(-1, chunkHeader.getDataSize())); - currentValueChunkPageMemorySize = calculateMaxPageMemorySize(chunk); - boolean needReturn = false; - final long timeChunkSize = - lastIndex >= 0 - ? PipeMemoryWeightUtil.calculateChunkRamBytesUsed( - timeChunkList.get(lastIndex)) - : 0; - final long timeChunkPageMemorySize = - lastIndex >= 0 ? timeChunkPageMemorySizeList.get(lastIndex) : 0; - if (lastIndex >= 0) { - if (valueIndex != lastIndex) { - needReturn = recordAlignedChunk(valueChunkList, marker); - } else { - final long chunkSize = timeChunkSize + valueChunkSize; - final long pageMemorySize = timeChunkPageMemorySize + valueChunkPageMemorySize; - if (chunkSize + chunkHeader.getDataSize() - > allocatedMemoryBlockForChunk.getMemoryUsageInBytes() - || timeChunkPageMemorySize > 0 - && currentValueChunkPageMemorySize > 0 - && pageMemorySize + currentValueChunkPageMemorySize - > getPageDataMemoryLimitInBytes()) { - needReturn = recordAlignedChunk(valueChunkList, marker); - } - } - } - lastIndex = valueIndex; - if (needReturn) { - firstChunk4NextSequentialValueChunks = chunk; - return; - } - resizeChunkMemoryBlockIfFirstValueChunkExceedsLimit(valueChunkList, chunkHeader); - resizePageDataMemoryBlockIfFirstValueChunkExceedsLimit( - valueChunkList, currentValueChunkPageMemorySize); + valueChunk = + new CachedAlignedValueChunk( + valueIndex, + chunk, + chunkHeader.getDataSize(), + calculateMaxPageMemorySize(chunk)); } else { - chunk = firstChunk4NextSequentialValueChunks; - chunkHeader = chunk.getHeader(); - firstChunk4NextSequentialValueChunks = null; - currentValueChunkPageMemorySize = calculateMaxPageMemorySize(chunk); - resizeChunkMemoryBlockIfFirstValueChunkExceedsLimit(valueChunkList, chunkHeader); - resizePageDataMemoryBlockIfFirstValueChunkExceedsLimit( - valueChunkList, currentValueChunkPageMemorySize); + cachedAlignedValueChunk = null; } - valueChunkSize += chunkHeader.getDataSize(); - valueChunkPageMemorySize += currentValueChunkPageMemorySize; - valueChunkList.add(chunk); - final String measurementID = - tabletStringInternPool.intern(chunkHeader.getMeasurementID()); - currentMeasurements.add( - new MeasurementSchema( - measurementID, - chunkHeader.getDataType(), - chunkHeader.getEncodingType(), - chunkHeader.getCompressionType())); - modsInfos.addAll( - ModsOperationUtil.initializeMeasurementMods( - currentDevice, Collections.singletonList(measurementID), currentModifications)); + if (returnPendingAlignedChunkBeforeCaching(valueChunk)) { + return; + } + cacheAlignedValueChunk(valueChunk); break; } case MetaMarker.CHUNK_GROUP_HEADER: { - // Return before "currentDevice" changes - if (recordAlignedChunk(valueChunkList, marker)) { + // Return before "currentDevice" changes. + if (recordPendingAlignedChunk(marker)) { return; } - // Clear because the cached data will never be used in the next chunk group - lastIndex = -1; - timeChunkList.clear(); - isMultiPageList.clear(); - timeChunkPageMemorySizeList.clear(); - measurementIndexMap.clear(); + clearCachedAlignedChunkData(); final IDeviceID deviceID = tsFileSequenceReader.readChunkGroupHeader().getDeviceID(); currentDevice = treePattern.mayOverlapWithDevice(deviceID) ? deviceID : null; currentDeviceString = @@ -775,7 +725,8 @@ public class TsFileInsertionEventScanParser extends TsFileInsertionEventParser { } lastMarker = marker; - if (!recordAlignedChunk(valueChunkList, marker)) { + if (!recordPendingAlignedChunk(marker)) { + clearCachedAlignedChunkData(); chunkReader = null; } } @@ -859,22 +810,38 @@ public class TsFileInsertionEventScanParser extends TsFileInsertionEventParser { return false; } - private boolean recordAlignedChunk(final List<Chunk> valueChunkList, final byte marker) - throws IOException { - if (!valueChunkList.isEmpty()) { - final Chunk timeChunk = timeChunkList.get(lastIndex); + private boolean recordPendingAlignedChunk(final byte marker) throws IOException { + while (!pendingAlignedChunkGroups.isEmpty()) { + final PendingAlignedChunkGroup pendingAlignedChunkGroup = pendingAlignedChunkGroups.remove(0); + pendingAlignedChunkSize = + Math.max(0, pendingAlignedChunkSize - pendingAlignedChunkGroup.chunkSize); + + if (pendingAlignedChunkGroup.valueChunkList.isEmpty()) { + continue; + } + + final Chunk timeChunk = timeChunkList.get(pendingAlignedChunkGroup.timeChunkIndex); timeChunk.getData().rewind(); - currentIsMultiPage = isMultiPageList.get(lastIndex); + for (final Chunk valueChunk : pendingAlignedChunkGroup.valueChunkList) { + valueChunk.getData().rewind(); + } + + currentMeasurements.clear(); + currentMeasurements.addAll(pendingAlignedChunkGroup.measurements); + modsInfos.clear(); + modsInfos.addAll(pendingAlignedChunkGroup.modsInfos); + + currentIsMultiPage = isMultiPageList.get(pendingAlignedChunkGroup.timeChunkIndex); if (!currentIsMultiPage) { resizePageDataMemoryIfNeeded( AlignedSinglePageWholeChunkReader.calculatePageEstimatedMemoryUsageInBytes( - timeChunk, valueChunkList)); + timeChunk, pendingAlignedChunkGroup.valueChunkList)); } final List<Long> pageEstimatedMemoryUsageInBytesList = currentIsMultiPage ? AlignedSinglePageWholeChunkReader .calculatePageEstimatedMemoryUsageInBytesWithBatchDataList( - timeChunk, valueChunkList) + timeChunk, pendingAlignedChunkGroup.valueChunkList) : Collections.emptyList(); final long maxPageEstimatedMemoryUsageInBytes = pageEstimatedMemoryUsageInBytesList.isEmpty() @@ -884,42 +851,193 @@ public class TsFileInsertionEventScanParser extends TsFileInsertionEventParser { chunkReader = currentIsMultiPage ? new MemoryControlledChunkReader( - new AlignedChunkReader(timeChunk, valueChunkList, filter), + new AlignedChunkReader( + timeChunk, pendingAlignedChunkGroup.valueChunkList, filter), pageEstimatedMemoryUsageInBytesList) - : new AlignedSinglePageWholeChunkReader(timeChunk, valueChunkList, null); + : new AlignedSinglePageWholeChunkReader( + timeChunk, pendingAlignedChunkGroup.valueChunkList, null); currentIsAligned = true; - lastMarker = marker; + if (marker != Byte.MIN_VALUE) { + lastMarker = marker; + } return true; } return false; } + private boolean shouldReturnPendingAlignedChunkBeforeCaching( + final CachedAlignedValueChunk valueChunk) throws IOException { + validateAlignedValueChunkTimeIndex(valueChunk.timeChunkIndex); + + final PendingAlignedChunkGroup pendingAlignedChunkGroup = + findPendingAlignedChunkGroup(valueChunk.timeChunkIndex); + final boolean isFirstValueChunkInGroup = + Objects.isNull(pendingAlignedChunkGroup) + || pendingAlignedChunkGroup.valueChunkList.isEmpty(); + final long timeChunkSize = + Objects.isNull(pendingAlignedChunkGroup) + ? PipeMemoryWeightUtil.calculateChunkRamBytesUsed( + timeChunkList.get(valueChunk.timeChunkIndex)) + : 0; + final long chunkSizeAfterCaching = + pendingAlignedChunkSize + timeChunkSize + valueChunk.valueChunkSize; + + if (isFirstValueChunkInGroup) { + final long firstValueChunkGroupSize = + timeChunkSize + + (Objects.isNull(pendingAlignedChunkGroup) ? 0 : pendingAlignedChunkGroup.chunkSize) + + valueChunk.valueChunkSize; + if (firstValueChunkGroupSize > allocatedMemoryBlockForChunk.getMemoryUsageInBytes()) { + return !pendingAlignedChunkGroups.isEmpty(); + } + } + + if (!pendingAlignedChunkGroups.isEmpty() + && chunkSizeAfterCaching > allocatedMemoryBlockForChunk.getMemoryUsageInBytes()) { + return true; + } + + final long timeChunkPageMemorySize = timeChunkPageMemorySizeList.get(valueChunk.timeChunkIndex); + if (timeChunkPageMemorySize <= 0 || valueChunk.valueChunkPageMemorySize <= 0) { + return false; + } + + final long pageMemorySizeAfterCaching = + timeChunkPageMemorySize + + (Objects.isNull(pendingAlignedChunkGroup) + ? 0 + : pendingAlignedChunkGroup.valueChunkPageMemorySize) + + valueChunk.valueChunkPageMemorySize; + if (pageMemorySizeAfterCaching <= getPageDataMemoryLimitInBytes()) { + return false; + } + + if (isFirstValueChunkInGroup) { + final long firstValueChunkPageMemorySize = + timeChunkPageMemorySize + valueChunk.valueChunkPageMemorySize; + return firstValueChunkPageMemorySize <= getPageDataMemoryLimitInBytes() + || !pendingAlignedChunkGroups.isEmpty(); + } + + return true; + } + + private boolean returnPendingAlignedChunkBeforeCaching(final CachedAlignedValueChunk valueChunk) + throws IOException { + if (!shouldReturnPendingAlignedChunkBeforeCaching(valueChunk)) { + return false; + } + + cachedAlignedValueChunk = valueChunk; + if (recordPendingAlignedChunk(Byte.MIN_VALUE)) { + return true; + } + cachedAlignedValueChunk = null; + return false; + } + + private void cacheAlignedValueChunk(final CachedAlignedValueChunk valueChunk) throws IOException { + validateAlignedValueChunkTimeIndex(valueChunk.timeChunkIndex); + + final PendingAlignedChunkGroup pendingAlignedChunkGroup = + getOrCreatePendingAlignedChunkGroup(valueChunk.timeChunkIndex); + resizeChunkMemoryBlockIfFirstValueChunkExceedsLimit(pendingAlignedChunkGroup, valueChunk); + resizePageDataMemoryBlockIfFirstValueChunkExceedsLimit(pendingAlignedChunkGroup, valueChunk); + + pendingAlignedChunkGroup.valueChunkList.add(valueChunk.chunk); + pendingAlignedChunkGroup.chunkSize += valueChunk.valueChunkSize; + pendingAlignedChunkGroup.valueChunkPageMemorySize += valueChunk.valueChunkPageMemorySize; + pendingAlignedChunkSize += valueChunk.valueChunkSize; + + final ChunkHeader chunkHeader = valueChunk.chunk.getHeader(); + final String measurementID = tabletStringInternPool.intern(chunkHeader.getMeasurementID()); + pendingAlignedChunkGroup.measurements.add( + new MeasurementSchema( + measurementID, + chunkHeader.getDataType(), + chunkHeader.getEncodingType(), + chunkHeader.getCompressionType())); + pendingAlignedChunkGroup.modsInfos.addAll( + ModsOperationUtil.initializeMeasurementMods( + currentDevice, Collections.singletonList(measurementID), currentModifications)); + } + + private PendingAlignedChunkGroup getOrCreatePendingAlignedChunkGroup(final int timeChunkIndex) { + final PendingAlignedChunkGroup pendingAlignedChunkGroup = + findPendingAlignedChunkGroup(timeChunkIndex); + if (Objects.nonNull(pendingAlignedChunkGroup)) { + return pendingAlignedChunkGroup; + } + + final PendingAlignedChunkGroup newPendingAlignedChunkGroup = + new PendingAlignedChunkGroup( + timeChunkIndex, + PipeMemoryWeightUtil.calculateChunkRamBytesUsed(timeChunkList.get(timeChunkIndex)), + timeChunkPageMemorySizeList.get(timeChunkIndex)); + pendingAlignedChunkSize += newPendingAlignedChunkGroup.chunkSize; + + for (int i = 0; i < pendingAlignedChunkGroups.size(); ++i) { + if (pendingAlignedChunkGroups.get(i).timeChunkIndex > timeChunkIndex) { + pendingAlignedChunkGroups.add(i, newPendingAlignedChunkGroup); + return newPendingAlignedChunkGroup; + } + } + pendingAlignedChunkGroups.add(newPendingAlignedChunkGroup); + return newPendingAlignedChunkGroup; + } + + private PendingAlignedChunkGroup findPendingAlignedChunkGroup(final int timeChunkIndex) { + for (final PendingAlignedChunkGroup pendingAlignedChunkGroup : pendingAlignedChunkGroups) { + if (pendingAlignedChunkGroup.timeChunkIndex == timeChunkIndex) { + return pendingAlignedChunkGroup; + } + } + return null; + } + + private void validateAlignedValueChunkTimeIndex(final int timeChunkIndex) throws IOException { + if (timeChunkIndex < 0 || timeChunkIndex >= timeChunkList.size()) { + throw new IOException( + String.format( + "Invalid aligned value chunk index %d, while there are %d time chunks.", + timeChunkIndex, timeChunkList.size())); + } + } + + private void clearCachedAlignedChunkData() { + pendingAlignedChunkGroups.clear(); + pendingAlignedChunkSize = 0; + cachedAlignedValueChunk = null; + timeChunkList.clear(); + isMultiPageList.clear(); + timeChunkPageMemorySizeList.clear(); + measurementIndexMap.clear(); + } + private void resizeChunkMemoryBlockIfFirstValueChunkExceedsLimit( - final List<Chunk> valueChunkList, final ChunkHeader valueChunkHeader) { - if (!valueChunkList.isEmpty() || lastIndex < 0) { + final PendingAlignedChunkGroup pendingAlignedChunkGroup, + final CachedAlignedValueChunk valueChunk) { + if (!pendingAlignedChunkGroup.valueChunkList.isEmpty()) { return; } - final long chunkSize = - PipeMemoryWeightUtil.calculateChunkRamBytesUsed(timeChunkList.get(lastIndex)) - + valueChunkHeader.getDataSize(); + final long chunkSize = pendingAlignedChunkGroup.chunkSize + valueChunk.valueChunkSize; if (chunkSize > allocatedMemoryBlockForChunk.getMemoryUsageInBytes()) { PipeDataNodeResourceManager.memory().forceResize(allocatedMemoryBlockForChunk, chunkSize); } } private void resizePageDataMemoryBlockIfFirstValueChunkExceedsLimit( - final List<Chunk> valueChunkList, final long valueChunkPageMemorySize) { - if (!valueChunkList.isEmpty() || lastIndex < 0 || valueChunkPageMemorySize <= 0) { + final PendingAlignedChunkGroup pendingAlignedChunkGroup, + final CachedAlignedValueChunk valueChunk) { + if (!pendingAlignedChunkGroup.valueChunkList.isEmpty() + || pendingAlignedChunkGroup.timeChunkPageMemorySize <= 0 + || valueChunk.valueChunkPageMemorySize <= 0) { return; } - final long timeChunkPageMemorySize = timeChunkPageMemorySizeList.get(lastIndex); - if (timeChunkPageMemorySize <= 0) { - return; - } - - final long pageMemorySize = timeChunkPageMemorySize + valueChunkPageMemorySize; + final long pageMemorySize = + pendingAlignedChunkGroup.timeChunkPageMemorySize + valueChunk.valueChunkPageMemorySize; if (pageMemorySize > getPageDataMemoryLimitInBytes()) { PipeDataNodeResourceManager.memory() .forceResize(allocatedMemoryBlockForBatchData, pageMemorySize); @@ -980,4 +1098,41 @@ public class TsFileInsertionEventScanParser extends TsFileInsertionEventParser { } return null; } + + private static class PendingAlignedChunkGroup { + + private final int timeChunkIndex; + private final List<Chunk> valueChunkList = new ArrayList<>(); + private final List<IMeasurementSchema> measurements = new ArrayList<>(); + private final List<ModsOperationUtil.ModsInfo> modsInfos = new ArrayList<>(); + private final long timeChunkPageMemorySize; + private long chunkSize; + private long valueChunkPageMemorySize; + + private PendingAlignedChunkGroup( + final int timeChunkIndex, final long timeChunkSize, final long timeChunkPageMemorySize) { + this.timeChunkIndex = timeChunkIndex; + this.chunkSize = timeChunkSize; + this.timeChunkPageMemorySize = timeChunkPageMemorySize; + } + } + + private static class CachedAlignedValueChunk { + + private final int timeChunkIndex; + private final Chunk chunk; + private final long valueChunkSize; + private final long valueChunkPageMemorySize; + + private CachedAlignedValueChunk( + final int timeChunkIndex, + final Chunk chunk, + final long valueChunkSize, + final long valueChunkPageMemorySize) { + this.timeChunkIndex = timeChunkIndex; + this.chunk = chunk; + this.valueChunkSize = valueChunkSize; + this.valueChunkPageMemorySize = valueChunkPageMemorySize; + } + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java index 04eff0067e5..33ceacf278c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java @@ -191,6 +191,14 @@ public class PipeMemoryWeightUtil { return new Pair<>(1, 0); } + final int configuredTabletRowSize = + PipeConfig.getInstance().getPipeDataStructureTabletRowSize(); + final boolean hasTabletRowSizeLimit = configuredTabletRowSize > 0; + final double inputSizeLimit = + hasTabletRowSizeLimit && inputNum > 0 + ? 100 + inputNum * (double) rowBytesUsed * 1.2 + : Integer.MAX_VALUE; + // Calculate row number according to the max size of a pipe tablet. "100" is the estimated size // of other data structures in a pipe tablet. // "*8" converts bytes to bits, because the bitmap size is 1 bit per schema. @@ -198,17 +206,15 @@ public class PipeMemoryWeightUtil { (int) Math.min( IoTDBDescriptor.getInstance().getConfig().getPipeDataStructureTabletSizeInBytes(), - Math.min(Integer.MAX_VALUE, 100 + inputNum * (double) rowBytesUsed * 1.2)); + Math.min(Integer.MAX_VALUE, inputSizeLimit)); int rowNumber = 8 * (sizeLimit - 100) / (8 * rowBytesUsed + schemaCount); rowNumber = Math.max(1, rowNumber); - if ( // This means the row number is larger than the max row count of a pipe tablet - rowNumber > PipeConfig.getInstance().getPipeDataStructureTabletRowSize()) { + // This means the row number is larger than the max row count of a pipe tablet. + if (hasTabletRowSizeLimit && rowNumber > configuredTabletRowSize) { // Bound the row number, the memory cost is rowSize * rowNumber - return new Pair<>( - PipeConfig.getInstance().getPipeDataStructureTabletRowSize(), - rowBytesUsed * PipeConfig.getInstance().getPipeDataStructureTabletRowSize()); + return new Pair<>(configuredTabletRowSize, rowBytesUsed * configuredTabletRowSize); } else { return new Pair<>(rowNumber, sizeLimit); } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java index 50109b935c3..626b01c84e0 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java @@ -36,6 +36,8 @@ import org.apache.iotdb.db.pipe.event.common.tsfile.parser.scan.TsFileInsertionE import org.apache.iotdb.db.pipe.event.common.tsfile.parser.table.TsFileInsertionEventTableParser; import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock; import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil; +import org.apache.iotdb.db.storageengine.dataregion.compaction.io.CompactionTsFileWriter; +import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.constant.CompactionType; import org.apache.iotdb.db.storageengine.dataregion.compaction.utils.CompactionTestFileWriter; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus; @@ -55,6 +57,7 @@ import org.apache.tsfile.file.metadata.TableSchema; import org.apache.tsfile.file.metadata.enums.CompressionType; import org.apache.tsfile.file.metadata.enums.TSEncoding; import org.apache.tsfile.read.TsFileSequenceReader; +import org.apache.tsfile.read.common.BatchData; import org.apache.tsfile.read.common.Chunk; import org.apache.tsfile.read.common.Path; import org.apache.tsfile.read.common.TimeRange; @@ -63,9 +66,11 @@ import org.apache.tsfile.utils.BitMap; import org.apache.tsfile.utils.Pair; import org.apache.tsfile.utils.TsFileGeneratorUtils; import org.apache.tsfile.write.TsFileWriter; +import org.apache.tsfile.write.chunk.AlignedChunkWriterImpl; import org.apache.tsfile.write.record.Tablet; import org.apache.tsfile.write.schema.IMeasurementSchema; import org.apache.tsfile.write.schema.MeasurementSchema; +import org.apache.tsfile.write.writer.TsFileIOWriter; import org.junit.After; import org.junit.Assert; import org.junit.Assume; @@ -436,6 +441,100 @@ public class TsFileInsertionEventParserTest { } } + @Test + public void testScanParserMergesBatchedAlignedValueChunkGroups() throws Exception { + final long originalPipeMaxReaderChunkSize = + CommonDescriptor.getInstance().getConfig().getPipeMaxReaderChunkSize(); + final int originalPipeDataStructureTabletRowSize = + CommonDescriptor.getInstance().getConfig().getPipeDataStructureTabletRowSize(); + + final int measurementCount = 20; + final int batchSize = 10; + final int rowCount = 4; + final File sourceTsFile = new File("aligned-source-for-batched-layout.tsfile"); + alignedTsFile = new File("aligned-batched-layout.tsfile"); + + try { + CommonDescriptor.getInstance().getConfig().setPipeMaxReaderChunkSize(1024 * 1024L); + CommonDescriptor.getInstance().getConfig().setPipeDataStructureTabletRowSize(0); + + final List<IMeasurementSchema> schemaList = new ArrayList<>(); + for (int i = 0; i < measurementCount; ++i) { + schemaList.add(new MeasurementSchema("s" + i, TSDataType.INT64)); + } + + writeAlignedSourceTsFile(sourceTsFile, schemaList, rowCount); + rewriteAlignedTsFileWithBatchedValueChunks( + sourceTsFile, alignedTsFile, measurementCount, batchSize); + + int tabletCount = 0; + int pointCount = 0; + try (final TsFileInsertionEventScanParser parser = + new TsFileInsertionEventScanParser( + alignedTsFile, + new PrefixTreePattern("root"), + Long.MIN_VALUE, + Long.MAX_VALUE, + null, + null, + false)) { + for (final Pair<Tablet, Boolean> tabletWithIsAligned : parser.toTabletWithIsAligneds()) { + Assert.assertTrue(tabletWithIsAligned.getRight()); + final Tablet tablet = tabletWithIsAligned.getLeft(); + ++tabletCount; + Assert.assertEquals(measurementCount, tablet.getSchemas().size()); + pointCount += getNonNullSize(tablet); + } + } + + Assert.assertEquals(measurementCount * rowCount, pointCount); + Assert.assertTrue(tabletCount > 0); + } finally { + CommonDescriptor.getInstance() + .getConfig() + .setPipeMaxReaderChunkSize(originalPipeMaxReaderChunkSize); + CommonDescriptor.getInstance() + .getConfig() + .setPipeDataStructureTabletRowSize(originalPipeDataStructureTabletRowSize); + sourceTsFile.delete(); + } + } + + @Test + public void testPipeTabletRowSizeCanBeDisabledByNonPositiveValue() { + final int originalPipeDataStructureTabletRowSize = + CommonDescriptor.getInstance().getConfig().getPipeDataStructureTabletRowSize(); + final int originalPipeDataStructureTabletSizeInBytes = + CommonDescriptor.getInstance().getConfig().getPipeDataStructureTabletSizeInBytes(); + + try { + CommonDescriptor.getInstance().getConfig().setPipeDataStructureTabletSizeInBytes(1024 * 1024); + + final BatchData batchData = new BatchData(TSDataType.INT64); + for (int i = 0; i < 1000; ++i) { + batchData.putAnObject(i, (long) i); + } + + CommonDescriptor.getInstance().getConfig().setPipeDataStructureTabletRowSize(2); + final int rowCountWithLimit = + PipeMemoryWeightUtil.calculateTabletRowCountAndMemory(batchData).getLeft(); + + CommonDescriptor.getInstance().getConfig().setPipeDataStructureTabletRowSize(0); + final int rowCountWithoutLimit = + PipeMemoryWeightUtil.calculateTabletRowCountAndMemory(batchData).getLeft(); + + Assert.assertEquals(2, rowCountWithLimit); + Assert.assertTrue(rowCountWithoutLimit > rowCountWithLimit); + } finally { + CommonDescriptor.getInstance() + .getConfig() + .setPipeDataStructureTabletRowSize(originalPipeDataStructureTabletRowSize); + CommonDescriptor.getInstance() + .getConfig() + .setPipeDataStructureTabletSizeInBytes(originalPipeDataStructureTabletSizeInBytes); + } + } + @Test public void testQueryParserSkipsUnnecessaryBitMaps() throws Exception { testTreeParserSkipsUnnecessaryBitMaps(true); @@ -1887,6 +1986,102 @@ public class TsFileInsertionEventParserTest { } } + private void writeAlignedSourceTsFile( + final File tsFile, final List<IMeasurementSchema> schemaList, final int rowCount) + throws IOException { + if (tsFile.exists()) { + Assert.assertTrue(tsFile.delete()); + } + Assert.assertEquals(0, rowCount % 2); + + final IDeviceID deviceID = IDeviceID.Factory.DEFAULT_FACTORY.create("root.sg.d"); + try (final TsFileIOWriter writer = new TsFileIOWriter(tsFile)) { + writer.startChunkGroup(deviceID); + final int rowCountPerChunk = rowCount / 2; + for (int chunkIndex = 0; chunkIndex < 2; ++chunkIndex) { + final AlignedChunkWriterImpl alignedChunkWriter = new AlignedChunkWriterImpl(schemaList); + for (int row = 0; row < rowCountPerChunk; ++row) { + final long time = (long) chunkIndex * rowCountPerChunk + row; + alignedChunkWriter.getTimeChunkWriter().write(time); + for (int measurementIndex = 0; measurementIndex < schemaList.size(); ++measurementIndex) { + alignedChunkWriter + .getValueChunkWriterByIndex(measurementIndex) + .write(time, time * 100 + measurementIndex, false); + } + } + alignedChunkWriter.writeToFileWriter(writer); + } + writer.endChunkGroup(); + writer.endFile(); + } + } + + private void rewriteAlignedTsFileWithBatchedValueChunks( + final File sourceTsFile, + final File targetTsFile, + final int measurementCount, + final int batchSize) + throws Exception { + if (targetTsFile.exists()) { + Assert.assertTrue(targetTsFile.delete()); + } + + try (final TsFileSequenceReader reader = + new TsFileSequenceReader(sourceTsFile.getAbsolutePath())) { + final IDeviceID deviceID = reader.getDeviceMeasurementsMap().keySet().iterator().next(); + final List<AbstractAlignedChunkMetadata> sourceAlignedChunkMetadataList = + reader.getAlignedChunkMetadata(deviceID, true); + Assert.assertEquals(2, sourceAlignedChunkMetadataList.size()); + for (final AbstractAlignedChunkMetadata sourceAlignedChunkMetadata : + sourceAlignedChunkMetadataList) { + Assert.assertEquals( + measurementCount, sourceAlignedChunkMetadata.getValueChunkMetadataList().size()); + } + + try (final CompactionTsFileWriter writer = + new CompactionTsFileWriter( + targetTsFile, Long.MAX_VALUE, CompactionType.INNER_SEQ_COMPACTION)) { + writer.startChunkGroup(deviceID); + writer.markStartingWritingAligned(); + for (final AbstractAlignedChunkMetadata sourceAlignedChunkMetadata : + sourceAlignedChunkMetadataList) { + final ChunkMetadata timeChunkMetadata = + (ChunkMetadata) sourceAlignedChunkMetadata.getTimeChunkMetadata(); + writer.writeChunk(reader.readMemChunk(timeChunkMetadata), timeChunkMetadata); + } + + for (int start = 0; start < measurementCount; start += batchSize) { + writeValueChunkBatch( + reader, + writer, + sourceAlignedChunkMetadataList, + start, + Math.min(start + batchSize, measurementCount)); + } + writer.markEndingWritingAligned(); + writer.endChunkGroup(); + writer.endFile(); + } + } + } + + private void writeValueChunkBatch( + final TsFileSequenceReader reader, + final CompactionTsFileWriter writer, + final List<AbstractAlignedChunkMetadata> alignedChunkMetadataList, + final int start, + final int end) + throws IOException { + for (final AbstractAlignedChunkMetadata alignedChunkMetadata : alignedChunkMetadataList) { + final List<IChunkMetadata> valueChunkMetadataList = + alignedChunkMetadata.getValueChunkMetadataList(); + for (int index = start; index < end; ++index) { + final ChunkMetadata valueChunkMetadata = (ChunkMetadata) valueChunkMetadataList.get(index); + writer.writeChunk(reader.readMemChunk(valueChunkMetadata), valueChunkMetadata); + } + } + } + private static class ParserPerformanceStats { private long pointCount;
