This is an automated email from the ASF dual-hosted git repository.

marklau99 pushed a commit to branch DEBUG-WRITE-FLUCTUATION
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/DEBUG-WRITE-FLUCTUATION by 
this push:
     new c4bf152c14 add io rate limit for compaction
c4bf152c14 is described below

commit c4bf152c1467443f6ecc65f74ccfc238b3ec3b8b
Author: Liu Xuxin <[email protected]>
AuthorDate: Tue Dec 6 16:32:46 2022 +0800

    add io rate limit for compaction
---
 .../src/assembly/resources/conf/iotdb-common.properties     |  2 +-
 .../src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java | 12 ++++++------
 .../main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java |  9 ++++-----
 .../iotdb/db/engine/compaction/CompactionTaskManager.java   | 13 ++++++-------
 .../inner/utils/AlignedSeriesCompactionExecutor.java        | 11 ++++++-----
 .../inner/utils/SingleSeriesCompactionExecutor.java         |  8 ++++----
 .../engine/compaction/writer/AbstractCompactionWriter.java  | 12 +++++-------
 .../engine/compaction/utils/CompactionConfigRestorer.java   |  2 +-
 8 files changed, 33 insertions(+), 36 deletions(-)

diff --git a/node-commons/src/assembly/resources/conf/iotdb-common.properties 
b/node-commons/src/assembly/resources/conf/iotdb-common.properties
index d932a8e9f3..3b07e81b5c 100644
--- a/node-commons/src/assembly/resources/conf/iotdb-common.properties
+++ b/node-commons/src/assembly/resources/conf/iotdb-common.properties
@@ -621,7 +621,7 @@
 
 # The limit of write throughput merge can reach per second
 # Datatype: int
-# compaction_write_throughput_mb_per_sec=16
+# compaction_io_per_sec=16
 
 # The number of sub compaction threads to be set up to perform compaction.
 # Currently only works for nonAligned data in cross space compaction and unseq 
inner space compaction.
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java 
b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 99dcf1b865..0c2d469817 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -658,8 +658,8 @@ public class IoTDBConfig {
    */
   private long mergeIntervalSec = 0L;
 
-  /** The limit of compaction merge can reach per second */
-  private int compactionWriteThroughputMbPerSec = 16;
+  /** The limit of compaction IO times per second */
+  private int compactionIOPerSec = 16;
 
   /**
    * How many thread will be set up to perform compaction, 10 by default. Set 
to 1 when less than or
@@ -1913,12 +1913,12 @@ public class IoTDBConfig {
     this.intoOperationExecutionThreadCount = intoOperationExecutionThreadCount;
   }
 
-  public int getCompactionWriteThroughputMbPerSec() {
-    return compactionWriteThroughputMbPerSec;
+  public int getCompactionIOPerSec() {
+    return compactionIOPerSec;
   }
 
-  public void setCompactionWriteThroughputMbPerSec(int 
compactionWriteThroughputMbPerSec) {
-    this.compactionWriteThroughputMbPerSec = compactionWriteThroughputMbPerSec;
+  public void setCompactionIOPerSec(int compactionIOPerSec) {
+    this.compactionIOPerSec = compactionIOPerSec;
   }
 
   public boolean isEnableMemControl() {
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java 
b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 57a0aee054..731713ee07 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -667,11 +667,10 @@ public class IoTDBDescriptor {
                 "max_cross_compaction_candidate_file_size",
                 
Long.toString(conf.getMaxCrossCompactionCandidateFileSize()))));
 
-    conf.setCompactionWriteThroughputMbPerSec(
+    conf.setCompactionIOPerSec(
         Integer.parseInt(
             properties.getProperty(
-                "compaction_write_throughput_mb_per_sec",
-                
Integer.toString(conf.getCompactionWriteThroughputMbPerSec()))));
+                "compaction_io_per_sec", 
Integer.toString(conf.getCompactionIOPerSec()))));
 
     conf.setEnableCompactionValidation(
         Boolean.parseBoolean(
@@ -1475,11 +1474,11 @@ public class IoTDBDescriptor {
               properties.getProperty(
                   "slow_query_threshold", 
Long.toString(conf.getSlowQueryThreshold()))));
       // update merge_write_throughput_mb_per_sec
-      conf.setCompactionWriteThroughputMbPerSec(
+      conf.setCompactionIOPerSec(
           Integer.parseInt(
               properties.getProperty(
                   "merge_write_throughput_mb_per_sec",
-                  
Integer.toString(conf.getCompactionWriteThroughputMbPerSec()))));
+                  Integer.toString(conf.getCompactionIOPerSec()))));
 
       // update insert-tablet-plan's row limit for select-into
       conf.setSelectIntoInsertTabletPlanRowLimit(
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java
index edd79a608c..5f74773c0c 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionTaskManager.java
@@ -75,7 +75,7 @@ public class CompactionTaskManager implements IService {
       storageGroupTasks = new ConcurrentHashMap<>();
   private final AtomicInteger finishedTaskNum = new AtomicInteger(0);
 
-  private final RateLimiter mergeWriteRateLimiter = 
RateLimiter.create(Double.MAX_VALUE);
+  private final RateLimiter compactionIORateLimiter = 
RateLimiter.create(Double.MAX_VALUE);
 
   private final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
   private volatile boolean init = false;
@@ -237,10 +237,9 @@ public class CompactionTaskManager implements IService {
         .containsKey(task);
   }
 
-  public RateLimiter getMergeWriteRateLimiter() {
-    setWriteMergeRate(
-        
IoTDBDescriptor.getInstance().getConfig().getCompactionWriteThroughputMbPerSec());
-    return mergeWriteRateLimiter;
+  public RateLimiter getCompactionIORateLimiter() {
+    
setWriteMergeRate(IoTDBDescriptor.getInstance().getConfig().getCompactionIOPerSec());
+    return compactionIORateLimiter;
   }
 
   private void setWriteMergeRate(final double throughoutMbPerSec) {
@@ -249,8 +248,8 @@ public class CompactionTaskManager implements IService {
     if (throughout == 0) {
       throughout = Double.MAX_VALUE;
     }
-    if (mergeWriteRateLimiter.getRate() != throughout) {
-      mergeWriteRateLimiter.setRate(throughout);
+    if (compactionIORateLimiter.getRate() != throughout) {
+      compactionIORateLimiter.setRate(throughout);
     }
   }
   /** wait by throughoutMbPerSec limit to avoid continuous Write Or Read */
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/AlignedSeriesCompactionExecutor.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/AlignedSeriesCompactionExecutor.java
index 3de4c64a36..b09fdab0b5 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/AlignedSeriesCompactionExecutor.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/AlignedSeriesCompactionExecutor.java
@@ -62,7 +62,7 @@ public class AlignedSeriesCompactionExecutor {
   private final List<IMeasurementSchema> schemaList;
   private long remainingPointInChunkWriter = 0L;
   private final RateLimiter rateLimiter =
-      CompactionTaskManager.getInstance().getMergeWriteRateLimiter();
+      CompactionTaskManager.getInstance().getCompactionIORateLimiter();
 
   private final long chunkSizeThreshold =
       IoTDBDescriptor.getInstance().getConfig().getTargetChunkSize();
@@ -135,6 +135,9 @@ public class AlignedSeriesCompactionExecutor {
       TsFileAlignedSeriesReaderIterator readerIterator =
           new TsFileAlignedSeriesReaderIterator(reader, 
alignedChunkMetadataList, schemaList);
       while (readerIterator.hasNext()) {
+        CompactionTaskManager.getInstance()
+            .getCompactionIORateLimiter()
+            .acquire(schemaList.size() + 1);
         Pair<AlignedChunkReader, Long> chunkReaderAndChunkSize = 
readerIterator.nextReader();
         
CompactionMetricsRecorder.recordReadInfo(chunkReaderAndChunkSize.right);
         compactOneAlignedChunk(chunkReaderAndChunkSize.left);
@@ -142,8 +145,7 @@ public class AlignedSeriesCompactionExecutor {
     }
 
     if (remainingPointInChunkWriter != 0L) {
-      CompactionTaskManager.mergeRateLimiterAcquire(
-          rateLimiter, chunkWriter.estimateMaxSeriesMemSize());
+      CompactionTaskManager.mergeRateLimiterAcquire(rateLimiter, 1);
       CompactionMetricsRecorder.recordWriteInfo(
           CompactionType.INNER_SEQ_COMPACTION,
           ProcessChunkType.DESERIALIZE_CHUNK,
@@ -181,8 +183,7 @@ public class AlignedSeriesCompactionExecutor {
   private void flushChunkWriterIfLargeEnough() throws IOException {
     if (remainingPointInChunkWriter >= chunkPointNumThreshold
         || chunkWriter.estimateMaxSeriesMemSize() >= chunkSizeThreshold * 
schemaList.size()) {
-      CompactionTaskManager.mergeRateLimiterAcquire(
-          rateLimiter, chunkWriter.estimateMaxSeriesMemSize());
+      CompactionTaskManager.mergeRateLimiterAcquire(rateLimiter, 1);
       CompactionMetricsRecorder.recordWriteInfo(
           CompactionType.INNER_SEQ_COMPACTION,
           ProcessChunkType.DESERIALIZE_CHUNK,
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/SingleSeriesCompactionExecutor.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/SingleSeriesCompactionExecutor.java
index d1d4a366e7..784b15e3cb 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/SingleSeriesCompactionExecutor.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/SingleSeriesCompactionExecutor.java
@@ -58,7 +58,7 @@ public class SingleSeriesCompactionExecutor {
   private Chunk cachedChunk;
   private ChunkMetadata cachedChunkMetadata;
   private RateLimiter compactionRateLimiter =
-      CompactionTaskManager.getInstance().getMergeWriteRateLimiter();
+      CompactionTaskManager.getInstance().getCompactionIORateLimiter();
   // record the min time and max time to update the target resource
   private long minStartTimestamp = Long.MAX_VALUE;
   private long maxEndTimestamp = Long.MIN_VALUE;
@@ -117,6 +117,7 @@ public class SingleSeriesCompactionExecutor {
       TsFileSequenceReader reader = readerListPair.left;
       List<ChunkMetadata> chunkMetadataList = readerListPair.right;
       for (ChunkMetadata chunkMetadata : chunkMetadataList) {
+        compactionRateLimiter.acquire(1);
         Chunk currentChunk = reader.readMemChunk(chunkMetadata);
         if (this.chunkWriter == null) {
           constructChunkWriterFromReadChunk(currentChunk);
@@ -298,7 +299,7 @@ public class SingleSeriesCompactionExecutor {
 
   private void flushChunkToFileWriter(
       Chunk chunk, ChunkMetadata chunkMetadata, boolean isCachedChunk) throws 
IOException {
-    CompactionTaskManager.mergeRateLimiterAcquire(compactionRateLimiter, 
getChunkSize(chunk));
+    CompactionTaskManager.mergeRateLimiterAcquire(compactionRateLimiter, 1);
     if (chunkMetadata.getStartTime() < minStartTimestamp) {
       minStartTimestamp = chunkMetadata.getStartTime();
     }
@@ -316,8 +317,7 @@ public class SingleSeriesCompactionExecutor {
   private void flushChunkWriterIfLargeEnough() throws IOException {
     if (pointCountInChunkWriter >= targetChunkPointNum
         || chunkWriter.estimateMaxSeriesMemSize() >= targetChunkSize) {
-      CompactionTaskManager.mergeRateLimiterAcquire(
-          compactionRateLimiter, chunkWriter.estimateMaxSeriesMemSize());
+      CompactionTaskManager.mergeRateLimiterAcquire(compactionRateLimiter, 1);
       CompactionMetricsRecorder.recordWriteInfo(
           CompactionType.INNER_SEQ_COMPACTION,
           ProcessChunkType.DESERIALIZE_CHUNK,
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/AbstractCompactionWriter.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/AbstractCompactionWriter.java
index 24a82136ce..c2f54080e4 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/AbstractCompactionWriter.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/compaction/writer/AbstractCompactionWriter.java
@@ -49,7 +49,7 @@ public abstract class AbstractCompactionWriter implements 
AutoCloseable {
   protected int subTaskNum = 
IoTDBDescriptor.getInstance().getConfig().getSubCompactionTaskNum();
 
   private RateLimiter compactionRateLimiter =
-      CompactionTaskManager.getInstance().getMergeWriteRateLimiter();
+      CompactionTaskManager.getInstance().getCompactionIORateLimiter();
 
   // check if there is unseq error point during writing
   protected long[] lastTime = new long[subTaskNum];
@@ -162,8 +162,7 @@ public abstract class AbstractCompactionWriter implements 
AutoCloseable {
 
   protected void sealChunk(TsFileIOWriter targetWriter, IChunkWriter 
iChunkWriter, int subTaskId)
       throws IOException {
-    CompactionTaskManager.mergeRateLimiterAcquire(
-        compactionRateLimiter, iChunkWriter.estimateMaxSeriesMemSize());
+    CompactionTaskManager.mergeRateLimiterAcquire(compactionRateLimiter, 1);
     synchronized (targetWriter) {
       iChunkWriter.writeToFileWriter(targetWriter);
     }
@@ -184,7 +183,7 @@ public abstract class AbstractCompactionWriter implements 
AutoCloseable {
   protected void flushNonAlignedChunkToFileWriter(
       TsFileIOWriter targetWriter, Chunk chunk, ChunkMetadata chunkMetadata, 
int subTaskId)
       throws IOException {
-    CompactionTaskManager.mergeRateLimiterAcquire(compactionRateLimiter, 
getChunkSize(chunk));
+    CompactionTaskManager.mergeRateLimiterAcquire(compactionRateLimiter, 1);
     synchronized (targetWriter) {
       // seal last chunk to file writer
       chunkWriters[subTaskId].writeToFileWriter(targetWriter);
@@ -208,7 +207,7 @@ public abstract class AbstractCompactionWriter implements 
AutoCloseable {
       chunkPointNumArray[subTaskId] = 0;
 
       // flush time chunk
-      CompactionTaskManager.mergeRateLimiterAcquire(compactionRateLimiter, 
getChunkSize(timeChunk));
+      CompactionTaskManager.mergeRateLimiterAcquire(compactionRateLimiter, 1);
       targetWriter.writeChunk(timeChunk, (ChunkMetadata) timeChunkMetadata);
 
       // flush value chunks
@@ -225,8 +224,7 @@ public abstract class AbstractCompactionWriter implements 
AutoCloseable {
               valueChunkWriter.getStatistics());
           continue;
         }
-        CompactionTaskManager.mergeRateLimiterAcquire(
-            compactionRateLimiter, getChunkSize(valueChunk));
+        CompactionTaskManager.mergeRateLimiterAcquire(compactionRateLimiter, 
1);
         targetWriter.writeChunk(valueChunk, (ChunkMetadata) 
valueChunkMetadatas.get(i));
       }
     }
diff --git 
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/utils/CompactionConfigRestorer.java
 
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/utils/CompactionConfigRestorer.java
index a809d10262..a9194d2fa1 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/engine/compaction/utils/CompactionConfigRestorer.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/engine/compaction/utils/CompactionConfigRestorer.java
@@ -65,6 +65,6 @@ public class CompactionConfigRestorer {
     config.setCompactionThreadCount(concurrentCompactionThread);
     config.setCompactionScheduleIntervalInMs(compactionScheduleIntervalInMs);
     
config.setCompactionSubmissionIntervalInMs(compactionSubmissionIntervalInMs);
-    
config.setCompactionWriteThroughputMbPerSec(compactionWriteThroughputMbPerSec);
+    config.setCompactionIOPerSec(compactionWriteThroughputMbPerSec);
   }
 }

Reply via email to