Using ActiveMQ advisory topics through NMS (the .NET API for ActiveMQ) in a C# application
http://activemq.apache.org/ This document explains how to get the list of consumers and producers attached to ActiveMQ using NMS (the .NET API for connectivity). Unlike JMS, NMS doesn't provide this information directly, so I found a workaround. If the system knows which consumers have died and which are still active, it can avoid sending messages unnecessarily to dead consumers. First of all, let me describe the application in which I needed this. It uses a request/response architecture in which a client sends requests to the server through ActiveMQ queues, and the server keeps each request in a list so that it can send the responses back to those queues. If a client goes down, however, the server component doesn't know that the consumer is dead and keeps trying to send responses to it; ActiveMQ then accumulates messages that have no consumer, which can sometimes halt ActiveMQ when its buffers fill up. To solve this, I changed the server component so that it becomes a consumer of the advisory topics (http://activemq.apache.org/advisory-message.html). Whenever a consumer attaches to ActiveMQ, the server component is notified with a "ConsumerInfo" message, and whenever a consumer dies it is notified with a "RemoveInfo" message.
// Below is a code snippet showing how I implemented this functionality:

/// <summary>
/// Tracks the consumers attached to ActiveMQ queues by listening on the
/// broker's consumer advisory topic. A <c>ConsumerInfo</c> advisory adds an
/// entry to <see cref="ListofActiveConsumers"/>; a <c>RemoveInfo</c> advisory
/// removes it and raises <see cref="RemovedConsumer"/>.
/// </summary>
public class AdvisorySupport : IAdvisorySupport, IDisposable
{
    // Factory used to open the broker connection; assigned in CreateNMSFactory().
    private IConnectionFactory connectionFactory;
    private IConnection _connection;
    private ISession session;

    /// <summary>
    /// Raised with the removed object's id (as a string) whenever a
    /// <c>RemoveInfo</c> advisory is received.
    /// </summary>
    public event RemovedConsumersHandler RemovedConsumer;

    // Keyed by ConsumerId.ToString(); the value is the advisory payload that announced the consumer.
    private Dictionary<string, ConsumerInfo> _listofConsumers = new Dictionary<string, ConsumerInfo>();

    /// <summary>
    /// Consumers currently known to be attached, keyed by consumer id.
    /// </summary>
    public Dictionary<string, ConsumerInfo> ListofActiveConsumers
    {
        get { return _listofConsumers; }
        set { _listofConsumers = value; }
    }

    /// <summary>
    /// Creates the connection factory, opens and starts the connection, and
    /// subscribes to the consumer advisory topic.
    /// </summary>
    public AdvisorySupport()
    {
        CreateNMSFactory();
        CreateConnection();
    }

    /// <summary>Creates the NMS connection factory.</summary>
    /// <returns><c>true</c> when the factory was created.</returns>
    private bool CreateNMSFactory()
    {
        // NOTE(review): URI is not defined in this snippet; it is expected to be
        // the broker URI supplied by the surrounding code. Confirm before use.
        connectionFactory = new ConnectionFactory(URI);
        return (null != connectionFactory);
    }

    /// <summary>
    /// Opens and starts the broker connection, then subscribes to the advisory topics.
    /// </summary>
    /// <returns>Always <c>true</c>.</returns>
    private bool CreateConnection()
    {
        // The original snippet had ':' here instead of ';', which does not compile.
        _connection = connectionFactory.CreateConnection();
        _connection.Start();
        SubscribeAdvisoryTopics();
        return true;
    }

    /// <summary>Creates the session on first use and reuses it afterwards.</summary>
    /// <returns><c>true</c> when a session is available.</returns>
    internal bool CreateSession()
    {
        if (session == null)
        {
            session = _connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
        }

        return session != null;
    }

    /// <summary>
    /// Subscribes to the advisory topic that reports consumers starting and
    /// stopping on any queue.
    /// </summary>
    public void SubscribeAdvisoryTopics()
    {
        // Other advisory topics that could be subscribed to in the same way
        // (left disabled in the original snippet):
        //   "ActiveMQ.Advisory.Queue"                  - queue create & destroy
        //   "ActiveMQ.Advisory.Expired.Queue"          - expired messages on a queue
        //                                                (String='orignalMessageId' - the expired id)
        //   "ActiveMQ.Advisory.NoConsumer.Queue"       - no consumer is available to process
        //                                                messages being sent on a queue
        //   "ActiveMQ.Advisory.MessageConsumed.Queue"  - message consumed by a client
        //                                                (String='orignalMessageId' - the delivered id)
        //   "ActiveMQ.Advisory.Consumer..>"            - consumer start & stop messages
        //                                                (String='consumerCount' - the number of
        //                                                consumers; payload is ConsumerInfo)
        if (CreateSession())
        {
            Subscribe("ActiveMQ.Advisory.Consumer.Queue.>");
        }
    }

    /// <summary>
    /// Creates a consumer on the given topic and routes its messages to
    /// <see cref="consumerForListenQueue_Listener"/>.
    /// </summary>
    /// <param name="topicname">Name of the advisory topic to listen on.</param>
    private void Subscribe(string topicname)
    {
        ITopic topicForListenQueue = session.GetTopic(topicname);
        IMessageConsumer consumerForListenQueue = session.CreateConsumer(topicForListenQueue);
        consumerForListenQueue.Listener += new MessageListener(consumerForListenQueue_Listener);
    }

    /// <summary>
    /// Handles one advisory message: records a newly attached consumer, or
    /// forgets a removed one and raises <see cref="RemovedConsumer"/>.
    /// Advisories carrying any other payload are ignored.
    /// </summary>
    /// <param name="message">The advisory message delivered by the consumer.</param>
    void consumerForListenQueue_Listener(IMessage message)
    {
        if (message == null)
        {
            throw new Connamara.ActiveMQPublisher.ActiveMQPublisherException("got null msg");
        }

        ActiveMQMessage aMsg = (ActiveMQMessage)message;

        // Read the payload once instead of repeating the cast chain for every check.
        object payload = ((Apache.NMS.ActiveMQ.Commands.Message)(aMsg)).DataStructure;

        ConsumerInfo attached = payload as ConsumerInfo;
        if (attached != null)
        {
            string consumerId = attached.ConsumerId.ToString();

            // Keep the first ConsumerInfo seen for an id; later duplicates do not overwrite it.
            if (!_listofConsumers.ContainsKey(consumerId))
            {
                _listofConsumers[consumerId] = attached;
            }

            return;
        }

        RemoveInfo removed = payload as RemoveInfo;
        if (removed != null)
        {
            string removedId = removed.ObjectId.ToString();

            // Remove is a no-op for unknown keys, so no ContainsKey check is needed.
            _listofConsumers.Remove(removedId);

            // Copy the delegate so a concurrent unsubscribe cannot null it between check and call.
            RemovedConsumersHandler handler = RemovedConsumer;
            if (handler != null)
            {
                handler(removedId);
            }
        }
    }

    #region IDisposable Members

    /// <summary>
    /// Closes the session and the connection opened by this instance.
    /// Calling it more than once is harmless.
    /// </summary>
    public void Dispose()
    {
        try
        {
            if (session != null)
            {
                session.Close();
                session = null;
            }
        }
        finally
        {
            // The original closed only the session and leaked the started connection.
            if (_connection != null)
            {
                _connection.Close();
                _connection = null;
            }
        }
    }

    #endregion
}

// --
// View this message in context: http://www.nabble.com/Get-Cosumers-of-queues-in-ActiveMQ-using-NMS-tp22903480p22903480.html
// Sent from the ActiveMQ - User mailing list archive at Nabble.com.