Add P2P (Participant-to-Participant) state-transition message support in Helix controller.
Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/d0a3c0d1 Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/d0a3c0d1 Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/d0a3c0d1 Branch: refs/heads/master Commit: d0a3c0d1ea3a467564ed5ffd20bcd6a8e5a56678 Parents: 5117022 Author: Lei Xia <[email protected]> Authored: Mon Oct 9 14:52:19 2017 -0700 Committer: Junkai Xue <[email protected]> Committed: Wed Jan 24 18:30:30 2018 -0800 ---------------------------------------------------------------------- .../helix/api/config/HelixConfigProperty.java | 43 ++++ .../rebalancer/DelayedAutoRebalancer.java | 2 +- .../controller/stages/ClusterDataCache.java | 62 ++++- .../controller/stages/CurrentStateOutput.java | 16 +- .../stages/MessageGenerationPhase.java | 3 +- .../stages/MessageSelectionStage.java | 82 +++---- .../stages/ResourceComputationStage.java | 4 +- .../controller/stages/TaskAssignmentStage.java | 10 +- .../messaging/handling/BatchMessageHandler.java | 1 + .../handling/HelixStateTransitionHandler.java | 16 +- .../helix/messaging/handling/HelixTask.java | 52 +++- .../messaging/handling/HelixTaskExecutor.java | 9 +- .../messaging/handling/HelixTaskResult.java | 9 + .../org/apache/helix/model/ClusterConfig.java | 23 ++ .../org/apache/helix/model/CurrentState.java | 11 +- .../java/org/apache/helix/model/Message.java | 184 +++++++++++++- .../java/org/apache/helix/model/Resource.java | 40 +++- .../org/apache/helix/model/ResourceConfig.java | 33 ++- .../helix/model/StateModelDefinition.java | 30 ++- .../helix/task/FixedTargetTaskRebalancer.java | 3 +- .../helix/controller/stages/BaseStageTest.java | 43 +++- .../stages/TestMsgSelectionStage.java | 8 +- .../stages/TestResourceComputationStage.java | 1 - .../helix/integration/TestZkReconnect.java | 2 +- .../common/ZkIntegrationTestBase.java | 20 ++ .../manager/TestZkCallbackHandlerLeak.java | 6 +- .../messaging/TestP2PMessageSemiAuto.java | 240 
+++++++++++++++++++ .../paticipant/TestNodeOfflineTimeStamp.java | 2 + .../TestCrushAutoRebalance.java | 7 +- .../org/apache/helix/manager/zk/TestZKUtil.java | 4 - .../helix/manager/zk/TestZkClusterManager.java | 3 - .../TestP2PStateTransitionMessages.java | 176 ++++++++++++++ .../apache/helix/mock/MockBaseDataAccessor.java | 6 +- .../org/apache/helix/mock/MockHelixAdmin.java | 14 +- .../mbeans/TestDisableResourceMbean.java | 9 +- .../mbeans/TestTopStateHandoffMetrics.java | 4 +- 36 files changed, 1065 insertions(+), 113 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/d0a3c0d1/helix-core/src/main/java/org/apache/helix/api/config/HelixConfigProperty.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/api/config/HelixConfigProperty.java b/helix-core/src/main/java/org/apache/helix/api/config/HelixConfigProperty.java new file mode 100644 index 0000000..eff63cb --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/api/config/HelixConfigProperty.java @@ -0,0 +1,43 @@ +package org.apache.helix.api.config; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; +import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty; + +/** + * This class defines all possible configuration options + * and its applicable config scopes (eg, Cluster/Resource/Instance/Partition config). + */ +public enum HelixConfigProperty { + P2P_MESSAGE_ENABLED(ConfigScopeProperty.CLUSTER, ConfigScopeProperty.RESOURCE); + + Set<ConfigScopeProperty> _applicableScopes; + + HelixConfigProperty(ConfigScopeProperty ...configScopeProperties) { + _applicableScopes = new HashSet<>(Arrays.asList(configScopeProperties)); + } + + public Set<ConfigScopeProperty> applicableScopes() { + return _applicableScopes; + } +} http://git-wip-us.apache.org/repos/asf/helix/blob/d0a3c0d1/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java index a44aa11..adac235 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java @@ -310,7 +310,7 @@ public class DelayedAutoRebalancer extends AbstractRebalancer { private boolean isDelayRebalanceEnabled(IdealState idealState, ClusterConfig clusterConfig) { long delay = getRebalanceDelay(idealState, clusterConfig); return (delay > 0 && idealState.isDelayRebalanceEnabled() && clusterConfig - .isDelayRebalaceEnabled()); + . 
isDelayRebalaceEnabled()); } private ZNRecord getFinalDelayedMapping(IdealState idealState, ZNRecord newIdealMapping, http://git-wip-us.apache.org/repos/asf/helix/blob/d0a3c0d1/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java index 8999ed7..1dd862d 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java @@ -188,16 +188,20 @@ public class ClusterDataCache { accessor.getChildValuesMap(keyBuilder.stateModelDefs()); _stateModelDefMap = new ConcurrentHashMap<>(stateDefMap); _constraintMap = accessor.getChildValuesMap(keyBuilder.constraints()); + _clusterConfig = accessor.getProperty(keyBuilder.clusterConfig()); refreshMessages(accessor); refreshCurrentStates(accessor); - _clusterConfig = accessor.getProperty(keyBuilder.clusterConfig()); + // current state must be refreshed before refreshing relay messages + // because we need to use current state to validate all relay messages. + updateRelayMessages(_messageMap); + if (_clusterConfig != null) { _idealStateRuleMap = _clusterConfig.getIdealStateRules(); } else { _idealStateRuleMap = Maps.newHashMap(); - LOG.error("Cluster config is null!"); + LOG.warn("Cluster config is null!"); } long endTime = System.currentTimeMillis(); @@ -275,6 +279,7 @@ public class ClusterDataCache { } } } + _messageMap = Collections.unmodifiableMap(msgMap); if (LOG.isDebugEnabled()) { @@ -284,6 +289,59 @@ public class ClusterDataCache { } } + // update all valid relay messages attached to existing state transition messages into message map. 
+ private void updateRelayMessages(Map<String, Map<String, Message>> messageMap) { + List<Message> relayMessages = new ArrayList<>(); + for (String instance : messageMap.keySet()) { + Map<String, Message> instanceMessages = messageMap.get(instance); + Map<String, Map<String, CurrentState>> instanceCurrentStateMap = _currentStateMap.get(instance); + if (instanceCurrentStateMap == null) { + continue; + } + + for (Message message : instanceMessages.values()) { + if (message.hasRelayMessages()) { + String sessionId = message.getTgtSessionId(); + String resourceName = message.getResourceName(); + String partitionName = message.getPartitionName(); + String targetState = message.getToState(); + String instanceSessionId = _liveInstanceMap.get(instance).getSessionId(); + + if (!instanceSessionId.equals(sessionId)) { + continue; + } + + Map<String, CurrentState> sessionCurrentStateMap = instanceCurrentStateMap.get(sessionId); + if (sessionCurrentStateMap == null) { + continue; + } + CurrentState currentState = sessionCurrentStateMap.get(resourceName); + if (currentState == null || !targetState.equals(currentState.getState(partitionName))) { + continue; + } + long transitionCompleteTime = currentState.getEndTime(partitionName); + + for (Message msg : message.getRelayMessages().values()) { + msg.setRelayTime(transitionCompleteTime); + if (!message.isExpired()) { + relayMessages.add(msg); + } + } + } + } + } + + for (Message message : relayMessages) { + String instance = message.getTgtName(); + Map<String, Message> instanceMessages = messageMap.get(instance); + if (instanceMessages == null) { + instanceMessages = new HashMap<>(); + messageMap.put(instance, instanceMessages); + } + instanceMessages.put(message.getId(), message); + } + } + private void refreshCurrentStates(HelixDataAccessor accessor) { refreshCurrentStatesCache(accessor); http://git-wip-us.apache.org/repos/asf/helix/blob/d0a3c0d1/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java 
---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java index 3821303..97420e5 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java @@ -54,13 +54,13 @@ public class CurrentStateOutput { private final Map<String, CurrentState> _curStateMetaMap; public CurrentStateOutput() { - _currentStateMap = new HashMap<String, Map<Partition, Map<String, String>>>(); - _pendingStateMap = new HashMap<String, Map<Partition, Map<String, Message>>>(); - _cancellationStateMap = new HashMap<String, Map<Partition, Map<String, Message>>>(); - _resourceStateModelMap = new HashMap<String, String>(); - _curStateMetaMap = new HashMap<String, CurrentState>(); - _requestedStateMap = new HashMap<String, Map<Partition, Map<String, String>>>(); - _infoMap = new HashMap<String, Map<Partition, Map<String, String>>>(); + _currentStateMap = new HashMap<>(); + _pendingStateMap = new HashMap<>(); + _cancellationStateMap = new HashMap<>(); + _resourceStateModelMap = new HashMap<>(); + _curStateMetaMap = new HashMap<>(); + _requestedStateMap = new HashMap<>(); + _infoMap = new HashMap<>(); } public void setResourceStateModelDef(String resourceName, String stateModelDefName) { @@ -331,7 +331,7 @@ public class CurrentStateOutput { private Map<String, Integer> getPartitionCountWithState(String resourceStateModel, String state, Map<String, Map<Partition, Map<String, Object>>> stateMap) { - Map<String, Integer> currentPartitionCount = new HashMap<String, Integer>(); + Map<String, Integer> currentPartitionCount = new HashMap<>(); for (String resource : stateMap.keySet()) { String stateModel = _resourceStateModelMap.get(resource); if ((stateModel != null && 
stateModel.equals(resourceStateModel)) || (stateModel == null http://git-wip-us.apache.org/repos/asf/helix/blob/d0a3c0d1/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java index ca0adfe..d60c20e 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java @@ -25,7 +25,6 @@ import java.util.List; import java.util.Map; import java.util.UUID; -import org.apache.helix.HelixException; import org.apache.helix.HelixManager; import org.apache.helix.api.config.StateTransitionTimeoutConfig; import org.apache.helix.controller.pipeline.AbstractBaseStage; @@ -187,7 +186,7 @@ public class MessageGenerationPhase extends AbstractBaseStage { message.setExecutionTimeout(timeout); } - message.getRecord().setSimpleField("ClusterEventName", event.getEventType().name()); + message.setAttribute(Message.Attributes.ClusterEventName, event.getEventType().name()); // output.addMessage(resourceName, partition, message); if (!messageMap.containsKey(desiredState)) { messageMap.put(desiredState, new ArrayList<Message>()); http://git-wip-us.apache.org/repos/asf/helix/blob/d0a3c0d1/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java index 94b645d..1a94dcb 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java +++ 
b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java @@ -22,17 +22,20 @@ package org.apache.helix.controller.stages; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.TreeMap; import org.apache.helix.controller.pipeline.AbstractBaseStage; import org.apache.helix.controller.pipeline.StageException; +import org.apache.helix.model.ClusterConfig; import org.apache.helix.model.IdealState; import org.apache.helix.model.LiveInstance; import org.apache.helix.model.Message; import org.apache.helix.model.Partition; import org.apache.helix.model.Resource; +import org.apache.helix.model.ResourceConfig; import org.apache.helix.model.StateModelDefinition; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,7 +43,7 @@ import org.slf4j.LoggerFactory; public class MessageSelectionStage extends AbstractBaseStage { private static final Logger LOG = LoggerFactory.getLogger(MessageSelectionStage.class); - public static class Bounds { + protected static class Bounds { private int upper; private int lower; @@ -49,26 +52,6 @@ public class MessageSelectionStage extends AbstractBaseStage { this.upper = upper; } - public void increaseUpperBound() { - upper++; - } - - public void increaseLowerBound() { - lower++; - } - - public void decreaseUpperBound() { - upper--; - } - - public void decreaseLowerBound() { - lower--; - } - - public int getLowerBound() { - return lower; - } - public int getUpperBound() { return upper; } @@ -100,11 +83,11 @@ public class MessageSelectionStage extends AbstractBaseStage { computeStateConstraints(stateModelDef, idealState, cache); for (Partition partition : resource.getPartitions()) { List<Message> messages = messageGenOutput.getMessages(resourceName, partition); - List<Message> selectedMessages = - selectMessages(cache.getLiveInstances(), - 
currentStateOutput.getCurrentStateMap(resourceName, partition), - currentStateOutput.getPendingMessageMap(resourceName, partition), messages, - stateConstraints, stateTransitionPriorities, stateModelDef.getInitialState()); + List<Message> selectedMessages = selectMessages(cache.getLiveInstances(), + currentStateOutput.getCurrentStateMap(resourceName, partition), + currentStateOutput.getPendingMessageMap(resourceName, partition), messages, + stateConstraints, stateTransitionPriorities, stateModelDef, + resource.isP2PMessageEnabled()); output.addMessages(resourceName, partition, selectedMessages); } } @@ -124,34 +107,36 @@ public class MessageSelectionStage extends AbstractBaseStage { } // TODO: This method deserves its own class. The class should not understand helix but - // just be - // able to solve the problem using the algo. I think the method is following that but if - // we don't move it to another class its quite easy to break that contract + // just be able to solve the problem using the algo. I think the method is following that + // but if we don't move it to another class its quite easy to break that contract /** * greedy message selection algorithm: 1) calculate CS+PS state lower/upper-bounds 2) * group messages by state transition and sorted by priority 3) from highest priority to * lowest, for each message group with the same transition add message one by one and * make sure state constraint is not violated update state lower/upper-bounds when a new - * message is selected + * message is selected. 
+ * + * @param liveInstances * @param currentStates - * @param pendingStates + * @param pendingMessages * @param messages * @param stateConstraints - * : STATE -> bound (lower:upper) * @param stateTransitionPriorities - * : FROME_STATE-TO_STATE -> priority - * @return: selected messages + * @param stateModelDef + * @return */ List<Message> selectMessages(Map<String, LiveInstance> liveInstances, Map<String, String> currentStates, Map<String, Message> pendingMessages, List<Message> messages, Map<String, Bounds> stateConstraints, - final Map<String, Integer> stateTransitionPriorities, String initialState) { + final Map<String, Integer> stateTransitionPriorities, StateModelDefinition stateModelDef, + boolean p2pMessageEnabled) { if (messages == null || messages.isEmpty()) { return Collections.emptyList(); } - List<Message> selectedMessages = new ArrayList<Message>(); - Map<String, Integer> stateCnts = new HashMap<String, Integer>(); + List<Message> selectedMessages = new ArrayList<>(); + Map<String, Integer> stateCnts = new HashMap<>(); + String initialState = stateModelDef.getInitialState(); // count currentState, if no currentState, count as in initialState for (String instance : liveInstances.keySet()) { String state = initialState; @@ -171,7 +156,10 @@ public class MessageSelectionStage extends AbstractBaseStage { // group messages based on state transition priority Map<Integer, List<Message>> messagesGroupByStateTransitPriority = - new TreeMap<Integer, List<Message>>(); + new TreeMap<>(); + + /* record all state transition messages that transition a replica from top-state */ + List<Message> fromTopStateMessages = new LinkedList<>(); for (Message message : messages) { if (message.getMsgType().equals(Message.MessageType.STATE_TRANSITION_CANCELLATION.name())) { selectedMessages.add(message); @@ -190,6 +178,10 @@ public class MessageSelectionStage extends AbstractBaseStage { messagesGroupByStateTransitPriority.put(priority, new ArrayList<Message>()); } 
messagesGroupByStateTransitPriority.get(priority).add(message); + + if (fromState.equals(stateModelDef.getTopState())) { + fromTopStateMessages.add(message); + } } // select messages @@ -200,8 +192,18 @@ public class MessageSelectionStage extends AbstractBaseStage { if (stateConstraints.containsKey(toState)) { int newCnt = (stateCnts.containsKey(toState) ? stateCnts.get(toState) + 1 : 1); if (newCnt > stateConstraints.get(toState).getUpperBound()) { - LOG.info("Reach upper_bound: " + stateConstraints.get(toState).getUpperBound() - + ", not send message: " + message); + if (p2pMessageEnabled && toState.equals(stateModelDef.getTopState()) + && stateModelDef.isSingleTopStateModel()) { + // attach this message as a relay message to the message to transition off current top-state replica + if (fromTopStateMessages.size() > 0) { + Message fromTopStateMsg = fromTopStateMessages.get(0); + fromTopStateMsg.attachRelayMessage(message.getTgtName(), message); + fromTopStateMessages.remove(0); + } + } else { + // reach upper-bound of message for the topState, will not send the message + LOG.info("Reach upper_bound: " + stateConstraints.get(toState).getUpperBound() + ", not send message: " + message); + } continue; } } http://git-wip-us.apache.org/repos/asf/helix/blob/d0a3c0d1/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java index c6b4054..a6c2779 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java @@ -56,7 +56,6 @@ public class ResourceComputationStage extends AbstractBaseStage { Map<String, Resource> resourceMap = new 
LinkedHashMap<String, Resource>(); Map<String, Resource> resourceToRebalance = new LinkedHashMap<>(); - if (idealStates != null && idealStates.size() > 0) { for (IdealState idealState : idealStates.values()) { if (idealState == null) { @@ -65,7 +64,8 @@ public class ResourceComputationStage extends AbstractBaseStage { Set<String> partitionSet = idealState.getPartitionSet(); String resourceName = idealState.getResourceName(); if (!resourceMap.containsKey(resourceName)) { - Resource resource = new Resource(resourceName); + Resource resource = new Resource(resourceName, cache.getClusterConfig(), + cache.getResourceConfig(resourceName)); resourceMap.put(resourceName, resource); if (!idealState.isValid() && !cache.isTaskCache() http://git-wip-us.apache.org/repos/asf/helix/blob/d0a3c0d1/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java index ee0f1e5..dfa747e 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java @@ -144,7 +144,15 @@ public class TaskAssignmentStage extends AbstractBaseStage { "Sending Message " + message.getMsgId() + " to " + message.getTgtName() + " transit " + message.getResourceName() + "." 
+ message.getPartitionName() + "|" + message .getPartitionNames() + " from:" + message.getFromState() + " to:" + message - .getToState() + ", Message type "); + .getToState() + ", relayMessages: " + message.getRelayMessages().size()); + if (message.hasRelayMessages()) { + for (Message msg : message.getRelayMessages().values()) { + logger.info("Sending Relay Message " + msg.getMsgId() + " to " + msg.getTgtName() + " transit " + + msg.getResourceName() + "." + msg.getPartitionName() + "|" + msg.getPartitionNames() + " from:" + + msg.getFromState() + " to:" + msg.getToState() + ", relayFrom: " + msg.getRelaySrcHost() + + ", attached to message: " + message.getMsgId()); + } + } keys.add(keyBuilder.message(message.getTgtName(), message.getId())); } http://git-wip-us.apache.org/repos/asf/helix/blob/d0a3c0d1/helix-core/src/main/java/org/apache/helix/messaging/handling/BatchMessageHandler.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/BatchMessageHandler.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/BatchMessageHandler.java index 9fed082..3979a4b 100644 --- a/helix-core/src/main/java/org/apache/helix/messaging/handling/BatchMessageHandler.java +++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/BatchMessageHandler.java @@ -197,6 +197,7 @@ public class BatchMessageHandler extends MessageHandler { result.setSuccess(isBatchTaskSucceed); } + result.setCompleteTime(System.currentTimeMillis()); // pass task-result to post-handle-msg _notificationContext.add(MapKey.HELIX_TASK_RESULT.toString(), result); postHandleMessage(); http://git-wip-us.apache.org/repos/asf/helix/blob/d0a3c0d1/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java ---------------------------------------------------------------------- diff --git 
a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java index 26df112..ece7ac7 100644 --- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java +++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java @@ -94,6 +94,12 @@ public class HelixStateTransitionHandler extends MessageHandler { throw new HelixException(errorMessage); } + logger.info( + "handling message: " + _message.getMsgId() + " transit " + _message.getResourceName() + + "." + _message.getPartitionName() + "|" + _message.getPartitionNames() + " from:" + + _message.getFromState() + " to:" + _message.getToState() + ", relayedFrom: " + + _message.getRelaySrcHost()); + HelixDataAccessor accessor = _manager.getHelixDataAccessor(); String partitionName = _message.getPartitionName(); @@ -188,9 +194,16 @@ public class HelixStateTransitionHandler extends MessageHandler { // Set the INFO property and mark the end time, previous state of the state transition _currentStateDelta.setInfo(partitionKey, taskResult.getInfo()); - _currentStateDelta.setEndTime(partitionKey, System.currentTimeMillis()); + _currentStateDelta.setEndTime(partitionKey, taskResult.getCompleteTime()); _currentStateDelta.setPreviousState(partitionKey, _message.getFromState()); + // add host name this state transition is triggered by. 
+ if (Message.MessageType.RELAYED_MESSAGE.name().equals(_message.getMsgSubType())) { + _currentStateDelta.setTriggerHost(partitionKey, _message.getRelaySrcHost()); + } else { + _currentStateDelta.setTriggerHost(partitionKey, _message.getMsgSrc()); + } + if (taskResult.isSuccess()) { // String fromState = message.getFromState(); String toState = _message.getToState(); @@ -339,6 +352,7 @@ public class HelixStateTransitionHandler extends MessageHandler { } } + taskResult.setCompleteTime(System.currentTimeMillis()); // add task result to context for postHandling context.add(MapKey.HELIX_TASK_RESULT.toString(), taskResult); postHandleMessage(); http://git-wip-us.apache.org/repos/asf/helix/blob/d0a3c0d1/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java index 3272ca4..7b1853f 100644 --- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java +++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java @@ -20,15 +20,19 @@ package org.apache.helix.messaging.handling; */ import java.util.Date; +import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import org.apache.helix.AccessOption; import org.apache.helix.HelixDataAccessor; import org.apache.helix.HelixManager; import org.apache.helix.HelixRollbackException; import org.apache.helix.InstanceType; import org.apache.helix.NotificationContext; import org.apache.helix.NotificationContext.MapKey; +import org.apache.helix.PropertyKey; import org.apache.helix.PropertyKey.Builder; +import org.apache.helix.PropertyPathBuilder; import org.apache.helix.messaging.handling.MessageHandler.ErrorCode; import org.apache.helix.messaging.handling.MessageHandler.ErrorType; import org.apache.helix.model.Message; @@ -37,6 +41,7 
@@ import org.apache.helix.model.Message.MessageType; import org.apache.helix.monitoring.StateTransitionContext; import org.apache.helix.monitoring.StateTransitionDataPoint; import org.apache.helix.monitoring.mbeans.ParticipantMessageMonitor; +import org.apache.helix.task.TaskResult; import org.apache.helix.util.StatusUpdateUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -161,8 +166,12 @@ public class HelixTask implements MessageTask { } } + // forward relay messages attached to this message to other participants + if (taskResult.isSuccess()) { + forwardRelayMessages(accessor, _message, taskResult.getCompleteTime()); + } + if (_message.getAttribute(Attributes.PARENT_MSG_ID) == null) { - // System.err.println("\t[dbg]remove msg: " + getTaskId()); removeMessageFromZk(accessor, _message); reportMessageStat(_manager, _message, taskResult); sendReply(accessor, _message, taskResult); @@ -196,11 +205,46 @@ public class HelixTask implements MessageTask { private void removeMessageFromZk(HelixDataAccessor accessor, Message message) { Builder keyBuilder = accessor.keyBuilder(); + PropertyKey msgKey; if (message.getTgtName().equalsIgnoreCase("controller")) { - // TODO: removeProperty returns boolean - accessor.removeProperty(keyBuilder.controllerMessage(message.getMsgId())); + msgKey = keyBuilder.controllerMessage(message.getMsgId()); } else { - accessor.removeProperty(keyBuilder.message(_manager.getInstanceName(), message.getMsgId())); + msgKey = keyBuilder.message(_manager.getInstanceName(), message.getMsgId()); + } + boolean success = accessor.removeProperty(msgKey); + if (!success) { + logger.warn("Failed to delete message " + message.getId() + " from zk!"); + } + } + + private void forwardRelayMessages(HelixDataAccessor accessor, Message message, + long taskCompletionTime) { + if (message.hasRelayMessages()) { + Map<String, Message> relayMessages = message.getRelayMessages(); + Builder keyBuilder = accessor.keyBuilder(); + + // Ignore all relay messages 
if participant's session has changed. + if (!_manager.getSessionId().equals(message.getTgtSessionId())) { + return; + } + + for (String instance : relayMessages.keySet()) { + Message msg = relayMessages.get(instance); + if (msg != null && MessageType.RELAYED_MESSAGE.name().equals(msg.getMsgSubType())) { + msg.setRelayTime(taskCompletionTime); + if (msg.isExpired()) { + logger.info( + "Relay message expired, ignore it! " + msg.getId() + " to instance " + instance); + continue; + } + PropertyKey msgKey = keyBuilder.message(instance, msg.getId()); + boolean success = accessor.getBaseDataAccessor() + .create(msgKey.getPath(), msg.getRecord(), AccessOption.PERSISTENT); + if (!success) { + logger.warn("Failed to send relay message " + msg.getId() + " to " + instance); + } + } + } + } } http://git-wip-us.apache.org/repos/asf/helix/blob/d0a3c0d1/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java index 4c67a5e..c0be583 100644 --- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java +++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java @@ -790,8 +790,15 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor { continue; } - String tgtSessionId = message.getTgtSessionId(); + if (message.isExpired()) { + LOG.info( + "Dropping expired message.
mid: " + message.getId() + ", from: " + message.getMsgSrc() + " relayed from: " + + message.getRelaySrcHost()); + reportAndRemoveMessage(message, accessor, instanceName, ProcessedMessageState.DISCARDED); + continue; + } + String tgtSessionId = message.getTgtSessionId(); // sessionId mismatch normally means message comes from expired session, just remove it if (!sessionId.equals(tgtSessionId) && !tgtSessionId.equals("*")) { String warningMessage = http://git-wip-us.apache.org/repos/asf/helix/blob/d0a3c0d1/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskResult.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskResult.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskResult.java index 5ed6140..da96eaf 100644 --- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskResult.java +++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskResult.java @@ -31,6 +31,7 @@ public class HelixTaskResult { private Map<String, String> _taskResultMap = new HashMap<String, String>(); private boolean _interrupted = false; Exception _exception = null; + private long _completeTime = -1; public boolean isSuccess() { return _success; @@ -83,4 +84,12 @@ public class HelixTaskResult { public Exception getException() { return _exception; } + + public long getCompleteTime() { + return _completeTime; + } + + public void setCompleteTime(long completeTime) { + _completeTime = completeTime; + } } http://git-wip-us.apache.org/repos/asf/helix/blob/d0a3c0d1/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java index 2a97145..fad253d 100644 --- 
a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java +++ b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java @@ -28,6 +28,7 @@ import java.util.List; import java.util.Map; import org.apache.helix.HelixProperty; import org.apache.helix.ZNRecord; +import org.apache.helix.api.config.HelixConfigProperty; import org.apache.helix.api.config.StateTransitionThrottleConfig; import org.apache.helix.api.config.StateTransitionTimeoutConfig; @@ -492,6 +493,28 @@ public class ClusterConfig extends HelixProperty { } /** + * Whether the P2P state transition message is enabled for all resources in this cluster. By + * default it is disabled if not set. + * + * @return + */ + public boolean isP2PMessageEnabled() { + return _record.getBooleanField(HelixConfigProperty.P2P_MESSAGE_ENABLED.name(), false); + } + + /** + * Enable P2P state transition message for all resources in this cluster. P2P State Transition + * message can reduce the top-state replica unavailable time during top-state handoff period. This + * only applies for those resources with state models that only have a single top-state replica, + * such as MasterSlave or LeaderStandy models. By default P2P message is disabled if not set. + * + * @param enabled + */ + public void enableP2PMessage(boolean enabled) { + _record.setBooleanField(HelixConfigProperty.P2P_MESSAGE_ENABLED.name(), enabled); + } + + /** * Get IdealState rules defined in the cluster config. 
* * @return http://git-wip-us.apache.org/repos/asf/helix/blob/d0a3c0d1/helix-core/src/main/java/org/apache/helix/model/CurrentState.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/model/CurrentState.java b/helix-core/src/main/java/org/apache/helix/model/CurrentState.java index 34b525e..c227060 100644 --- a/helix-core/src/main/java/org/apache/helix/model/CurrentState.java +++ b/helix-core/src/main/java/org/apache/helix/model/CurrentState.java @@ -47,7 +47,8 @@ public class CurrentState extends HelixProperty { RESOURCE, START_TIME, END_TIME, - PREVIOUS_STATE // , + PREVIOUS_STATE, + TRIGGERED_BY// , // BUCKET_SIZE } @@ -134,6 +135,10 @@ public class CurrentState extends HelixProperty { return endTime == null ? -1L : Long.parseLong(endTime); } + public String getTriggerHost(String partitionName) { + return getProperty(partitionName, CurrentStateProperty.TRIGGERED_BY); + } + public String getPreviousState(String partitionName) { return getProperty(partitionName, CurrentStateProperty.PREVIOUS_STATE); } @@ -187,6 +192,10 @@ public class CurrentState extends HelixProperty { setProperty(partitionName, CurrentStateProperty.END_TIME, String.valueOf(endTime)); } + public void setTriggerHost(String partitionName, String triggerHost) { + setProperty(partitionName, CurrentStateProperty.TRIGGERED_BY, triggerHost); + } + public void setPreviousState(String partitionName, String state) { setProperty(partitionName, CurrentStateProperty.PREVIOUS_STATE, state); } http://git-wip-us.apache.org/repos/asf/helix/blob/d0a3c0d1/helix-core/src/main/java/org/apache/helix/model/Message.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/model/Message.java b/helix-core/src/main/java/org/apache/helix/model/Message.java index 668242a..d987c54 100644 --- a/helix-core/src/main/java/org/apache/helix/model/Message.java +++ 
b/helix-core/src/main/java/org/apache/helix/model/Message.java @@ -19,12 +19,16 @@ package org.apache.helix.model; * under the License. */ +import com.google.common.collect.Lists; import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; import java.util.Date; +import java.util.HashMap; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.UUID; import org.apache.helix.HelixException; @@ -50,7 +54,9 @@ public class Message extends HelixProperty { TASK_REPLY, NO_OP, PARTICIPANT_ERROR_REPORT, - PARTICIPANT_SESSION_CHANGE + PARTICIPANT_SESSION_CHANGE, + CHAINED_MESSAGE, // this is a message subtype + RELAYED_MESSAGE } /** @@ -58,6 +64,7 @@ public class Message extends HelixProperty { */ public enum Attributes { MSG_ID, + RELAY_MSG_ID, SRC_SESSION_ID, TGT_SESSION_ID, SRC_NAME, @@ -84,7 +91,12 @@ public class Message extends HelixProperty { STATE_MODEL_FACTORY_NAME, BUCKET_SIZE, PARENT_MSG_ID, // used for group message mode - INNER_MESSAGE + ClusterEventName, + INNER_MESSAGE, + RELAY_PARTICIPANTS, + RELAY_TIME, + RELAY_FROM, + EXPIRY_PERIOD } /** @@ -96,6 +108,9 @@ public class Message extends HelixProperty { UNPROCESSABLE // get exception when create handler } + // default expiry time period for a relay message. + public static final long RELAY_MESSAGE_DEFAULT_EXPIRY = 5 * 1000; //5 second + /** * Compares the creation time of two Messages */ @@ -665,6 +680,171 @@ public class Message extends HelixProperty { } /** + * Get the completion time of previous task associated with this message. + * This applies only when this is a relay message, + * which specified the completion time of the task running on the participant that sent this relay message. + * + * @return + */ + public long getRelayTime() { + return _record.getLongField(Attributes.RELAY_TIME.name(), -1); + } + + /** + * Set the completion time of previous task associated with this message. 
+ * This applies only when this is a relay message, + * which specifies the completion time of the task running on the participant that sent this relay message. + * + * @param completionTime completion time of the previous task, in milliseconds + */ + public void setRelayTime(long completionTime) { + _record.setLongField(Attributes.RELAY_TIME.name(), completionTime); + } + + /** + * Attach a relayed message and its destination participant to this message. + * + * WARNING: only content in SimpleFields of relayed message will be carried over and sent, + * all contents in either ListFields or MapFields will be ignored. + * + * @param instance destination participant name + * @param message relayed message (copied; the given message is not modified). + */ + public void attachRelayMessage(String instance, Message message) { + List<String> relayList = _record.getListField(Attributes.RELAY_PARTICIPANTS.name()); + if (relayList == null) { + relayList = Collections.emptyList(); + } + Set<String> relayParticipants = new LinkedHashSet<>(relayList); + relayParticipants.add(instance); + Map<String, String> messageInfo = new HashMap<>(message.getRecord().getSimpleFields()); + messageInfo.put(Attributes.RELAY_MSG_ID.name(), message.getId()); + messageInfo.put(Attributes.MSG_SUBTYPE.name(), MessageType.RELAYED_MESSAGE.name()); + messageInfo.put(Attributes.RELAY_FROM.name(), getTgtName()); + messageInfo + .put(Attributes.EXPIRY_PERIOD.name(), String.valueOf(RELAY_MESSAGE_DEFAULT_EXPIRY)); + _record.setMapField(instance, messageInfo); + _record.setListField(Attributes.RELAY_PARTICIPANTS.name(), + Lists.newArrayList(relayParticipants)); + } + + /** + * Get relay message attached for the given instance.
+ * + * @param instance destination participant name + * @return null if no message for the instance + */ + public Message getRelayMessage(String instance) { + Map<String, String> messageInfo = _record.getMapField(instance); + if (messageInfo != null) { + String id = messageInfo.get(Attributes.RELAY_MSG_ID.name()); + if (id == null) { + id = messageInfo.get(Attributes.MSG_ID.name()); + if (id == null) { + return null; + } + } + ZNRecord record = new ZNRecord(id); + record.setSimpleFields(messageInfo); + return new Message(record); + } + + return null; + } + + public String getRelaySrcHost() { + return _record.getSimpleField(Attributes.RELAY_FROM.name()); + } + + /** + * Get all relay messages attached to this message as a map (instance->message). + * + * @return map of instanceName->message, empty map if none; entries without a message are skipped. + */ + public Map<String, Message> getRelayMessages() { + Map<String, Message> relayMessageMap = new HashMap<>(); + List<String> relayParticipants = _record.getListField(Attributes.RELAY_PARTICIPANTS.name()); + if (relayParticipants != null) { + for (String p : relayParticipants) { + Message msg = getRelayMessage(p); + if (msg != null) { + relayMessageMap.put(p, msg); + } + } + } + + return relayMessageMap; + } + + /** + * Whether there are any relay messages attached to this message. + * + * @return true if at least one relay participant is recorded + */ + public boolean hasRelayMessages() { + List<String> relayHosts = _record.getListField(Attributes.RELAY_PARTICIPANTS.name()); + return (relayHosts != null && relayHosts.size() > 0); + } + + /** + * Whether this message is a relay message. 
+ * @return true if the subtype is RELAYED_MESSAGE and the relay source host is set + */ + public boolean isRelayMessage() { + String subType = _record.getStringField(Attributes.MSG_SUBTYPE.name(), null); + String relayFrom = _record.getStringField(Attributes.RELAY_FROM.name(), null); + return MessageType.RELAYED_MESSAGE.name().equals(subType) && (relayFrom != null); + } + + /** + * Whether a message is expired.
+ * + * A message is expired if: + * 1) it is not a relay message and creationTime + expiryPeriod < current time + * or + * 2) it is a relay message and relayTime <= 0 (not set) or relayTime + expiryPeriod < current time. + * + * @return true if expired; always false if the expiry period is negative (i.e. not set) + */ + public boolean isExpired() { + long expiry = getExpiryPeriod(); + if (expiry < 0) { + return false; + } + + long current = System.currentTimeMillis(); + + // use relay time if this is a relay message + if (isRelayMessage()) { + long relayTime = getRelayTime(); + return relayTime <= 0 || (relayTime + expiry < current); + } + + return getCreateTimeStamp() + expiry < current; + } + + /** + * Get the expiry period (in milliseconds) + * + * @return the expiry period, or -1 if not set + */ + public long getExpiryPeriod() { + return _record.getLongField(Attributes.EXPIRY_PERIOD.name(), -1); + } + + /** + * Set expiry period for this message. + * A message will be expired after this period of time from either its 1) creationTime or 2) + * relayTime if it is relay message. + * Default is -1 if it is not set. + * + * @param expiry expiry period in milliseconds; a negative value means the message never expires + */ + public void setExpiryPeriod(long expiry) { + _record.setLongField(Attributes.EXPIRY_PERIOD.name(), expiry); + } + + /** * Check if this message is targetted for a controller * @return true if this is a controller message, false otherwise */ http://git-wip-us.apache.org/repos/asf/helix/blob/d0a3c0d1/helix-core/src/main/java/org/apache/helix/model/Resource.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/model/Resource.java b/helix-core/src/main/java/org/apache/helix/model/Resource.java index 3af830a..b911244 100644 --- a/helix-core/src/main/java/org/apache/helix/model/Resource.java +++ 
b/helix-core/src/main/java/org/apache/helix/model/Resource.java @@ -24,16 +24,14 @@ import java.util.LinkedHashMap; import java.util.Map; import org.apache.helix.HelixConstants; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - +import org.apache.helix.api.config.HelixConfigProperty; /** * A resource contains a set of
partitions and its replicas are managed by a state model */ public class Resource { - private static Logger LOG = LoggerFactory.getLogger(Resource.class); - private final String _resourceName; + private ClusterConfig _clusterConfig; + private ResourceConfig _resourceConfig; private final Map<String, Partition> _partitionMap; private String _stateModelDefRef; private String _stateModelFactoryName; @@ -48,7 +46,18 @@ public class Resource { */ public Resource(String resourceName) { this._resourceName = resourceName; - this._partitionMap = new LinkedHashMap<String, Partition>(); + this._partitionMap = new LinkedHashMap<>(); + } + + /** + * Instantiate a resource by its name + * + * @param resourceName the name of the resource that identifies it + */ + public Resource(String resourceName, ClusterConfig clusterConfig, ResourceConfig resourceConfig) { + this(resourceName); + _clusterConfig = clusterConfig; + _resourceConfig = resourceConfig; } /** @@ -185,6 +194,25 @@ public class Resource { _resourceGroupName = resourceGroupName; } + /** + * Whether P2P state transition message is enabled. + * + * @return + */ + public boolean isP2PMessageEnabled() { + String enabledInResource = _resourceConfig != null ? + _resourceConfig.getRecord().getSimpleField(HelixConfigProperty.P2P_MESSAGE_ENABLED.name()) : null; + + if (enabledInResource != null) { + return Boolean.valueOf(enabledInResource); + } + + String enabledInCluster = _clusterConfig != null ? + _clusterConfig.getRecord().getSimpleField(HelixConfigProperty.P2P_MESSAGE_ENABLED.name()) : null; + + return enabledInCluster != null ? 
Boolean.valueOf(enabledInCluster) : false; + } + @Override public String toString() { StringBuilder sb = new StringBuilder(); http://git-wip-us.apache.org/repos/asf/helix/blob/d0a3c0d1/helix-core/src/main/java/org/apache/helix/model/ResourceConfig.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/model/ResourceConfig.java b/helix-core/src/main/java/org/apache/helix/model/ResourceConfig.java index 8278be3..274640c 100644 --- a/helix-core/src/main/java/org/apache/helix/model/ResourceConfig.java +++ b/helix-core/src/main/java/org/apache/helix/model/ResourceConfig.java @@ -20,13 +20,13 @@ package org.apache.helix.model; */ import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.TreeMap; import org.apache.helix.HelixProperty; import org.apache.helix.ZNRecord; +import org.apache.helix.api.config.HelixConfigProperty; import org.apache.helix.api.config.RebalanceConfig; import org.apache.helix.api.config.StateTransitionTimeoutConfig; import org.slf4j.Logger; @@ -61,7 +61,6 @@ public class ResourceConfig extends HelixProperty { } private static final Logger _logger = LoggerFactory.getLogger(ResourceConfig.class.getName()); - /** * Instantiate for a specific instance * @@ -95,13 +94,18 @@ public class ResourceConfig extends HelixProperty { Boolean helixEnabled, String resourceGroupName, String resourceType, Boolean groupRoutingEnabled, Boolean externalViewDisabled, RebalanceConfig rebalanceConfig, StateTransitionTimeoutConfig stateTransitionTimeoutConfig, - Map<String, List<String>> listFields, Map<String, Map<String, String>> mapFields) { + Map<String, List<String>> listFields, Map<String, Map<String, String>> mapFields, + Boolean p2pMessageEnabled) { super(resourceId); if (monitorDisabled != null) { _record.setBooleanField(ResourceConfigProperty.MONITORING_DISABLED.name(), monitorDisabled); } + if (p2pMessageEnabled != null) { + 
_record.setBooleanField(HelixConfigProperty.P2P_MESSAGE_ENABLED.name(), p2pMessageEnabled); + } + if (numPartitions > 0) { _record.setIntField(ResourceConfigProperty.NUM_PARTITIONS.name(), numPartitions); } @@ -180,6 +184,15 @@ public class ResourceConfig extends HelixProperty { return _record.getBooleanField(ResourceConfigProperty.MONITORING_DISABLED.toString(), false); } + /** + * Whether the P2P state transition message is enabled for this resource. + * By default it is disabled if not set. + * + * @return + */ + public boolean isP2PMessageEnabled() { + return _record.getBooleanField(HelixConfigProperty.P2P_MESSAGE_ENABLED.name(), false); + } /** * Get the associated resource @@ -458,6 +471,7 @@ public class ResourceConfig extends HelixProperty { private String _resourceType; private Boolean _groupRoutingEnabled; private Boolean _externalViewDisabled; + private Boolean _p2pMessageEnabled; private RebalanceConfig _rebalanceConfig; private StateTransitionTimeoutConfig _stateTransitionTimeoutConfig; private Map<String, List<String>> _preferenceLists; @@ -472,6 +486,17 @@ public class ResourceConfig extends HelixProperty { return this; } + /** + * Enable/Disable the p2p state transition message for this resource. + * By default it is disabled if not set. 
+ * + * @param enabled + */ + public Builder setP2PMessageEnabled(boolean enabled) { + _p2pMessageEnabled = enabled; + return this; + } + public Boolean isMonitorDisabled() { return _monitorDisabled; } @@ -693,7 +718,7 @@ public class ResourceConfig extends HelixProperty { _stateModelFactoryName, _numReplica, _minActiveReplica, _maxPartitionsPerInstance, _instanceGroupTag, _helixEnabled, _resourceGroupName, _resourceType, _groupRoutingEnabled, _externalViewDisabled, _rebalanceConfig, _stateTransitionTimeoutConfig, _preferenceLists, - _mapFields); + _mapFields, _p2pMessageEnabled); } } } http://git-wip-us.apache.org/repos/asf/helix/blob/d0a3c0d1/helix-core/src/main/java/org/apache/helix/model/StateModelDefinition.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/model/StateModelDefinition.java b/helix-core/src/main/java/org/apache/helix/model/StateModelDefinition.java index 01a3746..2fbbfcb 100644 --- a/helix-core/src/main/java/org/apache/helix/model/StateModelDefinition.java +++ b/helix-core/src/main/java/org/apache/helix/model/StateModelDefinition.java @@ -95,8 +95,8 @@ public class StateModelDefinition extends HelixProperty { record.getListField(StateModelDefinitionProperty.STATE_PRIORITY_LIST.toString()); _stateTransitionPriorityList = record.getListField(StateModelDefinitionProperty.STATE_TRANSITION_PRIORITYLIST.toString()); - _stateTransitionTable = new HashMap<String, Map<String, String>>(); - _statesCountMap = new HashMap<String, String>(); + _stateTransitionTable = new HashMap<>(); + _statesCountMap = new HashMap<>(); if (_statesPriorityList != null) { int priority = 1; for (String state : _statesPriorityList) { @@ -203,6 +203,22 @@ public class StateModelDefinition extends HelixProperty { } /** + * Whether this state model allows at most a single replica in the top-state? 
+ * + * @return true if the configured count of the top state is exactly 1 + */ + public boolean isSingleTopStateModel() { + int topStateCount = 0; + try { + topStateCount = Integer.parseInt(_statesCountMap.get(getTopState())); + } catch (NumberFormatException ignored) { + // a missing (null) or non-numeric count leaves topStateCount at 0 + } + + return topStateCount == 1; + } + + /** * Get the second top states, which need one step transition to top state * @return a set of second top states */ @@ -244,9 +260,9 @@ public class StateModelDefinition extends HelixProperty { */ public Builder(String name) { this._statemodelName = name; - statesMap = new HashMap<String, Integer>(); - transitionMap = new HashMap<Transition, Integer>(); - stateConstraintMap = new HashMap<String, String>(); + statesMap = new HashMap<>(); + transitionMap = new HashMap<>(); + stateConstraintMap = new HashMap<>(); } /** @@ -368,7 +384,7 @@ public class StateModelDefinition extends HelixProperty { } }; Collections.sort(transitionList, c2); - List<String> transitionPriorityList = new ArrayList<String>(transitionList.size()); + List<String> transitionPriorityList = new ArrayList<>(transitionList.size()); for (Transition t : transitionList) { transitionPriorityList.add(t.toString()); } @@ -383,7 +399,7 @@ public class StateModelDefinition extends HelixProperty { StateTransitionTableBuilder stateTransitionTableBuilder = new StateTransitionTableBuilder(); Map<String, Map<String, String>> transitionTable = stateTransitionTableBuilder.buildTransitionTable(statePriorityList, - new ArrayList<Transition>(transitionMap.keySet())); + new ArrayList<>(transitionMap.keySet())); for (String state : transitionTable.keySet()) { record.setMapField(state + ".next", transitionTable.get(state)); } http://git-wip-us.apache.org/repos/asf/helix/blob/d0a3c0d1/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskRebalancer.java 
---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskRebalancer.java
b/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskRebalancer.java index 1589c1a..2af4151 100644 --- a/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskRebalancer.java +++ b/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskRebalancer.java @@ -36,7 +36,8 @@ import org.apache.helix.model.ResourceAssignment; /** * This rebalancer is deprecated, left here only for back-compatible. * */ -@Deprecated public class FixedTargetTaskRebalancer extends DeprecatedTaskRebalancer { +@Deprecated +public class FixedTargetTaskRebalancer extends DeprecatedTaskRebalancer { private FixedTargetTaskAssignmentCalculator taskAssignmentCalculator = new FixedTargetTaskAssignmentCalculator(); http://git-wip-us.apache.org/repos/asf/helix/blob/d0a3c0d1/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java b/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java index 1aa82f4..e2d09ba 100644 --- a/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java +++ b/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java @@ -19,6 +19,7 @@ package org.apache.helix.controller.stages; * under the License. 
*/ +import java.lang.reflect.Method; import java.util.ArrayList; import java.util.Date; import java.util.HashMap; @@ -43,9 +44,13 @@ import org.apache.helix.model.IdealState.RebalanceMode; import org.apache.helix.model.InstanceConfig; import org.apache.helix.model.LiveInstance; import org.apache.helix.model.Resource; +import org.apache.helix.model.ResourceConfig; import org.apache.helix.model.StateModelDefinition; import org.apache.helix.tools.StateModelConfigGenerator; +import org.codehaus.jackson.annotate.JsonAnySetter; +import org.testng.ITestContext; import org.testng.annotations.AfterClass; +import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeClass; import org.testng.annotations.BeforeMethod; @@ -73,7 +78,6 @@ public class BaseStageTest { + new Date(System.currentTimeMillis())); } - @BeforeMethod() public void setup() { _clusterName = "testCluster-" + UUID.randomUUID().toString(); manager = new MockManager(_clusterName); @@ -85,6 +89,21 @@ public class BaseStageTest { admin.addCluster(_clusterName); } + @BeforeMethod + public void beforeTest(Method testMethod, ITestContext testContext) { + long startTime = System.currentTimeMillis(); + System.out.println("START " + testMethod.getName() + " at " + new Date(startTime)); + testContext.setAttribute("StartTime", startTime); + setup(); + } + + @AfterMethod + public void endTest(Method testMethod, ITestContext testContext) { + Long startTime = (Long) testContext.getAttribute("StartTime"); + long endTime = System.currentTimeMillis(); + System.out.println("END " + testMethod.getName() + " at " + new Date(endTime) + ", took: " + (endTime - startTime) + "ms."); + } + protected List<IdealState> setupIdealState(int nodes, String[] resources, int partitions, int replicas, RebalanceMode rebalanceMode, String stateModelName, String rebalanceClassName, String rebalanceStrategyName) { @@ -137,14 +156,18 @@ public class BaseStageTest { stateModelName, rebalanceClassName, null); } -
protected void setupLiveInstances(int numLiveInstances) { + protected List<String> setupLiveInstances(int numLiveInstances) { + List<String> instances = new ArrayList<>(); for (int i = 0; i < numLiveInstances; i++) { LiveInstance liveInstance = new LiveInstance(HOSTNAME_PREFIX + i); liveInstance.setSessionId(SESSION_PREFIX + i); Builder keyBuilder = accessor.keyBuilder(); accessor.setProperty(keyBuilder.liveInstance(HOSTNAME_PREFIX + i), liveInstance); + instances.add(liveInstance.getInstanceName()); } + + return instances; } protected void setupInstances(int numInstances) { @@ -234,4 +257,20 @@ public class BaseStageTest { return resourceMap; } + + protected Map<String, Resource> getResourceMap(String[] resources, int partitions, + String stateModel, ClusterConfig clusterConfig, ResourceConfig resourceConfig) { + Map<String, Resource> resourceMap = new HashMap<String, Resource>(); + + for (String r : resources) { + Resource testResource = new Resource(r, clusterConfig, resourceConfig); + testResource.setStateModelDefRef(stateModel); + for (int i = 0; i < partitions; i++) { + testResource.addPartition(r + "_" + i); + } + resourceMap.put(r, testResource); + } + + return resourceMap; + } } http://git-wip-us.apache.org/repos/asf/helix/blob/d0a3c0d1/helix-core/src/test/java/org/apache/helix/controller/stages/TestMsgSelectionStage.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestMsgSelectionStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestMsgSelectionStage.java index 994e2fa..4e20116 100644 --- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestMsgSelectionStage.java +++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestMsgSelectionStage.java @@ -27,8 +27,8 @@ import java.util.Map; import java.util.UUID; import org.apache.helix.TestHelper; -import org.apache.helix.controller.stages.MessageSelectionStage; 
import org.apache.helix.controller.stages.MessageSelectionStage.Bounds; +import org.apache.helix.model.BuiltInStateModelDefinitions; import org.apache.helix.model.LiveInstance; import org.apache.helix.model.Message; import org.apache.helix.model.Message.MessageState; @@ -86,7 +86,8 @@ public class TestMsgSelectionStage { List<Message> selectedMsg = new MessageSelectionStage().selectMessages(liveInstances, currentStates, pendingMessages, - messages, stateConstraints, stateTransitionPriorities, "OFFLINE"); + messages, stateConstraints, stateTransitionPriorities, + BuiltInStateModelDefinitions.MasterSlave.getStateModelDefinition(), false); Assert.assertEquals(selectedMsg.size(), 1); Assert.assertEquals(selectedMsg.get(0).getMsgId(), "msgId_1"); @@ -123,7 +124,8 @@ public class TestMsgSelectionStage { List<Message> selectedMsg = new MessageSelectionStage().selectMessages(liveInstances, currentStates, pendingMessages, - messages, stateConstraints, stateTransitionPriorities, "OFFLINE"); + messages, stateConstraints, stateTransitionPriorities, + BuiltInStateModelDefinitions.MasterSlave.getStateModelDefinition(), false); Assert.assertEquals(selectedMsg.size(), 0); System.out.println("END testMasterXferAfterMasterResume at " http://git-wip-us.apache.org/repos/asf/helix/blob/d0a3c0d1/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceComputationStage.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceComputationStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceComputationStage.java index ae7f2f5..47de2d3 100644 --- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceComputationStage.java +++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceComputationStage.java @@ -182,7 +182,6 @@ public class TestResourceComputationStage extends BaseStageTest { @Test public void 
testNull() { - ClusterEvent event = new ClusterEvent(ClusterEventType.Unknown); ResourceComputationStage stage = new ResourceComputationStage(); StageContext context = new StageContext(); stage.init(context); http://git-wip-us.apache.org/repos/asf/helix/blob/d0a3c0d1/helix-core/src/test/java/org/apache/helix/integration/TestZkReconnect.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestZkReconnect.java b/helix-core/src/test/java/org/apache/helix/integration/TestZkReconnect.java index 21878ac..85ecb0c 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/TestZkReconnect.java +++ b/helix-core/src/test/java/org/apache/helix/integration/TestZkReconnect.java @@ -49,7 +49,7 @@ import org.testng.annotations.Test; public class TestZkReconnect { private static final Logger LOG = LoggerFactory.getLogger(TestZkReconnect.class); - @Test + @Test (enabled = false) public void testZKReconnect() throws Exception { final AtomicReference<ZkServer> zkServerRef = new AtomicReference<ZkServer>(); final int zkPort = TestHelper.getRandomPort(); http://git-wip-us.apache.org/repos/asf/helix/blob/d0a3c0d1/helix-core/src/test/java/org/apache/helix/integration/common/ZkIntegrationTestBase.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/common/ZkIntegrationTestBase.java b/helix-core/src/test/java/org/apache/helix/integration/common/ZkIntegrationTestBase.java index ba2196c..4e49502 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/common/ZkIntegrationTestBase.java +++ b/helix-core/src/test/java/org/apache/helix/integration/common/ZkIntegrationTestBase.java @@ -19,6 +19,8 @@ package org.apache.helix.integration.common; * under the License. 
*/ +import java.lang.reflect.Method; +import java.util.Date; import java.util.Map; import java.util.Set; import java.util.logging.Level; @@ -54,7 +56,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.Assert; import org.testng.AssertJUnit; +import org.testng.ITestContext; +import org.testng.annotations.AfterMethod; import org.testng.annotations.AfterSuite; +import org.testng.annotations.BeforeMethod; import org.testng.annotations.BeforeSuite; public class ZkIntegrationTestBase { @@ -95,6 +100,21 @@ public class ZkIntegrationTestBase { TestHelper.stopZkServer(_zkServer); } + @BeforeMethod + public void beforeTest(Method testMethod, ITestContext testContext){ + long startTime = System.currentTimeMillis(); + System.out.println("START " + testMethod.getName() + " at " + new Date(startTime)); + testContext.setAttribute("StartTime", System.currentTimeMillis()); + } + + @AfterMethod + public void endTest(Method testMethod, ITestContext testContext) { + Long startTime = (Long) testContext.getAttribute("StartTime"); + long endTime = System.currentTimeMillis(); + System.out.println( + "END " + testMethod.getName() + " at " + new Date(endTime) + ", took: " + (endTime - startTime) + "ms."); + } + protected String getShortClassName() { return this.getClass().getSimpleName(); } http://git-wip-us.apache.org/repos/asf/helix/blob/d0a3c0d1/helix-core/src/test/java/org/apache/helix/integration/manager/TestZkCallbackHandlerLeak.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/TestZkCallbackHandlerLeak.java b/helix-core/src/test/java/org/apache/helix/integration/manager/TestZkCallbackHandlerLeak.java index ec2dd1b..1448453 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/manager/TestZkCallbackHandlerLeak.java +++ b/helix-core/src/test/java/org/apache/helix/integration/manager/TestZkCallbackHandlerLeak.java @@ -93,7 +93,7 @@ public class 
TestZkCallbackHandlerLeak extends ZkUnitTestBase { // controller should have 5 + 2n + m + (m+2)n zk-watchers // where n is number of nodes and m is number of resources - return watchPaths.size() == (7 + 5 * n); + return watchPaths.size() == (8 + 5 * n); } }, 500); Assert.assertTrue(result, "Controller should have 8 + 5*n zk-watchers."); @@ -119,7 +119,7 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase { // printHandlers(participantManagerToExpire); int controllerHandlerNb = controller.getHandlers().size(); int particHandlerNb = participantManagerToExpire.getHandlers().size(); - Assert.assertEquals(controllerHandlerNb, (6 + 2 * n), + Assert.assertEquals(controllerHandlerNb, (7 + 2 * n), "HelixController should have 9 (5+2n) callback handlers for 2 (n) participant"); Assert.assertEquals(particHandlerNb, 1, "HelixParticipant should have 1 (msg->HelixTaskExecutor) callback handlers"); @@ -149,7 +149,7 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase { // controller should have 5 + 2n + m + (m+2)n zk-watchers // where n is number of nodes and m is number of resources - return watchPaths.size() == (7 + 5 * n); + return watchPaths.size() == (8 + 5 * n); } }, 500); Assert.assertTrue(result, "Controller should have 8 + 5*n zk-watchers after session expiry."); http://git-wip-us.apache.org/repos/asf/helix/blob/d0a3c0d1/helix-core/src/test/java/org/apache/helix/integration/messaging/TestP2PMessageSemiAuto.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/messaging/TestP2PMessageSemiAuto.java b/helix-core/src/test/java/org/apache/helix/integration/messaging/TestP2PMessageSemiAuto.java new file mode 100644 index 0000000..e0f83e1 --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/integration/messaging/TestP2PMessageSemiAuto.java @@ -0,0 +1,240 @@ +package org.apache.helix.integration.messaging; + +/* + * Licensed to the Apache Software Foundation 
(ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.ArrayList; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.Set; +import org.apache.helix.ConfigAccessor; +import org.apache.helix.HelixDataAccessor; +import org.apache.helix.NotificationContext; +import org.apache.helix.PropertyKey; +import org.apache.helix.api.config.HelixConfigProperty; +import org.apache.helix.controller.stages.ClusterDataCache; +import org.apache.helix.integration.common.ZkIntegrationTestBase; +import org.apache.helix.integration.manager.ClusterControllerManager; +import org.apache.helix.integration.manager.MockParticipantManager; +import org.apache.helix.manager.zk.ZKHelixDataAccessor; +import org.apache.helix.mock.participant.MockTransition; +import org.apache.helix.model.BuiltInStateModelDefinitions; +import org.apache.helix.model.ClusterConfig; +import org.apache.helix.model.CurrentState; +import org.apache.helix.model.IdealState; +import org.apache.helix.model.LiveInstance; +import org.apache.helix.model.MasterSlaveSMD; +import org.apache.helix.model.Message; +import org.apache.helix.model.ResourceConfig; +import 
org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier; +import org.apache.helix.tools.ClusterVerifiers.HelixClusterVerifier; +import org.testng.Assert; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +public class TestP2PMessageSemiAuto extends ZkIntegrationTestBase { + final String CLASS_NAME = getShortClassName(); + final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + CLASS_NAME; + + static final int PARTICIPANT_NUMBER = 3; + static final int PARTICIPANT_START_PORT = 12918; + + static final String DB_NAME_1 = "TestDB_1"; + static final String DB_NAME_2 = "TestDB_2"; + + static final int PARTITION_NUMBER = 20; + static final int REPLICA_NUMBER = 3; + + List<MockParticipantManager> _participants = new ArrayList<MockParticipantManager>(); + List<String> _instances = new ArrayList<>(); + ClusterControllerManager _controller; + + HelixClusterVerifier _clusterVerifier; + ConfigAccessor _configAccessor; + HelixDataAccessor _accessor; + + @BeforeClass + public void beforeClass() + throws InterruptedException { + System.out.println( + "START " + getShortClassName() + " at " + new Date(System.currentTimeMillis())); + + // setup storage cluster + _gSetupTool.addCluster(CLUSTER_NAME, true); + + for (int i = 0; i < PARTICIPANT_NUMBER; i++) { + String instance = PARTICIPANT_PREFIX + "_" + (PARTICIPANT_START_PORT + i); + _gSetupTool.addInstanceToCluster(CLUSTER_NAME, instance); + _instances.add(instance); + } + + // start dummy participants + for (int i = 0; i < PARTICIPANT_NUMBER; i++) { + MockParticipantManager participant = + new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, _instances.get(i)); + participant.syncStart(); + _participants.add(participant); + } + + createDBInSemiAuto(DB_NAME_1, _instances); + createDBInSemiAuto(DB_NAME_2, _instances); + + // start controller + String controllerName = CONTROLLER_PREFIX + "_0"; + _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName); + 
_controller.syncStart(); + + _clusterVerifier = new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkClient(_gZkClient).build(); + Assert.assertTrue(_clusterVerifier.verify()); + + _configAccessor = new ConfigAccessor(_gZkClient); + _accessor = new ZKHelixDataAccessor(CLUSTER_NAME, _baseAccessor); + } + + private void createDBInSemiAuto(String dbName, List<String> preferenceList) { + _gSetupTool.addResourceToCluster(CLUSTER_NAME, dbName, PARTITION_NUMBER, + BuiltInStateModelDefinitions.MasterSlave.name(), IdealState.RebalanceMode.SEMI_AUTO.toString()); + _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, dbName, REPLICA_NUMBER); + + IdealState is = _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, dbName); + for (String p : is.getPartitionSet()) { + is.setPreferenceList(p, preferenceList); + } + _gSetupTool.getClusterManagementTool().setResourceIdealState(CLUSTER_NAME, dbName, is); + } + + @Test + public void testP2PStateTransitionDisabled() { + // disable the master instance + String prevMasterInstance = _instances.get(0); + _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, prevMasterInstance, false); + + Assert.assertTrue(_clusterVerifier.verify()); + verifyP2PMessage(DB_NAME_1,_instances.get(1), MasterSlaveSMD.States.MASTER.name(), _controller.getInstanceName()); + verifyP2PMessage(DB_NAME_2,_instances.get(1), MasterSlaveSMD.States.MASTER.name(), _controller.getInstanceName()); + + + //re-enable the old master + _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, prevMasterInstance, true); + Assert.assertTrue(_clusterVerifier.verify()); + + verifyP2PMessage(DB_NAME_1, prevMasterInstance, MasterSlaveSMD.States.MASTER.name(), _controller.getInstanceName()); + verifyP2PMessage(DB_NAME_2, prevMasterInstance, MasterSlaveSMD.States.MASTER.name(), _controller.getInstanceName()); + } + + @Test (dependsOnMethods = {"testP2PStateTransitionDisabled"}) + public void testP2PStateTransitionEnabledInCluster() 
{ + enableP2PInCluster(true); + enableP2PInResource(DB_NAME_1,false); + enableP2PInResource(DB_NAME_2,false); + + // disable the master instance + String prevMasterInstance = _instances.get(0); + _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, prevMasterInstance, false); + + Assert.assertTrue(_clusterVerifier.verify()); + verifyP2PMessage(DB_NAME_1, _instances.get(1), MasterSlaveSMD.States.MASTER.name(), prevMasterInstance); + verifyP2PMessage(DB_NAME_2, _instances.get(1), MasterSlaveSMD.States.MASTER.name(), prevMasterInstance); + + //re-enable the old master + _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, prevMasterInstance, true); + Assert.assertTrue(_clusterVerifier.verify()); + + verifyP2PMessage(DB_NAME_1, prevMasterInstance, MasterSlaveSMD.States.MASTER.name(), _instances.get(1)); + verifyP2PMessage(DB_NAME_2, prevMasterInstance, MasterSlaveSMD.States.MASTER.name(), _instances.get(1)); + } + + @Test (dependsOnMethods = {"testP2PStateTransitionDisabled"}) + public void testP2PStateTransitionEnabledInResource() { + enableP2PInCluster(false); + enableP2PInResource(DB_NAME_1,true); + enableP2PInResource(DB_NAME_2,false); + + + // disable the master instance + String prevMasterInstance = _instances.get(0); + _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, prevMasterInstance, false); + + Assert.assertTrue(_clusterVerifier.verify()); + verifyP2PMessage(DB_NAME_1, _instances.get(1), MasterSlaveSMD.States.MASTER.name(), prevMasterInstance); + verifyP2PMessage(DB_NAME_2, _instances.get(1), MasterSlaveSMD.States.MASTER.name(), _controller.getInstanceName()); + + + //re-enable the old master + _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, prevMasterInstance, true); + Assert.assertTrue(_clusterVerifier.verify()); + + verifyP2PMessage(DB_NAME_1, prevMasterInstance, MasterSlaveSMD.States.MASTER.name(), _instances.get(1)); + verifyP2PMessage(DB_NAME_2, prevMasterInstance, 
MasterSlaveSMD.States.MASTER.name(), _controller.getInstanceName()); + } + + private void enableP2PInCluster(boolean enable) { + // enable p2p message in cluster. + if (enable) { + ClusterConfig clusterConfig = _configAccessor.getClusterConfig(CLUSTER_NAME); + clusterConfig.enableP2PMessage(true); + _configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig); + } else { + ClusterConfig clusterConfig = _configAccessor.getClusterConfig(CLUSTER_NAME); + clusterConfig.getRecord().getSimpleFields().remove(HelixConfigProperty.P2P_MESSAGE_ENABLED.name()); + _configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig); + } + } + + private void enableP2PInResource(String dbName, boolean enable) { + if (enable) { + ResourceConfig resourceConfig = new ResourceConfig.Builder(dbName).setP2PMessageEnabled(true).build(); + _configAccessor.setResourceConfig(CLUSTER_NAME, dbName, resourceConfig); + } else { + // remove P2P Message in resource config + ResourceConfig resourceConfig = _configAccessor.getResourceConfig(CLUSTER_NAME, dbName); + if (resourceConfig != null) { + resourceConfig.getRecord().getSimpleFields().remove(HelixConfigProperty.P2P_MESSAGE_ENABLED.name()); + _configAccessor.setResourceConfig(CLUSTER_NAME, dbName, resourceConfig); + } + } + } + + private void verifyP2PMessage(String dbName, String instance, String expectedState, String expectedTriggerHost) { + ClusterDataCache dataCache = new ClusterDataCache(CLUSTER_NAME); + dataCache.refresh(_accessor); + + Map<String, LiveInstance> liveInstanceMap = dataCache.getLiveInstances(); + LiveInstance liveInstance = liveInstanceMap.get(instance); + + Map<String, CurrentState> currentStateMap = dataCache.getCurrentState(instance, liveInstance.getSessionId()); + Assert.assertNotNull(currentStateMap); + CurrentState currentState = currentStateMap.get(dbName); + Assert.assertNotNull(currentState); + Assert.assertEquals(currentState.getPartitionStateMap().size(), PARTITION_NUMBER); + + for (String partition : 
currentState.getPartitionStateMap().keySet()) { + String state = currentState.getState(partition); + Assert.assertEquals(state, expectedState, + dbName + " Partition " + partition + "'s state is different as expected!"); + String triggerHost = currentState.getTriggerHost(partition); + Assert.assertEquals(triggerHost, expectedTriggerHost, + "Partition " + partition + "'s transition to Master was not triggered by expected host!"); + } + } +} + http://git-wip-us.apache.org/repos/asf/helix/blob/d0a3c0d1/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestNodeOfflineTimeStamp.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestNodeOfflineTimeStamp.java b/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestNodeOfflineTimeStamp.java index fb1c090..ceda6e5 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestNodeOfflineTimeStamp.java +++ b/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestNodeOfflineTimeStamp.java @@ -45,8 +45,10 @@ public class TestNodeOfflineTimeStamp extends ZkStandAloneCMTestBase { Assert.assertTrue(Math.abs(shutdownTime - recordTime) <= 500L); _participants[0].reset(); + Thread.sleep(50); _participants[0].syncStart(); + Thread.sleep(50); history = getInstanceHistory(_participants[0].getInstanceName()); Assert.assertEquals(history.getLastOfflineTime(), ParticipantHistory.ONLINE); } http://git-wip-us.apache.org/repos/asf/helix/blob/d0a3c0d1/helix-core/src/test/java/org/apache/helix/integration/rebalancer/CrushRebalancers/TestCrushAutoRebalance.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/CrushRebalancers/TestCrushAutoRebalance.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/CrushRebalancers/TestCrushAutoRebalance.java index 
967175f..d886e44 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/CrushRebalancers/TestCrushAutoRebalance.java +++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/CrushRebalancers/TestCrushAutoRebalance.java @@ -109,17 +109,16 @@ public class TestCrushAutoRebalance extends ZkIntegrationTestBase { } @DataProvider(name = "rebalanceStrategies") - public static String [][] rebalanceStrategies() { + public static Object [][] rebalanceStrategies() { return new String[][] { {"CrushRebalanceStrategy", CrushRebalanceStrategy.class.getName()}, {"MultiRoundCrushRebalanceStrategy", MultiRoundCrushRebalanceStrategy.class.getName()} }; } - @Test(dataProvider = "rebalanceStrategies", enabled=true) + @Test(dataProvider = "rebalanceStrategies") public void testZoneIsolation(String rebalanceStrategyName, String rebalanceStrategyClass) throws Exception { System.out.println("testZoneIsolation " + rebalanceStrategyName); - int i = 0; for (String stateModel : _testModels) { String db = "Test-DB-" + rebalanceStrategyName + "-" + i++; @@ -143,7 +142,7 @@ public class TestCrushAutoRebalance extends ZkIntegrationTestBase { } } - @Test(dataProvider = "rebalanceStrategies", enabled=true) + @Test(dataProvider = "rebalanceStrategies") public void testZoneIsolationWithInstanceTag( String rebalanceStrategyName, String rebalanceStrategyClass) throws Exception { Set<String> tags = new HashSet<String>(_nodeToTagMap.values()); http://git-wip-us.apache.org/repos/asf/helix/blob/d0a3c0d1/helix-core/src/test/java/org/apache/helix/manager/zk/TestZKUtil.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZKUtil.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZKUtil.java index 7b7ec2c..a2d3f1d 100644 --- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZKUtil.java +++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZKUtil.java @@ 
-22,17 +22,13 @@ package org.apache.helix.manager.zk; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; import java.util.HashMap; import java.util.List; - import java.util.Map; import org.apache.helix.PropertyPathBuilder; -import org.apache.helix.PropertyType; import org.apache.helix.TestHelper; import org.apache.helix.ZNRecord; import org.apache.helix.ZkUnitTestBase; -import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.AssertJUnit; http://git-wip-us.apache.org/repos/asf/helix/blob/d0a3c0d1/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkClusterManager.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkClusterManager.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkClusterManager.java index fcca597..6e68d1a 100644 --- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkClusterManager.java +++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkClusterManager.java @@ -28,10 +28,8 @@ import java.util.Map; import org.apache.helix.AccessOption; import org.apache.helix.integration.manager.MockParticipantManager; -import org.apache.helix.model.ConfigScope; import org.apache.helix.model.HelixConfigScope; import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty; -import org.apache.helix.model.builder.ConfigScopeBuilder; import org.apache.helix.model.builder.HelixConfigScopeBuilder; import org.apache.helix.HelixAdmin; import org.apache.helix.HelixDataAccessor; @@ -44,7 +42,6 @@ import org.apache.helix.ZNRecord; import org.apache.helix.ZkTestHelper; import org.apache.helix.ZkUnitTestBase; import org.apache.helix.manager.MockListener; -import org.apache.helix.manager.zk.ZKHelixManager; import org.apache.helix.model.LiveInstance; import 
org.apache.helix.store.zk.ZkHelixPropertyStore; import org.apache.zookeeper.data.Stat;
