i run the request&response example on dotnet, the client send message all 
the time, and the server reply message when recvied .  but later i found the 
c++ broker's memory increase all time. i think that maybe i did't acknowledge 
message, i don't know how to ack it. please help me.

my code below:

namespace org.apache.qpid.example.requestresponse
{
  internal class Client
  {
    private static void Main(string[] args)
    {
      string host = args.Length > 0 ? args[0] : "192.168.1.233";
      int port = args.Length > 1 ? Convert.ToInt32(args[1]) : 5672;
      client.Client connection = new client.Client();
      try
      {
        connection.connect(host, port, "test", "guest", "guest");
        ClientSession session = connection.createSession(50000);
        IMessage request = new Message();

        //--------- Main body of program 
--------------------------------------------
        // Create a response queue so the server can send us responses
        // to our requests. Use the client's session ID as the name
        // of the response queue.
        string response_queue = "client" + session.Name;
        // Use the name of the response queue as the routing key
        session.queueDeclare(response_queue);
        session.exchangeBind(response_queue, "amq.direct", response_queue);

        // Each client sends the name of their own response queue so
        // the service knows where to route messages.
        request.DeliveryProperties.setRoutingKey("request");
        request.MessageProperties.setReplyTo(new ReplyTo("amq.direct", 
response_queue));

        lock (session)
        {
          // Create a listener for the response queue and listen for response 
messages.
          Console.WriteLine("Activating response queue listener for: " + 
response_queue);
          IMessageListener listener = new ClientMessageListener(session);
          session.attachMessageListener(listener, response_queue);
          session.messageSubscribe(response_queue);

          // Now send some requests ...
          string strs = "sfsadfsdfasdf";

          request.clearData();
          request.appendData(Encoding.UTF8.GetBytes(strs));
          session.messageTransfer("amq.direct", request);
          Console.WriteLine("Waiting for all responses to arrive ...");
          Monitor.Wait(session);
        }
        
//---------------------------------------------------------------------------

        connection.close();
      }
      catch (Exception e)
      {
        Console.WriteLine("Error: \n" + e.StackTrace);
      }
    }
  }

  public class ClientMessageListener : IMessageListener
  {
    private readonly ClientSession _session;
    private readonly RangeSet _range = new RangeSet();
    private int _counter;
    public ClientMessageListener(ClientSession session)
    {
      _session = session;
    }

    public void messageTransfer(IMessage m)
        {
            _counter++;
            BinaryReader reader = new BinaryReader(m.Body, Encoding.UTF8);
            byte[] body = new byte[m.Body.Length - m.Body.Position];
            reader.Read(body, 0, body.Length);
            ASCIIEncoding enc = new ASCIIEncoding();
            string message = enc.GetString(body);
            _range.add(m.Id);
            _session.messageAccept(_range);
            IMessage response = new Message();
            string response_queue = "client" + _session.Name;
            response.DeliveryProperties.setRoutingKey("request");
            response.MessageProperties.setReplyTo(new ReplyTo("amq.direct", 
response_queue));
            MemoryStream Str = new MemoryStream();
            BinaryWriter Writer = new BinaryWriter(Str);
            Writer.Write((Int16)9);
            Writer.Write((Int32)18);
            string sql = "SELECT SimpleData FROM playerbase where 
szName='dddd';";
            for (int i = 0; i < 1024; i++)
            {
              sql += "SELECT SimpleData FROM playerbase where szName='dddd';";
            }
            Writer.Write((Int16)sql.Length);
            Writer.Write(sql.ToCharArray(), 0, sql.Length);
            response.appendData(Str.ToArray());
            _session.messageTransfer("amq.direct", response);
        }
  }
}

---------------------------------------------------------------------------------------------
---------------------------------------------------------------------------------------------

class Server
  {
    static void Main(string[] args)
    {
      string host = "192.168.1.233";
      int port = 5672;
      client.Client connection = new client.Client();
      try
      {
        connection.connect(host, port, "test", "uniqueeye", "uniqueeye");
        ClientSession session = connection.createSession(50000);

        //--------- Main body of program 
--------------------------------------------
        // Create a request queue for clients to use when making
        // requests.
        const string request_queue = "request";
        // Use the name of the request queue as the routing key
        session.queueDeclare(request_queue);
        session.exchangeBind(request_queue, "amq.direct", request_queue);

        lock (session)
        {
          // Create a listener and subscribe it to the request_queue      
          IMessageListener listener = new MessageListener(session);
          session.attachMessageListener(listener, request_queue);
          session.messageSubscribe(request_queue);
          session.messageSetFlowMode("request", MessageFlowMode.CREDIT);
          session.messageFlow("request", MessageCreditUnit.BYTE, 
ClientSession.MESSAGE_FLOW_MAX_BYTES);
          session.messageFlow("request", MessageCreditUnit.MESSAGE, -9900);
          // Receive messages until all messages are received
          Console.WriteLine("Waiting for requests");
          Monitor.Wait(session);
        }

        
//---------------------------------------------------------------------------

        connection.close();
      }
      catch (Exception e)
      {
        Console.WriteLine("Error: \n" + e.StackTrace);
      }
    }
  }

  public class MessageListener : IMessageListener
  {
    private readonly ClientSession _session;
    private readonly RangeSet _range = new RangeSet();
    private int counter = 0;
    public MessageListener(ClientSession session)
    {
      _session = session;
    }

    public void messageTransfer(IMessage request)
    {
      counter++;
      Console.WriteLine(counter);
      IMessage response = new Message();
      response.clearData();
      // Get routing key for response from the request's replyTo property
      string routingKey;
      if (request.MessageProperties.hasReplyTo())
      {
        routingKey = request.MessageProperties.getReplyTo().getRoutingKey();
        response.DeliveryProperties.setRoutingKey(routingKey);
      }
      else
      {
        Console.WriteLine("Error: \n No routing key for request " + request);
        return;
      }

      BinaryReader reader = new BinaryReader(request.Body, Encoding.UTF8);
      byte[] body = new byte[request.Body.Length - request.Body.Position];
      reader.Read(body, 0, body.Length);
      ASCIIEncoding enc = new ASCIIEncoding();
      string message = enc.GetString(body);
      Console.WriteLine("Request: " + message);

      string responseBody = message.ToUpper();
      response.appendData(Encoding.UTF8.GetBytes(responseBody));
      _session.messageTransfer("amq.direct", routingKey, response);

      // Add this message to the list of message to be acknowledged 
      _range.add(request.Id);
      _session.messageAccept(_range);
    }
  }
}

-- 
View this message in context: 
http://n2.nabble.com/dotnet%EF%BC%8C-how-to-acknowledge-the-message---tp2662472p2662472.html
Sent from the Apache Qpid developers mailing list archive at Nabble.com.


---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:[email protected]

Reply via email to