This is an automated email from the ASF dual-hosted git repository.
jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new ce42376c8 WAGED n-n+1 had a bug wrt Pending message processing (#2567)
ce42376c8 is described below
commit ce42376c8db1647c04c76476d4e206ee484fb052
Author: Komal Desai <[email protected]>
AuthorDate: Thu Jul 20 20:39:56 2023 -0700
WAGED n-n+1 had a bug wrt Pending message processing (#2567)
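Fixed a logic bug in the WAGED n-n+1 pending-message handling and added log statements. The pending-message check now compares the message's from-state (rather than its to-state) against the state model's initial state, and no longer matches messages whose to-state is DROPPED.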
---
.../controller/rebalancer/waged/WagedInstanceCapacity.java | 14 ++++++++++----
1 file changed, 10 insertions(+), 4 deletions(-)
diff --git
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedInstanceCapacity.java
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedInstanceCapacity.java
index ad738ce66..1b878dbb3 100644
---
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedInstanceCapacity.java
+++
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedInstanceCapacity.java
@@ -63,7 +63,7 @@ public class WagedInstanceCapacity implements
InstanceCapacityDataProvider {
// Helper methods.
// TODO: Currently, we don't allow double-accounting. But there may be
- // future scenarios, where we may want to allow.
+ // future scenarios, where we may want to allow.
private boolean hasPartitionChargedForCapacity(String instance, String
resource, String partition) {
if (!_allocatedPartitionsMap.containsKey(instance)) {
_allocatedPartitionsMap.put(instance, new HashMap<>());
@@ -124,9 +124,11 @@ public class WagedInstanceCapacity implements
InstanceCapacityDataProvider {
continue;
}
Message msg = entry.getValue();
+ // If boot-strapping message is pending, we should deduct the
capacity.
if (statePriorityMap.get(msg.getFromState()) <
statePriorityMap.get(msg.getToState())
- && msg.getToState().equals(stateModelDef.getInitialState())
- ||
msg.getToState().equals(HelixDefinedState.DROPPED.toString())) {
+ && msg.getFromState().equals(stateModelDef.getInitialState()))
{
+ LOG.info("For bootstrappiing - deducting capacity for instance:
" + instance
+ + " for resource: " + resName + " for partition: " +
partitionName);
checkAndReduceInstanceCapacity(instance, resName, partitionName,
partCapacity);
}
}
@@ -196,6 +198,8 @@ public class WagedInstanceCapacity implements
InstanceCapacityDataProvider {
String partitionName, Map<String, Integer> partitionCapacity) {
if (hasPartitionChargedForCapacity(instance, resName, partitionName)) {
+ LOG.info("Instance: " + instance + " for resource: " + resName
+ + " for partition: " + partitionName + " already charged for
capacity.");
return true;
}
@@ -205,7 +209,7 @@ public class WagedInstanceCapacity implements
InstanceCapacityDataProvider {
if (partitionCapacity.containsKey(key)) {
int partCapacity = partitionCapacity.get(key);
if (instanceCapacity.get(key) < partCapacity) {
- // reset the processed capacity.
+ // rollback the previously processed capacity.
for (String processedKey : processedCapacity.keySet()) {
instanceCapacity.put(processedKey,
instanceCapacity.get(processedKey) + processedCapacity.get(processedKey));
}
@@ -217,6 +221,8 @@ public class WagedInstanceCapacity implements
InstanceCapacityDataProvider {
}
_allocatedPartitionsMap.computeIfAbsent(instance, k -> new HashMap<>())
.computeIfAbsent(resName, k -> new HashSet<>()).add(partitionName);
+ LOG.info("Reduced capacity for instance: " + instance + " for resource: "
+ resName
+ + " for partition: " + partitionName);
return true;
}
}