The problem: I have 2 consumers on one queue, and each consumer only seems to
get half of the messages.

Here is some of my consumer code. The concept is simple: I browse the queue
and remove any messages that belong to my application, based on an id
contained in the message. Then I acknowledge the message, which I expected to
dequeue it (but it did not).

Later I tried ConsumerReceiveThread(), which just calls Receive. That does
dequeue the items, but it only gets half of the items sent.

In my initialization I start consuming the queue using the
AcknowledgementMode.IndividualAcknowledge session flag, which I had hoped
would allow me to remove only the items I was interested in.

Why would I get only half of the items? Also, would I get only 1/3 of the
items if I had 3 consumers? (I guess I should just try that.)

Any ideas? Thanks.

//////////// Create the queue
Uri connecturi = new Uri(ActiveMQURL.Text);
Apache.NMS.ActiveMQ.ConnectionFactory connect =
    new Apache.NMS.ActiveMQ.ConnectionFactory(connecturi);
connection = connect.CreateConnection();
connection.Start();
session = connection.CreateSession(AcknowledgementMode.IndividualAcknowledge);
destination = SessionUtil.GetDestination(session, QueueName.Text);
// Create a consumer
consumer = session.CreateConsumer(destination);

///////// Consumer thread.
public void ConsumerBrowserThread()
{
    String rec;
    // Marker to look for in the message text: id="<my id>"
    String idtext = "id=\"";
    idtext = idtext + f.ID.Text + "\"";
    StayAlive = true;
    while (StayAlive)
    {
        IQueue q = SessionUtil.GetQueue(f.session, f.QueueName.Text);
        // Browse the queue and look at each message in turn
        IQueueBrowser qb = f.session.CreateBrowser((IQueue)f.destination);
        System.Collections.IEnumerator me = qb.GetEnumerator();
        while (me.MoveNext())
        {
            IMessage msg = (IMessage)me.Current;
            if (msg == null)
                continue;
            ITextMessage message = (ITextMessage)msg;
            rec = message.Text;
            // Only handle messages that carry my application's id
            if (rec.Contains(idtext))
            {
                f.MessageRecieved.Text += rec;
                msg.Acknowledge();   // I expected this to dequeue the message
            }
        }
        qb.Close();
        // Wait a second before browsing the queue again
        Thread.Sleep(1000);
    }
}

public void ConsumerReceiveThread()
{
    String rec;
    // Marker to look for in the message text: id="<my id>"
    String idtext = "id=\"";
    idtext = idtext + f.ID.Text + "\"";
    StayAlive = true;
    while (StayAlive)
    {
        // Wait up to one second for the next message
        System.TimeSpan ts = new System.TimeSpan(0, 0, 1);
        IMessage msg = f.consumer.Receive(ts);
        if (msg != null)
        {
            ITextMessage message = (ITextMessage)msg;
            rec = message.Text;
            // Only acknowledge messages that carry my application's id
            if (rec.Contains(idtext))
            {
                f.MessageRecieved.Text += rec;
                msg.Acknowledge();
            }
        }
        Thread.Sleep(1000);
    }
}