Implementation of the CRUSH-ed algorithm. The algorithm is based on CRUSH. The new implementation trades off between uniform distribution and partition movement during major cluster changes. Please refer to https://iwww.corp.linkedin.com/wiki/cf/display/ENGS/CRUSH-ed+for+even+distribution
Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/9dfb098e Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/9dfb098e Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/9dfb098e Branch: refs/heads/master Commit: 9dfb098ed6083609f6bbd17cd202b65ce26e8c7c Parents: fde1a6a Author: Jiajun Wang <[email protected]> Authored: Mon Nov 20 14:06:49 2017 -0800 Committer: Junkai Xue <[email protected]> Committed: Wed Jan 24 18:33:24 2018 -0800 ---------------------------------------------------------------------- ...stractEvenDistributionRebalanceStrategy.java | 208 +++++ .../strategy/CrushEdRebalanceStrategy.java | 32 + .../CardDealingAdjustmentAlgorithm.java | 212 ++++++ .../ConsistentHashingAdjustmentAlgorithm.java | 136 ++++ .../java/org/apache/helix/DistributionTest.java | 753 +++++++++++++++++++ .../TestCrushAutoRebalance.java | 7 +- .../TestCrushAutoRebalanceNonRack.java | 95 ++- 7 files changed, 1400 insertions(+), 43 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/9dfb098e/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/AbstractEvenDistributionRebalanceStrategy.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/AbstractEvenDistributionRebalanceStrategy.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/AbstractEvenDistributionRebalanceStrategy.java new file mode 100644 index 0000000..9012f73 --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/AbstractEvenDistributionRebalanceStrategy.java @@ -0,0 +1,208 @@ +package org.apache.helix.controller.rebalancer.strategy; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import org.apache.helix.HelixException; +import org.apache.helix.ZNRecord; +import org.apache.helix.controller.rebalancer.strategy.crushMapping.CardDealingAdjustmentAlgorithm; +import org.apache.helix.controller.rebalancer.strategy.crushMapping.ConsistentHashingAdjustmentAlgorithm; +import org.apache.helix.controller.rebalancer.topology.Topology; +import org.apache.helix.controller.stages.ClusterDataCache; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.*; + +/** + * Abstract class of Forced Even Assignment Patched Algorithm. + * This class contains common logic that re-calculate assignment based on a result calculated by the base algorithm. + * The target of this patching step is more even partition distribution, but number of partitions to be reshuffled during node outage could be higher than the base algorithm. 
+ */ +public abstract class AbstractEvenDistributionRebalanceStrategy implements RebalanceStrategy { + private static final Logger _logger = + LoggerFactory.getLogger(AbstractEvenDistributionRebalanceStrategy.class); + private String _resourceName; + private int _replica; + + protected abstract RebalanceStrategy getBaseRebalanceStrategy(); + + @Override + public void init(String resourceName, final List<String> partitions, + final LinkedHashMap<String, Integer> states, int maximumPerNode) { + _resourceName = resourceName; + getBaseRebalanceStrategy().init(resourceName, partitions, states, maximumPerNode); + _replica = countStateReplicas(states); + } + + /** + * Force uniform distribution based on the parent strategy class's calculation result. + * + * @param allNodes All instances + * @param liveNodes List of live instances + * @param currentMapping current replica mapping + * @param clusterData cluster data + * @return + * @throws HelixException + */ + @Override + public ZNRecord computePartitionAssignment(final List<String> allNodes, + final List<String> liveNodes, final Map<String, Map<String, String>> currentMapping, + ClusterDataCache clusterData) throws HelixException { + boolean continueNextStep = true; + // Round 1: Calculate mapping using the base strategy. + // Note to use all nodes for minimizing the influence of live node changes to mapping. + ZNRecord origAssignment = getBaseRebalanceStrategy() + .computePartitionAssignment(allNodes, allNodes, currentMapping, clusterData); + Map<String, List<String>> origPartitionMap = origAssignment.getListFields(); + // If the original calculation contains no assignment, skip patching + if (origPartitionMap.isEmpty()) { + continueNextStep = false; + } + + // Transform current assignment to instance->partitions map, and get total partitions + Map<String, List<String>> nodeToPartitionMap = convertMap(origPartitionMap); + + if (continueNextStep) { + // Round 2: Rebalance mapping using card dealing algorithm. 
For ensuring evenness distribution. + Topology allNodeTopo = new Topology(allNodes, allNodes, clusterData.getInstanceConfigMap(), + clusterData.getClusterConfig()); + CardDealingAdjustmentAlgorithm cardDealer = + new CardDealingAdjustmentAlgorithm(allNodeTopo, _replica); + continueNextStep = cardDealer.computeMapping(nodeToPartitionMap, _resourceName.hashCode()); + } + + // Round 3: Reorder preference Lists to ensure participants' orders (so as the states) are uniform. + Map<String, List<String>> partitionMap = shufflePreferenceList(nodeToPartitionMap); + + // Round 4: Re-mapping the partitions on non-live nodes using consistent hashing for reducing movement. + if (continueNextStep && !liveNodes.containsAll(allNodes)) { + Topology liveNodeTopo = new Topology(allNodes, liveNodes, clusterData.getInstanceConfigMap(), + clusterData.getClusterConfig()); + ConsistentHashingAdjustmentAlgorithm hashPlacement = + new ConsistentHashingAdjustmentAlgorithm(liveNodeTopo); + if (hashPlacement.computeMapping(nodeToPartitionMap, _resourceName.hashCode())) { + // Since mapping is changed by hashPlacement, need to adjust nodes order. + Map<String, List<String>> adjustedPartitionMap = convertMap(nodeToPartitionMap); + for (String partition : adjustedPartitionMap.keySet()) { + List<String> preSelectedList = partitionMap.get(partition); + Set<String> adjustedNodeList = new HashSet<>(adjustedPartitionMap.get(partition)); + List<String> finalNodeList = adjustedPartitionMap.get(partition); + int index = 0; + // 1. Add the ones in pre-selected node list first, in order + for (String node : preSelectedList) { + if (adjustedNodeList.remove(node)) { + finalNodeList.set(index++, node); + } + } + // 2. 
Add the rest of nodes to the map + for (String node : adjustedNodeList) { + finalNodeList.set(index++, node); + } + } + partitionMap = adjustedPartitionMap; + } else { + continueNextStep = false; + } + } + + if (continueNextStep) { + ZNRecord result = new ZNRecord(_resourceName); + result.setListFields(partitionMap); + return result; + } else { + if (_logger.isDebugEnabled()) { + _logger.debug("Force even distribution is not possible, using the default strategy: " + + getBaseRebalanceStrategy().getClass().getSimpleName()); + } + // Force even is not possible, fallback to use default strategy + if (liveNodes.equals(allNodes)) { + return origAssignment; + } else { + // need to re-calculate since node list is different. + return getBaseRebalanceStrategy() + .computePartitionAssignment(allNodes, liveNodes, currentMapping, clusterData); + } + } + } + + // Best effort to shuffle preference lists for all partitions for uniform distribution regarding the top state. + private Map<String, List<String>> shufflePreferenceList( + Map<String, List<String>> nodeToPartitionMap) { + final Map<String, List<String>> partitionMap = convertMap(nodeToPartitionMap); + // evaluate node's order according to: + // 1. their potential top state replicas count (less count, higher priority) + // 2. 
their assigned top state replicas (less top state replica, higher priority) + final Map<String, Integer> nodeScores = new HashMap<>(); + for (String node : nodeToPartitionMap.keySet()) { + // Init with the potential replicas count + nodeScores.put(node, nodeToPartitionMap.get(node).size()); + } + for (final String partition : partitionMap.keySet()) { + List<String> nodes = partitionMap.get(partition); + // order according to score + Collections.sort(nodes, new Comparator<String>() { + @Override + public int compare(String o1, String o2) { + int o1Score = nodeScores.get(o1); + int o2Score = nodeScores.get(o2); + if (o1Score == o2Score) { + return new Integer((partition + o1).hashCode()).compareTo((partition + o2).hashCode()); + } else { + return o1Score - o2Score; + } + } + }); + // After assignment, the nodes has less potential top states + for (int i = 0; i < nodes.size(); i++) { + String nodeName = nodes.get(i); + nodeScores.put(nodeName, + nodeScores.get(nodeName) - 1 + (i == 0 ? (int) Math.pow(_replica, 2) : 0)); + } + } + return partitionMap; + } + + // Convert the map from <key, list of values> to a new map <original value, list of related keys> + private Map<String, List<String>> convertMap(Map<String, List<String>> originalMap) { + Map<String, List<String>> resultMap = new HashMap<>(); + for (String originalKey : originalMap.keySet()) { + for (String originalValue : originalMap.get(originalKey)) { + if (!resultMap.containsKey(originalValue)) { + resultMap.put(originalValue, new ArrayList<String>()); + } + resultMap.get(originalValue).add(originalKey); + } + } + return resultMap; + } + + /** + * Counts the total number of replicas given a state-count mapping + * + * @return + */ + private int countStateReplicas(Map<String, Integer> stateCountMap) { + int total = 0; + for (Integer count : stateCountMap.values()) { + total += count; + } + return total; + } +} 
http://git-wip-us.apache.org/repos/asf/helix/blob/9dfb098e/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/CrushEdRebalanceStrategy.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/CrushEdRebalanceStrategy.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/CrushEdRebalanceStrategy.java new file mode 100644 index 0000000..a7d0c4f --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/CrushEdRebalanceStrategy.java @@ -0,0 +1,32 @@ +package org.apache.helix.controller.rebalancer.strategy; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * CRUSH-ed, CRUSH with even distribution. This is an Auto rebalance strategy based on CRUSH algorithm. + * This gives even partition distribution, but number of partitions to be reshuffled during node outage could be high. 
+ */ +public class CrushEdRebalanceStrategy extends AbstractEvenDistributionRebalanceStrategy { + private final RebalanceStrategy _baseStrategy = new CrushRebalanceStrategy(); + + protected RebalanceStrategy getBaseRebalanceStrategy() { + return _baseStrategy; + } +} http://git-wip-us.apache.org/repos/asf/helix/blob/9dfb098e/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/crushMapping/CardDealingAdjustmentAlgorithm.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/crushMapping/CardDealingAdjustmentAlgorithm.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/crushMapping/CardDealingAdjustmentAlgorithm.java new file mode 100644 index 0000000..4470094 --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/crushMapping/CardDealingAdjustmentAlgorithm.java @@ -0,0 +1,212 @@ +package org.apache.helix.controller.rebalancer.strategy.crushMapping; + +import org.apache.helix.controller.rebalancer.topology.Node; +import org.apache.helix.controller.rebalancer.topology.Topology; + +import java.util.*; + +public class CardDealingAdjustmentAlgorithm { + private static int MAX_ADJUSTMENT = 2; + + private int _replica; + // Instance -> FaultZone Tag + private Map<String, String> _instanceFaultZone = new HashMap<>(); + private Map<String, Long> _instanceWeight = new HashMap<>(); + private long _totalWeight = 0; + private Map<String, Long> _faultZoneWeight = new HashMap<>(); + // Record existing partitions that are assigned to a fault zone + private Map<String, Set<String>> _faultZonePartitionMap = new HashMap<>(); + + public CardDealingAdjustmentAlgorithm(Topology topology, int replica) { + _replica = replica; + // Get all instance related information. 
+ for (Node zone : topology.getFaultZones()) { + _faultZoneWeight.put(zone.getName(), zone.getWeight()); + if (!_faultZonePartitionMap.containsKey(zone.getName())) { + _faultZonePartitionMap.put(zone.getName(), new HashSet<String>()); + } + for (Node instance : Topology.getAllLeafNodes(zone)) { + if (!instance.isFailed()) { + _instanceWeight.put(instance.getName(), instance.getWeight()); + _totalWeight += instance.getWeight(); + _instanceFaultZone.put(instance.getName(), zone.getName()); + } + } + } + } + + public boolean computeMapping(Map<String, List<String>> nodeToPartitionMap, int randomSeed) { + // Records exceed partitions + TreeMap<String, Integer> toBeReassigned = new TreeMap<>(); + + // Calculate total partitions that need to be calculated + long totalReplicaCount = 0; + for (List<String> partitions : nodeToPartitionMap.values()) { + totalReplicaCount += partitions.size(); + } + if (totalReplicaCount == 0 || _replica > _faultZoneWeight.size()) { + return false; + } + + // instance -> target (ideal) partition count + Map<String, Float> targetPartitionCount = new HashMap<>(); + for (String liveInstance : _instanceFaultZone.keySet()) { + long zoneWeight = _faultZoneWeight.get(_instanceFaultZone.get(liveInstance)); + float instanceRatioInZone = ((float) _instanceWeight.get(liveInstance)) / zoneWeight; + // 1. if replica = fault zone, fault zone weight does not count, so calculate according to fault zone count. + // 2. else, should consider fault zone weight to calculate expected threshold. 
+ float zonePartitions; + if (_replica == _faultZoneWeight.size()) { + zonePartitions = ((float) totalReplicaCount) / _faultZoneWeight.size(); + } else { + zonePartitions = ((float) totalReplicaCount) * zoneWeight / _totalWeight; + } + targetPartitionCount.put(liveInstance, instanceRatioInZone * zonePartitions); + } + + // Calculate the expected spikes + // Assign spikes to each zone according to zone weight + int totalOverflows = (int) totalReplicaCount % _instanceFaultZone.size(); + Map<String, Integer> maxZoneOverflows = new HashMap<>(); + for (String faultZoneName : _faultZoneWeight.keySet()) { + float zoneWeight = _faultZoneWeight.get(faultZoneName); + maxZoneOverflows.put(faultZoneName, + (int) Math.ceil(((float) totalOverflows) * zoneWeight / _totalWeight)); + } + + Iterator<String> nodeIter = nodeToPartitionMap.keySet().iterator(); + while (nodeIter.hasNext()) { + String instance = nodeIter.next(); + // Cleanup the existing mapping. Remove all non-active nodes from the mapping + if (!_instanceFaultZone.containsKey(instance)) { + List<String> partitions = nodeToPartitionMap.get(instance); + addToReAssignPartition(toBeReassigned, partitions); + partitions.clear(); + nodeIter.remove(); + } + } + + List<String> orderedInstances = new ArrayList<>(_instanceFaultZone.keySet()); + // Different resource should shuffle nodes in different ways. + Collections.shuffle(orderedInstances, new Random(randomSeed)); + for (String instance : orderedInstances) { + if (!nodeToPartitionMap.containsKey(instance)) { + continue; + } + // Cut off the exceed partitions compared with target partition count. 
+ List<String> partitions = nodeToPartitionMap.get(instance); + int target = (int) (Math.floor(targetPartitionCount.get(instance))); + if (partitions.size() > target) { + int maxZoneOverflow = maxZoneOverflows.get(_instanceFaultZone.get(instance)); + if (maxZoneOverflow > 0 && totalOverflows > 0) { + // When fault zone has overflow capacity AND there are still remaining overflow partitions + target = (int) (Math.ceil(targetPartitionCount.get(instance))); + maxZoneOverflows.put(_instanceFaultZone.get(instance), maxZoneOverflow - 1); + totalOverflows--; + } + + // Shuffle partitions to randomly pickup exceed ones. Ensure the algorithm generates consistent results when the inputs are the same. + Collections.shuffle(partitions, new Random(instance.hashCode() * 31 + randomSeed)); + addToReAssignPartition(toBeReassigned, partitions.subList(target, partitions.size())); + + // Put the remaining partitions to the assignment, and record in fault zone partition list + List<String> remainingPartitions = new ArrayList<>(partitions.subList(0, target)); + partitions.clear(); + nodeToPartitionMap.put(instance, remainingPartitions); + } + _faultZonePartitionMap.get(_instanceFaultZone.get(instance)) + .addAll(nodeToPartitionMap.get(instance)); + } + + // Reassign if any instances have space left. 
+ // Assign partition according to the target capacity, CAP at "Math.floor(target) + adjustment" + int adjustment = 0; + while (!toBeReassigned.isEmpty() && adjustment <= MAX_ADJUSTMENT) { + partitionDealing(_instanceFaultZone.keySet(), toBeReassigned, _faultZonePartitionMap, + _instanceFaultZone, nodeToPartitionMap, targetPartitionCount, randomSeed, adjustment++); + } + return toBeReassigned.isEmpty(); + } + + private void partitionDealing(Collection<String> instances, + TreeMap<String, Integer> toBeReassigned, Map<String, Set<String>> faultZonePartitionMap, + Map<String, String> faultZoneMap, final Map<String, List<String>> assignmentMap, + Map<String, Float> targetPartitionCount, final int randomSeed, int targetAdjustment) { + PriorityQueue<String> instanceQueue = + new PriorityQueue<>(instances.size(), new Comparator<String>() { + @Override + public int compare(String node1, String node2) { + int node1Load = assignmentMap.containsKey(node1) ? assignmentMap.get(node1).size() : 0; + int node2Load = assignmentMap.containsKey(node2) ? assignmentMap.get(node2).size() : 0; + if (node1Load == node2Load) { + return new Integer((node1 + randomSeed).hashCode()) + .compareTo((node2 + randomSeed).hashCode()); + } else { + return node1Load - node2Load; + } + } + }); + instanceQueue.addAll(instances); + + while (!toBeReassigned.isEmpty()) { + boolean anyPartitionAssigned = false; + Iterator<String> instanceIter = instanceQueue.iterator(); + while (instanceIter.hasNext()) { + String instance = instanceIter.next(); + // Temporary remove the node from queue. + // If any partition assigned to the instance, add it back to reset priority. + instanceIter.remove(); + boolean partitionAssignedToInstance = false; + String faultZoneStr = faultZoneMap.get(instance); + List<String> partitions = assignmentMap.containsKey(instance) ? 
+ assignmentMap.get(instance) : + new ArrayList<String>(); + int space = + (int) (Math.floor(targetPartitionCount.get(instance))) + targetAdjustment - partitions + .size(); + if (space > 0) { + // Find a pending partition to locate + for (String pendingPartition : toBeReassigned.navigableKeySet()) { + if (!faultZonePartitionMap.get(faultZoneStr).contains(pendingPartition)) { + if (!assignmentMap.containsKey(instance)) { + assignmentMap.put(instance, partitions); + } + partitions.add(pendingPartition); + faultZonePartitionMap.get(faultZoneStr).add(pendingPartition); + if (toBeReassigned.get(pendingPartition) == 1) { + toBeReassigned.remove(pendingPartition); + } else { + toBeReassigned.put(pendingPartition, toBeReassigned.get(pendingPartition) - 1); + } + // if any assignment is made: + // this instance can hold more partitions in the future + partitionAssignedToInstance = true; + break; + } + } + } + if (partitionAssignedToInstance) { + // Reset priority in the queue + instanceQueue.add(instance); + anyPartitionAssigned = true; + break; + } + } + if (!anyPartitionAssigned) { + // if no pending partition is assigned to any instances in this loop, new assignment is not possible + break; + } + } + } + + private void addToReAssignPartition(TreeMap<String, Integer> toBeReassigned, + List<String> partitions) { + for (String partition : partitions) { + if (!toBeReassigned.containsKey(partition)) { + toBeReassigned.put(partition, 1); + } else { + toBeReassigned.put(partition, toBeReassigned.get(partition) + 1); + } + } + } +} http://git-wip-us.apache.org/repos/asf/helix/blob/9dfb098e/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/crushMapping/ConsistentHashingAdjustmentAlgorithm.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/crushMapping/ConsistentHashingAdjustmentAlgorithm.java 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/crushMapping/ConsistentHashingAdjustmentAlgorithm.java new file mode 100644 index 0000000..e594cd1 --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/crushMapping/ConsistentHashingAdjustmentAlgorithm.java @@ -0,0 +1,136 @@ +package org.apache.helix.controller.rebalancer.strategy.crushMapping; + +import org.apache.helix.controller.rebalancer.topology.Node; +import org.apache.helix.controller.rebalancer.topology.Topology; +import org.apache.helix.util.JenkinsHash; + +import java.util.*; + +public class ConsistentHashingAdjustmentAlgorithm { + private JenkinsHash _hashFunction; + private ConsistentHashSelector _selector; + Set<String> _liveInstances = new HashSet<>(); + // Instance -> FaultZone Tag + private Map<String, String> _faultZoneMap = new HashMap<>(); + // Record existing partitions that are assigned to a fault zone + private Map<String, Set<String>> _faultZonePartitionMap = new HashMap<>(); + + public ConsistentHashingAdjustmentAlgorithm(Topology topology) { + _hashFunction = new JenkinsHash(); + List<String> allInstances = new ArrayList<>(); + // Get all instance related information. 
+ for (Node zone : topology.getFaultZones()) { + for (Node instance : Topology.getAllLeafNodes(zone)) { + if (!instance.isFailed()) { + _liveInstances.add(instance.getName()); + } + allInstances.add(instance.getName()); + _faultZoneMap.put(instance.getName(), zone.getName()); + if (!_faultZonePartitionMap.containsKey(zone.getName())) { + _faultZonePartitionMap.put(zone.getName(), new HashSet<String>()); + } + } + } + _selector = new ConsistentHashSelector(allInstances); + } + + public boolean computeMapping(Map<String, List<String>> nodeToPartitionMap, int randomSeed) { + if (_liveInstances.isEmpty()) { + return false; + } + + Set<String> inactiveInstances = new HashSet<>(); + Map<String, Integer> toBeReassigned = new HashMap<>(); + // Remove all partition assignment to a non-live instance + Iterator<String> nodeIter = nodeToPartitionMap.keySet().iterator(); + while (nodeIter.hasNext()) { + String instance = nodeIter.next(); + List<String> partitions = nodeToPartitionMap.get(instance); + if (!_liveInstances.contains(instance)) { + inactiveInstances.add(instance); + addToReAssignPartition(toBeReassigned, partitions); + partitions.clear(); + nodeIter.remove(); + } else { + _faultZonePartitionMap.get(_faultZoneMap.get(instance)).addAll(partitions); + } + } + + for (String partition : new ArrayList<>(toBeReassigned.keySet())) { + int remainReplicas = toBeReassigned.get(partition); + Set<String> conflictInstance = new HashSet<>(); + for (int index = 0; index < toBeReassigned.get(partition); index++) { + Iterable<String> sortedInstances = _selector.getCircle(_hashFunction.hash(randomSeed, partition.hashCode(), index)); + Iterator<String> instanceItr = sortedInstances.iterator(); + while (instanceItr.hasNext() && conflictInstance.size() + inactiveInstances.size() != _selector.instanceSize) { + String instance = instanceItr.next(); + if (!_liveInstances.contains(instance)) { + inactiveInstances.add(instance); + } + if (inactiveInstances.contains(instance) || 
conflictInstance.contains(instance)) { + continue; + } + Set<String> faultZonePartitions = _faultZonePartitionMap.get(_faultZoneMap.get(instance)); + if (faultZonePartitions.contains(partition)) { + conflictInstance.add(instance); + continue; + } + // insert this assignment + if (!nodeToPartitionMap.containsKey(instance)) { + nodeToPartitionMap.put(instance, new ArrayList<String>()); + } + nodeToPartitionMap.get(instance).add(partition); + faultZonePartitions.add(partition); + remainReplicas--; + break; + } + } + if (remainReplicas == 0) { + toBeReassigned.remove(partition); + } else { + toBeReassigned.put(partition, remainReplicas); + } + } + + return toBeReassigned.isEmpty(); + } + + private void addToReAssignPartition(Map<String, Integer> toBeReassigned, + List<String> partitions) { + for (String partition : partitions) { + if (!toBeReassigned.containsKey(partition)) { + toBeReassigned.put(partition, 1); + } else { + toBeReassigned.put(partition, toBeReassigned.get(partition) + 1); + } + } + } + + private class ConsistentHashSelector { + private final static int DEFAULT_TOKENS_PER_INSTANCE = 1000; + private final SortedMap<Long, String> circle = new TreeMap<Long, String>(); + protected int instanceSize = 0; + + public ConsistentHashSelector(List<String> instances) { + for (String instance : instances) { + long tokenCount = DEFAULT_TOKENS_PER_INSTANCE; + add(instance, tokenCount); + instanceSize++; + } + } + + private void add(String instance, long numberOfReplicas) { + for (int i = 0; i < numberOfReplicas; i++) { + circle.put(_hashFunction.hash(instance.hashCode(), i), instance); + } + } + + public Iterable<String> getCircle(long data) { + if (circle.isEmpty()) { + return null; + } + long hash = _hashFunction.hash(data); + return circle.tailMap(hash).values(); + } + } +} http://git-wip-us.apache.org/repos/asf/helix/blob/9dfb098e/helix-core/src/test/java/org/apache/helix/DistributionTest.java ---------------------------------------------------------------------- 
diff --git a/helix-core/src/test/java/org/apache/helix/DistributionTest.java b/helix-core/src/test/java/org/apache/helix/DistributionTest.java new file mode 100644 index 0000000..bf580d1 --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/DistributionTest.java @@ -0,0 +1,753 @@ +package org.apache.helix; +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import org.apache.helix.controller.rebalancer.strategy.CrushRebalanceStrategy; +import org.apache.helix.controller.rebalancer.topology.Node; +import org.apache.helix.controller.rebalancer.topology.Topology; +import org.apache.helix.manager.zk.ZNRecordSerializer; +import org.apache.helix.model.BuiltInStateModelDefinitions; +import org.apache.helix.model.ClusterConfig; +import org.apache.helix.model.IdealState; +import org.apache.helix.model.InstanceConfig; +import org.apache.helix.util.HelixUtil; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileReader; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.*; + +public class DistributionTest { + private static String instanceFolderPath; + private static String instanceList; + private static String idealStateFolderPath; + private static String idealStateList; + + String Path = "/home/jjwang/Desktop/FEAP-test"; + //String Path = "/Users/jjwang/Desktop/FEAP-test"; + + @DataProvider(name = "rebalanceStrategies") + public static String[][] rebalanceStrategies() { + return new String[][] { + //{AutoRebalanceStrategy.class.getName()}, + { CrushRebalanceStrategy.class.getName() }, + //{ MultiRoundCrushRebalanceStrategy.class.getName() }, + //{ CrushEdRebalanceStrategy.class.getName() } + }; + } + + String[] fabrics = { "lor1", "lva1", "ltx1", "lsg1", + }; + String[] clusters = { "ESPRESSO_IDENTITY", "ESPRESSO_MT-MD-1", "ESPRESSO_TSCP", "ESPRESSO_MT_PHASE1", + "ESPRESSO_MT-MD-3", "ESPRESSO_USCP", "ESPRESSO_MT-LEGACY", /* "venice-0" */ + }; + String topState = "master"; + float[] nodeAdjustSimulator = + { /*-0.5f, -0.2f, -0.1f, -0.01f, */ 0.01f, 0.1f, 0.2f, 0.5f, 1f}; + + @Test(dataProvider = "rebalanceStrategies") + public void testNodeChange(String rebalanceStrategyClass) throws Exception { + for (String cluster : clusters) { + 
System.out.println(cluster + + "\tChangeType\tNumOfNodeChange\tDiffRate\tTotalMv\tTotalMvRate\tExtraMvRate\tExtraMvRateComparedWithAvgDist\tTopStateChange\tTopStateChangeRate\tTopStateChangeWithNewDeployRate\tExtraTopStateChangeRate"); + for (String fabric : fabrics) { + String path = Path + "/" + cluster + "/" + fabric; + if (new File(path).exists()) { + System.out.print(fabric); + for (float adjustRate : nodeAdjustSimulator) { + Set<String> deltaNode = new HashSet<>(); + List<String> liveInstances = new ArrayList<>(); + Map<String, Map<String, String>> resultA = + calculate(path, rebalanceStrategyClass, adjustRate, deltaNode, liveInstances); + double[] distEval = checkEvenness(liveInstances, resultA, false); + if (adjustRate != 0) { + Map<String, Map<String, String>> result = + calculate(path, rebalanceStrategyClass, 0, deltaNode, new ArrayList<String>()); + double[] diff = checkMovement(result, resultA, deltaNode, false); + System.out.println( + "\t" + (adjustRate > 0 ? "Adding\t" : "Disabling\t") + diff[0] + "\t" + + distEval[3] + "\t" + diff[1] + "\t" + diff[2] + "\t" + diff[3] + "\t" + + diff[8] + "\t" + diff[4] + "\t" + diff[5] + "\t" + diff[10] + "\t" + + diff[6]); + } + } + } + } + System.out.println(); + } + } + + @Test(dataProvider = "rebalanceStrategies") + public void testDist(String rebalanceStrategyClass) throws Exception { + for (String cluster : clusters) { + System.out.println(cluster + + "\tTotalReplica\tMinReplica\tMaxReplica\tDiffRate\tSTDEV\tMinTopState\tMaxTopState\ttopStateDiffRate\ttopStateSTDEV"); + for (String fabric : fabrics) { + String path = Path + "/" + cluster + "/" + fabric; + if (new File(path).exists()) { + Set<String> deltaNode = new HashSet<>(); + List<String> liveInstances = new ArrayList<>(); + Map<String, Map<String, String>> result = + calculate(path, rebalanceStrategyClass, 0, deltaNode, liveInstances); + double[] distEval = checkEvenness(liveInstances, result, false); + System.out.println( + fabric + "\t" + distEval[0] + 
"\t" + distEval[1] + "\t" + distEval[2] + "\t" + + distEval[3] + "\t" + distEval[4] + "\t" + distEval[5] + "\t" + distEval[6] + + "\t" + distEval[7] + "\t" + distEval[8]); + } + } + System.out.println(); + } + } + + int _replica = 1; + int partitionCount = 101; + int faultZone = 10; + int[] resourceCounts = new int[] { 100 }; + int[] nodeCounts = new int[] { 100, /*100, 200, 500, 1000*/ }; + + @Test(dataProvider = "rebalanceStrategies") + public void testDistUsingRandomTopo(String rebalanceStrategyClass) throws Exception { + for (int nodeCount : nodeCounts) { + for (int resourceCount : resourceCounts) { + System.out.println( + "NodeCount\tResourceCount\tTotalReplica\tMinReplica\tMaxReplica\tDiffRate\tSTDEV\tMinTopState\tMaxTopState\tTopStateDiffRate\tTopStateSTDEV"); + List<String> liveInstances = new ArrayList<>(); + Map<String, Map<String, String>> result = + calculateUsingRandomTopo(rebalanceStrategyClass, _replica, partitionCount, + resourceCount, nodeCount, faultZone, liveInstances); + double[] distEval = checkEvenness(liveInstances, result, false); + System.out.println( + nodeCount + "\t" + resourceCount + "\t" + distEval[0] + "\t" + distEval[1] + "\t" + + distEval[2] + "\t" + distEval[3] + "\t" + distEval[4] + "\t" + distEval[5] + "\t" + + distEval[6] + "\t" + distEval[7] + "\t" + distEval[8]); + } + } + + System.out.println(); + } + + @Test(dataProvider = "rebalanceStrategies") + public void testRollingUpgrade(String rebalanceStrategyClass) throws Exception { + for (String cluster : clusters) { + System.out.println(cluster + + "\tTotalMv\tTotalMvRate\tExtraMvRate\tExtraMvRateComparedWithAvgDist\tTopStateChange\tTopStateChangeRate\tTopStateChangeWithNewDeployRate\tExtraTopStateChange"); + for (String fabric : fabrics) { + String path = Path + "/" + cluster + "/" + fabric; + if (new File(path).exists()) { + + List<List<String>> deltaNodesHistory = new ArrayList<>(); + List<List<String>> liveInstancesHistory = new ArrayList<>(); + + List<Map<String, 
Map<String, String>>> mappingHistory = + calculateRollingUpgrade(path, rebalanceStrategyClass, deltaNodesHistory, + liveInstancesHistory, true); + + Map<String, Map<String, String>> basicMapping = + calculate(path, rebalanceStrategyClass, 0, new HashSet<String>(), + new ArrayList<String>()); + + double[] maxDiff = new double[8]; + for (int i = 0; i < mappingHistory.size(); i++) { + List<String> deltaNode = deltaNodesHistory.get(i); + Map<String, Map<String, String>> mapA = mappingHistory.get(i); + + Map<String, Map<String, String>> mapB = basicMapping; + if (i != 0) { + deltaNode.addAll(deltaNodesHistory.get(i - 1)); + mapB = mappingHistory.get(i - 1); + } + double[] diff = checkMovement(mapB, mapA, deltaNode, false); + + maxDiff[0] = Math.max(diff[1], maxDiff[0]); + maxDiff[1] = Math.max(diff[2], maxDiff[1]); + maxDiff[2] = Math.max(diff[3], maxDiff[2]); + maxDiff[3] = Math.max(diff[4], maxDiff[3]); + maxDiff[4] = Math.max(diff[5], maxDiff[4]); + maxDiff[5] = Math.max(diff[6], maxDiff[5]); + maxDiff[6] = Math.max(diff[8], maxDiff[6]); + maxDiff[7] = Math.max(diff[10], maxDiff[7]); + } + System.out.println( + fabric + "\t" + maxDiff[0] + "\t" + maxDiff[1] + "\t" + maxDiff[2] + "\t" + maxDiff[6] + + "\t" + maxDiff[3] + "\t" + maxDiff[4] + "\t" + maxDiff[7] + "\t" + maxDiff[5]); + } + } + System.out.println(); + } + } + + public List<Map<String, Map<String, String>>> calculateRollingUpgrade(String Path, + String rebalanceStrategyClass, List<List<String>> deltaNodesHistory, + List<List<String>> liveInstancesHistory, boolean recoverNode) throws Exception { + instanceFolderPath = Path + "/instanceConfigs/"; + instanceList = Path + "/instance"; + idealStateFolderPath = Path + "/idealStates/"; + idealStateList = Path + "/idealstate"; + Path path = Paths.get(Path + "/clusterConfig"); + ZNRecord record = (ZNRecord) new ZNRecordSerializer().deserialize(Files.readAllBytes(path)); + ClusterConfig clusterConfig = new ClusterConfig(record); + List<String> allNodes = new 
ArrayList<>(); + List<InstanceConfig> instanceConfigs = + getInstanceConfigs(instanceFolderPath, instanceList, allNodes); + List<IdealState> idealStates = getIdealStates(idealStateFolderPath, idealStateList); + + List<String> deltaNodes = new ArrayList<>(); + + List<Map<String, Map<String, String>>> totalMapHistory = new ArrayList<>(); + for (String downNode : allNodes) { + deltaNodes.add(downNode); + + List<String> liveInstances = new ArrayList<>(allNodes); + liveInstances.removeAll(deltaNodes); + Map<String, Map<String, String>> totalMaps = new HashMap<>(); + + totalMapHistory.add(totalMaps); + liveInstancesHistory.add(liveInstances); + deltaNodesHistory.add(new ArrayList<>(deltaNodes)); + + Map<String, Integer> partitions = new HashMap<>(); + for (int i = 0; i < idealStates.size(); i++) { + Map<String, Map<String, String>> maps = HelixUtil + .getIdealAssignmentForFullAuto(clusterConfig, instanceConfigs, liveInstances, + idealStates.get(i), new ArrayList<>(idealStates.get(i).getPartitionSet()), + rebalanceStrategyClass); + for (String partitionName : idealStates.get(i).getPartitionSet()) { + partitions.put(partitionName, idealStates.get(i).getReplicaCount(liveInstances.size())); + } + totalMaps.putAll(maps); + } + Map<String, InstanceConfig> instanceConfigMap = new HashMap<>(); + for (InstanceConfig config : instanceConfigs) { + instanceConfigMap.put(config.getInstanceName(), config); + } + verifyDistribution(totalMaps, liveInstances, partitions, + new Topology(new ArrayList<>(instanceConfigMap.keySet()), liveInstances, + instanceConfigMap, clusterConfig)); + if (recoverNode) { + deltaNodes.remove(downNode); + } + } + return totalMapHistory; + } + + public Map<String, Map<String, String>> calculateUsingRandomTopo(String rebalanceStrategyClass, + int replica, int partitionCount, int nodeCount, int resourceCount, int faultZone, + List<String> liveInstances) throws Exception { + String[] className = rebalanceStrategyClass.split("\\."); + String PARTICIPANT_PREFIX = 
+ className[className.length - 1] + "_node_" + nodeCount + resourceCount; + String RESOURCE_PREFIX = + className[className.length - 1] + "_resource_" + nodeCount + resourceCount; + String CLUSTER_NAME = + className[className.length - 1] + nodeCount + resourceCount + "TestingCluster"; + + ClusterConfig clusterConfig = new ClusterConfig(CLUSTER_NAME); + clusterConfig.setTopologyAwareEnabled(true); + clusterConfig.setFaultZoneType("zone"); + clusterConfig.setTopology("/zone/rack/instance"); + + List<InstanceConfig> newInstanceConfigs = new ArrayList<>(); + Random rand = new Random(); + for (int i = 0; i < nodeCount; i++) { + String nodeName = PARTICIPANT_PREFIX + Math.abs(rand.nextInt()) + "_" + i; + String zone = "zone-" + i % faultZone; + InstanceConfig newConfig = new InstanceConfig(nodeName); + liveInstances.add(nodeName); + newConfig.setInstanceEnabled(true); + newConfig.setHostName(nodeName); + newConfig.setPort(new Integer(i).toString()); + newConfig.setDomain(String + .format("cluster=%s,zone=%s,rack=myRack,instance=%s", CLUSTER_NAME, zone, nodeName)); + newConfig.setWeight(1000); + newConfig.setDelayRebalanceEnabled(false); + newConfig.setMaxConcurrentTask(1000); + newInstanceConfigs.add(newConfig); + } + + Map<String, Map<String, String>> totalMaps = new HashMap<>(); + Map<String, Integer> partitions = new HashMap<>(); + List<IdealState> idealStates = new ArrayList<>(); + + for (int i = 0; i < resourceCount; i++) { + String resourceName = RESOURCE_PREFIX + "_" + i; + IdealState idealState = new IdealState(resourceName); + idealState.setStateModelDefRef(BuiltInStateModelDefinitions.MasterSlave.name()); + idealState.setRebalanceMode(IdealState.RebalanceMode.FULL_AUTO); + idealState.setReplicas(new Integer(replica).toString()); + idealState.setNumPartitions(partitionCount); + idealState.setRebalancerClassName(rebalanceStrategyClass); + for (int p = 0; p < partitionCount; p++) { + String partitionName = resourceName + "_" + p; + 
idealState.setPreferenceList(partitionName, new ArrayList<String>()); + } + idealStates.add(idealState); + } + + long duration = 0; + for (IdealState idealState : idealStates) { + long startTime = System.currentTimeMillis(); + Map<String, Map<String, String>> maps = HelixUtil + .getIdealAssignmentForFullAuto(clusterConfig, newInstanceConfigs, liveInstances, + idealState, new ArrayList<>(idealState.getPartitionSet()), rebalanceStrategyClass); + duration += System.currentTimeMillis() - startTime; + + for (String partitionName : idealState.getPartitionSet()) { + partitions.put(partitionName, idealState.getReplicaCount(liveInstances.size())); + } + totalMaps.putAll(maps); + } + + //System.out.println("Total running time:\t" + duration); + + Map<String, InstanceConfig> instanceConfigMap = new HashMap<>(); + for (InstanceConfig config : newInstanceConfigs) { + instanceConfigMap.put(config.getInstanceName(), config); + } + verifyDistribution(totalMaps, liveInstances, partitions, + new Topology(new ArrayList<>(instanceConfigMap.keySet()), liveInstances, instanceConfigMap, + clusterConfig)); + return totalMaps; + } + + public Map<String, Map<String, String>> calculate(String Path, String rebalanceStrategyClass, + float instanceAdjustRate, Set<String> deltaNode, List<String> liveInstances) + throws Exception { + instanceFolderPath = Path + "/instanceConfigs/"; + instanceList = Path + "/instance"; + idealStateFolderPath = Path + "/idealStates/"; + idealStateList = Path + "/idealstate"; + Path path = Paths.get(Path + "/clusterConfig"); + ZNRecord record = (ZNRecord) new ZNRecordSerializer().deserialize(Files.readAllBytes(path)); + ClusterConfig clusterConfig = new ClusterConfig(record); + List<InstanceConfig> instanceConfigs = + getInstanceConfigs(instanceFolderPath, instanceList, liveInstances); + + int adjustNodeCount = (int) (instanceAdjustRate > 0 ? 
+ Math.ceil(instanceAdjustRate * liveInstances.size()) : + Math.floor(instanceAdjustRate * liveInstances.size())); + + if (adjustNodeCount > 0) { + for (int i = 0; i < adjustNodeCount; i++) { + int cloneIndex = i % (liveInstances.size() - 1); + String nodeName = instanceConfigs.get(cloneIndex).getInstanceName() + "_random" + i; + liveInstances.add(nodeName); + InstanceConfig cloneConfig = new InstanceConfig(nodeName); + cloneConfig.setHostName(nodeName); + cloneConfig.setInstanceEnabled(true); + cloneConfig.setPort(instanceConfigs.get(cloneIndex).getPort()); + cloneConfig.setDomain(instanceConfigs.get(cloneIndex).getDomain() + "_random" + i); + if (instanceConfigs.get(cloneIndex).getWeight() > 0) { + cloneConfig.setWeight(instanceConfigs.get(cloneIndex).getWeight()); + } + cloneConfig + .setDelayRebalanceEnabled(instanceConfigs.get(cloneIndex).isDelayRebalanceEnabled()); + if (instanceConfigs.get(cloneIndex).getMaxConcurrentTask() > 0) { + cloneConfig.setMaxConcurrentTask(instanceConfigs.get(cloneIndex).getMaxConcurrentTask()); + } + instanceConfigs.add(cloneConfig); + deltaNode.add(nodeName); + } + } else { + if (adjustNodeCount > liveInstances.size()) { + throw new Exception("All nodes are removed, no assignment possible."); + } + for (int i = 0; i < Math.abs(adjustNodeCount); i++) { + String nodeName = liveInstances.remove(i); + deltaNode.add(nodeName); + } + } + + List<IdealState> idealStates = getIdealStates(idealStateFolderPath, idealStateList); + Map<String, Map<String, String>> totalMaps = new HashMap<>(); + Map<String, Integer> partitions = new HashMap<>(); + + long duration = 0; + for (int i = 0; i < idealStates.size(); i++) { + long startTime = System.currentTimeMillis(); + int partitionCount = idealStates.get(i).getNumPartitions(); + List<String> partitionList = + new ArrayList<>(idealStates.get(i).getPartitionSet()).subList(0, partitionCount); + Map<String, Map<String, String>> maps = HelixUtil + .getIdealAssignmentForFullAuto(clusterConfig, 
instanceConfigs, liveInstances, + idealStates.get(i), partitionList, rebalanceStrategyClass); + for (String partitionName : partitionList) { + partitions.put(partitionName, idealStates.get(i).getReplicaCount(liveInstances.size())); + } + duration += System.currentTimeMillis() - startTime; + + // print resource details +/* Map<String, Set<String>> nodeMapping = convertMapping(maps); + String partitionCountsStr = idealStates.get(i).getResourceName(); + List<String> sortedInstances = new ArrayList<>(liveInstances); + Collections.sort(sortedInstances); + for (String node : sortedInstances) { + partitionCountsStr += "\t" + (nodeMapping.containsKey(node) ? nodeMapping.get(node).size() : 0); + } + System.out.println(partitionCountsStr);*/ + + totalMaps.putAll(maps); + } + //System.out.println("Takes " + duration + "ms"); + Map<String, InstanceConfig> instanceConfigMap = new HashMap<>(); + for (InstanceConfig config : instanceConfigs) { + instanceConfigMap.put(config.getInstanceName(), config); + } + verifyDistribution(totalMaps, liveInstances, partitions, + new Topology(new ArrayList<>(instanceConfigMap.keySet()), liveInstances, instanceConfigMap, + clusterConfig)); + return totalMaps; + } + + private void verifyDistribution(Map<String, Map<String, String>> map, List<String> liveInstances, + Map<String, Integer> partitionExp, Topology topology) throws Exception { + Map<String, Set<String>> faultZonePartition = new HashMap<>(); + Map<String, String> instanceFaultZone = new HashMap<>(); + for (Node node : topology.getFaultZones()) { + faultZonePartition.put(node.getName(), new HashSet<String>()); + for (Node instance : Topology.getAllLeafNodes(node)) { + instanceFaultZone.put(instance.getName(), node.getName()); + } + } + for (String partition : map.keySet()) { + // no partition missing, no partition duplicate + if (!partitionExp.containsKey(partition) || map.get(partition).size() != partitionExp + .get(partition)) { + throw new Exception("partition replica in mapping is 
not as expected"); + } + partitionExp.remove(partition); + // no partition on non-live node + for (String instance : map.get(partition).keySet()) { + if (!liveInstances.contains(instance)) { + throw new Exception("assignment is not on a live node!"); + } + // no fault zone conflict + String faultZone = instanceFaultZone.get(instance); + if (faultZonePartition.get(faultZone).contains(partition)) { + throw new Exception("faultzone conflict!"); + } + faultZonePartition.get(faultZone).add(partition); + } + } + if (!partitionExp.isEmpty()) { + throw new Exception("partition is not assigned"); + } + } + + private double[] checkEvenness(List<String> liveInstances, + Map<String, Map<String, String>> totalMaps, boolean verbose) { + StringBuilder output = new StringBuilder(); + Map<String, List<String>> detailMap = new HashMap<>(); + Map<String, Integer> distributionMap = new TreeMap<>(); + Map<String, Integer> topStateDistributionMap = new HashMap<>(); + for (String instance : liveInstances) { + distributionMap.put(instance, 0); + topStateDistributionMap.put(instance, 0); + detailMap.put(instance, new ArrayList<String>()); + } + + for (String partition : totalMaps.keySet()) { + Map<String, String> instanceMap = totalMaps.get(partition); + for (String instance : instanceMap.keySet()) { + detailMap.get(instance).add(partition + "-" + totalMaps.get(partition).get(instance)); + distributionMap.put(instance, distributionMap.get(instance) + 1); + if (instanceMap.get(instance).equalsIgnoreCase(topState)) { + topStateDistributionMap.put(instance, topStateDistributionMap.get(instance) + 1); + } + } + } + + int totalReplicas = 0; + int minR = Integer.MAX_VALUE; + int maxR = 0; + int mminR = Integer.MAX_VALUE; + int mmaxR = 0; + for (String instance : distributionMap.keySet()) { + output.append(instance + "\t" + distributionMap.get(instance) + "\tpartitions\t" + + topStateDistributionMap.get(instance) + "\ttopStates\n"); + //output.append(instance + "\t:\t" + 
distributionMap.get(instance) + "\tpartitions\t" + topStateDistributionMap.get(instance) + "\ttopStates\t" + detailMap.get(instance) + "\n"); + totalReplicas += distributionMap.get(instance); + minR = Math.min(minR, distributionMap.get(instance)); + maxR = Math.max(maxR, distributionMap.get(instance)); + } + for (String instance : topStateDistributionMap.keySet()) { + mminR = Math.min(mminR, topStateDistributionMap.get(instance)); + mmaxR = Math.max(mmaxR, topStateDistributionMap.get(instance)); + } + + output.append("Maximum holds " + maxR + " replicas and minimum holds " + minR + + " replicas, differentiation is " + (double) (maxR - minR) / maxR * 100 + "%\n"); + output.append("Maximum holds " + mmaxR + " topStates and minimum holds " + mminR + + " topStates, differentiation is " + (double) (mmaxR - mminR) / mmaxR * 100 + "%\n "); + + if (verbose) { + System.out.println(output.toString()); + } + return new double[] { totalReplicas, minR, maxR, (double) (maxR - minR) / maxR * 100, + STDEV(new ArrayList<>(distributionMap.values())), mminR, mmaxR, + (double) (mmaxR - mminR) / mmaxR * 100, + STDEV(new ArrayList<>(topStateDistributionMap.values())) + }; + } + + private double STDEV(List<Integer> data) { + if (data.isEmpty() || data.size() == 1) { + return 0; + } + double totalDiff = 0; + double average = 0; + for (int num : data) { + average += num; + } + average /= data.size(); + for (int i = 0; i < data.size(); i++) { + totalDiff += Math.pow(data.get(i) - average, 2); + } + return Math.sqrt(totalDiff) / (data.size() - 1); + } + + private static List<InstanceConfig> getInstanceConfigs(String instanceFolderPath, + String instanceList, List<String> liveInstances) throws IOException { + List<InstanceConfig> instanceConfigs = new ArrayList<>(); + for (ZNRecord record : getRecords(instanceFolderPath, instanceList)) { + instanceConfigs.add(new InstanceConfig(record)); + liveInstances.add(record.getId()); + } + return instanceConfigs; + } + + private static List<IdealState> 
getIdealStates(String idealStateFolderPath, String idealStateList) + throws IOException { + List<IdealState> idealStates = new ArrayList<>(); + for (ZNRecord record : getRecords(idealStateFolderPath, idealStateList)) { + IdealState idealState = new IdealState(record); + try { + BuiltInStateModelDefinitions.valueOf(idealState.getStateModelDefRef()); + } catch (IllegalArgumentException ex) { + idealState.setStateModelDefRef("OnlineOffline"); + } + idealStates.add(idealState); + } + return idealStates; + } + + private static List<ZNRecord> getRecords(String folderPath, String list) throws IOException { + List<ZNRecord> records = new ArrayList<>(); + List<String> names = new ArrayList<>(); + try (BufferedReader br = new BufferedReader(new FileReader(list))) { + String sCurrentLine = br.readLine(); + names.addAll(Arrays.asList(sCurrentLine.split(" "))); + } catch (IOException e) { + e.printStackTrace(); + } + for (String name : names) { + Path path = Paths.get(folderPath + name); + ZNRecord record = (ZNRecord) new ZNRecordSerializer().deserialize(Files.readAllBytes(path)); + records.add(record); + } + return records; + } + + private Map<String, Map<String, Integer>> getStateCount(Map<String, Map<String, String>> map) { + Map<String, Map<String, Integer>> mapStateCount = new HashMap<>(); + for (String partition : map.keySet()) { + Map<String, Integer> stateCount = new HashMap<>(); + mapStateCount.put(partition, stateCount); + for (String node : map.get(partition).keySet()) { + String state = map.get(partition).get(node); + if (!stateCount.containsKey(state)) { + stateCount.put(state, 1); + } else { + stateCount.put(state, stateCount.get(state) + 1); + } + } + } + return mapStateCount; + } + + private void verifyMaps(Map<String, Map<String, String>> map, + Map<String, Map<String, String>> newMap) throws Exception { + // check no partition lose + Map<String, Map<String, Integer>> mapStateCount = getStateCount(map); + Map<String, Map<String, Integer>> newMapStateCount = 
getStateCount(newMap); + for (String partition : mapStateCount.keySet()) { + if (!newMapStateCount.containsKey(partition)) { + throw new Exception("mapping does not match"); + } + for (String state : mapStateCount.get(partition).keySet()) { + if (!newMapStateCount.get(partition).containsKey(state) + || mapStateCount.get(partition).get(state) != newMapStateCount.get(partition) + .get(state)) { + throw new Exception("state does not match"); + } + } + for (String state : newMapStateCount.get(partition).keySet()) { + if (!mapStateCount.get(partition).containsKey(state)) { + throw new Exception("state does not match"); + } + } + } + for (String partition : newMapStateCount.keySet()) { + if (!mapStateCount.containsKey(partition)) { + throw new Exception("mapping does not match"); + } + } + } + + private double[] checkMovement(Map<String, Map<String, String>> map, + Map<String, Map<String, String>> newMap, Collection<String> deltaNodes, boolean verbose) + throws Exception { + verifyMaps(map, newMap); + int totalChange = 0; + int totalTopStateChange = 0; + int totalTopStateChangeWithNewDeployment = 0; + int totalPartition = 0; + int totalTopState = 0; + + for (String partition : map.keySet()) { + Map<String, String> origStates = map.get(partition); + Map<String, String> newStates = newMap.get(partition); + String topStateNode = "", newtopStateNode = ""; + for (String node : origStates.keySet()) { + if (origStates.get(node).equalsIgnoreCase(topState)) { + topStateNode = node; + } + } + for (String node : newStates.keySet()) { + if (newStates.get(node).equalsIgnoreCase(topState)) { + newtopStateNode = node; + totalTopState++; + } + } + if (!topStateNode.equalsIgnoreCase(newtopStateNode)) { + totalTopStateChange++; + if (!origStates.containsKey(newtopStateNode)) { + totalTopStateChangeWithNewDeployment++; + } + } + } + + Map<String, Set<String>> list = convertMapping(map); + Map<String, Set<String>> newList = convertMapping(newMap); + + Map<String, Integer> addition = new 
HashMap<>(); + Map<String, Integer> subtraction = new HashMap<>(); + for (String instance : newList.keySet()) { + Set<String> oldPartitions = list.get(instance); + Set<String> newPartitions = newList.get(instance); + totalPartition += newPartitions.size(); + if (oldPartitions == null) { + addition.put(instance, newPartitions.size()); + } else { + Set<String> commonPartitions = new HashSet<>(newPartitions); + commonPartitions.retainAll(oldPartitions); + + newPartitions.removeAll(commonPartitions); + + addition.put(instance, newPartitions.size()); + + oldPartitions.removeAll(commonPartitions); + subtraction.put(instance, oldPartitions.size()); + } + totalChange += newPartitions.size(); + //System.out.println("Changed partition on node: \t" + instance + "\t: \t" + newPartitions.toString()); + } + /* + List<String> instances = new ArrayList<>(newList.keySet()); + Collections.sort(instances); + System.out.println("Addition partition count: "); + for (String instance : instances) { + System.out.println(addition.containsKey(instance) ? addition.get(instance) : 0); + } + + System.out.println("Subtraction partition count: "); + for (String instance : instances) { + System.out.println(subtraction.containsKey(instance) ? 
subtraction.get(instance) : 0); + } + */ + + int nodeChanged = 0; + int necessaryChange = 0; + int necessarytopStateChange = 0; + for (String instance : deltaNodes) { + nodeChanged++; + if (list.containsKey(instance)) { + necessaryChange += list.get(instance).size(); + for (Map<String, String> nodeState : map.values()) { + if (nodeState.containsKey(instance)) { + if (nodeState.get(instance).equalsIgnoreCase(topState)) { + necessarytopStateChange++; + } + } + } + } + if (newList.containsKey(instance)) { + necessaryChange += newList.get(instance).size(); + for (Map<String, String> nodeState : newMap.values()) { + if (nodeState.containsKey(instance)) { + if (nodeState.get(instance).equalsIgnoreCase(topState)) { + necessarytopStateChange++; + } + } + } + } + } + + if (verbose) { + System.out.println( + "\t\t\t" + "Total partition change count: \t" + totalChange + "\t/\t" + totalPartition + + "\t, rate: \t" + (((float) totalChange) / totalPartition * 100) + "%\t" + + "Diff nodes have partition \t" + necessaryChange + "\t, unnecessary change rate: \t" + + (((float) totalChange - necessaryChange) / totalPartition * 100) + + "%\t, which is \t" + (((float) totalChange - necessaryChange) / totalChange * 100) + + "%\t of the movement."); + } + + double expectedAverageMv = + (double) totalPartition / Math.max(list.size(), newList.size()) * deltaNodes.size(); + + return new double[] { nodeChanged, totalChange, (((double) totalChange) / totalPartition * 100), + (((double) totalChange - necessaryChange) / totalPartition * 100), totalTopStateChange, + (((double) totalTopStateChange) / totalTopState * 100), + (((double) totalTopStateChange - necessarytopStateChange) / totalTopState * 100), + expectedAverageMv, (((double) totalChange - expectedAverageMv) / totalPartition * 100), + totalTopStateChangeWithNewDeployment, + (((double) totalTopStateChangeWithNewDeployment) / totalTopState * 100) + }; + } + + private Map<String, Set<String>> convertMapping(Map<String, Map<String, String>> 
map) { + Map<String, Set<String>> list = new HashMap<>(); + for (String partition : map.keySet()) { + for (String instance : map.get(partition).keySet()) { + if (!list.containsKey(instance)) { + list.put(instance, new HashSet<String>()); + } + list.get(instance).add(partition); + } + } + return list; + } +} http://git-wip-us.apache.org/repos/asf/helix/blob/9dfb098e/helix-core/src/test/java/org/apache/helix/integration/rebalancer/CrushRebalancers/TestCrushAutoRebalance.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/CrushRebalancers/TestCrushAutoRebalance.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/CrushRebalancers/TestCrushAutoRebalance.java index d886e44..08608f3 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/CrushRebalancers/TestCrushAutoRebalance.java +++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/CrushRebalancers/TestCrushAutoRebalance.java @@ -26,8 +26,8 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; -import org.apache.helix.controller.rebalancer.strategy.CrushRebalanceStrategy; -import org.apache.helix.controller.rebalancer.strategy.MultiRoundCrushRebalanceStrategy; + +import org.apache.helix.controller.rebalancer.strategy.*; import org.apache.helix.integration.common.ZkIntegrationTestBase; import org.apache.helix.integration.manager.ClusterControllerManager; import org.apache.helix.integration.manager.MockParticipantManager; @@ -111,7 +111,8 @@ public class TestCrushAutoRebalance extends ZkIntegrationTestBase { @DataProvider(name = "rebalanceStrategies") public static Object [][] rebalanceStrategies() { return new String[][] { {"CrushRebalanceStrategy", CrushRebalanceStrategy.class.getName()}, - {"MultiRoundCrushRebalanceStrategy", MultiRoundCrushRebalanceStrategy.class.getName()} + {"MultiRoundCrushRebalanceStrategy", 
MultiRoundCrushRebalanceStrategy.class.getName()}, + {"CrushEdRebalanceStrategy", CrushEdRebalanceStrategy.class.getName()} }; } http://git-wip-us.apache.org/repos/asf/helix/blob/9dfb098e/helix-core/src/test/java/org/apache/helix/integration/rebalancer/CrushRebalancers/TestCrushAutoRebalanceNonRack.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/CrushRebalancers/TestCrushAutoRebalanceNonRack.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/CrushRebalancers/TestCrushAutoRebalanceNonRack.java index c11ee87..85059b6 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/CrushRebalancers/TestCrushAutoRebalanceNonRack.java +++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/CrushRebalancers/TestCrushAutoRebalanceNonRack.java @@ -19,33 +19,25 @@ package org.apache.helix.integration.rebalancer.CrushRebalancers; * under the License. 
*/ -import java.util.ArrayList; -import java.util.Date; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; import org.apache.helix.ConfigAccessor; +import org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy; import org.apache.helix.controller.rebalancer.strategy.CrushRebalanceStrategy; import org.apache.helix.integration.common.ZkStandAloneCMTestBase; import org.apache.helix.integration.manager.ClusterControllerManager; import org.apache.helix.integration.manager.MockParticipantManager; -import org.apache.helix.model.BuiltInStateModelDefinitions; -import org.apache.helix.model.ClusterConfig; -import org.apache.helix.model.ExternalView; -import org.apache.helix.model.IdealState; +import org.apache.helix.model.*; import org.apache.helix.model.IdealState.RebalanceMode; -import org.apache.helix.model.InstanceConfig; import org.apache.helix.tools.ClusterSetup; -import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier; import org.apache.helix.tools.ClusterVerifiers.HelixClusterVerifier; +import org.apache.helix.tools.ClusterVerifiers.StrictMatchExternalViewVerifier; import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeClass; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; +import java.util.*; + public class TestCrushAutoRebalanceNonRack extends ZkStandAloneCMTestBase { final int NUM_NODE = 6; protected static final int START_PORT = 12918; @@ -59,7 +51,7 @@ public class TestCrushAutoRebalanceNonRack extends ZkStandAloneCMTestBase { List<MockParticipantManager> _participants = new ArrayList<MockParticipantManager>(); Map<String, String> _nodeToTagMap = new HashMap<String, String>(); List<String> _nodes = new ArrayList<String>(); - Set<String> _allDBs = new HashSet<String>(); + Set<String> _allDBs = new HashSet<>(); int _replica = 3; private static String[] _testModels = { 
BuiltInStateModelDefinitions.OnlineOffline.name(), @@ -91,7 +83,8 @@ public class TestCrushAutoRebalanceNonRack extends ZkStandAloneCMTestBase { String tag = "tag-" + i % 2; _setupTool.getClusterManagementTool().addInstanceTag(CLUSTER_NAME, storageNodeName, tag); _nodeToTagMap.put(storageNodeName, tag); - InstanceConfig instanceConfig = configAccessor.getInstanceConfig(CLUSTER_NAME, storageNodeName); + InstanceConfig instanceConfig = + configAccessor.getInstanceConfig(CLUSTER_NAME, storageNodeName); instanceConfig.setDomain("instance=" + storageNodeName); configAccessor.setInstanceConfig(CLUSTER_NAME, storageNodeName, instanceConfig); } @@ -112,13 +105,15 @@ public class TestCrushAutoRebalanceNonRack extends ZkStandAloneCMTestBase { //enableTopologyAwareRebalance(_gZkClient, CLUSTER_NAME, true); } - @DataProvider(name = "rebalanceStrategies") public static String[][] rebalanceStrategies() { - return new String[][] { { "CrushRebalanceStrategy", CrushRebalanceStrategy.class.getName() } }; + @DataProvider(name = "rebalanceStrategies") + public static String[][] rebalanceStrategies() { + return new String[][] { { "CrushRebalanceStrategy", CrushRebalanceStrategy.class.getName() }, + {"CrushEdRebalanceStrategy", CrushEdRebalanceStrategy.class.getName()} + }; } @Test(dataProvider = "rebalanceStrategies", enabled = true) - public void test(String rebalanceStrategyName, String rebalanceStrategyClass) - throws Exception { + public void test(String rebalanceStrategyName, String rebalanceStrategyClass) throws Exception { System.out.println("Test " + rebalanceStrategyName); int i = 0; for (String stateModel : _testModels) { @@ -131,10 +126,9 @@ public class TestCrushAutoRebalanceNonRack extends ZkStandAloneCMTestBase { Thread.sleep(300); HelixClusterVerifier _clusterVerifier = - new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR) + new StrictMatchExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR) .setResources(_allDBs).build(); 
Assert.assertTrue(_clusterVerifier.verify(5000)); - for (String db : _allDBs) { IdealState is = _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db); ExternalView ev = @@ -161,8 +155,8 @@ public class TestCrushAutoRebalanceNonRack extends ZkStandAloneCMTestBase { } Thread.sleep(300); - HelixClusterVerifier _clusterVerifier = - new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR) + HelixClusterVerifier _clusterVerifier = + new StrictMatchExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR) .setResources(_allDBs).build(); Assert.assertTrue(_clusterVerifier.verify(5000)); for (String db : _allDBs) { @@ -173,11 +167,12 @@ public class TestCrushAutoRebalanceNonRack extends ZkStandAloneCMTestBase { } } - @Test(dataProvider = "rebalanceStrategies", enabled = true, dependsOnMethods = { "test", - "testWithInstanceTag"}) + @Test(dataProvider = "rebalanceStrategies", enabled = true, dependsOnMethods = { + "testWithInstanceTag" + }) public void testLackEnoughLiveInstances(String rebalanceStrategyName, String rebalanceStrategyClass) throws Exception { - System.out.println("TestLackEnoughInstances " + rebalanceStrategyName); + System.out.println("TestLackEnoughLiveInstances " + rebalanceStrategyName); enablePersistBestPossibleAssignment(_gZkClient, CLUSTER_NAME, true); // shutdown participants, keep only two left @@ -185,9 +180,9 @@ public class TestCrushAutoRebalanceNonRack extends ZkStandAloneCMTestBase { _participants.get(i).syncStop(); } - int i = 0; + int j = 0; for (String stateModel : _testModels) { - String db = "Test-DB-" + rebalanceStrategyName + "-" + i++; + String db = "Test-DB-" + rebalanceStrategyName + "-" + j++; _setupTool.addResourceToCluster(CLUSTER_NAME, db, _PARTITIONS, stateModel, RebalanceMode.FULL_AUTO + "", rebalanceStrategyClass); _setupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica); @@ -196,22 +191,26 @@ public class TestCrushAutoRebalanceNonRack extends ZkStandAloneCMTestBase { 
Thread.sleep(300); HelixClusterVerifier _clusterVerifier = - new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR) + new StrictMatchExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR) .setResources(_allDBs).build(); Assert.assertTrue(_clusterVerifier.verify(5000)); - for (String db : _allDBs) { IdealState is = _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db); ExternalView ev = _setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db); validateIsolation(is, ev, 2); } + + for (int i = 2; i < _participants.size(); i++) { + _participants.get(i).syncStart(); + } } - @Test(dataProvider = "rebalanceStrategies", enabled = true, dependsOnMethods = { "test", - "testWithInstanceTag"}) - public void testLackEnoughInstances(String rebalanceStrategyName, - String rebalanceStrategyClass) throws Exception { + @Test(dataProvider = "rebalanceStrategies", enabled = true, dependsOnMethods = { + "testLackEnoughLiveInstances" + }) + public void testLackEnoughInstances(String rebalanceStrategyName, String rebalanceStrategyClass) + throws Exception { System.out.println("TestLackEnoughInstances " + rebalanceStrategyName); enablePersistBestPossibleAssignment(_gZkClient, CLUSTER_NAME, true); @@ -225,27 +224,42 @@ public class TestCrushAutoRebalanceNonRack extends ZkStandAloneCMTestBase { _setupTool.dropInstanceFromCluster(CLUSTER_NAME, p.getInstanceName()); } - int i = 0; + int j = 0; for (String stateModel : _testModels) { - String db = "Test-DB-" + rebalanceStrategyName + "-" + i++; + String db = "Test-DB-" + rebalanceStrategyName + "-" + j++; _setupTool.addResourceToCluster(CLUSTER_NAME, db, _PARTITIONS, stateModel, RebalanceMode.FULL_AUTO + "", rebalanceStrategyClass); _setupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica); _allDBs.add(db); } Thread.sleep(300); - HelixClusterVerifier _clusterVerifier = - new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR) + new 
StrictMatchExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR) .setResources(_allDBs).build(); Assert.assertTrue(_clusterVerifier.verify()); - for (String db : _allDBs) { IdealState is = _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db); ExternalView ev = _setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db); validateIsolation(is, ev, 2); } + + // recover test environment + ConfigAccessor configAccessor = new ConfigAccessor(_gZkClient); + for (int i = 2; i < _participants.size(); i++) { + String storageNodeName = _participants.get(i).getInstanceName(); + _setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName); + + InstanceConfig instanceConfig = + configAccessor.getInstanceConfig(CLUSTER_NAME, storageNodeName); + instanceConfig.setDomain("instance=" + storageNodeName); + configAccessor.setInstanceConfig(CLUSTER_NAME, storageNodeName, instanceConfig); + + MockParticipantManager participant = + new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, storageNodeName); + participant.syncStart(); + _participants.set(i, participant); + } } /** @@ -267,7 +281,8 @@ public class TestCrushAutoRebalanceNonRack extends ZkStandAloneCMTestBase { } } - @AfterMethod public void afterMethod() throws Exception { + @AfterMethod + public void afterMethod() throws Exception { for (String db : _allDBs) { _setupTool.dropResourceFromCluster(CLUSTER_NAME, db); }
