On 07/15/2013 07:05 PM, Fraser Adams wrote:
I'd have quite liked the option to be able to trigger message delivery to the alternate exchange when being automatically removed from a circular queue
That would be a fairly easy change (see attached patch if interested). On 07/15/2013 08:21 PM, Jimmy Jones wrote:
I don't think i'll now need it, but is there interest in a limit policy of sending to an alternate exchange?
This is similar in some ways with the functionality above, except that (if I understand it correctly) you would want the newly arriving messages to be rerouted, rather than the oldest (or lowest priority) messages(?). Not too difficult to implement either, I don't think.
Index: cpp/src/qpid/broker/LossyQueue.cpp =================================================================== --- cpp/src/qpid/broker/LossyQueue.cpp (revision 1503661) +++ cpp/src/qpid/broker/LossyQueue.cpp (working copy) @@ -52,7 +52,7 @@ QPID_LOG(debug, "purging " << name << ": current depth is [" << current << "], max depth is [" << settings.maxDepth << "], new message has size " << increment.getSize()); qpid::sys::Mutex::ScopedUnlock u(messageLock); //TODO: arguably we should try and purge expired messages first but that is potentially expensive - if (remove(1, settings.priorities ? boost::bind(&isLowerPriorityThan, message.getPriority(), _1) : MessagePredicate(), MessageFunctor(), PURGE, false)) { + if (remove(1, settings.priorities ? boost::bind(&isLowerPriorityThan, message.getPriority(), _1) : MessagePredicate(), boost::bind(&reroute, alternateExchange, _1), PURGE, false)) { if (mgmtObject) { mgmtObject->inc_discardsRing(1); if (brokerMgmtObject) Index: cpp/src/qpid/broker/Queue.cpp =================================================================== --- cpp/src/qpid/broker/Queue.cpp (revision 1503661) +++ cpp/src/qpid/broker/Queue.cpp (working copy) @@ -677,17 +677,6 @@ return new MessageFilter(); } - bool reroute(boost::shared_ptr<Exchange> e, const Message& m) - { - if (e) { - DeliverableMessage d(m, 0); - d.getMessage().clearTrace(); - e->routeWithAlternate(d); - return true; - } else { - return false; - } - } void moveTo(boost::shared_ptr<Queue> q, Message& m) { if (q) { @@ -1684,6 +1673,19 @@ mgmtObject->set_redirectSource(isSrc); } } + +bool Queue::reroute(boost::shared_ptr<Exchange> e, const Message& m) +{ + if (e) { + DeliverableMessage d(m, 0); + d.getMessage().clearTrace(); + e->routeWithAlternate(d); + return true; + } else { + return false; + } +} + Queue::QueueUsers::QueueUsers() : consumers(0), browsers(0), others(0), controller(false) {} void Queue::QueueUsers::addConsumer() { ++consumers; } void Queue::QueueUsers::addBrowser() { ++browsers; } Index: cpp/src/qpid/broker/Queue.h =================================================================== --- cpp/src/qpid/broker/Queue.h (revision 1503661) +++ cpp/src/qpid/broker/Queue.h (working copy) @@ -501,6 +501,9 @@ QPID_BROKER_EXTERN bool isRedirectSource() const { return redirectSource; } QPID_BROKER_EXTERN void setMgmtRedirectState( std::string peer, bool enabled, bool isSrc ); + //utility function + static bool reroute(boost::shared_ptr<Exchange> e, const Message& m); + friend class QueueFactory; }; }
--------------------------------------------------------------------- To unsubscribe, e-mail: users-unsubscr...@qpid.apache.org For additional commands, e-mail: users-h...@qpid.apache.org