Repository: activemq Updated Branches: refs/heads/master b17cc37ef -> 734fb7dda
http://git-wip-us.apache.org/repos/asf/activemq/blob/734fb7dd/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBMessageStoreSizeStatTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBMessageStoreSizeStatTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBMessageStoreSizeStatTest.java index 28884e6..276a310 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBMessageStoreSizeStatTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBMessageStoreSizeStatTest.java @@ -18,12 +18,15 @@ package org.apache.activemq.store.kahadb; import java.io.File; import java.io.IOException; +import java.util.concurrent.atomic.AtomicLong; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.region.Destination; import org.apache.activemq.store.AbstractMessageStoreSizeStatTest; import org.apache.commons.io.FileUtils; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TemporaryFolder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,12 +42,13 @@ public class KahaDBMessageStoreSizeStatTest extends protected static final Logger LOG = LoggerFactory .getLogger(KahaDBMessageStoreSizeStatTest.class); - File dataFileDir = new File("target/test-amq-5748/stat-datadb"); + @Rule + public TemporaryFolder dataFileDir = new TemporaryFolder(new File("target")); @Override protected void setUpBroker(boolean clearDataDir) throws Exception { - if (clearDataDir && dataFileDir.exists()) - FileUtils.cleanDirectory(dataFileDir); + if (clearDataDir && dataFileDir.getRoot().exists()) + FileUtils.cleanDirectory(dataFileDir.getRoot()); super.setUpBroker(clearDataDir); } @@ -52,7 +56,7 @@ public class KahaDBMessageStoreSizeStatTest extends protected void initPersistence(BrokerService brokerService) throws IOException { broker.setPersistent(true); - broker.setDataDirectoryFile(dataFileDir); + broker.setDataDirectoryFile(dataFileDir.getRoot()); } /** @@ -63,19 +67,19 @@ public class KahaDBMessageStoreSizeStatTest extends */ @Test public void testMessageSizeAfterRestartAndPublish() throws Exception { - - Destination dest = publishTestQueueMessages(200); + AtomicLong publishedMessageSize = new AtomicLong(); + Destination dest = publishTestQueueMessages(200, publishedMessageSize); // verify the count and size - verifyStats(dest, 200, 200 * messageSize); + verifyStats(dest, 200, publishedMessageSize.get()); // stop, restart broker and publish more messages stopBroker(); this.setUpBroker(false); - dest = publishTestQueueMessages(200); + dest = publishTestQueueMessages(200, publishedMessageSize); // verify the count and size - verifyStats(dest, 400, 400 * messageSize); + verifyStats(dest, 400, publishedMessageSize.get()); } http://git-wip-us.apache.org/repos/asf/activemq/blob/734fb7dd/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/MultiKahaDBMessageStoreSizeStatTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/MultiKahaDBMessageStoreSizeStatTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/MultiKahaDBMessageStoreSizeStatTest.java index 849a91b..3572acc 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/MultiKahaDBMessageStoreSizeStatTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/MultiKahaDBMessageStoreSizeStatTest.java @@ -22,12 +22,15 @@ import java.io.File; import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.atomic.AtomicLong; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.region.Destination; import org.apache.activemq.store.AbstractMessageStoreSizeStatTest; import org.apache.commons.io.FileUtils; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TemporaryFolder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,12 +46,13 @@ public class MultiKahaDBMessageStoreSizeStatTest extends protected static final Logger LOG = LoggerFactory .getLogger(MultiKahaDBMessageStoreSizeStatTest.class); - File dataFileDir = new File("target/test-amq-5748/stat-datadb"); + @Rule + public TemporaryFolder dataFileDir = new TemporaryFolder(new File("target")); @Override protected void setUpBroker(boolean clearDataDir) throws Exception { - if (clearDataDir && dataFileDir.exists()) - FileUtils.cleanDirectory(dataFileDir); + if (clearDataDir && dataFileDir.getRoot().exists()) + FileUtils.cleanDirectory(dataFileDir.getRoot()); super.setUpBroker(clearDataDir); } @@ -59,7 +63,7 @@ public class MultiKahaDBMessageStoreSizeStatTest extends //setup multi-kaha adapter MultiKahaDBPersistenceAdapter persistenceAdapter = new MultiKahaDBPersistenceAdapter(); - persistenceAdapter.setDirectory(dataFileDir); + persistenceAdapter.setDirectory(dataFileDir.getRoot()); KahaDBPersistenceAdapter kahaStore = new KahaDBPersistenceAdapter(); kahaStore.setJournalMaxFileLength(1024 * 512); @@ -81,51 +85,53 @@ public class MultiKahaDBMessageStoreSizeStatTest extends * * @throws Exception */ - @Test + @Test(timeout=10000) public void testMessageSizeAfterRestartAndPublish() throws Exception { + AtomicLong publishedMessageSize = new AtomicLong(); - Destination dest = publishTestQueueMessages(200); + Destination dest = publishTestQueueMessages(200, publishedMessageSize); // verify the count and size - verifyStats(dest, 200, 200 * messageSize); + verifyStats(dest, 200, publishedMessageSize.get()); // stop, restart broker and publish more messages stopBroker(); this.setUpBroker(false); - dest = publishTestQueueMessages(200); + dest = publishTestQueueMessages(200, publishedMessageSize); // verify the count and size - verifyStats(dest, 400, 400 * messageSize); + verifyStats(dest, 400, publishedMessageSize.get()); } - @Test + @Test(timeout=10000) public void testMessageSizeAfterRestartAndPublishMultiQueue() throws Exception { + AtomicLong publishedMessageSize = new AtomicLong(); + AtomicLong publishedMessageSize2 = new AtomicLong(); - Destination dest = publishTestQueueMessages(200); + Destination dest = publishTestQueueMessages(200, publishedMessageSize); // verify the count and size - verifyStats(dest, 200, 200 * messageSize); - assertTrue(broker.getPersistenceAdapter().size() > 200 * messageSize); + verifyStats(dest, 200, publishedMessageSize.get()); + assertTrue(broker.getPersistenceAdapter().size() > publishedMessageSize.get()); - Destination dest2 = publishTestQueueMessages(200, "test.queue2"); + Destination dest2 = publishTestQueueMessages(200, "test.queue2", publishedMessageSize2); // verify the count and size - verifyStats(dest2, 200, 200 * messageSize); - assertTrue(broker.getPersistenceAdapter().size() > 400 * messageSize); + verifyStats(dest2, 200, publishedMessageSize2.get()); + assertTrue(broker.getPersistenceAdapter().size() > publishedMessageSize.get() + publishedMessageSize2.get()); // stop, restart broker and publish more messages stopBroker(); this.setUpBroker(false); - dest = publishTestQueueMessages(200); - dest2 = publishTestQueueMessages(200, "test.queue2"); + dest = publishTestQueueMessages(200, publishedMessageSize); + dest2 = publishTestQueueMessages(200, "test.queue2", publishedMessageSize2); // verify the count and size after publishing messages - verifyStats(dest, 400, 400 * messageSize); - verifyStats(dest2, 400, 400 * messageSize); + verifyStats(dest, 400, publishedMessageSize.get()); + verifyStats(dest2, 400, publishedMessageSize2.get()); - System.out.println(broker.getPersistenceAdapter().size()); - assertTrue(broker.getPersistenceAdapter().size() > 800 * messageSize); + assertTrue(broker.getPersistenceAdapter().size() > publishedMessageSize.get() + publishedMessageSize2.get()); assertTrue(broker.getPersistenceAdapter().size() >= (dest.getMessageStore().getMessageSize() + dest2.getMessageStore().getMessageSize())); http://git-wip-us.apache.org/repos/asf/activemq/blob/734fb7dd/activemq-unit-tests/src/test/java/org/apache/activemq/store/memory/MemoryMessageStoreSizeStatTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/memory/MemoryMessageStoreSizeStatTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/memory/MemoryMessageStoreSizeStatTest.java index dc6ff8b..ba2ae33 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/store/memory/MemoryMessageStoreSizeStatTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/memory/MemoryMessageStoreSizeStatTest.java @@ -17,6 +17,7 @@ package org.apache.activemq.store.memory; import java.io.IOException; +import java.util.concurrent.atomic.AtomicLong; import javax.jms.Connection; @@ -24,6 +25,7 @@ import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.region.Destination; import org.apache.activemq.store.AbstractMessageStoreSizeStatTest; +import org.apache.activemq.store.AbstractStoreStatTestSupport; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,21 +49,23 @@ public class MemoryMessageStoreSizeStatTest extends AbstractMessageStoreSizeStat @Override @Test(timeout=10000) public void testMessageSizeOneDurable() throws Exception { + AtomicLong publishedMessageSize = new AtomicLong(); Connection connection = new ActiveMQConnectionFactory(brokerConnectURI).createConnection(); connection.setClientID("clientId"); connection.start(); //The expected value is only 100 because for durables a LRUCache is being used //with a max size of 100 - Destination dest = publishTestMessagesDurable(connection, new String[] {"sub1"}, 200, 100); + Destination dest = publishTestMessagesDurable(connection, new String[] {"sub1"}, 200, 100, publishedMessageSize); //verify the count and size, should be 100 because of the LRUCache - verifyStats(dest, 100, 100 * messageSize); + //verify size is at least the minimum of 100 messages times 100 bytes + verifyStats(dest, 100, 100 * 100); - consumeDurableTestMessages(connection, "sub1", 100); + consumeDurableTestMessages(connection, "sub1", 100, publishedMessageSize); //Since an LRU cache is used and messages are kept in memory, this should be 100 still - verifyStats(dest, 100, 100 * messageSize); + verifyStats(dest, 100, publishedMessageSize.get()); connection.stop(); @@ -70,22 +74,24 @@ public class MemoryMessageStoreSizeStatTest extends AbstractMessageStoreSizeStat @Override @Test(timeout=10000) public void testMessageSizeTwoDurables() throws Exception { + AtomicLong publishedMessageSize = new AtomicLong(); Connection connection = new ActiveMQConnectionFactory(brokerConnectURI).createConnection(); connection.setClientID("clientId"); connection.start(); //The expected value is only 100 because for durables a LRUCache is being used //with a max size of 100, so only 100 messages are kept - Destination dest = publishTestMessagesDurable(connection, new String[] {"sub1", "sub2"}, 200, 100); + Destination dest = publishTestMessagesDurable(connection, new String[] {"sub1", "sub2"}, 200, 100, publishedMessageSize); //verify the count and size - verifyStats(dest, 100, 100 * messageSize); + //verify size is at least the minimum of 100 messages times 100 bytes + verifyStats(dest, 100, 100 * 100); //consume for sub1 - consumeDurableTestMessages(connection, "sub1", 100); + consumeDurableTestMessages(connection, "sub1", 100, publishedMessageSize); //Should be 100 messages still - verifyStats(dest, 100, 100 * messageSize); + verifyStats(dest, 100, publishedMessageSize.get()); connection.stop();