Thanks, Gordon. The issue was exactly what you said: I had set no_ack=True. I pulled that code from one of the Python unit tests before learning what the various flags do.
-Brent

Rajith Attapattu wrote:
Brent,

First up thanks for giving qpid a spin.
I noticed that you haven't signed up for the mailing list.
Emails from unsubscribed addresses sit in a queue until someone explicitly approves them. It would be best if you sign up, as that will get your emails a speedier response and also let you tune into other qpid user traffic.

Here are the details on how to sign up.
http://cwiki.apache.org/qpid/mailing-lists.html

Regards,

Rajith Attapattu
Red Hat
blog: http://rajith.2rlabs.com/

On Tue, Mar 4, 2008 at 3:56 AM, Gordon Sim <[EMAIL PROTECTED]> wrote:

    Brent Villalobos wrote:
    > This is my first time using QPID and I'm trying to write a simple
    > producer/consumer setup.

    Great; thanks for giving it a try! We'll be grateful for any feedback
    on what we can improve. Please feel free to keep asking questions and
    we'll do our best to get the answers to you.

    > What am I doing wrong that all the messages in my queue are wiped out
    > after I consume one message?

    The issue is that the broker sends the client as many messages as are
    available and, because acknowledgements are turned off (no_ack=True),
    it assumes that delivery is always successful and so dequeues those
    messages immediately.
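
To make the effect concrete, here is a toy in-memory model of that behaviour. It is only a sketch and not Qpid code: the ToyQueue class and its method names are invented for illustration, and it assumes the broker pushes everything available because no prefetch limit is set.

```python
from collections import deque


class ToyQueue:
    """Toy in-memory stand-in for a broker queue; not the Qpid API."""

    def __init__(self, messages):
        self.ready = deque(messages)  # messages waiting on the queue
        self.unacked = []             # delivered but not yet acknowledged

    def push_to_consumer(self, no_ack):
        """Deliver everything available, as with no prefetch limit."""
        delivered = list(self.ready)
        self.ready.clear()
        if not no_ack:
            # With acking on, the broker keeps deliveries until they are acked.
            self.unacked.extend(delivered)
        return delivered

    def depth(self):
        """Messages the broker still holds (ready plus unacknowledged)."""
        return len(self.ready) + len(self.unacked)


no_ack_queue = ToyQueue(["m1", "m2", "m3"])
no_ack_queue.push_to_consumer(no_ack=True)

acked_queue = ToyQueue(["m1", "m2", "m3"])
acked_queue.push_to_consumer(no_ack=False)

print(no_ack_queue.depth(), acked_queue.depth())  # prints: 0 3
```

With no_ack=True the toy queue is empty as soon as the messages are pushed, even if the consumer only processes one of them; with acking on, the broker still holds all three.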

    The no_ack mode is really intended for cases where every client has
    their own temporary queue bound to an exchange (i.e. a transient
    pub sub model).

    You probably need to turn acking on in your case so that messages are
    only dequeued when the application acknowledges their receipt:

    reply = channel.basic_consume(queue="message_queue", no_ack=False)

    or even just:

    reply = channel.basic_consume(queue="message_queue")

    as no_ack is False by default.

    You then need to ack every message you receive:

    channel.basic_ack(delivery_tag=msg.delivery_tag)

    or to acknowledge all messages up to a particular message:

    channel.basic_ack(delivery_tag=msg.delivery_tag, multiple=True)

    This will also make your application tolerant of client failures.
    Messages not yet acked will be redelivered once the client reconnects
    and consumes from the queue again.
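
The acknowledgement and redelivery rules described above can be sketched with a small toy model. This is an illustration under stated assumptions, not the Qpid client or broker: ToyChannel, deliver_all and client_disconnected are hypothetical names, and delivery tags are assumed to be increasing integers.

```python
class ToyChannel:
    """Toy model of broker-side ack bookkeeping; not the Qpid API."""

    def __init__(self, messages):
        self.ready = list(messages)  # waiting on the queue
        self.unacked = {}            # delivery_tag -> message
        self.next_tag = 1

    def deliver_all(self):
        """Send every ready message, tagging each delivery."""
        tags = []
        while self.ready:
            self.unacked[self.next_tag] = self.ready.pop(0)
            tags.append(self.next_tag)
            self.next_tag += 1
        return tags

    def ack(self, delivery_tag, multiple=False):
        """Dequeue one message, or all up to and including delivery_tag."""
        if multiple:
            for tag in [t for t in self.unacked if t <= delivery_tag]:
                del self.unacked[tag]
        else:
            self.unacked.pop(delivery_tag, None)

    def client_disconnected(self):
        """Return unacked messages to the queue so they can be redelivered."""
        requeued = [self.unacked[t] for t in sorted(self.unacked)]
        self.ready = requeued + self.ready
        self.unacked.clear()


toy = ToyChannel(["m1", "m2", "m3", "m4"])
tags = toy.deliver_all()          # deliveries tagged 1..4
toy.ack(tags[1], multiple=True)   # acknowledges m1 and m2 in one call
toy.client_disconnected()         # client fails before acking the rest
print(toy.ready)                  # prints: ['m3', 'm4']
```

Only the two messages that were never acknowledged go back on the queue, which is the failure tolerance that acking buys.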

    You can also control the prefetching by sending basic_qos commands to
    the broker. This only has an effect when acking is turned on though,
    e.g.

    channel.basic_qos(prefetch_count=1)  # or whatever value seems appropriate

    If you set it to 1 you have to acknowledge every message before
    another one is sent. This minimises redelivery in the event of a
    failure but will also slow things down.
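
The trade-off can be illustrated with a rough simulation. It is a simplification, not Qpid code: deliver_with_prefetch is a hypothetical helper, and it assumes the consumer acknowledges a whole window before the broker sends the next one.

```python
def deliver_with_prefetch(messages, prefetch_count):
    """Toy simulation of a prefetch window; not the Qpid API.

    Returns the batches the broker would send, where each batch is the set
    of messages outstanding (unacknowledged) at the same time.
    """
    batches = []
    pending = list(messages)
    while pending:
        window = pending[:prefetch_count]  # at most this many unacked at once
        pending = pending[prefetch_count:]
        batches.append(window)             # consumer acks the window, then more is sent
    return batches


messages = ["m1", "m2", "m3", "m4"]
one_at_a_time = deliver_with_prefetch(messages, 1)
two_at_a_time = deliver_with_prefetch(messages, 2)

# Number of broker/consumer round trips for each setting.
print(len(one_at_a_time), len(two_at_a_time))  # prints: 4 2
```

A smaller window means fewer messages are exposed to redelivery if the client fails, at the cost of more round trips between broker and consumer.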

    A final option with 0-8 is to use basic_get instead of basic_consume
    (though in my view it's a little more cumbersome). E.g.

    reply = channel.basic_get(no_ack=False)
    if (reply.method.name == "get_ok"):
       print "Consumer gets this message: ", reply.content.body
       channel.basic_ack(delivery_tag=reply.delivery_tag)
    else:
       print "Queue was empty"


    fyi: The 0-10 version of the protocol, which the AMQP WG has recently
    voted through in its final form, has a more powerful flow control
    mechanism and unifies the functionality of get and consume. Support
    for that is currently in progress on trunk (C++ broker and C++, Python
    and Java clients will be the first available, with the Java broker
    following shortly after).

