Author: aconway Date: Tue Jun 22 13:29:52 2010 New Revision: 956882 URL: http://svn.apache.org/viewvc?rev=956882&view=rev Log: Fix cluster broker crashes when management is active.
Cluster brokers were exiting with errors "modified cluster state outside cluster context" and "confirmed < (50+0) but only sent < (49+0)". Fix was to: - delay completion of incoming update till update connection closes. - delay adding new connections to management until connection is announced. Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp qpid/trunk/qpid/cpp/src/qpid/sys/ClusterSafe.cpp qpid/trunk/qpid/cpp/src/tests/cluster_tests.py qpid/trunk/qpid/cpp/src/tests/run_long_cluster_tests qpid/trunk/qpid/cpp/src/tests/verify_cluster_objects qpid/trunk/qpid/cpp/xml/cluster.xml Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp?rev=956882&r1=956881&r2=956882&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp Tue Jun 22 13:29:52 2010 @@ -76,8 +76,14 @@ struct ConnectionTimeoutTask : public sy } }; -Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std::string& mgmtId_, - const qpid::sys::SecuritySettings& external, bool isLink_, uint64_t objectId, bool shadow_) : +Connection::Connection(ConnectionOutputHandler* out_, + Broker& broker_, const + std::string& mgmtId_, + const qpid::sys::SecuritySettings& external, + bool isLink_, + uint64_t objectId_, + bool shadow_, + bool delayManagement) : ConnectionState(out_, broker_), securitySettings(external), adapter(*this, isLink_, shadow_), @@ -89,26 +95,30 @@ Connection::Connection(ConnectionOutputH agent(0), timer(broker_.getTimer()), 
errorListener(0), + objectId(objectId_), shadow(shadow_) { - Manageable* parent = broker.GetVhostObject(); - if (isLink) links.notifyConnection(mgmtId, this); + // In a cluster, allow adding the management object to be delayed. + if (!delayManagement) addManagementObject(); + if (!isShadow()) broker.getConnectionCounter().inc_connectionCount(); +} - if (parent != 0) - { - agent = broker_.getManagementAgent(); - - // TODO set last bool true if system connection +void Connection::addManagementObject() { + assert(agent == 0); + assert(mgmtObject == 0); + Manageable* parent = broker.GetVhostObject(); + if (parent != 0) { + agent = broker.getManagementAgent(); if (agent != 0) { + // TODO set last bool true if system connection mgmtObject = new _qmf::Connection(agent, this, parent, mgmtId, !isLink, false); mgmtObject->set_shadow(shadow); agent->addObject(mgmtObject, objectId); } ConnectionState::setUrl(mgmtId); } - if (!isShadow()) broker.getConnectionCounter().inc_connectionCount(); } void Connection::requestIOProcessing(boost::function0<void> callback) Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h?rev=956882&r1=956881&r2=956882&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h Tue Jun 22 13:29:52 2010 @@ -79,9 +79,15 @@ class Connection : public sys::Connectio virtual void connectionError(const std::string&) = 0; }; - Connection(sys::ConnectionOutputHandler* out, Broker& broker, const std::string& mgmtId, + Connection(sys::ConnectionOutputHandler* out, + Broker& broker, + const std::string& mgmtId, const qpid::sys::SecuritySettings&, - bool isLink = false, uint64_t objectId = 0, bool shadow=false); + bool isLink = false, + uint64_t objectId = 0, + bool shadow=false, + bool delayManagement = false); + ~Connection (); /** 
Get the SessionHandler for channel. Create if it does not already exist */ @@ -139,6 +145,9 @@ class Connection : public sys::Connectio // Used by cluster to update connection status sys::AggregateOutput& getOutputTasks() { return outputTasks; } + /** Cluster delays adding management object in the constructor then calls this. */ + void addManagementObject(); + const qpid::sys::SecuritySettings& getExternalSecuritySettings() const { return securitySettings; @@ -166,6 +175,7 @@ class Connection : public sys::Connectio boost::intrusive_ptr<sys::TimerTask> heartbeatTimer; boost::intrusive_ptr<ConnectionTimeoutTask> timeoutTimer; ErrorListener* errorListener; + uint64_t objectId; bool shadow; public: Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=956882&r1=956881&r2=956882&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Tue Jun 22 13:29:52 2010 @@ -194,7 +194,7 @@ namespace _qmf = ::qmf::org::apache::qpi * Currently use SVN revision to avoid clashes with versions from * different branches. */ -const uint32_t Cluster::CLUSTER_VERSION = 904565; +const uint32_t Cluster::CLUSTER_VERSION = 956001; struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler { qpid::cluster::Cluster& cluster; @@ -269,6 +269,7 @@ Cluster::Cluster(const ClusterSettings& lastAliveCount(0), lastBroker(false), updateRetracted(false), + updateClosed(false), error(*this) { // We give ownership of the timer to the broker and keep a plain pointer. @@ -863,6 +864,14 @@ void Cluster::updateStart(const MemberId connectionSettings(settings))); } +// Called in network thread +void Cluster::updateInClosed() { + Lock l(lock); + assert(!updateClosed); + updateClosed = true; + checkUpdateIn(l); +} + // Called in update thread. 
void Cluster::updateInDone(const ClusterMap& m) { Lock l(lock); @@ -879,6 +888,7 @@ void Cluster::updateInRetracted() { void Cluster::checkUpdateIn(Lock& l) { if (state != UPDATEE) return; // Wait till we reach the stall point. + if (!updateClosed) return; // Wait till update connection closes. if (updatedMap) { // We're up to date map = *updatedMap; failoverExchange->setUrls(getUrls(l)); @@ -895,6 +905,7 @@ void Cluster::checkUpdateIn(Lock& l) { } else if (updateRetracted) { // Update was retracted, request another update updateRetracted = false; + updateClosed = false; state = JOINER; QPID_LOG(notice, *this << " update retracted, sending new update request."); mcast.mcastControl(ClusterUpdateRequestBody(ProtocolVersion(), myUrl.str()), self); Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=956882&r1=956881&r2=956882&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Tue Jun 22 13:29:52 2010 @@ -97,6 +97,7 @@ class Cluster : private Cpg::Handler, pu void leave(); // Update completed - called in update thread + void updateInClosed(); void updateInDone(const ClusterMap&); void updateInRetracted(); @@ -277,7 +278,7 @@ class Cluster : private Cpg::Handler, pu bool lastBroker; sys::Thread updateThread; boost::optional<ClusterMap> updatedMap; - bool updateRetracted; + bool updateRetracted, updateClosed; ErrorCheck error; UpdateReceiver updateReceiver; ClusterTimer* timer; Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp?rev=956882&r1=956881&r2=956882&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp (original) +++ 
qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp Tue Jun 22 13:29:52 2010 @@ -22,7 +22,6 @@ #include "UpdateClient.h" #include "Cluster.h" #include "UpdateReceiver.h" - #include "qpid/assert.h" #include "qpid/broker/SessionState.h" #include "qpid/broker/SemanticState.h" @@ -43,7 +42,6 @@ #include "qpid/framing/ConnectionCloseOkBody.h" #include "qpid/log/Statement.h" #include "qpid/management/ManagementAgent.h" - #include <boost/current_function.hpp> @@ -99,10 +97,9 @@ Connection::Connection(Cluster& c, sys:: { cluster.addLocalConnection(this); if (isLocalClient()) { - // Local clients are announced to the cluster - // and initialized when the announce is received. giveReadCredit(cluster.getSettings().readMax); // Flow control - init(); + // Delay adding the connection to the management map until announce() + connectionCtor.delayManagement = true; } else { // Catch-up shadow connections initialized using nextShadow id. @@ -110,9 +107,9 @@ Connection::Connection(Cluster& c, sys:: if (!updateIn.nextShadowMgmtId.empty()) connectionCtor.mgmtId = updateIn.nextShadowMgmtId; updateIn.nextShadowMgmtId.clear(); - init(); - } - QPID_LOG(info, "incoming connection " << *this); + } + init(); + QPID_LOG(debug, cluster << " local connection " << *this); } void Connection::setSecureConnection(broker::SecureConnection* sc) { @@ -152,8 +149,11 @@ void Connection::announce( QPID_ASSERT(ssf == connectionCtor.external.ssf); QPID_ASSERT(authid == connectionCtor.external.authid); QPID_ASSERT(nodict == connectionCtor.external.nodict); - // Local connections are already initialized. - if (isShadow()) { + // Local connections are already initialized but with management delayed. + if (isLocalClient()) { + connection->addManagementObject(); + } + else if (isShadow()) { init(); // Play initial frames into the connection. 
Buffer buf(const_cast<char*>(initialFrames.data()), initialFrames.size()); @@ -162,8 +162,9 @@ void Connection::announce( connection->received(frame); connection->setUserId(username); } - // Raise the connection management event now that the connection is replicated. + // Do managment actions now that the connection is replicated. connection->raiseConnectEvent(); + QPID_LOG(debug, cluster << " replicated connection " << *this); } Connection::~Connection() { @@ -249,6 +250,7 @@ void Connection::closed() { if (isUpdated()) { QPID_LOG(debug, cluster << " update connection closed " << *this); close(); + cluster.updateInClosed(); } else if (catchUp) { QPID_LOG(critical, cluster << " catch-up connection closed prematurely " << *this); @@ -259,7 +261,8 @@ void Connection::closed() { // closed and process any outstanding frames from the cluster // until self-delivery of deliver-close. output.closeOutput(); - cluster.getMulticast().mcastControl(ClusterConnectionDeliverCloseBody(), self); + cluster.getMulticast().mcastControl( + ClusterConnectionDeliverCloseBody(ProtocolVersion(), false), self); } } catch (const std::exception& e) { @@ -268,17 +271,21 @@ void Connection::closed() { } // Self-delivery of close message, close the connection. -void Connection::deliverClose () { - assert(!catchUp); - close(); +void Connection::deliverClose (bool aborted) { + QPID_LOG(debug, cluster << " replicated close of " << *this); + if (connection.get()) { + if (aborted) connection->abort(); + else connection->closed(); + connection.reset(); + } cluster.erase(self); } // Close the connection void Connection::close() { + QPID_LOG(debug, cluster << " local close of " << *this); if (connection.get()) { connection->closed(); - // Ensure we delete the broker::Connection in the deliver thread. connection.reset(); } } @@ -286,11 +293,9 @@ void Connection::close() { // The connection has been killed for misbehaving, called in connection thread. 
void Connection::abort() { if (connection.get()) { - connection->abort(); - // Ensure we delete the broker::Connection in the deliver thread. - connection.reset(); + cluster.getMulticast().mcastControl( + ClusterConnectionDeliverCloseBody(ProtocolVersion(), true), self); } - cluster.erase(self); } // ConnectionCodec::decode receives read buffers from directly-connected clients. Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h?rev=956882&r1=956881&r2=956882&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h Tue Jun 22 13:29:52 2010 @@ -170,7 +170,7 @@ class Connection : const std::string& initFrames); void close(); void abort(); - void deliverClose(); + void deliverClose(bool); OutputInterceptor& getOutput() { return output; } @@ -194,6 +194,7 @@ class Connection : bool isLink; uint64_t objectId; bool shadow; + bool delayManagement; ConnectionCtor( sys::ConnectionOutputHandler* out_, @@ -202,14 +203,19 @@ class Connection : const qpid::sys::SecuritySettings& external_, bool isLink_=false, uint64_t objectId_=0, - bool shadow_=false + bool shadow_=false, + bool delayManagement_=false ) : out(out_), broker(broker_), mgmtId(mgmtId_), external(external_), - isLink(isLink_), objectId(objectId_), shadow(shadow_) + isLink(isLink_), objectId(objectId_), shadow(shadow_), + delayManagement(delayManagement_) {} std::auto_ptr<broker::Connection> construct() { return std::auto_ptr<broker::Connection>( - new broker::Connection(out, broker, mgmtId, external, isLink, objectId, shadow)); + new broker::Connection( + out, broker, mgmtId, external, isLink, objectId, + shadow, delayManagement) + ); } }; Modified: qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp?rev=956882&r1=956881&r2=956882&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp Tue Jun 22 13:29:52 2010 @@ -2321,6 +2321,23 @@ void ManagementAgent::importAgents(qpid: } } +namespace { +bool isNotDeleted(const ManagementObjectMap::value_type& value) { + return !value.second->isDeleted(); +} + +size_t countNotDeleted(const ManagementObjectMap& map) { + return std::count_if(map.begin(), map.end(), isNotDeleted); +} + +void dumpMap(std::ostream& o, const ManagementObjectMap& map) { + for (ManagementObjectMap::const_iterator i = map.begin(); i != map.end(); ++i) { + if (!i->second->isDeleted()) + o << endl << " " << i->second->getObjectId().getV2Key(); + } +} +} // namespace + string ManagementAgent::debugSnapshot() { ostringstream msg; msg << " management snapshot:"; @@ -2328,8 +2345,8 @@ string ManagementAgent::debugSnapshot() i != remoteAgents.end(); ++i) msg << " " << i->second->routingKey; msg << " packages: " << packages.size(); - msg << " objects: " << managementObjects.size(); - msg << " new objects: " << newManagementObjects.size(); + msg << " objects: " << countNotDeleted(managementObjects); + msg << " new objects: " << countNotDeleted(newManagementObjects); return msg.str(); } Modified: qpid/trunk/qpid/cpp/src/qpid/sys/ClusterSafe.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/ClusterSafe.cpp?rev=956882&r1=956881&r2=956882&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/sys/ClusterSafe.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/sys/ClusterSafe.cpp Tue Jun 22 13:29:52 2010 @@ -43,8 +43,15 @@ void assertClusterSafe() { } } -ClusterSafeScope::ClusterSafeScope() { inContext = true; } 
-ClusterSafeScope::~ClusterSafeScope() { inContext = false; } +ClusterSafeScope::ClusterSafeScope() { + assert(!inContext); + inContext = true; +} + +ClusterSafeScope::~ClusterSafeScope() { + assert(inContext); + inContext = false; +} void enableClusterSafe() { inCluster = true; } Modified: qpid/trunk/qpid/cpp/src/tests/cluster_tests.py URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/cluster_tests.py?rev=956882&r1=956881&r2=956882&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/tests/cluster_tests.py (original) +++ qpid/trunk/qpid/cpp/src/tests/cluster_tests.py Tue Jun 22 13:29:52 2010 @@ -255,7 +255,7 @@ class LongTests(BrokerTest): StoppableThread.stop(self) # def test_management - args=["--mgmt-pub-interval", 1] # Publish management information every second. + args = ["--mgmt-pub-interval", 1] # Publish management information every second. # Use store if present. if BrokerTest.store_lib: args +=["--load-module", BrokerTest.store_lib] cluster = self.cluster(3, args) Modified: qpid/trunk/qpid/cpp/src/tests/run_long_cluster_tests URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/run_long_cluster_tests?rev=956882&r1=956881&r2=956882&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/tests/run_long_cluster_tests (original) +++ qpid/trunk/qpid/cpp/src/tests/run_long_cluster_tests Tue Jun 22 13:29:52 2010 @@ -20,5 +20,5 @@ # srcdir=`dirname $0` -$srcdir/run_cluster_tests 'cluster_tests.LongTests.*' -DDURATION=2 +$srcdir/run_cluster_tests 'cluster_tests.LongTests.*' -DDURATION=4 Modified: qpid/trunk/qpid/cpp/src/tests/verify_cluster_objects URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/verify_cluster_objects?rev=956882&r1=956881&r2=956882&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/tests/verify_cluster_objects 
(original) +++ qpid/trunk/qpid/cpp/src/tests/verify_cluster_objects Tue Jun 22 13:29:52 2010 @@ -1,6 +1,5 @@ #!/usr/bin/env python -# # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information @@ -19,390 +18,83 @@ # under the License. # -import os -import getopt -import sys -import locale -import socket -import re -from qmf.console import Session, SchemaClass - -_host = "localhost" -_connTimeout = 10 -_verbose = 0 -_del_test = False; -pattern = re.compile("^\\d+\\.\\d+\\.\\d+\\.\\d+:\\d+$") -_debug_recursion = 0 - -def Usage (): - print "Usage: verify_cluster_objects [OPTIONS] [broker-addr]" - print - print " broker-addr is in the form: [username/passw...@] hostname | ip-address [:<port>]" - print " ex: localhost, 10.1.1.7:10000, broker-host:10000, guest/gu...@localhost" - print - print " This program contacts every node of a cluster, loads all manageable objects from" - print " those nodes and verifies that the management data is identical across the clusters." 
- print - print "Options:" - print " --timeout seconds (10) Maximum time to wait for broker connection" - print " --verbose level (0) Show details of objects and their IDs" - print " --delete Delete some objects after creation, to test synchup" - print - sys.exit (1) - -class IpAddr: - def __init__(self, text): - if text.find("@") != -1: - tokens = text.split("@") - text = tokens[1] - if text.find(":") != -1: - tokens = text.split(":") - text = tokens[0] - self.port = int(tokens[1]) - else: - self.port = 5672 - self.dottedQuad = socket.gethostbyname(text) - nums = self.dottedQuad.split(".") - self.addr = (int(nums[0]) << 24) + (int(nums[1]) << 16) + (int(nums[2]) << 8) + int(nums[3]) - - def bestAddr(self, addrPortList): - bestDiff = 0xFFFFFFFFL - bestAddr = None - for addrPort in addrPortList: - diff = IpAddr(addrPort[0]).addr ^ self.addr - if diff < bestDiff: - bestDiff = diff - bestAddr = addrPort - return bestAddr - -class ObjectId: - """Object identity, use for dictionaries by object id""" - def __init__(self, object): self.object = object - def __eq__(self, other): return self.object is other.object - def __hash__(self): return hash(id(self.object)) - -class Broker(object): - def __init__(self, qmf, broker): - self.broker = broker - self.qmf = qmf +# Verify managment objects are consistent in a cluster. +# Arguments: url of one broker in the cluster. 
- agents = qmf.getAgents() - for a in agents: - if a.getAgentBank() == '0': - self.brokerAgent = a - - bobj = qmf.getObjects(_class="broker", _package="org.apache.qpid.broker", - _agent=self.brokerAgent)[0] - self.currentTime = bobj.getTimestamps()[0] - try: - self.uptime = bobj.uptime - except: - self.uptime = 0 - self.tablesByName = {} - self.package = "org.apache.qpid.broker" - self.id_cache = {} # Cache for getAbstractId - - def getUrl(self): - return self.broker.getUrl() - - def getData(self): - if _verbose > 1: - print "Broker:", self.broker - - classList = self.qmf.getClasses(self.package) - for cls in classList: - if self.qmf.getSchema(cls).kind == SchemaClass.CLASS_KIND_TABLE: - self.loadTable(cls) - - - # - # this should be a method on an object, but is kept here for now, until - # we finish sorting out the treatment of names in qmfv2 - # - def getAbstractId(self, object): - """ return a string the of the hierarchical name """ - if (ObjectId(object) in self.id_cache): return self.id_cache[ObjectId(object)] - global _debug_recursion - result = u"" - valstr = u"" - _debug_recursion += 1 - debug_prefix = _debug_recursion - if (_verbose > 9): - print debug_prefix, " enter gai: props ", object._properties - for property, value in object._properties: - - # we want to recurse on things which are refs. we tell by - # asking each property if it's an index. I think... - if (_verbose > 9): - print debug_prefix, " prop ", property, " val " , value, " idx ", - property.index, " type ", property.type - - # property is an instance, you can ask its type, name, etc. - - # special case system refs, as they will never be the same on - # distinct cluster nodes. later we probably want a different - # way of representing these objects, like for instance don't - # include the system ref in the hierarchy. 
- - if property.name == "systemRef": - _debug_recursion -= 1 - self.id_cache[ObjectId(object)] = "" - return "" - - if property.index: - if result != u"": - result += u":" - if property.type == 10: - try: - recursive_objects = object._session.getObjects(_objectId = value, _broker=object._broker) - if (_verbose > 9): - print debug_prefix, " r ", recursive_objects[0] - for rp, rv in recursive_objects[0]._properties: - print debug_prefix, " rrr ", rp, " idx-p ", rp.index, " v ", rv - print debug_prefix, " recursing on ", recursive_objects[0] - valstr = self.getAbstractId(recursive_objects[0]) - if (_verbose > 9): - print debug_prefix, " recursing on ", recursive_objects[0], - " -> ", valstr - except Exception, e: - if (_verbose > 9): - print debug_prefix, " except ", e - valstr = u"<undecodable>" - else: - # this yields UUID-blah. not good. try something else - # valstr = value.__repr__() - # print debug_prefix, " val ", value - - # yetch. this needs to be abstracted someplace? I don't - # think we have the infrastructure we need to make these id - # strings be sensible in the general case - if property.name == "systemId": - # special case. try to do something sensible about systemref objects - valstr = object.nodeName - else: - valstr = value.__repr__() # I think... - result += valstr - if (_verbose > 9): - print debug_prefix, " id ", self, " -> ", result - _debug_recursion -= 1 - self.id_cache[ObjectId(object)] = result - return result - - def loadTable(self, cls): - if _verbose > 1: - print " Class:", cls.getClassName() - list = self.qmf.getObjects(_class=cls.getClassName(), - _package=cls.getPackageName(), - _agent=self.brokerAgent) - - # tables-by-name maps class name to a table by object-name of - # objects. ie use the class name ("broker", "queue", etc) to - # index tables-by-name, returning a second table, use the - # object name to index that to get an object. 
- - self.tablesByName[cls.getClassName()] = {} - for obj in list: - # make sure we aren't colliding on name. it's an internal - # error (ie, the name-generation code is busted) if we do - key = self.getAbstractId(obj) - if key in self.tablesByName[cls.getClassName()]: - raise Exception("internal error: collision for %s on key %s\n" - % (obj, key)) - - self.tablesByName[cls.getClassName()][key] = obj - if _verbose > 1: - print " ", obj.getObjectId(), " ", obj.getIndex(), " ", key +import qmf.console, sys, re +class Session(qmf.console.Session): + """A qmf.console.Session that caches useful values""" -class BrokerManager: def __init__(self): - self.brokerName = None - self.qmf = None - self.broker = None - self.brokers = [] - self.cluster = None - - def SetBroker(self, brokerUrl): - self.url = brokerUrl - self.qmf = Session() - self.broker = self.qmf.addBroker(brokerUrl, _connTimeout) - agents = self.qmf.getAgents() - for a in agents: - if a.getAgentBank() == '0': - self.brokerAgent = a - - def Disconnect(self): - if self.broker: - self.qmf.delBroker(self.broker) - - def _getCluster(self): - packages = self.qmf.getPackages() - if "org.apache.qpid.cluster" not in packages: - return None - - clusters = self.qmf.getObjects(_class="cluster", _agent=self.brokerAgent) - if len(clusters) == 0: - print "Clustering is installed but not enabled on the broker." - return None - - self.cluster = clusters[0] - - def _getHostList(self, urlList): - hosts = [] - hostAddr = IpAddr(_host) - for url in urlList: - if url.find("amqp:") != 0: - raise Exception("Invalid URL 1") - url = url[5:] - addrs = str(url).split(",") - addrList = [] - for addr in addrs: - tokens = addr.split(":") - if len(tokens) != 3: - raise Exception("Invalid URL 2") - addrList.append((tokens[1], tokens[2])) - - # Find the address in the list that is most likely to be - # in the same subnet as the address with which we made the - # original QMF connection. 
This increases the probability - # that we will be able to reach the cluster member. - - best = hostAddr.bestAddr(addrList) - bestUrl = best[0] + ":" + best[1] - hosts.append(bestUrl) - return hosts - - - # the main fun which tests for broker state "identity". now that - # we're using qmf2 style object names across the board, that test - # means that we are ensuring that for all objects of a given - # class, an object of that class with the same object name exists - # on the peer broker. - - def verify(self): - if _verbose > 0: - print "Connecting to the cluster..." - self._getCluster() - if self.cluster: - memberList = self.cluster.members.split(";") - hostList = self._getHostList(memberList) - self.qmf.delBroker(self.broker) - self.broker = None - for host in hostList: - b = self.qmf.addBroker(host, _connTimeout) - self.brokers.append(Broker(self.qmf, b)) - if _verbose > 0: - print " ", b - else: - raise Exception("Failed - Not a cluster") - - failures = [] - - # Wait until connections to all nodes are established before - # loading the management data. This will ensure that the - # objects are all stable and the same. - if _verbose > 0: - print "Loading management data from nodes..." - for broker in self.brokers: - broker.getData() - - # If we're testing delete-some-objects functionality, create a - # few widgets here and then delete them. 
- if _del_test: - if _verbose > 0: - print "Running delete test" - # just stick 'em in the first broker - b = self.brokers[0] - session = b.qmf.brokers[0].getAmqpSession() - session.queue_declare(queue="foo", exclusive=True, auto_delete=True) - session.exchange_bind(exchange="amq.direct", - queue="foo", binding_key="foo") - session.queue_declare(queue="bar", exclusive=True, auto_delete=True) - session.exchange_bind(exchange="amq.direct", - queue="bar", binding_key="bar") - # now delete 'em - session.exchange_unbind(queue="foo", exchange="amq.direct", binding_key="foo") - session.exchange_unbind(queue="bar", exchange="amq.direct", binding_key="bar") - session.queue_delete("bar") - session.queue_delete("foo") - - # Verify that each node has the same set of objects (based on - # object name). - if _verbose > 0: - print "Verifying objects based on object name..." - base = self.brokers[0] - for broker in self.brokers[1:]: - - # walk over the class names, for each class (with some - # exceptions) walk over the objects of that class, making - # sure they match between broker A and broker B - - for className in base.tablesByName: - if className in ["broker", "system", "connection"]: - continue - - tab1 = base.tablesByName[className] - tab2 = broker.tablesByName[className] - - for key in tab1: - if key not in tab2: - failures.append("%s key %s not found on node %s" % - (className, key, broker.getUrl())) - for key in tab2: - if key not in tab1: - failures.append("%s key %s not found on node %s" % - (className, key, base.getUrl())) - - if len(failures) > 0: - print "Failures:" - for failure in failures: - print " %s" % failure - raise Exception("Failures") - - if _verbose > 0: - print "Success" - -## -## Main Program -## - -try: - longOpts = ("verbose=", "timeout=", "delete") - (optlist, encArgs) = getopt.gnu_getopt(sys.argv[1:], "", longOpts) -except: - Usage() - -try: - encoding = locale.getpreferredencoding() - cargs = [a.decode(encoding) for a in encArgs] -except: - cargs 
= encArgs - -for opt in optlist: - if opt[0] == "--timeout": - _connTimeout = int(opt[1]) - if _connTimeout == 0: - _connTimeout = None - elif opt[0] == "--verbose": - _verbose = int(opt[1]) - elif opt[0] == "--delete": - _del_test = True; - else: - Usage() - -nargs = len(cargs) -bm = BrokerManager() - -if nargs == 1: - _host = cargs[0] - -try: - bm.SetBroker(_host) - bm.verify() -except KeyboardInterrupt: - print -except Exception,e: - print "Failed: %s - %s" % (e.__class__.__name__, e) - sys.exit(1) + qmf.console.Session.__init__(self) + self.classes = None + + def all_classes(self): + if self.classes is None: + self.classes = [c for p in self.getPackages() for c in self.getClasses(p)] + return self.classes + +class Broker: + def __init__(self, url, qmf): + self.url = url + self.qmf = qmf + self.broker = self.qmf.addBroker(url) + self.broker._waitForStable() + self.objects = None + self.ignore_list = [ re.compile("org.apache.qpid.broker:system:") ] + + def get_objects(self): + def ignore(name): + for m in (m for m in self.ignore_list if m.match(name)): + return True + if self.objects is None: + obj_list = [] + for c in self.qmf.all_classes(): + for o in self.qmf.getObjects(_key=c, _broker=self.broker): + name=o.getObjectId().getObject() + if not ignore(name): obj_list.append(name) + self.objects = set(obj_list) + if (len(obj_list) != len(self.objects)): + raise Exception("Duplicates in object list for %s"%(self.url)) + return self.objects + + def compare(self,other): + def compare1(x,y): + diff = x.get_objects() - y.get_objects() + if diff: + print "ERROR: found on %s but not %s"%(x, y) + for o in diff: print " %s"%(o) + return False + return True + + so = compare1(self, other) + os = compare1(other, self) + return so and os + + def __str__(self): return self.url + + def get_cluster(self): + """Given one Broker, return list of all brokers in its cluster""" + clusters = self.qmf.getObjects(_class="cluster") + if not clusters: raise ("%s is not a cluster 
member"%(self.url)) + def first_address(url): + """Python doesn't understand the brokers URL syntax. Extract a simple addres""" + return re.compile("amqp:tcp:([^,]*)").match(url).group(1) + return [Broker(first_address(url), self.qmf) for url in clusters[0].members.split(";")] + + def __del__(self): self.qmf.delBroker(self.broker) + +def main(argv=None): + if argv is None: argv = sys.argv + qmf = Session() + brokers = Broker(argv[1], qmf).get_cluster() + base = brokers.pop(0) + result = 0 + for b in brokers: + if not base.compare(b): result = 1 + del base + del brokers + return result -bm.Disconnect() +if __name__ == "__main__": sys.exit(main()) Modified: qpid/trunk/qpid/cpp/xml/cluster.xml URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/xml/cluster.xml?rev=956882&r1=956881&r2=956882&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/xml/cluster.xml (original) +++ qpid/trunk/qpid/cpp/xml/cluster.xml Tue Jun 22 13:29:52 2010 @@ -134,7 +134,9 @@ </control> <!-- Marks the cluster-wide point when a connection is considered closed. --> - <control name="deliver-close" code="0x2"/> + <control name="deliver-close" code="0x2"> + <field name="aborted" type="bit"/> + </control> <!-- Permission to generate output up to the limit. --> <control name="deliver-do-output" code="0x3"> --------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:commits-subscr...@qpid.apache.org