Author: gtully
Date: Mon Jan 16 12:47:45 2012
New Revision: 1231979
URL: http://svn.apache.org/viewvc?rev=1231979&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3639 - Modify MKahaDB To Support
Using One Adapter Per Destination Without Explicity Listing Every Desintation
In The Configuration. Add perDestination boolean attribute to mKahaDb filtered
adapter. When true, every destination will get its own persistence adapter
using the configured adapter as as template. So any config applied to the
destination less (default) adapter will be reused
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/AutoStorePerDestinationTest.java
(with props)
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/FilteredKahaDBPersistenceAdapter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/StorePerDestinationTest.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/FilteredKahaDBPersistenceAdapter.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/FilteredKahaDBPersistenceAdapter.java?rev=1231979&r1=1231978&r2=1231979&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/FilteredKahaDBPersistenceAdapter.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/FilteredKahaDBPersistenceAdapter.java
Mon Jan 16 12:47:45 2012
@@ -16,6 +16,7 @@
*/
package org.apache.activemq.store.kahadb;
+import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.filter.DestinationMapEntry;
/**
@@ -24,6 +25,16 @@ import org.apache.activemq.filter.Destin
*/
public class FilteredKahaDBPersistenceAdapter extends DestinationMapEntry {
private KahaDBPersistenceAdapter persistenceAdapter;
+ private boolean perDestination;
+
+ public FilteredKahaDBPersistenceAdapter() {
+ super();
+ }
+
+ public FilteredKahaDBPersistenceAdapter(ActiveMQDestination destination,
KahaDBPersistenceAdapter adapter) {
+ setDestination(destination);
+ persistenceAdapter = adapter;
+ }
public KahaDBPersistenceAdapter getPersistenceAdapter() {
return persistenceAdapter;
@@ -37,4 +48,12 @@ public class FilteredKahaDBPersistenceAd
public void afterPropertiesSet() throws Exception {
// ok to have no destination, we default it
}
+
+ public boolean isPerDestination() {
+ return perDestination;
+ }
+
+ public void setPerDestination(boolean perDestination) {
+ this.perDestination = perDestination;
+ }
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java?rev=1231979&r1=1231978&r2=1231979&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
Mon Jan 16 12:47:45 2012
@@ -241,7 +241,7 @@ public class KahaDBPersistenceAdapter im
}
public int getFailoverProducersAuditDepth() {
- return this.getFailoverProducersAuditDepth();
+ return this.letter.getFailoverProducersAuditDepth();
}
/**
@@ -558,7 +558,7 @@ public class KahaDBPersistenceAdapter im
}
public boolean isEnableIndexPageCaching() {
- return isEnableIndexPageCaching();
+ return letter.isEnableIndexPageCaching();
}
public KahaDBStore getStore() {
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java?rev=1231979&r1=1231978&r2=1231979&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java
Mon Jan 16 12:47:45 2012
@@ -17,11 +17,14 @@
package org.apache.activemq.store.kahadb;
import java.io.File;
+import java.io.FileFilter;
import java.io.IOException;
import java.nio.charset.Charset;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware;
@@ -44,6 +47,7 @@ import org.apache.activemq.store.kahadb.
import org.apache.activemq.store.kahadb.data.KahaXATransactionId;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.util.IOHelper;
+import org.apache.activemq.util.IntrospectionSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -103,12 +107,16 @@ public class MultiKahaDBPersistenceAdapt
if (filteredAdapter.getDestination() == null) {
filteredAdapter.setDestination(matchAll);
}
- if
(MessageDatabase.DEFAULT_DIRECTORY.equals(adapter.getDirectory())) {
- adapter.setDirectory(new File(getDirectory(),
nameFromDestinationFilter(filteredAdapter.getDestination())));
+
+ if (filteredAdapter.isPerDestination()) {
+ configureDirectory(adapter, null);
+ // per destination adapters will be created on demand or
during recovery
+ continue;
+ } else {
+ configureDirectory(adapter,
nameFromDestinationFilter(filteredAdapter.getDestination()));
}
- // need a per store factory that will put the store in the branch
qualifier to disiambiguate xid mbeans
-
adapter.getStore().setTransactionIdTransformer(transactionIdTransformer);
+ configureAdapter(adapter);
adapters.add(adapter);
}
super.setEntries(entries);
@@ -147,9 +155,27 @@ public class MultiKahaDBPersistenceAdapt
if (result == null) {
throw new RuntimeException("No matching persistence adapter
configured for destination: " + destination + ", options:" + adapters);
}
+ FilteredKahaDBPersistenceAdapter filteredAdapter =
(FilteredKahaDBPersistenceAdapter) result;
+ if (filteredAdapter.getDestination() == matchAll &&
filteredAdapter.isPerDestination()) {
+ result = addAdapter(filteredAdapter, destination);
+ startAdapter(((FilteredKahaDBPersistenceAdapter)
result).getPersistenceAdapter(), destination.getQualifiedName());
+ if (LOG.isTraceEnabled()) {
+ LOG.info("created per destination adapter for: " + destination
+ ", " + result);
+ }
+ }
return ((FilteredKahaDBPersistenceAdapter)
result).getPersistenceAdapter();
}
+ private void startAdapter(KahaDBPersistenceAdapter
kahaDBPersistenceAdapter, String destination) {
+ try {
+ kahaDBPersistenceAdapter.start();
+ } catch (Exception e) {
+ RuntimeException detail = new RuntimeException("Failed to start
per destination persistence adapter for destination: " + destination + ",
options:" + adapters, e);
+ LOG.error(detail.toString(), e);
+ throw detail;
+ }
+ }
+
public TopicMessageStore createTopicMessageStore(ActiveMQTopic
destination) throws IOException {
PersistenceAdapter persistenceAdapter =
getMatchingPersistenceAdapter(destination);
return
transactionStore.proxy(persistenceAdapter.createTransactionStore(),
persistenceAdapter.createTopicMessageStore(destination));
@@ -164,6 +190,7 @@ public class MultiKahaDBPersistenceAdapt
persistenceAdapter.deleteAllMessages();
}
transactionStore.deleteAllMessages();
+ IOHelper.deleteChildren(getDirectory());
}
public Set<ActiveMQDestination> getDestinations() {
@@ -223,11 +250,86 @@ public class MultiKahaDBPersistenceAdapt
}
public void start() throws Exception {
+ Object result = this.chooseValue(matchAll);
+ if (result != null) {
+ FilteredKahaDBPersistenceAdapter filteredAdapter =
(FilteredKahaDBPersistenceAdapter) result;
+ if (filteredAdapter.getDestination() == matchAll &&
filteredAdapter.isPerDestination()) {
+ findAndRegisterExistingAdapters(filteredAdapter);
+ }
+ }
for (PersistenceAdapter persistenceAdapter : adapters) {
persistenceAdapter.start();
}
}
+ private void
findAndRegisterExistingAdapters(FilteredKahaDBPersistenceAdapter template) {
+ FileFilter destinationNames = new FileFilter() {
+ @Override
+ public boolean accept(File file) {
+ return file.getName().startsWith("queue#") ||
file.getName().startsWith("topic#");
+ }
+ };
+ File[] candidates =
template.getPersistenceAdapter().getDirectory().listFiles(destinationNames);
+ if (candidates != null) {
+ for (File candidate : candidates) {
+ registerExistingAdapter(template, candidate);
+ }
+ }
+ }
+
+ private void registerExistingAdapter(FilteredKahaDBPersistenceAdapter
filteredAdapter, File candidate) {
+ KahaDBPersistenceAdapter adapter =
adapterFromTemplate(filteredAdapter.getPersistenceAdapter(),
candidate.getName());
+ startAdapter(adapter, candidate.getName());
+ registerAdapter(adapter, adapter.getDestinations().toArray(new
ActiveMQDestination[]{})[0]);
+ }
+
+ private FilteredKahaDBPersistenceAdapter
addAdapter(FilteredKahaDBPersistenceAdapter filteredAdapter,
ActiveMQDestination destination) {
+ KahaDBPersistenceAdapter adapter =
adapterFromTemplate(filteredAdapter.getPersistenceAdapter(),
nameFromDestinationFilter(destination));
+ return registerAdapter(adapter, destination);
+ }
+
+ private KahaDBPersistenceAdapter
adapterFromTemplate(KahaDBPersistenceAdapter template, String destinationName) {
+ KahaDBPersistenceAdapter adapter = kahaDBFromTemplate(template);
+ configureAdapter(adapter);
+ configureDirectory(adapter, destinationName);
+ return adapter;
+ }
+
+ private void configureDirectory(KahaDBPersistenceAdapter adapter, String
fileName) {
+ File directory = null;
+ if (MessageDatabase.DEFAULT_DIRECTORY.equals(adapter.getDirectory())) {
+ // not set so inherit from mkahadb
+ directory = getDirectory();
+ } else {
+ directory = adapter.getDirectory();
+ }
+ if (fileName != null) {
+ directory = new File(directory, fileName);
+ }
+ adapter.setDirectory(directory);
+ }
+
+ private FilteredKahaDBPersistenceAdapter
registerAdapter(KahaDBPersistenceAdapter adapter, ActiveMQDestination
destination) {
+ adapters.add(adapter);
+ FilteredKahaDBPersistenceAdapter result = new
FilteredKahaDBPersistenceAdapter(destination, adapter);
+ put(destination, result);
+ return result;
+ }
+
+ private void configureAdapter(KahaDBPersistenceAdapter adapter) {
+ // need a per store factory that will put the store in the branch
qualifier to disiambiguate xid mbeans
+
adapter.getStore().setTransactionIdTransformer(transactionIdTransformer);
+ adapter.setBrokerService(getBrokerService());
+ }
+
+ private KahaDBPersistenceAdapter
kahaDBFromTemplate(KahaDBPersistenceAdapter template) {
+ Map<String, Object> configuration = new HashMap<String, Object>();
+ IntrospectionSupport.getProperties(template, configuration, null);
+ KahaDBPersistenceAdapter adapter = new KahaDBPersistenceAdapter();
+ IntrospectionSupport.setProperties(adapter, configuration);
+ return adapter;
+ }
+
public void stop() throws Exception {
for (PersistenceAdapter persistenceAdapter : adapters) {
persistenceAdapter.stop();
@@ -284,7 +386,7 @@ public class MultiKahaDBPersistenceAdapt
transactionStore.setJournalMaxWriteBatchSize(journalWriteBatchSize);
}
- public int getJournalMaxWriteBatchSize() {
+ public int getJournalWriteBatchSize() {
return transactionStore.getJournalMaxWriteBatchSize();
}
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/AutoStorePerDestinationTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/AutoStorePerDestinationTest.java?rev=1231979&view=auto
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/AutoStorePerDestinationTest.java
(added)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/AutoStorePerDestinationTest.java
Mon Jan 16 12:47:45 2012
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store;
+
+import java.util.ArrayList;
+import org.apache.activemq.store.kahadb.FilteredKahaDBPersistenceAdapter;
+import org.apache.activemq.store.kahadb.MultiKahaDBPersistenceAdapter;
+
+public class AutoStorePerDestinationTest extends StorePerDestinationTest {
+
+ // use perDestinationFlag to get multiple stores from one match all adapter
+ public void prepareBrokerWithMultiStore(boolean deleteAllMessages) throws
Exception {
+
+ MultiKahaDBPersistenceAdapter multiKahaDBPersistenceAdapter = new
MultiKahaDBPersistenceAdapter();
+ if (deleteAllMessages) {
+ multiKahaDBPersistenceAdapter.deleteAllMessages();
+ }
+ ArrayList<FilteredKahaDBPersistenceAdapter> adapters = new
ArrayList<FilteredKahaDBPersistenceAdapter>();
+
+ FilteredKahaDBPersistenceAdapter template = new
FilteredKahaDBPersistenceAdapter();
+ template.setPersistenceAdapter(createStore(deleteAllMessages));
+ template.setPerDestination(true);
+ adapters.add(template);
+
+ multiKahaDBPersistenceAdapter.setFilteredPersistenceAdapters(adapters);
+ brokerService = createBroker(multiKahaDBPersistenceAdapter);
+ }
+}
\ No newline at end of file
Propchange:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/AutoStorePerDestinationTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/AutoStorePerDestinationTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/StorePerDestinationTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/StorePerDestinationTest.java?rev=1231979&r1=1231978&r2=1231979&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/StorePerDestinationTest.java
(original)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/StorePerDestinationTest.java
Mon Jan 16 12:47:45 2012
@@ -62,7 +62,7 @@ public class StorePerDestinationTest {
return broker;
}
- private KahaDBPersistenceAdapter createStore(boolean delete) throws
IOException {
+ protected KahaDBPersistenceAdapter createStore(boolean delete) throws
IOException {
KahaDBPersistenceAdapter kaha = new KahaDBPersistenceAdapter();
kaha.setJournalMaxFileLength(maxFileLength);
kaha.setCleanupInterval(5000);
@@ -199,7 +199,7 @@ public class StorePerDestinationTest {
multiKahaDBPersistenceAdapter.setFilteredPersistenceAdapters(adapters);
assertEquals(multiKahaDBPersistenceAdapter.getDirectory(),
storeDefault.getDirectory().getParentFile());
- assertEquals(someOtherDisk, otherStore.getDirectory());
+ assertEquals(someOtherDisk, otherStore.getDirectory().getParentFile());
}
@Test