This is an automated email from the ASF dual-hosted git repository. jxue pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/helix.git
commit f49986e24c27c49d8c877457753e32dcf8b540f4 Author: Junkai Xue <[email protected]> AuthorDate: Tue May 25 10:19:08 2021 -0700 [Replica Level Throttle] Make Pipeline in a correct order and fixes tests (#1750) * Make the pipeline run in the correct order and fix tests 1. Make the pipeline stages run in the correct order for the computation. 2. Add a "DROPPED" case to the mapping to reflect the real case. 3. Fix test cases. --- .../helix/controller/GenericHelixController.java | 4 +- .../stages/IntermediateStateCalcStage.java | 36 +- .../resource/ResourceMessageGenerationPhase.java | 7 +- .../helix/monitoring/mbeans/ResourceMonitor.java | 58 ++-- .../helix/controller/stages/BaseStageTest.java | 9 + .../stages/TestIntermediateStateCalcStage.java | 60 ++-- .../controller/stages/TestRebalancePipeline.java | 14 +- .../controller/stages/TestRecoveryLoadBalance.java | 225 ------------- .../stages/TestStateTransitionPriority.java | 95 ++++-- .../TestNoThrottleDisabledPartitions.java | 38 ++- .../controller/TestRedundantDroppedMessage.java | 7 +- .../messaging/p2pMessage/TestP2PMessages.java | 2 +- .../TestP2PMessagesAvoidDuplicatedMessage.java | 11 +- .../p2pMessage/TestP2PStateTransitionMessages.java | 6 +- .../TestP2PWithStateCancellationMessage.java | 11 +- .../monitoring/mbeans/TestRebalancerMetrics.java | 18 +- .../test/resources/TestPartitionLevelPriority.json | 2 +- .../TestRecoveryLoadBalance.MasterSlave.json | 367 --------------------- .../TestRecoveryLoadBalance.OnlineOffline.json | 206 ------------ 19 files changed, 233 insertions(+), 943 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java index 4dad60f..4baac38 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java +++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java @@ -513,12 +513,14 @@ public class 
GenericHelixController implements IdealStateChangeListener, LiveIns // rebalance pipeline Pipeline rebalancePipeline = new Pipeline(pipelineName); rebalancePipeline.addStage(new BestPossibleStateCalcStage()); - rebalancePipeline.addStage(new IntermediateStateCalcStage()); // Need to add MaintenanceRecoveryStage here because MAX_PARTITIONS_PER_INSTANCE check could // only occur after IntermediateStateCalcStage calculation rebalancePipeline.addStage(new MaintenanceRecoveryStage()); rebalancePipeline.addStage(new ResourceMessageGenerationPhase()); rebalancePipeline.addStage(new MessageSelectionStage()); + // The IntermediateStateCalcStage should be applied after message selection + // Messages are throttled already removed by IntermediateStateCalcStage in MessageSelection output + rebalancePipeline.addStage(new IntermediateStateCalcStage()); rebalancePipeline.addStage(new MessageThrottleStage()); rebalancePipeline.addStage(new ResourceMessageDispatchStage()); rebalancePipeline.addStage(new PersistAssignmentStage()); diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java index b91dca6..b06352d 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java @@ -51,6 +51,7 @@ import org.apache.helix.model.Resource; import org.apache.helix.model.StateModelDefinition; import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor; import org.apache.helix.monitoring.mbeans.ResourceMonitor; +import org.apache.helix.participant.statemachine.StateModel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -347,11 +348,10 @@ public class IntermediateStateCalcStage extends AbstractBaseStage { Collections.sort(partitions, new 
PartitionPriorityComparator(bestPossiblePartitionStateMap.getStateMap(), currentStateOutput.getCurrentStateMap(resourceName), stateModelDef.getTopState())); for (Partition partition : partitions) { - List<Message> messagesToThrottle = new ArrayList<>(resourceMessageMap.get(partition)); - if (messagesToThrottle == null || messagesToThrottle.isEmpty()) { + if (resourceMessageMap.get(partition) == null || resourceMessageMap.get(partition).isEmpty()) { continue; } - + List<Message> messagesToThrottle = new ArrayList<>(resourceMessageMap.get(partition)); Map<String, String> derivedCurrentStateMap = currentStateOutput.getCurrentStateMap(resourceName, partition) .entrySet() .stream() @@ -468,6 +468,9 @@ public class IntermediateStateCalcStage extends AbstractBaseStage { StateTransitionThrottleConfig.RebalanceType rebalanceType = getRebalanceTypePerMessage(requiredStates, message, currentStateMap); String currentState = currentStateMap.get(message.getTgtName()); + if (currentState == null) { + currentState = stateModelDefinition.getInitialState(); + } if (!message.getToState().equals(currentState) && message.getFromState().equals(currentState) && !cache.getDisabledInstancesForPartition(resourceName, partition.getPartitionName()) .contains(message.getTgtName())) { @@ -833,7 +836,10 @@ public class IntermediateStateCalcStage extends AbstractBaseStage { // Higher priority for the partition with fewer replicas with states matching with IdealState int idealStateMatched1 = getIdealStateMatched(p1); int idealStateMatched2 = getIdealStateMatched(p2); - return Integer.compare(idealStateMatched1, idealStateMatched2); + if (idealStateMatched1 != idealStateMatched2) { + return Integer.compare(idealStateMatched1, idealStateMatched2); + } + return p1.getPartitionName().compareTo(p2.getPartitionName()); } private int getMissTopStateIndex(Partition partition) { @@ -889,17 +895,23 @@ public class IntermediateStateCalcStage extends AbstractBaseStage { private void 
computeIntermediateMap(PartitionStateMap intermediateStateMap, Map<Partition, Map<String, Message>> pendingMessageMap, Map<Partition, List<Message>> resourceMessageMap) { for (Map.Entry<Partition, Map<String, Message>> entry : pendingMessageMap.entrySet()) { - entry.getValue() - .entrySet() - .stream() - .forEach( - e -> intermediateStateMap.setState(entry.getKey(), e.getValue().getTgtName(), e.getValue().getToState())); + entry.getValue().entrySet().stream().forEach(e -> { + if (!e.getValue().getToState().equals(HelixDefinedState.DROPPED.name())) { + intermediateStateMap.setState(entry.getKey(), e.getValue().getTgtName(), e.getValue().getToState()); + } else { + intermediateStateMap.getStateMap().get(entry.getKey()).remove(e.getValue().getTgtName()); + } + }); } for (Map.Entry<Partition, List<Message>> entry : resourceMessageMap.entrySet()) { - entry.getValue() - .stream() - .forEach(e -> intermediateStateMap.setState(entry.getKey(), e.getTgtName(), e.getToState())); + entry.getValue().stream().forEach(e -> { + if (!e.getToState().equals(HelixDefinedState.DROPPED.name())) { + intermediateStateMap.setState(entry.getKey(), e.getTgtName(), e.getToState()); + } else { + intermediateStateMap.getStateMap().get(entry.getKey()).remove(e.getTgtName()); + } + }); } } diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/resource/ResourceMessageGenerationPhase.java b/helix-core/src/main/java/org/apache/helix/controller/stages/resource/ResourceMessageGenerationPhase.java index 7c4d1b7..73b309e 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/resource/ResourceMessageGenerationPhase.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/resource/ResourceMessageGenerationPhase.java @@ -20,6 +20,7 @@ package org.apache.helix.controller.stages.resource; */ import org.apache.helix.controller.stages.AttributeName; +import org.apache.helix.controller.stages.BestPossibleStateOutput; import 
org.apache.helix.controller.stages.ClusterEvent; import org.apache.helix.controller.stages.IntermediateStateOutput; import org.apache.helix.controller.stages.MessageGenerationPhase; @@ -30,8 +31,8 @@ import org.apache.helix.controller.stages.MessageGenerationPhase; public class ResourceMessageGenerationPhase extends MessageGenerationPhase { @Override public void process(ClusterEvent event) throws Exception { - IntermediateStateOutput intermediateStateOutput = - event.getAttribute(AttributeName.INTERMEDIATE_STATE.name()); - processEvent(event, intermediateStateOutput); + BestPossibleStateOutput bestPossibleStateOutput = + event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.name()); + processEvent(event, bestPossibleStateOutput); } } diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java index b659f9c..51d3321 100644 --- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java +++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java @@ -60,10 +60,10 @@ public class ResourceMonitor extends DynamicMBeanProvider { private SimpleDynamicMetric<Long> _externalViewIdealStateDiff; private SimpleDynamicMetric<Long> _numLessMinActiveReplicaPartitions; private SimpleDynamicMetric<Long> _numLessReplicaPartitions; - private SimpleDynamicMetric<Long> _numPendingRecoveryRebalancePartitions; - private SimpleDynamicMetric<Long> _numPendingLoadRebalancePartitions; - private SimpleDynamicMetric<Long> _numRecoveryRebalanceThrottledPartitions; - private SimpleDynamicMetric<Long> _numLoadRebalanceThrottledPartitions; + private SimpleDynamicMetric<Long> _numPendingRecoveryRebalanceReplicas; + private SimpleDynamicMetric<Long> _numPendingLoadRebalanceReplicas; + private SimpleDynamicMetric<Long> _numRecoveryRebalanceThrottledReplicas; + private SimpleDynamicMetric<Long> _numLoadRebalanceThrottledReplicas; private 
SimpleDynamicMetric<Long> _numPendingStateTransitions; // Counters @@ -110,14 +110,16 @@ public class ResourceMonitor extends DynamicMBeanProvider { _dynamicCapacityMetricsMap = new ConcurrentHashMap<>(); _externalViewIdealStateDiff = new SimpleDynamicMetric("DifferenceWithIdealStateGauge", 0L); - _numLoadRebalanceThrottledPartitions = - new SimpleDynamicMetric("LoadRebalanceThrottledPartitionGauge", 0L); - _numRecoveryRebalanceThrottledPartitions = - new SimpleDynamicMetric("RecoveryRebalanceThrottledPartitionGauge", 0L); - _numPendingLoadRebalancePartitions = - new SimpleDynamicMetric("PendingLoadRebalancePartitionGauge", 0L); - _numPendingRecoveryRebalancePartitions = - new SimpleDynamicMetric("PendingRecoveryRebalancePartitionGauge", 0L); + _numPendingRecoveryRebalanceReplicas = + new SimpleDynamicMetric("PendingRecoveryRebalanceReplicaGauge", 0L); + _numLoadRebalanceThrottledReplicas = + new SimpleDynamicMetric("LoadRebalanceThrottledReplicaGauge", 0L); + _numRecoveryRebalanceThrottledReplicas = + new SimpleDynamicMetric("RecoveryRebalanceThrottledReplicaGauge", 0L); + _numPendingLoadRebalanceReplicas = + new SimpleDynamicMetric("PendingLoadRebalanceReplicaGauge", 0L); + _numPendingRecoveryRebalanceReplicas = + new SimpleDynamicMetric("PendingRecoveryRebalanceReplicaGauge", 0L); _numLessReplicaPartitions = new SimpleDynamicMetric("MissingReplicaPartitionGauge", 0L); _numLessMinActiveReplicaPartitions = new SimpleDynamicMetric("MissingMinActiveReplicaPartitionGauge", 0L); @@ -370,10 +372,10 @@ public class ResourceMonitor extends DynamicMBeanProvider { public void updateRebalancerStats(long numPendingRecoveryRebalancePartitions, long numPendingLoadRebalancePartitions, long numRecoveryRebalanceThrottledPartitions, long numLoadRebalanceThrottledPartitions) { - _numPendingRecoveryRebalancePartitions.updateValue(numPendingRecoveryRebalancePartitions); - _numPendingLoadRebalancePartitions.updateValue(numPendingLoadRebalancePartitions); - 
_numRecoveryRebalanceThrottledPartitions.updateValue(numRecoveryRebalanceThrottledPartitions); - _numLoadRebalanceThrottledPartitions.updateValue(numLoadRebalanceThrottledPartitions); + _numPendingRecoveryRebalanceReplicas.updateValue(numPendingRecoveryRebalancePartitions); + _numPendingLoadRebalanceReplicas.updateValue(numPendingLoadRebalancePartitions); + _numRecoveryRebalanceThrottledReplicas.updateValue(numRecoveryRebalanceThrottledPartitions); + _numLoadRebalanceThrottledReplicas.updateValue(numLoadRebalanceThrottledPartitions); } /** @@ -422,20 +424,20 @@ public class ResourceMonitor extends DynamicMBeanProvider { return _numLessReplicaPartitions.getValue(); } - public long getPendingRecoveryRebalancePartitionGauge() { - return _numPendingRecoveryRebalancePartitions.getValue(); + public long getNumPendingRecoveryRebalanceReplicas() { + return _numPendingRecoveryRebalanceReplicas.getValue(); } - public long getPendingLoadRebalancePartitionGauge() { - return _numPendingLoadRebalancePartitions.getValue(); + public long getNumPendingLoadRebalanceReplicas() { + return _numPendingLoadRebalanceReplicas.getValue(); } - public long getRecoveryRebalanceThrottledPartitionGauge() { - return _numRecoveryRebalanceThrottledPartitions.getValue(); + public long getNumRecoveryRebalanceThrottledReplicas() { + return _numRecoveryRebalanceThrottledReplicas.getValue(); } - public long getLoadRebalanceThrottledPartitionGauge() { - return _numLoadRebalanceThrottledPartitions.getValue(); + public long getNumLoadRebalanceThrottledReplicas() { + return _numLoadRebalanceThrottledReplicas.getValue(); } public long getNumPendingStateTransitionGauge() { @@ -461,10 +463,10 @@ public class ResourceMonitor extends DynamicMBeanProvider { _numNonTopStatePartitions, _numLessMinActiveReplicaPartitions, _numLessReplicaPartitions, - _numPendingRecoveryRebalancePartitions, - _numPendingLoadRebalancePartitions, - _numRecoveryRebalanceThrottledPartitions, - _numLoadRebalanceThrottledPartitions, + 
_numPendingRecoveryRebalanceReplicas, + _numPendingLoadRebalanceReplicas, + _numRecoveryRebalanceThrottledReplicas, + _numLoadRebalanceThrottledReplicas, _externalViewIdealStateDiff, _successfulTopStateHandoffDurationCounter, _successTopStateHandoffCounter, diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java b/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java index cc6b4df..2d4aa09 100644 --- a/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java +++ b/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java @@ -31,6 +31,7 @@ import org.apache.helix.HelixAdmin; import org.apache.helix.HelixDataAccessor; import org.apache.helix.HelixManager; import org.apache.helix.PropertyKey.Builder; +import org.apache.helix.model.Message; import org.apache.helix.zookeeper.datamodel.ZNRecord; import org.apache.helix.controller.pipeline.Stage; import org.apache.helix.controller.pipeline.StageContext; @@ -276,4 +277,12 @@ public class BaseStageTest { return resourceMap; } + + protected Message generateMessage(String fromState, String toState, String tgtName) { + Message message = new Message(new ZNRecord(UUID.randomUUID().toString())); + message.setTgtName(tgtName); + message.setFromState(fromState); + message.setToState(toState); + return message; + } } diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestIntermediateStateCalcStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestIntermediateStateCalcStage.java index 9584edb..7f17dea 100644 --- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestIntermediateStateCalcStage.java +++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestIntermediateStateCalcStage.java @@ -51,15 +51,14 @@ public class TestIntermediateStateCalcStage extends BaseStageTest { } preSetup(resources, nReplica, nReplica); - event.addAttribute(AttributeName.RESOURCES.name(), 
- getResourceMap(resources, nPartition, "OnlineOffline")); + event.addAttribute(AttributeName.RESOURCES.name(), getResourceMap(resources, nPartition, "OnlineOffline")); event.addAttribute(AttributeName.RESOURCES_TO_REBALANCE.name(), getResourceMap(resources, nPartition, "OnlineOffline")); // Initialize bestpossible state and current state BestPossibleStateOutput bestPossibleStateOutput = new BestPossibleStateOutput(); CurrentStateOutput currentStateOutput = new CurrentStateOutput(); - + MessageOutput messageSelectOutput = new MessageOutput(); IntermediateStateOutput expectedResult = new IntermediateStateOutput(); _clusterConfig.setErrorOrRecoveryPartitionThresholdForLoadBalance(1); @@ -79,11 +78,11 @@ public class TestIntermediateStateCalcStage extends BaseStageTest { // Regular recovery balance currentStateOutput.setCurrentState(resource, partition, instanceName, "OFFLINE"); // add blocked state transition messages - Message pendingMessage = new Message("customType", "001"); - pendingMessage.setToState("ONLINE"); + Message pendingMessage = generateMessage("OFFLINE", "ONLINE", instanceName); currentStateOutput.setPendingMessage(resource, partition, instanceName, pendingMessage); bestPossibleStateOutput.setState(resource, partition, instanceName, "ONLINE"); + // should be recovered: expectedResult.setState(resource, partition, instanceName, "ONLINE"); } else if (resource.endsWith("1")) { @@ -91,6 +90,8 @@ public class TestIntermediateStateCalcStage extends BaseStageTest { currentStateOutput.setCurrentState(resource, partition, instanceName, "ONLINE"); currentStateOutput.setCurrentState(resource, partition, instanceName + "-1", "OFFLINE"); bestPossibleStateOutput.setState(resource, partition, instanceName, "ONLINE"); + messageSelectOutput.addMessage(resource, partition, + generateMessage("OFFLINE", "DROPPED", instanceName + "-1")); // should be recovered: expectedResult.setState(resource, partition, instanceName, "ONLINE"); } else if (resource.endsWith("2")) { @@ 
-110,14 +111,16 @@ public class TestIntermediateStateCalcStage extends BaseStageTest { expectedResult.setState(resource, partition, instanceName, "ERROR"); } else { currentStateOutput.setCurrentState(resource, partition, instanceName, "OFFLINE"); + messageSelectOutput.addMessage(resource, partition, generateMessage("OFFLINE", "ONLINE", instanceName)); // Recovery balance expectedResult.setState(resource, partition, instanceName, "ONLINE"); } } else { currentStateOutput.setCurrentState(resource, partition, instanceName, "ONLINE"); - currentStateOutput - .setCurrentState(resource, partition, instanceName + "-1", "OFFLINE"); + currentStateOutput.setCurrentState(resource, partition, instanceName + "-1", "OFFLINE"); // load balance is throttled, so keep all current states + messageSelectOutput.addMessage(resource, partition, + generateMessage("OFFLINE", "DROPPED", instanceName + "-1")); expectedResult.setState(resource, partition, instanceName, "ONLINE"); // The following must be removed because now downward state transitions are allowed // expectedResult.setState(resource, partition, instanceName + "-1", "OFFLINE"); @@ -129,16 +132,18 @@ public class TestIntermediateStateCalcStage extends BaseStageTest { // This partition requires recovery currentStateOutput.setCurrentState(resource, partition, instanceName, "OFFLINE"); bestPossibleStateOutput.setState(resource, partition, instanceName, "ONLINE"); + messageSelectOutput.addMessage(resource, partition, generateMessage("OFFLINE", "ONLINE", instanceName)); // After recovery, it should be back ONLINE expectedResult.setState(resource, partition, instanceName, "ONLINE"); } else { // Other partitions require dropping of replicas currentStateOutput.setCurrentState(resource, partition, instanceName, "ONLINE"); - currentStateOutput - .setCurrentState(resource, partition, instanceName + "-1", "OFFLINE"); + currentStateOutput.setCurrentState(resource, partition, instanceName + "-1", "OFFLINE"); // BestPossibleState dictates that we 
only need one ONLINE replica bestPossibleStateOutput.setState(resource, partition, instanceName, "ONLINE"); bestPossibleStateOutput.setState(resource, partition, instanceName + "-1", "DROPPED"); + messageSelectOutput.addMessage(resource, partition, + generateMessage("OFFLINE", "DROPPED", instanceName + "-1")); // So instanceName-1 will NOT be expected to show up in expectedResult expectedResult.setState(resource, partition, instanceName, "ONLINE"); expectedResult.setState(resource, partition, instanceName + "-1", "DROPPED"); @@ -150,6 +155,7 @@ public class TestIntermediateStateCalcStage extends BaseStageTest { // Set up a partition requiring recovery currentStateOutput.setCurrentState(resource, partition, instanceName, "OFFLINE"); bestPossibleStateOutput.setState(resource, partition, instanceName, "ONLINE"); + messageSelectOutput.addMessage(resource, partition, generateMessage("OFFLINE", "ONLINE", instanceName)); // After recovery, it should be back ONLINE expectedResult.setState(resource, partition, instanceName, "ONLINE"); } else { @@ -157,6 +163,8 @@ public class TestIntermediateStateCalcStage extends BaseStageTest { bestPossibleStateOutput.setState(resource, partition, instanceName, "ONLINE"); // Check that load balance (bringing up a new node) did not take place bestPossibleStateOutput.setState(resource, partition, instanceName + "-1", "ONLINE"); + messageSelectOutput.addMessage(resource, partition, + generateMessage("OFFLINE", "ONLINE", instanceName + "-1")); expectedResult.setState(resource, partition, instanceName, "ONLINE"); } } @@ -166,9 +174,9 @@ public class TestIntermediateStateCalcStage extends BaseStageTest { } event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.name(), bestPossibleStateOutput); + event.addAttribute(AttributeName.MESSAGES_SELECTED.name(), messageSelectOutput); event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput); - event.addAttribute(AttributeName.ControllerDataProvider.name(), - new 
ResourceControllerDataProvider()); + event.addAttribute(AttributeName.ControllerDataProvider.name(), new ResourceControllerDataProvider()); runStage(event, new ReadClusterDataStage()); runStage(event, new IntermediateStateCalcStage()); @@ -177,7 +185,8 @@ public class TestIntermediateStateCalcStage extends BaseStageTest { for (String resource : resources) { // Note Assert.assertEquals won't work. If "actual" is an empty map, it won't compare // anything. - Assert.assertTrue(output.getPartitionStateMap(resource).getStateMap() + Assert.assertTrue(output.getPartitionStateMap(resource) + .getStateMap() .equals(expectedResult.getPartitionStateMap(resource).getStateMap())); } } @@ -195,14 +204,13 @@ public class TestIntermediateStateCalcStage extends BaseStageTest { } preSetup(resources, nReplica, nReplica); - event.addAttribute(AttributeName.RESOURCES.name(), - getResourceMap(resources, nPartition, - "OnlineOffline")); + event.addAttribute(AttributeName.RESOURCES.name(), getResourceMap(resources, nPartition, "OnlineOffline")); event.addAttribute(AttributeName.RESOURCES_TO_REBALANCE.name(), getResourceMap(resources, nPartition, "OnlineOffline")); // Initialize best possible state and current state BestPossibleStateOutput bestPossibleStateOutput = new BestPossibleStateOutput(); + MessageOutput messageSelectOutput = new MessageOutput(); CurrentStateOutput currentStateOutput = new CurrentStateOutput(); IntermediateStateOutput expectedResult = new IntermediateStateOutput(); @@ -225,6 +233,7 @@ public class TestIntermediateStateCalcStage extends BaseStageTest { // Set up a partition requiring recovery currentStateOutput.setCurrentState(resource, partition, instanceName, "OFFLINE"); bestPossibleStateOutput.setState(resource, partition, instanceName, "ONLINE"); + messageSelectOutput.addMessage(resource, partition, generateMessage("OFFLINE", "ONLINE", instanceName)); // After recovery, it should be back ONLINE expectedResult.setState(resource, partition, instanceName, "ONLINE"); 
} else { @@ -236,6 +245,8 @@ public class TestIntermediateStateCalcStage extends BaseStageTest { // This partition to bring up a replica (load balance will happen) bestPossibleStateOutput.setState(resource, partition, instanceName + "-1", "ONLINE"); + messageSelectOutput.addMessage(resource, partition, + generateMessage("OFFLINE", "ONLINE", instanceName + "-1")); expectedResult.setState(resource, partition, instanceName + "-1", "ONLINE"); } } @@ -246,8 +257,8 @@ public class TestIntermediateStateCalcStage extends BaseStageTest { event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.name(), bestPossibleStateOutput); event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput); - event.addAttribute(AttributeName.ControllerDataProvider.name(), - new ResourceControllerDataProvider()); + event.addAttribute(AttributeName.MESSAGES_SELECTED.name(), messageSelectOutput); + event.addAttribute(AttributeName.ControllerDataProvider.name(), new ResourceControllerDataProvider()); runStage(event, new ReadClusterDataStage()); runStage(event, new IntermediateStateCalcStage()); @@ -256,7 +267,8 @@ public class TestIntermediateStateCalcStage extends BaseStageTest { for (String resource : resources) { // Note Assert.assertEquals won't work. If "actual" is an empty map, it won't compare // anything. 
- Assert.assertEquals(output.getPartitionStateMap(resource).getStateMap(), expectedResult.getPartitionStateMap(resource).getStateMap()); + Assert.assertEquals(output.getPartitionStateMap(resource).getStateMap(), + expectedResult.getPartitionStateMap(resource).getStateMap()); } } @@ -268,13 +280,11 @@ public class TestIntermediateStateCalcStage extends BaseStageTest { // Set up cluster configs _clusterConfig = accessor.getProperty(accessor.keyBuilder().clusterConfig()); - _clusterConfig.setStateTransitionThrottleConfigs(ImmutableList - .of(new StateTransitionThrottleConfig( - StateTransitionThrottleConfig.RebalanceType.RECOVERY_BALANCE, - StateTransitionThrottleConfig.ThrottleScope.INSTANCE, 3), - new StateTransitionThrottleConfig( - StateTransitionThrottleConfig.RebalanceType.LOAD_BALANCE, - StateTransitionThrottleConfig.ThrottleScope.INSTANCE, 3))); + _clusterConfig.setStateTransitionThrottleConfigs(ImmutableList.of( + new StateTransitionThrottleConfig(StateTransitionThrottleConfig.RebalanceType.RECOVERY_BALANCE, + StateTransitionThrottleConfig.ThrottleScope.INSTANCE, 3), + new StateTransitionThrottleConfig(StateTransitionThrottleConfig.RebalanceType.LOAD_BALANCE, + StateTransitionThrottleConfig.ThrottleScope.INSTANCE, 3))); setClusterConfig(_clusterConfig); } } diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java index 66c1771..d262b7b 100644 --- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java +++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java @@ -97,9 +97,9 @@ public class TestRebalancePipeline extends ZkUnitTestBase { rebalancePipeline.addStage(new ResourceComputationStage()); rebalancePipeline.addStage(new CurrentStateComputationStage()); rebalancePipeline.addStage(new BestPossibleStateCalcStage()); - rebalancePipeline.addStage(new 
IntermediateStateCalcStage()); rebalancePipeline.addStage(new ResourceMessageGenerationPhase()); rebalancePipeline.addStage(new MessageSelectionStage()); + rebalancePipeline.addStage(new IntermediateStateCalcStage()); rebalancePipeline.addStage(new MessageThrottleStage()); rebalancePipeline.addStage(new ResourceMessageDispatchStage()); @@ -133,9 +133,9 @@ public class TestRebalancePipeline extends ZkUnitTestBase { Pipeline messagePipeline = new Pipeline(); messagePipeline.addStage(new BestPossibleStateCalcStage()); - messagePipeline.addStage(new IntermediateStateCalcStage()); messagePipeline.addStage(new ResourceMessageGenerationPhase()); messagePipeline.addStage(new MessageSelectionStage()); + messagePipeline.addStage(new IntermediateStateCalcStage()); messagePipeline.addStage(new MessageThrottleStage()); messagePipeline.addStage(new ResourceMessageDispatchStage()); @@ -157,7 +157,7 @@ public class TestRebalancePipeline extends ZkUnitTestBase { Thread.sleep(2 * MessageGenerationPhase.DEFAULT_OBSELETE_MSG_PURGE_DELAY); runPipeline(event, dataRefresh, false); - + // Verify the stale message should be deleted Assert.assertTrue(TestHelper.verify(() -> { if (dataCache.getStaleMessages().size() != 0) { @@ -334,9 +334,9 @@ public class TestRebalancePipeline extends ZkUnitTestBase { rebalancePipeline.addStage(new ResourceComputationStage()); rebalancePipeline.addStage(new CurrentStateComputationStage()); rebalancePipeline.addStage(new BestPossibleStateCalcStage()); - rebalancePipeline.addStage(new IntermediateStateCalcStage()); rebalancePipeline.addStage(new ResourceMessageGenerationPhase()); rebalancePipeline.addStage(new MessageSelectionStage()); + rebalancePipeline.addStage(new IntermediateStateCalcStage()); rebalancePipeline.addStage(new MessageThrottleStage()); rebalancePipeline.addStage(new ResourceMessageDispatchStage()); @@ -431,9 +431,9 @@ public class TestRebalancePipeline extends ZkUnitTestBase { rebalancePipeline.addStage(new ResourceComputationStage()); 
rebalancePipeline.addStage(new CurrentStateComputationStage()); rebalancePipeline.addStage(new BestPossibleStateCalcStage()); - rebalancePipeline.addStage(new IntermediateStateCalcStage()); rebalancePipeline.addStage(new ResourceMessageGenerationPhase()); rebalancePipeline.addStage(new MessageSelectionStage()); + rebalancePipeline.addStage(new IntermediateStateCalcStage()); rebalancePipeline.addStage(new MessageThrottleStage()); rebalancePipeline.addStage(new ResourceMessageDispatchStage()); @@ -510,9 +510,9 @@ public class TestRebalancePipeline extends ZkUnitTestBase { rebalancePipeline.addStage(new ResourceComputationStage()); rebalancePipeline.addStage(new CurrentStateComputationStage()); rebalancePipeline.addStage(new BestPossibleStateCalcStage()); - rebalancePipeline.addStage(new IntermediateStateCalcStage()); rebalancePipeline.addStage(new ResourceMessageGenerationPhase()); rebalancePipeline.addStage(new MessageSelectionStage()); + rebalancePipeline.addStage(new IntermediateStateCalcStage()); rebalancePipeline.addStage(new MessageThrottleStage()); rebalancePipeline.addStage(new ResourceMessageDispatchStage()); @@ -590,9 +590,9 @@ public class TestRebalancePipeline extends ZkUnitTestBase { rebalancePipeline.addStage(new ResourceComputationStage()); rebalancePipeline.addStage(new CurrentStateComputationStage()); rebalancePipeline.addStage(new BestPossibleStateCalcStage()); - rebalancePipeline.addStage(new IntermediateStateCalcStage()); rebalancePipeline.addStage(new ResourceMessageGenerationPhase()); rebalancePipeline.addStage(new MessageSelectionStage()); + rebalancePipeline.addStage(new IntermediateStateCalcStage()); rebalancePipeline.addStage(new MessageThrottleStage()); rebalancePipeline.addStage(new ResourceMessageDispatchStage()); diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestRecoveryLoadBalance.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestRecoveryLoadBalance.java deleted file mode 100644 index 
32d8663..0000000 --- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestRecoveryLoadBalance.java +++ /dev/null @@ -1,225 +0,0 @@ -package org.apache.helix.controller.stages; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -import java.io.IOException; -import java.io.InputStream; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.Date; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.ObjectReader; -import org.apache.helix.api.config.StateTransitionThrottleConfig; -import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider; -import org.apache.helix.controller.rebalancer.DelayedAutoRebalancer; -import org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy; -import org.apache.helix.model.ClusterConfig; -import org.apache.helix.model.IdealState; -import org.apache.helix.model.Partition; -import org.testng.Assert; -import org.testng.annotations.DataProvider; -import org.testng.annotations.Test; - -public class TestRecoveryLoadBalance extends BaseStageTest { - - private final String INPUT = "inputs"; - private final String MIN_ACTIVE_REPLICAS = "minActiveReplicas"; - private final String LOAD_BALANCE_THROTTLE = "loadBalanceThrottle"; - private final String CURRENT_STATE = "currentStates"; - private final String BEST_POSSIBLE_STATE = "bestPossibleStates"; - private final String EXPECTED_STATE = "expectedStates"; - private final String ERROR_OR_RECOVERY_PARTITION_THRESHOLD = - "errorOrRecoveryPartitionThresholdForLoadBalance"; - private final String STATE_MODEL = "statemodel"; - private ClusterConfig _clusterConfig; - - @Test(dataProvider = "recoveryLoadBalanceInput") - public void testRecoveryAndLoadBalance(String stateModelDef, - int errorOrRecoveryPartitionThresholdForLoadBalance, - Map<String, Map<String, Map<String, String>>> stateMapping, int minActiveReplicas, int loadBalanceThrottle) { - System.out.println("START TestRecoveryLoadBalance at " + new Date(System.currentTimeMillis())); - - String resourcePrefix = "resource"; 
- int nResource = 1; - int nPartition = stateMapping.size(); - int nReplica = 3; - - Set<String> resourceSet = new HashSet<>(); - for (int i = 0; i < nResource; i++) { - resourceSet.add(resourcePrefix + "_" + i); - } - - preSetup(StateTransitionThrottleConfig.RebalanceType.RECOVERY_BALANCE, resourceSet, nReplica, - nReplica, stateModelDef, minActiveReplicas); - - _clusterConfig.setErrorOrRecoveryPartitionThresholdForLoadBalance( - errorOrRecoveryPartitionThresholdForLoadBalance); - if (loadBalanceThrottle >= 0) { - _clusterConfig.setStateTransitionThrottleConfigs(Arrays.asList( - new StateTransitionThrottleConfig( - StateTransitionThrottleConfig.RebalanceType.LOAD_BALANCE, - StateTransitionThrottleConfig.ThrottleScope.RESOURCE, loadBalanceThrottle))); - } - setClusterConfig(_clusterConfig); - - event.addAttribute(AttributeName.RESOURCES.name(), getResourceMap( - resourceSet.toArray(new String[resourceSet.size()]), nPartition, stateModelDef)); - event.addAttribute(AttributeName.RESOURCES_TO_REBALANCE.name(), getResourceMap( - resourceSet.toArray(new String[resourceSet.size()]), nPartition, stateModelDef)); - event.addAttribute(AttributeName.ControllerDataProvider.name(), - new ResourceControllerDataProvider()); - - // Initialize bestpossible state and current state - BestPossibleStateOutput bestPossibleStateOutput = new BestPossibleStateOutput(); - CurrentStateOutput currentStateOutput = new CurrentStateOutput(); - IntermediateStateOutput expectedResult = new IntermediateStateOutput(); - - for (String resource : resourceSet) { - IdealState is = accessor.getProperty(accessor.keyBuilder().idealStates(resource)); - setSingleIdealState(is); - - Map<String, List<String>> partitionMap = new HashMap<>(); - for (int p = 0; p < nPartition; p++) { - Partition partition = new Partition(resource + "_" + p); - - // Set input - for (int r = 0; r < stateMapping.get(partition.toString()).get(CURRENT_STATE).size(); r++) { - String instanceName = HOSTNAME_PREFIX + r; - 
currentStateOutput.setCurrentState(resource, partition, instanceName, - stateMapping.get(partition.toString()).get(CURRENT_STATE).get(instanceName)); - } - for (int r = 0; r < stateMapping.get(partition.toString()).get(BEST_POSSIBLE_STATE) - .size(); r++) { - String instanceName = HOSTNAME_PREFIX + r; - bestPossibleStateOutput.setState(resource, partition, instanceName, - stateMapping.get(partition.toString()).get(BEST_POSSIBLE_STATE).get(instanceName)); - } - for (int r = 0; r < stateMapping.get(partition.toString()).get(EXPECTED_STATE) - .size(); r++) { - String instanceName = HOSTNAME_PREFIX + r; - expectedResult.setState(resource, partition, instanceName, - stateMapping.get(partition.toString()).get(EXPECTED_STATE).get(instanceName)); - } - - // Set partitionMap - for (int r = 0; r < nReplica; r++) { - String instanceName = HOSTNAME_PREFIX + r; - partitionMap.put(partition.getPartitionName(), Collections.singletonList(instanceName)); - } - } - bestPossibleStateOutput.setPreferenceLists(resource, partitionMap); - } - - event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.name(), bestPossibleStateOutput); - event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput); - runStage(event, new ReadClusterDataStage()); - runStage(event, new IntermediateStateCalcStage()); - - IntermediateStateOutput output = event.getAttribute(AttributeName.INTERMEDIATE_STATE.name()); - - for (String resource : resourceSet) { - // For debugging purposes - // Object map1 = output.getPartitionStateMap(resource).getStateMap(); - // Object map2 = expectedResult.getPartitionStateMap(resource).getStateMap(); - - // Note Assert.assertEquals won't work. If "actual" is an empty map, it won't compare - // anything. 
- Assert.assertTrue(output.getPartitionStateMap(resource).getStateMap() - .equals(expectedResult.getPartitionStateMap(resource).getStateMap())); - } - - System.out.println("END TestRecoveryLoadBalance at " + new Date(System.currentTimeMillis())); - } - - @DataProvider(name = "recoveryLoadBalanceInput") - public Object[][] rebalanceStrategies() { - - try { - List<Object[]> data = new ArrayList<>(); - // Add data - data.addAll(loadTestInputs("TestRecoveryLoadBalance.OnlineOffline.json")); - data.addAll(loadTestInputs("TestRecoveryLoadBalance.MasterSlave.json")); - - Object[][] ret = new Object[data.size()][]; - for (int i = 0; i < data.size(); i++) { - ret[i] = data.get(i); - } - return ret; - } catch (Throwable e) { - return new Object[][] { - {} - }; - } - } - - public List<Object[]> loadTestInputs(String fileName) { - List<Object[]> ret = new ArrayList<>(); - InputStream inputStream = getClass().getClassLoader().getResourceAsStream(fileName); - try { - ObjectReader mapReader = new ObjectMapper().reader(List.class); - List<Map<String, Object>> inputList = mapReader.readValue(inputStream); - for (Map<String, Object> inputMap : inputList) { - String stateModelName = (String) inputMap.get(STATE_MODEL); - int threshold = (int) inputMap.get(ERROR_OR_RECOVERY_PARTITION_THRESHOLD); - int minActiveReplicas = -1; - if (inputMap.get(MIN_ACTIVE_REPLICAS) != null) { - minActiveReplicas = Integer.parseInt(inputMap.get(MIN_ACTIVE_REPLICAS).toString()); - } - int loadBalanceThrottle = -1; - if (inputMap.get(LOAD_BALANCE_THROTTLE) != null) { - loadBalanceThrottle = Integer.parseInt(inputMap.get(LOAD_BALANCE_THROTTLE).toString()); - } - Map<String, Map<String, Map<String, String>>> stateMapping = - (Map<String, Map<String, Map<String, String>>>) inputMap.get(INPUT); - ret.add(new Object[] { - stateModelName, threshold, stateMapping, minActiveReplicas, loadBalanceThrottle - }); - } - } catch (IOException e) { - e.printStackTrace(); - } - return ret; - } - - private void 
preSetup(StateTransitionThrottleConfig.RebalanceType rebalanceType, - Set<String> resourceSet, int numOfLiveInstances, int numOfReplicas, String stateModelName, - int minActiveReplica) { - setupIdealState(numOfLiveInstances, resourceSet.toArray(new String[resourceSet.size()]), - numOfLiveInstances, numOfReplicas, IdealState.RebalanceMode.FULL_AUTO, stateModelName, - DelayedAutoRebalancer.class.getName(), CrushEdRebalanceStrategy.class.getName(), - minActiveReplica); - setupStateModel(); - setupLiveInstances(numOfLiveInstances); - - // Set up cluster configs - _clusterConfig = accessor.getProperty(accessor.keyBuilder().clusterConfig()); - StateTransitionThrottleConfig throttleConfig = new StateTransitionThrottleConfig(rebalanceType, - StateTransitionThrottleConfig.ThrottleScope.CLUSTER, Integer.MAX_VALUE); - _clusterConfig.setStateTransitionThrottleConfigs(Collections.singletonList(throttleConfig)); - setClusterConfig(_clusterConfig); - } -} diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestStateTransitionPriority.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestStateTransitionPriority.java index be817ef..22c20f7 100644 --- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestStateTransitionPriority.java +++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestStateTransitionPriority.java @@ -62,6 +62,7 @@ public class TestStateTransitionPriority extends BaseStageTest { // Initialize bestpossible state and current state BestPossibleStateOutput bestPossibleStateOutput = new BestPossibleStateOutput(); + MessageOutput messageSelectOutput = new MessageOutput(); CurrentStateOutput currentStateOutput = new CurrentStateOutput(); for (String resource : resourceMap.keySet()) { @@ -76,11 +77,13 @@ public class TestStateTransitionPriority extends BaseStageTest { Collections.singletonList(instanceName)); bestPossibleStateOutput.setPreferenceLists(resource, partitionMap); 
bestPossibleStateOutput.setState(resource, partition, instanceName, "SLAVE"); + messageSelectOutput.addMessage(resource, partition, generateMessage("OFFLINE", "SLAVE", instanceName)); currentStateOutput.setCurrentState(resource, partition, instanceName, "OFFLINE"); } event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.name(), bestPossibleStateOutput); + event.addAttribute(AttributeName.MESSAGES_SELECTED.name(), messageSelectOutput); event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput); runStage(event, new ReadClusterDataStage()); @@ -108,6 +111,7 @@ public class TestStateTransitionPriority extends BaseStageTest { // Initialize bestpossible state and current state BestPossibleStateOutput bestPossibleStateOutput = new BestPossibleStateOutput(); + MessageOutput messageSelectOutput = new MessageOutput(); CurrentStateOutput currentStateOutput = new CurrentStateOutput(); for (String resource : resourceMap.keySet()) { @@ -121,11 +125,13 @@ public class TestStateTransitionPriority extends BaseStageTest { String nextInstanceName = HOSTNAME_PREFIX + (Integer.parseInt(resource.split("_")[1]) + 1); partitionMap.put(partition.getPartitionName(), Collections.singletonList(nextInstanceName)); bestPossibleStateOutput.setPreferenceLists(resource, partitionMap); - bestPossibleStateOutput.setState(resource, partition, nextInstanceName, "MASTER"); + bestPossibleStateOutput.setState(resource, partition, instanceName, "MASTER"); + bestPossibleStateOutput.setState(resource, partition, nextInstanceName, "SLAVE"); currentStateOutput.setCurrentState(resource, partition, instanceName, "MASTER"); } event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.name(), bestPossibleStateOutput); + event.addAttribute(AttributeName.MESSAGES_SELECTED.name(), messageSelectOutput); event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput); event.addAttribute(AttributeName.ControllerDataProvider.name(), new ResourceControllerDataProvider()); @@ -134,8 +140,10 @@ public 
class TestStateTransitionPriority extends BaseStageTest { // Keep update the current state. List<String> resourcePriority = new ArrayList<String>(); for (int i = 0; i < resourceMap.size(); i++) { + event.addAttribute(AttributeName.MESSAGES_SELECTED.name(), + generateMessageMapForResource(bestPossibleStateOutput, currentStateOutput, resourcePriority)); runStage(event, new IntermediateStateCalcStage()); - updateCurrentStatesForLoadBalance(resourcePriority, currentStateOutput); + updateCurrentStatesForLoadBalance(resourcePriority, currentStateOutput, bestPossibleStateOutput); } Assert.assertEquals(resourcePriority, expectedPriority); @@ -164,7 +172,6 @@ public class TestStateTransitionPriority extends BaseStageTest { Resource resource = new Resource(resourceName); BestPossibleStateOutput bestPossibleStateOutput = new BestPossibleStateOutput(); CurrentStateOutput currentStateOutput = new CurrentStateOutput(); - for (String partitionName : bestPossibleMap.keySet()) { Partition partition = new Partition(partitionName); bestPossibleStateOutput.setPreferenceList(resourceName, partitionName, preferenceList); @@ -173,6 +180,7 @@ public class TestStateTransitionPriority extends BaseStageTest { bestPossibleMap.get(partitionName).get(instanceName)); currentStateOutput.setCurrentState(resourceName, partition, instanceName, currentStateMap.get(partitionName).get(instanceName)); + } resource.addPartition(partitionName); } @@ -183,6 +191,8 @@ public class TestStateTransitionPriority extends BaseStageTest { event.addAttribute(AttributeName.RESOURCES_TO_REBALANCE.name(), Collections.singletonMap(resourceName, resource)); event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.name(), bestPossibleStateOutput); + event.addAttribute(AttributeName.MESSAGES_SELECTED.name(), + generateMessageMapForPartition(bestPossibleMap, currentStateMap, Collections.emptyList(), resourceName)); event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput); 
event.addAttribute(AttributeName.ControllerDataProvider.name(), new ResourceControllerDataProvider()); @@ -191,9 +201,10 @@ public class TestStateTransitionPriority extends BaseStageTest { // Keep update the current state. List<String> partitionPriority = new ArrayList<String>(); for (int i = 0; i < bestPossibleMap.size(); i++) { + event.addAttribute(AttributeName.MESSAGES_SELECTED.name(), + generateMessageMapForPartition(bestPossibleMap, currentStateMap, partitionPriority, resourceName)); runStage(event, new IntermediateStateCalcStage()); - updateCurrentStateForPartitionLevelPriority(partitionPriority, currentStateOutput, - resourceName, bestPossibleMap); + updateCurrentStateForPartitionLevelPriority(partitionPriority, currentStateOutput, resourceName, bestPossibleMap); } Assert.assertEquals(partitionPriority, expectedPriority); @@ -287,25 +298,52 @@ public class TestStateTransitionPriority extends BaseStageTest { } } - private void updateCurrentStatesForLoadBalance(List<String> resourcePriority, - CurrentStateOutput currentStateOutput) { - IntermediateStateOutput output = event.getAttribute(AttributeName.INTERMEDIATE_STATE.name()); - for (PartitionStateMap partitionStateMap : output.getResourceStatesMap().values()) { - String resourceName = partitionStateMap.getResourceName(); + private void updateCurrentStatesForLoadBalance(List<String> resourcePriority, CurrentStateOutput currentStateOutput, + BestPossibleStateOutput bestPossibleStateOutput) { + MessageOutput output = event.getAttribute(AttributeName.MESSAGES_SELECTED.name()); + for (String resourceName : bestPossibleStateOutput.getResourceStatesMap().keySet()) { Partition partition = new Partition(resourceName + "_0"); - String oldInstance = HOSTNAME_PREFIX + resourceName.split("_")[1]; - String expectedInstance = - HOSTNAME_PREFIX + (Integer.parseInt(resourceName.split("_")[1]) + 1); - if (partitionStateMap.getPartitionMap(partition).containsKey(expectedInstance) - && !resourcePriority.contains(resourceName)) 
{ - currentStateOutput.getCurrentStateMap(resourceName, partition).remove(oldInstance); - updateCurrentOutput(resourcePriority, currentStateOutput, resourceName, partition, - expectedInstance, "MASTER"); + if (output.getResourceMessageMap(resourceName).get(partition) != null + && output.getResourceMessageMap(resourceName).get(partition).size() > 0) { + String nextInstanceName = HOSTNAME_PREFIX + (Integer.parseInt(resourceName.split("_")[1]) + 1); + currentStateOutput.setCurrentState(resourceName, partition, nextInstanceName, "SLAVE"); + resourcePriority.add(resourceName); break; } } } + private MessageOutput generateMessageMapForResource(BestPossibleStateOutput bestPossibleStateOutput, + CurrentStateOutput currentStateOutput, List<String> resourcePrirority) { + MessageOutput messageSelectOutput = new MessageOutput(); + for (String resource : bestPossibleStateOutput.getResourceStatesMap().keySet()) { + if (!resourcePrirority.contains(resource) && !bestPossibleStateOutput.getPartitionStateMap(resource) + .getStateMap() + .equals(currentStateOutput.getCurrentStateMap(resource))) { + messageSelectOutput.addMessage(resource, new Partition(resource + "_0"), + generateMessage("OFFLINE", "SLAVE", (HOSTNAME_PREFIX + (Integer.parseInt(resource.split("_")[1]) + 1)))); + } + } + return messageSelectOutput; + } + + private MessageOutput generateMessageMapForPartition(Map<String, Map<String, String>> bestPossibleMap, + Map<String, Map<String, String>> currentStateMap, List<String> partitionPriority, String resourceName) { + MessageOutput messageSelectOutput = new MessageOutput(); + for (String partitionName : bestPossibleMap.keySet()) { + for (String instanceName : bestPossibleMap.get(partitionName).keySet()) { + if (!partitionPriority.contains(partitionName) && !bestPossibleMap.get(partitionName) + .get(instanceName) + .equals(currentStateMap.get(partitionName).get(instanceName))) { + messageSelectOutput.addMessage(resourceName, new Partition(partitionName), + 
generateMessage(currentStateMap.get(partitionName).get(instanceName), + bestPossibleMap.get(partitionName).get(instanceName), instanceName)); + } + } + } + return messageSelectOutput; + } + private void updateCurrentOutput(List<String> resourcePriority, CurrentStateOutput currentStateOutput, String resourceName, Partition partition, String instanceName, String state) { @@ -317,19 +355,14 @@ public class TestStateTransitionPriority extends BaseStageTest { private void updateCurrentStateForPartitionLevelPriority(List<String> partitionPriority, CurrentStateOutput currentStateOutput, String resourceName, Map<String, Map<String, String>> bestPossibleMap) { - IntermediateStateOutput output = event.getAttribute(AttributeName.INTERMEDIATE_STATE.name()); - PartitionStateMap partitionStateMap = output.getPartitionStateMap(resourceName); - for (Partition partition : partitionStateMap.getStateMap().keySet()) { - Map<String, String> instanceStateMap = bestPossibleMap.get(partition.getPartitionName()); - if (partitionStateMap.getPartitionMap(partition).equals(instanceStateMap) - && !partitionPriority.contains(partition.getPartitionName())) { - partitionPriority.add(partition.getPartitionName()); - for (String instanceName : instanceStateMap.keySet()) { - currentStateOutput.setCurrentState(resourceName, partition, instanceName, - instanceStateMap.get(instanceName)); - } - break; + MessageOutput output = event.getAttribute(AttributeName.MESSAGES_SELECTED.name()); + output.getResourceMessageMap(resourceName).entrySet().stream().filter(e -> e.getValue().size() > 0).forEach(e -> { + partitionPriority.add(e.getKey().toString()); + + for (String instanceName : bestPossibleMap.get(e.getKey().toString()).keySet()) { + currentStateOutput.setCurrentState(resourceName, e.getKey(), instanceName, + bestPossibleMap.get(e.getKey().toString()).get(instanceName)); } - } + }); } } diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestNoThrottleDisabledPartitions.java 
b/helix-core/src/test/java/org/apache/helix/integration/TestNoThrottleDisabledPartitions.java index df6bc02..98dd281 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/TestNoThrottleDisabledPartitions.java +++ b/helix-core/src/test/java/org/apache/helix/integration/TestNoThrottleDisabledPartitions.java @@ -69,9 +69,9 @@ public class TestNoThrottleDisabledPartitions extends ZkTestBase { setupEnvironment(participantCount); // Set the throttling only for load balance - setThrottleConfigForLoadBalance(); + setThrottleConfigForLoadBalance(1); - // Disable instance 0 so that it will cause a partition to do a recovery balance + // Disable instance 0 so that it will cause a partition to do a load balance PropertyKey key = _accessor.keyBuilder().instanceConfig(_participants[0].getInstanceName()); InstanceConfig instanceConfig = _accessor.getProperty(key); instanceConfig.setInstanceEnabled(false); @@ -261,7 +261,7 @@ public class TestNoThrottleDisabledPartitions extends ZkTestBase { public void testNoThrottleOnDisabledPartition() throws Exception { int participantCount = 3; setupEnvironment(participantCount); - setThrottleConfig(); + setThrottleConfig(3); // Convert partition to replica mapping should be 1 -> 3 // Disable a partition so that it will not be subject to throttling String partitionName = _resourceName + "0_0"; @@ -292,8 +292,9 @@ public class TestNoThrottleDisabledPartitions extends ZkTestBase { controller.syncStart(); Thread.sleep(500L); + // The throttle quota (3 replicas) is entirely consumed by the first partition, so each participant receives exactly one message for (MockParticipantManager participantManager : _participants) { - Assert.assertTrue(verifyTwoMessages(participantManager)); + Assert.assertTrue(verifySingleMessage(participantManager)); } // clean up the cluster @@ -339,11 +340,14 @@ public class TestNoThrottleDisabledPartitions extends ZkTestBase { controller.syncStop(); } + private void setThrottleConfig() { + setThrottleConfig(1); + } /** * Set all throttle configs at 1 so that we could
test by observing the number of ongoing * transitions. */ - private void setThrottleConfig() { + private void setThrottleConfig(int maxReplicas) { PropertyKey.Builder keyBuilder = _accessor.keyBuilder(); ClusterConfig clusterConfig = _accessor.getProperty(_accessor.keyBuilder().clusterConfig()); @@ -353,27 +357,31 @@ public class TestNoThrottleDisabledPartitions extends ZkTestBase { // Add throttling at cluster-level throttleConfigs.add(new StateTransitionThrottleConfig( StateTransitionThrottleConfig.RebalanceType.RECOVERY_BALANCE, - StateTransitionThrottleConfig.ThrottleScope.CLUSTER, 1)); + StateTransitionThrottleConfig.ThrottleScope.CLUSTER, maxReplicas)); throttleConfigs.add( new StateTransitionThrottleConfig(StateTransitionThrottleConfig.RebalanceType.LOAD_BALANCE, - StateTransitionThrottleConfig.ThrottleScope.CLUSTER, 1)); + StateTransitionThrottleConfig.ThrottleScope.CLUSTER, maxReplicas)); throttleConfigs .add(new StateTransitionThrottleConfig(StateTransitionThrottleConfig.RebalanceType.ANY, - StateTransitionThrottleConfig.ThrottleScope.CLUSTER, 1)); + StateTransitionThrottleConfig.ThrottleScope.CLUSTER, maxReplicas)); // Add throttling at instance level throttleConfigs .add(new StateTransitionThrottleConfig(StateTransitionThrottleConfig.RebalanceType.ANY, - StateTransitionThrottleConfig.ThrottleScope.INSTANCE, 1)); + StateTransitionThrottleConfig.ThrottleScope.INSTANCE, maxReplicas)); clusterConfig.setStateTransitionThrottleConfigs(throttleConfigs); _accessor.setProperty(keyBuilder.clusterConfig(), clusterConfig); } + private void setThrottleConfigForLoadBalance() { + setThrottleConfigForLoadBalance(0); + } + /** * Set throttle limits only for load balance so that none of them would happen. 
*/ - private void setThrottleConfigForLoadBalance() { + private void setThrottleConfigForLoadBalance(int maxReplicas) { PropertyKey.Builder keyBuilder = _accessor.keyBuilder(); ClusterConfig clusterConfig = _accessor.getProperty(_accessor.keyBuilder().clusterConfig()); @@ -383,12 +391,12 @@ public class TestNoThrottleDisabledPartitions extends ZkTestBase { // Add throttling at cluster-level throttleConfigs.add( new StateTransitionThrottleConfig(StateTransitionThrottleConfig.RebalanceType.LOAD_BALANCE, - StateTransitionThrottleConfig.ThrottleScope.CLUSTER, 0)); + StateTransitionThrottleConfig.ThrottleScope.CLUSTER, maxReplicas)); // Add throttling at instance level throttleConfigs.add( new StateTransitionThrottleConfig(StateTransitionThrottleConfig.RebalanceType.LOAD_BALANCE, - StateTransitionThrottleConfig.ThrottleScope.INSTANCE, 0)); + StateTransitionThrottleConfig.ThrottleScope.INSTANCE, maxReplicas)); clusterConfig.setStateTransitionThrottleConfigs(throttleConfigs); _accessor.setProperty(keyBuilder.clusterConfig(), clusterConfig); @@ -481,15 +489,15 @@ public class TestNoThrottleDisabledPartitions extends ZkTestBase { } /** - * Ensure that there are 2 messages for a given Participant. + * Ensure that there is exactly 1 message for a given Participant.
* @param participant * @return */ - private boolean verifyTwoMessages(final MockParticipantManager participant) { + private boolean verifySingleMessage(final MockParticipantManager participant) { PropertyKey key = _accessor.keyBuilder().messages(participant.getInstanceName()); List<String> messageNames = _accessor.getChildNames(key); if (messageNames != null) { - return messageNames.size() == 2; + return messageNames.size() == 1; } return false; } diff --git a/helix-core/src/test/java/org/apache/helix/integration/controller/TestRedundantDroppedMessage.java b/helix-core/src/test/java/org/apache/helix/integration/controller/TestRedundantDroppedMessage.java index 3bd9e45..9539b98 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/controller/TestRedundantDroppedMessage.java +++ b/helix-core/src/test/java/org/apache/helix/integration/controller/TestRedundantDroppedMessage.java @@ -31,6 +31,7 @@ import org.apache.helix.controller.stages.ClusterEventType; import org.apache.helix.controller.stages.CurrentStateComputationStage; import org.apache.helix.controller.stages.IntermediateStateCalcStage; import org.apache.helix.controller.stages.MessageOutput; +import org.apache.helix.controller.stages.MessageSelectionStage; import org.apache.helix.controller.stages.ResourceComputationStage; import org.apache.helix.controller.stages.resource.ResourceMessageGenerationPhase; import org.apache.helix.model.IdealState; @@ -75,11 +76,13 @@ public class TestRedundantDroppedMessage extends TaskSynchronizedTestBase { runStage(event, new ResourceComputationStage()); runStage(event, new CurrentStateComputationStage()); runStage(event, new BestPossibleStateCalcStage()); - runStage(event, new IntermediateStateCalcStage()); Assert.assertEquals(cache.getCachedIdealMapping().size(), 1); runStage(event, new ResourceMessageGenerationPhase()); + runStage(event, new MessageSelectionStage()); + runStage(event, new IntermediateStateCalcStage()); + - MessageOutput messageOutput = 
event.getAttribute(AttributeName.MESSAGES_ALL.name()); + MessageOutput messageOutput = event.getAttribute(AttributeName.MESSAGES_SELECTED.name()); Assert .assertEquals(messageOutput.getMessages(resourceName, new Partition(partitionName)).size(), 1); diff --git a/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessages.java b/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessages.java index 4f5f60c..1f1d7bd 100644 --- a/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessages.java +++ b/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessages.java @@ -100,9 +100,9 @@ public class TestP2PMessages extends BaseStageTest { _fullPipeline.addStage(new ResourceComputationStage()); _fullPipeline.addStage(new CurrentStateComputationStage()); _fullPipeline.addStage(new BestPossibleStateCalcStage()); - _fullPipeline.addStage(new IntermediateStateCalcStage()); _fullPipeline.addStage(new ResourceMessageGenerationPhase()); _fullPipeline.addStage(new MessageSelectionStage()); + _fullPipeline.addStage(new IntermediateStateCalcStage()); _fullPipeline.addStage(new MessageThrottleStage()); _fullPipeline.addStage(new ResourceMessageDispatchStage()); diff --git a/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessagesAvoidDuplicatedMessage.java b/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessagesAvoidDuplicatedMessage.java index 56e1f5a..1d164c5 100644 --- a/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessagesAvoidDuplicatedMessage.java +++ b/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessagesAvoidDuplicatedMessage.java @@ -87,14 +87,15 @@ public class TestP2PMessagesAvoidDuplicatedMessage extends BaseStageTest { _fullPipeline = new Pipeline("FullPipeline"); _fullPipeline.addStage(new ReadClusterDataStage()); _fullPipeline.addStage(new BestPossibleStateCalcStage()); - _fullPipeline.addStage(new 
IntermediateStateCalcStage()); _fullPipeline.addStage(new ResourceMessageGenerationPhase()); _fullPipeline.addStage(new MessageSelectionStage()); + _fullPipeline.addStage(new IntermediateStateCalcStage()); _fullPipeline.addStage(new MessageThrottleStage()); _messagePipeline = new Pipeline("MessagePipeline"); _messagePipeline.addStage(new ResourceMessageGenerationPhase()); _messagePipeline.addStage(new MessageSelectionStage()); + _messagePipeline.addStage(new IntermediateStateCalcStage()); _messagePipeline.addStage(new MessageThrottleStage()); @@ -126,7 +127,7 @@ public class TestP2PMessagesAvoidDuplicatedMessage extends BaseStageTest { _fullPipeline.handle(event); - _bestpossibleState = event.getAttribute(AttributeName.INTERMEDIATE_STATE.name()); + _bestpossibleState = event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.name()); MessageOutput messageOutput = event.getAttribute(AttributeName.MESSAGES_SELECTED.name()); @@ -217,7 +218,7 @@ public class TestP2PMessagesAvoidDuplicatedMessage extends BaseStageTest { event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput); - event.addAttribute(AttributeName.INTERMEDIATE_STATE.name(), _bestpossibleState); + event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.name(), _bestpossibleState); _messagePipeline.handle(event); @@ -273,7 +274,7 @@ public class TestP2PMessagesAvoidDuplicatedMessage extends BaseStageTest { instanceStateMap.put(thirdMaster, "MASTER"); _bestpossibleState.setState(_db, _partition, instanceStateMap); - event.addAttribute(AttributeName.INTERMEDIATE_STATE.name(), _bestpossibleState); + event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.name(), _bestpossibleState); _messagePipeline.handle(event); @@ -290,7 +291,7 @@ public class TestP2PMessagesAvoidDuplicatedMessage extends BaseStageTest { currentStateOutput.setPendingMessage(_db, _partition, secondMaster, relayMessage); event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput); - 
event.addAttribute(AttributeName.INTERMEDIATE_STATE.name(), _bestpossibleState); + event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.name(), _bestpossibleState); _messagePipeline.handle(event); diff --git a/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PStateTransitionMessages.java b/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PStateTransitionMessages.java index bf53682..cec0d2b 100644 --- a/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PStateTransitionMessages.java +++ b/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PStateTransitionMessages.java @@ -199,9 +199,9 @@ public class TestP2PStateTransitionMessages extends BaseStageTest { event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.name(), bestPossibleStateOutput); pipeline = new Pipeline("test"); - pipeline.addStage(new IntermediateStateCalcStage()); pipeline.addStage(new ResourceMessageGenerationPhase()); pipeline.addStage(new MessageSelectionStage()); + pipeline.addStage(new IntermediateStateCalcStage()); pipeline.addStage(new MessageThrottleStage()); pipeline.handle(event); @@ -223,9 +223,9 @@ public class TestP2PStateTransitionMessages extends BaseStageTest { pipeline = new Pipeline("test"); - pipeline.addStage(new IntermediateStateCalcStage()); pipeline.addStage(new ResourceMessageGenerationPhase()); pipeline.addStage(new MessageSelectionStage()); + pipeline.addStage(new IntermediateStateCalcStage()); pipeline.addStage(new MessageThrottleStage()); pipeline.handle(event); @@ -359,9 +359,9 @@ public class TestP2PStateTransitionMessages extends BaseStageTest { Pipeline pipeline = new Pipeline("test"); pipeline.addStage(new ReadClusterDataStage()); pipeline.addStage(new BestPossibleStateCalcStage()); - pipeline.addStage(new IntermediateStateCalcStage()); pipeline.addStage(new ResourceMessageGenerationPhase()); pipeline.addStage(new MessageSelectionStage()); + pipeline.addStage(new IntermediateStateCalcStage()); 
pipeline.addStage(new MessageThrottleStage()); return pipeline; diff --git a/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PWithStateCancellationMessage.java b/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PWithStateCancellationMessage.java index a8191cb..f685bf4 100644 --- a/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PWithStateCancellationMessage.java +++ b/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PWithStateCancellationMessage.java @@ -31,6 +31,7 @@ import org.apache.helix.HelixManager; import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider; import org.apache.helix.controller.stages.AttributeName; import org.apache.helix.controller.stages.BaseStageTest; +import org.apache.helix.controller.stages.BestPossibleStateOutput; import org.apache.helix.controller.stages.ClusterEvent; import org.apache.helix.controller.stages.ClusterEventType; import org.apache.helix.controller.stages.CurrentStateOutput; @@ -166,11 +167,11 @@ public class TestP2PWithStateCancellationMessage extends BaseStageTest { currentStateOutput.setCurrentState(RESOURCE_NAME, new Partition("1"), "localhost_2", "MASTER"); event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput); - IntermediateStateOutput intermediateStateOutput = new IntermediateStateOutput(); - intermediateStateOutput.setState(RESOURCE_NAME, new Partition("0"), "localhost_1", "SLAVE"); - intermediateStateOutput.setState(RESOURCE_NAME, new Partition("0"), "localhost_2", "MASTER"); - intermediateStateOutput.setState(RESOURCE_NAME, new Partition("1"), "localhost_2", "MASTER"); - event.addAttribute(AttributeName.INTERMEDIATE_STATE.name(), intermediateStateOutput); + BestPossibleStateOutput bestPossibleStateOutput = new BestPossibleStateOutput(); + bestPossibleStateOutput.setState(RESOURCE_NAME, new Partition("0"), "localhost_1", "SLAVE"); + bestPossibleStateOutput.setState(RESOURCE_NAME, new 
Partition("0"), "localhost_2", "MASTER"); + bestPossibleStateOutput.setState(RESOURCE_NAME, new Partition("1"), "localhost_2", "MASTER"); + event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.name(), bestPossibleStateOutput); return event; } diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestRebalancerMetrics.java b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestRebalancerMetrics.java index 43bc8b1..ccc1431 100644 --- a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestRebalancerMetrics.java +++ b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestRebalancerMetrics.java @@ -13,7 +13,9 @@ import org.apache.helix.controller.stages.BestPossibleStateCalcStage; import org.apache.helix.controller.stages.BestPossibleStateOutput; import org.apache.helix.controller.stages.CurrentStateOutput; import org.apache.helix.controller.stages.IntermediateStateCalcStage; +import org.apache.helix.controller.stages.MessageSelectionStage; import org.apache.helix.controller.stages.ReadClusterDataStage; +import org.apache.helix.controller.stages.resource.ResourceMessageGenerationPhase; import org.apache.helix.model.BuiltInStateModelDefinitions; import org.apache.helix.model.ClusterConfig; import org.apache.helix.model.IdealState; @@ -77,14 +79,17 @@ public class TestRebalancerMetrics extends BaseStageTest { setupThrottleConfig(cache.getClusterConfig(), StateTransitionThrottleConfig.RebalanceType.RECOVERY_BALANCE, maxPending); runStage(event, new BestPossibleStateCalcStage()); + runStage(event, new ResourceMessageGenerationPhase()); + runStage(event, new MessageSelectionStage()); runStage(event, new IntermediateStateCalcStage()); ClusterStatusMonitor clusterStatusMonitor = event.getAttribute(AttributeName.clusterStatusMonitor.name()); ResourceMonitor resourceMonitor = clusterStatusMonitor.getResourceMonitor(resource); - Assert.assertEquals(resourceMonitor.getPendingRecoveryRebalancePartitionGauge(), numPartition); - 
Assert.assertEquals(resourceMonitor.getRecoveryRebalanceThrottledPartitionGauge(), - numPartition - maxPending); + Assert.assertEquals(resourceMonitor.getNumPendingRecoveryRebalanceReplicas(), + numPartition * numReplica - resourceMonitor.getNumPendingLoadRebalanceReplicas()); + Assert.assertEquals(resourceMonitor.getNumRecoveryRebalanceThrottledReplicas(), + numPartition * numReplica - resourceMonitor.getNumPendingLoadRebalanceReplicas() - maxPending); System.out .println("END testRecoveryRebalanceMetrics at " + new Date(System.currentTimeMillis())); @@ -135,15 +140,16 @@ public class TestRebalancerMetrics extends BaseStageTest { setupThrottleConfig(cache.getClusterConfig(), StateTransitionThrottleConfig.RebalanceType.LOAD_BALANCE, maxPending); runStage(event, new BestPossibleStateCalcStage()); + runStage(event, new ResourceMessageGenerationPhase()); + runStage(event, new MessageSelectionStage()); runStage(event, new IntermediateStateCalcStage()); ClusterStatusMonitor clusterStatusMonitor = event.getAttribute(AttributeName.clusterStatusMonitor.name()); ResourceMonitor resourceMonitor = clusterStatusMonitor.getResourceMonitor(resource); - long numPendingLoadBalance = resourceMonitor.getPendingLoadRebalancePartitionGauge(); + long numPendingLoadBalance = resourceMonitor.getNumPendingLoadRebalanceReplicas(); Assert.assertTrue(numPendingLoadBalance > 0); - Assert.assertEquals(resourceMonitor.getLoadRebalanceThrottledPartitionGauge(), - numPendingLoadBalance - maxPending); + Assert.assertEquals(resourceMonitor.getNumLoadRebalanceThrottledReplicas(), numPendingLoadBalance - maxPending); System.out .println("END testLoadBalanceMetrics at " + new Date(System.currentTimeMillis())); diff --git a/helix-core/src/test/resources/TestPartitionLevelPriority.json b/helix-core/src/test/resources/TestPartitionLevelPriority.json index 125277d..5e42d2d 100644 --- a/helix-core/src/test/resources/TestPartitionLevelPriority.json +++ 
b/helix-core/src/test/resources/TestPartitionLevelPriority.json @@ -143,8 +143,8 @@ "Partition_0", "Partition_3", "Partition_2", - "Partition_1", "Partition_4", + "Partition_1", "Partition_5" ] } diff --git a/helix-core/src/test/resources/TestRecoveryLoadBalance.MasterSlave.json b/helix-core/src/test/resources/TestRecoveryLoadBalance.MasterSlave.json deleted file mode 100644 index 077f129..0000000 --- a/helix-core/src/test/resources/TestRecoveryLoadBalance.MasterSlave.json +++ /dev/null @@ -1,367 +0,0 @@ -[ - { - "statemodel": "MasterSlave", - "errorOrRecoveryPartitionThresholdForLoadBalance": 0, - "inputs": { - "resource_0_0": { - "currentStates": { - "localhost_0": "MASTER", - "localhost_1": "SLAVE", - "localhost_2": "SLAVE", - "localhost_3": "SLAVE" - }, - "bestPossibleStates": { - "localhost_0": "MASTER", - "localhost_1": "SLAVE", - "localhost_2": "SLAVE", - "localhost_3": "DROPPED" - }, - "expectedStates": { - "localhost_0": "MASTER", - "localhost_1": "SLAVE", - "localhost_2": "SLAVE", - "localhost_3": "DROPPED" - } - }, - "resource_0_1": { - "currentStates": { - "localhost_0": "MASTER", - "localhost_1": "SLAVE", - "localhost_2": "SLAVE" - }, - "bestPossibleStates": { - "localhost_0": "MASTER", - "localhost_1": "SLAVE", - "localhost_2": "SLAVE" - }, - "expectedStates": { - "localhost_0": "MASTER", - "localhost_1": "SLAVE", - "localhost_2": "SLAVE" - } - } - } - }, - { - "statemodel": "MasterSlave", - "errorOrRecoveryPartitionThresholdForLoadBalance": 1, - "inputs": { - "resource_0_0": { - "currentStates": { - "localhost_0": "MASTER", - "localhost_1": "ERROR", - "localhost_2": "SLAVE", - "localhost_3": "SLAVE" - }, - "bestPossibleStates": { - "localhost_0": "MASTER", - "localhost_1": "ERROR", - "localhost_2": "SLAVE", - "localhost_3": "DROPPED" - }, - "expectedStates": { - "localhost_0": "MASTER", - "localhost_1": "ERROR", - "localhost_2": "SLAVE", - "localhost_3": "DROPPED" - } - }, - "resource_0_1": { - "currentStates": { - "localhost_0": "MASTER", - 
"localhost_1": "SLAVE", - "localhost_2": "SLAVE" - }, - "bestPossibleStates": { - "localhost_0": "MASTER", - "localhost_1": "SLAVE", - "localhost_2": "SLAVE" - }, - "expectedStates": { - "localhost_0": "MASTER", - "localhost_1": "SLAVE", - "localhost_2": "SLAVE" - } - } - } - }, - { - "statemodel": "MasterSlave", - "errorOrRecoveryPartitionThresholdForLoadBalance": 0, - "inputs": { - "resource_0_0": { - "currentStates": { - "localhost_0": "MASTER", - "localhost_1": "ERROR", - "localhost_2": "SLAVE", - "localhost_3": "SLAVE" - }, - "bestPossibleStates": { - "localhost_0": "MASTER", - "localhost_1": "ERROR", - "localhost_2": "SLAVE", - "localhost_3": "DROPPED" - }, - "expectedStates": { - "localhost_0": "MASTER", - "localhost_1": "ERROR", - "localhost_2": "SLAVE", - "localhost_3": "DROPPED" - } - }, - "resource_0_1": { - "currentStates": { - "localhost_0": "MASTER", - "localhost_1": "SLAVE", - "localhost_2": "SLAVE" - }, - "bestPossibleStates": { - "localhost_0": "MASTER", - "localhost_1": "SLAVE", - "localhost_2": "SLAVE" - }, - "expectedStates": { - "localhost_0": "MASTER", - "localhost_1": "SLAVE", - "localhost_2": "SLAVE" - } - } - } - }, - { - "statemodel": "MasterSlave", - "errorOrRecoveryPartitionThresholdForLoadBalance": 0, - "inputs": { - "resource_0_0": { - "currentStates": { - "localhost_0": "MASTER", - "localhost_1": "OFFLINE", - "localhost_2": "OFFLINE", - "localhost_3": "SLAVE" - }, - "bestPossibleStates": { - "localhost_0": "MASTER", - "localhost_1": "SLAVE", - "localhost_2": "SLAVE", - "localhost_3": "DROPPED" - }, - "expectedStates": { - "localhost_0": "MASTER", - "localhost_1": "SLAVE", - "localhost_2": "SLAVE", - "localhost_3": "DROPPED" - } - }, - "resource_0_1": { - "currentStates": { - "localhost_0": "MASTER", - "localhost_1": "SLAVE", - "localhost_2": "SLAVE" - }, - "bestPossibleStates": { - "localhost_0": "MASTER", - "localhost_1": "SLAVE", - "localhost_2": "SLAVE" - }, - "expectedStates": { - "localhost_0": "MASTER", - "localhost_1": "SLAVE", 
- "localhost_2": "SLAVE" - } - } - } - }, - { - "statemodel": "MasterSlave", - "errorOrRecoveryPartitionThresholdForLoadBalance": 0, - "inputs": { - "resource_0_0": { - "currentStates": { - "localhost_0": "SLAVE", - "localhost_1": "SLAVE", - "localhost_2": "SLAVE" - }, - "bestPossibleStates": { - "localhost_0": "MASTER", - "localhost_1": "SLAVE", - "localhost_2": "SLAVE" - }, - "expectedStates": { - "localhost_0": "MASTER", - "localhost_1": "SLAVE", - "localhost_2": "SLAVE" - } - }, - "resource_0_1": { - "currentStates": { - "localhost_0": "MASTER", - "localhost_1": "SLAVE", - "localhost_2": "SLAVE" - }, - "bestPossibleStates": { - "localhost_0": "MASTER", - "localhost_1": "SLAVE", - "localhost_2": "SLAVE" - }, - "expectedStates": { - "localhost_0": "MASTER", - "localhost_1": "SLAVE", - "localhost_2": "SLAVE" - } - } - } - }, - { - "statemodel": "MasterSlave", - "errorOrRecoveryPartitionThresholdForLoadBalance": 0, - "inputs": { - "resource_0_0": { - "currentStates": { - "localhost_0": "SLAVE", - "localhost_1": "SLAVE", - "localhost_2": "SLAVE" - }, - "bestPossibleStates": { - "localhost_0": "MASTER", - "localhost_1": "SLAVE", - "localhost_2": "SLAVE" - }, - "expectedStates": { - "localhost_0": "MASTER", - "localhost_1": "SLAVE", - "localhost_2": "SLAVE" - } - }, - "resource_0_1": { - "currentStates": { - "localhost_0": "MASTER", - "localhost_1": "SLAVE", - "localhost_2": "SLAVE", - "localhost_3": "OFFLINE" - }, - "bestPossibleStates": { - "localhost_0": "MASTER", - "localhost_1": "SLAVE", - "localhost_2": "SLAVE", - "localhost_3": "SLAVE" - }, - "expectedStates": { - "localhost_0": "MASTER", - "localhost_1": "SLAVE", - "localhost_2": "SLAVE", - "localhost_3": "OFFLINE" - } - } - } - }, - { - "statemodel": "MasterSlave", - "errorOrRecoveryPartitionThresholdForLoadBalance": 0, - "loadBalanceThrottle": "0", - "minActiveReplica": "2", - "inputs": { - "resource_0_0": { - "currentStates": { - "localhost_0": "MASTER", - "localhost_1": "SLAVE", - "localhost_2": "OFFLINE", 
- "localhost_3": "OFFLINE" - }, - "bestPossibleStates": { - "localhost_0": "MASTER", - "localhost_1": "SLAVE", - "localhost_2": "SLAVE", - "localhost_3": "DROPPED" - }, - "expectedStates": { - "localhost_0": "MASTER", - "localhost_1": "SLAVE", - "localhost_2": "OFFLINE", - "localhost_3": "OFFLINE" - } - }, - "resource_0_1": { - "currentStates": { - "localhost_0": "SLAVE", - "localhost_1": "SLAVE", - "localhost_2": "OFFLINE" - }, - "bestPossibleStates": { - "localhost_0": "MASTER", - "localhost_1": "SLAVE", - "localhost_2": "SLAVE" - }, - "expectedStates": { - "localhost_0": "MASTER", - "localhost_1": "SLAVE", - "localhost_2": "SLAVE" - } - } - } - }, - { - "statemodel": "MasterSlave", - "errorOrRecoveryPartitionThresholdForLoadBalance": 0, - "loadBalanceThrottle": "2", - "minActiveReplica": "2", - "inputs": { - "resource_0_0": { - "currentStates": { - "localhost_0": "MASTER", - "localhost_1": "SLAVE", - "localhost_2": "SLAVE", - "localhost_3": "OFFLINE" - }, - "bestPossibleStates": { - "localhost_0": "MASTER", - "localhost_1": "SLAVE", - "localhost_2": "OFFLINE", - "localhost_3": "DROPPED" - }, - "expectedStates": { - "localhost_0": "MASTER", - "localhost_1": "SLAVE", - "localhost_2": "OFFLINE", - "localhost_3": "DROPPED" - } - }, - "resource_0_1": { - "currentStates": { - "localhost_0": "MASTER", - "localhost_1": "SLAVE", - "localhost_2": "OFFLINE" - }, - "bestPossibleStates": { - "localhost_0": "MASTER", - "localhost_1": "SLAVE", - "localhost_2": "SLAVE" - }, - "expectedStates": { - "localhost_0": "MASTER", - "localhost_1": "SLAVE", - "localhost_2": "SLAVE" - } - }, - "resource_0_2": { - "currentStates": { - "localhost_0": "MASTER", - "localhost_1": "SLAVE", - "localhost_2": "OFFLINE", - "localhost_3": "OFFLINE" - }, - "bestPossibleStates": { - "localhost_0": "MASTER", - "localhost_1": "SLAVE", - "localhost_2": "SLAVE", - "localhost_3": "SLAVE" - }, - "expectedStates": { - "localhost_0": "MASTER", - "localhost_1": "SLAVE", - "localhost_2": "OFFLINE", - 
"localhost_3": "OFFLINE" - } - } - } - } -] diff --git a/helix-core/src/test/resources/TestRecoveryLoadBalance.OnlineOffline.json b/helix-core/src/test/resources/TestRecoveryLoadBalance.OnlineOffline.json deleted file mode 100644 index 389d90e..0000000 --- a/helix-core/src/test/resources/TestRecoveryLoadBalance.OnlineOffline.json +++ /dev/null @@ -1,206 +0,0 @@ -[ - { - "statemodel": "OnlineOffline", - "errorOrRecoveryPartitionThresholdForLoadBalance": 0, - "inputs": { - "resource_0_0": { - "currentStates": { - "localhost_0": "OFFLINE", - "localhost_1": "OFFLINE", - "localhost_2": "OFFLINE" - }, - "bestPossibleStates": { - "localhost_0": "ONLINE", - "localhost_1": "ONLINE", - "localhost_2": "ONLINE" - }, - "expectedStates": { - "localhost_0": "ONLINE", - "localhost_1": "ONLINE", - "localhost_2": "ONLINE" - } - }, - "resource_0_1": { - "currentStates": { - "localhost_0": "ONLINE", - "localhost_1": "ONLINE", - "localhost_2": "ONLINE" - }, - "bestPossibleStates": { - "localhost_0": "ONLINE", - "localhost_1": "ONLINE", - "localhost_2": "ONLINE", - "localhost_3": "ONLINE" - }, - "expectedStates": { - "localhost_0": "ONLINE", - "localhost_1": "ONLINE", - "localhost_2": "ONLINE" - } - } - } - }, - { - "statemodel": "OnlineOffline", - "errorOrRecoveryPartitionThresholdForLoadBalance": 0, - "inputs": { - "resource_0_0": { - "currentStates": { - "localhost_0": "OFFLINE", - "localhost_1": "OFFLINE", - "localhost_2": "OFFLINE" - }, - "bestPossibleStates": { - "localhost_0": "ONLINE", - "localhost_1": "ONLINE", - "localhost_2": "ONLINE" - }, - "expectedStates": { - "localhost_0": "ONLINE", - "localhost_1": "ONLINE", - "localhost_2": "ONLINE" - } - }, - "resource_0_1": { - "currentStates": { - "localhost_0": "ONLINE", - "localhost_1": "ONLINE", - "localhost_2": "ONLINE", - "localhost_3": "ONLINE" - }, - "bestPossibleStates": { - "localhost_0": "ONLINE", - "localhost_1": "ONLINE", - "localhost_2": "ONLINE", - "localhost_3": "DROPPED" - }, - "expectedStates": { - "localhost_0": 
"ONLINE", - "localhost_1": "ONLINE", - "localhost_2": "ONLINE", - "localhost_3": "DROPPED" - } - } - } - }, - { - "statemodel": "OnlineOffline", - "errorOrRecoveryPartitionThresholdForLoadBalance": 100, - "inputs": { - "resource_0_0": { - "currentStates": { - "localhost_0": "OFFLINE", - "localhost_1": "OFFLINE", - "localhost_2": "OFFLINE" - }, - "bestPossibleStates": { - "localhost_0": "ONLINE", - "localhost_1": "ONLINE", - "localhost_2": "ONLINE" - }, - "expectedStates": { - "localhost_0": "ONLINE", - "localhost_1": "ONLINE", - "localhost_2": "ONLINE" - } - }, - "resource_0_1": { - "currentStates": { - "localhost_0": "ONLINE", - "localhost_1": "ONLINE", - "localhost_2": "ONLINE" - }, - "bestPossibleStates": { - "localhost_0": "ONLINE", - "localhost_1": "ONLINE", - "localhost_2": "ONLINE", - "localhost_3": "ONLINE" - }, - "expectedStates": { - "localhost_0": "ONLINE", - "localhost_1": "ONLINE", - "localhost_2": "ONLINE", - "localhost_3": "ONLINE" - } - } - } - }, - { - "statemodel": "OnlineOffline", - "errorOrRecoveryPartitionThresholdForLoadBalance": 0, - "inputs": { - "resource_0_0": { - "currentStates": { - "localhost_0": "OFFLINE", - "localhost_1": "OFFLINE", - "localhost_2": "OFFLINE" - }, - "bestPossibleStates": { - "localhost_0": "ONLINE", - "localhost_1": "ONLINE", - "localhost_2": "ONLINE" - }, - "expectedStates": { - "localhost_0": "ONLINE", - "localhost_1": "ONLINE", - "localhost_2": "ONLINE" - } - }, - "resource_0_1": { - "currentStates": { - "localhost_0": "ONLINE", - "localhost_1": "OFFLINE", - "localhost_2": "ONLINE" - }, - "bestPossibleStates": { - "localhost_0": "DROPPED", - "localhost_1": "ONLINE", - "localhost_2": "ONLINE" - }, - "expectedStates": { - "localhost_0": "ONLINE", - "localhost_1": "OFFLINE", - "localhost_2": "ONLINE" - } - } - } - }, - { - "statemodel": "OnlineOffline", - "errorOrRecoveryPartitionThresholdForLoadBalance": 0, - "inputs": { - "resource_0_0": { - "currentStates": { - "localhost_0": "ONLINE", - "localhost_1": "ONLINE" - 
}, - "bestPossibleStates": { - "localhost_0": "DROPPED", - "localhost_1": "ONLINE", - "localhost_2": "OFFLINE" - }, - "expectedStates": { - "localhost_0": "ONLINE", - "localhost_1": "ONLINE" - } - }, - "resource_0_1": { - "currentStates": { - "localhost_0": "OFFLINE", - "localhost_1": "OFFLINE", - "localhost_2": "OFFLINE" - }, - "bestPossibleStates": { - "localhost_0": "ONLINE", - "localhost_1": "ONLINE", - "localhost_2": "ONLINE" - }, - "expectedStates": { - "localhost_0": "ONLINE", - "localhost_1": "ONLINE", - "localhost_2": "ONLINE" - } - } - } - } -] \ No newline at end of file
