This commit broke the Java Build by altering the management-schema.xml which is used commonly between the two brokers to generate QMF classes.
Moreover, prior to this commit there was no JIRA or discussion on the list about a possible change... nor does the commit itself do much to explain what the change means (although since it relates to clusters, I can take a fair stab at guessing the answer to "is a connection on the Java Broker a shadow connection?" is false.) As I discussed in e-mails at the beginning of the year, I think we should be aiming at keeping the broker functionality as close a possible between the two brokers. This requires all of us to discuss upcoming proposed changes to things like management-schema.xml before we commit them so that both communities can have there say, and we can co-ordinate changes so neither community breaks the other's build. The schema as-is is significantly deficient for the Java Broker since it is designed around the limitations of the C++ broker wrt Virtual Hosts (there can be only one) and transports/ports (similarly restricted). However, while I would like to rectify this before our next release... I will be raising a JIRA and expect to discuss the impact and co-ordinate timing through the list... That is unless people would rather I just commit the change... and let the C++ guys work out how to fix their build after I break it ;-) BTW thanks to Rajith for putting in an emergency fix to the Java codebase to make it compile again... -- Rob ---------- Forwarded message ---------- From: <acon...@apache.org> Date: 6 February 2010 00:02 Subject: svn commit: r907123 - in /qpid/trunk/qpid: cpp/src/qpid/broker/ cpp/src/qpid/cluster/ cpp/xml/ python/commands/ specs/ To: comm...@qpid.apache.org Author: aconway Date: Fri Feb 5 23:02:45 2010 New Revision: 907123 URL: http://svn.apache.org/viewvc?rev=907123&view=rev Log: Consistent connection names across a cluster. - use the same host:port for connections and their shadows. - add shadow property to managment connection to identify shadows. - updated qpid-stat and qpid-cluster to filter on shadow property. Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateReceiver.h qpid/trunk/qpid/cpp/xml/cluster.xml qpid/trunk/qpid/python/commands/qpid-cluster qpid/trunk/qpid/python/commands/qpid-stat qpid/trunk/qpid/specs/management-schema.xml Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp?rev=907123&r1=907122&r2=907123&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp Fri Feb 5 23:02:45 2010 @@ -72,7 +72,7 @@ } }; -Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std::string& mgmtId_, unsigned int ssf, bool isLink_, uint64_t objectId) : +Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std::string& mgmtId_, unsigned int ssf, bool isLink_, uint64_t objectId, bool shadow_) : ConnectionState(out_, broker_), ssf(ssf), adapter(*this, isLink_), @@ -84,7 +84,7 @@ agent(0), timer(broker_.getTimer()), errorListener(0), - shadow(false) + shadow(shadow_) { Manageable* parent = broker.GetVhostObject(); @@ -95,10 +95,10 @@ { agent = broker_.getManagementAgent(); - // TODO set last bool true if system connection if (agent != 0) { mgmtObject = new _qmf::Connection(agent, this, parent, mgmtId, !isLink, false); + mgmtObject->set_shadow(shadow); agent->addObject(mgmtObject, objectId, true); } ConnectionState::setUrl(mgmtId); Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h?rev=907123&r1=907122&r2=907123&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h Fri Feb 5 23:02:45 2010 @@ -79,7 +79,7 @@ }; Connection(sys::ConnectionOutputHandler* out, Broker& broker, const std::string& mgmtId, unsigned int ssf, - bool isLink = false, uint64_t objectId = 0); + bool isLink = false, uint64_t objectId = 0, bool shadow=false); ~Connection (); /** Get the SessionHandler for channel. Create if it does not already exist */ @@ -132,8 +132,6 @@ /** True if this is a shadow connection in a cluster. */ bool isShadow() { return shadow; } - /** Called by cluster to mark shadow connections */ - void setShadow() { shadow = true; } // Used by cluster to update connection status sys::AggregateOutput& getOutputTasks() { return outputTasks; } Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=907123&r1=907122&r2=907123&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Fri Feb 5 23:02:45 2010 @@ -509,10 +509,8 @@ assert(cp); } else { // New remote connection, create a shadow. - std::ostringstream mgmtId; unsigned int ssf = (announce && announce->hasSsf()) ? announce->getSsf() : 0; - mgmtId << id; - cp = new Connection(*this, shadowOut, mgmtId.str(), id, ssf); + cp = new Connection(*this, shadowOut, announce->getManagementId(), id, ssf); } connections.insert(ConnectionMap::value_type(id, cp)); } Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp?rev=907123&r1=907122&r2=907123&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp Fri Feb 5 23:02:45 2010 @@ -23,6 +23,7 @@ #include "Cluster.h" #include "UpdateReceiver.h" +#include "qpid/assert.h" #include "qpid/broker/SessionState.h" #include "qpid/broker/SemanticState.h" #include "qpid/broker/TxBuffer.h" @@ -74,28 +75,30 @@ // Shadow connection -Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, const std::string& logId, +Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, + const std::string& mgmtId, const ConnectionId& id, unsigned int ssf) : cluster(c), self(id), catchUp(false), output(*this, out), - connectionCtor(&output, cluster.getBroker(), shadowPrefix+logId, ssf), + connectionCtor(&output, cluster.getBroker(), mgmtId, ssf, false, 0, true), expectProtocolHeader(false), mcastFrameHandler(cluster.getMulticast(), self), - consumerNumbering(c.getUpdateReceiver().consumerNumbering) + updateIn(c.getUpdateReceiver()) {} // Local connection Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, - const std::string& logId, MemberId member, + const std::string& mgmtId, MemberId member, bool isCatchUp, bool isLink, unsigned int ssf ) : cluster(c), self(member, ++idCounter), catchUp(isCatchUp), output(*this, out), connectionCtor(&output, cluster.getBroker(), - isCatchUp ? shadowPrefix+logId : logId, + mgmtId, ssf, isLink, - isCatchUp ? ++catchUpId : 0), + isCatchUp ? ++catchUpId : 0, + isCatchUp), // isCatchUp => shadow expectProtocolHeader(isLink), mcastFrameHandler(cluster.getMulticast(), self), - consumerNumbering(c.getUpdateReceiver().consumerNumbering) + updateIn(c.getUpdateReceiver()) { cluster.addLocalConnection(this); if (isLocalClient()) { @@ -104,12 +107,14 @@ QPID_LOG(info, "new client connection " << *this); giveReadCredit(cluster.getSettings().readMax); cluster.getMulticast().mcastControl( - ClusterConnectionAnnounceBody(ProtocolVersion(), getSsf()), getId()); + ClusterConnectionAnnounceBody(ProtocolVersion(), mgmtId, getSsf()), getId()); } else { - // Catch-up connections initialized immediately. + // Catch-up shadow connections initialized using nextShadow id. assert(catchUp); QPID_LOG(info, "new catch-up connection " << *this); + connectionCtor.mgmtId = updateIn.nextShadowMgmtId; + updateIn.nextShadowMgmtId.clear(); init(); } } @@ -127,7 +132,6 @@ connection->setClusterOrderOutput(nullFrameHandler); // Disable client throttling, done by active node. connection->setClientThrottling(false); - connection->setShadow(); // Mark the connection as a shadow. } if (!isCatchUp()) connection->setErrorListener(this); @@ -138,8 +142,9 @@ output.giveReadCredit(credit); } -void Connection::announce(uint32_t ssf) { - assert(ssf == connectionCtor.ssf); +void Connection::announce(const std::string& mgmtId, uint32_t ssf) { + QPID_ASSERT(mgmtId == connectionCtor.mgmtId); + QPID_ASSERT(ssf == connectionCtor.ssf); init(); } @@ -296,13 +301,17 @@ return sessionState().getSemanticState(); } +void Connection::shadowPrepare(const std::string& mgmtId) { + updateIn.nextShadowMgmtId = mgmtId; +} + void Connection::consumerState(const string& name, bool blocked, bool notifyEnabled, const SequenceNumber& position) { broker::SemanticState::ConsumerImpl& c = semanticState().find(name); c.position = position; c.setBlocked(blocked); if (notifyEnabled) c.enableNotify(); else c.disableNotify(); - consumerNumbering.add(c.shared_from_this()); + updateIn.consumerNumbering.add(c.shared_from_this()); } @@ -337,10 +346,15 @@ OutputTask* task = &session->getSemanticState().find(name); connection->getOutputTasks().addOutputTask(task); } - -void Connection::shadowReady(uint64_t memberId, uint64_t connectionId, const string& username, const string& fragment, uint32_t sendMax) { + +void Connection::shadowReady( + uint64_t memberId, uint64_t connectionId, const string& mgmtId, + const string& username, const string& fragment, uint32_t sendMax) +{ + QPID_ASSERT(mgmtId == getBrokerConnection().getMgmtId()); ConnectionId shadowId = ConnectionId(memberId, connectionId); - QPID_LOG(debug, cluster << " catch-up connection " << *this << " becomes shadow " << shadowId); + QPID_LOG(debug, cluster << " catch-up connection " << *this + << " becomes shadow " << shadowId); self = shadowId; connection->setUserId(username); // OK to use decoder here because cluster is stalled for update. @@ -355,7 +369,7 @@ { QPID_LOG(debug, cluster << " incoming update complete on connection " << *this); cluster.updateInDone(ClusterMap(joiners, members, frameSeq, configSeq)); - consumerNumbering.clear(); + updateIn.consumerNumbering.clear(); self.second = 0; // Mark this as completed update connection. } @@ -503,9 +517,9 @@ } void Connection::addQueueListener(const std::string& q, uint32_t listener) { - if (listener >= consumerNumbering.size()) + if (listener >= updateIn.consumerNumbering.size()) throw Exception(QPID_MSG("Invalid listener ID: " << listener)); - findQueue(q)->getListeners().addListener(consumerNumbering[listener]); + findQueue(q)->getListeners().addListener(updateIn.consumerNumbering[listener]); } void Connection::managementSchema(const std::string& data) { Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h?rev=907123&r1=907122&r2=907123&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h Fri Feb 5 23:02:45 2010 @@ -65,10 +65,10 @@ public: /** Local connection. */ - Connection(Cluster&, sys::ConnectionOutputHandler& out, const std::string& logId, MemberId, bool catchUp, bool isLink, + Connection(Cluster&, sys::ConnectionOutputHandler& out, const std::string& mgmtId, MemberId, bool catchUp, bool isLink, unsigned int ssf); /** Shadow connection. */ - Connection(Cluster&, sys::ConnectionOutputHandler& out, const std::string& logId, const ConnectionId& id, + Connection(Cluster&, sys::ConnectionOutputHandler& out, const std::string& mgmtId, const ConnectionId& id, unsigned int ssf); ~Connection(); @@ -109,6 +109,8 @@ // ==== Used in catch-up mode to build initial state. // // State update methods. + void shadowPrepare(const std::string&); + void sessionState(const framing::SequenceNumber& replayStart, const framing::SequenceNumber& sendCommandPoint, const framing::SequenceSet& sentIncomplete, @@ -119,7 +121,12 @@ void outputTask(uint16_t channel, const std::string& name); - void shadowReady(uint64_t memberId, uint64_t connectionId, const std::string& username, const std::string& fragment, uint32_t sendMax); + void shadowReady(uint64_t memberId, + uint64_t connectionId, + const std::string& managementId, + const std::string& username, + const std::string& fragment, + uint32_t sendMax); void membership(const framing::FieldTable&, const framing::FieldTable&, const framing::SequenceNumber& frameSeq, @@ -156,7 +163,7 @@ void exchange(const std::string& encoded); void giveReadCredit(int credit); - void announce(uint32_t ssf); + void announce(const std::string& mgmtId, uint32_t ssf); void abort(); void deliverClose(); @@ -182,6 +189,7 @@ unsigned int ssf; bool isLink; uint64_t objectId; + bool shadow; ConnectionCtor( sys::ConnectionOutputHandler* out_, @@ -189,12 +197,15 @@ const std::string& mgmtId_, unsigned int ssf_, bool isLink_=false, - uint64_t objectId_=0 - ) : out(out_), broker(broker_), mgmtId(mgmtId_), ssf(ssf_), isLink(isLink_), objectId(objectId_) {} + uint64_t objectId_=0, + bool shadow_=false + ) : out(out_), broker(broker_), mgmtId(mgmtId_), ssf(ssf_), + isLink(isLink_), objectId(objectId_), shadow(shadow_) + {} std::auto_ptr<broker::Connection> construct() { return std::auto_ptr<broker::Connection>( - new broker::Connection(out, broker, mgmtId, ssf, isLink, objectId)); + new broker::Connection(out, broker, mgmtId, ssf, isLink, objectId, shadow)); } }; @@ -225,7 +236,7 @@ boost::shared_ptr<broker::TxBuffer> txBuffer; bool expectProtocolHeader; McastFrameHandler mcastFrameHandler; - UpdateReceiver::ConsumerNumbering& consumerNumbering; + UpdateReceiver& updateIn; static qpid::sys::AtomicValue<uint64_t> catchUpId; Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp?rev=907123&r1=907122&r2=907123&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp Fri Feb 5 23:02:45 2010 @@ -57,6 +57,7 @@ #include <boost/bind.hpp> #include <boost/cast.hpp> #include <algorithm> +#include <sstream> namespace qpid { namespace cluster { @@ -148,7 +149,7 @@ ClusterConnectionProxy(session).expiryId(expiry.getId()); updateManagementAgent(); - + ClusterConnectionMembershipBody membership; map.toMethodBody(membership); AMQFrame frame(membership); @@ -328,6 +329,14 @@ void UpdateClient::updateConnection(const boost::intrusive_ptr<Connection>& updateConnection) { QPID_LOG(debug, updaterId << " updating connection " << *updateConnection); + + // Send the management ID first on the main connection. + std::string mgmtId = updateConnection->getBrokerConnection().getMgmtId(); + ClusterConnectionProxy(session).shadowPrepare(mgmtId); + // Make sure its received before opening shadow connection + session.sync(); + + // Open shadow connection and update it. shadowConnection = catchUpConnection(); broker::Connection& bc = updateConnection->getBrokerConnection(); @@ -341,6 +350,7 @@ ClusterConnectionProxy(shadowConnection).shadowReady( updateConnection->getId().getMember(), updateConnection->getId().getNumber(), + bc.getMgmtId(), bc.getUserId(), string(fragment.first, fragment.second), updateConnection->getOutput().getSendMax() Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateReceiver.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateReceiver.h?rev=907123&r1=907122&r2=907123&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateReceiver.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateReceiver.h Fri Feb 5 23:02:45 2010 @@ -36,6 +36,9 @@ /** Numbering used to identify Queue listeners as consumers */ typedef Numbering<boost::shared_ptr<broker::SemanticState::ConsumerImpl> > ConsumerNumbering; ConsumerNumbering consumerNumbering; + + /** Management-id for the next shadow connection */ + std::string nextShadowMgmtId; }; }} // namespace qpid::cluster Modified: qpid/trunk/qpid/cpp/xml/cluster.xml URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/xml/cluster.xml?rev=907123&r1=907122&r2=907123&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/xml/cluster.xml (original) +++ qpid/trunk/qpid/cpp/xml/cluster.xml Fri Feb 5 23:02:45 2010 @@ -117,6 +117,7 @@ <!-- Announce a new connection --> <control name="announce" code="0x1"> + <field name="management-id" type="str16"/> <!-- Security Strength Factor (ssf): if the transport provides encryption (e.g. ssl), ssf is the bit length of the key. Zero if no encryption provided. --> @@ -135,13 +136,18 @@ <control name="abort" code="0x4"/> <!-- Update controls. Sent to a new broker in joining mode. - A connection is updateed as followed: - - open as a normal connection. + A connection is updated as followed: + - send the shadow's management ID in shadow-perpare on the update connection + - open the shadow as a normal connection. - attach sessions, create consumers, set flow with normal AMQP cokmmands. - send /reset additional session state with controls below. - send shadow-ready to mark end of shadow update. - send membership when entire update is complete. --> + <!-- Prepare to send a shadow connection with the given ID. --> + <control name="shadow-prepare" code="0x0F"> + <field name="management-id" type="str16"/> + </control> <!-- Consumer state that cannot be set by standard AMQP controls. --> <control name="consumer-state" code="0x10"> @@ -202,6 +208,7 @@ <control name="shadow-ready" code="0x20" label="End of shadow connection update."> <field name="member-id" type="uint64"/> <field name="connection-id" type="uint64"/> + <field name="management-id" type="str16"/> <field name="user-name" type="str8"/> <field name="fragment" type="str32"/> <field name="send-max" type="uint32"/> Modified: qpid/trunk/qpid/python/commands/qpid-cluster URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/commands/qpid-cluster?rev=907123&r1=907122&r2=907123&view=diff ============================================================================== --- qpid/trunk/qpid/python/commands/qpid-cluster (original) +++ qpid/trunk/qpid/python/commands/qpid-cluster Fri Feb 5 23:02:45 2010 @@ -193,7 +193,6 @@ self.qmf.delBroker(self.broker) self.broker = None self.brokers = [] - pattern = re.compile("^\\d+\\.\\d+\\.\\d+\\.\\d+:\\d+$") idx = 0 for host in hostList: @@ -209,7 +208,7 @@ print "Clients on Member: ID=%s:" % displayList[idx] connList = self.qmf.getObjects(_class="connection", _package="org.apache.qpid.broker", _broker=broker) for conn in connList: - if pattern.match(conn.address): + if not conn.shadow: if self.config._numeric or self.config._delConn: a = conn.address else: Modified: qpid/trunk/qpid/python/commands/qpid-stat URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/commands/qpid-stat?rev=907123&r1=907122&r2=907123&view=diff ============================================================================== --- qpid/trunk/qpid/python/commands/qpid-stat (original) +++ qpid/trunk/qpid/python/commands/qpid-stat Fri Feb 5 23:02:45 2010 @@ -34,7 +34,6 @@ _limit = 50 _increasing = False _sortcol = None -pattern = re.compile("^\\d+\\.\\d+\\.\\d+\\.\\d+:\\d+$") def Usage (): print "Usage: qpid-stat [OPTIONS] [broker-addr]" @@ -108,7 +107,7 @@ list = qmf.getObjects(_class="connection", _package=package, _agent=self.brokerAgent) for conn in list: - if pattern.match(conn.address): + if not conn.shadow: self.connections[conn.getObjectId()] = conn list = qmf.getObjects(_class="session", _package=package, _agent=self.brokerAgent) Modified: qpid/trunk/qpid/specs/management-schema.xml URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/specs/management-schema.xml?rev=907123&r1=907122&r2=907123&view=diff ============================================================================== --- qpid/trunk/qpid/specs/management-schema.xml (original) +++ qpid/trunk/qpid/specs/management-schema.xml Fri Feb 5 23:02:45 2010 @@ -236,6 +236,7 @@ <property name="remoteProcessName" type="sstr" access="RO" optional="y" desc="Name of executable running as remote client"/> <property name="remotePid" type="uint32" access="RO" optional="y" desc="Process ID of remote client"/> <property name="remoteParentPid" type="uint32" access="RO" optional="y" desc="Parent Process ID of remote client"/> + <property name="shadow" type="bool" access="RO" desc="True for shadow connections"/> <statistic name="closing" type="bool" desc="This client is closing by management request"/> <statistic name="framesFromClient" type="count64"/> <statistic name="framesToClient" type="count64"/> --------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:commits-subscr...@qpid.apache.org --------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:dev-subscr...@qpid.apache.org