This is an automated email from the ASF dual-hosted git repository.

nealsun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new f893311  Remove WAGED sorting for each assignment (#1959)
f893311 is described below

commit f8933114dead95dca554673a0b504848e4148138
Author: xyuanlu <xyua...@gmail.com>
AuthorDate: Mon Feb 14 16:58:05 2022 -0800

    Remove WAGED sorting for each assignment (#1959)
    
    Improve WAGED sorting from n^2 to n*log(n)
---
 .../constraints/ConstraintBasedAlgorithm.java      | 46 +++++-----------------
 1 file changed, 9 insertions(+), 37 deletions(-)

diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithm.java
 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithm.java
index 89730c6..bb5df33 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithm.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithm.java
@@ -75,15 +75,13 @@ class ConstraintBasedAlgorithm implements 
RebalanceAlgorithm {
         computeOverallClusterRemainingCapacity(nodes);
 
     // Create a wrapper for each AssignableReplica.
-    Set<AssignableReplicaWithScore> toBeAssignedReplicas =
-        clusterModel.getAssignableReplicaMap().values().stream()
-            .flatMap(replicas -> replicas.stream())
-            .map(replica -> new AssignableReplicaWithScore(replica, 
clusterModel))
-            .collect(Collectors.toSet());
+    List<AssignableReplicaWithScore> toBeAssignedReplicas =
+        
clusterModel.getAssignableReplicaMap().values().stream().flatMap(Collection::stream).map(
+            replica -> new AssignableReplicaWithScore(replica, clusterModel,
+                
overallClusterRemainingCapacityMap)).sorted().collect(Collectors.toList());
 
-    while (!toBeAssignedReplicas.isEmpty()) {
-      AssignableReplica replica =
-          getNextAssignableReplica(toBeAssignedReplicas, 
overallClusterRemainingCapacityMap);
+    for (AssignableReplicaWithScore replicaWithScore : toBeAssignedReplicas) {
+      AssignableReplica replica = replicaWithScore.getAssignableReplica();
       Optional<AssignableNode> maybeBestNode =
           getNodeWithHighestPoints(replica, nodes, clusterModel.getContext(), 
busyInstances,
               optimalAssignment);
@@ -100,7 +98,6 @@ class ConstraintBasedAlgorithm implements RebalanceAlgorithm 
{
       clusterModel
           .assign(replica.getResourceName(), replica.getPartitionName(), 
replica.getReplicaState(),
               bestNode.getInstanceName());
-      
updateOverallClusterRemainingCapacity(overallClusterRemainingCapacityMap, 
replica);
     }
     optimalAssignment.updateAssignments(clusterModel);
     return optimalAssignment;
@@ -180,18 +177,6 @@ class ConstraintBasedAlgorithm implements 
RebalanceAlgorithm {
     return utilizationMap;
   }
 
-  /**
-   * Update the overallClusterUtilMap with newly placed replica
-   */
-  private void updateOverallClusterRemainingCapacity(
-      Map<String, Integer> overallClusterRemainingCapacityMap, 
AssignableReplica replica) {
-    for (Map.Entry<String, Integer> resourceUsage : 
replica.getCapacity().entrySet()) {
-      overallClusterRemainingCapacityMap.put(resourceUsage.getKey(),
-          overallClusterRemainingCapacityMap.get(resourceUsage.getKey()) - 
resourceUsage
-              .getValue());
-    }
-  }
-
   private class AssignableReplicaWithScore implements 
Comparable<AssignableReplicaWithScore> {
     private final AssignableReplica _replica;
     private float _score = 0;
@@ -199,13 +184,15 @@ class ConstraintBasedAlgorithm implements 
RebalanceAlgorithm {
     private final boolean _isInBaselineAssignment;
     private final Integer  _replicaHash;
 
-    AssignableReplicaWithScore(AssignableReplica replica, ClusterModel 
clusterModel) {
+    AssignableReplicaWithScore(AssignableReplica replica, ClusterModel 
clusterModel,
+        Map<String, Integer> overallClusterRemainingCapacityMap) {
       _replica = replica;
       _isInBestPossibleAssignment = 
clusterModel.getContext().getBestPossibleAssignment()
           .containsKey(replica.getResourceName());
       _isInBaselineAssignment =
           
clusterModel.getContext().getBaselineAssignment().containsKey(replica.getResourceName());
       _replicaHash = Objects.hash(replica.toString(), 
clusterModel.getAssignableNodes().keySet());
+      computeScore(overallClusterRemainingCapacityMap);
     }
 
     public void computeScore(Map<String, Integer> 
overallClusterRemainingCapMap) {
@@ -288,21 +275,6 @@ class ConstraintBasedAlgorithm implements 
RebalanceAlgorithm {
     }
   }
 
-  private AssignableReplica getNextAssignableReplica(
-      Set<AssignableReplicaWithScore> allReplica,
-      Map<String, Integer> overallClusterRemainingCapMap) {
-    AssignableReplicaWithScore nextAssinableReplica = null;
-    // Compare every replica with current candidate, update candidate if needed
-    for (AssignableReplicaWithScore replica : allReplica) {
-      replica.computeScore(overallClusterRemainingCapMap);
-      if (nextAssinableReplica == null || 
replica.compareTo(nextAssinableReplica) < 0) {
-        nextAssinableReplica = replica;
-      }
-    }
-    allReplica.remove(nextAssinableReplica);
-    return nextAssinableReplica.getAssignableReplica();
-  }
-
   /**
    * @param assignments A collection of resource replicas assignment.
    * @return A set of instance names that have at least one replica assigned 
in the input assignments.

Reply via email to