HELIX-151: rb:13209 Allow nodes using floor capacity to steal a slot

Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/91b32bff
Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/91b32bff
Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/91b32bff

Branch: refs/heads/master
Commit: 91b32bff94262a459e982b9abfadefa48d8303c9
Parents: 955b99f
Author: kishoreg <[email protected]>
Authored: Fri Aug 2 15:56:49 2013 -0700
Committer: kishoreg <[email protected]>
Committed: Fri Aug 2 15:56:49 2013 -0700

----------------------------------------------------------------------
 helix-core/pom.xml                              |  6 +-
 .../strategy/AutoRebalanceStrategy.java         | 89 ++++++++++++++++++--
 2 files changed, 85 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/91b32bff/helix-core/pom.xml
----------------------------------------------------------------------
diff --git a/helix-core/pom.xml b/helix-core/pom.xml
index bb8ed90..3de42b6 100644
--- a/helix-core/pom.xml
+++ b/helix-core/pom.xml
@@ -77,11 +77,7 @@ under the License.
       <artifactId>jackson-mapper-asl</artifactId>
       <version>1.8.5</version>
     </dependency>
-    <dependency>
-      <groupId>commons-io</groupId>
-      <artifactId>commons-io</artifactId>
-      <version>1.4</version>
-    </dependency>
+
     <dependency>
       <groupId>commons-cli</groupId>
       <artifactId>commons-cli</artifactId>

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/91b32bff/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java b/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java
index 9d0e2de..f013937 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java
@@ -152,17 +152,21 @@ public class AutoRebalanceStrategy implements Rebalancer {
       for (String id : allNodes) {
         Node node = new Node(id);
         node.capacity = 0;
+        node.hasCeilingCapacity = false;
         _nodeMap.put(id, node);
       }
       for (int i = 0; i < liveNodes.size(); i++) {
+        boolean usingCeiling = false;
         int targetSize = (_maximumPerNode > 0) ? Math.min(distFloor, 
_maximumPerNode) : distFloor;
         if (distRemainder > 0 && targetSize < _maximumPerNode) {
           targetSize += 1;
           distRemainder = distRemainder - 1;
+          usingCeiling = true;
         }
         Node node = _nodeMap.get(liveNodes.get(i));
         node.isAlive = true;
         node.capacity = targetSize;
+        node.hasCeilingCapacity = usingCeiling;
         _liveNodesList.add(node);
       }
 
@@ -232,16 +236,24 @@ public class AutoRebalanceStrategy implements Rebalancer {
       Iterator<Replica> it = _orphaned.iterator();
       while (it.hasNext()) {
         Replica replica = it.next();
-        int startIndex = (replica.hashCode() & 0x7FFFFFFF) % 
_liveNodesList.size();
+        boolean added = false;
+        int startIndex = computeRandomStartIndex(replica);
         for (int index = startIndex; index < startIndex + 
_liveNodesList.size(); index++) {
           Node receiver = _liveNodesList.get(index % _liveNodesList.size());
           if (receiver.capacity > receiver.currentlyAssigned && 
receiver.canAdd(replica)) {
             receiver.currentlyAssigned = receiver.currentlyAssigned + 1;
             receiver.nonPreferred.add(replica);
-            it.remove();
+            added = true;
             break;
           }
         }
+        if (!added) {
+          // try adding the replica by making room for it
+          added = assignOrphanByMakingRoom(replica);
+        }
+        if (added) {
+          it.remove();
+        }
       }
       if (_orphaned.size() > 0 && logger.isInfoEnabled()) {
         logger.info("could not assign nodes to partitions: " + _orphaned);
@@ -249,6 +261,38 @@ public class AutoRebalanceStrategy implements Rebalancer {
     }
 
     /**
+     * If an orphan can't be assigned normally, see if a node can borrow capacity to accept it
+     * @param replica The replica to assign
+     * @return true if the assignment succeeded, false otherwise
+     */
+    private boolean assignOrphanByMakingRoom(Replica replica) {
+      Node capacityDonor = null;
+      Node capacityAcceptor = null;
+      int startIndex = computeRandomStartIndex(replica);
+      for (int index = startIndex; index < startIndex + _liveNodesList.size(); 
index++) {
+        Node current = _liveNodesList.get(index % _liveNodesList.size());
+        if (current.hasCeilingCapacity && current.capacity > 
current.currentlyAssigned
+            && !current.canAddIfCapacity(replica) && capacityDonor == null) {
+          // this node has space but cannot accept the replica
+          capacityDonor = current;
+        } else if (!current.hasCeilingCapacity && current.capacity == 
current.currentlyAssigned
+            && current.canAddIfCapacity(replica) && capacityAcceptor == null) {
+          // this node would be able to accept the replica if it had ceiling capacity
+          capacityAcceptor = current;
+        }
+        if (capacityDonor != null && capacityAcceptor != null) {
+          break;
+        }
+      }
+      if (capacityDonor != null && capacityAcceptor != null) {
+        // transfer ceiling capacity and add the replica
+        capacityAcceptor.steal(capacityDonor, replica);
+        return true;
+      }
+      return false;
+    }
+
+    /**
      * Move replicas from too-full nodes to nodes that can accept the replicas
      */
     private void moveExcessReplicas() {
@@ -260,7 +304,7 @@ public class AutoRebalanceStrategy implements Rebalancer {
           it = donor.nonPreferred.iterator();
           while (it.hasNext()) {
             Replica replica = it.next();
-            int startIndex = (replica.hashCode() & 0x7FFFFFFF) % 
_liveNodesList.size();
+            int startIndex = computeRandomStartIndex(replica);
             for (int index = startIndex; index < startIndex + 
_liveNodesList.size(); index++) {
               Node receiver = _liveNodesList.get(index % 
_liveNodesList.size());
               if (receiver.canAdd(replica)) {
@@ -365,6 +409,16 @@ public class AutoRebalanceStrategy implements Rebalancer {
     }
 
     /**
+     * Get a live node index to try first for a replica so that each possible start index is
+     * roughly uniformly assigned.
+     * @param replica The replica to assign
+     * @return The starting node index to try
+     */
+    private int computeRandomStartIndex(final Replica replica) {
+      return (replica.hashCode() & 0x7FFFFFFF) % _liveNodesList.size();
+    }
+
+    /**
      * Get a set of replicas not currently assigned to any node
      * @return Unassigned replicas
      */
@@ -478,6 +532,7 @@ public class AutoRebalanceStrategy implements Rebalancer {
 
       public int currentlyAssigned;
       public int capacity;
+      public boolean hasCeilingCapacity;
       private String id;
       boolean isAlive;
       private List<Replica> preferred;
@@ -497,10 +552,20 @@ public class AutoRebalanceStrategy implements Rebalancer {
        * @return true if the assignment can be made, false otherwise
        */
       public boolean canAdd(Replica replica) {
-        if (!isAlive) {
+        if (currentlyAssigned >= capacity) {
           return false;
         }
-        if (currentlyAssigned >= capacity) {
+        return canAddIfCapacity(replica);
+      }
+
+      /**
+       * Check if this replica can be legally added to this node, provided that it has enough
+       * capacity.
+       * @param replica The replica to test
+       * @return true if the assignment can be made, false otherwise
+       */
+      public boolean canAddIfCapacity(Replica replica) {
+        if (!isAlive) {
           return false;
         }
         for (Replica r : preferred) {
@@ -516,6 +581,20 @@ public class AutoRebalanceStrategy implements Rebalancer {
         return true;
       }
 
+      /**
+       * Receive a replica by stealing capacity from another Node
+       * @param donor The node that has excess capacity
+       * @param replica The replica to receive
+       */
+      public void steal(Node donor, Replica replica) {
+        donor.hasCeilingCapacity = false;
+        donor.capacity--;
+        hasCeilingCapacity = true;
+        capacity++;
+        currentlyAssigned++;
+        nonPreferred.add(replica);
+      }
+
       @Override
       public String toString() {
         StringBuilder sb = new StringBuilder();

Reply via email to