This is an automated email from the ASF dual-hosted git repository.

jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push: new 2deef9d92 Change partitionAssignment API to handle ANY_LIVEINSTANCE (#2817) 2deef9d92 is described below commit 2deef9d921dd12cde26c30fa2c7297c41c168ad2 Author: Grant Paláu Spencer <gspen...@linkedin.com> AuthorDate: Wed Jul 17 16:31:50 2024 -0700 Change partitionAssignment API to handle ANY_LIVEINSTANCE (#2817) Handle ANY_LIVEINSTANCE by calling getReplicaCount --- .../helix/controller/stages/ClusterDataCache.java | 21 +++++--------------- .../org/apache/helix/manager/zk/ZKHelixAdmin.java | 6 +++++- .../java/org/apache/helix/model/IdealState.java | 5 +++-- .../main/java/org/apache/helix/util/HelixUtil.java | 2 +- .../rest/server/TestPartitionAssignmentAPI.java | 23 ++++++++++++++++++++++ 5 files changed, 37 insertions(+), 20 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java index cc0dd6629..a11cf2b07 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java @@ -159,22 +159,11 @@ public class ClusterDataCache extends ResourceControllerDataProvider { Map<String, IdealState> idealStateMap = _idealStateCache.getIdealStateMap(); if (idealStateMap.containsKey(resourceName)) { - String replicasStr = idealStateMap.get(resourceName).getReplicas(); - - if (replicasStr != null) { - if (replicasStr.equals(IdealState.IdealStateConstants.ANY_LIVEINSTANCE.toString())) { - replicas = _liveInstanceMap.size(); - } else { - try { - replicas = Integer.parseInt(replicasStr); - } catch (Exception e) { - LogUtil.logError(LOG, _eventId, "invalid replicas string: " + replicasStr + " for " - + (_isTaskCache ? 
"TASK" : "DEFAULT") + "pipeline"); - } - } - } else { - LogUtil.logError(LOG, _eventId, "idealState for resource: " + resourceName - + " does NOT have replicas for " + (_isTaskCache ? "TASK" : "DEFAULT") + "pipeline"); + int replicasStr = idealStateMap.get(resourceName).getReplicaCount(_liveInstanceMap.size()); + + if (replicasStr == 0) { + LogUtil.logError(LOG, _eventId, + "idealState for resource: " + resourceName + " does NOT have replicas for " + (_isTaskCache ? "TASK" : "DEFAULT") + "pipeline"); } } return replicas; diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java index 39ae9ae67..36c9a6a5e 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java @@ -2484,7 +2484,11 @@ public class ZKHelixAdmin implements HelixAdmin { setResourceIdealState(clusterName, idealState.getResourceName(), idealState); // 4. 
rebalance the resource - rebalance(clusterName, idealState.getResourceName(), Integer.parseInt(idealState.getReplicas()), + HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, _baseDataAccessor); + PropertyKey.Builder keyBuilder = accessor.keyBuilder(); + List<String> liveNodes = accessor.getChildNames(keyBuilder.liveInstances()); + + rebalance(clusterName, idealState.getResourceName(), idealState.getReplicaCount(liveNodes.size()), idealState.getResourceName(), idealState.getInstanceGroupTag()); return true; diff --git a/helix-core/src/main/java/org/apache/helix/model/IdealState.java b/helix-core/src/main/java/org/apache/helix/model/IdealState.java index aafcca89b..44a52d09e 100644 --- a/helix-core/src/main/java/org/apache/helix/model/IdealState.java +++ b/helix-core/src/main/java/org/apache/helix/model/IdealState.java @@ -554,8 +554,9 @@ public class IdealState extends HelixProperty { } /** - * Get the number of replicas for each partition of this resource - * @return number of replicas (as a string) + * Get the number of replicas for each partition of this resource. Return value can be "ANY_LIVEINSTANCE", use + * {@link #getReplicaCount(int)} to prevent NumberFormatException when parsing string for int. 
+ * @return String value of the replica count, */ public String getReplicas() { // HACK: if replica doesn't exists, use the length of the first list field diff --git a/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java b/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java index 834b84678..70269a746 100644 --- a/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java +++ b/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java @@ -394,7 +394,7 @@ public final class HelixUtil { RebalanceStrategy.class.cast(loadClass(HelixUtil.class, strategyClassName).newInstance()); strategy.init(idealState.getResourceName(), partitions, stateModelDefinition - .getStateCountMap(liveInstances.size(), Integer.parseInt(idealState.getReplicas())), + .getStateCountMap(liveInstances.size(), idealState.getReplicaCount(liveInstances.size())), idealState.getMaxPartitionsPerInstance()); // Remove all disabled instances so that Helix will not consider them live. diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/TestPartitionAssignmentAPI.java b/helix-rest/src/test/java/org/apache/helix/rest/server/TestPartitionAssignmentAPI.java index e00c392b0..f3a4bbdd9 100644 --- a/helix-rest/src/test/java/org/apache/helix/rest/server/TestPartitionAssignmentAPI.java +++ b/helix-rest/src/test/java/org/apache/helix/rest/server/TestPartitionAssignmentAPI.java @@ -38,6 +38,7 @@ import org.apache.helix.HelixDataAccessor; import org.apache.helix.TestHelper; import org.apache.helix.constants.InstanceConstants; import org.apache.helix.controller.rebalancer.DelayedAutoRebalancer; +import org.apache.helix.controller.rebalancer.strategy.AutoRebalanceStrategy; import org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy; import org.apache.helix.controller.rebalancer.waged.WagedRebalancer; import org.apache.helix.integration.manager.ClusterControllerManager; @@ -379,6 +380,26 @@ public class TestPartitionAssignmentAPI extends AbstractTestClass { 
Assert.assertTrue(_clusterVerifier.verifyByPolling()); } + private void createAutoRebalanceResource(String db) { + _gSetupTool.addResourceToCluster(CLUSTER_NAME, db, 1, "LeaderStandby", + IdealState.RebalanceMode.FULL_AUTO + "", null); + _resources.add(db); + + IdealState idealState = + _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db); + + idealState.setRebalancerClassName(DelayedAutoRebalancer.class.getName()); + idealState.setRebalanceStrategy(AutoRebalanceStrategy.class.getName()); + idealState.setReplicas("ANY_LIVEINSTANCE"); + idealState.enable(true); + _gSetupTool.getClusterManagementTool().setResourceIdealState(CLUSTER_NAME, db, idealState); + + ResourceConfig resourceConfig = new ResourceConfig(db); + _configAccessor.setResourceConfig(CLUSTER_NAME, db, resourceConfig); + + Assert.assertTrue(_clusterVerifier.verifyByPolling()); + } + @Test private void testComputePartitionAssignmentMaintenanceMode() throws Exception { @@ -399,6 +420,8 @@ public class TestPartitionAssignmentAPI extends AbstractTestClass { MIN_ACTIVE_REPLICA, 100000L); } + createAutoRebalanceResource("TEST_AUTOREBALANCE_DB_0"); + // Wait for cluster to converge after adding resources Assert.assertTrue(_clusterVerifier.verifyByPolling());