This is an automated email from the ASF dual-hosted git repository.

marklau99 pushed a commit to branch refactor-compaction-metrics
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 7fc3df42f526b807011985462156e9a56a94f429
Author: Liu Xuxin <[email protected]>
AuthorDate: Fri Jun 9 19:07:27 2023 +0800

    complete
---
 .../readchunk/AlignedSeriesCompactionExecutor.java | 11 ++++----
 .../readchunk/SingleSeriesCompactionExecutor.java  |  6 -----
 .../utils/writer/AbstractCompactionWriter.java     | 26 +++----------------
 .../compaction/io/CompactionTsFileReader.java      | 27 +++++++++++--------
 .../compaction/io/CompactionTsFileWriter.java      | 30 +++++++++++++++++++---
 .../compaction/schedule/CompactionTaskManager.java | 10 --------
 .../iotdb/db/engine/storagegroup/DataRegion.java   |  5 ++++
 7 files changed, 57 insertions(+), 58 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/AlignedSeriesCompactionExecutor.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/AlignedSeriesCompactionExecutor.java
index 00f12a2c8d9..616dbf1013e 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/AlignedSeriesCompactionExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/AlignedSeriesCompactionExecutor.java
@@ -22,7 +22,6 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import 
org.apache.iotdb.db.engine.compaction.execute.task.CompactionTaskSummary;
 import org.apache.iotdb.db.engine.compaction.io.CompactionTsFileReader;
 import org.apache.iotdb.db.engine.compaction.io.CompactionTsFileWriter;
-import org.apache.iotdb.db.engine.compaction.schedule.CompactionTaskManager;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.tsfile.file.header.ChunkHeader;
 import org.apache.iotdb.tsfile.file.metadata.AlignedChunkMetadata;
@@ -39,8 +38,6 @@ import 
org.apache.iotdb.tsfile.write.chunk.AlignedChunkWriterImpl;
 import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
-import com.google.common.util.concurrent.RateLimiter;
-
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Comparator;
@@ -60,8 +57,6 @@ public class AlignedSeriesCompactionExecutor {
   private final List<IMeasurementSchema> schemaList;
   private long remainingPointInChunkWriter = 0L;
   private final CompactionTaskSummary summary;
-  private final RateLimiter rateLimiter =
-      CompactionTaskManager.getInstance().getMergeWriteRateLimiter();
 
   private final long chunkSizeThreshold =
       IoTDBDescriptor.getInstance().getConfig().getTargetChunkSize();
@@ -99,6 +94,9 @@ public class AlignedSeriesCompactionExecutor {
     for (Pair<TsFileSequenceReader, List<AlignedChunkMetadata>> readerListPair :
         readerAndChunkMetadataList) {
       TsFileSequenceReader reader = readerListPair.left;
+      if (reader instanceof CompactionTsFileReader) {
+        ((CompactionTsFileReader) reader).markStartOfAlignedSeries();
+      }
       List<AlignedChunkMetadata> alignedChunkMetadataList = 
readerListPair.right;
       for (AlignedChunkMetadata alignedChunkMetadata : 
alignedChunkMetadataList) {
         List<IChunkMetadata> valueChunkMetadataList =
@@ -121,6 +119,9 @@ public class AlignedSeriesCompactionExecutor {
                   header.getCompressionType()));
         }
       }
+      if (reader instanceof CompactionTsFileReader) {
+        ((CompactionTsFileReader) reader).markEndOfAlignedSeries();
+      }
     }
     List<IMeasurementSchema> schemaList = new ArrayList<>(schemaSet);
     
schemaList.sort(Comparator.comparing(IMeasurementSchema::getMeasurementId));
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/SingleSeriesCompactionExecutor.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/SingleSeriesCompactionExecutor.java
index 689f13aac20..89cff34c54e 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/SingleSeriesCompactionExecutor.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/executor/readchunk/SingleSeriesCompactionExecutor.java
@@ -23,7 +23,6 @@ import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import 
org.apache.iotdb.db.engine.compaction.execute.task.CompactionTaskSummary;
 import org.apache.iotdb.db.engine.compaction.io.CompactionTsFileWriter;
-import org.apache.iotdb.db.engine.compaction.schedule.CompactionTaskManager;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.tsfile.file.header.ChunkHeader;
 import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
@@ -38,8 +37,6 @@ import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl;
 import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
-import com.google.common.util.concurrent.RateLimiter;
-
 import java.io.IOException;
 import java.util.LinkedList;
 import java.util.List;
@@ -56,8 +53,6 @@ public class SingleSeriesCompactionExecutor {
   private ChunkWriterImpl chunkWriter;
   private Chunk cachedChunk;
   private ChunkMetadata cachedChunkMetadata;
-  private RateLimiter compactionRateLimiter =
-      CompactionTaskManager.getInstance().getMergeWriteRateLimiter();
   // record the min time and max time to update the target resource
   private long minStartTimestamp = Long.MAX_VALUE;
   private long maxEndTimestamp = Long.MIN_VALUE;
@@ -322,7 +317,6 @@ public class SingleSeriesCompactionExecutor {
 
   private void flushChunkToFileWriter(
       Chunk chunk, ChunkMetadata chunkMetadata, boolean isCachedChunk) throws 
IOException {
-    CompactionTaskManager.mergeRateLimiterAcquire(compactionRateLimiter, 
getChunkSize(chunk));
     if (chunkMetadata.getStartTime() < minStartTimestamp) {
       minStartTimestamp = chunkMetadata.getStartTime();
     }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractCompactionWriter.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractCompactionWriter.java
index 75e3c76b9af..afcb162311e 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractCompactionWriter.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/writer/AbstractCompactionWriter.java
@@ -20,10 +20,6 @@ package 
org.apache.iotdb.db.engine.compaction.execute.utils.writer;
 
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.compaction.io.CompactionTsFileWriter;
-import org.apache.iotdb.db.engine.compaction.schedule.CompactionTaskManager;
-import 
org.apache.iotdb.db.engine.compaction.schedule.constant.CompactionIoDataType;
-import org.apache.iotdb.db.engine.compaction.schedule.constant.CompactionType;
-import org.apache.iotdb.db.service.metrics.CompactionMetrics;
 import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
 import org.apache.iotdb.tsfile.exception.write.PageException;
 import org.apache.iotdb.tsfile.file.header.PageHeader;
@@ -41,8 +37,6 @@ import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
 import org.apache.iotdb.tsfile.write.chunk.ValueChunkWriter;
 import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
 
-import com.google.common.util.concurrent.RateLimiter;
-
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.List;
@@ -50,9 +44,6 @@ import java.util.List;
 public abstract class AbstractCompactionWriter implements AutoCloseable {
   protected int subTaskNum = 
IoTDBDescriptor.getInstance().getConfig().getSubCompactionTaskNum();
 
-  private RateLimiter compactionRateLimiter =
-      CompactionTaskManager.getInstance().getMergeWriteRateLimiter();
-
   // check if there is unseq error point during writing
   protected long[] lastTime = new long[subTaskNum];
 
@@ -169,8 +160,6 @@ public abstract class AbstractCompactionWriter implements 
AutoCloseable {
   protected void sealChunk(
       CompactionTsFileWriter targetWriter, IChunkWriter iChunkWriter, int 
subTaskId)
       throws IOException {
-    CompactionTaskManager.mergeRateLimiterAcquire(
-        compactionRateLimiter, iChunkWriter.estimateMaxSeriesMemSize());
     synchronized (targetWriter) {
       targetWriter.writeChunk(iChunkWriter);
     }
@@ -191,7 +180,6 @@ public abstract class AbstractCompactionWriter implements 
AutoCloseable {
   protected void flushNonAlignedChunkToFileWriter(
       CompactionTsFileWriter targetWriter, Chunk chunk, ChunkMetadata 
chunkMetadata, int subTaskId)
       throws IOException {
-    CompactionTaskManager.mergeRateLimiterAcquire(compactionRateLimiter, 
getChunkSize(chunk));
     synchronized (targetWriter) {
       // seal last chunk to file writer
       targetWriter.writeChunk(chunkWriters[subTaskId]);
@@ -214,8 +202,9 @@ public abstract class AbstractCompactionWriter implements 
AutoCloseable {
       targetWriter.writeChunk(alignedChunkWriter);
       chunkPointNumArray[subTaskId] = 0;
 
+      targetWriter.markStartingWritingAligned();
+
       // flush time chunk
-      CompactionTaskManager.mergeRateLimiterAcquire(compactionRateLimiter, 
getChunkSize(timeChunk));
       targetWriter.writeChunk(timeChunk, (ChunkMetadata) timeChunkMetadata);
 
       // flush value chunks
@@ -232,10 +221,10 @@ public abstract class AbstractCompactionWriter implements 
AutoCloseable {
               Statistics.getStatsByType(valueChunkWriter.getDataType()));
           continue;
         }
-        CompactionTaskManager.mergeRateLimiterAcquire(
-            compactionRateLimiter, getChunkSize(valueChunk));
         targetWriter.writeChunk(valueChunk, (ChunkMetadata) 
valueChunkMetadatas.get(i));
       }
+
+      targetWriter.markEndingWritingAligned();
     }
   }
 
@@ -304,13 +293,6 @@ public abstract class AbstractCompactionWriter implements 
AutoCloseable {
       if (iChunkWriter.checkIsChunkSizeOverThreshold(targetChunkSize, 
targetChunkPointNum, false)) {
         sealChunk(fileWriter, iChunkWriter, subTaskId);
         lastCheckIndex = 0;
-        CompactionMetrics.getInstance()
-            .recordWriteInfo(
-                isCrossSpace
-                    ? CompactionType.CROSS_COMPACTION
-                    : CompactionType.INNER_UNSEQ_COMPACTION,
-                isAlign ? CompactionIoDataType.ALIGNED : 
CompactionIoDataType.NOT_ALIGNED,
-                iChunkWriter.estimateMaxSeriesMemSize());
       }
     }
   }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/io/CompactionTsFileReader.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/io/CompactionTsFileReader.java
index b9b10791de7..efc0e3635c8 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/io/CompactionTsFileReader.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/io/CompactionTsFileReader.java
@@ -39,13 +39,13 @@ import java.util.List;
  */
 public class CompactionTsFileReader extends TsFileSequenceReader {
   /** Tracks the total amount of data (in bytes) that has been read. */
-  long readDataSize = 0L;
+  private volatile long readDataSize = 0L;
 
   /** The type of compaction running. */
   CompactionType compactionType;
 
   /** A flag that indicates if an aligned series is being read. */
-  boolean readingAlignedSeries = false;
+  private volatile boolean readingAlignedSeries = false;
 
   /**
    * Constructs a new instance of CompactionTsFileReader.
@@ -78,15 +78,20 @@ public class CompactionTsFileReader extends 
TsFileSequenceReader {
 
   @Override
   public Chunk readMemChunk(ChunkMetadata metaData) throws IOException {
-    long before = readDataSize;
-    Chunk chunk = super.readMemChunk(metaData);
-    long dataSize = readDataSize - before;
-    CompactionMetrics.getInstance()
-        .recordReadInfo(
-            compactionType,
-            readingAlignedSeries ? CompactionIoDataType.ALIGNED : 
CompactionIoDataType.NOT_ALIGNED,
-            dataSize);
-    return chunk;
+    synchronized (this) {
+      // using synchronized to avoid concurrent read that makes readDataSize not correct
+      long before = readDataSize;
+      Chunk chunk = super.readMemChunk(metaData);
+      long dataSize = readDataSize - before;
+      CompactionMetrics.getInstance()
+          .recordReadInfo(
+              compactionType,
+              readingAlignedSeries
+                  ? CompactionIoDataType.ALIGNED
+                  : CompactionIoDataType.NOT_ALIGNED,
+              dataSize);
+      return chunk;
+    }
   }
 
   @Override
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/io/CompactionTsFileWriter.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/io/CompactionTsFileWriter.java
index f86f7a4391f..db65178fae8 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/io/CompactionTsFileWriter.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/io/CompactionTsFileWriter.java
@@ -40,6 +40,8 @@ import java.io.Serializable;
 public class CompactionTsFileWriter extends TsFileIOWriter {
   CompactionType type;
 
+  private volatile boolean isWritingAligned = false;
+
   public CompactionTsFileWriter(
       File file, boolean enableMemoryControl, long maxMetadataSize, 
CompactionType type)
       throws IOException {
@@ -50,13 +52,16 @@ public class CompactionTsFileWriter extends TsFileIOWriter {
   @Override
   public void writeChunk(Chunk chunk, ChunkMetadata chunkMetadata) throws 
IOException {
     long beforeOffset = this.getPos();
-    CompactionTaskManager.getInstance()
-        .getMergeWriteRateLimiter()
-        .acquire(chunk.getHeader().getDataSize() + 
chunk.getHeader().getSerializedSize());
     super.writeChunk(chunk, chunkMetadata);
     long writtenDataSize = this.getPos() - beforeOffset;
+    if (writtenDataSize > 0) {
+      CompactionTaskManager.getInstance().getMergeWriteRateLimiter().acquire((int) writtenDataSize);
+    }
     CompactionMetrics.getInstance()
-        .recordWriteInfo(type, CompactionIoDataType.NOT_ALIGNED, 
writtenDataSize);
+        .recordWriteInfo(
+            type,
+            isWritingAligned ? CompactionIoDataType.ALIGNED : 
CompactionIoDataType.NOT_ALIGNED,
+            writtenDataSize);
   }
 
   @Override
@@ -73,6 +78,9 @@ public class CompactionTsFileWriter extends TsFileIOWriter {
     long writtenDataSize = this.getPos() - beforeOffset;
     CompactionMetrics.getInstance()
         .recordWriteInfo(type, CompactionIoDataType.ALIGNED, writtenDataSize);
+    if (writtenDataSize > 0) {
+      CompactionTaskManager.getInstance().getMergeWriteRateLimiter().acquire((int) writtenDataSize);
+    }
   }
 
   public void writeChunk(IChunkWriter chunkWriter) throws IOException {
@@ -80,6 +88,9 @@ public class CompactionTsFileWriter extends TsFileIOWriter {
     long beforeOffset = this.getPos();
     chunkWriter.writeToFileWriter(this);
     long writtenDataSize = this.getPos() - beforeOffset;
+    if (writtenDataSize > 0) {
+      CompactionTaskManager.getInstance().getMergeWriteRateLimiter().acquire((int) writtenDataSize);
+    }
     CompactionMetrics.getInstance()
         .recordWriteInfo(
             type,
@@ -92,7 +103,18 @@ public class CompactionTsFileWriter extends TsFileIOWriter {
     long beforeSize = this.getPos();
     super.endFile();
     long writtenDataSize = this.getPos() - beforeSize;
+    if (writtenDataSize > 0) {
+      CompactionTaskManager.getInstance().getMergeWriteRateLimiter().acquire((int) writtenDataSize);
+    }
     CompactionMetrics.getInstance()
         .recordWriteInfo(type, CompactionIoDataType.METADATA, writtenDataSize);
   }
+
+  public void markStartingWritingAligned() {
+    isWritingAligned = true;
+  }
+
+  public void markEndingWritingAligned() {
+    isWritingAligned = false;
+  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionTaskManager.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionTaskManager.java
index 6dd13065941..baad46f2a0d 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionTaskManager.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/schedule/CompactionTaskManager.java
@@ -252,16 +252,6 @@ public class CompactionTaskManager implements IService {
       mergeWriteRateLimiter.setRate(throughout);
     }
   }
-  /** wait by throughoutMbPerSec limit to avoid continuous Write Or Read */
-  public static void mergeRateLimiterAcquire(RateLimiter limiter, long 
bytesLength) {
-    while (bytesLength >= Integer.MAX_VALUE) {
-      limiter.acquire(Integer.MAX_VALUE);
-      bytesLength -= Integer.MAX_VALUE;
-    }
-    if (bytesLength > 0) {
-      limiter.acquire((int) bytesLength);
-    }
-  }
 
   public synchronized void removeRunningTaskFuture(AbstractCompactionTask 
task) {
     String regionWithSG = getSGWithRegionId(task.getStorageGroupName(), 
task.getDataRegionId());
diff --git 
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
index f4de1add68a..b7a8aa7a958 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
@@ -1555,6 +1555,11 @@ public class DataRegion implements IDataRegionForQuery {
       tsFileResourceList.addAll(tsFileManager.getTsFileList(false));
       tsFileResourceList.forEach(
           x -> {
+            FileMetrics.getInstance()
+                .deleteFile(
+                    new long[] {x.getTsFileSize()},
+                    x.isSeq(),
+                    Collections.singletonList(x.getTsFile().getName()));
             if (x.getModFile().exists()) {
               FileMetrics.getInstance().decreaseModFileNum(1);
               
FileMetrics.getInstance().decreaseModFileSize(x.getModFile().getSize());

Reply via email to