This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 2d1e3143c7 [ISSUE #8372] Add more test coverage for AdminBrokerProcessor
2d1e3143c7 is described below

commit 2d1e3143c7dc60aca805e6689c5364825e3020d4
Author: Tan Xiang <82364837+tanxiang...@users.noreply.github.com>
AuthorDate: Wed Jul 17 16:08:20 2024 +0800

    [ISSUE #8372] Add more test coverage for AdminBrokerProcessor
---
 .../broker/processor/AdminBrokerProcessor.java     |   2 +-
 .../broker/processor/AdminBrokerProcessorTest.java | 471 ++++++++++++++++++++-
 2 files changed, 454 insertions(+), 19 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index 1b29ff173c..c5419a62df 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -1941,7 +1941,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
         final RemotingCommand response = RemotingCommand.createResponseCommand(null);
 
         String content = this.brokerController.getQueryAssignmentProcessor().getMessageRequestModeManager().encode();
-        if (content != null && content.length() > 0) {
+        if (content != null && !content.isEmpty()) {
             try {
                 response.setBody(content.getBytes(MixAll.DEFAULT_CHARSET));
             } catch (UnsupportedEncodingException e) {
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java
index e66703e565..04324043fb 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java
@@ -20,21 +20,6 @@ import com.alibaba.fastjson.JSON;
 import com.google.common.collect.Sets;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandlerContext;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.net.UnknownHostException;
-import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.LongAdder;
 import org.apache.rocketmq.auth.authentication.enums.UserType;
 import 
org.apache.rocketmq.auth.authentication.manager.AuthenticationMetadataManager;
 import org.apache.rocketmq.auth.authentication.model.Subject;
@@ -45,8 +30,10 @@ import org.apache.rocketmq.auth.authorization.model.Acl;
 import org.apache.rocketmq.auth.authorization.model.Environment;
 import org.apache.rocketmq.auth.authorization.model.Resource;
 import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.client.ClientChannelInfo;
 import org.apache.rocketmq.broker.client.ConsumerGroupInfo;
 import org.apache.rocketmq.broker.client.ConsumerManager;
+import org.apache.rocketmq.broker.client.net.Broker2Client;
 import org.apache.rocketmq.broker.offset.ConsumerOffsetManager;
 import org.apache.rocketmq.broker.schedule.ScheduleMessageService;
 import org.apache.rocketmq.broker.subscription.RocksDBSubscriptionGroupManager;
@@ -55,11 +42,13 @@ import org.apache.rocketmq.broker.topic.TopicConfigManager;
 import org.apache.rocketmq.common.BoundaryType;
 import org.apache.rocketmq.common.BrokerConfig;
 import org.apache.rocketmq.common.KeyBuilder;
+import org.apache.rocketmq.common.MQVersion;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.common.TopicFilterType;
 import org.apache.rocketmq.common.TopicQueueId;
 import org.apache.rocketmq.common.action.Action;
+import org.apache.rocketmq.common.constant.FIleReadaheadMode;
 import org.apache.rocketmq.common.constant.PermName;
 import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
 import org.apache.rocketmq.common.message.MessageAccessor;
@@ -67,13 +56,20 @@ import org.apache.rocketmq.common.message.MessageConst;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.topic.TopicValidator;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
+import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
 import org.apache.rocketmq.remoting.netty.NettyClientConfig;
 import org.apache.rocketmq.remoting.netty.NettyServerConfig;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
 import org.apache.rocketmq.remoting.protocol.RequestCode;
 import org.apache.rocketmq.remoting.protocol.ResponseCode;
 import org.apache.rocketmq.remoting.protocol.body.AclInfo;
+import org.apache.rocketmq.remoting.protocol.body.CreateTopicListRequestBody;
+import org.apache.rocketmq.remoting.protocol.body.GroupList;
+import org.apache.rocketmq.remoting.protocol.body.HARuntimeInfo;
 import org.apache.rocketmq.remoting.protocol.body.LockBatchRequestBody;
+import org.apache.rocketmq.remoting.protocol.body.QueryCorrectionOffsetBody;
 import org.apache.rocketmq.remoting.protocol.body.UnlockBatchRequestBody;
 import org.apache.rocketmq.remoting.protocol.body.UserInfo;
 import org.apache.rocketmq.remoting.protocol.header.CreateAclRequestHeader;
@@ -82,8 +78,11 @@ import org.apache.rocketmq.remoting.protocol.header.CreateUserRequestHeader;
 import org.apache.rocketmq.remoting.protocol.header.DeleteAclRequestHeader;
 import org.apache.rocketmq.remoting.protocol.header.DeleteTopicRequestHeader;
 import org.apache.rocketmq.remoting.protocol.header.DeleteUserRequestHeader;
+import 
org.apache.rocketmq.remoting.protocol.header.ExchangeHAInfoResponseHeader;
 import org.apache.rocketmq.remoting.protocol.header.GetAclRequestHeader;
 import 
org.apache.rocketmq.remoting.protocol.header.GetAllTopicConfigResponseHeader;
+import 
org.apache.rocketmq.remoting.protocol.header.GetConsumerRunningInfoRequestHeader;
+import 
org.apache.rocketmq.remoting.protocol.header.GetConsumerStatusRequestHeader;
 import 
org.apache.rocketmq.remoting.protocol.header.GetEarliestMsgStoretimeRequestHeader;
 import org.apache.rocketmq.remoting.protocol.header.GetMaxOffsetRequestHeader;
 import org.apache.rocketmq.remoting.protocol.header.GetMinOffsetRequestHeader;
@@ -91,6 +90,13 @@ import org.apache.rocketmq.remoting.protocol.header.GetTopicConfigRequestHeader;
 import org.apache.rocketmq.remoting.protocol.header.GetUserRequestHeader;
 import org.apache.rocketmq.remoting.protocol.header.ListAclsRequestHeader;
 import org.apache.rocketmq.remoting.protocol.header.ListUsersRequestHeader;
+import 
org.apache.rocketmq.remoting.protocol.header.NotifyMinBrokerIdChangeRequestHeader;
+import 
org.apache.rocketmq.remoting.protocol.header.QueryCorrectionOffsetHeader;
+import 
org.apache.rocketmq.remoting.protocol.header.QuerySubscriptionByConsumerRequestHeader;
+import 
org.apache.rocketmq.remoting.protocol.header.QueryTopicConsumeByWhoRequestHeader;
+import 
org.apache.rocketmq.remoting.protocol.header.QueryTopicsByConsumerRequestHeader;
+import 
org.apache.rocketmq.remoting.protocol.header.ResetMasterFlushOffsetHeader;
+import org.apache.rocketmq.remoting.protocol.header.ResetOffsetRequestHeader;
 import 
org.apache.rocketmq.remoting.protocol.header.ResumeCheckHalfMessageRequestHeader;
 import org.apache.rocketmq.remoting.protocol.header.SearchOffsetRequestHeader;
 import org.apache.rocketmq.remoting.protocol.header.UpdateAclRequestHeader;
@@ -98,12 +104,17 @@ import org.apache.rocketmq.remoting.protocol.header.UpdateUserRequestHeader;
 import org.apache.rocketmq.remoting.protocol.heartbeat.ConsumeType;
 import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
 import 
org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
+import org.apache.rocketmq.store.CommitLog;
 import org.apache.rocketmq.store.DefaultMessageStore;
 import org.apache.rocketmq.store.MessageStore;
 import org.apache.rocketmq.store.SelectMappedBufferResult;
 import org.apache.rocketmq.store.config.MessageStoreConfig;
 import org.apache.rocketmq.store.logfile.DefaultMappedFile;
 import org.apache.rocketmq.store.stats.BrokerStats;
+import org.apache.rocketmq.store.timer.TimerCheckpoint;
+import org.apache.rocketmq.store.timer.TimerMessageStore;
+import org.apache.rocketmq.store.timer.TimerMetrics;
+import org.apache.rocketmq.store.util.LibC;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -112,6 +123,26 @@ import org.mockito.Mock;
 import org.mockito.Spy;
 import org.mockito.junit.MockitoJUnitRunner;
 
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.LongAdder;
+
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyInt;
@@ -170,11 +201,32 @@ public class AdminBrokerProcessorTest {
     @Mock
     private AuthorizationMetadataManager authorizationMetadataManager;
 
+    @Mock
+    private TimerMessageStore timerMessageStore;
+
+    @Mock
+    private TimerMetrics timerMetrics;
+
+    @Mock
+    private MessageStoreConfig messageStoreConfig;
+
+    @Mock
+    private CommitLog commitLog;
+
+    @Mock
+    private Broker2Client broker2Client;
+
+    @Mock
+    private ClientChannelInfo clientChannelInfo;
+
     @Before
     public void init() throws Exception {
         brokerController.setMessageStore(messageStore);
         
brokerController.setAuthenticationMetadataManager(authenticationMetadataManager);
         
brokerController.setAuthorizationMetadataManager(authorizationMetadataManager);
+        Field field = BrokerController.class.getDeclaredField("broker2Client");
+        field.setAccessible(true);
+        field.set(brokerController, broker2Client);
 
         
//doReturn(sendMessageProcessor).when(brokerController).getSendMessageProcessor();
 
@@ -280,6 +332,31 @@ public class AdminBrokerProcessorTest {
         assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
     }
 
+    @Test
+    public void testUpdateAndCreateTopicList() throws RemotingCommandException 
{
+        List<String> systemTopicList = new ArrayList<>(systemTopicSet);
+        RemotingCommand request = buildCreateTopicListRequest(systemTopicList);
+        RemotingCommand response = 
adminBrokerProcessor.processRequest(handlerContext, request);
+        assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR);
+        assertThat(response.getRemark()).isEqualTo("The topic[" + 
systemTopicList.get(0) + "] is conflict with system topic.");
+
+        List<String> inValidTopicList = new ArrayList<>();
+        inValidTopicList.add("");
+        request = buildCreateTopicListRequest(inValidTopicList);
+        response = adminBrokerProcessor.processRequest(handlerContext, 
request);
+        assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR);
+
+        List<String> topicList = new ArrayList<>();
+        topicList.add("TEST_CREATE_TOPIC");
+        topicList.add("TEST_CREATE_TOPIC1");
+        request = buildCreateTopicListRequest(topicList);
+        response = adminBrokerProcessor.processRequest(handlerContext, 
request);
+        assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+        //test no changes
+        response = adminBrokerProcessor.processRequest(handlerContext, 
request);
+        assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+    }
+
     @Test
     public void testDeleteTopicInRocksdb() throws Exception {
         if (notToBeExecuted()) {
@@ -815,7 +892,6 @@ public class AdminBrokerProcessorTest {
         request.setBody(JSON.toJSONBytes(aclInfo));
         RemotingCommand response = 
adminBrokerProcessor.processRequest(handlerContext, request);
         assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
-
     }
 
     @Test
@@ -833,7 +909,6 @@ public class AdminBrokerProcessorTest {
         request.setBody(JSON.toJSONBytes(aclInfo));
         RemotingCommand response = 
adminBrokerProcessor.processRequest(handlerContext, request);
         assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
-
     }
 
     @Test
@@ -893,6 +968,349 @@ public class AdminBrokerProcessorTest {
         
assertThat(aclInfoData.get(0).getPolicies().get(0).getEntries().get(0).getDecision()).isEqualTo("Allow");
     }
 
+    @Test
+    public void testGetTimeCheckPoint() throws RemotingCommandException {
+        when(this.brokerController.getTimerCheckpoint()).thenReturn(null);
+        RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.GET_TIMER_CHECK_POINT, null);
+        RemotingCommand response = 
adminBrokerProcessor.processRequest(handlerContext, request);
+        assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR);
+        assertThat(response.getRemark()).isEqualTo("The checkpoint is null");
+
+        when(this.brokerController.getTimerCheckpoint()).thenReturn(new 
TimerCheckpoint());
+        response = adminBrokerProcessor.processRequest(handlerContext, 
request);
+        assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+    }
+    @Test
+    public void testGetTimeMetrics() throws RemotingCommandException, 
IOException {
+        
when(this.brokerController.getMessageStore().getTimerMessageStore()).thenReturn(null);
+        RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.GET_TIMER_METRICS, null);
+        RemotingCommand response = 
adminBrokerProcessor.processRequest(handlerContext, request);
+        assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR);
+
+        
when(this.brokerController.getMessageStore().getTimerMessageStore()).thenReturn(timerMessageStore);
+        
when(this.timerMessageStore.getTimerMetrics()).thenReturn(timerMetrics);
+        when(this.timerMetrics.encode()).thenReturn(new 
TimerMetrics.TimerMetricsSerializeWrapper().toJson(false));
+        response = adminBrokerProcessor.processRequest(handlerContext, 
request);
+        assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+    }
+
+    @Test
+    public void testUpdateColdDataFlowCtrGroupConfig() throws 
RemotingCommandException {
+        RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.UPDATE_COLD_DATA_FLOW_CTR_CONFIG,
 null);
+        RemotingCommand response = 
adminBrokerProcessor.processRequest(handlerContext, request);
+        assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+
+        request.setBody("consumerGroup1=1".getBytes());
+        response = adminBrokerProcessor.processRequest(handlerContext, 
request);
+        assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+
+        request.setBody("".getBytes());
+        response = adminBrokerProcessor.processRequest(handlerContext, 
request);
+        assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+    }
+
+    @Test
+    public void testRemoveColdDataFlowCtrGroupConfig() throws 
RemotingCommandException {
+        RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.REMOVE_COLD_DATA_FLOW_CTR_CONFIG,
 null);
+        RemotingCommand response = 
adminBrokerProcessor.processRequest(handlerContext, request);
+        assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+
+        request.setBody("consumerGroup1".getBytes());
+        response = adminBrokerProcessor.processRequest(handlerContext, 
request);
+        assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+    }
+
+    @Test
+    public void testGetColdDataFlowCtrInfo() throws RemotingCommandException {
+        RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.GET_COLD_DATA_FLOW_CTR_INFO, 
null);
+        RemotingCommand response = 
adminBrokerProcessor.processRequest(handlerContext, request);
+        assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+    }
+
+    @Test
+    public void testSetCommitLogReadAheadMode() throws 
RemotingCommandException {
+        RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.SET_COMMITLOG_READ_MODE, null);
+        RemotingCommand response = 
adminBrokerProcessor.processRequest(handlerContext, request);
+        assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR);
+
+        HashMap<String, String> extfields = new HashMap<>();
+        extfields.put(FIleReadaheadMode.READ_AHEAD_MODE, 
String.valueOf(LibC.MADV_DONTNEED));
+        request.setExtFields(extfields);
+        response = adminBrokerProcessor.processRequest(handlerContext, 
request);
+        assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR);
+
+        extfields.clear();
+        extfields.put(FIleReadaheadMode.READ_AHEAD_MODE, 
String.valueOf(LibC.MADV_NORMAL));
+        request.setExtFields(extfields);
+        response = adminBrokerProcessor.processRequest(handlerContext, 
request);
+        assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+
+        this.brokerController.setMessageStore(defaultMessageStore);
+        response = adminBrokerProcessor.processRequest(handlerContext, 
request);
+        assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR);
+
+        
when(this.defaultMessageStore.getMessageStoreConfig()).thenReturn(messageStoreConfig);
+        when(this.defaultMessageStore.getCommitLog()).thenReturn(commitLog);
+        response = adminBrokerProcessor.processRequest(handlerContext, 
request);
+        assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+    }
+
+    @Test
+    public void testGetUnknownCmdResponse() throws RemotingCommandException {
+        RemotingCommand request = RemotingCommand.createRequestCommand(10000, 
null);
+        RemotingCommand response = 
adminBrokerProcessor.processRequest(handlerContext, request);
+        
assertThat(response.getCode()).isEqualTo(ResponseCode.REQUEST_CODE_NOT_SUPPORTED);
+    }
+
+    @Test
+    public void testGetAllMessageRequestMode() throws RemotingCommandException 
{
+        RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.GET_ALL_MESSAGE_REQUEST_MODE, 
null);
+        RemotingCommand response = 
adminBrokerProcessor.processRequest(handlerContext, request);
+        assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+    }
+
+    @Test
+    public void testResetOffset() throws RemotingCommandException {
+        ResetOffsetRequestHeader requestHeader =
+                createRequestHeader("topic","group",-1,false,-1,-1);
+        RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.INVOKE_BROKER_TO_RESET_OFFSET, 
requestHeader);
+        request.makeCustomHeaderToNet();
+        RemotingCommand response = 
adminBrokerProcessor.processRequest(handlerContext, request);
+        assertThat(response.getCode()).isEqualTo(ResponseCode.TOPIC_NOT_EXIST);
+
+        
this.brokerController.getTopicConfigManager().getTopicConfigTable().put("topic",
 new TopicConfig("topic"));
+        response = adminBrokerProcessor.processRequest(handlerContext, 
request);
+        
assertThat(response.getCode()).isEqualTo(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
+
+        
this.brokerController.getSubscriptionGroupManager().getSubscriptionGroupTable().put("group",
 new SubscriptionGroupConfig());
+        response = adminBrokerProcessor.processRequest(handlerContext, 
request);
+        assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+
+        requestHeader.setQueueId(0);
+        request = 
RemotingCommand.createRequestCommand(RequestCode.INVOKE_BROKER_TO_RESET_OFFSET, 
requestHeader);
+        request.makeCustomHeaderToNet();
+        response = adminBrokerProcessor.processRequest(handlerContext, 
request);
+        assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+
+        requestHeader.setOffset(2L);
+        request = 
RemotingCommand.createRequestCommand(RequestCode.INVOKE_BROKER_TO_RESET_OFFSET, 
requestHeader);
+        request.makeCustomHeaderToNet();
+        response = adminBrokerProcessor.processRequest(handlerContext, 
request);
+        assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR);
+    }
+
+    @Test
+    public void testGetConsumerStatus() throws RemotingCommandException {
+        GetConsumerStatusRequestHeader requestHeader = new 
GetConsumerStatusRequestHeader();
+        requestHeader.setGroup("group");
+        requestHeader.setTopic("topic");
+        requestHeader.setClientAddr("");
+        RemotingCommand request = RemotingCommand
+                
.createRequestCommand(RequestCode.INVOKE_BROKER_TO_GET_CONSUMER_STATUS, 
requestHeader);
+        RemotingCommand responseCommand = 
RemotingCommand.createResponseCommand(null);
+        responseCommand.setCode(ResponseCode.SUCCESS);
+        
when(broker2Client.getConsumeStatus(anyString(),anyString(),anyString())).thenReturn(responseCommand);
+        request.makeCustomHeaderToNet();
+        RemotingCommand response = 
adminBrokerProcessor.processRequest(handlerContext, request);
+        assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+    }
+
+    @Test
+    public void testQueryTopicConsumeByWho() throws RemotingCommandException {
+        QueryTopicConsumeByWhoRequestHeader requestHeader = new 
QueryTopicConsumeByWhoRequestHeader();
+        requestHeader.setTopic("topic");
+        RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.QUERY_TOPIC_CONSUME_BY_WHO, 
requestHeader);
+        request.makeCustomHeaderToNet();
+        HashSet<String> groups = new HashSet<>();
+        groups.add("group");
+        
when(brokerController.getConsumerManager()).thenReturn(consumerManager);
+        
when(consumerManager.queryTopicConsumeByWho(anyString())).thenReturn(groups);
+        RemotingCommand response = 
adminBrokerProcessor.processRequest(handlerContext, request);
+        assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+        assertThat(RemotingSerializable.decode(response.getBody(), 
GroupList.class)
+                .getGroupList().contains("group"))
+                .isEqualTo(groups.contains("group"));
+    }
+
+    @Test
+    public void testQueryTopicByConsumer() throws RemotingCommandException {
+        QueryTopicsByConsumerRequestHeader requestHeader = new 
QueryTopicsByConsumerRequestHeader();
+        requestHeader.setGroup("group");
+        RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.QUERY_TOPICS_BY_CONSUMER, 
requestHeader);
+        request.makeCustomHeaderToNet();
+        
when(brokerController.getConsumerOffsetManager()).thenReturn(consumerOffsetManager);
+        RemotingCommand response = 
adminBrokerProcessor.processRequest(handlerContext, request);
+        assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+    }
+
+    @Test
+    public void testQuerySubscriptionByConsumer() throws 
RemotingCommandException {
+        QuerySubscriptionByConsumerRequestHeader requestHeader = new 
QuerySubscriptionByConsumerRequestHeader();
+        requestHeader.setGroup("group");
+        requestHeader.setTopic("topic");
+        RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.QUERY_SUBSCRIPTION_BY_CONSUMER,
 requestHeader);
+        request.makeCustomHeaderToNet();
+        
when(brokerController.getConsumerManager()).thenReturn(consumerManager);
+        
when(consumerManager.findSubscriptionData(anyString(),anyString())).thenReturn(null);
+        RemotingCommand response = 
adminBrokerProcessor.processRequest(handlerContext, request);
+        assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+    }
+
+    @Test
+    public void testGetSystemTopicListFromBroker() throws 
RemotingCommandException {
+        RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_BROKER,
 null);
+        RemotingCommand response = 
adminBrokerProcessor.processRequest(handlerContext, request);
+        assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+    }
+
+    @Test
+    public void testCleanExpiredConsumeQueue() throws RemotingCommandException 
{
+        RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.CLEAN_EXPIRED_CONSUMEQUEUE, 
null);
+        RemotingCommand response = 
adminBrokerProcessor.processRequest(handlerContext, request);
+        assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+    }
+
+    @Test
+    public void testDeleteExpiredCommitLog() throws RemotingCommandException {
+        RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.DELETE_EXPIRED_COMMITLOG, 
null);
+        RemotingCommand response = 
adminBrokerProcessor.processRequest(handlerContext, request);
+        assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+    }
+
+    @Test
+    public void testCleanUnusedTopic() throws RemotingCommandException {
+        RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.CLEAN_UNUSED_TOPIC, null);
+        RemotingCommand response = 
adminBrokerProcessor.processRequest(handlerContext, request);
+        assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+    }
+
+    @Test
+    public void testGetConsumerRunningInfo() throws RemotingCommandException, 
RemotingSendRequestException, RemotingTimeoutException, InterruptedException {
+        
when(brokerController.getConsumerManager()).thenReturn(consumerManager);
+        
when(consumerManager.findChannel(anyString(),anyString())).thenReturn(null);
+        GetConsumerRunningInfoRequestHeader requestHeader = new 
GetConsumerRunningInfoRequestHeader();
+        requestHeader.setClientId("client");
+        requestHeader.setConsumerGroup("group");
+        RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.GET_CONSUMER_RUNNING_INFO, 
requestHeader);
+        request.makeCustomHeaderToNet();
+        RemotingCommand response = 
adminBrokerProcessor.processRequest(handlerContext, request);
+        assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR);
+
+        
when(consumerManager.findChannel(anyString(),anyString())).thenReturn(clientChannelInfo);
+        
when(clientChannelInfo.getVersion()).thenReturn(MQVersion.Version.V3_0_0_SNAPSHOT.ordinal());
+        response = adminBrokerProcessor.processRequest(handlerContext, 
request);
+        assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR);
+
+        
when(clientChannelInfo.getVersion()).thenReturn(MQVersion.Version.V5_2_3.ordinal());
+        when(brokerController.getBroker2Client()).thenReturn(broker2Client);
+        when(clientChannelInfo.getChannel()).thenReturn(channel);
+        RemotingCommand responseCommand = 
RemotingCommand.createResponseCommand(null);
+        responseCommand.setCode(ResponseCode.SUCCESS);
+        
when(broker2Client.callClient(any(Channel.class),any(RemotingCommand.class))).thenReturn(responseCommand);
+        response = adminBrokerProcessor.processRequest(handlerContext, 
request);
+        assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+
+        
when(broker2Client.callClient(any(Channel.class),any(RemotingCommand.class))).thenThrow(new
 RemotingTimeoutException("timeout"));
+        response = adminBrokerProcessor.processRequest(handlerContext, 
request);
+        
assertThat(response.getCode()).isEqualTo(ResponseCode.CONSUME_MSG_TIMEOUT);
+    }
+
+    @Test
+    public void testQueryCorrectionOffset() throws RemotingCommandException {
+        Map<Integer, Long> correctionOffsetMap = new HashMap<>();
+        correctionOffsetMap.put(0, 100L);
+        correctionOffsetMap.put(1, 200L);
+        Map<Integer, Long> compareOffsetMap = new HashMap<>();
+        compareOffsetMap.put(0, 80L);
+        compareOffsetMap.put(1, 300L);
+        
when(brokerController.getConsumerOffsetManager()).thenReturn(consumerOffsetManager);
+        
when(consumerOffsetManager.queryMinOffsetInAllGroup(anyString(),anyString())).thenReturn(correctionOffsetMap);
+        
when(consumerOffsetManager.queryOffset(anyString(),anyString())).thenReturn(compareOffsetMap);
+        QueryCorrectionOffsetHeader queryCorrectionOffsetHeader = new 
QueryCorrectionOffsetHeader();
+        queryCorrectionOffsetHeader.setTopic("topic");
+        queryCorrectionOffsetHeader.setCompareGroup("group");
+        queryCorrectionOffsetHeader.setFilterGroups("");
+        RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.QUERY_CORRECTION_OFFSET, 
queryCorrectionOffsetHeader);
+        request.makeCustomHeaderToNet();
+        RemotingCommand response = 
adminBrokerProcessor.processRequest(handlerContext, request);
+        assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+        QueryCorrectionOffsetBody body = 
RemotingSerializable.decode(response.getBody(), 
QueryCorrectionOffsetBody.class);
+        Map<Integer, Long> correctionOffsets = body.getCorrectionOffsets();
+        assertThat(correctionOffsets.get(0)).isEqualTo(Long.MAX_VALUE);
+        assertThat(correctionOffsets.get(1)).isEqualTo(200L);
+    }
+
+    @Test
+    public void testNotifyMinBrokerIdChange() throws RemotingCommandException {
+        NotifyMinBrokerIdChangeRequestHeader requestHeader = new 
NotifyMinBrokerIdChangeRequestHeader();
+        requestHeader.setMinBrokerId(1L);
+        requestHeader.setMinBrokerAddr("127.0.0.1:10912");
+        requestHeader.setOfflineBrokerAddr("127.0.0.1:10911");
+        requestHeader.setHaBrokerAddr("");
+        RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.NOTIFY_MIN_BROKER_ID_CHANGE, 
requestHeader);
+        request.makeCustomHeaderToNet();
+        RemotingCommand response = 
adminBrokerProcessor.processRequest(handlerContext, request);
+        assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+    }
+
+    @Test
+    public void testUpdateBrokerHaInfo() throws RemotingCommandException {
+        ExchangeHAInfoResponseHeader requestHeader = new 
ExchangeHAInfoResponseHeader();
+        requestHeader.setMasterAddress("127.0.0.1:10911");
+        requestHeader.setMasterFlushOffset(0L);
+        RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.EXCHANGE_BROKER_HA_INFO, 
requestHeader);
+        request.makeCustomHeaderToNet();
+        RemotingCommand response = 
adminBrokerProcessor.processRequest(handlerContext, request);
+        assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+
+        when(brokerController.getMessageStore()).thenReturn(messageStore);
+        requestHeader.setMasterHaAddress("127.0.0.1:10912");
+        request = 
RemotingCommand.createRequestCommand(RequestCode.EXCHANGE_BROKER_HA_INFO, 
requestHeader);
+        request.makeCustomHeaderToNet();
+        response = adminBrokerProcessor.processRequest(handlerContext, 
request);
+        assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+
+        when(messageStore.getMasterFlushedOffset()).thenReturn(0L);
+        response = adminBrokerProcessor.processRequest(handlerContext, 
request);
+        assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+    }
+
+    @Test
+    public void testGetBrokerHaStatus() throws RemotingCommandException {
+        RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.GET_BROKER_HA_STATUS,null);
+        RemotingCommand response = 
adminBrokerProcessor.processRequest(handlerContext, request);
+        assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR);
+
+        when(brokerController.getMessageStore()).thenReturn(messageStore);
+        when(messageStore.getHARuntimeInfo()).thenReturn(new HARuntimeInfo());
+        response = adminBrokerProcessor.processRequest(handlerContext, 
request);
+        assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+    }
+
+    @Test
+    public void testResetMasterFlushOffset() throws RemotingCommandException {
+        ResetMasterFlushOffsetHeader requestHeader = new 
ResetMasterFlushOffsetHeader();
+        RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.RESET_MASTER_FLUSH_OFFSET,requestHeader);
+        RemotingCommand response = 
adminBrokerProcessor.processRequest(handlerContext, request);
+        assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+
+        requestHeader.setMasterFlushOffset(0L);
+        request.makeCustomHeaderToNet();
+        response = adminBrokerProcessor.processRequest(handlerContext, 
request);
+        assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+    }
+
+    private ResetOffsetRequestHeader createRequestHeader(String topic,String 
group,long timestamp,boolean force,long offset,int queueId) {
+        ResetOffsetRequestHeader requestHeader = new 
ResetOffsetRequestHeader();
+        requestHeader.setTopic(topic);
+        requestHeader.setGroup(group);
+        requestHeader.setTimestamp(timestamp);
+        requestHeader.setForce(force);
+        requestHeader.setOffset(offset);
+        requestHeader.setQueueId(queueId);
+        return requestHeader;
+    }
+
     private RemotingCommand buildCreateTopicRequest(String topic) {
         CreateTopicRequestHeader requestHeader = new 
CreateTopicRequestHeader();
         requestHeader.setTopic(topic);
@@ -900,12 +1318,29 @@ public class AdminBrokerProcessorTest {
         requestHeader.setReadQueueNums(8);
         requestHeader.setWriteQueueNums(8);
         requestHeader.setPerm(PermName.PERM_READ | PermName.PERM_WRITE);
-
         RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_TOPIC, 
requestHeader);
         request.makeCustomHeaderToNet();
         return request;
     }
 
+    private RemotingCommand buildCreateTopicListRequest(List<String> 
topicList) {
+        List<TopicConfig> topicConfigList = new ArrayList<>();
+        for (String topic:topicList) {
+            TopicConfig topicConfig = new TopicConfig(topic);
+            topicConfig.setReadQueueNums(8);
+            topicConfig.setWriteQueueNums(8);
+            topicConfig.setTopicFilterType(TopicFilterType.SINGLE_TAG);
+            topicConfig.setPerm(PermName.PERM_READ | PermName.PERM_WRITE);
+            topicConfig.setTopicSysFlag(0);
+            topicConfig.setOrder(false);
+            topicConfigList.add(topicConfig);
+        }
+        RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_TOPIC_LIST, 
null);
+        CreateTopicListRequestBody createTopicListRequestBody = new 
CreateTopicListRequestBody(topicConfigList);
+        request.setBody(createTopicListRequestBody.encode());
+        return request;
+    }
+
     private RemotingCommand buildDeleteTopicRequest(String topic) {
         DeleteTopicRequestHeader requestHeader = new 
DeleteTopicRequestHeader();
         requestHeader.setTopic(topic);


Reply via email to