lizhimins commented on code in PR #10198:
URL: https://github.com/apache/rocketmq/pull/10198#discussion_r2985574647


##########
remoting/src/main/java/org/apache/rocketmq/remoting/netty/RemotingCodeDistributionHandler.java:
##########
@@ -30,46 +30,95 @@
 @ChannelHandler.Sharable
 public class RemotingCodeDistributionHandler extends ChannelDuplexHandler {
 
-    private final ConcurrentMap<Integer, LongAdder> inboundDistribution;
-    private final ConcurrentMap<Integer, LongAdder> outboundDistribution;
+    private final ConcurrentMap<Integer, TrafficStats> inboundStats;
+    private final ConcurrentMap<Integer, TrafficStats> outboundStats;
+    private final NettyServerConfig nettyServerConfig;
 
-    public RemotingCodeDistributionHandler() {
-        inboundDistribution = new ConcurrentHashMap<>();
-        outboundDistribution = new ConcurrentHashMap<>();
+    public RemotingCodeDistributionHandler(NettyServerConfig 
nettyServerConfig) {
+        this.inboundStats = new ConcurrentHashMap<>();
+        this.outboundStats = new ConcurrentHashMap<>();
+        this.nettyServerConfig = nettyServerConfig;
     }
 
-    private void countInbound(int requestCode) {
-        LongAdder item = inboundDistribution.computeIfAbsent(requestCode, k -> 
new LongAdder());
-        item.increment();
+    void recordInbound(RemotingCommand cmd) {
+        TrafficStats stats = inboundStats.computeIfAbsent(cmd.getCode(), k -> 
new TrafficStats());
+        stats.count.increment();
+        stats.trafficSize.add(calcCommandSize(cmd));
     }
 
-    private void countOutbound(int responseCode) {
-        LongAdder item = outboundDistribution.computeIfAbsent(responseCode, k 
-> new LongAdder());
-        item.increment();
+    void recordOutbound(RemotingCommand cmd) {
+        TrafficStats stats = outboundStats.computeIfAbsent(cmd.getCode(), k -> 
new TrafficStats());
+        stats.count.increment();
+        stats.trafficSize.add(calcCommandSize(cmd));
+    }
+
+    /**
+     * Protocol fixed overhead in bytes:
+     * <pre>
+     * frameHeader:  totalLen(4) + headerLenMark(4) = 8
+     * fixedHeader:  code(2) + language(1) + version(2) + opaque(4) + flag(4)
+     *            + remarkLenPrefix(4) + extFieldsLenPrefix(4) = 21
+     * </pre>
+     */
+    static final int FIXED_OVERHEAD = 4 + 4 + 2 + 1 + 2 + 4 + 4 + 4 + 4;
+
+    private int calcCommandSize(RemotingCommand cmd) {

Review Comment:
   Use the length of bytebuf directly, no need recalculate the length after 
deserialization in handler



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to