Do you get the same sort of behavior if you point your pub/sub client to an
external broker? 

Joe
www.ttmsolutions.com


Aaron Pieper wrote:
> 
> I'm encountering what appears to be a memory leak in the BrokerService.
> The symptom is that the BrokerService's memory usage increases with each
> message that is sent to a topic, as though each message is being stored
> permanently in memory. 
> 
>  
> 
> I've included an example which demonstrates the issue. If you run the
> included BrokerMemoryLeak.java, you should see the following output:
> 
>  
> 
> Memory Usage:     0     Memory Percent:   0     Send count: 0     
> 
> Memory Usage:     16016576    Memory Percent:   23    Send count: 16    
> 
> Memory Usage:     38039368    Memory Percent:   56    Send count: 38    
> 
> Memory Usage:     61063196    Memory Percent:   90    Send count: 61    
> 
> Memory Usage:     68070448    Memory Percent:   101   Send count: 68    
> 
> Memory Usage:     68070448    Memory Percent:   101   Send count: 68    
> 
> Memory Usage:     68070448    Memory Percent:   101   Send count: 68
> 
>  
> 
> BrokerService's memory usage climbs steadily until the memory percent
> hits 100. Then, messages stop being sent. There are several ways to make
> the bug stop happening:
> 
>   * Switch from using a Topic to a Queue
> 
>   * Switch the Broker address to 'vm://foo', so it's not using TCP
> 
>   * Set the third argument in the 'createConsumer' call to false. (the
> noLocal argument)
> 
>   * Don't register the MessageListener
> 
>  
> 
> I've witnessed this behavior both with ActiveMQ 5.1.0 and the
> 5.2-SNAPSHOT version available as of August 4, 2008. I'm using Spring
> 2.5.4. Since two-way traffic isn't an issue for this application, I can
> fix the issue by setting 'noLocal' to false. However, I wasn't sure
> whether I should submit a JIRA tracker for this, or whether I'm doing
> something wrong.
> 
>  
> 
> Thanks,
> 
>  
> 
> - Aaron Pieper
> 
>  
> 
> --------------------------------
> 
>  
> 
> import java.util.HashMap;
> 
> import java.util.Timer;
> 
> import java.util.TimerTask;
> 
>  
> 
> import javax.jms.Connection;
> 
> import javax.jms.Message;
> 
> import javax.jms.MessageConsumer;
> 
> import javax.jms.MessageListener;
> 
> import javax.jms.Session;
> 
> import javax.jms.Topic;
> 
>  
> 
> import org.apache.activemq.ActiveMQConnectionFactory;
> 
> import org.apache.activemq.ActiveMQSession;
> 
> import org.apache.activemq.broker.BrokerService;
> 
> import org.apache.activemq.usage.MemoryUsage;
> 
> import org.springframework.jms.core.JmsTemplate;
> 
>  
> 
> public class BrokerMemoryLeak {
> 
>       private int                   sendCount;
> 
>       private BrokerService   broker      = new BrokerService();
> 
>  
> 
>       public static void main(String[] args) throws Exception {
> 
>             new BrokerMemoryLeak().run();
> 
>       }
> 
>  
> 
>       private void run() throws Exception {
> 
>             broker.addConnector("tcp://localhost:8192");
> 
>             broker.setPersistent(false);
> 
>             broker.start();
> 
>  
> 
>             ActiveMQConnectionFactory connectionFactory = (new
> ActiveMQConnectionFactory("tcp://localhost:8192"));
> 
>             Connection connection =
> connectionFactory.createConnection();
> 
>             connection.start();
> 
>  
> 
>             ActiveMQSession session = (ActiveMQSession)
> connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
> 
>             Topic topic = session.createTopic("foo");
> 
>             MessageConsumer messageConsumer =
> session.createConsumer(topic, null, true);
> 
>  
> 
>             messageConsumer.setMessageListener(new MessageListener() {
> 
>                   public void onMessage(Message message) {}
> 
>             });
> 
>  
> 
>             TimerTask statusTask = new TimerTask() {
> 
>                   @Override
> 
>                   public void run() {
> 
>                         StringBuffer buf = new StringBuffer();
> 
>                         MemoryUsage memoryUsage =
> broker.getSystemUsage().getMemoryUsage();
> 
>                         buf.append("Memory
> Usage:\t").append(memoryUsage.getUsage()).append("\t");
> 
>                         buf.append("Memory
> Percent:\t").append(memoryUsage.getPercentUsage()).append("\t");
> 
>                         buf.append("Send
> count:\t").append(sendCount).append("\t");
> 
>                         System.out.println(buf);
> 
>                   }
> 
>             };
> 
>  
> 
>             new Timer().schedule(statusTask, 0, 1000);
> 
>  
> 
>             JmsTemplate template = new JmsTemplate();
> 
>             template.setConnectionFactory(connectionFactory);
> 
>             template.afterPropertiesSet();
> 
>  
> 
>             while (true) {
> 
>                   HashMap<String, Object> map = new HashMap<String,
> Object>();
> 
>                   map.put("1", new byte[1000000]);
> 
>                   template.convertAndSend(topic, map);
> 
>                   sendCount++;
> 
>                   Thread.sleep(1);
> 
>             }
> 
>       }
> 
> }
> 
> 
> 

-- 
View this message in context: 
http://www.nabble.com/Memory-leak-in-broker-when-subscribing-to-a-topic-using-TCP-connector-%2B-noLocal--tp18821110p18823693.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.

Reply via email to