This is an automated email from the ASF dual-hosted git repository.

jxue pushed a commit to branch replica_level_throttle
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 78d274d672e04c94a6c163917fe547e8cb371873
Author: Junkai Xue <[email protected]>
AuthorDate: Thu May 6 13:22:48 2021 -0700

    Apply per replica level throttling logic to Recovery/Load Rebalance and 
PendingMessage #1719
    
    This commit contains the following changes:
    
    Adapt Recovery/Load Rebalance and pending message charging to per replica 
throttling.
    Change the partition level comparator to order by 1) missed top states, 2) missed 
active replicas and 3) whether the ideal state is matched or not.
---
 .../stages/IntermediateStateCalcStage.java         | 299 +++++++--------------
 1 file changed, 101 insertions(+), 198 deletions(-)

diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
 
b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
index c46a6ce..6a5d2f1 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
@@ -365,9 +365,7 @@ public class IntermediateStateCalcStage extends 
AbstractBaseStage {
               resourceName, partitionsWithErrorStateReplica));
     }
 
-    chargePendingTransition(resource, currentStateOutput, throttleController,
-        partitionsNeedRecovery, partitionsNeedLoadBalance, cache,
-        bestPossiblePartitionStateMap, intermediatePartitionStateMap);
+    chargePendingTransition(resource, currentStateOutput, throttleController, 
cache, preferenceLists, stateModelDef, intermediatePartitionStateMap);
 
     // Perform recovery balance
     Set<Partition> recoveryThrottledPartitions =
@@ -423,211 +421,114 @@ public class IntermediateStateCalcStage extends 
AbstractBaseStage {
   }
 
   /**
-   * Check for a partition, whether all transitions for its replicas are 
downward transitions. Note
-   * that this function does NOT check for ERROR states.
-   * @param currentStateMap
-   * @param bestPossibleMap
-   * @param stateModelDef
-   * @return true if there are; false otherwise
+   * Determine whether the message is a downward state transition message or not.
+   * @param message                  message for load rebalance
+   * @param stateModelDefinition     state model definition object for this 
resource
+   * @return                         true if the message is determined to be a downward 
state transition; false otherwise
    */
-  private boolean isLoadBalanceDownwardForAllReplicas(Map<String, String> 
currentStateMap,
-      Map<String, String> bestPossibleMap, StateModelDefinition stateModelDef) 
{
-    Set<String> allInstances = new HashSet<>();
-    allInstances.addAll(currentStateMap.keySet());
-    allInstances.addAll(bestPossibleMap.keySet());
-    Map<String, Integer> statePriorityMap = 
stateModelDef.getStatePriorityMap();
-
-    for (String instance : allInstances) {
-      String currentState = currentStateMap.get(instance);
-      String bestPossibleState = bestPossibleMap.get(instance);
-      if (currentState == null) {
-        return false; // null -> state is upward
-      }
-      if (bestPossibleState != null) {
-        // Compare priority values and return if an upward transition is found
-        // Note that lower integer value implies higher priority
-        if (!statePriorityMap.containsKey(currentState)
-            || !statePriorityMap.containsKey(bestPossibleState)) {
-          // If the state is not found in statePriorityMap, consider it not 
strictly downward by
-          // default because we can't determine whether it is downward
-          return false;
-        }
-        if (statePriorityMap.get(currentState) > 
statePriorityMap.get(bestPossibleState)) {
-          return false;
-        }
-      }
-    }
-    return true;
+  private boolean isLoadBalanceDownwardStateTransition(Message message, 
StateModelDefinition stateModelDefinition) {
+    // state model definition is not found
+    if (stateModelDefinition == null) {
+      return false;
+    }
+
+    Map<String, Integer> statePriorityMap = 
stateModelDefinition.getStatePriorityMap();
+    // Compare priority values and return if an upward transition is found
+    // Note that lower integer value implies higher priority
+    // If the state is not found in statePriorityMap, consider it not strictly 
downward by
+    // default because we can't determine whether it is downward
+    if (statePriorityMap.containsKey(message.getFromState()) && 
statePriorityMap.containsKey(message.getToState())
+        && statePriorityMap.get(message.getFromState()) > 
statePriorityMap.get(message.getToState())) {
+      return true;
+    }
+    return false;
   }
 
   /**
    * Check and charge all pending transitions for throttling.
    */
   private void chargePendingTransition(Resource resource, CurrentStateOutput 
currentStateOutput,
-      StateTransitionThrottleController throttleController, Set<Partition> 
partitionsNeedRecovery,
-      Set<Partition> partitionsNeedLoadbalance, ResourceControllerDataProvider 
cache,
-      PartitionStateMap bestPossiblePartitionStateMap,
+      StateTransitionThrottleController throttleController, 
ResourceControllerDataProvider cache,
+      Map<String, List<String>> preferenceLists, StateModelDefinition 
stateModelDefinition,
       PartitionStateMap intermediatePartitionStateMap) {
     String resourceName = resource.getResourceName();
-
     // check and charge pending transitions
     for (Partition partition : resource.getPartitions()) {
+      // Note that custom mode does not apply recovery/load rebalance 
since users can define a different number of
+      // replicas for different partitions. In fact, custom mode is stopped 
by the resource level checks: if this resource
+      // is not FULL_AUTO, we will return best possible state and do nothing.
+      Map<String, Integer> requiredStates =
+          getRequiredStates(resourceName, cache, 
preferenceLists.get(partition.getPartitionName()));
       // Maps instance to its current state
-      Map<String, String> currentStateMap =
-          currentStateOutput.getCurrentStateMap(resourceName, partition);
+      Map<String, String> currentStateMap = 
currentStateOutput.getCurrentStateMap(resourceName, partition);
       // Maps instance to its pending (next) state
-      Map<String, String> pendingMap =
-          currentStateOutput.getPendingStateMap(resourceName, partition);
-
-      StateTransitionThrottleConfig.RebalanceType rebalanceType = 
RebalanceType.NONE;
-      if (partitionsNeedRecovery.contains(partition)) {
-        rebalanceType = 
StateTransitionThrottleConfig.RebalanceType.RECOVERY_BALANCE;
-      } else if (partitionsNeedLoadbalance.contains(partition)) {
-        rebalanceType = 
StateTransitionThrottleConfig.RebalanceType.LOAD_BALANCE;
-      }
-
-      if (pendingMap.size() > 0) {
-        boolean shouldChargePartition = false;
-        for (String instance : pendingMap.keySet()) {
-          String currentState = currentStateMap.get(instance);
-          String pendingState = pendingMap.get(instance);
-          if (pendingState != null && !pendingState.equals(currentState)
-              && !cache.getDisabledInstancesForPartition(resourceName, 
partition.getPartitionName())
-              .contains(instance)) {
-            // Only charge this instance if the partition is not disabled
-            throttleController.chargeInstance(rebalanceType, instance);
-            shouldChargePartition = true;
-            // If there is a pending state transition for the partition, that 
means that an assignment
-            // has already been made and the state transition message has 
already been sent out for the partition
-            // in a previous pipeline run. We must honor this and reflect it 
by charging for the pending state transition message.
-
-            // Since the assignment has already been made for the pending 
message, we do a special treatment
-            // for it by setting the best possible state directly in 
intermediatePartitionStateMap so that the pending
-            // message won't be double-assigned or double-charged in recovery 
or load balance.
-            handlePendingStateTransitionsForThrottling(partition, 
partitionsNeedRecovery,
-                partitionsNeedLoadbalance, rebalanceType, 
bestPossiblePartitionStateMap,
-                intermediatePartitionStateMap);
-          }
-        }
-        if (shouldChargePartition) {
-          throttleController.chargeCluster(rebalanceType);
+      List<Message> pendingMessages =
+          new 
ArrayList<>(currentStateOutput.getPendingMessageMap(resourceName, 
partition).values());
+      Collections.sort(pendingMessages, new 
MessagePriorityComparator(preferenceLists.get(partition.getPartitionName()),
+          stateModelDefinition.getStatePriorityMap()));
+
+      for (Message message : pendingMessages) {
+        StateTransitionThrottleConfig.RebalanceType rebalanceType =
+            getRebalanceTypePerMessage(requiredStates, message, 
currentStateMap);
+        String currentState = currentStateMap.get(message.getTgtName());
+        if (!message.getToState().equals(currentState) && 
message.getFromState().equals(currentState)
+            && !cache.getDisabledInstancesForPartition(resourceName, 
partition.getPartitionName())
+            .contains(message.getTgtName())) {
+          throttleController.chargeInstance(rebalanceType, 
message.getTgtName());
           throttleController.chargeResource(rebalanceType, resourceName);
+          throttleController.chargeCluster(rebalanceType);
         }
+        intermediatePartitionStateMap.setState(partition, 
message.getTgtName(), message.getToState());
       }
     }
   }
 
   /**
-   * Sort partitions according to partition priority {@link 
PartitionPriorityComparator}, and for
-   * each partition, throttle state transitions if needed. Also populate
-   * intermediatePartitionStateMap either with BestPossibleState (if no 
throttling is necessary) or
-   * CurrentState (if throttled).
-   * @param resource
-   * @param bestPossiblePartitionStateMap
-   * @param throttleController
-   * @param intermediatePartitionStateMap
-   * @param partitionsNeedRecovery
-   * @param currentStateOutput
-   * @param topState
-   * @param cache
-   * @return a set of partitions that need recovery but did not get recovered 
due to throttling
+   * Thin wrapper for per message throttling with recovery rebalance type. 
Also populate
+   * intermediatePartitionStateMap with generated messages from {@link 
MessageGenerationPhase}.
+   * @param resource                      the resource to throttle
+   * @param throttleController            throttle controller object
+   * @param messageToThrottle             the message to be throttled
+   * @param intermediatePartitionStateMap output result for this stage that 
intermediate state map
+   * @param cache                         cache object for computational 
metadata from external storage
+   * @param messagesThrottled             messages that have already been 
throttled
+   * @param resourceMessageMap            the map for all messages from 
MessageSelectStage. Remove the message
+   *                                      if it has been throttled
    */
-  private Set<Partition> recoveryRebalance(Resource resource,
-      PartitionStateMap bestPossiblePartitionStateMap,
-      StateTransitionThrottleController throttleController,
-      PartitionStateMap intermediatePartitionStateMap, Set<Partition> 
partitionsNeedRecovery,
-      CurrentStateOutput currentStateOutput, String topState,
-      ResourceControllerDataProvider cache) {
-    String resourceName = resource.getResourceName();
-    Set<Partition> partitionRecoveryBalanceThrottled = new HashSet<>();
-
-    // Maps Partition -> Instance -> State
-    Map<Partition, Map<String, String>> currentStateMap =
-        currentStateOutput.getCurrentStateMap(resourceName);
-    List<Partition> partitionsNeedRecoveryPrioritized = new 
ArrayList<>(partitionsNeedRecovery);
-
-    // We want the result of the intermediate state calculation to be 
deterministic. We sort here by
-    // partition name to ensure that the order is consistent for inputs fed 
into
-    // PartitionPriorityComparator sort
-    
partitionsNeedRecoveryPrioritized.sort(Comparator.comparing(Partition::getPartitionName));
-    partitionsNeedRecoveryPrioritized.sort(new PartitionPriorityComparator(
-        bestPossiblePartitionStateMap.getStateMap(), currentStateMap, 
topState, true));
-
-    // For each partition, apply throttling if needed.
-    for (Partition partition : partitionsNeedRecoveryPrioritized) {
-      throttleStateTransitionsForPartition(throttleController, resourceName, 
partition,
-          currentStateOutput, bestPossiblePartitionStateMap, 
partitionRecoveryBalanceThrottled,
-          intermediatePartitionStateMap, RebalanceType.RECOVERY_BALANCE, 
cache);
-    }
-    LogUtil.logInfo(logger, _eventId, String.format(
-        "For resource %s: Num of partitions needing recovery: %d, Num of 
partitions needing recovery"
-            + " but throttled (not recovered): %d",
-        resourceName, partitionsNeedRecovery.size(), 
partitionRecoveryBalanceThrottled.size()));
-    return partitionRecoveryBalanceThrottled;
+  private void recoveryRebalance(Resource resource, Partition partition,
+      StateTransitionThrottleController throttleController, Message 
messageToThrottle,
+      PartitionStateMap intermediatePartitionStateMap, 
ResourceControllerDataProvider cache,
+      Set<Message> messagesThrottled, Map<Partition, List<Message>> 
resourceMessageMap) {
+    throttleStateTransitionsForReplica(throttleController, 
resource.getResourceName(), partition, messageToThrottle,
+        messagesThrottled, intermediatePartitionStateMap, 
RebalanceType.RECOVERY_BALANCE, cache, resourceMessageMap);
   }
 
   /**
-   * Sort partitions according to partition priority {@link 
PartitionPriorityComparator}, and for
-   * each partition, throttle state transitions if needed. Also populate
-   * intermediatePartitionStateMap either with BestPossibleState (if no 
throttling is necessary) or
-   * CurrentState (if throttled).
-   * @param resource
-   * @param currentStateOutput
-   * @param bestPossiblePartitionStateMap
-   * @param throttleController
-   * @param intermediatePartitionStateMap
-   * @param partitionsNeedLoadbalance
-   * @param currentStateMap
-   * @param onlyDownwardLoadBalance true when only allowing downward 
transitions
-   * @param stateModelDef for determining whether a partition's transitions 
are strictly downward
-   * @param cache
-   * @return
+   * Thin wrapper for per message throttling with load rebalance type. Also 
populate
+   * intermediatePartitionStateMap with generated messages from {@link 
MessageGenerationPhase}.
+   * @param resource                      the resource to throttle
+   * @param throttleController            throttle controller object
+   * @param messageToThrottle             the message to be throttled
+   * @param intermediatePartitionStateMap output result for this stage that 
intermediate state map
+   * @param cache                         cache object for computational 
metadata from external storage
+   * @param onlyDownwardLoadBalance       whether only downward load balance is allowed
+   * @param stateModelDefinition          state model definition of this 
resource
+   * @param messagesThrottled             messages that have already been throttled
+   * @param resourceMessageMap            the map for all messages from 
MessageSelectStage. Remove the message
+   *                                      if it has been throttled
    */
-  private Set<Partition> loadRebalance(Resource resource, CurrentStateOutput 
currentStateOutput,
-      PartitionStateMap bestPossiblePartitionStateMap,
-      StateTransitionThrottleController throttleController,
-      PartitionStateMap intermediatePartitionStateMap, Set<Partition> 
partitionsNeedLoadbalance,
-      Map<Partition, Map<String, String>> currentStateMap, boolean 
onlyDownwardLoadBalance,
-      StateModelDefinition stateModelDef, ResourceControllerDataProvider 
cache) {
-    String resourceName = resource.getResourceName();
-    Set<Partition> partitionsLoadbalanceThrottled = new HashSet<>();
-
-    List<Partition> partitionsNeedLoadRebalancePrioritized =
-        new ArrayList<>(partitionsNeedLoadbalance);
-
-    // We want the result of the intermediate state calculation to be 
deterministic. We sort here by
-    // partition name to ensure that the order is consistent for inputs fed 
into
-    // PartitionPriorityComparator sort
-    
partitionsNeedLoadRebalancePrioritized.sort(Comparator.comparing(Partition::getPartitionName));
-    partitionsNeedLoadRebalancePrioritized.sort(new 
PartitionPriorityComparator(
-        bestPossiblePartitionStateMap.getStateMap(), currentStateMap, "", 
false));
-
-    for (Partition partition : partitionsNeedLoadRebalancePrioritized) {
-      // If this is a downward load balance, check if the partition's 
transition is strictly
-      // downward
-      if (onlyDownwardLoadBalance) {
-        Map<String, String> currentStateMapForPartition =
-            currentStateOutput.getCurrentStateMap(resourceName, partition);
-        Map<String, String> bestPossibleMapForPartition =
-            bestPossiblePartitionStateMap.getPartitionMap(partition);
-        if (!isLoadBalanceDownwardForAllReplicas(currentStateMapForPartition,
-            bestPossibleMapForPartition, stateModelDef)) {
-          // For downward load balance, if a partition's transitions are not 
strictly downward,
-          // set currentState to intermediateState
-          intermediatePartitionStateMap.setState(partition, 
currentStateMapForPartition);
-          continue;
-        }
-      }
-      throttleStateTransitionsForPartition(throttleController, resourceName, 
partition,
-          currentStateOutput, bestPossiblePartitionStateMap, 
partitionsLoadbalanceThrottled,
-          intermediatePartitionStateMap, RebalanceType.LOAD_BALANCE, cache);
-    }
-    LogUtil.logInfo(logger, _eventId,
-        String.format(
-            "For resource %s: Num of partitions needing load-balance: %d, Num 
of partitions needing"
-                + " load-balance but throttled (not load-balanced): %d",
-            resourceName, partitionsNeedLoadbalance.size(), 
partitionsLoadbalanceThrottled.size()));
-    return partitionsLoadbalanceThrottled;
+  private void loadRebalance(Resource resource, Partition partition,
+      StateTransitionThrottleController throttleController, Message 
messageToThrottle,
+      PartitionStateMap intermediatePartitionStateMap, 
ResourceControllerDataProvider cache,
+      boolean onlyDownwardLoadBalance, StateModelDefinition 
stateModelDefinition, Set<Message> messagesThrottled,
+      Map<Partition, List<Message>> resourceMessageMap) {
+    if (onlyDownwardLoadBalance && 
isLoadBalanceDownwardStateTransition(messageToThrottle, stateModelDefinition)) {
+      // Remove the message already allowed for downward state transitions.
+      intermediatePartitionStateMap.setState(partition, 
messageToThrottle.getTgtName(), messageToThrottle.getToState());
+      return;
+    }
+    throttleStateTransitionsForReplica(throttleController, 
resource.getResourceName(), partition, messageToThrottle,
+        messagesThrottled, intermediatePartitionStateMap, 
RebalanceType.LOAD_BALANCE, cache, resourceMessageMap);
   }
 
   /**
@@ -643,11 +544,13 @@ public class IntermediateStateCalcStage extends 
AbstractBaseStage {
    *                                          of IntermediateStateCalcStage
    * @param rebalanceType                     the rebalance type to charge 
quota
    * @param cache                             cached cluster metadata required 
by the throttle controller
+   * @param resourceMessageMap                the map for all messages from 
MessageSelectStage. Remove the message
+   *                                          if it has been throttled.
    */
   private void 
throttleStateTransitionsForReplica(StateTransitionThrottleController 
throttleController,
       String resourceName, Partition partition, Message messageToThrottle, 
Set<Message> messagesThrottled,
       PartitionStateMap intermediatePartitionStateMap, RebalanceType 
rebalanceType,
-      ResourceControllerDataProvider cache) {
+      ResourceControllerDataProvider cache, Map<Partition, List<Message>> 
resourceMessageMap) {
     boolean hasReachedThrottlingLimit = false;
     if (throttleController.shouldThrottleForResource(rebalanceType, 
resourceName)) {
       hasReachedThrottlingLimit = true;
@@ -678,6 +581,9 @@ public class IntermediateStateCalcStage extends 
AbstractBaseStage {
       intermediatePartitionStateMap.setState(partition, 
messageToThrottle.getTgtName(), messageToThrottle.getToState());
     } else {
       // Intermediate Map is based on current state
+      // Remove the message from MessageSelection result if it has been 
throttled since the message will be dispatched
+      // by next stage if it is not removed.
+      resourceMessageMap.get(partition).remove(messageToThrottle);
       messagesThrottled.add(messageToThrottle);
     }
   }
@@ -912,29 +818,26 @@ public class IntermediateStateCalcStage extends 
AbstractBaseStage {
     private Map<Partition, Map<String, String>> _bestPossibleMap;
     private Map<Partition, Map<String, String>> _currentStateMap;
     private String _topState;
-    private boolean _recoveryRebalance;
 
     PartitionPriorityComparator(Map<Partition, Map<String, String>> 
bestPossibleMap,
-        Map<Partition, Map<String, String>> currentStateMap, String topState,
-        boolean recoveryRebalance) {
+        Map<Partition, Map<String, String>> currentStateMap, String topState){
       _bestPossibleMap = bestPossibleMap;
       _currentStateMap = currentStateMap;
       _topState = topState;
-      _recoveryRebalance = recoveryRebalance;
     }
 
     @Override
     public int compare(Partition p1, Partition p2) {
-      if (_recoveryRebalance) {
-        int missTopState1 = getMissTopStateIndex(p1);
-        int missTopState2 = getMissTopStateIndex(p2);
-        // Highest priority for the partition without top state
-        if (missTopState1 != missTopState2) {
-          return Integer.compare(missTopState1, missTopState2);
-        }
-        // Higher priority for the partition with fewer active replicas
-        int currentActiveReplicas1 = getCurrentActiveReplicas(p1);
-        int currentActiveReplicas2 = getCurrentActiveReplicas(p2);
+      int missTopState1 = getMissTopStateIndex(p1);
+      int missTopState2 = getMissTopStateIndex(p2);
+      // Highest priority for the partition without top state
+      if (missTopState1 != missTopState2) {
+        return Integer.compare(missTopState1, missTopState2);
+      }
+      // Higher priority for the partition with fewer active replicas
+      int currentActiveReplicas1 = getCurrentActiveReplicas(p1);
+      int currentActiveReplicas2 = getCurrentActiveReplicas(p2);
+      if (currentActiveReplicas1 != currentActiveReplicas2) {
         return Integer.compare(currentActiveReplicas1, currentActiveReplicas2);
       }
       // Higher priority for the partition with fewer replicas with states 
matching with IdealState

Reply via email to