This is an automated email from the ASF dual-hosted git repository.

jiajunwang pushed a commit to branch wagedRebalancer
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/wagedRebalancer by this push:
     new 935360e  Integrate the WAGED rebalancer with all the related 
components. (#466)
935360e is described below

commit 935360e7f673ce50ad3dfa58cb9828186d2cc17c
Author: Jiajun Wang <[email protected]>
AuthorDate: Wed Sep 18 13:42:43 2019 -0700

    Integrate the WAGED rebalancer with all the related components. (#466)
    
    1. Integrate with the algorithm, assignment metadata store, etc. Fix 
several conflicting interfaces and logic so that the whole rebalancer runs 
correctly.
    2. Complete OptimalAssignment.
    3. Add integration tests to ensure the correctness of rebalancing logic.
---
 .../org/apache/helix/HelixRebalanceException.java  |   3 +
 .../rebalancer/waged/AssignmentMetadataStore.java  |  54 ++-
 .../rebalancer/waged/WagedRebalancer.java          | 248 ++++++++---
 .../constraints/ConstraintBasedAlgorithm.java      |  92 +++-
 .../NodeMaxPartitionLimitConstraint.java           |   9 +-
 .../rebalancer/waged/model/AssignableNode.java     |  38 +-
 .../rebalancer/waged/model/AssignableReplica.java  |  12 +-
 .../waged/model/ClusterModelProvider.java          |  30 ++
 .../rebalancer/waged/model/OptimalAssignment.java  |  52 ++-
 .../stages/BestPossibleStateCalcStage.java         | 154 ++++---
 .../helix/manager/zk/ZkBucketDataAccessor.java     |   3 +-
 .../java/org/apache/helix/common/ZkTestBase.java   |  18 +-
 .../waged/MockAssignmentMetadataStore.java         |   9 +-
 .../waged/TestAssignmentMetadataStore.java         |  20 +-
 .../rebalancer/waged/TestWagedRebalancer.java      |  33 +-
 .../waged/constraints/MockRebalanceAlgorithm.java  |   2 +-
 .../waged/model/AbstractTestClusterModel.java      |   2 +-
 .../waged/model/TestOptimalAssignment.java         |  91 ++++
 .../WagedRebalancer/TestWagedRebalance.java        | 477 +++++++++++++++++++++
 .../TestWagedRebalanceFaultZone.java               | 374 ++++++++++++++++
 .../TestWagedRebalanceTopologyAware.java           | 114 +++++
 21 files changed, 1605 insertions(+), 230 deletions(-)

diff --git 
a/helix-core/src/main/java/org/apache/helix/HelixRebalanceException.java 
b/helix-core/src/main/java/org/apache/helix/HelixRebalanceException.java
index a8b5055..d54853f 100644
--- a/helix-core/src/main/java/org/apache/helix/HelixRebalanceException.java
+++ b/helix-core/src/main/java/org/apache/helix/HelixRebalanceException.java
@@ -23,10 +23,13 @@ package org.apache.helix;
  * Exception thrown by Helix due to rebalance failures.
  */
 public class HelixRebalanceException extends Exception {
+  // TODO: Adding static description or other necessary fields into the enum 
instances for
+  // TODO: supporting the rebalance monitor to understand the exception.
   public enum Type {
     INVALID_CLUSTER_STATUS,
     INVALID_REBALANCER_STATUS,
     FAILED_TO_CALCULATE,
+    INVALID_INPUT,
     UNKNOWN_FAILURE
   }
 
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentMetadataStore.java
 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentMetadataStore.java
index fd655d1..a540ffb 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentMetadataStore.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentMetadataStore.java
@@ -20,13 +20,15 @@ package org.apache.helix.controller.rebalancer.waged;
  */
 
 import java.io.IOException;
-import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.I0Itec.zkclient.exception.ZkNoNodeException;
 import org.I0Itec.zkclient.serialize.ZkSerializer;
 import org.apache.helix.BucketDataAccessor;
 import org.apache.helix.HelixException;
-import org.apache.helix.HelixManager;
 import org.apache.helix.HelixProperty;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.manager.zk.ZNRecordJacksonSerializer;
@@ -50,23 +52,27 @@ public class AssignmentMetadataStore {
   private Map<String, ResourceAssignment> _globalBaseline;
   private Map<String, ResourceAssignment> _bestPossibleAssignment;
 
+  AssignmentMetadataStore(String metadataStoreAddrs, String clusterName) {
+    this(new ZkBucketDataAccessor(metadataStoreAddrs), clusterName);
+  }
+
   AssignmentMetadataStore(BucketDataAccessor bucketDataAccessor, String 
clusterName) {
     _dataAccessor = bucketDataAccessor;
     _baselinePath = String.format(BASELINE_TEMPLATE, clusterName, 
ASSIGNMENT_METADATA_KEY);
     _bestPossiblePath = String.format(BEST_POSSIBLE_TEMPLATE, clusterName, 
ASSIGNMENT_METADATA_KEY);
   }
 
-  AssignmentMetadataStore(HelixManager helixManager) {
-    this(new 
ZkBucketDataAccessor(helixManager.getMetadataStoreConnectionString()),
-        helixManager.getClusterName());
-  }
-
   public Map<String, ResourceAssignment> getBaseline() {
     // Return the in-memory baseline. If null, read from ZK. This is to 
minimize reads from ZK
     if (_globalBaseline == null) {
-      HelixProperty baseline =
-          _dataAccessor.compressedBucketRead(_baselinePath, 
HelixProperty.class);
-      _globalBaseline = splitAssignments(baseline);
+      try {
+        HelixProperty baseline =
+            _dataAccessor.compressedBucketRead(_baselinePath, 
HelixProperty.class);
+        _globalBaseline = splitAssignments(baseline);
+      } catch (ZkNoNodeException ex) {
+        // Metadata does not exist, so return an empty map
+        _globalBaseline = Collections.emptyMap();
+      }
     }
     return _globalBaseline;
   }
@@ -74,9 +80,14 @@ public class AssignmentMetadataStore {
   public Map<String, ResourceAssignment> getBestPossibleAssignment() {
     // Return the in-memory baseline. If null, read from ZK. This is to 
minimize reads from ZK
     if (_bestPossibleAssignment == null) {
-      HelixProperty baseline =
-          _dataAccessor.compressedBucketRead(_bestPossiblePath, 
HelixProperty.class);
-      _bestPossibleAssignment = splitAssignments(baseline);
+      try {
+        HelixProperty baseline =
+            _dataAccessor.compressedBucketRead(_bestPossiblePath, 
HelixProperty.class);
+        _bestPossibleAssignment = splitAssignments(baseline);
+      } catch (ZkNoNodeException ex) {
+        // Metadata does not exist, so return an empty map
+        _bestPossibleAssignment = Collections.emptyMap();
+      }
     }
     return _bestPossibleAssignment;
   }
@@ -113,6 +124,16 @@ public class AssignmentMetadataStore {
     _bestPossibleAssignment = bestPossibleAssignment;
   }
 
+  protected void finalize() {
+    // To ensure all resources are released.
+    close();
+  }
+
+  // Close to release all the resources.
+  public void close() {
+    _dataAccessor.disconnect();
+  }
+
   /**
    * Produces one HelixProperty that contains all assignment data.
    * @param name
@@ -123,8 +144,9 @@ public class AssignmentMetadataStore {
       Map<String, ResourceAssignment> assignmentMap) {
     HelixProperty property = new HelixProperty(name);
     // Add each resource's assignment as a simple field in one ZNRecord
+    // Node that don't use Arrays.toString() for the record converting. The 
deserialize will fail.
     assignmentMap.forEach((resource, assignment) -> 
property.getRecord().setSimpleField(resource,
-        Arrays.toString(SERIALIZER.serialize(assignment.getRecord()))));
+        new String(SERIALIZER.serialize(assignment.getRecord()))));
     return property;
   }
 
@@ -138,8 +160,8 @@ public class AssignmentMetadataStore {
     // Convert each resource's assignment String into a ResourceAssignment 
object and put it in a
     // map
     property.getRecord().getSimpleFields()
-        .forEach((resource, assignment) -> assignmentMap.put(resource,
-            new ResourceAssignment((ZNRecord) 
SERIALIZER.deserialize(assignment.getBytes()))));
+        .forEach((resource, assignmentStr) -> assignmentMap.put(resource,
+            new ResourceAssignment((ZNRecord) 
SERIALIZER.deserialize(assignmentStr.getBytes()))));
     return assignmentMap;
   }
 }
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
index 551239d..1861e10 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
@@ -29,6 +29,7 @@ import java.util.Set;
 import java.util.stream.Collectors;
 
 import org.apache.helix.HelixConstants;
+import org.apache.helix.HelixException;
 import org.apache.helix.HelixManager;
 import org.apache.helix.HelixRebalanceException;
 import org.apache.helix.controller.changedetector.ResourceChangeDetector;
@@ -64,27 +65,34 @@ public class WagedRebalancer {
   // When any of the following change happens, the rebalancer needs to do a 
global rebalance which
   // contains 1. baseline recalculate, 2. partial rebalance that is based on 
the new baseline.
   private static final Set<HelixConstants.ChangeType> 
GLOBAL_REBALANCE_REQUIRED_CHANGE_TYPES =
-      ImmutableSet.of(HelixConstants.ChangeType.RESOURCE_CONFIG,
-          HelixConstants.ChangeType.CLUSTER_CONFIG, 
HelixConstants.ChangeType.INSTANCE_CONFIG);
+      ImmutableSet.of(
+          HelixConstants.ChangeType.RESOURCE_CONFIG,
+          HelixConstants.ChangeType.CLUSTER_CONFIG,
+          HelixConstants.ChangeType.INSTANCE_CONFIG);
   // The cluster change detector is a stateful object.
   // Make it static to avoid unnecessary reinitialization.
   private static final ThreadLocal<ResourceChangeDetector> 
CHANGE_DETECTOR_THREAD_LOCAL =
       new ThreadLocal<>();
   private final MappingCalculator<ResourceControllerDataProvider> 
_mappingCalculator;
-
-  // --------- The following fields are placeholders and need replacement. 
-----------//
-  // TODO Shall we make the metadata store a static threadlocal object as well 
to avoid
-  // reinitialization?
   private final AssignmentMetadataStore _assignmentMetadataStore;
   private final RebalanceAlgorithm _rebalanceAlgorithm;
-  // 
------------------------------------------------------------------------------------//
+
+  private static AssignmentMetadataStore constructAssignmentStore(HelixManager 
helixManager) {
+    AssignmentMetadataStore assignmentMetadataStore = null;
+    if (helixManager != null) {
+      String metadataStoreAddrs = 
helixManager.getMetadataStoreConnectionString();
+      String clusterName = helixManager.getClusterName();
+      if (metadataStoreAddrs != null && clusterName != null) {
+        assignmentMetadataStore = new 
AssignmentMetadataStore(metadataStoreAddrs, clusterName);
+      }
+    }
+    return assignmentMetadataStore;
+  }
 
   public WagedRebalancer(HelixManager helixManager,
       Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> preferences) {
-    this(
-        // TODO init the metadata store according to their requirement when 
integrate,
-        // or change to final static method if possible.
-        new AssignmentMetadataStore(helixManager), 
ConstraintBasedAlgorithmFactory.getInstance(preferences),
+    this(constructAssignmentStore(helixManager),
+        ConstraintBasedAlgorithmFactory.getInstance(preferences),
         // Use DelayedAutoRebalancer as the mapping calculator for the final 
assignment output.
         // Mapping calculator will translate the best possible assignment into 
the applicable state
         // mapping based on the current states.
@@ -94,6 +102,10 @@ public class WagedRebalancer {
 
   private WagedRebalancer(AssignmentMetadataStore assignmentMetadataStore,
       RebalanceAlgorithm algorithm, MappingCalculator mappingCalculator) {
+    if (assignmentMetadataStore == null) {
+      LOG.warn("Assignment Metadata Store is not configured properly."
+          + " The rebalancer will not access the assignment store during the 
rebalance.");
+    }
     _assignmentMetadataStore = assignmentMetadataStore;
     _rebalanceAlgorithm = algorithm;
     _mappingCalculator = mappingCalculator;
@@ -103,7 +115,13 @@ public class WagedRebalancer {
   protected WagedRebalancer(AssignmentMetadataStore assignmentMetadataStore,
       RebalanceAlgorithm algorithm) {
     this(assignmentMetadataStore, algorithm, new DelayedAutoRebalancer());
+  }
 
+  // Release all the resources.
+  public void close() {
+    if (_assignmentMetadataStore != null) {
+      _assignmentMetadataStore.close();
+    }
   }
 
   /**
@@ -117,27 +135,18 @@ public class WagedRebalancer {
   public Map<String, IdealState> 
computeNewIdealStates(ResourceControllerDataProvider clusterData,
       Map<String, Resource> resourceMap, final CurrentStateOutput 
currentStateOutput)
       throws HelixRebalanceException {
-    LOG.info("Start computing new ideal states for resources: {}", 
resourceMap.keySet().toString());
-
-    // Find the compatible resources: 1. FULL_AUTO 2. Configured to use the 
WAGED rebalancer
-    resourceMap = resourceMap.entrySet().stream().filter(resourceEntry -> {
-      IdealState is = clusterData.getIdealState(resourceEntry.getKey());
-      return is != null && 
is.getRebalanceMode().equals(IdealState.RebalanceMode.FULL_AUTO)
-          && getClass().getName().equals(is.getRebalancerClassName());
-    }).collect(Collectors.toMap(resourceEntry -> resourceEntry.getKey(),
-        resourceEntry -> resourceEntry.getValue()));
-
     if (resourceMap.isEmpty()) {
-      LOG.warn("There is no valid resource to be rebalanced by {}",
+      LOG.warn("There is no resource to be rebalanced by {}",
           this.getClass().getSimpleName());
       return Collections.emptyMap();
-    } else {
-      LOG.info("Valid resources that will be rebalanced by {}: {}", 
this.getClass().getSimpleName(),
-          resourceMap.keySet().toString());
     }
 
+    LOG.info("Start computing new ideal states for resources: {}", 
resourceMap.keySet().toString());
+    validateInput(clusterData, resourceMap);
+
     // Calculate the target assignment based on the current cluster status.
-    Map<String, IdealState> newIdealStates = 
computeBestPossibleStates(clusterData, resourceMap);
+    Map<String, IdealState> newIdealStates =
+        computeBestPossibleStates(clusterData, resourceMap, 
currentStateOutput);
 
     // Construct the new best possible states according to the current state 
and target assignment.
     // Note that the new ideal state might be an intermediate state between 
the current state and
@@ -166,28 +175,29 @@ public class WagedRebalancer {
 
   // Coordinate baseline recalculation and partial rebalance according to the 
cluster changes.
   private Map<String, IdealState> computeBestPossibleStates(
-      ResourceControllerDataProvider clusterData, Map<String, Resource> 
resourceMap)
-      throws HelixRebalanceException {
+      ResourceControllerDataProvider clusterData, Map<String, Resource> 
resourceMap,
+      final CurrentStateOutput currentStateOutput) throws 
HelixRebalanceException {
     getChangeDetector().updateSnapshots(clusterData);
-    // Get all the modified and new items' information
+    // Get all the changed items' information
     Map<HelixConstants.ChangeType, Set<String>> clusterChanges =
         getChangeDetector().getChangeTypes().stream()
             .collect(Collectors.toMap(changeType -> changeType, changeType -> {
               Set<String> itemKeys = new HashSet<>();
               
itemKeys.addAll(getChangeDetector().getAdditionsByType(changeType));
               
itemKeys.addAll(getChangeDetector().getChangesByType(changeType));
+              
itemKeys.addAll(getChangeDetector().getRemovalsByType(changeType));
               return itemKeys;
             }));
 
     if (clusterChanges.keySet().stream()
         .anyMatch(changeType -> 
GLOBAL_REBALANCE_REQUIRED_CHANGE_TYPES.contains(changeType))) {
-      refreshBaseline(clusterData, clusterChanges, resourceMap);
+      refreshBaseline(clusterData, clusterChanges, resourceMap, 
currentStateOutput);
       // Inject a cluster config change for large scale partial rebalance once 
the baseline changed.
       clusterChanges.putIfAbsent(HelixConstants.ChangeType.CLUSTER_CONFIG, 
Collections.emptySet());
     }
 
     Map<String, ResourceAssignment> newAssignment =
-        partialRebalance(clusterData, clusterChanges, resourceMap);
+        partialRebalance(clusterData, clusterChanges, resourceMap, 
currentStateOutput);
 
     // Convert the assignments into IdealState for the following state mapping 
calculation.
     Map<String, IdealState> finalIdealState = new HashMap<>();
@@ -213,56 +223,60 @@ public class WagedRebalancer {
 
   // TODO make the Baseline calculation async if complicated algorithm is used 
for the Baseline
   private void refreshBaseline(ResourceControllerDataProvider clusterData,
-      Map<HelixConstants.ChangeType, Set<String>> clusterChanges, Map<String, 
Resource> resourceMap)
-      throws HelixRebalanceException {
+      Map<HelixConstants.ChangeType, Set<String>> clusterChanges, Map<String, 
Resource> resourceMap,
+      final CurrentStateOutput currentStateOutput) throws 
HelixRebalanceException {
+    LOG.info("Start calculating the new baseline.");
+    Map<String, ResourceAssignment> currentBaseline =
+        getBaselineAssignment(_assignmentMetadataStore, currentStateOutput, 
resourceMap.keySet());
     // For baseline calculation
     // 1. Ignore node status (disable/offline).
     // 2. Use the baseline as the previous best possible assignment since 
there is no "baseline" for
     // the baseline.
-    LOG.info("Start calculating the new baseline.");
-    Map<String, ResourceAssignment> currentBaseline;
-    try {
-      currentBaseline = _assignmentMetadataStore.getBaseline();
-    } catch (Exception ex) {
-      throw new HelixRebalanceException("Failed to get the current baseline 
assignment.",
-          HelixRebalanceException.Type.INVALID_REBALANCER_STATUS, ex);
-    }
-    Map<String, ResourceAssignment> baseline = 
calculateAssignment(clusterData, clusterChanges,
-        resourceMap, clusterData.getAllInstances(), Collections.emptyMap(), 
currentBaseline);
-    try {
-      _assignmentMetadataStore.persistBaseline(baseline);
-    } catch (Exception ex) {
-      throw new HelixRebalanceException("Failed to persist the new baseline 
assignment.",
-          HelixRebalanceException.Type.INVALID_REBALANCER_STATUS, ex);
+    Map<String, ResourceAssignment> newBaseline =
+        calculateAssignment(clusterData, clusterChanges, resourceMap, 
clusterData.getAllInstances(),
+            Collections.emptyMap(), currentBaseline);
+
+    if (_assignmentMetadataStore != null) {
+      try {
+        _assignmentMetadataStore.persistBaseline(newBaseline);
+      } catch (Exception ex) {
+        throw new HelixRebalanceException("Failed to persist the new baseline 
assignment.",
+            HelixRebalanceException.Type.INVALID_REBALANCER_STATUS, ex);
+      }
+    } else {
+      LOG.debug("Assignment Metadata Store is empty. Skip persist the baseline 
assignment.");
     }
+
     LOG.info("Finish calculating the new baseline.");
   }
 
   private Map<String, ResourceAssignment> partialRebalance(
       ResourceControllerDataProvider clusterData,
-      Map<HelixConstants.ChangeType, Set<String>> clusterChanges, Map<String, 
Resource> resourceMap)
-      throws HelixRebalanceException {
+      Map<HelixConstants.ChangeType, Set<String>> clusterChanges, Map<String, 
Resource> resourceMap,
+      final CurrentStateOutput currentStateOutput) throws 
HelixRebalanceException {
     LOG.info("Start calculating the new best possible assignment.");
-    Set<String> activeInstances = clusterData.getEnabledLiveInstances();
-    Map<String, ResourceAssignment> baseline;
-    Map<String, ResourceAssignment> prevBestPossibleAssignment;
-    try {
-      baseline = _assignmentMetadataStore.getBaseline();
-      prevBestPossibleAssignment = 
_assignmentMetadataStore.getBestPossibleAssignment();
-    } catch (Exception ex) {
-      throw new HelixRebalanceException("Failed to get the persisted 
assignment records.",
-          HelixRebalanceException.Type.INVALID_REBALANCER_STATUS, ex);
-    }
-    Map<String, ResourceAssignment> newAssignment = 
calculateAssignment(clusterData, clusterChanges,
-        resourceMap, activeInstances, baseline, prevBestPossibleAssignment);
-    try {
-      // TODO Test to confirm if persisting the final assignment (with final 
partition states)
-      // would be a better option.
-      _assignmentMetadataStore.persistBestPossibleAssignment(newAssignment);
-    } catch (Exception ex) {
-      throw new HelixRebalanceException("Failed to persist the new best 
possible assignment.",
-          HelixRebalanceException.Type.INVALID_REBALANCER_STATUS, ex);
+    Map<String, ResourceAssignment> currentBaseline =
+        getBaselineAssignment(_assignmentMetadataStore, currentStateOutput, 
resourceMap.keySet());
+    Map<String, ResourceAssignment> currentBestPossibleAssignment =
+        getBestPossibleAssignment(_assignmentMetadataStore, currentStateOutput,
+            resourceMap.keySet());
+    Map<String, ResourceAssignment> newAssignment =
+        calculateAssignment(clusterData, clusterChanges, resourceMap,
+            clusterData.getEnabledLiveInstances(), currentBaseline, 
currentBestPossibleAssignment);
+
+    if (_assignmentMetadataStore != null) {
+      try {
+        // TODO Test to confirm if persisting the final assignment (with final 
partition states)
+        // would be a better option.
+        _assignmentMetadataStore.persistBestPossibleAssignment(newAssignment);
+      } catch (Exception ex) {
+        throw new HelixRebalanceException("Failed to persist the new best 
possible assignment.",
+            HelixRebalanceException.Type.INVALID_REBALANCER_STATUS, ex);
+      }
+    } else {
+      LOG.debug("Assignment Metadata Store is empty. Skip persist the baseline 
assignment.");
     }
+
     LOG.info("Finish calculating the new best possible assignment.");
     return newAssignment;
   }
@@ -348,4 +362,100 @@ public class WagedRebalancer {
     }
     return preferenceList;
   }
+
+  private void validateInput(ResourceControllerDataProvider clusterData,
+      Map<String, Resource> resourceMap) throws HelixRebalanceException {
+    Set<String> nonCompatibleResources = 
resourceMap.entrySet().stream().filter(resourceEntry -> {
+      IdealState is = clusterData.getIdealState(resourceEntry.getKey());
+      return is == null || 
!is.getRebalanceMode().equals(IdealState.RebalanceMode.FULL_AUTO)
+          || !getClass().getName().equals(is.getRebalancerClassName());
+    }).map(Map.Entry::getKey).collect(Collectors.toSet());
+    if (!nonCompatibleResources.isEmpty()) {
+      throw new HelixRebalanceException(String.format(
+          "Input contains invalid resource(s) that cannot be rebalanced by the 
WAGED rebalancer. %s",
+          nonCompatibleResources.toString()), 
HelixRebalanceException.Type.INVALID_INPUT);
+    }
+  }
+
+  /**
+   * @param assignmentMetadataStore
+   * @param currentStateOutput
+   * @param resources
+   * @return The current baseline assignment. If record does not exist in the
+   * assignmentMetadataStore, return the current state assignment.
+   * @throws HelixRebalanceException
+   */
+  private Map<String, ResourceAssignment> getBaselineAssignment(
+      AssignmentMetadataStore assignmentMetadataStore, CurrentStateOutput 
currentStateOutput,
+      Set<String> resources) throws HelixRebalanceException {
+    Map<String, ResourceAssignment> currentBaseline = Collections.emptyMap();
+    if (assignmentMetadataStore != null) {
+      try {
+        currentBaseline = assignmentMetadataStore.getBaseline();
+      } catch (HelixException ex) {
+        // Report error. and use empty mapping instead.
+        LOG.error("Failed to get the current baseline assignment.", ex);
+      } catch (Exception ex) {
+        throw new HelixRebalanceException(
+            "Failed to get the current baseline assignment because of 
unexpected error.",
+            HelixRebalanceException.Type.INVALID_REBALANCER_STATUS, ex);
+      }
+    }
+    if (currentBaseline.isEmpty()) {
+      LOG.warn(
+          "The current baseline assignment record is empty. Use the current 
states instead.");
+      currentBaseline = getCurrentStateAssingment(currentStateOutput, 
resources);
+    }
+    return currentBaseline;
+  }
+
+  /**
+   * @param assignmentMetadataStore
+   * @param currentStateOutput
+   * @param resources
+   * @return The current best possible assignment. If record does not exist in 
the
+   * assignmentMetadataStore, return the current state assignment.
+   * @throws HelixRebalanceException
+   */
+  private Map<String, ResourceAssignment> getBestPossibleAssignment(
+      AssignmentMetadataStore assignmentMetadataStore, CurrentStateOutput 
currentStateOutput,
+      Set<String> resources) throws HelixRebalanceException {
+    Map<String, ResourceAssignment> currentBestAssignment = 
Collections.emptyMap();
+    if (assignmentMetadataStore != null) {
+      try {
+        currentBestAssignment = 
assignmentMetadataStore.getBestPossibleAssignment();
+      } catch (HelixException ex) {
+        // Report error. and use empty mapping instead.
+        LOG.error("Failed to get the current best possible assignment.", ex);
+      } catch (Exception ex) {
+        throw new HelixRebalanceException(
+            "Failed to get the current best possible assignment because of 
unexpected error.",
+            HelixRebalanceException.Type.INVALID_REBALANCER_STATUS, ex);
+      }
+    }
+    if (currentBestAssignment.isEmpty()) {
+      LOG.warn(
+          "The current best possible assignment record is empty. Use the 
current states instead.");
+      currentBestAssignment = getCurrentStateAssingment(currentStateOutput, 
resources);
+    }
+    return currentBestAssignment;
+  }
+
+  private Map<String, ResourceAssignment> getCurrentStateAssingment(
+      CurrentStateOutput currentStateOutput, Set<String> resourceSet) {
+    Map<String, ResourceAssignment> currentStateAssignment = new HashMap<>();
+    for (String resourceName : resourceSet) {
+      Map<Partition, Map<String, String>> currentStateMap =
+          currentStateOutput.getCurrentStateMap(resourceName);
+      if (!currentStateMap.isEmpty()) {
+        ResourceAssignment newResourceAssignment = new 
ResourceAssignment(resourceName);
+        currentStateMap.entrySet().stream().forEach(currentStateEntry -> {
+          newResourceAssignment
+              .addReplicaMap(currentStateEntry.getKey(), 
currentStateEntry.getValue());
+        });
+        currentStateAssignment.put(resourceName, newResourceAssignment);
+      }
+    }
+    return currentStateAssignment;
+  }
 }
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithm.java
 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithm.java
index 89a3f29..1a41aef 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithm.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithm.java
@@ -29,6 +29,7 @@ import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
+import com.google.common.collect.Maps;
 import org.apache.helix.HelixRebalanceException;
 import org.apache.helix.controller.rebalancer.waged.RebalanceAlgorithm;
 import org.apache.helix.controller.rebalancer.waged.model.AssignableNode;
@@ -36,11 +37,10 @@ import 
org.apache.helix.controller.rebalancer.waged.model.AssignableReplica;
 import org.apache.helix.controller.rebalancer.waged.model.ClusterContext;
 import org.apache.helix.controller.rebalancer.waged.model.ClusterModel;
 import org.apache.helix.controller.rebalancer.waged.model.OptimalAssignment;
+import org.apache.helix.model.ResourceAssignment;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.Maps;
-
 /**
  * The algorithm is based on a given set of constraints
  * - HardConstraint: Approve or deny the assignment given its condition, any 
assignment cannot
@@ -64,29 +64,26 @@ class ConstraintBasedAlgorithm implements 
RebalanceAlgorithm {
   @Override
   public OptimalAssignment calculate(ClusterModel clusterModel) throws 
HelixRebalanceException {
     OptimalAssignment optimalAssignment = new OptimalAssignment();
-    Map<String, Set<AssignableReplica>> replicasByResource = 
clusterModel.getAssignableReplicaMap();
     List<AssignableNode> nodes = new 
ArrayList<>(clusterModel.getAssignableNodes().values());
-
-    // TODO: different orders of resource/replica could lead to different 
greedy assignments, will
-    // revisit and improve the performance
-    for (String resource : replicasByResource.keySet()) {
-      for (AssignableReplica replica : replicasByResource.get(resource)) {
-        Optional<AssignableNode> maybeBestNode =
-            getNodeWithHighestPoints(replica, nodes, 
clusterModel.getContext(), optimalAssignment);
-        // stop immediately if any replica cannot find best assignable node
-        if (optimalAssignment.hasAnyFailure()) {
-          String errorMessage = String.format(
-              "Unable to find any available candidate node for partition %s; 
Fail reasons: %s",
-              replica.getPartitionName(), optimalAssignment.getFailures());
-          throw new HelixRebalanceException(errorMessage,
-              HelixRebalanceException.Type.FAILED_TO_CALCULATE);
-        }
-        maybeBestNode.ifPresent(node -> 
clusterModel.assign(replica.getResourceName(),
-            replica.getPartitionName(), replica.getReplicaState(), 
node.getInstanceName()));
+    // Sort the replicas so the input is stable for the greedy algorithm.
+    // For the other algorithm implementation, this sorting could be 
unnecessary.
+    for (AssignableReplica replica : 
getOrderedAssignableReplica(clusterModel)) {
+      Optional<AssignableNode> maybeBestNode =
+          getNodeWithHighestPoints(replica, nodes, clusterModel.getContext(), 
optimalAssignment);
+      // stop immediately if any replica cannot find best assignable node
+      if (optimalAssignment.hasAnyFailure()) {
+        String errorMessage = String.format(
+            "Unable to find any available candidate node for partition %s; 
Fail reasons: %s",
+            replica.getPartitionName(), optimalAssignment.getFailures());
+        throw new HelixRebalanceException(errorMessage,
+            HelixRebalanceException.Type.FAILED_TO_CALCULATE);
       }
+      maybeBestNode.ifPresent(node -> clusterModel
+          .assign(replica.getResourceName(), replica.getPartitionName(), 
replica.getReplicaState(),
+              node.getInstanceName()));
     }
-
-    return optimalAssignment.convertFrom(clusterModel);
+    optimalAssignment.updateAssignments(clusterModel);
+    return optimalAssignment;
   }
 
   private Optional<AssignableNode> getNodeWithHighestPoints(AssignableReplica 
replica,
@@ -133,4 +130,55 @@ class ConstraintBasedAlgorithm implements 
RebalanceAlgorithm {
     return hardConstraints.stream().map(HardConstraint::getDescription)
         .collect(Collectors.toList());
   }
+
+  /**
+   * Flattens all the assignable replicas of the cluster model into a single list with a
+   * deterministic order, so the greedy algorithm gets a stable input.
+   * The replicas are ordered by the following rules:
+   * 1. Whether the resource exists in the best possible assignment, and then whether it exists
+   * in the baseline assignment. The replicas of an existing resource go first.
+   * 2. The state priority. Note that prioritizing the top state is required, or the greedy
+   * algorithm will unnecessarily shuffle the states between replicas.
+   * 3. The resource name, and then the partition name.
+   *
+   * @param clusterModel the cluster model that provides the replicas and the assignment context
+   * @return a new list that contains all the assignable replicas in the sorted order
+   */
+  private List<AssignableReplica> getOrderedAssignableReplica(ClusterModel clusterModel) {
+    // TODO investigate better ways to sort replicas. One option is sorting based on the creation time.
+    Map<String, Set<AssignableReplica>> replicasByResource = clusterModel.getAssignableReplicaMap();
+    List<AssignableReplica> orderedAssignableReplicas =
+        replicasByResource.values().stream().flatMap(Set::stream).collect(Collectors.toList());
+
+    Map<String, ResourceAssignment> bestPossibleAssignment =
+        clusterModel.getContext().getBestPossibleAssignment();
+    Map<String, ResourceAssignment> baselineAssignment =
+        clusterModel.getContext().getBaselineAssignment();
+
+    orderedAssignableReplicas.sort((replica1, replica2) -> {
+      String resourceName1 = replica1.getResourceName();
+      String resourceName2 = replica2.getResourceName();
+
+      // If only one resource is in the best possible assignment, prioritize its replica.
+      boolean inBestPossible1 = bestPossibleAssignment.containsKey(resourceName1);
+      if (inBestPossible1 != bestPossibleAssignment.containsKey(resourceName2)) {
+        return inBestPossible1 ? -1 : 1;
+      }
+
+      // If only one resource is in the baseline assignment, prioritize its replica.
+      boolean inBaseline1 = baselineAssignment.containsKey(resourceName1);
+      if (inBaseline1 != baselineAssignment.containsKey(resourceName2)) {
+        return inBaseline1 ? -1 : 1;
+      }
+
+      // Both resources have the same presence in the recorded assignments, so compare the
+      // additional dimensions. A smaller priority number means a higher state priority.
+      int statePriority1 = replica1.getStatePriority();
+      int statePriority2 = replica2.getStatePriority();
+      if (statePriority1 != statePriority2) {
+        // Integer.compare cannot overflow, unlike subtracting the two priorities.
+        return Integer.compare(statePriority1, statePriority2);
+      }
+
+      // If the state priorities are the same, compare the names.
+      int resourceOrder = resourceName1.compareTo(resourceName2);
+      if (resourceOrder != 0) {
+        return resourceOrder;
+      }
+      return replica1.getPartitionName().compareTo(replica2.getPartitionName());
+    });
+    return orderedAssignableReplicas;
+  }
 }
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/NodeMaxPartitionLimitConstraint.java
 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/NodeMaxPartitionLimitConstraint.java
index 9d0752b..cda5329 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/NodeMaxPartitionLimitConstraint.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/NodeMaxPartitionLimitConstraint.java
@@ -28,9 +28,12 @@ class NodeMaxPartitionLimitConstraint extends HardConstraint 
{
   @Override
   boolean isAssignmentValid(AssignableNode node, AssignableReplica replica,
       ClusterContext clusterContext) {
-    return node.getAssignedReplicaCount() < node.getMaxPartition()
-        && 
node.getAssignedPartitionsByResource(replica.getResourceName()).size() < replica
-            .getResourceMaxPartitionsPerInstance();
+    // The assignment is valid only if the node stays within both the node level limit and the
+    // per-resource limit. A negative limit always passes its check, so it is not enforced.
+    boolean withinMaxPartitionLimit =
+        node.getMaxPartition() < 0 || node.getAssignedReplicaCount() < node.getMaxPartition();
+    boolean withinResourceMaxPartitionLimit = replica.getResourceMaxPartitionsPerInstance() < 0
+        || node.getAssignedPartitionsByResource(replica.getResourceName()).size() < replica
+        .getResourceMaxPartitionsPerInstance();
+    return withinMaxPartitionLimit && withinResourceMaxPartitionLimit;
   }
 
   @Override
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
index a3460fb..20de6da 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
@@ -19,8 +19,6 @@ package org.apache.helix.controller.rebalancer.waged.model;
  * under the License.
  */
 
-import static java.lang.Math.max;
-
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
@@ -287,16 +285,23 @@ public class AssignableNode implements 
Comparable<AssignableNode> {
    * Any missing field will cause an invalid topology config exception.
    */
   private String computeFaultZone(ClusterConfig clusterConfig, InstanceConfig 
instanceConfig) {
-    if (clusterConfig.isTopologyAwareEnabled()) {
-      String topologyStr = clusterConfig.getTopology();
-      String faultZoneType = clusterConfig.getFaultZoneType();
-      if (topologyStr == null || faultZoneType == null) {
-        throw new HelixException("Fault zone or cluster topology information 
is not configured.");
-      }
-
+    if (!clusterConfig.isTopologyAwareEnabled()) {
+      // Instance name is the default fault zone if topology awareness is 
false.
+      return instanceConfig.getInstanceName();
+    }
+    String topologyStr = clusterConfig.getTopology();
+    String faultZoneType = clusterConfig.getFaultZoneType();
+    if (topologyStr == null || faultZoneType == null) {
+      LOG.debug("Topology configuration is not complete. Topology define: {}, 
Fault Zone Type: {}",
+          topologyStr, faultZoneType);
+      // Use the deprecated ZoneId field (if it exists) as the default fault zone.
+      // Otherwise, fall back to the instance name.
+      String zoneId = instanceConfig.getZoneId();
+      return zoneId == null ? instanceConfig.getInstanceName() : zoneId;
+    } else {
+      // Get the fault zone information from the complete topology definition.
       String[] topologyDef = topologyStr.trim().split("/");
-      if (topologyDef.length == 0
-          || Arrays.stream(topologyDef).noneMatch(type -> 
type.equals(faultZoneType))) {
+      if (topologyDef.length == 0 ||
+          Arrays.stream(topologyDef).noneMatch(type -> 
type.equals(faultZoneType))) {
         throw new HelixException(
             "The configured topology definition is empty or does not contain 
the fault zone type.");
       }
@@ -324,10 +329,6 @@ public class AssignableNode implements 
Comparable<AssignableNode> {
         }
         return faultZoneStringBuilder.toString();
       }
-    } else {
-      // For backward compatibility
-      String zoneId = instanceConfig.getZoneId();
-      return zoneId == null ? instanceConfig.getInstanceName() : zoneId;
     }
   }
 
@@ -356,7 +357,7 @@ public class AssignableNode implements 
Comparable<AssignableNode> {
       // For the purpose of constraint calculation, the max utilization cannot 
be larger than 100%.
       float utilization = Math.min(
           (float) (_maxCapacity.get(capacityKey) - newCapacity) / 
_maxCapacity.get(capacityKey), 1);
-      _highestCapacityUtilization = max(_highestCapacityUtilization, 
utilization);
+      _highestCapacityUtilization = Math.max(_highestCapacityUtilization, 
utilization);
     }
     // else if the capacityKey does not exist in the capacity map, this method 
essentially becomes
     // a NOP; in other words, this node will be treated as if it has unlimited 
capacity.
@@ -394,4 +395,9 @@ public class AssignableNode implements 
Comparable<AssignableNode> {
   public int compareTo(AssignableNode o) {
     return _instanceName.compareTo(o.getInstanceName());
   }
+
+  @Override
+  public String toString() {
+    return _instanceName;
+  }
 }
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableReplica.java
 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableReplica.java
index 66bd7b7..a651e19 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableReplica.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableReplica.java
@@ -20,17 +20,22 @@ package org.apache.helix.controller.rebalancer.waged.model;
  */
 
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
 import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.ResourceConfig;
 import org.apache.helix.model.StateModelDefinition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * This class represents a partition replication that needs to be allocated.
  */
 public class AssignableReplica implements Comparable<AssignableReplica> {
+  private static final Logger LOG = 
LoggerFactory.getLogger(AssignableReplica.class);
+
   private final String _partitionName;
   private final String _resourceName;
   private final String _resourceInstanceGroupTag;
@@ -149,9 +154,10 @@ public class AssignableReplica implements 
Comparable<AssignableReplica> {
       partitionCapacity = 
capacityMap.get(ResourceConfig.DEFAULT_PARTITION_KEY);
     }
     if (partitionCapacity == null) {
-      throw new IllegalArgumentException(String.format(
-          "The capacity usage of the specified partition %s is not configured 
in the Resource Config %s. No default partition capacity is configured 
neither.",
-          partitionName, resourceConfig.getResourceName()));
+      LOG.warn("The capacity usage of the specified partition {} is not 
configured in the Resource"
+          + " Config {}. No default partition capacity is configured either. 
Will proceed with"
+          + " empty capacity configuration.", partitionName, 
resourceConfig.getResourceName());
+      partitionCapacity = new HashMap<>();
     }
 
     List<String> requiredCapacityKeys = 
clusterConfig.getInstanceCapacityKeys();
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
index af1a8d8..2b53422 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
@@ -200,6 +200,9 @@ public class ClusterModelProvider {
 
     for (String resourceName : resourceMap.keySet()) {
       ResourceConfig resourceConfig = 
dataProvider.getResourceConfig(resourceName);
+      if (resourceConfig == null) {
+        resourceConfig = new ResourceConfig(resourceName);
+      }
       IdealState is = dataProvider.getIdealState(resourceName);
       if (is == null) {
         throw new HelixException(
@@ -223,6 +226,7 @@ public class ClusterModelProvider {
         for (Map.Entry<String, Integer> entry : stateCountMap.entrySet()) {
           String state = entry.getKey();
           for (int i = 0; i < entry.getValue(); i++) {
+            mergeIdealStateWithResourceConfig(resourceConfig, is);
             totalReplicaMap.computeIfAbsent(resourceName, key -> new 
HashSet<>()).add(
                 new AssignableReplica(clusterConfig, resourceConfig, 
partition, state,
                     def.getStatePriorityMap().get(state)));
@@ -234,6 +238,32 @@ public class ClusterModelProvider {
   }
 
   /**
+   * For backward compatibility, propagate the critical simple fields from the 
IdealState to
+   * the Resource Config.
+   * Eventually, Resource Config should be the only metadata node that 
contains the required information.
+   */
+  private static void mergeIdealStateWithResourceConfig(ResourceConfig 
resourceConfig,
+      final IdealState idealState) {
+    // Note that the config fields updated in this method shall be fully compatible with the ones in the IdealState.
+    // 1. The fields shall have exactly the same meaning.
+    // 2. The values shall be directly compatible, with no additional calculation involved.
+    // 3. Resource Config items have a higher priority.
+    // This is to ensure the resource config is not polluted after the merge.
+    if (null == resourceConfig.getRecord()
+        
.getSimpleField(ResourceConfig.ResourceConfigProperty.INSTANCE_GROUP_TAG.name()))
 {
+      resourceConfig.getRecord()
+          
.setSimpleField(ResourceConfig.ResourceConfigProperty.INSTANCE_GROUP_TAG.name(),
+              idealState.getInstanceGroupTag());
+    }
+    if (null == resourceConfig.getRecord()
+        
.getSimpleField(ResourceConfig.ResourceConfigProperty.MAX_PARTITIONS_PER_INSTANCE.name()))
 {
+      resourceConfig.getRecord()
+          
.setIntField(ResourceConfig.ResourceConfigProperty.MAX_PARTITIONS_PER_INSTANCE.name(),
+              idealState.getMaxPartitionsPerInstance());
+    }
+  }
+
+  /**
    * @return A map contains the assignments for each fault zone. <fault zone, 
<resource, set of partitions>>
    */
   private static Map<String, Map<String, Set<String>>> 
mapAssignmentToFaultZone(
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/OptimalAssignment.java
 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/OptimalAssignment.java
index 31cb181..138f30c 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/OptimalAssignment.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/OptimalAssignment.java
@@ -19,38 +19,64 @@ package org.apache.helix.controller.rebalancer.waged.model;
  * under the License.
  */
 
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.helix.HelixException;
+import org.apache.helix.model.Partition;
 import org.apache.helix.model.ResourceAssignment;
 
 /**
  * The data model represents the optimal assignment of N replicas assigned to 
M instances;
  * It's mostly used as the return parameter of an assignment calculation 
algorithm; If the algorithm
- * failed to find optimal assignment given the endeavor, the user could check 
the failure reasons
+ * failed to find optimal assignment given the endeavor, the user could check 
the failure reasons.
+ * Note that this class is not thread safe.
  */
 public class OptimalAssignment {
   private Map<AssignableNode, List<AssignableReplica>> _optimalAssignment = 
new HashMap<>();
   private Map<AssignableReplica, Map<AssignableNode, List<String>>> 
_failedAssignments =
       new HashMap<>();
 
-  public OptimalAssignment() {
-
-  }
-
+  /**
+   * Update the OptimalAssignment instance with the existing assignment 
recorded in the input cluster model.
+   *
+   * @param clusterModel
+   */
   public void updateAssignments(ClusterModel clusterModel) {
-
+    _optimalAssignment.clear();
+    clusterModel.getAssignableNodes().values().stream()
+        .forEach(node -> _optimalAssignment.put(node, new 
ArrayList<>(node.getAssignedReplicas())));
   }
 
-  // TODO: determine the output of final assignment format
+  /**
+   * @return The optimal assignment in the form of a <Resource Name, 
ResourceAssignment> map.
+   */
   public Map<String, ResourceAssignment> getOptimalResourceAssignment() {
-    throw new UnsupportedOperationException("Not implemented yet");
-  }
-
-  // TODO: the convert method is not the best choice so far, will revisit the 
data model
-  public OptimalAssignment convertFrom(ClusterModel clusterModel) {
-    return this;
+    if (hasAnyFailure()) {
+      throw new HelixException(
+          "Cannot get the optimal resource assignment since a calculation 
failure is recorded. "
+              + getFailures());
+    }
+    Map<String, ResourceAssignment> assignmentMap = new HashMap<>();
+    for (AssignableNode node : _optimalAssignment.keySet()) {
+      for (AssignableReplica replica : _optimalAssignment.get(node)) {
+        String resourceName = replica.getResourceName();
+        Partition partition = new Partition(replica.getPartitionName());
+        ResourceAssignment resourceAssignment = assignmentMap
+            .computeIfAbsent(resourceName, key -> new 
ResourceAssignment(resourceName));
+        Map<String, String> partitionStateMap = 
resourceAssignment.getReplicaMap(partition);
+        if (partitionStateMap.isEmpty()) {
+          // ResourceAssignment returns an immutable empty map when no such assignment has been recorded yet.
+          // So if the returned map is empty, create a new map.
+          partitionStateMap = new HashMap<>();
+        }
+        partitionStateMap.put(node.getInstanceName(), 
replica.getReplicaState());
+        resourceAssignment.addReplicaMap(partition, partitionStateMap);
+      }
+    }
+    return assignmentMap;
   }
 
   public void recordAssignmentFailure(AssignableReplica replica,
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
 
b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
index 4df8e8d..8ce60e7 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
@@ -19,6 +19,14 @@ package org.apache.helix.controller.stages;
  * under the License.
  */
 
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.stream.Collectors;
+
 import org.apache.helix.HelixException;
 import org.apache.helix.HelixManager;
 import org.apache.helix.HelixRebalanceException;
@@ -48,13 +56,6 @@ import org.apache.helix.util.HelixUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.Callable;
-
 /**
  * For partition compute best possible (instance,state) pair based on
  * IdealState,StateModel,LiveInstance
@@ -114,67 +115,46 @@ public class BestPossibleStateCalcStage extends 
AbstractBaseStage {
 
     // Check whether the offline/disabled instance count in the cluster 
reaches the set limit,
     // if yes, pause the rebalancer.
-    boolean isValid = validateOfflineInstancesLimit(cache,
-        (HelixManager) event.getAttribute(AttributeName.helixmanager.name()));
-
-    // 1. Rebalance with the WAGED rebalancer
-    // The rebalancer only calculates the new ideal assignment for all the 
resources that are
-    // configured to use the WAGED rebalancer.
-    // For the other resources, the legacy rebalancers will be triggered in 
the next step.
-    Map<String, IdealState> newIdealStates = new HashMap<>();
-    Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> preferences = 
cache.getClusterConfig()
-        .getGlobalRebalancePreference();
-    WagedRebalancer wagedRebalancer = new WagedRebalancer(helixManager, 
preferences);
-    try {
-      newIdealStates
-          .putAll(wagedRebalancer.computeNewIdealStates(cache, resourceMap, 
currentStateOutput));
-    } catch (HelixRebalanceException ex) {
-      // Note that unlike the legacy rebalancer, the WAGED rebalance won't 
return partial result.
-      // Since it calculates for all the eligible resources globally, a 
partial result is invalid.
-      // TODO propagate the rebalancer failure information to 
updateRebalanceStatus for monitoring.
-      LogUtil.logError(logger, _eventId, String
-          .format("Failed to calculate the new Ideal States using the 
rebalancer %s due to %s",
-              wagedRebalancer.getClass().getSimpleName(), 
ex.getFailureType()), ex);
-    }
+    boolean isValid =
+        validateOfflineInstancesLimit(cache, 
event.getAttribute(AttributeName.helixmanager.name()));
 
     final List<String> failureResources = new ArrayList<>();
-    Iterator<Resource> itr = resourceMap.values().iterator();
+
+    Map<String, Resource> calculatedResourceMap =
+        computeResourceBestPossibleStateWithWagedRebalancer(cache, 
currentStateOutput, helixManager,
+            resourceMap, output, failureResources);
+
+    Map<String, Resource> remainingResourceMap = new HashMap<>(resourceMap);
+    remainingResourceMap.keySet().removeAll(calculatedResourceMap.keySet());
+
+    // Fallback to the original single resource rebalancer calculation.
+    // This is required because we support mixed cluster that uses both WAGED 
rebalancer and the
+    // older rebalancers.
+    Iterator<Resource> itr = remainingResourceMap.values().iterator();
     while (itr.hasNext()) {
       Resource resource = itr.next();
       boolean result = false;
-      IdealState is = newIdealStates.get(resource.getResourceName());
-      if (is != null) {
-        // 2. Check if the WAGED rebalancer has calculated for this resource 
or not.
-        result = checkBestPossibleStateCalculation(is);
-        if (result) {
-          // The WAGED rebalancer calculates a valid result, record in the 
output
-          updateBestPossibleStateOutput(output, resource, is);
-        }
-      } else {
-        // 3. The WAGED rebalancer skips calculating the resource assignment, 
fallback to use a
-        // legacy resource rebalancer if applicable.
-        // If this calculation fails, the resource will be reported in the 
failureResources list.
-        try {
-          result =
-              computeSingleResourceBestPossibleState(event, cache, 
currentStateOutput, resource,
-                  output);
-        } catch (HelixException ex) {
-          LogUtil.logError(logger, _eventId,
-              "Exception when calculating best possible states for " + 
resource.getResourceName(),
-              ex);
-        }
+      try {
+        result = computeSingleResourceBestPossibleState(event, cache, 
currentStateOutput, resource,
+            output);
+      } catch (HelixException ex) {
+        LogUtil.logError(logger, _eventId, String
+            .format("Exception when calculating best possible states for %s",
+                resource.getResourceName()), ex);
+
       }
       if (!result) {
         failureResources.add(resource.getResourceName());
-        LogUtil.logWarn(logger, _eventId,
-            "Failed to calculate best possible states for " + 
resource.getResourceName());
+        LogUtil.logWarn(logger, _eventId, String
+            .format("Failed to calculate best possible states for %s", 
resource.getResourceName()));
       }
     }
 
     // Check and report if resource rebalance has failure
     updateRebalanceStatus(!isValid || !failureResources.isEmpty(), 
failureResources, helixManager,
-        cache, clusterStatusMonitor,
-        "Failed to calculate best possible states for " + 
failureResources.size() + " resources.");
+        cache, clusterStatusMonitor, String
+            .format("Failed to calculate best possible states for %d 
resources.",
+                failureResources.size()));
 
     return output;
   }
@@ -238,6 +218,70 @@ public class BestPossibleStateCalcStage extends 
AbstractBaseStage {
     return true;
   }
 
+  /**
+   * Computes the best possible states with the WAGED rebalancer.
+   * Only the resources that are in the FULL_AUTO mode and are configured to use the WAGED
+   * rebalancer are passed to the rebalancer.
+   *
+   * @param cache              Cluster data cache.
+   * @param currentStateOutput The current state information.
+   * @param helixManager       The Helix manager that is used to construct the WAGED rebalancer.
+   * @param resourceMap        The complete resource map. The method filters the map for the compatible resources.
+   * @param output             The best possible state output. It is updated with every valid result.
+   * @param failureResources   The failure records. It is updated with every resource that cannot be computed.
+   * @return The map of all the resources that are handled here, including the failed ones.
+   */
+  private Map<String, Resource> computeResourceBestPossibleStateWithWagedRebalancer(
+      ResourceControllerDataProvider cache, CurrentStateOutput currentStateOutput,
+      HelixManager helixManager, Map<String, Resource> resourceMap, BestPossibleStateOutput output,
+      List<String> failureResources) {
+    // Keep the compatible resources only: 1. FULL_AUTO 2. Configured to use the WAGED rebalancer
+    Map<String, Resource> wagedRebalancedResourceMap = resourceMap.entrySet().stream()
+        .filter(entry -> {
+          IdealState idealState = cache.getIdealState(entry.getKey());
+          if (idealState == null) {
+            return false;
+          }
+          return idealState.getRebalanceMode().equals(IdealState.RebalanceMode.FULL_AUTO)
+              && WagedRebalancer.class.getName().equals(idealState.getRebalancerClassName());
+        })
+        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+
+    Map<String, IdealState> newIdealStates = new HashMap<>();
+
+    // The rebalancer is initialized with the global rebalance preferences of the cluster.
+    Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> preferences =
+        cache.getClusterConfig().getGlobalRebalancePreference();
+    // TODO avoid creating the rebalancer on every rebalance call for performance enhancement
+    WagedRebalancer wagedRebalancer = new WagedRebalancer(helixManager, preferences);
+    try {
+      newIdealStates.putAll(wagedRebalancer
+          .computeNewIdealStates(cache, wagedRebalancedResourceMap, currentStateOutput));
+    } catch (HelixRebalanceException ex) {
+      // Unlike the legacy rebalancers, the WAGED rebalancer never returns a partial result.
+      // It calculates for all the eligible resources globally, so a partial result is invalid.
+      // TODO propagate the rebalancer failure information to updateRebalanceStatus for monitoring.
+      String errorMessage = String
+          .format("Failed to calculate the new Ideal States using the rebalancer %s due to %s",
+              wagedRebalancer.getClass().getSimpleName(), ex.getFailureType());
+      LogUtil.logError(logger, _eventId, errorMessage, ex);
+    } finally {
+      wagedRebalancer.close();
+    }
+
+    for (Resource resource : wagedRebalancedResourceMap.values()) {
+      IdealState computedIdealState = newIdealStates.get(resource.getResourceName());
+      // A resource fails if the WAGED rebalancer has no result for it, or the result is invalid.
+      if (computedIdealState == null || !checkBestPossibleStateCalculation(computedIdealState)) {
+        failureResources.add(resource.getResourceName());
+        LogUtil.logWarn(logger, _eventId, String
+            .format("Failed to calculate best possible states for %s.",
+                resource.getResourceName()));
+        continue;
+      }
+      // The WAGED rebalancer calculated a valid result, record it in the output.
+      updateBestPossibleStateOutput(output, resource, computedIdealState);
+    }
+    return wagedRebalancedResourceMap;
+  }
+
   private void updateBestPossibleStateOutput(BestPossibleStateOutput output, 
Resource resource,
       IdealState computedIdealState) {
     output.setPreferenceLists(resource.getResourceName(), 
computedIdealState.getPreferenceLists());
diff --git 
a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java
 
b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java
index 24c7c8e..a11da29 100644
--- 
a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java
+++ 
b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java
@@ -25,6 +25,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import org.I0Itec.zkclient.exception.ZkMarshallingError;
+import org.I0Itec.zkclient.exception.ZkNoNodeException;
 import org.I0Itec.zkclient.serialize.ZkSerializer;
 import org.apache.helix.AccessOption;
 import org.apache.helix.BaseDataAccessor;
@@ -215,7 +216,7 @@ public class ZkBucketDataAccessor implements 
BucketDataAccessor, AutoCloseable {
         ZNRecord metadataRecord =
             _znRecordBaseDataAccessor.get(path, null, AccessOption.PERSISTENT);
         if (metadataRecord == null) {
-          throw new HelixException(
+          throw new ZkNoNodeException(
               String.format("Metadata ZNRecord does not exist for path: %s", 
path));
         }
 
diff --git a/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java 
b/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java
index 03338b4..b9284b9 100644
--- a/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java
+++ b/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java
@@ -30,9 +30,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.logging.Level;
-
 import javax.management.MBeanServerConnection;
 import javax.management.ObjectName;
+
 import org.I0Itec.zkclient.IZkStateListener;
 import org.I0Itec.zkclient.ZkServer;
 import org.apache.helix.BaseDataAccessor;
@@ -54,6 +54,7 @@ import org.apache.helix.controller.pipeline.Stage;
 import org.apache.helix.controller.pipeline.StageContext;
 import org.apache.helix.controller.rebalancer.DelayedAutoRebalancer;
 import org.apache.helix.controller.rebalancer.strategy.AutoRebalanceStrategy;
+import org.apache.helix.controller.rebalancer.waged.WagedRebalancer;
 import org.apache.helix.controller.stages.AttributeName;
 import org.apache.helix.controller.stages.ClusterEvent;
 import org.apache.helix.manager.zk.ZKHelixAdmin;
@@ -347,6 +348,19 @@ public class ZkTestBase {
   protected IdealState createResourceWithDelayedRebalance(String clusterName, 
String db,
       String stateModel, int numPartition, int replica, int minActiveReplica, 
long delay,
       String rebalanceStrategy) {
+    return createResource(clusterName, db, stateModel, numPartition, replica, 
minActiveReplica,
+        delay, DelayedAutoRebalancer.class.getName(), rebalanceStrategy);
+  }
+
+  protected IdealState createResourceWithWagedRebalance(String clusterName, 
String db,
+      String stateModel, int numPartition, int replica, int minActiveReplica, 
long delay) {
+    return createResource(clusterName, db, stateModel, numPartition, replica, 
minActiveReplica,
+        delay, WagedRebalancer.class.getName(), null);
+  }
+
+  private IdealState createResource(String clusterName, String db, String 
stateModel,
+      int numPartition, int replica, int minActiveReplica, long delay, String 
rebalancerClassName,
+      String rebalanceStrategy) {
     IdealState idealState =
         
_gSetupTool.getClusterManagementTool().getResourceIdealState(clusterName, db);
     if (idealState == null) {
@@ -362,7 +376,7 @@ public class ZkTestBase {
     if (delay > 0) {
       idealState.setRebalanceDelay(delay);
     }
-    idealState.setRebalancerClassName(DelayedAutoRebalancer.class.getName());
+    idealState.setRebalancerClassName(rebalancerClassName);
     _gSetupTool.getClusterManagementTool().setResourceIdealState(clusterName, 
db, idealState);
     _gSetupTool.rebalanceStorageCluster(clusterName, db, replica);
     idealState = 
_gSetupTool.getClusterManagementTool().getResourceIdealState(clusterName, db);
diff --git 
a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/MockAssignmentMetadataStore.java
 
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/MockAssignmentMetadataStore.java
index 3371c8b..7d05416 100644
--- 
a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/MockAssignmentMetadataStore.java
+++ 
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/MockAssignmentMetadataStore.java
@@ -23,6 +23,7 @@ import java.util.HashMap;
 import java.util.Map;
 import org.apache.helix.BucketDataAccessor;
 import org.apache.helix.model.ResourceAssignment;
+import org.mockito.Mockito;
 
 /**
  * A mock up metadata store for unit test.
@@ -32,8 +33,8 @@ public class MockAssignmentMetadataStore extends 
AssignmentMetadataStore {
   private Map<String, ResourceAssignment> _persistGlobalBaseline = new 
HashMap<>();
   private Map<String, ResourceAssignment> _persistBestPossibleAssignment = new 
HashMap<>();
 
-  MockAssignmentMetadataStore(BucketDataAccessor bucketDataAccessor, String 
clusterName) {
-    super(bucketDataAccessor, clusterName);
+  MockAssignmentMetadataStore() {
+    super(Mockito.mock(BucketDataAccessor.class), "");
   }
 
   public Map<String, ResourceAssignment> getBaseline() {
@@ -53,6 +54,10 @@ public class MockAssignmentMetadataStore extends 
AssignmentMetadataStore {
     _persistBestPossibleAssignment = bestPossibleAssignment;
   }
 
+  /**
+   * No-op. This mock is constructed with a Mockito-mocked BucketDataAccessor and keeps its
+   * assignments in in-memory maps, so it has nothing of its own to release.
+   */
+  @Override
+  public void close() {
+    // do nothing
+  }
+
   public void clearMetadataStore() {
     _persistBestPossibleAssignment.clear();
     _persistGlobalBaseline.clear();
diff --git 
a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestAssignmentMetadataStore.java
 
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestAssignmentMetadataStore.java
index 922915f..ecd2af3 100644
--- 
a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestAssignmentMetadataStore.java
+++ 
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestAssignmentMetadataStore.java
@@ -20,6 +20,7 @@ package org.apache.helix.controller.rebalancer.waged;
  */
 
 import java.util.Map;
+
 import org.apache.helix.HelixManager;
 import org.apache.helix.HelixManagerFactory;
 import org.apache.helix.InstanceType;
@@ -28,6 +29,7 @@ import 
org.apache.helix.integration.manager.ClusterControllerManager;
 import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.model.ResourceAssignment;
 import org.testng.Assert;
+import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
@@ -79,7 +81,15 @@ public class TestAssignmentMetadataStore extends ZkTestBase {
     _manager.connect();
 
     // create AssignmentMetadataStore
-    _store = new AssignmentMetadataStore(_manager);
+    _store = new 
AssignmentMetadataStore(_manager.getMetadataStoreConnectionString(),
+        _manager.getClusterName());
+  }
+
+  @AfterClass
+  public void afterClass() {
+    if (_store != null) {
+      _store.close();
+    }
   }
 
   /**
@@ -91,11 +101,7 @@ public class TestAssignmentMetadataStore extends ZkTestBase 
{
    */
   @Test
   public void testReadEmptyBaseline() {
-    try {
-      Map<String, ResourceAssignment> baseline = _store.getBaseline();
-      Assert.fail("Should fail because there shouldn't be any data.");
-    } catch (Exception e) {
-      // OK
-    }
+    Map<String, ResourceAssignment> baseline = _store.getBaseline();
+    Assert.assertTrue(baseline.isEmpty());
   }
 }
diff --git 
a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
 
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
index d6fd99b..e7368be 100644
--- 
a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
+++ 
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
@@ -26,7 +26,7 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
-import org.apache.helix.BucketDataAccessor;
+
 import org.apache.helix.HelixConstants;
 import org.apache.helix.HelixRebalanceException;
 import 
org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
@@ -47,8 +47,9 @@ import org.testng.Assert;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
-import static org.mockito.Matchers.*;
-import static org.mockito.Mockito.*;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.when;
 
 public class TestWagedRebalancer extends AbstractTestClusterModel {
   private Set<String> _instances;
@@ -63,9 +64,7 @@ public class TestWagedRebalancer extends 
AbstractTestClusterModel {
     _algorithm = new MockRebalanceAlgorithm();
 
     // Initialize a mock assignment metadata store
-    BucketDataAccessor mockAccessor = Mockito.mock(BucketDataAccessor.class);
-    String clusterName = ""; // an empty string for testing purposes
-    _metadataStore = new MockAssignmentMetadataStore(mockAccessor, 
clusterName);
+    _metadataStore = new MockAssignmentMetadataStore();
   }
 
   @Override
@@ -181,9 +180,9 @@ public class TestWagedRebalancer extends 
AbstractTestClusterModel {
         String resourceName = csEntry.getKey();
         CurrentState cs = csEntry.getValue();
         for (Map.Entry<String, String> partitionStateEntry : 
cs.getPartitionStateMap().entrySet()) {
-          currentStateOutput.setCurrentState(resourceName,
-              new Partition(partitionStateEntry.getKey()), instanceName,
-              partitionStateEntry.getValue());
+          currentStateOutput
+              .setCurrentState(resourceName, new 
Partition(partitionStateEntry.getKey()),
+                  instanceName, partitionStateEntry.getValue());
         }
       }
     }
@@ -216,7 +215,7 @@ public class TestWagedRebalancer extends 
AbstractTestClusterModel {
         "DROPPED");
   }
 
-  @Test(dependsOnMethods = "testRebalance")
+  @Test(dependsOnMethods = "testRebalance", expectedExceptions = 
HelixRebalanceException.class, expectedExceptionsMessageRegExp = "Input 
contains invalid resource\\(s\\) that cannot be rebalanced by the WAGED 
rebalancer. \\[Resource1\\] Failure Type: INVALID_INPUT")
   public void testNonCompatibleConfiguration() throws IOException, 
HelixRebalanceException {
     _metadataStore.clearMetadataStore();
     WagedRebalancer rebalancer = new WagedRebalancer(_metadataStore, 
_algorithm);
@@ -233,12 +232,7 @@ public class TestWagedRebalancer extends 
AbstractTestClusterModel {
               .forEach(partition -> resource.addPartition(partition));
           return resource;
         }));
-    Map<String, IdealState> newIdealStates =
-        rebalancer.computeNewIdealStates(clusterData, resourceMap, new 
CurrentStateOutput());
-    Map<String, ResourceAssignment> algorithmResult = 
_algorithm.getRebalanceResult();
-    // The output shall not contains the nonCompatibleResource.
-    resourceMap.remove(nonCompatibleResourceName);
-    validateRebalanceResult(resourceMap, newIdealStates, algorithmResult);
+    rebalancer.computeNewIdealStates(clusterData, resourceMap, new 
CurrentStateOutput());
   }
 
   // TODO test with invalid capacity configuration which will fail the cluster 
model constructing.
@@ -283,7 +277,7 @@ public class TestWagedRebalancer extends 
AbstractTestClusterModel {
       Assert.assertEquals(ex.getFailureType(),
           HelixRebalanceException.Type.INVALID_REBALANCER_STATUS);
       Assert.assertEquals(ex.getMessage(),
-          "Failed to get the persisted assignment records. Failure Type: 
INVALID_REBALANCER_STATUS");
+          "Failed to get the current baseline assignment because of unexpected 
error. Failure Type: INVALID_REBALANCER_STATUS");
     }
   }
 
@@ -425,8 +419,9 @@ public class TestWagedRebalancer extends 
AbstractTestClusterModel {
       Assert.assertTrue(newIdealStates.containsKey(resourceName));
       IdealState is = newIdealStates.get(resourceName);
       ResourceAssignment assignment = expectedResult.get(resourceName);
-      Assert.assertEquals(is.getPartitionSet(), new 
HashSet<>(assignment.getMappedPartitions()
-          .stream().map(partition -> 
partition.getPartitionName()).collect(Collectors.toSet())));
+      Assert.assertEquals(is.getPartitionSet(), new HashSet<>(
+          assignment.getMappedPartitions().stream().map(partition -> 
partition.getPartitionName())
+              .collect(Collectors.toSet())));
       for (String partitionName : is.getPartitionSet()) {
         Assert.assertEquals(is.getInstanceStateMap(partitionName),
             assignment.getReplicaMap(new Partition(partitionName)));
diff --git 
a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/MockRebalanceAlgorithm.java
 
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/MockRebalanceAlgorithm.java
index 2a39482..759c685 100644
--- 
a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/MockRebalanceAlgorithm.java
+++ 
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/MockRebalanceAlgorithm.java
@@ -72,7 +72,7 @@ public class MockRebalanceAlgorithm implements 
RebalanceAlgorithm {
 
     _resultHistory = result;
 
-    // TODO remove this mockito when 
OptimalAssignment.getOptimalResourceAssignment is ready.
+    // Mock the return value for supporting test.
     OptimalAssignment optimalAssignment = 
Mockito.mock(OptimalAssignment.class);
     when(optimalAssignment.getOptimalResourceAssignment()).thenReturn(result);
     return optimalAssignment;
diff --git 
a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java
 
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java
index 0f799b3..91db076 100644
--- 
a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java
+++ 
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java
@@ -103,8 +103,8 @@ public abstract class AbstractTestClusterModel {
     ClusterConfig testClusterConfig = new ClusterConfig("testClusterConfigId");
     testClusterConfig.setMaxPartitionsPerInstance(5);
     testClusterConfig.setDisabledInstances(Collections.emptyMap());
-    testClusterConfig.setTopologyAwareEnabled(false);
     testClusterConfig.setInstanceCapacityKeys(new 
ArrayList<>(_capacityDataMap.keySet()));
+    testClusterConfig.setTopologyAwareEnabled(true);
     when(testCache.getClusterConfig()).thenReturn(testClusterConfig);
 
     // 3. Mock the live instance node for the default instance.
diff --git 
a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestOptimalAssignment.java
 
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestOptimalAssignment.java
new file mode 100644
index 0000000..bd820a9
--- /dev/null
+++ 
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestOptimalAssignment.java
@@ -0,0 +1,91 @@
+package org.apache.helix.controller.rebalancer.waged.model;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Map;
+
+import org.apache.helix.HelixException;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.ResourceAssignment;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+/**
+ * Unit tests for OptimalAssignment: converting a ClusterModel into resource assignments and
+ * refusing to return a result once an assignment failure has been recorded.
+ */
+public class TestOptimalAssignment extends ClusterModelTestHelper {
+
+  @BeforeClass
+  public void initialize() {
+    super.initialize();
+  }
+
+  /**
+   * Checks that an untouched default cluster model yields an empty assignment map, and that
+   * replicas assigned on the model show up with the expected partitions and states.
+   */
+  @Test
+  public void testUpdateAssignment() throws IOException {
+    OptimalAssignment optimalAssignment = new OptimalAssignment();
+
+    // update with empty cluster model
+    optimalAssignment.updateAssignments(getDefaultClusterModel());
+    Map<String, ResourceAssignment> result = optimalAssignment.getOptimalResourceAssignment();
+    Assert.assertEquals(result, Collections.emptyMap());
+
+    // update with valid assignment
+    ClusterModel clusterModel = getDefaultClusterModel();
+    String resourceName = _resourceNames.get(0);
+    String masterPartition = _partitionNames.get(0);
+    String slavePartition = _partitionNames.get(1);
+    clusterModel.assign(resourceName, slavePartition, "SLAVE", _testInstanceId);
+    clusterModel.assign(resourceName, masterPartition, "MASTER", _testInstanceId);
+    optimalAssignment.updateAssignments(clusterModel);
+    result = optimalAssignment.getOptimalResourceAssignment();
+
+    Assert.assertEquals(result.get(resourceName).getMappedPartitions(),
+        Arrays.asList(new Partition(masterPartition), new Partition(slavePartition)));
+    Assert.assertEquals(result.get(resourceName).getReplicaMap(new Partition(slavePartition)),
+        Collections.singletonMap(_testInstanceId, "SLAVE"));
+    Assert.assertEquals(result.get(resourceName).getReplicaMap(new Partition(masterPartition)),
+        Collections.singletonMap(_testInstanceId, "MASTER"));
+  }
+
+  /**
+   * Checks that after recordAssignmentFailure is called, hasAnyFailure reports true and
+   * getOptimalResourceAssignment throws a HelixException with the expected message prefix.
+   */
+  @Test(dependsOnMethods = "testUpdateAssignment")
+  public void TestAssignmentFailure() throws IOException {
+    OptimalAssignment optimalAssignment = new OptimalAssignment();
+    ClusterModel clusterModel = getDefaultClusterModel();
+
+    // record failure
+    AssignableReplica failedReplica =
+        clusterModel.getAssignableReplicaMap().get(_resourceNames.get(0)).iterator().next();
+    AssignableNode failedNode = clusterModel.getAssignableNodes().get(_testInstanceId);
+    optimalAssignment.recordAssignmentFailure(failedReplica,
+        Collections.singletonMap(failedNode, Collections.singletonList("Assignment Failure!")));
+
+    Assert.assertTrue(optimalAssignment.hasAnyFailure());
+
+    optimalAssignment.updateAssignments(getDefaultClusterModel());
+    try {
+      optimalAssignment.getOptimalResourceAssignment();
+      Assert.fail("Get optimal assignment shall fail because of the failure record.");
+    } catch (HelixException ex) {
+      String expectedPrefix =
+          "Cannot get the optimal resource assignment since a calculation failure is recorded.";
+      Assert.assertTrue(ex.getMessage().startsWith(expectedPrefix));
+    }
+  }
+}
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java
 
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java
new file mode 100644
index 0000000..fb5375c
--- /dev/null
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java
@@ -0,0 +1,477 @@
+package org.apache.helix.integration.rebalancer.WagedRebalancer;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.TestHelper;
+import org.apache.helix.common.ZkTestBase;
+import 
org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy;
+import org.apache.helix.controller.rebalancer.strategy.CrushRebalanceStrategy;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.model.BuiltInStateModelDefinitions;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.tools.ClusterVerifiers.HelixClusterVerifier;
+import org.apache.helix.tools.ClusterVerifiers.StrictMatchExternalViewVerifier;
+import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class TestWagedRebalance extends ZkTestBase {
+  protected final int NUM_NODE = 6;
+  protected static final int START_PORT = 12918;
+  protected static final int PARTITIONS = 20;
+  protected static final int TAGS = 2;
+
+  protected final String CLASS_NAME = getShortClassName();
+  protected final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + CLASS_NAME;
+  protected ClusterControllerManager _controller;
+
+  List<MockParticipantManager> _participants = new ArrayList<>();
+  Map<String, String> _nodeToTagMap = new HashMap<>();
+  List<String> _nodes = new ArrayList<>();
+  private Set<String> _allDBs = new HashSet<>();
+  private int _replica = 3;
+
+  private static String[] _testModels = {
+      BuiltInStateModelDefinitions.OnlineOffline.name(),
+      BuiltInStateModelDefinitions.MasterSlave.name(),
+      BuiltInStateModelDefinitions.LeaderStandby.name()
+  };
+
+  /**
+   * Sets up the cluster for this class: adds NUM_NODE instance configs, starts one mock
+   * participant per registered node, starts the controller, and calls
+   * enablePersistBestPossibleAssignment with true.
+   */
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    System.out.println("START " + CLASS_NAME + " at " + new Date());
+
+    _gSetupTool.addCluster(CLUSTER_NAME, true);
+
+    int nodeIndex = 0;
+    while (nodeIndex < NUM_NODE) {
+      addInstanceConfig(PARTICIPANT_PREFIX + "_" + (START_PORT + nodeIndex), nodeIndex, TAGS);
+      nodeIndex++;
+    }
+
+    // start dummy participants
+    for (String instanceName : _nodes) {
+      MockParticipantManager mockParticipant =
+          new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName);
+      mockParticipant.syncStart();
+      _participants.add(mockParticipant);
+    }
+
+    // start controller
+    _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, CONTROLLER_PREFIX + "_0");
+    _controller.syncStart();
+
+    enablePersistBestPossibleAssignment(_gZkClient, CLUSTER_NAME, true);
+  }
+
+  /**
+   * Adds the instance to the cluster, tags it with "tag-" followed by (seqNo % tagCount), and
+   * records the node name and its tag in the local bookkeeping collections.
+   */
+  protected void addInstanceConfig(String storageNodeName, int seqNo, int tagCount) {
+    _gSetupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
+    final int tagIndex = seqNo % tagCount;
+    final String instanceTag = "tag-" + tagIndex;
+    _gSetupTool.getClusterManagementTool()
+        .addInstanceTag(CLUSTER_NAME, storageNodeName, instanceTag);
+    _nodeToTagMap.put(storageNodeName, instanceTag);
+    _nodes.add(storageNodeName);
+  }
+
+  /**
+   * Basic WAGED rebalance flow: creates one resource per tested state model, then adds one
+   * extra resource per state model and drops each of them again, validating the cluster after
+   * every step.
+   */
+  @Test
+  public void test() throws Exception {
+    int i = 0;
+    for (String stateModel : _testModels) {
+      String db = "Test-DB-" + i++;
+      createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, _replica,
+          _replica, -1);
+      _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica);
+      _allDBs.add(db);
+    }
+    Thread.sleep(300);
+
+    validate(_replica);
+
+    // Adding one more resource per state model
+    i = 0;
+    for (String stateModel : _testModels) {
+      String moreDB = "More-Test-DB-" + i++;
+      createResourceWithWagedRebalance(CLUSTER_NAME, moreDB, stateModel, PARTITIONS, _replica,
+          _replica, -1);
+      _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, moreDB, _replica);
+      _allDBs.add(moreDB);
+
+      Thread.sleep(300);
+
+      validate(_replica);
+    }
+
+    // Drop every additional resource. The index is advanced only by the loop header; the
+    // previous "j++" inside the name expression skipped every other resource.
+    for (int j = 0; j < _testModels.length; j++) {
+      String moreDB = "More-Test-DB-" + j;
+      _gSetupTool.dropResourceFromCluster(CLUSTER_NAME, moreDB);
+      _allDBs.remove(moreDB);
+
+      Thread.sleep(300);
+
+      validate(_replica);
+    }
+  }
+
+  /**
+   * Creates one MasterSlave resource per distinct instance tag, sets that tag as the resource's
+   * instance group tag, and validates the resulting placement.
+   */
+  @Test(dependsOnMethods = "test")
+  public void testWithInstanceTag() throws Exception {
+    Set<String> distinctTags = new HashSet<>(_nodeToTagMap.values());
+    int dbIndex = 3;
+    for (String tag : distinctTags) {
+      String db = "Test-DB-" + dbIndex;
+      dbIndex++;
+      createResourceWithWagedRebalance(CLUSTER_NAME, db,
+          BuiltInStateModelDefinitions.MasterSlave.name(), PARTITIONS, _replica, _replica, -1);
+      IdealState idealState =
+          _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
+      idealState.setInstanceGroupTag(tag);
+      _gSetupTool.getClusterManagementTool()
+          .setResourceIdealState(CLUSTER_NAME, db, idealState);
+      _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica);
+      _allDBs.add(db);
+    }
+    Thread.sleep(300);
+    validate(_replica);
+  }
+
+  /**
+   * Validates the cluster after the resource's replica count is reduced by one and again after
+   * its partition count is increased by one.
+   */
+  @Test(dependsOnMethods = "test")
+  public void testChangeIdealState() throws InterruptedException {
+    final String dbName = "Test-DB";
+    createResourceWithWagedRebalance(CLUSTER_NAME, dbName,
+        BuiltInStateModelDefinitions.MasterSlave.name(), PARTITIONS, _replica, _replica, -1);
+    _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, dbName, _replica);
+    _allDBs.add(dbName);
+    Thread.sleep(300);
+
+    validate(_replica);
+
+    // Adjust the replica count
+    IdealState idealState =
+        _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, dbName);
+    int reducedReplica = _replica - 1;
+    idealState.setReplicas(String.valueOf(reducedReplica));
+    _gSetupTool.getClusterManagementTool()
+        .setResourceIdealState(CLUSTER_NAME, dbName, idealState);
+    Thread.sleep(300);
+
+    validate(reducedReplica);
+
+    // Adjust the partition list
+    idealState =
+        _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, dbName);
+    idealState.setNumPartitions(PARTITIONS + 1);
+    _gSetupTool.getClusterManagementTool()
+        .setResourceIdealState(CLUSTER_NAME, dbName, idealState);
+    _gSetupTool.getClusterManagementTool().rebalance(CLUSTER_NAME, dbName, reducedReplica);
+    Thread.sleep(300);
+
+    validate(reducedReplica);
+    ExternalView externalView =
+        _gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, dbName);
+    Assert.assertEquals(externalView.getPartitionSet().size(), PARTITIONS + 1);
+  }
+
+  /**
+   * Disables every participant from index 3 onwards and checks that the external view of the
+   * test resource holds no replica on a disabled instance. The instance configs are re-enabled
+   * in the finally block.
+   */
+  @Test(dependsOnMethods = "test")
+  public void testDisableInstance() throws InterruptedException {
+    final String dbName = "Test-DB";
+    createResourceWithWagedRebalance(CLUSTER_NAME, dbName,
+        BuiltInStateModelDefinitions.MasterSlave.name(), PARTITIONS, _replica, _replica, -1);
+    _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, dbName, _replica);
+    _allDBs.add(dbName);
+    Thread.sleep(300);
+
+    validate(_replica);
+
+    // Disable participants, keep only three left
+    Set<String> disabledInstances = new HashSet<>();
+
+    try {
+      for (int idx = 3; idx < _participants.size(); idx++) {
+        String instanceName = _participants.get(idx).getInstanceName();
+        // Remember the instance before touching its config so that it is always recovered.
+        disabledInstances.add(instanceName);
+        InstanceConfig instanceConfig =
+            _gSetupTool.getClusterManagementTool().getInstanceConfig(CLUSTER_NAME, instanceName);
+        instanceConfig.setInstanceEnabled(false);
+        _gSetupTool.getClusterManagementTool()
+            .setInstanceConfig(CLUSTER_NAME, instanceName, instanceConfig);
+      }
+      Thread.sleep(300);
+
+      validate(_replica);
+
+      // Verify there is no assignment on the disabled participants.
+      ExternalView externalView =
+          _gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, dbName);
+      for (String partition : externalView.getPartitionSet()) {
+        for (String instance : externalView.getStateMap(partition).keySet()) {
+          Assert.assertFalse(disabledInstances.contains(instance));
+        }
+      }
+    } finally {
+      // recover the config
+      for (String instanceName : disabledInstances) {
+        InstanceConfig instanceConfig =
+            _gSetupTool.getClusterManagementTool().getInstanceConfig(CLUSTER_NAME, instanceName);
+        instanceConfig.setInstanceEnabled(true);
+        _gSetupTool.getClusterManagementTool()
+            .setInstanceConfig(CLUSTER_NAME, instanceName, instanceConfig);
+      }
+    }
+  }
+
+  /**
+   * Stops all participants except the first two, expects two replicas per partition, then
+   * restarts participants under the same instance names and expects the full replica count.
+   */
+  @Test(dependsOnMethods = "testDisableInstance")
+  public void testLackEnoughLiveInstances() throws Exception {
+    // shutdown participants, keep only two left
+    for (int idx = 2; idx < _participants.size(); idx++) {
+      _participants.get(idx).syncStop();
+    }
+
+    int dbIndex = 0;
+    for (String stateModel : _testModels) {
+      String db = "Test-DB-" + dbIndex;
+      dbIndex++;
+      createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, _replica,
+          _replica, -1);
+      _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica);
+      _allDBs.add(db);
+    }
+
+    Thread.sleep(300);
+    // Verify if the partitions get assigned
+    validate(2);
+
+    // restart the participants within the zone
+    for (int idx = 2; idx < _participants.size(); idx++) {
+      String instanceName = _participants.get(idx).getInstanceName();
+      MockParticipantManager restarted =
+          new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName);
+      _participants.set(idx, restarted);
+      restarted.syncStart();
+    }
+
+    Thread.sleep(300);
+    // Verify if the partitions get assigned
+    validate(_replica);
+  }
+
+  /**
+   * Stops, disables and drops all participants except the first two, expects two replicas per
+   * partition, then registers and starts replacement instances and expects the full replica
+   * count.
+   */
+  @Test(dependsOnMethods = "testDisableInstance")
+  public void testLackEnoughInstances() throws Exception {
+    // shutdown participants, keep only two left
+    for (int idx = 2; idx < _participants.size(); idx++) {
+      MockParticipantManager participant = _participants.get(idx);
+      participant.syncStop();
+      _gSetupTool.getClusterManagementTool()
+          .enableInstance(CLUSTER_NAME, participant.getInstanceName(), false);
+      _gSetupTool.dropInstanceFromCluster(CLUSTER_NAME, participant.getInstanceName());
+    }
+
+    int dbIndex = 0;
+    for (String stateModel : _testModels) {
+      String db = "Test-DB-" + dbIndex;
+      dbIndex++;
+      createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, _replica,
+          _replica, -1);
+      _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica);
+      _allDBs.add(db);
+    }
+
+    Thread.sleep(300);
+    // Verify if the partitions get assigned
+    validate(2);
+
+    // Create new participants within the zone
+    for (int idx = 2; idx < _participants.size(); idx++) {
+      String replacementName =
+          _participants.get(idx).getInstanceName() + "-replacement_" + START_PORT;
+      addInstanceConfig(replacementName, idx, TAGS);
+      MockParticipantManager replacement =
+          new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, replacementName);
+      _participants.set(idx, replacement);
+      replacement.syncStart();
+    }
+
+    Thread.sleep(300);
+    // Verify if the partitions get assigned
+    validate(_replica);
+  }
+
+  /**
+   * Runs three resources side by side, each managed differently: the first with
+   * CrushRebalanceStrategy, the second with CrushEdRebalanceStrategy, and the third with the
+   * WAGED rebalancer. All of them must converge.
+   */
+  @Test(dependsOnMethods = "test")
+  public void testMixedRebalancerUsage() throws InterruptedException {
+    // Branch on the un-incremented index. The previous code post-incremented the counter
+    // before comparing it with 0, which made the CrushRebalanceStrategy branch unreachable.
+    for (int i = 0; i < _testModels.length; i++) {
+      String stateModel = _testModels[i];
+      String db = "Test-DB-" + i;
+      if (i == 0) {
+        _gSetupTool.addResourceToCluster(CLUSTER_NAME, db, PARTITIONS, stateModel,
+            IdealState.RebalanceMode.FULL_AUTO + "", CrushRebalanceStrategy.class.getName());
+      } else if (i == 1) {
+        _gSetupTool.addResourceToCluster(CLUSTER_NAME, db, PARTITIONS, stateModel,
+            IdealState.RebalanceMode.FULL_AUTO + "", CrushEdRebalanceStrategy.class.getName());
+      } else {
+        createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, _replica,
+            _replica, -1);
+      }
+      _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica);
+      _allDBs.add(db);
+    }
+    Thread.sleep(300);
+
+    validate(_replica);
+  }
+
+  /**
+   * Exercises the max-partitions-per-instance limit at the cluster level and at the resource
+   * level: first nothing may be fully assigned, then the resource carrying its own limit must
+   * stay unassigned, and finally everything must validate once both limits are lifted.
+   */
+  @Test(dependsOnMethods = "test")
+  public void testMaxPartitionLimitation() throws Exception {
+    ConfigAccessor configAccessor = new ConfigAccessor(_gZkClient);
+    ClusterConfig clusterConfig = configAccessor.getClusterConfig(CLUSTER_NAME);
+    // Change the cluster level config so no assignment can be done
+    clusterConfig.setMaxPartitionsPerInstance(1);
+    configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig);
+    try {
+      String limitedResourceName = null;
+      int dbIndex = 0;
+      for (String stateModel : _testModels) {
+        String db = "Test-DB-" + dbIndex;
+        boolean firstResource = dbIndex == 0;
+        dbIndex++;
+        createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, _replica,
+            _replica, -1);
+        if (firstResource) {
+          // The limited resource has additional limitation, so even the other resources can be
+          // assigned later, this resource will still be blocked by the max partition limitation.
+          limitedResourceName = db;
+          IdealState limitedIdealState =
+              _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
+          limitedIdealState.setMaxPartitionsPerInstance(1);
+          _gSetupTool.getClusterManagementTool()
+              .setResourceIdealState(CLUSTER_NAME, db, limitedIdealState);
+        }
+        _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica);
+        _allDBs.add(db);
+      }
+      Thread.sleep(300);
+
+      // Since the WAGED rebalancer does not do partial rebalance, the initial assignment won't
+      // show.
+      boolean allResourcesAssigned = TestHelper.verify(() -> {
+        for (String db : _allDBs) {
+          ExternalView ev =
+              _gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db);
+          if (ev == null || ev.getPartitionSet().isEmpty()) {
+            return false;
+          }
+        }
+        return true;
+      }, 2000);
+      Assert.assertFalse(allResourcesAssigned);
+
+      // Remove the cluster level limitation
+      clusterConfig.setMaxPartitionsPerInstance(-1);
+      configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig);
+      Thread.sleep(300);
+
+      // wait until any of the resources is rebalanced (the outcome of the wait is not asserted)
+      TestHelper.verify(() -> _allDBs.stream().anyMatch(db -> {
+        ExternalView ev =
+            _gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db);
+        return ev != null && !ev.getPartitionSet().isEmpty();
+      }), 3000);
+      ExternalView limitedView = _gSetupTool.getClusterManagementTool()
+          .getResourceExternalView(CLUSTER_NAME, limitedResourceName);
+      Assert.assertTrue(limitedView == null || limitedView.getPartitionSet().isEmpty());
+
+      // Remove the resource level limitation
+      IdealState unlimitedIdealState = _gSetupTool.getClusterManagementTool()
+          .getResourceIdealState(CLUSTER_NAME, limitedResourceName);
+      unlimitedIdealState.setMaxPartitionsPerInstance(Integer.MAX_VALUE);
+      _gSetupTool.getClusterManagementTool()
+          .setResourceIdealState(CLUSTER_NAME, limitedResourceName, unlimitedIdealState);
+
+      validate(_replica);
+    } finally {
+      // recover the config change
+      clusterConfig = configAccessor.getClusterConfig(CLUSTER_NAME);
+      clusterConfig.setMaxPartitionsPerInstance(-1);
+      configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig);
+    }
+  }
+
+  /**
+   * Asserts that a StrictMatchExternalViewVerifier over all tracked resources passes within
+   * 5000 ms, then runs validateIsolation on every tracked resource.
+   *
+   * @param expectedReplica the number of instances each partition is expected to be placed on
+   */
+  private void validate(int expectedReplica) {
+    StrictMatchExternalViewVerifier.Builder verifierBuilder =
+        new StrictMatchExternalViewVerifier.Builder(CLUSTER_NAME);
+    HelixClusterVerifier verifier =
+        verifierBuilder.setZkAddr(ZK_ADDR).setResources(_allDBs).build();
+    Assert.assertTrue(verifier.verify(5000));
+    for (String db : _allDBs) {
+      IdealState idealState =
+          _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
+      ExternalView externalView =
+          _gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db);
+      validateIsolation(idealState, externalView, expectedReplica);
+    }
+  }
+
+  /**
+   * Validates that every partition is placed on the expected number of distinct instances and,
+   * when the resource has an instance group tag, that each of those instances carries the tag.
+   */
+  private void validateIsolation(IdealState is, ExternalView ev, int expectedReplica) {
+    final String groupTag = is.getInstanceGroupTag();
+    for (String partition : is.getPartitionSet()) {
+      Set<String> hostingInstances = ev.getRecord().getMapField(partition).keySet();
+      Assert.assertEquals(hostingInstances.size(), expectedReplica);
+      if (groupTag == null) {
+        // No tag restriction on this resource, nothing more to check for the partition.
+        continue;
+      }
+      for (String instance : hostingInstances) {
+        InstanceConfig instanceConfig =
+            _gSetupTool.getClusterManagementTool().getInstanceConfig(CLUSTER_NAME, instance);
+        Assert.assertTrue(instanceConfig.containsTag(groupTag));
+      }
+    }
+  }
+
+  /**
+   * Drops every resource tracked by the test method that just ran, clears the tracking set, and
+   * asserts that the cluster verifier passes afterwards.
+   */
+  @AfterMethod
+  public void afterMethod() throws Exception {
+    for (String trackedDb : _allDBs) {
+      _gSetupTool.dropResourceFromCluster(CLUSTER_NAME, trackedDb);
+    }
+    _allDBs.clear();
+    // waiting for all DB be dropped.
+    Thread.sleep(100);
+    StrictMatchExternalViewVerifier.Builder verifierBuilder =
+        new StrictMatchExternalViewVerifier.Builder(CLUSTER_NAME);
+    // The tracking set is already empty at this point.
+    ZkHelixClusterVerifier verifier =
+        verifierBuilder.setZkAddr(ZK_ADDR).setResources(_allDBs).build();
+    Assert.assertTrue(verifier.verifyByPolling());
+  }
+
+  /**
+   * Stops the controller and every still-connected participant, then deletes the test cluster.
+   */
+  @AfterClass
+  public void afterClass() throws Exception {
+    boolean controllerConnected = _controller != null && _controller.isConnected();
+    if (controllerConnected) {
+      _controller.syncStop();
+    }
+    for (MockParticipantManager participant : _participants) {
+      if (participant == null || !participant.isConnected()) {
+        continue;
+      }
+      participant.syncStop();
+    }
+    deleteCluster(CLUSTER_NAME);
+  }
+}
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalanceFaultZone.java
 
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalanceFaultZone.java
new file mode 100644
index 0000000..0b020db
--- /dev/null
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalanceFaultZone.java
@@ -0,0 +1,374 @@
+package org.apache.helix.integration.rebalancer.WagedRebalancer;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.common.ZkTestBase;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.model.BuiltInStateModelDefinitions;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.tools.ClusterVerifiers.StrictMatchExternalViewVerifier;
+import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class TestWagedRebalanceFaultZone extends ZkTestBase {
+  protected final int NUM_NODE = 6;
+  protected static final int START_PORT = 12918;
+  protected static final int PARTITIONS = 20;
+  protected static final int ZONES = 3;
+  protected static final int TAGS = 2;
+
+  protected final String CLASS_NAME = getShortClassName();
+  protected final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + CLASS_NAME;
+  protected ClusterControllerManager _controller;
+
+  List<MockParticipantManager> _participants = new ArrayList<>();
+  Map<String, String> _nodeToZoneMap = new HashMap<>();
+  Map<String, String> _nodeToTagMap = new HashMap<>();
+  List<String> _nodes = new ArrayList<>();
+  Set<String> _allDBs = new HashSet<>();
+  int _replica = 3;
+
+  String[] _testModels = {
+      BuiltInStateModelDefinitions.OnlineOffline.name(),
+      BuiltInStateModelDefinitions.MasterSlave.name(),
+      BuiltInStateModelDefinitions.LeaderStandby.name()
+  };
+
+  /**
+   * Creates the test cluster, registers NUM_NODE instances (zone and tag derived from the
+   * instance sequence number), starts one mock participant per registered node and a
+   * controller, then calls enablePersistBestPossibleAssignment and
+   * enableTopologyAwareRebalance with {@code true}.
+   */
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    System.out.println("START " + CLASS_NAME + " at " + new Date());
+
+    _gSetupTool.addCluster(CLUSTER_NAME, true);
+
+    for (int seqNo = 0; seqNo < NUM_NODE; seqNo++) {
+      addInstanceConfig(PARTICIPANT_PREFIX + "_" + (START_PORT + seqNo), seqNo, ZONES, TAGS);
+    }
+
+    // start dummy participants
+    for (String instanceName : _nodes) {
+      MockParticipantManager mockParticipant =
+          new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName);
+      mockParticipant.syncStart();
+      _participants.add(mockParticipant);
+    }
+
+    // start controller
+    _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, CONTROLLER_PREFIX + "_0");
+    _controller.syncStart();
+
+    enablePersistBestPossibleAssignment(_gZkClient, CLUSTER_NAME, true);
+    enableTopologyAwareRebalance(_gZkClient, CLUSTER_NAME, true);
+  }
+
+  protected void addInstanceConfig(String storageNodeName, int seqNo, int 
zoneCount, int tagCount) {
+    _gSetupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
+    String zone = "zone-" + seqNo % zoneCount;
+    String tag = "tag-" + seqNo % tagCount;
+    _gSetupTool.getClusterManagementTool().setInstanceZoneId(CLUSTER_NAME, 
storageNodeName, zone);
+    _gSetupTool.getClusterManagementTool().addInstanceTag(CLUSTER_NAME, 
storageNodeName, tag);
+    _nodeToZoneMap.put(storageNodeName, zone);
+    _nodeToTagMap.put(storageNodeName, tag);
+    _nodes.add(storageNodeName);
+  }
+
+  /**
+   * Creates one WAGED resource per state model in _testModels and validates the resulting
+   * placement with the configured replica count.
+   */
+  @Test
+  public void testZoneIsolation() throws Exception {
+    for (int idx = 0; idx < _testModels.length; idx++) {
+      String resourceName = "Test-DB-" + idx;
+      createResourceWithWagedRebalance(CLUSTER_NAME, resourceName, _testModels[idx], PARTITIONS,
+          _replica, _replica, -1);
+      _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, resourceName, _replica);
+      _allDBs.add(resourceName);
+    }
+    Thread.sleep(300);
+
+    validate(_replica);
+  }
+
+  /**
+   * Creates one MasterSlave WAGED resource per distinct instance tag, pins each resource to
+   * its tag through the IdealState, and validates the resulting placement.
+   */
+  @Test
+  public void testZoneIsolationWithInstanceTag() throws Exception {
+    Set<String> distinctTags = new HashSet<>(_nodeToTagMap.values());
+    int dbIndex = 0;
+    for (String groupTag : distinctTags) {
+      String resourceName = "Test-DB-" + dbIndex;
+      dbIndex++;
+      createResourceWithWagedRebalance(CLUSTER_NAME, resourceName,
+          BuiltInStateModelDefinitions.MasterSlave.name(), PARTITIONS, _replica, _replica, -1);
+      // Read the IdealState back, set the instance group tag, and write it again.
+      IdealState idealState =
+          _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, resourceName);
+      idealState.setInstanceGroupTag(groupTag);
+      _gSetupTool.getClusterManagementTool()
+          .setResourceIdealState(CLUSTER_NAME, resourceName, idealState);
+      _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, resourceName, _replica);
+      _allDBs.add(resourceName);
+    }
+    Thread.sleep(300);
+
+    validate(_replica);
+  }
+
+  /**
+   * Stops every participant of one zone, creates the test resources and expects each
+   * partition to span 2 zones; then starts fresh participants under the same instance names
+   * and expects the full replica count again.
+   */
+  @Test(dependsOnMethods = { "testZoneIsolation", "testZoneIsolationWithInstanceTag" })
+  public void testLackEnoughLiveRacks() throws Exception {
+    // shutdown participants within one zone
+    final String downZone = _nodeToZoneMap.values().iterator().next();
+    for (MockParticipantManager participant : _participants) {
+      if (_nodeToZoneMap.get(participant.getInstanceName()).equals(downZone)) {
+        participant.syncStop();
+      }
+    }
+
+    for (int idx = 0; idx < _testModels.length; idx++) {
+      String resourceName = "Test-DB-" + idx;
+      createResourceWithWagedRebalance(CLUSTER_NAME, resourceName, _testModels[idx], PARTITIONS,
+          _replica, _replica, -1);
+      _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, resourceName, _replica);
+      _allDBs.add(resourceName);
+    }
+    Thread.sleep(300);
+    validate(2);
+
+    // restart the participants within the zone
+    for (int pos = 0; pos < _participants.size(); pos++) {
+      String instanceName = _participants.get(pos).getInstanceName();
+      if (!_nodeToZoneMap.get(instanceName).equals(downZone)) {
+        continue;
+      }
+      // A stopped manager is replaced by a new one registered under the same instance name.
+      MockParticipantManager restarted =
+          new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName);
+      _participants.set(pos, restarted);
+      restarted.syncStart();
+    }
+
+    Thread.sleep(300);
+    // Verify if the partitions get assigned
+    validate(_replica);
+  }
+
+  /**
+   * Stops, disables and drops every instance of one zone, creates the test resources and
+   * expects each partition to span 2 zones; then registers replacement instances derived from
+   * the same sequence numbers and expects the full replica count again.
+   */
+  @Test(dependsOnMethods = { "testLackEnoughLiveRacks" })
+  public void testLackEnoughRacks() throws Exception {
+    // shutdown participants within one zone
+    final String removedZone = _nodeToZoneMap.values().iterator().next();
+    for (MockParticipantManager participant : _participants) {
+      String instanceName = participant.getInstanceName();
+      if (!_nodeToZoneMap.get(instanceName).equals(removedZone)) {
+        continue;
+      }
+      participant.syncStop();
+      _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, instanceName, false);
+      Thread.sleep(50);
+      _gSetupTool.dropInstanceFromCluster(CLUSTER_NAME, instanceName);
+    }
+
+    for (int idx = 0; idx < _testModels.length; idx++) {
+      String resourceName = "Test-DB-" + idx;
+      createResourceWithWagedRebalance(CLUSTER_NAME, resourceName, _testModels[idx], PARTITIONS,
+          _replica, _replica, -1);
+      _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, resourceName, _replica);
+      _allDBs.add(resourceName);
+    }
+    Thread.sleep(300);
+    validate(2);
+
+    // Create new participants within the zone
+    final int participantCount = _participants.size();
+    for (int pos = 0; pos < participantCount; pos++) {
+      String droppedName = _participants.get(pos).getInstanceName();
+      if (!_nodeToZoneMap.get(droppedName).equals(removedZone)) {
+        continue;
+      }
+      String replacementName = droppedName + "-replacement_" + START_PORT;
+      // The list position is reused as the sequence number for the replacement instance.
+      addInstanceConfig(replacementName, pos, ZONES, TAGS);
+      MockParticipantManager replacement =
+          new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, replacementName);
+      _participants.set(pos, replacement);
+      replacement.syncStart();
+    }
+
+    Thread.sleep(300);
+    // Verify if the partitions get assigned
+    validate(_replica);
+  }
+
+  /**
+   * Creates the test resources, then adds two nodes in a zone that did not exist before and
+   * checks that (a) zone/tag isolation still holds and (b) every new node hosts at least one
+   * replica. The global rebalance preference is switched to EVENNESS=10 / LESS_MOVEMENT=0 for
+   * the duration of the test and set to 1 / 1 afterwards; the new nodes are stopped in the
+   * finally block.
+   */
+  @Test(dependsOnMethods = { "testZoneIsolation", "testZoneIsolationWithInstanceTag" })
+  public void testAddZone() throws Exception {
+    int i = 0;
+    for (String stateModel : _testModels) {
+      String db = "Test-DB-" + i++;
+      createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, PARTITIONS, _replica,
+          _replica, -1);
+      _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica);
+      _allDBs.add(db);
+    }
+    Thread.sleep(300);
+
+    validate(_replica);
+
+    // Create new participants within the a new zone
+    Set<MockParticipantManager> newNodes = new HashSet<>();
+    Map<String, Integer> newNodeReplicaCount = new HashMap<>();
+
+    ConfigAccessor configAccessor = new ConfigAccessor(_gZkClient);
+
+    try {
+      // Configure the preference so as to allow movements.
+      ClusterConfig clusterConfig = configAccessor.getClusterConfig(CLUSTER_NAME);
+      Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> preference = new HashMap<>();
+      preference.put(ClusterConfig.GlobalRebalancePreferenceKey.EVENNESS, 10);
+      preference.put(ClusterConfig.GlobalRebalancePreferenceKey.LESS_MOVEMENT, 0);
+      clusterConfig.setGlobalRebalancePreference(preference);
+      configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig);
+
+      int nodeCount = 2;
+      for (int j = 0; j < nodeCount; j++) {
+        String newNodeName = "new-zone-node-" + j + "_" + START_PORT;
+        // Add all new nodes to the new zone: seqNo = ZONES with zoneCount = ZONES + 1 yields
+        // "zone-<ZONES>", which no node registered with zoneCount = ZONES can be in. Passing j
+        // as the seqNo would place the nodes into the already existing zone-0 / zone-1.
+        addInstanceConfig(newNodeName, ZONES, ZONES + 1, TAGS);
+        MockParticipantManager newNode =
+            new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, newNodeName);
+        newNode.syncStart();
+        newNodes.add(newNode);
+        newNodeReplicaCount.put(newNodeName, 0);
+      }
+      Thread.sleep(300);
+
+      validate(_replica);
+
+      // The new zone nodes shall have some assignments
+      for (String db : _allDBs) {
+        IdealState is =
+            _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
+        ExternalView ev =
+            _gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db);
+        validateZoneAndTagIsolation(is, ev, _replica);
+        for (String partition : ev.getPartitionSet()) {
+          Map<String, String> stateMap = ev.getStateMap(partition);
+          for (String node : stateMap.keySet()) {
+            // Only the new nodes are keys of the counter map; other nodes are left untouched.
+            newNodeReplicaCount.computeIfPresent(node,
+                (nodeName, replicaCount) -> replicaCount + 1);
+          }
+        }
+      }
+      Assert.assertTrue(newNodeReplicaCount.values().stream().allMatch(count -> count > 0));
+    } finally {
+      // Revert the preference
+      ClusterConfig clusterConfig = configAccessor.getClusterConfig(CLUSTER_NAME);
+      Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> preference = new HashMap<>();
+      preference.put(ClusterConfig.GlobalRebalancePreferenceKey.EVENNESS, 1);
+      preference.put(ClusterConfig.GlobalRebalancePreferenceKey.LESS_MOVEMENT, 1);
+      clusterConfig.setGlobalRebalancePreference(preference);
+      configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig);
+      // Stop the new nodes
+      for (MockParticipantManager p : newNodes) {
+        if (p != null && p.isConnected()) {
+          p.syncStop();
+        }
+      }
+    }
+  }
+
+  /**
+   * Polls until the strict-match ExternalView verifier passes for all registered resources,
+   * then checks zone and tag isolation for each of them.
+   *
+   * @param expectedReplica expected number of distinct zones per partition
+   */
+  private void validate(int expectedReplica) {
+    ZkHelixClusterVerifier verifier = new StrictMatchExternalViewVerifier.Builder(CLUSTER_NAME)
+        .setZkAddr(ZK_ADDR).setResources(_allDBs).build();
+    Assert.assertTrue(verifier.verifyByPolling());
+
+    for (String resourceName : _allDBs) {
+      IdealState idealState =
+          _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, resourceName);
+      ExternalView externalView = _gSetupTool.getClusterManagementTool()
+          .getResourceExternalView(CLUSTER_NAME, resourceName);
+      validateZoneAndTagIsolation(idealState, externalView, expectedReplica);
+    }
+  }
+
+  /**
+   * Validate instances for each partition is on different zone and with 
necessary tagged instances.
+   */
+  private void validateZoneAndTagIsolation(IdealState is, ExternalView ev, int 
expectedReplica) {
+    String tag = is.getInstanceGroupTag();
+    for (String partition : is.getPartitionSet()) {
+      Set<String> assignedZones = new HashSet<String>();
+
+      Map<String, String> assignmentMap = 
ev.getRecord().getMapField(partition);
+      Set<String> instancesInEV = assignmentMap.keySet();
+      // TODO: preference List is not persisted in IS.
+      // Assert.assertEquals(instancesInEV, instancesInIs);
+      for (String instance : instancesInEV) {
+        assignedZones.add(_nodeToZoneMap.get(instance));
+        if (tag != null) {
+          InstanceConfig config =
+              
_gSetupTool.getClusterManagementTool().getInstanceConfig(CLUSTER_NAME, 
instance);
+          Assert.assertTrue(config.containsTag(tag));
+        }
+      }
+      Assert.assertEquals(assignedZones.size(), expectedReplica);
+    }
+  }
+
+  /**
+   * Drops all resources created by the finished test method and waits for the verifier to
+   * pass.
+   */
+  @AfterMethod
+  public void afterMethod() throws Exception {
+    for (String resourceName : _allDBs) {
+      _gSetupTool.dropResourceFromCluster(CLUSTER_NAME, resourceName);
+    }
+    _allDBs.clear();
+    // waiting for all DB be dropped.
+    Thread.sleep(100);
+    // _allDBs was cleared above, so the verifier is built with an empty resource set.
+    ZkHelixClusterVerifier verifier = new StrictMatchExternalViewVerifier.Builder(CLUSTER_NAME)
+        .setZkAddr(ZK_ADDR).setResources(_allDBs).build();
+    Assert.assertTrue(verifier.verifyByPolling());
+  }
+
+  @AfterClass
+  public void afterClass() throws Exception {
+    /*
+     * shutdown order: 1) disconnect the controller 2) disconnect participants
+     */
+    if (_controller != null && _controller.isConnected()) {
+      _controller.syncStop();
+    }
+    for (MockParticipantManager p : _participants) {
+      if (p != null && p.isConnected()) {
+        p.syncStop();
+      }
+    }
+    deleteCluster(CLUSTER_NAME);
+    System.out.println("END " + CLASS_NAME + " at " + new 
Date(System.currentTimeMillis()));
+  }
+}
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalanceTopologyAware.java
 
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalanceTopologyAware.java
new file mode 100644
index 0000000..412fc8c
--- /dev/null
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalanceTopologyAware.java
@@ -0,0 +1,114 @@
+package org.apache.helix.integration.rebalancer.WagedRebalancer;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Date;
+
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.InstanceConfig;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+/**
+ * Runs the tests of {@link TestWagedRebalanceFaultZone} against a cluster whose config sets
+ * the topology "/DOMAIN/ZONE/INSTANCE" with fault zone type "ZONE", and whose instances
+ * declare their zone through the instance domain string instead of a zone id.
+ */
+public class TestWagedRebalanceTopologyAware extends TestWagedRebalanceFaultZone {
+  private static final String TOPOLOGY_DEF = "/DOMAIN/ZONE/INSTANCE";
+  private static final String DOMAIN_NAME = "Domain";
+  private static final String FAULT_ZONE = "ZONE";
+
+  // These two fields hide the same-named fields of the parent class; methods inherited from
+  // the parent keep reading the parent's copies.
+  // NOTE(review): both copies are presumably equal because getShortClassName() is expected to
+  // resolve the runtime class - confirm before removing the duplicates.
+  protected final String CLASS_NAME = getShortClassName();
+  protected final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + CLASS_NAME;
+
+  /**
+   * Same setup as the parent class, except that the topology definition and the fault zone
+   * type are written to the cluster config before any instance is added.
+   */
+  @Override
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    System.out.println("START " + CLASS_NAME + " at " + new Date());
+
+    _gSetupTool.addCluster(CLUSTER_NAME, true);
+
+    ConfigAccessor configAccessor = new ConfigAccessor(_gZkClient);
+    ClusterConfig clusterConfig = configAccessor.getClusterConfig(CLUSTER_NAME);
+    clusterConfig.setTopology(TOPOLOGY_DEF);
+    clusterConfig.setFaultZoneType(FAULT_ZONE);
+    configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig);
+
+    for (int i = 0; i < NUM_NODE; i++) {
+      String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+      addInstanceConfig(storageNodeName, i, ZONES, TAGS);
+    }
+
+    // start dummy participants
+    for (String node : _nodes) {
+      MockParticipantManager participant = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, node);
+      participant.syncStart();
+      _participants.add(participant);
+    }
+
+    // start controller
+    String controllerName = CONTROLLER_PREFIX + "_0";
+    _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
+    _controller.syncStart();
+
+    enablePersistBestPossibleAssignment(_gZkClient, CLUSTER_NAME, true);
+    enableTopologyAwareRebalance(_gZkClient, CLUSTER_NAME, true);
+  }
+
+  /**
+   * Adds an instance whose zone is expressed through its domain string
+   * ("DOMAIN=...,ZONE=...,INSTANCE=...") rather than a zone id, and tags it.
+   * The zone is "zone-" + (seqNo % zoneCount) and the tag is "tag-" + (seqNo % tagCount).
+   */
+  @Override
+  protected void addInstanceConfig(String storageNodeName, int seqNo, int zoneCount,
+      int tagCount) {
+    _gSetupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
+    String zone = "zone-" + seqNo % zoneCount;
+    String tag = "tag-" + seqNo % tagCount;
+
+    InstanceConfig config =
+        _gSetupTool.getClusterManagementTool().getInstanceConfig(CLUSTER_NAME, storageNodeName);
+    config.setDomain(
+        String.format("DOMAIN=%s,ZONE=%s,INSTANCE=%s", DOMAIN_NAME, zone, storageNodeName));
+    config.addTag(tag);
+    _gSetupTool.getClusterManagementTool()
+        .setInstanceConfig(CLUSTER_NAME, storageNodeName, config);
+
+    _nodeToZoneMap.put(storageNodeName, zone);
+    _nodeToTagMap.put(storageNodeName, tag);
+    _nodes.add(storageNodeName);
+  }
+
+  // The test methods below only delegate to the parent implementation.
+  // NOTE(review): presumably they are re-declared so that the TestNG annotations (including
+  // dependsOnMethods) are present on this class - confirm.
+
+  @Override
+  @Test
+  public void testZoneIsolation() throws Exception {
+    super.testZoneIsolation();
+  }
+
+  @Override
+  @Test
+  public void testZoneIsolationWithInstanceTag() throws Exception {
+    super.testZoneIsolationWithInstanceTag();
+  }
+
+  @Override
+  @Test(dependsOnMethods = { "testZoneIsolation", "testZoneIsolationWithInstanceTag" })
+  public void testLackEnoughLiveRacks() throws Exception {
+    super.testLackEnoughLiveRacks();
+  }
+
+  @Override
+  @Test(dependsOnMethods = { "testLackEnoughLiveRacks" })
+  public void testLackEnoughRacks() throws Exception {
+    super.testLackEnoughRacks();
+  }
+
+  @Override
+  @Test(dependsOnMethods = { "testZoneIsolation", "testZoneIsolationWithInstanceTag" })
+  public void testAddZone() throws Exception {
+    super.testAddZone();
+  }
+}

Reply via email to