This is an automated email from the ASF dual-hosted git repository.
duhengforever pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 10e8e6b8a [ISSUE #4487]The trackType is wrong when the consumer in
broadcasting subscription (#4981)
10e8e6b8a is described below
commit 10e8e6b8ae565095cfa6c70fa07857e526a20d0d
Author: zhangjidi2016 <[email protected]>
AuthorDate: Tue Sep 6 13:48:10 2022 +0800
[ISSUE #4487]The trackType is wrong when the consumer in broadcasting
subscription (#4981)
Co-authored-by: zhangjidi <[email protected]>
---
.../rocketmq/common/protocol/ResponseCode.java | 2 ++
.../tools/admin/DefaultMQAdminExtImpl.java | 24 +++++++++++--
.../apache/rocketmq/tools/admin/api/TrackType.java | 1 +
.../command/message/QueryMsgByIdSubCommand.java | 2 +-
.../tools/admin/DefaultMQAdminExtTest.java | 39 ++++++++++++++++++++++
5 files changed, 64 insertions(+), 4 deletions(-)
diff --git
a/common/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java
b/common/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java
index ac8a286ee..65c367d3c 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java
@@ -88,6 +88,8 @@ public class ResponseCode extends RemotingSysResponseCode {
public static final int BROKER_DISPATCH_NOT_COMPLETE = 212;
+ public static final int BROADCAST_CONSUMPTION = 213;
+
public static final int FLOW_CONTROL = 215;
public static final int NOT_LEADER_FOR_QUEUE = 501;
diff --git
a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
index ebba9d96a..465555ba2 100644
---
a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
+++
b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
@@ -89,6 +89,7 @@ import
org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHea
import
org.apache.rocketmq.common.protocol.header.UpdateGroupForbiddenRequestHeader;
import
org.apache.rocketmq.common.protocol.header.namesrv.controller.ElectMasterResponseHeader;
import
org.apache.rocketmq.common.protocol.header.namesrv.controller.GetMetaDataResponseHeader;
+import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.QueueData;
@@ -467,7 +468,18 @@ public class DefaultMQAdminExtImpl implements MQAdminExt,
MQAdminExtInner {
}
if (staticResult.getOffsetTable().isEmpty()) {
- throw new MQClientException(ResponseCode.CONSUMER_NOT_ONLINE, "Not
found the consumer group consume stats, because return offset table is empty,
maybe the consumer not consume any message");
+ ConsumerConnection connection;
+ try {
+ connection = this.examineConsumerConnectionInfo(consumerGroup);
+ } catch (Exception e) {
+ throw new MQClientException(ResponseCode.CONSUMER_NOT_ONLINE,
+ "Not found the consumer group consume stats, because
return offset table is empty, maybe the consumer not online");
+ }
+
+ if
(connection.getMessageModel().equals(MessageModel.BROADCASTING)) {
+ throw new MQClientException(ResponseCode.BROADCAST_CONSUMPTION,
+ "Not found the consumer group consume stats, because
return offset table is empty, the consumer is under the broadcast mode");
+ }
}
return staticResult;
@@ -1337,15 +1349,21 @@ public class DefaultMQAdminExtImpl implements
MQAdminExt, MQAdminExtInner {
} catch (MQClientException e) {
if (ResponseCode.CONSUMER_NOT_ONLINE ==
e.getResponseCode()) {
mt.setTrackType(TrackType.NOT_ONLINE);
+ mt.setExceptionDesc("CODE:" + e.getResponseCode()
+ " DESC:" + e.getErrorMessage());
+ }
+ if (ResponseCode.BROADCAST_CONSUMPTION ==
e.getResponseCode()) {
+ mt.setTrackType(TrackType.CONSUME_BROADCASTING);
}
- mt.setExceptionDesc("CODE:" + e.getResponseCode() + "
DESC:" + e.getErrorMessage());
result.add(mt);
continue;
} catch (MQBrokerException e) {
if (ResponseCode.CONSUMER_NOT_ONLINE ==
e.getResponseCode()) {
mt.setTrackType(TrackType.NOT_ONLINE);
+ mt.setExceptionDesc("CODE:" + e.getResponseCode()
+ " DESC:" + e.getErrorMessage());
+ }
+ if (ResponseCode.BROADCAST_CONSUMPTION ==
e.getResponseCode()) {
+ mt.setTrackType(TrackType.CONSUME_BROADCASTING);
}
- mt.setExceptionDesc("CODE:" + e.getResponseCode() + "
DESC:" + e.getErrorMessage());
result.add(mt);
continue;
} catch (Exception e) {
diff --git
a/tools/src/main/java/org/apache/rocketmq/tools/admin/api/TrackType.java
b/tools/src/main/java/org/apache/rocketmq/tools/admin/api/TrackType.java
index da2b48f4e..7d4445a47 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/api/TrackType.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/api/TrackType.java
@@ -23,5 +23,6 @@ public enum TrackType {
PULL,
NOT_CONSUME_YET,
NOT_ONLINE,
+ CONSUME_BROADCASTING,
UNKNOWN
}
diff --git
a/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByIdSubCommand.java
b/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByIdSubCommand.java
index 004cc4321..83b16d746 100644
---
a/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByIdSubCommand.java
+++
b/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByIdSubCommand.java
@@ -153,7 +153,7 @@ public class QueryMsgByIdSubCommand implements SubCommand {
} else {
System.out.printf("%n%n");
for (MessageTrack mt : mtdList) {
- System.out.printf("%s", mt);
+ System.out.printf("%s%n", mt);
}
}
} catch (Exception e) {
diff --git
a/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java
b/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java
index c604f0c3a..cfbcc01fa 100644
---
a/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java
+++
b/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java
@@ -44,6 +44,7 @@ import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.namesrv.NamesrvUtil;
+import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.protocol.body.Connection;
import org.apache.rocketmq.common.protocol.body.ConsumeStatsList;
@@ -75,6 +76,7 @@ import
org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.protocol.LanguageCode;
import org.apache.rocketmq.tools.admin.api.MessageTrack;
+import org.apache.rocketmq.tools.admin.api.TrackType;
import org.assertj.core.util.Maps;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@@ -88,6 +90,7 @@ import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.isNull;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -318,6 +321,29 @@ public class DefaultMQAdminExtTest {
public void testExamineConsumeStats() throws InterruptedException,
RemotingException, MQClientException, MQBrokerException {
ConsumeStats consumeStats =
defaultMQAdminExt.examineConsumeStats("default-consumer-group", "unit-test");
assertThat(consumeStats.getConsumeTps()).isGreaterThanOrEqualTo(1234);
+ ConsumerConnection connection = new ConsumerConnection();
+ connection.setMessageModel(MessageModel.BROADCASTING);
+ HashSet<Connection> connections = new HashSet<>();
+ connections.add(new Connection());
+ connection.setConnectionSet(connections);
+ when(mQClientAPIImpl.getConsumeStats(anyString(), anyString(),
anyString(), anyLong()))
+ .thenReturn(new ConsumeStats());
+ when(mQClientAPIImpl.getConsumerConnectionList(anyString(),
anyString(), anyLong()))
+ .thenReturn(new ConsumerConnection()).thenReturn(connection);
+ // CONSUMER_NOT_ONLINE
+ try {
+ defaultMQAdminExt.examineConsumeStats("default-consumer-group",
"unit-test");
+ } catch (Exception e) {
+ assertThat(e instanceof MQClientException).isTrue();
+ assertThat(((MQClientException)
e).getResponseCode()).isEqualTo(ResponseCode.CONSUMER_NOT_ONLINE);
+ }
+ // BROADCAST_CONSUMPTION
+ try {
+ defaultMQAdminExt.examineConsumeStats("default-consumer-group",
"unit-test");
+ } catch (Exception e) {
+ assertThat(e instanceof MQClientException).isTrue();
+ assertThat(((MQClientException)
e).getResponseCode()).isEqualTo(ResponseCode.BROADCAST_CONSUMPTION);
+ }
}
@Test
@@ -426,6 +452,19 @@ public class DefaultMQAdminExtTest {
messageExt.setTopic("unit-test");
List<MessageTrack> messageTrackList =
defaultMQAdminExt.messageTrackDetail(messageExt);
assertThat(messageTrackList.size()).isEqualTo(2);
+
+ ConsumerConnection connection = new ConsumerConnection();
+ connection.setMessageModel(MessageModel.BROADCASTING);
+ connection.setConsumeType(ConsumeType.CONSUME_PASSIVELY);
+ HashSet<Connection> connections = new HashSet<>();
+ connections.add(new Connection());
+ connection.setConnectionSet(connections);
+ when(mQClientAPIImpl.getConsumerConnectionList(anyString(),
anyString(), anyLong())).thenReturn(connection);
+ ConsumeStats consumeStats = new ConsumeStats();
+ when(mQClientAPIImpl.getConsumeStats(anyString(), anyString(),
isNull(), anyLong())).thenReturn(consumeStats);
+ List<MessageTrack> broadcastMessageTracks =
defaultMQAdminExt.messageTrackDetail(messageExt);
+ assertThat(broadcastMessageTracks.size()).isEqualTo(2);
+
assertThat(broadcastMessageTracks.get(0).getTrackType()).isEqualTo(TrackType.CONSUME_BROADCASTING);
}
@Test