This is an automated email from the ASF dual-hosted git repository. xyuanlu pushed a commit to branch helix-stickiness-rebalancer in repository https://gitbox.apache.org/repos/asf/helix.git
commit 46962336ae740932d06865041d72ddcc6e10778b Author: frankmu <muteng...@gmail.com> AuthorDate: Fri Aug 9 12:13:41 2024 -0700 Helix stickiness rebalancer (#2878) Create sticky assignment rebalance strategy --- .../helix/controller/common/CapacityNode.java | 14 ++ .../ResourceControllerDataProvider.java | 8 +- .../rebalancer/ConditionBasedRebalancer.java | 18 +- .../strategy/GreedyRebalanceStrategy.java | 103 ---------- .../strategy/StickyRebalanceStrategy.java | 202 +++++++++++++++++++ .../java/org/apache/helix/model/ClusterConfig.java | 2 +- .../rebalancer/TestGreedyRebalanceStrategy.java | 85 -------- .../rebalancer/TestStickyRebalanceStrategy.java | 214 +++++++++++++++++++++ ...alanceWithGlobalPerInstancePartitionLimit.java} | 25 ++- 9 files changed, 467 insertions(+), 204 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/controller/common/CapacityNode.java b/helix-core/src/main/java/org/apache/helix/controller/common/CapacityNode.java index fa5068e13..208ee7913 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/common/CapacityNode.java +++ b/helix-core/src/main/java/org/apache/helix/controller/common/CapacityNode.java @@ -52,11 +52,25 @@ public class CapacityNode { && _partitionMap.get(resource).contains(partition))) { return false; } + + // Add the partition to the resource's set of partitions in this node _partitionMap.computeIfAbsent(resource, k -> new HashSet<>()).add(partition); _currentlyAssigned++; return true; } + /** + * Checks if a specific resource + partition is assigned to this node. 
+ * + * @param resource the name of the resource + * @param partition the partition + * @return {@code true} if the resource + partition is assigned to this node, {@code false} otherwise + */ + public boolean hasPartition(String resource, String partition) { + Set<String> partitions = _partitionMap.get(resource); + return partitions != null && partitions.contains(partition); + } + /** * Set the capacity of this node * @param capacity The capacity to set diff --git a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ResourceControllerDataProvider.java b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ResourceControllerDataProvider.java index cdfcb0f24..74b5aa8b2 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ResourceControllerDataProvider.java +++ b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ResourceControllerDataProvider.java @@ -41,7 +41,7 @@ import org.apache.helix.constants.InstanceConstants; import org.apache.helix.controller.LogUtil; import org.apache.helix.controller.common.CapacityNode; import org.apache.helix.controller.pipeline.Pipeline; -import org.apache.helix.controller.rebalancer.strategy.GreedyRebalanceStrategy; +import org.apache.helix.controller.rebalancer.strategy.StickyRebalanceStrategy; import org.apache.helix.controller.rebalancer.waged.WagedInstanceCapacity; import org.apache.helix.controller.rebalancer.waged.WagedResourceWeightsProvider; import org.apache.helix.controller.stages.MissingTopStateRecord; @@ -191,11 +191,11 @@ public class ResourceControllerDataProvider extends BaseControllerDataProvider { // Remove all cached IdealState because it is a global computation cannot partially be // performed for some resources. The computation is simple as well not taking too much resource // to recompute the assignments. 
- Set<String> cachedGreedyIdealStates = _idealMappingCache.values().stream().filter( + Set<String> cachedStickyIdealStates = _idealMappingCache.values().stream().filter( record -> record.getSimpleField(IdealState.IdealStateProperty.REBALANCE_STRATEGY.name()) - .equals(GreedyRebalanceStrategy.class.getName())).map(ZNRecord::getId) + .equals(StickyRebalanceStrategy.class.getName())).map(ZNRecord::getId) .collect(Collectors.toSet()); - _idealMappingCache.keySet().removeAll(cachedGreedyIdealStates); + _idealMappingCache.keySet().removeAll(cachedStickyIdealStates); } LogUtil.logInfo(logger, getClusterEventId(), String.format( diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/ConditionBasedRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/ConditionBasedRebalancer.java index 158699a97..84762ea51 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/ConditionBasedRebalancer.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/ConditionBasedRebalancer.java @@ -20,12 +20,12 @@ package org.apache.helix.controller.rebalancer; */ import java.util.ArrayList; -import java.util.Collections; import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; import org.apache.helix.HelixException; import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider; @@ -98,8 +98,8 @@ public class ConditionBasedRebalancer extends AbstractRebalancer<ResourceControl LinkedHashMap<String, Integer> stateCountMap = stateModelDef.getStateCountMap(assignableLiveInstance.size(), replicas); - List<String> assignableLiveNodes = new ArrayList<>(assignableLiveInstance.keySet()); - List<String> assignableNodes = new ArrayList<>(clusterData.getAssignableInstances()); + Set<String> assignableLiveNodes = new HashSet<>(assignableLiveInstance.keySet()); + Set<String> assignableNodes = new 
HashSet<>(clusterData.getAssignableInstances()); assignableNodes.removeAll(clusterData.getDisabledInstances()); assignableLiveNodes.retainAll(assignableNodes); @@ -135,20 +135,22 @@ public class ConditionBasedRebalancer extends AbstractRebalancer<ResourceControl LOG.warn("Resource " + resourceName + " has tag " + currentIdealState.getInstanceGroupTag() + " but no live participants have this tag"); } - assignableNodes = new ArrayList<>(taggedNodes); - assignableLiveNodes = new ArrayList<>(taggedLiveNodes); + assignableNodes = new HashSet<>(taggedNodes); + assignableLiveNodes = new HashSet<>(taggedLiveNodes); } // sort node lists to ensure consistent preferred assignments - Collections.sort(assignableNodes); - Collections.sort(assignableLiveNodes); + List<String> assignableNodesList = + assignableNodes.stream().sorted().collect(Collectors.toList()); + List<String> assignableLiveNodesList = + assignableLiveNodes.stream().sorted().collect(Collectors.toList()); int maxPartition = currentIdealState.getMaxPartitionsPerInstance(); _rebalanceStrategy = getRebalanceStrategy(currentIdealState.getRebalanceStrategy(), partitions, resourceName, stateCountMap, maxPartition); ZNRecord newMapping = - _rebalanceStrategy.computePartitionAssignment(assignableNodes, assignableLiveNodes, + _rebalanceStrategy.computePartitionAssignment(assignableNodesList, assignableLiveNodesList, currentMapping, clusterData); if (LOG.isDebugEnabled()) { diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/GreedyRebalanceStrategy.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/GreedyRebalanceStrategy.java deleted file mode 100644 index 60580c40a..000000000 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/GreedyRebalanceStrategy.java +++ /dev/null @@ -1,103 +0,0 @@ -package org.apache.helix.controller.rebalancer.strategy; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more 
contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -import java.util.ArrayList; -import java.util.Collections; -import java.util.Comparator; -import java.util.HashSet; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import org.apache.helix.controller.common.CapacityNode; -import org.apache.helix.zookeeper.datamodel.ZNRecord; -import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class GreedyRebalanceStrategy implements RebalanceStrategy<ResourceControllerDataProvider> { - private static Logger logger = LoggerFactory.getLogger(GreedyRebalanceStrategy.class); - private String _resourceName; - private List<String> _partitions; - private LinkedHashMap<String, Integer> _states; - - public GreedyRebalanceStrategy() { - } - - @Override - public void init(String resourceName, final List<String> partitions, - final LinkedHashMap<String, Integer> states, int maximumPerNode) { - _resourceName = resourceName; - _partitions = partitions; - _states = states; - } - - @Override - public ZNRecord computePartitionAssignment(final List<String> allNodes, final List<String> liveNodes, - final Map<String, Map<String, String>> currentMapping, 
ResourceControllerDataProvider clusterData) { - int numReplicas = countStateReplicas(); - ZNRecord znRecord = new ZNRecord(_resourceName); - if (liveNodes.size() == 0) { - return znRecord; - } - - if (clusterData.getSimpleCapacitySet() == null) { - logger.warn("No capacity set for resource: " + _resourceName); - return znRecord; - } - - // Sort the assignable nodes by id - List<CapacityNode> assignableNodes = new ArrayList<>(clusterData.getSimpleCapacitySet()); - Collections.sort(assignableNodes, Comparator.comparing(CapacityNode::getId)); - - // Assign partitions to node by order. - for (int i = 0, index = 0; i < _partitions.size(); i++) { - int startIndex = index; - List<String> preferenceList = new ArrayList<>(); - for (int j = 0; j < numReplicas; j++) { - while (index - startIndex < assignableNodes.size()) { - CapacityNode node = assignableNodes.get(index++ % assignableNodes.size()); - if (node.canAdd(_resourceName, _partitions.get(i))) { - preferenceList.add(node.getId()); - break; - } - } - - if (index - startIndex >= assignableNodes.size()) { - // If the all nodes have been tried out, then no node can be assigned. 
- logger.warn("No enough assignable nodes for resource: " + _resourceName); - } - } - znRecord.setListField(_partitions.get(i), preferenceList); - } - - return znRecord; - } - - private int countStateReplicas() { - int total = 0; - for (Integer count : _states.values()) { - total += count; - } - return total; - } -} diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/StickyRebalanceStrategy.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/StickyRebalanceStrategy.java new file mode 100644 index 000000000..7def42d08 --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/StickyRebalanceStrategy.java @@ -0,0 +1,202 @@ +package org.apache.helix.controller.rebalancer.strategy; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import org.apache.helix.controller.common.CapacityNode; +import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider; +import org.apache.helix.zookeeper.datamodel.ZNRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class StickyRebalanceStrategy implements RebalanceStrategy<ResourceControllerDataProvider> { + private static Logger logger = LoggerFactory.getLogger(StickyRebalanceStrategy.class); + private String _resourceName; + private List<String> _partitions; + private LinkedHashMap<String, Integer> _states; + private int _statesReplicaCount; + + public StickyRebalanceStrategy() { + } + + @Override + public void init(String resourceName, final List<String> partitions, + final LinkedHashMap<String, Integer> states, int maximumPerNode) { + _resourceName = resourceName; + _partitions = partitions; + _states = states; + if (_states != null) { + _statesReplicaCount = _states.values().stream().mapToInt(Integer::intValue).sum(); + } + } + + @Override + public ZNRecord computePartitionAssignment(final List<String> allNodes, + final List<String> liveNodes, final Map<String, Map<String, String>> currentMapping, + ResourceControllerDataProvider clusterData) { + ZNRecord znRecord = new ZNRecord(_resourceName); + if (liveNodes.isEmpty()) { + return znRecord; + } + + if (clusterData.getSimpleCapacitySet() == null) { + logger.warn("No capacity set for resource: {}", _resourceName); + return znRecord; + } + + // Sort the assignable nodes by id + List<CapacityNode> assignableNodes = new ArrayList<>(clusterData.getSimpleCapacitySet()); + assignableNodes.sort(Comparator.comparing(CapacityNode::getId)); + + // Filter out the nodes if not in the liveNodes parameter + // Note the 
liveNodes parameter here might be processed within the rebalancer, e.g. filter based on tags + Set<String> liveNodesSet = new HashSet<>(liveNodes); + assignableNodes.removeIf(n -> !liveNodesSet.contains(n.getId())); + + // Populate valid state map given current mapping + Map<String, Map<String, String>> stateMap = + populateValidStateMapFromCurrentMapping(currentMapping, assignableNodes); + + if (logger.isDebugEnabled()) { + logger.debug("currentMapping: {}", currentMapping); + logger.debug("stateMap: {}", stateMap); + } + + // Assign partitions to node by order. + for (int i = 0, index = 0; i < _partitions.size(); i++) { + int startIndex = index; + for (Map.Entry<String, Integer> entry : _states.entrySet()) { + String state = entry.getKey(); + int stateReplicaNumber = entry.getValue(); + // For this partition, compute existing number replicas + long existsReplicas = + stateMap.computeIfAbsent(_partitions.get(i), m -> new HashMap<>()).values().stream() + .filter(s -> s.equals(state)).count(); + for (int j = 0; j < stateReplicaNumber - existsReplicas; j++) { + while (index - startIndex < assignableNodes.size()) { + CapacityNode node = assignableNodes.get(index++ % assignableNodes.size()); + if (node.canAdd(_resourceName, _partitions.get(i))) { + stateMap.get(_partitions.get(i)).put(node.getId(), state); + break; + } + } + + if (index - startIndex >= assignableNodes.size()) { + // If the all nodes have been tried out, then no node can be assigned. + logger.warn("No enough assignable nodes for resource: {}", _resourceName); + } + } + } + } + for (Map.Entry<String, Map<String, String>> entry : stateMap.entrySet()) { + znRecord.setListField(entry.getKey(), new ArrayList<>(entry.getValue().keySet())); + } + if (logger.isDebugEnabled()) { + logger.debug("znRecord: {}", znRecord); + } + + return znRecord; + } + + /** + * Populates a valid state map from the current mapping, filtering out invalid nodes. 
+ * + * @param currentMapping the current mapping of partitions to node states + * @param assignableNodes the list of nodes that can be assigned + * @return a map of partitions to valid node states + */ + private Map<String, Map<String, String>> populateValidStateMapFromCurrentMapping( + final Map<String, Map<String, String>> currentMapping, + final List<CapacityNode> assignableNodes) { + Map<String, Map<String, String>> validStateMap = new HashMap<>(); + // Convert the assignableNodes to map for quick lookup + Map<String, CapacityNode> assignableNodeMap = + assignableNodes.stream().collect(Collectors.toMap(CapacityNode::getId, node -> node)); + if (currentMapping != null) { + for (Map.Entry<String, Map<String, String>> entry : currentMapping.entrySet()) { + String partition = entry.getKey(); + Map<String, String> currentNodeStateMap = new HashMap<>(entry.getValue()); + // Skip if current node state is invalid with state model + if (!isValidStateMap(currentNodeStateMap)) { + continue; + } + // Filter out invalid node assignment + currentNodeStateMap.entrySet() + .removeIf(e -> !isValidNodeAssignment(partition, e.getKey(), assignableNodeMap)); + + validStateMap.put(partition, currentNodeStateMap); + } + } + return validStateMap; + } + + /** + * Validates whether the provided state mapping is valid according to the defined state model. + * + * @param currentNodeStateMap A map representing the actual state mapping where the key is the node ID and the value is the state. 
+ * @return true if the state map is valid, false otherwise + */ + private boolean isValidStateMap(final Map<String, String> currentNodeStateMap) { + // Check if the size of the current state map exceeds the total state count in state model + if (currentNodeStateMap.size() > _statesReplicaCount) { + return false; + } + + Map<String, Integer> tmpStates = new HashMap<>(_states); + for (String state : currentNodeStateMap.values()) { + // Return invalid if: + // The state is not defined in the state model OR + // The state count exceeds the defined count in state model + if (!tmpStates.containsKey(state) || tmpStates.get(state) <= 0) { + return false; + } + tmpStates.put(state, tmpStates.get(state) - 1); + } + + return true; + } + + /** + * Checks if a node assignment is valid for a given partition. + * + * @param partition the partition to be assigned + * @param nodeId the ID of the node to be checked + * @param assignableNodeMap the map of node IDs to CapacityNode objects + * @return true if the node is valid for the assignment, false otherwise + */ + private boolean isValidNodeAssignment(final String partition, final String nodeId, + final Map<String, CapacityNode> assignableNodeMap) { + CapacityNode node = assignableNodeMap.get(nodeId); + // Return valid when following conditions match: + // 1. Node is in assignableNodeMap + // 2. 
Node hold current partition or we can assign current partition to the node + return node != null && (node.hasPartition(_resourceName, partition) || node.canAdd( + _resourceName, partition)); + } +} diff --git a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java index edb7a76c6..ab2c40b79 100644 --- a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java +++ b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java @@ -71,7 +71,7 @@ public class ClusterConfig extends HelixProperty { // The following concerns maintenance mode MAX_PARTITIONS_PER_INSTANCE, // The maximum number of partitions that an instance can serve in this cluster. - // This only works for GreedyRebalanceStrategy. + // This only works for StickyRebalanceStrategy. // TODO: if we want to support this for other rebalancers, we need to implement that logic GLOBAL_MAX_PARTITIONS_ALLOWED_PER_INSTANCE, // The following two include offline AND disabled instances diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestGreedyRebalanceStrategy.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestGreedyRebalanceStrategy.java deleted file mode 100644 index d90e16136..000000000 --- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestGreedyRebalanceStrategy.java +++ /dev/null @@ -1,85 +0,0 @@ -package org.apache.helix.controller.rebalancer; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashSet; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Set; -import java.util.stream.Collectors; - -import org.apache.helix.controller.common.CapacityNode; -import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider; -import org.apache.helix.controller.rebalancer.strategy.GreedyRebalanceStrategy; -import org.apache.helix.model.ClusterConfig; -import org.apache.helix.zookeeper.datamodel.ZNRecord; -import org.mockito.Mockito; -import org.testng.Assert; -import org.testng.annotations.Test; - -import static org.mockito.Mockito.when; - -public class TestGreedyRebalanceStrategy { - private static final String TEST_CLUSTER_NAME = "TestCluster"; - private static final String TEST_RESOURCE_PREFIX = "TestResource_"; - - @Test - public void testAssignmentWithGlobalPartitionLimit() { - - ResourceControllerDataProvider clusterDataCache = - Mockito.mock(ResourceControllerDataProvider.class); - LinkedHashMap<String, Integer> states = new LinkedHashMap<String, Integer>(2); - states.put("OFFLINE", 0); - states.put("ONLINE", 1); - - Set<CapacityNode> capacityNodeSet = new HashSet<>(); - for (int i = 0; i < 5; i++) { - CapacityNode capacityNode = new CapacityNode("Node-" + i); - capacityNode.setCapacity(1); - capacityNodeSet.add(capacityNode); - } - - List<String> liveNodes = - capacityNodeSet.stream().map(CapacityNode::getId).collect(Collectors.toList()); - - List<String> partitions = new ArrayList<>(); - for (int i = 0; i < 3; i++) { - 
partitions.add(TEST_RESOURCE_PREFIX + "0_" + i); - } - when(clusterDataCache.getSimpleCapacitySet()).thenReturn(capacityNodeSet); - - GreedyRebalanceStrategy greedyRebalanceStrategy = new GreedyRebalanceStrategy(); - greedyRebalanceStrategy.init(TEST_RESOURCE_PREFIX + 0, partitions, states, 1); - greedyRebalanceStrategy.computePartitionAssignment(null, liveNodes, null, clusterDataCache); - - partitions = new ArrayList<>(); - for (int i = 0; i < 2; i++) { - partitions.add(TEST_RESOURCE_PREFIX + "1_" + i); - } - greedyRebalanceStrategy = new GreedyRebalanceStrategy(); - greedyRebalanceStrategy.init(TEST_RESOURCE_PREFIX + 1, partitions, states, 1); - greedyRebalanceStrategy.computePartitionAssignment(null, liveNodes, null, clusterDataCache); - - Assert.assertEquals( - capacityNodeSet.stream().filter(node -> node.getCurrentlyAssigned() != 1).count(), 0); - } -} diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestStickyRebalanceStrategy.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestStickyRebalanceStrategy.java new file mode 100644 index 000000000..45211df4e --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestStickyRebalanceStrategy.java @@ -0,0 +1,214 @@ +package org.apache.helix.controller.rebalancer; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import org.apache.helix.controller.common.CapacityNode; +import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider; +import org.apache.helix.controller.rebalancer.strategy.StickyRebalanceStrategy; +import org.apache.helix.zookeeper.datamodel.ZNRecord; +import org.mockito.Mockito; +import org.testng.Assert; +import org.testng.annotations.Test; + +import static org.mockito.Mockito.when; + +public class TestStickyRebalanceStrategy { + private static final String TEST_CLUSTER_NAME = "TestCluster"; + private static final String TEST_RESOURCE_PREFIX = "TestResource_"; + + @Test + public void testAssignmentWithGlobalPartitionLimit() { + + ResourceControllerDataProvider clusterDataCache = + Mockito.mock(ResourceControllerDataProvider.class); + LinkedHashMap<String, Integer> states = new LinkedHashMap<String, Integer>(2); + states.put("OFFLINE", 0); + states.put("ONLINE", 1); + + Set<CapacityNode> capacityNodeSet = new HashSet<>(); + for (int i = 0; i < 5; i++) { + CapacityNode capacityNode = new CapacityNode("Node-" + i); + capacityNode.setCapacity(1); + capacityNodeSet.add(capacityNode); + } + + List<String> liveNodes = + capacityNodeSet.stream().map(CapacityNode::getId).collect(Collectors.toList()); + + List<String> partitions = new ArrayList<>(); + for (int i = 0; i < 3; i++) { + 
partitions.add(TEST_RESOURCE_PREFIX + "0_" + i); + } + when(clusterDataCache.getSimpleCapacitySet()).thenReturn(capacityNodeSet); + + StickyRebalanceStrategy greedyRebalanceStrategy = new StickyRebalanceStrategy(); + greedyRebalanceStrategy.init(TEST_RESOURCE_PREFIX + 0, partitions, states, 1); + greedyRebalanceStrategy.computePartitionAssignment(null, liveNodes, null, clusterDataCache); + + partitions = new ArrayList<>(); + for (int i = 0; i < 2; i++) { + partitions.add(TEST_RESOURCE_PREFIX + "1_" + i); + } + greedyRebalanceStrategy = new StickyRebalanceStrategy(); + greedyRebalanceStrategy.init(TEST_RESOURCE_PREFIX + 1, partitions, states, 1); + greedyRebalanceStrategy.computePartitionAssignment(null, liveNodes, null, clusterDataCache); + + Assert.assertEquals( + capacityNodeSet.stream().filter(node -> node.getCurrentlyAssigned() != 1).count(), 0); + } + + @Test + public void testStickyAssignment() { + final int nReplicas = 4; + final int nPartitions = 4; + final int nNode = 16; + + ResourceControllerDataProvider clusterDataCache = + Mockito.mock(ResourceControllerDataProvider.class); + LinkedHashMap<String, Integer> states = new LinkedHashMap<String, Integer>(2); + states.put("OFFLINE", 0); + states.put("ONLINE", nReplicas); + + Set<CapacityNode> capacityNodeSet = new HashSet<>(); + for (int i = 0; i < nNode; i++) { + CapacityNode capacityNode = new CapacityNode("Node-" + i); + capacityNode.setCapacity(1); + capacityNodeSet.add(capacityNode); + } + + List<String> liveNodes = + capacityNodeSet.stream().map(CapacityNode::getId).collect(Collectors.toList()); + + List<String> partitions = new ArrayList<>(); + for (int i = 0; i < nPartitions; i++) { + partitions.add(TEST_RESOURCE_PREFIX + i); + } + when(clusterDataCache.getSimpleCapacitySet()).thenReturn(capacityNodeSet); + + // Populate previous assignment with currentMapping + Map<String, Map<String, String>> currentMapping = new HashMap<>(); + currentMapping.put(TEST_RESOURCE_PREFIX + "0", new HashMap<>()); + 
currentMapping.get(TEST_RESOURCE_PREFIX + "0").put("Node-0", "ONLINE"); + currentMapping.get(TEST_RESOURCE_PREFIX + "0").put("Node-2", "ONLINE"); + currentMapping.get(TEST_RESOURCE_PREFIX + "0").put("Node-4", "ONLINE"); + currentMapping.get(TEST_RESOURCE_PREFIX + "0").put("Node-6", "ONLINE"); + currentMapping.put(TEST_RESOURCE_PREFIX + "2", new HashMap<>()); + currentMapping.get(TEST_RESOURCE_PREFIX + "2").put("Node-1", "ONLINE"); + currentMapping.get(TEST_RESOURCE_PREFIX + "2").put("Node-5", "ONLINE"); + currentMapping.get(TEST_RESOURCE_PREFIX + "2").put("Node-8", "ONLINE"); + + StickyRebalanceStrategy greedyRebalanceStrategy = new StickyRebalanceStrategy(); + greedyRebalanceStrategy.init(TEST_RESOURCE_PREFIX + 0, partitions, states, 1); + ZNRecord shardAssignment = + greedyRebalanceStrategy.computePartitionAssignment(null, liveNodes, currentMapping, + clusterDataCache); + + // Assert the existing assignment won't be changed + Assert.assertEquals(currentMapping.get(TEST_RESOURCE_PREFIX + "0").keySet(), + new HashSet<>(shardAssignment.getListField(TEST_RESOURCE_PREFIX + "0"))); + Assert.assertTrue(shardAssignment.getListField(TEST_RESOURCE_PREFIX + "2") + .containsAll(currentMapping.get(TEST_RESOURCE_PREFIX + "2").keySet())); + } + + @Test + public void testStickyAssignmentMultipleTimes() { + final int nReplicas = 4; + final int nPartitions = 4; + final int nNode = 12; + + ResourceControllerDataProvider clusterDataCache = + Mockito.mock(ResourceControllerDataProvider.class); + LinkedHashMap<String, Integer> states = new LinkedHashMap<String, Integer>(2); + states.put("OFFLINE", 0); + states.put("ONLINE", nReplicas); + + Set<CapacityNode> capacityNodeSet = new HashSet<>(); + for (int i = 0; i < nNode; i++) { + CapacityNode capacityNode = new CapacityNode("Node-" + i); + capacityNode.setCapacity(1); + capacityNodeSet.add(capacityNode); + } + + List<String> liveNodes = + capacityNodeSet.stream().map(CapacityNode::getId).collect(Collectors.toList()); + + List<String> 
partitions = new ArrayList<>(); + for (int i = 0; i < nPartitions; i++) { + partitions.add(TEST_RESOURCE_PREFIX + i); + } + when(clusterDataCache.getSimpleCapacitySet()).thenReturn(capacityNodeSet); + + StickyRebalanceStrategy greedyRebalanceStrategy = new StickyRebalanceStrategy(); + greedyRebalanceStrategy.init(TEST_RESOURCE_PREFIX + 0, partitions, states, 1); + // First round assignment computation: + // 1. Without previous assignment (currentMapping is null) + // 2. Without enough assignable nodes (12 nodes with capacity 1 for 4 partitions x 4 replicas) + ZNRecord firstRoundShardAssignment = + greedyRebalanceStrategy.computePartitionAssignment(null, liveNodes, null, clusterDataCache); + + // Assert only 3 partitions are fully assigned, i.e. have exactly nReplicas nodes each + Assert.assertEquals(firstRoundShardAssignment.getListFields().entrySet().stream() + .filter(e -> e.getValue().size() == nReplicas).count(), 3); + + // Add 4 more nodes, which are used in the second round assignment computation + for (int i = nNode; i < nNode + 4; i++) { + CapacityNode capacityNode = new CapacityNode("Node-" + i); + capacityNode.setCapacity(1); + capacityNodeSet.add(capacityNode); + } + + liveNodes = capacityNodeSet.stream().map(CapacityNode::getId).collect(Collectors.toList()); + + // Populate previous assignment (currentMapping) with the fully assigned partitions of the first round result + Map<String, Map<String, String>> currentMapping = new HashMap<>(); + firstRoundShardAssignment.getListFields().entrySet().stream() + .filter(e -> e.getValue().size() == nReplicas).forEach(e -> { + currentMapping.put(e.getKey(), new HashMap<>()); + for (String nodeId : e.getValue()) { + currentMapping.get(e.getKey()).put(nodeId, "ONLINE"); + } + }); + + // Second round assignment computation: + // 1. With previous assignment (currentMapping) + // 2. 
With enough assignable nodes + ZNRecord secondRoundShardAssignment = + greedyRebalanceStrategy.computePartitionAssignment(null, liveNodes, currentMapping, + clusterDataCache); + + // Assert all partitions have been assigned enough replicas (nReplicas each) + Assert.assertEquals(secondRoundShardAssignment.getListFields().entrySet().stream() + .filter(e -> e.getValue().size() == nReplicas).count(), nPartitions); + // For each previously existing assignment, assert the assigned node set is unchanged + currentMapping.forEach((partition, nodeMapping) -> { + Assert.assertEquals(nodeMapping.keySet(), + new HashSet<>(secondRoundShardAssignment.getListField(partition))); + }); + } +} diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestGreedyRebalanceWithGlobalPerInstancePartitionLimit.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestStickyRebalanceWithGlobalPerInstancePartitionLimit.java similarity index 77% rename from helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestGreedyRebalanceWithGlobalPerInstancePartitionLimit.java rename to helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestStickyRebalanceWithGlobalPerInstancePartitionLimit.java index 6b675a8a3..e0d160b78 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestGreedyRebalanceWithGlobalPerInstancePartitionLimit.java +++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestStickyRebalanceWithGlobalPerInstancePartitionLimit.java @@ -1,5 +1,24 @@ package org.apache.helix.integration.rebalancer; +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + import java.util.Date; import java.util.HashMap; import java.util.Map; @@ -15,7 +34,7 @@ import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; -public class TestGreedyRebalanceWithGlobalPerInstancePartitionLimit extends TaskTestBase { +public class TestStickyRebalanceWithGlobalPerInstancePartitionLimit extends TaskTestBase { @BeforeClass public void beforeClass() throws Exception { @@ -48,14 +67,14 @@ public class TestGreedyRebalanceWithGlobalPerInstancePartitionLimit extends Task IdealState idealState = _gSetupTool.getClusterManagementTool() .getResourceIdealState(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB); idealState.setRebalanceStrategy( - "org.apache.helix.controller.rebalancer.strategy.GreedyRebalanceStrategy"); + "org.apache.helix.controller.rebalancer.strategy.StickyRebalanceStrategy"); _gSetupTool.getClusterManagementTool() .setResourceIdealState(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB, idealState); Assert.assertTrue(_clusterVerifier.verifyByPolling()); _gSetupTool.getClusterManagementTool().addResource(CLUSTER_NAME, "NewDB", 2, "OnlineOffline", IdealState.RebalanceMode.FULL_AUTO.name(), - "org.apache.helix.controller.rebalancer.strategy.GreedyRebalanceStrategy"); + "org.apache.helix.controller.rebalancer.strategy.StickyRebalanceStrategy"); _gSetupTool.getClusterManagementTool().rebalance(CLUSTER_NAME, "NewDB", 1); Assert.assertTrue(_clusterVerifier.verifyByPolling());