This is an automated email from the ASF dual-hosted git repository.

robbie pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-jms.git


The following commit(s) were added to refs/heads/main by this push:
     new 4ca0618  QPIDJMS-553 Shared Netty event loop group
4ca0618 is described below

commit 4ca0618f546d3d7b54c8639036841646c0ffa202
Author: franz1981 <nigro....@gmail.com>
AuthorDate: Wed Mar 16 11:01:28 2022 +0100

    QPIDJMS-553 Shared Netty event loop group
    
    co-author: gemmellr
---
 .../qpid/jms/transports/TransportOptions.java      |  11 +
 .../jms/transports/netty/EventLoopGroupRef.java    |  27 ++
 .../qpid/jms/transports/netty/EventLoopType.java   |  89 ++++++
 .../netty/NettyEventLoopGroupFactory.java          | 211 +++++++++++++
 .../jms/transports/netty/NettyTcpTransport.java    |  53 ++--
 .../apache/qpid/jms/util/QpidJMSThreadFactory.java |  35 ++-
 .../qpid/jms/transports/TransportOptionsTest.java  |   4 +
 .../netty/NettySslTransportFactoryTest.java        |   4 +
 .../netty/NettyTcpTransportFactoryTest.java        |   4 +
 .../transports/netty/NettyTcpTransportTest.java    | 342 ++++++++++++++-------
 10 files changed, 618 insertions(+), 162 deletions(-)

diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TransportOptions.java
 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TransportOptions.java
index ba62d04..1ed1771 100644
--- 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TransportOptions.java
+++ 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TransportOptions.java
@@ -39,6 +39,7 @@ public class TransportOptions implements Cloneable {
     public static final int DEFAULT_SO_TIMEOUT = -1;
     public static final int DEFAULT_CONNECT_TIMEOUT = 60000;
     public static final int DEFAULT_TCP_PORT = 5672;
+    public static final int DEFAULT_SHARED_EVENT_LOOP_THREADS = -1;
     public static final boolean DEFAULT_USE_EPOLL = true;
     public static final boolean DEFAULT_USE_KQUEUE = false;
     public static final boolean DEFAULT_TRACE_BYTES = false;
@@ -73,6 +74,7 @@ public class TransportOptions implements Cloneable {
     private boolean useKQueue = DEFAULT_USE_KQUEUE;
     private boolean traceBytes = DEFAULT_TRACE_BYTES;
     private boolean useOpenSSL = DEFAULT_USE_OPENSSL;
+    private int sharedEventLoopThreads = DEFAULT_SHARED_EVENT_LOOP_THREADS;
 
     private String keyStoreLocation;
     private String keyStorePassword;
@@ -213,6 +215,14 @@ public class TransportOptions implements Cloneable {
         this.tcpKeepAlive = keepAlive;
     }
 
+    public void setSharedEventLoopThreads(int numThreads) {
+        this.sharedEventLoopThreads = numThreads;
+    }
+
+    public int getSharedEventLoopThreads() {
+        return sharedEventLoopThreads;
+    }
+
     public int getConnectTimeout() {
         return connectTimeout;
     }
@@ -590,6 +600,7 @@ public class TransportOptions implements Cloneable {
         copy.setUseOpenSSL(isUseOpenSSL());
         copy.setLocalAddress(getLocalAddress());
         copy.setLocalPort(getLocalPort());
+        copy.setSharedEventLoopThreads(getSharedEventLoopThreads());
 
         return copy;
     }
diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/EventLoopGroupRef.java
 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/EventLoopGroupRef.java
new file mode 100644
index 0000000..e590e5f
--- /dev/null
+++ 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/EventLoopGroupRef.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.transports.netty;
+
+import io.netty.channel.EventLoopGroup;
+
+public interface EventLoopGroupRef extends AutoCloseable {
+
+    EventLoopGroup group();
+
+    @Override
+    void close();
+}
diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/EventLoopType.java
 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/EventLoopType.java
new file mode 100644
index 0000000..610a6ff
--- /dev/null
+++ 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/EventLoopType.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.transports.netty;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import org.apache.qpid.jms.transports.TransportOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ThreadFactory;
+
+import static java.util.Objects.requireNonNull;
+
+public enum EventLoopType {
+    EPOLL, KQUEUE, NIO;
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(EventLoopType.class);
+
+    public void createChannel(final Bootstrap bootstrap) {
+        createChannel(this, requireNonNull(bootstrap));
+    }
+
+    public EventLoopGroup createEventLoopGroup(final int threads, final 
ThreadFactory ioThreadFactory) {
+        return createEventLoopGroup(this, threads, ioThreadFactory);
+    }
+
+    private static EventLoopGroup createEventLoopGroup(final EventLoopType 
type, final int threads, final ThreadFactory ioThreadFactory) {
+        switch (type) {
+            case EPOLL:
+                LOG.trace("Netty Transport using Epoll mode");
+                return EpollSupport.createGroup(threads, ioThreadFactory);
+            case KQUEUE:
+                LOG.trace("Netty Transport using KQueue mode");
+                return KQueueSupport.createGroup(threads, ioThreadFactory);
+            case NIO:
+                LOG.trace("Netty Transport using Nio mode");
+                return new NioEventLoopGroup(threads, ioThreadFactory);
+            default:
+                throw new IllegalArgumentException("Unknown event loop type:" 
+ type);
+        }
+    }
+
+    private static void createChannel(final EventLoopType type, final 
Bootstrap bootstrap) {
+        switch (type) {
+            case EPOLL:
+                EpollSupport.createChannel(bootstrap);
+                break;
+            case KQUEUE:
+                KQueueSupport.createChannel(bootstrap);
+                break;
+            case NIO:
+                bootstrap.channel(NioSocketChannel.class);
+                break;
+            default:
+                throw new IllegalArgumentException("Unknown event loop type:" 
+ type);
+        }
+    }
+
+    public static EventLoopType valueOf(final TransportOptions 
transportOptions) {
+        final boolean useKQueue = KQueueSupport.isAvailable(transportOptions);
+        final boolean useEpoll = EpollSupport.isAvailable(transportOptions);
+        if (useKQueue) {
+            return KQUEUE;
+        }
+
+        if (useEpoll) {
+            return EPOLL;
+        }
+
+        return NIO;
+    }
+}
diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyEventLoopGroupFactory.java
 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyEventLoopGroupFactory.java
new file mode 100644
index 0000000..b047f7c
--- /dev/null
+++ 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyEventLoopGroupFactory.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.transports.netty;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+import io.netty.channel.EventLoopGroup;
+import io.netty.util.concurrent.Future;
+
+import org.apache.qpid.jms.util.QpidJMSThreadFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public final class NettyEventLoopGroupFactory {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(NettyEventLoopGroupFactory.class);
+    private static final AtomicLong SHARED_EVENT_LOOP_GROUP_INSTANCE_SEQUENCE 
= new AtomicLong(0);
+    private static final int SHUTDOWN_TIMEOUT = 50;
+
+    private static final Map<EventLoopGroupKey, EventLoopGroupHolder> 
SHARED_EVENT_LOOP_GROUPS = new HashMap<>();
+
+    private NettyEventLoopGroupFactory() {
+        // No instances
+    }
+
+    public static EventLoopGroupRef unsharedGroup(final EventLoopType type, 
final ThreadFactory threadFactory) {
+        Objects.requireNonNull(type);
+        final EventLoopGroup unsharedGroup = type.createEventLoopGroup(1, 
threadFactory);
+
+        return new EventLoopGroupRef() {
+            @Override
+            public EventLoopGroup group() {
+                return unsharedGroup;
+            }
+
+            @Override
+            public void close() {
+                shutdownEventLoopGroup(unsharedGroup);
+            }
+        };
+    }
+
+    public static EventLoopGroupRef sharedGroup(final EventLoopType type, 
final int threads) {
+        Objects.requireNonNull(type);
+        if (threads <= 0) {
+            throw new IllegalArgumentException("shared event loop threads 
value must be > 0");
+        }
+
+        final EventLoopGroupKey key = new EventLoopGroupKey(type, threads);
+
+        synchronized (SHARED_EVENT_LOOP_GROUPS) {
+            EventLoopGroupHolder groupHolder = 
SHARED_EVENT_LOOP_GROUPS.get(key);
+            if (groupHolder == null) {
+                groupHolder = new 
EventLoopGroupHolder(createSharedEventLoopGroup(type, threads), key);
+
+                SHARED_EVENT_LOOP_GROUPS.put(key, groupHolder);
+            } else {
+                groupHolder.incRef();
+            }
+
+            return new SharedEventLoopGroupRef(groupHolder);
+        }
+    }
+
+    private static void sharedGroupRefClosed(EventLoopGroupHolder holder) {
+        boolean shutdown = false;
+        synchronized (SHARED_EVENT_LOOP_GROUPS) {
+            if (holder.decRef()) {
+                SHARED_EVENT_LOOP_GROUPS.remove(holder.key());
+                shutdown = true;
+            }
+        }
+
+        if (shutdown) {
+            shutdownEventLoopGroup(holder.group());
+        }
+    }
+
+    private static void shutdownEventLoopGroup(final EventLoopGroup group) {
+        Future<?> fut = group.shutdownGracefully(0, SHUTDOWN_TIMEOUT, 
TimeUnit.MILLISECONDS);
+        if (!fut.awaitUninterruptibly(2 * SHUTDOWN_TIMEOUT)) {
+            LOG.trace("Channel group shutdown failed to complete in allotted 
time");
+        }
+    }
+
+    private static ThreadFactory createSharedThreadFactory(final EventLoopType 
type, final int threads) {
+        final String baseName = "SharedNettyEventLoopGroup (" + 
SHARED_EVENT_LOOP_GROUP_INSTANCE_SEQUENCE.incrementAndGet() + ")[" + type + " - 
size=" + threads + "]:";
+
+        return new QpidJMSThreadFactory(thread -> baseName + " thread-id=" + 
thread.getId(), true);
+    }
+
+    private static EventLoopGroup createSharedEventLoopGroup(final 
EventLoopType type, final int threads) {
+        return type.createEventLoopGroup(threads, 
createSharedThreadFactory(type, threads));
+    }
+
+    private static final class SharedEventLoopGroupRef implements 
EventLoopGroupRef {
+        private final EventLoopGroupHolder sharedGroupHolder;
+        private final AtomicBoolean closed = new AtomicBoolean();
+
+        public SharedEventLoopGroupRef(final EventLoopGroupHolder 
sharedGroupHolder) {
+            this.sharedGroupHolder = Objects.requireNonNull(sharedGroupHolder);
+        }
+
+        @Override
+        public EventLoopGroup group() {
+            if (closed.get()) {
+                throw new IllegalStateException("Group reference is already 
closed");
+            }
+
+            return sharedGroupHolder.group();
+        }
+
+        @Override
+        public void close() {
+            if (closed.compareAndSet(false, true)) {
+                sharedGroupRefClosed(sharedGroupHolder);
+            }
+        }
+    }
+
+    private static class EventLoopGroupKey {
+        private final EventLoopType type;
+        private final int eventLoopThreads;
+
+        private EventLoopGroupKey(final EventLoopType type, final int 
eventLoopThreads) {
+            this.type = type;
+            this.eventLoopThreads = eventLoopThreads;
+        }
+
+        @Override
+        public boolean equals(final Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            final EventLoopGroupKey that = (EventLoopGroupKey) o;
+            if (eventLoopThreads != that.eventLoopThreads) {
+                return false;
+            }
+            return type == that.type;
+        }
+
+        @Override
+        public int hashCode() {
+            int result = type != null ? type.hashCode() : 0;
+            result = 31 * result + eventLoopThreads;
+            return result;
+        }
+    }
+
+    private static final class EventLoopGroupHolder {
+        private final EventLoopGroup group;
+        private final EventLoopGroupKey key;
+        private int refCnt = 1;
+
+        private EventLoopGroupHolder(final EventLoopGroup sharedGroup, final 
EventLoopGroupKey key) {
+            this.group = Objects.requireNonNull(sharedGroup);
+            this.key = Objects.requireNonNull(key);
+        }
+
+        public EventLoopGroup group() {
+            return group;
+        }
+
+        public EventLoopGroupKey key() {
+            return key;
+        }
+
+        public void incRef() {
+            assert Thread.holdsLock(SHARED_EVENT_LOOP_GROUPS);
+            if (refCnt == 0) {
+                throw new IllegalStateException("The group was already 
released, can not increment reference count.");
+            }
+
+            refCnt++;
+        }
+
+        public boolean decRef() {
+            assert Thread.holdsLock(SHARED_EVENT_LOOP_GROUPS);
+            if (refCnt == 0) {
+                throw new IllegalStateException("The group was already 
released, can not decrement reference count.");
+            }
+
+            refCnt--;
+
+            return refCnt == 0;
+        }
+    }
+}
\ No newline at end of file
diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyTcpTransport.java
 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyTcpTransport.java
index a329482..7f0f9d3 100644
--- 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyTcpTransport.java
+++ 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyTcpTransport.java
@@ -23,7 +23,6 @@ import java.util.Objects;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Supplier;
 
@@ -47,11 +46,8 @@ import io.netty.channel.ChannelInboundHandlerAdapter;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelOption;
 import io.netty.channel.ChannelPipeline;
-import io.netty.channel.EventLoopGroup;
 import io.netty.channel.FixedRecvByteBufAllocator;
 import io.netty.channel.SimpleChannelInboundHandler;
-import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.channel.socket.nio.NioSocketChannel;
 import io.netty.handler.logging.LoggingHandler;
 import io.netty.handler.proxy.ProxyHandler;
 import io.netty.handler.ssl.SslHandler;
@@ -60,6 +56,9 @@ import io.netty.util.ReferenceCountUtil;
 import io.netty.util.concurrent.Future;
 import io.netty.util.concurrent.GenericFutureListener;
 
+import static 
org.apache.qpid.jms.transports.netty.NettyEventLoopGroupFactory.sharedGroup;
+import static 
org.apache.qpid.jms.transports.netty.NettyEventLoopGroupFactory.unsharedGroup;
+
 /**
  * TCP based transport that uses Netty as the underlying IO layer.
  */
@@ -67,11 +66,9 @@ public class NettyTcpTransport implements Transport {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(NettyTcpTransport.class);
 
-    public static final int SHUTDOWN_TIMEOUT = 50;
     public static final int DEFAULT_MAX_FRAME_SIZE = 65535;
 
-    protected Bootstrap bootstrap;
-    protected EventLoopGroup group;
+    protected EventLoopGroupRef groupRef;
     protected Channel channel;
     protected TransportListener listener;
     protected ThreadFactory ioThreadfactory;
@@ -137,29 +134,20 @@ public class NettyTcpTransport implements Transport {
         }
 
         TransportOptions transportOptions = getTransportOptions();
-        boolean useKQueue = KQueueSupport.isAvailable(transportOptions);
-        boolean useEpoll = EpollSupport.isAvailable(transportOptions);
-
-        if (useKQueue) {
-            LOG.trace("Netty Transport using KQueue mode");
-            group = KQueueSupport.createGroup(1, ioThreadfactory);
-        } else if (useEpoll) {
-            LOG.trace("Netty Transport using Epoll mode");
-            group = EpollSupport.createGroup(1, ioThreadfactory);
-        } else {
-            LOG.trace("Netty Transport using NIO mode");
-            group = new NioEventLoopGroup(1, ioThreadfactory);
-        }
 
-        bootstrap = new Bootstrap();
-        bootstrap.group(group);
-        if (useKQueue) {
-            KQueueSupport.createChannel(bootstrap);
-        } else if (useEpoll) {
-            EpollSupport.createChannel(bootstrap);
+        EventLoopType eventLoopType = EventLoopType.valueOf(transportOptions);
+        int sharedEventLoopThreads = 
transportOptions.getSharedEventLoopThreads();
+        if (sharedEventLoopThreads > 0) {
+            groupRef = sharedGroup(eventLoopType, sharedEventLoopThreads);
         } else {
-            bootstrap.channel(NioSocketChannel.class);
+            groupRef = unsharedGroup(eventLoopType, ioThreadfactory);
         }
+
+        Bootstrap bootstrap = new Bootstrap();
+        bootstrap.group(groupRef.group());
+
+        eventLoopType.createChannel(bootstrap);
+
         bootstrap.handler(new ChannelInitializer<Channel>() {
             @Override
             public void initChannel(Channel connectedChannel) throws Exception 
{
@@ -214,8 +202,8 @@ public class NettyTcpTransport implements Transport {
                 }
             });
         }
-
-        return group;
+        // returning the channel's specific event loop: the overall event loop 
group may be multi-threaded
+        return channel.eventLoop();
     }
 
     @Override
@@ -237,11 +225,8 @@ public class NettyTcpTransport implements Transport {
                     channel.close().syncUninterruptibly();
                 }
             } finally {
-                if (group != null) {
-                    Future<?> fut = group.shutdownGracefully(0, 
SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS);
-                    if (!fut.awaitUninterruptibly(2 * SHUTDOWN_TIMEOUT)) {
-                        LOG.trace("Channel group shutdown failed to complete 
in allotted time");
-                    }
+                if (groupRef != null) {
+                    groupRef.close();
                 }
             }
         }
diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/QpidJMSThreadFactory.java
 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/QpidJMSThreadFactory.java
index b4e9f06..dc73834 100644
--- 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/QpidJMSThreadFactory.java
+++ 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/QpidJMSThreadFactory.java
@@ -19,6 +19,7 @@ package org.apache.qpid.jms.util;
 import java.lang.Thread.UncaughtExceptionHandler;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -30,7 +31,7 @@ public class QpidJMSThreadFactory implements ThreadFactory {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(QpidJMSThreadFactory.class);
 
-    private final String threadName;
+    private final Function<Thread, String> threadNamingStrategy;
     private final boolean daemon;
     private final AtomicReference<Thread> threadTracker;
 
@@ -39,12 +40,12 @@ public class QpidJMSThreadFactory implements ThreadFactory {
      * given name and daemon state.
      *
      * @param threadName
-     *                 the name that will be used for each thread created.
+     *      the name that will be used for each thread created.
      * @param daemon
-     *                 should the created thread be a daemon thread.
+     *      should the created thread be a daemon thread.
      */
     public QpidJMSThreadFactory(String threadName, boolean daemon) {
-        this.threadName = threadName;
+        this.threadNamingStrategy = t -> threadName;
         this.daemon = daemon;
         this.threadTracker = null;
     }
@@ -59,18 +60,33 @@ public class QpidJMSThreadFactory implements ThreadFactory {
      * to be known for some reason.
      *
      * @param threadName
-     *                 the name that will be used for each thread created.
+     *      the name that will be used for each thread created.
      * @param daemon
-     *                 should the created thread be a daemon thread.
+     *      should the created thread be a daemon thread.
      * @param threadTracker
-     *                 AtomicReference that will be updated any time a new 
Thread is created.
+     *      AtomicReference that will be updated any time a new Thread is 
created.
      */
     public QpidJMSThreadFactory(String threadName, boolean daemon, 
AtomicReference<Thread> threadTracker) {
-        this.threadName = threadName;
+        this.threadNamingStrategy = t -> threadName;
         this.daemon = daemon;
         this.threadTracker = threadTracker;
     }
 
+    /**
+     * Creates a new Thread factory that will create threads with the
+     * provided thread naming function and daemon state.
+     *
+     * @param threadNamingStrategy
+     *      the naming strategy that will be used for each thread created.
+     * @param daemon
+     *      should the created thread be a daemon thread.
+     */
+    public QpidJMSThreadFactory(Function<Thread, String> threadNamingStrategy, 
boolean daemon) {
+        this.threadNamingStrategy = threadNamingStrategy;
+        this.daemon = daemon;
+        this.threadTracker = null;
+    }
+
     @Override
     public Thread newThread(final Runnable target) {
         Runnable runner = target;
@@ -91,8 +107,9 @@ public class QpidJMSThreadFactory implements ThreadFactory {
             };
         }
 
-        Thread thread = new Thread(runner, threadName);
+        Thread thread = new Thread(runner);
         thread.setDaemon(daemon);
+        thread.setName(threadNamingStrategy.apply(thread));
         thread.setUncaughtExceptionHandler(new UncaughtExceptionHandler() {
 
             @Override
diff --git 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/TransportOptionsTest.java
 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/TransportOptionsTest.java
index 402608b..d476e5b 100644
--- 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/TransportOptionsTest.java
+++ 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/TransportOptionsTest.java
@@ -50,6 +50,7 @@ public class TransportOptionsTest extends QpidJmsTestCase {
     public static final int LOCAL_PORT = 30000;
     public static final boolean TEST_USE_EPOLL_VALUE = 
!TransportOptions.DEFAULT_USE_EPOLL;
     public static final boolean TEST_TRACE_BYTES_VALUE = 
!TransportOptions.DEFAULT_TRACE_BYTES;
+    public static final int TEST_SHARED_EVENT_LOOP_THREADS_VALUE = 5;
 
     private static final String PASSWORD = "password";
     private static final String CLIENT_KEYSTORE = 
"src/test/resources/client-jks.keystore";
@@ -98,6 +99,7 @@ public class TransportOptionsTest extends QpidJmsTestCase {
         assertNull(options.getKeyAlias());
         assertNull(options.getSslContextOverride());
         assertNull(options.getProxyHandlerSupplier());
+        assertEquals(TransportOptions.DEFAULT_SHARED_EVENT_LOOP_THREADS, 
options.getSharedEventLoopThreads());
     }
 
     @Test
@@ -149,6 +151,7 @@ public class TransportOptionsTest extends QpidJmsTestCase {
         assertArrayEquals(DISABLED_PROTOCOLS,options.getDisabledProtocols());
         assertArrayEquals(ENABLED_CIPHERS,options.getEnabledCipherSuites());
         assertArrayEquals(DISABLED_CIPHERS,options.getDisabledCipherSuites());
+        assertEquals(TEST_SHARED_EVENT_LOOP_THREADS_VALUE, 
options.getSharedEventLoopThreads());
     }
 
     @Test
@@ -337,6 +340,7 @@ public class TransportOptionsTest extends QpidJmsTestCase {
         options.setLocalAddress(LOCAL_ADDRESS);
         options.setLocalPort(LOCAL_PORT);
         options.setProxyHandlerSupplier(PROXY_HANDLER_SUPPLIER);
+        
options.setSharedEventLoopThreads(TEST_SHARED_EVENT_LOOP_THREADS_VALUE);
 
         return options;
     }
diff --git 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettySslTransportFactoryTest.java
 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettySslTransportFactoryTest.java
index 99a4334..b03e6b9 100644
--- 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettySslTransportFactoryTest.java
+++ 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettySslTransportFactoryTest.java
@@ -49,6 +49,7 @@ public class NettySslTransportFactoryTest {
     public static final int CUSTOM_CONNECT_TIMEOUT = 90000;
     private static final String CUSTOM_LOCAL_ADDRESS = "localhost";
     private static final int CUSTOM_LOCAL_PORT = 30000;
+    private static final int CUSTOM_SHARED_EVENT_LOOP_THREADS = 7;
 
     public static final String CUSTOM_CONTEXT_PROTOCOL = "TLSv1.2";
     public static final String[] CUSTOM_ENABLED_PROTOCOLS = { "TLSv1.1", 
"TLSv1.2" };
@@ -93,6 +94,7 @@ public class NettySslTransportFactoryTest {
         assertEquals(TransportOptions.DEFAULT_SO_TIMEOUT, 
options.getSoTimeout());
         assertNull(options.getLocalAddress());
         assertEquals(TransportOptions.DEFAULT_LOCAL_PORT, 
options.getLocalPort());
+        assertEquals(TransportOptions.DEFAULT_SHARED_EVENT_LOOP_THREADS, 
options.getSharedEventLoopThreads());
 
         assertEquals(TransportOptions.DEFAULT_CONTEXT_PROTOCOL, 
options.getContextProtocol());
         assertNull(options.getEnabledProtocols());
@@ -124,6 +126,7 @@ public class NettySslTransportFactoryTest {
         URI BASE_URI = new URI("tcp://localhost:5672");
 
         URI configuredURI = new URI(BASE_URI.toString() + "?" +
+            "transport.sharedEventLoopThreads=" + 
CUSTOM_SHARED_EVENT_LOOP_THREADS + "&" +
             "transport.connectTimeout=" + CUSTOM_CONNECT_TIMEOUT + "&" +
             "transport.sendBufferSize=" + CUSTOM_SEND_BUFFER_SIZE + "&" +
             "transport.receiveBufferSize=" + CUSTOM_RECEIVE_BUFFER_SIZE + "&" +
@@ -156,6 +159,7 @@ public class NettySslTransportFactoryTest {
         TransportOptions options = transport.getTransportOptions();
         assertNotNull(options);
 
+        assertEquals(CUSTOM_SHARED_EVENT_LOOP_THREADS, 
options.getSharedEventLoopThreads());
         assertEquals(CUSTOM_CONNECT_TIMEOUT, options.getConnectTimeout());
         assertEquals(CUSTOM_SEND_BUFFER_SIZE, options.getSendBufferSize());
         assertEquals(CUSTOM_RECEIVE_BUFFER_SIZE, 
options.getReceiveBufferSize());
diff --git 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyTcpTransportFactoryTest.java
 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyTcpTransportFactoryTest.java
index cbb88d3..2968023 100644
--- 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyTcpTransportFactoryTest.java
+++ 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyTcpTransportFactoryTest.java
@@ -44,6 +44,7 @@ public class NettyTcpTransportFactoryTest {
     public static final int CUSTOM_CONNECT_TIMEOUT = 90000;
     private static final String CUSTOM_LOCAL_ADDRESS = "localhost";
     private static final int CUSTOM_LOCAL_PORT = 30000;
+    private static final int CUSTOM_SHARED_EVENT_LOOP_THREADS = 7;
 
     @Test(timeout = 30000)
     public void testCreateWithDefaultOptions() throws Exception {
@@ -70,6 +71,7 @@ public class NettyTcpTransportFactoryTest {
         assertEquals(TransportOptions.DEFAULT_SO_TIMEOUT, 
options.getSoTimeout());
         assertNull(options.getLocalAddress());
         assertEquals(TransportOptions.DEFAULT_LOCAL_PORT, 
options.getLocalPort());
+        assertEquals(TransportOptions.DEFAULT_SHARED_EVENT_LOOP_THREADS, 
options.getSharedEventLoopThreads());
     }
 
     @Test(expected = IllegalArgumentException.class)
@@ -114,6 +116,7 @@ public class NettyTcpTransportFactoryTest {
         URI BASE_URI = new URI("tcp://localhost:5672");
 
         URI configuredURI = new URI(BASE_URI.toString() + "?" +
+            "transport.sharedEventLoopThreads=" + 
CUSTOM_SHARED_EVENT_LOOP_THREADS + "&" +
             "transport.connectTimeout=" + CUSTOM_CONNECT_TIMEOUT + "&" +
             "transport.sendBufferSize=" + CUSTOM_SEND_BUFFER_SIZE + "&" +
             "transport.receiveBufferSize=" + CUSTOM_RECEIVE_BUFFER_SIZE + "&" +
@@ -136,6 +139,7 @@ public class NettyTcpTransportFactoryTest {
         TransportOptions options = transport.getTransportOptions();
         assertNotNull(options);
 
+        assertEquals(CUSTOM_SHARED_EVENT_LOOP_THREADS, 
options.getSharedEventLoopThreads());
         assertEquals(CUSTOM_CONNECT_TIMEOUT, options.getConnectTimeout());
         assertEquals(CUSTOM_SEND_BUFFER_SIZE, options.getSendBufferSize());
         assertEquals(CUSTOM_RECEIVE_BUFFER_SIZE, 
options.getReceiveBufferSize());
diff --git 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyTcpTransportTest.java
 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyTcpTransportTest.java
index 80abf34..9bc61f9 100644
--- 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyTcpTransportTest.java
+++ 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyTcpTransportTest.java
@@ -16,9 +16,12 @@
  */
 package org.apache.qpid.jms.transports.netty;
 
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.instanceOf;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
@@ -31,11 +34,14 @@ import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.net.URI;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Supplier;
 
+import io.netty.channel.EventLoopGroup;
 import org.apache.qpid.jms.test.QpidJmsTestCase;
 import org.apache.qpid.jms.test.Wait;
 import org.apache.qpid.jms.test.proxy.TestProxy;
@@ -58,6 +64,7 @@ import io.netty.channel.epoll.Epoll;
 import io.netty.channel.epoll.EpollEventLoopGroup;
 import io.netty.channel.kqueue.KQueue;
 import io.netty.channel.kqueue.KQueueEventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.handler.proxy.ProxyHandler;
 import io.netty.handler.proxy.Socks5ProxyHandler;
 import io.netty.util.ResourceLeakDetector;
@@ -225,13 +232,7 @@ public class NettyTcpTransportTest extends QpidJmsTestCase 
{
             int port = server.getServerPort();
             URI serverLocation = new URI("tcp://localhost:" + port);
 
-            Transport transport = createTransport(serverLocation, 
testListener, createClientOptions());
-            try {
-                transport.connect(null, null);
-                LOG.info("Connected to server:{} as expected.", 
serverLocation);
-            } catch (Exception e) {
-                fail("Should have connected to the server at " + 
serverLocation + " but got exception: " + e);
-            }
+            Transport transport = createConnectedTransport(serverLocation, 
createClientOptions());
 
             assertTrue(transport.isConnected());
             assertEquals(serverLocation, transport.getRemoteLocation());
@@ -260,15 +261,10 @@ public class NettyTcpTransportTest extends 
QpidJmsTestCase {
             List<Transport> transports = new ArrayList<Transport>();
 
             for (int i = 0; i < CONNECTION_COUNT; ++i) {
-                Transport transport = createTransport(serverLocation, 
testListener, createClientOptions());
-                try {
-                    transport.connect(null, null);
-                    assertTrue(transport.isConnected());
-                    LOG.info("Connected to server:{} as expected.", 
serverLocation);
-                    transports.add(transport);
-                } catch (Exception e) {
-                    fail("Should have connected to the server at " + 
serverLocation + " but got exception: " + e);
-                }
+                Transport transport = createConnectedTransport(serverLocation, 
createClientOptions());
+                assertTrue(transport.isConnected());
+
+                transports.add(transport);
             }
 
             for (Transport transport : transports) {
@@ -336,13 +332,7 @@ public class NettyTcpTransportTest extends QpidJmsTestCase 
{
             int port = server.getServerPort();
             URI serverLocation = new URI("tcp://localhost:" + port);
 
-            transport = createTransport(serverLocation, testListener, 
createClientOptions());
-            try {
-                transport.connect(null, null);
-                LOG.info("Connected to server:{} as expected.", 
serverLocation);
-            } catch (Exception e) {
-                fail("Should have connected to the server at " + 
serverLocation + " but got exception: " + e);
-            }
+            transport = createConnectedTransport(serverLocation, 
createClientOptions());
 
             assertTrue(transport.isConnected());
 
@@ -374,13 +364,7 @@ public class NettyTcpTransportTest extends QpidJmsTestCase 
{
             int port = server.getServerPort();
             URI serverLocation = new URI("tcp://localhost:" + port);
 
-            Transport transport = createTransport(serverLocation, 
testListener, createClientOptions());
-            try {
-                transport.connect(null, null);
-                LOG.info("Connected to server:{} as expected.", 
serverLocation);
-            } catch (Exception e) {
-                fail("Should have connected to the server at " + 
serverLocation + " but got exception: " + e);
-            }
+            Transport transport = createConnectedTransport(serverLocation, 
createClientOptions());
 
             assertTrue(transport.isConnected());
 
@@ -395,20 +379,171 @@ public class NettyTcpTransportTest extends 
QpidJmsTestCase {
     }
 
     @Test(timeout = 60 * 1000)
-    public void testDataSentIsReceived() throws Exception {
+    public void testCannotDereferenceSharedClosedEventLoopGroup() throws 
Exception {
         try (NettyEchoServer server = createEchoServer(createServerOptions())) 
{
             server.start();
 
             int port = server.getServerPort();
             URI serverLocation = new URI("tcp://localhost:" + port);
+            final TransportOptions sharedTransportOptions = 
createClientOptions();
+            sharedTransportOptions.setUseKQueue(false);
+            sharedTransportOptions.setUseEpoll(false);
+            sharedTransportOptions.setSharedEventLoopThreads(1);
 
-            Transport transport = createTransport(serverLocation, 
testListener, createClientOptions());
+            EventLoopGroupRef groupRef = null;
+            Transport nioSharedTransport = 
createConnectedTransport(serverLocation, sharedTransportOptions);
             try {
-                transport.connect(null, null);
-                LOG.info("Connected to server:{} as expected.", 
serverLocation);
-            } catch (Exception e) {
-                fail("Should have connected to the server at " + 
serverLocation + " but got exception: " + e);
+                groupRef = getGroupRef(nioSharedTransport);
+                assertNotNull(groupRef.group());
+            } finally {
+                nioSharedTransport.close();
+            }
+
+            try {
+                groupRef.group();
+                fail("Should have thrown ISE due to being closed");
+            } catch (IllegalStateException expected) {
+                // Ignore
+            } catch (Throwable unexpected) {
+                fail("Should have thrown IllegalStateException");
             }
+        }
+
+        assertTrue(!transportClosed);  // Normal shutdown does not trigger the 
event.
+        assertTrue(exceptions.isEmpty());
+        assertTrue(data.isEmpty());
+    }
+
+    @Test(timeout = 60 * 1000)
+    public void testSharedEventLoopGroups() throws Exception {
+        final Set<Transport> transports = new HashSet<>();
+        try (NettyEchoServer server = createEchoServer(createServerOptions())) 
{
+            server.start();
+
+            int port = server.getServerPort();
+            URI serverLocation = new URI("tcp://localhost:" + port);
+            final TransportOptions sharedTransportOptions = 
createClientOptions();
+            sharedTransportOptions.setUseKQueue(false);
+            sharedTransportOptions.setUseEpoll(false);
+            sharedTransportOptions.setSharedEventLoopThreads(1);
+
+            Transport sharedNioTransport1 = 
createConnectedTransport(serverLocation, sharedTransportOptions);
+            transports.add(sharedNioTransport1);
+            Transport sharedNioTransport2 = 
createConnectedTransport(serverLocation, sharedTransportOptions);
+            transports.add(sharedNioTransport2);
+
+            final EventLoopGroup sharedGroup = 
getGroupRef(sharedNioTransport1).group();
+            assertSame(sharedGroup, getGroupRef(sharedNioTransport2).group());
+
+            sharedNioTransport1.close();
+            assertFalse(sharedGroup.isShutdown());
+            assertFalse(sharedGroup.isTerminated());
+
+            sharedNioTransport2.close();
+            assertTrue(sharedGroup.isShutdown());
+            assertTrue(sharedGroup.isTerminated());
+        } finally {
+            // Ensures that any not already closed, e.g due to test failure, 
are now closed.
+            cleanUpTransports(transports);
+        }
+
+        assertTrue(!transportClosed);  // Normal shutdown does not trigger the 
event.
+        assertTrue(exceptions.isEmpty());
+        assertTrue(data.isEmpty());
+    }
+
+    @Test(timeout = 60 * 1000)
+    public void testSharedEventLoopGroupsOfDifferentSizes() throws Exception {
+        final Set<Transport> transports = new HashSet<>();
+        try (NettyEchoServer server = createEchoServer(createServerOptions())) 
{
+            server.start();
+
+            int port = server.getServerPort();
+            URI serverLocation = new URI("tcp://localhost:" + port);
+
+            final TransportOptions sharedTransportOptions1 = 
createClientOptions();
+            sharedTransportOptions1.setUseKQueue(false);
+            sharedTransportOptions1.setUseEpoll(false);
+            sharedTransportOptions1.setSharedEventLoopThreads(1);
+            Transport nioSharedTransport1 = 
createConnectedTransport(serverLocation, sharedTransportOptions1);
+            transports.add(nioSharedTransport1);
+
+            final TransportOptions sharedTransportOptions2 = 
createClientOptions();
+            sharedTransportOptions2.setUseKQueue(false);
+            sharedTransportOptions2.setUseEpoll(false);
+            sharedTransportOptions2.setSharedEventLoopThreads(2);
+            Transport nioSharedTransport2 = 
createConnectedTransport(serverLocation, sharedTransportOptions2);
+            transports.add(nioSharedTransport2);
+
+            EventLoopGroup sharedGroup1 = 
getGroupRef(nioSharedTransport1).group();
+            EventLoopGroup sharedGroup2 = 
getGroupRef(nioSharedTransport2).group();
+            assertNotSame(sharedGroup1, sharedGroup2);
+
+            nioSharedTransport1.close();
+            assertTrue(sharedGroup1.isShutdown());
+            assertTrue(sharedGroup1.isTerminated());
+
+            nioSharedTransport2.close();
+            assertTrue(sharedGroup2.isShutdown());
+            assertTrue(sharedGroup2.isTerminated());
+        } finally {
+            // Ensures that any not already closed, e.g due to test failure, 
are now closed.
+            cleanUpTransports(transports);
+        }
+
+        assertTrue(!transportClosed);  // Normal shutdown does not trigger the 
event.
+        assertTrue(exceptions.isEmpty());
+        assertTrue(data.isEmpty());
+    }
+
+    @Test(timeout = 60 * 1000)
+    public void testUnsharedEventLoopGroups() throws Exception {
+        final Set<Transport> transports = new HashSet<>();
+        try (NettyEchoServer server = createEchoServer(createServerOptions())) 
{
+            server.start();
+
+            int port = server.getServerPort();
+            URI serverLocation = new URI("tcp://localhost:" + port);
+            final TransportOptions unsharedTransportOptions = 
createClientOptions();
+            unsharedTransportOptions.setUseKQueue(false);
+            unsharedTransportOptions.setUseEpoll(false);
+            unsharedTransportOptions.setSharedEventLoopThreads(0);
+
+            Transport unsharedNioTransport1 = 
createConnectedTransport(serverLocation, unsharedTransportOptions);
+            transports.add(unsharedNioTransport1);
+            Transport unsharedNioTransport2 = 
createConnectedTransport(serverLocation, unsharedTransportOptions);
+            transports.add(unsharedNioTransport2);
+
+            final EventLoopGroup unsharedGroup1 = 
getGroupRef(unsharedNioTransport1).group();
+            final EventLoopGroup unsharedGroup2 = 
getGroupRef(unsharedNioTransport2).group();
+            assertNotSame(unsharedGroup1, unsharedNioTransport2);
+
+            unsharedNioTransport1.close();
+            assertTrue(unsharedGroup1.isShutdown());
+            assertTrue(unsharedGroup1.isTerminated());
+
+            unsharedNioTransport2.close();
+            assertTrue(unsharedGroup2.isShutdown());
+            assertTrue(unsharedGroup2.isTerminated());
+        } finally {
+            // Ensures that any not already closed, e.g due to test failure, 
are now closed.
+            cleanUpTransports(transports);
+        }
+
+        assertTrue(!transportClosed);  // Normal shutdown does not trigger the 
event.
+        assertTrue(exceptions.isEmpty());
+        assertTrue(data.isEmpty());
+    }
+
+    @Test(timeout = 60 * 1000)
+    public void testDataSentIsReceived() throws Exception {
+        try (NettyEchoServer server = createEchoServer(createServerOptions())) 
{
+            server.start();
+
+            int port = server.getServerPort();
+            URI serverLocation = new URI("tcp://localhost:" + port);
+
+            Transport transport = createConnectedTransport(serverLocation, 
createClientOptions());
 
             assertTrue(transport.isConnected());
 
@@ -453,13 +588,7 @@ public class NettyTcpTransportTest extends QpidJmsTestCase 
{
             int port = server.getServerPort();
             URI serverLocation = new URI("tcp://localhost:" + port);
 
-            Transport transport = createTransport(serverLocation, 
testListener, createClientOptions());
-            try {
-                transport.connect(null, null);
-                LOG.info("Connected to server:{} as expected.", 
serverLocation);
-            } catch (Exception e) {
-                fail("Should have connected to the server at " + 
serverLocation + " but got exception: " + e);
-            }
+            Transport transport = createConnectedTransport(serverLocation, 
createClientOptions());
 
             assertTrue(transport.isConnected());
 
@@ -488,21 +617,13 @@ public class NettyTcpTransportTest extends 
QpidJmsTestCase {
 
     @Test(timeout = 60 * 1000)
     public void testSendToClosedTransportFails() throws Exception {
-        Transport transport = null;
-
         try (NettyEchoServer server = createEchoServer(createServerOptions())) 
{
             server.start();
 
             int port = server.getServerPort();
             URI serverLocation = new URI("tcp://localhost:" + port);
 
-            transport = createTransport(serverLocation, testListener, 
createClientOptions());
-            try {
-                transport.connect(null, null);
-                LOG.info("Connected to server:{} as expected.", 
serverLocation);
-            } catch (Exception e) {
-                fail("Should have connected to the server at " + 
serverLocation + " but got exception: " + e);
-            }
+            Transport transport = createConnectedTransport(serverLocation, 
createClientOptions());
 
             assertTrue(transport.isConnected());
 
@@ -588,13 +709,7 @@ public class NettyTcpTransportTest extends QpidJmsTestCase 
{
             URI serverLocation = new URI("tcp://localhost:" + port);
 
             for (int i = 0; i < 256; ++i) {
-                transport = createTransport(serverLocation, testListener, 
createClientOptions());
-                try {
-                    transport.connect(null, null);
-                    LOG.info("Connected to server:{} as expected.", 
serverLocation);
-                } catch (Exception e) {
-                    fail("Should have connected to the server at " + 
serverLocation + " but got exception: " + e);
-                }
+                transport = createConnectedTransport(serverLocation, 
createClientOptions());
 
                 assertTrue(transport.isConnected());
 
@@ -643,13 +758,7 @@ public class NettyTcpTransportTest extends QpidJmsTestCase 
{
             };
             clientOptions.setProxyHandlerSupplier(proxyHandlerFactory);
 
-            Transport transport = createTransport(serverLocation, 
testListener, clientOptions);
-            try {
-                transport.connect(null, null);
-                LOG.info("Connected to server:{} as expected.", 
serverLocation);
-            } catch (Exception e) {
-                fail("Should have connected to the server at " + 
serverLocation + " but got exception: " + e);
-            }
+            Transport transport = createConnectedTransport(serverLocation, 
clientOptions);
 
             assertTrue(transport.isConnected());
             assertEquals(serverLocation, transport.getRemoteLocation());
@@ -692,17 +801,16 @@ public class NettyTcpTransportTest extends 
QpidJmsTestCase {
             TransportOptions options = createClientOptions();
             options.setUseEpoll(useEpoll);
             options.setUseKQueue(false);
-            Transport transport = createTransport(serverLocation, 
testListener, options);
-            try {
-                transport.connect(null, null);
-                LOG.info("Connected to server:{} as expected.", 
serverLocation);
-            } catch (Exception e) {
-                fail("Should have connected to the server at " + 
serverLocation + " but got exception: " + e);
-            }
+            Transport transport = createConnectedTransport(serverLocation, 
options);
 
             assertTrue(transport.isConnected());
             assertEquals(serverLocation, transport.getRemoteLocation());
-            assertEpoll("Transport should be using Epoll", useEpoll, 
transport);
+
+            if(useEpoll) {
+                assertEventLoopGroupType("Transport should be using Epoll", 
transport, EpollEventLoopGroup.class);
+            } else {
+                assertEventLoopGroupType("Transport should be using Nio", 
transport, NioEventLoopGroup.class);
+            }
 
             transport.close();
 
@@ -715,13 +823,13 @@ public class NettyTcpTransportTest extends 
QpidJmsTestCase {
         assertTrue(data.isEmpty());
     }
 
-    private void assertEpoll(String message, boolean expected, Transport 
transport) throws Exception {
-        Field group = null;
+    private static EventLoopGroupRef getGroupRef(final Transport transport) 
throws IllegalAccessException {
+        Field groupRefField = null;
         Class<?> transportType = transport.getClass();
 
-        while (transportType != null && group == null) {
+        while (transportType != null && groupRefField == null) {
             try {
-                group = transportType.getDeclaredField("group");
+                groupRefField = transportType.getDeclaredField("groupRef");
             } catch (NoSuchFieldException error) {
                 transportType = transportType.getSuperclass();
                 if (Object.class.equals(transportType)) {
@@ -730,14 +838,16 @@ public class NettyTcpTransportTest extends 
QpidJmsTestCase {
             }
         }
 
-        assertNotNull("Transport implementation unknown", group);
+        assertNotNull("Transport implementation unknown", groupRefField);
 
-        group.setAccessible(true);
-        if (expected) {
-            assertTrue(message, group.get(transport) instanceof 
EpollEventLoopGroup);
-        } else {
-            assertFalse(message, group.get(transport) instanceof 
EpollEventLoopGroup);
-        }
+        groupRefField.setAccessible(true);
+        return (EventLoopGroupRef) groupRefField.get(transport);
+    }
+
+    private static void assertEventLoopGroupType(String message, Transport 
transport, Class<? extends EventLoopGroup> eventLoopGroupClass) throws 
Exception {
+        final EventLoopGroupRef groupRef = getGroupRef(transport);
+
+        assertThat(message, groupRef.group(), instanceOf(eventLoopGroupClass));
     }
 
     @Test(timeout = 60 * 1000)
@@ -762,17 +872,15 @@ public class NettyTcpTransportTest extends 
QpidJmsTestCase {
             TransportOptions options = createClientOptions();
             options.setUseKQueue(useKQueue);
             options.setUseEpoll(false);
-            Transport transport = createTransport(serverLocation, 
testListener, options);
-            try {
-                transport.connect(null, null);
-                LOG.info("Connected to server:{} as expected.", 
serverLocation);
-            } catch (Exception e) {
-                fail("Should have connected to the server at " + 
serverLocation + " but got exception: " + e);
-            }
+            Transport transport = createConnectedTransport(serverLocation, 
options);
 
             assertTrue(transport.isConnected());
             assertEquals(serverLocation, transport.getRemoteLocation());
-            assertKQueue("Transport should be using Kqueue", useKQueue, 
transport);
+            if(useKQueue) {
+                assertEventLoopGroupType("Transport should be using Kqueue", 
transport, KQueueEventLoopGroup.class);
+            } else {
+                assertEventLoopGroupType("Transport should be using Nio", 
transport, NioEventLoopGroup.class);
+            }
 
             transport.close();
 
@@ -785,31 +893,6 @@ public class NettyTcpTransportTest extends QpidJmsTestCase 
{
         assertTrue(data.isEmpty());
     }
 
-    private void assertKQueue(String message, boolean expected, Transport 
transport) throws Exception {
-        Field group = null;
-        Class<?> transportType = transport.getClass();
-
-        while (transportType != null && group == null) {
-            try {
-                group = transportType.getDeclaredField("group");
-            } catch (NoSuchFieldException error) {
-                transportType = transportType.getSuperclass();
-                if (Object.class.equals(transportType)) {
-                    transportType = null;
-                }
-            }
-        }
-
-        assertNotNull("Transport implementation unknown", group);
-
-        group.setAccessible(true);
-        if (expected) {
-            assertTrue(message, group.get(transport) instanceof 
KQueueEventLoopGroup);
-        } else {
-            assertFalse(message, group.get(transport) instanceof 
KQueueEventLoopGroup);
-        }
-    }
-
     protected Transport createTransport(URI serverLocation, TransportListener 
listener, TransportOptions options) {
         if (listener == null) {
             return new NettyTcpTransport(serverLocation, options, false);
@@ -818,6 +901,27 @@ public class NettyTcpTransportTest extends QpidJmsTestCase 
{
         }
     }
 
+    private Transport createConnectedTransport(final URI serverLocation, final 
TransportOptions options) {
+        Transport transport = createTransport(serverLocation, testListener, 
options);
+        try {
+            transport.connect(null, null);
+            LOG.info("Connected to server:{} as expected.", serverLocation);
+        } catch (Exception e) {
+            fail("Should have connected to the server at " + serverLocation + 
" but got exception: " + e);
+        }
+        return transport;
+    }
+
+    private void cleanUpTransports(final Set<Transport> transports) {
+        transports.forEach(transport -> {
+            try {
+                transport.close();
+            } catch (Throwable t) {
+                LOG.warn(t.getMessage());
+            }
+        });
+    }
+
     protected TransportOptions createClientOptions() {
         return new TransportOptions();
     }
@@ -872,4 +976,4 @@ public class NettyTcpTransportTest extends QpidJmsTestCase {
             exceptions.add(cause);
         }
     }
-}
+}
\ No newline at end of file

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to