This is an automated email from the ASF dual-hosted git repository.
xyuanlu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new 84d89efaf Fix CurrentStateComputationStage when instance capacity is misconfigured (#2635)
84d89efaf is described below
commit 84d89efafa389ed1fa9449bf9c4708bcc3c72a5b
Author: xyuanlu <[email protected]>
AuthorDate: Wed Sep 27 18:31:42 2023 -0700
Fix CurrentStateComputationStage when instance capacity is misconfigured (#2635)
---
.../controller/rebalancer/waged/WagedInstanceCapacity.java | 12 ++++++++++--
.../controller/stages/CurrentStateComputationStage.java | 2 ++
.../stages/TestCurrentStateComputationStage.java | 14 +++++++++++++-
3 files changed, 25 insertions(+), 3 deletions(-)
diff --git
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedInstanceCapacity.java
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedInstanceCapacity.java
index 29ecf451d..cd19c301c 100644
---
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedInstanceCapacity.java
+++
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedInstanceCapacity.java
@@ -25,6 +25,7 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Set;
+import org.apache.helix.HelixException;
import org.apache.helix.controller.rebalancer.util.WagedValidationUtil;
import org.apache.helix.controller.stages.CurrentStateOutput;
import org.apache.helix.model.ClusterConfig;
@@ -54,8 +55,15 @@ public class WagedInstanceCapacity implements
InstanceCapacityDataProvider {
return;
}
for (InstanceConfig instanceConfig :
clusterData.getInstanceConfigMap().values()) {
- Map<String, Integer> instanceCapacity =
WagedValidationUtil.validateAndGetInstanceCapacity(clusterConfig,
instanceConfig);
- _instanceCapacityMap.put(instanceConfig.getInstanceName(),
instanceCapacity);
+ Map<String, Integer> instanceCapacity = null;
+ try {
+ instanceCapacity =
WagedValidationUtil.validateAndGetInstanceCapacity(clusterConfig,
instanceConfig);
+ } catch (HelixException ignore) {
+ // We don't want to throw exception here, it would be OK if no
resource is using Waged.
+ // Waged rebalancer will fail in later pipeline stage only for waged
resource. So it won't block other resources.
+ }
+ _instanceCapacityMap.put(instanceConfig.getInstanceName(),
+ instanceCapacity == null ? new HashMap<>() : instanceCapacity);
_allocatedPartitionsMap.put(instanceConfig.getInstanceName(), new
HashMap<>());
}
}
diff --git
a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
index 4eb4004af..51abca36b 100644
---
a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
+++
b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
@@ -113,6 +113,8 @@ public class CurrentStateComputationStage extends
AbstractBaseStage {
reportResourcePartitionCapacityMetrics(dataProvider.getAsyncTasksThreadPool(),
clusterStatusMonitor, dataProvider.getResourceConfigMap().values());
+ // TODO: we only need to compute when there are resource using Waged. We
should
+ // do this as perf improvement in future.
WagedInstanceCapacity capacityProvider = new
WagedInstanceCapacity(dataProvider);
WagedResourceWeightsProvider weightProvider = new
WagedResourceWeightsProvider(dataProvider);
diff --git
a/helix-core/src/test/java/org/apache/helix/controller/stages/TestCurrentStateComputationStage.java
b/helix-core/src/test/java/org/apache/helix/controller/stages/TestCurrentStateComputationStage.java
index 91e275aaf..f66638ec6 100644
---
a/helix-core/src/test/java/org/apache/helix/controller/stages/TestCurrentStateComputationStage.java
+++
b/helix-core/src/test/java/org/apache/helix/controller/stages/TestCurrentStateComputationStage.java
@@ -21,9 +21,13 @@ package org.apache.helix.controller.stages;
import java.util.HashMap;
import java.util.Map;
+import java.util.List;
import org.apache.helix.PropertyKey.Builder;
import
org.apache.helix.controller.dataproviders.WorkflowControllerDataProvider;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import
org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
import org.apache.helix.model.CurrentState;
@@ -40,9 +44,17 @@ public class TestCurrentStateComputationStage extends
BaseStageTest {
Map<String, Resource> resourceMap = getResourceMap();
event.addAttribute(AttributeName.RESOURCES.name(), resourceMap);
event.addAttribute(AttributeName.RESOURCES_TO_REBALANCE.name(),
resourceMap);
- event.addAttribute(AttributeName.ControllerDataProvider.name(), new
ResourceControllerDataProvider());
+ ResourceControllerDataProvider dataCache = new
ResourceControllerDataProvider();
+ event.addAttribute(AttributeName.ControllerDataProvider.name(), dataCache);
+ event.addAttribute(AttributeName.clusterStatusMonitor.name(), new
ClusterStatusMonitor(_clusterName));
CurrentStateComputationStage stage = new CurrentStateComputationStage();
runStage(event, new ReadClusterDataStage());
+ ClusterConfig clsCfg = dataCache.getClusterConfig();
+ clsCfg.setInstanceCapacityKeys(List.of("s1", "s2", "s3"));
+ dataCache.setClusterConfig(clsCfg);
+ dataCache.setInstanceConfigMap(Map.of(
+ "a", new InstanceConfig("a")
+ ));
runStage(event, stage);
CurrentStateOutput output =
event.getAttribute(AttributeName.CURRENT_STATE.name());
AssertJUnit.assertEquals(