This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 99502365d2b IoTConsensus Transit Snapshot Rate Limiter (#12348)
99502365d2b is described below

commit 99502365d2b46c68f0b5a8ff4c229f34adcc5123
Author: Xiangpeng Hu <[email protected]>
AuthorDate: Wed Apr 17 17:10:26 2024 +0800

    IoTConsensus Transit Snapshot Rate Limiter (#12348)
---
 .../iotdb/consensus/config/IoTConsensusConfig.java | 19 ++++++-
 .../apache/iotdb/consensus/iot/IoTConsensus.java   |  8 +++
 .../consensus/iot/IoTConsensusServerImpl.java      |  4 ++
 .../iot/snapshot/IoTConsensusRateLimiter.java      | 59 ++++++++++++++++++++++
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 10 ++++
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |  7 +++
 .../db/consensus/DataRegionConsensusImpl.java      |  2 +
 .../storageengine/rescon/quotas/QuotaLimiter.java  |  3 ++
 .../resources/conf/iotdb-common.properties         |  4 ++
 .../quotas/AverageIntervalRateLimiter.java         |  2 +-
 .../commons}/quotas/FixedIntervalRateLimiter.java  |  2 +-
 .../apache/iotdb/commons}/quotas/RateLimiter.java  |  6 +--
 12 files changed, 119 insertions(+), 7 deletions(-)

diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/IoTConsensusConfig.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/IoTConsensusConfig.java
index f4bb131b3bf..6c89e19291b 100644
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/IoTConsensusConfig.java
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/IoTConsensusConfig.java
@@ -248,6 +248,7 @@ public class IoTConsensusConfig {
     private final long checkpointGap;
     private final long allocateMemoryForConsensus;
     private final long allocateMemoryForQueue;
+    private final long regionMigrationSpeedLimitBytesPerSecond;
 
     private Replication(
         int maxLogEntriesNumPerBatch,
@@ -262,7 +263,8 @@ public class IoTConsensusConfig {
         long throttleTimeOutMs,
         long checkpointGap,
         long allocateMemoryForConsensus,
-        double maxMemoryRatioForQueue) {
+        double maxMemoryRatioForQueue,
+        long regionMigrationSpeedLimitBytesPerSecond) {
       this.maxLogEntriesNumPerBatch = maxLogEntriesNumPerBatch;
       this.maxSizePerBatch = maxSizePerBatch;
       this.maxPendingBatchesNum = maxPendingBatchesNum;
@@ -276,6 +278,7 @@ public class IoTConsensusConfig {
       this.checkpointGap = checkpointGap;
       this.allocateMemoryForConsensus = allocateMemoryForConsensus;
       this.allocateMemoryForQueue = (long) (allocateMemoryForConsensus * 
maxMemoryRatioForQueue);
+      this.regionMigrationSpeedLimitBytesPerSecond = 
regionMigrationSpeedLimitBytesPerSecond;
     }
 
     public int getMaxLogEntriesNumPerBatch() {
@@ -330,6 +333,10 @@ public class IoTConsensusConfig {
       return allocateMemoryForQueue;
     }
 
+    public long getRegionMigrationSpeedLimitBytesPerSecond() {
+      return regionMigrationSpeedLimitBytesPerSecond;
+    }
+
     public static Replication.Builder newBuilder() {
       return new Replication.Builder();
     }
@@ -350,6 +357,7 @@ public class IoTConsensusConfig {
       private long checkpointGap = 500;
       private long allocateMemoryForConsensus = 
Runtime.getRuntime().maxMemory() / 10;
       private double maxMemoryRatioForQueue = 0.6;
+      private long regionMigrationSpeedLimitBytesPerSecond = 32 * 1024 * 1024L;
 
       public Replication.Builder setMaxLogEntriesNumPerBatch(int 
maxLogEntriesNumPerBatch) {
         this.maxLogEntriesNumPerBatch = maxLogEntriesNumPerBatch;
@@ -418,6 +426,12 @@ public class IoTConsensusConfig {
         return this;
       }
 
+      public Builder setRegionMigrationSpeedLimitBytesPerSecond(
+          long regionMigrationSpeedLimitBytesPerSecond) {
+        this.regionMigrationSpeedLimitBytesPerSecond = 
regionMigrationSpeedLimitBytesPerSecond;
+        return this;
+      }
+
       public Replication build() {
         return new Replication(
             maxLogEntriesNumPerBatch,
@@ -432,7 +446,8 @@ public class IoTConsensusConfig {
             throttleTimeOutMs,
             checkpointGap,
             allocateMemoryForConsensus,
-            maxMemoryRatioForQueue);
+            maxMemoryRatioForQueue,
+            regionMigrationSpeedLimitBytesPerSecond);
       }
     }
   }
diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
index a4d249c3d49..01d86f851db 100644
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
@@ -57,6 +57,7 @@ import 
org.apache.iotdb.consensus.iot.client.SyncIoTConsensusServiceClient;
 import org.apache.iotdb.consensus.iot.logdispatcher.IoTConsensusMemoryManager;
 import org.apache.iotdb.consensus.iot.service.IoTConsensusRPCService;
 import org.apache.iotdb.consensus.iot.service.IoTConsensusRPCServiceProcessor;
+import org.apache.iotdb.consensus.iot.snapshot.IoTConsensusRateLimiter;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 
@@ -120,6 +121,13 @@ public class IoTConsensus implements IConsensus {
         .init(
             
config.getIotConsensusConfig().getReplication().getAllocateMemoryForConsensus(),
             
config.getIotConsensusConfig().getReplication().getAllocateMemoryForQueue());
+    // init IoTConsensus Rate Limiter
+    IoTConsensusRateLimiter.getInstance()
+        .init(
+            config
+                .getIotConsensusConfig()
+                .getReplication()
+                .getRegionMigrationSpeedLimitBytesPerSecond());
   }
 
   @Override
diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
index b553218584a..ecd8debea7b 100644
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
@@ -43,6 +43,7 @@ import 
org.apache.iotdb.consensus.iot.client.SyncIoTConsensusServiceClient;
 import org.apache.iotdb.consensus.iot.log.ConsensusReqReader;
 import org.apache.iotdb.consensus.iot.log.GetConsensusReqReaderPlan;
 import org.apache.iotdb.consensus.iot.logdispatcher.LogDispatcher;
+import org.apache.iotdb.consensus.iot.snapshot.IoTConsensusRateLimiter;
 import org.apache.iotdb.consensus.iot.snapshot.SnapshotFragmentReader;
 import org.apache.iotdb.consensus.iot.thrift.TActivatePeerReq;
 import org.apache.iotdb.consensus.iot.thrift.TActivatePeerRes;
@@ -120,6 +121,8 @@ public class IoTConsensusServerImpl {
   private final IoTConsensusServerMetrics ioTConsensusServerMetrics;
   private final String consensusGroupId;
   private final ScheduledExecutorService backgroundTaskService;
+  private final IoTConsensusRateLimiter ioTConsensusRateLimiter =
+      IoTConsensusRateLimiter.getInstance();
 
   public IoTConsensusServerImpl(
       String storageDir,
@@ -316,6 +319,7 @@ public class IoTConsensusServerImpl {
             // TODO: zero copy ?
             TSendSnapshotFragmentReq req = 
reader.next().toTSendSnapshotFragmentReq();
             
req.setConsensusGroupId(targetPeer.getGroupId().convertToTConsensusGroupId());
+            
ioTConsensusRateLimiter.acquireTransitDataSizeWithRateLimiter(req.getChunkLength());
             TSendSnapshotFragmentRes res = client.sendSnapshotFragment(req);
             if (!isSuccess(res.getStatus())) {
               throw new ConsensusGroupModifyPeerException(
diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/snapshot/IoTConsensusRateLimiter.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/snapshot/IoTConsensusRateLimiter.java
new file mode 100644
index 00000000000..2622a7d16b3
--- /dev/null
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/snapshot/IoTConsensusRateLimiter.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.iot.snapshot;
+
+import com.google.common.util.concurrent.RateLimiter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class IoTConsensusRateLimiter {
+  private static final Logger logger = 
LoggerFactory.getLogger(IoTConsensusRateLimiter.class);
+
+  private final RateLimiter rateLimiter = RateLimiter.create(Double.MAX_VALUE);
+
+  private IoTConsensusRateLimiter() {}
+
+  public void init(long regionMigrationSpeedLimitBytesPerSecond) {
+    rateLimiter.setRate(regionMigrationSpeedLimitBytesPerSecond);
+  }
+
+  /**
+   * Block until the rate limiter grants permits for the size of the data to be sent.
+   *
+   * @param transitDataSize the size of the data to be sent
+   */
+  public void acquireTransitDataSizeWithRateLimiter(long transitDataSize) {
+    while (transitDataSize > 0) {
+      if (transitDataSize > Integer.MAX_VALUE) {
+        rateLimiter.acquire(Integer.MAX_VALUE);
+        transitDataSize -= Integer.MAX_VALUE;
+      } else {
+        rateLimiter.acquire((int) transitDataSize);
+        return;
+      }
+    }
+  }
+
+  private static final IoTConsensusRateLimiter INSTANCE = new 
IoTConsensusRateLimiter();
+
+  public static IoTConsensusRateLimiter getInstance() {
+    return INSTANCE;
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 27c07ff6bd0..e7493770f2e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -1078,6 +1078,7 @@ public class IoTDBConfig {
   private int maxSizePerBatch = 16 * 1024 * 1024;
   private int maxPendingBatchesNum = 5;
   private double maxMemoryRatioForQueue = 0.6;
+  private long regionMigrationSpeedLimitBytesPerSecond = 32 * 1024 * 1024L;
 
   /** Load related */
   private double maxAllocateMemoryRatioForLoad = 0.8;
@@ -1127,6 +1128,15 @@ public class IoTDBConfig {
     this.maxLogEntriesNumPerBatch = maxLogEntriesNumPerBatch;
   }
 
+  public long getRegionMigrationSpeedLimitBytesPerSecond() {
+    return regionMigrationSpeedLimitBytesPerSecond;
+  }
+
+  public void setRegionMigrationSpeedLimitBytesPerSecond(
+      long regionMigrationSpeedLimitBytesPerSecond) {
+    this.regionMigrationSpeedLimitBytesPerSecond = 
regionMigrationSpeedLimitBytesPerSecond;
+  }
+
   public void setMaxSizePerBatch(int maxSizePerBatch) {
     this.maxSizePerBatch = maxSizePerBatch;
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 14438eb845e..df797ff1afe 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -1067,6 +1067,13 @@ public class IoTDBDescriptor {
                     "data_region_iot_max_memory_ratio_for_queue",
                     String.valueOf(conf.getMaxMemoryRatioForQueue()))
                 .trim()));
+    conf.setRegionMigrationSpeedLimitBytesPerSecond(
+        Long.parseLong(
+            properties
+                .getProperty(
+                    "region_migration_speed_limit_bytes_per_second",
+                    
String.valueOf(conf.getRegionMigrationSpeedLimitBytesPerSecond()))
+                .trim()));
   }
 
   private void loadAuthorCache(Properties properties) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
index f1f978fd477..42f80ccd61a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java
@@ -112,6 +112,8 @@ public class DataRegionConsensusImpl {
                                       
.setMaxSizePerBatch(CONF.getMaxSizePerBatch())
                                       
.setMaxPendingBatchesNum(CONF.getMaxPendingBatchesNum())
                                       
.setMaxMemoryRatioForQueue(CONF.getMaxMemoryRatioForQueue())
+                                      
.setRegionMigrationSpeedLimitBytesPerSecond(
+                                          
CONF.getRegionMigrationSpeedLimitBytesPerSecond())
                                       .build())
                               .build())
                       .setRatisConfig(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/quotas/QuotaLimiter.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/quotas/QuotaLimiter.java
index 138df719fdc..9a659d67dd9 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/quotas/QuotaLimiter.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/quotas/QuotaLimiter.java
@@ -22,6 +22,9 @@ package org.apache.iotdb.db.storageengine.rescon.quotas;
 import org.apache.iotdb.common.rpc.thrift.TTimedQuota;
 import org.apache.iotdb.common.rpc.thrift.ThrottleType;
 import org.apache.iotdb.commons.exception.RpcThrottlingException;
+import org.apache.iotdb.commons.quotas.AverageIntervalRateLimiter;
+import org.apache.iotdb.commons.quotas.FixedIntervalRateLimiter;
+import org.apache.iotdb.commons.quotas.RateLimiter;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.rpc.TSStatusCode;
diff --git 
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties 
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
index fc2da76d388..6b98d0906c9 100644
--- 
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
+++ 
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
@@ -771,6 +771,10 @@ data_replication_factor=1
 # Datatype: double
 # data_region_iot_max_memory_ratio_for_queue = 0.6
 
+# The maximum transit size in bytes per second for region migration
+# Datatype: long
+# region_migration_speed_limit_bytes_per_second = 33554432
+
 ####################
 ### TsFile Configurations
 ####################
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/quotas/AverageIntervalRateLimiter.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/quotas/AverageIntervalRateLimiter.java
similarity index 97%
rename from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/quotas/AverageIntervalRateLimiter.java
rename to 
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/quotas/AverageIntervalRateLimiter.java
index c88b78eb434..87c75ba9315 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/quotas/AverageIntervalRateLimiter.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/quotas/AverageIntervalRateLimiter.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.storageengine.rescon.quotas;
+package org.apache.iotdb.commons.quotas;
 
 /**
  * This limiter will refill resources at every TimeUnit/resources interval. 
For example: For a
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/quotas/FixedIntervalRateLimiter.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/quotas/FixedIntervalRateLimiter.java
similarity index 96%
rename from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/quotas/FixedIntervalRateLimiter.java
rename to 
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/quotas/FixedIntervalRateLimiter.java
index d17c5a4e2f1..739c9889bd5 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/quotas/FixedIntervalRateLimiter.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/quotas/FixedIntervalRateLimiter.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.storageengine.rescon.quotas;
+package org.apache.iotdb.commons.quotas;
 
 /**
  * With this limiter resources will be refilled only after a fixed interval of 
time. Copy from
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/quotas/RateLimiter.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/quotas/RateLimiter.java
similarity index 95%
rename from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/quotas/RateLimiter.java
rename to 
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/quotas/RateLimiter.java
index be35c8e48db..dee54e70c0b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/quotas/RateLimiter.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/quotas/RateLimiter.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.storageengine.rescon.quotas;
+package org.apache.iotdb.commons.quotas;
 
 public abstract class RateLimiter {
 
@@ -71,7 +71,7 @@ public abstract class RateLimiter {
    * @param limit Maximum available resource units that can be refilled to.
    * @return how many resource units may be refilled ?
    */
-  abstract long refill(long limit);
+  protected abstract long refill(long limit);
 
   /**
    * Time in milliseconds to wait for before requesting to consume 'amount' 
resource.
@@ -81,7 +81,7 @@ public abstract class RateLimiter {
    * @param amount Resources for which time interval to calculate for
    * @return estimate of the ms required to wait before being able to provide 
'amount' resources.
    */
-  abstract long getWaitInterval(long limit, long available, long amount);
+  protected abstract long getWaitInterval(long limit, long available, long 
amount);
 
   public synchronized long getTimeUnitInMillis() {
     return tunit;

Reply via email to