gemmellr commented on a change in pull request #45:
URL: https://github.com/apache/qpid-jms/pull/45#discussion_r823559298



##########
File path: 
qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyEventLoopGroupFactory.java
##########
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.transports.netty;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+import io.netty.channel.EventLoopGroup;
+import io.netty.util.concurrent.Future;
+
+import org.apache.qpid.jms.util.QpidJMSThreadFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public final class NettyEventLoopGroupFactory {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(NettyEventLoopGroupFactory.class);
+    private static final AtomicLong SHARED_EVENT_LOOP_GROUP_INSTANCE_SEQUENCE 
= new AtomicLong(0);
+    private static final int SHUTDOWN_TIMEOUT = 50;
+
+    private static final Map<EventLoopGroupKey, EventLoopGroupHolder> 
SHARED_EVENT_LOOP_GROUPS = new HashMap<>();
+
+    private NettyEventLoopGroupFactory() {
+        // No instances
+    }
+
+    public static EventLoopGroupRef unsharedGroup(final EventLoopType type, 
final ThreadFactory threadFactory) {
+        Objects.requireNonNull(type);
+        final EventLoopGroup unsharedGroup = type.createEventLoopGroup(1, 
threadFactory);
+
+        return new EventLoopGroupRef() {
+            @Override
+            public EventLoopGroup group() {
+                return unsharedGroup;
+            }
+
+            @Override
+            public void close() {
+                shutdownEventLoopGroup(unsharedGroup);
+            }
+        };
+    }
+
+    public static EventLoopGroupRef sharedGroup(final EventLoopType type, 
final int threads) {
+        Objects.requireNonNull(type);
+        if (threads <= 0) {
+            throw new IllegalArgumentException("shared event loop threads 
value must be > 0");
+        }
+
+        final EventLoopGroupKey key = new EventLoopGroupKey(type, threads);
+
+        synchronized (SHARED_EVENT_LOOP_GROUPS) {
+            EventLoopGroupHolder groupHolder = 
SHARED_EVENT_LOOP_GROUPS.get(key);
+            if (groupHolder == null) {
+                groupHolder = new 
EventLoopGroupHolder(createSharedEventLoopGroup(type, threads), key);
+
+                SHARED_EVENT_LOOP_GROUPS.put(key, groupHolder);
+            } else {
+                groupHolder.incRef();
+            }
+
+            return new SharedEventLoopGroupRef(groupHolder);
+        }
+    }
+
+    private static void sharedGroupRefClosed(EventLoopGroupHolder holder) {
+        boolean shutdown = false;
+        synchronized (SHARED_EVENT_LOOP_GROUPS) {
+            int remaining = holder.decRef();
+            if (remaining == 0) {
+                SHARED_EVENT_LOOP_GROUPS.remove(holder.key());
+                shutdown = true;
+            }
+        }
+
+        if (shutdown) {
+            shutdownEventLoopGroup(holder.group());
+        }
+    }
+
+    private static void shutdownEventLoopGroup(final EventLoopGroup group) {
+        Future<?> fut = group.shutdownGracefully(0, SHUTDOWN_TIMEOUT, 
TimeUnit.MILLISECONDS);
+        if (!fut.awaitUninterruptibly(2 * SHUTDOWN_TIMEOUT)) {
+            LOG.trace("Channel group shutdown failed to complete in allotted 
time");
+        }
+    }
+
+    private static QpidJMSThreadFactory createSharedQpidJMSThreadFactory(final 
EventLoopType type, final int threads) {
+        final String baseName = "SharedNettyEventLoopGroup: type = " + type + 
" - threads = " + threads + " - group-id = " + 
SHARED_EVENT_LOOP_GROUP_INSTANCE_SEQUENCE.incrementAndGet();
+        return new QpidJMSThreadFactory(thread -> baseName + " - thread-id = " 
+ thread.getId(), true, null);
+    }
+
+    private static EventLoopGroup createSharedEventLoopGroup(final 
EventLoopType type, final int threads) {
+        return type.createEventLoopGroup(threads, 
createSharedQpidJMSThreadFactory(type, threads));
+    }
+
+    private static final class SharedEventLoopGroupRef implements 
EventLoopGroupRef {
+        private final EventLoopGroupHolder sharedGroupHolder;
+        private final AtomicBoolean closed = new AtomicBoolean();
+
+        public SharedEventLoopGroupRef(final EventLoopGroupHolder 
sharedGroupHolder) {
+            this.sharedGroupHolder = Objects.requireNonNull(sharedGroupHolder);
+        }
+
+        @Override
+        public EventLoopGroup group() {
+            if (closed.get()) {
+                throw new IllegalStateException("Group reference is already 
closed");
+            }
+
+            return sharedGroupHolder.group();
+        }
+
+        @Override
+        public void close() {
+            if (closed.compareAndSet(false, true)) {
+                sharedGroupRefClosed(sharedGroupHolder);
+            }
+        }
+    }
+
+    private static class EventLoopGroupKey {
+        private final EventLoopType type;
+        private final int eventLoopThreads;
+
+        private EventLoopGroupKey(final EventLoopType type, final int 
eventLoopThreads) {
+            this.type = type;
+            this.eventLoopThreads = eventLoopThreads;
+        }
+
+        @Override
+        public boolean equals(final Object o) {
+            if (this == o)
+                return true;
+            if (o == null || getClass() != o.getClass())
+                return false;
+            final EventLoopGroupKey that = (EventLoopGroupKey) o;
+            if (eventLoopThreads != that.eventLoopThreads)
+                return false;
+            return type == that.type;
+        }
+
+        @Override
+        public int hashCode() {
+            int result = type != null ? type.hashCode() : 0;
+            result = 31 * result + eventLoopThreads;
+            return result;
+        }
+    }
+
+    private static final class EventLoopGroupHolder {
+        private final EventLoopGroup group;
+        private final EventLoopGroupKey key;
+        private int refCnt = 1;
+
+        private EventLoopGroupHolder(final EventLoopGroup sharedGroup, final 
EventLoopGroupKey key) {
+            this.group = Objects.requireNonNull(sharedGroup);
+            this.key = Objects.requireNonNull(key);
+        }
+
+        public EventLoopGroup group() {
+            return group;
+        }
+
+        public EventLoopGroupKey key() {
+            return key;
+        }
+
+        public int incRef() {
+            assert Thread.holdsLock(SHARED_EVENT_LOOP_GROUPS);
+            return refCnt++;

Review comment:
       I put this in 'just because', before ever using the method, but as the 
return value is not used I'm wondering if the return should just be removed (or 
at least return the new rather than old value so it is consistent with decRef).

##########
File path: 
qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyEventLoopGroupFactory.java
##########
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.transports.netty;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+import io.netty.channel.EventLoopGroup;
+import io.netty.util.concurrent.Future;
+
+import org.apache.qpid.jms.util.QpidJMSThreadFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public final class NettyEventLoopGroupFactory {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(NettyEventLoopGroupFactory.class);
+    private static final AtomicLong SHARED_EVENT_LOOP_GROUP_INSTANCE_SEQUENCE 
= new AtomicLong(0);
+    private static final int SHUTDOWN_TIMEOUT = 50;
+
+    private static final Map<EventLoopGroupKey, EventLoopGroupHolder> 
SHARED_EVENT_LOOP_GROUPS = new HashMap<>();
+
+    private NettyEventLoopGroupFactory() {
+        // No instances
+    }
+
+    public static EventLoopGroupRef unsharedGroup(final EventLoopType type, 
final ThreadFactory threadFactory) {
+        Objects.requireNonNull(type);
+        final EventLoopGroup unsharedGroup = type.createEventLoopGroup(1, 
threadFactory);
+
+        return new EventLoopGroupRef() {
+            @Override
+            public EventLoopGroup group() {
+                return unsharedGroup;
+            }
+
+            @Override
+            public void close() {
+                shutdownEventLoopGroup(unsharedGroup);
+            }
+        };
+    }
+
+    public static EventLoopGroupRef sharedGroup(final EventLoopType type, 
final int threads) {
+        Objects.requireNonNull(type);
+        if (threads <= 0) {
+            throw new IllegalArgumentException("shared event loop threads 
value must be > 0");
+        }
+
+        final EventLoopGroupKey key = new EventLoopGroupKey(type, threads);
+
+        synchronized (SHARED_EVENT_LOOP_GROUPS) {
+            EventLoopGroupHolder groupHolder = 
SHARED_EVENT_LOOP_GROUPS.get(key);
+            if (groupHolder == null) {
+                groupHolder = new 
EventLoopGroupHolder(createSharedEventLoopGroup(type, threads), key);
+
+                SHARED_EVENT_LOOP_GROUPS.put(key, groupHolder);
+            } else {
+                groupHolder.incRef();
+            }
+
+            return new SharedEventLoopGroupRef(groupHolder);
+        }
+    }
+
+    private static void sharedGroupRefClosed(EventLoopGroupHolder holder) {
+        boolean shutdown = false;
+        synchronized (SHARED_EVENT_LOOP_GROUPS) {
+            int remaining = holder.decRef();
+            if (remaining == 0) {
+                SHARED_EVENT_LOOP_GROUPS.remove(holder.key());
+                shutdown = true;
+            }
+        }
+
+        if (shutdown) {
+            shutdownEventLoopGroup(holder.group());
+        }
+    }
+
+    private static void shutdownEventLoopGroup(final EventLoopGroup group) {
+        Future<?> fut = group.shutdownGracefully(0, SHUTDOWN_TIMEOUT, 
TimeUnit.MILLISECONDS);
+        if (!fut.awaitUninterruptibly(2 * SHUTDOWN_TIMEOUT)) {
+            LOG.trace("Channel group shutdown failed to complete in allotted 
time");
+        }
+    }
+
+    private static QpidJMSThreadFactory createSharedQpidJMSThreadFactory(final 
EventLoopType type, final int threads) {
+        final String baseName = "SharedNettyEventLoopGroup: type = " + type + 
" - threads = " + threads + " - group-id = " + 
SHARED_EVENT_LOOP_GROUP_INSTANCE_SEQUENCE.incrementAndGet();
+        return new QpidJMSThreadFactory(thread -> baseName + " - thread-id = " 
+ thread.getId(), true, null);
+    }
+
+    private static EventLoopGroup createSharedEventLoopGroup(final 
EventLoopType type, final int threads) {
+        return type.createEventLoopGroup(threads, 
createSharedQpidJMSThreadFactory(type, threads));
+    }
+
+    private static final class SharedEventLoopGroupRef implements 
EventLoopGroupRef {
+        private final EventLoopGroupHolder sharedGroupHolder;
+        private final AtomicBoolean closed = new AtomicBoolean();
+
+        public SharedEventLoopGroupRef(final EventLoopGroupHolder 
sharedGroupHolder) {
+            this.sharedGroupHolder = Objects.requireNonNull(sharedGroupHolder);
+        }
+
+        @Override
+        public EventLoopGroup group() {
+            if (closed.get()) {
+                throw new IllegalStateException("Group reference is already 
closed");
+            }
+
+            return sharedGroupHolder.group();
+        }
+
+        @Override
+        public void close() {
+            if (closed.compareAndSet(false, true)) {
+                sharedGroupRefClosed(sharedGroupHolder);
+            }
+        }
+    }
+
+    private static class EventLoopGroupKey {
+        private final EventLoopType type;
+        private final int eventLoopThreads;
+
+        private EventLoopGroupKey(final EventLoopType type, final int 
eventLoopThreads) {
+            this.type = type;
+            this.eventLoopThreads = eventLoopThreads;
+        }
+
+        @Override
+        public boolean equals(final Object o) {
+            if (this == o)
+                return true;
+            if (o == null || getClass() != o.getClass())
+                return false;
+            final EventLoopGroupKey that = (EventLoopGroupKey) o;
+            if (eventLoopThreads != that.eventLoopThreads)
+                return false;

Review comment:
       Lets add the braces.

##########
File path: 
qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyTcpTransportTest.java
##########
@@ -394,6 +399,123 @@ public void testZeroSizedSentNoErrors() throws Exception {
         assertTrue(data.isEmpty());
     }
 
+    @Test(timeout = 60 * 1000)
+    public void testCannotDereferenceSharedClosedEventLoopGroup() throws 
Exception {
+        try (NettyEchoServer server = createEchoServer(createServerOptions())) 
{
+
+            server.start();
+
+            int port = server.getServerPort();
+            URI serverLocation = new URI("tcp://localhost:" + port);
+            final TransportOptions sharedTransportOptions = 
createClientOptions();
+            sharedTransportOptions.setUseKQueue(false);
+            sharedTransportOptions.setUseEpoll(false);
+            sharedTransportOptions.setSharedEventLoopThreads(1);
+
+            EventLoopGroupRef groupRef = null;
+            Transport nioTransport = createConnectedTransport(serverLocation, 
sharedTransportOptions);
+            try {
+                groupRef = getGroupRef(nioTransport);
+                assertNotNull(groupRef.group());
+            } finally {
+                nioTransport.close();
+            }
+
+            server.stop();
+
+            try {
+                groupRef.group();
+                fail("Should have thrown ISE due to being closed");
+            } catch (IllegalStateException expected) {
+                // Ignore
+            } catch (Throwable unexpected) {
+                fail("Should have thrown IllegalStateException");
+            }
+        }
+
+        assertTrue(!transportClosed);  // Normal shutdown does not trigger the 
event.
+        assertTrue(exceptions.isEmpty());
+        assertTrue(data.isEmpty());
+    }
+
+    @Test(timeout = 60 * 1000)
+    public void testSharedEventLoopGroups() throws Exception {
+        try (NettyEchoServer server = createEchoServer(createServerOptions())) 
{
+            server.start();
+
+            int port = server.getServerPort();
+            URI serverLocation = new URI("tcp://localhost:" + port);
+            final TransportOptions sharedTransportOptions = 
createClientOptions();
+            sharedTransportOptions.setUseKQueue(false);
+            sharedTransportOptions.setUseEpoll(false);
+            sharedTransportOptions.setSharedEventLoopThreads(1);
+            Transport sharedNioTransport1 = 
createConnectedTransport(serverLocation, sharedTransportOptions);
+            Transport sharedNioTransport2 = 
createConnectedTransport(serverLocation, sharedTransportOptions);
+            assertSame(getGroupRef(sharedNioTransport1).group(), 
getGroupRef(sharedNioTransport2).group());
+            final EventLoopGroup sharedGroup = 
getGroupRef(sharedNioTransport1).group();
+
+            sharedNioTransport1.close();
+            assertFalse(sharedGroup.isShutdown());
+            assertFalse(sharedGroup.isTerminated());
+            sharedNioTransport2.close();
+            assertTrue(sharedGroup.isShutdown());
+            assertTrue(sharedGroup.isTerminated());
+
+            server.stop();
+        }
+
+        assertTrue(!transportClosed);  // Normal shutdown does not trigger the 
event.
+        assertTrue(exceptions.isEmpty());
+        assertTrue(data.isEmpty());
+    }
+
+    @Test(timeout = 60 * 1000)
+    public void testUnsharedEventLoopGroups() throws Exception {
+        try (NettyEchoServer server = createEchoServer(createServerOptions())) 
{
+            server.start();
+
+            int port = server.getServerPort();
+            URI serverLocation = new URI("tcp://localhost:" + port);
+
+            final TransportOptions unsharedTransportOptions1 = 
createClientOptions();
+            unsharedTransportOptions1.setUseKQueue(false);
+            unsharedTransportOptions1.setUseEpoll(false);
+            unsharedTransportOptions1.setSharedEventLoopThreads(1);

Review comment:
       Why is the 'unsharedTransportOptions1' (and 2 below) asking for a shared 
group? Seems like this is not in fact "testUnsharedEventLoopGroups" but really 
"testSharedEventLoopGroupsOfDifferentSizes".
   
   If the variable names and test name were updated to retain this test of 
behaviour, it should then also probably add another transport to verify there 
is in fact still any sharing amongst the same-size groups, which it doesnt 
currently check.
   
   Then the other test would need added

##########
File path: 
qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyEventLoopGroupFactory.java
##########
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.transports.netty;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+import io.netty.channel.EventLoopGroup;
+import io.netty.util.concurrent.Future;
+
+import org.apache.qpid.jms.util.QpidJMSThreadFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public final class NettyEventLoopGroupFactory {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(NettyEventLoopGroupFactory.class);
+    private static final AtomicLong SHARED_EVENT_LOOP_GROUP_INSTANCE_SEQUENCE 
= new AtomicLong(0);
+    private static final int SHUTDOWN_TIMEOUT = 50;
+
+    private static final Map<EventLoopGroupKey, EventLoopGroupHolder> 
SHARED_EVENT_LOOP_GROUPS = new HashMap<>();
+
+    private NettyEventLoopGroupFactory() {
+        // No instances
+    }
+
+    public static EventLoopGroupRef unsharedGroup(final EventLoopType type, 
final ThreadFactory threadFactory) {
+        Objects.requireNonNull(type);
+        final EventLoopGroup unsharedGroup = type.createEventLoopGroup(1, 
threadFactory);
+
+        return new EventLoopGroupRef() {
+            @Override
+            public EventLoopGroup group() {
+                return unsharedGroup;
+            }
+
+            @Override
+            public void close() {
+                shutdownEventLoopGroup(unsharedGroup);
+            }
+        };
+    }
+
+    public static EventLoopGroupRef sharedGroup(final EventLoopType type, 
final int threads) {
+        Objects.requireNonNull(type);
+        if (threads <= 0) {
+            throw new IllegalArgumentException("shared event loop threads 
value must be > 0");
+        }
+
+        final EventLoopGroupKey key = new EventLoopGroupKey(type, threads);
+
+        synchronized (SHARED_EVENT_LOOP_GROUPS) {
+            EventLoopGroupHolder groupHolder = 
SHARED_EVENT_LOOP_GROUPS.get(key);
+            if (groupHolder == null) {
+                groupHolder = new 
EventLoopGroupHolder(createSharedEventLoopGroup(type, threads), key);
+
+                SHARED_EVENT_LOOP_GROUPS.put(key, groupHolder);
+            } else {
+                groupHolder.incRef();
+            }
+
+            return new SharedEventLoopGroupRef(groupHolder);
+        }
+    }
+
+    private static void sharedGroupRefClosed(EventLoopGroupHolder holder) {
+        boolean shutdown = false;
+        synchronized (SHARED_EVENT_LOOP_GROUPS) {
+            int remaining = holder.decRef();
+            if (remaining == 0) {
+                SHARED_EVENT_LOOP_GROUPS.remove(holder.key());
+                shutdown = true;
+            }
+        }
+
+        if (shutdown) {
+            shutdownEventLoopGroup(holder.group());
+        }
+    }
+
+    private static void shutdownEventLoopGroup(final EventLoopGroup group) {
+        Future<?> fut = group.shutdownGracefully(0, SHUTDOWN_TIMEOUT, 
TimeUnit.MILLISECONDS);
+        if (!fut.awaitUninterruptibly(2 * SHUTDOWN_TIMEOUT)) {
+            LOG.trace("Channel group shutdown failed to complete in allotted 
time");
+        }
+    }
+
+    private static QpidJMSThreadFactory createSharedQpidJMSThreadFactory(final 
EventLoopType type, final int threads) {
+        final String baseName = "SharedNettyEventLoopGroup: type = " + type + 
" - threads = " + threads + " - group-id = " + 
SHARED_EVENT_LOOP_GROUP_INSTANCE_SEQUENCE.incrementAndGet();
+        return new QpidJMSThreadFactory(thread -> baseName + " - thread-id = " 
+ thread.getId(), true, null);
+    }
+
+    private static EventLoopGroup createSharedEventLoopGroup(final 
EventLoopType type, final int threads) {
+        return type.createEventLoopGroup(threads, 
createSharedQpidJMSThreadFactory(type, threads));
+    }
+
+    private static final class SharedEventLoopGroupRef implements 
EventLoopGroupRef {
+        private final EventLoopGroupHolder sharedGroupHolder;
+        private final AtomicBoolean closed = new AtomicBoolean();
+
+        public SharedEventLoopGroupRef(final EventLoopGroupHolder 
sharedGroupHolder) {
+            this.sharedGroupHolder = Objects.requireNonNull(sharedGroupHolder);
+        }
+
+        @Override
+        public EventLoopGroup group() {
+            if (closed.get()) {
+                throw new IllegalStateException("Group reference is already 
closed");
+            }
+
+            return sharedGroupHolder.group();
+        }
+
+        @Override
+        public void close() {
+            if (closed.compareAndSet(false, true)) {
+                sharedGroupRefClosed(sharedGroupHolder);
+            }
+        }
+    }
+
+    private static class EventLoopGroupKey {
+        private final EventLoopType type;
+        private final int eventLoopThreads;
+
+        private EventLoopGroupKey(final EventLoopType type, final int 
eventLoopThreads) {
+            this.type = type;
+            this.eventLoopThreads = eventLoopThreads;
+        }
+
+        @Override
+        public boolean equals(final Object o) {
+            if (this == o)
+                return true;
+            if (o == null || getClass() != o.getClass())
+                return false;
+            final EventLoopGroupKey that = (EventLoopGroupKey) o;
+            if (eventLoopThreads != that.eventLoopThreads)
+                return false;
+            return type == that.type;
+        }
+
+        @Override
+        public int hashCode() {
+            int result = type != null ? type.hashCode() : 0;
+            result = 31 * result + eventLoopThreads;
+            return result;
+        }
+    }
+
+    private static final class EventLoopGroupHolder {
+        private final EventLoopGroup group;
+        private final EventLoopGroupKey key;
+        private int refCnt = 1;
+
+        private EventLoopGroupHolder(final EventLoopGroup sharedGroup, final 
EventLoopGroupKey key) {
+            this.group = Objects.requireNonNull(sharedGroup);
+            this.key = Objects.requireNonNull(key);
+        }
+
+        public EventLoopGroup group() {
+            return group;
+        }
+
+        public EventLoopGroupKey key() {
+            return key;
+        }
+
+        public int incRef() {
+            assert Thread.holdsLock(SHARED_EVENT_LOOP_GROUPS);
+            return refCnt++;
+        }
+
+        public int decRef() {
+            assert Thread.holdsLock(SHARED_EVENT_LOOP_GROUPS);

Review comment:
       I considered adding a comment about needing the lock when using these 
but decided it was obvious enough...this is nice though :)

##########
File path: 
qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyTcpTransportTest.java
##########
@@ -394,6 +399,123 @@ public void testZeroSizedSentNoErrors() throws Exception {
         assertTrue(data.isEmpty());
     }
 
+    @Test(timeout = 60 * 1000)
+    public void testCannotDereferenceSharedClosedEventLoopGroup() throws 
Exception {
+        try (NettyEchoServer server = createEchoServer(createServerOptions())) 
{
+
+            server.start();
+
+            int port = server.getServerPort();
+            URI serverLocation = new URI("tcp://localhost:" + port);
+            final TransportOptions sharedTransportOptions = 
createClientOptions();
+            sharedTransportOptions.setUseKQueue(false);
+            sharedTransportOptions.setUseEpoll(false);
+            sharedTransportOptions.setSharedEventLoopThreads(1);
+
+            EventLoopGroupRef groupRef = null;
+            Transport nioTransport = createConnectedTransport(serverLocation, 
sharedTransportOptions);
+            try {
+                groupRef = getGroupRef(nioTransport);
+                assertNotNull(groupRef.group());
+            } finally {
+                nioTransport.close();
+            }
+
+            server.stop();
+
+            try {
+                groupRef.group();
+                fail("Should have thrown ISE due to being closed");
+            } catch (IllegalStateException expected) {
+                // Ignore
+            } catch (Throwable unexpected) {
+                fail("Should have thrown IllegalStateException");
+            }
+        }
+
+        assertTrue(!transportClosed);  // Normal shutdown does not trigger the 
event.
+        assertTrue(exceptions.isEmpty());
+        assertTrue(data.isEmpty());
+    }
+
+    @Test(timeout = 60 * 1000)
+    public void testSharedEventLoopGroups() throws Exception {
+        try (NettyEchoServer server = createEchoServer(createServerOptions())) 
{
+            server.start();
+
+            int port = server.getServerPort();
+            URI serverLocation = new URI("tcp://localhost:" + port);
+            final TransportOptions sharedTransportOptions = 
createClientOptions();
+            sharedTransportOptions.setUseKQueue(false);
+            sharedTransportOptions.setUseEpoll(false);
+            sharedTransportOptions.setSharedEventLoopThreads(1);
+            Transport sharedNioTransport1 = 
createConnectedTransport(serverLocation, sharedTransportOptions);
+            Transport sharedNioTransport2 = 
createConnectedTransport(serverLocation, sharedTransportOptions);

Review comment:
       The test should 'collect' these as they are created and ensure they get 
shut down at the end. If any assertion fails in the 'wrong place' it could leak 
them, which could impact other tests due to the sharing.

##########
File path: 
qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyTcpTransport.java
##########
@@ -137,29 +134,17 @@ public ScheduledExecutorService connect(final Runnable 
initRoutine, SSLContext s
         }
 
         TransportOptions transportOptions = getTransportOptions();
-        boolean useKQueue = KQueueSupport.isAvailable(transportOptions);
-        boolean useEpoll = EpollSupport.isAvailable(transportOptions);
-
-        if (useKQueue) {
-            LOG.trace("Netty Transport using KQueue mode");
-            group = KQueueSupport.createGroup(1, ioThreadfactory);
-        } else if (useEpoll) {
-            LOG.trace("Netty Transport using Epoll mode");
-            group = EpollSupport.createGroup(1, ioThreadfactory);
-        } else {
-            LOG.trace("Netty Transport using NIO mode");
-            group = new NioEventLoopGroup(1, ioThreadfactory);
-        }
 
-        bootstrap = new Bootstrap();
-        bootstrap.group(group);
-        if (useKQueue) {
-            KQueueSupport.createChannel(bootstrap);
-        } else if (useEpoll) {
-            EpollSupport.createChannel(bootstrap);
+        EventLoopType type = EventLoopType.valueOf(transportOptions);
+        int sharedEventLoopThreads = 
transportOptions.getSharedEventLoopThreads();
+        if (sharedEventLoopThreads > 0) {
+            groupRef = sharedGroup(type, sharedEventLoopThreads);
         } else {
-            bootstrap.channel(NioSocketChannel.class);
+            groupRef = unsharedGroup(type, ioThreadfactory);
         }
+
+        Bootstrap bootstrap = new Bootstrap();
+        type.createChannel(bootstrap.group(groupRef.group()));

Review comment:
       Lets spend the line on setting up the bootstrap group before then 
separately creating the channel.

##########
File path: 
qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TransportOptions.java
##########
@@ -213,6 +215,14 @@ public void setTcpKeepAlive(boolean keepAlive) {
         this.tcpKeepAlive = keepAlive;
     }
 
+    public void setSharedEventLoopThreads(int numThreads) {
+        this.sharedEventLoopThreads = numThreads;
+    }
+
+    public int getSharedEventLoopThreads() {
+        return sharedEventLoopThreads;
+    }

Review comment:
       There are related unit tests for this class and its usage that will need 
updating. I'll do that as its quicker than explaining.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@qpid.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@qpid.apache.org
For additional commands, e-mail: dev-h...@qpid.apache.org

Reply via email to