This is an automated email from the ASF dual-hosted git repository. marklau99 pushed a commit to branch refactor-compaction-metrics in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 72472a1df8a81ab797e2606f83e26fe33d9165f4 Author: Liu Xuxin <[email protected]> AuthorDate: Fri Jun 9 10:41:04 2023 +0800 finish refactor read statistic --- .../fast/AlignedSeriesCompactionExecutor.java | 21 +++++++++++++ .../readchunk/AlignedSeriesCompactionExecutor.java | 4 +-- .../compaction/io/CompactionTsFileReader.java | 36 ++++++++++++++++++++-- 3 files changed, 57 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/fast/AlignedSeriesCompactionExecutor.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/fast/AlignedSeriesCompactionExecutor.java index cd7adbb1adc..4d68016a978 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/fast/AlignedSeriesCompactionExecutor.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/fast/AlignedSeriesCompactionExecutor.java @@ -25,6 +25,7 @@ import org.apache.iotdb.db.engine.compaction.execute.utils.executor.fast.element import org.apache.iotdb.db.engine.compaction.execute.utils.executor.fast.element.FileElement; import org.apache.iotdb.db.engine.compaction.execute.utils.executor.fast.element.PageElement; import org.apache.iotdb.db.engine.compaction.execute.utils.writer.AbstractCompactionWriter; +import org.apache.iotdb.db.engine.compaction.io.CompactionTsFileReader; import org.apache.iotdb.db.engine.modification.Modification; import org.apache.iotdb.db.engine.storagegroup.TsFileResource; import org.apache.iotdb.db.exception.WriteProcessException; @@ -88,6 +89,8 @@ public class AlignedSeriesCompactionExecutor extends SeriesCompactionExecutor { @Override protected void compactFiles() throws PageException, IOException, WriteProcessException, IllegalPathException { + markStartOfAlignedSeries(); + while (!fileList.isEmpty()) { List<FileElement> overlappedFiles = findOverlapFiles(fileList.get(0)); @@ -96,6 +99,24 @@ public class AlignedSeriesCompactionExecutor extends SeriesCompactionExecutor { compactChunks(); } + + markEndOfAlignedSeries(); + } + + private void markStartOfAlignedSeries() { + for (TsFileSequenceReader reader : readerCacheMap.values()) { + if (reader instanceof CompactionTsFileReader) { + ((CompactionTsFileReader) reader).markStartOfAlignedSeries(); + } + } + } + + private void markEndOfAlignedSeries() { + for (TsFileSequenceReader reader : readerCacheMap.values()) { + if (reader instanceof CompactionTsFileReader) { + ((CompactionTsFileReader) reader).markEndOfAlignedSeries(); + } + } } /** Deserialize files into chunk metadatas and put them into the chunk metadata queue. */ diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/AlignedSeriesCompactionExecutor.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/AlignedSeriesCompactionExecutor.java index 849c3ef5742..00f12a2c8d9 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/AlignedSeriesCompactionExecutor.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/AlignedSeriesCompactionExecutor.java @@ -135,7 +135,7 @@ public class AlignedSeriesCompactionExecutor { List<AlignedChunkMetadata> alignedChunkMetadataList = readerListPair.right; if (reader instanceof CompactionTsFileReader) { - ((CompactionTsFileReader) reader).markStartAlignedSeries(); + ((CompactionTsFileReader) reader).markStartOfAlignedSeries(); } TsFileAlignedSeriesReaderIterator readerIterator = @@ -149,7 +149,7 @@ public class AlignedSeriesCompactionExecutor { nextAlignedChunkInfo.getReader(), nextAlignedChunkInfo.getNotNullChunkNum()); } if (reader instanceof CompactionTsFileReader) { - ((CompactionTsFileReader) reader).markEndAlignedSeries(); + ((CompactionTsFileReader) reader).markEndOfAlignedSeries(); } } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/io/CompactionTsFileReader.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/io/CompactionTsFileReader.java index 91a9c6d2108..b9b10791de7 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/io/CompactionTsFileReader.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/io/CompactionTsFileReader.java @@ -23,19 +23,37 @@ import org.apache.iotdb.db.engine.compaction.schedule.constant.CompactionIoDataT import org.apache.iotdb.db.engine.compaction.schedule.constant.CompactionType; import org.apache.iotdb.db.service.metrics.CompactionMetrics; import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata; +import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata; import org.apache.iotdb.tsfile.read.TsFileDeviceIterator; import org.apache.iotdb.tsfile.read.TsFileSequenceReader; import org.apache.iotdb.tsfile.read.common.Chunk; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.List; +/** + * This class extends the TsFileSequenceReader class to read and manage TsFile with a focus on + * compaction-related operations. This includes functions for tracking and recording the amount of + * data read and distinguishing between aligned and not aligned series during compaction. + */ public class CompactionTsFileReader extends TsFileSequenceReader { + /** Tracks the total amount of data (in bytes) that has been read. */ long readDataSize = 0L; + + /** The type of compaction running. */ CompactionType compactionType; + /** A flag that indicates if an aligned series is being read. */ boolean readingAlignedSeries = false; + /** + * Constructs a new instance of CompactionTsFileReader. + * + * @param file The file to be read. + * @param compactionType The type of compaction running. + * @throws IOException If an error occurs during file operations. + */ public CompactionTsFileReader(String file, CompactionType compactionType) throws IOException { super(file); this.compactionType = compactionType; @@ -48,11 +66,13 @@ public class CompactionTsFileReader extends TsFileSequenceReader { return buffer; } - public void markStartAlignedSeries() { + /** Marks the start of reading an aligned series. */ + public void markStartOfAlignedSeries() { readingAlignedSeries = true; } - public void markEndAlignedSeries() { + /** Marks the end of reading an aligned series. */ + public void markEndOfAlignedSeries() { readingAlignedSeries = false; } @@ -78,4 +98,16 @@ public class CompactionTsFileReader extends TsFileSequenceReader { .recordReadInfo(compactionType, CompactionIoDataType.METADATA, dataSize); return iterator; } + + @Override + public List<IChunkMetadata> getChunkMetadataListByTimeseriesMetadataOffset( + long startOffset, long endOffset) throws IOException { + long before = readDataSize; + List<IChunkMetadata> chunkMetadataList = + super.getChunkMetadataListByTimeseriesMetadataOffset(startOffset, endOffset); + long dataSize = readDataSize - before; + CompactionMetrics.getInstance() + .recordReadInfo(compactionType, CompactionIoDataType.METADATA, dataSize); + return chunkMetadataList; + } }
