This is an automated email from the ASF dual-hosted git repository.

xyuanlu pushed a commit to branch helix-stickiness-rebalancer
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 46962336ae740932d06865041d72ddcc6e10778b
Author: frankmu <muteng...@gmail.com>
AuthorDate: Fri Aug 9 12:13:41 2024 -0700

    Helix stickiness rebalancer (#2878)
    
    Create sticky assignment rebalance strategy
---
 .../helix/controller/common/CapacityNode.java      |  14 ++
 .../ResourceControllerDataProvider.java            |   8 +-
 .../rebalancer/ConditionBasedRebalancer.java       |  18 +-
 .../strategy/GreedyRebalanceStrategy.java          | 103 ----------
 .../strategy/StickyRebalanceStrategy.java          | 202 +++++++++++++++++++
 .../java/org/apache/helix/model/ClusterConfig.java |   2 +-
 .../rebalancer/TestGreedyRebalanceStrategy.java    |  85 --------
 .../rebalancer/TestStickyRebalanceStrategy.java    | 214 +++++++++++++++++++++
 ...alanceWithGlobalPerInstancePartitionLimit.java} |  25 ++-
 9 files changed, 467 insertions(+), 204 deletions(-)

diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/common/CapacityNode.java 
b/helix-core/src/main/java/org/apache/helix/controller/common/CapacityNode.java
index fa5068e13..208ee7913 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/common/CapacityNode.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/common/CapacityNode.java
@@ -52,11 +52,25 @@ public class CapacityNode {
         && _partitionMap.get(resource).contains(partition))) {
       return false;
     }
+
+    // Add the partition to the resource's set of partitions in this node
     _partitionMap.computeIfAbsent(resource, k -> new 
HashSet<>()).add(partition);
     _currentlyAssigned++;
     return true;
   }
 
+  /**
+   * Tells whether this node already holds the given partition of the given resource.
+   *
+   * <p>This is a read-only lookup: unlike {@link #canAdd}, it never records anything on the node.
+   *
+   * @param resource  name of the resource
+   * @param partition name of the partition within {@code resource}
+   * @return {@code true} when the partition of {@code resource} is recorded on this node,
+   *         {@code false} otherwise (including when nothing of {@code resource} is recorded)
+   */
+  public boolean hasPartition(String resource, String partition) {
+    Set<String> assignedPartitions = _partitionMap.get(resource);
+    if (assignedPartitions == null) {
+      return false;
+    }
+    return assignedPartitions.contains(partition);
+  }
+
   /**
    * Set the capacity of this node
    * @param capacity  The capacity to set
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ResourceControllerDataProvider.java
 
b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ResourceControllerDataProvider.java
index cdfcb0f24..74b5aa8b2 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ResourceControllerDataProvider.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ResourceControllerDataProvider.java
@@ -41,7 +41,7 @@ import org.apache.helix.constants.InstanceConstants;
 import org.apache.helix.controller.LogUtil;
 import org.apache.helix.controller.common.CapacityNode;
 import org.apache.helix.controller.pipeline.Pipeline;
-import org.apache.helix.controller.rebalancer.strategy.GreedyRebalanceStrategy;
+import org.apache.helix.controller.rebalancer.strategy.StickyRebalanceStrategy;
 import org.apache.helix.controller.rebalancer.waged.WagedInstanceCapacity;
 import 
org.apache.helix.controller.rebalancer.waged.WagedResourceWeightsProvider;
 import org.apache.helix.controller.stages.MissingTopStateRecord;
@@ -191,11 +191,11 @@ public class ResourceControllerDataProvider extends 
BaseControllerDataProvider {
       // Remove all cached IdealState because it is a global computation 
cannot partially be
       // performed for some resources. The computation is simple as well not 
taking too much resource
       // to recompute the assignments.
-      Set<String> cachedGreedyIdealStates = 
_idealMappingCache.values().stream().filter(
+      Set<String> cachedStickyIdealStates = 
_idealMappingCache.values().stream().filter(
               record -> 
record.getSimpleField(IdealState.IdealStateProperty.REBALANCE_STRATEGY.name())
-                  
.equals(GreedyRebalanceStrategy.class.getName())).map(ZNRecord::getId)
+                  
.equals(StickyRebalanceStrategy.class.getName())).map(ZNRecord::getId)
           .collect(Collectors.toSet());
-      _idealMappingCache.keySet().removeAll(cachedGreedyIdealStates);
+      _idealMappingCache.keySet().removeAll(cachedStickyIdealStates);
     }
 
     LogUtil.logInfo(logger, getClusterEventId(), String.format(
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/ConditionBasedRebalancer.java
 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/ConditionBasedRebalancer.java
index 158699a97..84762ea51 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/ConditionBasedRebalancer.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/ConditionBasedRebalancer.java
@@ -20,12 +20,12 @@ package org.apache.helix.controller.rebalancer;
  */
 
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 import org.apache.helix.HelixException;
 import 
org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
@@ -98,8 +98,8 @@ public class ConditionBasedRebalancer extends 
AbstractRebalancer<ResourceControl
 
     LinkedHashMap<String, Integer> stateCountMap =
         stateModelDef.getStateCountMap(assignableLiveInstance.size(), 
replicas);
-    List<String> assignableLiveNodes = new 
ArrayList<>(assignableLiveInstance.keySet());
-    List<String> assignableNodes = new 
ArrayList<>(clusterData.getAssignableInstances());
+    Set<String> assignableLiveNodes = new 
HashSet<>(assignableLiveInstance.keySet());
+    Set<String> assignableNodes = new 
HashSet<>(clusterData.getAssignableInstances());
     assignableNodes.removeAll(clusterData.getDisabledInstances());
     assignableLiveNodes.retainAll(assignableNodes);
 
@@ -135,20 +135,22 @@ public class ConditionBasedRebalancer extends 
AbstractRebalancer<ResourceControl
         LOG.warn("Resource " + resourceName + " has tag " + 
currentIdealState.getInstanceGroupTag()
             + " but no live participants have this tag");
       }
-      assignableNodes = new ArrayList<>(taggedNodes);
-      assignableLiveNodes = new ArrayList<>(taggedLiveNodes);
+      assignableNodes = new HashSet<>(taggedNodes);
+      assignableLiveNodes = new HashSet<>(taggedLiveNodes);
     }
 
     // sort node lists to ensure consistent preferred assignments
-    Collections.sort(assignableNodes);
-    Collections.sort(assignableLiveNodes);
+    List<String> assignableNodesList =
+        assignableNodes.stream().sorted().collect(Collectors.toList());
+    List<String> assignableLiveNodesList =
+        assignableLiveNodes.stream().sorted().collect(Collectors.toList());
 
     int maxPartition = currentIdealState.getMaxPartitionsPerInstance();
     _rebalanceStrategy =
         getRebalanceStrategy(currentIdealState.getRebalanceStrategy(), 
partitions, resourceName,
             stateCountMap, maxPartition);
     ZNRecord newMapping =
-        _rebalanceStrategy.computePartitionAssignment(assignableNodes, 
assignableLiveNodes,
+        _rebalanceStrategy.computePartitionAssignment(assignableNodesList, 
assignableLiveNodesList,
             currentMapping, clusterData);
 
     if (LOG.isDebugEnabled()) {
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/GreedyRebalanceStrategy.java
 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/GreedyRebalanceStrategy.java
deleted file mode 100644
index 60580c40a..000000000
--- 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/GreedyRebalanceStrategy.java
+++ /dev/null
@@ -1,103 +0,0 @@
-package org.apache.helix.controller.rebalancer.strategy;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.helix.controller.common.CapacityNode;
-import org.apache.helix.zookeeper.datamodel.ZNRecord;
-import 
org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class GreedyRebalanceStrategy implements 
RebalanceStrategy<ResourceControllerDataProvider> {
-  private static Logger logger = 
LoggerFactory.getLogger(GreedyRebalanceStrategy.class);
-  private String _resourceName;
-  private List<String> _partitions;
-  private LinkedHashMap<String, Integer> _states;
-
-  public GreedyRebalanceStrategy() {
-  }
-
-  @Override
-  public void init(String resourceName, final List<String> partitions,
-      final LinkedHashMap<String, Integer> states, int maximumPerNode) {
-    _resourceName = resourceName;
-    _partitions = partitions;
-    _states = states;
-  }
-
-  @Override
-  public ZNRecord computePartitionAssignment(final List<String> allNodes, 
final List<String> liveNodes,
-      final Map<String, Map<String, String>> currentMapping, 
ResourceControllerDataProvider clusterData) {
-    int numReplicas = countStateReplicas();
-    ZNRecord znRecord = new ZNRecord(_resourceName);
-    if (liveNodes.size() == 0) {
-      return znRecord;
-    }
-
-    if (clusterData.getSimpleCapacitySet() == null) {
-      logger.warn("No capacity set for resource: " + _resourceName);
-      return znRecord;
-    }
-
-    // Sort the assignable nodes by id
-    List<CapacityNode> assignableNodes = new 
ArrayList<>(clusterData.getSimpleCapacitySet());
-    Collections.sort(assignableNodes, 
Comparator.comparing(CapacityNode::getId));
-
-    // Assign partitions to node by order.
-    for (int i = 0, index = 0; i < _partitions.size(); i++) {
-      int startIndex = index;
-      List<String> preferenceList = new ArrayList<>();
-      for (int j = 0; j < numReplicas; j++) {
-        while (index - startIndex < assignableNodes.size()) {
-          CapacityNode node = assignableNodes.get(index++ % 
assignableNodes.size());
-          if (node.canAdd(_resourceName, _partitions.get(i))) {
-            preferenceList.add(node.getId());
-            break;
-          }
-        }
-
-        if (index - startIndex >= assignableNodes.size()) {
-          // If the all nodes have been tried out, then no node can be 
assigned.
-          logger.warn("No enough assignable nodes for resource: " + 
_resourceName);
-        }
-      }
-      znRecord.setListField(_partitions.get(i), preferenceList);
-    }
-
-    return znRecord;
-  }
-
-  private int countStateReplicas() {
-    int total = 0;
-    for (Integer count : _states.values()) {
-      total += count;
-    }
-    return total;
-  }
-}
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/StickyRebalanceStrategy.java
 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/StickyRebalanceStrategy.java
new file mode 100644
index 000000000..7def42d08
--- /dev/null
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/StickyRebalanceStrategy.java
@@ -0,0 +1,202 @@
+package org.apache.helix.controller.rebalancer.strategy;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.helix.controller.common.CapacityNode;
+import 
org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A rebalance strategy that keeps replicas on the nodes that currently host them whenever
+ * possible. Any missing replicas are placed round-robin over the live nodes, subject to the
+ * per-node limits enforced by {@link CapacityNode#canAdd}.
+ */
+public class StickyRebalanceStrategy implements RebalanceStrategy<ResourceControllerDataProvider> {
+  private static final Logger logger = LoggerFactory.getLogger(StickyRebalanceStrategy.class);
+  private String _resourceName;
+  private List<String> _partitions;
+  private LinkedHashMap<String, Integer> _states;
+
+  public StickyRebalanceStrategy() {
+  }
+
+  /**
+   * Records the resource, its partitions and the per-state replica counts used by the next
+   * {@link #computePartitionAssignment} call.
+   *
+   * @param resourceName   the resource being assigned
+   * @param partitions     the partitions of the resource, in assignment order
+   * @param states         replica count per state, in the order supplied by the caller;
+   *                       {@code null} is treated as an empty map
+   * @param maximumPerNode not used by this strategy; per-node limits are enforced through
+   *                       {@link CapacityNode#canAdd}
+   */
+  @Override
+  public void init(String resourceName, final List<String> partitions,
+      final LinkedHashMap<String, Integer> states, int maximumPerNode) {
+    _resourceName = resourceName;
+    _partitions = partitions;
+    // Normalize null so computePartitionAssignment never dereferences a null map
+    _states = states != null ? states : new LinkedHashMap<>();
+  }
+
+  /**
+   * Computes a sticky preference list for every partition of the resource.
+   *
+   * <p>Replicas in {@code currentMapping} are kept when their node is still live and assignable
+   * and their state still has a free slot in {@code _states}. Only the missing replicas are
+   * placed, round-robin over the live capacity nodes sorted by id.
+   *
+   * @param allNodes       not used by this strategy
+   * @param liveNodes      the nodes eligible for assignment (the caller may already have
+   *                       filtered them, e.g. by instance group tag)
+   * @param currentMapping partition -> (node -> state) of the current placement; may be null
+   * @param clusterData    provides the {@link CapacityNode} set; nodes in it are updated as
+   *                       replicas are kept or placed
+   * @return a record with one list field (the preference list) per partition of the resource
+   */
+  @Override
+  public ZNRecord computePartitionAssignment(final List<String> allNodes,
+      final List<String> liveNodes, final Map<String, Map<String, String>> currentMapping,
+      ResourceControllerDataProvider clusterData) {
+    ZNRecord znRecord = new ZNRecord(_resourceName);
+    if (liveNodes.isEmpty()) {
+      return znRecord;
+    }
+
+    if (clusterData.getSimpleCapacitySet() == null) {
+      logger.warn("No capacity set for resource: {}", _resourceName);
+      return znRecord;
+    }
+
+    // Sort the assignable nodes by id so the round-robin placement below is deterministic
+    List<CapacityNode> assignableNodes = new ArrayList<>(clusterData.getSimpleCapacitySet());
+    assignableNodes.sort(Comparator.comparing(CapacityNode::getId));
+
+    // Filter out the nodes that are not in the liveNodes parameter.
+    // Note the liveNodes parameter might already have been processed by the rebalancer,
+    // e.g. filtered based on tags.
+    Set<String> liveNodesSet = new HashSet<>(liveNodes);
+    assignableNodes.removeIf(n -> !liveNodesSet.contains(n.getId()));
+
+    // Populate the valid state map given the current mapping
+    Map<String, Map<String, String>> stateMap =
+        populateValidStateMapFromCurrentMapping(currentMapping, assignableNodes);
+
+    if (logger.isDebugEnabled()) {
+      logger.debug("currentMapping: {}", currentMapping);
+      logger.debug("stateMap: {}", stateMap);
+    }
+
+    // Place the missing replicas round-robin. The cursor is shared across partitions so the load
+    // is spread, but each partition tries every node at most once: canAdd rejects a node that
+    // already holds the partition, so a second pass could never succeed.
+    int index = 0;
+    for (String partition : _partitions) {
+      Map<String, String> nodeStateMap = stateMap.computeIfAbsent(partition, p -> new HashMap<>());
+      int startIndex = index;
+      stateLoop:
+      for (Map.Entry<String, Integer> stateCount : _states.entrySet()) {
+        String state = stateCount.getKey();
+        long existingReplicas = nodeStateMap.values().stream().filter(state::equals).count();
+        for (long j = existingReplicas; j < stateCount.getValue(); j++) {
+          boolean assigned = false;
+          while (!assigned && index - startIndex < assignableNodes.size()) {
+            CapacityNode node = assignableNodes.get(index++ % assignableNodes.size());
+            if (node.canAdd(_resourceName, partition)) {
+              nodeStateMap.put(node.getId(), state);
+              assigned = true;
+            }
+          }
+          if (!assigned) {
+            // Every node has been tried for this partition; no further replica can be placed.
+            logger.warn("No enough assignable nodes for resource: {}", _resourceName);
+            break stateLoop;
+          }
+        }
+      }
+      znRecord.setListField(partition, buildPreferenceList(nodeStateMap));
+    }
+
+    if (logger.isDebugEnabled()) {
+      logger.debug("znRecord: {}", znRecord);
+    }
+
+    return znRecord;
+  }
+
+  /**
+   * Builds a partition's preference list from its node -> state map.
+   *
+   * <p>Nodes are ordered by the position of their state in {@code _states}, then by node id.
+   * Order is made explicit instead of relying on hash iteration order because Helix treats
+   * earlier preference-list entries as preferred for higher-priority states. This keeps, for
+   * example, a replica holding the first-listed state at the head of the list across rebalances.
+   *
+   * @param nodeStateMap node -> state for one partition; every state is expected to be a key of
+   *                     {@code _states}
+   * @return a new, mutable preference list
+   */
+  private List<String> buildPreferenceList(final Map<String, String> nodeStateMap) {
+    List<String> stateOrder = new ArrayList<>(_states.keySet());
+    List<String> preferenceList = new ArrayList<>(nodeStateMap.keySet());
+    preferenceList.sort(Comparator
+        .<String>comparingInt(node -> stateOrder.indexOf(nodeStateMap.get(node)))
+        .thenComparing(Comparator.<String>naturalOrder()));
+    return preferenceList;
+  }
+
+  /**
+   * Populates a valid state map from the current mapping.
+   *
+   * <p>Only partitions in {@code _partitions} are considered. Within a partition, a replica is
+   * kept only if both of these hold:
+   * <ul>
+   *   <li>its state is defined in {@code _states} and still has a free slot;</li>
+   *   <li>its node is a valid assignment target (see {@link #isValidNodeAssignment}).</li>
+   * </ul>
+   * Invalid replicas are dropped one by one, so a single replica in an unknown or over-counted
+   * state does not discard the sticky placement of the partition's other replicas. Nodes are
+   * examined in id order, so the choice of which replicas to keep for an over-subscribed state
+   * is deterministic.
+   *
+   * @param currentMapping  the current mapping of partitions to node states; may be null
+   * @param assignableNodes the live nodes that can be assigned
+   * @return a mutable map of partition -> (node -> state) holding only the kept replicas
+   */
+  private Map<String, Map<String, String>> populateValidStateMapFromCurrentMapping(
+      final Map<String, Map<String, String>> currentMapping,
+      final List<CapacityNode> assignableNodes) {
+    Map<String, Map<String, String>> validStateMap = new HashMap<>();
+    if (currentMapping == null) {
+      return validStateMap;
+    }
+    // Convert the assignableNodes to a map for quick lookup; tolerate duplicate ids by keeping
+    // the first node in the (id-sorted) list
+    Map<String, CapacityNode> assignableNodeMap = assignableNodes.stream()
+        .collect(Collectors.toMap(CapacityNode::getId, node -> node, (first, second) -> first));
+    Set<String> partitionSet = new HashSet<>(_partitions);
+    for (Map.Entry<String, Map<String, String>> entry : currentMapping.entrySet()) {
+      String partition = entry.getKey();
+      Map<String, String> currentNodeStateMap = entry.getValue();
+      // Skip partitions that are not part of this resource's assignment or have no placement
+      if (!partitionSet.contains(partition) || currentNodeStateMap == null) {
+        continue;
+      }
+      Map<String, Integer> remainingSlots = new HashMap<>(_states);
+      Map<String, String> validNodeStateMap = new HashMap<>();
+      List<String> nodeIds = new ArrayList<>(currentNodeStateMap.keySet());
+      nodeIds.sort(Comparator.<String>naturalOrder());
+      for (String nodeId : nodeIds) {
+        String state = currentNodeStateMap.get(nodeId);
+        int freeSlots = remainingSlots.getOrDefault(state, 0);
+        // Check the state slot first: isValidNodeAssignment may reserve capacity on the node,
+        // which must not happen for a replica that is going to be dropped.
+        if (freeSlots > 0 && isValidNodeAssignment(partition, nodeId, assignableNodeMap)) {
+          remainingSlots.put(state, freeSlots - 1);
+          validNodeStateMap.put(nodeId, state);
+        }
+      }
+      validStateMap.put(partition, validNodeStateMap);
+    }
+    return validStateMap;
+  }
+
+  /**
+   * Checks if a node is a valid target for a given partition of this resource.
+   *
+   * <p>Note: when the node does not already hold the partition, this delegates to
+   * {@link CapacityNode#canAdd}, which records the partition on the node when it succeeds. A
+   * {@code true} result therefore also reserves capacity on the node.
+   *
+   * @param partition         the partition to be assigned
+   * @param nodeId            the ID of the node to be checked
+   * @param assignableNodeMap the map of node IDs to CapacityNode objects
+   * @return true if the node is assignable and holds (or could accept) the partition
+   */
+  private boolean isValidNodeAssignment(final String partition, final String nodeId,
+      final Map<String, CapacityNode> assignableNodeMap) {
+    CapacityNode node = assignableNodeMap.get(nodeId);
+    // Return valid when the following conditions match:
+    // 1. Node is in assignableNodeMap
+    // 2. Node holds the current partition or the current partition can be added to the node
+    return node != null
+        && (node.hasPartition(_resourceName, partition) || node.canAdd(_resourceName, partition));
+  }
+}
diff --git a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java 
b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
index edb7a76c6..ab2c40b79 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
@@ -71,7 +71,7 @@ public class ClusterConfig extends HelixProperty {
     // The following concerns maintenance mode
     MAX_PARTITIONS_PER_INSTANCE,
     // The maximum number of partitions that an instance can serve in this 
cluster.
-    // This only works for GreedyRebalanceStrategy.
+    // This only works for StickyRebalanceStrategy.
     // TODO: if we want to support this for other rebalancers, we need to 
implement that logic
     GLOBAL_MAX_PARTITIONS_ALLOWED_PER_INSTANCE,
     // The following two include offline AND disabled instances
diff --git 
a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestGreedyRebalanceStrategy.java
 
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestGreedyRebalanceStrategy.java
deleted file mode 100644
index d90e16136..000000000
--- 
a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestGreedyRebalanceStrategy.java
+++ /dev/null
@@ -1,85 +0,0 @@
-package org.apache.helix.controller.rebalancer;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Set;
-import java.util.stream.Collectors;
-
-import org.apache.helix.controller.common.CapacityNode;
-import 
org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
-import org.apache.helix.controller.rebalancer.strategy.GreedyRebalanceStrategy;
-import org.apache.helix.model.ClusterConfig;
-import org.apache.helix.zookeeper.datamodel.ZNRecord;
-import org.mockito.Mockito;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-import static org.mockito.Mockito.when;
-
-public class TestGreedyRebalanceStrategy {
-  private static final String TEST_CLUSTER_NAME = "TestCluster";
-  private static final String TEST_RESOURCE_PREFIX = "TestResource_";
-
-  @Test
-  public void testAssignmentWithGlobalPartitionLimit() {
-
-    ResourceControllerDataProvider clusterDataCache =
-        Mockito.mock(ResourceControllerDataProvider.class);
-    LinkedHashMap<String, Integer> states = new LinkedHashMap<String, 
Integer>(2);
-    states.put("OFFLINE", 0);
-    states.put("ONLINE", 1);
-
-    Set<CapacityNode> capacityNodeSet = new HashSet<>();
-    for (int i = 0; i < 5; i++) {
-      CapacityNode capacityNode = new CapacityNode("Node-" + i);
-      capacityNode.setCapacity(1);
-      capacityNodeSet.add(capacityNode);
-    }
-
-    List<String> liveNodes =
-        
capacityNodeSet.stream().map(CapacityNode::getId).collect(Collectors.toList());
-
-    List<String> partitions = new ArrayList<>();
-    for (int i = 0; i < 3; i++) {
-      partitions.add(TEST_RESOURCE_PREFIX + "0_" + i);
-    }
-    when(clusterDataCache.getSimpleCapacitySet()).thenReturn(capacityNodeSet);
-
-    GreedyRebalanceStrategy greedyRebalanceStrategy = new 
GreedyRebalanceStrategy();
-    greedyRebalanceStrategy.init(TEST_RESOURCE_PREFIX + 0, partitions, states, 
1);
-    greedyRebalanceStrategy.computePartitionAssignment(null, liveNodes, null, 
clusterDataCache);
-
-    partitions = new ArrayList<>();
-    for (int i = 0; i < 2; i++) {
-      partitions.add(TEST_RESOURCE_PREFIX + "1_" + i);
-    }
-    greedyRebalanceStrategy = new GreedyRebalanceStrategy();
-    greedyRebalanceStrategy.init(TEST_RESOURCE_PREFIX + 1, partitions, states, 
1);
-    greedyRebalanceStrategy.computePartitionAssignment(null, liveNodes, null, 
clusterDataCache);
-
-    Assert.assertEquals(
-        capacityNodeSet.stream().filter(node -> node.getCurrentlyAssigned() != 
1).count(), 0);
-  }
-}
diff --git 
a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestStickyRebalanceStrategy.java
 
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestStickyRebalanceStrategy.java
new file mode 100644
index 000000000..45211df4e
--- /dev/null
+++ 
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestStickyRebalanceStrategy.java
@@ -0,0 +1,214 @@
+package org.apache.helix.controller.rebalancer;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.helix.controller.common.CapacityNode;
+import 
org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
+import org.apache.helix.controller.rebalancer.strategy.StickyRebalanceStrategy;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.when;
+
+public class TestStickyRebalanceStrategy {
+  private static final String TEST_CLUSTER_NAME = "TestCluster";
+  private static final String TEST_RESOURCE_PREFIX = "TestResource_";
+
+  @Test
+  public void testAssignmentWithGlobalPartitionLimit() {
+    ResourceControllerDataProvider clusterDataCache =
+        Mockito.mock(ResourceControllerDataProvider.class);
+    LinkedHashMap<String, Integer> states = new LinkedHashMap<>(2);
+    states.put("OFFLINE", 0);
+    states.put("ONLINE", 1);
+
+    // Five nodes, each able to host a single replica across all resources
+    Set<CapacityNode> capacityNodeSet = new HashSet<>();
+    for (int i = 0; i < 5; i++) {
+      CapacityNode capacityNode = new CapacityNode("Node-" + i);
+      capacityNode.setCapacity(1);
+      capacityNodeSet.add(capacityNode);
+    }
+
+    List<String> liveNodes =
+        capacityNodeSet.stream().map(CapacityNode::getId).collect(Collectors.toList());
+
+    List<String> partitions = new ArrayList<>();
+    for (int i = 0; i < 3; i++) {
+      partitions.add(TEST_RESOURCE_PREFIX + "0_" + i);
+    }
+    when(clusterDataCache.getSimpleCapacitySet()).thenReturn(capacityNodeSet);
+
+    StickyRebalanceStrategy stickyRebalanceStrategy = new StickyRebalanceStrategy();
+    stickyRebalanceStrategy.init(TEST_RESOURCE_PREFIX + 0, partitions, states, 1);
+    ZNRecord firstResourceAssignment =
+        stickyRebalanceStrategy.computePartitionAssignment(null, liveNodes, null, clusterDataCache);
+
+    partitions = new ArrayList<>();
+    for (int i = 0; i < 2; i++) {
+      partitions.add(TEST_RESOURCE_PREFIX + "1_" + i);
+    }
+    stickyRebalanceStrategy = new StickyRebalanceStrategy();
+    stickyRebalanceStrategy.init(TEST_RESOURCE_PREFIX + 1, partitions, states, 1);
+    ZNRecord secondResourceAssignment =
+        stickyRebalanceStrategy.computePartitionAssignment(null, liveNodes, null, clusterDataCache);
+
+    // The two resources (3 + 2 single-replica partitions) exactly fill the shared capacity:
+    // every partition gets one node and every node is used once.
+    Assert.assertEquals(firstResourceAssignment.getListFields().size(), 3);
+    Assert.assertEquals(secondResourceAssignment.getListFields().size(), 2);
+    for (List<String> preferenceList : firstResourceAssignment.getListFields().values()) {
+      Assert.assertEquals(preferenceList.size(), 1);
+    }
+    for (List<String> preferenceList : secondResourceAssignment.getListFields().values()) {
+      Assert.assertEquals(preferenceList.size(), 1);
+    }
+    Assert.assertEquals(
+        capacityNodeSet.stream().filter(node -> node.getCurrentlyAssigned() != 1).count(), 0);
+  }
+
+  @Test
+  public void testStickyAssignment() {
+    final int nReplicas = 4;
+    final int nPartitions = 4;
+    final int nNode = 16;
+
+    ResourceControllerDataProvider clusterDataCache =
+        Mockito.mock(ResourceControllerDataProvider.class);
+    LinkedHashMap<String, Integer> states = new LinkedHashMap<>(2);
+    states.put("OFFLINE", 0);
+    states.put("ONLINE", nReplicas);
+
+    Set<CapacityNode> capacityNodeSet = new HashSet<>();
+    for (int i = 0; i < nNode; i++) {
+      CapacityNode capacityNode = new CapacityNode("Node-" + i);
+      capacityNode.setCapacity(1);
+      capacityNodeSet.add(capacityNode);
+    }
+
+    List<String> liveNodes =
+        capacityNodeSet.stream().map(CapacityNode::getId).collect(Collectors.toList());
+
+    List<String> partitions = new ArrayList<>();
+    for (int i = 0; i < nPartitions; i++) {
+      partitions.add(TEST_RESOURCE_PREFIX + i);
+    }
+    when(clusterDataCache.getSimpleCapacitySet()).thenReturn(capacityNodeSet);
+
+    // Populate the previous assignment with currentMapping
+    Map<String, Map<String, String>> currentMapping = new HashMap<>();
+    currentMapping.put(TEST_RESOURCE_PREFIX + "0", new HashMap<>());
+    currentMapping.get(TEST_RESOURCE_PREFIX + "0").put("Node-0", "ONLINE");
+    currentMapping.get(TEST_RESOURCE_PREFIX + "0").put("Node-2", "ONLINE");
+    currentMapping.get(TEST_RESOURCE_PREFIX + "0").put("Node-4", "ONLINE");
+    currentMapping.get(TEST_RESOURCE_PREFIX + "0").put("Node-6", "ONLINE");
+    currentMapping.put(TEST_RESOURCE_PREFIX + "2", new HashMap<>());
+    currentMapping.get(TEST_RESOURCE_PREFIX + "2").put("Node-1", "ONLINE");
+    currentMapping.get(TEST_RESOURCE_PREFIX + "2").put("Node-5", "ONLINE");
+    currentMapping.get(TEST_RESOURCE_PREFIX + "2").put("Node-8", "ONLINE");
+
+    StickyRebalanceStrategy stickyRebalanceStrategy = new StickyRebalanceStrategy();
+    stickyRebalanceStrategy.init(TEST_RESOURCE_PREFIX + 0, partitions, states, 1);
+    ZNRecord shardAssignment =
+        stickyRebalanceStrategy.computePartitionAssignment(null, liveNodes, currentMapping,
+            clusterDataCache);
+
+    // Assert the existing assignment won't be changed
+    Assert.assertEquals(currentMapping.get(TEST_RESOURCE_PREFIX + "0").keySet(),
+        new HashSet<>(shardAssignment.getListField(TEST_RESOURCE_PREFIX + "0")));
+    Assert.assertTrue(shardAssignment.getListField(TEST_RESOURCE_PREFIX + "2")
+        .containsAll(currentMapping.get(TEST_RESOURCE_PREFIX + "2").keySet()));
+
+    // 16 single-slot nodes exactly cover 4 partitions x 4 replicas, so every partition must be
+    // fully replicated and every node used exactly once
+    for (String partition : partitions) {
+      Assert.assertEquals(shardAssignment.getListField(partition).size(), nReplicas);
+    }
+    Assert.assertEquals(
+        capacityNodeSet.stream().filter(node -> node.getCurrentlyAssigned() != 1).count(), 0);
+  }
+
+  /**
+   * Verifies that the sticky strategy keeps existing assignments stable across rebalances.
+   *
+   * <p>The first round runs without a previous assignment and with too few capacity-1 nodes to
+   * fully replicate every partition. After more nodes are added, the second round receives the
+   * fully replicated partitions from the first round as the current mapping, and must keep them
+   * unchanged while fully replicating every partition.
+   */
+  @Test
+  public void testStickyAssignmentMultipleTimes() {
+    final int nReplicas = 4;
+    final int nPartitions = 4;
+    final int nNode = 12;
+    // Capacity-1 nodes added between the first and the second round
+    final int nExtraNodes = 4;
+
+    ResourceControllerDataProvider clusterDataCache =
+        Mockito.mock(ResourceControllerDataProvider.class);
+    LinkedHashMap<String, Integer> states = new LinkedHashMap<>(2);
+    states.put("OFFLINE", 0);
+    states.put("ONLINE", nReplicas);
+
+    Set<CapacityNode> capacityNodeSet = new HashSet<>();
+    for (int i = 0; i < nNode; i++) {
+      CapacityNode capacityNode = new CapacityNode("Node-" + i);
+      capacityNode.setCapacity(1);
+      capacityNodeSet.add(capacityNode);
+    }
+
+    List<String> liveNodes =
+        capacityNodeSet.stream().map(CapacityNode::getId).collect(Collectors.toList());
+
+    List<String> partitions = new ArrayList<>();
+    for (int i = 0; i < nPartitions; i++) {
+      partitions.add(TEST_RESOURCE_PREFIX + i);
+    }
+    // The mock hands out this same set instance, so nodes added to it later are also seen by
+    // the second round.
+    when(clusterDataCache.getSimpleCapacitySet()).thenReturn(capacityNodeSet);
+
+    StickyRebalanceStrategy stickyRebalanceStrategy = new StickyRebalanceStrategy();
+    stickyRebalanceStrategy.init(TEST_RESOURCE_PREFIX + 0, partitions, states, 1);
+    // First round assignment computation:
+    // 1. Without previous assignment (currentMapping is null)
+    // 2. Without enough assignable nodes
+    ZNRecord firstRoundShardAssignment = stickyRebalanceStrategy
+        .computePartitionAssignment(null, liveNodes, null, clusterDataCache);
+
+    // Each node has capacity 1, so only nNode / nReplicas partitions can be fully replicated
+    Assert.assertEquals(firstRoundShardAssignment.getListFields().entrySet().stream()
+        .filter(e -> e.getValue().size() == nReplicas).count(), nNode / nReplicas);
+
+    // Add extra nodes which are used in the second round assignment computation
+    for (int i = nNode; i < nNode + nExtraNodes; i++) {
+      CapacityNode capacityNode = new CapacityNode("Node-" + i);
+      capacityNode.setCapacity(1);
+      capacityNodeSet.add(capacityNode);
+    }
+
+    liveNodes = capacityNodeSet.stream().map(CapacityNode::getId).collect(Collectors.toList());
+
+    // Populate previous assignment (currentMapping) with the fully replicated partitions from
+    // the first round; partitions with fewer than nReplicas nodes are left out.
+    Map<String, Map<String, String>> currentMapping = new HashMap<>();
+    for (Map.Entry<String, List<String>> entry
+        : firstRoundShardAssignment.getListFields().entrySet()) {
+      if (entry.getValue().size() != nReplicas) {
+        continue;
+      }
+      Map<String, String> nodeStates = new HashMap<>();
+      for (String nodeId : entry.getValue()) {
+        nodeStates.put(nodeId, "ONLINE");
+      }
+      currentMapping.put(entry.getKey(), nodeStates);
+    }
+
+    // Second round assignment computation:
+    // 1. With previous assignment (currentMapping)
+    // 2. With enough assignable nodes
+    ZNRecord secondRoundShardAssignment = stickyRebalanceStrategy
+        .computePartitionAssignment(null, liveNodes, currentMapping, clusterDataCache);
+
+    // Assert all partitions have been assigned with enough replicas
+    Assert.assertEquals(secondRoundShardAssignment.getListFields().entrySet().stream()
+        .filter(e -> e.getValue().size() == nReplicas).count(), nPartitions);
+    // For previously existing assignment, assert there is no assignment change
+    currentMapping.forEach((partition, nodeMapping) -> Assert.assertEquals(nodeMapping.keySet(),
+        new HashSet<>(secondRoundShardAssignment.getListField(partition))));
+  }
+}
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestGreedyRebalanceWithGlobalPerInstancePartitionLimit.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestStickyRebalanceWithGlobalPerInstancePartitionLimit.java
similarity index 77%
rename from helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestGreedyRebalanceWithGlobalPerInstancePartitionLimit.java
rename to helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestStickyRebalanceWithGlobalPerInstancePartitionLimit.java
index 6b675a8a3..e0d160b78 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestGreedyRebalanceWithGlobalPerInstancePartitionLimit.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestStickyRebalanceWithGlobalPerInstancePartitionLimit.java
@@ -1,5 +1,24 @@
 package org.apache.helix.integration.rebalancer;
 
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
 import java.util.Date;
 import java.util.HashMap;
 import java.util.Map;
@@ -15,7 +34,7 @@ import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
-public class TestGreedyRebalanceWithGlobalPerInstancePartitionLimit extends 
TaskTestBase {
+public class TestStickyRebalanceWithGlobalPerInstancePartitionLimit extends 
TaskTestBase {
 
   @BeforeClass
   public void beforeClass() throws Exception {
@@ -48,14 +67,14 @@ public class TestGreedyRebalanceWithGlobalPerInstancePartitionLimit extends Task
     IdealState idealState = _gSetupTool.getClusterManagementTool()
         .getResourceIdealState(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB);
     idealState.setRebalanceStrategy(
-        
"org.apache.helix.controller.rebalancer.strategy.GreedyRebalanceStrategy");
+        
"org.apache.helix.controller.rebalancer.strategy.StickyRebalanceStrategy");
     _gSetupTool.getClusterManagementTool()
         .setResourceIdealState(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB, 
idealState);
     Assert.assertTrue(_clusterVerifier.verifyByPolling());
 
     _gSetupTool.getClusterManagementTool().addResource(CLUSTER_NAME, "NewDB", 
2, "OnlineOffline",
         IdealState.RebalanceMode.FULL_AUTO.name(),
-        
"org.apache.helix.controller.rebalancer.strategy.GreedyRebalanceStrategy");
+        
"org.apache.helix.controller.rebalancer.strategy.StickyRebalanceStrategy");
     _gSetupTool.getClusterManagementTool().rebalance(CLUSTER_NAME, "NewDB", 1);
     Assert.assertTrue(_clusterVerifier.verifyByPolling());
 


Reply via email to