Thanks Gordon. The issue was exactly what you said - I had set
no_ack=True. I pulled that code from one of the python unit tests
before learning about the various flags and what they do.
-Brent
Rajith Attapattu wrote:
Brent,
First up thanks for giving qpid a spin.
I noticed that you haven't signed up for the mailing list.
Unsubscribed emails will sit in a queue until someone explicitly
approves it.
It would be best if you can sign up as it will allow your emails to
get a speedy response and also tune into other qpid user related traffic.
Here are the details on how to sign up.
http://cwiki.apache.org/qpid/mailing-lists.html
Regards,
Rajith Attapattu
Red Hat
blog: http://rajith.2rlabs.com/
On Tue, Mar 4, 2008 at 3:56 AM, Gordon Sim <[EMAIL PROTECTED]
<mailto:[EMAIL PROTECTED]>> wrote:
Brent Villalobos wrote:
> This is my first time using QPID and I'm trying to write a simple
> producer/consumer setup.
Great; thanks for giving it a try! We'll be grateful for any
feedback on
what we can improve. Please feel free to keep asking questions and
we'll
do our best to get the answers to you.
> What am I doing wrong that all the messages in my queue are
wiped out
> after I consume one message?
The issue is that the broker is sending the client as many messages as
available and because acknowledgements are turned off
(no_ack=True), it
assumes that delivery is always successful so dequeues those messages
immediately.
The no_ack mode is really intended for cases where every client has
their own temporary queue bound to an exchange (i.e. a transient
pub sub
model).
You probably need to turn acking on in your case so that messages are
only dequeued when the application acknowledges their receipt:
reply = channel.basic_consume(queue="message_queue", no_ack=False)
or even just:
reply = channel.basic_consume(queue="message_queue")
as no_ack is False by default.
You then need to ack every message you receive:
channel.basic_ack(delivery_tag=msg.delivery_tag)
or to acknowledge all messages up to a particular message:
channel.basic_ack(delivery_tag=msg.delivery_tag, multiple=True)
This will also make your application tolerant of client failures.
Messages not yet acked will be redelivered once the client reconnects
and consumes from the queue again.
You can also control the prefetching by sending basic_qos commands to
the broker. This only has an effect when acking is turned on
though. e.g.
channel.basic_qos(prefetch_count=1)# or whatever value seems
appropriate
If you set it to 1 you have to acknowledge every message before
another
one is sent. This minimises redelivery in the event of a failure but
will also slow things down.
A final option with 0-8 is to use basic_get instead of basic_consume
(though in my view its a little more cumbersome). E.g.
reply = channel.basic_get(no_ack=False)
if (reply.method.name <http://reply.method.name> == "get_ok"):
print "Consumer gets this message: ", msg.content.body
channel.basic_ack(delivery_tag=reply.delivery_tag)
else:
print "Queue was empty"
fyi: The 0-10 version of the protocol which the AMQP WG has recently
voted through in its final form has a more powerful flow control
mechanism and unifies the functionality of get and consume.
Support for
that is currently in progress on trunk (c++ broker and c++, python &
java clients will be the first available, java broker following
shortly
after).