[HELIX-568] Add new topology aware (rack-aware) rebalance strategy based on 
CRUSH algorithm.
Design doc is available at: 
https://cwiki.apache.org/confluence/display/HELIX/Helix+Topology-aware+Rebalance+Strategy


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/7147ec87
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/7147ec87
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/7147ec87

Branch: refs/heads/helix-0.6.x
Commit: 7147ec874e912f27905c299fefe0d09ca31ebd42
Parents: ea0fbbb
Author: Lei Xia <[email protected]>
Authored: Thu Jun 16 12:06:34 2016 -0700
Committer: Lei Xia <[email protected]>
Committed: Mon Sep 12 10:06:33 2016 -0700

----------------------------------------------------------------------
 .../main/java/org/apache/helix/HelixAdmin.java  |  30 +
 .../java/org/apache/helix/HelixConstants.java   |   4 +
 .../main/java/org/apache/helix/PropertyKey.java |   3 +-
 .../controller/rebalancer/AutoRebalancer.java   |  15 +-
 .../helix/controller/rebalancer/Rebalancer.java |   1 -
 .../strategy/AutoRebalanceStrategy.java         | 754 ++++++++++++++++++
 .../strategy/CrushRebalanceStrategy.java        | 174 +++++
 .../rebalancer/strategy/RebalanceStrategy.java  |  57 ++
 .../crushMapping/CRUSHPlacementAlgorithm.java   | 316 ++++++++
 .../strategy/crushMapping/JenkinsHash.java      | 140 ++++
 .../controller/rebalancer/topology/Node.java    | 208 +++++
 .../rebalancer/topology/Topology.java           | 295 +++++++
 .../controller/stages/ClusterDataCache.java     |  14 +-
 .../strategy/AutoRebalanceStrategy.java         | 753 ------------------
 .../controller/strategy/RebalanceStrategy.java  |  52 --
 .../apache/helix/manager/zk/ZKHelixAdmin.java   |  43 +-
 .../org/apache/helix/model/ClusterConfig.java   |  92 +++
 .../java/org/apache/helix/model/IdealState.java |   2 +-
 .../org/apache/helix/model/InstanceConfig.java  |  50 +-
 .../task/GenericTaskAssignmentCalculator.java   |   7 +-
 .../org/apache/helix/tools/ClusterSetup.java    |   6 +
 .../Strategy/TestAutoRebalanceStrategy.java     | 766 +++++++++++++++++++
 .../strategy/TestAutoRebalanceStrategy.java     | 765 ------------------
 .../helix/controller/strategy/TestTopology.java | 172 +++++
 .../integration/TestCrushAutoRebalance.java     | 221 ++++++
 .../manager/MockParticipantManager.java         |  10 +-
 26 files changed, 3355 insertions(+), 1595 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/7147ec87/helix-core/src/main/java/org/apache/helix/HelixAdmin.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/HelixAdmin.java 
b/helix-core/src/main/java/org/apache/helix/HelixAdmin.java
index fbfab26..aeacd4b 100644
--- a/helix-core/src/main/java/org/apache/helix/HelixAdmin.java
+++ b/helix-core/src/main/java/org/apache/helix/HelixAdmin.java
@@ -114,6 +114,18 @@ public interface HelixAdmin {
       String stateModelRef, String rebalancerMode);
 
   /**
+   * Add a resource to a cluster
+   * @param clusterName
+   * @param resourceName
+   * @param numPartitions
+   * @param stateModelRef
+   * @param rebalancerMode
+   * @param rebalanceStrategy
+   */
+  void addResource(String clusterName, String resourceName, int numPartitions,
+      String stateModelRef, String rebalancerMode, String rebalanceStrategy);
+
+  /**
    * Add a resource to a cluster, using a bucket size > 1
    * @param clusterName
    * @param resourceName
@@ -138,6 +150,22 @@ public interface HelixAdmin {
   void addResource(String clusterName, String resourceName, int numPartitions,
       String stateModelRef, String rebalancerMode, int bucketSize, int 
maxPartitionsPerInstance);
 
+
+  /**
+   * Add a resource to a cluster, using a bucket size > 1
+   * @param clusterName
+   * @param resourceName
+   * @param numPartitions
+   * @param stateModelRef
+   * @param rebalancerMode
+   * @param rebalanceStrategy
+   * @param bucketSize
+   * @param maxPartitionsPerInstance
+   */
+  void addResource(String clusterName, String resourceName, int numPartitions,
+      String stateModelRef, String rebalancerMode, String rebalanceStrategy, 
int bucketSize,
+      int maxPartitionsPerInstance);
+
   /**
    * Add an instance to a cluster
    * @param clusterName
@@ -411,6 +439,8 @@ public interface HelixAdmin {
    */
   void removeInstanceTag(String clusterName, String instanceName, String tag);
 
+  void setInstanceZoneId(String clusterName, String instanceName, String 
zoneId);
+
   /**
    * Release resources
    */

http://git-wip-us.apache.org/repos/asf/helix/blob/7147ec87/helix-core/src/main/java/org/apache/helix/HelixConstants.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/HelixConstants.java 
b/helix-core/src/main/java/org/apache/helix/HelixConstants.java
index 5318fa9..6de0ff1 100644
--- a/helix-core/src/main/java/org/apache/helix/HelixConstants.java
+++ b/helix-core/src/main/java/org/apache/helix/HelixConstants.java
@@ -43,6 +43,10 @@ public interface HelixConstants {
     ANY_LIVEINSTANCE
   }
 
+  /**
+   * Replaced by ClusterConfig.ClusterConfigProperty.
+   */
+  @Deprecated
   enum ClusterConfigType {
     HELIX_DISABLE_PIPELINE_TRIGGERS,
     DISABLE_FULL_AUTO // override all resources in the cluster to use 
SEMI-AUTO instead of FULL-AUTO

http://git-wip-us.apache.org/repos/asf/helix/blob/7147ec87/helix-core/src/main/java/org/apache/helix/PropertyKey.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/PropertyKey.java 
b/helix-core/src/main/java/org/apache/helix/PropertyKey.java
index 33355f1..0125902 100644
--- a/helix-core/src/main/java/org/apache/helix/PropertyKey.java
+++ b/helix-core/src/main/java/org/apache/helix/PropertyKey.java
@@ -38,6 +38,7 @@ import static 
org.apache.helix.PropertyType.STATUSUPDATES_CONTROLLER;
 
 import java.util.Arrays;
 
+import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.ClusterConstraints;
 import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.Error;
@@ -186,7 +187,7 @@ public class PropertyKey {
      * @return {@link PropertyKey}
      */
     public PropertyKey clusterConfig() {
-      return new PropertyKey(CONFIGS, ConfigScopeProperty.CLUSTER, 
HelixProperty.class,
+      return new PropertyKey(CONFIGS, ConfigScopeProperty.CLUSTER, 
ClusterConfig.class,
           _clusterName, ConfigScopeProperty.CLUSTER.toString(), _clusterName);
     }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/7147ec87/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java
 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java
index 6682426..ba237b1 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java
@@ -35,8 +35,8 @@ import 
org.apache.helix.controller.rebalancer.internal.MappingCalculator;
 import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment;
 import org.apache.helix.controller.stages.ClusterDataCache;
 import org.apache.helix.controller.stages.CurrentStateOutput;
-import org.apache.helix.controller.strategy.AutoRebalanceStrategy;
-import org.apache.helix.controller.strategy.RebalanceStrategy;
+import org.apache.helix.controller.rebalancer.strategy.AutoRebalanceStrategy;
+import org.apache.helix.controller.rebalancer.strategy.RebalanceStrategy;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.IdealState.RebalanceMode;
 import org.apache.helix.model.LiveInstance;
@@ -79,8 +79,8 @@ public class AutoRebalancer implements Rebalancer, 
MappingCalculator {
     Map<String, LiveInstance> liveInstance = clusterData.getLiveInstances();
     String replicas = currentIdealState.getReplicas();
 
-    LinkedHashMap<String, Integer> stateCountMap = new LinkedHashMap<String, 
Integer>();
-    stateCountMap = stateCount(stateModelDef, liveInstance.size(), 
Integer.parseInt(replicas));
+    LinkedHashMap<String, Integer> stateCountMap =
+        stateCount(stateModelDef, liveInstance.size(), 
Integer.parseInt(replicas));
     List<String> liveNodes = new ArrayList<String>(liveInstance.keySet());
     List<String> allNodes = new 
ArrayList<String>(clusterData.getInstanceConfigMap().keySet());
     allNodes.removeAll(clusterData.getDisabledInstances());
@@ -129,7 +129,8 @@ public class AutoRebalancer implements Rebalancer, 
MappingCalculator {
     int maxPartition = currentIdealState.getMaxPartitionsPerInstance();
 
     String rebalanceStrategyName = currentIdealState.getRebalanceStrategy();
-    if (rebalanceStrategyName == null || 
rebalanceStrategyName.equalsIgnoreCase("default")) {
+    if (rebalanceStrategyName == null || rebalanceStrategyName
+        .equalsIgnoreCase(RebalanceStrategy.DEFAULT_REBALANCE_STRATEGY)) {
       _rebalanceStrategy =
           new AutoRebalanceStrategy(resourceName, partitions, stateCountMap, 
maxPartition);
     } else {
@@ -152,8 +153,8 @@ public class AutoRebalancer implements Rebalancer, 
MappingCalculator {
       }
     }
 
-    ZNRecord newMapping =
-        _rebalanceStrategy.computePartitionAssignment(liveNodes, 
currentMapping, allNodes);
+    ZNRecord newMapping = _rebalanceStrategy
+        .computePartitionAssignment(allNodes, liveNodes, currentMapping, 
clusterData);
 
     if (LOG.isDebugEnabled()) {
       LOG.debug("currentMapping: " + currentMapping);

http://git-wip-us.apache.org/repos/asf/helix/blob/7147ec87/helix-core/src/main/java/org/apache/helix/controller/rebalancer/Rebalancer.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/Rebalancer.java
 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/Rebalancer.java
index f5a4ae8..6935378 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/Rebalancer.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/Rebalancer.java
@@ -46,5 +46,4 @@ public interface Rebalancer {
    */
   IdealState computeNewIdealState(String resourceName, IdealState 
currentIdealState,
       final CurrentStateOutput currentStateOutput, final ClusterDataCache 
clusterData);
-
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/7147ec87/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/AutoRebalanceStrategy.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/AutoRebalanceStrategy.java
 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/AutoRebalanceStrategy.java
new file mode 100644
index 0000000..868d207
--- /dev/null
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/AutoRebalanceStrategy.java
@@ -0,0 +1,754 @@
+package org.apache.helix.controller.rebalancer.strategy;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.controller.stages.ClusterDataCache;
+import org.apache.log4j.Logger;
+
+public class AutoRebalanceStrategy implements RebalanceStrategy {
+  private static Logger logger = Logger.getLogger(AutoRebalanceStrategy.class);
+  private final ReplicaPlacementScheme _placementScheme;
+
+  private String _resourceName;
+  private List<String> _partitions;
+  private LinkedHashMap<String, Integer> _states;
+  private int _maximumPerNode;
+
+  private Map<String, Node> _nodeMap;
+  private List<Node> _liveNodesList;
+  private Map<Integer, String> _stateMap;
+
+  private Map<Replica, Node> _preferredAssignment;
+  private Map<Replica, Node> _existingPreferredAssignment;
+  private Map<Replica, Node> _existingNonPreferredAssignment;
+  private Set<Replica> _orphaned;
+
+  public AutoRebalanceStrategy(String resourceName, final List<String> 
partitions,
+      final LinkedHashMap<String, Integer> states, int maximumPerNode) {
+    init(resourceName, partitions, states, maximumPerNode);
+    _placementScheme = new DefaultPlacementScheme();
+  }
+
+  public AutoRebalanceStrategy(String resourceName, final List<String> 
partitions,
+      final LinkedHashMap<String, Integer> states) {
+    this(resourceName, partitions, states, Integer.MAX_VALUE);
+  }
+
+  @Override
+  public void init(String resourceName, final List<String> partitions,
+      final LinkedHashMap<String, Integer> states, int maximumPerNode) {
+    _resourceName = resourceName;
+    _partitions = partitions;
+    _states = states;
+    _maximumPerNode = maximumPerNode;
+  }
+
+  @Override
+  public ZNRecord computePartitionAssignment(final List<String> allNodes, 
final List<String> liveNodes,
+      final Map<String, Map<String, String>> currentMapping, ClusterDataCache 
clusterData) {
+    int numReplicas = countStateReplicas();
+    ZNRecord znRecord = new ZNRecord(_resourceName);
+    if (liveNodes.size() == 0) {
+      return znRecord;
+    }
+    int distRemainder = (numReplicas * _partitions.size()) % liveNodes.size();
+    int distFloor = (numReplicas * _partitions.size()) / liveNodes.size();
+    _nodeMap = new HashMap<String, Node>();
+    _liveNodesList = new ArrayList<Node>();
+
+    for (String id : allNodes) {
+      Node node = new Node(id);
+      node.capacity = 0;
+      node.hasCeilingCapacity = false;
+      _nodeMap.put(id, node);
+    }
+    for (int i = 0; i < liveNodes.size(); i++) {
+      boolean usingCeiling = false;
+      int targetSize = (_maximumPerNode > 0) ? Math.min(distFloor, 
_maximumPerNode) : distFloor;
+      if (distRemainder > 0 && targetSize < _maximumPerNode) {
+        targetSize += 1;
+        distRemainder = distRemainder - 1;
+        usingCeiling = true;
+      }
+      Node node = _nodeMap.get(liveNodes.get(i));
+      node.isAlive = true;
+      node.capacity = targetSize;
+      node.hasCeilingCapacity = usingCeiling;
+      _liveNodesList.add(node);
+    }
+
+    // compute states for all replica ids
+    _stateMap = generateStateMap();
+
+    // compute the preferred mapping if all nodes were up
+    _preferredAssignment = computePreferredPlacement(allNodes);
+
+    // logger.info("preferred mapping:"+ preferredAssignment);
+    // from current mapping derive the ones in preferred location
+    // this will update the nodes with their current fill status
+    _existingPreferredAssignment = 
computeExistingPreferredPlacement(currentMapping);
+
+    // from current mapping derive the ones not in preferred location
+    _existingNonPreferredAssignment = 
computeExistingNonPreferredPlacement(currentMapping);
+
+    // compute orphaned replicas that are not assigned to any node
+    _orphaned = computeOrphaned();
+    if (logger.isInfoEnabled()) {
+      logger.info("orphan = " + _orphaned);
+    }
+
+    moveNonPreferredReplicasToPreferred();
+
+    assignOrphans();
+
+    moveExcessReplicas();
+
+    prepareResult(znRecord);
+    return znRecord;
+  }
+
+  /**
+   * Move replicas assigned to non-preferred nodes if their current node is over
+   * capacity and their preferred node is under capacity.
+   */
+  private void moveNonPreferredReplicasToPreferred() {
+    // iterate through non preferred and see if we can move them to the
+    // preferred location if the donor has more than it should and stealer has
+    // enough capacity
+    Iterator<Entry<Replica, Node>> iterator = 
_existingNonPreferredAssignment.entrySet().iterator();
+    while (iterator.hasNext()) {
+      Entry<Replica, Node> entry = iterator.next();
+      Replica replica = entry.getKey();
+      Node donor = entry.getValue();
+      Node receiver = _preferredAssignment.get(replica);
+      if (donor.capacity < donor.currentlyAssigned
+          && receiver.capacity > receiver.currentlyAssigned && 
receiver.canAdd(replica)) {
+        donor.currentlyAssigned = donor.currentlyAssigned - 1;
+        receiver.currentlyAssigned = receiver.currentlyAssigned + 1;
+        donor.nonPreferred.remove(replica);
+        receiver.preferred.add(replica);
+        donor.newReplicas.remove(replica);
+        receiver.newReplicas.add(replica);
+        iterator.remove();
+      }
+    }
+  }
+
+  /**
+   * Slot in orphaned partitions randomly so as to maintain even load on live 
nodes.
+   */
+  private void assignOrphans() {
+    // now iterate over nodes and remaining orphaned partitions and assign
+    // partitions randomly
+    // Better to iterate over orphaned partitions first
+    Iterator<Replica> it = _orphaned.iterator();
+    while (it.hasNext()) {
+      Replica replica = it.next();
+      boolean added = false;
+      int startIndex = computeRandomStartIndex(replica);
+      for (int index = startIndex; index < startIndex + _liveNodesList.size(); 
index++) {
+        Node receiver = _liveNodesList.get(index % _liveNodesList.size());
+        if (receiver.capacity > receiver.currentlyAssigned && 
receiver.canAdd(replica)) {
+          receiver.currentlyAssigned = receiver.currentlyAssigned + 1;
+          receiver.nonPreferred.add(replica);
+          receiver.newReplicas.add(replica);
+          added = true;
+          break;
+        }
+      }
+      if (!added) {
+        // try adding the replica by making room for it
+        added = assignOrphanByMakingRoom(replica);
+      }
+      if (added) {
+        it.remove();
+      }
+    }
+    if (_orphaned.size() > 0 && logger.isInfoEnabled()) {
+      logger.info("could not assign nodes to partitions: " + _orphaned);
+    }
+  }
+
+  /**
+   * If an orphan can't be assigned normally, see if a node can borrow 
capacity to accept it
+   * @param replica The replica to assign
+   * @return true if the assignment succeeded, false otherwise
+   */
+  private boolean assignOrphanByMakingRoom(Replica replica) {
+    Node capacityDonor = null;
+    Node capacityAcceptor = null;
+    int startIndex = computeRandomStartIndex(replica);
+    for (int index = startIndex; index < startIndex + _liveNodesList.size(); 
index++) {
+      Node current = _liveNodesList.get(index % _liveNodesList.size());
+      if (current.hasCeilingCapacity && current.capacity > 
current.currentlyAssigned
+          && !current.canAddIfCapacity(replica) && capacityDonor == null) {
+        // this node has space but cannot accept the replica
+        capacityDonor = current;
+      } else if (!current.hasCeilingCapacity && current.capacity == 
current.currentlyAssigned
+          && current.canAddIfCapacity(replica) && capacityAcceptor == null) {
+        // this node would be able to accept the replica if it has ceiling 
capacity
+        capacityAcceptor = current;
+      }
+      if (capacityDonor != null && capacityAcceptor != null) {
+        break;
+      }
+    }
+    if (capacityDonor != null && capacityAcceptor != null) {
+      // transfer ceiling capacity and add the replica
+      capacityAcceptor.steal(capacityDonor, replica);
+      return true;
+    }
+    return false;
+  }
+
+  /**
+   * Move replicas from too-full nodes to nodes that can accept the replicas
+   */
+  private void moveExcessReplicas() {
+    // iterate over nodes and move extra load
+    Iterator<Replica> it;
+    for (Node donor : _liveNodesList) {
+      if (donor.capacity < donor.currentlyAssigned) {
+        Collections.sort(donor.nonPreferred);
+        it = donor.nonPreferred.iterator();
+        while (it.hasNext()) {
+          Replica replica = it.next();
+          int startIndex = computeRandomStartIndex(replica);
+          for (int index = startIndex; index < startIndex + 
_liveNodesList.size(); index++) {
+            Node receiver = _liveNodesList.get(index % _liveNodesList.size());
+            if (receiver.canAdd(replica)) {
+              receiver.currentlyAssigned = receiver.currentlyAssigned + 1;
+              receiver.nonPreferred.add(replica);
+              donor.currentlyAssigned = donor.currentlyAssigned - 1;
+              it.remove();
+              break;
+            }
+          }
+          if (donor.capacity >= donor.currentlyAssigned) {
+            break;
+          }
+        }
+        if (donor.capacity < donor.currentlyAssigned) {
+          logger.warn("Could not take partitions out of node:" + donor.id);
+        }
+      }
+    }
+  }
+
+  /**
+   * Update a ZNRecord with the results of the rebalancing.
+   * @param znRecord
+   */
+  private void prepareResult(ZNRecord znRecord) {
+    // The map fields are keyed on partition name to a pair of node and state, 
i.e. it
+    // indicates that the partition with given state is served by that node
+    //
+    // The list fields are also keyed on partition and list all the nodes 
serving that partition.
+    // This is useful to verify that there is no node serving multiple 
replicas of the same
+    // partition.
+    Map<String, List<String>> newPreferences = new TreeMap<String, 
List<String>>();
+    for (String partition : _partitions) {
+      znRecord.setMapField(partition, new TreeMap<String, String>());
+      znRecord.setListField(partition, new ArrayList<String>());
+      newPreferences.put(partition, new ArrayList<String>());
+    }
+
+    // for preference lists, the rough priority that we want is:
+    // [existing preferred, existing non-preferred, non-existing preferred, 
non-existing
+    // non-preferred]
+    for (Node node : _liveNodesList) {
+      for (Replica replica : node.preferred) {
+        if (node.newReplicas.contains(replica)) {
+          newPreferences.get(replica.partition).add(node.id);
+        } else {
+          znRecord.getListField(replica.partition).add(node.id);
+        }
+      }
+    }
+    for (Node node : _liveNodesList) {
+      for (Replica replica : node.nonPreferred) {
+        if (node.newReplicas.contains(replica)) {
+          newPreferences.get(replica.partition).add(node.id);
+        } else {
+          znRecord.getListField(replica.partition).add(node.id);
+        }
+      }
+    }
+    normalizePreferenceLists(znRecord.getListFields(), newPreferences);
+
+    // generate preference maps based on the preference lists
+    for (String partition : _partitions) {
+      List<String> preferenceList = znRecord.getListField(partition);
+      int i = 0;
+      for (String participant : preferenceList) {
+        znRecord.getMapField(partition).put(participant, _stateMap.get(i));
+        i++;
+      }
+    }
+  }
+
+  /**
+   * Adjust preference lists to reduce the number of same replicas on an 
instance. This will
+   * separately normalize two sets of preference lists, and then append the 
results of the second
+   * set to those of the first. This basically ensures that existing replicas 
are automatically
+   * preferred.
+   * @param preferenceLists map of (partition --> list of nodes)
+   * @param newPreferences map containing node preferences not consistent with 
the current
+   *          assignment
+   */
+  private void normalizePreferenceLists(Map<String, List<String>> 
preferenceLists,
+      Map<String, List<String>> newPreferences) {
+
+    Map<String, Map<String, Integer>> nodeReplicaCounts =
+        new HashMap<String, Map<String, Integer>>();
+    for (String partition : preferenceLists.keySet()) {
+      normalizePreferenceList(preferenceLists.get(partition), 
nodeReplicaCounts);
+    }
+    for (String partition : newPreferences.keySet()) {
+      normalizePreferenceList(newPreferences.get(partition), 
nodeReplicaCounts);
+      preferenceLists.get(partition).addAll(newPreferences.get(partition));
+    }
+  }
+
+  /**
+   * Adjust a single preference list for replica assignment imbalance
+   * @param preferenceList list of node names
+   * @param nodeReplicaCounts map of (node --> state --> count)
+   */
+  private void normalizePreferenceList(List<String> preferenceList,
+      Map<String, Map<String, Integer>> nodeReplicaCounts) {
+    List<String> newPreferenceList = new ArrayList<String>();
+    int replicas = Math.min(countStateReplicas(), preferenceList.size());
+
+    // make this a LinkedHashSet to preserve iteration order
+    Set<String> notAssigned = new LinkedHashSet<String>(preferenceList);
+    for (int i = 0; i < replicas; i++) {
+      String state = _stateMap.get(i);
+      String node = getMinimumNodeForReplica(state, notAssigned, 
nodeReplicaCounts);
+      newPreferenceList.add(node);
+      notAssigned.remove(node);
+      Map<String, Integer> counts = nodeReplicaCounts.get(node);
+      counts.put(state, counts.get(state) + 1);
+    }
+    preferenceList.clear();
+    preferenceList.addAll(newPreferenceList);
+  }
+
+  /**
+   * Get the node which hosts the fewest of a given replica
+   * @param state the state
+   * @param nodes nodes to check
+   * @param nodeReplicaCounts current assignment of replicas
+   * @return the node most willing to accept the replica
+   */
+  private String getMinimumNodeForReplica(String state, Set<String> nodes,
+      Map<String, Map<String, Integer>> nodeReplicaCounts) {
+    String minimalNode = null;
+    int minimalCount = Integer.MAX_VALUE;
+    for (String node : nodes) {
+      int count = getReplicaCountForNode(state, node, nodeReplicaCounts);
+      if (count < minimalCount) {
+        minimalCount = count;
+        minimalNode = node;
+      }
+    }
+    return minimalNode;
+  }
+
+  /**
+   * Safe check for the number of replicas of a given id assigned to a node
+   * @param state the state to assign
+   * @param node the node to check
+   * @param nodeReplicaCounts a map of node to replica id and counts
+   * @return the number of currently assigned replicas of the given id
+   */
+  private int getReplicaCountForNode(String state, String node,
+      Map<String, Map<String, Integer>> nodeReplicaCounts) {
+    if (!nodeReplicaCounts.containsKey(node)) {
+      Map<String, Integer> replicaCounts = new HashMap<String, Integer>();
+      replicaCounts.put(state, 0);
+      nodeReplicaCounts.put(node, replicaCounts);
+      return 0;
+    }
+    Map<String, Integer> replicaCounts = nodeReplicaCounts.get(node);
+    if (!replicaCounts.containsKey(state)) {
+      replicaCounts.put(state, 0);
+      return 0;
+    }
+    return replicaCounts.get(state);
+  }
+
+  /**
+   * Compute the subset of the current mapping where replicas are not mapped 
according to their
+   * preferred assignment.
+   * @param currentMapping Current mapping of replicas to nodes
+   * @return The current assignments that do not conform to the preferred 
assignment
+   */
+  private Map<Replica, Node> computeExistingNonPreferredPlacement(
+      Map<String, Map<String, String>> currentMapping) {
+    Map<Replica, Node> existingNonPreferredAssignment = new TreeMap<Replica, 
Node>();
+    int count = countStateReplicas();
+    for (String partition : currentMapping.keySet()) {
+      Map<String, String> nodeStateMap = currentMapping.get(partition);
+      nodeStateMap.keySet().retainAll(_nodeMap.keySet());
+      for (String nodeId : nodeStateMap.keySet()) {
+        Node node = _nodeMap.get(nodeId);
+        boolean skip = false;
+        for (Replica replica : node.preferred) {
+          if (replica.partition.equals(partition)) {
+            skip = true;
+            break;
+          }
+        }
+        if (skip) {
+          continue;
+        }
+        // check if it's in one of the preferred positions
+        for (int replicaId = 0; replicaId < count; replicaId++) {
+          Replica replica = new Replica(partition, replicaId);
+          if (!_preferredAssignment.containsKey(replica)) {
+
+            logger.info("partitions: " + _partitions);
+            logger.info("currentMapping.keySet: " + currentMapping.keySet());
+            throw new IllegalArgumentException("partition: " + replica + " is 
in currentMapping but not in partitions");
+          }
+
+          if (_preferredAssignment.get(replica).id != node.id
+              && !_existingPreferredAssignment.containsKey(replica)
+              && !existingNonPreferredAssignment.containsKey(replica)) {
+            existingNonPreferredAssignment.put(replica, node);
+            node.nonPreferred.add(replica);
+
+            break;
+          }
+        }
+      }
+    }
+    return existingNonPreferredAssignment;
+  }
+
+  /**
+   * Get a live node index to try first for a replica so that each possible 
start index is
+   * roughly uniformly assigned.
+   * @param replica The replica to assign
+   * @return The starting node index to try
+   */
+  private int computeRandomStartIndex(final Replica replica) {
+    return (replica.hashCode() & 0x7FFFFFFF) % _liveNodesList.size();
+  }
+
+  /**
+   * Get a set of replicas not currently assigned to any node
+   * @return Unassigned replicas
+   */
+  private Set<Replica> computeOrphaned() {
+    Set<Replica> orphanedPartitions = new 
TreeSet<Replica>(_preferredAssignment.keySet());
+    for (Replica r : _existingPreferredAssignment.keySet()) {
+      if (orphanedPartitions.contains(r)) {
+        orphanedPartitions.remove(r);
+      }
+    }
+    for (Replica r : _existingNonPreferredAssignment.keySet()) {
+      if (orphanedPartitions.contains(r)) {
+        orphanedPartitions.remove(r);
+      }
+    }
+
+    return orphanedPartitions;
+  }
+
+  /**
+   * Determine the replicas already assigned to their preferred nodes
+   * @param currentMapping Current assignment of replicas to nodes
+   * @return Assignments that conform to the preferred placement
+   */
+  private Map<Replica, Node> computeExistingPreferredPlacement(
+      final Map<String, Map<String, String>> currentMapping) {
+    Map<Replica, Node> existingPreferredAssignment = new TreeMap<Replica, 
Node>();
+    int count = countStateReplicas();
+    for (String partition : currentMapping.keySet()) {
+      Map<String, String> nodeStateMap = currentMapping.get(partition);
+      nodeStateMap.keySet().retainAll(_nodeMap.keySet());
+      for (String nodeId : nodeStateMap.keySet()) {
+        Node node = _nodeMap.get(nodeId);
+        node.currentlyAssigned = node.currentlyAssigned + 1;
+        // check if it's in one of the preferred positions
+        for (int replicaId = 0; replicaId < count; replicaId++) {
+          Replica replica = new Replica(partition, replicaId);
+          if (_preferredAssignment.containsKey(replica)
+              && !existingPreferredAssignment.containsKey(replica)
+              && _preferredAssignment.get(replica).id == node.id) {
+            existingPreferredAssignment.put(replica, node);
+            node.preferred.add(replica);
+            break;
+          }
+        }
+      }
+    }
+
+    return existingPreferredAssignment;
+  }
+
+  /**
+   * Build the preferred replica-to-node assignment by asking the placement scheme for the
+   * location of every replica of every partition.
+   * @param allNodes Identifiers to all nodes, live and non-live
+   * @return Preferred assignment of replicas
+   */
+  private Map<Replica, Node> computePreferredPlacement(final List<String> allNodes) {
+    final int replicasPerPartition = countStateReplicas();
+    Map<Replica, Node> placement = new HashMap<Replica, Node>();
+    int partitionIndex = 0;
+    for (String partition : _partitions) {
+      for (int replicaIndex = 0; replicaIndex < replicasPerPartition; replicaIndex++) {
+        Replica replica = new Replica(partition, replicaIndex);
+        String owner = _placementScheme.getLocation(partitionIndex, replicaIndex,
+            _partitions.size(), replicasPerPartition, allNodes);
+        placement.put(replica, _nodeMap.get(owner));
+      }
+      partitionIndex++;
+    }
+    return placement;
+  }
+
+  /**
+   * Sums the counts stored in the state-count mapping.
+   * @return The total number of replicas over all states
+   */
+  private int countStateReplicas() {
+    int sum = 0;
+    for (Map.Entry<String, Integer> stateCount : _states.entrySet()) {
+      sum += stateCount.getValue();
+    }
+    return sum;
+  }
+
+  /**
+   * Assign consecutive replica ids to states, in the iteration order of the state-count
+   * mapping: each state receives as many ids as its configured count.
+   * @return Map: replica id -> state name
+   */
+  private Map<Integer, String> generateStateMap() {
+    Map<Integer, String> idToState = new HashMap<Integer, String>();
+    int nextReplicaId = 0;
+    for (Map.Entry<String, Integer> stateCount : _states.entrySet()) {
+      final String state = stateCount.getKey();
+      final Integer replicasInState = stateCount.getValue();
+      for (int assigned = 0; assigned < replicasInState; assigned++) {
+        idToState.put(nextReplicaId++, state);
+      }
+    }
+    return idToState;
+  }
+
+  /**
+   * A Node is an entity that can serve replicas. It tracks a capacity and the replicas
+   * assigned to it, so it can decide whether it can receive additional replicas.
+   */
+  class Node {
+    public int currentlyAssigned;
+    public int capacity;
+    public boolean hasCeilingCapacity;
+    private final String id;
+    boolean isAlive;
+    private final List<Replica> preferred;
+    private final List<Replica> nonPreferred;
+    private final Set<Replica> newReplicas;
+
+    public Node(String id) {
+      this.id = id;
+      this.isAlive = false;
+      this.currentlyAssigned = 0;
+      this.preferred = new ArrayList<Replica>();
+      this.nonPreferred = new ArrayList<Replica>();
+      this.newReplicas = new TreeSet<Replica>();
+    }
+
+    /**
+     * Check if this replica can be legally added to this node, taking the remaining
+     * capacity into account.
+     * @param replica The replica to test
+     * @return true if the assignment can be made, false otherwise
+     */
+    public boolean canAdd(Replica replica) {
+      return currentlyAssigned < capacity && canAddIfCapacity(replica);
+    }
+
+    /**
+     * Check if this replica can be legally added to this node, ignoring capacity: the node
+     * must be alive and must not already hold a replica of the same partition.
+     * @param replica The replica to test
+     * @return true if the assignment can be made, false otherwise
+     */
+    public boolean canAddIfCapacity(Replica replica) {
+      return isAlive && !holdsSamePartition(preferred, replica)
+          && !holdsSamePartition(nonPreferred, replica);
+    }
+
+    /**
+     * Tell whether any replica in the given list belongs to the candidate's partition.
+     */
+    private boolean holdsSamePartition(List<Replica> assigned, Replica candidate) {
+      for (Replica held : assigned) {
+        if (held.partition.equals(candidate.partition)) {
+          return true;
+        }
+      }
+      return false;
+    }
+
+    /**
+     * Receive a replica by stealing capacity from another Node
+     * @param donor The node that has excess capacity
+     * @param replica The replica to receive
+     */
+    public void steal(Node donor, Replica replica) {
+      // move one unit of capacity from the donor to this node
+      donor.capacity--;
+      donor.hasCeilingCapacity = false;
+      capacity++;
+      hasCeilingCapacity = true;
+      // record the replica as a newly received, non-preferred assignment
+      currentlyAssigned++;
+      nonPreferred.add(replica);
+      newReplicas.add(replica);
+    }
+
+    @Override
+    public String toString() {
+      return "##########\nname=" + id + "\npreferred:" + preferred.size() + "\nnonpreferred:"
+          + nonPreferred.size();
+    }
+  }
+
+  /**
+   * A Replica is identified by a partition of the resource plus a partition-relative
+   * replica id. Equality, hashing and ordering are all derived from the string
+   * "partition|replicaId".
+   */
+  class Replica implements Comparable<Replica> {
+    private String partition;
+    private int replicaId; // this is a partition-relative id
+    private String format;
+
+    public Replica(String partition, int replicaId) {
+      this.partition = partition;
+      this.replicaId = replicaId;
+      this.format = partition + "|" + replicaId;
+    }
+
+    @Override
+    public String toString() {
+      return format;
+    }
+
+    @Override
+    public boolean equals(Object that) {
+      return that instanceof Replica && format.equals(((Replica) that).format);
+    }
+
+    @Override
+    public int hashCode() {
+      return format.hashCode();
+    }
+
+    @Override
+    public int compareTo(Replica that) {
+      // a null argument is ordered after this replica rather than rejected
+      return (that == null) ? -1 : format.compareTo(that.format);
+    }
+  }
+
+  /**
+   * Hook for plugging in a custom way of computing which node a replica prefers.
+   */
+  public interface ReplicaPlacementScheme {
+    /**
+     * Initialize global state
+     * @param manager The instance to which this placement is associated
+     */
+    void init(HelixManager manager);
+
+    /**
+     * Given properties of this replica, determine the node it would prefer to be served by
+     * @param partitionId The current partition
+     * @param replicaId The current replica with respect to the current partition
+     * @param numPartitions The total number of partitions
+     * @param numReplicas The total number of replicas per partition
+     * @param nodeNames A list of identifiers of all nodes, live and non-live
+     * @return The name of the node that would prefer to serve this replica
+     */
+    String getLocation(int partitionId, int replicaId, int numPartitions, int numReplicas,
+        List<String> nodeNames);
+  }
+
+  /**
+   * Default placement: maps (partitionId, replicaId) to an index into the node list with
+   * modular arithmetic, choosing the formula by comparing the node count with the
+   * partition count.
+   */
+  public static class DefaultPlacementScheme implements ReplicaPlacementScheme {
+    @Override
+    public void init(final HelixManager manager) {
+      // do nothing since this is independent of the manager
+    }
+
+    @Override
+    public String getLocation(int partitionId, int replicaId, int numPartitions,
+        int numReplicas, final List<String> nodeNames) {
+      final int nodeCount = nodeNames.size();
+      final int stride = partitionId + replicaId * numPartitions;
+
+      if (nodeCount > numPartitions) {
+        // more nodes than partitions: assign replicas in partition order
+        return nodeNames.get(stride % nodeCount);
+      }
+      if (nodeCount == numPartitions) {
+        // same sizes: an extra replica offset is needed
+        return nodeNames.get((stride % nodeCount + replicaId) % nodeCount);
+      }
+      // fewer nodes than partitions: one replica at a time for each partition
+      return nodeNames.get((partitionId + replicaId) % nodeCount);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/7147ec87/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/CrushRebalanceStrategy.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/CrushRebalanceStrategy.java
 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/CrushRebalanceStrategy.java
new file mode 100644
index 0000000..a8fe107
--- /dev/null
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/CrushRebalanceStrategy.java
@@ -0,0 +1,174 @@
+package org.apache.helix.controller.rebalancer.strategy;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
+import org.apache.helix.HelixException;
+import org.apache.helix.ZNRecord;
+import 
org.apache.helix.controller.rebalancer.strategy.crushMapping.CRUSHPlacementAlgorithm;
+import 
org.apache.helix.controller.rebalancer.strategy.crushMapping.JenkinsHash;
+import org.apache.helix.controller.rebalancer.topology.Node;
+import org.apache.helix.controller.rebalancer.topology.Topology;
+import org.apache.helix.controller.stages.ClusterDataCache;
+import org.apache.helix.model.InstanceConfig;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * CRUSH-based partition mapping strategy.
+ */
+public class CrushRebalanceStrategy implements RebalanceStrategy {
+  private String _resourceName;
+  private List<String> _partitions;
+  private Topology _clusterTopo;
+  private int _replicas;
+
+  @Override
+  public void init(String resourceName, final List<String> partitions,
+      final LinkedHashMap<String, Integer> states, int maximumPerNode) {
+    // maximumPerNode is not read by this strategy
+    this._resourceName = resourceName;
+    this._partitions = partitions;
+    // the replica count is the sum of the per-state counts
+    this._replicas = countStateReplicas(states);
+  }
+
+  /**
+   * Compute the preference list of every partition of the resource: a topology is built
+   * from the given nodes and cluster data, and the partition name's hash code is used as
+   * the selection input for that partition.
+   *
+   * @param allNodes       All instances
+   * @param liveNodes      List of live instances
+   * @param currentMapping current replica mapping (not read by this strategy)
+   * @param clusterData    cluster data
+   * @return a record named after the resource whose list fields hold the preference lists
+   * @throws HelixException if a map can not be found
+   */
+  @Override
+  public ZNRecord computePartitionAssignment(final List<String> allNodes,
+      final List<String> liveNodes, final Map<String, Map<String, String>> currentMapping,
+      ClusterDataCache clusterData) throws HelixException {
+    Map<String, InstanceConfig> instanceConfigMap = clusterData.getInstanceConfigMap();
+    _clusterTopo =
+        new Topology(allNodes, liveNodes, instanceConfigMap, clusterData.getClusterConfig());
+    Node root = _clusterTopo.getRootNode();
+
+    Map<String, List<String>> preferenceLists = new HashMap<String, List<String>>();
+    for (String partition : _partitions) {
+      // apply the placement rules, seeded by the partition name's hash
+      List<Node> chosenNodes = select(root, partition.hashCode(), _replicas);
+
+      List<String> instanceNames = new ArrayList<String>();
+      for (Node chosen : chosenNodes) {
+        instanceNames.add(chosen.getName());
+      }
+      preferenceLists.put(partition, instanceNames);
+    }
+
+    ZNRecord assignment = new ZNRecord(_resourceName);
+    assignment.setListFields(preferenceLists);
+    return assignment;
+  }
+
+  /**
+   * Number of retries for finding an appropriate instance for a replica.
+   */
+  private static final int MAX_RETRY = 100;
+  private final JenkinsHash hashFun = new JenkinsHash();
+  private CRUSHPlacementAlgorithm placementAlgorithm = new 
CRUSHPlacementAlgorithm();
+
+  /**
+   * Select {@code rf} nodes while enforcing isolation on the configured fault zone.
+   * Each round asks for the nodes still missing; after an incomplete round the input is
+   * re-hashed so that the next round sees a different value.
+   * The caller either gets the expected number of nodes or an exception is thrown.
+   */
+  private List<Node> select(Node topNode, long data, int rf)
+      throws HelixException {
+    List<Node> picked = new ArrayList<Node>(rf);
+    Set<Node> usedZones = new HashSet<Node>();
+    long seed = data;
+    int remaining = rf;
+    int attempts = 0;
+    while (remaining > 0) {
+      doSelect(topNode, seed, remaining, picked, usedZones);
+      remaining = rf - picked.size();
+      if (remaining <= 0) {
+        break;
+      }
+      seed = hashFun.hash(seed); // create a different hash value for retrying
+      attempts++;
+      if (attempts >= MAX_RETRY) {
+        throw new HelixException(
+            String.format("could not find all mappings after %d tries", attempts));
+      }
+    }
+    return picked;
+  }
+
+  /**
+   * Run one selection round and append the chosen end nodes to {@code selectedNodes}.
+   * When the fault zone type differs from the end node type, up to {@code rf} not yet used
+   * zones are picked first (and recorded in {@code selectedZones}), then one end node is
+   * requested from each of them. Otherwise end nodes are picked directly under
+   * {@code topNode}, skipping those already selected.
+   *
+   * @param topNode       node the selection starts from
+   * @param input         selection input for this round
+   * @param rf            number of nodes requested in this round
+   * @param selectedNodes end nodes chosen so far; extended in place
+   * @param selectedZones fault zones used so far; extended in place
+   */
+  private void doSelect(Node topNode, long input, int rf, List<Node> selectedNodes,
+      Set<Node> selectedZones) {
+    String zoneType = _clusterTopo.getFaultZoneType();
+    String endNodeType = _clusterTopo.getEndNodeType();
+
+    if (!zoneType.equals(endNodeType)) {
+      // pick fault zones first
+      List<Node> zones = placementAlgorithm
+          .select(topNode, input, rf, zoneType, nodeAlreadySelected(selectedZones));
+      // remember the zones so that later rounds do not pick them again
+      selectedZones.addAll(zones);
+      // pick one end node from each fault zone.
+      for (Node zone : zones) {
+        List<Node> endNode = placementAlgorithm.select(zone, input, 1, endNodeType);
+        selectedNodes.addAll(endNode);
+      }
+    } else {
+      // pick end node directly; a typed snapshot replaces the former raw HashSet
+      Set<Node> alreadyPicked = new HashSet<Node>(selectedNodes);
+      List<Node> nodes = placementAlgorithm.select(topNode, input, rf, endNodeType,
+          nodeAlreadySelected(alreadyPicked));
+      selectedNodes.addAll(nodes);
+    }
+  }
+
+  /**
+   * Build a predicate that accepts only nodes (or zones) absent from the given set, so
+   * that already selected entries are rejected.
+   */
+  private Predicate<Node> nodeAlreadySelected(Set<Node> selectedNodes) {
+    Predicate<Node> isSelected = Predicates.in(selectedNodes);
+    return Predicates.not(isSelected);
+  }
+
+  /**
+   * Sums the counts of the given state-count mapping.
+   * @param stateCountMap mapping from state name to replica count
+   * @return the total number of replicas over all states
+   */
+  private int countStateReplicas(Map<String, Integer> stateCountMap) {
+    int sum = 0;
+    for (Map.Entry<String, Integer> stateCount : stateCountMap.entrySet()) {
+      sum += stateCount.getValue();
+    }
+    return sum;
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/7147ec87/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/RebalanceStrategy.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/RebalanceStrategy.java
 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/RebalanceStrategy.java
new file mode 100644
index 0000000..a3c7e94
--- /dev/null
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/RebalanceStrategy.java
@@ -0,0 +1,57 @@
+package org.apache.helix.controller.rebalancer.strategy;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.ZNRecord;
+import org.apache.helix.controller.stages.ClusterDataCache;
+
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Assignment strategy interface that computes the assignment of partition->instance.
+ */
+public interface RebalanceStrategy {
+  // Name of the default strategy. NOTE(review): where this value is matched is not
+  // visible in this file - confirm against the callers.
+  String DEFAULT_REBALANCE_STRATEGY = "DEFAULT";
+
+  /**
+   * Perform the necessary initialization for the rebalance strategy object.
+   *
+   * @param resourceName   name of the resource the assignment is computed for
+   * @param partitions     names of the resource's partitions
+   * @param states         mapping from state name to a count for that state
+   * @param maximumPerNode presumably the maximum number of partitions a node may hold -
+   *                       TODO confirm against the implementations
+   */
+  void init(String resourceName, final List<String> partitions,
+      final LinkedHashMap<String, Integer> states, int maximumPerNode);
+
+  /**
+   * Compute the preference lists and (optional partition-state mapping) for the given resource.
+   *
+   * @param allNodes       all instances
+   * @param liveNodes      the live instances
+   * @param currentMapping current replica mapping
+   * @param clusterData    cluster data
+   * @return a record carrying the computed assignment
+   */
+  ZNRecord computePartitionAssignment(final List<String> allNodes,
+      final List<String> liveNodes, final Map<String, Map<String, String>> 
currentMapping,
+      ClusterDataCache clusterData);
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/7147ec87/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/crushMapping/CRUSHPlacementAlgorithm.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/crushMapping/CRUSHPlacementAlgorithm.java
 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/crushMapping/CRUSHPlacementAlgorithm.java
new file mode 100644
index 0000000..870656c
--- /dev/null
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/crushMapping/CRUSHPlacementAlgorithm.java
@@ -0,0 +1,316 @@
+/**
+ * Copyright 2013 Twitter, Inc.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.helix.controller.rebalancer.strategy.crushMapping;
+
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.controller.rebalancer.topology.Node;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * The transcription of the CRUSH placement algorithm from the Weil paper. 
This is a fairly simple
+ * adaptation, but a couple of important changes have been made to work with 
the crunch mapping.
+ */
+public class CRUSHPlacementAlgorithm {
+  /**
+   * In case the select() method fails to select after looping back to the 
origin of selection after
+   * so many tries, we stop the search. This constant denotes the maximum 
number of retries after
+   * looping back to the origin. It is expected that in most cases the 
selection will either succeed
+   * with a small number of tries, or it will never succeed. So a reasonably 
large number to
+   * distinguish these two cases should be sufficient.
+   */
+  private static final int MAX_LOOPBACK_COUNT = 50;
+  private static final Logger logger = 
LoggerFactory.getLogger(CRUSHPlacementAlgorithm.class);
+
+  private final boolean keepOffset;
+  private final Map<Long,Integer> roundOffset;
+
+  /**
+   * Creates the crush placement object without keeping round offsets between calls
+   * (delegates to {@link #CRUSHPlacementAlgorithm(boolean)} with {@code false}).
+   */
+  public CRUSHPlacementAlgorithm() {
+    this(false);
+  }
+
+  /**
+   * Creates the crush placement algorithm, stating whether the round offset reached for
+   * an input should be remembered for the lifetime of this object and reused by later
+   * selections with the same input.
+   */
+  public CRUSHPlacementAlgorithm(boolean keepOffset) {
+    this.keepOffset = keepOffset;
+    if (keepOffset) {
+      // offsets are only tracked on request
+      roundOffset = new HashMap<Long,Integer>();
+    } else {
+      roundOffset = null;
+    }
+  }
+
+  /**
+   * Returns a list of (count) nodes of the desired type, accepting every node. If the
+   * count is more than the number of available nodes, an exception is thrown. The returned
+   * list may still be smaller than the requested size (count) if not all nodes could be
+   * selected, so callers should check its size and take action if needed.
+   */
+  public List<Node> select(Node parent, long input, int count, String type) {
+    Predicate<Node> acceptAll = Predicates.<Node>alwaysTrue();
+    return select(parent, input, count, type, acceptAll);
+  }
+
+  /**
+   * Selects up to {@code count} nodes of the given type below {@code parent}, accepting a
+   * node only when {@code nodePredicate} returns true for it and it has not been selected
+   * already in this call.
+   * <p>
+   * For each requested node the tree is walked down from {@code parent} until a node of
+   * the requested type is chosen. A rejected or failed choice increases the round number
+   * used for the next attempt. After {@code MAX_LOOPBACK_COUNT} loop-backs to
+   * {@code parent} the search for that node is abandoned, so the returned list can be
+   * shorter than {@code count}.
+   *
+   * @param parent        node under which the selection takes place
+   * @param input         value the selection is computed for
+   * @param count         number of nodes requested
+   * @param type          type of the nodes to return (compared ignoring case)
+   * @param nodePredicate filter; nodes for which it returns false are rejected
+   * @return the selected nodes, at most {@code count} of them
+   * @throws IllegalArgumentException if {@code parent} reports fewer than {@code count}
+   *           children of the requested type
+   */
+  public List<Node> select(Node parent, long input, int count, String type,
+      Predicate<Node> nodePredicate) {
+    int childCount = parent.getChildrenCount(type);
+    if (childCount < count) {
+      throw new IllegalArgumentException(count + " nodes of type " + type +
+          " were requested but the tree has only " + childCount + " nodes!");
+    }
+
+    List<Node> selected = new ArrayList<Node>(count);
+    // use the index stored in the map
+    Integer offset;
+    if (keepOffset) {
+      offset = roundOffset.get(input);
+      if (offset == null) {
+        offset = 0;
+        roundOffset.put(input, offset);
+      }
+    } else {
+      offset = 0;
+    }
+
+    // round number handed to the selector; the last value is stored when keepOffset is set
+    int rPrime = 0;
+    for (int r = 1; r <= count; r++) {
+      // number of rejected or failed picks for this replica; shifts the round number
+      int failure = 0;
+      // number of times we had to loop back to the origin
+      int loopbackCount = 0;
+      // set when the loop-back limit is hit and no node is selected for this r
+      boolean escape = false;
+      boolean retryOrigin;
+      Node out = null;
+      do {
+        retryOrigin = false; // initialize at the outset
+        Node in = parent;
+        // nodes turned down by the predicate during this descent from the origin
+        Set<Node> rejected = new HashSet<Node>();
+        boolean retryNode;
+        do {
+          retryNode = false; // initialize at the outset
+          rPrime = r + offset + failure;
+          logger.trace("{}.select({}, {})", new Object[] {in, input, rPrime});
+          Selector selector = new Selector(in);
+          out = selector.select(input, rPrime);
+          if (!out.getType().equalsIgnoreCase(type)) {
+            logger.trace("selected output {} for data {} didn't match the type 
{}: walking down " +
+                "the hierarchy...", new Object[] {out, input, type});
+            in = out; // walk down the hierarchy
+            retryNode = true; // stay within the node and walk down the tree
+          } else { // type matches
+            boolean predicateRejected = !nodePredicate.apply(out);
+            if (selected.contains(out) || predicateRejected) {
+              if (predicateRejected) {
+                logger.trace("{} was rejected by the node predicate for data 
{}: rejecting and " +
+                    "increasing rPrime", out, input);
+                rejected.add(out);
+              } else { // already selected
+                logger.trace("{} was already selected for data {}: rejecting 
and increasing rPrime",
+                    out, input);
+              }
+
+              // we need to see if we have selected all possible nodes from 
this parent, in which
+              // case we should loop back to the origin and start over
+              if (allChildNodesEliminated(in, selected, rejected)) {
+                logger.trace("all child nodes of {} have been eliminated", in);
+                if (loopbackCount == MAX_LOOPBACK_COUNT) {
+                  // we looped back the maximum times we specified; we give up 
search, and exit
+                  escape = true;
+                  break;
+                }
+                loopbackCount++;
+                logger.trace("looping back to the original parent node ({})", 
parent);
+                retryOrigin = true;
+              } else {
+                retryNode = true; // go back and reselect on the same parent
+              }
+              failure++;
+            } else if (nodeIsOut(out)) {
+              logger.trace("{} is marked as out (failed or over the maximum 
assignment) for data " +
+                  "{}! looping back to the original parent node", out, input);
+              failure++;
+              if (loopbackCount == MAX_LOOPBACK_COUNT) {
+                // we looped back the maximum times we specified; we give up 
search, and exit
+                escape = true;
+                break;
+              }
+              loopbackCount++;
+              // re-selection on the same parent is detrimental in case of 
node failure: loop back
+              // to the origin
+              retryOrigin = true;
+            } else {
+              // we got a successful selection
+              break;
+            }
+          }
+        } while (retryNode);
+      } while (retryOrigin);
+
+      if (escape) {
+        // cannot find a node under this parent; return a smaller set than was 
intended
+        logger.debug("we could not select a node for data {} under parent {}; 
a smaller data set " +
+            "than is requested will be returned", input, parent);
+        continue;
+      }
+
+      logger.trace("{} was selected for data {}", out, input);
+      selected.add(out);
+    }
+    if (keepOffset) {
+      // remember the last round number used for this input
+      roundOffset.put(input, rPrime);
+    }
+    return selected;
+  }
+
+
+  private boolean nodeIsOut(Node node) {
+    // only a leaf that is marked as failed counts as out
+    return node.isLeaf() && node.isFailed();
+  }
+
+  /**
+   * Checks the immediate children of the given parent and reports whether none of them
+   * can be picked any more, i.e. each one is out, already selected or rejected. A parent
+   * whose child list is null is reported as eliminated as well. This is used to decide
+   * whether the parent should no longer be used in the selection.
+   */
+  private boolean allChildNodesEliminated(Node parent, List<Node> selected,
+      Set<Node> rejected) {
+    List<Node> candidates = parent.getChildren();
+    if (candidates == null) {
+      return true;
+    }
+    for (Node candidate : candidates) {
+      if (nodeIsOut(candidate) || selected.contains(candidate)
+          || rejected.contains(candidate)) {
+        continue; // this child can no longer be picked
+      }
+      return false; // at least one child is still available
+    }
+    return true;
+  }
+
+  /**
+   * Selection algorithm based on the "straw" bucket type as described in the CRUSH algorithm.
+   * The constructor computes one straw value per child of the given node; {@code select}
+   * then returns the child with the highest hash-based score scaled by its straw.
+   */
+  private class Selector {
+    // straw value per child; stays empty when the node is a leaf
+    private final Map<Node,Long> straws = new HashMap<Node,Long>();
+    private final JenkinsHash hashFunction;
+
+    /**
+     * Computes the straw values for the children of {@code node}. Nothing is computed for
+     * a leaf node.
+     */
+    public Selector(Node node) {
+      if (!node.isLeaf()) {
+        // create a map from the nodes to their values
+        List<Node> sortedNodes = sortNodes(node.getChildren()); // reverse sort by weight
+
+        int numLeft = sortedNodes.size();
+        float straw = 1.0f;
+        float wbelow = 0.0f;
+        float lastw = 0.0f;
+        int i = 0;
+        final int length = sortedNodes.size();
+        while (i < length) {
+          Node current = sortedNodes.get(i);
+          if (current.getWeight() == 0) {
+            // zero-weight children get a zero straw
+            straws.put(current, 0L);
+            i++;
+            continue;
+          }
+          // store the straw scaled by 0x10000 as an integer value
+          straws.put(current, (long)(straw*0x10000));
+          i++;
+          if (i == length) {
+            break;
+          }
+
+          current = sortedNodes.get(i);
+          Node previous = sortedNodes.get(i-1);
+          if (current.getWeight() == previous.getWeight()) {
+            // same weight as the previous node: reuse the current straw
+            continue;
+          }
+          // NOTE(review): sortNodes orders by descending weight, so at this point
+          // current.getWeight() < previous.getWeight() and wnext below is negative -
+          // confirm against the reference CRUSH straw calculation.
+          wbelow += (float)(previous.getWeight() - lastw)*numLeft;
+          for (int j = i; j < length; j++) {
+            if (sortedNodes.get(j).getWeight() == current.getWeight()) {
+              numLeft--;
+            } else {
+              break;
+            }
+          }
+          float wnext = (float)(numLeft * (current.getWeight() - 
previous.getWeight()));
+          float pbelow = wbelow/(wbelow + wnext);
+          straw *= Math.pow(1.0/pbelow, 1.0/numLeft);
+          lastw = previous.getWeight();
+        }
+      }
+      hashFunction = new JenkinsHash();
+    }
+
+    /**
+     * Returns a new list that's sorted in the reverse order of the weight.
+     */
+    private List<Node> sortNodes(List<Node> nodes) {
+      List<Node> ret = new ArrayList<Node>(nodes);
+      sortNodesInPlace(ret);
+      return ret;
+    }
+
+    /**
+     * Sorts the list in place in the reverse order of the weight.
+     */
+    private void sortNodesInPlace(List<Node> nodes) {
+      Collections.sort(nodes, new Comparator<Node>() {
+        public int compare(Node n1, Node n2) {
+          if (n2.getWeight() == n1.getWeight()) {
+            return 0;
+          }
+          return (n2.getWeight() - n1.getWeight() > 0) ? 1 : -1;
+          // sort by weight only in the reverse order
+        }
+      });
+    }
+
+    /**
+     * Returns the child with the highest weighted score for the given input and round.
+     * On equal scores the child met first while iterating the straw map is kept.
+     *
+     * @throws IllegalStateException if there is no child to choose from
+     */
+    public Node select(long input, long round) {
+      Node selected = null;
+      long hiScore = -1;
+      for (Map.Entry<Node,Long> e: straws.entrySet()) {
+        Node child = e.getKey();
+        long straw = e.getValue();
+        long score = weightedScore(child, straw, input, round);
+        if (score > hiScore) {
+          selected = child;
+          hiScore = score;
+        }
+      }
+      if (selected == null) {
+        throw new IllegalStateException();
+      }
+      return selected;
+    }
+
+    /**
+     * Hashes (input, child id, round), keeps the low 16 bits and multiplies them by the
+     * child's straw.
+     */
+    private long weightedScore(Node child, long straw, long input, long round) 
{
+      long hash = hashFunction.hash(input, child.getId(), round);
+      hash = hash&0xffff;
+      long weightedScore = hash*straw;
+      return weightedScore;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/7147ec87/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/crushMapping/JenkinsHash.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/crushMapping/JenkinsHash.java
 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/crushMapping/JenkinsHash.java
new file mode 100644
index 0000000..66566f8
--- /dev/null
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/crushMapping/JenkinsHash.java
@@ -0,0 +1,140 @@
+/**
+ * Copyright 2013 Twitter, Inc.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.helix.controller.rebalancer.strategy.crushMapping;
+
+/**
+ * Jenkins-style integer hash with one-, two- and three-argument variants. All
+ * intermediate values are kept in a {@code long} but masked to 32 bits, so every
+ * result lies in the range 0..0xFFFFFFFF.
+ */
+public class JenkinsHash {
+  // Mask that keeps only the low 4 bytes of a long.
+  private static final long MAX_VALUE = 0xFFFFFFFFL;
+  private static final long CRUSH_HASH_SEED = 1315423911L;
+
+  /**
+   * Reads a byte as a non-negative value (0..255).
+   */
+  private static long byteToLong(byte b) {
+    return b & 0xFFL;
+  }
+
+  /**
+   * Adds the two values and keeps the low 4 bytes of the sum.
+   */
+  private static long add(long val, long add) {
+    long sum = val + add;
+    return sum & MAX_VALUE;
+  }
+
+  /**
+   * Subtracts {@code subtract} from {@code val} and keeps the low 4 bytes.
+   */
+  private static long subtract(long val, long subtract) {
+    long difference = val - subtract;
+    return difference & MAX_VALUE;
+  }
+
+  /**
+   * XORs the two values and keeps the low 4 bytes.
+   */
+  private static long xor(long val, long xor) {
+    long mixed = val ^ xor;
+    return mixed & MAX_VALUE;
+  }
+
+  /**
+   * Shifts {@code val} left by {@code shift} bits and keeps the low 4 bytes.
+   */
+  private static long leftShift(long val, int shift) {
+    long shifted = val << shift;
+    return shifted & MAX_VALUE;
+  }
+
+  /**
+   * Assembles the 4 bytes starting at {@code offset} into a long, treating the
+   * byte at {@code offset} as the least significant one.
+   */
+  private static long fourByteToLong(byte[] bytes, int offset) {
+    long result = 0;
+    for (int i = 0; i < 4; i++) {
+      result += byteToLong(bytes[offset + i]) << (8 * i);
+    }
+    return result;
+  }
+
+  /**
+   * Runs one mixing pass (nine subtract/subtract/xor-with-shift steps) over the
+   * triple and returns the mixed values as a new triple. The order of the steps
+   * matters: each one consumes the results of the previous ones.
+   */
+  private static Triple hashMix(Triple t) {
+    long a = t.a;
+    long b = t.b;
+    long c = t.c;
+
+    a = xor(subtract(subtract(a, b), c), c >> 13);
+    b = xor(subtract(subtract(b, c), a), leftShift(a, 8));
+    c = xor(subtract(subtract(c, a), b), b >> 13);
+
+    a = xor(subtract(subtract(a, b), c), c >> 12);
+    b = xor(subtract(subtract(b, c), a), leftShift(a, 16));
+    c = xor(subtract(subtract(c, a), b), b >> 5);
+
+    a = xor(subtract(subtract(a, b), c), c >> 3);
+    b = xor(subtract(subtract(b, c), a), leftShift(a, 10));
+    c = xor(subtract(subtract(c, a), b), b >> 15);
+
+    return new Triple(a, b, c);
+  }
+
+  /**
+   * Plain holder for the three values passed through {@link #hashMix(Triple)}.
+   */
+  private static class Triple {
+    long a;
+    long b;
+    long c;
+
+    public Triple(long a, long b, long c) {
+      this.a = a;
+      this.b = b;
+      this.c = c;
+    }
+  }
+
+  /**
+   * Hashes a single value.
+   *
+   * @return the hash, in the range 0..0xFFFFFFFF
+   */
+  public long hash(long a) {
+    final long x = 231232L;
+    final long y = 1232L;
+    long seed = xor(CRUSH_HASH_SEED, a);
+
+    Triple first = hashMix(new Triple(a, x, seed));
+    // The second pass uses the original input again, not the mixed one.
+    Triple second = hashMix(new Triple(y, a, first.c));
+    return second.c;
+  }
+
+  /**
+   * Hashes a pair of values.
+   *
+   * @return the hash, in the range 0..0xFFFFFFFF
+   */
+  public long hash(long a, long b) {
+    final long x = 231232L;
+    final long y = 1232L;
+    long seed = xor(xor(CRUSH_HASH_SEED, a), b);
+
+    Triple first = hashMix(new Triple(a, b, seed));
+    Triple second = hashMix(new Triple(x, first.a, first.c));
+    Triple third = hashMix(new Triple(first.b, y, second.c));
+    return third.c;
+  }
+
+  /**
+   * Hashes three values.
+   *
+   * @return the hash, in the range 0..0xFFFFFFFF
+   */
+  public long hash(long a, long b, long c) {
+    final long x = 231232L;
+    final long y = 1232L;
+    long seed = xor(xor(xor(CRUSH_HASH_SEED, a), b), c);
+
+    Triple first = hashMix(new Triple(a, b, seed));
+    Triple second = hashMix(new Triple(c, x, first.c));
+    Triple third = hashMix(new Triple(y, first.a, second.c));
+    Triple fourth = hashMix(new Triple(first.b, second.b, third.c));
+    Triple fifth = hashMix(new Triple(third.a, second.a, fourth.c));
+    return fifth.c;
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/7147ec87/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/Node.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/Node.java
 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/Node.java
new file mode 100644
index 0000000..3a52a21
--- /dev/null
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/Node.java
@@ -0,0 +1,208 @@
+package org.apache.helix.controller.rebalancer.topology;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+
+/**
+ * A node in a tree: it has a name, a type, a numeric id and a weight, an optional
+ * parent, and children keyed by name in insertion order. A node without children
+ * is a leaf and can be marked as failed. Equality, hashing and natural ordering
+ * are based on the name only.
+ */
+public class Node implements Comparable<Node> {
+  private String _name;
+  private String _type;
+  private long _id;
+  private long _weight;
+
+  // Children keyed by name; a LinkedHashMap keeps them in insertion order.
+  private final LinkedHashMap<String, Node> _children = new LinkedHashMap<String, Node>();
+  private Node _parent;
+
+  private boolean _failed;
+
+  public Node() {
+
+  }
+
+  /**
+   * Copies name, type, id, weight and failed flag from the given node. The parent
+   * and the children are not copied.
+   */
+  public Node(Node node) {
+    _name = node.getName();
+    _type = node.getType();
+    _id = node.getId();
+    _weight = node.getWeight();
+    _failed = node.isFailed();
+  }
+
+  public String getName() {
+    return _name;
+  }
+
+  public void setName(String name) {
+    _name = name;
+  }
+
+  public String getType() {
+    return _type;
+  }
+
+  public void setType(String type) {
+    _type = type;
+  }
+
+  public long getId() {
+    return _id;
+  }
+
+  public void setId(long id) {
+    _id = id;
+  }
+
+  public long getWeight() {
+    return _weight;
+  }
+
+  public void setWeight(long weight) {
+    _weight = weight;
+  }
+
+  /**
+   * Increases this node's weight by the given amount.
+   */
+  public void addWeight(long weight) {
+    _weight += weight;
+  }
+
+  public boolean isFailed() {
+    return _failed;
+  }
+
+  /**
+   * Sets the failed flag.
+   *
+   * @throws UnsupportedOperationException if this node has children
+   */
+  public void setFailed(boolean failed) {
+    if (!isLeaf()) {
+      throw new UnsupportedOperationException("you cannot set failed on a non-leaf!");
+    }
+    _failed = failed;
+  }
+
+  /**
+   * Returns a new list holding the children in insertion order. Changes to the
+   * returned list do not affect this node.
+   */
+  public List<Node> getChildren() {
+    return new ArrayList<Node>(_children.values());
+  }
+
+  /**
+   * Add a child, if there exists a child with the same name, will replace it.
+   *
+   * @param child the node to add, keyed by its name
+   */
+  public void addChild(Node child) {
+    _children.put(child.getName(), child);
+  }
+
+  /**
+   * Has child with given name.
+   *
+   * @param name the child name to look up
+   * @return true if a direct child is registered under that name
+   */
+  public boolean hasChild(String name) {
+    return _children.containsKey(name);
+  }
+
+  /**
+   * Get child node with given name.
+   *
+   * @param name the child name to look up
+   * @return the direct child registered under that name, or null if there is none
+   */
+  public Node getChild(String name) {
+    return _children.get(name);
+  }
+
+  /**
+   * Returns true if this node has no children.
+   */
+  public boolean isLeaf() {
+    return _children.isEmpty();
+  }
+
+  public Node getParent() {
+    return _parent;
+  }
+
+  public void setParent(Node parent) {
+    _parent = parent;
+  }
+
+  /**
+   * Returns all child nodes that match the type (case-insensitive). Returns itself
+   * if this node matches it; its own descendants are then not searched. If no child
+   * matches the type, an empty list is returned. A node whose type has not been set
+   * never matches.
+   */
+  protected List<Node> findChildren(String type) {
+    List<Node> nodes = new ArrayList<Node>();
+    if (matchesType(type)) {
+      nodes.add(this);
+    } else if (!isLeaf()) {
+      for (Node child : _children.values()) {
+        nodes.addAll(child.findChildren(type));
+      }
+    }
+    return nodes;
+  }
+
+  /**
+   * Returns the number of all child nodes that match the type (case-insensitive).
+   * Returns 1 if this node matches it. Returns 0 if no child matches the type.
+   */
+  public int getChildrenCount(String type) {
+    int count = 0;
+    if (matchesType(type)) {
+      count++;
+    } else if (!isLeaf()) {
+      for (Node child : _children.values()) {
+        count += child.getChildrenCount(type);
+      }
+    }
+    return count;
+  }
+
+  // Null-safe, case-insensitive type check; an unset type matches nothing.
+  private boolean matchesType(String type) {
+    return _type != null && _type.equalsIgnoreCase(type);
+  }
+
+  /**
+   * Returns the top-most ("root") node from this node. If this node itself does not
+   * have a parent, returns itself.
+   */
+  public Node getRoot() {
+    Node node = this;
+    while (node.getParent() != null) {
+      node = node.getParent();
+    }
+    return node;
+  }
+
+  @Override
+  public String toString() {
+    return _name + ":" + _id;
+  }
+
+  /**
+   * Hash of the name; 0 when the name has not been set.
+   */
+  @Override
+  public int hashCode() {
+    return _name == null ? 0 : _name.hashCode();
+  }
+
+  /**
+   * Two nodes are equal when their names are equal (two unnamed nodes are equal).
+   */
+  @Override
+  public boolean equals(Object obj) {
+    if (obj == this) {
+      return true;
+    }
+    if (!(obj instanceof Node)) {
+      return false;
+    }
+    String otherName = ((Node) obj).getName();
+    return _name == null ? otherName == null : _name.equals(otherName);
+  }
+
+  /**
+   * Orders nodes by name; an unnamed node sorts before any named node, which keeps
+   * the ordering consistent with {@link #equals(Object)}.
+   */
+  @Override
+  public int compareTo(Node o) {
+    String otherName = o.getName();
+    if (_name == null) {
+      return otherName == null ? 0 : -1;
+    }
+    if (otherName == null) {
+      return 1;
+    }
+    return _name.compareTo(otherName);
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/7147ec87/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/Topology.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/Topology.java
 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/Topology.java
new file mode 100644
index 0000000..1057fad
--- /dev/null
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/Topology.java
@@ -0,0 +1,295 @@
+package org.apache.helix.controller.rebalancer.topology;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.HelixException;
+import org.apache.helix.HelixProperty;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.log4j.Logger;
+
+
+/**
+ * Topology represents the structure of a cluster (the hierarchy of the nodes, its fault
+ * boundary, etc). This class is intended for topology-aware partition placement.
+ */
+public class Topology {
+  private static final Logger logger = Logger.getLogger(Topology.class);
+
+  /** Node types used by the default topology definition (root/zone/instance). */
+  public enum Types {
+    ROOT,
+    ZONE,
+    INSTANCE
+  }
+
+  /** Weight used for an instance whose configured weight is negative or not set. */
+  private static final int DEFAULT_NODE_WEIGHT = 1000;
+
+  // Node names are hashed with a fixed charset so that ids do not depend on the JVM's
+  // default encoding. Fully qualified to avoid touching the import block.
+  private static final java.nio.charset.Charset ID_CHARSET =
+      java.nio.charset.Charset.forName("UTF-8");
+
+  private final MessageDigest _md;
+  private Node _root; // root of the tree structure of all nodes;
+  private List<String> _allInstances;
+  private List<String> _liveInstances;
+  private Map<String, InstanceConfig> _instanceConfigMap;
+  private HelixProperty _clusterConfig;
+  private String _faultZoneType;
+  private String _endNodeType;
+  private boolean _useDefaultTopologyDef;
+  // Path types from the top of the hierarchy down to the end node, in order.
+  private LinkedHashSet<String> _types;
+
+  /* default names for domain paths, if value is not specified for a domain path, the
+     default one is used */
+  // TODO: default values can be defined in clusterConfig.
+  private Map<String, String> _defaultDomainPathValues = new HashMap<String, String>();
+
+  /**
+   * Builds the cluster tree. If the cluster config has a TOPOLOGY simple field, it is
+   * split on "/" into the ordered list of path types, the last type becomes the end node
+   * type, and the fault zone type is read from FAULT_ZONE_TYPE (defaulting to the end
+   * node type). Otherwise the default ZONE/INSTANCE definition is used.
+   *
+   * NOTE(review): clusterConfig is dereferenced unconditionally, so passing null fails
+   * with a NullPointerException here.
+   *
+   * @param allNodes names of all instances to place in the tree
+   * @param liveNodes names of the instances that are alive; others become failed leaves
+   * @param instanceConfigMap config of every instance, keyed by instance name
+   * @param clusterConfig cluster config holding the optional topology definition
+   * @throws HelixException if the topology definition has no non-empty type, if the fault
+   *           zone type is not one of the defined types, or if an instance's config,
+   *           zone id or domain is missing or malformed
+   * @throws IllegalArgumentException if the SHA-1 digest is not available
+   */
+  public Topology(final List<String> allNodes, final List<String> liveNodes,
+      final Map<String, InstanceConfig> instanceConfigMap, ClusterConfig clusterConfig) {
+    try {
+      _md = MessageDigest.getInstance("SHA-1");
+      _allInstances = allNodes;
+      _liveInstances = liveNodes;
+      _instanceConfigMap = instanceConfigMap;
+      _clusterConfig = clusterConfig;
+      _types = new LinkedHashSet<String>();
+
+      String topologyDef = _clusterConfig.getRecord()
+          .getSimpleField(ClusterConfig.ClusterConfigProperty.TOPOLOGY.name());
+      if (topologyDef != null) {
+        // Customized cluster topology definition is configured.
+        String[] types = topologyDef.trim().split("/");
+        for (int i = 0; i < types.length; i++) {
+          if (types[i].length() != 0) {
+            _types.add(types[i]);
+          }
+        }
+        if (_types.size() == 0) {
+          logger.error("Invalid cluster topology definition " + topologyDef);
+          throw new HelixException("Invalid cluster topology definition " + topologyDef);
+        } else {
+          String lastType = null;
+          for (String type : _types) {
+            _defaultDomainPathValues.put(type, "Helix_default_" + type);
+            lastType = type;
+          }
+          _endNodeType = lastType;
+          _faultZoneType = _clusterConfig.getRecord()
+              .getStringField(ClusterConfig.ClusterConfigProperty.FAULT_ZONE_TYPE.name(),
+                  _endNodeType);
+          if (!_types.contains(_faultZoneType)) {
+            throw new HelixException(String
+                .format("Invalid fault zone type %s, not present in topology definition %s.",
+                    _faultZoneType, topologyDef));
+          }
+          _useDefaultTopologyDef = false;
+        }
+      } else {
+        // Use default cluster topology definition, i,e. /root/zone/instance
+        _types.add(Types.ZONE.name());
+        _types.add(Types.INSTANCE.name());
+        _endNodeType = Types.INSTANCE.name();
+        _faultZoneType = Types.ZONE.name();
+        _useDefaultTopologyDef = true;
+      }
+    } catch (NoSuchAlgorithmException ex) {
+      throw new IllegalArgumentException(ex);
+    }
+    if (_useDefaultTopologyDef) {
+      _root = createClusterTreeWithDefaultTopologyDef();
+    } else {
+      _root = createClusterTreeWithCustomizedTopology();
+    }
+  }
+
+  public String getEndNodeType() {
+    return _endNodeType;
+  }
+
+  public String getFaultZoneType() {
+    return _faultZoneType;
+  }
+
+  public Node getRootNode() {
+    return _root;
+  }
+
+  /**
+   * Returns the nodes of the tree whose type is the fault zone type, or an empty list
+   * if there is no root.
+   */
+  public List<Node> getFaultZones() {
+    if (_root != null) {
+      return _root.findChildren(getFaultZoneType());
+    }
+    return Collections.emptyList();
+  }
+
+  /**
+   * Creates a tree representing the cluster structure using default cluster topology
+   * definition (i,e no topology definition given and no domain id set). Each instance is
+   * placed under the zone given by its ZONE_ID.
+   *
+   * @throws HelixException if an instance has no config or no zone id
+   */
+  private Node createClusterTreeWithDefaultTopologyDef() {
+    // root
+    Node root = new Node();
+    root.setName("root");
+    root.setId(computeId("root"));
+    root.setType(Types.ROOT.name());
+
+    for (String ins : _allInstances) {
+      InstanceConfig config = _instanceConfigMap.get(ins);
+      if (config == null) {
+        throw new HelixException(String.format("Config for instance %s is not found!", ins));
+      }
+      String zone = config.getZoneId();
+      if (zone == null) {
+        //TODO: we should allow non-rack cluster for back-compatible. This should be solved
+        // once we have the hierarchy style of domain id for instance.
+        throw new HelixException(String
+            .format("ZONE_ID for instance %s is not set, failed the topology-aware placement!",
+                ins));
+      }
+      Map<String, String> pathValueMap = new HashMap<String, String>();
+      pathValueMap.put(Types.ZONE.name(), zone);
+      pathValueMap.put(Types.INSTANCE.name(), ins);
+
+      int weight = config.getWeight();
+      if (weight < 0 || weight == InstanceConfig.WEIGHT_NOT_SET) {
+        weight = DEFAULT_NODE_WEIGHT;
+      }
+      addEndNode(root, ins, pathValueMap, weight, _liveInstances);
+    }
+
+    return root;
+  }
+
+  /**
+   * Creates a tree representing the cluster structure using the customized cluster
+   * topology definition. The position of each instance is taken from its domain, a
+   * comma-separated list of type=value pairs; pairs whose type is not part of the
+   * topology definition are logged and ignored.
+   *
+   * @throws HelixException if an instance has no config or no domain, or if one of its
+   *           domain pairs is not of the form type=value
+   */
+  private Node createClusterTreeWithCustomizedTopology() {
+    // root
+    Node root = new Node();
+    root.setName("root");
+    root.setId(computeId("root"));
+    root.setType(Types.ROOT.name());
+
+    for (String ins : _allInstances) {
+      InstanceConfig insConfig = _instanceConfigMap.get(ins);
+      if (insConfig == null) {
+        throw new HelixException(String.format("Config for instance %s is not found!", ins));
+      }
+      String domain = insConfig.getDomain();
+      if (domain == null) {
+        throw new HelixException(String
+            .format("Domain for instance %s is not set, failed the topology-aware placement!",
+                ins));
+      }
+
+      String[] pathPairs = domain.trim().split(",");
+      Map<String, String> pathValueMap = new HashMap<String, String>();
+      for (String pair : pathPairs) {
+        String[] values = pair.trim().split("=");
+        if (values.length != 2 || values[0].isEmpty() || values[1].isEmpty()) {
+          throw new HelixException(String.format(
+              "Domain-Value pair %s for instance %s is not valid, failed the topology-aware placement!",
+              pair, ins));
+        }
+        String type = values[0];
+        String value = values[1];
+
+        if (!_types.contains(type)) {
+          logger.warn(String
+              .format("Path %s defined in domain of instance %s not recognized, ignored!", pair,
+                  ins));
+          continue;
+        }
+        pathValueMap.put(type, value);
+      }
+
+      int weight = insConfig.getWeight();
+      if (weight < 0 || weight == InstanceConfig.WEIGHT_NOT_SET) {
+        weight = DEFAULT_NODE_WEIGHT;
+      }
+
+      // addEndNode returns the root it was given, so the result is not reassigned.
+      addEndNode(root, ins, pathValueMap, weight, _liveInstances);
+    }
+
+    return root;
+  }
+
+
+  /**
+   * Add an end node to the tree, create all the paths to the leaf node if not present.
+   * Walks the path types in order; a type with no value in {@code pathNameMap} uses the
+   * default value for that type. When a new end node is created for a live instance, its
+   * weight is also added to every node on the path above it; for an instance that is not
+   * live the end node is marked failed with weight 0.
+   *
+   * @return the {@code root} that was passed in
+   */
+  private Node addEndNode(Node root, String instanceName, Map<String, String> pathNameMap,
+      int instanceWeight, List<String> liveInstances) {
+    Node current = root;
+    List<Node> pathNodes = new ArrayList<Node>();
+    for (String path : _types) {
+      String pathValue = pathNameMap.get(path);
+      if (pathValue == null || pathValue.isEmpty()) {
+        pathValue = _defaultDomainPathValues.get(path);
+      }
+      pathNodes.add(current);
+      if (!current.hasChild(pathValue)) {
+        Node n = new Node();
+        n.setName(pathValue);
+        n.setId(computeId(pathValue));
+        n.setType(path);
+        n.setParent(current);
+
+        // if it is leaf node.
+        if (path.equals(_endNodeType)) {
+          if (liveInstances.contains(instanceName)) {
+            // node is alive
+            n.setWeight(instanceWeight);
+            // add instance weight to all of its parent nodes.
+            for (Node node : pathNodes) {
+              node.addWeight(instanceWeight);
+            }
+          } else {
+            n.setFailed(true);
+            n.setWeight(0);
+          }
+        }
+        current.addChild(n);
+      }
+      current = current.getChild(pathValue);
+    }
+    return root;
+  }
+
+  /**
+   * Derives a node id from its name: the first 4 bytes of the SHA-1 digest of the
+   * UTF-8 encoded name, read as an unsigned 32-bit value.
+   */
+  private long computeId(String name) {
+    // Encode with a fixed charset; the platform default could differ between JVMs and
+    // produce different ids for the same non-ASCII name.
+    byte[] h = _md.digest(name.getBytes(ID_CHARSET));
+    return bstrTo32bit(h);
+  }
+
+  /**
+   * Reads the first 4 bytes of the array as a big-endian unsigned 32-bit value.
+   *
+   * @throws IllegalArgumentException if the array has fewer than 4 bytes
+   */
+  private long bstrTo32bit(byte[] bstr) {
+    if (bstr.length < 4) {
+      throw new IllegalArgumentException("hashed is less than 4 bytes!");
+    }
+    // need to "simulate" unsigned int
+    return (long) (((ord(bstr[0]) << 24) | (ord(bstr[1]) << 16) | (ord(bstr[2]) << 8) | (ord(
+        bstr[3])))) & 0xffffffffL;
+  }
+
+  // Unsigned value (0..255) of a byte.
+  private int ord(byte b) {
+    return b & 0xff;
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/7147ec87/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
 
b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
index cb5bda8..dacf98d 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
@@ -32,6 +32,7 @@ import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixProperty;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.ClusterConstraints;
 import org.apache.helix.model.ClusterConstraints.ConstraintType;
 import org.apache.helix.model.CurrentState;
@@ -56,6 +57,7 @@ public class ClusterDataCache {
 
   private static final String IDEAL_STATE_RULE_PREFIX = "IdealStateRule!";
 
+  private ClusterConfig _clusterConfig;
   Map<String, LiveInstance> _liveInstanceMap;
   Map<String, LiveInstance> _liveInstanceCacheMap;
   Map<String, IdealState> _idealStateMap;
@@ -200,11 +202,11 @@ public class ClusterDataCache {
     _currentStateMap = Collections.unmodifiableMap(allCurStateMap);
 
     _idealStateRuleMap = Maps.newHashMap();
-    HelixProperty clusterConfig = 
accessor.getProperty(keyBuilder.clusterConfig());
-    if (clusterConfig != null) {
-      for (String simpleKey : 
clusterConfig.getRecord().getSimpleFields().keySet()) {
+    _clusterConfig = accessor.getProperty(keyBuilder.clusterConfig());
+    if (_clusterConfig != null) {
+      for (String simpleKey : 
_clusterConfig.getRecord().getSimpleFields().keySet()) {
         if (simpleKey.startsWith(IDEAL_STATE_RULE_PREFIX)) {
-          String simpleValue = 
clusterConfig.getRecord().getSimpleField(simpleKey);
+          String simpleValue = 
_clusterConfig.getRecord().getSimpleField(simpleKey);
           String[] rules = simpleValue.split("(?<!\\\\),");
           Map<String, String> singleRule = Maps.newHashMap();
           for (String rule : rules) {
@@ -232,6 +234,10 @@ public class ClusterDataCache {
     return true;
   }
 
+  /**
+   * Returns the cluster config currently held in {@code _clusterConfig}. The value may
+   * be null: the code that assigns it from the data accessor null-checks the result
+   * before using it.
+   */
+  public ClusterConfig getClusterConfig() {
+    return _clusterConfig;
+  }
+
   /**
    * Retrieves the idealstates for all resources
    * @return

Reply via email to