Repository: activemq
Updated Branches:
  refs/heads/master 9f9b0fb26 -> a953f11d0


https://issues.apache.org/jira/browse/AMQ-6323

Applying reduceMemoryFootprint for persistent Topic messages


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/a953f11d
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/a953f11d
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/a953f11d

Branch: refs/heads/master
Commit: a953f11d0e6e4f271e99f1b9ac6cfb01ee0ecf99
Parents: 9f9b0fb
Author: Christopher L. Shannon (cshannon) <christopher.l.shan...@gmail.com>
Authored: Tue Jun 14 14:36:37 2016 +0000
Committer: Christopher L. Shannon (cshannon) <christopher.l.shan...@gmail.com>
Committed: Tue Jun 14 14:36:37 2016 +0000

----------------------------------------------------------------------
 .../apache/activemq/broker/region/Topic.java    |  4 ++
 .../store/AbstractVmConcurrentDispatchTest.java | 63 ++++++++++++++------
 .../kahadb/KahaDbVmConcurrentDispatchTest.java  | 16 +++--
 .../MultiKahaDbVmConcurrentDispatchTest.java    | 16 +++--
 .../LevelDbVmConcurrentDispatchTest.java        | 11 ++--
 5 files changed, 78 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/a953f11d/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
index 1a9949e..c43f55e 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
@@ -512,6 +512,10 @@ public class Topic extends BaseDestination implements Task {
                 waitForSpace(context,producerExchange, 
systemUsage.getStoreUsage(), getStoreUsageHighWaterMark(), logMessage);
             }
             result = topicStore.asyncAddTopicMessage(context, 
message,isOptimizeStorage());
+
+            if (isReduceMemoryFootprint()) {
+                message.clearMarshalledState();
+            }
         }
 
         message.incrementReferenceCount();

http://git-wip-us.apache.org/repos/asf/activemq/blob/a953f11d/activemq-unit-tests/src/test/java/org/apache/activemq/store/AbstractVmConcurrentDispatchTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/AbstractVmConcurrentDispatchTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/AbstractVmConcurrentDispatchTest.java
index aaaaf69..69bd2c8 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/store/AbstractVmConcurrentDispatchTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/AbstractVmConcurrentDispatchTest.java
@@ -27,6 +27,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.jms.Connection;
 import javax.jms.DeliveryMode;
@@ -40,6 +41,7 @@ import javax.jms.MessageProducer;
 import javax.jms.ObjectMessage;
 import javax.jms.Session;
 import javax.jms.TextMessage;
+import javax.jms.Topic;
 
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.ActiveMQSession;
@@ -48,6 +50,7 @@ import org.apache.activemq.broker.TransportConnector;
 import org.apache.activemq.broker.region.policy.PolicyEntry;
 import org.apache.activemq.broker.region.policy.PolicyMap;
 import org.junit.After;
+import org.junit.Assume;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -67,17 +70,22 @@ public abstract class AbstractVmConcurrentDispatchTest {
 
     private final MessageType messageType;
     private final boolean reduceMemoryFootPrint;
+    protected final boolean useTopic;
 
     protected static enum MessageType {TEXT, MAP, OBJECT}
     protected final static boolean[] booleanVals = {true, false};
     protected static boolean[] reduceMemoryFootPrintVals = booleanVals;
+    protected static boolean[] useTopicVals = booleanVals;
+    private String testTopicName = "mytopic";
 
     @Rule
     public TemporaryFolder dataFileDir = new TemporaryFolder(new 
File("target"));
 
-    public AbstractVmConcurrentDispatchTest(MessageType messageType, boolean 
reduceMemoryFootPrint) {
+    public AbstractVmConcurrentDispatchTest(MessageType messageType, boolean 
reduceMemoryFootPrint,
+            boolean useTopic) {
         this.messageType = messageType;
         this.reduceMemoryFootPrint = reduceMemoryFootPrint;
+        this.useTopic = useTopic;
     }
 
     private BrokerService broker;
@@ -92,7 +100,7 @@ public abstract class AbstractVmConcurrentDispatchTest {
     private final int NUM_PRODUCERS = 1;
     private final int NUM_TASKS = NUM_CONSUMERS + NUM_PRODUCERS;
 
-    private int i = 0;
+    private final AtomicInteger count = new AtomicInteger();
     private String MessageId = null;
     private int MessageCount = 0;
 
@@ -127,23 +135,28 @@ public abstract class AbstractVmConcurrentDispatchTest {
 
     @Test(timeout=180000)
     public void testMessagesAreValid() throws Exception {
+        if (this.useTopic) {
+            Assume.assumeTrue(reduceMemoryFootPrint);
+        }
 
         ExecutorService tasks = Executors.newFixedThreadPool(NUM_TASKS);
         for (int i = 0; i < NUM_CONSUMERS; i++) {
             LOG.info("Created Consumer: {}", i + 1);
-            tasks.execute(new HelloWorldConsumer());
+            tasks.execute(new HelloWorldConsumer(useTopic));
         }
 
         for (int i = 0; i < NUM_PRODUCERS; i++) {
             LOG.info("Created Producer: {}", i + 1);
-            tasks.execute(new HelloWorldProducer());
+            tasks.execute(new HelloWorldProducer(useTopic));
         }
 
         assertTrue(ready.await(20, TimeUnit.SECONDS));
 
         try {
             tasks.shutdown();
-            tasks.awaitTermination(20, TimeUnit.SECONDS);
+            //run for 10 seconds as that seems to be enough time to cause an 
error
+            //if there is going to be one
+            tasks.awaitTermination(10, TimeUnit.SECONDS);
         } catch (Exception e) {
             //should get exception with no errors
         }
@@ -161,6 +174,12 @@ public abstract class AbstractVmConcurrentDispatchTest {
 
     public class HelloWorldProducer implements Runnable {
 
+        final boolean useTopic;
+
+        public HelloWorldProducer(boolean useTopic) {
+            this.useTopic = useTopic;
+        }
+
         @Override
         public void run() {
             try {
@@ -172,7 +191,10 @@ public abstract class AbstractVmConcurrentDispatchTest {
 
                 Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
 
-                Destination destination = 
session.createTopic("VirtualTopic.AMQ6218Test");
+                //If using topics, just test a generic topic name
+                //If useTopic is false then we are testing virtual 
topics/queue consumes
+                Destination destination = useTopic ? 
session.createTopic(testTopicName) :
+                    session.createTopic("VirtualTopic.AMQ6218Test");
 
                 MessageProducer producer = session.createProducer(destination);
 
@@ -213,25 +235,30 @@ public abstract class AbstractVmConcurrentDispatchTest {
     }
 
     public class HelloWorldConsumer implements Runnable, ExceptionListener {
-        String queueName;
+        final boolean useTopic;
+
+        public HelloWorldConsumer(boolean useTopic) {
+            this.useTopic = useTopic;
+        }
 
         @Override
         public void run() {
             try {
 
+                int i = count.incrementAndGet();
+                String destName = !useTopic ? "Consumer.Q" + i + 
".VirtualTopic.AMQ6218Test" : testTopicName;
+                LOG.info(destName);
+
                 ActiveMQConnectionFactory connectionFactory = new 
ActiveMQConnectionFactory(getBrokerURI());
                 Connection connection = connectionFactory.createConnection();
+                connection.setClientID("clientId" + i);
                 connection.start();
-
                 Session session = connection.createSession(false, 
ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
-                synchronized (this) {
-                    queueName = "Consumer.Q" + i + ".VirtualTopic.AMQ6218Test";
-                    i++;
-                    LOG.info(queueName);
-                }
 
-                Destination destination = session.createQueue(queueName);
-                MessageConsumer consumer = session.createConsumer(destination);
+                Destination destination = useTopic ? 
session.createTopic(destName) : session.createQueue(destName);
+                MessageConsumer consumer = useTopic ?
+                        session.createDurableSubscriber((Topic) destination, 
"sub" + i) :
+                            session.createConsumer(destination);
 
                 ready.countDown();
 
@@ -266,14 +293,14 @@ public abstract class AbstractVmConcurrentDispatchTest {
                             MapMessage mapMessage = (MapMessage) message;
                             text = mapMessage.getString("text");
                         } else {
-                            LOG.info(queueName + " Message is not a instanceof 
" + messageType + " message id: " + message.getJMSMessageID() + message);
+                            LOG.info(destName + " Message is not a instanceof 
" + messageType + " message id: " + message.getJMSMessageID() + message);
                         }
 
                         if (text == null) {
-                            LOG.warn(queueName + " text received as a null " + 
message);
+                            LOG.warn(destName + " text received as a null " + 
message);
                             failure.set(true);
                         } else {
-                            LOG.info(queueName + " text " + text + " message 
id: " + message.getJMSMessageID());
+                            LOG.info(destName + " text " + text + " message 
id: " + message.getJMSMessageID());
                         }
 
                         message.acknowledge();

http://git-wip-us.apache.org/repos/asf/activemq/blob/a953f11d/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDbVmConcurrentDispatchTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDbVmConcurrentDispatchTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDbVmConcurrentDispatchTest.java
index 217a7c7..36d5a81 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDbVmConcurrentDispatchTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDbVmConcurrentDispatchTest.java
@@ -33,14 +33,16 @@ public class KahaDbVmConcurrentDispatchTest extends AbstractVmConcurrentDispatch
     private final boolean concurrentDispatch;
     private static boolean[] concurrentDispatchVals = booleanVals;
 
-      @Parameters(name="Type:{0}; ReduceMemoryFootPrint:{1}; 
ConcurrentDispatch:{2}")
+      @Parameters(name="Type:{0}; ReduceMemoryFootPrint:{1}; 
ConcurrentDispatch:{2}; UseTopic:{3}")
       public static Collection<Object[]> data() {
           List<Object[]> values = new ArrayList<>();
 
           for (MessageType mt : MessageType.values()) {
               for (boolean rmfVal : reduceMemoryFootPrintVals) {
                   for (boolean cdVal : concurrentDispatchVals) {
-                      values.add(new Object[] {mt, rmfVal, cdVal});
+                      for (boolean tpVal : useTopicVals) {
+                          values.add(new Object[] {mt, rmfVal, cdVal, tpVal});
+                      }
                   }
               }
           }
@@ -54,15 +56,19 @@ public class KahaDbVmConcurrentDispatchTest extends AbstractVmConcurrentDispatch
      * @param concurrentDispatch
      */
     public KahaDbVmConcurrentDispatchTest(MessageType messageType, boolean 
reduceMemoryFootPrint,
-            boolean concurrentDispatch) {
-        super(messageType, reduceMemoryFootPrint);
+            boolean concurrentDispatch, boolean useTopic) {
+        super(messageType, reduceMemoryFootPrint, useTopic);
         this.concurrentDispatch = concurrentDispatch;
     }
 
     @Override
     protected void configurePersistenceAdapter(BrokerService broker) throws 
IOException {
         KahaDBPersistenceAdapter ad = (KahaDBPersistenceAdapter) 
broker.getPersistenceAdapter();
-        ad.setConcurrentStoreAndDispatchQueues(concurrentDispatch);
+        if (useTopic) {
+            ad.setConcurrentStoreAndDispatchTopics(concurrentDispatch);
+        } else {
+            ad.setConcurrentStoreAndDispatchQueues(concurrentDispatch);
+        }
     }
 
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/a953f11d/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/MultiKahaDbVmConcurrentDispatchTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/MultiKahaDbVmConcurrentDispatchTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/MultiKahaDbVmConcurrentDispatchTest.java
index 3d16ce7..efe8688 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/MultiKahaDbVmConcurrentDispatchTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/MultiKahaDbVmConcurrentDispatchTest.java
@@ -33,14 +33,16 @@ public class MultiKahaDbVmConcurrentDispatchTest extends AbstractVmConcurrentDis
     private final boolean concurrentDispatch;
     private static boolean[] concurrentDispatchVals = booleanVals;
 
-      @Parameters(name="Type:{0}; ReduceMemoryFootPrint:{1}; 
ConcurrentDispatch:{2}")
+      @Parameters(name="Type:{0}; ReduceMemoryFootPrint:{1}; 
ConcurrentDispatch:{2}; UseTopic:{3}")
       public static Collection<Object[]> data() {
           List<Object[]> values = new ArrayList<>();
 
           for (MessageType mt : MessageType.values()) {
               for (boolean rmfVal : reduceMemoryFootPrintVals) {
                   for (boolean cdVal : concurrentDispatchVals) {
-                      values.add(new Object[] {mt, rmfVal, cdVal});
+                      for (boolean tpVal : useTopicVals) {
+                          values.add(new Object[] {mt, rmfVal, cdVal, tpVal});
+                      }
                   }
               }
           }
@@ -54,8 +56,8 @@ public class MultiKahaDbVmConcurrentDispatchTest extends AbstractVmConcurrentDis
      * @param concurrentDispatch
      */
     public MultiKahaDbVmConcurrentDispatchTest(MessageType messageType, 
boolean reduceMemoryFootPrint,
-            boolean concurrentDispatch) {
-        super(messageType, reduceMemoryFootPrint);
+            boolean concurrentDispatch, boolean useTopic) {
+        super(messageType, reduceMemoryFootPrint, useTopic);
         this.concurrentDispatch = concurrentDispatch;
     }
 
@@ -66,7 +68,11 @@ public class MultiKahaDbVmConcurrentDispatchTest extends AbstractVmConcurrentDis
         persistenceAdapter.setDirectory(dataFileDir.getRoot());
 
         KahaDBPersistenceAdapter kahaStore = new KahaDBPersistenceAdapter();
-        kahaStore.setConcurrentStoreAndDispatchQueues(concurrentDispatch);
+        if (useTopic) {
+            kahaStore.setConcurrentStoreAndDispatchTopics(concurrentDispatch);
+        } else {
+            kahaStore.setConcurrentStoreAndDispatchQueues(concurrentDispatch);
+        }
 
         FilteredKahaDBPersistenceAdapter filtered = new 
FilteredKahaDBPersistenceAdapter();
         filtered.setPersistenceAdapter(kahaStore);

http://git-wip-us.apache.org/repos/asf/activemq/blob/a953f11d/activemq-unit-tests/src/test/java/org/apache/activemq/store/leveldb/LevelDbVmConcurrentDispatchTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/leveldb/LevelDbVmConcurrentDispatchTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/leveldb/LevelDbVmConcurrentDispatchTest.java
index d1b7e43..06e09be 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/store/leveldb/LevelDbVmConcurrentDispatchTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/leveldb/LevelDbVmConcurrentDispatchTest.java
@@ -31,13 +31,15 @@ import org.junit.runners.Parameterized.Parameters;
 @RunWith(Parameterized.class)
 public class LevelDbVmConcurrentDispatchTest extends 
AbstractVmConcurrentDispatchTest {
 
-      @Parameters(name="Type:{0}; ReduceMemoryFootPrint:{1}")
+      @Parameters(name="Type:{0}; ReduceMemoryFootPrint:{1}; UseTopic:{2}")
       public static Collection<Object[]> data() {
           List<Object[]> values = new ArrayList<>();
 
           for (MessageType mt : MessageType.values()) {
               for (boolean rmfVal : reduceMemoryFootPrintVals) {
-                  values.add(new Object[] {mt, rmfVal});
+                  for (boolean tpVal : useTopicVals) {
+                      values.add(new Object[] {mt, rmfVal, tpVal});
+                  }
               }
           }
 
@@ -49,8 +51,9 @@ public class LevelDbVmConcurrentDispatchTest extends AbstractVmConcurrentDispatc
      * @param reduceMemoryFootPrint
      * @param concurrentDispatch
      */
-    public LevelDbVmConcurrentDispatchTest(MessageType messageType, boolean 
reduceMemoryFootPrint) {
-        super(messageType, reduceMemoryFootPrint);
+    public LevelDbVmConcurrentDispatchTest(MessageType messageType, boolean 
reduceMemoryFootPrint,
+            boolean useTopic) {
+        super(messageType, reduceMemoryFootPrint, useTopic);
     }
 
     @Override

Reply via email to