This is an automated email from the ASF dual-hosted git repository.
jiajunwang pushed a commit to branch wagedRebalancer
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/wagedRebalancer by this push:
new 935360e Integrate the WAGED rebalancer with all the related
components. (#466)
935360e is described below
commit 935360e7f673ce50ad3dfa58cb9828186d2cc17c
Author: Jiajun Wang <[email protected]>
AuthorDate: Wed Sep 18 13:42:43 2019 -0700
Integrate the WAGED rebalancer with all the related components. (#466)
1. Integrate with the algorithm, assignment metadata store, etc. Fix
several conflicting interfaces and pieces of logic so that the rebalancer
runs correctly.
2. Complete OptimalAssignment.
3. Add integration tests to ensure the correctness of rebalancing logic.
---
.../org/apache/helix/HelixRebalanceException.java | 3 +
.../rebalancer/waged/AssignmentMetadataStore.java | 54 ++-
.../rebalancer/waged/WagedRebalancer.java | 248 ++++++++---
.../constraints/ConstraintBasedAlgorithm.java | 92 +++-
.../NodeMaxPartitionLimitConstraint.java | 9 +-
.../rebalancer/waged/model/AssignableNode.java | 38 +-
.../rebalancer/waged/model/AssignableReplica.java | 12 +-
.../waged/model/ClusterModelProvider.java | 30 ++
.../rebalancer/waged/model/OptimalAssignment.java | 52 ++-
.../stages/BestPossibleStateCalcStage.java | 154 ++++---
.../helix/manager/zk/ZkBucketDataAccessor.java | 3 +-
.../java/org/apache/helix/common/ZkTestBase.java | 18 +-
.../waged/MockAssignmentMetadataStore.java | 9 +-
.../waged/TestAssignmentMetadataStore.java | 20 +-
.../rebalancer/waged/TestWagedRebalancer.java | 33 +-
.../waged/constraints/MockRebalanceAlgorithm.java | 2 +-
.../waged/model/AbstractTestClusterModel.java | 2 +-
.../waged/model/TestOptimalAssignment.java | 91 ++++
.../WagedRebalancer/TestWagedRebalance.java | 477 +++++++++++++++++++++
.../TestWagedRebalanceFaultZone.java | 374 ++++++++++++++++
.../TestWagedRebalanceTopologyAware.java | 114 +++++
21 files changed, 1605 insertions(+), 230 deletions(-)
diff --git
a/helix-core/src/main/java/org/apache/helix/HelixRebalanceException.java
b/helix-core/src/main/java/org/apache/helix/HelixRebalanceException.java
index a8b5055..d54853f 100644
--- a/helix-core/src/main/java/org/apache/helix/HelixRebalanceException.java
+++ b/helix-core/src/main/java/org/apache/helix/HelixRebalanceException.java
@@ -23,10 +23,13 @@ package org.apache.helix;
* Exception thrown by Helix due to rebalance failures.
*/
public class HelixRebalanceException extends Exception {
+ // TODO: Adding static description or other necessary fields into the enum
instances for
+ // TODO: supporting the rebalance monitor to understand the exception.
public enum Type {
INVALID_CLUSTER_STATUS,
INVALID_REBALANCER_STATUS,
FAILED_TO_CALCULATE,
+ INVALID_INPUT,
UNKNOWN_FAILURE
}
diff --git
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentMetadataStore.java
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentMetadataStore.java
index fd655d1..a540ffb 100644
---
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentMetadataStore.java
+++
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/AssignmentMetadataStore.java
@@ -20,13 +20,15 @@ package org.apache.helix.controller.rebalancer.waged;
*/
import java.io.IOException;
-import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.I0Itec.zkclient.exception.ZkNoNodeException;
import org.I0Itec.zkclient.serialize.ZkSerializer;
import org.apache.helix.BucketDataAccessor;
import org.apache.helix.HelixException;
-import org.apache.helix.HelixManager;
import org.apache.helix.HelixProperty;
import org.apache.helix.ZNRecord;
import org.apache.helix.manager.zk.ZNRecordJacksonSerializer;
@@ -50,23 +52,27 @@ public class AssignmentMetadataStore {
private Map<String, ResourceAssignment> _globalBaseline;
private Map<String, ResourceAssignment> _bestPossibleAssignment;
+ AssignmentMetadataStore(String metadataStoreAddrs, String clusterName) {
+ this(new ZkBucketDataAccessor(metadataStoreAddrs), clusterName);
+ }
+
AssignmentMetadataStore(BucketDataAccessor bucketDataAccessor, String
clusterName) {
_dataAccessor = bucketDataAccessor;
_baselinePath = String.format(BASELINE_TEMPLATE, clusterName,
ASSIGNMENT_METADATA_KEY);
_bestPossiblePath = String.format(BEST_POSSIBLE_TEMPLATE, clusterName,
ASSIGNMENT_METADATA_KEY);
}
- AssignmentMetadataStore(HelixManager helixManager) {
- this(new
ZkBucketDataAccessor(helixManager.getMetadataStoreConnectionString()),
- helixManager.getClusterName());
- }
-
public Map<String, ResourceAssignment> getBaseline() {
// Return the in-memory baseline. If null, read from ZK. This is to
minimize reads from ZK
if (_globalBaseline == null) {
- HelixProperty baseline =
- _dataAccessor.compressedBucketRead(_baselinePath,
HelixProperty.class);
- _globalBaseline = splitAssignments(baseline);
+ try {
+ HelixProperty baseline =
+ _dataAccessor.compressedBucketRead(_baselinePath,
HelixProperty.class);
+ _globalBaseline = splitAssignments(baseline);
+ } catch (ZkNoNodeException ex) {
+ // Metadata does not exist, so return an empty map
+ _globalBaseline = Collections.emptyMap();
+ }
}
return _globalBaseline;
}
@@ -74,9 +80,14 @@ public class AssignmentMetadataStore {
public Map<String, ResourceAssignment> getBestPossibleAssignment() {
// Return the in-memory baseline. If null, read from ZK. This is to
minimize reads from ZK
if (_bestPossibleAssignment == null) {
- HelixProperty baseline =
- _dataAccessor.compressedBucketRead(_bestPossiblePath,
HelixProperty.class);
- _bestPossibleAssignment = splitAssignments(baseline);
+ try {
+ HelixProperty baseline =
+ _dataAccessor.compressedBucketRead(_bestPossiblePath,
HelixProperty.class);
+ _bestPossibleAssignment = splitAssignments(baseline);
+ } catch (ZkNoNodeException ex) {
+ // Metadata does not exist, so return an empty map
+ _bestPossibleAssignment = Collections.emptyMap();
+ }
}
return _bestPossibleAssignment;
}
@@ -113,6 +124,16 @@ public class AssignmentMetadataStore {
_bestPossibleAssignment = bestPossibleAssignment;
}
+ protected void finalize() {
+ // To ensure all resources are released.
+ close();
+ }
+
+ // Close to release all the resources.
+ public void close() {
+ _dataAccessor.disconnect();
+ }
+
/**
* Produces one HelixProperty that contains all assignment data.
* @param name
@@ -123,8 +144,9 @@ public class AssignmentMetadataStore {
Map<String, ResourceAssignment> assignmentMap) {
HelixProperty property = new HelixProperty(name);
// Add each resource's assignment as a simple field in one ZNRecord
+ // Note: don't use Arrays.toString() for the record conversion; otherwise
deserialization will fail.
assignmentMap.forEach((resource, assignment) ->
property.getRecord().setSimpleField(resource,
- Arrays.toString(SERIALIZER.serialize(assignment.getRecord()))));
+ new String(SERIALIZER.serialize(assignment.getRecord()))));
return property;
}
@@ -138,8 +160,8 @@ public class AssignmentMetadataStore {
// Convert each resource's assignment String into a ResourceAssignment
object and put it in a
// map
property.getRecord().getSimpleFields()
- .forEach((resource, assignment) -> assignmentMap.put(resource,
- new ResourceAssignment((ZNRecord)
SERIALIZER.deserialize(assignment.getBytes()))));
+ .forEach((resource, assignmentStr) -> assignmentMap.put(resource,
+ new ResourceAssignment((ZNRecord)
SERIALIZER.deserialize(assignmentStr.getBytes()))));
return assignmentMap;
}
}
diff --git
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
index 551239d..1861e10 100644
---
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
+++
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
@@ -29,6 +29,7 @@ import java.util.Set;
import java.util.stream.Collectors;
import org.apache.helix.HelixConstants;
+import org.apache.helix.HelixException;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixRebalanceException;
import org.apache.helix.controller.changedetector.ResourceChangeDetector;
@@ -64,27 +65,34 @@ public class WagedRebalancer {
// When any of the following change happens, the rebalancer needs to do a
global rebalance which
// contains 1. baseline recalculate, 2. partial rebalance that is based on
the new baseline.
private static final Set<HelixConstants.ChangeType>
GLOBAL_REBALANCE_REQUIRED_CHANGE_TYPES =
- ImmutableSet.of(HelixConstants.ChangeType.RESOURCE_CONFIG,
- HelixConstants.ChangeType.CLUSTER_CONFIG,
HelixConstants.ChangeType.INSTANCE_CONFIG);
+ ImmutableSet.of(
+ HelixConstants.ChangeType.RESOURCE_CONFIG,
+ HelixConstants.ChangeType.CLUSTER_CONFIG,
+ HelixConstants.ChangeType.INSTANCE_CONFIG);
// The cluster change detector is a stateful object.
// Make it static to avoid unnecessary reinitialization.
private static final ThreadLocal<ResourceChangeDetector>
CHANGE_DETECTOR_THREAD_LOCAL =
new ThreadLocal<>();
private final MappingCalculator<ResourceControllerDataProvider>
_mappingCalculator;
-
- // --------- The following fields are placeholders and need replacement.
-----------//
- // TODO Shall we make the metadata store a static threadlocal object as well
to avoid
- // reinitialization?
private final AssignmentMetadataStore _assignmentMetadataStore;
private final RebalanceAlgorithm _rebalanceAlgorithm;
- //
------------------------------------------------------------------------------------//
+
+ private static AssignmentMetadataStore constructAssignmentStore(HelixManager
helixManager) {
+ AssignmentMetadataStore assignmentMetadataStore = null;
+ if (helixManager != null) {
+ String metadataStoreAddrs =
helixManager.getMetadataStoreConnectionString();
+ String clusterName = helixManager.getClusterName();
+ if (metadataStoreAddrs != null && clusterName != null) {
+ assignmentMetadataStore = new
AssignmentMetadataStore(metadataStoreAddrs, clusterName);
+ }
+ }
+ return assignmentMetadataStore;
+ }
public WagedRebalancer(HelixManager helixManager,
Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> preferences) {
- this(
- // TODO init the metadata store according to their requirement when
integrate,
- // or change to final static method if possible.
- new AssignmentMetadataStore(helixManager),
ConstraintBasedAlgorithmFactory.getInstance(preferences),
+ this(constructAssignmentStore(helixManager),
+ ConstraintBasedAlgorithmFactory.getInstance(preferences),
// Use DelayedAutoRebalancer as the mapping calculator for the final
assignment output.
// Mapping calculator will translate the best possible assignment into
the applicable state
// mapping based on the current states.
@@ -94,6 +102,10 @@ public class WagedRebalancer {
private WagedRebalancer(AssignmentMetadataStore assignmentMetadataStore,
RebalanceAlgorithm algorithm, MappingCalculator mappingCalculator) {
+ if (assignmentMetadataStore == null) {
+ LOG.warn("Assignment Metadata Store is not configured properly."
+ + " The rebalancer will not access the assignment store during the
rebalance.");
+ }
_assignmentMetadataStore = assignmentMetadataStore;
_rebalanceAlgorithm = algorithm;
_mappingCalculator = mappingCalculator;
@@ -103,7 +115,13 @@ public class WagedRebalancer {
protected WagedRebalancer(AssignmentMetadataStore assignmentMetadataStore,
RebalanceAlgorithm algorithm) {
this(assignmentMetadataStore, algorithm, new DelayedAutoRebalancer());
+ }
+ // Release all the resources.
+ public void close() {
+ if (_assignmentMetadataStore != null) {
+ _assignmentMetadataStore.close();
+ }
}
/**
@@ -117,27 +135,18 @@ public class WagedRebalancer {
public Map<String, IdealState>
computeNewIdealStates(ResourceControllerDataProvider clusterData,
Map<String, Resource> resourceMap, final CurrentStateOutput
currentStateOutput)
throws HelixRebalanceException {
- LOG.info("Start computing new ideal states for resources: {}",
resourceMap.keySet().toString());
-
- // Find the compatible resources: 1. FULL_AUTO 2. Configured to use the
WAGED rebalancer
- resourceMap = resourceMap.entrySet().stream().filter(resourceEntry -> {
- IdealState is = clusterData.getIdealState(resourceEntry.getKey());
- return is != null &&
is.getRebalanceMode().equals(IdealState.RebalanceMode.FULL_AUTO)
- && getClass().getName().equals(is.getRebalancerClassName());
- }).collect(Collectors.toMap(resourceEntry -> resourceEntry.getKey(),
- resourceEntry -> resourceEntry.getValue()));
-
if (resourceMap.isEmpty()) {
- LOG.warn("There is no valid resource to be rebalanced by {}",
+ LOG.warn("There is no resource to be rebalanced by {}",
this.getClass().getSimpleName());
return Collections.emptyMap();
- } else {
- LOG.info("Valid resources that will be rebalanced by {}: {}",
this.getClass().getSimpleName(),
- resourceMap.keySet().toString());
}
+ LOG.info("Start computing new ideal states for resources: {}",
resourceMap.keySet().toString());
+ validateInput(clusterData, resourceMap);
+
// Calculate the target assignment based on the current cluster status.
- Map<String, IdealState> newIdealStates =
computeBestPossibleStates(clusterData, resourceMap);
+ Map<String, IdealState> newIdealStates =
+ computeBestPossibleStates(clusterData, resourceMap,
currentStateOutput);
// Construct the new best possible states according to the current state
and target assignment.
// Note that the new ideal state might be an intermediate state between
the current state and
@@ -166,28 +175,29 @@ public class WagedRebalancer {
// Coordinate baseline recalculation and partial rebalance according to the
cluster changes.
private Map<String, IdealState> computeBestPossibleStates(
- ResourceControllerDataProvider clusterData, Map<String, Resource>
resourceMap)
- throws HelixRebalanceException {
+ ResourceControllerDataProvider clusterData, Map<String, Resource>
resourceMap,
+ final CurrentStateOutput currentStateOutput) throws
HelixRebalanceException {
getChangeDetector().updateSnapshots(clusterData);
- // Get all the modified and new items' information
+ // Get all the changed items' information
Map<HelixConstants.ChangeType, Set<String>> clusterChanges =
getChangeDetector().getChangeTypes().stream()
.collect(Collectors.toMap(changeType -> changeType, changeType -> {
Set<String> itemKeys = new HashSet<>();
itemKeys.addAll(getChangeDetector().getAdditionsByType(changeType));
itemKeys.addAll(getChangeDetector().getChangesByType(changeType));
+
itemKeys.addAll(getChangeDetector().getRemovalsByType(changeType));
return itemKeys;
}));
if (clusterChanges.keySet().stream()
.anyMatch(changeType ->
GLOBAL_REBALANCE_REQUIRED_CHANGE_TYPES.contains(changeType))) {
- refreshBaseline(clusterData, clusterChanges, resourceMap);
+ refreshBaseline(clusterData, clusterChanges, resourceMap,
currentStateOutput);
// Inject a cluster config change for large scale partial rebalance once
the baseline changed.
clusterChanges.putIfAbsent(HelixConstants.ChangeType.CLUSTER_CONFIG,
Collections.emptySet());
}
Map<String, ResourceAssignment> newAssignment =
- partialRebalance(clusterData, clusterChanges, resourceMap);
+ partialRebalance(clusterData, clusterChanges, resourceMap,
currentStateOutput);
// Convert the assignments into IdealState for the following state mapping
calculation.
Map<String, IdealState> finalIdealState = new HashMap<>();
@@ -213,56 +223,60 @@ public class WagedRebalancer {
// TODO make the Baseline calculation async if complicated algorithm is used
for the Baseline
private void refreshBaseline(ResourceControllerDataProvider clusterData,
- Map<HelixConstants.ChangeType, Set<String>> clusterChanges, Map<String,
Resource> resourceMap)
- throws HelixRebalanceException {
+ Map<HelixConstants.ChangeType, Set<String>> clusterChanges, Map<String,
Resource> resourceMap,
+ final CurrentStateOutput currentStateOutput) throws
HelixRebalanceException {
+ LOG.info("Start calculating the new baseline.");
+ Map<String, ResourceAssignment> currentBaseline =
+ getBaselineAssignment(_assignmentMetadataStore, currentStateOutput,
resourceMap.keySet());
// For baseline calculation
// 1. Ignore node status (disable/offline).
// 2. Use the baseline as the previous best possible assignment since
there is no "baseline" for
// the baseline.
- LOG.info("Start calculating the new baseline.");
- Map<String, ResourceAssignment> currentBaseline;
- try {
- currentBaseline = _assignmentMetadataStore.getBaseline();
- } catch (Exception ex) {
- throw new HelixRebalanceException("Failed to get the current baseline
assignment.",
- HelixRebalanceException.Type.INVALID_REBALANCER_STATUS, ex);
- }
- Map<String, ResourceAssignment> baseline =
calculateAssignment(clusterData, clusterChanges,
- resourceMap, clusterData.getAllInstances(), Collections.emptyMap(),
currentBaseline);
- try {
- _assignmentMetadataStore.persistBaseline(baseline);
- } catch (Exception ex) {
- throw new HelixRebalanceException("Failed to persist the new baseline
assignment.",
- HelixRebalanceException.Type.INVALID_REBALANCER_STATUS, ex);
+ Map<String, ResourceAssignment> newBaseline =
+ calculateAssignment(clusterData, clusterChanges, resourceMap,
clusterData.getAllInstances(),
+ Collections.emptyMap(), currentBaseline);
+
+ if (_assignmentMetadataStore != null) {
+ try {
+ _assignmentMetadataStore.persistBaseline(newBaseline);
+ } catch (Exception ex) {
+ throw new HelixRebalanceException("Failed to persist the new baseline
assignment.",
+ HelixRebalanceException.Type.INVALID_REBALANCER_STATUS, ex);
+ }
+ } else {
+ LOG.debug("Assignment Metadata Store is empty. Skip persist the baseline
assignment.");
}
+
LOG.info("Finish calculating the new baseline.");
}
private Map<String, ResourceAssignment> partialRebalance(
ResourceControllerDataProvider clusterData,
- Map<HelixConstants.ChangeType, Set<String>> clusterChanges, Map<String,
Resource> resourceMap)
- throws HelixRebalanceException {
+ Map<HelixConstants.ChangeType, Set<String>> clusterChanges, Map<String,
Resource> resourceMap,
+ final CurrentStateOutput currentStateOutput) throws
HelixRebalanceException {
LOG.info("Start calculating the new best possible assignment.");
- Set<String> activeInstances = clusterData.getEnabledLiveInstances();
- Map<String, ResourceAssignment> baseline;
- Map<String, ResourceAssignment> prevBestPossibleAssignment;
- try {
- baseline = _assignmentMetadataStore.getBaseline();
- prevBestPossibleAssignment =
_assignmentMetadataStore.getBestPossibleAssignment();
- } catch (Exception ex) {
- throw new HelixRebalanceException("Failed to get the persisted
assignment records.",
- HelixRebalanceException.Type.INVALID_REBALANCER_STATUS, ex);
- }
- Map<String, ResourceAssignment> newAssignment =
calculateAssignment(clusterData, clusterChanges,
- resourceMap, activeInstances, baseline, prevBestPossibleAssignment);
- try {
- // TODO Test to confirm if persisting the final assignment (with final
partition states)
- // would be a better option.
- _assignmentMetadataStore.persistBestPossibleAssignment(newAssignment);
- } catch (Exception ex) {
- throw new HelixRebalanceException("Failed to persist the new best
possible assignment.",
- HelixRebalanceException.Type.INVALID_REBALANCER_STATUS, ex);
+ Map<String, ResourceAssignment> currentBaseline =
+ getBaselineAssignment(_assignmentMetadataStore, currentStateOutput,
resourceMap.keySet());
+ Map<String, ResourceAssignment> currentBestPossibleAssignment =
+ getBestPossibleAssignment(_assignmentMetadataStore, currentStateOutput,
+ resourceMap.keySet());
+ Map<String, ResourceAssignment> newAssignment =
+ calculateAssignment(clusterData, clusterChanges, resourceMap,
+ clusterData.getEnabledLiveInstances(), currentBaseline,
currentBestPossibleAssignment);
+
+ if (_assignmentMetadataStore != null) {
+ try {
+ // TODO Test to confirm if persisting the final assignment (with final
partition states)
+ // would be a better option.
+ _assignmentMetadataStore.persistBestPossibleAssignment(newAssignment);
+ } catch (Exception ex) {
+ throw new HelixRebalanceException("Failed to persist the new best
possible assignment.",
+ HelixRebalanceException.Type.INVALID_REBALANCER_STATUS, ex);
+ }
+ } else {
+ LOG.debug("Assignment Metadata Store is empty. Skip persist the baseline
assignment.");
}
+
LOG.info("Finish calculating the new best possible assignment.");
return newAssignment;
}
@@ -348,4 +362,100 @@ public class WagedRebalancer {
}
return preferenceList;
}
+
+ private void validateInput(ResourceControllerDataProvider clusterData,
+ Map<String, Resource> resourceMap) throws HelixRebalanceException {
+ Set<String> nonCompatibleResources =
resourceMap.entrySet().stream().filter(resourceEntry -> {
+ IdealState is = clusterData.getIdealState(resourceEntry.getKey());
+ return is == null ||
!is.getRebalanceMode().equals(IdealState.RebalanceMode.FULL_AUTO)
+ || !getClass().getName().equals(is.getRebalancerClassName());
+ }).map(Map.Entry::getKey).collect(Collectors.toSet());
+ if (!nonCompatibleResources.isEmpty()) {
+ throw new HelixRebalanceException(String.format(
+ "Input contains invalid resource(s) that cannot be rebalanced by the
WAGED rebalancer. %s",
+ nonCompatibleResources.toString()),
HelixRebalanceException.Type.INVALID_INPUT);
+ }
+ }
+
+ /**
+ * @param assignmentMetadataStore
+ * @param currentStateOutput
+ * @param resources
+ * @return The current baseline assignment. If record does not exist in the
+ * assignmentMetadataStore, return the current state assignment.
+ * @throws HelixRebalanceException
+ */
+ private Map<String, ResourceAssignment> getBaselineAssignment(
+ AssignmentMetadataStore assignmentMetadataStore, CurrentStateOutput
currentStateOutput,
+ Set<String> resources) throws HelixRebalanceException {
+ Map<String, ResourceAssignment> currentBaseline = Collections.emptyMap();
+ if (assignmentMetadataStore != null) {
+ try {
+ currentBaseline = assignmentMetadataStore.getBaseline();
+ } catch (HelixException ex) {
+ // Report the error and use an empty mapping instead.
+ LOG.error("Failed to get the current baseline assignment.", ex);
+ } catch (Exception ex) {
+ throw new HelixRebalanceException(
+ "Failed to get the current baseline assignment because of
unexpected error.",
+ HelixRebalanceException.Type.INVALID_REBALANCER_STATUS, ex);
+ }
+ }
+ if (currentBaseline.isEmpty()) {
+ LOG.warn(
+ "The current baseline assignment record is empty. Use the current
states instead.");
+ currentBaseline = getCurrentStateAssingment(currentStateOutput,
resources);
+ }
+ return currentBaseline;
+ }
+
+ /**
+ * @param assignmentMetadataStore
+ * @param currentStateOutput
+ * @param resources
+ * @return The current best possible assignment. If record does not exist in
the
+ * assignmentMetadataStore, return the current state assignment.
+ * @throws HelixRebalanceException
+ */
+ private Map<String, ResourceAssignment> getBestPossibleAssignment(
+ AssignmentMetadataStore assignmentMetadataStore, CurrentStateOutput
currentStateOutput,
+ Set<String> resources) throws HelixRebalanceException {
+ Map<String, ResourceAssignment> currentBestAssignment =
Collections.emptyMap();
+ if (assignmentMetadataStore != null) {
+ try {
+ currentBestAssignment =
assignmentMetadataStore.getBestPossibleAssignment();
+ } catch (HelixException ex) {
+ // Report the error and use an empty mapping instead.
+ LOG.error("Failed to get the current best possible assignment.", ex);
+ } catch (Exception ex) {
+ throw new HelixRebalanceException(
+ "Failed to get the current best possible assignment because of
unexpected error.",
+ HelixRebalanceException.Type.INVALID_REBALANCER_STATUS, ex);
+ }
+ }
+ if (currentBestAssignment.isEmpty()) {
+ LOG.warn(
+ "The current best possible assignment record is empty. Use the
current states instead.");
+ currentBestAssignment = getCurrentStateAssingment(currentStateOutput,
resources);
+ }
+ return currentBestAssignment;
+ }
+
+ private Map<String, ResourceAssignment> getCurrentStateAssingment(
+ CurrentStateOutput currentStateOutput, Set<String> resourceSet) {
+ Map<String, ResourceAssignment> currentStateAssignment = new HashMap<>();
+ for (String resourceName : resourceSet) {
+ Map<Partition, Map<String, String>> currentStateMap =
+ currentStateOutput.getCurrentStateMap(resourceName);
+ if (!currentStateMap.isEmpty()) {
+ ResourceAssignment newResourceAssignment = new
ResourceAssignment(resourceName);
+ currentStateMap.entrySet().stream().forEach(currentStateEntry -> {
+ newResourceAssignment
+ .addReplicaMap(currentStateEntry.getKey(),
currentStateEntry.getValue());
+ });
+ currentStateAssignment.put(resourceName, newResourceAssignment);
+ }
+ }
+ return currentStateAssignment;
+ }
}
diff --git
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithm.java
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithm.java
index 89a3f29..1a41aef 100644
---
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithm.java
+++
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithm.java
@@ -29,6 +29,7 @@ import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
+import com.google.common.collect.Maps;
import org.apache.helix.HelixRebalanceException;
import org.apache.helix.controller.rebalancer.waged.RebalanceAlgorithm;
import org.apache.helix.controller.rebalancer.waged.model.AssignableNode;
@@ -36,11 +37,10 @@ import
org.apache.helix.controller.rebalancer.waged.model.AssignableReplica;
import org.apache.helix.controller.rebalancer.waged.model.ClusterContext;
import org.apache.helix.controller.rebalancer.waged.model.ClusterModel;
import org.apache.helix.controller.rebalancer.waged.model.OptimalAssignment;
+import org.apache.helix.model.ResourceAssignment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.collect.Maps;
-
/**
* The algorithm is based on a given set of constraints
* - HardConstraint: Approve or deny the assignment given its condition, any
assignment cannot
@@ -64,29 +64,26 @@ class ConstraintBasedAlgorithm implements
RebalanceAlgorithm {
@Override
public OptimalAssignment calculate(ClusterModel clusterModel) throws
HelixRebalanceException {
OptimalAssignment optimalAssignment = new OptimalAssignment();
- Map<String, Set<AssignableReplica>> replicasByResource =
clusterModel.getAssignableReplicaMap();
List<AssignableNode> nodes = new
ArrayList<>(clusterModel.getAssignableNodes().values());
-
- // TODO: different orders of resource/replica could lead to different
greedy assignments, will
- // revisit and improve the performance
- for (String resource : replicasByResource.keySet()) {
- for (AssignableReplica replica : replicasByResource.get(resource)) {
- Optional<AssignableNode> maybeBestNode =
- getNodeWithHighestPoints(replica, nodes,
clusterModel.getContext(), optimalAssignment);
- // stop immediately if any replica cannot find best assignable node
- if (optimalAssignment.hasAnyFailure()) {
- String errorMessage = String.format(
- "Unable to find any available candidate node for partition %s;
Fail reasons: %s",
- replica.getPartitionName(), optimalAssignment.getFailures());
- throw new HelixRebalanceException(errorMessage,
- HelixRebalanceException.Type.FAILED_TO_CALCULATE);
- }
- maybeBestNode.ifPresent(node ->
clusterModel.assign(replica.getResourceName(),
- replica.getPartitionName(), replica.getReplicaState(),
node.getInstanceName()));
+ // Sort the replicas so the input is stable for the greedy algorithm.
+ // For the other algorithm implementation, this sorting could be
unnecessary.
+ for (AssignableReplica replica :
getOrderedAssignableReplica(clusterModel)) {
+ Optional<AssignableNode> maybeBestNode =
+ getNodeWithHighestPoints(replica, nodes, clusterModel.getContext(),
optimalAssignment);
+ // stop immediately if any replica cannot find best assignable node
+ if (optimalAssignment.hasAnyFailure()) {
+ String errorMessage = String.format(
+ "Unable to find any available candidate node for partition %s;
Fail reasons: %s",
+ replica.getPartitionName(), optimalAssignment.getFailures());
+ throw new HelixRebalanceException(errorMessage,
+ HelixRebalanceException.Type.FAILED_TO_CALCULATE);
}
+ maybeBestNode.ifPresent(node -> clusterModel
+ .assign(replica.getResourceName(), replica.getPartitionName(),
replica.getReplicaState(),
+ node.getInstanceName()));
}
-
- return optimalAssignment.convertFrom(clusterModel);
+ optimalAssignment.updateAssignments(clusterModel);
+ return optimalAssignment;
}
private Optional<AssignableNode> getNodeWithHighestPoints(AssignableReplica
replica,
@@ -133,4 +130,55 @@ class ConstraintBasedAlgorithm implements
RebalanceAlgorithm {
return hardConstraints.stream().map(HardConstraint::getDescription)
.collect(Collectors.toList());
}
+
+ // TODO investigate better ways to sort replicas. One option is sorting
based on the creation time.
+ private List<AssignableReplica> getOrderedAssignableReplica(ClusterModel
clusterModel) {
+ Map<String, Set<AssignableReplica>> replicasByResource =
clusterModel.getAssignableReplicaMap();
+ List<AssignableReplica> orderedAssignableReplicas =
+ replicasByResource.values().stream().flatMap(replicas ->
replicas.stream())
+ .collect(Collectors.toList());
+
+ Map<String, ResourceAssignment> bestPossibleAssignment =
+ clusterModel.getContext().getBestPossibleAssignment();
+ Map<String, ResourceAssignment> baselineAssignment =
+ clusterModel.getContext().getBaselineAssignment();
+
+ // 1. Sort according if the assignment exists in the best possible and/or
baseline assignment
+ // 2. Sort according to the state priority. Note that prioritizing the top
state is required.
+ // Or the greedy algorithm will unnecessarily shuffle the states between
replicas.
+ // 3. Sort according to the resource/partition name.
+ orderedAssignableReplicas.sort((replica1, replica2) -> {
+ String resourceName1 = replica1.getResourceName();
+ String resourceName2 = replica2.getResourceName();
+ if (bestPossibleAssignment.containsKey(resourceName1) ==
bestPossibleAssignment
+ .containsKey(resourceName2)) {
+ if (baselineAssignment.containsKey(resourceName1) == baselineAssignment
+ .containsKey(resourceName2)) {
+ // If both assignments agree on whether the resource assignment exists,
+ // compare on additional dimensions.
+ int statePriority1 = replica1.getStatePriority();
+ int statePriority2 = replica2.getStatePriority();
+ if (statePriority1 == statePriority2) {
+ // If state priorities are the same, compare the names.
+ if (resourceName1.equals(resourceName2)) {
+ return
replica1.getPartitionName().compareTo(replica2.getPartitionName());
+ } else {
+ return resourceName1.compareTo(resourceName2);
+ }
+ } else {
+ // Note we shall prioritize the replica with a higher state
priority,
+ // the smaller priority number means higher priority.
+ return statePriority1 - statePriority2;
+ }
+ } else {
+ // If the baseline assignment contains the assignment, prioritize
the replica.
+ return baselineAssignment.containsKey(resourceName1) ? -1 : 1;
+ }
+ } else {
+ // If the best possible assignment contains the assignment, prioritize
the replica.
+ return bestPossibleAssignment.containsKey(resourceName1) ? -1 : 1;
+ }
+ });
+ return orderedAssignableReplicas;
+ }
}
diff --git
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/NodeMaxPartitionLimitConstraint.java
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/NodeMaxPartitionLimitConstraint.java
index 9d0752b..cda5329 100644
---
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/NodeMaxPartitionLimitConstraint.java
+++
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/NodeMaxPartitionLimitConstraint.java
@@ -28,9 +28,12 @@ class NodeMaxPartitionLimitConstraint extends HardConstraint
{
@Override
boolean isAssignmentValid(AssignableNode node, AssignableReplica replica,
ClusterContext clusterContext) {
- return node.getAssignedReplicaCount() < node.getMaxPartition()
- &&
node.getAssignedPartitionsByResource(replica.getResourceName()).size() < replica
- .getResourceMaxPartitionsPerInstance();
+ boolean exceedMaxPartitionLimit =
+ node.getMaxPartition() < 0 || node.getAssignedReplicaCount() <
node.getMaxPartition();
+ boolean exceedResourceMaxPartitionLimit =
replica.getResourceMaxPartitionsPerInstance() < 0
+ ||
node.getAssignedPartitionsByResource(replica.getResourceName()).size() < replica
+ .getResourceMaxPartitionsPerInstance();
+ return exceedMaxPartitionLimit && exceedResourceMaxPartitionLimit;
}
@Override
diff --git
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
index a3460fb..20de6da 100644
---
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
+++
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
@@ -19,8 +19,6 @@ package org.apache.helix.controller.rebalancer.waged.model;
* under the License.
*/
-import static java.lang.Math.max;
-
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
@@ -287,16 +285,23 @@ public class AssignableNode implements
Comparable<AssignableNode> {
* Any missing field will cause an invalid topology config exception.
*/
private String computeFaultZone(ClusterConfig clusterConfig, InstanceConfig
instanceConfig) {
- if (clusterConfig.isTopologyAwareEnabled()) {
- String topologyStr = clusterConfig.getTopology();
- String faultZoneType = clusterConfig.getFaultZoneType();
- if (topologyStr == null || faultZoneType == null) {
- throw new HelixException("Fault zone or cluster topology information
is not configured.");
- }
-
+ if (!clusterConfig.isTopologyAwareEnabled()) {
+ // Instance name is the default fault zone if topology awareness is
false.
+ return instanceConfig.getInstanceName();
+ }
+ String topologyStr = clusterConfig.getTopology();
+ String faultZoneType = clusterConfig.getFaultZoneType();
+ if (topologyStr == null || faultZoneType == null) {
+ LOG.debug("Topology configuration is not complete. Topology define: {},
Fault Zone Type: {}",
+ topologyStr, faultZoneType);
+ // Use the instance name, or the deprecated ZoneId field (if exists) as
the default fault zone.
+ String zoneId = instanceConfig.getZoneId();
+ return zoneId == null ? instanceConfig.getInstanceName() : zoneId;
+ } else {
+ // Get the fault zone information from the complete topology definition.
String[] topologyDef = topologyStr.trim().split("/");
- if (topologyDef.length == 0
- || Arrays.stream(topologyDef).noneMatch(type ->
type.equals(faultZoneType))) {
+ if (topologyDef.length == 0 ||
+ Arrays.stream(topologyDef).noneMatch(type ->
type.equals(faultZoneType))) {
throw new HelixException(
"The configured topology definition is empty or does not contain
the fault zone type.");
}
@@ -324,10 +329,6 @@ public class AssignableNode implements
Comparable<AssignableNode> {
}
return faultZoneStringBuilder.toString();
}
- } else {
- // For backward compatibility
- String zoneId = instanceConfig.getZoneId();
- return zoneId == null ? instanceConfig.getInstanceName() : zoneId;
}
}
@@ -356,7 +357,7 @@ public class AssignableNode implements
Comparable<AssignableNode> {
// For the purpose of constraint calculation, the max utilization cannot
be larger than 100%.
float utilization = Math.min(
(float) (_maxCapacity.get(capacityKey) - newCapacity) /
_maxCapacity.get(capacityKey), 1);
- _highestCapacityUtilization = max(_highestCapacityUtilization,
utilization);
+ _highestCapacityUtilization = Math.max(_highestCapacityUtilization,
utilization);
}
// else if the capacityKey does not exist in the capacity map, this method
essentially becomes
// a NOP; in other words, this node will be treated as if it has unlimited
capacity.
@@ -394,4 +395,9 @@ public class AssignableNode implements
Comparable<AssignableNode> {
public int compareTo(AssignableNode o) {
return _instanceName.compareTo(o.getInstanceName());
}
+
+ @Override
+ public String toString() {
+ return _instanceName;
+ }
}
diff --git
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableReplica.java
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableReplica.java
index 66bd7b7..a651e19 100644
---
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableReplica.java
+++
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableReplica.java
@@ -20,17 +20,22 @@ package org.apache.helix.controller.rebalancer.waged.model;
*/
import java.io.IOException;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.ResourceConfig;
import org.apache.helix.model.StateModelDefinition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* This class represents a partition replication that needs to be allocated.
*/
public class AssignableReplica implements Comparable<AssignableReplica> {
+ private static final Logger LOG =
LoggerFactory.getLogger(AssignableReplica.class);
+
private final String _partitionName;
private final String _resourceName;
private final String _resourceInstanceGroupTag;
@@ -149,9 +154,10 @@ public class AssignableReplica implements
Comparable<AssignableReplica> {
partitionCapacity =
capacityMap.get(ResourceConfig.DEFAULT_PARTITION_KEY);
}
if (partitionCapacity == null) {
- throw new IllegalArgumentException(String.format(
- "The capacity usage of the specified partition %s is not configured
in the Resource Config %s. No default partition capacity is configured
neither.",
- partitionName, resourceConfig.getResourceName()));
+ LOG.warn("The capacity usage of the specified partition {} is not
configured in the Resource"
+ + " Config {}. No default partition capacity is configured either.
Will proceed with"
+ + " empty capacity configuration.", partitionName,
resourceConfig.getResourceName());
+ partitionCapacity = new HashMap<>();
}
List<String> requiredCapacityKeys =
clusterConfig.getInstanceCapacityKeys();
diff --git
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
index af1a8d8..2b53422 100644
---
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
+++
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
@@ -200,6 +200,9 @@ public class ClusterModelProvider {
for (String resourceName : resourceMap.keySet()) {
ResourceConfig resourceConfig =
dataProvider.getResourceConfig(resourceName);
+ if (resourceConfig == null) {
+ resourceConfig = new ResourceConfig(resourceName);
+ }
IdealState is = dataProvider.getIdealState(resourceName);
if (is == null) {
throw new HelixException(
@@ -223,6 +226,7 @@ public class ClusterModelProvider {
for (Map.Entry<String, Integer> entry : stateCountMap.entrySet()) {
String state = entry.getKey();
for (int i = 0; i < entry.getValue(); i++) {
+ mergeIdealStateWithResourceConfig(resourceConfig, is);
totalReplicaMap.computeIfAbsent(resourceName, key -> new
HashSet<>()).add(
new AssignableReplica(clusterConfig, resourceConfig,
partition, state,
def.getStatePriorityMap().get(state)));
@@ -234,6 +238,32 @@ public class ClusterModelProvider {
}
/**
+ * For backward compatibility, propagate the critical simple fields from the
IdealState to
+ * the Resource Config.
+ * Eventually, Resource Config should be the only metadata node that
contains the required information.
+ */
+ private static void mergeIdealStateWithResourceConfig(ResourceConfig
resourceConfig,
+ final IdealState idealState) {
+ // Note that the config fields updated in this method shall be fully
compatible with the ones in the IdealState.
+ // 1. The fields shall have exactly the same meaning.
+ // 2. The value shall be exactly compatible, no additional calculation
involved.
+ // 3. Resource Config items have a high priority.
+ // This is to ensure the resource config is not polluted after the merge.
+ if (null == resourceConfig.getRecord()
+
.getSimpleField(ResourceConfig.ResourceConfigProperty.INSTANCE_GROUP_TAG.name()))
{
+ resourceConfig.getRecord()
+
.setSimpleField(ResourceConfig.ResourceConfigProperty.INSTANCE_GROUP_TAG.name(),
+ idealState.getInstanceGroupTag());
+ }
+ if (null == resourceConfig.getRecord()
+
.getSimpleField(ResourceConfig.ResourceConfigProperty.MAX_PARTITIONS_PER_INSTANCE.name()))
{
+ resourceConfig.getRecord()
+
.setIntField(ResourceConfig.ResourceConfigProperty.MAX_PARTITIONS_PER_INSTANCE.name(),
+ idealState.getMaxPartitionsPerInstance());
+ }
+ }
+
+ /**
* @return A map contains the assignments for each fault zone. <fault zone,
<resource, set of partitions>>
*/
private static Map<String, Map<String, Set<String>>>
mapAssignmentToFaultZone(
diff --git
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/OptimalAssignment.java
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/OptimalAssignment.java
index 31cb181..138f30c 100644
---
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/OptimalAssignment.java
+++
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/OptimalAssignment.java
@@ -19,38 +19,64 @@ package org.apache.helix.controller.rebalancer.waged.model;
* under the License.
*/
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.apache.helix.HelixException;
+import org.apache.helix.model.Partition;
import org.apache.helix.model.ResourceAssignment;
/**
* The data model represents the optimal assignment of N replicas assigned to
M instances;
* It's mostly used as the return parameter of an assignment calculation
algorithm; If the algorithm
- * failed to find optimal assignment given the endeavor, the user could check
the failure reasons
+ * failed to find optimal assignment given the endeavor, the user could check
the failure reasons.
+ * Note that this class is not thread safe.
*/
public class OptimalAssignment {
private Map<AssignableNode, List<AssignableReplica>> _optimalAssignment =
new HashMap<>();
private Map<AssignableReplica, Map<AssignableNode, List<String>>>
_failedAssignments =
new HashMap<>();
- public OptimalAssignment() {
-
- }
-
+ /**
+ * Update the OptimalAssignment instance with the existing assignment
recorded in the input cluster model.
+ *
+ * @param clusterModel
+ */
public void updateAssignments(ClusterModel clusterModel) {
-
+ _optimalAssignment.clear();
+ clusterModel.getAssignableNodes().values().stream()
+ .forEach(node -> _optimalAssignment.put(node, new
ArrayList<>(node.getAssignedReplicas())));
}
- // TODO: determine the output of final assignment format
+ /**
+ * @return The optimal assignment in the form of a <Resource Name,
ResourceAssignment> map.
+ */
public Map<String, ResourceAssignment> getOptimalResourceAssignment() {
- throw new UnsupportedOperationException("Not implemented yet");
- }
-
- // TODO: the convert method is not the best choice so far, will revisit the
data model
- public OptimalAssignment convertFrom(ClusterModel clusterModel) {
- return this;
+ if (hasAnyFailure()) {
+ throw new HelixException(
+ "Cannot get the optimal resource assignment since a calculation
failure is recorded. "
+ + getFailures());
+ }
+ Map<String, ResourceAssignment> assignmentMap = new HashMap<>();
+ for (AssignableNode node : _optimalAssignment.keySet()) {
+ for (AssignableReplica replica : _optimalAssignment.get(node)) {
+ String resourceName = replica.getResourceName();
+ Partition partition = new Partition(replica.getPartitionName());
+ ResourceAssignment resourceAssignment = assignmentMap
+ .computeIfAbsent(resourceName, key -> new
ResourceAssignment(resourceName));
+ Map<String, String> partitionStateMap =
resourceAssignment.getReplicaMap(partition);
+ if (partitionStateMap.isEmpty()) {
+ // ResourceAssignment returns an immutable empty map when no such
assignment has been recorded yet.
+ // So if the returned map is empty, create a new map.
+ partitionStateMap = new HashMap<>();
+ }
+ partitionStateMap.put(node.getInstanceName(),
replica.getReplicaState());
+ resourceAssignment.addReplicaMap(partition, partitionStateMap);
+ }
+ }
+ return assignmentMap;
}
public void recordAssignmentFailure(AssignableReplica replica,
diff --git
a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
index 4df8e8d..8ce60e7 100644
---
a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
+++
b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
@@ -19,6 +19,14 @@ package org.apache.helix.controller.stages;
* under the License.
*/
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.stream.Collectors;
+
import org.apache.helix.HelixException;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixRebalanceException;
@@ -48,13 +56,6 @@ import org.apache.helix.util.HelixUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.Callable;
-
/**
* For partition compute best possible (instance,state) pair based on
* IdealState,StateModel,LiveInstance
@@ -114,67 +115,46 @@ public class BestPossibleStateCalcStage extends
AbstractBaseStage {
// Check whether the offline/disabled instance count in the cluster
reaches the set limit,
// if yes, pause the rebalancer.
- boolean isValid = validateOfflineInstancesLimit(cache,
- (HelixManager) event.getAttribute(AttributeName.helixmanager.name()));
-
- // 1. Rebalance with the WAGED rebalancer
- // The rebalancer only calculates the new ideal assignment for all the
resources that are
- // configured to use the WAGED rebalancer.
- // For the other resources, the legacy rebalancers will be triggered in
the next step.
- Map<String, IdealState> newIdealStates = new HashMap<>();
- Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> preferences =
cache.getClusterConfig()
- .getGlobalRebalancePreference();
- WagedRebalancer wagedRebalancer = new WagedRebalancer(helixManager,
preferences);
- try {
- newIdealStates
- .putAll(wagedRebalancer.computeNewIdealStates(cache, resourceMap,
currentStateOutput));
- } catch (HelixRebalanceException ex) {
- // Note that unlike the legacy rebalancer, the WAGED rebalance won't
return partial result.
- // Since it calculates for all the eligible resources globally, a
partial result is invalid.
- // TODO propagate the rebalancer failure information to
updateRebalanceStatus for monitoring.
- LogUtil.logError(logger, _eventId, String
- .format("Failed to calculate the new Ideal States using the
rebalancer %s due to %s",
- wagedRebalancer.getClass().getSimpleName(),
ex.getFailureType()), ex);
- }
+ boolean isValid =
+ validateOfflineInstancesLimit(cache,
event.getAttribute(AttributeName.helixmanager.name()));
final List<String> failureResources = new ArrayList<>();
- Iterator<Resource> itr = resourceMap.values().iterator();
+
+ Map<String, Resource> calculatedResourceMap =
+ computeResourceBestPossibleStateWithWagedRebalancer(cache,
currentStateOutput, helixManager,
+ resourceMap, output, failureResources);
+
+ Map<String, Resource> remainingResourceMap = new HashMap<>(resourceMap);
+ remainingResourceMap.keySet().removeAll(calculatedResourceMap.keySet());
+
+ // Fallback to the original single resource rebalancer calculation.
+ // This is required because we support mixed cluster that uses both WAGED
rebalancer and the
+ // older rebalancers.
+ Iterator<Resource> itr = remainingResourceMap.values().iterator();
while (itr.hasNext()) {
Resource resource = itr.next();
boolean result = false;
- IdealState is = newIdealStates.get(resource.getResourceName());
- if (is != null) {
- // 2. Check if the WAGED rebalancer has calculated for this resource
or not.
- result = checkBestPossibleStateCalculation(is);
- if (result) {
- // The WAGED rebalancer calculates a valid result, record in the
output
- updateBestPossibleStateOutput(output, resource, is);
- }
- } else {
- // 3. The WAGED rebalancer skips calculating the resource assignment,
fallback to use a
- // legacy resource rebalancer if applicable.
- // If this calculation fails, the resource will be reported in the
failureResources list.
- try {
- result =
- computeSingleResourceBestPossibleState(event, cache,
currentStateOutput, resource,
- output);
- } catch (HelixException ex) {
- LogUtil.logError(logger, _eventId,
- "Exception when calculating best possible states for " +
resource.getResourceName(),
- ex);
- }
+ try {
+ result = computeSingleResourceBestPossibleState(event, cache,
currentStateOutput, resource,
+ output);
+ } catch (HelixException ex) {
+ LogUtil.logError(logger, _eventId, String
+ .format("Exception when calculating best possible states for %s",
+ resource.getResourceName()), ex);
+
}
if (!result) {
failureResources.add(resource.getResourceName());
- LogUtil.logWarn(logger, _eventId,
- "Failed to calculate best possible states for " +
resource.getResourceName());
+ LogUtil.logWarn(logger, _eventId, String
+ .format("Failed to calculate best possible states for %s",
resource.getResourceName()));
}
}
// Check and report if resource rebalance has failure
updateRebalanceStatus(!isValid || !failureResources.isEmpty(),
failureResources, helixManager,
- cache, clusterStatusMonitor,
- "Failed to calculate best possible states for " +
failureResources.size() + " resources.");
+ cache, clusterStatusMonitor, String
+ .format("Failed to calculate best possible states for %d
resources.",
+ failureResources.size()));
return output;
}
@@ -238,6 +218,70 @@ public class BestPossibleStateCalcStage extends
AbstractBaseStage {
return true;
}
+ /**
+ * Rebalance with the WAGED rebalancer
+ * The rebalancer only calculates the new ideal assignment for all the
resources that are
+ * configured to use the WAGED rebalancer.
+ *
+ * @param cache Cluster data cache.
+ * @param currentStateOutput The current state information.
+ * @param helixManager
+ * @param resourceMap The complete resource map. The method will
filter the map for the compatible resources.
+ * @param output The best possible state output.
+ * @param failureResources The failure records that will be updated if any
resource cannot be computed.
+ * @return The map of all the calculated resources.
+ */
+ private Map<String, Resource>
computeResourceBestPossibleStateWithWagedRebalancer(
+ ResourceControllerDataProvider cache, CurrentStateOutput
currentStateOutput,
+ HelixManager helixManager, Map<String, Resource> resourceMap,
BestPossibleStateOutput output,
+ List<String> failureResources) {
+ // Find the compatible resources: 1. FULL_AUTO 2. Configured to use the
WAGED rebalancer
+ Map<String, Resource> wagedRebalancedResourceMap =
+ resourceMap.entrySet().stream().filter(resourceEntry -> {
+ IdealState is = cache.getIdealState(resourceEntry.getKey());
+ return is != null &&
is.getRebalanceMode().equals(IdealState.RebalanceMode.FULL_AUTO)
+ &&
WagedRebalancer.class.getName().equals(is.getRebalancerClassName());
+ }).collect(Collectors.toMap(resourceEntry -> resourceEntry.getKey(),
+ resourceEntry -> resourceEntry.getValue()));
+
+ Map<String, IdealState> newIdealStates = new HashMap<>();
+
+ // Init rebalancer with the rebalance preferences.
+ Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> preferences =
cache.getClusterConfig()
+ .getGlobalRebalancePreference();
+ // TODO avoid creating the rebalancer on every rebalance call for
performance enhancement
+ WagedRebalancer wagedRebalancer = new WagedRebalancer(helixManager,
preferences);
+ try {
+ newIdealStates.putAll(wagedRebalancer
+ .computeNewIdealStates(cache, wagedRebalancedResourceMap,
currentStateOutput));
+ } catch (HelixRebalanceException ex) {
+ // Note that unlike the legacy rebalancer, the WAGED rebalance won't
return partial result.
+ // Since it calculates for all the eligible resources globally, a
partial result is invalid.
+ // TODO propagate the rebalancer failure information to
updateRebalanceStatus for monitoring.
+ LogUtil.logError(logger, _eventId, String
+ .format("Failed to calculate the new Ideal States using the
rebalancer %s due to %s",
+ wagedRebalancer.getClass().getSimpleName(),
ex.getFailureType()), ex);
+ } finally {
+ wagedRebalancer.close();
+ }
+ Iterator<Resource> itr = wagedRebalancedResourceMap.values().iterator();
+ while (itr.hasNext()) {
+ Resource resource = itr.next();
+ IdealState is = newIdealStates.get(resource.getResourceName());
+ // Check if the WAGED rebalancer has calculated the result for this
resource or not.
+ if (is != null && checkBestPossibleStateCalculation(is)) {
+ // The WAGED rebalancer calculates a valid result, record in the output
+ updateBestPossibleStateOutput(output, resource, is);
+ } else {
+ failureResources.add(resource.getResourceName());
+ LogUtil.logWarn(logger, _eventId, String
+ .format("Failed to calculate best possible states for %s.",
+ resource.getResourceName()));
+ }
+ }
+ return wagedRebalancedResourceMap;
+ }
+
private void updateBestPossibleStateOutput(BestPossibleStateOutput output,
Resource resource,
IdealState computedIdealState) {
output.setPreferenceLists(resource.getResourceName(),
computedIdealState.getPreferenceLists());
diff --git
a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java
b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java
index 24c7c8e..a11da29 100644
---
a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java
+++
b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java
@@ -25,6 +25,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.I0Itec.zkclient.exception.ZkMarshallingError;
+import org.I0Itec.zkclient.exception.ZkNoNodeException;
import org.I0Itec.zkclient.serialize.ZkSerializer;
import org.apache.helix.AccessOption;
import org.apache.helix.BaseDataAccessor;
@@ -215,7 +216,7 @@ public class ZkBucketDataAccessor implements
BucketDataAccessor, AutoCloseable {
ZNRecord metadataRecord =
_znRecordBaseDataAccessor.get(path, null, AccessOption.PERSISTENT);
if (metadataRecord == null) {
- throw new HelixException(
+ throw new ZkNoNodeException(
String.format("Metadata ZNRecord does not exist for path: %s",
path));
}
diff --git a/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java
b/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java
index 03338b4..b9284b9 100644
--- a/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java
+++ b/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java
@@ -30,9 +30,9 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.logging.Level;
-
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
+
import org.I0Itec.zkclient.IZkStateListener;
import org.I0Itec.zkclient.ZkServer;
import org.apache.helix.BaseDataAccessor;
@@ -54,6 +54,7 @@ import org.apache.helix.controller.pipeline.Stage;
import org.apache.helix.controller.pipeline.StageContext;
import org.apache.helix.controller.rebalancer.DelayedAutoRebalancer;
import org.apache.helix.controller.rebalancer.strategy.AutoRebalanceStrategy;
+import org.apache.helix.controller.rebalancer.waged.WagedRebalancer;
import org.apache.helix.controller.stages.AttributeName;
import org.apache.helix.controller.stages.ClusterEvent;
import org.apache.helix.manager.zk.ZKHelixAdmin;
@@ -347,6 +348,19 @@ public class ZkTestBase {
protected IdealState createResourceWithDelayedRebalance(String clusterName,
String db,
String stateModel, int numPartition, int replica, int minActiveReplica,
long delay,
String rebalanceStrategy) {
+ return createResource(clusterName, db, stateModel, numPartition, replica,
minActiveReplica,
+ delay, DelayedAutoRebalancer.class.getName(), rebalanceStrategy);
+ }
+
+ protected IdealState createResourceWithWagedRebalance(String clusterName,
String db,
+ String stateModel, int numPartition, int replica, int minActiveReplica,
long delay) {
+ return createResource(clusterName, db, stateModel, numPartition, replica,
minActiveReplica,
+ delay, WagedRebalancer.class.getName(), null);
+ }
+
+ private IdealState createResource(String clusterName, String db, String
stateModel,
+ int numPartition, int replica, int minActiveReplica, long delay, String
rebalancerClassName,
+ String rebalanceStrategy) {
IdealState idealState =
_gSetupTool.getClusterManagementTool().getResourceIdealState(clusterName, db);
if (idealState == null) {
@@ -362,7 +376,7 @@ public class ZkTestBase {
if (delay > 0) {
idealState.setRebalanceDelay(delay);
}
- idealState.setRebalancerClassName(DelayedAutoRebalancer.class.getName());
+ idealState.setRebalancerClassName(rebalancerClassName);
_gSetupTool.getClusterManagementTool().setResourceIdealState(clusterName,
db, idealState);
_gSetupTool.rebalanceStorageCluster(clusterName, db, replica);
idealState =
_gSetupTool.getClusterManagementTool().getResourceIdealState(clusterName, db);
diff --git
a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/MockAssignmentMetadataStore.java
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/MockAssignmentMetadataStore.java
index 3371c8b..7d05416 100644
---
a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/MockAssignmentMetadataStore.java
+++
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/MockAssignmentMetadataStore.java
@@ -23,6 +23,7 @@ import java.util.HashMap;
import java.util.Map;
import org.apache.helix.BucketDataAccessor;
import org.apache.helix.model.ResourceAssignment;
+import org.mockito.Mockito;
/**
* A mock up metadata store for unit test.
@@ -32,8 +33,8 @@ public class MockAssignmentMetadataStore extends
AssignmentMetadataStore {
private Map<String, ResourceAssignment> _persistGlobalBaseline = new
HashMap<>();
private Map<String, ResourceAssignment> _persistBestPossibleAssignment = new
HashMap<>();
- MockAssignmentMetadataStore(BucketDataAccessor bucketDataAccessor, String
clusterName) {
- super(bucketDataAccessor, clusterName);
+ MockAssignmentMetadataStore() {
+ super(Mockito.mock(BucketDataAccessor.class), "");
}
public Map<String, ResourceAssignment> getBaseline() {
@@ -53,6 +54,10 @@ public class MockAssignmentMetadataStore extends
AssignmentMetadataStore {
_persistBestPossibleAssignment = bestPossibleAssignment;
}
+ public void close() {
+ // do nothing
+ }
+
public void clearMetadataStore() {
_persistBestPossibleAssignment.clear();
_persistGlobalBaseline.clear();
diff --git
a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestAssignmentMetadataStore.java
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestAssignmentMetadataStore.java
index 922915f..ecd2af3 100644
---
a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestAssignmentMetadataStore.java
+++
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestAssignmentMetadataStore.java
@@ -20,6 +20,7 @@ package org.apache.helix.controller.rebalancer.waged;
*/
import java.util.Map;
+
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
@@ -28,6 +29,7 @@ import
org.apache.helix.integration.manager.ClusterControllerManager;
import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.model.ResourceAssignment;
import org.testng.Assert;
+import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@@ -79,7 +81,15 @@ public class TestAssignmentMetadataStore extends ZkTestBase {
_manager.connect();
// create AssignmentMetadataStore
- _store = new AssignmentMetadataStore(_manager);
+ _store = new
AssignmentMetadataStore(_manager.getMetadataStoreConnectionString(),
+ _manager.getClusterName());
+ }
+
+ @AfterClass
+ public void afterClass() {
+ if (_store != null) {
+ _store.close();
+ }
}
/**
@@ -91,11 +101,7 @@ public class TestAssignmentMetadataStore extends ZkTestBase
{
*/
@Test
public void testReadEmptyBaseline() {
- try {
- Map<String, ResourceAssignment> baseline = _store.getBaseline();
- Assert.fail("Should fail because there shouldn't be any data.");
- } catch (Exception e) {
- // OK
- }
+ Map<String, ResourceAssignment> baseline = _store.getBaseline();
+ Assert.assertTrue(baseline.isEmpty());
}
}
diff --git
a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
index d6fd99b..e7368be 100644
---
a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
+++
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
@@ -26,7 +26,7 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
-import org.apache.helix.BucketDataAccessor;
+
import org.apache.helix.HelixConstants;
import org.apache.helix.HelixRebalanceException;
import
org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
@@ -47,8 +47,9 @@ import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
-import static org.mockito.Matchers.*;
-import static org.mockito.Mockito.*;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.when;
public class TestWagedRebalancer extends AbstractTestClusterModel {
private Set<String> _instances;
@@ -63,9 +64,7 @@ public class TestWagedRebalancer extends
AbstractTestClusterModel {
_algorithm = new MockRebalanceAlgorithm();
// Initialize a mock assignment metadata store
- BucketDataAccessor mockAccessor = Mockito.mock(BucketDataAccessor.class);
- String clusterName = ""; // an empty string for testing purposes
- _metadataStore = new MockAssignmentMetadataStore(mockAccessor,
clusterName);
+ _metadataStore = new MockAssignmentMetadataStore();
}
@Override
@@ -181,9 +180,9 @@ public class TestWagedRebalancer extends
AbstractTestClusterModel {
String resourceName = csEntry.getKey();
CurrentState cs = csEntry.getValue();
for (Map.Entry<String, String> partitionStateEntry :
cs.getPartitionStateMap().entrySet()) {
- currentStateOutput.setCurrentState(resourceName,
- new Partition(partitionStateEntry.getKey()), instanceName,
- partitionStateEntry.getValue());
+ currentStateOutput
+ .setCurrentState(resourceName, new
Partition(partitionStateEntry.getKey()),
+ instanceName, partitionStateEntry.getValue());
}
}
}
@@ -216,7 +215,7 @@ public class TestWagedRebalancer extends
AbstractTestClusterModel {
"DROPPED");
}
- @Test(dependsOnMethods = "testRebalance")
+ @Test(dependsOnMethods = "testRebalance", expectedExceptions =
HelixRebalanceException.class, expectedExceptionsMessageRegExp = "Input
contains invalid resource\\(s\\) that cannot be rebalanced by the WAGED
rebalancer. \\[Resource1\\] Failure Type: INVALID_INPUT")
public void testNonCompatibleConfiguration() throws IOException,
HelixRebalanceException {
_metadataStore.clearMetadataStore();
WagedRebalancer rebalancer = new WagedRebalancer(_metadataStore,
_algorithm);
@@ -233,12 +232,7 @@ public class TestWagedRebalancer extends
AbstractTestClusterModel {
.forEach(partition -> resource.addPartition(partition));
return resource;
}));
- Map<String, IdealState> newIdealStates =
- rebalancer.computeNewIdealStates(clusterData, resourceMap, new
CurrentStateOutput());
- Map<String, ResourceAssignment> algorithmResult =
_algorithm.getRebalanceResult();
- // The output shall not contains the nonCompatibleResource.
- resourceMap.remove(nonCompatibleResourceName);
- validateRebalanceResult(resourceMap, newIdealStates, algorithmResult);
+ rebalancer.computeNewIdealStates(clusterData, resourceMap, new
CurrentStateOutput());
}
// TODO test with invalid capacity configuration which will fail the cluster
model constructing.
@@ -283,7 +277,7 @@ public class TestWagedRebalancer extends
AbstractTestClusterModel {
Assert.assertEquals(ex.getFailureType(),
HelixRebalanceException.Type.INVALID_REBALANCER_STATUS);
Assert.assertEquals(ex.getMessage(),
- "Failed to get the persisted assignment records. Failure Type:
INVALID_REBALANCER_STATUS");
+ "Failed to get the current baseline assignment because of unexpected
error. Failure Type: INVALID_REBALANCER_STATUS");
}
}
@@ -425,8 +419,9 @@ public class TestWagedRebalancer extends
AbstractTestClusterModel {
Assert.assertTrue(newIdealStates.containsKey(resourceName));
IdealState is = newIdealStates.get(resourceName);
ResourceAssignment assignment = expectedResult.get(resourceName);
- Assert.assertEquals(is.getPartitionSet(), new
HashSet<>(assignment.getMappedPartitions()
- .stream().map(partition ->
partition.getPartitionName()).collect(Collectors.toSet())));
+ Assert.assertEquals(is.getPartitionSet(), new HashSet<>(
+ assignment.getMappedPartitions().stream().map(partition ->
partition.getPartitionName())
+ .collect(Collectors.toSet())));
for (String partitionName : is.getPartitionSet()) {
Assert.assertEquals(is.getInstanceStateMap(partitionName),
assignment.getReplicaMap(new Partition(partitionName)));
diff --git
a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/MockRebalanceAlgorithm.java
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/MockRebalanceAlgorithm.java
index 2a39482..759c685 100644
---
a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/MockRebalanceAlgorithm.java
+++
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/MockRebalanceAlgorithm.java
@@ -72,7 +72,7 @@ public class MockRebalanceAlgorithm implements
RebalanceAlgorithm {
_resultHistory = result;
- // TODO remove this mockito when
OptimalAssignment.getOptimalResourceAssignment is ready.
+ // Mock the return value for supporting test.
OptimalAssignment optimalAssignment =
Mockito.mock(OptimalAssignment.class);
when(optimalAssignment.getOptimalResourceAssignment()).thenReturn(result);
return optimalAssignment;
diff --git
a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java
index 0f799b3..91db076 100644
---
a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java
+++
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java
@@ -103,8 +103,8 @@ public abstract class AbstractTestClusterModel {
ClusterConfig testClusterConfig = new ClusterConfig("testClusterConfigId");
testClusterConfig.setMaxPartitionsPerInstance(5);
testClusterConfig.setDisabledInstances(Collections.emptyMap());
- testClusterConfig.setTopologyAwareEnabled(false);
testClusterConfig.setInstanceCapacityKeys(new
ArrayList<>(_capacityDataMap.keySet()));
+ testClusterConfig.setTopologyAwareEnabled(true);
when(testCache.getClusterConfig()).thenReturn(testClusterConfig);
// 3. Mock the live instance node for the default instance.
diff --git
a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestOptimalAssignment.java
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestOptimalAssignment.java
new file mode 100644
index 0000000..bd820a9
--- /dev/null
+++
b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestOptimalAssignment.java
@@ -0,0 +1,91 @@
+package org.apache.helix.controller.rebalancer.waged.model;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Map;
+
+import org.apache.helix.HelixException;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.ResourceAssignment;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class TestOptimalAssignment extends ClusterModelTestHelper {
+
+ @BeforeClass
+ public void initialize() {
+ super.initialize();
+ }
+
+ @Test
+ public void testUpdateAssignment() throws IOException {
+ OptimalAssignment assignment = new OptimalAssignment();
+
+ // update with empty cluster model
+ assignment.updateAssignments(getDefaultClusterModel());
+ Map<String, ResourceAssignment> optimalAssignmentMap =
+ assignment.getOptimalResourceAssignment();
+ Assert.assertEquals(optimalAssignmentMap, Collections.emptyMap());
+
+ // update with valid assignment
+ ClusterModel model = getDefaultClusterModel();
+ model.assign(_resourceNames.get(0), _partitionNames.get(1), "SLAVE",
_testInstanceId);
+ model.assign(_resourceNames.get(0), _partitionNames.get(0), "MASTER",
_testInstanceId);
+ assignment.updateAssignments(model);
+ optimalAssignmentMap = assignment.getOptimalResourceAssignment();
+
Assert.assertEquals(optimalAssignmentMap.get(_resourceNames.get(0)).getMappedPartitions(),
+ Arrays
+ .asList(new Partition(_partitionNames.get(0)), new
Partition(_partitionNames.get(1))));
+ Assert.assertEquals(optimalAssignmentMap.get(_resourceNames.get(0))
+ .getReplicaMap(new Partition(_partitionNames.get(1))),
+ Collections.singletonMap(_testInstanceId, "SLAVE"));
+ Assert.assertEquals(optimalAssignmentMap.get(_resourceNames.get(0))
+ .getReplicaMap(new Partition(_partitionNames.get(0))),
+ Collections.singletonMap(_testInstanceId, "MASTER"));
+ }
+
+ @Test(dependsOnMethods = "testUpdateAssignment")
+ public void TestAssignmentFailure() throws IOException {
+ OptimalAssignment assignment = new OptimalAssignment();
+ ClusterModel model = getDefaultClusterModel();
+
+ // record failure
+ AssignableReplica targetFailureReplica =
+
model.getAssignableReplicaMap().get(_resourceNames.get(0)).iterator().next();
+ AssignableNode targetFailureNode =
model.getAssignableNodes().get(_testInstanceId);
+ assignment.recordAssignmentFailure(targetFailureReplica, Collections
+ .singletonMap(targetFailureNode, Collections.singletonList("Assignment
Failure!")));
+
+ Assert.assertTrue(assignment.hasAnyFailure());
+
+ assignment.updateAssignments(getDefaultClusterModel());
+ try {
+ assignment.getOptimalResourceAssignment();
+ Assert.fail("Get optimal assignment shall fail because of the failure
record.");
+ } catch (HelixException ex) {
+ Assert.assertTrue(ex.getMessage().startsWith(
+ "Cannot get the optimal resource assignment since a calculation
failure is recorded."));
+ }
+ }
+}
diff --git
a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java
new file mode 100644
index 0000000..fb5375c
--- /dev/null
+++
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalance.java
@@ -0,0 +1,477 @@
+package org.apache.helix.integration.rebalancer.WagedRebalancer;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.TestHelper;
+import org.apache.helix.common.ZkTestBase;
+import
org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy;
+import org.apache.helix.controller.rebalancer.strategy.CrushRebalanceStrategy;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.model.BuiltInStateModelDefinitions;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.tools.ClusterVerifiers.HelixClusterVerifier;
+import org.apache.helix.tools.ClusterVerifiers.StrictMatchExternalViewVerifier;
+import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class TestWagedRebalance extends ZkTestBase {
+ protected final int NUM_NODE = 6;
+ protected static final int START_PORT = 12918;
+ protected static final int PARTITIONS = 20;
+ protected static final int TAGS = 2;
+
+ protected final String CLASS_NAME = getShortClassName();
+ protected final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + CLASS_NAME;
+ protected ClusterControllerManager _controller;
+
+ List<MockParticipantManager> _participants = new ArrayList<>();
+ Map<String, String> _nodeToTagMap = new HashMap<>();
+ List<String> _nodes = new ArrayList<>();
+ private Set<String> _allDBs = new HashSet<>();
+ private int _replica = 3;
+
+ private static String[] _testModels = {
+ BuiltInStateModelDefinitions.OnlineOffline.name(),
+ BuiltInStateModelDefinitions.MasterSlave.name(),
+ BuiltInStateModelDefinitions.LeaderStandby.name()
+ };
+
+ @BeforeClass
+ public void beforeClass() throws Exception {
+ System.out.println("START " + CLASS_NAME + " at " + new
Date(System.currentTimeMillis()));
+
+ _gSetupTool.addCluster(CLUSTER_NAME, true);
+
+ for (int i = 0; i < NUM_NODE; i++) {
+ String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+ addInstanceConfig(storageNodeName, i, TAGS);
+ }
+
+ // start dummy participants
+ for (String node : _nodes) {
+ MockParticipantManager participant = new MockParticipantManager(ZK_ADDR,
CLUSTER_NAME, node);
+ participant.syncStart();
+ _participants.add(participant);
+ }
+
+ // start controller
+ String controllerName = CONTROLLER_PREFIX + "_0";
+ _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME,
controllerName);
+ _controller.syncStart();
+
+ enablePersistBestPossibleAssignment(_gZkClient, CLUSTER_NAME, true);
+ }
+
+ protected void addInstanceConfig(String storageNodeName, int seqNo, int
tagCount) {
+ _gSetupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
+ String tag = "tag-" + seqNo % tagCount;
+ _gSetupTool.getClusterManagementTool().addInstanceTag(CLUSTER_NAME,
storageNodeName, tag);
+ _nodeToTagMap.put(storageNodeName, tag);
+ _nodes.add(storageNodeName);
+ }
+
+ @Test
+ public void test() throws Exception {
+ int i = 0;
+ for (String stateModel : _testModels) {
+ String db = "Test-DB-" + i++;
+ createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel,
PARTITIONS, _replica, _replica,
+ -1);
+ _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica);
+ _allDBs.add(db);
+ }
+ Thread.sleep(300);
+
+ validate(_replica);
+
+ // Adding 3 more resources
+ i = 0;
+ for (String stateModel : _testModels) {
+ String moreDB = "More-Test-DB-" + i++;
+ createResourceWithWagedRebalance(CLUSTER_NAME, moreDB, stateModel,
PARTITIONS, _replica,
+ _replica, -1);
+ _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, moreDB, _replica);
+ _allDBs.add(moreDB);
+
+ Thread.sleep(300);
+
+ validate(_replica);
+ }
+
+ // Drop the 3 additional resources
+ for (int j = 0; j < 3; j++) {
+ String moreDB = "More-Test-DB-" + j++;
+ _gSetupTool.dropResourceFromCluster(CLUSTER_NAME, moreDB);
+ _allDBs.remove(moreDB);
+
+ Thread.sleep(300);
+
+ validate(_replica);
+ }
+ }
+
+ @Test(dependsOnMethods = "test")
+ public void testWithInstanceTag() throws Exception {
+ Set<String> tags = new HashSet<String>(_nodeToTagMap.values());
+ int i = 3;
+ for (String tag : tags) {
+ String db = "Test-DB-" + i++;
+ createResourceWithWagedRebalance(CLUSTER_NAME, db,
+ BuiltInStateModelDefinitions.MasterSlave.name(), PARTITIONS,
_replica, _replica, -1);
+ IdealState is =
+
_gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
+ is.setInstanceGroupTag(tag);
+
_gSetupTool.getClusterManagementTool().setResourceIdealState(CLUSTER_NAME, db,
is);
+ _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica);
+ _allDBs.add(db);
+ }
+ Thread.sleep(300);
+ validate(_replica);
+ }
+
+ @Test(dependsOnMethods = "test")
+ public void testChangeIdealState() throws InterruptedException {
+ String dbName = "Test-DB";
+ createResourceWithWagedRebalance(CLUSTER_NAME, dbName,
+ BuiltInStateModelDefinitions.MasterSlave.name(), PARTITIONS, _replica,
_replica, -1);
+ _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, dbName, _replica);
+ _allDBs.add(dbName);
+ Thread.sleep(300);
+
+ validate(_replica);
+
+ // Adjust the replica count
+ IdealState is =
+
_gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME,
dbName);
+ int newReplicaFactor = _replica - 1;
+ is.setReplicas("" + newReplicaFactor);
+ _gSetupTool.getClusterManagementTool().setResourceIdealState(CLUSTER_NAME,
dbName, is);
+ Thread.sleep(300);
+
+ validate(newReplicaFactor);
+
+ // Adjust the partition list
+ is =
_gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME,
dbName);
+ is.setNumPartitions(PARTITIONS + 1);
+ _gSetupTool.getClusterManagementTool().setResourceIdealState(CLUSTER_NAME,
dbName, is);
+ _gSetupTool.getClusterManagementTool().rebalance(CLUSTER_NAME, dbName,
newReplicaFactor);
+ Thread.sleep(300);
+
+ validate(newReplicaFactor);
+ ExternalView ev =
+
_gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME,
dbName);
+ Assert.assertEquals(ev.getPartitionSet().size(), PARTITIONS + 1);
+ }
+
+ @Test(dependsOnMethods = "test")
+ public void testDisableInstance() throws InterruptedException {
+ String dbName = "Test-DB";
+ createResourceWithWagedRebalance(CLUSTER_NAME, dbName,
+ BuiltInStateModelDefinitions.MasterSlave.name(), PARTITIONS, _replica,
_replica, -1);
+ _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, dbName, _replica);
+ _allDBs.add(dbName);
+ Thread.sleep(300);
+
+ validate(_replica);
+
+ // Disable participants, keep only three left
+ Set<String> disableParticipants = new HashSet<>();
+
+ try {
+ for (int i = 3; i < _participants.size(); i++) {
+ MockParticipantManager p = _participants.get(i);
+ disableParticipants.add(p.getInstanceName());
+ InstanceConfig config = _gSetupTool.getClusterManagementTool()
+ .getInstanceConfig(CLUSTER_NAME, p.getInstanceName());
+ config.setInstanceEnabled(false);
+ _gSetupTool.getClusterManagementTool()
+ .setInstanceConfig(CLUSTER_NAME, p.getInstanceName(), config);
+ }
+ Thread.sleep(300);
+
+ validate(_replica);
+
+ // Verify there is no assignment on the disabled participants.
+ ExternalView ev =
+
_gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME,
dbName);
+ for (String partition : ev.getPartitionSet()) {
+ Map<String, String> replicaStateMap = ev.getStateMap(partition);
+ for (String instance : replicaStateMap.keySet()) {
+ Assert.assertFalse(disableParticipants.contains(instance));
+ }
+ }
+ } finally {
+ // recover the config
+ for (String instanceName : disableParticipants) {
+ InstanceConfig config =
+
_gSetupTool.getClusterManagementTool().getInstanceConfig(CLUSTER_NAME,
instanceName);
+ config.setInstanceEnabled(true);
+ _gSetupTool.getClusterManagementTool()
+ .setInstanceConfig(CLUSTER_NAME, instanceName, config);
+ }
+ }
+ }
+
+ @Test(dependsOnMethods = "testDisableInstance")
+ public void testLackEnoughLiveInstances() throws Exception {
+ // shutdown participants, keep only two left
+ for (int i = 2; i < _participants.size(); i++) {
+ _participants.get(i).syncStop();
+ }
+
+ int j = 0;
+ for (String stateModel : _testModels) {
+ String db = "Test-DB-" + j++;
+ createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel,
PARTITIONS, _replica, _replica,
+ -1);
+ _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica);
+ _allDBs.add(db);
+ }
+
+ Thread.sleep(300);
+ // Verify if the partitions get assigned
+ validate(2);
+
+ // restart the participants within the zone
+ for (int i = 2; i < _participants.size(); i++) {
+ MockParticipantManager p = _participants.get(i);
+ MockParticipantManager newNode =
+ new MockParticipantManager(ZK_ADDR, CLUSTER_NAME,
p.getInstanceName());
+ _participants.set(i, newNode);
+ newNode.syncStart();
+ }
+
+ Thread.sleep(300);
+ // Verify if the partitions get assigned
+ validate(_replica);
+ }
+
+ @Test(dependsOnMethods = "testDisableInstance")
+ public void testLackEnoughInstances() throws Exception {
+ // shutdown participants, keep only two left
+ for (int i = 2; i < _participants.size(); i++) {
+ MockParticipantManager p = _participants.get(i);
+ p.syncStop();
+ _gSetupTool.getClusterManagementTool()
+ .enableInstance(CLUSTER_NAME, p.getInstanceName(), false);
+ _gSetupTool.dropInstanceFromCluster(CLUSTER_NAME, p.getInstanceName());
+
+ }
+
+ int j = 0;
+ for (String stateModel : _testModels) {
+ String db = "Test-DB-" + j++;
+ createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel,
PARTITIONS, _replica, _replica,
+ -1);
+ _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica);
+ _allDBs.add(db);
+ }
+
+ Thread.sleep(300);
+ // Verify if the partitions get assigned
+ validate(2);
+
+ // Create new participants within the zone
+ for (int i = 2; i < _participants.size(); i++) {
+ MockParticipantManager p = _participants.get(i);
+ String replaceNodeName = p.getInstanceName() + "-replacement_" +
START_PORT;
+ addInstanceConfig(replaceNodeName, i, TAGS);
+ MockParticipantManager newNode =
+ new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, replaceNodeName);
+ _participants.set(i, newNode);
+ newNode.syncStart();
+ }
+
+ Thread.sleep(300);
+ // Verify if the partitions get assigned
+ validate(_replica);
+ }
+
+ @Test(dependsOnMethods = "test")
+ public void testMixedRebalancerUsage() throws InterruptedException {
+ int i = 0;
+ for (String stateModel : _testModels) {
+ String db = "Test-DB-" + i++;
+ if (i == 0) {
+ _gSetupTool.addResourceToCluster(CLUSTER_NAME, db, PARTITIONS,
stateModel,
+ IdealState.RebalanceMode.FULL_AUTO + "",
CrushRebalanceStrategy.class.getName());
+ } else if (i == 1) {
+ _gSetupTool.addResourceToCluster(CLUSTER_NAME, db, PARTITIONS,
stateModel,
+ IdealState.RebalanceMode.FULL_AUTO + "",
CrushEdRebalanceStrategy.class.getName());
+ } else {
+ createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel,
PARTITIONS, _replica,
+ _replica, -1);
+ }
+ _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica);
+ _allDBs.add(db);
+ }
+ Thread.sleep(300);
+
+ validate(_replica);
+ }
+
+ @Test(dependsOnMethods = "test")
+ public void testMaxPartitionLimitation() throws Exception {
+ ConfigAccessor configAccessor = new ConfigAccessor(_gZkClient);
+ ClusterConfig clusterConfig =
configAccessor.getClusterConfig(CLUSTER_NAME);
+ // Change the cluster level config so no assignment can be done
+ clusterConfig.setMaxPartitionsPerInstance(1);
+ configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig);
+ try {
+ String limitedResourceName = null;
+ int i = 0;
+ for (String stateModel : _testModels) {
+ String db = "Test-DB-" + i++;
+ createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel,
PARTITIONS, _replica,
+ _replica, -1);
+ if (i == 1) {
+ // The limited resource has additional limitation, so even the other
resources can be assigned
+ // later, this resource will still be blocked by the max partition
limitation.
+ limitedResourceName = db;
+ IdealState idealState =
+
_gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
+ idealState.setMaxPartitionsPerInstance(1);
+
_gSetupTool.getClusterManagementTool().setResourceIdealState(CLUSTER_NAME, db,
idealState);
+ }
+ _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica);
+ _allDBs.add(db);
+ }
+ Thread.sleep(300);
+
+ // Since the WAGED rebalancer does not do partial rebalance, the initial
assignment won't show.
+ Assert.assertFalse(TestHelper.verify(() -> _allDBs.stream().allMatch(db
-> {
+ ExternalView ev =
+
_gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME,
db);
+ return ev != null && !ev.getPartitionSet().isEmpty();
+ }), 2000));
+
+ // Remove the cluster level limitation
+ clusterConfig.setMaxPartitionsPerInstance(-1);
+ configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig);
+ Thread.sleep(300);
+
+ // wait until any of the resources is rebalanced
+ TestHelper.verify(() -> {
+ for (String db : _allDBs) {
+ ExternalView ev =
+
_gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME,
db);
+ if (ev != null && !ev.getPartitionSet().isEmpty()) {
+ return true;
+ }
+ }
+ return false;
+ }, 3000);
+ ExternalView ev = _gSetupTool.getClusterManagementTool()
+ .getResourceExternalView(CLUSTER_NAME, limitedResourceName);
+ Assert.assertFalse(ev != null && !ev.getPartitionSet().isEmpty());
+
+ // Remove the resource level limitation
+ IdealState idealState = _gSetupTool.getClusterManagementTool()
+ .getResourceIdealState(CLUSTER_NAME, limitedResourceName);
+ idealState.setMaxPartitionsPerInstance(Integer.MAX_VALUE);
+ _gSetupTool.getClusterManagementTool()
+ .setResourceIdealState(CLUSTER_NAME, limitedResourceName,
idealState);
+
+ validate(_replica);
+ } finally {
+ // recover the config change
+ clusterConfig = configAccessor.getClusterConfig(CLUSTER_NAME);
+ clusterConfig.setMaxPartitionsPerInstance(-1);
+ configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig);
+ }
+ }
+
+ private void validate(int expectedReplica) {
+ HelixClusterVerifier _clusterVerifier =
+ new
StrictMatchExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR)
+ .setResources(_allDBs).build();
+ Assert.assertTrue(_clusterVerifier.verify(5000));
+ for (String db : _allDBs) {
+ IdealState is =
+
_gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
+ ExternalView ev =
+
_gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME,
db);
+ validateIsolation(is, ev, expectedReplica);
+ }
+ }
+
+ /**
+ * Validate each partition is different instances and with necessary tagged
instances.
+ */
+ private void validateIsolation(IdealState is, ExternalView ev, int
expectedReplica) {
+ String tag = is.getInstanceGroupTag();
+ for (String partition : is.getPartitionSet()) {
+ Map<String, String> assignmentMap =
ev.getRecord().getMapField(partition);
+ Set<String> instancesInEV = assignmentMap.keySet();
+ Assert.assertEquals(instancesInEV.size(), expectedReplica);
+ for (String instance : instancesInEV) {
+ if (tag != null) {
+ InstanceConfig config =
+
_gSetupTool.getClusterManagementTool().getInstanceConfig(CLUSTER_NAME,
instance);
+ Assert.assertTrue(config.containsTag(tag));
+ }
+ }
+ }
+ }
+
+ @AfterMethod
+ public void afterMethod() throws Exception {
+ for (String db : _allDBs) {
+ _gSetupTool.dropResourceFromCluster(CLUSTER_NAME, db);
+ }
+ _allDBs.clear();
+ // waiting for all DB be dropped.
+ Thread.sleep(100);
+ ZkHelixClusterVerifier _clusterVerifier =
+ new
StrictMatchExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR)
+ .setResources(_allDBs).build();
+ Assert.assertTrue(_clusterVerifier.verifyByPolling());
+ }
+
+ @AfterClass
+ public void afterClass() throws Exception {
+ if (_controller != null && _controller.isConnected()) {
+ _controller.syncStop();
+ }
+ for (MockParticipantManager p : _participants) {
+ if (p != null && p.isConnected()) {
+ p.syncStop();
+ }
+ }
+ deleteCluster(CLUSTER_NAME);
+ }
+}
diff --git
a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalanceFaultZone.java
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalanceFaultZone.java
new file mode 100644
index 0000000..0b020db
--- /dev/null
+++
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalanceFaultZone.java
@@ -0,0 +1,374 @@
+package org.apache.helix.integration.rebalancer.WagedRebalancer;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.common.ZkTestBase;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.model.BuiltInStateModelDefinitions;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.tools.ClusterVerifiers.StrictMatchExternalViewVerifier;
+import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class TestWagedRebalanceFaultZone extends ZkTestBase {
+ protected final int NUM_NODE = 6;
+ protected static final int START_PORT = 12918;
+ protected static final int PARTITIONS = 20;
+ protected static final int ZONES = 3;
+ protected static final int TAGS = 2;
+
+ protected final String CLASS_NAME = getShortClassName();
+ protected final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + CLASS_NAME;
+ protected ClusterControllerManager _controller;
+
+ List<MockParticipantManager> _participants = new ArrayList<>();
+ Map<String, String> _nodeToZoneMap = new HashMap<>();
+ Map<String, String> _nodeToTagMap = new HashMap<>();
+ List<String> _nodes = new ArrayList<>();
+ Set<String> _allDBs = new HashSet<>();
+ int _replica = 3;
+
+ String[] _testModels = {
+ BuiltInStateModelDefinitions.OnlineOffline.name(),
+ BuiltInStateModelDefinitions.MasterSlave.name(),
+ BuiltInStateModelDefinitions.LeaderStandby.name()
+ };
+
+ @BeforeClass
+ public void beforeClass() throws Exception {
+ System.out.println("START " + CLASS_NAME + " at " + new
Date(System.currentTimeMillis()));
+
+ _gSetupTool.addCluster(CLUSTER_NAME, true);
+
+ for (int i = 0; i < NUM_NODE; i++) {
+ String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+ addInstanceConfig(storageNodeName, i, ZONES, TAGS);
+ }
+
+ // start dummy participants
+ for (String node : _nodes) {
+ MockParticipantManager participant = new MockParticipantManager(ZK_ADDR,
CLUSTER_NAME, node);
+ participant.syncStart();
+ _participants.add(participant);
+ }
+
+ // start controller
+ String controllerName = CONTROLLER_PREFIX + "_0";
+ _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME,
controllerName);
+ _controller.syncStart();
+
+ enablePersistBestPossibleAssignment(_gZkClient, CLUSTER_NAME, true);
+ enableTopologyAwareRebalance(_gZkClient, CLUSTER_NAME, true);
+ }
+
+ protected void addInstanceConfig(String storageNodeName, int seqNo, int
zoneCount, int tagCount) {
+ _gSetupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
+ String zone = "zone-" + seqNo % zoneCount;
+ String tag = "tag-" + seqNo % tagCount;
+ _gSetupTool.getClusterManagementTool().setInstanceZoneId(CLUSTER_NAME,
storageNodeName, zone);
+ _gSetupTool.getClusterManagementTool().addInstanceTag(CLUSTER_NAME,
storageNodeName, tag);
+ _nodeToZoneMap.put(storageNodeName, zone);
+ _nodeToTagMap.put(storageNodeName, tag);
+ _nodes.add(storageNodeName);
+ }
+
+ @Test
+ public void testZoneIsolation() throws Exception {
+ int i = 0;
+ for (String stateModel : _testModels) {
+ String db = "Test-DB-" + i++;
+ createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel,
PARTITIONS, _replica,
+ _replica, -1);
+ _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica);
+ _allDBs.add(db);
+ }
+ Thread.sleep(300);
+
+ validate(_replica);
+ }
+
+ @Test
+ public void testZoneIsolationWithInstanceTag() throws Exception {
+ Set<String> tags = new HashSet<String>(_nodeToTagMap.values());
+ int i = 0;
+ for (String tag : tags) {
+ String db = "Test-DB-" + i++;
+ createResourceWithWagedRebalance(CLUSTER_NAME, db,
+ BuiltInStateModelDefinitions.MasterSlave.name(), PARTITIONS,
_replica, _replica, -1);
+ IdealState is =
+
_gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
+ is.setInstanceGroupTag(tag);
+
_gSetupTool.getClusterManagementTool().setResourceIdealState(CLUSTER_NAME, db,
is);
+ _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica);
+ _allDBs.add(db);
+ }
+ Thread.sleep(300);
+
+ validate(_replica);
+ }
+
+ @Test(dependsOnMethods = { "testZoneIsolation",
"testZoneIsolationWithInstanceTag" })
+ public void testLackEnoughLiveRacks() throws Exception {
+ // shutdown participants within one zone
+ String zone = _nodeToZoneMap.values().iterator().next();
+ for (int i = 0; i < _participants.size(); i++) {
+ MockParticipantManager p = _participants.get(i);
+ if (_nodeToZoneMap.get(p.getInstanceName()).equals(zone)) {
+ p.syncStop();
+ }
+ }
+
+ int j = 0;
+ for (String stateModel : _testModels) {
+ String db = "Test-DB-" + j++;
+ createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel,
PARTITIONS, _replica,
+ _replica, -1);
+ _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica);
+ _allDBs.add(db);
+ }
+ Thread.sleep(300);
+ validate(2);
+
+ // restart the participants within the zone
+ for (int i = 0; i < _participants.size(); i++) {
+ MockParticipantManager p = _participants.get(i);
+ if (_nodeToZoneMap.get(p.getInstanceName()).equals(zone)) {
+ MockParticipantManager newNode =
+ new MockParticipantManager(ZK_ADDR, CLUSTER_NAME,
p.getInstanceName());
+ _participants.set(i, newNode);
+ newNode.syncStart();
+ }
+ }
+
+ Thread.sleep(300);
+ // Verify if the partitions get assigned
+ validate(_replica);
+ }
+
+ @Test(dependsOnMethods = { "testLackEnoughLiveRacks" })
+ public void testLackEnoughRacks() throws Exception {
+ // shutdown participants within one zone
+ String zone = _nodeToZoneMap.values().iterator().next();
+ for (int i = 0; i < _participants.size(); i++) {
+ MockParticipantManager p = _participants.get(i);
+ if (_nodeToZoneMap.get(p.getInstanceName()).equals(zone)) {
+ p.syncStop();
+ _gSetupTool.getClusterManagementTool()
+ .enableInstance(CLUSTER_NAME, p.getInstanceName(), false);
+ Thread.sleep(50);
+ _gSetupTool.dropInstanceFromCluster(CLUSTER_NAME, p.getInstanceName());
+ }
+ }
+
+ int j = 0;
+ for (String stateModel : _testModels) {
+ String db = "Test-DB-" + j++;
+ createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel,
PARTITIONS, _replica,
+ _replica, -1);
+ _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica);
+ _allDBs.add(db);
+ }
+ Thread.sleep(300);
+ validate(2);
+
+ // Create new participants within the zone
+ int nodeCount = _participants.size();
+ for (int i = 0; i < nodeCount; i++) {
+ MockParticipantManager p = _participants.get(i);
+ if (_nodeToZoneMap.get(p.getInstanceName()).equals(zone)) {
+ String replaceNodeName = p.getInstanceName() + "-replacement_" +
START_PORT;
+ addInstanceConfig(replaceNodeName, i, ZONES, TAGS);
+ MockParticipantManager newNode =
+ new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, replaceNodeName);
+ _participants.set(i, newNode);
+ newNode.syncStart();
+ }
+ }
+
+ Thread.sleep(300);
+ // Verify if the partitions get assigned
+ validate(_replica);
+ }
+
+ @Test(dependsOnMethods = { "testZoneIsolation",
"testZoneIsolationWithInstanceTag" })
+ public void testAddZone() throws Exception {
+ int i = 0;
+ for (String stateModel : _testModels) {
+ String db = "Test-DB-" + i++;
+ createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel,
PARTITIONS, _replica,
+ _replica, -1);
+ _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica);
+ _allDBs.add(db);
+ }
+ Thread.sleep(300);
+
+ validate(_replica);
+
+ // Create new participants within the a new zone
+ Set<MockParticipantManager> newNodes = new HashSet<>();
+ Map<String, Integer> newNodeReplicaCount = new HashMap<>();
+
+ ConfigAccessor configAccessor = new ConfigAccessor(_gZkClient);
+
+ try {
+ // Configure the preference so as to allow movements.
+ ClusterConfig clusterConfig =
configAccessor.getClusterConfig(CLUSTER_NAME);
+ Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> preference =
new HashMap<>();
+ preference.put(ClusterConfig.GlobalRebalancePreferenceKey.EVENNESS, 10);
+ preference.put(ClusterConfig.GlobalRebalancePreferenceKey.LESS_MOVEMENT,
0);
+ clusterConfig.setGlobalRebalancePreference(preference);
+ configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig);
+
+ int nodeCount = 2;
+ for (int j = 0; j < nodeCount; j++) {
+ String newNodeName = "new-zone-node-" + j + "_" + START_PORT;
+ // Add all new node to the new zone
+ addInstanceConfig(newNodeName, j, ZONES + 1, TAGS);
+ MockParticipantManager newNode =
+ new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, newNodeName);
+ newNode.syncStart();
+ newNodes.add(newNode);
+ newNodeReplicaCount.put(newNodeName, 0);
+ }
+ Thread.sleep(300);
+
+ validate(_replica);
+
+ // The new zone nodes shall have some assignments
+ for (String db : _allDBs) {
+ IdealState is =
+
_gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
+ ExternalView ev =
+
_gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME,
db);
+ validateZoneAndTagIsolation(is, ev, _replica);
+ for (String partition : ev.getPartitionSet()) {
+ Map<String, String> stateMap = ev.getStateMap(partition);
+ for (String node : stateMap.keySet()) {
+ if (newNodeReplicaCount.containsKey(node)) {
+ newNodeReplicaCount.computeIfPresent(node, (nodeName,
replicaCount) -> replicaCount + 1);
+ }
+ }
+ }
+ }
+ Assert.assertTrue(newNodeReplicaCount.values().stream().allMatch(count
-> count > 0));
+ } finally {
+ // Revert the preference
+ ClusterConfig clusterConfig =
configAccessor.getClusterConfig(CLUSTER_NAME);
+ Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> preference =
new HashMap<>();
+ preference.put(ClusterConfig.GlobalRebalancePreferenceKey.EVENNESS, 1);
+ preference.put(ClusterConfig.GlobalRebalancePreferenceKey.LESS_MOVEMENT,
1);
+ clusterConfig.setGlobalRebalancePreference(preference);
+ configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig);
+ // Stop the new nodes
+ for (MockParticipantManager p : newNodes) {
+ if (p != null && p.isConnected()) {
+ p.syncStop();
+ }
+ }
+ }
+ }
+
+ private void validate(int expectedReplica) {
+ ZkHelixClusterVerifier _clusterVerifier =
+ new
StrictMatchExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR)
+ .setResources(_allDBs).build();
+ Assert.assertTrue(_clusterVerifier.verifyByPolling());
+
+ for (String db : _allDBs) {
+ IdealState is =
+
_gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
+ ExternalView ev =
+
_gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME,
db);
+ validateZoneAndTagIsolation(is, ev, expectedReplica);
+ }
+ }
+
+ /**
+ * Validate instances for each partition is on different zone and with
necessary tagged instances.
+ */
+ private void validateZoneAndTagIsolation(IdealState is, ExternalView ev, int
expectedReplica) {
+ String tag = is.getInstanceGroupTag();
+ for (String partition : is.getPartitionSet()) {
+ Set<String> assignedZones = new HashSet<String>();
+
+ Map<String, String> assignmentMap =
ev.getRecord().getMapField(partition);
+ Set<String> instancesInEV = assignmentMap.keySet();
+ // TODO: preference List is not persisted in IS.
+ // Assert.assertEquals(instancesInEV, instancesInIs);
+ for (String instance : instancesInEV) {
+ assignedZones.add(_nodeToZoneMap.get(instance));
+ if (tag != null) {
+ InstanceConfig config =
+
_gSetupTool.getClusterManagementTool().getInstanceConfig(CLUSTER_NAME,
instance);
+ Assert.assertTrue(config.containsTag(tag));
+ }
+ }
+ Assert.assertEquals(assignedZones.size(), expectedReplica);
+ }
+ }
+
+ @AfterMethod
+ public void afterMethod() throws Exception {
+ for (String db : _allDBs) {
+ _gSetupTool.dropResourceFromCluster(CLUSTER_NAME, db);
+ }
+ _allDBs.clear();
+ // waiting for all DB be dropped.
+ Thread.sleep(100);
+ ZkHelixClusterVerifier _clusterVerifier =
+ new
StrictMatchExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR)
+ .setResources(_allDBs).build();
+ Assert.assertTrue(_clusterVerifier.verifyByPolling());
+ }
+
+ @AfterClass
+ public void afterClass() throws Exception {
+ /*
+ * shutdown order: 1) disconnect the controller 2) disconnect participants
+ */
+ if (_controller != null && _controller.isConnected()) {
+ _controller.syncStop();
+ }
+ for (MockParticipantManager p : _participants) {
+ if (p != null && p.isConnected()) {
+ p.syncStop();
+ }
+ }
+ deleteCluster(CLUSTER_NAME);
+ System.out.println("END " + CLASS_NAME + " at " + new
Date(System.currentTimeMillis()));
+ }
+}
diff --git
a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalanceTopologyAware.java
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalanceTopologyAware.java
new file mode 100644
index 0000000..412fc8c
--- /dev/null
+++
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestWagedRebalanceTopologyAware.java
@@ -0,0 +1,114 @@
+package org.apache.helix.integration.rebalancer.WagedRebalancer;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Date;
+
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.InstanceConfig;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class TestWagedRebalanceTopologyAware extends
TestWagedRebalanceFaultZone {
+ private static final String TOLOPOGY_DEF = "/DOMAIN/ZONE/INSTANCE";
+ private static final String DOMAIN_NAME = "Domain";
+ private static final String FAULT_ZONE = "ZONE";
+
+ protected final String CLASS_NAME = getShortClassName();
+ protected final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + CLASS_NAME;
+
+ @BeforeClass
+ public void beforeClass() throws Exception {
+ System.out.println("START " + CLASS_NAME + " at " + new
Date(System.currentTimeMillis()));
+
+ _gSetupTool.addCluster(CLUSTER_NAME, true);
+
+ ConfigAccessor configAccessor = new ConfigAccessor(_gZkClient);
+ ClusterConfig clusterConfig =
configAccessor.getClusterConfig(CLUSTER_NAME);
+ clusterConfig.setTopology(TOLOPOGY_DEF);
+ clusterConfig.setFaultZoneType(FAULT_ZONE);
+ configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig);
+
+ for (int i = 0; i < NUM_NODE; i++) {
+ String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+ addInstanceConfig(storageNodeName, i, ZONES, TAGS);
+ }
+
+ // start dummy participants
+ for (String node : _nodes) {
+ MockParticipantManager participant = new MockParticipantManager(ZK_ADDR,
CLUSTER_NAME, node);
+ participant.syncStart();
+ _participants.add(participant);
+ }
+
+ // start controller
+ String controllerName = CONTROLLER_PREFIX + "_0";
+ _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME,
controllerName);
+ _controller.syncStart();
+
+ enablePersistBestPossibleAssignment(_gZkClient, CLUSTER_NAME, true);
+ enableTopologyAwareRebalance(_gZkClient, CLUSTER_NAME, true);
+ }
+
+ protected void addInstanceConfig(String storageNodeName, int seqNo, int
zoneCount, int tagCount) {
+ _gSetupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
+ String zone = "zone-" + seqNo % zoneCount;
+ String tag = "tag-" + seqNo % tagCount;
+
+ InstanceConfig config =
+ _gSetupTool.getClusterManagementTool().getInstanceConfig(CLUSTER_NAME,
storageNodeName);
+ config.setDomain(
+ String.format("DOMAIN=%s,ZONE=%s,INSTANCE=%s", DOMAIN_NAME, zone,
storageNodeName));
+ config.addTag(tag);
+ _gSetupTool.getClusterManagementTool().setInstanceConfig(CLUSTER_NAME,
storageNodeName, config);
+
+ _nodeToZoneMap.put(storageNodeName, zone);
+ _nodeToTagMap.put(storageNodeName, tag);
+ _nodes.add(storageNodeName);
+ }
+
+ @Test
+ public void testZoneIsolation() throws Exception {
+ super.testZoneIsolation();
+ }
+
+ @Test
+ public void testZoneIsolationWithInstanceTag() throws Exception {
+ super.testZoneIsolationWithInstanceTag();
+ }
+
+ @Test(dependsOnMethods = { "testZoneIsolation",
"testZoneIsolationWithInstanceTag" })
+ public void testLackEnoughLiveRacks() throws Exception {
+ super.testLackEnoughLiveRacks();
+ }
+
+ @Test(dependsOnMethods = { "testLackEnoughLiveRacks" })
+ public void testLackEnoughRacks() throws Exception {
+ super.testLackEnoughRacks();
+ }
+
+ @Test(dependsOnMethods = { "testZoneIsolation",
"testZoneIsolationWithInstanceTag" })
+ public void testAddZone() throws Exception {
+ super.testAddZone();
+ }
+}