Author: aconway
Date: Wed May 27 16:39:15 2009
New Revision: 779235

URL: http://svn.apache.org/viewvc?rev=779235&view=rev
Log:
Added missing locks in cluster code.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/Decoder.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Decoder.h

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=779235&r1=779234&r2=779235&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Wed May 27 16:39:15 2009
@@ -238,8 +238,13 @@
     connections.insert(ConnectionMap::value_type(c->getId(), c));
 }
 
-// Called by Connection::deliverClose() in deliverFrameQueue thread.
 void Cluster::erase(const ConnectionId& id) {
+    Lock l(lock);    
+    erase(id,l);
+}
+
+// Called by Connection::deliverClose() in deliverFrameQueue thread.
+void Cluster::erase(const ConnectionId& id, Lock&) {
     connections.erase(id);
     decoder.erase(id);
 }
@@ -702,8 +707,10 @@
     while (i != connections.end()) {
         ConnectionMap::iterator j = i++;
         MemberId m = j->second->getId().getMember();
-        if (m != self && !map.isMember(m))
-            j->second->deliverClose();
+        if (m != self && !map.isMember(m)) {
+            j->second->getBrokerConnection().closed();
+            erase(j->second->getId(), l);
+        }
     }
 }
 

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=779235&r1=779234&r2=779235&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Wed May 27 16:39:15 2009
@@ -155,6 +155,7 @@
     void setReady(Lock&);
     void memberUpdate(Lock&);
     void setClusterId(const framing::Uuid&, Lock&);
+    void erase(const ConnectionId&, Lock&);       
 
     // == Called in CPG dispatch thread
     void deliver( // CPG deliver callback. 

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Decoder.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Decoder.cpp?rev=779235&r1=779234&r2=779235&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Decoder.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Decoder.cpp Wed May 27 16:39:15 2009
@@ -57,4 +57,9 @@
     map.erase(c);
 }
 
+framing::FrameDecoder& Decoder::get(const ConnectionId& c) {
+    sys::Mutex::ScopedLock l(lock);
+    return map[c];
+}
+
 }} // namespace qpid::cluster

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Decoder.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Decoder.h?rev=779235&r1=779234&r2=779235&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Decoder.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Decoder.h Wed May 27 16:39:15 2009
@@ -45,7 +45,7 @@
     Decoder(FrameHandler fh) : callback(fh) {}
     void decode(const EventHeader& eh, const char* data);
     void erase(const ConnectionId&);
-    framing::FrameDecoder& get(const ConnectionId& c) { return map[c]; }
+    framing::FrameDecoder& get(const ConnectionId& c);
 
   private:
     typedef std::map<ConnectionId, framing::FrameDecoder> Map;



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscr...@qpid.apache.org

Reply via email to