Repository: helix
Updated Branches:
  refs/heads/master c97a97508 -> 37f3d4c8d


[HELIX-711]  Allow downward state transition during recovery and add recovery 
threshold

Previously, a single partition requiring recovery balance would block all types 
of load-balance. This commit allows all downward state transitions (load 
balance) to happen even when recovery balance is happening in the same cycle.
For load-balance state transitions that are not downward, a parameter, 
ErrorOrRecoveryPartitionThresholdForLoadBalance, was added to ClusterConfig. If 
the number of partitions in error or requiring recovery is lower than the 
threshold, non-downward load-balance will take place in the same cycle as 
recovery balance; otherwise, non-downward load-balance will not take place in 
that cycle.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/37f3d4c8
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/37f3d4c8
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/37f3d4c8

Branch: refs/heads/master
Commit: 37f3d4c8dadd7cadeebad4fb41e2d4b1c38601fa
Parents: c97a975
Author: Hunter Lee <[email protected]>
Authored: Thu Jun 28 18:06:04 2018 -0700
Committer: Hunter Lee <[email protected]>
Committed: Thu Jun 28 18:06:04 2018 -0700

----------------------------------------------------------------------
 .../config/StateTransitionThrottleConfig.java   |   4 +-
 .../stages/IntermediateStateCalcStage.java      |  94 +++++--
 .../org/apache/helix/model/ClusterConfig.java   | 105 +++++---
 .../stages/TestIntermediateStateCalcStage.java  | 159 ++++++++++--
 .../stages/TestRecoveryLoadBalance.java         | 199 ++++++++++++++
 .../TestRecoveryLoadBalance.MasterSlave.json    | 257 +++++++++++++++++++
 .../TestRecoveryLoadBalance.OnlineOffline.json  | 206 +++++++++++++++
 7 files changed, 952 insertions(+), 72 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/37f3d4c8/helix-core/src/main/java/org/apache/helix/api/config/StateTransitionThrottleConfig.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/api/config/StateTransitionThrottleConfig.java
 
b/helix-core/src/main/java/org/apache/helix/api/config/StateTransitionThrottleConfig.java
index 37662aa..548ae31 100644
--- 
a/helix-core/src/main/java/org/apache/helix/api/config/StateTransitionThrottleConfig.java
+++ 
b/helix-core/src/main/java/org/apache/helix/api/config/StateTransitionThrottleConfig.java
@@ -48,9 +48,9 @@ public class StateTransitionThrottleConfig {
   }
 
   public enum RebalanceType {
-    LOAD_BALANCE,
+    LOAD_BALANCE, // A rebalance type for load balance excluding dropping a 
replica
     RECOVERY_BALANCE,
-    ANY, // A type used for general throttling (to account for all types of 
rebalance)
+    ANY, // A rebalance type used for general throttling (to account for all 
types of rebalance)
     NONE
   }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/37f3d4c8/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
 
b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
index ba5a29a..e70e420 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/stages/IntermediateStateCalcStage.java
@@ -53,8 +53,8 @@ public class IntermediateStateCalcStage extends 
AbstractBaseStage {
     if (currentStateOutput == null || bestPossibleStateOutput == null || 
resourceMap == null
         || cache == null) {
       throw new StageException(String.format("Missing attributes in event: %s. 
"
-          + "Requires CURRENT_STATE (%s) |BEST_POSSIBLE_STATE (%s) |RESOURCES 
(%s) |DataCache (%s)",
-          event, currentStateOutput, bestPossibleStateOutput, resourceMap, 
cache));
+              + "Requires CURRENT_STATE (%s) |BEST_POSSIBLE_STATE (%s) 
|RESOURCES (%s) |DataCache (%s)", event,
+          currentStateOutput, bestPossibleStateOutput, resourceMap, cache));
     }
 
     IntermediateStateOutput intermediateStateOutput =
@@ -273,9 +273,9 @@ public class IntermediateStateCalcStage extends 
AbstractBaseStage {
           partitionsNeedRecovery.add(partition);
           isRebalanceNeeded = true;
         }
+      } else if (rebalanceType.equals(RebalanceType.LOAD_BALANCE)) {
         // Number of states required by StateModelDefinition are satisfied, 
but to achieve
         // BestPossibleState, need load balance
-      } else if (rebalanceType.equals(RebalanceType.LOAD_BALANCE)) {
         partitionsNeedLoadBalance.add(partition);
         isRebalanceNeeded = true;
       }
@@ -308,28 +308,48 @@ public class IntermediateStateCalcStage extends 
AbstractBaseStage {
             intermediatePartitionStateMap, partitionsNeedRecovery, 
currentStateOutput,
             
cache.getStateModelDef(resource.getStateModelDefRef()).getTopState());
 
-    // Perform load balance
+    // Perform load balance upon checking conditions below
     Set<Partition> loadbalanceThrottledPartitions = partitionsNeedLoadBalance;
+    ClusterConfig clusterConfig = cache.getClusterConfig();
+
+    // If the threshold (ErrorOrRecovery) is set, then use it, if not, then 
check if the old
+    // threshold (Error) is set. If the old threshold is set, use it. If not, 
use the default value
+    // for the new one. This is for backward-compatibility
+    int threshold = 1; // Default threshold for 
ErrorOrRecoveryPartitionThresholdForLoadBalance
+    int partitionCount = partitionsWithErrorStateReplica.size();
+    if (clusterConfig.getErrorOrRecoveryPartitionThresholdForLoadBalance() != 
-1) {
+      // ErrorOrRecovery is set
+      threshold = 
clusterConfig.getErrorOrRecoveryPartitionThresholdForLoadBalance();
+      partitionCount += partitionsNeedRecovery.size(); // Only add this count 
when the threshold is set
+    } else {
+      if (clusterConfig.getErrorPartitionThresholdForLoadBalance() != 0) {
+        // 0 is the default value so the old threshold has been set
+        threshold = clusterConfig.getErrorPartitionThresholdForLoadBalance();
+      }
+    }
 
-    long maxAllowedErrorPartitions =
-        cache.getClusterConfig().getErrorPartitionThresholdForLoadBalance();
-
-    // TODO: Logic here needs change - when there is an error partition, load 
should still happen
-
-    // Perform load balance only if
-    // 1. no recovery operation to be scheduled
-    // 2. error partition count is less than configured limitation
-    if (partitionsNeedRecovery.isEmpty() && (maxAllowedErrorPartitions < 0
-        || partitionsWithErrorStateReplica.size() <= 
maxAllowedErrorPartitions)) {
+    // Perform load balance only if the number of partitions in recovery and 
in error is less than
+    // the threshold
+    if (partitionCount < threshold) {
       loadbalanceThrottledPartitions = loadRebalance(resource, 
currentStateOutput,
           bestPossiblePartitionStateMap, throttleController, 
intermediatePartitionStateMap,
           partitionsNeedLoadBalance, 
currentStateOutput.getCurrentStateMap(resourceName));
     } else {
-      // Skip load balance and current states just become intermediate states
+      // Only allow dropping of replicas to happen (dropping does NOT need to 
be throttled) and skip
+      // load balance for this cycle
       for (Partition partition : partitionsNeedLoadBalance) {
         Map<String, String> currentStateMap =
             currentStateOutput.getCurrentStateMap(resourceName, partition);
+        Map<String, String> bestPossibleMap =
+            bestPossiblePartitionStateMap.getPartitionMap(partition);
+        // Skip load balance by passing current state to intermediate state
         intermediatePartitionStateMap.setState(partition, currentStateMap);
+
+        // Check if this partition only has downward state transitions; if so, 
allow state
+        // transitions by setting it at bestPossibleState
+        if (isLoadBalanceDownwardForAllReplicas(currentStateMap, 
bestPossibleMap, stateModelDef)) {
+          intermediatePartitionStateMap.setState(partition, bestPossibleMap);
+        }
       }
     }
 
@@ -351,6 +371,38 @@ public class IntermediateStateCalcStage extends 
AbstractBaseStage {
   }
 
   /**
+   * Check for a partition, whether all transitions for its replicas are 
downward transitions. Note
+   * that this function does NOT check for ERROR states.
+   * @param currentStateMap
+   * @param bestPossibleMap
+   * @param stateModelDef
+   * @return true if every replica's transition is downward; false otherwise
+   */
+  private boolean isLoadBalanceDownwardForAllReplicas(Map<String, String> 
currentStateMap,
+      Map<String, String> bestPossibleMap, StateModelDefinition stateModelDef) 
{
+    Set<String> allInstances = new HashSet<>();
+    allInstances.addAll(currentStateMap.keySet());
+    allInstances.addAll(bestPossibleMap.keySet());
+    Map<String, Integer> statePriorityMap = 
stateModelDef.getStatePriorityMap();
+
+    for (String instance : allInstances) {
+      String currentState = currentStateMap.get(instance);
+      String bestPossibleState = bestPossibleMap.get(instance);
+      if (currentState == null) {
+        return false; // null -> state is upward
+      }
+      if (bestPossibleState != null) {
+        // Compare priority values and return if an upward transition is found
+        // Note that lower integer value implies higher priority
+        if (statePriorityMap.get(currentState) > 
statePriorityMap.get(bestPossibleState)) {
+          return false;
+        }
+      }
+    }
+    return true;
+  }
+
+  /**
    * Check and charge all pending transitions for throttling.
    */
   private void chargePendingTransition(Resource resource, CurrentStateOutput 
currentStateOutput,
@@ -437,7 +489,8 @@ public class IntermediateStateCalcStage extends 
AbstractBaseStage {
           intermediatePartitionStateMap, RebalanceType.RECOVERY_BALANCE);
     }
     logger.info(String.format(
-        "For resource %s: Num of partitions needing recovery: %d, Num of 
partitions needing recovery but throttled (not recovered): %d",
+        "For resource %s: Num of partitions needing recovery: %d, Num of 
partitions needing recovery"
+            + " but throttled (not recovered): %d",
         resourceName, partitionsNeedRecovery.size(), 
partitionRecoveryBalanceThrottled.size()));
     return partitionRecoveryBalanceThrottled;
   }
@@ -453,14 +506,14 @@ public class IntermediateStateCalcStage extends 
AbstractBaseStage {
    * @param throttleController
    * @param intermediatePartitionStateMap
    * @param partitionsNeedLoadbalance
-   * @param currentStateMaps
+   * @param currentStateMap
    * @return a set of partitions that need to be load-balanced but did not due 
to throttling
    */
   private Set<Partition> loadRebalance(Resource resource, CurrentStateOutput 
currentStateOutput,
       PartitionStateMap bestPossiblePartitionStateMap,
       StateTransitionThrottleController throttleController,
       PartitionStateMap intermediatePartitionStateMap, Set<Partition> 
partitionsNeedLoadbalance,
-      Map<Partition, Map<String, String>> currentStateMaps) {
+      Map<Partition, Map<String, String>> currentStateMap) {
     String resourceName = resource.getResourceName();
     Set<Partition> partitionsLoadbalanceThrottled = new HashSet<>();
 
@@ -478,7 +531,7 @@ public class IntermediateStateCalcStage extends 
AbstractBaseStage {
       }
     });
     Collections.sort(partitionsNeedLoadRebalancePrioritized, new 
PartitionPriorityComparator(
-        bestPossiblePartitionStateMap.getStateMap(), currentStateMaps, "", 
false));
+        bestPossiblePartitionStateMap.getStateMap(), currentStateMap, "", 
false));
     for (Partition partition : partitionsNeedLoadRebalancePrioritized) {
       throttleStateTransitionsForPartition(throttleController, resourceName, 
partition,
           currentStateOutput, bestPossiblePartitionStateMap, 
partitionsLoadbalanceThrottled,
@@ -592,7 +645,6 @@ public class IntermediateStateCalcStage extends 
AbstractBaseStage {
         stateModelDef.getStateCountMap(activeList.size(), replica); // 
StateModelDefinition's counts
     Map<String, Integer> currentStateCounts = 
StateModelDefinition.getStateCounts(currentStateMap); // Current
     // counts
-
     // Go through each state and compare counts
     for (String state : expectedStateCountMap.keySet()) {
       Integer expectedCount = expectedStateCountMap.get(state);
@@ -643,7 +695,7 @@ public class IntermediateStateCalcStage extends 
AbstractBaseStage {
     if (logger.isDebugEnabled()) {
       logger.debug("Partitions need recovery: {}\nPartitions get throttled on 
recovery: {}",
           recoveryPartitions, recoveryThrottledPartitions);
-      logger.debug("Partitions need loadbalance: {}\nPartitions get throttled 
on load-balance: ",
+      logger.debug("Partitions need loadbalance: {}\nPartitions get throttled 
on load-balance: {}",
           loadbalancePartitions, loadbalanceThrottledPartitions);
     }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/37f3d4c8/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java 
b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
index ce60888..704ff5d 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
@@ -38,20 +38,22 @@ import 
org.apache.helix.api.config.StateTransitionTimeoutConfig;
 public class ClusterConfig extends HelixProperty {
   /**
    * Configurable characteristics of a cluster.
-   *
-   * NOTE: Do NOT use this field name directly, use its corresponding 
getter/setter in the ClusterConfig.
+   * NOTE: Do NOT use this field name directly, use its corresponding 
getter/setter in the
+   * ClusterConfig.
    */
   public enum ClusterConfigProperty {
     HELIX_DISABLE_PIPELINE_TRIGGERS,
     PERSIST_BEST_POSSIBLE_ASSIGNMENT,
     PERSIST_INTERMEDIATE_ASSIGNMENT,
-    TOPOLOGY,  // cluster topology definition, for example, 
"/zone/rack/host/instance"
-    FAULT_ZONE_TYPE, // the type in which isolation should be applied on when 
Helix places the replicas from same partition.
+    TOPOLOGY, // cluster topology definition, for example, 
"/zone/rack/host/instance"
+    FAULT_ZONE_TYPE, // the type in which isolation should be applied on when 
Helix places the
+    // replicas from same partition.
     TOPOLOGY_AWARE_ENABLED, // whether topology aware rebalance is enabled.
     @Deprecated
-    DELAY_REBALANCE_DISABLED,  // disabled the delayed rebalaning in case node 
goes offline.
-    DELAY_REBALANCE_ENABLED,  // whether the delayed rebalaning is enabled.
-    DELAY_REBALANCE_TIME,    // delayed time in ms that the delay time Helix 
should hold until rebalancing.
+    DELAY_REBALANCE_DISABLED, // disabled the delayed rebalaning in case node 
goes offline.
+    DELAY_REBALANCE_ENABLED, // whether the delayed rebalaning is enabled.
+    DELAY_REBALANCE_TIME, // delayed time in ms that the delay time Helix 
should hold until
+    // rebalancing.
     STATE_TRANSITION_THROTTLE_CONFIGS,
     STATE_TRANSITION_CANCELLATION_ENABLED,
     MISS_TOP_STATE_DURATION_THRESHOLD,
@@ -61,16 +63,34 @@ public class ClusterConfig extends HelixProperty {
     MAX_PARTITIONS_PER_INSTANCE,
     MAX_OFFLINE_INSTANCES_ALLOWED,
     TARGET_EXTERNALVIEW_ENABLED,
-    ERROR_PARTITION_THRESHOLD_FOR_LOAD_BALANCE, // Controller won't execute 
load balance state transition if the number of partitons that need recovery 
exceeds this limitation
-    DISABLED_INSTANCES
+    @Deprecated // ERROR_OR_RECOVERY_PARTITION_THRESHOLD_FOR_LOAD_BALANCE will 
take
+        // precedence if it is set
+        ERROR_PARTITION_THRESHOLD_FOR_LOAD_BALANCE, // Controller won't 
execute load balance state
+    // transition if the number of partitions that need
+    // recovery exceeds this limitation
+    ERROR_OR_RECOVERY_PARTITION_THRESHOLD_FOR_LOAD_BALANCE, // Controller 
won't execute load balance
+    // state transition if the number of
+    // partitions that need recovery or in
+    // error exceeds this limitation
+    DISABLED_INSTANCES,
+    VIEW_CLUSTER, // Set to "true" to indicate this is a view cluster
+    VIEW_CLUSTER_SOURCES, // Map field, key is the name of source cluster, 
value is
+    // ViewClusterSourceConfig JSON string
+    VIEW_CLUSTER_REFRESH_PERIOD, // In second
   }
+
   private final static int DEFAULT_MAX_CONCURRENT_TASK_PER_INSTANCE = 40;
-  private final static int DEFAULT_ERROR_PARTITION_THRESHOLD_FOR_LOAD_BALANCE 
= 0; // By default, no load balance if any error partition
-  private final static String IDEAL_STATE_RULE_PREFIX = "IdealStateRule!";
+  // By default, no load balance if any error partition
+  @Deprecated
+  private final static int DEFAULT_ERROR_PARTITION_THRESHOLD_FOR_LOAD_BALANCE 
= 0;
+  // By default, no load balance if any error or recovery partition. -1 
implies that the threshold
+  // is not set and will be given a default value of 1
+  private final static int 
DEFAULT_ERROR_OR_RECOVERY_PARTITION_THRESHOLD_FOR_LOAD_BALANCE = -1;
+  private static final String IDEAL_STATE_RULE_PREFIX = "IdealStateRule!";
+  private final static int DEFAULT_VIEW_CLUSTER_REFRESH_PERIOD = 30;
 
   /**
    * Instantiate for a specific cluster
-   *
    * @param cluster the cluster identifier
    */
   public ClusterConfig(String cluster) {
@@ -79,7 +99,6 @@ public class ClusterConfig extends HelixProperty {
 
   /**
    * Instantiate with a pre-populated record
-   *
    * @param record a ZNRecord corresponding to a cluster configuration
    */
   public ClusterConfig(ZNRecord record) {
@@ -459,23 +478,52 @@ public class ClusterConfig extends HelixProperty {
 
   /**
    * Get maximum allowed error partitions for a resource to be load balanced.
-   * If limitation is set to negative number, Helix won't check error 
partition count before schedule load balance.
+   * If limitation is set to negative number, Helix won't check error 
partition count before
+   * schedule load balance.
    * @return the maximum allowed error partition count
    */
   public int getErrorPartitionThresholdForLoadBalance() {
-    return 
_record.getIntField(ClusterConfigProperty.ERROR_PARTITION_THRESHOLD_FOR_LOAD_BALANCE.name(),
+    return _record.getIntField(
+        
ClusterConfigProperty.ERROR_PARTITION_THRESHOLD_FOR_LOAD_BALANCE.name(),
         DEFAULT_ERROR_PARTITION_THRESHOLD_FOR_LOAD_BALANCE);
   }
 
   /**
    * Set maximum allowed error partitions for a resource to be load balanced.
-   * If limitation is set to negative number, Helix won't check error 
partition count before schedule load balance.
+   * If limitation is set to negative number, Helix won't check error 
partition count before
+   * schedule load balance.
    * @param errorPartitionThreshold the maximum allowed error partition count
    */
   public void setErrorPartitionThresholdForLoadBalance(int 
errorPartitionThreshold) {
     
_record.setIntField(ClusterConfigProperty.ERROR_PARTITION_THRESHOLD_FOR_LOAD_BALANCE.name(),
         errorPartitionThreshold);
   }
+
+  /**
+   * Get the threshold for the number of partitions needing recovery or in 
error. Returns -1 when the
+   * threshold has not been set; the controller then falls back to the 
deprecated error partition
+   * threshold if that is set, or otherwise to a default of 1. When set, 
recovery balance takes
+   * precedence and non-downward load balance is skipped for a cycle in which 
the count is not below it.
+   * @return the threshold
+   */
+  public int getErrorOrRecoveryPartitionThresholdForLoadBalance() {
+    return _record.getIntField(
+        
ClusterConfigProperty.ERROR_OR_RECOVERY_PARTITION_THRESHOLD_FOR_LOAD_BALANCE.name(),
+        DEFAULT_ERROR_OR_RECOVERY_PARTITION_THRESHOLD_FOR_LOAD_BALANCE);
+  }
+
+  /**
+   * Set the threshold for the number of partitions needing recovery or in 
error. If this is never
+   * set, the getter returns -1 and the deprecated error partition threshold 
or a default of 1 is used
+   * instead. Non-downward load balance is skipped for a cycle in which the 
number of such partitions
+   * is not below this threshold; recovery balance takes precedence during 
that cycle.
+   * @param recoveryPartitionThreshold
+   */
+  public void setErrorOrRecoveryPartitionThresholdForLoadBalance(int 
recoveryPartitionThreshold) {
+    
_record.setIntField(ClusterConfigProperty.ERROR_OR_RECOVERY_PARTITION_THRESHOLD_FOR_LOAD_BALANCE.name(),
+        recoveryPartitionThreshold);
+  }
+
   /**
    * Set the disabled instance list
    * @param disabledInstances
@@ -522,20 +570,20 @@ public class ClusterConfig extends HelixProperty {
   public Map<String, Map<String, String>> getIdealStateRules() {
     Map<String, Map<String, String>> idealStateRuleMap = new HashMap<>();
 
-      for (String simpleKey : getRecord().getSimpleFields().keySet()) {
-        if (simpleKey.startsWith(IDEAL_STATE_RULE_PREFIX)) {
-          String simpleValue = getRecord().getSimpleField(simpleKey);
-          String[] rules = simpleValue.split("(?<!\\\\),");
-          Map<String, String> singleRule = Maps.newHashMap();
-          for (String rule : rules) {
-            String[] keyValue = rule.split("(?<!\\\\)=");
-            if (keyValue.length >= 2) {
-              singleRule.put(keyValue[0], keyValue[1]);
-            }
+    for (String simpleKey : getRecord().getSimpleFields().keySet()) {
+      if (simpleKey.startsWith(IDEAL_STATE_RULE_PREFIX)) {
+        String simpleValue = getRecord().getSimpleField(simpleKey);
+        String[] rules = simpleValue.split("(?<!\\\\),");
+        Map<String, String> singleRule = Maps.newHashMap();
+        for (String rule : rules) {
+          String[] keyValue = rule.split("(?<!\\\\)=");
+          if (keyValue.length >= 2) {
+            singleRule.put(keyValue[0], keyValue[1]);
           }
-          idealStateRuleMap.put(simpleKey, singleRule);
         }
+        idealStateRuleMap.put(simpleKey, singleRule);
       }
+    }
     return idealStateRuleMap;
   }
 
@@ -552,5 +600,4 @@ public class ClusterConfig extends HelixProperty {
   public String getClusterName() {
     return _record.getId();
   }
-}
-
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/helix/blob/37f3d4c8/helix-core/src/test/java/org/apache/helix/controller/stages/TestIntermediateStateCalcStage.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/test/java/org/apache/helix/controller/stages/TestIntermediateStateCalcStage.java
 
b/helix-core/src/test/java/org/apache/helix/controller/stages/TestIntermediateStateCalcStage.java
index 816b374..6b8795f 100644
--- 
a/helix-core/src/test/java/org/apache/helix/controller/stages/TestIntermediateStateCalcStage.java
+++ 
b/helix-core/src/test/java/org/apache/helix/controller/stages/TestIntermediateStateCalcStage.java
@@ -29,8 +29,10 @@ import org.testng.annotations.Test;
 import java.util.*;
 
 public class TestIntermediateStateCalcStage extends BaseStageTest {
+  private ClusterConfig _clusterConfig;
 
-  @Test public void testNoStateMissing() {
+  @Test
+  public void testNoStateMissing() {
     String resourcePrefix = "resource";
     int nResource = 4;
     int nPartition = 2;
@@ -43,12 +45,12 @@ public class TestIntermediateStateCalcStage extends 
BaseStageTest {
 
     preSetup(StateTransitionThrottleConfig.RebalanceType.RECOVERY_BALANCE, 
resourceSet, nReplica,
         nReplica);
-    event.addAttribute(AttributeName.RESOURCES.name(),
-        getResourceMap(resourceSet.toArray(new String[resourceSet.size()]), 
nPartition,
-            "OnlineOffline"));
-    event.addAttribute(AttributeName.RESOURCES_TO_REBALANCE.name(),
-        getResourceMap(resourceSet.toArray(new String[resourceSet.size()]), 
nPartition,
-            "OnlineOffline"));
+    event.addAttribute(AttributeName.RESOURCES.name(), getResourceMap(
+        resourceSet.toArray(new String[resourceSet.size()]), nPartition, 
"OnlineOffline"));
+    event.addAttribute(AttributeName.RESOURCES_TO_REBALANCE.name(), 
getResourceMap(
+        resourceSet.toArray(new String[resourceSet.size()]), nPartition, 
"OnlineOffline"));
+
+
 
     // Initialize bestpossible state and current state
     BestPossibleStateOutput bestPossibleStateOutput = new 
BestPossibleStateOutput();
@@ -56,11 +58,14 @@ public class TestIntermediateStateCalcStage extends 
BaseStageTest {
 
     IntermediateStateOutput expectedResult = new IntermediateStateOutput();
 
+    _clusterConfig.setErrorOrRecoveryPartitionThresholdForLoadBalance(1);
+    setClusterConfig(_clusterConfig);
+
     for (String resource : resourceSet) {
       IdealState is = 
accessor.getProperty(accessor.keyBuilder().idealStates(resource));
       setSingleIdealState(is);
 
-      Map<String, List<String>> partitionMap = new HashMap<String, 
List<String>>();
+      Map<String, List<String>> partitionMap = new HashMap<>();
       for (int p = 0; p < nPartition; p++) {
         Partition partition = new Partition(resource + "_" + p);
         for (int r = 0; r < nReplica; r++) {
@@ -101,11 +106,50 @@ public class TestIntermediateStateCalcStage extends 
BaseStageTest {
               }
             } else {
               currentStateOutput.setCurrentState(resource, partition, 
instanceName, "ONLINE");
-              currentStateOutput
-                  .setCurrentState(resource, partition, instanceName + "-1", 
"OFFLINE");
+              currentStateOutput.setCurrentState(resource, partition, 
instanceName + "-1",
+                  "OFFLINE");
               // load balance is throttled, so keep all current states
               expectedResult.setState(resource, partition, instanceName, 
"ONLINE");
-              expectedResult.setState(resource, partition, instanceName + 
"-1", "OFFLINE");
+              // The following must be removed because now downward state 
transitions are allowed
+              // expectedResult.setState(resource, partition, instanceName + 
"-1", "OFFLINE");
+            }
+          } else if (resource.endsWith("4")) {
+            // Test that partitions with replicas to drop are dropping them 
when recovery is
+            // happening for other partitions
+            if (p == 0) {
+              // This partition requires recovery
+              currentStateOutput.setCurrentState(resource, partition, 
instanceName, "OFFLINE");
+              bestPossibleStateOutput.setState(resource, partition, 
instanceName, "ONLINE");
+              // After recovery, it should be back ONLINE
+              expectedResult.setState(resource, partition, instanceName, 
"ONLINE");
+            } else {
+              // Other partitions require dropping of replicas
+              currentStateOutput.setCurrentState(resource, partition, 
instanceName, "ONLINE");
+              currentStateOutput.setCurrentState(resource, partition, 
instanceName + "-1",
+                  "OFFLINE");
+              // BestPossibleState dictates that we only need one ONLINE 
replica
+              bestPossibleStateOutput.setState(resource, partition, 
instanceName, "ONLINE");
+              bestPossibleStateOutput.setState(resource, partition, 
instanceName + "-1", "DROPPED");
+              // So instanceName-1 will NOT be expected to show up in 
expectedResult
+              expectedResult.setState(resource, partition, instanceName, 
"ONLINE");
+              expectedResult.setState(resource, partition, instanceName + 
"-1", "DROPPED");
+            }
+          } else if (resource.endsWith("5")) {
+            // Test that load balance bringing up a new replica does NOT 
happen with a recovery
+            // partition
+            if (p == 0) {
+              // Set up a partition requiring recovery
+              currentStateOutput.setCurrentState(resource, partition, 
instanceName, "OFFLINE");
+              bestPossibleStateOutput.setState(resource, partition, 
instanceName, "ONLINE");
+              // After recovery, it should be back ONLINE
+              expectedResult.setState(resource, partition, instanceName, 
"ONLINE");
+            } else {
+              currentStateOutput.setCurrentState(resource, partition, 
instanceName, "ONLINE");
+              bestPossibleStateOutput.setState(resource, partition, 
instanceName, "ONLINE");
+              // Check that load balance (bringing up a new node) did not take 
place
+              bestPossibleStateOutput.setState(resource, partition, 
instanceName + "-1", "ONLINE");
+              expectedResult.setState(resource, partition, instanceName, 
"ONLINE");
+
             }
           }
         }
@@ -116,17 +160,92 @@ public class TestIntermediateStateCalcStage extends 
BaseStageTest {
     event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.name(), 
bestPossibleStateOutput);
     event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
     runStage(event, new ReadClusterDataStage());
+    runStage(event, new IntermediateStateCalcStage());
+
+    IntermediateStateOutput output = 
event.getAttribute(AttributeName.INTERMEDIATE_STATE.name());
+
+    for (String resource : resourceSet) {
+      // Note Assert.assertEquals won't work. If "actual" is an empty map, it 
won't compare
+      // anything.
+      Assert.assertTrue(output.getPartitionStateMap(resource).getStateMap()
+          
.equals(expectedResult.getPartitionStateMap(resource).getStateMap()));
+    }
+  }
+
+  @Test
+  public void testWithClusterConfigChange() {
+    String resourcePrefix = "resource";
+    int nResource = 1;
+    int nPartition = 2;
+    int nReplica = 3;
+
+    Set<String> resourceSet = new HashSet<>();
+    for (int i = 0; i < nResource; i++) {
+      resourceSet.add(resourcePrefix + "_" + i);
+    }
 
-    // Keep update the current state.
-    for (int i = 0; i < resourceSet.size(); i++) {
-      runStage(event, new IntermediateStateCalcStage());
+    preSetup(StateTransitionThrottleConfig.RebalanceType.RECOVERY_BALANCE, 
resourceSet,
+        nReplica, nReplica);
+    event.addAttribute(AttributeName.RESOURCES.name(), getResourceMap(
+        resourceSet.toArray(new String[resourceSet.size()]), nPartition, 
"OnlineOffline"));
+    event.addAttribute(AttributeName.RESOURCES_TO_REBALANCE.name(), 
getResourceMap(
+        resourceSet.toArray(new String[resourceSet.size()]), nPartition, 
"OnlineOffline"));
+
+    // Initialize best possible state and current state
+    BestPossibleStateOutput bestPossibleStateOutput = new 
BestPossibleStateOutput();
+    CurrentStateOutput currentStateOutput = new CurrentStateOutput();
+    IntermediateStateOutput expectedResult = new IntermediateStateOutput();
+
+    for (String resource : resourceSet) {
+      IdealState is = 
accessor.getProperty(accessor.keyBuilder().idealStates(resource));
+      setSingleIdealState(is);
+
+      Map<String, List<String>> partitionMap = new HashMap<>();
+      for (int p = 0; p < nPartition; p++) {
+        Partition partition = new Partition(resource + "_" + p);
+        for (int r = 0; r < nReplica; r++) {
+          String instanceName = HOSTNAME_PREFIX + r;
+          partitionMap.put(partition.getPartitionName(), 
Collections.singletonList(instanceName));
+          if (resource.endsWith("0")) {
+            // Test that when the threshold is set at a number greater than 
the number of error and
+            // recovery partitions, load balance DOES take place
+            
_clusterConfig.setErrorOrRecoveryPartitionThresholdForLoadBalance(Integer.MAX_VALUE);
+            setClusterConfig(_clusterConfig);
+            if (p == 0) {
+              // Set up a partition requiring recovery
+              currentStateOutput.setCurrentState(resource, partition, 
instanceName, "OFFLINE");
+              bestPossibleStateOutput.setState(resource, partition, 
instanceName, "ONLINE");
+              // After recovery, it should be back ONLINE
+              expectedResult.setState(resource, partition, instanceName, 
"ONLINE");
+            } else {
+              // Ensure we have at least one ONLINE replica so that this 
partition does not need
+              // recovery
+              currentStateOutput.setCurrentState(resource, partition, 
instanceName, "ONLINE");
+              bestPossibleStateOutput.setState(resource, partition, 
instanceName, "ONLINE");
+              expectedResult.setState(resource, partition, instanceName, 
"ONLINE");
+
+              // This partition to bring up a replica (load balance will 
happen)
+              bestPossibleStateOutput.setState(resource, partition, 
instanceName + "-1", "ONLINE");
+              expectedResult.setState(resource, partition, instanceName + 
"-1", "ONLINE");
+            }
+          }
+        }
+      }
+      bestPossibleStateOutput.setPreferenceLists(resource, partitionMap);
     }
 
+    event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.name(), 
bestPossibleStateOutput);
+    event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
+    runStage(event, new ReadClusterDataStage());
+    runStage(event, new IntermediateStateCalcStage());
+
     IntermediateStateOutput output = 
event.getAttribute(AttributeName.INTERMEDIATE_STATE.name());
+
     for (String resource : resourceSet) {
-      // Note Assert.assertEquals won't work. If "actual" is an empty map, it 
won't compare anything.
-      
Assert.assertTrue(output.getPartitionStateMap(resource).getStateMap().equals(
-          expectedResult.getPartitionStateMap(resource).getStateMap()));
+      // Note Assert.assertEquals won't work. If "actual" is an empty map, it 
won't compare
+      // anything.
+      Assert.assertTrue(output.getPartitionStateMap(resource).getStateMap()
+          
.equals(expectedResult.getPartitionStateMap(resource).getStateMap()));
     }
   }
 
@@ -138,10 +257,10 @@ public class TestIntermediateStateCalcStage extends 
BaseStageTest {
     setupLiveInstances(numOfLiveInstances);
 
     // Set up cluster configs
-    ClusterConfig clusterConfig = 
accessor.getProperty(accessor.keyBuilder().clusterConfig());
+    _clusterConfig = 
accessor.getProperty(accessor.keyBuilder().clusterConfig());
     StateTransitionThrottleConfig throttleConfig = new 
StateTransitionThrottleConfig(rebalanceType,
         StateTransitionThrottleConfig.ThrottleScope.CLUSTER, 
Integer.MAX_VALUE);
-    
clusterConfig.setStateTransitionThrottleConfigs(Collections.singletonList(throttleConfig));
-    setClusterConfig(clusterConfig);
+    
_clusterConfig.setStateTransitionThrottleConfigs(Collections.singletonList(throttleConfig));
+    setClusterConfig(_clusterConfig);
   }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/37f3d4c8/helix-core/src/test/java/org/apache/helix/controller/stages/TestRecoveryLoadBalance.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/test/java/org/apache/helix/controller/stages/TestRecoveryLoadBalance.java
 
b/helix-core/src/test/java/org/apache/helix/controller/stages/TestRecoveryLoadBalance.java
new file mode 100644
index 0000000..a95b424
--- /dev/null
+++ 
b/helix-core/src/test/java/org/apache/helix/controller/stages/TestRecoveryLoadBalance.java
@@ -0,0 +1,199 @@
+package org.apache.helix.controller.stages;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.helix.api.config.StateTransitionThrottleConfig;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.Partition;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.ObjectReader;
+import org.testng.Assert;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+public class TestRecoveryLoadBalance extends BaseStageTest {
+
+  private final String INPUT = "inputs"; // JSON key: per-partition state sections of one test case
+  private final String CURRENT_STATE = "currentStates"; // section key: instance -> current state
+  private final String BEST_POSSIBLE_STATE = "bestPossibleStates"; // section key: instance -> best possible state
+  private final String EXPECTED_STATE = "expectedStates"; // section key: instance -> expected intermediate state
+  private final String ERROR_OR_RECOVERY_PARTITION_THRESHOLD =
+      "errorOrRecoveryPartitionThresholdForLoadBalance"; // JSON key: threshold written into the cluster config
+  private final String STATE_MODEL = "statemodel"; // JSON key: state model definition name
+  private ClusterConfig _clusterConfig; // assigned in preSetup(); NOTE(review): the key constants above could be static
+
+  /**
+   * Runs ReadClusterDataStage and IntermediateStateCalcStage for one data-provider row and checks
+   * that the computed intermediate state of every resource equals the expected state map.
+   *
+   * @param stateModelDef name of the state model definition used for the ideal state and resources
+   * @param errorOrRecoveryPartitionThresholdForLoadBalance threshold written into the cluster config
+   * @param stateMapping partition name -> section name ("currentStates", "bestPossibleStates",
+   *          "expectedStates") -> instance name -> state
+   */
+  @Test(dataProvider = "recoveryLoadBalanceInput")
+  public void testRecoveryAndLoadBalance(String stateModelDef,
+      int errorOrRecoveryPartitionThresholdForLoadBalance,
+      Map<String, Map<String, Map<String, String>>> stateMapping) {
+    System.out.println(
+        "START TestRecoveryLoadBalance at " + new Date(System.currentTimeMillis()));
+
+    final String resourcePrefix = "resource";
+    final int nResource = 1;
+    final int nPartition = 2;
+    final int nReplica = 3;
+
+    Set<String> resourceSet = new HashSet<>();
+    for (int idx = 0; idx < nResource; idx++) {
+      resourceSet.add(resourcePrefix + "_" + idx);
+    }
+
+    // Live instances and replicas are both sized by nReplica.
+    preSetup(StateTransitionThrottleConfig.RebalanceType.RECOVERY_BALANCE, resourceSet,
+        nReplica, nReplica, stateModelDef);
+
+    // Apply the threshold under test on top of the config prepared by preSetup().
+    _clusterConfig.setErrorOrRecoveryPartitionThresholdForLoadBalance(
+        errorOrRecoveryPartitionThresholdForLoadBalance);
+    setClusterConfig(_clusterConfig);
+
+    event.addAttribute(AttributeName.RESOURCES.name(),
+        getResourceMap(resourceSet.toArray(new String[resourceSet.size()]), nPartition,
+            stateModelDef));
+    event.addAttribute(AttributeName.RESOURCES_TO_REBALANCE.name(),
+        getResourceMap(resourceSet.toArray(new String[resourceSet.size()]), nPartition,
+            stateModelDef));
+
+    // Stage inputs (best possible + current state) and the expected stage output.
+    BestPossibleStateOutput bestPossibleStateOutput = new BestPossibleStateOutput();
+    CurrentStateOutput currentStateOutput = new CurrentStateOutput();
+    IntermediateStateOutput expectedResult = new IntermediateStateOutput();
+
+    for (String resource : resourceSet) {
+      IdealState idealState = accessor.getProperty(accessor.keyBuilder().idealStates(resource));
+      setSingleIdealState(idealState);
+
+      Map<String, List<String>> preferenceLists = new HashMap<>();
+      for (int p = 0; p < nPartition; p++) {
+        Partition partition = new Partition(resource + "_" + p);
+
+        // Instances are addressed as HOSTNAME_PREFIX + index, for as many indices as the
+        // section has entries.
+        Map<String, String> currentStates =
+            stateMapping.get(partition.toString()).get(CURRENT_STATE);
+        for (int i = 0; i < currentStates.size(); i++) {
+          String instance = HOSTNAME_PREFIX + i;
+          currentStateOutput.setCurrentState(resource, partition, instance,
+              currentStates.get(instance));
+        }
+
+        Map<String, String> bestPossibleStates =
+            stateMapping.get(partition.toString()).get(BEST_POSSIBLE_STATE);
+        for (int i = 0; i < bestPossibleStates.size(); i++) {
+          String instance = HOSTNAME_PREFIX + i;
+          bestPossibleStateOutput.setState(resource, partition, instance,
+              bestPossibleStates.get(instance));
+        }
+
+        Map<String, String> expectedStates =
+            stateMapping.get(partition.toString()).get(EXPECTED_STATE);
+        for (int i = 0; i < expectedStates.size(); i++) {
+          String instance = HOSTNAME_PREFIX + i;
+          expectedResult.setState(resource, partition, instance, expectedStates.get(instance));
+        }
+
+        // NOTE(review): each iteration overwrites the same key, so the stored preference list
+        // ends up holding only the last instance. Kept as-is; confirm whether that is intended.
+        for (int i = 0; i < nReplica; i++) {
+          String instance = HOSTNAME_PREFIX + i;
+          preferenceLists.put(partition.getPartitionName(), Collections.singletonList(instance));
+        }
+      }
+      bestPossibleStateOutput.setPreferenceLists(resource, preferenceLists);
+    }
+
+    event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.name(), bestPossibleStateOutput);
+    event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
+    runStage(event, new ReadClusterDataStage());
+    runStage(event, new IntermediateStateCalcStage());
+
+    IntermediateStateOutput output = event.getAttribute(AttributeName.INTERMEDIATE_STATE.name());
+
+    for (String resource : resourceSet) {
+      // Note Assert.assertEquals won't work. If "actual" is an empty map, it won't compare
+      // anything.
+      boolean sameStates = output.getPartitionStateMap(resource).getStateMap()
+          .equals(expectedResult.getPartitionStateMap(resource).getStateMap());
+      Assert.assertTrue(sameStates);
+    }
+
+    System.out.println("END TestRecoveryLoadBalance at " + new Date(System.currentTimeMillis()));
+  }
+
+  /**
+   * Supplies the rows for testRecoveryAndLoadBalance: one row per test case found in the
+   * OnlineOffline and MasterSlave JSON resources, in that order. Each row is
+   * {state model name, threshold, state mapping}.
+   *
+   * Failures while loading are deliberately not caught here. Returning a single empty row
+   * instead would hide the real cause, and an empty row cannot match the three parameters of
+   * the test method anyway.
+   *
+   * @return the data-provider rows
+   */
+  @DataProvider(name = "recoveryLoadBalanceInput")
+  public Object[][] rebalanceStrategies() {
+    List<Object[]> data = new ArrayList<>();
+    data.addAll(loadTestInputs("TestRecoveryLoadBalance.OnlineOffline.json"));
+    data.addAll(loadTestInputs("TestRecoveryLoadBalance.MasterSlave.json"));
+    return data.toArray(new Object[data.size()][]);
+  }
+
+  /**
+   * Reads a JSON resource from the test classpath and converts every element of its top-level
+   * array into one data-provider row of {state model name, threshold, state mapping}.
+   *
+   * @param fileName classpath-relative name of the JSON resource
+   * @return one row per test case in the resource
+   * @throws IllegalStateException if the resource is missing or cannot be read, so that a broken
+   *           fixture fails loudly instead of silently producing zero test cases
+   */
+  public List<Object[]> loadTestInputs(String fileName) {
+    List<Object[]> ret = new ArrayList<>();
+    // try-with-resources: the classpath stream is closed even when parsing fails.
+    try (InputStream inputStream = getClass().getClassLoader().getResourceAsStream(fileName)) {
+      if (inputStream == null) {
+        throw new IllegalStateException("Test input resource not found on classpath: " + fileName);
+      }
+      ObjectReader mapReader = new ObjectMapper().reader(List.class);
+      List<Map<String, Object>> inputList = mapReader.readValue(inputStream);
+      for (Map<String, Object> inputMap : inputList) {
+        String stateModelName = (String) inputMap.get(STATE_MODEL);
+        // Go through Number so the value does not depend on the parser's exact boxed type.
+        int threshold = ((Number) inputMap.get(ERROR_OR_RECOVERY_PARTITION_THRESHOLD)).intValue();
+        // The JSON layout is partition -> section -> instance -> state; the parser only knows Map.
+        @SuppressWarnings("unchecked")
+        Map<String, Map<String, Map<String, String>>> stateMapping =
+            (Map<String, Map<String, Map<String, String>>>) inputMap.get(INPUT);
+        ret.add(new Object[] {
+            stateModelName, threshold, stateMapping
+        });
+      }
+    } catch (IOException e) {
+      throw new IllegalStateException("Failed to read test input resource: " + fileName, e);
+    }
+    return ret;
+  }
+
+  /**
+   * Prepares ideal states, the state model, live instances and the cluster config before a test
+   * case runs. The resulting config is kept in _clusterConfig and also handed to setClusterConfig.
+   *
+   * @param rebalanceType rebalance type the cluster-scope throttle config is created for
+   * @param resourceSet names of the resources to set up ideal states for
+   * @param numOfLiveInstances number of live instances to set up
+   * @param numOfReplicas replica count passed to setupIdealState
+   * @param stateModelName state model name passed to setupIdealState
+   */
+  private void preSetup(StateTransitionThrottleConfig.RebalanceType rebalanceType,
+      Set<String> resourceSet, int numOfLiveInstances, int numOfReplicas, String stateModelName) {
+    String[] resourceNames = resourceSet.toArray(new String[resourceSet.size()]);
+    setupIdealState(numOfLiveInstances, resourceNames, numOfLiveInstances, numOfReplicas,
+        IdealState.RebalanceMode.FULL_AUTO, stateModelName);
+    setupStateModel();
+    setupLiveInstances(numOfLiveInstances);
+
+    // Cluster-scope throttle with Integer.MAX_VALUE as its limit, installed as the only
+    // throttle config.
+    _clusterConfig = accessor.getProperty(accessor.keyBuilder().clusterConfig());
+    _clusterConfig.setStateTransitionThrottleConfigs(
+        Collections.singletonList(new StateTransitionThrottleConfig(rebalanceType,
+            StateTransitionThrottleConfig.ThrottleScope.CLUSTER, Integer.MAX_VALUE)));
+    setClusterConfig(_clusterConfig);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/helix/blob/37f3d4c8/helix-core/src/test/resources/TestRecoveryLoadBalance.MasterSlave.json
----------------------------------------------------------------------
diff --git 
a/helix-core/src/test/resources/TestRecoveryLoadBalance.MasterSlave.json 
b/helix-core/src/test/resources/TestRecoveryLoadBalance.MasterSlave.json
new file mode 100644
index 0000000..0e79723
--- /dev/null
+++ b/helix-core/src/test/resources/TestRecoveryLoadBalance.MasterSlave.json
@@ -0,0 +1,257 @@
+[
+  {
+    "statemodel": "MasterSlave",
+    "errorOrRecoveryPartitionThresholdForLoadBalance": 1,
+    "inputs": {
+      "resource_0_0": {
+        "currentStates": {
+          "localhost_0": "MASTER",
+          "localhost_1": "SLAVE",
+          "localhost_2": "SLAVE",
+          "localhost_3": "SLAVE"
+        },
+        "bestPossibleStates": {
+          "localhost_0": "MASTER",
+          "localhost_1": "SLAVE",
+          "localhost_2": "SLAVE",
+          "localhost_3": "DROPPED"
+        },
+        "expectedStates": {
+          "localhost_0": "MASTER",
+          "localhost_1": "SLAVE",
+          "localhost_2": "SLAVE",
+          "localhost_3": "DROPPED"
+        }
+      },
+      "resource_0_1": {
+        "currentStates": {
+          "localhost_0": "MASTER",
+          "localhost_1": "SLAVE",
+          "localhost_2": "SLAVE"
+        },
+        "bestPossibleStates": {
+          "localhost_0": "MASTER",
+          "localhost_1": "SLAVE",
+          "localhost_2": "SLAVE"
+        },
+        "expectedStates": {
+          "localhost_0": "MASTER",
+          "localhost_1": "SLAVE",
+          "localhost_2": "SLAVE"
+        }
+      }
+    }
+  },
+  {
+    "statemodel": "MasterSlave",
+    "errorOrRecoveryPartitionThresholdForLoadBalance": 2,
+    "inputs": {
+      "resource_0_0": {
+        "currentStates": {
+          "localhost_0": "MASTER",
+          "localhost_1": "ERROR",
+          "localhost_2": "SLAVE",
+          "localhost_3": "SLAVE"
+        },
+        "bestPossibleStates": {
+          "localhost_0": "MASTER",
+          "localhost_1": "ERROR",
+          "localhost_2": "SLAVE",
+          "localhost_3": "DROPPED"
+        },
+        "expectedStates": {
+          "localhost_0": "MASTER",
+          "localhost_1": "ERROR",
+          "localhost_2": "SLAVE",
+          "localhost_3": "DROPPED"
+        }
+      },
+      "resource_0_1": {
+        "currentStates": {
+          "localhost_0": "MASTER",
+          "localhost_1": "SLAVE",
+          "localhost_2": "SLAVE"
+        },
+        "bestPossibleStates": {
+          "localhost_0": "MASTER",
+          "localhost_1": "SLAVE",
+          "localhost_2": "SLAVE"
+        },
+        "expectedStates": {
+          "localhost_0": "MASTER",
+          "localhost_1": "SLAVE",
+          "localhost_2": "SLAVE"
+        }
+      }
+    }
+  },
+  {
+    "statemodel": "MasterSlave",
+    "errorOrRecoveryPartitionThresholdForLoadBalance": 1,
+    "inputs": {
+      "resource_0_0": {
+        "currentStates": {
+          "localhost_0": "MASTER",
+          "localhost_1": "ERROR",
+          "localhost_2": "SLAVE",
+          "localhost_3": "SLAVE"
+        },
+        "bestPossibleStates": {
+          "localhost_0": "MASTER",
+          "localhost_1": "ERROR",
+          "localhost_2": "SLAVE",
+          "localhost_3": "DROPPED"
+        },
+        "expectedStates": {
+          "localhost_0": "MASTER",
+          "localhost_1": "ERROR",
+          "localhost_2": "SLAVE",
+          "localhost_3": "DROPPED"
+        }
+      },
+      "resource_0_1": {
+        "currentStates": {
+          "localhost_0": "MASTER",
+          "localhost_1": "SLAVE",
+          "localhost_2": "SLAVE"
+        },
+        "bestPossibleStates": {
+          "localhost_0": "MASTER",
+          "localhost_1": "SLAVE",
+          "localhost_2": "SLAVE"
+        },
+        "expectedStates": {
+          "localhost_0": "MASTER",
+          "localhost_1": "SLAVE",
+          "localhost_2": "SLAVE"
+        }
+      }
+    }
+  },
+  {
+    "statemodel": "MasterSlave",
+    "errorOrRecoveryPartitionThresholdForLoadBalance": 1,
+    "inputs": {
+      "resource_0_0": {
+        "currentStates": {
+          "localhost_0": "MASTER",
+          "localhost_1": "OFFLINE",
+          "localhost_2": "OFFLINE",
+          "localhost_3": "SLAVE"
+        },
+        "bestPossibleStates": {
+          "localhost_0": "MASTER",
+          "localhost_1": "SLAVE",
+          "localhost_2": "SLAVE",
+          "localhost_3": "DROPPED"
+        },
+        "expectedStates": {
+          "localhost_0": "MASTER",
+          "localhost_1": "SLAVE",
+          "localhost_2": "SLAVE",
+          "localhost_3": "DROPPED"
+        }
+      },
+      "resource_0_1": {
+        "currentStates": {
+          "localhost_0": "MASTER",
+          "localhost_1": "SLAVE",
+          "localhost_2": "SLAVE"
+        },
+        "bestPossibleStates": {
+          "localhost_0": "MASTER",
+          "localhost_1": "SLAVE",
+          "localhost_2": "SLAVE"
+        },
+        "expectedStates": {
+          "localhost_0": "MASTER",
+          "localhost_1": "SLAVE",
+          "localhost_2": "SLAVE"
+        }
+      }
+    }
+  },
+  {
+    "statemodel": "MasterSlave",
+    "errorOrRecoveryPartitionThresholdForLoadBalance": 1,
+    "inputs": {
+      "resource_0_0": {
+        "currentStates": {
+          "localhost_0": "SLAVE",
+          "localhost_1": "SLAVE",
+          "localhost_2": "SLAVE"
+        },
+        "bestPossibleStates": {
+          "localhost_0": "MASTER",
+          "localhost_1": "SLAVE",
+          "localhost_2": "SLAVE"
+        },
+        "expectedStates": {
+          "localhost_0": "MASTER",
+          "localhost_1": "SLAVE",
+          "localhost_2": "SLAVE"
+        }
+      },
+      "resource_0_1": {
+        "currentStates": {
+          "localhost_0": "MASTER",
+          "localhost_1": "SLAVE",
+          "localhost_2": "SLAVE"
+        },
+        "bestPossibleStates": {
+          "localhost_0": "MASTER",
+          "localhost_1": "SLAVE",
+          "localhost_2": "SLAVE"
+        },
+        "expectedStates": {
+          "localhost_0": "MASTER",
+          "localhost_1": "SLAVE",
+          "localhost_2": "SLAVE"
+        }
+      }
+    }
+  },
+  {
+    "statemodel": "MasterSlave",
+    "errorOrRecoveryPartitionThresholdForLoadBalance": 1,
+    "inputs": {
+      "resource_0_0": {
+        "currentStates": {
+          "localhost_0": "SLAVE",
+          "localhost_1": "SLAVE",
+          "localhost_2": "SLAVE"
+        },
+        "bestPossibleStates": {
+          "localhost_0": "MASTER",
+          "localhost_1": "SLAVE",
+          "localhost_2": "SLAVE"
+        },
+        "expectedStates": {
+          "localhost_0": "MASTER",
+          "localhost_1": "SLAVE",
+          "localhost_2": "SLAVE"
+        }
+      },
+      "resource_0_1": {
+        "currentStates": {
+          "localhost_0": "MASTER",
+          "localhost_1": "SLAVE",
+          "localhost_2": "SLAVE",
+          "localhost_3": "OFFLINE"
+        },
+        "bestPossibleStates": {
+          "localhost_0": "MASTER",
+          "localhost_1": "SLAVE",
+          "localhost_2": "SLAVE",
+          "localhost_3": "SLAVE"
+        },
+        "expectedStates": {
+          "localhost_0": "MASTER",
+          "localhost_1": "SLAVE",
+          "localhost_2": "SLAVE",
+          "localhost_3": "OFFLINE"
+        }
+      }
+    }
+  }
+]
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/helix/blob/37f3d4c8/helix-core/src/test/resources/TestRecoveryLoadBalance.OnlineOffline.json
----------------------------------------------------------------------
diff --git 
a/helix-core/src/test/resources/TestRecoveryLoadBalance.OnlineOffline.json 
b/helix-core/src/test/resources/TestRecoveryLoadBalance.OnlineOffline.json
new file mode 100644
index 0000000..d4677be
--- /dev/null
+++ b/helix-core/src/test/resources/TestRecoveryLoadBalance.OnlineOffline.json
@@ -0,0 +1,206 @@
+[
+  {
+    "statemodel": "OnlineOffline",
+    "errorOrRecoveryPartitionThresholdForLoadBalance": 1,
+    "inputs": {
+      "resource_0_0": {
+        "currentStates": {
+          "localhost_0": "OFFLINE",
+          "localhost_1": "OFFLINE",
+          "localhost_2": "OFFLINE"
+        },
+        "bestPossibleStates": {
+          "localhost_0": "ONLINE",
+          "localhost_1": "ONLINE",
+          "localhost_2": "ONLINE"
+        },
+        "expectedStates": {
+          "localhost_0": "ONLINE",
+          "localhost_1": "ONLINE",
+          "localhost_2": "ONLINE"
+        }
+      },
+      "resource_0_1": {
+        "currentStates": {
+          "localhost_0": "ONLINE",
+          "localhost_1": "ONLINE",
+          "localhost_2": "ONLINE"
+        },
+        "bestPossibleStates": {
+          "localhost_0": "ONLINE",
+          "localhost_1": "ONLINE",
+          "localhost_2": "ONLINE",
+          "localhost_3": "ONLINE"
+        },
+        "expectedStates": {
+          "localhost_0": "ONLINE",
+          "localhost_1": "ONLINE",
+          "localhost_2": "ONLINE"
+        }
+      }
+    }
+  },
+  {
+    "statemodel": "OnlineOffline",
+    "errorOrRecoveryPartitionThresholdForLoadBalance": 1,
+    "inputs": {
+      "resource_0_0": {
+        "currentStates": {
+          "localhost_0": "OFFLINE",
+          "localhost_1": "OFFLINE",
+          "localhost_2": "OFFLINE"
+        },
+        "bestPossibleStates": {
+          "localhost_0": "ONLINE",
+          "localhost_1": "ONLINE",
+          "localhost_2": "ONLINE"
+        },
+        "expectedStates": {
+          "localhost_0": "ONLINE",
+          "localhost_1": "ONLINE",
+          "localhost_2": "ONLINE"
+        }
+      },
+      "resource_0_1": {
+        "currentStates": {
+          "localhost_0": "ONLINE",
+          "localhost_1": "ONLINE",
+          "localhost_2": "ONLINE",
+          "localhost_3": "ONLINE"
+        },
+        "bestPossibleStates": {
+          "localhost_0": "ONLINE",
+          "localhost_1": "ONLINE",
+          "localhost_2": "ONLINE",
+          "localhost_3": "DROPPED"
+        },
+        "expectedStates": {
+          "localhost_0": "ONLINE",
+          "localhost_1": "ONLINE",
+          "localhost_2": "ONLINE",
+          "localhost_3": "DROPPED"
+        }
+      }
+    }
+  },
+  {
+    "statemodel": "OnlineOffline",
+    "errorOrRecoveryPartitionThresholdForLoadBalance": 100,
+    "inputs": {
+      "resource_0_0": {
+        "currentStates": {
+          "localhost_0": "OFFLINE",
+          "localhost_1": "OFFLINE",
+          "localhost_2": "OFFLINE"
+        },
+        "bestPossibleStates": {
+          "localhost_0": "ONLINE",
+          "localhost_1": "ONLINE",
+          "localhost_2": "ONLINE"
+        },
+        "expectedStates": {
+          "localhost_0": "ONLINE",
+          "localhost_1": "ONLINE",
+          "localhost_2": "ONLINE"
+        }
+      },
+      "resource_0_1": {
+        "currentStates": {
+          "localhost_0": "ONLINE",
+          "localhost_1": "ONLINE",
+          "localhost_2": "ONLINE"
+        },
+        "bestPossibleStates": {
+          "localhost_0": "ONLINE",
+          "localhost_1": "ONLINE",
+          "localhost_2": "ONLINE",
+          "localhost_3": "ONLINE"
+        },
+        "expectedStates": {
+          "localhost_0": "ONLINE",
+          "localhost_1": "ONLINE",
+          "localhost_2": "ONLINE",
+          "localhost_3": "ONLINE"
+        }
+      }
+    }
+  },
+  {
+    "statemodel": "OnlineOffline",
+    "errorOrRecoveryPartitionThresholdForLoadBalance": 1,
+    "inputs": {
+      "resource_0_0": {
+        "currentStates": {
+          "localhost_0": "OFFLINE",
+          "localhost_1": "OFFLINE",
+          "localhost_2": "OFFLINE"
+        },
+        "bestPossibleStates": {
+          "localhost_0": "ONLINE",
+          "localhost_1": "ONLINE",
+          "localhost_2": "ONLINE"
+        },
+        "expectedStates": {
+          "localhost_0": "ONLINE",
+          "localhost_1": "ONLINE",
+          "localhost_2": "ONLINE"
+        }
+      },
+      "resource_0_1": {
+        "currentStates": {
+          "localhost_0": "ONLINE",
+          "localhost_1": "OFFLINE",
+          "localhost_2": "ONLINE"
+        },
+        "bestPossibleStates": {
+          "localhost_0": "DROPPED",
+          "localhost_1": "ONLINE",
+          "localhost_2": "ONLINE"
+        },
+        "expectedStates": {
+          "localhost_0": "ONLINE",
+          "localhost_1": "OFFLINE",
+          "localhost_2": "ONLINE"
+        }
+      }
+    }
+  },
+  {
+    "statemodel": "OnlineOffline",
+    "errorOrRecoveryPartitionThresholdForLoadBalance": 1,
+    "inputs": {
+      "resource_0_0": {
+        "currentStates": {
+          "localhost_0": "ONLINE",
+          "localhost_1": "ONLINE"
+        },
+        "bestPossibleStates": {
+          "localhost_0": "DROPPED",
+          "localhost_1": "ONLINE",
+          "localhost_2": "OFFLINE"
+        },
+        "expectedStates": {
+          "localhost_0": "ONLINE",
+          "localhost_1": "ONLINE"
+        }
+      },
+      "resource_0_1": {
+        "currentStates": {
+          "localhost_0": "OFFLINE",
+          "localhost_1": "OFFLINE",
+          "localhost_2": "OFFLINE"
+        },
+        "bestPossibleStates": {
+          "localhost_0": "ONLINE",
+          "localhost_1": "ONLINE",
+          "localhost_2": "ONLINE"
+        },
+        "expectedStates": {
+          "localhost_0": "ONLINE",
+          "localhost_1": "ONLINE",
+          "localhost_2": "ONLINE"
+        }
+      }
+    }
+  }
+]
\ No newline at end of file

Reply via email to