Author: aconway
Date: Tue May 18 18:47:25 2010
New Revision: 945813

URL: http://svn.apache.org/viewvc?rev=945813&view=rev
Log:
Added locking to make amqp_0_10::SenderImpl thread safe.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp
    qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h

Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp?rev=945813&r1=945812&r2=945813&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp Tue May 18 
18:47:25 2010
@@ -36,7 +36,7 @@ SenderImpl::SenderImpl(SessionImpl& _par
 
 void SenderImpl::send(const qpid::messaging::Message& message, bool sync) 
 {
-    if (unreliable) {
+    if (unreliable) {           // immutable, don't need lock
         UnreliableSend f(*this, &message);
         parent->execute(f);
     } else {
@@ -53,11 +53,20 @@ void SenderImpl::close()
 
 void SenderImpl::setCapacity(uint32_t c)
 {
-    bool flush = c < capacity;
-    capacity = c;
+    bool flush;
+    {
+        sys::Mutex::ScopedLock l(lock);
+        flush = c < capacity;
+        capacity = c;
+    }
     execute1<CheckPendingSends>(flush);
 }
-uint32_t SenderImpl::getCapacity() { return capacity; }
+
+uint32_t SenderImpl::getCapacity() {
+    sys::Mutex::ScopedLock l(lock);
+    return capacity;
+}
+
 uint32_t SenderImpl::getUnsettled()
 {
     CheckPendingSends f(*this, false);
@@ -67,6 +76,7 @@ uint32_t SenderImpl::getUnsettled()
 
 void SenderImpl::init(qpid::client::AsyncSession s, AddressResolution& 
resolver)
 {
+    sys::Mutex::ScopedLock l(lock);
     session = s;
     if (state == UNRESOLVED) {
         sink = resolver.resolveSink(session, address);
@@ -74,33 +84,38 @@ void SenderImpl::init(qpid::client::Asyn
     }
     if (state == CANCELLED) {
         sink->cancel(session, name);
+        sys::Mutex::ScopedUnlock u(lock);
         parent->senderCancelled(name);
     } else {
         sink->declare(session, name);
-        replay();
+        replay(l);
     }
 }
 
 void SenderImpl::waitForCapacity() 
 {
+    sys::Mutex::ScopedLock l(lock);
     //TODO: add option to throw exception rather than blocking?
-    if (!unreliable && capacity <= (flushed ? checkPendingSends(false) : 
outgoing.size())) {
+    if (!unreliable && capacity <=
+        (flushed ? checkPendingSends(false, l) : outgoing.size()))
+    {
         //Initial implementation is very basic. As outgoing is
         //currently only reduced on receiving completions and we are
         //blocking anyway we may as well sync(). If successful that
         //should clear all outstanding sends.
         session.sync();
-        checkPendingSends(false);
+        checkPendingSends(false, l);
     }
     //flush periodically and check for conmpleted sends
     if (++window > (capacity / 4)) {//TODO: make this configurable?
-        checkPendingSends(true);
+        checkPendingSends(true, l);
         window = 0;
     }
 }
 
 void SenderImpl::sendImpl(const qpid::messaging::Message& m)
 {
+    sys::Mutex::ScopedLock l(lock);
     std::auto_ptr<OutgoingMessage> msg(new OutgoingMessage());
     msg->convert(m);
     msg->setSubject(m.getSubject().empty() ? address.getSubject() : 
m.getSubject());
@@ -110,20 +125,26 @@ void SenderImpl::sendImpl(const qpid::me
 
 void SenderImpl::sendUnreliable(const qpid::messaging::Message& m)
 {
+    sys::Mutex::ScopedLock l(lock);
     OutgoingMessage msg;
     msg.convert(m);
     msg.setSubject(m.getSubject().empty() ? address.getSubject() : 
m.getSubject());
     sink->send(session, name, msg);
 }
 
-void SenderImpl::replay()
+void SenderImpl::replay(const sys::Mutex::ScopedLock&)
 {
     for (OutgoingMessages::iterator i = outgoing.begin(); i != outgoing.end(); 
++i) {
         sink->send(session, name, *i);
     }
 }
 
-uint32_t SenderImpl::checkPendingSends(bool flush)
+uint32_t SenderImpl::checkPendingSends(bool flush) {
+    sys::Mutex::ScopedLock l(lock);
+    return checkPendingSends(flush, l);
+}
+
+uint32_t SenderImpl::checkPendingSends(bool flush, const 
sys::Mutex::ScopedLock&)
 {
     if (flush) {
         session.flush(); 
@@ -139,6 +160,7 @@ uint32_t SenderImpl::checkPendingSends(b
 
 void SenderImpl::closeImpl()
 {
+    sys::Mutex::ScopedLock l(lock);
     state = CANCELLED;
     sink->cancel(session, name);
     parent->senderCancelled(name);
@@ -146,11 +168,13 @@ void SenderImpl::closeImpl()
 
 const std::string& SenderImpl::getName() const
 {
+    sys::Mutex::ScopedLock l(lock);
     return name;
 }
 
 qpid::messaging::Session SenderImpl::getSession() const
 {
+    sys::Mutex::ScopedLock l(lock);
     return qpid::messaging::Session(parent.get());
 }
 

Modified: qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h?rev=945813&r1=945812&r2=945813&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/amqp0_10/SenderImpl.h Tue May 18 
18:47:25 2010
@@ -58,6 +58,7 @@ class SenderImpl : public qpid::messagin
     qpid::messaging::Session getSession() const;
 
   private:
+    mutable sys::Mutex lock;
     boost::intrusive_ptr<SessionImpl> parent;
     const std::string name;
     const qpid::messaging::Address address;
@@ -76,7 +77,9 @@ class SenderImpl : public qpid::messagin
     const bool unreliable;
 
     uint32_t checkPendingSends(bool flush);
-    void replay();
+    // Dummy ScopedLock parameter means call with lock held
+    uint32_t checkPendingSends(bool flush, const sys::Mutex::ScopedLock&);
+    void replay(const sys::Mutex::ScopedLock&); 
     void waitForCapacity();
 
     //logic for application visible methods:



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscr...@qpid.apache.org

Reply via email to