This is an automated email from the ASF dual-hosted git repository.

xyuanlu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new b02291e86 [Message Prioritization] - Add currentReplicaNumber metadata 
for message prioritization by client (#3043)
b02291e86 is described below

commit b02291e8653fd39ea9956231147fa384ed415895
Author: Charanya Sudharsanan <[email protected]>
AuthorDate: Mon Jul 14 11:24:13 2025 -0700

    [Message Prioritization] - Add currentReplicaNumber metadata for message 
prioritization by client (#3043)
    
    Add currentReplicaNumber metadata for message prioritization by client
---
 .../controller/stages/MessageGenerationPhase.java  | 117 +++-
 .../main/java/org/apache/helix/model/Message.java  |  44 +-
 .../apache/helix/model/StateModelDefinition.java   |  19 +
 .../java/org/apache/helix/util/MessageUtil.java    |  94 +++-
 .../TestPrioritizationMessageGeneration.java       | 594 +++++++++++++++++++++
 5 files changed, 826 insertions(+), 42 deletions(-)

diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
 
b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
index 859c6679e..0868ac6d6 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
@@ -26,6 +26,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
+import java.util.stream.Collectors;
 
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixDefinedState;
@@ -57,17 +58,18 @@ import org.slf4j.LoggerFactory;
  * Compares the currentState, pendingState with IdealState and generate 
messages
  */
 public class MessageGenerationPhase extends AbstractBaseStage {
-  private final static String NO_DESIRED_STATE = "NoDesiredState";
+  private static final String NO_DESIRED_STATE = "NoDesiredState";
 
   // If we see there is any invalid pending message leaving on host, i.e. 
message
   // tells participant to change from SLAVE to MASTER, and the participant is 
already
   // at MASTER state, we wait for timeout and if the message is still not 
cleaned up by
   // participant, controller will cleanup them proactively to unblock further 
state
   // transition
-  public final static long DEFAULT_OBSELETE_MSG_PURGE_DELAY = HelixUtil
+  public static final long DEFAULT_OBSELETE_MSG_PURGE_DELAY = HelixUtil
       
.getSystemPropertyAsLong(SystemPropertyKeys.CONTROLLER_MESSAGE_PURGE_DELAY, 60 
* 1000);
-  private final static String PENDING_MESSAGE = "pending message";
-  private final static String STALE_MESSAGE = "stale message";
+  private static final String PENDING_MESSAGE = "pending message";
+  private static final String STALE_MESSAGE = "stale message";
+  private static final String OFFLINE = "OFFLINE";
 
   private static Logger logger = 
LoggerFactory.getLogger(MessageGenerationPhase.class);
 
@@ -163,6 +165,18 @@ public class MessageGenerationPhase extends 
AbstractBaseStage {
       // desired-state->list of generated-messages
       Map<String, List<Message>> messageMap = new HashMap<>();
 
+      /*
+       * Calculate the current active replica count based on state model type.
+       * This represents the number of replicas currently serving traffic for 
this partition
+       * Active replicas include: top states, secondary top states(excluding 
OFFLINE) and ERROR
+       * states.
+       * Active replicas exclude: OFFLINE and DROPPED states.
+       * All qualifying state transitions for this partition will receive this 
same value,
+       * allowing clients to understand the current availability level and 
prioritize accordingly.
+       */
+      int currentActiveReplicaCount =
+          calculateCurrentActiveReplicaCount(currentStateMap, stateModelDef);
+
       for (String instanceName : instanceStateMap.keySet()) {
 
         Set<Message> staleMessages = 
cache.getStaleMessagesByInstance(instanceName);
@@ -250,17 +264,39 @@ public class MessageGenerationPhase extends 
AbstractBaseStage {
                     pendingMessage, manager, resource, partition, 
sessionIdMap, instanceName,
                     stateModelDef, cancellationMessage, isCancellationEnabled);
           } else {
+            // Set currentActiveReplicaNumber to provide metadata for 
potential message
+            // prioritization by participant.
+            // Assign the current active replica count to all qualifying 
upward transitions for this
+            // partition.
+            // This ensures consistent prioritization metadata across 
concurrent state transitions.
+            // -1 indicates no prioritization metadata, for eg:Downward ST 
messages get a -1.
+            int currentActiveReplicaNumber = -1;
+
+            /*
+             * Assign currentActiveReplicaNumber for qualifying upward state 
transitions.
+             * Criteria for assignment:
+             * - Must be an upward state transition according to state model
+             * - Target state must be considered active (according to state 
model type)
+             */
+            if (stateModelDef.isUpwardStateTransition(currentState, nextState)
+                && isStateActive(nextState, stateModelDef)) {
+
+              // All qualifying transitions for this partition get the same
+              // currentActiveReplicaNumber
+              currentActiveReplicaNumber = currentActiveReplicaCount;
+            }
+
             // Create new state transition message
-            message = MessageUtil
-                .createStateTransitionMessage(manager.getInstanceName(), 
manager.getSessionId(),
-                    resource, partition.getPartitionName(), instanceName, 
currentState, nextState,
-                    sessionIdMap.get(instanceName), stateModelDef.getId());
+            message = 
MessageUtil.createStateTransitionMessage(manager.getInstanceName(),
+                manager.getSessionId(), resource, 
partition.getPartitionName(), instanceName,
+                currentState, nextState, sessionIdMap.get(instanceName), 
stateModelDef.getId(),
+                currentActiveReplicaNumber);
 
             if (logger.isDebugEnabled()) {
               LogUtil.logDebug(logger, _eventId, String.format(
-                  "Resource %s partition %s for instance %s with currentState 
%s and nextState %s",
+                  "Resource %s partition %s for instance %s with currentState 
%s, nextState %s and currentActiveReplicaNumber %d",
                   resource.getResourceName(), partition.getPartitionName(), 
instanceName,
-                  currentState, nextState));
+                  currentState, nextState, currentActiveReplicaNumber));
             }
           }
         }
@@ -290,7 +326,66 @@ public class MessageGenerationPhase extends 
AbstractBaseStage {
     } // end of for-each-partition
   }
 
-  private boolean shouldCreateSTCancellation(Message pendingMessage, String 
desiredState,
+  /**
+   * Calculate the current active replica count based on state model type.
+   * The count includes replicas in top states, secondary top states 
(excluding OFFLINE),
+   * and ERROR states since helix considers them active.Count excludes OFFLINE 
and DROPPED states.
+   * @param currentStateMap
+   * @param stateModelDef
+   * @return The number of replicas currently in active states, used to 
determine the
+   *         currentActiveReplicaNumber for the partition.
+   */
+  private int calculateCurrentActiveReplicaCount(Map<String, String> 
currentStateMap,
+      StateModelDefinition stateModelDef) {
+    return (int) currentStateMap.values().stream()
+        .filter(state -> stateModelDef.getTopState().contains(state) // Top 
states (MASTER, ONLINE,
+                                                                     // LEADER)
+            || getActiveSecondaryTopStates(stateModelDef).contains(state) // 
Active secondary states
+                                                                          // 
(SLAVE, STANDBY,
+                                                                          // 
BOOTSTRAP)
+            || HelixDefinedState.ERROR.name().equals(state) // ERROR states 
(still considered
+                                                            // active)
+        // DROPPED and OFFLINE are automatically excluded by 
getActiveSecondaryTopStates()
+        ).count();
+  }
+
+  /**
+   * Get active secondary top states - states that are not non-serving states 
like OFFLINE and
+   * DROPPED.
+   * Reasons for elimination:
+   * - getSecondTopStates() can include OFFLINE as a secondary top state in 
some state models.
+   * Example - OnlineOffline:
+   * getSecondTopStates() = ["OFFLINE"] as it transitions to ONLINE.
+   * After filtering: activeSecondaryTopStates=[] (removes "OFFLINE" as it's 
not a serving state).
+   * @param stateModelDef
+   */
+  private List<String> getActiveSecondaryTopStates(StateModelDefinition 
stateModelDef) {
+    return stateModelDef.getSecondTopStates().stream()
+        // Remove non-serving states
+        .filter(state -> !OFFLINE.equals(state) && 
!HelixDefinedState.DROPPED.name().equals(state))
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * Determines if the given state is considered active based on the state 
model type.
+   * Active states include: top states, active secondary top states (excluding 
OFFLINE),
+   * and ERROR states. Active states exclude OFFLINE and DROPPED states.
+   * ERROR state replicas are always considered active in HELIX as they do not
+   * affect availability.
+   * @param state
+   * @param stateModelDef
+   * @return true if the state is considered active, false otherwise
+   */
+  private boolean isStateActive(String state, StateModelDefinition 
stateModelDef) {
+    // ERROR state is always considered active regardless of state model type
+    if (HelixDefinedState.ERROR.name().equals(state)) {
+      return true;
+    }
+    return stateModelDef.getTopState().contains(state)
+        || getActiveSecondaryTopStates(stateModelDef).contains(state);
+  }
+
+    private boolean shouldCreateSTCancellation(Message pendingMessage, String 
desiredState,
       String initialState) {
     if (pendingMessage == null) {
       return false;
diff --git a/helix-core/src/main/java/org/apache/helix/model/Message.java 
b/helix-core/src/main/java/org/apache/helix/model/Message.java
index acf075821..68751de40 100644
--- a/helix-core/src/main/java/org/apache/helix/model/Message.java
+++ b/helix-core/src/main/java/org/apache/helix/model/Message.java
@@ -104,7 +104,8 @@ public class Message extends HelixProperty {
     RELAY_FROM,
     EXPIRY_PERIOD,
     SRC_CLUSTER,
-    ST_REBALANCE_TYPE
+    ST_REBALANCE_TYPE,
+    CURRENT_ACTIVE_REPLICA_NUMBER
   }
 
   /**
@@ -137,12 +138,8 @@ public class Message extends HelixProperty {
   /**
    * Compares the creation time of two Messages
    */
-  public static final Comparator<Message> CREATE_TIME_COMPARATOR = new 
Comparator<Message>() {
-    @Override
-    public int compare(Message m1, Message m2) {
-      return new Long(m1.getCreateTimeStamp()).compareTo(new 
Long(m2.getCreateTimeStamp()));
-    }
-  };
+  public static final Comparator<Message> CREATE_TIME_COMPARATOR =
+      (m1, m2) -> Long.compare(m2.getCreateTimeStamp(), 
m1.getCreateTimeStamp());
 
   /**
    * Instantiate a message
@@ -935,6 +932,39 @@ public class Message extends HelixProperty {
     _record.setSimpleField(Attributes.SRC_CLUSTER.name(), clusterName);
   }
 
+  /**
+   * Set current active replica count for participant-side message 
prioritization.
+   * This field indicates the number of replicas currently in active states 
(including ERROR states)
+   * for this partition at the time the state transition message is generated.
+   * Active states include top states, secondary top states (for single-top 
state models),
+   * and ERROR states.
+   * This metadata enables participants to prioritize recovery scenarios (low 
active counts)
+   * over load balancing scenarios (high active counts) in custom thread pools 
or message handlers.
+   * For example, 2→3 transitions get higher priority than 3→4 transitions.
+   * Default value is -1 for transitions that don't require prioritization 
metadata.(for eg :
+   * downward transitions).
+   * @param currentActiveReplicaNumber the number of currently active replicas 
(-1 when there is no
+   *          prioritization metadata,
+   *          >=0 for transitions containing current availability level)
+   */
+  public void setCurrentActiveReplicaNumber(int currentActiveReplicaNumber) {
+    _record.setIntField(Attributes.CURRENT_ACTIVE_REPLICA_NUMBER.name(),
+        currentActiveReplicaNumber);
+  }
+
+  /**
+   * Get the current active replica count for this partition at message 
generation time.
+   * This value represents the number of replicas in active states (including 
ERROR states) before
+   * any state transitions occur, enabling participant-side message 
prioritization based on
+   * current availability levels.
+   * @return current active replica count, or -1 for cases where we don't 
provide metadata for
+   *         prioritization like downward state transitions.
+   */
+
+  public int getCurrentActiveReplicaNumber() {
+    return 
_record.getIntField(Attributes.CURRENT_ACTIVE_REPLICA_NUMBER.name(), -1);
+  }
+
   /**
    * Check if this message is targetted for a controller
    * @return true if this is a controller message, false otherwise
diff --git 
a/helix-core/src/main/java/org/apache/helix/model/StateModelDefinition.java 
b/helix-core/src/main/java/org/apache/helix/model/StateModelDefinition.java
index fcf24fb30..e79d58938 100644
--- a/helix-core/src/main/java/org/apache/helix/model/StateModelDefinition.java
+++ b/helix-core/src/main/java/org/apache/helix/model/StateModelDefinition.java
@@ -505,4 +505,23 @@ public class StateModelDefinition extends HelixProperty {
     }
     return stateCounts;
   }
+
+  /**
+   * Check if a state transition is upward
+   * @param fromState source state
+   * @param toState destination state
+   * @return True if it's an upward state transition, false otherwise
+   */
+  public boolean isUpwardStateTransition(String fromState, String toState) {
+    Map<String, Integer> statePriorityMap = getStatePriorityMap();
+
+    Integer fromStateWeight = statePriorityMap.get(fromState);
+    Integer toStateWeight = statePriorityMap.get(toState);
+
+    if (fromStateWeight == null || toStateWeight == null) {
+      return false;
+    }
+
+    return toStateWeight < fromStateWeight;
+  }
 }
diff --git a/helix-core/src/main/java/org/apache/helix/util/MessageUtil.java 
b/helix-core/src/main/java/org/apache/helix/util/MessageUtil.java
index 94de8331b..4a93050dc 100644
--- a/helix-core/src/main/java/org/apache/helix/util/MessageUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/util/MessageUtil.java
@@ -48,7 +48,7 @@ public class MessageUtil {
           toState);
 
       Message message =
-          
createStateTransitionMessage(Message.MessageType.STATE_TRANSITION_CANCELLATION,
+          
createBasicStateTransitionMessage(Message.MessageType.STATE_TRANSITION_CANCELLATION,
               srcInstanceName, srcSessionId, resource, partitionName, 
instanceName, currentState,
               nextState, sessionId, stateModelDefName);
 
@@ -60,28 +60,6 @@ public class MessageUtil {
     return null;
   }
 
-  public static Message createStateTransitionMessage(String srcInstanceName, 
String srcSessionId,
-      Resource resource, String partitionName, String instanceName, String 
currentState,
-      String nextState, String tgtSessionId, String stateModelDefName) {
-    Message message =
-        createStateTransitionMessage(Message.MessageType.STATE_TRANSITION, 
srcInstanceName,
-            srcSessionId, resource, partitionName, instanceName, currentState, 
nextState, tgtSessionId,
-            stateModelDefName);
-
-    // Set the retry count for state transition messages.
-    // TODO: make the retry count configurable in ClusterConfig or IdealState
-    message.setRetryCount(DEFAULT_STATE_TRANSITION_MESSAGE_RETRY_COUNT);
-
-    if (resource.getResourceGroupName() != null) {
-      message.setResourceGroupName(resource.getResourceGroupName());
-    }
-    if (resource.getResourceTag() != null) {
-      message.setResourceTag(resource.getResourceTag());
-    }
-
-    return message;
-  }
-
   /**
    * Creates a message to change participant status
    * {@link org.apache.helix.model.LiveInstance.LiveInstanceStatus}
@@ -121,7 +99,7 @@ public class MessageUtil {
   }
 
   /* Creates state transition or state transition cancellation message */
-  private static Message createStateTransitionMessage(Message.MessageType 
messageType,
+  private static Message createBasicStateTransitionMessage(Message.MessageType 
messageType,
       String srcInstanceName, String srcSessionId, Resource resource, String 
partitionName,
       String instanceName, String currentState, String nextState, String 
tgtSessionId,
       String stateModelDefName) {
@@ -136,4 +114,72 @@ public class MessageUtil {
 
     return message;
   }
+
+  /**
+   * Create a state transition message with replica prioritization metadata
+   * @param srcInstanceName source instance name
+   * @param srcSessionId source session id
+   * @param resource resource
+   * @param partitionName partition name
+   * @param instanceName target instance name
+   * @param currentState current state
+   * @param nextState next state
+   * @param tgtSessionId target session id
+   * @param stateModelDefName state model definition name
+   * @param currentActiveReplicaNumber The number of replicas currently in 
active states
+   *          for this partition before any state transitions occur. This 
metadata
+   *          enables participant-side message prioritization by indicating the
+   *          current availability level (e.g., 0→1 recovery scenarios get 
higher
+   *          priority than 2→3 load balancing scenarios). Set to -1 for 
transitions
+   *          that should not be prioritized (downward transitions).
+   *          Active states include top states, secondary top states (for 
single-top
+   *          state models), and ERROR states since they are still considered 
active by Helix.
+   * @return state transition message
+   */
+  public static Message createStateTransitionMessage(String srcInstanceName, 
String srcSessionId,
+      Resource resource, String partitionName, String instanceName, String 
currentState,
+      String nextState, String tgtSessionId, String stateModelDefName,
+      int currentActiveReplicaNumber) {
+    Message message = 
createBasicStateTransitionMessage(Message.MessageType.STATE_TRANSITION,
+        srcInstanceName, srcSessionId, resource, partitionName, instanceName, 
currentState,
+        nextState, tgtSessionId, stateModelDefName);
+
+    // Set the retry count for state transition messages.
+    // TODO: make the retry count configurable in ClusterConfig or IdealState
+    message.setRetryCount(DEFAULT_STATE_TRANSITION_MESSAGE_RETRY_COUNT);
+
+    if (resource.getResourceGroupName() != null) {
+      message.setResourceGroupName(resource.getResourceGroupName());
+    }
+    if (resource.getResourceTag() != null) {
+      message.setResourceTag(resource.getResourceTag());
+    }
+
+    // Set current active replica number for participant-side prioritization
+    message.setCurrentActiveReplicaNumber(currentActiveReplicaNumber);
+
+    return message;
+  }
+
+  /**
+   * Create a state transition message (backward compatibility)
+   * @param srcInstanceName source instance name
+   * @param srcSessionId source session id
+   * @param resource resource
+   * @param partitionName partition name
+   * @param instanceName target instance name
+   * @param currentState current state
+   * @param nextState next state
+   * @param tgtSessionId target session id
+   * @param stateModelDefName state model definition name
+   * @return state transition message
+   */
+  public static Message createStateTransitionMessage(String srcInstanceName, 
String srcSessionId,
+      Resource resource, String partitionName, String instanceName, String 
currentState,
+      String nextState, String tgtSessionId, String stateModelDefName) {
+    // currentActiveReplicaNumber is set to -1 for ST messages needing no 
prioritization metadata
+    // (backward compatibility)
+    return createStateTransitionMessage(srcInstanceName, srcSessionId, 
resource, partitionName,
+        instanceName, currentState, nextState, tgtSessionId, 
stateModelDefName, -1);
+  }
 }
diff --git 
a/helix-core/src/test/java/org/apache/helix/controller/stages/TestPrioritizationMessageGeneration.java
 
b/helix-core/src/test/java/org/apache/helix/controller/stages/TestPrioritizationMessageGeneration.java
new file mode 100644
index 000000000..1a9b50ae3
--- /dev/null
+++ 
b/helix-core/src/test/java/org/apache/helix/controller/stages/TestPrioritizationMessageGeneration.java
@@ -0,0 +1,594 @@
+package org.apache.helix.controller.stages;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixDefinedState;
+import org.apache.helix.HelixManager;
+import 
org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.OnlineOfflineWithBootstrapSMD;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.Resource;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import static 
org.apache.helix.tools.StateModelConfigGenerator.generateConfigForMasterSlave;
+import static 
org.apache.helix.tools.StateModelConfigGenerator.generateConfigForOnlineOffline;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestPrioritizationMessageGeneration extends 
MessageGenerationPhase {
+
+  private static final String TEST_CLUSTER = "TestCluster";
+  private static final String TEST_RESOURCE = "TestDB";
+  private static final String PARTITION_0 = "TestDB_0";
+  private static final String INSTANCE_0 = "localhost_0";
+  private static final String INSTANCE_1 = "localhost_1";
+  private static final String INSTANCE_2 = "localhost_2";
+  private static final String INSTANCE_3 = "localhost_3";
+  private static final String INSTANCE_4 = "localhost_4";
+  private static final String SESSION_ID = "123";
+
+  // === Tests for upward transitions and replica counting ===
+
+  @Test
+  public void testCurrentReplicaCountForUpwardTransitions() throws Exception {
+    // Test: Upward transitions from non-second top states should receive 
currentActiveReplicaNumber
+    StateModelDefinition stateModelDef = new 
StateModelDefinition(generateConfigForMasterSlave());
+    // Setup: 1 MASTER, 1 SLAVE, 2 OFFLINE (current active = 2)
+    Map<String, CurrentState> currentStates = 
createCurrentStates(Map.of(INSTANCE_0, "MASTER",
+        INSTANCE_1, "SLAVE", INSTANCE_2, "OFFLINE", INSTANCE_3, "OFFLINE"), 
"MasterSlave");
+    // Action: Move 2 OFFLINE instances to SLAVE (upward transitions)
+    Map<String, String> bestPossible = Map.of(INSTANCE_0, "MASTER", // No 
change
+        INSTANCE_1, "SLAVE", // No change
+        INSTANCE_2, "SLAVE", // OFFLINE -> SLAVE (upward)
+        INSTANCE_3, "SLAVE" // OFFLINE -> SLAVE (upward)
+    );
+
+    List<Message> messages = processAndGetMessages(stateModelDef, 
currentStates, bestPossible, 4);
+
+    // Verify: 2 messages generated, both with current active replica count = 2
+    // Current active replicas: 1 MASTER + 1 SLAVE = 2
+    Assert.assertEquals(messages.size(), 2);
+    for (Message msg : messages) {
+      Assert.assertEquals(msg.getCurrentActiveReplicaNumber(), 2,
+          "Upward transitions should have currentActiveReplicaNumber = current 
active replica count");
+      Assert.assertTrue(msg.getTgtName().equals(INSTANCE_2) || 
msg.getTgtName().equals(INSTANCE_3));
+    }
+  }
+
+  @Test
+  public void testZeroReplicaScenario() throws Exception {
+    // Test: All instances starting from OFFLINE (0 active replicas)
+    StateModelDefinition stateModelDef = new 
StateModelDefinition(generateConfigForMasterSlave());
+
+    // Setup: All instances OFFLINE (current active = 0)
+    Map<String, CurrentState> currentStates = createCurrentStates(
+        Map.of(INSTANCE_0, "OFFLINE", INSTANCE_1, "OFFLINE", INSTANCE_2, 
"OFFLINE"), "MasterSlave");
+
+    // Action: Create 1 MASTER, 2 SLAVE from all OFFLINE
+    Map<String, String> bestPossible = Map.of(INSTANCE_0, "SLAVE", // OFFLINE 
-> SLAVE (upward)
+        INSTANCE_1, "MASTER", // OFFLINE -> MASTER (upward)
+        INSTANCE_2, "SLAVE" // OFFLINE -> SLAVE (upward)
+    );
+
+    List<Message> messages = processAndGetMessages(stateModelDef, 
currentStates, bestPossible, 3);
+
+    // Verify: All messages have currentActiveReplicaNumber = 0
+    Assert.assertEquals(messages.size(), 3);
+    for (Message msg : messages) {
+      Assert.assertEquals(msg.getCurrentActiveReplicaNumber(), 0,
+          "All upward transitions should have currentActiveReplicaNumber = 0");
+    }
+  }
+
+  // === Tests for non-upward transitions ===
+
+  @Test
+  public void testNoReplicaNumberForNonUpwardTransitions() throws Exception {
+    // Test: Downward transitions should not receive currentActiveReplicaNumber
+    StateModelDefinition stateModelDef = new 
StateModelDefinition(generateConfigForMasterSlave());
+
+    Map<String, CurrentState> currentStates =
+        createCurrentStates(Map.of(INSTANCE_0, "SLAVE"), "MasterSlave");
+
+    // Action: SLAVE -> OFFLINE (downward transition)
+    Map<String, String> bestPossible = Map.of(INSTANCE_0, "OFFLINE");
+
+    List<Message> messages = processAndGetMessages(stateModelDef, 
currentStates, bestPossible, 1);
+
+    // Verify: Downward transition gets default value (-1)
+    Assert.assertEquals(messages.size(), 1);
+    Assert.assertEquals(messages.get(0).getCurrentActiveReplicaNumber(), -1,
+        "Non-upward state transitions should not have 
currentActiveReplicaNumber assigned");
+  }
+
+  // === Tests for pending messages ===
+
+  @Test
+  public void testPendingMessagesDoNotAffectCurrentReplicaCount() throws 
Exception {
+    // Test: SLAVE -> MASTER (second top to top) should not receive 
currentActiveReplicaNumber
+    StateModelDefinition stateModelDef = new 
StateModelDefinition(generateConfigForMasterSlave());
+
+    // Setup: 1 MASTER, 2 OFFLINE (current active = 1)
+    Map<String, CurrentState> currentStates = createCurrentStates(
+        Map.of(INSTANCE_0, "MASTER", INSTANCE_1, "OFFLINE", INSTANCE_2, 
"OFFLINE"), "MasterSlave");
+
+    CurrentStateOutput currentStateOutput = 
createCurrentStateOutput(currentStates);
+
+    // Add pending message for INSTANCE_1: OFFLINE->SLAVE
+    Message pendingMsg = createMessage("OFFLINE", "SLAVE", INSTANCE_1);
+    currentStateOutput.setPendingMessage(TEST_RESOURCE, new 
Partition(PARTITION_0), INSTANCE_1,
+        pendingMsg);
+
+    ClusterEvent event = prepareClusterEvent(stateModelDef, 
currentStateOutput, 3);
+
+    // Action: Both offline instances should become SLAVE
+    Map<String, String> bestPossible = Map.of(INSTANCE_0, "MASTER", // No 
change
+        INSTANCE_1, "SLAVE", // Has pending message, no new message
+        INSTANCE_2, "SLAVE" // OFFLINE -> SLAVE (new transition)
+    );
+
+    setBestPossibleState(event, bestPossible);
+    process(event);
+
+    MessageOutput output = 
event.getAttribute(AttributeName.MESSAGES_ALL.name());
+    List<Message> messages = output.getMessages(TEST_RESOURCE, new 
Partition(PARTITION_0));
+
+    // Verify: Only new message for INSTANCE_2, with current active count = 1 
(ignoring pending)
+    Assert.assertEquals(messages.size(), 1);
+    Assert.assertEquals(messages.get(0).getTgtName(), INSTANCE_2);
+    Assert.assertEquals(messages.get(0).getCurrentActiveReplicaNumber(), 1,
+        "currentActiveReplicaNumber should be based on current active replicas 
only, not including pending transitions");
+  }
+
+  // === Tests for different state model types ===
+
+  @Test
+  public void testSingleTopStateModelWithoutSecondaryTop() throws Exception {
+    // Test: ONLINE-OFFLINE model (single top without secondary) - only top + 
ERROR count as active
+    StateModelDefinition onlineOfflineStateModel = 
CustomOnlineOfflineSMD.build(1);
+
+    // Verify this is a single-top state model
+    Assert.assertTrue(onlineOfflineStateModel.isSingleTopStateModel(),
+        "ONLINE-OFFLINE should be a single-top state model");
+
+    // Setup: 0 ONLINE, 1 ERROR, 1 OFFLINE (current active = 1: ERROR only)
+    Map<String, CurrentState> currentStates = createCurrentStates(
+        Map.of(INSTANCE_0, "OFFLINE", INSTANCE_1, "ERROR", INSTANCE_2, 
"OFFLINE"), "OnlineOffline");
+
+    // Action: One OFFLINE becomes ONLINE
+    Map<String, String> bestPossible = Map.of(INSTANCE_0, "ONLINE", // OFFLINE 
-> ONLINE (upward)
+        INSTANCE_1, "ERROR", // No change
+        INSTANCE_2, "OFFLINE" // No change
+    );
+
+    List<Message> messages = 
processAndGetMessagesForOnlineOffline(onlineOfflineStateModel,
+        currentStates, bestPossible, 3);
+
+    // Verify: Current active = 1 (0 ONLINE + 1 ERROR)
+    Assert.assertEquals(messages.size(), 1);
+    Assert.assertEquals(messages.get(0).getCurrentActiveReplicaNumber(), 1,
+        "Single-top without secondary: only top states + ERROR count as 
active");
+  }
+
+  @Test
+  public void testSingleTopStateModelWithSecondaryTop() throws Exception {
+    // Test: MASTER-SLAVE model (single top with secondary) - top + secondary 
+ ERROR count as
+    // active
+    StateModelDefinition stateModelDef = new 
StateModelDefinition(generateConfigForMasterSlave());
+
+    // Setup: 1 MASTER, 2 SLAVE, 1 ERROR, 1 OFFLINE (current active = 4)
+    Map<String, CurrentState> currentStates =
+        createCurrentStates(Map.of(INSTANCE_0, "MASTER", INSTANCE_1, "SLAVE", 
INSTANCE_2, "SLAVE",
+            INSTANCE_3, "ERROR", INSTANCE_4, "OFFLINE"), "MasterSlave");
+
+    // Action: OFFLINE becomes SLAVE
+    Map<String, String> bestPossible = Map.of(INSTANCE_0, "MASTER", 
INSTANCE_1, "SLAVE", INSTANCE_2,
+        "SLAVE", INSTANCE_3, "ERROR", INSTANCE_4, "SLAVE" // OFFLINE -> SLAVE 
(upward)
+    );
+
+    List<Message> messages = processAndGetMessages(stateModelDef, 
currentStates, bestPossible, 5);
+
+    // Verify: Current active = 4 (1 MASTER + 2 SLAVE + 1 ERROR)
+    Assert.assertEquals(messages.size(), 1);
+    Assert.assertEquals(messages.get(0).getCurrentActiveReplicaNumber(), 4,
+        "Single-top with secondary: top + secondary top + ERROR count as 
active");
+  }
+
+  @Test
+  public void testMultiTopStateModelWithoutSecondaryTop() throws Exception {
+    StateModelDefinition stateModelDef = new 
StateModelDefinition(generateConfigForOnlineOffline());
+
+    // Setup: 1 ONLINE, 2 OFFLINE (current active = 1)
+    Map<String, CurrentState> currentStates = createCurrentStates(
+        Map.of(INSTANCE_0, "ONLINE", INSTANCE_1, "OFFLINE", INSTANCE_2, 
"OFFLINE"),
+        "OfflineOnline");
+
+    // Action: Both OFFLINE become ONLINE
+    Map<String, String> bestPossible = Map.of(INSTANCE_0, "ONLINE", // No 
change
+        INSTANCE_1, "ONLINE", // OFFLINE -> ONLINE (upward)
+        INSTANCE_2, "ONLINE" // OFFLINE -> ONLINE (upward)
+    );
+
+    List<Message> messages =
+        processAndGetMessagesForOnlineOffline(stateModelDef, currentStates, 
bestPossible, 3);
+
+    // Verify: Current active = 1 (only ONLINE states count)
+    Assert.assertEquals(messages.size(), 2);
+    for (Message msg : messages) {
+      Assert.assertEquals(msg.getCurrentActiveReplicaNumber(), 1,
+          "Multi-top state model without secondary top: only top states count 
as active");
+    }
+  }
+
+  @Test
+  public void testMultiTopStateModelWithSecondaryTop() throws Exception {
+    StateModelDefinition stateModelDef = OnlineOfflineWithBootstrapSMD.build();
+
+    // Setup: 2 ONLINE, 1 BOOTSTRAP, 1 OFFLINE (current active = 3)
+    Map<String, CurrentState> currentStates =
+        createCurrentStates(Map.of(INSTANCE_0, "ONLINE", INSTANCE_1, "ONLINE", 
INSTANCE_2,
+            "BOOTSTRAP", INSTANCE_3, "OFFLINE"), "OnlineOfflineWithBootstrap");
+
+    // Action: OFFLINE becomes BOOTSTRAP.
+    Map<String, String> bestPossible = Map.of(INSTANCE_0, "ONLINE", // No 
change
+        INSTANCE_1, "ONLINE", // No change
+        INSTANCE_2, "BOOTSTRAP", // No change
+        INSTANCE_3, "BOOTSTRAP" // OFFLINE -> BOOTSTRAP (upward)
+    );
+
+    List<Message> messages =
+        processAndGetMessagesForOnlineOffline(stateModelDef, currentStates, 
bestPossible, 4);
+
+    // Verify: Current active = 3 (ONLINE + BOOTSTRAP state counts)
+    Assert.assertEquals(messages.size(), 1);
+    for (Message msg : messages) {
+      Assert.assertEquals(msg.getCurrentActiveReplicaNumber(), 3,
+          "Multi-top state model without secondary top: top states and 
secondary top states count as active");
+    }
+  }
+
+  // === Tests for ERROR state handling ===
+
+  @Test
+  public void testErrorStateIncludedInActiveCount() throws Exception {
+    // Test: ERROR states are always counted as active
+    StateModelDefinition stateModelDef = new 
StateModelDefinition(generateConfigForMasterSlave());
+
+    // Setup: 1 MASTER, 1 ERROR, 1 OFFLINE (current active = 2)
+    Map<String, CurrentState> currentStates = createCurrentStates(
+        Map.of(INSTANCE_0, "MASTER", INSTANCE_1, "ERROR", INSTANCE_2, 
"OFFLINE"), "MasterSlave");
+
+    // Action: OFFLINE becomes SLAVE
+    Map<String, String> bestPossible =
+        Map.of(INSTANCE_0, "MASTER", INSTANCE_1, "ERROR", INSTANCE_2, "SLAVE");
+
+    List<Message> messages = processAndGetMessages(stateModelDef, 
currentStates, bestPossible, 3);
+
+    // Verify: Current active = 2 (1 MASTER + 1 ERROR)
+    Assert.assertEquals(messages.size(), 1);
+    Assert.assertEquals(messages.get(0).getCurrentActiveReplicaNumber(), 2,
+        "ERROR state replicas should be included in active count");
+  }
+
+  @Test
+  public void testTransitionFromErrorToOffline() throws Exception {
+    // Test: ERROR -> OFFLINE is a downward transition (active to inactive)
+    StateModelDefinition stateModelDef = new 
StateModelDefinition(generateConfigForMasterSlave());
+
+    Map<String, CurrentState> currentStates =
+        createCurrentStates(Map.of(INSTANCE_0, "ERROR"), "MasterSlave");
+
+    // Action: ERROR -> OFFLINE (standard recovery pattern)
+    Map<String, String> bestPossible = Map.of(INSTANCE_0, "OFFLINE");
+
+    List<Message> messages = processAndGetMessages(stateModelDef, 
currentStates, bestPossible, 1);
+
+    // Verify: Downward transition gets default value (-1)
+    Assert.assertEquals(messages.size(), 1);
+    Assert.assertEquals(messages.get(0).getCurrentActiveReplicaNumber(), -1,
+        "ERROR→OFFLINE transitions should not have currentActiveReplicaNumber 
assigned (downward)");
+  }
+
+  // === Tests for DROPPED state handling ===
+
+  @Test
+  public void testDroppedReplicasExcludedFromActiveCount() throws Exception {
+    // Test: DROPPED replicas are excluded from active count calculations
+    StateModelDefinition stateModelDef = new 
StateModelDefinition(generateConfigForMasterSlave());
+
+    // Setup: 1 MASTER, 1 OFFLINE, 1 OFFLINE (current active = 1)
+    Map<String, CurrentState> currentStates = createCurrentStates(
+        Map.of(INSTANCE_0, "MASTER", INSTANCE_1, "OFFLINE", INSTANCE_2, 
"OFFLINE"), "MasterSlave");
+
+    // Action: One OFFLINE drops, other becomes SLAVE
+    Map<String, String> bestPossible = Map.of(INSTANCE_0, "MASTER", // No 
change
+        INSTANCE_1, "DROPPED", // OFFLINE -> DROPPED (downward)
+        INSTANCE_2, "SLAVE" // OFFLINE -> SLAVE (upward)
+    );
+
+    List<Message> messages = processAndGetMessages(stateModelDef, 
currentStates, bestPossible, 3);
+
+    // Verify: 2 messages, upward gets active count, DROPPED doesn't
+    Assert.assertEquals(messages.size(), 2);
+
+    Message upwardMsg = messages.stream()
+        .filter(m -> m.getTgtName().equals(INSTANCE_2) && 
m.getToState().equals("SLAVE"))
+        .findFirst().orElse(null);
+    Message droppedMsg = messages.stream()
+        .filter(m -> m.getTgtName().equals(INSTANCE_1) && 
m.getToState().equals("DROPPED"))
+        .findFirst().orElse(null);
+
+    Assert.assertNotNull(upwardMsg, "Should have upward transition message");
+    Assert.assertNotNull(droppedMsg, "Should have dropped transition message");
+
+    Assert.assertEquals(upwardMsg.getCurrentActiveReplicaNumber(), 1,
+        "Upward transition should have current active replica count");
+    Assert.assertEquals(droppedMsg.getCurrentActiveReplicaNumber(), -1,
+        "DROPPED transition should not have currentActiveReplicaNumber");
+  }
+
+  // === Helper methods ===
+
+  /**
+   * Creates current state map for multiple instances with specified states.
+   */
+  private Map<String, CurrentState> createCurrentStates(Map<String, String> 
instanceStates,
+      String stateModelRef) {
+    Map<String, CurrentState> currentStateMap = new HashMap<>();
+    for (Map.Entry<String, String> entry : instanceStates.entrySet()) {
+      CurrentState currentState = new CurrentState(TEST_RESOURCE);
+      currentState.setState(PARTITION_0, entry.getValue());
+      currentState.setSessionId(SESSION_ID);
+      currentState.setStateModelDefRef(stateModelRef);
+      currentStateMap.put(entry.getKey(), currentState);
+    }
+    return currentStateMap;
+  }
+
+  /**
+   * Creates CurrentStateOutput from current state map.
+   */
+  private CurrentStateOutput createCurrentStateOutput(Map<String, 
CurrentState> currentStateMap) {
+    CurrentStateOutput currentStateOutput = new CurrentStateOutput();
+    for (Map.Entry<String, CurrentState> entry : currentStateMap.entrySet()) {
+      String instance = entry.getKey();
+      CurrentState currentState = entry.getValue();
+      currentStateOutput.setCurrentState(TEST_RESOURCE, new 
Partition(PARTITION_0), instance,
+          currentState.getState(PARTITION_0));
+    }
+    return currentStateOutput;
+  }
+
+  /**
+   * Processes state transitions and returns generated messages.
+   */
+  private List<Message> processAndGetMessages(StateModelDefinition 
stateModelDef,
+      Map<String, CurrentState> currentStates, Map<String, String> 
bestPossible, int instanceCount)
+      throws Exception {
+    CurrentStateOutput currentStateOutput = 
createCurrentStateOutput(currentStates);
+    ClusterEvent event = prepareClusterEvent(stateModelDef, 
currentStateOutput, instanceCount);
+    setBestPossibleState(event, bestPossible);
+
+    process(event);
+
+    MessageOutput output = 
event.getAttribute(AttributeName.MESSAGES_ALL.name());
+    return output.getMessages(TEST_RESOURCE, new Partition(PARTITION_0));
+  }
+
+  /**
+   * Processes state transitions for OnlineOffline state model.
+   */
+  private List<Message> 
processAndGetMessagesForOnlineOffline(StateModelDefinition stateModelDef,
+      Map<String, CurrentState> currentStates, Map<String, String> 
bestPossible, int instanceCount)
+      throws Exception {
+    CurrentStateOutput currentStateOutput = 
createCurrentStateOutput(currentStates);
+    ClusterEvent event =
+        prepareClusterEventForOnlineOffline(stateModelDef, currentStateOutput, 
instanceCount);
+    setBestPossibleState(event, bestPossible);
+
+    process(event);
+
+    MessageOutput output = 
event.getAttribute(AttributeName.MESSAGES_ALL.name());
+    return output.getMessages(TEST_RESOURCE, new Partition(PARTITION_0));
+  }
+
+  /**
+   * Sets best possible state in cluster event.
+   */
+  private void setBestPossibleState(ClusterEvent event, Map<String, String> 
partitionMap) {
+    BestPossibleStateOutput bestPossibleOutput = new BestPossibleStateOutput();
+    bestPossibleOutput.setState(TEST_RESOURCE, new Partition(PARTITION_0), 
partitionMap);
+    event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.name(), 
bestPossibleOutput);
+  }
+
+  /**
+   * Prepares cluster event with necessary mock objects and configurations.
+   */
+  private ClusterEvent prepareClusterEvent(StateModelDefinition stateModelDef,
+      CurrentStateOutput currentStateOutput, int instanceCount) {
+    ClusterEvent event = createBaseClusterEvent(currentStateOutput);
+
+    // Setup ResourceControllerDataProvider
+    ResourceControllerDataProvider cache = 
mock(ResourceControllerDataProvider.class);
+    when(cache.getClusterConfig()).thenReturn(new ClusterConfig(TEST_CLUSTER));
+    when(cache.getStateModelDef("MasterSlave")).thenReturn(stateModelDef);
+    
when(cache.getLiveInstances()).thenReturn(createLiveInstances(instanceCount));
+    event.addAttribute(AttributeName.ControllerDataProvider.name(), cache);
+
+    // Setup resources
+    event.addAttribute(AttributeName.RESOURCES_TO_REBALANCE.name(),
+        createResourceMap("MasterSlave"));
+
+    return event;
+  }
+
+  /**
+   * Prepares cluster event for OnlineOffline state model.
+   */
+  private ClusterEvent 
prepareClusterEventForOnlineOffline(StateModelDefinition stateModelDef,
+      CurrentStateOutput currentStateOutput, int instanceCount) {
+    ClusterEvent event = createBaseClusterEvent(currentStateOutput);
+
+    // Setup ResourceControllerDataProvider for OnlineOffline
+    ResourceControllerDataProvider cache = 
mock(ResourceControllerDataProvider.class);
+    when(cache.getClusterConfig()).thenReturn(new ClusterConfig(TEST_CLUSTER));
+    when(cache.getStateModelDef("OfflineOnline")).thenReturn(stateModelDef);
+    
when(cache.getLiveInstances()).thenReturn(createLiveInstances(instanceCount));
+    event.addAttribute(AttributeName.ControllerDataProvider.name(), cache);
+
+    // Setup resources for OnlineOffline
+    event.addAttribute(AttributeName.RESOURCES_TO_REBALANCE.name(),
+        createResourceMap("OfflineOnline"));
+
+    return event;
+  }
+
+  /**
+   * Creates base cluster event with common attributes.
+   */
+  private ClusterEvent createBaseClusterEvent(CurrentStateOutput 
currentStateOutput) {
+    ClusterEvent event = new ClusterEvent(TEST_CLUSTER, 
ClusterEventType.Unknown);
+
+    event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
+    event.addAttribute(AttributeName.CURRENT_STATE_EXCLUDING_UNKNOWN.name(), 
currentStateOutput);
+
+    // Mock HelixManager and HelixDataAccessor
+    HelixManager manager = mock(HelixManager.class);
+    when(manager.getInstanceName()).thenReturn("Controller");
+    when(manager.getSessionId()).thenReturn(SESSION_ID);
+    
when(manager.getHelixDataAccessor()).thenReturn(mock(HelixDataAccessor.class));
+    event.addAttribute(AttributeName.helixmanager.name(), manager);
+
+    return event;
+  }
+
+  /**
+   * Creates mock live instances for the specified count.
+   */
+  private Map<String, LiveInstance> createLiveInstances(int instanceCount) {
+    Map<String, LiveInstance> liveInstances = new HashMap<>();
+    for (int i = 0; i < instanceCount; i++) {
+      String instanceName = "localhost_" + i;
+      ZNRecord znRecord = new ZNRecord(instanceName);
+      znRecord.setEphemeralOwner(Long.parseLong(SESSION_ID));
+
+      LiveInstance liveInstance = new LiveInstance(znRecord);
+      liveInstance.setSessionId(SESSION_ID);
+      liveInstance.setHelixVersion("1.0.0");
+      liveInstance.setLiveInstance(instanceName);
+
+      liveInstances.put(instanceName, liveInstance);
+    }
+    return liveInstances;
+  }
+
+  /**
+   * Creates resource map with specified state model reference.
+   */
+  private Map<String, Resource> createResourceMap(String stateModelRef) {
+    Map<String, Resource> resourceMap = new HashMap<>();
+    Resource resource = new Resource(TEST_RESOURCE);
+    resource.setStateModelDefRef(stateModelRef);
+    resource.addPartition(PARTITION_0);
+    resourceMap.put(TEST_RESOURCE, resource);
+    return resourceMap;
+  }
+
+  /**
+   * Creates a state transition message.
+   */
+  private Message createMessage(String fromState, String toState, String 
tgtName) {
+    Message message =
+        new Message(Message.MessageType.STATE_TRANSITION, 
UUID.randomUUID().toString());
+    message.setFromState(fromState);
+    message.setToState(toState);
+    message.setTgtName(tgtName);
+    message.setTgtSessionId(SESSION_ID);
+    message.setResourceName(TEST_RESOURCE);
+    message.setPartitionName(PARTITION_0);
+    message.setStateModelDef("MasterSlave");
+    message.setSrcName("Controller");
+    message.setSrcSessionId(SESSION_ID);
+    return message;
+  }
+
+  /**
+   * Custom OnlineOffline state model with configurable upper bounds for 
ONLINE state.
+   * Enables testing scenarios with specific replica count constraints.
+   */
+  private static final class CustomOnlineOfflineSMD {
+    private static final String STATE_MODEL_NAME = "CustomOnlineOffline";
+
+    /**
+     * States for the CustomOnlineOffline state model
+     */
+    private enum States {
+      ONLINE,
+      OFFLINE
+    }
+
+    /**
+     * Build OnlineOffline state model definition with custom instance count
+     * @param instanceCount the maximum number of instances that can be in 
ONLINE state
+     * @return StateModelDefinition for OnlineOffline model with custom bounds
+     */
+    public static StateModelDefinition build(int instanceCount) {
+      if (instanceCount <= 0) {
+        throw new IllegalArgumentException(
+            "Instance count must be positive, got: " + instanceCount);
+      }
+
+      StateModelDefinition.Builder builder = new 
StateModelDefinition.Builder(STATE_MODEL_NAME);
+
+      // init state
+      builder.initialState(States.OFFLINE.name());
+
+      // add states
+      builder.addState(States.ONLINE.name(), 0);
+      builder.addState(States.OFFLINE.name(), 1);
+      for (final HelixDefinedState state : HelixDefinedState.values()) {
+        builder.addState(state.name());
+      }
+
+      // add transitions
+      builder.addTransition(States.ONLINE.name(), States.OFFLINE.name(), 0);
+      builder.addTransition(States.OFFLINE.name(), States.ONLINE.name(), 1);
+      builder.addTransition(States.OFFLINE.name(), 
HelixDefinedState.DROPPED.name());
+
+      // bounds - uses the instanceCount parameter
+      builder.dynamicUpperBound(States.ONLINE.name(), 
String.valueOf(instanceCount));
+
+      return builder.build();
+    }
+  }
+}


Reply via email to