Repository: qpid-broker-j Updated Branches: refs/heads/6.0.x e2c3146ef -> 5502628ec
QPID-7791: Ensure that threads used by the recoverer dispose thread-locally cached QBBs Cherry picked from 894c0b644f98f948e5cc245b3ad483a295287171. Conflicts resolved by hand. Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/5502628e Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/5502628e Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/5502628e Branch: refs/heads/6.0.x Commit: 5502628ecb7be0e3a5af37d6358a90df27fb272e Parents: e2c3146 Author: Keith Wall <kw...@apache.org> Authored: Wed Jun 21 11:19:34 2017 +0100 Committer: Keith Wall <kw...@apache.org> Committed: Wed Jun 21 13:53:54 2017 +0100 ---------------------------------------------------------------------- .../configuration/updater/TaskExecutorImpl.java | 24 +++--- ...idByteBufferDisposingThreadPoolExecutor.java | 81 ++++++++++++++++++++ .../transport/NetworkConnectionScheduler.java | 37 +-------- .../AsynchronousMessageStoreRecoverer.java | 9 ++- 4 files changed, 106 insertions(+), 45 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/5502628e/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutorImpl.java ---------------------------------------------------------------------- diff --git a/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutorImpl.java b/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutorImpl.java index 56a6279..1ed4110 100644 --- a/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutorImpl.java +++ b/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutorImpl.java @@ -31,8 +31,8 @@ import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RunnableFuture; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; @@ -44,6 +44,7 @@ import javax.security.auth.Subject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.qpid.server.pool.QpidByteBufferDisposingThreadPoolExecutor; import org.apache.qpid.server.util.ServerScopedRuntimeException; public class TaskExecutorImpl implements TaskExecutor @@ -81,15 +82,18 @@ public class TaskExecutorImpl implements TaskExecutor if (_running.compareAndSet(false, true)) { LOGGER.debug("Starting task executor {}", _name); - _executor = Executors.newFixedThreadPool(1, new ThreadFactory() - { - @Override - public Thread newThread(Runnable r) - { - _taskThread = new TaskThread(r, _name, TaskExecutorImpl.this); - return _taskThread; - } - }); + _executor = new QpidByteBufferDisposingThreadPoolExecutor(1, 1, + 0L, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue<Runnable>(), + new ThreadFactory() + { + @Override + public Thread newThread(Runnable r) + { + _taskThread = new TaskThread(r, _name, TaskExecutorImpl.this); + return _taskThread; + } + }); LOGGER.debug("Task executor is started"); } } http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/5502628e/broker-core/src/main/java/org/apache/qpid/server/pool/QpidByteBufferDisposingThreadPoolExecutor.java ---------------------------------------------------------------------- diff --git a/broker-core/src/main/java/org/apache/qpid/server/pool/QpidByteBufferDisposingThreadPoolExecutor.java b/broker-core/src/main/java/org/apache/qpid/server/pool/QpidByteBufferDisposingThreadPoolExecutor.java new file mode 100644 index 0000000..77ef166 --- /dev/null +++ b/broker-core/src/main/java/org/apache/qpid/server/pool/QpidByteBufferDisposingThreadPoolExecutor.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.server.pool; + +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import org.apache.qpid.bytebuffer.QpidByteBuffer; + +public class QpidByteBufferDisposingThreadPoolExecutor extends ThreadPoolExecutor +{ + private final Map<Thread, QpidByteBuffer> _cachedBufferMap = new ConcurrentHashMap<>(); + + public QpidByteBufferDisposingThreadPoolExecutor(final int corePoolSize, + final int maximumPoolSize, + final long keepAliveTime, + final TimeUnit unit, + final BlockingQueue<Runnable> workQueue) + { + this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory()); + } + + public QpidByteBufferDisposingThreadPoolExecutor(final int corePoolSize, + final int maximumPoolSize, + final long keepAliveTime, + final TimeUnit unit, + final BlockingQueue<Runnable> workQueue, + final ThreadFactory factory) + { + super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, factory); + } + + @Override + protected void afterExecute(final Runnable r, final Throwable t) + { + super.afterExecute(r, t); + final QpidByteBuffer cachedThreadLocalBuffer = QpidByteBuffer.getCachedThreadLocalBuffer(); + if (cachedThreadLocalBuffer != null) + { + _cachedBufferMap.put(Thread.currentThread(), cachedThreadLocalBuffer); + } + else + { + _cachedBufferMap.remove(Thread.currentThread()); + } + } + + @Override + protected void terminated() + { + super.terminated(); + for (QpidByteBuffer qpidByteBuffer : _cachedBufferMap.values()) + { + qpidByteBuffer.dispose(); + } + _cachedBufferMap.clear(); + } +} http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/5502628e/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java ---------------------------------------------------------------------- diff --git a/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java b/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java index 2745809..d1046de 100644 --- a/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java +++ b/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java @@ -34,9 +34,9 @@ import java.util.concurrent.atomic.AtomicInteger; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.qpid.bytebuffer.QpidByteBuffer; import org.apache.qpid.transport.TransportException; +import org.apache.qpid.server.pool.QpidByteBufferDisposingThreadPoolExecutor; public class NetworkConnectionScheduler { @@ -101,38 +101,9 @@ public class NetworkConnectionScheduler try { _selectorThread = new SelectorThread(this, _numberOfSelectors); - _executor = new ThreadPoolExecutor(_poolSize, _poolSize, - _threadKeepAliveTimeout, TimeUnit.MINUTES, - new LinkedBlockingQueue<Runnable>(), _factory) - { - private final Map<Thread, QpidByteBuffer> _cachedBufferMap = new ConcurrentHashMap<>(); - - @Override - protected void afterExecute(final Runnable r, final Throwable t) - { - super.afterExecute(r, t); - final QpidByteBuffer cachedThreadLocalBuffer = QpidByteBuffer.getCachedThreadLocalBuffer(); - if (cachedThreadLocalBuffer != null) - { - _cachedBufferMap.put(Thread.currentThread(), cachedThreadLocalBuffer); - } - else - { - _cachedBufferMap.remove(Thread.currentThread()); - } - } - - @Override - protected void terminated() - { - super.terminated(); - for (QpidByteBuffer qpidByteBuffer : _cachedBufferMap.values()) - { - qpidByteBuffer.dispose(); - } - _cachedBufferMap.clear(); - } - }; + _executor = new QpidByteBufferDisposingThreadPoolExecutor(_poolSize, _poolSize, + _threadKeepAliveTimeout, TimeUnit.MINUTES, + new LinkedBlockingQueue<Runnable>(), _factory); _executor.prestartAllCoreThreads(); _executor.allowCoreThreadTimeOut(true); for(int i = 0 ; i < _poolSize; i++) http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/5502628e/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java ---------------------------------------------------------------------- diff --git a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java index cd20bf3..d7933b8 100644 --- a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java +++ b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java @@ -26,7 +26,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.CopyOnWriteArraySet; -import java.util.concurrent.Executors; +import java.util.concurrent.SynchronousQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -58,6 +58,7 @@ import org.apache.qpid.server.txn.DtxBranch; import org.apache.qpid.server.txn.DtxRegistry; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.util.Action; +import org.apache.qpid.server.pool.QpidByteBufferDisposingThreadPoolExecutor; import org.apache.qpid.server.util.ServerScopedRuntimeException; import org.apache.qpid.transport.Xid; import org.apache.qpid.transport.util.Functions; @@ -97,7 +98,11 @@ public class AsynchronousMessageStoreRecoverer implements MessageStoreRecoverer private final Set<AMQQueue<?>> _recoveringQueues = new CopyOnWriteArraySet<>(); private final AtomicBoolean _recoveryComplete = new AtomicBoolean(); private final Map<Long, MessageReference<? extends ServerMessage<?>>> _recoveredMessages = new HashMap<>(); - private final ListeningExecutorService _queueRecoveryExecutor = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool()); + private final ListeningExecutorService _queueRecoveryExecutor = + MoreExecutors.listeningDecorator(new QpidByteBufferDisposingThreadPoolExecutor(0, Integer.MAX_VALUE, + 60L, TimeUnit.SECONDS, + new SynchronousQueue<Runnable>())); + private final MessageStore.MessageStoreReader _storeReader; private AtomicBoolean _continueRecovery = new AtomicBoolean(true); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org