Patrik Berglund wrote:
Hello,
I've just started to play with Qpid (M3) and the python client. The
company I work for is interested in using Qpid or Red Hat MRG Messaging
(instead of IBM MQ). We have read the AMQP 0-10 spec and the examples
and unit tests that are part of M3, but when writing our own tests we
haven't been able to figure out how to do a couple of things.
The test scenario is this: we just want to send a lot of messages
directly from application A to application B and never want to
experience any loss of messages. Our queue is currently declared like
this (python):
    session.queue_declare(queue="testQueue1",
                          durable=True,
                          auto_delete=False,
                          exclusive=True)
    session.exchange_bind(exchange="amq.direct",
                          queue="testQueue1",
                          binding_key="testQueue1")
Messages are transferred from application A like this:
    dprops = session.delivery_properties(routing_key="testQueue1")
    msg = qpid.datatypes.Message(dprops, "X" * 2**16)
    session.message_transfer(destination='amq.direct',
                             message=msg)
Both of our test applications are written in python and are running on
the same (multi-core) machine as the broker.
Now to my two newbie questions.
Q1. If messages are produced faster than the consumer can handle them,
how do you configure flow control? Either some form of credit or
something like a "rejected message" exception thrown when the
producer transfers a message that overflows the queue, would be
interesting for us.
M3 has a few things you can do; M4 has a few more:
http://cwiki.apache.org/qpid/cheat-sheet-for-configuring-queue-options.html
The reject and flow-to-disk options work in M3. The two ring options
have been added for M4.
Basically you set the size policy and then a message and/or byte limit
when declaring the queue:
    set "qpid.max_count" to value
    set "qpid.max_size" to value
    set "qpid.policy_type" to "reject" || "flow_to_disk" || "ring" ||
        "ring_strict"
This can be done by setting the queue properties, or via qpid-config:
    qpid-config --help
    Usage:  qpid-config [OPTIONS]
            qpid-config [OPTIONS] exchanges [filter-string]
            qpid-config [OPTIONS] queues    [filter-string]
            qpid-config [OPTIONS] add exchange <type> <name> [AddExchangeOptions]
            qpid-config [OPTIONS] del exchange <name>
            qpid-config [OPTIONS] add queue <name> [AddQueueOptions]
            qpid-config [OPTIONS] del queue <name>
            qpid-config [OPTIONS] bind   <exchange-name> <queue-name> [binding-key]
            qpid-config [OPTIONS] unbind <exchange-name> <queue-name> [binding-key]

    Options:
        -b [ --bindings ]                         Show bindings in queue or exchange list
        -a [ --broker-addr ] Address (localhost)  Address of qpidd broker
             broker-addr is in the form: [username/[EMAIL PROTECTED] hostname | ip-address [:<port>]
             ex: localhost, 10.1.1.7:10000, broker-host:10000, guest/[EMAIL PROTECTED]

    Add Queue Options:
        --durable            Queue is durable
        --file-count N (8)   Number of files in queue's persistence journal
        --file-size  N (24)  File size in pages (64Kib/page)
        --max-queue-size N   Maximum in-memory queue size as bytes
        --max-queue-count N  Maximum in-memory queue size as a number of messages
        --policy-type TYPE   Action taken when queue limit is reached
                             (reject, flow_to_disk, ring, ring_strict)

    Add Exchange Options:
        --durable            Exchange is durable
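The queue-property route described above could look roughly like the sketch below. It assumes that `session` is an open AMQP 0-10 session from the Qpid Python client and that `queue_declare` accepts the spec's `arguments` map; the helper names and the limit values are illustrative only, not part of the client.

```python
# Sketch only: `session` is assumed to be an open AMQP 0-10 session from the
# Qpid Python client. The helper names below are hypothetical.

# "ring" and "ring_strict" are only available from M4 onwards.
VALID_POLICIES = ("reject", "flow_to_disk", "ring", "ring_strict")


def queue_limit_arguments(policy, max_count=None, max_size=None):
    """Build the queue arguments map for a size policy and optional limits."""
    if policy not in VALID_POLICIES:
        raise ValueError("unknown policy type: %r" % (policy,))
    args = {"qpid.policy_type": policy}
    if max_count is not None:
        args["qpid.max_count"] = max_count   # limit as a number of messages
    if max_size is not None:
        args["qpid.max_size"] = max_size     # limit in bytes
    return args


def declare_limited_queue(session, name, policy, max_count=None, max_size=None):
    """Declare the queue as in the original post, plus the limit arguments."""
    session.queue_declare(queue=name,
                          durable=True,
                          auto_delete=False,
                          exclusive=True,
                          arguments=queue_limit_arguments(policy,
                                                          max_count,
                                                          max_size))
    session.exchange_bind(exchange="amq.direct",
                          queue=name,
                          binding_key=name)
```

For example, `declare_limited_queue(session, "testQueue1", "reject", max_count=10000)` would ask for a queue that rejects messages beyond 10000. Going by the help text above, the command-line equivalent would presumably be:

    qpid-config add queue testQueue1 --durable --max-queue-count 10000 --policy-type reject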
Q2. What happens to messages sent to an exchange that for some reason
has not (yet) been declared and bound as above? In our example the
messages just disappear.
We would like to be able to set up (delivery-properties?) so that the
message_transfer will throw an appropriate exception, and then the
producer simply needs to retry until the messages are not rejected.
What is the correct way to achieve this?
The above can be done, but I think a safer pattern is to test the
existence of the exchange before publishing, and then to make sure via
message properties that a binding exists when the message is
transferred. I believe this can be achieved using the immediate flag on
the delivery-properties of the message transfer.
So in your code above:
- set arguments on queue declare with your size policy
- set the immediate flag in the delivery properties on message transfer
- optionally test that the exchange has been declared.
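The producer-side steps in the list above might be sketched as follows. This assumes that `session.exchange_query` returns a result with a `not_found` field (as in the AMQP 0-10 exchange.query command), that `delivery_properties` accepts the spec's `immediate` field, and that the broker honours that flag. The function names, retry count, and delay are hypothetical.

```python
import time


def exchange_exists(session, exchange):
    """Ask the broker whether the exchange has been declared."""
    # Assumption: the query result exposes `not_found`, per AMQP 0-10.
    return not session.exchange_query(name=exchange).not_found


def wait_for_exchange(session, exchange, attempts=10, delay=0.5):
    """Poll until the exchange exists; return False if it never appears."""
    for attempt in range(attempts):
        if exchange_exists(session, exchange):
            return True
        if attempt < attempts - 1:
            time.sleep(delay)   # back off before asking again
    return False


def send_immediate(session, make_message, exchange, routing_key, body):
    """Publish one message with the immediate flag set.

    `make_message` is the message constructor, e.g. qpid.datatypes.Message.
    """
    if not wait_for_exchange(session, exchange):
        raise LookupError("exchange %r is not declared" % (exchange,))
    dprops = session.delivery_properties(routing_key=routing_key,
                                         immediate=True)
    session.message_transfer(destination=exchange,
                             message=make_message(dprops, body))
```

With the names from the original post, a call would look like `send_immediate(session, qpid.datatypes.Message, "amq.direct", "testQueue1", "X" * 2**16)`. Passing the message constructor in keeps the sketch independent of the client import.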
regards,
Carl.