This is an automated email from the ASF dual-hosted git repository.
lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new d5b474d9d3 [ISSUE #9297] Supports outputting topic put TPS in TopicStatusSubCommand (#9298)
d5b474d9d3 is described below
commit d5b474d9d326f380d736abe700f8a7a37fe364e2
Author: ymwneu <[email protected]>
AuthorDate: Wed Apr 2 18:59:45 2025 +0800
[ISSUE #9297] Supports outputting topic put TPS in TopicStatusSubCommand (#9298)
---
.../apache/rocketmq/broker/processor/AdminBrokerProcessor.java | 1 +
.../rocketmq/remoting/protocol/admin/TopicStatsTable.java | 10 ++++++++++
.../org/apache/rocketmq/store/stats/BrokerStatsManager.java | 4 ++++
.../org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java | 1 +
.../rocketmq/tools/command/topic/TopicStatusSubCommand.java | 2 ++
5 files changed, 18 insertions(+)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index 4ff4bed814..4200f34bde 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -1837,6 +1837,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
topicStatsTable.getOffsetTable().put(mq, topicOffset);
}
+ topicStatsTable.setTopicPutTps(this.brokerController.getBrokerStatsManager().tpsTopicPutNums(requestHeader.getTopic()));
byte[] body = topicStatsTable.encode();
response.setBody(body);
response.setCode(ResponseCode.SUCCESS);
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/admin/TopicStatsTable.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/admin/TopicStatsTable.java
index 9f467e7449..5cb2af6373 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/admin/TopicStatsTable.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/admin/TopicStatsTable.java
@@ -22,6 +22,8 @@ import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
public class TopicStatsTable extends RemotingSerializable {
+ private double topicPutTps;
+
private Map<MessageQueue, TopicOffset> offsetTable = new ConcurrentHashMap<>();
public Map<MessageQueue, TopicOffset> getOffsetTable() {
@@ -31,4 +33,12 @@ public class TopicStatsTable extends RemotingSerializable {
public void setOffsetTable(Map<MessageQueue, TopicOffset> offsetTable) {
this.offsetTable = offsetTable;
}
+
+ public double getTopicPutTps() {
+ return topicPutTps;
+ }
+
+ public void setTopicPutTps(double topicPutTps) {
+ this.topicPutTps = topicPutTps;
+ }
}
diff --git a/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java b/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java
index 530339c23b..c272a30234 100644
--- a/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java
+++ b/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java
@@ -595,6 +595,10 @@ public class BrokerStatsManager {
this.statsTable.get(Stats.SNDBCK_PUT_NUMS).addValue(statsKey, 1, 1);
}
+ public double tpsTopicPutNums(final String topic) {
+ return this.statsTable.get(TOPIC_PUT_NUMS).getStatsDataInMinute(topic).getTps();
+ }
+
public double tpsGroupGetNums(final String group, final String topic) {
final String statsKey = buildStatsKey(topic, group);
return this.statsTable.get(Stats.GROUP_GET_NUMS).getStatsDataInMinute(statsKey).getTps();
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
index 9afd37f784..e6405cb2d9 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
@@ -367,6 +367,7 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
if (addr != null) {
TopicStatsTable tst = mqClientInstance.getMQClientAPIImpl().getTopicStatsInfo(addr, topic, timeoutMillis);
topicStatsTable.getOffsetTable().putAll(tst.getOffsetTable());
+ topicStatsTable.setTopicPutTps(topicStatsTable.getTopicPutTps() + tst.getTopicPutTps());
}
} catch (Exception e) {
logger.error("getTopicStatsInfo error. topic={}", topic, e);
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/TopicStatusSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/TopicStatusSubCommand.java
index 47ca761d1f..ff91399f1c 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/TopicStatusSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/TopicStatusSubCommand.java
@@ -113,6 +113,8 @@ public class TopicStatusSubCommand implements SubCommand {
humanTimestamp
);
}
+ System.out.printf("%n");
+ System.out.printf("Topic Put TPS: %s%n", topicStatsTable.getTopicPutTps());
} catch (Exception e) {
throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
} finally {