Hi all :) I am working with the binary distribution of the ActiveMQ 4.0 broker and trying out the OpenWire C++ API for asynchronous messaging. [https://svn.apache.org/repos/asf/incubator/activemq/tags/activemq-4.0/openwire-cpp]


I wonder if I am resurrecting the mummies of issues already buried.


I am trying out a setup with 3 clients A, B and C that all listen on the same queue.
What I am trying to do is -
        A, B and C send requests on Q1 and consume Z's responses on Q2.
        Consumer Z listens on Q1 and responds on Q2.
        Like this -
        A/B/C  >>  Q1  >>  Z  >>  Q2  >>  A/B/C
Listening and responding with a single consumer works well at present. BUT the broker is mixing up the responses from Z when several consumers (two or all three) are running simultaneously: the response meant for one consumer (A) goes to one of the others (B or C), and the same happens for the other consumers.

With the prefetch size preset to 1000, the consumer that first establishes a session with the broker on a queue gets all the messages (and if it is terminated, the next one hogs them all, and so on). As I am currently testing by hand, setting the prefetch size lower (say 1) does not solve the problem either, because I cannot send requests quickly and frequently enough (man vs. machine). Moreover, since the hogging consumer reads and acknowledges every response straight away, even a prefetch size of 1 is never exceeded.

I also tried, with no success, the grid-processing approach (using Message Groups) suggested in
        http://activemq.org/site/how-do-i-preserve-order-of-messages.html
The relevant code is as follows -
        [........A/B/C....................................................
        producer = session->createProducer(Q1) ;
        producer->setPersistent(true) ;
        message = session->createBytesMessage() ;
        message->setJMSXGroupID("cheese") ;
        message->writeString("Hello World") ;
        producer->send(message);
.................................................................]

        [ .........Z ' s OnMessage(message)...............................
        p<string> NGid;
        NGid = message->getJMSXGroupID();
        producer = session->createProducer(Q2) ;
        producer->setPersistent(true) ;
        reqMessage = session->createBytesMessage() ;
        reqMessage->setJMSXGroupID(NGid->c_str() );
        reqMessage->writeString("R E C E I V E D") ;       //response string
        producer->send(reqMessage);
.................................................................]
Is there anything more needed in the code that I am missing?

I have come to understand that there are some unresolved issues concerning the initial size of the prefetch buffer. Please correct me if I am wrong. Will changing the prefetch buffer size help my cause? If not, please suggest another way. HELP ME..
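
To make the prefetch behaviour I am describing concrete, here is a toy model of it. It is NOT ActiveMQ code and uses no ActiveMQ API; ToyConsumer and simulateDispatch are names I made up for illustration. It assumes the broker always offers a message to the earliest-connected consumer that still has room in its prefetch window, which is what I seem to observe; the real dispatch policy may well differ.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Toy model only: hypothetical types, no ActiveMQ API involved.
struct ToyConsumer {
    std::size_t prefetch;  // maximum number of un-acknowledged messages
    std::size_t pending;   // delivered but not yet acknowledged
    std::size_t received;  // total messages delivered to this consumer
};

// Dispatch `messages` messages one at a time to `consumers` consumers.
// Assumption: each message goes to the first consumer (in connection
// order) whose pending count is below its prefetch limit. A message that
// no consumer has room for stays on the queue and is not counted.
// If `ackBeforeNext` is true, every consumer acknowledges all it holds
// before the next message arrives (slow sender, fast consumer).
std::vector<std::size_t> simulateDispatch(std::size_t consumers,
                                          std::size_t prefetch,
                                          std::size_t messages,
                                          bool ackBeforeNext)
{
    assert(prefetch > 0);
    std::vector<ToyConsumer> pool(consumers, ToyConsumer{prefetch, 0, 0});

    for (std::size_t m = 0; m < messages; ++m) {
        for (auto& c : pool) {
            if (c.pending < c.prefetch) {
                ++c.pending;
                ++c.received;
                break;  // message delivered, move on to the next one
            }
        }
        if (ackBeforeNext) {
            for (auto& c : pool) {
                c.pending = 0;  // everything held so far is acknowledged
            }
        }
    }

    std::vector<std::size_t> counts;
    for (const auto& c : pool) {
        counts.push_back(c.received);
    }
    return counts;  // messages received per consumer, in connection order
}
```

Under these assumptions, simulateDispatch(3, 1000, 10, false) gives every message to the first consumer, and so does simulateDispatch(3, 1, 10, true), because the first consumer has always acknowledged before the next message shows up. Only when messages arrive faster than they are acknowledged, as in simulateDispatch(3, 1, 3, false), does a prefetch of 1 spread them across the consumers.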

THANKS IN ADVANCE
Regards,
Navin
