http://git-wip-us.apache.org/repos/asf/activemq/blob/734fb7dd/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/index/ListNode.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/index/ListNode.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/index/ListNode.java index 6911e4f..8fed042 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/index/ListNode.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/index/ListNode.java @@ -66,14 +66,17 @@ public final class ListNode<Key, Value> { this.value = value; } + @Override public Key getKey() { return key; } + @Override public Value getValue() { return value; } + @Override public Value setValue(Value value) { Value oldValue = this.value; this.value = value; @@ -98,10 +101,12 @@ public final class ListNode<Key, Value> { index = current.getContainingList(); } + @Override public boolean hasNext() { return nextEntry != null; } + @Override public ListNode<Key, Value> next() { ListNode<Key, Value> current = nextEntry; if (current != null) { @@ -121,6 +126,7 @@ public final class ListNode<Key, Value> { return current; } + @Override public void remove() { throw new UnsupportedOperationException(); } @@ -171,6 +177,7 @@ public final class ListNode<Key, Value> { return result; } + @Override public boolean hasNext() { if (nextEntry == null) { nextEntry = getFromNextNode(); @@ -178,6 +185,7 @@ public final class ListNode<Key, Value> { return nextEntry != null; } + @Override public Entry<Key, Value> next() { if (nextEntry != null) { entryToRemove = nextEntry; @@ -188,6 +196,7 @@ public final class ListNode<Key, Value> { } } + @Override public void remove() { if (entryToRemove == null) { throw new IllegalStateException("can only remove once, call hasNext();next() again"); @@ -228,7 +237,7 @@ public final 
class ListNode<Key, Value> { currentNode = previousNode; } } - targetList.onRemove(); + targetList.onRemove(entryToRemove); if (toRemoveNode != null) { tx.free(toRemoveNode.getPage()); @@ -262,6 +271,7 @@ public final class ListNode<Key, Value> { this.valueMarshaller = valueMarshaller; } + @Override public void writePayload(ListNode<Key, Value> node, DataOutput os) throws IOException { os.writeLong(node.next); short count = (short) node.entries.size(); // cast may truncate @@ -279,6 +289,7 @@ public final class ListNode<Key, Value> { } } + @Override @SuppressWarnings({ "unchecked", "rawtypes" }) public ListNode<Key, Value> readPayload(DataInput is) throws IOException { ListNode<Key, Value> node = new ListNode<Key, Value>();
http://git-wip-us.apache.org/repos/asf/activemq/blob/734fb7dd/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/plist/PListImpl.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/plist/PListImpl.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/plist/PListImpl.java index eafb2ac..b45692a 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/plist/PListImpl.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/plist/PListImpl.java @@ -21,22 +21,22 @@ import java.io.DataOutput; import java.io.IOException; import java.util.Iterator; import java.util.Map; +import java.util.Map.Entry; import java.util.NoSuchElementException; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; -import org.apache.activemq.broker.region.MessageReference; -import org.apache.activemq.command.Message; +import org.apache.activemq.management.SizeStatisticImpl; import org.apache.activemq.store.PList; import org.apache.activemq.store.PListEntry; import org.apache.activemq.store.kahadb.disk.index.ListIndex; +import org.apache.activemq.store.kahadb.disk.index.ListNode; import org.apache.activemq.store.kahadb.disk.journal.Location; import org.apache.activemq.store.kahadb.disk.page.Transaction; -import org.apache.activemq.util.ByteSequence; import org.apache.activemq.store.kahadb.disk.util.LocationMarshaller; import org.apache.activemq.store.kahadb.disk.util.StringMarshaller; -import org.apache.activemq.wireformat.WireFormat; +import org.apache.activemq.util.ByteSequence; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,6 +45,7 @@ public class PListImpl extends ListIndex<String, Location> implements PList { final PListStoreImpl store; private String name; Object indexLock; + private final SizeStatisticImpl 
messageSize; PListImpl(PListStoreImpl store) { this.store = store; @@ -52,6 +53,9 @@ public class PListImpl extends ListIndex<String, Location> implements PList { setPageFile(store.getPageFile()); setKeyMarshaller(StringMarshaller.INSTANCE); setValueMarshaller(LocationMarshaller.INSTANCE); + + messageSize = new SizeStatisticImpl("messageSize", "The size in bytes of the pending messages"); + messageSize.setEnabled(true); } public void setName(String name) { @@ -75,6 +79,7 @@ public class PListImpl extends ListIndex<String, Location> implements PList { public synchronized void destroy() throws IOException { synchronized (indexLock) { this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() { + @Override public void execute(Transaction tx) throws IOException { clear(tx); unload(tx); @@ -100,6 +105,7 @@ public class PListImpl extends ListIndex<String, Location> implements PList { final Location location = this.store.write(bs, false); synchronized (indexLock) { this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() { + @Override public void execute(Transaction tx) throws IOException { add(tx, id, location); } @@ -113,6 +119,7 @@ public class PListImpl extends ListIndex<String, Location> implements PList { final Location location = this.store.write(bs, false); synchronized (indexLock) { this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() { + @Override public void execute(Transaction tx) throws IOException { addFirst(tx, id, location); } @@ -133,6 +140,7 @@ public class PListImpl extends ListIndex<String, Location> implements PList { final AtomicBoolean result = new AtomicBoolean(); synchronized (indexLock) { this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() { + @Override public void execute(Transaction tx) throws IOException { result.set(remove(tx, id) != null); } @@ -145,6 +153,7 @@ public class PListImpl extends ListIndex<String, Location> implements PList { final 
AtomicBoolean result = new AtomicBoolean(); synchronized (indexLock) { this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() { + @Override public void execute(Transaction tx) throws IOException { Iterator<Map.Entry<String, Location>> iterator = iterator(tx, position); if (iterator.hasNext()) { @@ -165,6 +174,7 @@ public class PListImpl extends ListIndex<String, Location> implements PList { final AtomicReference<Map.Entry<String, Location>> ref = new AtomicReference<Map.Entry<String, Location>>(); synchronized (indexLock) { this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() { + @Override public void execute(Transaction tx) throws IOException { Iterator<Map.Entry<String, Location>> iterator = iterator(tx, position); ref.set(iterator.next()); @@ -183,6 +193,7 @@ public class PListImpl extends ListIndex<String, Location> implements PList { final AtomicReference<Map.Entry<String, Location>> ref = new AtomicReference<Map.Entry<String, Location>>(); synchronized (indexLock) { this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() { + @Override public void execute(Transaction tx) throws IOException { ref.set(getFirst(tx)); } @@ -200,6 +211,7 @@ public class PListImpl extends ListIndex<String, Location> implements PList { final AtomicReference<Map.Entry<String, Location>> ref = new AtomicReference<Map.Entry<String, Location>>(); synchronized (indexLock) { this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() { + @Override public void execute(Transaction tx) throws IOException { ref.set(getLast(tx)); } @@ -270,6 +282,7 @@ public class PListImpl extends ListIndex<String, Location> implements PList { } } + @Override public void release() { try { tx.rollback(); @@ -285,6 +298,7 @@ public class PListImpl extends ListIndex<String, Location> implements PList { synchronized (indexLock) { if (loaded.get()) { this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() { + 
@Override public void execute(Transaction tx) throws IOException { Iterator<Map.Entry<String,Location>> iterator = iterator(tx); while (iterator.hasNext()) { @@ -298,6 +312,51 @@ public class PListImpl extends ListIndex<String, Location> implements PList { } @Override + public long messageSize() { + return messageSize.getTotalSize(); + } + + @Override + public synchronized Location add(Transaction tx, String key, Location value) + throws IOException { + Location location = super.add(tx, key, value); + messageSize.addSize(value.getSize()); + return location; + } + + @Override + public synchronized Location addFirst(Transaction tx, String key, + Location value) throws IOException { + Location location = super.addFirst(tx, key, value); + messageSize.addSize(value.getSize()); + return location; + } + + @Override + public synchronized void clear(Transaction tx) throws IOException { + messageSize.reset(); + super.clear(tx); + } + + @Override + protected synchronized void onLoad(ListNode<String, Location> node, Transaction tx) { + try { + Iterator<Entry<String, Location>> i = node.iterator(tx); + while (i.hasNext()) { + messageSize.addSize(i.next().getValue().getSize()); + } + } catch (IOException e) { + LOG.warn("could not increment message size", e); + } + } + + @Override + public void onRemove(Entry<String, Location> removed) { + super.onRemove(removed); + messageSize.addSize(-removed.getValue().getSize()); + } + + @Override public String toString() { return name + "[headPageId=" + getHeadPageId() + ",tailPageId=" + getTailPageId() + ", size=" + size() + "]"; } http://git-wip-us.apache.org/repos/asf/activemq/blob/734fb7dd/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala ---------------------------------------------------------------------- diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala index 
7c2d327..a4cdcac 100644 --- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala +++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala @@ -1008,6 +1008,11 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P case None => 0 } } + + def getMessageSize(clientId: String, subscriptionName: String): Long = { + check_running + return 0 + } } class LevelDBPList(val name: String, val key: Long) extends PList { @@ -1066,6 +1071,7 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P def isEmpty = size()==0 def size(): Long = listSize.get() + def messageSize(): Long = 0 def iterator() = new PListIterator() { check_running http://git-wip-us.apache.org/repos/asf/activemq/blob/734fb7dd/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java index 99382d0..6cef709 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java @@ -228,6 +228,11 @@ public class QueueDuplicatesFromStoreTest extends TestCase { } @Override + public long getPendingMessageSize() { + return 0; + } + + @Override public int getPrefetchSize() { return 0; } @@ -354,10 +359,12 @@ public class QueueDuplicatesFromStoreTest extends TestCase { return 0; } + @Override public void incrementConsumedCount(){ } + @Override public void resetConsumedCount(){ } 
http://git-wip-us.apache.org/repos/asf/activemq/blob/734fb7dd/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/SubscriptionAddRemoveQueueTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/SubscriptionAddRemoveQueueTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/SubscriptionAddRemoveQueueTest.java index 2541a64..207ecda 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/SubscriptionAddRemoveQueueTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/SubscriptionAddRemoveQueueTest.java @@ -102,6 +102,7 @@ public class SubscriptionAddRemoveQueueTest extends TestCase { public void testNoDispatchToRemovedConsumers() throws Exception { final AtomicInteger producerId = new AtomicInteger(); Runnable sender = new Runnable() { + @Override public void run() { AtomicInteger id = new AtomicInteger(); int producerIdAndIncrement = producerId.getAndIncrement(); @@ -120,6 +121,7 @@ public class SubscriptionAddRemoveQueueTest extends TestCase { }; Runnable subRemover = new Runnable() { + @Override public void run() { for (Subscription sub : subs) { try { @@ -177,10 +179,12 @@ public class SubscriptionAddRemoveQueueTest extends TestCase { List<MessageReference> dispatched = Collections.synchronizedList(new ArrayList<MessageReference>()); + @Override public void acknowledge(ConnectionContext context, MessageAck ack) throws Exception { } + @Override public void add(MessageReference node) throws Exception { // immediate dispatch QueueMessageReference qmr = (QueueMessageReference)node; @@ -188,6 +192,7 @@ public class SubscriptionAddRemoveQueueTest extends TestCase { dispatched.add(qmr); } + @Override public ConnectionContext getContext() { return null; } @@ -228,76 +233,100 @@ public class SubscriptionAddRemoveQueueTest extends TestCase { public void 
resetConsumedCount() { } + @Override public void add(ConnectionContext context, Destination destination) throws Exception { } + @Override public void destroy() { } + @Override public void gc() { } + @Override public ConsumerInfo getConsumerInfo() { return info; } + @Override public long getDequeueCounter() { return 0; } + @Override public long getDispatchedCounter() { return 0; } + @Override public int getDispatchedQueueSize() { return 0; } + @Override public long getEnqueueCounter() { return 0; } + @Override public int getInFlightSize() { return 0; } + @Override public int getInFlightUsage() { return 0; } + @Override public ObjectName getObjectName() { return null; } + @Override public int getPendingQueueSize() { return 0; } + @Override + public long getPendingMessageSize() { + return 0; + } + + @Override public int getPrefetchSize() { return 0; } + @Override public String getSelector() { return null; } + @Override public boolean isBrowser() { return false; } + @Override public boolean isFull() { return false; } + @Override public boolean isHighWaterMark() { return false; } + @Override public boolean isLowWaterMark() { return false; } + @Override public boolean isRecoveryRequired() { return false; } @@ -306,19 +335,23 @@ public class SubscriptionAddRemoveQueueTest extends TestCase { return false; } + @Override public boolean matches(MessageReference node, MessageEvaluationContext context) throws IOException { return true; } + @Override public boolean matches(ActiveMQDestination destination) { return false; } + @Override public void processMessageDispatchNotification( MessageDispatchNotification mdn) throws Exception { } + @Override public Response pullMessage(ConnectionContext context, MessagePull pull) throws Exception { return null; @@ -329,34 +362,42 @@ public class SubscriptionAddRemoveQueueTest extends TestCase { return false; } + @Override public List<MessageReference> remove(ConnectionContext context, Destination destination) throws Exception { return new 
ArrayList<MessageReference>(dispatched); } + @Override public void setObjectName(ObjectName objectName) { } + @Override public void setSelector(String selector) throws InvalidSelectorException, UnsupportedOperationException { } + @Override public void updateConsumerPrefetch(int newPrefetch) { } + @Override public boolean addRecoveredMessage(ConnectionContext context, MessageReference message) throws Exception { return false; } + @Override public ActiveMQDestination getActiveMQDestination() { return null; } + @Override public int getLockPriority() { return 0; } + @Override public boolean isLockExclusive() { return false; } @@ -367,6 +408,7 @@ public class SubscriptionAddRemoveQueueTest extends TestCase { public void removeDestination(Destination destination) { } + @Override public int countBeforeFull() { return 10; } http://git-wip-us.apache.org/repos/asf/activemq/blob/734fb7dd/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursorTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursorTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursorTest.java new file mode 100644 index 0000000..5d0a82c --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursorTest.java @@ -0,0 +1,547 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.region.cursors; + +import java.io.IOException; +import java.net.URI; +import java.util.concurrent.atomic.AtomicLong; + +import javax.jms.Connection; +import javax.jms.DeliveryMode; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.Topic; +import javax.jms.TopicSession; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.TransportConnector; +import org.apache.activemq.broker.region.Destination; +import org.apache.activemq.broker.region.DurableTopicSubscription; +import org.apache.activemq.broker.region.TopicSubscription; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.store.AbstractStoreStatTestSupport; +import org.apache.activemq.store.MessageStore; +import org.apache.activemq.util.SubscriptionKey; +import org.apache.activemq.util.Wait; +import org.apache.activemq.util.Wait.Condition; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This test checks that KahaDB properly sets the new storeMessageSize statistic. 
+ * + * AMQ-5748 + * + */ +public abstract class AbstractPendingMessageCursorTest extends AbstractStoreStatTestSupport { + protected static final Logger LOG = LoggerFactory + .getLogger(AbstractPendingMessageCursorTest.class); + + + protected BrokerService broker; + protected URI brokerConnectURI; + protected String defaultQueueName = "test.queue"; + protected String defaultTopicName = "test.topic"; + protected static int maxMessageSize = 1000; + + @Before + public void startBroker() throws Exception { + setUpBroker(true); + } + + protected void setUpBroker(boolean clearDataDir) throws Exception { + + broker = new BrokerService(); + this.initPersistence(broker); + //set up a transport + TransportConnector connector = broker + .addConnector(new TransportConnector()); + connector.setUri(new URI("tcp://0.0.0.0:0")); + connector.setName("tcp"); + + PolicyEntry policy = new PolicyEntry(); + policy.setTopicPrefetch(100); + policy.setDurableTopicPrefetch(100); + PolicyMap pMap = new PolicyMap(); + pMap.setDefaultEntry(policy); + broker.setDestinationPolicy(pMap); + + broker.start(); + broker.waitUntilStarted(); + brokerConnectURI = broker.getConnectorByName("tcp").getConnectUri(); + + } + + @After + public void stopBroker() throws Exception { + broker.stop(); + broker.waitUntilStopped(); + } + + @Override + protected BrokerService getBroker() { + return this.broker; + } + + @Override + protected URI getBrokerConnectURI() { + return this.brokerConnectURI; + } + + protected abstract void initPersistence(BrokerService brokerService) throws IOException; + + @Test + public void testQueueMessageSize() throws Exception { + AtomicLong publishedMessageSize = new AtomicLong(); + + org.apache.activemq.broker.region.Queue dest = publishTestQueueMessages(200, publishedMessageSize); + verifyPendingStats(dest, 200, publishedMessageSize.get()); + verifyStoreStats(dest, 200, publishedMessageSize.get()); + } + + @Test + public void testQueueBrowserMessageSize() throws Exception { + 
AtomicLong publishedMessageSize = new AtomicLong(); + + org.apache.activemq.broker.region.Queue dest = publishTestQueueMessages(200, publishedMessageSize); + browseTestQueueMessages(dest.getName()); + verifyPendingStats(dest, 200, publishedMessageSize.get()); + verifyStoreStats(dest, 200, publishedMessageSize.get()); + } + + @Test + public void testQueueMessageSizeNonPersistent() throws Exception { + AtomicLong publishedMessageSize = new AtomicLong(); + + org.apache.activemq.broker.region.Queue dest = publishTestQueueMessages(200, + DeliveryMode.NON_PERSISTENT, publishedMessageSize); + verifyPendingStats(dest, 200, publishedMessageSize.get()); + } + + @Test + public void testQueueMessageSizePersistentAndNonPersistent() throws Exception { + AtomicLong publishedNonPersistentMessageSize = new AtomicLong(); + AtomicLong publishedMessageSize = new AtomicLong(); + + org.apache.activemq.broker.region.Queue dest = publishTestQueueMessages(100, + DeliveryMode.PERSISTENT, publishedMessageSize); + dest = publishTestQueueMessages(100, + DeliveryMode.NON_PERSISTENT, publishedNonPersistentMessageSize); + verifyPendingStats(dest, 200, publishedMessageSize.get() + publishedNonPersistentMessageSize.get()); + verifyStoreStats(dest, 100, publishedMessageSize.get()); + } + + @Test + public void testQueueMessageSizeAfterConsumption() throws Exception { + AtomicLong publishedMessageSize = new AtomicLong(); + + org.apache.activemq.broker.region.Queue dest = publishTestQueueMessages(200, publishedMessageSize); + verifyPendingStats(dest, 200, publishedMessageSize.get()); + + consumeTestQueueMessages(); + + verifyPendingStats(dest, 0, 0); + verifyStoreStats(dest, 0, 0); + } + + @Test + public void testQueueMessageSizeAfterConsumptionNonPersistent() throws Exception { + AtomicLong publishedMessageSize = new AtomicLong(); + + org.apache.activemq.broker.region.Queue dest = publishTestQueueMessages(200, DeliveryMode.NON_PERSISTENT, publishedMessageSize); + verifyPendingStats(dest, 200, 
publishedMessageSize.get()); + + consumeTestQueueMessages(); + + verifyPendingStats(dest, 0, 0); + verifyStoreStats(dest, 0, 0); + } + + @Test(timeout=100000) + public void testTopicMessageSize() throws Exception { + AtomicLong publishedMessageSize = new AtomicLong(); + + Connection connection = new ActiveMQConnectionFactory(brokerConnectURI).createConnection(); + connection.setClientID("clientId"); + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = session.createConsumer(new ActiveMQTopic(this.defaultTopicName)); + + org.apache.activemq.broker.region.Topic dest = publishTestTopicMessages(200, publishedMessageSize); + + //verify the count and size - there is a prefetch of 100 so only 100 are pending and 100 + //are dispatched because we have an active consumer online + //verify that the size is greater than 100 messages times the minimum size of 100 + verifyPendingStats(dest, 100, 100 * 100); + + //consume all messages + consumeTestMessages(consumer, 200); + + //All messages should now be gone + verifyPendingStats(dest, 0, 0); + + connection.close(); + } + + @Test(timeout=100000) + public void testTopicNonPersistentMessageSize() throws Exception { + AtomicLong publishedMessageSize = new AtomicLong(); + + Connection connection = new ActiveMQConnectionFactory(brokerConnectURI).createConnection(); + connection.setClientID("clientId"); + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = session.createConsumer(new ActiveMQTopic(this.defaultTopicName)); + + org.apache.activemq.broker.region.Topic dest = publishTestTopicMessages(200, + DeliveryMode.NON_PERSISTENT, publishedMessageSize); + + //verify the count and size - there is a prefetch of 100 so only 100 are pending and 100 + //are dispatched because we have an active consumer online + //verify the size is at least as big as 100 messages times the minimum of 100 
size + verifyPendingStats(dest, 100, 100 * 100); + + //consume all messages + consumeTestMessages(consumer, 200); + + //All messages should now be gone + verifyPendingStats(dest, 0, 0); + + connection.close(); + } + + @Test(timeout=100000) + public void testTopicPersistentAndNonPersistentMessageSize() throws Exception { + AtomicLong publishedMessageSize = new AtomicLong(); + + Connection connection = new ActiveMQConnectionFactory(brokerConnectURI).createConnection(); + connection.setClientID("clientId"); + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = session.createConsumer(new ActiveMQTopic(this.defaultTopicName)); + + org.apache.activemq.broker.region.Topic dest = publishTestTopicMessages(100, + DeliveryMode.NON_PERSISTENT, publishedMessageSize); + + dest = publishTestTopicMessages(100, DeliveryMode.PERSISTENT, publishedMessageSize); + + //verify the count and size - there is a prefetch of 100 so only 100 are pending and 100 + //are dispatched because we have an active consumer online + //verify the size is at least as big as 100 messages times the minimum of 100 size + verifyPendingStats(dest, 100, 100 * 100); + + //consume all messages + consumeTestMessages(consumer, 200); + + //All messages should now be gone + verifyPendingStats(dest, 0, 0); + + connection.close(); + } + + @Test(timeout=10000) + public void testMessageSizeOneDurable() throws Exception { + AtomicLong publishedMessageSize = new AtomicLong(); + Connection connection = new ActiveMQConnectionFactory(brokerConnectURI).createConnection(); + connection.setClientID("clientId"); + connection.start(); + + SubscriptionKey subKey = new SubscriptionKey("clientId", "sub1"); + org.apache.activemq.broker.region.Topic dest = publishTestMessagesDurable(connection, + new String[] {"sub1"}, 200, publishedMessageSize, DeliveryMode.PERSISTENT); + + //verify the count and size - durable is offline so all 200 should be pending since 
none are in prefetch + verifyPendingStats(dest, subKey, 200, publishedMessageSize.get()); + verifyStoreStats(dest, 200, publishedMessageSize.get()); + + //consume all messages + consumeDurableTestMessages(connection, "sub1", 200, publishedMessageSize); + + //All messages should now be gone + verifyPendingStats(dest, subKey, 0, 0); + verifyStoreStats(dest, 0, 0); + + connection.close(); + } + + @Test(timeout=10000) + public void testMessageSizeOneDurablePartialConsumption() throws Exception { + AtomicLong publishedMessageSize = new AtomicLong(); + + Connection connection = new ActiveMQConnectionFactory(brokerConnectURI).createConnection(); + connection.setClientID("clientId"); + connection.start(); + + SubscriptionKey subKey = new SubscriptionKey("clientId", "sub1"); + org.apache.activemq.broker.region.Topic dest = publishTestMessagesDurable( + connection, new String[] {"sub1"}, 200, publishedMessageSize, DeliveryMode.PERSISTENT); + + //verify the count and size - durable is offline so all 200 should be pending since none are in prefetch + verifyPendingStats(dest, subKey, 200, publishedMessageSize.get()); + verifyStoreStats(dest, 200, publishedMessageSize.get()); + + //consume only 50 of the 200 messages + consumeDurableTestMessages(connection, "sub1", 50, publishedMessageSize); + + //150 should be left + verifyPendingStats(dest, subKey, 150, publishedMessageSize.get()); + verifyStoreStats(dest, 150, publishedMessageSize.get()); + + connection.close(); + } + + @Test(timeout=10000) + public void testMessageSizeTwoDurables() throws Exception { + AtomicLong publishedMessageSize = new AtomicLong(); + + Connection connection = new ActiveMQConnectionFactory(brokerConnectURI).createConnection(); + connection.setClientID("clientId"); + connection.start(); + + org.apache.activemq.broker.region.Topic dest = + publishTestMessagesDurable(connection, new String[] {"sub1", "sub2"}, 200, + publishedMessageSize, DeliveryMode.PERSISTENT); + + //verify the count and size + SubscriptionKey subKey = 
new SubscriptionKey("clientId", "sub1"); + verifyPendingStats(dest, subKey, 200, publishedMessageSize.get()); + + //consume messages just for sub1 + consumeDurableTestMessages(connection, "sub1", 200, publishedMessageSize); + + //There is still a durable that hasn't consumed so the messages should exist + SubscriptionKey subKey2 = new SubscriptionKey("clientId", "sub2"); + verifyPendingStats(dest, subKey, 0, 0); + verifyPendingStats(dest, subKey2, 200, publishedMessageSize.get()); + verifyStoreStats(dest, 200, publishedMessageSize.get()); + + connection.stop(); + } + + + protected void verifyPendingStats(final org.apache.activemq.broker.region.Queue queue, + final int count, final long minimumSize) throws Exception { + this.verifyPendingStats(queue, count, minimumSize, count, minimumSize); + } + + protected void verifyPendingStats(final org.apache.activemq.broker.region.Queue queue, + final int count, final long minimumSize, final int storeCount, final long minimumStoreSize) throws Exception { + + Wait.waitFor(new Condition() { + @Override + public boolean isSatisified() throws Exception { + return queue.getPendingMessageCount() == count; + } + }); + + verifySize(count, new MessageSizeCalculator() { + @Override + public long getMessageSize() throws Exception { + return queue.getPendingMessageSize(); + } + }, minimumSize); + } + + //For a non-durable there won't necessarily be a message store + protected void verifyPendingStats(org.apache.activemq.broker.region.Topic topic, + final int count, final long minimumSize) throws Exception { + + final TopicSubscription sub = (TopicSubscription) topic.getConsumers().get(0); + + Wait.waitFor(new Condition() { + @Override + public boolean isSatisified() throws Exception { + return sub.getPendingQueueSize() == count; + } + }); + + verifySize(count, new MessageSizeCalculator() { + @Override + public long getMessageSize() throws Exception { + return sub.getPendingMessageSize(); + } + }, minimumSize); + } + + protected void 
verifyPendingStats(org.apache.activemq.broker.region.Topic topic, SubscriptionKey subKey, + final int count, final long minimumSize) throws Exception { + + final DurableTopicSubscription sub = topic.getDurableTopicSubs().get(subKey); + + //verify message count + Wait.waitFor(new Condition() { + @Override + public boolean isSatisified() throws Exception { + return sub.getPendingQueueSize() == count; + } + }); + + //verify message size + verifySize(count, new MessageSizeCalculator() { + @Override + public long getMessageSize() throws Exception { + return sub.getPendingMessageSize(); + } + }, minimumSize); + } + + protected void verifyStoreStats(org.apache.activemq.broker.region.Destination dest, + final int storeCount, final long minimumStoreSize) throws Exception { + final MessageStore messageStore = dest.getMessageStore(); + + Wait.waitFor(new Condition() { + @Override + public boolean isSatisified() throws Exception { + return messageStore.getMessageCount() == storeCount; + } + }); + verifySize(storeCount, new MessageSizeCalculator() { + @Override + public long getMessageSize() throws Exception { + return messageStore.getMessageSize(); + } + }, minimumStoreSize); + + } + + + protected void verifySize(final int count, final MessageSizeCalculator messageSizeCalculator, + final long minimumSize) throws Exception { + if (count > 0) { + Wait.waitFor(new Condition() { + @Override + public boolean isSatisified() throws Exception { + return messageSizeCalculator.getMessageSize() > minimumSize ; + } + }); + } else { + Wait.waitFor(new Condition() { + @Override + public boolean isSatisified() throws Exception { + return messageSizeCalculator.getMessageSize() == 0; + } + }); + } + } + + protected static interface MessageSizeCalculator { + long getMessageSize() throws Exception; + } + + + protected Destination consumeTestMessages(MessageConsumer consumer, int size) throws Exception { + return consumeTestMessages(consumer, size, defaultTopicName); + } + + + protected 
Destination consumeTestMessages(MessageConsumer consumer, int size, String topicName) throws Exception { + // look up the broker-side topic destination + final ActiveMQDestination activeMqTopic = new ActiveMQTopic( + topicName); + + Destination dest = broker.getDestination(activeMqTopic); + + //Topic topic = session.createTopic(topicName); + + try { + for (int i = 0; i < size; i++) { + consumer.receive(); + } + + } finally { + //session.close(); + } + + return dest; + } + + protected Destination consumeDurableTestMessages(Connection connection, String sub, int size, AtomicLong publishedMessageSize) throws Exception { + return consumeDurableTestMessages(connection, sub, size, defaultTopicName, publishedMessageSize); + } + + protected org.apache.activemq.broker.region.Topic publishTestMessagesDurable(Connection connection, + String[] subNames, int publishSize, AtomicLong publishedMessageSize, int deliveryMode) throws Exception { + + return publishTestMessagesDurable(connection, subNames, defaultTopicName, + publishSize, 0, AbstractStoreStatTestSupport.defaultMessageSize, + publishedMessageSize, false, deliveryMode); + } + + protected org.apache.activemq.broker.region.Topic publishTestTopicMessages(int publishSize, + AtomicLong publishedMessageSize) throws Exception { + return publishTestTopicMessages(publishSize, DeliveryMode.PERSISTENT, publishedMessageSize); + } + + protected org.apache.activemq.broker.region.Topic publishTestTopicMessages(int publishSize, + int deliveryMode, AtomicLong publishedMessageSize) throws Exception { + // open and start a connection for publishing + Connection connection = new ActiveMQConnectionFactory(brokerConnectURI).createConnection(); + connection.setClientID("clientId2"); + connection.start(); + + final ActiveMQDestination activeMqTopic = new ActiveMQTopic( + defaultTopicName); + + org.apache.activemq.broker.region.Topic dest = + (org.apache.activemq.broker.region.Topic) broker.getDestination(activeMqTopic); + + // Create the session + Session session =
connection.createSession(false, + TopicSession.AUTO_ACKNOWLEDGE); + Topic topic = session.createTopic(defaultTopicName); + + try { + // publish publishSize messages using the requested delivery + // mode + MessageProducer prod = session.createProducer(topic); + prod.setDeliveryMode(deliveryMode); + for (int i = 0; i < publishSize; i++) { + prod.send(createMessage(session, AbstractPendingMessageCursorTest.maxMessageSize, publishedMessageSize)); + } + + } finally { + connection.close(); + } + + return dest; + } + + protected org.apache.activemq.broker.region.Queue publishTestQueueMessages(int count, + AtomicLong publishedMessageSize) throws Exception { + return publishTestQueueMessages(count, defaultQueueName, DeliveryMode.PERSISTENT, + AbstractPendingMessageCursorTest.maxMessageSize, publishedMessageSize); + } + + protected org.apache.activemq.broker.region.Queue publishTestQueueMessages(int count, int deliveryMode, + AtomicLong publishedMessageSize) throws Exception { + return publishTestQueueMessages(count, defaultQueueName, deliveryMode, + AbstractPendingMessageCursorTest.maxMessageSize, publishedMessageSize); + } + + protected Destination consumeTestQueueMessages() throws Exception { + return consumeTestQueueMessages(defaultQueueName); + } + +} http://git-wip-us.apache.org/repos/asf/activemq/blob/734fb7dd/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/KahaDBPendingMessageCursorTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/KahaDBPendingMessageCursorTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/KahaDBPendingMessageCursorTest.java new file mode 100644 index 0000000..557c70e --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/KahaDBPendingMessageCursorTest.java @@ -0,0 +1,126 @@ +/** + * Licensed to the Apache Software
Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.region.cursors; + +import java.io.File; +import java.io.IOException; +import java.util.concurrent.atomic.AtomicLong; + +import javax.jms.Connection; +import javax.jms.DeliveryMode; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.Topic; +import org.apache.activemq.util.SubscriptionKey; +import org.apache.commons.io.FileUtils; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This test checks that pending message metrics work properly with KahaDB + * + * AMQ-5923 + * + */ +public class KahaDBPendingMessageCursorTest extends + AbstractPendingMessageCursorTest { + protected static final Logger LOG = LoggerFactory + .getLogger(KahaDBPendingMessageCursorTest.class); + + @Rule + public TemporaryFolder dataFileDir = new TemporaryFolder(new File("target")); + + @Override + protected void setUpBroker(boolean clearDataDir) throws Exception { + if (clearDataDir && dataFileDir.getRoot().exists()) + FileUtils.cleanDirectory(dataFileDir.getRoot()); + 
super.setUpBroker(clearDataDir); + } + + @Override + protected void initPersistence(BrokerService brokerService) + throws IOException { + broker.setPersistent(true); + broker.setDataDirectoryFile(dataFileDir.getRoot()); + } + + /** + * Test that the counter restores size and works after restart and more + * messages are published + * + * @throws Exception + */ + @Test(timeout=10000) + public void testDurableMessageSizeAfterRestartAndPublish() throws Exception { + AtomicLong publishedMessageSize = new AtomicLong(); + + Connection connection = new ActiveMQConnectionFactory(brokerConnectURI).createConnection(); + connection.setClientID("clientId"); + connection.start(); + Topic topic = publishTestMessagesDurable(connection, new String[] {"sub1"}, 200, + publishedMessageSize, DeliveryMode.PERSISTENT); + + SubscriptionKey subKey = new SubscriptionKey("clientId", "sub1"); + + // verify the count and size + verifyPendingStats(topic, subKey, 200, publishedMessageSize.get()); + verifyStoreStats(topic, 200, publishedMessageSize.get()); + + // stop, restart broker and publish more messages + stopBroker(); + this.setUpBroker(false); + + connection = new ActiveMQConnectionFactory(brokerConnectURI).createConnection(); + connection.setClientID("clientId"); + connection.start(); + + topic = publishTestMessagesDurable(connection, new String[] {"sub1"}, 200, + publishedMessageSize, DeliveryMode.PERSISTENT); + + // verify the count and size + verifyPendingStats(topic, subKey, 400, publishedMessageSize.get()); + verifyStoreStats(topic, 400, publishedMessageSize.get()); + + } + + /** + * Test that non-persistent messages sent to a durable subscription are counted + * as pending but are not written to the message store + * + * @throws Exception + */ + @Test(timeout=10000) + public void testNonPersistentDurableMessageSize() throws Exception { + AtomicLong publishedMessageSize = new AtomicLong(); + + Connection connection = new ActiveMQConnectionFactory(brokerConnectURI).createConnection(); +
connection.setClientID("clientId"); + connection.start(); + Topic topic = publishTestMessagesDurable(connection, new String[] {"sub1"}, 200, + publishedMessageSize, DeliveryMode.NON_PERSISTENT); + + SubscriptionKey subKey = new SubscriptionKey("clientId", "sub1"); + + // verify the count and size + verifyPendingStats(topic, subKey, 200, publishedMessageSize.get()); + verifyStoreStats(topic, 0, 0); + } +} http://git-wip-us.apache.org/repos/asf/activemq/blob/734fb7dd/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/MemoryPendingMessageCursorTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/MemoryPendingMessageCursorTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/MemoryPendingMessageCursorTest.java new file mode 100644 index 0000000..948193d --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/MemoryPendingMessageCursorTest.java @@ -0,0 +1,145 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.broker.region.cursors; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicLong; + +import javax.jms.Connection; +import javax.jms.DeliveryMode; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.store.memory.MemoryPersistenceAdapter; +import org.apache.activemq.util.SubscriptionKey; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This test checks that PendingMessageCursor size statistics work with the MemoryPersistentAdapter + * + * AMQ-5748 + * + */ +public class MemoryPendingMessageCursorTest extends AbstractPendingMessageCursorTest { + protected static final Logger LOG = LoggerFactory + .getLogger(MemoryPendingMessageCursorTest.class); + + @Override + protected void initPersistence(BrokerService brokerService) throws IOException { + broker.setPersistent(false); + broker.setPersistenceAdapter(new MemoryPersistenceAdapter()); + } + + + @Override + @Test(timeout=10000) + public void testMessageSizeOneDurable() throws Exception { + AtomicLong publishedMessageSize = new AtomicLong(); + Connection connection = new ActiveMQConnectionFactory(brokerConnectURI).createConnection(); + connection.setClientID("clientId"); + connection.start(); + + SubscriptionKey subKey = new SubscriptionKey("clientId", "sub1"); + org.apache.activemq.broker.region.Topic dest = + publishTestMessagesDurable(connection, new String[] {"sub1"}, + 200, publishedMessageSize, DeliveryMode.PERSISTENT); + + verifyPendingStats(dest, subKey, 200, publishedMessageSize.get()); + + //The expected value is only 100 because for durables a LRUCache is being used + //with a max size of 100 + verifyStoreStats(dest, 100, publishedMessageSize.get()); + + //consume 100 messages + consumeDurableTestMessages(connection, "sub1", 100, publishedMessageSize); + + //100 should be left + verifyPendingStats(dest, subKey, 100, 
publishedMessageSize.get()); + verifyStoreStats(dest, 100, publishedMessageSize.get()); + + connection.close(); + } + + @Override + @Test(timeout=10000) + public void testMessageSizeTwoDurables() throws Exception { + AtomicLong publishedMessageSize = new AtomicLong(); + + Connection connection = new ActiveMQConnectionFactory(brokerConnectURI).createConnection(); + connection.setClientID("clientId"); + connection.start(); + + org.apache.activemq.broker.region.Topic dest = + publishTestMessagesDurable(connection, new String[] {"sub1", "sub2"}, + 200, publishedMessageSize, DeliveryMode.PERSISTENT); + + //verify the count and size + SubscriptionKey subKey = new SubscriptionKey("clientId", "sub1"); + verifyPendingStats(dest, subKey, 200, publishedMessageSize.get()); + + //consume messages just for sub1 + consumeDurableTestMessages(connection, "sub1", 200, publishedMessageSize); + + //There is still a durable that hasn't consumed so the messages should exist + SubscriptionKey subKey2 = new SubscriptionKey("clientId", "sub2"); + verifyPendingStats(dest, subKey, 0, 0); + verifyPendingStats(dest, subKey2, 200, publishedMessageSize.get()); + + //The expected value is only 100 because for durables a LRUCache is being used + //with a max size of 100 + verifyStoreStats(dest, 100, publishedMessageSize.get()); + + connection.stop(); + } + + @Override + @Test(timeout=10000) + public void testMessageSizeOneDurablePartialConsumption() throws Exception { + AtomicLong publishedMessageSize = new AtomicLong(); + + Connection connection = new ActiveMQConnectionFactory(brokerConnectURI).createConnection(); + connection.setClientID("clientId"); + connection.start(); + + SubscriptionKey subKey = new SubscriptionKey("clientId", "sub1"); + org.apache.activemq.broker.region.Topic dest = publishTestMessagesDurable(connection, + new String[] {"sub1"}, 200, publishedMessageSize, DeliveryMode.PERSISTENT); + + //verify the count and size - durable is offline so all 200 should be pending since none 
are in prefetch + verifyPendingStats(dest, subKey, 200, publishedMessageSize.get()); + + //The expected value is only 100 because for durables a LRUCache is being used + //with a max size of 100 + verifyStoreStats(dest, 100, publishedMessageSize.get()); + + //consume 50 messages + consumeDurableTestMessages(connection, "sub1", 50, publishedMessageSize); + + //150 should be left + verifyPendingStats(dest, subKey, 150, publishedMessageSize.get()); + + //The expected value is only 100 because for durables a LRUCache is being used + //with a max size of 100 + //verify the size is at least as big as 100 messages times the minimum of 100 size + verifyStoreStats(dest, 100, 100 * 100); + + connection.close(); + } + +} http://git-wip-us.apache.org/repos/asf/activemq/blob/734fb7dd/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/MultiKahaDBPendingMessageCursorTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/MultiKahaDBPendingMessageCursorTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/MultiKahaDBPendingMessageCursorTest.java new file mode 100644 index 0000000..9768980 --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/MultiKahaDBPendingMessageCursorTest.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.region.cursors; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.store.kahadb.FilteredKahaDBPersistenceAdapter; +import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; +import org.apache.activemq.store.kahadb.MultiKahaDBPersistenceAdapter; + +/** + * This test checks that pending message metrics work properly with MultiKahaDB + * + * AMQ-5923 + * + */ +public class MultiKahaDBPendingMessageCursorTest extends + KahaDBPendingMessageCursorTest { + + @Override + protected void initPersistence(BrokerService brokerService) + throws IOException { + broker.setPersistent(true); + + //setup multi-kaha adapter + MultiKahaDBPersistenceAdapter persistenceAdapter = new MultiKahaDBPersistenceAdapter(); + persistenceAdapter.setDirectory(dataFileDir.getRoot()); + + KahaDBPersistenceAdapter kahaStore = new KahaDBPersistenceAdapter(); + kahaStore.setJournalMaxFileLength(1024 * 512); + + //set up a store per destination + FilteredKahaDBPersistenceAdapter filtered = new FilteredKahaDBPersistenceAdapter(); + filtered.setPersistenceAdapter(kahaStore); + filtered.setPerDestination(true); + List<FilteredKahaDBPersistenceAdapter> stores = new ArrayList<>(); + stores.add(filtered); + + persistenceAdapter.setFilteredPersistenceAdapters(stores); + broker.setPersistenceAdapter(persistenceAdapter); + } + +} 
http://git-wip-us.apache.org/repos/asf/activemq/blob/734fb7dd/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/OrderPendingListTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/OrderPendingListTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/OrderPendingListTest.java index 79d7e6c..6a9dd6b 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/OrderPendingListTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/OrderPendingListTest.java @@ -309,6 +309,16 @@ public class OrderPendingListTest { } @Override + public long messageSize() { + long size = 0; + Iterator<MessageReference> i = theList.iterator(); + while (i.hasNext()) { + size += i.next().getMessage().getSize(); + } + return size; + } + + @Override public Iterator<MessageReference> iterator() { return theList.iterator(); } http://git-wip-us.apache.org/repos/asf/activemq/blob/734fb7dd/activemq-unit-tests/src/test/java/org/apache/activemq/store/AbstractMessageStoreSizeStatTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/AbstractMessageStoreSizeStatTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/AbstractMessageStoreSizeStatTest.java index 944d183..116500e 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/store/AbstractMessageStoreSizeStatTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/AbstractMessageStoreSizeStatTest.java @@ -16,38 +16,19 @@ */ package org.apache.activemq.store; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import java.io.IOException; import java.net.URI; -import java.util.Random; 
+import java.util.concurrent.atomic.AtomicLong; -import javax.jms.BytesMessage; import javax.jms.Connection; import javax.jms.DeliveryMode; -import javax.jms.JMSException; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Queue; -import javax.jms.QueueSession; -import javax.jms.Session; -import javax.jms.Topic; -import javax.jms.TopicSession; -import javax.jms.TopicSubscriber; -import javax.management.ObjectName; -import javax.management.openmbean.CompositeData; -import javax.management.openmbean.TabularData; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.TransportConnector; -import org.apache.activemq.broker.jmx.DurableSubscriptionViewMBean; import org.apache.activemq.broker.region.Destination; -import org.apache.activemq.command.ActiveMQDestination; -import org.apache.activemq.command.ActiveMQQueue; -import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.util.Wait; import org.apache.activemq.util.Wait.Condition; import org.junit.After; @@ -62,7 +43,7 @@ import org.slf4j.LoggerFactory; * AMQ-5748 * */ -public abstract class AbstractMessageStoreSizeStatTest { +public abstract class AbstractMessageStoreSizeStatTest extends AbstractStoreStatTestSupport { protected static final Logger LOG = LoggerFactory .getLogger(AbstractMessageStoreSizeStatTest.class); @@ -71,7 +52,6 @@ public abstract class AbstractMessageStoreSizeStatTest { protected URI brokerConnectURI; protected String defaultQueueName = "test.queue"; protected String defaultTopicName = "test.topic"; - protected static int messageSize = 1000; @Before public void startBroker() throws Exception { @@ -100,39 +80,52 @@ public abstract class AbstractMessageStoreSizeStatTest { broker.waitUntilStopped(); } + @Override + protected BrokerService getBroker() { + return this.broker; + } + + @Override + protected URI getBrokerConnectURI() { + return this.brokerConnectURI; + 
} + protected abstract void initPersistence(BrokerService brokerService) throws IOException; - @Test + @Test(timeout=10000) public void testMessageSize() throws Exception { - Destination dest = publishTestQueueMessages(200); - verifyStats(dest, 200, 200 * messageSize); + AtomicLong publishedMessageSize = new AtomicLong(); + + Destination dest = publishTestQueueMessages(200, publishedMessageSize); + verifyStats(dest, 200, publishedMessageSize.get()); } - @Test + @Test(timeout=10000) public void testMessageSizeAfterConsumption() throws Exception { + AtomicLong publishedMessageSize = new AtomicLong(); - Destination dest = publishTestQueueMessages(200); - verifyStats(dest, 200, 200 * messageSize); + Destination dest = publishTestQueueMessages(200, publishedMessageSize); + verifyStats(dest, 200, publishedMessageSize.get()); consumeTestQueueMessages(); verifyStats(dest, 0, 0); } - @Test + @Test(timeout=10000) public void testMessageSizeOneDurable() throws Exception { - + AtomicLong publishedMessageSize = new AtomicLong(); Connection connection = new ActiveMQConnectionFactory(brokerConnectURI).createConnection(); connection.setClientID("clientId"); connection.start(); - Destination dest = publishTestMessagesDurable(connection, new String[] {"sub1"}, 200, 200); + Destination dest = publishTestMessagesDurable(connection, new String[] {"sub1"}, 200, 200, publishedMessageSize); //verify the count and size - verifyStats(dest, 200, 200 * messageSize); + verifyStats(dest, 200, publishedMessageSize.get()); //consume all messages - consumeDurableTestMessages(connection, "sub1", 200); + consumeDurableTestMessages(connection, "sub1", 200, publishedMessageSize); //All messages should now be gone verifyStats(dest, 0, 0); @@ -142,21 +135,21 @@ public abstract class AbstractMessageStoreSizeStatTest { @Test(timeout=10000) public void testMessageSizeTwoDurables() throws Exception { - + AtomicLong publishedMessageSize = new AtomicLong(); Connection connection = new 
ActiveMQConnectionFactory(brokerConnectURI).createConnection(); connection.setClientID("clientId"); connection.start(); - Destination dest = publishTestMessagesDurable(connection, new String[] {"sub1", "sub2"}, 200, 200); + Destination dest = publishTestMessagesDurable(connection, new String[] {"sub1", "sub2"}, 200, 200, publishedMessageSize); //verify the count and size - verifyStats(dest, 200, 200 * messageSize); + verifyStats(dest, 200, publishedMessageSize.get()); //consume messages just for sub1 - consumeDurableTestMessages(connection, "sub1", 200); + consumeDurableTestMessages(connection, "sub1", 200, publishedMessageSize); //There is still a durable that hasn't consumed so the messages should exist - verifyStats(dest, 200, 200 * messageSize); + verifyStats(dest, 200, publishedMessageSize.get()); connection.stop(); @@ -164,14 +157,24 @@ public abstract class AbstractMessageStoreSizeStatTest { @Test public void testMessageSizeAfterDestinationDeletion() throws Exception { - Destination dest = publishTestQueueMessages(200); - verifyStats(dest, 200, 200 * messageSize); + AtomicLong publishedMessageSize = new AtomicLong(); + Destination dest = publishTestQueueMessages(200, publishedMessageSize); + verifyStats(dest, 200, publishedMessageSize.get()); //check that the size is 0 after deletion broker.removeDestination(dest.getActiveMQDestination()); verifyStats(dest, 0, 0); } + @Test + public void testQueueBrowserMessageSize() throws Exception { + AtomicLong publishedMessageSize = new AtomicLong(); + + Destination dest = publishTestQueueMessages(200, publishedMessageSize); + browseTestQueueMessages(dest.getName()); + verifyStats(dest, 200, publishedMessageSize.get()); + } + protected void verifyStats(Destination dest, final int count, final long minimumSize) throws Exception { final MessageStore messageStore = dest.getMessageStore(); final MessageStoreStatistics storeStats = dest.getMessageStore().getMessageStoreStatistics(); @@ -203,164 +206,31 @@ public abstract 
class AbstractMessageStoreSizeStatTest { } } - /** - * Generate random 1 megabyte messages - * @param session - * @return - * @throws JMSException - */ - protected BytesMessage createMessage(Session session) throws JMSException { - final BytesMessage message = session.createBytesMessage(); - final byte[] data = new byte[messageSize]; - final Random rng = new Random(); - rng.nextBytes(data); - message.writeBytes(data); - return message; - } - - protected Destination publishTestQueueMessages(int count) throws Exception { - return publishTestQueueMessages(count, defaultQueueName); + protected Destination publishTestQueueMessages(int count, AtomicLong publishedMessageSize) throws Exception { + return publishTestQueueMessages(count, defaultQueueName, DeliveryMode.PERSISTENT, + AbstractStoreStatTestSupport.defaultMessageSize, publishedMessageSize); } - protected Destination publishTestQueueMessages(int count, String queueName) throws Exception { - // create a new queue - final ActiveMQDestination activeMqQueue = new ActiveMQQueue( - queueName); - - Destination dest = broker.getDestination(activeMqQueue); - - // Start the connection - Connection connection = new ActiveMQConnectionFactory(brokerConnectURI) - .createConnection(); - connection.setClientID("clientId" + queueName); - connection.start(); - Session session = connection.createSession(false, - QueueSession.AUTO_ACKNOWLEDGE); - Queue queue = session.createQueue(queueName); - - try { - // publish a bunch of non-persistent messages to fill up the temp - // store - MessageProducer prod = session.createProducer(queue); - prod.setDeliveryMode(DeliveryMode.PERSISTENT); - for (int i = 0; i < count; i++) { - prod.send(createMessage(session)); - } - - } finally { - connection.close(); - } - - return dest; + protected Destination publishTestQueueMessages(int count, String queueName, AtomicLong publishedMessageSize) throws Exception { + return publishTestQueueMessages(count, queueName, DeliveryMode.PERSISTENT, + 
AbstractStoreStatTestSupport.defaultMessageSize, publishedMessageSize); } protected Destination consumeTestQueueMessages() throws Exception { return consumeTestQueueMessages(defaultQueueName); } - protected Destination consumeDurableTestMessages(Connection connection, String sub, int size) throws Exception { - return consumeDurableTestMessages(connection, sub, size, defaultTopicName); + protected Destination consumeDurableTestMessages(Connection connection, String sub, int size, + AtomicLong publishedMessageSize) throws Exception { + return consumeDurableTestMessages(connection, sub, size, defaultTopicName, publishedMessageSize); } - protected Destination consumeTestQueueMessages(String queueName) throws Exception { - // create a new queue - final ActiveMQDestination activeMqQueue = new ActiveMQQueue( - queueName); - - Destination dest = broker.getDestination(activeMqQueue); - - // Start the connection - Connection connection = new ActiveMQConnectionFactory(brokerConnectURI) - .createConnection(); - connection.setClientID("clientId2" + queueName); - connection.start(); - Session session = connection.createSession(false, - QueueSession.AUTO_ACKNOWLEDGE); - Queue queue = session.createQueue(queueName); - - try { - MessageConsumer consumer = session.createConsumer(queue); - for (int i = 0; i < 200; i++) { - consumer.receive(); - } - - } finally { - connection.stop(); - } - - return dest; - } - - protected Destination consumeDurableTestMessages(Connection connection, String sub, int size, String topicName) throws Exception { - // create a new queue - final ActiveMQDestination activeMqTopic = new ActiveMQTopic( - topicName); - - Destination dest = broker.getDestination(activeMqTopic); - - Session session = connection.createSession(false, - QueueSession.AUTO_ACKNOWLEDGE); - Topic topic = session.createTopic(topicName); - - try { - TopicSubscriber consumer = session.createDurableSubscriber(topic, sub); - for (int i = 0; i < size; i++) { - consumer.receive(); - } - - } 
finally { - session.close(); - } - - return dest; - } - - protected Destination publishTestMessagesDurable(Connection connection, String[] subNames, int publishSize, int expectedSize) throws Exception { - // create a new queue - final ActiveMQDestination activeMqTopic = new ActiveMQTopic( - defaultTopicName); - - Destination dest = broker.getDestination(activeMqTopic); - - // Start the connection - - Session session = connection.createSession(false, - TopicSession.AUTO_ACKNOWLEDGE); - Topic topic = session.createTopic(defaultTopicName); - for (String subName : subNames) { - session.createDurableSubscriber(topic, subName); - } - - // browse the durable sub - this test is to verify that browsing (which calls createTopicMessageStore) - //in KahaDBStore will not create a brand new store (ie uses the cache) If the cache is not used, - //then the statistics won't be updated properly because a new store would overwrite the old store - //which is still in use - ObjectName[] subs = broker.getAdminView().getDurableTopicSubscribers(); - - try { - // publish a bunch of non-persistent messages to fill up the temp - // store - MessageProducer prod = session.createProducer(topic); - prod.setDeliveryMode(DeliveryMode.PERSISTENT); - for (int i = 0; i < publishSize; i++) { - prod.send(createMessage(session)); - } - - //verify the view has expected messages - assertEquals(subNames.length, subs.length); - ObjectName subName = subs[0]; - DurableSubscriptionViewMBean sub = (DurableSubscriptionViewMBean) - broker.getManagementContext().newProxyInstance(subName, DurableSubscriptionViewMBean.class, true); - CompositeData[] data = sub.browse(); - assertNotNull(data); - assertEquals(expectedSize, data.length); - - } finally { - session.close(); - } - - return dest; + protected Destination publishTestMessagesDurable(Connection connection, String[] subNames, + int publishSize, int expectedSize, AtomicLong publishedMessageSize) throws Exception { + return publishTestMessagesDurable(connection, 
subNames, defaultTopicName, + publishSize, expectedSize, AbstractStoreStatTestSupport.defaultMessageSize, + publishedMessageSize, true); } } http://git-wip-us.apache.org/repos/asf/activemq/blob/734fb7dd/activemq-unit-tests/src/test/java/org/apache/activemq/store/AbstractStoreStatTestSupport.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/AbstractStoreStatTestSupport.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/AbstractStoreStatTestSupport.java new file mode 100644 index 0000000..3f0e7c1 --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/AbstractStoreStatTestSupport.java @@ -0,0 +1,268 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.store; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +import java.net.URI; +import java.util.Enumeration; +import java.util.Random; +import java.util.concurrent.atomic.AtomicLong; + +import javax.jms.BytesMessage; +import javax.jms.Connection; +import javax.jms.DeliveryMode; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.QueueBrowser; +import javax.jms.QueueSession; +import javax.jms.Session; +import javax.jms.Topic; +import javax.jms.TopicSession; +import javax.jms.TopicSubscriber; +import javax.management.ObjectName; +import javax.management.openmbean.CompositeData; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.jmx.DurableSubscriptionViewMBean; +import org.apache.activemq.broker.region.Destination; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQMessage; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTopic; + +/** + * + * + */ +public abstract class AbstractStoreStatTestSupport { + + protected static int defaultMessageSize = 1000; + + protected abstract BrokerService getBroker(); + + protected abstract URI getBrokerConnectURI(); + + protected Destination consumeTestQueueMessages(String queueName) throws Exception { + // create a new queue + final ActiveMQDestination activeMqQueue = new ActiveMQQueue( + queueName); + + Destination dest = getBroker().getDestination(activeMqQueue); + + // Start the connection + Connection connection = new ActiveMQConnectionFactory(getBrokerConnectURI()) + .createConnection(); + connection.setClientID("clientId2" + queueName); + connection.start(); + Session session = connection.createSession(false, + QueueSession.AUTO_ACKNOWLEDGE); + 
Queue queue = session.createQueue(queueName); + + try { + MessageConsumer consumer = session.createConsumer(queue); + for (int i = 0; i < 200; i++) { + consumer.receive(); + } + + } finally { + connection.stop(); + } + + return dest; + } + + protected Destination browseTestQueueMessages(String queueName) throws Exception { + // create a new queue + final ActiveMQDestination activeMqQueue = new ActiveMQQueue( + queueName); + + Destination dest = getBroker().getDestination(activeMqQueue); + + // Start the connection + Connection connection = new ActiveMQConnectionFactory(getBrokerConnectURI()) + .createConnection(); + connection.setClientID("clientId2" + queueName); + connection.start(); + Session session = connection.createSession(false, + QueueSession.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(queueName); + + try { + QueueBrowser queueBrowser = session.createBrowser(queue); + @SuppressWarnings("unchecked") + Enumeration<Message> messages = queueBrowser.getEnumeration(); + while (messages.hasMoreElements()) { + messages.nextElement(); + } + + } finally { + connection.stop(); + } + + return dest; + } + + protected Destination consumeDurableTestMessages(Connection connection, String sub, + int size, String topicName, AtomicLong publishedMessageSize) throws Exception { + // create a new queue + final ActiveMQDestination activeMqTopic = new ActiveMQTopic( + topicName); + + Destination dest = getBroker().getDestination(activeMqTopic); + + Session session = connection.createSession(false, + QueueSession.AUTO_ACKNOWLEDGE); + Topic topic = session.createTopic(topicName); + + try { + TopicSubscriber consumer = session.createDurableSubscriber(topic, sub); + for (int i = 0; i < size; i++) { + ActiveMQMessage message = (ActiveMQMessage) consumer.receive(); + if (publishedMessageSize != null) { + publishedMessageSize.addAndGet(-message.getSize()); + } + } + + } finally { + session.close(); + } + + return dest; + } + + protected org.apache.activemq.broker.region.Queue 
publishTestQueueMessages(int count, String queueName, + int deliveryMode, int messageSize, AtomicLong publishedMessageSize) throws Exception { + // create a new queue + final ActiveMQDestination activeMqQueue = new ActiveMQQueue( + queueName); + + Destination dest = getBroker().getDestination(activeMqQueue); + + // Start the connection + Connection connection = new ActiveMQConnectionFactory(getBrokerConnectURI()) + .createConnection(); + connection.setClientID("clientId" + queueName); + connection.start(); + Session session = connection.createSession(false, + QueueSession.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(queueName); + + try { + // publish a bunch of non-persistent messages to fill up the temp + // store + MessageProducer prod = session.createProducer(queue); + prod.setDeliveryMode(deliveryMode); + for (int i = 0; i < count; i++) { + prod.send(createMessage(session, messageSize, publishedMessageSize)); + } + + } finally { + connection.close(); + } + + return (org.apache.activemq.broker.region.Queue) dest; + } + + protected org.apache.activemq.broker.region.Topic publishTestMessagesDurable(Connection connection, String[] subNames, String topicName, + int publishSize, int expectedSize, int messageSize, AtomicLong publishedMessageSize, + boolean verifyBrowsing) throws Exception { + return this.publishTestMessagesDurable(connection, subNames, topicName, publishSize, expectedSize, messageSize, + publishedMessageSize, verifyBrowsing, DeliveryMode.PERSISTENT); + } + + protected org.apache.activemq.broker.region.Topic publishTestMessagesDurable(Connection connection, String[] subNames, String topicName, + int publishSize, int expectedSize, int messageSize, AtomicLong publishedMessageSize, + boolean verifyBrowsing, int deliveryMode) throws Exception { + // create a new queue + final ActiveMQDestination activeMqTopic = new ActiveMQTopic( + topicName); + + Destination dest = getBroker().getDestination(activeMqTopic); + + // Start the connection + + 
Session session = connection.createSession(false, + TopicSession.AUTO_ACKNOWLEDGE); + Topic topic = session.createTopic(topicName); + for (String subName : subNames) { + session.createDurableSubscriber(topic, subName); + } + + ObjectName[] subs = null; + if (verifyBrowsing) { + // browse the durable sub - this test is to verify that browsing (which calls createTopicMessageStore) + //in KahaDBStore will not create a brand new store (ie uses the cache) If the cache is not used, + //then the statistics won't be updated properly because a new store would overwrite the old store + //which is still in use + subs = getBroker().getAdminView().getDurableTopicSubscribers(); + } + + try { + // publish a bunch of non-persistent messages to fill up the temp + // store + MessageProducer prod = session.createProducer(topic); + prod.setDeliveryMode(deliveryMode); + for (int i = 0; i < publishSize; i++) { + prod.send(createMessage(session, messageSize, publishedMessageSize)); + } + + //verify the view has expected messages + if (verifyBrowsing) { + assertNotNull(subs); + assertEquals(subNames.length, subs.length); + ObjectName subName = subs[0]; + DurableSubscriptionViewMBean sub = (DurableSubscriptionViewMBean) + getBroker().getManagementContext().newProxyInstance(subName, DurableSubscriptionViewMBean.class, true); + CompositeData[] data = sub.browse(); + assertNotNull(data); + assertEquals(expectedSize, data.length); + } + + } finally { + session.close(); + } + + return (org.apache.activemq.broker.region.Topic) dest; + } + + /** + * Generate random messages between 100 bytes and messageSize + * @param session + * @return + * @throws JMSException + */ + protected BytesMessage createMessage(Session session, int messageSize, AtomicLong publishedMessageSize) throws JMSException { + final BytesMessage message = session.createBytesMessage(); + final Random rn = new Random(); + int size = rn.nextInt(messageSize - 100); + if (publishedMessageSize != null) { + 
publishedMessageSize.addAndGet(size); + } + + final byte[] data = new byte[size]; + final Random rng = new Random(); + rng.nextBytes(data); + message.writeBytes(data); + return message; + } +}