This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 5c3b7f53d52 Optimize priority map lock structure (#12351)
5c3b7f53d52 is described below

commit 5c3b7f53d52a70cdb875396018d74c3d79ebfc46
Author: Yongzao <[email protected]>
AuthorDate: Wed Apr 17 17:10:50 2024 +0800

    Optimize priority map lock structure (#12351)
    
    * Finish
    
    * bug fix
    
    * Fix nano time
---
 .../client/async/handlers/AsyncClientHandler.java        | 11 ++++++++++-
 .../async/handlers/rpc/TransferLeaderRPCHandler.java     |  2 +-
 .../confignode/manager/load/balancer/RouteBalancer.java  | 16 ++++++----------
 3 files changed, 17 insertions(+), 12 deletions(-)

diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/AsyncClientHandler.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/AsyncClientHandler.java
index a8c445665d0..7cd86af6ef7 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/AsyncClientHandler.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/AsyncClientHandler.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.confignode.client.async.handlers.rpc.FetchSchemaBlackLis
 import org.apache.iotdb.confignode.client.async.handlers.rpc.PipeHeartbeatRPCHandler;
 import org.apache.iotdb.confignode.client.async.handlers.rpc.PipePushMetaRPCHandler;
 import org.apache.iotdb.confignode.client.async.handlers.rpc.SchemaUpdateRPCHandler;
+import org.apache.iotdb.confignode.client.async.handlers.rpc.TransferLeaderRPCHandler;
 import org.apache.iotdb.confignode.client.async.handlers.rpc.subscription.ConsumerGroupPushMetaRPCHandler;
 import org.apache.iotdb.confignode.client.async.handlers.rpc.subscription.TopicPushMetaRPCHandler;
 import org.apache.iotdb.mpp.rpc.thrift.TCheckTimeSeriesExistenceResp;
@@ -39,6 +40,7 @@ import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatResp;
 import org.apache.iotdb.mpp.rpc.thrift.TPushConsumerGroupMetaResp;
 import org.apache.iotdb.mpp.rpc.thrift.TPushPipeMetaResp;
 import org.apache.iotdb.mpp.rpc.thrift.TPushTopicMetaResp;
+import org.apache.iotdb.mpp.rpc.thrift.TRegionLeaderChangeResp;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -243,6 +245,14 @@ public class AsyncClientHandler<Q, R> {
             dataNodeLocationMap,
             (Map<Integer, TPushConsumerGroupMetaResp>) responseMap,
             countDownLatch);
+      case CHANGE_REGION_LEADER:
+        return new TransferLeaderRPCHandler(
+            requestType,
+            requestId,
+            targetDataNode,
+            dataNodeLocationMap,
+            (Map<Integer, TRegionLeaderChangeResp>) responseMap,
+            countDownLatch);
       case SET_TTL:
       case CREATE_DATA_REGION:
       case CREATE_SCHEMA_REGION:
@@ -264,7 +274,6 @@ public class AsyncClientHandler<Q, R> {
       case UPDATE_REGION_ROUTE_MAP:
       case INVALIDATE_MATCHED_SCHEMA_CACHE:
       case UPDATE_TEMPLATE:
-      case CHANGE_REGION_LEADER:
       case KILL_QUERY_INSTANCE:
       case RESET_PEER_LIST:
       default:
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/TransferLeaderRPCHandler.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/TransferLeaderRPCHandler.java
index 9f0e2312717..d5107e315cf 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/TransferLeaderRPCHandler.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/TransferLeaderRPCHandler.java
@@ -35,7 +35,7 @@ public class TransferLeaderRPCHandler extends AbstractAsyncRPCHandler<TRegionLea
 
   private static final Logger LOGGER = LoggerFactory.getLogger(TransferLeaderRPCHandler.class);
 
-  protected TransferLeaderRPCHandler(
+  public TransferLeaderRPCHandler(
       DataNodeRequestType requestType,
       int requestId,
       TDataNodeLocation targetDataNode,
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
index 25b3fc31478..7269defdbe6 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
@@ -108,7 +108,7 @@ public class RouteBalancer implements IClusterStatusSubscriber {
   private final Map<TConsensusGroupId, TRegionReplicaSet> regionPriorityMap;
 
   // The interval of retrying to balance ratis leader after the last failed time
-  private static final long BALANCE_RATIS_LEADER_FAILED_INTERVAL = 60 * 1000L;
+  private static final long BALANCE_RATIS_LEADER_FAILED_INTERVAL_IN_NS = 60 * 1000L * 1000L * 1000L;
   private final Map<TConsensusGroupId, Long> lastFailedTimeForLeaderBalance;
 
   public RouteBalancer(IManager configManager) {
@@ -175,7 +175,7 @@ public class RouteBalancer implements IClusterStatusSubscriber {
         (regionGroupId, newLeaderId) -> {
           if (ConsensusFactory.RATIS_CONSENSUS.equals(consensusProtocolClass)
               && currentTime - lastFailedTimeForLeaderBalance.getOrDefault(regionGroupId, 0L)
-                  > BALANCE_RATIS_LEADER_FAILED_INTERVAL) {
+                  <= BALANCE_RATIS_LEADER_FAILED_INTERVAL_IN_NS) {
             return;
           }
 
@@ -283,13 +283,8 @@ public class RouteBalancer implements IClusterStatusSubscriber {
     }
 
     if (needBroadcast.get()) {
-      priorityMapLock.readLock().lock();
-      try {
-        broadcastLatestRegionPriorityMap();
-        recordRegionPriorityMap(differentPriorityMap);
-      } finally {
-        priorityMapLock.readLock().unlock();
-      }
+      recordRegionPriorityMap(differentPriorityMap);
+      broadcastLatestRegionPriorityMap();
     }
   }
 
@@ -304,10 +299,11 @@ public class RouteBalancer implements IClusterStatusSubscriber {
             .collect(Collectors.toMap(TDataNodeLocation::getDataNodeId, location -> location));
 
     long broadcastTime = System.currentTimeMillis();
+    Map<TConsensusGroupId, TRegionReplicaSet> tmpPriorityMap = getRegionPriorityMap();
     AsyncClientHandler<TRegionRouteReq, TSStatus> clientHandler =
         new AsyncClientHandler<>(
             DataNodeRequestType.UPDATE_REGION_ROUTE_MAP,
-            new TRegionRouteReq(broadcastTime, regionPriorityMap),
+            new TRegionRouteReq(broadcastTime, tmpPriorityMap),
             dataNodeLocationMap);
     AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clientHandler);
   }

Reply via email to