Author: aconway
Date: Wed May 27 16:39:15 2009
New Revision: 779235

URL: http://svn.apache.org/viewvc?rev=779235&view=rev
Log:
Added missing locks in cluster code.

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h qpid/trunk/qpid/cpp/src/qpid/cluster/Decoder.cpp qpid/trunk/qpid/cpp/src/qpid/cluster/Decoder.h Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=779235&r1=779234&r2=779235&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Wed May 27 16:39:15 2009 @@ -238,8 +238,13 @@ connections.insert(ConnectionMap::value_type(c->getId(), c)); } -// Called by Connection::deliverClose() in deliverFrameQueue thread. void Cluster::erase(const ConnectionId& id) { + Lock l(lock); + erase(id,l); +} + +// Called by Connection::deliverClose() in deliverFrameQueue thread. +void Cluster::erase(const ConnectionId& id, Lock&) { connections.erase(id); decoder.erase(id); } @@ -702,8 +707,10 @@ while (i != connections.end()) { ConnectionMap::iterator j = i++; MemberId m = j->second->getId().getMember(); - if (m != self && !map.isMember(m)) - j->second->deliverClose(); + if (m != self && !map.isMember(m)) { + j->second->getBrokerConnection().closed(); + erase(j->second->getId(), l); + } } } Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=779235&r1=779234&r2=779235&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Wed May 27 16:39:15 2009 @@ -155,6 +155,7 @@ void setReady(Lock&); void memberUpdate(Lock&); void setClusterId(const framing::Uuid&, Lock&); + void erase(const ConnectionId&, Lock&); // == Called in CPG dispatch thread void deliver( // CPG deliver callback. 
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Decoder.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Decoder.cpp?rev=779235&r1=779234&r2=779235&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/Decoder.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/Decoder.cpp Wed May 27 16:39:15 2009 @@ -57,4 +57,9 @@ map.erase(c); } +framing::FrameDecoder& Decoder::get(const ConnectionId& c) { + sys::Mutex::ScopedLock l(lock); + return map[c]; +} + }} // namespace qpid::cluster Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Decoder.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Decoder.h?rev=779235&r1=779234&r2=779235&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/Decoder.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/Decoder.h Wed May 27 16:39:15 2009 @@ -45,7 +45,7 @@ Decoder(FrameHandler fh) : callback(fh) {} void decode(const EventHeader& eh, const char* data); void erase(const ConnectionId&); - framing::FrameDecoder& get(const ConnectionId& c) { return map[c]; } + framing::FrameDecoder& get(const ConnectionId& c); private: typedef std::map<ConnectionId, framing::FrameDecoder> Map; --------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:commits-subscr...@qpid.apache.org