Hello! I'm using c# client to connect to a qpid c++ broker for publishing and receiving messages on a topic (amq.topic). Unfortunately, both publisher and listener sessions/connections are dropped after 3-5 minutes. No exceptions or events are generated by the c# client, so I wonder how to handle this situation: I want the listener to be connected forever to its topic. I've tried to change the connection timeout, but with no success. I created a simple publisher and a listener for reproducing the problem. The first c# console application publishes the lines read from the console, while the second application reads from the topic.
After a few minutes the two parts are not connected anymore and using the 'list queue' command in the qpid-tool utility I see that the queue has been destroyed. Any help is appreciated, bye Fcolle /*********************************************************************************/ //----------------------------------------------- // PUBLISHER CODE //----------------------------------------------- using System; using System.Configuration; using System.IO; using System.Text; using System.Threading; using org.apache.qpid.client; using org.apache.qpid.transport; namespace Publisher { /// /// This program is one of two programs designed to be used /// together. These programs use the topic exchange. /// /// Publisher: /// /// Publishes to a broker, specifying a routing key. /// /// Listener (this program): /// /// Reads from a queue on the broker using a message listener. /// /// class Listener { public static int _count = 1; private static void Main(string[] args) { string host = PublisherSettings.Default.Host; int port = PublisherSettings.Default.Port; string virtualhost = PublisherSettings.Default.VirtualHost; string username = PublisherSettings.Default.Username; string password = PublisherSettings.Default.Password; Client connection = new Client(); try { connection.Connect(host, port, virtualhost, username, password); IClientSession session = connection.CreateSession(50000); publishMessages(session, "myqueue.1"); noMoreMessages(session); connection.Close(); } catch (Exception e) { Console.WriteLine("Error: \n" + e.StackTrace); } } private static void publishMessages(IClientSession session, string routing_key) { string read = ""; do { IMessage message = new Message(); read = Console.ReadLine(); message.ClearData(); message.AppendData(Encoding.UTF8.GetBytes(read)); session.MessageTransfer("amq.topic", routing_key, message); } while (read != "End"); } private static void noMoreMessages(IClientSession session) { IMessage message = new Message(); // And send a syncrhonous final message to indicate termination. message.ClearData(); message.AppendData(Encoding.UTF8.GetBytes("End")); session.MessageTransfer("amq.topic", "control", message); session.Sync(); } } } /*********************************************************************************/ //----------------------------------------------- // LISTENER CODE //----------------------------------------------- using System; using System.Configuration; using System.IO; using System.Text; using System.Threading; using org.apache.qpid.client; using org.apache.qpid.transport; namespace Subscriber { /// /// This program is one of two programs designed to be used /// together. These programs use the topic exchange. /// /// Publisher: /// /// Publishes to a broker, specifying a routing key. /// /// Listener (this program): /// /// Reads from a queue on the broker using a message listener. /// /// class Listener { public static int _count = 1; private static void Main(string[] args) { string host = Settings1.Default.Host; int port = Settings1.Default.Port; string virtualhost = Settings1.Default.VirtualHost; string username = Settings1.Default.Username; string password = Settings1.Default.Password; Client connection = new Client(); try { connection.Connect(host, port, virtualhost, username, password); IClientSession session = connection.CreateSession(50000); //--------- Main body of program -------------------------------------------- lock (session) { Console.WriteLine("Listening for messages ..."); // Create a listener prepareQueue("myqueue", "myqueue.#", session); while (_count > 0) { Monitor.Wait(session); } } //--------------------------------------------------------------------------- connection.Close(); } catch (Exception e) { Console.WriteLine("Error: \n" + e.StackTrace); } } private static void prepareQueue(string queue, string routing_key, IClientSession session) { // Create a unique queue name for this consumer by concatenating // the queue name parameter with the Session ID. Console.WriteLine("Declaring queue: " + queue); session.QueueDeclare(queue, Option.EXCLUSIVE, Option.AUTO_DELETE); // Route messages to the new queue if they match the routing key. // Also route any messages to with the "control" routing key to // this queue so we know when it's time to stop. A publisher sends // a message with the content "That's all, Folks!", using the // "control" routing key, when it is finished. session.ExchangeBind(queue, "amq.topic", routing_key); session.ExchangeBind(queue, "amq.topic", "control"); // subscribe the listener to the queue IMessageListener listener = new MessageListener(session); session.AttachMessageListener(listener, queue); session.MessageSubscribe(queue); } } public class MessageListener : IMessageListener { private readonly IClientSession _session; private readonly RangeSet _range = new RangeSet(); public MessageListener(IClientSession session) { _session = session; } public void MessageTransfer(IMessage m) { BinaryReader reader = new BinaryReader(m.Body, Encoding.UTF8); byte[] body = new byte[m.Body.Length - m.Body.Position]; reader.Read(body, 0, body.Length); ASCIIEncoding enc = new ASCIIEncoding(); string message = enc.GetString(body); Console.WriteLine(DateTime.Now.ToString()+"\t-\tMessage:\t[" + message + "]\tfrom: " + m.Destination); // Add this message to the list of message to be acknowledged _range.Add(m.Id); if (message.Equals("End")) { Console.WriteLine("Shutting down listener for " + m.DeliveryProperties.GetRoutingKey()); Listener._count--; // Acknowledge all the received messages _session.MessageAccept(_range); lock (_session) { Monitor.Pulse(_session); } } } } } -- View this message in context: http://apache-qpid-users.2158936.n2.nabble.com/c-topic-publisher-and-listener-sessions-dropped-tp6272182p6272182.html Sent from the Apache Qpid users mailing list archive at Nabble.com. --------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:users-subscr...@qpid.apache.org