This is an automated email from the ASF dual-hosted git repository. nealsun pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push: new f893311 Remove WAGED sorting for each assignment (#1959) f893311 is described below commit f8933114dead95dca554673a0b504848e4148138 Author: xyuanlu <xyua...@gmail.com> AuthorDate: Mon Feb 14 16:58:05 2022 -0800 Remove WAGED sorting for each assignment (#1959) Improve WAGED sorting from n^2 to n*log(n) --- .../constraints/ConstraintBasedAlgorithm.java | 46 +++++----------------- 1 file changed, 9 insertions(+), 37 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithm.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithm.java index 89730c6..bb5df33 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithm.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithm.java @@ -75,15 +75,13 @@ class ConstraintBasedAlgorithm implements RebalanceAlgorithm { computeOverallClusterRemainingCapacity(nodes); // Create a wrapper for each AssignableReplica. - Set<AssignableReplicaWithScore> toBeAssignedReplicas = - clusterModel.getAssignableReplicaMap().values().stream() - .flatMap(replicas -> replicas.stream()) - .map(replica -> new AssignableReplicaWithScore(replica, clusterModel)) - .collect(Collectors.toSet()); + List<AssignableReplicaWithScore> toBeAssignedReplicas = + clusterModel.getAssignableReplicaMap().values().stream().flatMap(Collection::stream).map( + replica -> new AssignableReplicaWithScore(replica, clusterModel, + overallClusterRemainingCapacityMap)).sorted().collect(Collectors.toList()); - while (!toBeAssignedReplicas.isEmpty()) { - AssignableReplica replica = - getNextAssignableReplica(toBeAssignedReplicas, overallClusterRemainingCapacityMap); + for (AssignableReplicaWithScore replicaWithScore : toBeAssignedReplicas) { + AssignableReplica replica = replicaWithScore.getAssignableReplica(); Optional<AssignableNode> maybeBestNode = getNodeWithHighestPoints(replica, nodes, clusterModel.getContext(), busyInstances, optimalAssignment); @@ -100,7 +98,6 @@ class ConstraintBasedAlgorithm implements RebalanceAlgorithm { clusterModel .assign(replica.getResourceName(), replica.getPartitionName(), replica.getReplicaState(), bestNode.getInstanceName()); - updateOverallClusterRemainingCapacity(overallClusterRemainingCapacityMap, replica); } optimalAssignment.updateAssignments(clusterModel); return optimalAssignment; @@ -180,18 +177,6 @@ class ConstraintBasedAlgorithm implements RebalanceAlgorithm { return utilizationMap; } - /** - * Update the overallClusterUtilMap with newly placed replica - */ - private void updateOverallClusterRemainingCapacity( - Map<String, Integer> overallClusterRemainingCapacityMap, AssignableReplica replica) { - for (Map.Entry<String, Integer> resourceUsage : replica.getCapacity().entrySet()) { - overallClusterRemainingCapacityMap.put(resourceUsage.getKey(), - overallClusterRemainingCapacityMap.get(resourceUsage.getKey()) - resourceUsage - .getValue()); - } - } - private class AssignableReplicaWithScore implements Comparable<AssignableReplicaWithScore> { private final AssignableReplica _replica; private float _score = 0; @@ -199,13 +184,15 @@ class ConstraintBasedAlgorithm implements RebalanceAlgorithm { private final boolean _isInBaselineAssignment; private final Integer _replicaHash; - AssignableReplicaWithScore(AssignableReplica replica, ClusterModel clusterModel) { + AssignableReplicaWithScore(AssignableReplica replica, ClusterModel clusterModel, + Map<String, Integer> overallClusterRemainingCapacityMap) { _replica = replica; _isInBestPossibleAssignment = clusterModel.getContext().getBestPossibleAssignment() .containsKey(replica.getResourceName()); _isInBaselineAssignment = clusterModel.getContext().getBaselineAssignment().containsKey(replica.getResourceName()); _replicaHash = Objects.hash(replica.toString(), clusterModel.getAssignableNodes().keySet()); + computeScore(overallClusterRemainingCapacityMap); } public void computeScore(Map<String, Integer> overallClusterRemainingCapMap) { @@ -288,21 +275,6 @@ class ConstraintBasedAlgorithm implements RebalanceAlgorithm { } } - private AssignableReplica getNextAssignableReplica( - Set<AssignableReplicaWithScore> allReplica, - Map<String, Integer> overallClusterRemainingCapMap) { - AssignableReplicaWithScore nextAssinableReplica = null; - // Compare every replica with current candidate, update candidate if needed - for (AssignableReplicaWithScore replica : allReplica) { - replica.computeScore(overallClusterRemainingCapMap); - if (nextAssinableReplica == null || replica.compareTo(nextAssinableReplica) < 0) { - nextAssinableReplica = replica; - } - } - allReplica.remove(nextAssinableReplica); - return nextAssinableReplica.getAssignableReplica(); - } - /** * @param assignments A collection of resource replicas assignment. * @return A set of instance names that have at least one replica assigned in the input assignments.