Author: gsim
Date: Fri Jan 23 06:08:42 2009
New Revision: 737028

URL: http://svn.apache.org/viewvc?rev=737028&view=rev
Log:
QPID-1613: Ensure that the rule registered with the demuxer for LocalQueue 
subscriptions is removed when they are cancelled.


Modified:
    qpid/trunk/qpid/cpp/src/qpid/client/Subscription.h
    qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionImpl.cpp
    qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionImpl.h
    qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.cpp
    qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp

Modified: qpid/trunk/qpid/cpp/src/qpid/client/Subscription.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/Subscription.h?rev=737028&r1=737027&r2=737028&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/Subscription.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/Subscription.h Fri Jan 23 06:08:42 2009
@@ -107,6 +107,8 @@
 
     /** Grant the specified amount of byte credit */
     void grantByteCredit(uint32_t);
+
+  friend class SubscriptionManager;
 };
 }} // namespace qpid::client
 

Modified: qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionImpl.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionImpl.cpp?rev=737028&r1=737027&r2=737028&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionImpl.cpp Fri Jan 23 06:08:42 2009
@@ -145,5 +145,15 @@
     }
 }
 
+Demux::QueuePtr SubscriptionImpl::divert()
+{
+    demuxRule = std::auto_ptr<ScopedDivert>(new ScopedDivert(name, manager.getSession().getExecution().getDemux()));
+    return demuxRule->getQueue();
+}
+
+void SubscriptionImpl::cancelDiversion() { 
+    demuxRule.reset();
+}
+
 }} // namespace qpid::client
 

Modified: qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionImpl.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionImpl.h?rev=737028&r1=737027&r2=737028&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionImpl.h Fri Jan 23 06:08:42 2009
@@ -25,10 +25,12 @@
 #include "qpid/client/SubscriptionSettings.h"
 #include "qpid/client/Session.h"
 #include "qpid/client/MessageListener.h"
+#include "qpid/client/Demux.h"
 #include "qpid/framing/enum.h"
 #include "qpid/framing/SequenceSet.h"
 #include "qpid/sys/Mutex.h"
 #include "qpid/RefCounted.h"
+#include <memory>
 
 namespace qpid {
 namespace client {
@@ -93,7 +95,17 @@
     void grantCredit(framing::message::CreditUnit unit, uint32_t value);
 
     void received(Message&);
-    
+
+    /**
+     * Set up demux diversion for messages sent to this subscription
+     */
+    Demux::QueuePtr divert();
+    /**
+     * Cancel any demux diversion that may have been setup for this
+     * subscription
+     */
+    void cancelDiversion();
+
   private:
 
     mutable sys::Mutex lock;
@@ -102,6 +114,7 @@
     SubscriptionSettings settings;
     framing::SequenceSet unacquired, unaccepted;
     MessageListener* listener;
+    std::auto_ptr<ScopedDivert> demuxRule;
 };
 
 }} // namespace qpid::client

Modified: qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.cpp?rev=737028&r1=737027&r2=737028&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/SubscriptionManager.cpp Fri Jan 23 06:08:42 2009
@@ -53,8 +53,8 @@
     LocalQueue& lq, const std::string& q, const SubscriptionSettings& ss, const std::string& n)
 {
     std::string name=n.empty() ? q:n;
-    lq.queue=session.getExecution().getDemux().add(name, ByTransferDest(name));
     boost::intrusive_ptr<SubscriptionImpl> si = new SubscriptionImpl(*this, q, ss, name, 0);
+    lq.queue=si->divert();
     si->subscribe();
     lq.subscription = Subscription(si.get());
     return subscriptions[name] = lq.subscription;
@@ -74,8 +74,14 @@
 
 void SubscriptionManager::cancel(const std::string& dest)
 {
-    sync(session).messageCancel(dest);
-    dispatcher.cancel(dest);
+    std::map<std::string, Subscription>::iterator i = subscriptions.find(dest);
+    if (i != subscriptions.end()) {
+        sync(session).messageCancel(dest);
+        dispatcher.cancel(dest);
+        Subscription s = i->second;
+        if (s.isValid()) subscriptions[dest].impl->cancelDiversion();
+        subscriptions.erase(dest);
+    }
 }
 
 void SubscriptionManager::setAutoStop(bool set) { autoStop=set; }

Modified: qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp?rev=737028&r1=737027&r2=737028&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/ClientSessionTest.cpp Fri Jan 23 06:08:42 2009
@@ -468,6 +468,25 @@
     BOOST_CHECK(!fix.subs.get(got, "queue-2"));
 }
 
+QPID_AUTO_TEST_CASE(testResubscribeWithLocalQueue) {
+    ClientSessionFixture fix;
+    fix.session.queueDeclare(arg::queue="some-queue", arg::exclusive=true, arg::autoDelete=true);
+    LocalQueue p, q;
+    fix.subs.subscribe(p, "some-queue");
+    fix.subs.cancel("some-queue");
+    fix.subs.subscribe(q, "some-queue");
+    
+    fix.session.messageTransfer(arg::content=Message("some-data", "some-queue"));
+    fix.session.messageFlush(arg::destination="some-queue");
+
+    Message got;
+    BOOST_CHECK(!p.get(got));
+
+    BOOST_CHECK(q.get(got));
+    BOOST_CHECK_EQUAL("some-data", got.getData());
+    BOOST_CHECK(!q.get(got));
+}
+
 
 QPID_AUTO_TEST_SUITE_END()
 



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscr...@qpid.apache.org

Reply via email to