Author: aconway Date: Tue May 18 18:47:25 2010 New Revision: 945813 URL: http://svn.apache.org/viewvc?rev=945813&view=rev Log: Added locking to make amqp_0_10::SenderImpl thread safe.
Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp?rev=945813&r1=945812&r2=945813&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp Tue May 18 18:47:25 2010 @@ -36,7 +36,7 @@ SenderImpl::SenderImpl(SessionImpl& _par void SenderImpl::send(const qpid::messaging::Message& message, bool sync) { - if (unreliable) { + if (unreliable) { // immutable, don't need lock UnreliableSend f(*this, &message); parent->execute(f); } else { @@ -53,11 +53,20 @@ void SenderImpl::close() void SenderImpl::setCapacity(uint32_t c) { - bool flush = c < capacity; - capacity = c; + bool flush; + { + sys::Mutex::ScopedLock l(lock); + flush = c < capacity; + capacity = c; + } execute1<CheckPendingSends>(flush); } -uint32_t SenderImpl::getCapacity() { return capacity; } + +uint32_t SenderImpl::getCapacity() { + sys::Mutex::ScopedLock l(lock); + return capacity; +} + uint32_t SenderImpl::getUnsettled() { CheckPendingSends f(*this, false); @@ -67,6 +76,7 @@ uint32_t SenderImpl::getUnsettled() void SenderImpl::init(qpid::client::AsyncSession s, AddressResolution& resolver) { + sys::Mutex::ScopedLock l(lock); session = s; if (state == UNRESOLVED) { sink = resolver.resolveSink(session, address); @@ -74,33 +84,38 @@ void SenderImpl::init(qpid::client::Asyn } if (state == CANCELLED) { sink->cancel(session, name); + sys::Mutex::ScopedUnlock u(lock); parent->senderCancelled(name); } else { sink->declare(session, name); - replay(); + replay(l); } } void SenderImpl::waitForCapacity() { + sys::Mutex::ScopedLock l(lock); //TODO: add option to throw exception rather than blocking? - if (!unreliable && capacity <= (flushed ? checkPendingSends(false) : outgoing.size())) { + if (!unreliable && capacity <= + (flushed ? checkPendingSends(false, l) : outgoing.size())) + { //Initial implementation is very basic. As outgoing is //currently only reduced on receiving completions and we are //blocking anyway we may as well sync(). If successful that //should clear all outstanding sends. session.sync(); - checkPendingSends(false); + checkPendingSends(false, l); } //flush periodically and check for conmpleted sends if (++window > (capacity / 4)) {//TODO: make this configurable? - checkPendingSends(true); + checkPendingSends(true, l); window = 0; } } void SenderImpl::sendImpl(const qpid::messaging::Message& m) { + sys::Mutex::ScopedLock l(lock); std::auto_ptr<OutgoingMessage> msg(new OutgoingMessage()); msg->convert(m); msg->setSubject(m.getSubject().empty() ? address.getSubject() : m.getSubject()); @@ -110,20 +125,26 @@ void SenderImpl::sendImpl(const qpid::me void SenderImpl::sendUnreliable(const qpid::messaging::Message& m) { + sys::Mutex::ScopedLock l(lock); OutgoingMessage msg; msg.convert(m); msg.setSubject(m.getSubject().empty() ? address.getSubject() : m.getSubject()); sink->send(session, name, msg); } -void SenderImpl::replay() +void SenderImpl::replay(const sys::Mutex::ScopedLock&) { for (OutgoingMessages::iterator i = outgoing.begin(); i != outgoing.end(); ++i) { sink->send(session, name, *i); } } -uint32_t SenderImpl::checkPendingSends(bool flush) +uint32_t SenderImpl::checkPendingSends(bool flush) { + sys::Mutex::ScopedLock l(lock); + return checkPendingSends(flush, l); +} + +uint32_t SenderImpl::checkPendingSends(bool flush, const sys::Mutex::ScopedLock&) { if (flush) { session.flush(); @@ -139,6 +160,7 @@ uint32_t SenderImpl::checkPendingSends(b void SenderImpl::closeImpl() { + sys::Mutex::ScopedLock l(lock); state = CANCELLED; sink->cancel(session, name); parent->senderCancelled(name); @@ -146,11 +168,13 @@ void SenderImpl::closeImpl() const std::string& SenderImpl::getName() const { + sys::Mutex::ScopedLock l(lock); return name; } qpid::messaging::Session SenderImpl::getSession() const { + sys::Mutex::ScopedLock l(lock); return qpid::messaging::Session(parent.get()); } Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h?rev=945813&r1=945812&r2=945813&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h Tue May 18 18:47:25 2010 @@ -58,6 +58,7 @@ class SenderImpl : public qpid::messagin qpid::messaging::Session getSession() const; private: + mutable sys::Mutex lock; boost::intrusive_ptr<SessionImpl> parent; const std::string name; const qpid::messaging::Address address; @@ -76,7 +77,9 @@ class SenderImpl : public qpid::messagin const bool unreliable; uint32_t checkPendingSends(bool flush); - void replay(); + // Dummy ScopedLock parameter means call with lock held + uint32_t checkPendingSends(bool flush, const sys::Mutex::ScopedLock&); + void replay(const sys::Mutex::ScopedLock&); void waitForCapacity(); //logic for application visible methods: --------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:commits-subscr...@qpid.apache.org