QPID-7731: Dispose cached QpidByteBuffers on stopping http threads

Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/35bc1cd0
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/35bc1cd0
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/35bc1cd0

Branch: refs/heads/master
Commit: 35bc1cd00112650d34730af33d9ae6bb7a8d1906
Parents: be26c8b
Author: Alex Rudyy <oru...@apache.org>
Authored: Thu May 25 23:36:53 2017 +0100
Committer: Alex Rudyy <oru...@apache.org>
Committed: Thu May 25 23:36:53 2017 +0100

----------------------------------------------------------------------
 .../management/plugin/HttpManagement.java       | 75 +++++++++++++++++++-
 .../transport/websocket/WebSocketProvider.java  | 67 ++++++++++++++++-
 2 files changed, 139 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/35bc1cd0/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java
----------------------------------------------------------------------
diff --git 
a/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java
 
b/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java
index ea1a76b..c67dbcf 100644
--- 
a/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java
+++ 
b/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java
@@ -34,6 +34,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
@@ -62,12 +63,14 @@ import org.eclipse.jetty.servlet.FilterHolder;
 import org.eclipse.jetty.servlet.ServletContextHandler;
 import org.eclipse.jetty.servlet.ServletHolder;
 import org.eclipse.jetty.servlets.CrossOriginFilter;
+import org.eclipse.jetty.util.annotation.Name;
 import org.eclipse.jetty.util.ssl.SslContextFactory;
 import org.eclipse.jetty.util.thread.ExecutorThreadPool;
 import org.eclipse.jetty.util.thread.QueuedThreadPool;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.server.configuration.IllegalConfigurationException;
 import org.apache.qpid.server.logging.messages.ManagementConsoleMessages;
 import org.apache.qpid.server.logging.messages.PortMessages;
@@ -482,8 +485,8 @@ public class HttpManagement extends 
AbstractPluginAdapter<HttpManagement> implem
         }
 
         ServerConnector connector = new ServerConnector(server,
-                                                        new 
QueuedThreadPool(port.getThreadPoolMaximum(),
-                                                                             
port.getThreadPoolMinimum()),
+                                                        new 
QBBTrackingThreadPool(port.getThreadPoolMaximum(),
+                                                                               
   port.getThreadPoolMinimum()),
                                                         null,
                                                         null,
                                                         
port.getDesiredNumberOfAcceptors(),
@@ -825,4 +828,72 @@ public class HttpManagement extends 
AbstractPluginAdapter<HttpManagement> implem
             }
         }
     }
+
+    private static class QBBTrackingThreadPool extends QueuedThreadPool
+    {
+        private final Map<Thread, QpidByteBuffer> _cachedBufferMap = new 
ConcurrentHashMap<>();
+
+        public QBBTrackingThreadPool(@Name("maxThreads") final int maxThreads, 
@Name("minThreads") final int minThreads)
+        {
+            super(maxThreads, minThreads);
+        }
+
+        @Override
+        protected void doStop() throws Exception
+        {
+            try
+            {
+                super.doStop();
+            }
+            finally
+            {
+                for (QpidByteBuffer qpidByteBuffer : _cachedBufferMap.values())
+                {
+                    qpidByteBuffer.dispose();
+                }
+                _cachedBufferMap.clear();
+            }
+        }
+
+        @Override
+        protected Thread newThread(final Runnable runnable)
+        {
+            return super.newThread(() ->
+                {
+                    try
+                    {
+                        runnable.run();
+                    }
+                    finally
+                    {
+                        QpidByteBuffer qbb = 
_cachedBufferMap.remove(Thread.currentThread());
+                        if (qbb != null)
+                        {
+                            qbb.dispose();
+                        }
+                    }
+            });
+        }
+
+        @Override
+        protected void runJob(final Runnable job)
+        {
+            try
+            {
+                super.runJob(job);
+            }
+            finally
+            {
+                final QpidByteBuffer cachedThreadLocalBuffer = 
QpidByteBuffer.getCachedThreadLocalBuffer();
+                if (cachedThreadLocalBuffer != null)
+                {
+                    _cachedBufferMap.put(Thread.currentThread(), 
cachedThreadLocalBuffer);
+                }
+                else
+                {
+                    _cachedBufferMap.remove(Thread.currentThread());
+                }
+            }
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/35bc1cd0/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
----------------------------------------------------------------------
diff --git 
a/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
 
b/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
index 1240716..7dc4e4e 100644
--- 
a/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
+++ 
b/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
@@ -30,7 +30,9 @@ import java.security.cert.X509Certificate;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -48,7 +50,9 @@ import org.eclipse.jetty.server.Request;
 import org.eclipse.jetty.server.Server;
 import org.eclipse.jetty.server.ServerConnector;
 import org.eclipse.jetty.server.handler.AbstractHandler;
+import org.eclipse.jetty.util.annotation.Name;
 import org.eclipse.jetty.util.ssl.SslContextFactory;
+import org.eclipse.jetty.util.thread.QueuedThreadPool;
 import org.eclipse.jetty.util.thread.ThreadPool;
 import org.eclipse.jetty.websocket.api.Session;
 import org.eclipse.jetty.websocket.api.WebSocketAdapter;
@@ -122,7 +126,68 @@ class WebSocketProvider implements AcceptingTransport
     {
         _idleTimeoutChecker.start();
 
-        _server = new Server();
+        _server = new Server(new QueuedThreadPool()
+        {
+            private final Map<Thread, QpidByteBuffer> _cachedBufferMap = new 
ConcurrentHashMap<>();
+
+            @Override
+            protected void doStop() throws Exception
+            {
+                try
+                {
+                    super.doStop();
+                }
+                finally
+                {
+                    for (QpidByteBuffer qpidByteBuffer : 
_cachedBufferMap.values())
+                    {
+                        qpidByteBuffer.dispose();
+                    }
+                    _cachedBufferMap.clear();
+                }
+            }
+
+            @Override
+            protected Thread newThread(final Runnable runnable)
+            {
+                return super.newThread(() ->
+                                       {
+                                           try
+                                           {
+                                               runnable.run();
+                                           }
+                                           finally
+                                           {
+                                               QpidByteBuffer qbb = 
_cachedBufferMap.remove(Thread.currentThread());
+                                               if (qbb != null)
+                                               {
+                                                   qbb.dispose();
+                                               }
+                                           }
+                                       });
+            }
+
+            @Override
+            protected void runJob(final Runnable job)
+            {
+                try
+                {
+                    super.runJob(job);
+                }
+                finally
+                {
+                    final QpidByteBuffer cachedThreadLocalBuffer = 
QpidByteBuffer.getCachedThreadLocalBuffer();
+                    if (cachedThreadLocalBuffer != null)
+                    {
+                        _cachedBufferMap.put(Thread.currentThread(), 
cachedThreadLocalBuffer);
+                    }
+                    else
+                    {
+                        _cachedBufferMap.remove(Thread.currentThread());
+                    }
+                }
+            }
+        });
 
         final ServerConnector connector;
         HttpConnectionFactory httpConnectionFactory = new 
HttpConnectionFactory();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to