On 07/15/2013 02:01 PM, Jimmy Jones wrote:
Hi,
I've got a system which can sometimes be a bit bursty, which would exhaust system memory
if the queues were left unchecked. Therefore I've been using ring queues, which solve the
problem quite nicely, apart from what happens to the "excess" messages. Ideally
I'd like to buffer them to disk and process them at a later, quieter time. I've been
digging around and can see a few options:
1) 0.24 will have flow to disk, which would be perfect but sometimes my
messages are quite big (e.g. 10MB) and this requires messages to be smaller than
a page. Is this limitation likely to be removed soon?
The old mechanism (removed in 0.20) was called 'flow to disk'. I prefer
to call the newer feature (to be released with 0.24) 'paging' or paged
queue.
Though it is true that the queue's page size must be as large as the
largest message, you can configure that page size. So you could have
just a few pages allowed in memory per queue, but have each page be 10MB
(the page size is configured as a multiple of the platform page size).
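
As a rough sketch of that sizing, the snippet below works out the smallest
page factor that holds a 10MB message and prints a queue address using it.
The argument names 'qpid.paging', 'qpid.max_pages_loaded' and
'qpid.page_factor' are my assumption of what the 0.24 broker accepts, the
queue name 'paged' and the helper page_factor are purely illustrative, and
the encoded message carries headers as well as the body, so some headroom
on top of the computed value is sensible.

```shell
# page_factor MAX_MESSAGE_BYTES PLATFORM_PAGE_BYTES
# Prints the smallest whole number of platform pages that holds the message
# (integer ceiling division). Hypothetical helper, not part of qpid.
page_factor() {
    echo $(( ($1 + $2 - 1) / $2 ))
}

max_message_bytes=$((10 * 1024 * 1024))   # largest expected message: 10MB
platform_page_bytes=$(getconf PAGESIZE)   # platform page size in bytes
factor=$(page_factor "$max_message_bytes" "$platform_page_bytes")

# Assumed argument names; check them against your broker build.
# Only a few pages are kept in memory, each big enough for one large message.
address="paged; {create: receiver, node: {type: queue, x-declare: {arguments: {'qpid.paging': True, 'qpid.max_pages_loaded': 4, 'qpid.page_factor': $factor}}}}"

# Print the address rather than connecting; pass it to drain to try it out.
echo "$address"
```

With four pages loaded, the queue would hold roughly four of the largest
messages in memory and keep the rest on disk.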
As to whether it is likely that the implementation gets updated to allow
a message to span multiple pages... I'd say probably not. To be able to
dispatch the message in parts without having the entire thing in memory
would require a fair bit of work. And without that I don't see a great
advantage over just having bigger pages. (Unless I'm missing something?)
2) 0.24 allows a "backup engine" to take over a loaded queue (QPID-4650), but
this looks like it'd require a fair bit of legwork to implement said engine.
3) alternate-exchanges. These look pretty good for my needs, but I can't seem to get them to
work! From reading some documentation, I thought they'd be good with a limit policy of reject -
MRG 2 Installation & Configuration guide, 4.8.2 says for an alternate exchange specified
for a queue: "Messages that are acquired and then rejected by a message consumer".
However if I run the test below, messages only get routed to the alternate exchange when the
queue is destroyed while containing messages, and not when messages are rejected because the
queue is full. Presumably calling Session::reject would cause it to go to the alternate
exchange, but should a limit policy of reject be the same?
The 'reject' policy is probably a little misleading given the other use
of reject. What a 'reject' policy actually does is raise an AMQP 0-10
exception when the limit is reached, which effectively ends the session.
Such messages are never routed to the alternate-exchange of the exchange
or the queue.
Having a client reject rather than accept a message is in fact entirely
different, despite the (confusing) similarity in name.
I have also just added a new policy that causes a queue to self destruct
when it reaches the preconfigured limit. That could possibly be of
interest in conjunction with an alternate-exchange. What would happen
would be that at the point the limit is reached, the queue will delete
itself, re-routing any orphaned messages to the alternate-exchange if
set. The deletion of the queue will result in any subscribing session
being terminated, but won't result in the publisher's session hitting an
exception. The issue there however is that messages published while the
queue doesn't exist (i.e. before the subscriber re-establishes the
session and recreates it) would be dropped (unless of course there were
then no matching bindings, in which case they would be rerouted to the
exchange's alternate-exchange).
I suspect, having spelled that all out, that it won't be a terribly
appealing path...
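
For anyone who wants to experiment with it anyway, here is a minimal sketch
of a queue declaration adapted from the 'normal' queue in the test below,
with the limit policy swapped. The policy name 'self-destruct' is an
assumption on my part and should be checked against the broker build in
use; the exchange and queue names are the ones from the test.

```shell
# Overflow exchange from the test; orphaned messages are rerouted here
# when the queue deletes itself.
overflow_exchange='test1-overflow'

# Assumption: the new policy is selected with 'qpid.policy_type': 'self-destruct'.
address="normal; {create: receiver, node: {type: queue, x-declare: {alternate-exchange: '$overflow_exchange', arguments: {'qpid.max_size': 1024, 'qpid.policy_type': 'self-destruct'}}, x-bindings: [{exchange: test1, arguments: {x-match: any, data-format: xyz}}]}}"

# Print the address; once both exchanges exist it can be passed to drain:
#   ./drain -f "$address"
echo "$address"
```

The subscriber would still need to notice its session ending and recreate
the queue, and anything published in that window is subject to the caveat
above.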
--8<--
qpid-config add exchange headers test1
qpid-config add exchange headers test1-overflow
# drain for messages in normal case
./drain -f "normal; { create: receiver, node: {type: queue, x-declare: {exclusive:
True, alternate-exchange: 'test1-overflow', arguments: {'qpid.max_size': 1024,
'qpid.policy_type': 'reject'}}, x-bindings: [{exchange: test1, arguments:{x-match:any,
data-format: xyz}}]}}"
# drain for messages in overflow case
./drain -f "overflow; { create: receiver, node: {type: queue, x-declare: {exclusive:
True, arguments: {'qpid.max_size': 1024000, 'qpid.policy_type': 'ring'}}, x-bindings:
[{exchange: test1-overflow, arguments:{x-match:any, data-format: xyz}}]}}"
./spout --content test -c 5 --property data-format=xyz test1
# works as expected, messages received by normal drain
./spout --content test -c 5 --property data-format=xyz test1-overflow
# works as expected, messages received by overflow drain
# Now ctrl-c normal drain, and queue will remain
# Send loads of messages, fills up the 'normal' queue
./spout --content test -c 500 --property data-format=xyz test1
# Blocks... and no messages sent to overflow drain
qpid-config del queue normal --force
# now messages appear in overflow drain
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]