This is an automated email from the ASF dual-hosted git repository. aleksey pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push: new a339aa9 Update Netty dependencies to latest, clean up SocketFactory a339aa9 is described below commit a339aa9e9811723e52896ec3c96395461cad0fd0 Author: Aleksey Yeshchenko <alek...@apache.org> AuthorDate: Tue Jul 2 10:10:19 2019 +0100 Update Netty dependencies to latest, clean up SocketFactory patch by Aleksey Yeschenko; reviewed by Benedict Elliott Smith for CASSANDRA-15195 --- CHANGES.txt | 1 + build.xml | 3 +- .../{netty-4.1.28.txt => netty-4.1.37.txt} | 0 ...{netty-4.1.28.txt => netty-tcnative-2.0.25.txt} | 1 - ...4.1.28.Final.jar => netty-all-4.1.37.Final.jar} | Bin 3839841 -> 4024948 bytes ...etty-tcnative-boringssl-static-2.0.25.Final.jar | Bin 0 -> 3108312 bytes .../cassandra/net/InboundConnectionInitiator.java | 2 + .../cassandra/net/OutboundConnectionInitiator.java | 16 +- .../org/apache/cassandra/net/SocketFactory.java | 230 +++++++++------------ .../org/apache/cassandra/security/SSLFactory.java | 14 +- .../cassandra/service/NativeTransportService.java | 2 +- 11 files changed, 126 insertions(+), 143 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 2faca24..4d3a9a9 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.0 + * Update Netty dependencies to latest, clean up SocketFactory (CASSANDRA-15195) * Native Transport - Apply noSpamLogger to ConnectionLimitHandler (CASSANDRA-15167) * Reduce heap pressure during compactions (CASSANDRA-14654) * Support building Cassandra with JDK 11 (CASSANDRA-15108) diff --git a/build.xml b/build.xml index bdf5ae2..acfc613 100644 --- a/build.xml +++ b/build.xml @@ -548,7 +548,8 @@ <dependency groupId="com.addthis.metrics" artifactId="reporter-config3" version="3.0.3" /> <dependency groupId="org.mindrot" artifactId="jbcrypt" version="0.3m" /> <dependency groupId="io.airlift" artifactId="airline" version="0.8" /> - <dependency groupId="io.netty" artifactId="netty-all" version="4.1.28.Final" /> + <dependency groupId="io.netty" artifactId="netty-all" version="4.1.37.Final" /> + <dependency groupId="io.netty" artifactId="netty-tcnative-boringssl-static" version="2.0.25.Final" /> <dependency groupId="net.openhft" artifactId="chronicle-queue" version="${chronicle-queue.version}"/> <dependency groupId="net.openhft" artifactId="chronicle-core" version="${chronicle-core.version}"/> <dependency groupId="net.openhft" artifactId="chronicle-bytes" version="${chronicle-bytes.version}"/> diff --git a/lib/licenses/netty-4.1.28.txt b/lib/licenses/netty-4.1.37.txt similarity index 100% copy from lib/licenses/netty-4.1.28.txt copy to lib/licenses/netty-4.1.37.txt diff --git a/lib/licenses/netty-4.1.28.txt b/lib/licenses/netty-tcnative-2.0.25.txt similarity index 99% rename from lib/licenses/netty-4.1.28.txt rename to lib/licenses/netty-tcnative-2.0.25.txt index d645695..261eeb9 100644 --- a/lib/licenses/netty-4.1.28.txt +++ b/lib/licenses/netty-tcnative-2.0.25.txt @@ -1,4 +1,3 @@ - Apache License Version 2.0, January 2004 http://www.apache.org/licenses/ diff --git a/lib/netty-all-4.1.28.Final.jar b/lib/netty-all-4.1.37.Final.jar similarity index 56% rename from lib/netty-all-4.1.28.Final.jar rename to lib/netty-all-4.1.37.Final.jar index 058662e..93cff04 100644 Binary files a/lib/netty-all-4.1.28.Final.jar and b/lib/netty-all-4.1.37.Final.jar differ diff --git a/lib/netty-tcnative-boringssl-static-2.0.25.Final.jar b/lib/netty-tcnative-boringssl-static-2.0.25.Final.jar new file mode 100644 index 0000000..954627f Binary files /dev/null and b/lib/netty-tcnative-boringssl-static-2.0.25.Final.jar differ diff --git a/src/java/org/apache/cassandra/net/InboundConnectionInitiator.java b/src/java/org/apache/cassandra/net/InboundConnectionInitiator.java index d26abfd..c390ba4 100644 --- a/src/java/org/apache/cassandra/net/InboundConnectionInitiator.java +++ b/src/java/org/apache/cassandra/net/InboundConnectionInitiator.java @@ -132,6 +132,8 @@ public class InboundConnectionInitiator ServerBootstrap bootstrap = initializer.settings.socketFactory .newServerBootstrap() .option(ChannelOption.SO_BACKLOG, 1 << 9) + .option(ChannelOption.ALLOCATOR, GlobalBufferPoolAllocator.instance) + .option(ChannelOption.SO_REUSEADDR, true) .childHandler(initializer); int socketReceiveBufferSizeInBytes = initializer.settings.socketReceiveBufferSizeInBytes; diff --git a/src/java/org/apache/cassandra/net/OutboundConnectionInitiator.java b/src/java/org/apache/cassandra/net/OutboundConnectionInitiator.java index a63ccf9..fdfb2df 100644 --- a/src/java/org/apache/cassandra/net/OutboundConnectionInitiator.java +++ b/src/java/org/apache/cassandra/net/OutboundConnectionInitiator.java @@ -171,13 +171,15 @@ public class OutboundConnectionInitiator<SuccessType extends OutboundConnectionI */ private Bootstrap createBootstrap(EventLoop eventLoop) { - Bootstrap bootstrap = newBootstrap(eventLoop, settings.tcpUserTimeoutInMS) - .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, settings.tcpConnectTimeoutInMS) - .option(ChannelOption.SO_KEEPALIVE, true) - .option(ChannelOption.SO_REUSEADDR, true) - .option(ChannelOption.TCP_NODELAY, settings.tcpNoDelay) - .option(ChannelOption.MESSAGE_SIZE_ESTIMATOR, NoSizeEstimator.instance) - .handler(new Initializer()); + Bootstrap bootstrap = settings.socketFactory + .newClientBootstrap(eventLoop, settings.tcpUserTimeoutInMS) + .option(ChannelOption.ALLOCATOR, GlobalBufferPoolAllocator.instance) + .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, settings.tcpConnectTimeoutInMS) + .option(ChannelOption.SO_KEEPALIVE, true) + .option(ChannelOption.SO_REUSEADDR, true) + .option(ChannelOption.TCP_NODELAY, settings.tcpNoDelay) + .option(ChannelOption.MESSAGE_SIZE_ESTIMATOR, NoSizeEstimator.instance) + .handler(new Initializer()); if (settings.socketSendBufferSizeInBytes > 0) bootstrap.option(ChannelOption.SO_SNDBUF, settings.socketSendBufferSizeInBytes); diff --git a/src/java/org/apache/cassandra/net/SocketFactory.java b/src/java/org/apache/cassandra/net/SocketFactory.java index 18bb0d5..062c44b 100644 --- a/src/java/org/apache/cassandra/net/SocketFactory.java +++ b/src/java/org/apache/cassandra/net/SocketFactory.java @@ -18,13 +18,14 @@ package org.apache.cassandra.net; import java.io.IOException; -import java.lang.reflect.Field; import java.net.ConnectException; import java.net.InetSocketAddress; import java.nio.channels.ClosedChannelException; +import java.nio.channels.spi.SelectorProvider; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeoutException; import javax.annotation.Nullable; import javax.net.ssl.SSLEngine; @@ -37,11 +38,11 @@ import org.slf4j.LoggerFactory; import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.Channel; -import io.netty.channel.ChannelOption; +import io.netty.channel.ChannelFactory; +import io.netty.channel.DefaultSelectStrategyFactory; import io.netty.channel.EventLoop; import io.netty.channel.EventLoopGroup; -import io.netty.channel.MultithreadEventLoopGroup; -import io.netty.channel.SingleThreadEventLoop; +import io.netty.channel.ServerChannel; import io.netty.channel.epoll.EpollChannelOption; import io.netty.channel.epoll.EpollEventLoopGroup; import io.netty.channel.epoll.EpollServerSocketChannel; @@ -53,10 +54,10 @@ import io.netty.channel.unix.Errors; import io.netty.handler.ssl.OpenSsl; import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslHandler; +import io.netty.util.concurrent.DefaultEventExecutorChooserFactory; import io.netty.util.concurrent.DefaultThreadFactory; -import io.netty.util.concurrent.EventExecutor; -import io.netty.util.concurrent.MultithreadEventExecutorGroup; -import io.netty.util.concurrent.SingleThreadEventExecutor; +import io.netty.util.concurrent.RejectedExecutionHandlers; +import io.netty.util.concurrent.ThreadPerTaskExecutor; import io.netty.util.internal.logging.InternalLoggerFactory; import io.netty.util.internal.logging.Slf4JLoggerFactory; import org.apache.cassandra.concurrent.NamedThreadFactory; @@ -82,8 +83,85 @@ public final class SocketFactory private static final int EVENT_THREADS = Integer.getInteger(Config.PROPERTY_PREFIX + "internode-event-threads", FBUtilities.getAvailableProcessors()); - public enum Provider { EPOLL, NIO } - private static final Provider DEFAULT_PROVIDER = NativeTransportService.useEpoll() ? Provider.EPOLL : Provider.NIO; + /** + * The default task queue used by {@code NioEventLoop} and {@code EpollEventLoop} is {@code MpscUnboundedArrayQueue}, + * provided by JCTools. While efficient, it has an undesirable quality for a queue backing an event loop: it is + * not non-blocking, and can cause the event loop to busy-spin while waiting for a partially completed task + * offer, if the producer thread has been suspended mid-offer. + * + * As it happens, however, we have an MPSC queue implementation that is perfectly fit for this purpose - + * {@link ManyToOneConcurrentLinkedQueue}, that is non-blocking, and already used throughout the codebase, + * that we can and do use here as well. + */ + enum Provider + { + NIO + { + @Override + NioEventLoopGroup makeEventLoopGroup(int threadCount, ThreadFactory threadFactory) + { + return new NioEventLoopGroup(threadCount, + new ThreadPerTaskExecutor(threadFactory), + DefaultEventExecutorChooserFactory.INSTANCE, + SelectorProvider.provider(), + DefaultSelectStrategyFactory.INSTANCE, + RejectedExecutionHandlers.reject(), + capacity -> new ManyToOneConcurrentLinkedQueue<>()); + } + + @Override + ChannelFactory<NioSocketChannel> clientChannelFactory() + { + return NioSocketChannel::new; + } + + @Override + ChannelFactory<NioServerSocketChannel> serverChannelFactory() + { + return NioServerSocketChannel::new; + } + }, + EPOLL + { + @Override + EpollEventLoopGroup makeEventLoopGroup(int threadCount, ThreadFactory threadFactory) + { + return new EpollEventLoopGroup(threadCount, + new ThreadPerTaskExecutor(threadFactory), + DefaultEventExecutorChooserFactory.INSTANCE, + DefaultSelectStrategyFactory.INSTANCE, + RejectedExecutionHandlers.reject(), + capacity -> new ManyToOneConcurrentLinkedQueue<>()); + } + + @Override + ChannelFactory<EpollSocketChannel> clientChannelFactory() + { + return EpollSocketChannel::new; + } + + @Override + ChannelFactory<EpollServerSocketChannel> serverChannelFactory() + { + return EpollServerSocketChannel::new; + } + }; + + EventLoopGroup makeEventLoopGroup(int threadCount, String threadNamePrefix) + { + logger.debug("using netty {} event loop for pool prefix {}", name(), threadNamePrefix); + return makeEventLoopGroup(threadCount, new DefaultThreadFactory(threadNamePrefix, true)); + } + + abstract EventLoopGroup makeEventLoopGroup(int threadCount, ThreadFactory threadFactory); + abstract ChannelFactory<? extends Channel> clientChannelFactory(); + abstract ChannelFactory<? extends ServerChannel> serverChannelFactory(); + + static Provider optimalProvider() + { + return NativeTransportService.useEpoll() ? EPOLL : NIO; + } + } /** a useful addition for debugging; simply set to true to get more data in your logs */ static final boolean WIRETRACE = false; @@ -93,94 +171,42 @@ public final class SocketFactory InternalLoggerFactory.setDefaultFactory(Slf4JLoggerFactory.INSTANCE); } + private final Provider provider; private final EventLoopGroup acceptGroup; private final EventLoopGroup defaultGroup; // we need a separate EventLoopGroup for outbound streaming because sendFile is blocking private final EventLoopGroup outboundStreamingGroup; final ExecutorService synchronousWorkExecutor = Executors.newCachedThreadPool(new NamedThreadFactory("Messaging-SynchronousWork")); - SocketFactory() { this(DEFAULT_PROVIDER); } - SocketFactory(Provider provider) - { - this.acceptGroup = getEventLoopGroup(provider, 1, "Messaging-AcceptLoop"); - this.defaultGroup = getEventLoopGroup(provider, EVENT_THREADS, NamedThreadFactory.globalPrefix() + "Messaging-EventLoop"); - this.outboundStreamingGroup = getEventLoopGroup(provider, EVENT_THREADS, "Streaming-EventLoop"); - assert provider == providerOf(acceptGroup) - && provider == providerOf(defaultGroup) - && provider == providerOf(outboundStreamingGroup); - } - - private static EventLoopGroup getEventLoopGroup(Provider provider, int threadCount, String threadNamePrefix) + SocketFactory() { - switch (provider) - { - case EPOLL: - logger.debug("using netty epoll event loop for pool prefix {}", threadNamePrefix); - return overwriteMPSCQueues(new EpollEventLoopGroup(threadCount, new DefaultThreadFactory(threadNamePrefix, true))); - case NIO: - logger.debug("using netty nio event loop for pool prefix {}", threadNamePrefix); - return overwriteMPSCQueues(new NioEventLoopGroup(threadCount, new DefaultThreadFactory(threadNamePrefix, true))); - default: - throw new IllegalStateException(); - } + this(Provider.optimalProvider()); } - private static Provider providerOf(EventLoopGroup eventLoopGroup) + SocketFactory(Provider provider) { - while (eventLoopGroup instanceof SingleThreadEventLoop) - eventLoopGroup = ((SingleThreadEventLoop) eventLoopGroup).parent(); - - if (eventLoopGroup instanceof EpollEventLoopGroup) - return Provider.EPOLL; - if (eventLoopGroup instanceof NioEventLoopGroup) - return Provider.NIO; - throw new IllegalStateException(); + this.provider = provider; + this.acceptGroup = provider.makeEventLoopGroup(1, "Messaging-AcceptLoop"); + this.defaultGroup = provider.makeEventLoopGroup(EVENT_THREADS, NamedThreadFactory.globalPrefix() + "Messaging-EventLoop"); + this.outboundStreamingGroup = provider.makeEventLoopGroup(EVENT_THREADS, "Streaming-EventLoop"); } - static Bootstrap newBootstrap(EventLoop eventLoop, int tcpUserTimeoutInMS) + Bootstrap newClientBootstrap(EventLoop eventLoop, int tcpUserTimeoutInMS) { if (eventLoop == null) throw new IllegalArgumentException("must provide eventLoop"); - Bootstrap bootstrap = new Bootstrap() - .group(eventLoop) - .option(ChannelOption.ALLOCATOR, GlobalBufferPoolAllocator.instance) - .option(ChannelOption.SO_KEEPALIVE, true); + Bootstrap bootstrap = new Bootstrap().group(eventLoop).channelFactory(provider.clientChannelFactory()); + + if (provider == Provider.EPOLL) + bootstrap.option(EpollChannelOption.TCP_USER_TIMEOUT, tcpUserTimeoutInMS); - switch (providerOf(eventLoop)) - { - case EPOLL: - bootstrap.channel(EpollSocketChannel.class); - bootstrap.option(EpollChannelOption.TCP_USER_TIMEOUT, tcpUserTimeoutInMS); - break; - case NIO: - bootstrap.channel(NioSocketChannel.class); - } return bootstrap; } ServerBootstrap newServerBootstrap() { - return newServerBootstrap(acceptGroup, defaultGroup); - } - - private static ServerBootstrap newServerBootstrap(EventLoopGroup acceptGroup, EventLoopGroup defaultGroup) - { - ServerBootstrap bootstrap = new ServerBootstrap() - .group(acceptGroup, defaultGroup) - .option(ChannelOption.ALLOCATOR, GlobalBufferPoolAllocator.instance) - .option(ChannelOption.SO_REUSEADDR, true); - - switch (providerOf(defaultGroup)) - { - case EPOLL: - bootstrap.channel(EpollServerSocketChannel.class); - break; - case NIO: - bootstrap.channel(NioServerSocketChannel.class); - } - - return bootstrap; + return new ServerBootstrap().group(acceptGroup, defaultGroup).channelFactory(provider.serverChannelFactory()); } /** @@ -270,58 +296,4 @@ public final class SocketFactory { return from + "->" + to + '-' + type + '-' + id; } - - /** - * The default task queue used by {@code NioEventLoop} and {@code EpollEventLoop} is {@code MpscUnboundedArrayQueue}, - * provided by JCTools. While efficient, it has an undesirable quality for a queue backing an event loop: it is - * not non-blocking, and can cause the event loop to busy-spin while waiting for a partially completed task - * offer, if the producer thread has been suspended mid-offer. Sadly, there is currently no way to work around - * this behaviour in application-logic. - * - * As it happens, however, we have an MPSC queue implementation that is perfectly fit for this purpose - - * {@link ManyToOneConcurrentLinkedQueue}, that is non-blocking, and already used throughout the codebase. - * - * Unfortunately, there is no Netty API or to override the default queue, so we have to resort to reflection, - * for now. - * - * We filed a Netty issue asking for this capability to be provided cleanly: - * https://github.com/netty/netty/issues/9105, and hopefully Netty will implement it some day. When and if - * that happens, this reflection-based workaround should be removed. - */ - private static EventLoopGroup overwriteMPSCQueues(MultithreadEventLoopGroup eventLoopGroup) - { - try - { - for (EventExecutor eventExecutor : (EventExecutor[]) childrenField.get(eventLoopGroup)) - { - SingleThreadEventLoop eventLoop = (SingleThreadEventLoop) eventExecutor; - taskQueueField.set(eventLoop, new ManyToOneConcurrentLinkedQueue<>()); - tailTasksField.set(eventLoop, new ManyToOneConcurrentLinkedQueue<>()); - } - return eventLoopGroup; - } - catch (IllegalAccessException e) - { - throw new IllegalStateException(e); - } - } - - private static final Field childrenField, taskQueueField, tailTasksField; - static - { - try - { - childrenField = MultithreadEventExecutorGroup.class.getDeclaredField("children"); - taskQueueField = SingleThreadEventExecutor.class.getDeclaredField("taskQueue"); - tailTasksField = SingleThreadEventLoop.class.getDeclaredField("tailTasks"); - - childrenField.setAccessible(true); - taskQueueField.setAccessible(true); - tailTasksField.setAccessible(true); - } - catch (NoSuchFieldException e) - { - throw new IllegalStateException(e); - } - } } diff --git a/src/java/org/apache/cassandra/security/SSLFactory.java b/src/java/org/apache/cassandra/security/SSLFactory.java index 6674963..2ccb126 100644 --- a/src/java/org/apache/cassandra/security/SSLFactory.java +++ b/src/java/org/apache/cassandra/security/SSLFactory.java @@ -240,10 +240,12 @@ public final class SSLFactory * Get a netty {@link SslContext} instance. */ @VisibleForTesting - static SslContext getOrCreateSslContext(EncryptionOptions options, boolean buildTruststore, - SocketType socketType, boolean useOpenSsl) throws IOException + static SslContext getOrCreateSslContext(EncryptionOptions options, + boolean buildTruststore, + SocketType socketType, + boolean useOpenSsl) throws IOException { - CacheKey key = new CacheKey(options, socketType); + CacheKey key = new CacheKey(options, socketType, useOpenSsl); SslContext sslContext; sslContext = cachedSslContexts.get(key); @@ -413,11 +415,13 @@ public final class SSLFactory { private final EncryptionOptions encryptionOptions; private final SocketType socketType; + private final boolean useOpenSSL; - public CacheKey(EncryptionOptions encryptionOptions, SocketType socketType) + public CacheKey(EncryptionOptions encryptionOptions, SocketType socketType, boolean useOpenSSL) { this.encryptionOptions = encryptionOptions; this.socketType = socketType; + this.useOpenSSL = useOpenSSL; } public boolean equals(Object o) @@ -426,6 +430,7 @@ public final class SSLFactory if (o == null || getClass() != o.getClass()) return false; CacheKey cacheKey = (CacheKey) o; return (socketType == cacheKey.socketType && + useOpenSSL == cacheKey.useOpenSSL && Objects.equals(encryptionOptions, cacheKey.encryptionOptions)); } @@ -434,6 +439,7 @@ public final class SSLFactory int result = 0; result += 31 * socketType.hashCode(); result += 31 * encryptionOptions.hashCode(); + result += 31 * Boolean.hashCode(useOpenSSL); return result; } } diff --git a/src/java/org/apache/cassandra/service/NativeTransportService.java b/src/java/org/apache/cassandra/service/NativeTransportService.java index 79acab1..79caafc 100644 --- a/src/java/org/apache/cassandra/service/NativeTransportService.java +++ b/src/java/org/apache/cassandra/service/NativeTransportService.java @@ -153,7 +153,7 @@ public class NativeTransportService final boolean enableEpoll = Boolean.parseBoolean(System.getProperty("cassandra.native.epoll.enabled", "true")); if (enableEpoll && !Epoll.isAvailable() && NativeLibrary.osType == NativeLibrary.OSType.LINUX) - logger.warn("epoll not available {}", Epoll.unavailabilityCause()); + logger.warn("epoll not available", Epoll.unavailabilityCause()); return enableEpoll && Epoll.isAvailable(); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org