Implementation of the CRUSH-ed algorithm.

The algorithm is based on CRUSH. The new implementation trades off between 
uniform distribution and partition movements during major cluster changes.
Please refer to 
https://iwww.corp.linkedin.com/wiki/cf/display/ENGS/CRUSH-ed+for+even+distribution


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/9dfb098e
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/9dfb098e
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/9dfb098e

Branch: refs/heads/master
Commit: 9dfb098ed6083609f6bbd17cd202b65ce26e8c7c
Parents: fde1a6a
Author: Jiajun Wang <[email protected]>
Authored: Mon Nov 20 14:06:49 2017 -0800
Committer: Junkai Xue <[email protected]>
Committed: Wed Jan 24 18:33:24 2018 -0800

----------------------------------------------------------------------
 ...stractEvenDistributionRebalanceStrategy.java | 208 +++++
 .../strategy/CrushEdRebalanceStrategy.java      |  32 +
 .../CardDealingAdjustmentAlgorithm.java         | 212 ++++++
 .../ConsistentHashingAdjustmentAlgorithm.java   | 136 ++++
 .../java/org/apache/helix/DistributionTest.java | 753 +++++++++++++++++++
 .../TestCrushAutoRebalance.java                 |   7 +-
 .../TestCrushAutoRebalanceNonRack.java          |  95 ++-
 7 files changed, 1400 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/9dfb098e/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/AbstractEvenDistributionRebalanceStrategy.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/AbstractEvenDistributionRebalanceStrategy.java
 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/AbstractEvenDistributionRebalanceStrategy.java
new file mode 100644
index 0000000..9012f73
--- /dev/null
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/AbstractEvenDistributionRebalanceStrategy.java
@@ -0,0 +1,208 @@
+package org.apache.helix.controller.rebalancer.strategy;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.HelixException;
+import org.apache.helix.ZNRecord;
+import 
org.apache.helix.controller.rebalancer.strategy.crushMapping.CardDealingAdjustmentAlgorithm;
+import 
org.apache.helix.controller.rebalancer.strategy.crushMapping.ConsistentHashingAdjustmentAlgorithm;
+import org.apache.helix.controller.rebalancer.topology.Topology;
+import org.apache.helix.controller.stages.ClusterDataCache;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+
+/**
+ * Abstract class of Forced Even Assignment Patched Algorithm.
+ * This class contains common logic that re-calculate assignment based on a 
result calculated by the base algorithm.
+ * The target of this patching step is more even partition distribution, but 
number of partitions to be reshuffled during node outage could be higher than 
the base algorithm.
+ */
+public abstract class AbstractEvenDistributionRebalanceStrategy implements 
RebalanceStrategy {
+  private static final Logger _logger =
+      LoggerFactory.getLogger(AbstractEvenDistributionRebalanceStrategy.class);
+  private String _resourceName;
+  private int _replica;
+
+  protected abstract RebalanceStrategy getBaseRebalanceStrategy();
+
+  @Override
+  public void init(String resourceName, final List<String> partitions,
+      final LinkedHashMap<String, Integer> states, int maximumPerNode) {
+    _resourceName = resourceName;
+    getBaseRebalanceStrategy().init(resourceName, partitions, states, 
maximumPerNode);
+    _replica = countStateReplicas(states);
+  }
+
+  /**
+   * Force uniform distribution based on the parent strategy class's 
calculation result.
+   *
+   * @param allNodes       All instances
+   * @param liveNodes      List of live instances
+   * @param currentMapping current replica mapping
+   * @param clusterData    cluster data
+   * @return
+   * @throws HelixException
+   */
+  @Override
+  public ZNRecord computePartitionAssignment(final List<String> allNodes,
+      final List<String> liveNodes, final Map<String, Map<String, String>> 
currentMapping,
+      ClusterDataCache clusterData) throws HelixException {
+    boolean continueNextStep = true;
+    // Round 1: Calculate mapping using the base strategy.
+    // Note to use all nodes for minimizing the influence of live node changes 
to mapping.
+    ZNRecord origAssignment = getBaseRebalanceStrategy()
+        .computePartitionAssignment(allNodes, allNodes, currentMapping, 
clusterData);
+    Map<String, List<String>> origPartitionMap = 
origAssignment.getListFields();
+    // If the original calculation contains no assignment, skip patching
+    if (origPartitionMap.isEmpty()) {
+      continueNextStep = false;
+    }
+
+    // Transform current assignment to instance->partitions map, and get total 
partitions
+    Map<String, List<String>> nodeToPartitionMap = 
convertMap(origPartitionMap);
+
+    if (continueNextStep) {
+      // Round 2: Rebalance mapping using card dealing algorithm. For ensuring 
evenness distribution.
+      Topology allNodeTopo = new Topology(allNodes, allNodes, 
clusterData.getInstanceConfigMap(),
+          clusterData.getClusterConfig());
+      CardDealingAdjustmentAlgorithm cardDealer =
+          new CardDealingAdjustmentAlgorithm(allNodeTopo, _replica);
+      continueNextStep = cardDealer.computeMapping(nodeToPartitionMap, 
_resourceName.hashCode());
+    }
+
+    // Round 3: Reorder preference Lists to ensure participants' orders (so as 
the states) are uniform.
+    Map<String, List<String>> partitionMap = 
shufflePreferenceList(nodeToPartitionMap);
+
+    // Round 4: Re-mapping the partitions on non-live nodes using consistent 
hashing for reducing movement.
+    if (continueNextStep && !liveNodes.containsAll(allNodes)) {
+      Topology liveNodeTopo = new Topology(allNodes, liveNodes, 
clusterData.getInstanceConfigMap(),
+          clusterData.getClusterConfig());
+      ConsistentHashingAdjustmentAlgorithm hashPlacement =
+          new ConsistentHashingAdjustmentAlgorithm(liveNodeTopo);
+      if (hashPlacement.computeMapping(nodeToPartitionMap, 
_resourceName.hashCode())) {
+        // Since mapping is changed by hashPlacement, need to adjust nodes 
order.
+        Map<String, List<String>> adjustedPartitionMap = 
convertMap(nodeToPartitionMap);
+        for (String partition : adjustedPartitionMap.keySet()) {
+          List<String> preSelectedList = partitionMap.get(partition);
+          Set<String> adjustedNodeList = new 
HashSet<>(adjustedPartitionMap.get(partition));
+          List<String> finalNodeList = adjustedPartitionMap.get(partition);
+          int index = 0;
+          // 1. Add the ones in pre-selected node list first, in order
+          for (String node : preSelectedList) {
+            if (adjustedNodeList.remove(node)) {
+              finalNodeList.set(index++, node);
+            }
+          }
+          // 2. Add the rest of nodes to the map
+          for (String node : adjustedNodeList) {
+            finalNodeList.set(index++, node);
+          }
+        }
+        partitionMap = adjustedPartitionMap;
+      } else {
+        continueNextStep = false;
+      }
+    }
+
+    if (continueNextStep) {
+      ZNRecord result = new ZNRecord(_resourceName);
+      result.setListFields(partitionMap);
+      return result;
+    } else {
+      if (_logger.isDebugEnabled()) {
+        _logger.debug("Force even distribution is not possible, using the 
default strategy: "
+            + getBaseRebalanceStrategy().getClass().getSimpleName());
+      }
+      // Force even is not possible, fallback to use default strategy
+      if (liveNodes.equals(allNodes)) {
+        return origAssignment;
+      } else {
+        // need to re-calculate since node list is different.
+        return getBaseRebalanceStrategy()
+            .computePartitionAssignment(allNodes, liveNodes, currentMapping, 
clusterData);
+      }
+    }
+  }
+
+  // Best effort to shuffle preference lists for all partitions for uniform 
distribution regarding the top state.
+  private Map<String, List<String>> shufflePreferenceList(
+      Map<String, List<String>> nodeToPartitionMap) {
+    final Map<String, List<String>> partitionMap = 
convertMap(nodeToPartitionMap);
+    // evaluate node's order according to:
+    // 1. their potential top state replicas count (less count, higher 
priority)
+    // 2. their assigned top state replicas (less top state replica, higher 
priority)
+    final Map<String, Integer> nodeScores = new HashMap<>();
+    for (String node : nodeToPartitionMap.keySet()) {
+      // Init with the potential replicas count
+      nodeScores.put(node, nodeToPartitionMap.get(node).size());
+    }
+    for (final String partition : partitionMap.keySet()) {
+      List<String> nodes = partitionMap.get(partition);
+      // order according to score
+      Collections.sort(nodes, new Comparator<String>() {
+        @Override
+        public int compare(String o1, String o2) {
+          int o1Score = nodeScores.get(o1);
+          int o2Score = nodeScores.get(o2);
+          if (o1Score == o2Score) {
+            return new Integer((partition + 
o1).hashCode()).compareTo((partition + o2).hashCode());
+          } else {
+            return o1Score - o2Score;
+          }
+        }
+      });
+      // After assignment, the nodes has less potential top states
+      for (int i = 0; i < nodes.size(); i++) {
+        String nodeName = nodes.get(i);
+        nodeScores.put(nodeName,
+            nodeScores.get(nodeName) - 1 + (i == 0 ? (int) Math.pow(_replica, 
2) : 0));
+      }
+    }
+    return partitionMap;
+  }
+
+  // Convert the map from <key, list of values> to a new map <original value, 
list of related keys>
+  private Map<String, List<String>> convertMap(Map<String, List<String>> 
originalMap) {
+    Map<String, List<String>> resultMap = new HashMap<>();
+    for (String originalKey : originalMap.keySet()) {
+      for (String originalValue : originalMap.get(originalKey)) {
+        if (!resultMap.containsKey(originalValue)) {
+          resultMap.put(originalValue, new ArrayList<String>());
+        }
+        resultMap.get(originalValue).add(originalKey);
+      }
+    }
+    return resultMap;
+  }
+
+  /**
+   * Counts the total number of replicas given a state-count mapping
+   *
+   * @return
+   */
+  private int countStateReplicas(Map<String, Integer> stateCountMap) {
+    int total = 0;
+    for (Integer count : stateCountMap.values()) {
+      total += count;
+    }
+    return total;
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/9dfb098e/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/CrushEdRebalanceStrategy.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/CrushEdRebalanceStrategy.java
 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/CrushEdRebalanceStrategy.java
new file mode 100644
index 0000000..a7d0c4f
--- /dev/null
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/CrushEdRebalanceStrategy.java
@@ -0,0 +1,32 @@
+package org.apache.helix.controller.rebalancer.strategy;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * CRUSH-ed, CRUSH with even distribution. This is an Auto rebalance strategy 
based on CRUSH algorithm.
+ * This gives even partition distribution, but number of partitions to be 
reshuffled during node outage could be high.
+ */
+public class CrushEdRebalanceStrategy extends 
AbstractEvenDistributionRebalanceStrategy {
+  private final RebalanceStrategy _baseStrategy = new CrushRebalanceStrategy();
+
+  protected RebalanceStrategy getBaseRebalanceStrategy() {
+    return _baseStrategy;
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/9dfb098e/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/crushMapping/CardDealingAdjustmentAlgorithm.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/crushMapping/CardDealingAdjustmentAlgorithm.java
 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/crushMapping/CardDealingAdjustmentAlgorithm.java
new file mode 100644
index 0000000..4470094
--- /dev/null
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/crushMapping/CardDealingAdjustmentAlgorithm.java
@@ -0,0 +1,212 @@
+package org.apache.helix.controller.rebalancer.strategy.crushMapping;
+
+import org.apache.helix.controller.rebalancer.topology.Node;
+import org.apache.helix.controller.rebalancer.topology.Topology;
+
+import java.util.*;
+
+public class CardDealingAdjustmentAlgorithm {
+  private static int MAX_ADJUSTMENT = 2;
+
+  private int _replica;
+  // Instance -> FaultZone Tag
+  private Map<String, String> _instanceFaultZone = new HashMap<>();
+  private Map<String, Long> _instanceWeight = new HashMap<>();
+  private long _totalWeight = 0;
+  private Map<String, Long> _faultZoneWeight = new HashMap<>();
+  // Record existing partitions that are assigned to a fault zone
+  private Map<String, Set<String>> _faultZonePartitionMap = new HashMap<>();
+
+  public CardDealingAdjustmentAlgorithm(Topology topology, int replica) {
+    _replica = replica;
+    // Get all instance related information.
+    for (Node zone : topology.getFaultZones()) {
+      _faultZoneWeight.put(zone.getName(), zone.getWeight());
+      if (!_faultZonePartitionMap.containsKey(zone.getName())) {
+        _faultZonePartitionMap.put(zone.getName(), new HashSet<String>());
+      }
+      for (Node instance : Topology.getAllLeafNodes(zone)) {
+        if (!instance.isFailed()) {
+          _instanceWeight.put(instance.getName(), instance.getWeight());
+          _totalWeight += instance.getWeight();
+          _instanceFaultZone.put(instance.getName(), zone.getName());
+        }
+      }
+    }
+  }
+
+  public boolean computeMapping(Map<String, List<String>> nodeToPartitionMap, 
int randomSeed) {
+    // Records exceed partitions
+    TreeMap<String, Integer> toBeReassigned = new TreeMap<>();
+
+    // Calculate total partitions that need to be calculated
+    long totalReplicaCount = 0;
+    for (List<String> partitions : nodeToPartitionMap.values()) {
+      totalReplicaCount += partitions.size();
+    }
+    if (totalReplicaCount == 0 || _replica > _faultZoneWeight.size()) {
+      return false;
+    }
+
+    // instance -> target (ideal) partition count
+    Map<String, Float> targetPartitionCount = new HashMap<>();
+    for (String liveInstance : _instanceFaultZone.keySet()) {
+      long zoneWeight = 
_faultZoneWeight.get(_instanceFaultZone.get(liveInstance));
+      float instanceRatioInZone = ((float) _instanceWeight.get(liveInstance)) 
/ zoneWeight;
+      // 1. if replica = fault zone, fault zone weight does not count, so 
calculate according to fault zone count.
+      // 2. else, should consider fault zone weight to calculate expected 
threshold.
+      float zonePartitions;
+      if (_replica == _faultZoneWeight.size()) {
+        zonePartitions = ((float) totalReplicaCount) / _faultZoneWeight.size();
+      } else {
+        zonePartitions = ((float) totalReplicaCount) * zoneWeight / 
_totalWeight;
+      }
+      targetPartitionCount.put(liveInstance, instanceRatioInZone * 
zonePartitions);
+    }
+
+    // Calculate the expected spikes
+    // Assign spikes to each zone according to zone weight
+    int totalOverflows = (int) totalReplicaCount % _instanceFaultZone.size();
+    Map<String, Integer> maxZoneOverflows = new HashMap<>();
+    for (String faultZoneName : _faultZoneWeight.keySet()) {
+      float zoneWeight = _faultZoneWeight.get(faultZoneName);
+      maxZoneOverflows.put(faultZoneName,
+          (int) Math.ceil(((float) totalOverflows) * zoneWeight / 
_totalWeight));
+    }
+
+    Iterator<String> nodeIter = nodeToPartitionMap.keySet().iterator();
+    while (nodeIter.hasNext()) {
+      String instance = nodeIter.next();
+      // Cleanup the existing mapping. Remove all non-active nodes from the 
mapping
+      if (!_instanceFaultZone.containsKey(instance)) {
+        List<String> partitions = nodeToPartitionMap.get(instance);
+        addToReAssignPartition(toBeReassigned, partitions);
+        partitions.clear();
+        nodeIter.remove();
+      }
+    }
+
+    List<String> orderedInstances = new 
ArrayList<>(_instanceFaultZone.keySet());
+    // Different resource should shuffle nodes in different ways.
+    Collections.shuffle(orderedInstances, new Random(randomSeed));
+    for (String instance : orderedInstances) {
+      if (!nodeToPartitionMap.containsKey(instance)) {
+        continue;
+      }
+      // Cut off the exceed partitions compared with target partition count.
+      List<String> partitions = nodeToPartitionMap.get(instance);
+      int target = (int) (Math.floor(targetPartitionCount.get(instance)));
+      if (partitions.size() > target) {
+        int maxZoneOverflow = 
maxZoneOverflows.get(_instanceFaultZone.get(instance));
+        if (maxZoneOverflow > 0 && totalOverflows > 0) {
+          // When fault zone has overflow capacity AND there are still 
remaining overflow partitions
+          target = (int) (Math.ceil(targetPartitionCount.get(instance)));
+          maxZoneOverflows.put(_instanceFaultZone.get(instance), 
maxZoneOverflow - 1);
+          totalOverflows--;
+        }
+
+        // Shuffle partitions to randomly pickup exceed ones. Ensure the 
algorithm generates consistent results when the inputs are the same.
+        Collections.shuffle(partitions, new Random(instance.hashCode() * 31 + 
randomSeed));
+        addToReAssignPartition(toBeReassigned, partitions.subList(target, 
partitions.size()));
+
+        // Put the remaining partitions to the assignment, and record in fault 
zone partition list
+        List<String> remainingPartitions = new 
ArrayList<>(partitions.subList(0, target));
+        partitions.clear();
+        nodeToPartitionMap.put(instance, remainingPartitions);
+      }
+      _faultZonePartitionMap.get(_instanceFaultZone.get(instance))
+          .addAll(nodeToPartitionMap.get(instance));
+    }
+
+    // Reassign if any instances have space left.
+    // Assign partition according to the target capacity, CAP at 
"Math.floor(target) + adjustment"
+    int adjustment = 0;
+    while (!toBeReassigned.isEmpty() && adjustment <= MAX_ADJUSTMENT) {
+      partitionDealing(_instanceFaultZone.keySet(), toBeReassigned, 
_faultZonePartitionMap,
+          _instanceFaultZone, nodeToPartitionMap, targetPartitionCount, 
randomSeed, adjustment++);
+    }
+    return toBeReassigned.isEmpty();
+  }
+
+  private void partitionDealing(Collection<String> instances,
+      TreeMap<String, Integer> toBeReassigned, Map<String, Set<String>> 
faultZonePartitionMap,
+      Map<String, String> faultZoneMap, final Map<String, List<String>> 
assignmentMap,
+      Map<String, Float> targetPartitionCount, final int randomSeed, int 
targetAdjustment) {
+    PriorityQueue<String> instanceQueue =
+        new PriorityQueue<>(instances.size(), new Comparator<String>() {
+          @Override
+          public int compare(String node1, String node2) {
+            int node1Load = assignmentMap.containsKey(node1) ? 
assignmentMap.get(node1).size() : 0;
+            int node2Load = assignmentMap.containsKey(node2) ? 
assignmentMap.get(node2).size() : 0;
+            if (node1Load == node2Load) {
+              return new Integer((node1 + randomSeed).hashCode())
+                  .compareTo((node2 + randomSeed).hashCode());
+            } else {
+              return node1Load - node2Load;
+            }
+          }
+        });
+    instanceQueue.addAll(instances);
+
+    while (!toBeReassigned.isEmpty()) {
+      boolean anyPartitionAssigned = false;
+      Iterator<String> instanceIter = instanceQueue.iterator();
+      while (instanceIter.hasNext()) {
+        String instance = instanceIter.next();
+        // Temporary remove the node from queue.
+        // If any partition assigned to the instance, add it back to reset 
priority.
+        instanceIter.remove();
+        boolean partitionAssignedToInstance = false;
+        String faultZoneStr = faultZoneMap.get(instance);
+        List<String> partitions = assignmentMap.containsKey(instance) ?
+            assignmentMap.get(instance) :
+            new ArrayList<String>();
+        int space =
+            (int) (Math.floor(targetPartitionCount.get(instance))) + 
targetAdjustment - partitions
+                .size();
+        if (space > 0) {
+          // Find a pending partition to locate
+          for (String pendingPartition : toBeReassigned.navigableKeySet()) {
+            if 
(!faultZonePartitionMap.get(faultZoneStr).contains(pendingPartition)) {
+              if (!assignmentMap.containsKey(instance)) {
+                assignmentMap.put(instance, partitions);
+              }
+              partitions.add(pendingPartition);
+              faultZonePartitionMap.get(faultZoneStr).add(pendingPartition);
+              if (toBeReassigned.get(pendingPartition) == 1) {
+                toBeReassigned.remove(pendingPartition);
+              } else {
+                toBeReassigned.put(pendingPartition, 
toBeReassigned.get(pendingPartition) - 1);
+              }
+              // if any assignment is made:
+              // this instance can hold more partitions in the future
+              partitionAssignedToInstance = true;
+              break;
+            }
+          }
+        }
+        if (partitionAssignedToInstance) {
+          // Reset priority in the queue
+          instanceQueue.add(instance);
+          anyPartitionAssigned = true;
+          break;
+        }
+      }
+      if (!anyPartitionAssigned) {
+        // if no pending partition is assigned to any instances in this loop, 
new assignment is not possible
+        break;
+      }
+    }
+  }
+
+  private void addToReAssignPartition(TreeMap<String, Integer> toBeReassigned,
+      List<String> partitions) {
+    for (String partition : partitions) {
+      if (!toBeReassigned.containsKey(partition)) {
+        toBeReassigned.put(partition, 1);
+      } else {
+        toBeReassigned.put(partition, toBeReassigned.get(partition) + 1);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/9dfb098e/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/crushMapping/ConsistentHashingAdjustmentAlgorithm.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/crushMapping/ConsistentHashingAdjustmentAlgorithm.java
 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/crushMapping/ConsistentHashingAdjustmentAlgorithm.java
new file mode 100644
index 0000000..e594cd1
--- /dev/null
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/crushMapping/ConsistentHashingAdjustmentAlgorithm.java
@@ -0,0 +1,136 @@
+package org.apache.helix.controller.rebalancer.strategy.crushMapping;
+
+import org.apache.helix.controller.rebalancer.topology.Node;
+import org.apache.helix.controller.rebalancer.topology.Topology;
+import org.apache.helix.util.JenkinsHash;
+
+import java.util.*;
+
+public class ConsistentHashingAdjustmentAlgorithm {
+  private JenkinsHash _hashFunction;
+  private ConsistentHashSelector _selector;
+  Set<String> _liveInstances = new HashSet<>();
+  // Instance -> FaultZone Tag
+  private Map<String, String> _faultZoneMap = new HashMap<>();
+  // Record existing partitions that are assigned to a fault zone
+  private Map<String, Set<String>> _faultZonePartitionMap = new HashMap<>();
+
+  public ConsistentHashingAdjustmentAlgorithm(Topology topology) {
+    _hashFunction = new JenkinsHash();
+    List<String> allInstances = new ArrayList<>();
+    // Get all instance related information.
+    for (Node zone : topology.getFaultZones()) {
+      for (Node instance : Topology.getAllLeafNodes(zone)) {
+        if (!instance.isFailed()) {
+          _liveInstances.add(instance.getName());
+        }
+        allInstances.add(instance.getName());
+        _faultZoneMap.put(instance.getName(), zone.getName());
+        if (!_faultZonePartitionMap.containsKey(zone.getName())) {
+          _faultZonePartitionMap.put(zone.getName(), new HashSet<String>());
+        }
+      }
+    }
+    _selector = new ConsistentHashSelector(allInstances);
+  }
+
+  public boolean computeMapping(Map<String, List<String>> nodeToPartitionMap, 
int randomSeed) {
+    if (_liveInstances.isEmpty()) {
+      return false;
+    }
+
+    Set<String> inactiveInstances = new HashSet<>();
+    Map<String, Integer> toBeReassigned = new HashMap<>();
+    // Remove all partition assignment to a non-live instance
+    Iterator<String> nodeIter = nodeToPartitionMap.keySet().iterator();
+    while (nodeIter.hasNext()) {
+      String instance = nodeIter.next();
+      List<String> partitions = nodeToPartitionMap.get(instance);
+      if (!_liveInstances.contains(instance)) {
+        inactiveInstances.add(instance);
+        addToReAssignPartition(toBeReassigned, partitions);
+        partitions.clear();
+        nodeIter.remove();
+      } else {
+        
_faultZonePartitionMap.get(_faultZoneMap.get(instance)).addAll(partitions);
+      }
+    }
+
+    for (String partition : new ArrayList<>(toBeReassigned.keySet())) {
+      int remainReplicas = toBeReassigned.get(partition);
+      Set<String> conflictInstance = new HashSet<>();
+      for (int index = 0; index < toBeReassigned.get(partition); index++) {
+        Iterable<String> sortedInstances = 
_selector.getCircle(_hashFunction.hash(randomSeed, partition.hashCode(), 
index));
+        Iterator<String> instanceItr = sortedInstances.iterator();
+        while (instanceItr.hasNext() && conflictInstance.size() + 
inactiveInstances.size() != _selector.instanceSize) {
+          String instance = instanceItr.next();
+          if (!_liveInstances.contains(instance)) {
+            inactiveInstances.add(instance);
+          }
+          if (inactiveInstances.contains(instance) || 
conflictInstance.contains(instance)) {
+            continue;
+          }
+          Set<String> faultZonePartitions = 
_faultZonePartitionMap.get(_faultZoneMap.get(instance));
+          if (faultZonePartitions.contains(partition)) {
+            conflictInstance.add(instance);
+            continue;
+          }
+          // insert this assignment
+          if (!nodeToPartitionMap.containsKey(instance)) {
+            nodeToPartitionMap.put(instance, new ArrayList<String>());
+          }
+          nodeToPartitionMap.get(instance).add(partition);
+          faultZonePartitions.add(partition);
+          remainReplicas--;
+          break;
+        }
+      }
+      if (remainReplicas == 0) {
+        toBeReassigned.remove(partition);
+      } else {
+        toBeReassigned.put(partition, remainReplicas);
+      }
+    }
+
+    return toBeReassigned.isEmpty();
+  }
+
+  private void addToReAssignPartition(Map<String, Integer> toBeReassigned,
+      List<String> partitions) {
+    for (String partition : partitions) {
+      if (!toBeReassigned.containsKey(partition)) {
+        toBeReassigned.put(partition, 1);
+      } else {
+        toBeReassigned.put(partition, toBeReassigned.get(partition) + 1);
+      }
+    }
+  }
+
+  private class ConsistentHashSelector {
+    private final static int DEFAULT_TOKENS_PER_INSTANCE = 1000;
+    private final SortedMap<Long, String> circle = new TreeMap<Long, String>();
+    protected int instanceSize = 0;
+
+    public ConsistentHashSelector(List<String> instances) {
+      for (String instance : instances) {
+        long tokenCount = DEFAULT_TOKENS_PER_INSTANCE;
+        add(instance, tokenCount);
+        instanceSize++;
+      }
+    }
+
+    private void add(String instance, long numberOfReplicas) {
+      for (int i = 0; i < numberOfReplicas; i++) {
+        circle.put(_hashFunction.hash(instance.hashCode(), i), instance);
+      }
+    }
+
+    public Iterable<String> getCircle(long data) {
+      if (circle.isEmpty()) {
+        return null;
+      }
+      long hash = _hashFunction.hash(data);
+      return circle.tailMap(hash).values();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/9dfb098e/helix-core/src/test/java/org/apache/helix/DistributionTest.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/DistributionTest.java 
b/helix-core/src/test/java/org/apache/helix/DistributionTest.java
new file mode 100644
index 0000000..bf580d1
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/DistributionTest.java
@@ -0,0 +1,753 @@
+package org.apache.helix;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.controller.rebalancer.strategy.CrushRebalanceStrategy;
+import org.apache.helix.controller.rebalancer.topology.Node;
+import org.apache.helix.controller.rebalancer.topology.Topology;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.model.BuiltInStateModelDefinitions;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.util.HelixUtil;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.*;
+
/**
 * Manual distribution-evaluation harness for CRUSH-based rebalance strategies.
 * Reads cluster snapshots (cluster config, instance configs, ideal states)
 * from a developer-local folder and prints tab-separated evenness / movement
 * statistics for pasting into a spreadsheet.
 * NOTE(review): depends on local data under {@code Path} and hard-coded
 * fabric/cluster names — not intended to run in CI.
 */
public class DistributionTest {
  // Snapshot file locations; (re)assigned from the fabric folder before each calculation.
  private static String instanceFolderPath;
  private static String instanceList;
  private static String idealStateFolderPath;
  private static String idealStateList;

  // Root folder holding the cluster snapshots (developer-local path).
  String Path = "/home/jjwang/Desktop/FEAP-test";
  //String Path = "/Users/jjwang/Desktop/FEAP-test";

  // Strategies under evaluation; uncomment entries to compare other strategies.
  @DataProvider(name = "rebalanceStrategies")
  public static String[][] rebalanceStrategies() {
    return new String[][] {
        //{AutoRebalanceStrategy.class.getName()},
        { CrushRebalanceStrategy.class.getName() },
        //{ MultiRoundCrushRebalanceStrategy.class.getName() },
        //{ CrushEdRebalanceStrategy.class.getName() }
    };
  }

  // Fabrics and clusters whose snapshot folders may exist under Path.
  String[] fabrics = { "lor1", "lva1", "ltx1", "lsg1",
  };
  String[] clusters = { "ESPRESSO_IDENTITY", "ESPRESSO_MT-MD-1", "ESPRESSO_TSCP", "ESPRESSO_MT_PHASE1",
          "ESPRESSO_MT-MD-3", "ESPRESSO_USCP", "ESPRESSO_MT-LEGACY", /* "venice-0" */
  };
  // State name counted as the "top" state (e.g. master) in the statistics.
  String topState = "master";
  // Fractions of nodes to add (>0) or disable (<0) in testNodeChange.
  float[] nodeAdjustSimulator =
      { /*-0.5f, -0.2f, -0.1f, -0.01f, */ 0.01f, 0.1f, 0.2f, 0.5f, 1f};
+
  /**
   * Simulates adding/disabling a fraction of nodes per fabric and reports the
   * distribution evenness after the change plus the partition movement caused
   * by it, one tab-separated row per adjustment rate.
   */
  @Test(dataProvider = "rebalanceStrategies")
  public void testNodeChange(String rebalanceStrategyClass) throws Exception {
    for (String cluster : clusters) {
      System.out.println(cluster
          + "\tChangeType\tNumOfNodeChange\tDiffRate\tTotalMv\tTotalMvRate\tExtraMvRate\tExtraMvRateComparedWithAvgDist\tTopStateChange\tTopStateChangeRate\tTopStateChangeWithNewDeployRate\tExtraTopStateChangeRate");
      for (String fabric : fabrics) {
        String path = Path + "/" + cluster + "/" + fabric;
        if (new File(path).exists()) {
          System.out.print(fabric);
          for (float adjustRate : nodeAdjustSimulator) {
            Set<String> deltaNode = new HashSet<>();
            List<String> liveInstances = new ArrayList<>();
            // Assignment with the node set adjusted by adjustRate; deltaNode
            // collects the added/removed node names for the movement check.
            Map<String, Map<String, String>> resultA =
                calculate(path, rebalanceStrategyClass, adjustRate, deltaNode, liveInstances);
            double[] distEval = checkEvenness(liveInstances, resultA, false);
            if (adjustRate != 0) {
              // Baseline assignment with the unmodified node set.
              Map<String, Map<String, String>> result =
                  calculate(path, rebalanceStrategyClass, 0, deltaNode, new ArrayList<String>());
              double[] diff = checkMovement(result, resultA, deltaNode, false);
              // Column order matches the header printed above; see
              // checkMovement/checkEvenness for the meaning of each index.
              System.out.println(
                  "\t" + (adjustRate > 0 ? "Adding\t" : "Disabling\t") + diff[0] + "\t"
                      + distEval[3] + "\t" + diff[1] + "\t" + diff[2] + "\t" + diff[3] + "\t"
                      + diff[8] + "\t" + diff[4] + "\t" + diff[5] + "\t" + diff[10] + "\t"
                      + diff[6]);
            }
          }
        }
      }
      System.out.println();
    }
  }
+
  /**
   * Prints replica and top-state distribution statistics for every fabric
   * snapshot that exists on disk, one tab-separated row per fabric.
   */
  @Test(dataProvider = "rebalanceStrategies")
  public void testDist(String rebalanceStrategyClass) throws Exception {
    for (String cluster : clusters) {
      System.out.println(cluster
          + "\tTotalReplica\tMinReplica\tMaxReplica\tDiffRate\tSTDEV\tMinTopState\tMaxTopState\ttopStateDiffRate\ttopStateSTDEV");
      for (String fabric : fabrics) {
        String path = Path + "/" + cluster + "/" + fabric;
        if (new File(path).exists()) {
          Set<String> deltaNode = new HashSet<>();
          List<String> liveInstances = new ArrayList<>();
          // Adjust rate 0 => compute the assignment on the unmodified node set.
          Map<String, Map<String, String>> result =
              calculate(path, rebalanceStrategyClass, 0, deltaNode, liveInstances);
          double[] distEval = checkEvenness(liveInstances, result, false);
          System.out.println(
              fabric + "\t" + distEval[0] + "\t" + distEval[1] + "\t" + distEval[2] + "\t"
                  + distEval[3] + "\t" + distEval[4] + "\t" + distEval[5] + "\t" + distEval[6]
                  + "\t" + distEval[7] + "\t" + distEval[8]);
        }
      }
      System.out.println();
    }
  }
+
  // Knobs for the random-topology tests: replicas per partition, partitions
  // per resource, number of fault zones, and the resource/node count grid.
  int _replica = 1;
  int partitionCount = 101;
  int faultZone = 10;
  int[] resourceCounts = new int[] { 100 };
  int[] nodeCounts = new int[] { 100, /*100, 200, 500, 1000*/ };
+
+  @Test(dataProvider = "rebalanceStrategies")
+  public void testDistUsingRandomTopo(String rebalanceStrategyClass) throws 
Exception {
+    for (int nodeCount : nodeCounts) {
+      for (int resourceCount : resourceCounts) {
+        System.out.println(
+            
"NodeCount\tResourceCount\tTotalReplica\tMinReplica\tMaxReplica\tDiffRate\tSTDEV\tMinTopState\tMaxTopState\tTopStateDiffRate\tTopStateSTDEV");
+        List<String> liveInstances = new ArrayList<>();
+        Map<String, Map<String, String>> result =
+            calculateUsingRandomTopo(rebalanceStrategyClass, _replica, 
partitionCount,
+                resourceCount, nodeCount, faultZone, liveInstances);
+        double[] distEval = checkEvenness(liveInstances, result, false);
+        System.out.println(
+            nodeCount + "\t" + resourceCount + "\t" + distEval[0] + "\t" + 
distEval[1] + "\t"
+                + distEval[2] + "\t" + distEval[3] + "\t" + distEval[4] + "\t" 
+ distEval[5] + "\t"
+                + distEval[6] + "\t" + distEval[7] + "\t" + distEval[8]);
+      }
+    }
+
+    System.out.println();
+  }
+
  /**
   * Simulates a rolling upgrade per fabric (one node down at a time, recovered
   * before the next goes down) and reports the worst-case movement metric
   * observed across all upgrade steps.
   */
  @Test(dataProvider = "rebalanceStrategies")
  public void testRollingUpgrade(String rebalanceStrategyClass) throws Exception {
    for (String cluster : clusters) {
      System.out.println(cluster
          + "\tTotalMv\tTotalMvRate\tExtraMvRate\tExtraMvRateComparedWithAvgDist\tTopStateChange\tTopStateChangeRate\tTopStateChangeWithNewDeployRate\tExtraTopStateChange");
      for (String fabric : fabrics) {
        String path = Path + "/" + cluster + "/" + fabric;
        if (new File(path).exists()) {

          List<List<String>> deltaNodesHistory = new ArrayList<>();
          List<List<String>> liveInstancesHistory = new ArrayList<>();

          List<Map<String, Map<String, String>>> mappingHistory =
              calculateRollingUpgrade(path, rebalanceStrategyClass, deltaNodesHistory,
                  liveInstancesHistory, true);

          // Baseline assignment with every node alive.
          Map<String, Map<String, String>> basicMapping =
              calculate(path, rebalanceStrategyClass, 0, new HashSet<String>(),
                  new ArrayList<String>());

          // Track the worst (max) value of each movement metric across steps.
          double[] maxDiff = new double[8];
          for (int i = 0; i < mappingHistory.size(); i++) {
            List<String> deltaNode = deltaNodesHistory.get(i);
            Map<String, Map<String, String>> mapA = mappingHistory.get(i);

            // Compare each step against the previous step (or the baseline for
            // the first step); the previous step's delta nodes also count.
            Map<String, Map<String, String>> mapB = basicMapping;
            if (i != 0) {
              deltaNode.addAll(deltaNodesHistory.get(i - 1));
              mapB = mappingHistory.get(i - 1);
            }
            double[] diff = checkMovement(mapB, mapA, deltaNode, false);

            maxDiff[0] = Math.max(diff[1], maxDiff[0]);
            maxDiff[1] = Math.max(diff[2], maxDiff[1]);
            maxDiff[2] = Math.max(diff[3], maxDiff[2]);
            maxDiff[3] = Math.max(diff[4], maxDiff[3]);
            maxDiff[4] = Math.max(diff[5], maxDiff[4]);
            maxDiff[5] = Math.max(diff[6], maxDiff[5]);
            maxDiff[6] = Math.max(diff[8], maxDiff[6]);
            maxDiff[7] = Math.max(diff[10], maxDiff[7]);
          }
          System.out.println(
              fabric + "\t" + maxDiff[0] + "\t" + maxDiff[1] + "\t" + maxDiff[2] + "\t" + maxDiff[6]
                  + "\t" + maxDiff[3] + "\t" + maxDiff[4] + "\t" + maxDiff[7] + "\t" + maxDiff[5]);
        }
      }
      System.out.println();
    }
  }
+
  /**
   * Simulates a rolling upgrade: takes nodes down one at a time (optionally
   * restoring each before the next goes down), recomputing and verifying the
   * full assignment at every step.
   *
   * @param Path                  fabric snapshot folder (note: the name shadows
   *                              java.nio.file.Path as a variable, not as a type)
   * @param deltaNodesHistory     out-param: the nodes down at each step
   * @param liveInstancesHistory  out-param: the live nodes at each step
   * @param recoverNode           if true, each node is restored before the next one goes down
   * @return the computed assignment (partition -> instance -> state) per step
   */
  public List<Map<String, Map<String, String>>> calculateRollingUpgrade(String Path,
      String rebalanceStrategyClass, List<List<String>> deltaNodesHistory,
      List<List<String>> liveInstancesHistory, boolean recoverNode) throws Exception {
    instanceFolderPath = Path + "/instanceConfigs/";
    instanceList = Path + "/instance";
    idealStateFolderPath = Path + "/idealStates/";
    idealStateList = Path + "/idealstate";
    Path path = Paths.get(Path + "/clusterConfig");
    ZNRecord record = (ZNRecord) new ZNRecordSerializer().deserialize(Files.readAllBytes(path));
    ClusterConfig clusterConfig = new ClusterConfig(record);
    List<String> allNodes = new ArrayList<>();
    List<InstanceConfig> instanceConfigs =
        getInstanceConfigs(instanceFolderPath, instanceList, allNodes);
    List<IdealState> idealStates = getIdealStates(idealStateFolderPath, idealStateList);

    List<String> deltaNodes = new ArrayList<>();

    List<Map<String, Map<String, String>>> totalMapHistory = new ArrayList<>();
    for (String downNode : allNodes) {
      // Take the next node down for this step.
      deltaNodes.add(downNode);

      List<String> liveInstances = new ArrayList<>(allNodes);
      liveInstances.removeAll(deltaNodes);
      Map<String, Map<String, String>> totalMaps = new HashMap<>();

      totalMapHistory.add(totalMaps);
      liveInstancesHistory.add(liveInstances);
      deltaNodesHistory.add(new ArrayList<>(deltaNodes));

      // Recompute the assignment of every resource on the reduced node set.
      Map<String, Integer> partitions = new HashMap<>();
      for (int i = 0; i < idealStates.size(); i++) {
        Map<String, Map<String, String>> maps = HelixUtil
            .getIdealAssignmentForFullAuto(clusterConfig, instanceConfigs, liveInstances,
                idealStates.get(i), new ArrayList<>(idealStates.get(i).getPartitionSet()),
                rebalanceStrategyClass);
        for (String partitionName : idealStates.get(i).getPartitionSet()) {
          partitions.put(partitionName, idealStates.get(i).getReplicaCount(liveInstances.size()));
        }
        totalMaps.putAll(maps);
      }
      Map<String, InstanceConfig> instanceConfigMap = new HashMap<>();
      for (InstanceConfig config : instanceConfigs) {
        instanceConfigMap.put(config.getInstanceName(), config);
      }
      verifyDistribution(totalMaps, liveInstances, partitions,
          new Topology(new ArrayList<>(instanceConfigMap.keySet()), liveInstances,
              instanceConfigMap, clusterConfig));
      if (recoverNode) {
        deltaNodes.remove(downNode);
      }
    }
    return totalMapHistory;
  }
+
  /**
   * Generates a random topology ({@code nodeCount} instances spread round-robin
   * over {@code faultZone} zones) plus {@code resourceCount} FULL_AUTO
   * MasterSlave resources, computes the assignment with the given strategy,
   * verifies it, and returns it.
   *
   * NOTE(review): the parameter order is {@code ..., nodeCount, resourceCount,
   * ...}; verify call sites pass arguments in that order — the two counts are
   * easy to swap and currently carry identical values in the tests.
   *
   * @param liveInstances out-param: the generated instance names
   * @return partition -> (instance -> state) for all generated resources
   */
  public Map<String, Map<String, String>> calculateUsingRandomTopo(String rebalanceStrategyClass,
      int replica, int partitionCount, int nodeCount, int resourceCount, int faultZone,
      List<String> liveInstances) throws Exception {
    String[] className = rebalanceStrategyClass.split("\\.");
    String PARTICIPANT_PREFIX =
        className[className.length - 1] + "_node_" + nodeCount + resourceCount;
    String RESOURCE_PREFIX =
        className[className.length - 1] + "_resource_" + nodeCount + resourceCount;
    String CLUSTER_NAME =
        className[className.length - 1] + nodeCount + resourceCount + "TestingCluster";

    ClusterConfig clusterConfig = new ClusterConfig(CLUSTER_NAME);
    clusterConfig.setTopologyAwareEnabled(true);
    clusterConfig.setFaultZoneType("zone");
    clusterConfig.setTopology("/zone/rack/instance");

    // Generate instances with randomized names, assigned to zones round-robin.
    List<InstanceConfig> newInstanceConfigs = new ArrayList<>();
    Random rand = new Random();
    for (int i = 0; i < nodeCount; i++) {
      String nodeName = PARTICIPANT_PREFIX + Math.abs(rand.nextInt()) + "_" + i;
      String zone = "zone-" + i % faultZone;
      InstanceConfig newConfig = new InstanceConfig(nodeName);
      liveInstances.add(nodeName);
      newConfig.setInstanceEnabled(true);
      newConfig.setHostName(nodeName);
      newConfig.setPort(new Integer(i).toString());
      newConfig.setDomain(String
          .format("cluster=%s,zone=%s,rack=myRack,instance=%s", CLUSTER_NAME, zone, nodeName));
      newConfig.setWeight(1000);
      newConfig.setDelayRebalanceEnabled(false);
      newConfig.setMaxConcurrentTask(1000);
      newInstanceConfigs.add(newConfig);
    }

    Map<String, Map<String, String>> totalMaps = new HashMap<>();
    Map<String, Integer> partitions = new HashMap<>();
    List<IdealState> idealStates = new ArrayList<>();

    // Build FULL_AUTO MasterSlave resources with empty preference lists.
    for (int i = 0; i < resourceCount; i++) {
      String resourceName = RESOURCE_PREFIX + "_" + i;
      IdealState idealState = new IdealState(resourceName);
      idealState.setStateModelDefRef(BuiltInStateModelDefinitions.MasterSlave.name());
      idealState.setRebalanceMode(IdealState.RebalanceMode.FULL_AUTO);
      idealState.setReplicas(new Integer(replica).toString());
      idealState.setNumPartitions(partitionCount);
      idealState.setRebalancerClassName(rebalanceStrategyClass);
      for (int p = 0; p < partitionCount; p++) {
        String partitionName = resourceName + "_" + p;
        idealState.setPreferenceList(partitionName, new ArrayList<String>());
      }
      idealStates.add(idealState);
    }

    // Compute the assignment per resource, timing the rebalance calls.
    long duration = 0;
    for (IdealState idealState : idealStates) {
      long startTime = System.currentTimeMillis();
      Map<String, Map<String, String>> maps = HelixUtil
          .getIdealAssignmentForFullAuto(clusterConfig, newInstanceConfigs, liveInstances,
              idealState, new ArrayList<>(idealState.getPartitionSet()), rebalanceStrategyClass);
      duration += System.currentTimeMillis() - startTime;

      for (String partitionName : idealState.getPartitionSet()) {
        partitions.put(partitionName, idealState.getReplicaCount(liveInstances.size()));
      }
      totalMaps.putAll(maps);
    }

    //System.out.println("Total running time:\t" + duration);

    Map<String, InstanceConfig> instanceConfigMap = new HashMap<>();
    for (InstanceConfig config : newInstanceConfigs) {
      instanceConfigMap.put(config.getInstanceName(), config);
    }
    verifyDistribution(totalMaps, liveInstances, partitions,
        new Topology(new ArrayList<>(instanceConfigMap.keySet()), liveInstances, instanceConfigMap,
            clusterConfig));
    return totalMaps;
  }
+
+  public Map<String, Map<String, String>> calculate(String Path, String 
rebalanceStrategyClass,
+      float instanceAdjustRate, Set<String> deltaNode, List<String> 
liveInstances)
+      throws Exception {
+    instanceFolderPath = Path + "/instanceConfigs/";
+    instanceList = Path + "/instance";
+    idealStateFolderPath = Path + "/idealStates/";
+    idealStateList = Path + "/idealstate";
+    Path path = Paths.get(Path + "/clusterConfig");
+    ZNRecord record = (ZNRecord) new 
ZNRecordSerializer().deserialize(Files.readAllBytes(path));
+    ClusterConfig clusterConfig = new ClusterConfig(record);
+    List<InstanceConfig> instanceConfigs =
+        getInstanceConfigs(instanceFolderPath, instanceList, liveInstances);
+
+    int adjustNodeCount = (int) (instanceAdjustRate > 0 ?
+        Math.ceil(instanceAdjustRate * liveInstances.size()) :
+        Math.floor(instanceAdjustRate * liveInstances.size()));
+
+    if (adjustNodeCount > 0) {
+      for (int i = 0; i < adjustNodeCount; i++) {
+        int cloneIndex = i % (liveInstances.size() - 1);
+        String nodeName = instanceConfigs.get(cloneIndex).getInstanceName() + 
"_random" + i;
+        liveInstances.add(nodeName);
+        InstanceConfig cloneConfig = new InstanceConfig(nodeName);
+        cloneConfig.setHostName(nodeName);
+        cloneConfig.setInstanceEnabled(true);
+        cloneConfig.setPort(instanceConfigs.get(cloneIndex).getPort());
+        cloneConfig.setDomain(instanceConfigs.get(cloneIndex).getDomain() + 
"_random" + i);
+        if (instanceConfigs.get(cloneIndex).getWeight() > 0) {
+          cloneConfig.setWeight(instanceConfigs.get(cloneIndex).getWeight());
+        }
+        cloneConfig
+            
.setDelayRebalanceEnabled(instanceConfigs.get(cloneIndex).isDelayRebalanceEnabled());
+        if (instanceConfigs.get(cloneIndex).getMaxConcurrentTask() > 0) {
+          
cloneConfig.setMaxConcurrentTask(instanceConfigs.get(cloneIndex).getMaxConcurrentTask());
+        }
+        instanceConfigs.add(cloneConfig);
+        deltaNode.add(nodeName);
+      }
+    } else {
+      if (adjustNodeCount > liveInstances.size()) {
+        throw new Exception("All nodes are removed, no assignment possible.");
+      }
+      for (int i = 0; i < Math.abs(adjustNodeCount); i++) {
+        String nodeName = liveInstances.remove(i);
+        deltaNode.add(nodeName);
+      }
+    }
+
+    List<IdealState> idealStates = getIdealStates(idealStateFolderPath, 
idealStateList);
+    Map<String, Map<String, String>> totalMaps = new HashMap<>();
+    Map<String, Integer> partitions = new HashMap<>();
+
+    long duration = 0;
+    for (int i = 0; i < idealStates.size(); i++) {
+      long startTime = System.currentTimeMillis();
+      int partitionCount = idealStates.get(i).getNumPartitions();
+      List<String> partitionList =
+          new ArrayList<>(idealStates.get(i).getPartitionSet()).subList(0, 
partitionCount);
+      Map<String, Map<String, String>> maps = HelixUtil
+          .getIdealAssignmentForFullAuto(clusterConfig, instanceConfigs, 
liveInstances,
+              idealStates.get(i), partitionList, rebalanceStrategyClass);
+      for (String partitionName : partitionList) {
+        partitions.put(partitionName, 
idealStates.get(i).getReplicaCount(liveInstances.size()));
+      }
+      duration += System.currentTimeMillis() - startTime;
+
+      // print resource details
+/*      Map<String, Set<String>> nodeMapping = convertMapping(maps);
+      String partitionCountsStr = idealStates.get(i).getResourceName();
+      List<String> sortedInstances = new ArrayList<>(liveInstances);
+      Collections.sort(sortedInstances);
+      for (String node : sortedInstances) {
+        partitionCountsStr += "\t" + (nodeMapping.containsKey(node) ? 
nodeMapping.get(node).size() : 0);
+      }
+      System.out.println(partitionCountsStr);*/
+
+      totalMaps.putAll(maps);
+    }
+    //System.out.println("Takes " + duration + "ms");
+    Map<String, InstanceConfig> instanceConfigMap = new HashMap<>();
+    for (InstanceConfig config : instanceConfigs) {
+      instanceConfigMap.put(config.getInstanceName(), config);
+    }
+    verifyDistribution(totalMaps, liveInstances, partitions,
+        new Topology(new ArrayList<>(instanceConfigMap.keySet()), 
liveInstances, instanceConfigMap,
+            clusterConfig));
+    return totalMaps;
+  }
+
  /**
   * Validates a computed assignment:
   *  - every expected partition appears with exactly the expected replica count,
   *  - no replica is placed on a non-live node,
   *  - no two replicas of one partition share a fault zone.
   *
   * @param partitionExp partition -> expected replica count; entries are
   *                     consumed (removed) as they are verified
   * @throws Exception when any invariant is violated
   */
  private void verifyDistribution(Map<String, Map<String, String>> map, List<String> liveInstances,
      Map<String, Integer> partitionExp, Topology topology) throws Exception {
    // Map each leaf instance to its fault zone for conflict detection.
    Map<String, Set<String>> faultZonePartition = new HashMap<>();
    Map<String, String> instanceFaultZone = new HashMap<>();
    for (Node node : topology.getFaultZones()) {
      faultZonePartition.put(node.getName(), new HashSet<String>());
      for (Node instance : Topology.getAllLeafNodes(node)) {
        instanceFaultZone.put(instance.getName(), node.getName());
      }
    }
    for (String partition : map.keySet()) {
      // no partition missing, no partition duplicate
      if (!partitionExp.containsKey(partition) || map.get(partition).size() != partitionExp
          .get(partition)) {
        throw new Exception("partition replica in mapping is not as expected");
      }
      partitionExp.remove(partition);
      // no partition on non-live node
      for (String instance : map.get(partition).keySet()) {
        if (!liveInstances.contains(instance)) {
          throw new Exception("assignment is not on a live node!");
        }
        // no fault zone conflict
        String faultZone = instanceFaultZone.get(instance);
        if (faultZonePartition.get(faultZone).contains(partition)) {
          throw new Exception("faultzone conflict!");
        }
        faultZonePartition.get(faultZone).add(partition);
      }
    }
    // Anything left in partitionExp was never assigned.
    if (!partitionExp.isEmpty()) {
      throw new Exception("partition is not assigned");
    }
  }
+
  /**
   * Computes per-instance replica and top-state counts for an assignment and
   * summarizes how even the distribution is.
   *
   * @return statistics array:
   *         [0] total replicas, [1] min replicas per node, [2] max replicas per node,
   *         [3] (max-min)/max replica diff rate (%), [4] replica count STDEV,
   *         [5] min top-states per node, [6] max top-states per node,
   *         [7] (max-min)/max top-state diff rate (%), [8] top-state count STDEV.
   *         NOTE(review): indices [3]/[7] divide by the max count and yield NaN
   *         when it is zero — confirm callers tolerate that.
   */
  private double[] checkEvenness(List<String> liveInstances,
      Map<String, Map<String, String>> totalMaps, boolean verbose) {
    StringBuilder output = new StringBuilder();
    Map<String, List<String>> detailMap = new HashMap<>();
    Map<String, Integer> distributionMap = new TreeMap<>();
    Map<String, Integer> topStateDistributionMap = new HashMap<>();
    for (String instance : liveInstances) {
      distributionMap.put(instance, 0);
      topStateDistributionMap.put(instance, 0);
      detailMap.put(instance, new ArrayList<String>());
    }

    // Count replicas and top-state replicas hosted by each instance.
    for (String partition : totalMaps.keySet()) {
      Map<String, String> instanceMap = totalMaps.get(partition);
      for (String instance : instanceMap.keySet()) {
        detailMap.get(instance).add(partition + "-" + totalMaps.get(partition).get(instance));
        distributionMap.put(instance, distributionMap.get(instance) + 1);
        if (instanceMap.get(instance).equalsIgnoreCase(topState)) {
          topStateDistributionMap.put(instance, topStateDistributionMap.get(instance) + 1);
        }
      }
    }

    int totalReplicas = 0;
    int minR = Integer.MAX_VALUE;
    int maxR = 0;
    // mminR/mmaxR track the min/max top-state counts per instance.
    int mminR = Integer.MAX_VALUE;
    int mmaxR = 0;
    for (String instance : distributionMap.keySet()) {
      output.append(instance + "\t" + distributionMap.get(instance) + "\tpartitions\t"
          + topStateDistributionMap.get(instance) + "\ttopStates\n");
      //output.append(instance + "\t:\t" + distributionMap.get(instance) + "\tpartitions\t" + topStateDistributionMap.get(instance) + "\ttopStates\t" + detailMap.get(instance) + "\n");
      totalReplicas += distributionMap.get(instance);
      minR = Math.min(minR, distributionMap.get(instance));
      maxR = Math.max(maxR, distributionMap.get(instance));
    }
    for (String instance : topStateDistributionMap.keySet()) {
      mminR = Math.min(mminR, topStateDistributionMap.get(instance));
      mmaxR = Math.max(mmaxR, topStateDistributionMap.get(instance));
    }

    output.append("Maximum holds " + maxR + " replicas and minimum holds " + minR
        + " replicas, differentiation is " + (double) (maxR - minR) / maxR * 100 + "%\n");
    output.append("Maximum holds " + mmaxR + " topStates and minimum holds " + mminR
        + " topStates, differentiation is " + (double) (mmaxR - mminR) / mmaxR * 100 + "%\n ");

    if (verbose) {
      System.out.println(output.toString());
    }
    return new double[] { totalReplicas, minR, maxR, (double) (maxR - minR) / maxR * 100,
        STDEV(new ArrayList<>(distributionMap.values())), mminR, mmaxR,
        (double) (mmaxR - mminR) / mmaxR * 100,
        STDEV(new ArrayList<>(topStateDistributionMap.values()))
    };
  }
+
+  private double STDEV(List<Integer> data) {
+    if (data.isEmpty() || data.size() == 1) {
+      return 0;
+    }
+    double totalDiff = 0;
+    double average = 0;
+    for (int num : data) {
+      average += num;
+    }
+    average /= data.size();
+    for (int i = 0; i < data.size(); i++) {
+      totalDiff += Math.pow(data.get(i) - average, 2);
+    }
+    return Math.sqrt(totalDiff) / (data.size() - 1);
+  }
+
+  private static List<InstanceConfig> getInstanceConfigs(String 
instanceFolderPath,
+      String instanceList, List<String> liveInstances) throws IOException {
+    List<InstanceConfig> instanceConfigs = new ArrayList<>();
+    for (ZNRecord record : getRecords(instanceFolderPath, instanceList)) {
+      instanceConfigs.add(new InstanceConfig(record));
+      liveInstances.add(record.getId());
+    }
+    return instanceConfigs;
+  }
+
+  private static List<IdealState> getIdealStates(String idealStateFolderPath, 
String idealStateList)
+      throws IOException {
+    List<IdealState> idealStates = new ArrayList<>();
+    for (ZNRecord record : getRecords(idealStateFolderPath, idealStateList)) {
+      IdealState idealState = new IdealState(record);
+      try {
+        BuiltInStateModelDefinitions.valueOf(idealState.getStateModelDefRef());
+      } catch (IllegalArgumentException ex) {
+        idealState.setStateModelDefRef("OnlineOffline");
+      }
+      idealStates.add(idealState);
+    }
+    return idealStates;
+  }
+
+  private static List<ZNRecord> getRecords(String folderPath, String list) 
throws IOException {
+    List<ZNRecord> records = new ArrayList<>();
+    List<String> names = new ArrayList<>();
+    try (BufferedReader br = new BufferedReader(new FileReader(list))) {
+      String sCurrentLine = br.readLine();
+      names.addAll(Arrays.asList(sCurrentLine.split(" ")));
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+    for (String name : names) {
+      Path path = Paths.get(folderPath + name);
+      ZNRecord record = (ZNRecord) new 
ZNRecordSerializer().deserialize(Files.readAllBytes(path));
+      records.add(record);
+    }
+    return records;
+  }
+
+  private Map<String, Map<String, Integer>> getStateCount(Map<String, 
Map<String, String>> map) {
+    Map<String, Map<String, Integer>> mapStateCount = new HashMap<>();
+    for (String partition : map.keySet()) {
+      Map<String, Integer> stateCount = new HashMap<>();
+      mapStateCount.put(partition, stateCount);
+      for (String node : map.get(partition).keySet()) {
+        String state = map.get(partition).get(node);
+        if (!stateCount.containsKey(state)) {
+          stateCount.put(state, 1);
+        } else {
+          stateCount.put(state, stateCount.get(state) + 1);
+        }
+      }
+    }
+    return mapStateCount;
+  }
+
+  private void verifyMaps(Map<String, Map<String, String>> map,
+      Map<String, Map<String, String>> newMap) throws Exception {
+    // check no partition lose
+    Map<String, Map<String, Integer>> mapStateCount = getStateCount(map);
+    Map<String, Map<String, Integer>> newMapStateCount = getStateCount(newMap);
+    for (String partition : mapStateCount.keySet()) {
+      if (!newMapStateCount.containsKey(partition)) {
+        throw new Exception("mapping does not match");
+      }
+      for (String state : mapStateCount.get(partition).keySet()) {
+        if (!newMapStateCount.get(partition).containsKey(state)
+            || mapStateCount.get(partition).get(state) != 
newMapStateCount.get(partition)
+            .get(state)) {
+          throw new Exception("state does not match");
+        }
+      }
+      for (String state : newMapStateCount.get(partition).keySet()) {
+        if (!mapStateCount.get(partition).containsKey(state)) {
+          throw new Exception("state does not match");
+        }
+      }
+    }
+    for (String partition : newMapStateCount.keySet()) {
+      if (!mapStateCount.containsKey(partition)) {
+        throw new Exception("mapping does not match");
+      }
+    }
+  }
+
  /**
   * Compares two assignments and quantifies the partition/top-state movement
   * between them. {@code deltaNodes} are the nodes added/removed between the
   * two assignments; movement on those nodes is counted as "necessary".
   *
   * @return metrics array:
   *         [0] number of delta nodes, [1] total replica moves,
   *         [2] total move rate (%), [3] unnecessary move rate (%),
   *         [4] top-state changes, [5] top-state change rate (%),
   *         [6] unnecessary top-state change rate (%),
   *         [7] expected average moves attributable to the delta nodes,
   *         [8] extra move rate vs. that expected average (%),
   *         [9] top-state changes landing on a node with no prior replica,
   *         [10] rate of [9] (%).
   *         NOTE(review): the top-state rates divide by totalTopState, which
   *         is 0 when the state model has no top-state replicas — confirm
   *         callers tolerate the resulting NaN.
   */
  private double[] checkMovement(Map<String, Map<String, String>> map,
      Map<String, Map<String, String>> newMap, Collection<String> deltaNodes, boolean verbose)
      throws Exception {
    verifyMaps(map, newMap);
    int totalChange = 0;
    int totalTopStateChange = 0;
    int totalTopStateChangeWithNewDeployment = 0;
    int totalPartition = 0;
    int totalTopState = 0;

    // Count top-state (e.g. master) handoffs per partition.
    for (String partition : map.keySet()) {
      Map<String, String> origStates = map.get(partition);
      Map<String, String> newStates = newMap.get(partition);
      String topStateNode = "", newtopStateNode = "";
      for (String node : origStates.keySet()) {
        if (origStates.get(node).equalsIgnoreCase(topState)) {
          topStateNode = node;
        }
      }
      for (String node : newStates.keySet()) {
        if (newStates.get(node).equalsIgnoreCase(topState)) {
          newtopStateNode = node;
          totalTopState++;
        }
      }
      if (!topStateNode.equalsIgnoreCase(newtopStateNode)) {
        totalTopStateChange++;
        // The top state moved to a node that had no replica of this partition.
        if (!origStates.containsKey(newtopStateNode)) {
          totalTopStateChangeWithNewDeployment++;
        }
      }
    }

    Map<String, Set<String>> list = convertMapping(map);
    Map<String, Set<String>> newList = convertMapping(newMap);

    // Per-instance counts of gained (addition) and lost (subtraction) replicas.
    // Note: the set operations below mutate the inverted views, not the input maps.
    Map<String, Integer> addition = new HashMap<>();
    Map<String, Integer> subtraction = new HashMap<>();
    for (String instance : newList.keySet()) {
      Set<String> oldPartitions = list.get(instance);
      Set<String> newPartitions = newList.get(instance);
      totalPartition += newPartitions.size();
      if (oldPartitions == null) {
        addition.put(instance, newPartitions.size());
      } else {
        Set<String> commonPartitions = new HashSet<>(newPartitions);
        commonPartitions.retainAll(oldPartitions);

        newPartitions.removeAll(commonPartitions);

        addition.put(instance, newPartitions.size());

        oldPartitions.removeAll(commonPartitions);
        subtraction.put(instance, oldPartitions.size());
      }
      totalChange += newPartitions.size();
      //System.out.println("Changed partition on node: \t" + instance + "\t: \t" + newPartitions.toString());
    }
    /*
      List<String> instances = new ArrayList<>(newList.keySet());
      Collections.sort(instances);
      System.out.println("Addition partition count: ");
      for (String instance : instances) {
        System.out.println(addition.containsKey(instance) ? addition.get(instance) : 0);
      }

      System.out.println("Subtraction partition count: ");
      for (String instance : instances) {
        System.out.println(subtraction.containsKey(instance) ? subtraction.get(instance) : 0);
      }
    */

    // Movement on the delta nodes themselves is unavoidable ("necessary"):
    // replicas leaving a removed node or landing on an added node.
    int nodeChanged = 0;
    int necessaryChange = 0;
    int necessarytopStateChange = 0;
    for (String instance : deltaNodes) {
      nodeChanged++;
      if (list.containsKey(instance)) {
        necessaryChange += list.get(instance).size();
        for (Map<String, String> nodeState : map.values()) {
          if (nodeState.containsKey(instance)) {
            if (nodeState.get(instance).equalsIgnoreCase(topState)) {
              necessarytopStateChange++;
            }
          }
        }
      }
      if (newList.containsKey(instance)) {
        necessaryChange += newList.get(instance).size();
        for (Map<String, String> nodeState : newMap.values()) {
          if (nodeState.containsKey(instance)) {
            if (nodeState.get(instance).equalsIgnoreCase(topState)) {
              necessarytopStateChange++;
            }
          }
        }
      }
    }

    if (verbose) {
      System.out.println(
          "\t\t\t" + "Total partition change count: \t" + totalChange + "\t/\t" + totalPartition
              + "\t, rate: \t" + (((float) totalChange) / totalPartition * 100) + "%\t"
              + "Diff nodes have partition \t" + necessaryChange + "\t, unnecessary change rate: \t"
              + (((float) totalChange - necessaryChange) / totalPartition * 100)
              + "%\t, which is \t" + (((float) totalChange - necessaryChange) / totalChange * 100)
              + "%\t of the movement.");
    }

    // Ideal average number of replicas a node hosts, times the delta node count.
    double expectedAverageMv =
        (double) totalPartition / Math.max(list.size(), newList.size()) * deltaNodes.size();

    return new double[] { nodeChanged, totalChange, (((double) totalChange) / totalPartition * 100),
        (((double) totalChange - necessaryChange) / totalPartition * 100), totalTopStateChange,
        (((double) totalTopStateChange) / totalTopState * 100),
        (((double) totalTopStateChange - necessarytopStateChange) / totalTopState * 100),
        expectedAverageMv, (((double) totalChange - expectedAverageMv) / totalPartition * 100),
        totalTopStateChangeWithNewDeployment,
        (((double) totalTopStateChangeWithNewDeployment) / totalTopState * 100)
    };
  }
+
+  private Map<String, Set<String>> convertMapping(Map<String, Map<String, 
String>> map) {
+    Map<String, Set<String>> list = new HashMap<>();
+    for (String partition : map.keySet()) {
+      for (String instance : map.get(partition).keySet()) {
+        if (!list.containsKey(instance)) {
+          list.put(instance, new HashSet<String>());
+        }
+        list.get(instance).add(partition);
+      }
+    }
+    return list;
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/9dfb098e/helix-core/src/test/java/org/apache/helix/integration/rebalancer/CrushRebalancers/TestCrushAutoRebalance.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/CrushRebalancers/TestCrushAutoRebalance.java
 
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/CrushRebalancers/TestCrushAutoRebalance.java
index d886e44..08608f3 100644
--- 
a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/CrushRebalancers/TestCrushAutoRebalance.java
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/CrushRebalancers/TestCrushAutoRebalance.java
@@ -26,8 +26,8 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import org.apache.helix.controller.rebalancer.strategy.CrushRebalanceStrategy;
-import 
org.apache.helix.controller.rebalancer.strategy.MultiRoundCrushRebalanceStrategy;
+
+import org.apache.helix.controller.rebalancer.strategy.*;
 import org.apache.helix.integration.common.ZkIntegrationTestBase;
 import org.apache.helix.integration.manager.ClusterControllerManager;
 import org.apache.helix.integration.manager.MockParticipantManager;
@@ -111,7 +111,8 @@ public class TestCrushAutoRebalance extends 
ZkIntegrationTestBase {
   @DataProvider(name = "rebalanceStrategies")
   public static Object [][] rebalanceStrategies() {
     return new String[][] { {"CrushRebalanceStrategy", 
CrushRebalanceStrategy.class.getName()},
-        {"MultiRoundCrushRebalanceStrategy", 
MultiRoundCrushRebalanceStrategy.class.getName()}
+        {"MultiRoundCrushRebalanceStrategy", 
MultiRoundCrushRebalanceStrategy.class.getName()},
+        {"CrushEdRebalanceStrategy", CrushEdRebalanceStrategy.class.getName()}
     };
   }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/9dfb098e/helix-core/src/test/java/org/apache/helix/integration/rebalancer/CrushRebalancers/TestCrushAutoRebalanceNonRack.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/CrushRebalancers/TestCrushAutoRebalanceNonRack.java
 
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/CrushRebalancers/TestCrushAutoRebalanceNonRack.java
index c11ee87..85059b6 100644
--- 
a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/CrushRebalancers/TestCrushAutoRebalanceNonRack.java
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/CrushRebalancers/TestCrushAutoRebalanceNonRack.java
@@ -19,33 +19,25 @@ package 
org.apache.helix.integration.rebalancer.CrushRebalancers;
  * under the License.
  */
 
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
 import org.apache.helix.ConfigAccessor;
+import 
org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy;
 import org.apache.helix.controller.rebalancer.strategy.CrushRebalanceStrategy;
 import org.apache.helix.integration.common.ZkStandAloneCMTestBase;
 import org.apache.helix.integration.manager.ClusterControllerManager;
 import org.apache.helix.integration.manager.MockParticipantManager;
-import org.apache.helix.model.BuiltInStateModelDefinitions;
-import org.apache.helix.model.ClusterConfig;
-import org.apache.helix.model.ExternalView;
-import org.apache.helix.model.IdealState;
+import org.apache.helix.model.*;
 import org.apache.helix.model.IdealState.RebalanceMode;
-import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.tools.ClusterSetup;
-import 
org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
 import org.apache.helix.tools.ClusterVerifiers.HelixClusterVerifier;
+import org.apache.helix.tools.ClusterVerifiers.StrictMatchExternalViewVerifier;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
+import java.util.*;
+
 public class TestCrushAutoRebalanceNonRack extends ZkStandAloneCMTestBase {
   final int NUM_NODE = 6;
   protected static final int START_PORT = 12918;
@@ -59,7 +51,7 @@ public class TestCrushAutoRebalanceNonRack extends 
ZkStandAloneCMTestBase {
   List<MockParticipantManager> _participants = new 
ArrayList<MockParticipantManager>();
   Map<String, String> _nodeToTagMap = new HashMap<String, String>();
   List<String> _nodes = new ArrayList<String>();
-  Set<String> _allDBs = new HashSet<String>();
+  Set<String> _allDBs = new HashSet<>();
   int _replica = 3;
 
   private static String[] _testModels = { 
BuiltInStateModelDefinitions.OnlineOffline.name(),
@@ -91,7 +83,8 @@ public class TestCrushAutoRebalanceNonRack extends 
ZkStandAloneCMTestBase {
       String tag = "tag-" + i % 2;
       _setupTool.getClusterManagementTool().addInstanceTag(CLUSTER_NAME, 
storageNodeName, tag);
       _nodeToTagMap.put(storageNodeName, tag);
-      InstanceConfig instanceConfig = 
configAccessor.getInstanceConfig(CLUSTER_NAME, storageNodeName);
+      InstanceConfig instanceConfig =
+          configAccessor.getInstanceConfig(CLUSTER_NAME, storageNodeName);
       instanceConfig.setDomain("instance=" + storageNodeName);
       configAccessor.setInstanceConfig(CLUSTER_NAME, storageNodeName, 
instanceConfig);
     }
@@ -112,13 +105,15 @@ public class TestCrushAutoRebalanceNonRack extends 
ZkStandAloneCMTestBase {
     //enableTopologyAwareRebalance(_gZkClient, CLUSTER_NAME, true);
   }
 
-  @DataProvider(name = "rebalanceStrategies") public static String[][] 
rebalanceStrategies() {
-    return new String[][] { { "CrushRebalanceStrategy", 
CrushRebalanceStrategy.class.getName() } };
+  @DataProvider(name = "rebalanceStrategies")
+  public static String[][] rebalanceStrategies() {
+    return new String[][] { { "CrushRebalanceStrategy", 
CrushRebalanceStrategy.class.getName() },
+        {"CrushEdRebalanceStrategy", CrushEdRebalanceStrategy.class.getName()}
+    };
   }
 
   @Test(dataProvider = "rebalanceStrategies", enabled = true)
-  public void test(String rebalanceStrategyName, String rebalanceStrategyClass)
-      throws Exception {
+  public void test(String rebalanceStrategyName, String 
rebalanceStrategyClass) throws Exception {
     System.out.println("Test " + rebalanceStrategyName);
     int i = 0;
     for (String stateModel : _testModels) {
@@ -131,10 +126,9 @@ public class TestCrushAutoRebalanceNonRack extends 
ZkStandAloneCMTestBase {
     Thread.sleep(300);
 
     HelixClusterVerifier _clusterVerifier =
-        new 
BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR)
+        new 
StrictMatchExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR)
             .setResources(_allDBs).build();
     Assert.assertTrue(_clusterVerifier.verify(5000));
-
     for (String db : _allDBs) {
       IdealState is = 
_setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
       ExternalView ev =
@@ -161,8 +155,8 @@ public class TestCrushAutoRebalanceNonRack extends 
ZkStandAloneCMTestBase {
     }
     Thread.sleep(300);
 
-      HelixClusterVerifier _clusterVerifier =
-        new 
BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR)
+    HelixClusterVerifier _clusterVerifier =
+        new 
StrictMatchExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR)
             .setResources(_allDBs).build();
     Assert.assertTrue(_clusterVerifier.verify(5000));
     for (String db : _allDBs) {
@@ -173,11 +167,12 @@ public class TestCrushAutoRebalanceNonRack extends 
ZkStandAloneCMTestBase {
     }
   }
 
-  @Test(dataProvider = "rebalanceStrategies", enabled = true, dependsOnMethods 
= { "test",
-      "testWithInstanceTag"})
+  @Test(dataProvider = "rebalanceStrategies", enabled = true, dependsOnMethods 
= {
+      "testWithInstanceTag"
+  })
   public void testLackEnoughLiveInstances(String rebalanceStrategyName,
       String rebalanceStrategyClass) throws Exception {
-    System.out.println("TestLackEnoughInstances " + rebalanceStrategyName);
+    System.out.println("TestLackEnoughLiveInstances " + rebalanceStrategyName);
     enablePersistBestPossibleAssignment(_gZkClient, CLUSTER_NAME, true);
 
     // shutdown participants, keep only two left
@@ -185,9 +180,9 @@ public class TestCrushAutoRebalanceNonRack extends 
ZkStandAloneCMTestBase {
       _participants.get(i).syncStop();
     }
 
-    int i = 0;
+    int j = 0;
     for (String stateModel : _testModels) {
-      String db = "Test-DB-" + rebalanceStrategyName + "-" + i++;
+      String db = "Test-DB-" + rebalanceStrategyName + "-" + j++;
       _setupTool.addResourceToCluster(CLUSTER_NAME, db, _PARTITIONS, 
stateModel,
           RebalanceMode.FULL_AUTO + "", rebalanceStrategyClass);
       _setupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica);
@@ -196,22 +191,26 @@ public class TestCrushAutoRebalanceNonRack extends 
ZkStandAloneCMTestBase {
     Thread.sleep(300);
 
     HelixClusterVerifier _clusterVerifier =
-        new 
BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR)
+        new 
StrictMatchExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR)
             .setResources(_allDBs).build();
     Assert.assertTrue(_clusterVerifier.verify(5000));
-
     for (String db : _allDBs) {
       IdealState is = 
_setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
       ExternalView ev =
           
_setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db);
       validateIsolation(is, ev, 2);
     }
+
+    for (int i = 2; i < _participants.size(); i++) {
+      _participants.get(i).syncStart();
+    }
   }
 
-  @Test(dataProvider = "rebalanceStrategies", enabled = true, dependsOnMethods 
= { "test",
-      "testWithInstanceTag"})
-  public void testLackEnoughInstances(String rebalanceStrategyName,
-      String rebalanceStrategyClass) throws Exception {
+  @Test(dataProvider = "rebalanceStrategies", enabled = true, dependsOnMethods 
= {
+      "testLackEnoughLiveInstances"
+  })
+  public void testLackEnoughInstances(String rebalanceStrategyName, String 
rebalanceStrategyClass)
+      throws Exception {
     System.out.println("TestLackEnoughInstances " + rebalanceStrategyName);
     enablePersistBestPossibleAssignment(_gZkClient, CLUSTER_NAME, true);
 
@@ -225,27 +224,42 @@ public class TestCrushAutoRebalanceNonRack extends 
ZkStandAloneCMTestBase {
       _setupTool.dropInstanceFromCluster(CLUSTER_NAME, p.getInstanceName());
     }
 
-    int i = 0;
+    int j = 0;
     for (String stateModel : _testModels) {
-      String db = "Test-DB-" + rebalanceStrategyName + "-" + i++;
+      String db = "Test-DB-" + rebalanceStrategyName + "-" + j++;
       _setupTool.addResourceToCluster(CLUSTER_NAME, db, _PARTITIONS, 
stateModel,
           RebalanceMode.FULL_AUTO + "", rebalanceStrategyClass);
       _setupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica);
       _allDBs.add(db);
     }
     Thread.sleep(300);
-
     HelixClusterVerifier _clusterVerifier =
-        new 
BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR)
+        new 
StrictMatchExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR)
             .setResources(_allDBs).build();
     Assert.assertTrue(_clusterVerifier.verify());
-
     for (String db : _allDBs) {
       IdealState is = 
_setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
       ExternalView ev =
           
_setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db);
       validateIsolation(is, ev, 2);
     }
+
+    // recover test environment
+    ConfigAccessor configAccessor = new ConfigAccessor(_gZkClient);
+    for (int i = 2; i < _participants.size(); i++) {
+      String storageNodeName = _participants.get(i).getInstanceName();
+      _setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
+
+      InstanceConfig instanceConfig =
+          configAccessor.getInstanceConfig(CLUSTER_NAME, storageNodeName);
+      instanceConfig.setDomain("instance=" + storageNodeName);
+      configAccessor.setInstanceConfig(CLUSTER_NAME, storageNodeName, 
instanceConfig);
+
+      MockParticipantManager participant =
+          new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, storageNodeName);
+      participant.syncStart();
+      _participants.set(i, participant);
+    }
   }
 
   /**
@@ -267,7 +281,8 @@ public class TestCrushAutoRebalanceNonRack extends 
ZkStandAloneCMTestBase {
     }
   }
 
-  @AfterMethod public void afterMethod() throws Exception {
+  @AfterMethod
+  public void afterMethod() throws Exception {
     for (String db : _allDBs) {
       _setupTool.dropResourceFromCluster(CLUSTER_NAME, db);
     }

Reply via email to