Author: kwall Date: Wed Nov 5 13:32:54 2014 New Revision: 1636871 URL: http://svn.apache.org/r1636871 Log: QPID-6214: [Java Broker] Change asynch recoverer to allow for task cancellation
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/MessageStoreRecoverer.java qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecoverer.java Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java?rev=1636871&r1=1636870&r2=1636871&view=diff ============================================================================== --- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java (original) +++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java Wed Nov 5 13:32:54 2014 @@ -161,7 +161,7 @@ public abstract class AbstractVirtualHos private MessageDestination _defaultDestination; private MessageStore _messageStore; - + private MessageStoreRecoverer _messageStoreRecoverer; public AbstractVirtualHost(final Map<String, Object> attributes, VirtualHostNode<?> virtualHostNode) { @@ -688,6 +688,11 @@ public abstract class AbstractVirtualHos { try { + if (_messageStoreRecoverer != null) + { + _messageStoreRecoverer.cancel(); + } + getMessageStore().closeMessageStore(); } catch (StoreException e) @@ -1411,16 +1416,15 @@ public abstract class AbstractVirtualHos createDefaultExchanges(); } - MessageStoreRecoverer messageStoreRecoverer; if(getContextValue(Boolean.class, USE_ASYNC_RECOVERY)) { - messageStoreRecoverer = new AsynchronousMessageStoreRecoverer(); + _messageStoreRecoverer = new AsynchronousMessageStoreRecoverer(); } else { - messageStoreRecoverer = new SynchronousMessageStoreRecoverer(); + _messageStoreRecoverer = new SynchronousMessageStoreRecoverer(); } - messageStoreRecoverer.recover(this); + _messageStoreRecoverer.recover(this); State finalState = State.ERRORED; Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java?rev=1636871&r1=1636870&r2=1636871&view=diff ============================================================================== --- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java (original) +++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java Wed Nov 5 13:32:54 2014 @@ -27,6 +27,9 @@ import java.util.Map; import java.util.Set; import java.util.UUID; import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.log4j.Logger; @@ -56,17 +59,28 @@ import org.apache.qpid.transport.util.Fu public class AsynchronousMessageStoreRecoverer implements MessageStoreRecoverer { private static final Logger _logger = Logger.getLogger(AsynchronousMessageStoreRecoverer.class); + private AsynchronousRecoverer _asynchronousRecoverer; @Override public void recover(final VirtualHostImpl virtualHost) { - AsynchronousRecoverer asynchronousRecoverer = new AsynchronousRecoverer(virtualHost); + _asynchronousRecoverer = new AsynchronousRecoverer(virtualHost); - asynchronousRecoverer.recover(); + _asynchronousRecoverer.recover(); + } + + @Override + public void cancel() + { + if (_asynchronousRecoverer != null) + { + _asynchronousRecoverer.cancel(); + } } private static class AsynchronousRecoverer { + public static final int THREAD_POOL_SHUTDOWN_TIMEOUT = 5000; private final VirtualHostImpl<?, ?, ?> _virtualHost; private final EventLogger _eventLogger; private final MessageStore _store; @@ -75,7 +89,8 @@ public class AsynchronousMessageStoreRec private final Set<AMQQueue<?>> _recoveringQueues = new CopyOnWriteArraySet<>(); private final AtomicBoolean _recoveryComplete = new AtomicBoolean(); private final Map<Long, MessageReference<? extends ServerMessage<?>>> _recoveredMessages = new HashMap<>(); - + private final ExecutorService _queueRecoveryExecutor = Executors.newCachedThreadPool(); + private AtomicBoolean _continueRecovery = new AtomicBoolean(true); private AsynchronousRecoverer(final VirtualHostImpl<?, ?, ?> virtualHost) { @@ -95,8 +110,7 @@ public class AsynchronousMessageStoreRec for(AMQQueue<?> queue : _recoveringQueues) { - Thread queueThread = new Thread(new QueueRecoveringTask(queue), "Queue Recoverer : " + queue.getName() + " (vh: " + getVirtualHost().getName() + ")"); - queueThread.start(); + _queueRecoveryExecutor.submit(new QueueRecoveringTask(queue)); } } @@ -161,14 +175,18 @@ public class AsynchronousMessageStoreRec { messagesToDelete.add(storedMessage); } - return messageNumber <_maxMessageId-1; + return _continueRecovery.get() && messageNumber <_maxMessageId-1; } }); for(StoredMessage<?> storedMessage : messagesToDelete) { - - _logger.info("Message id " + storedMessage.getMessageNumber() + " in store, but not in any queue - removing...."); - storedMessage.remove(); + if (_continueRecovery.get()) + { + _logger.info("Message id " + + storedMessage.getMessageNumber() + + " in store, but not in any queue - removing...."); + storedMessage.remove(); + } } messagesToDelete.clear(); @@ -198,6 +216,25 @@ public class AsynchronousMessageStoreRec return ref == null ? null : ref.getMessage(); } + public void cancel() + { + _continueRecovery.set(false); + _queueRecoveryExecutor.shutdown(); + try + { + boolean wasShutdown = _queueRecoveryExecutor.awaitTermination(THREAD_POOL_SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS); + if (!wasShutdown) + { + _logger.warn("Failed to gracefully shutdown queue recovery executor within permitted time period"); + _queueRecoveryExecutor.shutdownNow(); + } + } + catch (InterruptedException e) + { + Thread.currentThread().interrupt(); + } + } + private class DistributedTransactionVisitor implements DistributedTransactionHandler { @@ -335,7 +372,7 @@ public class AsynchronousMessageStoreRec branch.setState(DtxBranch.State.PREPARED); branch.prePrepareTransaction(); - return true; + return _continueRecovery.get(); } private StringBuilder xidAsString(Xid id) @@ -364,7 +401,17 @@ public class AsynchronousMessageStoreRec @Override public void run() { - recoverQueue(_queue); + String originalThreadName = Thread.currentThread().getName(); + Thread.currentThread().setName("Queue Recoverer : " + _queue.getName() + " (vh: " + getVirtualHost().getName() + ")"); + + try + { + recoverQueue(_queue); + } + finally + { + Thread.currentThread().setName(originalThreadName); + } } } @@ -408,7 +455,7 @@ public class AsynchronousMessageStoreRec txn.dequeueMessage(_queue, new DummyMessage(messageId)); txn.commitTranAsync(); } - return true; + return _continueRecovery.get(); } else { Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/MessageStoreRecoverer.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/MessageStoreRecoverer.java?rev=1636871&r1=1636870&r2=1636871&view=diff ============================================================================== --- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/MessageStoreRecoverer.java (original) +++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/MessageStoreRecoverer.java Wed Nov 5 13:32:54 2014 @@ -23,4 +23,10 @@ package org.apache.qpid.server.virtualho public interface MessageStoreRecoverer { void recover(VirtualHostImpl virtualHost); + + /** + * Cancels any in-progress message store recovery. If message store recovery has already + * completed, this method call has no effect. + */ + void cancel(); } Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecoverer.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecoverer.java?rev=1636871&r1=1636870&r2=1636871&view=diff ============================================================================== --- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecoverer.java (original) +++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecoverer.java Wed Nov 5 13:32:54 2014 @@ -113,6 +113,12 @@ public class SynchronousMessageStoreReco } + @Override + public void cancel() + { + // No-op + } + private static class MessageVisitor implements MessageHandler { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org