Author: gsim Date: Fri Jan 23 06:08:42 2009 New Revision: 737028 URL: http://svn.apache.org/viewvc?rev=737028&view=rev Log: QPID-1613: Ensure that the rule registered with the demuxer for LocalQueue subscriptions is removed when they are cancelled.
Modified: qpid/trunk/qpid/cpp/src/qpid/client/Subscription.h qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionImpl.cpp qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionImpl.h qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.cpp qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp Modified: qpid/trunk/qpid/cpp/src/qpid/client/Subscription.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/Subscription.h?rev=737028&r1=737027&r2=737028&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/client/Subscription.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/client/Subscription.h Fri Jan 23 06:08:42 2009 @@ -107,6 +107,8 @@ /** Grant the specified amount of byte credit */ void grantByteCredit(uint32_t); + + friend class SubscriptionManager; }; }} // namespace qpid::client Modified: qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionImpl.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionImpl.cpp?rev=737028&r1=737027&r2=737028&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionImpl.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionImpl.cpp Fri Jan 23 06:08:42 2009 @@ -145,5 +145,15 @@ } } +Demux::QueuePtr SubscriptionImpl::divert() +{ + demuxRule = std::auto_ptr<ScopedDivert>(new ScopedDivert(name, manager.getSession().getExecution().getDemux())); + return demuxRule->getQueue(); +} + +void SubscriptionImpl::cancelDiversion() { + demuxRule.reset(); +} + }} // namespace qpid::client Modified: qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionImpl.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionImpl.h?rev=737028&r1=737027&r2=737028&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionImpl.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionImpl.h Fri Jan 23 06:08:42 2009 @@ -25,10 +25,12 @@ #include "qpid/client/SubscriptionSettings.h" #include "qpid/client/Session.h" #include "qpid/client/MessageListener.h" +#include "qpid/client/Demux.h" #include "qpid/framing/enum.h" #include "qpid/framing/SequenceSet.h" #include "qpid/sys/Mutex.h" #include "qpid/RefCounted.h" +#include <memory> namespace qpid { namespace client { @@ -93,7 +95,17 @@ void grantCredit(framing::message::CreditUnit unit, uint32_t value); void received(Message&); - + + /** + * Set up demux diversion for messages sent to this subscription + */ + Demux::QueuePtr divert(); + /** + * Cancel any demux diversion that may have been setup for this + * subscription + */ + void cancelDiversion(); + private: mutable sys::Mutex lock; @@ -102,6 +114,7 @@ SubscriptionSettings settings; framing::SequenceSet unacquired, unaccepted; MessageListener* listener; + std::auto_ptr<ScopedDivert> demuxRule; }; }} // namespace qpid::client Modified: qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.cpp?rev=737028&r1=737027&r2=737028&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.cpp Fri Jan 23 06:08:42 2009 @@ -53,8 +53,8 @@ LocalQueue& lq, const std::string& q, const SubscriptionSettings& ss, const std::string& n) { std::string name=n.empty() ? q:n; - lq.queue=session.getExecution().getDemux().add(name, ByTransferDest(name)); boost::intrusive_ptr<SubscriptionImpl> si = new SubscriptionImpl(*this, q, ss, name, 0); + lq.queue=si->divert(); si->subscribe(); lq.subscription = Subscription(si.get()); return subscriptions[name] = lq.subscription; @@ -74,8 +74,14 @@ void SubscriptionManager::cancel(const std::string& dest) { - sync(session).messageCancel(dest); - dispatcher.cancel(dest); + std::map<std::string, Subscription>::iterator i = subscriptions.find(dest); + if (i != subscriptions.end()) { + sync(session).messageCancel(dest); + dispatcher.cancel(dest); + Subscription s = i->second; + if (s.isValid()) subscriptions[dest].impl->cancelDiversion(); + subscriptions.erase(dest); + } } void SubscriptionManager::setAutoStop(bool set) { autoStop=set; } Modified: qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp?rev=737028&r1=737027&r2=737028&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp (original) +++ qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp Fri Jan 23 06:08:42 2009 @@ -468,6 +468,25 @@ BOOST_CHECK(!fix.subs.get(got, "queue-2")); } +QPID_AUTO_TEST_CASE(testResubscribeWithLocalQueue) { + ClientSessionFixture fix; + fix.session.queueDeclare(arg::queue="some-queue", arg::exclusive=true, arg::autoDelete=true); + LocalQueue p, q; + fix.subs.subscribe(p, "some-queue"); + fix.subs.cancel("some-queue"); + fix.subs.subscribe(q, "some-queue"); + + fix.session.messageTransfer(arg::content=Message("some-data", "some-queue")); + fix.session.messageFlush(arg::destination="some-queue"); + + Message got; + BOOST_CHECK(!p.get(got)); + + BOOST_CHECK(q.get(got)); + BOOST_CHECK_EQUAL("some-data", got.getData()); + BOOST_CHECK(!q.get(got)); +} + QPID_AUTO_TEST_SUITE_END() --------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:commits-subscr...@qpid.apache.org