Author: gtully
Date: Mon Jan 16 12:47:45 2012
New Revision: 1231979

URL: http://svn.apache.org/viewvc?rev=1231979&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3639 - Modify MKahaDB To Support 
Using One Adapter Per Destination Without Explicitly Listing Every Destination 
In The Configuration. Add perDestination boolean attribute to mKahaDB filtered 
adapter. When true, every destination will get its own persistence adapter 
using the configured adapter as a template, so any config applied to the 
destination-less (default) adapter will be reused.

Added:
    
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/AutoStorePerDestinationTest.java
   (with props)
Modified:
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/FilteredKahaDBPersistenceAdapter.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java
    
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/StorePerDestinationTest.java

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/FilteredKahaDBPersistenceAdapter.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/FilteredKahaDBPersistenceAdapter.java?rev=1231979&r1=1231978&r2=1231979&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/FilteredKahaDBPersistenceAdapter.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/FilteredKahaDBPersistenceAdapter.java
 Mon Jan 16 12:47:45 2012
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.store.kahadb;
 
+import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.filter.DestinationMapEntry;
 
 /**
@@ -24,6 +25,16 @@ import org.apache.activemq.filter.Destin
  */
 public class FilteredKahaDBPersistenceAdapter extends DestinationMapEntry {
     private KahaDBPersistenceAdapter persistenceAdapter;
+    private boolean perDestination;
+
+    public FilteredKahaDBPersistenceAdapter() {
+        super();
+    }
+
+    public FilteredKahaDBPersistenceAdapter(ActiveMQDestination destination, 
KahaDBPersistenceAdapter adapter) {
+        setDestination(destination);
+        persistenceAdapter  = adapter;
+    }
 
     public KahaDBPersistenceAdapter getPersistenceAdapter() {
         return persistenceAdapter;
@@ -37,4 +48,12 @@ public class FilteredKahaDBPersistenceAd
     public void afterPropertiesSet() throws Exception {
         // ok to have no destination, we default it
     }
+
+    public boolean isPerDestination() {
+        return perDestination;
+    }
+
+    public void setPerDestination(boolean perDestination) {
+        this.perDestination = perDestination;
+    }
 }

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java?rev=1231979&r1=1231978&r2=1231979&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
 Mon Jan 16 12:47:45 2012
@@ -241,7 +241,7 @@ public class KahaDBPersistenceAdapter im
     }
     
     public int getFailoverProducersAuditDepth() {
-        return this.getFailoverProducersAuditDepth();
+        return this.letter.getFailoverProducersAuditDepth();
     }
     
     /**
@@ -558,7 +558,7 @@ public class KahaDBPersistenceAdapter im
     }
 
     public boolean isEnableIndexPageCaching() {
-        return isEnableIndexPageCaching();
+        return letter.isEnableIndexPageCaching();
     }
 
     public KahaDBStore getStore() {

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java?rev=1231979&r1=1231978&r2=1231979&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java
 Mon Jan 16 12:47:45 2012
@@ -17,11 +17,14 @@
 package org.apache.activemq.store.kahadb;
 
 import java.io.File;
+import java.io.FileFilter;
 import java.io.IOException;
 import java.nio.charset.Charset;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.BrokerServiceAware;
@@ -44,6 +47,7 @@ import org.apache.activemq.store.kahadb.
 import org.apache.activemq.store.kahadb.data.KahaXATransactionId;
 import org.apache.activemq.usage.SystemUsage;
 import org.apache.activemq.util.IOHelper;
+import org.apache.activemq.util.IntrospectionSupport;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -103,12 +107,16 @@ public class MultiKahaDBPersistenceAdapt
             if (filteredAdapter.getDestination() == null) {
                 filteredAdapter.setDestination(matchAll);
             }
-            if 
(MessageDatabase.DEFAULT_DIRECTORY.equals(adapter.getDirectory())) {
-                adapter.setDirectory(new File(getDirectory(), 
nameFromDestinationFilter(filteredAdapter.getDestination())));
+
+            if (filteredAdapter.isPerDestination()) {
+                configureDirectory(adapter, null);
+                // per destination adapters will be created on demand or 
during recovery
+                continue;
+            } else {
+                configureDirectory(adapter, 
nameFromDestinationFilter(filteredAdapter.getDestination()));
             }
 
-            // need a per store factory that will put the store in the branch 
qualifier to disiambiguate xid mbeans
-            
adapter.getStore().setTransactionIdTransformer(transactionIdTransformer);
+            configureAdapter(adapter);
             adapters.add(adapter);
         }
         super.setEntries(entries);
@@ -147,9 +155,27 @@ public class MultiKahaDBPersistenceAdapt
         if (result == null) {
             throw new RuntimeException("No matching persistence adapter 
configured for destination: " + destination + ", options:" + adapters);
         }
+        FilteredKahaDBPersistenceAdapter filteredAdapter = 
(FilteredKahaDBPersistenceAdapter) result;
+        if (filteredAdapter.getDestination() == matchAll && 
filteredAdapter.isPerDestination()) {
+            result = addAdapter(filteredAdapter, destination);
+            startAdapter(((FilteredKahaDBPersistenceAdapter) 
result).getPersistenceAdapter(), destination.getQualifiedName());
+            if (LOG.isTraceEnabled()) {
+                LOG.info("created per destination adapter for: " + destination 
 + ", " + result);
+            }
+        }
         return ((FilteredKahaDBPersistenceAdapter) 
result).getPersistenceAdapter();
     }
 
+    private void startAdapter(KahaDBPersistenceAdapter 
kahaDBPersistenceAdapter, String destination) {
+        try {
+            kahaDBPersistenceAdapter.start();
+        } catch (Exception e) {
+            RuntimeException detail = new RuntimeException("Failed to start 
per destination persistence adapter for destination: " + destination + ", 
options:" + adapters, e);
+            LOG.error(detail.toString(), e);
+            throw detail;
+        }
+    }
+
     public TopicMessageStore createTopicMessageStore(ActiveMQTopic 
destination) throws IOException {
         PersistenceAdapter persistenceAdapter = 
getMatchingPersistenceAdapter(destination);
         return 
transactionStore.proxy(persistenceAdapter.createTransactionStore(), 
persistenceAdapter.createTopicMessageStore(destination));
@@ -164,6 +190,7 @@ public class MultiKahaDBPersistenceAdapt
             persistenceAdapter.deleteAllMessages();
         }
         transactionStore.deleteAllMessages();
+        IOHelper.deleteChildren(getDirectory());
     }
 
     public Set<ActiveMQDestination> getDestinations() {
@@ -223,11 +250,86 @@ public class MultiKahaDBPersistenceAdapt
     }
 
     public void start() throws Exception {
+        Object result = this.chooseValue(matchAll);
+        if (result != null) {
+            FilteredKahaDBPersistenceAdapter filteredAdapter = 
(FilteredKahaDBPersistenceAdapter) result;
+            if (filteredAdapter.getDestination() == matchAll && 
filteredAdapter.isPerDestination()) {
+                findAndRegisterExistingAdapters(filteredAdapter);
+            }
+        }
         for (PersistenceAdapter persistenceAdapter : adapters) {
             persistenceAdapter.start();
         }
     }
 
+    private void 
findAndRegisterExistingAdapters(FilteredKahaDBPersistenceAdapter template) {
+        FileFilter destinationNames = new FileFilter() {
+            @Override
+            public boolean accept(File file) {
+                return file.getName().startsWith("queue#") || 
file.getName().startsWith("topic#");
+            }
+        };
+        File[] candidates = 
template.getPersistenceAdapter().getDirectory().listFiles(destinationNames);
+        if (candidates != null) {
+            for (File candidate : candidates) {
+                registerExistingAdapter(template, candidate);
+            }
+        }
+    }
+
+    private void registerExistingAdapter(FilteredKahaDBPersistenceAdapter 
filteredAdapter, File candidate) {
+        KahaDBPersistenceAdapter adapter = 
adapterFromTemplate(filteredAdapter.getPersistenceAdapter(), 
candidate.getName());
+        startAdapter(adapter, candidate.getName());
+        registerAdapter(adapter, adapter.getDestinations().toArray(new 
ActiveMQDestination[]{})[0]);
+    }
+
+    private FilteredKahaDBPersistenceAdapter 
addAdapter(FilteredKahaDBPersistenceAdapter filteredAdapter, 
ActiveMQDestination destination) {
+        KahaDBPersistenceAdapter adapter = 
adapterFromTemplate(filteredAdapter.getPersistenceAdapter(), 
nameFromDestinationFilter(destination));
+        return registerAdapter(adapter, destination);
+    }
+
+    private KahaDBPersistenceAdapter 
adapterFromTemplate(KahaDBPersistenceAdapter template, String destinationName) {
+        KahaDBPersistenceAdapter adapter = kahaDBFromTemplate(template);
+        configureAdapter(adapter);
+        configureDirectory(adapter, destinationName);
+        return adapter;
+    }
+
+    private void configureDirectory(KahaDBPersistenceAdapter adapter, String 
fileName) {
+        File directory = null;
+        if (MessageDatabase.DEFAULT_DIRECTORY.equals(adapter.getDirectory())) {
+            // not set so inherit from mkahadb
+            directory = getDirectory();
+        } else {
+            directory = adapter.getDirectory();
+        }
+        if (fileName != null) {
+            directory = new File(directory, fileName);
+        }
+        adapter.setDirectory(directory);
+    }
+
+    private FilteredKahaDBPersistenceAdapter 
registerAdapter(KahaDBPersistenceAdapter adapter, ActiveMQDestination 
destination) {
+        adapters.add(adapter);
+        FilteredKahaDBPersistenceAdapter result = new 
FilteredKahaDBPersistenceAdapter(destination, adapter);
+        put(destination, result);
+        return result;
+    }
+
+    private void configureAdapter(KahaDBPersistenceAdapter adapter) {
+        // need a per store factory that will put the store in the branch 
qualifier to disiambiguate xid mbeans
+        
adapter.getStore().setTransactionIdTransformer(transactionIdTransformer);
+        adapter.setBrokerService(getBrokerService());
+    }
+
+    private KahaDBPersistenceAdapter 
kahaDBFromTemplate(KahaDBPersistenceAdapter template) {
+        Map<String, Object> configuration = new HashMap<String, Object>();
+        IntrospectionSupport.getProperties(template, configuration, null);
+        KahaDBPersistenceAdapter adapter = new KahaDBPersistenceAdapter();
+        IntrospectionSupport.setProperties(adapter, configuration);
+        return adapter;
+    }
+
     public void stop() throws Exception {
         for (PersistenceAdapter persistenceAdapter : adapters) {
             persistenceAdapter.stop();
@@ -284,7 +386,7 @@ public class MultiKahaDBPersistenceAdapt
         transactionStore.setJournalMaxWriteBatchSize(journalWriteBatchSize);
     }
 
-    public int getJournalMaxWriteBatchSize() {
+    public int getJournalWriteBatchSize() {
         return transactionStore.getJournalMaxWriteBatchSize();
     }
 

Added: 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/AutoStorePerDestinationTest.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/AutoStorePerDestinationTest.java?rev=1231979&view=auto
==============================================================================
--- 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/AutoStorePerDestinationTest.java
 (added)
+++ 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/AutoStorePerDestinationTest.java
 Mon Jan 16 12:47:45 2012
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store;
+
+import java.util.ArrayList;
+import org.apache.activemq.store.kahadb.FilteredKahaDBPersistenceAdapter;
+import org.apache.activemq.store.kahadb.MultiKahaDBPersistenceAdapter;
+
+public class AutoStorePerDestinationTest extends StorePerDestinationTest {
+
+    // use perDestinationFlag to get multiple stores from one match all adapter
+    public void prepareBrokerWithMultiStore(boolean deleteAllMessages) throws 
Exception {
+
+        MultiKahaDBPersistenceAdapter multiKahaDBPersistenceAdapter = new 
MultiKahaDBPersistenceAdapter();
+        if (deleteAllMessages) {
+            multiKahaDBPersistenceAdapter.deleteAllMessages();
+        }
+        ArrayList<FilteredKahaDBPersistenceAdapter> adapters = new 
ArrayList<FilteredKahaDBPersistenceAdapter>();
+
+        FilteredKahaDBPersistenceAdapter template = new 
FilteredKahaDBPersistenceAdapter();
+        template.setPersistenceAdapter(createStore(deleteAllMessages));
+        template.setPerDestination(true);
+        adapters.add(template);
+
+        multiKahaDBPersistenceAdapter.setFilteredPersistenceAdapters(adapters);
+        brokerService  = createBroker(multiKahaDBPersistenceAdapter);
+    }
+}
\ No newline at end of file

Propchange: 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/AutoStorePerDestinationTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/AutoStorePerDestinationTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/StorePerDestinationTest.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/StorePerDestinationTest.java?rev=1231979&r1=1231978&r2=1231979&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/StorePerDestinationTest.java
 (original)
+++ 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/StorePerDestinationTest.java
 Mon Jan 16 12:47:45 2012
@@ -62,7 +62,7 @@ public class StorePerDestinationTest  {
         return broker;
     }
 
-    private KahaDBPersistenceAdapter createStore(boolean delete) throws 
IOException {
+    protected KahaDBPersistenceAdapter createStore(boolean delete) throws 
IOException {
         KahaDBPersistenceAdapter kaha = new KahaDBPersistenceAdapter();
         kaha.setJournalMaxFileLength(maxFileLength);
         kaha.setCleanupInterval(5000);
@@ -199,7 +199,7 @@ public class StorePerDestinationTest  {
         multiKahaDBPersistenceAdapter.setFilteredPersistenceAdapters(adapters);
 
         assertEquals(multiKahaDBPersistenceAdapter.getDirectory(), 
storeDefault.getDirectory().getParentFile());
-        assertEquals(someOtherDisk, otherStore.getDirectory());
+        assertEquals(someOtherDisk, otherStore.getDirectory().getParentFile());
     }
 
     @Test


Reply via email to