gemmellr commented on a change in pull request #45:
URL: https://github.com/apache/qpid-jms/pull/45#discussion_r739102332



##########
File path: 
qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyEventLoopGroupPool.java
##########
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.transports.netty;
+
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.util.concurrent.Future;
+import org.apache.qpid.jms.transports.TransportOptions;
+import org.apache.qpid.jms.util.QpidJMSThreadFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public enum NettyEventLoopGroupPool {
+   Instance;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(NettyEventLoopGroupPool.class);
+   private static final int SHUTDOWN_TIMEOUT = 50;
+   private static final AtomicInteger SHARED_NETTY_EVENT_LOOP_GROUP_SEQUENCE = 
new AtomicInteger(0);
+
+   public interface Lease extends AutoCloseable {
+
+      EventLoopGroup group();
+
+      @Override
+      default void close() {
+         Future<?> fut = group().shutdownGracefully(0, SHUTDOWN_TIMEOUT, 
TimeUnit.MILLISECONDS);
+         if (!fut.awaitUninterruptibly(2 * SHUTDOWN_TIMEOUT)) {
+            LOG.trace("Channel group shutdown failed to complete in allotted 
time");
+         }
+      }
+   }
+
+   private static final class DisposableLease implements Lease {
+
+      private final SharedGroupRef sharedGroupRef;
+      private final AtomicBoolean closed;
+
+      public DisposableLease(SharedGroupRef sharedGroupRef) {
+         this.sharedGroupRef = sharedGroupRef;
+         this.closed = new AtomicBoolean();
+      }
+
+      @Override
+      public EventLoopGroup group() {
+         return sharedGroupRef.group();
+      }
+
+      @Override
+      public void close() {
+         if (closed.compareAndSet(false, true)) {
+            sharedGroupRef.release();
+         }
+      }
+   }
+
+   private final class SharedGroupRef {
+
+      private final EventLoopGroup group;
+      private final AtomicInteger refs;
+
+      private SharedGroupRef(final EventLoopGroup group) {
+         this.group = group;
+         refs = new AtomicInteger(1);
+      }
+
+      public boolean retain() {
+         while (true) {
+            final int currValue = refs.get();
+            if (currValue == 0) {
+               // this has been already disposed!
+               return false;
+            }
+            if (refs.compareAndSet(currValue, currValue + 1)) {
+               return true;
+            }
+         }
+      }
+
+      public EventLoopGroup group() {
+         if (refs.get() == 0) {
+            throw new IllegalStateException("the event loop group cannot be 
reused");
+         }
+         return group;
+      }
+
+      public void release() {
+         while (true) {
+            final int currValue = refs.get();
+            if (currValue == 0) {
+               return;
+            }
+            if (refs.compareAndSet(currValue, currValue - 1)) {
+               if (currValue == 1) {
+                  // if a racing thread has borrowed this lease, it would try 
already to set it to a new value:
+                  // this one is a best effort cleanup to help GC
+                  sharedEventLoopGroup.compareAndSet(this, null);
+                  Future<?> fut = group.shutdownGracefully(0, 
SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS);
+                  if (!fut.awaitUninterruptibly(2 * SHUTDOWN_TIMEOUT)) {
+                     LOG.trace("Channel group shutdown failed to complete in 
allotted time");
+                  }
+               }
+               return;
+            }
+         }
+      }
+   }
+
+   private final AtomicReference<SharedGroupRef> sharedEventLoopGroup = new 
AtomicReference<>(null);
+
+   public Lease borrow(final TransportOptions transportOptions, final 
ThreadFactory ioThreadFactory) {
+      assert transportOptions.getSharedEventLoopThreads() != 0;
+      if (transportOptions.getSharedEventLoopThreads() < 0) {
+         return () -> createEventLoopGroup(transportOptions, ioThreadFactory);
+      }
+      while (true) {

Review comment:
       Add a newline here: the loop that follows is basically a different unit, 
since by default the method will typically have returned already. Otherwise use 
an else.

##########
File path: 
qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyEventLoopGroupPool.java
##########
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.transports.netty;
+
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.util.concurrent.Future;
+import org.apache.qpid.jms.transports.TransportOptions;
+import org.apache.qpid.jms.util.QpidJMSThreadFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public enum NettyEventLoopGroupPool {
+   Instance;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(NettyEventLoopGroupPool.class);
+   private static final int SHUTDOWN_TIMEOUT = 50;
+   private static final AtomicInteger SHARED_NETTY_EVENT_LOOP_GROUP_SEQUENCE = 
new AtomicInteger(0);
+
+   public interface Lease extends AutoCloseable {
+
+      EventLoopGroup group();
+
+      @Override
+      default void close() {
+         Future<?> fut = group().shutdownGracefully(0, SHUTDOWN_TIMEOUT, 
TimeUnit.MILLISECONDS);
+         if (!fut.awaitUninterruptibly(2 * SHUTDOWN_TIMEOUT)) {
+            LOG.trace("Channel group shutdown failed to complete in allotted 
time");
+         }
+      }
+   }
+
+   private static final class DisposableLease implements Lease {
+
+      private final SharedGroupRef sharedGroupRef;
+      private final AtomicBoolean closed;
+
+      public DisposableLease(SharedGroupRef sharedGroupRef) {
+         this.sharedGroupRef = sharedGroupRef;
+         this.closed = new AtomicBoolean();
+      }
+
+      @Override
+      public EventLoopGroup group() {
+         return sharedGroupRef.group();
+      }
+
+      @Override
+      public void close() {
+         if (closed.compareAndSet(false, true)) {
+            sharedGroupRef.release();
+         }
+      }
+   }
+
+   private final class SharedGroupRef {
+
+      private final EventLoopGroup group;
+      private final AtomicInteger refs;
+
+      private SharedGroupRef(final EventLoopGroup group) {
+         this.group = group;
+         refs = new AtomicInteger(1);
+      }
+
+      public boolean retain() {
+         while (true) {
+            final int currValue = refs.get();
+            if (currValue == 0) {
+               // this has been already disposed!
+               return false;
+            }
+            if (refs.compareAndSet(currValue, currValue + 1)) {
+               return true;
+            }
+         }
+      }
+
+      public EventLoopGroup group() {
+         if (refs.get() == 0) {
+            throw new IllegalStateException("the event loop group cannot be 
reused");
+         }
+         return group;
+      }
+
+      public void release() {
+         while (true) {
+            final int currValue = refs.get();
+            if (currValue == 0) {
+               return;
+            }
+            if (refs.compareAndSet(currValue, currValue - 1)) {
+               if (currValue == 1) {
+                  // if a racing thread has borrowed this lease, it would try 
already to set it to a new value:
+                  // this one is a best effort cleanup to help GC
+                  sharedEventLoopGroup.compareAndSet(this, null);
+                  Future<?> fut = group.shutdownGracefully(0, 
SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS);
+                  if (!fut.awaitUninterruptibly(2 * SHUTDOWN_TIMEOUT)) {
+                     LOG.trace("Channel group shutdown failed to complete in 
allotted time");
+                  }
+               }
+               return;
+            }
+         }
+      }
+   }
+
+   private final AtomicReference<SharedGroupRef> sharedEventLoopGroup = new 
AtomicReference<>(null);

Review comment:
       Can go at the top

##########
File path: 
qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyEventLoopGroupPool.java
##########
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.transports.netty;
+
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.util.concurrent.Future;
+import org.apache.qpid.jms.transports.TransportOptions;
+import org.apache.qpid.jms.util.QpidJMSThreadFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public enum NettyEventLoopGroupPool {
+   Instance;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(NettyEventLoopGroupPool.class);
+   private static final int SHUTDOWN_TIMEOUT = 50;
+   private static final AtomicInteger SHARED_NETTY_EVENT_LOOP_GROUP_SEQUENCE = 
new AtomicInteger(0);
+
+   public interface Lease extends AutoCloseable {
+
+      EventLoopGroup group();
+
+      @Override
+      default void close() {
+         Future<?> fut = group().shutdownGracefully(0, SHUTDOWN_TIMEOUT, 
TimeUnit.MILLISECONDS);
+         if (!fut.awaitUninterruptibly(2 * SHUTDOWN_TIMEOUT)) {
+            LOG.trace("Channel group shutdown failed to complete in allotted 
time");
+         }
+      }
+   }
+
+   private static final class DisposableLease implements Lease {
+
+      private final SharedGroupRef sharedGroupRef;
+      private final AtomicBoolean closed;
+
+      public DisposableLease(SharedGroupRef sharedGroupRef) {
+         this.sharedGroupRef = sharedGroupRef;
+         this.closed = new AtomicBoolean();
+      }
+
+      @Override
+      public EventLoopGroup group() {
+         return sharedGroupRef.group();
+      }
+
+      @Override
+      public void close() {
+         if (closed.compareAndSet(false, true)) {
+            sharedGroupRef.release();
+         }
+      }
+   }
+
+   private final class SharedGroupRef {
+
+      private final EventLoopGroup group;
+      private final AtomicInteger refs;
+
+      private SharedGroupRef(final EventLoopGroup group) {
+         this.group = group;
+         refs = new AtomicInteger(1);
+      }
+
+      public boolean retain() {
+         while (true) {
+            final int currValue = refs.get();
+            if (currValue == 0) {
+               // this has been already disposed!
+               return false;
+            }
+            if (refs.compareAndSet(currValue, currValue + 1)) {
+               return true;
+            }
+         }
+      }
+
+      public EventLoopGroup group() {
+         if (refs.get() == 0) {
+            throw new IllegalStateException("the event loop group cannot be 
reused");
+         }
+         return group;
+      }
+
+      public void release() {
+         while (true) {
+            final int currValue = refs.get();
+            if (currValue == 0) {
+               return;
+            }
+            if (refs.compareAndSet(currValue, currValue - 1)) {
+               if (currValue == 1) {
+                  // if a racing thread has borrowed this lease, it would try 
already to set it to a new value:
+                  // this one is a best effort cleanup to help GC
+                  sharedEventLoopGroup.compareAndSet(this, null);
+                  Future<?> fut = group.shutdownGracefully(0, 
SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS);
+                  if (!fut.awaitUninterruptibly(2 * SHUTDOWN_TIMEOUT)) {
+                     LOG.trace("Channel group shutdown failed to complete in 
allotted time");
+                  }
+               }
+               return;
+            }
+         }
+      }
+   }
+
+   private final AtomicReference<SharedGroupRef> sharedEventLoopGroup = new 
AtomicReference<>(null);
+
+   public Lease borrow(final TransportOptions transportOptions, final 
ThreadFactory ioThreadFactory) {
+      assert transportOptions.getSharedEventLoopThreads() != 0;
+      if (transportOptions.getSharedEventLoopThreads() < 0) {
+         return () -> createEventLoopGroup(transportOptions, ioThreadFactory);
+      }
+      while (true) {
+         SharedGroupRef sharedGroupRef = sharedEventLoopGroup.get();
+         if (sharedGroupRef != null && sharedGroupRef.retain()) {
+            return new DisposableLease(sharedGroupRef);
+         }
+         // lease == null || lease not valid anymore, still disposing
+         synchronized (this) {
+            sharedGroupRef = sharedEventLoopGroup.get();
+            if (sharedGroupRef != null && sharedGroupRef.retain()) {
+               return new DisposableLease(sharedGroupRef);
+            }
+            // lease == null || lease not valid anymore, still disposing

Review comment:
       Convert to text so it doesn't look like partly commented-out code. "Lease 
is null, or..."

##########
File path: 
qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyTcpTransport.java
##########
@@ -214,8 +200,7 @@ public void operationComplete(ChannelFuture future) throws 
Exception {
                 }
             });
         }
-
-        return group;
+        return channel.eventLoop();

Review comment:
       This may be worth a comment, that it needs to return the specific event 
loop for the connection, as the group may be multi-threaded.

##########
File path: 
qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyEventLoopGroupPool.java
##########
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.transports.netty;
+
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.util.concurrent.Future;
+import org.apache.qpid.jms.transports.TransportOptions;
+import org.apache.qpid.jms.util.QpidJMSThreadFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public enum NettyEventLoopGroupPool {
+   Instance;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(NettyEventLoopGroupPool.class);
+   private static final int SHUTDOWN_TIMEOUT = 50;
+   private static final AtomicInteger SHARED_NETTY_EVENT_LOOP_GROUP_SEQUENCE = 
new AtomicInteger(0);
+
+   public interface Lease extends AutoCloseable {
+
+      EventLoopGroup group();
+
+      @Override
+      default void close() {
+         Future<?> fut = group().shutdownGracefully(0, SHUTDOWN_TIMEOUT, 
TimeUnit.MILLISECONDS);
+         if (!fut.awaitUninterruptibly(2 * SHUTDOWN_TIMEOUT)) {
+            LOG.trace("Channel group shutdown failed to complete in allotted 
time");
+         }
+      }
+   }
+
+   private static final class DisposableLease implements Lease {
+
+      private final SharedGroupRef sharedGroupRef;
+      private final AtomicBoolean closed;
+
+      public DisposableLease(SharedGroupRef sharedGroupRef) {
+         this.sharedGroupRef = sharedGroupRef;
+         this.closed = new AtomicBoolean();
+      }
+
+      @Override
+      public EventLoopGroup group() {
+         return sharedGroupRef.group();
+      }
+
+      @Override
+      public void close() {
+         if (closed.compareAndSet(false, true)) {
+            sharedGroupRef.release();
+         }
+      }
+   }
+
+   private final class SharedGroupRef {
+
+      private final EventLoopGroup group;
+      private final AtomicInteger refs;
+
+      private SharedGroupRef(final EventLoopGroup group) {
+         this.group = group;
+         refs = new AtomicInteger(1);
+      }
+
+      public boolean retain() {
+         while (true) {
+            final int currValue = refs.get();
+            if (currValue == 0) {
+               // this has been already disposed!
+               return false;
+            }
+            if (refs.compareAndSet(currValue, currValue + 1)) {
+               return true;
+            }
+         }
+      }
+
+      public EventLoopGroup group() {
+         if (refs.get() == 0) {
+            throw new IllegalStateException("the event loop group cannot be 
reused");
+         }
+         return group;
+      }
+
+      public void release() {
+         while (true) {
+            final int currValue = refs.get();
+            if (currValue == 0) {
+               return;
+            }
+            if (refs.compareAndSet(currValue, currValue - 1)) {
+               if (currValue == 1) {
+                  // if a racing thread has borrowed this lease, it would try 
already to set it to a new value:
+                  // this one is a best effort cleanup to help GC
+                  sharedEventLoopGroup.compareAndSet(this, null);

Review comment:
       Surely they can't have borrowed this lease or we wouldn't get to this 
line. Did you mean to say that if a racing thread *failed* to retain this group 
ref and so created a new group, it could race this thread in trying to set 
sharedEventLoopGroup to the new value?

##########
File path: 
qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyEventLoopGroupPool.java
##########
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.transports.netty;
+
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.util.concurrent.Future;
+import org.apache.qpid.jms.transports.TransportOptions;
+import org.apache.qpid.jms.util.QpidJMSThreadFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public enum NettyEventLoopGroupPool {
+   Instance;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(NettyEventLoopGroupPool.class);
+   private static final int SHUTDOWN_TIMEOUT = 50;
+   private static final AtomicInteger SHARED_NETTY_EVENT_LOOP_GROUP_SEQUENCE = 
new AtomicInteger(0);
+
+   public interface Lease extends AutoCloseable {
+
+      EventLoopGroup group();
+
+      @Override
+      default void close() {

Review comment:
       Given the way it is used I think a simple signature-only Lease interface 
with two distinct actual implementations would be far simpler to follow in this 
case than what is being done (essentially relying on this default method for 
the single-thread groups case so you can still return a lambda, and then 
immediately overriding the close for the shared group case this entire 
class-err-enum mainly exists to service, where it instead creates/returns using 
a class).
   
   It also feels a bit odd that this returns non-pooled and pooled from a 
'Instance' of NettyEventLoopGroupPool. I think a static create style method 
that documented its behaviour might again be clearer, or for the class-err-enum 
to be renamed something like NettyEventLoopGroupFactory (or for the pooled and 
non-pooled bits to just be separate).

##########
File path: 
qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyEventLoopGroupPool.java
##########
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.transports.netty;
+
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.util.concurrent.Future;
+import org.apache.qpid.jms.transports.TransportOptions;
+import org.apache.qpid.jms.util.QpidJMSThreadFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public enum NettyEventLoopGroupPool {
+   Instance;

Review comment:
       Instance would be nicer in capitals. I think I'd go with a class rather 
than an enum in this case, especially given all the interfaces etc. Is there a 
reason to favour enum?

##########
File path: 
qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyEventLoopGroupPool.java
##########
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.transports.netty;
+
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.util.concurrent.Future;
+import org.apache.qpid.jms.transports.TransportOptions;
+import org.apache.qpid.jms.util.QpidJMSThreadFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public enum NettyEventLoopGroupPool {
+   Instance;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(NettyEventLoopGroupPool.class);
+   private static final int SHUTDOWN_TIMEOUT = 50;
+   private static final AtomicInteger SHARED_NETTY_EVENT_LOOP_GROUP_SEQUENCE = 
new AtomicInteger(0);
+
+   public interface Lease extends AutoCloseable {
+
+      EventLoopGroup group();
+
+      @Override
+      default void close() {
+         Future<?> fut = group().shutdownGracefully(0, SHUTDOWN_TIMEOUT, 
TimeUnit.MILLISECONDS);
+         if (!fut.awaitUninterruptibly(2 * SHUTDOWN_TIMEOUT)) {
+            LOG.trace("Channel group shutdown failed to complete in allotted 
time");
+         }
+      }
+   }
+
+   private static final class DisposableLease implements Lease {
+
+      private final SharedGroupRef sharedGroupRef;
+      private final AtomicBoolean closed;
+
+      public DisposableLease(SharedGroupRef sharedGroupRef) {
+         this.sharedGroupRef = sharedGroupRef;
+         this.closed = new AtomicBoolean();
+      }
+
+      @Override
+      public EventLoopGroup group() {
+         return sharedGroupRef.group();
+      }
+
+      @Override
+      public void close() {
+         if (closed.compareAndSet(false, true)) {
+            sharedGroupRef.release();
+         }
+      }
+   }
+
+   private final class SharedGroupRef {
+
+      private final EventLoopGroup group;
+      private final AtomicInteger refs;
+
+      private SharedGroupRef(final EventLoopGroup group) {
+         this.group = group;
+         refs = new AtomicInteger(1);
+      }
+
+      public boolean retain() {
+         while (true) {
+            final int currValue = refs.get();
+            if (currValue == 0) {
+               // this has been already disposed!
+               return false;
+            }
+            if (refs.compareAndSet(currValue, currValue + 1)) {
+               return true;
+            }
+         }
+      }
+
+      public EventLoopGroup group() {
+         if (refs.get() == 0) {
+            throw new IllegalStateException("the event loop group cannot be 
reused");
+         }
+         return group;
+      }
+
+      public void release() {
+         while (true) {
+            final int currValue = refs.get();
+            if (currValue == 0) {
+               return;
+            }
+            if (refs.compareAndSet(currValue, currValue - 1)) {
+               if (currValue == 1) {
+                  // if a racing thread has borrowed this lease, it would try 
already to set it to a new value:
+                  // this one is a best effort cleanup to help GC
+                  sharedEventLoopGroup.compareAndSet(this, null);
+                  Future<?> fut = group.shutdownGracefully(0, 
SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS);
+                  if (!fut.awaitUninterruptibly(2 * SHUTDOWN_TIMEOUT)) {
+                     LOG.trace("Channel group shutdown failed to complete in 
allotted time");
+                  }
+               }
+               return;
+            }
+         }
+      }
+   }
+
+   private final AtomicReference<SharedGroupRef> sharedEventLoopGroup = new 
AtomicReference<>(null);
+
+   public Lease borrow(final TransportOptions transportOptions, final 
ThreadFactory ioThreadFactory) {
+      assert transportOptions.getSharedEventLoopThreads() != 0;
+      if (transportOptions.getSharedEventLoopThreads() < 0) {
+         return () -> createEventLoopGroup(transportOptions, ioThreadFactory);
+      }
+      while (true) {
+         SharedGroupRef sharedGroupRef = sharedEventLoopGroup.get();
+         if (sharedGroupRef != null && sharedGroupRef.retain()) {
+            return new DisposableLease(sharedGroupRef);
+         }
+         // lease == null || lease not valid anymore, still disposing

Review comment:
       Convert to text so it doesn't look like partly commented-out code. "Lease 
is null, or..."

##########
File path: 
qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyEventLoopGroupPool.java
##########
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.transports.netty;
+
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.util.concurrent.Future;
+import org.apache.qpid.jms.transports.TransportOptions;
+import org.apache.qpid.jms.util.QpidJMSThreadFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public enum NettyEventLoopGroupPool {
+   Instance;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(NettyEventLoopGroupPool.class);
+   private static final int SHUTDOWN_TIMEOUT = 50;
+   private static final AtomicInteger SHARED_NETTY_EVENT_LOOP_GROUP_SEQUENCE = 
new AtomicInteger(0);
+
+   public interface Lease extends AutoCloseable {

Review comment:
       Follow-up: I don't really like 'Lease' given the default non-pooled 
behaviour. Perhaps something like Holder?

##########
File path: 
qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyEventLoopGroupPool.java
##########
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.transports.netty;
+
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.util.concurrent.Future;
+import org.apache.qpid.jms.transports.TransportOptions;
+import org.apache.qpid.jms.util.QpidJMSThreadFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public enum NettyEventLoopGroupPool {
+   Instance;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(NettyEventLoopGroupPool.class);
+   private static final int SHUTDOWN_TIMEOUT = 50;
+   private static final AtomicInteger SHARED_NETTY_EVENT_LOOP_GROUP_SEQUENCE = 
new AtomicInteger(0);
+
+   public interface Lease extends AutoCloseable {
+
+      EventLoopGroup group();
+
+      @Override
+      default void close() {
+         Future<?> fut = group().shutdownGracefully(0, SHUTDOWN_TIMEOUT, 
TimeUnit.MILLISECONDS);
+         if (!fut.awaitUninterruptibly(2 * SHUTDOWN_TIMEOUT)) {
+            LOG.trace("Channel group shutdown failed to complete in allotted 
time");
+         }
+      }
+   }
+
+   private static final class DisposableLease implements Lease {
+
+      private final SharedGroupRef sharedGroupRef;
+      private final AtomicBoolean closed;
+
+      public DisposableLease(SharedGroupRef sharedGroupRef) {
+         this.sharedGroupRef = sharedGroupRef;
+         this.closed = new AtomicBoolean();
+      }
+
+      @Override
+      public EventLoopGroup group() {
+         return sharedGroupRef.group();
+      }
+
+      @Override
+      public void close() {
+         if (closed.compareAndSet(false, true)) {
+            sharedGroupRef.release();
+         }
+      }
+   }
+
+   private final class SharedGroupRef {
+
+      private final EventLoopGroup group;
+      private final AtomicInteger refs;
+
+      private SharedGroupRef(final EventLoopGroup group) {
+         this.group = group;
+         refs = new AtomicInteger(1);
+      }
+
+      public boolean retain() {
+         while (true) {
+            final int currValue = refs.get();
+            if (currValue == 0) {
+               // this has been already disposed!
+               return false;
+            }
+            if (refs.compareAndSet(currValue, currValue + 1)) {
+               return true;
+            }
+         }
+      }
+
+      public EventLoopGroup group() {
+         if (refs.get() == 0) {
+            throw new IllegalStateException("the event loop group cannot be 
reused");
+         }
+         return group;
+      }
+
+      public void release() {
+         while (true) {
+            final int currValue = refs.get();
+            if (currValue == 0) {
+               return;
+            }
+            if (refs.compareAndSet(currValue, currValue - 1)) {
+               if (currValue == 1) {
+                  // if a racing thread has borrowed this lease, it would try 
already to set it to a new value:
+                  // this one is a best effort cleanup to help GC
+                  sharedEventLoopGroup.compareAndSet(this, null);
+                  Future<?> fut = group.shutdownGracefully(0, 
SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS);
+                  if (!fut.awaitUninterruptibly(2 * SHUTDOWN_TIMEOUT)) {
+                     LOG.trace("Channel group shutdown failed to complete in 
allotted time");
+                  }
+               }
+               return;
+            }
+         }
+      }
+   }
+
+   private final AtomicReference<SharedGroupRef> sharedEventLoopGroup = new 
AtomicReference<>(null);
+
+   public Lease borrow(final TransportOptions transportOptions, final 
ThreadFactory ioThreadFactory) {
+      assert transportOptions.getSharedEventLoopThreads() != 0;
+      if (transportOptions.getSharedEventLoopThreads() < 0) {
+         return () -> createEventLoopGroup(transportOptions, ioThreadFactory);
+      }
+      while (true) {
+         SharedGroupRef sharedGroupRef = sharedEventLoopGroup.get();
+         if (sharedGroupRef != null && sharedGroupRef.retain()) {
+            return new DisposableLease(sharedGroupRef);
+         }
+         // lease == null || lease not valid anymore, still disposing
+         synchronized (this) {
+            sharedGroupRef = sharedEventLoopGroup.get();
+            if (sharedGroupRef != null && sharedGroupRef.retain()) {
+               return new DisposableLease(sharedGroupRef);
+            }
+            // lease == null || lease not valid anymore, still disposing
+            sharedGroupRef = new 
SharedGroupRef(createEventLoopGroup(transportOptions,
+                                                            new 
QpidJMSThreadFactory("SharedNettyEventLoopGroup :(" +
+                                                                               
      SHARED_NETTY_EVENT_LOOP_GROUP_SEQUENCE.incrementAndGet() + ")", true)));

Review comment:
       Same general comment about the thread name as on the earlier session PR: 
it would now be entirely generic in operation, versus the existing behaviour of 
being named for the connection it's operating for. That's often useful, so it 
might be nice if the effect could be retained in some fashion when using the 
pool (it seems like it will be retained without it). Though it may be so ugly 
to implement, e.g. wrapping the channel handler or transport listener, that 
it's not actually desirable to do.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@qpid.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@qpid.apache.org
For additional commands, e-mail: dev-h...@qpid.apache.org

Reply via email to