This is an automated email from the ASF dual-hosted git repository.
jxue pushed a commit to branch replica_level_throttle
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/replica_level_throttle by this
push:
new 6b9bddd [Replica Level Throttle] Apply per replica level throttling
logic to Recovery/Load Rebalance and PendingMessage (#1719)
6b9bddd is described below
commit 6b9bddd795968e49d1ebcc79e9761185090f6d97
Author: Junkai Xue <[email protected]>
AuthorDate: Thu May 6 13:13:46 2021 -0700
[Replica Level Throttle] Apply per replica level throttling logic to
Recovery/Load Rebalance and PendingMessage (#1719)
Apply per replica level throttling logic to Recovery/Load Rebalance and
PendingMessage
This commit contains the following changes:
1. Recovery/load rebalance and pending-message charging now use per-replica
throttling.
2. Change the partition-level comparator to order partitions by 1) missing top
states, 2) missing active replicas, and 3) whether they match the ideal state.
---
.../stages/IntermediateStateCalcStage.java | 299 +++++++--------------
1 file changed, 101 insertions(+), 198 deletions(-)
diff --git
a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
index c46a6ce..6a5d2f1 100644
---
a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
+++
b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
@@ -365,9 +365,7 @@ public class IntermediateStateCalcStage extends
AbstractBaseStage {
resourceName, partitionsWithErrorStateReplica));
}
- chargePendingTransition(resource, currentStateOutput, throttleController,
- partitionsNeedRecovery, partitionsNeedLoadBalance, cache,
- bestPossiblePartitionStateMap, intermediatePartitionStateMap);
+ chargePendingTransition(resource, currentStateOutput, throttleController,
cache, preferenceLists, stateModelDef, intermediatePartitionStateMap);
// Perform recovery balance
Set<Partition> recoveryThrottledPartitions =
@@ -423,211 +421,114 @@ public class IntermediateStateCalcStage extends
AbstractBaseStage {
}
/**
- * Check for a partition, whether all transitions for its replicas are
downward transitions. Note
- * that this function does NOT check for ERROR states.
- * @param currentStateMap
- * @param bestPossibleMap
- * @param stateModelDef
- * @return true if there are; false otherwise
+ * Determine the message is downward message or not.
+ * @param message message for load rebalance
+ * @param stateModelDefinition state model definition object for this
resource
+ * @return set of messages allowed for downward
state transitions
*/
- private boolean isLoadBalanceDownwardForAllReplicas(Map<String, String>
currentStateMap,
- Map<String, String> bestPossibleMap, StateModelDefinition stateModelDef)
{
- Set<String> allInstances = new HashSet<>();
- allInstances.addAll(currentStateMap.keySet());
- allInstances.addAll(bestPossibleMap.keySet());
- Map<String, Integer> statePriorityMap =
stateModelDef.getStatePriorityMap();
-
- for (String instance : allInstances) {
- String currentState = currentStateMap.get(instance);
- String bestPossibleState = bestPossibleMap.get(instance);
- if (currentState == null) {
- return false; // null -> state is upward
- }
- if (bestPossibleState != null) {
- // Compare priority values and return if an upward transition is found
- // Note that lower integer value implies higher priority
- if (!statePriorityMap.containsKey(currentState)
- || !statePriorityMap.containsKey(bestPossibleState)) {
- // If the state is not found in statePriorityMap, consider it not
strictly downward by
- // default because we can't determine whether it is downward
- return false;
- }
- if (statePriorityMap.get(currentState) >
statePriorityMap.get(bestPossibleState)) {
- return false;
- }
- }
- }
- return true;
+ private boolean isLoadBalanceDownwardStateTransition(Message message,
StateModelDefinition stateModelDefinition) {
+ // state model definition is not found
+ if (stateModelDefinition == null) {
+ return false;
+ }
+
+ Map<String, Integer> statePriorityMap =
stateModelDefinition.getStatePriorityMap();
+ // Compare priority values and return if an upward transition is found
+ // Note that lower integer value implies higher priority
+ // If the state is not found in statePriorityMap, consider it not strictly
downward by
+ // default because we can't determine whether it is downward
+ if (statePriorityMap.containsKey(message.getFromState()) &&
statePriorityMap.containsKey(message.getToState())
+ && statePriorityMap.get(message.getFromState()) >
statePriorityMap.get(message.getToState())) {
+ return true;
+ }
+ return false;
}
/**
* Check and charge all pending transitions for throttling.
*/
private void chargePendingTransition(Resource resource, CurrentStateOutput
currentStateOutput,
- StateTransitionThrottleController throttleController, Set<Partition>
partitionsNeedRecovery,
- Set<Partition> partitionsNeedLoadbalance, ResourceControllerDataProvider
cache,
- PartitionStateMap bestPossiblePartitionStateMap,
+ StateTransitionThrottleController throttleController,
ResourceControllerDataProvider cache,
+ Map<String, List<String>> preferenceLists, StateModelDefinition
stateModelDefinition,
PartitionStateMap intermediatePartitionStateMap) {
String resourceName = resource.getResourceName();
-
// check and charge pending transitions
for (Partition partition : resource.getPartitions()) {
+ // To clarify that custom mode does not apply recovery/load rebalance
since user can define different number of
+ // replicas for different partitions. Actually, the custom will stopped
from resource level checks if this resource
+ // is not FULL_AUTO, we will return best possible state and do nothing.
+ Map<String, Integer> requiredStates =
+ getRequiredStates(resourceName, cache,
preferenceLists.get(partition.getPartitionName()));
// Maps instance to its current state
- Map<String, String> currentStateMap =
- currentStateOutput.getCurrentStateMap(resourceName, partition);
+ Map<String, String> currentStateMap =
currentStateOutput.getCurrentStateMap(resourceName, partition);
// Maps instance to its pending (next) state
- Map<String, String> pendingMap =
- currentStateOutput.getPendingStateMap(resourceName, partition);
-
- StateTransitionThrottleConfig.RebalanceType rebalanceType =
RebalanceType.NONE;
- if (partitionsNeedRecovery.contains(partition)) {
- rebalanceType =
StateTransitionThrottleConfig.RebalanceType.RECOVERY_BALANCE;
- } else if (partitionsNeedLoadbalance.contains(partition)) {
- rebalanceType =
StateTransitionThrottleConfig.RebalanceType.LOAD_BALANCE;
- }
-
- if (pendingMap.size() > 0) {
- boolean shouldChargePartition = false;
- for (String instance : pendingMap.keySet()) {
- String currentState = currentStateMap.get(instance);
- String pendingState = pendingMap.get(instance);
- if (pendingState != null && !pendingState.equals(currentState)
- && !cache.getDisabledInstancesForPartition(resourceName,
partition.getPartitionName())
- .contains(instance)) {
- // Only charge this instance if the partition is not disabled
- throttleController.chargeInstance(rebalanceType, instance);
- shouldChargePartition = true;
- // If there is a pending state transition for the partition, that
means that an assignment
- // has already been made and the state transition message has
already been sent out for the partition
- // in a previous pipeline run. We must honor this and reflect it
by charging for the pending state transition message.
-
- // Since the assignment has already been made for the pending
message, we do a special treatment
- // for it by setting the best possible state directly in
intermediatePartitionStateMap so that the pending
- // message won't be double-assigned or double-charged in recovery
or load balance.
- handlePendingStateTransitionsForThrottling(partition,
partitionsNeedRecovery,
- partitionsNeedLoadbalance, rebalanceType,
bestPossiblePartitionStateMap,
- intermediatePartitionStateMap);
- }
- }
- if (shouldChargePartition) {
- throttleController.chargeCluster(rebalanceType);
+ List<Message> pendingMessages =
+ new
ArrayList<>(currentStateOutput.getPendingMessageMap(resourceName,
partition).values());
+ Collections.sort(pendingMessages, new
MessagePriorityComparator(preferenceLists.get(partition.getPartitionName()),
+ stateModelDefinition.getStatePriorityMap()));
+
+ for (Message message : pendingMessages) {
+ StateTransitionThrottleConfig.RebalanceType rebalanceType =
+ getRebalanceTypePerMessage(requiredStates, message,
currentStateMap);
+ String currentState = currentStateMap.get(message.getTgtName());
+ if (!message.getToState().equals(currentState) &&
message.getFromState().equals(currentState)
+ && !cache.getDisabledInstancesForPartition(resourceName,
partition.getPartitionName())
+ .contains(message.getTgtName())) {
+ throttleController.chargeInstance(rebalanceType,
message.getTgtName());
throttleController.chargeResource(rebalanceType, resourceName);
+ throttleController.chargeCluster(rebalanceType);
}
+ intermediatePartitionStateMap.setState(partition,
message.getTgtName(), message.getToState());
}
}
}
/**
- * Sort partitions according to partition priority {@link
PartitionPriorityComparator}, and for
- * each partition, throttle state transitions if needed. Also populate
- * intermediatePartitionStateMap either with BestPossibleState (if no
throttling is necessary) or
- * CurrentState (if throttled).
- * @param resource
- * @param bestPossiblePartitionStateMap
- * @param throttleController
- * @param intermediatePartitionStateMap
- * @param partitionsNeedRecovery
- * @param currentStateOutput
- * @param topState
- * @param cache
- * @return a set of partitions that need recovery but did not get recovered
due to throttling
+ * Thin wrapper for per message throttling with recovery rebalance type.
Also populate
+ * intermediatePartitionStateMap with generated messages from {@link
MessageGenerationPhase}.
+ * @param resource the resource to throttle
+ * @param throttleController throttle controller object
+ * @param messageToThrottle the message to be throttled
+ * @param intermediatePartitionStateMap output result for this stage that
intermediate state map
+ * @param cache cache object for computational
metadata from external storage
+ * @param messagesThrottled messages that have already been
throttled
+ * @param resourceMessageMap the map for all messages from
MessageSelectStage. Remove the message
+ * if it has been throttled
*/
- private Set<Partition> recoveryRebalance(Resource resource,
- PartitionStateMap bestPossiblePartitionStateMap,
- StateTransitionThrottleController throttleController,
- PartitionStateMap intermediatePartitionStateMap, Set<Partition>
partitionsNeedRecovery,
- CurrentStateOutput currentStateOutput, String topState,
- ResourceControllerDataProvider cache) {
- String resourceName = resource.getResourceName();
- Set<Partition> partitionRecoveryBalanceThrottled = new HashSet<>();
-
- // Maps Partition -> Instance -> State
- Map<Partition, Map<String, String>> currentStateMap =
- currentStateOutput.getCurrentStateMap(resourceName);
- List<Partition> partitionsNeedRecoveryPrioritized = new
ArrayList<>(partitionsNeedRecovery);
-
- // We want the result of the intermediate state calculation to be
deterministic. We sort here by
- // partition name to ensure that the order is consistent for inputs fed
into
- // PartitionPriorityComparator sort
-
partitionsNeedRecoveryPrioritized.sort(Comparator.comparing(Partition::getPartitionName));
- partitionsNeedRecoveryPrioritized.sort(new PartitionPriorityComparator(
- bestPossiblePartitionStateMap.getStateMap(), currentStateMap,
topState, true));
-
- // For each partition, apply throttling if needed.
- for (Partition partition : partitionsNeedRecoveryPrioritized) {
- throttleStateTransitionsForPartition(throttleController, resourceName,
partition,
- currentStateOutput, bestPossiblePartitionStateMap,
partitionRecoveryBalanceThrottled,
- intermediatePartitionStateMap, RebalanceType.RECOVERY_BALANCE,
cache);
- }
- LogUtil.logInfo(logger, _eventId, String.format(
- "For resource %s: Num of partitions needing recovery: %d, Num of
partitions needing recovery"
- + " but throttled (not recovered): %d",
- resourceName, partitionsNeedRecovery.size(),
partitionRecoveryBalanceThrottled.size()));
- return partitionRecoveryBalanceThrottled;
+ private void recoveryRebalance(Resource resource, Partition partition,
+ StateTransitionThrottleController throttleController, Message
messageToThrottle,
+ PartitionStateMap intermediatePartitionStateMap,
ResourceControllerDataProvider cache,
+ Set<Message> messagesThrottled, Map<Partition, List<Message>>
resourceMessageMap) {
+ throttleStateTransitionsForReplica(throttleController,
resource.getResourceName(), partition, messageToThrottle,
+ messagesThrottled, intermediatePartitionStateMap,
RebalanceType.RECOVERY_BALANCE, cache, resourceMessageMap);
}
/**
- * Sort partitions according to partition priority {@link
PartitionPriorityComparator}, and for
- * each partition, throttle state transitions if needed. Also populate
- * intermediatePartitionStateMap either with BestPossibleState (if no
throttling is necessary) or
- * CurrentState (if throttled).
- * @param resource
- * @param currentStateOutput
- * @param bestPossiblePartitionStateMap
- * @param throttleController
- * @param intermediatePartitionStateMap
- * @param partitionsNeedLoadbalance
- * @param currentStateMap
- * @param onlyDownwardLoadBalance true when only allowing downward
transitions
- * @param stateModelDef for determining whether a partition's transitions
are strictly downward
- * @param cache
- * @return
+ * Thin wrapper for per message throttling with load rebalance type. Also
populate
+ * intermediatePartitionStateMap with generated messages from {@link
MessageGenerationPhase}.
+ * @param resource the resource to throttle
+ * @param throttleController throttle controller object
+ * @param messageToThrottle the message to be throttle
+ * @param intermediatePartitionStateMap output result for this stage that
intermediate state map
+ * @param cache cache object for computational
metadata from external storage
+ * @param onlyDownwardLoadBalance does allow only downward load balance
+ * @param stateModelDefinition state model definition of this
resource
+ * @param messagesThrottled messages are already throttled
+ * @param resourceMessageMap the map for all messages from
MessageSelectStage. Remove the message
+ * if it has been throttled
*/
- private Set<Partition> loadRebalance(Resource resource, CurrentStateOutput
currentStateOutput,
- PartitionStateMap bestPossiblePartitionStateMap,
- StateTransitionThrottleController throttleController,
- PartitionStateMap intermediatePartitionStateMap, Set<Partition>
partitionsNeedLoadbalance,
- Map<Partition, Map<String, String>> currentStateMap, boolean
onlyDownwardLoadBalance,
- StateModelDefinition stateModelDef, ResourceControllerDataProvider
cache) {
- String resourceName = resource.getResourceName();
- Set<Partition> partitionsLoadbalanceThrottled = new HashSet<>();
-
- List<Partition> partitionsNeedLoadRebalancePrioritized =
- new ArrayList<>(partitionsNeedLoadbalance);
-
- // We want the result of the intermediate state calculation to be
deterministic. We sort here by
- // partition name to ensure that the order is consistent for inputs fed
into
- // PartitionPriorityComparator sort
-
partitionsNeedLoadRebalancePrioritized.sort(Comparator.comparing(Partition::getPartitionName));
- partitionsNeedLoadRebalancePrioritized.sort(new
PartitionPriorityComparator(
- bestPossiblePartitionStateMap.getStateMap(), currentStateMap, "",
false));
-
- for (Partition partition : partitionsNeedLoadRebalancePrioritized) {
- // If this is a downward load balance, check if the partition's
transition is strictly
- // downward
- if (onlyDownwardLoadBalance) {
- Map<String, String> currentStateMapForPartition =
- currentStateOutput.getCurrentStateMap(resourceName, partition);
- Map<String, String> bestPossibleMapForPartition =
- bestPossiblePartitionStateMap.getPartitionMap(partition);
- if (!isLoadBalanceDownwardForAllReplicas(currentStateMapForPartition,
- bestPossibleMapForPartition, stateModelDef)) {
- // For downward load balance, if a partition's transitions are not
strictly downward,
- // set currentState to intermediateState
- intermediatePartitionStateMap.setState(partition,
currentStateMapForPartition);
- continue;
- }
- }
- throttleStateTransitionsForPartition(throttleController, resourceName,
partition,
- currentStateOutput, bestPossiblePartitionStateMap,
partitionsLoadbalanceThrottled,
- intermediatePartitionStateMap, RebalanceType.LOAD_BALANCE, cache);
- }
- LogUtil.logInfo(logger, _eventId,
- String.format(
- "For resource %s: Num of partitions needing load-balance: %d, Num
of partitions needing"
- + " load-balance but throttled (not load-balanced): %d",
- resourceName, partitionsNeedLoadbalance.size(),
partitionsLoadbalanceThrottled.size()));
- return partitionsLoadbalanceThrottled;
+ private void loadRebalance(Resource resource, Partition partition,
+ StateTransitionThrottleController throttleController, Message
messageToThrottle,
+ PartitionStateMap intermediatePartitionStateMap,
ResourceControllerDataProvider cache,
+ boolean onlyDownwardLoadBalance, StateModelDefinition
stateModelDefinition, Set<Message> messagesThrottled,
+ Map<Partition, List<Message>> resourceMessageMap) {
+ if (onlyDownwardLoadBalance &&
isLoadBalanceDownwardStateTransition(messageToThrottle, stateModelDefinition)) {
+ // Remove the message already allowed for downward state transitions.
+ intermediatePartitionStateMap.setState(partition,
messageToThrottle.getTgtName(), messageToThrottle.getToState());
+ return;
+ }
+ throttleStateTransitionsForReplica(throttleController,
resource.getResourceName(), partition, messageToThrottle,
+ messagesThrottled, intermediatePartitionStateMap,
RebalanceType.LOAD_BALANCE, cache, resourceMessageMap);
}
/**
@@ -643,11 +544,13 @@ public class IntermediateStateCalcStage extends
AbstractBaseStage {
* of IntermediateStateCalcStage
* @param rebalanceType the rebalance type to charge
quota
* @param cache cached cluster metadata required
by the throttle controller
+ * @param resourceMessageMap the map for all messages from
MessageSelectStage. Remove the message
+ * if it has been throttled.
*/
private void
throttleStateTransitionsForReplica(StateTransitionThrottleController
throttleController,
String resourceName, Partition partition, Message messageToThrottle,
Set<Message> messagesThrottled,
PartitionStateMap intermediatePartitionStateMap, RebalanceType
rebalanceType,
- ResourceControllerDataProvider cache) {
+ ResourceControllerDataProvider cache, Map<Partition, List<Message>>
resourceMessageMap) {
boolean hasReachedThrottlingLimit = false;
if (throttleController.shouldThrottleForResource(rebalanceType,
resourceName)) {
hasReachedThrottlingLimit = true;
@@ -678,6 +581,9 @@ public class IntermediateStateCalcStage extends
AbstractBaseStage {
intermediatePartitionStateMap.setState(partition,
messageToThrottle.getTgtName(), messageToThrottle.getToState());
} else {
// Intermediate Map is based on current state
+ // Remove the message from MessageSelection result if it has been
throttled since the message will be dispatched
+ // by next stage if it is not removed.
+ resourceMessageMap.get(partition).remove(messageToThrottle);
messagesThrottled.add(messageToThrottle);
}
}
@@ -912,29 +818,26 @@ public class IntermediateStateCalcStage extends
AbstractBaseStage {
private Map<Partition, Map<String, String>> _bestPossibleMap;
private Map<Partition, Map<String, String>> _currentStateMap;
private String _topState;
- private boolean _recoveryRebalance;
PartitionPriorityComparator(Map<Partition, Map<String, String>>
bestPossibleMap,
- Map<Partition, Map<String, String>> currentStateMap, String topState,
- boolean recoveryRebalance) {
+ Map<Partition, Map<String, String>> currentStateMap, String topState){
_bestPossibleMap = bestPossibleMap;
_currentStateMap = currentStateMap;
_topState = topState;
- _recoveryRebalance = recoveryRebalance;
}
@Override
public int compare(Partition p1, Partition p2) {
- if (_recoveryRebalance) {
- int missTopState1 = getMissTopStateIndex(p1);
- int missTopState2 = getMissTopStateIndex(p2);
- // Highest priority for the partition without top state
- if (missTopState1 != missTopState2) {
- return Integer.compare(missTopState1, missTopState2);
- }
- // Higher priority for the partition with fewer active replicas
- int currentActiveReplicas1 = getCurrentActiveReplicas(p1);
- int currentActiveReplicas2 = getCurrentActiveReplicas(p2);
+ int missTopState1 = getMissTopStateIndex(p1);
+ int missTopState2 = getMissTopStateIndex(p2);
+ // Highest priority for the partition without top state
+ if (missTopState1 != missTopState2) {
+ return Integer.compare(missTopState1, missTopState2);
+ }
+ // Higher priority for the partition with fewer active replicas
+ int currentActiveReplicas1 = getCurrentActiveReplicas(p1);
+ int currentActiveReplicas2 = getCurrentActiveReplicas(p2);
+ if (currentActiveReplicas1 != currentActiveReplicas2) {
return Integer.compare(currentActiveReplicas1, currentActiveReplicas2);
}
// Higher priority for the partition with fewer replicas with states
matching with IdealState