Repository: activemq-cli-tools Updated Branches: refs/heads/master a7214cc98 -> 5637f0414
AMQCLI-3 - Cleanup and add tests Project: http://git-wip-us.apache.org/repos/asf/activemq-cli-tools/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-cli-tools/commit/5637f041 Tree: http://git-wip-us.apache.org/repos/asf/activemq-cli-tools/tree/5637f041 Diff: http://git-wip-us.apache.org/repos/asf/activemq-cli-tools/diff/5637f041 Branch: refs/heads/master Commit: 5637f0414f7109f383293947f3f124f743ef45fb Parents: a7214cc Author: Christopher L. Shannon (cshannon) <christopher.l.shan...@gmail.com> Authored: Wed Mar 8 13:48:25 2017 -0500 Committer: Christopher L. Shannon (cshannon) <christopher.l.shan...@gmail.com> Committed: Wed Mar 8 13:48:25 2017 -0500 ---------------------------------------------------------------------- .../activemq/cli/kahadb/exporter/Exporter.java | 1 - .../ArtemisXmlMessageRecoveryListener.java | 4 +- .../OpenWireCoreMessageTypeConverter.java | 132 ++++++++++++++++++ .../artemis/OpenWireMessageTypeConverter.java | 117 ---------------- .../exporter/MultiKahaDbExporterTest.java | 3 - .../OpenWireCoreMessageTypeConverterTest.java | 137 +++++++++++++++++++ 6 files changed, 271 insertions(+), 123 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-cli-tools/blob/5637f041/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/Exporter.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/Exporter.java b/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/Exporter.java index 0022d51..ecb7abd 100644 --- a/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/Exporter.java +++ b/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/Exporter.java @@ -38,7 +38,6 @@ import org.slf4j.LoggerFactory; import com.google.common.collect.Lists; -import io.airlift.airline.Arguments; import io.airlift.airline.Cli; import io.airlift.airline.Cli.CliBuilder; import io.airlift.airline.Command; http://git-wip-us.apache.org/repos/asf/activemq-cli-tools/blob/5637f041/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/artemis/ArtemisXmlMessageRecoveryListener.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/artemis/ArtemisXmlMessageRecoveryListener.java b/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/artemis/ArtemisXmlMessageRecoveryListener.java index 93bd439..b17d505 100644 --- a/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/artemis/ArtemisXmlMessageRecoveryListener.java +++ b/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/artemis/ArtemisXmlMessageRecoveryListener.java @@ -32,7 +32,7 @@ public class ArtemisXmlMessageRecoveryListener implements MessageRecoveryListene static final Logger LOG = LoggerFactory.getLogger(ArtemisXmlMessageRecoveryListener.class); private final ArtemisJournalMarshaller xmlMarshaller; - private final OpenWireMessageTypeConverter converter; + private final OpenWireCoreMessageTypeConverter converter; /** * @param file @@ -41,7 +41,7 @@ public class ArtemisXmlMessageRecoveryListener implements MessageRecoveryListene final ArtemisJournalMarshaller xmlMarshaller) { super(); this.xmlMarshaller = xmlMarshaller; - this.converter = new OpenWireMessageTypeConverter(store); + this.converter = new OpenWireCoreMessageTypeConverter(store); } http://git-wip-us.apache.org/repos/asf/activemq-cli-tools/blob/5637f041/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/artemis/OpenWireCoreMessageTypeConverter.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/artemis/OpenWireCoreMessageTypeConverter.java b/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/artemis/OpenWireCoreMessageTypeConverter.java new file mode 100644 index 0000000..af4d363 --- /dev/null +++ b/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/artemis/OpenWireCoreMessageTypeConverter.java @@ -0,0 +1,132 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.cli.kahadb.exporter.artemis; + +import org.apache.activemq.artemis.api.core.ICoreMessage; +import org.apache.activemq.artemis.cli.commands.tools.XmlDataExporterUtil; +import org.apache.activemq.artemis.core.protocol.openwire.OpenWireMessageConverter; +import org.apache.activemq.artemis.jms.client.ActiveMQDestination; +import org.apache.activemq.cli.kahadb.exporter.OpenWireExportConverter; +import org.apache.activemq.cli.schema.BodyType; +import org.apache.activemq.cli.schema.MessageType; +import org.apache.activemq.cli.schema.PropertiesType; +import org.apache.activemq.cli.schema.PropertyType; +import org.apache.activemq.cli.schema.QueueType; +import org.apache.activemq.cli.schema.QueuesType; +import org.apache.activemq.command.Message; +import org.apache.activemq.openwire.OpenWireFormat; +import org.apache.activemq.store.kahadb.KahaDBStore; +import org.apache.activemq.store.kahadb.KahaDBUtil; + +/** + * Message Converter that first converts an OpenWire message to a Core Message and then uses + * the Core message to convert to an Artemis XML MessageType. + */ +public class OpenWireCoreMessageTypeConverter implements OpenWireExportConverter<MessageType> { + + private final OpenWireMessageConverter converter = new OpenWireMessageConverter(new OpenWireFormat()); + private final KahaDBStore store; + + /** + * @param store + */ + public OpenWireCoreMessageTypeConverter(KahaDBStore store) { + super(); + this.store = store; + } + + public OpenWireCoreMessageTypeConverter() { + this(null); + } + + /* (non-Javadoc) + * @see org.apache.activemq.cli.kahadb.exporter.MessageConverter#convert(org.apache.activemq.Message) + */ + @Override + public MessageType convert(final Message message) throws Exception { + final ICoreMessage serverMessage = (ICoreMessage) converter.inbound(message); + final MessageType messageType = convertAttributes(serverMessage); + + try { + if (!message.getProperties().isEmpty()) { + final PropertiesType propertiesType = new PropertiesType(); + serverMessage.getPropertyNames().forEach(key -> { + Object value = serverMessage.getObjectProperty(key); + propertiesType.getProperty().add(PropertyType.builder() + .withName(key.toString()) + .withValueAttribute(XmlDataExporterUtil.convertProperty(value)) + .withType(XmlDataExporterUtil.getPropertyType(value)) + .build()); + }); + messageType.setProperties(propertiesType); + } + + messageType.setQueues(convertQueues(message)); + messageType.setBody(convertBody(serverMessage)); + } catch (Exception e) { + throw new IllegalStateException(e.getMessage(), e); + } + + return messageType; + } + + /** + * Determine the destinations associated with this message + * Will be one destination for a Queue message or 1 or more for a Topic + * + * @param message + * @return + * @throws Exception + */ + private QueuesType convertQueues(final Message message) throws Exception { + if (store == null || message.getDestination().isQueue()) { + return QueuesType.builder() + .withQueue(QueueType.builder() + .withName(message.getDestination().getPhysicalName()).build()) + .build(); + } else { + final QueuesType.Builder<Void> queuesBuilder = QueuesType.builder(); + + KahaDBUtil.getUnackedSubscriptions(store, message).forEach(sub -> { + queuesBuilder.addQueue(QueueType.builder().withName( + ActiveMQDestination.createQueueNameForDurableSubscription( + true, sub.getClientId(), sub.getSubcriptionName())).build()); + }); + + return queuesBuilder.build(); + } + } + + private BodyType convertBody(final ICoreMessage serverMessage) throws Exception { + String value = XmlDataExporterUtil.encodeMessageBody(serverMessage); + + //requires CDATA + return BodyType.builder() + .withValue("<![CDATA[" + value + "]]>") + .build(); + } + + private MessageType convertAttributes(final ICoreMessage message) { + MessageType messageType = MessageType.builder() + .withId(message.getMessageID()) + .withTimestamp(message.getTimestamp()) + .withPriority(message.getPriority()) + .withType(XmlDataExporterUtil.getMessagePrettyType(message.getType())).build(); + + return messageType; + } +} http://git-wip-us.apache.org/repos/asf/activemq-cli-tools/blob/5637f041/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/artemis/OpenWireMessageTypeConverter.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/artemis/OpenWireMessageTypeConverter.java b/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/artemis/OpenWireMessageTypeConverter.java deleted file mode 100644 index 662062a..0000000 --- a/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/artemis/OpenWireMessageTypeConverter.java +++ /dev/null @@ -1,117 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.cli.kahadb.exporter.artemis; - -import org.apache.activemq.artemis.api.core.ICoreMessage; -import org.apache.activemq.artemis.cli.commands.tools.XmlDataExporterUtil; -import org.apache.activemq.artemis.core.protocol.openwire.OpenWireMessageConverter; -import org.apache.activemq.artemis.jms.client.ActiveMQDestination; -import org.apache.activemq.cli.kahadb.exporter.OpenWireExportConverter; -import org.apache.activemq.cli.schema.BodyType; -import org.apache.activemq.cli.schema.MessageType; -import org.apache.activemq.cli.schema.PropertiesType; -import org.apache.activemq.cli.schema.PropertyType; -import org.apache.activemq.cli.schema.QueueType; -import org.apache.activemq.cli.schema.QueuesType; -import org.apache.activemq.command.Message; -import org.apache.activemq.openwire.OpenWireFormat; -import org.apache.activemq.store.kahadb.KahaDBStore; -import org.apache.activemq.store.kahadb.KahaDBUtil; - -public class OpenWireMessageTypeConverter implements OpenWireExportConverter<MessageType> { - - private final OpenWireMessageConverter converter = new OpenWireMessageConverter(new OpenWireFormat()); - private final KahaDBStore store; - - - /** - * @param store - */ - public OpenWireMessageTypeConverter(KahaDBStore store) { - super(); - this.store = store; - } - - /* (non-Javadoc) - * @see org.apache.activemq.cli.kahadb.exporter.MessageConverter#convert(org.apache.activemq.Message) - */ - @Override - public MessageType convert(final Message message) throws Exception { - final ICoreMessage serverMessage = (ICoreMessage) converter.inbound(message); - final MessageType messageType = convertAttributes(serverMessage); - - try { - if (!message.getProperties().isEmpty()) { - final PropertiesType propertiesType = new PropertiesType(); - serverMessage.getPropertyNames().forEach(key -> { - Object value = serverMessage.getObjectProperty(key); - propertiesType.getProperty().add(PropertyType.builder() - .withName(key.toString()) - .withValueAttribute(XmlDataExporterUtil.convertProperty(value)) - .withType(XmlDataExporterUtil.getPropertyType(value)) - .build()); - }); - messageType.setProperties(propertiesType); - } - - messageType.setQueues(convertQueue(message)); - messageType.setBody(convertBody(serverMessage)); - } catch (Exception e) { - throw new IllegalStateException(e.getMessage(), e); - } - - return messageType; - } - - private QueuesType convertQueue(final Message message) throws Exception { - if (message.getDestination().isQueue()) { - return QueuesType.builder() - .withQueue(QueueType.builder() - .withName(message.getDestination().getPhysicalName()).build()) - .build(); - } else { - final QueuesType.Builder<Void> queuesBuilder = QueuesType.builder(); - - KahaDBUtil.getUnackedSubscriptions(store, message).forEach(sub -> { - queuesBuilder.addQueue(QueueType.builder().withName( - ActiveMQDestination.createQueueNameForDurableSubscription( - true, sub.getClientId(), sub.getSubcriptionName())).build()); - }); - - return queuesBuilder.build(); - } - } - - private BodyType convertBody(final ICoreMessage serverMessage) throws Exception { - String value = XmlDataExporterUtil.encodeMessageBody(serverMessage); - - //requires CDATA - return BodyType.builder() - .withValue("<![CDATA[" + value + "]]>") - .build(); - } - - private MessageType convertAttributes(final ICoreMessage message) { - MessageType messageType = MessageType.builder() - .withId(message.getMessageID()) - .withTimestamp(message.getTimestamp()) - .withPriority(message.getPriority()) - .withType(XmlDataExporterUtil.getMessagePrettyType(message.getType())).build(); - - return messageType; - } -} http://git-wip-us.apache.org/repos/asf/activemq-cli-tools/blob/5637f041/activemq-kahadb-exporter/src/test/java/org/apache/activemq/cli/kahadb/exporter/MultiKahaDbExporterTest.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-exporter/src/test/java/org/apache/activemq/cli/kahadb/exporter/MultiKahaDbExporterTest.java b/activemq-kahadb-exporter/src/test/java/org/apache/activemq/cli/kahadb/exporter/MultiKahaDbExporterTest.java index a006cef..31bfc8f 100644 --- a/activemq-kahadb-exporter/src/test/java/org/apache/activemq/cli/kahadb/exporter/MultiKahaDbExporterTest.java +++ b/activemq-kahadb-exporter/src/test/java/org/apache/activemq/cli/kahadb/exporter/MultiKahaDbExporterTest.java @@ -17,8 +17,6 @@ package org.apache.activemq.cli.kahadb.exporter; import java.io.File; -import java.util.ArrayList; -import java.util.List; import org.apache.activemq.store.PersistenceAdapter; import org.apache.activemq.store.kahadb.FilteredKahaDBPersistenceAdapter; @@ -54,7 +52,6 @@ public class MultiKahaDbExporterTest extends ExporterTest { @Override public void exportStore(File kahaDbDir, File xmlFile) throws Exception { Exporter.exportMultiKahaDbStore(kahaDbDir, xmlFile); - } } http://git-wip-us.apache.org/repos/asf/activemq-cli-tools/blob/5637f041/activemq-kahadb-exporter/src/test/java/org/apache/activemq/cli/kahadb/exporter/artemis/OpenWireCoreMessageTypeConverterTest.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-exporter/src/test/java/org/apache/activemq/cli/kahadb/exporter/artemis/OpenWireCoreMessageTypeConverterTest.java b/activemq-kahadb-exporter/src/test/java/org/apache/activemq/cli/kahadb/exporter/artemis/OpenWireCoreMessageTypeConverterTest.java new file mode 100644 index 0000000..8b327a6 --- /dev/null +++ b/activemq-kahadb-exporter/src/test/java/org/apache/activemq/cli/kahadb/exporter/artemis/OpenWireCoreMessageTypeConverterTest.java @@ -0,0 +1,137 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.cli.kahadb.exporter.artemis; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import org.apache.activemq.artemis.cli.commands.tools.XmlDataConstants; +import org.apache.activemq.artemis.jms.client.ActiveMQDestination; +import org.apache.activemq.broker.ConnectionContext; +import org.apache.activemq.cli.schema.MessageType; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTextMessage; +import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.command.MessageId; +import org.apache.activemq.command.SubscriptionInfo; +import org.apache.activemq.store.TopicMessageStore; +import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; +import org.apache.activemq.store.kahadb.KahaDBStore; +import org.apache.activemq.util.IdGenerator; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +public class OpenWireCoreMessageTypeConverterTest { + + @Rule + public TemporaryFolder storeFolder = new TemporaryFolder(); + + //Adapter used for durable subscription conversion to know which messages haven't been acked + protected KahaDBPersistenceAdapter adapter; + protected KahaDBStore store; + protected ConnectionContext context = new ConnectionContext(); + protected IdGenerator id = new IdGenerator(); + + @Before + public void before() throws Exception { + adapter = new KahaDBPersistenceAdapter(); + adapter.setJournalMaxFileLength(1024 * 1024); + adapter.setDirectory(storeFolder.getRoot()); + adapter.start(); + store = adapter.getStore(); + } + + @After + public void after() throws Exception { + adapter.stop(); + } + + @Test + public void test() throws Exception { + + ActiveMQTextMessage message = new ActiveMQTextMessage(); + message.setText("test"); + message.setDestination(new ActiveMQQueue("test.queue")); + message.setMessageId(new MessageId(id.generateId() + ":1", 0)); + + OpenWireCoreMessageTypeConverter c = new OpenWireCoreMessageTypeConverter(); + MessageType messageType = c.convert(message); + + assertEquals(XmlDataConstants.TEXT_TYPE_PRETTY, messageType.getType()); + assertEquals("test.queue", messageType.getQueues().getQueue().get(0).getName()); + } + + + @Test + public void testTopicNoStore() throws Exception { + + String topicName = "test.topic"; + ActiveMQTextMessage message = new ActiveMQTextMessage(); + message.setText("test"); + message.setDestination(new ActiveMQTopic(topicName)); + message.setMessageId(new MessageId(id.generateId() + ":1", 0)); + + OpenWireCoreMessageTypeConverter c = new OpenWireCoreMessageTypeConverter(); + MessageType messageType = c.convert(message); + + assertEquals(XmlDataConstants.TEXT_TYPE_PRETTY, messageType.getType()); + assertEquals(topicName, messageType.getQueues().getQueue().get(0).getName()); + } + + @Test + public void testTopicWithStoreNoSubscriptions() throws Exception { + + String topicName = "test.topic"; + ActiveMQTextMessage message = new ActiveMQTextMessage(); + message.setText("test"); + message.setDestination(new ActiveMQTopic(topicName)); + message.setMessageId(new MessageId(id.generateId() + ":1", 0)); + + TopicMessageStore ms = adapter.createTopicMessageStore(new ActiveMQTopic(topicName)); + ms.addMessage(context, message); + + OpenWireCoreMessageTypeConverter c = new OpenWireCoreMessageTypeConverter(store); + MessageType messageType = c.convert(message); + + assertEquals(XmlDataConstants.TEXT_TYPE_PRETTY, messageType.getType()); + assertTrue(messageType.getQueues().getQueue().isEmpty()); + } + + @Test + public void testTopicWithStoreOneSub() throws Exception { + + String topicName = "test.topic"; + ActiveMQTextMessage message = new ActiveMQTextMessage(); + message.setText("test"); + message.setDestination(new ActiveMQTopic(topicName)); + message.setMessageId(new MessageId(id.generateId() + ":1", 0)); + + TopicMessageStore ms = adapter.createTopicMessageStore(new ActiveMQTopic(topicName)); + ms.addSubscription(new SubscriptionInfo("clientId", "subName"), false); + ms.addMessage(context, message); + + OpenWireCoreMessageTypeConverter c = new OpenWireCoreMessageTypeConverter(store); + MessageType messageType = c.convert(message); + + assertEquals(XmlDataConstants.TEXT_TYPE_PRETTY, messageType.getType()); + assertEquals(ActiveMQDestination.createQueueNameForDurableSubscription(true, "clientId", "subName"), + messageType.getQueues().getQueue().get(0).getName()); + } +}