Author: jonathan
Date: Wed Feb  9 18:58:12 2011
New Revision: 1069030

URL: http://svn.apache.org/viewvc?rev=1069030&view=rev
Log:
QPID-3040: The C++ messaging client library now releases pending messages when 
a Receiver is closed.

This only releases messages in the client's cache that have not been read. It 
does not release messages that have been read by the client application, but 
not acknowledged.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp
    qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
    qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h
    qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp

Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp?rev=1069030&r1=1069029&r2=1069030&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp Wed Feb  9 18:58:12 2011
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -43,15 +43,15 @@ void ReceiverImpl::received(qpid::messag
         window = capacity;
     }
 }
-    
-qpid::messaging::Message ReceiverImpl::get(qpid::messaging::Duration timeout) 
+
+qpid::messaging::Message ReceiverImpl::get(qpid::messaging::Duration timeout)
 {
     qpid::messaging::Message result;
     if (!get(result, timeout)) throw NoMessageAvailable();
     return result;
 }
-    
-qpid::messaging::Message ReceiverImpl::fetch(qpid::messaging::Duration 
timeout) 
+
+qpid::messaging::Message ReceiverImpl::fetch(qpid::messaging::Duration timeout)
 {
     qpid::messaging::Message result;
     if (!fetch(result, timeout)) throw NoMessageAvailable();
@@ -72,8 +72,8 @@ bool ReceiverImpl::fetch(qpid::messaging
     return f.result;
 }
 
-void ReceiverImpl::close() 
-{ 
+void ReceiverImpl::close()
+{
     execute<Close>();
 }
 
@@ -143,10 +143,10 @@ uint32_t ReceiverImpl::getUnsettled()
     return parent->getUnsettledAcks(destination);
 }
 
-ReceiverImpl::ReceiverImpl(SessionImpl& p, const std::string& name, 
-                           const qpid::messaging::Address& a) : 
+ReceiverImpl::ReceiverImpl(SessionImpl& p, const std::string& name,
+                           const qpid::messaging::Address& a) :
 
-    parent(&p), destination(name), address(a), byteCredit(0xFFFFFFFF), 
+    parent(&p), destination(name), address(a), byteCredit(0xFFFFFFFF),
     state(UNRESOLVED), capacity(0), window(0) {}
 
 bool ReceiverImpl::getImpl(qpid::messaging::Message& message, qpid::messaging::Duration timeout)
@@ -188,11 +188,13 @@ bool ReceiverImpl::fetchImpl(qpid::messa
     }
 }
 
-void ReceiverImpl::closeImpl() 
-{ 
+void ReceiverImpl::closeImpl()
+{
     sys::Mutex::ScopedLock l(lock);
     if (state != CANCELLED) {
         state = CANCELLED;
+        session.messageStop(destination);
+        parent->releasePending(destination);
         source->cancel(session, destination);
         parent->receiverCancelled(destination);
     }

Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp?rev=1069030&r1=1069029&r2=1069030&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp Wed Feb  9 18:58:12 2011
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -186,7 +186,7 @@ struct SessionImpl::CreateReceiver : Com
 {
     qpid::messaging::Receiver result;
     const qpid::messaging::Address& address;
-    
+
     CreateReceiver(SessionImpl& i, const qpid::messaging::Address& a) :
         Command(i), address(a) {}
     void operator()() { result = impl.createReceiverImpl(address); }
@@ -212,7 +212,7 @@ struct SessionImpl::CreateSender : Comma
 {
     qpid::messaging::Sender result;
     const qpid::messaging::Address& address;
-    
+
     CreateSender(SessionImpl& i, const qpid::messaging::Address& a) :
         Command(i), address(a) {}
     void operator()() { result = impl.createSenderImpl(address); }
@@ -242,7 +242,7 @@ Sender SessionImpl::getSender(const std:
         throw KeyError(name);
     } else {
         return i->second;
-    }    
+    }
 }
 
 Receiver SessionImpl::getReceiver(const std::string& name) const
@@ -296,8 +296,8 @@ bool SessionImpl::getNextReceiver(Receiv
     }
 }
 
-bool SessionImpl::accept(ReceiverImpl* receiver, 
-                         qpid::messaging::Message* message, 
+bool SessionImpl::accept(ReceiverImpl* receiver,
+                         qpid::messaging::Message* message,
                          IncomingMessages::MessageTransfer& transfer)
 {
     if (receiver->getName() == transfer.getDestination()) {
@@ -359,7 +359,7 @@ bool SessionImpl::nextReceiver(qpid::mes
         } catch (const qpid::ConnectionException& e) {
             throw qpid::messaging::ConnectionError(e.what());
         } catch (const qpid::ChannelException& e) {
-            throw qpid::messaging::MessagingException(e.what());            
+            throw qpid::messaging::MessagingException(e.what());
         }
     }
 }
@@ -385,7 +385,7 @@ struct SessionImpl::Receivable : Command
 {
     const std::string* destination;
     uint32_t result;
-    
+
     Receivable(SessionImpl& i, const std::string* d) : Command(i), destination(d), result(0) {}
     void operator()() { result = impl.getReceivableImpl(destination); }
 };
@@ -414,7 +414,7 @@ struct SessionImpl::UnsettledAcks : Comm
 {
     const std::string* destination;
     uint32_t result;
-    
+
     UnsettledAcks(SessionImpl& i, const std::string* d) : Command(i), destination(d), result(0) {}
     void operator()() { result = impl.getUnsettledAcksImpl(destination); }
 };
@@ -451,10 +451,10 @@ void SessionImpl::rollbackImpl()
         getImplPtr<Receiver, ReceiverImpl>(i->second)->stop();
     }
     //ensure that stop has been processed and all previously sent
-    //messages are available for release:                   
+    //messages are available for release:
     session.sync();
     incoming.releaseAll();
-    session.txRollback();    
+    session.txRollback();
 
     for (Receivers::iterator i = receivers.begin(); i != receivers.end(); ++i) {
         getImplPtr<Receiver, ReceiverImpl>(i->second)->start();
@@ -495,6 +495,12 @@ void SessionImpl::receiverCancelled(cons
     incoming.releasePending(name);
 }
 
+void SessionImpl::releasePending(const std::string& name)
+{
+    ScopedLock l(lock);
+    incoming.releasePending(name);
+}
+
 void SessionImpl::senderCancelled(const std::string& name)
 {
     ScopedLock l(lock);
@@ -503,12 +509,12 @@ void SessionImpl::senderCancelled(const 
 
 void SessionImpl::reconnect()
 {
-    connection->open();    
+    connection->open();
 }
 
 bool SessionImpl::backoff()
 {
-    return connection->backoff();    
+    return connection->backoff();
 }
 
 qpid::messaging::Connection SessionImpl::getConnection() const

Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h?rev=1069030&r1=1069029&r2=1069030&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h Wed Feb  9 18:58:12 2011
@@ -10,9 +10,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -79,8 +79,9 @@ class SessionImpl : public qpid::messagi
     void checkError();
     bool hasError();
 
-    bool get(ReceiverImpl& receiver, qpid::messaging::Message& message, 
qpid::messaging::Duration timeout);    
+    bool get(ReceiverImpl& receiver, qpid::messaging::Message& message, 
qpid::messaging::Duration timeout);
 
+    void releasePending(const std::string& destination);
     void receiverCancelled(const std::string& name);
     void senderCancelled(const std::string& name);
 
@@ -110,7 +111,7 @@ class SessionImpl : public qpid::messagi
         } catch (const qpid::ConnectionException& e) {
             throw qpid::messaging::ConnectionError(e.what());
         } catch (const qpid::ChannelException& e) {
-            throw qpid::messaging::MessagingException(e.what());            
+            throw qpid::messaging::MessagingException(e.what());
         }
     }
 
@@ -206,11 +207,11 @@ class SessionImpl : public qpid::messagi
     struct Acknowledge1 : Command
     {
         qpid::messaging::Message& message;
-        
+
         Acknowledge1(SessionImpl& i, qpid::messaging::Message& m) : Command(i), message(m) {}
         void operator()() { impl.acknowledgeImpl(message); }
     };
-    
+
     struct CreateSender;
     struct CreateReceiver;
     struct UnsettledAcks;
@@ -222,12 +223,12 @@ class SessionImpl : public qpid::messagi
         F f(*this);
         return execute(f);
     }
-    
+
     template <class F> void retry()
     {
         while (!execute<F>()) {}
     }
-    
+
     template <class F, class P> bool execute1(P p)
     {
         F f(*this, p);

Modified: qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp?rev=1069030&r1=1069029&r2=1069030&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp Wed Feb  9 18:58:12 2011
@@ -404,7 +404,7 @@ struct QueueCreatePolicyFixture : public
 
     ~QueueCreatePolicyFixture()
     {
-        admin.deleteQueue(address.getName());    
+        admin.deleteQueue(address.getName());
     }
 };
 
@@ -448,7 +448,7 @@ struct ExchangeCreatePolicyFixture : pub
 
     ~ExchangeCreatePolicyFixture()
     {
-        admin.deleteExchange(address.getName());    
+        admin.deleteExchange(address.getName());
     }
 };
 
@@ -597,7 +597,7 @@ QPID_AUTO_TEST_CASE(testAssertPolicyQueu
     s1.close();
     Receiver r1 = fix.session.createReceiver(a1);
     r1.close();
-    
+
     std::string a2 = "q; {assert:receiver, node:{durable:true, x-declare:{arguments:{qpid.max-count:100}}}}";
     Sender s2 = fix.session.createSender(a2);
     s2.close();
@@ -711,7 +711,7 @@ QPID_AUTO_TEST_CASE(testOptionVerificati
 {
     MessagingFixture fix;
     fix.session.createReceiver("my-queue; {create: always, assert: always, delete: always, node: {type: queue, durable: false, x-declare: {arguments: {a: b}}, x-bindings: [{exchange: amq.fanout}]}, link: {name: abc, durable: false, reliability: exactly-once, x-subscribe: {arguments:{a:b}}, x-bindings:[{exchange: amq.fanout}]}, mode: browse}");
-    BOOST_CHECK_THROW(fix.session.createReceiver("my-queue; 
{invalid-option:blah}"), qpid::messaging::AddressError);    
+    BOOST_CHECK_THROW(fix.session.createReceiver("my-queue; 
{invalid-option:blah}"), qpid::messaging::AddressError);
 }
 
 QPID_AUTO_TEST_CASE(testReceiveSpecialProperties)
@@ -775,19 +775,48 @@ QPID_AUTO_TEST_CASE(testExclusiveSubscri
 QPID_AUTO_TEST_CASE(testExclusiveQueueSubscriberAndBrowser)
 {
     MessagingFixture fix;
-    
+
     std::string address =       "exclusive-queue; { create: receiver, node : { x-declare : { auto-delete: true, exclusive: true } } }";
     std::string browseAddress = "exclusive-queue; { mode: browse }";
 
     Receiver receiver = fix.session.createReceiver(address);
     fix.session.sync();
 
-    Connection c2 = fix.newConnection();    
+    Connection c2 = fix.newConnection();
     c2.open();
     Session s2 = c2.createSession();
-   
+
     BOOST_CHECK_NO_THROW(Receiver browser = s2.createReceiver(browseAddress));
-    c2.close();    
+    c2.close();
+}
+
+
+QPID_AUTO_TEST_CASE(testDeleteQueueWithUnackedMessages)
+{
+    MessagingFixture fix;
+    const uint capacity = 5;
+
+    Sender sender = fix.session.createSender("test.ex;{create:always,node:{type:topic}}");
+       Receiver receiver2 = fix.session.createReceiver("alternate.ex;{create:always,node:{type:topic}}");
+       Receiver receiver1 = fix.session.createReceiver("test.q;{create:always, delete:always,node:{type:queue, x-declare:{alternate-exchange:alternate.ex}},link:{x-bindings:[{exchange:test.ex,queue:test.q,key:#}]}}");
+
+       receiver1.setCapacity(capacity);
+       receiver2.setCapacity(capacity*2);
+
+    Message out("test-message");
+    for (uint i = 0; i < capacity*2; ++i) {
+        sender.send(out);
+    }
+
+       receiver1.close();
+
+    // Make sure all pending messages were sent to the alternate
+    // exchange when the queue was deleted.
+    Message in;
+    for (uint i = 0; i < capacity*2; ++i) {
+        in = receiver2.fetch(Duration::SECOND * 5);
+        BOOST_CHECK_EQUAL(in.getContent(), out.getContent());
+    }
 }
 
 QPID_AUTO_TEST_CASE(testAuthenticatedUsername)
@@ -828,7 +857,7 @@ QPID_AUTO_TEST_CASE(testAcknowledge)
         messages.push_back(msg);
     }
     const uint batch(10); //acknowledge first 10 messages only
-    for (uint i = 0; i < batch; ++i) {    
+    for (uint i = 0; i < batch; ++i) {
         other.acknowledge(messages[i]);
     }
     messages.clear();
@@ -836,7 +865,7 @@ QPID_AUTO_TEST_CASE(testAcknowledge)
     other.close();
 
     other = fix.connection.createSession();
-    receiver = other.createReceiver(fix.queue);    
+    receiver = other.createReceiver(fix.queue);
     for (uint i = 0; i < (count-batch); ++i) {
         Message msg = receiver.fetch();
         BOOST_CHECK_EQUAL(msg.getContent(), (boost::format("Message_%1%") % (i+1+batch)).str());
@@ -847,7 +876,7 @@ QPID_AUTO_TEST_CASE(testAcknowledge)
 
     //check unacknowledged messages are still enqueued
     other = fix.connection.createSession();
-    receiver = other.createReceiver(fix.queue);    
+    receiver = other.createReceiver(fix.queue);
     for (uint i = 0; i < ((count-batch)/2); ++i) {
         Message msg = receiver.fetch();
         BOOST_CHECK_EQUAL(msg.getContent(), (boost::format("Message_%1%") % ((i*2)+1+batch)).str());



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscr...@qpid.apache.org

Reply via email to