Updated Branches: refs/heads/master 2bf12e92c -> 11df95e81
HELIX-197: fix state model leak, rb=13437 Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/11df95e8 Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/11df95e8 Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/11df95e8 Branch: refs/heads/master Commit: 11df95e810a0102584d4ba690dcbe241a8a00b1b Parents: 2bf12e9 Author: zzhang <[email protected]> Authored: Fri Aug 9 13:29:34 2013 -0700 Committer: zzhang <[email protected]> Committed: Fri Aug 9 13:29:34 2013 -0700 ---------------------------------------------------------------------- .../handling/HelixStateTransitionHandler.java | 33 +-- .../participant/HelixStateMachineEngine.java | 23 +- .../statemachine/ScheduledTaskStateModel.java | 19 +- .../statemachine/StateModelFactory.java | 67 +++--- .../org/apache/helix/TestHelixTaskExecutor.java | 47 ++-- .../org/apache/helix/TestHelixTaskHandler.java | 47 ++-- .../integration/manager/TestStateModelLeak.java | 225 +++++++++++++++++++ .../handling/TestHelixTaskExecutor.java | 20 +- .../mock/participant/MockMSModelFactory.java | 4 +- 9 files changed, 363 insertions(+), 122 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/11df95e8/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java index 2baa1cf..752437f 100644 --- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java +++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java @@ -44,6 +44,7 @@ import org.apache.helix.model.CurrentState; import org.apache.helix.model.Message; import org.apache.helix.model.Message.Attributes; import org.apache.helix.participant.statemachine.StateModel; +import org.apache.helix.participant.statemachine.StateModelFactory; import org.apache.helix.participant.statemachine.StateModelParser; import org.apache.helix.participant.statemachine.StateTransitionError; import org.apache.helix.util.StatusUpdateUtil; @@ -66,9 +67,11 @@ public class HelixStateTransitionHandler extends MessageHandler private final StateModelParser _transitionMethodFinder; private final CurrentState _currentStateDelta; private final HelixManager _manager; + private final StateModelFactory<? extends StateModel> _stateModelFactory; volatile boolean _isTimeout = false; - public HelixStateTransitionHandler(StateModel stateModel, + public HelixStateTransitionHandler(StateModelFactory<? extends StateModel> stateModelFactory, + StateModel stateModel, Message message, NotificationContext context, CurrentState currentStateDelta) @@ -78,7 +81,8 @@ public class HelixStateTransitionHandler extends MessageHandler _statusUpdateUtil = new StatusUpdateUtil(); _transitionMethodFinder = new StateModelParser(); _currentStateDelta = currentStateDelta; - _manager = _notificationContext.getManager();; + _manager = _notificationContext.getManager(); + _stateModelFactory = stateModelFactory; } void preHandleMessage() throws Exception @@ -97,7 +101,7 @@ public class HelixStateTransitionHandler extends MessageHandler logger.error(errorMessage); throw new HelixException(errorMessage); } - + HelixDataAccessor accessor = _manager.getHelixDataAccessor(); String partitionName = _message.getPartitionName(); @@ -127,7 +131,7 @@ public class HelixStateTransitionHandler extends MessageHandler { HelixTaskResult taskResult = (HelixTaskResult) _notificationContext.get(MapKey.HELIX_TASK_RESULT.toString()); Exception exception = taskResult.getException(); - + String partitionKey = _message.getPartitionName(); String resource = _message.getResourceName(); String sessionId = _message.getTgtSessionId(); @@ -140,7 +144,7 @@ public class HelixStateTransitionHandler extends MessageHandler ZNRecordBucketizer bucketizer = new ZNRecordBucketizer(bucketSize); // No need to sync on manager, we are cancel executor in expiry session before start executor in new session - // sessionId might change when we update the state model state. + // sessionId might change when we update the state model state. // for zk current state it is OK as we have the per-session current state node if (!_message.getTgtSessionId().equals(_manager.getSessionId())) { @@ -169,6 +173,7 @@ public class HelixStateTransitionHandler extends MessageHandler List<ZNRecordDelta> deltaList = new ArrayList<ZNRecordDelta>(); deltaList.add(delta); _currentStateDelta.setDeltaList(deltaList); + _stateModelFactory.removeStateModel(partitionKey); } else { @@ -213,14 +218,14 @@ public class HelixStateTransitionHandler extends MessageHandler _stateModel.rollbackOnError(_message, _notificationContext, error); _currentStateDelta.setState(partitionKey, HelixDefinedState.ERROR.toString()); _stateModel.updateState(HelixDefinedState.ERROR.toString()); - + // if we have errors transit from ERROR state, disable the partition if (_message.getFromState().equalsIgnoreCase(HelixDefinedState.ERROR.toString())) { disablePartition(); } } } - + try { // Update the ZK current state of the node @@ -236,7 +241,7 @@ public class HelixStateTransitionHandler extends MessageHandler else { // sub-message of a batch message - ConcurrentHashMap<String, CurrentStateUpdate> csUpdateMap + ConcurrentHashMap<String, CurrentStateUpdate> csUpdateMap = (ConcurrentHashMap<String, CurrentStateUpdate>) _notificationContext.get(MapKey.CURRENT_STATE_UPDATE.toString()); csUpdateMap.put(partitionKey, new CurrentStateUpdate(key, _currentStateDelta)); } @@ -266,13 +271,13 @@ public class HelixStateTransitionHandler extends MessageHandler + " for partition: " + partitionName + ". disable it on " + instanceName); } - + @Override public HelixTaskResult handleMessage() { NotificationContext context = _notificationContext; Message message = _message; - + synchronized (_stateModel) { HelixTaskResult taskResult = new HelixTaskResult(); @@ -318,7 +323,7 @@ public class HelixStateTransitionHandler extends MessageHandler taskResult.setException(e); taskResult.setInterrupted(e instanceof InterruptedException); } - + // add task result to context for postHandling context.add(MapKey.HELIX_TASK_RESULT.toString(), taskResult); postHandleMessage(); @@ -371,13 +376,13 @@ public class HelixStateTransitionHandler extends MessageHandler @Override public void onError(Exception e, ErrorCode code, ErrorType type) - { + { HelixDataAccessor accessor = _manager.getHelixDataAccessor(); Builder keyBuilder = accessor.keyBuilder(); String instanceName = _manager.getInstanceName(); String resourceName = _message.getResourceName(); String partition = _message.getPartitionName(); - + // All internal error has been processed already, so we can skip them if (type == ErrorType.INTERNAL) { @@ -392,7 +397,7 @@ public class HelixStateTransitionHandler extends MessageHandler CurrentState currentStateDelta = new CurrentState(resourceName); currentStateDelta.setState(partition, HelixDefinedState.ERROR.toString()); _stateModel.updateState(HelixDefinedState.ERROR.toString()); - + // if transit from ERROR state, disable the partition if (_message.getFromState().equalsIgnoreCase(HelixDefinedState.ERROR.toString())) { disablePartition(); http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/11df95e8/helix-core/src/main/java/org/apache/helix/participant/HelixStateMachineEngine.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/participant/HelixStateMachineEngine.java b/helix-core/src/main/java/org/apache/helix/participant/HelixStateMachineEngine.java index 8287b46..c6dc8ca 100644 --- a/helix-core/src/main/java/org/apache/helix/participant/HelixStateMachineEngine.java +++ b/helix-core/src/main/java/org/apache/helix/participant/HelixStateMachineEngine.java @@ -167,15 +167,9 @@ public class HelixStateMachineEngine implements StateMachineEngine { for (StateModelFactory<? extends StateModel> stateModelFactory : ftyMap.values()) { - Map<String, ? extends StateModel> modelMap = stateModelFactory.getStateModelMap(); - if (modelMap == null || modelMap.isEmpty()) + for (String resourceKey : stateModelFactory.getPartitionSet()) { - continue; - } - - for (String resourceKey : modelMap.keySet()) - { - StateModel stateModel = modelMap.get(resourceKey); + StateModel stateModel = stateModelFactory.getStateModel(resourceKey); stateModel.reset(); String initialState = _stateModelParser.getInitialState(stateModel.getClass()); stateModel.updateState(initialState); @@ -193,7 +187,7 @@ public class HelixStateMachineEngine implements StateMachineEngine if (!type.equals(MessageType.STATE_TRANSITION.toString())) { - throw new HelixException("Expect state-transition message type, but was " + throw new HelixException("Expect state-transition message type, but was " + message.getMsgType() + ", msgId: " + message.getMsgId()); } @@ -259,28 +253,29 @@ public class HelixStateMachineEngine implements StateMachineEngine currentStateDelta.setState(partitionKey, (stateModel.getCurrentState() == null) ? initState : stateModel.getCurrentState()); - return new HelixStateTransitionHandler(stateModel, + return new HelixStateTransitionHandler(stateModelFactory, + stateModel, message, context, currentStateDelta); } else - { + { BatchMessageWrapper wrapper = stateModelFactory.getBatchMessageWrapper(resourceName); if (wrapper == null) { wrapper = stateModelFactory.createAndAddBatchMessageWrapper(resourceName); } - + // get executor-service for the message TaskExecutor executor = (TaskExecutor) context.get(MapKey.TASK_EXECUTOR.toString()); if (executor == null) { - logger.error("fail to get executor-service for batch message: " + message.getId() + logger.error("fail to get executor-service for batch message: " + message.getId() + ". msgType: " + message.getMsgType() + ", resource: " + message.getResourceName()); return null; } return new BatchMessageHandler(message, context, this, wrapper, executor); - } + } } @Override http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/11df95e8/helix-core/src/main/java/org/apache/helix/participant/statemachine/ScheduledTaskStateModel.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/participant/statemachine/ScheduledTaskStateModel.java b/helix-core/src/main/java/org/apache/helix/participant/statemachine/ScheduledTaskStateModel.java index e4df933..d8576bb 100644 --- a/helix-core/src/main/java/org/apache/helix/participant/statemachine/ScheduledTaskStateModel.java +++ b/helix-core/src/main/java/org/apache/helix/participant/statemachine/ScheduledTaskStateModel.java @@ -34,7 +34,7 @@ public class ScheduledTaskStateModel extends StateModel { static final String DEFAULT_INITIAL_STATE = "OFFLINE"; Logger logger = Logger.getLogger(ScheduledTaskStateModel.class); - + // TODO Get default state from implementation or from state model annotation // StateModel with initial state other than OFFLINE should override this field protected String _currentState = DEFAULT_INITIAL_STATE; @@ -45,7 +45,7 @@ public class ScheduledTaskStateModel extends StateModel public ScheduledTaskStateModel(ScheduledTaskStateModelFactory factory, HelixTaskExecutor executor, String partitionName) { - _factory = factory; + _factory = factory; _partitionName = partitionName; _executor = executor; } @@ -55,7 +55,7 @@ public class ScheduledTaskStateModel extends StateModel NotificationContext context) throws InterruptedException { logger.info(_partitionName + " onBecomeCompletedFromOffline"); - + // Construct the inner task message from the mapfields of scheduledTaskQueue resource group Map<String, String> messageInfo = message.getRecord().getMapField(Message.Attributes.INNER_MESSAGE.toString()); ZNRecord record = new ZNRecord(_partitionName); @@ -74,7 +74,7 @@ public class ScheduledTaskStateModel extends StateModel handler.handleMessage(); logger.info(_partitionName + " onBecomeCompletedFromOffline completed"); } - + @Transition(to="OFFLINE",from="COMPLETED") public void onBecomeOfflineFromCompleted(Message message, NotificationContext context) @@ -89,7 +89,7 @@ public class ScheduledTaskStateModel extends StateModel logger.info(_partitionName + " onBecomeDroppedFromCompleted"); removeFromStatemodelFactory(); } - + @Transition(to="DROPPED",from="OFFLINE") public void onBecomeDroppedFromOffline(Message message, @@ -105,19 +105,20 @@ public class ScheduledTaskStateModel extends StateModel { logger.info(_partitionName + " onBecomeOfflineFromError"); } - + + @Override public void reset() { logger.info(_partitionName + " ScheduledTask reset"); removeFromStatemodelFactory(); } - + // We need this to prevent state model leak private void removeFromStatemodelFactory() { - if(_factory.getStateModelMap().containsKey(_partitionName)) + if(_factory.getStateModel(_partitionName) != null) { - _factory.getStateModelMap().remove(_partitionName); + _factory.removeStateModel(_partitionName); } else { http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/11df95e8/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModelFactory.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModelFactory.java b/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModelFactory.java index 5267d6e..aec158b 100644 --- a/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModelFactory.java +++ b/helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModelFactory.java @@ -20,6 +20,7 @@ package org.apache.helix.participant.statemachine; */ import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -27,35 +28,28 @@ import org.apache.helix.messaging.handling.BatchMessageWrapper; public abstract class StateModelFactory<T extends StateModel> { - // partitionName -> StateModel - private ConcurrentMap<String, T> _stateModelMap = new ConcurrentHashMap<String, T>(); - - // resourceName -> BatchMessageWrapper + /** + * mapping from partitionName to StateModel + */ + private final ConcurrentMap<String, T> _stateModelMap = new ConcurrentHashMap<String, T>(); + + /** + * mapping from resourceName to BatchMessageWrapper + */ private final ConcurrentMap<String, BatchMessageWrapper> _batchMsgWrapperMap = new ConcurrentHashMap<String, BatchMessageWrapper>(); /** * This method will be invoked only once per partitionName per session - * + * * @param partitionName * @return */ public abstract T createNewStateModel(String partitionName); -// /** -// * Add a state model for a partition -// * -// * @param partitionName -// * @return -// */ -// public void addStateModel(String partitionName, T stateModel) -// { -// _stateModelMap.put(partitionName, stateModel); -// } - /** * Create a state model for a partition - * + * * @param partitionName */ public T createAndAddStateModel(String partitionName) @@ -67,9 +61,9 @@ public abstract class StateModelFactory<T extends StateModel> /** * Get the state model for a partition - * + * * @param partitionName - * @return + * @return state model if exists, null otherwise */ public T getStateModel(String partitionName) { @@ -77,18 +71,29 @@ public abstract class StateModelFactory<T extends StateModel> } /** - * Get the state model map - * - * @return + * remove state model for a partition + * + * @param partitionName + * @return state model removed or null if not exist */ - public Map<String, T> getStateModelMap() + public T removeStateModel(String partitionName) { - return _stateModelMap; + return _stateModelMap.remove(partitionName); } - + + /** + * get partition set + * + * @return partition key set + */ + public Set<String> getPartitionSet() + { + return _stateModelMap.keySet(); + } + /** * create a default batch-message-wrapper for a resource - * + * * @param resourceName * @return */ @@ -96,10 +101,10 @@ public abstract class StateModelFactory<T extends StateModel> { return new BatchMessageWrapper(); } - + /** * create a batch-message-wrapper for a resource and put it into map - * + * * @param resourceName * @return */ @@ -109,10 +114,10 @@ public abstract class StateModelFactory<T extends StateModel> _batchMsgWrapperMap.put(resourceName, wrapper); return wrapper; } - + /** * get batch-message-wrapper for a resource - * + * * @param resourceName * @return */ @@ -120,5 +125,5 @@ public abstract class StateModelFactory<T extends StateModel> { return _batchMsgWrapperMap.get(resourceName); } - + } http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/11df95e8/helix-core/src/test/java/org/apache/helix/TestHelixTaskExecutor.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/TestHelixTaskExecutor.java b/helix-core/src/test/java/org/apache/helix/TestHelixTaskExecutor.java index 5bc1fc0..711be81 100644 --- a/helix-core/src/test/java/org/apache/helix/TestHelixTaskExecutor.java +++ b/helix-core/src/test/java/org/apache/helix/TestHelixTaskExecutor.java @@ -30,11 +30,11 @@ import org.apache.helix.model.CurrentState; import org.apache.helix.model.Message; import org.apache.helix.model.Message.MessageType; import org.apache.helix.model.StateModelDefinition; +import org.apache.helix.participant.statemachine.StateModelFactory; import org.apache.helix.tools.StateModelConfigGenerator; import org.testng.AssertJUnit; import org.testng.annotations.Test; - public class TestHelixTaskExecutor { @@ -56,36 +56,43 @@ public class TestHelixTaskExecutor message.setStateModelDef("MasterSlave"); MockManager manager = new MockManager("clusterName"); - // DataAccessor accessor = manager.getDataAccessor(); HelixDataAccessor accessor = manager.getHelixDataAccessor(); - StateModelConfigGenerator generator = new StateModelConfigGenerator(); StateModelDefinition stateModelDef = - new StateModelDefinition(generator.generateConfigForMasterSlave()); + new StateModelDefinition(StateModelConfigGenerator.generateConfigForMasterSlave()); Builder keyBuilder = accessor.keyBuilder(); accessor.setProperty(keyBuilder.stateModelDef("MasterSlave"), stateModelDef); MockHelixTaskExecutor executor = new MockHelixTaskExecutor(); MockStateModel stateModel = new MockStateModel(); - NotificationContext context; - executor.registerMessageHandlerFactory(MessageType.TASK_REPLY.toString(), - new AsyncCallbackService()); - // String clusterName =" testcluster"; - context = new NotificationContext(manager); + executor.registerMessageHandlerFactory(MessageType.TASK_REPLY.toString(), new AsyncCallbackService()); + + NotificationContext context = new NotificationContext(manager); CurrentState currentStateDelta = new CurrentState("TestDB"); currentStateDelta.setState("TestDB_0", "OFFLINE"); + + StateModelFactory<MockStateModel> stateModelFactory = new StateModelFactory<MockStateModel>() + { + + @Override + public MockStateModel createNewStateModel(String partitionName) + { + // TODO Auto-generated method stub + return new MockStateModel(); + } + + }; HelixStateTransitionHandler handler = - new HelixStateTransitionHandler(stateModel, - message, - context, - currentStateDelta); + new HelixStateTransitionHandler(stateModelFactory, stateModel, message, context, currentStateDelta); - HelixTask task = new HelixTask(message, context, handler, executor); - executor.scheduleTask(task); - for (int i = 0; i < 10; i++) { - if (!executor.isDone(task.getTaskId())) { - Thread.sleep(500); - } - } + HelixTask task = new HelixTask(message, context, handler, executor); + executor.scheduleTask(task); + for (int i = 0; i < 10; i++) + { + if (!executor.isDone(task.getTaskId())) + { + Thread.sleep(500); + } + } AssertJUnit.assertTrue(stateModel.stateModelInvoked); System.out.println("END TestCMTaskExecutor"); } http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/11df95e8/helix-core/src/test/java/org/apache/helix/TestHelixTaskHandler.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/TestHelixTaskHandler.java b/helix-core/src/test/java/org/apache/helix/TestHelixTaskHandler.java index 0910061..064e04d 100644 --- a/helix-core/src/test/java/org/apache/helix/TestHelixTaskHandler.java +++ b/helix-core/src/test/java/org/apache/helix/TestHelixTaskHandler.java @@ -35,11 +35,11 @@ import org.apache.helix.model.CurrentState; import org.apache.helix.model.Message; import org.apache.helix.model.StateModelDefinition; import org.apache.helix.model.Message.MessageType; +import org.apache.helix.participant.statemachine.StateModelFactory; import org.apache.helix.tools.StateModelConfigGenerator; import org.testng.AssertJUnit; import org.testng.annotations.Test; - public class TestHelixTaskHandler { @Test() @@ -62,11 +62,9 @@ public class TestHelixTaskHandler MockStateModel stateModel = new MockStateModel(); NotificationContext context; MockManager manager = new MockManager("clusterName"); -// DataAccessor accessor = manager.getDataAccessor(); HelixDataAccessor accessor = manager.getHelixDataAccessor(); - StateModelConfigGenerator generator = new StateModelConfigGenerator(); - StateModelDefinition stateModelDef = new StateModelDefinition( - generator.generateConfigForMasterSlave()); + StateModelDefinition stateModelDef = + new StateModelDefinition(StateModelConfigGenerator.generateConfigForMasterSlave()); Builder keyBuilder = accessor.keyBuilder(); accessor.setProperty(keyBuilder.stateModelDef("MasterSlave"), stateModelDef); @@ -74,21 +72,19 @@ public class TestHelixTaskHandler CurrentState currentStateDelta = new CurrentState("TestDB"); currentStateDelta.setState("TestDB_0", "OFFLINE"); - HelixStateTransitionHandler stHandler = new HelixStateTransitionHandler(stateModel, message, - context, currentStateDelta); + HelixStateTransitionHandler stHandler = + new HelixStateTransitionHandler(null, stateModel, message, context, currentStateDelta); HelixTask handler; handler = new HelixTask(message, context, stHandler, executor); handler.call(); AssertJUnit.assertTrue(stateModel.stateModelInvoked); - System.out.println("END TestCMTaskHandler.testInvocation() at " - + new Date(System.currentTimeMillis())); + System.out.println("END TestCMTaskHandler.testInvocation() at " + new Date(System.currentTimeMillis())); } @Test() public void testInvocationAnnotated() throws Exception { - System.out.println("START TestCMTaskHandler.testInvocationAnnotated() at " - + new Date(System.currentTimeMillis())); + System.out.println("START TestCMTaskHandler.testInvocationAnnotated() at " + new Date(System.currentTimeMillis())); HelixTaskExecutor executor = new HelixTaskExecutor(); Message message = new Message(MessageType.STATE_TRANSITION, "Some unique id"); message.setSrcName("cm-instance-0"); @@ -105,30 +101,37 @@ public class TestHelixTaskHandler NotificationContext context; MockManager manager = new MockManager("clusterName"); -// DataAccessor accessor = manager.getDataAccessor(); HelixDataAccessor accessor = manager.getHelixDataAccessor(); - StateModelConfigGenerator generator = new StateModelConfigGenerator(); - StateModelDefinition stateModelDef = new StateModelDefinition( - generator.generateConfigForMasterSlave()); -// accessor.setProperty(PropertyType.STATEMODELDEFS, stateModelDef, "MasterSlave"); + StateModelDefinition stateModelDef = + new StateModelDefinition(StateModelConfigGenerator.generateConfigForMasterSlave()); Builder keyBuilder = accessor.keyBuilder(); accessor.setProperty(keyBuilder.stateModelDef("MasterSlave"), stateModelDef); - context = new NotificationContext(manager); - + CurrentState currentStateDelta = new CurrentState("TestDB"); currentStateDelta.setState("TestDB_0", "OFFLINE"); - HelixStateTransitionHandler stHandler = new HelixStateTransitionHandler(stateModel, message, - context, currentStateDelta); + StateModelFactory<MockStateModelAnnotated> stateModelFactory = new StateModelFactory<MockStateModelAnnotated>() + { + + @Override + public MockStateModelAnnotated createNewStateModel(String partitionName) + { + // TODO Auto-generated method stub + return new MockStateModelAnnotated(); + } + + }; + + HelixStateTransitionHandler stHandler = + new HelixStateTransitionHandler(stateModelFactory, stateModel, message, context, currentStateDelta); HelixTask handler = new HelixTask(message, context, stHandler, executor); handler.call(); AssertJUnit.assertTrue(stateModel.stateModelInvoked); - System.out.println("END TestCMTaskHandler.testInvocationAnnotated() at " - + new Date(System.currentTimeMillis())); + System.out.println("END TestCMTaskHandler.testInvocationAnnotated() at " + new Date(System.currentTimeMillis())); } } http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/11df95e8/helix-core/src/test/java/org/apache/helix/integration/manager/TestStateModelLeak.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/TestStateModelLeak.java b/helix-core/src/test/java/org/apache/helix/integration/manager/TestStateModelLeak.java new file mode 100644 index 0000000..efd2459 --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/integration/manager/TestStateModelLeak.java @@ -0,0 +1,225 @@ +package org.apache.helix.integration.manager; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; + +import org.apache.helix.HelixAdmin; +import org.apache.helix.TestHelper; +import org.apache.helix.ZkUnitTestBase; +import org.apache.helix.manager.zk.ZKHelixAdmin; +import org.apache.helix.mock.participant.ErrTransition; +import org.apache.helix.participant.HelixStateMachineEngine; +import org.apache.helix.participant.statemachine.StateModel; +import org.apache.helix.participant.statemachine.StateModelFactory; +import org.apache.helix.tools.ClusterStateVerifier; +import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier; +import org.apache.log4j.Logger; +import org.testng.Assert; +import org.testng.annotations.Test; + +/** + * test drop resource should remove state-models + */ +public class TestStateModelLeak extends ZkUnitTestBase +{ + private static Logger LOG = Logger.getLogger(TestStateModelLeak.class); + + /** + * test drop resource should remove all state models + * @throws Exception + */ + @Test + public void testDrop() throws Exception + { + // Logger.getRootLogger().setLevel(Level.INFO); + String className = TestHelper.getTestClassName(); + String methodName = TestHelper.getTestMethodName(); + String clusterName = className + "_" + methodName; + int n = 2; + + System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis())); + + TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port + "localhost", // participant name prefix + "TestDB", // resource name prefix + 1, // resources + 4, // partitions per resource + n, // number of nodes + 2, // replicas + "MasterSlave", + true); // do rebalance + + // start controller + ClusterControllerManager controller = new ClusterControllerManager(ZK_ADDR, clusterName, "controller"); + controller.syncStart(); + + MockParticipantManager[] participants = new MockParticipantManager[n]; + for (int i = 0; i < n; i++) + { + final String instanceName = "localhost_" + (12918 + i); + + participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName); + participants[i].syncStart(); + } + + boolean result = ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR, clusterName)); + Assert.assertTrue(result); + + // check state-models in state-machine + HelixStateMachineEngine stateMachine = (HelixStateMachineEngine) participants[0].getStateMachineEngine(); + StateModelFactory<? extends StateModel> fty = stateMachine.getStateModelFactory("MasterSlave"); + Map<String, String> expectStateModelMap = new TreeMap<String, String>(); + expectStateModelMap.put("TestDB0_0", "SLAVE"); + expectStateModelMap.put("TestDB0_1", "MASTER"); + expectStateModelMap.put("TestDB0_2", "SLAVE"); + expectStateModelMap.put("TestDB0_3", "MASTER"); + checkStateModelMap(fty, expectStateModelMap); + + // drop resource + HelixAdmin admin = new ZKHelixAdmin(_gZkClient); + admin.dropResource(clusterName, "TestDB0"); + + result = ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR, clusterName)); + Assert.assertTrue(result); + + // check state models have been dropped also + Assert.assertTrue(fty.getPartitionSet().isEmpty(), "All state-models should be dropped, but was " + + fty.getPartitionSet()); + + // cleanup + controller.syncStop(); + for (int i = 0; i < n; i++) + { + participants[i].syncStop(); + } + + System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis())); + } + + /** + * test drop resource in error state should remove all state-models + * @throws Exception + */ + @Test + public void testDropErrorPartition() throws Exception + { + // Logger.getRootLogger().setLevel(Level.INFO); + String className = TestHelper.getTestClassName(); + String methodName = TestHelper.getTestMethodName(); + String clusterName = className + "_" + methodName; + int n = 2; + + System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis())); + + TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port + "localhost", // participant name prefix + "TestDB", // resource name prefix + 1, // resources + 4, // partitions per resource + n, // number of nodes + 2, // replicas + "MasterSlave", + true); // do rebalance + + // start controller + ClusterControllerManager controller = new ClusterControllerManager(ZK_ADDR, clusterName, "controller"); + controller.syncStart(); + + MockParticipantManager[] participants = new MockParticipantManager[n]; + for (int i = 0; i < n; i++) + { + final String instanceName = "localhost_" + (12918 + i); + + participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName); + if (i == 0) + { + Map<String, Set<String>> errTransitionMap = new HashMap<String, Set<String>>(); + Set<String> partitions = new HashSet<String>(); + partitions.add("TestDB0_0"); + errTransitionMap.put("OFFLINE-SLAVE", partitions); + participants[0].setTransition(new ErrTransition(errTransitionMap)); + } + + participants[i].syncStart(); + } + + Map<String, Map<String, String>> errStates = new HashMap<String, Map<String, String>>(); + errStates.put("TestDB0", new HashMap<String, String>()); + errStates.get("TestDB0").put("TestDB0_0", "localhost_12918"); + boolean result = ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR, clusterName, errStates)); + Assert.assertTrue(result); + + // check state-models in state-machine + HelixStateMachineEngine stateMachine = (HelixStateMachineEngine) participants[0].getStateMachineEngine(); + StateModelFactory<? extends StateModel> fty = stateMachine.getStateModelFactory("MasterSlave"); + Map<String, String> expectStateModelMap = new TreeMap<String, String>(); + expectStateModelMap.put("TestDB0_0", "ERROR"); + expectStateModelMap.put("TestDB0_1", "MASTER"); + expectStateModelMap.put("TestDB0_2", "SLAVE"); + expectStateModelMap.put("TestDB0_3", "MASTER"); + checkStateModelMap(fty, expectStateModelMap); + + // drop resource + HelixAdmin admin = new ZKHelixAdmin(_gZkClient); + admin.dropResource(clusterName, "TestDB0"); + + result = ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR, clusterName)); + Assert.assertTrue(result); + + // check state models have been dropped also + Assert.assertTrue(fty.getPartitionSet().isEmpty(), "All state-models should be dropped, but was " + + fty.getPartitionSet()); + + // cleanup + controller.syncStop(); + for (int i = 0; i < n; i++) + { + participants[i].syncStop(); + } + + System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis())); + } + + /** + * check state-model factory contains state-models same as in expect-state-model map + * + * @param fty + * @param expectStateModelMap + */ + static void checkStateModelMap(StateModelFactory<? extends StateModel> fty, Map<String, String> expectStateModelMap) + { + Assert.assertEquals(fty.getPartitionSet().size(), expectStateModelMap.size()); + for (String partition : fty.getPartitionSet()) + { + StateModel stateModel = fty.getStateModel(partition); + String actualState = stateModel.getCurrentState(); + String expectState = expectStateModelMap.get(partition); + LOG.debug(partition + " actual state: " + actualState + ", expect state: " + expectState); + Assert.assertEquals(actualState, expectState, "partition: " + partition + " should be in state: " + expectState + + " but was " + actualState); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/11df95e8/helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java b/helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java index d84dd3c..d065f88 100644 --- a/helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java +++ b/helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java @@ -68,7 +68,7 @@ public class TestHelixTaskExecutor { HelixTaskResult result = new HelixTaskResult(); _processedMsgIds.put(_message.getMsgId(), _message.getMsgId()); - Thread.currentThread().sleep(100); + Thread.sleep(100); result.setSuccess(true); return result; } @@ -77,7 +77,7 @@ public class TestHelixTaskExecutor public void onError(Exception e, ErrorCode code, ErrorType type) { // TODO Auto-generated method stub - + } } @Override @@ -116,7 +116,7 @@ public class TestHelixTaskExecutor // TODO Auto-generated method stub return "TestingMessageHandler2"; } - + } class CancellableHandlerFactory implements MessageHandlerFactory @@ -150,7 +150,7 @@ public class TestHelixTaskExecutor { Thread.sleep(100); } - } + } catch (InterruptedException e) { _interrupted = true; @@ -374,7 +374,7 @@ public class TestHelixTaskExecutor TestMessageHandlerFactory factory = new TestMessageHandlerFactory(); executor.registerMessageHandlerFactory(factory.getMessageType(), factory); - + NotificationContext changeContext = new NotificationContext(manager); List<Message> msgList = new ArrayList<Message>(); @@ -396,7 +396,7 @@ public class TestHelixTaskExecutor exceptionMsg.setSrcName("127.101.1.23_2234"); exceptionMsg.setCorrelationId(UUID.randomUUID().toString()); msgList.add(exceptionMsg); - + executor.onMessage("someInstance", msgList, changeContext); Thread.sleep(1000); @@ -527,7 +527,7 @@ public class TestHelixTaskExecutor } System.out.println("END TestCMTaskExecutor.testShutdown()"); } - + @Test () public void testNoRetry() throws InterruptedException { @@ -554,7 +554,7 @@ public class TestHelixTaskExecutor msgList.add(msg); } executor.onMessage("someInstance", msgList, changeContext); - + Thread.sleep(4000); AssertJUnit.assertTrue(factory._handlersCreated == nMsgs2); @@ -569,7 +569,7 @@ public class TestHelixTaskExecutor } } } - + @Test () public void testRetryOnce() throws InterruptedException { @@ -608,6 +608,6 @@ public class TestHelixTaskExecutor AssertJUnit.assertTrue(msgList.get(1).getRecord().getSimpleField("Cancelcount").equals("1")); AssertJUnit.assertEquals(factory._timedOutMsgIds.size(),2); AssertJUnit.assertTrue(executor._taskMap.size() == 0); - + } } http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/11df95e8/helix-core/src/test/java/org/apache/helix/mock/participant/MockMSModelFactory.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/mock/participant/MockMSModelFactory.java b/helix-core/src/test/java/org/apache/helix/mock/participant/MockMSModelFactory.java index d8f851e..c73f589 100644 --- a/helix-core/src/test/java/org/apache/helix/mock/participant/MockMSModelFactory.java +++ b/helix-core/src/test/java/org/apache/helix/mock/participant/MockMSModelFactory.java @@ -43,9 +43,9 @@ public class MockMSModelFactory extends StateModelFactory<MockMSStateModel> _transition = transition; // set existing transition - Map<String, MockMSStateModel> stateModelMap = getStateModelMap(); - for (MockMSStateModel stateModel : stateModelMap.values()) + for (String partition : getPartitionSet()) { + MockMSStateModel stateModel = getStateModel(partition); stateModel.setTransition(transition); } }
