This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new b511e0977 [ISSUE #5095] [Remoting-A] Support logging rpc distribution in remoting protocol (#5114)
b511e0977 is described below

commit b511e097720bc0e5a05ccd7bac10d488918e3b6d
Author: lizhimins <[email protected]>
AuthorDate: Mon Sep 19 17:07:49 2022 +0800

    [ISSUE #5095] [Remoting-A] Support logging rpc distribution in remoting protocol (#5114)
    
    * [ISSUE #5087] [Remoting-A] Support logging rpc distribution in remoting protocol
    
    * [ISSUE #5087] [Remoting-A] Support logging rpc distribution in remoting protocol
    
    * [ISSUE #5087] [Remoting-A] Support logging rpc distribution in remoting protocol
    
    * [ISSUE #5087] [Remoting-A] Support logging rpc distribution in remoting protocol
    
    * [ISSUE #5087] [Remoting-A] Support logging rpc distribution in remoting protocol
    
    Co-authored-by: 斜阳 <[email protected]>
---
 distribution/conf/logback_broker.xml               |  26 ++++++
 distribution/conf/logback_namesrv.xml              |  26 ++++++
 remoting/pom.xml                                   |   5 +-
 .../rocketmq/remoting/common/RemotingHelper.java   |   1 +
 .../remoting/netty/NettyRemotingServer.java        |  38 +++++++-
 .../netty/RemotingCodeDistributionHandler.java     | 100 +++++++++++++++++++++
 .../netty/RemotingCodeDistributionHandlerTest.java |  80 +++++++++++++++++
 7 files changed, 274 insertions(+), 2 deletions(-)

diff --git a/distribution/conf/logback_broker.xml b/distribution/conf/logback_broker.xml
index 1186d7fb8..3daa0b2f2 100644
--- a/distribution/conf/logback_broker.xml
+++ b/distribution/conf/logback_broker.xml
@@ -119,6 +119,27 @@
         <appender-ref ref="RocketmqStoreAppender_inner"/>
     </appender>
 
+    <appender name="RocketmqTrafficAppender_inner"
+              class="ch.qos.logback.core.rolling.RollingFileAppender">
+        <file>${user.home}/logs/rocketmqlogs/broker_traffic.log</file>
+        <append>true</append>
+        <rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">
+            <fileNamePattern>${user.home}/logs/rocketmqlogs/otherdays/broker_traffic.%i.log.gz</fileNamePattern>
+            <minIndex>1</minIndex>
+            <maxIndex>10</maxIndex>
+        </rollingPolicy>
+        <triggeringPolicy class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
+            <maxFileSize>100MB</maxFileSize>
+        </triggeringPolicy>
+        <encoder>
+            <pattern>%d{yyy-MM-dd HH:mm:ss,GMT+8} %p %t - %m%n</pattern>
+            <charset class="java.nio.charset.Charset">UTF-8</charset>
+        </encoder>
+    </appender>
+    <appender name="RocketmqTrafficAppender" class="ch.qos.logback.classic.AsyncAppender">
+        <appender-ref ref="RocketmqTrafficAppender_inner"/>
+    </appender>
+
     <appender name="RocketmqRemotingAppender_inner"
               class="ch.qos.logback.core.rolling.RollingFileAppender">
         <file>${user.home}/logs/rocketmqlogs/${brokerLogDir}/remoting.log</file>
@@ -359,6 +380,11 @@
         <appender-ref ref="RocketmqPopAppender" />
     </logger>
 
+    <logger name="RocketmqTraffic" additivity="false">
+        <level value="INFO" />
+        <appender-ref ref="RocketmqTrafficAppender" />
+    </logger>
+
     <root>
         <level value="INFO"/>
         <appender-ref ref="DefaultAppender"/>
diff --git a/distribution/conf/logback_namesrv.xml b/distribution/conf/logback_namesrv.xml
index b0f5eca55..f8e0c59ac 100644
--- a/distribution/conf/logback_namesrv.xml
+++ b/distribution/conf/logback_namesrv.xml
@@ -59,6 +59,27 @@
         <discardingThreshold>0</discardingThreshold>
     </appender>
 
+    <appender name="RocketmqTrafficAppender_inner"
+              class="ch.qos.logback.core.rolling.RollingFileAppender">
+        <file>${user.home}/logs/rocketmqlogs/namesrv_traffic.log</file>
+        <append>true</append>
+        <rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">
+            <fileNamePattern>${user.home}/logs/rocketmqlogs/otherdays/namesrv_traffic.%i.log.gz</fileNamePattern>
+            <minIndex>1</minIndex>
+            <maxIndex>10</maxIndex>
+        </rollingPolicy>
+        <triggeringPolicy class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
+            <maxFileSize>100MB</maxFileSize>
+        </triggeringPolicy>
+        <encoder>
+            <pattern>%d{yyy-MM-dd HH:mm:ss,GMT+8} %p %t - %m%n</pattern>
+            <charset class="java.nio.charset.Charset">UTF-8</charset>
+        </encoder>
+    </appender>
+    <appender name="RocketmqTrafficAppender" class="ch.qos.logback.classic.AsyncAppender">
+        <appender-ref ref="RocketmqTrafficAppender_inner"/>
+    </appender>
+
     <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
         <append>true</append>
         <encoder>
@@ -92,6 +113,11 @@
         <appender-ref ref="RocketmqNamesrvAppender"/>
     </logger>
 
+    <logger name="RocketmqTraffic" additivity="false">
+        <level value="INFO" />
+        <appender-ref ref="RocketmqTrafficAppender" />
+    </logger>
+
     <root>
         <level value="INFO"/>
         <appender-ref ref="DefaultAppender"/>
diff --git a/remoting/pom.xml b/remoting/pom.xml
index f567d84ea..84595ac7a 100644
--- a/remoting/pom.xml
+++ b/remoting/pom.xml
@@ -44,7 +44,10 @@
             <groupId>${project.groupId}</groupId>
             <artifactId>rocketmq-logging</artifactId>
         </dependency>
-
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>
+        </dependency>
         <dependency>
             <groupId>com.google.code.gson</groupId>
             <artifactId>gson</artifactId>
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingHelper.java b/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingHelper.java
index 4c8a62a44..b8b180611 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingHelper.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingHelper.java
@@ -34,6 +34,7 @@ import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 
 public class RemotingHelper {
+    public static final String ROCKETMQ_TRAFFIC = "RocketmqTraffic";
     public static final String ROCKETMQ_REMOTING = "RocketmqRemoting";
     public static final String DEFAULT_CHARSET = "UTF-8";
 
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
index e9d5c0dc2..a80434545 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
@@ -49,7 +49,10 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.rocketmq.logging.InternalLogger;
@@ -69,12 +72,16 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 @SuppressWarnings("NullableProblems")
 public class NettyRemotingServer extends NettyRemotingAbstract implements RemotingServer {
     private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
+    private static final InternalLogger TRAFFIC_LOGGER =
+        InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_TRAFFIC);
+
     private final ServerBootstrap serverBootstrap;
     private final EventLoopGroup eventLoopGroupSelector;
     private final EventLoopGroup eventLoopGroupBoss;
     private final NettyServerConfig nettyServerConfig;
 
     private final ExecutorService publicExecutor;
+    private final ScheduledExecutorService scheduledExecutorService;
     private final ChannelEventListener channelEventListener;
 
     private final Timer timer = new Timer("ServerHouseKeepingService", true);
@@ -95,6 +102,7 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
     private NettyEncoder encoder;
     private NettyConnectManageHandler connectionManageHandler;
     private NettyServerHandler serverHandler;
+    private RemotingCodeDistributionHandler distributionHandler;
 
     public NettyRemotingServer(final NettyServerConfig nettyServerConfig) {
         this(nettyServerConfig, null);
@@ -108,9 +116,9 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
         this.channelEventListener = channelEventListener;
 
         this.publicExecutor = buildPublicExecutor(nettyServerConfig);
+        this.scheduledExecutorService = buildScheduleExecutor();
 
         this.eventLoopGroupBoss = buildBossEventLoopGroup();
-
         this.eventLoopGroupSelector = buildEventLoopGroupSelector();
 
         loadSslContext();
@@ -178,6 +186,15 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
         });
     }
 
+    private ScheduledExecutorService buildScheduleExecutor() {
+        return new ScheduledThreadPoolExecutor(1,
+            r -> {
+                Thread thread = new Thread(r, "NettyServerScheduler");
+                thread.setDaemon(true);
+                return thread;
+            }, new ThreadPoolExecutor.DiscardOldestPolicy());
+    }
+
     public void loadSslContext() {
         TlsMode tlsMode = TlsSystemConfig.tlsMode;
         log.info("Server is running in TLS {} mode", tlsMode.getName());
@@ -230,6 +247,7 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
                         .addLast(defaultEventExecutorGroup,
                             encoder,
                             new NettyDecoder(),
+                            distributionHandler,
                             new IdleStateHandler(0, 0,
                                 nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
                             connectionManageHandler,
@@ -269,6 +287,14 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
                 }
             }
         }, 1000 * 3, 1000);
+
+        scheduledExecutorService.scheduleWithFixedDelay(() -> {
+            try {
+                NettyRemotingServer.this.printRemotingCodeDistribution();
+            } catch (Throwable e) {
+                TRAFFIC_LOGGER.error("NettyRemotingServer print remoting code distribution exception", e);
+            }
+        }, 1, 1, TimeUnit.SECONDS);
     }
 
     private void addCustomConfig(ServerBootstrap childHandler) {
@@ -400,6 +426,16 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
         encoder = new NettyEncoder();
         connectionManageHandler = new NettyConnectManageHandler();
         serverHandler = new NettyServerHandler();
+        distributionHandler = new RemotingCodeDistributionHandler();
+    }
+
+    private void printRemotingCodeDistribution() {
+        if (distributionHandler != null) {
+            TRAFFIC_LOGGER.info("Port: {}, RequestCode Distribution: {}",
+                nettyServerConfig.getListenPort(), distributionHandler.getInBoundSnapshotString());
+            TRAFFIC_LOGGER.info("Port: {}, ResponseCode Distribution: {}",
+                nettyServerConfig.getListenPort(), distributionHandler.getOutBoundSnapshotString());
+        }
     }
 
     @ChannelHandler.Sharable
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/RemotingCodeDistributionHandler.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/RemotingCodeDistributionHandler.java
new file mode 100644
index 000000000..598628b85
--- /dev/null
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/RemotingCodeDistributionHandler.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.remoting.netty;
+
+import io.netty.channel.ChannelDuplexHandler;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPromise;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.LongAdder;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+
+@ChannelHandler.Sharable
+public class RemotingCodeDistributionHandler extends ChannelDuplexHandler {
+
+    private final ConcurrentMap<Integer, LongAdder> inboundDistribution;
+    private final ConcurrentMap<Integer, LongAdder> outboundDistribution;
+
+    public RemotingCodeDistributionHandler() {
+        inboundDistribution = new ConcurrentHashMap<>();
+        outboundDistribution = new ConcurrentHashMap<>();
+    }
+
+    private void countInbound(int requestCode) {
+        LongAdder item = inboundDistribution.computeIfAbsent(requestCode, k -> new LongAdder());
+        item.increment();
+    }
+
+    private void countOutbound(int responseCode) {
+        LongAdder item = outboundDistribution.computeIfAbsent(responseCode, k -> new LongAdder());
+        item.increment();
+    }
+
+    @Override
+    public void channelRead(ChannelHandlerContext ctx, Object msg) {
+        if (msg instanceof RemotingCommand) {
+            RemotingCommand cmd = (RemotingCommand) msg;
+            countInbound(cmd.getCode());
+        }
+        ctx.fireChannelRead(msg);
+    }
+
+    @Override
+    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
+        if (msg instanceof RemotingCommand) {
+            RemotingCommand cmd = (RemotingCommand) msg;
+            countOutbound(cmd.getCode());
+        }
+        ctx.write(msg, promise);
+    }
+
+    private Map<Integer, Long> getDistributionSnapshot(Map<Integer, LongAdder> countMap) {
+        Map<Integer, Long> map = new HashMap<>(countMap.size());
+        for (Map.Entry<Integer, LongAdder> entry : countMap.entrySet()) {
+            map.put(entry.getKey(), entry.getValue().sumThenReset());
+        }
+        return map;
+    }
+
+    private String snapshotToString(Map<Integer, Long> distribution) {
+        StringBuilder sb = new StringBuilder("{");
+        if (null != distribution && !distribution.isEmpty()) {
+            boolean first = true;
+            for (Map.Entry<Integer, Long> entry : distribution.entrySet()) {
+                if (0L == entry.getValue()) {
+                    continue;
+                }
+                sb.append(first ? "" : ", ").append(entry.getKey()).append(":").append(entry.getValue());
+                first = false;
+            }
+        }
+        sb.append("}");
+        return sb.toString();
+    }
+
+    public String getInBoundSnapshotString() {
+        return this.snapshotToString(this.getDistributionSnapshot(this.inboundDistribution));
+    }
+
+    public String getOutBoundSnapshotString() {
+        return this.snapshotToString(this.getDistributionSnapshot(this.outboundDistribution));
+    }
+}
diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/netty/RemotingCodeDistributionHandlerTest.java b/remoting/src/test/java/org/apache/rocketmq/remoting/netty/RemotingCodeDistributionHandlerTest.java
new file mode 100644
index 000000000..ee6f3f6c2
--- /dev/null
+++ b/remoting/src/test/java/org/apache/rocketmq/remoting/netty/RemotingCodeDistributionHandlerTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.remoting.netty;
+
+import java.lang.reflect.Method;
+import java.time.Duration;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.awaitility.Awaitility.await;
+
+public class RemotingCodeDistributionHandlerTest {
+
+    private final RemotingCodeDistributionHandler distributionHandler = new RemotingCodeDistributionHandler();
+
+    @Test
+    public void remotingCodeCountTest() throws Exception {
+        Class<RemotingCodeDistributionHandler> clazz = RemotingCodeDistributionHandler.class;
+        Method methodIn = clazz.getDeclaredMethod("countInbound", int.class);
+        Method methodOut = clazz.getDeclaredMethod("countOutbound", int.class);
+        methodIn.setAccessible(true);
+        methodOut.setAccessible(true);
+
+        int threadCount = 4;
+        int count = 1000 * 1000;
+        CountDownLatch latch = new CountDownLatch(threadCount);
+        AtomicBoolean result = new AtomicBoolean(true);
+        ExecutorService executorService = Executors.newFixedThreadPool(threadCount, new ThreadFactory() {
+            private final AtomicInteger threadIndex = new AtomicInteger(0);
+
+            @Override
+            public Thread newThread(Runnable r) {
+                return new Thread(r, "RemotingCodeTest_" + this.threadIndex.incrementAndGet());
+            }
+        });
+
+        for (int i = 0; i < threadCount; i++) {
+            executorService.submit(() -> {
+                try {
+                    for (int j = 0; j < count; j++) {
+                        methodIn.invoke(distributionHandler, 1);
+                        methodOut.invoke(distributionHandler, 2);
+                    }
+                } catch (Exception e) {
+                    result.set(false);
+                } finally {
+                    latch.countDown();
+                }
+            });
+        }
+
+        latch.await();
+        Assert.assertTrue(result.get());
+        await().pollInterval(Duration.ofMillis(100)).atMost(Duration.ofSeconds(10)).until(() -> {
+            boolean f1 = ("{1:" + count * threadCount + "}").equals(distributionHandler.getInBoundSnapshotString());
+            boolean f2 = ("{2:" + count * threadCount + "}").equals(distributionHandler.getOutBoundSnapshotString());
+            return f1 && f2;
+        });
+    }
+}
\ No newline at end of file

Reply via email to