Repository: qpid-broker-j
Updated Branches:
  refs/heads/6.0.x e2c3146ef -> 5502628ec


QPID-7791: Ensure that threads used by the recoverer dispose thread-locally 
cached QBBs

Cherry picked from 894c0b644f98f948e5cc245b3ad483a295287171.  Conflicts 
resolved by hand.


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/5502628e
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/5502628e
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/5502628e

Branch: refs/heads/6.0.x
Commit: 5502628ecb7be0e3a5af37d6358a90df27fb272e
Parents: e2c3146
Author: Keith Wall <kw...@apache.org>
Authored: Wed Jun 21 11:19:34 2017 +0100
Committer: Keith Wall <kw...@apache.org>
Committed: Wed Jun 21 13:53:54 2017 +0100

----------------------------------------------------------------------
 .../configuration/updater/TaskExecutorImpl.java | 24 +++---
 ...idByteBufferDisposingThreadPoolExecutor.java | 81 ++++++++++++++++++++
 .../transport/NetworkConnectionScheduler.java   | 37 +--------
 .../AsynchronousMessageStoreRecoverer.java      |  9 ++-
 4 files changed, 106 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/5502628e/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutorImpl.java
----------------------------------------------------------------------
diff --git 
a/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutorImpl.java
 
b/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutorImpl.java
index 56a6279..1ed4110 100644
--- 
a/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutorImpl.java
+++ 
b/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutorImpl.java
@@ -31,8 +31,8 @@ import java.util.concurrent.CancellationException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.RunnableFuture;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
@@ -44,6 +44,7 @@ import javax.security.auth.Subject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.qpid.server.pool.QpidByteBufferDisposingThreadPoolExecutor;
 import org.apache.qpid.server.util.ServerScopedRuntimeException;
 
 public class TaskExecutorImpl implements TaskExecutor
@@ -81,15 +82,18 @@ public class TaskExecutorImpl implements TaskExecutor
         if (_running.compareAndSet(false, true))
         {
             LOGGER.debug("Starting task executor {}", _name);
-            _executor = Executors.newFixedThreadPool(1, new ThreadFactory()
-            {
-                @Override
-                public Thread newThread(Runnable r)
-                {
-                    _taskThread = new TaskThread(r, _name, 
TaskExecutorImpl.this);
-                    return _taskThread;
-                }
-            });
+            _executor = new QpidByteBufferDisposingThreadPoolExecutor(1, 1,
+                                                                      0L, 
TimeUnit.MILLISECONDS,
+                                                                      new 
LinkedBlockingQueue<Runnable>(),
+                                                                      new 
ThreadFactory()
+                                               {
+                                                   @Override
+                                                   public Thread 
newThread(Runnable r)
+                                                   {
+                                                       _taskThread = new 
TaskThread(r, _name, TaskExecutorImpl.this);
+                                                       return _taskThread;
+                                                   }
+                                               });
             LOGGER.debug("Task executor is started");
         }
     }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/5502628e/broker-core/src/main/java/org/apache/qpid/server/pool/QpidByteBufferDisposingThreadPoolExecutor.java
----------------------------------------------------------------------
diff --git 
a/broker-core/src/main/java/org/apache/qpid/server/pool/QpidByteBufferDisposingThreadPoolExecutor.java
 
b/broker-core/src/main/java/org/apache/qpid/server/pool/QpidByteBufferDisposingThreadPoolExecutor.java
new file mode 100644
index 0000000..77ef166
--- /dev/null
+++ 
b/broker-core/src/main/java/org/apache/qpid/server/pool/QpidByteBufferDisposingThreadPoolExecutor.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.pool;
+
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.qpid.bytebuffer.QpidByteBuffer;
+
+public class QpidByteBufferDisposingThreadPoolExecutor extends 
ThreadPoolExecutor
+{
+    private final Map<Thread, QpidByteBuffer> _cachedBufferMap = new 
ConcurrentHashMap<>();
+
+    public QpidByteBufferDisposingThreadPoolExecutor(final int corePoolSize,
+                                                     final int maximumPoolSize,
+                                                     final long keepAliveTime,
+                                                     final TimeUnit unit,
+                                                     final 
BlockingQueue<Runnable> workQueue)
+    {
+        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, 
Executors.defaultThreadFactory());
+    }
+
+    public QpidByteBufferDisposingThreadPoolExecutor(final int corePoolSize,
+                                                     final int maximumPoolSize,
+                                                     final long keepAliveTime,
+                                                     final TimeUnit unit,
+                                                     final 
BlockingQueue<Runnable> workQueue,
+                                                     final ThreadFactory 
factory)
+    {
+        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, 
factory);
+    }
+
+    @Override
+    protected void afterExecute(final Runnable r, final Throwable t)
+    {
+        super.afterExecute(r, t);
+        final QpidByteBuffer cachedThreadLocalBuffer = 
QpidByteBuffer.getCachedThreadLocalBuffer();
+        if (cachedThreadLocalBuffer != null)
+        {
+            _cachedBufferMap.put(Thread.currentThread(), 
cachedThreadLocalBuffer);
+        }
+        else
+        {
+            _cachedBufferMap.remove(Thread.currentThread());
+        }
+    }
+
+    @Override
+    protected void terminated()
+    {
+        super.terminated();
+        for (QpidByteBuffer qpidByteBuffer : _cachedBufferMap.values())
+        {
+            qpidByteBuffer.dispose();
+        }
+        _cachedBufferMap.clear();
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/5502628e/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java
----------------------------------------------------------------------
diff --git 
a/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java
 
b/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java
index 2745809..d1046de 100644
--- 
a/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java
+++ 
b/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java
@@ -34,9 +34,9 @@ import java.util.concurrent.atomic.AtomicInteger;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.transport.TransportException;
 
+import org.apache.qpid.server.pool.QpidByteBufferDisposingThreadPoolExecutor;
 
 public class NetworkConnectionScheduler
 {
@@ -101,38 +101,9 @@ public class NetworkConnectionScheduler
         try
         {
             _selectorThread = new SelectorThread(this, _numberOfSelectors);
-            _executor = new ThreadPoolExecutor(_poolSize, _poolSize,
-                                               _threadKeepAliveTimeout, 
TimeUnit.MINUTES,
-                                               new 
LinkedBlockingQueue<Runnable>(), _factory)
-            {
-                private final Map<Thread, QpidByteBuffer> _cachedBufferMap = 
new ConcurrentHashMap<>();
-
-                @Override
-                protected void afterExecute(final Runnable r, final Throwable 
t)
-                {
-                    super.afterExecute(r, t);
-                    final QpidByteBuffer cachedThreadLocalBuffer = 
QpidByteBuffer.getCachedThreadLocalBuffer();
-                    if (cachedThreadLocalBuffer != null)
-                    {
-                        _cachedBufferMap.put(Thread.currentThread(), 
cachedThreadLocalBuffer);
-                    }
-                    else
-                    {
-                        _cachedBufferMap.remove(Thread.currentThread());
-                    }
-                }
-
-                @Override
-                protected void terminated()
-                {
-                    super.terminated();
-                    for (QpidByteBuffer qpidByteBuffer : 
_cachedBufferMap.values())
-                    {
-                        qpidByteBuffer.dispose();
-                    }
-                    _cachedBufferMap.clear();
-                }
-            };
+            _executor = new 
QpidByteBufferDisposingThreadPoolExecutor(_poolSize, _poolSize,
+                                                                      
_threadKeepAliveTimeout, TimeUnit.MINUTES,
+                                                                      new 
LinkedBlockingQueue<Runnable>(), _factory);
             _executor.prestartAllCoreThreads();
             _executor.allowCoreThreadTimeOut(true);
             for(int i = 0 ; i < _poolSize; i++)

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/5502628e/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java
----------------------------------------------------------------------
diff --git 
a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java
 
b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java
index cd20bf3..d7933b8 100644
--- 
a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java
+++ 
b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java
@@ -26,7 +26,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.Executors;
+import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -58,6 +58,7 @@ import org.apache.qpid.server.txn.DtxBranch;
 import org.apache.qpid.server.txn.DtxRegistry;
 import org.apache.qpid.server.txn.ServerTransaction;
 import org.apache.qpid.server.util.Action;
+import org.apache.qpid.server.pool.QpidByteBufferDisposingThreadPoolExecutor;
 import org.apache.qpid.server.util.ServerScopedRuntimeException;
 import org.apache.qpid.transport.Xid;
 import org.apache.qpid.transport.util.Functions;
@@ -97,7 +98,11 @@ public class AsynchronousMessageStoreRecoverer implements 
MessageStoreRecoverer
         private final Set<AMQQueue<?>> _recoveringQueues = new 
CopyOnWriteArraySet<>();
         private final AtomicBoolean _recoveryComplete = new AtomicBoolean();
         private final Map<Long, MessageReference<? extends ServerMessage<?>>> 
_recoveredMessages = new HashMap<>();
-        private final ListeningExecutorService _queueRecoveryExecutor = 
MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
+        private final ListeningExecutorService _queueRecoveryExecutor =
+                MoreExecutors.listeningDecorator(new 
QpidByteBufferDisposingThreadPoolExecutor(0, Integer.MAX_VALUE,
+                                                                               
                60L, TimeUnit.SECONDS,
+                                                                               
                new SynchronousQueue<Runnable>()));
+
         private final MessageStore.MessageStoreReader _storeReader;
         private AtomicBoolean _continueRecovery = new AtomicBoolean(true);
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to