This is an automated email from the ASF dual-hosted git repository.

xyuanlu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new b8baca989 Change assignable instance set caches to only be replaced after each update instead of reusing same maps and sets (#2746)
b8baca989 is described below

commit b8baca9897e4b64e0395cde3618a6d45b4891ebe
Author: Zachary Pinto <zapi...@linkedin.com>
AuthorDate: Tue Jan 30 15:56:45 2024 -0800

    Change assignable instance set caches to only be replaced after each update instead of reusing same maps and sets (#2746)
    
    Build a new DerivedInstanceCache object when updating the instance maps and
    sets from the caches, and atomically swap the value of _derivedInstanceCache,
    in order to stop ConcurrentModificationExceptions in the async global and
    partial rebalance stages.
---
 .../dataproviders/BaseControllerDataProvider.java  | 92 ++++++++++++++--------
 1 file changed, 60 insertions(+), 32 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java
index c6c799dd1..1e40bbb72 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java
@@ -119,13 +119,32 @@ public class BaseControllerDataProvider implements 
ControlContextProvider {
   private final Map<String, Map<String, Set<String>>> 
_disabledInstanceForPartitionMap = new HashMap<>();
   private final Set<String> _disabledInstanceSet = new HashSet<>();
 
-  // Assignable instances are instances will contain at most one instance with 
a given logicalId.
-  // This is used for SWAP related operations where there can be two instances 
with the same logicalId.
-  private final Map<String, InstanceConfig> _assignableInstanceConfigMap = new 
HashMap<>();
-  private final Map<String, LiveInstance> _assignableLiveInstancesMap = new 
HashMap<>();
-  private final Map<String, String> _swapOutInstanceNameToSwapInInstanceName = 
new HashMap<>();
-  private final Set<String> _liveSwapInInstanceNames = new HashSet<>();
-  private final Set<String> _enabledSwapInInstanceNames = new HashSet<>();
+  private static final class DerivedInstanceCache {
+    // Assignable instances are instances will contain at most one instance 
with a given logicalId.
+    // This is used for SWAP related operations where there can be two 
instances with the same logicalId.
+    private final Map<String, InstanceConfig> _assignableInstanceConfigMap;
+    private final Map<String, LiveInstance> _assignableLiveInstancesMap;
+    private final Map<String, String> _swapOutInstanceNameToSwapInInstanceName;
+    private final Set<String> _liveSwapInInstanceNames;
+    private final Set<String> _enabledSwapInInstanceNames;
+
+    DerivedInstanceCache(Map<String, InstanceConfig> 
assignableInstanceConfigMap,
+        Map<String, LiveInstance> assignableLiveInstancesMap,
+        Map<String, String> swapOutInstanceNameToSwapInInstanceName,
+        Set<String> liveSwapInInstanceNames, Set<String> 
enabledSwapInInstanceNames) {
+      _assignableInstanceConfigMap = assignableInstanceConfigMap;
+      _assignableLiveInstancesMap = assignableLiveInstancesMap;
+      _swapOutInstanceNameToSwapInInstanceName = 
swapOutInstanceNameToSwapInInstanceName;
+      _liveSwapInInstanceNames = liveSwapInInstanceNames;
+      _enabledSwapInInstanceNames = enabledSwapInInstanceNames;
+    }
+  }
+
+  // All maps and sets are encapsulated in DerivedInstanceCache to ensure that 
they are updated together
+  // as a snapshot.
+  private DerivedInstanceCache _derivedInstanceCache =
+      new DerivedInstanceCache(new HashMap<>(), new HashMap<>(), new 
HashMap<>(), new HashSet<>(),
+          new HashSet<>());
   private final Map<String, MonitoredAbnormalResolver> 
_abnormalStateResolverMap = new HashMap<>();
   private final Set<String> _timedOutInstanceDuringMaintenance = new 
HashSet<>();
   private Map<String, LiveInstance> 
_allLiveInstanceExcludeTimedOutForMaintenance = new HashMap<>();
@@ -363,12 +382,12 @@ public class BaseControllerDataProvider implements 
ControlContextProvider {
     ClusterTopologyConfig clusterTopologyConfig =
         ClusterTopologyConfig.createFromClusterConfig(clusterConfig);
 
-    // Clear all caches
-    _assignableInstanceConfigMap.clear();
-    _assignableLiveInstancesMap.clear();
-    _swapOutInstanceNameToSwapInInstanceName.clear();
-    _liveSwapInInstanceNames.clear();
-    _enabledSwapInInstanceNames.clear();
+    // Create new caches to be populated.
+    Map<String, InstanceConfig> newAssignableInstanceConfigMap = new 
HashMap<>();
+    Map<String, LiveInstance> newAssignableLiveInstancesMap = new HashMap<>();
+    Map<String, String> newSwapOutInstanceNameToSwapInInstanceName = new 
HashMap<>();
+    Set<String> newLiveSwapInInstanceNames = new HashSet<>();
+    Set<String> newEnabledSwapInInstanceNames = new HashSet<>();
 
     Map<String, String> filteredInstancesByLogicalId = new HashMap<>();
     Map<String, String> swapOutLogicalIdsByInstanceName = new HashMap<>();
@@ -403,14 +422,14 @@ public class BaseControllerDataProvider implements 
ControlContextProvider {
           // instance with this instance. If this instance has no 
InstanceOperation, then replace the filtered instance
           // with this instance. This is the case where the SWAP_IN node has 
been marked as complete or SWAP_IN exists and
           // SWAP_OUT does not. There can never be a case where both have no 
InstanceOperation set.
-          _assignableInstanceConfigMap.remove(filteredNode);
-          _assignableInstanceConfigMap.put(node, currentInstanceConfig);
+          newAssignableInstanceConfigMap.remove(filteredNode);
+          newAssignableInstanceConfigMap.put(node, currentInstanceConfig);
           filteredInstancesByLogicalId.put(currentInstanceLogicalId, node);
         }
       } else if (!currentInstanceConfig.getInstanceOperation()
           .equals(InstanceConstants.InstanceOperation.EVACUATE.name())) {
         // EVACUATE instances are not considered to be assignable.
-        _assignableInstanceConfigMap.put(node, currentInstanceConfig);
+        newAssignableInstanceConfigMap.put(node, currentInstanceConfig);
         filteredInstancesByLogicalId.put(currentInstanceLogicalId, node);
       }
 
@@ -429,24 +448,30 @@ public class BaseControllerDataProvider implements 
ControlContextProvider {
     }
 
     liveInstancesMap.forEach((instanceName, liveInstance) -> {
-      if (_assignableInstanceConfigMap.containsKey(instanceName)) {
-        _assignableLiveInstancesMap.put(instanceName, liveInstance);
+      if (newAssignableInstanceConfigMap.containsKey(instanceName)) {
+        newAssignableLiveInstancesMap.put(instanceName, liveInstance);
       }
     });
 
     swapOutLogicalIdsByInstanceName.forEach((swapOutInstanceName, value) -> {
       String swapInInstanceName = swapInInstancesByLogicalId.get(value);
       if (swapInInstanceName != null) {
-        _swapOutInstanceNameToSwapInInstanceName.put(swapOutInstanceName, 
swapInInstanceName);
+        newSwapOutInstanceNameToSwapInInstanceName.put(swapOutInstanceName, 
swapInInstanceName);
         if (liveInstancesMap.containsKey(swapInInstanceName)) {
-          _liveSwapInInstanceNames.add(swapInInstanceName);
+          newLiveSwapInInstanceNames.add(swapInInstanceName);
         }
         if 
(InstanceValidationUtil.isInstanceEnabled(instanceConfigMap.get(swapInInstanceName),
             clusterConfig)) {
-          _enabledSwapInInstanceNames.add(swapInInstanceName);
+          newEnabledSwapInInstanceNames.add(swapInInstanceName);
         }
       }
     });
+
+    // Replace caches with up-to-date instance sets.
+    _derivedInstanceCache =
+        new DerivedInstanceCache(newAssignableInstanceConfigMap, 
newAssignableLiveInstancesMap,
+            newSwapOutInstanceNameToSwapInInstanceName, 
newLiveSwapInInstanceNames,
+            newEnabledSwapInInstanceNames);
   }
 
   private void refreshResourceConfig(final HelixDataAccessor accessor,
@@ -506,7 +531,7 @@ public class BaseControllerDataProvider implements 
ControlContextProvider {
               .filter(e -> 
!_timedOutInstanceDuringMaintenance.contains(e.getKey()))
               .collect(Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue));
       _assignableLiveInstanceExcludeTimedOutForMaintenance =
-          _assignableLiveInstancesMap.entrySet().stream()
+          _derivedInstanceCache._assignableLiveInstancesMap.entrySet().stream()
               .filter(e -> 
!_timedOutInstanceDuringMaintenance.contains(e.getKey()))
               .collect(Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue));
     }
@@ -661,7 +686,7 @@ public class BaseControllerDataProvider implements 
ControlContextProvider {
       return 
Collections.unmodifiableMap(_assignableLiveInstanceExcludeTimedOutForMaintenance);
     }
 
-    return Collections.unmodifiableMap(_assignableLiveInstancesMap);
+    return 
Collections.unmodifiableMap(_derivedInstanceCache._assignableLiveInstancesMap);
   }
 
   /**
@@ -685,7 +710,7 @@ public class BaseControllerDataProvider implements 
ControlContextProvider {
    * @return A new set contains instance name
    */
   public Set<String> getAssignableInstances() {
-    return _assignableInstanceConfigMap.keySet();
+    return _derivedInstanceCache._assignableInstanceConfigMap.keySet();
   }
 
   /**
@@ -780,7 +805,7 @@ public class BaseControllerDataProvider implements 
ControlContextProvider {
    */
   public Set<String> getAssignableInstancesWithTag(String instanceTag) {
     Set<String> taggedInstances = new HashSet<>();
-    for (String instance : _assignableInstanceConfigMap.keySet()) {
+    for (String instance : 
_derivedInstanceCache._assignableInstanceConfigMap.keySet()) {
       InstanceConfig instanceConfig = 
_allInstanceConfigCache.getPropertyByName(instance);
       if (instanceConfig != null && instanceConfig.containsTag(instanceTag)) {
         taggedInstances.add(instance);
@@ -796,7 +821,7 @@ public class BaseControllerDataProvider implements 
ControlContextProvider {
    */
   public Set<String> getInstancesWithTag(String instanceTag) {
     Set<String> taggedInstances = new HashSet<>();
-    for (String instance : _assignableInstanceConfigMap.keySet()) {
+    for (String instance : 
_derivedInstanceCache._assignableInstanceConfigMap.keySet()) {
       InstanceConfig instanceConfig = 
_allInstanceConfigCache.getPropertyByName(instance);
       if (instanceConfig != null && instanceConfig.containsTag(instanceTag)) {
         taggedInstances.add(instance);
@@ -838,7 +863,8 @@ public class BaseControllerDataProvider implements 
ControlContextProvider {
    * @return a map of SWAP_OUT instanceNames and their corresponding SWAP_IN 
instanceNames.
    */
   public Map<String, String> getSwapOutToSwapInInstancePairs() {
-    return 
Collections.unmodifiableMap(_swapOutInstanceNameToSwapInInstanceName);
+    return Collections.unmodifiableMap(
+        _derivedInstanceCache._swapOutInstanceNameToSwapInInstanceName);
   }
 
   /**
@@ -847,7 +873,7 @@ public class BaseControllerDataProvider implements 
ControlContextProvider {
    * @return a set of SWAP_IN instanceNames that have a corresponding SWAP_OUT 
instance.
    */
   public Set<String> getLiveSwapInInstanceNames() {
-    return Collections.unmodifiableSet(_liveSwapInInstanceNames);
+    return 
Collections.unmodifiableSet(_derivedInstanceCache._liveSwapInInstanceNames);
   }
 
   /**
@@ -856,7 +882,7 @@ public class BaseControllerDataProvider implements 
ControlContextProvider {
    * @return a set of SWAP_IN instanceNames that have a corresponding SWAP_OUT 
instance.
    */
   public Set<String> getEnabledSwapInInstanceNames() {
-    return Collections.unmodifiableSet(_enabledSwapInInstanceNames);
+    return 
Collections.unmodifiableSet(_derivedInstanceCache._enabledSwapInInstanceNames);
   }
 
   public synchronized void setLiveInstances(List<LiveInstance> liveInstances) {
@@ -985,7 +1011,7 @@ public class BaseControllerDataProvider implements 
ControlContextProvider {
    * @return a map of instance name to instance config
    */
   public Map<String, InstanceConfig> getAssignableInstanceConfigMap() {
-    return Collections.unmodifiableMap(_assignableInstanceConfigMap);
+    return 
Collections.unmodifiableMap(_derivedInstanceCache._assignableInstanceConfigMap);
   }
 
   /**
@@ -1288,13 +1314,15 @@ public class BaseControllerDataProvider implements 
ControlContextProvider {
     StringBuilder sb = new StringBuilder();
     sb.append(String.format("liveInstaceMap: %s", 
_allLiveInstanceCache.getPropertyMap()))
         .append("\n");
-    sb.append(String.format("assignableLiveInstaceMap: %s", 
_assignableLiveInstancesMap))
+    sb.append(String.format("assignableLiveInstaceMap: %s",
+            _derivedInstanceCache._assignableLiveInstancesMap))
         .append("\n");
     sb.append(String.format("idealStateMap: %s", 
_idealStateCache.getPropertyMap())).append("\n");
     sb.append(String.format("stateModelDefMap: %s",  
_stateModelDefinitionCache.getPropertyMap())).append("\n");
     sb.append(String.format("instanceConfigMap: %s", 
_allInstanceConfigCache.getPropertyMap()))
         .append("\n");
-    sb.append(String.format("assignableInstanceConfigMap: %s", 
_assignableInstanceConfigMap))
+    sb.append(String.format("assignableInstanceConfigMap: %s",
+            _derivedInstanceCache._assignableInstanceConfigMap))
         .append("\n");
     sb.append(String.format("resourceConfigMap: %s", 
_resourceConfigCache.getPropertyMap())).append("\n");
     sb.append(String.format("messageCache: %s", 
_instanceMessagesCache)).append("\n");

Reply via email to