iamzhoug37 closed pull request #280: [ISSUE #249] master-slave sync model
performance improve
URL: https://github.com/apache/rocketmq/pull/280
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java
b/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java
index f6f8a80af..893f7fe62 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java
@@ -27,7 +27,7 @@
import org.apache.rocketmq.store.MessageExtBrokerInner;
import org.apache.rocketmq.store.MessageFilter;
import org.apache.rocketmq.store.MessageStore;
-import org.apache.rocketmq.store.PutMessageResult;
+import org.apache.rocketmq.store.PutMessageCallback;
import org.apache.rocketmq.store.QueryMessageResult;
import org.apache.rocketmq.store.SelectMappedBufferResult;
@@ -81,8 +81,8 @@ public void destroy() {
}
@Override
- public PutMessageResult putMessage(MessageExtBrokerInner msg) {
- return next.putMessage(msg);
+ public void putMessage(MessageExtBrokerInner msg , PutMessageCallback
putMessageCallback) {
+ next.putMessage(msg , putMessageCallback);
}
@Override
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index d69a78700..543a450af 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -106,6 +106,7 @@
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.protocol.LanguageCode;
+import org.apache.rocketmq.remoting.protocol.RemoteCommandResponseCallback;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.apache.rocketmq.store.ConsumeQueue;
@@ -207,6 +208,12 @@ public RemotingCommand
processRequest(ChannelHandlerContext ctx,
return null;
}
+ @Override
+ public void asyncProcessRequest(ChannelHandlerContext ctx, RemotingCommand
request, RemoteCommandResponseCallback remoteCommandResponseCallback) throws
Exception {
+ RemotingCommand remotingCommand = processRequest(ctx , request) ;
+ remoteCommandResponseCallback.callback(remotingCommand);
+ }
+
@Override
public boolean rejectRequest() {
return false;
@@ -1356,4 +1363,4 @@ private RemotingCommand
queryConsumeQueue(ChannelHandlerContext ctx,
return response;
}
-}
+}
\ No newline at end of file
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/processor/ClientManageProcessor.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/ClientManageProcessor.java
index 67807a863..15b6f47a6 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/processor/ClientManageProcessor.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/processor/ClientManageProcessor.java
@@ -38,6 +38,7 @@
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
+import org.apache.rocketmq.remoting.protocol.RemoteCommandResponseCallback;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -66,6 +67,12 @@ public RemotingCommand processRequest(ChannelHandlerContext
ctx, RemotingCommand
return null;
}
+ @Override
+ public void asyncProcessRequest(ChannelHandlerContext ctx, RemotingCommand
request, RemoteCommandResponseCallback remoteCommandResponseCallback) throws
Exception {
+ RemotingCommand remotingCommand = processRequest(ctx , request) ;
+ remoteCommandResponseCallback.callback(remotingCommand);
+ }
+
@Override
public boolean rejectRequest() {
return false;
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java
index bb427050d..e0fcdb7bd 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java
@@ -33,6 +33,7 @@
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
+import org.apache.rocketmq.remoting.protocol.RemoteCommandResponseCallback;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -62,6 +63,12 @@ public RemotingCommand processRequest(ChannelHandlerContext
ctx, RemotingCommand
return null;
}
+ @Override
+ public void asyncProcessRequest(ChannelHandlerContext ctx, RemotingCommand
request, RemoteCommandResponseCallback remoteCommandResponseCallback) throws
Exception {
+ RemotingCommand remotingCommand = processRequest(ctx , request) ;
+ remoteCommandResponseCallback.callback(remotingCommand);
+ }
+
@Override
public boolean rejectRequest() {
return false;
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/processor/EndTransactionProcessor.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/EndTransactionProcessor.java
index fee1420a9..9a8034352 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/processor/EndTransactionProcessor.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/processor/EndTransactionProcessor.java
@@ -30,9 +30,11 @@
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
+import org.apache.rocketmq.remoting.protocol.RemoteCommandResponseCallback;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.store.MessageExtBrokerInner;
import org.apache.rocketmq.store.MessageStore;
+import org.apache.rocketmq.store.PutMessageCallback;
import org.apache.rocketmq.store.PutMessageResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -144,7 +146,15 @@ public RemotingCommand
processRequest(ChannelHandlerContext ctx,
}
final MessageStore messageStore =
this.brokerController.getMessageStore();
- final PutMessageResult putMessageResult =
messageStore.putMessage(msgInner);
+ PutMessageCallback putMessageCallback = new PutMessageCallback() ;
+ messageStore.putMessage(msgInner , putMessageCallback);
+ try {
+ putMessageCallback.waitComplete();
+ }
+ catch (InterruptedException e) {
+ //ignore
+ }
+ final PutMessageResult putMessageResult =
putMessageCallback.getPutMessageResult() ;
if (putMessageResult != null) {
switch (putMessageResult.getPutMessageStatus()) {
// Success
@@ -197,6 +207,12 @@ public RemotingCommand
processRequest(ChannelHandlerContext ctx,
return response;
}
+ @Override
+ public void asyncProcessRequest(ChannelHandlerContext ctx, RemotingCommand
request, RemoteCommandResponseCallback remoteCommandResponseCallback) throws
Exception {
+ RemotingCommand remotingCommand = processRequest(ctx , request) ;
+ remoteCommandResponseCallback.callback(remotingCommand);
+ }
+
@Override
public boolean rejectRequest() {
return false;
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/processor/ForwardRequestProcessor.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/ForwardRequestProcessor.java
index 199aa940d..f58ad0c18 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/processor/ForwardRequestProcessor.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/processor/ForwardRequestProcessor.java
@@ -20,6 +20,7 @@
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
+import org.apache.rocketmq.remoting.protocol.RemoteCommandResponseCallback;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,6 +39,12 @@ public RemotingCommand processRequest(ChannelHandlerContext
ctx, RemotingCommand
return null;
}
+ @Override
+ public void asyncProcessRequest(ChannelHandlerContext ctx, RemotingCommand
request, RemoteCommandResponseCallback remoteCommandResponseCallback) throws
Exception {
+ RemotingCommand remotingCommand = processRequest(ctx , request) ;
+ remoteCommandResponseCallback.callback(remotingCommand);
+ }
+
@Override
public boolean rejectRequest() {
return false;
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
index a46cbff2e..a709f2d01 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
@@ -56,11 +56,12 @@
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.netty.RequestTask;
+import org.apache.rocketmq.remoting.protocol.RemoteCommandResponseCallback;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.store.GetMessageResult;
import org.apache.rocketmq.store.MessageExtBrokerInner;
import org.apache.rocketmq.store.MessageFilter;
-import org.apache.rocketmq.store.PutMessageResult;
+import org.apache.rocketmq.store.PutMessageCallback;
import org.apache.rocketmq.store.config.BrokerRole;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
import org.slf4j.Logger;
@@ -81,6 +82,12 @@ public RemotingCommand processRequest(final
ChannelHandlerContext ctx,
return this.processRequest(ctx.channel(), request, true);
}
+ @Override
+ public void asyncProcessRequest(ChannelHandlerContext ctx, RemotingCommand
request, RemoteCommandResponseCallback remoteCommandResponseCallback) throws
Exception {
+ RemotingCommand remotingCommand = processRequest(ctx , request) ;
+ remoteCommandResponseCallback.callback(remotingCommand);
+ }
+
@Override
public boolean rejectRequest() {
return false;
@@ -524,7 +531,7 @@ private void generateOffsetMovedEvent(final
OffsetMovedEvent event) {
msgInner.setReconsumeTimes(0);
- PutMessageResult putMessageResult =
this.brokerController.getMessageStore().putMessage(msgInner);
+ this.brokerController.getMessageStore().putMessage(msgInner , new
PutMessageCallback());
} catch (Exception e) {
log.warn(String.format("generateOffsetMovedEvent Exception, %s",
event.toString()), e);
}
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryMessageProcessor.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryMessageProcessor.java
index e8f97d0af..ae204be28 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryMessageProcessor.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryMessageProcessor.java
@@ -32,6 +32,7 @@
import org.apache.rocketmq.common.protocol.header.ViewMessageRequestHeader;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
+import org.apache.rocketmq.remoting.protocol.RemoteCommandResponseCallback;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.store.QueryMessageResult;
import org.apache.rocketmq.store.SelectMappedBufferResult;
@@ -62,6 +63,12 @@ public RemotingCommand processRequest(ChannelHandlerContext
ctx, RemotingCommand
return null;
}
+ @Override
+ public void asyncProcessRequest(ChannelHandlerContext ctx, RemotingCommand
request, RemoteCommandResponseCallback remoteCommandResponseCallback) throws
Exception {
+ RemotingCommand remotingCommand = processRequest(ctx , request) ;
+ remoteCommandResponseCallback.callback(remotingCommand);
+ }
+
@Override
public boolean rejectRequest() {
return false;
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageCallback.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageCallback.java
new file mode 100644
index 000000000..787e28221
--- /dev/null
+++
b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageCallback.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.broker.processor;
+
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+
+/**
+ * SendMessageCallback is a asynchronous callback for processor process the
response of the produce request
+ */
+public interface SendMessageCallback {
+ void callback(RemotingCommand response);
+}
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
index 227a23e6b..cb963871a 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
@@ -45,8 +45,10 @@
import org.apache.rocketmq.common.sysflag.TopicSysFlag;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
+import org.apache.rocketmq.remoting.protocol.RemoteCommandResponseCallback;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.store.MessageExtBrokerInner;
+import org.apache.rocketmq.store.PutMessageCallback;
import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.config.StorePathConfigHelper;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
@@ -60,31 +62,43 @@ public SendMessageProcessor(final BrokerController
brokerController) {
}
@Override
- public RemotingCommand processRequest(ChannelHandlerContext ctx,
- RemotingCommand request) throws RemotingCommandException {
- SendMessageContext mqtraceContext;
+ public void asyncProcessRequest(ChannelHandlerContext ctx, RemotingCommand
request, final RemoteCommandResponseCallback remoteCommandResponseCallback)
throws Exception {
+
+ final SendMessageContext mqtraceContext;
switch (request.getCode()) {
case RequestCode.CONSUMER_SEND_MSG_BACK:
- return this.consumerSendMsgBack(ctx, request);
+ this.consumerSendMsgBack(ctx, request ,
remoteCommandResponseCallback);
+ return ;
default:
SendMessageRequestHeader requestHeader =
parseRequestHeader(request);
if (requestHeader == null) {
- return null;
+ remoteCommandResponseCallback.callback(null);
+ return;
}
mqtraceContext = buildMsgContext(ctx, requestHeader);
this.executeSendMessageHookBefore(ctx, request,
mqtraceContext);
- RemotingCommand response;
+ SendMessageCallback sendMessageCallback = new
SendMessageCallback() {
+ @Override
+ public void callback(RemotingCommand response) {
+ executeSendMessageHookAfter(response, mqtraceContext);
+ remoteCommandResponseCallback.callback(response);
+ }
+ } ;
if (requestHeader.isBatch()) {
- response = this.sendBatchMessage(ctx, request,
mqtraceContext, requestHeader);
+ this.sendBatchMessage(ctx, request, mqtraceContext,
requestHeader , sendMessageCallback);
} else {
- response = this.sendMessage(ctx, request, mqtraceContext,
requestHeader);
+ this.sendMessage(ctx, request, mqtraceContext,
requestHeader , sendMessageCallback);
}
-
- this.executeSendMessageHookAfter(response, mqtraceContext);
- return response;
}
+
+ }
+
+ @Override
+ public RemotingCommand processRequest(ChannelHandlerContext ctx,
+ RemotingCommand request) throws RemotingCommandException {
+ throw new RemotingCommandException("sendMessageProcessor not support
processRequest" , new UnsupportedOperationException()) ;
}
@Override
@@ -93,7 +107,7 @@ public boolean rejectRequest() {
this.brokerController.getMessageStore().isTransientStorePoolDeficient();
}
- private RemotingCommand consumerSendMsgBack(final ChannelHandlerContext
ctx, final RemotingCommand request)
+ private void consumerSendMsgBack(final ChannelHandlerContext ctx, final
RemotingCommand request , final RemoteCommandResponseCallback
remoteCommandResponseCallback)
throws RemotingCommandException {
final RemotingCommand response =
RemotingCommand.createResponseCommand(null);
final ConsumerSendMsgBackRequestHeader requestHeader =
@@ -117,19 +131,22 @@ private RemotingCommand consumerSendMsgBack(final
ChannelHandlerContext ctx, fin
response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
response.setRemark("subscription group not exist, " +
requestHeader.getGroup() + " "
+ FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST));
- return response;
+ remoteCommandResponseCallback.callback(response);
+ return ;
}
if
(!PermName.isWriteable(this.brokerController.getBrokerConfig().getBrokerPermission()))
{
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark("the broker[" +
this.brokerController.getBrokerConfig().getBrokerIP1() + "] sending message is
forbidden");
- return response;
+ remoteCommandResponseCallback.callback(response);
+ return ;
}
if (subscriptionGroupConfig.getRetryQueueNums() <= 0) {
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
- return response;
+ remoteCommandResponseCallback.callback(response);
+ return ;
}
String newTopic = MixAll.getRetryTopic(requestHeader.getGroup());
@@ -147,20 +164,23 @@ private RemotingCommand consumerSendMsgBack(final
ChannelHandlerContext ctx, fin
if (null == topicConfig) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("topic[" + newTopic + "] not exist");
- return response;
+ remoteCommandResponseCallback.callback(response);
+ return ;
}
if (!PermName.isWriteable(topicConfig.getPerm())) {
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark(String.format("the topic[%s] sending message is
forbidden", newTopic));
- return response;
+ remoteCommandResponseCallback.callback(response);
+ return ;
}
- MessageExt msgExt =
this.brokerController.getMessageStore().lookMessageByOffset(requestHeader.getOffset());
+ final MessageExt msgExt =
this.brokerController.getMessageStore().lookMessageByOffset(requestHeader.getOffset());
if (null == msgExt) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("look message by offset failed, " +
requestHeader.getOffset());
- return response;
+ remoteCommandResponseCallback.callback(response);
+ return ;
}
final String retryTopic =
msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
@@ -188,7 +208,8 @@ private RemotingCommand consumerSendMsgBack(final
ChannelHandlerContext ctx, fin
if (null == topicConfig) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("topic[" + newTopic + "] not exist");
- return response;
+ remoteCommandResponseCallback.callback(response);
+ return ;
}
} else {
if (0 == delayLevel) {
@@ -216,34 +237,40 @@ private RemotingCommand consumerSendMsgBack(final
ChannelHandlerContext ctx, fin
String originMsgId = MessageAccessor.getOriginMessageId(msgExt);
MessageAccessor.setOriginMessageId(msgInner,
UtilAll.isBlank(originMsgId) ? msgExt.getMsgId() : originMsgId);
- PutMessageResult putMessageResult =
this.brokerController.getMessageStore().putMessage(msgInner);
- if (putMessageResult != null) {
- switch (putMessageResult.getPutMessageStatus()) {
- case PUT_OK:
- String backTopic = msgExt.getTopic();
- String correctTopic =
msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
- if (correctTopic != null) {
- backTopic = correctTopic;
+ this.brokerController.getMessageStore().putMessage(msgInner, new
PutMessageCallback() {
+ @Override
+ public void doAction(PutMessageResult putMessageResult) {
+ if (putMessageResult != null) {
+ switch (putMessageResult.getPutMessageStatus()) {
+ case PUT_OK:
+ String backTopic = msgExt.getTopic();
+ String correctTopic =
msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
+ if (correctTopic != null) {
+ backTopic = correctTopic;
+ }
+
+
brokerController.getBrokerStatsManager().incSendBackNums(requestHeader.getGroup(),
backTopic);
+
+ response.setCode(ResponseCode.SUCCESS);
+ response.setRemark(null);
+
+ remoteCommandResponseCallback.callback(response);
+ return ;
+ default:
+ break;
}
-
this.brokerController.getBrokerStatsManager().incSendBackNums(requestHeader.getGroup(),
backTopic);
-
- response.setCode(ResponseCode.SUCCESS);
- response.setRemark(null);
+ response.setCode(ResponseCode.SYSTEM_ERROR);
+
response.setRemark(putMessageResult.getPutMessageStatus().name());
+ remoteCommandResponseCallback.callback(response);
+ return ;
+ }
- return response;
- default:
- break;
+ response.setCode(ResponseCode.SYSTEM_ERROR);
+ response.setRemark("putMessageResult is null");
+ remoteCommandResponseCallback.callback(response);
}
-
- response.setCode(ResponseCode.SYSTEM_ERROR);
- response.setRemark(putMessageResult.getPutMessageStatus().name());
- return response;
- }
-
- response.setCode(ResponseCode.SYSTEM_ERROR);
- response.setRemark("putMessageResult is null");
- return response;
+ });
}
private boolean handleRetryAndDLQ(SendMessageRequestHeader requestHeader,
RemotingCommand response,
@@ -290,10 +317,11 @@ private boolean
handleRetryAndDLQ(SendMessageRequestHeader requestHeader, Remoti
return true;
}
- private RemotingCommand sendMessage(final ChannelHandlerContext ctx,
+ private void sendMessage(final ChannelHandlerContext ctx,
final RemotingCommand request,
final SendMessageContext sendMessageContext,
- final SendMessageRequestHeader requestHeader) throws
RemotingCommandException {
+ final SendMessageRequestHeader requestHeader,
+ final SendMessageCallback sendMessageCallback) throws
RemotingCommandException {
final RemotingCommand response =
RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
final SendMessageResponseHeader responseHeader =
(SendMessageResponseHeader) response.readCustomHeader();
@@ -309,13 +337,15 @@ private RemotingCommand sendMessage(final
ChannelHandlerContext ctx,
if (this.brokerController.getMessageStore().now() < startTimstamp) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark(String.format("broker unable to service, until
%s", UtilAll.timeMillisToHumanString2(startTimstamp)));
- return response;
+ sendMessageCallback.callback(response);
+ return ;
}
response.setCode(-1);
super.msgCheck(ctx, requestHeader, response);
if (response.getCode() != -1) {
- return response;
+ sendMessageCallback.callback(response);
+ return ;
}
final byte[] body = request.getBody();
@@ -327,12 +357,13 @@ private RemotingCommand sendMessage(final
ChannelHandlerContext ctx,
queueIdInt = Math.abs(this.random.nextInt() % 99999999) %
topicConfig.getWriteQueueNums();
}
- MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
+ final MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
msgInner.setTopic(requestHeader.getTopic());
msgInner.setQueueId(queueIdInt);
if (!handleRetryAndDLQ(requestHeader, response, request, msgInner,
topicConfig)) {
- return response;
+ sendMessageCallback.callback(response);
+ return ;
}
msgInner.setBody(body);
@@ -350,14 +381,20 @@ private RemotingCommand sendMessage(final
ChannelHandlerContext ctx,
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark(
"the broker[" +
this.brokerController.getBrokerConfig().getBrokerIP1() + "] sending transaction
message is forbidden");
- return response;
+ sendMessageCallback.callback(response);
+ return ;
}
}
- PutMessageResult putMessageResult =
this.brokerController.getMessageStore().putMessage(msgInner);
-
- return handlePutMessageResult(putMessageResult, response, request,
msgInner, responseHeader, sendMessageContext, ctx, queueIdInt);
-
+ final int innerQueueIdInt = queueIdInt ;
+ PutMessageCallback putMessageCallback = new PutMessageCallback() {
+ @Override
+ public void doAction(PutMessageResult putMessageResult) {
+ RemotingCommand remotingCommand =
handlePutMessageResult(putMessageResult, response, request, msgInner,
responseHeader, sendMessageContext, ctx, innerQueueIdInt);
+ sendMessageCallback.callback(remotingCommand);
+ }
+ } ;
+ this.brokerController.getMessageStore().putMessage(msgInner ,
putMessageCallback);
}
private RemotingCommand handlePutMessageResult(PutMessageResult
putMessageResult, RemotingCommand response,
@@ -465,10 +502,11 @@ private RemotingCommand
handlePutMessageResult(PutMessageResult putMessageResult
return response;
}
- private RemotingCommand sendBatchMessage(final ChannelHandlerContext ctx,
+ private void sendBatchMessage(final ChannelHandlerContext ctx,
final RemotingCommand request,
final SendMessageContext sendMessageContext,
- final SendMessageRequestHeader requestHeader) throws
RemotingCommandException {
+ final SendMessageRequestHeader requestHeader ,
+ final SendMessageCallback sendMessageCallback) throws
RemotingCommandException {
final RemotingCommand response =
RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
final SendMessageResponseHeader responseHeader =
(SendMessageResponseHeader) response.readCustomHeader();
@@ -484,13 +522,15 @@ private RemotingCommand sendBatchMessage(final
ChannelHandlerContext ctx,
if (this.brokerController.getMessageStore().now() < startTimstamp) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark(String.format("broker unable to service, until
%s", UtilAll.timeMillisToHumanString2(startTimstamp)));
- return response;
+ sendMessageCallback.callback(response);
+ return ;
}
response.setCode(-1);
super.msgCheck(ctx, requestHeader, response);
if (response.getCode() != -1) {
- return response;
+ sendMessageCallback.callback(response);
+ return ;
}
int queueIdInt = requestHeader.getQueueId();
@@ -503,15 +543,17 @@ private RemotingCommand sendBatchMessage(final
ChannelHandlerContext ctx,
if (requestHeader.getTopic().length() > Byte.MAX_VALUE) {
response.setCode(ResponseCode.MESSAGE_ILLEGAL);
response.setRemark("message topic length too long " +
requestHeader.getTopic().length());
- return response;
+ sendMessageCallback.callback(response);
+ return ;
}
if (requestHeader.getTopic() != null &&
requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
response.setCode(ResponseCode.MESSAGE_ILLEGAL);
response.setRemark("batch request does not support retry group " +
requestHeader.getTopic());
- return response;
+ sendMessageCallback.callback(response);
+ return ;
}
- MessageExtBatch messageExtBatch = new MessageExtBatch();
+ final MessageExtBatch messageExtBatch = new MessageExtBatch();
messageExtBatch.setTopic(requestHeader.getTopic());
messageExtBatch.setQueueId(queueIdInt);
@@ -529,9 +571,15 @@ private RemotingCommand sendBatchMessage(final
ChannelHandlerContext ctx,
messageExtBatch.setStoreHost(this.getStoreHost());
messageExtBatch.setReconsumeTimes(requestHeader.getReconsumeTimes() ==
null ? 0 : requestHeader.getReconsumeTimes());
- PutMessageResult putMessageResult =
this.brokerController.getMessageStore().putMessages(messageExtBatch);
-
- return handlePutMessageResult(putMessageResult, response, request,
messageExtBatch, responseHeader, sendMessageContext, ctx, queueIdInt);
+ final int innerQueueIdInt = queueIdInt ;
+ PutMessageCallback putMessageCallback = new PutMessageCallback() {
+ @Override
+ public void doAction(PutMessageResult putMessageResult) {
+ RemotingCommand remotingCommand =
handlePutMessageResult(putMessageResult, response, request, messageExtBatch,
responseHeader, sendMessageContext, ctx, innerQueueIdInt);
+ sendMessageCallback.callback(remotingCommand);
+ }
+ } ;
+ this.brokerController.getMessageStore().putMessages(messageExtBatch ,
putMessageCallback);
}
public boolean hasConsumeMessageHook() {
diff --git
a/broker/src/test/java/org/apache/rocketmq/broker/filter/MessageStoreWithFilterTest.java
b/broker/src/test/java/org/apache/rocketmq/broker/filter/MessageStoreWithFilterTest.java
index e544d90a1..9e993f9e4 100644
---
a/broker/src/test/java/org/apache/rocketmq/broker/filter/MessageStoreWithFilterTest.java
+++
b/broker/src/test/java/org/apache/rocketmq/broker/filter/MessageStoreWithFilterTest.java
@@ -32,6 +32,7 @@
import org.apache.rocketmq.store.MessageArrivingListener;
import org.apache.rocketmq.store.MessageExtBrokerInner;
import org.apache.rocketmq.store.MessageFilter;
+import org.apache.rocketmq.store.PutMessageCallback;
import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
@@ -184,7 +185,10 @@ public void dispatch(DispatchRequest request) {
msg.putUserProperty("a", String.valueOf(j * 10 + 5));
msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
- PutMessageResult result = master.putMessage(msg);
+ PutMessageCallback putMessageCallback = new
PutMessageCallback() ;
+ master.putMessage(msg , putMessageCallback);
+ putMessageCallback.waitComplete();
+ PutMessageResult result =
putMessageCallback.getPutMessageResult() ;
msg.setMsgId(result.getAppendMessageResult().getMsgId());
diff --git
a/broker/src/test/java/org/apache/rocketmq/broker/processor/SendMessageProcessorTest.java
b/broker/src/test/java/org/apache/rocketmq/broker/processor/SendMessageProcessorTest.java
index 7828e7a91..e522d7a3d 100644
---
a/broker/src/test/java/org/apache/rocketmq/broker/processor/SendMessageProcessorTest.java
+++
b/broker/src/test/java/org/apache/rocketmq/broker/processor/SendMessageProcessorTest.java
@@ -39,6 +39,7 @@
import org.apache.rocketmq.store.AppendMessageStatus;
import org.apache.rocketmq.store.MessageExtBrokerInner;
import org.apache.rocketmq.store.MessageStore;
+import org.apache.rocketmq.store.PutMessageCallback;
import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.PutMessageStatus;
import org.apache.rocketmq.store.config.MessageStoreConfig;
@@ -84,13 +85,13 @@ public void init() {
@Test
public void testProcessRequest() throws RemotingCommandException {
-
when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new
PutMessageResult(PutMessageStatus.PUT_OK, new
AppendMessageResult(AppendMessageStatus.PUT_OK)));
+ when(putMessage()).thenReturn(new
PutMessageResult(PutMessageStatus.PUT_OK, new
AppendMessageResult(AppendMessageStatus.PUT_OK)));
assertPutResult(ResponseCode.SUCCESS);
}
@Test
public void testProcessRequest_WithHook() throws RemotingCommandException {
-
when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new
PutMessageResult(PutMessageStatus.PUT_OK, new
AppendMessageResult(AppendMessageStatus.PUT_OK)));
+ when(putMessage()).thenReturn(new
PutMessageResult(PutMessageStatus.PUT_OK, new
AppendMessageResult(AppendMessageStatus.PUT_OK)));
List<SendMessageHook> sendMessageHookList = new ArrayList<>();
final SendMessageContext[] sendMessageContext = new
SendMessageContext[1];
SendMessageHook sendMessageHook = new SendMessageHook() {
@@ -120,55 +121,55 @@ public void sendMessageAfter(SendMessageContext context) {
@Test
public void testProcessRequest_FlushTimeOut() throws
RemotingCommandException {
-
when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new
PutMessageResult(PutMessageStatus.FLUSH_DISK_TIMEOUT, new
AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR)));
+ when(putMessage()).thenReturn(new
PutMessageResult(PutMessageStatus.FLUSH_DISK_TIMEOUT, new
AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR)));
assertPutResult(ResponseCode.FLUSH_DISK_TIMEOUT);
}
@Test
public void testProcessRequest_MessageIllegal() throws
RemotingCommandException {
-
when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new
PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, new
AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR)));
+ when(putMessage()).thenReturn(new
PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, new
AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR)));
assertPutResult(ResponseCode.MESSAGE_ILLEGAL);
}
@Test
public void testProcessRequest_CreateMappedFileFailed() throws
RemotingCommandException {
-
when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new
PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, new
AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR)));
+ when(putMessage()).thenReturn(new
PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, new
AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR)));
assertPutResult(ResponseCode.SYSTEM_ERROR);
}
@Test
public void testProcessRequest_FlushSlaveTimeout() throws
RemotingCommandException {
-
when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new
PutMessageResult(PutMessageStatus.FLUSH_SLAVE_TIMEOUT, new
AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR)));
+ when(putMessage()).thenReturn(new
PutMessageResult(PutMessageStatus.FLUSH_SLAVE_TIMEOUT, new
AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR)));
assertPutResult(ResponseCode.FLUSH_SLAVE_TIMEOUT);
}
@Test
public void testProcessRequest_PageCacheBusy() throws
RemotingCommandException {
-
when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new
PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, new
AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR)));
+ when(putMessage()).thenReturn(new
PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, new
AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR)));
assertPutResult(ResponseCode.SYSTEM_ERROR);
}
@Test
public void testProcessRequest_PropertiesTooLong() throws
RemotingCommandException {
-
when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new
PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, new
AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR)));
+ when(putMessage()).thenReturn(new
PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, new
AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR)));
assertPutResult(ResponseCode.MESSAGE_ILLEGAL);
}
@Test
public void testProcessRequest_ServiceNotAvailable() throws
RemotingCommandException {
-
when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new
PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, new
AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR)));
+ when(putMessage()).thenReturn(new
PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, new
AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR)));
assertPutResult(ResponseCode.SERVICE_NOT_AVAILABLE);
}
@Test
public void testProcessRequest_SlaveNotAvailable() throws
RemotingCommandException {
-
when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new
PutMessageResult(PutMessageStatus.SLAVE_NOT_AVAILABLE, new
AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR)));
+ when(putMessage()).thenReturn(new
PutMessageResult(PutMessageStatus.SLAVE_NOT_AVAILABLE, new
AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR)));
assertPutResult(ResponseCode.SLAVE_NOT_AVAILABLE);
}
@Test
public void testProcessRequest_WithMsgBack() throws
RemotingCommandException {
-
when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new
PutMessageResult(PutMessageStatus.PUT_OK, new
AppendMessageResult(AppendMessageStatus.PUT_OK)));
+ when(putMessage()).thenReturn(new
PutMessageResult(PutMessageStatus.PUT_OK, new
AppendMessageResult(AppendMessageStatus.PUT_OK)));
final RemotingCommand request =
createSendMsgBackCommand(RequestCode.CONSUMER_SEND_MSG_BACK);
sendMessageProcessor = new SendMessageProcessor(brokerController);
@@ -177,6 +178,17 @@ public void testProcessRequest_WithMsgBack() throws
RemotingCommandException {
assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
}
+ private PutMessageResult putMessage(){
+ PutMessageCallback putMessageCallback = new PutMessageCallback();
+ messageStore.putMessage(any(MessageExtBrokerInner.class) ,
putMessageCallback) ;
+ try{
+ putMessageCallback.waitComplete();
+ }
+ catch (InterruptedException e){
+ e.printStackTrace();
+ }
+ return putMessageCallback.getPutMessageResult() ;
+ }
private RemotingCommand createSendMsgCommand(int requestCode) {
SendMessageRequestHeader requestHeader = new
SendMessageRequestHeader();
requestHeader.setProducerGroup(group);
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/ClientRemotingProcessor.java
b/client/src/main/java/org/apache/rocketmq/client/impl/ClientRemotingProcessor.java
index 69478cf32..f168c7401 100644
---
a/client/src/main/java/org/apache/rocketmq/client/impl/ClientRemotingProcessor.java
+++
b/client/src/main/java/org/apache/rocketmq/client/impl/ClientRemotingProcessor.java
@@ -43,6 +43,7 @@
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
+import org.apache.rocketmq.remoting.protocol.RemoteCommandResponseCallback;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.slf4j.Logger;
@@ -78,6 +79,12 @@ public RemotingCommand processRequest(ChannelHandlerContext
ctx,
return null;
}
+ @Override
+ public void asyncProcessRequest(ChannelHandlerContext ctx, RemotingCommand
request, RemoteCommandResponseCallback remoteCommandResponseCallback) throws
Exception {
+ RemotingCommand remotingCommand = processRequest(ctx , request) ;
+ remoteCommandResponseCallback.callback(remotingCommand);
+ }
+
@Override
public boolean rejectRequest() {
return false;
diff --git
a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/processor/DefaultRequestProcessor.java
b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/processor/DefaultRequestProcessor.java
index e459b1aeb..0d9701c79 100644
---
a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/processor/DefaultRequestProcessor.java
+++
b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/processor/DefaultRequestProcessor.java
@@ -45,6 +45,7 @@
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
+import org.apache.rocketmq.remoting.protocol.RemoteCommandResponseCallback;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.store.CommitLog;
import org.slf4j.Logger;
@@ -78,6 +79,12 @@ public RemotingCommand processRequest(ChannelHandlerContext
ctx, RemotingCommand
return null;
}
+ @Override
+ public void asyncProcessRequest(ChannelHandlerContext ctx, RemotingCommand
request, RemoteCommandResponseCallback remoteCommandResponseCallback) throws
Exception {
+ RemotingCommand remotingCommand = processRequest(ctx , request) ;
+ remoteCommandResponseCallback.callback(remotingCommand);
+ }
+
@Override
public boolean rejectRequest() {
return false;
diff --git
a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/ClusterTestRequestProcessor.java
b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/ClusterTestRequestProcessor.java
index f6611b683..2dd363c79 100644
---
a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/ClusterTestRequestProcessor.java
+++
b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/ClusterTestRequestProcessor.java
@@ -26,6 +26,7 @@
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.namesrv.NamesrvController;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+import org.apache.rocketmq.remoting.protocol.RemoteCommandResponseCallback;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.slf4j.Logger;
@@ -83,4 +84,10 @@ public RemotingCommand
getRouteInfoByTopic(ChannelHandlerContext ctx,
+ FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
return response;
}
+
+ @Override
+ public void asyncProcessRequest(ChannelHandlerContext ctx, RemotingCommand
request, RemoteCommandResponseCallback remoteCommandResponseCallback) throws
Exception {
+ RemotingCommand remotingCommand = processRequest(ctx , request) ;
+ remoteCommandResponseCallback.callback(remotingCommand);
+ }
}
diff --git
a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java
b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java
index ed5b20b16..8ec562a64 100644
---
a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java
+++
b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java
@@ -49,6 +49,7 @@
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
+import org.apache.rocketmq.remoting.protocol.RemoteCommandResponseCallback;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -120,6 +121,12 @@ public RemotingCommand
processRequest(ChannelHandlerContext ctx,
return null;
}
+ @Override
+ public void asyncProcessRequest(ChannelHandlerContext ctx, RemotingCommand
request, RemoteCommandResponseCallback remoteCommandResponseCallback) throws
Exception {
+ RemotingCommand remotingCommand = processRequest(ctx , request) ;
+ remoteCommandResponseCallback.callback(remotingCommand);
+ }
+
@Override
public boolean rejectRequest() {
return false;
diff --git
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
index 76752529a..7f6253731 100644
---
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
+++
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
@@ -45,6 +45,7 @@
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
+import org.apache.rocketmq.remoting.protocol.RemoteCommandResponseCallback;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.RemotingSysResponseCode;
import org.slf4j.Logger;
@@ -170,31 +171,36 @@ public void processRequestCommand(final
ChannelHandlerContext ctx, final Remotin
@Override
public void run() {
try {
- RPCHook rpcHook =
NettyRemotingAbstract.this.getRPCHook();
+ final RPCHook rpcHook =
NettyRemotingAbstract.this.getRPCHook();
if (rpcHook != null) {
rpcHook.doBeforeRequest(RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
cmd);
}
- final RemotingCommand response =
pair.getObject1().processRequest(ctx, cmd);
- if (rpcHook != null) {
-
rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
cmd, response);
- }
-
- if (!cmd.isOnewayRPC()) {
- if (response != null) {
- response.setOpaque(opaque);
- response.markResponseType();
- try {
- ctx.writeAndFlush(response);
- } catch (Throwable e) {
- log.error("process request over, but
response failed", e);
- log.error(cmd.toString());
- log.error(response.toString());
+ final RemoteCommandResponseCallback responseCallback =
new RemoteCommandResponseCallback() {
+ @Override
+ public void callback(RemotingCommand response) {
+ if (rpcHook != null) {
+
rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
cmd, response);
}
- } else {
+ if (!cmd.isOnewayRPC()) {
+ if (response != null) {
+ response.setOpaque(opaque);
+ response.markResponseType();
+ try {
+ ctx.writeAndFlush(response);
+ } catch (Throwable e) {
+ log.error("process request over,
but response failed", e);
+ log.error(cmd.toString());
+ log.error(response.toString());
+ }
+ } else {
+
+ }
+ }
}
- }
+ } ;
+ pair.getObject1().asyncProcessRequest(ctx, cmd ,
responseCallback);
} catch (Throwable e) {
log.error("process request exception", e);
log.error(cmd.toString());
diff --git
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRequestProcessor.java
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRequestProcessor.java
index 040f76848..a6f05e93f 100644
---
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRequestProcessor.java
+++
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRequestProcessor.java
@@ -17,6 +17,7 @@
package org.apache.rocketmq.remoting.netty;
import io.netty.channel.ChannelHandlerContext;
+import org.apache.rocketmq.remoting.protocol.RemoteCommandResponseCallback;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
/**
@@ -26,5 +27,8 @@
RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand
request)
throws Exception;
+ void asyncProcessRequest(ChannelHandlerContext ctx, RemotingCommand
request , RemoteCommandResponseCallback remoteCommandResponseCallback)
+ throws Exception;
+
boolean rejectRequest();
}
diff --git
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemoteCommandResponseCallback.java
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemoteCommandResponseCallback.java
new file mode 100644
index 000000000..c13d64ca0
--- /dev/null
+++
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemoteCommandResponseCallback.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.remoting.protocol;
+
+/**
+ * RemoteCommandResponseCallback is a asynchronous callback for netty network
io
+ */
+public interface RemoteCommandResponseCallback {
+ void callback(RemotingCommand response);
+}
diff --git
a/remoting/src/test/java/org/apache/rocketmq/remoting/RemotingServerTest.java
b/remoting/src/test/java/org/apache/rocketmq/remoting/RemotingServerTest.java
index 0ecfaaa5a..461496691 100644
---
a/remoting/src/test/java/org/apache/rocketmq/remoting/RemotingServerTest.java
+++
b/remoting/src/test/java/org/apache/rocketmq/remoting/RemotingServerTest.java
@@ -33,6 +33,7 @@
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
import org.apache.rocketmq.remoting.netty.ResponseFuture;
import org.apache.rocketmq.remoting.protocol.LanguageCode;
+import org.apache.rocketmq.remoting.protocol.RemoteCommandResponseCallback;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@@ -55,6 +56,11 @@ public RemotingCommand processRequest(ChannelHandlerContext
ctx, RemotingCommand
return request;
}
+ @Override
+ public void asyncProcessRequest(ChannelHandlerContext ctx,
RemotingCommand request, RemoteCommandResponseCallback
remoteCommandResponseCallback) throws Exception {
+ throw new UnsupportedOperationException() ;
+ }
+
@Override
public boolean rejectRequest() {
return false;
diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
index 03d98d319..0a443a417 100644
--- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
@@ -62,6 +62,7 @@
private volatile long beginTimeInLock = 0;
private final PutMessageLock putMessageLock;
+ private final GroupCommitCallback groupCommitCallback ;
public CommitLog(final DefaultMessageStore defaultMessageStore) {
this.mappedFileQueue = new
MappedFileQueue(defaultMessageStore.getMessageStoreConfig().getStorePathCommitLog(),
@@ -84,6 +85,7 @@ protected MessageExtBatchEncoder initialValue() {
}
};
this.putMessageLock =
defaultMessageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage()
? new PutMessageReentrantLock() : new PutMessageSpinLock();
+ this.groupCommitCallback = new GroupCommitCallback() ;
}
@@ -518,7 +520,7 @@ public long getBeginTimeInLock() {
return beginTimeInLock;
}
- public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
+ public void putMessage(final MessageExtBrokerInner msg , final
CommitLogPutMessageCallback commitLogPutMessageCallback) {
// Set the storage time
msg.setStoreTimestamp(System.currentTimeMillis());
// Set the message body BODY CRC (consider the most appropriate setting
@@ -573,7 +575,8 @@ public PutMessageResult putMessage(final
MessageExtBrokerInner msg) {
if (null == mappedFile) {
log.error("create mapped file1 error, topic: " +
msg.getTopic() + " clientAddr: " + msg.getBornHostString());
beginTimeInLock = 0;
- return new
PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);
+ commitLogPutMessageCallback.callback(new
PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null));
+ return ;
}
result = mappedFile.appendMessage(msg, this.appendMessageCallback);
@@ -588,20 +591,24 @@ public PutMessageResult putMessage(final
MessageExtBrokerInner msg) {
// XXX: warn and notify me
log.error("create mapped file2 error, topic: " +
msg.getTopic() + " clientAddr: " + msg.getBornHostString());
beginTimeInLock = 0;
- return new
PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);
+ commitLogPutMessageCallback.callback(new
PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result));
+ return ;
}
result = mappedFile.appendMessage(msg,
this.appendMessageCallback);
break;
case MESSAGE_SIZE_EXCEEDED:
case PROPERTIES_SIZE_EXCEEDED:
beginTimeInLock = 0;
- return new
PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result);
+ commitLogPutMessageCallback.callback(new
PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result));
+ return ;
case UNKNOWN_ERROR:
beginTimeInLock = 0;
- return new
PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
+ commitLogPutMessageCallback.callback(new
PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
+ return ;
default:
beginTimeInLock = 0;
- return new
PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
+ commitLogPutMessageCallback.callback(new
PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
+ return ;
}
eclipseTimeInLock =
this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
@@ -625,9 +632,9 @@ public PutMessageResult putMessage(final
MessageExtBrokerInner msg) {
storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());
handleDiskFlush(result, putMessageResult, msg);
- handleHA(result, putMessageResult, msg);
-
- return putMessageResult;
+ if (!handleHA(result, putMessageResult, msg,
commitLogPutMessageCallback)) {
+ commitLogPutMessageCallback.callback(putMessageResult);
+ }
}
public void handleDiskFlush(AppendMessageResult result, PutMessageResult
putMessageResult, MessageExt messageExt) {
@@ -657,22 +664,25 @@ public void handleDiskFlush(AppendMessageResult result,
PutMessageResult putMess
}
}
- public void handleHA(AppendMessageResult result, PutMessageResult
putMessageResult, MessageExt messageExt) {
+ /**
+ *
+ * @param result
+ * @param putMessageResult
+ * @param messageExt
+ * @param commitLogPutMessageCallback
+ * @return whether to wait slave, if false, the request should respond
synchronously
+ */
+ public boolean handleHA(AppendMessageResult result, PutMessageResult
putMessageResult, MessageExt messageExt, CommitLogPutMessageCallback
commitLogPutMessageCallback) {
if (BrokerRole.SYNC_MASTER ==
this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {
HAService service = this.defaultMessageStore.getHaService();
if (messageExt.isWaitStoreMsgOK()) {
// Determine whether to wait
if (service.isSlaveOK(result.getWroteOffset() +
result.getWroteBytes())) {
- GroupCommitRequest request = new
GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
+
+ GroupCommitRequest request = new
GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes() ,
commitLogPutMessageCallback, putMessageResult, messageExt, groupCommitCallback)
;
service.putRequest(request);
service.getWaitNotifyObject().wakeupAll();
- boolean flushOK =
-
request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
- if (!flushOK) {
- log.error("do sync transfer other node, wait return,
but failed, topic: " + messageExt.getTopic() + " tags: "
- + messageExt.getTags() + " client address: " +
messageExt.getBornHostNameString());
-
putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
- }
+ return true;
}
// Slave problem
else {
@@ -682,9 +692,10 @@ public void handleHA(AppendMessageResult result,
PutMessageResult putMessageResu
}
}
+ return false;
}
- public PutMessageResult putMessages(final MessageExtBatch messageExtBatch)
{
+ public void putMessages(final MessageExtBatch messageExtBatch ,
CommitLogPutMessageCallback commitLogPutMessageCallback) {
messageExtBatch.setStoreTimestamp(System.currentTimeMillis());
AppendMessageResult result;
@@ -693,10 +704,12 @@ public PutMessageResult putMessages(final MessageExtBatch
messageExtBatch) {
final int tranType =
MessageSysFlag.getTransactionValue(messageExtBatch.getSysFlag());
if (tranType != MessageSysFlag.TRANSACTION_NOT_TYPE) {
- return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL,
null);
+ commitLogPutMessageCallback.callback(new
PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null));
+ return ;
}
if (messageExtBatch.getDelayTimeLevel() > 0) {
- return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL,
null);
+ commitLogPutMessageCallback.callback(new
PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null));
+ return ;
}
long eclipseTimeInLock = 0;
@@ -723,7 +736,8 @@ public PutMessageResult putMessages(final MessageExtBatch
messageExtBatch) {
if (null == mappedFile) {
log.error("Create mapped file1 error, topic: {} clientAddr:
{}", messageExtBatch.getTopic(), messageExtBatch.getBornHostString());
beginTimeInLock = 0;
- return new
PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);
+ commitLogPutMessageCallback.callback(new
PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null));
+ return ;
}
result = mappedFile.appendMessages(messageExtBatch,
this.appendMessageCallback);
@@ -738,20 +752,24 @@ public PutMessageResult putMessages(final MessageExtBatch
messageExtBatch) {
// XXX: warn and notify me
log.error("Create mapped file2 error, topic: {}
clientAddr: {}", messageExtBatch.getTopic(),
messageExtBatch.getBornHostString());
beginTimeInLock = 0;
- return new
PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);
+ commitLogPutMessageCallback.callback(new
PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result));
+ return ;
}
result = mappedFile.appendMessages(messageExtBatch,
this.appendMessageCallback);
break;
case MESSAGE_SIZE_EXCEEDED:
case PROPERTIES_SIZE_EXCEEDED:
beginTimeInLock = 0;
- return new
PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result);
+ commitLogPutMessageCallback.callback(new
PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result));
+ return ;
case UNKNOWN_ERROR:
beginTimeInLock = 0;
- return new
PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
+ commitLogPutMessageCallback.callback(new
PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
+ return ;
default:
beginTimeInLock = 0;
- return new
PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
+ commitLogPutMessageCallback.callback(new
PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
+ return ;
}
eclipseTimeInLock =
this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
@@ -776,9 +794,9 @@ public PutMessageResult putMessages(final MessageExtBatch
messageExtBatch) {
handleDiskFlush(result, putMessageResult, messageExtBatch);
- handleHA(result, putMessageResult, messageExtBatch);
-
- return putMessageResult;
+ if (!handleHA(result, putMessageResult, messageExtBatch,
commitLogPutMessageCallback)) {
+ commitLogPutMessageCallback.callback(putMessageResult);
+ }
}
/**
@@ -1027,10 +1045,21 @@ public long getJointime() {
private final long nextOffset;
private final CountDownLatch countDownLatch = new CountDownLatch(1);
private volatile boolean flushOK = false;
+ private CommitLogPutMessageCallback commitLogPutMessageCallback ;
+ private PutMessageResult putMessageResult ;
+ private MessageExt messageExt ;
+ private GroupCommitCallback groupCommitCallback ;
public GroupCommitRequest(long nextOffset) {
this.nextOffset = nextOffset;
}
+ public GroupCommitRequest(long nextOffset, CommitLogPutMessageCallback
commitLogPutMessageCallback, PutMessageResult putMessageResult, MessageExt
messageExt, GroupCommitCallback groupCommitCallback) {
+ this.nextOffset = nextOffset;
+ this.commitLogPutMessageCallback = commitLogPutMessageCallback;
+ this.putMessageResult = putMessageResult;
+ this.messageExt = messageExt;
+ this.groupCommitCallback = groupCommitCallback ;
+ }
public long getNextOffset() {
return nextOffset;
@@ -1050,6 +1079,42 @@ public boolean waitForFlush(long timeout) {
return false;
}
}
+
+ public void setFlushOK(boolean flushOK) {
+ this.flushOK = flushOK;
+ }
+
+ public boolean isFlushOK() {
+ return flushOK;
+ }
+
+ public CommitLogPutMessageCallback getCommitLogPutMessageCallback() {
+ return commitLogPutMessageCallback;
+ }
+
+ public PutMessageResult getPutMessageResult() {
+ return putMessageResult;
+ }
+
+ public MessageExt getMessageExt() {
+ return messageExt;
+ }
+
+ public GroupCommitCallback getGroupCommitCallback() {
+ return groupCommitCallback;
+ }
+ }
+
+ public static class GroupCommitCallback {
+ public void doSlaveAction(GroupCommitRequest request) {
+ boolean flushOK = request.isFlushOK() ;
+ if (!flushOK) {
+ log.error("do sync transfer other node, wait return, but
failed, topic: " + request.getMessageExt().getTopic() + " tags: "
+ + request.getMessageExt().getTags() + " client
address: " + request.getMessageExt().getBornHostNameString());
+
request.getPutMessageResult().setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
+ }
+
request.getCommitLogPutMessageCallback().callback(request.getPutMessageResult());
+ }
}
/**
diff --git
a/store/src/main/java/org/apache/rocketmq/store/CommitLogPutMessageCallback.java
b/store/src/main/java/org/apache/rocketmq/store/CommitLogPutMessageCallback.java
new file mode 100644
index 000000000..f2cffcd5d
--- /dev/null
+++
b/store/src/main/java/org/apache/rocketmq/store/CommitLogPutMessageCallback.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.store;
+
+/**
+ * CommitLogPutMessageCallback is a asynchronous callback for messageStore
+ */
+public interface CommitLogPutMessageCallback {
+ void callback(PutMessageResult result) ;
+}
diff --git
a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index 7a5647c3e..2c0854fef 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -302,10 +302,11 @@ public void destroyLogics() {
}
}
- public PutMessageResult putMessage(MessageExtBrokerInner msg) {
+ public void putMessage(final MessageExtBrokerInner msg , final
PutMessageCallback putMessageCallback) {
if (this.shutdown) {
log.warn("message store has shutdown, so putMessage is forbidden");
- return new
PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
+ putMessageCallback.callback(new
PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null));
+ return ;
}
if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
@@ -314,7 +315,8 @@ public PutMessageResult putMessage(MessageExtBrokerInner
msg) {
log.warn("message store is slave mode, so putMessage is
forbidden ");
}
- return new
PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
+ putMessageCallback.callback(new
PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null));
+ return ;
}
if (!this.runningFlags.isWriteable()) {
@@ -323,45 +325,54 @@ public PutMessageResult putMessage(MessageExtBrokerInner
msg) {
log.warn("message store is not writeable, so putMessage is
forbidden " + this.runningFlags.getFlagBits());
}
- return new
PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
+ putMessageCallback.callback(new
PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null));
+ return ;
} else {
this.printTimes.set(0);
}
if (msg.getTopic().length() > Byte.MAX_VALUE) {
log.warn("putMessage message topic length too long " +
msg.getTopic().length());
- return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL,
null);
+ putMessageCallback.callback(new
PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null));
+ return ;
}
if (msg.getPropertiesString() != null &&
msg.getPropertiesString().length() > Short.MAX_VALUE) {
log.warn("putMessage message properties length too long " +
msg.getPropertiesString().length());
- return new
PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, null);
+ putMessageCallback.callback(new
PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, null));
+ return ;
}
if (this.isOSPageCacheBusy()) {
- return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY,
null);
+ putMessageCallback.callback(new
PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null));
+ return ;
}
- long beginTime = this.getSystemClock().now();
- PutMessageResult result = this.commitLog.putMessage(msg);
-
- long eclipseTime = this.getSystemClock().now() - beginTime;
- if (eclipseTime > 500) {
- log.warn("putMessage not in lock eclipse time(ms)={},
bodyLength={}", eclipseTime, msg.getBody().length);
- }
- this.storeStatsService.setPutMessageEntireTimeMax(eclipseTime);
+ final long beginTime = this.getSystemClock().now();
+ CommitLogPutMessageCallback commitLogPutMessageCallback = new
CommitLogPutMessageCallback() {
+ @Override
+ public void callback(PutMessageResult result) {
+ long eclipseTime = getSystemClock().now() - beginTime;
+ if (eclipseTime > 500) {
+ log.warn("putMessage not in lock eclipse time(ms)={},
bodyLength={}", eclipseTime, msg.getBody().length);
+ }
+ storeStatsService.setPutMessageEntireTimeMax(eclipseTime);
- if (null == result || !result.isOk()) {
-
this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();
- }
+ if (null == result || !result.isOk()) {
+
storeStatsService.getPutMessageFailedTimes().incrementAndGet();
+ }
- return result;
+ putMessageCallback.callback(result);
+ }
+ };
+ this.commitLog.putMessage(msg , commitLogPutMessageCallback);
}
- public PutMessageResult putMessages(MessageExtBatch messageExtBatch) {
+ public void putMessages(final MessageExtBatch messageExtBatch , final
PutMessageCallback putMessageCallback) {
if (this.shutdown) {
log.warn("DefaultMessageStore has shutdown, so putMessages is
forbidden");
- return new
PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
+ putMessageCallback.callback(new
PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null));
+ return ;
}
if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
@@ -370,7 +381,8 @@ public PutMessageResult putMessages(MessageExtBatch
messageExtBatch) {
log.warn("DefaultMessageStore is in slave mode, so putMessages
is forbidden ");
}
- return new
PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
+ putMessageCallback.callback(new
PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null));
+ return ;
}
if (!this.runningFlags.isWriteable()) {
@@ -379,39 +391,47 @@ public PutMessageResult putMessages(MessageExtBatch
messageExtBatch) {
log.warn("DefaultMessageStore is not writable, so putMessages
is forbidden " + this.runningFlags.getFlagBits());
}
- return new
PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
+ putMessageCallback.callback(new
PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null));
+ return ;
} else {
this.printTimes.set(0);
}
if (messageExtBatch.getTopic().length() > Byte.MAX_VALUE) {
log.warn("PutMessages topic length too long " +
messageExtBatch.getTopic().length());
- return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL,
null);
+ putMessageCallback.callback(new
PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null));
+ return ;
}
if (messageExtBatch.getBody().length >
messageStoreConfig.getMaxMessageSize()) {
log.warn("PutMessages body length too long " +
messageExtBatch.getBody().length);
- return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL,
null);
+ putMessageCallback.callback(new
PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null));
+ return ;
}
if (this.isOSPageCacheBusy()) {
- return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY,
null);
+ putMessageCallback.callback(new
PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null));
+ return ;
}
- long beginTime = this.getSystemClock().now();
- PutMessageResult result = this.commitLog.putMessages(messageExtBatch);
-
- long eclipseTime = this.getSystemClock().now() - beginTime;
- if (eclipseTime > 500) {
- log.warn("not in lock eclipse time(ms)={}, bodyLength={}",
eclipseTime, messageExtBatch.getBody().length);
- }
- this.storeStatsService.setPutMessageEntireTimeMax(eclipseTime);
+ final long beginTime = this.getSystemClock().now();
+ CommitLogPutMessageCallback commitLogPutMessageCallback = new
CommitLogPutMessageCallback() {
+ @Override
+ public void callback(PutMessageResult result) {
+ long eclipseTime = getSystemClock().now() - beginTime;
+ if (eclipseTime > 500) {
+ log.warn("not in lock eclipse time(ms)={}, bodyLength={}",
eclipseTime, messageExtBatch.getBody().length);
+ }
+ storeStatsService.setPutMessageEntireTimeMax(eclipseTime);
- if (null == result || !result.isOk()) {
-
this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();
- }
+ if (null == result || !result.isOk()) {
+
storeStatsService.getPutMessageFailedTimes().incrementAndGet();
+ }
- return result;
+ putMessageCallback.callback(result);
+ }
+ } ;
+ this.commitLog.putMessages(messageExtBatch ,
commitLogPutMessageCallback);
}
@Override
diff --git a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
index 907dfe209..4d49bf8f4 100644
--- a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
@@ -51,21 +51,34 @@
*/
void destroy();
- /**
+/*
+ */
+/**
* Store a message into store.
*
* @param msg Message instance to store
* @return result of store operation.
- */
+ *//*
+
PutMessageResult putMessage(final MessageExtBrokerInner msg);
+*/
+
+ /**
+ * Store a message into store.
+ *
+ * @param msg Message instance to store
+ * @param putMessageCallback callback of put message
+ *
+ */
+ void putMessage(final MessageExtBrokerInner msg , PutMessageCallback
putMessageCallback);
/**
* Store a batch of messages.
*
* @param messageExtBatch Message batch.
- * @return result of storing batch messages.
+ * @param putMessageCallback callback of put message
*/
- PutMessageResult putMessages(final MessageExtBatch messageExtBatch);
+ void putMessages(final MessageExtBatch messageExtBatch ,
PutMessageCallback putMessageCallback);
/**
* Query at most <code>maxMsgNums</code> messages belonging to
<code>topic</code> at <code>queueId</code> starting
diff --git
a/store/src/main/java/org/apache/rocketmq/store/PutMessageCallback.java
b/store/src/main/java/org/apache/rocketmq/store/PutMessageCallback.java
new file mode 100644
index 000000000..d83f3b48b
--- /dev/null
+++ b/store/src/main/java/org/apache/rocketmq/store/PutMessageCallback.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.store;
+
+/**
+ * PutMessageCallback is a asynchronous callback for processor process the put
message result
+ */
+public class PutMessageCallback {
+ private final Object waitObject = new Object();
+ private volatile boolean completed = false;
+ private PutMessageResult putMessageResult;
+
+ protected void doAction(PutMessageResult putMessageResult) {
+ //default empty
+ }
+
+ public void callback(PutMessageResult putMessageResult) {
+ doAction(putMessageResult);
+ this.putMessageResult = putMessageResult;
+ completed = true;
+ synchronized (waitObject) {
+ waitObject.notifyAll();
+ }
+ }
+
+ public void waitComplete() throws InterruptedException {
+ waitComplete(-1);
+ }
+
+ public void waitComplete(long timeout) throws InterruptedException {
+ synchronized (waitObject) {
+ if (timeout < 0) {
+ waitObject.wait();
+ }
+ else {
+ waitObject.wait(timeout);
+ }
+ }
+ }
+
+ public boolean isCompleted() {
+ return completed;
+ }
+
+ public PutMessageResult getPutMessageResult() {
+ return putMessageResult;
+ }
+}
diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java
b/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java
index 51a8a2703..cec967fec 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java
@@ -25,10 +25,11 @@
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
-import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
@@ -253,15 +254,15 @@ public String getServiceName() {
class GroupTransferService extends ServiceThread {
private final WaitNotifyObject notifyTransferObject = new
WaitNotifyObject();
- private volatile List<CommitLog.GroupCommitRequest> requestsWrite =
new ArrayList<>();
- private volatile List<CommitLog.GroupCommitRequest> requestsRead = new
ArrayList<>();
+ private ConcurrentSkipListMap<Long , CommitLog.GroupCommitRequest>
groupCommitRequestConcurrentSkipListMap = new ConcurrentSkipListMap<>() ;
- public synchronized void putRequest(final CommitLog.GroupCommitRequest
request) {
- synchronized (this.requestsWrite) {
- this.requestsWrite.add(request);
+ public void putRequest(final CommitLog.GroupCommitRequest request) {
+ if (request.getNextOffset() >=
HAService.this.push2SlaveMaxOffset.get()) {
+ request.setFlushOK(true);
+ request.getGroupCommitCallback().doSlaveAction(request) ;
}
- if (hasNotified.compareAndSet(false, true)) {
- waitPoint.countDown(); // notify
+ else {
+
groupCommitRequestConcurrentSkipListMap.put(request.getNextOffset() , request) ;
}
}
@@ -269,32 +270,33 @@ public void notifyTransferSome() {
this.notifyTransferObject.wakeup();
}
- private void swapRequests() {
- List<CommitLog.GroupCommitRequest> tmp = this.requestsWrite;
- this.requestsWrite = this.requestsRead;
- this.requestsRead = tmp;
- }
-
+ /**
+ * wait slave fetch message or fetch timeout,then response the produce
request
+ */
private void doWaitTransfer() {
- synchronized (this.requestsRead) {
- if (!this.requestsRead.isEmpty()) {
- for (CommitLog.GroupCommitRequest req : this.requestsRead)
{
- boolean transferOK =
HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
- for (int i = 0; !transferOK && i < 5; i++) {
- this.notifyTransferObject.waitForRunning(1000);
- transferOK =
HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
- }
-
- if (!transferOK) {
- log.warn("transfer messsage to slave timeout, " +
req.getNextOffset());
- }
-
- req.wakeupCustomer(transferOK);
+ long waitStart = System.currentTimeMillis();
+ boolean waitTimeout = false ;
+ while (!groupCommitRequestConcurrentSkipListMap.isEmpty() &&
+
(groupCommitRequestConcurrentSkipListMap.firstEntry().getKey() <=
HAService.this.push2SlaveMaxOffset.get()
+ || (waitTimeout = System.currentTimeMillis() -
waitStart > 5000))) {
+ if (waitTimeout) {
+ Long offset =
groupCommitRequestConcurrentSkipListMap.firstEntry().getKey();
+ CommitLog.GroupCommitRequest request =
groupCommitRequestConcurrentSkipListMap.remove(offset) ;
+ request.setFlushOK(false);
+ request.getGroupCommitCallback().doSlaveAction(request);
+ }
+ else {
+ ConcurrentNavigableMap<Long, CommitLog.GroupCommitRequest>
subMap =
groupCommitRequestConcurrentSkipListMap.headMap(HAService.this.push2SlaveMaxOffset.get());
+ for (Long offset : subMap.keySet()) {
+ CommitLog.GroupCommitRequest request =
subMap.remove(offset);
+ request.setFlushOK(true);
+
request.getGroupCommitCallback().doSlaveAction(request);
}
-
- this.requestsRead.clear();
}
+ waitTimeout = false ;
+ waitStart = System.currentTimeMillis();
}
+ this.notifyTransferObject.waitForRunning(1000);
}
public void run() {
@@ -302,7 +304,6 @@ public void run() {
while (!this.isStopped()) {
try {
- this.waitForRunning(10);
this.doWaitTransfer();
} catch (Exception e) {
log.warn(this.getServiceName() + " service has exception.
", e);
@@ -312,11 +313,6 @@ public void run() {
log.info(this.getServiceName() + " service end");
}
- @Override
- protected void onWaitEnd() {
- this.swapRequests();
- }
-
@Override
public String getServiceName() {
return GroupTransferService.class.getSimpleName();
diff --git
a/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java
b/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java
index 35b8e8565..2b6315bbe 100644
---
a/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java
+++
b/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java
@@ -36,6 +36,7 @@
import org.apache.rocketmq.store.ConsumeQueueExt;
import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.store.MessageExtBrokerInner;
+import org.apache.rocketmq.store.PutMessageCallback;
import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.PutMessageStatus;
import org.apache.rocketmq.store.SelectMappedBufferResult;
@@ -44,9 +45,8 @@
import org.slf4j.LoggerFactory;
public class ScheduleMessageService extends ConfigManager {
- private static final Logger log =
LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
-
public static final String SCHEDULE_TOPIC = "SCHEDULE_TOPIC_XXXX";
+ private static final Logger log =
LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
private static final long FIRST_DELAY_TIME = 1000L;
private static final long DELAY_FOR_A_WHILE = 100L;
private static final long DELAY_FOR_A_PERIOD = 10000L;
@@ -283,11 +283,13 @@ public void executeOnTimeup() {
if (msgExt != null) {
try {
+ PutMessageCallback putMessageCallback
= new PutMessageCallback();
MessageExtBrokerInner msgInner =
this.messageTimeup(msgExt);
- PutMessageResult putMessageResult =
-
ScheduleMessageService.this.defaultMessageStore
- .putMessage(msgInner);
+
ScheduleMessageService.this.defaultMessageStore
+ .putMessage(msgInner,
putMessageCallback);
+ putMessageCallback.waitComplete();
+ PutMessageResult putMessageResult =
putMessageCallback.getPutMessageResult();
if (putMessageResult != null
&&
putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
continue;
diff --git
a/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java
b/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java
index b7d38f8c7..94e78704e 100644
--- a/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java
@@ -126,7 +126,9 @@ protected void putMsg(DefaultMessageStore master) throws
Exception {
long totalMsgs = 200;
for (long i = 0; i < totalMsgs; i++) {
- master.putMessage(buildMessage());
+ PutMessageCallback putMessageCallback = new PutMessageCallback() ;
+ master.putMessage(buildMessage() , putMessageCallback);
+ putMessageCallback.waitComplete();
}
}
diff --git
a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
index 9269cdfa7..e31eee854 100644
--- a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
@@ -24,13 +24,12 @@
import java.nio.channels.OverlappingFileLockException;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
-
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.store.config.FlushDiskType;
import org.apache.rocketmq.store.config.MessageStoreConfig;
-import org.junit.After;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
+import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -108,7 +107,7 @@ public void testWriteAndRead() throws Exception {
QUEUE_TOTAL = 1;
MessageBody = StoreMessage.getBytes();
for (long i = 0; i < totalMsgs; i++) {
- messageStore.putMessage(buildMessage());
+ messageStore.putMessage(buildMessage() , new PutMessageCallback());
}
for (long i = 0; i < totalMsgs; i++) {
@@ -140,7 +139,7 @@ public void testGroupCommit() throws Exception {
QUEUE_TOTAL = 1;
MessageBody = StoreMessage.getBytes();
for (long i = 0; i < totalMsgs; i++) {
- messageStore.putMessage(buildMessage());
+ messageStore.putMessage(buildMessage() , new PutMessageCallback());
}
for (long i = 0; i < totalMsgs; i++) {
@@ -153,7 +152,7 @@ public void testGroupCommit() throws Exception {
private void verifyThatMasterIsFunctional(long totalMsgs, MessageStore
master) {
for (long i = 0; i < totalMsgs; i++) {
- master.putMessage(buildMessage());
+ master.putMessage(buildMessage() , new PutMessageCallback());
}
for (long i = 0; i < totalMsgs; i++) {
@@ -172,7 +171,7 @@ public void testPullSize() throws Exception {
MessageExtBrokerInner messageExtBrokerInner = buildMessage();
messageExtBrokerInner.setTopic(topic);
messageExtBrokerInner.setQueueId(0);
- messageStore.putMessage(messageExtBrokerInner);
+ messageStore.putMessage(messageExtBrokerInner , new
PutMessageCallback());
}
//wait for consume queue build
Thread.sleep(10);
@@ -190,7 +189,7 @@ public void testPullSize() throws Exception {
private class MyMessageArrivingListener implements MessageArrivingListener
{
@Override
public void arriving(String topic, int queueId, long logicOffset, long
tagsCode, long msgStoreTime,
- byte[] filterBitMap, Map<String, String>
properties) {
+ byte[] filterBitMap, Map<String, String> properties) {
}
}
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services