This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch snode
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/snode by this push:
     new 14a75e5  Add embedded start mode(single service) for snode and enode
14a75e5 is described below

commit 14a75e5cccdcceecd86c7aa88d5a2d92248dba57
Author: duhenglucky <[email protected]>
AuthorDate: Fri Feb 22 01:57:39 2019 +0800

    Add embedded start mode(single service) for snode and enode
---
 .../apache/rocketmq/broker/BrokerController.java   |  63 +++++++---
 .../org/apache/rocketmq/broker/BrokerStartup.java  |  23 ++--
 .../processor/AbstractSendMessageProcessor.java    |  25 ++--
 .../broker/processor/ClientManageProcessor.java    |   8 +-
 .../broker/processor/SendMessageProcessor.java     |  47 ++++---
 .../org/apache/rocketmq/common/SnodeConfig.java    |  32 +++++
 snode/pom.xml                                      |   7 +-
 .../org/apache/rocketmq/snode/SnodeController.java |  17 +--
 .../org/apache/rocketmq/snode/SnodeStartup.java    |  52 +++++---
 .../snode/client/SubscriptionGroupManager.java     |  34 +----
 .../rocketmq/snode/client/SubscriptionManager.java |   5 -
 .../snode/client/impl/SubscriptionManagerImpl.java |  15 +--
 .../snode/offset/ConsumerOffsetManager.java        |  64 +++++-----
 .../snode/processor/ConsumerManageProcessor.java   |  45 +++++--
 .../snode/processor/PullMessageProcessor.java      |   6 +-
 .../snode/processor/SendMessageProcessor.java      |   6 +-
 .../mqtthandler/MqttPingreqMessageHandler.java     |   4 +-
 .../rocketmq/snode/service/AdminService.java       |   4 +-
 .../rocketmq/snode/service/EnodeService.java       |  33 +++--
 .../snode/service/impl/ClientServiceImpl.java      |   1 +
 .../snode/service/impl/LocalEnodeServiceImpl.java  | 138 +++++++++++++++++++++
 .../snode/service/impl/PushServiceImpl.java        |   4 +-
 ...erviceImpl.java => RemoteEnodeServiceImpl.java} |  67 ++++++----
 .../snode/processor/SendMessageProcessorTest.java  |   4 +-
 ...plTest.java => RemoteEnodeServiceImplTest.java} |  10 +-
 25 files changed, 481 insertions(+), 233 deletions(-)

diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java 
b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index c898366..d9a618b 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -121,6 +121,7 @@ public class BrokerController {
     private final ClientHousekeepingService clientHousekeepingService;
     private final PullMessageProcessor pullMessageProcessor;
     private final SnodePullMessageProcessor snodePullMessageProcessor;
+    private final SendMessageProcessor sendProcessor;
     private final PullRequestHoldService pullRequestHoldService;
     private final MessageArrivingListener messageArrivingListener;
     private final Broker2Client broker2Client;
@@ -164,6 +165,9 @@ public class BrokerController {
     private TransactionalMessageService transactionalMessageService;
     private AbstractTransactionalMessageCheckListener 
transactionalMessageCheckListener;
     private Future<?> slaveSyncFuture;
+    private ClientManageProcessor clientManageProcessor;
+    private AdminBrokerProcessor adminProcessor;
+    private ConsumerManageProcessor consumerManageProcessor;
 
     public BrokerController(
         final BrokerConfig brokerConfig,
@@ -178,6 +182,7 @@ public class BrokerController {
         this.consumerOffsetManager = new ConsumerOffsetManager(this);
         this.topicConfigManager = new TopicConfigManager(this);
         this.pullMessageProcessor = new PullMessageProcessor(this);
+        this.sendProcessor = new SendMessageProcessor(this);
         this.pullRequestHoldService = new PullRequestHoldService(this);
         this.messageArrivingListener = new 
NotifyMessageArrivingListener(this.pullRequestHoldService);
         this.consumerIdsChangeListener = new 
DefaultConsumerIdsChangeListener(this);
@@ -543,9 +548,8 @@ public class BrokerController {
         /**
          * SendMessageProcessor
          */
-        SendMessageProcessor sendProcessor = new SendMessageProcessor(this);
-        sendProcessor.registerSendMessageHook(sendMessageHookList);
-        sendProcessor.registerConsumeMessageHook(consumeMessageHookList);
+        this.sendProcessor.registerSendMessageHook(sendMessageHookList);
+        this.sendProcessor.registerConsumeMessageHook(consumeMessageHookList);
 
         this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, 
sendProcessor, this.sendMessageExecutor);
         this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, 
sendProcessor, this.sendMessageExecutor);
@@ -575,28 +579,28 @@ public class BrokerController {
         /**
          * ClientManageProcessor
          */
-        ClientManageProcessor clientProcessor = new 
ClientManageProcessor(this);
-        this.remotingServer.registerProcessor(RequestCode.HEART_BEAT, 
clientProcessor, this.heartbeatExecutor);
-        this.remotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, 
clientProcessor, this.clientManageExecutor);
-        this.remotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, 
clientProcessor, this.clientManageExecutor);
-        this.remotingServer.registerProcessor(RequestCode.CREATE_RETRY_TOPIC, 
clientProcessor, this.clientManageExecutor);
+        this.clientManageProcessor = new ClientManageProcessor(this);
+        this.remotingServer.registerProcessor(RequestCode.HEART_BEAT, 
clientManageProcessor, this.heartbeatExecutor);
+        this.remotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, 
clientManageProcessor, this.clientManageExecutor);
+        this.remotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, 
clientManageProcessor, this.clientManageExecutor);
+        this.remotingServer.registerProcessor(RequestCode.CREATE_RETRY_TOPIC, 
clientManageProcessor, this.clientManageExecutor);
 
-        this.fastRemotingServer.registerProcessor(RequestCode.HEART_BEAT, 
clientProcessor, this.heartbeatExecutor);
-        
this.fastRemotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, 
clientProcessor, this.clientManageExecutor);
-        
this.fastRemotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, 
clientProcessor, this.clientManageExecutor);
-        
this.fastRemotingServer.registerProcessor(RequestCode.CREATE_RETRY_TOPIC, 
clientProcessor, this.clientManageExecutor);
+        this.fastRemotingServer.registerProcessor(RequestCode.HEART_BEAT, 
clientManageProcessor, this.heartbeatExecutor);
+        
this.fastRemotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, 
clientManageProcessor, this.clientManageExecutor);
+        
this.fastRemotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, 
clientManageProcessor, this.clientManageExecutor);
+        
this.fastRemotingServer.registerProcessor(RequestCode.CREATE_RETRY_TOPIC, 
clientManageProcessor, this.clientManageExecutor);
 
         /**
          * ConsumerManageProcessor
          */
-        ConsumerManageProcessor consumerManageProcessor = new 
ConsumerManageProcessor(this);
-        
this.remotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, 
consumerManageProcessor, this.consumerManageExecutor);
-        
this.remotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, 
consumerManageProcessor, this.consumerManageExecutor);
-        
this.remotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, 
consumerManageProcessor, this.consumerManageExecutor);
+        this.consumerManageProcessor = new ConsumerManageProcessor(this);
+        
this.remotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, 
this.consumerManageProcessor, this.consumerManageExecutor);
+        
this.remotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, 
this.consumerManageProcessor, this.consumerManageExecutor);
+        
this.remotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, 
this.consumerManageProcessor, this.consumerManageExecutor);
 
-        
this.fastRemotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP,
 consumerManageProcessor, this.consumerManageExecutor);
-        
this.fastRemotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, 
consumerManageProcessor, this.consumerManageExecutor);
-        
this.fastRemotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, 
consumerManageProcessor, this.consumerManageExecutor);
+        
this.fastRemotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP,
 this.consumerManageProcessor, this.consumerManageExecutor);
+        
this.fastRemotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, 
this.consumerManageProcessor, this.consumerManageExecutor);
+        
this.fastRemotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, 
this.consumerManageProcessor, this.consumerManageExecutor);
 
         /**
          * EndTransactionProcessor
@@ -607,7 +611,7 @@ public class BrokerController {
         /**
          * Default
          */
-        AdminBrokerProcessor adminProcessor = new AdminBrokerProcessor(this);
+        this.adminProcessor = new AdminBrokerProcessor(this);
         this.remotingServer.registerDefaultProcessor(adminProcessor, 
this.adminBrokerExecutor);
         this.fastRemotingServer.registerDefaultProcessor(adminProcessor, 
this.adminBrokerExecutor);
     }
@@ -1220,4 +1224,23 @@ public class BrokerController {
         }
     }
 
+    public SendMessageProcessor getSendProcessor() {
+        return sendProcessor;
+    }
+
+    public ClientManageProcessor getClientManageProcessor() {
+        return clientManageProcessor;
+    }
+
+    public AdminBrokerProcessor getAdminProcessor() {
+        return adminProcessor;
+    }
+
+    public void setAdminProcessor(AdminBrokerProcessor adminProcessor) {
+        this.adminProcessor = adminProcessor;
+    }
+
+    public ConsumerManageProcessor getConsumerManageProcessor() {
+        return consumerManageProcessor;
+    }
 }
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java 
b/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java
index 17d2f0e..97a655c 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java
@@ -18,6 +18,11 @@ package org.apache.rocketmq.broker;
 
 import ch.qos.logback.classic.LoggerContext;
 import ch.qos.logback.classic.joran.JoranConfigurator;
+import java.io.BufferedInputStream;
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.Options;
@@ -28,10 +33,10 @@ import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
-import org.apache.rocketmq.remoting.common.RemotingUtil;
-import org.apache.rocketmq.remoting.common.TlsMode;
 import org.apache.rocketmq.remoting.ClientConfig;
 import org.apache.rocketmq.remoting.ServerConfig;
+import org.apache.rocketmq.remoting.common.RemotingUtil;
+import org.apache.rocketmq.remoting.common.TlsMode;
 import org.apache.rocketmq.remoting.netty.NettySystemConfig;
 import org.apache.rocketmq.remoting.netty.TlsSystemConfig;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
@@ -40,12 +45,6 @@ import org.apache.rocketmq.store.config.BrokerRole;
 import org.apache.rocketmq.store.config.MessageStoreConfig;
 import org.slf4j.LoggerFactory;
 
-import java.io.BufferedInputStream;
-import java.io.FileInputStream;
-import java.io.InputStream;
-import java.util.Properties;
-import java.util.concurrent.atomic.AtomicInteger;
-
 import static org.apache.rocketmq.remoting.netty.TlsSystemConfig.TLS_ENABLE;
 
 public class BrokerStartup {
@@ -53,6 +52,7 @@ public class BrokerStartup {
     public static CommandLine commandLine = null;
     public static String configFile = null;
     public static InternalLogger log;
+    private static BrokerController brokerController = null;
 
     public static void main(String[] args) {
         start(createBrokerController(args));
@@ -238,7 +238,7 @@ public class BrokerStartup {
                     }
                 }
             }, "ShutdownHook"));
-
+            brokerController = controller;
             return controller;
         } catch (Throwable e) {
             e.printStackTrace();
@@ -260,7 +260,6 @@ public class BrokerStartup {
 
     private static Options buildCommandlineOptions(final Options options) {
 
-
         Option opt = new Option("c", "configFile", true, "Broker config 
properties file");
         opt.setRequired(false);
         options.addOption(opt);
@@ -275,4 +274,8 @@ public class BrokerStartup {
 
         return options;
     }
+
+    public static BrokerController getBrokerController() {
+        return brokerController;
+    }
 }
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
index bd7625a..2e04726 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
@@ -17,6 +17,11 @@
 package org.apache.rocketmq.broker.processor;
 
 import io.netty.channel.ChannelHandlerContext;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.mqtrace.SendMessageContext;
 import org.apache.rocketmq.broker.mqtrace.SendMessageHook;
@@ -27,8 +32,6 @@ import org.apache.rocketmq.common.constant.DBMsgConstants;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.constant.PermName;
 import org.apache.rocketmq.common.help.FAQUrl;
-import org.apache.rocketmq.logging.InternalLogger;
-import org.apache.rocketmq.logging.InternalLoggerFactory;
 import org.apache.rocketmq.common.message.MessageAccessor;
 import org.apache.rocketmq.common.message.MessageConst;
 import org.apache.rocketmq.common.message.MessageDecoder;
@@ -40,18 +43,14 @@ import 
org.apache.rocketmq.common.protocol.header.SendMessageResponseHeader;
 import org.apache.rocketmq.common.sysflag.MessageSysFlag;
 import org.apache.rocketmq.common.sysflag.TopicSysFlag;
 import org.apache.rocketmq.common.utils.ChannelUtil;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.RequestProcessor;
 import org.apache.rocketmq.remoting.common.RemotingHelper;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
-import org.apache.rocketmq.remoting.RequestProcessor;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.apache.rocketmq.store.MessageExtBrokerInner;
 
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-
 public abstract class AbstractSendMessageProcessor implements RequestProcessor 
{
     protected static final InternalLogger log = 
InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
 
@@ -159,7 +158,7 @@ public abstract class AbstractSendMessageProcessor 
implements RequestProcessor {
         return response;
     }
 
-    protected RemotingCommand msgCheck(final ChannelHandlerContext ctx,
+    protected RemotingCommand msgCheck(final String remoteAddress,
         final SendMessageRequestHeader requestHeader, final RemotingCommand 
response) {
         if 
(!PermName.isWriteable(this.brokerController.getBrokerConfig().getBrokerPermission())
             && 
this.brokerController.getTopicConfigManager().isOrderTopic(requestHeader.getTopic()))
 {
@@ -188,11 +187,11 @@ public abstract class AbstractSendMessageProcessor 
implements RequestProcessor {
                 }
             }
 
-            log.warn("the topic {} not exist, producer: {}", 
requestHeader.getTopic(), ctx.channel().remoteAddress());
+            log.warn("the topic {} not exist, producer: {}", 
requestHeader.getTopic(), remoteAddress);
             topicConfig = 
this.brokerController.getTopicConfigManager().createTopicInSendMessageMethod(
                 requestHeader.getTopic(),
                 requestHeader.getDefaultTopic(),
-                RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
+                remoteAddress,
                 requestHeader.getDefaultTopicQueueNums(), topicSysFlag);
 
             if (null == topicConfig) {
@@ -218,7 +217,7 @@ public abstract class AbstractSendMessageProcessor 
implements RequestProcessor {
             String errorInfo = String.format("request queueId[%d] is illegal, 
%s Producer: %s",
                 queueIdInt,
                 topicConfig.toString(),
-                RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
+                remoteAddress);
 
             log.warn(errorInfo);
             response.setCode(ResponseCode.SYSTEM_ERROR);
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/ClientManageProcessor.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/ClientManageProcessor.java
index e7a1e73..e5a95e9 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/ClientManageProcessor.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/ClientManageProcessor.java
@@ -207,9 +207,8 @@ public class ClientManageProcessor implements 
RequestProcessor {
     private RemotingCommand createRetryTopic(ChannelHandlerContext ctx,
         RemotingCommand request) throws RemotingCommandException {
         RemotingCommand response = RemotingCommand.createResponseCommand(null);
-        final CreateRetryTopicRequestHeader requestHeader =
-            (CreateRetryTopicRequestHeader) request
-                
.decodeCommandCustomHeader(CreateRetryTopicRequestHeader.class);
+        final CreateRetryTopicRequestHeader requestHeader = 
(CreateRetryTopicRequestHeader) request
+            .decodeCommandCustomHeader(CreateRetryTopicRequestHeader.class);
         if (requestHeader.getGroupName() != null) {
             SubscriptionGroupConfig subscriptionGroupConfig =
                 
this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getGroupName());
@@ -217,7 +216,8 @@ public class ClientManageProcessor implements 
RequestProcessor {
                 createRetryTopic(false, requestHeader.getGroupName(), 
subscriptionGroupConfig.getRetryQueueNums());
             }
         }
-
+        response.setCode(ResponseCode.SUCCESS);
+        response.setRemark(null);
         return response;
     }
 
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
index 3557a17..1408cce 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
@@ -46,6 +46,7 @@ import org.apache.rocketmq.common.sysflag.MessageSysFlag;
 import org.apache.rocketmq.common.sysflag.TopicSysFlag;
 import org.apache.rocketmq.remoting.RemotingChannel;
 import org.apache.rocketmq.remoting.RequestProcessor;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 import org.apache.rocketmq.remoting.netty.NettyChannelHandlerContextImpl;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
@@ -71,7 +72,7 @@ public class SendMessageProcessor extends 
AbstractSendMessageProcessor implement
         SendMessageContext mqtraceContext;
         switch (request.getCode()) {
             case RequestCode.CONSUMER_SEND_MSG_BACK:
-                return this.consumerSendMsgBack(ctx, request);
+                return this.consumerSendMsgBack(request);
             default:
                 SendMessageRequestHeader requestHeader = 
parseRequestHeader(request);
                 if (requestHeader == null) {
@@ -82,10 +83,13 @@ public class SendMessageProcessor extends 
AbstractSendMessageProcessor implement
                 this.executeSendMessageHookBefore(ctx, request, 
mqtraceContext);
 
                 RemotingCommand response;
+//                String remoteAddress = 
RemotingHelper.parseChannelRemoteAddr(ctx.channel());
+                SocketAddress bornHost = ctx.channel().remoteAddress();
+
                 if (requestHeader.isBatch()) {
-                    response = this.sendBatchMessage(ctx, request, 
mqtraceContext, requestHeader);
+                    response = this.sendBatchMessage(bornHost, request, 
mqtraceContext, requestHeader);
                 } else {
-                    response = this.sendMessage(ctx, request, mqtraceContext, 
requestHeader);
+                    response = this.sendMessage(bornHost, request, 
mqtraceContext, requestHeader);
                 }
 
                 this.executeSendMessageHookAfter(response, mqtraceContext);
@@ -99,7 +103,7 @@ public class SendMessageProcessor extends 
AbstractSendMessageProcessor implement
             
this.brokerController.getMessageStore().isTransientStorePoolDeficient();
     }
 
-    private RemotingCommand consumerSendMsgBack(final ChannelHandlerContext 
ctx, final RemotingCommand request)
+    private RemotingCommand consumerSendMsgBack(final RemotingCommand request)
         throws RemotingCommandException {
         final RemotingCommand response = 
RemotingCommand.createResponseCommand(null);
         final ConsumerSendMsgBackRequestHeader requestHeader =
@@ -296,7 +300,7 @@ public class SendMessageProcessor extends 
AbstractSendMessageProcessor implement
         return true;
     }
 
-    private RemotingCommand sendMessage(final ChannelHandlerContext ctx,
+    private RemotingCommand sendMessage(final SocketAddress remoteAddress,
         final RemotingCommand request,
         final SendMessageContext sendMessageContext,
         final SendMessageRequestHeader requestHeader) throws 
RemotingCommandException {
@@ -319,7 +323,7 @@ public class SendMessageProcessor extends 
AbstractSendMessageProcessor implement
         }
 
         response.setCode(-1);
-        super.msgCheck(ctx, requestHeader, response);
+        super.msgCheck(RemotingHelper.parseChannelRemoteAddr(remoteAddress), 
requestHeader, response);
         if (response.getCode() != -1) {
             return response;
         }
@@ -346,9 +350,12 @@ public class SendMessageProcessor extends 
AbstractSendMessageProcessor implement
         MessageAccessor.setProperties(msgInner, 
MessageDecoder.string2messageProperties(requestHeader.getProperties()));
         msgInner.setPropertiesString(requestHeader.getProperties());
         msgInner.setBornTimestamp(requestHeader.getBornTimestamp());
-        msgInner.setBornHost(ctx.channel().remoteAddress());
+        msgInner.setBornHost(requestHeader.getBornHost());
         msgInner.setStoreHost(this.getStoreHost());
         msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 
0 : requestHeader.getReconsumeTimes());
+        msgInner.setBornHost(remoteAddress);
+//        ByteBuffer hostHolder = ByteBuffer.allocate(8);
+//        String bornHost = msgInner.getStoreHostBytes(hostHolder).toString();
         PutMessageResult putMessageResult = null;
         Map<String, String> oriProps = 
MessageDecoder.string2messageProperties(requestHeader.getProperties());
         String traFlag = 
oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
@@ -365,14 +372,14 @@ public class SendMessageProcessor extends 
AbstractSendMessageProcessor implement
             putMessageResult = 
this.brokerController.getMessageStore().putMessage(msgInner);
         }
 
-        return handlePutMessageResult(putMessageResult, response, request, 
msgInner, responseHeader, sendMessageContext, ctx, queueIdInt);
+        return handlePutMessageResult(putMessageResult, response, request, 
msgInner, responseHeader, sendMessageContext, queueIdInt, remoteAddress);
 
     }
 
     private RemotingCommand handlePutMessageResult(PutMessageResult 
putMessageResult, RemotingCommand response,
         RemotingCommand request, MessageExt msg,
-        SendMessageResponseHeader responseHeader, SendMessageContext 
sendMessageContext, ChannelHandlerContext ctx,
-        int queueIdInt) {
+        SendMessageResponseHeader responseHeader, SendMessageContext 
sendMessageContext,
+        int queueIdInt, SocketAddress bornHost) {
         if (putMessageResult == null) {
             response.setCode(ResponseCode.SYSTEM_ERROR);
             response.setRemark("store putMessage return null");
@@ -445,8 +452,8 @@ public class SendMessageProcessor extends 
AbstractSendMessageProcessor implement
             
responseHeader.setCommitLogOffset(putMessageResult.getAppendMessageResult().getWroteOffset());
             
responseHeader.setStoreTimestamp(putMessageResult.getAppendMessageResult().getStoreTimestamp());
             
responseHeader.setStoreSize(putMessageResult.getAppendMessageResult().getWroteBytes());
-            
responseHeader.setStoreHost(ctx.channel().localAddress().toString());
-            doResponse(ctx, request, response);
+            
responseHeader.setStoreHost(RemotingHelper.parseChannelRemoteAddr(bornHost));
+//            doResponse(ctx, request, response);
 
             if (hasSendMessageHook()) {
                 sendMessageContext.setMsgId(responseHeader.getMsgId());
@@ -462,6 +469,10 @@ public class SendMessageProcessor extends 
AbstractSendMessageProcessor implement
                 sendMessageContext.setCommercialSendSize(wroteSize);
                 sendMessageContext.setCommercialOwner(owner);
             }
+            if (!request.isOnewayRPC()) {
+                response.setCustomHeader(responseHeader);
+                return response;
+            }
             return null;
         } else {
             if (hasSendMessageHook()) {
@@ -477,7 +488,7 @@ public class SendMessageProcessor extends 
AbstractSendMessageProcessor implement
         return response;
     }
 
-    private RemotingCommand sendBatchMessage(final ChannelHandlerContext ctx,
+    private RemotingCommand sendBatchMessage(final SocketAddress remoteAddress,
         final RemotingCommand request,
         final SendMessageContext sendMessageContext,
         final SendMessageRequestHeader requestHeader) throws 
RemotingCommandException {
@@ -500,7 +511,7 @@ public class SendMessageProcessor extends 
AbstractSendMessageProcessor implement
         }
 
         response.setCode(-1);
-        super.msgCheck(ctx, requestHeader, response);
+        super.msgCheck(RemotingHelper.parseChannelRemoteAddr(remoteAddress), 
requestHeader, response);
         if (response.getCode() != -1) {
             return response;
         }
@@ -537,13 +548,17 @@ public class SendMessageProcessor extends 
AbstractSendMessageProcessor implement
         MessageAccessor.setProperties(messageExtBatch, 
MessageDecoder.string2messageProperties(requestHeader.getProperties()));
         messageExtBatch.setBody(request.getBody());
         messageExtBatch.setBornTimestamp(requestHeader.getBornTimestamp());
-        messageExtBatch.setBornHost(ctx.channel().remoteAddress());
+        messageExtBatch.setBornHost(requestHeader.getBornHost());
         messageExtBatch.setStoreHost(this.getStoreHost());
+
         messageExtBatch.setReconsumeTimes(requestHeader.getReconsumeTimes() == 
null ? 0 : requestHeader.getReconsumeTimes());
 
+//        ByteBuffer hostHolder = ByteBuffer.allocate(8);
+//        String storeHost = 
messageExtBatch.getStoreHostBytes(hostHolder).toString();
+
         PutMessageResult putMessageResult = 
this.brokerController.getMessageStore().putMessages(messageExtBatch);
 
-        return handlePutMessageResult(putMessageResult, response, request, 
messageExtBatch, responseHeader, sendMessageContext, ctx, queueIdInt);
+        return handlePutMessageResult(putMessageResult, response, request, 
messageExtBatch, responseHeader, sendMessageContext, queueIdInt, storeHost);
     }
 
     public boolean hasConsumeMessageHook() {
diff --git a/common/src/main/java/org/apache/rocketmq/common/SnodeConfig.java 
b/common/src/main/java/org/apache/rocketmq/common/SnodeConfig.java
index abe1a57..dd38075 100644
--- a/common/src/main/java/org/apache/rocketmq/common/SnodeConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/SnodeConfig.java
@@ -22,6 +22,8 @@ import org.apache.rocketmq.common.annotation.ImportantField;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.ClientConfig;
+import org.apache.rocketmq.remoting.ServerConfig;
 import org.apache.rocketmq.remoting.common.RemotingUtil;
 
 public class SnodeConfig {
@@ -32,6 +34,10 @@ public class SnodeConfig {
 
     private String rocketmqHome = 
System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, 
System.getenv(MixAll.ROCKETMQ_HOME_ENV));
 
+    private ServerConfig nettyServerConfig;
+
+    private ClientConfig nettyClientConfig;
+
     @ImportantField
     private String namesrvAddr = 
System.getProperty(MixAll.NAMESRV_ADDR_PROPERTY, 
System.getenv(MixAll.NAMESRV_ADDR_ENV));
 
@@ -106,6 +112,9 @@ public class SnodeConfig {
     @ImportantField
     private boolean aclEnable = false;
 
+    @ImportantField
+    private boolean embeddedModeEnable = true;
+
     public void setSnodeHeartBeatInterval(long snodeHeartBeatInterval) {
         this.snodeHeartBeatInterval = snodeHeartBeatInterval;
     }
@@ -410,4 +419,27 @@ public class SnodeConfig {
         this.loadOffsetInterval = loadOffsetInterval;
     }
 
+    public boolean isEmbeddedModeEnable() {
+        return embeddedModeEnable;
+    }
+
+    public void setEmbeddedModeEnable(boolean embeddedModeEnable) {
+        this.embeddedModeEnable = embeddedModeEnable;
+    }
+
+    public ServerConfig getNettyServerConfig() {
+        return nettyServerConfig;
+    }
+
+    public void setNettyServerConfig(ServerConfig nettyServerConfig) {
+        this.nettyServerConfig = nettyServerConfig;
+    }
+
+    public ClientConfig getNettyClientConfig() {
+        return nettyClientConfig;
+    }
+
+    public void setNettyClientConfig(ClientConfig nettyClientConfig) {
+        this.nettyClientConfig = nettyClientConfig;
+    }
 }
diff --git a/snode/pom.xml b/snode/pom.xml
index bb28f64..da1e3f3 100644
--- a/snode/pom.xml
+++ b/snode/pom.xml
@@ -15,7 +15,8 @@
   limitations under the License.
   -->
 
-<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
     <parent>
         <groupId>org.apache.rocketmq</groupId>
         <artifactId>rocketmq-all</artifactId>
@@ -92,6 +93,10 @@
             <groupId>io.prometheus</groupId>
             <artifactId>simpleclient_hotspot</artifactId>
         </dependency>
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>rocketmq-broker</artifactId>
+        </dependency>
     </dependencies>
 
     <build>
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java 
b/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java
index 6523d8b..5ca8a19 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java
@@ -23,6 +23,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import org.apache.rocketmq.acl.AccessValidator;
+import org.apache.rocketmq.broker.BrokerStartup;
 import org.apache.rocketmq.common.SnodeConfig;
 import org.apache.rocketmq.common.ThreadFactoryImpl;
 import org.apache.rocketmq.common.constant.LoggerName;
@@ -69,10 +70,11 @@ import org.apache.rocketmq.snode.service.PushService;
 import org.apache.rocketmq.snode.service.ScheduledService;
 import org.apache.rocketmq.snode.service.WillMessageService;
 import org.apache.rocketmq.snode.service.impl.ClientServiceImpl;
-import org.apache.rocketmq.snode.service.impl.EnodeServiceImpl;
+import org.apache.rocketmq.snode.service.impl.LocalEnodeServiceImpl;
 import org.apache.rocketmq.snode.service.impl.MetricsServiceImpl;
 import org.apache.rocketmq.snode.service.impl.NnodeServiceImpl;
 import org.apache.rocketmq.snode.service.impl.PushServiceImpl;
+import org.apache.rocketmq.snode.service.impl.RemoteEnodeServiceImpl;
 import org.apache.rocketmq.snode.service.impl.ScheduledServiceImpl;
 import org.apache.rocketmq.snode.service.impl.WillMessageServiceImpl;
 
@@ -127,7 +129,11 @@ public class SnodeController {
         this.nettyClientConfig = nettyClientConfig;
         this.nettyServerConfig = nettyServerConfig;
         this.snodeConfig = snodeConfig;
-        this.enodeService = new EnodeServiceImpl(this);
+        if (!this.snodeConfig.isEmbeddedModeEnable()) {
+            this.enodeService = new RemoteEnodeServiceImpl(this);
+        } else {
+            this.enodeService = new 
LocalEnodeServiceImpl(BrokerStartup.getBrokerController());
+        }
         this.nnodeService = new NnodeServiceImpl(this);
         this.scheduledService = new ScheduledServiceImpl(this);
         this.remotingClient = 
RemotingClientFactory.getInstance().createRemotingClient()
@@ -163,7 +169,6 @@ public class SnodeController {
             "SnodeHeartbeatThread",
             true);
 
-
         this.consumerManageExecutor = ThreadUtils.newThreadPoolExecutor(
             snodeConfig.getSnodeSendMessageMinPoolSize(),
             snodeConfig.getSnodeSendMessageMaxPoolSize(),
@@ -228,10 +233,8 @@ public class SnodeController {
     }
 
     public boolean initialize() {
-        this.snodeServer = 
RemotingServerFactory.getInstance().createRemotingServer()
-            .init(this.nettyServerConfig, this.clientHousekeepingService);
-        this.mqttRemotingServer = 
RemotingServerFactory.getInstance().createRemotingServer(
-            RemotingUtil.MQTT_PROTOCOL)
+        this.snodeServer = 
RemotingServerFactory.getInstance().createRemotingServer().init(this.nettyServerConfig,
 this.clientHousekeepingService);
+        this.mqttRemotingServer = 
RemotingServerFactory.getInstance().createRemotingServer(RemotingUtil.MQTT_PROTOCOL)
             .init(this.nettyServerConfig, this.clientHousekeepingService);
         this.registerProcessor();
         initSnodeInterceptorGroup();
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/SnodeStartup.java 
b/snode/src/main/java/org/apache/rocketmq/snode/SnodeStartup.java
index 232def9..23ea68a 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/SnodeStartup.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/SnodeStartup.java
@@ -16,8 +16,6 @@
  */
 package org.apache.rocketmq.snode;
 
-import static org.apache.rocketmq.remoting.netty.TlsSystemConfig.TLS_ENABLE;
-
 import ch.qos.logback.classic.LoggerContext;
 import ch.qos.logback.classic.joran.JoranConfigurator;
 import ch.qos.logback.core.joran.spi.JoranException;
@@ -31,6 +29,7 @@ import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.PosixParser;
+import org.apache.rocketmq.broker.BrokerStartup;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.SnodeConfig;
 import org.apache.rocketmq.common.constant.LoggerName;
@@ -44,6 +43,8 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.apache.rocketmq.srvutil.ServerUtil;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.rocketmq.remoting.netty.TlsSystemConfig.TLS_ENABLE;
+
 public class SnodeStartup {
     private static InternalLogger log;
     public static Properties properties = null;
@@ -51,13 +52,17 @@ public class SnodeStartup {
     public static String configFile = null;
 
     public static void main(String[] args) throws IOException, JoranException {
-        startup(createSnodeController(args));
+        SnodeConfig snodeConfig = loadConfig(args);
+        if (snodeConfig.isEmbeddedModeEnable()) {
+            BrokerStartup.start(BrokerStartup.createBrokerController(args));
+        }
+        SnodeController snodeController = createSnodeController(snodeConfig);
+        startup(snodeController);
     }
 
     public static SnodeController startup(SnodeController controller) {
         try {
             controller.start();
-
             String tip = "The snode[" + 
controller.getSnodeConfig().getSnodeName() + ", "
                 + controller.getSnodeConfig().getSnodeIP1() + "] boot success. 
serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
 
@@ -74,7 +79,7 @@ public class SnodeStartup {
         return null;
     }
 
-    public static SnodeController createSnodeController(String[] args) throws 
IOException, JoranException {
+    public static SnodeConfig loadConfig(String[] args) throws IOException {
         Options options = ServerUtil.buildCommandlineOptions(new Options());
         commandLine = ServerUtil.parseCmdLine("snode", args, 
buildCommandlineOptions(options),
             new PosixParser());
@@ -82,12 +87,11 @@ public class SnodeStartup {
             System.exit(-1);
         }
 
-        final SnodeConfig snodeConfig = new SnodeConfig();
+        SnodeConfig snodeConfig = new SnodeConfig();
         final ServerConfig nettyServerConfig = new ServerConfig();
         final ClientConfig nettyClientConfig = new ClientConfig();
 
         nettyServerConfig.setListenPort(snodeConfig.getListenPort());
-        nettyServerConfig.setListenPort(11911);
         
nettyClientConfig.setUseTLS(Boolean.parseBoolean(System.getProperty(TLS_ENABLE,
             String.valueOf(TlsSystemConfig.tlsMode == TlsMode.ENFORCING))));
 
@@ -101,27 +105,28 @@ public class SnodeStartup {
                 MixAll.properties2Object(properties, snodeConfig);
                 MixAll.properties2Object(properties, nettyServerConfig);
                 MixAll.properties2Object(properties, nettyClientConfig);
+
                 in.close();
             }
         }
+        snodeConfig.setNettyServerConfig(nettyServerConfig);
+        snodeConfig.setNettyClientConfig(nettyClientConfig);
         if (null == snodeConfig.getRocketmqHome()) {
             System.out.printf("Please set the %s variable in your environment 
to match the location of the RocketMQ installation", MixAll.ROCKETMQ_HOME_ENV);
             System.exit(-2);
         }
 
-        LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
-        JoranConfigurator configurator = new JoranConfigurator();
-        configurator.setContext(lc);
-        lc.reset();
-        configurator.doConfigure(snodeConfig.getRocketmqHome() + 
"/conf/logback_snode.xml");
-        log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME);
-
         MixAll.printObjectProperties(log, snodeConfig);
-        MixAll.printObjectProperties(log, nettyClientConfig);
-        MixAll.printObjectProperties(log, nettyServerConfig);
+        MixAll.printObjectProperties(log, snodeConfig.getNettyServerConfig());
+        MixAll.printObjectProperties(log, snodeConfig.getNettyClientConfig());
+        return snodeConfig;
+    }
+
+    public static SnodeController createSnodeController(SnodeConfig 
snodeConfig) throws JoranException {
+
         final SnodeController snodeController = new SnodeController(
-            nettyServerConfig,
-            nettyClientConfig,
+            snodeConfig.getNettyServerConfig(),
+            snodeConfig.getNettyClientConfig(),
             snodeConfig);
 
         boolean initResult = snodeController.initialize();
@@ -148,7 +153,14 @@ public class SnodeStartup {
                     }
                 }
             }
-        },"ShutdownHook"));
+        }, "ShutdownHook"));
+        LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
+        JoranConfigurator configurator = new JoranConfigurator();
+        configurator.setContext(lc);
+        lc.reset();
+        configurator.doConfigure(snodeConfig.getRocketmqHome() + 
"/conf/logback_snode.xml");
+        log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME);
+
         return snodeController;
     }
 
@@ -164,7 +176,7 @@ public class SnodeStartup {
         opt = new Option("m", "printImportantConfig", false, "Print important 
config item");
         opt.setRequired(false);
         options.addOption(opt);
-        
+
         return options;
     }
 }
diff --git 
a/snode/src/main/java/org/apache/rocketmq/snode/client/SubscriptionGroupManager.java
 
b/snode/src/main/java/org/apache/rocketmq/snode/client/SubscriptionGroupManager.java
index ab013b8..47f10c9 100644
--- 
a/snode/src/main/java/org/apache/rocketmq/snode/client/SubscriptionGroupManager.java
+++ 
b/snode/src/main/java/org/apache/rocketmq/snode/client/SubscriptionGroupManager.java
@@ -42,19 +42,6 @@ public class SubscriptionGroupManager {
     private void init() {
     }
 
-    public void updateSubscriptionGroupConfig(final SubscriptionGroupConfig 
config) {
-        SubscriptionGroupConfig old = 
this.subscriptionGroupTable.put(config.getGroupName(), config);
-        if (old != null) {
-            log.info("Update subscription group config, old: {} new: {}", old, 
config);
-        } else {
-            log.info("Create new subscription group, {}", config);
-        }
-
-        this.dataVersion.nextVersion();
-
-        this.persistSubscription(config);
-    }
-
     public void disableConsume(final String groupName) {
         SubscriptionGroupConfig old = 
this.subscriptionGroupTable.get(groupName);
         if (old != null) {
@@ -63,7 +50,8 @@ public class SubscriptionGroupManager {
         }
     }
 
-    public SubscriptionGroupConfig findSubscriptionGroupConfig(final String 
group) {
+    public SubscriptionGroupConfig findSubscriptionGroupConfig(
+        final String group) {
         SubscriptionGroupConfig subscriptionGroupConfig = 
this.subscriptionGroupTable.get(group);
         if (null == subscriptionGroupConfig) {
             if 
(snodeController.getSnodeConfig().isAutoCreateSubscriptionGroup() || 
MixAll.isSysConsumerGroup(group)) {
@@ -74,15 +62,13 @@ public class SubscriptionGroupManager {
                     log.info("Auto create a subscription group, {}", 
subscriptionGroupConfig.toString());
                 }
                 this.dataVersion.nextVersion();
-                this.persistSubscription(subscriptionGroupConfig);
+                
this.snodeController.getEnodeService().persistSubscriptionGroupConfig(subscriptionGroupConfig);
             }
         }
 
         return subscriptionGroupConfig;
     }
 
-
-
     public ConcurrentMap<String, SubscriptionGroupConfig> 
getSubscriptionGroupTable() {
         return subscriptionGroupTable;
     }
@@ -91,18 +77,4 @@ public class SubscriptionGroupManager {
         return dataVersion;
     }
 
-    public void deleteSubscriptionGroupConfig(final String groupName) {
-        SubscriptionGroupConfig old = 
this.subscriptionGroupTable.remove(groupName);
-        if (old != null) {
-            log.info("delete subscription group OK, subscription group:{}", 
old);
-            this.dataVersion.nextVersion();
-            this.persistSubscription(old);
-        } else {
-            log.warn("delete subscription group failed, subscription 
groupName: {} not exist", groupName);
-        }
-    }
-
-    void persistSubscription(SubscriptionGroupConfig config) {
-        
this.snodeController.getEnodeService().persistSubscriptionGroupConfig(config);
-    }
 }
diff --git 
a/snode/src/main/java/org/apache/rocketmq/snode/client/SubscriptionManager.java 
b/snode/src/main/java/org/apache/rocketmq/snode/client/SubscriptionManager.java
index a164fc4..a648ba8 100644
--- 
a/snode/src/main/java/org/apache/rocketmq/snode/client/SubscriptionManager.java
+++ 
b/snode/src/main/java/org/apache/rocketmq/snode/client/SubscriptionManager.java
@@ -29,11 +29,6 @@ public interface SubscriptionManager {
     boolean subscribe(String groupId, Set<SubscriptionData> 
subscriptionDataSet, ConsumeType consumeType,
         MessageModel messageModel, ConsumeFromWhere consumeFromWhere);
 
-    void unSubscribe(String groupId, RemotingChannel remotingChannel,
-        Set<SubscriptionData> subscriptionDataSet);
-
-    void cleanSubscription(String groupId, String topic);
-
     Subscription getSubscription(String groupId);
 
     void registerPushSession(Set<SubscriptionData> subscriptionDataSet, 
RemotingChannel remotingChannel,
diff --git 
a/snode/src/main/java/org/apache/rocketmq/snode/client/impl/SubscriptionManagerImpl.java
 
b/snode/src/main/java/org/apache/rocketmq/snode/client/impl/SubscriptionManagerImpl.java
index 1f5ecd8..4a4a35e 100644
--- 
a/snode/src/main/java/org/apache/rocketmq/snode/client/impl/SubscriptionManagerImpl.java
+++ 
b/snode/src/main/java/org/apache/rocketmq/snode/client/impl/SubscriptionManagerImpl.java
@@ -44,7 +44,7 @@ public class SubscriptionManagerImpl implements 
SubscriptionManager {
     @Override
     public void registerPushSession(Set<SubscriptionData> subscriptionDataSet, 
RemotingChannel remotingChannel,
         String groupId) {
-        log.debug("Before ConsumerGroup: {} RemotingChannel: {} subscription: 
{}", groupId, remotingChannel.remoteAddress(), subscriptionDataSet);
+        log.info("Before ConsumerGroup: {} RemotingChannel: {} subscription: 
{}", groupId, remotingChannel.remoteAddress(), subscriptionDataSet);
         Set<MessageQueue> prevSubSet = 
this.clientSubscriptionTable.get(remotingChannel);
         Set<MessageQueue> keySet = new HashSet<>();
         for (SubscriptionData subscriptionData : subscriptionDataSet) {
@@ -77,7 +77,7 @@ public class SubscriptionManagerImpl implements 
SubscriptionManager {
                 }
             }
         }
-        log.debug("After ConsumerGroup: {} RemotingChannel: {} subscription: 
{}", groupId, remotingChannel.remoteAddress(), 
this.clientSubscriptionTable.get(remotingChannel));
+        log.info("After ConsumerGroup: {} RemotingChannel: {} subscription: 
{}", groupId, remotingChannel.remoteAddress(), 
this.clientSubscriptionTable.get(remotingChannel));
     }
 
     @Override
@@ -192,17 +192,6 @@ public class SubscriptionManagerImpl implements 
SubscriptionManager {
     }
 
     @Override
-    public void unSubscribe(String groupId, RemotingChannel remotingChannel,
-        Set<SubscriptionData> subscriptionDataSet) {
-
-    }
-
-    @Override
-    public void cleanSubscription(String groupId, String topic) {
-
-    }
-
-    @Override
     public Subscription getSubscription(String groupId) {
         return groupSubscriptionTable.get(groupId);
     }
diff --git 
a/snode/src/main/java/org/apache/rocketmq/snode/offset/ConsumerOffsetManager.java
 
b/snode/src/main/java/org/apache/rocketmq/snode/offset/ConsumerOffsetManager.java
index d9b23b4..ba2a800 100644
--- 
a/snode/src/main/java/org/apache/rocketmq/snode/offset/ConsumerOffsetManager.java
+++ 
b/snode/src/main/java/org/apache/rocketmq/snode/offset/ConsumerOffsetManager.java
@@ -20,13 +20,9 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.protocol.ResponseCode;
-import 
org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetResponseHeader;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
-import org.apache.rocketmq.remoting.exception.RemotingConnectException;
-import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
-import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
-import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.remoting.RemotingChannel;
 import org.apache.rocketmq.snode.SnodeController;
 import org.apache.rocketmq.snode.exception.SnodeException;
 
@@ -79,17 +75,18 @@ public class ConsumerOffsetManager {
         }
     }
 
-    private long parserOffset(final String enodeName, final String group, 
final String topic, final int queueId) {
-        try {
-            RemotingCommand remotingCommand = queryOffset(enodeName, group, 
topic, queueId);
-            QueryConsumerOffsetResponseHeader responseHeader =
-                (QueryConsumerOffsetResponseHeader) 
remotingCommand.decodeCommandCustomHeader(QueryConsumerOffsetResponseHeader.class);
-            return responseHeader.getOffset();
-        } catch (Exception ex) {
-            log.error("Load offset from broker error", ex);
-        }
-        return -1;
-    }
+//    private long parserOffset(final RemotingChannel remotingChannel, final 
String enodeName, final String group,
+//        final String topic, final int queueId) {
+//        try {
+//            RemotingCommand remotingCommand = queryOffset(remotingChannel, 
enodeName, group, topic, queueId);
+//            QueryConsumerOffsetResponseHeader responseHeader =
+//                (QueryConsumerOffsetResponseHeader) 
remotingCommand.decodeCommandCustomHeader(QueryConsumerOffsetResponseHeader.class);
+//            return responseHeader.getOffset();
+//        } catch (Exception ex) {
+//            log.error("Load offset from broker error", ex);
+//        }
+//        return -1;
+//    }
 
     public long queryCacheOffset(final String enodeName, final String group, 
final String topic, final int queueId) {
         String key = buildKey(enodeName, topic, group);
@@ -99,30 +96,33 @@ public class ConsumerOffsetManager {
             map = this.offsetTable.putIfAbsent(key, map);
         }
         CacheOffset cacheOffset = map.get(queueId);
-        if (cacheOffset != null) {
-            if (System.currentTimeMillis() - cacheOffset.getUpdateTimestamp() 
> snodeController.getSnodeConfig().getLoadOffsetInterval()) {
-                cacheOffset.setOffset(parserOffset(enodeName, group, topic, 
queueId));
-                cacheOffset.setUpdateTimestamp(System.currentTimeMillis());
+        try {
+            if (cacheOffset != null) {
+                if 
(!this.snodeController.getSnodeConfig().isEmbeddedModeEnable() && 
System.currentTimeMillis() - cacheOffset.getUpdateTimestamp() > 
snodeController.getSnodeConfig().getLoadOffsetInterval()) {
+                    long offset = 
this.snodeController.getEnodeService().queryOffset(enodeName, group, topic, 
queueId);
+                    cacheOffset.setOffset(offset);
+                    cacheOffset.setUpdateTimestamp(System.currentTimeMillis());
+                } else {
+                    long offset = 
this.snodeController.getEnodeService().queryOffset(enodeName, group, topic, 
queueId);
+                    cacheOffset.setOffset(offset);
+                }
+            } else {
+                long offset = 
this.snodeController.getEnodeService().queryOffset(enodeName, group, topic, 
queueId);
+                cacheOffset = new CacheOffset(key, offset, 
System.currentTimeMillis());
+                map.put(queueId, cacheOffset);
             }
-        } else {
-            cacheOffset = new CacheOffset(key, parserOffset(enodeName, group, 
topic, queueId), System.currentTimeMillis());
-            map.put(queueId, cacheOffset);
+        } catch (Exception ex) {
+            log.warn("Load offset error, enodeName: {}, group:{},topic:{} 
queueId:{}", enodeName, group, topic, queueId);
         }
         return cacheOffset.getOffset();
-
     }
 
-    public void commitOffset(final String enodeName, final String clientHost, 
final String group, final String topic,
+    public void commitOffset(final RemotingChannel remotingChannel, final 
String enodeName, final String clientHost,
+        final String group, final String topic,
         final int queueId,
         final long offset) {
         cacheOffset(enodeName, clientHost, group, topic, queueId, offset);
-        this.snodeController.getEnodeService().persistOffset(enodeName, group, 
topic, queueId, offset);
-    }
-
-    public RemotingCommand queryOffset(final String enodeName, final String 
group, final String topic,
-        final int queueId) throws InterruptedException, 
RemotingTimeoutException,
-        RemotingSendRequestException, RemotingConnectException {
-        return this.snodeController.getEnodeService().loadOffset(enodeName, 
group, topic, queueId);
+        this.snodeController.getEnodeService().persistOffset(remotingChannel, 
enodeName, group, topic, queueId, offset);
     }
 
     public class CacheOffset {
diff --git 
a/snode/src/main/java/org/apache/rocketmq/snode/processor/ConsumerManageProcessor.java
 
b/snode/src/main/java/org/apache/rocketmq/snode/processor/ConsumerManageProcessor.java
index a000589..1953de1 100644
--- 
a/snode/src/main/java/org/apache/rocketmq/snode/processor/ConsumerManageProcessor.java
+++ 
b/snode/src/main/java/org/apache/rocketmq/snode/processor/ConsumerManageProcessor.java
@@ -25,9 +25,13 @@ import 
org.apache.rocketmq.common.protocol.header.GetConsumerListByGroupRequestH
 import 
org.apache.rocketmq.common.protocol.header.GetConsumerListByGroupResponseBody;
 import 
org.apache.rocketmq.common.protocol.header.GetConsumerListByGroupResponseHeader;
 import org.apache.rocketmq.common.protocol.header.GetMaxOffsetRequestHeader;
+import org.apache.rocketmq.common.protocol.header.GetMaxOffsetResponseHeader;
 import org.apache.rocketmq.common.protocol.header.GetMinOffsetRequestHeader;
+import org.apache.rocketmq.common.protocol.header.GetMinOffsetResponseHeader;
 import 
org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetRequestHeader;
+import 
org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetResponseHeader;
 import org.apache.rocketmq.common.protocol.header.SearchOffsetRequestHeader;
+import org.apache.rocketmq.common.protocol.header.SearchOffsetResponseHeader;
 import 
org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader;
 import 
org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetResponseHeader;
 import org.apache.rocketmq.logging.InternalLogger;
@@ -87,7 +91,14 @@ public class ConsumerManageProcessor implements 
RequestProcessor {
             (SearchOffsetRequestHeader) request
                 .decodeCommandCustomHeader(SearchOffsetRequestHeader.class);
         try {
-            return 
this.snodeController.getEnodeService().getOffsetByTimestamp(requestHeader.getEnodeName(),
 request);
+            final RemotingCommand response = 
RemotingCommand.createResponseCommand(SearchOffsetResponseHeader.class);
+            final SearchOffsetResponseHeader responseHeader = 
(SearchOffsetResponseHeader) response.readCustomHeader();
+
+            long offset = 
this.snodeController.getEnodeService().getOffsetByTimestamp(requestHeader.getEnodeName(),
+                requestHeader.getTopic(), requestHeader.getQueueId(), 
requestHeader.getTimestamp(), request);
+            responseHeader.setOffset(offset);
+            response.setCode(ResponseCode.SUCCESS);
+            response.setRemark(null);
         } catch (Exception ex) {
             log.error("Search offset by timestamp error:{}", ex);
         }
@@ -100,7 +111,14 @@ public class ConsumerManageProcessor implements 
RequestProcessor {
             (GetMinOffsetRequestHeader) request
                 .decodeCommandCustomHeader(GetMinOffsetRequestHeader.class);
         try {
-            return 
this.snodeController.getEnodeService().getMinOffsetInQueue(requestHeader.getEnodeName(),
 requestHeader.getTopic(), requestHeader.getQueueId());
+            final RemotingCommand response = 
RemotingCommand.createResponseCommand(GetMinOffsetResponseHeader.class);
+            final GetMinOffsetResponseHeader responseHeader = 
(GetMinOffsetResponseHeader) response.readCustomHeader();
+
+            long offset = 
this.snodeController.getEnodeService().getMinOffsetInQueue(requestHeader.getEnodeName(),
+                requestHeader.getTopic(), requestHeader.getQueueId(), request);
+            responseHeader.setOffset(offset);
+            response.setCode(ResponseCode.SUCCESS);
+            response.setRemark(null);
         } catch (Exception ex) {
             log.error("Get min offset error:{}", ex);
         }
@@ -113,9 +131,16 @@ public class ConsumerManageProcessor implements 
RequestProcessor {
             (GetMaxOffsetRequestHeader) request
                 .decodeCommandCustomHeader(GetMaxOffsetRequestHeader.class);
         try {
-            return 
this.snodeController.getEnodeService().getMaxOffsetInQueue(requestHeader.getEnodeName(),
 request);
+            long offset = 
this.snodeController.getEnodeService().getMaxOffsetInQueue(requestHeader.getEnodeName(),
+                requestHeader.getTopic(), requestHeader.getQueueId(), request);
+            final RemotingCommand response = 
RemotingCommand.createResponseCommand(GetMaxOffsetResponseHeader.class);
+            final GetMaxOffsetResponseHeader responseHeader = 
(GetMaxOffsetResponseHeader) response.readCustomHeader();
+            responseHeader.setOffset(offset);
+            response.setCode(ResponseCode.SUCCESS);
+            response.setRemark(null);
+            return response;
         } catch (Exception ex) {
-            log.error("Get min offset error:{}", ex);
+            log.error("Get min offset error, remoting: {} error: {} ", 
remotingChannel.remoteAddress(), ex);
         }
         return null;
     }
@@ -153,7 +178,7 @@ public class ConsumerManageProcessor implements 
RequestProcessor {
         final UpdateConsumerOffsetRequestHeader requestHeader =
             (UpdateConsumerOffsetRequestHeader) request
                 
.decodeCommandCustomHeader(UpdateConsumerOffsetRequestHeader.class);
-        
this.snodeController.getConsumerOffsetManager().commitOffset(requestHeader.getEnodeName(),
 RemotingHelper.parseChannelRemoteAddr(remotingChannel.remoteAddress()), 
requestHeader.getConsumerGroup(),
+        
this.snodeController.getConsumerOffsetManager().commitOffset(remotingChannel, 
requestHeader.getEnodeName(), 
RemotingHelper.parseChannelRemoteAddr(remotingChannel.remoteAddress()), 
requestHeader.getConsumerGroup(),
             requestHeader.getTopic(), requestHeader.getQueueId(), 
requestHeader.getCommitOffset());
         response.setCode(ResponseCode.SUCCESS);
         response.setRemark(null);
@@ -171,9 +196,15 @@ public class ConsumerManageProcessor implements 
RequestProcessor {
             requestHeader.getConsumerGroup(),
             requestHeader.getTopic(),
             requestHeader.getQueueId());
-        return 
this.snodeController.getConsumerOffsetManager().queryOffset(requestHeader.getEnodeName(),
 requestHeader.getConsumerGroup(), requestHeader.getTopic(),
+        long offset = 
this.snodeController.getEnodeService().queryOffset(requestHeader.getEnodeName(),
 requestHeader.getConsumerGroup(), requestHeader.getTopic(),
             requestHeader.getQueueId());
 
+        final RemotingCommand response = 
RemotingCommand.createResponseCommand(QueryConsumerOffsetResponseHeader.class);
+        final QueryConsumerOffsetResponseHeader responseHeader = 
(QueryConsumerOffsetResponseHeader) response.readCustomHeader();
+        responseHeader.setOffset(offset);
+        response.setCode(ResponseCode.SUCCESS);
+        response.setRemark(null);
+        return response;
     }
 
     public RemotingCommand createRetryTopic(RemotingChannel remotingChannel,
@@ -181,7 +212,7 @@ public class ConsumerManageProcessor implements 
RequestProcessor {
         RemotingSendRequestException, RemotingConnectException, 
RemotingCommandException {
         final CreateRetryTopicRequestHeader requestHeader = 
(CreateRetryTopicRequestHeader) 
request.decodeCommandCustomHeader(CreateRetryTopicRequestHeader.class);
         requestHeader.getEnodeName();
-        return 
this.snodeController.getEnodeService().creatRetryTopic(requestHeader.getEnodeName(),
 request);
+        return 
this.snodeController.getEnodeService().creatRetryTopic(remotingChannel, 
requestHeader.getEnodeName(), request);
     }
 }
 
diff --git 
a/snode/src/main/java/org/apache/rocketmq/snode/processor/PullMessageProcessor.java
 
b/snode/src/main/java/org/apache/rocketmq/snode/processor/PullMessageProcessor.java
index fc13f5e..95982bc 100644
--- 
a/snode/src/main/java/org/apache/rocketmq/snode/processor/PullMessageProcessor.java
+++ 
b/snode/src/main/java/org/apache/rocketmq/snode/processor/PullMessageProcessor.java
@@ -127,14 +127,16 @@ public class PullMessageProcessor implements 
RequestProcessor {
             }
         }
 
-        CompletableFuture<RemotingCommand> responseFuture = 
snodeController.getEnodeService().pullMessage(requestHeader.getEnodeName(), 
request);
+        CompletableFuture<RemotingCommand> responseFuture = 
snodeController.getEnodeService().pullMessage(remotingChannel, 
requestHeader.getEnodeName(), request);
         responseFuture.whenComplete((data, ex) -> {
             if (ex == null) {
                 if (this.snodeController.getConsumeMessageInterceptorGroup() 
!= null) {
                     ResponseContext responseContext = new 
ResponseContext(request, remotingChannel, data);
                     
this.snodeController.getSendMessageInterceptorGroup().afterRequest(responseContext);
                 }
-                remotingChannel.reply(data);
+                if (data != null) {
+                    remotingChannel.reply(data);
+                }
             } else {
                 if (this.snodeController.getConsumeMessageInterceptorGroup() 
!= null) {
                     ExceptionContext exceptionContext = new 
ExceptionContext(request, remotingChannel, ex, null);
diff --git 
a/snode/src/main/java/org/apache/rocketmq/snode/processor/SendMessageProcessor.java
 
b/snode/src/main/java/org/apache/rocketmq/snode/processor/SendMessageProcessor.java
index c3fd2fe..019794e 100644
--- 
a/snode/src/main/java/org/apache/rocketmq/snode/processor/SendMessageProcessor.java
+++ 
b/snode/src/main/java/org/apache/rocketmq/snode/processor/SendMessageProcessor.java
@@ -83,11 +83,11 @@ public class SendMessageProcessor implements 
RequestProcessor {
             
stringBuffer.append(MixAll.getRetryTopic(consumerSendMsgBackRequestHeader.getGroup()));
         }
 
-        CompletableFuture<RemotingCommand> responseFuture = 
snodeController.getEnodeService().sendMessage(enodeName, request);
+        CompletableFuture<RemotingCommand> responseFuture = 
snodeController.getEnodeService().sendMessage(remotingChannel, enodeName, 
request);
 
         sendMessageRequestHeaderV2.setO(remotingChannel.remoteAddress());
         final byte[] message = request.getBody();
-        final boolean isNeedPush = !isSendBack;
+        final boolean needPush = !isSendBack;
         final SendMessageRequestHeader sendMessageRequestHeader =
             
SendMessageRequestHeaderV2.createSendMessageRequestHeaderV1(sendMessageRequestHeaderV2);
         responseFuture.whenComplete((data, ex) -> {
@@ -98,7 +98,7 @@ public class SendMessageProcessor implements RequestProcessor 
{
                 }
                 remotingChannel.reply(data);
                 
this.snodeController.getMetricsService().recordRequestSize(stringBuffer.toString(),
 request.getBody().length);
-                if (data.getCode() == ResponseCode.SUCCESS && isNeedPush) {
+                if (data.getCode() == ResponseCode.SUCCESS && needPush) {
                     
this.snodeController.getPushService().pushMessage(sendMessageRequestHeader, 
message, data);
                 }
             } else {
diff --git 
a/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttPingreqMessageHandler.java
 
b/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttPingreqMessageHandler.java
index 11f8f18..1f75fcc 100644
--- 
a/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttPingreqMessageHandler.java
+++ 
b/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttPingreqMessageHandler.java
@@ -29,6 +29,7 @@ public class MqttPingreqMessageHandler implements 
MessageHandler {
     public MqttPingreqMessageHandler(SnodeController snodeController) {
         this.snodeController = snodeController;
     }
+
     /**
      * handle the PINGREQ message from client
      * <ol>
@@ -41,7 +42,8 @@ public class MqttPingreqMessageHandler implements 
MessageHandler {
      * @param message
      * @return
      */
-    @Override public RemotingCommand handleMessage(MqttMessage message, 
RemotingChannel remotingChannel) {
+    @Override
+    public RemotingCommand handleMessage(MqttMessage message, RemotingChannel 
remotingChannel) {
         return null;
     }
 }
diff --git 
a/snode/src/main/java/org/apache/rocketmq/snode/service/AdminService.java 
b/snode/src/main/java/org/apache/rocketmq/snode/service/AdminService.java
index 560233a..774a2a6 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/service/AdminService.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/service/AdminService.java
@@ -1,4 +1,4 @@
-package org.apache.rocketmq.snode.service;/*
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -14,7 +14,7 @@ package org.apache.rocketmq.snode.service;/*
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
+package org.apache.rocketmq.snode.service;
 public interface AdminService {
 
 }
diff --git 
a/snode/src/main/java/org/apache/rocketmq/snode/service/EnodeService.java 
b/snode/src/main/java/org/apache/rocketmq/snode/service/EnodeService.java
index a4fa28c..e2db16e 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/service/EnodeService.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/service/EnodeService.java
@@ -21,6 +21,7 @@ import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
 import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeaderV2;
 import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
+import org.apache.rocketmq.remoting.RemotingChannel;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 import org.apache.rocketmq.remoting.exception.RemotingConnectException;
 import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
@@ -42,7 +43,8 @@ public interface EnodeService {
      * @param request {@link SendMessageRequestHeaderV2} Send message request 
header
      * @return Send message response future
      */
-    CompletableFuture<RemotingCommand> sendMessage(final String enodeName, 
final RemotingCommand request);
+    CompletableFuture<RemotingCommand> sendMessage(final RemotingChannel 
remotingChannel, final String enodeName,
+        final RemotingCommand request);
 
     /**
      * Pull message from enode server.
@@ -51,7 +53,8 @@ public interface EnodeService {
      * @param request {@link PullMessageRequestHeader} Pull message request 
header
      * @return Pull message Response future
      */
-    CompletableFuture<RemotingCommand> pullMessage(final String enodeName, 
final RemotingCommand request);
+    CompletableFuture<RemotingCommand> pullMessage(final RemotingChannel 
remotingChannel, final String enodeName,
+        final RemotingCommand request);
 
     /**
      * Create retry topic in enode server.
@@ -64,7 +67,7 @@ public interface EnodeService {
      * @throws RemotingSendRequestException
      * @throws RemotingConnectException
      */
-    RemotingCommand creatRetryTopic(String enodeName,
+    RemotingCommand creatRetryTopic(final RemotingChannel remotingChannel, 
String enodeName,
         RemotingCommand request) throws InterruptedException, 
RemotingTimeoutException, RemotingSendRequestException, 
RemotingConnectException;
 
     /**
@@ -98,20 +101,24 @@ public interface EnodeService {
      * @param queueId QueueId of related topic.
      * @param offset Current offset of target queue of subscribed topic.
      */
-    void persistOffset(String enodeName, String groupName, String topic, int 
queueId, long offset);
+    void persistOffset(final RemotingChannel remotingChannel, String 
enodeName, String groupName, String topic,
+        int queueId, long offset);
 
-    RemotingCommand loadOffset(String enodeName, String consumerGroup, String 
topic,
-        int queueId) throws InterruptedException, RemotingTimeoutException,
-        RemotingSendRequestException, RemotingConnectException;
+    long queryOffset(String enodeName, String consumerGroup,
+        String topic, int queueId) throws InterruptedException, 
RemotingTimeoutException,
+        RemotingSendRequestException, RemotingConnectException, 
RemotingCommandException;
 
-    RemotingCommand getMaxOffsetInQueue(String enodeName,
-        RemotingCommand request) throws InterruptedException, 
RemotingTimeoutException,
+    long getMaxOffsetInQueue(String enodeName, String topic,
+        int queueId, RemotingCommand request) throws InterruptedException, 
RemotingTimeoutException,
         RemotingSendRequestException, RemotingConnectException, 
RemotingCommandException;
 
-    RemotingCommand getMinOffsetInQueue(String enodeName, String topic,
-        int queueId) throws InterruptedException, RemotingTimeoutException,
+    long getMinOffsetInQueue(String enodeName, String topic,
+        int queueId, RemotingCommand request) throws InterruptedException, 
RemotingTimeoutException,
         RemotingSendRequestException, RemotingConnectException, 
RemotingCommandException;
 
-    RemotingCommand getOffsetByTimestamp(String enodeName,
-        RemotingCommand request) throws InterruptedException, 
RemotingTimeoutException, RemotingSendRequestException, 
RemotingConnectException;
+    long getOffsetByTimestamp(String enodeName,
+        String topic, int queueId,
+        long timestamp,
+        RemotingCommand request) throws InterruptedException, 
RemotingTimeoutException, RemotingSendRequestException,
+        RemotingConnectException, RemotingCommandException;
 }
diff --git 
a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/ClientServiceImpl.java
 
b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/ClientServiceImpl.java
index f170ae1..aaed690 100644
--- 
a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/ClientServiceImpl.java
+++ 
b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/ClientServiceImpl.java
@@ -16,6 +16,7 @@
  */
 
 package org.apache.rocketmq.snode.service.impl;
+
 import java.util.List;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.protocol.RequestCode;
diff --git 
a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/LocalEnodeServiceImpl.java
 
b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/LocalEnodeServiceImpl.java
new file mode 100644
index 0000000..5789104
--- /dev/null
+++ 
b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/LocalEnodeServiceImpl.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.snode.service.impl;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.RemotingChannel;
+import org.apache.rocketmq.remoting.netty.CodecHelper;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.snode.service.EnodeService;
+
+public class LocalEnodeServiceImpl implements EnodeService {
+
+    private static final InternalLogger log = 
InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME);
+
+    private BrokerController brokerController;
+
+    public LocalEnodeServiceImpl(BrokerController brokerController) {
+        this.brokerController = brokerController;
+    }
+
+    @Override public void sendHeartbeat(RemotingCommand remotingCommand) {
+        return;
+    }
+
+    @Override
+    public CompletableFuture<RemotingCommand> sendMessage(final 
RemotingChannel remotingChannel, String enodeName,
+        RemotingCommand request) {
+        CompletableFuture<RemotingCommand> completableFuture = new 
CompletableFuture<>();
+        try {
+            log.debug("Send message request:{}", request);
+            RemotingCommand remotingCommand = 
this.brokerController.getSendProcessor().processRequest(remotingChannel, 
request);
+            CodecHelper.encodeHeader(remotingCommand);
+            completableFuture.complete(remotingCommand);
+        } catch (Exception ex) {
+            log.error("[Local]Request local enode send message error", ex);
+            completableFuture.completeExceptionally(ex);
+        }
+        return completableFuture;
+    }
+
+    @Override
+    public CompletableFuture<RemotingCommand> pullMessage(RemotingChannel 
remotingChannel, String enodeName,
+        RemotingCommand request) {
+        CompletableFuture<RemotingCommand> completableFuture = new 
CompletableFuture<>();
+        try {
+            RemotingCommand response = 
this.brokerController.getSnodePullMessageProcessor().processRequest(remotingChannel,
 request);
+            completableFuture.complete(response);
+        } catch (Exception ex) {
+            log.error("[Local]Request local enode pull message error", ex);
+            completableFuture.completeExceptionally(ex);
+        }
+        return completableFuture;
+    }
+
+    @Override public RemotingCommand creatRetryTopic(RemotingChannel 
remotingChannel, String enodeName,
+        RemotingCommand request) {
+        try {
+            return 
this.brokerController.getClientManageProcessor().processRequest(remotingChannel,
 request);
+        } catch (Exception ex) {
+            log.error("[Local]Request create retry topic error", ex);
+        }
+        return null;
+    }
+
+    @Override public void updateEnodeAddress(
+        String clusterName) {
+
+    }
+
+    @Override public boolean persistSubscriptionGroupConfig(
+        final SubscriptionGroupConfig subscriptionGroupConfig) {
+        boolean persist = false;
+        if (subscriptionGroupConfig != null) {
+            
this.brokerController.getSubscriptionGroupManager().updateSubscriptionGroupConfig(subscriptionGroupConfig);
+            persist = true;
+        }
+        return persist;
+    }
+
+    @Override
+    public void persistOffset(RemotingChannel remotingChannel, String 
enodeName, String groupName, String topic,
+        int queueId, long offset) {
+        try {
+//            UpdateConsumerOffsetRequestHeader requestHeader = new 
UpdateConsumerOffsetRequestHeader();
+//            requestHeader.setConsumerGroup(groupName);
+//            requestHeader.setTopic(topic);
+//            requestHeader.setQueueId(queueId);
+//            requestHeader.setCommitOffset(offset);
+//            RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.UPDATE_CONSUMER_OFFSET, 
requestHeader);
+//            
this.brokerController.getConsumerManageProcessor().processRequest(remotingChannel,
 request);
+
+            
this.brokerController.getConsumerOffsetManager().commitOffset(remotingChannel.remoteAddress().toString(),
 groupName,
+                topic, queueId, offset);
+        } catch (Exception ex) {
+            log.error("[Local]Persist offset to Enode error group: [{}], 
topic: [{}] queue: [{}]!", ex, groupName, topic, queueId);
+        }
+    }
+
+    @Override
+    public long queryOffset(String enodeName, String consumerGroup, String 
topic, int queueId) {
+        return 
this.brokerController.getConsumerOffsetManager().queryOffset(consumerGroup, 
topic, queueId);
+    }
+
+    @Override
+    public long getMaxOffsetInQueue(String enodeName, String topic, int 
queueId, RemotingCommand request) {
+        return 
this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);
+    }
+
+    @Override
+    public long getMinOffsetInQueue(String enodeName, String topic, int 
queueId, RemotingCommand request) {
+        return 
this.brokerController.getMessageStore().getMinOffsetInQueue(topic, queueId);
+    }
+
+    @Override
+    public long getOffsetByTimestamp(String enodeName,
+        String topic, int queueId, long timestamp, RemotingCommand request) {
+        return 
this.brokerController.getMessageStore().getOffsetInQueueByTime(topic, queueId, 
timestamp);
+    }
+}
diff --git 
a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/PushServiceImpl.java
 
b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/PushServiceImpl.java
index 72f658e..9f4b016 100644
--- 
a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/PushServiceImpl.java
+++ 
b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/PushServiceImpl.java
@@ -95,7 +95,7 @@ public class PushServiceImpl implements PushService {
             messageExt.setFlag(sendMessageRequestHeader.getFlag());
             messageExt.setBody(message);
             messageExt.setBodyCRC(UtilAll.crc32(message));
-            log.debug("MessageExt:{}", messageExt);
+            log.info("MessageExt:{}", messageExt);
             return messageExt;
         }
 
@@ -103,7 +103,6 @@ public class PushServiceImpl implements PushService {
         public void run() {
             if (!canceled.get()) {
                 try {
-                    log.debug("sendMessageResponse: {}", sendMessageResponse);
                     SendMessageResponseHeader sendMessageResponseHeader = 
(SendMessageResponseHeader) 
sendMessageResponse.decodeCommandCustomHeader(SendMessageResponseHeader.class);
                     log.debug("sendMessageResponseHeader: {}", 
sendMessageResponseHeader);
                     MessageQueue messageQueue = new 
MessageQueue(sendMessageRequestHeader.getTopic(), 
sendMessageRequestHeader.getEnodeName(), sendMessageRequestHeader.getQueueId());
@@ -118,6 +117,7 @@ public class PushServiceImpl implements PushService {
                         MessageExt messageExt = 
buildMessageExt(sendMessageResponseHeader, message, sendMessageRequestHeader);
                         pushMessage.setBody(MessageDecoder.encode(messageExt, 
false));
                         for (RemotingChannel remotingChannel : consumerTable) {
+                            log.info("Push message pushMessage:{} to 
remotingChannel: {}", pushMessage, remotingChannel);
                             Client client = null;
                             if (remotingChannel instanceof NettyChannelImpl) {
                                 Channel channel = ((NettyChannelImpl) 
remotingChannel).getChannel();
diff --git 
a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/EnodeServiceImpl.java
 
b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/RemoteEnodeServiceImpl.java
similarity index 77%
rename from 
snode/src/main/java/org/apache/rocketmq/snode/service/impl/EnodeServiceImpl.java
rename to 
snode/src/main/java/org/apache/rocketmq/snode/service/impl/RemoteEnodeServiceImpl.java
index 4d1e522..311406f 100644
--- 
a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/EnodeServiceImpl.java
+++ 
b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/RemoteEnodeServiceImpl.java
@@ -28,13 +28,17 @@ import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.protocol.RequestCode;
 import org.apache.rocketmq.common.protocol.ResponseCode;
 import org.apache.rocketmq.common.protocol.body.ClusterInfo;
-import org.apache.rocketmq.common.protocol.header.GetMinOffsetRequestHeader;
+import org.apache.rocketmq.common.protocol.header.GetMaxOffsetResponseHeader;
+import org.apache.rocketmq.common.protocol.header.GetMinOffsetResponseHeader;
 import 
org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetRequestHeader;
+import 
org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetResponseHeader;
+import org.apache.rocketmq.common.protocol.header.SearchOffsetResponseHeader;
 import 
org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader;
 import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
 import org.apache.rocketmq.remoting.InvokeCallback;
+import org.apache.rocketmq.remoting.RemotingChannel;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 import org.apache.rocketmq.remoting.exception.RemotingConnectException;
 import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
@@ -46,7 +50,7 @@ import org.apache.rocketmq.snode.SnodeController;
 import org.apache.rocketmq.snode.constant.SnodeConstant;
 import org.apache.rocketmq.snode.service.EnodeService;
 
-public class EnodeServiceImpl implements EnodeService {
+public class RemoteEnodeServiceImpl implements EnodeService {
     private static final InternalLogger log = 
InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME);
 
     private SnodeController snodeController;
@@ -54,7 +58,7 @@ public class EnodeServiceImpl implements EnodeService {
     private final ConcurrentMap<String/* Broker Name */, HashMap<Long/* 
brokerId */, String/* address */>> enodeTable =
         new ConcurrentHashMap<>();
 
-    public EnodeServiceImpl(SnodeController snodeController) {
+    public RemoteEnodeServiceImpl(SnodeController snodeController) {
         this.snodeController = snodeController;
     }
 
@@ -73,7 +77,8 @@ public class EnodeServiceImpl implements EnodeService {
     }
 
     @Override
-    public CompletableFuture<RemotingCommand> pullMessage(final String 
enodeName, final RemotingCommand request) {
+    public CompletableFuture<RemotingCommand> pullMessage(final 
RemotingChannel remotingChannel, final String enodeName,
+        final RemotingCommand request) {
 
         CompletableFuture<RemotingCommand> future = new CompletableFuture<>();
         try {
@@ -103,7 +108,8 @@ public class EnodeServiceImpl implements EnodeService {
     }
 
     @Override
-    public CompletableFuture<RemotingCommand> sendMessage(String enodeName, 
RemotingCommand request) {
+    public CompletableFuture<RemotingCommand> sendMessage(final 
RemotingChannel remotingChannel, String enodeName,
+        RemotingCommand request) {
         CompletableFuture<RemotingCommand> future = new CompletableFuture<>();
         try {
             String enodeAddress = 
this.snodeController.getNnodeService().getAddressByEnodeName(enodeName, false);
@@ -152,7 +158,8 @@ public class EnodeServiceImpl implements EnodeService {
     }
 
     @Override
-    public boolean persistSubscriptionGroupConfig(SubscriptionGroupConfig 
subscriptionGroupConfig) {
+    public boolean persistSubscriptionGroupConfig(
+        SubscriptionGroupConfig subscriptionGroupConfig) {
         RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_SUBSCRIPTIONGROUP,
 null);
         boolean persist = false;
         for (Map.Entry<String, HashMap<Long, String>> entry : 
enodeTable.entrySet()) {
@@ -177,7 +184,8 @@ public class EnodeServiceImpl implements EnodeService {
     }
 
     @Override
-    public void persistOffset(String enodeName, String groupName, String 
topic, int queueId, long offset) {
+    public void persistOffset(final RemotingChannel remotingChannel, String 
enodeName, String groupName, String topic,
+        int queueId, long offset) {
         try {
             String address = 
this.snodeController.getNnodeService().getAddressByEnodeName(enodeName, false);
             UpdateConsumerOffsetRequestHeader requestHeader = new 
UpdateConsumerOffsetRequestHeader();
@@ -194,22 +202,22 @@ public class EnodeServiceImpl implements EnodeService {
     }
 
     @Override
-    public RemotingCommand getMinOffsetInQueue(String enodeName, String topic,
-        int queueId) throws InterruptedException, RemotingTimeoutException,
-        RemotingSendRequestException, RemotingConnectException {
-        GetMinOffsetRequestHeader requestHeader = new 
GetMinOffsetRequestHeader();
-        requestHeader.setTopic(topic);
-        requestHeader.setQueueId(queueId);
-        RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.GET_MIN_OFFSET, requestHeader);
+    public long getMinOffsetInQueue(String enodeName, String topic,
+        int queueId, RemotingCommand request) throws InterruptedException, 
RemotingTimeoutException,
+        RemotingSendRequestException, RemotingConnectException, 
RemotingCommandException {
         String addr = 
this.snodeController.getNnodeService().getAddressByEnodeName(enodeName, false);
-        return 
this.snodeController.getRemotingClient().invokeSync(MixAll.brokerVIPChannel(snodeController.getSnodeConfig().isVipChannelEnabled(),
 addr),
+        RemotingCommand remotingCommand = 
this.snodeController.getRemotingClient().invokeSync(MixAll.brokerVIPChannel(snodeController.getSnodeConfig().isVipChannelEnabled(),
 addr),
             request, SnodeConstant.DEFAULT_TIMEOUT_MILLS);
+        GetMinOffsetResponseHeader responseHeader =
+            (GetMinOffsetResponseHeader) 
remotingCommand.decodeCommandCustomHeader(GetMinOffsetResponseHeader.class);
+        return responseHeader.getOffset();
+
     }
 
     @Override
-    public RemotingCommand loadOffset(String enodeName, String consumerGroup, 
String topic,
+    public long queryOffset(String enodeName, String consumerGroup, String 
topic,
         int queueId) throws InterruptedException, RemotingTimeoutException,
-        RemotingSendRequestException, RemotingConnectException {
+        RemotingSendRequestException, RemotingConnectException, 
RemotingCommandException {
         QueryConsumerOffsetRequestHeader requestHeader = new 
QueryConsumerOffsetRequestHeader();
         requestHeader.setTopic(topic);
         requestHeader.setConsumerGroup(consumerGroup);
@@ -217,28 +225,39 @@ public class EnodeServiceImpl implements EnodeService {
 
         RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.QUERY_CONSUMER_OFFSET, 
requestHeader);
         String addr = 
this.snodeController.getNnodeService().getAddressByEnodeName(enodeName, false);
-        return 
this.snodeController.getRemotingClient().invokeSync(MixAll.brokerVIPChannel(this.snodeController.getSnodeConfig().isVipChannelEnabled(),
 addr),
+        RemotingCommand remotingCommand = 
this.snodeController.getRemotingClient().invokeSync(MixAll.brokerVIPChannel(this.snodeController.getSnodeConfig().isVipChannelEnabled(),
 addr),
             request, SnodeConstant.DEFAULT_TIMEOUT_MILLS);
+        QueryConsumerOffsetResponseHeader responseHeader =
+            (QueryConsumerOffsetResponseHeader) 
remotingCommand.decodeCommandCustomHeader(QueryConsumerOffsetResponseHeader.class);
+        return responseHeader.getOffset();
     }
 
     @Override
-    public RemotingCommand getMaxOffsetInQueue(String enodeName,
+    public long getMaxOffsetInQueue(String enodeName, String topic,
+        int queueId,
         RemotingCommand request) throws InterruptedException, 
RemotingTimeoutException, RemotingSendRequestException, 
RemotingConnectException, RemotingCommandException {
         String address = 
this.snodeController.getNnodeService().getAddressByEnodeName(enodeName, false);
-        return this.snodeController.getRemotingClient().invokeSync(address,
+        RemotingCommand remotingCommand = 
this.snodeController.getRemotingClient().invokeSync(address,
             request, SnodeConstant.DEFAULT_TIMEOUT_MILLS);
+        GetMaxOffsetResponseHeader responseHeader =
+            (GetMaxOffsetResponseHeader) 
remotingCommand.decodeCommandCustomHeader(GetMaxOffsetResponseHeader.class);
+        return responseHeader.getOffset();
     }
 
     @Override
-    public RemotingCommand getOffsetByTimestamp(String enodeName,
-        RemotingCommand request) throws InterruptedException, 
RemotingTimeoutException, RemotingSendRequestException, 
RemotingConnectException {
+    public long getOffsetByTimestamp(String enodeName, String topic, int 
queueId,
+        long timestamp,
+        RemotingCommand request) throws InterruptedException, 
RemotingTimeoutException, RemotingSendRequestException, 
RemotingConnectException, RemotingCommandException {
         String address = 
this.snodeController.getNnodeService().getAddressByEnodeName(enodeName, false);
-        return this.snodeController.getRemotingClient().invokeSync(address,
+        RemotingCommand remotingCommand = 
this.snodeController.getRemotingClient().invokeSync(address,
             request, SnodeConstant.DEFAULT_TIMEOUT_MILLS);
+        SearchOffsetResponseHeader responseHeader =
+            (SearchOffsetResponseHeader) 
remotingCommand.decodeCommandCustomHeader(SearchOffsetResponseHeader.class);
+        return responseHeader.getOffset();
     }
 
     @Override
-    public RemotingCommand creatRetryTopic(String enodeName,
+    public RemotingCommand creatRetryTopic(final RemotingChannel 
remotingChannel, String enodeName,
         RemotingCommand request) throws InterruptedException, 
RemotingTimeoutException, RemotingSendRequestException, 
RemotingConnectException {
         String address = 
this.snodeController.getNnodeService().getAddressByEnodeName(enodeName, false);
         return this.snodeController.getRemotingClient().invokeSync(address,
diff --git 
a/snode/src/test/java/org/apache/rocketmq/snode/processor/SendMessageProcessorTest.java
 
b/snode/src/test/java/org/apache/rocketmq/snode/processor/SendMessageProcessorTest.java
index e6e56a8..a97f169 100644
--- 
a/snode/src/test/java/org/apache/rocketmq/snode/processor/SendMessageProcessorTest.java
+++ 
b/snode/src/test/java/org/apache/rocketmq/snode/processor/SendMessageProcessorTest.java
@@ -74,7 +74,7 @@ public class SendMessageProcessorTest {
     public void testSendMessageV2ProcessRequest() throws 
RemotingCommandException {
         CompletableFuture<RemotingCommand> future = new CompletableFuture<>();
         RemotingCommand request = createSendMesssageV2Command();
-        when(this.snodeController.getEnodeService().sendMessage(anyString(), 
any(RemotingCommand.class))).thenReturn(future);
+        when(this.snodeController.getEnodeService().sendMessage(null, 
anyString(), any(RemotingCommand.class))).thenReturn(future);
         sendMessageProcessor.processRequest(remotingChannel, request);
     }
 
@@ -83,7 +83,7 @@ public class SendMessageProcessorTest {
         snodeController.setEnodeService(enodeService);
         CompletableFuture<RemotingCommand> future = new CompletableFuture<>();
         RemotingCommand request = createSendBatchMesssageCommand();
-        when(this.snodeController.getEnodeService().sendMessage(anyString(), 
any(RemotingCommand.class))).thenReturn(future);
+        when(this.snodeController.getEnodeService().sendMessage(null, 
anyString(), any(RemotingCommand.class))).thenReturn(future);
         sendMessageProcessor.processRequest(remotingChannel, request);
     }
 
diff --git 
a/snode/src/test/java/org/apache/rocketmq/snode/service/EnodeServiceImplTest.java
 
b/snode/src/test/java/org/apache/rocketmq/snode/service/RemoteEnodeServiceImplTest.java
similarity index 93%
rename from 
snode/src/test/java/org/apache/rocketmq/snode/service/EnodeServiceImplTest.java
rename to 
snode/src/test/java/org/apache/rocketmq/snode/service/RemoteEnodeServiceImplTest.java
index c4075e1..582c16e 100644
--- 
a/snode/src/test/java/org/apache/rocketmq/snode/service/EnodeServiceImplTest.java
+++ 
b/snode/src/test/java/org/apache/rocketmq/snode/service/RemoteEnodeServiceImplTest.java
@@ -30,7 +30,7 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.apache.rocketmq.remoting.transport.rocketmq.NettyRemotingClient;
 import org.apache.rocketmq.snode.SnodeController;
 import org.apache.rocketmq.snode.SnodeTestBase;
-import org.apache.rocketmq.snode.service.impl.EnodeServiceImpl;
+import org.apache.rocketmq.snode.service.impl.RemoteEnodeServiceImpl;
 import org.apache.rocketmq.store.GetMessageResult;
 import org.apache.rocketmq.store.GetMessageStatus;
 import org.junit.Before;
@@ -51,7 +51,7 @@ import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.when;
 
 @RunWith(MockitoJUnitRunner.class)
-public class EnodeServiceImplTest extends SnodeTestBase {
+public class RemoteEnodeServiceImplTest extends SnodeTestBase {
 
     private EnodeService enodeService;
 
@@ -74,7 +74,7 @@ public class EnodeServiceImplTest extends SnodeTestBase {
     public void init() {
         snodeController.setNnodeService(nnodeService);
         snodeController.setRemotingClient(remotingClient);
-        enodeService = new EnodeServiceImpl(snodeController);
+        enodeService = new RemoteEnodeServiceImpl(snodeController);
     }
 
     @Test
@@ -93,7 +93,7 @@ public class EnodeServiceImplTest extends SnodeTestBase {
                 return null;
             }
         }).when(remotingClient).invokeAsync(anyString(), 
any(RemotingCommand.class), anyLong(), any(InvokeCallback.class));
-        RemotingCommand response = enodeService.sendMessage(enodeName, 
createSendMesssageCommand(group, topic)).get(3000L, TimeUnit.MILLISECONDS);
+        RemotingCommand response = enodeService.sendMessage(null, enodeName, 
createSendMesssageCommand(group, topic)).get(3000L, TimeUnit.MILLISECONDS);
         assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
     }
 
@@ -118,7 +118,7 @@ public class EnodeServiceImplTest extends SnodeTestBase {
                 return null;
             }
         }).when(remotingClient).invokeAsync(anyString(), 
any(RemotingCommand.class), anyLong(), any(InvokeCallback.class));
-        RemotingCommand response = enodeService.pullMessage(enodeName, 
createPullMessage()).get(3000L, TimeUnit.MILLISECONDS);
+        RemotingCommand response = enodeService.pullMessage(null, enodeName, 
createPullMessage()).get(3000L, TimeUnit.MILLISECONDS);
         assertThat(response).isNotNull();
         assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
     }

Reply via email to