Updated Branches:
  refs/heads/master 2bf12e92c -> 11df95e81

HELIX-197: fix state model leak, rb=13437


Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/11df95e8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/11df95e8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/11df95e8

Branch: refs/heads/master
Commit: 11df95e810a0102584d4ba690dcbe241a8a00b1b
Parents: 2bf12e9
Author: zzhang <[email protected]>
Authored: Fri Aug 9 13:29:34 2013 -0700
Committer: zzhang <[email protected]>
Committed: Fri Aug 9 13:29:34 2013 -0700

----------------------------------------------------------------------
 .../handling/HelixStateTransitionHandler.java   |  33 +--
 .../participant/HelixStateMachineEngine.java    |  23 +-
 .../statemachine/ScheduledTaskStateModel.java   |  19 +-
 .../statemachine/StateModelFactory.java         |  67 +++---
 .../org/apache/helix/TestHelixTaskExecutor.java |  47 ++--
 .../org/apache/helix/TestHelixTaskHandler.java  |  47 ++--
 .../integration/manager/TestStateModelLeak.java | 225 +++++++++++++++++++
 .../handling/TestHelixTaskExecutor.java         |  20 +-
 .../mock/participant/MockMSModelFactory.java    |   4 +-
 9 files changed, 363 insertions(+), 122 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/11df95e8/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
index 2baa1cf..752437f 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
@@ -44,6 +44,7 @@ import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.Message;
 import org.apache.helix.model.Message.Attributes;
 import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
 import org.apache.helix.participant.statemachine.StateModelParser;
 import org.apache.helix.participant.statemachine.StateTransitionError;
 import org.apache.helix.util.StatusUpdateUtil;
@@ -66,9 +67,11 @@ public class HelixStateTransitionHandler extends 
MessageHandler
   private final StateModelParser _transitionMethodFinder;
   private final CurrentState     _currentStateDelta;
   private final HelixManager     _manager;
+  private final StateModelFactory<? extends StateModel> _stateModelFactory;
   volatile boolean               _isTimeout = false;
 
-  public HelixStateTransitionHandler(StateModel stateModel,
+  public HelixStateTransitionHandler(StateModelFactory<? extends StateModel> 
stateModelFactory,
+                                     StateModel stateModel,
                                      Message message,
                                      NotificationContext context,
                                      CurrentState currentStateDelta)
@@ -78,7 +81,8 @@ public class HelixStateTransitionHandler extends 
MessageHandler
     _statusUpdateUtil = new StatusUpdateUtil();
     _transitionMethodFinder = new StateModelParser();
     _currentStateDelta = currentStateDelta;
-    _manager = _notificationContext.getManager();;
+    _manager = _notificationContext.getManager();
+    _stateModelFactory = stateModelFactory;
   }
 
   void preHandleMessage() throws Exception
@@ -97,7 +101,7 @@ public class HelixStateTransitionHandler extends 
MessageHandler
       logger.error(errorMessage);
       throw new HelixException(errorMessage);
     }
-    
+
     HelixDataAccessor accessor = _manager.getHelixDataAccessor();
 
     String partitionName = _message.getPartitionName();
@@ -127,7 +131,7 @@ public class HelixStateTransitionHandler extends 
MessageHandler
   {
        HelixTaskResult taskResult = (HelixTaskResult) 
_notificationContext.get(MapKey.HELIX_TASK_RESULT.toString());
        Exception exception = taskResult.getException();
-               
+
     String partitionKey = _message.getPartitionName();
     String resource = _message.getResourceName();
     String sessionId = _message.getTgtSessionId();
@@ -140,7 +144,7 @@ public class HelixStateTransitionHandler extends 
MessageHandler
     ZNRecordBucketizer bucketizer = new ZNRecordBucketizer(bucketSize);
 
     // No need to sync on manager, we are cancel executor in expiry session 
before start executor in new session
-    // sessionId might change when we update the state model state. 
+    // sessionId might change when we update the state model state.
     // for zk current state it is OK as we have the per-session current state 
node
     if (!_message.getTgtSessionId().equals(_manager.getSessionId()))
     {
@@ -169,6 +173,7 @@ public class HelixStateTransitionHandler extends 
MessageHandler
         List<ZNRecordDelta> deltaList = new ArrayList<ZNRecordDelta>();
         deltaList.add(delta);
         _currentStateDelta.setDeltaList(deltaList);
+        _stateModelFactory.removeStateModel(partitionKey);
       }
       else
       {
@@ -213,14 +218,14 @@ public class HelixStateTransitionHandler extends 
MessageHandler
         _stateModel.rollbackOnError(_message, _notificationContext, error);
         _currentStateDelta.setState(partitionKey, 
HelixDefinedState.ERROR.toString());
         _stateModel.updateState(HelixDefinedState.ERROR.toString());
-        
+
         // if we have errors transit from ERROR state, disable the partition
         if 
(_message.getFromState().equalsIgnoreCase(HelixDefinedState.ERROR.toString())) {
           disablePartition();
         }
       }
     }
-    
+
     try
     {
       // Update the ZK current state of the node
@@ -236,7 +241,7 @@ public class HelixStateTransitionHandler extends 
MessageHandler
       else
       {
         // sub-message of a batch message
-        ConcurrentHashMap<String, CurrentStateUpdate> csUpdateMap 
+        ConcurrentHashMap<String, CurrentStateUpdate> csUpdateMap
           = (ConcurrentHashMap<String, CurrentStateUpdate>) 
_notificationContext.get(MapKey.CURRENT_STATE_UPDATE.toString());
         csUpdateMap.put(partitionKey, new CurrentStateUpdate(key, 
_currentStateDelta));
       }
@@ -266,13 +271,13 @@ public class HelixStateTransitionHandler extends 
MessageHandler
           + " for partition: " + partitionName + ". disable it on " + 
instanceName);
 
   }
-  
+
   @Override
   public HelixTaskResult handleMessage()
   {
        NotificationContext context = _notificationContext;
        Message message = _message;
-               
+
     synchronized (_stateModel)
     {
       HelixTaskResult taskResult = new HelixTaskResult();
@@ -318,7 +323,7 @@ public class HelixStateTransitionHandler extends 
MessageHandler
         taskResult.setException(e);
         taskResult.setInterrupted(e instanceof InterruptedException);
       }
-      
+
       // add task result to context for postHandling
       context.add(MapKey.HELIX_TASK_RESULT.toString(), taskResult);
       postHandleMessage();
@@ -371,13 +376,13 @@ public class HelixStateTransitionHandler extends 
MessageHandler
 
   @Override
   public void onError(Exception e, ErrorCode code, ErrorType type)
-  {    
+  {
     HelixDataAccessor accessor = _manager.getHelixDataAccessor();
     Builder keyBuilder = accessor.keyBuilder();
     String instanceName = _manager.getInstanceName();
     String resourceName = _message.getResourceName();
     String partition = _message.getPartitionName();
-    
+
     // All internal error has been processed already, so we can skip them
     if (type == ErrorType.INTERNAL)
     {
@@ -392,7 +397,7 @@ public class HelixStateTransitionHandler extends 
MessageHandler
         CurrentState currentStateDelta = new CurrentState(resourceName);
         currentStateDelta.setState(partition, 
HelixDefinedState.ERROR.toString());
         _stateModel.updateState(HelixDefinedState.ERROR.toString());
-  
+
         // if transit from ERROR state, disable the partition
         if 
(_message.getFromState().equalsIgnoreCase(HelixDefinedState.ERROR.toString())) {
           disablePartition();

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/11df95e8/helix-core/src/main/java/org/apache/helix/participant/HelixStateMachineEngine.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/HelixStateMachineEngine.java b/helix-core/src/main/java/org/apache/helix/participant/HelixStateMachineEngine.java
index 8287b46..c6dc8ca 100644
--- a/helix-core/src/main/java/org/apache/helix/participant/HelixStateMachineEngine.java
+++ b/helix-core/src/main/java/org/apache/helix/participant/HelixStateMachineEngine.java
@@ -167,15 +167,9 @@ public class HelixStateMachineEngine implements 
StateMachineEngine
     {
       for (StateModelFactory<? extends StateModel> stateModelFactory : 
ftyMap.values())
       {
-        Map<String, ? extends StateModel> modelMap = 
stateModelFactory.getStateModelMap();
-        if (modelMap == null || modelMap.isEmpty())
+        for (String resourceKey : stateModelFactory.getPartitionSet())
         {
-          continue;
-        }
-
-        for (String resourceKey : modelMap.keySet())
-        {
-          StateModel stateModel = modelMap.get(resourceKey);
+          StateModel stateModel = stateModelFactory.getStateModel(resourceKey);
           stateModel.reset();
           String initialState = 
_stateModelParser.getInitialState(stateModel.getClass());
           stateModel.updateState(initialState);
@@ -193,7 +187,7 @@ public class HelixStateMachineEngine implements 
StateMachineEngine
 
     if (!type.equals(MessageType.STATE_TRANSITION.toString()))
     {
-      throw new HelixException("Expect state-transition message type, but was 
" 
+      throw new HelixException("Expect state-transition message type, but was "
                  + message.getMsgType() + ", msgId: " + message.getMsgId());
     }
 
@@ -259,28 +253,29 @@ public class HelixStateMachineEngine implements 
StateMachineEngine
         currentStateDelta.setState(partitionKey, (stateModel.getCurrentState() 
== null)
             ? initState : stateModel.getCurrentState());
 
-        return new HelixStateTransitionHandler(stateModel,
+        return new HelixStateTransitionHandler(stateModelFactory,
+                                               stateModel,
                                                message,
                                                context,
                                                currentStateDelta);
     } else
-    {          
+    {
       BatchMessageWrapper wrapper = 
stateModelFactory.getBatchMessageWrapper(resourceName);
       if (wrapper == null)
       {
         wrapper = 
stateModelFactory.createAndAddBatchMessageWrapper(resourceName);
       }
-      
+
        // get executor-service for the message
        TaskExecutor executor = (TaskExecutor) 
context.get(MapKey.TASK_EXECUTOR.toString());
        if (executor == null)
        {
-               logger.error("fail to get executor-service for batch message: " 
+ message.getId() 
+               logger.error("fail to get executor-service for batch message: " 
+ message.getId()
                                + ". msgType: " + message.getMsgType() + ", 
resource: " + message.getResourceName());
                return null;
        }
        return new BatchMessageHandler(message, context, this, wrapper, 
executor);
-    }  
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/11df95e8/helix-core/src/main/java/org/apache/helix/participant/statemachine/ScheduledTaskStateModel.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/statemachine/ScheduledTaskStateModel.java b/helix-core/src/main/java/org/apache/helix/participant/statemachine/ScheduledTaskStateModel.java
index e4df933..d8576bb 100644
--- a/helix-core/src/main/java/org/apache/helix/participant/statemachine/ScheduledTaskStateModel.java
+++ b/helix-core/src/main/java/org/apache/helix/participant/statemachine/ScheduledTaskStateModel.java
@@ -34,7 +34,7 @@ public class ScheduledTaskStateModel extends StateModel
 {
   static final String DEFAULT_INITIAL_STATE = "OFFLINE";
   Logger logger = Logger.getLogger(ScheduledTaskStateModel.class);
-  
+
   // TODO Get default state from implementation or from state model annotation
   // StateModel with initial state other than OFFLINE should override this 
field
   protected String _currentState = DEFAULT_INITIAL_STATE;
@@ -45,7 +45,7 @@ public class ScheduledTaskStateModel extends StateModel
 
   public ScheduledTaskStateModel(ScheduledTaskStateModelFactory factory, 
HelixTaskExecutor executor, String partitionName)
   {
-   _factory = factory; 
+   _factory = factory;
    _partitionName = partitionName;
    _executor = executor;
   }
@@ -55,7 +55,7 @@ public class ScheduledTaskStateModel extends StateModel
       NotificationContext context) throws InterruptedException
   {
     logger.info(_partitionName + " onBecomeCompletedFromOffline");
-    
+
     // Construct the inner task message from the mapfields of 
scheduledTaskQueue resource group
     Map<String, String> messageInfo = 
message.getRecord().getMapField(Message.Attributes.INNER_MESSAGE.toString());
     ZNRecord record = new ZNRecord(_partitionName);
@@ -74,7 +74,7 @@ public class ScheduledTaskStateModel extends StateModel
     handler.handleMessage();
     logger.info(_partitionName + " onBecomeCompletedFromOffline completed");
   }
-  
+
   @Transition(to="OFFLINE",from="COMPLETED")
   public void onBecomeOfflineFromCompleted(Message message,
       NotificationContext context)
@@ -89,7 +89,7 @@ public class ScheduledTaskStateModel extends StateModel
     logger.info(_partitionName + " onBecomeDroppedFromCompleted");
     removeFromStatemodelFactory();
   }
-  
+
 
   @Transition(to="DROPPED",from="OFFLINE")
   public void onBecomeDroppedFromOffline(Message message,
@@ -105,19 +105,20 @@ public class ScheduledTaskStateModel extends StateModel
   {
     logger.info(_partitionName + " onBecomeOfflineFromError");
   }
-  
+
+  @Override
   public void reset()
   {
     logger.info(_partitionName + " ScheduledTask reset");
     removeFromStatemodelFactory();
   }
-  
+
   // We need this to prevent state model leak
   private void removeFromStatemodelFactory()
   {
-    if(_factory.getStateModelMap().containsKey(_partitionName))
+    if(_factory.getStateModel(_partitionName) != null)
     {
-      _factory.getStateModelMap().remove(_partitionName);
+      _factory.removeStateModel(_partitionName);
     }
     else
     {

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/11df95e8/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModelFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModelFactory.java b/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModelFactory.java
index 5267d6e..aec158b 100644
--- a/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModelFactory.java
+++ b/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModelFactory.java
@@ -20,6 +20,7 @@ package org.apache.helix.participant.statemachine;
  */
 
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
@@ -27,35 +28,28 @@ import 
org.apache.helix.messaging.handling.BatchMessageWrapper;
 
 public abstract class StateModelFactory<T extends StateModel>
 {
-  // partitionName -> StateModel
-  private ConcurrentMap<String, T> _stateModelMap = new 
ConcurrentHashMap<String, T>();
-  
-  // resourceName -> BatchMessageWrapper
+  /**
+   * mapping from partitionName to StateModel
+   */
+  private final ConcurrentMap<String, T> _stateModelMap = new 
ConcurrentHashMap<String, T>();
+
+  /**
+   * mapping from resourceName to BatchMessageWrapper
+   */
   private final ConcurrentMap<String, BatchMessageWrapper> _batchMsgWrapperMap
       = new ConcurrentHashMap<String, BatchMessageWrapper>();
 
   /**
    * This method will be invoked only once per partitionName per session
-   * 
+   *
    * @param partitionName
    * @return
    */
   public abstract T createNewStateModel(String partitionName);
 
-//  /**
-//   * Add a state model for a partition
-//   * 
-//   * @param partitionName
-//   * @return
-//   */
-//  public void addStateModel(String partitionName, T stateModel)
-//  {
-//    _stateModelMap.put(partitionName, stateModel);
-//  }
-  
   /**
    * Create a state model for a partition
-   * 
+   *
    * @param partitionName
    */
   public T createAndAddStateModel(String partitionName)
@@ -67,9 +61,9 @@ public abstract class StateModelFactory<T extends StateModel>
 
   /**
    * Get the state model for a partition
-   * 
+   *
    * @param partitionName
-   * @return
+   * @return state model if exists, null otherwise
    */
   public T getStateModel(String partitionName)
   {
@@ -77,18 +71,29 @@ public abstract class StateModelFactory<T extends 
StateModel>
   }
 
   /**
-   * Get the state model map
-   * 
-   * @return
+   * remove state model for a partition
+   *
+   * @param partitionName
+   * @return state model removed or null if not exist
    */
-  public Map<String, T> getStateModelMap()
+  public T removeStateModel(String partitionName)
   {
-    return _stateModelMap;
+    return _stateModelMap.remove(partitionName);
   }
-  
+
+  /**
+   * get partition set
+   *
+   * @return partition key set
+   */
+  public Set<String> getPartitionSet()
+  {
+    return _stateModelMap.keySet();
+  }
+
   /**
    * create a default batch-message-wrapper for a resource
-   * 
+   *
    * @param resourceName
    * @return
    */
@@ -96,10 +101,10 @@ public abstract class StateModelFactory<T extends 
StateModel>
   {
     return new BatchMessageWrapper();
   }
-  
+
   /**
    * create a batch-message-wrapper for a resource and put it into map
-   * 
+   *
    * @param resourceName
    * @return
    */
@@ -109,10 +114,10 @@ public abstract class StateModelFactory<T extends 
StateModel>
     _batchMsgWrapperMap.put(resourceName, wrapper);
     return wrapper;
   }
-  
+
   /**
    * get batch-message-wrapper for a resource
-   * 
+   *
    * @param resourceName
    * @return
    */
@@ -120,5 +125,5 @@ public abstract class StateModelFactory<T extends 
StateModel>
   {
     return _batchMsgWrapperMap.get(resourceName);
   }
-  
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/11df95e8/helix-core/src/test/java/org/apache/helix/TestHelixTaskExecutor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/TestHelixTaskExecutor.java b/helix-core/src/test/java/org/apache/helix/TestHelixTaskExecutor.java
index 5bc1fc0..711be81 100644
--- a/helix-core/src/test/java/org/apache/helix/TestHelixTaskExecutor.java
+++ b/helix-core/src/test/java/org/apache/helix/TestHelixTaskExecutor.java
@@ -30,11 +30,11 @@ import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.Message;
 import org.apache.helix.model.Message.MessageType;
 import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.participant.statemachine.StateModelFactory;
 import org.apache.helix.tools.StateModelConfigGenerator;
 import org.testng.AssertJUnit;
 import org.testng.annotations.Test;
 
-
 public class TestHelixTaskExecutor
 {
 
@@ -56,36 +56,43 @@ public class TestHelixTaskExecutor
     message.setStateModelDef("MasterSlave");
 
     MockManager manager = new MockManager("clusterName");
-    // DataAccessor accessor = manager.getDataAccessor();
     HelixDataAccessor accessor = manager.getHelixDataAccessor();
-    StateModelConfigGenerator generator = new StateModelConfigGenerator();
     StateModelDefinition stateModelDef =
-        new StateModelDefinition(generator.generateConfigForMasterSlave());
+        new 
StateModelDefinition(StateModelConfigGenerator.generateConfigForMasterSlave());
     Builder keyBuilder = accessor.keyBuilder();
     accessor.setProperty(keyBuilder.stateModelDef("MasterSlave"), 
stateModelDef);
 
     MockHelixTaskExecutor executor = new MockHelixTaskExecutor();
     MockStateModel stateModel = new MockStateModel();
-    NotificationContext context;
-    executor.registerMessageHandlerFactory(MessageType.TASK_REPLY.toString(),
-                                           new AsyncCallbackService());
-    // String clusterName =" testcluster";
-    context = new NotificationContext(manager);
+    executor.registerMessageHandlerFactory(MessageType.TASK_REPLY.toString(), 
new AsyncCallbackService());
+
+    NotificationContext context = new NotificationContext(manager);
     CurrentState currentStateDelta = new CurrentState("TestDB");
     currentStateDelta.setState("TestDB_0", "OFFLINE");
+
+    StateModelFactory<MockStateModel> stateModelFactory = new 
StateModelFactory<MockStateModel>()
+    {
+
+      @Override
+      public MockStateModel createNewStateModel(String partitionName)
+      {
+        // TODO Auto-generated method stub
+        return new MockStateModel();
+      }
+
+    };
     HelixStateTransitionHandler handler =
-        new HelixStateTransitionHandler(stateModel,
-                                        message,
-                                        context,
-                                        currentStateDelta);
+        new HelixStateTransitionHandler(stateModelFactory, stateModel, 
message, context, currentStateDelta);
 
-       HelixTask task = new HelixTask(message, context, handler, executor);
-       executor.scheduleTask(task);
-       for (int i = 0; i < 10; i++) {
-               if (!executor.isDone(task.getTaskId())) {
-                       Thread.sleep(500);
-               }
-       }
+    HelixTask task = new HelixTask(message, context, handler, executor);
+    executor.scheduleTask(task);
+    for (int i = 0; i < 10; i++)
+    {
+      if (!executor.isDone(task.getTaskId()))
+      {
+        Thread.sleep(500);
+      }
+    }
     AssertJUnit.assertTrue(stateModel.stateModelInvoked);
     System.out.println("END TestCMTaskExecutor");
   }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/11df95e8/helix-core/src/test/java/org/apache/helix/TestHelixTaskHandler.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/TestHelixTaskHandler.java b/helix-core/src/test/java/org/apache/helix/TestHelixTaskHandler.java
index 0910061..064e04d 100644
--- a/helix-core/src/test/java/org/apache/helix/TestHelixTaskHandler.java
+++ b/helix-core/src/test/java/org/apache/helix/TestHelixTaskHandler.java
@@ -35,11 +35,11 @@ import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.Message;
 import org.apache.helix.model.StateModelDefinition;
 import org.apache.helix.model.Message.MessageType;
+import org.apache.helix.participant.statemachine.StateModelFactory;
 import org.apache.helix.tools.StateModelConfigGenerator;
 import org.testng.AssertJUnit;
 import org.testng.annotations.Test;
 
-
 public class TestHelixTaskHandler
 {
   @Test()
@@ -62,11 +62,9 @@ public class TestHelixTaskHandler
     MockStateModel stateModel = new MockStateModel();
     NotificationContext context;
     MockManager manager = new MockManager("clusterName");
-//    DataAccessor accessor = manager.getDataAccessor();
     HelixDataAccessor accessor = manager.getHelixDataAccessor();
-    StateModelConfigGenerator generator = new StateModelConfigGenerator();
-    StateModelDefinition stateModelDef = new StateModelDefinition(
-        generator.generateConfigForMasterSlave());
+    StateModelDefinition stateModelDef =
+        new 
StateModelDefinition(StateModelConfigGenerator.generateConfigForMasterSlave());
     Builder keyBuilder = accessor.keyBuilder();
     accessor.setProperty(keyBuilder.stateModelDef("MasterSlave"), 
stateModelDef);
 
@@ -74,21 +72,19 @@ public class TestHelixTaskHandler
     CurrentState currentStateDelta = new CurrentState("TestDB");
     currentStateDelta.setState("TestDB_0", "OFFLINE");
 
-    HelixStateTransitionHandler stHandler = new 
HelixStateTransitionHandler(stateModel, message,
-        context, currentStateDelta);
+    HelixStateTransitionHandler stHandler =
+        new HelixStateTransitionHandler(null, stateModel, message, context, 
currentStateDelta);
     HelixTask handler;
     handler = new HelixTask(message, context, stHandler, executor);
     handler.call();
     AssertJUnit.assertTrue(stateModel.stateModelInvoked);
-    System.out.println("END TestCMTaskHandler.testInvocation() at "
-        + new Date(System.currentTimeMillis()));
+    System.out.println("END TestCMTaskHandler.testInvocation() at " + new 
Date(System.currentTimeMillis()));
   }
 
   @Test()
   public void testInvocationAnnotated() throws Exception
   {
-    System.out.println("START TestCMTaskHandler.testInvocationAnnotated() at "
-        + new Date(System.currentTimeMillis()));
+    System.out.println("START TestCMTaskHandler.testInvocationAnnotated() at " 
+ new Date(System.currentTimeMillis()));
     HelixTaskExecutor executor = new HelixTaskExecutor();
     Message message = new Message(MessageType.STATE_TRANSITION, "Some unique 
id");
     message.setSrcName("cm-instance-0");
@@ -105,30 +101,37 @@ public class TestHelixTaskHandler
     NotificationContext context;
 
     MockManager manager = new MockManager("clusterName");
-//    DataAccessor accessor = manager.getDataAccessor();
     HelixDataAccessor accessor = manager.getHelixDataAccessor();
 
-    StateModelConfigGenerator generator = new StateModelConfigGenerator();
-    StateModelDefinition stateModelDef = new StateModelDefinition(
-        generator.generateConfigForMasterSlave());
-//    accessor.setProperty(PropertyType.STATEMODELDEFS, stateModelDef, 
"MasterSlave");
+    StateModelDefinition stateModelDef =
+        new 
StateModelDefinition(StateModelConfigGenerator.generateConfigForMasterSlave());
     Builder keyBuilder = accessor.keyBuilder();
     accessor.setProperty(keyBuilder.stateModelDef("MasterSlave"), 
stateModelDef);
 
-
     context = new NotificationContext(manager);
-    
+
     CurrentState currentStateDelta = new CurrentState("TestDB");
     currentStateDelta.setState("TestDB_0", "OFFLINE");
 
-    HelixStateTransitionHandler stHandler = new 
HelixStateTransitionHandler(stateModel, message,
-        context, currentStateDelta);
+    StateModelFactory<MockStateModelAnnotated> stateModelFactory = new 
StateModelFactory<MockStateModelAnnotated>()
+    {
+
+      @Override
+      public MockStateModelAnnotated createNewStateModel(String partitionName)
+      {
+        // TODO Auto-generated method stub
+        return new MockStateModelAnnotated();
+      }
+
+    };
+
+    HelixStateTransitionHandler stHandler =
+        new HelixStateTransitionHandler(stateModelFactory, stateModel, 
message, context, currentStateDelta);
 
     HelixTask handler = new HelixTask(message, context, stHandler, executor);
     handler.call();
     AssertJUnit.assertTrue(stateModel.stateModelInvoked);
-    System.out.println("END TestCMTaskHandler.testInvocationAnnotated() at "
-        + new Date(System.currentTimeMillis()));
+    System.out.println("END TestCMTaskHandler.testInvocationAnnotated() at " + 
new Date(System.currentTimeMillis()));
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/11df95e8/helix-core/src/test/java/org/apache/helix/integration/manager/TestStateModelLeak.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/TestStateModelLeak.java b/helix-core/src/test/java/org/apache/helix/integration/manager/TestStateModelLeak.java
new file mode 100644
index 0000000..efd2459
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/manager/TestStateModelLeak.java
@@ -0,0 +1,225 @@
+package org.apache.helix.integration.manager;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.mock.participant.ErrTransition;
+import org.apache.helix.participant.HelixStateMachineEngine;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
+import org.apache.log4j.Logger;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+/**
+ * test drop resource should remove state-models
+ */
+public class TestStateModelLeak extends ZkUnitTestBase
+{
+  private static Logger LOG = Logger.getLogger(TestStateModelLeak.class);
+
+  /**
+   * test drop resource should remove all state models
+   * @throws Exception
+   */
+  @Test
+  public void testDrop() throws Exception
+  {
+    // Logger.getRootLogger().setLevel(Level.INFO);
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+    int n = 2;
+
+    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+                            "localhost", // participant name prefix
+                            "TestDB", // resource name prefix
+                            1, // resources
+                            4, // partitions per resource
+                            n, // number of nodes
+                            2, // replicas
+                            "MasterSlave",
+                            true); // do rebalance
+
+    // start controller
+    ClusterControllerManager controller = new ClusterControllerManager(ZK_ADDR, clusterName, "controller");
+    controller.syncStart();
+
+    MockParticipantManager[] participants = new MockParticipantManager[n];
+    for (int i = 0; i < n; i++)
+    {
+      final String instanceName = "localhost_" + (12918 + i);
+
+      participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+      participants[i].syncStart();
+    }
+
+    boolean result = ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR, clusterName));
+    Assert.assertTrue(result);
+
+    // check state-models in state-machine
+    HelixStateMachineEngine stateMachine = (HelixStateMachineEngine) participants[0].getStateMachineEngine();
+    StateModelFactory<? extends StateModel> fty = stateMachine.getStateModelFactory("MasterSlave");
+    Map<String, String> expectStateModelMap = new TreeMap<String, String>();
+    expectStateModelMap.put("TestDB0_0", "SLAVE");
+    expectStateModelMap.put("TestDB0_1", "MASTER");
+    expectStateModelMap.put("TestDB0_2", "SLAVE");
+    expectStateModelMap.put("TestDB0_3", "MASTER");
+    checkStateModelMap(fty, expectStateModelMap);
+
+    // drop resource
+    HelixAdmin admin = new ZKHelixAdmin(_gZkClient);
+    admin.dropResource(clusterName, "TestDB0");
+
+    result = ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR, clusterName));
+    Assert.assertTrue(result);
+
+    // check state models have been dropped also
+    Assert.assertTrue(fty.getPartitionSet().isEmpty(), "All state-models should be dropped, but was "
+    + fty.getPartitionSet());
+
+    // cleanup
+    controller.syncStop();
+    for (int i = 0; i < n; i++)
+    {
+      participants[i].syncStop();
+    }
+
+    System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+  }
+
+  /**
+   * test drop resource in error state should remove all state-models
+   * @throws Exception
+   */
+  @Test
+  public void testDropErrorPartition() throws Exception
+  {
+    // Logger.getRootLogger().setLevel(Level.INFO);
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+    int n = 2;
+
+    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+                            "localhost", // participant name prefix
+                            "TestDB", // resource name prefix
+                            1, // resources
+                            4, // partitions per resource
+                            n, // number of nodes
+                            2, // replicas
+                            "MasterSlave",
+                            true); // do rebalance
+
+    // start controller
+    ClusterControllerManager controller = new ClusterControllerManager(ZK_ADDR, clusterName, "controller");
+    controller.syncStart();
+
+    MockParticipantManager[] participants = new MockParticipantManager[n];
+    for (int i = 0; i < n; i++)
+    {
+      final String instanceName = "localhost_" + (12918 + i);
+
+      participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+      if (i == 0)
+      {
+        Map<String, Set<String>> errTransitionMap = new HashMap<String, Set<String>>();
+        Set<String> partitions = new HashSet<String>();
+        partitions.add("TestDB0_0");
+        errTransitionMap.put("OFFLINE-SLAVE", partitions);
+        participants[0].setTransition(new ErrTransition(errTransitionMap));
+      }
+
+      participants[i].syncStart();
+    }
+
+    Map<String, Map<String, String>> errStates = new HashMap<String, Map<String, String>>();
+    errStates.put("TestDB0", new HashMap<String, String>());
+    errStates.get("TestDB0").put("TestDB0_0", "localhost_12918");
+    boolean result = ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR, clusterName, errStates));
+    Assert.assertTrue(result);
+
+    // check state-models in state-machine
+    HelixStateMachineEngine stateMachine = (HelixStateMachineEngine) participants[0].getStateMachineEngine();
+    StateModelFactory<? extends StateModel> fty = stateMachine.getStateModelFactory("MasterSlave");
+    Map<String, String> expectStateModelMap = new TreeMap<String, String>();
+    expectStateModelMap.put("TestDB0_0", "ERROR");
+    expectStateModelMap.put("TestDB0_1", "MASTER");
+    expectStateModelMap.put("TestDB0_2", "SLAVE");
+    expectStateModelMap.put("TestDB0_3", "MASTER");
+    checkStateModelMap(fty, expectStateModelMap);
+
+    // drop resource
+    HelixAdmin admin = new ZKHelixAdmin(_gZkClient);
+    admin.dropResource(clusterName, "TestDB0");
+
+    result = ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR, clusterName));
+    Assert.assertTrue(result);
+
+    // check state models have been dropped also
+    Assert.assertTrue(fty.getPartitionSet().isEmpty(), "All state-models should be dropped, but was "
+    + fty.getPartitionSet());
+
+    // cleanup
+    controller.syncStop();
+    for (int i = 0; i < n; i++)
+    {
+      participants[i].syncStop();
+    }
+
+    System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+  }
+
+  /**
+   * check state-model factory contains state-models same as in expect-state-model map
+   *
+   * @param fty
+   * @param expectStateModelMap
+   */
+  static void checkStateModelMap(StateModelFactory<? extends StateModel> fty, Map<String, String> expectStateModelMap)
+  {
+    Assert.assertEquals(fty.getPartitionSet().size(), expectStateModelMap.size());
+    for (String partition : fty.getPartitionSet())
+    {
+      StateModel stateModel = fty.getStateModel(partition);
+      String actualState = stateModel.getCurrentState();
+      String expectState = expectStateModelMap.get(partition);
+      LOG.debug(partition + " actual state: " + actualState + ", expect state: " + expectState);
+      Assert.assertEquals(actualState, expectState, "partition: " + partition + " should be in state: " + expectState
+                          + " but was " + actualState);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/11df95e8/helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java b/helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java
index d84dd3c..d065f88 100644
--- a/helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java
+++ b/helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java
@@ -68,7 +68,7 @@ public class TestHelixTaskExecutor
       {
         HelixTaskResult result = new HelixTaskResult();
         _processedMsgIds.put(_message.getMsgId(), _message.getMsgId());
-        Thread.currentThread().sleep(100);
+        Thread.sleep(100);
         result.setSuccess(true);
         return result;
       }
@@ -77,7 +77,7 @@ public class TestHelixTaskExecutor
       public void onError(Exception e, ErrorCode code, ErrorType type)
       {
         // TODO Auto-generated method stub
-        
+
       }
     }
     @Override
@@ -116,7 +116,7 @@ public class TestHelixTaskExecutor
       // TODO Auto-generated method stub
       return "TestingMessageHandler2";
     }
-    
+
   }
 
   class CancellableHandlerFactory implements MessageHandlerFactory
@@ -150,7 +150,7 @@ public class TestHelixTaskExecutor
           {
             Thread.sleep(100);
           }
-        } 
+        }
         catch (InterruptedException e)
         {
           _interrupted = true;
@@ -374,7 +374,7 @@ public class TestHelixTaskExecutor
 
     TestMessageHandlerFactory factory = new TestMessageHandlerFactory();
     executor.registerMessageHandlerFactory(factory.getMessageType(), factory);
-    
+
 
     NotificationContext changeContext = new NotificationContext(manager);
     List<Message> msgList = new ArrayList<Message>();
@@ -396,7 +396,7 @@ public class TestHelixTaskExecutor
     exceptionMsg.setSrcName("127.101.1.23_2234");
     exceptionMsg.setCorrelationId(UUID.randomUUID().toString());
     msgList.add(exceptionMsg);
-    
+
     executor.onMessage("someInstance", msgList, changeContext);
 
     Thread.sleep(1000);
@@ -527,7 +527,7 @@ public class TestHelixTaskExecutor
       }
       System.out.println("END TestCMTaskExecutor.testShutdown()");
   }
-  
+
   @Test ()
   public void testNoRetry() throws InterruptedException
   {
@@ -554,7 +554,7 @@ public class TestHelixTaskExecutor
       msgList.add(msg);
     }
     executor.onMessage("someInstance", msgList, changeContext);
-    
+
     Thread.sleep(4000);
 
     AssertJUnit.assertTrue(factory._handlersCreated ==  nMsgs2);
@@ -569,7 +569,7 @@ public class TestHelixTaskExecutor
       }
     }
   }
-  
+
   @Test ()
   public void testRetryOnce() throws InterruptedException
   {
@@ -608,6 +608,6 @@ public class TestHelixTaskExecutor
     AssertJUnit.assertTrue(msgList.get(1).getRecord().getSimpleField("Cancelcount").equals("1"));
     AssertJUnit.assertEquals(factory._timedOutMsgIds.size(),2);
     AssertJUnit.assertTrue(executor._taskMap.size() == 0);
-    
+
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/11df95e8/helix-core/src/test/java/org/apache/helix/mock/participant/MockMSModelFactory.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/mock/participant/MockMSModelFactory.java b/helix-core/src/test/java/org/apache/helix/mock/participant/MockMSModelFactory.java
index d8f851e..c73f589 100644
--- a/helix-core/src/test/java/org/apache/helix/mock/participant/MockMSModelFactory.java
+++ b/helix-core/src/test/java/org/apache/helix/mock/participant/MockMSModelFactory.java
@@ -43,9 +43,9 @@ public class MockMSModelFactory extends StateModelFactory<MockMSStateModel>
     _transition = transition;
 
     // set existing transition
-    Map<String, MockMSStateModel> stateModelMap = getStateModelMap();
-    for (MockMSStateModel stateModel : stateModelMap.values())
+    for (String partition : getPartitionSet())
     {
+      MockMSStateModel stateModel = getStateModel(partition);
       stateModel.setTransition(transition);
     }
   }

Reply via email to