This is an automated email from the ASF dual-hosted git repository.
jxue pushed a commit to branch replica_level_throttle
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/replica_level_throttle by this
push:
new 29bd86f [Replica Level Throttle] Make Pipeline in a correct order and
fixes tests (#1750)
29bd86f is described below
commit 29bd86fe867ba05ae19b81a2afa69478d12d2e48
Author: Junkai Xue <[email protected]>
AuthorDate: Tue May 25 10:19:08 2021 -0700
[Replica Level Throttle] Make Pipeline in a correct order and fixes tests
(#1750)
* Make Pipeline in a correct order and fixes tests
1. Run the pipeline stages in the correct order for the computation.
2. Handle the "DROPPED" case in the intermediate state mapping to reflect actual behavior.
3. Fix test cases.
---
.../helix/controller/GenericHelixController.java | 4 +-
.../stages/IntermediateStateCalcStage.java | 36 +-
.../resource/ResourceMessageGenerationPhase.java | 7 +-
.../helix/monitoring/mbeans/ResourceMonitor.java | 58 ++--
.../helix/controller/stages/BaseStageTest.java | 9 +
.../stages/TestIntermediateStateCalcStage.java | 60 ++--
.../controller/stages/TestRebalancePipeline.java | 14 +-
.../controller/stages/TestRecoveryLoadBalance.java | 225 -------------
.../stages/TestStateTransitionPriority.java | 95 ++++--
.../TestNoThrottleDisabledPartitions.java | 38 ++-
.../controller/TestRedundantDroppedMessage.java | 7 +-
.../messaging/p2pMessage/TestP2PMessages.java | 2 +-
.../TestP2PMessagesAvoidDuplicatedMessage.java | 11 +-
.../p2pMessage/TestP2PStateTransitionMessages.java | 6 +-
.../TestP2PWithStateCancellationMessage.java | 11 +-
.../monitoring/mbeans/TestRebalancerMetrics.java | 18 +-
.../test/resources/TestPartitionLevelPriority.json | 2 +-
.../TestRecoveryLoadBalance.MasterSlave.json | 367 ---------------------
.../TestRecoveryLoadBalance.OnlineOffline.json | 206 ------------
19 files changed, 233 insertions(+), 943 deletions(-)
diff --git
a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
index 4dad60f..4baac38 100644
---
a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
+++
b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@ -513,12 +513,14 @@ public class GenericHelixController implements
IdealStateChangeListener, LiveIns
// rebalance pipeline
Pipeline rebalancePipeline = new Pipeline(pipelineName);
rebalancePipeline.addStage(new BestPossibleStateCalcStage());
- rebalancePipeline.addStage(new IntermediateStateCalcStage());
// Need to add MaintenanceRecoveryStage here because
MAX_PARTITIONS_PER_INSTANCE check could
// only occur after IntermediateStateCalcStage calculation
rebalancePipeline.addStage(new MaintenanceRecoveryStage());
rebalancePipeline.addStage(new ResourceMessageGenerationPhase());
rebalancePipeline.addStage(new MessageSelectionStage());
+ // The IntermediateStateCalcStage should be applied after message
selection.
+ // Throttled messages are removed from the MessageSelection output by
IntermediateStateCalcStage.
+ rebalancePipeline.addStage(new IntermediateStateCalcStage());
rebalancePipeline.addStage(new MessageThrottleStage());
rebalancePipeline.addStage(new ResourceMessageDispatchStage());
rebalancePipeline.addStage(new PersistAssignmentStage());
diff --git
a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
index b91dca6..b06352d 100644
---
a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
+++
b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
@@ -51,6 +51,7 @@ import org.apache.helix.model.Resource;
import org.apache.helix.model.StateModelDefinition;
import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
import org.apache.helix.monitoring.mbeans.ResourceMonitor;
+import org.apache.helix.participant.statemachine.StateModel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -347,11 +348,10 @@ public class IntermediateStateCalcStage extends
AbstractBaseStage {
Collections.sort(partitions, new
PartitionPriorityComparator(bestPossiblePartitionStateMap.getStateMap(),
currentStateOutput.getCurrentStateMap(resourceName),
stateModelDef.getTopState()));
for (Partition partition : partitions) {
- List<Message> messagesToThrottle = new
ArrayList<>(resourceMessageMap.get(partition));
- if (messagesToThrottle == null || messagesToThrottle.isEmpty()) {
+ if (resourceMessageMap.get(partition) == null ||
resourceMessageMap.get(partition).isEmpty()) {
continue;
}
-
+ List<Message> messagesToThrottle = new
ArrayList<>(resourceMessageMap.get(partition));
Map<String, String> derivedCurrentStateMap =
currentStateOutput.getCurrentStateMap(resourceName, partition)
.entrySet()
.stream()
@@ -468,6 +468,9 @@ public class IntermediateStateCalcStage extends
AbstractBaseStage {
StateTransitionThrottleConfig.RebalanceType rebalanceType =
getRebalanceTypePerMessage(requiredStates, message,
currentStateMap);
String currentState = currentStateMap.get(message.getTgtName());
+ if (currentState == null) {
+ currentState = stateModelDefinition.getInitialState();
+ }
if (!message.getToState().equals(currentState) &&
message.getFromState().equals(currentState)
&& !cache.getDisabledInstancesForPartition(resourceName,
partition.getPartitionName())
.contains(message.getTgtName())) {
@@ -833,7 +836,10 @@ public class IntermediateStateCalcStage extends
AbstractBaseStage {
// Higher priority for the partition with fewer replicas with states
matching with IdealState
int idealStateMatched1 = getIdealStateMatched(p1);
int idealStateMatched2 = getIdealStateMatched(p2);
- return Integer.compare(idealStateMatched1, idealStateMatched2);
+ if (idealStateMatched1 != idealStateMatched2) {
+ return Integer.compare(idealStateMatched1, idealStateMatched2);
+ }
+ return p1.getPartitionName().compareTo(p2.getPartitionName());
}
private int getMissTopStateIndex(Partition partition) {
@@ -889,17 +895,23 @@ public class IntermediateStateCalcStage extends
AbstractBaseStage {
private void computeIntermediateMap(PartitionStateMap intermediateStateMap,
Map<Partition, Map<String, Message>> pendingMessageMap, Map<Partition,
List<Message>> resourceMessageMap) {
for (Map.Entry<Partition, Map<String, Message>> entry :
pendingMessageMap.entrySet()) {
- entry.getValue()
- .entrySet()
- .stream()
- .forEach(
- e -> intermediateStateMap.setState(entry.getKey(),
e.getValue().getTgtName(), e.getValue().getToState()));
+ entry.getValue().entrySet().stream().forEach(e -> {
+ if
(!e.getValue().getToState().equals(HelixDefinedState.DROPPED.name())) {
+ intermediateStateMap.setState(entry.getKey(),
e.getValue().getTgtName(), e.getValue().getToState());
+ } else {
+
intermediateStateMap.getStateMap().get(entry.getKey()).remove(e.getValue().getTgtName());
+ }
+ });
}
for (Map.Entry<Partition, List<Message>> entry :
resourceMessageMap.entrySet()) {
- entry.getValue()
- .stream()
- .forEach(e -> intermediateStateMap.setState(entry.getKey(),
e.getTgtName(), e.getToState()));
+ entry.getValue().stream().forEach(e -> {
+ if (!e.getToState().equals(HelixDefinedState.DROPPED.name())) {
+ intermediateStateMap.setState(entry.getKey(), e.getTgtName(),
e.getToState());
+ } else {
+
intermediateStateMap.getStateMap().get(entry.getKey()).remove(e.getTgtName());
+ }
+ });
}
}
diff --git
a/helix-core/src/main/java/org/apache/helix/controller/stages/resource/ResourceMessageGenerationPhase.java
b/helix-core/src/main/java/org/apache/helix/controller/stages/resource/ResourceMessageGenerationPhase.java
index 7c4d1b7..73b309e 100644
---
a/helix-core/src/main/java/org/apache/helix/controller/stages/resource/ResourceMessageGenerationPhase.java
+++
b/helix-core/src/main/java/org/apache/helix/controller/stages/resource/ResourceMessageGenerationPhase.java
@@ -20,6 +20,7 @@ package org.apache.helix.controller.stages.resource;
*/
import org.apache.helix.controller.stages.AttributeName;
+import org.apache.helix.controller.stages.BestPossibleStateOutput;
import org.apache.helix.controller.stages.ClusterEvent;
import org.apache.helix.controller.stages.IntermediateStateOutput;
import org.apache.helix.controller.stages.MessageGenerationPhase;
@@ -30,8 +31,8 @@ import
org.apache.helix.controller.stages.MessageGenerationPhase;
public class ResourceMessageGenerationPhase extends MessageGenerationPhase {
@Override
public void process(ClusterEvent event) throws Exception {
- IntermediateStateOutput intermediateStateOutput =
- event.getAttribute(AttributeName.INTERMEDIATE_STATE.name());
- processEvent(event, intermediateStateOutput);
+ BestPossibleStateOutput bestPossibleStateOutput =
+ event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.name());
+ processEvent(event, bestPossibleStateOutput);
}
}
diff --git
a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
index b659f9c..51d3321 100644
---
a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
+++
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ResourceMonitor.java
@@ -60,10 +60,10 @@ public class ResourceMonitor extends DynamicMBeanProvider {
private SimpleDynamicMetric<Long> _externalViewIdealStateDiff;
private SimpleDynamicMetric<Long> _numLessMinActiveReplicaPartitions;
private SimpleDynamicMetric<Long> _numLessReplicaPartitions;
- private SimpleDynamicMetric<Long> _numPendingRecoveryRebalancePartitions;
- private SimpleDynamicMetric<Long> _numPendingLoadRebalancePartitions;
- private SimpleDynamicMetric<Long> _numRecoveryRebalanceThrottledPartitions;
- private SimpleDynamicMetric<Long> _numLoadRebalanceThrottledPartitions;
+ private SimpleDynamicMetric<Long> _numPendingRecoveryRebalanceReplicas;
+ private SimpleDynamicMetric<Long> _numPendingLoadRebalanceReplicas;
+ private SimpleDynamicMetric<Long> _numRecoveryRebalanceThrottledReplicas;
+ private SimpleDynamicMetric<Long> _numLoadRebalanceThrottledReplicas;
private SimpleDynamicMetric<Long> _numPendingStateTransitions;
// Counters
@@ -110,14 +110,16 @@ public class ResourceMonitor extends DynamicMBeanProvider
{
_dynamicCapacityMetricsMap = new ConcurrentHashMap<>();
_externalViewIdealStateDiff = new
SimpleDynamicMetric("DifferenceWithIdealStateGauge", 0L);
- _numLoadRebalanceThrottledPartitions =
- new SimpleDynamicMetric("LoadRebalanceThrottledPartitionGauge", 0L);
- _numRecoveryRebalanceThrottledPartitions =
- new SimpleDynamicMetric("RecoveryRebalanceThrottledPartitionGauge",
0L);
- _numPendingLoadRebalancePartitions =
- new SimpleDynamicMetric("PendingLoadRebalancePartitionGauge", 0L);
- _numPendingRecoveryRebalancePartitions =
- new SimpleDynamicMetric("PendingRecoveryRebalancePartitionGauge", 0L);
+ _numPendingRecoveryRebalanceReplicas =
+ new SimpleDynamicMetric("PendingRecoveryRebalanceReplicaGauge", 0L);
+ _numLoadRebalanceThrottledReplicas =
+ new SimpleDynamicMetric("LoadRebalanceThrottledReplicaGauge", 0L);
+ _numRecoveryRebalanceThrottledReplicas =
+ new SimpleDynamicMetric("RecoveryRebalanceThrottledReplicaGauge", 0L);
+ _numPendingLoadRebalanceReplicas =
+ new SimpleDynamicMetric("PendingLoadRebalanceReplicaGauge", 0L);
+ _numPendingRecoveryRebalanceReplicas =
+ new SimpleDynamicMetric("PendingRecoveryRebalanceReplicaGauge", 0L);
_numLessReplicaPartitions = new
SimpleDynamicMetric("MissingReplicaPartitionGauge", 0L);
_numLessMinActiveReplicaPartitions =
new SimpleDynamicMetric("MissingMinActiveReplicaPartitionGauge", 0L);
@@ -370,10 +372,10 @@ public class ResourceMonitor extends DynamicMBeanProvider
{
public void updateRebalancerStats(long numPendingRecoveryRebalancePartitions,
long numPendingLoadRebalancePartitions, long
numRecoveryRebalanceThrottledPartitions,
long numLoadRebalanceThrottledPartitions) {
-
_numPendingRecoveryRebalancePartitions.updateValue(numPendingRecoveryRebalancePartitions);
-
_numPendingLoadRebalancePartitions.updateValue(numPendingLoadRebalancePartitions);
-
_numRecoveryRebalanceThrottledPartitions.updateValue(numRecoveryRebalanceThrottledPartitions);
-
_numLoadRebalanceThrottledPartitions.updateValue(numLoadRebalanceThrottledPartitions);
+
_numPendingRecoveryRebalanceReplicas.updateValue(numPendingRecoveryRebalancePartitions);
+
_numPendingLoadRebalanceReplicas.updateValue(numPendingLoadRebalancePartitions);
+
_numRecoveryRebalanceThrottledReplicas.updateValue(numRecoveryRebalanceThrottledPartitions);
+
_numLoadRebalanceThrottledReplicas.updateValue(numLoadRebalanceThrottledPartitions);
}
/**
@@ -422,20 +424,20 @@ public class ResourceMonitor extends DynamicMBeanProvider
{
return _numLessReplicaPartitions.getValue();
}
- public long getPendingRecoveryRebalancePartitionGauge() {
- return _numPendingRecoveryRebalancePartitions.getValue();
+ public long getNumPendingRecoveryRebalanceReplicas() {
+ return _numPendingRecoveryRebalanceReplicas.getValue();
}
- public long getPendingLoadRebalancePartitionGauge() {
- return _numPendingLoadRebalancePartitions.getValue();
+ public long getNumPendingLoadRebalanceReplicas() {
+ return _numPendingLoadRebalanceReplicas.getValue();
}
- public long getRecoveryRebalanceThrottledPartitionGauge() {
- return _numRecoveryRebalanceThrottledPartitions.getValue();
+ public long getNumRecoveryRebalanceThrottledReplicas() {
+ return _numRecoveryRebalanceThrottledReplicas.getValue();
}
- public long getLoadRebalanceThrottledPartitionGauge() {
- return _numLoadRebalanceThrottledPartitions.getValue();
+ public long getNumLoadRebalanceThrottledReplicas() {
+ return _numLoadRebalanceThrottledReplicas.getValue();
}
public long getNumPendingStateTransitionGauge() {
@@ -461,10 +463,10 @@ public class ResourceMonitor extends DynamicMBeanProvider
{
_numNonTopStatePartitions,
_numLessMinActiveReplicaPartitions,
_numLessReplicaPartitions,
- _numPendingRecoveryRebalancePartitions,
- _numPendingLoadRebalancePartitions,
- _numRecoveryRebalanceThrottledPartitions,
- _numLoadRebalanceThrottledPartitions,
+ _numPendingRecoveryRebalanceReplicas,
+ _numPendingLoadRebalanceReplicas,
+ _numRecoveryRebalanceThrottledReplicas,
+ _numLoadRebalanceThrottledReplicas,
_externalViewIdealStateDiff,
_successfulTopStateHandoffDurationCounter,
_successTopStateHandoffCounter,
diff --git
a/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java
b/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java
index cc6b4df..2d4aa09 100644
---
a/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java
+++
b/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java
@@ -31,6 +31,7 @@ import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.model.Message;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.helix.controller.pipeline.Stage;
import org.apache.helix.controller.pipeline.StageContext;
@@ -276,4 +277,12 @@ public class BaseStageTest {
return resourceMap;
}
+
+ protected Message generateMessage(String fromState, String toState, String
tgtName) {
+ Message message = new Message(new ZNRecord(UUID.randomUUID().toString()));
+ message.setTgtName(tgtName);
+ message.setFromState(fromState);
+ message.setToState(toState);
+ return message;
+ }
}
diff --git
a/helix-core/src/test/java/org/apache/helix/controller/stages/TestIntermediateStateCalcStage.java
b/helix-core/src/test/java/org/apache/helix/controller/stages/TestIntermediateStateCalcStage.java
index 9584edb..7f17dea 100644
---
a/helix-core/src/test/java/org/apache/helix/controller/stages/TestIntermediateStateCalcStage.java
+++
b/helix-core/src/test/java/org/apache/helix/controller/stages/TestIntermediateStateCalcStage.java
@@ -51,15 +51,14 @@ public class TestIntermediateStateCalcStage extends
BaseStageTest {
}
preSetup(resources, nReplica, nReplica);
- event.addAttribute(AttributeName.RESOURCES.name(),
- getResourceMap(resources, nPartition, "OnlineOffline"));
+ event.addAttribute(AttributeName.RESOURCES.name(),
getResourceMap(resources, nPartition, "OnlineOffline"));
event.addAttribute(AttributeName.RESOURCES_TO_REBALANCE.name(),
getResourceMap(resources, nPartition, "OnlineOffline"));
// Initialize bestpossible state and current state
BestPossibleStateOutput bestPossibleStateOutput = new
BestPossibleStateOutput();
CurrentStateOutput currentStateOutput = new CurrentStateOutput();
-
+ MessageOutput messageSelectOutput = new MessageOutput();
IntermediateStateOutput expectedResult = new IntermediateStateOutput();
_clusterConfig.setErrorOrRecoveryPartitionThresholdForLoadBalance(1);
@@ -79,11 +78,11 @@ public class TestIntermediateStateCalcStage extends
BaseStageTest {
// Regular recovery balance
currentStateOutput.setCurrentState(resource, partition,
instanceName, "OFFLINE");
// add blocked state transition messages
- Message pendingMessage = new Message("customType", "001");
- pendingMessage.setToState("ONLINE");
+ Message pendingMessage = generateMessage("OFFLINE", "ONLINE",
instanceName);
currentStateOutput.setPendingMessage(resource, partition,
instanceName, pendingMessage);
bestPossibleStateOutput.setState(resource, partition,
instanceName, "ONLINE");
+
// should be recovered:
expectedResult.setState(resource, partition, instanceName,
"ONLINE");
} else if (resource.endsWith("1")) {
@@ -91,6 +90,8 @@ public class TestIntermediateStateCalcStage extends
BaseStageTest {
currentStateOutput.setCurrentState(resource, partition,
instanceName, "ONLINE");
currentStateOutput.setCurrentState(resource, partition,
instanceName + "-1", "OFFLINE");
bestPossibleStateOutput.setState(resource, partition,
instanceName, "ONLINE");
+ messageSelectOutput.addMessage(resource, partition,
+ generateMessage("OFFLINE", "DROPPED", instanceName + "-1"));
// should be recovered:
expectedResult.setState(resource, partition, instanceName,
"ONLINE");
} else if (resource.endsWith("2")) {
@@ -110,14 +111,16 @@ public class TestIntermediateStateCalcStage extends
BaseStageTest {
expectedResult.setState(resource, partition, instanceName,
"ERROR");
} else {
currentStateOutput.setCurrentState(resource, partition,
instanceName, "OFFLINE");
+ messageSelectOutput.addMessage(resource, partition,
generateMessage("OFFLINE", "ONLINE", instanceName));
// Recovery balance
expectedResult.setState(resource, partition, instanceName,
"ONLINE");
}
} else {
currentStateOutput.setCurrentState(resource, partition,
instanceName, "ONLINE");
- currentStateOutput
- .setCurrentState(resource, partition, instanceName + "-1",
"OFFLINE");
+ currentStateOutput.setCurrentState(resource, partition,
instanceName + "-1", "OFFLINE");
// load balance is throttled, so keep all current states
+ messageSelectOutput.addMessage(resource, partition,
+ generateMessage("OFFLINE", "DROPPED", instanceName + "-1"));
expectedResult.setState(resource, partition, instanceName,
"ONLINE");
// The following must be removed because now downward state
transitions are allowed
// expectedResult.setState(resource, partition, instanceName +
"-1", "OFFLINE");
@@ -129,16 +132,18 @@ public class TestIntermediateStateCalcStage extends
BaseStageTest {
// This partition requires recovery
currentStateOutput.setCurrentState(resource, partition,
instanceName, "OFFLINE");
bestPossibleStateOutput.setState(resource, partition,
instanceName, "ONLINE");
+ messageSelectOutput.addMessage(resource, partition,
generateMessage("OFFLINE", "ONLINE", instanceName));
// After recovery, it should be back ONLINE
expectedResult.setState(resource, partition, instanceName,
"ONLINE");
} else {
// Other partitions require dropping of replicas
currentStateOutput.setCurrentState(resource, partition,
instanceName, "ONLINE");
- currentStateOutput
- .setCurrentState(resource, partition, instanceName + "-1",
"OFFLINE");
+ currentStateOutput.setCurrentState(resource, partition,
instanceName + "-1", "OFFLINE");
// BestPossibleState dictates that we only need one ONLINE
replica
bestPossibleStateOutput.setState(resource, partition,
instanceName, "ONLINE");
bestPossibleStateOutput.setState(resource, partition,
instanceName + "-1", "DROPPED");
+ messageSelectOutput.addMessage(resource, partition,
+ generateMessage("OFFLINE", "DROPPED", instanceName + "-1"));
// So instanceName-1 will NOT be expected to show up in
expectedResult
expectedResult.setState(resource, partition, instanceName,
"ONLINE");
expectedResult.setState(resource, partition, instanceName +
"-1", "DROPPED");
@@ -150,6 +155,7 @@ public class TestIntermediateStateCalcStage extends
BaseStageTest {
// Set up a partition requiring recovery
currentStateOutput.setCurrentState(resource, partition,
instanceName, "OFFLINE");
bestPossibleStateOutput.setState(resource, partition,
instanceName, "ONLINE");
+ messageSelectOutput.addMessage(resource, partition,
generateMessage("OFFLINE", "ONLINE", instanceName));
// After recovery, it should be back ONLINE
expectedResult.setState(resource, partition, instanceName,
"ONLINE");
} else {
@@ -157,6 +163,8 @@ public class TestIntermediateStateCalcStage extends
BaseStageTest {
bestPossibleStateOutput.setState(resource, partition,
instanceName, "ONLINE");
// Check that load balance (bringing up a new node) did not take
place
bestPossibleStateOutput.setState(resource, partition,
instanceName + "-1", "ONLINE");
+ messageSelectOutput.addMessage(resource, partition,
+ generateMessage("OFFLINE", "ONLINE", instanceName + "-1"));
expectedResult.setState(resource, partition, instanceName,
"ONLINE");
}
}
@@ -166,9 +174,9 @@ public class TestIntermediateStateCalcStage extends
BaseStageTest {
}
event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.name(),
bestPossibleStateOutput);
+ event.addAttribute(AttributeName.MESSAGES_SELECTED.name(),
messageSelectOutput);
event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
- event.addAttribute(AttributeName.ControllerDataProvider.name(),
- new ResourceControllerDataProvider());
+ event.addAttribute(AttributeName.ControllerDataProvider.name(), new
ResourceControllerDataProvider());
runStage(event, new ReadClusterDataStage());
runStage(event, new IntermediateStateCalcStage());
@@ -177,7 +185,8 @@ public class TestIntermediateStateCalcStage extends
BaseStageTest {
for (String resource : resources) {
// Note Assert.assertEquals won't work. If "actual" is an empty map, it
won't compare
// anything.
- Assert.assertTrue(output.getPartitionStateMap(resource).getStateMap()
+ Assert.assertTrue(output.getPartitionStateMap(resource)
+ .getStateMap()
.equals(expectedResult.getPartitionStateMap(resource).getStateMap()));
}
}
@@ -195,14 +204,13 @@ public class TestIntermediateStateCalcStage extends
BaseStageTest {
}
preSetup(resources, nReplica, nReplica);
- event.addAttribute(AttributeName.RESOURCES.name(),
- getResourceMap(resources, nPartition,
- "OnlineOffline"));
+ event.addAttribute(AttributeName.RESOURCES.name(),
getResourceMap(resources, nPartition, "OnlineOffline"));
event.addAttribute(AttributeName.RESOURCES_TO_REBALANCE.name(),
getResourceMap(resources, nPartition, "OnlineOffline"));
// Initialize best possible state and current state
BestPossibleStateOutput bestPossibleStateOutput = new
BestPossibleStateOutput();
+ MessageOutput messageSelectOutput = new MessageOutput();
CurrentStateOutput currentStateOutput = new CurrentStateOutput();
IntermediateStateOutput expectedResult = new IntermediateStateOutput();
@@ -225,6 +233,7 @@ public class TestIntermediateStateCalcStage extends
BaseStageTest {
// Set up a partition requiring recovery
currentStateOutput.setCurrentState(resource, partition,
instanceName, "OFFLINE");
bestPossibleStateOutput.setState(resource, partition,
instanceName, "ONLINE");
+ messageSelectOutput.addMessage(resource, partition,
generateMessage("OFFLINE", "ONLINE", instanceName));
// After recovery, it should be back ONLINE
expectedResult.setState(resource, partition, instanceName,
"ONLINE");
} else {
@@ -236,6 +245,8 @@ public class TestIntermediateStateCalcStage extends
BaseStageTest {
// This partition to bring up a replica (load balance will
happen)
bestPossibleStateOutput.setState(resource, partition,
instanceName + "-1", "ONLINE");
+ messageSelectOutput.addMessage(resource, partition,
+ generateMessage("OFFLINE", "ONLINE", instanceName + "-1"));
expectedResult.setState(resource, partition, instanceName +
"-1", "ONLINE");
}
}
@@ -246,8 +257,8 @@ public class TestIntermediateStateCalcStage extends
BaseStageTest {
event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.name(),
bestPossibleStateOutput);
event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
- event.addAttribute(AttributeName.ControllerDataProvider.name(),
- new ResourceControllerDataProvider());
+ event.addAttribute(AttributeName.MESSAGES_SELECTED.name(),
messageSelectOutput);
+ event.addAttribute(AttributeName.ControllerDataProvider.name(), new
ResourceControllerDataProvider());
runStage(event, new ReadClusterDataStage());
runStage(event, new IntermediateStateCalcStage());
@@ -256,7 +267,8 @@ public class TestIntermediateStateCalcStage extends
BaseStageTest {
for (String resource : resources) {
// Note Assert.assertEquals won't work. If "actual" is an empty map, it
won't compare
// anything.
- Assert.assertEquals(output.getPartitionStateMap(resource).getStateMap(),
expectedResult.getPartitionStateMap(resource).getStateMap());
+ Assert.assertEquals(output.getPartitionStateMap(resource).getStateMap(),
+ expectedResult.getPartitionStateMap(resource).getStateMap());
}
}
@@ -268,13 +280,11 @@ public class TestIntermediateStateCalcStage extends
BaseStageTest {
// Set up cluster configs
_clusterConfig =
accessor.getProperty(accessor.keyBuilder().clusterConfig());
- _clusterConfig.setStateTransitionThrottleConfigs(ImmutableList
- .of(new StateTransitionThrottleConfig(
- StateTransitionThrottleConfig.RebalanceType.RECOVERY_BALANCE,
- StateTransitionThrottleConfig.ThrottleScope.INSTANCE, 3),
- new StateTransitionThrottleConfig(
- StateTransitionThrottleConfig.RebalanceType.LOAD_BALANCE,
- StateTransitionThrottleConfig.ThrottleScope.INSTANCE, 3)));
+ _clusterConfig.setStateTransitionThrottleConfigs(ImmutableList.of(
+ new
StateTransitionThrottleConfig(StateTransitionThrottleConfig.RebalanceType.RECOVERY_BALANCE,
+ StateTransitionThrottleConfig.ThrottleScope.INSTANCE, 3),
+ new
StateTransitionThrottleConfig(StateTransitionThrottleConfig.RebalanceType.LOAD_BALANCE,
+ StateTransitionThrottleConfig.ThrottleScope.INSTANCE, 3)));
setClusterConfig(_clusterConfig);
}
}
diff --git
a/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
b/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
index 66c1771..d262b7b 100644
---
a/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
+++
b/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
@@ -97,9 +97,9 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
rebalancePipeline.addStage(new ResourceComputationStage());
rebalancePipeline.addStage(new CurrentStateComputationStage());
rebalancePipeline.addStage(new BestPossibleStateCalcStage());
- rebalancePipeline.addStage(new IntermediateStateCalcStage());
rebalancePipeline.addStage(new ResourceMessageGenerationPhase());
rebalancePipeline.addStage(new MessageSelectionStage());
+ rebalancePipeline.addStage(new IntermediateStateCalcStage());
rebalancePipeline.addStage(new MessageThrottleStage());
rebalancePipeline.addStage(new ResourceMessageDispatchStage());
@@ -133,9 +133,9 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
Pipeline messagePipeline = new Pipeline();
messagePipeline.addStage(new BestPossibleStateCalcStage());
- messagePipeline.addStage(new IntermediateStateCalcStage());
messagePipeline.addStage(new ResourceMessageGenerationPhase());
messagePipeline.addStage(new MessageSelectionStage());
+ messagePipeline.addStage(new IntermediateStateCalcStage());
messagePipeline.addStage(new MessageThrottleStage());
messagePipeline.addStage(new ResourceMessageDispatchStage());
@@ -157,7 +157,7 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
Thread.sleep(2 * MessageGenerationPhase.DEFAULT_OBSELETE_MSG_PURGE_DELAY);
runPipeline(event, dataRefresh, false);
-
+
// Verify the stale message should be deleted
Assert.assertTrue(TestHelper.verify(() -> {
if (dataCache.getStaleMessages().size() != 0) {
@@ -334,9 +334,9 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
rebalancePipeline.addStage(new ResourceComputationStage());
rebalancePipeline.addStage(new CurrentStateComputationStage());
rebalancePipeline.addStage(new BestPossibleStateCalcStage());
- rebalancePipeline.addStage(new IntermediateStateCalcStage());
rebalancePipeline.addStage(new ResourceMessageGenerationPhase());
rebalancePipeline.addStage(new MessageSelectionStage());
+ rebalancePipeline.addStage(new IntermediateStateCalcStage());
rebalancePipeline.addStage(new MessageThrottleStage());
rebalancePipeline.addStage(new ResourceMessageDispatchStage());
@@ -431,9 +431,9 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
rebalancePipeline.addStage(new ResourceComputationStage());
rebalancePipeline.addStage(new CurrentStateComputationStage());
rebalancePipeline.addStage(new BestPossibleStateCalcStage());
- rebalancePipeline.addStage(new IntermediateStateCalcStage());
rebalancePipeline.addStage(new ResourceMessageGenerationPhase());
rebalancePipeline.addStage(new MessageSelectionStage());
+ rebalancePipeline.addStage(new IntermediateStateCalcStage());
rebalancePipeline.addStage(new MessageThrottleStage());
rebalancePipeline.addStage(new ResourceMessageDispatchStage());
@@ -510,9 +510,9 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
rebalancePipeline.addStage(new ResourceComputationStage());
rebalancePipeline.addStage(new CurrentStateComputationStage());
rebalancePipeline.addStage(new BestPossibleStateCalcStage());
- rebalancePipeline.addStage(new IntermediateStateCalcStage());
rebalancePipeline.addStage(new ResourceMessageGenerationPhase());
rebalancePipeline.addStage(new MessageSelectionStage());
+ rebalancePipeline.addStage(new IntermediateStateCalcStage());
rebalancePipeline.addStage(new MessageThrottleStage());
rebalancePipeline.addStage(new ResourceMessageDispatchStage());
@@ -590,9 +590,9 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
rebalancePipeline.addStage(new ResourceComputationStage());
rebalancePipeline.addStage(new CurrentStateComputationStage());
rebalancePipeline.addStage(new BestPossibleStateCalcStage());
- rebalancePipeline.addStage(new IntermediateStateCalcStage());
rebalancePipeline.addStage(new ResourceMessageGenerationPhase());
rebalancePipeline.addStage(new MessageSelectionStage());
+ rebalancePipeline.addStage(new IntermediateStateCalcStage());
rebalancePipeline.addStage(new MessageThrottleStage());
rebalancePipeline.addStage(new ResourceMessageDispatchStage());
diff --git
a/helix-core/src/test/java/org/apache/helix/controller/stages/TestRecoveryLoadBalance.java
b/helix-core/src/test/java/org/apache/helix/controller/stages/TestRecoveryLoadBalance.java
deleted file mode 100644
index 32d8663..0000000
---
a/helix-core/src/test/java/org/apache/helix/controller/stages/TestRecoveryLoadBalance.java
+++ /dev/null
@@ -1,225 +0,0 @@
-package org.apache.helix.controller.stages;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.ObjectReader;
-import org.apache.helix.api.config.StateTransitionThrottleConfig;
-import
org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
-import org.apache.helix.controller.rebalancer.DelayedAutoRebalancer;
-import
org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy;
-import org.apache.helix.model.ClusterConfig;
-import org.apache.helix.model.IdealState;
-import org.apache.helix.model.Partition;
-import org.testng.Assert;
-import org.testng.annotations.DataProvider;
-import org.testng.annotations.Test;
-
-public class TestRecoveryLoadBalance extends BaseStageTest {
-
- private final String INPUT = "inputs";
- private final String MIN_ACTIVE_REPLICAS = "minActiveReplicas";
- private final String LOAD_BALANCE_THROTTLE = "loadBalanceThrottle";
- private final String CURRENT_STATE = "currentStates";
- private final String BEST_POSSIBLE_STATE = "bestPossibleStates";
- private final String EXPECTED_STATE = "expectedStates";
- private final String ERROR_OR_RECOVERY_PARTITION_THRESHOLD =
- "errorOrRecoveryPartitionThresholdForLoadBalance";
- private final String STATE_MODEL = "statemodel";
- private ClusterConfig _clusterConfig;
-
- @Test(dataProvider = "recoveryLoadBalanceInput")
- public void testRecoveryAndLoadBalance(String stateModelDef,
- int errorOrRecoveryPartitionThresholdForLoadBalance,
- Map<String, Map<String, Map<String, String>>> stateMapping, int
minActiveReplicas, int loadBalanceThrottle) {
- System.out.println("START TestRecoveryLoadBalance at " + new
Date(System.currentTimeMillis()));
-
- String resourcePrefix = "resource";
- int nResource = 1;
- int nPartition = stateMapping.size();
- int nReplica = 3;
-
- Set<String> resourceSet = new HashSet<>();
- for (int i = 0; i < nResource; i++) {
- resourceSet.add(resourcePrefix + "_" + i);
- }
-
- preSetup(StateTransitionThrottleConfig.RebalanceType.RECOVERY_BALANCE,
resourceSet, nReplica,
- nReplica, stateModelDef, minActiveReplicas);
-
- _clusterConfig.setErrorOrRecoveryPartitionThresholdForLoadBalance(
- errorOrRecoveryPartitionThresholdForLoadBalance);
- if (loadBalanceThrottle >= 0) {
- _clusterConfig.setStateTransitionThrottleConfigs(Arrays.asList(
- new StateTransitionThrottleConfig(
- StateTransitionThrottleConfig.RebalanceType.LOAD_BALANCE,
- StateTransitionThrottleConfig.ThrottleScope.RESOURCE,
loadBalanceThrottle)));
- }
- setClusterConfig(_clusterConfig);
-
- event.addAttribute(AttributeName.RESOURCES.name(), getResourceMap(
- resourceSet.toArray(new String[resourceSet.size()]), nPartition,
stateModelDef));
- event.addAttribute(AttributeName.RESOURCES_TO_REBALANCE.name(),
getResourceMap(
- resourceSet.toArray(new String[resourceSet.size()]), nPartition,
stateModelDef));
- event.addAttribute(AttributeName.ControllerDataProvider.name(),
- new ResourceControllerDataProvider());
-
- // Initialize bestpossible state and current state
- BestPossibleStateOutput bestPossibleStateOutput = new
BestPossibleStateOutput();
- CurrentStateOutput currentStateOutput = new CurrentStateOutput();
- IntermediateStateOutput expectedResult = new IntermediateStateOutput();
-
- for (String resource : resourceSet) {
- IdealState is =
accessor.getProperty(accessor.keyBuilder().idealStates(resource));
- setSingleIdealState(is);
-
- Map<String, List<String>> partitionMap = new HashMap<>();
- for (int p = 0; p < nPartition; p++) {
- Partition partition = new Partition(resource + "_" + p);
-
- // Set input
- for (int r = 0; r <
stateMapping.get(partition.toString()).get(CURRENT_STATE).size(); r++) {
- String instanceName = HOSTNAME_PREFIX + r;
- currentStateOutput.setCurrentState(resource, partition, instanceName,
-
stateMapping.get(partition.toString()).get(CURRENT_STATE).get(instanceName));
- }
- for (int r = 0; r <
stateMapping.get(partition.toString()).get(BEST_POSSIBLE_STATE)
- .size(); r++) {
- String instanceName = HOSTNAME_PREFIX + r;
- bestPossibleStateOutput.setState(resource, partition, instanceName,
-
stateMapping.get(partition.toString()).get(BEST_POSSIBLE_STATE).get(instanceName));
- }
- for (int r = 0; r <
stateMapping.get(partition.toString()).get(EXPECTED_STATE)
- .size(); r++) {
- String instanceName = HOSTNAME_PREFIX + r;
- expectedResult.setState(resource, partition, instanceName,
-
stateMapping.get(partition.toString()).get(EXPECTED_STATE).get(instanceName));
- }
-
- // Set partitionMap
- for (int r = 0; r < nReplica; r++) {
- String instanceName = HOSTNAME_PREFIX + r;
- partitionMap.put(partition.getPartitionName(),
Collections.singletonList(instanceName));
- }
- }
- bestPossibleStateOutput.setPreferenceLists(resource, partitionMap);
- }
-
- event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.name(),
bestPossibleStateOutput);
- event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
- runStage(event, new ReadClusterDataStage());
- runStage(event, new IntermediateStateCalcStage());
-
- IntermediateStateOutput output =
event.getAttribute(AttributeName.INTERMEDIATE_STATE.name());
-
- for (String resource : resourceSet) {
- // For debugging purposes
- // Object map1 = output.getPartitionStateMap(resource).getStateMap();
- // Object map2 =
expectedResult.getPartitionStateMap(resource).getStateMap();
-
- // Note Assert.assertEquals won't work. If "actual" is an empty map, it
won't compare
- // anything.
- Assert.assertTrue(output.getPartitionStateMap(resource).getStateMap()
-
.equals(expectedResult.getPartitionStateMap(resource).getStateMap()));
- }
-
- System.out.println("END TestRecoveryLoadBalance at " + new
Date(System.currentTimeMillis()));
- }
-
- @DataProvider(name = "recoveryLoadBalanceInput")
- public Object[][] rebalanceStrategies() {
-
- try {
- List<Object[]> data = new ArrayList<>();
- // Add data
-
data.addAll(loadTestInputs("TestRecoveryLoadBalance.OnlineOffline.json"));
- data.addAll(loadTestInputs("TestRecoveryLoadBalance.MasterSlave.json"));
-
- Object[][] ret = new Object[data.size()][];
- for (int i = 0; i < data.size(); i++) {
- ret[i] = data.get(i);
- }
- return ret;
- } catch (Throwable e) {
- return new Object[][] {
- {}
- };
- }
- }
-
- public List<Object[]> loadTestInputs(String fileName) {
- List<Object[]> ret = new ArrayList<>();
- InputStream inputStream =
getClass().getClassLoader().getResourceAsStream(fileName);
- try {
- ObjectReader mapReader = new ObjectMapper().reader(List.class);
- List<Map<String, Object>> inputList = mapReader.readValue(inputStream);
- for (Map<String, Object> inputMap : inputList) {
- String stateModelName = (String) inputMap.get(STATE_MODEL);
- int threshold = (int)
inputMap.get(ERROR_OR_RECOVERY_PARTITION_THRESHOLD);
- int minActiveReplicas = -1;
- if (inputMap.get(MIN_ACTIVE_REPLICAS) != null) {
- minActiveReplicas =
Integer.parseInt(inputMap.get(MIN_ACTIVE_REPLICAS).toString());
- }
- int loadBalanceThrottle = -1;
- if (inputMap.get(LOAD_BALANCE_THROTTLE) != null) {
- loadBalanceThrottle =
Integer.parseInt(inputMap.get(LOAD_BALANCE_THROTTLE).toString());
- }
- Map<String, Map<String, Map<String, String>>> stateMapping =
- (Map<String, Map<String, Map<String, String>>>)
inputMap.get(INPUT);
- ret.add(new Object[] {
- stateModelName, threshold, stateMapping, minActiveReplicas,
loadBalanceThrottle
- });
- }
- } catch (IOException e) {
- e.printStackTrace();
- }
- return ret;
- }
-
- private void preSetup(StateTransitionThrottleConfig.RebalanceType
rebalanceType,
- Set<String> resourceSet, int numOfLiveInstances, int numOfReplicas,
String stateModelName,
- int minActiveReplica) {
- setupIdealState(numOfLiveInstances, resourceSet.toArray(new
String[resourceSet.size()]),
- numOfLiveInstances, numOfReplicas, IdealState.RebalanceMode.FULL_AUTO,
stateModelName,
- DelayedAutoRebalancer.class.getName(),
CrushEdRebalanceStrategy.class.getName(),
- minActiveReplica);
- setupStateModel();
- setupLiveInstances(numOfLiveInstances);
-
- // Set up cluster configs
- _clusterConfig =
accessor.getProperty(accessor.keyBuilder().clusterConfig());
- StateTransitionThrottleConfig throttleConfig = new
StateTransitionThrottleConfig(rebalanceType,
- StateTransitionThrottleConfig.ThrottleScope.CLUSTER,
Integer.MAX_VALUE);
-
_clusterConfig.setStateTransitionThrottleConfigs(Collections.singletonList(throttleConfig));
- setClusterConfig(_clusterConfig);
- }
-}
diff --git
a/helix-core/src/test/java/org/apache/helix/controller/stages/TestStateTransitionPriority.java
b/helix-core/src/test/java/org/apache/helix/controller/stages/TestStateTransitionPriority.java
index be817ef..22c20f7 100644
---
a/helix-core/src/test/java/org/apache/helix/controller/stages/TestStateTransitionPriority.java
+++
b/helix-core/src/test/java/org/apache/helix/controller/stages/TestStateTransitionPriority.java
@@ -62,6 +62,7 @@ public class TestStateTransitionPriority extends
BaseStageTest {
// Initialize bestpossible state and current state
BestPossibleStateOutput bestPossibleStateOutput = new
BestPossibleStateOutput();
+ MessageOutput messageSelectOutput = new MessageOutput();
CurrentStateOutput currentStateOutput = new CurrentStateOutput();
for (String resource : resourceMap.keySet()) {
@@ -76,11 +77,13 @@ public class TestStateTransitionPriority extends
BaseStageTest {
Collections.singletonList(instanceName));
bestPossibleStateOutput.setPreferenceLists(resource, partitionMap);
bestPossibleStateOutput.setState(resource, partition, instanceName,
"SLAVE");
+ messageSelectOutput.addMessage(resource, partition,
generateMessage("OFFLINE", "SLAVE", instanceName));
currentStateOutput.setCurrentState(resource, partition, instanceName,
"OFFLINE");
}
event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.name(),
bestPossibleStateOutput);
+ event.addAttribute(AttributeName.MESSAGES_SELECTED.name(),
messageSelectOutput);
event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
runStage(event, new ReadClusterDataStage());
@@ -108,6 +111,7 @@ public class TestStateTransitionPriority extends
BaseStageTest {
// Initialize bestpossible state and current state
BestPossibleStateOutput bestPossibleStateOutput = new
BestPossibleStateOutput();
+ MessageOutput messageSelectOutput = new MessageOutput();
CurrentStateOutput currentStateOutput = new CurrentStateOutput();
for (String resource : resourceMap.keySet()) {
@@ -121,11 +125,13 @@ public class TestStateTransitionPriority extends
BaseStageTest {
String nextInstanceName = HOSTNAME_PREFIX +
(Integer.parseInt(resource.split("_")[1]) + 1);
partitionMap.put(partition.getPartitionName(),
Collections.singletonList(nextInstanceName));
bestPossibleStateOutput.setPreferenceLists(resource, partitionMap);
- bestPossibleStateOutput.setState(resource, partition, nextInstanceName,
"MASTER");
+ bestPossibleStateOutput.setState(resource, partition, instanceName,
"MASTER");
+ bestPossibleStateOutput.setState(resource, partition, nextInstanceName,
"SLAVE");
currentStateOutput.setCurrentState(resource, partition, instanceName,
"MASTER");
}
event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.name(),
bestPossibleStateOutput);
+ event.addAttribute(AttributeName.MESSAGES_SELECTED.name(),
messageSelectOutput);
event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
event.addAttribute(AttributeName.ControllerDataProvider.name(),
new ResourceControllerDataProvider());
@@ -134,8 +140,10 @@ public class TestStateTransitionPriority extends
BaseStageTest {
// Keep update the current state.
List<String> resourcePriority = new ArrayList<String>();
for (int i = 0; i < resourceMap.size(); i++) {
+ event.addAttribute(AttributeName.MESSAGES_SELECTED.name(),
+ generateMessageMapForResource(bestPossibleStateOutput,
currentStateOutput, resourcePriority));
runStage(event, new IntermediateStateCalcStage());
- updateCurrentStatesForLoadBalance(resourcePriority, currentStateOutput);
+ updateCurrentStatesForLoadBalance(resourcePriority, currentStateOutput,
bestPossibleStateOutput);
}
Assert.assertEquals(resourcePriority, expectedPriority);
@@ -164,7 +172,6 @@ public class TestStateTransitionPriority extends
BaseStageTest {
Resource resource = new Resource(resourceName);
BestPossibleStateOutput bestPossibleStateOutput = new
BestPossibleStateOutput();
CurrentStateOutput currentStateOutput = new CurrentStateOutput();
-
for (String partitionName : bestPossibleMap.keySet()) {
Partition partition = new Partition(partitionName);
bestPossibleStateOutput.setPreferenceList(resourceName, partitionName,
preferenceList);
@@ -173,6 +180,7 @@ public class TestStateTransitionPriority extends
BaseStageTest {
bestPossibleMap.get(partitionName).get(instanceName));
currentStateOutput.setCurrentState(resourceName, partition,
instanceName,
currentStateMap.get(partitionName).get(instanceName));
+
}
resource.addPartition(partitionName);
}
@@ -183,6 +191,8 @@ public class TestStateTransitionPriority extends
BaseStageTest {
event.addAttribute(AttributeName.RESOURCES_TO_REBALANCE.name(),
Collections.singletonMap(resourceName, resource));
event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.name(),
bestPossibleStateOutput);
+ event.addAttribute(AttributeName.MESSAGES_SELECTED.name(),
+ generateMessageMapForPartition(bestPossibleMap, currentStateMap,
Collections.emptyList(), resourceName));
event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
event.addAttribute(AttributeName.ControllerDataProvider.name(),
new ResourceControllerDataProvider());
@@ -191,9 +201,10 @@ public class TestStateTransitionPriority extends
BaseStageTest {
// Keep update the current state.
List<String> partitionPriority = new ArrayList<String>();
for (int i = 0; i < bestPossibleMap.size(); i++) {
+ event.addAttribute(AttributeName.MESSAGES_SELECTED.name(),
+ generateMessageMapForPartition(bestPossibleMap, currentStateMap,
partitionPriority, resourceName));
runStage(event, new IntermediateStateCalcStage());
- updateCurrentStateForPartitionLevelPriority(partitionPriority,
currentStateOutput,
- resourceName, bestPossibleMap);
+ updateCurrentStateForPartitionLevelPriority(partitionPriority,
currentStateOutput, resourceName, bestPossibleMap);
}
Assert.assertEquals(partitionPriority, expectedPriority);
@@ -287,25 +298,52 @@ public class TestStateTransitionPriority extends
BaseStageTest {
}
}
- private void updateCurrentStatesForLoadBalance(List<String> resourcePriority,
- CurrentStateOutput currentStateOutput) {
- IntermediateStateOutput output =
event.getAttribute(AttributeName.INTERMEDIATE_STATE.name());
- for (PartitionStateMap partitionStateMap :
output.getResourceStatesMap().values()) {
- String resourceName = partitionStateMap.getResourceName();
+ private void updateCurrentStatesForLoadBalance(List<String>
resourcePriority, CurrentStateOutput currentStateOutput,
+ BestPossibleStateOutput bestPossibleStateOutput) {
+ MessageOutput output =
event.getAttribute(AttributeName.MESSAGES_SELECTED.name());
+ for (String resourceName :
bestPossibleStateOutput.getResourceStatesMap().keySet()) {
Partition partition = new Partition(resourceName + "_0");
- String oldInstance = HOSTNAME_PREFIX + resourceName.split("_")[1];
- String expectedInstance =
- HOSTNAME_PREFIX + (Integer.parseInt(resourceName.split("_")[1]) + 1);
- if
(partitionStateMap.getPartitionMap(partition).containsKey(expectedInstance)
- && !resourcePriority.contains(resourceName)) {
- currentStateOutput.getCurrentStateMap(resourceName,
partition).remove(oldInstance);
- updateCurrentOutput(resourcePriority, currentStateOutput,
resourceName, partition,
- expectedInstance, "MASTER");
+ if (output.getResourceMessageMap(resourceName).get(partition) != null
+ && output.getResourceMessageMap(resourceName).get(partition).size()
> 0) {
+ String nextInstanceName = HOSTNAME_PREFIX +
(Integer.parseInt(resourceName.split("_")[1]) + 1);
+ currentStateOutput.setCurrentState(resourceName, partition,
nextInstanceName, "SLAVE");
+ resourcePriority.add(resourceName);
break;
}
}
}
+ private MessageOutput generateMessageMapForResource(BestPossibleStateOutput
bestPossibleStateOutput,
+ CurrentStateOutput currentStateOutput, List<String> resourcePrirority) {
+ MessageOutput messageSelectOutput = new MessageOutput();
+ for (String resource :
bestPossibleStateOutput.getResourceStatesMap().keySet()) {
+ if (!resourcePrirority.contains(resource) &&
!bestPossibleStateOutput.getPartitionStateMap(resource)
+ .getStateMap()
+ .equals(currentStateOutput.getCurrentStateMap(resource))) {
+ messageSelectOutput.addMessage(resource, new Partition(resource +
"_0"),
+ generateMessage("OFFLINE", "SLAVE", (HOSTNAME_PREFIX +
(Integer.parseInt(resource.split("_")[1]) + 1))));
+ }
+ }
+ return messageSelectOutput;
+ }
+
+ private MessageOutput generateMessageMapForPartition(Map<String, Map<String,
String>> bestPossibleMap,
+ Map<String, Map<String, String>> currentStateMap, List<String>
partitionPriority, String resourceName) {
+ MessageOutput messageSelectOutput = new MessageOutput();
+ for (String partitionName : bestPossibleMap.keySet()) {
+ for (String instanceName : bestPossibleMap.get(partitionName).keySet()) {
+ if (!partitionPriority.contains(partitionName) &&
!bestPossibleMap.get(partitionName)
+ .get(instanceName)
+ .equals(currentStateMap.get(partitionName).get(instanceName))) {
+ messageSelectOutput.addMessage(resourceName, new
Partition(partitionName),
+
generateMessage(currentStateMap.get(partitionName).get(instanceName),
+ bestPossibleMap.get(partitionName).get(instanceName),
instanceName));
+ }
+ }
+ }
+ return messageSelectOutput;
+ }
+
private void updateCurrentOutput(List<String> resourcePriority,
CurrentStateOutput currentStateOutput, String resourceName, Partition
partition,
String instanceName, String state) {
@@ -317,19 +355,14 @@ public class TestStateTransitionPriority extends
BaseStageTest {
private void updateCurrentStateForPartitionLevelPriority(List<String>
partitionPriority,
CurrentStateOutput currentStateOutput, String resourceName,
Map<String, Map<String, String>> bestPossibleMap) {
- IntermediateStateOutput output =
event.getAttribute(AttributeName.INTERMEDIATE_STATE.name());
- PartitionStateMap partitionStateMap =
output.getPartitionStateMap(resourceName);
- for (Partition partition : partitionStateMap.getStateMap().keySet()) {
- Map<String, String> instanceStateMap =
bestPossibleMap.get(partition.getPartitionName());
- if (partitionStateMap.getPartitionMap(partition).equals(instanceStateMap)
- && !partitionPriority.contains(partition.getPartitionName())) {
- partitionPriority.add(partition.getPartitionName());
- for (String instanceName : instanceStateMap.keySet()) {
- currentStateOutput.setCurrentState(resourceName, partition,
instanceName,
- instanceStateMap.get(instanceName));
- }
- break;
+ MessageOutput output =
event.getAttribute(AttributeName.MESSAGES_SELECTED.name());
+ output.getResourceMessageMap(resourceName).entrySet().stream().filter(e ->
e.getValue().size() > 0).forEach(e -> {
+ partitionPriority.add(e.getKey().toString());
+
+ for (String instanceName :
bestPossibleMap.get(e.getKey().toString()).keySet()) {
+ currentStateOutput.setCurrentState(resourceName, e.getKey(),
instanceName,
+ bestPossibleMap.get(e.getKey().toString()).get(instanceName));
}
- }
+ });
}
}
diff --git
a/helix-core/src/test/java/org/apache/helix/integration/TestNoThrottleDisabledPartitions.java
b/helix-core/src/test/java/org/apache/helix/integration/TestNoThrottleDisabledPartitions.java
index df6bc02..98dd281 100644
---
a/helix-core/src/test/java/org/apache/helix/integration/TestNoThrottleDisabledPartitions.java
+++
b/helix-core/src/test/java/org/apache/helix/integration/TestNoThrottleDisabledPartitions.java
@@ -69,9 +69,9 @@ public class TestNoThrottleDisabledPartitions extends
ZkTestBase {
setupEnvironment(participantCount);
// Set the throttling only for load balance
- setThrottleConfigForLoadBalance();
+ setThrottleConfigForLoadBalance(1);
- // Disable instance 0 so that it will cause a partition to do a recovery
balance
+ // Disable instance 0 so that it will cause a partition to do a load
balance
PropertyKey key =
_accessor.keyBuilder().instanceConfig(_participants[0].getInstanceName());
InstanceConfig instanceConfig = _accessor.getProperty(key);
instanceConfig.setInstanceEnabled(false);
@@ -261,7 +261,7 @@ public class TestNoThrottleDisabledPartitions extends
ZkTestBase {
public void testNoThrottleOnDisabledPartition() throws Exception {
int participantCount = 3;
setupEnvironment(participantCount);
- setThrottleConfig();
+ setThrottleConfig(3); // replica-level throttle: 1 partition -> 3 replicas
// Disable a partition so that it will not be subject to throttling
String partitionName = _resourceName + "0_0";
@@ -292,8 +292,9 @@ public class TestNoThrottleDisabledPartitions extends
ZkTestBase {
controller.syncStart();
Thread.sleep(500L);
+ // The throttle quota is consumed entirely by the first partition's replicas
for (MockParticipantManager participantManager : _participants) {
- Assert.assertTrue(verifyTwoMessages(participantManager));
+ Assert.assertTrue(verifySingleMessage(participantManager));
}
// clean up the cluster
@@ -339,11 +340,14 @@ public class TestNoThrottleDisabledPartitions extends
ZkTestBase {
controller.syncStop();
}
+ private void setThrottleConfig() {
+ setThrottleConfig(1);
+ }
/**
* Set all throttle configs at 1 so that we could test by observing the
number of ongoing
* transitions.
*/
- private void setThrottleConfig() {
+ private void setThrottleConfig(int maxReplicas) {
PropertyKey.Builder keyBuilder = _accessor.keyBuilder();
ClusterConfig clusterConfig =
_accessor.getProperty(_accessor.keyBuilder().clusterConfig());
@@ -353,27 +357,31 @@ public class TestNoThrottleDisabledPartitions extends
ZkTestBase {
// Add throttling at cluster-level
throttleConfigs.add(new StateTransitionThrottleConfig(
StateTransitionThrottleConfig.RebalanceType.RECOVERY_BALANCE,
- StateTransitionThrottleConfig.ThrottleScope.CLUSTER, 1));
+ StateTransitionThrottleConfig.ThrottleScope.CLUSTER, maxReplicas));
throttleConfigs.add(
new
StateTransitionThrottleConfig(StateTransitionThrottleConfig.RebalanceType.LOAD_BALANCE,
- StateTransitionThrottleConfig.ThrottleScope.CLUSTER, 1));
+ StateTransitionThrottleConfig.ThrottleScope.CLUSTER, maxReplicas));
throttleConfigs
.add(new
StateTransitionThrottleConfig(StateTransitionThrottleConfig.RebalanceType.ANY,
- StateTransitionThrottleConfig.ThrottleScope.CLUSTER, 1));
+ StateTransitionThrottleConfig.ThrottleScope.CLUSTER, maxReplicas));
// Add throttling at instance level
throttleConfigs
.add(new
StateTransitionThrottleConfig(StateTransitionThrottleConfig.RebalanceType.ANY,
- StateTransitionThrottleConfig.ThrottleScope.INSTANCE, 1));
+ StateTransitionThrottleConfig.ThrottleScope.INSTANCE,
maxReplicas));
clusterConfig.setStateTransitionThrottleConfigs(throttleConfigs);
_accessor.setProperty(keyBuilder.clusterConfig(), clusterConfig);
}
+ private void setThrottleConfigForLoadBalance() {
+ setThrottleConfigForLoadBalance(0);
+ }
+
/**
* Set throttle limits only for load balance so that none of them would
happen.
*/
- private void setThrottleConfigForLoadBalance() {
+ private void setThrottleConfigForLoadBalance(int maxReplicas) {
PropertyKey.Builder keyBuilder = _accessor.keyBuilder();
ClusterConfig clusterConfig =
_accessor.getProperty(_accessor.keyBuilder().clusterConfig());
@@ -383,12 +391,12 @@ public class TestNoThrottleDisabledPartitions extends
ZkTestBase {
// Add throttling at cluster-level
throttleConfigs.add(
new
StateTransitionThrottleConfig(StateTransitionThrottleConfig.RebalanceType.LOAD_BALANCE,
- StateTransitionThrottleConfig.ThrottleScope.CLUSTER, 0));
+ StateTransitionThrottleConfig.ThrottleScope.CLUSTER, maxReplicas));
// Add throttling at instance level
throttleConfigs.add(
new
StateTransitionThrottleConfig(StateTransitionThrottleConfig.RebalanceType.LOAD_BALANCE,
- StateTransitionThrottleConfig.ThrottleScope.INSTANCE, 0));
+ StateTransitionThrottleConfig.ThrottleScope.INSTANCE,
maxReplicas));
clusterConfig.setStateTransitionThrottleConfigs(throttleConfigs);
_accessor.setProperty(keyBuilder.clusterConfig(), clusterConfig);
@@ -481,15 +489,15 @@ public class TestNoThrottleDisabledPartitions extends
ZkTestBase {
}
/**
- * Ensure that there are 2 messages for a given Participant.
+ * Ensure that there is exactly 1 message for a given Participant.
* @param participant
* @return
*/
- private boolean verifyTwoMessages(final MockParticipantManager participant) {
+ private boolean verifySingleMessage(final MockParticipantManager
participant) {
PropertyKey key =
_accessor.keyBuilder().messages(participant.getInstanceName());
List<String> messageNames = _accessor.getChildNames(key);
if (messageNames != null) {
- return messageNames.size() == 2;
+ return messageNames.size() == 1;
}
return false;
}
diff --git
a/helix-core/src/test/java/org/apache/helix/integration/controller/TestRedundantDroppedMessage.java
b/helix-core/src/test/java/org/apache/helix/integration/controller/TestRedundantDroppedMessage.java
index 3bd9e45..9539b98 100644
---
a/helix-core/src/test/java/org/apache/helix/integration/controller/TestRedundantDroppedMessage.java
+++
b/helix-core/src/test/java/org/apache/helix/integration/controller/TestRedundantDroppedMessage.java
@@ -31,6 +31,7 @@ import org.apache.helix.controller.stages.ClusterEventType;
import org.apache.helix.controller.stages.CurrentStateComputationStage;
import org.apache.helix.controller.stages.IntermediateStateCalcStage;
import org.apache.helix.controller.stages.MessageOutput;
+import org.apache.helix.controller.stages.MessageSelectionStage;
import org.apache.helix.controller.stages.ResourceComputationStage;
import
org.apache.helix.controller.stages.resource.ResourceMessageGenerationPhase;
import org.apache.helix.model.IdealState;
@@ -75,11 +76,13 @@ public class TestRedundantDroppedMessage extends
TaskSynchronizedTestBase {
runStage(event, new ResourceComputationStage());
runStage(event, new CurrentStateComputationStage());
runStage(event, new BestPossibleStateCalcStage());
- runStage(event, new IntermediateStateCalcStage());
Assert.assertEquals(cache.getCachedIdealMapping().size(), 1);
runStage(event, new ResourceMessageGenerationPhase());
+ runStage(event, new MessageSelectionStage());
+ runStage(event, new IntermediateStateCalcStage());
+
- MessageOutput messageOutput =
event.getAttribute(AttributeName.MESSAGES_ALL.name());
+ MessageOutput messageOutput =
event.getAttribute(AttributeName.MESSAGES_SELECTED.name());
Assert
.assertEquals(messageOutput.getMessages(resourceName, new
Partition(partitionName)).size(),
1);
diff --git
a/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessages.java
b/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessages.java
index 4f5f60c..1f1d7bd 100644
---
a/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessages.java
+++
b/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessages.java
@@ -100,9 +100,9 @@ public class TestP2PMessages extends BaseStageTest {
_fullPipeline.addStage(new ResourceComputationStage());
_fullPipeline.addStage(new CurrentStateComputationStage());
_fullPipeline.addStage(new BestPossibleStateCalcStage());
- _fullPipeline.addStage(new IntermediateStateCalcStage());
_fullPipeline.addStage(new ResourceMessageGenerationPhase());
_fullPipeline.addStage(new MessageSelectionStage());
+ _fullPipeline.addStage(new IntermediateStateCalcStage());
_fullPipeline.addStage(new MessageThrottleStage());
_fullPipeline.addStage(new ResourceMessageDispatchStage());
diff --git
a/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessagesAvoidDuplicatedMessage.java
b/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessagesAvoidDuplicatedMessage.java
index 56e1f5a..1d164c5 100644
---
a/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessagesAvoidDuplicatedMessage.java
+++
b/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessagesAvoidDuplicatedMessage.java
@@ -87,14 +87,15 @@ public class TestP2PMessagesAvoidDuplicatedMessage extends
BaseStageTest {
_fullPipeline = new Pipeline("FullPipeline");
_fullPipeline.addStage(new ReadClusterDataStage());
_fullPipeline.addStage(new BestPossibleStateCalcStage());
- _fullPipeline.addStage(new IntermediateStateCalcStage());
_fullPipeline.addStage(new ResourceMessageGenerationPhase());
_fullPipeline.addStage(new MessageSelectionStage());
+ _fullPipeline.addStage(new IntermediateStateCalcStage());
_fullPipeline.addStage(new MessageThrottleStage());
_messagePipeline = new Pipeline("MessagePipeline");
_messagePipeline.addStage(new ResourceMessageGenerationPhase());
_messagePipeline.addStage(new MessageSelectionStage());
+ _messagePipeline.addStage(new IntermediateStateCalcStage());
_messagePipeline.addStage(new MessageThrottleStage());
@@ -126,7 +127,7 @@ public class TestP2PMessagesAvoidDuplicatedMessage extends
BaseStageTest {
_fullPipeline.handle(event);
- _bestpossibleState =
event.getAttribute(AttributeName.INTERMEDIATE_STATE.name());
+ _bestpossibleState =
event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.name());
MessageOutput messageOutput =
event.getAttribute(AttributeName.MESSAGES_SELECTED.name());
@@ -217,7 +218,7 @@ public class TestP2PMessagesAvoidDuplicatedMessage extends
BaseStageTest {
event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
- event.addAttribute(AttributeName.INTERMEDIATE_STATE.name(),
_bestpossibleState);
+ event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.name(),
_bestpossibleState);
_messagePipeline.handle(event);
@@ -273,7 +274,7 @@ public class TestP2PMessagesAvoidDuplicatedMessage extends
BaseStageTest {
instanceStateMap.put(thirdMaster, "MASTER");
_bestpossibleState.setState(_db, _partition, instanceStateMap);
- event.addAttribute(AttributeName.INTERMEDIATE_STATE.name(),
_bestpossibleState);
+ event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.name(),
_bestpossibleState);
_messagePipeline.handle(event);
@@ -290,7 +291,7 @@ public class TestP2PMessagesAvoidDuplicatedMessage extends
BaseStageTest {
currentStateOutput.setPendingMessage(_db, _partition, secondMaster,
relayMessage);
event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
- event.addAttribute(AttributeName.INTERMEDIATE_STATE.name(),
_bestpossibleState);
+ event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.name(),
_bestpossibleState);
_messagePipeline.handle(event);
diff --git
a/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PStateTransitionMessages.java
b/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PStateTransitionMessages.java
index bf53682..cec0d2b 100644
---
a/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PStateTransitionMessages.java
+++
b/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PStateTransitionMessages.java
@@ -199,9 +199,9 @@ public class TestP2PStateTransitionMessages extends
BaseStageTest {
event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.name(),
bestPossibleStateOutput);
pipeline = new Pipeline("test");
- pipeline.addStage(new IntermediateStateCalcStage());
pipeline.addStage(new ResourceMessageGenerationPhase());
pipeline.addStage(new MessageSelectionStage());
+ pipeline.addStage(new IntermediateStateCalcStage());
pipeline.addStage(new MessageThrottleStage());
pipeline.handle(event);
@@ -223,9 +223,9 @@ public class TestP2PStateTransitionMessages extends
BaseStageTest {
pipeline = new Pipeline("test");
- pipeline.addStage(new IntermediateStateCalcStage());
pipeline.addStage(new ResourceMessageGenerationPhase());
pipeline.addStage(new MessageSelectionStage());
+ pipeline.addStage(new IntermediateStateCalcStage());
pipeline.addStage(new MessageThrottleStage());
pipeline.handle(event);
@@ -359,9 +359,9 @@ public class TestP2PStateTransitionMessages extends
BaseStageTest {
Pipeline pipeline = new Pipeline("test");
pipeline.addStage(new ReadClusterDataStage());
pipeline.addStage(new BestPossibleStateCalcStage());
- pipeline.addStage(new IntermediateStateCalcStage());
pipeline.addStage(new ResourceMessageGenerationPhase());
pipeline.addStage(new MessageSelectionStage());
+ pipeline.addStage(new IntermediateStateCalcStage());
pipeline.addStage(new MessageThrottleStage());
return pipeline;
diff --git
a/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PWithStateCancellationMessage.java
b/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PWithStateCancellationMessage.java
index a8191cb..f685bf4 100644
---
a/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PWithStateCancellationMessage.java
+++
b/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PWithStateCancellationMessage.java
@@ -31,6 +31,7 @@ import org.apache.helix.HelixManager;
import
org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
import org.apache.helix.controller.stages.AttributeName;
import org.apache.helix.controller.stages.BaseStageTest;
+import org.apache.helix.controller.stages.BestPossibleStateOutput;
import org.apache.helix.controller.stages.ClusterEvent;
import org.apache.helix.controller.stages.ClusterEventType;
import org.apache.helix.controller.stages.CurrentStateOutput;
@@ -166,11 +167,11 @@ public class TestP2PWithStateCancellationMessage extends
BaseStageTest {
currentStateOutput.setCurrentState(RESOURCE_NAME, new Partition("1"),
"localhost_2", "MASTER");
event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
- IntermediateStateOutput intermediateStateOutput = new
IntermediateStateOutput();
- intermediateStateOutput.setState(RESOURCE_NAME, new Partition("0"),
"localhost_1", "SLAVE");
- intermediateStateOutput.setState(RESOURCE_NAME, new Partition("0"),
"localhost_2", "MASTER");
- intermediateStateOutput.setState(RESOURCE_NAME, new Partition("1"),
"localhost_2", "MASTER");
- event.addAttribute(AttributeName.INTERMEDIATE_STATE.name(),
intermediateStateOutput);
+ BestPossibleStateOutput bestPossibleStateOutput = new
BestPossibleStateOutput();
+ bestPossibleStateOutput.setState(RESOURCE_NAME, new Partition("0"),
"localhost_1", "SLAVE");
+ bestPossibleStateOutput.setState(RESOURCE_NAME, new Partition("0"),
"localhost_2", "MASTER");
+ bestPossibleStateOutput.setState(RESOURCE_NAME, new Partition("1"),
"localhost_2", "MASTER");
+ event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.name(),
bestPossibleStateOutput);
return event;
}
diff --git
a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestRebalancerMetrics.java
b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestRebalancerMetrics.java
index 43bc8b1..ccc1431 100644
---
a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestRebalancerMetrics.java
+++
b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestRebalancerMetrics.java
@@ -13,7 +13,9 @@ import
org.apache.helix.controller.stages.BestPossibleStateCalcStage;
import org.apache.helix.controller.stages.BestPossibleStateOutput;
import org.apache.helix.controller.stages.CurrentStateOutput;
import org.apache.helix.controller.stages.IntermediateStateCalcStage;
+import org.apache.helix.controller.stages.MessageSelectionStage;
import org.apache.helix.controller.stages.ReadClusterDataStage;
+import
org.apache.helix.controller.stages.resource.ResourceMessageGenerationPhase;
import org.apache.helix.model.BuiltInStateModelDefinitions;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.IdealState;
@@ -77,14 +79,17 @@ public class TestRebalancerMetrics extends BaseStageTest {
setupThrottleConfig(cache.getClusterConfig(),
StateTransitionThrottleConfig.RebalanceType.RECOVERY_BALANCE,
maxPending);
runStage(event, new BestPossibleStateCalcStage());
+ runStage(event, new ResourceMessageGenerationPhase());
+ runStage(event, new MessageSelectionStage());
runStage(event, new IntermediateStateCalcStage());
ClusterStatusMonitor clusterStatusMonitor =
event.getAttribute(AttributeName.clusterStatusMonitor.name());
ResourceMonitor resourceMonitor =
clusterStatusMonitor.getResourceMonitor(resource);
-
Assert.assertEquals(resourceMonitor.getPendingRecoveryRebalancePartitionGauge(),
numPartition);
-
Assert.assertEquals(resourceMonitor.getRecoveryRebalanceThrottledPartitionGauge(),
- numPartition - maxPending);
+
Assert.assertEquals(resourceMonitor.getNumPendingRecoveryRebalanceReplicas(),
+ numPartition * numReplica -
resourceMonitor.getNumPendingLoadRebalanceReplicas());
+
Assert.assertEquals(resourceMonitor.getNumRecoveryRebalanceThrottledReplicas(),
+ numPartition * numReplica -
resourceMonitor.getNumPendingLoadRebalanceReplicas() - maxPending);
System.out
.println("END testRecoveryRebalanceMetrics at " + new
Date(System.currentTimeMillis()));
@@ -135,15 +140,16 @@ public class TestRebalancerMetrics extends BaseStageTest {
setupThrottleConfig(cache.getClusterConfig(),
StateTransitionThrottleConfig.RebalanceType.LOAD_BALANCE, maxPending);
runStage(event, new BestPossibleStateCalcStage());
+ runStage(event, new ResourceMessageGenerationPhase());
+ runStage(event, new MessageSelectionStage());
runStage(event, new IntermediateStateCalcStage());
ClusterStatusMonitor clusterStatusMonitor =
event.getAttribute(AttributeName.clusterStatusMonitor.name());
ResourceMonitor resourceMonitor =
clusterStatusMonitor.getResourceMonitor(resource);
- long numPendingLoadBalance =
resourceMonitor.getPendingLoadRebalancePartitionGauge();
+ long numPendingLoadBalance =
resourceMonitor.getNumPendingLoadRebalanceReplicas();
Assert.assertTrue(numPendingLoadBalance > 0);
-
Assert.assertEquals(resourceMonitor.getLoadRebalanceThrottledPartitionGauge(),
- numPendingLoadBalance - maxPending);
+
Assert.assertEquals(resourceMonitor.getNumLoadRebalanceThrottledReplicas(),
numPendingLoadBalance - maxPending);
System.out
.println("END testLoadBalanceMetrics at " + new
Date(System.currentTimeMillis()));
diff --git a/helix-core/src/test/resources/TestPartitionLevelPriority.json
b/helix-core/src/test/resources/TestPartitionLevelPriority.json
index 125277d..5e42d2d 100644
--- a/helix-core/src/test/resources/TestPartitionLevelPriority.json
+++ b/helix-core/src/test/resources/TestPartitionLevelPriority.json
@@ -143,8 +143,8 @@
"Partition_0",
"Partition_3",
"Partition_2",
- "Partition_1",
"Partition_4",
+ "Partition_1",
"Partition_5"
]
}
diff --git
a/helix-core/src/test/resources/TestRecoveryLoadBalance.MasterSlave.json
b/helix-core/src/test/resources/TestRecoveryLoadBalance.MasterSlave.json
deleted file mode 100644
index 077f129..0000000
--- a/helix-core/src/test/resources/TestRecoveryLoadBalance.MasterSlave.json
+++ /dev/null
@@ -1,367 +0,0 @@
-[
- {
- "statemodel": "MasterSlave",
- "errorOrRecoveryPartitionThresholdForLoadBalance": 0,
- "inputs": {
- "resource_0_0": {
- "currentStates": {
- "localhost_0": "MASTER",
- "localhost_1": "SLAVE",
- "localhost_2": "SLAVE",
- "localhost_3": "SLAVE"
- },
- "bestPossibleStates": {
- "localhost_0": "MASTER",
- "localhost_1": "SLAVE",
- "localhost_2": "SLAVE",
- "localhost_3": "DROPPED"
- },
- "expectedStates": {
- "localhost_0": "MASTER",
- "localhost_1": "SLAVE",
- "localhost_2": "SLAVE",
- "localhost_3": "DROPPED"
- }
- },
- "resource_0_1": {
- "currentStates": {
- "localhost_0": "MASTER",
- "localhost_1": "SLAVE",
- "localhost_2": "SLAVE"
- },
- "bestPossibleStates": {
- "localhost_0": "MASTER",
- "localhost_1": "SLAVE",
- "localhost_2": "SLAVE"
- },
- "expectedStates": {
- "localhost_0": "MASTER",
- "localhost_1": "SLAVE",
- "localhost_2": "SLAVE"
- }
- }
- }
- },
- {
- "statemodel": "MasterSlave",
- "errorOrRecoveryPartitionThresholdForLoadBalance": 1,
- "inputs": {
- "resource_0_0": {
- "currentStates": {
- "localhost_0": "MASTER",
- "localhost_1": "ERROR",
- "localhost_2": "SLAVE",
- "localhost_3": "SLAVE"
- },
- "bestPossibleStates": {
- "localhost_0": "MASTER",
- "localhost_1": "ERROR",
- "localhost_2": "SLAVE",
- "localhost_3": "DROPPED"
- },
- "expectedStates": {
- "localhost_0": "MASTER",
- "localhost_1": "ERROR",
- "localhost_2": "SLAVE",
- "localhost_3": "DROPPED"
- }
- },
- "resource_0_1": {
- "currentStates": {
- "localhost_0": "MASTER",
- "localhost_1": "SLAVE",
- "localhost_2": "SLAVE"
- },
- "bestPossibleStates": {
- "localhost_0": "MASTER",
- "localhost_1": "SLAVE",
- "localhost_2": "SLAVE"
- },
- "expectedStates": {
- "localhost_0": "MASTER",
- "localhost_1": "SLAVE",
- "localhost_2": "SLAVE"
- }
- }
- }
- },
- {
- "statemodel": "MasterSlave",
- "errorOrRecoveryPartitionThresholdForLoadBalance": 0,
- "inputs": {
- "resource_0_0": {
- "currentStates": {
- "localhost_0": "MASTER",
- "localhost_1": "ERROR",
- "localhost_2": "SLAVE",
- "localhost_3": "SLAVE"
- },
- "bestPossibleStates": {
- "localhost_0": "MASTER",
- "localhost_1": "ERROR",
- "localhost_2": "SLAVE",
- "localhost_3": "DROPPED"
- },
- "expectedStates": {
- "localhost_0": "MASTER",
- "localhost_1": "ERROR",
- "localhost_2": "SLAVE",
- "localhost_3": "DROPPED"
- }
- },
- "resource_0_1": {
- "currentStates": {
- "localhost_0": "MASTER",
- "localhost_1": "SLAVE",
- "localhost_2": "SLAVE"
- },
- "bestPossibleStates": {
- "localhost_0": "MASTER",
- "localhost_1": "SLAVE",
- "localhost_2": "SLAVE"
- },
- "expectedStates": {
- "localhost_0": "MASTER",
- "localhost_1": "SLAVE",
- "localhost_2": "SLAVE"
- }
- }
- }
- },
- {
- "statemodel": "MasterSlave",
- "errorOrRecoveryPartitionThresholdForLoadBalance": 0,
- "inputs": {
- "resource_0_0": {
- "currentStates": {
- "localhost_0": "MASTER",
- "localhost_1": "OFFLINE",
- "localhost_2": "OFFLINE",
- "localhost_3": "SLAVE"
- },
- "bestPossibleStates": {
- "localhost_0": "MASTER",
- "localhost_1": "SLAVE",
- "localhost_2": "SLAVE",
- "localhost_3": "DROPPED"
- },
- "expectedStates": {
- "localhost_0": "MASTER",
- "localhost_1": "SLAVE",
- "localhost_2": "SLAVE",
- "localhost_3": "DROPPED"
- }
- },
- "resource_0_1": {
- "currentStates": {
- "localhost_0": "MASTER",
- "localhost_1": "SLAVE",
- "localhost_2": "SLAVE"
- },
- "bestPossibleStates": {
- "localhost_0": "MASTER",
- "localhost_1": "SLAVE",
- "localhost_2": "SLAVE"
- },
- "expectedStates": {
- "localhost_0": "MASTER",
- "localhost_1": "SLAVE",
- "localhost_2": "SLAVE"
- }
- }
- }
- },
- {
- "statemodel": "MasterSlave",
- "errorOrRecoveryPartitionThresholdForLoadBalance": 0,
- "inputs": {
- "resource_0_0": {
- "currentStates": {
- "localhost_0": "SLAVE",
- "localhost_1": "SLAVE",
- "localhost_2": "SLAVE"
- },
- "bestPossibleStates": {
- "localhost_0": "MASTER",
- "localhost_1": "SLAVE",
- "localhost_2": "SLAVE"
- },
- "expectedStates": {
- "localhost_0": "MASTER",
- "localhost_1": "SLAVE",
- "localhost_2": "SLAVE"
- }
- },
- "resource_0_1": {
- "currentStates": {
- "localhost_0": "MASTER",
- "localhost_1": "SLAVE",
- "localhost_2": "SLAVE"
- },
- "bestPossibleStates": {
- "localhost_0": "MASTER",
- "localhost_1": "SLAVE",
- "localhost_2": "SLAVE"
- },
- "expectedStates": {
- "localhost_0": "MASTER",
- "localhost_1": "SLAVE",
- "localhost_2": "SLAVE"
- }
- }
- }
- },
- {
- "statemodel": "MasterSlave",
- "errorOrRecoveryPartitionThresholdForLoadBalance": 0,
- "inputs": {
- "resource_0_0": {
- "currentStates": {
- "localhost_0": "SLAVE",
- "localhost_1": "SLAVE",
- "localhost_2": "SLAVE"
- },
- "bestPossibleStates": {
- "localhost_0": "MASTER",
- "localhost_1": "SLAVE",
- "localhost_2": "SLAVE"
- },
- "expectedStates": {
- "localhost_0": "MASTER",
- "localhost_1": "SLAVE",
- "localhost_2": "SLAVE"
- }
- },
- "resource_0_1": {
- "currentStates": {
- "localhost_0": "MASTER",
- "localhost_1": "SLAVE",
- "localhost_2": "SLAVE",
- "localhost_3": "OFFLINE"
- },
- "bestPossibleStates": {
- "localhost_0": "MASTER",
- "localhost_1": "SLAVE",
- "localhost_2": "SLAVE",
- "localhost_3": "SLAVE"
- },
- "expectedStates": {
- "localhost_0": "MASTER",
- "localhost_1": "SLAVE",
- "localhost_2": "SLAVE",
- "localhost_3": "OFFLINE"
- }
- }
- }
- },
- {
- "statemodel": "MasterSlave",
- "errorOrRecoveryPartitionThresholdForLoadBalance": 0,
- "loadBalanceThrottle": "0",
- "minActiveReplica": "2",
- "inputs": {
- "resource_0_0": {
- "currentStates": {
- "localhost_0": "MASTER",
- "localhost_1": "SLAVE",
- "localhost_2": "OFFLINE",
- "localhost_3": "OFFLINE"
- },
- "bestPossibleStates": {
- "localhost_0": "MASTER",
- "localhost_1": "SLAVE",
- "localhost_2": "SLAVE",
- "localhost_3": "DROPPED"
- },
- "expectedStates": {
- "localhost_0": "MASTER",
- "localhost_1": "SLAVE",
- "localhost_2": "OFFLINE",
- "localhost_3": "OFFLINE"
- }
- },
- "resource_0_1": {
- "currentStates": {
- "localhost_0": "SLAVE",
- "localhost_1": "SLAVE",
- "localhost_2": "OFFLINE"
- },
- "bestPossibleStates": {
- "localhost_0": "MASTER",
- "localhost_1": "SLAVE",
- "localhost_2": "SLAVE"
- },
- "expectedStates": {
- "localhost_0": "MASTER",
- "localhost_1": "SLAVE",
- "localhost_2": "SLAVE"
- }
- }
- }
- },
- {
- "statemodel": "MasterSlave",
- "errorOrRecoveryPartitionThresholdForLoadBalance": 0,
- "loadBalanceThrottle": "2",
- "minActiveReplica": "2",
- "inputs": {
- "resource_0_0": {
- "currentStates": {
- "localhost_0": "MASTER",
- "localhost_1": "SLAVE",
- "localhost_2": "SLAVE",
- "localhost_3": "OFFLINE"
- },
- "bestPossibleStates": {
- "localhost_0": "MASTER",
- "localhost_1": "SLAVE",
- "localhost_2": "OFFLINE",
- "localhost_3": "DROPPED"
- },
- "expectedStates": {
- "localhost_0": "MASTER",
- "localhost_1": "SLAVE",
- "localhost_2": "OFFLINE",
- "localhost_3": "DROPPED"
- }
- },
- "resource_0_1": {
- "currentStates": {
- "localhost_0": "MASTER",
- "localhost_1": "SLAVE",
- "localhost_2": "OFFLINE"
- },
- "bestPossibleStates": {
- "localhost_0": "MASTER",
- "localhost_1": "SLAVE",
- "localhost_2": "SLAVE"
- },
- "expectedStates": {
- "localhost_0": "MASTER",
- "localhost_1": "SLAVE",
- "localhost_2": "SLAVE"
- }
- },
- "resource_0_2": {
- "currentStates": {
- "localhost_0": "MASTER",
- "localhost_1": "SLAVE",
- "localhost_2": "OFFLINE",
- "localhost_3": "OFFLINE"
- },
- "bestPossibleStates": {
- "localhost_0": "MASTER",
- "localhost_1": "SLAVE",
- "localhost_2": "SLAVE",
- "localhost_3": "SLAVE"
- },
- "expectedStates": {
- "localhost_0": "MASTER",
- "localhost_1": "SLAVE",
- "localhost_2": "OFFLINE",
- "localhost_3": "OFFLINE"
- }
- }
- }
- }
-]
diff --git
a/helix-core/src/test/resources/TestRecoveryLoadBalance.OnlineOffline.json
b/helix-core/src/test/resources/TestRecoveryLoadBalance.OnlineOffline.json
deleted file mode 100644
index 389d90e..0000000
--- a/helix-core/src/test/resources/TestRecoveryLoadBalance.OnlineOffline.json
+++ /dev/null
@@ -1,206 +0,0 @@
-[
- {
- "statemodel": "OnlineOffline",
- "errorOrRecoveryPartitionThresholdForLoadBalance": 0,
- "inputs": {
- "resource_0_0": {
- "currentStates": {
- "localhost_0": "OFFLINE",
- "localhost_1": "OFFLINE",
- "localhost_2": "OFFLINE"
- },
- "bestPossibleStates": {
- "localhost_0": "ONLINE",
- "localhost_1": "ONLINE",
- "localhost_2": "ONLINE"
- },
- "expectedStates": {
- "localhost_0": "ONLINE",
- "localhost_1": "ONLINE",
- "localhost_2": "ONLINE"
- }
- },
- "resource_0_1": {
- "currentStates": {
- "localhost_0": "ONLINE",
- "localhost_1": "ONLINE",
- "localhost_2": "ONLINE"
- },
- "bestPossibleStates": {
- "localhost_0": "ONLINE",
- "localhost_1": "ONLINE",
- "localhost_2": "ONLINE",
- "localhost_3": "ONLINE"
- },
- "expectedStates": {
- "localhost_0": "ONLINE",
- "localhost_1": "ONLINE",
- "localhost_2": "ONLINE"
- }
- }
- }
- },
- {
- "statemodel": "OnlineOffline",
- "errorOrRecoveryPartitionThresholdForLoadBalance": 0,
- "inputs": {
- "resource_0_0": {
- "currentStates": {
- "localhost_0": "OFFLINE",
- "localhost_1": "OFFLINE",
- "localhost_2": "OFFLINE"
- },
- "bestPossibleStates": {
- "localhost_0": "ONLINE",
- "localhost_1": "ONLINE",
- "localhost_2": "ONLINE"
- },
- "expectedStates": {
- "localhost_0": "ONLINE",
- "localhost_1": "ONLINE",
- "localhost_2": "ONLINE"
- }
- },
- "resource_0_1": {
- "currentStates": {
- "localhost_0": "ONLINE",
- "localhost_1": "ONLINE",
- "localhost_2": "ONLINE",
- "localhost_3": "ONLINE"
- },
- "bestPossibleStates": {
- "localhost_0": "ONLINE",
- "localhost_1": "ONLINE",
- "localhost_2": "ONLINE",
- "localhost_3": "DROPPED"
- },
- "expectedStates": {
- "localhost_0": "ONLINE",
- "localhost_1": "ONLINE",
- "localhost_2": "ONLINE",
- "localhost_3": "DROPPED"
- }
- }
- }
- },
- {
- "statemodel": "OnlineOffline",
- "errorOrRecoveryPartitionThresholdForLoadBalance": 100,
- "inputs": {
- "resource_0_0": {
- "currentStates": {
- "localhost_0": "OFFLINE",
- "localhost_1": "OFFLINE",
- "localhost_2": "OFFLINE"
- },
- "bestPossibleStates": {
- "localhost_0": "ONLINE",
- "localhost_1": "ONLINE",
- "localhost_2": "ONLINE"
- },
- "expectedStates": {
- "localhost_0": "ONLINE",
- "localhost_1": "ONLINE",
- "localhost_2": "ONLINE"
- }
- },
- "resource_0_1": {
- "currentStates": {
- "localhost_0": "ONLINE",
- "localhost_1": "ONLINE",
- "localhost_2": "ONLINE"
- },
- "bestPossibleStates": {
- "localhost_0": "ONLINE",
- "localhost_1": "ONLINE",
- "localhost_2": "ONLINE",
- "localhost_3": "ONLINE"
- },
- "expectedStates": {
- "localhost_0": "ONLINE",
- "localhost_1": "ONLINE",
- "localhost_2": "ONLINE",
- "localhost_3": "ONLINE"
- }
- }
- }
- },
- {
- "statemodel": "OnlineOffline",
- "errorOrRecoveryPartitionThresholdForLoadBalance": 0,
- "inputs": {
- "resource_0_0": {
- "currentStates": {
- "localhost_0": "OFFLINE",
- "localhost_1": "OFFLINE",
- "localhost_2": "OFFLINE"
- },
- "bestPossibleStates": {
- "localhost_0": "ONLINE",
- "localhost_1": "ONLINE",
- "localhost_2": "ONLINE"
- },
- "expectedStates": {
- "localhost_0": "ONLINE",
- "localhost_1": "ONLINE",
- "localhost_2": "ONLINE"
- }
- },
- "resource_0_1": {
- "currentStates": {
- "localhost_0": "ONLINE",
- "localhost_1": "OFFLINE",
- "localhost_2": "ONLINE"
- },
- "bestPossibleStates": {
- "localhost_0": "DROPPED",
- "localhost_1": "ONLINE",
- "localhost_2": "ONLINE"
- },
- "expectedStates": {
- "localhost_0": "ONLINE",
- "localhost_1": "OFFLINE",
- "localhost_2": "ONLINE"
- }
- }
- }
- },
- {
- "statemodel": "OnlineOffline",
- "errorOrRecoveryPartitionThresholdForLoadBalance": 0,
- "inputs": {
- "resource_0_0": {
- "currentStates": {
- "localhost_0": "ONLINE",
- "localhost_1": "ONLINE"
- },
- "bestPossibleStates": {
- "localhost_0": "DROPPED",
- "localhost_1": "ONLINE",
- "localhost_2": "OFFLINE"
- },
- "expectedStates": {
- "localhost_0": "ONLINE",
- "localhost_1": "ONLINE"
- }
- },
- "resource_0_1": {
- "currentStates": {
- "localhost_0": "OFFLINE",
- "localhost_1": "OFFLINE",
- "localhost_2": "OFFLINE"
- },
- "bestPossibleStates": {
- "localhost_0": "ONLINE",
- "localhost_1": "ONLINE",
- "localhost_2": "ONLINE"
- },
- "expectedStates": {
- "localhost_0": "ONLINE",
- "localhost_1": "ONLINE",
- "localhost_2": "ONLINE"
- }
- }
- }
- }
-]
\ No newline at end of file