Patrik Berglund wrote:
Hello,

I've just started to play with Qpid (M3) and the python client. The
company I work for is interested in using Qpid or Red Hat MRG Messaging
(instead of IBM MQ). We have read the AMQP 0-10 spec and the examples
and unit tests that are part of M3, but when writing or own tests we
haven't been able to figure out how to do a couple of things.

The test scenario is this, we just want to send a lot of messages
directly from application A to application B and never want to
experience any loss of messages. Our queue is currently declared like
this (python):

    session.queue_declare(queue="testQueue1",
                          durable=True,
                          auto_delete=False,
                          exclusive=True)
    session.exchange_bind(exchange="amq.direct",
                          queue="testQueue1",
                          binding_key="testQueue1")

Messages are transferred from application A like this:

    dprops = session.delivery_properties(routing_key="testQueue1")
    msg = qpid.datatypes.Message(dprops, "X" * 2**16)
    session.message_transfer(destination='amq.direct',
                             message=msg)

Both of our test applications are written in python and are running on
the same (multi-core) machine as the broker.

Now to my two newbie questions.

Q1. If messages are produced faster than the consumer can handle them,
    how do you configure flow control? Either some form of credit or
    something like a "rejected message" exception thrown when the
    producer transfer a message that overflows the queue, would be
    interesting for us.

M3, has few few things you can do, M4 has a few more...

http://cwiki.apache.org/qpid/cheat-sheet-for-configuring-queue-options.html

The reject and flow-to-disk options work in M3. The two ring options have been added for M4.

Basically you set the size policy and then a message and or byte limit when declaring the queue.


set "qpid.max_count" to value
set "qpid.max_size" to value
set "qpid.policy_type" to "reject" || "flow_to_disk" || "ring" || "ring_strict"


This can be done by setting the queue properties, of via qpid-config

qpid-config --help
Usage:  qpid-config [OPTIONS]
       qpid-config [OPTIONS] exchanges [filter-string]
       qpid-config [OPTIONS] queues    [filter-string]
qpid-config [OPTIONS] add exchange <type> <name> [AddExchangeOptions]
       qpid-config [OPTIONS] del exchange <name>
       qpid-config [OPTIONS] add queue <name> [AddQueueOptions]
       qpid-config [OPTIONS] del queue <name>
qpid-config [OPTIONS] bind <exchange-name> <queue-name> [binding-key] qpid-config [OPTIONS] unbind <exchange-name> <queue-name> [binding-key]

Options:
-b [ --bindings ] Show bindings in queue or exchange list
   -a [ --broker-addr ] Address (localhost)  Address of qpidd broker
broker-addr is in the form: [username/[EMAIL PROTECTED] hostname | ip-address [:<port>] ex: localhost, 10.1.1.7:10000, broker-host:10000, guest/[EMAIL PROTECTED]

Add Queue Options:
   --durable           Queue is durable
   --file-count N (8)  Number of files in queue's persistence journal
   --file-size  N (24) File size in pages (64Kib/page)
   --max-queue-size N  Maximum in-memory queue size as bytes
   --max-queue-count N Maximum in-memory queue size as a number of messages
--policy-type TYPE Action taken when queue limit is reached (reject, flow_to_disk, ring, ring_strict)

Add Exchange Options:
   --durable           Exchange is durable



Q2. What happen to messages sent to an exchange, that for some reason
    has not (yet) been declared and bound as above. In our example the
    messages just disappear.

    We would like to be able to setup (delivery-properties?) so that the
    message_transfer will throw an appropriate exception, and then the
    producer simply need to retry until the messages are not rejected.

    What is the correct way to achieve this?



The above can be done, but I think a safer pattern is to text the existence of the exchange before publishing. Then to make sure via message properties a binding exists when the message is transfered. This can be achieved I believe
using immediate flag on the deliever-properties of message transfer

so in your code above
- set arguments on queue declare with your size policy
- set immediate flag in the delivery properties on message transfer
- optionally test the exchange has been declared.

regards,
Carl.

Reply via email to