This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 6ad9f83cc1 [IOTDB-3881] strength the concurrency control of 
regionReplicaSetCache. (#6708)
6ad9f83cc1 is described below

commit 6ad9f83cc13c1d5d46f01bf1bb2d15e16e023f9d
Author: ZhangHongYin <[email protected]>
AuthorDate: Thu Jul 21 14:09:41 2022 +0800

    [IOTDB-3881] strength the concurrency control of regionReplicaSetCache. 
(#6708)
---
 .../db/mpp/plan/analyze/cache/PartitionCache.java  | 80 ++++++++++++++--------
 1 file changed, 53 insertions(+), 27 deletions(-)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/cache/PartitionCache.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/cache/PartitionCache.java
index a2b53dfb48..86fbe66bad 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/cache/PartitionCache.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/cache/PartitionCache.java
@@ -66,7 +66,6 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -96,8 +95,7 @@ public class PartitionCache {
   /** the latest time when groupIdToReplicaSetMap updated. */
   private final AtomicLong latestUpdateTime = new AtomicLong(0);
   /** TConsensusGroupId -> TRegionReplicaSet */
-  private final Map<TConsensusGroupId, TRegionReplicaSet> 
groupIdToReplicaSetMap =
-      new ConcurrentHashMap<>();
+  private final Map<TConsensusGroupId, TRegionReplicaSet> 
groupIdToReplicaSetMap = new HashMap<>();
 
   /** The lock of cache */
   private final ReentrantReadWriteLock storageGroupCacheLock = new 
ReentrantReadWriteLock();
@@ -105,6 +103,8 @@ public class PartitionCache {
   private final ReentrantReadWriteLock schemaPartitionCacheLock = new 
ReentrantReadWriteLock();
   private final ReentrantReadWriteLock dataPartitionCacheLock = new 
ReentrantReadWriteLock();
 
+  private final ReentrantReadWriteLock regionReplicaSetLock = new 
ReentrantReadWriteLock();
+
   private final IClientManager<PartitionRegionId, ConfigNodeClient> 
configNodeClientManager =
       new IClientManager.Factory<PartitionRegionId, ConfigNodeClient>()
           .createClientManager(new 
DataNodeClientPoolFactory.ConfigNodeClientPoolFactory());
@@ -396,28 +396,44 @@ public class PartitionCache {
    * @throws StatementAnalyzeException if there are exception when try to get 
latestRegionRouteMap
    */
   public TRegionReplicaSet getRegionReplicaSet(TConsensusGroupId 
consensusGroupId) {
-    if (!groupIdToReplicaSetMap.containsKey(consensusGroupId)) {
-      // try to update latestRegionRegionRouteMap when miss
-      synchronized (groupIdToReplicaSetMap) {
-        try (ConfigNodeClient client =
-            
configNodeClientManager.borrowClient(ConfigNodeInfo.partitionRegionId)) {
-          TRegionRouteMapResp resp = client.getLatestRegionRouteMap();
-          if (TSStatusCode.SUCCESS_STATUS.getStatusCode() == 
resp.getStatus().getCode()) {
-            updateGroupIdToReplicaSetMap(resp.getTimestamp(), 
resp.getRegionRouteMap());
-          }
-          if (!groupIdToReplicaSetMap.containsKey(consensusGroupId)) {
-            // failed to get RegionReplicaSet from confignode
-            throw new RuntimeException(
-                "Failed to get replicaSet of consensus group[id= " + 
consensusGroupId + "]");
+    TRegionReplicaSet result;
+    // try to get regionReplicaSet from cache
+    try {
+      regionReplicaSetLock.readLock().lock();
+      result = groupIdToReplicaSetMap.get(consensusGroupId);
+    } finally {
+      regionReplicaSetLock.readLock().unlock();
+    }
+    if (result == null) {
+      // if not hit then try to get regionReplicaSet from confignode
+      try {
+        regionReplicaSetLock.writeLock().lock();
+        // verify that there are not hit in cache
+        if (!groupIdToReplicaSetMap.containsKey(consensusGroupId)) {
+          try (ConfigNodeClient client =
+              
configNodeClientManager.borrowClient(ConfigNodeInfo.partitionRegionId)) {
+            TRegionRouteMapResp resp = client.getLatestRegionRouteMap();
+            if (TSStatusCode.SUCCESS_STATUS.getStatusCode() == 
resp.getStatus().getCode()) {
+              updateGroupIdToReplicaSetMap(resp.getTimestamp(), 
resp.getRegionRouteMap());
+            }
+            // if confignode don't have then will throw RuntimeException
+            if (!groupIdToReplicaSetMap.containsKey(consensusGroupId)) {
+              // failed to get RegionReplicaSet from confignode
+              throw new RuntimeException(
+                  "Failed to get replicaSet of consensus group[id= " + 
consensusGroupId + "]");
+            }
+          } catch (IOException | TException e) {
+            throw new StatementAnalyzeException(
+                "An error occurred when executing getRegionReplicaSet():" + 
e.getMessage());
           }
-        } catch (IOException | TException e) {
-          throw new StatementAnalyzeException(
-              "An error occurred when executing getRegionReplicaSet():" + 
e.getMessage());
         }
+        result = groupIdToReplicaSetMap.get(consensusGroupId);
+      } finally {
+        regionReplicaSetLock.writeLock().unlock();
       }
     }
     // try to get regionReplicaSet by consensusGroupId
-    return groupIdToReplicaSetMap.get(consensusGroupId);
+    return result;
   }
 
   /**
@@ -429,18 +445,28 @@ public class PartitionCache {
    */
   public boolean updateGroupIdToReplicaSetMap(
       long timestamp, Map<TConsensusGroupId, TRegionReplicaSet> map) {
-    boolean result = (timestamp == 
latestUpdateTime.accumulateAndGet(timestamp, Math::max));
-    // if timestamp is greater than latestUpdateTime, then update
-    if (result) {
-      groupIdToReplicaSetMap.clear();
-      groupIdToReplicaSetMap.putAll(map);
+    try {
+      regionReplicaSetLock.writeLock().lock();
+      boolean result = (timestamp == 
latestUpdateTime.accumulateAndGet(timestamp, Math::max));
+      // if timestamp is greater than latestUpdateTime, then update
+      if (result) {
+        groupIdToReplicaSetMap.clear();
+        groupIdToReplicaSetMap.putAll(map);
+      }
+      return result;
+    } finally {
+      regionReplicaSetLock.writeLock().unlock();
     }
-    return result;
   }
 
   /** invalid replicaSetCache */
   public void invalidReplicaSetCache() {
-    groupIdToReplicaSetMap.clear();
+    try {
+      regionReplicaSetLock.writeLock().lock();
+      groupIdToReplicaSetMap.clear();
+    } finally {
+      regionReplicaSetLock.writeLock().unlock();
+    }
   }
 
   // endregion

Reply via email to