This is an automated email from the ASF dual-hosted git repository.
jxue pushed a commit to branch replica_level_throttle
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/replica_level_throttle by this
push:
new 29aaf64 Change throttling logic to per message (#1714)
29aaf64 is described below
commit 29aaf644a755fab4aae27274f396115eda8ac493
Author: Junkai Xue <[email protected]>
AuthorDate: Wed Apr 28 13:20:09 2021 -0700
Change throttling logic to per message (#1714)
Apply the logic for throttling with per message quota charge.
---
.../stages/IntermediateStateCalcStage.java | 115 ++++++---------------
1 file changed, 33 insertions(+), 82 deletions(-)
diff --git
a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
index a840d5e..c46a6ce 100644
---
a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
+++
b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
@@ -631,104 +631,55 @@ public class IntermediateStateCalcStage extends
AbstractBaseStage {
}
/**
- * Check the status on throttling at every level (cluster, resource,
instance) and set
- * intermediatePartitionStateMap accordingly per partition.
- * @param throttleController
- * @param resourceName
- * @param partition
- * @param currentStateOutput
- * @param bestPossiblePartitionStateMap
- * @param partitionsThrottled
- * @param intermediatePartitionStateMap
- * @param rebalanceType
- * @param cache
+ * Check the status for a single message on throttling at every level
(cluster, resource, replica) and set
+ * intermediatePartitionStateMap accordingly for that replica.
+ * @param throttleController throttle controller object for
throttling quota
+ * @param resourceName the resource for throttling check
+ * @param partition the partition for throttling
check
+ * @param messageToThrottle the message to be throttled
+ * @param messagesThrottled the cumulative set of messages
that have been throttled already. These
+ * messages represent the replicas
of this partition that have been throttled.
+ * @param intermediatePartitionStateMap the cumulative partition-state
mapping as a result of the throttling step
+ * of IntermediateStateCalcStage
+ * @param rebalanceType the rebalance type to charge
quota
+ * @param cache cached cluster metadata required
by the throttle controller
*/
- private void throttleStateTransitionsForPartition(
- StateTransitionThrottleController throttleController, String
resourceName,
- Partition partition, CurrentStateOutput currentStateOutput,
- PartitionStateMap bestPossiblePartitionStateMap, Set<Partition>
partitionsThrottled,
+ private void
throttleStateTransitionsForReplica(StateTransitionThrottleController
throttleController,
+ String resourceName, Partition partition, Message messageToThrottle,
Set<Message> messagesThrottled,
PartitionStateMap intermediatePartitionStateMap, RebalanceType
rebalanceType,
ResourceControllerDataProvider cache) {
-
- Map<String, String> currentStateMap =
- currentStateOutput.getCurrentStateMap(resourceName, partition);
- Map<String, String> bestPossibleMap =
bestPossiblePartitionStateMap.getPartitionMap(partition);
- Set<String> allInstances = new HashSet<>(currentStateMap.keySet());
- allInstances.addAll(bestPossibleMap.keySet());
- Map<String, String> intermediateMap = new HashMap<>();
-
boolean hasReachedThrottlingLimit = false;
if (throttleController.shouldThrottleForResource(rebalanceType,
resourceName)) {
hasReachedThrottlingLimit = true;
if (logger.isDebugEnabled()) {
- LogUtil.logDebug(logger, _eventId,
- String.format("Throttled on partition: %s in resource: %s",
- partition.getPartitionName(), resourceName));
+ LogUtil.logDebug(logger, _eventId, String.format(
+ "Throttled because of cluster/resource quota is full for message
{%s} on partition {%s} in resource {%s}",
+ messageToThrottle.getId(), partition.getPartitionName(),
resourceName));
}
} else {
- // throttle if any of the instances are not able to accept state
transitions
- for (String instance : allInstances) {
- String currentState = currentStateMap.get(instance);
- String bestPossibleState = bestPossibleMap.get(instance);
- if (bestPossibleState != null &&
!bestPossibleState.equals(currentState)
- && !cache.getDisabledInstancesForPartition(resourceName,
partition.getPartitionName())
- .contains(instance)) {
- if (throttleController.shouldThrottleForInstance(rebalanceType,
instance)) {
- hasReachedThrottlingLimit = true;
- if (logger.isDebugEnabled()) {
- LogUtil.logDebug(logger, _eventId,
- String.format(
- "Throttled because of instance: %s for partition: %s in
resource: %s",
- instance, partition.getPartitionName(), resourceName));
- }
- break;
+ // Since message already generated, we can assume the current state is
not null and target state is not null
+ if (!cache.getDisabledInstancesForPartition(resourceName,
partition.getPartitionName())
+ .contains(messageToThrottle.getTgtName())) {
+ if (throttleController.shouldThrottleForInstance(rebalanceType,
messageToThrottle.getTgtName())) {
+ hasReachedThrottlingLimit = true;
+ if (logger.isDebugEnabled()) {
+ LogUtil.logDebug(logger, _eventId, String.format(
+ "Throttled because of instance level quota is full on instance
{%s} for message {%s} of partition {%s} in resource {%s}",
+ messageToThrottle.getId(), messageToThrottle.getTgtName(),
partition.getPartitionName(), resourceName));
}
}
}
}
+ // If there is still room for this replica, proceed to charge at the
cluster and resource level and set the
+ // intermediate partition-state mapping so that the state transition
message can move forward.
if (!hasReachedThrottlingLimit) {
- // This implies that there is room for more state transitions.
- // Find instances with a replica whose current state is different from
BestPossibleState and
- // "charge" for it, and bestPossibleStates will become intermediate
states
- intermediateMap.putAll(bestPossibleMap);
- boolean shouldChargeForPartition = false;
- for (String instance : allInstances) {
- String currentState = currentStateMap.get(instance);
- String bestPossibleState = bestPossibleMap.get(instance);
- if (bestPossibleState != null &&
!bestPossibleState.equals(currentState)
- && !cache.getDisabledInstancesForPartition(resourceName,
partition.getPartitionName())
- .contains(instance)) {
- throttleController.chargeInstance(rebalanceType, instance);
- shouldChargeForPartition = true;
- }
- }
- if (shouldChargeForPartition) {
- throttleController.chargeCluster(rebalanceType);
- throttleController.chargeResource(rebalanceType, resourceName);
- }
+ throttleController.chargeCluster(rebalanceType);
+ throttleController.chargeResource(rebalanceType, resourceName);
+ intermediatePartitionStateMap.setState(partition,
messageToThrottle.getTgtName(), messageToThrottle.getToState());
} else {
- // No more room for more state transitions; current states will just
become intermediate
- // states unless the partition is disabled
- // Add this partition to a set of throttled partitions
- for (String instance : allInstances) {
- String currentState = currentStateMap.get(instance);
- String bestPossibleState = bestPossibleMap.get(instance);
- if (bestPossibleState != null &&
!bestPossibleState.equals(currentState)
- && cache.getDisabledInstancesForPartition(resourceName,
partition.getPartitionName())
- .contains(instance)) {
- // Because this partition is disabled, we allow assignment
- intermediateMap.put(instance, bestPossibleState);
- } else {
- // This partition is not disabled, so it must be throttled by just
passing on the current
- // state
- if (currentState != null) {
- intermediateMap.put(instance, currentState);
- }
- partitionsThrottled.add(partition);
- }
- }
+ // Intermediate Map is based on current state
+ messagesThrottled.add(messageToThrottle);
}
- intermediatePartitionStateMap.setState(partition, intermediateMap);
}
/**