This is an automated email from the ASF dual-hosted git repository.
hulee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new 431a096 Add getIdealAssignmentForWagedFullAuto in HelixUtil for WAGED
rebalancer (#1031)
431a096 is described below
commit 431a0961c0366ddfaac7dfc08e6c1b0bcd26f87b
Author: Hunter Lee <[email protected]>
AuthorDate: Fri May 29 09:20:58 2020 -0700
Add getIdealAssignmentForWagedFullAuto in HelixUtil for WAGED rebalancer
(#1031)
This commit adds a method, getIdealAssignmentForWagedFullAuto() in
HelixUtil that returns to the user the cluster-wide assignment result obtained
from running a rebalance using WAGED. The user will be able to use this method
to predict how Helix will be rebalancing resources using the WAGED rebalancer.
---
.../dataproviders/BaseControllerDataProvider.java | 8 ++
.../rebalancer/waged/ReadOnlyWagedRebalancer.java | 88 ++++++++++++++++++++
.../helix/manager/zk/ZkBucketDataAccessor.java | 5 +-
.../BestPossibleExternalViewVerifier.java | 89 +++-----------------
.../main/java/org/apache/helix/util/HelixUtil.java | 96 ++++++++++++++++++++++
.../java/org/apache/helix/util/RebalanceUtil.java | 24 ++++++
.../helix/util/WeightAwareRebalanceUtil.java | 2 +-
.../WagedRebalancer/TestWagedRebalance.java | 50 +++++++++++
.../helix/integration/task/TaskTestUtil.java | 11 +--
9 files changed, 282 insertions(+), 91 deletions(-)
diff --git
a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java
b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java
index 51b2e80..a24ea46 100644
---
a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java
+++
b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java
@@ -601,6 +601,14 @@ public class BaseControllerDataProvider implements
ControlContextProvider {
return _resourceConfigCache.getPropertyMap();
}
+ /**
+ * Sets the resource config map
+ * @param resourceConfigMap
+ */
+ public void setResourceConfigMap(Map<String, ResourceConfig>
resourceConfigMap) {
+ _resourceConfigCache.setPropertyMap(resourceConfigMap);
+ }
+
public ResourceConfig getResourceConfig(String resource) {
return _resourceConfigCache.getPropertyByName(resource);
}
diff --git
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/ReadOnlyWagedRebalancer.java
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/ReadOnlyWagedRebalancer.java
new file mode 100644
index 0000000..eccb175
--- /dev/null
+++
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/ReadOnlyWagedRebalancer.java
@@ -0,0 +1,88 @@
+package org.apache.helix.controller.rebalancer.waged;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import org.apache.helix.HelixRebalanceException;
+import
org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
+import
org.apache.helix.controller.rebalancer.waged.constraints.ConstraintBasedAlgorithmFactory;
+import org.apache.helix.controller.stages.CurrentStateOutput;
+import org.apache.helix.manager.zk.ZkBucketDataAccessor;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.Resource;
+import org.apache.helix.model.ResourceAssignment;
+
+
+/**
+ * This rebalancer is a version of WagedRebalancer that only reads the
existing assignment metadata
+ * to compute the best possible assignment but never writes back the resulting
assignment metadata
+ * from global or partial rebalance. It does so by using a modified version of
+ * AssignmentMetadataStore, ReadOnlyAssignmentMetadataStore.
+ *
+ * This class is to be used in the cluster verifiers, tests, or util methods.
+ */
+public class ReadOnlyWagedRebalancer extends WagedRebalancer {
+ public ReadOnlyWagedRebalancer(String metadataStoreAddress, String
clusterName,
+ Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> preferences) {
+ super(new ReadOnlyAssignmentMetadataStore(metadataStoreAddress,
clusterName),
+ ConstraintBasedAlgorithmFactory.getInstance(preferences),
Optional.empty());
+ }
+
+ @Override
+ protected Map<String, ResourceAssignment> computeBestPossibleAssignment(
+ ResourceControllerDataProvider clusterData, Map<String, Resource>
resourceMap,
+ Set<String> activeNodes, CurrentStateOutput currentStateOutput,
RebalanceAlgorithm algorithm)
+ throws HelixRebalanceException {
+ return getBestPossibleAssignment(getAssignmentMetadataStore(),
currentStateOutput,
+ resourceMap.keySet());
+ }
+
+ private static class ReadOnlyAssignmentMetadataStore extends
AssignmentMetadataStore {
+ ReadOnlyAssignmentMetadataStore(String metadataStoreAddress, String
clusterName) {
+ super(new ZkBucketDataAccessor(metadataStoreAddress), clusterName);
+ }
+
+ @Override
+ public boolean persistBaseline(Map<String, ResourceAssignment>
globalBaseline) {
+ // If baseline hasn't changed, skip updating the metadata store
+ if (compareAssignments(_globalBaseline, globalBaseline)) {
+ return false;
+ }
+ // Update the in-memory reference only
+ _globalBaseline = globalBaseline;
+ return true;
+ }
+
+ @Override
+ public boolean persistBestPossibleAssignment(
+ Map<String, ResourceAssignment> bestPossibleAssignment) {
+ // If bestPossibleAssignment hasn't changed, skip updating the metadata
store
+ if (compareAssignments(_bestPossibleAssignment, bestPossibleAssignment))
{
+ return false;
+ }
+ // Update the in-memory reference only
+ _bestPossibleAssignment = bestPossibleAssignment;
+ return true;
+ }
+ }
+}
diff --git
a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java
b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java
index f61fb50..2eda09b 100644
---
a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java
+++
b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java
@@ -19,7 +19,6 @@ package org.apache.helix.manager.zk;
* under the License.
*/
-import com.google.common.collect.ImmutableMap;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.ArrayList;
@@ -30,18 +29,20 @@ import java.util.TimerTask;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+
+import com.google.common.collect.ImmutableMap;
import org.apache.helix.AccessOption;
import org.apache.helix.BucketDataAccessor;
import org.apache.helix.HelixException;
import org.apache.helix.HelixProperty;
import org.apache.helix.SystemPropertyKeys;
import org.apache.helix.msdcommon.exception.InvalidRoutingDataException;
-import org.apache.helix.zookeeper.util.GZipCompressionUtil;
import org.apache.helix.zookeeper.api.client.HelixZkClient;
import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.helix.zookeeper.impl.client.FederatedZkClient;
import org.apache.helix.zookeeper.impl.factory.DedicatedZkClientFactory;
+import org.apache.helix.zookeeper.util.GZipCompressionUtil;
import org.apache.helix.zookeeper.zkclient.DataUpdater;
import org.apache.helix.zookeeper.zkclient.exception.ZkMarshallingError;
import org.apache.helix.zookeeper.zkclient.exception.ZkNoNodeException;
diff --git
a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java
b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java
index aa663bc..6fc833b 100644
---
a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java
+++
b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java
@@ -27,37 +27,27 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.Optional;
import java.util.Set;
import org.apache.helix.HelixDefinedState;
-import org.apache.helix.HelixRebalanceException;
import org.apache.helix.PropertyKey;
import org.apache.helix.controller.common.PartitionStateMap;
import
org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
-import org.apache.helix.controller.pipeline.Stage;
-import org.apache.helix.controller.pipeline.StageContext;
-import org.apache.helix.controller.rebalancer.waged.AssignmentMetadataStore;
-import org.apache.helix.controller.rebalancer.waged.RebalanceAlgorithm;
-import org.apache.helix.controller.rebalancer.waged.WagedRebalancer;
-import
org.apache.helix.controller.rebalancer.waged.constraints.ConstraintBasedAlgorithmFactory;
+import org.apache.helix.controller.rebalancer.waged.ReadOnlyWagedRebalancer;
import org.apache.helix.controller.stages.AttributeName;
import org.apache.helix.controller.stages.BestPossibleStateCalcStage;
import org.apache.helix.controller.stages.BestPossibleStateOutput;
import org.apache.helix.controller.stages.ClusterEvent;
import org.apache.helix.controller.stages.ClusterEventType;
import org.apache.helix.controller.stages.CurrentStateComputationStage;
-import org.apache.helix.controller.stages.CurrentStateOutput;
import org.apache.helix.controller.stages.ResourceComputationStage;
-import org.apache.helix.manager.zk.ZkBucketDataAccessor;
-import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.Partition;
import org.apache.helix.model.Resource;
-import org.apache.helix.model.ResourceAssignment;
import org.apache.helix.model.StateModelDefinition;
import org.apache.helix.task.TaskConstants;
+import org.apache.helix.util.RebalanceUtil;
import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -402,7 +392,7 @@ public class BestPossibleExternalViewVerifier extends
ZkHelixClusterVerifier {
ClusterEvent event = new ClusterEvent(ClusterEventType.StateVerifier);
event.addAttribute(AttributeName.ControllerDataProvider.name(), cache);
- runStage(event, new ResourceComputationStage());
+ RebalanceUtil.runStage(event, new ResourceComputationStage());
if (resources != null && !resources.isEmpty()) {
// Filtering out all non-required resources
@@ -416,85 +406,26 @@ public class BestPossibleExternalViewVerifier extends
ZkHelixClusterVerifier {
event.addAttribute(AttributeName.RESOURCES_TO_REBALANCE.name(),
resourceMapToRebalance);
}
- runStage(event, new CurrentStateComputationStage());
- // Note the dryrunWagedRebalancer is just for one time usage
- DryrunWagedRebalancer dryrunWagedRebalancer =
- new DryrunWagedRebalancer(_zkClient.getServers(),
cache.getClusterName(),
+ RebalanceUtil.runStage(event, new CurrentStateComputationStage());
+ // Note the readOnlyWagedRebalancer is just for one time usage
+ ReadOnlyWagedRebalancer readOnlyWagedRebalancer =
+ new ReadOnlyWagedRebalancer(_zkClient.getServers(),
cache.getClusterName(),
cache.getClusterConfig().getGlobalRebalancePreference());
- event.addAttribute(AttributeName.STATEFUL_REBALANCER.name(),
dryrunWagedRebalancer);
+ event.addAttribute(AttributeName.STATEFUL_REBALANCER.name(),
readOnlyWagedRebalancer);
try {
- runStage(event, new BestPossibleStateCalcStage());
+ RebalanceUtil.runStage(event, new BestPossibleStateCalcStage());
} finally {
- dryrunWagedRebalancer.close();
+ readOnlyWagedRebalancer.close();
}
BestPossibleStateOutput output =
event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.name());
return output;
}
- private void runStage(ClusterEvent event, Stage stage) throws Exception {
- StageContext context = new StageContext();
- stage.init(context);
- stage.preProcess();
- stage.process(event);
- stage.postProcess();
- }
-
@Override
public String toString() {
String verifierName = getClass().getSimpleName();
return verifierName + "(" + _clusterName + "@" + _zkClient + "@resources["
+ (_resources != null ? Arrays.toString(_resources.toArray()) : "") +
"])";
}
-
- /**
- * A Dryrun WAGED rebalancer that only calculates the assignment based on
the cluster status but
- * never update the rebalancer assignment metadata.
- * This rebalacer is used in the verifiers or tests.
- */
- private class DryrunWagedRebalancer extends WagedRebalancer {
- DryrunWagedRebalancer(String metadataStoreAddrs, String clusterName,
- Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> preferences) {
- super(new ReadOnlyAssignmentMetadataStore(metadataStoreAddrs,
clusterName),
- ConstraintBasedAlgorithmFactory.getInstance(preferences),
Optional.empty());
- }
-
- @Override
- protected Map<String, ResourceAssignment> computeBestPossibleAssignment(
- ResourceControllerDataProvider clusterData, Map<String, Resource>
resourceMap,
- Set<String> activeNodes, CurrentStateOutput currentStateOutput,
RebalanceAlgorithm algorithm)
- throws HelixRebalanceException {
- return getBestPossibleAssignment(getAssignmentMetadataStore(),
currentStateOutput,
- resourceMap.keySet());
- }
- }
-
- private class ReadOnlyAssignmentMetadataStore extends
AssignmentMetadataStore {
- ReadOnlyAssignmentMetadataStore(String metadataStoreAddrs, String
clusterName) {
- super(new ZkBucketDataAccessor(metadataStoreAddrs), clusterName);
- }
-
- @Override
- public boolean persistBaseline(Map<String, ResourceAssignment>
globalBaseline) {
- // If baseline hasn't changed, skip writing to metadata store
- if (compareAssignments(_globalBaseline, globalBaseline)) {
- return false;
- }
- // Update the in-memory reference only
- _globalBaseline = globalBaseline;
- return true;
- }
-
- @Override
- public boolean persistBestPossibleAssignment(
- Map<String, ResourceAssignment> bestPossibleAssignment) {
- // If bestPossibleAssignment hasn't changed, skip writing to metadata
store
- if (compareAssignments(_bestPossibleAssignment, bestPossibleAssignment))
{
- return false;
- }
- // Update the in-memory reference only
- _bestPossibleAssignment = bestPossibleAssignment;
- return true;
- }
- }
}
diff --git a/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java
b/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java
index 3480fb6..348ce07 100644
--- a/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java
@@ -28,23 +28,43 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
+import java.util.function.Function;
import java.util.stream.Collectors;
import com.google.common.base.Joiner;
+import org.apache.helix.BaseDataAccessor;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.PropertyType;
+import org.apache.helix.controller.common.PartitionStateMap;
import
org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
import org.apache.helix.controller.rebalancer.AbstractRebalancer;
import org.apache.helix.controller.rebalancer.strategy.RebalanceStrategy;
+import org.apache.helix.controller.rebalancer.waged.ReadOnlyWagedRebalancer;
+import org.apache.helix.controller.stages.AttributeName;
+import org.apache.helix.controller.stages.BestPossibleStateCalcStage;
+import org.apache.helix.controller.stages.BestPossibleStateOutput;
+import org.apache.helix.controller.stages.ClusterEvent;
+import org.apache.helix.controller.stages.ClusterEventType;
+import org.apache.helix.controller.stages.CurrentStateComputationStage;
+import org.apache.helix.controller.stages.ResourceComputationStage;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
import org.apache.helix.model.BuiltInStateModelDefinitions;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.Message;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.Resource;
+import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.model.ResourceConfig;
import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
public final class HelixUtil {
static private Logger LOG = LoggerFactory.getLogger(HelixUtil.class);
@@ -141,6 +161,82 @@ public final class HelixUtil {
}
/**
+ * Returns the expected ideal ResourceAssignments for the given resources in
the cluster
+ * calculated using the read-only WAGED rebalancer.
+ * @param metadataStoreAddress
+ * @param clusterConfig
+ * @param instanceConfigs
+ * @param liveInstances
+ * @param idealStates
+ * @param resourceConfigs
+ * @return
+ */
+ public static Map<String, ResourceAssignment>
getIdealAssignmentForWagedFullAuto(
+ String metadataStoreAddress, ClusterConfig clusterConfig,
+ List<InstanceConfig> instanceConfigs, List<String> liveInstances,
+ List<IdealState> idealStates, List<ResourceConfig> resourceConfigs) {
+ // Prepare a data accessor for a dataProvider (cache) refresh
+ BaseDataAccessor<ZNRecord> baseDataAccessor = new
ZkBaseDataAccessor<>(metadataStoreAddress);
+ HelixDataAccessor helixDataAccessor =
+ new ZKHelixDataAccessor(clusterConfig.getClusterName(),
baseDataAccessor);
+
+ // Create an instance of read-only WAGED rebalancer
+ ReadOnlyWagedRebalancer readOnlyWagedRebalancer =
+ new ReadOnlyWagedRebalancer(metadataStoreAddress,
clusterConfig.getClusterName(),
+ clusterConfig.getGlobalRebalancePreference());
+
+ // Use a dummy event to run the required stages for BestPossibleState
calculation
+ // Attributes RESOURCES and RESOURCES_TO_REBALANCE are populated in
ResourceComputationStage
+ ClusterEvent event = new ClusterEvent(clusterConfig.getClusterName(),
ClusterEventType.Unknown);
+
+ try {
+ // Obtain a refreshed dataProvider (cache) and overwrite cluster
parameters with the given parameters
+ ResourceControllerDataProvider dataProvider =
+ new ResourceControllerDataProvider(clusterConfig.getClusterName());
+ dataProvider.requireFullRefresh();
+ dataProvider.refresh(helixDataAccessor);
+ dataProvider.setClusterConfig(clusterConfig);
+ dataProvider.setInstanceConfigMap(instanceConfigs.stream()
+ .collect(Collectors.toMap(InstanceConfig::getInstanceName,
Function.identity())));
+ dataProvider.setLiveInstances(
+
liveInstances.stream().map(LiveInstance::new).collect(Collectors.toList()));
+ dataProvider.setIdealStates(idealStates);
+ dataProvider.setResourceConfigMap(resourceConfigs.stream()
+ .collect(Collectors.toMap(ResourceConfig::getResourceName,
Function.identity())));
+
+ event.addAttribute(AttributeName.ControllerDataProvider.name(),
dataProvider);
+ event.addAttribute(AttributeName.STATEFUL_REBALANCER.name(),
readOnlyWagedRebalancer);
+
+ // Run the required stages to obtain the BestPossibleOutput
+ RebalanceUtil.runStage(event, new ResourceComputationStage());
+ RebalanceUtil.runStage(event, new CurrentStateComputationStage());
+ RebalanceUtil.runStage(event, new BestPossibleStateCalcStage());
+ } catch (Exception e) {
+ LOG.error("getIdealAssignmentForWagedFullAuto(): Failed to compute
ResourceAssignments!", e);
+ } finally {
+ // Close all ZK connections
+ baseDataAccessor.close();
+ readOnlyWagedRebalancer.close();
+ }
+
+ // Convert the resulting BestPossibleStateOutput to Map<String,
ResourceAssignment>
+ Map<String, ResourceAssignment> result = new HashMap<>();
+ BestPossibleStateOutput output =
event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.name());
+ Map<String, Resource> resourceMap =
+ event.getAttribute(AttributeName.RESOURCES_TO_REBALANCE.name());
+ for (Resource resource : resourceMap.values()) {
+ String resourceName = resource.getResourceName();
+ PartitionStateMap partitionStateMap =
output.getPartitionStateMap(resourceName);
+ ResourceAssignment resourceAssignment = new
ResourceAssignment(resourceName);
+ for (Partition partition : resource.getPartitions()) {
+ resourceAssignment.addReplicaMap(partition,
partitionStateMap.getPartitionMap(partition));
+ }
+ result.put(resourceName, resourceAssignment);
+ }
+ return result;
+ }
+
+ /**
* This method provides the ideal state mapping with corresponding rebalance
strategy
* @param clusterConfig The cluster config
* @param instanceConfigs List of all existing instance configs
including disabled/down instances
diff --git a/helix-core/src/main/java/org/apache/helix/util/RebalanceUtil.java
b/helix-core/src/main/java/org/apache/helix/util/RebalanceUtil.java
index 050762d..a16f421 100644
--- a/helix-core/src/main/java/org/apache/helix/util/RebalanceUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/util/RebalanceUtil.java
@@ -28,6 +28,9 @@ import java.util.TreeMap;
import org.apache.helix.HelixException;
import org.apache.helix.controller.GenericHelixController;
+import org.apache.helix.controller.pipeline.Stage;
+import org.apache.helix.controller.pipeline.StageContext;
+import org.apache.helix.controller.stages.ClusterEvent;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.StateModelDefinition;
import org.slf4j.Logger;
@@ -164,4 +167,25 @@ public class RebalanceUtil {
clusterName);
}
}
+
+ /**
+ * runStage allows the run of individual stages. It can be used to mock a
part of the Controller
+ * pipeline run.
+ *
+ * An example usage is as follows:
+ * runStage(event, new ResourceComputationStage());
+ * runStage(event, new CurrentStateComputationStage());
+ * runStage(event, new BestPossibleStateCalcStage());
+ * By running these stages, we are able to obtain BestPossibleStateOutput in
the event object.
+ * @param event
+ * @param stage
+ * @throws Exception
+ */
+ public static void runStage(ClusterEvent event, Stage stage) throws
Exception {
+ StageContext context = new StageContext();
+ stage.init(context);
+ stage.preProcess();
+ stage.process(event);
+ stage.postProcess();
+ }
}
diff --git
a/helix-core/src/main/java/org/apache/helix/util/WeightAwareRebalanceUtil.java
b/helix-core/src/main/java/org/apache/helix/util/WeightAwareRebalanceUtil.java
index f6f692a..ab8c93c 100644
---
a/helix-core/src/main/java/org/apache/helix/util/WeightAwareRebalanceUtil.java
+++
b/helix-core/src/main/java/org/apache/helix/util/WeightAwareRebalanceUtil.java
@@ -7,7 +7,6 @@ import java.util.List;
import java.util.Map;
import org.apache.helix.HelixException;
-import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.helix.api.config.RebalanceConfig;
import
org.apache.helix.api.rebalancer.constraint.AbstractRebalanceHardConstraint;
import
org.apache.helix.api.rebalancer.constraint.AbstractRebalanceSoftConstraint;
@@ -22,6 +21,7 @@ import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.Partition;
import org.apache.helix.model.ResourceConfig;
import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
/**
* A rebalance tool that generate an resource partition assignment based on
the input.
diff --git
a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java
index 1b804d1..2522696 100644
---
a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java
+++
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java
@@ -29,6 +29,7 @@ import java.util.Set;
import com.google.common.collect.ImmutableMap;
import org.apache.helix.ConfigAccessor;
+import org.apache.helix.HelixDataAccessor;
import org.apache.helix.TestHelper;
import org.apache.helix.common.ZkTestBase;
import
org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy;
@@ -36,14 +37,18 @@ import
org.apache.helix.controller.rebalancer.strategy.CrushRebalanceStrategy;
import org.apache.helix.controller.rebalancer.util.RebalanceScheduler;
import org.apache.helix.integration.manager.ClusterControllerManager;
import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.model.BuiltInStateModelDefinitions;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.model.ResourceConfig;
import org.apache.helix.tools.ClusterVerifiers.HelixClusterVerifier;
import org.apache.helix.tools.ClusterVerifiers.StrictMatchExternalViewVerifier;
import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier;
+import org.apache.helix.util.HelixUtil;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterMethod;
@@ -145,6 +150,51 @@ public class TestWagedRebalance extends ZkTestBase {
}
}
+ /**
+ * Use HelixUtil.getIdealAssignmentForWagedFullAuto() to compute the
cluster-wide assignment and
+ * verify that it matches with the result from the original WAGED
rebalancer's algorithm result.
+ */
+ @Test(dependsOnMethods = "test")
+ public void testRebalanceTool() throws InterruptedException {
+ // Create resources for testing
+ int i = 0;
+ for (String stateModel : _testModels) {
+ String db = "Test-DB-" + TestHelper.getTestMethodName() + i++;
+ createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel,
PARTITIONS, _replica,
+ _replica);
+ _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica);
+ _allDBs.add(db);
+ }
+ Thread.sleep(300);
+
+ validate(_replica);
+
+ // Read cluster parameters from ZK
+ HelixDataAccessor dataAccessor = new ZKHelixDataAccessor(CLUSTER_NAME,
_baseAccessor);
+ ClusterConfig clusterConfig =
+ dataAccessor.getProperty(dataAccessor.keyBuilder().clusterConfig());
+ List<InstanceConfig> instanceConfigs =
+
dataAccessor.getChildValues(dataAccessor.keyBuilder().instanceConfigs(), true);
+ List<String> liveInstances =
+ dataAccessor.getChildNames(dataAccessor.keyBuilder().liveInstances());
+ List<IdealState> idealStates =
+ dataAccessor.getChildValues(dataAccessor.keyBuilder().idealStates(),
true);
+ List<ResourceConfig> resourceConfigs =
+
dataAccessor.getChildValues(dataAccessor.keyBuilder().resourceConfigs(), true);
+
+ // Verify that utilResult contains the assignment for the resources added
+ Map<String, ResourceAssignment> utilResult = HelixUtil
+ .getIdealAssignmentForWagedFullAuto(ZK_ADDR, clusterConfig,
instanceConfigs, liveInstances,
+ idealStates, resourceConfigs);
+ Assert.assertNotNull(utilResult);
+ Assert.assertEquals(utilResult.size(), _allDBs.size());
+ for (IdealState idealState : idealStates) {
+ Assert.assertTrue(utilResult.containsKey(idealState.getResourceName()));
+
Assert.assertEquals(utilResult.get(idealState.getResourceName()).getRecord().getMapFields(),
+ idealState.getRecord().getMapFields());
+ }
+ }
+
@Test(dependsOnMethods = "test")
public void testWithInstanceTag() throws Exception {
Set<String> tags = new HashSet<String>(_nodeToTagMap.values());
diff --git
a/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java
b/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java
index 892b7b1..3fa99f0 100644
---
a/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java
+++
b/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java
@@ -33,6 +33,7 @@ import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
import org.apache.helix.PropertyKey;
import org.apache.helix.TestHelper;
+import org.apache.helix.util.RebalanceUtil;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.helix.common.DedupEventProcessor;
import
org.apache.helix.controller.dataproviders.WorkflowControllerDataProvider;
@@ -292,14 +293,6 @@ public class TaskTestUtil {
return cache;
}
- static void runStage(ClusterEvent event, Stage stage) throws Exception {
- StageContext context = new StageContext();
- stage.init(context);
- stage.preProcess();
- stage.process(event);
- stage.postProcess();
- }
-
public static BestPossibleStateOutput
calculateTaskSchedulingStage(WorkflowControllerDataProvider cache,
HelixManager manager) throws Exception {
ClusterEvent event = new ClusterEvent(ClusterEventType.Unknown);
@@ -331,7 +324,7 @@ public class TaskTestUtil {
stages.add(new TaskGarbageCollectionStage());
for (Stage stage : stages) {
- runStage(event, stage);
+ RebalanceUtil.runStage(event, stage);
}
return event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.name());