This is an automated email from the ASF dual-hosted git repository.

yukon pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new c6d3f2615b [ISSUE #8343] Add more test coverage for MQClientAPIImpl 
(#8344)
c6d3f2615b is described below

commit c6d3f2615bf7e2d63b1d6baf8a0d94b1cff05830
Author: yx9o <yangx_s...@163.com>
AuthorDate: Mon Jul 1 13:17:07 2024 +0800

    [ISSUE #8343] Add more test coverage for MQClientAPIImpl (#8344)
---
 .../rocketmq/client/impl/MQClientAPIImplTest.java  | 1849 +++++++++++++++-----
 1 file changed, 1416 insertions(+), 433 deletions(-)

diff --git 
a/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java 
b/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java
index b0876c7c0d..e311e0c9b8 100644
--- 
a/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java
+++ 
b/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java
@@ -16,13 +16,6 @@
  */
 package org.apache.rocketmq.client.impl;
 
-import java.lang.reflect.Field;
-import java.net.InetSocketAddress;
-import java.util.Collections;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.CountDownLatch;
 import org.apache.rocketmq.client.ClientConfig;
 import org.apache.rocketmq.client.consumer.AckCallback;
 import org.apache.rocketmq.client.consumer.AckResult;
@@ -30,6 +23,9 @@ import org.apache.rocketmq.client.consumer.AckStatus;
 import org.apache.rocketmq.client.consumer.PopCallback;
 import org.apache.rocketmq.client.consumer.PopResult;
 import org.apache.rocketmq.client.consumer.PopStatus;
+import org.apache.rocketmq.client.consumer.PullCallback;
+import org.apache.rocketmq.client.consumer.PullResult;
+import org.apache.rocketmq.client.consumer.PullStatus;
 import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.hook.SendMessageContext;
@@ -39,8 +35,10 @@ import org.apache.rocketmq.client.producer.SendCallback;
 import org.apache.rocketmq.client.producer.SendResult;
 import org.apache.rocketmq.client.producer.SendStatus;
 import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.Pair;
 import org.apache.rocketmq.common.PlainAccessConfig;
 import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
 import org.apache.rocketmq.common.message.Message;
 import org.apache.rocketmq.common.message.MessageAccessor;
 import org.apache.rocketmq.common.message.MessageConst;
@@ -49,20 +47,65 @@ import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.message.MessageQueueAssignment;
 import org.apache.rocketmq.common.message.MessageRequestMode;
+import org.apache.rocketmq.common.namesrv.TopAddressing;
+import org.apache.rocketmq.remoting.CommandCustomHeader;
 import org.apache.rocketmq.remoting.InvokeCallback;
 import org.apache.rocketmq.remoting.RemotingClient;
+import org.apache.rocketmq.remoting.common.HeartbeatV2Result;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+import org.apache.rocketmq.remoting.exception.RemotingConnectException;
 import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
 import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
 import org.apache.rocketmq.remoting.netty.NettyClientConfig;
 import org.apache.rocketmq.remoting.netty.ResponseFuture;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
 import org.apache.rocketmq.remoting.protocol.RequestCode;
 import org.apache.rocketmq.remoting.protocol.ResponseCode;
+import org.apache.rocketmq.remoting.protocol.admin.ConsumeStats;
+import org.apache.rocketmq.remoting.protocol.admin.TopicOffset;
+import org.apache.rocketmq.remoting.protocol.admin.TopicStatsTable;
+import org.apache.rocketmq.remoting.protocol.body.AclInfo;
+import org.apache.rocketmq.remoting.protocol.body.BrokerMemberGroup;
+import org.apache.rocketmq.remoting.protocol.body.BrokerReplicasInfo;
+import org.apache.rocketmq.remoting.protocol.body.BrokerStatsData;
+import org.apache.rocketmq.remoting.protocol.body.BrokerStatsItem;
+import org.apache.rocketmq.remoting.protocol.body.ClusterAclVersionInfo;
+import org.apache.rocketmq.remoting.protocol.body.ClusterInfo;
+import org.apache.rocketmq.remoting.protocol.body.Connection;
+import org.apache.rocketmq.remoting.protocol.body.ConsumeMessageDirectlyResult;
+import org.apache.rocketmq.remoting.protocol.body.ConsumeQueueData;
+import org.apache.rocketmq.remoting.protocol.body.ConsumeStatsList;
+import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection;
+import org.apache.rocketmq.remoting.protocol.body.ConsumerRunningInfo;
+import org.apache.rocketmq.remoting.protocol.body.EpochEntryCache;
+import org.apache.rocketmq.remoting.protocol.body.GetConsumerStatusBody;
+import org.apache.rocketmq.remoting.protocol.body.GroupList;
+import org.apache.rocketmq.remoting.protocol.body.HARuntimeInfo;
+import org.apache.rocketmq.remoting.protocol.body.KVTable;
+import org.apache.rocketmq.remoting.protocol.body.LockBatchRequestBody;
+import org.apache.rocketmq.remoting.protocol.body.ProducerConnection;
+import org.apache.rocketmq.remoting.protocol.body.ProducerInfo;
+import org.apache.rocketmq.remoting.protocol.body.ProducerTableInfo;
 import org.apache.rocketmq.remoting.protocol.body.QueryAssignmentResponseBody;
+import 
org.apache.rocketmq.remoting.protocol.body.QueryConsumeQueueResponseBody;
+import org.apache.rocketmq.remoting.protocol.body.QueryConsumeTimeSpanBody;
+import org.apache.rocketmq.remoting.protocol.body.QueryCorrectionOffsetBody;
+import 
org.apache.rocketmq.remoting.protocol.body.QuerySubscriptionResponseBody;
+import org.apache.rocketmq.remoting.protocol.body.QueueTimeSpan;
+import org.apache.rocketmq.remoting.protocol.body.ResetOffsetBody;
+import org.apache.rocketmq.remoting.protocol.body.SubscriptionGroupWrapper;
+import org.apache.rocketmq.remoting.protocol.body.TopicConfigSerializeWrapper;
+import org.apache.rocketmq.remoting.protocol.body.TopicList;
+import org.apache.rocketmq.remoting.protocol.body.UnlockBatchRequestBody;
+import org.apache.rocketmq.remoting.protocol.body.UserInfo;
 import org.apache.rocketmq.remoting.protocol.header.AckMessageRequestHeader;
 import 
org.apache.rocketmq.remoting.protocol.header.ChangeInvisibleTimeRequestHeader;
 import 
org.apache.rocketmq.remoting.protocol.header.ChangeInvisibleTimeResponseHeader;
+import 
org.apache.rocketmq.remoting.protocol.header.EndTransactionRequestHeader;
 import org.apache.rocketmq.remoting.protocol.header.ExtraInfoUtil;
+import 
org.apache.rocketmq.remoting.protocol.header.GetBrokerAclConfigResponseHeader;
 import 
org.apache.rocketmq.remoting.protocol.header.GetConsumerListByGroupResponseBody;
 import 
org.apache.rocketmq.remoting.protocol.header.GetConsumerListByGroupResponseHeader;
 import 
org.apache.rocketmq.remoting.protocol.header.GetEarliestMsgStoretimeResponseHeader;
@@ -70,15 +113,33 @@ import 
org.apache.rocketmq.remoting.protocol.header.GetMaxOffsetResponseHeader;
 import org.apache.rocketmq.remoting.protocol.header.GetMinOffsetResponseHeader;
 import org.apache.rocketmq.remoting.protocol.header.PopMessageRequestHeader;
 import org.apache.rocketmq.remoting.protocol.header.PopMessageResponseHeader;
+import org.apache.rocketmq.remoting.protocol.header.PullMessageRequestHeader;
+import org.apache.rocketmq.remoting.protocol.header.PullMessageResponseHeader;
 import 
org.apache.rocketmq.remoting.protocol.header.QueryConsumerOffsetRequestHeader;
 import 
org.apache.rocketmq.remoting.protocol.header.QueryConsumerOffsetResponseHeader;
+import org.apache.rocketmq.remoting.protocol.header.QueryMessageRequestHeader;
 import org.apache.rocketmq.remoting.protocol.header.SearchOffsetResponseHeader;
 import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeader;
 import org.apache.rocketmq.remoting.protocol.header.SendMessageResponseHeader;
 import 
org.apache.rocketmq.remoting.protocol.header.UpdateConsumerOffsetRequestHeader;
 import 
org.apache.rocketmq.remoting.protocol.header.UpdateConsumerOffsetResponseHeader;
+import 
org.apache.rocketmq.remoting.protocol.header.UpdateGroupForbiddenRequestHeader;
+import 
org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterResponseHeader;
+import 
org.apache.rocketmq.remoting.protocol.header.controller.GetMetaDataResponseHeader;
 import 
org.apache.rocketmq.remoting.protocol.header.namesrv.AddWritePermOfBrokerResponseHeader;
+import 
org.apache.rocketmq.remoting.protocol.header.namesrv.GetKVConfigResponseHeader;
+import 
org.apache.rocketmq.remoting.protocol.header.namesrv.WipeWritePermOfBrokerResponseHeader;
+import org.apache.rocketmq.remoting.protocol.heartbeat.ConsumeType;
+import org.apache.rocketmq.remoting.protocol.heartbeat.HeartbeatData;
 import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
+import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
+import org.apache.rocketmq.remoting.protocol.route.BrokerData;
+import org.apache.rocketmq.remoting.protocol.route.QueueData;
+import org.apache.rocketmq.remoting.protocol.route.TopicRouteData;
+import 
org.apache.rocketmq.remoting.protocol.statictopic.TopicConfigAndQueueMapping;
+import 
org.apache.rocketmq.remoting.protocol.statictopic.TopicQueueMappingDetail;
+import org.apache.rocketmq.remoting.protocol.statictopic.TopicQueueMappingInfo;
+import org.apache.rocketmq.remoting.protocol.subscription.GroupForbidden;
 import 
org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
 import org.assertj.core.api.Assertions;
 import org.junit.Before;
@@ -86,34 +147,74 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.ArgumentMatchers;
 import org.mockito.Mock;
-import org.mockito.invocation.InvocationOnMock;
 import org.mockito.junit.MockitoJUnitRunner;
 import org.mockito.stubbing.Answer;
 
+import java.io.UnsupportedEncodingException;
+import java.lang.reflect.Field;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Fail.failBecauseExceptionWasNotThrown;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 @RunWith(MockitoJUnitRunner.class)
 public class MQClientAPIImplTest {
+
     private MQClientAPIImpl mqClientAPI = new MQClientAPIImpl(new 
NettyClientConfig(), null, null, new ClientConfig());
+
     @Mock
     private RemotingClient remotingClient;
+
     @Mock
     private DefaultMQProducerImpl defaultMQProducerImpl;
 
-    private String brokerAddr = "127.0.0.1";
-    private String brokerName = "DefaultBroker";
-    private String clusterName = "DefaultCluster";
-    private static String group = "FooBarGroup";
-    private static String topic = "FooBar";
-    private Message msg = new Message("FooBar", new byte[] {});
-    private static String clientId = "127.0.0.2@UnitTest";
+    @Mock
+    private RemotingCommand response;
+
+    private final String brokerAddr = "127.0.0.1";
+
+    private final String brokerName = "DefaultBroker";
+
+    private final String clusterName = "DefaultCluster";
+
+    private final String group = "FooBarGroup";
+
+    private final String topic = "FooBar";
+
+    private final Message msg = new Message("FooBar", new byte[]{});
+
+    private final String clientId = "127.0.0.2@UnitTest";
+
+    private final String defaultTopic = "defaultTopic";
+
+    private final String defaultBrokerAddr = "127.0.0.1:10911";
+
+    private final String defaultNsAddr = "127.0.0.1:9876";
+
+    private final long defaultTimeout = 3000L;
 
     @Before
     public void init() throws Exception {
@@ -153,12 +254,9 @@ public class MQClientAPIImplTest {
 
     @Test
     public void testSendMessageSync_Success() throws InterruptedException, 
RemotingException, MQBrokerException {
-        doAnswer(new Answer() {
-            @Override
-            public Object answer(InvocationOnMock mock) throws Throwable {
-                RemotingCommand request = mock.getArgument(1);
-                return createSendMessageSuccessResponse(request);
-            }
+        doAnswer(mock -> {
+            RemotingCommand request = mock.getArgument(1);
+            return createSendMessageSuccessResponse(request);
         }).when(remotingClient).invokeSync(anyString(), 
any(RemotingCommand.class), anyLong());
 
         SendMessageRequestHeader requestHeader = 
createSendMessageRequestHeader();
@@ -173,17 +271,14 @@ public class MQClientAPIImplTest {
     }
 
     @Test
-    public void testSendMessageSync_WithException() throws 
InterruptedException, RemotingException, MQBrokerException {
-        doAnswer(new Answer() {
-            @Override
-            public Object answer(InvocationOnMock mock) throws Throwable {
-                RemotingCommand request = mock.getArgument(1);
-                RemotingCommand response = 
RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
-                response.setCode(ResponseCode.SYSTEM_ERROR);
-                response.setOpaque(request.getOpaque());
-                response.setRemark("Broker is broken.");
-                return response;
-            }
+    public void testSendMessageSync_WithException() throws 
InterruptedException, RemotingException {
+        doAnswer(mock -> {
+            RemotingCommand request = mock.getArgument(1);
+            RemotingCommand response = 
RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
+            response.setCode(ResponseCode.SYSTEM_ERROR);
+            response.setOpaque(request.getOpaque());
+            response.setRemark("Broker is broken.");
+            return response;
         }).when(remotingClient).invokeSync(anyString(), 
any(RemotingCommand.class), anyLong());
 
         SendMessageRequestHeader requestHeader = 
createSendMessageRequestHeader();
@@ -204,16 +299,13 @@ public class MQClientAPIImplTest {
             3 * 1000, CommunicationMode.ASYNC, new SendMessageContext(), 
defaultMQProducerImpl);
         assertThat(sendResult).isNull();
 
-        doAnswer(new Answer() {
-            @Override
-            public Object answer(InvocationOnMock mock) throws Throwable {
-                InvokeCallback callback = mock.getArgument(3);
-                RemotingCommand request = mock.getArgument(1);
-                ResponseFuture responseFuture = new ResponseFuture(null, 
request.getOpaque(), 3 * 1000, null, null);
-                
responseFuture.setResponseCommand(createSendMessageSuccessResponse(request));
-                callback.operationSucceed(responseFuture.getResponseCommand());
-                return null;
-            }
+        doAnswer(mock -> {
+            InvokeCallback callback = mock.getArgument(3);
+            RemotingCommand request = mock.getArgument(1);
+            ResponseFuture responseFuture = new ResponseFuture(null, 
request.getOpaque(), 3 * 1000, null, null);
+            
responseFuture.setResponseCommand(createSendMessageSuccessResponse(request));
+            callback.operationSucceed(responseFuture.getResponseCommand());
+            return null;
         }).when(remotingClient).invokeAsync(anyString(), 
any(RemotingCommand.class), anyLong(), any(InvokeCallback.class));
         SendMessageContext sendMessageContext = new SendMessageContext();
         sendMessageContext.setProducer(new DefaultMQProducerImpl(new 
DefaultMQProducer()));
@@ -266,34 +358,26 @@ public class MQClientAPIImplTest {
     }
 
     @Test
-    public void testCreatePlainAccessConfig_Success() throws 
InterruptedException, RemotingException, MQBrokerException {
-
-        doAnswer(new Answer() {
-            @Override
-            public Object answer(InvocationOnMock mock) throws Throwable {
-                RemotingCommand request = mock.getArgument(1);
-                return createSuccessResponse4UpdateAclConfig(request);
-            }
+    public void testCreatePlainAccessConfig_Success() throws 
InterruptedException, RemotingException {
+        doAnswer(mock -> {
+            RemotingCommand request = mock.getArgument(1);
+            return createSuccessResponse4UpdateAclConfig(request);
         }).when(remotingClient).invokeSync(anyString(), 
any(RemotingCommand.class), anyLong());
 
         PlainAccessConfig config = createUpdateAclConfig();
 
         try {
             mqClientAPI.createPlainAccessConfig(brokerAddr, config, 3 * 1000);
-        } catch (MQClientException ex) {
+        } catch (MQClientException ignored) {
 
         }
     }
 
     @Test
-    public void testCreatePlainAccessConfig_Exception() throws 
InterruptedException, RemotingException, MQBrokerException {
-
-        doAnswer(new Answer() {
-            @Override
-            public Object answer(InvocationOnMock mock) throws Throwable {
-                RemotingCommand request = mock.getArgument(1);
-                return createErrorResponse4UpdateAclConfig(request);
-            }
+    public void testCreatePlainAccessConfig_Exception() throws 
InterruptedException, RemotingException {
+        doAnswer(mock -> {
+            RemotingCommand request = mock.getArgument(1);
+            return createErrorResponse4UpdateAclConfig(request);
         }).when(remotingClient).invokeSync(anyString(), 
any(RemotingCommand.class), anyLong());
 
         PlainAccessConfig config = createUpdateAclConfig();
@@ -306,33 +390,25 @@ public class MQClientAPIImplTest {
     }
 
     @Test
-    public void testDeleteAccessConfig_Success() throws InterruptedException, 
RemotingException, MQBrokerException {
-
-        doAnswer(new Answer() {
-            @Override
-            public Object answer(InvocationOnMock mock) throws Throwable {
-                RemotingCommand request = mock.getArgument(1);
-                return createSuccessResponse4DeleteAclConfig(request);
-            }
+    public void testDeleteAccessConfig_Success() throws InterruptedException, 
RemotingException {
+        doAnswer(mock -> {
+            RemotingCommand request = mock.getArgument(1);
+            return createSuccessResponse4DeleteAclConfig(request);
         }).when(remotingClient).invokeSync(anyString(), 
any(RemotingCommand.class), anyLong());
 
         String accessKey = "1234567";
         try {
             mqClientAPI.deleteAccessConfig(brokerAddr, accessKey, 3 * 1000);
-        } catch (MQClientException ex) {
+        } catch (MQClientException ignored) {
 
         }
     }
 
     @Test
-    public void testDeleteAccessConfig_Exception() throws 
InterruptedException, RemotingException, MQBrokerException {
-
-        doAnswer(new Answer() {
-            @Override
-            public Object answer(InvocationOnMock mock) throws Throwable {
-                RemotingCommand request = mock.getArgument(1);
-                return createErrorResponse4DeleteAclConfig(request);
-            }
+    public void testDeleteAccessConfig_Exception() throws 
InterruptedException, RemotingException {
+        doAnswer(mock -> {
+            RemotingCommand request = mock.getArgument(1);
+            return createErrorResponse4DeleteAclConfig(request);
         }).when(remotingClient).invokeSync(anyString(), 
any(RemotingCommand.class), anyLong());
 
         try {
@@ -344,17 +420,14 @@ public class MQClientAPIImplTest {
     }
 
     @Test
-    public void testResumeCheckHalfMessage_WithException() throws 
RemotingException, InterruptedException, MQBrokerException, MQClientException {
-        doAnswer(new Answer() {
-            @Override
-            public Object answer(InvocationOnMock mock) throws Throwable {
-                RemotingCommand request = mock.getArgument(1);
-                RemotingCommand response = 
RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
-                response.setCode(ResponseCode.SYSTEM_ERROR);
-                response.setOpaque(request.getOpaque());
-                response.setRemark("Put message back to 
RMQ_SYS_TRANS_HALF_TOPIC failed.");
-                return response;
-            }
+    public void testResumeCheckHalfMessage_WithException() throws 
RemotingException, InterruptedException {
+        doAnswer(mock -> {
+            RemotingCommand request = mock.getArgument(1);
+            RemotingCommand response = 
RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
+            response.setCode(ResponseCode.SYSTEM_ERROR);
+            response.setOpaque(request.getOpaque());
+            response.setRemark("Put message back to RMQ_SYS_TRANS_HALF_TOPIC 
failed.");
+            return response;
         }).when(remotingClient).invokeSync(anyString(), 
any(RemotingCommand.class), anyLong());
 
         boolean result = mqClientAPI.resumeCheckHalfMessage(brokerAddr, 
"topic,", "test", 3000);
@@ -362,13 +435,10 @@ public class MQClientAPIImplTest {
     }
 
     @Test
-    public void testResumeCheckHalfMessage_Success() throws 
InterruptedException, RemotingException, MQBrokerException, MQClientException {
-        doAnswer(new Answer() {
-            @Override
-            public Object answer(InvocationOnMock mock) throws Throwable {
-                RemotingCommand request = mock.getArgument(1);
-                return createResumeSuccessResponse(request);
-            }
+    public void testResumeCheckHalfMessage_Success() throws 
InterruptedException, RemotingException {
+        doAnswer(mock -> {
+            RemotingCommand request = mock.getArgument(1);
+            return createResumeSuccessResponse(request);
         }).when(remotingClient).invokeSync(anyString(), 
any(RemotingCommand.class), anyLong());
 
         boolean result = mqClientAPI.resumeCheckHalfMessage(brokerAddr, 
"topic", "test", 3000);
@@ -378,16 +448,13 @@ public class MQClientAPIImplTest {
 
     @Test
     public void testSendMessageTypeofReply() throws Exception {
-        doAnswer(new Answer() {
-            @Override
-            public Object answer(InvocationOnMock mock) throws Throwable {
-                InvokeCallback callback = mock.getArgument(3);
-                RemotingCommand request = mock.getArgument(1);
-                ResponseFuture responseFuture = new ResponseFuture(null, 
request.getOpaque(), 3 * 1000, null, null);
-                
responseFuture.setResponseCommand(createSendMessageSuccessResponse(request));
-                callback.operationSucceed(responseFuture.getResponseCommand());
-                return null;
-            }
+        doAnswer(mock -> {
+            InvokeCallback callback = mock.getArgument(3);
+            RemotingCommand request = mock.getArgument(1);
+            ResponseFuture responseFuture = new ResponseFuture(null, 
request.getOpaque(), 3 * 1000, null, null);
+            
responseFuture.setResponseCommand(createSendMessageSuccessResponse(request));
+            callback.operationSucceed(responseFuture.getResponseCommand());
+            return null;
         }).when(remotingClient).invokeAsync(ArgumentMatchers.anyString(), 
ArgumentMatchers.any(RemotingCommand.class), ArgumentMatchers.anyLong(), 
ArgumentMatchers.any(InvokeCallback.class));
         SendMessageContext sendMessageContext = new SendMessageContext();
         sendMessageContext.setProducer(new DefaultMQProducerImpl(new 
DefaultMQProducer()));
@@ -410,19 +477,16 @@ public class MQClientAPIImplTest {
 
     @Test
     public void testQueryAssignment_Success() throws Exception {
-        doAnswer(new Answer<RemotingCommand>() {
-            @Override
-            public RemotingCommand answer(InvocationOnMock mock) {
-                RemotingCommand request = mock.getArgument(1);
-
-                RemotingCommand response = 
RemotingCommand.createResponseCommand(null);
-                response.setCode(ResponseCode.SUCCESS);
-                response.setOpaque(request.getOpaque());
-                QueryAssignmentResponseBody b = new 
QueryAssignmentResponseBody();
-                b.setMessageQueueAssignments(Collections.singleton(new 
MessageQueueAssignment()));
-                response.setBody(b.encode());
-                return response;
-            }
+        doAnswer((Answer<RemotingCommand>) mock -> {
+            RemotingCommand request = mock.getArgument(1);
+
+            RemotingCommand response = 
RemotingCommand.createResponseCommand(null);
+            response.setCode(ResponseCode.SUCCESS);
+            response.setOpaque(request.getOpaque());
+            QueryAssignmentResponseBody b = new QueryAssignmentResponseBody();
+            b.setMessageQueueAssignments(Collections.singleton(new 
MessageQueueAssignment()));
+            response.setBody(b.encode());
+            return response;
         }).when(remotingClient).invokeSync(anyString(), 
any(RemotingCommand.class), anyLong());
         Set<MessageQueueAssignment> assignments = 
mqClientAPI.queryAssignment(brokerAddr, topic, group, clientId, null, 
MessageModel.CLUSTERING, 10 * 1000);
         assertThat(assignments).size().isEqualTo(1);
@@ -432,48 +496,45 @@ public class MQClientAPIImplTest {
     public void testPopMessageAsync_Success() throws Exception {
         final long popTime = System.currentTimeMillis();
         final int invisibleTime = 10 * 1000;
-        doAnswer(new Answer<Void>() {
-            @Override
-            public Void answer(InvocationOnMock mock) throws Throwable {
-                InvokeCallback callback = mock.getArgument(3);
-                RemotingCommand request = mock.getArgument(1);
-                ResponseFuture responseFuture = new ResponseFuture(null, 
request.getOpaque(), 3 * 1000, null, null);
-                RemotingCommand response = 
RemotingCommand.createResponseCommand(PopMessageResponseHeader.class);
-                response.setCode(ResponseCode.SUCCESS);
-                response.setOpaque(request.getOpaque());
-
-                PopMessageResponseHeader responseHeader = 
(PopMessageResponseHeader) response.readCustomHeader();
-                responseHeader.setInvisibleTime(invisibleTime);
-                responseHeader.setPopTime(popTime);
-                responseHeader.setReviveQid(0);
-                responseHeader.setRestNum(1);
-                StringBuilder startOffsetInfo = new StringBuilder(64);
-                ExtraInfoUtil.buildStartOffsetInfo(startOffsetInfo, topic, 0, 
0L);
-                responseHeader.setStartOffsetInfo(startOffsetInfo.toString());
-                StringBuilder msgOffsetInfo = new StringBuilder(64);
-                ExtraInfoUtil.buildMsgOffsetInfo(msgOffsetInfo, topic, 0, 
Collections.singletonList(0L));
-                responseHeader.setMsgOffsetInfo(msgOffsetInfo.toString());
-                response.setRemark("FOUND");
-                response.makeCustomHeaderToNet();
-
-                MessageExt message = new MessageExt();
-                message.setQueueId(0);
-                message.setFlag(12);
-                message.setQueueOffset(0L);
-                message.setCommitLogOffset(100L);
-                message.setSysFlag(0);
-                message.setBornTimestamp(System.currentTimeMillis());
-                message.setBornHost(new InetSocketAddress("127.0.0.1", 10));
-                message.setStoreTimestamp(System.currentTimeMillis());
-                message.setStoreHost(new InetSocketAddress("127.0.0.1", 11));
-                message.setBody("body".getBytes());
-                message.setTopic(topic);
-                message.putUserProperty("key", "value");
-                response.setBody(MessageDecoder.encode(message, false));
-                responseFuture.setResponseCommand(response);
-                callback.operationSucceed(responseFuture.getResponseCommand());
-                return null;
-            }
+        doAnswer((Answer<Void>) mock -> {
+            InvokeCallback callback = mock.getArgument(3);
+            RemotingCommand request = mock.getArgument(1);
+            ResponseFuture responseFuture = new ResponseFuture(null, 
request.getOpaque(), 3 * 1000, null, null);
+            RemotingCommand response = 
RemotingCommand.createResponseCommand(PopMessageResponseHeader.class);
+            response.setCode(ResponseCode.SUCCESS);
+            response.setOpaque(request.getOpaque());
+
+            PopMessageResponseHeader responseHeader = 
(PopMessageResponseHeader) response.readCustomHeader();
+            responseHeader.setInvisibleTime(invisibleTime);
+            responseHeader.setPopTime(popTime);
+            responseHeader.setReviveQid(0);
+            responseHeader.setRestNum(1);
+            StringBuilder startOffsetInfo = new StringBuilder(64);
+            ExtraInfoUtil.buildStartOffsetInfo(startOffsetInfo, topic, 0, 0L);
+            responseHeader.setStartOffsetInfo(startOffsetInfo.toString());
+            StringBuilder msgOffsetInfo = new StringBuilder(64);
+            ExtraInfoUtil.buildMsgOffsetInfo(msgOffsetInfo, topic, 0, 
Collections.singletonList(0L));
+            responseHeader.setMsgOffsetInfo(msgOffsetInfo.toString());
+            response.setRemark("FOUND");
+            response.makeCustomHeaderToNet();
+
+            MessageExt message = new MessageExt();
+            message.setQueueId(0);
+            message.setFlag(12);
+            message.setQueueOffset(0L);
+            message.setCommitLogOffset(100L);
+            message.setSysFlag(0);
+            message.setBornTimestamp(System.currentTimeMillis());
+            message.setBornHost(new InetSocketAddress("127.0.0.1", 10));
+            message.setStoreTimestamp(System.currentTimeMillis());
+            message.setStoreHost(new InetSocketAddress("127.0.0.1", 11));
+            message.setBody("body".getBytes());
+            message.setTopic(topic);
+            message.putUserProperty("key", "value");
+            response.setBody(MessageDecoder.encode(message, false));
+            responseFuture.setResponseCommand(response);
+            callback.operationSucceed(responseFuture.getResponseCommand());
+            return null;
         }).when(remotingClient).invokeAsync(anyString(), 
any(RemotingCommand.class), anyLong(), any(InvokeCallback.class));
         final CountDownLatch done = new CountDownLatch(1);
         mqClientAPI.popMessageAsync(brokerName, brokerAddr, new 
PopMessageRequestHeader(), 10 * 1000, new PopCallback() {
@@ -501,50 +562,47 @@ public class MQClientAPIImplTest {
         final long popTime = System.currentTimeMillis();
         final int invisibleTime = 10 * 1000;
         final String lmqTopic = MixAll.LMQ_PREFIX + "lmq1";
-        doAnswer(new Answer<Void>() {
-            @Override
-            public Void answer(InvocationOnMock mock) throws Throwable {
-                InvokeCallback callback = mock.getArgument(3);
-                RemotingCommand request = mock.getArgument(1);
-                ResponseFuture responseFuture = new ResponseFuture(null, 
request.getOpaque(), 3 * 1000, null, null);
-                RemotingCommand response = 
RemotingCommand.createResponseCommand(PopMessageResponseHeader.class);
-                response.setCode(ResponseCode.SUCCESS);
-                response.setOpaque(request.getOpaque());
-
-                PopMessageResponseHeader responseHeader = 
(PopMessageResponseHeader) response.readCustomHeader();
-                responseHeader.setInvisibleTime(invisibleTime);
-                responseHeader.setPopTime(popTime);
-                responseHeader.setReviveQid(0);
-                responseHeader.setRestNum(1);
-                StringBuilder startOffsetInfo = new StringBuilder(64);
-                ExtraInfoUtil.buildStartOffsetInfo(startOffsetInfo, topic, 0, 
0L);
-                responseHeader.setStartOffsetInfo(startOffsetInfo.toString());
-                StringBuilder msgOffsetInfo = new StringBuilder(64);
-                ExtraInfoUtil.buildMsgOffsetInfo(msgOffsetInfo, topic, 0, 
Collections.singletonList(0L));
-                responseHeader.setMsgOffsetInfo(msgOffsetInfo.toString());
-                response.setRemark("FOUND");
-                response.makeCustomHeaderToNet();
-
-                MessageExt message = new MessageExt();
-                message.setQueueId(3);
-                message.setFlag(0);
-                message.setQueueOffset(5L);
-                message.setCommitLogOffset(11111L);
-                message.setSysFlag(0);
-                message.setBornTimestamp(System.currentTimeMillis());
-                message.setBornHost(new InetSocketAddress("127.0.0.1", 10));
-                message.setStoreTimestamp(System.currentTimeMillis());
-                message.setStoreHost(new InetSocketAddress("127.0.0.1", 11));
-                message.setBody("body".getBytes());
-                message.setTopic(topic);
-                message.putUserProperty("key", "value");
-                
message.putUserProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH, lmqTopic);
-                
message.getProperties().put(MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET, 
String.valueOf(0));
-                response.setBody(MessageDecoder.encode(message, false));
-                responseFuture.setResponseCommand(response);
-                callback.operationSucceed(responseFuture.getResponseCommand());
-                return null;
-            }
+        doAnswer((Answer<Void>) mock -> {
+            InvokeCallback callback = mock.getArgument(3);
+            RemotingCommand request = mock.getArgument(1);
+            ResponseFuture responseFuture = new ResponseFuture(null, 
request.getOpaque(), 3 * 1000, null, null);
+            RemotingCommand response = 
RemotingCommand.createResponseCommand(PopMessageResponseHeader.class);
+            response.setCode(ResponseCode.SUCCESS);
+            response.setOpaque(request.getOpaque());
+
+            PopMessageResponseHeader responseHeader = 
(PopMessageResponseHeader) response.readCustomHeader();
+            responseHeader.setInvisibleTime(invisibleTime);
+            responseHeader.setPopTime(popTime);
+            responseHeader.setReviveQid(0);
+            responseHeader.setRestNum(1);
+            StringBuilder startOffsetInfo = new StringBuilder(64);
+            ExtraInfoUtil.buildStartOffsetInfo(startOffsetInfo, topic, 0, 0L);
+            responseHeader.setStartOffsetInfo(startOffsetInfo.toString());
+            StringBuilder msgOffsetInfo = new StringBuilder(64);
+            ExtraInfoUtil.buildMsgOffsetInfo(msgOffsetInfo, topic, 0, 
Collections.singletonList(0L));
+            responseHeader.setMsgOffsetInfo(msgOffsetInfo.toString());
+            response.setRemark("FOUND");
+            response.makeCustomHeaderToNet();
+
+            MessageExt message = new MessageExt();
+            message.setQueueId(3);
+            message.setFlag(0);
+            message.setQueueOffset(5L);
+            message.setCommitLogOffset(11111L);
+            message.setSysFlag(0);
+            message.setBornTimestamp(System.currentTimeMillis());
+            message.setBornHost(new InetSocketAddress("127.0.0.1", 10));
+            message.setStoreTimestamp(System.currentTimeMillis());
+            message.setStoreHost(new InetSocketAddress("127.0.0.1", 11));
+            message.setBody("body".getBytes());
+            message.setTopic(topic);
+            message.putUserProperty("key", "value");
+            
message.putUserProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH, lmqTopic);
+            
message.getProperties().put(MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET, 
String.valueOf(0));
+            response.setBody(MessageDecoder.encode(message, false));
+            responseFuture.setResponseCommand(response);
+            callback.operationSucceed(responseFuture.getResponseCommand());
+            return null;
         }).when(remotingClient).invokeAsync(anyString(), 
any(RemotingCommand.class), anyLong(), any(InvokeCallback.class));
         final CountDownLatch done = new CountDownLatch(1);
         final PopMessageRequestHeader requestHeader = new 
PopMessageRequestHeader();
@@ -580,49 +638,46 @@ public class MQClientAPIImplTest {
         final String lmqTopic2 = MixAll.LMQ_PREFIX + "lmq2";
         final String multiDispatch = 
String.join(MixAll.MULTI_DISPATCH_QUEUE_SPLITTER, lmqTopic, lmqTopic2);
         final String multiOffset = 
String.join(MixAll.MULTI_DISPATCH_QUEUE_SPLITTER, "0", "0");
-        doAnswer(new Answer<Void>() {
-            @Override
-            public Void answer(InvocationOnMock mock) throws Throwable {
-                InvokeCallback callback = mock.getArgument(3);
-                RemotingCommand request = mock.getArgument(1);
-                ResponseFuture responseFuture = new ResponseFuture(null, 
request.getOpaque(), 3 * 1000, null, null);
-                RemotingCommand response = 
RemotingCommand.createResponseCommand(PopMessageResponseHeader.class);
-                response.setCode(ResponseCode.SUCCESS);
-                response.setOpaque(request.getOpaque());
-
-                PopMessageResponseHeader responseHeader = 
(PopMessageResponseHeader) response.readCustomHeader();
-                responseHeader.setInvisibleTime(invisibleTime);
-                responseHeader.setPopTime(popTime);
-                responseHeader.setReviveQid(0);
-                responseHeader.setRestNum(1);
-                StringBuilder startOffsetInfo = new StringBuilder(64);
-                ExtraInfoUtil.buildStartOffsetInfo(startOffsetInfo, topic, 0, 
0L);
-                responseHeader.setStartOffsetInfo(startOffsetInfo.toString());
-                StringBuilder msgOffsetInfo = new StringBuilder(64);
-                ExtraInfoUtil.buildMsgOffsetInfo(msgOffsetInfo, topic, 0, 
Collections.singletonList(0L));
-                responseHeader.setMsgOffsetInfo(msgOffsetInfo.toString());
-                response.setRemark("FOUND");
-                response.makeCustomHeaderToNet();
-
-                MessageExt message = new MessageExt();
-                message.setQueueId(0);
-                message.setFlag(0);
-                message.setQueueOffset(10L);
-                message.setCommitLogOffset(10000L);
-                message.setSysFlag(0);
-                message.setBornTimestamp(System.currentTimeMillis());
-                message.setBornHost(new InetSocketAddress("127.0.0.1", 10));
-                message.setStoreTimestamp(System.currentTimeMillis());
-                message.setStoreHost(new InetSocketAddress("127.0.0.1", 11));
-                message.setBody("body".getBytes());
-                message.setTopic(topic);
-                MessageAccessor.putProperty(message, 
MessageConst.PROPERTY_INNER_MULTI_DISPATCH, multiDispatch);
-                MessageAccessor.putProperty(message, 
MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET, multiOffset);
-                response.setBody(MessageDecoder.encode(message, false));
-                responseFuture.setResponseCommand(response);
-                callback.operationSucceed(responseFuture.getResponseCommand());
-                return null;
-            }
+        doAnswer((Answer<Void>) mock -> {
+            InvokeCallback callback = mock.getArgument(3);
+            RemotingCommand request = mock.getArgument(1);
+            ResponseFuture responseFuture = new ResponseFuture(null, 
request.getOpaque(), 3 * 1000, null, null);
+            RemotingCommand response = 
RemotingCommand.createResponseCommand(PopMessageResponseHeader.class);
+            response.setCode(ResponseCode.SUCCESS);
+            response.setOpaque(request.getOpaque());
+
+            PopMessageResponseHeader responseHeader = 
(PopMessageResponseHeader) response.readCustomHeader();
+            responseHeader.setInvisibleTime(invisibleTime);
+            responseHeader.setPopTime(popTime);
+            responseHeader.setReviveQid(0);
+            responseHeader.setRestNum(1);
+            StringBuilder startOffsetInfo = new StringBuilder(64);
+            ExtraInfoUtil.buildStartOffsetInfo(startOffsetInfo, topic, 0, 0L);
+            responseHeader.setStartOffsetInfo(startOffsetInfo.toString());
+            StringBuilder msgOffsetInfo = new StringBuilder(64);
+            ExtraInfoUtil.buildMsgOffsetInfo(msgOffsetInfo, topic, 0, 
Collections.singletonList(0L));
+            responseHeader.setMsgOffsetInfo(msgOffsetInfo.toString());
+            response.setRemark("FOUND");
+            response.makeCustomHeaderToNet();
+
+            MessageExt message = new MessageExt();
+            message.setQueueId(0);
+            message.setFlag(0);
+            message.setQueueOffset(10L);
+            message.setCommitLogOffset(10000L);
+            message.setSysFlag(0);
+            message.setBornTimestamp(System.currentTimeMillis());
+            message.setBornHost(new InetSocketAddress("127.0.0.1", 10));
+            message.setStoreTimestamp(System.currentTimeMillis());
+            message.setStoreHost(new InetSocketAddress("127.0.0.1", 11));
+            message.setBody("body".getBytes());
+            message.setTopic(topic);
+            MessageAccessor.putProperty(message, 
MessageConst.PROPERTY_INNER_MULTI_DISPATCH, multiDispatch);
+            MessageAccessor.putProperty(message, 
MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET, multiOffset);
+            response.setBody(MessageDecoder.encode(message, false));
+            responseFuture.setResponseCommand(response);
+            callback.operationSucceed(responseFuture.getResponseCommand());
+            return null;
         }).when(remotingClient).invokeAsync(anyString(), 
any(RemotingCommand.class), anyLong(), any(InvokeCallback.class));
         final CountDownLatch done = new CountDownLatch(1);
         final PopMessageRequestHeader requestHeader = new 
PopMessageRequestHeader();
@@ -654,19 +709,16 @@ public class MQClientAPIImplTest {
 
     @Test
     public void testAckMessageAsync_Success() throws Exception {
-        doAnswer(new Answer<Void>() {
-            @Override
-            public Void answer(InvocationOnMock mock) throws Throwable {
-                InvokeCallback callback = mock.getArgument(3);
-                RemotingCommand request = mock.getArgument(1);
-                ResponseFuture responseFuture = new ResponseFuture(null, 
request.getOpaque(), 3 * 1000, null, null);
-                RemotingCommand response = 
RemotingCommand.createResponseCommand(ResponseCode.SUCCESS, null);
-                response.setOpaque(request.getOpaque());
-                response.setCode(ResponseCode.SUCCESS);
-                responseFuture.setResponseCommand(response);
-                callback.operationSucceed(responseFuture.getResponseCommand());
-                return null;
-            }
+        doAnswer((Answer<Void>) mock -> {
+            InvokeCallback callback = mock.getArgument(3);
+            RemotingCommand request = mock.getArgument(1);
+            ResponseFuture responseFuture = new ResponseFuture(null, 
request.getOpaque(), 3 * 1000, null, null);
+            RemotingCommand response = 
RemotingCommand.createResponseCommand(ResponseCode.SUCCESS, null);
+            response.setOpaque(request.getOpaque());
+            response.setCode(ResponseCode.SUCCESS);
+            responseFuture.setResponseCommand(response);
+            callback.operationSucceed(responseFuture.getResponseCommand());
+            return null;
         }).when(remotingClient).invokeAsync(anyString(), 
any(RemotingCommand.class), anyLong(), any(InvokeCallback.class));
 
         final CountDownLatch done = new CountDownLatch(1);
@@ -688,22 +740,19 @@ public class MQClientAPIImplTest {
 
     @Test
     public void testChangeInvisibleTimeAsync_Success() throws Exception {
-        doAnswer(new Answer<Void>() {
-            @Override
-            public Void answer(InvocationOnMock mock) throws Throwable {
-                InvokeCallback callback = mock.getArgument(3);
-                RemotingCommand request = mock.getArgument(1);
-                ResponseFuture responseFuture = new ResponseFuture(null, 
request.getOpaque(), 3 * 1000, null, null);
-                RemotingCommand response = 
RemotingCommand.createResponseCommand(ChangeInvisibleTimeResponseHeader.class);
-                response.setOpaque(request.getOpaque());
-                response.setCode(ResponseCode.SUCCESS);
-                ChangeInvisibleTimeResponseHeader responseHeader = 
(ChangeInvisibleTimeResponseHeader) response.readCustomHeader();
-                responseHeader.setPopTime(System.currentTimeMillis());
-                responseHeader.setInvisibleTime(10 * 1000L);
-                responseFuture.setResponseCommand(response);
-                callback.operationSucceed(responseFuture.getResponseCommand());
-                return null;
-            }
+        doAnswer((Answer<Void>) mock -> {
+            InvokeCallback callback = mock.getArgument(3);
+            RemotingCommand request = mock.getArgument(1);
+            ResponseFuture responseFuture = new ResponseFuture(null, 
request.getOpaque(), 3 * 1000, null, null);
+            RemotingCommand response = 
RemotingCommand.createResponseCommand(ChangeInvisibleTimeResponseHeader.class);
+            response.setOpaque(request.getOpaque());
+            response.setCode(ResponseCode.SUCCESS);
+            ChangeInvisibleTimeResponseHeader responseHeader = 
(ChangeInvisibleTimeResponseHeader) response.readCustomHeader();
+            responseHeader.setPopTime(System.currentTimeMillis());
+            responseHeader.setInvisibleTime(10 * 1000L);
+            responseFuture.setResponseCommand(response);
+            callback.operationSucceed(responseFuture.getResponseCommand());
+            return null;
         }).when(remotingClient).invokeAsync(anyString(), 
any(RemotingCommand.class), anyLong(), any(InvokeCallback.class));
 
         final CountDownLatch done = new CountDownLatch(1);
@@ -730,16 +779,13 @@ public class MQClientAPIImplTest {
 
     @Test
     public void testSetMessageRequestMode_Success() throws Exception {
-        doAnswer(new Answer<RemotingCommand>() {
-            @Override
-            public RemotingCommand answer(InvocationOnMock mock) {
-                RemotingCommand request = mock.getArgument(1);
+        doAnswer((Answer<RemotingCommand>) mock -> {
+            RemotingCommand request = mock.getArgument(1);
 
-                RemotingCommand response = 
RemotingCommand.createResponseCommand(null);
-                response.setCode(ResponseCode.SUCCESS);
-                response.setOpaque(request.getOpaque());
-                return response;
-            }
+            RemotingCommand response = 
RemotingCommand.createResponseCommand(null);
+            response.setCode(ResponseCode.SUCCESS);
+            response.setOpaque(request.getOpaque());
+            return response;
         }).when(remotingClient).invokeSync(anyString(), 
any(RemotingCommand.class), anyLong());
 
         mqClientAPI.setMessageRequestMode(brokerAddr, topic, group, 
MessageRequestMode.POP, 8, 10 * 1000L);
@@ -747,16 +793,13 @@ public class MQClientAPIImplTest {
 
     @Test
     public void testCreateSubscriptionGroup_Success() throws Exception {
-        doAnswer(new Answer<RemotingCommand>() {
-            @Override
-            public RemotingCommand answer(InvocationOnMock mock) {
-                RemotingCommand request = mock.getArgument(1);
+        doAnswer((Answer<RemotingCommand>) mock -> {
+            RemotingCommand request = mock.getArgument(1);
 
-                RemotingCommand response = 
RemotingCommand.createResponseCommand(null);
-                response.setCode(ResponseCode.SUCCESS);
-                response.setOpaque(request.getOpaque());
-                return response;
-            }
+            RemotingCommand response = 
RemotingCommand.createResponseCommand(null);
+            response.setCode(ResponseCode.SUCCESS);
+            response.setOpaque(request.getOpaque());
+            return response;
         }).when(remotingClient).invokeSync(anyString(), 
any(RemotingCommand.class), anyLong());
 
         mqClientAPI.createSubscriptionGroup(brokerAddr, new 
SubscriptionGroupConfig(), 10000);
@@ -764,16 +807,13 @@ public class MQClientAPIImplTest {
 
     @Test
     public void testCreateTopic_Success() throws Exception {
-        doAnswer(new Answer<RemotingCommand>() {
-            @Override
-            public RemotingCommand answer(InvocationOnMock mock) {
-                RemotingCommand request = mock.getArgument(1);
+        doAnswer((Answer<RemotingCommand>) mock -> {
+            RemotingCommand request = mock.getArgument(1);
 
-                RemotingCommand response = 
RemotingCommand.createResponseCommand(null);
-                response.setCode(ResponseCode.SUCCESS);
-                response.setOpaque(request.getOpaque());
-                return response;
-            }
+            RemotingCommand response = 
RemotingCommand.createResponseCommand(null);
+            response.setCode(ResponseCode.SUCCESS);
+            response.setOpaque(request.getOpaque());
+            return response;
         }).when(remotingClient).invokeSync(anyString(), 
any(RemotingCommand.class), anyLong());
 
         mqClientAPI.createTopic(brokerAddr, topic, new TopicConfig(), 10000);
@@ -781,31 +821,28 @@ public class MQClientAPIImplTest {
 
     @Test
     public void testViewMessage() throws Exception {
-        doAnswer(new Answer<RemotingCommand>() {
-            @Override
-            public RemotingCommand answer(InvocationOnMock mock) throws 
Exception {
-                RemotingCommand request = mock.getArgument(1);
-
-                RemotingCommand response = 
RemotingCommand.createResponseCommand(null);
-                MessageExt message = new MessageExt();
-                message.setQueueId(0);
-                message.setFlag(12);
-                message.setQueueOffset(0L);
-                message.setCommitLogOffset(100L);
-                message.setSysFlag(0);
-                message.setBornTimestamp(System.currentTimeMillis());
-                message.setBornHost(new InetSocketAddress("127.0.0.1", 10));
-                message.setStoreTimestamp(System.currentTimeMillis());
-                message.setStoreHost(new InetSocketAddress("127.0.0.1", 11));
-                message.setBody("body".getBytes());
-                message.setTopic(topic);
-                message.putUserProperty("key", "value");
-                response.setBody(MessageDecoder.encode(message, false));
-                response.makeCustomHeaderToNet();
-                response.setCode(ResponseCode.SUCCESS);
-                response.setOpaque(request.getOpaque());
-                return response;
-            }
+        doAnswer((Answer<RemotingCommand>) mock -> {
+            RemotingCommand request = mock.getArgument(1);
+
+            RemotingCommand response = 
RemotingCommand.createResponseCommand(null);
+            MessageExt message = new MessageExt();
+            message.setQueueId(0);
+            message.setFlag(12);
+            message.setQueueOffset(0L);
+            message.setCommitLogOffset(100L);
+            message.setSysFlag(0);
+            message.setBornTimestamp(System.currentTimeMillis());
+            message.setBornHost(new InetSocketAddress("127.0.0.1", 10));
+            message.setStoreTimestamp(System.currentTimeMillis());
+            message.setStoreHost(new InetSocketAddress("127.0.0.1", 11));
+            message.setBody("body".getBytes());
+            message.setTopic(topic);
+            message.putUserProperty("key", "value");
+            response.setBody(MessageDecoder.encode(message, false));
+            response.makeCustomHeaderToNet();
+            response.setCode(ResponseCode.SUCCESS);
+            response.setOpaque(request.getOpaque());
+            return response;
         }).when(remotingClient).invokeSync(anyString(), 
any(RemotingCommand.class), anyLong());
 
         MessageExt messageExt = mqClientAPI.viewMessage(brokerAddr, "topic", 
100L, 10000);
@@ -814,19 +851,16 @@ public class MQClientAPIImplTest {
 
     @Test
     public void testSearchOffset() throws Exception {
-        doAnswer(new Answer<RemotingCommand>() {
-            @Override
-            public RemotingCommand answer(InvocationOnMock mock) {
-                RemotingCommand request = mock.getArgument(1);
-
-                final RemotingCommand response = 
RemotingCommand.createResponseCommand(SearchOffsetResponseHeader.class);
-                final SearchOffsetResponseHeader responseHeader = 
(SearchOffsetResponseHeader) response.readCustomHeader();
-                responseHeader.setOffset(100L);
-                response.makeCustomHeaderToNet();
-                response.setCode(ResponseCode.SUCCESS);
-                response.setOpaque(request.getOpaque());
-                return response;
-            }
+        doAnswer((Answer<RemotingCommand>) mock -> {
+            RemotingCommand request = mock.getArgument(1);
+
+            final RemotingCommand response = 
RemotingCommand.createResponseCommand(SearchOffsetResponseHeader.class);
+            final SearchOffsetResponseHeader responseHeader = 
(SearchOffsetResponseHeader) response.readCustomHeader();
+            responseHeader.setOffset(100L);
+            response.makeCustomHeaderToNet();
+            response.setCode(ResponseCode.SUCCESS);
+            response.setOpaque(request.getOpaque());
+            return response;
         }).when(remotingClient).invokeSync(anyString(), 
any(RemotingCommand.class), anyLong());
 
         long offset = mqClientAPI.searchOffset(brokerAddr, topic, 0, 
System.currentTimeMillis() - 1000, 10000);
@@ -835,19 +869,16 @@ public class MQClientAPIImplTest {
 
     @Test
     public void testGetMaxOffset() throws Exception {
-        doAnswer(new Answer<RemotingCommand>() {
-            @Override
-            public RemotingCommand answer(InvocationOnMock mock) {
-                RemotingCommand request = mock.getArgument(1);
-
-                final RemotingCommand response = 
RemotingCommand.createResponseCommand(GetMaxOffsetResponseHeader.class);
-                final GetMaxOffsetResponseHeader responseHeader = 
(GetMaxOffsetResponseHeader) response.readCustomHeader();
-                responseHeader.setOffset(100L);
-                response.makeCustomHeaderToNet();
-                response.setCode(ResponseCode.SUCCESS);
-                response.setOpaque(request.getOpaque());
-                return response;
-            }
+        doAnswer((Answer<RemotingCommand>) mock -> {
+            RemotingCommand request = mock.getArgument(1);
+
+            final RemotingCommand response = 
RemotingCommand.createResponseCommand(GetMaxOffsetResponseHeader.class);
+            final GetMaxOffsetResponseHeader responseHeader = 
(GetMaxOffsetResponseHeader) response.readCustomHeader();
+            responseHeader.setOffset(100L);
+            response.makeCustomHeaderToNet();
+            response.setCode(ResponseCode.SUCCESS);
+            response.setOpaque(request.getOpaque());
+            return response;
         }).when(remotingClient).invokeSync(anyString(), 
any(RemotingCommand.class), anyLong());
 
         long offset = mqClientAPI.getMaxOffset(brokerAddr, new 
MessageQueue(topic, brokerName, 0), 10000);
@@ -856,19 +887,16 @@ public class MQClientAPIImplTest {
 
     @Test
     public void testGetMinOffset() throws Exception {
-        doAnswer(new Answer<RemotingCommand>() {
-            @Override
-            public RemotingCommand answer(InvocationOnMock mock) {
-                RemotingCommand request = mock.getArgument(1);
-
-                final RemotingCommand response = 
RemotingCommand.createResponseCommand(GetMinOffsetResponseHeader.class);
-                final GetMinOffsetResponseHeader responseHeader = 
(GetMinOffsetResponseHeader) response.readCustomHeader();
-                responseHeader.setOffset(100L);
-                response.makeCustomHeaderToNet();
-                response.setCode(ResponseCode.SUCCESS);
-                response.setOpaque(request.getOpaque());
-                return response;
-            }
+        doAnswer((Answer<RemotingCommand>) mock -> {
+            RemotingCommand request = mock.getArgument(1);
+
+            final RemotingCommand response = 
RemotingCommand.createResponseCommand(GetMinOffsetResponseHeader.class);
+            final GetMinOffsetResponseHeader responseHeader = 
(GetMinOffsetResponseHeader) response.readCustomHeader();
+            responseHeader.setOffset(100L);
+            response.makeCustomHeaderToNet();
+            response.setCode(ResponseCode.SUCCESS);
+            response.setOpaque(request.getOpaque());
+            return response;
         }).when(remotingClient).invokeSync(anyString(), 
any(RemotingCommand.class), anyLong());
 
         long offset = mqClientAPI.getMinOffset(brokerAddr, new 
MessageQueue(topic, brokerName, 0), 10000);
@@ -877,19 +905,16 @@ public class MQClientAPIImplTest {
 
     @Test
     public void testGetEarliestMsgStoretime() throws Exception {
-        doAnswer(new Answer<RemotingCommand>() {
-            @Override
-            public RemotingCommand answer(InvocationOnMock mock) {
-                RemotingCommand request = mock.getArgument(1);
-
-                final RemotingCommand response = 
RemotingCommand.createResponseCommand(GetEarliestMsgStoretimeResponseHeader.class);
-                final GetEarliestMsgStoretimeResponseHeader responseHeader = 
(GetEarliestMsgStoretimeResponseHeader) response.readCustomHeader();
-                responseHeader.setTimestamp(100L);
-                response.makeCustomHeaderToNet();
-                response.setCode(ResponseCode.SUCCESS);
-                response.setOpaque(request.getOpaque());
-                return response;
-            }
+        doAnswer((Answer<RemotingCommand>) mock -> {
+            RemotingCommand request = mock.getArgument(1);
+
+            final RemotingCommand response = 
RemotingCommand.createResponseCommand(GetEarliestMsgStoretimeResponseHeader.class);
+            final GetEarliestMsgStoretimeResponseHeader responseHeader = 
(GetEarliestMsgStoretimeResponseHeader) response.readCustomHeader();
+            responseHeader.setTimestamp(100L);
+            response.makeCustomHeaderToNet();
+            response.setCode(ResponseCode.SUCCESS);
+            response.setOpaque(request.getOpaque());
+            return response;
         }).when(remotingClient).invokeSync(anyString(), 
any(RemotingCommand.class), anyLong());
 
         long t = mqClientAPI.getEarliestMsgStoretime(brokerAddr, new 
MessageQueue(topic, brokerName, 0), 10000);
@@ -898,21 +923,18 @@ public class MQClientAPIImplTest {
 
     @Test
     public void testQueryConsumerOffset() throws Exception {
-        doAnswer(new Answer<RemotingCommand>() {
-            @Override
-            public RemotingCommand answer(InvocationOnMock mock) {
-                RemotingCommand request = mock.getArgument(1);
+        doAnswer((Answer<RemotingCommand>) mock -> {
+            RemotingCommand request = mock.getArgument(1);
 
-                final RemotingCommand response =
+            final RemotingCommand response =
                     
RemotingCommand.createResponseCommand(QueryConsumerOffsetResponseHeader.class);
-                final QueryConsumerOffsetResponseHeader responseHeader =
+            final QueryConsumerOffsetResponseHeader responseHeader =
                     (QueryConsumerOffsetResponseHeader) 
response.readCustomHeader();
-                responseHeader.setOffset(100L);
-                response.makeCustomHeaderToNet();
-                response.setCode(ResponseCode.SUCCESS);
-                response.setOpaque(request.getOpaque());
-                return response;
-            }
+            responseHeader.setOffset(100L);
+            response.makeCustomHeaderToNet();
+            response.setCode(ResponseCode.SUCCESS);
+            response.setOpaque(request.getOpaque());
+            return response;
         }).when(remotingClient).invokeSync(anyString(), 
any(RemotingCommand.class), anyLong());
 
         long t = mqClientAPI.queryConsumerOffset(brokerAddr, new 
QueryConsumerOffsetRequestHeader(), 1000);
@@ -921,18 +943,15 @@ public class MQClientAPIImplTest {
 
     @Test
     public void testUpdateConsumerOffset() throws Exception {
-        doAnswer(new Answer<RemotingCommand>() {
-            @Override
-            public RemotingCommand answer(InvocationOnMock mock) {
-                RemotingCommand request = mock.getArgument(1);
+        doAnswer((Answer<RemotingCommand>) mock -> {
+            RemotingCommand request = mock.getArgument(1);
 
-                final RemotingCommand response =
+            final RemotingCommand response =
                     
RemotingCommand.createResponseCommand(UpdateConsumerOffsetResponseHeader.class);
-                response.makeCustomHeaderToNet();
-                response.setCode(ResponseCode.SUCCESS);
-                response.setOpaque(request.getOpaque());
-                return response;
-            }
+            response.makeCustomHeaderToNet();
+            response.setCode(ResponseCode.SUCCESS);
+            response.setOpaque(request.getOpaque());
+            return response;
         }).when(remotingClient).invokeSync(anyString(), 
any(RemotingCommand.class), anyLong());
 
         mqClientAPI.updateConsumerOffset(brokerAddr, new 
UpdateConsumerOffsetRequestHeader(), 1000);
@@ -940,21 +959,18 @@ public class MQClientAPIImplTest {
 
     @Test
     public void testGetConsumerIdListByGroup() throws Exception {
-        doAnswer(new Answer<RemotingCommand>() {
-            @Override
-            public RemotingCommand answer(InvocationOnMock mock) {
-                RemotingCommand request = mock.getArgument(1);
+        doAnswer((Answer<RemotingCommand>) mock -> {
+            RemotingCommand request = mock.getArgument(1);
 
-                final RemotingCommand response =
+            final RemotingCommand response =
                     
RemotingCommand.createResponseCommand(GetConsumerListByGroupResponseHeader.class);
-                GetConsumerListByGroupResponseBody body = new 
GetConsumerListByGroupResponseBody();
-                body.setConsumerIdList(Collections.singletonList("consumer1"));
-                response.setBody(body.encode());
-                response.makeCustomHeaderToNet();
-                response.setCode(ResponseCode.SUCCESS);
-                response.setOpaque(request.getOpaque());
-                return response;
-            }
+            GetConsumerListByGroupResponseBody body = new 
GetConsumerListByGroupResponseBody();
+            body.setConsumerIdList(Collections.singletonList("consumer1"));
+            response.setBody(body.encode());
+            response.makeCustomHeaderToNet();
+            response.setCode(ResponseCode.SUCCESS);
+            response.setOpaque(request.getOpaque());
+            return response;
         }).when(remotingClient).invokeSync(anyString(), 
any(RemotingCommand.class), anyLong());
         List<String> consumerIdList = 
mqClientAPI.getConsumerIdListByGroup(brokerAddr, group, 10000);
         assertThat(consumerIdList).size().isGreaterThan(0);
@@ -991,7 +1007,6 @@ public class MQClientAPIImplTest {
         response.setOpaque(request.getOpaque());
         response.markResponseType();
         response.setRemark(null);
-
         return response;
     }
 
@@ -1001,7 +1016,6 @@ public class MQClientAPIImplTest {
         response.setOpaque(request.getOpaque());
         response.markResponseType();
         response.setRemark(null);
-
         return response;
     }
 
@@ -1011,7 +1025,6 @@ public class MQClientAPIImplTest {
         response.setOpaque(request.getOpaque());
         response.markResponseType();
         response.setRemark("corresponding to accessConfig has been updated 
failed");
-
         return response;
     }
 
@@ -1021,12 +1034,10 @@ public class MQClientAPIImplTest {
         response.setOpaque(request.getOpaque());
         response.markResponseType();
         response.setRemark("corresponding to accessConfig has been deleted 
failed");
-
         return response;
     }
 
     private PlainAccessConfig createUpdateAclConfig() {
-
         PlainAccessConfig config = new PlainAccessConfig();
         config.setAccessKey("Rocketmq111");
         config.setSecretKey("123456789");
@@ -1049,21 +1060,18 @@ public class MQClientAPIImplTest {
 
     @Test
     public void testAddWritePermOfBroker() throws Exception {
-        doAnswer(new Answer() {
-            @Override
-            public Object answer(InvocationOnMock invocationOnMock) throws 
Throwable {
-                RemotingCommand request = invocationOnMock.getArgument(1);
-                if (request.getCode() != RequestCode.ADD_WRITE_PERM_OF_BROKER) 
{
-                    return null;
-                }
-
-                RemotingCommand response = 
RemotingCommand.createResponseCommand(AddWritePermOfBrokerResponseHeader.class);
-                AddWritePermOfBrokerResponseHeader responseHeader = 
(AddWritePermOfBrokerResponseHeader) response.readCustomHeader();
-                response.setCode(ResponseCode.SUCCESS);
-                responseHeader.setAddTopicCount(7);
-                response.addExtField("addTopicCount", 
String.valueOf(responseHeader.getAddTopicCount()));
-                return response;
+        doAnswer(invocationOnMock -> {
+            RemotingCommand request = invocationOnMock.getArgument(1);
+            if (request.getCode() != RequestCode.ADD_WRITE_PERM_OF_BROKER) {
+                return null;
             }
+
+            RemotingCommand response = 
RemotingCommand.createResponseCommand(AddWritePermOfBrokerResponseHeader.class);
+            AddWritePermOfBrokerResponseHeader responseHeader = 
(AddWritePermOfBrokerResponseHeader) response.readCustomHeader();
+            response.setCode(ResponseCode.SUCCESS);
+            responseHeader.setAddTopicCount(7);
+            response.addExtField("addTopicCount", 
String.valueOf(responseHeader.getAddTopicCount()));
+            return response;
         }).when(remotingClient).invokeSync(anyString(), 
any(RemotingCommand.class), anyLong());
 
         int topicCnt = mqClientAPI.addWritePermOfBroker("127.0.0.1", 
"default-broker", 1000);
@@ -1091,4 +1099,979 @@ public class MQClientAPIImplTest {
         mqClientAPI.createTopicList(brokerAddr, topicConfigList, 10000);
     }
 
+    @Test
+    public void assertFetchNameServerAddr() throws NoSuchFieldException, 
IllegalAccessException {
+        setTopAddressing();
+        assertEquals(defaultNsAddr, mqClientAPI.fetchNameServerAddr());
+    }
+
+    @Test
+    public void assertOnNameServerAddressChange() {
+        assertEquals(defaultNsAddr, 
mqClientAPI.onNameServerAddressChange(defaultNsAddr));
+    }
+
+    @Test(expected = AssertionError.class)
+    public void testUpdateGlobalWhiteAddrsConfig() throws MQBrokerException, 
RemotingException, InterruptedException, MQClientException {
+        mqClientAPI.updateGlobalWhiteAddrsConfig(defaultNsAddr, "", "", 
defaultTimeout);
+    }
+
+    @Test
+    public void assertGetBrokerClusterAclInfo() throws MQBrokerException, 
RemotingException, InterruptedException {
+        mockInvokeSync();
+        GetBrokerAclConfigResponseHeader responseHeader = 
mock(GetBrokerAclConfigResponseHeader.class);
+        when(responseHeader.getBrokerName()).thenReturn(brokerName);
+        when(responseHeader.getBrokerAddr()).thenReturn(defaultBrokerAddr);
+        when(responseHeader.getClusterName()).thenReturn(clusterName);
+        
when(responseHeader.getAllAclFileVersion()).thenReturn("{\"key\":{\"stateVersion\":1}}");
+        setResponseHeader(responseHeader);
+        ClusterAclVersionInfo actual = 
mqClientAPI.getBrokerClusterAclInfo(defaultNsAddr, defaultTimeout);
+        assertNotNull(actual);
+        assertEquals(brokerName, actual.getBrokerName());
+        assertEquals(defaultBrokerAddr, actual.getBrokerAddr());
+        assertEquals(clusterName, actual.getClusterName());
+        assertEquals(1, actual.getAllAclConfigDataVersion().size());
+        assertNull(actual.getAclConfigDataVersion());
+    }
+
+    @Test
+    public void assertPullMessage() throws MQBrokerException, 
RemotingException, InterruptedException {
+        PullMessageRequestHeader requestHeader = 
mock(PullMessageRequestHeader.class);
+        mockInvokeSync();
+        PullCallback callback = mock(PullCallback.class);
+        PullMessageResponseHeader responseHeader = 
mock(PullMessageResponseHeader.class);
+        setResponseHeader(responseHeader);
+        when(responseHeader.getNextBeginOffset()).thenReturn(1L);
+        when(responseHeader.getMinOffset()).thenReturn(1L);
+        when(responseHeader.getMaxOffset()).thenReturn(10L);
+        
when(responseHeader.getSuggestWhichBrokerId()).thenReturn(MixAll.MASTER_ID);
+        PullResult actual = mqClientAPI.pullMessage(defaultBrokerAddr, 
requestHeader, defaultTimeout, CommunicationMode.SYNC, callback);
+        assertNotNull(actual);
+        assertEquals(1L, actual.getNextBeginOffset());
+        assertEquals(1L, actual.getMinOffset());
+        assertEquals(10L, actual.getMaxOffset());
+        assertEquals(PullStatus.FOUND, actual.getPullStatus());
+        assertNull(actual.getMsgFoundList());
+    }
+
+    @Test
+    public void testBatchAckMessageAsync() throws MQBrokerException, 
RemotingException, InterruptedException {
+        AckCallback callback = mock(AckCallback.class);
+        List<String> extraInfoList = new ArrayList<>();
+        extraInfoList.add(String.format("%s %s %s %s %s %s %d %d", "1", "2", 
"3", "4", "5", brokerName, 7, 8));
+        mqClientAPI.batchAckMessageAsync(defaultBrokerAddr, defaultTimeout, 
callback, defaultTopic, "", extraInfoList);
+    }
+
+    @Test
+    public void assertSearchOffset() throws MQBrokerException, 
RemotingException, InterruptedException {
+        mockInvokeSync();
+        SearchOffsetResponseHeader responseHeader = 
mock(SearchOffsetResponseHeader.class);
+        when(responseHeader.getOffset()).thenReturn(1L);
+        setResponseHeader(responseHeader);
+        assertEquals(1L, mqClientAPI.searchOffset(defaultBrokerAddr, new 
MessageQueue(), System.currentTimeMillis(), defaultTimeout));
+    }
+
+    @Test
+    public void testUpdateConsumerOffsetOneway() throws RemotingException, 
InterruptedException {
+        UpdateConsumerOffsetRequestHeader requestHeader = 
mock(UpdateConsumerOffsetRequestHeader.class);
+        mqClientAPI.updateConsumerOffsetOneway(defaultBrokerAddr, 
requestHeader, defaultTimeout);
+    }
+
+    @Test
+    public void assertSendHeartbeat() throws MQBrokerException, 
RemotingException, InterruptedException {
+        mockInvokeSync();
+        HeartbeatData heartbeatData = new HeartbeatData();
+        assertEquals(1, mqClientAPI.sendHeartbeat(defaultBrokerAddr, 
heartbeatData, defaultTimeout));
+    }
+
+    @Test
+    public void assertSendHeartbeatV2() throws MQBrokerException, 
RemotingException, InterruptedException {
+        mockInvokeSync();
+        HeartbeatData heartbeatData = new HeartbeatData();
+        HeartbeatV2Result actual = 
mqClientAPI.sendHeartbeatV2(defaultBrokerAddr, heartbeatData, defaultTimeout);
+        assertNotNull(actual);
+        assertEquals(1, actual.getVersion());
+        assertFalse(actual.isSubChange());
+        assertFalse(actual.isSupportV2());
+    }
+
+    @Test
+    public void testUnregisterClient() throws RemotingException, 
InterruptedException, MQBrokerException {
+        mockInvokeSync();
+        mqClientAPI.unregisterClient(defaultBrokerAddr, "", "", "", 
defaultTimeout);
+    }
+
+    @Test
+    public void testEndTransactionOneway() throws RemotingException, 
InterruptedException {
+        mockInvokeSync();
+        EndTransactionRequestHeader requestHeader = 
mock(EndTransactionRequestHeader.class);
+        mqClientAPI.endTransactionOneway(defaultBrokerAddr, requestHeader, "", 
defaultTimeout);
+    }
+
+    @Test
+    public void testQueryMessage() throws MQBrokerException, 
RemotingException, InterruptedException {
+        QueryMessageRequestHeader requestHeader = 
mock(QueryMessageRequestHeader.class);
+        InvokeCallback callback = mock(InvokeCallback.class);
+        mqClientAPI.queryMessage(defaultBrokerAddr, requestHeader, 
defaultTimeout, callback, false);
+    }
+
+    @Test
+    public void testRegisterClient() throws RemotingException, 
InterruptedException {
+        mockInvokeSync();
+        HeartbeatData heartbeatData = new HeartbeatData();
+        assertTrue(mqClientAPI.registerClient(defaultBrokerAddr, 
heartbeatData, defaultTimeout));
+    }
+
+    @Test
+    public void testConsumerSendMessageBack() throws RemotingException, 
InterruptedException, MQBrokerException {
+        mockInvokeSync();
+        MessageExt messageExt = mock(MessageExt.class);
+        mqClientAPI.consumerSendMessageBack(defaultBrokerAddr, brokerName, 
messageExt, "", 1, defaultTimeout, 1000);
+    }
+
+    @Test
+    public void assertLockBatchMQ() throws RemotingException, 
InterruptedException, MQBrokerException {
+        mockInvokeSync();
+        LockBatchRequestBody responseBody = new LockBatchRequestBody();
+        setResponseBody(responseBody);
+        Set<MessageQueue> actual = mqClientAPI.lockBatchMQ(defaultBrokerAddr, 
responseBody, defaultTimeout);
+        assertNotNull(actual);
+        assertEquals(0, actual.size());
+    }
+
+    @Test
+    public void testUnlockBatchMQ() throws RemotingException, 
InterruptedException, MQBrokerException {
+        mockInvokeSync();
+        UnlockBatchRequestBody unlockBatchRequestBody = new 
UnlockBatchRequestBody();
+        mqClientAPI.unlockBatchMQ(defaultBrokerAddr, unlockBatchRequestBody, 
defaultTimeout, false);
+    }
+
+    @Test
+    public void assertGetTopicStatsInfo() throws RemotingException, 
InterruptedException, MQBrokerException {
+        mockInvokeSync();
+        TopicStatsTable responseBody = new TopicStatsTable();
+        MessageQueue messageQueue = new MessageQueue();
+        TopicOffset topicOffset = new TopicOffset();
+        responseBody.getOffsetTable().put(messageQueue, topicOffset);
+        setResponseBody(responseBody);
+        TopicStatsTable actual = 
mqClientAPI.getTopicStatsInfo(defaultBrokerAddr, defaultTopic, defaultTimeout);
+        assertNotNull(actual);
+        assertEquals(1, actual.getOffsetTable().size());
+    }
+
+    @Test
+    public void assertGetConsumeStats() throws RemotingException, 
InterruptedException, MQBrokerException {
+        mockInvokeSync();
+        ConsumeStats responseBody = new ConsumeStats();
+        responseBody.setConsumeTps(1000);
+        setResponseBody(responseBody);
+        ConsumeStats actual = mqClientAPI.getConsumeStats(defaultBrokerAddr, 
"", defaultTimeout);
+        assertNotNull(actual);
+        assertEquals(1000, actual.getConsumeTps(), 0.0);
+    }
+
+    @Test
+    public void assertGetProducerConnectionList() throws RemotingException, 
InterruptedException, MQBrokerException {
+        mockInvokeSync();
+        ProducerConnection responseBody = new ProducerConnection();
+        responseBody.getConnectionSet().add(new Connection());
+        setResponseBody(responseBody);
+        ProducerConnection actual = 
mqClientAPI.getProducerConnectionList(defaultBrokerAddr, "", defaultTimeout);
+        assertNotNull(actual);
+        assertEquals(1, actual.getConnectionSet().size());
+    }
+
+    @Test
+    public void assertGetAllProducerInfo() throws RemotingException, 
InterruptedException, MQBrokerException {
+        mockInvokeSync();
+        Map<String, List<ProducerInfo>> data = new HashMap<>();
+        data.put("key", Collections.emptyList());
+        ProducerTableInfo responseBody = new ProducerTableInfo(data);
+        setResponseBody(responseBody);
+        ProducerTableInfo actual = 
mqClientAPI.getAllProducerInfo(defaultBrokerAddr, defaultTimeout);
+        assertNotNull(actual);
+        assertEquals(1, actual.getData().size());
+    }
+
+    @Test
+    public void assertGetConsumerConnectionList() throws RemotingException, 
InterruptedException, MQBrokerException {
+        mockInvokeSync();
+        ConsumerConnection responseBody = new ConsumerConnection();
+        responseBody.setConsumeType(ConsumeType.CONSUME_ACTIVELY);
+        
responseBody.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
+        responseBody.setMessageModel(MessageModel.CLUSTERING);
+        setResponseBody(responseBody);
+        ConsumerConnection actual = 
mqClientAPI.getConsumerConnectionList(defaultBrokerAddr, "", defaultTimeout);
+        assertNotNull(actual);
+        assertEquals(ConsumeType.CONSUME_ACTIVELY, actual.getConsumeType());
+        assertEquals(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET, 
actual.getConsumeFromWhere());
+        assertEquals(MessageModel.CLUSTERING, actual.getMessageModel());
+    }
+
+    @Test
+    public void assertGetBrokerRuntimeInfo() throws RemotingException, 
InterruptedException, MQBrokerException {
+        mockInvokeSync();
+        KVTable responseBody = new KVTable();
+        responseBody.getTable().put("key", "value");
+        setResponseBody(responseBody);
+        KVTable actual = mqClientAPI.getBrokerRuntimeInfo(defaultBrokerAddr, 
defaultTimeout);
+        assertNotNull(actual);
+        assertEquals(1, actual.getTable().size());
+    }
+
+    @Test
+    public void testAddBroker() throws RemotingException, 
InterruptedException, MQBrokerException {
+        mockInvokeSync();
+        mqClientAPI.addBroker(defaultBrokerAddr, "", defaultTimeout);
+    }
+
+    @Test
+    public void testRemoveBroker() throws RemotingException, 
InterruptedException, MQBrokerException {
+        mockInvokeSync();
+        mqClientAPI.removeBroker(defaultBrokerAddr, clusterName, brokerName, 
MixAll.MASTER_ID, defaultTimeout);
+    }
+
+    @Test
+    public void testUpdateBrokerConfig() throws RemotingException, 
InterruptedException, MQBrokerException, UnsupportedEncodingException, 
MQClientException {
+        mockInvokeSync();
+        mqClientAPI.updateBrokerConfig(defaultBrokerAddr, createProperties(), 
defaultTimeout);
+    }
+
+    @Test
+    public void assertGetBrokerConfig() throws RemotingException, 
InterruptedException, MQBrokerException, UnsupportedEncodingException {
+        mockInvokeSync();
+        setResponseBody("{\"key\":\"value\"}");
+        Properties actual = mqClientAPI.getBrokerConfig(defaultBrokerAddr, 
defaultTimeout);
+        assertNotNull(actual);
+        assertEquals(1, actual.size());
+    }
+
+    @Test
+    public void testUpdateColdDataFlowCtrGroupConfig() throws 
RemotingException, InterruptedException, MQBrokerException, 
UnsupportedEncodingException {
+        mockInvokeSync();
+        Properties props = new Properties();
+        mqClientAPI.updateColdDataFlowCtrGroupConfig(defaultBrokerAddr, props, 
defaultTimeout);
+    }
+
+    @Test
+    public void testRemoveColdDataFlowCtrGroupConfig() throws 
RemotingException, InterruptedException, MQBrokerException, 
UnsupportedEncodingException {
+        mockInvokeSync();
+        mqClientAPI.removeColdDataFlowCtrGroupConfig(defaultBrokerAddr, "", 
defaultTimeout);
+    }
+
+    @Test
+    public void assertGetColdDataFlowCtrInfo() throws RemotingException, 
InterruptedException, MQBrokerException, UnsupportedEncodingException {
+        mockInvokeSync();
+        setResponseBody("{\"key\":\"value\"}");
+        String actual = mqClientAPI.getColdDataFlowCtrInfo(defaultBrokerAddr, 
defaultTimeout);
+        assertNotNull(actual);
+        assertEquals("\"{\\\"key\\\":\\\"value\\\"}\"", actual);
+    }
+
+    @Test
+    public void assertSetCommitLogReadAheadMode() throws RemotingException, 
InterruptedException, MQBrokerException {
+        mockInvokeSync();
+        when(response.getRemark()).thenReturn("remark");
+        String actual = 
mqClientAPI.setCommitLogReadAheadMode(defaultBrokerAddr, "", defaultTimeout);
+        assertNotNull(actual);
+        assertEquals("remark", actual);
+    }
+
+    @Test
+    public void assertGetBrokerClusterInfo() throws RemotingException, 
InterruptedException, MQBrokerException {
+        mockInvokeSync();
+        ClusterInfo responseBody = new ClusterInfo();
+        Map<String, Set<String>> clusterAddrTable = new HashMap<>();
+        clusterAddrTable.put(clusterName, new HashSet<>());
+        Map<String, BrokerData> brokerAddrTable = new HashMap<>();
+        brokerAddrTable.put(brokerName, new BrokerData());
+        responseBody.setClusterAddrTable(clusterAddrTable);
+        responseBody.setBrokerAddrTable(brokerAddrTable);
+        setResponseBody(responseBody);
+        ClusterInfo actual = mqClientAPI.getBrokerClusterInfo(defaultTimeout);
+        assertNotNull(actual);
+        assertEquals(1, actual.getClusterAddrTable().size());
+        assertEquals(1, actual.getBrokerAddrTable().size());
+    }
+
+    @Test
+    public void assertGetDefaultTopicRouteInfoFromNameServer() throws 
RemotingException, InterruptedException, MQClientException {
+        mockInvokeSync();
+        TopicRouteData responseBody = new TopicRouteData();
+        responseBody.getQueueDatas().add(new QueueData());
+        responseBody.getBrokerDatas().add(new BrokerData());
+        responseBody.getFilterServerTable().put("key", 
Collections.emptyList());
+        Map<String, TopicQueueMappingInfo> topicQueueMappingByBroker = new 
HashMap<>();
+        topicQueueMappingByBroker.put("key", new TopicQueueMappingInfo());
+        responseBody.setTopicQueueMappingByBroker(topicQueueMappingByBroker);
+        setResponseBody(responseBody);
+        TopicRouteData actual = 
mqClientAPI.getDefaultTopicRouteInfoFromNameServer(defaultTimeout);
+        assertNotNull(actual);
+        assertEquals(1, actual.getQueueDatas().size());
+        assertEquals(1, actual.getBrokerDatas().size());
+        assertEquals(1, actual.getFilterServerTable().size());
+        assertEquals(1, actual.getTopicQueueMappingByBroker().size());
+    }
+
+    @Test
+    public void assertGetTopicRouteInfoFromNameServer() throws 
RemotingException, InterruptedException, MQClientException {
+        mockInvokeSync();
+        TopicRouteData responseBody = new TopicRouteData();
+        responseBody.getQueueDatas().add(new QueueData());
+        responseBody.getBrokerDatas().add(new BrokerData());
+        responseBody.getFilterServerTable().put("key", 
Collections.emptyList());
+        Map<String, TopicQueueMappingInfo> topicQueueMappingByBroker = new 
HashMap<>();
+        topicQueueMappingByBroker.put("key", new TopicQueueMappingInfo());
+        responseBody.setTopicQueueMappingByBroker(topicQueueMappingByBroker);
+        setResponseBody(responseBody);
+        TopicRouteData actual = 
mqClientAPI.getTopicRouteInfoFromNameServer(defaultTopic, defaultTimeout);
+        assertNotNull(actual);
+        assertEquals(1, actual.getQueueDatas().size());
+        assertEquals(1, actual.getBrokerDatas().size());
+        assertEquals(1, actual.getFilterServerTable().size());
+        assertEquals(1, actual.getTopicQueueMappingByBroker().size());
+    }
+
+    @Test
+    public void assertGetTopicListFromNameServer() throws RemotingException, 
InterruptedException, MQClientException {
+        mockInvokeSync();
+        TopicList responseBody = new TopicList();
+        responseBody.setBrokerAddr(defaultBrokerAddr);
+        responseBody.getTopicList().add(defaultTopic);
+        setResponseBody(responseBody);
+        TopicList actual = 
mqClientAPI.getTopicListFromNameServer(defaultTimeout);
+        assertNotNull(actual);
+        assertEquals(1, actual.getTopicList().size());
+        assertEquals(defaultBrokerAddr, actual.getBrokerAddr());
+    }
+
+    @Test
+    public void assertWipeWritePermOfBroker() throws RemotingException, 
InterruptedException, MQClientException {
+        mockInvokeSync();
+        WipeWritePermOfBrokerResponseHeader responseHeader = 
mock(WipeWritePermOfBrokerResponseHeader.class);
+        when(responseHeader.getWipeTopicCount()).thenReturn(1);
+        setResponseHeader(responseHeader);
+        assertEquals(1, mqClientAPI.wipeWritePermOfBroker(defaultNsAddr, 
brokerName, defaultTimeout));
+    }
+
+    @Test
+    public void testDeleteTopicInBroker() throws RemotingException, 
InterruptedException, MQClientException {
+        mockInvokeSync();
+        mqClientAPI.deleteTopicInBroker(defaultBrokerAddr, defaultTopic, 
defaultTimeout);
+    }
+
+    @Test
+    public void testDeleteTopicInNameServer() throws RemotingException, 
InterruptedException, MQClientException, MQBrokerException {
+        mockInvokeSync();
+        mqClientAPI.deleteTopicInNameServer(defaultNsAddr, defaultTopic, 
defaultTimeout);
+        mqClientAPI.deleteTopicInNameServer(defaultNsAddr, clusterName, 
defaultTopic, defaultTimeout);
+    }
+
+    @Test
+    public void testDeleteSubscriptionGroup() throws RemotingException, 
InterruptedException, MQClientException {
+        mockInvokeSync();
+        mqClientAPI.deleteSubscriptionGroup(defaultBrokerAddr, "", true, 
defaultTimeout);
+    }
+
+    @Test
+    public void assertGetKVConfigValue() throws RemotingException, 
InterruptedException, MQClientException {
+        mockInvokeSync();
+        GetKVConfigResponseHeader responseHeader = 
mock(GetKVConfigResponseHeader.class);
+        when(responseHeader.getValue()).thenReturn("value");
+        setResponseHeader(responseHeader);
+        assertEquals("value", mqClientAPI.getKVConfigValue("", "", 
defaultTimeout));
+    }
+
+    @Test
+    public void testPutKVConfigValue() throws RemotingException, 
InterruptedException, MQClientException {
+        mockInvokeSync();
+        mqClientAPI.putKVConfigValue("", "", "", defaultTimeout);
+    }
+
+    @Test
+    public void testDeleteKVConfigValue() throws RemotingException, 
InterruptedException, MQClientException {
+        mockInvokeSync();
+        mqClientAPI.deleteKVConfigValue("", "", defaultTimeout);
+    }
+
+    @Test
+    public void assertGetKVListByNamespace() throws RemotingException, 
InterruptedException, MQClientException {
+        mockInvokeSync();
+        KVTable responseBody = new KVTable();
+        responseBody.getTable().put("key", "value");
+        setResponseBody(responseBody);
+        KVTable actual = mqClientAPI.getKVListByNamespace("", defaultTimeout);
+        assertNotNull(actual);
+        assertEquals(1, actual.getTable().size());
+    }
+
+    @Test
+    public void assertInvokeBrokerToResetOffset() throws RemotingException, 
InterruptedException, MQClientException {
+        mockInvokeSync();
+        ResetOffsetBody responseBody = new ResetOffsetBody();
+        responseBody.getOffsetTable().put(new MessageQueue(), 1L);
+        setResponseBody(responseBody);
+        Map<MessageQueue, Long> actual = 
mqClientAPI.invokeBrokerToResetOffset(defaultBrokerAddr, defaultTopic, "", 
System.currentTimeMillis(), false, defaultTimeout);
+        assertNotNull(actual);
+        assertEquals(1, actual.size());
+        actual = mqClientAPI.invokeBrokerToResetOffset(defaultBrokerAddr, 
defaultTopic, "", System.currentTimeMillis(), 1, 1L, defaultTimeout);
+        assertNotNull(actual);
+        assertEquals(1, actual.size());
+    }
+
+    @Test
+    public void assertInvokeBrokerToGetConsumerStatus() throws 
RemotingException, InterruptedException, MQClientException {
+        mockInvokeSync();
+        GetConsumerStatusBody responseBody = new GetConsumerStatusBody();
+        responseBody.getConsumerTable().put("key", new HashMap<>());
+        responseBody.getMessageQueueTable().put(new MessageQueue(), 1L);
+        setResponseBody(responseBody);
+        Map<String, Map<MessageQueue, Long>> actual = 
mqClientAPI.invokeBrokerToGetConsumerStatus(defaultBrokerAddr, defaultTopic, 
"", "", defaultTimeout);
+        assertNotNull(actual);
+        assertEquals(1, actual.size());
+    }
+
+    @Test
+    public void assertQueryTopicConsumeByWho() throws RemotingException, 
InterruptedException, MQBrokerException {
+        mockInvokeSync();
+        GroupList responseBody = new GroupList();
+        responseBody.getGroupList().add("");
+        setResponseBody(responseBody);
+        GroupList actual = 
mqClientAPI.queryTopicConsumeByWho(defaultBrokerAddr, defaultTopic, 
defaultTimeout);
+        assertNotNull(actual);
+        assertEquals(1, actual.getGroupList().size());
+    }
+
+    @Test
+    public void assertQueryTopicsByConsumer() throws RemotingException, 
InterruptedException, MQBrokerException {
+        mockInvokeSync();
+        TopicList responseBody = new TopicList();
+        responseBody.getTopicList().add(defaultTopic);
+        responseBody.setBrokerAddr(defaultBrokerAddr);
+        setResponseBody(responseBody);
+        TopicList actual = 
mqClientAPI.queryTopicsByConsumer(defaultBrokerAddr, "", defaultTimeout);
+        assertNotNull(actual);
+        assertEquals(1, actual.getTopicList().size());
+        assertEquals(defaultBrokerAddr, actual.getBrokerAddr());
+    }
+
+    @Test
+    public void assertQuerySubscriptionByConsumer() throws RemotingException, 
InterruptedException, MQBrokerException {
+        mockInvokeSync();
+        QuerySubscriptionResponseBody responseBody = new 
QuerySubscriptionResponseBody();
+        SubscriptionData subscriptionData = new SubscriptionData();
+        subscriptionData.setTopic(defaultTopic);
+        responseBody.setSubscriptionData(subscriptionData);
+        setResponseBody(responseBody);
+        SubscriptionData actual = 
mqClientAPI.querySubscriptionByConsumer(defaultBrokerAddr, group, defaultTopic, 
defaultTimeout);
+        assertNotNull(actual);
+        assertEquals(defaultTopic, actual.getTopic());
+    }
+
+    @Test
+    public void assertQueryConsumeTimeSpan() throws RemotingException, 
InterruptedException, MQBrokerException {
+        mockInvokeSync();
+        QueryConsumeTimeSpanBody responseBody = new QueryConsumeTimeSpanBody();
+        responseBody.getConsumeTimeSpanSet().add(new QueueTimeSpan());
+        setResponseBody(responseBody);
+        List<QueueTimeSpan> actual = 
mqClientAPI.queryConsumeTimeSpan(defaultBrokerAddr, defaultTopic, group, 
defaultTimeout);
+        assertNotNull(actual);
+        assertEquals(1, actual.size());
+    }
+
+    @Test
+    public void assertGetTopicsByCluster() throws RemotingException, 
InterruptedException, MQClientException {
+        mockInvokeSync();
+        TopicList responseBody = new TopicList();
+        responseBody.setBrokerAddr(defaultBrokerAddr);
+        responseBody.setTopicList(Collections.singleton(defaultTopic));
+        setResponseBody(responseBody);
+        TopicList actual = mqClientAPI.getTopicsByCluster(clusterName, 
defaultTimeout);
+        assertNotNull(actual);
+        assertEquals(defaultBrokerAddr, actual.getBrokerAddr());
+        assertEquals(1, actual.getTopicList().size());
+        assertTrue(actual.getTopicList().contains(defaultTopic));
+    }
+
+    @Test
+    public void assertGetSystemTopicList() throws RemotingException, 
InterruptedException, MQClientException {
+        mockInvokeSync();
+        TopicList responseBody = new TopicList();
+        responseBody.setBrokerAddr(defaultBrokerAddr);
+        responseBody.setTopicList(Collections.singleton(defaultTopic));
+        setResponseBody(responseBody);
+        TopicList actual = mqClientAPI.getSystemTopicList(defaultTimeout);
+        assertNotNull(actual);
+        assertEquals(defaultBrokerAddr, actual.getBrokerAddr());
+        assertEquals(1, actual.getTopicList().size());
+        assertTrue(actual.getTopicList().contains(defaultTopic));
+    }
+
+    @Test
+    public void assertGetSystemTopicListFromBroker() throws RemotingException, 
InterruptedException, MQClientException {
+        mockInvokeSync();
+        TopicList responseBody = new TopicList();
+        responseBody.setBrokerAddr(defaultBrokerAddr);
+        responseBody.setTopicList(Collections.singleton(defaultTopic));
+        setResponseBody(responseBody);
+        TopicList actual = 
mqClientAPI.getSystemTopicListFromBroker(defaultBrokerAddr, defaultTimeout);
+        assertNotNull(actual);
+        assertEquals(defaultBrokerAddr, actual.getBrokerAddr());
+        assertEquals(1, actual.getTopicList().size());
+        assertTrue(actual.getTopicList().contains(defaultTopic));
+    }
+
+    @Test
+    public void assertCleanExpiredConsumeQueue() throws RemotingException, 
InterruptedException, MQClientException {
+        mockInvokeSync();
+        assertTrue(mqClientAPI.cleanExpiredConsumeQueue(defaultBrokerAddr, 
defaultTimeout));
+    }
+
+    @Test
+    public void assertDeleteExpiredCommitLog() throws RemotingException, 
InterruptedException, MQClientException {
+        mockInvokeSync();
+        assertTrue(mqClientAPI.deleteExpiredCommitLog(defaultBrokerAddr, 
defaultTimeout));
+    }
+
+    @Test
+    public void assertCleanUnusedTopicByAddr() throws RemotingException, 
InterruptedException, MQClientException {
+        mockInvokeSync();
+        assertTrue(mqClientAPI.cleanUnusedTopicByAddr(defaultBrokerAddr, 
defaultTimeout));
+    }
+
+    @Test
+    public void assertGetConsumerRunningInfo() throws RemotingException, 
InterruptedException, MQClientException {
+        mockInvokeSync();
+        ConsumerRunningInfo responseBody = new ConsumerRunningInfo();
+        responseBody.setJstack("jstack");
+        responseBody.getUserConsumerInfo().put("key", "value");
+        setResponseBody(responseBody);
+        ConsumerRunningInfo actual = 
mqClientAPI.getConsumerRunningInfo(defaultBrokerAddr, group, clientId, false, 
defaultTimeout);
+        assertNotNull(actual);
+        assertEquals("jstack", actual.getJstack());
+        assertEquals(1, actual.getUserConsumerInfo().size());
+    }
+
+    @Test
+    public void assertConsumeMessageDirectly() throws RemotingException, 
InterruptedException, MQClientException {
+        mockInvokeSync();
+        ConsumeMessageDirectlyResult responseBody = new 
ConsumeMessageDirectlyResult();
+        responseBody.setAutoCommit(true);
+        responseBody.setRemark("remark");
+        setResponseBody(responseBody);
+        ConsumeMessageDirectlyResult actual = 
mqClientAPI.consumeMessageDirectly(defaultBrokerAddr, group, clientId, topic, 
"", defaultTimeout);
+        assertNotNull(actual);
+        assertEquals("remark", actual.getRemark());
+        assertTrue(actual.isAutoCommit());
+    }
+
+    @Test
+    public void assertQueryCorrectionOffset() throws RemotingException, 
InterruptedException, MQClientException {
+        mockInvokeSync();
+        QueryCorrectionOffsetBody responseBody = new 
QueryCorrectionOffsetBody();
+        responseBody.getCorrectionOffsets().put(1, 1L);
+        setResponseBody(responseBody);
+        Map<Integer, Long> actual = 
mqClientAPI.queryCorrectionOffset(defaultBrokerAddr, topic, group, new 
HashSet<>(), defaultTimeout);
+        assertNotNull(actual);
+        assertEquals(1, actual.size());
+        assertTrue(actual.containsKey(1));
+        assertTrue(actual.containsValue(1L));
+    }
+
+    @Test
+    public void assertGetUnitTopicList() throws RemotingException, 
InterruptedException, MQClientException {
+        mockInvokeSync();
+        TopicList responseBody = new TopicList();
+        responseBody.getTopicList().add(defaultTopic);
+        setResponseBody(responseBody);
+        TopicList actual = mqClientAPI.getUnitTopicList(false, defaultTimeout);
+        assertNotNull(actual);
+        assertEquals(1, actual.getTopicList().size());
+    }
+
+    @Test
+    public void assertGetHasUnitSubTopicList() throws RemotingException, 
InterruptedException, MQClientException {
+        mockInvokeSync();
+        TopicList responseBody = new TopicList();
+        responseBody.getTopicList().add(defaultTopic);
+        setResponseBody(responseBody);
+        TopicList actual = mqClientAPI.getHasUnitSubTopicList(false, 
defaultTimeout);
+        assertNotNull(actual);
+        assertEquals(1, actual.getTopicList().size());
+    }
+
+    @Test
+    public void assertGetHasUnitSubUnUnitTopicList() throws RemotingException, 
InterruptedException, MQClientException {
+        mockInvokeSync();
+        TopicList responseBody = new TopicList();
+        responseBody.getTopicList().add(defaultTopic);
+        setResponseBody(responseBody);
+        TopicList actual = mqClientAPI.getHasUnitSubUnUnitTopicList(false, 
defaultTimeout);
+        assertNotNull(actual);
+        assertEquals(1, actual.getTopicList().size());
+    }
+
+    @Test
+    public void testCloneGroupOffset() throws RemotingException, 
InterruptedException, MQClientException {
+        mockInvokeSync();
+        mqClientAPI.cloneGroupOffset(defaultBrokerAddr, "", "", defaultTopic, 
false, defaultTimeout);
+    }
+
+    @Test
+    public void assertViewBrokerStatsData() throws RemotingException, 
InterruptedException, MQClientException {
+        mockInvokeSync();
+        BrokerStatsData responseBody = new BrokerStatsData();
+        responseBody.setStatsDay(new BrokerStatsItem());
+        setResponseBody(responseBody);
+        BrokerStatsData actual = 
mqClientAPI.viewBrokerStatsData(defaultBrokerAddr, "", "", defaultTimeout);
+        assertNotNull(actual);
+        assertNotNull(actual.getStatsDay());
+    }
+
+    @Test
+    public void assertGetClusterList() {
+        Set<String> actual = mqClientAPI.getClusterList(topic, defaultTimeout);
+        assertNotNull(actual);
+        assertEquals(0, actual.size());
+    }
+
+    @Test
+    public void assertFetchConsumeStatsInBroker() throws RemotingException, 
InterruptedException, MQClientException {
+        mockInvokeSync();
+        ConsumeStatsList responseBody = new ConsumeStatsList();
+        responseBody.setBrokerAddr(defaultBrokerAddr);
+        responseBody.getConsumeStatsList().add(new HashMap<>());
+        setResponseBody(responseBody);
+        ConsumeStatsList actual = 
mqClientAPI.fetchConsumeStatsInBroker(defaultBrokerAddr, false, defaultTimeout);
+        assertNotNull(actual);
+        assertEquals(1, actual.getConsumeStatsList().size());
+        assertEquals(defaultBrokerAddr, actual.getBrokerAddr());
+    }
+
+    @Test
+    public void assertGetAllSubscriptionGroupForSubscriptionGroupWrapper() 
throws RemotingException, InterruptedException, MQBrokerException {
+        mockInvokeSync();
+        SubscriptionGroupWrapper responseBody = new SubscriptionGroupWrapper();
+        responseBody.getSubscriptionGroupTable().put("key", new 
SubscriptionGroupConfig());
+        setResponseBody(responseBody);
+        SubscriptionGroupWrapper actual = 
mqClientAPI.getAllSubscriptionGroup(defaultBrokerAddr, defaultTimeout);
+        assertNotNull(actual);
+        assertEquals(1, actual.getSubscriptionGroupTable().size());
+        assertNotNull(actual.getDataVersion());
+        assertEquals(0, actual.getDataVersion().getStateVersion());
+    }
+
+    @Test
+    public void assertGetAllSubscriptionGroupForSubscriptionGroupConfig() 
throws RemotingException, InterruptedException, MQBrokerException {
+        mockInvokeSync();
+        SubscriptionGroupConfig responseBody = new SubscriptionGroupConfig();
+        responseBody.setGroupName(group);
+        responseBody.setBrokerId(MixAll.MASTER_ID);
+        setResponseBody(responseBody);
+        SubscriptionGroupConfig actual = 
mqClientAPI.getSubscriptionGroupConfig(defaultBrokerAddr, group, 
defaultTimeout);
+        assertNotNull(actual);
+        assertEquals(group, actual.getGroupName());
+        assertEquals(MixAll.MASTER_ID, actual.getBrokerId());
+    }
+
+    @Test
+    public void assertGetAllTopicConfig() throws RemotingException, 
InterruptedException, MQBrokerException {
+        mockInvokeSync();
+        TopicConfigSerializeWrapper responseBody = new 
TopicConfigSerializeWrapper();
+        responseBody.getTopicConfigTable().put("key", new TopicConfig());
+        setResponseBody(responseBody);
+        TopicConfigSerializeWrapper actual = 
mqClientAPI.getAllTopicConfig(defaultBrokerAddr, defaultTimeout);
+        assertNotNull(actual);
+        assertEquals(1, actual.getTopicConfigTable().size());
+        assertNotNull(actual.getDataVersion());
+        assertEquals(0, actual.getDataVersion().getStateVersion());
+    }
+
+    @Test
+    public void testUpdateNameServerConfig() throws RemotingException, 
InterruptedException, MQClientException, UnsupportedEncodingException {
+        mockInvokeSync();
+        mqClientAPI.updateNameServerConfig(createProperties(), 
Collections.singletonList(defaultNsAddr), defaultTimeout);
+    }
+
+    @Test
+    public void assertGetNameServerConfig() throws RemotingException, 
InterruptedException, UnsupportedEncodingException, MQClientException {
+        mockInvokeSync();
+        setResponseBody("{\"key\":\"value\"}");
+        Map<String, Properties> actual = 
mqClientAPI.getNameServerConfig(Collections.singletonList(defaultNsAddr), 
defaultTimeout);
+        assertNotNull(actual);
+        assertEquals(1, actual.size());
+        assertTrue(actual.containsKey(defaultNsAddr));
+    }
+
+    @Test
+    public void assertQueryConsumeQueue() throws RemotingException, 
InterruptedException, MQClientException {
+        mockInvokeSync();
+        QueryConsumeQueueResponseBody responseBody = new 
QueryConsumeQueueResponseBody();
+        responseBody.setQueueData(Collections.singletonList(new 
ConsumeQueueData()));
+        setResponseBody(responseBody);
+        QueryConsumeQueueResponseBody actual = 
mqClientAPI.queryConsumeQueue(defaultBrokerAddr, defaultTopic, 1, 1, 1, group, 
defaultTimeout);
+        assertNotNull(actual);
+        assertEquals(1, actual.getQueueData().size());
+    }
+
+    @Test
+    public void testCheckClientInBroker() throws RemotingException, 
InterruptedException, MQClientException {
+        mockInvokeSync();
+        mqClientAPI.checkClientInBroker(defaultBrokerAddr, group, clientId, 
new SubscriptionData(), defaultTimeout);
+    }
+
+    @Test
+    public void assertGetTopicConfig() throws RemotingException, 
InterruptedException, MQBrokerException {
+        mockInvokeSync();
+        TopicConfigAndQueueMapping responseBody = new 
TopicConfigAndQueueMapping(new TopicConfig(), new TopicQueueMappingDetail());
+        setResponseBody(responseBody);
+        TopicConfigAndQueueMapping actual = 
mqClientAPI.getTopicConfig(defaultBrokerAddr, defaultTopic, defaultTimeout);
+        assertNotNull(actual);
+        assertNotNull(actual.getMappingDetail());
+    }
+
+    @Test
+    public void testCreateStaticTopic() throws RemotingException, 
InterruptedException, MQBrokerException {
+        mockInvokeSync();
+        mqClientAPI.createStaticTopic(defaultBrokerAddr, defaultTopic, new 
TopicConfig(), new TopicQueueMappingDetail(), false, defaultTimeout);
+    }
+
+    @Test
+    public void assertUpdateAndGetGroupForbidden() throws RemotingException, 
InterruptedException, MQBrokerException {
+        mockInvokeSync();
+        GroupForbidden responseBody = new GroupForbidden();
+        responseBody.setGroup(group);
+        responseBody.setTopic(defaultTopic);
+        setResponseBody(responseBody);
+        GroupForbidden actual = 
mqClientAPI.updateAndGetGroupForbidden(defaultBrokerAddr, new 
UpdateGroupForbiddenRequestHeader(), defaultTimeout);
+        assertNotNull(actual);
+        assertEquals(group, actual.getGroup());
+        assertEquals(defaultTopic, actual.getTopic());
+    }
+
+    @Test
+    public void testResetMasterFlushOffset() throws RemotingException, 
InterruptedException, MQBrokerException {
+        mockInvokeSync();
+        mqClientAPI.resetMasterFlushOffset(defaultBrokerAddr, 1L);
+    }
+
+    @Test
+    public void assertGetBrokerHAStatus() throws RemotingException, 
InterruptedException, MQBrokerException {
+        mockInvokeSync();
+        HARuntimeInfo responseBody = new HARuntimeInfo();
+        responseBody.setMaster(true);
+        responseBody.setMasterCommitLogMaxOffset(1L);
+        setResponseBody(responseBody);
+        HARuntimeInfo actual = 
mqClientAPI.getBrokerHAStatus(defaultBrokerAddr, defaultTimeout);
+        assertNotNull(actual);
+        assertEquals(1L, actual.getMasterCommitLogMaxOffset());
+        assertTrue(actual.isMaster());
+    }
+
+    @Test
+    public void assertGetControllerMetaData() throws RemotingException, 
InterruptedException, MQBrokerException {
+        mockInvokeSync();
+        GetMetaDataResponseHeader responseHeader = new 
GetMetaDataResponseHeader();
+        responseHeader.setGroup(group);
+        responseHeader.setIsLeader(true);
+        setResponseHeader(responseHeader);
+        GetMetaDataResponseHeader actual = 
mqClientAPI.getControllerMetaData(defaultBrokerAddr);
+        assertNotNull(actual);
+        assertEquals(group, actual.getGroup());
+        assertTrue(actual.isLeader());
+    }
+
+    @Test
+    public void assertGetInSyncStateData() throws RemotingException, 
InterruptedException, MQBrokerException {
+        mockInvokeSync();
+        BrokerReplicasInfo responseBody = new BrokerReplicasInfo();
+        BrokerReplicasInfo.ReplicasInfo replicasInfo = new 
BrokerReplicasInfo.ReplicasInfo(MixAll.MASTER_ID, defaultBrokerAddr, 1, 1, 
Collections.emptyList(), Collections.emptyList());
+        responseBody.getReplicasInfoTable().put("key", replicasInfo);
+        GetMetaDataResponseHeader responseHeader = new 
GetMetaDataResponseHeader();
+        responseHeader.setControllerLeaderAddress(defaultBrokerAddr);
+        setResponseHeader(responseHeader);
+        setResponseBody(responseBody);
+        BrokerReplicasInfo actual = 
mqClientAPI.getInSyncStateData(defaultBrokerAddr, 
Collections.singletonList(defaultBrokerAddr));
+        assertNotNull(actual);
+        assertEquals(1L, actual.getReplicasInfoTable().size());
+    }
+
+    @Test
+    public void assertGetBrokerEpochCache() throws RemotingException, 
InterruptedException, MQBrokerException {
+        mockInvokeSync();
+        EpochEntryCache responseBody = new EpochEntryCache(clusterName, 
brokerName, MixAll.MASTER_ID, Collections.emptyList(), 1);
+        setResponseBody(responseBody);
+        EpochEntryCache actual = 
mqClientAPI.getBrokerEpochCache(defaultBrokerAddr);
+        assertNotNull(actual);
+        assertEquals(1L, actual.getMaxOffset());
+        assertEquals(MixAll.MASTER_ID, actual.getBrokerId());
+        assertEquals(brokerName, actual.getBrokerName());
+        assertEquals(clusterName, actual.getClusterName());
+    }
+
+    @Test
+    public void assertGetControllerConfig() throws RemotingException, 
InterruptedException, UnsupportedEncodingException, MQClientException {
+        mockInvokeSync();
+        setResponseBody("{\"key\":\"value\"}");
+        Map<String, Properties> actual = 
mqClientAPI.getControllerConfig(Collections.singletonList(defaultBrokerAddr), 
defaultTimeout);
+        assertNotNull(actual);
+        assertEquals(1L, actual.size());
+    }
+
+    @Test
+    public void testUpdateControllerConfig() throws RemotingException, 
InterruptedException, UnsupportedEncodingException, MQClientException {
+        mockInvokeSync();
+        mqClientAPI.updateControllerConfig(createProperties(), 
Collections.singletonList(defaultBrokerAddr), defaultTimeout);
+    }
+
+    @Test
+    public void assertElectMaster() throws RemotingException, 
InterruptedException, MQBrokerException {
+        mockInvokeSync();
+        BrokerMemberGroup responseBody = new BrokerMemberGroup();
+        setResponseBody(responseBody);
+        GetMetaDataResponseHeader getMetaDataResponseHeader = new 
GetMetaDataResponseHeader();
+        
getMetaDataResponseHeader.setControllerLeaderAddress(defaultBrokerAddr);
+        
when(response.decodeCommandCustomHeader(GetMetaDataResponseHeader.class)).thenReturn(getMetaDataResponseHeader);
+        ElectMasterResponseHeader responseHeader = new 
ElectMasterResponseHeader();
+        
when(response.decodeCommandCustomHeader(ElectMasterResponseHeader.class)).thenReturn(responseHeader);
+        Pair<ElectMasterResponseHeader, BrokerMemberGroup> actual = 
mqClientAPI.electMaster(defaultBrokerAddr, clusterName, brokerName, 
MixAll.MASTER_ID);
+        assertNotNull(actual);
+        assertEquals(responseHeader, actual.getObject1());
+        assertEquals(responseBody, actual.getObject2());
+    }
+
+    @Test
+    public void testCleanControllerBrokerData() throws RemotingException, 
InterruptedException, MQBrokerException {
+        mockInvokeSync();
+        GetMetaDataResponseHeader responseHeader = new 
GetMetaDataResponseHeader();
+        responseHeader.setControllerLeaderAddress(defaultBrokerAddr);
+        setResponseHeader(responseHeader);
+        mqClientAPI.cleanControllerBrokerData(defaultBrokerAddr, clusterName, 
brokerName, "", false);
+    }
+
+    @Test
+    public void testCreateUser() throws RemotingException, 
InterruptedException, MQBrokerException {
+        mockInvokeSync();
+        mqClientAPI.createUser(defaultBrokerAddr, new UserInfo(), 
defaultTimeout);
+    }
+
+    @Test
+    public void testUpdateUser() throws RemotingException, 
InterruptedException, MQBrokerException {
+        mockInvokeSync();
+        mqClientAPI.updateUser(defaultBrokerAddr, new UserInfo(), 
defaultTimeout);
+    }
+
+    @Test
+    public void testDeleteUser() throws RemotingException, 
InterruptedException, MQBrokerException {
+        mockInvokeSync();
+        mqClientAPI.deleteUser(defaultBrokerAddr, "", defaultTimeout);
+    }
+
+    @Test
+    public void assertGetUser() throws RemotingException, 
InterruptedException, MQBrokerException {
+        mockInvokeSync();
+        setResponseBody(createUserInfo());
+        UserInfo actual = mqClientAPI.getUser(defaultBrokerAddr, "", 
defaultTimeout);
+        assertNotNull(actual);
+        assertEquals("username", actual.getUsername());
+        assertEquals("password", actual.getPassword());
+        assertEquals("userStatus", actual.getUserStatus());
+        assertEquals("userType", actual.getUserType());
+    }
+
+    @Test
+    public void assertListUser() throws RemotingException, 
InterruptedException, MQBrokerException {
+        mockInvokeSync();
+        setResponseBody(Collections.singletonList(createUserInfo()));
+        List<UserInfo> actual = mqClientAPI.listUser(defaultBrokerAddr, "", 
defaultTimeout);
+        assertNotNull(actual);
+        assertEquals("username", actual.get(0).getUsername());
+        assertEquals("password", actual.get(0).getPassword());
+        assertEquals("userStatus", actual.get(0).getUserStatus());
+        assertEquals("userType", actual.get(0).getUserType());
+    }
+
+    @Test
+    public void testCreateAcl() throws RemotingException, 
InterruptedException, MQBrokerException {
+        mockInvokeSync();
+        mqClientAPI.createAcl(defaultBrokerAddr, new AclInfo(), 
defaultTimeout);
+    }
+
+    @Test
+    public void testUpdateAcl() throws RemotingException, 
InterruptedException, MQBrokerException {
+        mockInvokeSync();
+        mqClientAPI.updateAcl(defaultBrokerAddr, new AclInfo(), 
defaultTimeout);
+    }
+
+    @Test
+    public void testDeleteAcl() throws RemotingException, 
InterruptedException, MQBrokerException {
+        mockInvokeSync();
+        mqClientAPI.deleteAcl(defaultBrokerAddr, "", "", defaultTimeout);
+    }
+
+    @Test
+    public void assertGetAcl() throws RemotingException, InterruptedException, 
MQBrokerException {
+        mockInvokeSync();
+        setResponseBody(createAclInfo());
+        AclInfo actual = mqClientAPI.getAcl(defaultBrokerAddr, "", 
defaultTimeout);
+        assertNotNull(actual);
+        assertEquals("subject", actual.getSubject());
+        assertEquals(1, actual.getPolicies().size());
+    }
+
+    @Test
+    public void assertListAcl() throws RemotingException, 
InterruptedException, MQBrokerException {
+        mockInvokeSync();
+        setResponseBody(Collections.singletonList(createAclInfo()));
+        List<AclInfo> actual = mqClientAPI.listAcl(defaultBrokerAddr, "", "", 
defaultTimeout);
+        assertNotNull(actual);
+        assertEquals("subject", actual.get(0).getSubject());
+        assertEquals(1, actual.get(0).getPolicies().size());
+    }
+
+    private Properties createProperties() {
+        Properties result = new Properties();
+        result.put("key", "value");
+        return result;
+    }
+
+    private AclInfo createAclInfo() {
+        return AclInfo.of("subject", Collections.emptyList(), 
Collections.emptyList(), Collections.emptyList(), "");
+    }
+
+    private UserInfo createUserInfo() {
+        UserInfo result = new UserInfo();
+        result.setUsername("username");
+        result.setPassword("password");
+        result.setUserStatus("userStatus");
+        result.setUserType("userType");
+        return result;
+    }
+
+    private void setResponseHeader(CommandCustomHeader responseHeader) throws 
RemotingCommandException {
+        
when(response.decodeCommandCustomHeader(any())).thenReturn(responseHeader);
+    }
+
+    private void setResponseBody(Object responseBody) {
+        
when(response.getBody()).thenReturn(RemotingSerializable.encode(responseBody));
+    }
+
+    private void mockInvokeSync() throws RemotingConnectException, 
RemotingSendRequestException, RemotingTimeoutException, InterruptedException {
+        when(response.getCode()).thenReturn(ResponseCode.SUCCESS);
+        when(response.getVersion()).thenReturn(1);
+        when(remotingClient.invokeSync(any(), any(), 
anyLong())).thenReturn(response);
+        
when(remotingClient.getNameServerAddressList()).thenReturn(Collections.singletonList(defaultNsAddr));
+    }
+
+    private void setTopAddressing() throws NoSuchFieldException, 
IllegalAccessException {
+        TopAddressing topAddressing = mock(TopAddressing.class);
+        setField(mqClientAPI, "topAddressing", topAddressing);
+        when(topAddressing.fetchNSAddr()).thenReturn(defaultNsAddr);
+    }
+
+    private void setField(final Object target, final String fieldName, final 
Object newValue) throws NoSuchFieldException, IllegalAccessException {
+        Class<?> clazz = target.getClass();
+        Field field = clazz.getDeclaredField(fieldName);
+        field.setAccessible(true);
+        field.set(target, newValue);
+    }
 }

Reply via email to