Author: tabish
Date: Tue May 22 19:47:23 2012
New Revision: 1341601

URL: http://svn.apache.org/viewvc?rev=1341601&view=rev
Log:
Fix for: https://issues.apache.org/jira/browse/AMQ-3841

Ensure that the mKahaDB cleans up the per-destination kahaDB data when the 
destination is deleted, and don't throw exceptions in cases where we find an 
older one that has no destinations in it any longer.

Modified:
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java
    
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3841Test.java

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java?rev=1341601&r1=1341600&r2=1341601&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java
 Tue May 22 19:47:23 2012
@@ -26,6 +26,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.BrokerServiceAware;
 import org.apache.activemq.broker.ConnectionContext;
@@ -176,6 +177,16 @@ public class MultiKahaDBPersistenceAdapt
         }
     }
 
+    private void stopAdapter(KahaDBPersistenceAdapter 
kahaDBPersistenceAdapter, String destination) {
+        try {
+            kahaDBPersistenceAdapter.stop();
+        } catch (Exception e) {
+            RuntimeException detail = new RuntimeException("Failed to stop per 
destination persistence adapter for destination: " + destination + ", options:" 
+ adapters, e);
+            LOG.error(detail.toString(), e);
+            throw detail;
+        }
+    }
+
     public TopicMessageStore createTopicMessageStore(ActiveMQTopic 
destination) throws IOException {
         PersistenceAdapter persistenceAdapter = 
getMatchingPersistenceAdapter(destination);
         return 
transactionStore.proxy(persistenceAdapter.createTransactionStore(), 
persistenceAdapter.createTopicMessageStore(destination));
@@ -218,11 +229,38 @@ public class MultiKahaDBPersistenceAdapt
     }
 
     public void removeQueueMessageStore(ActiveMQQueue destination) {
-        
getMatchingPersistenceAdapter(destination).removeQueueMessageStore(destination);
+        PersistenceAdapter adapter = 
getMatchingPersistenceAdapter(destination);
+        adapter.removeQueueMessageStore(destination);
+        if (adapter instanceof KahaDBPersistenceAdapter) {
+            adapter.removeQueueMessageStore(destination);
+            removeMessageStore((KahaDBPersistenceAdapter)adapter, destination);
+        }
     }
 
     public void removeTopicMessageStore(ActiveMQTopic destination) {
-        
getMatchingPersistenceAdapter(destination).removeTopicMessageStore(destination);
+        PersistenceAdapter adapter = 
getMatchingPersistenceAdapter(destination);
+        if (adapter instanceof KahaDBPersistenceAdapter) {
+            adapter.removeTopicMessageStore(destination);
+            removeMessageStore((KahaDBPersistenceAdapter)adapter, destination);
+        }
+    }
+
+    private void removeMessageStore(KahaDBPersistenceAdapter adapter, 
ActiveMQDestination destination) {
+        if (adapter.getDestinations().isEmpty()) {
+            stopAdapter(adapter, destination.toString());
+            File adapterDir = adapter.getDirectory();
+            if (adapterDir != null) {
+                if (IOHelper.deleteFile(adapterDir)) {
+                    if (LOG.isTraceEnabled()) {
+                        LOG.info("deleted per destination adapter directory 
for: " + destination);
+                    }
+                } else {
+                    if (LOG.isTraceEnabled()) {
+                        LOG.info("failed to deleted per destination adapter 
directory for: " + destination);
+                    }
+                }
+            }
+        }
     }
 
     public void rollbackTransaction(ConnectionContext context) throws 
IOException {
@@ -280,7 +318,11 @@ public class MultiKahaDBPersistenceAdapt
     private void registerExistingAdapter(FilteredKahaDBPersistenceAdapter 
filteredAdapter, File candidate) {
         KahaDBPersistenceAdapter adapter = 
adapterFromTemplate(filteredAdapter.getPersistenceAdapter(), 
candidate.getName());
         startAdapter(adapter, candidate.getName());
-        registerAdapter(adapter, adapter.getDestinations().toArray(new 
ActiveMQDestination[]{})[0]);
+        if (adapter.getDestinations().size() != 0) {
+            registerAdapter(adapter, adapter.getDestinations().toArray(new 
ActiveMQDestination[]{})[0]);
+        } else {
+            stopAdapter(adapter, candidate.getName());
+        }
     }
 
     private FilteredKahaDBPersistenceAdapter 
addAdapter(FilteredKahaDBPersistenceAdapter filteredAdapter, 
ActiveMQDestination destination) {

Modified: 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3841Test.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3841Test.java?rev=1341601&r1=1341600&r2=1341601&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3841Test.java
 (original)
+++ 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3841Test.java
 Tue May 22 19:47:23 2012
@@ -85,6 +85,10 @@ public class AMQ3841Test {
 
         prepareBrokerWithMultiStore(false);
         broker.start();
+
+        broker.getAdminView().addQueue(destinationName);
+        assertNotNull(broker.getDestination(new 
ActiveMQQueue(destinationName)));
+
     }
 
     protected KahaDBPersistenceAdapter createStore(boolean delete) throws 
IOException {


Reply via email to