http://git-wip-us.apache.org/repos/asf/helix/blob/7147ec87/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java b/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java deleted file mode 100644 index 959609f..0000000 --- a/helix-core/src/main/java/org/apache/helix/controller/strategy/AutoRebalanceStrategy.java +++ /dev/null @@ -1,753 +0,0 @@ -package org.apache.helix.controller.strategy; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.Iterator; -import java.util.LinkedHashMap; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; -import java.util.TreeMap; -import java.util.TreeSet; - -import org.apache.helix.HelixManager; -import org.apache.helix.ZNRecord; -import org.apache.log4j.Logger; - -public class AutoRebalanceStrategy implements RebalanceStrategy { - private static Logger logger = Logger.getLogger(AutoRebalanceStrategy.class); - private final ReplicaPlacementScheme _placementScheme; - - private String _resourceName; - private List<String> _partitions; - private LinkedHashMap<String, Integer> _states; - private int _maximumPerNode; - - private Map<String, Node> _nodeMap; - private List<Node> _liveNodesList; - private Map<Integer, String> _stateMap; - - private Map<Replica, Node> _preferredAssignment; - private Map<Replica, Node> _existingPreferredAssignment; - private Map<Replica, Node> _existingNonPreferredAssignment; - private Set<Replica> _orphaned; - - public AutoRebalanceStrategy(String resourceName, final List<String> partitions, - final LinkedHashMap<String, Integer> states, int maximumPerNode) { - init(resourceName, partitions, states, maximumPerNode); - _placementScheme = new DefaultPlacementScheme(); - } - - public AutoRebalanceStrategy(String resourceName, final List<String> partitions, - final LinkedHashMap<String, Integer> states) { - this(resourceName, partitions, states, Integer.MAX_VALUE); - } - - @Override - public void init(String resourceName, final List<String> partitions, - final LinkedHashMap<String, Integer> states, int maximumPerNode) { - _resourceName = resourceName; - _partitions = partitions; - _states = states; - _maximumPerNode = maximumPerNode; - } - - @Override - public ZNRecord computePartitionAssignment(final List<String> liveNodes, - final Map<String, Map<String, 
String>> currentMapping, final List<String> allNodes) { - int numReplicas = countStateReplicas(); - ZNRecord znRecord = new ZNRecord(_resourceName); - if (liveNodes.size() == 0) { - return znRecord; - } - int distRemainder = (numReplicas * _partitions.size()) % liveNodes.size(); - int distFloor = (numReplicas * _partitions.size()) / liveNodes.size(); - _nodeMap = new HashMap<String, Node>(); - _liveNodesList = new ArrayList<Node>(); - - for (String id : allNodes) { - Node node = new Node(id); - node.capacity = 0; - node.hasCeilingCapacity = false; - _nodeMap.put(id, node); - } - for (int i = 0; i < liveNodes.size(); i++) { - boolean usingCeiling = false; - int targetSize = (_maximumPerNode > 0) ? Math.min(distFloor, _maximumPerNode) : distFloor; - if (distRemainder > 0 && targetSize < _maximumPerNode) { - targetSize += 1; - distRemainder = distRemainder - 1; - usingCeiling = true; - } - Node node = _nodeMap.get(liveNodes.get(i)); - node.isAlive = true; - node.capacity = targetSize; - node.hasCeilingCapacity = usingCeiling; - _liveNodesList.add(node); - } - - // compute states for all replica ids - _stateMap = generateStateMap(); - - // compute the preferred mapping if all nodes were up - _preferredAssignment = computePreferredPlacement(allNodes); - - // logger.info("preferred mapping:"+ preferredAssignment); - // from current mapping derive the ones in preferred location - // this will update the nodes with their current fill status - _existingPreferredAssignment = computeExistingPreferredPlacement(currentMapping); - - // from current mapping derive the ones not in preferred location - _existingNonPreferredAssignment = computeExistingNonPreferredPlacement(currentMapping); - - // compute orphaned replicas that are not assigned to any node - _orphaned = computeOrphaned(); - if (logger.isInfoEnabled()) { - logger.info("orphan = " + _orphaned); - } - - moveNonPreferredReplicasToPreferred(); - - assignOrphans(); - - moveExcessReplicas(); - - prepareResult(znRecord); - 
return znRecord; - } - - /** - * Move replicas assigned to non-preferred nodes if their current node is at capacity - * and its preferred node is under capacity. - */ - private void moveNonPreferredReplicasToPreferred() { - // iterate through non preferred and see if we can move them to the - // preferred location if the donor has more than it should and stealer has - // enough capacity - Iterator<Entry<Replica, Node>> iterator = _existingNonPreferredAssignment.entrySet().iterator(); - while (iterator.hasNext()) { - Entry<Replica, Node> entry = iterator.next(); - Replica replica = entry.getKey(); - Node donor = entry.getValue(); - Node receiver = _preferredAssignment.get(replica); - if (donor.capacity < donor.currentlyAssigned - && receiver.capacity > receiver.currentlyAssigned && receiver.canAdd(replica)) { - donor.currentlyAssigned = donor.currentlyAssigned - 1; - receiver.currentlyAssigned = receiver.currentlyAssigned + 1; - donor.nonPreferred.remove(replica); - receiver.preferred.add(replica); - donor.newReplicas.remove(replica); - receiver.newReplicas.add(replica); - iterator.remove(); - } - } - } - - /** - * Slot in orphaned partitions randomly so as to maintain even load on live nodes. 
- */ - private void assignOrphans() { - // now iterate over nodes and remaining orphaned partitions and assign - // partitions randomly - // Better to iterate over orphaned partitions first - Iterator<Replica> it = _orphaned.iterator(); - while (it.hasNext()) { - Replica replica = it.next(); - boolean added = false; - int startIndex = computeRandomStartIndex(replica); - for (int index = startIndex; index < startIndex + _liveNodesList.size(); index++) { - Node receiver = _liveNodesList.get(index % _liveNodesList.size()); - if (receiver.capacity > receiver.currentlyAssigned && receiver.canAdd(replica)) { - receiver.currentlyAssigned = receiver.currentlyAssigned + 1; - receiver.nonPreferred.add(replica); - receiver.newReplicas.add(replica); - added = true; - break; - } - } - if (!added) { - // try adding the replica by making room for it - added = assignOrphanByMakingRoom(replica); - } - if (added) { - it.remove(); - } - } - if (_orphaned.size() > 0 && logger.isInfoEnabled()) { - logger.info("could not assign nodes to partitions: " + _orphaned); - } - } - - /** - * If an orphan can't be assigned normally, see if a node can borrow capacity to accept it - * @param replica The replica to assign - * @return true if the assignment succeeded, false otherwise - */ - private boolean assignOrphanByMakingRoom(Replica replica) { - Node capacityDonor = null; - Node capacityAcceptor = null; - int startIndex = computeRandomStartIndex(replica); - for (int index = startIndex; index < startIndex + _liveNodesList.size(); index++) { - Node current = _liveNodesList.get(index % _liveNodesList.size()); - if (current.hasCeilingCapacity && current.capacity > current.currentlyAssigned - && !current.canAddIfCapacity(replica) && capacityDonor == null) { - // this node has space but cannot accept the node - capacityDonor = current; - } else if (!current.hasCeilingCapacity && current.capacity == current.currentlyAssigned - && current.canAddIfCapacity(replica) && capacityAcceptor == null) { - // 
this node would be able to accept the replica if it has ceiling capacity - capacityAcceptor = current; - } - if (capacityDonor != null && capacityAcceptor != null) { - break; - } - } - if (capacityDonor != null && capacityAcceptor != null) { - // transfer ceiling capacity and add the node - capacityAcceptor.steal(capacityDonor, replica); - return true; - } - return false; - } - - /** - * Move replicas from too-full nodes to nodes that can accept the replicas - */ - private void moveExcessReplicas() { - // iterate over nodes and move extra load - Iterator<Replica> it; - for (Node donor : _liveNodesList) { - if (donor.capacity < donor.currentlyAssigned) { - Collections.sort(donor.nonPreferred); - it = donor.nonPreferred.iterator(); - while (it.hasNext()) { - Replica replica = it.next(); - int startIndex = computeRandomStartIndex(replica); - for (int index = startIndex; index < startIndex + _liveNodesList.size(); index++) { - Node receiver = _liveNodesList.get(index % _liveNodesList.size()); - if (receiver.canAdd(replica)) { - receiver.currentlyAssigned = receiver.currentlyAssigned + 1; - receiver.nonPreferred.add(replica); - donor.currentlyAssigned = donor.currentlyAssigned - 1; - it.remove(); - break; - } - } - if (donor.capacity >= donor.currentlyAssigned) { - break; - } - } - if (donor.capacity < donor.currentlyAssigned) { - logger.warn("Could not take partitions out of node:" + donor.id); - } - } - } - } - - /** - * Update a ZNRecord with the results of the rebalancing. - * @param znRecord - */ - private void prepareResult(ZNRecord znRecord) { - // The map fields are keyed on partition name to a pair of node and state, i.e. it - // indicates that the partition with given state is served by that node - // - // The list fields are also keyed on partition and list all the nodes serving that partition. - // This is useful to verify that there is no node serving multiple replicas of the same - // partition. 
- Map<String, List<String>> newPreferences = new TreeMap<String, List<String>>(); - for (String partition : _partitions) { - znRecord.setMapField(partition, new TreeMap<String, String>()); - znRecord.setListField(partition, new ArrayList<String>()); - newPreferences.put(partition, new ArrayList<String>()); - } - - // for preference lists, the rough priority that we want is: - // [existing preferred, existing non-preferred, non-existing preferred, non-existing - // non-preferred] - for (Node node : _liveNodesList) { - for (Replica replica : node.preferred) { - if (node.newReplicas.contains(replica)) { - newPreferences.get(replica.partition).add(node.id); - } else { - znRecord.getListField(replica.partition).add(node.id); - } - } - } - for (Node node : _liveNodesList) { - for (Replica replica : node.nonPreferred) { - if (node.newReplicas.contains(replica)) { - newPreferences.get(replica.partition).add(node.id); - } else { - znRecord.getListField(replica.partition).add(node.id); - } - } - } - normalizePreferenceLists(znRecord.getListFields(), newPreferences); - - // generate preference maps based on the preference lists - for (String partition : _partitions) { - List<String> preferenceList = znRecord.getListField(partition); - int i = 0; - for (String participant : preferenceList) { - znRecord.getMapField(partition).put(participant, _stateMap.get(i)); - i++; - } - } - } - - /** - * Adjust preference lists to reduce the number of same replicas on an instance. This will - * separately normalize two sets of preference lists, and then append the results of the second - * set to those of the first. This basically ensures that existing replicas are automatically - * preferred. 
- * @param preferenceLists map of (partition --> list of nodes) - * @param newPreferences map containing node preferences not consistent with the current - * assignment - */ - private void normalizePreferenceLists(Map<String, List<String>> preferenceLists, - Map<String, List<String>> newPreferences) { - - Map<String, Map<String, Integer>> nodeReplicaCounts = - new HashMap<String, Map<String, Integer>>(); - for (String partition : preferenceLists.keySet()) { - normalizePreferenceList(preferenceLists.get(partition), nodeReplicaCounts); - } - for (String partition : newPreferences.keySet()) { - normalizePreferenceList(newPreferences.get(partition), nodeReplicaCounts); - preferenceLists.get(partition).addAll(newPreferences.get(partition)); - } - } - - /** - * Adjust a single preference list for replica assignment imbalance - * @param preferenceList list of node names - * @param nodeReplicaCounts map of (node --> state --> count) - */ - private void normalizePreferenceList(List<String> preferenceList, - Map<String, Map<String, Integer>> nodeReplicaCounts) { - List<String> newPreferenceList = new ArrayList<String>(); - int replicas = Math.min(countStateReplicas(), preferenceList.size()); - - // make this a LinkedHashSet to preserve iteration order - Set<String> notAssigned = new LinkedHashSet<String>(preferenceList); - for (int i = 0; i < replicas; i++) { - String state = _stateMap.get(i); - String node = getMinimumNodeForReplica(state, notAssigned, nodeReplicaCounts); - newPreferenceList.add(node); - notAssigned.remove(node); - Map<String, Integer> counts = nodeReplicaCounts.get(node); - counts.put(state, counts.get(state) + 1); - } - preferenceList.clear(); - preferenceList.addAll(newPreferenceList); - } - - /** - * Get the node which hosts the fewest of a given replica - * @param state the state - * @param nodes nodes to check - * @param nodeReplicaCounts current assignment of replicas - * @return the node most willing to accept the replica - */ - private String 
getMinimumNodeForReplica(String state, Set<String> nodes, - Map<String, Map<String, Integer>> nodeReplicaCounts) { - String minimalNode = null; - int minimalCount = Integer.MAX_VALUE; - for (String node : nodes) { - int count = getReplicaCountForNode(state, node, nodeReplicaCounts); - if (count < minimalCount) { - minimalCount = count; - minimalNode = node; - } - } - return minimalNode; - } - - /** - * Safe check for the number of replicas of a given id assiged to a node - * @param state the state to assign - * @param node the node to check - * @param nodeReplicaCounts a map of node to replica id and counts - * @return the number of currently assigned replicas of the given id - */ - private int getReplicaCountForNode(String state, String node, - Map<String, Map<String, Integer>> nodeReplicaCounts) { - if (!nodeReplicaCounts.containsKey(node)) { - Map<String, Integer> replicaCounts = new HashMap<String, Integer>(); - replicaCounts.put(state, 0); - nodeReplicaCounts.put(node, replicaCounts); - return 0; - } - Map<String, Integer> replicaCounts = nodeReplicaCounts.get(node); - if (!replicaCounts.containsKey(state)) { - replicaCounts.put(state, 0); - return 0; - } - return replicaCounts.get(state); - } - - /** - * Compute the subset of the current mapping where replicas are not mapped according to their - * preferred assignment. 
- * @param currentMapping Current mapping of replicas to nodes - * @return The current assignments that do not conform to the preferred assignment - */ - private Map<Replica, Node> computeExistingNonPreferredPlacement( - Map<String, Map<String, String>> currentMapping) { - Map<Replica, Node> existingNonPreferredAssignment = new TreeMap<Replica, Node>(); - int count = countStateReplicas(); - for (String partition : currentMapping.keySet()) { - Map<String, String> nodeStateMap = currentMapping.get(partition); - nodeStateMap.keySet().retainAll(_nodeMap.keySet()); - for (String nodeId : nodeStateMap.keySet()) { - Node node = _nodeMap.get(nodeId); - boolean skip = false; - for (Replica replica : node.preferred) { - if (replica.partition.equals(partition)) { - skip = true; - break; - } - } - if (skip) { - continue; - } - // check if its in one of the preferred position - for (int replicaId = 0; replicaId < count; replicaId++) { - Replica replica = new Replica(partition, replicaId); - if (!_preferredAssignment.containsKey(replica)) { - - logger.info("partitions: " + _partitions); - logger.info("currentMapping.keySet: " + currentMapping.keySet()); - throw new IllegalArgumentException("partition: " + replica + " is in currentMapping but not in partitions"); - } - - if (_preferredAssignment.get(replica).id != node.id - && !_existingPreferredAssignment.containsKey(replica) - && !existingNonPreferredAssignment.containsKey(replica)) { - existingNonPreferredAssignment.put(replica, node); - node.nonPreferred.add(replica); - - break; - } - } - } - } - return existingNonPreferredAssignment; - } - - /** - * Get a live node index to try first for a replica so that each possible start index is - * roughly uniformly assigned. 
- * @param replica The replica to assign - * @return The starting node index to try - */ - private int computeRandomStartIndex(final Replica replica) { - return (replica.hashCode() & 0x7FFFFFFF) % _liveNodesList.size(); - } - - /** - * Get a set of replicas not currently assigned to any node - * @return Unassigned replicas - */ - private Set<Replica> computeOrphaned() { - Set<Replica> orphanedPartitions = new TreeSet<Replica>(_preferredAssignment.keySet()); - for (Replica r : _existingPreferredAssignment.keySet()) { - if (orphanedPartitions.contains(r)) { - orphanedPartitions.remove(r); - } - } - for (Replica r : _existingNonPreferredAssignment.keySet()) { - if (orphanedPartitions.contains(r)) { - orphanedPartitions.remove(r); - } - } - - return orphanedPartitions; - } - - /** - * Determine the replicas already assigned to their preferred nodes - * @param currentMapping Current assignment of replicas to nodes - * @return Assignments that conform to the preferred placement - */ - private Map<Replica, Node> computeExistingPreferredPlacement( - final Map<String, Map<String, String>> currentMapping) { - Map<Replica, Node> existingPreferredAssignment = new TreeMap<Replica, Node>(); - int count = countStateReplicas(); - for (String partition : currentMapping.keySet()) { - Map<String, String> nodeStateMap = currentMapping.get(partition); - nodeStateMap.keySet().retainAll(_nodeMap.keySet()); - for (String nodeId : nodeStateMap.keySet()) { - Node node = _nodeMap.get(nodeId); - node.currentlyAssigned = node.currentlyAssigned + 1; - // check if its in one of the preferred position - for (int replicaId = 0; replicaId < count; replicaId++) { - Replica replica = new Replica(partition, replicaId); - if (_preferredAssignment.containsKey(replica) - && !existingPreferredAssignment.containsKey(replica) - && _preferredAssignment.get(replica).id == node.id) { - existingPreferredAssignment.put(replica, node); - node.preferred.add(replica); - break; - } - } - } - } - - return 
existingPreferredAssignment; - } - - /** - * Given a predefined set of all possible nodes, compute an assignment of replicas to - * nodes that evenly assigns all replicas to nodes. - * @param allNodes Identifiers to all nodes, live and non-live - * @return Preferred assignment of replicas - */ - private Map<Replica, Node> computePreferredPlacement(final List<String> allNodes) { - Map<Replica, Node> preferredMapping; - preferredMapping = new HashMap<Replica, Node>(); - int partitionId = 0; - int numReplicas = countStateReplicas(); - int count = countStateReplicas(); - for (String partition : _partitions) { - for (int replicaId = 0; replicaId < count; replicaId++) { - Replica replica = new Replica(partition, replicaId); - String nodeName = - _placementScheme.getLocation(partitionId, replicaId, _partitions.size(), numReplicas, - allNodes); - preferredMapping.put(replica, _nodeMap.get(nodeName)); - } - partitionId = partitionId + 1; - } - return preferredMapping; - } - - /** - * Counts the total number of replicas given a state-count mapping - * @return - */ - private int countStateReplicas() { - int total = 0; - for (Integer count : _states.values()) { - total += count; - } - return total; - } - - /** - * Compute a map of replica ids to state names - * @return Map: replica id -> state name - */ - private Map<Integer, String> generateStateMap() { - int replicaId = 0; - Map<Integer, String> stateMap = new HashMap<Integer, String>(); - for (String state : _states.keySet()) { - Integer count = _states.get(state); - for (int i = 0; i < count; i++) { - stateMap.put(replicaId, state); - replicaId++; - } - } - return stateMap; - } - - /** - * A Node is an entity that can serve replicas. It has a capacity and knowledge - * of replicas assigned to it, so it can decide if it can receive additional replicas. 
- */ - class Node { - public int currentlyAssigned; - public int capacity; - public boolean hasCeilingCapacity; - private final String id; - boolean isAlive; - private final List<Replica> preferred; - private final List<Replica> nonPreferred; - private final Set<Replica> newReplicas; - - public Node(String id) { - preferred = new ArrayList<Replica>(); - nonPreferred = new ArrayList<Replica>(); - newReplicas = new TreeSet<Replica>(); - currentlyAssigned = 0; - isAlive = false; - this.id = id; - } - - /** - * Check if this replica can be legally added to this node - * @param replica The replica to test - * @return true if the assignment can be made, false otherwise - */ - public boolean canAdd(Replica replica) { - if (currentlyAssigned >= capacity) { - return false; - } - return canAddIfCapacity(replica); - } - - /** - * Check if this replica can be legally added to this node, provided that it has enough - * capacity. - * @param replica The replica to test - * @return true if the assignment can be made, false otherwise - */ - public boolean canAddIfCapacity(Replica replica) { - if (!isAlive) { - return false; - } - for (Replica r : preferred) { - if (r.partition.equals(replica.partition)) { - return false; - } - } - for (Replica r : nonPreferred) { - if (r.partition.equals(replica.partition)) { - return false; - } - } - return true; - } - - /** - * Receive a replica by stealing capacity from another Node - * @param donor The node that has excess capacity - * @param replica The replica to receive - */ - public void steal(Node donor, Replica replica) { - donor.hasCeilingCapacity = false; - donor.capacity--; - hasCeilingCapacity = true; - capacity++; - currentlyAssigned++; - nonPreferred.add(replica); - newReplicas.add(replica); - } - - @Override - public String toString() { - StringBuilder sb = new StringBuilder(); - sb.append("##########\nname=").append(id).append("\npreferred:").append(preferred.size()) - .append("\nnonpreferred:").append(nonPreferred.size()); - 
return sb.toString(); - } - } - - /** - * A Replica is a combination of a partition of the resource, the state the replica is in - * and an identifier signifying a specific replica of a given partition and state. - */ - class Replica implements Comparable<Replica> { - private String partition; - private int replicaId; // this is a partition-relative id - private String format; - - public Replica(String partition, int replicaId) { - this.partition = partition; - this.replicaId = replicaId; - this.format = this.partition + "|" + this.replicaId; - } - - @Override - public String toString() { - return format; - } - - @Override - public boolean equals(Object that) { - if (that instanceof Replica) { - return this.format.equals(((Replica) that).format); - } - return false; - } - - @Override - public int hashCode() { - return this.format.hashCode(); - } - - @Override - public int compareTo(Replica that) { - if (that instanceof Replica) { - return this.format.compareTo(that.format); - } - return -1; - } - } - - /** - * Interface for providing a custom approach to computing a replica's affinity to a node. 
- */ - public interface ReplicaPlacementScheme { - /** - * Initialize global state - * @param manager The instance to which this placement is associated - */ - public void init(final HelixManager manager); - - /** - * Given properties of this replica, determine the node it would prefer to be served by - * @param partitionId The current partition - * @param replicaId The current replica with respect to the current partition - * @param numPartitions The total number of partitions - * @param numReplicas The total number of replicas per partition - * @param nodeNames A list of identifiers of all nodes, live and non-live - * @return The name of the node that would prefer to serve this replica - */ - public String getLocation(int partitionId, int replicaId, int numPartitions, int numReplicas, - final List<String> nodeNames); - } - - /** - * Compute preferred placements based on a default strategy that assigns replicas to nodes as - * evenly as possible while avoiding placing two replicas of the same partition on any node. 
- */ - public static class DefaultPlacementScheme implements ReplicaPlacementScheme { - @Override - public void init(final HelixManager manager) { - // do nothing since this is independent of the manager - } - - @Override - public String getLocation(int partitionId, int replicaId, int numPartitions, int numReplicas, - final List<String> nodeNames) { - int index; - if (nodeNames.size() > numPartitions) { - // assign replicas in partition order in case there are more nodes than partitions - index = (partitionId + replicaId * numPartitions) % nodeNames.size(); - } else if (nodeNames.size() == numPartitions) { - // need a replica offset in case the sizes of these sets are the same - index = - ((partitionId + replicaId * numPartitions) % nodeNames.size() + replicaId) - % nodeNames.size(); - } else { - // in all other cases, assigning a replica at a time for each partition is reasonable - index = (partitionId + replicaId) % nodeNames.size(); - } - return nodeNames.get(index); - } - } -}
http://git-wip-us.apache.org/repos/asf/helix/blob/7147ec87/helix-core/src/main/java/org/apache/helix/controller/strategy/RebalanceStrategy.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/strategy/RebalanceStrategy.java b/helix-core/src/main/java/org/apache/helix/controller/strategy/RebalanceStrategy.java deleted file mode 100644 index 4daae82..0000000 --- a/helix-core/src/main/java/org/apache/helix/controller/strategy/RebalanceStrategy.java +++ /dev/null @@ -1,52 +0,0 @@ -package org.apache.helix.controller.strategy; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -import org.apache.helix.ZNRecord; - -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; - -/** - * Assignment strategy interface that computes the assignment of partition->instance. - */ -public interface RebalanceStrategy { - /** - * Perform the necessary initialization for the rebalance strategy object. 
- * @param resourceName - * @param partitions - * @param states - * @param maximumPerNode - */ - void init(String resourceName, final List<String> partitions, - final LinkedHashMap<String, Integer> states, int maximumPerNode); - - /** - * Compute the preference lists and (optional partition-state mapping) for the given resource. - * - * @param liveNodes - * @param currentMapping - * @param allNodes - * @return - */ - ZNRecord computePartitionAssignment(final List<String> liveNodes, - final Map<String, Map<String, String>> currentMapping, final List<String> allNodes); -} http://git-wip-us.apache.org/repos/asf/helix/blob/7147ec87/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java index e97ac9b..73f2cbb 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java @@ -52,6 +52,7 @@ import org.apache.helix.PropertyKey.Builder; import org.apache.helix.PropertyPathConfig; import org.apache.helix.PropertyType; import org.apache.helix.ZNRecord; +import org.apache.helix.controller.rebalancer.strategy.RebalanceStrategy; import org.apache.helix.model.ClusterConstraints; import org.apache.helix.model.ClusterConstraints.ConstraintType; import org.apache.helix.model.ConstraintItem; @@ -607,6 +608,13 @@ public class ZKHelixAdmin implements HelixAdmin { } @Override + public void addResource(String clusterName, String resourceName, int partitions, + String stateModelRef, String rebalancerMode, String rebalanceStrategy) { + addResource(clusterName, resourceName, partitions, stateModelRef, rebalancerMode, + rebalanceStrategy, 0, -1); + } + + @Override public void addResource(String clusterName, String resourceName, IdealState idealstate) { 
String stateModelRef = idealstate.getStateModelDefRef(); String stateModelDefPath = @@ -629,14 +637,21 @@ public class ZKHelixAdmin implements HelixAdmin { @Override public void addResource(String clusterName, String resourceName, int partitions, String stateModelRef, String rebalancerMode, int bucketSize) { - addResource(clusterName, resourceName, partitions, stateModelRef, rebalancerMode, bucketSize, - -1); + addResource(clusterName, resourceName, partitions, stateModelRef, rebalancerMode, bucketSize, -1); } @Override public void addResource(String clusterName, String resourceName, int partitions, String stateModelRef, String rebalancerMode, int bucketSize, int maxPartitionsPerInstance) { + addResource(clusterName, resourceName, partitions, stateModelRef, rebalancerMode, + RebalanceStrategy.DEFAULT_REBALANCE_STRATEGY, bucketSize, maxPartitionsPerInstance); + } + + @Override + public void addResource(String clusterName, String resourceName, int partitions, + String stateModelRef, String rebalancerMode, String rebalanceStrategy, int bucketSize, + int maxPartitionsPerInstance) { if (!ZKUtil.isClusterSetup(clusterName, _zkClient)) { throw new HelixException("cluster " + clusterName + " is not setup yet"); } @@ -647,6 +662,7 @@ public class ZKHelixAdmin implements HelixAdmin { RebalanceMode mode = idealState.rebalanceModeFromString(rebalancerMode, RebalanceMode.SEMI_AUTO); idealState.setRebalanceMode(mode); + idealState.setRebalanceStrategy(rebalanceStrategy); idealState.setReplicas("" + 0); idealState.setStateModelFactoryName(HelixConstants.DEFAULT_STATE_MODEL_FACTORY); if (maxPartitionsPerInstance > 0 && maxPartitionsPerInstance < Integer.MAX_VALUE) { @@ -1014,8 +1030,7 @@ public class ZKHelixAdmin implements HelixAdmin { @Override public ZNRecord update(ZNRecord currentData) { ClusterConstraints constraints = - currentData == null ? new ClusterConstraints(constraintType) : new ClusterConstraints( - currentData); + currentData == null ? 
new ClusterConstraints(constraintType) : new ClusterConstraints(currentData); constraints.addConstraintItem(constraintId, constraintItem); return constraints.getRecord(); @@ -1153,6 +1168,26 @@ public class ZKHelixAdmin implements HelixAdmin { } @Override + public void setInstanceZoneId(String clusterName, String instanceName, String zoneId) { + if (!ZKUtil.isClusterSetup(clusterName, _zkClient)) { + throw new HelixException("cluster " + clusterName + " is not setup yet"); + } + + if (!ZKUtil.isInstanceSetup(_zkClient, clusterName, instanceName, InstanceType.PARTICIPANT)) { + throw new HelixException("cluster " + clusterName + " instance " + instanceName + + " is not setup yet"); + } + HelixDataAccessor accessor = + new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient)); + Builder keyBuilder = accessor.keyBuilder(); + + PropertyKey configKey = keyBuilder.instanceConfig(instanceName); + InstanceConfig config = accessor.getProperty(configKey); + config.setZoneId(zoneId); + accessor.setProperty(configKey, config); + } + + @Override public void close() { if (_zkClient != null) { _zkClient.close(); http://git-wip-us.apache.org/repos/asf/helix/blob/7147ec87/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java new file mode 100644 index 0000000..25a16d1 --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java @@ -0,0 +1,92 @@ +package org.apache.helix.model; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import org.apache.helix.HelixProperty; +import org.apache.helix.ZNRecord; + +/** + * Cluster configurations + */ +public class ClusterConfig extends HelixProperty { + /** + * Configurable characteristics of a cluster + */ + public enum ClusterConfigProperty { + HELIX_DISABLE_PIPELINE_TRIGGERS, + TOPOLOGY, // cluster topology definition, for example, "/zone/rack/host/instance" + FAULT_ZONE_TYPE // the type in which isolation should be applied on when Helix places the replicas from same partition. + } + + /** + * Instantiate for a specific cluster + * + * @param cluster the cluster identifier + */ + public ClusterConfig(String cluster) { + super(cluster); + } + + /** + * Instantiate with a pre-populated record + * + * @param record a ZNRecord corresponding to a cluster configuration + */ + public ClusterConfig(ZNRecord record) { + super(record); + } + + /** + * Whether to persist best possible assignment in a resource's idealstate. 
+ * + * @return + */ + public Boolean isPipelineTriggersDisabled() { + return _record + .getBooleanField(ClusterConfigProperty.HELIX_DISABLE_PIPELINE_TRIGGERS.toString(), false); + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof ClusterConfig) { + ClusterConfig that = (ClusterConfig) obj; + + if (this.getId().equals(that.getId())) { + return true; + } + } + return false; + } + + @Override + public int hashCode() { + return getId().hashCode(); + } + + /** + * Get the name of this resource + * + * @return the instance name + */ + public String getClusterName() { + return _record.getId(); + } +} + http://git-wip-us.apache.org/repos/asf/helix/blob/7147ec87/helix-core/src/main/java/org/apache/helix/model/IdealState.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/model/IdealState.java b/helix-core/src/main/java/org/apache/helix/model/IdealState.java index 7c4cf54..55d4734 100644 --- a/helix-core/src/main/java/org/apache/helix/model/IdealState.java +++ b/helix-core/src/main/java/org/apache/helix/model/IdealState.java @@ -167,7 +167,7 @@ public class IdealState extends HelixProperty { /** * Specify the strategy for Helix to use to compute the partition-instance assignment, - * i,e, the custom rebalance strategy that implements {@link org.apache.helix.controller.strategy.RebalanceStrategy} + * i,e, the custom rebalance strategy that implements {@link org.apache.helix.controller.rebalancer.strategy.RebalanceStrategy} * * @param rebalanceStrategy * @return http://git-wip-us.apache.org/repos/asf/helix/blob/7147ec87/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java b/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java index eb1c652..ecf2900 100644 --- 
a/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java +++ b/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java @@ -39,10 +39,14 @@ public class InstanceConfig extends HelixProperty { public enum InstanceConfigProperty { HELIX_HOST, HELIX_PORT, + HELIX_ZONE_ID, HELIX_ENABLED, HELIX_DISABLED_PARTITION, - TAG_LIST + TAG_LIST, + INSTANCE_WEIGHT, + DOMAIN } + public static final int WEIGHT_NOT_SET = -1; private static final Logger _logger = Logger.getLogger(InstanceConfig.class.getName()); @@ -94,6 +98,50 @@ public class InstanceConfig extends HelixProperty { _record.setSimpleField(InstanceConfigProperty.HELIX_PORT.toString(), port); } + public String getZoneId() { + return _record.getSimpleField(InstanceConfigProperty.HELIX_ZONE_ID.name()); + } + + public void setZoneId(String zoneId) { + _record.setSimpleField(InstanceConfigProperty.HELIX_ZONE_ID.name(), zoneId); + } + + /** + * Domain represents a hierarchy identifier for an instance. + * @return + */ + public String getDomain() { + return _record.getSimpleField(InstanceConfigProperty.DOMAIN.name()); + } + + /** + * Domain represents a hierarchy identifier for an instance. + * Example: "cluster=myCluster,zone=myZone1,rack=myRack,host=hostname,instance=instance001". 
+ * @return + */ + public void setDomain(String domain) { + _record.setSimpleField(InstanceConfigProperty.DOMAIN.name(), domain); + } + + public int getWeight() { + String w = _record.getSimpleField(InstanceConfigProperty.INSTANCE_WEIGHT.name()); + if (w != null) { + try { + int weight = Integer.valueOf(w); + return weight; + } catch (NumberFormatException e) { + } + } + return WEIGHT_NOT_SET; + } + + public void setWeight(int weight) { + if (weight <= 0) { + throw new IllegalArgumentException("Instance weight can not be equal or less than 0!"); + } + _record.setSimpleField(InstanceConfigProperty.INSTANCE_WEIGHT.name(), String.valueOf(weight)); + } + /** * Get arbitrary tags associated with the instance * @return a list of tags http://git-wip-us.apache.org/repos/asf/helix/blob/7147ec87/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java b/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java index 623357f..ac96768 100644 --- a/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java +++ b/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java @@ -32,8 +32,8 @@ import java.util.TreeSet; import org.apache.helix.ZNRecord; import org.apache.helix.controller.stages.ClusterDataCache; import org.apache.helix.controller.stages.CurrentStateOutput; -import org.apache.helix.controller.strategy.AutoRebalanceStrategy; -import org.apache.helix.controller.strategy.RebalanceStrategy; +import org.apache.helix.controller.rebalancer.strategy.AutoRebalanceStrategy; +import org.apache.helix.controller.rebalancer.strategy.RebalanceStrategy; import org.apache.helix.model.IdealState; import org.apache.helix.model.InstanceConfig; import org.apache.helix.model.Partition; @@ -126,7 +126,8 @@ public class 
GenericTaskAssignmentCalculator extends TaskAssignmentCalculator { List<String> allNodes = Lists.newArrayList(getEligibleInstances(jobCfg, currStateOutput, instances, cache)); Collections.sort(allNodes); - ZNRecord record = strategy.computePartitionAssignment(allNodes, currentMapping, allNodes); + ZNRecord record = + strategy.computePartitionAssignment(allNodes, allNodes, currentMapping, cache); Map<String, List<String>> preferenceLists = record.getListFields(); // Convert to an assignment keyed on participant http://git-wip-us.apache.org/repos/asf/helix/blob/7147ec87/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java index 9d411bb..08ccbdc 100644 --- a/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java +++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java @@ -362,6 +362,12 @@ public class ClusterSetup { } public void addResourceToCluster(String clusterName, String resourceName, int numResources, + String stateModelRef, String rebalancerMode, String rebalanceStrategy) { + _admin.addResource(clusterName, resourceName, numResources, stateModelRef, rebalancerMode, + rebalanceStrategy); + } + + public void addResourceToCluster(String clusterName, String resourceName, int numResources, String stateModelRef, String rebalancerMode, int bucketSize) { _admin.addResource(clusterName, resourceName, numResources, stateModelRef, rebalancerMode, bucketSize); http://git-wip-us.apache.org/repos/asf/helix/blob/7147ec87/helix-core/src/test/java/org/apache/helix/controller/Strategy/TestAutoRebalanceStrategy.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/controller/Strategy/TestAutoRebalanceStrategy.java 
b/helix-core/src/test/java/org/apache/helix/controller/Strategy/TestAutoRebalanceStrategy.java new file mode 100644 index 0000000..a4e38a1 --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/controller/Strategy/TestAutoRebalanceStrategy.java @@ -0,0 +1,766 @@ +package org.apache.helix.controller.Strategy; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Random; +import java.util.Set; +import java.util.TreeMap; +import java.util.TreeSet; + +import org.apache.helix.HelixDefinedState; +import org.apache.helix.Mocks.MockAccessor; +import org.apache.helix.PropertyKey.Builder; +import org.apache.helix.ZNRecord; +import org.apache.helix.controller.rebalancer.AutoRebalancer; +import org.apache.helix.controller.rebalancer.strategy.AutoRebalanceStrategy; +import org.apache.helix.controller.rebalancer.strategy.RebalanceStrategy; +import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment; +import org.apache.helix.controller.stages.ClusterDataCache; +import org.apache.helix.model.LiveInstance; +import org.apache.helix.model.StateModelDefinition; +import org.apache.helix.tools.StateModelConfigGenerator; +import org.apache.log4j.Logger; +import org.testng.Assert; +import org.testng.annotations.Test; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; + +public class TestAutoRebalanceStrategy { + private static Logger logger = Logger.getLogger(TestAutoRebalanceStrategy.class); + + /** + * Sanity test for a basic Master-Slave model + */ + @Test + public void simpleMasterSlaveTest() { + final int NUM_ITERATIONS = 10; + final int NUM_PARTITIONS = 10; + final int NUM_LIVE_NODES = 12; + final int NUM_TOTAL_NODES = 20; + final int MAX_PER_NODE = 5; + + final String[] STATE_NAMES = { + "MASTER", "SLAVE" + }; + final int[] STATE_COUNTS = { + 1, 2 + }; + + runTest("BasicMasterSlave", NUM_ITERATIONS, NUM_PARTITIONS, NUM_LIVE_NODES, NUM_TOTAL_NODES, + MAX_PER_NODE, STATE_NAMES, STATE_COUNTS); + } + + /** + * Run a test for an 
arbitrary state model. + * @param name Name of the test state model + * @param numIterations Number of rebalance tasks to run + * @param numPartitions Number of partitions for the resource + * @param numLiveNodes Number of live nodes in the cluster + * @param numTotalNodes Number of nodes in the cluster, must be greater than or equal to + * numLiveNodes + * @param maxPerNode Maximum number of replicas a node can serve + * @param stateNames States ordered by preference + * @param stateCounts Number of replicas that should be in each state + */ + private void runTest(String name, int numIterations, int numPartitions, int numLiveNodes, + int numTotalNodes, int maxPerNode, String[] stateNames, int[] stateCounts) { + List<String> partitions = new ArrayList<String>(); + for (int i = 0; i < numPartitions; i++) { + partitions.add("p_" + i); + } + + List<String> liveNodes = new ArrayList<String>(); + List<String> allNodes = new ArrayList<String>(); + for (int i = 0; i < numTotalNodes; i++) { + allNodes.add("n_" + i); + if (i < numLiveNodes) { + liveNodes.add("n_" + i); + } + } + + Map<String, Map<String, String>> currentMapping = new TreeMap<String, Map<String, String>>(); + + LinkedHashMap<String, Integer> states = new LinkedHashMap<String, Integer>(); + for (int i = 0; i < Math.min(stateNames.length, stateCounts.length); i++) { + states.put(stateNames[i], stateCounts[i]); + } + + StateModelDefinition stateModelDef = getIncompleteStateModelDef(name, stateNames[0], states); + + new AutoRebalanceTester(partitions, states, liveNodes, currentMapping, allNodes, maxPerNode, + stateModelDef).runRepeatedly(numIterations); + } + + /** + * Get a StateModelDefinition without transitions. The auto rebalancer doesn't take transitions + * into account when computing mappings, so this is acceptable. 
+ * @param modelName name to give the model + * @param initialState initial state for all nodes + * @param states ordered map of state to count + * @return incomplete StateModelDefinition for rebalancing + */ + private StateModelDefinition getIncompleteStateModelDef(String modelName, String initialState, + LinkedHashMap<String, Integer> states) { + StateModelDefinition.Builder builder = new StateModelDefinition.Builder(modelName); + builder.initialState(initialState); + int i = states.size(); + for (String state : states.keySet()) { + builder.addState(state, i); + builder.upperBound(state, states.get(state)); + i--; + } + return builder.build(); + } + + class AutoRebalanceTester { + private static final double P_KILL = 0.45; + private static final double P_ADD = 0.1; + private static final double P_RESURRECT = 0.45; + private static final String RESOURCE_NAME = "resource"; + + private List<String> _partitions; + private LinkedHashMap<String, Integer> _states; + private List<String> _liveNodes; + private Set<String> _liveSet; + private Set<String> _removedSet; + private Set<String> _nonLiveSet; + private Map<String, Map<String, String>> _currentMapping; + private List<String> _allNodes; + private int _maxPerNode; + private StateModelDefinition _stateModelDef; + private Random _random; + + public AutoRebalanceTester(List<String> partitions, LinkedHashMap<String, Integer> states, + List<String> liveNodes, Map<String, Map<String, String>> currentMapping, + List<String> allNodes, int maxPerNode, StateModelDefinition stateModelDef) { + _partitions = partitions; + _states = states; + _liveNodes = liveNodes; + _liveSet = new TreeSet<String>(); + for (String node : _liveNodes) { + _liveSet.add(node); + } + _removedSet = new TreeSet<String>(); + _nonLiveSet = new TreeSet<String>(); + _currentMapping = currentMapping; + _allNodes = allNodes; + for (String node : allNodes) { + if (!_liveSet.contains(node)) { + _nonLiveSet.add(node); + } + } + _maxPerNode = maxPerNode; + 
_stateModelDef = stateModelDef; + _random = new Random(); + } + + /** + * Repeatedly randomly select a task to run and report the result + * @param numIterations + * Number of random tasks to run in sequence + */ + public void runRepeatedly(int numIterations) { + logger.info("~~~~ Initial State ~~~~~"); + RebalanceStrategy strategy = + new AutoRebalanceStrategy(RESOURCE_NAME, _partitions, _states, _maxPerNode); + ZNRecord initialResult = + strategy.computePartitionAssignment(_allNodes, _liveNodes, _currentMapping, null); + _currentMapping = getMapping(initialResult.getListFields()); + logger.info(_currentMapping); + getRunResult(_currentMapping, initialResult.getListFields()); + for (int i = 0; i < numIterations; i++) { + logger.info("~~~~ Iteration " + i + " ~~~~~"); + ZNRecord znRecord = runOnceRandomly(); + if (znRecord != null) { + final Map<String, List<String>> listResult = znRecord.getListFields(); + final Map<String, Map<String, String>> mapResult = getMapping(listResult); + logger.info(mapResult); + logger.info(listResult); + getRunResult(mapResult, listResult); + _currentMapping = mapResult; + } + } + } + + private Map<String, Map<String, String>> getMapping(final Map<String, List<String>> listResult) { + final Map<String, Map<String, String>> mapResult = new HashMap<String, Map<String, String>>(); + ClusterDataCache cache = new ClusterDataCache(); + MockAccessor accessor = new MockAccessor(); + Builder keyBuilder = accessor.keyBuilder(); + for (String node : _liveNodes) { + LiveInstance liveInstance = new LiveInstance(node); + liveInstance.setSessionId("testSession"); + accessor.setProperty(keyBuilder.liveInstance(node), liveInstance); + } + cache.refresh(accessor); + for (String partition : _partitions) { + List<String> preferenceList = listResult.get(partition); + Map<String, String> currentStateMap = _currentMapping.get(partition); + Set<String> disabled = Collections.emptySet(); + Map<String, String> assignment = + 
ConstraintBasedAssignment.computeAutoBestStateForPartition(cache, _stateModelDef, + preferenceList, currentStateMap, disabled, true); + mapResult.put(partition, assignment); + } + return mapResult; + } + + /** + * Output various statistics and correctness check results + * @param mapFields + * The map-map assignment generated by the rebalancer + * @param listFields + * The map-list assignment generated by the rebalancer + */ + public void getRunResult(final Map<String, Map<String, String>> mapFields, + final Map<String, List<String>> listFields) { + logger.info("***** Statistics *****"); + dumpStatistics(mapFields); + verifyCorrectness(mapFields, listFields); + } + + /** + * Output statistics about the assignment + * @param mapFields + * The map-map assignment generated by the rebalancer + */ + public void dumpStatistics(final Map<String, Map<String, String>> mapFields) { + Map<String, Integer> partitionsPerNode = getPartitionBucketsForNode(mapFields); + int nodeCount = _liveNodes.size(); + logger.info("Total number of nodes: " + nodeCount); + logger.info("Nodes: " + _liveNodes); + int sumPartitions = getSum(partitionsPerNode.values()); + logger.info("Total number of partitions: " + sumPartitions); + double averagePartitions = getAverage(partitionsPerNode.values()); + logger.info("Average number of partitions per node: " + averagePartitions); + double stdevPartitions = getStdev(partitionsPerNode.values(), averagePartitions); + logger.info("Standard deviation of partitions: " + stdevPartitions); + + // Statistics about each state + Map<String, Map<String, Integer>> statesPerNode = getStateBucketsForNode(mapFields); + for (String state : _states.keySet()) { + Map<String, Integer> nodeStateCounts = new TreeMap<String, Integer>(); + for (Entry<String, Map<String, Integer>> nodeStates : statesPerNode.entrySet()) { + Map<String, Integer> stateCounts = nodeStates.getValue(); + if (stateCounts.containsKey(state)) { + nodeStateCounts.put(nodeStates.getKey(), 
stateCounts.get(state)); + } else { + nodeStateCounts.put(nodeStates.getKey(), 0); + } + } + int sumStates = getSum(nodeStateCounts.values()); + logger.info("Total number of state " + state + ": " + sumStates); + double averageStates = getAverage(nodeStateCounts.values()); + logger.info("Average number of state " + state + " per node: " + averageStates); + double stdevStates = getStdev(nodeStateCounts.values(), averageStates); + logger.info("Standard deviation of state " + state + " per node: " + stdevStates); + } + } + + /** + * Run a set of correctness tests, reporting success or failure + * @param mapFields + * The map-map assignment generated by the rebalancer + * @param listFields + * The map-list assignment generated by the rebalancer + */ + public void verifyCorrectness(final Map<String, Map<String, String>> mapFields, + final Map<String, List<String>> listFields) { + final Map<String, Integer> partitionsPerNode = getPartitionBucketsForNode(mapFields); + boolean maxConstraintMet = maxNotExceeded(partitionsPerNode); + assert maxConstraintMet : "Max per node constraint: FAIL"; + logger.info("Max per node constraint: PASS"); + + boolean liveConstraintMet = onlyLiveAssigned(partitionsPerNode); + assert liveConstraintMet : "Only live nodes have partitions constraint: FAIL"; + logger.info("Only live nodes have partitions constraint: PASS"); + + boolean stateAssignmentPossible = correctStateAssignmentCount(mapFields); + assert stateAssignmentPossible : "State replica constraint: FAIL"; + logger.info("State replica constraint: PASS"); + + boolean nodesUniqueForPartitions = atMostOnePartitionReplicaPerNode(listFields); + assert nodesUniqueForPartitions : "Node uniqueness per partition constraint: FAIL"; + logger.info("Node uniqueness per partition constraint: PASS"); + } + + private boolean maxNotExceeded(final Map<String, Integer> partitionsPerNode) { + for (String node : partitionsPerNode.keySet()) { + Integer value = partitionsPerNode.get(node); + if (value > 
_maxPerNode) { + logger.error("ERROR: Node " + node + " has " + value + + " partitions despite a maximum of " + _maxPerNode); + return false; + } + } + return true; + } + + private boolean onlyLiveAssigned(final Map<String, Integer> partitionsPerNode) { + for (final Entry<String, Integer> nodeState : partitionsPerNode.entrySet()) { + boolean isLive = _liveSet.contains(nodeState.getKey()); + boolean isEmpty = nodeState.getValue() == 0; + if (!isLive && !isEmpty) { + logger.error("ERROR: Node " + nodeState.getKey() + " is not live, but has " + + nodeState.getValue() + " replicas!"); + return false; + } + } + return true; + } + + private boolean correctStateAssignmentCount(final Map<String, Map<String, String>> assignment) { + for (final Entry<String, Map<String, String>> partitionEntry : assignment.entrySet()) { + final Map<String, String> nodeMap = partitionEntry.getValue(); + final Map<String, Integer> stateCounts = new TreeMap<String, Integer>(); + for (String state : nodeMap.values()) { + if (!stateCounts.containsKey(state)) { + stateCounts.put(state, 1); + } else { + stateCounts.put(state, stateCounts.get(state) + 1); + } + } + for (String state : stateCounts.keySet()) { + if (state.equals(HelixDefinedState.DROPPED.toString())) { + continue; + } + int count = stateCounts.get(state); + int maximumCount = _states.get(state); + if (count > maximumCount) { + logger.error("ERROR: State " + state + " for partition " + partitionEntry.getKey() + + " has " + count + " replicas when " + maximumCount + " is allowed!"); + return false; + } + } + } + return true; + } + + private boolean atMostOnePartitionReplicaPerNode(final Map<String, List<String>> listFields) { + for (final Entry<String, List<String>> partitionEntry : listFields.entrySet()) { + Set<String> nodeSet = new HashSet<String>(partitionEntry.getValue()); + int numUniques = nodeSet.size(); + int total = partitionEntry.getValue().size(); + if (numUniques < total) { + logger.error("ERROR: Partition " + 
partitionEntry.getKey() + " is assigned to " + total + + " nodes, but only " + numUniques + " are unique!"); + return false; + } + } + return true; + } + + private double getAverage(final Collection<Integer> values) { + double sum = 0.0; + for (Integer value : values) { + sum += value; + } + if (values.size() != 0) { + return sum / values.size(); + } else { + return -1.0; + } + } + + private int getSum(final Collection<Integer> values) { + int sum = 0; + for (Integer value : values) { + sum += value; + } + return sum; + } + + private double getStdev(final Collection<Integer> values, double mean) { + double sum = 0.0; + for (Integer value : values) { + double deviation = mean - value; + sum += Math.pow(deviation, 2.0); + } + if (values.size() != 0) { + sum /= values.size(); + return Math.pow(sum, 0.5); + } else { + return -1.0; + } + } + + private Map<String, Integer> getPartitionBucketsForNode( + final Map<String, Map<String, String>> assignment) { + Map<String, Integer> partitionsPerNode = new TreeMap<String, Integer>(); + for (String node : _liveNodes) { + partitionsPerNode.put(node, 0); + } + for (Entry<String, Map<String, String>> partitionEntry : assignment.entrySet()) { + final Map<String, String> nodeMap = partitionEntry.getValue(); + for (String node : nodeMap.keySet()) { + String state = nodeMap.get(node); + if (state.equals(HelixDefinedState.DROPPED.toString())) { + continue; + } + // add 1 for every occurrence of a node + if (!partitionsPerNode.containsKey(node)) { + partitionsPerNode.put(node, 1); + } else { + partitionsPerNode.put(node, partitionsPerNode.get(node) + 1); + } + } + } + return partitionsPerNode; + } + + private Map<String, Map<String, Integer>> getStateBucketsForNode( + final Map<String, Map<String, String>> assignment) { + Map<String, Map<String, Integer>> result = new TreeMap<String, Map<String, Integer>>(); + for (String n : _liveNodes) { + result.put(n, new TreeMap<String, Integer>()); + } + for (Map<String, String> nodeStateMap : 
assignment.values()) { + for (Entry<String, String> nodeState : nodeStateMap.entrySet()) { + if (!result.containsKey(nodeState.getKey())) { + result.put(nodeState.getKey(), new TreeMap<String, Integer>()); + } + Map<String, Integer> stateMap = result.get(nodeState.getKey()); + if (!stateMap.containsKey(nodeState.getValue())) { + stateMap.put(nodeState.getValue(), 1); + } else { + stateMap.put(nodeState.getValue(), stateMap.get(nodeState.getValue()) + 1); + } + } + } + return result; + } + + /** + * Randomly choose between killing, adding, or resurrecting a single node + * @return (Partition -> (Node -> State)) ZNRecord + */ + public ZNRecord runOnceRandomly() { + double choose = _random.nextDouble(); + ZNRecord result = null; + if (choose < P_KILL) { + result = removeSingleNode(null); + } else if (choose < P_KILL + P_ADD) { + result = addSingleNode(null); + } else if (choose < P_KILL + P_ADD + P_RESURRECT) { + result = resurrectSingleNode(null); + } + return result; + } + + /** + * Run rebalancer trying to add a never-live node + * @param node + * Optional String to add + * @return ZNRecord result returned by the rebalancer + */ + public ZNRecord addSingleNode(String node) { + logger.info("=================== add node ================="); + if (_nonLiveSet.size() == 0) { + logger.warn("Cannot add node because there are no nodes left to add."); + return null; + } + + // Get a random never-live node + if (node == null || !_nonLiveSet.contains(node)) { + node = getRandomSetElement(_nonLiveSet); + } + logger.info("Adding " + node); + _liveNodes.add(node); + _liveSet.add(node); + _nonLiveSet.remove(node); + + return new AutoRebalanceStrategy(RESOURCE_NAME, _partitions, _states, _maxPerNode). 
+          computePartitionAssignment(_allNodes, _liveNodes, _currentMapping, null);
+    }
+
+    /**
+     * Take a live node offline and re-run the rebalancer.
+     * <p>
+     * If {@code node} is null or not currently live, a random live node is removed instead. All
+     * references to the removed node are dropped from the current mapping before rebalancing,
+     * because the rebalancer expects the current mapping not to contain deleted nodes.
+     * @param node
+     *          Optional name of the live node to remove; may be null
+     * @return ZNRecord result returned by the rebalancer, or null if there are no live nodes left
+     */
+    public ZNRecord removeSingleNode(String node) {
+      logger.info("=================== remove node =================");
+      if (_liveSet.isEmpty()) {
+        logger.warn("Cannot remove node because there are no nodes left to remove.");
+        return null;
+      }
+
+      // Pick a random live node when none (or a node that is not live) was requested
+      if (node == null || !_liveSet.contains(node)) {
+        node = getRandomSetElement(_liveSet);
+      }
+      logger.info("Removing " + node);
+      _removedSet.add(node);
+      _liveNodes.remove(node);
+      _liveSet.remove(node);
+
+      // the rebalancer expects that the current mapping doesn't contain deleted nodes;
+      // Map.remove is a no-op for partitions that never referenced this node
+      for (Map<String, String> nodeMap : _currentMapping.values()) {
+        nodeMap.remove(node);
+      }
+
+      return new AutoRebalanceStrategy(RESOURCE_NAME, _partitions, _states, _maxPerNode)
+          .computePartitionAssignment(_allNodes, _liveNodes, _currentMapping, null);
+    }
+
+    /**
+     * Bring a previously removed node back online and re-run the rebalancer.
+     * <p>
+     * If {@code node} is null or was not previously removed, a random removed node is resurrected
+     * instead.
+     * @param node
+     *          Optional name of the removed node to resurrect; may be null
+     * @return ZNRecord result returned by the rebalancer, or null if no node has been removed
+     */
+    public ZNRecord resurrectSingleNode(String node) {
+      logger.info("=================== resurrect node =================");
+      if (_removedSet.isEmpty()) {
+        logger.warn("Cannot resurrect node because there are no nodes left to resurrect.");
+        return null;
+      }
+
+      // Pick a random removed node when none (or a node that was never removed) was requested
+      if (node == null || !_removedSet.contains(node)) {
+        node = getRandomSetElement(_removedSet);
+      }
+      logger.info("Resurrecting " + node);
+      _removedSet.remove(node);
+      _liveNodes.add(node);
+      _liveSet.add(node);
+
+      return new AutoRebalanceStrategy(RESOURCE_NAME, _partitions, _states, _maxPerNode)
+          .computePartitionAssignment(_allNodes, _liveNodes, _currentMapping, null);
+    }
+
+    /**
+     * Pick an element of {@code source} by drawing an index with
+     * {@code _random.nextInt(source.size())} and walking the set's iteration order to it.
+     * Callers in this class only invoke it after checking that the set is non-empty.
+     * @param source
+     *          set to choose from
+     * @return the element at the drawn position, or null if iteration ends before reaching it
+     */
+    private <T> T getRandomSetElement(Set<T> source) {
+      int element = _random.nextInt(source.size());
+      int i = 0;
+      for (T node : source) {
+        if (i == element) {
+          return node;
+        }
+        i++;
+      }
+      return null;
+    }
+  }
+
+  /**
+   * Tests the following scenario: nodes come up one by one, then one node is taken down. Preference
+   * lists should prefer nodes in the current mapping at all times, but when all nodes are in the
+   * current mapping, then it should distribute states as evenly as possible.
+   */
+  @Test
+  public void testOrphansNotPreferred() {
+    final String RESOURCE_NAME = "resource";
+    final String[] PARTITIONS = {
+        "resource_0", "resource_1", "resource_2"
+    };
+    final StateModelDefinition STATE_MODEL =
+        new StateModelDefinition(StateModelConfigGenerator.generateConfigForMasterSlave());
+    final int REPLICA_COUNT = 2;
+    final String[] NODES = {
+        "n0", "n1", "n2"
+    };
+
+    // initial state, one node, no mapping
+    List<String> allNodes = Lists.newArrayList(NODES[0]);
+    List<String> liveNodes = Lists.newArrayList(NODES[0]);
+    Map<String, Map<String, String>> currentMapping = Maps.newHashMap();
+    for (String partition : PARTITIONS) {
+      currentMapping.put(partition, new HashMap<String, String>());
+    }
+
+    // make sure that when the first node joins, a single replica is assigned fairly
+    List<String> partitions = ImmutableList.copyOf(PARTITIONS);
+    LinkedHashMap<String, Integer> stateCount =
+        AutoRebalancer.stateCount(STATE_MODEL, liveNodes.size(), REPLICA_COUNT);
+    ZNRecord znRecord =
+        new AutoRebalanceStrategy(RESOURCE_NAME, partitions, stateCount)
+            .computePartitionAssignment(allNodes, liveNodes, currentMapping, null);
+    Map<String, List<String>> preferenceLists = znRecord.getListFields();
+    for (String partition : currentMapping.keySet()) {
+      // with a single live node, every preference list should hold exactly one entry
+      List<String> preferenceList = preferenceLists.get(partition);
+      Assert.assertNotNull(preferenceList, "invalid preference list for " + partition);
+      Assert.assertEquals(preferenceList.size(), 1, "invalid preference list for " + partition);
+    }
+
+    // now assign a replica to the first node in the current mapping, and add a second node
+    allNodes.add(NODES[1]);
+    liveNodes.add(NODES[1]);
+    stateCount = AutoRebalancer.stateCount(STATE_MODEL, liveNodes.size(), REPLICA_COUNT);
+    for (String partition : PARTITIONS) {
+      currentMapping.get(partition).put(NODES[0], "MASTER");
+    }
+    znRecord =
+        new AutoRebalanceStrategy(RESOURCE_NAME, partitions, stateCount)
+            .computePartitionAssignment(allNodes, liveNodes, currentMapping, null);
+    preferenceLists = znRecord.getListFields();
+    for (String partition : currentMapping.keySet()) {
+      List<String> preferenceList = preferenceLists.get(partition);
+      Assert.assertNotNull(preferenceList, "invalid preference list for " + partition);
+      Assert.assertEquals(preferenceList.size(), 2, "invalid preference list for " + partition);
+      Assert.assertEquals(preferenceList.get(0), NODES[0], "invalid preference list for "
+          + partition);
+      Assert.assertEquals(preferenceList.get(1), NODES[1], "invalid preference list for "
+          + partition);
+    }
+
+    // now set the current mapping to reflect this update and make sure that it distributes masters
+    for (String partition : PARTITIONS) {
+      currentMapping.get(partition).put(NODES[1], "SLAVE");
+    }
+    znRecord =
+        new AutoRebalanceStrategy(RESOURCE_NAME, partitions, stateCount)
+            .computePartitionAssignment(allNodes, liveNodes, currentMapping, null);
+    preferenceLists = znRecord.getListFields();
+    Set<String> firstNodes = Sets.newHashSet();
+    for (String partition : currentMapping.keySet()) {
+      List<String> preferenceList = preferenceLists.get(partition);
+      Assert.assertNotNull(preferenceList, "invalid preference list for " + partition);
+      Assert.assertEquals(preferenceList.size(), 2, "invalid preference list for " + partition);
+      firstNodes.add(preferenceList.get(0));
+    }
+    Assert.assertEquals(firstNodes.size(), 2, "masters not evenly distributed");
+
+    // set a mapping corresponding to a valid mapping for 2 nodes, add a third node, check that the
+    // new node is never the most preferred
+    allNodes.add(NODES[2]);
+    liveNodes.add(NODES[2]);
+    stateCount = AutoRebalancer.stateCount(STATE_MODEL, liveNodes.size(), REPLICA_COUNT);
+
+    // recall that the other two partitions are [MASTER, SLAVE], which is fine, just reorder one
+    currentMapping.get(PARTITIONS[1]).put(NODES[0], "SLAVE");
+    currentMapping.get(PARTITIONS[1]).put(NODES[1], "MASTER");
+    znRecord =
+        new AutoRebalanceStrategy(RESOURCE_NAME, partitions, stateCount)
+            .computePartitionAssignment(allNodes, liveNodes, currentMapping, null);
+    preferenceLists = znRecord.getListFields();
+    boolean newNodeUsed = false;
+    for (String partition : currentMapping.keySet()) {
+      List<String> preferenceList = preferenceLists.get(partition);
+      Assert.assertNotNull(preferenceList, "invalid preference list for " + partition);
+      Assert.assertEquals(preferenceList.size(), 2, "invalid preference list for " + partition);
+      if (preferenceList.contains(NODES[2])) {
+        newNodeUsed = true;
+        Assert.assertEquals(preferenceList.get(1), NODES[2],
+            "newly added node not at preference list tail for " + partition);
+      }
+    }
+    Assert.assertTrue(newNodeUsed, "not using " + NODES[2]);
+
+    // now remap this to take the new node into account, should go back to balancing masters, slaves
+    // evenly across all nodes
+    for (String partition : PARTITIONS) {
+      currentMapping.get(partition).clear();
+    }
+    currentMapping.get(PARTITIONS[0]).put(NODES[0], "MASTER");
+    currentMapping.get(PARTITIONS[0]).put(NODES[1], "SLAVE");
+    currentMapping.get(PARTITIONS[1]).put(NODES[1], "MASTER");
+    currentMapping.get(PARTITIONS[1]).put(NODES[2], "SLAVE");
+    currentMapping.get(PARTITIONS[2]).put(NODES[0], "MASTER");
+    currentMapping.get(PARTITIONS[2]).put(NODES[2], "SLAVE");
+    znRecord =
+        new AutoRebalanceStrategy(RESOURCE_NAME, partitions, stateCount)
+            .computePartitionAssignment(allNodes, liveNodes, currentMapping, null);
+    preferenceLists = znRecord.getListFields();
+    firstNodes.clear();
+    Set<String> secondNodes = Sets.newHashSet();
+    for (String partition : currentMapping.keySet()) {
+      List<String> preferenceList = preferenceLists.get(partition);
+      Assert.assertNotNull(preferenceList, "invalid preference list for " + partition);
+      Assert.assertEquals(preferenceList.size(), 2, "invalid preference list for " + partition);
+      firstNodes.add(preferenceList.get(0));
+      secondNodes.add(preferenceList.get(1));
+    }
+    Assert.assertEquals(firstNodes.size(), 3, "masters not distributed evenly");
+    Assert.assertEquals(secondNodes.size(), 3, "slaves not distributed evenly");
+
+    // remove a node now (index 0 of liveNodes, i.e. n0), but use the current mapping with
+    // everything balanced just prior
+    liveNodes.remove(0);
+    stateCount = AutoRebalancer.stateCount(STATE_MODEL, liveNodes.size(), REPLICA_COUNT);
+
+    // remove all references of n0 from the mapping, keep everything else in a legal state
+    for (String partition : PARTITIONS) {
+      currentMapping.get(partition).clear();
+    }
+    currentMapping.get(PARTITIONS[0]).put(NODES[1], "MASTER");
+    currentMapping.get(PARTITIONS[1]).put(NODES[1], "MASTER");
+    currentMapping.get(PARTITIONS[1]).put(NODES[2], "SLAVE");
+    currentMapping.get(PARTITIONS[2]).put(NODES[2], "MASTER");
+    znRecord =
+        new AutoRebalanceStrategy(RESOURCE_NAME, partitions, stateCount)
+            .computePartitionAssignment(allNodes, liveNodes, currentMapping, null);
+    preferenceLists = znRecord.getListFields();
+    for (String partition : currentMapping.keySet()) {
+      List<String> preferenceList = preferenceLists.get(partition);
+      Assert.assertNotNull(preferenceList, "invalid preference list for " + partition);
+      Assert.assertEquals(preferenceList.size(), 2, "invalid preference list for " + partition);
+      Map<String, String> stateMap = currentMapping.get(partition);
+      for (String participant : stateMap.keySet()) {
+        Assert.assertTrue(preferenceList.contains(participant), "minimal movement violated for "
+            + partition);
+      }
+      // a replica that is not already in the current mapping must never head the preference
+      // list; compare by value rather than by reference (assertNotSame)
+      Assert.assertTrue(stateMap.containsKey(preferenceList.get(0)),
+          "newly moved replica should not be master for " + partition);
+    }
+
+    // finally, adjust the current mapping to reflect 2 nodes and make sure everything's even again
+    for (String partition : PARTITIONS) {
+      currentMapping.get(partition).clear();
+    }
+    currentMapping.get(PARTITIONS[0]).put(NODES[1], "MASTER");
+    currentMapping.get(PARTITIONS[0]).put(NODES[2], "SLAVE");
+    currentMapping.get(PARTITIONS[1]).put(NODES[1], "SLAVE");
+    currentMapping.get(PARTITIONS[1]).put(NODES[2], "MASTER");
+    currentMapping.get(PARTITIONS[2]).put(NODES[1], "SLAVE");
+    currentMapping.get(PARTITIONS[2]).put(NODES[2], "MASTER");
+    znRecord =
+        new AutoRebalanceStrategy(RESOURCE_NAME, partitions, stateCount)
+            .computePartitionAssignment(allNodes, liveNodes, currentMapping, null);
+    preferenceLists = znRecord.getListFields();
+    firstNodes.clear();
+    for (String partition : currentMapping.keySet()) {
+      List<String> preferenceList = preferenceLists.get(partition);
+      Assert.assertNotNull(preferenceList, "invalid preference list for " + partition);
+      Assert.assertEquals(preferenceList.size(), 2, "invalid preference list for " + partition);
+      firstNodes.add(preferenceList.get(0));
+    }
+    Assert.assertEquals(firstNodes.size(), 2, "masters not evenly distributed");
+  }
+}
