Author: aconway Date: Mon Apr 29 15:57:59 2013 New Revision: 1477165 URL: http://svn.apache.org/r1477165 Log: QPID-4787: HA brokers find self-address in brokers_url.
HA brokers need to know their own addresses, but it is not safe to simply use local hosts name and Broker::getPort() since the broker may be listening on multiple addresses. The solution is to have brokers check the ha-rokers-url for their own address while doing the initial status check of the cluster. Modified: qpid/trunk/qpid/cpp/src/qpid/ha/BrokerInfo.cpp qpid/trunk/qpid/cpp/src/qpid/ha/ConnectionObserver.cpp qpid/trunk/qpid/cpp/src/qpid/ha/ConnectionObserver.h qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.h qpid/trunk/qpid/cpp/src/qpid/ha/StatusCheck.cpp qpid/trunk/qpid/cpp/src/tests/ha_tests.py Modified: qpid/trunk/qpid/cpp/src/qpid/ha/BrokerInfo.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/BrokerInfo.cpp?rev=1477165&r1=1477164&r2=1477165&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/ha/BrokerInfo.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/ha/BrokerInfo.cpp Mon Apr 29 15:57:59 2013 @@ -89,8 +89,12 @@ void BrokerInfo::assign(const Variant::M } std::ostream& operator<<(std::ostream& o, const BrokerInfo& b) { - return o << b.getHostName() << ":" << b.getPort() << "(" - << printable(b.getStatus()) << ")"; + o << "FIXME:"; + o << b.getSystemId().str().substr(0,7); + if (!b.getHostName().empty()) + o << "@" << b.getHostName() << ":" << b.getPort(); + o << "(" << printable(b.getStatus()) << ")"; + return o; } std::ostream& operator<<(std::ostream& o, const BrokerInfo::Set& infos) { Modified: qpid/trunk/qpid/cpp/src/qpid/ha/ConnectionObserver.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/ConnectionObserver.cpp?rev=1477165&r1=1477164&r2=1477165&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/ha/ConnectionObserver.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/ha/ConnectionObserver.cpp Mon Apr 29 15:57:59 2013 @@ -22,6 +22,7 @@ #include "ConnectionObserver.h" #include "BrokerInfo.h" #include "HaBroker.h" +#include "qpid/Url.h" #include "qpid/framing/FieldTable.h" #include "qpid/broker/Connection.h" #include "qpid/log/Statement.h" @@ -41,6 +42,17 @@ bool ConnectionObserver::getBrokerInfo(c return false; } +bool ConnectionObserver::getAddress(const broker::Connection& connection, Address& addr) { + Url url; + url.parseNoThrow( + connection.getClientProperties().getAsString(ConnectionObserver::ADDRESS_TAG).c_str()); + if (!url.empty()) { + addr = url[0]; + return true; + } + return false; +} + void ConnectionObserver::setObserver(const ObserverPtr& o, const std::string& newlogPrefix) { sys::Mutex::ScopedLock l(lock); @@ -60,17 +72,20 @@ bool ConnectionObserver::isSelf(const br void ConnectionObserver::opened(broker::Connection& connection) { try { + if (isSelf(connection)) { // Reject self connections + // Set my own address if there is an address header. + Address addr; + if (getAddress(connection, addr)) haBroker.setAddress(addr); + QPID_LOG(debug, logPrefix << "Rejected self connection "+connection.getMgmtId()); + connection.abort(); + return; + } if (connection.isLink()) return; // Allow outgoing links. if (connection.getClientProperties().isSet(ADMIN_TAG)) { QPID_LOG(debug, logPrefix << "Accepted admin connection: " << connection.getMgmtId()); return; // No need to call observer, always allow admins. } - if (isSelf(connection)) { // Reject self connections - QPID_LOG(debug, logPrefix << "Rejected self connection "+connection.getMgmtId()); - connection.abort(); - return; - } ObserverPtr o(getObserver()); if (o) o->opened(connection); } @@ -94,5 +109,6 @@ void ConnectionObserver::closed(broker:: const std::string ConnectionObserver::ADMIN_TAG="qpid.ha-admin"; const std::string ConnectionObserver::BACKUP_TAG="qpid.ha-backup"; +const std::string ConnectionObserver::ADDRESS_TAG="qpid.ha-address"; }} // namespace qpid::ha Modified: qpid/trunk/qpid/cpp/src/qpid/ha/ConnectionObserver.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/ConnectionObserver.h?rev=1477165&r1=1477164&r2=1477165&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/ha/ConnectionObserver.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/ha/ConnectionObserver.h Mon Apr 29 15:57:59 2013 @@ -29,6 +29,8 @@ #include "boost/shared_ptr.hpp" namespace qpid { +class Address; + namespace ha { class BrokerInfo; class HaBroker; @@ -50,8 +52,10 @@ class ConnectionObserver : public broker static const std::string ADMIN_TAG; static const std::string BACKUP_TAG; + static const std::string ADDRESS_TAG; - static bool getBrokerInfo(const broker::Connection& connection, BrokerInfo& info); + static bool getBrokerInfo(const broker::Connection& connection, BrokerInfo&); + static bool getAddress(const broker::Connection& connection, Address&); ConnectionObserver(HaBroker& haBroker, const types::Uuid& self); Modified: qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp?rev=1477165&r1=1477164&r2=1477165&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp Mon Apr 29 15:57:59 2013 @@ -84,15 +84,7 @@ bool isNone(const std::string& x) { retu // Called in Plugin::initialize void HaBroker::initialize() { - // FIXME aconway 2012-07-19: assumes there's a TCP transport with a meaningful port. - membership.add( - BrokerInfo( - membership.getSelf(), - settings.cluster ? JOINING : membership.getStatus(), - broker.getSystem()->getNodeName(), - broker.getPort(broker::Broker::TCP_TRANSPORT) - ) - ); + if (settings.cluster) membership.setStatus(JOINING); QPID_LOG(notice, "Initializing: " << membership.getInfo()); // Set up the management object. @@ -207,4 +199,11 @@ BrokerStatus HaBroker::getStatus() const return membership.getStatus(); } +void HaBroker::setAddress(const Address& a) { + QPID_LOG(info, role->getLogPrefix() << "Set self address to: " << a); + BrokerInfo b(membership.getSelf(), membership.getStatus(), a.host, a.port); + membership.add(b); +} + + }} // namespace qpid::ha Modified: qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.h?rev=1477165&r1=1477164&r2=1477165&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.h Mon Apr 29 15:57:59 2013 @@ -90,6 +90,8 @@ class HaBroker : public management::Mana Membership& getMembership() { return membership; } types::Uuid getSystemId() const { return systemId; } + void setAddress(const Address&); // set self address from a self-connection + private: void setPublicUrl(const Url&); Modified: qpid/trunk/qpid/cpp/src/qpid/ha/StatusCheck.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/StatusCheck.cpp?rev=1477165&r1=1477164&r2=1477165&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/ha/StatusCheck.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/ha/StatusCheck.cpp Mon Apr 29 15:57:59 2013 @@ -19,6 +19,7 @@ * */ #include "StatusCheck.h" +#include "ConnectionObserver.h" #include "qpid/log/Statement.h" #include "qpid/messaging/Address.h" #include "qpid/messaging/Connection.h" @@ -55,7 +56,9 @@ void StatusCheckThread::run() { try { Variant::Map options, clientProperties; clientProperties = brokerInfo.asMap(); // Detect self connections. - clientProperties["qpid.ha-admin"] = 1; // Allow connection to backups. + clientProperties[ConnectionObserver::ADMIN_TAG] = 1; // Allow connection to backups. + clientProperties[ConnectionObserver::ADDRESS_TAG] = url.str(); + clientProperties[ConnectionObserver::BACKUP_TAG] = brokerInfo.asMap(); options["client-properties"] = clientProperties; options["heartbeat"] = statusCheck.linkHeartbeatInterval/sys::TIME_SEC; Modified: qpid/trunk/qpid/cpp/src/tests/ha_tests.py URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ha_tests.py?rev=1477165&r1=1477164&r2=1477165&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/tests/ha_tests.py (original) +++ qpid/trunk/qpid/cpp/src/tests/ha_tests.py Mon Apr 29 15:57:59 2013 @@ -545,26 +545,25 @@ class ReplicationTests(HaBrokerTest): """Check that broker information is correctly published via management""" cluster = HaCluster(self, 3) + def ha_broker(broker): + ha_broker = broker.agent().getHaBroker(); + ha_broker.update() + return ha_broker + for broker in cluster: # Make sure HA system-id matches broker's - qmf = broker.agent().getHaBroker() - self.assertEqual(qmf.systemId, UUID(broker.agent().getBroker().systemRef)) + self.assertEqual(ha_broker(broker).systemId, UUID(broker.agent().getBroker().systemRef)) - cluster_ports = map(lambda b: b.port(), cluster) - cluster_ports.sort() - def ports(qmf): - qmf.update() - return sorted(map(lambda b: b["port"], qmf.members)) # Check that all brokers have the same membership as the cluster - for broker in cluster: - qmf = broker.agent().getHaBroker() - assert retry(lambda: cluster_ports == ports(qmf), 1), "%s != %s on %s"%(cluster_ports, ports(qmf), broker) + def check_ids(broker): + cluster_ids = set([ ha_broker(b).systemId for b in cluster]) + broker_ids = set([m["system-id"] for m in ha_broker(broker).members]) + assert retry(lambda: cluster_ids == broker_ids, 1), "%s != %s on %s"%(cluster_ids, broker_ids, broker) + + for broker in cluster: check_ids(broker) + # Add a new broker, check it is updated everywhere b = cluster.start() - cluster_ports.append(b.port()) - cluster_ports.sort() - for broker in cluster: - qmf = broker.agent().getHaBroker() - assert retry(lambda: cluster_ports == ports(qmf), 1), "%s != %s"%(cluster_ports, ports(qmf)) + for broker in cluster: check_ids(broker) def test_auth(self): """Verify that authentication does not interfere with replication.""" --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org