Author: aconway
Date: Wed Jun  9 20:29:32 2010
New Revision: 953147

URL: http://svn.apache.org/viewvc?rev=953147&view=rev
Log:
Fix cluster-safe assertion in connection negotiation.

See https://bugzilla.redhat.com/show_bug.cgi?id=602347.
In a cluster, raise the management connect event when processing 
cluster.announce.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
    qpid/trunk/qpid/cpp/src/qpid/sys/ClusterSafe.cpp
    qpid/trunk/qpid/cpp/src/qpid/sys/ClusterSafe.h
    qpid/trunk/qpid/cpp/src/tests/cluster_tests.py

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp?rev=953147&r1=953146&r2=953147&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp Wed Jun  9 20:29:32 2010
@@ -24,6 +24,7 @@
 #include "qpid/broker/Bridge.h"
 #include "qpid/broker/Broker.h"
 #include "qpid/sys/SecuritySettings.h"
+#include "qpid/sys/ClusterSafe.h"
 
 #include "qpid/log/Statement.h"
 #include "qpid/ptr_map.h"
@@ -121,7 +122,9 @@ Connection::~Connection()
 {
     if (mgmtObject != 0) {
         mgmtObject->resourceDestroy();
-        if (!isLink)
+        // In a cluster, Connections destroyed during shutdown are in
+        // a cluster-unsafe context. Don't raise an event in that case.
+        if (!isLink && isClusterSafe())
             agent->raiseEvent(_qmf::EventClientDisconnect(mgmtId, ConnectionState::getUserId()));
     }
     if (isLink)
@@ -202,6 +205,13 @@ void Connection::notifyConnectionForced(
 void Connection::setUserId(const string& userId)
 {
     ConnectionState::setUserId(userId);
+    // In a cluster, the cluster code will raise the connect event
+    // when the connection is replicated to the cluster.
+    if (!sys::isCluster())
+        raiseConnectEvent();
+}
+
+void Connection::raiseConnectEvent() {
     if (mgmtObject != 0) {
         mgmtObject->set_authIdentity(userId);
         agent->raiseEvent(_qmf::EventClientConnect(mgmtId, userId));

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h?rev=953147&r1=953146&r2=953147&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h Wed Jun  9 20:29:32 2010
@@ -111,6 +111,7 @@ class Connection : public sys::Connectio
     std::string getAuthCredentials();
     void notifyConnectionForced(const std::string& text);
     void setUserId(const string& uid);
+    void raiseConnectEvent();
     const std::string& getUserId() const { return ConnectionState::getUserId(); }
     const std::string& getMgmtId() const { return mgmtId; }
     management::ManagementAgent* getAgent() const { return agent; }

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp?rev=953147&r1=953146&r2=953147&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp Wed Jun  9 20:29:32 2010
@@ -165,6 +165,8 @@ void Connection::announce(
             connection->received(frame);
          connection->setUserId(username);
     }
+    // Raise the connection management event now that the connection is replicated.
+    connection->raiseConnectEvent();
 }
 
 Connection::~Connection() {

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/ClusterSafe.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/ClusterSafe.cpp?rev=953147&r1=953146&r2=953147&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/ClusterSafe.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/ClusterSafe.cpp Wed Jun  9 20:29:32 2010
@@ -32,8 +32,12 @@ bool inCluster = false;
 QPID_TSS bool inContext = false;
 }
 
+bool isClusterSafe() { return !inCluster || inContext; }
+
+bool isCluster() { return inCluster; }
+
 void assertClusterSafe()  {
-    if (inCluster && !inContext) {
+    if (!isClusterSafe()) {
         QPID_LOG(critical, "Modified cluster state outside of cluster context");
         ::abort();
     }

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/ClusterSafe.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/ClusterSafe.h?rev=953147&r1=953146&r2=953147&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/ClusterSafe.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/ClusterSafe.h Wed Jun  9 20:29:32 2010
@@ -42,6 +42,20 @@ namespace sys {
 QPID_COMMON_EXTERN void assertClusterSafe();
 
 /**
+ * In a non-clustered broker, returns true.
+ *
+ * In a clustered broker returns true if we are in a context where it
+ * is safe to modify cluster state.
+ *
+ * This function is in the common library rather than the cluster
+ * library because it is called by code in the broker library.
+ */
+QPID_COMMON_EXTERN bool isClusterSafe();
+
+/** Return true in a clustered broker */
+QPID_COMMON_EXTERN bool isCluster();
+
+/**
  * Base class for classes that encapsulate state which is replicated
  * to all members of a cluster. Acts as a marker for clustered state
  * and provides functions to assist detecting bugs in cluster
@@ -53,7 +67,8 @@ struct ClusterSafeScope {
 };
 
 /**
- * Enable cluster-safe assertions. By defaul they are no-ops.
+ * Enable cluster-safe assertions. By default they are no-ops.
+ * Called by cluster code.
  */
 void enableClusterSafe();
 

Modified: qpid/trunk/qpid/cpp/src/tests/cluster_tests.py
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/cluster_tests.py?rev=953147&r1=953146&r2=953147&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/cluster_tests.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/cluster_tests.py Wed Jun  9 20:29:32 2010
@@ -223,6 +223,7 @@ class LongTests(BrokerTest):
             """Start ordinary clients for a broker. Start one client per broker.
             Round-robin on a colllection of different clients."""
             cmds=[
+                ["qpid-tool", "localhost:%s"%(broker.port())],
                 ["qpid-perftest", "--count", 50000,
                  "--base-name", str(qpid.datatypes.uuid4()), "--port", broker.port()],
                 ["qpid-queue-stats", "-a", "localhost:%s" %(broker.port())],
@@ -234,14 +235,15 @@ class LongTests(BrokerTest):
             cmd = ["qpid-stat", "-b", "localhost:%s" %(broker.port())]
             mclients.append(ClientLoop(broker, cmd))
 
-        endtime = time.time() + self.duration()
+        duration = max(self.duration(), 5)
+        endtime = time.time() + duration
         alive = 0                       # First live cluster member
         for i in range(len(cluster)):
             start_clients(cluster[i], i)
         start_mclients(cluster[alive])
 
         while time.time() < endtime:
-            time.sleep(min(5,self.duration()/2))
+            time.sleep(min(5,duration/2))
             for b in cluster[alive:]: b.ready() # Check if a broker crashed.
             # Kill the first broker, expect the clients to fail. 
             for c in clients[alive] + mclients: c.expect_fail()



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:[email protected]

Reply via email to