This is an automated email from the ASF dual-hosted git repository.

xyuanlu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new fcbd7822a Code cleanup and improvement in intermediate state calc 
stage related (#2456)
fcbd7822a is described below

commit fcbd7822a468b0a639d9c863de4bca71853296cf
Author: Qi (Quincy) Qu <[email protected]>
AuthorDate: Fri Apr 28 17:08:50 2023 -0400

    Code cleanup and improvement in intermediate state calc stage related 
(#2456)
---
 .../stages/IntermediateStateCalcStage.java         | 73 ++++++++--------------
 .../stages/StateTransitionThrottleController.java  | 54 ++++++----------
 2 files changed, 45 insertions(+), 82 deletions(-)

diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
 
b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
index dbd433b3e..00c49ad91 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
@@ -24,7 +24,6 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -66,7 +65,6 @@ public class IntermediateStateCalcStage extends 
AbstractBaseStage {
   public void process(ClusterEvent event) throws Exception {
     _eventId = event.getEventId();
     CurrentStateOutput currentStateOutput = 
event.getAttribute(AttributeName.CURRENT_STATE.name());
-
     BestPossibleStateOutput bestPossibleStateOutput =
         event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.name());
     Map<String, Resource> resourceToRebalance =
@@ -124,7 +122,7 @@ public class IntermediateStateCalcStage extends 
AbstractBaseStage {
     // priority.
     List<ResourcePriority> prioritizedResourceList = new ArrayList<>();
     for (String resourceName : resourceMap.keySet()) {
-      prioritizedResourceList.add(new ResourcePriority(resourceName, 
Integer.MIN_VALUE));
+      prioritizedResourceList.add(new ResourcePriority(resourceName));
     }
     // If resourcePriorityField is null at the cluster level, all resources 
will be considered equal
     // in priority by keeping all priorities at MIN_VALUE
@@ -147,7 +145,7 @@ public class IntermediateStateCalcStage extends 
AbstractBaseStage {
               
dataCache.getIdealState(resourceName).getRecord().getSimpleField(priorityField));
         }
       }
-      prioritizedResourceList.sort(new ResourcePriorityComparator());
+      Collections.sort(prioritizedResourceList);
     }
 
     ClusterStatusMonitor clusterStatusMonitor =
@@ -315,7 +313,7 @@ public class IntermediateStateCalcStage extends 
AbstractBaseStage {
 
     String stateModelDefName = idealState.getStateModelDefRef();
     StateModelDefinition stateModelDef = 
cache.getStateModelDef(stateModelDefName);
-    // This require a deep copy of current state map because some of the 
states will be overwritten by applying
+    // This requires a deep copy of current state map because some of the 
states will be overwritten by applying
     // messages to it.
 
     Set<Partition> partitionsWithErrorStateReplica = new HashSet<>();
@@ -357,9 +355,8 @@ public class IntermediateStateCalcStage extends 
AbstractBaseStage {
 
     // Sort partitions in case of urgent partition need to take the quota 
first.
     List<Partition> partitions = new ArrayList<>(resource.getPartitions());
-    Collections.sort(partitions,
-        new 
PartitionPriorityComparator(bestPossiblePartitionStateMap.getStateMap(),
-            currentStateOutput.getCurrentStateMap(resourceName), 
stateModelDef.getTopState()));
+    partitions.sort(new 
PartitionPriorityComparator(bestPossiblePartitionStateMap.getStateMap(),
+        currentStateOutput.getCurrentStateMap(resourceName), 
stateModelDef.getTopState()));
     for (Partition partition : partitions) {
       if (resourceMessageMap.get(partition) == null || 
resourceMessageMap.get(partition)
           .isEmpty()) {
@@ -368,11 +365,10 @@ public class IntermediateStateCalcStage extends 
AbstractBaseStage {
       List<Message> messagesToThrottle = new 
ArrayList<>(resourceMessageMap.get(partition));
       Map<String, String> derivedCurrentStateMap =
           currentStateOutput.getCurrentStateMap(resourceName, 
partition).entrySet().stream()
-              .collect(Collectors.toMap(entry -> entry.getKey(), entry -> 
entry.getValue()));
+              .collect(Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue));
       List<String> preferenceList = 
preferenceLists.get(partition.getPartitionName());
       Map<String, Integer> requiredState = getRequiredStates(resourceName, 
cache, preferenceList);
-      Collections.sort(messagesToThrottle,
-          new MessagePriorityComparator(preferenceList, 
stateModelDef.getStatePriorityMap()));
+      messagesToThrottle.sort(new MessagePriorityComparator(preferenceList, 
stateModelDef.getStatePriorityMap()));
       for (Message message : messagesToThrottle) {
         RebalanceType rebalanceType =
             getRebalanceTypePerMessage(requiredState, message, 
derivedCurrentStateMap);
@@ -454,13 +450,9 @@ public class IntermediateStateCalcStage extends 
AbstractBaseStage {
     // Note that lower integer value implies higher priority
     // If the state is not found in statePriorityMap, consider it not strictly 
downward by
     // default because we can't determine whether it is downward
-    if (statePriorityMap.containsKey(message.getFromState()) && 
statePriorityMap
-        .containsKey(message.getToState())
-        && statePriorityMap.get(message.getFromState()) < statePriorityMap
-        .get(message.getToState())) {
-      return true;
-    }
-    return false;
+    return statePriorityMap.containsKey(message.getFromState())
+        && statePriorityMap.containsKey(message.getToState())
+        && statePriorityMap.get(message.getFromState()) < 
statePriorityMap.get(message.getToState());
   }
 
   /**
@@ -483,9 +475,8 @@ public class IntermediateStateCalcStage extends 
AbstractBaseStage {
       // Maps instance to its pending (next) state
       List<Message> pendingMessages = new ArrayList<>(
           currentStateOutput.getPendingMessageMap(resourceName, 
partition).values());
-      Collections.sort(pendingMessages,
-          new 
MessagePriorityComparator(preferenceLists.get(partition.getPartitionName()),
-              stateModelDefinition.getStatePriorityMap()));
+      pendingMessages.sort(new 
MessagePriorityComparator(preferenceLists.get(partition.getPartitionName()),
+          stateModelDefinition.getStatePriorityMap()));
 
       for (Message message : pendingMessages) {
         StateTransitionThrottleConfig.RebalanceType rebalanceType =
@@ -657,12 +648,10 @@ public class IntermediateStateCalcStage extends 
AbstractBaseStage {
 
     // Generate a state mapping, state -> required numbers based on the live 
and enabled instances for this partition
     // preference list
-    LinkedHashMap<String, Integer> expectedStateCountMap = 
stateModelDefinition.getStateCountMap(
+    return stateModelDefinition.getStateCountMap(
         (int) preferenceList.stream()
             .filter(i -> 
resourceControllerDataProvider.getEnabledLiveInstances().contains(i))
             .count(), requiredNumReplica); // StateModelDefinition's counts
-
-    return expectedStateCountMap;
   }
 
   /**
@@ -709,17 +698,17 @@ public class IntermediateStateCalcStage extends 
AbstractBaseStage {
   /**
    * POJO that maps resource name to its priority represented by an integer.
    */
-  private static class ResourcePriority {
-    private String _resourceName;
-    private int _priority;
+  private static class ResourcePriority implements 
Comparable<ResourcePriority> {
+    private final String _resourceName;
+    private int _priority = Integer.MIN_VALUE;
 
-    ResourcePriority(String resourceName, Integer priority) {
+    ResourcePriority(String resourceName) {
       _resourceName = resourceName;
-      _priority = priority;
     }
 
+    @Override
     public int compareTo(ResourcePriority resourcePriority) {
-      return Integer.compare(_priority, resourcePriority._priority);
+      return Integer.compare(resourcePriority._priority, _priority);
     }
 
     public String getResourceName() {
@@ -736,16 +725,9 @@ public class IntermediateStateCalcStage extends 
AbstractBaseStage {
     }
   }
 
-  private static class ResourcePriorityComparator implements 
Comparator<ResourcePriority> {
-    @Override
-    public int compare(ResourcePriority priority1, ResourcePriority priority2) 
{
-      return priority2.compareTo(priority1);
-    }
-  }
-
-  private class MessagePriorityComparator implements Comparator<Message> {
-    private Map<String, Integer> _preferenceInstanceMap;
-    private Map<String, Integer> _statePriorityMap;
+  private static class MessagePriorityComparator implements 
Comparator<Message> {
+    private final Map<String, Integer> _preferenceInstanceMap;
+    private final Map<String, Integer> _statePriorityMap;
 
     MessagePriorityComparator(List<String> preferenceList, Map<String, 
Integer> statePriorityMap) {
       // Get instance -> priority map.
@@ -776,10 +758,10 @@ public class IntermediateStateCalcStage extends 
AbstractBaseStage {
   // Compare partitions according following standard:
   // 1) Partition without top state always is the highest priority.
   // 2) For partition with top-state, the more number of active replica it 
has, the less priority.
-  private class PartitionPriorityComparator implements Comparator<Partition> {
-    private Map<Partition, Map<String, String>> _bestPossibleMap;
-    private Map<Partition, Map<String, String>> _currentStateMap;
-    private String _topState;
+  private static class PartitionPriorityComparator implements 
Comparator<Partition> {
+    private final Map<Partition, Map<String, String>> _bestPossibleMap;
+    private final Map<Partition, Map<String, String>> _currentStateMap;
+    private final String _topState;
 
     PartitionPriorityComparator(Map<Partition, Map<String, String>> 
bestPossibleMap,
         Map<Partition, Map<String, String>> currentStateMap, String topState) {
@@ -813,8 +795,7 @@ public class IntermediateStateCalcStage extends 
AbstractBaseStage {
 
     private int getMissTopStateIndex(Partition partition) {
       // 0 if no replicas in top-state, 1 if it has at least one replica in 
top-state.
-      if (!_currentStateMap.containsKey(partition) || 
!_currentStateMap.get(partition).values()
-          .contains(_topState)) {
+      if (!_currentStateMap.containsKey(partition) || 
!_currentStateMap.get(partition).containsValue(_topState)) {
         return 0;
       }
       return 1;
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/stages/StateTransitionThrottleController.java
 
b/helix-core/src/main/java/org/apache/helix/controller/stages/StateTransitionThrottleController.java
index 4d110afdb..2b3ad6d6b 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/stages/StateTransitionThrottleController.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/stages/StateTransitionThrottleController.java
@@ -35,7 +35,7 @@ import org.slf4j.LoggerFactory;
  * off.
  */
 class StateTransitionThrottleController {
-  private static Logger logger = 
LoggerFactory.getLogger(StateTransitionThrottleController.class);
+  private static final Logger logger = 
LoggerFactory.getLogger(StateTransitionThrottleController.class);
 
   // pending allowed transition counts in the cluster level for recovery and 
load balance
   Map<StateTransitionThrottleConfig.RebalanceType, Long> 
_pendingTransitionAllowedInCluster;
@@ -75,23 +75,15 @@ class StateTransitionThrottleController {
         break;
       case RESOURCE:
         for (String resource : resources) {
-          if (!_pendingTransitionAllowedPerResource.containsKey(resource)) {
-            _pendingTransitionAllowedPerResource.put(resource,
-                new HashMap<StateTransitionThrottleConfig.RebalanceType, 
Long>());
-          }
-          
_pendingTransitionAllowedPerResource.get(resource).put(config.getRebalanceType(),
-              config.getMaxPartitionInTransition());
+          _pendingTransitionAllowedPerResource.computeIfAbsent(resource, k -> 
new HashMap<>())
+              .put(config.getRebalanceType(), 
config.getMaxPartitionInTransition());
         }
         _throttleEnabled = true;
         break;
       case INSTANCE:
         for (String instance : liveInstances) {
-          if (!_pendingTransitionAllowedPerInstance.containsKey(instance)) {
-            _pendingTransitionAllowedPerInstance.put(instance,
-                new HashMap<StateTransitionThrottleConfig.RebalanceType, 
Long>());
-          }
-          
_pendingTransitionAllowedPerInstance.get(instance).put(config.getRebalanceType(),
-              config.getMaxPartitionInTransition());
+          _pendingTransitionAllowedPerInstance.computeIfAbsent(instance, k -> 
new HashMap<>())
+              .put(config.getRebalanceType(), 
config.getMaxPartitionInTransition());
         }
         _throttleEnabled = true;
         break;
@@ -132,18 +124,7 @@ class StateTransitionThrottleController {
    */
   protected boolean shouldThrottleForResource(
       StateTransitionThrottleConfig.RebalanceType rebalanceType, String 
resourceName) {
-    if (shouldThrottleForCluster(rebalanceType)) {
-      return true;
-    }
-    Long resourceThrottle;
-    if (_pendingTransitionAllowedPerResource.containsKey(resourceName)) {
-      resourceThrottle = 
_pendingTransitionAllowedPerResource.get(resourceName).get(rebalanceType);
-      if 
(shouldThrottleForANYType(_pendingTransitionAllowedPerResource.get(resourceName))
-          || (resourceThrottle != null && resourceThrottle <= 0)) {
-        return true;
-      }
-    }
-    return false;
+    return shouldThrottleForGivenMap(rebalanceType, resourceName, 
_pendingTransitionAllowedPerResource);
   }
 
   /**
@@ -155,16 +136,20 @@ class StateTransitionThrottleController {
    */
   protected boolean shouldThrottleForInstance(
       StateTransitionThrottleConfig.RebalanceType rebalanceType, String 
instanceName) {
+    return shouldThrottleForGivenMap(rebalanceType, instanceName, 
_pendingTransitionAllowedPerInstance);
+  }
+
+  private boolean 
shouldThrottleForGivenMap(StateTransitionThrottleConfig.RebalanceType 
rebalanceType, String entryName,
+      Map<String, Map<StateTransitionThrottleConfig.RebalanceType, Long>> 
transitionAllowedPerEntry) {
     if (shouldThrottleForCluster(rebalanceType)) {
       return true;
     }
     Long instanceThrottle;
-    if (_pendingTransitionAllowedPerInstance.containsKey(instanceName)) {
-      instanceThrottle = 
_pendingTransitionAllowedPerInstance.get(instanceName).get(rebalanceType);
-      if 
(shouldThrottleForANYType(_pendingTransitionAllowedPerInstance.get(instanceName))
-          || (instanceThrottle != null && instanceThrottle <= 0)) {
-        return true;
-      }
+    if (transitionAllowedPerEntry.containsKey(entryName)) {
+      Map<StateTransitionThrottleConfig.RebalanceType, Long> throttleConfigMap 
=
+          transitionAllowedPerEntry.get(entryName);
+      instanceThrottle = throttleConfigMap.get(rebalanceType);
+      return (instanceThrottle != null && instanceThrottle <= 0) || 
shouldThrottleForANYType(throttleConfigMap);
     }
     return false;
   }
@@ -223,11 +208,8 @@ class StateTransitionThrottleController {
   private boolean shouldThrottleForANYType(
       Map<StateTransitionThrottleConfig.RebalanceType, Long> 
pendingTransitionAllowed) {
     if 
(pendingTransitionAllowed.containsKey(StateTransitionThrottleConfig.RebalanceType.ANY))
 {
-      Long anyTypeThrottle =
-          
pendingTransitionAllowed.get(StateTransitionThrottleConfig.RebalanceType.ANY);
-      if (anyTypeThrottle != null && anyTypeThrottle <= 0) {
-        return true;
-      }
+      Long anyTypeThrottle = 
pendingTransitionAllowed.get(StateTransitionThrottleConfig.RebalanceType.ANY);
+      return anyTypeThrottle != null && anyTypeThrottle <= 0;
     }
     return false;
   }

Reply via email to