There was some recent discussion very similar to this on the group. Are you by any chance using the pre-0.10 .NET client? If so, I recommend you stop, wait for Qpid 0.10 (which should be released shortly) and use the new .NET bindings.
-Steve -- Steve Huston, Riverace Corporation Total Lifecycle Support for Your Networked Applications http://www.riverace.com > -----Original Message----- > From: fcolle [mailto:fco...@tiscali.it] > Sent: Thursday, April 14, 2011 6:02 AM > To: users@qpid.apache.org > Subject: c# topic publisher and listener sessions dropped > > > Hello! > I'm using c# client to connect to a qpid c++ broker for > publishing and receiving messages on a topic (amq.topic). > Unfortunately, both publisher and listener > sessions/connections are dropped after 3-5 minutes. No > exceptions or events are generated by the c# client, so I > wonder how to handle this situation: I want the listener to > be connected forever to its topic. I've tried to change the > connection timeout, but with no success. > > I created a simple publisher and a listener for reproducing > the problem. The first c# console application publishes the > lines read from the console, while the second application > reads from the topic. > > After a few minutes the two parts are not connected anymore > and using the 'list queue' command in the qpid-tool utility I > see that the queue has been destroyed. > > Any help is appreciated, > bye Fcolle > > > /************************************************************* > ********************/ > //----------------------------------------------- > // PUBLISHER CODE > //----------------------------------------------- > > using System; > using System.Configuration; > using System.IO; > using System.Text; > using System.Threading; > using org.apache.qpid.client; > using org.apache.qpid.transport; > > namespace Publisher > { > > /// > /// This program is one of two programs designed to be used > /// together. These programs use the topic exchange. > /// > /// Publisher: > /// > /// Publishes to a broker, specifying a routing key. > /// > /// Listener (this program): > /// > /// Reads from a queue on the broker using a message listener. > /// > /// > class Listener > { > public static int _count = 1; > > private static void Main(string[] args) > { > string host = PublisherSettings.Default.Host; > int port = PublisherSettings.Default.Port; > string virtualhost = PublisherSettings.Default.VirtualHost; > string username = PublisherSettings.Default.Username; > string password = PublisherSettings.Default.Password; > > Client connection = new Client(); > try > { > connection.Connect(host, port, virtualhost, > username, password); > IClientSession session = > connection.CreateSession(50000); > > publishMessages(session, "myqueue.1"); > > noMoreMessages(session); > > > connection.Close(); > } > catch (Exception e) > { > Console.WriteLine("Error: \n" + e.StackTrace); > } > } > > private static void publishMessages(IClientSession > session, string > routing_key) > { > > string read = ""; > do > { > IMessage message = new Message(); > read = Console.ReadLine(); > message.ClearData(); > message.AppendData(Encoding.UTF8.GetBytes(read)); > session.MessageTransfer("amq.topic", > routing_key, message); > } while (read != "End"); > } > > private static void noMoreMessages(IClientSession session) > { > IMessage message = new Message(); > // And send a syncrhonous final message to > indicate termination. > message.ClearData(); > message.AppendData(Encoding.UTF8.GetBytes("End")); > session.MessageTransfer("amq.topic", "control", message); > session.Sync(); > } > } > } > > > > /************************************************************* > ********************/ > //----------------------------------------------- > // LISTENER CODE > //----------------------------------------------- > using System; > using System.Configuration; > using System.IO; > using System.Text; > using System.Threading; > using org.apache.qpid.client; > using org.apache.qpid.transport; > > namespace Subscriber > { > > /// > /// This program is one of two programs designed to be used > /// together. These programs use the topic exchange. > /// > /// Publisher: > /// > /// Publishes to a broker, specifying a routing key. > /// > /// Listener (this program): > /// > /// Reads from a queue on the broker using a message listener. > /// > /// > class Listener > { > public static int _count = 1; > > private static void Main(string[] args) > { > string host = Settings1.Default.Host; > int port = Settings1.Default.Port; > string virtualhost = Settings1.Default.VirtualHost; > string username = Settings1.Default.Username; > string password = Settings1.Default.Password; > > Client connection = new Client(); > try > { > connection.Connect(host, port, virtualhost, > username, password); > IClientSession session = > connection.CreateSession(50000); > > //--------- Main body of program > -------------------------------------------- > > lock (session) > { > Console.WriteLine("Listening for messages ..."); > // Create a listener > prepareQueue("myqueue", "myqueue.#", session); > while (_count > 0) > { > Monitor.Wait(session); > } > } > > > //------------------------------------------------------------ > --------------- > > connection.Close(); > } > catch (Exception e) > { > Console.WriteLine("Error: \n" + e.StackTrace); > } > } > > private static void prepareQueue(string queue, string > routing_key, IClientSession session) > { > // Create a unique queue name for this consumer by > concatenating > // the queue name parameter with the Session ID. > Console.WriteLine("Declaring queue: " + queue); > session.QueueDeclare(queue, Option.EXCLUSIVE, > Option.AUTO_DELETE); > > // Route messages to the new queue if they match > the routing key. > // Also route any messages to with the "control" > routing key to > // this queue so we know when it's time to stop. A > publisher sends > // a message with the content "That's all, > Folks!", using the > // "control" routing key, when it is finished. > > session.ExchangeBind(queue, "amq.topic", routing_key); > session.ExchangeBind(queue, "amq.topic", "control"); > > // subscribe the listener to the queue > IMessageListener listener = new MessageListener(session); > session.AttachMessageListener(listener, queue); > session.MessageSubscribe(queue); > } > } > > public class MessageListener : IMessageListener > { > private readonly IClientSession _session; > private readonly RangeSet _range = new RangeSet(); > > public MessageListener(IClientSession session) > { > _session = session; > } > > public void MessageTransfer(IMessage m) > { > BinaryReader reader = new BinaryReader(m.Body, > Encoding.UTF8); > byte[] body = new byte[m.Body.Length - m.Body.Position]; > reader.Read(body, 0, body.Length); > ASCIIEncoding enc = new ASCIIEncoding(); > string message = enc.GetString(body); > > Console.WriteLine(DateTime.Now.ToString()+"\t-\tMessage:\t[" > + message + "]\tfrom: " + m.Destination); > // Add this message to the list of message to be > acknowledged > _range.Add(m.Id); > if (message.Equals("End")) > { > Console.WriteLine("Shutting down listener for > " + m.DeliveryProperties.GetRoutingKey()); > Listener._count--; > // Acknowledge all the received messages > _session.MessageAccept(_range); > lock (_session) > { > Monitor.Pulse(_session); > } > } > } > } > } > > > > > > > > > -- > View this message in context: > http://apache-qpid-users.2158936.n2.nabble.com/c-topic-publish > er-and-listener-sessions-dropped-tp6272182p6272182.html > Sent from the Apache Qpid users mailing list archive at Nabble.com. > > --------------------------------------------------------------------- > Apache Qpid - AMQP Messaging Implementation > Project: http://qpid.apache.org > Use/Interact: mailto:users-subscr...@qpid.apache.org > > --------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:users-subscr...@qpid.apache.org