This is an automated email from the ASF dual-hosted git repository.
jiajunwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new f38915c Add default message handling retry count for state transition messages. (#1514)
f38915c is described below
commit f38915cc11b18934ecad1450a75b6d890b351448
Author: Jiajun Wang <[email protected]>
AuthorDate: Wed Nov 11 17:21:37 2020 -0800
Add default message handling retry count for state transition messages. (#1514)
A retry is counted against the message's retry count whenever message handler
creation fails or a state transition times out.
In addition, this PR improves several ambiguous behaviors that could
block a normal retry:
1. The cache of known message IDs in HelixTaskExecutor may not be
updated correctly when a message is going to be retried.
2. The retry count is not strictly enforced under some conditions.
3. A message waiting to be retried is not automatically re-read when there are
no other new message changes. This PR changes that behavior and ensures the
retry happens even when no other message is created (a NO_OP message is sent
to the instance to trigger another message callback).
Finally, the tests are improved to cover the new changes.
---
.../controller/stages/MessageGenerationPhase.java | 5 +
.../messaging/handling/HelixTaskExecutor.java | 100 +++++++---
.../TestStateTransitionAppFailureHandling.java | 203 +++++++++++++++++++++
.../messaging/handling/TestHelixTaskExecutor.java | 71 +++++--
4 files changed, 335 insertions(+), 44 deletions(-)
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
index 8774812..b869ed8 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
@@ -70,6 +70,8 @@ public abstract class MessageGenerationPhase extends AbstractBaseStage {
.getSystemPropertyAsLong(SystemPropertyKeys.CONTROLLER_MESSAGE_PURGE_DELAY, 60
* 1000);
private final static String PENDING_MESSAGE = "pending message";
private final static String STALE_MESSAGE = "stale message";
+ // TODO: Make the message retry count configurable through the Cluster
Config or IdealStates.
+ public final static int DEFAULT_STATE_TRANSITION_MESSAGE_RETRY_COUNT = 3;
private static Logger logger =
LoggerFactory.getLogger(MessageGenerationPhase.class);
@@ -415,6 +417,9 @@ public abstract class MessageGenerationPhase extends AbstractBaseStage {
message.setStateModelDef(stateModelDefName);
message.setStateModelFactoryName(resource.getStateModelFactoryname());
message.setBucketSize(resource.getBucketSize());
+ // Set the retry count for state transition messages.
+ // TODO: make the retry count configurable in ClusterConfig or IdealState
+ message.setRetryCount(DEFAULT_STATE_TRANSITION_MESSAGE_RETRY_COUNT);
if (resource.getResourceGroupName() != null) {
message.setResourceGroupName(resource.getResourceGroupName());
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
index b3432d4..54566cc 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
@@ -20,6 +20,7 @@ package org.apache.helix.messaging.handling;
*/
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
@@ -30,6 +31,7 @@ import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
+import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -521,14 +523,19 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
}
}
- private void updateMessageState(List<Message> readMsgs, HelixDataAccessor
accessor,
+ private void updateMessageState(Collection<Message> msgsToBeUpdated,
HelixDataAccessor accessor,
String instanceName) {
+ if (msgsToBeUpdated.isEmpty()) {
+ return;
+ }
+
Builder keyBuilder = accessor.keyBuilder();
- List<String> readMsgPaths = new ArrayList<>();
+ List<Message> updateMsgs = new ArrayList<>();
+ List<String> updateMsgPaths = new ArrayList<>();
List<DataUpdater<ZNRecord>> updaters = new ArrayList<>();
- for (Message msg : readMsgs) {
- readMsgPaths.add(msg.getKey(keyBuilder, instanceName).getPath());
- _knownMessageIds.add(msg.getId());
+ for (Message msg : msgsToBeUpdated) {
+ updateMsgs.add(msg);
+ updateMsgPaths.add(msg.getKey(keyBuilder, instanceName).getPath());
/**
* We use the updater to avoid race condition between writing message to
zk as READ state and removing message after ST is done
* If there is no message at this path, meaning the message is removed
so we do not write the message
@@ -543,7 +550,33 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
return msg.getRecord();
});
}
- accessor.updateChildren(readMsgPaths, updaters, AccessOption.PERSISTENT);
+ boolean[] updateResults =
+ accessor.updateChildren(updateMsgPaths, updaters,
AccessOption.PERSISTENT);
+
+ boolean isMessageUpdatedAsNew = false;
+ // Note that only cache the known message Ids after the update to ZK is
successfully done.
+ // This is to avoid inconsistent cache.
+ for (int i = 0; i < updateMsgs.size(); i++) {
+ Message msg = updateMsgs.get(i);
+ if (msg.getMsgState().equals(MessageState.NEW)) {
+ // If a message is updated as NEW state, then we might need to process
it again soon.
+ isMessageUpdatedAsNew = true;
+ // And it shall not be treated as a known messages.
+ } else {
+ _knownMessageIds.add(msg.getId());
+ if (!updateResults[i]) {
+ // TODO: If the message update fails, maybe we shall not treat the
message as a known
+ // TODO: message. We shall apply more strict check and retry the
update.
+ LOG.error("Failed to update the message {}.", msg.getMsgId());
+ }
+ }
+ }
+
+ if (isMessageUpdatedAsNew) {
+ // Sending a NO-OP message to trigger another message callback to
re-process the messages
+ // that are updated as NEW state.
+ sendNopMessage(accessor, instanceName);
+ }
}
private void shutdownAndAwaitTermination(ExecutorService pool) {
@@ -703,9 +736,7 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
return Collections.emptyList();
}
- // In case the cache contains any deleted message Id, clean up
- _knownMessageIds.retainAll(messageIds);
-
+ // Avoid reading the already known messages.
messageIds.removeAll(_knownMessageIds);
List<PropertyKey> keys = new ArrayList<>();
for (String messageId : messageIds) {
@@ -794,7 +825,7 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
List<NotificationContext> nonStateTransitionContexts = new ArrayList<>();
// message to be updated in ZK
- List<Message> msgsToBeUpdated = new ArrayList<>();
+ Map<String, Message> msgsToBeUpdated = new HashMap<>();
String sessionId = manager.getSessionId();
List<String> curResourceNames =
@@ -821,32 +852,29 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
LOG.error(
"Exception happens when creating Message Handler for message {}.
Current remaining retry count is {}.",
message.getMsgId(), remainingRetryCount);
- // Set the message retry count to avoid infinite retrying.
+ // Reduce the message retry count to avoid infinite retrying.
message.setRetryCount(remainingRetryCount - 1);
message.setExecuteSessionId(sessionId);
- // continue processing in the next section where handler object is
double-checked.
- }
-
- if (msgHandler == null) {
// Note that we are re-using the retry count of Message that was
original designed to control
// timeout retries. So it is not checked before the first try in order
to ensure consistent
// behavior. It is possible that we introduce a new behavior for this
method. But it requires
// us to split the configuration item so as to avoid confusion.
- if (message.getRetryCount() < 0) {
+ if (message.getRetryCount() <= 0) {
// If no more retry count remains, then mark the message to be
UNPROCESSABLE.
- String errorMsg = String
- .format("No available message Handler found!"
- + " Stop processing message %s since it has a negative
remaining retry count %d!",
- message.getMsgId(), message.getRetryCount());
+ String errorMsg = String.format("No available message Handler found!"
+ + " Stop processing message %s since it has zero or negative
remaining retry count %d!",
+ message.getMsgId(), message.getRetryCount());
updateUnprocessableMessage(message, null, errorMsg, manager);
- msgsToBeUpdated.add(message);
- } else {
- // Skip processing this message in this callback. The same message
process will be retried
- // in the next round.
- LOG.warn("There is no existing handler for message {}."
- + " Skip processing it for now. Will retry on the next
callback.",
- message.getMsgId());
}
+ msgsToBeUpdated.put(message.getId(), message);
+ // continue processing in the next section where handler object is
double-checked.
+ }
+
+ if (msgHandler == null) {
+ // Skip processing this message in this callback. The same message
process will be retried
+ // in the next round if retry count > 0.
+ LOG.warn("There is no existing handler for message {}."
+ + " Skip processing it for now. Will retry on the next callback.",
message.getMsgId());
continue;
}
@@ -872,7 +900,8 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
}
// Update the normally processed messages
- msgsToBeUpdated.add(markReadMessage(message, msgWorkingContext,
manager));
+ Message markedMsg = markReadMessage(message, msgWorkingContext, manager);
+ msgsToBeUpdated.put(markedMsg.getId(), markedMsg);
// batch creation of all current state meta data
// do it for non-controller and state transition messages only
@@ -910,7 +939,7 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
}
// update message state in batch and schedule tasks for all read messages
- updateMessageState(msgsToBeUpdated, accessor, instanceName);
+ updateMessageState(msgsToBeUpdated.values(), accessor, instanceName);
for (Map.Entry<String, MessageHandler> handlerEntry :
stateTransitionHandlers.entrySet()) {
MessageHandler handler = handlerEntry.getValue();
@@ -1288,6 +1317,19 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
}
}
+ private void sendNopMessage(HelixDataAccessor accessor, String instanceName)
{
+ try {
+ Message nopMsg = new Message(MessageType.NO_OP,
UUID.randomUUID().toString());
+ nopMsg.setSrcName(instanceName);
+ nopMsg.setTgtName(instanceName);
+ accessor
+ .setProperty(accessor.keyBuilder().message(nopMsg.getTgtName(),
nopMsg.getId()), nopMsg);
+ LOG.info("Send NO_OP message to {}}, msgId: {}.", nopMsg.getTgtName(),
nopMsg.getId());
+ } catch (Exception e) {
+ LOG.error("Failed to send NO_OP message to {}.", instanceName, e);
+ }
+ }
+
@Override
public void shutdown() {
LOG.info("Shutting down HelixTaskExecutor");
diff --git a/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestStateTransitionAppFailureHandling.java b/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestStateTransitionAppFailureHandling.java
new file mode 100644
index 0000000..e3ac398
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestStateTransitionAppFailureHandling.java
@@ -0,0 +1,203 @@
+package org.apache.helix.integration.paticipant;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixDefinedState;
+import org.apache.helix.HelixException;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.TestHelper;
+import org.apache.helix.controller.stages.MessageGenerationPhase;
+import org.apache.helix.integration.common.ZkStandAloneCMTestBase;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.mock.participant.MockMSStateModel;
+import org.apache.helix.mock.participant.MockTransition;
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class TestStateTransitionAppFailureHandling extends
ZkStandAloneCMTestBase {
+ private static Logger LOG =
LoggerFactory.getLogger(TestStateTransitionAppFailureHandling.class);
+ private final static int REPLICAS = 3;
+
+ @Override
+ @BeforeClass
+ public void beforeClass() throws Exception {
+ super.beforeClass();
+ // Clean up the resource that is created in the super cluster beforeClass
method.
+ _gSetupTool.dropResourceFromCluster(CLUSTER_NAME, TEST_DB);
+ _clusterVerifier.verifyByPolling();
+ }
+
+ public static class RetryStateModelFactory extends
StateModelFactory<MockMSStateModel> {
+ int _retryCountUntilSucceed;
+
+ public RetryStateModelFactory(int retryCountUntilSucceed) {
+ _retryCountUntilSucceed = retryCountUntilSucceed;
+ }
+
+ public int getRemainingRetryCountUntilSucceed() {
+ return _retryCountUntilSucceed;
+ }
+
+ @Override
+ public MockMSStateModel createNewStateModel(String resource, String
stateUnitKey) {
+ if (_retryCountUntilSucceed > 0) {
+ _retryCountUntilSucceed--;
+ throw new HelixException("You Shall Not PASS!!!");
+ } else {
+ return new MockMSStateModel(new MockTransition());
+ }
+ }
+ }
+
+ @Test
+ public void testSTHandlerInitFailureRetry() throws Exception {
+ int retryCountUntilSucceed =
+ Integer.MAX_VALUE; // ensure the retry count is large so the message
retry will fail.
+ Map<String, RetryStateModelFactory> retryFactoryMap =
resetParticipants(retryCountUntilSucceed);
+
+ _gSetupTool.addResourceToCluster(CLUSTER_NAME, TEST_DB, _PARTITIONS,
STATE_MODEL);
+ _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, TEST_DB, REPLICAS);
+
+ HelixDataAccessor accessor = _controller.getHelixDataAccessor();
+ PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+
+ // Verify and wait until all messages have been retried and failed.
+ Map<String, List<Message>> partitionMessageMap = new HashMap<>();
+ Assert.assertTrue(TestHelper.verify(() -> {
+ int totalMessageCount = 0;
+ for (int i = 0; i < NODE_NR; i++) {
+ String instanceName = _participants[i].getInstanceName();
+ List<Message> messageList = accessor.getProperty(
+ accessor.getChildNames(keyBuilder.messages(instanceName)).stream()
+ .map(childName -> keyBuilder.message(instanceName, childName))
+ .collect(Collectors.toList()), true);
+ for (Message message : messageList) {
+ if (message.getMsgState() != Message.MessageState.UNPROCESSABLE) {
+ return false;
+ }
+ }
+ partitionMessageMap.put(instanceName, messageList);
+ totalMessageCount += messageList.size();
+ }
+ return totalMessageCount == _PARTITIONS * REPLICAS;
+ }, TestHelper.WAIT_DURATION));
+
+ // Verify that the correct numbers of retry has been done on each node.
+ for (String instanceName : partitionMessageMap.keySet()) {
+ List<Message> instanceMessages = partitionMessageMap.get(instanceName);
+ for (Message message : instanceMessages) {
+ Assert.assertTrue(message.getRetryCount() <= 0);
+ Assert.assertEquals(message.getMsgState(),
Message.MessageState.UNPROCESSABLE);
+ }
+ // Check if the factory has tried enough times before fail the message.
+ Assert.assertEquals(retryCountUntilSucceed -
retryFactoryMap.get(instanceName)
+ .getRemainingRetryCountUntilSucceed(), instanceMessages.size()
+ *
MessageGenerationPhase.DEFAULT_STATE_TRANSITION_MESSAGE_RETRY_COUNT);
+ }
+
+ // Verify that the partition is not initialized.
+ for (int i = 0; i < NODE_NR; i++) {
+ String instanceName = _participants[i].getInstanceName();
+ String sessionId = _participants[i].getSessionId();
+ List<CurrentState> currentStates = accessor.getProperty(
+ accessor.getChildNames(keyBuilder.currentStates(instanceName,
sessionId)).stream()
+ .map(childName -> keyBuilder.currentState(instanceName,
sessionId, childName))
+ .collect(Collectors.toList()), true);
+ for (CurrentState currentState : currentStates) {
+ Assert.assertTrue(currentState.getPartitionStateMap().isEmpty());
+ }
+ }
+
+ _gSetupTool.dropResourceFromCluster(CLUSTER_NAME, TEST_DB);
+ }
+
+ @Test(dependsOnMethods = "testSTHandlerInitFailureRetry")
+ public void testSTHandlerInitFailureRetrySucceed() {
+ // Make the mock StateModelFactory return handler before last retry. So it
will successfully
+ // finish handler initialization.
+ int retryCountUntilSucceed =
+ MessageGenerationPhase.DEFAULT_STATE_TRANSITION_MESSAGE_RETRY_COUNT -
1;
+ Map<String, RetryStateModelFactory> retryFactoryMap =
resetParticipants(retryCountUntilSucceed);
+
+ _gSetupTool.addResourceToCluster(CLUSTER_NAME, TEST_DB, _PARTITIONS,
STATE_MODEL);
+ _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, TEST_DB, REPLICAS);
+
+ HelixDataAccessor accessor = _controller.getHelixDataAccessor();
+ PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+
+ // Verify and wait until all messages have been processed and the cluster
is stable.
+ Assert.assertTrue(_clusterVerifier.verifyByPolling());
+
+ // Verify that the partition is not in error state. And all messages has
been completed.
+ for (int i = 0; i < NODE_NR; i++) {
+ String instanceName = _participants[i].getInstanceName();
+ String sessionId = _participants[i].getSessionId();
+
+ List<Message> messageList = accessor.getProperty(
+ accessor.getChildNames(keyBuilder.messages(instanceName)).stream()
+ .map(childName -> keyBuilder.message(instanceName, childName))
+ .collect(Collectors.toList()), true);
+ Assert.assertTrue(messageList.isEmpty());
+
+ List<CurrentState> currentStates = accessor.getProperty(
+ accessor.getChildNames(keyBuilder.currentStates(instanceName,
sessionId)).stream()
+ .map(childName -> keyBuilder.currentState(instanceName,
sessionId, childName))
+ .collect(Collectors.toList()), true);
+ for (CurrentState currentState : currentStates) {
+ Assert.assertTrue(currentState.getPartitionStateMap().values().stream()
+ .allMatch(state -> !state.equals(HelixDefinedState.ERROR.name())));
+ }
+ // The factory should has 0 remaining "retryCountUntilSucceed".
+ Assert
+
.assertEquals(retryFactoryMap.get(instanceName).getRemainingRetryCountUntilSucceed(),
0);
+ }
+
+ _gSetupTool.dropResourceFromCluster(CLUSTER_NAME, TEST_DB);
+ }
+
+ private Map<String, RetryStateModelFactory> resetParticipants(int
retryCountUntilSucceed) {
+ Map<String, RetryStateModelFactory> retryFactoryMap = new HashMap<>();
+ for (int i = 0; i < NODE_NR; i++) {
+ if (_participants[i] != null && _participants[i].isConnected()) {
+ _participants[i].syncStop();
+ }
+ String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+ _participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME,
instanceName);
+ RetryStateModelFactory factory = new
RetryStateModelFactory(retryCountUntilSucceed);
+ retryFactoryMap.put(instanceName, factory);
+
_participants[i].getStateMachineEngine().registerStateModelFactory("MasterSlave",
factory);
+ _participants[i].syncStart();
+ }
+ return retryFactoryMap;
+ }
+}
diff --git a/helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java b/helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java
index e285132..59d6b06 100644
--- a/helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java
+++ b/helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java
@@ -23,6 +23,7 @@ import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
@@ -557,21 +558,19 @@ public class TestHelixTaskExecutor {
}
@Test()
- public void testCreateHandlerException() throws InterruptedException {
+ public void testCreateHandlerException() throws Exception {
System.out.println("START
TestCMTaskExecutor.testCreateHandlerException()");
HelixTaskExecutor executor = new HelixTaskExecutor();
HelixManager manager = new MockClusterManager();
HelixDataAccessor dataAccessor = manager.getHelixDataAccessor();
PropertyKey.Builder keyBuilder = dataAccessor.keyBuilder();
-
- TestMessageHandlerFactory factory = new TestMessageHandlerFactory();
- for (String type : factory.getMessageTypes()) {
- executor.registerMessageHandlerFactory(type, factory);
- }
-
NotificationContext changeContext = new NotificationContext(manager);
+ TestMessageHandlerFactory factory = new TestMessageHandlerFactory();
+ // Sending message without registering the factory.
+ // The message won't be processed since creating handler returns null.
int nMsgs1 = 5;
+ List<Message> msgList = new ArrayList<>();
for (int i = 0; i < nMsgs1; i++) {
Message msg = new Message(factory.getMessageTypes().get(0),
UUID.randomUUID().toString());
msg.setTgtSessionId(manager.getSessionId());
@@ -579,7 +578,26 @@ public class TestHelixTaskExecutor {
msg.setSrcName("127.101.1.23_2234");
msg.setCorrelationId(UUID.randomUUID().toString());
dataAccessor.setProperty(keyBuilder.message(manager.getInstanceName(),
msg.getMsgId()), msg);
+ msgList.add(msg);
+ }
+
+ changeContext.setChangeType(HelixConstants.ChangeType.MESSAGE);
+ executor.onMessage(manager.getInstanceName(), Collections.emptyList(),
changeContext);
+
+ for (Message message : msgList) {
+ message = dataAccessor
+ .getProperty(keyBuilder.message(manager.getInstanceName(),
message.getMsgId()));
+ Assert.assertNotNull(message);
+ Assert.assertEquals(message.getMsgState(), MessageState.NEW);
+ Assert.assertEquals(message.getRetryCount(), 0);
}
+
+ // Test with a factory that throws Exception on certain message. The
invalid message will be
+ // remain UNPROCESSABLE due to the Exception.
+ for (String type : factory.getMessageTypes()) {
+ executor.registerMessageHandlerFactory(type, factory);
+ }
+
Message exceptionMsg =
new Message(factory.getMessageTypes().get(0),
UUID.randomUUID().toString());
exceptionMsg.setTgtSessionId(manager.getSessionId());
@@ -593,16 +611,30 @@ public class TestHelixTaskExecutor {
changeContext.setChangeType(HelixConstants.ChangeType.MESSAGE);
executor.onMessage(manager.getInstanceName(), Collections.emptyList(),
changeContext);
- Thread.sleep(1000);
-
+ Assert.assertTrue(TestHelper.verify(() -> {
+ Message tmpExceptionMsg = dataAccessor
+ .getProperty(keyBuilder.message(manager.getInstanceName(),
exceptionMsg.getMsgId()));
+ if (tmpExceptionMsg == null || !tmpExceptionMsg.getMsgState()
+ .equals(MessageState.UNPROCESSABLE) ||
tmpExceptionMsg.getRetryCount() != -1) {
+ return false;
+ }
+ return true;
+ }, TestHelper.WAIT_DURATION),
+ "The exception message should be retied once and in UNPROCESSABLE
state.");
+
+ Assert.assertTrue(TestHelper.verify(() -> {
+ for (Message message : msgList) {
+ message = dataAccessor
+ .getProperty(keyBuilder.message(manager.getInstanceName(),
message.getMsgId()));
+ if (message != null) {
+ return false;
+ }
+ }
+ return true;
+ }, TestHelper.WAIT_DURATION), "The normal messages should be all processed
normally.");
Assert.assertEquals(factory._processedMsgIds.size(), nMsgs1);
Assert.assertEquals(factory._handlersCreated, nMsgs1);
- exceptionMsg = dataAccessor
- .getProperty(keyBuilder.message(manager.getInstanceName(),
exceptionMsg.getMsgId()));
- Assert.assertNotNull(exceptionMsg);
- Assert.assertEquals(exceptionMsg.getMsgState(),
MessageState.UNPROCESSABLE);
- Assert.assertEquals(exceptionMsg.getRetryCount(), -1);
System.out.println("END TestCMTaskExecutor.testCreateHandlerException()");
}
@@ -916,12 +948,16 @@ public class TestHelixTaskExecutor {
msg.setSrcName("127.101.1.23_2234");
msg.setCorrelationId(UUID.randomUUID().toString());
accessor.setProperty(keyBuilder.message(instanceName, msg.getId()), msg);
+
messageIds.add(msg.getId());
+ // Set for testing the update operation later
+ msg.setMsgState(MessageState.READ);
messages.add(msg);
}
Method updateMessageState = HelixTaskExecutor.class
- .getDeclaredMethod("updateMessageState", List.class,
HelixDataAccessor.class, String.class);
+ .getDeclaredMethod("updateMessageState", Collection.class,
HelixDataAccessor.class,
+ String.class);
updateMessageState.setAccessible(true);
updateMessageState.invoke(executor, messages, accessor, instanceName);
@@ -929,6 +965,11 @@ public class TestHelixTaskExecutor {
accessor.removeProperty(keyBuilder.message(instanceName,
messageIds.get(0)));
System.out.println(accessor.getChildNames(keyBuilder.messages(instanceName)).size());
+
+ for (Message message : messages) {
+ // Mock a change to ensure there will be some delta on the message node
after update
+ message.setCorrelationId(UUID.randomUUID().toString());
+ }
updateMessageState.invoke(executor, messages, accessor, instanceName);
Assert
.assertEquals(accessor.getChildNames(keyBuilder.messages(instanceName)).size(),
nMsgs1 - 1);