vongosling closed pull request #106: [ROCKETMQ-202] Using the native transport
URL: https://github.com/apache/rocketmq/pull/106
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request comes from a fork, GitHub does not display its
diff after the merge, so the full diff is reproduced below:

diff --git a/pom.xml b/pom.xml
index 75dbf5bd..8cfdb358 100644
--- a/pom.xml
+++ b/pom.xml
@@ -158,6 +158,8 @@
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
         
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
 
+        <netty.version>4.0.36.Final</netty.version>
+
         <!-- Maven properties -->
         <maven.test.skip>false</maven.test.skip>
         <maven.javadoc.skip>true</maven.javadoc.skip>
@@ -596,7 +598,7 @@
             <dependency>
                 <groupId>io.netty</groupId>
                 <artifactId>netty-all</artifactId>
-                <version>4.0.36.Final</version>
+                <version>${netty.version}</version>
             </dependency>
             <dependency>
                 <groupId>com.alibaba</groupId>
diff --git a/remoting/pom.xml b/remoting/pom.xml
index 15523412..226b5f1f 100644
--- a/remoting/pom.xml
+++ b/remoting/pom.xml
@@ -41,6 +41,7 @@
             <groupId>io.netty</groupId>
             <artifactId>netty-all</artifactId>
         </dependency>
+
         <dependency>
             <groupId>org.slf4j</groupId>
             <artifactId>slf4j-api</artifactId>
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
index 52ca47e6..06ab014d 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
@@ -26,6 +26,9 @@
 import io.netty.channel.ChannelPromise;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.channel.epoll.Epoll;
+import io.netty.channel.epoll.EpollEventLoopGroup;
+import io.netty.channel.epoll.EpollSocketChannel;
 import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.channel.socket.nio.NioSocketChannel;
@@ -111,14 +114,21 @@ public Thread newThread(Runnable r) {
             }
         });
 
-        this.eventLoopGroupWorker = new NioEventLoopGroup(1, new 
ThreadFactory() {
+        ThreadFactory threadFactory = new ThreadFactory() {
             private AtomicInteger threadIndex = new AtomicInteger(0);
-
             @Override
             public Thread newThread(Runnable r) {
                 return new Thread(r, String.format("NettyClientSelector_%d", 
this.threadIndex.incrementAndGet()));
             }
-        });
+        };
+
+        if (Epoll.isAvailable()) {
+            log.info("Use Epoll");
+            this.eventLoopGroupWorker = new EpollEventLoopGroup(1, 
threadFactory);
+        } else {
+            log.info("Use Java NIO");
+            this.eventLoopGroupWorker = new NioEventLoopGroup(1, 
threadFactory);
+        }
     }
 
     private static int initValueIndex() {
@@ -141,7 +151,8 @@ public Thread newThread(Runnable r) {
                 }
             });
 
-        Bootstrap handler = 
this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)//
+        Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker)
+            .channel(Epoll.isAvailable() ? EpollSocketChannel.class : 
NioSocketChannel.class)//
             .option(ChannelOption.TCP_NODELAY, true)
             .option(ChannelOption.SO_KEEPALIVE, false)
             .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 
nettyClientConfig.getConnectTimeoutMillis())
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
index a9a55aba..6c811517 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
@@ -98,35 +98,36 @@ public Thread newThread(Runnable r) {
             }
         });
 
-        this.eventLoopGroupBoss = new NioEventLoopGroup(1, new ThreadFactory() 
{
+        ThreadFactory bossThreadFactory = new ThreadFactory() {
             private AtomicInteger threadIndex = new AtomicInteger(0);
 
             @Override
             public Thread newThread(Runnable r) {
                 return new Thread(r, String.format("NettyBoss_%d", 
this.threadIndex.incrementAndGet()));
             }
-        });
+        };
 
-        if (useEpoll()) {
-            this.eventLoopGroupSelector = new 
EpollEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new 
ThreadFactory() {
-                private AtomicInteger threadIndex = new AtomicInteger(0);
-                private int threadTotal = 
nettyServerConfig.getServerSelectorThreads();
+        ThreadFactory selectorThreadFactory = new ThreadFactory() {
+            private AtomicInteger threadIndex = new AtomicInteger(0);
+            private int threadTotal = 
nettyServerConfig.getServerSelectorThreads();
 
-                @Override
-                public Thread newThread(Runnable r) {
-                    return new Thread(r, 
String.format("NettyServerEPOLLSelector_%d_%d", threadTotal, 
this.threadIndex.incrementAndGet()));
-                }
-            });
-        } else {
-            this.eventLoopGroupSelector = new 
NioEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new 
ThreadFactory() {
-                private AtomicInteger threadIndex = new AtomicInteger(0);
-                private int threadTotal = 
nettyServerConfig.getServerSelectorThreads();
+            @Override
+            public Thread newThread(Runnable r) {
+                return new Thread(r, 
String.format("NettyServerEPOLLSelector_%d_%d", threadTotal,
+                    this.threadIndex.incrementAndGet()));
+            }
+        };
 
-                @Override
-                public Thread newThread(Runnable r) {
-                    return new Thread(r, 
String.format("NettyServerNIOSelector_%d_%d", threadTotal, 
this.threadIndex.incrementAndGet()));
-                }
-            });
+        if (useEpoll()) {
+            log.info("Use Epoll");
+            this.eventLoopGroupBoss = new EpollEventLoopGroup(1, 
bossThreadFactory);
+            this.eventLoopGroupSelector = new 
EpollEventLoopGroup(nettyServerConfig.getServerSelectorThreads(),
+                selectorThreadFactory);
+        } else {
+            log.info("Use Java NIO");
+            this.eventLoopGroupBoss = new NioEventLoopGroup(1, 
bossThreadFactory);
+            this.eventLoopGroupSelector = new 
NioEventLoopGroup(nettyServerConfig.getServerSelectorThreads(),
+                selectorThreadFactory);
         }
     }
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to