Do you get the same sort of behavior if you point your pub/sub client to an
external broker? 


Aaron Pieper wrote:
> I'm encountering what appears to be a memory leak in the BrokerService.
> The symptom is that the BrokerService's memory usage increases with each
> message that is sent to a topic, as though each message is being stored
> permanently in memory. 
> I've included an example which demonstrates the issue. If you run the
> included example, you should see the following output:
> Memory Usage:     0     Memory Percent:   0     Send count: 0     
> Memory Usage:     16016576    Memory Percent:   23    Send count: 16    
> Memory Usage:     38039368    Memory Percent:   56    Send count: 38    
> Memory Usage:     61063196    Memory Percent:   90    Send count: 61    
> Memory Usage:     68070448    Memory Percent:   101   Send count: 68    
> Memory Usage:     68070448    Memory Percent:   101   Send count: 68    
> Memory Usage:     68070448    Memory Percent:   101   Send count: 68
> BrokerService's memory usage climbs steadily until the memory percent
> hits 100. Then, messages stop being sent. There are several ways to make
> the bug stop happening:
>   * Switch from using a Topic to a Queue
>   * Switch the Broker address to 'vm://foo', so it's not using TCP
>   * Set the third argument in the 'createConsumer' call (the noLocal
> argument) to false
>   * Don't register the MessageListener
> I've witnessed this behavior both with ActiveMQ 5.1.0 and the
> 5.2-SNAPSHOT version available as of August 4, 2008. I'm using Spring
> 2.5.4. Since two-way traffic isn't an issue for this application, I can
> work around it by setting 'noLocal' to false. However, I wasn't sure
> whether I should file a JIRA issue for this, or whether I'm doing
> something wrong.
> Thanks,
> - Aaron Pieper
> --------------------------------
> import java.util.HashMap;
> import java.util.Timer;
> import java.util.TimerTask;
> import javax.jms.Connection;
> import javax.jms.Message;
> import javax.jms.MessageConsumer;
> import javax.jms.MessageListener;
> import javax.jms.Session;
> import javax.jms.Topic;
> import org.apache.activemq.ActiveMQConnectionFactory;
> import org.apache.activemq.ActiveMQSession;
> import org.apache.activemq.broker.BrokerService;
> import org.apache.activemq.usage.MemoryUsage;
> import org.springframework.jms.core.JmsTemplate;
> public class BrokerMemoryLeak {
>       private int                   sendCount;
>       private BrokerService   broker      = new BrokerService();
>       public static void main(String[] args) throws Exception {
>             new BrokerMemoryLeak().run();
>       }
>       private void run() throws Exception {
>             broker.addConnector("tcp://localhost:8192");
>             broker.setPersistent(false);
>             broker.start();
>             ActiveMQConnectionFactory connectionFactory =
>                   new ActiveMQConnectionFactory("tcp://localhost:8192");
>             Connection connection =
>                   connectionFactory.createConnection();
>             connection.start();
>             ActiveMQSession session = (ActiveMQSession)
>                   connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
>             Topic topic = session.createTopic("foo");
>             MessageConsumer messageConsumer =
>                   session.createConsumer(topic, null, true);
>             messageConsumer.setMessageListener(new MessageListener() {
>                   public void onMessage(Message message) {}
>             });
>             TimerTask statusTask = new TimerTask() {
>                   @Override
>                   public void run() {
>                         StringBuffer buf = new StringBuffer();
>                         MemoryUsage memoryUsage =
>                               broker.getSystemUsage().getMemoryUsage();
>                         buf.append("Memory Usage:\t")
>                               .append(memoryUsage.getUsage()).append("\t");
>                         buf.append("Memory Percent:\t")
>                               .append(memoryUsage.getPercentUsage()).append("\t");
>                         buf.append("Send count:\t")
>                               .append(sendCount).append("\t");
>                         System.out.println(buf);
>                   }
>             };
>             new Timer().schedule(statusTask, 0, 1000);
>             JmsTemplate template = new JmsTemplate();
>             template.setConnectionFactory(connectionFactory);
>             template.afterPropertiesSet();
>             while (true) {
>                   HashMap<String, Object> map =
>                         new HashMap<String, Object>();
>                   map.put("1", new byte[1000000]);
>                   template.convertAndSend(topic, map);
>                   sendCount++;
>                   Thread.sleep(1);
>             }
>       }
> }
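
As a sanity check on the quoted output, the "Memory Percent" figures are consistent with the broker's memory usage being measured against a 64 MB limit. The sketch below redoes that arithmetic in plain Java with no ActiveMQ dependency. The 64 MB limit is an assumption inferred from the numbers, not a value read from the broker, and MemoryPercentCheck, ASSUMED_LIMIT_BYTES and percentUsage are hypothetical names used only for illustration.

```java
public class MemoryPercentCheck {
    // Assumed limit of 64 MB, inferred from the reported figures;
    // it is not read from a running broker.
    static final long ASSUMED_LIMIT_BYTES = 64L * 1024 * 1024;

    // Integer percentage of the limit in use, truncated toward zero.
    static int percentUsage(long usageBytes, long limitBytes) {
        return (int) (usageBytes * 100 / limitBytes);
    }

    public static void main(String[] args) {
        // "Memory Usage" values copied from the quoted output.
        long[] reported = {0L, 16016576L, 38039368L, 61063196L, 68070448L};
        for (long usage : reported) {
            System.out.println(usage + " bytes -> "
                    + percentUsage(usage, ASSUMED_LIMIT_BYTES) + "%");
        }
    }
}
```

Under that assumption the computed values match the reported 0, 23, 56, 90 and 101. The usage also grows by roughly 1 MB per send (16016576 bytes after 16 sends), which fits the description of every message being retained in memory.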
