Java JMS client ReplyTo memory leak
-----------------------------------
Key: QPID-3016
URL: https://issues.apache.org/jira/browse/QPID-3016
Project: Qpid
Issue Type: Bug
Components: Java Client
Affects Versions: 0.8
Environment: Fedora 14: 2.6.35.10-74.fc14.x86_64
Broker: qpidd (qpidc) version 0.8
Java Client 0.8
OpenJDK Runtime Environment (IcedTea6 1.9.3) (fedora-49.1.9.3.fc14-x86_64)
OpenJDK 64-Bit Server VM (build 19.0-b09, mixed mode)
Reporter: David Kellum
I'm using the Java 0.8 client JMS interface. Relevant Client code is below.
If I make async requests to a queue with setJMSReplyTo as temporary queue for
responses, the client process will run out of memory after about 3.2M
request/response message pairs. This is at 512MB max java heap, though growth
appears to be consistently linear. Note that I use a semaphore between request
and response to keep ~1000 unanswered requests open in the client and block
before sending more. Thus this should not be a matter of simply saturating the
client with unsent messages.
Here is the top of the jhat histogram from jmap heap dump shortly before client
runs out of memory:
class org.apache.qpid.collections.ReferenceMap$SoftRef 3253120
143137280
class org.apache.qpid.collections.ReferenceMap$Entry 3253120
117112320
class [Lorg.apache.qpid.collections.ReferenceMap$Entry; 1 67108880
class org.apache.qpid.transport.ReplyTo 3253120 61809280
Note that 3253120 appears to match the total number of request/replies
successfully processed by the client up to this point. Or in other words, its
leaking one ReplyTo object and associated (not so?) soft references per
request/response.
If instead I replace the temporary queue with a fixed response and drop use of
setJMSReplyTo(), the client works fine, no memory leak.
Below are more details when running with the temp response queue:
% qpid-config queues
Queue Name Attributes
======================================================================
TempQueued4051d9d-37d3-4306-a1fc-91b93f7082c8 auto-del excl
iudex-brutefuzzy-request --max-queue-size=100000
--limit-policy=reject
Client startup Log:
635 [main] INFO o.a.q.j.PropertiesFileInitialContextFactory - No Provider URL
specified.
726 [main] INFO o.a.qpid.client.AMQConnection -
Connection:amqp://qpid:********@default-client/default-vhost?brokerlist='tcp://localhost:5672'
973 [main] INFO o.a.q.c.p.AMQProtocolSession - Using ProtocolVersion for
Session:0-10
990 [main] INFO o.a.q.c.h.ClientMethodDispatcherImpl - New Method
Dispatcher:AMQProtocolSession[null]
1002 [main] INFO o.a.qpid.client.AMQConnection - Connecting with
ProtocolHandler Version:0-10
1150 [main] INFO o.a.qpid.client.AMQConnection - Connected with
ProtocolHandler Version:0-10
1192 [main] INFO o.a.qpid.client.AMQSession - Created
session:org.apache.qpid.client.AMQSession_0_10@1b7c63f
1280 [main] INFO o.a.q.c.BasicMessageProducer_0_10 - MessageProducer
org.apache.qpid.client.BasicMessageProducer_0_10@1727745 using publish mode :
ASYNC_PUBLISH_ALL
1503 [main] INFO o.a.qpid.client.AMQSession - Prefetching delayed existing
messages will not flow until requested via receive*() or setML().
1586 [main] INFO o.a.q.c.AMQSession.Dispatcher - Dispatcher-Channel-1 created
1586 [Dispatcher-Channel-1] INFO o.a.q.c.AMQSession.Dispatcher -
Dispatcher-Channel-1 started
Relevent client code:
public class Client
implements MessageListener, Closeable, ExceptionListener
{
public Client( JMSContext context )
throws JMSException, NamingException
{
_connection = context.createConnection();
_session = context.createSession( _connection );
Destination requestQueue =
context.lookupDestination( "iudex-brutefuzzy-request" );
_producer = _session.createProducer( requestQueue );
context.close();
_responseQueue = _session.createTemporaryQueue();
_session.createConsumer( _responseQueue ).setMessageListener(this);
_connection.start();
}
public void sendRequest( long simhash, boolean doAdd )
throws JMSException, InterruptedException
{
Builder bldr = Request.newBuilder();
bldr.setSimhash( simhash );
bldr.setAction( doAdd ? RequestAction.ADD : RequestAction.CHECK_ONLY );
BytesMessage response = _session.createBytesMessage();
response.writeBytes( bldr.build().toByteArray() );
if( _responseQueue != null ) {
response.setJMSReplyTo( _responseQueue );
}
_semaphore.acquire();
_producer.send( response );
}
@Override
public void onMessage( Message msg )
{
try {
//Handle response
msg.acknowledge();
_semaphore.release();
}
catch( JMSException x ) {
if( _log.isDebugEnabled() ) _log.error( "onMessage:", x );
else _log.error( "onMessage: {}", x.toString() );
}
}
private Session _session;
private MessageProducer _producer;
private boolean _createTemporaryResponseQueue = true;
private Destination _responseQueue = null;
private Connection _connection = null;
private final Semaphore _semaphore = new Semaphore( 1000 );
private Logger _log = LoggerFactory.getLogger( getClass() );
}
--
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:[email protected]