vongosling closed pull request #106: [ROCKETMQ-202] Using the native transport
URL: https://github.com/apache/rocketmq/pull/106
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/pom.xml b/pom.xml index 75dbf5bd..8cfdb358 100644 --- a/pom.xml +++ b/pom.xml @@ -158,6 +158,8 @@ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> + <netty.version>4.0.36.Final</netty.version> + <!-- Maven properties --> <maven.test.skip>false</maven.test.skip> <maven.javadoc.skip>true</maven.javadoc.skip> @@ -596,7 +598,7 @@ <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> - <version>4.0.36.Final</version> + <version>${netty.version}</version> </dependency> <dependency> <groupId>com.alibaba</groupId> diff --git a/remoting/pom.xml b/remoting/pom.xml index 15523412..226b5f1f 100644 --- a/remoting/pom.xml +++ b/remoting/pom.xml @@ -41,6 +41,7 @@ <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> </dependency> + <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java index 52ca47e6..06ab014d 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java @@ -26,6 +26,9 @@ import io.netty.channel.ChannelPromise; import io.netty.channel.EventLoopGroup; import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.channel.epoll.Epoll; +import io.netty.channel.epoll.EpollEventLoopGroup; +import io.netty.channel.epoll.EpollSocketChannel; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import 
io.netty.channel.socket.nio.NioSocketChannel; @@ -111,14 +114,21 @@ public Thread newThread(Runnable r) { } }); - this.eventLoopGroupWorker = new NioEventLoopGroup(1, new ThreadFactory() { + ThreadFactory threadFactory = new ThreadFactory() { private AtomicInteger threadIndex = new AtomicInteger(0); - @Override public Thread newThread(Runnable r) { return new Thread(r, String.format("NettyClientSelector_%d", this.threadIndex.incrementAndGet())); } - }); + }; + + if (Epoll.isAvailable()) { + log.info("Use Epoll"); + this.eventLoopGroupWorker = new EpollEventLoopGroup(1, threadFactory); + } else { + log.info("Use Java NIO"); + this.eventLoopGroupWorker = new NioEventLoopGroup(1, threadFactory); + } } private static int initValueIndex() { @@ -141,7 +151,8 @@ public Thread newThread(Runnable r) { } }); - Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)// + Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker) + .channel(Epoll.isAvailable() ? 
EpollSocketChannel.class : NioSocketChannel.class)// .option(ChannelOption.TCP_NODELAY, true) .option(ChannelOption.SO_KEEPALIVE, false) .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis()) diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java index a9a55aba..6c811517 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java @@ -98,35 +98,36 @@ public Thread newThread(Runnable r) { } }); - this.eventLoopGroupBoss = new NioEventLoopGroup(1, new ThreadFactory() { + ThreadFactory bossThreadFactory = new ThreadFactory() { private AtomicInteger threadIndex = new AtomicInteger(0); @Override public Thread newThread(Runnable r) { return new Thread(r, String.format("NettyBoss_%d", this.threadIndex.incrementAndGet())); } - }); + }; - if (useEpoll()) { - this.eventLoopGroupSelector = new EpollEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() { - private AtomicInteger threadIndex = new AtomicInteger(0); - private int threadTotal = nettyServerConfig.getServerSelectorThreads(); + ThreadFactory selectorThreadFactory = new ThreadFactory() { + private AtomicInteger threadIndex = new AtomicInteger(0); + private int threadTotal = nettyServerConfig.getServerSelectorThreads(); - @Override - public Thread newThread(Runnable r) { - return new Thread(r, String.format("NettyServerEPOLLSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet())); - } - }); - } else { - this.eventLoopGroupSelector = new NioEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() { - private AtomicInteger threadIndex = new AtomicInteger(0); - private int threadTotal = nettyServerConfig.getServerSelectorThreads(); + @Override + public Thread newThread(Runnable r) 
{ + return new Thread(r, String.format("NettyServerEPOLLSelector_%d_%d", threadTotal, + this.threadIndex.incrementAndGet())); + } + }; - @Override - public Thread newThread(Runnable r) { - return new Thread(r, String.format("NettyServerNIOSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet())); - } - }); + if (useEpoll()) { + log.info("Use Epoll"); + this.eventLoopGroupBoss = new EpollEventLoopGroup(1, bossThreadFactory); + this.eventLoopGroupSelector = new EpollEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), + selectorThreadFactory); + } else { + log.info("Use Java NIO"); + this.eventLoopGroupBoss = new NioEventLoopGroup(1, bossThreadFactory); + this.eventLoopGroupSelector = new NioEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), + selectorThreadFactory); } } ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services