Repository: helix Updated Branches: refs/heads/master c97a97508 -> 37f3d4c8d
[HELIX-711] Allow downward state transition during recovery and add recovery threshold Previously, a single partition requiring recovery balance would block all types of load-balance. This commit allows all downward state transitions (load balance) to happen even when recovery balance is happening in the same cycle. As for non-downward state transitions load-balance, a parameter, ErrorOrRecoveryPartitionThresholdForLoadBalance, was added to ClusterConfig. If the number of partitions requiring recovery is lower than the threshold, non-downward load-balance will take place in the same cycle as recovery balance; otherwise, non-downward load-balance will not take place in the same cycle. Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/37f3d4c8 Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/37f3d4c8 Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/37f3d4c8 Branch: refs/heads/master Commit: 37f3d4c8dadd7cadeebad4fb41e2d4b1c38601fa Parents: c97a975 Author: Hunter Lee <[email protected]> Authored: Thu Jun 28 18:06:04 2018 -0700 Committer: Hunter Lee <[email protected]> Committed: Thu Jun 28 18:06:04 2018 -0700 ---------------------------------------------------------------------- .../config/StateTransitionThrottleConfig.java | 4 +- .../stages/IntermediateStateCalcStage.java | 94 +++++-- .../org/apache/helix/model/ClusterConfig.java | 105 +++++--- .../stages/TestIntermediateStateCalcStage.java | 159 ++++++++++-- .../stages/TestRecoveryLoadBalance.java | 199 ++++++++++++++ .../TestRecoveryLoadBalance.MasterSlave.json | 257 +++++++++++++++++++ .../TestRecoveryLoadBalance.OnlineOffline.json | 206 +++++++++++++++ 7 files changed, 952 insertions(+), 72 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/37f3d4c8/helix-core/src/main/java/org/apache/helix/api/config/StateTransitionThrottleConfig.java 
---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/api/config/StateTransitionThrottleConfig.java b/helix-core/src/main/java/org/apache/helix/api/config/StateTransitionThrottleConfig.java index 37662aa..548ae31 100644 --- a/helix-core/src/main/java/org/apache/helix/api/config/StateTransitionThrottleConfig.java +++ b/helix-core/src/main/java/org/apache/helix/api/config/StateTransitionThrottleConfig.java @@ -48,9 +48,9 @@ public class StateTransitionThrottleConfig { } public enum RebalanceType { - LOAD_BALANCE, + LOAD_BALANCE, // A rebalance type for load balance excluding dropping a replica RECOVERY_BALANCE, - ANY, // A type used for general throttling (to account for all types of rebalance) + ANY, // A rebalance type used for general throttling (to account for all types of rebalance) NONE } http://git-wip-us.apache.org/repos/asf/helix/blob/37f3d4c8/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java index ba5a29a..e70e420 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java @@ -53,8 +53,8 @@ public class IntermediateStateCalcStage extends AbstractBaseStage { if (currentStateOutput == null || bestPossibleStateOutput == null || resourceMap == null || cache == null) { throw new StageException(String.format("Missing attributes in event: %s. 
" - + "Requires CURRENT_STATE (%s) |BEST_POSSIBLE_STATE (%s) |RESOURCES (%s) |DataCache (%s)", - event, currentStateOutput, bestPossibleStateOutput, resourceMap, cache)); + + "Requires CURRENT_STATE (%s) |BEST_POSSIBLE_STATE (%s) |RESOURCES (%s) |DataCache (%s)", event, + currentStateOutput, bestPossibleStateOutput, resourceMap, cache)); } IntermediateStateOutput intermediateStateOutput = @@ -273,9 +273,9 @@ public class IntermediateStateCalcStage extends AbstractBaseStage { partitionsNeedRecovery.add(partition); isRebalanceNeeded = true; } + } else if (rebalanceType.equals(RebalanceType.LOAD_BALANCE)) { // Number of states required by StateModelDefinition are satisfied, but to achieve // BestPossibleState, need load balance - } else if (rebalanceType.equals(RebalanceType.LOAD_BALANCE)) { partitionsNeedLoadBalance.add(partition); isRebalanceNeeded = true; } @@ -308,28 +308,48 @@ public class IntermediateStateCalcStage extends AbstractBaseStage { intermediatePartitionStateMap, partitionsNeedRecovery, currentStateOutput, cache.getStateModelDef(resource.getStateModelDefRef()).getTopState()); - // Perform load balance + // Perform load balance upon checking conditions below Set<Partition> loadbalanceThrottledPartitions = partitionsNeedLoadBalance; + ClusterConfig clusterConfig = cache.getClusterConfig(); + + // If the threshold (ErrorOrRecovery) is set, then use it, if not, then check if the old + // threshold (Error) is set. If the old threshold is set, use it. If not, use the default value + // for the new one. 
This is for backward-compatibility + int threshold = 1; // Default threshold for ErrorOrRecoveryPartitionThresholdForLoadBalance + int partitionCount = partitionsWithErrorStateReplica.size(); + if (clusterConfig.getErrorOrRecoveryPartitionThresholdForLoadBalance() != -1) { + // ErrorOrRecovery is set + threshold = clusterConfig.getErrorOrRecoveryPartitionThresholdForLoadBalance(); + partitionCount += partitionsNeedRecovery.size(); // Only add this count when the threshold is set + } else { + if (clusterConfig.getErrorPartitionThresholdForLoadBalance() != 0) { + // 0 is the default value so the old threshold has been set + threshold = clusterConfig.getErrorPartitionThresholdForLoadBalance(); + } + } - long maxAllowedErrorPartitions = - cache.getClusterConfig().getErrorPartitionThresholdForLoadBalance(); - - // TODO: Logic here needs change - when there is an error partition, load should still happen - - // Perform load balance only if - // 1. no recovery operation to be scheduled - // 2. 
error partition count is less than configured limitation - if (partitionsNeedRecovery.isEmpty() && (maxAllowedErrorPartitions < 0 - || partitionsWithErrorStateReplica.size() <= maxAllowedErrorPartitions)) { + // Perform load balance only if the number of partitions in recovery and in error is less than + // the threshold + if (partitionCount < threshold) { loadbalanceThrottledPartitions = loadRebalance(resource, currentStateOutput, bestPossiblePartitionStateMap, throttleController, intermediatePartitionStateMap, partitionsNeedLoadBalance, currentStateOutput.getCurrentStateMap(resourceName)); } else { - // Skip load balance and current states just become intermediate states + // Only allow dropping of replicas to happen (dropping does NOT need to be throttled) and skip + // load balance for this cycle for (Partition partition : partitionsNeedLoadBalance) { Map<String, String> currentStateMap = currentStateOutput.getCurrentStateMap(resourceName, partition); + Map<String, String> bestPossibleMap = + bestPossiblePartitionStateMap.getPartitionMap(partition); + // Skip load balance by passing current state to intermediate state intermediatePartitionStateMap.setState(partition, currentStateMap); + + // Check if this partition only has downward state transitions; if so, allow state + // transitions by setting it at bestPossibleState + if (isLoadBalanceDownwardForAllReplicas(currentStateMap, bestPossibleMap, stateModelDef)) { + intermediatePartitionStateMap.setState(partition, bestPossibleMap); + } } } @@ -351,6 +371,38 @@ public class IntermediateStateCalcStage extends AbstractBaseStage { } /** + * Check for a partition, whether all transitions for its replicas are downward transitions. Note + * that this function does NOT check for ERROR states. 
+ * @param currentStateMap + * @param bestPossibleMap + * @param stateModelDef + * @return true if no replica needs an upward transition; false otherwise + */ + private boolean isLoadBalanceDownwardForAllReplicas(Map<String, String> currentStateMap, + Map<String, String> bestPossibleMap, StateModelDefinition stateModelDef) { + Set<String> allInstances = new HashSet<>(); + allInstances.addAll(currentStateMap.keySet()); + allInstances.addAll(bestPossibleMap.keySet()); + Map<String, Integer> statePriorityMap = stateModelDef.getStatePriorityMap(); + + for (String instance : allInstances) { + String currentState = currentStateMap.get(instance); + String bestPossibleState = bestPossibleMap.get(instance); + if (currentState == null) { + return false; // null -> state is upward + } + if (bestPossibleState != null) { + // Compare priority values and return false if an upward transition is found + // Note that lower integer value implies higher priority + if (statePriorityMap.get(currentState) > statePriorityMap.get(bestPossibleState)) { + return false; + } + } + } + return true; + } + + /** * Check and charge all pending transitions for throttling. 
*/ private void chargePendingTransition(Resource resource, CurrentStateOutput currentStateOutput, @@ -437,7 +489,8 @@ public class IntermediateStateCalcStage extends AbstractBaseStage { intermediatePartitionStateMap, RebalanceType.RECOVERY_BALANCE); } logger.info(String.format( - "For resource %s: Num of partitions needing recovery: %d, Num of partitions needing recovery but throttled (not recovered): %d", + "For resource %s: Num of partitions needing recovery: %d, Num of partitions needing recovery" + + " but throttled (not recovered): %d", resourceName, partitionsNeedRecovery.size(), partitionRecoveryBalanceThrottled.size())); return partitionRecoveryBalanceThrottled; } @@ -453,14 +506,14 @@ public class IntermediateStateCalcStage extends AbstractBaseStage { * @param throttleController * @param intermediatePartitionStateMap * @param partitionsNeedLoadbalance - * @param currentStateMaps + * @param currentStateMap * @return a set of partitions that need to be load-balanced but did not due to throttling */ private Set<Partition> loadRebalance(Resource resource, CurrentStateOutput currentStateOutput, PartitionStateMap bestPossiblePartitionStateMap, StateTransitionThrottleController throttleController, PartitionStateMap intermediatePartitionStateMap, Set<Partition> partitionsNeedLoadbalance, - Map<Partition, Map<String, String>> currentStateMaps) { + Map<Partition, Map<String, String>> currentStateMap) { String resourceName = resource.getResourceName(); Set<Partition> partitionsLoadbalanceThrottled = new HashSet<>(); @@ -478,7 +531,7 @@ public class IntermediateStateCalcStage extends AbstractBaseStage { } }); Collections.sort(partitionsNeedLoadRebalancePrioritized, new PartitionPriorityComparator( - bestPossiblePartitionStateMap.getStateMap(), currentStateMaps, "", false)); + bestPossiblePartitionStateMap.getStateMap(), currentStateMap, "", false)); for (Partition partition : partitionsNeedLoadRebalancePrioritized) { 
throttleStateTransitionsForPartition(throttleController, resourceName, partition, currentStateOutput, bestPossiblePartitionStateMap, partitionsLoadbalanceThrottled, @@ -592,7 +645,6 @@ public class IntermediateStateCalcStage extends AbstractBaseStage { stateModelDef.getStateCountMap(activeList.size(), replica); // StateModelDefinition's counts Map<String, Integer> currentStateCounts = StateModelDefinition.getStateCounts(currentStateMap); // Current // counts - // Go through each state and compare counts for (String state : expectedStateCountMap.keySet()) { Integer expectedCount = expectedStateCountMap.get(state); @@ -643,7 +695,7 @@ public class IntermediateStateCalcStage extends AbstractBaseStage { if (logger.isDebugEnabled()) { logger.debug("Partitions need recovery: {}\nPartitions get throttled on recovery: {}", recoveryPartitions, recoveryThrottledPartitions); - logger.debug("Partitions need loadbalance: {}\nPartitions get throttled on load-balance: ", + logger.debug("Partitions need loadbalance: {}\nPartitions get throttled on load-balance: {}", loadbalancePartitions, loadbalanceThrottledPartitions); } http://git-wip-us.apache.org/repos/asf/helix/blob/37f3d4c8/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java index ce60888..704ff5d 100644 --- a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java +++ b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java @@ -38,20 +38,22 @@ import org.apache.helix.api.config.StateTransitionTimeoutConfig; public class ClusterConfig extends HelixProperty { /** * Configurable characteristics of a cluster. - * - * NOTE: Do NOT use this field name directly, use its corresponding getter/setter in the ClusterConfig. 
+ * NOTE: Do NOT use this field name directly, use its corresponding getter/setter in the + * ClusterConfig. */ public enum ClusterConfigProperty { HELIX_DISABLE_PIPELINE_TRIGGERS, PERSIST_BEST_POSSIBLE_ASSIGNMENT, PERSIST_INTERMEDIATE_ASSIGNMENT, - TOPOLOGY, // cluster topology definition, for example, "/zone/rack/host/instance" - FAULT_ZONE_TYPE, // the type in which isolation should be applied on when Helix places the replicas from same partition. + TOPOLOGY, // cluster topology definition, for example, "/zone/rack/host/instance" + FAULT_ZONE_TYPE, // the type in which isolation should be applied on when Helix places the + // replicas from same partition. TOPOLOGY_AWARE_ENABLED, // whether topology aware rebalance is enabled. @Deprecated - DELAY_REBALANCE_DISABLED, // disabled the delayed rebalaning in case node goes offline. - DELAY_REBALANCE_ENABLED, // whether the delayed rebalaning is enabled. - DELAY_REBALANCE_TIME, // delayed time in ms that the delay time Helix should hold until rebalancing. + DELAY_REBALANCE_DISABLED, // disabled the delayed rebalaning in case node goes offline. + DELAY_REBALANCE_ENABLED, // whether the delayed rebalaning is enabled. + DELAY_REBALANCE_TIME, // delayed time in ms that the delay time Helix should hold until + // rebalancing. 
STATE_TRANSITION_THROTTLE_CONFIGS, STATE_TRANSITION_CANCELLATION_ENABLED, MISS_TOP_STATE_DURATION_THRESHOLD, @@ -61,16 +63,34 @@ public class ClusterConfig extends HelixProperty { MAX_PARTITIONS_PER_INSTANCE, MAX_OFFLINE_INSTANCES_ALLOWED, TARGET_EXTERNALVIEW_ENABLED, - ERROR_PARTITION_THRESHOLD_FOR_LOAD_BALANCE, // Controller won't execute load balance state transition if the number of partitons that need recovery exceeds this limitation - DISABLED_INSTANCES + @Deprecated // ERROR_OR_RECOVERY_PARTITION_THRESHOLD_FOR_LOAD_BALANCE will take + // precedence if it is set + ERROR_PARTITION_THRESHOLD_FOR_LOAD_BALANCE, // Controller won't execute load balance state + // transition if the number of partitions that need + // recovery exceeds this limitation + ERROR_OR_RECOVERY_PARTITION_THRESHOLD_FOR_LOAD_BALANCE, // Controller won't execute load balance + // state transition if the number of + // partitions that need recovery or are in + // error exceeds this limitation + DISABLED_INSTANCES, + VIEW_CLUSTER, // Set to "true" to indicate this is a view cluster + VIEW_CLUSTER_SOURCES, // Map field, key is the name of source cluster, value is + // ViewClusterSourceConfig JSON string + VIEW_CLUSTER_REFRESH_PERIOD, // In seconds } + private final static int DEFAULT_MAX_CONCURRENT_TASK_PER_INSTANCE = 40; - private final static int DEFAULT_ERROR_PARTITION_THRESHOLD_FOR_LOAD_BALANCE = 0; // By default, no load balance if any error partition - private final static String IDEAL_STATE_RULE_PREFIX = "IdealStateRule!"; + // By default, no load balance if any error partition + @Deprecated + private final static int DEFAULT_ERROR_PARTITION_THRESHOLD_FOR_LOAD_BALANCE = 0; + // By default, no load balance if any error or recovery partition. 
-1 implies that the threshold + // is not set and will be given a default value of 1 + private final static int DEFAULT_ERROR_OR_RECOVERY_PARTITION_THRESHOLD_FOR_LOAD_BALANCE = -1; + private static final String IDEAL_STATE_RULE_PREFIX = "IdealStateRule!"; + private final static int DEFAULT_VIEW_CLUSTER_REFRESH_PERIOD = 30; /** * Instantiate for a specific cluster - * * @param cluster the cluster identifier */ public ClusterConfig(String cluster) { @@ -79,7 +99,6 @@ public class ClusterConfig extends HelixProperty { /** * Instantiate with a pre-populated record - * * @param record a ZNRecord corresponding to a cluster configuration */ public ClusterConfig(ZNRecord record) { @@ -459,23 +478,52 @@ public class ClusterConfig extends HelixProperty { /** * Get maximum allowed error partitions for a resource to be load balanced. - * If limitation is set to negative number, Helix won't check error partition count before schedule load balance. + * If limitation is set to negative number, Helix won't check error partition count before + * schedule load balance. * @return the maximum allowed error partition count */ public int getErrorPartitionThresholdForLoadBalance() { - return _record.getIntField(ClusterConfigProperty.ERROR_PARTITION_THRESHOLD_FOR_LOAD_BALANCE.name(), + return _record.getIntField( + ClusterConfigProperty.ERROR_PARTITION_THRESHOLD_FOR_LOAD_BALANCE.name(), DEFAULT_ERROR_PARTITION_THRESHOLD_FOR_LOAD_BALANCE); } /** * Set maximum allowed error partitions for a resource to be load balanced. - * If limitation is set to negative number, Helix won't check error partition count before schedule load balance. + * If limitation is set to negative number, Helix won't check error partition count before + * schedule load balance. 
* @param errorPartitionThreshold the maximum allowed error partition count */ public void setErrorPartitionThresholdForLoadBalance(int errorPartitionThreshold) { _record.setIntField(ClusterConfigProperty.ERROR_PARTITION_THRESHOLD_FOR_LOAD_BALANCE.name(), errorPartitionThreshold); } + + /** + * Get the threshold for the number of partitions needing recovery or in error. Returns -1 when + * the threshold is not set; the controller then falls back to the deprecated error partition + * threshold if that is set, or to 1 otherwise. If the number of such partitions reaches the threshold, + * only downward load balance transitions will happen during this cycle. + * @return the threshold + */ + public int getErrorOrRecoveryPartitionThresholdForLoadBalance() { + return _record.getIntField( + ClusterConfigProperty.ERROR_OR_RECOVERY_PARTITION_THRESHOLD_FOR_LOAD_BALANCE.name(), + DEFAULT_ERROR_OR_RECOVERY_PARTITION_THRESHOLD_FOR_LOAD_BALANCE); + } + + /** + * Set the threshold for the number of partitions needing recovery or in error. If never set, the + * stored value is -1 and the controller falls back to the deprecated error partition threshold if + * that is set, or to 1 otherwise. If the number of such partitions reaches the threshold, + * only downward load balance transitions will happen during this cycle. 
+ * @param recoveryPartitionThreshold + */ + public void setErrorOrRecoveryPartitionThresholdForLoadBalance(int recoveryPartitionThreshold) { + _record.setIntField(ClusterConfigProperty.ERROR_OR_RECOVERY_PARTITION_THRESHOLD_FOR_LOAD_BALANCE.name(), + recoveryPartitionThreshold); + } + /** * Set the disabled instance list * @param disabledInstances @@ -522,20 +570,20 @@ public class ClusterConfig extends HelixProperty { public Map<String, Map<String, String>> getIdealStateRules() { Map<String, Map<String, String>> idealStateRuleMap = new HashMap<>(); - for (String simpleKey : getRecord().getSimpleFields().keySet()) { - if (simpleKey.startsWith(IDEAL_STATE_RULE_PREFIX)) { - String simpleValue = getRecord().getSimpleField(simpleKey); - String[] rules = simpleValue.split("(?<!\\\\),"); - Map<String, String> singleRule = Maps.newHashMap(); - for (String rule : rules) { - String[] keyValue = rule.split("(?<!\\\\)="); - if (keyValue.length >= 2) { - singleRule.put(keyValue[0], keyValue[1]); - } + for (String simpleKey : getRecord().getSimpleFields().keySet()) { + if (simpleKey.startsWith(IDEAL_STATE_RULE_PREFIX)) { + String simpleValue = getRecord().getSimpleField(simpleKey); + String[] rules = simpleValue.split("(?<!\\\\),"); + Map<String, String> singleRule = Maps.newHashMap(); + for (String rule : rules) { + String[] keyValue = rule.split("(?<!\\\\)="); + if (keyValue.length >= 2) { + singleRule.put(keyValue[0], keyValue[1]); } - idealStateRuleMap.put(simpleKey, singleRule); } + idealStateRuleMap.put(simpleKey, singleRule); } + } return idealStateRuleMap; } @@ -552,5 +600,4 @@ public class ClusterConfig extends HelixProperty { public String getClusterName() { return _record.getId(); } -} - +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/helix/blob/37f3d4c8/helix-core/src/test/java/org/apache/helix/controller/stages/TestIntermediateStateCalcStage.java ---------------------------------------------------------------------- diff --git 
a/helix-core/src/test/java/org/apache/helix/controller/stages/TestIntermediateStateCalcStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestIntermediateStateCalcStage.java index 816b374..6b8795f 100644 --- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestIntermediateStateCalcStage.java +++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestIntermediateStateCalcStage.java @@ -29,8 +29,10 @@ import org.testng.annotations.Test; import java.util.*; public class TestIntermediateStateCalcStage extends BaseStageTest { + private ClusterConfig _clusterConfig; - @Test public void testNoStateMissing() { + @Test + public void testNoStateMissing() { String resourcePrefix = "resource"; int nResource = 4; int nPartition = 2; @@ -43,12 +45,12 @@ public class TestIntermediateStateCalcStage extends BaseStageTest { preSetup(StateTransitionThrottleConfig.RebalanceType.RECOVERY_BALANCE, resourceSet, nReplica, nReplica); - event.addAttribute(AttributeName.RESOURCES.name(), - getResourceMap(resourceSet.toArray(new String[resourceSet.size()]), nPartition, - "OnlineOffline")); - event.addAttribute(AttributeName.RESOURCES_TO_REBALANCE.name(), - getResourceMap(resourceSet.toArray(new String[resourceSet.size()]), nPartition, - "OnlineOffline")); + event.addAttribute(AttributeName.RESOURCES.name(), getResourceMap( + resourceSet.toArray(new String[resourceSet.size()]), nPartition, "OnlineOffline")); + event.addAttribute(AttributeName.RESOURCES_TO_REBALANCE.name(), getResourceMap( + resourceSet.toArray(new String[resourceSet.size()]), nPartition, "OnlineOffline")); + + // Initialize bestpossible state and current state BestPossibleStateOutput bestPossibleStateOutput = new BestPossibleStateOutput(); @@ -56,11 +58,14 @@ public class TestIntermediateStateCalcStage extends BaseStageTest { IntermediateStateOutput expectedResult = new IntermediateStateOutput(); + _clusterConfig.setErrorOrRecoveryPartitionThresholdForLoadBalance(1); + 
setClusterConfig(_clusterConfig); + for (String resource : resourceSet) { IdealState is = accessor.getProperty(accessor.keyBuilder().idealStates(resource)); setSingleIdealState(is); - Map<String, List<String>> partitionMap = new HashMap<String, List<String>>(); + Map<String, List<String>> partitionMap = new HashMap<>(); for (int p = 0; p < nPartition; p++) { Partition partition = new Partition(resource + "_" + p); for (int r = 0; r < nReplica; r++) { @@ -101,11 +106,50 @@ public class TestIntermediateStateCalcStage extends BaseStageTest { } } else { currentStateOutput.setCurrentState(resource, partition, instanceName, "ONLINE"); - currentStateOutput - .setCurrentState(resource, partition, instanceName + "-1", "OFFLINE"); + currentStateOutput.setCurrentState(resource, partition, instanceName + "-1", + "OFFLINE"); // load balance is throttled, so keep all current states expectedResult.setState(resource, partition, instanceName, "ONLINE"); - expectedResult.setState(resource, partition, instanceName + "-1", "OFFLINE"); + // The following must be removed because now downward state transitions are allowed + // expectedResult.setState(resource, partition, instanceName + "-1", "OFFLINE"); + } + } else if (resource.endsWith("4")) { + // Test that partitions with replicas to drop are dropping them when recovery is + // happening for other partitions + if (p == 0) { + // This partition requires recovery + currentStateOutput.setCurrentState(resource, partition, instanceName, "OFFLINE"); + bestPossibleStateOutput.setState(resource, partition, instanceName, "ONLINE"); + // After recovery, it should be back ONLINE + expectedResult.setState(resource, partition, instanceName, "ONLINE"); + } else { + // Other partitions require dropping of replicas + currentStateOutput.setCurrentState(resource, partition, instanceName, "ONLINE"); + currentStateOutput.setCurrentState(resource, partition, instanceName + "-1", + "OFFLINE"); + // BestPossibleState dictates that we only need one ONLINE 
replica + bestPossibleStateOutput.setState(resource, partition, instanceName, "ONLINE"); + bestPossibleStateOutput.setState(resource, partition, instanceName + "-1", "DROPPED"); + // So instanceName-1 will NOT be expected to show up in expectedResult + expectedResult.setState(resource, partition, instanceName, "ONLINE"); + expectedResult.setState(resource, partition, instanceName + "-1", "DROPPED"); + } + } else if (resource.endsWith("5")) { + // Test that load balance bringing up a new replica does NOT happen with a recovery + // partition + if (p == 0) { + // Set up a partition requiring recovery + currentStateOutput.setCurrentState(resource, partition, instanceName, "OFFLINE"); + bestPossibleStateOutput.setState(resource, partition, instanceName, "ONLINE"); + // After recovery, it should be back ONLINE + expectedResult.setState(resource, partition, instanceName, "ONLINE"); + } else { + currentStateOutput.setCurrentState(resource, partition, instanceName, "ONLINE"); + bestPossibleStateOutput.setState(resource, partition, instanceName, "ONLINE"); + // Check that load balance (bringing up a new node) did not take place + bestPossibleStateOutput.setState(resource, partition, instanceName + "-1", "ONLINE"); + expectedResult.setState(resource, partition, instanceName, "ONLINE"); + } } } @@ -116,17 +160,92 @@ public class TestIntermediateStateCalcStage extends BaseStageTest { event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.name(), bestPossibleStateOutput); event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput); runStage(event, new ReadClusterDataStage()); + runStage(event, new IntermediateStateCalcStage()); + + IntermediateStateOutput output = event.getAttribute(AttributeName.INTERMEDIATE_STATE.name()); + + for (String resource : resourceSet) { + // Note Assert.assertEquals won't work. If "actual" is an empty map, it won't compare + // anything. 
+ Assert.assertTrue(output.getPartitionStateMap(resource).getStateMap() + .equals(expectedResult.getPartitionStateMap(resource).getStateMap())); + } + } + + @Test + public void testWithClusterConfigChange() { + String resourcePrefix = "resource"; + int nResource = 1; + int nPartition = 2; + int nReplica = 3; + + Set<String> resourceSet = new HashSet<>(); + for (int i = 0; i < nResource; i++) { + resourceSet.add(resourcePrefix + "_" + i); + } - // Keep update the current state. - for (int i = 0; i < resourceSet.size(); i++) { - runStage(event, new IntermediateStateCalcStage()); + preSetup(StateTransitionThrottleConfig.RebalanceType.RECOVERY_BALANCE, resourceSet, + nReplica, nReplica); + event.addAttribute(AttributeName.RESOURCES.name(), getResourceMap( + resourceSet.toArray(new String[resourceSet.size()]), nPartition, "OnlineOffline")); + event.addAttribute(AttributeName.RESOURCES_TO_REBALANCE.name(), getResourceMap( + resourceSet.toArray(new String[resourceSet.size()]), nPartition, "OnlineOffline")); + + // Initialize best possible state and current state + BestPossibleStateOutput bestPossibleStateOutput = new BestPossibleStateOutput(); + CurrentStateOutput currentStateOutput = new CurrentStateOutput(); + IntermediateStateOutput expectedResult = new IntermediateStateOutput(); + + for (String resource : resourceSet) { + IdealState is = accessor.getProperty(accessor.keyBuilder().idealStates(resource)); + setSingleIdealState(is); + + Map<String, List<String>> partitionMap = new HashMap<>(); + for (int p = 0; p < nPartition; p++) { + Partition partition = new Partition(resource + "_" + p); + for (int r = 0; r < nReplica; r++) { + String instanceName = HOSTNAME_PREFIX + r; + partitionMap.put(partition.getPartitionName(), Collections.singletonList(instanceName)); + if (resource.endsWith("0")) { + // Test that when the threshold is set at a number greater than the number of error and + // recovery partitions, load balance DOES take place + 
_clusterConfig.setErrorOrRecoveryPartitionThresholdForLoadBalance(Integer.MAX_VALUE); + setClusterConfig(_clusterConfig); + if (p == 0) { + // Set up a partition requiring recovery + currentStateOutput.setCurrentState(resource, partition, instanceName, "OFFLINE"); + bestPossibleStateOutput.setState(resource, partition, instanceName, "ONLINE"); + // After recovery, it should be back ONLINE + expectedResult.setState(resource, partition, instanceName, "ONLINE"); + } else { + // Ensure we have at least one ONLINE replica so that this partition does not need + // recovery + currentStateOutput.setCurrentState(resource, partition, instanceName, "ONLINE"); + bestPossibleStateOutput.setState(resource, partition, instanceName, "ONLINE"); + expectedResult.setState(resource, partition, instanceName, "ONLINE"); + + // This partition to bring up a replica (load balance will happen) + bestPossibleStateOutput.setState(resource, partition, instanceName + "-1", "ONLINE"); + expectedResult.setState(resource, partition, instanceName + "-1", "ONLINE"); + } + } + } + } + bestPossibleStateOutput.setPreferenceLists(resource, partitionMap); } + event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.name(), bestPossibleStateOutput); + event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput); + runStage(event, new ReadClusterDataStage()); + runStage(event, new IntermediateStateCalcStage()); + IntermediateStateOutput output = event.getAttribute(AttributeName.INTERMEDIATE_STATE.name()); + for (String resource : resourceSet) { - // Note Assert.assertEquals won't work. If "actual" is an empty map, it won't compare anything. - Assert.assertTrue(output.getPartitionStateMap(resource).getStateMap().equals( - expectedResult.getPartitionStateMap(resource).getStateMap())); + // Note Assert.assertEquals won't work. If "actual" is an empty map, it won't compare + // anything. 
+ Assert.assertTrue(output.getPartitionStateMap(resource).getStateMap() + .equals(expectedResult.getPartitionStateMap(resource).getStateMap())); } } @@ -138,10 +257,10 @@ public class TestIntermediateStateCalcStage extends BaseStageTest { setupLiveInstances(numOfLiveInstances); // Set up cluster configs - ClusterConfig clusterConfig = accessor.getProperty(accessor.keyBuilder().clusterConfig()); + _clusterConfig = accessor.getProperty(accessor.keyBuilder().clusterConfig()); StateTransitionThrottleConfig throttleConfig = new StateTransitionThrottleConfig(rebalanceType, StateTransitionThrottleConfig.ThrottleScope.CLUSTER, Integer.MAX_VALUE); - clusterConfig.setStateTransitionThrottleConfigs(Collections.singletonList(throttleConfig)); - setClusterConfig(clusterConfig); + _clusterConfig.setStateTransitionThrottleConfigs(Collections.singletonList(throttleConfig)); + setClusterConfig(_clusterConfig); } } http://git-wip-us.apache.org/repos/asf/helix/blob/37f3d4c8/helix-core/src/test/java/org/apache/helix/controller/stages/TestRecoveryLoadBalance.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestRecoveryLoadBalance.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestRecoveryLoadBalance.java new file mode 100644 index 0000000..a95b424 --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestRecoveryLoadBalance.java @@ -0,0 +1,199 @@ +package org.apache.helix.controller.stages; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.helix.api.config.StateTransitionThrottleConfig; +import org.apache.helix.model.ClusterConfig; +import org.apache.helix.model.IdealState; +import org.apache.helix.model.Partition; +import org.codehaus.jackson.map.ObjectMapper; +import org.codehaus.jackson.map.ObjectReader; +import org.testng.Assert; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +public class TestRecoveryLoadBalance extends BaseStageTest { + + private final String INPUT = "inputs"; + private final String CURRENT_STATE = "currentStates"; + private final String BEST_POSSIBLE_STATE = "bestPossibleStates"; + private final String EXPECTED_STATE = "expectedStates"; + private final String ERROR_OR_RECOVERY_PARTITION_THRESHOLD = + "errorOrRecoveryPartitionThresholdForLoadBalance"; + private final String STATE_MODEL = "statemodel"; + private ClusterConfig _clusterConfig; + + @Test(dataProvider = "recoveryLoadBalanceInput") + public void testRecoveryAndLoadBalance(String stateModelDef, + int errorOrRecoveryPartitionThresholdForLoadBalance, + Map<String, Map<String, Map<String, String>>> stateMapping) { + System.out.println("START TestRecoveryLoadBalance at " + new Date(System.currentTimeMillis())); + + String resourcePrefix = "resource"; + int nResource = 1; + int 
nPartition = 2; + int nReplica = 3; + + Set<String> resourceSet = new HashSet<>(); + for (int i = 0; i < nResource; i++) { + resourceSet.add(resourcePrefix + "_" + i); + } + + preSetup(StateTransitionThrottleConfig.RebalanceType.RECOVERY_BALANCE, resourceSet, nReplica, + nReplica, stateModelDef); + + _clusterConfig.setErrorOrRecoveryPartitionThresholdForLoadBalance( + errorOrRecoveryPartitionThresholdForLoadBalance); + setClusterConfig(_clusterConfig); + + event.addAttribute(AttributeName.RESOURCES.name(), getResourceMap( + resourceSet.toArray(new String[resourceSet.size()]), nPartition, stateModelDef)); + event.addAttribute(AttributeName.RESOURCES_TO_REBALANCE.name(), getResourceMap( + resourceSet.toArray(new String[resourceSet.size()]), nPartition, stateModelDef)); + + // Initialize bestpossible state and current state + BestPossibleStateOutput bestPossibleStateOutput = new BestPossibleStateOutput(); + CurrentStateOutput currentStateOutput = new CurrentStateOutput(); + IntermediateStateOutput expectedResult = new IntermediateStateOutput(); + + for (String resource : resourceSet) { + IdealState is = accessor.getProperty(accessor.keyBuilder().idealStates(resource)); + setSingleIdealState(is); + + Map<String, List<String>> partitionMap = new HashMap<>(); + for (int p = 0; p < nPartition; p++) { + Partition partition = new Partition(resource + "_" + p); + + // Set input + for (int r = 0; r < stateMapping.get(partition.toString()).get(CURRENT_STATE).size(); r++) { + String instanceName = HOSTNAME_PREFIX + r; + currentStateOutput.setCurrentState(resource, partition, instanceName, + stateMapping.get(partition.toString()).get(CURRENT_STATE).get(instanceName)); + } + for (int r = 0; r < stateMapping.get(partition.toString()).get(BEST_POSSIBLE_STATE) + .size(); r++) { + String instanceName = HOSTNAME_PREFIX + r; + bestPossibleStateOutput.setState(resource, partition, instanceName, + stateMapping.get(partition.toString()).get(BEST_POSSIBLE_STATE).get(instanceName)); + } + 
for (int r = 0; r < stateMapping.get(partition.toString()).get(EXPECTED_STATE) + .size(); r++) { + String instanceName = HOSTNAME_PREFIX + r; + expectedResult.setState(resource, partition, instanceName, + stateMapping.get(partition.toString()).get(EXPECTED_STATE).get(instanceName)); + } + + // Set partitionMap + for (int r = 0; r < nReplica; r++) { + String instanceName = HOSTNAME_PREFIX + r; + partitionMap.put(partition.getPartitionName(), Collections.singletonList(instanceName)); + } + } + bestPossibleStateOutput.setPreferenceLists(resource, partitionMap); + } + + event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.name(), bestPossibleStateOutput); + event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput); + runStage(event, new ReadClusterDataStage()); + runStage(event, new IntermediateStateCalcStage()); + + IntermediateStateOutput output = event.getAttribute(AttributeName.INTERMEDIATE_STATE.name()); + + for (String resource : resourceSet) { + // For debugging purposes + // Object map1 = output.getPartitionStateMap(resource).getStateMap(); + // Object map2 = expectedResult.getPartitionStateMap(resource).getStateMap(); + + // Note Assert.assertEquals won't work. If "actual" is an empty map, it won't compare + // anything. 
+ Assert.assertTrue(output.getPartitionStateMap(resource).getStateMap() + .equals(expectedResult.getPartitionStateMap(resource).getStateMap())); + } + + System.out.println("END TestRecoveryLoadBalance at " + new Date(System.currentTimeMillis())); + } + + @DataProvider(name = "recoveryLoadBalanceInput") + public Object[][] rebalanceStrategies() { + + try { + List<Object[]> data = new ArrayList<>(); + // Add data + data.addAll(loadTestInputs("TestRecoveryLoadBalance.OnlineOffline.json")); + data.addAll(loadTestInputs("TestRecoveryLoadBalance.MasterSlave.json")); + + Object[][] ret = new Object[data.size()][]; + for (int i = 0; i < data.size(); i++) { + ret[i] = data.get(i); + } + return ret; + } catch (Throwable e) { + return new Object[][] { + {} + }; + } + } + + public List<Object[]> loadTestInputs(String fileName) { + List<Object[]> ret = new ArrayList<>(); + InputStream inputStream = getClass().getClassLoader().getResourceAsStream(fileName); + try { + ObjectReader mapReader = new ObjectMapper().reader(List.class); + List<Map<String, Object>> inputList = mapReader.readValue(inputStream); + for (Map<String, Object> inputMap : inputList) { + String stateModelName = (String) inputMap.get(STATE_MODEL); + int threshold = (int) inputMap.get(ERROR_OR_RECOVERY_PARTITION_THRESHOLD); + Map<String, Map<String, Map<String, String>>> stateMapping = + (Map<String, Map<String, Map<String, String>>>) inputMap.get(INPUT); + ret.add(new Object[] { + stateModelName, threshold, stateMapping + }); + } + } catch (IOException e) { + e.printStackTrace(); + } + return ret; + } + + private void preSetup(StateTransitionThrottleConfig.RebalanceType rebalanceType, + Set<String> resourceSet, int numOfLiveInstances, int numOfReplicas, String stateModelName) { + setupIdealState(numOfLiveInstances, resourceSet.toArray(new String[resourceSet.size()]), + numOfLiveInstances, numOfReplicas, IdealState.RebalanceMode.FULL_AUTO, stateModelName); + setupStateModel(); + 
setupLiveInstances(numOfLiveInstances); + + // Set up cluster configs + _clusterConfig = accessor.getProperty(accessor.keyBuilder().clusterConfig()); + StateTransitionThrottleConfig throttleConfig = new StateTransitionThrottleConfig(rebalanceType, + StateTransitionThrottleConfig.ThrottleScope.CLUSTER, Integer.MAX_VALUE); + _clusterConfig.setStateTransitionThrottleConfigs(Collections.singletonList(throttleConfig)); + setClusterConfig(_clusterConfig); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/helix/blob/37f3d4c8/helix-core/src/test/resources/TestRecoveryLoadBalance.MasterSlave.json ---------------------------------------------------------------------- diff --git a/helix-core/src/test/resources/TestRecoveryLoadBalance.MasterSlave.json b/helix-core/src/test/resources/TestRecoveryLoadBalance.MasterSlave.json new file mode 100644 index 0000000..0e79723 --- /dev/null +++ b/helix-core/src/test/resources/TestRecoveryLoadBalance.MasterSlave.json @@ -0,0 +1,257 @@ +[ + { + "statemodel": "MasterSlave", + "errorOrRecoveryPartitionThresholdForLoadBalance": 1, + "inputs": { + "resource_0_0": { + "currentStates": { + "localhost_0": "MASTER", + "localhost_1": "SLAVE", + "localhost_2": "SLAVE", + "localhost_3": "SLAVE" + }, + "bestPossibleStates": { + "localhost_0": "MASTER", + "localhost_1": "SLAVE", + "localhost_2": "SLAVE", + "localhost_3": "DROPPED" + }, + "expectedStates": { + "localhost_0": "MASTER", + "localhost_1": "SLAVE", + "localhost_2": "SLAVE", + "localhost_3": "DROPPED" + } + }, + "resource_0_1": { + "currentStates": { + "localhost_0": "MASTER", + "localhost_1": "SLAVE", + "localhost_2": "SLAVE" + }, + "bestPossibleStates": { + "localhost_0": "MASTER", + "localhost_1": "SLAVE", + "localhost_2": "SLAVE" + }, + "expectedStates": { + "localhost_0": "MASTER", + "localhost_1": "SLAVE", + "localhost_2": "SLAVE" + } + } + } + }, + { + "statemodel": "MasterSlave", + "errorOrRecoveryPartitionThresholdForLoadBalance": 2, + "inputs": { + 
"resource_0_0": { + "currentStates": { + "localhost_0": "MASTER", + "localhost_1": "ERROR", + "localhost_2": "SLAVE", + "localhost_3": "SLAVE" + }, + "bestPossibleStates": { + "localhost_0": "MASTER", + "localhost_1": "ERROR", + "localhost_2": "SLAVE", + "localhost_3": "DROPPED" + }, + "expectedStates": { + "localhost_0": "MASTER", + "localhost_1": "ERROR", + "localhost_2": "SLAVE", + "localhost_3": "DROPPED" + } + }, + "resource_0_1": { + "currentStates": { + "localhost_0": "MASTER", + "localhost_1": "SLAVE", + "localhost_2": "SLAVE" + }, + "bestPossibleStates": { + "localhost_0": "MASTER", + "localhost_1": "SLAVE", + "localhost_2": "SLAVE" + }, + "expectedStates": { + "localhost_0": "MASTER", + "localhost_1": "SLAVE", + "localhost_2": "SLAVE" + } + } + } + }, + { + "statemodel": "MasterSlave", + "errorOrRecoveryPartitionThresholdForLoadBalance": 1, + "inputs": { + "resource_0_0": { + "currentStates": { + "localhost_0": "MASTER", + "localhost_1": "ERROR", + "localhost_2": "SLAVE", + "localhost_3": "SLAVE" + }, + "bestPossibleStates": { + "localhost_0": "MASTER", + "localhost_1": "ERROR", + "localhost_2": "SLAVE", + "localhost_3": "DROPPED" + }, + "expectedStates": { + "localhost_0": "MASTER", + "localhost_1": "ERROR", + "localhost_2": "SLAVE", + "localhost_3": "DROPPED" + } + }, + "resource_0_1": { + "currentStates": { + "localhost_0": "MASTER", + "localhost_1": "SLAVE", + "localhost_2": "SLAVE" + }, + "bestPossibleStates": { + "localhost_0": "MASTER", + "localhost_1": "SLAVE", + "localhost_2": "SLAVE" + }, + "expectedStates": { + "localhost_0": "MASTER", + "localhost_1": "SLAVE", + "localhost_2": "SLAVE" + } + } + } + }, + { + "statemodel": "MasterSlave", + "errorOrRecoveryPartitionThresholdForLoadBalance": 1, + "inputs": { + "resource_0_0": { + "currentStates": { + "localhost_0": "MASTER", + "localhost_1": "OFFLINE", + "localhost_2": "OFFLINE", + "localhost_3": "SLAVE" + }, + "bestPossibleStates": { + "localhost_0": "MASTER", + "localhost_1": "SLAVE", + 
"localhost_2": "SLAVE", + "localhost_3": "DROPPED" + }, + "expectedStates": { + "localhost_0": "MASTER", + "localhost_1": "SLAVE", + "localhost_2": "SLAVE", + "localhost_3": "DROPPED" + } + }, + "resource_0_1": { + "currentStates": { + "localhost_0": "MASTER", + "localhost_1": "SLAVE", + "localhost_2": "SLAVE" + }, + "bestPossibleStates": { + "localhost_0": "MASTER", + "localhost_1": "SLAVE", + "localhost_2": "SLAVE" + }, + "expectedStates": { + "localhost_0": "MASTER", + "localhost_1": "SLAVE", + "localhost_2": "SLAVE" + } + } + } + }, + { + "statemodel": "MasterSlave", + "errorOrRecoveryPartitionThresholdForLoadBalance": 1, + "inputs": { + "resource_0_0": { + "currentStates": { + "localhost_0": "SLAVE", + "localhost_1": "SLAVE", + "localhost_2": "SLAVE" + }, + "bestPossibleStates": { + "localhost_0": "MASTER", + "localhost_1": "SLAVE", + "localhost_2": "SLAVE" + }, + "expectedStates": { + "localhost_0": "MASTER", + "localhost_1": "SLAVE", + "localhost_2": "SLAVE" + } + }, + "resource_0_1": { + "currentStates": { + "localhost_0": "MASTER", + "localhost_1": "SLAVE", + "localhost_2": "SLAVE" + }, + "bestPossibleStates": { + "localhost_0": "MASTER", + "localhost_1": "SLAVE", + "localhost_2": "SLAVE" + }, + "expectedStates": { + "localhost_0": "MASTER", + "localhost_1": "SLAVE", + "localhost_2": "SLAVE" + } + } + } + }, + { + "statemodel": "MasterSlave", + "errorOrRecoveryPartitionThresholdForLoadBalance": 1, + "inputs": { + "resource_0_0": { + "currentStates": { + "localhost_0": "SLAVE", + "localhost_1": "SLAVE", + "localhost_2": "SLAVE" + }, + "bestPossibleStates": { + "localhost_0": "MASTER", + "localhost_1": "SLAVE", + "localhost_2": "SLAVE" + }, + "expectedStates": { + "localhost_0": "MASTER", + "localhost_1": "SLAVE", + "localhost_2": "SLAVE" + } + }, + "resource_0_1": { + "currentStates": { + "localhost_0": "MASTER", + "localhost_1": "SLAVE", + "localhost_2": "SLAVE", + "localhost_3": "OFFLINE" + }, + "bestPossibleStates": { + "localhost_0": "MASTER", + 
"localhost_1": "SLAVE", + "localhost_2": "SLAVE", + "localhost_3": "SLAVE" + }, + "expectedStates": { + "localhost_0": "MASTER", + "localhost_1": "SLAVE", + "localhost_2": "SLAVE", + "localhost_3": "OFFLINE" + } + } + } + } +] \ No newline at end of file http://git-wip-us.apache.org/repos/asf/helix/blob/37f3d4c8/helix-core/src/test/resources/TestRecoveryLoadBalance.OnlineOffline.json ---------------------------------------------------------------------- diff --git a/helix-core/src/test/resources/TestRecoveryLoadBalance.OnlineOffline.json b/helix-core/src/test/resources/TestRecoveryLoadBalance.OnlineOffline.json new file mode 100644 index 0000000..d4677be --- /dev/null +++ b/helix-core/src/test/resources/TestRecoveryLoadBalance.OnlineOffline.json @@ -0,0 +1,206 @@ +[ + { + "statemodel": "OnlineOffline", + "errorOrRecoveryPartitionThresholdForLoadBalance": 1, + "inputs": { + "resource_0_0": { + "currentStates": { + "localhost_0": "OFFLINE", + "localhost_1": "OFFLINE", + "localhost_2": "OFFLINE" + }, + "bestPossibleStates": { + "localhost_0": "ONLINE", + "localhost_1": "ONLINE", + "localhost_2": "ONLINE" + }, + "expectedStates": { + "localhost_0": "ONLINE", + "localhost_1": "ONLINE", + "localhost_2": "ONLINE" + } + }, + "resource_0_1": { + "currentStates": { + "localhost_0": "ONLINE", + "localhost_1": "ONLINE", + "localhost_2": "ONLINE" + }, + "bestPossibleStates": { + "localhost_0": "ONLINE", + "localhost_1": "ONLINE", + "localhost_2": "ONLINE", + "localhost_3": "ONLINE" + }, + "expectedStates": { + "localhost_0": "ONLINE", + "localhost_1": "ONLINE", + "localhost_2": "ONLINE" + } + } + } + }, + { + "statemodel": "OnlineOffline", + "errorOrRecoveryPartitionThresholdForLoadBalance": 1, + "inputs": { + "resource_0_0": { + "currentStates": { + "localhost_0": "OFFLINE", + "localhost_1": "OFFLINE", + "localhost_2": "OFFLINE" + }, + "bestPossibleStates": { + "localhost_0": "ONLINE", + "localhost_1": "ONLINE", + "localhost_2": "ONLINE" + }, + "expectedStates": { + 
"localhost_0": "ONLINE", + "localhost_1": "ONLINE", + "localhost_2": "ONLINE" + } + }, + "resource_0_1": { + "currentStates": { + "localhost_0": "ONLINE", + "localhost_1": "ONLINE", + "localhost_2": "ONLINE", + "localhost_3": "ONLINE" + }, + "bestPossibleStates": { + "localhost_0": "ONLINE", + "localhost_1": "ONLINE", + "localhost_2": "ONLINE", + "localhost_3": "DROPPED" + }, + "expectedStates": { + "localhost_0": "ONLINE", + "localhost_1": "ONLINE", + "localhost_2": "ONLINE", + "localhost_3": "DROPPED" + } + } + } + }, + { + "statemodel": "OnlineOffline", + "errorOrRecoveryPartitionThresholdForLoadBalance": 100, + "inputs": { + "resource_0_0": { + "currentStates": { + "localhost_0": "OFFLINE", + "localhost_1": "OFFLINE", + "localhost_2": "OFFLINE" + }, + "bestPossibleStates": { + "localhost_0": "ONLINE", + "localhost_1": "ONLINE", + "localhost_2": "ONLINE" + }, + "expectedStates": { + "localhost_0": "ONLINE", + "localhost_1": "ONLINE", + "localhost_2": "ONLINE" + } + }, + "resource_0_1": { + "currentStates": { + "localhost_0": "ONLINE", + "localhost_1": "ONLINE", + "localhost_2": "ONLINE" + }, + "bestPossibleStates": { + "localhost_0": "ONLINE", + "localhost_1": "ONLINE", + "localhost_2": "ONLINE", + "localhost_3": "ONLINE" + }, + "expectedStates": { + "localhost_0": "ONLINE", + "localhost_1": "ONLINE", + "localhost_2": "ONLINE", + "localhost_3": "ONLINE" + } + } + } + }, + { + "statemodel": "OnlineOffline", + "errorOrRecoveryPartitionThresholdForLoadBalance": 1, + "inputs": { + "resource_0_0": { + "currentStates": { + "localhost_0": "OFFLINE", + "localhost_1": "OFFLINE", + "localhost_2": "OFFLINE" + }, + "bestPossibleStates": { + "localhost_0": "ONLINE", + "localhost_1": "ONLINE", + "localhost_2": "ONLINE" + }, + "expectedStates": { + "localhost_0": "ONLINE", + "localhost_1": "ONLINE", + "localhost_2": "ONLINE" + } + }, + "resource_0_1": { + "currentStates": { + "localhost_0": "ONLINE", + "localhost_1": "OFFLINE", + "localhost_2": "ONLINE" + }, + 
"bestPossibleStates": { + "localhost_0": "DROPPED", + "localhost_1": "ONLINE", + "localhost_2": "ONLINE" + }, + "expectedStates": { + "localhost_0": "ONLINE", + "localhost_1": "OFFLINE", + "localhost_2": "ONLINE" + } + } + } + }, + { + "statemodel": "OnlineOffline", + "errorOrRecoveryPartitionThresholdForLoadBalance": 1, + "inputs": { + "resource_0_0": { + "currentStates": { + "localhost_0": "ONLINE", + "localhost_1": "ONLINE" + }, + "bestPossibleStates": { + "localhost_0": "DROPPED", + "localhost_1": "ONLINE", + "localhost_2": "OFFLINE" + }, + "expectedStates": { + "localhost_0": "ONLINE", + "localhost_1": "ONLINE" + } + }, + "resource_0_1": { + "currentStates": { + "localhost_0": "OFFLINE", + "localhost_1": "OFFLINE", + "localhost_2": "OFFLINE" + }, + "bestPossibleStates": { + "localhost_0": "ONLINE", + "localhost_1": "ONLINE", + "localhost_2": "ONLINE" + }, + "expectedStates": { + "localhost_0": "ONLINE", + "localhost_1": "ONLINE", + "localhost_2": "ONLINE" + } + } + } + } +] \ No newline at end of file
