This is an automated email from the ASF dual-hosted git repository.

xyuanlu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new b6ca0f414 WAGED - Fix the intermediate hard-constraint violation 
during n=n+1 state. (#2546)
b6ca0f414 is described below

commit b6ca0f4148fb5707829af308da605cfa8bb3e63e
Author: Komal Desai <[email protected]>
AuthorDate: Tue Jul 11 23:43:56 2023 -0700

    WAGED - Fix the intermediate hard-constraint violation during n=n+1 state. 
(#2546)
    
    * WAGED - Part 2 - Fix the core calculation for n - n+1 issue.
    This change is focused on changes to core logic and has not done any
    additional code cleanup, de-dup etc.
    
    There are 3 more changes planned for the change once core logic is 
checked-in.
    Change-1:  code dedup across computeBestPossibleStateForPartition
    Change-2: moving the message processing state to the 'DataProcess' stage of the pipeline.
    Change-3 (tentative): add metrics to indicate that we diverted from the 'preferred list'.
    
    The core logic is as follows:
    Pre-Compute stage
    ------------------
    - in WAGED rebalancer: computeNewIdealState, we create 
'WagedInstanceCapacity' which tracks instance capacity and WeightProvider for 
partitions. These changes were introduced in Part-1.
    - We process the pending messages for the resources: if a partition is being moved out of an instance but the move has not yet completed, we reduce the capacity.
    
    Once we compute the ideal state, we go through preference list sorting.
    There are no changes to the pure computational logic.
    
    Post-compute stage
    ------------------
    We look at the current state as well as the ideal state, combine the lists, and sort the nodes. We then pick only the required numReplica nodes.
    
    This is where we look at the nodes which are "NEW" (i.e. not in currentState) and check whether each has the required capacity to hold the new partition.
    If not, we remove it from the combined list.
---
 1                                                  |   0
 .../InstanceCapacityDataProvider.java              |  12 +-
 .../ResourceControllerDataProvider.java            |  38 +++
 .../controller/rebalancer/AbstractRebalancer.java  |  48 +++-
 .../rebalancer/DelayedAutoRebalancer.java          |  42 +++-
 .../rebalancer/util/WagedRebalanceUtil.java        |   3 +-
 .../rebalancer/waged/WagedInstanceCapacity.java    | 180 +++++++++-----
 .../stages/CurrentStateComputationStage.java       |   9 +
 .../rebalancer/waged/TestWagedRebalancer.java      |   5 +-
 ...straint.java => TestWagedClusterExpansion.java} | 263 +++++++++++++--------
 .../TestWagedRebalanceHardConstraint.java          |   1 -
 11 files changed, 410 insertions(+), 191 deletions(-)

diff --git a/1 b/1
deleted file mode 100644
index e69de29bb..000000000
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/InstanceCapacityDataProvider.java
 
b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/InstanceCapacityDataProvider.java
index ce550de33..9fedcbe52 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/InstanceCapacityDataProvider.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/InstanceCapacityDataProvider.java
@@ -31,7 +31,7 @@ import java.util.Map;
 public interface InstanceCapacityDataProvider {
 
  /**
-   * Get the instance remaining capacity. 
+   * Get the instance remaining capacity.
    * Capacity and weight both are represented as Key-Value.
    * Returns the capacity map of available head room for the instance.
    * @param instanceName - instance name to query
@@ -42,18 +42,10 @@ public interface InstanceCapacityDataProvider {
   /**
    * Check if partition can be placed on the instance.
    *
-   * @param instanceName - instance name 
+   * @param instanceName - instance name
    * @param partitionCapacity - Partition capacity expresed in capacity map.
    * @return boolean - True if the partition can be placed, False otherwise
    */
   public boolean isInstanceCapacityAvailable(String instanceName, Map<String, 
Integer> partitionCapacity);
 
-  /**
-   * Reduce the available capacity by specified Partition Capacity Map.
-   *
-   * @param instanceName - instance name 
-   * @param partitionCapacity - Partition capacity expresed in capacity map.
-   * @returns boolean - True if successfully updated partition capacity, false 
otherwise.
-   */
-  public boolean reduceAvailableInstanceCapacity(String instanceName, 
Map<String, Integer> partitionCapacity);
 }
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ResourceControllerDataProvider.java
 
b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ResourceControllerDataProvider.java
index 7a9b52040..efd80ff54 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ResourceControllerDataProvider.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ResourceControllerDataProvider.java
@@ -37,6 +37,8 @@ import org.apache.helix.common.caches.CustomizedViewCache;
 import org.apache.helix.common.caches.PropertyCache;
 import org.apache.helix.controller.LogUtil;
 import org.apache.helix.controller.pipeline.Pipeline;
+import org.apache.helix.controller.rebalancer.waged.WagedInstanceCapacity;
+import 
org.apache.helix.controller.rebalancer.waged.WagedResourceWeightsProvider;
 import org.apache.helix.controller.stages.MissingTopStateRecord;
 import org.apache.helix.model.CustomizedState;
 import org.apache.helix.model.CustomizedStateConfig;
@@ -89,6 +91,10 @@ public class ResourceControllerDataProvider extends 
BaseControllerDataProvider {
   // TODO: cause shuffling. So it is not backward compatible.
   private final Map<String, List<String>> _stablePartitionListCache = new 
HashMap<>();
 
+  // WAGED specific capacity / weight provider
+  WagedInstanceCapacity _wagedInstanceCapacity;
+  WagedResourceWeightsProvider _wagedPartitionWeightProvider;
+
   public ResourceControllerDataProvider() {
     this(AbstractDataCache.UNKNOWN_CLUSTER);
   }
@@ -475,4 +481,36 @@ public class ResourceControllerDataProvider extends 
BaseControllerDataProvider {
       }
     }
   }
+
+  /**
+   * Set the WAGED algorithm specific instance capacity provider and resource 
weight provider.
+   * @param capacityProvider - the capacity provider for instances
+   * @param resourceWeightProvider - the resource weight provider for 
partitions
+   */
+  public void setWagedCapacityProviders(WagedInstanceCapacity 
capacityProvider, WagedResourceWeightsProvider resourceWeightProvider) {
+    // WAGED specific capacity / weight provider
+    _wagedInstanceCapacity = capacityProvider;
+    _wagedPartitionWeightProvider = resourceWeightProvider;
+  }
+
+  /**
+   * Check and reduce the capacity of an instance for a resource partition
+   * @param instance - the instance to check
+   * @param resourceName - the resource name
+   * @param partition - the partition name
+   * @return true if the capacity is reduced, false otherwise
+   */
+  public boolean checkAndReduceCapacity(String instance, String resourceName, 
String partition) {
+    if (_wagedPartitionWeightProvider == null || _wagedInstanceCapacity == 
null) {
+      return true;
+    }
+    Map<String, Integer> partitionWeightMap =
+          _wagedPartitionWeightProvider.getPartitionWeights(resourceName, 
partition);
+    if (partitionWeightMap == null || partitionWeightMap.isEmpty()) {
+      return true;
+    }
+
+    return _wagedInstanceCapacity.checkAndReduceInstanceCapacity(instance, 
resourceName, partition,
+        partitionWeightMap);
+  }
 }
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AbstractRebalancer.java
 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AbstractRebalancer.java
index cc6f8ef95..1d8cb5d6c 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AbstractRebalancer.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AbstractRebalancer.java
@@ -74,9 +74,8 @@ public abstract class AbstractRebalancer<T extends 
BaseControllerDataProvider> i
   }
 
   @Override
-  public abstract IdealState computeNewIdealState(
-      String resourceName, IdealState currentIdealState, CurrentStateOutput 
currentStateOutput,
-      T clusterData);
+  public abstract IdealState computeNewIdealState(String resourceName, 
IdealState currentIdealState,
+      CurrentStateOutput currentStateOutput, T clusterData);
 
   /**
    * Compute the best state for all partitions.
@@ -91,9 +90,8 @@ public abstract class AbstractRebalancer<T extends 
BaseControllerDataProvider> i
    * @return
    */
   @Override
-  public ResourceAssignment computeBestPossiblePartitionState(
-      T cache, IdealState idealState, Resource resource,
-      CurrentStateOutput currentStateOutput) {
+  public ResourceAssignment computeBestPossiblePartitionState(T cache, 
IdealState idealState,
+      Resource resource, CurrentStateOutput currentStateOutput) {
     if (LOG.isDebugEnabled()) {
       LOG.debug("Processing resource:" + resource.getResourceName());
     }
@@ -109,7 +107,7 @@ public abstract class AbstractRebalancer<T extends 
BaseControllerDataProvider> i
           
computeBestPossibleStateForPartition(cache.getLiveInstances().keySet(), 
stateModelDef,
               preferenceList, currentStateOutput, 
disabledInstancesForPartition, idealState,
               cache.getClusterConfig(), partition,
-              cache.getAbnormalStateResolver(stateModelDefName));
+              cache.getAbnormalStateResolver(stateModelDefName), cache);
       partitionMapping.addReplicaMap(partition, bestStateForPartition);
     }
     return partitionMapping;
@@ -155,18 +153,16 @@ public abstract class AbstractRebalancer<T extends 
BaseControllerDataProvider> i
     return map;
   }
 
-  protected RebalanceStrategy<T> getRebalanceStrategy(
-      String rebalanceStrategyName, List<String> partitions, String 
resourceName,
+  protected RebalanceStrategy<T> getRebalanceStrategy(String 
rebalanceStrategyName, List<String> partitions, String resourceName,
       LinkedHashMap<String, Integer> stateCountMap, int maxPartition) {
     RebalanceStrategy rebalanceStrategy;
-    if (rebalanceStrategyName == null || rebalanceStrategyName
-        .equalsIgnoreCase(RebalanceStrategy.DEFAULT_REBALANCE_STRATEGY)) {
+    if (rebalanceStrategyName == null || 
rebalanceStrategyName.equalsIgnoreCase(RebalanceStrategy.DEFAULT_REBALANCE_STRATEGY))
 {
       rebalanceStrategy =
           new AutoRebalanceStrategy(resourceName, partitions, stateCountMap, 
maxPartition);
     } else {
       try {
-        rebalanceStrategy = RebalanceStrategy.class
-            .cast(HelixUtil.loadClass(getClass(), 
rebalanceStrategyName).newInstance());
+        rebalanceStrategy = RebalanceStrategy.class.cast(
+            HelixUtil.loadClass(getClass(), 
rebalanceStrategyName).newInstance());
         rebalanceStrategy.init(resourceName, partitions, stateCountMap, 
maxPartition);
       } catch (ClassNotFoundException ex) {
         throw new HelixException(
@@ -204,9 +200,35 @@ public abstract class AbstractRebalancer<T extends 
BaseControllerDataProvider> i
       CurrentStateOutput currentStateOutput, Set<String> 
disabledInstancesForPartition,
       IdealState idealState, ClusterConfig clusterConfig, Partition partition,
       MonitoredAbnormalResolver monitoredResolver) {
+    return computeBestPossibleStateForPartition(liveInstances, stateModelDef, 
preferenceList,
+        currentStateOutput, disabledInstancesForPartition, idealState, 
clusterConfig, partition,
+        monitoredResolver, null);
+  }
+
+  /**
+   * Compute best state for partition in AUTO ideal state mode.
+   * @param liveInstances
+   * @param stateModelDef
+   * @param preferenceList
+   * @param currentStateOutput instance->state for each partition
+   * @param disabledInstancesForPartition
+   * @param idealState
+   * @param clusterConfig
+   * @param partition
+   * @param monitoredResolver
+   * @param cache
+   * @return
+   */
+  protected Map<String, String> 
computeBestPossibleStateForPartition(Set<String> liveInstances,
+      StateModelDefinition stateModelDef, List<String> preferenceList,
+      CurrentStateOutput currentStateOutput, Set<String> 
disabledInstancesForPartition,
+      IdealState idealState, ClusterConfig clusterConfig, Partition partition,
+      MonitoredAbnormalResolver monitoredResolver, T cache) {
+
     Optional<Map<String, String>> optionalOverwrittenStates =
         computeStatesOverwriteForPartition(stateModelDef, preferenceList, 
currentStateOutput,
             idealState, partition, monitoredResolver);
+
     if (optionalOverwrittenStates.isPresent()) {
       return optionalOverwrittenStates.get();
     }
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java
 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java
index 89f0a76b1..ad36b5019 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java
@@ -35,6 +35,7 @@ import 
org.apache.helix.api.config.StateTransitionThrottleConfig;
 import 
org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
 import 
org.apache.helix.controller.rebalancer.constraint.MonitoredAbnormalResolver;
 import org.apache.helix.controller.rebalancer.util.DelayedRebalanceUtil;
+import org.apache.helix.controller.rebalancer.util.WagedValidationUtil;
 import org.apache.helix.controller.stages.CurrentStateOutput;
 import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.IdealState;
@@ -254,6 +255,7 @@ public class DelayedAutoRebalancer extends 
AbstractRebalancer<ResourceController
     String stateModelDefName = idealState.getStateModelDefRef();
     StateModelDefinition stateModelDef = 
cache.getStateModelDef(stateModelDefName);
     ResourceAssignment partitionMapping = new 
ResourceAssignment(resource.getResourceName());
+
     for (Partition partition : resource.getPartitions()) {
       Set<String> disabledInstancesForPartition =
           cache.getDisabledInstancesForPartition(resource.getResourceName(), 
partition.toString());
@@ -261,7 +263,7 @@ public class DelayedAutoRebalancer extends 
AbstractRebalancer<ResourceController
       Map<String, String> bestStateForPartition =
           computeBestPossibleStateForPartition(liveNodes, stateModelDef, 
preferenceList,
               currentStateOutput, disabledInstancesForPartition, idealState, 
clusterConfig,
-              partition, cache.getAbnormalStateResolver(stateModelDefName));
+              partition, cache.getAbnormalStateResolver(stateModelDefName), 
cache);
 
       partitionMapping.addReplicaMap(partition, bestStateForPartition);
     }
@@ -274,12 +276,26 @@ public class DelayedAutoRebalancer extends 
AbstractRebalancer<ResourceController
     return partitionMapping;
   }
 
+
   @Override
   protected Map<String, String> 
computeBestPossibleStateForPartition(Set<String> liveInstances,
       StateModelDefinition stateModelDef, List<String> preferenceList,
       CurrentStateOutput currentStateOutput, Set<String> 
disabledInstancesForPartition,
       IdealState idealState, ClusterConfig clusterConfig, Partition partition,
       MonitoredAbnormalResolver monitoredResolver) {
+
+    return computeBestPossibleStateForPartition(liveInstances, stateModelDef, 
preferenceList,
+        currentStateOutput, disabledInstancesForPartition, idealState, 
clusterConfig, partition,
+        monitoredResolver, null);
+  }
+
+  @Override
+  protected Map<String, String> 
computeBestPossibleStateForPartition(Set<String> liveInstances,
+      StateModelDefinition stateModelDef, List<String> preferenceList,
+      CurrentStateOutput currentStateOutput, Set<String> 
disabledInstancesForPartition,
+      IdealState idealState, ClusterConfig clusterConfig, Partition partition,
+      MonitoredAbnormalResolver monitoredResolver, 
ResourceControllerDataProvider cache) {
+
     Optional<Map<String, String>> optionalOverwrittenStates =
         computeStatesOverwriteForPartition(stateModelDef, preferenceList, 
currentStateOutput,
             idealState, partition, monitoredResolver);
@@ -318,7 +334,7 @@ public class DelayedAutoRebalancer extends 
AbstractRebalancer<ResourceController
     if (preferenceList == null) {
       preferenceList = Collections.emptyList();
     }
-
+    boolean isPreferenceListEmpty = preferenceList.isEmpty();
     int numExtraReplicas = getNumExtraReplicas(clusterConfig);
 
     // TODO : Keep the behavior consistent with existing state count, change 
back to read from idealstate
@@ -342,6 +358,26 @@ public class DelayedAutoRebalancer extends 
AbstractRebalancer<ResourceController
     combinedPreferenceList.addAll(currentInstances);
     combinedPreferenceList.sort(new 
PreferenceListNodeComparator(currentStateMap, stateModelDef, preferenceList));
 
+    // if preference list is not empty, and we do have new intanceToAdd, we
+    // should check if it has capacity to hold the partition.
+    boolean isWaged = WagedValidationUtil.isWagedEnabled(idealState) && cache 
!= null;
+    if (isWaged && !isPreferenceListEmpty && instanceToAdd.size() > 0) {
+      // check instanceToAdd instance appears in combinedPreferenceList
+      for (String instance : instanceToAdd) {
+        if (combinedPreferenceList.contains(instance)) {
+          if (!cache.checkAndReduceCapacity(instance, 
idealState.getResourceName(),
+              partition.getPartitionName())) {
+            // if instanceToAdd instance has no capacity to hold the 
partition, we should
+            // remove it from combinedPreferenceList
+            LOG.info("Instance: {} has no capacity to hold resource: {}, 
partition: {}, removing "
+                + "it from combinedPreferenceList.", instance, 
idealState.getResourceName(),
+                partition.getPartitionName());
+            combinedPreferenceList.remove(instance);
+          }
+        }
+      }
+    }
+
     // Assign states to instances with the combined preference list.
     Map<String, String> bestPossibleStateMap =
         computeBestPossibleMap(combinedPreferenceList, stateModelDef, 
currentStateMap,
@@ -372,7 +408,6 @@ public class DelayedAutoRebalancer extends 
AbstractRebalancer<ResourceController
         bestPossibleStateMap.put(instance, HelixDefinedState.ERROR.name());
       }
     }
-
     return bestPossibleStateMap;
   }
 
@@ -388,7 +423,6 @@ public class DelayedAutoRebalancer extends 
AbstractRebalancer<ResourceController
         return false;
       }
     }
-
     return true;
   }
 
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/WagedRebalanceUtil.java
 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/WagedRebalanceUtil.java
index c43fda68d..7148fddec 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/WagedRebalanceUtil.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/WagedRebalanceUtil.java
@@ -20,6 +20,7 @@ package org.apache.helix.controller.rebalancer.util;
  */
 
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.Map;
 import org.apache.helix.HelixRebalanceException;
 import org.apache.helix.controller.rebalancer.waged.RebalanceAlgorithm;
@@ -61,7 +62,7 @@ public class WagedRebalanceUtil {
       ResourceConfig resourceConfig, ClusterConfig clusterConfig) {
     Map<String, Map<String, Integer>> capacityMap;
     try {
-      capacityMap = resourceConfig.getPartitionCapacityMap();
+      capacityMap = resourceConfig == null ? new HashMap<>() : 
resourceConfig.getPartitionCapacityMap();
     } catch (IOException ex) {
       throw new IllegalArgumentException(
           "Invalid partition capacity configuration of resource: " + 
resourceConfig
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedInstanceCapacity.java
 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedInstanceCapacity.java
index 02c869a5a..ad738ce66 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedInstanceCapacity.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedInstanceCapacity.java
@@ -19,21 +19,23 @@ package org.apache.helix.controller.rebalancer.waged;
  * under the License.
  */
 
-import java.io.IOException;
+import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
-import javax.annotation.Nullable;
+import java.util.Set;
 
+import org.apache.helix.HelixDefinedState;
 import org.apache.helix.controller.rebalancer.util.WagedValidationUtil;
 import org.apache.helix.controller.stages.CurrentStateOutput;
 import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.Message;
 import org.apache.helix.model.Partition;
-import org.apache.helix.model.ResourceConfig;
+import org.apache.helix.model.Resource;
 import org.apache.helix.controller.dataproviders.InstanceCapacityDataProvider;
 import 
org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
-import org.apache.helix.controller.rebalancer.util.WagedValidationUtil;
+import org.apache.helix.model.StateModelDefinition;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -42,85 +44,133 @@ public class WagedInstanceCapacity implements 
InstanceCapacityDataProvider {
 
   // Available Capacity per Instance
   private final Map<String, Map<String, Integer>> _instanceCapacityMap;
-  private final ResourceControllerDataProvider _cache;
+  private final Map<String, Map<String, Set<String>>> _allocatedPartitionsMap;
 
   public WagedInstanceCapacity(ResourceControllerDataProvider clusterData) {
-    _cache = clusterData;
     _instanceCapacityMap = new HashMap<>();
-
-    ClusterConfig clusterConfig = _cache.getClusterConfig();
-    for (InstanceConfig instanceConfig : 
_cache.getInstanceConfigMap().values()) {
-      Map<String, Integer> instanceCapacity =
-        WagedValidationUtil.validateAndGetInstanceCapacity(clusterConfig, 
instanceConfig);
+    _allocatedPartitionsMap = new HashMap<>();
+    ClusterConfig clusterConfig = clusterData.getClusterConfig();
+    if (clusterConfig == null) {
+      LOG.error("Cluster config is null, cannot initialize instance capacity 
map.");
+      return;
+    }
+    for (InstanceConfig instanceConfig : 
clusterData.getInstanceConfigMap().values()) {
+      Map<String, Integer> instanceCapacity = 
WagedValidationUtil.validateAndGetInstanceCapacity(clusterConfig, 
instanceConfig);
       _instanceCapacityMap.put(instanceConfig.getInstanceName(), 
instanceCapacity);
+      _allocatedPartitionsMap.put(instanceConfig.getInstanceName(), new 
HashMap<>());
     }
   }
 
-  /**
-   * Create Default Capacity Map.
-   * This is a utility method to create a default capacity map matching 
instance capacity map for participants.
-   * This is required as non-WAGED partitions will be placed on same instance 
and we don't know their actual capacity.
-   * This will generate default values of 0 for all the capacity keys.
-   */
-  private Map<String, Integer> createDefaultParticipantWeight() {
-    // copy the value of first Instance capacity.
-    Map<String, Integer> partCapacity = new 
HashMap<>(_instanceCapacityMap.values().iterator().next());
-
-    // Set the value of all capacity to -1.
-    for (String key : partCapacity.keySet()) {
-      partCapacity.put(key, -1);
+  // Helper methods.
+  // TODO: Currently, we don't allow double-accounting. But there may be
+  // future scenarios, where we may want to allow. 
+  private boolean hasPartitionChargedForCapacity(String instance, String 
resource, String partition) {
+    if (!_allocatedPartitionsMap.containsKey(instance)) {
+      _allocatedPartitionsMap.put(instance, new HashMap<>());
+      return false;
     }
-    return partCapacity;
+    return _allocatedPartitionsMap.get(instance).containsKey(resource)
+        && 
_allocatedPartitionsMap.get(instance).get(resource).contains(partition);
+  }
+
+  public void process(ResourceControllerDataProvider cache, CurrentStateOutput 
currentStateOutput,
+      Map<String, Resource> resourceMap, WagedResourceWeightsProvider 
weightProvider) {
+    processCurrentState(cache, currentStateOutput, resourceMap, 
weightProvider);
+    processPendingMessages(cache, currentStateOutput, resourceMap, 
weightProvider);
   }
 
   /**
    * Process the pending messages based on the Current states
    * @param currentState - Current state of the resources.
    */
-  public void processPendingMessages(CurrentStateOutput currentState) {
-    Map<String, Map<Partition, Map<String, Message>>> pendingMsgs = 
currentState.getPendingMessages();
+  public void processPendingMessages(ResourceControllerDataProvider cache,
+      CurrentStateOutput currentState, Map<String, Resource> resourceMap,
+      WagedResourceWeightsProvider weightProvider) {
 
-    for (String resource : pendingMsgs.keySet()) {
-      Map<Partition, Map<String, Message>> partitionMsgs = 
pendingMsgs.get(resource);
+    for (Map.Entry<String, Resource> resourceEntry : resourceMap.entrySet()) {
+      String resName = resourceEntry.getKey();
+      Resource resource = resourceEntry.getValue();
 
-      for (Partition partition : partitionMsgs.keySet()) {
-        String partitionName = partition.getPartitionName();
+      // if Resource is WAGED managed, then we need to manage the capacity.
+      if (!WagedValidationUtil.isWagedEnabled(cache.getIdealState(resName))) {
+        continue;
+      }
 
-        // Get Partition Weight
-        Map<String, Integer> partCapacity = getPartitionCapacity(resource, 
partitionName);
+      // list of partitions in the resource
+      Collection<Partition> partitions = resource.getPartitions();
+      // State model definition for the resource
+      StateModelDefinition stateModelDef = 
cache.getStateModelDef(resource.getStateModelDefRef());
+      if (stateModelDef == null) {
+        LOG.warn("State Model Definition for resource: " + resName + " is 
null");
+        continue;
+      }
+      Map<String, Integer> statePriorityMap = 
stateModelDef.getStatePriorityMap();
 
-        // TODO - check
-        Map<String, Message> msgs = partitionMsgs.get(partition);
-        // TODO - Check
-        for (String instance : msgs.keySet()) {
-           reduceAvailableInstanceCapacity(instance, partCapacity);
+      for (Partition partition : partitions) {
+        String partitionName = partition.getPartitionName();
+        // Get Partition Weight
+        Map<String, Integer> partCapacity = 
weightProvider.getPartitionWeights(resName, partitionName);
+        if (partCapacity == null || partCapacity.isEmpty()) {
+          LOG.info("Partition: " + partitionName + " in resource: " + resName
+              + " has no weight specified. Skipping it.");
+          continue;
+        }
+        // Get the pending messages for the partition
+        Map<String, Message> pendingMessages = 
currentState.getPendingMessageMap(resName, partition);
+        if (pendingMessages != null && !pendingMessages.isEmpty()) {
+          for (Map.Entry<String, Message> entry :  pendingMessages.entrySet()) 
{
+            String instance = entry.getKey();
+            if (hasPartitionChargedForCapacity(instance, resName, 
partitionName)) {
+              continue;
+            }
+            Message msg = entry.getValue();
+            if (statePriorityMap.get(msg.getFromState()) < 
statePriorityMap.get(msg.getToState())
+                && msg.getToState().equals(stateModelDef.getInitialState())
+                || 
msg.getToState().equals(HelixDefinedState.DROPPED.toString())) {
+              checkAndReduceInstanceCapacity(instance, resName, partitionName, 
partCapacity);
+            }
+          }
         }
       }
     }
   }
 
-  /**
-   * Get the partition capacity given Resource and Partition name.
-   */
-  private Map<String, Integer> getPartitionCapacity(String resource, String 
partition) {
-    ClusterConfig clusterConfig = _cache.getClusterConfig();
-    ResourceConfig resourceConfig = _cache.getResourceConfig(resource);
+  private void processCurrentState(ResourceControllerDataProvider cache,
+      CurrentStateOutput currentStateOutput, Map<String, Resource> resourceMap,
+      WagedResourceWeightsProvider weightProvider) {
+
+    // Iterate through all the resources
+    for (Map.Entry<String, Resource> entry : resourceMap.entrySet()) {
+      String resName = entry.getKey();
+      Resource resource = entry.getValue();
+
+      // if Resource is WAGED managed, then we need to manage the capacity.
+      if (!WagedValidationUtil.isWagedEnabled(cache.getIdealState(resName))) {
+        continue;
+      }
 
+      // list of partitions in the resource
+      Collection<Partition> partitions = resource.getPartitions();
 
-    // Parse the entire capacityMap from ResourceConfig
-    Map<String, Map<String, Integer>> capacityMap;
-    try {
-      capacityMap = resourceConfig.getPartitionCapacityMap();
-    } catch (IOException ex) {
-      return createDefaultParticipantWeight();
+      for (Partition partition : partitions) {
+        String partitionName = partition.getPartitionName();
+        // Get Partition Weight
+        Map<String, Integer> partCapacity = 
weightProvider.getPartitionWeights(resName, partitionName);
+        // Get the current state for the partition
+        Map<String, String> currentStateMap = 
currentStateOutput.getCurrentStateMap(resName, partition);
+        if (currentStateMap != null && !currentStateMap.isEmpty()) {
+          for (String instance : currentStateMap.keySet()) {
+            checkAndReduceInstanceCapacity(instance, resName, partitionName, 
partCapacity);
+          }
+        }
+      }
     }
-    return WagedValidationUtil.validateAndGetPartitionCapacity(partition, 
resourceConfig, capacityMap, clusterConfig);
   }
 
   /**
    * Get the instance remaining capacity.
    * Capacity and weight both are represented as Key-Value.
-   * Returns the capacity map of available head room for the instance.
+   * Returns the capacity map of available headroom for the instance.
    * @param instanceName - instance name to query
    * @return Map<String, Integer> - capacity pair for all defined attributes 
for the instance.
    */
@@ -141,20 +191,32 @@ public class WagedInstanceCapacity implements 
InstanceCapacityDataProvider {
     return true;
   }
 
-  @Override
-  public boolean reduceAvailableInstanceCapacity(String instance, Map<String, 
Integer> partitionCapacity) {
+
+  public synchronized boolean checkAndReduceInstanceCapacity(String instance, 
String resName,
+      String partitionName, Map<String, Integer> partitionCapacity) {
+
+    if (hasPartitionChargedForCapacity(instance, resName, partitionName)) {
+      return true;
+    }
+
     Map<String, Integer> instanceCapacity = _instanceCapacityMap.get(instance);
+    Map<String, Integer> processedCapacity = new HashMap<>();
     for (String key : instanceCapacity.keySet()) {
       if (partitionCapacity.containsKey(key)) {
-        int partCapacity = partitionCapacity.getOrDefault(key, 0);
-        if (partCapacity != 0 && instanceCapacity.get(key) < partCapacity) {
+        int partCapacity = partitionCapacity.get(key);
+        if (instanceCapacity.get(key) < partCapacity) {
+          // reset the processed capacity.
+          for (String processedKey : processedCapacity.keySet()) {
+            instanceCapacity.put(processedKey, 
instanceCapacity.get(processedKey) + processedCapacity.get(processedKey));
+          }
           return false;
         }
-        if (partCapacity != 0) {
-          instanceCapacity.put(key, instanceCapacity.get(key) - partCapacity);
-        }
+        instanceCapacity.put(key, instanceCapacity.get(key) - partCapacity);
+        processedCapacity.put(key, partCapacity);
       }
     }
+    _allocatedPartitionsMap.computeIfAbsent(instance, k -> new HashMap<>())
+        .computeIfAbsent(resName, k -> new HashSet<>()).add(partitionName);
     return true;
   }
 }
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
 
b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
index c2af5d156..4eb4004af 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
@@ -35,6 +35,8 @@ import org.apache.helix.controller.pipeline.AbstractBaseStage;
 import org.apache.helix.controller.pipeline.StageException;
 import org.apache.helix.controller.rebalancer.util.ResourceUsageCalculator;
 import org.apache.helix.controller.rebalancer.util.WagedValidationUtil;
+import org.apache.helix.controller.rebalancer.waged.WagedInstanceCapacity;
+import 
org.apache.helix.controller.rebalancer.waged.WagedResourceWeightsProvider;
 import org.apache.helix.controller.rebalancer.waged.model.AssignableNode;
 import org.apache.helix.controller.rebalancer.waged.model.ClusterModel;
 import org.apache.helix.controller.rebalancer.waged.model.ClusterModelProvider;
@@ -110,6 +112,13 @@ public class CurrentStateComputationStage extends 
AbstractBaseStage {
           currentStateOutput);
       
reportResourcePartitionCapacityMetrics(dataProvider.getAsyncTasksThreadPool(),
           clusterStatusMonitor, dataProvider.getResourceConfigMap().values());
+
+      WagedInstanceCapacity capacityProvider = new 
WagedInstanceCapacity(dataProvider);
+      WagedResourceWeightsProvider weightProvider = new 
WagedResourceWeightsProvider(dataProvider);
+
+      // Process the currentState and update the available instance capacity.
+      capacityProvider.process(dataProvider, currentStateOutput, resourceMap, 
weightProvider);
+      dataProvider.setWagedCapacityProviders(capacityProvider, weightProvider);
     }
   }
 
diff --git 
a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
 
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
index 2836af745..fff248282 100644
--- 
a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
+++ 
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
@@ -139,6 +139,9 @@ public class TestWagedRebalancer extends 
AbstractTestClusterModel {
     when(clusterData.getRefreshedChangeTypes())
         
.thenReturn(Collections.singleton(HelixConstants.ChangeType.CLUSTER_CONFIG));
 
+    when(clusterData.checkAndReduceCapacity(Mockito.any(), Mockito.any(),
+        Mockito.any())).thenReturn(true);
+
     Map<String, IdealState> newIdealStates =
         rebalancer.computeNewIdealStates(clusterData, resourceMap, new 
CurrentStateOutput());
     Map<String, ResourceAssignment> algorithmResult = 
_algorithm.getRebalanceResult();
@@ -894,7 +897,7 @@ public class TestWagedRebalancer extends 
AbstractTestClusterModel {
           return resource;
         }));
     WagedInstanceCapacity provider = new WagedInstanceCapacity(clusterData);
-   
+
     Map<String, Integer> weights1 = Map.of("item1", 20, "item2", 40, "item3", 
30);
     Map<String, Integer> capacity = 
provider.getInstanceAvailableCapacity("testInstanceId");
     
Assert.assertEquals(provider.getInstanceAvailableCapacity("testInstanceId"), 
weights1);
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalanceHardConstraint.java
 
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedClusterExpansion.java
similarity index 56%
copy from 
helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalanceHardConstraint.java
copy to 
helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedClusterExpansion.java
index 07f39f966..4778d9b1e 100644
--- 
a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalanceHardConstraint.java
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedClusterExpansion.java
@@ -30,7 +30,7 @@ import java.util.Set;
 
 import com.google.common.collect.ImmutableMap;
 import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.TestHelper;
+import org.apache.helix.NotificationContext;
 import org.apache.helix.common.ZkTestBase;
 import org.apache.helix.controller.rebalancer.waged.AssignmentMetadataStore;
 import org.apache.helix.integration.manager.ClusterControllerManager;
@@ -39,14 +39,15 @@ import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZkBucketDataAccessor;
 import org.apache.helix.model.BuiltInStateModelDefinitions;
 import org.apache.helix.model.ClusterConfig;
-import org.apache.helix.model.CurrentState;
-import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
-import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.Message;
 import org.apache.helix.model.ResourceAssignment;
 import org.apache.helix.model.ResourceConfig;
-import org.apache.helix.tools.ClusterVerifiers.HelixClusterVerifier;
-import org.apache.helix.tools.ClusterVerifiers.StrictMatchExternalViewVerifier;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.apache.helix.participant.statemachine.StateModelInfo;
+import org.apache.helix.participant.statemachine.Transition;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
@@ -65,13 +66,13 @@ import org.testng.annotations.Test;
  * cluster and increasing the weight for some resource and checking if
  * intermediate state satisfies the need or not.
  */
-public class TestWagedRebalanceHardConstraint extends ZkTestBase {
+public class TestWagedClusterExpansion extends ZkTestBase {
   protected final int NUM_NODE = 6;
   protected static final int START_PORT = 13000;
   protected static final int PARTITIONS = 10;
 
-  protected final String CLASS_NAME = getShortClassName();
-  protected final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + CLASS_NAME;
+  protected static final String CLASS_NAME = 
TestWagedClusterExpansion.class.getSimpleName();
+  protected static final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + 
CLASS_NAME;
   protected ClusterControllerManager _controller;
   protected AssignmentMetadataStore _assignmentMetadataStore;
 
@@ -80,12 +81,85 @@ public class TestWagedRebalanceHardConstraint extends 
ZkTestBase {
   List<String> _nodes = new ArrayList<>();
   private final Set<String> _allDBs = new HashSet<>();
   private final int _replica = 3;
-
+  private final int INSTANCE_CAPACITY = 100;
+  private final int DEFAULT_PARTITION_CAPACITY = 6;
+  private final int INCREASED_PARTITION_CAPACITY = 10;
+  private final int DEFAULT_DELAY = 500; // 0.5 second
   private final String  _testCapacityKey = "TestCapacityKey";
+  private final String  _resourceChanged = "Test-WagedDB-0";
   private final Map<String, IdealState> _prevIdealState = new HashMap<>();
 
-  private final Map<String,List<String>> _instanceUsage = new HashMap<>();
-  private static final Logger LOG = 
LoggerFactory.getLogger("TestWagedRebalanceHardConstraint");
+  private static final Logger LOG = LoggerFactory.getLogger(CLASS_NAME);
+
+
+  // mock delay master-slave state model
+  @StateModelInfo(initialState = "OFFLINE", states = {
+      "MASTER", "SLAVE", "ERROR"
+  })
+  public static class WagedMasterSlaveModel extends StateModel {
+    private static final Logger LOG = 
LoggerFactory.getLogger(WagedMasterSlaveModel.class);
+    private final long _delay;
+
+    public WagedMasterSlaveModel(long delay) {
+      _delay = delay;
+    }
+
+    @Transition(to = "SLAVE", from = "OFFLINE")
+    public void onBecomeSlaveFromOffline(Message message, NotificationContext 
context) {
+      LOG.info("Become SLAVE from OFFLINE");
+    }
+
+    @Transition(to = "MASTER", from = "SLAVE")
+    public void onBecomeMasterFromSlave(Message message, NotificationContext 
context)
+        throws InterruptedException {
+      LOG.info("Become MASTER from SLAVE");
+    }
+
+    @Transition(to = "SLAVE", from = "MASTER")
+    public void onBecomeSlaveFromMaster(Message message, NotificationContext 
context) {
+      LOG.info("Become Slave from Master");
+    }
+
+    @Transition(to = "OFFLINE", from = "SLAVE")
+    public void onBecomeOfflineFromSlave(Message message, NotificationContext 
context) {
+      if (_delay > 0) {
+        try {
+          Thread.currentThread().sleep(_delay);
+        } catch (InterruptedException e) {
+          // ignore
+        }
+       }
+      LOG.info("Become OFFLINE from SLAVE");
+    }
+
+    @Transition(to = "DROPPED", from = "OFFLINE")
+    public void onBecomeDroppedFromOffline(Message message, 
NotificationContext context) {
+      if (_delay > 0) {
+        try {
+          Thread.currentThread().sleep(_delay);
+        } catch (InterruptedException e) {
+          // ignore
+        }
+      }
+      LOG.info("Become DROPPED FROM OFFLINE");
+    }
+  }
+
+  public class WagedDelayMSStateModelFactory extends 
StateModelFactory<WagedMasterSlaveModel> {
+    private long _delay;
+
+    @Override
+    public WagedMasterSlaveModel createNewStateModel(String resourceName,
+        String partitionKey) {
+      WagedMasterSlaveModel model = new WagedMasterSlaveModel(_delay);
+      return model;
+    }
+
+    public WagedDelayMSStateModelFactory setDelay(long delay) {
+      _delay = delay;
+      return this;
+    }
+  }
 
   @BeforeClass
   public void beforeClass() throws Exception {
@@ -98,13 +172,8 @@ public class TestWagedRebalanceHardConstraint extends 
ZkTestBase {
       _gSetupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
       _nodes.add(storageNodeName);
     }
-
-    // start dummy participants
-    for (String node : _nodes) {
-      MockParticipantManager participant = new MockParticipantManager(ZK_ADDR, 
CLUSTER_NAME, node);
-      participant.syncStart();
-      _participants.add(participant);
-    }
+    // Downward state-transition messages are delayed by DEFAULT_DELAY (0.5 sec).
+    startParticipants(DEFAULT_DELAY);
 
     // start controller
     String controllerName = CONTROLLER_PREFIX + "_0";
@@ -113,9 +182,6 @@ public class TestWagedRebalanceHardConstraint extends 
ZkTestBase {
 
     enablePersistBestPossibleAssignment(_gZkClient, CLUSTER_NAME, true);
 
-    // This is same as TestWagedRebalance
-    // It's a hacky way to workaround the package restriction. Note that we 
still want to hide the
-    // AssignmentMetadataStore constructor to prevent unexpected update to the 
assignment records.
     _assignmentMetadataStore =
         new AssignmentMetadataStore(new ZkBucketDataAccessor(ZK_ADDR), 
CLUSTER_NAME) {
           public Map<String, ResourceAssignment> getBaseline() {
@@ -137,16 +203,10 @@ public class TestWagedRebalanceHardConstraint extends 
ZkTestBase {
         dataAccessor.getProperty(dataAccessor.keyBuilder().clusterConfig());
 
     
clusterConfig.setInstanceCapacityKeys(Collections.singletonList(_testCapacityKey));
-    
clusterConfig.setDefaultInstanceCapacityMap(Collections.singletonMap(_testCapacityKey,
 100));
-    // Calculation is: 6 nodes, total capacity 600
-    // 3 resources * 10 partitions each with 3 replica = 90.
-    // Total capacity is 600, so each partition should be 6 units.
-    
clusterConfig.setDefaultPartitionWeightMap(Collections.singletonMap(_testCapacityKey,
 6));
+    
clusterConfig.setDefaultInstanceCapacityMap(Collections.singletonMap(_testCapacityKey,
 INSTANCE_CAPACITY));
+    
clusterConfig.setDefaultPartitionWeightMap(Collections.singletonMap(_testCapacityKey,
 DEFAULT_PARTITION_CAPACITY));
     dataAccessor.setProperty(dataAccessor.keyBuilder().clusterConfig(), 
clusterConfig);
-  }
 
-  @Test
-  public void testInitialPlacement() {
     // Create 3 resources with 10 partitions each.
     for (int i = 0; i < 3; i++) {
       String db = "Test-WagedDB-" + i;
@@ -155,10 +215,24 @@ public class TestWagedRebalanceHardConstraint extends 
ZkTestBase {
       _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica);
       _allDBs.add(db);
     }
-    validate();
   }
 
-  @Test(dependsOnMethods = { "testInitialPlacement"})
+  private void startParticipants(int delay) {
+    // start dummy participants
+    for (String node : _nodes) {
+      MockParticipantManager participant = new MockParticipantManager(ZK_ADDR, 
CLUSTER_NAME, node);
+      StateMachineEngine stateMach = participant.getStateMachineEngine();
+      TestWagedClusterExpansion.WagedDelayMSStateModelFactory delayFactory =
+          new 
TestWagedClusterExpansion.WagedDelayMSStateModelFactory().setDelay(delay);
+      stateMach.registerStateModelFactory("MasterSlave", delayFactory);
+      participant.syncStart();
+      _participants.add(participant);
+    }
+  }
+
+  // This test case, first adds a new instance which will cause STs.
+  // Next, it will try to increase the default weight for one resource.
+  @Test
   public void testIncreaseResourcePartitionWeight() throws Exception {
     // For this test, let us record our initial prevIdealState.
     for (String db : _allDBs) {
@@ -171,132 +245,117 @@ public class TestWagedRebalanceHardConstraint extends 
ZkTestBase {
     String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + 
NUM_NODE);
     _gSetupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
     _nodes.add(storageNodeName);
+
+    // Start the participant.
     MockParticipantManager participant = new MockParticipantManager(ZK_ADDR, 
CLUSTER_NAME, storageNodeName);
+    StateMachineEngine stateMach = participant.getStateMachineEngine();
+    TestWagedClusterExpansion.WagedDelayMSStateModelFactory delayFactory =
+        new 
TestWagedClusterExpansion.WagedDelayMSStateModelFactory().setDelay(DEFAULT_DELAY);
+    stateMach.registerStateModelFactory("MasterSlave", delayFactory);
     participant.syncStart();
     _participants.add(participant);
 
-    // This is to make sure we have run the pipeline.
-    Thread.currentThread().sleep(2000);
+    // Check the modified time of the first resource's external view:
+    // if the pipeline has run, the external view will have been re-persisted.
+    waitForPipeline(100, 3000);
 
     LOG.info("After adding the new instance");
-    validateIdealState(false);
+    validateIdealState(false /* afterWeightChange */);
 
     // Update the weight for one of the resource.
     HelixDataAccessor dataAccessor = new ZKHelixDataAccessor(CLUSTER_NAME, 
_baseAccessor);
-    String db = "Test-WagedDB-0";
+    String db = _resourceChanged;
     ResourceConfig resourceConfig = 
dataAccessor.getProperty(dataAccessor.keyBuilder().resourceConfig(db));
     if (resourceConfig == null) {
       resourceConfig = new ResourceConfig(db);
     }
-    Map<String, Integer> capacityDataMap = ImmutableMap.of(_testCapacityKey, 
10);
+    Map<String, Integer> capacityDataMap = ImmutableMap.of(_testCapacityKey, 
INCREASED_PARTITION_CAPACITY);
     resourceConfig.setPartitionCapacityMap(
         Collections.singletonMap(ResourceConfig.DEFAULT_PARTITION_KEY, 
capacityDataMap));
     dataAccessor.setProperty(dataAccessor.keyBuilder().resourceConfig(db), 
resourceConfig);
 
     // Make sure pipeline is run.
-    Thread.currentThread().sleep(2000);
+    waitForPipeline(100, 10000); // 10 sec. max timeout.
 
     LOG.info("After changing resource partition weight");
-    validate();
-    validateIdealState(true);
-    printCurrentState();
+    validateIdealState(true /* afterWeightChange */);
+    waitForPipeline(100, 3000); // this is for ZK to sync up.
   }
 
   @AfterClass
   public void afterClass() throws Exception {
-    if (_controller != null && _controller.isConnected()) {
-      _controller.syncStop();
-    }
-    for (MockParticipantManager p : _participants) {
-      if (p != null && p.isConnected()) {
-        p.syncStop();
-      }
-    }
-    deleteCluster(CLUSTER_NAME);
-  }
-
-  private void validate() {
-    HelixClusterVerifier _clusterVerifier =
-        new 
StrictMatchExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR)
-            .setDeactivatedNodeAwareness(true).setResources(_allDBs)
-            
.setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME)
-            .build();
     try {
-      Assert.assertTrue(_clusterVerifier.verify(5000));
-    } finally {
-      _clusterVerifier.close();
+      if (_controller != null && _controller.isConnected()) {
+        _controller.syncStop();
+      }
+      for (MockParticipantManager p : _participants) {
+        if (p != null && p.isConnected()) {
+          p.syncStop();
+        }
+      }
+      deleteCluster(CLUSTER_NAME);
+    } catch (Exception e) {
+      LOG.info("After class throwing exception, {}", e);
     }
   }
 
-  private void printCurrentState() {
-    List<String> instances =
-        
_gSetupTool.getClusterManagementTool().getInstancesInCluster(CLUSTER_NAME);
-    HelixDataAccessor dataAccessor = new ZKHelixDataAccessor(CLUSTER_NAME, 
_baseAccessor);
-
-    for (String instance : instances) {
-      LiveInstance liveInstance =
-          
dataAccessor.getProperty(dataAccessor.keyBuilder().liveInstance(instance));
-      String sessionId = liveInstance.getEphemeralOwner();
-      List<CurrentState> currentStates = 
dataAccessor.getChildValues(dataAccessor.keyBuilder().currentStates(instance, 
sessionId),
-          true);
-      LOG.info("\n\nCurrentState for instance: " + instance);
-      for (CurrentState currentState : currentStates) {
-          LOG.info("\t" + 
currentState.getRecord().getMapFields().keySet().toString());
+  private void waitForPipeline(long stepSleep, long maxTimeout) {
+    // Check the modified time of the first resource's external view:
+    // if the pipeline has run, the external view will have been re-persisted.
+    long startTime = System.currentTimeMillis();
+    while (System.currentTimeMillis() - startTime < maxTimeout) {
+      String db = _allDBs.iterator().next();
+      long modifiedTime = _gSetupTool.getClusterManagementTool().
+          getResourceExternalView(CLUSTER_NAME, 
db).getRecord().getModifiedTime();
+      if (modifiedTime - startTime > maxTimeout) {
+        break;
+      }
+      try {
+        Thread.currentThread().sleep(stepSleep);
+      } catch (InterruptedException e) {
+        throw new RuntimeException(e);
       }
     }
   }
+
   private void validateIdealState(boolean afterWeightChange) {
     // Calculate the instance to partition mapping based on previous and new 
ideal state.
-    // We will take the union of the two.
-    // For example: if prevIdealState for instance_0 has partition_0, 
partition_1
-    // and newIdealState for instance_0 has partition_1, partition_2, then the 
final
-    // mapping for instance_0 will be partition_0, partition_1, partition_2.
 
     Map<String, Set<String>> instanceToPartitionMap = new HashMap<>();
     for (String db : _allDBs) {
       IdealState newIdealState =
           
_gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
-      IdealState prevIdealState = _prevIdealState.get(db);
-
       for (String partition : newIdealState.getPartitionSet()) {
         Map<String, String> assignmentMap = 
newIdealState.getRecord().getMapField(partition);
+        List<String> preferenceList = 
newIdealState.getRecord().getListField(partition);
         for (String instance : assignmentMap.keySet()) {
+          if (!preferenceList.contains(instance)) {
+            LOG.error("Instance: " + instance + " is not in preference list 
for partition: " + partition);
+          }
           if (!instanceToPartitionMap.containsKey(instance)) {
             instanceToPartitionMap.put(instance, new HashSet<>());
           }
           instanceToPartitionMap.get(instance).add(partition);
         }
       }
-      if (prevIdealState != null) {
-        for (String partition : prevIdealState.getPartitionSet()) {
-          Map<String, String> assignmentMap = 
prevIdealState.getRecord().getMapField(partition);
-          for (String instance : assignmentMap.keySet()) {
-            if (!instanceToPartitionMap.containsKey(instance)) {
-              instanceToPartitionMap.put(instance, new HashSet<>());
-            }
-            instanceToPartitionMap.get(instance).add(partition);
-          }
-        }
-      }
-      _prevIdealState.put(db, newIdealState);
     }
+    // Now, let us validate the instance to partition mapping.
     for (String instance : instanceToPartitionMap.keySet()) {
       int usedInstanceCapacity = 0;
       for (String partition : instanceToPartitionMap.get(instance)) {
         LOG.info("\tPartition: " + partition);
-        if (partition.startsWith("Test-WagedDB-0")) {
-          if (afterWeightChange) {
-            usedInstanceCapacity += 10;
-          } else {
-            usedInstanceCapacity += 6;
-          }
+        if (afterWeightChange && partition.startsWith(_resourceChanged)) {
+            usedInstanceCapacity += INCREASED_PARTITION_CAPACITY;
         } else {
-          usedInstanceCapacity += 6;
+          usedInstanceCapacity += DEFAULT_PARTITION_CAPACITY;
         }
       }
-      LOG.info("\tIntance: " + instance + " used capacity: " + 
usedInstanceCapacity);
+      LOG.error("\tInstance: " + instance + " used capacity: " + 
usedInstanceCapacity);
       // The hard constraint is enforced below: used capacity must not exceed the
 instance capacity.
-      // Assert.assertTrue(usedInstanceCapacity <= 100);
+      if (usedInstanceCapacity > INSTANCE_CAPACITY) {
+        LOG.error(instanceToPartitionMap.get(instance).toString());
+      }
+      Assert.assertTrue(usedInstanceCapacity <= INSTANCE_CAPACITY);
     }
   }
 }
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalanceHardConstraint.java
 
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalanceHardConstraint.java
index 07f39f966..b320f7a83 100644
--- 
a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalanceHardConstraint.java
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalanceHardConstraint.java
@@ -197,7 +197,6 @@ public class TestWagedRebalanceHardConstraint extends 
ZkTestBase {
     Thread.currentThread().sleep(2000);
 
     LOG.info("After changing resource partition weight");
-    validate();
     validateIdealState(true);
     printCurrentState();
   }

Reply via email to