This is an automated email from the ASF dual-hosted git repository.

jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new 2deef9d92 Change partitionAssignment API to handle ANY_LIVEINSTANCE (#2817)
2deef9d92 is described below

commit 2deef9d921dd12cde26c30fa2c7297c41c168ad2
Author: Grant Paláu Spencer <gspen...@linkedin.com>
AuthorDate: Wed Jul 17 16:31:50 2024 -0700

    Change partitionAssignment API to handle ANY_LIVEINSTANCE (#2817)
    
    Handle ANY_LIVEINSTANCE by calling getReplicaCount
---
 .../helix/controller/stages/ClusterDataCache.java  | 21 +++++---------------
 .../org/apache/helix/manager/zk/ZKHelixAdmin.java  |  6 +++++-
 .../java/org/apache/helix/model/IdealState.java    |  5 +++--
 .../main/java/org/apache/helix/util/HelixUtil.java |  2 +-
 .../rest/server/TestPartitionAssignmentAPI.java    | 23 ++++++++++++++++++++++
 5 files changed, 37 insertions(+), 20 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
index cc0dd6629..a11cf2b07 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
@@ -159,22 +159,11 @@ public class ClusterDataCache extends ResourceControllerDataProvider {
     Map<String, IdealState> idealStateMap = 
_idealStateCache.getIdealStateMap();
 
     if (idealStateMap.containsKey(resourceName)) {
-      String replicasStr = idealStateMap.get(resourceName).getReplicas();
-
-      if (replicasStr != null) {
-        if 
(replicasStr.equals(IdealState.IdealStateConstants.ANY_LIVEINSTANCE.toString()))
 {
-          replicas = _liveInstanceMap.size();
-        } else {
-          try {
-            replicas = Integer.parseInt(replicasStr);
-          } catch (Exception e) {
-            LogUtil.logError(LOG, _eventId, "invalid replicas string: " + 
replicasStr + " for "
-                + (_isTaskCache ? "TASK" : "DEFAULT") + "pipeline");
-          }
-        }
-      } else {
-        LogUtil.logError(LOG, _eventId, "idealState for resource: " + 
resourceName
-            + " does NOT have replicas for " + (_isTaskCache ? "TASK" : 
"DEFAULT") + "pipeline");
+      int replicasStr = 
idealStateMap.get(resourceName).getReplicaCount(_liveInstanceMap.size());
+
+      if (replicasStr == 0) {
+        LogUtil.logError(LOG, _eventId,
+            "idealState for resource: " + resourceName + " does NOT have 
replicas for " + (_isTaskCache ? "TASK" : "DEFAULT") + "pipeline");
       }
     }
     return replicas;
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
index 39ae9ae67..36c9a6a5e 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
@@ -2484,7 +2484,11 @@ public class ZKHelixAdmin implements HelixAdmin {
     setResourceIdealState(clusterName, idealState.getResourceName(), 
idealState);
 
     // 4. rebalance the resource
-    rebalance(clusterName, idealState.getResourceName(), 
Integer.parseInt(idealState.getReplicas()),
+    HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, 
_baseDataAccessor);
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+    List<String> liveNodes = 
accessor.getChildNames(keyBuilder.liveInstances());
+
+    rebalance(clusterName, idealState.getResourceName(), 
idealState.getReplicaCount(liveNodes.size()),
         idealState.getResourceName(), idealState.getInstanceGroupTag());
 
     return true;
diff --git a/helix-core/src/main/java/org/apache/helix/model/IdealState.java b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
index aafcca89b..44a52d09e 100644
--- a/helix-core/src/main/java/org/apache/helix/model/IdealState.java
+++ b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
@@ -554,8 +554,9 @@ public class IdealState extends HelixProperty {
   }
 
   /**
-   * Get the number of replicas for each partition of this resource
-   * @return number of replicas (as a string)
+   * Get the number of replicas for each partition of this resource. Return 
value can be "ANY_LIVEINSTANCE", use
+   * {@link #getReplicaCount(int)} to prevent NumberFormatException when 
parsing string for int. 
+   * @return String value of the replica count,
    */
   public String getReplicas() {
     // HACK: if replica doesn't exists, use the length of the first list field
diff --git a/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java b/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java
index 834b84678..70269a746 100644
--- a/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java
@@ -394,7 +394,7 @@ public final class HelixUtil {
         RebalanceStrategy.class.cast(loadClass(HelixUtil.class, 
strategyClassName).newInstance());
 
     strategy.init(idealState.getResourceName(), partitions, 
stateModelDefinition
-            .getStateCountMap(liveInstances.size(), 
Integer.parseInt(idealState.getReplicas())),
+            .getStateCountMap(liveInstances.size(), 
idealState.getReplicaCount(liveInstances.size())),
         idealState.getMaxPartitionsPerInstance());
 
     // Remove all disabled instances so that Helix will not consider them live.
diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/TestPartitionAssignmentAPI.java b/helix-rest/src/test/java/org/apache/helix/rest/server/TestPartitionAssignmentAPI.java
index e00c392b0..f3a4bbdd9 100644
--- a/helix-rest/src/test/java/org/apache/helix/rest/server/TestPartitionAssignmentAPI.java
+++ b/helix-rest/src/test/java/org/apache/helix/rest/server/TestPartitionAssignmentAPI.java
@@ -38,6 +38,7 @@ import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.TestHelper;
 import org.apache.helix.constants.InstanceConstants;
 import org.apache.helix.controller.rebalancer.DelayedAutoRebalancer;
+import org.apache.helix.controller.rebalancer.strategy.AutoRebalanceStrategy;
 import 
org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy;
 import org.apache.helix.controller.rebalancer.waged.WagedRebalancer;
 import org.apache.helix.integration.manager.ClusterControllerManager;
@@ -379,6 +380,26 @@ public class TestPartitionAssignmentAPI extends AbstractTestClass {
     Assert.assertTrue(_clusterVerifier.verifyByPolling());
   }
 
+  private void createAutoRebalanceResource(String db) {
+    _gSetupTool.addResourceToCluster(CLUSTER_NAME, db, 1, "LeaderStandby",
+        IdealState.RebalanceMode.FULL_AUTO + "", null);
+    _resources.add(db);
+
+    IdealState idealState =
+        
_gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
+
+    idealState.setRebalancerClassName(DelayedAutoRebalancer.class.getName());
+    idealState.setRebalanceStrategy(AutoRebalanceStrategy.class.getName());
+    idealState.setReplicas("ANY_LIVEINSTANCE");
+    idealState.enable(true);
+    _gSetupTool.getClusterManagementTool().setResourceIdealState(CLUSTER_NAME, 
db, idealState);
+
+    ResourceConfig resourceConfig = new ResourceConfig(db);
+    _configAccessor.setResourceConfig(CLUSTER_NAME, db, resourceConfig);
+
+    Assert.assertTrue(_clusterVerifier.verifyByPolling());
+  }
+
   @Test
   private void testComputePartitionAssignmentMaintenanceMode() throws 
Exception {
 
@@ -399,6 +420,8 @@ public class TestPartitionAssignmentAPI extends AbstractTestClass {
           MIN_ACTIVE_REPLICA, 100000L);
     }
 
+    createAutoRebalanceResource("TEST_AUTOREBALANCE_DB_0");
+
     // Wait for cluster to converge after adding resources
     Assert.assertTrue(_clusterVerifier.verifyByPolling());
 

Reply via email to