Author: kwall
Date: Wed Nov  5 13:32:54 2014
New Revision: 1636871

URL: http://svn.apache.org/r1636871
Log:
QPID-6214: [Java Broker] Change asynch recoverer to allow for task cancellation

Modified:
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/MessageStoreRecoverer.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecoverer.java

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java?rev=1636871&r1=1636870&r2=1636871&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java Wed Nov  5 13:32:54 2014
@@ -161,7 +161,7 @@ public abstract class AbstractVirtualHos
     private MessageDestination _defaultDestination;
 
     private MessageStore _messageStore;
-
+    private MessageStoreRecoverer _messageStoreRecoverer;
 
     public AbstractVirtualHost(final Map<String, Object> attributes, 
VirtualHostNode<?> virtualHostNode)
     {
@@ -688,6 +688,11 @@ public abstract class AbstractVirtualHos
         {
             try
             {
+                if (_messageStoreRecoverer != null)
+                {
+                    _messageStoreRecoverer.cancel();
+                }
+
                 getMessageStore().closeMessageStore();
             }
             catch (StoreException e)
@@ -1411,16 +1416,15 @@ public abstract class AbstractVirtualHos
             createDefaultExchanges();
         }
 
-        MessageStoreRecoverer messageStoreRecoverer;
         if(getContextValue(Boolean.class, USE_ASYNC_RECOVERY))
         {
-            messageStoreRecoverer = new AsynchronousMessageStoreRecoverer();
+            _messageStoreRecoverer = new AsynchronousMessageStoreRecoverer();
         }
         else
         {
-           messageStoreRecoverer = new SynchronousMessageStoreRecoverer();
+           _messageStoreRecoverer = new SynchronousMessageStoreRecoverer();
         }
-        messageStoreRecoverer.recover(this);
+        _messageStoreRecoverer.recover(this);
 
 
         State finalState = State.ERRORED;

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java?rev=1636871&r1=1636870&r2=1636871&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java Wed Nov  5 13:32:54 2014
@@ -27,6 +27,9 @@ import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.log4j.Logger;
@@ -56,17 +59,28 @@ import org.apache.qpid.transport.util.Fu
 public class AsynchronousMessageStoreRecoverer implements MessageStoreRecoverer
 {
     private static final Logger _logger = 
Logger.getLogger(AsynchronousMessageStoreRecoverer.class);
+    private AsynchronousRecoverer _asynchronousRecoverer;
 
     @Override
     public void recover(final VirtualHostImpl virtualHost)
     {
-        AsynchronousRecoverer asynchronousRecoverer = new 
AsynchronousRecoverer(virtualHost);
+        _asynchronousRecoverer = new AsynchronousRecoverer(virtualHost);
 
-        asynchronousRecoverer.recover();
+        _asynchronousRecoverer.recover();
+    }
+
+    @Override
+    public void cancel()
+    {
+        if (_asynchronousRecoverer != null)
+        {
+            _asynchronousRecoverer.cancel();
+        }
     }
 
     private static class AsynchronousRecoverer
     {
+        public static final int THREAD_POOL_SHUTDOWN_TIMEOUT = 5000;
         private final VirtualHostImpl<?, ?, ?> _virtualHost;
         private final EventLogger _eventLogger;
         private final MessageStore _store;
@@ -75,7 +89,8 @@ public class AsynchronousMessageStoreRec
         private final Set<AMQQueue<?>> _recoveringQueues = new 
CopyOnWriteArraySet<>();
         private final AtomicBoolean _recoveryComplete = new AtomicBoolean();
         private final Map<Long, MessageReference<? extends ServerMessage<?>>> 
_recoveredMessages = new HashMap<>();
-
+        private final ExecutorService _queueRecoveryExecutor = 
Executors.newCachedThreadPool();
+        private AtomicBoolean _continueRecovery = new AtomicBoolean(true);
 
         private AsynchronousRecoverer(final VirtualHostImpl<?, ?, ?> 
virtualHost)
         {
@@ -95,8 +110,7 @@ public class AsynchronousMessageStoreRec
 
             for(AMQQueue<?> queue : _recoveringQueues)
             {
-                Thread queueThread = new Thread(new 
QueueRecoveringTask(queue), "Queue Recoverer : " + queue.getName() + " (vh: " + 
getVirtualHost().getName() + ")");
-                queueThread.start();
+                _queueRecoveryExecutor.submit(new QueueRecoveringTask(queue));
             }
         }
 
@@ -161,14 +175,18 @@ public class AsynchronousMessageStoreRec
                     {
                         messagesToDelete.add(storedMessage);
                     }
-                    return messageNumber <_maxMessageId-1;
+                    return _continueRecovery.get() && messageNumber 
<_maxMessageId-1;
                 }
             });
             for(StoredMessage<?> storedMessage : messagesToDelete)
             {
-
-                _logger.info("Message id " + storedMessage.getMessageNumber() 
+ " in store, but not in any queue - removing....");
-                storedMessage.remove();
+                if (_continueRecovery.get())
+                {
+                    _logger.info("Message id "
+                                 + storedMessage.getMessageNumber()
+                                 + " in store, but not in any queue - 
removing....");
+                    storedMessage.remove();
+                }
             }
 
             messagesToDelete.clear();
@@ -198,6 +216,25 @@ public class AsynchronousMessageStoreRec
             return ref == null ? null : ref.getMessage();
         }
 
+        public void cancel()
+        {
+            _continueRecovery.set(false);
+            _queueRecoveryExecutor.shutdown();
+            try
+            {
+                boolean wasShutdown = 
_queueRecoveryExecutor.awaitTermination(THREAD_POOL_SHUTDOWN_TIMEOUT, 
TimeUnit.MILLISECONDS);
+                if (!wasShutdown)
+                {
+                    _logger.warn("Failed to gracefully shutdown queue recovery 
executor within permitted time period");
+                    _queueRecoveryExecutor.shutdownNow();
+                }
+            }
+            catch (InterruptedException e)
+            {
+                Thread.currentThread().interrupt();
+            }
+        }
+
 
         private class DistributedTransactionVisitor implements 
DistributedTransactionHandler
         {
@@ -335,7 +372,7 @@ public class AsynchronousMessageStoreRec
                 branch.setState(DtxBranch.State.PREPARED);
                 branch.prePrepareTransaction();
 
-                return true;
+                return _continueRecovery.get();
             }
 
             private StringBuilder xidAsString(Xid id)
@@ -364,7 +401,17 @@ public class AsynchronousMessageStoreRec
             @Override
             public void run()
             {
-                recoverQueue(_queue);
+                String originalThreadName = Thread.currentThread().getName();
+                Thread.currentThread().setName("Queue Recoverer : " + 
_queue.getName() + " (vh: " + getVirtualHost().getName() + ")");
+
+                try
+                {
+                    recoverQueue(_queue);
+                }
+                finally
+                {
+                    Thread.currentThread().setName(originalThreadName);
+                }
             }
         }
 
@@ -408,7 +455,7 @@ public class AsynchronousMessageStoreRec
                         txn.dequeueMessage(_queue, new 
DummyMessage(messageId));
                         txn.commitTranAsync();
                     }
-                    return true;
+                    return _continueRecovery.get();
                 }
                 else
                 {

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/MessageStoreRecoverer.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/MessageStoreRecoverer.java?rev=1636871&r1=1636870&r2=1636871&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/MessageStoreRecoverer.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/MessageStoreRecoverer.java Wed Nov  5 13:32:54 2014
@@ -23,4 +23,10 @@ package org.apache.qpid.server.virtualho
 public interface MessageStoreRecoverer
 {
     void recover(VirtualHostImpl virtualHost);
+
+    /**
+     * Cancels any in-progress message store recovery.  If message store 
recovery has already
+     * completed, this method call has no effect.
+     */
+    void cancel();
 }

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecoverer.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecoverer.java?rev=1636871&r1=1636870&r2=1636871&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecoverer.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecoverer.java Wed Nov  5 13:32:54 2014
@@ -113,6 +113,12 @@ public class SynchronousMessageStoreReco
 
     }
 
+    @Override
+    public void cancel()
+    {
+        // No-op
+    }
+
     private static class MessageVisitor implements MessageHandler
     {
 



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org

Reply via email to