This is an automated email from the ASF dual-hosted git repository.

marklau99 pushed a commit to branch refactor-compaction-metrics
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/refactor-compaction-metrics by this push:
     new 1f9c235f058 edit according to review
1f9c235f058 is described below

commit 1f9c235f058ff03f35acae1d56f33de4e846c647
Author: Liu Xuxin <[email protected]>
AuthorDate: Tue Jun 20 15:01:27 2023 +0800

    edit according to review
---
 .../execute/utils/MultiTsFileDeviceIterator.java   | 14 +++++++----
 .../compaction/io/CompactionTsFileReader.java      | 24 +++++++++++++++++++
 .../compaction/io/CompactionTsFileWriter.java      | 10 ++++++++
 .../iotdb/tsfile/read/TsFileDeviceIterator.java    |  3 +--
 .../iotdb/tsfile/read/TsFileSequenceReader.java    | 12 ++++++++++
 .../iotdb/tsfile/write/writer/TsFileIOWriter.java  | 28 +++++++++++++++-------
 6 files changed, 76 insertions(+), 15 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/MultiTsFileDeviceIterator.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/MultiTsFileDeviceIterator.java
index 402a782bc7b..28329e0d5cb 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/MultiTsFileDeviceIterator.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/MultiTsFileDeviceIterator.java
@@ -116,10 +116,16 @@ public class MultiTsFileDeviceIterator implements AutoCloseable {
     Collections.sort(
         this.tsFileResourcesSortedByDesc, TsFileResource::compareFileCreationOrderByDesc);
     this.readerMap = readerMap;
-    CompactionType type =
-        !seqResources.isEmpty() && !unseqResources.isEmpty()
-            ? CompactionType.CROSS_COMPACTION
-            : CompactionType.INNER_UNSEQ_COMPACTION;
+
+    CompactionType type = null;
+    if (!seqResources.isEmpty() && !unseqResources.isEmpty()) {
+      type = CompactionType.CROSS_COMPACTION;
+    } else if (seqResources.isEmpty()) {
+      type = CompactionType.INNER_UNSEQ_COMPACTION;
+    } else {
+      type = CompactionType.INNER_SEQ_COMPACTION;
+    }
+
     for (TsFileResource tsFileResource : tsFileResourcesSortedByDesc) {
       TsFileSequenceReader reader =
           new CompactionTsFileReader(tsFileResource.getTsFilePath(), type);
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/io/CompactionTsFileReader.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/io/CompactionTsFileReader.java
index 4fcaf6e3948..374cca8f219 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/io/CompactionTsFileReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/io/CompactionTsFileReader.java
@@ -24,13 +24,16 @@ import org.apache.iotdb.db.engine.compaction.schedule.constant.CompactionType;
 import org.apache.iotdb.db.service.metrics.CompactionMetrics;
 import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
 import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
+import org.apache.iotdb.tsfile.file.metadata.MetadataIndexNode;
 import org.apache.iotdb.tsfile.read.TsFileDeviceIterator;
 import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
 import org.apache.iotdb.tsfile.read.common.Chunk;
+import org.apache.iotdb.tsfile.utils.Pair;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.List;
+import java.util.Queue;
 import java.util.concurrent.atomic.AtomicLong;
 
 /**
@@ -116,4 +119,25 @@ public class CompactionTsFileReader extends TsFileSequenceReader {
         .recordReadInfo(compactionType, CompactionIoDataType.METADATA, dataSize);
     return chunkMetadataList;
   }
+
+  @Override
+  public void getDevicesAndEntriesOfOneLeafNode(
+      Long startOffset, Long endOffset, Queue<Pair<String, long[]>> measurementNodeOffsetQueue)
+      throws IOException {
+    long before = readDataSize.get();
+    super.getDevicesAndEntriesOfOneLeafNode(startOffset, endOffset, measurementNodeOffsetQueue);
+    long dataSize = readDataSize.get() - before;
+    CompactionMetrics.getInstance()
+        .recordReadInfo(compactionType, CompactionIoDataType.METADATA, dataSize);
+  }
+
+  @Override
+  public MetadataIndexNode readMetadataIndexNode(long start, long end) throws IOException {
+    long before = readDataSize.get();
+    MetadataIndexNode metadataIndexNode = super.readMetadataIndexNode(start, end);
+    long dataSize = readDataSize.get() - before;
+    CompactionMetrics.getInstance()
+        .recordReadInfo(compactionType, CompactionIoDataType.METADATA, dataSize);
+    return metadataIndexNode;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/io/CompactionTsFileWriter.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/io/CompactionTsFileWriter.java
index 2cc672e64f8..4dac7843aae 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/io/CompactionTsFileWriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/io/CompactionTsFileWriter.java
@@ -106,6 +106,16 @@ public class CompactionTsFileWriter extends TsFileIOWriter {
     }
   }
 
+  @Override
+  public int checkMetadataSizeAndMayFlush() throws IOException {
+    int size = super.checkMetadataSizeAndMayFlush();
+    if (size > 0) {
+      CompactionTaskManager.getInstance().getMergeWriteRateLimiter().acquire(size);
+    }
+    CompactionMetrics.getInstance().recordWriteInfo(type, CompactionIoDataType.METADATA, size);
+    return size;
+  }
+
   @Override
   public void endFile() throws IOException {
     long beforeSize = this.getPos();
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileDeviceIterator.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileDeviceIterator.java
index 76609747c37..075c8da0eae 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileDeviceIterator.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileDeviceIterator.java
@@ -82,8 +82,7 @@ public class TsFileDeviceIterator implements Iterator<Pair<String, Boolean>> {
     try {
       // get the first measurement node of this device, to know if the device is aligned
       this.measurementNode =
-          MetadataIndexNode.deserializeFrom(
-              reader.readData(startEndPair.right[0], startEndPair.right[1]));
+          reader.readMetadataIndexNode(startEndPair.right[0], startEndPair.right[1]);
       boolean isAligned = reader.isAlignedDevice(measurementNode);
       currentDevice = new Pair<>(startEndPair.left, isAligned);
       return currentDevice;
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
index 6c29e7c4c44..1a118296b0f 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
@@ -2232,6 +2232,18 @@ public class TsFileSequenceReader implements AutoCloseable {
     }
   }
 
+  /**
+   * Read MetadataIndexNode by start and end offset
+   *
+   * @param start the start offset of the MetadataIndexNode
+   * @param end the end offset of the MetadataIndexNode
+   * @return MetadataIndexNode
+   * @throws IOException IOException
+   */
+  public MetadataIndexNode readMetadataIndexNode(long start, long end) throws IOException {
+    return MetadataIndexNode.deserializeFrom(readData(start, end));
+  }
+
   @Override
   public int hashCode() {
     return file.hashCode();
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
index e62aeb40c7e..ab4ea6198f9 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
@@ -631,7 +631,7 @@ public class TsFileIOWriter implements AutoCloseable {
    *
    * @throws IOException
    */
-  public void checkMetadataSizeAndMayFlush() throws IOException {
+  public int checkMetadataSizeAndMayFlush() throws IOException {
     // This function should be called after all data of an aligned device has been written
     if (enableMemoryControl && currentChunkMetadataSize > maxMetadataSize) {
       try {
@@ -642,11 +642,13 @@ public class TsFileIOWriter implements AutoCloseable {
               chunkMetadataCount,
               currentChunkMetadataSize / chunkMetadataCount);
         }
-        sortAndFlushChunkMetadata();
+        return sortAndFlushChunkMetadata();
       } catch (IOException e) {
         logger.error("Meets exception when flushing metadata to temp file for {}", file, e);
         throw e;
       }
+    } else {
+      return 0;
     }
   }
 
@@ -656,7 +658,8 @@ public class TsFileIOWriter implements AutoCloseable {
    *
    * @throws IOException
    */
-  protected void sortAndFlushChunkMetadata() throws IOException {
+  protected int sortAndFlushChunkMetadata() throws IOException {
+    int writtenSize = 0;
     // group by series
     List<Pair<Path, List<IChunkMetadata>>> sortedChunkMetadataList =
         TSMIterator.sortChunkMetadata(
@@ -673,7 +676,7 @@ public class TsFileIOWriter implements AutoCloseable {
         pathCount++;
       }
       List<IChunkMetadata> iChunkMetadataList = pair.right;
-      writeChunkMetadataToTempFile(iChunkMetadataList, seriesPath, isNewPath);
+      writtenSize += writeChunkMetadataToTempFile(iChunkMetadataList, seriesPath, isNewPath);
       lastSerializePath = seriesPath;
       logger.debug("Flushing {}", seriesPath);
     }
@@ -684,11 +687,13 @@ public class TsFileIOWriter implements AutoCloseable {
     }
     chunkMetadataCount = 0;
     currentChunkMetadataSize = 0;
+    return writtenSize;
   }
 
-  private void writeChunkMetadataToTempFile(
+  private int writeChunkMetadataToTempFile(
       List<IChunkMetadata> iChunkMetadataList, Path seriesPath, boolean isNewPath)
       throws IOException {
+    int writtenSize = 0;
     // [DeviceId] measurementId datatype size chunkMetadataBuffer
     if (lastSerializePath == null
         || !seriesPath.getDevice().equals(lastSerializePath.getDevice())) {
@@ -696,20 +701,25 @@ public class TsFileIOWriter implements AutoCloseable {
       endPosInCMTForDevice.add(tempOutput.getPosition());
       // serialize the device
       // for each device, we only serialize it once, in order to save io
-      ReadWriteIOUtils.write(seriesPath.getDevice(), tempOutput.wrapAsStream());
+      writtenSize += ReadWriteIOUtils.write(seriesPath.getDevice(), tempOutput.wrapAsStream());
     }
     if (isNewPath && iChunkMetadataList.size() > 0) {
       // serialize the public info of this measurement
-      ReadWriteIOUtils.writeVar(seriesPath.getMeasurement(), tempOutput.wrapAsStream());
-      ReadWriteIOUtils.write(iChunkMetadataList.get(0).getDataType(), tempOutput.wrapAsStream());
+      writtenSize +=
+          ReadWriteIOUtils.writeVar(seriesPath.getMeasurement(), tempOutput.wrapAsStream());
+      writtenSize +=
+          ReadWriteIOUtils.write(
+              iChunkMetadataList.get(0).getDataType(), tempOutput.wrapAsStream());
     }
     PublicBAOS buffer = new PublicBAOS();
     int totalSize = 0;
     for (IChunkMetadata chunkMetadata : iChunkMetadataList) {
       totalSize += chunkMetadata.serializeTo(buffer, true);
     }
-    ReadWriteIOUtils.write(totalSize, tempOutput.wrapAsStream());
+    writtenSize += ReadWriteIOUtils.write(totalSize, tempOutput.wrapAsStream());
     buffer.writeTo(tempOutput);
+    writtenSize += buffer.size();
+    return writtenSize;
   }
 
   public String getCurrentChunkGroupDeviceId() {

Reply via email to