Author: kgiusti Date: Tue May 1 13:57:50 2012 New Revision: 1332657 URL: http://svn.apache.org/viewvc?rev=1332657&view=rev Log: QPID-3963: replicate learned failover addresses to new cluster members
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp qpid/trunk/qpid/cpp/src/qpid/broker/Link.h qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp qpid/trunk/qpid/cpp/xml/cluster.xml Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp?rev=1332657&r1=1332656&r2=1332657&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp Tue May 1 13:57:50 2012 @@ -104,7 +104,7 @@ public: urlVec = urlArrayToVector(addresses); for(size_t i = 0; i < urlVec.size(); ++i) urls.insert(urls.end(), urlVec[i].begin(), urlVec[i].end()); - QPID_LOG(notice, "Remote broker has provided these failover addresses= " << urls); + QPID_LOG(debug, "Remote broker has provided these failover addresses= " << urls); link->setUrl(urls); } } @@ -253,6 +253,7 @@ void Link::established(Connection* c) void Link::setUrl(const Url& u) { + QPID_LOG(info, "Setting remote broker failover addresses for link '" << getName() << "' to these urls: " << u); Mutex::ScopedLock mutex(lock); url = u; reconnectNext = 0; @@ -687,6 +688,35 @@ bool Link::getRemoteAddress(qpid::Addres } +// FieldTable keys for internal state data +namespace { + const std::string FAILOVER_ADDRESSES("failover-addresses"); + const std::string FAILOVER_INDEX("failover-index"); +} + +void Link::getState(framing::FieldTable& state) const +{ + state.clear(); + Mutex::ScopedLock mutex(lock); + if (!url.empty()) { + state.setString(FAILOVER_ADDRESSES, url.str()); + state.setInt(FAILOVER_INDEX, reconnectNext); + } +} + +void Link::setState(const framing::FieldTable& state) +{ + Mutex::ScopedLock mutex(lock); + if (state.isSet(FAILOVER_ADDRESSES)) { + Url failovers(state.getAsString(FAILOVER_ADDRESSES)); + setUrl(failovers); + } + if (state.isSet(FAILOVER_INDEX)) { + reconnectNext = state.getAsInt(FAILOVER_INDEX); + } +} + + const std::string Link::exchangeTypeName("qpid.LinkExchange"); }} // namespace qpid::broker Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Link.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Link.h?rev=1332657&r1=1332656&r2=1332657&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/broker/Link.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/broker/Link.h Tue May 1 13:57:50 2012 @@ -51,7 +51,7 @@ class LinkExchange; class Link : public PersistableConfig, public management::Manageable { private: - sys::Mutex lock; + mutable sys::Mutex lock; LinkRegistry* links; MessageStore* store; @@ -174,6 +174,10 @@ class Link : public PersistableConfig, p // manage the exchange owned by this link static const std::string exchangeTypeName; static boost::shared_ptr<Exchange> linkExchangeFactory(const std::string& name); + + // replicate internal state of this Link for clustering + void getState(framing::FieldTable& state) const; + void setState(const framing::FieldTable& state); }; } } Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp?rev=1332657&r1=1332656&r2=1332657&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp Tue May 1 13:57:50 2012 @@ -797,6 +797,54 @@ void Connection::config(const std::strin else throw Exception(QPID_MSG("Update failed, invalid kind of config: " << kind)); } +namespace { + // find a Link that matches the given Address + class LinkFinder { + qpid::Address id; + boost::shared_ptr<broker::Link> link; + public: + LinkFinder(const qpid::Address& _id) : id(_id) {} + boost::shared_ptr<broker::Link> getLink() { return link; } + void operator() (boost::shared_ptr<broker::Link> l) + { + if (!link) { + qpid::Address addr(l->getTransport(), l->getHost(), l->getPort()); + if (id == addr) { + link = l; + } + } + } + }; +} + +void Connection::internalState(const std::string& type, + const std::string& name, + const framing::FieldTable& state) +{ + if (type == "link") { + // name is the string representation of the Link's _configured_ destination address + Url dest; + try { + dest = name; + } catch(...) { + throw Exception(QPID_MSG("Update failed, invalid format for Link destination address: " << name)); + } + assert(dest.size()); + LinkFinder finder(dest[0]); + cluster.getBroker().getLinks().eachLink(boost::ref(finder)); + if (finder.getLink()) { + try { + finder.getLink()->setState(state); + } catch(...) { + throw Exception(QPID_MSG("Update failed, invalid state for Link " << name << ", state: " << state)); + } + QPID_LOG(debug, cluster << " updated link " << dest[0] << " with state: " << state); + } else throw Exception(QPID_MSG("Update failed, unable to find Link named: " << name)); + } + else throw Exception(QPID_MSG("Update failed, invalid object type for internal state replication: " << type)); +} + + void Connection::doCatchupIoCallbacks() { // We need to process IO callbacks during the catch-up phase in // order to service asynchronous completions for messages Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h?rev=1332657&r1=1332656&r2=1332657&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h Tue May 1 13:57:50 2012 @@ -200,6 +200,8 @@ class Connection : const std::string& instance); void config(const std::string& encoded); + void internalState(const std::string& type, const std::string& name, + const framing::FieldTable& state); void setSecureConnection ( broker::SecureConnection * sc ); Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp?rev=1332657&r1=1332656&r2=1332657&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp Tue May 1 13:57:50 2012 @@ -688,7 +688,15 @@ void UpdateClient::updateLinks() { void UpdateClient::updateLink(const boost::shared_ptr<broker::Link>& link) { QPID_LOG(debug, *this << " updating link " << link->getHost() << ":" << link->getPort()); - ClusterConnectionProxy(session).config(encode(*link)); + ClusterConnectionProxy(session).config(encode(*link)); // push the configuration + // now push the current state + framing::FieldTable state; + link->getState(state); + std::ostringstream os; + os << qpid::Address(link->getTransport(), link->getHost(), link->getPort()); + ClusterConnectionProxy(session).internalState(std::string("link"), + os.str(), + state); } void UpdateClient::updateBridge(const boost::shared_ptr<broker::Bridge>& bridge) { Modified: qpid/trunk/qpid/cpp/xml/cluster.xml URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/xml/cluster.xml?rev=1332657&r1=1332656&r2=1332657&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/xml/cluster.xml (original) +++ qpid/trunk/qpid/cpp/xml/cluster.xml Tue May 1 13:57:50 2012 @@ -326,6 +326,12 @@ <field name="dequeueSincePurge" type="uint32"/> </control> + <!-- Replicate the internal state for an object - e.g. Links, bridges, etc --> + <control name="internal-state" code="0x42"> + <field name="type" type="str8"/> <!-- The type of object the state is for (e.g. 'link') --> + <field name="name" type="str8"/> <!-- Identifies the particular object to be updated --> + <field name="state" type="map"/> <!-- The internal state for the object --> + </control> </class> --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org