I ran the request/response example on .NET. The client sends messages
continuously, and the server replies to each message as it is received.
However, I later found that the C++ broker's memory usage keeps increasing.
I think that maybe I am not acknowledging the messages, but I don't know how
to ack them. Please help me.
My code is below:
namespace org.apache.qpid.example.requestresponse
{
/// <summary>
/// Requesting side of the request/response example: declares a private
/// response queue, subscribes a <c>ClientMessageListener</c> to it, sends one
/// initial request to the "request" routing key and then blocks.
/// </summary>
internal class Client
{
    /// <summary>
    /// Program entry point.
    /// </summary>
    /// <param name="args">
    /// Optional: <c>args[0]</c> is the broker host (default "192.168.1.233"),
    /// <c>args[1]</c> is the broker port (default 5672).
    /// </param>
    private static void Main(string[] args)
    {
        string host = args.Length > 0 ? args[0] : "192.168.1.233";
        int port = args.Length > 1 ? Convert.ToInt32(args[1]) : 5672;
        client.Client connection = new client.Client();
        try
        {
            connection.connect(host, port, "test", "guest", "guest");
            try
            {
                ClientSession session = connection.createSession(50000);
                IMessage request = new Message();

                //--------- Main body of program --------------------------------------------

                // Create a response queue so the server can send us responses
                // to our requests. Use the client's session name as part of the
                // name of the response queue.
                string response_queue = "client" + session.Name;

                // Use the name of the response queue as the routing key
                session.queueDeclare(response_queue);
                session.exchangeBind(response_queue, "amq.direct", response_queue);

                // Each client sends the name of their own response queue so
                // the service knows where to route messages.
                request.DeliveryProperties.setRoutingKey("request");
                request.MessageProperties.setReplyTo(new ReplyTo("amq.direct", response_queue));

                // Monitor.Wait below requires the session monitor to be held.
                lock (session)
                {
                    // Create a listener for the response queue and listen for
                    // response messages.
                    Console.WriteLine("Activating response queue listener for: " + response_queue);
                    IMessageListener listener = new ClientMessageListener(session);
                    session.attachMessageListener(listener, response_queue);
                    session.messageSubscribe(response_queue);

                    // Now send the first request; the listener sends the
                    // follow-up requests as responses arrive.
                    string strs = "sfsadfsdfasdf";
                    request.clearData();
                    request.appendData(Encoding.UTF8.GetBytes(strs));
                    session.messageTransfer("amq.direct", request);

                    Console.WriteLine("Waiting for all responses to arrive ...");
                    // NOTE(review): nothing in the code shown here pulses the
                    // session monitor, so this wait is not expected to return
                    // on its own.
                    Monitor.Wait(session);
                }

                //---------------------------------------------------------------------------
            }
            finally
            {
                // Close the connection even when the body above throws, so the
                // broker connection is not left open. A failure inside close()
                // is still reported by the catch below.
                connection.close();
            }
        }
        catch (Exception e)
        {
            // Print the whole exception (type, message and stack trace); the
            // stack trace alone does not say what went wrong.
            Console.WriteLine("Error: \n" + e);
        }
    }
}
/// <summary>
/// Listener attached to the client's response queue. Every incoming message
/// is accepted (acknowledged) and answered with a new, large request sent to
/// the "request" routing key, which keeps the request/response exchange going.
/// </summary>
public class ClientMessageListener : IMessageListener
{
    private readonly ClientSession _session;

    // Ids of every message received so far; the whole set is passed to
    // messageAccept on each delivery.
    private readonly RangeSet _range = new RangeSet();

    // Number of messages received by this listener.
    private int _counter;

    /// <summary>Creates a listener that accepts and replies on <paramref name="session"/>.</summary>
    /// <param name="session">Session used to accept messages and to send the follow-up requests.</param>
    public ClientMessageListener(ClientSession session)
    {
        _session = session;
    }

    /// <summary>
    /// Handles one message from the response queue: acknowledges it, then
    /// sends the next request.
    /// </summary>
    /// <param name="m">The received message. Its body is not inspected.</param>
    /// <exception cref="OverflowException">
    /// The generated payload is too long for the 16-bit length prefix.
    /// </exception>
    public void messageTransfer(IMessage m)
    {
        _counter++;

        // Acknowledge the received message. The body used to be decoded into
        // a local that was never read; that dead code has been removed.
        _range.add(m.Id);
        _session.messageAccept(_range);

        IMessage response = new Message();
        string response_queue = "client" + _session.Name;
        response.DeliveryProperties.setRoutingKey("request");
        response.MessageProperties.setReplyTo(new ReplyTo("amq.direct", response_queue));

        // Payload text: the same statement repeated 1025 times (one initial
        // copy plus 1024 appended ones), built without quadratic string
        // concatenation.
        const string statement = "SELECT SimpleData FROM playerbase where szName='dddd';";
        const int repetitions = 1025;
        System.Text.StringBuilder builder = new System.Text.StringBuilder(statement.Length * repetitions);
        for (int i = 0; i < repetitions; i++)
        {
            builder.Append(statement);
        }
        char[] sql = builder.ToString().ToCharArray();

        using (MemoryStream stream = new MemoryStream())
        using (BinaryWriter writer = new BinaryWriter(stream))
        {
            // Two application-defined header values (their meaning is not
            // shown in this file), then a 16-bit length prefix and the text.
            writer.Write((Int16)9);
            writer.Write((Int32)18);

            // The text is 55350 characters, which does not fit in Int16; the
            // old (Int16) cast silently wrapped to a negative number. Writing
            // it as an unsigned 16-bit value produces the same two bytes on
            // the wire for this payload, and checked() turns a genuine
            // overflow (> 65535) into an exception instead of a corrupt prefix.
            writer.Write(checked((ushort)sql.Length));
            writer.Write(sql, 0, sql.Length);

            response.appendData(stream.ToArray());
        }

        _session.messageTransfer("amq.direct", response);
    }
}
}
---------------------------------------------------------------------------------------------
---------------------------------------------------------------------------------------------
/// <summary>
/// Serving side of the request/response example: declares the "request"
/// queue, subscribes a <c>MessageListener</c> to it and then blocks while the
/// listener answers incoming requests.
/// </summary>
class Server
{
    /// <summary>
    /// Program entry point.
    /// </summary>
    /// <param name="args">
    /// Optional: <c>args[0]</c> is the broker host (default "192.168.1.233"),
    /// <c>args[1]</c> is the broker port (default 5672). With no arguments the
    /// behavior is the same as the previously hard-coded values.
    /// </param>
    static void Main(string[] args)
    {
        string host = args.Length > 0 ? args[0] : "192.168.1.233";
        int port = args.Length > 1 ? Convert.ToInt32(args[1]) : 5672;
        client.Client connection = new client.Client();
        try
        {
            connection.connect(host, port, "test", "uniqueeye", "uniqueeye");
            try
            {
                ClientSession session = connection.createSession(50000);

                //--------- Main body of program --------------------------------------------

                // Create a request queue for clients to use when making
                // requests.
                const string request_queue = "request";

                // Use the name of the request queue as the routing key
                session.queueDeclare(request_queue);
                session.exchangeBind(request_queue, "amq.direct", request_queue);

                // Monitor.Wait below requires the session monitor to be held.
                lock (session)
                {
                    // Create a listener and subscribe it to the request_queue
                    IMessageListener listener = new MessageListener(session);
                    session.attachMessageListener(listener, request_queue);
                    session.messageSubscribe(request_queue);

                    // Flow control for the subscription, addressed through the
                    // request_queue constant rather than a repeated literal.
                    session.messageSetFlowMode(request_queue, MessageFlowMode.CREDIT);
                    session.messageFlow(request_queue, MessageCreditUnit.BYTE,
                                        ClientSession.MESSAGE_FLOW_MAX_BYTES);
                    // NOTE(review): a negative message credit is unusual and its
                    // intent cannot be determined from this file; the value is
                    // kept as-is. Confirm how the client library encodes it.
                    session.messageFlow(request_queue, MessageCreditUnit.MESSAGE, -9900);

                    // Receive messages until all messages are received
                    Console.WriteLine("Waiting for requests");
                    // NOTE(review): nothing in the code shown here pulses the
                    // session monitor, so this wait is not expected to return
                    // on its own.
                    Monitor.Wait(session);
                }

                //---------------------------------------------------------------------------
            }
            finally
            {
                // Close the connection even when the body above throws, so the
                // broker connection is not left open. A failure inside close()
                // is still reported by the catch below.
                connection.close();
            }
        }
        catch (Exception e)
        {
            // Print the whole exception (type, message and stack trace); the
            // stack trace alone does not say what went wrong.
            Console.WriteLine("Error: \n" + e);
        }
    }
}
/// <summary>
/// Listener attached to the server's request queue. Each request is answered
/// with its upper-cased body, routed to the reply-to routing key carried by
/// the request, and then accepted (acknowledged).
/// </summary>
public class MessageListener : IMessageListener
{
    private readonly ClientSession _session;

    // Ids of every message handled so far; the whole set is passed to
    // messageAccept on each delivery.
    private readonly RangeSet _range = new RangeSet();

    // Number of requests received by this listener.
    private int counter = 0;

    /// <summary>Creates a listener that replies and accepts on <paramref name="session"/>.</summary>
    /// <param name="session">Session used to send responses and to accept requests.</param>
    public MessageListener(ClientSession session)
    {
        _session = session;
    }

    /// <summary>
    /// Handles one request: sends the upper-cased body back to the requester
    /// and acknowledges the request.
    /// </summary>
    /// <param name="request">The received request message.</param>
    public void messageTransfer(IMessage request)
    {
        counter++;
        Console.WriteLine(counter);

        // Get routing key for response from the request's replyTo property
        if (!request.MessageProperties.hasReplyTo())
        {
            Console.WriteLine("Error: \n No routing key for request " + request);
            // The request cannot be answered, but it must still be accepted;
            // previously this path returned without acknowledging, leaving
            // the message unacknowledged on the broker.
            AcceptRequest(request);
            return;
        }

        string routingKey = request.MessageProperties.getReplyTo().getRoutingKey();

        IMessage response = new Message();
        response.clearData();
        response.DeliveryProperties.setRoutingKey(routingKey);

        // Read the rest of the body. ReadBytes keeps reading until the
        // requested count or end of stream, unlike a single Read call.
        BinaryReader reader = new BinaryReader(request.Body, Encoding.UTF8);
        byte[] body = reader.ReadBytes((int)(request.Body.Length - request.Body.Position));

        // Decode with UTF-8: the client shown alongside this server encodes
        // its text requests with UTF-8, and the response below is encoded
        // with UTF-8 as well. Decoding as ASCII replaced every non-ASCII
        // character with '?'.
        string message = Encoding.UTF8.GetString(body);
        Console.WriteLine("Request: " + message);

        string responseBody = message.ToUpper();
        response.appendData(Encoding.UTF8.GetBytes(responseBody));
        _session.messageTransfer("amq.direct", routingKey, response);

        // Acknowledge only after the response has been handed to the session.
        AcceptRequest(request);
    }

    // Adds the request to the set of handled messages and acknowledges the set.
    private void AcceptRequest(IMessage request)
    {
        _range.add(request.Id);
        _session.messageAccept(_range);
    }
}
}
--
View this message in context:
http://n2.nabble.com/dotnet%EF%BC%8C-how-to-acknowledge-the-message---tp2662472p2662472.html
Sent from the Apache Qpid developers mailing list archive at Nabble.com.
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:[email protected]