This is an automated email from the ASF dual-hosted git repository.
jt2594838 pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 65d2582cb05 Pipe: merge batched aligned chunks in scan parser (#18010) (#18041)
65d2582cb05 is described below
commit 65d2582cb05d979d2d32d772db6d7059b9dbbd21
Author: Caideyipi <[email protected]>
AuthorDate: Tue Jun 30 10:18:03 2026 +0800
Pipe: merge batched aligned chunks in scan parser (#18010) (#18041)
* Pipe: merge batched aligned chunks in scan parser
* Test pipe batched aligned chunk memory boundaries
* Pipe: fix batched aligned scan parser memory split
* Update TsFileInsertionEventParserTest.java
* Rename pending aligned chunk consumer
(cherry picked from commit f96fc5824f62b4c637c0d9b5e9ea4adc9f8b1853)
---
.../container/scan/SinglePageWholeChunkReader.java | 33 +-
.../scan/TsFileInsertionScanDataContainer.java | 527 +++++++++++++--------
.../pipe/resource/memory/PipeMemoryWeightUtil.java | 18 +-
.../event/TsFileInsertionDataContainerTest.java | 312 ++++++++++++
4 files changed, 691 insertions(+), 199 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/SinglePageWholeChunkReader.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/SinglePageWholeChunkReader.java
index f41a6861120..1f741ddfc70 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/SinglePageWholeChunkReader.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/SinglePageWholeChunkReader.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.pipe.event.common.tsfile.container.scan;
+import org.apache.tsfile.common.constant.TsFileConstant;
import org.apache.tsfile.compress.IUnCompressor;
import org.apache.tsfile.encoding.decoder.Decoder;
import org.apache.tsfile.enums.TSDataType;
@@ -159,22 +160,44 @@ public class SinglePageWholeChunkReader extends
AbstractChunkReader
static long estimatePageMemoryUsageInBytesWithBatchData(
final PageHeader timePageHeader,
final Chunk timeChunk,
- final List<TSDataType> valueDataTypeList) {
+ final List<TSDataType> valueDataTypeList)
+ throws IOException {
return estimatePageMemoryUsageInBytesWithBatchData(
timePageHeader.getUncompressedSize(),
getPageRowCount(timePageHeader, timeChunk),
valueDataTypeList);
}
- static int getPageRowCount(final PageHeader pageHeader, final Chunk chunk) {
+ static int getPageRowCount(final PageHeader pageHeader, final Chunk chunk)
throws IOException {
if (isSinglePageChunk(chunk.getHeader())) {
- return Objects.isNull(chunk.getChunkStatistic())
- ? 0
- : saturateToInt(chunk.getChunkStatistic().getCount());
+ if (Objects.nonNull(chunk.getChunkStatistic())) {
+ return saturateToInt(chunk.getChunkStatistic().getCount());
+ }
+ return isTimeChunk(chunk.getHeader()) ? countSinglePageTimeValues(chunk)
: 0;
}
return saturateToInt(pageHeader.getNumOfValues());
}
+ private static int countSinglePageTimeValues(final Chunk chunk) throws
IOException {
+ final ByteBuffer chunkDataBuffer = chunk.getData().duplicate();
+ final PageHeader pageHeader = deserializePageHeader(chunkDataBuffer,
chunk.getHeader());
+ final ByteBuffer pageData = deserializePageData(pageHeader,
chunkDataBuffer, chunk.getHeader());
+ final Decoder decoder =
+ Decoder.getDecoderByType(chunk.getHeader().getEncodingType(),
TSDataType.INT64);
+
+ int rowCount = 0;
+ while (decoder.hasNext(pageData)) {
+ decoder.readLong(pageData);
+ ++rowCount;
+ }
+ return rowCount;
+ }
+
+ private static boolean isTimeChunk(final ChunkHeader chunkHeader) {
+ return (chunkHeader.getChunkType() & TsFileConstant.TIME_COLUMN_MASK)
+ == TsFileConstant.TIME_COLUMN_MASK;
+ }
+
private static int saturateToInt(final long value) {
return (int) Math.min(Integer.MAX_VALUE, value);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java
index 6c3d341f774..6a0cfa4f3c3 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java
@@ -67,6 +67,7 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.Collections;
@@ -103,11 +104,11 @@ public class TsFileInsertionScanDataContainer extends
TsFileInsertionDataContain
// Cached time chunk
private final List<Chunk> timeChunkList = new ArrayList<>();
private final List<Boolean> isMultiPageList = new ArrayList<>();
- private final List<Long> timeChunkPageMemorySizeList = new ArrayList<>();
private final Map<String, Integer> measurementIndexMap = new HashMap<>();
- private int lastIndex = -1;
- private Chunk firstChunk4NextSequentialValueChunks;
+ private final List<PendingAlignedChunkGroup> pendingAlignedChunkGroups = new
ArrayList<>();
+ private long pendingAlignedChunkSize;
+ private CachedAlignedValueChunk cachedAlignedValueChunk;
private byte lastMarker = Byte.MIN_VALUE;
@@ -521,14 +522,14 @@ public class TsFileInsertionScanDataContainer extends
TsFileInsertionDataContain
private void moveToNextChunkReader() throws IOException,
IllegalStateException {
ChunkHeader chunkHeader;
- long valueChunkSize = 0;
- long valueChunkPageMemorySize = 0;
- final List<Chunk> valueChunkList = new ArrayList<>();
currentMeasurements.clear();
modsInfos.clear();
if (lastMarker == MetaMarker.SEPARATOR) {
- chunkReader = null;
+ if (!useNextPendingAlignedChunk(lastMarker)) {
+ clearCachedAlignedChunkData();
+ chunkReader = null;
+ }
return;
}
@@ -536,8 +537,8 @@ public class TsFileInsertionScanDataContainer extends
TsFileInsertionDataContain
while ((marker =
lastMarker != Byte.MIN_VALUE
? lastMarker
- : Objects.nonNull(firstChunk4NextSequentialValueChunks)
- ?
toValueChunkMarker(firstChunk4NextSequentialValueChunks.getHeader())
+ : Objects.nonNull(cachedAlignedValueChunk)
+ ?
toValueChunkMarker(cachedAlignedValueChunk.chunk.getHeader())
: tsFileSequenceReader.readMarker())
!= MetaMarker.SEPARATOR) {
lastMarker = Byte.MIN_VALUE;
@@ -550,70 +551,19 @@ public class TsFileInsertionScanDataContainer extends
TsFileInsertionDataContain
// Notice that the data in one chunk group is either aligned or
non-aligned
// There is no need to consider non-aligned chunks when there are
value chunks
currentIsMultiPage = marker == MetaMarker.CHUNK_HEADER;
- long currentChunkHeaderOffset = tsFileSequenceReader.position() -
1;
+ final long currentChunkHeaderOffset =
tsFileSequenceReader.position() - 1;
chunkHeader = tsFileSequenceReader.readChunkHeader(marker);
- final long nextMarkerOffset =
- tsFileSequenceReader.position() + chunkHeader.getDataSize();
-
- if (Objects.isNull(currentDevice)) {
- tsFileSequenceReader.position(nextMarkerOffset);
- break;
- }
-
- if ((chunkHeader.getChunkType() & TsFileConstant.TIME_COLUMN_MASK)
- == TsFileConstant.TIME_COLUMN_MASK) {
- final Chunk timeChunk =
- new Chunk(
- chunkHeader, tsFileSequenceReader.readChunk(-1,
chunkHeader.getDataSize()));
- final boolean isMultiPage = marker ==
MetaMarker.TIME_CHUNK_HEADER;
- timeChunkList.add(timeChunk);
- isMultiPageList.add(isMultiPage);
- timeChunkPageMemorySizeList.add(
-
SinglePageWholeChunkReader.calculateMaxPageEstimatedMemoryUsageInBytes(
- timeChunk));
- break;
- }
- if (!pattern.matchesMeasurement(currentDevice,
chunkHeader.getMeasurementID())) {
- tsFileSequenceReader.position(nextMarkerOffset);
+ if (filterChunk(currentChunkHeaderOffset, chunkHeader, false,
marker)) {
break;
}
- // Skip the chunk if it is fully deleted by mods
- if (!currentModifications.isEmpty()) {
- Statistics statistics = null;
- try {
- statistics =
- findNonAlignedChunkStatistics(
- tsFileSequenceReader.getIChunkMetadataList(
- CompactionPathUtils.getPath(
- currentDevice,
chunkHeader.getMeasurementID())),
- currentChunkHeaderOffset);
- } catch (IllegalPathException ignore) {
- LOGGER.warn(
- "Failed to get chunk metadata for {}.{}",
- currentDevice,
- chunkHeader.getMeasurementID());
- }
-
- if (statistics != null
- && ModsOperationUtil.isAllDeletedByMods(
- currentDevice,
- chunkHeader.getMeasurementID(),
- statistics.getStartTime(),
- statistics.getEndTime(),
- currentModifications)) {
- tsFileSequenceReader.position(nextMarkerOffset);
- break;
- }
- }
-
if (chunkHeader.getDataSize() >
allocatedMemoryBlockForChunk.getMemoryUsageInBytes()) {
PipeDataNodeResourceManager.memory()
.forceResize(allocatedMemoryBlockForChunk,
chunkHeader.getDataSize());
}
- Chunk chunk =
+ final Chunk chunk =
new Chunk(
chunkHeader, tsFileSequenceReader.readChunk(-1,
chunkHeader.getDataSize()));
final List<Long> pageEstimatedMemoryUsageInBytesList =
@@ -638,49 +588,16 @@ public class TsFileInsertionScanDataContainer extends
TsFileInsertionDataContain
case MetaMarker.VALUE_CHUNK_HEADER:
case MetaMarker.ONLY_ONE_PAGE_VALUE_CHUNK_HEADER:
{
- Chunk chunk;
- long currentValueChunkPageMemorySize = 0;
- if (Objects.isNull(firstChunk4NextSequentialValueChunks)) {
+ CachedAlignedValueChunk valueChunk = cachedAlignedValueChunk;
+ if (Objects.isNull(valueChunk)) {
final long currentChunkHeaderOffset =
tsFileSequenceReader.position() - 1;
chunkHeader = tsFileSequenceReader.readChunkHeader(marker);
- final long nextMarkerOffset =
- tsFileSequenceReader.position() + chunkHeader.getDataSize();
- if (Objects.isNull(currentDevice)
- || !pattern.matchesMeasurement(currentDevice,
chunkHeader.getMeasurementID())) {
- tsFileSequenceReader.position(nextMarkerOffset);
+ if (filterChunk(currentChunkHeaderOffset, chunkHeader, true,
marker)) {
break;
}
- if (!currentModifications.isEmpty()) {
- // Skip the chunk if it is fully deleted by mods
- Statistics statistics = null;
- try {
- statistics =
- findAlignedChunkStatistics(
- tsFileSequenceReader.getIChunkMetadataList(
- CompactionPathUtils.getPath(
- currentDevice,
chunkHeader.getMeasurementID())),
- currentChunkHeaderOffset);
- } catch (IllegalPathException ignore) {
- LOGGER.warn(
- "Failed to get chunk metadata for {}.{}",
- currentDevice,
- chunkHeader.getMeasurementID());
- }
- if (statistics != null
- && ModsOperationUtil.isAllDeletedByMods(
- currentDevice,
- chunkHeader.getMeasurementID(),
- statistics.getStartTime(),
- statistics.getEndTime(),
- currentModifications)) {
- tsFileSequenceReader.position(nextMarkerOffset);
- break;
- }
- }
-
- // Increase value index
+ // Increase value index.
final String measurementID =
tabletStringInternPool.intern(chunkHeader.getMeasurementID());
final int valueIndex =
@@ -688,86 +605,35 @@ public class TsFileInsertionScanDataContainer extends
TsFileInsertionDataContain
measurementID,
(measurement, index) -> Objects.nonNull(index) ? index +
1 : 0);
- // Emit when encountered non-sequential value chunk, or the
chunk size exceeds
- // certain value to avoid OOM
- // Do not record or end current value chunks when there are
empty chunks
+ // Do not record or end current value chunks when there are
empty chunks.
if (chunkHeader.getDataSize() == 0) {
break;
}
- chunk =
+ final Chunk chunk =
new Chunk(
chunkHeader, tsFileSequenceReader.readChunk(-1,
chunkHeader.getDataSize()));
- currentValueChunkPageMemorySize =
calculateMaxPageMemorySize(chunk);
- boolean needReturn = false;
- final long timeChunkSize =
- lastIndex >= 0
- ? PipeMemoryWeightUtil.calculateChunkRamBytesUsed(
- timeChunkList.get(lastIndex))
- : 0;
- final long timeChunkPageMemorySize =
- lastIndex >= 0 ? timeChunkPageMemorySizeList.get(lastIndex)
: 0;
- if (lastIndex >= 0) {
- if (valueIndex != lastIndex) {
- needReturn = recordAlignedChunk(valueChunkList, marker);
- } else {
- final long chunkSize = timeChunkSize + valueChunkSize;
- final long pageMemorySize = timeChunkPageMemorySize +
valueChunkPageMemorySize;
- if (chunkSize + chunkHeader.getDataSize()
- >
allocatedMemoryBlockForChunk.getMemoryUsageInBytes()
- || timeChunkPageMemorySize > 0
- && currentValueChunkPageMemorySize > 0
- && pageMemorySize + currentValueChunkPageMemorySize
- > getPageDataMemoryLimitInBytes()) {
- needReturn = recordAlignedChunk(valueChunkList, marker);
- }
- }
- }
- lastIndex = valueIndex;
- if (needReturn) {
- firstChunk4NextSequentialValueChunks = chunk;
- return;
- }
-
resizeChunkMemoryBlockIfFirstValueChunkExceedsLimit(valueChunkList,
chunkHeader);
- resizePageDataMemoryBlockIfFirstValueChunkExceedsLimit(
- valueChunkList, currentValueChunkPageMemorySize);
+ valueChunk =
+ new CachedAlignedValueChunk(valueIndex, chunk,
chunkHeader.getDataSize());
} else {
- chunk = firstChunk4NextSequentialValueChunks;
- chunkHeader = chunk.getHeader();
- firstChunk4NextSequentialValueChunks = null;
- currentValueChunkPageMemorySize =
calculateMaxPageMemorySize(chunk);
-
resizeChunkMemoryBlockIfFirstValueChunkExceedsLimit(valueChunkList,
chunkHeader);
- resizePageDataMemoryBlockIfFirstValueChunkExceedsLimit(
- valueChunkList, currentValueChunkPageMemorySize);
+ cachedAlignedValueChunk = null;
}
- valueChunkSize += chunkHeader.getDataSize();
- valueChunkPageMemorySize += currentValueChunkPageMemorySize;
- valueChunkList.add(chunk);
- final String measurementID =
- tabletStringInternPool.intern(chunkHeader.getMeasurementID());
- currentMeasurements.add(
- new MeasurementSchema(measurementID,
chunkHeader.getDataType()));
- modsInfos.addAll(
- ModsOperationUtil.initializeMeasurementMods(
- currentDevice, Collections.singletonList(measurementID),
currentModifications));
+ if (returnPendingAlignedChunkBeforeCaching(valueChunk)) {
+ return;
+ }
+ cacheAlignedValueChunk(valueChunk);
break;
}
case MetaMarker.CHUNK_GROUP_HEADER:
{
- // Return before "currentDevice" changes
- if (recordAlignedChunk(valueChunkList, marker)) {
+ // Return before "currentDevice" changes.
+ if (useNextPendingAlignedChunk(marker)) {
return;
}
+ clearCachedAlignedChunkData();
final String deviceID =
((PlainDeviceID)
tsFileSequenceReader.readChunkGroupHeader().getDeviceID())
.toStringID();
- // Clear because the cached data will never be used in the next
chunk group
- lastIndex = -1;
- timeChunkList.clear();
- isMultiPageList.clear();
- timeChunkPageMemorySizeList.clear();
- measurementIndexMap.clear();
-
currentDevice =
pattern.mayOverlapWithDevice(deviceID)
? tabletStringInternPool.intern(deviceID)
@@ -785,7 +651,8 @@ public class TsFileInsertionScanDataContainer extends
TsFileInsertionDataContain
}
lastMarker = marker;
- if (!recordAlignedChunk(valueChunkList, marker)) {
+ if (!useNextPendingAlignedChunk(marker)) {
+ clearCachedAlignedChunkData();
chunkReader = null;
}
}
@@ -794,22 +661,106 @@ public class TsFileInsertionScanDataContainer extends
TsFileInsertionDataContain
return PipeConfig.getInstance().getPipeMaxReaderChunkSize();
}
- private boolean recordAlignedChunk(final List<Chunk> valueChunkList, final
byte marker)
+ private long getChunkMemoryLimitInBytes() {
+ return PipeConfig.getInstance().getPipeMaxReaderChunkSize();
+ }
+
+ private boolean filterChunk(
+ final long currentChunkHeaderOffset,
+ final ChunkHeader chunkHeader,
+ final boolean isAlignedValueChunk,
+ final byte marker)
throws IOException {
- if (!valueChunkList.isEmpty()) {
- final Chunk timeChunk = timeChunkList.get(lastIndex);
+ final long nextMarkerOffset = tsFileSequenceReader.position() +
chunkHeader.getDataSize();
+
+ if (Objects.isNull(currentDevice)) {
+ tsFileSequenceReader.position(nextMarkerOffset);
+ return true;
+ }
+
+ if (!isAlignedValueChunk) {
+ if ((chunkHeader.getChunkType() & TsFileConstant.TIME_COLUMN_MASK)
+ == TsFileConstant.TIME_COLUMN_MASK) {
+ final Chunk timeChunk =
+ new Chunk(chunkHeader, tsFileSequenceReader.readChunk(-1,
chunkHeader.getDataSize()));
+ timeChunkList.add(timeChunk);
+ final boolean isMultiPage = marker == MetaMarker.TIME_CHUNK_HEADER;
+ isMultiPageList.add(isMultiPage);
+ return true;
+ }
+ }
+
+ if (!pattern.matchesMeasurement(currentDevice,
chunkHeader.getMeasurementID())) {
+ tsFileSequenceReader.position(nextMarkerOffset);
+ return true;
+ }
+
+ // Skip the chunk if it is fully deleted by mods
+ if (!currentModifications.isEmpty()) {
+ Statistics statistics = null;
+ try {
+ statistics =
+ isAlignedValueChunk
+ ? findAlignedChunkStatistics(
+ tsFileSequenceReader.getIChunkMetadataList(
+ CompactionPathUtils.getPath(currentDevice,
chunkHeader.getMeasurementID())),
+ currentChunkHeaderOffset)
+ : findNonAlignedChunkStatistics(
+ tsFileSequenceReader.getIChunkMetadataList(
+ CompactionPathUtils.getPath(currentDevice,
chunkHeader.getMeasurementID())),
+ currentChunkHeaderOffset);
+ } catch (IllegalPathException ignore) {
+ LOGGER.warn(
+ "Failed to get chunk metadata for {}.",
+ currentDevice + "." + chunkHeader.getMeasurementID());
+ }
+
+ if (statistics != null
+ && ModsOperationUtil.isAllDeletedByMods(
+ currentDevice,
+ chunkHeader.getMeasurementID(),
+ statistics.getStartTime(),
+ statistics.getEndTime(),
+ currentModifications)) {
+ tsFileSequenceReader.position(nextMarkerOffset);
+ return true;
+ }
+ }
+ return false;
+ }
+
+ private boolean useNextPendingAlignedChunk(final byte marker) throws
IOException {
+ while (!pendingAlignedChunkGroups.isEmpty()) {
+ final PendingAlignedChunkGroup pendingAlignedChunkGroup =
pendingAlignedChunkGroups.remove(0);
+ pendingAlignedChunkSize =
+ Math.max(0, pendingAlignedChunkSize -
pendingAlignedChunkGroup.chunkSize);
+
+ if (pendingAlignedChunkGroup.valueChunkList.isEmpty()) {
+ continue;
+ }
+
+ final Chunk timeChunk =
timeChunkList.get(pendingAlignedChunkGroup.timeChunkIndex);
timeChunk.getData().rewind();
- currentIsMultiPage = isMultiPageList.get(lastIndex);
+ for (final Chunk valueChunk : pendingAlignedChunkGroup.valueChunkList) {
+ valueChunk.getData().rewind();
+ }
+
+ currentMeasurements.clear();
+ currentMeasurements.addAll(pendingAlignedChunkGroup.measurements);
+ modsInfos.clear();
+ modsInfos.addAll(pendingAlignedChunkGroup.modsInfos);
+
+ currentIsMultiPage =
isMultiPageList.get(pendingAlignedChunkGroup.timeChunkIndex);
if (!currentIsMultiPage) {
resizePageDataMemoryIfNeeded(
AlignedSinglePageWholeChunkReader.calculatePageEstimatedMemoryUsageInBytes(
- timeChunk, valueChunkList));
+ timeChunk, pendingAlignedChunkGroup.valueChunkList));
}
final List<Long> pageEstimatedMemoryUsageInBytesList =
currentIsMultiPage
? AlignedSinglePageWholeChunkReader
.calculatePageEstimatedMemoryUsageInBytesWithBatchDataList(
- timeChunk, valueChunkList)
+ timeChunk, pendingAlignedChunkGroup.valueChunkList)
: Collections.emptyList();
final long maxPageEstimatedMemoryUsageInBytes =
pageEstimatedMemoryUsageInBytesList.isEmpty()
@@ -819,50 +770,222 @@ public class TsFileInsertionScanDataContainer extends
TsFileInsertionDataContain
chunkReader =
currentIsMultiPage
? new MemoryControlledChunkReader(
- new AlignedChunkReader(timeChunk, valueChunkList, filter),
+ new AlignedChunkReader(
+ timeChunk, pendingAlignedChunkGroup.valueChunkList,
filter),
pageEstimatedMemoryUsageInBytesList)
- : new AlignedSinglePageWholeChunkReader(timeChunk,
valueChunkList);
+ : new AlignedSinglePageWholeChunkReader(
+ timeChunk, pendingAlignedChunkGroup.valueChunkList);
currentIsAligned = true;
- lastMarker = marker;
+ if (marker != Byte.MIN_VALUE) {
+ lastMarker = marker;
+ }
+ return true;
+ }
+ return false;
+ }
+
+ private boolean shouldReturnPendingAlignedChunkBeforeCaching(
+ final CachedAlignedValueChunk valueChunk) throws IOException {
+ validateAlignedValueChunkTimeIndex(valueChunk.timeChunkIndex);
+
+ final PendingAlignedChunkGroup pendingAlignedChunkGroup =
+ findPendingAlignedChunkGroup(valueChunk.timeChunkIndex);
+ final boolean isFirstValueChunkInGroup =
+ Objects.isNull(pendingAlignedChunkGroup)
+ || pendingAlignedChunkGroup.valueChunkList.isEmpty();
+ final long timeChunkSize =
+ Objects.isNull(pendingAlignedChunkGroup)
+ ? PipeMemoryWeightUtil.calculateChunkRamBytesUsed(
+ timeChunkList.get(valueChunk.timeChunkIndex))
+ : 0;
+ final long chunkSizeAfterCaching =
+ pendingAlignedChunkSize + timeChunkSize + valueChunk.valueChunkSize;
+
+ if (isFirstValueChunkInGroup) {
+ final long firstValueChunkGroupSize =
+ timeChunkSize
+ + (Objects.isNull(pendingAlignedChunkGroup) ? 0 :
pendingAlignedChunkGroup.chunkSize)
+ + valueChunk.valueChunkSize;
+ if (firstValueChunkGroupSize > getChunkMemoryLimitInBytes()) {
+ return !pendingAlignedChunkGroups.isEmpty();
+ }
+ }
+
+ if (!pendingAlignedChunkGroups.isEmpty()
+ && chunkSizeAfterCaching > getChunkMemoryLimitInBytes()) {
+ return true;
+ }
+
+ final long pageMemorySizeAfterCaching =
+ calculateMaxAlignedPageMemorySizeWithBatchData(
+ valueChunk.timeChunkIndex, pendingAlignedChunkGroup, valueChunk);
+ return pageMemorySizeAfterCaching > getPageDataMemoryLimitInBytes()
+ && (!isFirstValueChunkInGroup || !pendingAlignedChunkGroups.isEmpty());
+ }
+
+ private boolean returnPendingAlignedChunkBeforeCaching(final
CachedAlignedValueChunk valueChunk)
+ throws IOException {
+ if (!shouldReturnPendingAlignedChunkBeforeCaching(valueChunk)) {
+ return false;
+ }
+
+ cachedAlignedValueChunk = valueChunk;
+ if (useNextPendingAlignedChunk(Byte.MIN_VALUE)) {
return true;
}
+ cachedAlignedValueChunk = null;
return false;
}
+ private void cacheAlignedValueChunk(final CachedAlignedValueChunk
valueChunk) throws IOException {
+ validateAlignedValueChunkTimeIndex(valueChunk.timeChunkIndex);
+
+ final PendingAlignedChunkGroup pendingAlignedChunkGroup =
+ getOrCreatePendingAlignedChunkGroup(valueChunk.timeChunkIndex);
+
resizeChunkMemoryBlockIfFirstValueChunkExceedsLimit(pendingAlignedChunkGroup,
valueChunk);
+
resizePageDataMemoryBlockIfFirstValueChunkExceedsLimit(pendingAlignedChunkGroup,
valueChunk);
+
+ pendingAlignedChunkGroup.valueChunkList.add(valueChunk.chunk);
+ pendingAlignedChunkGroup.chunkSize += valueChunk.valueChunkSize;
+ pendingAlignedChunkSize += valueChunk.valueChunkSize;
+
+ final ChunkHeader chunkHeader = valueChunk.chunk.getHeader();
+ final String measurementID =
tabletStringInternPool.intern(chunkHeader.getMeasurementID());
+ pendingAlignedChunkGroup.measurements.add(
+ new MeasurementSchema(measurementID, chunkHeader.getDataType()));
+ pendingAlignedChunkGroup.modsInfos.addAll(
+ ModsOperationUtil.initializeMeasurementMods(
+ currentDevice, Collections.singletonList(measurementID),
currentModifications));
+ }
+
+ private PendingAlignedChunkGroup getOrCreatePendingAlignedChunkGroup(final
int timeChunkIndex) {
+ final PendingAlignedChunkGroup pendingAlignedChunkGroup =
+ findPendingAlignedChunkGroup(timeChunkIndex);
+ if (Objects.nonNull(pendingAlignedChunkGroup)) {
+ return pendingAlignedChunkGroup;
+ }
+
+ final PendingAlignedChunkGroup newPendingAlignedChunkGroup =
+ new PendingAlignedChunkGroup(
+ timeChunkIndex,
+
PipeMemoryWeightUtil.calculateChunkRamBytesUsed(timeChunkList.get(timeChunkIndex)));
+ pendingAlignedChunkSize += newPendingAlignedChunkGroup.chunkSize;
+
+ for (int i = 0; i < pendingAlignedChunkGroups.size(); ++i) {
+ if (pendingAlignedChunkGroups.get(i).timeChunkIndex > timeChunkIndex) {
+ pendingAlignedChunkGroups.add(i, newPendingAlignedChunkGroup);
+ return newPendingAlignedChunkGroup;
+ }
+ }
+ pendingAlignedChunkGroups.add(newPendingAlignedChunkGroup);
+ return newPendingAlignedChunkGroup;
+ }
+
+ private PendingAlignedChunkGroup findPendingAlignedChunkGroup(final int
timeChunkIndex) {
+ for (final PendingAlignedChunkGroup pendingAlignedChunkGroup :
pendingAlignedChunkGroups) {
+ if (pendingAlignedChunkGroup.timeChunkIndex == timeChunkIndex) {
+ return pendingAlignedChunkGroup;
+ }
+ }
+ return null;
+ }
+
+ private void validateAlignedValueChunkTimeIndex(final int timeChunkIndex)
throws IOException {
+ if (timeChunkIndex < 0 || timeChunkIndex >= timeChunkList.size()) {
+ throw new IOException(
+ String.format(
+ "Invalid aligned value chunk index %d, while there are %d time
chunks.",
+ timeChunkIndex, timeChunkList.size()));
+ }
+ }
+
+ private void clearCachedAlignedChunkData() {
+ pendingAlignedChunkGroups.clear();
+ pendingAlignedChunkSize = 0;
+ cachedAlignedValueChunk = null;
+ timeChunkList.clear();
+ isMultiPageList.clear();
+ measurementIndexMap.clear();
+ }
+
private void resizeChunkMemoryBlockIfFirstValueChunkExceedsLimit(
- final List<Chunk> valueChunkList, final ChunkHeader valueChunkHeader) {
- if (!valueChunkList.isEmpty() || lastIndex < 0) {
+ final PendingAlignedChunkGroup pendingAlignedChunkGroup,
+ final CachedAlignedValueChunk valueChunk) {
+ if (!pendingAlignedChunkGroup.valueChunkList.isEmpty()) {
return;
}
- final long chunkSize =
-
PipeMemoryWeightUtil.calculateChunkRamBytesUsed(timeChunkList.get(lastIndex))
- + valueChunkHeader.getDataSize();
+ final long chunkSize = pendingAlignedChunkGroup.chunkSize +
valueChunk.valueChunkSize;
if (chunkSize > allocatedMemoryBlockForChunk.getMemoryUsageInBytes()) {
PipeDataNodeResourceManager.memory().forceResize(allocatedMemoryBlockForChunk,
chunkSize);
}
}
private void resizePageDataMemoryBlockIfFirstValueChunkExceedsLimit(
- final List<Chunk> valueChunkList, final long valueChunkPageMemorySize) {
- if (!valueChunkList.isEmpty() || lastIndex < 0 || valueChunkPageMemorySize
<= 0) {
- return;
- }
-
- final long timeChunkPageMemorySize =
timeChunkPageMemorySizeList.get(lastIndex);
- if (timeChunkPageMemorySize <= 0) {
+ final PendingAlignedChunkGroup pendingAlignedChunkGroup,
+ final CachedAlignedValueChunk valueChunk)
+ throws IOException {
+ if (!pendingAlignedChunkGroup.valueChunkList.isEmpty()) {
return;
}
- final long pageMemorySize = timeChunkPageMemorySize +
valueChunkPageMemorySize;
+ final long pageMemorySize =
+ calculateMaxAlignedPageMemorySizeWithBatchData(
+ pendingAlignedChunkGroup.timeChunkIndex, pendingAlignedChunkGroup,
valueChunk);
if (pageMemorySize > getPageDataMemoryLimitInBytes()) {
PipeDataNodeResourceManager.memory()
.forceResize(allocatedMemoryBlockForBatchData, pageMemorySize);
}
}
- private long calculateMaxPageMemorySize(final Chunk chunk) throws
IOException {
- return
SinglePageWholeChunkReader.calculateMaxPageEstimatedMemoryUsageInBytes(chunk);
+ private long calculateMaxAlignedPageMemorySizeWithBatchData(
+ final int timeChunkIndex,
+ final PendingAlignedChunkGroup pendingAlignedChunkGroup,
+ final CachedAlignedValueChunk valueChunk)
+ throws IOException {
+ final List<Chunk> valueChunkList =
+ new ArrayList<>(
+ (Objects.isNull(pendingAlignedChunkGroup)
+ ? 0
+ : pendingAlignedChunkGroup.valueChunkList.size())
+ + 1);
+ if (Objects.nonNull(pendingAlignedChunkGroup)) {
+ valueChunkList.addAll(pendingAlignedChunkGroup.valueChunkList);
+ }
+ valueChunkList.add(valueChunk.chunk);
+
+ final Chunk timeChunk = timeChunkList.get(timeChunkIndex);
+ final int timeChunkDataPosition = timeChunk.getData().position();
+ final List<Integer> valueChunkDataPositions = new
ArrayList<>(valueChunkList.size());
+ for (final Chunk chunk : valueChunkList) {
+ valueChunkDataPositions.add(Objects.isNull(chunk) ? 0 :
chunk.getData().position());
+ }
+
+ rewindChunkData(timeChunk);
+ valueChunkList.forEach(this::rewindChunkData);
+ try {
+ return AlignedSinglePageWholeChunkReader
+ .calculateMaxPageEstimatedMemoryUsageInBytesWithBatchData(timeChunk,
valueChunkList);
+ } finally {
+ timeChunk.getData().position(timeChunkDataPosition);
+ for (int i = 0; i < valueChunkList.size(); ++i) {
+ final Chunk chunk = valueChunkList.get(i);
+ if (Objects.nonNull(chunk)) {
+ chunk.getData().position(valueChunkDataPositions.get(i));
+ }
+ }
+ }
+ }
+
+ private void rewindChunkData(final Chunk chunk) {
+ if (Objects.isNull(chunk)) {
+ return;
+ }
+
+ final ByteBuffer chunkData = chunk.getData();
+ if (Objects.nonNull(chunkData)) {
+ chunkData.rewind();
+ }
}
private boolean isSinglePageValueChunk(final ChunkHeader chunkHeader) {
@@ -915,4 +1038,32 @@ public class TsFileInsertionScanDataContainer extends
TsFileInsertionDataContain
}
return null;
}
+
+ private static class PendingAlignedChunkGroup {
+
+ private final int timeChunkIndex;
+ private final List<Chunk> valueChunkList = new ArrayList<>();
+ private final List<MeasurementSchema> measurements = new ArrayList<>();
+ private final List<ModsOperationUtil.ModsInfo> modsInfos = new
ArrayList<>();
+ private long chunkSize;
+
+ private PendingAlignedChunkGroup(final int timeChunkIndex, final long
timeChunkSize) {
+ this.timeChunkIndex = timeChunkIndex;
+ this.chunkSize = timeChunkSize;
+ }
+ }
+
+ private static class CachedAlignedValueChunk {
+
+ private final int timeChunkIndex;
+ private final Chunk chunk;
+ private final long valueChunkSize;
+
+ private CachedAlignedValueChunk(
+ final int timeChunkIndex, final Chunk chunk, final long
valueChunkSize) {
+ this.timeChunkIndex = timeChunkIndex;
+ this.chunk = chunk;
+ this.valueChunkSize = valueChunkSize;
+ }
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java
index a22522666c2..b02cff26256 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java
@@ -179,6 +179,14 @@ public class PipeMemoryWeightUtil {
return new Pair<>(1, 0);
}
+ final int configuredTabletRowSize =
+ PipeConfig.getInstance().getPipeDataStructureTabletRowSize();
+ final boolean hasTabletRowSizeLimit = configuredTabletRowSize > 0;
+ final double inputSizeLimit =
+ hasTabletRowSizeLimit && inputNum > 0
+ ? 100 + inputNum * (double) rowBytesUsed * 1.2
+ : Integer.MAX_VALUE;
+
// Calculate row number according to the max size of a pipe tablet. "100"
is the estimated size
// of other data structures in a pipe tablet.
// "*8" converts bytes to bits, because the bitmap size is 1 bit per
schema.
@@ -186,17 +194,15 @@ public class PipeMemoryWeightUtil {
(int)
Math.min(
IoTDBDescriptor.getInstance().getConfig().getPipeDataStructureTabletSizeInBytes(),
- Math.min(Integer.MAX_VALUE, 100 + inputNum * (double)
rowBytesUsed * 1.2));
+ Math.min(Integer.MAX_VALUE, inputSizeLimit));
int rowNumber = 8 * (sizeLimit - 100) / (8 * rowBytesUsed + schemaCount);
rowNumber = Math.max(1, rowNumber);
- if ( // This means the row number is larger than the max row count of a
pipe tablet
- rowNumber > PipeConfig.getInstance().getPipeDataStructureTabletRowSize()) {
+ // This means the row number is larger than the max row count of a pipe
tablet.
+ if (hasTabletRowSizeLimit && rowNumber > configuredTabletRowSize) {
// Bound the row number, the memory cost is rowSize * rowNumber
- return new Pair<>(
- PipeConfig.getInstance().getPipeDataStructureTabletRowSize(),
- rowBytesUsed *
PipeConfig.getInstance().getPipeDataStructureTabletRowSize());
+ return new Pair<>(configuredTabletRowSize, rowBytesUsed *
configuredTabletRowSize);
} else {
return new Pair<>(rowNumber, sizeLimit);
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java
index 36f56e0e606..55a27348e3e 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java
@@ -31,6 +31,8 @@ import
org.apache.iotdb.db.pipe.event.common.tsfile.container.scan.SinglePageWho
import
org.apache.iotdb.db.pipe.event.common.tsfile.container.scan.TsFileInsertionScanDataContainer;
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.io.CompactionTsFileWriter;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.constant.CompactionType;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.utils.CompactionTestFileWriter;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
@@ -49,6 +51,7 @@ import org.apache.tsfile.file.metadata.PlainDeviceID;
import org.apache.tsfile.file.metadata.enums.CompressionType;
import org.apache.tsfile.file.metadata.enums.TSEncoding;
import org.apache.tsfile.read.TsFileSequenceReader;
+import org.apache.tsfile.read.common.BatchData;
import org.apache.tsfile.read.common.Chunk;
import org.apache.tsfile.read.common.Path;
import org.apache.tsfile.read.common.TimeRange;
@@ -56,8 +59,11 @@ import org.apache.tsfile.utils.Binary;
import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.utils.TsFileGeneratorUtils;
import org.apache.tsfile.write.TsFileWriter;
+import org.apache.tsfile.write.chunk.AlignedChunkWriterImpl;
import org.apache.tsfile.write.record.Tablet;
+import org.apache.tsfile.write.schema.IMeasurementSchema;
import org.apache.tsfile.write.schema.MeasurementSchema;
+import org.apache.tsfile.write.writer.TsFileIOWriter;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -422,6 +428,172 @@ public class TsFileInsertionDataContainerTest {
}
}
+ @Test
+ public void testScanParserMergesBatchedAlignedValueChunkGroups() throws
Exception {
+ final long originalPipeMaxReaderChunkSize =
+ CommonDescriptor.getInstance().getConfig().getPipeMaxReaderChunkSize();
+ final int originalPipeDataStructureTabletRowSize =
+
CommonDescriptor.getInstance().getConfig().getPipeDataStructureTabletRowSize();
+
+ final int measurementCount = 20;
+ final int batchSize = 10;
+ final int rowCount = 4;
+ final File sourceTsFile = new
File("aligned-source-for-batched-layout.tsfile");
+ alignedTsFile = new File("aligned-batched-layout.tsfile");
+
+ try {
+
CommonDescriptor.getInstance().getConfig().setPipeMaxReaderChunkSize(1024 *
1024L);
+
CommonDescriptor.getInstance().getConfig().setPipeDataStructureTabletRowSize(0);
+
+ final List<MeasurementSchema> schemaList = new ArrayList<>();
+ for (int i = 0; i < measurementCount; ++i) {
+ schemaList.add(new MeasurementSchema("s" + i, TSDataType.INT64));
+ }
+
+ writeAlignedSourceTsFile(sourceTsFile, schemaList, rowCount);
+ rewriteAlignedTsFileWithBatchedValueChunks(
+ sourceTsFile, alignedTsFile, measurementCount, batchSize);
+
+ int tabletCount = 0;
+ int pointCount = 0;
+ try (final TsFileInsertionScanDataContainer parser =
+ new TsFileInsertionScanDataContainer(
+ alignedTsFile,
+ new PrefixPipePattern("root"),
+ Long.MIN_VALUE,
+ Long.MAX_VALUE,
+ null,
+ null,
+ false)) {
+ for (final Pair<Tablet, Boolean> tabletWithIsAligned :
parser.toTabletWithIsAligneds()) {
+ Assert.assertTrue(tabletWithIsAligned.getRight());
+ final Tablet tablet = tabletWithIsAligned.getLeft();
+ ++tabletCount;
+ Assert.assertEquals(measurementCount, tablet.getSchemas().size());
+ Assert.assertEquals(rowCount / 2, tablet.rowSize);
+ pointCount += getNonNullSize(tablet);
+ }
+ }
+
+ Assert.assertEquals(measurementCount * rowCount, pointCount);
+ Assert.assertEquals(2, tabletCount);
+ } finally {
+ CommonDescriptor.getInstance()
+ .getConfig()
+ .setPipeMaxReaderChunkSize(originalPipeMaxReaderChunkSize);
+ CommonDescriptor.getInstance()
+ .getConfig()
+
.setPipeDataStructureTabletRowSize(originalPipeDataStructureTabletRowSize);
+ sourceTsFile.delete();
+ }
+ }
+
+ @Test
+ public void
testScanParserFlushesBatchedAlignedValueChunkGroupsByMemoryLimit() throws
Exception {
+ final long originalPipeMaxReaderChunkSize =
+ CommonDescriptor.getInstance().getConfig().getPipeMaxReaderChunkSize();
+ final int originalPipeDataStructureTabletRowSize =
+
CommonDescriptor.getInstance().getConfig().getPipeDataStructureTabletRowSize();
+
+ final int measurementCount = 20;
+ final int batchSize = 10;
+ final int rowCount = 4;
+ final File sourceTsFile = new
File("aligned-source-for-batched-layout-memory-limit.tsfile");
+ alignedTsFile = new File("aligned-batched-layout-memory-limit.tsfile");
+
+ try {
+
CommonDescriptor.getInstance().getConfig().setPipeDataStructureTabletRowSize(0);
+
+ final List<MeasurementSchema> schemaList = new ArrayList<>();
+ for (int i = 0; i < measurementCount; ++i) {
+ schemaList.add(new MeasurementSchema("s" + i, TSDataType.INT64));
+ }
+
+ writeAlignedSourceTsFile(sourceTsFile, schemaList, rowCount);
+ rewriteAlignedTsFileWithBatchedValueChunks(
+ sourceTsFile, alignedTsFile, measurementCount, batchSize);
+ CommonDescriptor.getInstance()
+ .getConfig()
+ .setPipeMaxReaderChunkSize(
+
calculateFirstBatchedAlignedValueChunkGroupMemoryLimit(alignedTsFile,
batchSize));
+
+ int tabletCount = 0;
+ int maxMeasurementCount = 0;
+ int pointCount = 0;
+ try (final TsFileInsertionScanDataContainer parser =
+ new TsFileInsertionScanDataContainer(
+ alignedTsFile,
+ new PrefixPipePattern("root"),
+ Long.MIN_VALUE,
+ Long.MAX_VALUE,
+ null,
+ null,
+ false)) {
+ for (final Pair<Tablet, Boolean> tabletWithIsAligned :
parser.toTabletWithIsAligneds()) {
+ Assert.assertTrue(tabletWithIsAligned.getRight());
+ final Tablet tablet = tabletWithIsAligned.getLeft();
+ ++tabletCount;
+ maxMeasurementCount = Math.max(maxMeasurementCount,
tablet.getSchemas().size());
+ Assert.assertTrue(tablet.getSchemas().size() <= batchSize);
+ Assert.assertEquals(rowCount / 2, tablet.rowSize);
+ pointCount += getNonNullSize(tablet);
+ }
+ }
+
+ Assert.assertEquals(batchSize, maxMeasurementCount);
+ Assert.assertEquals(measurementCount * rowCount, pointCount);
+ Assert.assertEquals(measurementCount / batchSize * 2, tabletCount);
+ } finally {
+ CommonDescriptor.getInstance()
+ .getConfig()
+ .setPipeMaxReaderChunkSize(originalPipeMaxReaderChunkSize);
+ CommonDescriptor.getInstance()
+ .getConfig()
+
.setPipeDataStructureTabletRowSize(originalPipeDataStructureTabletRowSize);
+ sourceTsFile.delete();
+ }
+ }
+
+ @Test
+ public void testPipeTabletRowSizeCanBeDisabledByNonPositiveValue() {
+ final int originalPipeDataStructureTabletRowSize =
+
CommonDescriptor.getInstance().getConfig().getPipeDataStructureTabletRowSize();
+ final int originalPipeDataStructureTabletSizeInBytes =
+
CommonDescriptor.getInstance().getConfig().getPipeDataStructureTabletSizeInBytes();
+
+ try {
+
CommonDescriptor.getInstance().getConfig().setPipeDataStructureTabletSizeInBytes(1024
* 1024);
+
+ final BatchData batchData = new BatchData(TSDataType.INT64);
+ for (int i = 0; i < 1000; ++i) {
+ batchData.putAnObject(i, (long) i);
+ }
+
+
CommonDescriptor.getInstance().getConfig().setPipeDataStructureTabletRowSize(2);
+ final int rowCountWithLimit =
+
PipeMemoryWeightUtil.calculateTabletRowCountAndMemory(batchData).getLeft();
+
+
CommonDescriptor.getInstance().getConfig().setPipeDataStructureTabletRowSize(0);
+ final int rowCountWithoutLimit =
+
PipeMemoryWeightUtil.calculateTabletRowCountAndMemory(batchData).getLeft();
+
+
CommonDescriptor.getInstance().getConfig().setPipeDataStructureTabletRowSize(-1);
+ final int rowCountWithNegativeLimit =
+
PipeMemoryWeightUtil.calculateTabletRowCountAndMemory(batchData).getLeft();
+
+ Assert.assertEquals(rowCountWithoutLimit, rowCountWithNegativeLimit);
+ Assert.assertEquals(2, rowCountWithLimit);
+ Assert.assertTrue(rowCountWithoutLimit > rowCountWithLimit);
+ } finally {
+ CommonDescriptor.getInstance()
+ .getConfig()
+
.setPipeDataStructureTabletRowSize(originalPipeDataStructureTabletRowSize);
+ CommonDescriptor.getInstance()
+ .getConfig()
+
.setPipeDataStructureTabletSizeInBytes(originalPipeDataStructureTabletSizeInBytes);
+ }
+ }
+
public void testToTabletInsertionEvents(final boolean isQuery) throws
Exception {
// Test empty chunk
testMixedTsFileWithEmptyChunk(isQuery);
@@ -1059,4 +1231,144 @@ public class TsFileInsertionDataContainerTest {
return chunkSizeLimit;
}
}
+
+ private long calculateFirstBatchedAlignedValueChunkGroupMemoryLimit(
+ final File tsFile, final int batchSize) throws Exception {
+ try (final TsFileSequenceReader reader = new
TsFileSequenceReader(tsFile.getAbsolutePath())) {
+ final IDeviceID deviceID =
reader.getDeviceMeasurementsMap().keySet().iterator().next();
+ final List<AlignedChunkMetadata> alignedChunkMetadataList =
+ reader.getAlignedChunkMetadata(deviceID);
+ Assert.assertEquals(2, alignedChunkMetadataList.size());
+
+ final AlignedChunkMetadata alignedChunkMetadata =
alignedChunkMetadataList.get(0);
+ final Chunk timeChunk =
+ reader.readMemChunk((ChunkMetadata)
alignedChunkMetadata.getTimeChunkMetadata());
+ final List<IChunkMetadata> valueChunkMetadataList =
+ alignedChunkMetadata.getValueChunkMetadataList();
+ Assert.assertTrue(valueChunkMetadataList.size() >= batchSize * 2);
+
+ final List<Chunk> firstValueChunkBatch = new ArrayList<>();
+ final List<Chunk> firstTwoValueChunkBatches = new ArrayList<>();
+ long firstBatchChunkSize =
PipeMemoryWeightUtil.calculateChunkRamBytesUsed(timeChunk);
+ long firstTwoBatchChunkSize = firstBatchChunkSize;
+ for (int index = 0; index < batchSize * 2; ++index) {
+ final Chunk valueChunk =
+ reader.readMemChunk((ChunkMetadata)
valueChunkMetadataList.get(index));
+ if (index < batchSize) {
+ firstValueChunkBatch.add(valueChunk);
+ firstBatchChunkSize += valueChunk.getHeader().getDataSize();
+ }
+ firstTwoValueChunkBatches.add(valueChunk);
+ firstTwoBatchChunkSize += valueChunk.getHeader().getDataSize();
+ }
+
+ final long firstBatchPageMemorySize =
+ AlignedSinglePageWholeChunkReader
+ .calculateMaxPageEstimatedMemoryUsageInBytesWithBatchData(
+ timeChunk, firstValueChunkBatch);
+ final long firstTwoBatchPageMemorySize =
+ AlignedSinglePageWholeChunkReader
+ .calculateMaxPageEstimatedMemoryUsageInBytesWithBatchData(
+ timeChunk, firstTwoValueChunkBatches);
+ Assert.assertTrue(firstTwoBatchChunkSize > firstBatchChunkSize);
+ Assert.assertTrue(firstTwoBatchPageMemorySize >
firstBatchPageMemorySize);
+ return Math.max(firstBatchChunkSize, firstBatchPageMemorySize);
+ }
+ }
+
+ private void writeAlignedSourceTsFile(
+ final File tsFile, final List<MeasurementSchema> schemaList, final int
rowCount)
+ throws IOException {
+ if (tsFile.exists()) {
+ Assert.assertTrue(tsFile.delete());
+ }
+ Assert.assertEquals(0, rowCount % 2);
+
+ final IDeviceID deviceID = new PlainDeviceID("root.sg.d");
+ try (final TsFileIOWriter writer = new TsFileIOWriter(tsFile)) {
+ writer.startChunkGroup(deviceID);
+ final int rowCountPerChunk = rowCount / 2;
+ for (int chunkIndex = 0; chunkIndex < 2; ++chunkIndex) {
+ final AlignedChunkWriterImpl alignedChunkWriter =
+ new AlignedChunkWriterImpl(new
ArrayList<IMeasurementSchema>(schemaList));
+ for (int row = 0; row < rowCountPerChunk; ++row) {
+ final long time = (long) chunkIndex * rowCountPerChunk + row;
+ alignedChunkWriter.getTimeChunkWriter().write(time);
+ for (int measurementIndex = 0; measurementIndex < schemaList.size();
++measurementIndex) {
+ alignedChunkWriter
+ .getValueChunkWriterByIndex(measurementIndex)
+ .write(time, time * 100 + measurementIndex, false);
+ }
+ }
+ alignedChunkWriter.writeToFileWriter(writer);
+ }
+ writer.endChunkGroup();
+ writer.endFile();
+ }
+ }
+
+ private void rewriteAlignedTsFileWithBatchedValueChunks(
+ final File sourceTsFile,
+ final File targetTsFile,
+ final int measurementCount,
+ final int batchSize)
+ throws Exception {
+ if (targetTsFile.exists()) {
+ Assert.assertTrue(targetTsFile.delete());
+ }
+
+ try (final TsFileSequenceReader reader =
+ new TsFileSequenceReader(sourceTsFile.getAbsolutePath())) {
+ final IDeviceID deviceID =
reader.getDeviceMeasurementsMap().keySet().iterator().next();
+ final List<AlignedChunkMetadata> sourceAlignedChunkMetadataList =
+ reader.getAlignedChunkMetadata(deviceID);
+ Assert.assertEquals(2, sourceAlignedChunkMetadataList.size());
+ for (final AlignedChunkMetadata sourceAlignedChunkMetadata :
sourceAlignedChunkMetadataList) {
+ Assert.assertEquals(
+ measurementCount,
sourceAlignedChunkMetadata.getValueChunkMetadataList().size());
+ }
+
+ try (final CompactionTsFileWriter writer =
+ new CompactionTsFileWriter(
+ targetTsFile, Long.MAX_VALUE,
CompactionType.INNER_SEQ_COMPACTION)) {
+ writer.startChunkGroup(deviceID);
+ writer.markStartingWritingAligned();
+ for (final AlignedChunkMetadata sourceAlignedChunkMetadata :
+ sourceAlignedChunkMetadataList) {
+ final ChunkMetadata timeChunkMetadata =
+ (ChunkMetadata)
sourceAlignedChunkMetadata.getTimeChunkMetadata();
+ writer.writeChunk(reader.readMemChunk(timeChunkMetadata),
timeChunkMetadata);
+ }
+
+ for (int start = 0; start < measurementCount; start += batchSize) {
+ writeValueChunkBatch(
+ reader,
+ writer,
+ sourceAlignedChunkMetadataList,
+ start,
+ Math.min(start + batchSize, measurementCount));
+ }
+ writer.markEndingWritingAligned();
+ writer.endChunkGroup();
+ writer.endFile();
+ }
+ }
+ }
+
+ private void writeValueChunkBatch(
+ final TsFileSequenceReader reader,
+ final CompactionTsFileWriter writer,
+ final List<AlignedChunkMetadata> alignedChunkMetadataList,
+ final int start,
+ final int end)
+ throws IOException {
+ for (final AlignedChunkMetadata alignedChunkMetadata :
alignedChunkMetadataList) {
+ final List<IChunkMetadata> valueChunkMetadataList =
+ alignedChunkMetadata.getValueChunkMetadataList();
+ for (int index = start; index < end; ++index) {
+ final ChunkMetadata valueChunkMetadata = (ChunkMetadata)
valueChunkMetadataList.get(index);
+ writer.writeChunk(reader.readMemChunk(valueChunkMetadata),
valueChunkMetadata);
+ }
+ }
+ }
}