This is an automated email from the ASF dual-hosted git repository. jiajunwang pushed a commit to branch wagedRebalancer2 in repository https://gitbox.apache.org/repos/asf/helix.git
commit 0d677647b75f2617bf61d7dc7627b855e1c11171 Author: Jiajun Wang <[email protected]> AuthorDate: Mon Aug 19 10:49:12 2019 -0700 Add cluster level default instance config. (#413) This config will be applied to the instance when there is no (or empty) capacity configuration in the Instance Config. Also add unit tests. --- .../rebalancer/waged/model/AssignableNode.java | 8 +++- .../java/org/apache/helix/model/ClusterConfig.java | 51 ++++++++++++++++++++ .../org/apache/helix/model/InstanceConfig.java | 13 ++++-- .../rebalancer/waged/model/TestAssignableNode.java | 13 ++++++ .../org/apache/helix/model/TestClusterConfig.java | 54 ++++++++++++++++++++++ 5 files changed, 134 insertions(+), 5 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java index 5fc04d7..e2fd676 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java @@ -88,11 +88,15 @@ public class AssignableNode { Collection<AssignableReplica> existingAssignment) { reset(); - _currentCapacity.putAll(instanceConfig.getInstanceCapacityMap()); + Map<String, Integer> instanceCapacity = instanceConfig.getInstanceCapacityMap(); + if (instanceCapacity.isEmpty()) { + instanceCapacity = clusterConfig.getDefaultInstanceCapacityMap(); + } + _currentCapacity.putAll(instanceCapacity); _faultZone = computeFaultZone(clusterConfig, instanceConfig); _instanceTags = new HashSet<>(instanceConfig.getTags()); _disabledPartitionsMap = instanceConfig.getDisabledPartitionsMap(); - _maxCapacity = instanceConfig.getInstanceCapacityMap(); + _maxCapacity = instanceCapacity; _maxPartition = clusterConfig.getMaxPartitionsPerInstance(); assignNewBatch(existingAssignment); diff --git 
a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java index ee942c7..a8c1da9 100644 --- a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java +++ b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java @@ -29,10 +29,12 @@ import org.apache.helix.api.config.StateTransitionThrottleConfig; import org.apache.helix.api.config.StateTransitionTimeoutConfig; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; /** * Cluster configurations @@ -86,6 +88,8 @@ public class ClusterConfig extends HelixProperty { // The required instance capacity keys for resource partition assignment calculation. INSTANCE_CAPACITY_KEYS, + // The default instance capacity if no capacity is configured in the Instance Config node. + DEFAULT_INSTANCE_CAPACITY_MAP, // The preference of the rebalance result. // EVENNESS - Evenness of the resource utilization, partition, and top state distribution. // LESS_MOVEMENT - the tendency of keeping the current assignment instead of moving the partition for optimal assignment. @@ -700,6 +704,53 @@ public class ClusterConfig extends HelixProperty { } /** + * Get the default instance capacity information from the map fields. Each stored value is parsed with Integer.parseInt, so a non-integer value results in a NumberFormatException. + * @return the capacity map if it is configured, or an empty (immutable) map otherwise; callers should not modify the returned map + */ + public Map<String, Integer> getDefaultInstanceCapacityMap() { + Map<String, String> capacityData = + _record.getMapField(ClusterConfigProperty.DEFAULT_INSTANCE_CAPACITY_MAP.name()); + + if (capacityData != null) { + return capacityData.entrySet().stream().collect( + Collectors.toMap(entry -> entry.getKey(), entry -> Integer.parseInt(entry.getValue()))); + } + return Collections.emptyMap(); + } + + /** + * Set the default instance capacity information with an Integer mapping.
+ * This information is required by the global rebalancer. If the instance capacity is configured in + * neither the Instance Config nor the Cluster Config, the cluster topology is considered invalid, + * so the rebalancer may stop working. + * + * @param capacityDataMap - map of instance capacity data; values must be non-null and non-negative + * @throws IllegalArgumentException - when the map is null or empty, or when any of the data values is negative + * @see <a href="https://github.com/apache/helix/wiki/Design-Proposal---Weight-Aware-Globally-Even-Distribute-Rebalancer#rebalance-algorithm-adapter"> + * Rebalance Algorithm + * </a> + */ public void setDefaultInstanceCapacityMap(Map<String, Integer> capacityDataMap) + throws IllegalArgumentException { + if (capacityDataMap == null || capacityDataMap.size() == 0) { + throw new IllegalArgumentException("Default Instance Capacity Data is empty"); + } + + Map<String, String> capacityData = new HashMap<>(); + + capacityDataMap.entrySet().stream().forEach(entry -> { + if (entry.getValue() < 0) { + throw new IllegalArgumentException(String + .format("Default Instance Capacity Data contains a negative value: %s = %d", + entry.getKey(), entry.getValue())); + } + capacityData.put(entry.getKey(), Integer.toString(entry.getValue())); + }); + + _record.setMapField(ClusterConfigProperty.DEFAULT_INSTANCE_CAPACITY_MAP.name(), capacityData); + } + + /** * Set the global rebalancer's assignment preference. * @param preference A map of the GlobalRebalancePreferenceKey and the corresponding weight. * The ratio of the configured weights will determine the rebalancer's behavior.
diff --git a/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java b/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java index 88fd1dd..ac1814d 100644 --- a/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java +++ b/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java @@ -507,8 +507,7 @@ public class InstanceConfig extends HelixProperty { } /** - * Get the instance capacity information from the map fields - * + * Get the instance capacity information from the map fields. * @return data map if it exists, or empty map */ public Map<String, Integer> getInstanceCapacityMap() { @@ -523,9 +522,17 @@ public class InstanceConfig extends HelixProperty { } /** - * Set the instance capacity information with an Integer mapping + * Set the instance capacity information with an Integer mapping. + * This information is required by the global rebalancer. If the instance capacity is configured in + * neither the Instance Config nor the Cluster Config, the cluster topology is considered invalid, + * so the rebalancer may stop working. Note that when a rebalancer requires this capacity + * information, it will ignore INSTANCE_WEIGHT. + * * @param capacityDataMap - map of instance capacity data * @throws IllegalArgumentException - when any of the data value is a negative number or when the map is empty + * @see <a href="https://github.com/apache/helix/wiki/Design-Proposal---Weight-Aware-Globally-Even-Distribute-Rebalancer#rebalance-algorithm-adapter"> + * Rebalance Algorithm + * </a>
*/ public void setInstanceCapacityMap(Map<String, Integer> capacityDataMap) throws IllegalArgumentException { diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableNode.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableNode.java index d7fcce9..f55d0fc 100644 --- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableNode.java +++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableNode.java @@ -200,4 +200,17 @@ public class TestAssignableNode extends AbstractTestClusterModel { Assert.assertEquals(assignableNode.getFaultZone(), "2/testInstance/"); } + + @Test + public void testDefaultInstanceCapacity() { + ClusterConfig testClusterConfig = new ClusterConfig("testClusterConfigId"); + testClusterConfig.setDefaultInstanceCapacityMap(_capacityDataMap); + + InstanceConfig testInstanceConfig = new InstanceConfig("testInstanceConfigId"); + + AssignableNode assignableNode = + new AssignableNode(testClusterConfig, testInstanceConfig, _testInstanceId, + Collections.emptyList()); + Assert.assertEquals(assignableNode.getMaxCapacity(), _capacityDataMap); + } } diff --git a/helix-core/src/test/java/org/apache/helix/model/TestClusterConfig.java b/helix-core/src/test/java/org/apache/helix/model/TestClusterConfig.java index 209b196..5cf9bff 100644 --- a/helix-core/src/test/java/org/apache/helix/model/TestClusterConfig.java +++ b/helix-core/src/test/java/org/apache/helix/model/TestClusterConfig.java @@ -20,6 +20,8 @@ package org.apache.helix.model; */ import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import org.apache.helix.ZNRecord; import org.testng.Assert; import org.testng.annotations.Test; @@ -127,4 +129,56 @@ public class TestClusterConfig { ClusterConfig testConfig = new ClusterConfig("testId"); testConfig.setGlobalRebalancePreference(preference); } + + @Test
public void testGetInstanceCapacityMap() { + Map<String, Integer> capacityDataMap = ImmutableMap.of("item1", 1, "item2", 2, "item3", 3); + + Map<String, String> capacityDataMapString = + ImmutableMap.of("item1", "1", "item2", "2", "item3", "3"); + + ZNRecord rec = new ZNRecord("testId"); + rec.setMapField(ClusterConfig.ClusterConfigProperty.DEFAULT_INSTANCE_CAPACITY_MAP.name(), + capacityDataMapString); + ClusterConfig testConfig = new ClusterConfig(rec); + + Assert.assertEquals(testConfig.getDefaultInstanceCapacityMap(), capacityDataMap); + } + + @Test + public void testGetInstanceCapacityMapEmpty() { + ClusterConfig testConfig = new ClusterConfig("testId"); + + Assert.assertEquals(testConfig.getDefaultInstanceCapacityMap(), Collections.emptyMap()); + } + + @Test + public void testSetInstanceCapacityMap() { + Map<String, Integer> capacityDataMap = ImmutableMap.of("item1", 1, "item2", 2, "item3", 3); + + Map<String, String> capacityDataMapString = + ImmutableMap.of("item1", "1", "item2", "2", "item3", "3"); + + ClusterConfig testConfig = new ClusterConfig("testConfig"); + testConfig.setDefaultInstanceCapacityMap(capacityDataMap); + + Assert.assertEquals(testConfig.getRecord().getMapField(ClusterConfig.ClusterConfigProperty.
+ DEFAULT_INSTANCE_CAPACITY_MAP.name()), capacityDataMapString); + } + + @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Default Instance Capacity Data is empty") + public void testSetInstanceCapacityMapEmpty() { + Map<String, Integer> capacityDataMap = new HashMap<>(); + + ClusterConfig testConfig = new ClusterConfig("testConfig"); + testConfig.setDefaultInstanceCapacityMap(capacityDataMap); + } + + @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Default Instance Capacity Data contains a negative value: item3 = -3") + public void testSetInstanceCapacityMapInvalid() { + Map<String, Integer> capacityDataMap = ImmutableMap.of("item1", 1, "item2", 2, "item3", -3); + + ClusterConfig testConfig = new ClusterConfig("testConfig"); + testConfig.setDefaultInstanceCapacityMap(capacityDataMap); + } }
