[ 
https://issues.apache.org/jira/browse/AMQ-6067?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Christopher L. Shannon reassigned AMQ-6067:
-------------------------------------------

    Assignee: Christopher L. Shannon

> OutOfMemoryError when expiring big amount of topic messages
> -----------------------------------------------------------
>
>                 Key: AMQ-6067
>                 URL: https://issues.apache.org/jira/browse/AMQ-6067
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: JDBC
>    Affects Versions: 5.10.0
>            Reporter: Petr Havránek
>            Assignee: Christopher L. Shannon
>              Labels: durable, durable_subscription, expiration, jdbc, timeToLive
>
> There is a problem in
> {noformat}
> org.apache.activemq.broker.region.Topic.expireMessagesTask
> {noformat}
> When a large number of topic messages are due to expire, 
> {{expireMessagesTask}} loads all of them into memory at once. This causes:
> {noformat}
> 2015-11-24 11:05:46.359 WARN  [ActiveMQ Broker[JmsEngineActivemqBroker] Scheduler] [Topic] Failed to browse Topic: test-topic
> java.lang.OutOfMemoryError: Java heap space
>       at oracle.sql.BLOB.getBytes(BLOB.java:204)
>       at oracle.jdbc.driver.T4CBlobAccessor.getBytes(T4CBlobAccessor.java:464)
>       at oracle.jdbc.driver.OracleResultSetImpl.getBytes(OracleResultSetImpl.java:676)
>       at org.apache.commons.dbcp.DelegatingResultSet.getBytes(DelegatingResultSet.java:203)
>       at org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter.getBinaryData(DefaultJDBCAdapter.java:80)
>       at org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter.doRecover(DefaultJDBCAdapter.java:418)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
>       at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
>       at java.lang.reflect.Method.invoke(Method.java:597)
>       at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:309)
>       at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:183)
>       at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:150)
>       at org.springframework.aop.interceptor.AbstractTraceInterceptor.invoke(AbstractTraceInterceptor.java:113)
>       at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172)
>       at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:202)
>       at $Proxy14.doRecover(Unknown Source)
>       at org.apache.activemq.store.jdbc.JDBCMessageStore.recover(JDBCMessageStore.java:236)
>       at org.apache.activemq.store.ProxyTopicMessageStore.recover(ProxyTopicMessageStore.java:62)
>       at org.apache.activemq.broker.region.Topic.doBrowse(Topic.java:594)
>       at org.apache.activemq.broker.region.Topic.access$100(Topic.java:65)
>       at org.apache.activemq.broker.region.Topic$6.run(Topic.java:733)
>       at org.apache.activemq.thread.SchedulerTimerTask.run(SchedulerTimerTask.java:33)
>       at java.util.TimerThread.mainLoop(Timer.java:512)
>       at java.util.TimerThread.run(Timer.java:462)
> {noformat}
> The problem occurs when using JDBC persistence with ActiveMQ 5.10.0. From a 
> brief look at the source code, 5.12.1 appears to be affected as well.
> Test case:
> - run an ActiveMQ broker with JDBC persistence
> - create a subscription to a topic, but do not receive the messages
> - send a sufficiently large number of messages with a short TimeToLive
> - when expireMessagesTask runs, it tries to load all of the messages 
> and causes the OutOfMemoryError
> It would help if
> {noformat}
> org.apache.activemq.store.jdbc.JDBCMessageStore.recover(MessageRecoveryListener)
> {noformat}
> were updated like this:
> {code:java}
> public void recover(final MessageRecoveryListener listener) throws Exception {
>   // Get all the Message ids out of the database.
>   TransactionContext c = persistenceAdapter.getTransactionContext();
>   try {
>     adapter.doRecover(c, destination, new JDBCMessageRecoveryListener() {
>       public boolean recoverMessage(long sequenceId, byte[] data) throws Exception {
>         // Proposed change: only unmarshal while the listener still has space.
>         if (listener.hasSpace()) {
>           Message msg = (Message)wireFormat.unmarshal(new ByteSequence(data));
>           msg.getMessageId().setBrokerSequenceId(sequenceId);
>           return listener.recoverMessage(msg);
>         } else {
>           logger.debug("Recovery limit for messages has been exceeded.");
>           return false;
>         }
>       }
>       public boolean recoverMessageReference(String reference) throws Exception {
>         // Proposed change: apply the same limit to message references.
>         if (listener.hasSpace()) {
>           return listener.recoverMessageReference(new MessageId(reference));
>         } else {
>           logger.debug("Recovery limit for message references has been exceeded.");
>           return false;
>         }
>       }
>     });
>   } catch (SQLException e) {
>     JDBCPersistenceAdapter.log("JDBC Failure: ", e);
>     throw IOExceptionSupport.create("Failed to recover container. Reason: " + e, e);
>   } finally {
>     c.close();
>   }
> }
> {code}
> However, I am not sure this limit is the best approach, because some 
> messages that should already be expired would have to wait for a later run. 
> A better solution might be to do this work in several separate transactions.
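
The "several separate transactions" idea can be sketched roughly as below. This is only an illustration under stated assumptions, not ActiveMQ code: `BatchedExpirySketch`, `StoredMessage`, `InMemoryStore`, `loadPage` and `expireInBatches` are hypothetical names, and the in-memory map stands in for the JDBC message table. Each page is meant to correspond to one short transaction, so no more than `batchSize` messages are held in memory at a time.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

public class BatchedExpirySketch {

    /** Hypothetical stand-in for a stored message: sequence id plus absolute expiry time (0 = never). */
    static final class StoredMessage {
        final long sequenceId;
        final long expiresAtMillis;

        StoredMessage(long sequenceId, long expiresAtMillis) {
            this.sequenceId = sequenceId;
            this.expiresAtMillis = expiresAtMillis;
        }
    }

    /** Hypothetical in-memory stand-in for the JDBC message table, ordered by sequence id. */
    static final class InMemoryStore {
        private final TreeMap<Long, StoredMessage> rows = new TreeMap<>();

        void add(StoredMessage m) {
            rows.put(m.sequenceId, m);
        }

        /** Returns at most {@code limit} rows whose sequence id is greater than {@code afterSequenceId}. */
        List<StoredMessage> loadPage(long afterSequenceId, int limit) {
            List<StoredMessage> page = new ArrayList<>();
            for (StoredMessage m : rows.tailMap(afterSequenceId, false).values()) {
                if (page.size() == limit) {
                    break;
                }
                page.add(m);
            }
            return page;
        }

        void remove(long sequenceId) {
            rows.remove(sequenceId);
        }

        int size() {
            return rows.size();
        }
    }

    /**
     * Walks the store page by page and removes expired messages.
     * Returns {expiredCount, pagesProcessed}.
     */
    static int[] expireInBatches(InMemoryStore store, long nowMillis, int batchSize) {
        int expired = 0;
        int pages = 0;
        long cursor = -1L; // last sequence id already examined
        while (true) {
            // In a real store, each page would be read and processed in its own transaction.
            List<StoredMessage> page = store.loadPage(cursor, batchSize);
            if (page.isEmpty()) {
                break;
            }
            pages++;
            for (StoredMessage m : page) {
                cursor = m.sequenceId;
                // An expiry time of 0 is treated as "never expires".
                if (m.expiresAtMillis > 0 && nowMillis > m.expiresAtMillis) {
                    store.remove(m.sequenceId);
                    expired++;
                }
            }
        }
        return new int[] {expired, pages};
    }

    public static void main(String[] args) {
        InMemoryStore store = new InMemoryStore();
        for (long id = 1; id <= 10; id++) {
            // Even ids expire at t=100, odd ids never expire.
            store.add(new StoredMessage(id, id % 2 == 0 ? 100L : 0L));
        }
        int[] result = expireInBatches(store, 200L, 3);
        System.out.println("expired=" + result[0] + " pages=" + result[1] + " remaining=" + store.size());
    }
}
```

Compared with the `hasSpace()` limit, paging with a cursor still visits every message in a single run of the task, so expired messages would not have to wait for a later run. The trade-off is more round trips to the database.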



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)