This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 10e8e6b8a [ISSUE #4487]The trackType is wrong when the consumer in 
broadcasting subscription (#4981)
10e8e6b8a is described below

commit 10e8e6b8ae565095cfa6c70fa07857e526a20d0d
Author: zhangjidi2016 <[email protected]>
AuthorDate: Tue Sep 6 13:48:10 2022 +0800

    [ISSUE #4487]The trackType is wrong when the consumer in broadcasting 
subscription (#4981)
    
    Co-authored-by: zhangjidi <[email protected]>
---
 .../rocketmq/common/protocol/ResponseCode.java     |  2 ++
 .../tools/admin/DefaultMQAdminExtImpl.java         | 24 +++++++++++--
 .../apache/rocketmq/tools/admin/api/TrackType.java |  1 +
 .../command/message/QueryMsgByIdSubCommand.java    |  2 +-
 .../tools/admin/DefaultMQAdminExtTest.java         | 39 ++++++++++++++++++++++
 5 files changed, 64 insertions(+), 4 deletions(-)

diff --git 
a/common/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java 
b/common/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java
index ac8a286ee..65c367d3c 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java
@@ -88,6 +88,8 @@ public class ResponseCode extends RemotingSysResponseCode {
 
     public static final int BROKER_DISPATCH_NOT_COMPLETE = 212;
 
+    public static final int BROADCAST_CONSUMPTION = 213;
+
     public static final int FLOW_CONTROL = 215;
 
     public static final int NOT_LEADER_FOR_QUEUE = 501;
diff --git 
a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
 
b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
index ebba9d96a..465555ba2 100644
--- 
a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
+++ 
b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
@@ -89,6 +89,7 @@ import 
org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHea
 import 
org.apache.rocketmq.common.protocol.header.UpdateGroupForbiddenRequestHeader;
 import 
org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMasterResponseHeader;
 import 
org.apache.rocketmq.common.protocol.header.namesrv.controller.GetMetaDataResponseHeader;
+import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
 import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
 import org.apache.rocketmq.common.protocol.route.BrokerData;
 import org.apache.rocketmq.common.protocol.route.QueueData;
@@ -467,7 +468,18 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, 
MQAdminExtInner {
         }
 
         if (staticResult.getOffsetTable().isEmpty()) {
-            throw new MQClientException(ResponseCode.CONSUMER_NOT_ONLINE, "Not 
found the consumer group consume stats, because return offset table is empty, 
maybe the consumer not consume any message");
+            ConsumerConnection connection;
+            try {
+                connection = this.examineConsumerConnectionInfo(consumerGroup);
+            } catch (Exception e) {
+                throw new MQClientException(ResponseCode.CONSUMER_NOT_ONLINE,
+                    "Not found the consumer group consume stats, because 
return offset table is empty, maybe the consumer not online");
+            }
+
+            if 
(connection.getMessageModel().equals(MessageModel.BROADCASTING)) {
+                throw new MQClientException(ResponseCode.BROADCAST_CONSUMPTION,
+                    "Not found the consumer group consume stats, because 
return offset table is empty, the consumer is under the broadcast mode");
+            }
         }
 
         return staticResult;
@@ -1337,15 +1349,21 @@ public class DefaultMQAdminExtImpl implements 
MQAdminExt, MQAdminExtInner {
                     } catch (MQClientException e) {
                         if (ResponseCode.CONSUMER_NOT_ONLINE == 
e.getResponseCode()) {
                             mt.setTrackType(TrackType.NOT_ONLINE);
+                            mt.setExceptionDesc("CODE:" + e.getResponseCode() 
+ " DESC:" + e.getErrorMessage());
+                        }
+                        if (ResponseCode.BROADCAST_CONSUMPTION == 
e.getResponseCode()) {
+                            mt.setTrackType(TrackType.CONSUME_BROADCASTING);
                         }
-                        mt.setExceptionDesc("CODE:" + e.getResponseCode() + " 
DESC:" + e.getErrorMessage());
                         result.add(mt);
                         continue;
                     } catch (MQBrokerException e) {
                         if (ResponseCode.CONSUMER_NOT_ONLINE == 
e.getResponseCode()) {
                             mt.setTrackType(TrackType.NOT_ONLINE);
+                            mt.setExceptionDesc("CODE:" + e.getResponseCode() 
+ " DESC:" + e.getErrorMessage());
+                        }
+                        if (ResponseCode.BROADCAST_CONSUMPTION == 
e.getResponseCode()) {
+                            mt.setTrackType(TrackType.CONSUME_BROADCASTING);
                         }
-                        mt.setExceptionDesc("CODE:" + e.getResponseCode() + " 
DESC:" + e.getErrorMessage());
                         result.add(mt);
                         continue;
                     } catch (Exception e) {
diff --git 
a/tools/src/main/java/org/apache/rocketmq/tools/admin/api/TrackType.java 
b/tools/src/main/java/org/apache/rocketmq/tools/admin/api/TrackType.java
index da2b48f4e..7d4445a47 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/api/TrackType.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/api/TrackType.java
@@ -23,5 +23,6 @@ public enum TrackType {
     PULL,
     NOT_CONSUME_YET,
     NOT_ONLINE,
+    CONSUME_BROADCASTING,
     UNKNOWN
 }
diff --git 
a/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByIdSubCommand.java
 
b/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByIdSubCommand.java
index 004cc4321..83b16d746 100644
--- 
a/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByIdSubCommand.java
+++ 
b/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByIdSubCommand.java
@@ -153,7 +153,7 @@ public class QueryMsgByIdSubCommand implements SubCommand {
             } else {
                 System.out.printf("%n%n");
                 for (MessageTrack mt : mtdList) {
-                    System.out.printf("%s", mt);
+                    System.out.printf("%s%n", mt);
                 }
             }
         } catch (Exception e) {
diff --git 
a/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java
 
b/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java
index c604f0c3a..cfbcc01fa 100644
--- 
a/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java
+++ 
b/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java
@@ -44,6 +44,7 @@ import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.namesrv.NamesrvUtil;
+import org.apache.rocketmq.common.protocol.ResponseCode;
 import org.apache.rocketmq.common.protocol.body.ClusterInfo;
 import org.apache.rocketmq.common.protocol.body.Connection;
 import org.apache.rocketmq.common.protocol.body.ConsumeStatsList;
@@ -75,6 +76,7 @@ import 
org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
 import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
 import org.apache.rocketmq.remoting.protocol.LanguageCode;
 import org.apache.rocketmq.tools.admin.api.MessageTrack;
+import org.apache.rocketmq.tools.admin.api.TrackType;
 import org.assertj.core.util.Maps;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -88,6 +90,7 @@ import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.isNull;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -318,6 +321,29 @@ public class DefaultMQAdminExtTest {
     public void testExamineConsumeStats() throws InterruptedException, 
RemotingException, MQClientException, MQBrokerException {
         ConsumeStats consumeStats = 
defaultMQAdminExt.examineConsumeStats("default-consumer-group", "unit-test");
         assertThat(consumeStats.getConsumeTps()).isGreaterThanOrEqualTo(1234);
+        ConsumerConnection connection = new ConsumerConnection();
+        connection.setMessageModel(MessageModel.BROADCASTING);
+        HashSet<Connection> connections = new HashSet<>();
+        connections.add(new Connection());
+        connection.setConnectionSet(connections);
+        when(mQClientAPIImpl.getConsumeStats(anyString(), anyString(), 
anyString(), anyLong()))
+            .thenReturn(new ConsumeStats());
+        when(mQClientAPIImpl.getConsumerConnectionList(anyString(), 
anyString(), anyLong()))
+            .thenReturn(new ConsumerConnection()).thenReturn(connection);
+        // CONSUMER_NOT_ONLINE
+        try {
+            defaultMQAdminExt.examineConsumeStats("default-consumer-group", 
"unit-test");
+        } catch (Exception e) {
+            assertThat(e instanceof MQClientException).isTrue();
+            assertThat(((MQClientException) 
e).getResponseCode()).isEqualTo(ResponseCode.CONSUMER_NOT_ONLINE);
+        }
+        // BROADCAST_CONSUMPTION
+        try {
+            defaultMQAdminExt.examineConsumeStats("default-consumer-group", 
"unit-test");
+        } catch (Exception e) {
+            assertThat(e instanceof MQClientException).isTrue();
+            assertThat(((MQClientException) 
e).getResponseCode()).isEqualTo(ResponseCode.BROADCAST_CONSUMPTION);
+        }
     }
 
     @Test
@@ -426,6 +452,19 @@ public class DefaultMQAdminExtTest {
         messageExt.setTopic("unit-test");
         List<MessageTrack> messageTrackList = 
defaultMQAdminExt.messageTrackDetail(messageExt);
         assertThat(messageTrackList.size()).isEqualTo(2);
+
+        ConsumerConnection connection = new ConsumerConnection();
+        connection.setMessageModel(MessageModel.BROADCASTING);
+        connection.setConsumeType(ConsumeType.CONSUME_PASSIVELY);
+        HashSet<Connection> connections = new HashSet<>();
+        connections.add(new Connection());
+        connection.setConnectionSet(connections);
+        when(mQClientAPIImpl.getConsumerConnectionList(anyString(), 
anyString(), anyLong())).thenReturn(connection);
+        ConsumeStats consumeStats = new ConsumeStats();
+        when(mQClientAPIImpl.getConsumeStats(anyString(), anyString(), 
isNull(), anyLong())).thenReturn(consumeStats);
+        List<MessageTrack> broadcastMessageTracks = 
defaultMQAdminExt.messageTrackDetail(messageExt);
+        assertThat(broadcastMessageTracks.size()).isEqualTo(2);
+        
assertThat(broadcastMessageTracks.get(0).getTrackType()).isEqualTo(TrackType.CONSUME_BROADCASTING);
     }
 
     @Test

Reply via email to