[ https://issues.apache.org/jira/browse/QPID-6735?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14956500#comment-14956500 ]
ASF subversion and git services commented on QPID-6735:
-------------------------------------------------------
Commit 1708561 from [~lorenz.quack] in branch 'java/trunk'
[ https://svn.apache.org/r1708561 ]
QPID-6735: [Java Broker] Refactor how persisted messages are loaded from disk.
Messages loaded from disk are immediately flowed back to disk, potentially
releasing the underlying QpidByteBuffer.
Avoid reloading message content when creating chunks for delivery by doing the
chunking at a higher level.
Remove the unused offset parameter from
MessageContentSource#getContent(ByteBuffer dst, int offset).
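
The chunking idea in the commit message can be illustrated with a minimal
sketch. This is not the broker's code: it uses plain java.nio.ByteBuffer rather
than QpidByteBuffer, and the class name ChunkingSketch, the method chunk, and
the chunk size are hypothetical. It only shows how content that has been loaded
once can be split into frame-sized views without copying it or reading it from
the store again.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; not part of the Qpid code base.
final class ChunkingSketch
{
    // Splits already-loaded content into views of at most maxChunkSize bytes.
    // The views share the underlying bytes, so nothing is copied or reloaded.
    static List<ByteBuffer> chunk(ByteBuffer content, int maxChunkSize)
    {
        if (maxChunkSize <= 0)
        {
            throw new IllegalArgumentException("maxChunkSize must be positive");
        }
        List<ByteBuffer> chunks = new ArrayList<>();
        // duplicate() gives an independent position/limit over the same bytes,
        // so the caller's buffer is left untouched.
        ByteBuffer remaining = content.duplicate();
        while (remaining.hasRemaining())
        {
            int size = Math.min(maxChunkSize, remaining.remaining());
            ByteBuffer slice = remaining.slice();
            slice.limit(size);
            chunks.add(slice);
            remaining.position(remaining.position() + size);
        }
        return chunks;
    }

    public static void main(String[] args)
    {
        ByteBuffer content = ByteBuffer.allocate(10);
        List<ByteBuffer> chunks = chunk(content, 4);
        System.out.println(chunks.size() + " chunks");
    }
}
```

Because each chunk is a view rather than a fresh allocation, the content needs
to be resident only once while a delivery is being written.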
> [Java Broker] Out of memory when consuming messages in very large transaction
> -----------------------------------------------------------------------------
>
> Key: QPID-6735
> URL: https://issues.apache.org/jira/browse/QPID-6735
> Project: Qpid
> Issue Type: Bug
> Components: Java Broker
> Reporter: Rob Godfrey
> Assignee: Rob Godfrey
>
> The flow-to-disk mechanism in the Java Broker does not cope with the case
> where large (or large numbers of) flowed-to-disk messages are sent from the
> broker to a client within a transaction. When a message is sent, its content
> is retrieved from disk and memory is allocated for it. Unless the housekeeping
> thread evicts it again, that content stays in memory until the transaction is
> committed.
> The following simple test program
> {code:java}
> import javax.jms.BytesMessage;
> import javax.jms.MessageConsumer;
> import javax.jms.MessageProducer;
> import javax.jms.Queue;
> import javax.jms.Session;
>
> import org.apache.qpid.client.AMQConnection;
>
> public class Test
> {
>     public static void main(String[] args) throws Exception
>     {
>         System.setProperty("qpid.amqp.version","0-9-1");
>         AMQConnection conn = new AMQConnection("127.0.0.1","admin","admin","foo","test");
>         conn.start();
>
>         // Publish 20000 messages of 1 MiB each on a non-transacted session.
>         Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
>         Queue queue = session.createQueue("queue");
>         final MessageProducer prod = session.createProducer(queue);
>         for(int i = 0; i < 20000; i++)
>         {
>             final BytesMessage bytesMessage = session.createBytesMessage();
>             bytesMessage.writeBytes(new byte[1024*1024]);
>             prod.send(bytesMessage);
>         }
>
>         // Consume everything in a single transaction; commit only at the end.
>         Session session2 = conn.createSession(true, Session.SESSION_TRANSACTED);
>         MessageConsumer cons = session2.createConsumer(queue);
>         while(cons.receive(1000) != null);
>         session2.commit();
>         conn.close();
>     }
> }
> {code}
> causes the broker to fail with the following exception
> {noformat}
> ########################################################################
> #
> # Unhandled Exception java.lang.OutOfMemoryError: Direct buffer memory in Thread virtualhost-test-iopool-3
> #
> # Exiting
> #
> ########################################################################
> java.lang.OutOfMemoryError: Direct buffer memory
> at java.nio.Bits.reserveMemory(Bits.java:658)
> at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
> at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:306)
> at org.apache.qpid.bytebuffer.QpidByteBuffer.allocateDirect(QpidByteBuffer.java:464)
> at org.apache.qpid.bytebuffer.QpidByteBuffer.allocateDirectCollection(QpidByteBuffer.java:514)
> at org.apache.qpid.server.store.berkeleydb.AbstractBDBMessageStore.getAllContent(AbstractBDBMessageStore.java:413)
> at org.apache.qpid.server.store.berkeleydb.AbstractBDBMessageStore$StoredBDBMessage.getContentAsByteBuffer(AbstractBDBMessageStore.java:1100)
> at org.apache.qpid.server.store.berkeleydb.AbstractBDBMessageStore$StoredBDBMessage.getContent(AbstractBDBMessageStore.java:1126)
> at org.apache.qpid.server.message.AbstractServerMessageImpl.getContent(AbstractServerMessageImpl.java:172)
> at org.apache.qpid.server.protocol.v0_8.ProtocolOutputConverterImpl$MessageContentSourceBody.writePayload(ProtocolOutputConverterImpl.java:292)
> at org.apache.qpid.framing.AMQFrame.writePayload(AMQFrame.java:81)
> at org.apache.qpid.server.protocol.v0_8.ProtocolOutputConverterImpl$CompositeAMQBodyBlock.writePayload(ProtocolOutputConverterImpl.java:513)
> at org.apache.qpid.server.protocol.v0_8.AMQPConnection_0_8.writeFrame(AMQPConnection_0_8.java:434)
> at org.apache.qpid.server.protocol.v0_8.ProtocolOutputConverterImpl.writeFrame(ProtocolOutputConverterImpl.java:465)
> at org.apache.qpid.server.protocol.v0_8.ProtocolOutputConverterImpl.writeMessageDeliveryUnchanged(ProtocolOutputConverterImpl.java:224)
> at org.apache.qpid.server.protocol.v0_8.ProtocolOutputConverterImpl.writeMessageDelivery(ProtocolOutputConverterImpl.java:148)
> at org.apache.qpid.server.protocol.v0_8.ProtocolOutputConverterImpl.writeMessageDelivery(ProtocolOutputConverterImpl.java:101)
> at org.apache.qpid.server.protocol.v0_8.ProtocolOutputConverterImpl.writeDeliver(ProtocolOutputConverterImpl.java:78)
> at org.apache.qpid.server.protocol.v0_8.AMQPConnection_0_8$WriteDeliverMethod.deliverToClient(AMQPConnection_0_8.java:1421)
> at org.apache.qpid.server.protocol.v0_8.ConsumerTarget_0_8.sendToClient(ConsumerTarget_0_8.java:489)
> at org.apache.qpid.server.protocol.v0_8.ConsumerTarget_0_8$AckConsumer.doSend(ConsumerTarget_0_8.java:293)
> at org.apache.qpid.server.consumer.AbstractConsumerTarget.sendNextMessage(AbstractConsumerTarget.java:211)
> at org.apache.qpid.server.consumer.AbstractConsumerTarget.processPending(AbstractConsumerTarget.java:63)
> at org.apache.qpid.server.protocol.v0_8.AMQChannel.processPending(AMQChannel.java:3757)
> at org.apache.qpid.server.protocol.v0_8.AMQPConnection_0_8.processPending(AMQPConnection_0_8.java:1549)
> at org.apache.qpid.server.transport.MultiVersionProtocolEngine.processPending(MultiVersionProtocolEngine.java:197)
> at org.apache.qpid.server.transport.NonBlockingConnection.doWork(NonBlockingConnection.java:220)
> at org.apache.qpid.server.transport.NetworkConnectionScheduler.processConnection(NetworkConnectionScheduler.java:121)
> at org.apache.qpid.server.transport.NetworkConnectionScheduler.access$000(NetworkConnectionScheduler.java:37)
> at org.apache.qpid.server.transport.NetworkConnectionScheduler$2.run(NetworkConnectionScheduler.java:102)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:724)
> {noformat}
> with the broker VM memory settings as follows
> {noformat}
> -Dvirtualhost.connectionThreadPool.maximum=20
> -Dvirtualhost.connectionThreadPool.minimum=16 -verbose:gc -Xmx2048m -Xms1024m
> -XX:MaxDirectMemorySize=3072m
> {noformat}
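
A rough budget check suggests why the allocation fails. The sketch below uses
only the figures quoted above (20000 messages of 1 MiB each and
-XX:MaxDirectMemorySize=3072m). It ignores message headers and every other use
of direct memory, so it gives an upper bound rather than a measurement. The
class and method names are hypothetical.

```java
// Hypothetical back-of-the-envelope check; not part of the Qpid code base.
final class DirectMemoryBudget
{
    static final long MIB = 1024L * 1024L;

    // Upper bound on how many payloads of payloadBytes fit in directMemoryBytes.
    static long maxResidentMessages(long directMemoryBytes, long payloadBytes)
    {
        return directMemoryBytes / payloadBytes;
    }

    public static void main(String[] args)
    {
        long limit = 3072 * MIB;   // -XX:MaxDirectMemorySize=3072m
        long payload = 1 * MIB;    // new byte[1024*1024] per message
        long sent = 20000;         // messages published by the test program

        long fit = maxResidentMessages(limit, payload);
        System.out.println("Total payload: " + (sent * payload) / MIB + " MiB");
        System.out.println("At most " + fit + " of " + sent
                           + " message bodies fit in direct memory");
    }
}
```

If content loaded for delivery stays resident until the commit, the broker can
hold at most about 3072 of the 20000 message bodies before a direct buffer
allocation fails.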
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)