This is an automated email from the ASF dual-hosted git repository.
marklau99 pushed a commit to branch refactor-compaction-metrics
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/refactor-compaction-metrics by
this push:
new 1f9c235f058 edit according to review
1f9c235f058 is described below
commit 1f9c235f058ff03f35acae1d56f33de4e846c647
Author: Liu Xuxin <[email protected]>
AuthorDate: Tue Jun 20 15:01:27 2023 +0800
edit according to review
---
.../execute/utils/MultiTsFileDeviceIterator.java | 14 +++++++----
.../compaction/io/CompactionTsFileReader.java | 24 +++++++++++++++++++
.../compaction/io/CompactionTsFileWriter.java | 10 ++++++++
.../iotdb/tsfile/read/TsFileDeviceIterator.java | 3 +--
.../iotdb/tsfile/read/TsFileSequenceReader.java | 12 ++++++++++
.../iotdb/tsfile/write/writer/TsFileIOWriter.java | 28 +++++++++++++++-------
6 files changed, 76 insertions(+), 15 deletions(-)
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/MultiTsFileDeviceIterator.java
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/MultiTsFileDeviceIterator.java
index 402a782bc7b..28329e0d5cb 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/MultiTsFileDeviceIterator.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/MultiTsFileDeviceIterator.java
@@ -116,10 +116,16 @@ public class MultiTsFileDeviceIterator implements
AutoCloseable {
Collections.sort(
this.tsFileResourcesSortedByDesc,
TsFileResource::compareFileCreationOrderByDesc);
this.readerMap = readerMap;
- CompactionType type =
- !seqResources.isEmpty() && !unseqResources.isEmpty()
- ? CompactionType.CROSS_COMPACTION
- : CompactionType.INNER_UNSEQ_COMPACTION;
+
+ CompactionType type = null;
+ if (!seqResources.isEmpty() && !unseqResources.isEmpty()) {
+ type = CompactionType.CROSS_COMPACTION;
+ } else if (seqResources.isEmpty()) {
+ type = CompactionType.INNER_UNSEQ_COMPACTION;
+ } else {
+ type = CompactionType.INNER_SEQ_COMPACTION;
+ }
+
for (TsFileResource tsFileResource : tsFileResourcesSortedByDesc) {
TsFileSequenceReader reader =
new CompactionTsFileReader(tsFileResource.getTsFilePath(), type);
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/io/CompactionTsFileReader.java
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/io/CompactionTsFileReader.java
index 4fcaf6e3948..374cca8f219 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/io/CompactionTsFileReader.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/io/CompactionTsFileReader.java
@@ -24,13 +24,16 @@ import
org.apache.iotdb.db.engine.compaction.schedule.constant.CompactionType;
import org.apache.iotdb.db.service.metrics.CompactionMetrics;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.MetadataIndexNode;
import org.apache.iotdb.tsfile.read.TsFileDeviceIterator;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import org.apache.iotdb.tsfile.read.common.Chunk;
+import org.apache.iotdb.tsfile.utils.Pair;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
+import java.util.Queue;
import java.util.concurrent.atomic.AtomicLong;
/**
@@ -116,4 +119,25 @@ public class CompactionTsFileReader extends
TsFileSequenceReader {
.recordReadInfo(compactionType, CompactionIoDataType.METADATA,
dataSize);
return chunkMetadataList;
}
+
+ @Override
+ public void getDevicesAndEntriesOfOneLeafNode(
+ Long startOffset, Long endOffset, Queue<Pair<String, long[]>>
measurementNodeOffsetQueue)
+ throws IOException {
+ long before = readDataSize.get();
+ super.getDevicesAndEntriesOfOneLeafNode(startOffset, endOffset,
measurementNodeOffsetQueue);
+ long dataSize = readDataSize.get() - before;
+ CompactionMetrics.getInstance()
+ .recordReadInfo(compactionType, CompactionIoDataType.METADATA,
dataSize);
+ }
+
+ @Override
+ public MetadataIndexNode readMetadataIndexNode(long start, long end) throws
IOException {
+ long before = readDataSize.get();
+ MetadataIndexNode metadataIndexNode = super.readMetadataIndexNode(start,
end);
+ long dataSize = readDataSize.get() - before;
+ CompactionMetrics.getInstance()
+ .recordReadInfo(compactionType, CompactionIoDataType.METADATA,
dataSize);
+ return metadataIndexNode;
+ }
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/io/CompactionTsFileWriter.java
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/io/CompactionTsFileWriter.java
index 2cc672e64f8..4dac7843aae 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/io/CompactionTsFileWriter.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/io/CompactionTsFileWriter.java
@@ -106,6 +106,16 @@ public class CompactionTsFileWriter extends TsFileIOWriter
{
}
}
+ @Override
+ public int checkMetadataSizeAndMayFlush() throws IOException {
+ int size = super.checkMetadataSizeAndMayFlush();
+ if (size > 0) {
+
CompactionTaskManager.getInstance().getMergeWriteRateLimiter().acquire(size);
+ }
+ CompactionMetrics.getInstance().recordWriteInfo(type,
CompactionIoDataType.METADATA, size);
+ return size;
+ }
+
@Override
public void endFile() throws IOException {
long beforeSize = this.getPos();
diff --git
a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileDeviceIterator.java
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileDeviceIterator.java
index 76609747c37..075c8da0eae 100644
---
a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileDeviceIterator.java
+++
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileDeviceIterator.java
@@ -82,8 +82,7 @@ public class TsFileDeviceIterator implements
Iterator<Pair<String, Boolean>> {
try {
// get the first measurement node of this device, to know if the device
is aligned
this.measurementNode =
- MetadataIndexNode.deserializeFrom(
- reader.readData(startEndPair.right[0], startEndPair.right[1]));
+ reader.readMetadataIndexNode(startEndPair.right[0],
startEndPair.right[1]);
boolean isAligned = reader.isAlignedDevice(measurementNode);
currentDevice = new Pair<>(startEndPair.left, isAligned);
return currentDevice;
diff --git
a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
index 6c29e7c4c44..1a118296b0f 100644
---
a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
+++
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
@@ -2232,6 +2232,18 @@ public class TsFileSequenceReader implements
AutoCloseable {
}
}
+ /**
+ * Read MetadataIndexNode by start and end offset
+ *
+ * @param start the start offset of the MetadataIndexNode
+ * @param end the end offset of the MetadataIndexNode
+ * @return MetadataIndexNode
+ * @throws IOException IOException
+ */
+ public MetadataIndexNode readMetadataIndexNode(long start, long end) throws
IOException {
+ return MetadataIndexNode.deserializeFrom(readData(start, end));
+ }
+
@Override
public int hashCode() {
return file.hashCode();
diff --git
a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
index e62aeb40c7e..ab4ea6198f9 100644
---
a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
+++
b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
@@ -631,7 +631,7 @@ public class TsFileIOWriter implements AutoCloseable {
*
* @throws IOException
*/
- public void checkMetadataSizeAndMayFlush() throws IOException {
+ public int checkMetadataSizeAndMayFlush() throws IOException {
// This function should be called after all data of an aligned device has
been written
if (enableMemoryControl && currentChunkMetadataSize > maxMetadataSize) {
try {
@@ -642,11 +642,13 @@ public class TsFileIOWriter implements AutoCloseable {
chunkMetadataCount,
currentChunkMetadataSize / chunkMetadataCount);
}
- sortAndFlushChunkMetadata();
+ return sortAndFlushChunkMetadata();
} catch (IOException e) {
logger.error("Meets exception when flushing metadata to temp file for
{}", file, e);
throw e;
}
+ } else {
+ return 0;
}
}
@@ -656,7 +658,8 @@ public class TsFileIOWriter implements AutoCloseable {
*
* @throws IOException
*/
- protected void sortAndFlushChunkMetadata() throws IOException {
+ protected int sortAndFlushChunkMetadata() throws IOException {
+ int writtenSize = 0;
// group by series
List<Pair<Path, List<IChunkMetadata>>> sortedChunkMetadataList =
TSMIterator.sortChunkMetadata(
@@ -673,7 +676,7 @@ public class TsFileIOWriter implements AutoCloseable {
pathCount++;
}
List<IChunkMetadata> iChunkMetadataList = pair.right;
- writeChunkMetadataToTempFile(iChunkMetadataList, seriesPath, isNewPath);
+ writtenSize += writeChunkMetadataToTempFile(iChunkMetadataList,
seriesPath, isNewPath);
lastSerializePath = seriesPath;
logger.debug("Flushing {}", seriesPath);
}
@@ -684,11 +687,13 @@ public class TsFileIOWriter implements AutoCloseable {
}
chunkMetadataCount = 0;
currentChunkMetadataSize = 0;
+ return writtenSize;
}
- private void writeChunkMetadataToTempFile(
+ private int writeChunkMetadataToTempFile(
List<IChunkMetadata> iChunkMetadataList, Path seriesPath, boolean
isNewPath)
throws IOException {
+ int writtenSize = 0;
// [DeviceId] measurementId datatype size chunkMetadataBuffer
if (lastSerializePath == null
|| !seriesPath.getDevice().equals(lastSerializePath.getDevice())) {
@@ -696,20 +701,25 @@ public class TsFileIOWriter implements AutoCloseable {
endPosInCMTForDevice.add(tempOutput.getPosition());
// serialize the device
// for each device, we only serialize it once, in order to save io
- ReadWriteIOUtils.write(seriesPath.getDevice(),
tempOutput.wrapAsStream());
+ writtenSize += ReadWriteIOUtils.write(seriesPath.getDevice(),
tempOutput.wrapAsStream());
}
if (isNewPath && iChunkMetadataList.size() > 0) {
// serialize the public info of this measurement
- ReadWriteIOUtils.writeVar(seriesPath.getMeasurement(),
tempOutput.wrapAsStream());
- ReadWriteIOUtils.write(iChunkMetadataList.get(0).getDataType(),
tempOutput.wrapAsStream());
+ writtenSize +=
+ ReadWriteIOUtils.writeVar(seriesPath.getMeasurement(),
tempOutput.wrapAsStream());
+ writtenSize +=
+ ReadWriteIOUtils.write(
+ iChunkMetadataList.get(0).getDataType(),
tempOutput.wrapAsStream());
}
PublicBAOS buffer = new PublicBAOS();
int totalSize = 0;
for (IChunkMetadata chunkMetadata : iChunkMetadataList) {
totalSize += chunkMetadata.serializeTo(buffer, true);
}
- ReadWriteIOUtils.write(totalSize, tempOutput.wrapAsStream());
+ writtenSize += ReadWriteIOUtils.write(totalSize,
tempOutput.wrapAsStream());
buffer.writeTo(tempOutput);
+ writtenSize += buffer.size();
+ return writtenSize;
}
public String getCurrentChunkGroupDeviceId() {