This is an automated email from the ASF dual-hosted git repository.
jt2594838 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new f96fc5824f6 Pipe: merge batched aligned chunks in scan parser (#18010)
f96fc5824f6 is described below
commit f96fc5824f62b4c637c0d9b5e9ea4adc9f8b1853
Author: Caideyipi <[email protected]>
AuthorDate: Fri Jun 26 14:19:52 2026 +0800
Pipe: merge batched aligned chunks in scan parser (#18010)
* Pipe: merge batched aligned chunks in scan parser
* Test pipe batched aligned chunk memory boundaries
* Pipe: fix batched aligned scan parser memory split
* Update TsFileInsertionEventParserTest.java
* Rename pending aligned chunk consumer
---
.../parser/scan/SinglePageWholeChunkReader.java | 38 +-
.../scan/TsFileInsertionEventScanParser.java | 383 +++++++++++++++------
.../pipe/resource/memory/PipeMemoryWeightUtil.java | 18 +-
.../pipe/event/TsFileInsertionEventParserTest.java | 343 +++++++++++++++++-
4 files changed, 662 insertions(+), 120 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/SinglePageWholeChunkReader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/SinglePageWholeChunkReader.java
index 1be5565b394..075e47296ce 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/SinglePageWholeChunkReader.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/SinglePageWholeChunkReader.java
@@ -21,6 +21,7 @@ package
org.apache.iotdb.db.pipe.event.common.tsfile.parser.scan;
import org.apache.iotdb.db.i18n.DataNodePipeMessages;
+import org.apache.tsfile.common.constant.TsFileConstant;
import org.apache.tsfile.compress.IUnCompressor;
import org.apache.tsfile.encoding.decoder.Decoder;
import org.apache.tsfile.encrypt.EncryptParameter;
@@ -169,22 +170,49 @@ public class SinglePageWholeChunkReader extends
AbstractChunkReader
static long estimatePageMemoryUsageInBytesWithBatchData(
final PageHeader timePageHeader,
final Chunk timeChunk,
- final List<TSDataType> valueDataTypeList) {
+ final List<TSDataType> valueDataTypeList)
+ throws IOException {
return estimatePageMemoryUsageInBytesWithBatchData(
timePageHeader.getUncompressedSize(),
getPageRowCount(timePageHeader, timeChunk),
valueDataTypeList);
}
- static int getPageRowCount(final PageHeader pageHeader, final Chunk chunk) {
+ static int getPageRowCount(final PageHeader pageHeader, final Chunk chunk)
throws IOException {
if (isSinglePageChunk(chunk.getHeader())) {
- return Objects.isNull(chunk.getChunkStatistic())
- ? 0
- : saturateToInt(chunk.getChunkStatistic().getCount());
+ if (Objects.nonNull(chunk.getChunkStatistic())) {
+ return saturateToInt(chunk.getChunkStatistic().getCount());
+ }
+ return isTimeChunk(chunk.getHeader()) ? countSinglePageTimeValues(chunk)
: 0;
}
return saturateToInt(pageHeader.getNumOfValues());
}
+ private static int countSinglePageTimeValues(final Chunk chunk) throws
IOException {
+ final ByteBuffer chunkDataBuffer = chunk.getData().duplicate();
+ final PageHeader pageHeader = deserializePageHeader(chunkDataBuffer,
chunk.getHeader());
+ final ByteBuffer pageData =
+ deserializePageData(
+ pageHeader,
+ chunkDataBuffer,
+ chunk.getHeader(),
+ IDecryptor.getDecryptor(chunk.getEncryptParam()));
+ final Decoder decoder =
+ Decoder.getDecoderByType(chunk.getHeader().getEncodingType(),
TSDataType.INT64);
+
+ int rowCount = 0;
+ while (decoder.hasNext(pageData)) {
+ decoder.readLong(pageData);
+ ++rowCount;
+ }
+ return rowCount;
+ }
+
+ private static boolean isTimeChunk(final ChunkHeader chunkHeader) {
+ return (chunkHeader.getChunkType() & TsFileConstant.TIME_COLUMN_MASK)
+ == TsFileConstant.TIME_COLUMN_MASK;
+ }
+
private static int saturateToInt(final long value) {
return (int) Math.min(Integer.MAX_VALUE, value);
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java
index 4c5cd75d4c1..e3af5aaa0c1 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java
@@ -72,6 +72,7 @@ import org.apache.tsfile.write.schema.MeasurementSchema;
import java.io.File;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -103,11 +104,11 @@ public class TsFileInsertionEventScanParser extends
TsFileInsertionEventParser {
// Cached time chunk
private final List<Chunk> timeChunkList = new ArrayList<>();
private final List<Boolean> isMultiPageList = new ArrayList<>();
- private final List<Long> timeChunkPageMemorySizeList = new ArrayList<>();
private final Map<String, Integer> measurementIndexMap = new HashMap<>();
- private int lastIndex = -1;
- private Chunk firstChunk4NextSequentialValueChunks;
+ private final List<PendingAlignedChunkGroup> pendingAlignedChunkGroups = new
ArrayList<>();
+ private long pendingAlignedChunkSize;
+ private CachedAlignedValueChunk cachedAlignedValueChunk;
private byte lastMarker = Byte.MIN_VALUE;
@@ -588,14 +589,14 @@ public class TsFileInsertionEventScanParser extends
TsFileInsertionEventParser {
private void moveToNextChunkReader()
throws IOException, IllegalStateException, IllegalPathException {
ChunkHeader chunkHeader;
- long valueChunkSize = 0;
- long valueChunkPageMemorySize = 0;
- final List<Chunk> valueChunkList = new ArrayList<>();
currentMeasurements.clear();
modsInfos.clear();
if (lastMarker == MetaMarker.SEPARATOR) {
- chunkReader = null;
+ if (!useNextPendingAlignedChunk(lastMarker)) {
+ clearCachedAlignedChunkData();
+ chunkReader = null;
+ }
return;
}
@@ -603,8 +604,8 @@ public class TsFileInsertionEventScanParser extends
TsFileInsertionEventParser {
while ((marker =
lastMarker != Byte.MIN_VALUE
? lastMarker
- : Objects.nonNull(firstChunk4NextSequentialValueChunks)
- ?
toValueChunkMarker(firstChunk4NextSequentialValueChunks.getHeader())
+ : Objects.nonNull(cachedAlignedValueChunk)
+ ?
toValueChunkMarker(cachedAlignedValueChunk.chunk.getHeader())
: tsFileSequenceReader.readMarker())
!= MetaMarker.SEPARATOR) {
lastMarker = Byte.MIN_VALUE;
@@ -658,9 +659,8 @@ public class TsFileInsertionEventScanParser extends
TsFileInsertionEventParser {
case MetaMarker.VALUE_CHUNK_HEADER:
case MetaMarker.ONLY_ONE_PAGE_VALUE_CHUNK_HEADER:
{
- Chunk chunk;
- long currentValueChunkPageMemorySize = 0;
- if (Objects.isNull(firstChunk4NextSequentialValueChunks)) {
+ CachedAlignedValueChunk valueChunk = cachedAlignedValueChunk;
+ if (Objects.isNull(valueChunk)) {
final long currentChunkHeaderOffset =
tsFileSequenceReader.position() - 1;
chunkHeader = tsFileSequenceReader.readChunkHeader(marker);
@@ -668,7 +668,7 @@ public class TsFileInsertionEventScanParser extends
TsFileInsertionEventParser {
break;
}
- // Increase value index
+ // Increase value index.
final String measurementID =
tabletStringInternPool.intern(chunkHeader.getMeasurementID());
final int valueIndex =
@@ -676,86 +676,32 @@ public class TsFileInsertionEventScanParser extends
TsFileInsertionEventParser {
measurementID,
(measurement, index) -> Objects.nonNull(index) ? index +
1 : 0);
- // Emit when encountered non-sequential value chunk, or the
chunk size exceeds
- // certain value to avoid OOM
- // Do not record or end current value chunks when there are
empty chunks
+ // Do not record or end current value chunks when there are
empty chunks.
if (chunkHeader.getDataSize() == 0) {
break;
}
- chunk =
+ final Chunk chunk =
new Chunk(
chunkHeader, tsFileSequenceReader.readChunk(-1,
chunkHeader.getDataSize()));
- currentValueChunkPageMemorySize =
calculateMaxPageMemorySize(chunk);
- boolean needReturn = false;
- final long timeChunkSize =
- lastIndex >= 0
- ? PipeMemoryWeightUtil.calculateChunkRamBytesUsed(
- timeChunkList.get(lastIndex))
- : 0;
- final long timeChunkPageMemorySize =
- lastIndex >= 0 ? timeChunkPageMemorySizeList.get(lastIndex)
: 0;
- if (lastIndex >= 0) {
- if (valueIndex != lastIndex) {
- needReturn = recordAlignedChunk(valueChunkList, marker);
- } else {
- final long chunkSize = timeChunkSize + valueChunkSize;
- final long pageMemorySize = timeChunkPageMemorySize +
valueChunkPageMemorySize;
- if (chunkSize + chunkHeader.getDataSize()
- >
allocatedMemoryBlockForChunk.getMemoryUsageInBytes()
- || timeChunkPageMemorySize > 0
- && currentValueChunkPageMemorySize > 0
- && pageMemorySize + currentValueChunkPageMemorySize
- > getPageDataMemoryLimitInBytes()) {
- needReturn = recordAlignedChunk(valueChunkList, marker);
- }
- }
- }
- lastIndex = valueIndex;
- if (needReturn) {
- firstChunk4NextSequentialValueChunks = chunk;
- return;
- }
-
resizeChunkMemoryBlockIfFirstValueChunkExceedsLimit(valueChunkList,
chunkHeader);
- resizePageDataMemoryBlockIfFirstValueChunkExceedsLimit(
- valueChunkList, currentValueChunkPageMemorySize);
+ valueChunk =
+ new CachedAlignedValueChunk(valueIndex, chunk,
chunkHeader.getDataSize());
} else {
- chunk = firstChunk4NextSequentialValueChunks;
- chunkHeader = chunk.getHeader();
- firstChunk4NextSequentialValueChunks = null;
- currentValueChunkPageMemorySize =
calculateMaxPageMemorySize(chunk);
-
resizeChunkMemoryBlockIfFirstValueChunkExceedsLimit(valueChunkList,
chunkHeader);
- resizePageDataMemoryBlockIfFirstValueChunkExceedsLimit(
- valueChunkList, currentValueChunkPageMemorySize);
+ cachedAlignedValueChunk = null;
}
- valueChunkSize += chunkHeader.getDataSize();
- valueChunkPageMemorySize += currentValueChunkPageMemorySize;
- valueChunkList.add(chunk);
- final String measurementID =
- tabletStringInternPool.intern(chunkHeader.getMeasurementID());
- currentMeasurements.add(
- new MeasurementSchema(
- measurementID,
- chunkHeader.getDataType(),
- chunkHeader.getEncodingType(),
- chunkHeader.getCompressionType()));
- modsInfos.addAll(
- ModsOperationUtil.initializeMeasurementMods(
- currentDevice, Collections.singletonList(measurementID),
currentModifications));
+ if (returnPendingAlignedChunkBeforeCaching(valueChunk)) {
+ return;
+ }
+ cacheAlignedValueChunk(valueChunk);
break;
}
case MetaMarker.CHUNK_GROUP_HEADER:
{
- // Return before "currentDevice" changes
- if (recordAlignedChunk(valueChunkList, marker)) {
+ // Return before "currentDevice" changes.
+ if (useNextPendingAlignedChunk(marker)) {
return;
}
- // Clear because the cached data will never be used in the next
chunk group
- lastIndex = -1;
- timeChunkList.clear();
- isMultiPageList.clear();
- timeChunkPageMemorySizeList.clear();
- measurementIndexMap.clear();
+ clearCachedAlignedChunkData();
final IDeviceID deviceID =
tsFileSequenceReader.readChunkGroupHeader().getDeviceID();
currentDevice = treePattern.mayOverlapWithDevice(deviceID) ?
deviceID : null;
currentDeviceString =
@@ -775,7 +721,8 @@ public class TsFileInsertionEventScanParser extends
TsFileInsertionEventParser {
}
lastMarker = marker;
- if (!recordAlignedChunk(valueChunkList, marker)) {
+ if (!useNextPendingAlignedChunk(marker)) {
+ clearCachedAlignedChunkData();
chunkReader = null;
}
}
@@ -784,6 +731,10 @@ public class TsFileInsertionEventScanParser extends
TsFileInsertionEventParser {
return PipeConfig.getInstance().getPipeMaxReaderChunkSize();
}
+ private long getChunkMemoryLimitInBytes() {
+ return PipeConfig.getInstance().getPipeMaxReaderChunkSize();
+ }
+
private boolean filterChunk(
final long currentChunkHeaderOffset,
final ChunkHeader chunkHeader,
@@ -805,8 +756,6 @@ public class TsFileInsertionEventScanParser extends
TsFileInsertionEventParser {
timeChunkList.add(timeChunk);
final boolean isMultiPage = marker == MetaMarker.TIME_CHUNK_HEADER;
isMultiPageList.add(isMultiPage);
- timeChunkPageMemorySizeList.add(
-
SinglePageWholeChunkReader.calculateMaxPageEstimatedMemoryUsageInBytes(timeChunk));
return true;
}
}
@@ -859,22 +808,38 @@ public class TsFileInsertionEventScanParser extends
TsFileInsertionEventParser {
return false;
}
- private boolean recordAlignedChunk(final List<Chunk> valueChunkList, final
byte marker)
- throws IOException {
- if (!valueChunkList.isEmpty()) {
- final Chunk timeChunk = timeChunkList.get(lastIndex);
+ private boolean useNextPendingAlignedChunk(final byte marker) throws
IOException {
+ while (!pendingAlignedChunkGroups.isEmpty()) {
+ final PendingAlignedChunkGroup pendingAlignedChunkGroup =
pendingAlignedChunkGroups.remove(0);
+ pendingAlignedChunkSize =
+ Math.max(0, pendingAlignedChunkSize -
pendingAlignedChunkGroup.chunkSize);
+
+ if (pendingAlignedChunkGroup.valueChunkList.isEmpty()) {
+ continue;
+ }
+
+ final Chunk timeChunk =
timeChunkList.get(pendingAlignedChunkGroup.timeChunkIndex);
timeChunk.getData().rewind();
- currentIsMultiPage = isMultiPageList.get(lastIndex);
+ for (final Chunk valueChunk : pendingAlignedChunkGroup.valueChunkList) {
+ valueChunk.getData().rewind();
+ }
+
+ currentMeasurements.clear();
+ currentMeasurements.addAll(pendingAlignedChunkGroup.measurements);
+ modsInfos.clear();
+ modsInfos.addAll(pendingAlignedChunkGroup.modsInfos);
+
+ currentIsMultiPage =
isMultiPageList.get(pendingAlignedChunkGroup.timeChunkIndex);
if (!currentIsMultiPage) {
resizePageDataMemoryIfNeeded(
AlignedSinglePageWholeChunkReader.calculatePageEstimatedMemoryUsageInBytes(
- timeChunk, valueChunkList));
+ timeChunk, pendingAlignedChunkGroup.valueChunkList));
}
final List<Long> pageEstimatedMemoryUsageInBytesList =
currentIsMultiPage
? AlignedSinglePageWholeChunkReader
.calculatePageEstimatedMemoryUsageInBytesWithBatchDataList(
- timeChunk, valueChunkList)
+ timeChunk, pendingAlignedChunkGroup.valueChunkList)
: Collections.emptyList();
final long maxPageEstimatedMemoryUsageInBytes =
pageEstimatedMemoryUsageInBytesList.isEmpty()
@@ -884,50 +849,226 @@ public class TsFileInsertionEventScanParser extends
TsFileInsertionEventParser {
chunkReader =
currentIsMultiPage
? new MemoryControlledChunkReader(
- new AlignedChunkReader(timeChunk, valueChunkList, filter),
+ new AlignedChunkReader(
+ timeChunk, pendingAlignedChunkGroup.valueChunkList,
filter),
pageEstimatedMemoryUsageInBytesList)
- : new AlignedSinglePageWholeChunkReader(timeChunk,
valueChunkList, null);
+ : new AlignedSinglePageWholeChunkReader(
+ timeChunk, pendingAlignedChunkGroup.valueChunkList, null);
currentIsAligned = true;
- lastMarker = marker;
+ if (marker != Byte.MIN_VALUE) {
+ lastMarker = marker;
+ }
+ return true;
+ }
+ return false;
+ }
+
+ private boolean shouldReturnPendingAlignedChunkBeforeCaching(
+ final CachedAlignedValueChunk valueChunk) throws IOException {
+ validateAlignedValueChunkTimeIndex(valueChunk.timeChunkIndex);
+
+ final PendingAlignedChunkGroup pendingAlignedChunkGroup =
+ findPendingAlignedChunkGroup(valueChunk.timeChunkIndex);
+ final boolean isFirstValueChunkInGroup =
+ Objects.isNull(pendingAlignedChunkGroup)
+ || pendingAlignedChunkGroup.valueChunkList.isEmpty();
+ final long timeChunkSize =
+ Objects.isNull(pendingAlignedChunkGroup)
+ ? PipeMemoryWeightUtil.calculateChunkRamBytesUsed(
+ timeChunkList.get(valueChunk.timeChunkIndex))
+ : 0;
+ final long chunkSizeAfterCaching =
+ pendingAlignedChunkSize + timeChunkSize + valueChunk.valueChunkSize;
+
+ if (isFirstValueChunkInGroup) {
+ final long firstValueChunkGroupSize =
+ timeChunkSize
+ + (Objects.isNull(pendingAlignedChunkGroup) ? 0 :
pendingAlignedChunkGroup.chunkSize)
+ + valueChunk.valueChunkSize;
+ if (firstValueChunkGroupSize > getChunkMemoryLimitInBytes()) {
+ return !pendingAlignedChunkGroups.isEmpty();
+ }
+ }
+
+ if (!pendingAlignedChunkGroups.isEmpty()
+ && chunkSizeAfterCaching > getChunkMemoryLimitInBytes()) {
+ return true;
+ }
+
+ final long pageMemorySizeAfterCaching =
+ calculateMaxAlignedPageMemorySizeWithBatchData(
+ valueChunk.timeChunkIndex, pendingAlignedChunkGroup, valueChunk);
+ return pageMemorySizeAfterCaching > getPageDataMemoryLimitInBytes()
+ && (!isFirstValueChunkInGroup || !pendingAlignedChunkGroups.isEmpty());
+ }
+
+ private boolean returnPendingAlignedChunkBeforeCaching(final
CachedAlignedValueChunk valueChunk)
+ throws IOException {
+ if (!shouldReturnPendingAlignedChunkBeforeCaching(valueChunk)) {
+ return false;
+ }
+
+ cachedAlignedValueChunk = valueChunk;
+ if (useNextPendingAlignedChunk(Byte.MIN_VALUE)) {
return true;
}
+ cachedAlignedValueChunk = null;
return false;
}
+ private void cacheAlignedValueChunk(final CachedAlignedValueChunk
valueChunk) throws IOException {
+ validateAlignedValueChunkTimeIndex(valueChunk.timeChunkIndex);
+
+ final PendingAlignedChunkGroup pendingAlignedChunkGroup =
+ getOrCreatePendingAlignedChunkGroup(valueChunk.timeChunkIndex);
+
resizeChunkMemoryBlockIfFirstValueChunkExceedsLimit(pendingAlignedChunkGroup,
valueChunk);
+
resizePageDataMemoryBlockIfFirstValueChunkExceedsLimit(pendingAlignedChunkGroup,
valueChunk);
+
+ pendingAlignedChunkGroup.valueChunkList.add(valueChunk.chunk);
+ pendingAlignedChunkGroup.chunkSize += valueChunk.valueChunkSize;
+ pendingAlignedChunkSize += valueChunk.valueChunkSize;
+
+ final ChunkHeader chunkHeader = valueChunk.chunk.getHeader();
+ final String measurementID =
tabletStringInternPool.intern(chunkHeader.getMeasurementID());
+ pendingAlignedChunkGroup.measurements.add(
+ new MeasurementSchema(
+ measurementID,
+ chunkHeader.getDataType(),
+ chunkHeader.getEncodingType(),
+ chunkHeader.getCompressionType()));
+ pendingAlignedChunkGroup.modsInfos.addAll(
+ ModsOperationUtil.initializeMeasurementMods(
+ currentDevice, Collections.singletonList(measurementID),
currentModifications));
+ }
+
+ private PendingAlignedChunkGroup getOrCreatePendingAlignedChunkGroup(final
int timeChunkIndex) {
+ final PendingAlignedChunkGroup pendingAlignedChunkGroup =
+ findPendingAlignedChunkGroup(timeChunkIndex);
+ if (Objects.nonNull(pendingAlignedChunkGroup)) {
+ return pendingAlignedChunkGroup;
+ }
+
+ final PendingAlignedChunkGroup newPendingAlignedChunkGroup =
+ new PendingAlignedChunkGroup(
+ timeChunkIndex,
+
PipeMemoryWeightUtil.calculateChunkRamBytesUsed(timeChunkList.get(timeChunkIndex)));
+ pendingAlignedChunkSize += newPendingAlignedChunkGroup.chunkSize;
+
+ for (int i = 0; i < pendingAlignedChunkGroups.size(); ++i) {
+ if (pendingAlignedChunkGroups.get(i).timeChunkIndex > timeChunkIndex) {
+ pendingAlignedChunkGroups.add(i, newPendingAlignedChunkGroup);
+ return newPendingAlignedChunkGroup;
+ }
+ }
+ pendingAlignedChunkGroups.add(newPendingAlignedChunkGroup);
+ return newPendingAlignedChunkGroup;
+ }
+
+ private PendingAlignedChunkGroup findPendingAlignedChunkGroup(final int
timeChunkIndex) {
+ for (final PendingAlignedChunkGroup pendingAlignedChunkGroup :
pendingAlignedChunkGroups) {
+ if (pendingAlignedChunkGroup.timeChunkIndex == timeChunkIndex) {
+ return pendingAlignedChunkGroup;
+ }
+ }
+ return null;
+ }
+
+ private void validateAlignedValueChunkTimeIndex(final int timeChunkIndex)
throws IOException {
+ if (timeChunkIndex < 0 || timeChunkIndex >= timeChunkList.size()) {
+ throw new IOException(
+ String.format(
+ "Invalid aligned value chunk index %d, while there are %d time
chunks.",
+ timeChunkIndex, timeChunkList.size()));
+ }
+ }
+
+ private void clearCachedAlignedChunkData() {
+ pendingAlignedChunkGroups.clear();
+ pendingAlignedChunkSize = 0;
+ cachedAlignedValueChunk = null;
+ timeChunkList.clear();
+ isMultiPageList.clear();
+ measurementIndexMap.clear();
+ }
+
private void resizeChunkMemoryBlockIfFirstValueChunkExceedsLimit(
- final List<Chunk> valueChunkList, final ChunkHeader valueChunkHeader) {
- if (!valueChunkList.isEmpty() || lastIndex < 0) {
+ final PendingAlignedChunkGroup pendingAlignedChunkGroup,
+ final CachedAlignedValueChunk valueChunk) {
+ if (!pendingAlignedChunkGroup.valueChunkList.isEmpty()) {
return;
}
- final long chunkSize =
-
PipeMemoryWeightUtil.calculateChunkRamBytesUsed(timeChunkList.get(lastIndex))
- + valueChunkHeader.getDataSize();
+ final long chunkSize = pendingAlignedChunkGroup.chunkSize +
valueChunk.valueChunkSize;
if (chunkSize > allocatedMemoryBlockForChunk.getMemoryUsageInBytes()) {
PipeDataNodeResourceManager.memory().forceResize(allocatedMemoryBlockForChunk,
chunkSize);
}
}
private void resizePageDataMemoryBlockIfFirstValueChunkExceedsLimit(
- final List<Chunk> valueChunkList, final long valueChunkPageMemorySize) {
- if (!valueChunkList.isEmpty() || lastIndex < 0 || valueChunkPageMemorySize
<= 0) {
- return;
- }
-
- final long timeChunkPageMemorySize =
timeChunkPageMemorySizeList.get(lastIndex);
- if (timeChunkPageMemorySize <= 0) {
+ final PendingAlignedChunkGroup pendingAlignedChunkGroup,
+ final CachedAlignedValueChunk valueChunk)
+ throws IOException {
+ if (!pendingAlignedChunkGroup.valueChunkList.isEmpty()) {
return;
}
- final long pageMemorySize = timeChunkPageMemorySize +
valueChunkPageMemorySize;
+ final long pageMemorySize =
+ calculateMaxAlignedPageMemorySizeWithBatchData(
+ pendingAlignedChunkGroup.timeChunkIndex, pendingAlignedChunkGroup,
valueChunk);
if (pageMemorySize > getPageDataMemoryLimitInBytes()) {
PipeDataNodeResourceManager.memory()
.forceResize(allocatedMemoryBlockForBatchData, pageMemorySize);
}
}
- private long calculateMaxPageMemorySize(final Chunk chunk) throws
IOException {
- return
SinglePageWholeChunkReader.calculateMaxPageEstimatedMemoryUsageInBytes(chunk);
+ private long calculateMaxAlignedPageMemorySizeWithBatchData(
+ final int timeChunkIndex,
+ final PendingAlignedChunkGroup pendingAlignedChunkGroup,
+ final CachedAlignedValueChunk valueChunk)
+ throws IOException {
+ final List<Chunk> valueChunkList =
+ new ArrayList<>(
+ (Objects.isNull(pendingAlignedChunkGroup)
+ ? 0
+ : pendingAlignedChunkGroup.valueChunkList.size())
+ + 1);
+ if (Objects.nonNull(pendingAlignedChunkGroup)) {
+ valueChunkList.addAll(pendingAlignedChunkGroup.valueChunkList);
+ }
+ valueChunkList.add(valueChunk.chunk);
+
+ final Chunk timeChunk = timeChunkList.get(timeChunkIndex);
+ final int timeChunkDataPosition = timeChunk.getData().position();
+ final List<Integer> valueChunkDataPositions = new
ArrayList<>(valueChunkList.size());
+ for (final Chunk chunk : valueChunkList) {
+ valueChunkDataPositions.add(Objects.isNull(chunk) ? 0 :
chunk.getData().position());
+ }
+
+ rewindChunkData(timeChunk);
+ valueChunkList.forEach(this::rewindChunkData);
+ try {
+ return AlignedSinglePageWholeChunkReader
+ .calculateMaxPageEstimatedMemoryUsageInBytesWithBatchData(timeChunk,
valueChunkList);
+ } finally {
+ timeChunk.getData().position(timeChunkDataPosition);
+ for (int i = 0; i < valueChunkList.size(); ++i) {
+ final Chunk chunk = valueChunkList.get(i);
+ if (Objects.nonNull(chunk)) {
+ chunk.getData().position(valueChunkDataPositions.get(i));
+ }
+ }
+ }
+ }
+
+ private void rewindChunkData(final Chunk chunk) {
+ if (Objects.isNull(chunk)) {
+ return;
+ }
+
+ final ByteBuffer chunkData = chunk.getData();
+ if (Objects.nonNull(chunkData)) {
+ chunkData.rewind();
+ }
}
private boolean isSinglePageValueChunk(final ChunkHeader chunkHeader) {
@@ -980,4 +1121,32 @@ public class TsFileInsertionEventScanParser extends
TsFileInsertionEventParser {
}
return null;
}
+
+ private static class PendingAlignedChunkGroup {
+
+ private final int timeChunkIndex;
+ private final List<Chunk> valueChunkList = new ArrayList<>();
+ private final List<IMeasurementSchema> measurements = new ArrayList<>();
+ private final List<ModsOperationUtil.ModsInfo> modsInfos = new
ArrayList<>();
+ private long chunkSize;
+
+ private PendingAlignedChunkGroup(final int timeChunkIndex, final long
timeChunkSize) {
+ this.timeChunkIndex = timeChunkIndex;
+ this.chunkSize = timeChunkSize;
+ }
+ }
+
+ private static class CachedAlignedValueChunk {
+
+ private final int timeChunkIndex;
+ private final Chunk chunk;
+ private final long valueChunkSize;
+
+ private CachedAlignedValueChunk(
+ final int timeChunkIndex, final Chunk chunk, final long
valueChunkSize) {
+ this.timeChunkIndex = timeChunkIndex;
+ this.chunk = chunk;
+ this.valueChunkSize = valueChunkSize;
+ }
+ }
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java
index 04eff0067e5..33ceacf278c 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java
@@ -191,6 +191,14 @@ public class PipeMemoryWeightUtil {
return new Pair<>(1, 0);
}
+ final int configuredTabletRowSize =
+ PipeConfig.getInstance().getPipeDataStructureTabletRowSize();
+ final boolean hasTabletRowSizeLimit = configuredTabletRowSize > 0;
+ final double inputSizeLimit =
+ hasTabletRowSizeLimit && inputNum > 0
+ ? 100 + inputNum * (double) rowBytesUsed * 1.2
+ : Integer.MAX_VALUE;
+
// Calculate row number according to the max size of a pipe tablet. "100"
is the estimated size
// of other data structures in a pipe tablet.
// "*8" converts bytes to bits, because the bitmap size is 1 bit per
schema.
@@ -198,17 +206,15 @@ public class PipeMemoryWeightUtil {
(int)
Math.min(
IoTDBDescriptor.getInstance().getConfig().getPipeDataStructureTabletSizeInBytes(),
- Math.min(Integer.MAX_VALUE, 100 + inputNum * (double)
rowBytesUsed * 1.2));
+ Math.min(Integer.MAX_VALUE, inputSizeLimit));
int rowNumber = 8 * (sizeLimit - 100) / (8 * rowBytesUsed + schemaCount);
rowNumber = Math.max(1, rowNumber);
- if ( // This means the row number is larger than the max row count of a
pipe tablet
- rowNumber > PipeConfig.getInstance().getPipeDataStructureTabletRowSize()) {
+ // This means the row number is larger than the max row count of a pipe
tablet.
+ if (hasTabletRowSizeLimit && rowNumber > configuredTabletRowSize) {
// Bound the row number, the memory cost is rowSize * rowNumber
- return new Pair<>(
- PipeConfig.getInstance().getPipeDataStructureTabletRowSize(),
- rowBytesUsed *
PipeConfig.getInstance().getPipeDataStructureTabletRowSize());
+ return new Pair<>(configuredTabletRowSize, rowBytesUsed *
configuredTabletRowSize);
} else {
return new Pair<>(rowNumber, sizeLimit);
}
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java
index 50109b935c3..91b6d7d56c5 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java
@@ -36,6 +36,8 @@ import
org.apache.iotdb.db.pipe.event.common.tsfile.parser.scan.TsFileInsertionE
import
org.apache.iotdb.db.pipe.event.common.tsfile.parser.table.TsFileInsertionEventTableParser;
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.io.CompactionTsFileWriter;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.constant.CompactionType;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.utils.CompactionTestFileWriter;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
@@ -55,6 +57,7 @@ import org.apache.tsfile.file.metadata.TableSchema;
import org.apache.tsfile.file.metadata.enums.CompressionType;
import org.apache.tsfile.file.metadata.enums.TSEncoding;
import org.apache.tsfile.read.TsFileSequenceReader;
+import org.apache.tsfile.read.common.BatchData;
import org.apache.tsfile.read.common.Chunk;
import org.apache.tsfile.read.common.Path;
import org.apache.tsfile.read.common.TimeRange;
@@ -63,9 +66,11 @@ import org.apache.tsfile.utils.BitMap;
import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.utils.TsFileGeneratorUtils;
import org.apache.tsfile.write.TsFileWriter;
+import org.apache.tsfile.write.chunk.AlignedChunkWriterImpl;
import org.apache.tsfile.write.record.Tablet;
import org.apache.tsfile.write.schema.IMeasurementSchema;
import org.apache.tsfile.write.schema.MeasurementSchema;
+import org.apache.tsfile.write.writer.TsFileIOWriter;
import org.junit.After;
import org.junit.Assert;
import org.junit.Assume;
@@ -84,6 +89,7 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
+import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
@@ -436,6 +442,172 @@ public class TsFileInsertionEventParserTest {
}
}
+  /**
+   * Verifies that the scan parser stitches aligned value chunks that were written in separate
+   * batches (all time chunks first, then value chunks batch by batch) back into tablets spanning
+   * every measurement, given a generous (1 MiB) reader chunk size.
+   */
+  @Test
+  public void testScanParserMergesBatchedAlignedValueChunkGroups() throws Exception {
+    // Snapshot the config values mutated below so the finally block can restore them.
+    final long savedMaxReaderChunkSize =
+        CommonDescriptor.getInstance().getConfig().getPipeMaxReaderChunkSize();
+    final int savedTabletRowSize =
+        CommonDescriptor.getInstance().getConfig().getPipeDataStructureTabletRowSize();
+
+    final int totalMeasurements = 20;
+    final int measurementsPerBatch = 10;
+    final int totalRows = 4;
+    final File sourceFile = new File("aligned-source-for-batched-layout.tsfile");
+    alignedTsFile = new File("aligned-batched-layout.tsfile");
+
+    try {
+      CommonDescriptor.getInstance().getConfig().setPipeMaxReaderChunkSize(1024 * 1024L);
+      // A non-positive tablet row size lifts the per-tablet row cap.
+      CommonDescriptor.getInstance().getConfig().setPipeDataStructureTabletRowSize(0);
+
+      final List<IMeasurementSchema> schemas = new ArrayList<>();
+      for (int m = 0; m < totalMeasurements; ++m) {
+        schemas.add(new MeasurementSchema("s" + m, TSDataType.INT64));
+      }
+
+      writeAlignedSourceTsFile(sourceFile, schemas, totalRows);
+      rewriteAlignedTsFileWithBatchedValueChunks(
+          sourceFile, alignedTsFile, totalMeasurements, measurementsPerBatch);
+
+      int parsedTablets = 0;
+      int parsedPoints = 0;
+      try (final TsFileInsertionEventScanParser scanParser =
+          new TsFileInsertionEventScanParser(
+              alignedTsFile,
+              new PrefixTreePattern("root"),
+              Long.MIN_VALUE,
+              Long.MAX_VALUE,
+              null,
+              null,
+              false)) {
+        for (final Pair<Tablet, Boolean> parsed : scanParser.toTabletWithIsAligneds()) {
+          Assert.assertTrue(parsed.getRight());
+          final Tablet tablet = parsed.getLeft();
+          parsedTablets++;
+          // Each tablet must span all measurements: the value-chunk batches were merged back.
+          Assert.assertEquals(totalMeasurements, tablet.getSchemas().size());
+          // Each source aligned chunk holds half of the rows.
+          Assert.assertEquals(totalRows / 2, tablet.getRowSize());
+          parsedPoints += getNonNullSize(tablet);
+        }
+      }
+
+      Assert.assertEquals(totalMeasurements * totalRows, parsedPoints);
+      Assert.assertEquals(2, parsedTablets);
+    } finally {
+      CommonDescriptor.getInstance()
+          .getConfig()
+          .setPipeMaxReaderChunkSize(savedMaxReaderChunkSize);
+      CommonDescriptor.getInstance()
+          .getConfig()
+          .setPipeDataStructureTabletRowSize(savedTabletRowSize);
+      sourceFile.delete();
+    }
+  }
+
+  /**
+   * Verifies that the scan parser emits tablets no wider than one batch of value chunks when the
+   * reader chunk size limit covers the time chunk plus only the first batch of value chunks.
+   */
+  @Test
+  public void testScanParserFlushesBatchedAlignedValueChunkGroupsByMemoryLimit() throws Exception {
+    // Snapshot the config values mutated below so the finally block can restore them.
+    final long savedMaxReaderChunkSize =
+        CommonDescriptor.getInstance().getConfig().getPipeMaxReaderChunkSize();
+    final int savedTabletRowSize =
+        CommonDescriptor.getInstance().getConfig().getPipeDataStructureTabletRowSize();
+
+    final int totalMeasurements = 20;
+    final int measurementsPerBatch = 10;
+    final int totalRows = 4;
+    final File sourceFile = new File("aligned-source-for-batched-layout-memory-limit.tsfile");
+    alignedTsFile = new File("aligned-batched-layout-memory-limit.tsfile");
+
+    try {
+      // A non-positive tablet row size lifts the per-tablet row cap.
+      CommonDescriptor.getInstance().getConfig().setPipeDataStructureTabletRowSize(0);
+
+      final List<IMeasurementSchema> schemas = new ArrayList<>();
+      for (int m = 0; m < totalMeasurements; ++m) {
+        schemas.add(new MeasurementSchema("s" + m, TSDataType.INT64));
+      }
+
+      writeAlignedSourceTsFile(sourceFile, schemas, totalRows);
+      rewriteAlignedTsFileWithBatchedValueChunks(
+          sourceFile, alignedTsFile, totalMeasurements, measurementsPerBatch);
+
+      // Budget that covers the first value-chunk batch but is strictly below the two-batch figure.
+      final long oneBatchLimit =
+          calculateFirstBatchedAlignedValueChunkGroupMemoryLimit(
+              alignedTsFile, measurementsPerBatch);
+      CommonDescriptor.getInstance().getConfig().setPipeMaxReaderChunkSize(oneBatchLimit);
+
+      int parsedTablets = 0;
+      int widestTablet = 0;
+      int parsedPoints = 0;
+      try (final TsFileInsertionEventScanParser scanParser =
+          new TsFileInsertionEventScanParser(
+              alignedTsFile,
+              new PrefixTreePattern("root"),
+              Long.MIN_VALUE,
+              Long.MAX_VALUE,
+              null,
+              null,
+              false)) {
+        for (final Pair<Tablet, Boolean> parsed : scanParser.toTabletWithIsAligneds()) {
+          Assert.assertTrue(parsed.getRight());
+          final Tablet tablet = parsed.getLeft();
+          final int columnCount = tablet.getSchemas().size();
+          parsedTablets++;
+          widestTablet = Math.max(widestTablet, columnCount);
+          Assert.assertTrue(columnCount <= measurementsPerBatch);
+          Assert.assertEquals(totalRows / 2, tablet.getRowSize());
+          parsedPoints += getNonNullSize(tablet);
+        }
+      }
+
+      Assert.assertEquals(measurementsPerBatch, widestTablet);
+      Assert.assertEquals(totalMeasurements * totalRows, parsedPoints);
+      // One tablet per (measurement batch, source aligned chunk) pair.
+      Assert.assertEquals(totalMeasurements / measurementsPerBatch * 2, parsedTablets);
+    } finally {
+      CommonDescriptor.getInstance()
+          .getConfig()
+          .setPipeMaxReaderChunkSize(savedMaxReaderChunkSize);
+      CommonDescriptor.getInstance()
+          .getConfig()
+          .setPipeDataStructureTabletRowSize(savedTabletRowSize);
+      sourceFile.delete();
+    }
+  }
+
+  /**
+   * Verifies that a positive tablet row size caps the row count computed by {@code
+   * PipeMemoryWeightUtil.calculateTabletRowCountAndMemory}, while 0 and -1 both lift the cap and
+   * yield the same (larger) row count.
+   */
+  @Test
+  public void testPipeTabletRowSizeCanBeDisabledByNonPositiveValue() {
+    // Snapshot the config values mutated below so the finally block can restore them.
+    final int savedTabletRowSize =
+        CommonDescriptor.getInstance().getConfig().getPipeDataStructureTabletRowSize();
+    final int savedTabletSizeInBytes =
+        CommonDescriptor.getInstance().getConfig().getPipeDataStructureTabletSizeInBytes();
+
+    try {
+      CommonDescriptor.getInstance().getConfig().setPipeDataStructureTabletSizeInBytes(1024 * 1024);
+
+      // 1000 INT64 points: far more than the explicit row cap of 2 applied below.
+      final BatchData points = new BatchData(TSDataType.INT64);
+      for (int t = 0; t < 1000; ++t) {
+        points.putAnObject(t, (long) t);
+      }
+
+      CommonDescriptor.getInstance().getConfig().setPipeDataStructureTabletRowSize(2);
+      final int cappedRowCount =
+          PipeMemoryWeightUtil.calculateTabletRowCountAndMemory(points).getLeft();
+
+      CommonDescriptor.getInstance().getConfig().setPipeDataStructureTabletRowSize(0);
+      final int zeroCapRowCount =
+          PipeMemoryWeightUtil.calculateTabletRowCountAndMemory(points).getLeft();
+
+      CommonDescriptor.getInstance().getConfig().setPipeDataStructureTabletRowSize(-1);
+      final int negativeCapRowCount =
+          PipeMemoryWeightUtil.calculateTabletRowCountAndMemory(points).getLeft();
+
+      Assert.assertEquals(zeroCapRowCount, negativeCapRowCount);
+      Assert.assertEquals(2, cappedRowCount);
+      Assert.assertTrue(zeroCapRowCount > cappedRowCount);
+    } finally {
+      CommonDescriptor.getInstance()
+          .getConfig()
+          .setPipeDataStructureTabletRowSize(savedTabletRowSize);
+      CommonDescriptor.getInstance()
+          .getConfig()
+          .setPipeDataStructureTabletSizeInBytes(savedTabletSizeInBytes);
+    }
+  }
+
@Test
public void testQueryParserSkipsUnnecessaryBitMaps() throws Exception {
testTreeParserSkipsUnnecessaryBitMaps(true);
@@ -1340,8 +1512,8 @@ public class TsFileInsertionEventParserTest {
Assert.assertTrue(iterator.hasNext());
Tablet parsedTablet = ((PipeRawTabletInsertionEvent)
iterator.next()).convertToTablet();
if (parsedTablet.getSchemas().size() > 1) {
- assertBitMapExistence(parsedTablet, false, true);
- Assert.assertTrue(parsedTablet.isNull(1, 1));
+ assertBitMapExistenceByMeasurement(parsedTablet, Map.of("dense",
false, "sparse", true));
+ Assert.assertTrue(parsedTablet.isNull(1,
getColumnIndex(parsedTablet, "sparse")));
Assert.assertFalse(iterator.hasNext());
} else {
Assert.assertNull(parsedTablet.getBitMaps());
@@ -1373,6 +1545,33 @@ public class TsFileInsertionEventParserTest {
}
}
+  /**
+   * Asserts, for every column of {@code tablet}, whether a bitmap is present, looking columns up by
+   * measurement name rather than by position.
+   *
+   * <p>The tablet must contain exactly the measurements listed in the map. Each failure message
+   * names the offending measurement.
+   *
+   * @param tablet the parsed tablet to inspect; it must carry a bitmap array
+   * @param expectedMeasurementHasBitMap for every measurement expected in {@code tablet}, whether a
+   *     non-null bitmap is expected at that measurement's column
+   */
+  private void assertBitMapExistenceByMeasurement(
+      final Tablet tablet, final Map<String, Boolean> expectedMeasurementHasBitMap) {
+    final List<IMeasurementSchema> schemas = tablet.getSchemas();
+    final BitMap[] bitMaps = tablet.getBitMaps();
+    Assert.assertNotNull("Tablet has no bitmap array", bitMaps);
+    Assert.assertEquals(schemas.size(), bitMaps.length);
+    Assert.assertEquals(expectedMeasurementHasBitMap.size(), schemas.size());
+    for (int i = 0; i < schemas.size(); ++i) {
+      final String measurement = schemas.get(i).getMeasurementName();
+      // A single lookup: also rejects map entries with a null value instead of NPE-ing on unboxing.
+      final Boolean expectBitMap = expectedMeasurementHasBitMap.get(measurement);
+      Assert.assertNotNull("Unexpected measurement in tablet: " + measurement, expectBitMap);
+      Assert.assertEquals(
+          "Unexpected bitmap presence for measurement " + measurement,
+          expectBitMap,
+          bitMaps[i] != null);
+    }
+  }
+
+  /**
+   * Returns the index of the first column of {@code tablet} whose measurement name equals {@code
+   * measurement}, failing the test when no such column exists.
+   */
+  private int getColumnIndex(final Tablet tablet, final String measurement) {
+    int columnIndex = 0;
+    for (final IMeasurementSchema schema : tablet.getSchemas()) {
+      if (schema.getMeasurementName().equals(measurement)) {
+        return columnIndex;
+      }
+      columnIndex++;
+    }
+    // Same error JUnit's fail(message) raises, without the unreachable fallback return.
+    throw new AssertionError(
+        String.format("Measurement %s does not exist in tablet.", measurement));
+  }
+
private void generateLargeAlignedTsFile(
final File tsFile,
final List<IMeasurementSchema> schemaList,
@@ -1887,6 +2086,146 @@ public class TsFileInsertionEventParserTest {
}
}
+  /**
+   * Computes a memory limit covering the first aligned chunk group's time chunk plus its first
+   * {@code batchSize} value chunks, and asserts that it is strictly below the corresponding figure
+   * for the first {@code 2 * batchSize} value chunks.
+   *
+   * <p>Two measures are considered: the raw chunk size (time chunk RAM usage plus value chunk data
+   * sizes) and the estimated max page memory. The larger of the two for the first batch is returned.
+   *
+   * @param tsFile file whose first device must have exactly two aligned chunk metadata entries
+   * @param batchSize number of value chunks in one batch
+   * @return the memory figure covering exactly one batch of value chunks
+   */
+  private long calculateFirstBatchedAlignedValueChunkGroupMemoryLimit(
+      final File tsFile, final int batchSize) throws Exception {
+    try (final TsFileSequenceReader reader = new TsFileSequenceReader(tsFile.getAbsolutePath())) {
+      final IDeviceID firstDevice = reader.getDeviceMeasurementsMap().keySet().iterator().next();
+      final List<AbstractAlignedChunkMetadata> chunkGroups =
+          reader.getAlignedChunkMetadata(firstDevice, true);
+      Assert.assertEquals(2, chunkGroups.size());
+
+      final AbstractAlignedChunkMetadata firstGroup = chunkGroups.get(0);
+      final Chunk timeChunk =
+          reader.readMemChunk((ChunkMetadata) firstGroup.getTimeChunkMetadata());
+      final List<IChunkMetadata> valueMetadata = firstGroup.getValueChunkMetadataList();
+      Assert.assertTrue(valueMetadata.size() >= batchSize * 2);
+
+      final List<Chunk> oneBatch = new ArrayList<>();
+      final List<Chunk> twoBatches = new ArrayList<>();
+      final long timeChunkRamBytes = PipeMemoryWeightUtil.calculateChunkRamBytesUsed(timeChunk);
+      long oneBatchDataSize = 0;
+      long twoBatchesDataSize = 0;
+      // Load the first two batches of value chunks in file order and sum their data sizes.
+      for (int i = 0; i < batchSize * 2; ++i) {
+        final Chunk valueChunk = reader.readMemChunk((ChunkMetadata) valueMetadata.get(i));
+        final long dataSize = valueChunk.getHeader().getDataSize();
+        if (i < batchSize) {
+          oneBatch.add(valueChunk);
+          oneBatchDataSize += dataSize;
+        }
+        twoBatches.add(valueChunk);
+        twoBatchesDataSize += dataSize;
+      }
+      final long oneBatchChunkSize = timeChunkRamBytes + oneBatchDataSize;
+      final long twoBatchesChunkSize = timeChunkRamBytes + twoBatchesDataSize;
+
+      final long oneBatchPageMemory =
+          AlignedSinglePageWholeChunkReader
+              .calculateMaxPageEstimatedMemoryUsageInBytesWithBatchData(timeChunk, oneBatch);
+      final long twoBatchesPageMemory =
+          AlignedSinglePageWholeChunkReader
+              .calculateMaxPageEstimatedMemoryUsageInBytesWithBatchData(timeChunk, twoBatches);
+
+      // Both measures must strictly grow with the second batch, so the returned maximum is
+      // strictly below the maximum of the two-batch measures.
+      Assert.assertTrue(twoBatchesChunkSize > oneBatchChunkSize);
+      Assert.assertTrue(twoBatchesPageMemory > oneBatchPageMemory);
+      return Math.max(oneBatchChunkSize, oneBatchPageMemory);
+    }
+  }
+
+  /**
+   * Writes {@code tsFile} with one aligned device {@code root.sg.d} in a single chunk group holding
+   * two aligned chunks of {@code rowCount / 2} rows each. Every measurement has a value at every
+   * timestamp.
+   *
+   * <p>Timestamps run from 0 to {@code rowCount - 1}. The value of measurement index {@code m} at
+   * time {@code t} is {@code t * 100 + m}. Any existing file at {@code tsFile} is deleted first.
+   */
+  private void writeAlignedSourceTsFile(
+      final File tsFile, final List<IMeasurementSchema> schemaList, final int rowCount)
+      throws IOException {
+    if (tsFile.exists()) {
+      Assert.assertTrue(tsFile.delete());
+    }
+    Assert.assertEquals(0, rowCount % 2);
+
+    final IDeviceID deviceID = IDeviceID.Factory.DEFAULT_FACTORY.create("root.sg.d");
+    final int rowsPerChunk = rowCount / 2;
+    try (final TsFileIOWriter writer = new TsFileIOWriter(tsFile)) {
+      writer.startChunkGroup(deviceID);
+      // Timestamps keep increasing across both chunks.
+      long time = 0;
+      for (int chunk = 0; chunk < 2; ++chunk) {
+        final AlignedChunkWriterImpl chunkWriter = new AlignedChunkWriterImpl(schemaList);
+        for (int row = 0; row < rowsPerChunk; ++row, ++time) {
+          chunkWriter.getTimeChunkWriter().write(time);
+          for (int m = 0; m < schemaList.size(); ++m) {
+            chunkWriter.getValueChunkWriterByIndex(m).write(time, time * 100 + m, false);
+          }
+        }
+        chunkWriter.writeToFileWriter(writer);
+      }
+      writer.endChunkGroup();
+      writer.endFile();
+    }
+  }
+
+  /**
+   * Copies the two aligned chunks of the first device in {@code sourceTsFile} into {@code
+   * targetTsFile} using a batched layout. All time chunks are written first. The value chunks then
+   * follow in batches of at most {@code batchSize} measurements, each batch containing that
+   * measurement range from every source aligned chunk. Any existing target file is deleted first.
+   */
+  private void rewriteAlignedTsFileWithBatchedValueChunks(
+      final File sourceTsFile,
+      final File targetTsFile,
+      final int measurementCount,
+      final int batchSize)
+      throws Exception {
+    if (targetTsFile.exists()) {
+      Assert.assertTrue(targetTsFile.delete());
+    }
+
+    try (final TsFileSequenceReader reader =
+        new TsFileSequenceReader(sourceTsFile.getAbsolutePath())) {
+      final IDeviceID deviceID = reader.getDeviceMeasurementsMap().keySet().iterator().next();
+      final List<AbstractAlignedChunkMetadata> sourceChunks =
+          reader.getAlignedChunkMetadata(deviceID, true);
+      Assert.assertEquals(2, sourceChunks.size());
+      for (final AbstractAlignedChunkMetadata sourceChunk : sourceChunks) {
+        Assert.assertEquals(measurementCount, sourceChunk.getValueChunkMetadataList().size());
+      }
+
+      try (final CompactionTsFileWriter writer =
+          new CompactionTsFileWriter(
+              targetTsFile, Long.MAX_VALUE, CompactionType.INNER_SEQ_COMPACTION)) {
+        writer.startChunkGroup(deviceID);
+        writer.markStartingWritingAligned();
+
+        // The time chunks of every source aligned chunk come first...
+        for (final AbstractAlignedChunkMetadata sourceChunk : sourceChunks) {
+          final ChunkMetadata timeMetadata = (ChunkMetadata) sourceChunk.getTimeChunkMetadata();
+          writer.writeChunk(reader.readMemChunk(timeMetadata), timeMetadata);
+        }
+
+        // ...followed by the value chunks, one measurement batch at a time.
+        int batchStart = 0;
+        while (batchStart < measurementCount) {
+          final int batchEnd = Math.min(batchStart + batchSize, measurementCount);
+          writeValueChunkBatch(reader, writer, sourceChunks, batchStart, batchEnd);
+          batchStart += batchSize;
+        }
+
+        writer.markEndingWritingAligned();
+        writer.endChunkGroup();
+        writer.endFile();
+      }
+    }
+  }
+
+  /**
+   * Copies the value chunks at measurement indexes {@code [start, end)} of every entry in {@code
+   * alignedChunkMetadataList}, in list order, from {@code reader} into {@code writer}.
+   */
+  private void writeValueChunkBatch(
+      final TsFileSequenceReader reader,
+      final CompactionTsFileWriter writer,
+      final List<AbstractAlignedChunkMetadata> alignedChunkMetadataList,
+      final int start,
+      final int end)
+      throws IOException {
+    for (final AbstractAlignedChunkMetadata aligned : alignedChunkMetadataList) {
+      final List<IChunkMetadata> valueMetadata = aligned.getValueChunkMetadataList();
+      int measurementIndex = start;
+      while (measurementIndex < end) {
+        final ChunkMetadata metadata = (ChunkMetadata) valueMetadata.get(measurementIndex);
+        writer.writeChunk(reader.readMemChunk(metadata), metadata);
+        measurementIndex++;
+      }
+    }
+  }
+
private static class ParserPerformanceStats {
private long pointCount;