Author: jonathan Date: Wed Feb 9 18:58:12 2011 New Revision: 1069030 URL: http://svn.apache.org/viewvc?rev=1069030&view=rev Log: QPID-3040: The C++ messaging client library now releases pending messages when a Receiver is closed.
This releases only those messages in the client's cache that have not yet been read; messages that the client application has read but not acknowledged are not released. Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp?rev=1069030&r1=1069029&r2=1069030&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp Wed Feb 9 18:58:12 2011 @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. 
You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -43,15 +43,15 @@ void ReceiverImpl::received(qpid::messag window = capacity; } } - -qpid::messaging::Message ReceiverImpl::get(qpid::messaging::Duration timeout) + +qpid::messaging::Message ReceiverImpl::get(qpid::messaging::Duration timeout) { qpid::messaging::Message result; if (!get(result, timeout)) throw NoMessageAvailable(); return result; } - -qpid::messaging::Message ReceiverImpl::fetch(qpid::messaging::Duration timeout) + +qpid::messaging::Message ReceiverImpl::fetch(qpid::messaging::Duration timeout) { qpid::messaging::Message result; if (!fetch(result, timeout)) throw NoMessageAvailable(); @@ -72,8 +72,8 @@ bool ReceiverImpl::fetch(qpid::messaging return f.result; } -void ReceiverImpl::close() -{ +void ReceiverImpl::close() +{ execute<Close>(); } @@ -143,10 +143,10 @@ uint32_t ReceiverImpl::getUnsettled() return parent->getUnsettledAcks(destination); } -ReceiverImpl::ReceiverImpl(SessionImpl& p, const std::string& name, - const qpid::messaging::Address& a) : +ReceiverImpl::ReceiverImpl(SessionImpl& p, const std::string& name, + const qpid::messaging::Address& a) : - parent(&p), destination(name), address(a), byteCredit(0xFFFFFFFF), + parent(&p), destination(name), address(a), byteCredit(0xFFFFFFFF), state(UNRESOLVED), capacity(0), window(0) {} bool ReceiverImpl::getImpl(qpid::messaging::Message& message, qpid::messaging::Duration timeout) @@ -188,11 +188,13 @@ bool ReceiverImpl::fetchImpl(qpid::messa } } -void ReceiverImpl::closeImpl() -{ +void ReceiverImpl::closeImpl() +{ sys::Mutex::ScopedLock l(lock); if (state != CANCELLED) { state = CANCELLED; + session.messageStop(destination); + parent->releasePending(destination); source->cancel(session, destination); 
parent->receiverCancelled(destination); } Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp?rev=1069030&r1=1069029&r2=1069030&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp Wed Feb 9 18:58:12 2011 @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -186,7 +186,7 @@ struct SessionImpl::CreateReceiver : Com { qpid::messaging::Receiver result; const qpid::messaging::Address& address; - + CreateReceiver(SessionImpl& i, const qpid::messaging::Address& a) : Command(i), address(a) {} void operator()() { result = impl.createReceiverImpl(address); } @@ -212,7 +212,7 @@ struct SessionImpl::CreateSender : Comma { qpid::messaging::Sender result; const qpid::messaging::Address& address; - + CreateSender(SessionImpl& i, const qpid::messaging::Address& a) : Command(i), address(a) {} void operator()() { result = impl.createSenderImpl(address); } @@ -242,7 +242,7 @@ Sender SessionImpl::getSender(const std: throw KeyError(name); } else { return i->second; - } + } } Receiver SessionImpl::getReceiver(const std::string& name) const @@ -296,8 +296,8 @@ bool SessionImpl::getNextReceiver(Receiv } } -bool SessionImpl::accept(ReceiverImpl* receiver, - qpid::messaging::Message* message, +bool SessionImpl::accept(ReceiverImpl* receiver, + qpid::messaging::Message* message, IncomingMessages::MessageTransfer& transfer) { if 
(receiver->getName() == transfer.getDestination()) { @@ -359,7 +359,7 @@ bool SessionImpl::nextReceiver(qpid::mes } catch (const qpid::ConnectionException& e) { throw qpid::messaging::ConnectionError(e.what()); } catch (const qpid::ChannelException& e) { - throw qpid::messaging::MessagingException(e.what()); + throw qpid::messaging::MessagingException(e.what()); } } } @@ -385,7 +385,7 @@ struct SessionImpl::Receivable : Command { const std::string* destination; uint32_t result; - + Receivable(SessionImpl& i, const std::string* d) : Command(i), destination(d), result(0) {} void operator()() { result = impl.getReceivableImpl(destination); } }; @@ -414,7 +414,7 @@ struct SessionImpl::UnsettledAcks : Comm { const std::string* destination; uint32_t result; - + UnsettledAcks(SessionImpl& i, const std::string* d) : Command(i), destination(d), result(0) {} void operator()() { result = impl.getUnsettledAcksImpl(destination); } }; @@ -451,10 +451,10 @@ void SessionImpl::rollbackImpl() getImplPtr<Receiver, ReceiverImpl>(i->second)->stop(); } //ensure that stop has been processed and all previously sent - //messages are available for release: + //messages are available for release: session.sync(); incoming.releaseAll(); - session.txRollback(); + session.txRollback(); for (Receivers::iterator i = receivers.begin(); i != receivers.end(); ++i) { getImplPtr<Receiver, ReceiverImpl>(i->second)->start(); @@ -495,6 +495,12 @@ void SessionImpl::receiverCancelled(cons incoming.releasePending(name); } +void SessionImpl::releasePending(const std::string& name) +{ + ScopedLock l(lock); + incoming.releasePending(name); +} + void SessionImpl::senderCancelled(const std::string& name) { ScopedLock l(lock); @@ -503,12 +509,12 @@ void SessionImpl::senderCancelled(const void SessionImpl::reconnect() { - connection->open(); + connection->open(); } bool SessionImpl::backoff() { - return connection->backoff(); + return connection->backoff(); } qpid::messaging::Connection SessionImpl::getConnection() 
const Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h?rev=1069030&r1=1069029&r2=1069030&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.h Wed Feb 9 18:58:12 2011 @@ -10,9 +10,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -79,8 +79,9 @@ class SessionImpl : public qpid::messagi void checkError(); bool hasError(); - bool get(ReceiverImpl& receiver, qpid::messaging::Message& message, qpid::messaging::Duration timeout); + bool get(ReceiverImpl& receiver, qpid::messaging::Message& message, qpid::messaging::Duration timeout); + void releasePending(const std::string& destination); void receiverCancelled(const std::string& name); void senderCancelled(const std::string& name); @@ -110,7 +111,7 @@ class SessionImpl : public qpid::messagi } catch (const qpid::ConnectionException& e) { throw qpid::messaging::ConnectionError(e.what()); } catch (const qpid::ChannelException& e) { - throw qpid::messaging::MessagingException(e.what()); + throw qpid::messaging::MessagingException(e.what()); } } @@ -206,11 +207,11 @@ class SessionImpl : public qpid::messagi struct Acknowledge1 : Command { qpid::messaging::Message& message; - + Acknowledge1(SessionImpl& i, qpid::messaging::Message& m) : Command(i), message(m) {} void operator()() { impl.acknowledgeImpl(message); } }; - + struct CreateSender; struct CreateReceiver; struct 
UnsettledAcks; @@ -222,12 +223,12 @@ class SessionImpl : public qpid::messagi F f(*this); return execute(f); } - + template <class F> void retry() { while (!execute<F>()) {} } - + template <class F, class P> bool execute1(P p) { F f(*this, p); Modified: qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp?rev=1069030&r1=1069029&r2=1069030&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp (original) +++ qpid/trunk/qpid/cpp/src/tests/MessagingSessionTests.cpp Wed Feb 9 18:58:12 2011 @@ -404,7 +404,7 @@ struct QueueCreatePolicyFixture : public ~QueueCreatePolicyFixture() { - admin.deleteQueue(address.getName()); + admin.deleteQueue(address.getName()); } }; @@ -448,7 +448,7 @@ struct ExchangeCreatePolicyFixture : pub ~ExchangeCreatePolicyFixture() { - admin.deleteExchange(address.getName()); + admin.deleteExchange(address.getName()); } }; @@ -597,7 +597,7 @@ QPID_AUTO_TEST_CASE(testAssertPolicyQueu s1.close(); Receiver r1 = fix.session.createReceiver(a1); r1.close(); - + std::string a2 = "q; {assert:receiver, node:{durable:true, x-declare:{arguments:{qpid.max-count:100}}}}"; Sender s2 = fix.session.createSender(a2); s2.close(); @@ -711,7 +711,7 @@ QPID_AUTO_TEST_CASE(testOptionVerificati { MessagingFixture fix; fix.session.createReceiver("my-queue; {create: always, assert: always, delete: always, node: {type: queue, durable: false, x-declare: {arguments: {a: b}}, x-bindings: [{exchange: amq.fanout}]}, link: {name: abc, durable: false, reliability: exactly-once, x-subscribe: {arguments:{a:b}}, x-bindings:[{exchange: amq.fanout}]}, mode: browse}"); - BOOST_CHECK_THROW(fix.session.createReceiver("my-queue; {invalid-option:blah}"), qpid::messaging::AddressError); + BOOST_CHECK_THROW(fix.session.createReceiver("my-queue; {invalid-option:blah}"), qpid::messaging::AddressError); } 
QPID_AUTO_TEST_CASE(testReceiveSpecialProperties) @@ -775,19 +775,48 @@ QPID_AUTO_TEST_CASE(testExclusiveSubscri QPID_AUTO_TEST_CASE(testExclusiveQueueSubscriberAndBrowser) { MessagingFixture fix; - + std::string address = "exclusive-queue; { create: receiver, node : { x-declare : { auto-delete: true, exclusive: true } } }"; std::string browseAddress = "exclusive-queue; { mode: browse }"; Receiver receiver = fix.session.createReceiver(address); fix.session.sync(); - Connection c2 = fix.newConnection(); + Connection c2 = fix.newConnection(); c2.open(); Session s2 = c2.createSession(); - + BOOST_CHECK_NO_THROW(Receiver browser = s2.createReceiver(browseAddress)); - c2.close(); + c2.close(); +} + + +QPID_AUTO_TEST_CASE(testDeleteQueueWithUnackedMessages) +{ + MessagingFixture fix; + const uint capacity = 5; + + Sender sender = fix.session.createSender("test.ex;{create:always,node:{type:topic}}"); + Receiver receiver2 = fix.session.createReceiver("alternate.ex;{create:always,node:{type:topic}}"); + Receiver receiver1 = fix.session.createReceiver("test.q;{create:always, delete:always,node:{type:queue, x-declare:{alternate-exchange:alternate.ex}},link:{x-bindings:[{exchange:test.ex,queue:test.q,key:#}]}}"); + + receiver1.setCapacity(capacity); + receiver2.setCapacity(capacity*2); + + Message out("test-message"); + for (uint i = 0; i < capacity*2; ++i) { + sender.send(out); + } + + receiver1.close(); + + // Make sure all pending messages were sent to the alternate + // exchange when the queue was deleted. 
+ Message in; + for (uint i = 0; i < capacity*2; ++i) { + in = receiver2.fetch(Duration::SECOND * 5); + BOOST_CHECK_EQUAL(in.getContent(), out.getContent()); + } } QPID_AUTO_TEST_CASE(testAuthenticatedUsername) @@ -828,7 +857,7 @@ QPID_AUTO_TEST_CASE(testAcknowledge) messages.push_back(msg); } const uint batch(10); //acknowledge first 10 messages only - for (uint i = 0; i < batch; ++i) { + for (uint i = 0; i < batch; ++i) { other.acknowledge(messages[i]); } messages.clear(); @@ -836,7 +865,7 @@ QPID_AUTO_TEST_CASE(testAcknowledge) other.close(); other = fix.connection.createSession(); - receiver = other.createReceiver(fix.queue); + receiver = other.createReceiver(fix.queue); for (uint i = 0; i < (count-batch); ++i) { Message msg = receiver.fetch(); BOOST_CHECK_EQUAL(msg.getContent(), (boost::format("Message_%1%") % (i+1+batch)).str()); @@ -847,7 +876,7 @@ QPID_AUTO_TEST_CASE(testAcknowledge) //check unacknowledged messages are still enqueued other = fix.connection.createSession(); - receiver = other.createReceiver(fix.queue); + receiver = other.createReceiver(fix.queue); for (uint i = 0; i < ((count-batch)/2); ++i) { Message msg = receiver.fetch(); BOOST_CHECK_EQUAL(msg.getContent(), (boost::format("Message_%1%") % ((i*2)+1+batch)).str()); --------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:commits-subscr...@qpid.apache.org