[ https://issues.apache.org/jira/browse/AMQ-6067?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Christopher L. Shannon reassigned AMQ-6067:
-------------------------------------------

    Assignee: Christopher L. Shannon

> OutOfMemoryError when expiring big amount of topic messages
> -----------------------------------------------------------
>
>                 Key: AMQ-6067
>                 URL: https://issues.apache.org/jira/browse/AMQ-6067
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: JDBC
>    Affects Versions: 5.10.0
>            Reporter: Petr Havránek
>            Assignee: Christopher L. Shannon
>              Labels: durable, durable_subscription, expiration, jdbc, timeToLive
>
> There is a problem in
> {noformat}
> org.apache.activemq.broker.region.Topic.expireMessagesTask
> {noformat}
> When a large number of topic messages are due to expire, {{expireMessagesTask}} loads all of them into memory. This causes
> {noformat}
> 2015-11-24 11:05:46.359 WARN [ActiveMQ Broker[JmsEngineActivemqBroker] Scheduler] [Topic] Failed to browse Topic: test-topic
> java.lang.OutOfMemoryError: Java heap space
>       at oracle.sql.BLOB.getBytes(BLOB.java:204)
>       at oracle.jdbc.driver.T4CBlobAccessor.getBytes(T4CBlobAccessor.java:464)
>       at oracle.jdbc.driver.OracleResultSetImpl.getBytes(OracleResultSetImpl.java:676)
>       at org.apache.commons.dbcp.DelegatingResultSet.getBytes(DelegatingResultSet.java:203)
>       at org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter.getBinaryData(DefaultJDBCAdapter.java:80)
>       at org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter.doRecover(DefaultJDBCAdapter.java:418)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
>       at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
>       at java.lang.reflect.Method.invoke(Method.java:597)
>       at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:309)
>       at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:183)
>       at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:150)
>       at org.springframework.aop.interceptor.AbstractTraceInterceptor.invoke(AbstractTraceInterceptor.java:113)
>       at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172)
>       at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:202)
>       at $Proxy14.doRecover(Unknown Source)
>       at org.apache.activemq.store.jdbc.JDBCMessageStore.recover(JDBCMessageStore.java:236)
>       at org.apache.activemq.store.ProxyTopicMessageStore.recover(ProxyTopicMessageStore.java:62)
>       at org.apache.activemq.broker.region.Topic.doBrowse(Topic.java:594)
>       at org.apache.activemq.broker.region.Topic.access$100(Topic.java:65)
>       at org.apache.activemq.broker.region.Topic$6.run(Topic.java:733)
>       at org.apache.activemq.thread.SchedulerTimerTask.run(SchedulerTimerTask.java:33)
>       at java.util.TimerThread.mainLoop(Timer.java:512)
>       at java.util.TimerThread.run(Timer.java:462)
> {noformat}
> The problem occurs when using JDBC persistence with ActiveMQ 5.10.0. From a brief look at the source code, 5.12.1 may be affected as well.
> Test case:
> - run an ActiveMQ broker with JDBC persistence
> - create a subscription to a topic, but do not receive the messages
> - send a large number of messages with a short TimeToLive
> - when expireMessagesTask is scheduled, it tries to load all of the messages and causes the OutOfMemoryError
> It would help if
> {noformat}
> org.apache.activemq.store.jdbc.JDBCMessageStore.recover(MessageRecoveryListener)
> {noformat}
> were updated like this:
> {code:java}
>     public void recover(final MessageRecoveryListener listener) throws Exception {
>         // Get all the Message ids out of the database.
>         TransactionContext c = persistenceAdapter.getTransactionContext();
>         try {
>             adapter.doRecover(c, destination, new JDBCMessageRecoveryListener() {
>                 public boolean recoverMessage(long sequenceId, byte[] data) throws Exception {
>                     // Stop recovering as soon as the listener has no room left.
>                     if (listener.hasSpace()) {
>                         Message msg = (Message)wireFormat.unmarshal(new ByteSequence(data));
>                         msg.getMessageId().setBrokerSequenceId(sequenceId);
>                         return listener.recoverMessage(msg);
>                     } else {
>                         logger.debug("Recovery limit for messages has been exceeded.");
>                         return false;
>                     }
>                 }
>                 public boolean recoverMessageReference(String reference) throws Exception {
>                     if (listener.hasSpace()) {
>                         return listener.recoverMessageReference(new MessageId(reference));
>                     } else {
>                         logger.debug("Recovery limit for message references has been exceeded.");
>                         return false;
>                     }
>                 }
>             });
>         } catch (SQLException e) {
>             JDBCPersistenceAdapter.log("JDBC Failure: ", e);
>             throw IOExceptionSupport.create("Failed to recover container. Reason: " + e, e);
>         } finally {
>             c.close();
>         }
>     }
> {code}
> However, I am not sure this limit is the best approach, because some messages that should already be expired would have to wait for a later run. A better solution might be to do this work in several separate transactions.
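
The "separate transactions" idea from the report could look roughly like the sketch below. It is only an illustration under stated assumptions: StoredMessage, PagedStore, InMemoryStore and expireInBatches are hypothetical names and are not part of the ActiveMQ API. Each readPage call stands in for one bounded query or transaction, so at most batchSize messages are held in memory at a time, while the loop still reaches every expired message in a single run.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

public class BatchedExpirySketch {

    /** Hypothetical stand-in for a stored row; not an ActiveMQ type. */
    static final class StoredMessage {
        final long sequenceId;
        final long expiresAtMillis;

        StoredMessage(long sequenceId, long expiresAtMillis) {
            this.sequenceId = sequenceId;
            this.expiresAtMillis = expiresAtMillis;
        }
    }

    /** Hypothetical store contract: each call represents one bounded query/transaction. */
    interface PagedStore {
        List<StoredMessage> readPage(long afterSequenceId, int limit);
        void delete(List<StoredMessage> expired);
    }

    /** In-memory implementation so the sketch can run without a database. */
    static final class InMemoryStore implements PagedStore {
        private final TreeMap<Long, StoredMessage> rows = new TreeMap<>();
        private int maxPageSeen = 0;

        void add(long sequenceId, long expiresAtMillis) {
            rows.put(sequenceId, new StoredMessage(sequenceId, expiresAtMillis));
        }

        @Override
        public List<StoredMessage> readPage(long afterSequenceId, int limit) {
            List<StoredMessage> page = new ArrayList<>();
            // Read rows strictly after the cursor, in sequence order, up to the limit.
            for (StoredMessage m : rows.tailMap(afterSequenceId, false).values()) {
                if (page.size() >= limit) {
                    break;
                }
                page.add(m);
            }
            maxPageSeen = Math.max(maxPageSeen, page.size());
            return page;
        }

        @Override
        public void delete(List<StoredMessage> expired) {
            for (StoredMessage m : expired) {
                rows.remove(m.sequenceId);
            }
        }

        int size() { return rows.size(); }
        int maxPageSeen() { return maxPageSeen; }
    }

    /** Walks the store page by page and deletes expired messages; returns how many were removed. */
    static int expireInBatches(PagedStore store, int batchSize, long nowMillis) {
        int expiredTotal = 0;
        long cursor = -1; // assumes sequence ids are non-negative
        while (true) {
            List<StoredMessage> page = store.readPage(cursor, batchSize);
            if (page.isEmpty()) {
                break;
            }
            List<StoredMessage> expired = new ArrayList<>();
            for (StoredMessage m : page) {
                if (m.expiresAtMillis <= nowMillis) {
                    expired.add(m);
                }
                cursor = m.sequenceId; // advance past every row seen, expired or not
            }
            store.delete(expired);
            expiredTotal += expired.size();
        }
        return expiredTotal;
    }

    public static void main(String[] args) {
        InMemoryStore store = new InMemoryStore();
        for (long i = 0; i < 10; i++) {
            store.add(i, i < 7 ? 100L : 10_000L); // ids 0..6 are already expired at t=1000
        }
        int expired = expireInBatches(store, 3, 1_000L);
        System.out.println(expired + " expired, " + store.size() + " left"); // prints "7 expired, 3 left"
    }
}
```

Compared with the hasSpace() check, this variant does not leave expired messages waiting for the next scheduled run, at the cost of issuing more queries per run. Whether the JDBC adapter can page by sequence id in this way is an assumption that would need to be checked against the actual store code.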