This is an automated email from the ASF dual-hosted git repository.
duhengforever pushed a commit to branch snode
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/snode by this push:
new 14a75e5 Add embedded start mode (single service) for snode and enode
14a75e5 is described below
commit 14a75e5cccdcceecd86c7aa88d5a2d92248dba57
Author: duhenglucky <[email protected]>
AuthorDate: Fri Feb 22 01:57:39 2019 +0800
Add embedded start mode (single service) for snode and enode
---
.../apache/rocketmq/broker/BrokerController.java | 63 +++++++---
.../org/apache/rocketmq/broker/BrokerStartup.java | 23 ++--
.../processor/AbstractSendMessageProcessor.java | 25 ++--
.../broker/processor/ClientManageProcessor.java | 8 +-
.../broker/processor/SendMessageProcessor.java | 47 ++++---
.../org/apache/rocketmq/common/SnodeConfig.java | 32 +++++
snode/pom.xml | 7 +-
.../org/apache/rocketmq/snode/SnodeController.java | 17 +--
.../org/apache/rocketmq/snode/SnodeStartup.java | 52 +++++---
.../snode/client/SubscriptionGroupManager.java | 34 +----
.../rocketmq/snode/client/SubscriptionManager.java | 5 -
.../snode/client/impl/SubscriptionManagerImpl.java | 15 +--
.../snode/offset/ConsumerOffsetManager.java | 64 +++++-----
.../snode/processor/ConsumerManageProcessor.java | 45 +++++--
.../snode/processor/PullMessageProcessor.java | 6 +-
.../snode/processor/SendMessageProcessor.java | 6 +-
.../mqtthandler/MqttPingreqMessageHandler.java | 4 +-
.../rocketmq/snode/service/AdminService.java | 4 +-
.../rocketmq/snode/service/EnodeService.java | 33 +++--
.../snode/service/impl/ClientServiceImpl.java | 1 +
.../snode/service/impl/LocalEnodeServiceImpl.java | 138 +++++++++++++++++++++
.../snode/service/impl/PushServiceImpl.java | 4 +-
...erviceImpl.java => RemoteEnodeServiceImpl.java} | 67 ++++++----
.../snode/processor/SendMessageProcessorTest.java | 4 +-
...plTest.java => RemoteEnodeServiceImplTest.java} | 10 +-
25 files changed, 481 insertions(+), 233 deletions(-)
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index c898366..d9a618b 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -121,6 +121,7 @@ public class BrokerController {
private final ClientHousekeepingService clientHousekeepingService;
private final PullMessageProcessor pullMessageProcessor;
private final SnodePullMessageProcessor snodePullMessageProcessor;
+ private final SendMessageProcessor sendProcessor;
private final PullRequestHoldService pullRequestHoldService;
private final MessageArrivingListener messageArrivingListener;
private final Broker2Client broker2Client;
@@ -164,6 +165,9 @@ public class BrokerController {
private TransactionalMessageService transactionalMessageService;
private AbstractTransactionalMessageCheckListener
transactionalMessageCheckListener;
private Future<?> slaveSyncFuture;
+ private ClientManageProcessor clientManageProcessor;
+ private AdminBrokerProcessor adminProcessor;
+ private ConsumerManageProcessor consumerManageProcessor;
public BrokerController(
final BrokerConfig brokerConfig,
@@ -178,6 +182,7 @@ public class BrokerController {
this.consumerOffsetManager = new ConsumerOffsetManager(this);
this.topicConfigManager = new TopicConfigManager(this);
this.pullMessageProcessor = new PullMessageProcessor(this);
+ this.sendProcessor = new SendMessageProcessor(this);
this.pullRequestHoldService = new PullRequestHoldService(this);
this.messageArrivingListener = new
NotifyMessageArrivingListener(this.pullRequestHoldService);
this.consumerIdsChangeListener = new
DefaultConsumerIdsChangeListener(this);
@@ -543,9 +548,8 @@ public class BrokerController {
/**
* SendMessageProcessor
*/
- SendMessageProcessor sendProcessor = new SendMessageProcessor(this);
- sendProcessor.registerSendMessageHook(sendMessageHookList);
- sendProcessor.registerConsumeMessageHook(consumeMessageHookList);
+ this.sendProcessor.registerSendMessageHook(sendMessageHookList);
+ this.sendProcessor.registerConsumeMessageHook(consumeMessageHookList);
this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE,
sendProcessor, this.sendMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2,
sendProcessor, this.sendMessageExecutor);
@@ -575,28 +579,28 @@ public class BrokerController {
/**
* ClientManageProcessor
*/
- ClientManageProcessor clientProcessor = new
ClientManageProcessor(this);
- this.remotingServer.registerProcessor(RequestCode.HEART_BEAT,
clientProcessor, this.heartbeatExecutor);
- this.remotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT,
clientProcessor, this.clientManageExecutor);
- this.remotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG,
clientProcessor, this.clientManageExecutor);
- this.remotingServer.registerProcessor(RequestCode.CREATE_RETRY_TOPIC,
clientProcessor, this.clientManageExecutor);
+ this.clientManageProcessor = new ClientManageProcessor(this);
+ this.remotingServer.registerProcessor(RequestCode.HEART_BEAT,
clientManageProcessor, this.heartbeatExecutor);
+ this.remotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT,
clientManageProcessor, this.clientManageExecutor);
+ this.remotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG,
clientManageProcessor, this.clientManageExecutor);
+ this.remotingServer.registerProcessor(RequestCode.CREATE_RETRY_TOPIC,
clientManageProcessor, this.clientManageExecutor);
- this.fastRemotingServer.registerProcessor(RequestCode.HEART_BEAT,
clientProcessor, this.heartbeatExecutor);
-
this.fastRemotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT,
clientProcessor, this.clientManageExecutor);
-
this.fastRemotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG,
clientProcessor, this.clientManageExecutor);
-
this.fastRemotingServer.registerProcessor(RequestCode.CREATE_RETRY_TOPIC,
clientProcessor, this.clientManageExecutor);
+ this.fastRemotingServer.registerProcessor(RequestCode.HEART_BEAT,
clientManageProcessor, this.heartbeatExecutor);
+
this.fastRemotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT,
clientManageProcessor, this.clientManageExecutor);
+
this.fastRemotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG,
clientManageProcessor, this.clientManageExecutor);
+
this.fastRemotingServer.registerProcessor(RequestCode.CREATE_RETRY_TOPIC,
clientManageProcessor, this.clientManageExecutor);
/**
* ConsumerManageProcessor
*/
- ConsumerManageProcessor consumerManageProcessor = new
ConsumerManageProcessor(this);
-
this.remotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP,
consumerManageProcessor, this.consumerManageExecutor);
-
this.remotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET,
consumerManageProcessor, this.consumerManageExecutor);
-
this.remotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET,
consumerManageProcessor, this.consumerManageExecutor);
+ this.consumerManageProcessor = new ConsumerManageProcessor(this);
+
this.remotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP,
this.consumerManageProcessor, this.consumerManageExecutor);
+
this.remotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET,
this.consumerManageProcessor, this.consumerManageExecutor);
+
this.remotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET,
this.consumerManageProcessor, this.consumerManageExecutor);
-
this.fastRemotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP,
consumerManageProcessor, this.consumerManageExecutor);
-
this.fastRemotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET,
consumerManageProcessor, this.consumerManageExecutor);
-
this.fastRemotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET,
consumerManageProcessor, this.consumerManageExecutor);
+
this.fastRemotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP,
this.consumerManageProcessor, this.consumerManageExecutor);
+
this.fastRemotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET,
this.consumerManageProcessor, this.consumerManageExecutor);
+
this.fastRemotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET,
this.consumerManageProcessor, this.consumerManageExecutor);
/**
* EndTransactionProcessor
@@ -607,7 +611,7 @@ public class BrokerController {
/**
* Default
*/
- AdminBrokerProcessor adminProcessor = new AdminBrokerProcessor(this);
+ this.adminProcessor = new AdminBrokerProcessor(this);
this.remotingServer.registerDefaultProcessor(adminProcessor,
this.adminBrokerExecutor);
this.fastRemotingServer.registerDefaultProcessor(adminProcessor,
this.adminBrokerExecutor);
}
@@ -1220,4 +1224,23 @@ public class BrokerController {
}
}
+ public SendMessageProcessor getSendProcessor() {
+ return sendProcessor;
+ }
+
+ public ClientManageProcessor getClientManageProcessor() {
+ return clientManageProcessor;
+ }
+
+ public AdminBrokerProcessor getAdminProcessor() {
+ return adminProcessor;
+ }
+
+ public void setAdminProcessor(AdminBrokerProcessor adminProcessor) {
+ this.adminProcessor = adminProcessor;
+ }
+
+ public ConsumerManageProcessor getConsumerManageProcessor() {
+ return consumerManageProcessor;
+ }
}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java
b/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java
index 17d2f0e..97a655c 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java
@@ -18,6 +18,11 @@ package org.apache.rocketmq.broker;
import ch.qos.logback.classic.LoggerContext;
import ch.qos.logback.classic.joran.JoranConfigurator;
+import java.io.BufferedInputStream;
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
@@ -28,10 +33,10 @@ import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
-import org.apache.rocketmq.remoting.common.RemotingUtil;
-import org.apache.rocketmq.remoting.common.TlsMode;
import org.apache.rocketmq.remoting.ClientConfig;
import org.apache.rocketmq.remoting.ServerConfig;
+import org.apache.rocketmq.remoting.common.RemotingUtil;
+import org.apache.rocketmq.remoting.common.TlsMode;
import org.apache.rocketmq.remoting.netty.NettySystemConfig;
import org.apache.rocketmq.remoting.netty.TlsSystemConfig;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
@@ -40,12 +45,6 @@ import org.apache.rocketmq.store.config.BrokerRole;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.slf4j.LoggerFactory;
-import java.io.BufferedInputStream;
-import java.io.FileInputStream;
-import java.io.InputStream;
-import java.util.Properties;
-import java.util.concurrent.atomic.AtomicInteger;
-
import static org.apache.rocketmq.remoting.netty.TlsSystemConfig.TLS_ENABLE;
public class BrokerStartup {
@@ -53,6 +52,7 @@ public class BrokerStartup {
public static CommandLine commandLine = null;
public static String configFile = null;
public static InternalLogger log;
+ private static BrokerController brokerController = null;
public static void main(String[] args) {
start(createBrokerController(args));
@@ -238,7 +238,7 @@ public class BrokerStartup {
}
}
}, "ShutdownHook"));
-
+ brokerController = controller;
return controller;
} catch (Throwable e) {
e.printStackTrace();
@@ -260,7 +260,6 @@ public class BrokerStartup {
private static Options buildCommandlineOptions(final Options options) {
-
Option opt = new Option("c", "configFile", true, "Broker config
properties file");
opt.setRequired(false);
options.addOption(opt);
@@ -275,4 +274,8 @@ public class BrokerStartup {
return options;
}
+
+ public static BrokerController getBrokerController() {
+ return brokerController;
+ }
}
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
index bd7625a..2e04726 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
@@ -17,6 +17,11 @@
package org.apache.rocketmq.broker.processor;
import io.netty.channel.ChannelHandlerContext;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.mqtrace.SendMessageContext;
import org.apache.rocketmq.broker.mqtrace.SendMessageHook;
@@ -27,8 +32,6 @@ import org.apache.rocketmq.common.constant.DBMsgConstants;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.common.help.FAQUrl;
-import org.apache.rocketmq.logging.InternalLogger;
-import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageDecoder;
@@ -40,18 +43,14 @@ import
org.apache.rocketmq.common.protocol.header.SendMessageResponseHeader;
import org.apache.rocketmq.common.sysflag.MessageSysFlag;
import org.apache.rocketmq.common.sysflag.TopicSysFlag;
import org.apache.rocketmq.common.utils.ChannelUtil;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.RequestProcessor;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
-import org.apache.rocketmq.remoting.RequestProcessor;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.store.MessageExtBrokerInner;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-
public abstract class AbstractSendMessageProcessor implements RequestProcessor
{
protected static final InternalLogger log =
InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
@@ -159,7 +158,7 @@ public abstract class AbstractSendMessageProcessor
implements RequestProcessor {
return response;
}
- protected RemotingCommand msgCheck(final ChannelHandlerContext ctx,
+ protected RemotingCommand msgCheck(final String remoteAddress,
final SendMessageRequestHeader requestHeader, final RemotingCommand
response) {
if
(!PermName.isWriteable(this.brokerController.getBrokerConfig().getBrokerPermission())
&&
this.brokerController.getTopicConfigManager().isOrderTopic(requestHeader.getTopic()))
{
@@ -188,11 +187,11 @@ public abstract class AbstractSendMessageProcessor
implements RequestProcessor {
}
}
- log.warn("the topic {} not exist, producer: {}",
requestHeader.getTopic(), ctx.channel().remoteAddress());
+ log.warn("the topic {} not exist, producer: {}",
requestHeader.getTopic(), remoteAddress);
topicConfig =
this.brokerController.getTopicConfigManager().createTopicInSendMessageMethod(
requestHeader.getTopic(),
requestHeader.getDefaultTopic(),
- RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
+ remoteAddress,
requestHeader.getDefaultTopicQueueNums(), topicSysFlag);
if (null == topicConfig) {
@@ -218,7 +217,7 @@ public abstract class AbstractSendMessageProcessor
implements RequestProcessor {
String errorInfo = String.format("request queueId[%d] is illegal,
%s Producer: %s",
queueIdInt,
topicConfig.toString(),
- RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
+ remoteAddress);
log.warn(errorInfo);
response.setCode(ResponseCode.SYSTEM_ERROR);
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/processor/ClientManageProcessor.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/ClientManageProcessor.java
index e7a1e73..e5a95e9 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/processor/ClientManageProcessor.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/processor/ClientManageProcessor.java
@@ -207,9 +207,8 @@ public class ClientManageProcessor implements
RequestProcessor {
private RemotingCommand createRetryTopic(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
RemotingCommand response = RemotingCommand.createResponseCommand(null);
- final CreateRetryTopicRequestHeader requestHeader =
- (CreateRetryTopicRequestHeader) request
-
.decodeCommandCustomHeader(CreateRetryTopicRequestHeader.class);
+ final CreateRetryTopicRequestHeader requestHeader =
(CreateRetryTopicRequestHeader) request
+ .decodeCommandCustomHeader(CreateRetryTopicRequestHeader.class);
if (requestHeader.getGroupName() != null) {
SubscriptionGroupConfig subscriptionGroupConfig =
this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getGroupName());
@@ -217,7 +216,8 @@ public class ClientManageProcessor implements
RequestProcessor {
createRetryTopic(false, requestHeader.getGroupName(),
subscriptionGroupConfig.getRetryQueueNums());
}
}
-
+ response.setCode(ResponseCode.SUCCESS);
+ response.setRemark(null);
return response;
}
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
index 3557a17..1408cce 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
@@ -46,6 +46,7 @@ import org.apache.rocketmq.common.sysflag.MessageSysFlag;
import org.apache.rocketmq.common.sysflag.TopicSysFlag;
import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.RequestProcessor;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.netty.NettyChannelHandlerContextImpl;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
@@ -71,7 +72,7 @@ public class SendMessageProcessor extends
AbstractSendMessageProcessor implement
SendMessageContext mqtraceContext;
switch (request.getCode()) {
case RequestCode.CONSUMER_SEND_MSG_BACK:
- return this.consumerSendMsgBack(ctx, request);
+ return this.consumerSendMsgBack(request);
default:
SendMessageRequestHeader requestHeader =
parseRequestHeader(request);
if (requestHeader == null) {
@@ -82,10 +83,13 @@ public class SendMessageProcessor extends
AbstractSendMessageProcessor implement
this.executeSendMessageHookBefore(ctx, request,
mqtraceContext);
RemotingCommand response;
+// String remoteAddress =
RemotingHelper.parseChannelRemoteAddr(ctx.channel());
+ SocketAddress bornHost = ctx.channel().remoteAddress();
+
if (requestHeader.isBatch()) {
- response = this.sendBatchMessage(ctx, request,
mqtraceContext, requestHeader);
+ response = this.sendBatchMessage(bornHost, request,
mqtraceContext, requestHeader);
} else {
- response = this.sendMessage(ctx, request, mqtraceContext,
requestHeader);
+ response = this.sendMessage(bornHost, request,
mqtraceContext, requestHeader);
}
this.executeSendMessageHookAfter(response, mqtraceContext);
@@ -99,7 +103,7 @@ public class SendMessageProcessor extends
AbstractSendMessageProcessor implement
this.brokerController.getMessageStore().isTransientStorePoolDeficient();
}
- private RemotingCommand consumerSendMsgBack(final ChannelHandlerContext
ctx, final RemotingCommand request)
+ private RemotingCommand consumerSendMsgBack(final RemotingCommand request)
throws RemotingCommandException {
final RemotingCommand response =
RemotingCommand.createResponseCommand(null);
final ConsumerSendMsgBackRequestHeader requestHeader =
@@ -296,7 +300,7 @@ public class SendMessageProcessor extends
AbstractSendMessageProcessor implement
return true;
}
- private RemotingCommand sendMessage(final ChannelHandlerContext ctx,
+ private RemotingCommand sendMessage(final SocketAddress remoteAddress,
final RemotingCommand request,
final SendMessageContext sendMessageContext,
final SendMessageRequestHeader requestHeader) throws
RemotingCommandException {
@@ -319,7 +323,7 @@ public class SendMessageProcessor extends
AbstractSendMessageProcessor implement
}
response.setCode(-1);
- super.msgCheck(ctx, requestHeader, response);
+ super.msgCheck(RemotingHelper.parseChannelRemoteAddr(remoteAddress),
requestHeader, response);
if (response.getCode() != -1) {
return response;
}
@@ -346,9 +350,12 @@ public class SendMessageProcessor extends
AbstractSendMessageProcessor implement
MessageAccessor.setProperties(msgInner,
MessageDecoder.string2messageProperties(requestHeader.getProperties()));
msgInner.setPropertiesString(requestHeader.getProperties());
msgInner.setBornTimestamp(requestHeader.getBornTimestamp());
- msgInner.setBornHost(ctx.channel().remoteAddress());
+ msgInner.setBornHost(requestHeader.getBornHost());
msgInner.setStoreHost(this.getStoreHost());
msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ?
0 : requestHeader.getReconsumeTimes());
+ msgInner.setBornHost(remoteAddress);
+// ByteBuffer hostHolder = ByteBuffer.allocate(8);
+// String bornHost = msgInner.getStoreHostBytes(hostHolder).toString();
PutMessageResult putMessageResult = null;
Map<String, String> oriProps =
MessageDecoder.string2messageProperties(requestHeader.getProperties());
String traFlag =
oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
@@ -365,14 +372,14 @@ public class SendMessageProcessor extends
AbstractSendMessageProcessor implement
putMessageResult =
this.brokerController.getMessageStore().putMessage(msgInner);
}
- return handlePutMessageResult(putMessageResult, response, request,
msgInner, responseHeader, sendMessageContext, ctx, queueIdInt);
+ return handlePutMessageResult(putMessageResult, response, request,
msgInner, responseHeader, sendMessageContext, queueIdInt, remoteAddress);
}
private RemotingCommand handlePutMessageResult(PutMessageResult
putMessageResult, RemotingCommand response,
RemotingCommand request, MessageExt msg,
- SendMessageResponseHeader responseHeader, SendMessageContext
sendMessageContext, ChannelHandlerContext ctx,
- int queueIdInt) {
+ SendMessageResponseHeader responseHeader, SendMessageContext
sendMessageContext,
+ int queueIdInt, SocketAddress bornHost) {
if (putMessageResult == null) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("store putMessage return null");
@@ -445,8 +452,8 @@ public class SendMessageProcessor extends
AbstractSendMessageProcessor implement
responseHeader.setCommitLogOffset(putMessageResult.getAppendMessageResult().getWroteOffset());
responseHeader.setStoreTimestamp(putMessageResult.getAppendMessageResult().getStoreTimestamp());
responseHeader.setStoreSize(putMessageResult.getAppendMessageResult().getWroteBytes());
-
responseHeader.setStoreHost(ctx.channel().localAddress().toString());
- doResponse(ctx, request, response);
+
responseHeader.setStoreHost(RemotingHelper.parseChannelRemoteAddr(bornHost));
+// doResponse(ctx, request, response);
if (hasSendMessageHook()) {
sendMessageContext.setMsgId(responseHeader.getMsgId());
@@ -462,6 +469,10 @@ public class SendMessageProcessor extends
AbstractSendMessageProcessor implement
sendMessageContext.setCommercialSendSize(wroteSize);
sendMessageContext.setCommercialOwner(owner);
}
+ if (!request.isOnewayRPC()) {
+ response.setCustomHeader(responseHeader);
+ return response;
+ }
return null;
} else {
if (hasSendMessageHook()) {
@@ -477,7 +488,7 @@ public class SendMessageProcessor extends
AbstractSendMessageProcessor implement
return response;
}
- private RemotingCommand sendBatchMessage(final ChannelHandlerContext ctx,
+ private RemotingCommand sendBatchMessage(final SocketAddress remoteAddress,
final RemotingCommand request,
final SendMessageContext sendMessageContext,
final SendMessageRequestHeader requestHeader) throws
RemotingCommandException {
@@ -500,7 +511,7 @@ public class SendMessageProcessor extends
AbstractSendMessageProcessor implement
}
response.setCode(-1);
- super.msgCheck(ctx, requestHeader, response);
+ super.msgCheck(RemotingHelper.parseChannelRemoteAddr(remoteAddress),
requestHeader, response);
if (response.getCode() != -1) {
return response;
}
@@ -537,13 +548,17 @@ public class SendMessageProcessor extends
AbstractSendMessageProcessor implement
MessageAccessor.setProperties(messageExtBatch,
MessageDecoder.string2messageProperties(requestHeader.getProperties()));
messageExtBatch.setBody(request.getBody());
messageExtBatch.setBornTimestamp(requestHeader.getBornTimestamp());
- messageExtBatch.setBornHost(ctx.channel().remoteAddress());
+ messageExtBatch.setBornHost(requestHeader.getBornHost());
messageExtBatch.setStoreHost(this.getStoreHost());
+
messageExtBatch.setReconsumeTimes(requestHeader.getReconsumeTimes() ==
null ? 0 : requestHeader.getReconsumeTimes());
+// ByteBuffer hostHolder = ByteBuffer.allocate(8);
+// String storeHost =
messageExtBatch.getStoreHostBytes(hostHolder).toString();
+
PutMessageResult putMessageResult =
this.brokerController.getMessageStore().putMessages(messageExtBatch);
- return handlePutMessageResult(putMessageResult, response, request,
messageExtBatch, responseHeader, sendMessageContext, ctx, queueIdInt);
+ return handlePutMessageResult(putMessageResult, response, request,
messageExtBatch, responseHeader, sendMessageContext, queueIdInt, storeHost);
}
public boolean hasConsumeMessageHook() {
diff --git a/common/src/main/java/org/apache/rocketmq/common/SnodeConfig.java
b/common/src/main/java/org/apache/rocketmq/common/SnodeConfig.java
index abe1a57..dd38075 100644
--- a/common/src/main/java/org/apache/rocketmq/common/SnodeConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/SnodeConfig.java
@@ -22,6 +22,8 @@ import org.apache.rocketmq.common.annotation.ImportantField;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.ClientConfig;
+import org.apache.rocketmq.remoting.ServerConfig;
import org.apache.rocketmq.remoting.common.RemotingUtil;
public class SnodeConfig {
@@ -32,6 +34,10 @@ public class SnodeConfig {
private String rocketmqHome =
System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY,
System.getenv(MixAll.ROCKETMQ_HOME_ENV));
+ private ServerConfig nettyServerConfig;
+
+ private ClientConfig nettyClientConfig;
+
@ImportantField
private String namesrvAddr =
System.getProperty(MixAll.NAMESRV_ADDR_PROPERTY,
System.getenv(MixAll.NAMESRV_ADDR_ENV));
@@ -106,6 +112,9 @@ public class SnodeConfig {
@ImportantField
private boolean aclEnable = false;
+ @ImportantField
+ private boolean embeddedModeEnable = true;
+
public void setSnodeHeartBeatInterval(long snodeHeartBeatInterval) {
this.snodeHeartBeatInterval = snodeHeartBeatInterval;
}
@@ -410,4 +419,27 @@ public class SnodeConfig {
this.loadOffsetInterval = loadOffsetInterval;
}
+ public boolean isEmbeddedModeEnable() {
+ return embeddedModeEnable;
+ }
+
+ public void setEmbeddedModeEnable(boolean embeddedModeEnable) {
+ this.embeddedModeEnable = embeddedModeEnable;
+ }
+
+ public ServerConfig getNettyServerConfig() {
+ return nettyServerConfig;
+ }
+
+ public void setNettyServerConfig(ServerConfig nettyServerConfig) {
+ this.nettyServerConfig = nettyServerConfig;
+ }
+
+ public ClientConfig getNettyClientConfig() {
+ return nettyClientConfig;
+ }
+
+ public void setNettyClientConfig(ClientConfig nettyClientConfig) {
+ this.nettyClientConfig = nettyClientConfig;
+ }
}
diff --git a/snode/pom.xml b/snode/pom.xml
index bb28f64..da1e3f3 100644
--- a/snode/pom.xml
+++ b/snode/pom.xml
@@ -15,7 +15,8 @@
limitations under the License.
-->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
@@ -92,6 +93,10 @@
<groupId>io.prometheus</groupId>
<artifactId>simpleclient_hotspot</artifactId>
</dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>rocketmq-broker</artifactId>
+ </dependency>
</dependencies>
<build>
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java
b/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java
index 6523d8b..5ca8a19 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java
@@ -23,6 +23,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.acl.AccessValidator;
+import org.apache.rocketmq.broker.BrokerStartup;
import org.apache.rocketmq.common.SnodeConfig;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.constant.LoggerName;
@@ -69,10 +70,11 @@ import org.apache.rocketmq.snode.service.PushService;
import org.apache.rocketmq.snode.service.ScheduledService;
import org.apache.rocketmq.snode.service.WillMessageService;
import org.apache.rocketmq.snode.service.impl.ClientServiceImpl;
-import org.apache.rocketmq.snode.service.impl.EnodeServiceImpl;
+import org.apache.rocketmq.snode.service.impl.LocalEnodeServiceImpl;
import org.apache.rocketmq.snode.service.impl.MetricsServiceImpl;
import org.apache.rocketmq.snode.service.impl.NnodeServiceImpl;
import org.apache.rocketmq.snode.service.impl.PushServiceImpl;
+import org.apache.rocketmq.snode.service.impl.RemoteEnodeServiceImpl;
import org.apache.rocketmq.snode.service.impl.ScheduledServiceImpl;
import org.apache.rocketmq.snode.service.impl.WillMessageServiceImpl;
@@ -127,7 +129,11 @@ public class SnodeController {
this.nettyClientConfig = nettyClientConfig;
this.nettyServerConfig = nettyServerConfig;
this.snodeConfig = snodeConfig;
- this.enodeService = new EnodeServiceImpl(this);
+ if (!this.snodeConfig.isEmbeddedModeEnable()) {
+ this.enodeService = new RemoteEnodeServiceImpl(this);
+ } else {
+ this.enodeService = new
LocalEnodeServiceImpl(BrokerStartup.getBrokerController());
+ }
this.nnodeService = new NnodeServiceImpl(this);
this.scheduledService = new ScheduledServiceImpl(this);
this.remotingClient =
RemotingClientFactory.getInstance().createRemotingClient()
@@ -163,7 +169,6 @@ public class SnodeController {
"SnodeHeartbeatThread",
true);
-
this.consumerManageExecutor = ThreadUtils.newThreadPoolExecutor(
snodeConfig.getSnodeSendMessageMinPoolSize(),
snodeConfig.getSnodeSendMessageMaxPoolSize(),
@@ -228,10 +233,8 @@ public class SnodeController {
}
public boolean initialize() {
- this.snodeServer =
RemotingServerFactory.getInstance().createRemotingServer()
- .init(this.nettyServerConfig, this.clientHousekeepingService);
- this.mqttRemotingServer =
RemotingServerFactory.getInstance().createRemotingServer(
- RemotingUtil.MQTT_PROTOCOL)
+ this.snodeServer =
RemotingServerFactory.getInstance().createRemotingServer().init(this.nettyServerConfig,
this.clientHousekeepingService);
+ this.mqttRemotingServer =
RemotingServerFactory.getInstance().createRemotingServer(RemotingUtil.MQTT_PROTOCOL)
.init(this.nettyServerConfig, this.clientHousekeepingService);
this.registerProcessor();
initSnodeInterceptorGroup();
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/SnodeStartup.java
b/snode/src/main/java/org/apache/rocketmq/snode/SnodeStartup.java
index 232def9..23ea68a 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/SnodeStartup.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/SnodeStartup.java
@@ -16,8 +16,6 @@
*/
package org.apache.rocketmq.snode;
-import static org.apache.rocketmq.remoting.netty.TlsSystemConfig.TLS_ENABLE;
-
import ch.qos.logback.classic.LoggerContext;
import ch.qos.logback.classic.joran.JoranConfigurator;
import ch.qos.logback.core.joran.spi.JoranException;
@@ -31,6 +29,7 @@ import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
+import org.apache.rocketmq.broker.BrokerStartup;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.SnodeConfig;
import org.apache.rocketmq.common.constant.LoggerName;
@@ -44,6 +43,8 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.srvutil.ServerUtil;
import org.slf4j.LoggerFactory;
+import static org.apache.rocketmq.remoting.netty.TlsSystemConfig.TLS_ENABLE;
+
public class SnodeStartup {
private static InternalLogger log;
public static Properties properties = null;
@@ -51,13 +52,17 @@ public class SnodeStartup {
public static String configFile = null;
public static void main(String[] args) throws IOException, JoranException {
- startup(createSnodeController(args));
+ SnodeConfig snodeConfig = loadConfig(args);
+ if (snodeConfig.isEmbeddedModeEnable()) {
+ BrokerStartup.start(BrokerStartup.createBrokerController(args));
+ }
+ SnodeController snodeController = createSnodeController(snodeConfig);
+ startup(snodeController);
}
public static SnodeController startup(SnodeController controller) {
try {
controller.start();
-
String tip = "The snode[" +
controller.getSnodeConfig().getSnodeName() + ", "
+ controller.getSnodeConfig().getSnodeIP1() + "] boot success.
serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
@@ -74,7 +79,7 @@ public class SnodeStartup {
return null;
}
- public static SnodeController createSnodeController(String[] args) throws
IOException, JoranException {
+ public static SnodeConfig loadConfig(String[] args) throws IOException {
Options options = ServerUtil.buildCommandlineOptions(new Options());
commandLine = ServerUtil.parseCmdLine("snode", args,
buildCommandlineOptions(options),
new PosixParser());
@@ -82,12 +87,11 @@ public class SnodeStartup {
System.exit(-1);
}
- final SnodeConfig snodeConfig = new SnodeConfig();
+ SnodeConfig snodeConfig = new SnodeConfig();
final ServerConfig nettyServerConfig = new ServerConfig();
final ClientConfig nettyClientConfig = new ClientConfig();
nettyServerConfig.setListenPort(snodeConfig.getListenPort());
- nettyServerConfig.setListenPort(11911);
nettyClientConfig.setUseTLS(Boolean.parseBoolean(System.getProperty(TLS_ENABLE,
String.valueOf(TlsSystemConfig.tlsMode == TlsMode.ENFORCING))));
@@ -101,27 +105,28 @@ public class SnodeStartup {
MixAll.properties2Object(properties, snodeConfig);
MixAll.properties2Object(properties, nettyServerConfig);
MixAll.properties2Object(properties, nettyClientConfig);
+
in.close();
}
}
+ snodeConfig.setNettyServerConfig(nettyServerConfig);
+ snodeConfig.setNettyClientConfig(nettyClientConfig);
if (null == snodeConfig.getRocketmqHome()) {
System.out.printf("Please set the %s variable in your environment
to match the location of the RocketMQ installation", MixAll.ROCKETMQ_HOME_ENV);
System.exit(-2);
}
- LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
- JoranConfigurator configurator = new JoranConfigurator();
- configurator.setContext(lc);
- lc.reset();
- configurator.doConfigure(snodeConfig.getRocketmqHome() +
"/conf/logback_snode.xml");
- log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME);
-
MixAll.printObjectProperties(log, snodeConfig);
- MixAll.printObjectProperties(log, nettyClientConfig);
- MixAll.printObjectProperties(log, nettyServerConfig);
+ MixAll.printObjectProperties(log, snodeConfig.getNettyServerConfig());
+ MixAll.printObjectProperties(log, snodeConfig.getNettyClientConfig());
+ return snodeConfig;
+ }
+
+ public static SnodeController createSnodeController(SnodeConfig
snodeConfig) throws JoranException {
+
final SnodeController snodeController = new SnodeController(
- nettyServerConfig,
- nettyClientConfig,
+ snodeConfig.getNettyServerConfig(),
+ snodeConfig.getNettyClientConfig(),
snodeConfig);
boolean initResult = snodeController.initialize();
@@ -148,7 +153,14 @@ public class SnodeStartup {
}
}
}
- },"ShutdownHook"));
+ }, "ShutdownHook"));
+ LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
+ JoranConfigurator configurator = new JoranConfigurator();
+ configurator.setContext(lc);
+ lc.reset();
+ configurator.doConfigure(snodeConfig.getRocketmqHome() +
"/conf/logback_snode.xml");
+ log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME);
+
return snodeController;
}
@@ -164,7 +176,7 @@ public class SnodeStartup {
opt = new Option("m", "printImportantConfig", false, "Print important
config item");
opt.setRequired(false);
options.addOption(opt);
-
+
return options;
}
}
diff --git
a/snode/src/main/java/org/apache/rocketmq/snode/client/SubscriptionGroupManager.java
b/snode/src/main/java/org/apache/rocketmq/snode/client/SubscriptionGroupManager.java
index ab013b8..47f10c9 100644
---
a/snode/src/main/java/org/apache/rocketmq/snode/client/SubscriptionGroupManager.java
+++
b/snode/src/main/java/org/apache/rocketmq/snode/client/SubscriptionGroupManager.java
@@ -42,19 +42,6 @@ public class SubscriptionGroupManager {
private void init() {
}
- public void updateSubscriptionGroupConfig(final SubscriptionGroupConfig
config) {
- SubscriptionGroupConfig old =
this.subscriptionGroupTable.put(config.getGroupName(), config);
- if (old != null) {
- log.info("Update subscription group config, old: {} new: {}", old,
config);
- } else {
- log.info("Create new subscription group, {}", config);
- }
-
- this.dataVersion.nextVersion();
-
- this.persistSubscription(config);
- }
-
public void disableConsume(final String groupName) {
SubscriptionGroupConfig old =
this.subscriptionGroupTable.get(groupName);
if (old != null) {
@@ -63,7 +50,8 @@ public class SubscriptionGroupManager {
}
}
- public SubscriptionGroupConfig findSubscriptionGroupConfig(final String
group) {
+ public SubscriptionGroupConfig findSubscriptionGroupConfig(
+ final String group) {
SubscriptionGroupConfig subscriptionGroupConfig =
this.subscriptionGroupTable.get(group);
if (null == subscriptionGroupConfig) {
if
(snodeController.getSnodeConfig().isAutoCreateSubscriptionGroup() ||
MixAll.isSysConsumerGroup(group)) {
@@ -74,15 +62,13 @@ public class SubscriptionGroupManager {
log.info("Auto create a subscription group, {}",
subscriptionGroupConfig.toString());
}
this.dataVersion.nextVersion();
- this.persistSubscription(subscriptionGroupConfig);
+
this.snodeController.getEnodeService().persistSubscriptionGroupConfig(subscriptionGroupConfig);
}
}
return subscriptionGroupConfig;
}
-
-
public ConcurrentMap<String, SubscriptionGroupConfig>
getSubscriptionGroupTable() {
return subscriptionGroupTable;
}
@@ -91,18 +77,4 @@ public class SubscriptionGroupManager {
return dataVersion;
}
- public void deleteSubscriptionGroupConfig(final String groupName) {
- SubscriptionGroupConfig old =
this.subscriptionGroupTable.remove(groupName);
- if (old != null) {
- log.info("delete subscription group OK, subscription group:{}",
old);
- this.dataVersion.nextVersion();
- this.persistSubscription(old);
- } else {
- log.warn("delete subscription group failed, subscription
groupName: {} not exist", groupName);
- }
- }
-
- void persistSubscription(SubscriptionGroupConfig config) {
-
this.snodeController.getEnodeService().persistSubscriptionGroupConfig(config);
- }
}
diff --git
a/snode/src/main/java/org/apache/rocketmq/snode/client/SubscriptionManager.java
b/snode/src/main/java/org/apache/rocketmq/snode/client/SubscriptionManager.java
index a164fc4..a648ba8 100644
---
a/snode/src/main/java/org/apache/rocketmq/snode/client/SubscriptionManager.java
+++
b/snode/src/main/java/org/apache/rocketmq/snode/client/SubscriptionManager.java
@@ -29,11 +29,6 @@ public interface SubscriptionManager {
boolean subscribe(String groupId, Set<SubscriptionData>
subscriptionDataSet, ConsumeType consumeType,
MessageModel messageModel, ConsumeFromWhere consumeFromWhere);
- void unSubscribe(String groupId, RemotingChannel remotingChannel,
- Set<SubscriptionData> subscriptionDataSet);
-
- void cleanSubscription(String groupId, String topic);
-
Subscription getSubscription(String groupId);
void registerPushSession(Set<SubscriptionData> subscriptionDataSet,
RemotingChannel remotingChannel,
diff --git
a/snode/src/main/java/org/apache/rocketmq/snode/client/impl/SubscriptionManagerImpl.java
b/snode/src/main/java/org/apache/rocketmq/snode/client/impl/SubscriptionManagerImpl.java
index 1f5ecd8..4a4a35e 100644
---
a/snode/src/main/java/org/apache/rocketmq/snode/client/impl/SubscriptionManagerImpl.java
+++
b/snode/src/main/java/org/apache/rocketmq/snode/client/impl/SubscriptionManagerImpl.java
@@ -44,7 +44,7 @@ public class SubscriptionManagerImpl implements
SubscriptionManager {
@Override
public void registerPushSession(Set<SubscriptionData> subscriptionDataSet,
RemotingChannel remotingChannel,
String groupId) {
- log.debug("Before ConsumerGroup: {} RemotingChannel: {} subscription:
{}", groupId, remotingChannel.remoteAddress(), subscriptionDataSet);
+ log.info("Before ConsumerGroup: {} RemotingChannel: {} subscription:
{}", groupId, remotingChannel.remoteAddress(), subscriptionDataSet);
Set<MessageQueue> prevSubSet =
this.clientSubscriptionTable.get(remotingChannel);
Set<MessageQueue> keySet = new HashSet<>();
for (SubscriptionData subscriptionData : subscriptionDataSet) {
@@ -77,7 +77,7 @@ public class SubscriptionManagerImpl implements
SubscriptionManager {
}
}
}
- log.debug("After ConsumerGroup: {} RemotingChannel: {} subscription:
{}", groupId, remotingChannel.remoteAddress(),
this.clientSubscriptionTable.get(remotingChannel));
+ log.info("After ConsumerGroup: {} RemotingChannel: {} subscription:
{}", groupId, remotingChannel.remoteAddress(),
this.clientSubscriptionTable.get(remotingChannel));
}
@Override
@@ -192,17 +192,6 @@ public class SubscriptionManagerImpl implements
SubscriptionManager {
}
@Override
- public void unSubscribe(String groupId, RemotingChannel remotingChannel,
- Set<SubscriptionData> subscriptionDataSet) {
-
- }
-
- @Override
- public void cleanSubscription(String groupId, String topic) {
-
- }
-
- @Override
public Subscription getSubscription(String groupId) {
return groupSubscriptionTable.get(groupId);
}
diff --git
a/snode/src/main/java/org/apache/rocketmq/snode/offset/ConsumerOffsetManager.java
b/snode/src/main/java/org/apache/rocketmq/snode/offset/ConsumerOffsetManager.java
index d9b23b4..ba2a800 100644
---
a/snode/src/main/java/org/apache/rocketmq/snode/offset/ConsumerOffsetManager.java
+++
b/snode/src/main/java/org/apache/rocketmq/snode/offset/ConsumerOffsetManager.java
@@ -20,13 +20,9 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.protocol.ResponseCode;
-import
org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetResponseHeader;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
-import org.apache.rocketmq.remoting.exception.RemotingConnectException;
-import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
-import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
-import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.snode.SnodeController;
import org.apache.rocketmq.snode.exception.SnodeException;
@@ -79,17 +75,18 @@ public class ConsumerOffsetManager {
}
}
- private long parserOffset(final String enodeName, final String group,
final String topic, final int queueId) {
- try {
- RemotingCommand remotingCommand = queryOffset(enodeName, group,
topic, queueId);
- QueryConsumerOffsetResponseHeader responseHeader =
- (QueryConsumerOffsetResponseHeader)
remotingCommand.decodeCommandCustomHeader(QueryConsumerOffsetResponseHeader.class);
- return responseHeader.getOffset();
- } catch (Exception ex) {
- log.error("Load offset from broker error", ex);
- }
- return -1;
- }
+// private long parserOffset(final RemotingChannel remotingChannel, final
String enodeName, final String group,
+// final String topic, final int queueId) {
+// try {
+// RemotingCommand remotingCommand = queryOffset(remotingChannel,
enodeName, group, topic, queueId);
+// QueryConsumerOffsetResponseHeader responseHeader =
+// (QueryConsumerOffsetResponseHeader)
remotingCommand.decodeCommandCustomHeader(QueryConsumerOffsetResponseHeader.class);
+// return responseHeader.getOffset();
+// } catch (Exception ex) {
+// log.error("Load offset from broker error", ex);
+// }
+// return -1;
+// }
public long queryCacheOffset(final String enodeName, final String group,
final String topic, final int queueId) {
String key = buildKey(enodeName, topic, group);
@@ -99,30 +96,37 @@ public class ConsumerOffsetManager {
map = this.offsetTable.putIfAbsent(key, map);
}
CacheOffset cacheOffset = map.get(queueId);
- if (cacheOffset != null) {
- if (System.currentTimeMillis() - cacheOffset.getUpdateTimestamp()
> snodeController.getSnodeConfig().getLoadOffsetInterval()) {
- cacheOffset.setOffset(parserOffset(enodeName, group, topic,
queueId));
- cacheOffset.setUpdateTimestamp(System.currentTimeMillis());
+ try {
+ if (cacheOffset != null) {
+ if
(!this.snodeController.getSnodeConfig().isEmbeddedModeEnable() &&
System.currentTimeMillis() - cacheOffset.getUpdateTimestamp() >
snodeController.getSnodeConfig().getLoadOffsetInterval()) {
+ long offset =
this.snodeController.getEnodeService().queryOffset(enodeName, group, topic,
queueId);
+ cacheOffset.setOffset(offset);
+ cacheOffset.setUpdateTimestamp(System.currentTimeMillis());
+ } else {
+ long offset =
this.snodeController.getEnodeService().queryOffset(enodeName, group, topic,
queueId);
+ cacheOffset.setOffset(offset);
+ }
+ } else {
+ long offset =
this.snodeController.getEnodeService().queryOffset(enodeName, group, topic,
queueId);
+ cacheOffset = new CacheOffset(key, offset,
System.currentTimeMillis());
+ map.put(queueId, cacheOffset);
}
- } else {
- cacheOffset = new CacheOffset(key, parserOffset(enodeName, group,
topic, queueId), System.currentTimeMillis());
- map.put(queueId, cacheOffset);
+ } catch (Exception ex) {
+ log.warn("Load offset error, enodeName: {}, group:{},topic:{}
queueId:{}", enodeName, group, topic, queueId, ex);
+ if (cacheOffset == null) {
+ // No cached entry to fall back on: return -1 (as the removed parserOffset() did on failure) instead of hitting an NPE below.
+ return -1;
+ }
}
return cacheOffset.getOffset();
-
}
- public void commitOffset(final String enodeName, final String clientHost,
final String group, final String topic,
+ public void commitOffset(final RemotingChannel remotingChannel, final
String enodeName, final String clientHost,
+ final String group, final String topic,
final int queueId,
final long offset) {
cacheOffset(enodeName, clientHost, group, topic, queueId, offset);
- this.snodeController.getEnodeService().persistOffset(enodeName, group,
topic, queueId, offset);
- }
-
- public RemotingCommand queryOffset(final String enodeName, final String
group, final String topic,
- final int queueId) throws InterruptedException,
RemotingTimeoutException,
- RemotingSendRequestException, RemotingConnectException {
- return this.snodeController.getEnodeService().loadOffset(enodeName,
group, topic, queueId);
+ this.snodeController.getEnodeService().persistOffset(remotingChannel,
enodeName, group, topic, queueId, offset);
}
public class CacheOffset {
diff --git
a/snode/src/main/java/org/apache/rocketmq/snode/processor/ConsumerManageProcessor.java
b/snode/src/main/java/org/apache/rocketmq/snode/processor/ConsumerManageProcessor.java
index a000589..1953de1 100644
---
a/snode/src/main/java/org/apache/rocketmq/snode/processor/ConsumerManageProcessor.java
+++
b/snode/src/main/java/org/apache/rocketmq/snode/processor/ConsumerManageProcessor.java
@@ -25,9 +25,13 @@ import
org.apache.rocketmq.common.protocol.header.GetConsumerListByGroupRequestH
import
org.apache.rocketmq.common.protocol.header.GetConsumerListByGroupResponseBody;
import
org.apache.rocketmq.common.protocol.header.GetConsumerListByGroupResponseHeader;
import org.apache.rocketmq.common.protocol.header.GetMaxOffsetRequestHeader;
+import org.apache.rocketmq.common.protocol.header.GetMaxOffsetResponseHeader;
import org.apache.rocketmq.common.protocol.header.GetMinOffsetRequestHeader;
+import org.apache.rocketmq.common.protocol.header.GetMinOffsetResponseHeader;
import
org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetRequestHeader;
+import
org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetResponseHeader;
import org.apache.rocketmq.common.protocol.header.SearchOffsetRequestHeader;
+import org.apache.rocketmq.common.protocol.header.SearchOffsetResponseHeader;
import
org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader;
import
org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetResponseHeader;
import org.apache.rocketmq.logging.InternalLogger;
@@ -87,7 +91,16 @@ public class ConsumerManageProcessor implements
RequestProcessor {
(SearchOffsetRequestHeader) request
.decodeCommandCustomHeader(SearchOffsetRequestHeader.class);
try {
- return
this.snodeController.getEnodeService().getOffsetByTimestamp(requestHeader.getEnodeName(),
request);
+ final RemotingCommand response =
RemotingCommand.createResponseCommand(SearchOffsetResponseHeader.class);
+ final SearchOffsetResponseHeader responseHeader =
(SearchOffsetResponseHeader) response.readCustomHeader();
+
+ long offset =
this.snodeController.getEnodeService().getOffsetByTimestamp(requestHeader.getEnodeName(),
+ requestHeader.getTopic(), requestHeader.getQueueId(),
requestHeader.getTimestamp(), request);
+ responseHeader.setOffset(offset);
+ response.setCode(ResponseCode.SUCCESS);
+ response.setRemark(null);
+ // Return the populated response; it is scoped to this try block, so without this the result is never delivered.
+ return response;
} catch (Exception ex) {
log.error("Search offset by timestamp error:{}", ex);
}
@@ -100,7 +111,16 @@ public class ConsumerManageProcessor implements
RequestProcessor {
(GetMinOffsetRequestHeader) request
.decodeCommandCustomHeader(GetMinOffsetRequestHeader.class);
try {
- return
this.snodeController.getEnodeService().getMinOffsetInQueue(requestHeader.getEnodeName(),
requestHeader.getTopic(), requestHeader.getQueueId());
+ final RemotingCommand response =
RemotingCommand.createResponseCommand(GetMinOffsetResponseHeader.class);
+ final GetMinOffsetResponseHeader responseHeader =
(GetMinOffsetResponseHeader) response.readCustomHeader();
+
+ long offset =
this.snodeController.getEnodeService().getMinOffsetInQueue(requestHeader.getEnodeName(),
+ requestHeader.getTopic(), requestHeader.getQueueId(), request);
+ responseHeader.setOffset(offset);
+ response.setCode(ResponseCode.SUCCESS);
+ response.setRemark(null);
+ // Return the populated response; it is scoped to this try block, so without this the result is never delivered.
+ return response;
} catch (Exception ex) {
log.error("Get min offset error:{}", ex);
}
@@ -113,9 +131,17 @@ public class ConsumerManageProcessor implements
RequestProcessor {
(GetMaxOffsetRequestHeader) request
.decodeCommandCustomHeader(GetMaxOffsetRequestHeader.class);
try {
- return
this.snodeController.getEnodeService().getMaxOffsetInQueue(requestHeader.getEnodeName(),
request);
+ // Fetch the queue's max offset from the enode service and wrap it in a SUCCESS response.
+ long offset =
this.snodeController.getEnodeService().getMaxOffsetInQueue(requestHeader.getEnodeName(),
+ requestHeader.getTopic(), requestHeader.getQueueId(), request);
+ final RemotingCommand response =
RemotingCommand.createResponseCommand(GetMaxOffsetResponseHeader.class);
+ final GetMaxOffsetResponseHeader responseHeader =
(GetMaxOffsetResponseHeader) response.readCustomHeader();
+ responseHeader.setOffset(offset);
+ response.setCode(ResponseCode.SUCCESS);
+ response.setRemark(null);
+ return response;
} catch (Exception ex) {
- log.error("Get min offset error:{}", ex);
+ log.error("Get max offset error, remoting: {} error: {} ",
remotingChannel.remoteAddress(), ex);
}
return null;
}
@@ -153,7 +178,7 @@ public class ConsumerManageProcessor implements
RequestProcessor {
final UpdateConsumerOffsetRequestHeader requestHeader =
(UpdateConsumerOffsetRequestHeader) request
.decodeCommandCustomHeader(UpdateConsumerOffsetRequestHeader.class);
-
this.snodeController.getConsumerOffsetManager().commitOffset(requestHeader.getEnodeName(),
RemotingHelper.parseChannelRemoteAddr(remotingChannel.remoteAddress()),
requestHeader.getConsumerGroup(),
+
this.snodeController.getConsumerOffsetManager().commitOffset(remotingChannel,
requestHeader.getEnodeName(),
RemotingHelper.parseChannelRemoteAddr(remotingChannel.remoteAddress()),
requestHeader.getConsumerGroup(),
requestHeader.getTopic(), requestHeader.getQueueId(),
requestHeader.getCommitOffset());
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
@@ -171,9 +196,15 @@ public class ConsumerManageProcessor implements
RequestProcessor {
requestHeader.getConsumerGroup(),
requestHeader.getTopic(),
requestHeader.getQueueId());
- return
this.snodeController.getConsumerOffsetManager().queryOffset(requestHeader.getEnodeName(),
requestHeader.getConsumerGroup(), requestHeader.getTopic(),
+ long offset =
this.snodeController.getEnodeService().queryOffset(requestHeader.getEnodeName(),
requestHeader.getConsumerGroup(), requestHeader.getTopic(),
requestHeader.getQueueId());
+ final RemotingCommand response =
RemotingCommand.createResponseCommand(QueryConsumerOffsetResponseHeader.class);
+ final QueryConsumerOffsetResponseHeader responseHeader =
(QueryConsumerOffsetResponseHeader) response.readCustomHeader();
+ responseHeader.setOffset(offset);
+ response.setCode(ResponseCode.SUCCESS);
+ response.setRemark(null);
+ return response;
}
public RemotingCommand createRetryTopic(RemotingChannel remotingChannel,
@@ -181,7 +212,7 @@ public class ConsumerManageProcessor implements
RequestProcessor {
RemotingSendRequestException, RemotingConnectException,
RemotingCommandException {
final CreateRetryTopicRequestHeader requestHeader =
(CreateRetryTopicRequestHeader)
request.decodeCommandCustomHeader(CreateRetryTopicRequestHeader.class);
requestHeader.getEnodeName();
- return
this.snodeController.getEnodeService().creatRetryTopic(requestHeader.getEnodeName(),
request);
+ return
this.snodeController.getEnodeService().creatRetryTopic(remotingChannel,
requestHeader.getEnodeName(), request);
}
}
diff --git
a/snode/src/main/java/org/apache/rocketmq/snode/processor/PullMessageProcessor.java
b/snode/src/main/java/org/apache/rocketmq/snode/processor/PullMessageProcessor.java
index fc13f5e..95982bc 100644
---
a/snode/src/main/java/org/apache/rocketmq/snode/processor/PullMessageProcessor.java
+++
b/snode/src/main/java/org/apache/rocketmq/snode/processor/PullMessageProcessor.java
@@ -127,14 +127,16 @@ public class PullMessageProcessor implements
RequestProcessor {
}
}
- CompletableFuture<RemotingCommand> responseFuture =
snodeController.getEnodeService().pullMessage(requestHeader.getEnodeName(),
request);
+ CompletableFuture<RemotingCommand> responseFuture =
snodeController.getEnodeService().pullMessage(remotingChannel,
requestHeader.getEnodeName(), request);
responseFuture.whenComplete((data, ex) -> {
if (ex == null) {
if (this.snodeController.getConsumeMessageInterceptorGroup()
!= null) {
ResponseContext responseContext = new
ResponseContext(request, remotingChannel, data);
this.snodeController.getSendMessageInterceptorGroup().afterRequest(responseContext);
}
- remotingChannel.reply(data);
+ if (data != null) {
+ remotingChannel.reply(data);
+ }
} else {
if (this.snodeController.getConsumeMessageInterceptorGroup()
!= null) {
ExceptionContext exceptionContext = new
ExceptionContext(request, remotingChannel, ex, null);
diff --git
a/snode/src/main/java/org/apache/rocketmq/snode/processor/SendMessageProcessor.java
b/snode/src/main/java/org/apache/rocketmq/snode/processor/SendMessageProcessor.java
index c3fd2fe..019794e 100644
---
a/snode/src/main/java/org/apache/rocketmq/snode/processor/SendMessageProcessor.java
+++
b/snode/src/main/java/org/apache/rocketmq/snode/processor/SendMessageProcessor.java
@@ -83,11 +83,11 @@ public class SendMessageProcessor implements
RequestProcessor {
stringBuffer.append(MixAll.getRetryTopic(consumerSendMsgBackRequestHeader.getGroup()));
}
- CompletableFuture<RemotingCommand> responseFuture =
snodeController.getEnodeService().sendMessage(enodeName, request);
+ CompletableFuture<RemotingCommand> responseFuture =
snodeController.getEnodeService().sendMessage(remotingChannel, enodeName,
request);
sendMessageRequestHeaderV2.setO(remotingChannel.remoteAddress());
final byte[] message = request.getBody();
- final boolean isNeedPush = !isSendBack;
+ final boolean needPush = !isSendBack;
final SendMessageRequestHeader sendMessageRequestHeader =
SendMessageRequestHeaderV2.createSendMessageRequestHeaderV1(sendMessageRequestHeaderV2);
responseFuture.whenComplete((data, ex) -> {
@@ -98,7 +98,7 @@ public class SendMessageProcessor implements RequestProcessor
{
}
remotingChannel.reply(data);
this.snodeController.getMetricsService().recordRequestSize(stringBuffer.toString(),
request.getBody().length);
- if (data.getCode() == ResponseCode.SUCCESS && isNeedPush) {
+ if (data.getCode() == ResponseCode.SUCCESS && needPush) {
this.snodeController.getPushService().pushMessage(sendMessageRequestHeader,
message, data);
}
} else {
diff --git
a/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttPingreqMessageHandler.java
b/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttPingreqMessageHandler.java
index 11f8f18..1f75fcc 100644
---
a/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttPingreqMessageHandler.java
+++
b/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttPingreqMessageHandler.java
@@ -29,6 +29,7 @@ public class MqttPingreqMessageHandler implements
MessageHandler {
public MqttPingreqMessageHandler(SnodeController snodeController) {
this.snodeController = snodeController;
}
+
/**
* handle the PINGREQ message from client
* <ol>
@@ -41,7 +42,8 @@ public class MqttPingreqMessageHandler implements
MessageHandler {
* @param message
* @return
*/
- @Override public RemotingCommand handleMessage(MqttMessage message,
RemotingChannel remotingChannel) {
+ @Override
+ public RemotingCommand handleMessage(MqttMessage message, RemotingChannel
remotingChannel) {
return null;
}
}
diff --git
a/snode/src/main/java/org/apache/rocketmq/snode/service/AdminService.java
b/snode/src/main/java/org/apache/rocketmq/snode/service/AdminService.java
index 560233a..774a2a6 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/service/AdminService.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/service/AdminService.java
@@ -1,4 +1,4 @@
-package org.apache.rocketmq.snode.service;/*
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -14,7 +14,7 @@ package org.apache.rocketmq.snode.service;/*
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
+package org.apache.rocketmq.snode.service;
public interface AdminService {
}
diff --git
a/snode/src/main/java/org/apache/rocketmq/snode/service/EnodeService.java
b/snode/src/main/java/org/apache/rocketmq/snode/service/EnodeService.java
index a4fa28c..e2db16e 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/service/EnodeService.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/service/EnodeService.java
@@ -21,6 +21,7 @@ import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeaderV2;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
+import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
@@ -42,7 +43,8 @@ public interface EnodeService {
* @param request {@link SendMessageRequestHeaderV2} Send message request
header
* @return Send message response future
*/
- CompletableFuture<RemotingCommand> sendMessage(final String enodeName,
final RemotingCommand request);
+ CompletableFuture<RemotingCommand> sendMessage(final RemotingChannel
remotingChannel, final String enodeName,
+ final RemotingCommand request);
/**
* Pull message from enode server.
@@ -51,7 +53,8 @@ public interface EnodeService {
* @param request {@link PullMessageRequestHeader} Pull message request
header
* @return Pull message Response future
*/
- CompletableFuture<RemotingCommand> pullMessage(final String enodeName,
final RemotingCommand request);
+ CompletableFuture<RemotingCommand> pullMessage(final RemotingChannel
remotingChannel, final String enodeName,
+ final RemotingCommand request);
/**
* Create retry topic in enode server.
@@ -64,7 +67,7 @@ public interface EnodeService {
* @throws RemotingSendRequestException
* @throws RemotingConnectException
*/
- RemotingCommand creatRetryTopic(String enodeName,
+ RemotingCommand creatRetryTopic(final RemotingChannel remotingChannel,
String enodeName,
RemotingCommand request) throws InterruptedException,
RemotingTimeoutException, RemotingSendRequestException,
RemotingConnectException;
/**
@@ -98,20 +101,24 @@ public interface EnodeService {
* @param queueId QueueId of related topic.
* @param offset Current offset of target queue of subscribed topic.
*/
- void persistOffset(String enodeName, String groupName, String topic, int
queueId, long offset);
+ void persistOffset(final RemotingChannel remotingChannel, String
enodeName, String groupName, String topic,
+ int queueId, long offset);
- RemotingCommand loadOffset(String enodeName, String consumerGroup, String
topic,
- int queueId) throws InterruptedException, RemotingTimeoutException,
- RemotingSendRequestException, RemotingConnectException;
+ long queryOffset(String enodeName, String consumerGroup,
+ String topic, int queueId) throws InterruptedException,
RemotingTimeoutException,
+ RemotingSendRequestException, RemotingConnectException,
RemotingCommandException;
- RemotingCommand getMaxOffsetInQueue(String enodeName,
- RemotingCommand request) throws InterruptedException,
RemotingTimeoutException,
+ long getMaxOffsetInQueue(String enodeName, String topic,
+ int queueId, RemotingCommand request) throws InterruptedException,
RemotingTimeoutException,
RemotingSendRequestException, RemotingConnectException,
RemotingCommandException;
- RemotingCommand getMinOffsetInQueue(String enodeName, String topic,
- int queueId) throws InterruptedException, RemotingTimeoutException,
+ long getMinOffsetInQueue(String enodeName, String topic,
+ int queueId, RemotingCommand request) throws InterruptedException,
RemotingTimeoutException,
RemotingSendRequestException, RemotingConnectException,
RemotingCommandException;
- RemotingCommand getOffsetByTimestamp(String enodeName,
- RemotingCommand request) throws InterruptedException,
RemotingTimeoutException, RemotingSendRequestException,
RemotingConnectException;
+ long getOffsetByTimestamp(String enodeName,
+ String topic, int queueId,
+ long timestamp,
+ RemotingCommand request) throws InterruptedException,
RemotingTimeoutException, RemotingSendRequestException,
+ RemotingConnectException, RemotingCommandException;
}
diff --git
a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/ClientServiceImpl.java
b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/ClientServiceImpl.java
index f170ae1..aaed690 100644
---
a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/ClientServiceImpl.java
+++
b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/ClientServiceImpl.java
@@ -16,6 +16,7 @@
*/
package org.apache.rocketmq.snode.service.impl;
+
import java.util.List;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.protocol.RequestCode;
diff --git
a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/LocalEnodeServiceImpl.java
b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/LocalEnodeServiceImpl.java
new file mode 100644
index 0000000..5789104
--- /dev/null
+++
b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/LocalEnodeServiceImpl.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.snode.service.impl;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.RemotingChannel;
+import org.apache.rocketmq.remoting.netty.CodecHelper;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.snode.service.EnodeService;
+
+public class LocalEnodeServiceImpl implements EnodeService {
+
+ private static final InternalLogger log =
InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME);
+
+ private BrokerController brokerController;
+
+ public LocalEnodeServiceImpl(BrokerController brokerController) {
+ this.brokerController = brokerController;
+ }
+
+ @Override public void sendHeartbeat(RemotingCommand remotingCommand) {
+ return;
+ }
+
+ @Override
+ public CompletableFuture<RemotingCommand> sendMessage(final
RemotingChannel remotingChannel, String enodeName,
+ RemotingCommand request) {
+ CompletableFuture<RemotingCommand> completableFuture = new
CompletableFuture<>();
+ try {
+ log.debug("Send message request:{}", request);
+ RemotingCommand remotingCommand =
this.brokerController.getSendProcessor().processRequest(remotingChannel,
request);
+ CodecHelper.encodeHeader(remotingCommand);
+ completableFuture.complete(remotingCommand);
+ } catch (Exception ex) {
+ log.error("[Local]Request local enode send message error", ex);
+ completableFuture.completeExceptionally(ex);
+ }
+ return completableFuture;
+ }
+
+ @Override
+ public CompletableFuture<RemotingCommand> pullMessage(RemotingChannel
remotingChannel, String enodeName,
+ RemotingCommand request) {
+ CompletableFuture<RemotingCommand> completableFuture = new
CompletableFuture<>();
+ try {
+ RemotingCommand response =
this.brokerController.getSnodePullMessageProcessor().processRequest(remotingChannel,
request);
+ completableFuture.complete(response);
+ } catch (Exception ex) {
+ log.error("[Local]Request local enode pull message error", ex);
+ completableFuture.completeExceptionally(ex);
+ }
+ return completableFuture;
+ }
+
+ @Override public RemotingCommand creatRetryTopic(RemotingChannel
remotingChannel, String enodeName,
+ RemotingCommand request) {
+ try {
+ return
this.brokerController.getClientManageProcessor().processRequest(remotingChannel,
request);
+ } catch (Exception ex) {
+ log.error("[Local]Request create retry topic error", ex);
+ }
+ return null;
+ }
+
+ @Override public void updateEnodeAddress(
+ String clusterName) {
+
+ }
+
+ @Override public boolean persistSubscriptionGroupConfig(
+ final SubscriptionGroupConfig subscriptionGroupConfig) {
+ boolean persist = false;
+ if (subscriptionGroupConfig != null) {
+
this.brokerController.getSubscriptionGroupManager().updateSubscriptionGroupConfig(subscriptionGroupConfig);
+ persist = true;
+ }
+ return persist;
+ }
+
+ @Override
+ public void persistOffset(RemotingChannel remotingChannel, String
enodeName, String groupName, String topic,
+ int queueId, long offset) {
+ try {
+// UpdateConsumerOffsetRequestHeader requestHeader = new
UpdateConsumerOffsetRequestHeader();
+// requestHeader.setConsumerGroup(groupName);
+// requestHeader.setTopic(topic);
+// requestHeader.setQueueId(queueId);
+// requestHeader.setCommitOffset(offset);
+// RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.UPDATE_CONSUMER_OFFSET,
requestHeader);
+//
this.brokerController.getConsumerManageProcessor().processRequest(remotingChannel,
request);
+
+
this.brokerController.getConsumerOffsetManager().commitOffset(remotingChannel.remoteAddress().toString(),
groupName,
+ topic, queueId, offset);
+ } catch (Exception ex) {
+ log.error("[Local]Persist offset to Enode error group: [{}],
topic: [{}] queue: [{}]!", ex, groupName, topic, queueId);
+ }
+ }
+
+ @Override
+ public long queryOffset(String enodeName, String consumerGroup, String
topic, int queueId) {
+ return
this.brokerController.getConsumerOffsetManager().queryOffset(consumerGroup,
topic, queueId);
+ }
+
+ @Override
+ public long getMaxOffsetInQueue(String enodeName, String topic, int
queueId, RemotingCommand request) {
+ return
this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);
+ }
+
+ @Override
+ public long getMinOffsetInQueue(String enodeName, String topic, int
queueId, RemotingCommand request) {
+ return
this.brokerController.getMessageStore().getMinOffsetInQueue(topic, queueId);
+ }
+
+ @Override
+ public long getOffsetByTimestamp(String enodeName,
+ String topic, int queueId, long timestamp, RemotingCommand request) {
+ return
this.brokerController.getMessageStore().getOffsetInQueueByTime(topic, queueId,
timestamp);
+ }
+}
diff --git
a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/PushServiceImpl.java
b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/PushServiceImpl.java
index 72f658e..9f4b016 100644
---
a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/PushServiceImpl.java
+++
b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/PushServiceImpl.java
@@ -95,7 +95,7 @@ public class PushServiceImpl implements PushService {
messageExt.setFlag(sendMessageRequestHeader.getFlag());
messageExt.setBody(message);
messageExt.setBodyCRC(UtilAll.crc32(message));
- log.debug("MessageExt:{}", messageExt);
+ log.info("MessageExt:{}", messageExt);
return messageExt;
}
@@ -103,7 +103,6 @@ public class PushServiceImpl implements PushService {
public void run() {
if (!canceled.get()) {
try {
- log.debug("sendMessageResponse: {}", sendMessageResponse);
SendMessageResponseHeader sendMessageResponseHeader =
(SendMessageResponseHeader)
sendMessageResponse.decodeCommandCustomHeader(SendMessageResponseHeader.class);
log.debug("sendMessageResponseHeader: {}",
sendMessageResponseHeader);
MessageQueue messageQueue = new
MessageQueue(sendMessageRequestHeader.getTopic(),
sendMessageRequestHeader.getEnodeName(), sendMessageRequestHeader.getQueueId());
@@ -118,6 +117,7 @@ public class PushServiceImpl implements PushService {
MessageExt messageExt =
buildMessageExt(sendMessageResponseHeader, message, sendMessageRequestHeader);
pushMessage.setBody(MessageDecoder.encode(messageExt,
false));
for (RemotingChannel remotingChannel : consumerTable) {
+ log.info("Push message pushMessage:{} to
remotingChannel: {}", pushMessage, remotingChannel);
Client client = null;
if (remotingChannel instanceof NettyChannelImpl) {
Channel channel = ((NettyChannelImpl)
remotingChannel).getChannel();
diff --git
a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/EnodeServiceImpl.java
b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/RemoteEnodeServiceImpl.java
similarity index 77%
rename from
snode/src/main/java/org/apache/rocketmq/snode/service/impl/EnodeServiceImpl.java
rename to
snode/src/main/java/org/apache/rocketmq/snode/service/impl/RemoteEnodeServiceImpl.java
index 4d1e522..311406f 100644
---
a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/EnodeServiceImpl.java
+++
b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/RemoteEnodeServiceImpl.java
@@ -28,13 +28,17 @@ import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.body.ClusterInfo;
-import org.apache.rocketmq.common.protocol.header.GetMinOffsetRequestHeader;
+import org.apache.rocketmq.common.protocol.header.GetMaxOffsetResponseHeader;
+import org.apache.rocketmq.common.protocol.header.GetMinOffsetResponseHeader;
import
org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetRequestHeader;
+import
org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetResponseHeader;
+import org.apache.rocketmq.common.protocol.header.SearchOffsetResponseHeader;
import
org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.InvokeCallback;
+import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
@@ -46,7 +50,7 @@ import org.apache.rocketmq.snode.SnodeController;
import org.apache.rocketmq.snode.constant.SnodeConstant;
import org.apache.rocketmq.snode.service.EnodeService;
-public class EnodeServiceImpl implements EnodeService {
+public class RemoteEnodeServiceImpl implements EnodeService {
private static final InternalLogger log =
InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME);
private SnodeController snodeController;
@@ -54,7 +58,7 @@ public class EnodeServiceImpl implements EnodeService {
private final ConcurrentMap<String/* Broker Name */, HashMap<Long/*
brokerId */, String/* address */>> enodeTable =
new ConcurrentHashMap<>();
- public EnodeServiceImpl(SnodeController snodeController) {
+ public RemoteEnodeServiceImpl(SnodeController snodeController) {
this.snodeController = snodeController;
}
@@ -73,7 +77,8 @@ public class EnodeServiceImpl implements EnodeService {
}
@Override
- public CompletableFuture<RemotingCommand> pullMessage(final String
enodeName, final RemotingCommand request) {
+ public CompletableFuture<RemotingCommand> pullMessage(final
RemotingChannel remotingChannel, final String enodeName,
+ final RemotingCommand request) {
CompletableFuture<RemotingCommand> future = new CompletableFuture<>();
try {
@@ -103,7 +108,8 @@ public class EnodeServiceImpl implements EnodeService {
}
@Override
- public CompletableFuture<RemotingCommand> sendMessage(String enodeName,
RemotingCommand request) {
+ public CompletableFuture<RemotingCommand> sendMessage(final
RemotingChannel remotingChannel, String enodeName,
+ RemotingCommand request) {
CompletableFuture<RemotingCommand> future = new CompletableFuture<>();
try {
String enodeAddress =
this.snodeController.getNnodeService().getAddressByEnodeName(enodeName, false);
@@ -152,7 +158,8 @@ public class EnodeServiceImpl implements EnodeService {
}
@Override
- public boolean persistSubscriptionGroupConfig(SubscriptionGroupConfig
subscriptionGroupConfig) {
+ public boolean persistSubscriptionGroupConfig(
+ SubscriptionGroupConfig subscriptionGroupConfig) {
RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_SUBSCRIPTIONGROUP,
null);
boolean persist = false;
for (Map.Entry<String, HashMap<Long, String>> entry :
enodeTable.entrySet()) {
@@ -177,7 +184,8 @@ public class EnodeServiceImpl implements EnodeService {
}
@Override
- public void persistOffset(String enodeName, String groupName, String
topic, int queueId, long offset) {
+ public void persistOffset(final RemotingChannel remotingChannel, String
enodeName, String groupName, String topic,
+ int queueId, long offset) {
try {
String address =
this.snodeController.getNnodeService().getAddressByEnodeName(enodeName, false);
UpdateConsumerOffsetRequestHeader requestHeader = new
UpdateConsumerOffsetRequestHeader();
@@ -194,22 +202,22 @@ public class EnodeServiceImpl implements EnodeService {
}
@Override
- public RemotingCommand getMinOffsetInQueue(String enodeName, String topic,
- int queueId) throws InterruptedException, RemotingTimeoutException,
- RemotingSendRequestException, RemotingConnectException {
- GetMinOffsetRequestHeader requestHeader = new
GetMinOffsetRequestHeader();
- requestHeader.setTopic(topic);
- requestHeader.setQueueId(queueId);
- RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.GET_MIN_OFFSET, requestHeader);
+ public long getMinOffsetInQueue(String enodeName, String topic,
+ int queueId, RemotingCommand request) throws InterruptedException,
RemotingTimeoutException,
+ RemotingSendRequestException, RemotingConnectException,
RemotingCommandException {
String addr =
this.snodeController.getNnodeService().getAddressByEnodeName(enodeName, false);
- return
this.snodeController.getRemotingClient().invokeSync(MixAll.brokerVIPChannel(snodeController.getSnodeConfig().isVipChannelEnabled(),
addr),
+ RemotingCommand remotingCommand =
this.snodeController.getRemotingClient().invokeSync(MixAll.brokerVIPChannel(snodeController.getSnodeConfig().isVipChannelEnabled(),
addr),
request, SnodeConstant.DEFAULT_TIMEOUT_MILLS);
+ GetMinOffsetResponseHeader responseHeader =
+ (GetMinOffsetResponseHeader)
remotingCommand.decodeCommandCustomHeader(GetMinOffsetResponseHeader.class);
+ return responseHeader.getOffset();
+
}
@Override
- public RemotingCommand loadOffset(String enodeName, String consumerGroup,
String topic,
+ public long queryOffset(String enodeName, String consumerGroup, String
topic,
int queueId) throws InterruptedException, RemotingTimeoutException,
- RemotingSendRequestException, RemotingConnectException {
+ RemotingSendRequestException, RemotingConnectException,
RemotingCommandException {
QueryConsumerOffsetRequestHeader requestHeader = new
QueryConsumerOffsetRequestHeader();
requestHeader.setTopic(topic);
requestHeader.setConsumerGroup(consumerGroup);
@@ -217,28 +225,39 @@ public class EnodeServiceImpl implements EnodeService {
RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.QUERY_CONSUMER_OFFSET,
requestHeader);
String addr =
this.snodeController.getNnodeService().getAddressByEnodeName(enodeName, false);
- return
this.snodeController.getRemotingClient().invokeSync(MixAll.brokerVIPChannel(this.snodeController.getSnodeConfig().isVipChannelEnabled(),
addr),
+ RemotingCommand remotingCommand =
this.snodeController.getRemotingClient().invokeSync(MixAll.brokerVIPChannel(this.snodeController.getSnodeConfig().isVipChannelEnabled(),
addr),
request, SnodeConstant.DEFAULT_TIMEOUT_MILLS);
+ QueryConsumerOffsetResponseHeader responseHeader =
+ (QueryConsumerOffsetResponseHeader)
remotingCommand.decodeCommandCustomHeader(QueryConsumerOffsetResponseHeader.class);
+ return responseHeader.getOffset();
}
@Override
- public RemotingCommand getMaxOffsetInQueue(String enodeName,
+ public long getMaxOffsetInQueue(String enodeName, String topic,
+ int queueId,
RemotingCommand request) throws InterruptedException,
RemotingTimeoutException, RemotingSendRequestException,
RemotingConnectException, RemotingCommandException {
String address =
this.snodeController.getNnodeService().getAddressByEnodeName(enodeName, false);
- return this.snodeController.getRemotingClient().invokeSync(address,
+ RemotingCommand remotingCommand =
this.snodeController.getRemotingClient().invokeSync(address,
request, SnodeConstant.DEFAULT_TIMEOUT_MILLS);
+ GetMaxOffsetResponseHeader responseHeader =
+ (GetMaxOffsetResponseHeader)
remotingCommand.decodeCommandCustomHeader(GetMaxOffsetResponseHeader.class);
+ return responseHeader.getOffset();
}
@Override
- public RemotingCommand getOffsetByTimestamp(String enodeName,
- RemotingCommand request) throws InterruptedException,
RemotingTimeoutException, RemotingSendRequestException,
RemotingConnectException {
+ public long getOffsetByTimestamp(String enodeName, String topic, int
queueId,
+ long timestamp,
+ RemotingCommand request) throws InterruptedException,
RemotingTimeoutException, RemotingSendRequestException,
RemotingConnectException, RemotingCommandException {
String address =
this.snodeController.getNnodeService().getAddressByEnodeName(enodeName, false);
- return this.snodeController.getRemotingClient().invokeSync(address,
+ RemotingCommand remotingCommand =
this.snodeController.getRemotingClient().invokeSync(address,
request, SnodeConstant.DEFAULT_TIMEOUT_MILLS);
+ SearchOffsetResponseHeader responseHeader =
+ (SearchOffsetResponseHeader)
remotingCommand.decodeCommandCustomHeader(SearchOffsetResponseHeader.class);
+ return responseHeader.getOffset();
}
@Override
- public RemotingCommand creatRetryTopic(String enodeName,
+ public RemotingCommand creatRetryTopic(final RemotingChannel
remotingChannel, String enodeName,
RemotingCommand request) throws InterruptedException,
RemotingTimeoutException, RemotingSendRequestException,
RemotingConnectException {
String address =
this.snodeController.getNnodeService().getAddressByEnodeName(enodeName, false);
return this.snodeController.getRemotingClient().invokeSync(address,
diff --git
a/snode/src/test/java/org/apache/rocketmq/snode/processor/SendMessageProcessorTest.java
b/snode/src/test/java/org/apache/rocketmq/snode/processor/SendMessageProcessorTest.java
index e6e56a8..a97f169 100644
---
a/snode/src/test/java/org/apache/rocketmq/snode/processor/SendMessageProcessorTest.java
+++
b/snode/src/test/java/org/apache/rocketmq/snode/processor/SendMessageProcessorTest.java
@@ -74,7 +74,7 @@ public class SendMessageProcessorTest {
public void testSendMessageV2ProcessRequest() throws
RemotingCommandException {
CompletableFuture<RemotingCommand> future = new CompletableFuture<>();
RemotingCommand request = createSendMesssageV2Command();
- when(this.snodeController.getEnodeService().sendMessage(anyString(),
any(RemotingCommand.class))).thenReturn(future);
+ when(this.snodeController.getEnodeService().sendMessage(null,
anyString(), any(RemotingCommand.class))).thenReturn(future);
sendMessageProcessor.processRequest(remotingChannel, request);
}
@@ -83,7 +83,7 @@ public class SendMessageProcessorTest {
snodeController.setEnodeService(enodeService);
CompletableFuture<RemotingCommand> future = new CompletableFuture<>();
RemotingCommand request = createSendBatchMesssageCommand();
- when(this.snodeController.getEnodeService().sendMessage(anyString(),
any(RemotingCommand.class))).thenReturn(future);
+ when(this.snodeController.getEnodeService().sendMessage(null,
anyString(), any(RemotingCommand.class))).thenReturn(future);
sendMessageProcessor.processRequest(remotingChannel, request);
}
diff --git
a/snode/src/test/java/org/apache/rocketmq/snode/service/EnodeServiceImplTest.java
b/snode/src/test/java/org/apache/rocketmq/snode/service/RemoteEnodeServiceImplTest.java
similarity index 93%
rename from
snode/src/test/java/org/apache/rocketmq/snode/service/EnodeServiceImplTest.java
rename to
snode/src/test/java/org/apache/rocketmq/snode/service/RemoteEnodeServiceImplTest.java
index c4075e1..582c16e 100644
---
a/snode/src/test/java/org/apache/rocketmq/snode/service/EnodeServiceImplTest.java
+++
b/snode/src/test/java/org/apache/rocketmq/snode/service/RemoteEnodeServiceImplTest.java
@@ -30,7 +30,7 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.transport.rocketmq.NettyRemotingClient;
import org.apache.rocketmq.snode.SnodeController;
import org.apache.rocketmq.snode.SnodeTestBase;
-import org.apache.rocketmq.snode.service.impl.EnodeServiceImpl;
+import org.apache.rocketmq.snode.service.impl.RemoteEnodeServiceImpl;
import org.apache.rocketmq.store.GetMessageResult;
import org.apache.rocketmq.store.GetMessageStatus;
import org.junit.Before;
@@ -51,7 +51,7 @@ import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
-public class EnodeServiceImplTest extends SnodeTestBase {
+public class RemoteEnodeServiceImplTest extends SnodeTestBase {
private EnodeService enodeService;
@@ -74,7 +74,7 @@ public class EnodeServiceImplTest extends SnodeTestBase {
public void init() {
snodeController.setNnodeService(nnodeService);
snodeController.setRemotingClient(remotingClient);
- enodeService = new EnodeServiceImpl(snodeController);
+ enodeService = new RemoteEnodeServiceImpl(snodeController);
}
@Test
@@ -93,7 +93,7 @@ public class EnodeServiceImplTest extends SnodeTestBase {
return null;
}
}).when(remotingClient).invokeAsync(anyString(),
any(RemotingCommand.class), anyLong(), any(InvokeCallback.class));
- RemotingCommand response = enodeService.sendMessage(enodeName,
createSendMesssageCommand(group, topic)).get(3000L, TimeUnit.MILLISECONDS);
+ RemotingCommand response = enodeService.sendMessage(null, enodeName,
createSendMesssageCommand(group, topic)).get(3000L, TimeUnit.MILLISECONDS);
assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
}
@@ -118,7 +118,7 @@ public class EnodeServiceImplTest extends SnodeTestBase {
return null;
}
}).when(remotingClient).invokeAsync(anyString(),
any(RemotingCommand.class), anyLong(), any(InvokeCallback.class));
- RemotingCommand response = enodeService.pullMessage(enodeName,
createPullMessage()).get(3000L, TimeUnit.MILLISECONDS);
+ RemotingCommand response = enodeService.pullMessage(null, enodeName,
createPullMessage()).get(3000L, TimeUnit.MILLISECONDS);
assertThat(response).isNotNull();
assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
}