This is an automated email from the ASF dual-hosted git repository.

jiajunwang pushed a commit to branch wagedRebalancer2
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 9ee65a0cbc74ef2b25c0bb48043a32763cb70cbb
Author: Jiajun Wang <[email protected]>
AuthorDate: Tue Sep 3 16:30:03 2019 -0700

    Record the replica objects in the AssignableNode in addition to the 
partition name (#440)
    
    The replica instances are required when the rebalance algorithm generates 
ResourceAssignment based on the AssignableNode instances.
    Refine the methods of the AssignableNode for better code style and 
readability.
    Also, modify the related test cases to verify state information and new 
methods.
---
 .../rebalancer/waged/model/AssignableNode.java     | 177 +++++++++++++--------
 .../rebalancer/waged/model/AssignableReplica.java  |  12 ++
 .../waged/model/ClusterModelProvider.java          |   2 +-
 .../rebalancer/waged/model/TestAssignableNode.java |  83 ++++++++--
 .../rebalancer/waged/model/TestClusterModel.java   |   6 +-
 .../waged/model/TestClusterModelProvider.java      |  10 +-
 6 files changed, 203 insertions(+), 87 deletions(-)

diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
index e2fd676..35c3c38 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
@@ -33,6 +33,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 import static java.lang.Math.max;
 
@@ -51,16 +52,19 @@ public class AssignableNode {
   private Map<String, Integer> _maxCapacity;
   private int _maxPartition; // maximum number of the partitions that can be 
assigned to the node.
 
-  // proposed assignment tracking
-  // <resource name, partition name set>
-  private Map<String, Set<String>> _currentAssignments;
-  // <resource name, top state partition name>
-  private Map<String, Set<String>> _currentTopStateAssignments;
-  // <capacity key, capacity value>
-  private Map<String, Integer> _currentCapacity;
+  // A map of <resource name, <partition name, replica>> that tracks the 
replicas assigned to the node.
+  private Map<String, Map<String, AssignableReplica>> 
_currentAssignedReplicaMap;
+  // A map of <capacity key, capacity value> that tracks the current available 
node capacity
+  private Map<String, Integer> _currentCapacityMap;
   // The maximum capacity utilization (0.0 - 1.0) across all the capacity 
categories.
   private float _highestCapacityUtilization;
 
+  /**
+   * @param clusterConfig
+   * @param instanceConfig
+   * @param instanceName
+   * @param existingAssignment A collection of replicas that have been 
pre-allocated to the node.
+   */
   AssignableNode(ClusterConfig clusterConfig, InstanceConfig instanceConfig, 
String instanceName,
       Collection<AssignableReplica> existingAssignment) {
     _instanceName = instanceName;
@@ -68,9 +72,8 @@ public class AssignableNode {
   }
 
   private void reset() {
-    _currentAssignments = new HashMap<>();
-    _currentTopStateAssignments = new HashMap<>();
-    _currentCapacity = new HashMap<>();
+    _currentAssignedReplicaMap = new HashMap<>();
+    _currentCapacityMap = new HashMap<>();
     _highestCapacityUtilization = 0;
   }
 
@@ -80,8 +83,8 @@ public class AssignableNode {
    * refreshed. This is under the assumption that the capacity mappings of 
InstanceConfig and ResourceConfig could
    * subject to change. If the assumption is no longer true, this function 
should become private.
    *
-   * @param clusterConfig  - the Cluster Config of the cluster where the node 
is located
-   * @param instanceConfig - the Instance Config of the node
+   * @param clusterConfig      - the Cluster Config of the cluster where the 
node is located
+   * @param instanceConfig     - the Instance Config of the node
    * @param existingAssignment - all the existing replicas that are current 
assigned to the node
    */
   private void refresh(ClusterConfig clusterConfig, InstanceConfig 
instanceConfig,
@@ -92,7 +95,7 @@ public class AssignableNode {
     if (instanceCapacity.isEmpty()) {
       instanceCapacity = clusterConfig.getDefaultInstanceCapacityMap();
     }
-    _currentCapacity.putAll(instanceCapacity);
+    _currentCapacityMap.putAll(instanceCapacity);
     _faultZone = computeFaultZone(clusterConfig, instanceConfig);
     _instanceTags = new HashSet<>(instanceConfig.getTags());
     _disabledPartitionsMap = instanceConfig.getDisabledPartitionsMap();
@@ -108,78 +111,110 @@ public class AssignableNode {
    * @param assignableReplica - the replica to be assigned
    */
   void assign(AssignableReplica assignableReplica) {
-    if (!addToAssignmentRecord(assignableReplica, _currentAssignments)) {
-      throw new HelixException(String
-          .format("Resource %s already has a replica from partition %s on node 
%s",
-              assignableReplica.getResourceName(), 
assignableReplica.getPartitionName(),
-              getInstanceName()));
-    } else {
-      if (assignableReplica.isReplicaTopState()) {
-        addToAssignmentRecord(assignableReplica, _currentTopStateAssignments);
-      }
-      assignableReplica.getCapacity().entrySet().stream().forEach(
-          capacity -> updateCapacityAndUtilization(capacity.getKey(), 
capacity.getValue()));
-    }
+    addToAssignmentRecord(assignableReplica);
+    assignableReplica.getCapacity().entrySet().stream()
+        .forEach(capacity -> updateCapacityAndUtilization(capacity.getKey(), 
capacity.getValue()));
   }
 
   /**
    * Release a replica from the node.
    * If the replication is not on this node, the assignable node is not 
updated.
    *
-   * @param assignableReplica - the replica to be released
+   * @param replica - the replica to be released
    */
-  void release(AssignableReplica assignableReplica) throws 
IllegalArgumentException {
-    String resourceName = assignableReplica.getResourceName();
-    String partitionName = assignableReplica.getPartitionName();
+  void release(AssignableReplica replica) throws IllegalArgumentException {
+    String resourceName = replica.getResourceName();
+    String partitionName = replica.getPartitionName();
 
     // Check if the release is necessary
-    if (!_currentAssignments.containsKey(resourceName)) {
+    if (!_currentAssignedReplicaMap.containsKey(resourceName)) {
       LOG.warn("Resource {} is not on node {}. Ignore the release call.", 
resourceName,
           getInstanceName());
       return;
     }
-    Set<String> partitions = _currentAssignments.get(resourceName);
-    if (!partitions.contains(partitionName)) {
-      LOG.warn(String
-          .format("Resource %s does not have a replica from partition %s on 
node %s", resourceName,
-              partitionName, getInstanceName()));
+
+    Map<String, AssignableReplica> partitionMap = 
_currentAssignedReplicaMap.get(resourceName);
+    if (!partitionMap.containsKey(partitionName) || 
!partitionMap.get(partitionName)
+        .equals(replica)) {
+      LOG.warn("Replica {} is not assigned to node {}. Ignore the release 
call.",
+          replica.toString(), getInstanceName());
       return;
     }
 
-    partitions.remove(assignableReplica.getPartitionName());
-    if (assignableReplica.isReplicaTopState()) {
-      _currentTopStateAssignments.get(resourceName).remove(partitionName);
-    }
+    AssignableReplica removedReplica = partitionMap.remove(partitionName);
     // Recalculate utilization because of release
     _highestCapacityUtilization = 0;
-    assignableReplica.getCapacity().entrySet().stream()
+    removedReplica.getCapacity().entrySet().stream()
         .forEach(entry -> updateCapacityAndUtilization(entry.getKey(), -1 * 
entry.getValue()));
   }
 
-  public Map<String, Set<String>> getCurrentAssignmentsMap() {
-    return _currentAssignments;
+  /**
+   * @return A set of all assigned replicas on the node.
+   */
+  public Set<AssignableReplica> getAssignedReplicas() {
+    return _currentAssignedReplicaMap.values().stream()
+        .flatMap(replicaMap -> 
replicaMap.values().stream()).collect(Collectors.toSet());
   }
 
-  public Set<String> getCurrentAssignmentsByResource(String resource) {
-    return _currentAssignments.getOrDefault(resource, Collections.emptySet());
+  /**
+   * @return The current assignment in a map of <resource name, set of 
partition names>
+   */
+  public Map<String, Set<String>> getAssignedPartitionsMap() {
+    Map<String, Set<String>> assignmentMap = new HashMap<>();
+    for (String resourceName : _currentAssignedReplicaMap.keySet()) {
+      assignmentMap.put(resourceName, 
_currentAssignedReplicaMap.get(resourceName).keySet());
+    }
+    return assignmentMap;
   }
 
-  public Set<String> getCurrentTopStateAssignmentsByResource(String resource) {
-    return _currentTopStateAssignments.getOrDefault(resource, 
Collections.emptySet());
+  /**
+   * @param resource Resource name
+   * @return A set of the current assigned replicas' partition names in the 
specified resource.
+   */
+  public Set<String> getAssignedPartitionsByResource(String resource) {
+    return _currentAssignedReplicaMap.getOrDefault(resource, 
Collections.emptyMap()).keySet();
   }
 
-  public int getTopStateAssignmentTotalSize() {
-    return 
_currentTopStateAssignments.values().stream().mapToInt(Set::size).sum();
+  /**
+   * @param resource Resource name
+   * @return A set of the current assigned replicas' partition names with the 
top state in the specified resource.
+   */
+  public Set<String> getAssignedTopStatePartitionsByResource(String resource) {
+    return _currentAssignedReplicaMap.getOrDefault(resource, 
Collections.emptyMap()).entrySet()
+        .stream().filter(partitionEntry -> 
partitionEntry.getValue().isReplicaTopState())
+        .map(partitionEntry -> 
partitionEntry.getKey()).collect(Collectors.toSet());
   }
 
-  public int getCurrentAssignmentCount() {
-    return _currentAssignments.values().stream().mapToInt(Set::size).sum();
+  /**
+   * @return The total count of assigned top state partitions.
+   */
+  public long getAssignedTopStatePartitionsCount() {
+    return _currentAssignedReplicaMap.values().stream()
+        .flatMap(replicaMap -> replicaMap.values().stream())
+        .filter(replica -> replica.isReplicaTopState()).count();
   }
 
+  /**
+   * @return The total count of assigned replicas.
+   */
+  public long getAssignedReplicaCount() {
+    return 
_currentAssignedReplicaMap.values().stream().mapToInt(Map::size).sum();
+  }
+
+  /**
+   * @return The current available capacity.
+   */
   public Map<String, Integer> getCurrentCapacity() {
-    return _currentCapacity;
+    return _currentCapacityMap;
   }
 
+  /**
+   * Return the most concerning capacity utilization number for evenly 
partition assignment.
+   * The method dynamically returns the highest utilization number among all 
the capacity categories.
+   * For example, if the current node usage is {CPU: 0.9, MEM: 0.4, DISK: 
0.6}. Then this call shall
+   * return 0.9.
+   * @return The highest utilization number of the node among all the capacity 
category.
+   */
   public float getHighestCapacityUtilization() {
     return _highestCapacityUtilization;
   }
@@ -196,14 +231,23 @@ public class AssignableNode {
     return _faultZone;
   }
 
+  /**
+   * @return A map of <resource name, set of partition names> contains all the 
partitions that are disabled on the node.
+   */
   public Map<String, List<String>> getDisabledPartitionsMap() {
     return _disabledPartitionsMap;
   }
 
+  /**
+   * @return A map of <capacity category, capacity number> that describes the 
max capacity of the node.
+   */
   public Map<String, Integer> getMaxCapacity() {
     return _maxCapacity;
   }
 
+  /**
+   * @return The max partition count that are allowed to be allocated on the 
node.
+   */
   public int getMaxPartition() {
     return _maxPartition;
   }
@@ -268,10 +312,7 @@ public class AssignableNode {
   private void assignNewBatch(Collection<AssignableReplica> replicas) {
     Map<String, Integer> totalPartitionCapacity = new HashMap<>();
     for (AssignableReplica replica : replicas) {
-      addToAssignmentRecord(replica, _currentAssignments);
-      if (replica.isReplicaTopState()) {
-        addToAssignmentRecord(replica, _currentTopStateAssignments);
-      }
+      addToAssignmentRecord(replica);
       // increment the capacity requirement according to partition's capacity 
configuration.
       for (Map.Entry<String, Integer> capacity : 
replica.getCapacity().entrySet()) {
         totalPartitionCapacity.compute(capacity.getKey(),
@@ -287,16 +328,28 @@ public class AssignableNode {
     }
   }
 
-  private boolean addToAssignmentRecord(AssignableReplica replica,
-      Map<String, Set<String>> currentAssignments) {
-    return currentAssignments.computeIfAbsent(replica.getResourceName(), k -> 
new HashSet<>())
-        .add(replica.getPartitionName());
+  /**
+   * @throws HelixException if the replica has already been assigned to the 
node.
+   */
+  private void addToAssignmentRecord(AssignableReplica replica) {
+    String resourceName = replica.getResourceName();
+    String partitionName = replica.getPartitionName();
+    if (_currentAssignedReplicaMap.containsKey(resourceName) && 
_currentAssignedReplicaMap
+        .get(resourceName).containsKey(partitionName)) {
+      throw new HelixException(String
+          .format("Resource %s already has a replica with state %s from 
partition %s on node %s",
+              replica.getResourceName(), replica.getReplicaState(), 
replica.getPartitionName(),
+              getInstanceName()));
+    } else {
+      _currentAssignedReplicaMap.computeIfAbsent(resourceName, key -> new 
HashMap<>())
+          .put(partitionName, replica);
+    }
   }
 
   private void updateCapacityAndUtilization(String capacityKey, int 
valueToSubtract) {
-    if (_currentCapacity.containsKey(capacityKey)) {
-      int newCapacity = _currentCapacity.get(capacityKey) - valueToSubtract;
-      _currentCapacity.put(capacityKey, newCapacity);
+    if (_currentCapacityMap.containsKey(capacityKey)) {
+      int newCapacity = _currentCapacityMap.get(capacityKey) - valueToSubtract;
+      _currentCapacityMap.put(capacityKey, newCapacity);
       // For the purpose of constraint calculation, the max utilization cannot 
be larger than 100%.
       float utilization = Math.min(
           (float) (_maxCapacity.get(capacityKey) - newCapacity) / 
_maxCapacity.get(capacityKey), 1);
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableReplica.java
 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableReplica.java
index 0082a2d..ade04bf 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableReplica.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableReplica.java
@@ -107,6 +107,18 @@ public class AssignableReplica implements 
Comparable<AssignableReplica> {
     return 0;
   }
 
+  @Override
+  public boolean equals(Object obj) {
+    if (obj == null) {
+      return false;
+    }
+    if (obj instanceof AssignableReplica) {
+      return compareTo((AssignableReplica) obj) == 0;
+    } else {
+      return false;
+    }
+  }
+
   public static String generateReplicaKey(String resourceName, String 
partitionName, String state) {
     return String.format("%s-%s-%s", resourceName, partitionName, state);
   }
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
index e0a5e35..61f5d8d 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
@@ -236,7 +236,7 @@ public class ClusterModelProvider {
       Set<AssignableNode> assignableNodes) {
     Map<String, Map<String, Set<String>>> faultZoneAssignmentMap = new 
HashMap<>();
     assignableNodes.stream().forEach(node -> {
-      for (Map.Entry<String, Set<String>> resourceMap : 
node.getCurrentAssignmentsMap()
+      for (Map.Entry<String, Set<String>> resourceMap : 
node.getAssignedPartitionsMap()
           .entrySet()) {
         faultZoneAssignmentMap.computeIfAbsent(node.getFaultZone(), k -> new 
HashMap<>())
             .computeIfAbsent(resourceMap.getKey(), k -> new HashSet<>())
diff --git 
a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableNode.java
 
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableNode.java
index f55d0fc..34a03a9 100644
--- 
a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableNode.java
+++ 
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableNode.java
@@ -31,6 +31,7 @@ import java.io.IOException;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
 
@@ -48,6 +49,8 @@ public class TestAssignableNode extends 
AbstractTestClusterModel {
     ResourceControllerDataProvider testCache = setupClusterDataCache();
     Set<AssignableReplica> assignmentSet = generateReplicas(testCache);
 
+    Set<String> expectedTopStateAssignmentSet1 = new 
HashSet<>(_partitionNames.subList(0, 1));
+    Set<String> expectedTopStateAssignmentSet2 = new 
HashSet<>(_partitionNames.subList(2, 3));
     Set<String> expectedAssignmentSet1 = new 
HashSet<>(_partitionNames.subList(0, 2));
     Set<String> expectedAssignmentSet2 = new 
HashSet<>(_partitionNames.subList(2, 4));
     Map<String, Set<String>> expectedAssignment = new HashMap<>();
@@ -60,15 +63,28 @@ public class TestAssignableNode extends 
AbstractTestClusterModel {
 
     AssignableNode assignableNode = new 
AssignableNode(testCache.getClusterConfig(),
         testCache.getInstanceConfigMap().get(_testInstanceId), 
_testInstanceId, assignmentSet);
-    
Assert.assertTrue(assignableNode.getCurrentAssignmentsMap().equals(expectedAssignment));
-    Assert.assertEquals(assignableNode.getCurrentAssignmentCount(), 4);
+    Assert.assertEquals(assignableNode.getAssignedPartitionsMap(), 
expectedAssignment);
+    Assert.assertEquals(assignableNode.getAssignedReplicaCount(), 4);
     Assert.assertEquals(assignableNode.getHighestCapacityUtilization(), 16.0 / 
20.0, 0.005);
-    
Assert.assertTrue(assignableNode.getMaxCapacity().equals(_capacityDataMap));
+    Assert.assertEquals(assignableNode.getMaxCapacity(), _capacityDataMap);
     Assert.assertEquals(assignableNode.getMaxPartition(), 5);
     Assert.assertEquals(assignableNode.getInstanceTags(), _testInstanceTags);
     Assert.assertEquals(assignableNode.getFaultZone(), _testFaultZoneId);
-    
Assert.assertTrue(assignableNode.getDisabledPartitionsMap().equals(_disabledPartitionsMap));
-    
Assert.assertTrue(assignableNode.getCurrentCapacity().equals(expectedCapacityMap));
+    Assert.assertEquals(assignableNode.getDisabledPartitionsMap(), 
_disabledPartitionsMap);
+    Assert.assertEquals(assignableNode.getCurrentCapacity(), 
expectedCapacityMap);
+    Assert.assertEquals(assignableNode.getAssignedReplicas(), assignmentSet);
+    
Assert.assertEquals(assignableNode.getAssignedPartitionsByResource(_resourceNames.get(0)),
+        expectedAssignmentSet1);
+    
Assert.assertEquals(assignableNode.getAssignedPartitionsByResource(_resourceNames.get(1)),
+        expectedAssignmentSet2);
+    Assert
+        
.assertEquals(assignableNode.getAssignedTopStatePartitionsByResource(_resourceNames.get(0)),
+            expectedTopStateAssignmentSet1);
+    Assert
+        
.assertEquals(assignableNode.getAssignedTopStatePartitionsByResource(_resourceNames.get(1)),
+            expectedTopStateAssignmentSet2);
+    Assert.assertEquals(assignableNode.getAssignedTopStatePartitionsCount(),
+        expectedTopStateAssignmentSet1.size() + 
expectedTopStateAssignmentSet2.size());
 
     // Test 2 - release assignment from the AssignableNode
     AssignableReplica removingReplica =
@@ -77,18 +93,39 @@ public class TestAssignableNode extends 
AbstractTestClusterModel {
     
expectedAssignment.get(_resourceNames.get(1)).remove(_partitionNames.get(2));
     expectedCapacityMap.put("item1", 9);
     expectedCapacityMap.put("item2", 18);
+    Iterator<AssignableReplica> iter = assignmentSet.iterator();
+    while (iter.hasNext()) {
+      AssignableReplica replica = iter.next();
+      if (replica.equals(removingReplica)) {
+        iter.remove();
+      }
+    }
+    expectedTopStateAssignmentSet2.remove(_partitionNames.get(2));
 
     assignableNode.release(removingReplica);
 
-    
Assert.assertTrue(assignableNode.getCurrentAssignmentsMap().equals(expectedAssignment));
-    Assert.assertEquals(assignableNode.getCurrentAssignmentCount(), 3);
+    Assert.assertEquals(assignableNode.getAssignedPartitionsMap(), 
expectedAssignment);
+    Assert.assertEquals(assignableNode.getAssignedReplicaCount(), 3);
     Assert.assertEquals(assignableNode.getHighestCapacityUtilization(), 11.0 / 
20.0, 0.005);
-    
Assert.assertTrue(assignableNode.getMaxCapacity().equals(_capacityDataMap));
+    Assert.assertEquals(assignableNode.getMaxCapacity(), _capacityDataMap);
     Assert.assertEquals(assignableNode.getMaxPartition(), 5);
     Assert.assertEquals(assignableNode.getInstanceTags(), _testInstanceTags);
     Assert.assertEquals(assignableNode.getFaultZone(), _testFaultZoneId);
-    
Assert.assertTrue(assignableNode.getDisabledPartitionsMap().equals(_disabledPartitionsMap));
-    
Assert.assertTrue(assignableNode.getCurrentCapacity().equals(expectedCapacityMap));
+    Assert.assertEquals(assignableNode.getDisabledPartitionsMap(), 
_disabledPartitionsMap);
+    Assert.assertEquals(assignableNode.getCurrentCapacity(), 
expectedCapacityMap);
+    Assert.assertEquals(assignableNode.getAssignedReplicas(), assignmentSet);
+    
Assert.assertEquals(assignableNode.getAssignedPartitionsByResource(_resourceNames.get(0)),
+        expectedAssignmentSet1);
+    
Assert.assertEquals(assignableNode.getAssignedPartitionsByResource(_resourceNames.get(1)),
+        expectedAssignmentSet2);
+    Assert
+        
.assertEquals(assignableNode.getAssignedTopStatePartitionsByResource(_resourceNames.get(0)),
+            expectedTopStateAssignmentSet1);
+    Assert
+        
.assertEquals(assignableNode.getAssignedTopStatePartitionsByResource(_resourceNames.get(1)),
+            expectedTopStateAssignmentSet2);
+    Assert.assertEquals(assignableNode.getAssignedTopStatePartitionsCount(),
+        expectedTopStateAssignmentSet1.size() + 
expectedTopStateAssignmentSet2.size());
 
     // Test 3 - add assignment to the AssignableNode
     AssignableReplica addingReplica =
@@ -97,18 +134,32 @@ public class TestAssignableNode extends 
AbstractTestClusterModel {
     expectedAssignment.get(_resourceNames.get(1)).add(_partitionNames.get(2));
     expectedCapacityMap.put("item1", 4);
     expectedCapacityMap.put("item2", 8);
+    assignmentSet.add(addingReplica);
 
     assignableNode.assign(addingReplica);
 
-    
Assert.assertTrue(assignableNode.getCurrentAssignmentsMap().equals(expectedAssignment));
-    Assert.assertEquals(assignableNode.getCurrentAssignmentCount(), 4);
+    Assert.assertEquals(assignableNode.getAssignedPartitionsMap(), 
expectedAssignment);
+    Assert.assertEquals(assignableNode.getAssignedReplicaCount(), 4);
     Assert.assertEquals(assignableNode.getHighestCapacityUtilization(), 16.0 / 
20.0, 0.005);
-    
Assert.assertTrue(assignableNode.getMaxCapacity().equals(_capacityDataMap));
+    Assert.assertEquals(assignableNode.getMaxCapacity(), _capacityDataMap);
     Assert.assertEquals(assignableNode.getMaxPartition(), 5);
     Assert.assertEquals(assignableNode.getInstanceTags(), _testInstanceTags);
     Assert.assertEquals(assignableNode.getFaultZone(), _testFaultZoneId);
-    
Assert.assertTrue(assignableNode.getDisabledPartitionsMap().equals(_disabledPartitionsMap));
-    
Assert.assertTrue(assignableNode.getCurrentCapacity().equals(expectedCapacityMap));
+    Assert.assertEquals(assignableNode.getDisabledPartitionsMap(), 
_disabledPartitionsMap);
+    Assert.assertEquals(assignableNode.getCurrentCapacity(), 
expectedCapacityMap);
+    Assert.assertEquals(assignableNode.getAssignedReplicas(), assignmentSet);
+    
Assert.assertEquals(assignableNode.getAssignedPartitionsByResource(_resourceNames.get(0)),
+        expectedAssignmentSet1);
+    
Assert.assertEquals(assignableNode.getAssignedPartitionsByResource(_resourceNames.get(1)),
+        expectedAssignmentSet2);
+    Assert
+        
.assertEquals(assignableNode.getAssignedTopStatePartitionsByResource(_resourceNames.get(0)),
+            expectedTopStateAssignmentSet1);
+    Assert
+        
.assertEquals(assignableNode.getAssignedTopStatePartitionsByResource(_resourceNames.get(1)),
+            expectedTopStateAssignmentSet2);
+    Assert.assertEquals(assignableNode.getAssignedTopStatePartitionsCount(),
+        expectedTopStateAssignmentSet1.size() + 
expectedTopStateAssignmentSet2.size());
   }
 
   @Test
@@ -126,7 +177,7 @@ public class TestAssignableNode extends 
AbstractTestClusterModel {
     assignableNode.release(removingReplica);
   }
 
-  @Test(expectedExceptions = HelixException.class, 
expectedExceptionsMessageRegExp = "Resource Resource1 already has a replica 
from partition Partition1 on node testInstanceId")
+  @Test(expectedExceptions = HelixException.class, 
expectedExceptionsMessageRegExp = "Resource Resource1 already has a replica 
with state SLAVE from partition Partition1 on node testInstanceId")
   public void testAssignDuplicateReplica() throws IOException {
     ResourceControllerDataProvider testCache = setupClusterDataCache();
     Set<AssignableReplica> assignmentSet = generateReplicas(testCache);
diff --git 
a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterModel.java
 
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterModel.java
index c07bd98..a45b729 100644
--- 
a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterModel.java
+++ 
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterModel.java
@@ -63,7 +63,7 @@ public class TestClusterModel extends 
AbstractTestClusterModel {
     
Assert.assertTrue(clusterModel.getContext().getAssignmentForFaultZoneMap().values().stream()
         .allMatch(resourceMap -> resourceMap.values().isEmpty()));
     Assert.assertFalse(clusterModel.getAssignableNodes().values().stream()
-        .anyMatch(node -> node.getCurrentAssignmentCount() != 0));
+        .anyMatch(node -> node.getAssignedReplicaCount() != 0));
 
     // The initialization of the context, node and replication has been tested 
separately. So for
     // cluster model, focus on testing the assignment and release.
@@ -78,7 +78,7 @@ public class TestClusterModel extends 
AbstractTestClusterModel {
     Assert.assertTrue(
         
clusterModel.getContext().getAssignmentForFaultZoneMap().get(assignableNode.getFaultZone())
             
.get(replica.getResourceName()).contains(replica.getPartitionName()));
-    
Assert.assertTrue(assignableNode.getCurrentAssignmentsMap().get(replica.getResourceName())
+    
Assert.assertTrue(assignableNode.getAssignedPartitionsMap().get(replica.getResourceName())
         .contains(replica.getPartitionName()));
 
     // Assign a nonexist replication
@@ -109,6 +109,6 @@ public class TestClusterModel extends 
AbstractTestClusterModel {
         .allMatch(resourceMap -> resourceMap.values().stream()
             .allMatch(partitions -> partitions.isEmpty())));
     Assert.assertFalse(clusterModel.getAssignableNodes().values().stream()
-        .anyMatch(node -> node.getCurrentAssignmentCount() != 0));
+        .anyMatch(node -> node.getAssignedReplicaCount() != 0));
   }
 }
diff --git 
a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterModelProvider.java
 
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterModelProvider.java
index 638182f..1ec92a9 100644
--- 
a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterModelProvider.java
+++ 
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterModelProvider.java
@@ -106,7 +106,7 @@ public class TestClusterModelProvider extends 
AbstractTestClusterModel {
     
Assert.assertFalse(clusterModel.getContext().getAssignmentForFaultZoneMap().values().stream()
         .anyMatch(resourceMap -> !resourceMap.isEmpty()));
     Assert.assertFalse(clusterModel.getAssignableNodes().values().stream()
-        .anyMatch(node -> node.getCurrentAssignmentCount() != 0));
+        .anyMatch(node -> node.getAssignedReplicaCount() != 0));
     // Have all 3 instances
     Assert.assertEquals(
         
clusterModel.getAssignableNodes().values().stream().map(AssignableNode::getInstanceName)
@@ -168,7 +168,7 @@ public class TestClusterModelProvider extends 
AbstractTestClusterModel {
         .allMatch(resourceMap -> resourceMap.values().stream()
             .allMatch(partitionSet -> partitionSet.size() == 2)));
     Assert.assertEquals(
-        
clusterModel.getAssignableNodes().get(_testInstanceId).getCurrentAssignmentCount(),
 4);
+        
clusterModel.getAssignableNodes().get(_testInstanceId).getAssignedReplicaCount(),
 4);
     // Since each resource has 2 replicas assigned, the assignable replica 
count should be 10.
     Assert.assertEquals(clusterModel.getAssignableReplicaMap().size(), 2);
     Assert.assertTrue(clusterModel.getAssignableReplicaMap().values().stream()
@@ -183,7 +183,7 @@ public class TestClusterModelProvider extends 
AbstractTestClusterModel {
     
Assert.assertTrue(clusterModel.getContext().getAssignmentForFaultZoneMap().values().stream()
         .allMatch(resourceMap -> resourceMap.isEmpty()));
     Assert.assertFalse(clusterModel.getAssignableNodes().values().stream()
-        .anyMatch(node -> node.getCurrentAssignmentCount() != 0));
+        .anyMatch(node -> node.getAssignedReplicaCount() != 0));
     // Shall have 2 resources and 12 replicas
     Assert.assertEquals(clusterModel.getAssignableReplicaMap().size(), 2);
     Assert.assertTrue(clusterModel.getAssignableReplicaMap().values().stream()
@@ -211,7 +211,7 @@ public class TestClusterModelProvider extends 
AbstractTestClusterModel {
     // Only the first instance will have 2 assignment from resource2.
     for (String instance : _instances) {
       Assert
-          
.assertEquals(clusterModel.getAssignableNodes().get(instance).getCurrentAssignmentCount(),
+          
.assertEquals(clusterModel.getAssignableNodes().get(instance).getAssignedReplicaCount(),
               instance.equals(_testInstanceId) ? 2 : 0);
     }
     // Shall have 2 resources and 12 replicas
@@ -233,7 +233,7 @@ public class TestClusterModelProvider extends 
AbstractTestClusterModel {
     
Assert.assertFalse(clusterModel.getContext().getAssignmentForFaultZoneMap().values().stream()
         .anyMatch(resourceMap -> !resourceMap.isEmpty()));
     Assert.assertFalse(clusterModel.getAssignableNodes().values().stream()
-        .anyMatch(node -> node.getCurrentAssignmentCount() != 0));
+        .anyMatch(node -> node.getAssignedReplicaCount() != 0));
     // Have only 2 instances
     Assert.assertEquals(
         
clusterModel.getAssignableNodes().values().stream().map(AssignableNode::getInstanceName)

Reply via email to