This is an automated email from the ASF dual-hosted git repository.

marklau99 pushed a commit to branch IOTDB-5140 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 25dfa92412bbee534abd8b275e230027f9bda505 Author: Liu Xuxin <[email protected]> AuthorDate: Thu Jan 12 17:17:48 2023 +0800 add summary for read chunk performer --- .../java/org/apache/iotdb/RewriteTsFileTool.java | 5 +- .../performer/impl/FastCompactionTaskSummary.java | 24 ++++++++ .../impl/ReadChunkCompactionPerformer.java | 5 +- .../execute/task/CompactionTaskSummary.java | 65 ++++++++++++++++++++++ .../readchunk/AlignedSeriesCompactionExecutor.java | 21 +++++-- .../readchunk/SingleSeriesCompactionExecutor.java | 21 ++++++- .../read/TsFileAlignedSeriesReaderIterator.java | 40 ++++++++++++- 7 files changed, 167 insertions(+), 14 deletions(-) diff --git a/rewrite-tsfile-tool/src/main/java/org/apache/iotdb/RewriteTsFileTool.java b/rewrite-tsfile-tool/src/main/java/org/apache/iotdb/RewriteTsFileTool.java index 733460d9b5..095780141b 100644 --- a/rewrite-tsfile-tool/src/main/java/org/apache/iotdb/RewriteTsFileTool.java +++ b/rewrite-tsfile-tool/src/main/java/org/apache/iotdb/RewriteTsFileTool.java @@ -851,8 +851,9 @@ public class RewriteTsFileTool { throws IOException, IoTDBConnectionException, StatementExecutionException { while (readerIterator.hasNext()) { Tablet tablet = new Tablet(device, schemaList, MAX_TABLET_LENGTH); - Pair<AlignedChunkReader, Long> chunkReaderAndChunkSize = readerIterator.nextReader(); - AlignedChunkReader alignedChunkReader = chunkReaderAndChunkSize.left; + TsFileAlignedSeriesReaderIterator.NextAlignedChunkInfo readerInfo = + readerIterator.nextReader(); + AlignedChunkReader alignedChunkReader = readerInfo.getReader(); while (alignedChunkReader.hasNextSatisfiedPage()) { IBatchDataIterator batchDataIterator = alignedChunkReader.nextPageData().getBatchDataIterator(); diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/performer/impl/FastCompactionTaskSummary.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/performer/impl/FastCompactionTaskSummary.java new file mode 100644 index 
0000000000..f05e89a38c --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/performer/impl/FastCompactionTaskSummary.java @@ -0,0 +1,24 @@ +package org.apache.iotdb.db.engine.compaction.execute.performer.impl; + +import org.apache.iotdb.db.engine.compaction.execute.task.CompactionTaskSummary; + +public class FastCompactionTaskSummary extends CompactionTaskSummary { + public int CHUNK_NONE_OVERLAP; + public int CHUNK_NONE_OVERLAP_BUT_DESERIALIZE; + public int CHUNK_OVERLAP_OR_MODIFIED; + + public int PAGE_NONE_OVERLAP; + public int PAGE_OVERLAP_OR_MODIFIED; + public int PAGE_FAKE_OVERLAP; + public int PAGE_NONE_OVERLAP_BUT_DESERIALIZE; + + public void increase(FastCompactionTaskSummary summary) { + this.CHUNK_NONE_OVERLAP += summary.CHUNK_NONE_OVERLAP; + this.CHUNK_NONE_OVERLAP_BUT_DESERIALIZE += summary.CHUNK_NONE_OVERLAP_BUT_DESERIALIZE; + this.CHUNK_OVERLAP_OR_MODIFIED += summary.CHUNK_OVERLAP_OR_MODIFIED; + this.PAGE_NONE_OVERLAP += summary.PAGE_NONE_OVERLAP; + this.PAGE_OVERLAP_OR_MODIFIED += summary.PAGE_OVERLAP_OR_MODIFIED; + this.PAGE_FAKE_OVERLAP += summary.PAGE_FAKE_OVERLAP; + this.PAGE_NONE_OVERLAP_BUT_DESERIALIZE += summary.PAGE_NONE_OVERLAP_BUT_DESERIALIZE; + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java index 238224f8b6..e97f2e5fe3 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java @@ -134,7 +134,7 @@ public class ReadChunkCompactionPerformer implements ISeqCompactionPerformer { writer.startChunkGroup(device); AlignedSeriesCompactionExecutor compactionExecutor = new AlignedSeriesCompactionExecutor( - device, 
targetResource, readerAndChunkMetadataList, writer); + device, targetResource, readerAndChunkMetadataList, writer, summary); compactionExecutor.execute(); writer.endChunkGroup(); } @@ -178,7 +178,8 @@ public class ReadChunkCompactionPerformer implements ISeqCompactionPerformer { LinkedList<Pair<TsFileSequenceReader, List<ChunkMetadata>>> readerAndChunkMetadataList = seriesIterator.getMetadataListForCurrentSeries(); SingleSeriesCompactionExecutor compactionExecutorOfCurrentTimeSeries = - new SingleSeriesCompactionExecutor(p, readerAndChunkMetadataList, writer, targetResource); + new SingleSeriesCompactionExecutor( + p, readerAndChunkMetadataList, writer, targetResource, summary); compactionExecutorOfCurrentTimeSeries.execute(); } writer.endChunkGroup(); diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/task/CompactionTaskSummary.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/task/CompactionTaskSummary.java index 6c0ba15eab..60f13caf72 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/task/CompactionTaskSummary.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/task/CompactionTaskSummary.java @@ -18,11 +18,20 @@ */ package org.apache.iotdb.db.engine.compaction.execute.task; +import java.text.SimpleDateFormat; +import java.util.Date; + /** The summary of one {@link AbstractCompactionTask} execution */ public class CompactionTaskSummary { private long timeCost = 0L; private volatile Status status = Status.NOT_STARTED; private long startTime = -1L; + private int processChunkNum = 0; + private int directlyFlushChunkNum = 0; + private int deserializeChunkCount = 0; + private int deserializePageCount = 0; + private int mergedChunkNum = 0; + private long processPointNum = 0; public CompactionTaskSummary() {} @@ -65,6 +74,46 @@ public class CompactionTaskSummary { return timeCost; } + public void increaseProcessChunkNum(int increment) { + processChunkNum += 
increment; + } + + public void increaseDirectlyFlushChunkNum(int increment) { + directlyFlushChunkNum += increment; + } + + public void increaseDeserializedChunkNum(int increment) { + deserializeChunkCount += increment; + } + + public void increaseDeserializePageNum(int increment) { + deserializePageCount += increment; + } + + public void increaseProcessPointNum(long increment) { + processPointNum += increment; + } + + public void increaseMergedChunkNum(int increment) { + this.mergedChunkNum += increment; + } + + public void setDirectlyFlushChunkNum(int directlyFlushChunkNum) { + this.directlyFlushChunkNum = directlyFlushChunkNum; + } + + public void setDeserializeChunkCount(int deserializeChunkCount) { + this.deserializeChunkCount = deserializeChunkCount; + } + + public void setDeserializePageCount(int deserializePageCount) { + this.deserializePageCount = deserializePageCount; + } + + public void setProcessPointNum(int processPointNum) { + this.processPointNum = processPointNum; + } + enum Status { NOT_STARTED, STARTED, @@ -72,4 +121,20 @@ public class CompactionTaskSummary { FAILED, CANCELED } + + @Override + public String toString() { + String startTimeInStr = new SimpleDateFormat().format(new Date(startTime)); + return String.format( + "Task start time: %s, time cost: %.2f s, total process chunk num: %d, " + + "directly flush chunk num: %d, deserialize chunk num: %d, deserialize page num: %d," + + " total process point num: %d", + startTimeInStr, + timeCost / 1000.0d, + processChunkNum, + directlyFlushChunkNum, + deserializeChunkCount, + deserializePageCount, + processPointNum); + } } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/AlignedSeriesCompactionExecutor.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/AlignedSeriesCompactionExecutor.java index a35163ee7f..cac97d83ef 100644 --- 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/AlignedSeriesCompactionExecutor.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/AlignedSeriesCompactionExecutor.java @@ -21,6 +21,7 @@ package org.apache.iotdb.db.engine.compaction.execute.utils.executor.readchunk; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.engine.TsFileMetricManager; import org.apache.iotdb.db.engine.cache.ChunkCache; +import org.apache.iotdb.db.engine.compaction.execute.task.CompactionTaskSummary; import org.apache.iotdb.db.engine.compaction.schedule.CompactionTaskManager; import org.apache.iotdb.db.engine.compaction.schedule.constant.CompactionType; import org.apache.iotdb.db.engine.compaction.schedule.constant.ProcessChunkType; @@ -62,6 +63,7 @@ public class AlignedSeriesCompactionExecutor { private final AlignedChunkWriterImpl chunkWriter; private final List<IMeasurementSchema> schemaList; private long remainingPointInChunkWriter = 0L; + private final CompactionTaskSummary summary; private final RateLimiter rateLimiter = CompactionTaskManager.getInstance().getMergeWriteRateLimiter(); @@ -74,7 +76,8 @@ public class AlignedSeriesCompactionExecutor { String device, TsFileResource targetResource, LinkedList<Pair<TsFileSequenceReader, List<AlignedChunkMetadata>>> readerAndChunkMetadataList, - TsFileIOWriter writer) + TsFileIOWriter writer, + CompactionTaskSummary summary) throws IOException { this.device = device; this.readerAndChunkMetadataList = readerAndChunkMetadataList; @@ -82,6 +85,7 @@ public class AlignedSeriesCompactionExecutor { this.targetResource = targetResource; schemaList = collectSchemaFromAlignedChunkMetadataList(readerAndChunkMetadataList); chunkWriter = new AlignedChunkWriterImpl(schemaList); + this.summary = summary; } /** @@ -137,9 +141,13 @@ public class AlignedSeriesCompactionExecutor { TsFileAlignedSeriesReaderIterator readerIterator = new 
TsFileAlignedSeriesReaderIterator(reader, alignedChunkMetadataList, schemaList); while (readerIterator.hasNext()) { - Pair<AlignedChunkReader, Long> chunkReaderAndChunkSize = readerIterator.nextReader(); - CompactionMetricsRecorder.recordReadInfo(chunkReaderAndChunkSize.right); - compactOneAlignedChunk(chunkReaderAndChunkSize.left); + TsFileAlignedSeriesReaderIterator.NextAlignedChunkInfo nextAlignedChunkInfo = + readerIterator.nextReader(); + summary.increaseProcessChunkNum(nextAlignedChunkInfo.getNotNullChunkNum()); + summary.increaseProcessPointNum(nextAlignedChunkInfo.getTotalPointNum()); + CompactionMetricsRecorder.recordReadInfo(nextAlignedChunkInfo.getTotalSize()); + compactOneAlignedChunk( + nextAlignedChunkInfo.getReader(), nextAlignedChunkInfo.getNotNullChunkNum()); } } @@ -160,8 +168,11 @@ public class AlignedSeriesCompactionExecutor { .addCompactionTempFileSize(true, true, writer.getPos() - originTempFileSize); } - private void compactOneAlignedChunk(AlignedChunkReader chunkReader) throws IOException { + private void compactOneAlignedChunk(AlignedChunkReader chunkReader, int notNullChunkNum) + throws IOException { while (chunkReader.hasNextSatisfiedPage()) { + // including value chunk and time chunk, thus we should plus one + summary.increaseDeserializePageNum(notNullChunkNum + 1); IBatchDataIterator batchDataIterator = chunkReader.nextPageData().getBatchDataIterator(); while (batchDataIterator.hasNext()) { TsPrimitiveType[] pointsData = (TsPrimitiveType[]) batchDataIterator.currentValue(); diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/SingleSeriesCompactionExecutor.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/SingleSeriesCompactionExecutor.java index e636e30086..4c7ef21289 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/SingleSeriesCompactionExecutor.java +++ 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/SingleSeriesCompactionExecutor.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.engine.compaction.execute.utils.executor.readchunk; import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.engine.compaction.execute.task.CompactionTaskSummary; import org.apache.iotdb.db.engine.compaction.schedule.CompactionTaskManager; import org.apache.iotdb.db.engine.compaction.schedule.constant.CompactionType; import org.apache.iotdb.db.engine.compaction.schedule.constant.ProcessChunkType; @@ -63,6 +64,7 @@ public class SingleSeriesCompactionExecutor { private long minStartTimestamp = Long.MAX_VALUE; private long maxEndTimestamp = Long.MIN_VALUE; private long pointCountInChunkWriter = 0; + private final CompactionTaskSummary summary; private final long targetChunkSize = IoTDBDescriptor.getInstance().getConfig().getTargetChunkSize(); @@ -88,13 +90,15 @@ public class SingleSeriesCompactionExecutor { this.cachedChunk = null; this.cachedChunkMetadata = null; this.targetResource = targetResource; + this.summary = new CompactionTaskSummary(); } public SingleSeriesCompactionExecutor( PartialPath series, LinkedList<Pair<TsFileSequenceReader, List<ChunkMetadata>>> readerAndChunkMetadataList, TsFileIOWriter fileWriter, - TsFileResource targetResource) { + TsFileResource targetResource, + CompactionTaskSummary summary) { this.device = series.getDevice(); this.series = series; this.readerAndChunkMetadataList = readerAndChunkMetadataList; @@ -104,6 +108,7 @@ public class SingleSeriesCompactionExecutor { this.cachedChunk = null; this.cachedChunkMetadata = null; this.targetResource = targetResource; + this.summary = summary; } /** @@ -111,7 +116,6 @@ public class SingleSeriesCompactionExecutor { * series compaction may contain more than one chunk. 
*/ public void execute() throws IOException { - long originTempFileSize = fileWriter.getPos(); while (readerAndChunkMetadataList.size() > 0) { Pair<TsFileSequenceReader, List<ChunkMetadata>> readerListPair = readerAndChunkMetadataList.removeFirst(); @@ -119,6 +123,8 @@ public class SingleSeriesCompactionExecutor { List<ChunkMetadata> chunkMetadataList = readerListPair.right; for (ChunkMetadata chunkMetadata : chunkMetadataList) { Chunk currentChunk = reader.readMemChunk(chunkMetadata); + summary.increaseProcessChunkNum(1); + summary.increaseProcessPointNum(chunkMetadata.getNumOfPoints()); if (this.chunkWriter == null) { constructChunkWriterFromReadChunk(currentChunk); } @@ -178,6 +184,7 @@ public class SingleSeriesCompactionExecutor { // if there is a cached chunk, deserialize it and write it to ChunkWriter writeCachedChunkIntoChunkWriter(); } + summary.increaseDeserializedChunkNum(1); // write this chunk to ChunkWriter writeChunkIntoChunkWriter(chunk); flushChunkWriterIfLargeEnough(); @@ -187,15 +194,18 @@ public class SingleSeriesCompactionExecutor { if (pointCountInChunkWriter != 0L) { // if there are points remaining in ChunkWriter // deserialize current chunk and write to ChunkWriter, then flush the ChunkWriter + summary.increaseDeserializedChunkNum(1); writeChunkIntoChunkWriter(chunk); flushChunkWriterIfLargeEnough(); } else if (cachedChunk != null) { // if there is a cached chunk, merge it with current chunk, then flush it + summary.increaseMergedChunkNum(1); mergeWithCachedChunk(chunk, chunkMetadata); flushCachedChunkIfLargeEnough(); } else { // there is no points remaining in ChunkWriter and no cached chunk // flush it to file directly + summary.increaseDirectlyFlushChunkNum(1); flushChunkToFileWriter(chunk, chunkMetadata, false); } } @@ -205,15 +215,18 @@ public class SingleSeriesCompactionExecutor { if (pointCountInChunkWriter != 0L) { // if there are points remaining in ChunkWriter // deserialize current chunk and write to ChunkWriter + 
summary.increaseDeserializedChunkNum(1); writeChunkIntoChunkWriter(chunk); flushChunkWriterIfLargeEnough(); } else if (cachedChunk != null) { // if there is a cached chunk, merge it with current chunk + summary.increaseMergedChunkNum(1); mergeWithCachedChunk(chunk, chunkMetadata); flushCachedChunkIfLargeEnough(); } else { // there is no points remaining in ChunkWriter and no cached chunk // cached current chunk + summary.increaseMergedChunkNum(1); cachedChunk = chunk; cachedChunkMetadata = chunkMetadata; } @@ -227,6 +240,7 @@ public class SingleSeriesCompactionExecutor { // if there is a cached chunk, write the cached chunk to ChunkWriter writeCachedChunkIntoChunkWriter(); } + summary.increaseDeserializedChunkNum(1); writeChunkIntoChunkWriter(chunk); flushChunkWriterIfLargeEnough(); } @@ -235,6 +249,9 @@ public class SingleSeriesCompactionExecutor { private void writeChunkIntoChunkWriter(Chunk chunk) throws IOException { IChunkReader chunkReader = new ChunkReader(chunk, null); while (chunkReader.hasNextSatisfiedPage()) { + if (cachedChunk != chunk) { + summary.increaseDeserializePageNum(1); + } IPointReader batchIterator = chunkReader.nextPageData().getBatchDataIterator(); while (batchIterator.hasNextTimeValuePair()) { TimeValuePair timeValuePair = batchIterator.nextTimeValuePair(); diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileAlignedSeriesReaderIterator.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileAlignedSeriesReaderIterator.java index 7236d3c999..2113a523fc 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileAlignedSeriesReaderIterator.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileAlignedSeriesReaderIterator.java @@ -24,7 +24,6 @@ import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata; import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata; import org.apache.iotdb.tsfile.read.common.Chunk; import org.apache.iotdb.tsfile.read.reader.chunk.AlignedChunkReader; -import 
org.apache.iotdb.tsfile.utils.Pair; import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; import java.io.IOException; @@ -56,7 +55,7 @@ public class TsFileAlignedSeriesReaderIterator { return curIdx < alignedChunkMetadataList.size() - 1; } - public Pair<AlignedChunkReader, Long> nextReader() throws IOException { + public NextAlignedChunkInfo nextReader() throws IOException { AlignedChunkMetadata alignedChunkMetadata = alignedChunkMetadataList.get(++curIdx); IChunkMetadata timeChunkMetadata = alignedChunkMetadata.getTimeChunkMetadata(); List<IChunkMetadata> valueChunkMetadataList = alignedChunkMetadata.getValueChunkMetadataList(); @@ -64,6 +63,8 @@ public class TsFileAlignedSeriesReaderIterator { Chunk timeChunk = reader.readMemChunk((ChunkMetadata) timeChunkMetadata); Chunk[] valueChunks = new Chunk[schemaList.size()]; long totalSize = 0; + long totalPointNum = 0; + int notNullChunkNum = 0; for (IChunkMetadata valueChunkMetadata : valueChunkMetadataList) { if (valueChunkMetadata == null) { continue; @@ -75,12 +76,45 @@ public class TsFileAlignedSeriesReaderIterator { } Chunk chunk = reader.readMemChunk((ChunkMetadata) valueChunkMetadata); valueChunks[schemaIdx++] = chunk; + notNullChunkNum++; + totalPointNum += ((ChunkMetadata) valueChunkMetadata).getNumOfPoints(); totalSize += chunk.getHeader().getSerializedSize() + chunk.getHeader().getDataSize(); } AlignedChunkReader chunkReader = new AlignedChunkReader(timeChunk, Arrays.asList(valueChunks), null); - return new Pair<>(chunkReader, totalSize); + return new NextAlignedChunkInfo(chunkReader, totalSize, notNullChunkNum, totalPointNum); + } + + public class NextAlignedChunkInfo { + private AlignedChunkReader reader; + private long totalSize; + private int notNullChunkNum; + private long totalPointNum; + + public NextAlignedChunkInfo( + AlignedChunkReader reader, long totalSize, int notNullChunkNum, long totalPointNum) { + this.reader = reader; + this.totalSize = totalSize; + this.notNullChunkNum = 
notNullChunkNum; + this.totalPointNum = totalPointNum; + } + + public AlignedChunkReader getReader() { + return reader; + } + + public long getTotalSize() { + return totalSize; + } + + public long getTotalPointNum() { + return totalPointNum; + } + + public int getNotNullChunkNum() { + return notNullChunkNum; + } } }
