Hi,
I'm seeing a memory leak when using the client library on Windows (0.5 release) to send a single request. From the debugging I've done so far, it looks like the AsynchIO object used by the TCPConnector is not being deleted. When the connection is closed, aio->queueForDeletion() is called from TCPConnector::closeInternal(). However, at that point opsInProgress > 0 inside AsynchIO::queueForDeletion(), so the 'delete this' branch is not taken. No further queueForDeletion() calls are made on the aio object after opsInProgress drops to 0, so it never gets deleted. The ConnectionImpl object involved is not cleaned up either; the shared_ptr to it still has a non-zero ref count, but I'm guessing that is related to the AsynchIO object not being cleaned up.

There used to be some code in AsynchIO::completion() that looks like it would have done the necessary delete. It was removed with the code changes for https://issues.apache.org/jira/browse/QPID-1550
The extra handling in completion() was as follows:

-    // Lock released; ok to delete if all is done.
-    if (opsInProgress == 0 && queuedDelete)
-        delete this;
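
To illustrate the sequence I think I'm seeing, here is a minimal standalone sketch. It is not the actual Qpid code: FakeAio, runScenario() and the deleteOnCompletion flag are names made up for illustration, and it assumes C++11 for std::mutex. It only models the two counters involved (opsInProgress and queuedDelete) and counts how many objects are still alive after a "close, then complete" sequence.

```cpp
#include <mutex>

// Hypothetical stand-in for AsynchIO; only the deletion bookkeeping is modelled.
class FakeAio {
public:
    static int liveObjects;   // number of FakeAio objects not yet deleted

    explicit FakeAio(bool deleteOnCompletion_)
        : opsInProgress(0), queuedDelete(false),
          deleteOnCompletion(deleteOnCompletion_) { ++liveObjects; }

    // An asynchronous operation has been started.
    void startOp() {
        std::lock_guard<std::mutex> l(lock);
        ++opsInProgress;
    }

    // The owner is finished with this object (what closeInternal() triggers).
    void queueForDeletion() {
        bool canDelete;
        {
            std::lock_guard<std::mutex> l(lock);
            queuedDelete = true;
            canDelete = (opsInProgress == 0);
        }
        if (canDelete) delete this;   // skipped while an operation is pending
    }

    // An asynchronous operation has finished.
    void completion() {
        bool canDelete;
        {
            std::lock_guard<std::mutex> l(lock);
            --opsInProgress;
            // This mirrors the check that was removed from completion().
            canDelete = deleteOnCompletion && opsInProgress == 0 && queuedDelete;
        }
        // Lock released; ok to delete if all is done.
        if (canDelete) delete this;
    }

private:
    ~FakeAio() { --liveObjects; }   // private: only 'delete this' may destroy it

    std::mutex lock;
    int opsInProgress;
    bool queuedDelete;
    const bool deleteOnCompletion;
};

int FakeAio::liveObjects = 0;

// Close while one operation is still pending, then let it complete.
// Returns how many objects this scenario left alive (0 = no leak).
int runScenario(bool deleteOnCompletion) {
    const int before = FakeAio::liveObjects;
    FakeAio* aio = new FakeAio(deleteOnCompletion);
    aio->startOp();            // an operation is outstanding
    aio->queueForDeletion();   // close arrives while opsInProgress > 0
    aio->completion();         // the last operation finishes afterwards
    return FakeAio::liveObjects - before;
}
```

In this sketch runScenario(false) leaves one object alive, because nothing revisits the queued delete once the last operation completes, while runScenario(true) leaves none. Whether the real AsynchIO is meant to be cleaned up by some other path is exactly what I'm unsure about.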

Should that code be re-introduced (I don't know the history or the 'big picture'), or could this be due to my usage? My reproduction code is below. The CRT memory leak report is printed when the test program exits, which is what led me to the aio allocation that isn't being freed.

#define _CRTDBG_MAP_ALLOC
#include <cstdlib>
#include <crtdbg.h>

#include <qpid/client/Connection.h>
#include <qpid/client/Session.h>
#include <qpid/client/Message.h>
#include <qpid/client/SubscriptionManager.h>
#include "qpid/sys/Time.h"

#include <iostream>
#include <string>

using namespace qpid::client;
using namespace qpid::framing;
using qpid::sys::TIME_SEC;

int main(int argc, char **argv)
{

   _CrtSetDbgFlag ( _CRTDBG_ALLOC_MEM_DF | _CRTDBG_LEAK_CHECK_DF );
   _CrtSetReportMode( _CRT_ERROR, _CRTDBG_MODE_DEBUG | _CRTDBG_MODE_WNDW);

   Connection connection;
   Message response;
   try
   {
      //qpid::TcpAddress::DEFAULT_PORT
      connection.open("10.39.170.224");
      Session session =  connection.newSession();

      // Create a response queue using the client's session id as the name of
      // the response queue
      std::string responseQueue = "x.client" + session.getId().getName();

      // Use the name of the response queue as the routing key as well
      session.queueDeclare(arg::queue=responseQueue);
      session.exchangeBind(arg::exchange="amq.direct",
            arg::queue=responseQueue, arg::bindingKey=responseQueue);

      // Set up a message using our response queue so
      // the service knows where to route response messages.
      Message req;
      DeliveryProperties &deliveryProps = req.getDeliveryProperties();
      deliveryProps.setRoutingKey("x.request");

      MessageProperties &messageProps = req.getMessageProperties();
      messageProps.setReplyTo(ReplyTo("amq.direct", responseQueue));

      session.messageTransfer(arg::content=req, arg::destination="amq.direct",
            arg::acceptMode=0);

      // Create a listener for the response queue and listen for response
      // messages.
      SubscriptionManager subsManager(session);
      if(!subsManager.get(response, responseQueue, TIME_SEC))
         std::cout << "Timed out waiting for a response" << std::endl;
   }
   catch(const std::exception& error)
   {
      std::cout << "exception thrown.." << std::endl
                << error.what() << std::endl;
   }

   return 0;
}

- David

---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:[email protected]
