Shan Wang wrote:
When the queue reaches its full capacity, and a c++ client tries to put
a message to the queue, the client execute() will throw an exception and
the connection will be closed by broker. After that, if the client tries
to call FailoverManager::connect() again to the same queue, it will
succeed but the memory increases about 2MB each time it connects.
Sometimes my application still have tens of thousands of messages when
the queue is full and this problem has caused my program to crash after
exhausting all the available memory.
The queue is declared as durable, and the client program is not far from
the replay_sender example. I tried to close the connection explicitly
before reconnecting, or close/flush/suspend the session associated to
the connection, but in the end I still have the same result. Can anyone
please advice me how can I avoid this memory consumption?
I suspect that you are queueing up messages in the MessageReplayTracker
that are from old sessions and thus will never be cleared. The isRetry
flag on FailoverManager::Command::execute() will only be set if the
FailoverManager itself re-executes the command due to a transport failure.
If you are explicitly re-executing the command after an explicit
application level error, then the FailoverManager will treat it is a new
command. Consequently if you are using the isRetry flag to determine
whether to replay messages or not (as the replaying _sender example
does), then the replay will not get called. This leaves messages from
the old session in the buffer and these won't be cleared as the command
ids will not match the completion information from the current session.
It will also mean that you have gaps in the sequence of messages (which
you are presumably wanting to avoid as you are using
MessageReplayTracker in the first place).
I hope the above explanation helps. The simple solution to your problem
(assuming I have diagnosed it correctly) is to replace:
if (isRetry) sender.replay(session);
else sender.init(session);
with just:
sender.replay(session);
Or
alternatively, is there anyway to tell from the client side if the queue
it's talking to is full or not, so I don't have to call connect() until
the queue is cleared, of course assuming this method won't cause the
same memory leak.
The exception thrown gives an indication of why the connection/session
was aborted.
When a configured limit on the in-memory queue is reached and the policy
is to reject sessions that try to publish more messages for example the
exception will be qpid::framing::ResourceLimitExceededException.
--Gordon.
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:[email protected]