This is an automated email from the ASF dual-hosted git repository. xyuanlu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push: new b8baca989 Change assignable instance set caches to only be replaced after each update instead of reusing same maps and sets (#2746) b8baca989 is described below commit b8baca9897e4b64e0395cde3618a6d45b4891ebe Author: Zachary Pinto <zapi...@linkedin.com> AuthorDate: Tue Jan 30 15:56:45 2024 -0800 Change assignable instance set caches to only be replaced after each update instead of reusing same maps and sets (#2746) Build new DerivedInstanceCache object when updating instance maps and sets from caches and atomically swap the value for _derivedInstanceCache in order to stop having ConcurrentModificationExceptions in async global and partial rebalance stages. --- .../dataproviders/BaseControllerDataProvider.java | 92 ++++++++++++++-------- 1 file changed, 60 insertions(+), 32 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java index c6c799dd1..1e40bbb72 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java +++ b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java @@ -119,13 +119,32 @@ public class BaseControllerDataProvider implements ControlContextProvider { private final Map<String, Map<String, Set<String>>> _disabledInstanceForPartitionMap = new HashMap<>(); private final Set<String> _disabledInstanceSet = new HashSet<>(); - // Assignable instances are instances will contain at most one instance with a given logicalId. - // This is used for SWAP related operations where there can be two instances with the same logicalId.
- private final Map<String, InstanceConfig> _assignableInstanceConfigMap = new HashMap<>(); - private final Map<String, LiveInstance> _assignableLiveInstancesMap = new HashMap<>(); - private final Map<String, String> _swapOutInstanceNameToSwapInInstanceName = new HashMap<>(); - private final Set<String> _liveSwapInInstanceNames = new HashSet<>(); - private final Set<String> _enabledSwapInInstanceNames = new HashSet<>(); + private static final class DerivedInstanceCache { + // Assignable instances are instances will contain at most one instance with a given logicalId. + // This is used for SWAP related operations where there can be two instances with the same logicalId. + private final Map<String, InstanceConfig> _assignableInstanceConfigMap; + private final Map<String, LiveInstance> _assignableLiveInstancesMap; + private final Map<String, String> _swapOutInstanceNameToSwapInInstanceName; + private final Set<String> _liveSwapInInstanceNames; + private final Set<String> _enabledSwapInInstanceNames; + + DerivedInstanceCache(Map<String, InstanceConfig> assignableInstanceConfigMap, + Map<String, LiveInstance> assignableLiveInstancesMap, + Map<String, String> swapOutInstanceNameToSwapInInstanceName, + Set<String> liveSwapInInstanceNames, Set<String> enabledSwapInInstanceNames) { + _assignableInstanceConfigMap = assignableInstanceConfigMap; + _assignableLiveInstancesMap = assignableLiveInstancesMap; + _swapOutInstanceNameToSwapInInstanceName = swapOutInstanceNameToSwapInInstanceName; + _liveSwapInInstanceNames = liveSwapInInstanceNames; + _enabledSwapInInstanceNames = enabledSwapInInstanceNames; + } + } + + // All maps and sets are encapsulated in DerivedInstanceCache to ensure that they are updated together + // as a snapshot. 
+ private DerivedInstanceCache _derivedInstanceCache = + new DerivedInstanceCache(new HashMap<>(), new HashMap<>(), new HashMap<>(), new HashSet<>(), + new HashSet<>()); private final Map<String, MonitoredAbnormalResolver> _abnormalStateResolverMap = new HashMap<>(); private final Set<String> _timedOutInstanceDuringMaintenance = new HashSet<>(); private Map<String, LiveInstance> _allLiveInstanceExcludeTimedOutForMaintenance = new HashMap<>(); @@ -363,12 +382,12 @@ public class BaseControllerDataProvider implements ControlContextProvider { ClusterTopologyConfig clusterTopologyConfig = ClusterTopologyConfig.createFromClusterConfig(clusterConfig); - // Clear all caches - _assignableInstanceConfigMap.clear(); - _assignableLiveInstancesMap.clear(); - _swapOutInstanceNameToSwapInInstanceName.clear(); - _liveSwapInInstanceNames.clear(); - _enabledSwapInInstanceNames.clear(); + // Create new caches to be populated. + Map<String, InstanceConfig> newAssignableInstanceConfigMap = new HashMap<>(); + Map<String, LiveInstance> newAssignableLiveInstancesMap = new HashMap<>(); + Map<String, String> newSwapOutInstanceNameToSwapInInstanceName = new HashMap<>(); + Set<String> newLiveSwapInInstanceNames = new HashSet<>(); + Set<String> newEnabledSwapInInstanceNames = new HashSet<>(); Map<String, String> filteredInstancesByLogicalId = new HashMap<>(); Map<String, String> swapOutLogicalIdsByInstanceName = new HashMap<>(); @@ -403,14 +422,14 @@ public class BaseControllerDataProvider implements ControlContextProvider { // instance with this instance. If this instance has no InstanceOperation, then replace the filtered instance // with this instance. This is the case where the SWAP_IN node has been marked as complete or SWAP_IN exists and // SWAP_OUT does not. There can never be a case where both have no InstanceOperation set. 
- _assignableInstanceConfigMap.remove(filteredNode); - _assignableInstanceConfigMap.put(node, currentInstanceConfig); + newAssignableInstanceConfigMap.remove(filteredNode); + newAssignableInstanceConfigMap.put(node, currentInstanceConfig); filteredInstancesByLogicalId.put(currentInstanceLogicalId, node); } } else if (!currentInstanceConfig.getInstanceOperation() .equals(InstanceConstants.InstanceOperation.EVACUATE.name())) { // EVACUATE instances are not considered to be assignable. - _assignableInstanceConfigMap.put(node, currentInstanceConfig); + newAssignableInstanceConfigMap.put(node, currentInstanceConfig); filteredInstancesByLogicalId.put(currentInstanceLogicalId, node); } @@ -429,24 +448,30 @@ public class BaseControllerDataProvider implements ControlContextProvider { } liveInstancesMap.forEach((instanceName, liveInstance) -> { - if (_assignableInstanceConfigMap.containsKey(instanceName)) { - _assignableLiveInstancesMap.put(instanceName, liveInstance); + if (newAssignableInstanceConfigMap.containsKey(instanceName)) { + newAssignableLiveInstancesMap.put(instanceName, liveInstance); } }); swapOutLogicalIdsByInstanceName.forEach((swapOutInstanceName, value) -> { String swapInInstanceName = swapInInstancesByLogicalId.get(value); if (swapInInstanceName != null) { - _swapOutInstanceNameToSwapInInstanceName.put(swapOutInstanceName, swapInInstanceName); + newSwapOutInstanceNameToSwapInInstanceName.put(swapOutInstanceName, swapInInstanceName); if (liveInstancesMap.containsKey(swapInInstanceName)) { - _liveSwapInInstanceNames.add(swapInInstanceName); + newLiveSwapInInstanceNames.add(swapInInstanceName); } if (InstanceValidationUtil.isInstanceEnabled(instanceConfigMap.get(swapInInstanceName), clusterConfig)) { - _enabledSwapInInstanceNames.add(swapInInstanceName); + newEnabledSwapInInstanceNames.add(swapInInstanceName); } } }); + + // Replace caches with up-to-date instance sets. 
+ _derivedInstanceCache = + new DerivedInstanceCache(newAssignableInstanceConfigMap, newAssignableLiveInstancesMap, + newSwapOutInstanceNameToSwapInInstanceName, newLiveSwapInInstanceNames, + newEnabledSwapInInstanceNames); } private void refreshResourceConfig(final HelixDataAccessor accessor, @@ -506,7 +531,7 @@ public class BaseControllerDataProvider implements ControlContextProvider { .filter(e -> !_timedOutInstanceDuringMaintenance.contains(e.getKey())) .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); _assignableLiveInstanceExcludeTimedOutForMaintenance = - _assignableLiveInstancesMap.entrySet().stream() + _derivedInstanceCache._assignableLiveInstancesMap.entrySet().stream() .filter(e -> !_timedOutInstanceDuringMaintenance.contains(e.getKey())) .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); } @@ -661,7 +686,7 @@ public class BaseControllerDataProvider implements ControlContextProvider { return Collections.unmodifiableMap(_assignableLiveInstanceExcludeTimedOutForMaintenance); } - return Collections.unmodifiableMap(_assignableLiveInstancesMap); + return Collections.unmodifiableMap(_derivedInstanceCache._assignableLiveInstancesMap); } /** @@ -685,7 +710,7 @@ public class BaseControllerDataProvider implements ControlContextProvider { * @return A new set contains instance name */ public Set<String> getAssignableInstances() { - return _assignableInstanceConfigMap.keySet(); + return _derivedInstanceCache._assignableInstanceConfigMap.keySet(); } /** @@ -780,7 +805,7 @@ public class BaseControllerDataProvider implements ControlContextProvider { */ public Set<String> getAssignableInstancesWithTag(String instanceTag) { Set<String> taggedInstances = new HashSet<>(); - for (String instance : _assignableInstanceConfigMap.keySet()) { + for (String instance : _derivedInstanceCache._assignableInstanceConfigMap.keySet()) { InstanceConfig instanceConfig = _allInstanceConfigCache.getPropertyByName(instance); if (instanceConfig != null && 
instanceConfig.containsTag(instanceTag)) { taggedInstances.add(instance); @@ -796,7 +821,7 @@ public class BaseControllerDataProvider implements ControlContextProvider { */ public Set<String> getInstancesWithTag(String instanceTag) { Set<String> taggedInstances = new HashSet<>(); - for (String instance : _assignableInstanceConfigMap.keySet()) { + for (String instance : _derivedInstanceCache._assignableInstanceConfigMap.keySet()) { InstanceConfig instanceConfig = _allInstanceConfigCache.getPropertyByName(instance); if (instanceConfig != null && instanceConfig.containsTag(instanceTag)) { taggedInstances.add(instance); @@ -838,7 +863,8 @@ public class BaseControllerDataProvider implements ControlContextProvider { * @return a map of SWAP_OUT instanceNames and their corresponding SWAP_IN instanceNames. */ public Map<String, String> getSwapOutToSwapInInstancePairs() { - return Collections.unmodifiableMap(_swapOutInstanceNameToSwapInInstanceName); + return Collections.unmodifiableMap( + _derivedInstanceCache._swapOutInstanceNameToSwapInInstanceName); } /** @@ -847,7 +873,7 @@ public class BaseControllerDataProvider implements ControlContextProvider { * @return a set of SWAP_IN instanceNames that have a corresponding SWAP_OUT instance. */ public Set<String> getLiveSwapInInstanceNames() { - return Collections.unmodifiableSet(_liveSwapInInstanceNames); + return Collections.unmodifiableSet(_derivedInstanceCache._liveSwapInInstanceNames); } /** @@ -856,7 +882,7 @@ public class BaseControllerDataProvider implements ControlContextProvider { * @return a set of SWAP_IN instanceNames that have a corresponding SWAP_OUT instance. 
*/ public Set<String> getEnabledSwapInInstanceNames() { - return Collections.unmodifiableSet(_enabledSwapInInstanceNames); + return Collections.unmodifiableSet(_derivedInstanceCache._enabledSwapInInstanceNames); } public synchronized void setLiveInstances(List<LiveInstance> liveInstances) { @@ -985,7 +1011,7 @@ public class BaseControllerDataProvider implements ControlContextProvider { * @return a map of instance name to instance config */ public Map<String, InstanceConfig> getAssignableInstanceConfigMap() { - return Collections.unmodifiableMap(_assignableInstanceConfigMap); + return Collections.unmodifiableMap(_derivedInstanceCache._assignableInstanceConfigMap); } /** @@ -1288,13 +1314,15 @@ public class BaseControllerDataProvider implements ControlContextProvider { StringBuilder sb = new StringBuilder(); sb.append(String.format("liveInstaceMap: %s", _allLiveInstanceCache.getPropertyMap())) .append("\n"); - sb.append(String.format("assignableLiveInstaceMap: %s", _assignableLiveInstancesMap)) + sb.append(String.format("assignableLiveInstaceMap: %s", + _derivedInstanceCache._assignableLiveInstancesMap)) .append("\n"); sb.append(String.format("idealStateMap: %s", _idealStateCache.getPropertyMap())).append("\n"); sb.append(String.format("stateModelDefMap: %s", _stateModelDefinitionCache.getPropertyMap())).append("\n"); sb.append(String.format("instanceConfigMap: %s", _allInstanceConfigCache.getPropertyMap())) .append("\n"); - sb.append(String.format("assignableInstanceConfigMap: %s", _assignableInstanceConfigMap)) + sb.append(String.format("assignableInstanceConfigMap: %s", + _derivedInstanceCache._assignableInstanceConfigMap)) .append("\n"); sb.append(String.format("resourceConfigMap: %s", _resourceConfigCache.getPropertyMap())).append("\n"); sb.append(String.format("messageCache: %s", _instanceMessagesCache)).append("\n");