This is an automated email from the ASF dual-hosted git repository.

robbie pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-jms.git
The following commit(s) were added to refs/heads/main by this push: new 4ca0618 QPIDJMS-553 Shared Netty event loop group 4ca0618 is described below commit 4ca0618f546d3d7b54c8639036841646c0ffa202 Author: franz1981 <nigro....@gmail.com> AuthorDate: Wed Mar 16 11:01:28 2022 +0100 QPIDJMS-553 Shared Netty event loop group co-author: gemmellr --- .../qpid/jms/transports/TransportOptions.java | 11 + .../jms/transports/netty/EventLoopGroupRef.java | 27 ++ .../qpid/jms/transports/netty/EventLoopType.java | 89 ++++++ .../netty/NettyEventLoopGroupFactory.java | 211 +++++++++++++ .../jms/transports/netty/NettyTcpTransport.java | 53 ++-- .../apache/qpid/jms/util/QpidJMSThreadFactory.java | 35 ++- .../qpid/jms/transports/TransportOptionsTest.java | 4 + .../netty/NettySslTransportFactoryTest.java | 4 + .../netty/NettyTcpTransportFactoryTest.java | 4 + .../transports/netty/NettyTcpTransportTest.java | 342 ++++++++++++++------- 10 files changed, 618 insertions(+), 162 deletions(-) diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TransportOptions.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TransportOptions.java index ba62d04..1ed1771 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TransportOptions.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TransportOptions.java @@ -39,6 +39,7 @@ public class TransportOptions implements Cloneable { public static final int DEFAULT_SO_TIMEOUT = -1; public static final int DEFAULT_CONNECT_TIMEOUT = 60000; public static final int DEFAULT_TCP_PORT = 5672; + public static final int DEFAULT_SHARED_EVENT_LOOP_THREADS = -1; public static final boolean DEFAULT_USE_EPOLL = true; public static final boolean DEFAULT_USE_KQUEUE = false; public static final boolean DEFAULT_TRACE_BYTES = false; @@ -73,6 +74,7 @@ public class TransportOptions implements Cloneable { private boolean useKQueue = DEFAULT_USE_KQUEUE; private boolean traceBytes = DEFAULT_TRACE_BYTES; 
private boolean useOpenSSL = DEFAULT_USE_OPENSSL; + private int sharedEventLoopThreads = DEFAULT_SHARED_EVENT_LOOP_THREADS; private String keyStoreLocation; private String keyStorePassword; @@ -213,6 +215,14 @@ public class TransportOptions implements Cloneable { this.tcpKeepAlive = keepAlive; } + public void setSharedEventLoopThreads(int numThreads) { + this.sharedEventLoopThreads = numThreads; + } + + public int getSharedEventLoopThreads() { + return sharedEventLoopThreads; + } + public int getConnectTimeout() { return connectTimeout; } @@ -590,6 +600,7 @@ public class TransportOptions implements Cloneable { copy.setUseOpenSSL(isUseOpenSSL()); copy.setLocalAddress(getLocalAddress()); copy.setLocalPort(getLocalPort()); + copy.setSharedEventLoopThreads(getSharedEventLoopThreads()); return copy; } diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/EventLoopGroupRef.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/EventLoopGroupRef.java new file mode 100644 index 0000000..e590e5f --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/EventLoopGroupRef.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.qpid.jms.transports.netty; + +import io.netty.channel.EventLoopGroup; + +public interface EventLoopGroupRef extends AutoCloseable { + + EventLoopGroup group(); + + @Override + void close(); +} diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/EventLoopType.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/EventLoopType.java new file mode 100644 index 0000000..610a6ff --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/EventLoopType.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.qpid.jms.transports.netty; + +import io.netty.bootstrap.Bootstrap; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioSocketChannel; +import org.apache.qpid.jms.transports.TransportOptions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.ThreadFactory; + +import static java.util.Objects.requireNonNull; + +public enum EventLoopType { + EPOLL, KQUEUE, NIO; + + private static final Logger LOG = LoggerFactory.getLogger(EventLoopType.class); + + public void createChannel(final Bootstrap bootstrap) { + createChannel(this, requireNonNull(bootstrap)); + } + + public EventLoopGroup createEventLoopGroup(final int threads, final ThreadFactory ioThreadFactory) { + return createEventLoopGroup(this, threads, ioThreadFactory); + } + + private static EventLoopGroup createEventLoopGroup(final EventLoopType type, final int threads, final ThreadFactory ioThreadFactory) { + switch (type) { + case EPOLL: + LOG.trace("Netty Transport using Epoll mode"); + return EpollSupport.createGroup(threads, ioThreadFactory); + case KQUEUE: + LOG.trace("Netty Transport using KQueue mode"); + return KQueueSupport.createGroup(threads, ioThreadFactory); + case NIO: + LOG.trace("Netty Transport using Nio mode"); + return new NioEventLoopGroup(threads, ioThreadFactory); + default: + throw new IllegalArgumentException("Unknown event loop type:" + type); + } + } + + private static void createChannel(final EventLoopType type, final Bootstrap bootstrap) { + switch (type) { + case EPOLL: + EpollSupport.createChannel(bootstrap); + break; + case KQUEUE: + KQueueSupport.createChannel(bootstrap); + break; + case NIO: + bootstrap.channel(NioSocketChannel.class); + break; + default: + throw new IllegalArgumentException("Unknown event loop type:" + type); + } + } + + public static EventLoopType valueOf(final TransportOptions transportOptions) { + final boolean useKQueue = 
KQueueSupport.isAvailable(transportOptions); + final boolean useEpoll = EpollSupport.isAvailable(transportOptions); + if (useKQueue) { + return KQUEUE; + } + + if (useEpoll) { + return EPOLL; + } + + return NIO; + } +} diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyEventLoopGroupFactory.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyEventLoopGroupFactory.java new file mode 100644 index 0000000..b047f7c --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyEventLoopGroupFactory.java @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.qpid.jms.transports.netty; + +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + +import io.netty.channel.EventLoopGroup; +import io.netty.util.concurrent.Future; + +import org.apache.qpid.jms.util.QpidJMSThreadFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public final class NettyEventLoopGroupFactory { + + private static final Logger LOG = LoggerFactory.getLogger(NettyEventLoopGroupFactory.class); + private static final AtomicLong SHARED_EVENT_LOOP_GROUP_INSTANCE_SEQUENCE = new AtomicLong(0); + private static final int SHUTDOWN_TIMEOUT = 50; + + private static final Map<EventLoopGroupKey, EventLoopGroupHolder> SHARED_EVENT_LOOP_GROUPS = new HashMap<>(); + + private NettyEventLoopGroupFactory() { + // No instances + } + + public static EventLoopGroupRef unsharedGroup(final EventLoopType type, final ThreadFactory threadFactory) { + Objects.requireNonNull(type); + final EventLoopGroup unsharedGroup = type.createEventLoopGroup(1, threadFactory); + + return new EventLoopGroupRef() { + @Override + public EventLoopGroup group() { + return unsharedGroup; + } + + @Override + public void close() { + shutdownEventLoopGroup(unsharedGroup); + } + }; + } + + public static EventLoopGroupRef sharedGroup(final EventLoopType type, final int threads) { + Objects.requireNonNull(type); + if (threads <= 0) { + throw new IllegalArgumentException("shared event loop threads value must be > 0"); + } + + final EventLoopGroupKey key = new EventLoopGroupKey(type, threads); + + synchronized (SHARED_EVENT_LOOP_GROUPS) { + EventLoopGroupHolder groupHolder = SHARED_EVENT_LOOP_GROUPS.get(key); + if (groupHolder == null) { + groupHolder = new EventLoopGroupHolder(createSharedEventLoopGroup(type, threads), key); + + 
SHARED_EVENT_LOOP_GROUPS.put(key, groupHolder); + } else { + groupHolder.incRef(); + } + + return new SharedEventLoopGroupRef(groupHolder); + } + } + + private static void sharedGroupRefClosed(EventLoopGroupHolder holder) { + boolean shutdown = false; + synchronized (SHARED_EVENT_LOOP_GROUPS) { + if (holder.decRef()) { + SHARED_EVENT_LOOP_GROUPS.remove(holder.key()); + shutdown = true; + } + } + + if (shutdown) { + shutdownEventLoopGroup(holder.group()); + } + } + + private static void shutdownEventLoopGroup(final EventLoopGroup group) { + Future<?> fut = group.shutdownGracefully(0, SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS); + if (!fut.awaitUninterruptibly(2 * SHUTDOWN_TIMEOUT)) { + LOG.trace("Channel group shutdown failed to complete in allotted time"); + } + } + + private static ThreadFactory createSharedThreadFactory(final EventLoopType type, final int threads) { + final String baseName = "SharedNettyEventLoopGroup (" + SHARED_EVENT_LOOP_GROUP_INSTANCE_SEQUENCE.incrementAndGet() + ")[" + type + " - size=" + threads + "]:"; + + return new QpidJMSThreadFactory(thread -> baseName + " thread-id=" + thread.getId(), true); + } + + private static EventLoopGroup createSharedEventLoopGroup(final EventLoopType type, final int threads) { + return type.createEventLoopGroup(threads, createSharedThreadFactory(type, threads)); + } + + private static final class SharedEventLoopGroupRef implements EventLoopGroupRef { + private final EventLoopGroupHolder sharedGroupHolder; + private final AtomicBoolean closed = new AtomicBoolean(); + + public SharedEventLoopGroupRef(final EventLoopGroupHolder sharedGroupHolder) { + this.sharedGroupHolder = Objects.requireNonNull(sharedGroupHolder); + } + + @Override + public EventLoopGroup group() { + if (closed.get()) { + throw new IllegalStateException("Group reference is already closed"); + } + + return sharedGroupHolder.group(); + } + + @Override + public void close() { + if (closed.compareAndSet(false, true)) { + 
sharedGroupRefClosed(sharedGroupHolder); + } + } + } + + private static class EventLoopGroupKey { + private final EventLoopType type; + private final int eventLoopThreads; + + private EventLoopGroupKey(final EventLoopType type, final int eventLoopThreads) { + this.type = type; + this.eventLoopThreads = eventLoopThreads; + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final EventLoopGroupKey that = (EventLoopGroupKey) o; + if (eventLoopThreads != that.eventLoopThreads) { + return false; + } + return type == that.type; + } + + @Override + public int hashCode() { + int result = type != null ? type.hashCode() : 0; + result = 31 * result + eventLoopThreads; + return result; + } + } + + private static final class EventLoopGroupHolder { + private final EventLoopGroup group; + private final EventLoopGroupKey key; + private int refCnt = 1; + + private EventLoopGroupHolder(final EventLoopGroup sharedGroup, final EventLoopGroupKey key) { + this.group = Objects.requireNonNull(sharedGroup); + this.key = Objects.requireNonNull(key); + } + + public EventLoopGroup group() { + return group; + } + + public EventLoopGroupKey key() { + return key; + } + + public void incRef() { + assert Thread.holdsLock(SHARED_EVENT_LOOP_GROUPS); + if (refCnt == 0) { + throw new IllegalStateException("The group was already released, can not increment reference count."); + } + + refCnt++; + } + + public boolean decRef() { + assert Thread.holdsLock(SHARED_EVENT_LOOP_GROUPS); + if (refCnt == 0) { + throw new IllegalStateException("The group was already released, can not decrement reference count."); + } + + refCnt--; + + return refCnt == 0; + } + } +} \ No newline at end of file diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyTcpTransport.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyTcpTransport.java index 
a329482..7f0f9d3 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyTcpTransport.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyTcpTransport.java @@ -23,7 +23,6 @@ import java.util.Objects; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Supplier; @@ -47,11 +46,8 @@ import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPipeline; -import io.netty.channel.EventLoopGroup; import io.netty.channel.FixedRecvByteBufAllocator; import io.netty.channel.SimpleChannelInboundHandler; -import io.netty.channel.nio.NioEventLoopGroup; -import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.logging.LoggingHandler; import io.netty.handler.proxy.ProxyHandler; import io.netty.handler.ssl.SslHandler; @@ -60,6 +56,9 @@ import io.netty.util.ReferenceCountUtil; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.GenericFutureListener; +import static org.apache.qpid.jms.transports.netty.NettyEventLoopGroupFactory.sharedGroup; +import static org.apache.qpid.jms.transports.netty.NettyEventLoopGroupFactory.unsharedGroup; + /** * TCP based transport that uses Netty as the underlying IO layer. 
*/ @@ -67,11 +66,9 @@ public class NettyTcpTransport implements Transport { private static final Logger LOG = LoggerFactory.getLogger(NettyTcpTransport.class); - public static final int SHUTDOWN_TIMEOUT = 50; public static final int DEFAULT_MAX_FRAME_SIZE = 65535; - protected Bootstrap bootstrap; - protected EventLoopGroup group; + protected EventLoopGroupRef groupRef; protected Channel channel; protected TransportListener listener; protected ThreadFactory ioThreadfactory; @@ -137,29 +134,20 @@ public class NettyTcpTransport implements Transport { } TransportOptions transportOptions = getTransportOptions(); - boolean useKQueue = KQueueSupport.isAvailable(transportOptions); - boolean useEpoll = EpollSupport.isAvailable(transportOptions); - - if (useKQueue) { - LOG.trace("Netty Transport using KQueue mode"); - group = KQueueSupport.createGroup(1, ioThreadfactory); - } else if (useEpoll) { - LOG.trace("Netty Transport using Epoll mode"); - group = EpollSupport.createGroup(1, ioThreadfactory); - } else { - LOG.trace("Netty Transport using NIO mode"); - group = new NioEventLoopGroup(1, ioThreadfactory); - } - bootstrap = new Bootstrap(); - bootstrap.group(group); - if (useKQueue) { - KQueueSupport.createChannel(bootstrap); - } else if (useEpoll) { - EpollSupport.createChannel(bootstrap); + EventLoopType eventLoopType = EventLoopType.valueOf(transportOptions); + int sharedEventLoopThreads = transportOptions.getSharedEventLoopThreads(); + if (sharedEventLoopThreads > 0) { + groupRef = sharedGroup(eventLoopType, sharedEventLoopThreads); } else { - bootstrap.channel(NioSocketChannel.class); + groupRef = unsharedGroup(eventLoopType, ioThreadfactory); } + + Bootstrap bootstrap = new Bootstrap(); + bootstrap.group(groupRef.group()); + + eventLoopType.createChannel(bootstrap); + bootstrap.handler(new ChannelInitializer<Channel>() { @Override public void initChannel(Channel connectedChannel) throws Exception { @@ -214,8 +202,8 @@ public class NettyTcpTransport implements 
Transport { } }); } - - return group; + // returning the channel's specific event loop: the overall event loop group may be multi-threaded + return channel.eventLoop(); } @Override @@ -237,11 +225,8 @@ public class NettyTcpTransport implements Transport { channel.close().syncUninterruptibly(); } } finally { - if (group != null) { - Future<?> fut = group.shutdownGracefully(0, SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS); - if (!fut.awaitUninterruptibly(2 * SHUTDOWN_TIMEOUT)) { - LOG.trace("Channel group shutdown failed to complete in allotted time"); - } + if (groupRef != null) { + groupRef.close(); } } } diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/QpidJMSThreadFactory.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/QpidJMSThreadFactory.java index b4e9f06..dc73834 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/QpidJMSThreadFactory.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/QpidJMSThreadFactory.java @@ -19,6 +19,7 @@ package org.apache.qpid.jms.util; import java.lang.Thread.UncaughtExceptionHandler; import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -30,7 +31,7 @@ public class QpidJMSThreadFactory implements ThreadFactory { private static final Logger LOG = LoggerFactory.getLogger(QpidJMSThreadFactory.class); - private final String threadName; + private final Function<Thread, String> threadNamingStrategy; private final boolean daemon; private final AtomicReference<Thread> threadTracker; @@ -39,12 +40,12 @@ public class QpidJMSThreadFactory implements ThreadFactory { * given name and daemon state. * * @param threadName - * the name that will be used for each thread created. + * the name that will be used for each thread created. * @param daemon - * should the created thread be a daemon thread. + * should the created thread be a daemon thread. 
*/ public QpidJMSThreadFactory(String threadName, boolean daemon) { - this.threadName = threadName; + this.threadNamingStrategy = t -> threadName; this.daemon = daemon; this.threadTracker = null; } @@ -59,18 +60,33 @@ public class QpidJMSThreadFactory implements ThreadFactory { * to be known for some reason. * * @param threadName - * the name that will be used for each thread created. + * the name that will be used for each thread created. * @param daemon - * should the created thread be a daemon thread. + * should the created thread be a daemon thread. * @param threadTracker - * AtomicReference that will be updated any time a new Thread is created. + * AtomicReference that will be updated any time a new Thread is created. */ public QpidJMSThreadFactory(String threadName, boolean daemon, AtomicReference<Thread> threadTracker) { - this.threadName = threadName; + this.threadNamingStrategy = t -> threadName; this.daemon = daemon; this.threadTracker = threadTracker; } + /** + * Creates a new Thread factory that will create threads with the + * provided thread naming function and daemon state. + * + * @param threadNamingStrategy + * the naming strategy that will be used for each thread created. + * @param daemon + * should the created thread be a daemon thread. 
+ */ + public QpidJMSThreadFactory(Function<Thread, String> threadNamingStrategy, boolean daemon) { + this.threadNamingStrategy = threadNamingStrategy; + this.daemon = daemon; + this.threadTracker = null; + } + @Override public Thread newThread(final Runnable target) { Runnable runner = target; @@ -91,8 +107,9 @@ public class QpidJMSThreadFactory implements ThreadFactory { }; } - Thread thread = new Thread(runner, threadName); + Thread thread = new Thread(runner); thread.setDaemon(daemon); + thread.setName(threadNamingStrategy.apply(thread)); thread.setUncaughtExceptionHandler(new UncaughtExceptionHandler() { @Override diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/TransportOptionsTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/TransportOptionsTest.java index 402608b..d476e5b 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/TransportOptionsTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/TransportOptionsTest.java @@ -50,6 +50,7 @@ public class TransportOptionsTest extends QpidJmsTestCase { public static final int LOCAL_PORT = 30000; public static final boolean TEST_USE_EPOLL_VALUE = !TransportOptions.DEFAULT_USE_EPOLL; public static final boolean TEST_TRACE_BYTES_VALUE = !TransportOptions.DEFAULT_TRACE_BYTES; + public static final int TEST_SHARED_EVENT_LOOP_THREADS_VALUE = 5; private static final String PASSWORD = "password"; private static final String CLIENT_KEYSTORE = "src/test/resources/client-jks.keystore"; @@ -98,6 +99,7 @@ public class TransportOptionsTest extends QpidJmsTestCase { assertNull(options.getKeyAlias()); assertNull(options.getSslContextOverride()); assertNull(options.getProxyHandlerSupplier()); + assertEquals(TransportOptions.DEFAULT_SHARED_EVENT_LOOP_THREADS, options.getSharedEventLoopThreads()); } @Test @@ -149,6 +151,7 @@ public class TransportOptionsTest extends QpidJmsTestCase { 
assertArrayEquals(DISABLED_PROTOCOLS,options.getDisabledProtocols()); assertArrayEquals(ENABLED_CIPHERS,options.getEnabledCipherSuites()); assertArrayEquals(DISABLED_CIPHERS,options.getDisabledCipherSuites()); + assertEquals(TEST_SHARED_EVENT_LOOP_THREADS_VALUE, options.getSharedEventLoopThreads()); } @Test @@ -337,6 +340,7 @@ public class TransportOptionsTest extends QpidJmsTestCase { options.setLocalAddress(LOCAL_ADDRESS); options.setLocalPort(LOCAL_PORT); options.setProxyHandlerSupplier(PROXY_HANDLER_SUPPLIER); + options.setSharedEventLoopThreads(TEST_SHARED_EVENT_LOOP_THREADS_VALUE); return options; } diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettySslTransportFactoryTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettySslTransportFactoryTest.java index 99a4334..b03e6b9 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettySslTransportFactoryTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettySslTransportFactoryTest.java @@ -49,6 +49,7 @@ public class NettySslTransportFactoryTest { public static final int CUSTOM_CONNECT_TIMEOUT = 90000; private static final String CUSTOM_LOCAL_ADDRESS = "localhost"; private static final int CUSTOM_LOCAL_PORT = 30000; + private static final int CUSTOM_SHARED_EVENT_LOOP_THREADS = 7; public static final String CUSTOM_CONTEXT_PROTOCOL = "TLSv1.2"; public static final String[] CUSTOM_ENABLED_PROTOCOLS = { "TLSv1.1", "TLSv1.2" }; @@ -93,6 +94,7 @@ public class NettySslTransportFactoryTest { assertEquals(TransportOptions.DEFAULT_SO_TIMEOUT, options.getSoTimeout()); assertNull(options.getLocalAddress()); assertEquals(TransportOptions.DEFAULT_LOCAL_PORT, options.getLocalPort()); + assertEquals(TransportOptions.DEFAULT_SHARED_EVENT_LOOP_THREADS, options.getSharedEventLoopThreads()); assertEquals(TransportOptions.DEFAULT_CONTEXT_PROTOCOL, options.getContextProtocol()); 
assertNull(options.getEnabledProtocols()); @@ -124,6 +126,7 @@ public class NettySslTransportFactoryTest { URI BASE_URI = new URI("tcp://localhost:5672"); URI configuredURI = new URI(BASE_URI.toString() + "?" + + "transport.sharedEventLoopThreads=" + CUSTOM_SHARED_EVENT_LOOP_THREADS + "&" + "transport.connectTimeout=" + CUSTOM_CONNECT_TIMEOUT + "&" + "transport.sendBufferSize=" + CUSTOM_SEND_BUFFER_SIZE + "&" + "transport.receiveBufferSize=" + CUSTOM_RECEIVE_BUFFER_SIZE + "&" + @@ -156,6 +159,7 @@ public class NettySslTransportFactoryTest { TransportOptions options = transport.getTransportOptions(); assertNotNull(options); + assertEquals(CUSTOM_SHARED_EVENT_LOOP_THREADS, options.getSharedEventLoopThreads()); assertEquals(CUSTOM_CONNECT_TIMEOUT, options.getConnectTimeout()); assertEquals(CUSTOM_SEND_BUFFER_SIZE, options.getSendBufferSize()); assertEquals(CUSTOM_RECEIVE_BUFFER_SIZE, options.getReceiveBufferSize()); diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyTcpTransportFactoryTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyTcpTransportFactoryTest.java index cbb88d3..2968023 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyTcpTransportFactoryTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyTcpTransportFactoryTest.java @@ -44,6 +44,7 @@ public class NettyTcpTransportFactoryTest { public static final int CUSTOM_CONNECT_TIMEOUT = 90000; private static final String CUSTOM_LOCAL_ADDRESS = "localhost"; private static final int CUSTOM_LOCAL_PORT = 30000; + private static final int CUSTOM_SHARED_EVENT_LOOP_THREADS = 7; @Test(timeout = 30000) public void testCreateWithDefaultOptions() throws Exception { @@ -70,6 +71,7 @@ public class NettyTcpTransportFactoryTest { assertEquals(TransportOptions.DEFAULT_SO_TIMEOUT, options.getSoTimeout()); assertNull(options.getLocalAddress()); assertEquals(TransportOptions.DEFAULT_LOCAL_PORT, 
options.getLocalPort()); + assertEquals(TransportOptions.DEFAULT_SHARED_EVENT_LOOP_THREADS, options.getSharedEventLoopThreads()); } @Test(expected = IllegalArgumentException.class) @@ -114,6 +116,7 @@ public class NettyTcpTransportFactoryTest { URI BASE_URI = new URI("tcp://localhost:5672"); URI configuredURI = new URI(BASE_URI.toString() + "?" + + "transport.sharedEventLoopThreads=" + CUSTOM_SHARED_EVENT_LOOP_THREADS + "&" + "transport.connectTimeout=" + CUSTOM_CONNECT_TIMEOUT + "&" + "transport.sendBufferSize=" + CUSTOM_SEND_BUFFER_SIZE + "&" + "transport.receiveBufferSize=" + CUSTOM_RECEIVE_BUFFER_SIZE + "&" + @@ -136,6 +139,7 @@ public class NettyTcpTransportFactoryTest { TransportOptions options = transport.getTransportOptions(); assertNotNull(options); + assertEquals(CUSTOM_SHARED_EVENT_LOOP_THREADS, options.getSharedEventLoopThreads()); assertEquals(CUSTOM_CONNECT_TIMEOUT, options.getConnectTimeout()); assertEquals(CUSTOM_SEND_BUFFER_SIZE, options.getSendBufferSize()); assertEquals(CUSTOM_RECEIVE_BUFFER_SIZE, options.getReceiveBufferSize()); diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyTcpTransportTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyTcpTransportTest.java index 80abf34..9bc61f9 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyTcpTransportTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyTcpTransportTest.java @@ -16,9 +16,12 @@ */ package org.apache.qpid.jms.transports.netty; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.instanceOf; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNotSame; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; @@ -31,11 +34,14 @@ 
import java.net.InetSocketAddress; import java.net.SocketAddress; import java.net.URI; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; +import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Supplier; +import io.netty.channel.EventLoopGroup; import org.apache.qpid.jms.test.QpidJmsTestCase; import org.apache.qpid.jms.test.Wait; import org.apache.qpid.jms.test.proxy.TestProxy; @@ -58,6 +64,7 @@ import io.netty.channel.epoll.Epoll; import io.netty.channel.epoll.EpollEventLoopGroup; import io.netty.channel.kqueue.KQueue; import io.netty.channel.kqueue.KQueueEventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; import io.netty.handler.proxy.ProxyHandler; import io.netty.handler.proxy.Socks5ProxyHandler; import io.netty.util.ResourceLeakDetector; @@ -225,13 +232,7 @@ public class NettyTcpTransportTest extends QpidJmsTestCase { int port = server.getServerPort(); URI serverLocation = new URI("tcp://localhost:" + port); - Transport transport = createTransport(serverLocation, testListener, createClientOptions()); - try { - transport.connect(null, null); - LOG.info("Connected to server:{} as expected.", serverLocation); - } catch (Exception e) { - fail("Should have connected to the server at " + serverLocation + " but got exception: " + e); - } + Transport transport = createConnectedTransport(serverLocation, createClientOptions()); assertTrue(transport.isConnected()); assertEquals(serverLocation, transport.getRemoteLocation()); @@ -260,15 +261,10 @@ public class NettyTcpTransportTest extends QpidJmsTestCase { List<Transport> transports = new ArrayList<Transport>(); for (int i = 0; i < CONNECTION_COUNT; ++i) { - Transport transport = createTransport(serverLocation, testListener, createClientOptions()); - try { - transport.connect(null, null); - assertTrue(transport.isConnected()); - LOG.info("Connected to server:{} as expected.", serverLocation); 
- transports.add(transport); - } catch (Exception e) { - fail("Should have connected to the server at " + serverLocation + " but got exception: " + e); - } + Transport transport = createConnectedTransport(serverLocation, createClientOptions()); + assertTrue(transport.isConnected()); + + transports.add(transport); } for (Transport transport : transports) { @@ -336,13 +332,7 @@ public class NettyTcpTransportTest extends QpidJmsTestCase { int port = server.getServerPort(); URI serverLocation = new URI("tcp://localhost:" + port); - transport = createTransport(serverLocation, testListener, createClientOptions()); - try { - transport.connect(null, null); - LOG.info("Connected to server:{} as expected.", serverLocation); - } catch (Exception e) { - fail("Should have connected to the server at " + serverLocation + " but got exception: " + e); - } + transport = createConnectedTransport(serverLocation, createClientOptions()); assertTrue(transport.isConnected()); @@ -374,13 +364,7 @@ public class NettyTcpTransportTest extends QpidJmsTestCase { int port = server.getServerPort(); URI serverLocation = new URI("tcp://localhost:" + port); - Transport transport = createTransport(serverLocation, testListener, createClientOptions()); - try { - transport.connect(null, null); - LOG.info("Connected to server:{} as expected.", serverLocation); - } catch (Exception e) { - fail("Should have connected to the server at " + serverLocation + " but got exception: " + e); - } + Transport transport = createConnectedTransport(serverLocation, createClientOptions()); assertTrue(transport.isConnected()); @@ -395,20 +379,171 @@ public class NettyTcpTransportTest extends QpidJmsTestCase { } @Test(timeout = 60 * 1000) - public void testDataSentIsReceived() throws Exception { + public void testCannotDereferenceSharedClosedEventLoopGroup() throws Exception { try (NettyEchoServer server = createEchoServer(createServerOptions())) { server.start(); int port = server.getServerPort(); URI serverLocation = new 
URI("tcp://localhost:" + port); + final TransportOptions sharedTransportOptions = createClientOptions(); + sharedTransportOptions.setUseKQueue(false); + sharedTransportOptions.setUseEpoll(false); + sharedTransportOptions.setSharedEventLoopThreads(1); - Transport transport = createTransport(serverLocation, testListener, createClientOptions()); + EventLoopGroupRef groupRef = null; + Transport nioSharedTransport = createConnectedTransport(serverLocation, sharedTransportOptions); try { - transport.connect(null, null); - LOG.info("Connected to server:{} as expected.", serverLocation); - } catch (Exception e) { - fail("Should have connected to the server at " + serverLocation + " but got exception: " + e); + groupRef = getGroupRef(nioSharedTransport); + assertNotNull(groupRef.group()); + } finally { + nioSharedTransport.close(); + } + + try { + groupRef.group(); + fail("Should have thrown ISE due to being closed"); + } catch (IllegalStateException expected) { + // Ignore + } catch (Throwable unexpected) { + fail("Should have thrown IllegalStateException"); } + } + + assertTrue(!transportClosed); // Normal shutdown does not trigger the event. 
+ assertTrue(exceptions.isEmpty()); + assertTrue(data.isEmpty()); + } + + @Test(timeout = 60 * 1000) + public void testSharedEventLoopGroups() throws Exception { + final Set<Transport> transports = new HashSet<>(); + try (NettyEchoServer server = createEchoServer(createServerOptions())) { + server.start(); + + int port = server.getServerPort(); + URI serverLocation = new URI("tcp://localhost:" + port); + final TransportOptions sharedTransportOptions = createClientOptions(); + sharedTransportOptions.setUseKQueue(false); + sharedTransportOptions.setUseEpoll(false); + sharedTransportOptions.setSharedEventLoopThreads(1); + + Transport sharedNioTransport1 = createConnectedTransport(serverLocation, sharedTransportOptions); + transports.add(sharedNioTransport1); + Transport sharedNioTransport2 = createConnectedTransport(serverLocation, sharedTransportOptions); + transports.add(sharedNioTransport2); + + final EventLoopGroup sharedGroup = getGroupRef(sharedNioTransport1).group(); + assertSame(sharedGroup, getGroupRef(sharedNioTransport2).group()); + + sharedNioTransport1.close(); + assertFalse(sharedGroup.isShutdown()); + assertFalse(sharedGroup.isTerminated()); + + sharedNioTransport2.close(); + assertTrue(sharedGroup.isShutdown()); + assertTrue(sharedGroup.isTerminated()); + } finally { + // Ensures that any not already closed, e.g due to test failure, are now closed. + cleanUpTransports(transports); + } + + assertTrue(!transportClosed); // Normal shutdown does not trigger the event. 
+ assertTrue(exceptions.isEmpty()); + assertTrue(data.isEmpty()); + } + + @Test(timeout = 60 * 1000) + public void testSharedEventLoopGroupsOfDifferentSizes() throws Exception { + final Set<Transport> transports = new HashSet<>(); + try (NettyEchoServer server = createEchoServer(createServerOptions())) { + server.start(); + + int port = server.getServerPort(); + URI serverLocation = new URI("tcp://localhost:" + port); + + final TransportOptions sharedTransportOptions1 = createClientOptions(); + sharedTransportOptions1.setUseKQueue(false); + sharedTransportOptions1.setUseEpoll(false); + sharedTransportOptions1.setSharedEventLoopThreads(1); + Transport nioSharedTransport1 = createConnectedTransport(serverLocation, sharedTransportOptions1); + transports.add(nioSharedTransport1); + + final TransportOptions sharedTransportOptions2 = createClientOptions(); + sharedTransportOptions2.setUseKQueue(false); + sharedTransportOptions2.setUseEpoll(false); + sharedTransportOptions2.setSharedEventLoopThreads(2); + Transport nioSharedTransport2 = createConnectedTransport(serverLocation, sharedTransportOptions2); + transports.add(nioSharedTransport2); + + EventLoopGroup sharedGroup1 = getGroupRef(nioSharedTransport1).group(); + EventLoopGroup sharedGroup2 = getGroupRef(nioSharedTransport2).group(); + assertNotSame(sharedGroup1, sharedGroup2); + + nioSharedTransport1.close(); + assertTrue(sharedGroup1.isShutdown()); + assertTrue(sharedGroup1.isTerminated()); + + nioSharedTransport2.close(); + assertTrue(sharedGroup2.isShutdown()); + assertTrue(sharedGroup2.isTerminated()); + } finally { + // Ensures that any not already closed, e.g due to test failure, are now closed. + cleanUpTransports(transports); + } + + assertTrue(!transportClosed); // Normal shutdown does not trigger the event. 
+ assertTrue(exceptions.isEmpty()); + assertTrue(data.isEmpty()); + } + + @Test(timeout = 60 * 1000) + public void testUnsharedEventLoopGroups() throws Exception { + final Set<Transport> transports = new HashSet<>(); + try (NettyEchoServer server = createEchoServer(createServerOptions())) { + server.start(); + + int port = server.getServerPort(); + URI serverLocation = new URI("tcp://localhost:" + port); + final TransportOptions unsharedTransportOptions = createClientOptions(); + unsharedTransportOptions.setUseKQueue(false); + unsharedTransportOptions.setUseEpoll(false); + unsharedTransportOptions.setSharedEventLoopThreads(0); + + Transport unsharedNioTransport1 = createConnectedTransport(serverLocation, unsharedTransportOptions); + transports.add(unsharedNioTransport1); + Transport unsharedNioTransport2 = createConnectedTransport(serverLocation, unsharedTransportOptions); + transports.add(unsharedNioTransport2); + + final EventLoopGroup unsharedGroup1 = getGroupRef(unsharedNioTransport1).group(); + final EventLoopGroup unsharedGroup2 = getGroupRef(unsharedNioTransport2).group(); + assertNotSame(unsharedGroup1, unsharedGroup2); + + unsharedNioTransport1.close(); + assertTrue(unsharedGroup1.isShutdown()); + assertTrue(unsharedGroup1.isTerminated()); + + unsharedNioTransport2.close(); + assertTrue(unsharedGroup2.isShutdown()); + assertTrue(unsharedGroup2.isTerminated()); + } finally { + // Ensures that any not already closed, e.g due to test failure, are now closed. + cleanUpTransports(transports); + } + + assertTrue(!transportClosed); // Normal shutdown does not trigger the event. 
+ assertTrue(exceptions.isEmpty()); + assertTrue(data.isEmpty()); + } + + @Test(timeout = 60 * 1000) + public void testDataSentIsReceived() throws Exception { + try (NettyEchoServer server = createEchoServer(createServerOptions())) { + server.start(); + + int port = server.getServerPort(); + URI serverLocation = new URI("tcp://localhost:" + port); + + Transport transport = createConnectedTransport(serverLocation, createClientOptions()); assertTrue(transport.isConnected()); @@ -453,13 +588,7 @@ public class NettyTcpTransportTest extends QpidJmsTestCase { int port = server.getServerPort(); URI serverLocation = new URI("tcp://localhost:" + port); - Transport transport = createTransport(serverLocation, testListener, createClientOptions()); - try { - transport.connect(null, null); - LOG.info("Connected to server:{} as expected.", serverLocation); - } catch (Exception e) { - fail("Should have connected to the server at " + serverLocation + " but got exception: " + e); - } + Transport transport = createConnectedTransport(serverLocation, createClientOptions()); assertTrue(transport.isConnected()); @@ -488,21 +617,13 @@ public class NettyTcpTransportTest extends QpidJmsTestCase { @Test(timeout = 60 * 1000) public void testSendToClosedTransportFails() throws Exception { - Transport transport = null; - try (NettyEchoServer server = createEchoServer(createServerOptions())) { server.start(); int port = server.getServerPort(); URI serverLocation = new URI("tcp://localhost:" + port); - transport = createTransport(serverLocation, testListener, createClientOptions()); - try { - transport.connect(null, null); - LOG.info("Connected to server:{} as expected.", serverLocation); - } catch (Exception e) { - fail("Should have connected to the server at " + serverLocation + " but got exception: " + e); - } + Transport transport = createConnectedTransport(serverLocation, createClientOptions()); assertTrue(transport.isConnected()); @@ -588,13 +709,7 @@ public class NettyTcpTransportTest 
extends QpidJmsTestCase { URI serverLocation = new URI("tcp://localhost:" + port); for (int i = 0; i < 256; ++i) { - transport = createTransport(serverLocation, testListener, createClientOptions()); - try { - transport.connect(null, null); - LOG.info("Connected to server:{} as expected.", serverLocation); - } catch (Exception e) { - fail("Should have connected to the server at " + serverLocation + " but got exception: " + e); - } + transport = createConnectedTransport(serverLocation, createClientOptions()); assertTrue(transport.isConnected()); @@ -643,13 +758,7 @@ public class NettyTcpTransportTest extends QpidJmsTestCase { }; clientOptions.setProxyHandlerSupplier(proxyHandlerFactory); - Transport transport = createTransport(serverLocation, testListener, clientOptions); - try { - transport.connect(null, null); - LOG.info("Connected to server:{} as expected.", serverLocation); - } catch (Exception e) { - fail("Should have connected to the server at " + serverLocation + " but got exception: " + e); - } + Transport transport = createConnectedTransport(serverLocation, clientOptions); assertTrue(transport.isConnected()); assertEquals(serverLocation, transport.getRemoteLocation()); @@ -692,17 +801,16 @@ public class NettyTcpTransportTest extends QpidJmsTestCase { TransportOptions options = createClientOptions(); options.setUseEpoll(useEpoll); options.setUseKQueue(false); - Transport transport = createTransport(serverLocation, testListener, options); - try { - transport.connect(null, null); - LOG.info("Connected to server:{} as expected.", serverLocation); - } catch (Exception e) { - fail("Should have connected to the server at " + serverLocation + " but got exception: " + e); - } + Transport transport = createConnectedTransport(serverLocation, options); assertTrue(transport.isConnected()); assertEquals(serverLocation, transport.getRemoteLocation()); - assertEpoll("Transport should be using Epoll", useEpoll, transport); + + if(useEpoll) { + 
assertEventLoopGroupType("Transport should be using Epoll", transport, EpollEventLoopGroup.class); + } else { + assertEventLoopGroupType("Transport should be using Nio", transport, NioEventLoopGroup.class); + } transport.close(); @@ -715,13 +823,13 @@ public class NettyTcpTransportTest extends QpidJmsTestCase { assertTrue(data.isEmpty()); } - private void assertEpoll(String message, boolean expected, Transport transport) throws Exception { - Field group = null; + private static EventLoopGroupRef getGroupRef(final Transport transport) throws IllegalAccessException { + Field groupRefField = null; Class<?> transportType = transport.getClass(); - while (transportType != null && group == null) { + while (transportType != null && groupRefField == null) { try { - group = transportType.getDeclaredField("group"); + groupRefField = transportType.getDeclaredField("groupRef"); } catch (NoSuchFieldException error) { transportType = transportType.getSuperclass(); if (Object.class.equals(transportType)) { @@ -730,14 +838,16 @@ public class NettyTcpTransportTest extends QpidJmsTestCase { } } - assertNotNull("Transport implementation unknown", group); + assertNotNull("Transport implementation unknown", groupRefField); - group.setAccessible(true); - if (expected) { - assertTrue(message, group.get(transport) instanceof EpollEventLoopGroup); - } else { - assertFalse(message, group.get(transport) instanceof EpollEventLoopGroup); - } + groupRefField.setAccessible(true); + return (EventLoopGroupRef) groupRefField.get(transport); + } + + private static void assertEventLoopGroupType(String message, Transport transport, Class<? 
extends EventLoopGroup> eventLoopGroupClass) throws Exception { + final EventLoopGroupRef groupRef = getGroupRef(transport); + + assertThat(message, groupRef.group(), instanceOf(eventLoopGroupClass)); } @Test(timeout = 60 * 1000) @@ -762,17 +872,15 @@ public class NettyTcpTransportTest extends QpidJmsTestCase { TransportOptions options = createClientOptions(); options.setUseKQueue(useKQueue); options.setUseEpoll(false); - Transport transport = createTransport(serverLocation, testListener, options); - try { - transport.connect(null, null); - LOG.info("Connected to server:{} as expected.", serverLocation); - } catch (Exception e) { - fail("Should have connected to the server at " + serverLocation + " but got exception: " + e); - } + Transport transport = createConnectedTransport(serverLocation, options); assertTrue(transport.isConnected()); assertEquals(serverLocation, transport.getRemoteLocation()); - assertKQueue("Transport should be using Kqueue", useKQueue, transport); + if(useKQueue) { + assertEventLoopGroupType("Transport should be using Kqueue", transport, KQueueEventLoopGroup.class); + } else { + assertEventLoopGroupType("Transport should be using Nio", transport, NioEventLoopGroup.class); + } transport.close(); @@ -785,31 +893,6 @@ public class NettyTcpTransportTest extends QpidJmsTestCase { assertTrue(data.isEmpty()); } - private void assertKQueue(String message, boolean expected, Transport transport) throws Exception { - Field group = null; - Class<?> transportType = transport.getClass(); - - while (transportType != null && group == null) { - try { - group = transportType.getDeclaredField("group"); - } catch (NoSuchFieldException error) { - transportType = transportType.getSuperclass(); - if (Object.class.equals(transportType)) { - transportType = null; - } - } - } - - assertNotNull("Transport implementation unknown", group); - - group.setAccessible(true); - if (expected) { - assertTrue(message, group.get(transport) instanceof KQueueEventLoopGroup); - } 
else { - assertFalse(message, group.get(transport) instanceof KQueueEventLoopGroup); - } - } - protected Transport createTransport(URI serverLocation, TransportListener listener, TransportOptions options) { if (listener == null) { return new NettyTcpTransport(serverLocation, options, false); @@ -818,6 +901,27 @@ public class NettyTcpTransportTest extends QpidJmsTestCase { } } + private Transport createConnectedTransport(final URI serverLocation, final TransportOptions options) { + Transport transport = createTransport(serverLocation, testListener, options); + try { + transport.connect(null, null); + LOG.info("Connected to server:{} as expected.", serverLocation); + } catch (Exception e) { + fail("Should have connected to the server at " + serverLocation + " but got exception: " + e); + } + return transport; + } + + private void cleanUpTransports(final Set<Transport> transports) { + transports.forEach(transport -> { + try { + transport.close(); + } catch (Throwable t) { + LOG.warn(t.getMessage()); + } + }); + } + protected TransportOptions createClientOptions() { return new TransportOptions(); } @@ -872,4 +976,4 @@ public class NettyTcpTransportTest extends QpidJmsTestCase { exceptions.add(cause); } } -} +} \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org