Repository: activemq-cli-tools
Updated Branches:
  refs/heads/master 35ddb07d5 -> f4ae5076b


AMQCLI-8: Adding support for multikahadb

MultiKahaDb can now be used as a source for export.


Project: http://git-wip-us.apache.org/repos/asf/activemq-cli-tools/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-cli-tools/commit/f4ae5076
Tree: http://git-wip-us.apache.org/repos/asf/activemq-cli-tools/tree/f4ae5076
Diff: http://git-wip-us.apache.org/repos/asf/activemq-cli-tools/diff/f4ae5076

Branch: refs/heads/master
Commit: f4ae5076b0256b243b3c193f37263d6bd37a79d6
Parents: 35ddb07
Author: Christopher L. Shannon (cshannon) <christopher.l.shan...@gmail.com>
Authored: Wed Mar 1 07:55:21 2017 -0500
Committer: Christopher L. Shannon (cshannon) <christopher.l.shan...@gmail.com>
Committed: Wed Mar 1 07:55:21 2017 -0500

----------------------------------------------------------------------
 .../activemq/cli/kahadb/exporter/Exporter.java  | 105 +++++++++++++++++--
 .../cli/kahadb/exporter/ExporterTest.java       |  22 ++--
 .../cli/kahadb/exporter/KahaDbExporterTest.java |  38 +++++++
 .../exporter/MultiKahaDbExporterTest.java       |  60 +++++++++++
 4 files changed, 206 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-cli-tools/blob/f4ae5076/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/Exporter.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/Exporter.java b/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/Exporter.java
index fee79bf..5cb7992 100644
--- a/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/Exporter.java
+++ b/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/Exporter.java
@@ -20,18 +20,26 @@ import java.io.BufferedOutputStream;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.OutputStream;
+import java.util.List;
+import java.util.stream.Collectors;
 import java.util.zip.GZIPOutputStream;
 
 import javax.xml.stream.XMLOutputFactory;
+import javax.xml.stream.XMLStreamException;
 import javax.xml.stream.XMLStreamWriter;
 
 import org.apache.activemq.cli.artemis.schema.ArtemisJournalMarshaller;
 import 
org.apache.activemq.cli.kahadb.exporter.artemis.ArtemisXmlMessageRecoveryListener;
 import 
org.apache.activemq.cli.kahadb.exporter.artemis.ArtemisXmlMetadataExporter;
+import org.apache.activemq.store.PersistenceAdapter;
+import org.apache.activemq.store.kahadb.FilteredKahaDBPersistenceAdapter;
 import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.apache.activemq.store.kahadb.MultiKahaDBPersistenceAdapter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.Lists;
+
 /**
  * KahaDB Exporter
  */
@@ -45,15 +53,25 @@ public class Exporter {
     }
 
     public static void exportKahaDbStore(final File kahaDbDir, final File 
artemisXml) throws Exception {
-        Exporter.exportKahaDbStore(kahaDbDir, artemisXml, false);
+        Exporter.exportStore(kahaDbDir, artemisXml, false, false);
     }
 
     public static void exportKahaDbStore(final File kahaDbDir, final File 
artemisXml,
             boolean compress) throws Exception {
+        Exporter.exportStore(kahaDbDir, artemisXml, false, compress);
+    }
 
-        KahaDBPersistenceAdapter adapter = new KahaDBPersistenceAdapter();
-        adapter.setDirectory(kahaDbDir);
-        adapter.start();
+    public static void exportMultiKahaDbStore(final File kahaDbDir, final File 
artemisXml) throws Exception {
+        Exporter.exportStore(kahaDbDir, artemisXml, true, false);
+    }
+
+    public static void exportMultiKahaDbStore(final File kahaDbDir, final File 
artemisXml,
+            boolean compress) throws Exception {
+        Exporter.exportStore(kahaDbDir, artemisXml, true, compress);
+    }
+
+    private static void exportStore(final File kahaDbDir, final File 
artemisXml,
+            boolean multiKaha, boolean compress) throws Exception {
 
         if (artemisXml.exists()) {
             throw new IllegalStateException("File: " + artemisXml + " already 
exists");
@@ -65,11 +83,66 @@ public class Exporter {
 
             final XMLStreamWriter xmlWriter = 
XMLOutputFactory.newFactory().createXMLStreamWriter(fos);
             final ArtemisJournalMarshaller xmlMarshaller = new 
ArtemisJournalMarshaller(xmlWriter);
+
+            xmlMarshaller.appendJournalOpen();
+
+            if (multiKaha) {
+                appendMultiKahaDbStore(xmlMarshaller, 
getMultiKahaDbAdapter(kahaDbDir));
+            } else {
+                appendKahaDbStore(xmlMarshaller, getKahaDbAdapter(kahaDbDir));
+            }
+
+            xmlMarshaller.appendJournalClose(true);
+        }
+
+        long end = System.currentTimeMillis();
+
+        LOG.info("Total export time: " + (end - start) + " ms");
+    }
+
+
+    private static void appendMultiKahaDbStore(final ArtemisJournalMarshaller 
xmlMarshaller,
+            MultiKahaDBPersistenceAdapter multiAdapter) throws Exception {
+
+        try {
+            multiAdapter.start();
+
+            List<KahaDBExporter> dbExporters = 
multiAdapter.getAdapters().stream()
+                    .filter(adapter -> adapter instanceof 
KahaDBPersistenceAdapter)
+                    .map(adapter -> {
+                        KahaDBPersistenceAdapter kahaAdapter = 
(KahaDBPersistenceAdapter) adapter;
+                        return new KahaDBExporter(kahaAdapter,
+                              new 
ArtemisXmlMetadataExporter(kahaAdapter.getStore(), xmlMarshaller),
+                              new 
ArtemisXmlMessageRecoveryListener(kahaAdapter.getStore(), xmlMarshaller));
+            }).collect(Collectors.toList());
+
+            xmlMarshaller.appendBindingsElement();
+            for (KahaDBExporter dbExporter : dbExporters) {
+                dbExporter.exportMetadata();
+            }
+            xmlMarshaller.appendEndElement();
+
+            xmlMarshaller.appendMessagesElement();
+            for (KahaDBExporter dbExporter : dbExporters) {
+                dbExporter.exportQueues();
+                dbExporter.exportTopics();
+            }
+            xmlMarshaller.appendEndElement();
+        } finally {
+            multiAdapter.stop();
+        }
+    }
+
+    private static void appendKahaDbStore(final ArtemisJournalMarshaller 
xmlMarshaller,
+            KahaDBPersistenceAdapter adapter) throws Exception {
+
+        try {
+            adapter.start();
+
             final KahaDBExporter dbExporter = new KahaDBExporter(adapter,
                     new ArtemisXmlMetadataExporter(adapter.getStore(), 
xmlMarshaller),
                     new ArtemisXmlMessageRecoveryListener(adapter.getStore(), 
xmlMarshaller));
 
-            xmlMarshaller.appendJournalOpen();
             xmlMarshaller.appendBindingsElement();
             dbExporter.exportMetadata();
             xmlMarshaller.appendEndElement();
@@ -77,12 +150,28 @@ public class Exporter {
             dbExporter.exportQueues();
             dbExporter.exportTopics();
             xmlMarshaller.appendEndElement();
-            xmlMarshaller.appendJournalClose(true);
         } finally {
             adapter.stop();
         }
-        long end = System.currentTimeMillis();
+    }
 
-        LOG.info("Total export time: " + (end - start) + " ms");
+    private static KahaDBPersistenceAdapter getKahaDbAdapter(File dir) {
+        KahaDBPersistenceAdapter adapter = new KahaDBPersistenceAdapter();
+        adapter.setDirectory(dir);
+        return adapter;
+    }
+
+    private static MultiKahaDBPersistenceAdapter getMultiKahaDbAdapter(File 
dir) {
+        MultiKahaDBPersistenceAdapter adapter = new 
MultiKahaDBPersistenceAdapter();
+        adapter.setDirectory(dir);
+
+        KahaDBPersistenceAdapter kahaStore = new KahaDBPersistenceAdapter();
+        kahaStore.setDirectory(dir);
+        FilteredKahaDBPersistenceAdapter filtered = new 
FilteredKahaDBPersistenceAdapter();
+        filtered.setPersistenceAdapter(kahaStore);
+        filtered.setPerDestination(true);
+
+        adapter.setFilteredPersistenceAdapters(Lists.newArrayList(filtered));
+        return adapter;
     }
 }

http://git-wip-us.apache.org/repos/asf/activemq-cli-tools/blob/f4ae5076/activemq-kahadb-exporter/src/test/java/org/apache/activemq/cli/kahadb/exporter/ExporterTest.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-exporter/src/test/java/org/apache/activemq/cli/kahadb/exporter/ExporterTest.java b/activemq-kahadb-exporter/src/test/java/org/apache/activemq/cli/kahadb/exporter/ExporterTest.java
index 8efdf42..b89e035 100644
--- a/activemq-kahadb-exporter/src/test/java/org/apache/activemq/cli/kahadb/exporter/ExporterTest.java
+++ b/activemq-kahadb-exporter/src/test/java/org/apache/activemq/cli/kahadb/exporter/ExporterTest.java
@@ -65,8 +65,8 @@ import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageId;
 import org.apache.activemq.command.SubscriptionInfo;
 import org.apache.activemq.store.MessageStore;
+import org.apache.activemq.store.PersistenceAdapter;
 import org.apache.activemq.store.TopicMessageStore;
-import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
 import org.apache.activemq.util.ByteSequence;
 import org.apache.activemq.util.IdGenerator;
 import org.junit.Rule;
@@ -75,13 +75,17 @@ import org.junit.rules.TemporaryFolder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class ExporterTest {
+public abstract class ExporterTest {
 
     static final Logger LOG = LoggerFactory.getLogger(ExporterTest.class);
 
     @Rule
     public TemporaryFolder storeFolder = new TemporaryFolder();
 
+    public abstract PersistenceAdapter getPersistenceAdapter(File dir);
+
+    public abstract void exportStore(final File kahaDbDir, final File xmlFile) 
throws Exception;
+
     /**
      * TODO Improve test when real exporting is done, for now this just
      * tests that the recovery listener iterates over all the queue messages
@@ -93,9 +97,7 @@ public class ExporterTest {
 
         File kahaDbDir = storeFolder.newFolder();
         ActiveMQQueue queue = new ActiveMQQueue("test.queue");
-        KahaDBPersistenceAdapter adapter = new KahaDBPersistenceAdapter();
-        adapter.setJournalMaxFileLength(1024 * 1024);
-        adapter.setDirectory(kahaDbDir);
+        PersistenceAdapter adapter = getPersistenceAdapter(kahaDbDir);
         adapter.start();
         MessageStore messageStore = adapter.createQueueMessageStore(queue);
         messageStore.start();
@@ -154,7 +156,7 @@ public class ExporterTest {
         adapter.stop();
 
         File xmlFile = new File(storeFolder.getRoot().getAbsoluteFile(), 
"outputXml.xml");
-        Exporter.exportKahaDbStore(kahaDbDir, xmlFile);
+        exportStore(kahaDbDir, xmlFile);
 
       // printFile(xmlFile);
 
@@ -229,9 +231,7 @@ public class ExporterTest {
         File kahaDbDir = storeFolder.newFolder();
 
         ActiveMQTopic topic = new ActiveMQTopic("test.topic");
-        KahaDBPersistenceAdapter adapter = new KahaDBPersistenceAdapter();
-        adapter.setJournalMaxFileLength(1024 * 1024);
-        adapter.setDirectory(kahaDbDir);
+        PersistenceAdapter adapter = getPersistenceAdapter(kahaDbDir);
         adapter.start();
         TopicMessageStore messageStore = 
adapter.createTopicMessageStore(topic);
         messageStore.start();
@@ -264,9 +264,9 @@ public class ExporterTest {
         adapter.stop();
 
         File xmlFile = new File(storeFolder.getRoot().getAbsoluteFile(), 
"outputXml.xml");
-        Exporter.exportKahaDbStore(kahaDbDir, xmlFile);
+        exportStore(kahaDbDir, xmlFile);
 
-     //   printFile(xmlFile);
+        printFile(xmlFile);
 
         validate(xmlFile, 5);
 

http://git-wip-us.apache.org/repos/asf/activemq-cli-tools/blob/f4ae5076/activemq-kahadb-exporter/src/test/java/org/apache/activemq/cli/kahadb/exporter/KahaDbExporterTest.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-exporter/src/test/java/org/apache/activemq/cli/kahadb/exporter/KahaDbExporterTest.java b/activemq-kahadb-exporter/src/test/java/org/apache/activemq/cli/kahadb/exporter/KahaDbExporterTest.java
new file mode 100644
index 0000000..e9411af
--- /dev/null
+++ b/activemq-kahadb-exporter/src/test/java/org/apache/activemq/cli/kahadb/exporter/KahaDbExporterTest.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.cli.kahadb.exporter;
+
+import java.io.File;
+
+import org.apache.activemq.store.PersistenceAdapter;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+
+public class KahaDbExporterTest extends ExporterTest {
+
+    @Override
+    public PersistenceAdapter getPersistenceAdapter(File dir) {
+        KahaDBPersistenceAdapter adapter = new KahaDBPersistenceAdapter();
+        adapter.setJournalMaxFileLength(1024 * 1024);
+        adapter.setDirectory(dir);
+        return adapter;
+    }
+
+    @Override
+    public void exportStore(File kahaDbDir, File xmlFile) throws Exception {
+        Exporter.exportKahaDbStore(kahaDbDir, xmlFile);
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-cli-tools/blob/f4ae5076/activemq-kahadb-exporter/src/test/java/org/apache/activemq/cli/kahadb/exporter/MultiKahaDbExporterTest.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-exporter/src/test/java/org/apache/activemq/cli/kahadb/exporter/MultiKahaDbExporterTest.java b/activemq-kahadb-exporter/src/test/java/org/apache/activemq/cli/kahadb/exporter/MultiKahaDbExporterTest.java
new file mode 100644
index 0000000..a006cef
--- /dev/null
+++ b/activemq-kahadb-exporter/src/test/java/org/apache/activemq/cli/kahadb/exporter/MultiKahaDbExporterTest.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.cli.kahadb.exporter;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.activemq.store.PersistenceAdapter;
+import org.apache.activemq.store.kahadb.FilteredKahaDBPersistenceAdapter;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.apache.activemq.store.kahadb.MultiKahaDBPersistenceAdapter;
+
+import com.google.common.collect.Lists;
+
+public class MultiKahaDbExporterTest extends ExporterTest {
+
+    /* (non-Javadoc)
+     * @see 
org.apache.activemq.cli.kahadb.exporter.ExporterTest#getPersistenceAdapter(java.io.File)
+     */
+    @Override
+    public PersistenceAdapter getPersistenceAdapter(File dir) {
+        MultiKahaDBPersistenceAdapter adapter = new 
MultiKahaDBPersistenceAdapter();
+        adapter.setJournalMaxFileLength(1024 * 1024);
+        adapter.setDirectory(dir);
+
+        KahaDBPersistenceAdapter kahaStore = new KahaDBPersistenceAdapter();
+        kahaStore.setDirectory(dir);
+        FilteredKahaDBPersistenceAdapter filtered = new 
FilteredKahaDBPersistenceAdapter();
+        filtered.setPersistenceAdapter(kahaStore);
+        filtered.setPerDestination(true);
+
+        adapter.setFilteredPersistenceAdapters(Lists.newArrayList(filtered));
+        return adapter;
+    }
+
+    /* (non-Javadoc)
+     * @see 
org.apache.activemq.cli.kahadb.exporter.ExporterTest#exportStore(java.io.File, 
java.io.File)
+     */
+    @Override
+    public void exportStore(File kahaDbDir, File xmlFile) throws Exception {
+        Exporter.exportMultiKahaDbStore(kahaDbDir, xmlFile);
+
+    }
+
+}

Reply via email to