gemmellr commented on a change in pull request #45: URL: https://github.com/apache/qpid-jms/pull/45#discussion_r739102332
########## File path: qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyEventLoopGroupPool.java ########## @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms.transports.netty; + +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.util.concurrent.Future; +import org.apache.qpid.jms.transports.TransportOptions; +import org.apache.qpid.jms.util.QpidJMSThreadFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public enum NettyEventLoopGroupPool { + Instance; + + private static final Logger LOG = LoggerFactory.getLogger(NettyEventLoopGroupPool.class); + private static final int SHUTDOWN_TIMEOUT = 50; + private static final AtomicInteger SHARED_NETTY_EVENT_LOOP_GROUP_SEQUENCE = new AtomicInteger(0); + + public interface Lease extends AutoCloseable { + + EventLoopGroup group(); + + @Override + default void close() { + Future<?> fut = 
group().shutdownGracefully(0, SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS); + if (!fut.awaitUninterruptibly(2 * SHUTDOWN_TIMEOUT)) { + LOG.trace("Channel group shutdown failed to complete in allotted time"); + } + } + } + + private static final class DisposableLease implements Lease { + + private final SharedGroupRef sharedGroupRef; + private final AtomicBoolean closed; + + public DisposableLease(SharedGroupRef sharedGroupRef) { + this.sharedGroupRef = sharedGroupRef; + this.closed = new AtomicBoolean(); + } + + @Override + public EventLoopGroup group() { + return sharedGroupRef.group(); + } + + @Override + public void close() { + if (closed.compareAndSet(false, true)) { + sharedGroupRef.release(); + } + } + } + + private final class SharedGroupRef { + + private final EventLoopGroup group; + private final AtomicInteger refs; + + private SharedGroupRef(final EventLoopGroup group) { + this.group = group; + refs = new AtomicInteger(1); + } + + public boolean retain() { + while (true) { + final int currValue = refs.get(); + if (currValue == 0) { + // this has been already disposed! 
+ return false; + } + if (refs.compareAndSet(currValue, currValue + 1)) { + return true; + } + } + } + + public EventLoopGroup group() { + if (refs.get() == 0) { + throw new IllegalStateException("the event loop group cannot be reused"); + } + return group; + } + + public void release() { + while (true) { + final int currValue = refs.get(); + if (currValue == 0) { + return; + } + if (refs.compareAndSet(currValue, currValue - 1)) { + if (currValue == 1) { + // if a racing thread has borrowed this lease, it would try already to set it to a new value: + // this one is a best effort cleanup to help GC + sharedEventLoopGroup.compareAndSet(this, null); + Future<?> fut = group.shutdownGracefully(0, SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS); + if (!fut.awaitUninterruptibly(2 * SHUTDOWN_TIMEOUT)) { + LOG.trace("Channel group shutdown failed to complete in allotted time"); + } + } + return; + } + } + } + } + + private final AtomicReference<SharedGroupRef> sharedEventLoopGroup = new AtomicReference<>(null); + + public Lease borrow(final TransportOptions transportOptions, final ThreadFactory ioThreadFactory) { + assert transportOptions.getSharedEventLoopThreads() != 0; + if (transportOptions.getSharedEventLoopThreads() < 0) { + return () -> createEventLoopGroup(transportOptions, ioThreadFactory); + } + while (true) { Review comment: Newline, loop that comes is basically a different unit having typically returned already by default. Otherwise an else. ########## File path: qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyEventLoopGroupPool.java ########## @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms.transports.netty; + +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.util.concurrent.Future; +import org.apache.qpid.jms.transports.TransportOptions; +import org.apache.qpid.jms.util.QpidJMSThreadFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public enum NettyEventLoopGroupPool { + Instance; + + private static final Logger LOG = LoggerFactory.getLogger(NettyEventLoopGroupPool.class); + private static final int SHUTDOWN_TIMEOUT = 50; + private static final AtomicInteger SHARED_NETTY_EVENT_LOOP_GROUP_SEQUENCE = new AtomicInteger(0); + + public interface Lease extends AutoCloseable { + + EventLoopGroup group(); + + @Override + default void close() { + Future<?> fut = group().shutdownGracefully(0, SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS); + if (!fut.awaitUninterruptibly(2 * SHUTDOWN_TIMEOUT)) { + LOG.trace("Channel group shutdown failed to complete in allotted time"); + } + } + } + + private static final class DisposableLease implements Lease { + + private final SharedGroupRef sharedGroupRef; + private final AtomicBoolean closed; + + public DisposableLease(SharedGroupRef sharedGroupRef) { + this.sharedGroupRef = sharedGroupRef; + this.closed = new AtomicBoolean(); + } + + @Override + public EventLoopGroup 
group() { + return sharedGroupRef.group(); + } + + @Override + public void close() { + if (closed.compareAndSet(false, true)) { + sharedGroupRef.release(); + } + } + } + + private final class SharedGroupRef { + + private final EventLoopGroup group; + private final AtomicInteger refs; + + private SharedGroupRef(final EventLoopGroup group) { + this.group = group; + refs = new AtomicInteger(1); + } + + public boolean retain() { + while (true) { + final int currValue = refs.get(); + if (currValue == 0) { + // this has been already disposed! + return false; + } + if (refs.compareAndSet(currValue, currValue + 1)) { + return true; + } + } + } + + public EventLoopGroup group() { + if (refs.get() == 0) { + throw new IllegalStateException("the event loop group cannot be reused"); + } + return group; + } + + public void release() { + while (true) { + final int currValue = refs.get(); + if (currValue == 0) { + return; + } + if (refs.compareAndSet(currValue, currValue - 1)) { + if (currValue == 1) { + // if a racing thread has borrowed this lease, it would try already to set it to a new value: + // this one is a best effort cleanup to help GC + sharedEventLoopGroup.compareAndSet(this, null); + Future<?> fut = group.shutdownGracefully(0, SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS); + if (!fut.awaitUninterruptibly(2 * SHUTDOWN_TIMEOUT)) { + LOG.trace("Channel group shutdown failed to complete in allotted time"); + } + } + return; + } + } + } + } + + private final AtomicReference<SharedGroupRef> sharedEventLoopGroup = new AtomicReference<>(null); Review comment: Can go at the top ########## File path: qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyEventLoopGroupPool.java ########## @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms.transports.netty; + +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.util.concurrent.Future; +import org.apache.qpid.jms.transports.TransportOptions; +import org.apache.qpid.jms.util.QpidJMSThreadFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public enum NettyEventLoopGroupPool { + Instance; + + private static final Logger LOG = LoggerFactory.getLogger(NettyEventLoopGroupPool.class); + private static final int SHUTDOWN_TIMEOUT = 50; + private static final AtomicInteger SHARED_NETTY_EVENT_LOOP_GROUP_SEQUENCE = new AtomicInteger(0); + + public interface Lease extends AutoCloseable { + + EventLoopGroup group(); + + @Override + default void close() { + Future<?> fut = group().shutdownGracefully(0, SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS); + if (!fut.awaitUninterruptibly(2 * SHUTDOWN_TIMEOUT)) { + LOG.trace("Channel group shutdown failed to complete in allotted time"); + } + } + } + + private static final class DisposableLease implements Lease { + + private final SharedGroupRef sharedGroupRef; + private final AtomicBoolean closed; + + public 
DisposableLease(SharedGroupRef sharedGroupRef) { + this.sharedGroupRef = sharedGroupRef; + this.closed = new AtomicBoolean(); + } + + @Override + public EventLoopGroup group() { + return sharedGroupRef.group(); + } + + @Override + public void close() { + if (closed.compareAndSet(false, true)) { + sharedGroupRef.release(); + } + } + } + + private final class SharedGroupRef { + + private final EventLoopGroup group; + private final AtomicInteger refs; + + private SharedGroupRef(final EventLoopGroup group) { + this.group = group; + refs = new AtomicInteger(1); + } + + public boolean retain() { + while (true) { + final int currValue = refs.get(); + if (currValue == 0) { + // this has been already disposed! + return false; + } + if (refs.compareAndSet(currValue, currValue + 1)) { + return true; + } + } + } + + public EventLoopGroup group() { + if (refs.get() == 0) { + throw new IllegalStateException("the event loop group cannot be reused"); + } + return group; + } + + public void release() { + while (true) { + final int currValue = refs.get(); + if (currValue == 0) { + return; + } + if (refs.compareAndSet(currValue, currValue - 1)) { + if (currValue == 1) { + // if a racing thread has borrowed this lease, it would try already to set it to a new value: + // this one is a best effort cleanup to help GC + sharedEventLoopGroup.compareAndSet(this, null); + Future<?> fut = group.shutdownGracefully(0, SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS); + if (!fut.awaitUninterruptibly(2 * SHUTDOWN_TIMEOUT)) { + LOG.trace("Channel group shutdown failed to complete in allotted time"); + } + } + return; + } + } + } + } + + private final AtomicReference<SharedGroupRef> sharedEventLoopGroup = new AtomicReference<>(null); + + public Lease borrow(final TransportOptions transportOptions, final ThreadFactory ioThreadFactory) { + assert transportOptions.getSharedEventLoopThreads() != 0; + if (transportOptions.getSharedEventLoopThreads() < 0) { + return () -> createEventLoopGroup(transportOptions, 
ioThreadFactory); + } + while (true) { + SharedGroupRef sharedGroupRef = sharedEventLoopGroup.get(); + if (sharedGroupRef != null && sharedGroupRef.retain()) { + return new DisposableLease(sharedGroupRef); + } + // lease == null || lease not valid anymore, still disposing + synchronized (this) { + sharedGroupRef = sharedEventLoopGroup.get(); + if (sharedGroupRef != null && sharedGroupRef.retain()) { + return new DisposableLease(sharedGroupRef); + } + // lease == null || lease not valid anymore, still disposing Review comment: Convert to text so it doesn't look like partly commented-out code. "Lease is null, or..." ########## File path: qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyTcpTransport.java ########## @@ -214,8 +200,7 @@ public void operationComplete(ChannelFuture future) throws Exception { } }); } - - return group; + return channel.eventLoop(); Review comment: This may be worth a comment, that it needs to return the specific event loop for the connection, as the group may be multi-threaded. ########## File path: qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyEventLoopGroupPool.java ########## @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.qpid.jms.transports.netty; + +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.util.concurrent.Future; +import org.apache.qpid.jms.transports.TransportOptions; +import org.apache.qpid.jms.util.QpidJMSThreadFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public enum NettyEventLoopGroupPool { + Instance; + + private static final Logger LOG = LoggerFactory.getLogger(NettyEventLoopGroupPool.class); + private static final int SHUTDOWN_TIMEOUT = 50; + private static final AtomicInteger SHARED_NETTY_EVENT_LOOP_GROUP_SEQUENCE = new AtomicInteger(0); + + public interface Lease extends AutoCloseable { + + EventLoopGroup group(); + + @Override + default void close() { + Future<?> fut = group().shutdownGracefully(0, SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS); + if (!fut.awaitUninterruptibly(2 * SHUTDOWN_TIMEOUT)) { + LOG.trace("Channel group shutdown failed to complete in allotted time"); + } + } + } + + private static final class DisposableLease implements Lease { + + private final SharedGroupRef sharedGroupRef; + private final AtomicBoolean closed; + + public DisposableLease(SharedGroupRef sharedGroupRef) { + this.sharedGroupRef = sharedGroupRef; + this.closed = new AtomicBoolean(); + } + + @Override + public EventLoopGroup group() { + return sharedGroupRef.group(); + } + + @Override + public void close() { + if (closed.compareAndSet(false, true)) { + sharedGroupRef.release(); + } + } + } + + private final class SharedGroupRef { + + private final EventLoopGroup group; + private final AtomicInteger refs; + + private SharedGroupRef(final EventLoopGroup group) { + this.group = group; + refs = new AtomicInteger(1); + } + + public boolean 
retain() { + while (true) { + final int currValue = refs.get(); + if (currValue == 0) { + // this has been already disposed! + return false; + } + if (refs.compareAndSet(currValue, currValue + 1)) { + return true; + } + } + } + + public EventLoopGroup group() { + if (refs.get() == 0) { + throw new IllegalStateException("the event loop group cannot be reused"); + } + return group; + } + + public void release() { + while (true) { + final int currValue = refs.get(); + if (currValue == 0) { + return; + } + if (refs.compareAndSet(currValue, currValue - 1)) { + if (currValue == 1) { + // if a racing thread has borrowed this lease, it would try already to set it to a new value: + // this one is a best effort cleanup to help GC + sharedEventLoopGroup.compareAndSet(this, null); Review comment: Surely they can't have borrowed this lease or we wouldn't get to this line. Did you mean to say that if a racing thread *failed* to retain this group ref and so created a new group, it could race this thread in trying to set sharedEventLoopGroup to the new value? ########## File path: qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyEventLoopGroupPool.java ########## @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms.transports.netty; + +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.util.concurrent.Future; +import org.apache.qpid.jms.transports.TransportOptions; +import org.apache.qpid.jms.util.QpidJMSThreadFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public enum NettyEventLoopGroupPool { + Instance; + + private static final Logger LOG = LoggerFactory.getLogger(NettyEventLoopGroupPool.class); + private static final int SHUTDOWN_TIMEOUT = 50; + private static final AtomicInteger SHARED_NETTY_EVENT_LOOP_GROUP_SEQUENCE = new AtomicInteger(0); + + public interface Lease extends AutoCloseable { + + EventLoopGroup group(); + + @Override + default void close() { Review comment: Given the way it is used I think a simple signature-only Lease interface with two distinct actual implementations would be far simpler to follow in this case than what is being done (essentially relying on this default method for the single-thread groups case so you can still return a lambda, and then immediately overriding the close for the shared group case this entire class-err-enum mainly exists to service, where it instead creates/returns using a class). It also feels a bit odd that this returns non-pooled and pooled from an 'Instance' of NettyEventLoopGroupPool. I think a static create-style method that documented its behaviour might again be clearer, or for the class-err-enum to be renamed something like NettyEventLoopGroupFactory (or for the pooled and non-pooled bits to just be separate). 
########## File path: qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyEventLoopGroupPool.java ########## @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms.transports.netty; + +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.util.concurrent.Future; +import org.apache.qpid.jms.transports.TransportOptions; +import org.apache.qpid.jms.util.QpidJMSThreadFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public enum NettyEventLoopGroupPool { + Instance; Review comment: Instance would be nicer in capitals. I think I'd go with a class rather than an enum in this case, especially given all the interfaces etc. Is there a reason to favour an enum? 
########## File path: qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyEventLoopGroupPool.java ########## @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms.transports.netty; + +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.util.concurrent.Future; +import org.apache.qpid.jms.transports.TransportOptions; +import org.apache.qpid.jms.util.QpidJMSThreadFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public enum NettyEventLoopGroupPool { + Instance; + + private static final Logger LOG = LoggerFactory.getLogger(NettyEventLoopGroupPool.class); + private static final int SHUTDOWN_TIMEOUT = 50; + private static final AtomicInteger SHARED_NETTY_EVENT_LOOP_GROUP_SEQUENCE = new AtomicInteger(0); + + public interface Lease extends AutoCloseable { + + EventLoopGroup group(); + + @Override + default void close() { + Future<?> fut = 
group().shutdownGracefully(0, SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS); + if (!fut.awaitUninterruptibly(2 * SHUTDOWN_TIMEOUT)) { + LOG.trace("Channel group shutdown failed to complete in allotted time"); + } + } + } + + private static final class DisposableLease implements Lease { + + private final SharedGroupRef sharedGroupRef; + private final AtomicBoolean closed; + + public DisposableLease(SharedGroupRef sharedGroupRef) { + this.sharedGroupRef = sharedGroupRef; + this.closed = new AtomicBoolean(); + } + + @Override + public EventLoopGroup group() { + return sharedGroupRef.group(); + } + + @Override + public void close() { + if (closed.compareAndSet(false, true)) { + sharedGroupRef.release(); + } + } + } + + private final class SharedGroupRef { + + private final EventLoopGroup group; + private final AtomicInteger refs; + + private SharedGroupRef(final EventLoopGroup group) { + this.group = group; + refs = new AtomicInteger(1); + } + + public boolean retain() { + while (true) { + final int currValue = refs.get(); + if (currValue == 0) { + // this has been already disposed! 
+ return false; + } + if (refs.compareAndSet(currValue, currValue + 1)) { + return true; + } + } + } + + public EventLoopGroup group() { + if (refs.get() == 0) { + throw new IllegalStateException("the event loop group cannot be reused"); + } + return group; + } + + public void release() { + while (true) { + final int currValue = refs.get(); + if (currValue == 0) { + return; + } + if (refs.compareAndSet(currValue, currValue - 1)) { + if (currValue == 1) { + // if a racing thread has borrowed this lease, it would try already to set it to a new value: + // this one is a best effort cleanup to help GC + sharedEventLoopGroup.compareAndSet(this, null); + Future<?> fut = group.shutdownGracefully(0, SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS); + if (!fut.awaitUninterruptibly(2 * SHUTDOWN_TIMEOUT)) { + LOG.trace("Channel group shutdown failed to complete in allotted time"); + } + } + return; + } + } + } + } + + private final AtomicReference<SharedGroupRef> sharedEventLoopGroup = new AtomicReference<>(null); + + public Lease borrow(final TransportOptions transportOptions, final ThreadFactory ioThreadFactory) { + assert transportOptions.getSharedEventLoopThreads() != 0; + if (transportOptions.getSharedEventLoopThreads() < 0) { + return () -> createEventLoopGroup(transportOptions, ioThreadFactory); + } + while (true) { + SharedGroupRef sharedGroupRef = sharedEventLoopGroup.get(); + if (sharedGroupRef != null && sharedGroupRef.retain()) { + return new DisposableLease(sharedGroupRef); + } + // lease == null || lease not valid anymore, still disposing Review comment: Convert to text so it doesn't look like partly commented-out code. "Lease is null, or..." ########## File path: qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyEventLoopGroupPool.java ########## @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms.transports.netty; + +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.util.concurrent.Future; +import org.apache.qpid.jms.transports.TransportOptions; +import org.apache.qpid.jms.util.QpidJMSThreadFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public enum NettyEventLoopGroupPool { + Instance; + + private static final Logger LOG = LoggerFactory.getLogger(NettyEventLoopGroupPool.class); + private static final int SHUTDOWN_TIMEOUT = 50; + private static final AtomicInteger SHARED_NETTY_EVENT_LOOP_GROUP_SEQUENCE = new AtomicInteger(0); + + public interface Lease extends AutoCloseable { Review comment: Follow-up: I don't really like 'Lease' given the default non-pooled behaviour. Perhaps something like Holder? 
########## File path: qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyEventLoopGroupPool.java ########## @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms.transports.netty; + +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.util.concurrent.Future; +import org.apache.qpid.jms.transports.TransportOptions; +import org.apache.qpid.jms.util.QpidJMSThreadFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public enum NettyEventLoopGroupPool { + Instance; + + private static final Logger LOG = LoggerFactory.getLogger(NettyEventLoopGroupPool.class); + private static final int SHUTDOWN_TIMEOUT = 50; + private static final AtomicInteger SHARED_NETTY_EVENT_LOOP_GROUP_SEQUENCE = new AtomicInteger(0); + + public interface Lease extends AutoCloseable { + + EventLoopGroup group(); + + @Override + default void close() { + Future<?> fut = 
group().shutdownGracefully(0, SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS); + if (!fut.awaitUninterruptibly(2 * SHUTDOWN_TIMEOUT)) { + LOG.trace("Channel group shutdown failed to complete in allotted time"); + } + } + } + + private static final class DisposableLease implements Lease { + + private final SharedGroupRef sharedGroupRef; + private final AtomicBoolean closed; + + public DisposableLease(SharedGroupRef sharedGroupRef) { + this.sharedGroupRef = sharedGroupRef; + this.closed = new AtomicBoolean(); + } + + @Override + public EventLoopGroup group() { + return sharedGroupRef.group(); + } + + @Override + public void close() { + if (closed.compareAndSet(false, true)) { + sharedGroupRef.release(); + } + } + } + + private final class SharedGroupRef { + + private final EventLoopGroup group; + private final AtomicInteger refs; + + private SharedGroupRef(final EventLoopGroup group) { + this.group = group; + refs = new AtomicInteger(1); + } + + public boolean retain() { + while (true) { + final int currValue = refs.get(); + if (currValue == 0) { + // this has been already disposed! 
+ return false; + } + if (refs.compareAndSet(currValue, currValue + 1)) { + return true; + } + } + } + + public EventLoopGroup group() { + if (refs.get() == 0) { + throw new IllegalStateException("the event loop group cannot be reused"); + } + return group; + } + + public void release() { + while (true) { + final int currValue = refs.get(); + if (currValue == 0) { + return; + } + if (refs.compareAndSet(currValue, currValue - 1)) { + if (currValue == 1) { + // if a racing thread has borrowed this lease, it would try already to set it to a new value: + // this one is a best effort cleanup to help GC + sharedEventLoopGroup.compareAndSet(this, null); + Future<?> fut = group.shutdownGracefully(0, SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS); + if (!fut.awaitUninterruptibly(2 * SHUTDOWN_TIMEOUT)) { + LOG.trace("Channel group shutdown failed to complete in allotted time"); + } + } + return; + } + } + } + } + + private final AtomicReference<SharedGroupRef> sharedEventLoopGroup = new AtomicReference<>(null); + + public Lease borrow(final TransportOptions transportOptions, final ThreadFactory ioThreadFactory) { + assert transportOptions.getSharedEventLoopThreads() != 0; + if (transportOptions.getSharedEventLoopThreads() < 0) { + return () -> createEventLoopGroup(transportOptions, ioThreadFactory); + } + while (true) { + SharedGroupRef sharedGroupRef = sharedEventLoopGroup.get(); + if (sharedGroupRef != null && sharedGroupRef.retain()) { + return new DisposableLease(sharedGroupRef); + } + // lease == null || lease not valid anymore, still disposing + synchronized (this) { + sharedGroupRef = sharedEventLoopGroup.get(); + if (sharedGroupRef != null && sharedGroupRef.retain()) { + return new DisposableLease(sharedGroupRef); + } + // lease == null || lease not valid anymore, still disposing + sharedGroupRef = new SharedGroupRef(createEventLoopGroup(transportOptions, + new QpidJMSThreadFactory("SharedNettyEventLoopGroup :(" + + SHARED_NETTY_EVENT_LOOP_GROUP_SEQUENCE.incrementAndGet() 
+ ")", true))); Review comment: Same general comment about thread name as earlier session PR, it would now be entirely generic in operation vs the existing behaviour of being named for the connection it is operating for. That's often useful, so it might be nice if the effect could be retained in some fashion when using the pool (it seems like it will be retained without it). Though it may be so ugly to implement, e.g. wrapping the channel handler or transport listener, that it's not actually desirable to do. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: dev-unsubscr...@qpid.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@qpid.apache.org For additional commands, e-mail: dev-h...@qpid.apache.org