This is an automated email from the ASF dual-hosted git repository.
xyuanlu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new b02291e86 [Message Prioritization] - Add currentReplicaNumber metadata
for message prioritization by client (#3043)
b02291e86 is described below
commit b02291e8653fd39ea9956231147fa384ed415895
Author: Charanya Sudharsanan <[email protected]>
AuthorDate: Mon Jul 14 11:24:13 2025 -0700
[Message Prioritization] - Add currentReplicaNumber metadata for message
prioritization by client (#3043)
Add currentReplicaNumber metadata for message prioritization by client
---
.../controller/stages/MessageGenerationPhase.java | 117 +++-
.../main/java/org/apache/helix/model/Message.java | 44 +-
.../apache/helix/model/StateModelDefinition.java | 19 +
.../java/org/apache/helix/util/MessageUtil.java | 94 +++-
.../TestPrioritizationMessageGeneration.java | 594 +++++++++++++++++++++
5 files changed, 826 insertions(+), 42 deletions(-)
diff --git
a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
index 859c6679e..0868ac6d6 100644
---
a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
+++
b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
@@ -26,6 +26,7 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
+import java.util.stream.Collectors;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixDefinedState;
@@ -57,17 +58,18 @@ import org.slf4j.LoggerFactory;
* Compares the currentState, pendingState with IdealState and generate
messages
*/
public class MessageGenerationPhase extends AbstractBaseStage {
- private final static String NO_DESIRED_STATE = "NoDesiredState";
+ private static final String NO_DESIRED_STATE = "NoDesiredState";
// If we see there is any invalid pending message leaving on host, i.e.
message
// tells participant to change from SLAVE to MASTER, and the participant is
already
// at MASTER state, we wait for timeout and if the message is still not
cleaned up by
// participant, controller will cleanup them proactively to unblock further
state
// transition
- public final static long DEFAULT_OBSELETE_MSG_PURGE_DELAY = HelixUtil
+ public static final long DEFAULT_OBSELETE_MSG_PURGE_DELAY = HelixUtil
.getSystemPropertyAsLong(SystemPropertyKeys.CONTROLLER_MESSAGE_PURGE_DELAY, 60
* 1000);
- private final static String PENDING_MESSAGE = "pending message";
- private final static String STALE_MESSAGE = "stale message";
+ private static final String PENDING_MESSAGE = "pending message";
+ private static final String STALE_MESSAGE = "stale message";
+ private static final String OFFLINE = "OFFLINE";
private static Logger logger =
LoggerFactory.getLogger(MessageGenerationPhase.class);
@@ -163,6 +165,18 @@ public class MessageGenerationPhase extends
AbstractBaseStage {
// desired-state->list of generated-messages
Map<String, List<Message>> messageMap = new HashMap<>();
+ /*
+ * Calculate the current active replica count based on state model type.
+ * This represents the number of replicas currently serving traffic for
this partition
+ * Active replicas include: top states, secondary top states(excluding
OFFLINE) and ERROR
+ * states.
+ * Active replicas exclude: OFFLINE and DROPPED states.
+ * All qualifying state transitions for this partition will receive this
same value,
+ * allowing clients to understand the current availability level and
prioritize accordingly.
+ */
+ int currentActiveReplicaCount =
+ calculateCurrentActiveReplicaCount(currentStateMap, stateModelDef);
+
for (String instanceName : instanceStateMap.keySet()) {
Set<Message> staleMessages =
cache.getStaleMessagesByInstance(instanceName);
@@ -250,17 +264,39 @@ public class MessageGenerationPhase extends
AbstractBaseStage {
pendingMessage, manager, resource, partition,
sessionIdMap, instanceName,
stateModelDef, cancellationMessage, isCancellationEnabled);
} else {
+ // Set currentActiveReplicaNumber to provide metadata for
potential message
+ // prioritization by participant.
+ // Assign the current active replica count to all qualifying
upward transitions for this
+ // partition.
+ // This ensures consistent prioritization metadata across
concurrent state transitions.
+ // -1 indicates no prioritization metadata; e.g. downward state transition messages get -1.
+ int currentActiveReplicaNumber = -1;
+
+ /*
+ * Assign currentActiveReplicaNumber for qualifying upward state
transitions.
+ * Criteria for assignment:
+ * - Must be an upward state transition according to state model
+ * - Target state must be considered active (according to state
model type)
+ */
+ if (stateModelDef.isUpwardStateTransition(currentState, nextState)
+ && isStateActive(nextState, stateModelDef)) {
+
+ // All qualifying transitions for this partition get the same
+ // currentActiveReplicaNumber
+ currentActiveReplicaNumber = currentActiveReplicaCount;
+ }
+
// Create new state transition message
- message = MessageUtil
- .createStateTransitionMessage(manager.getInstanceName(),
manager.getSessionId(),
- resource, partition.getPartitionName(), instanceName,
currentState, nextState,
- sessionIdMap.get(instanceName), stateModelDef.getId());
+ message =
MessageUtil.createStateTransitionMessage(manager.getInstanceName(),
+ manager.getSessionId(), resource,
partition.getPartitionName(), instanceName,
+ currentState, nextState, sessionIdMap.get(instanceName),
stateModelDef.getId(),
+ currentActiveReplicaNumber);
if (logger.isDebugEnabled()) {
LogUtil.logDebug(logger, _eventId, String.format(
- "Resource %s partition %s for instance %s with currentState
%s and nextState %s",
+ "Resource %s partition %s for instance %s with currentState
%s, nextState %s and currentActiveReplicaNumber %d",
resource.getResourceName(), partition.getPartitionName(),
instanceName,
- currentState, nextState));
+ currentState, nextState, currentActiveReplicaNumber));
}
}
}
@@ -290,7 +326,66 @@ public class MessageGenerationPhase extends
AbstractBaseStage {
} // end of for-each-partition
}
- private boolean shouldCreateSTCancellation(Message pendingMessage, String
desiredState,
+ /**
+ * Calculate the current active replica count based on state model type.
+ * The count includes replicas in top states, secondary top states
(excluding OFFLINE),
+ * and ERROR states since helix considers them active.Count excludes OFFLINE
and DROPPED states.
+ * @param currentStateMap
+ * @param stateModelDef
+ * @return The number of replicas currently in active states, used to
determine the
+ * currentActiveReplicaNumber for the partition.
+ */
+ private int calculateCurrentActiveReplicaCount(Map<String, String>
currentStateMap,
+ StateModelDefinition stateModelDef) {
+ return (int) currentStateMap.values().stream()
+ .filter(state -> stateModelDef.getTopState().contains(state) // Top
states (MASTER, ONLINE,
+ // LEADER)
+ || getActiveSecondaryTopStates(stateModelDef).contains(state) //
Active secondary states
+ //
(SLAVE, STANDBY,
+ //
BOOTSTRAP)
+ || HelixDefinedState.ERROR.name().equals(state) // ERROR states
(still considered
+ // active)
+ // DROPPED and OFFLINE are automatically excluded by
getActiveSecondaryTopStates()
+ ).count();
+ }
+
+  /**
+   * Returns the secondary top states of the state model with the non-serving states removed.
+   * {@code getSecondTopStates()} may itself contain OFFLINE: for an OnlineOffline model it is
+   * ["OFFLINE"], because OFFLINE is the state that transitions to ONLINE. After filtering, the
+   * result for that model is an empty list.
+   * @param stateModelDef state model definition whose secondary top states are filtered
+   * @return the secondary top states that are neither OFFLINE nor DROPPED
+   */
+  private List<String> getActiveSecondaryTopStates(StateModelDefinition stateModelDef) {
+    String droppedState = HelixDefinedState.DROPPED.name();
+    return stateModelDef.getSecondTopStates().stream()
+        // Keep a candidate only when it is not one of the two non-serving states.
+        .filter(candidate -> !(OFFLINE.equals(candidate) || droppedState.equals(candidate)))
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * Tells whether a state counts as active for prioritization purposes.
+   * The checks are evaluated in this order: the Helix-defined ERROR state, the top state(s) of
+   * the state model, then the secondary top states that are neither OFFLINE nor DROPPED.
+   * @param state state name to classify
+   * @param stateModelDef state model definition supplying the top and secondary top states
+   * @return true if the state matches any of the checks above, false otherwise
+   */
+  private boolean isStateActive(String state, StateModelDefinition stateModelDef) {
+    // ERROR is tested first so it is active regardless of what the state model declares.
+    return HelixDefinedState.ERROR.name().equals(state)
+        || stateModelDef.getTopState().contains(state)
+        || getActiveSecondaryTopStates(stateModelDef).contains(state);
+  }
+
+ private boolean shouldCreateSTCancellation(Message pendingMessage, String
desiredState,
String initialState) {
if (pendingMessage == null) {
return false;
diff --git a/helix-core/src/main/java/org/apache/helix/model/Message.java
b/helix-core/src/main/java/org/apache/helix/model/Message.java
index acf075821..68751de40 100644
--- a/helix-core/src/main/java/org/apache/helix/model/Message.java
+++ b/helix-core/src/main/java/org/apache/helix/model/Message.java
@@ -104,7 +104,8 @@ public class Message extends HelixProperty {
RELAY_FROM,
EXPIRY_PERIOD,
SRC_CLUSTER,
- ST_REBALANCE_TYPE
+ ST_REBALANCE_TYPE,
+ CURRENT_ACTIVE_REPLICA_NUMBER
}
/**
@@ -137,12 +138,8 @@ public class Message extends HelixProperty {
/**
* Compares the creation time of two Messages
*/
- public static final Comparator<Message> CREATE_TIME_COMPARATOR = new
Comparator<Message>() {
- @Override
- public int compare(Message m1, Message m2) {
- return new Long(m1.getCreateTimeStamp()).compareTo(new
Long(m2.getCreateTimeStamp()));
- }
- };
+  public static final Comparator<Message> CREATE_TIME_COMPARATOR =
+      // Ascending by creation time (m1 compared against m2), exactly like the anonymous
+      // Comparator this lambda replaces; swapping the operands would reverse every sort.
+      (m1, m2) -> Long.compare(m1.getCreateTimeStamp(), m2.getCreateTimeStamp());
/**
* Instantiate a message
@@ -935,6 +932,39 @@ public class Message extends HelixProperty {
_record.setSimpleField(Attributes.SRC_CLUSTER.name(), clusterName);
}
+ /**
+ * Set current active replica count for participant-side message
prioritization.
+ * This field indicates the number of replicas currently in active states
(including ERROR states)
+ * for this partition at the time the state transition message is generated.
+ * Active states include top states, secondary top states (for single-top
state models),
+ * and ERROR states.
+ * This metadata enables participants to prioritize recovery scenarios (low
active counts)
+ * over load balancing scenarios (high active counts) in custom thread pools
or message handlers.
+ * For example, 2→3 transitions get higher priority than 3→4 transitions.
+ * Default value is -1 for transitions that don't require prioritization
metadata.(for eg :
+ * downward transitions).
+ * @param currentActiveReplicaNumber the number of currently active replicas
(-1 when there is no
+ * prioritization metadata,
+ * >=0 for transitions containing current availability level)
+ */
+ public void setCurrentActiveReplicaNumber(int currentActiveReplicaNumber) {
+ _record.setIntField(Attributes.CURRENT_ACTIVE_REPLICA_NUMBER.name(),
+ currentActiveReplicaNumber);
+ }
+
+  /**
+   * Returns the active replica count recorded on this message at generation time.
+   * The value describes the partition before any state transition takes effect and lets
+   * participants prioritize messages by the current availability level.
+   * @return the recorded active replica count, or -1 when the field is absent or the message
+   *         carries no prioritization metadata (e.g. downward state transitions)
+   */
+  public int getCurrentActiveReplicaNumber() {
+    // -1 doubles as the "no metadata" marker and as the default for an unset field.
+    final int noMetadata = -1;
+    return _record.getIntField(Attributes.CURRENT_ACTIVE_REPLICA_NUMBER.name(), noMetadata);
+  }
+
/**
* Check if this message is targetted for a controller
* @return true if this is a controller message, false otherwise
diff --git
a/helix-core/src/main/java/org/apache/helix/model/StateModelDefinition.java
b/helix-core/src/main/java/org/apache/helix/model/StateModelDefinition.java
index fcf24fb30..e79d58938 100644
--- a/helix-core/src/main/java/org/apache/helix/model/StateModelDefinition.java
+++ b/helix-core/src/main/java/org/apache/helix/model/StateModelDefinition.java
@@ -505,4 +505,23 @@ public class StateModelDefinition extends HelixProperty {
}
return stateCounts;
}
+
+  /**
+   * Checks whether moving from one state to another is an upward state transition.
+   * A transition is upward when the destination state's value in the state priority map is
+   * strictly smaller than the source state's value.
+   * @param fromState source state
+   * @param toState destination state
+   * @return true for an upward transition; false otherwise, including when either state has no
+   *         entry in the state priority map
+   */
+  public boolean isUpwardStateTransition(String fromState, String toState) {
+    Map<String, Integer> priorities = getStatePriorityMap();
+    Integer fromPriority = priorities.get(fromState);
+    Integer toPriority = priorities.get(toState);
+
+    // States missing from the priority map cannot be ranked, so they are never "upward".
+    return fromPriority != null && toPriority != null && toPriority < fromPriority;
+  }
}
diff --git a/helix-core/src/main/java/org/apache/helix/util/MessageUtil.java
b/helix-core/src/main/java/org/apache/helix/util/MessageUtil.java
index 94de8331b..4a93050dc 100644
--- a/helix-core/src/main/java/org/apache/helix/util/MessageUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/util/MessageUtil.java
@@ -48,7 +48,7 @@ public class MessageUtil {
toState);
Message message =
-
createStateTransitionMessage(Message.MessageType.STATE_TRANSITION_CANCELLATION,
+
createBasicStateTransitionMessage(Message.MessageType.STATE_TRANSITION_CANCELLATION,
srcInstanceName, srcSessionId, resource, partitionName,
instanceName, currentState,
nextState, sessionId, stateModelDefName);
@@ -60,28 +60,6 @@ public class MessageUtil {
return null;
}
- public static Message createStateTransitionMessage(String srcInstanceName,
String srcSessionId,
- Resource resource, String partitionName, String instanceName, String
currentState,
- String nextState, String tgtSessionId, String stateModelDefName) {
- Message message =
- createStateTransitionMessage(Message.MessageType.STATE_TRANSITION,
srcInstanceName,
- srcSessionId, resource, partitionName, instanceName, currentState,
nextState, tgtSessionId,
- stateModelDefName);
-
- // Set the retry count for state transition messages.
- // TODO: make the retry count configurable in ClusterConfig or IdealState
- message.setRetryCount(DEFAULT_STATE_TRANSITION_MESSAGE_RETRY_COUNT);
-
- if (resource.getResourceGroupName() != null) {
- message.setResourceGroupName(resource.getResourceGroupName());
- }
- if (resource.getResourceTag() != null) {
- message.setResourceTag(resource.getResourceTag());
- }
-
- return message;
- }
-
/**
* Creates a message to change participant status
* {@link org.apache.helix.model.LiveInstance.LiveInstanceStatus}
@@ -121,7 +99,7 @@ public class MessageUtil {
}
/* Creates state transition or state transition cancellation message */
- private static Message createStateTransitionMessage(Message.MessageType
messageType,
+ private static Message createBasicStateTransitionMessage(Message.MessageType
messageType,
String srcInstanceName, String srcSessionId, Resource resource, String
partitionName,
String instanceName, String currentState, String nextState, String
tgtSessionId,
String stateModelDefName) {
@@ -136,4 +114,72 @@ public class MessageUtil {
return message;
}
+
+  /**
+   * Creates a STATE_TRANSITION message that also carries replica prioritization metadata.
+   * @param srcInstanceName source instance name
+   * @param srcSessionId source session id
+   * @param resource resource; its resource group name and resource tag are copied onto the
+   *          message when they are not null
+   * @param partitionName partition name
+   * @param instanceName target instance name
+   * @param currentState current state
+   * @param nextState next state
+   * @param tgtSessionId target session id
+   * @param stateModelDefName state model definition name
+   * @param currentActiveReplicaNumber the number of replicas of this partition in active states
+   *          before any state transition occurs. It lets participants prioritize messages by
+   *          the current availability level (e.g. a transition generated with 0 active replicas
+   *          ranks above one generated with 2). Pass -1 for transitions that should not be
+   *          prioritized (e.g. downward transitions). The controller counts top states,
+   *          secondary top states other than OFFLINE and DROPPED, and ERROR states as active.
+   * @return state transition message
+   */
+  public static Message createStateTransitionMessage(String srcInstanceName, String srcSessionId,
+      Resource resource, String partitionName, String instanceName, String currentState,
+      String nextState, String tgtSessionId, String stateModelDefName,
+      int currentActiveReplicaNumber) {
+    Message stMessage = createBasicStateTransitionMessage(Message.MessageType.STATE_TRANSITION,
+        srcInstanceName, srcSessionId, resource, partitionName, instanceName, currentState,
+        nextState, tgtSessionId, stateModelDefName);
+
+    // State transition messages use a fixed retry count.
+    // TODO: make the retry count configurable in ClusterConfig or IdealState
+    stMessage.setRetryCount(DEFAULT_STATE_TRANSITION_MESSAGE_RETRY_COUNT);
+
+    // Optional resource attributes are propagated only when the resource defines them.
+    if (resource.getResourceGroupName() != null) {
+      stMessage.setResourceGroupName(resource.getResourceGroupName());
+    }
+    if (resource.getResourceTag() != null) {
+      stMessage.setResourceTag(resource.getResourceTag());
+    }
+
+    // Prioritization metadata consumed on the participant side.
+    stMessage.setCurrentActiveReplicaNumber(currentActiveReplicaNumber);
+
+    return stMessage;
+  }
+
+  /**
+   * Creates a STATE_TRANSITION message without prioritization metadata. Kept for backward
+   * compatibility; it delegates to the overload that takes a currentActiveReplicaNumber.
+   * @param srcInstanceName source instance name
+   * @param srcSessionId source session id
+   * @param resource resource
+   * @param partitionName partition name
+   * @param instanceName target instance name
+   * @param currentState current state
+   * @param nextState next state
+   * @param tgtSessionId target session id
+   * @param stateModelDefName state model definition name
+   * @return state transition message whose currentActiveReplicaNumber is -1
+   */
+  public static Message createStateTransitionMessage(String srcInstanceName, String srcSessionId,
+      Resource resource, String partitionName, String instanceName, String currentState,
+      String nextState, String tgtSessionId, String stateModelDefName) {
+    // -1 marks a state transition message that needs no prioritization metadata.
+    final int noPrioritizationMetadata = -1;
+    return createStateTransitionMessage(srcInstanceName, srcSessionId, resource, partitionName,
+        instanceName, currentState, nextState, tgtSessionId, stateModelDefName,
+        noPrioritizationMetadata);
+  }
}
diff --git
a/helix-core/src/test/java/org/apache/helix/controller/stages/TestPrioritizationMessageGeneration.java
b/helix-core/src/test/java/org/apache/helix/controller/stages/TestPrioritizationMessageGeneration.java
new file mode 100644
index 000000000..1a9b50ae3
--- /dev/null
+++
b/helix-core/src/test/java/org/apache/helix/controller/stages/TestPrioritizationMessageGeneration.java
@@ -0,0 +1,594 @@
+package org.apache.helix.controller.stages;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixDefinedState;
+import org.apache.helix.HelixManager;
+import
org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.OnlineOfflineWithBootstrapSMD;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.Resource;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import static
org.apache.helix.tools.StateModelConfigGenerator.generateConfigForMasterSlave;
+import static
org.apache.helix.tools.StateModelConfigGenerator.generateConfigForOnlineOffline;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestPrioritizationMessageGeneration extends
MessageGenerationPhase {
+
+ private static final String TEST_CLUSTER = "TestCluster";
+ private static final String TEST_RESOURCE = "TestDB";
+ private static final String PARTITION_0 = "TestDB_0";
+ private static final String INSTANCE_0 = "localhost_0";
+ private static final String INSTANCE_1 = "localhost_1";
+ private static final String INSTANCE_2 = "localhost_2";
+ private static final String INSTANCE_3 = "localhost_3";
+ private static final String INSTANCE_4 = "localhost_4";
+ private static final String SESSION_ID = "123";
+
+ // === Tests for upward transitions and replica counting ===
+
+ @Test
+ public void testCurrentReplicaCountForUpwardTransitions() throws Exception {
+ // Test: Upward transitions from non-second top states should receive
currentActiveReplicaNumber
+ StateModelDefinition stateModelDef = new
StateModelDefinition(generateConfigForMasterSlave());
+ // Setup: 1 MASTER, 1 SLAVE, 2 OFFLINE (current active = 2)
+ Map<String, CurrentState> currentStates =
createCurrentStates(Map.of(INSTANCE_0, "MASTER",
+ INSTANCE_1, "SLAVE", INSTANCE_2, "OFFLINE", INSTANCE_3, "OFFLINE"),
"MasterSlave");
+ // Action: Move 2 OFFLINE instances to SLAVE (upward transitions)
+ Map<String, String> bestPossible = Map.of(INSTANCE_0, "MASTER", // No
change
+ INSTANCE_1, "SLAVE", // No change
+ INSTANCE_2, "SLAVE", // OFFLINE -> SLAVE (upward)
+ INSTANCE_3, "SLAVE" // OFFLINE -> SLAVE (upward)
+ );
+
+ List<Message> messages = processAndGetMessages(stateModelDef,
currentStates, bestPossible, 4);
+
+ // Verify: 2 messages generated, both with current active replica count = 2
+ // Current active replicas: 1 MASTER + 1 SLAVE = 2
+ Assert.assertEquals(messages.size(), 2);
+ for (Message msg : messages) {
+ Assert.assertEquals(msg.getCurrentActiveReplicaNumber(), 2,
+ "Upward transitions should have currentActiveReplicaNumber = current
active replica count");
+ Assert.assertTrue(msg.getTgtName().equals(INSTANCE_2) ||
msg.getTgtName().equals(INSTANCE_3));
+ }
+ }
+
+ @Test
+ public void testZeroReplicaScenario() throws Exception {
+ // Test: All instances starting from OFFLINE (0 active replicas)
+ StateModelDefinition stateModelDef = new
StateModelDefinition(generateConfigForMasterSlave());
+
+ // Setup: All instances OFFLINE (current active = 0)
+ Map<String, CurrentState> currentStates = createCurrentStates(
+ Map.of(INSTANCE_0, "OFFLINE", INSTANCE_1, "OFFLINE", INSTANCE_2,
"OFFLINE"), "MasterSlave");
+
+ // Action: Create 1 MASTER, 2 SLAVE from all OFFLINE
+ Map<String, String> bestPossible = Map.of(INSTANCE_0, "SLAVE", // OFFLINE
-> SLAVE (upward)
+ INSTANCE_1, "MASTER", // OFFLINE -> MASTER (upward)
+ INSTANCE_2, "SLAVE" // OFFLINE -> SLAVE (upward)
+ );
+
+ List<Message> messages = processAndGetMessages(stateModelDef,
currentStates, bestPossible, 3);
+
+ // Verify: All messages have currentActiveReplicaNumber = 0
+ Assert.assertEquals(messages.size(), 3);
+ for (Message msg : messages) {
+ Assert.assertEquals(msg.getCurrentActiveReplicaNumber(), 0,
+ "All upward transitions should have currentActiveReplicaNumber = 0");
+ }
+ }
+
+ // === Tests for non-upward transitions ===
+
+ @Test
+ public void testNoReplicaNumberForNonUpwardTransitions() throws Exception {
+ // Test: Downward transitions should not receive currentActiveReplicaNumber
+ StateModelDefinition stateModelDef = new
StateModelDefinition(generateConfigForMasterSlave());
+
+ Map<String, CurrentState> currentStates =
+ createCurrentStates(Map.of(INSTANCE_0, "SLAVE"), "MasterSlave");
+
+ // Action: SLAVE -> OFFLINE (downward transition)
+ Map<String, String> bestPossible = Map.of(INSTANCE_0, "OFFLINE");
+
+ List<Message> messages = processAndGetMessages(stateModelDef,
currentStates, bestPossible, 1);
+
+ // Verify: Downward transition gets default value (-1)
+ Assert.assertEquals(messages.size(), 1);
+ Assert.assertEquals(messages.get(0).getCurrentActiveReplicaNumber(), -1,
+ "Non-upward state transitions should not have
currentActiveReplicaNumber assigned");
+ }
+
+ // === Tests for pending messages ===
+
+  @Test
+  public void testPendingMessagesDoNotAffectCurrentReplicaCount() throws Exception {
+    // Scenario: a pending OFFLINE -> SLAVE message exists for one replica; the active replica
+    // count stamped on newly generated messages must not include that pending transition.
+    StateModelDefinition masterSlaveDef =
+        new StateModelDefinition(generateConfigForMasterSlave());
+    Partition partition = new Partition(PARTITION_0);
+
+    // Current states: 1 MASTER and 2 OFFLINE, i.e. one active replica.
+    Map<String, CurrentState> currentStates = createCurrentStates(
+        Map.of(INSTANCE_0, "MASTER", INSTANCE_1, "OFFLINE", INSTANCE_2, "OFFLINE"),
+        "MasterSlave");
+    CurrentStateOutput currentStateOutput = createCurrentStateOutput(currentStates);
+
+    // INSTANCE_1 already has an OFFLINE -> SLAVE message in flight.
+    Message inFlight = createMessage("OFFLINE", "SLAVE", INSTANCE_1);
+    currentStateOutput.setPendingMessage(TEST_RESOURCE, partition, INSTANCE_1, inFlight);
+
+    ClusterEvent event = prepareClusterEvent(masterSlaveDef, currentStateOutput, 3);
+
+    // Target assignment: both OFFLINE replicas should end up as SLAVE.
+    Map<String, String> bestPossible = Map.of(INSTANCE_0, "MASTER", // unchanged
+        INSTANCE_1, "SLAVE", // pending message exists, so no new message
+        INSTANCE_2, "SLAVE" // OFFLINE -> SLAVE (new transition)
+    );
+    setBestPossibleState(event, bestPossible);
+    process(event);
+
+    MessageOutput output = event.getAttribute(AttributeName.MESSAGES_ALL.name());
+    List<Message> generated = output.getMessages(TEST_RESOURCE, partition);
+
+    // Only INSTANCE_2 gets a new message, and it reports the single active MASTER replica.
+    Assert.assertEquals(generated.size(), 1);
+    Message newTransition = generated.get(0);
+    Assert.assertEquals(newTransition.getTgtName(), INSTANCE_2);
+    Assert.assertEquals(newTransition.getCurrentActiveReplicaNumber(), 1,
+        "currentActiveReplicaNumber should be based on current active replicas only, not including pending transitions");
+  }
+
+ // === Tests for different state model types ===
+
+  @Test
+  public void testSingleTopStateModelWithoutSecondaryTop() throws Exception {
+    // Scenario: ONLINE-OFFLINE model built as single-top with no active secondary top state, so
+    // only the top state and ERROR count as active.
+    StateModelDefinition singleTopDef = CustomOnlineOfflineSMD.build(1);
+    Assert.assertTrue(singleTopDef.isSingleTopStateModel(),
+        "ONLINE-OFFLINE should be a single-top state model");
+
+    // Current states: 0 ONLINE, 1 ERROR and 2 OFFLINE, i.e. one active replica (the ERROR one).
+    Map<String, CurrentState> currentStates = createCurrentStates(
+        Map.of(INSTANCE_0, "OFFLINE", INSTANCE_1, "ERROR", INSTANCE_2, "OFFLINE"),
+        "OnlineOffline");
+
+    // Target assignment: one OFFLINE replica goes ONLINE.
+    Map<String, String> bestPossible = Map.of(INSTANCE_0, "ONLINE", // OFFLINE -> ONLINE (upward)
+        INSTANCE_1, "ERROR", // unchanged
+        INSTANCE_2, "OFFLINE" // unchanged
+    );
+
+    List<Message> generated =
+        processAndGetMessagesForOnlineOffline(singleTopDef, currentStates, bestPossible, 3);
+
+    // Expected active count: 0 ONLINE + 1 ERROR = 1.
+    Assert.assertEquals(generated.size(), 1);
+    Message upward = generated.get(0);
+    Assert.assertEquals(upward.getCurrentActiveReplicaNumber(), 1,
+        "Single-top without secondary: only top states + ERROR count as active");
+  }
+
+ @Test
+ public void testSingleTopStateModelWithSecondaryTop() throws Exception {
+ // Test: MASTER-SLAVE model (single top with secondary) - top + secondary
+ ERROR count as
+ // active
+ StateModelDefinition stateModelDef = new
StateModelDefinition(generateConfigForMasterSlave());
+
+ // Setup: 1 MASTER, 2 SLAVE, 1 ERROR, 1 OFFLINE (current active = 4)
+ Map<String, CurrentState> currentStates =
+ createCurrentStates(Map.of(INSTANCE_0, "MASTER", INSTANCE_1, "SLAVE",
INSTANCE_2, "SLAVE",
+ INSTANCE_3, "ERROR", INSTANCE_4, "OFFLINE"), "MasterSlave");
+
+ // Action: OFFLINE becomes SLAVE
+ Map<String, String> bestPossible = Map.of(INSTANCE_0, "MASTER",
INSTANCE_1, "SLAVE", INSTANCE_2,
+ "SLAVE", INSTANCE_3, "ERROR", INSTANCE_4, "SLAVE" // OFFLINE -> SLAVE
(upward)
+ );
+
+ List<Message> messages = processAndGetMessages(stateModelDef,
currentStates, bestPossible, 5);
+
+ // Verify: Current active = 4 (1 MASTER + 2 SLAVE + 1 ERROR)
+ Assert.assertEquals(messages.size(), 1);
+ Assert.assertEquals(messages.get(0).getCurrentActiveReplicaNumber(), 4,
+ "Single-top with secondary: top + secondary top + ERROR count as
active");
+ }
+
+ @Test
+ public void testMultiTopStateModelWithoutSecondaryTop() throws Exception {
+ StateModelDefinition stateModelDef = new
StateModelDefinition(generateConfigForOnlineOffline());
+
+ // Setup: 1 ONLINE, 2 OFFLINE (current active = 1)
+ Map<String, CurrentState> currentStates = createCurrentStates(
+ Map.of(INSTANCE_0, "ONLINE", INSTANCE_1, "OFFLINE", INSTANCE_2,
"OFFLINE"),
+ "OfflineOnline");
+
+ // Action: Both OFFLINE become ONLINE
+ Map<String, String> bestPossible = Map.of(INSTANCE_0, "ONLINE", // No
change
+ INSTANCE_1, "ONLINE", // OFFLINE -> ONLINE (upward)
+ INSTANCE_2, "ONLINE" // OFFLINE -> ONLINE (upward)
+ );
+
+ List<Message> messages =
+ processAndGetMessagesForOnlineOffline(stateModelDef, currentStates,
bestPossible, 3);
+
+ // Verify: Current active = 1 (only ONLINE states count)
+ Assert.assertEquals(messages.size(), 2);
+ for (Message msg : messages) {
+ Assert.assertEquals(msg.getCurrentActiveReplicaNumber(), 1,
+ "Multi-top state model without secondary top: only top states count
as active");
+ }
+ }
+
+  @Test
+  public void testMultiTopStateModelWithSecondaryTop() throws Exception {
+    // Scenario: OnlineOfflineWithBootstrap model; both the ONLINE top state and the BOOTSTRAP
+    // secondary top state are expected to count as active.
+    StateModelDefinition stateModelDef = OnlineOfflineWithBootstrapSMD.build();
+
+    // Setup: 2 ONLINE, 1 BOOTSTRAP, 1 OFFLINE (current active = 3)
+    Map<String, CurrentState> currentStates =
+        createCurrentStates(Map.of(INSTANCE_0, "ONLINE", INSTANCE_1, "ONLINE", INSTANCE_2,
+            "BOOTSTRAP", INSTANCE_3, "OFFLINE"), "OnlineOfflineWithBootstrap");
+
+    // Action: OFFLINE becomes BOOTSTRAP.
+    Map<String, String> bestPossible = Map.of(INSTANCE_0, "ONLINE", // No change
+        INSTANCE_1, "ONLINE", // No change
+        INSTANCE_2, "BOOTSTRAP", // No change
+        INSTANCE_3, "BOOTSTRAP" // OFFLINE -> BOOTSTRAP (upward)
+    );
+
+    List<Message> messages =
+        processAndGetMessagesForOnlineOffline(stateModelDef, currentStates, bestPossible, 4);
+
+    // Verify: Current active = 3 (2 ONLINE + 1 BOOTSTRAP)
+    Assert.assertEquals(messages.size(), 1);
+    for (Message msg : messages) {
+      // The failure message previously said "without secondary top", contradicting this test.
+      Assert.assertEquals(msg.getCurrentActiveReplicaNumber(), 3,
+          "Multi-top state model with secondary top: top states and secondary top states count as active");
+    }
+  }
+
+ // === Tests for ERROR state handling ===
+
+  @Test
+  public void testErrorStateIncludedInActiveCount() throws Exception {
+    // Scenario: a replica sitting in ERROR is expected to be counted as active.
+    StateModelDefinition masterSlaveDef =
+        new StateModelDefinition(generateConfigForMasterSlave());
+
+    // Current: 1 MASTER, 1 ERROR, 1 OFFLINE (current active = 2)
+    Map<String, String> currentByInstance =
+        Map.of(INSTANCE_0, "MASTER", INSTANCE_1, "ERROR", INSTANCE_2, "OFFLINE");
+    // Target: only the OFFLINE replica moves, to SLAVE
+    Map<String, String> targetByInstance =
+        Map.of(INSTANCE_0, "MASTER", INSTANCE_1, "ERROR", INSTANCE_2, "SLAVE");
+
+    List<Message> generated = processAndGetMessages(masterSlaveDef,
+        createCurrentStates(currentByInstance, "MasterSlave"), targetByInstance, 3);
+
+    // Expect exactly one message, carrying active count 2 (1 MASTER + 1 ERROR)
+    Assert.assertEquals(generated.size(), 1);
+    Message onlyMessage = generated.get(0);
+    Assert.assertEquals(onlyMessage.getCurrentActiveReplicaNumber(), 2,
+        "ERROR state replicas should be included in active count");
+  }
+
+  @Test
+  public void testTransitionFromErrorToOffline() throws Exception {
+    // Scenario: ERROR -> OFFLINE is treated as a downward (active to inactive) transition.
+    StateModelDefinition masterSlaveDef =
+        new StateModelDefinition(generateConfigForMasterSlave());
+
+    // Single replica, currently in ERROR, targeted at OFFLINE (standard recovery pattern)
+    Map<String, String> currentByInstance = Map.of(INSTANCE_0, "ERROR");
+    Map<String, String> targetByInstance = Map.of(INSTANCE_0, "OFFLINE");
+
+    List<Message> generated = processAndGetMessages(masterSlaveDef,
+        createCurrentStates(currentByInstance, "MasterSlave"), targetByInstance, 1);
+
+    // Expect one message that keeps the default value (-1) used for downward transitions
+    Assert.assertEquals(generated.size(), 1);
+    Message onlyMessage = generated.get(0);
+    Assert.assertEquals(onlyMessage.getCurrentActiveReplicaNumber(), -1,
+        "ERROR→OFFLINE transitions should not have currentActiveReplicaNumber assigned (downward)");
+  }
+
+ // === Tests for DROPPED state handling ===
+
+  @Test
+  public void testDroppedReplicasExcludedFromActiveCount() throws Exception {
+    // Scenario: a replica heading to DROPPED gets no active count, while a sibling upward
+    // transition in the same partition still reports the current active count.
+    StateModelDefinition masterSlaveDef =
+        new StateModelDefinition(generateConfigForMasterSlave());
+
+    // Current: 1 MASTER, 2 OFFLINE (current active = 1)
+    Map<String, String> currentByInstance =
+        Map.of(INSTANCE_0, "MASTER", INSTANCE_1, "OFFLINE", INSTANCE_2, "OFFLINE");
+    // Target: one OFFLINE replica is dropped, the other is promoted to SLAVE
+    Map<String, String> targetByInstance = Map.of(
+        INSTANCE_0, "MASTER", // No change
+        INSTANCE_1, "DROPPED", // OFFLINE -> DROPPED (downward)
+        INSTANCE_2, "SLAVE" // OFFLINE -> SLAVE (upward)
+    );
+
+    List<Message> generated = processAndGetMessages(masterSlaveDef,
+        createCurrentStates(currentByInstance, "MasterSlave"), targetByInstance, 3);
+
+    // Expect one message per changed replica
+    Assert.assertEquals(generated.size(), 2);
+
+    // Pick the first message matching each (target instance, to-state) pair
+    Message upwardMsg = null;
+    Message droppedMsg = null;
+    for (Message candidate : generated) {
+      if (upwardMsg == null && candidate.getTgtName().equals(INSTANCE_2)
+          && candidate.getToState().equals("SLAVE")) {
+        upwardMsg = candidate;
+      }
+      if (droppedMsg == null && candidate.getTgtName().equals(INSTANCE_1)
+          && candidate.getToState().equals("DROPPED")) {
+        droppedMsg = candidate;
+      }
+    }
+
+    Assert.assertNotNull(upwardMsg, "Should have upward transition message");
+    Assert.assertNotNull(droppedMsg, "Should have dropped transition message");
+
+    Assert.assertEquals(upwardMsg.getCurrentActiveReplicaNumber(), 1,
+        "Upward transition should have current active replica count");
+    Assert.assertEquals(droppedMsg.getCurrentActiveReplicaNumber(), -1,
+        "DROPPED transition should not have currentActiveReplicaNumber");
+  }
+
+ // === Helper methods ===
+
+  /**
+   * Builds one {@link CurrentState} per instance, each reporting the given state for
+   * {@code PARTITION_0} of {@code TEST_RESOURCE} under the shared test session id.
+   * @param instanceStates instance name mapped to the state that instance currently holds
+   * @param stateModelRef state model definition reference recorded on every CurrentState
+   * @return a new mutable map from instance name to its CurrentState
+   */
+  private Map<String, CurrentState> createCurrentStates(Map<String, String> instanceStates,
+      String stateModelRef) {
+    Map<String, CurrentState> statesByInstance = new HashMap<>();
+    instanceStates.forEach((instanceName, state) -> {
+      CurrentState perInstance = new CurrentState(TEST_RESOURCE);
+      perInstance.setState(PARTITION_0, state);
+      perInstance.setSessionId(SESSION_ID);
+      perInstance.setStateModelDefRef(stateModelRef);
+      statesByInstance.put(instanceName, perInstance);
+    });
+    return statesByInstance;
+  }
+
+  /**
+   * Copies the {@code PARTITION_0} state of every instance into a fresh
+   * {@link CurrentStateOutput} for {@code TEST_RESOURCE}.
+   * Only the partition state is carried over; other CurrentState fields are not read here.
+   * @param currentStateMap instance name mapped to that instance's CurrentState
+   * @return the populated CurrentStateOutput
+   */
+  private CurrentStateOutput createCurrentStateOutput(Map<String, CurrentState> currentStateMap) {
+    CurrentStateOutput output = new CurrentStateOutput();
+    currentStateMap.forEach((instanceName, perInstance) ->
+        output.setCurrentState(TEST_RESOURCE, new Partition(PARTITION_0), instanceName,
+            perInstance.getState(PARTITION_0)));
+    return output;
+  }
+
+  /**
+   * Runs the stage under test against a cluster event prepared via
+   * {@link #prepareClusterEvent} and returns the messages produced for {@code PARTITION_0}
+   * of {@code TEST_RESOURCE}.
+   * @param stateModelDef state model definition handed to the event preparation
+   * @param currentStates instance name mapped to its current state record
+   * @param bestPossible instance name mapped to the desired state for the partition
+   * @param instanceCount number of live instances to register
+   * @return messages read from the event's MESSAGES_ALL attribute
+   * @throws Exception if processing the event fails
+   */
+  private List<Message> processAndGetMessages(StateModelDefinition stateModelDef,
+      Map<String, CurrentState> currentStates, Map<String, String> bestPossible,
+      int instanceCount) throws Exception {
+    ClusterEvent clusterEvent = prepareClusterEvent(stateModelDef,
+        createCurrentStateOutput(currentStates), instanceCount);
+    setBestPossibleState(clusterEvent, bestPossible);
+
+    process(clusterEvent);
+
+    MessageOutput generated = clusterEvent.getAttribute(AttributeName.MESSAGES_ALL.name());
+    return generated.getMessages(TEST_RESOURCE, new Partition(PARTITION_0));
+  }
+
+  /**
+   * Same flow as the MasterSlave variant, but the cluster event is prepared via
+   * {@link #prepareClusterEventForOnlineOffline}; returns the messages produced for
+   * {@code PARTITION_0} of {@code TEST_RESOURCE}.
+   * @param stateModelDef state model definition handed to the event preparation
+   * @param currentStates instance name mapped to its current state record
+   * @param bestPossible instance name mapped to the desired state for the partition
+   * @param instanceCount number of live instances to register
+   * @return messages read from the event's MESSAGES_ALL attribute
+   * @throws Exception if processing the event fails
+   */
+  private List<Message> processAndGetMessagesForOnlineOffline(StateModelDefinition stateModelDef,
+      Map<String, CurrentState> currentStates, Map<String, String> bestPossible,
+      int instanceCount) throws Exception {
+    ClusterEvent clusterEvent = prepareClusterEventForOnlineOffline(stateModelDef,
+        createCurrentStateOutput(currentStates), instanceCount);
+    setBestPossibleState(clusterEvent, bestPossible);
+
+    process(clusterEvent);
+
+    MessageOutput generated = clusterEvent.getAttribute(AttributeName.MESSAGES_ALL.name());
+    return generated.getMessages(TEST_RESOURCE, new Partition(PARTITION_0));
+  }
+
+  /**
+   * Stores the desired instance-to-state assignment for {@code PARTITION_0} of
+   * {@code TEST_RESOURCE} on the event as its BEST_POSSIBLE_STATE attribute.
+   * @param event cluster event to attach the attribute to
+   * @param partitionMap instance name mapped to the desired state
+   */
+  private void setBestPossibleState(ClusterEvent event, Map<String, String> partitionMap) {
+    BestPossibleStateOutput desired = new BestPossibleStateOutput();
+    Partition partition = new Partition(PARTITION_0);
+    desired.setState(TEST_RESOURCE, partition, partitionMap);
+    event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.name(), desired);
+  }
+
+  /**
+   * Builds a cluster event for the MasterSlave scenarios: a mocked
+   * {@link ResourceControllerDataProvider} that serves the given definition under the name
+   * "MasterSlave", plus a single test resource referencing that state model.
+   * @param stateModelDef definition returned by the mocked provider for "MasterSlave"
+   * @param currentStateOutput current state attached to the event
+   * @param instanceCount number of live instances the mocked provider reports
+   * @return the prepared cluster event
+   */
+  private ClusterEvent prepareClusterEvent(StateModelDefinition stateModelDef,
+      CurrentStateOutput currentStateOutput, int instanceCount) {
+    final String stateModelName = "MasterSlave";
+    ClusterEvent clusterEvent = createBaseClusterEvent(currentStateOutput);
+
+    // Mocked data provider: cluster config, state model lookup and live instances
+    ResourceControllerDataProvider dataProvider = mock(ResourceControllerDataProvider.class);
+    when(dataProvider.getClusterConfig()).thenReturn(new ClusterConfig(TEST_CLUSTER));
+    when(dataProvider.getStateModelDef(stateModelName)).thenReturn(stateModelDef);
+    when(dataProvider.getLiveInstances()).thenReturn(createLiveInstances(instanceCount));
+    clusterEvent.addAttribute(AttributeName.ControllerDataProvider.name(), dataProvider);
+
+    // Resource under test
+    Map<String, Resource> resources = createResourceMap(stateModelName);
+    clusterEvent.addAttribute(AttributeName.RESOURCES_TO_REBALANCE.name(), resources);
+
+    return clusterEvent;
+  }
+
+  /**
+   * Builds a cluster event for the OnlineOffline-style scenarios: a mocked
+   * {@link ResourceControllerDataProvider} that serves the given definition under the name
+   * "OfflineOnline", plus a single test resource referencing that state model.
+   * @param stateModelDef definition returned by the mocked provider for "OfflineOnline"
+   * @param currentStateOutput current state attached to the event
+   * @param instanceCount number of live instances the mocked provider reports
+   * @return the prepared cluster event
+   */
+  private ClusterEvent prepareClusterEventForOnlineOffline(StateModelDefinition stateModelDef,
+      CurrentStateOutput currentStateOutput, int instanceCount) {
+    final String stateModelName = "OfflineOnline";
+    ClusterEvent clusterEvent = createBaseClusterEvent(currentStateOutput);
+
+    // Mocked data provider: cluster config, state model lookup and live instances
+    ResourceControllerDataProvider dataProvider = mock(ResourceControllerDataProvider.class);
+    when(dataProvider.getClusterConfig()).thenReturn(new ClusterConfig(TEST_CLUSTER));
+    when(dataProvider.getStateModelDef(stateModelName)).thenReturn(stateModelDef);
+    when(dataProvider.getLiveInstances()).thenReturn(createLiveInstances(instanceCount));
+    clusterEvent.addAttribute(AttributeName.ControllerDataProvider.name(), dataProvider);
+
+    // Resource under test
+    Map<String, Resource> resources = createResourceMap(stateModelName);
+    clusterEvent.addAttribute(AttributeName.RESOURCES_TO_REBALANCE.name(), resources);
+
+    return clusterEvent;
+  }
+
+  /**
+   * Creates the event skeleton shared by all scenarios: the current state (registered both
+   * as CURRENT_STATE and CURRENT_STATE_EXCLUDING_UNKNOWN) and a mocked {@link HelixManager}.
+   * @param currentStateOutput current state to attach under both attributes
+   * @return a new cluster event of type Unknown for {@code TEST_CLUSTER}
+   */
+  private ClusterEvent createBaseClusterEvent(CurrentStateOutput currentStateOutput) {
+    // Mocked manager posing as the controller, backed by a mocked data accessor
+    HelixDataAccessor accessor = mock(HelixDataAccessor.class);
+    HelixManager controller = mock(HelixManager.class);
+    when(controller.getInstanceName()).thenReturn("Controller");
+    when(controller.getSessionId()).thenReturn(SESSION_ID);
+    when(controller.getHelixDataAccessor()).thenReturn(accessor);
+
+    ClusterEvent clusterEvent = new ClusterEvent(TEST_CLUSTER, ClusterEventType.Unknown);
+    clusterEvent.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
+    clusterEvent.addAttribute(AttributeName.CURRENT_STATE_EXCLUDING_UNKNOWN.name(),
+        currentStateOutput);
+    clusterEvent.addAttribute(AttributeName.helixmanager.name(), controller);
+
+    return clusterEvent;
+  }
+
+  /**
+   * Creates {@code instanceCount} live instances named {@code localhost_0},
+   * {@code localhost_1}, ... all sharing the test session id.
+   * @param instanceCount number of live instances to create; zero or less yields an empty map
+   * @return a new mutable map from instance name to its LiveInstance
+   */
+  private Map<String, LiveInstance> createLiveInstances(int instanceCount) {
+    Map<String, LiveInstance> liveByName = new HashMap<>();
+    for (int index = 0; index < instanceCount; index++) {
+      String name = "localhost_" + index;
+
+      // Backing record; the ephemeral owner is the numeric form of the session id
+      ZNRecord backingRecord = new ZNRecord(name);
+      backingRecord.setEphemeralOwner(Long.parseLong(SESSION_ID));
+
+      LiveInstance live = new LiveInstance(backingRecord);
+      live.setSessionId(SESSION_ID);
+      live.setHelixVersion("1.0.0");
+      live.setLiveInstance(name);
+
+      liveByName.put(name, live);
+    }
+    return liveByName;
+  }
+
+  /**
+   * Creates a single-entry resource map holding {@code TEST_RESOURCE} with one partition
+   * ({@code PARTITION_0}) and the given state model reference.
+   * @param stateModelRef state model definition reference set on the resource
+   * @return a new mutable map from resource name to Resource
+   */
+  private Map<String, Resource> createResourceMap(String stateModelRef) {
+    Resource testResource = new Resource(TEST_RESOURCE);
+    testResource.setStateModelDefRef(stateModelRef);
+    testResource.addPartition(PARTITION_0);
+
+    Map<String, Resource> resourcesByName = new HashMap<>();
+    resourcesByName.put(TEST_RESOURCE, testResource);
+    return resourcesByName;
+  }
+
+  /**
+   * Creates a STATE_TRANSITION message with a random id for {@code PARTITION_0} of
+   * {@code TEST_RESOURCE}, sent from "Controller" under the "MasterSlave" state model.
+   * @param fromState state the replica transitions from
+   * @param toState state the replica transitions to
+   * @param tgtName name of the target instance
+   * @return the populated message
+   */
+  private Message createMessage(String fromState, String toState, String tgtName) {
+    String messageId = UUID.randomUUID().toString();
+    Message transition = new Message(Message.MessageType.STATE_TRANSITION, messageId);
+
+    // Transition endpoints and target
+    transition.setFromState(fromState);
+    transition.setToState(toState);
+    transition.setTgtName(tgtName);
+    transition.setTgtSessionId(SESSION_ID);
+
+    // What the transition applies to
+    transition.setResourceName(TEST_RESOURCE);
+    transition.setPartitionName(PARTITION_0);
+    transition.setStateModelDef("MasterSlave");
+
+    // Sender
+    transition.setSrcName("Controller");
+    transition.setSrcSessionId(SESSION_ID);
+    return transition;
+  }
+
+  /**
+   * Custom OnlineOffline-style state model ("CustomOnlineOffline") with a configurable
+   * upper bound for the ONLINE state.
+   * Enables testing scenarios with specific replica count constraints.
+   */
+  private static final class CustomOnlineOfflineSMD {
+    private static final String STATE_MODEL_NAME = "CustomOnlineOffline";
+
+    /** Static factory holder only; never instantiated. */
+    private CustomOnlineOfflineSMD() {
+    }
+
+    /**
+     * States for the CustomOnlineOffline state model
+     */
+    private enum States {
+      ONLINE,
+      OFFLINE
+    }
+
+    /**
+     * Build the CustomOnlineOffline state model definition with a custom ONLINE bound.
+     * @param instanceCount the maximum number of instances that can be in ONLINE state
+     * @return StateModelDefinition named "CustomOnlineOffline" with the given ONLINE bound
+     * @throws IllegalArgumentException if {@code instanceCount} is zero or negative
+     */
+    public static StateModelDefinition build(int instanceCount) {
+      if (instanceCount <= 0) {
+        throw new IllegalArgumentException(
+            "Instance count must be positive, got: " + instanceCount);
+      }
+
+      StateModelDefinition.Builder builder = new StateModelDefinition.Builder(STATE_MODEL_NAME);
+
+      // init state
+      builder.initialState(States.OFFLINE.name());
+
+      // add states; the numeric argument is the state's priority
+      // NOTE(review): presumably a lower value means higher priority - confirm against
+      // StateModelDefinition.Builder
+      builder.addState(States.ONLINE.name(), 0);
+      builder.addState(States.OFFLINE.name(), 1);
+      // Helix-defined states are registered without an explicit priority
+      for (final HelixDefinedState state : HelixDefinedState.values()) {
+        builder.addState(state.name());
+      }
+
+      // add transitions
+      builder.addTransition(States.ONLINE.name(), States.OFFLINE.name(), 0);
+      builder.addTransition(States.OFFLINE.name(), States.ONLINE.name(), 1);
+      builder.addTransition(States.OFFLINE.name(), HelixDefinedState.DROPPED.name());
+
+      // bounds - the ONLINE bound is the instanceCount parameter, passed in string form
+      builder.dynamicUpperBound(States.ONLINE.name(), String.valueOf(instanceCount));
+
+      return builder.build();
+    }
+  }
+}