QPID-7731: Dispose cached QpidByteBuffers on stopping http threads
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/35bc1cd0 Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/35bc1cd0 Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/35bc1cd0 Branch: refs/heads/master Commit: 35bc1cd00112650d34730af33d9ae6bb7a8d1906 Parents: be26c8b Author: Alex Rudyy <oru...@apache.org> Authored: Thu May 25 23:36:53 2017 +0100 Committer: Alex Rudyy <oru...@apache.org> Committed: Thu May 25 23:36:53 2017 +0100 ---------------------------------------------------------------------- .../management/plugin/HttpManagement.java | 75 +++++++++++++++++++- .../transport/websocket/WebSocketProvider.java | 67 ++++++++++++++++- 2 files changed, 139 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/35bc1cd0/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java ---------------------------------------------------------------------- diff --git a/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java b/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java index ea1a76b..c67dbcf 100644 --- a/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java +++ b/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java @@ -34,6 +34,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -62,12 +63,14 @@ import org.eclipse.jetty.servlet.FilterHolder; import org.eclipse.jetty.servlet.ServletContextHandler; import org.eclipse.jetty.servlet.ServletHolder; import org.eclipse.jetty.servlets.CrossOriginFilter; +import org.eclipse.jetty.util.annotation.Name; import org.eclipse.jetty.util.ssl.SslContextFactory; import org.eclipse.jetty.util.thread.ExecutorThreadPool; import org.eclipse.jetty.util.thread.QueuedThreadPool; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.qpid.server.bytebuffer.QpidByteBuffer; import org.apache.qpid.server.configuration.IllegalConfigurationException; import org.apache.qpid.server.logging.messages.ManagementConsoleMessages; import org.apache.qpid.server.logging.messages.PortMessages; @@ -482,8 +485,8 @@ public class HttpManagement extends AbstractPluginAdapter<HttpManagement> implem } ServerConnector connector = new ServerConnector(server, - new QueuedThreadPool(port.getThreadPoolMaximum(), - port.getThreadPoolMinimum()), + new QBBTrackingThreadPool(port.getThreadPoolMaximum(), + port.getThreadPoolMinimum()), null, null, port.getDesiredNumberOfAcceptors(), @@ -825,4 +828,72 @@ public class HttpManagement extends AbstractPluginAdapter<HttpManagement> implem } } } + + private static class QBBTrackingThreadPool extends QueuedThreadPool + { + private final Map<Thread, QpidByteBuffer> _cachedBufferMap = new ConcurrentHashMap<>(); + + public QBBTrackingThreadPool(@Name("maxThreads") final int maxThreads, @Name("minThreads") final int minThreads) + { + super(maxThreads, minThreads); + } + + @Override + protected void doStop() throws Exception + { + try + { + super.doStop(); + } + finally + { + for (QpidByteBuffer qpidByteBuffer : _cachedBufferMap.values()) + { + qpidByteBuffer.dispose(); + } + _cachedBufferMap.clear(); + } + } + + @Override + protected Thread newThread(final Runnable runnable) + { + return super.newThread(() -> + { + try + { + runnable.run(); + } + finally + { + QpidByteBuffer qbb = _cachedBufferMap.remove(Thread.currentThread()); + if (qbb != null) + { + qbb.dispose(); + } + } + }); + } + + @Override + protected void runJob(final Runnable job) + { + try + { + super.runJob(job); + } + finally + { + final QpidByteBuffer cachedThreadLocalBuffer = QpidByteBuffer.getCachedThreadLocalBuffer(); + if (cachedThreadLocalBuffer != null) + { + _cachedBufferMap.put(Thread.currentThread(), cachedThreadLocalBuffer); + } + else + { + _cachedBufferMap.remove(Thread.currentThread()); + } + } + } + } } http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/35bc1cd0/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java ---------------------------------------------------------------------- diff --git a/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java b/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java index 1240716..7dc4e4e 100644 --- a/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java +++ b/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java @@ -30,7 +30,9 @@ import java.security.cert.X509Certificate; import java.util.ArrayList; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicBoolean; @@ -48,7 +50,9 @@ import org.eclipse.jetty.server.Request; import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.ServerConnector; import org.eclipse.jetty.server.handler.AbstractHandler; +import org.eclipse.jetty.util.annotation.Name; import org.eclipse.jetty.util.ssl.SslContextFactory; +import org.eclipse.jetty.util.thread.QueuedThreadPool; import org.eclipse.jetty.util.thread.ThreadPool; import org.eclipse.jetty.websocket.api.Session; import org.eclipse.jetty.websocket.api.WebSocketAdapter; @@ -122,7 +126,68 @@ class WebSocketProvider implements AcceptingTransport { _idleTimeoutChecker.start(); - _server = new Server(); + _server = new Server(new QueuedThreadPool() + { + private final Map<Thread, QpidByteBuffer> _cachedBufferMap = new ConcurrentHashMap<>(); + + @Override + protected void doStop() throws Exception + { + try + { + super.doStop(); + } + finally + { + for (QpidByteBuffer qpidByteBuffer : _cachedBufferMap.values()) + { + qpidByteBuffer.dispose(); + } + _cachedBufferMap.clear(); + } + } + + @Override + protected Thread newThread(final Runnable runnable) + { + return super.newThread(() -> + { + try + { + runnable.run(); + } + finally + { + QpidByteBuffer qbb = _cachedBufferMap.remove(Thread.currentThread()); + if (qbb != null) + { + qbb.dispose(); + } + } + }); + } + + @Override + protected void runJob(final Runnable job) + { + try + { + super.runJob(job); + } + finally + { + final QpidByteBuffer cachedThreadLocalBuffer = QpidByteBuffer.getCachedThreadLocalBuffer(); + if (cachedThreadLocalBuffer != null) + { + _cachedBufferMap.put(Thread.currentThread(), cachedThreadLocalBuffer); + } + else + { + _cachedBufferMap.remove(Thread.currentThread()); + } + } + } + }); final ServerConnector connector; HttpConnectionFactory httpConnectionFactory = new HttpConnectionFactory(); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org