Repository: helix Updated Branches: refs/heads/master 323fbd049 -> 8dc19afb9
[HELIX-705]: Participant duplicated state transition handling rework Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/8dc19afb Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/8dc19afb Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/8dc19afb Branch: refs/heads/master Commit: 8dc19afb9b70d262da0eb2081840d65f2a031122 Parents: 323fbd0 Author: Harry Zhang <[email protected]> Authored: Mon Jun 25 15:55:14 2018 -0700 Committer: Harry Zhang <[email protected]> Committed: Tue Jun 26 15:33:47 2018 -0700 ---------------------------------------------------------------------- .../handling/HelixStateTransitionHandler.java | 54 +++++++++++++++----- .../helix/messaging/handling/HelixTask.java | 2 +- .../messaging/handling/HelixTaskExecutor.java | 36 +++++++++++-- .../handling/TestHelixTaskExecutor.java | 51 +++++++++++++----- 4 files changed, 111 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/8dc19afb/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java index 9412dde..36245de 100644 --- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java +++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java @@ -28,7 +28,6 @@ import java.util.List; import java.util.Map; import java.util.TreeMap; import java.util.concurrent.ConcurrentHashMap; - import org.apache.helix.HelixAdmin; import org.apache.helix.HelixDataAccessor; import org.apache.helix.HelixDefinedState; @@ -61,6 +60,15 @@ public class HelixStateTransitionHandler extends MessageHandler { } } + /** + * If current state == toState in message, this is considered as Duplicated state transition + */ + public static class HelixDuplicatedStateTransitionException extends Exception { + public HelixDuplicatedStateTransitionException(String info) { + super(info); + } + } + private static final Logger logger = LoggerFactory.getLogger(HelixStateTransitionHandler.class); private final StateModel _stateModel; StatusUpdateUtil _statusUpdateUtil; @@ -104,24 +112,39 @@ public class HelixStateTransitionHandler extends MessageHandler { String partitionName = _message.getPartitionName(); String fromState = _message.getFromState(); + String toState = _message.getToState(); // Verify the fromState and current state of the stateModel - String state = _currentStateDelta.getState(partitionName); + // getting current state from state model will provide most up-to-date + // current state. In case current state is null, partition is in initial + // state and we are setting it in current state + String state = _stateModel.getCurrentState() != null + ? _stateModel.getCurrentState() + : _currentStateDelta.getState(partitionName); // Set start time right before invoke client logic _currentStateDelta.setStartTime(_message.getPartitionName(), System.currentTimeMillis()); - if (fromState != null && !fromState.equals("*") && !fromState.equalsIgnoreCase(state)) { - String errorMessage = - "Current state of stateModel does not match the fromState in Message" - + ", Current State:" + state + ", message expected:" + fromState + ", partition: " - + partitionName + ", from: " + _message.getMsgSrc() + ", to: " - + _message.getTgtName(); + Exception err = null; + if (toState.equalsIgnoreCase(state)) { + // To state equals current state, we can just ignore the message + err = new HelixDuplicatedStateTransitionException( + String.format("Partition %s current state is same as toState (%s->%s) from message.", + partitionName, fromState, toState) + ); + } else if (fromState != null && !fromState.equals("*") && !fromState.equalsIgnoreCase(state)) { + // If current state is neither toState nor fromState in message, there is a problem + err = new HelixStateMismatchException( + String.format( + "Current state of stateModel does not match the fromState in Message, CurrentState: %s, Message: %s->%s, Partition: %s, from: %s, to: %s", + state, fromState, toState, partitionName, _message.getMsgSrc(), _message.getTgtName()) + ); + } - _statusUpdateUtil.logError(_message, HelixStateTransitionHandler.class, errorMessage, - _manager); - logger.error(errorMessage); - throw new HelixStateMismatchException(errorMessage); + if (err != null) { + _statusUpdateUtil.logError(_message, HelixStateTransitionHandler.class, err.getMessage(), _manager); + logger.error(err.getMessage()); + throw err; } // Reset the REQUESTED_STATE property if it exists. @@ -319,6 +342,11 @@ public class HelixStateTransitionHandler extends MessageHandler { try { preHandleMessage(); invoke(manager, context, taskResult, message); + } catch (HelixDuplicatedStateTransitionException e) { + // Duplicated state transition problem is fine + taskResult.setSuccess(true); + taskResult.setMessage(e.toString()); + taskResult.setInfo(e.getMessage()); } catch (HelixStateMismatchException e) { // Simply log error and return from here if State mismatch. // The current state of the state model is intact. @@ -414,7 +442,7 @@ public class HelixStateTransitionHandler extends MessageHandler { + _stateModel.getClass(); logger.error(errorMessage); taskResult.setSuccess(false); - + taskResult.setInfo(errorMessage); _statusUpdateUtil .logError(message, HelixStateTransitionHandler.class, errorMessage, manager); } http://git-wip-us.apache.org/repos/asf/helix/blob/8dc19afb/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java index 337a933..3cca883 100644 --- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java +++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java @@ -357,4 +357,4 @@ public class HelixTask implements MessageTask { } _isStarted = true; } -}; +} http://git-wip-us.apache.org/repos/asf/helix/blob/8dc19afb/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java index bb55604..9734cc8 100644 --- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java +++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java @@ -406,10 +406,8 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor { // Check to see if dedicate thread pool for handling state transition messages is configured or provided. updateStateTransitionMessageThreadPool(message, manager); - LOG.info("Scheduling message: " + taskId); - // System.out.println("sched msg: " + message.getPartitionName() + "-" - // + message.getTgtName() + "-" + message.getFromState() + "-" - // + message.getToState()); + LOG.info("Scheduling message {}: {}:{}, {}->{}", taskId, message.getResourceName(), + message.getPartitionName(), message.getFromState(), message.getToState()); _statusUpdateUtil.logInfo(message, HelixTaskExecutor.class, "Message handling task scheduled", manager); @@ -867,12 +865,23 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor { String messageTarget = getMessageTarget(message.getResourceName(), message.getPartitionName()); if (stateTransitionHandlers.containsKey(messageTarget)) { + // If there are 2 messages in same batch about same partition's state transition, + // the later one is discarded Message duplicatedMessage = stateTransitionHandlers.get(messageTarget)._message; throw new HelixException(String.format( - "Duplicated state transition message: %s. Existing: %s -> %s; New (Discarded): %s -> %s", + "Duplicated state transition message: %s. Existing: %s->%s; New (Discarded): %s->%s", message.getMsgId(), duplicatedMessage.getFromState(), duplicatedMessage.getToState(), message.getFromState(), message.getToState())); + } else if (message.getMsgType().equals(MessageType.STATE_TRANSITION.name()) + && isStateTransitionInProgress(messageTarget)) { + // If there is another state transition for same partition is going on, + // discard the message. Controller will resend if this is a valid message + throw new HelixException(String + .format("Another state transition for %s:%s is in progress. Discarding %s->%s message", + message.getResourceName(), message.getPartitionName(), message.getFromState(), + message.getToState())); } + stateTransitionHandlers .put(getMessageTarget(message.getResourceName(), message.getPartitionName()), createHandler); @@ -959,6 +968,23 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor { } } + /** + * Check if a state transition of the given message target is in progress. This function + * assumes the given message target corresponds to a state transition task + * + * @param messageTarget message target generated by getMessageTarget() + * @return true if there is a task going on with same message target else false + */ + private boolean isStateTransitionInProgress(String messageTarget) { + synchronized (_lock) { + if (_messageTaskMap.containsKey(messageTarget)) { + String taskId = _messageTaskMap.get(messageTarget); + return !_taskMap.get(taskId).getFuture().isDone(); + } + return false; + } + } + // Try to cancel this state transition that has not been started yet. // Three Types of Cancellation: 1. Message arrived with previous state transition // 2. Message handled but task not started http://git-wip-us.apache.org/repos/asf/helix/blob/8dc19afb/helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java b/helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java index ea2bb20..9db842d 100644 --- a/helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java +++ b/helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java @@ -26,6 +26,7 @@ import java.util.List; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; + import com.google.common.collect.ImmutableList; import org.apache.helix.HelixConstants; @@ -94,7 +95,7 @@ public class TestHelixTaskExecutor { @Override public List<String> getMessageTypes() { - return Arrays.asList(new String[]{"TestingMessageHandler", Message.MessageType.STATE_TRANSITION.name()}); + return Collections.singletonList("TestingMessageHandler"); } @Override @@ -200,9 +201,16 @@ public class TestHelixTaskExecutor { class TestStateTransitionHandlerFactory implements MultiTypeMessageHandlerFactory { ConcurrentHashMap<String, String> _processedMsgIds = new ConcurrentHashMap<String, String>(); private final String _msgType; + private final long _delay; public TestStateTransitionHandlerFactory(String msgType) { + this(msgType, -1); + } + + public TestStateTransitionHandlerFactory(String msgType, long delay) { _msgType = msgType; + _delay = delay; } + class TestStateTransitionMessageHandler extends MessageHandler { public TestStateTransitionMessageHandler(Message message, NotificationContext context) { super(message, context); @@ -212,6 +220,10 @@ public class TestHelixTaskExecutor { public HelixTaskResult handleMessage() throws InterruptedException { HelixTaskResult result = new HelixTaskResult(); _processedMsgIds.put(_message.getMsgId(), _message.getMsgId()); + if (_delay > 0) { + System.out.println("Sleeping..." + _delay); + Thread.sleep(_delay); + } result.setSuccess(true); return result; } @@ -308,42 +320,55 @@ public class TestHelixTaskExecutor { HelixDataAccessor dataAccessor = manager.getHelixDataAccessor(); PropertyKey.Builder keyBuilder = dataAccessor.keyBuilder(); - TestMessageHandlerFactory factory = new TestMessageHandlerFactory(); - for (String type : factory.getMessageTypes()) { - executor.registerMessageHandlerFactory(type, factory); - } + TestStateTransitionHandlerFactory stateTransitionFactory = + new TestStateTransitionHandlerFactory(Message.MessageType.STATE_TRANSITION.name(), 1000); + executor.registerMessageHandlerFactory(Message.MessageType.STATE_TRANSITION.name(), + stateTransitionFactory); NotificationContext changeContext = new NotificationContext(manager); List<Message> msgList = new ArrayList<Message>(); - int nMsgs = 10; - String instanceName = "someInstance"; + int nMsgs = 3; + String instanceName = manager.getInstanceName(); for (int i = 0; i < nMsgs; i++) { Message msg = new Message(Message.MessageType.STATE_TRANSITION.name(), UUID.randomUUID().toString()); msg.setTgtSessionId(manager.getSessionId()); + msg.setCreateTimeStamp((long) i); msg.setTgtName("Localhost_1123"); msg.setSrcName("127.101.1.23_2234"); msg.setPartitionName("Partition"); msg.setResourceName("Resource"); + msg.setStateModelDef("DummyMasterSlave"); + msg.setFromState("SLAVE"); + msg.setToState("MASTER"); dataAccessor.setProperty(msg.getKey(keyBuilder, instanceName), msg); msgList.add(msg); } - System.out.println(dataAccessor.getChildNames(keyBuilder.messages(instanceName))); AssertJUnit .assertEquals(dataAccessor.getChildValues(keyBuilder.messages(instanceName)).size(), nMsgs); changeContext.setChangeType(HelixConstants.ChangeType.MESSAGE); executor.onMessage(instanceName, msgList, changeContext); - Thread.sleep(1000); + Thread.sleep(200); - // Will not be able to process state transition messages, but we shall verify that - // only 1 message is left over - AssertJUnit - .assertEquals(dataAccessor.getChildValues(keyBuilder.messages(instanceName)).size(), 1); + // only 1 message is left over - state transition takes 1sec + Assert.assertEquals(dataAccessor.getChildValues(keyBuilder.messages(instanceName)).size(), 1); + // While a state transition message is going on, another state transition message for same + // resource / partition comes in, it should be discarded by message handler + + // Mock accessor is modifying message state in memory so we set it back to NEW + msgList.get(2).setMsgState(MessageState.NEW); + dataAccessor.setProperty(msgList.get(2).getKey(keyBuilder, instanceName), msgList.get(2)); + executor.onMessage(instanceName, Arrays.asList(msgList.get(2)), changeContext); + Thread.sleep(200); + Assert.assertEquals(dataAccessor.getChildValues(keyBuilder.messages(instanceName)).size(), 1); + + Thread.sleep(1000); + Assert.assertEquals(dataAccessor.getChildValues(keyBuilder.messages(instanceName)).size(), 0); System.out.println("END TestHelixTaskExecutor.testDuplicatedMessage()"); }
