Add P2P (Participant-to-Participant) state-transition message support in Helix 
controller.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/d0a3c0d1
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/d0a3c0d1
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/d0a3c0d1

Branch: refs/heads/master
Commit: d0a3c0d1ea3a467564ed5ffd20bcd6a8e5a56678
Parents: 5117022
Author: Lei Xia <[email protected]>
Authored: Mon Oct 9 14:52:19 2017 -0700
Committer: Junkai Xue <[email protected]>
Committed: Wed Jan 24 18:30:30 2018 -0800

----------------------------------------------------------------------
 .../helix/api/config/HelixConfigProperty.java   |  43 ++++
 .../rebalancer/DelayedAutoRebalancer.java       |   2 +-
 .../controller/stages/ClusterDataCache.java     |  62 ++++-
 .../controller/stages/CurrentStateOutput.java   |  16 +-
 .../stages/MessageGenerationPhase.java          |   3 +-
 .../stages/MessageSelectionStage.java           |  82 +++----
 .../stages/ResourceComputationStage.java        |   4 +-
 .../controller/stages/TaskAssignmentStage.java  |  10 +-
 .../messaging/handling/BatchMessageHandler.java |   1 +
 .../handling/HelixStateTransitionHandler.java   |  16 +-
 .../helix/messaging/handling/HelixTask.java     |  52 +++-
 .../messaging/handling/HelixTaskExecutor.java   |   9 +-
 .../messaging/handling/HelixTaskResult.java     |   9 +
 .../org/apache/helix/model/ClusterConfig.java   |  23 ++
 .../org/apache/helix/model/CurrentState.java    |  11 +-
 .../java/org/apache/helix/model/Message.java    | 184 +++++++++++++-
 .../java/org/apache/helix/model/Resource.java   |  40 +++-
 .../org/apache/helix/model/ResourceConfig.java  |  33 ++-
 .../helix/model/StateModelDefinition.java       |  30 ++-
 .../helix/task/FixedTargetTaskRebalancer.java   |   3 +-
 .../helix/controller/stages/BaseStageTest.java  |  43 +++-
 .../stages/TestMsgSelectionStage.java           |   8 +-
 .../stages/TestResourceComputationStage.java    |   1 -
 .../helix/integration/TestZkReconnect.java      |   2 +-
 .../common/ZkIntegrationTestBase.java           |  20 ++
 .../manager/TestZkCallbackHandlerLeak.java      |   6 +-
 .../messaging/TestP2PMessageSemiAuto.java       | 240 +++++++++++++++++++
 .../paticipant/TestNodeOfflineTimeStamp.java    |   2 +
 .../TestCrushAutoRebalance.java                 |   7 +-
 .../org/apache/helix/manager/zk/TestZKUtil.java |   4 -
 .../helix/manager/zk/TestZkClusterManager.java  |   3 -
 .../TestP2PStateTransitionMessages.java         | 176 ++++++++++++++
 .../apache/helix/mock/MockBaseDataAccessor.java |   6 +-
 .../org/apache/helix/mock/MockHelixAdmin.java   |  14 +-
 .../mbeans/TestDisableResourceMbean.java        |   9 +-
 .../mbeans/TestTopStateHandoffMetrics.java      |   4 +-
 36 files changed, 1065 insertions(+), 113 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/d0a3c0d1/helix-core/src/main/java/org/apache/helix/api/config/HelixConfigProperty.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/api/config/HelixConfigProperty.java 
b/helix-core/src/main/java/org/apache/helix/api/config/HelixConfigProperty.java
new file mode 100644
index 0000000..eff63cb
--- /dev/null
+++ 
b/helix-core/src/main/java/org/apache/helix/api/config/HelixConfigProperty.java
@@ -0,0 +1,43 @@
+package org.apache.helix.api.config;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
+
+/**
+ * This class defines all possible configuration options
+ * and its applicable config scopes (eg, Cluster/Resource/Instance/Partition 
config).
+ */
+public enum HelixConfigProperty {
+  P2P_MESSAGE_ENABLED(ConfigScopeProperty.CLUSTER, 
ConfigScopeProperty.RESOURCE);
+
+  Set<ConfigScopeProperty> _applicableScopes;
+
+  HelixConfigProperty(ConfigScopeProperty ...configScopeProperties) {
+    _applicableScopes = new HashSet<>(Arrays.asList(configScopeProperties));
+  }
+
+  public Set<ConfigScopeProperty> applicableScopes() {
+    return _applicableScopes;
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/d0a3c0d1/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java
 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java
index a44aa11..adac235 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java
@@ -310,7 +310,7 @@ public class DelayedAutoRebalancer extends 
AbstractRebalancer {
   private boolean isDelayRebalanceEnabled(IdealState idealState, ClusterConfig 
clusterConfig) {
     long delay = getRebalanceDelay(idealState, clusterConfig);
     return (delay > 0 && idealState.isDelayRebalanceEnabled() && clusterConfig
-        .isDelayRebalaceEnabled());
+        . isDelayRebalaceEnabled());
   }
 
   private ZNRecord getFinalDelayedMapping(IdealState idealState, ZNRecord 
newIdealMapping,

http://git-wip-us.apache.org/repos/asf/helix/blob/d0a3c0d1/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
 
b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
index 8999ed7..1dd862d 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
@@ -188,16 +188,20 @@ public class ClusterDataCache {
         accessor.getChildValuesMap(keyBuilder.stateModelDefs());
     _stateModelDefMap = new ConcurrentHashMap<>(stateDefMap);
     _constraintMap = accessor.getChildValuesMap(keyBuilder.constraints());
+    _clusterConfig = accessor.getProperty(keyBuilder.clusterConfig());
 
     refreshMessages(accessor);
     refreshCurrentStates(accessor);
 
-    _clusterConfig = accessor.getProperty(keyBuilder.clusterConfig());
+    // current state must be refreshed before refreshing relay messages
+    // because we need to use current state to validate all relay messages.
+    updateRelayMessages(_messageMap);
+
     if (_clusterConfig != null) {
       _idealStateRuleMap = _clusterConfig.getIdealStateRules();
     } else {
       _idealStateRuleMap = Maps.newHashMap();
-      LOG.error("Cluster config is null!");
+      LOG.warn("Cluster config is null!");
     }
 
     long endTime = System.currentTimeMillis();
@@ -275,6 +279,7 @@ public class ClusterDataCache {
         }
       }
     }
+
     _messageMap = Collections.unmodifiableMap(msgMap);
 
     if (LOG.isDebugEnabled()) {
@@ -284,6 +289,59 @@ public class ClusterDataCache {
     }
   }
 
+  // update all valid relay messages attached to existing state transition 
messages into message map.
+  private void updateRelayMessages(Map<String, Map<String, Message>> 
messageMap) {
+    List<Message> relayMessages = new ArrayList<>();
+    for (String instance : messageMap.keySet()) {
+      Map<String, Message> instanceMessages = messageMap.get(instance);
+      Map<String, Map<String, CurrentState>> instanceCurrentStateMap = 
_currentStateMap.get(instance);
+      if (instanceCurrentStateMap == null) {
+        continue;
+      }
+
+      for (Message message : instanceMessages.values()) {
+        if (message.hasRelayMessages()) {
+          String sessionId = message.getTgtSessionId();
+          String resourceName = message.getResourceName();
+          String partitionName = message.getPartitionName();
+          String targetState = message.getToState();
+          String instanceSessionId = 
_liveInstanceMap.get(instance).getSessionId();
+
+          if (!instanceSessionId.equals(sessionId)) {
+            continue;
+          }
+
+          Map<String, CurrentState> sessionCurrentStateMap = 
instanceCurrentStateMap.get(sessionId);
+          if (sessionCurrentStateMap == null) {
+            continue;
+          }
+          CurrentState currentState = sessionCurrentStateMap.get(resourceName);
+          if (currentState == null || 
!targetState.equals(currentState.getState(partitionName))) {
+            continue;
+          }
+          long transitionCompleteTime = currentState.getEndTime(partitionName);
+
+          for (Message msg : message.getRelayMessages().values()) {
+            msg.setRelayTime(transitionCompleteTime);
+            if (!message.isExpired()) {
+              relayMessages.add(msg);
+            }
+          }
+        }
+      }
+    }
+
+    for (Message message : relayMessages) {
+      String instance = message.getTgtName();
+      Map<String, Message> instanceMessages = messageMap.get(instance);
+      if (instanceMessages == null) {
+        instanceMessages = new HashMap<>();
+        messageMap.put(instance, instanceMessages);
+      }
+      instanceMessages.put(message.getId(), message);
+    }
+  }
+
   private void refreshCurrentStates(HelixDataAccessor accessor) {
     refreshCurrentStatesCache(accessor);
 

http://git-wip-us.apache.org/repos/asf/helix/blob/d0a3c0d1/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java
 
b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java
index 3821303..97420e5 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java
@@ -54,13 +54,13 @@ public class CurrentStateOutput {
   private final Map<String, CurrentState> _curStateMetaMap;
 
   public CurrentStateOutput() {
-    _currentStateMap = new HashMap<String, Map<Partition, Map<String, 
String>>>();
-    _pendingStateMap = new HashMap<String, Map<Partition, Map<String, 
Message>>>();
-    _cancellationStateMap = new HashMap<String, Map<Partition, Map<String, 
Message>>>();
-    _resourceStateModelMap = new HashMap<String, String>();
-    _curStateMetaMap = new HashMap<String, CurrentState>();
-    _requestedStateMap = new HashMap<String, Map<Partition, Map<String, 
String>>>();
-    _infoMap = new HashMap<String, Map<Partition, Map<String, String>>>();
+    _currentStateMap = new HashMap<>();
+    _pendingStateMap = new HashMap<>();
+    _cancellationStateMap = new HashMap<>();
+    _resourceStateModelMap = new HashMap<>();
+    _curStateMetaMap = new HashMap<>();
+    _requestedStateMap = new HashMap<>();
+    _infoMap = new HashMap<>();
   }
 
   public void setResourceStateModelDef(String resourceName, String 
stateModelDefName) {
@@ -331,7 +331,7 @@ public class CurrentStateOutput {
 
   private Map<String, Integer> getPartitionCountWithState(String 
resourceStateModel, String state,
       Map<String, Map<Partition, Map<String, Object>>> stateMap) {
-    Map<String, Integer> currentPartitionCount = new HashMap<String, 
Integer>();
+    Map<String, Integer> currentPartitionCount = new HashMap<>();
     for (String resource : stateMap.keySet()) {
       String stateModel = _resourceStateModelMap.get(resource);
       if ((stateModel != null && stateModel.equals(resourceStateModel)) || 
(stateModel == null

http://git-wip-us.apache.org/repos/asf/helix/blob/d0a3c0d1/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
 
b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
index ca0adfe..d60c20e 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
@@ -25,7 +25,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 
-import org.apache.helix.HelixException;
 import org.apache.helix.HelixManager;
 import org.apache.helix.api.config.StateTransitionTimeoutConfig;
 import org.apache.helix.controller.pipeline.AbstractBaseStage;
@@ -187,7 +186,7 @@ public class MessageGenerationPhase extends 
AbstractBaseStage {
               message.setExecutionTimeout(timeout);
             }
 
-            message.getRecord().setSimpleField("ClusterEventName", 
event.getEventType().name());
+            message.setAttribute(Message.Attributes.ClusterEventName, 
event.getEventType().name());
             // output.addMessage(resourceName, partition, message);
             if (!messageMap.containsKey(desiredState)) {
               messageMap.put(desiredState, new ArrayList<Message>());

http://git-wip-us.apache.org/repos/asf/helix/blob/d0a3c0d1/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java
 
b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java
index 94b645d..1a94dcb 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java
@@ -22,17 +22,20 @@ package org.apache.helix.controller.stages;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 
 import org.apache.helix.controller.pipeline.AbstractBaseStage;
 import org.apache.helix.controller.pipeline.StageException;
+import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.LiveInstance;
 import org.apache.helix.model.Message;
 import org.apache.helix.model.Partition;
 import org.apache.helix.model.Resource;
+import org.apache.helix.model.ResourceConfig;
 import org.apache.helix.model.StateModelDefinition;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -40,7 +43,7 @@ import org.slf4j.LoggerFactory;
 public class MessageSelectionStage extends AbstractBaseStage {
   private static final Logger LOG = 
LoggerFactory.getLogger(MessageSelectionStage.class);
 
-  public static class Bounds {
+  protected static class Bounds {
     private int upper;
     private int lower;
 
@@ -49,26 +52,6 @@ public class MessageSelectionStage extends AbstractBaseStage 
{
       this.upper = upper;
     }
 
-    public void increaseUpperBound() {
-      upper++;
-    }
-
-    public void increaseLowerBound() {
-      lower++;
-    }
-
-    public void decreaseUpperBound() {
-      upper--;
-    }
-
-    public void decreaseLowerBound() {
-      lower--;
-    }
-
-    public int getLowerBound() {
-      return lower;
-    }
-
     public int getUpperBound() {
       return upper;
     }
@@ -100,11 +83,11 @@ public class MessageSelectionStage extends 
AbstractBaseStage {
           computeStateConstraints(stateModelDef, idealState, cache);
       for (Partition partition : resource.getPartitions()) {
         List<Message> messages = messageGenOutput.getMessages(resourceName, 
partition);
-        List<Message> selectedMessages =
-            selectMessages(cache.getLiveInstances(),
-                currentStateOutput.getCurrentStateMap(resourceName, partition),
-                currentStateOutput.getPendingMessageMap(resourceName, 
partition), messages,
-                stateConstraints, stateTransitionPriorities, 
stateModelDef.getInitialState());
+        List<Message> selectedMessages = 
selectMessages(cache.getLiveInstances(),
+            currentStateOutput.getCurrentStateMap(resourceName, partition),
+            currentStateOutput.getPendingMessageMap(resourceName, partition), 
messages,
+            stateConstraints, stateTransitionPriorities, stateModelDef,
+            resource.isP2PMessageEnabled());
         output.addMessages(resourceName, partition, selectedMessages);
       }
     }
@@ -124,34 +107,36 @@ public class MessageSelectionStage extends 
AbstractBaseStage {
   }
 
   // TODO: This method deserves its own class. The class should not understand 
helix but
-  // just be
-  // able to solve the problem using the algo. I think the method is following 
that but if
-  // we don't move it to another class its quite easy to break that contract
+  // just be able to solve the problem using the algo. I think the method is 
following that
+  // but if we don't move it to another class its quite easy to break that 
contract
   /**
    * greedy message selection algorithm: 1) calculate CS+PS state 
lower/upper-bounds 2)
    * group messages by state transition and sorted by priority 3) from highest 
priority to
    * lowest, for each message group with the same transition add message one 
by one and
    * make sure state constraint is not violated update state 
lower/upper-bounds when a new
-   * message is selected
+   * message is selected.
+   *
+   * @param liveInstances
    * @param currentStates
-   * @param pendingStates
+   * @param pendingMessages
    * @param messages
    * @param stateConstraints
-   *          : STATE -> bound (lower:upper)
    * @param stateTransitionPriorities
-   *          : FROME_STATE-TO_STATE -> priority
-   * @return: selected messages
+   * @param stateModelDef
+   * @return
    */
   List<Message> selectMessages(Map<String, LiveInstance> liveInstances,
       Map<String, String> currentStates, Map<String, Message> pendingMessages,
       List<Message> messages, Map<String, Bounds> stateConstraints,
-      final Map<String, Integer> stateTransitionPriorities, String 
initialState) {
+      final Map<String, Integer> stateTransitionPriorities, 
StateModelDefinition stateModelDef,
+      boolean p2pMessageEnabled) {
     if (messages == null || messages.isEmpty()) {
       return Collections.emptyList();
     }
-    List<Message> selectedMessages = new ArrayList<Message>();
-    Map<String, Integer> stateCnts = new HashMap<String, Integer>();
+    List<Message> selectedMessages = new ArrayList<>();
+    Map<String, Integer> stateCnts = new HashMap<>();
 
+    String initialState = stateModelDef.getInitialState();
     // count currentState, if no currentState, count as in initialState
     for (String instance : liveInstances.keySet()) {
       String state = initialState;
@@ -171,7 +156,10 @@ public class MessageSelectionStage extends 
AbstractBaseStage {
 
     // group messages based on state transition priority
     Map<Integer, List<Message>> messagesGroupByStateTransitPriority =
-        new TreeMap<Integer, List<Message>>();
+        new TreeMap<>();
+
+    /* record all state transition messages that transition a replica from 
top-state */
+    List<Message> fromTopStateMessages = new LinkedList<>();
     for (Message message : messages) {
       if 
(message.getMsgType().equals(Message.MessageType.STATE_TRANSITION_CANCELLATION.name()))
 {
         selectedMessages.add(message);
@@ -190,6 +178,10 @@ public class MessageSelectionStage extends 
AbstractBaseStage {
         messagesGroupByStateTransitPriority.put(priority, new 
ArrayList<Message>());
       }
       messagesGroupByStateTransitPriority.get(priority).add(message);
+
+      if (fromState.equals(stateModelDef.getTopState())) {
+        fromTopStateMessages.add(message);
+      }
     }
 
     // select messages
@@ -200,8 +192,18 @@ public class MessageSelectionStage extends 
AbstractBaseStage {
         if (stateConstraints.containsKey(toState)) {
           int newCnt = (stateCnts.containsKey(toState) ? 
stateCnts.get(toState) + 1 : 1);
           if (newCnt > stateConstraints.get(toState).getUpperBound()) {
-            LOG.info("Reach upper_bound: " + 
stateConstraints.get(toState).getUpperBound()
-                + ", not send message: " + message);
+            if (p2pMessageEnabled && 
toState.equals(stateModelDef.getTopState())
+                && stateModelDef.isSingleTopStateModel()) {
+              // attach this message as a relay message to the message to 
transition off current top-state replica
+              if (fromTopStateMessages.size() > 0) {
+                Message fromTopStateMsg = fromTopStateMessages.get(0);
+                fromTopStateMsg.attachRelayMessage(message.getTgtName(), 
message);
+                fromTopStateMessages.remove(0);
+              }
+            } else {
+              // reach upper-bound of message for the topState, will not send 
the message
+              LOG.info("Reach upper_bound: " + 
stateConstraints.get(toState).getUpperBound() + ", not send message: " + 
message);
+            }
             continue;
           }
         }

http://git-wip-us.apache.org/repos/asf/helix/blob/d0a3c0d1/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
 
b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
index c6b4054..a6c2779 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
@@ -56,7 +56,6 @@ public class ResourceComputationStage extends 
AbstractBaseStage {
     Map<String, Resource> resourceMap = new LinkedHashMap<String, Resource>();
     Map<String, Resource> resourceToRebalance = new LinkedHashMap<>();
 
-
     if (idealStates != null && idealStates.size() > 0) {
       for (IdealState idealState : idealStates.values()) {
         if (idealState == null) {
@@ -65,7 +64,8 @@ public class ResourceComputationStage extends 
AbstractBaseStage {
         Set<String> partitionSet = idealState.getPartitionSet();
         String resourceName = idealState.getResourceName();
         if (!resourceMap.containsKey(resourceName)) {
-          Resource resource = new Resource(resourceName);
+          Resource resource = new Resource(resourceName, 
cache.getClusterConfig(),
+              cache.getResourceConfig(resourceName));
           resourceMap.put(resourceName, resource);
 
           if (!idealState.isValid() && !cache.isTaskCache()

http://git-wip-us.apache.org/repos/asf/helix/blob/d0a3c0d1/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
 
b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
index ee0f1e5..dfa747e 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
@@ -144,7 +144,15 @@ public class TaskAssignmentStage extends AbstractBaseStage 
{
           "Sending Message " + message.getMsgId() + " to " + 
message.getTgtName() + " transit "
               + message.getResourceName() + "." + message.getPartitionName() + 
"|" + message
               .getPartitionNames() + " from:" + message.getFromState() + " 
to:" + message
-              .getToState() + ", Message type ");
+              .getToState() + ", relayMessages: " + 
message.getRelayMessages().size());
+      if (message.hasRelayMessages()) {
+        for (Message msg : message.getRelayMessages().values()) {
+          logger.info("Sending Relay Message " + msg.getMsgId() + " to " + 
msg.getTgtName() + " transit "
+              + msg.getResourceName() + "." + msg.getPartitionName() + "|" + 
msg.getPartitionNames() + " from:"
+              + msg.getFromState() + " to:" + msg.getToState() + ", relayFrom: 
" + msg.getRelaySrcHost()
+              + ", attached to message: " + message.getMsgId());
+        }
+      }
 
       keys.add(keyBuilder.message(message.getTgtName(), message.getId()));
     }

http://git-wip-us.apache.org/repos/asf/helix/blob/d0a3c0d1/helix-core/src/main/java/org/apache/helix/messaging/handling/BatchMessageHandler.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/messaging/handling/BatchMessageHandler.java
 
b/helix-core/src/main/java/org/apache/helix/messaging/handling/BatchMessageHandler.java
index 9fed082..3979a4b 100644
--- 
a/helix-core/src/main/java/org/apache/helix/messaging/handling/BatchMessageHandler.java
+++ 
b/helix-core/src/main/java/org/apache/helix/messaging/handling/BatchMessageHandler.java
@@ -197,6 +197,7 @@ public class BatchMessageHandler extends MessageHandler {
         result.setSuccess(isBatchTaskSucceed);
       }
 
+      result.setCompleteTime(System.currentTimeMillis());
       // pass task-result to post-handle-msg
       _notificationContext.add(MapKey.HELIX_TASK_RESULT.toString(), result);
       postHandleMessage();

http://git-wip-us.apache.org/repos/asf/helix/blob/d0a3c0d1/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
 
b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
index 26df112..ece7ac7 100644
--- 
a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
+++ 
b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
@@ -94,6 +94,12 @@ public class HelixStateTransitionHandler extends 
MessageHandler {
       throw new HelixException(errorMessage);
     }
 
+    logger.info(
+        "handling message: " + _message.getMsgId() + " transit " + 
_message.getResourceName()
+            + "." + _message.getPartitionName() + "|" + 
_message.getPartitionNames() + " from:"
+            + _message.getFromState() + " to:" + _message.getToState() + ", 
relayedFrom: "
+            + _message.getRelaySrcHost());
+
     HelixDataAccessor accessor = _manager.getHelixDataAccessor();
 
     String partitionName = _message.getPartitionName();
@@ -188,9 +194,16 @@ public class HelixStateTransitionHandler extends 
MessageHandler {
 
     // Set the INFO property and mark the end time, previous state of the 
state transition
     _currentStateDelta.setInfo(partitionKey, taskResult.getInfo());
-    _currentStateDelta.setEndTime(partitionKey, System.currentTimeMillis());
+    _currentStateDelta.setEndTime(partitionKey, taskResult.getCompleteTime());
     _currentStateDelta.setPreviousState(partitionKey, _message.getFromState());
 
+    // add host name this state transition is triggered by.
+    if 
(Message.MessageType.RELAYED_MESSAGE.name().equals(_message.getMsgSubType())) {
+      _currentStateDelta.setTriggerHost(partitionKey, 
_message.getRelaySrcHost());
+    } else {
+      _currentStateDelta.setTriggerHost(partitionKey, _message.getMsgSrc());
+    }
+
     if (taskResult.isSuccess()) {
       // String fromState = message.getFromState();
       String toState = _message.getToState();
@@ -339,6 +352,7 @@ public class HelixStateTransitionHandler extends 
MessageHandler {
         }
       }
 
+      taskResult.setCompleteTime(System.currentTimeMillis());
       // add task result to context for postHandling
       context.add(MapKey.HELIX_TASK_RESULT.toString(), taskResult);
       postHandleMessage();

http://git-wip-us.apache.org/repos/asf/helix/blob/d0a3c0d1/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java 
b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java
index 3272ca4..7b1853f 100644
--- 
a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java
+++ 
b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java
@@ -20,15 +20,19 @@ package org.apache.helix.messaging.handling;
  */
 
 import java.util.Date;
+import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
+import org.apache.helix.AccessOption;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
 import org.apache.helix.HelixRollbackException;
 import org.apache.helix.InstanceType;
 import org.apache.helix.NotificationContext;
 import org.apache.helix.NotificationContext.MapKey;
+import org.apache.helix.PropertyKey;
 import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.PropertyPathBuilder;
 import org.apache.helix.messaging.handling.MessageHandler.ErrorCode;
 import org.apache.helix.messaging.handling.MessageHandler.ErrorType;
 import org.apache.helix.model.Message;
@@ -37,6 +41,7 @@ import org.apache.helix.model.Message.MessageType;
 import org.apache.helix.monitoring.StateTransitionContext;
 import org.apache.helix.monitoring.StateTransitionDataPoint;
 import org.apache.helix.monitoring.mbeans.ParticipantMessageMonitor;
+import org.apache.helix.task.TaskResult;
 import org.apache.helix.util.StatusUpdateUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -161,8 +166,12 @@ public class HelixTask implements MessageTask {
         }
       }
 
+      // forward relay messages attached to this message to other participants
+      if (taskResult.isSuccess()) {
+        forwardRelayMessages(accessor, _message, taskResult.getCompleteTime());
+      }
+
       if (_message.getAttribute(Attributes.PARENT_MSG_ID) == null) {
-        // System.err.println("\t[dbg]remove msg: " + getTaskId());
         removeMessageFromZk(accessor, _message);
         reportMessageStat(_manager, _message, taskResult);
         sendReply(accessor, _message, taskResult);
@@ -196,11 +205,46 @@ public class HelixTask implements MessageTask {
 
   private void removeMessageFromZk(HelixDataAccessor accessor, Message 
message) {
     Builder keyBuilder = accessor.keyBuilder();
+    PropertyKey msgKey;
     if (message.getTgtName().equalsIgnoreCase("controller")) {
-      // TODO: removeProperty returns boolean
-      
accessor.removeProperty(keyBuilder.controllerMessage(message.getMsgId()));
+      msgKey = keyBuilder.controllerMessage(message.getMsgId());
     } else {
-      accessor.removeProperty(keyBuilder.message(_manager.getInstanceName(), 
message.getMsgId()));
+      msgKey = keyBuilder.message(_manager.getInstanceName(), 
message.getMsgId());
+    }
+    boolean success = accessor.removeProperty(msgKey);
+    if (!success) {
+      logger.warn("Failed to delete message " + message.getId() + " from zk!");
+    }
+  }
+
+  private void forwardRelayMessages(HelixDataAccessor accessor, Message 
message,
+      long taskCompletionTime) {
+    if (message.hasRelayMessages()) {
+      Map<String, Message> relayMessages = message.getRelayMessages();
+      Builder keyBuilder = accessor.keyBuilder();
+
+      // Ignore all relay messages if participant's session has changed.
+      if (!_manager.getSessionId().equals(message.getTgtSessionId())) {
+        return;
+      }
+
+      for (String instance : relayMessages.keySet()) {
+        Message msg = relayMessages.get(instance);
+        if (msg.getMsgSubType().equals(MessageType.RELAYED_MESSAGE.name())) {
+          msg.setRelayTime(taskCompletionTime);
+          if (msg.isExpired()) {
+            logger.info(
+                "Relay message expired, ignore it! " + msg.getId() + " to 
instance " + instance);
+            continue;
+          }
+          PropertyKey msgKey = keyBuilder.message(instance, msg.getId());
+          boolean success = accessor.getBaseDataAccessor()
+              .create(msgKey.getPath(), msg.getRecord(), 
AccessOption.PERSISTENT);
+          if (!success) {
+            logger.warn("Failed to send relay message " + msg.getId() + " to " 
+ instance);
+          }
+        }
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/d0a3c0d1/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
 
b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
index 4c67a5e..c0be583 100644
--- 
a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
+++ 
b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
@@ -790,8 +790,15 @@ public class HelixTaskExecutor implements MessageListener, 
TaskExecutor {
         continue;
       }
 
-      String tgtSessionId = message.getTgtSessionId();
+      if (message.isExpired()) {
+        LOG.info(
+            "Dropping expired message. mid: " + message.getId() + ", from: " + 
message.getMsgSrc() + " relayed from: "
+                + message.getRelaySrcHost());
+        reportAndRemoveMessage(message, accessor, instanceName, 
ProcessedMessageState.DISCARDED);
+        continue;
+      }
 
+      String tgtSessionId = message.getTgtSessionId();
       // sessionId mismatch normally means message comes from expired session, 
just remove it
       if (!sessionId.equals(tgtSessionId) && !tgtSessionId.equals("*")) {
         String warningMessage =

http://git-wip-us.apache.org/repos/asf/helix/blob/d0a3c0d1/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskResult.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskResult.java
 
b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskResult.java
index 5ed6140..da96eaf 100644
--- 
a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskResult.java
+++ 
b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskResult.java
@@ -31,6 +31,7 @@ public class HelixTaskResult {
   private Map<String, String> _taskResultMap = new HashMap<String, String>();
   private boolean _interrupted = false;
   Exception _exception = null;
+  private long _completeTime = -1;
 
   public boolean isSuccess() {
     return _success;
@@ -83,4 +84,12 @@ public class HelixTaskResult {
   public Exception getException() {
     return _exception;
   }
+
+  public long getCompleteTime() {
+    return _completeTime;
+  }
+
+  public void setCompleteTime(long completeTime) {
+    _completeTime = completeTime;
+  }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/d0a3c0d1/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java 
b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
index 2a97145..fad253d 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
@@ -28,6 +28,7 @@ import java.util.List;
 import java.util.Map;
 import org.apache.helix.HelixProperty;
 import org.apache.helix.ZNRecord;
+import org.apache.helix.api.config.HelixConfigProperty;
 import org.apache.helix.api.config.StateTransitionThrottleConfig;
 import org.apache.helix.api.config.StateTransitionTimeoutConfig;
 
@@ -492,6 +493,28 @@ public class ClusterConfig extends HelixProperty {
   }
 
   /**
+   * Whether the P2P state transition message is enabled for all resources in 
this cluster. By
+   * default it is disabled if not set.
+   *
+   * @return
+   */
+  public boolean isP2PMessageEnabled() {
+    return 
_record.getBooleanField(HelixConfigProperty.P2P_MESSAGE_ENABLED.name(), false);
+  }
+
+  /**
+   * Enable P2P state transition message for all resources in this cluster. 
P2P State Transition
+   * message can reduce the top-state replica unavailable time during 
top-state handoff period. This
+   * only applies for those resources with state models that only have a 
single top-state replica,
+   * such as MasterSlave or LeaderStandy models. By default P2P message is 
disabled if not set.
+   *
+   * @param enabled
+   */
+  public void enableP2PMessage(boolean enabled) {
+    _record.setBooleanField(HelixConfigProperty.P2P_MESSAGE_ENABLED.name(), 
enabled);
+  }
+
+  /**
    * Get IdealState rules defined in the cluster config.
    *
    * @return

http://git-wip-us.apache.org/repos/asf/helix/blob/d0a3c0d1/helix-core/src/main/java/org/apache/helix/model/CurrentState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/CurrentState.java 
b/helix-core/src/main/java/org/apache/helix/model/CurrentState.java
index 34b525e..c227060 100644
--- a/helix-core/src/main/java/org/apache/helix/model/CurrentState.java
+++ b/helix-core/src/main/java/org/apache/helix/model/CurrentState.java
@@ -47,7 +47,8 @@ public class CurrentState extends HelixProperty {
     RESOURCE,
     START_TIME,
     END_TIME,
-    PREVIOUS_STATE // ,
+    PREVIOUS_STATE,
+    TRIGGERED_BY// ,
              // BUCKET_SIZE
   }
 
@@ -134,6 +135,10 @@ public class CurrentState extends HelixProperty {
     return endTime == null ? -1L : Long.parseLong(endTime);
   }
 
+  public String getTriggerHost(String partitionName) {
+    return getProperty(partitionName, CurrentStateProperty.TRIGGERED_BY);
+  }
+
   public String getPreviousState(String partitionName) {
     return getProperty(partitionName, CurrentStateProperty.PREVIOUS_STATE);
   }
@@ -187,6 +192,10 @@ public class CurrentState extends HelixProperty {
     setProperty(partitionName, CurrentStateProperty.END_TIME, 
String.valueOf(endTime));
   }
 
+  public void setTriggerHost(String partitionName, String triggerHost) {
+    setProperty(partitionName, CurrentStateProperty.TRIGGERED_BY, triggerHost);
+  }
+
   public void setPreviousState(String partitionName, String state) {
     setProperty(partitionName, CurrentStateProperty.PREVIOUS_STATE, state);
   }

http://git-wip-us.apache.org/repos/asf/helix/blob/d0a3c0d1/helix-core/src/main/java/org/apache/helix/model/Message.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/Message.java 
b/helix-core/src/main/java/org/apache/helix/model/Message.java
index 668242a..d987c54 100644
--- a/helix-core/src/main/java/org/apache/helix/model/Message.java
+++ b/helix-core/src/main/java/org/apache/helix/model/Message.java
@@ -19,12 +19,16 @@ package org.apache.helix.model;
  * under the License.
  */
 
+import com.google.common.collect.Lists;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.Date;
+import java.util.HashMap;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
 
 import org.apache.helix.HelixException;
@@ -50,7 +54,9 @@ public class Message extends HelixProperty {
     TASK_REPLY,
     NO_OP,
     PARTICIPANT_ERROR_REPORT,
-    PARTICIPANT_SESSION_CHANGE
+    PARTICIPANT_SESSION_CHANGE,
+    CHAINED_MESSAGE, // this is a message subtype
+    RELAYED_MESSAGE
   }
 
   /**
@@ -58,6 +64,7 @@ public class Message extends HelixProperty {
    */
   public enum Attributes {
     MSG_ID,
+    RELAY_MSG_ID,
     SRC_SESSION_ID,
     TGT_SESSION_ID,
     SRC_NAME,
@@ -84,7 +91,12 @@ public class Message extends HelixProperty {
     STATE_MODEL_FACTORY_NAME,
     BUCKET_SIZE,
     PARENT_MSG_ID, // used for group message mode
-    INNER_MESSAGE
+    ClusterEventName,
+    INNER_MESSAGE,
+    RELAY_PARTICIPANTS,
+    RELAY_TIME,
+    RELAY_FROM,
+    EXPIRY_PERIOD
   }
 
   /**
@@ -96,6 +108,9 @@ public class Message extends HelixProperty {
     UNPROCESSABLE // get exception when create handler
   }
 
+  // default expiry time period for a relay message.
+  public static final long RELAY_MESSAGE_DEFAULT_EXPIRY = 5 * 1000;  //5 second
+
   /**
    * Compares the creation time of two Messages
    */
@@ -665,6 +680,171 @@ public class Message extends HelixProperty {
   }
 
   /**
+   * Get the completion time of previous task associated with this message.
+   * This applies only when this is a relay message,
+   * which specified the completion time of the task running on the 
participant that sent this relay message.
+   *
+   * @return
+   */
+  public long getRelayTime() {
+    return _record.getLongField(Attributes.RELAY_TIME.name(), -1);
+  }
+
+  /**
+   * Set the completion time of previous task associated with this message.
+   * This applies only when this is a relay message,
+   * which specified the completion time of the task running on the 
participant that sent this relay message.
+   *
+   * @param completionTime
+   */
+  public void setRelayTime(long completionTime) {
+    _record.setLongField(Attributes.RELAY_TIME.name(), completionTime);
+  }
+
+  /**
+   * Attach a relayed message and its destination participant to this message.
+   *
+   * WARNNING: only content in SimpleFields of relayed message will be carried 
over and sent,
+   * all contents in either ListFields or MapFields will be ignored.
+   *
+   * @param instance destination participant name
+   * @param message relayed message.
+   */
+  public void attachRelayMessage(String instance, Message message) {
+    List<String> relayList = 
_record.getListField(Attributes.RELAY_PARTICIPANTS.name());
+    if (relayList == null) {
+      relayList = Collections.EMPTY_LIST;
+    }
+    Set<String> relayParticipants = new LinkedHashSet<>(relayList);
+    relayParticipants.add(instance);
+    Map<String, String> messageInfo = message.getRecord().getSimpleFields();
+    messageInfo.put(Attributes.RELAY_MSG_ID.name(), message.getId());
+    messageInfo.put(Attributes.MSG_SUBTYPE.name(), 
MessageType.RELAYED_MESSAGE.name());
+    messageInfo.put(Attributes.RELAY_FROM.name(), getTgtName());
+    messageInfo
+        .put(Attributes.EXPIRY_PERIOD.name(), 
String.valueOf(RELAY_MESSAGE_DEFAULT_EXPIRY));
+    _record.setMapField(instance, messageInfo);
+    _record.setListField(Attributes.RELAY_PARTICIPANTS.name(),
+        Lists.newArrayList(relayParticipants));
+  }
+
+  /**
+   * Get relay message attached for the given instance.
+   *
+   * @param instance
+   * @return null if no message for the instance
+   */
+  public Message getRelayMessage(String instance) {
+    Map<String, String> messageInfo = _record.getMapField(instance);
+    if (messageInfo != null) {
+      String id = messageInfo.get(Attributes.RELAY_MSG_ID.name());
+      if (id == null) {
+        id = messageInfo.get(Attributes.MSG_ID.name());
+        if (id == null) {
+          return null;
+        }
+      }
+      ZNRecord record = new ZNRecord(id);
+      record.setSimpleFields(messageInfo);
+      return new Message(record);
+    }
+
+    return null;
+  }
+
+  public String getRelaySrcHost() {
+    return _record.getSimpleField(Attributes.RELAY_FROM.name());
+  }
+
+  /**
+   * Get all relay messages attached to this message as a map 
(instance->message).
+   *
+   * @return map of instanceName->message, empty map if none.
+   */
+  public Map<String, Message> getRelayMessages() {
+    Map<String, Message> relayMessageMap = new HashMap<>();
+    List<String> relayParticipants = 
_record.getListField(Attributes.RELAY_PARTICIPANTS.name());
+    if (relayParticipants != null) {
+      for (String p : relayParticipants) {
+        Message msg = getRelayMessage(p);
+        if (p != null) {
+          relayMessageMap.put(p, msg);
+        }
+      }
+    }
+
+    return relayMessageMap;
+  }
+
+  /**
+   * Whether there are any relay message attached to this message.
+   *
+   * @return
+   */
+  public boolean hasRelayMessages() {
+    List<String> relayHosts = 
_record.getListField(Attributes.RELAY_PARTICIPANTS.name());
+    return (relayHosts != null && relayHosts.size() > 0);
+  }
+
+  /**
+   * Whether this message is a relay message.
+   * @return
+   */
+  public boolean isRelayMessage() {
+    String subType = _record.getStringField(Attributes.MSG_SUBTYPE.name(), 
null);
+    String relayFrom = _record.getStringField(Attributes.RELAY_FROM.name(), 
null);
+    return MessageType.RELAYED_MESSAGE.name().equals(subType) && (relayFrom != 
null);
+  }
+
+  /**
+   * Whether a message is expired.
+   *
+   * A message is expired if:
+   *   1) creationTime + expiryPeriod > current time
+   *   or
+   *   2) relayTime + expiryPeriod > current time iff it is relay message.
+   *
+   * @return
+   */
+  public boolean isExpired() {
+    long expiry = getExpiryPeriod();
+    if (expiry < 0) {
+      return false;
+    }
+
+    long current = System.currentTimeMillis();
+
+    // use relay time if this is a relay message
+    if (isRelayMessage()) {
+      long relayTime = getRelayTime();
+      return relayTime <= 0 || (relayTime + expiry < current);
+    }
+
+    return getCreateTimeStamp() + expiry < current;
+  }
+
+  /**
+   * Get the expiry period (in milliseconds)
+   *
+   * @return
+   */
+  public long getExpiryPeriod() {
+    return _record.getLongField(Attributes.EXPIRY_PERIOD.name(), -1);
+  }
+
+  /**
+   * Set expiry period for this message.
+   * A message will be expired after this period of time from either its 1) 
creationTime or 2)
+   * relayTime if it is relay message.
+   * Default is -1 if it is not set.
+   *
+   * @param expiry
+   */
+  public void setExpiryPeriod(long expiry) {
+    _record.setLongField(Attributes.EXPIRY_PERIOD.name(), expiry);
+  }
+
+  /**
    * Check if this message is targetted for a controller
    * @return true if this is a controller message, false otherwise
    */

http://git-wip-us.apache.org/repos/asf/helix/blob/d0a3c0d1/helix-core/src/main/java/org/apache/helix/model/Resource.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/Resource.java 
b/helix-core/src/main/java/org/apache/helix/model/Resource.java
index 3af830a..b911244 100644
--- a/helix-core/src/main/java/org/apache/helix/model/Resource.java
+++ b/helix-core/src/main/java/org/apache/helix/model/Resource.java
@@ -24,16 +24,14 @@ import java.util.LinkedHashMap;
 import java.util.Map;
 
 import org.apache.helix.HelixConstants;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
+import org.apache.helix.api.config.HelixConfigProperty;
 /**
  * A resource contains a set of partitions and its replicas are managed by a 
state model
  */
 public class Resource {
-  private static Logger LOG = LoggerFactory.getLogger(Resource.class);
-
   private final String _resourceName;
+  private ClusterConfig _clusterConfig;
+  private ResourceConfig _resourceConfig;
   private final Map<String, Partition> _partitionMap;
   private String _stateModelDefRef;
   private String _stateModelFactoryName;
@@ -48,7 +46,18 @@ public class Resource {
    */
   public Resource(String resourceName) {
     this._resourceName = resourceName;
-    this._partitionMap = new LinkedHashMap<String, Partition>();
+    this._partitionMap = new LinkedHashMap<>();
+  }
+
+  /**
+   * Instantiate a resource by its name
+   *
+   * @param resourceName the name of the resource that identifies it
+   */
+  public Resource(String resourceName, ClusterConfig clusterConfig, 
ResourceConfig resourceConfig) {
+    this(resourceName);
+    _clusterConfig = clusterConfig;
+    _resourceConfig = resourceConfig;
   }
 
   /**
@@ -185,6 +194,25 @@ public class Resource {
     _resourceGroupName = resourceGroupName;
   }
 
+  /**
+   * Whether P2P state transition message is enabled.
+   *
+   * @return
+   */
+  public boolean isP2PMessageEnabled() {
+    String enabledInResource = _resourceConfig != null ?
+        
_resourceConfig.getRecord().getSimpleField(HelixConfigProperty.P2P_MESSAGE_ENABLED.name())
 : null;
+
+    if (enabledInResource != null) {
+      return Boolean.valueOf(enabledInResource);
+    }
+
+    String enabledInCluster = _clusterConfig != null ?
+        
_clusterConfig.getRecord().getSimpleField(HelixConfigProperty.P2P_MESSAGE_ENABLED.name())
 : null;
+
+    return enabledInCluster != null ? Boolean.valueOf(enabledInCluster) : 
false;
+  }
+
   @Override
   public String toString() {
     StringBuilder sb = new StringBuilder();

http://git-wip-us.apache.org/repos/asf/helix/blob/d0a3c0d1/helix-core/src/main/java/org/apache/helix/model/ResourceConfig.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/model/ResourceConfig.java 
b/helix-core/src/main/java/org/apache/helix/model/ResourceConfig.java
index 8278be3..274640c 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ResourceConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ResourceConfig.java
@@ -20,13 +20,13 @@ package org.apache.helix.model;
  */
 
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
 import java.util.TreeMap;
 import org.apache.helix.HelixProperty;
 import org.apache.helix.ZNRecord;
+import org.apache.helix.api.config.HelixConfigProperty;
 import org.apache.helix.api.config.RebalanceConfig;
 import org.apache.helix.api.config.StateTransitionTimeoutConfig;
 import org.slf4j.Logger;
@@ -61,7 +61,6 @@ public class ResourceConfig extends HelixProperty {
   }
 
   private static final Logger _logger = 
LoggerFactory.getLogger(ResourceConfig.class.getName());
-
   /**
    * Instantiate for a specific instance
    *
@@ -95,13 +94,18 @@ public class ResourceConfig extends HelixProperty {
       Boolean helixEnabled, String resourceGroupName, String resourceType,
       Boolean groupRoutingEnabled, Boolean externalViewDisabled,
       RebalanceConfig rebalanceConfig, StateTransitionTimeoutConfig 
stateTransitionTimeoutConfig,
-      Map<String, List<String>> listFields, Map<String, Map<String, String>> 
mapFields) {
+      Map<String, List<String>> listFields, Map<String, Map<String, String>> 
mapFields,
+      Boolean p2pMessageEnabled) {
     super(resourceId);
 
     if (monitorDisabled != null) {
       
_record.setBooleanField(ResourceConfigProperty.MONITORING_DISABLED.name(), 
monitorDisabled);
     }
 
+    if (p2pMessageEnabled != null) {
+      _record.setBooleanField(HelixConfigProperty.P2P_MESSAGE_ENABLED.name(), 
p2pMessageEnabled);
+    }
+
     if (numPartitions > 0) {
       _record.setIntField(ResourceConfigProperty.NUM_PARTITIONS.name(), 
numPartitions);
     }
@@ -180,6 +184,15 @@ public class ResourceConfig extends HelixProperty {
     return 
_record.getBooleanField(ResourceConfigProperty.MONITORING_DISABLED.toString(), 
false);
   }
 
+  /**
+   * Whether the P2P state transition message is enabled for this resource.
+   * By default it is disabled if not set.
+   *
+   * @return
+   */
+  public boolean isP2PMessageEnabled() {
+    return 
_record.getBooleanField(HelixConfigProperty.P2P_MESSAGE_ENABLED.name(), false);
+  }
 
   /**
    * Get the associated resource
@@ -458,6 +471,7 @@ public class ResourceConfig extends HelixProperty {
     private String _resourceType;
     private Boolean _groupRoutingEnabled;
     private Boolean _externalViewDisabled;
+    private Boolean _p2pMessageEnabled;
     private RebalanceConfig _rebalanceConfig;
     private StateTransitionTimeoutConfig _stateTransitionTimeoutConfig;
     private Map<String, List<String>> _preferenceLists;
@@ -472,6 +486,17 @@ public class ResourceConfig extends HelixProperty {
       return this;
     }
 
+    /**
+     * Enable/Disable the p2p state transition message for this resource.
+     * By default it is disabled if not set.
+     *
+     * @param enabled
+     */
+    public Builder setP2PMessageEnabled(boolean enabled) {
+      _p2pMessageEnabled = enabled;
+      return this;
+    }
+
     public Boolean isMonitorDisabled() {
       return _monitorDisabled;
     }
@@ -693,7 +718,7 @@ public class ResourceConfig extends HelixProperty {
           _stateModelFactoryName, _numReplica, _minActiveReplica, 
_maxPartitionsPerInstance,
           _instanceGroupTag, _helixEnabled, _resourceGroupName, _resourceType, 
_groupRoutingEnabled,
           _externalViewDisabled, _rebalanceConfig, 
_stateTransitionTimeoutConfig, _preferenceLists,
-          _mapFields);
+          _mapFields, _p2pMessageEnabled);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/d0a3c0d1/helix-core/src/main/java/org/apache/helix/model/StateModelDefinition.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/model/StateModelDefinition.java 
b/helix-core/src/main/java/org/apache/helix/model/StateModelDefinition.java
index 01a3746..2fbbfcb 100644
--- a/helix-core/src/main/java/org/apache/helix/model/StateModelDefinition.java
+++ b/helix-core/src/main/java/org/apache/helix/model/StateModelDefinition.java
@@ -95,8 +95,8 @@ public class StateModelDefinition extends HelixProperty {
         
record.getListField(StateModelDefinitionProperty.STATE_PRIORITY_LIST.toString());
     _stateTransitionPriorityList =
         
record.getListField(StateModelDefinitionProperty.STATE_TRANSITION_PRIORITYLIST.toString());
-    _stateTransitionTable = new HashMap<String, Map<String, String>>();
-    _statesCountMap = new HashMap<String, String>();
+    _stateTransitionTable = new HashMap<>();
+    _statesCountMap = new HashMap<>();
     if (_statesPriorityList != null) {
       int priority = 1;
       for (String state : _statesPriorityList) {
@@ -203,6 +203,22 @@ public class StateModelDefinition extends HelixProperty {
   }
 
   /**
+   * Whether this state model allows at most a single replica in the top-state?
+   *
+   * @return
+   */
+  public boolean isSingleTopStateModel() {
+    int topStateCount = 0;
+    try {
+      topStateCount = Integer.valueOf(_statesCountMap.get(getTopState()));
+    } catch (NumberFormatException ex) {
+
+    }
+
+    return topStateCount == 1;
+  }
+
+  /**
    * Get the second top states, which need one step transition to top state
    * @return a set of second top states
    */
@@ -244,9 +260,9 @@ public class StateModelDefinition extends HelixProperty {
      */
     public Builder(String name) {
       this._statemodelName = name;
-      statesMap = new HashMap<String, Integer>();
-      transitionMap = new HashMap<Transition, Integer>();
-      stateConstraintMap = new HashMap<String, String>();
+      statesMap = new HashMap<>();
+      transitionMap = new HashMap<>();
+      stateConstraintMap = new HashMap<>();
     }
 
     /**
@@ -368,7 +384,7 @@ public class StateModelDefinition extends HelixProperty {
         }
       };
       Collections.sort(transitionList, c2);
-      List<String> transitionPriorityList = new 
ArrayList<String>(transitionList.size());
+      List<String> transitionPriorityList = new 
ArrayList<>(transitionList.size());
       for (Transition t : transitionList) {
         transitionPriorityList.add(t.toString());
       }
@@ -383,7 +399,7 @@ public class StateModelDefinition extends HelixProperty {
       StateTransitionTableBuilder stateTransitionTableBuilder = new 
StateTransitionTableBuilder();
       Map<String, Map<String, String>> transitionTable =
           stateTransitionTableBuilder.buildTransitionTable(statePriorityList,
-              new ArrayList<Transition>(transitionMap.keySet()));
+              new ArrayList<>(transitionMap.keySet()));
       for (String state : transitionTable.keySet()) {
         record.setMapField(state + ".next", transitionTable.get(state));
       }

http://git-wip-us.apache.org/repos/asf/helix/blob/d0a3c0d1/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskRebalancer.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskRebalancer.java 
b/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskRebalancer.java
index 1589c1a..2af4151 100644
--- 
a/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskRebalancer.java
+++ 
b/helix-core/src/main/java/org/apache/helix/task/FixedTargetTaskRebalancer.java
@@ -36,7 +36,8 @@ import org.apache.helix.model.ResourceAssignment;
 /**
  * This rebalancer is deprecated, left here only for back-compatible. *
  */
-@Deprecated public class FixedTargetTaskRebalancer extends 
DeprecatedTaskRebalancer {
+@Deprecated
+public class FixedTargetTaskRebalancer extends DeprecatedTaskRebalancer {
   private FixedTargetTaskAssignmentCalculator taskAssignmentCalculator =
       new FixedTargetTaskAssignmentCalculator();
 

http://git-wip-us.apache.org/repos/asf/helix/blob/d0a3c0d1/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java
 
b/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java
index 1aa82f4..e2d09ba 100644
--- 
a/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java
+++ 
b/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java
@@ -19,6 +19,7 @@ package org.apache.helix.controller.stages;
  * under the License.
  */
 
+import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.HashMap;
@@ -43,9 +44,13 @@ import org.apache.helix.model.IdealState.RebalanceMode;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.LiveInstance;
 import org.apache.helix.model.Resource;
+import org.apache.helix.model.ResourceConfig;
 import org.apache.helix.model.StateModelDefinition;
 import org.apache.helix.tools.StateModelConfigGenerator;
+import org.codehaus.jackson.annotate.JsonAnySetter;
+import org.testng.ITestContext;
 import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.BeforeMethod;
 
@@ -73,7 +78,6 @@ public class BaseStageTest {
         + new Date(System.currentTimeMillis()));
   }
 
-  @BeforeMethod()
   public void setup() {
     _clusterName = "testCluster-" + UUID.randomUUID().toString();
     manager = new MockManager(_clusterName);
@@ -85,6 +89,21 @@ public class BaseStageTest {
     admin.addCluster(_clusterName);
   }
 
+  @BeforeMethod
+  public void beforeTest(Method testMethod, ITestContext testContext){
+    long startTime = System.currentTimeMillis();
+    System.out.println("START " + testMethod.getName() + " at " + new 
Date(startTime));
+    testContext.setAttribute("StartTime", System.currentTimeMillis());
+    setup();
+  }
+
+  @AfterMethod
+  public void endTest(Method testMethod, ITestContext testContext) {
+    Long startTime = (Long) testContext.getAttribute("StartTime");
+    long endTime = System.currentTimeMillis();
+    System.out.println("END " + testMethod.getName() + " at " + new 
Date(endTime) + ", took: " + (endTime - startTime) + "ms.");
+  }
+
   protected List<IdealState> setupIdealState(int nodes, String[] resources, 
int partitions,
       int replicas, RebalanceMode rebalanceMode, String stateModelName, String 
rebalanceClassName,
       String rebalanceStrategyName) {
@@ -137,14 +156,18 @@ public class BaseStageTest {
       stateModelName, rebalanceClassName, null);
   }
 
-  protected void setupLiveInstances(int numLiveInstances) {
+  protected List<String> setupLiveInstances(int numLiveInstances) {
+    List<String> instances = new ArrayList<>();
     for (int i = 0; i < numLiveInstances; i++) {
       LiveInstance liveInstance = new LiveInstance(HOSTNAME_PREFIX + i);
       liveInstance.setSessionId(SESSION_PREFIX + i);
 
       Builder keyBuilder = accessor.keyBuilder();
       accessor.setProperty(keyBuilder.liveInstance(HOSTNAME_PREFIX + i), 
liveInstance);
+      instances.add(liveInstance.getInstanceName());
     }
+
+    return instances;
   }
 
   protected void setupInstances(int numInstances) {
@@ -234,4 +257,20 @@ public class BaseStageTest {
 
     return resourceMap;
   }
+
+  protected Map<String, Resource> getResourceMap(String[] resources, int 
partitions,
+      String stateModel, ClusterConfig clusterConfig, ResourceConfig 
resourceConfig) {
+    Map<String, Resource> resourceMap = new HashMap<String, Resource>();
+
+    for (String r : resources) {
+      Resource testResource = new Resource(r, clusterConfig, resourceConfig);
+      testResource.setStateModelDefRef(stateModel);
+      for (int i = 0; i < partitions; i++) {
+        testResource.addPartition(r + "_" + i);
+      }
+      resourceMap.put(r, testResource);
+    }
+
+    return resourceMap;
+  }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/d0a3c0d1/helix-core/src/test/java/org/apache/helix/controller/stages/TestMsgSelectionStage.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/test/java/org/apache/helix/controller/stages/TestMsgSelectionStage.java
 
b/helix-core/src/test/java/org/apache/helix/controller/stages/TestMsgSelectionStage.java
index 994e2fa..4e20116 100644
--- 
a/helix-core/src/test/java/org/apache/helix/controller/stages/TestMsgSelectionStage.java
+++ 
b/helix-core/src/test/java/org/apache/helix/controller/stages/TestMsgSelectionStage.java
@@ -27,8 +27,8 @@ import java.util.Map;
 import java.util.UUID;
 
 import org.apache.helix.TestHelper;
-import org.apache.helix.controller.stages.MessageSelectionStage;
 import org.apache.helix.controller.stages.MessageSelectionStage.Bounds;
+import org.apache.helix.model.BuiltInStateModelDefinitions;
 import org.apache.helix.model.LiveInstance;
 import org.apache.helix.model.Message;
 import org.apache.helix.model.Message.MessageState;
@@ -86,7 +86,8 @@ public class TestMsgSelectionStage {
 
     List<Message> selectedMsg =
         new MessageSelectionStage().selectMessages(liveInstances, 
currentStates, pendingMessages,
-            messages, stateConstraints, stateTransitionPriorities, "OFFLINE");
+            messages, stateConstraints, stateTransitionPriorities,
+            
BuiltInStateModelDefinitions.MasterSlave.getStateModelDefinition(), false);
 
     Assert.assertEquals(selectedMsg.size(), 1);
     Assert.assertEquals(selectedMsg.get(0).getMsgId(), "msgId_1");
@@ -123,7 +124,8 @@ public class TestMsgSelectionStage {
 
     List<Message> selectedMsg =
         new MessageSelectionStage().selectMessages(liveInstances, 
currentStates, pendingMessages,
-            messages, stateConstraints, stateTransitionPriorities, "OFFLINE");
+            messages, stateConstraints, stateTransitionPriorities,
+            
BuiltInStateModelDefinitions.MasterSlave.getStateModelDefinition(), false);
 
     Assert.assertEquals(selectedMsg.size(), 0);
     System.out.println("END testMasterXferAfterMasterResume at "

http://git-wip-us.apache.org/repos/asf/helix/blob/d0a3c0d1/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceComputationStage.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceComputationStage.java
 
b/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceComputationStage.java
index ae7f2f5..47de2d3 100644
--- 
a/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceComputationStage.java
+++ 
b/helix-core/src/test/java/org/apache/helix/controller/stages/TestResourceComputationStage.java
@@ -182,7 +182,6 @@ public class TestResourceComputationStage extends 
BaseStageTest {
 
   @Test
   public void testNull() {
-    ClusterEvent event = new ClusterEvent(ClusterEventType.Unknown);
     ResourceComputationStage stage = new ResourceComputationStage();
     StageContext context = new StageContext();
     stage.init(context);

http://git-wip-us.apache.org/repos/asf/helix/blob/d0a3c0d1/helix-core/src/test/java/org/apache/helix/integration/TestZkReconnect.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/TestZkReconnect.java 
b/helix-core/src/test/java/org/apache/helix/integration/TestZkReconnect.java
index 21878ac..85ecb0c 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestZkReconnect.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestZkReconnect.java
@@ -49,7 +49,7 @@ import org.testng.annotations.Test;
 public class TestZkReconnect {
   private static final Logger LOG = 
LoggerFactory.getLogger(TestZkReconnect.class);
 
-  @Test
+  @Test (enabled = false)
   public void testZKReconnect() throws Exception {
     final AtomicReference<ZkServer> zkServerRef = new 
AtomicReference<ZkServer>();
     final int zkPort = TestHelper.getRandomPort();

http://git-wip-us.apache.org/repos/asf/helix/blob/d0a3c0d1/helix-core/src/test/java/org/apache/helix/integration/common/ZkIntegrationTestBase.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/common/ZkIntegrationTestBase.java
 
b/helix-core/src/test/java/org/apache/helix/integration/common/ZkIntegrationTestBase.java
index ba2196c..4e49502 100644
--- 
a/helix-core/src/test/java/org/apache/helix/integration/common/ZkIntegrationTestBase.java
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/common/ZkIntegrationTestBase.java
@@ -19,6 +19,8 @@ package org.apache.helix.integration.common;
  * under the License.
  */
 
+import java.lang.reflect.Method;
+import java.util.Date;
 import java.util.Map;
 import java.util.Set;
 import java.util.logging.Level;
@@ -54,7 +56,10 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
 import org.testng.AssertJUnit;
+import org.testng.ITestContext;
+import org.testng.annotations.AfterMethod;
 import org.testng.annotations.AfterSuite;
+import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.BeforeSuite;
 
 public class ZkIntegrationTestBase {
@@ -95,6 +100,21 @@ public class ZkIntegrationTestBase {
     TestHelper.stopZkServer(_zkServer);
   }
 
+  @BeforeMethod
+  public void beforeTest(Method testMethod, ITestContext testContext){
+    long startTime = System.currentTimeMillis();
+    System.out.println("START " + testMethod.getName() + " at " + new 
Date(startTime));
+    testContext.setAttribute("StartTime", System.currentTimeMillis());
+  }
+
+  @AfterMethod
+  public void endTest(Method testMethod, ITestContext testContext) {
+    Long startTime = (Long) testContext.getAttribute("StartTime");
+    long endTime = System.currentTimeMillis();
+    System.out.println(
+        "END " + testMethod.getName() + " at " + new Date(endTime) + ", took: 
" + (endTime - startTime) + "ms.");
+  }
+
   protected String getShortClassName() {
     return this.getClass().getSimpleName();
   }

http://git-wip-us.apache.org/repos/asf/helix/blob/d0a3c0d1/helix-core/src/test/java/org/apache/helix/integration/manager/TestZkCallbackHandlerLeak.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/manager/TestZkCallbackHandlerLeak.java
 
b/helix-core/src/test/java/org/apache/helix/integration/manager/TestZkCallbackHandlerLeak.java
index ec2dd1b..1448453 100644
--- 
a/helix-core/src/test/java/org/apache/helix/integration/manager/TestZkCallbackHandlerLeak.java
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/manager/TestZkCallbackHandlerLeak.java
@@ -93,7 +93,7 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase 
{
 
         // controller should have 5 + 2n + m + (m+2)n zk-watchers
         // where n is number of nodes and m is number of resources
-        return watchPaths.size() == (7 + 5 * n);
+        return watchPaths.size() == (8 + 5 * n);
       }
     }, 500);
     Assert.assertTrue(result, "Controller should have 8 + 5*n zk-watchers.");
@@ -119,7 +119,7 @@ public class TestZkCallbackHandlerLeak extends 
ZkUnitTestBase {
     // printHandlers(participantManagerToExpire);
     int controllerHandlerNb = controller.getHandlers().size();
     int particHandlerNb = participantManagerToExpire.getHandlers().size();
-    Assert.assertEquals(controllerHandlerNb, (6 + 2 * n),
+    Assert.assertEquals(controllerHandlerNb, (7 + 2 * n),
         "HelixController should have 9 (5+2n) callback handlers for 2 (n) 
participant");
     Assert.assertEquals(particHandlerNb, 1,
         "HelixParticipant should have 1 (msg->HelixTaskExecutor) callback 
handlers");
@@ -149,7 +149,7 @@ public class TestZkCallbackHandlerLeak extends 
ZkUnitTestBase {
 
         // controller should have 5 + 2n + m + (m+2)n zk-watchers
         // where n is number of nodes and m is number of resources
-        return watchPaths.size() == (7 + 5 * n);
+        return watchPaths.size() == (8 + 5 * n);
       }
     }, 500);
     Assert.assertTrue(result, "Controller should have 8 + 5*n zk-watchers 
after session expiry.");

http://git-wip-us.apache.org/repos/asf/helix/blob/d0a3c0d1/helix-core/src/test/java/org/apache/helix/integration/messaging/TestP2PMessageSemiAuto.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/messaging/TestP2PMessageSemiAuto.java
 
b/helix-core/src/test/java/org/apache/helix/integration/messaging/TestP2PMessageSemiAuto.java
new file mode 100644
index 0000000..e0f83e1
--- /dev/null
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/messaging/TestP2PMessageSemiAuto.java
@@ -0,0 +1,240 @@
+package org.apache.helix.integration.messaging;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.api.config.HelixConfigProperty;
+import org.apache.helix.controller.stages.ClusterDataCache;
+import org.apache.helix.integration.common.ZkIntegrationTestBase;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.mock.participant.MockTransition;
+import org.apache.helix.model.BuiltInStateModelDefinitions;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.MasterSlaveSMD;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.ResourceConfig;
+import 
org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
+import org.apache.helix.tools.ClusterVerifiers.HelixClusterVerifier;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class TestP2PMessageSemiAuto extends ZkIntegrationTestBase {
+  final String CLASS_NAME = getShortClassName();
+  final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + CLASS_NAME;
+
+  static final int PARTICIPANT_NUMBER = 3;
+  static final int PARTICIPANT_START_PORT = 12918;
+
+  static final String DB_NAME_1 = "TestDB_1";
+  static final String DB_NAME_2 = "TestDB_2";
+
+  static final int PARTITION_NUMBER = 20;
+  static final int REPLICA_NUMBER = 3;
+
+  List<MockParticipantManager> _participants = new 
ArrayList<MockParticipantManager>();
+  List<String> _instances = new ArrayList<>();
+  ClusterControllerManager _controller;
+
+  HelixClusterVerifier _clusterVerifier;
+  ConfigAccessor _configAccessor;
+  HelixDataAccessor _accessor;
+
+  @BeforeClass
+  public void beforeClass()
+      throws InterruptedException {
+    System.out.println(
+        "START " + getShortClassName() + " at " + new 
Date(System.currentTimeMillis()));
+
+    // setup storage cluster
+    _gSetupTool.addCluster(CLUSTER_NAME, true);
+
+    for (int i = 0; i < PARTICIPANT_NUMBER; i++) {
+      String instance = PARTICIPANT_PREFIX + "_" + (PARTICIPANT_START_PORT + 
i);
+      _gSetupTool.addInstanceToCluster(CLUSTER_NAME, instance);
+      _instances.add(instance);
+    }
+
+    // start dummy participants
+    for (int i = 0; i < PARTICIPANT_NUMBER; i++) {
+      MockParticipantManager participant =
+          new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, _instances.get(i));
+      participant.syncStart();
+      _participants.add(participant);
+    }
+
+    createDBInSemiAuto(DB_NAME_1, _instances);
+    createDBInSemiAuto(DB_NAME_2, _instances);
+
+    // start controller
+    String controllerName = CONTROLLER_PREFIX + "_0";
+    _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, 
controllerName);
+    _controller.syncStart();
+
+    _clusterVerifier = new 
BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkClient(_gZkClient).build();
+    Assert.assertTrue(_clusterVerifier.verify());
+
+    _configAccessor = new ConfigAccessor(_gZkClient);
+    _accessor = new ZKHelixDataAccessor(CLUSTER_NAME, _baseAccessor);
+  }
+
+  private void createDBInSemiAuto(String dbName, List<String> preferenceList) {
+    _gSetupTool.addResourceToCluster(CLUSTER_NAME, dbName, PARTITION_NUMBER,
+        BuiltInStateModelDefinitions.MasterSlave.name(), 
IdealState.RebalanceMode.SEMI_AUTO.toString());
+    _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, dbName, REPLICA_NUMBER);
+
+    IdealState is = 
_gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, 
dbName);
+    for (String p : is.getPartitionSet()) {
+      is.setPreferenceList(p, preferenceList);
+    }
+    _gSetupTool.getClusterManagementTool().setResourceIdealState(CLUSTER_NAME, 
dbName, is);
+  }
+
+  @Test
+  public void testP2PStateTransitionDisabled() {
+    // disable the master instance
+    String prevMasterInstance = _instances.get(0);
+    _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, 
prevMasterInstance, false);
+
+    Assert.assertTrue(_clusterVerifier.verify());
+    verifyP2PMessage(DB_NAME_1,_instances.get(1), 
MasterSlaveSMD.States.MASTER.name(), _controller.getInstanceName());
+    verifyP2PMessage(DB_NAME_2,_instances.get(1), 
MasterSlaveSMD.States.MASTER.name(), _controller.getInstanceName());
+
+
+    //re-enable the old master
+    _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, 
prevMasterInstance, true);
+    Assert.assertTrue(_clusterVerifier.verify());
+
+    verifyP2PMessage(DB_NAME_1, prevMasterInstance, 
MasterSlaveSMD.States.MASTER.name(), _controller.getInstanceName());
+    verifyP2PMessage(DB_NAME_2, prevMasterInstance, 
MasterSlaveSMD.States.MASTER.name(), _controller.getInstanceName());
+  }
+
+  @Test (dependsOnMethods = {"testP2PStateTransitionDisabled"})
+  public void testP2PStateTransitionEnabledInCluster() {
+    enableP2PInCluster(true);
+    enableP2PInResource(DB_NAME_1,false);
+    enableP2PInResource(DB_NAME_2,false);
+
+    // disable the master instance
+    String prevMasterInstance = _instances.get(0);
+    _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, 
prevMasterInstance, false);
+
+    Assert.assertTrue(_clusterVerifier.verify());
+    verifyP2PMessage(DB_NAME_1, _instances.get(1), 
MasterSlaveSMD.States.MASTER.name(), prevMasterInstance);
+    verifyP2PMessage(DB_NAME_2, _instances.get(1), 
MasterSlaveSMD.States.MASTER.name(), prevMasterInstance);
+
+    //re-enable the old master
+    _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, 
prevMasterInstance, true);
+    Assert.assertTrue(_clusterVerifier.verify());
+
+    verifyP2PMessage(DB_NAME_1, prevMasterInstance, 
MasterSlaveSMD.States.MASTER.name(), _instances.get(1));
+    verifyP2PMessage(DB_NAME_2, prevMasterInstance, 
MasterSlaveSMD.States.MASTER.name(), _instances.get(1));
+  }
+
+  @Test (dependsOnMethods = {"testP2PStateTransitionDisabled"})
+  public void testP2PStateTransitionEnabledInResource() {
+    enableP2PInCluster(false);
+    enableP2PInResource(DB_NAME_1,true);
+    enableP2PInResource(DB_NAME_2,false);
+
+
+    // disable the master instance
+    String prevMasterInstance = _instances.get(0);
+    _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, 
prevMasterInstance, false);
+
+    Assert.assertTrue(_clusterVerifier.verify());
+    verifyP2PMessage(DB_NAME_1, _instances.get(1), 
MasterSlaveSMD.States.MASTER.name(), prevMasterInstance);
+    verifyP2PMessage(DB_NAME_2, _instances.get(1), 
MasterSlaveSMD.States.MASTER.name(), _controller.getInstanceName());
+
+
+    //re-enable the old master
+    _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, 
prevMasterInstance, true);
+    Assert.assertTrue(_clusterVerifier.verify());
+
+    verifyP2PMessage(DB_NAME_1, prevMasterInstance, 
MasterSlaveSMD.States.MASTER.name(), _instances.get(1));
+    verifyP2PMessage(DB_NAME_2, prevMasterInstance, 
MasterSlaveSMD.States.MASTER.name(), _controller.getInstanceName());
+  }
+
+  private void enableP2PInCluster(boolean enable) {
+    // enable p2p message in cluster.
+    if (enable) {
+      ClusterConfig clusterConfig = 
_configAccessor.getClusterConfig(CLUSTER_NAME);
+      clusterConfig.enableP2PMessage(true);
+      _configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig);
+    } else {
+      ClusterConfig clusterConfig = 
_configAccessor.getClusterConfig(CLUSTER_NAME);
+      
clusterConfig.getRecord().getSimpleFields().remove(HelixConfigProperty.P2P_MESSAGE_ENABLED.name());
+      _configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig);
+    }
+  }
+
+  private void enableP2PInResource(String dbName, boolean enable) {
+    if (enable) {
+      ResourceConfig resourceConfig = new 
ResourceConfig.Builder(dbName).setP2PMessageEnabled(true).build();
+      _configAccessor.setResourceConfig(CLUSTER_NAME, dbName, resourceConfig);
+    } else {
+      // remove P2P Message in resource config
+      ResourceConfig resourceConfig = 
_configAccessor.getResourceConfig(CLUSTER_NAME, dbName);
+      if (resourceConfig != null) {
+        
resourceConfig.getRecord().getSimpleFields().remove(HelixConfigProperty.P2P_MESSAGE_ENABLED.name());
+        _configAccessor.setResourceConfig(CLUSTER_NAME, dbName, 
resourceConfig);
+      }
+    }
+  }
+
+  private void verifyP2PMessage(String dbName, String instance, String 
expectedState, String expectedTriggerHost) {
+    ClusterDataCache dataCache = new ClusterDataCache(CLUSTER_NAME);
+    dataCache.refresh(_accessor);
+
+    Map<String, LiveInstance> liveInstanceMap = dataCache.getLiveInstances();
+    LiveInstance liveInstance = liveInstanceMap.get(instance);
+
+    Map<String, CurrentState> currentStateMap = 
dataCache.getCurrentState(instance, liveInstance.getSessionId());
+    Assert.assertNotNull(currentStateMap);
+    CurrentState currentState = currentStateMap.get(dbName);
+    Assert.assertNotNull(currentState);
+    Assert.assertEquals(currentState.getPartitionStateMap().size(), 
PARTITION_NUMBER);
+
+    for (String partition : currentState.getPartitionStateMap().keySet()) {
+      String state = currentState.getState(partition);
+      Assert.assertEquals(state, expectedState,
+          dbName + " Partition " + partition + "'s state is different as 
expected!");
+      String triggerHost = currentState.getTriggerHost(partition);
+      Assert.assertEquals(triggerHost, expectedTriggerHost,
+          "Partition " + partition + "'s transition to Master was not 
triggered by expected host!");
+    }
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/helix/blob/d0a3c0d1/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestNodeOfflineTimeStamp.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestNodeOfflineTimeStamp.java
 
b/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestNodeOfflineTimeStamp.java
index fb1c090..ceda6e5 100644
--- 
a/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestNodeOfflineTimeStamp.java
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestNodeOfflineTimeStamp.java
@@ -45,8 +45,10 @@ public class TestNodeOfflineTimeStamp extends 
ZkStandAloneCMTestBase {
     Assert.assertTrue(Math.abs(shutdownTime - recordTime) <= 500L);
 
     _participants[0].reset();
+    Thread.sleep(50);
     _participants[0].syncStart();
 
+    Thread.sleep(50);
     history = getInstanceHistory(_participants[0].getInstanceName());
     Assert.assertEquals(history.getLastOfflineTime(), 
ParticipantHistory.ONLINE);
   }

http://git-wip-us.apache.org/repos/asf/helix/blob/d0a3c0d1/helix-core/src/test/java/org/apache/helix/integration/rebalancer/CrushRebalancers/TestCrushAutoRebalance.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/CrushRebalancers/TestCrushAutoRebalance.java
 
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/CrushRebalancers/TestCrushAutoRebalance.java
index 967175f..d886e44 100644
--- 
a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/CrushRebalancers/TestCrushAutoRebalance.java
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/CrushRebalancers/TestCrushAutoRebalance.java
@@ -109,17 +109,16 @@ public class TestCrushAutoRebalance extends 
ZkIntegrationTestBase {
   }
 
   @DataProvider(name = "rebalanceStrategies")
-  public static String [][] rebalanceStrategies() {
+  public static Object [][] rebalanceStrategies() {
     return new String[][] { {"CrushRebalanceStrategy", 
CrushRebalanceStrategy.class.getName()},
         {"MultiRoundCrushRebalanceStrategy", 
MultiRoundCrushRebalanceStrategy.class.getName()}
     };
   }
 
-  @Test(dataProvider = "rebalanceStrategies", enabled=true)
+  @Test(dataProvider = "rebalanceStrategies")
   public void testZoneIsolation(String rebalanceStrategyName, String 
rebalanceStrategyClass)
       throws Exception {
     System.out.println("testZoneIsolation " + rebalanceStrategyName);
-
     int i = 0;
     for (String stateModel : _testModels) {
       String db = "Test-DB-" + rebalanceStrategyName + "-" + i++;
@@ -143,7 +142,7 @@ public class TestCrushAutoRebalance extends 
ZkIntegrationTestBase {
     }
   }
 
-  @Test(dataProvider = "rebalanceStrategies", enabled=true)
+  @Test(dataProvider = "rebalanceStrategies")
   public void testZoneIsolationWithInstanceTag(
       String rebalanceStrategyName, String rebalanceStrategyClass) throws 
Exception {
     Set<String> tags = new HashSet<String>(_nodeToTagMap.values());

http://git-wip-us.apache.org/repos/asf/helix/blob/d0a3c0d1/helix-core/src/test/java/org/apache/helix/manager/zk/TestZKUtil.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZKUtil.java 
b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZKUtil.java
index 7b7ec2c..a2d3f1d 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZKUtil.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZKUtil.java
@@ -22,17 +22,13 @@ package org.apache.helix.manager.zk;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
-
 import java.util.Map;
 import org.apache.helix.PropertyPathBuilder;
-import org.apache.helix.PropertyType;
 import org.apache.helix.TestHelper;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.ZkUnitTestBase;
-import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.AssertJUnit;

http://git-wip-us.apache.org/repos/asf/helix/blob/d0a3c0d1/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkClusterManager.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkClusterManager.java
 
b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkClusterManager.java
index fcca597..6e68d1a 100644
--- 
a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkClusterManager.java
+++ 
b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkClusterManager.java
@@ -28,10 +28,8 @@ import java.util.Map;
 
 import org.apache.helix.AccessOption;
 import org.apache.helix.integration.manager.MockParticipantManager;
-import org.apache.helix.model.ConfigScope;
 import org.apache.helix.model.HelixConfigScope;
 import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
-import org.apache.helix.model.builder.ConfigScopeBuilder;
 import org.apache.helix.model.builder.HelixConfigScopeBuilder;
 import org.apache.helix.HelixAdmin;
 import org.apache.helix.HelixDataAccessor;
@@ -44,7 +42,6 @@ import org.apache.helix.ZNRecord;
 import org.apache.helix.ZkTestHelper;
 import org.apache.helix.ZkUnitTestBase;
 import org.apache.helix.manager.MockListener;
-import org.apache.helix.manager.zk.ZKHelixManager;
 import org.apache.helix.model.LiveInstance;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.zookeeper.data.Stat;

Reply via email to