This is an automated email from the ASF dual-hosted git repository.
jiajunwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new f11396e Refine the message handler error handling logic to avoid
unnecessary retry. (#1500)
f11396e is described below
commit f11396e5feebe20552d259e553342c17a8573a8e
Author: Jiajun Wang <[email protected]>
AuthorDate: Thu Nov 5 09:18:57 2020 -0800
Refine the message handler error handling logic to avoid unnecessary retry.
(#1500)
This PR enhances the message event processing logic to prevent silent
failures and unnecessary retries.
1. If creating a message handler fails unexpectedly (that is, an Exception is
thrown), the message is marked as UNPROCESSABLE unless it was sent with a retry
count greater than 0. When a retry count is configured, the participant keeps
retrying on subsequent message callbacks until the message runs out of retries.
2. An UNPROCESSABLE message produced as described in point 1 is left in the
participant's message folder rather than removed automatically. This prevents
unnecessary retries.
3. If the message handler fails because the participant cannot schedule the
task, the message is discarded. If it is a state transition message, the
corresponding state model and the partition's current state are set to ERROR.
This also prevents unnecessary retries.
---
.../messaging/handling/HelixTaskExecutor.java | 230 +++++++++++++--------
.../messaging/handling/MessageHandlerFactory.java | 8 +
.../main/java/org/apache/helix/model/Message.java | 6 +-
.../messaging/handling/TestHelixTaskExecutor.java | 135 ++++++++----
4 files changed, 254 insertions(+), 125 deletions(-)
diff --git
a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
index b039afd..424d96d 100644
---
a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
+++
b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
@@ -613,7 +613,12 @@ public class HelixTaskExecutor implements MessageListener,
TaskExecutor {
MsgHandlerFactoryRegistryItem item = _hdlrFtyRegistry.get(msgType);
if (item.factory() != null) {
- item.factory().reset();
+ try {
+ item.factory().reset();
+ } catch (Exception ex) {
+ LOG.error("Failed to reset the factory {} of message type {}.",
item.factory().toString(),
+ msgType, ex);
+ }
}
}
// threads pool specific to STATE_TRANSITION.Key specific pool are not
shut down.
@@ -788,8 +793,8 @@ public class HelixTaskExecutor implements MessageListener,
TaskExecutor {
List<MessageHandler> nonStateTransitionHandlers = new ArrayList<>();
List<NotificationContext> nonStateTransitionContexts = new ArrayList<>();
- // message read
- List<Message> readMsgs = new ArrayList<>();
+ // message to be updated in ZK
+ List<Message> msgsToBeUpdated = new ArrayList<>();
String sessionId = manager.getSessionId();
List<String> curResourceNames =
@@ -804,40 +809,71 @@ public class HelixTaskExecutor implements
MessageListener, TaskExecutor {
// skip the following operations for the no-op messages.
continue;
}
- // create message handlers, if handlers not found, leave its state as NEW
+
NotificationContext msgWorkingContext = changeContext.clone();
+ MessageHandler msgHandler = null;
try {
- MessageHandler msgHandler = createMessageHandler(message,
msgWorkingContext);
- if (msgHandler == null) {
- // Failed to create message handler, skip processing this message in
this callback.
- // The same message process will be retried in the next round.
- continue;
- }
- if (message.getMsgType().equals(MessageType.STATE_TRANSITION.name())
|| message.getMsgType()
- .equals(MessageType.STATE_TRANSITION_CANCELLATION.name())) {
- if (validateAndProcessStateTransitionMessage(message, instanceName,
manager,
- stateTransitionHandlers, msgHandler)) {
- // Need future process by triggering state transition
- String msgTarget =
- getMessageTarget(message.getResourceName(),
message.getPartitionName());
- stateTransitionHandlers.put(msgTarget, msgHandler);
- stateTransitionContexts.put(msgTarget, msgWorkingContext);
- } else {
- // skip the following operations for the invalid/expired state
transition messages.
- continue;
- }
+ // create message handlers, if handlers not found but no exception,
leave its state as NEW
+ msgHandler = createMessageHandler(message, msgWorkingContext);
+ } catch (Exception ex) {
+ // Failed to create message handler and there is an Exception.
+ int remainingRetryCount = message.getRetryCount();
+ LOG.error(
+ "Exception happens when creating Message Handler for message {}.
Current remaining retry count is {}.",
+ message.getMsgId(), remainingRetryCount);
+ // Set the message retry count to avoid infinite retrying.
+ message.setRetryCount(remainingRetryCount - 1);
+ message.setExecuteSessionId(sessionId);
+ // continue processing in the next section where handler object is
double-checked.
+ }
+
+ if (msgHandler == null) {
+ // Note that we are re-using the retry count of Message that was
originally designed to control
+ // timeout retries. So it is not checked before the first try in order
to ensure consistent
+ // behavior. It is possible that we introduce a new behavior for this
method. But it requires
+ // us to split the configuration item so as to avoid confusion.
+ if (message.getRetryCount() < 0) {
+ // If no more retry count remains, then mark the message to be
UNPROCESSABLE.
+ String errorMsg = String
+ .format("No available message Handler found!"
+ + " Stop processing message %s since it has a negative
remaining retry count %d!",
+ message.getMsgId(), message.getRetryCount());
+ updateUnprocessableMessage(message, null, errorMsg, manager);
+ msgsToBeUpdated.add(message);
} else {
- // Need future process non state transition messages by triggering
the handler
- nonStateTransitionHandlers.add(msgHandler);
- nonStateTransitionContexts.add(msgWorkingContext);
+ // Skip processing this message in this callback. The same message
process will be retried
+ // in the next round.
+ LOG.warn("There is no existing handler for message {}."
+ + " Skip processing it for now. Will retry on the next
callback.",
+ message.getMsgId());
}
- } catch (Exception e) {
- handleUnprocessableMessage(message, e, e.getMessage(), accessor,
instanceName, manager);
continue;
}
- // Update the processed message objects
- readMsgs.add(markReadMessage(message, msgWorkingContext, manager));
+ if (message.getMsgType().equals(MessageType.STATE_TRANSITION.name()) ||
message.getMsgType()
+ .equals(MessageType.STATE_TRANSITION_CANCELLATION.name())) {
+ if (validateAndProcessStateTransitionMessage(message, manager,
stateTransitionHandlers,
+ msgHandler)) {
+ // Need future process by triggering state transition
+ String msgTarget =
+ getMessageTarget(message.getResourceName(),
message.getPartitionName());
+ stateTransitionHandlers.put(msgTarget, msgHandler);
+ stateTransitionContexts.put(msgTarget, msgWorkingContext);
+ } else {
+ // Skip the following operations for the invalid/expired state
transition messages.
+ // Also remove the message since it might block the other state
transition messages.
+ removeMessageFromZK(accessor, message, instanceName);
+ continue;
+ }
+ } else {
+ // Need future process non state transition messages by triggering the
handler
+ nonStateTransitionHandlers.add(msgHandler);
+ nonStateTransitionContexts.add(msgWorkingContext);
+ }
+
+ // Update the normally processed messages
+ msgsToBeUpdated.add(markReadMessage(message, msgWorkingContext,
manager));
+
// batch creation of all current state meta data
// do it for non-controller and state transition messages only
if (!message.isControlerMsg() && message.getMsgType()
@@ -869,25 +905,34 @@ public class HelixTaskExecutor implements
MessageListener, TaskExecutor {
try {
accessor.createChildren(createCurStateKeys, metaCurStates);
} catch (Exception e) {
- LOG.error("fail to create cur-state znodes for messages: " + readMsgs,
e);
+ LOG.error("fail to create cur-state znodes for messages: " +
msgsToBeUpdated, e);
}
}
- // update message state to READ in batch and schedule tasks for all read
messages
- if (readMsgs.size() > 0) {
- updateMessageState(readMsgs, accessor, instanceName);
+ // update message state in batch and schedule tasks for all read messages
+ updateMessageState(msgsToBeUpdated, accessor, instanceName);
- for (Map.Entry<String, MessageHandler> handlerEntry :
stateTransitionHandlers.entrySet()) {
- MessageHandler handler = handlerEntry.getValue();
- NotificationContext context =
stateTransitionContexts.get(handlerEntry.getKey());
- scheduleTaskForMessage(instanceName, accessor, handler, context);
+ for (Map.Entry<String, MessageHandler> handlerEntry :
stateTransitionHandlers.entrySet()) {
+ MessageHandler handler = handlerEntry.getValue();
+ NotificationContext context =
stateTransitionContexts.get(handlerEntry.getKey());
+ if (!scheduleTaskForMessage(instanceName, accessor, handler, context)) {
+ try {
+ // Record error state to the message handler.
+ handler.onError(new HelixException(String
+ .format("Failed to schedule the task for executing message
handler for %s.",
+ handler._message.getMsgId())),
MessageHandler.ErrorCode.ERROR,
+ MessageHandler.ErrorType.FRAMEWORK);
+ } catch (Exception ex) {
+ LOG.error("Failed to trigger onError method of the message handler
for {}",
+ handler._message.getMsgId(), ex);
+ }
}
+ }
- for (int i = 0; i < nonStateTransitionHandlers.size(); i++) {
- MessageHandler handler = nonStateTransitionHandlers.get(i);
- NotificationContext context = nonStateTransitionContexts.get(i);
- scheduleTaskForMessage(instanceName, accessor, handler, context);
- }
+ for (int i = 0; i < nonStateTransitionHandlers.size(); i++) {
+ MessageHandler handler = nonStateTransitionHandlers.get(i);
+ NotificationContext context = nonStateTransitionContexts.get(i);
+ scheduleTaskForMessage(instanceName, accessor, handler, context);
}
}
@@ -993,67 +1038,78 @@ public class HelixTaskExecutor implements
MessageListener, TaskExecutor {
* Preprocess the state transition message to validate if the request is
valid.
* If no operation needs to be triggered, discard the the message.
* @param message
- * @param instanceName
* @param manager
* @param stateTransitionHandlers
* @param createHandler
* @return True if the requested state transition is valid, and need to
schedule the transition.
* False if no more operation is required.
*/
- private boolean validateAndProcessStateTransitionMessage(Message message,
String instanceName,
- HelixManager manager, Map<String, MessageHandler>
stateTransitionHandlers,
- MessageHandler createHandler) {
- HelixDataAccessor accessor = manager.getHelixDataAccessor();
-
+ private boolean validateAndProcessStateTransitionMessage(Message message,
HelixManager manager,
+ Map<String, MessageHandler> stateTransitionHandlers, MessageHandler
createHandler) {
String messageTarget = getMessageTarget(message.getResourceName(),
message.getPartitionName());
- if (message.getMsgType().equals(MessageType.STATE_TRANSITION.name())
- && isStateTransitionInProgress(messageTarget)) {
- String taskId = _messageTaskMap.get(messageTarget);
- Message msg = _taskMap.get(taskId).getTask().getMessage();
- // If there is another state transition for same partition is going on,
- // discard the message. Controller will resend if this is a valid message
- String errMsg = String.format(
- "Another state transition for %s:%s is in progress with msg: %s,
p2p: %s, read: %d, current:%d. Discarding %s->%s message",
- message.getResourceName(), message.getPartitionName(),
msg.getMsgId(),
- msg.isRelayMessage(), msg.getReadTimeStamp(),
System.currentTimeMillis(),
- message.getFromState(), message.getToState());
- handleUnprocessableMessage(message, null /* exception */, errMsg,
accessor, instanceName,
- manager);
- return false;
- }
- if (createHandler instanceof HelixStateTransitionHandler) {
- // We only check to state if there is no ST task scheduled/executing.
- HelixStateTransitionHandler.StaleMessageValidateResult result =
- ((HelixStateTransitionHandler)
createHandler).staleMessageValidator();
- if (!result.isValid) {
- handleUnprocessableMessage(message, null /* exception */,
result.exception.getMessage(),
- accessor, instanceName, manager);
+
+ try {
+ if (message.getMsgType().equals(MessageType.STATE_TRANSITION.name())
+ && isStateTransitionInProgress(messageTarget)) {
+ String taskId = _messageTaskMap.get(messageTarget);
+ Message msg = _taskMap.get(taskId).getTask().getMessage();
+ // If there is another state transition for same partition is going on,
+ // discard the message. Controller will resend if this is a valid
message
+ String errMsg = String.format(
+ "Another state transition for %s:%s is in progress with msg: %s,
p2p: %s, read: %d, current:%d. Discarding %s->%s message",
+ message.getResourceName(), message.getPartitionName(),
msg.getMsgId(),
+ msg.isRelayMessage(), msg.getReadTimeStamp(),
System.currentTimeMillis(),
+ message.getFromState(), message.getToState());
+ updateUnprocessableMessage(message, null /* exception */, errMsg,
manager);
return false;
}
- }
- if (stateTransitionHandlers.containsKey(messageTarget)) {
- // If there are 2 messages in same batch about same partition's state
transition,
- // the later one is discarded
- Message duplicatedMessage =
stateTransitionHandlers.get(messageTarget)._message;
- String errMsg = String.format(
- "Duplicated state transition message: %s. Existing: %s->%s; New
(Discarded): %s->%s",
- message.getMsgId(), duplicatedMessage.getFromState(),
duplicatedMessage.getToState(),
- message.getFromState(), message.getToState());
- handleUnprocessableMessage(message, null /* exception */, errMsg,
accessor, instanceName,
+ if (createHandler instanceof HelixStateTransitionHandler) {
+ // We only check to state if there is no ST task scheduled/executing.
+ HelixStateTransitionHandler.StaleMessageValidateResult result =
+ ((HelixStateTransitionHandler)
createHandler).staleMessageValidator();
+ if (!result.isValid) {
+ updateUnprocessableMessage(message, null /* exception */,
result.exception.getMessage(),
+ manager);
+ return false;
+ }
+ }
+ if (stateTransitionHandlers.containsKey(messageTarget)) {
+ // If there are 2 messages in same batch about same partition's state
transition,
+ // the later one is discarded
+ Message duplicatedMessage =
stateTransitionHandlers.get(messageTarget)._message;
+ String errMsg = String.format(
+ "Duplicated state transition message: %s. Existing: %s->%s; New
(Discarded): %s->%s",
+ message.getMsgId(), duplicatedMessage.getFromState(),
duplicatedMessage.getToState(),
+ message.getFromState(), message.getToState());
+ updateUnprocessableMessage(message, null /* exception */, errMsg,
manager);
+ return false;
+ }
+ return true;
+ } catch (Exception ex) {
+ updateUnprocessableMessage(message, ex, "State transition validation
failed with Exception.",
manager);
return false;
}
- return true;
}
- private void scheduleTaskForMessage(String instanceName, HelixDataAccessor
accessor,
+ /**
+ * Schedule task to execute the message handler.
+ * @param instanceName
+ * @param accessor
+ * @param handler
+ * @param context
+ * @return True if schedule the task successfully. False otherwise.
+ */
+ private boolean scheduleTaskForMessage(String instanceName,
HelixDataAccessor accessor,
MessageHandler handler, NotificationContext context) {
Message msg = handler._message;
if (!scheduleTask(new HelixTask(msg, context, handler, this))) {
// Remove message if schedule tasks are failed.
removeMessageFromTaskAndFutureMap(msg);
removeMessageFromZK(accessor, msg, instanceName);
+ return false;
}
+ return true;
}
/**
@@ -1151,8 +1207,8 @@ public class HelixTaskExecutor implements
MessageListener, TaskExecutor {
return message;
}
- private void handleUnprocessableMessage(Message message, Exception
exception, String errorMsg,
- HelixDataAccessor accessor, String instanceName, HelixManager manager) {
+ private void updateUnprocessableMessage(Message message, Exception
exception, String errorMsg,
+ HelixManager manager) {
String error = "Message " + message.getMsgId() + " cannot be processed: "
+ message.getRecord();
if (exception != null) {
LOG.error(error, exception);
@@ -1162,9 +1218,7 @@ public class HelixTaskExecutor implements
MessageListener, TaskExecutor {
_statusUpdateUtil.logError(message, HelixStateMachineEngine.class,
errorMsg, manager);
}
message.setMsgState(MessageState.UNPROCESSABLE);
- removeMessageFromZK(accessor, message, instanceName);
- _monitor
- .reportProcessedMessage(message,
ParticipantMessageMonitor.ProcessedMessageState.DISCARDED);
+ _monitor.reportProcessedMessage(message, ProcessedMessageState.FAILED);
}
public MessageHandler createMessageHandler(Message message,
NotificationContext changeContext) {
diff --git
a/helix-core/src/main/java/org/apache/helix/messaging/handling/MessageHandlerFactory.java
b/helix-core/src/main/java/org/apache/helix/messaging/handling/MessageHandlerFactory.java
index ce8ffcf..acbeadc 100644
---
a/helix-core/src/main/java/org/apache/helix/messaging/handling/MessageHandlerFactory.java
+++
b/helix-core/src/main/java/org/apache/helix/messaging/handling/MessageHandlerFactory.java
@@ -24,6 +24,14 @@ import org.apache.helix.model.Message;
@Deprecated
public interface MessageHandlerFactory {
+
+ /**
+ * Create the message handler for processing the message task.
+ * @param message
+ * @param context
+ * @return message handler object.
+ * Or null if the message cannot be processed given the current
status.
+ */
MessageHandler createHandler(Message message, NotificationContext context);
String getMessageType();
diff --git a/helix-core/src/main/java/org/apache/helix/model/Message.java
b/helix-core/src/main/java/org/apache/helix/model/Message.java
index c81a494..b509506 100644
--- a/helix-core/src/main/java/org/apache/helix/model/Message.java
+++ b/helix-core/src/main/java/org/apache/helix/model/Message.java
@@ -597,7 +597,8 @@ public class Message extends HelixProperty {
}
/**
- * Set the number of times to retry message handling on timeouts
+ * Set the number of times to retry message handling on timeouts and other
unexpected conditions
+ * with Exceptions. For example, the participant failed to create message
handler.
* @param retryCount maximum number of retries
*/
public void setRetryCount(int retryCount) {
@@ -605,7 +606,8 @@ public class Message extends HelixProperty {
}
/**
- * Get the number of times to retry message handling on timeouts
+ * Get the number of times to retry message handling on timeouts and other
unexpected conditions
+ * with Exceptions. For example, the participant failed to create message
handler.
* @return maximum number of retries
*/
public int getRetryCount() {
diff --git
a/helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java
b/helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java
index 706284d..e285132 100644
---
a/helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java
+++
b/helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java
@@ -32,21 +32,36 @@ import java.util.concurrent.ExecutorService;
import com.google.common.collect.ImmutableList;
import org.apache.helix.HelixConstants;
import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixDefinedState;
import org.apache.helix.HelixException;
import org.apache.helix.HelixManager;
import org.apache.helix.NotificationContext;
import org.apache.helix.PropertyKey;
+import org.apache.helix.TestHelper;
import org.apache.helix.mock.MockManager;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.Message;
import org.apache.helix.model.Message.MessageState;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelFactory;
import org.testng.Assert;
import org.testng.AssertJUnit;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
public class TestHelixTaskExecutor {
- public static class MockClusterManager extends MockManager {
+ @BeforeClass
+ public void beforeClass() {
+ System.out.println("START " + TestHelper.getTestClassName());
+ }
+ @AfterClass
+ public void afterClass() {
+ System.out.println("End " + TestHelper.getTestClassName());
+ }
+
+ public static class MockClusterManager extends MockManager {
@Override
public String getSessionId() {
return "123";
@@ -60,7 +75,6 @@ public class TestHelixTaskExecutor {
class TestMessageHandler extends MessageHandler {
public TestMessageHandler(Message message, NotificationContext context) {
super(message, context);
- // TODO Auto-generated constructor stub
}
@Override
@@ -74,14 +88,12 @@ public class TestHelixTaskExecutor {
@Override
public void onError(Exception e, ErrorCode code, ErrorType type) {
- // TODO Auto-generated method stub
}
}
@Override
public MessageHandler createHandler(Message message, NotificationContext
context) {
- // TODO Auto-generated method stub
if (message.getMsgSubType() != null &&
message.getMsgSubType().equals("EXCEPTION")) {
throw new HelixException("Test Message handler exception, can ignore");
}
@@ -91,7 +103,6 @@ public class TestHelixTaskExecutor {
@Override
public String getMessageType() {
- // TODO Auto-generated method stub
return "TestingMessageHandler";
}
@@ -102,7 +113,6 @@ public class TestHelixTaskExecutor {
@Override
public void reset() {
- // TODO Auto-generated method stub
}
}
@@ -110,13 +120,11 @@ public class TestHelixTaskExecutor {
class TestMessageHandlerFactory2 extends TestMessageHandlerFactory {
@Override
public String getMessageType() {
- // TODO Auto-generated method stub
return "TestingMessageHandler2";
}
@Override
public List<String> getMessageTypes() {
- // TODO Auto-generated method stub
return ImmutableList.of("TestingMessageHandler2");
}
@@ -132,7 +140,6 @@ public class TestHelixTaskExecutor {
class CancellableHandler extends MessageHandler {
public CancellableHandler(Message message, NotificationContext context) {
super(message, context);
- // TODO Auto-generated constructor stub
}
public boolean _interrupted = false;
@@ -168,21 +175,18 @@ public class TestHelixTaskExecutor {
@Override
public void onError(Exception e, ErrorCode code, ErrorType type) {
- // TODO Auto-generated method stub
_message.getRecord().setSimpleField("exception", e.getMessage());
}
}
@Override
public MessageHandler createHandler(Message message, NotificationContext
context) {
- // TODO Auto-generated method stub
_handlersCreated++;
return new CancellableHandler(message, context);
}
@Override
public String getMessageType() {
- // TODO Auto-generated method stub
return "Cancellable";
}
@@ -192,7 +196,6 @@ public class TestHelixTaskExecutor {
@Override
public void reset() {
- // TODO Auto-generated method stub
_handlersCreated = 0;
_processedMsgIds.clear();
_processingMsgIds.clear();
@@ -217,7 +220,11 @@ public class TestHelixTaskExecutor {
public TestStateTransitionMessageHandler(Message message,
NotificationContext context,
CurrentState currentStateDelta) {
- super(null, null, message, context, currentStateDelta);
+ super(new StateModelFactory<StateModel>() {
+ // Empty no-op state model factory is good enough for the test.
+ }, new StateModel() {
+ // Empty no-op state model is good enough for the test.
+ }, message, context, currentStateDelta);
}
@Override
@@ -237,10 +244,6 @@ public class TestHelixTaskExecutor {
}
@Override
- public void onError(Exception e, ErrorCode code, ErrorType type) {
- }
-
- @Override
public StaleMessageValidateResult staleMessageValidator() {
return super.staleMessageValidator();
}
@@ -452,6 +455,7 @@ public class TestHelixTaskExecutor {
@Test()
public void testUnknownTypeMsgExecution() throws InterruptedException {
+ System.out.println("START " + TestHelper.getTestMethodName());
HelixTaskExecutor executor = new HelixTaskExecutor();
HelixManager manager = new MockClusterManager();
@@ -497,10 +501,12 @@ public class TestHelixTaskExecutor {
AssertJUnit.assertTrue(factory._processedMsgIds.containsKey(message.getId()));
}
}
+ System.out.println("END " + TestHelper.getTestMethodName());
}
@Test()
public void testMsgSessionId() throws InterruptedException {
+ System.out.println("START " + TestHelper.getTestMethodName());
HelixTaskExecutor executor = new HelixTaskExecutor();
HelixManager manager = new MockClusterManager();
@@ -547,6 +553,7 @@ public class TestHelixTaskExecutor {
AssertJUnit.assertTrue(factory._processedMsgIds.containsKey(message.getId()));
}
}
+ System.out.println("END " + TestHelper.getTestMethodName());
}
@Test()
@@ -554,6 +561,8 @@ public class TestHelixTaskExecutor {
System.out.println("START
TestCMTaskExecutor.testCreateHandlerException()");
HelixTaskExecutor executor = new HelixTaskExecutor();
HelixManager manager = new MockClusterManager();
+ HelixDataAccessor dataAccessor = manager.getHelixDataAccessor();
+ PropertyKey.Builder keyBuilder = dataAccessor.keyBuilder();
TestMessageHandlerFactory factory = new TestMessageHandlerFactory();
for (String type : factory.getMessageTypes()) {
@@ -561,7 +570,6 @@ public class TestHelixTaskExecutor {
}
NotificationContext changeContext = new NotificationContext(manager);
- List<Message> msgList = new ArrayList<Message>();
int nMsgs1 = 5;
for (int i = 0; i < nMsgs1; i++) {
@@ -570,30 +578,37 @@ public class TestHelixTaskExecutor {
msg.setTgtName("Localhost_1123");
msg.setSrcName("127.101.1.23_2234");
msg.setCorrelationId(UUID.randomUUID().toString());
- msgList.add(msg);
+ dataAccessor.setProperty(keyBuilder.message(manager.getInstanceName(),
msg.getMsgId()), msg);
}
- Message exceptionMsg = new Message(factory.getMessageTypes().get(0),
UUID.randomUUID().toString());
+ Message exceptionMsg =
+ new Message(factory.getMessageTypes().get(0),
UUID.randomUUID().toString());
exceptionMsg.setTgtSessionId(manager.getSessionId());
exceptionMsg.setMsgSubType("EXCEPTION");
exceptionMsg.setTgtName("Localhost_1123");
exceptionMsg.setSrcName("127.101.1.23_2234");
exceptionMsg.setCorrelationId(UUID.randomUUID().toString());
- msgList.add(exceptionMsg);
+ dataAccessor.setProperty(keyBuilder.message(manager.getInstanceName(),
exceptionMsg.getMsgId()),
+ exceptionMsg);
changeContext.setChangeType(HelixConstants.ChangeType.MESSAGE);
- executor.onMessage("someInstance", msgList, changeContext);
+ executor.onMessage(manager.getInstanceName(), Collections.emptyList(),
changeContext);
Thread.sleep(1000);
- AssertJUnit.assertTrue(factory._processedMsgIds.size() == nMsgs1);
- AssertJUnit.assertTrue(factory._handlersCreated == nMsgs1);
+ Assert.assertEquals(factory._processedMsgIds.size(), nMsgs1);
+ Assert.assertEquals(factory._handlersCreated, nMsgs1);
- AssertJUnit.assertTrue(exceptionMsg.getMsgState() ==
MessageState.UNPROCESSABLE);
+ exceptionMsg = dataAccessor
+ .getProperty(keyBuilder.message(manager.getInstanceName(),
exceptionMsg.getMsgId()));
+ Assert.assertNotNull(exceptionMsg);
+ Assert.assertEquals(exceptionMsg.getMsgState(),
MessageState.UNPROCESSABLE);
+ Assert.assertEquals(exceptionMsg.getRetryCount(), -1);
System.out.println("END TestCMTaskExecutor.testCreateHandlerException()");
}
@Test()
public void testTaskCancellation() throws InterruptedException {
+ System.out.println("START " + TestHelper.getTestMethodName());
HelixTaskExecutor executor = new HelixTaskExecutor();
HelixManager manager = new MockClusterManager();
@@ -643,6 +658,7 @@ public class TestHelixTaskExecutor {
AssertJUnit.assertTrue(factory._processingMsgIds.containsKey(message.getId()));
}
}
+ System.out.println("END " + TestHelper.getTestMethodName());
}
@Test()
@@ -709,8 +725,7 @@ public class TestHelixTaskExecutor {
@Test()
public void testNoRetry() throws InterruptedException {
- // String p = "test_";
- // System.out.println(p.substring(p.lastIndexOf('_')+1));
+ System.out.println("START " + TestHelper.getTestMethodName());
HelixTaskExecutor executor = new HelixTaskExecutor();
HelixManager manager = new MockClusterManager();
@@ -746,14 +761,12 @@ public class TestHelixTaskExecutor {
AssertJUnit.assertTrue(factory._timedOutMsgIds.containsKey(msgList.get(i).getId()));
}
}
+ System.out.println("END " + TestHelper.getTestMethodName());
}
@Test()
public void testRetryOnce() throws InterruptedException {
- // Logger.getRootLogger().setLevel(Level.INFO);
-
- // String p = "test_";
- // System.out.println(p.substring(p.lastIndexOf('_')+1));
+ System.out.println("START " + TestHelper.getTestMethodName());
HelixTaskExecutor executor = new HelixTaskExecutor();
HelixManager manager = new MockClusterManager();
@@ -765,8 +778,6 @@ public class TestHelixTaskExecutor {
List<Message> msgList = new ArrayList<Message>();
- // factory.reset();
- // msgList.clear();
// Test the case that the message are executed for the second time
int nMsgs2 = 4;
for (int i = 0; i < nMsgs2; i++) {
@@ -786,11 +797,12 @@ public class TestHelixTaskExecutor {
AssertJUnit.assertTrue(msgList.get(1).getRecord().getSimpleField("Cancelcount").equals("1"));
AssertJUnit.assertEquals(factory._timedOutMsgIds.size(), 2);
AssertJUnit.assertTrue(executor._taskMap.size() == 0);
-
+ System.out.println("END " + TestHelper.getTestMethodName());
}
@Test
public void testStateTransitionCancellationMsg() throws InterruptedException
{
+ System.out.println("START " + TestHelper.getTestMethodName());
HelixTaskExecutor executor = new HelixTaskExecutor();
HelixManager manager = new MockClusterManager();
@@ -830,10 +842,12 @@ public class TestHelixTaskExecutor {
Thread.sleep(3000);
AssertJUnit.assertEquals(cancelFactory._processedMsgIds.size(), 0);
AssertJUnit.assertEquals(stateTransitionFactory._processedMsgIds.size(),
0);
+ System.out.println("END " + TestHelper.getTestMethodName());
}
@Test
public void testMessageReadOptimization() throws InterruptedException {
+ System.out.println("START " + TestHelper.getTestMethodName());
HelixTaskExecutor executor = new HelixTaskExecutor();
HelixManager manager = new MockClusterManager();
@@ -873,11 +887,13 @@ public class TestHelixTaskExecutor {
AssertJUnit.assertEquals(nMsgs1, factory._processedMsgIds.size());
// After all messages are processed, _knownMessageIds should be empty.
Assert.assertTrue(executor._knownMessageIds.isEmpty());
+ System.out.println("END " + TestHelper.getTestMethodName());
}
@Test
public void testNoWriteReadStateForRemovedMessage()
throws NoSuchMethodException, InvocationTargetException,
IllegalAccessException {
+ System.out.println("START " + TestHelper.getTestMethodName());
HelixTaskExecutor executor = new HelixTaskExecutor();
HelixManager manager = new MockClusterManager();
TestMessageHandlerFactory factory = new TestMessageHandlerFactory();
@@ -916,5 +932,54 @@ public class TestHelixTaskExecutor {
updateMessageState.invoke(executor, messages, accessor, instanceName);
Assert
.assertEquals(accessor.getChildNames(keyBuilder.messages(instanceName)).size(),
nMsgs1 - 1);
+ System.out.println("END " + TestHelper.getTestMethodName());
+ }
+
+ @Test(dependsOnMethods = "testStateTransitionCancellationMsg")
+ public void testStateTransitionMsgScheduleFailure() {
+ System.out.println("START " + TestHelper.getTestMethodName());
+
+ // Create a mock executor that fails the task scheduling.
+ HelixTaskExecutor executor = new HelixTaskExecutor() {
+ @Override
+ public boolean scheduleTask(MessageTask task) {
+ return false;
+ }
+ };
+ HelixManager manager = new MockClusterManager();
+ HelixDataAccessor dataAccessor = manager.getHelixDataAccessor();
+ PropertyKey.Builder keyBuilder = dataAccessor.keyBuilder();
+
+ TestStateTransitionHandlerFactory stateTransitionFactory =
+ new
TestStateTransitionHandlerFactory(Message.MessageType.STATE_TRANSITION.name());
+
executor.registerMessageHandlerFactory(Message.MessageType.STATE_TRANSITION.name(),
+ stateTransitionFactory);
+
+ NotificationContext changeContext = new NotificationContext(manager);
+
+ Message msg = new Message(Message.MessageType.STATE_TRANSITION,
UUID.randomUUID().toString());
+ msg.setTgtSessionId(manager.getSessionId());
+ msg.setPartitionName("P1");
+ msg.setResourceName("R1");
+ msg.setTgtName("Localhost_1123");
+ msg.setSrcName("127.101.1.23_2234");
+ msg.setFromState("SLAVE");
+ msg.setToState("MASTER");
+ dataAccessor.setProperty(keyBuilder.message(manager.getInstanceName(),
msg.getMsgId()), msg);
+
+ changeContext.setChangeType(HelixConstants.ChangeType.MESSAGE);
+ executor.onMessage(manager.getInstanceName(), Collections.emptyList(),
changeContext);
+
+ Assert.assertEquals(stateTransitionFactory._processedMsgIds.size(), 0);
+ // Message should have been removed
+ Assert.assertNull(
+ dataAccessor.getProperty(keyBuilder.message(manager.getInstanceName(),
msg.getMsgId())));
+ // Current state would be ERROR due to the failure of task scheduling.
+ CurrentState currentState = dataAccessor.getProperty(keyBuilder
+ .currentState(manager.getInstanceName(), manager.getSessionId(),
msg.getResourceName()));
+ Assert.assertNotNull(currentState);
+ Assert.assertEquals(currentState.getState(msg.getPartitionName()),
+ HelixDefinedState.ERROR.toString());
+ System.out.println("END " + TestHelper.getTestMethodName());
}
}