This is an automated email from the ASF dual-hosted git repository.
xyuanlu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new fcbd7822a Code cleanup and improvement in intermediate state calc
stage related (#2456)
fcbd7822a is described below
commit fcbd7822a468b0a639d9c863de4bca71853296cf
Author: Qi (Quincy) Qu <[email protected]>
AuthorDate: Fri Apr 28 17:08:50 2023 -0400
Code cleanup and improvement in intermediate state calc stage related
(#2456)
---
.../stages/IntermediateStateCalcStage.java | 73 ++++++++--------------
.../stages/StateTransitionThrottleController.java | 54 ++++++----------
2 files changed, 45 insertions(+), 82 deletions(-)
diff --git
a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
index dbd433b3e..00c49ad91 100644
---
a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
+++
b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
@@ -24,7 +24,6 @@ import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -66,7 +65,6 @@ public class IntermediateStateCalcStage extends
AbstractBaseStage {
public void process(ClusterEvent event) throws Exception {
_eventId = event.getEventId();
CurrentStateOutput currentStateOutput =
event.getAttribute(AttributeName.CURRENT_STATE.name());
-
BestPossibleStateOutput bestPossibleStateOutput =
event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.name());
Map<String, Resource> resourceToRebalance =
@@ -124,7 +122,7 @@ public class IntermediateStateCalcStage extends
AbstractBaseStage {
// priority.
List<ResourcePriority> prioritizedResourceList = new ArrayList<>();
for (String resourceName : resourceMap.keySet()) {
- prioritizedResourceList.add(new ResourcePriority(resourceName,
Integer.MIN_VALUE));
+ prioritizedResourceList.add(new ResourcePriority(resourceName));
}
// If resourcePriorityField is null at the cluster level, all resources
will be considered equal
// in priority by keeping all priorities at MIN_VALUE
@@ -147,7 +145,7 @@ public class IntermediateStateCalcStage extends
AbstractBaseStage {
dataCache.getIdealState(resourceName).getRecord().getSimpleField(priorityField));
}
}
- prioritizedResourceList.sort(new ResourcePriorityComparator());
+ Collections.sort(prioritizedResourceList);
}
ClusterStatusMonitor clusterStatusMonitor =
@@ -315,7 +313,7 @@ public class IntermediateStateCalcStage extends
AbstractBaseStage {
String stateModelDefName = idealState.getStateModelDefRef();
StateModelDefinition stateModelDef =
cache.getStateModelDef(stateModelDefName);
- // This require a deep copy of current state map because some of the
states will be overwritten by applying
+ // This requires a deep copy of current state map because some of the
states will be overwritten by applying
// messages to it.
Set<Partition> partitionsWithErrorStateReplica = new HashSet<>();
@@ -357,9 +355,8 @@ public class IntermediateStateCalcStage extends
AbstractBaseStage {
// Sort partitions in case of urgent partition need to take the quota
first.
List<Partition> partitions = new ArrayList<>(resource.getPartitions());
- Collections.sort(partitions,
- new
PartitionPriorityComparator(bestPossiblePartitionStateMap.getStateMap(),
- currentStateOutput.getCurrentStateMap(resourceName),
stateModelDef.getTopState()));
+ partitions.sort(new
PartitionPriorityComparator(bestPossiblePartitionStateMap.getStateMap(),
+ currentStateOutput.getCurrentStateMap(resourceName),
stateModelDef.getTopState()));
for (Partition partition : partitions) {
if (resourceMessageMap.get(partition) == null ||
resourceMessageMap.get(partition)
.isEmpty()) {
@@ -368,11 +365,10 @@ public class IntermediateStateCalcStage extends
AbstractBaseStage {
List<Message> messagesToThrottle = new
ArrayList<>(resourceMessageMap.get(partition));
Map<String, String> derivedCurrentStateMap =
currentStateOutput.getCurrentStateMap(resourceName,
partition).entrySet().stream()
- .collect(Collectors.toMap(entry -> entry.getKey(), entry ->
entry.getValue()));
+ .collect(Collectors.toMap(Map.Entry::getKey,
Map.Entry::getValue));
List<String> preferenceList =
preferenceLists.get(partition.getPartitionName());
Map<String, Integer> requiredState = getRequiredStates(resourceName,
cache, preferenceList);
- Collections.sort(messagesToThrottle,
- new MessagePriorityComparator(preferenceList,
stateModelDef.getStatePriorityMap()));
+ messagesToThrottle.sort(new MessagePriorityComparator(preferenceList,
stateModelDef.getStatePriorityMap()));
for (Message message : messagesToThrottle) {
RebalanceType rebalanceType =
getRebalanceTypePerMessage(requiredState, message,
derivedCurrentStateMap);
@@ -454,13 +450,9 @@ public class IntermediateStateCalcStage extends
AbstractBaseStage {
// Note that lower integer value implies higher priority
// If the state is not found in statePriorityMap, consider it not strictly
downward by
// default because we can't determine whether it is downward
- if (statePriorityMap.containsKey(message.getFromState()) &&
statePriorityMap
- .containsKey(message.getToState())
- && statePriorityMap.get(message.getFromState()) < statePriorityMap
- .get(message.getToState())) {
- return true;
- }
- return false;
+ return statePriorityMap.containsKey(message.getFromState())
+ && statePriorityMap.containsKey(message.getToState())
+ && statePriorityMap.get(message.getFromState()) <
statePriorityMap.get(message.getToState());
}
/**
@@ -483,9 +475,8 @@ public class IntermediateStateCalcStage extends
AbstractBaseStage {
// Maps instance to its pending (next) state
List<Message> pendingMessages = new ArrayList<>(
currentStateOutput.getPendingMessageMap(resourceName,
partition).values());
- Collections.sort(pendingMessages,
- new
MessagePriorityComparator(preferenceLists.get(partition.getPartitionName()),
- stateModelDefinition.getStatePriorityMap()));
+ pendingMessages.sort(new
MessagePriorityComparator(preferenceLists.get(partition.getPartitionName()),
+ stateModelDefinition.getStatePriorityMap()));
for (Message message : pendingMessages) {
StateTransitionThrottleConfig.RebalanceType rebalanceType =
@@ -657,12 +648,10 @@ public class IntermediateStateCalcStage extends
AbstractBaseStage {
// Generate a state mapping, state -> required numbers based on the live
and enabled instances for this partition
// preference list
- LinkedHashMap<String, Integer> expectedStateCountMap =
stateModelDefinition.getStateCountMap(
+ return stateModelDefinition.getStateCountMap(
(int) preferenceList.stream()
.filter(i ->
resourceControllerDataProvider.getEnabledLiveInstances().contains(i))
.count(), requiredNumReplica); // StateModelDefinition's counts
-
- return expectedStateCountMap;
}
/**
@@ -709,17 +698,17 @@ public class IntermediateStateCalcStage extends
AbstractBaseStage {
/**
* POJO that maps resource name to its priority represented by an integer.
*/
- private static class ResourcePriority {
- private String _resourceName;
- private int _priority;
+ private static class ResourcePriority implements
Comparable<ResourcePriority> {
+ private final String _resourceName;
+ private int _priority = Integer.MIN_VALUE;
- ResourcePriority(String resourceName, Integer priority) {
+ ResourcePriority(String resourceName) {
_resourceName = resourceName;
- _priority = priority;
}
+ @Override
public int compareTo(ResourcePriority resourcePriority) {
- return Integer.compare(_priority, resourcePriority._priority);
+ return Integer.compare(resourcePriority._priority, _priority);
}
public String getResourceName() {
@@ -736,16 +725,9 @@ public class IntermediateStateCalcStage extends
AbstractBaseStage {
}
}
- private static class ResourcePriorityComparator implements
Comparator<ResourcePriority> {
- @Override
- public int compare(ResourcePriority priority1, ResourcePriority priority2)
{
- return priority2.compareTo(priority1);
- }
- }
-
- private class MessagePriorityComparator implements Comparator<Message> {
- private Map<String, Integer> _preferenceInstanceMap;
- private Map<String, Integer> _statePriorityMap;
+ private static class MessagePriorityComparator implements
Comparator<Message> {
+ private final Map<String, Integer> _preferenceInstanceMap;
+ private final Map<String, Integer> _statePriorityMap;
MessagePriorityComparator(List<String> preferenceList, Map<String,
Integer> statePriorityMap) {
// Get instance -> priority map.
@@ -776,10 +758,10 @@ public class IntermediateStateCalcStage extends
AbstractBaseStage {
// Compare partitions according following standard:
// 1) Partition without top state always is the highest priority.
// 2) For partition with top-state, the more number of active replica it
has, the less priority.
- private class PartitionPriorityComparator implements Comparator<Partition> {
- private Map<Partition, Map<String, String>> _bestPossibleMap;
- private Map<Partition, Map<String, String>> _currentStateMap;
- private String _topState;
+ private static class PartitionPriorityComparator implements
Comparator<Partition> {
+ private final Map<Partition, Map<String, String>> _bestPossibleMap;
+ private final Map<Partition, Map<String, String>> _currentStateMap;
+ private final String _topState;
PartitionPriorityComparator(Map<Partition, Map<String, String>>
bestPossibleMap,
Map<Partition, Map<String, String>> currentStateMap, String topState) {
@@ -813,8 +795,7 @@ public class IntermediateStateCalcStage extends
AbstractBaseStage {
private int getMissTopStateIndex(Partition partition) {
// 0 if no replicas in top-state, 1 if it has at least one replica in
top-state.
- if (!_currentStateMap.containsKey(partition) ||
!_currentStateMap.get(partition).values()
- .contains(_topState)) {
+ if (!_currentStateMap.containsKey(partition) ||
!_currentStateMap.get(partition).containsValue(_topState)) {
return 0;
}
return 1;
diff --git
a/helix-core/src/main/java/org/apache/helix/controller/stages/StateTransitionThrottleController.java
b/helix-core/src/main/java/org/apache/helix/controller/stages/StateTransitionThrottleController.java
index 4d110afdb..2b3ad6d6b 100644
---
a/helix-core/src/main/java/org/apache/helix/controller/stages/StateTransitionThrottleController.java
+++
b/helix-core/src/main/java/org/apache/helix/controller/stages/StateTransitionThrottleController.java
@@ -35,7 +35,7 @@ import org.slf4j.LoggerFactory;
* off.
*/
class StateTransitionThrottleController {
- private static Logger logger =
LoggerFactory.getLogger(StateTransitionThrottleController.class);
+ private static final Logger logger =
LoggerFactory.getLogger(StateTransitionThrottleController.class);
// pending allowed transition counts in the cluster level for recovery and
load balance
Map<StateTransitionThrottleConfig.RebalanceType, Long>
_pendingTransitionAllowedInCluster;
@@ -75,23 +75,15 @@ class StateTransitionThrottleController {
break;
case RESOURCE:
for (String resource : resources) {
- if (!_pendingTransitionAllowedPerResource.containsKey(resource)) {
- _pendingTransitionAllowedPerResource.put(resource,
- new HashMap<StateTransitionThrottleConfig.RebalanceType,
Long>());
- }
-
_pendingTransitionAllowedPerResource.get(resource).put(config.getRebalanceType(),
- config.getMaxPartitionInTransition());
+ _pendingTransitionAllowedPerResource.computeIfAbsent(resource, k ->
new HashMap<>())
+ .put(config.getRebalanceType(),
config.getMaxPartitionInTransition());
}
_throttleEnabled = true;
break;
case INSTANCE:
for (String instance : liveInstances) {
- if (!_pendingTransitionAllowedPerInstance.containsKey(instance)) {
- _pendingTransitionAllowedPerInstance.put(instance,
- new HashMap<StateTransitionThrottleConfig.RebalanceType,
Long>());
- }
-
_pendingTransitionAllowedPerInstance.get(instance).put(config.getRebalanceType(),
- config.getMaxPartitionInTransition());
+ _pendingTransitionAllowedPerInstance.computeIfAbsent(instance, k ->
new HashMap<>())
+ .put(config.getRebalanceType(),
config.getMaxPartitionInTransition());
}
_throttleEnabled = true;
break;
@@ -132,18 +124,7 @@ class StateTransitionThrottleController {
*/
protected boolean shouldThrottleForResource(
StateTransitionThrottleConfig.RebalanceType rebalanceType, String
resourceName) {
- if (shouldThrottleForCluster(rebalanceType)) {
- return true;
- }
- Long resourceThrottle;
- if (_pendingTransitionAllowedPerResource.containsKey(resourceName)) {
- resourceThrottle =
_pendingTransitionAllowedPerResource.get(resourceName).get(rebalanceType);
- if
(shouldThrottleForANYType(_pendingTransitionAllowedPerResource.get(resourceName))
- || (resourceThrottle != null && resourceThrottle <= 0)) {
- return true;
- }
- }
- return false;
+ return shouldThrottleForGivenMap(rebalanceType, resourceName,
_pendingTransitionAllowedPerResource);
}
/**
@@ -155,16 +136,20 @@ class StateTransitionThrottleController {
*/
protected boolean shouldThrottleForInstance(
StateTransitionThrottleConfig.RebalanceType rebalanceType, String
instanceName) {
+ return shouldThrottleForGivenMap(rebalanceType, instanceName,
_pendingTransitionAllowedPerInstance);
+ }
+
+ private boolean
shouldThrottleForGivenMap(StateTransitionThrottleConfig.RebalanceType
rebalanceType, String entryName,
+ Map<String, Map<StateTransitionThrottleConfig.RebalanceType, Long>>
transitionAllowedPerEntry) {
if (shouldThrottleForCluster(rebalanceType)) {
return true;
}
Long instanceThrottle;
- if (_pendingTransitionAllowedPerInstance.containsKey(instanceName)) {
- instanceThrottle =
_pendingTransitionAllowedPerInstance.get(instanceName).get(rebalanceType);
- if
(shouldThrottleForANYType(_pendingTransitionAllowedPerInstance.get(instanceName))
- || (instanceThrottle != null && instanceThrottle <= 0)) {
- return true;
- }
+ if (transitionAllowedPerEntry.containsKey(entryName)) {
+ Map<StateTransitionThrottleConfig.RebalanceType, Long> throttleConfigMap
=
+ transitionAllowedPerEntry.get(entryName);
+ instanceThrottle = throttleConfigMap.get(rebalanceType);
+ return (instanceThrottle != null && instanceThrottle <= 0) ||
shouldThrottleForANYType(throttleConfigMap);
}
return false;
}
@@ -223,11 +208,8 @@ class StateTransitionThrottleController {
private boolean shouldThrottleForANYType(
Map<StateTransitionThrottleConfig.RebalanceType, Long>
pendingTransitionAllowed) {
if
(pendingTransitionAllowed.containsKey(StateTransitionThrottleConfig.RebalanceType.ANY))
{
- Long anyTypeThrottle =
-
pendingTransitionAllowed.get(StateTransitionThrottleConfig.RebalanceType.ANY);
- if (anyTypeThrottle != null && anyTypeThrottle <= 0) {
- return true;
- }
+ Long anyTypeThrottle =
pendingTransitionAllowed.get(StateTransitionThrottleConfig.RebalanceType.ANY);
+ return anyTypeThrottle != null && anyTypeThrottle <= 0;
}
return false;
}