Author: kgiusti
Date: Tue May  1 13:57:50 2012
New Revision: 1332657

URL: http://svn.apache.org/viewvc?rev=1332657&view=rev
Log:
QPID-3963: replicate learned failover addresses to new cluster members

Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Link.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
    qpid/trunk/qpid/cpp/xml/cluster.xml

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp?rev=1332657&r1=1332656&r2=1332657&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp Tue May  1 13:57:50 2012
@@ -104,7 +104,7 @@ public:
             urlVec = urlArrayToVector(addresses);
             for(size_t i = 0; i < urlVec.size(); ++i)
                 urls.insert(urls.end(), urlVec[i].begin(), urlVec[i].end());
-            QPID_LOG(notice, "Remote broker has provided these failover 
addresses= " << urls);
+            QPID_LOG(debug, "Remote broker has provided these failover 
addresses= " << urls);
             link->setUrl(urls);
         }
     }
@@ -253,6 +253,7 @@ void Link::established(Connection* c)
 
 
 void Link::setUrl(const Url& u) {
+    QPID_LOG(info, "Setting remote broker failover addresses for link '" << 
getName() << "' to these urls: " << u);
     Mutex::ScopedLock mutex(lock);
     url = u;
     reconnectNext = 0;
@@ -687,6 +688,35 @@ bool Link::getRemoteAddress(qpid::Addres
 }
 
 
+// FieldTable keys written by Link::getState() and read by Link::setState()
+namespace {
+    const std::string FAILOVER_ADDRESSES("failover-addresses");  // holds url.str()
+    const std::string FAILOVER_INDEX("failover-index");          // holds reconnectNext
+}
+
+// Snapshot this Link's internal state into 'state'.
+// 'state' is always emptied first; the failover address list and the
+// reconnect index are then stored only when the url is non-empty.
+void Link::getState(framing::FieldTable& state) const
+{
+    state.clear();
+    Mutex::ScopedLock guard(lock);
+    if (url.empty())
+        return;     // nothing learned yet: leave the table empty
+    state.setString(FAILOVER_ADDRESSES, url.str());
+    state.setInt(FAILOVER_INDEX, reconnectNext);
+}
+
+// Restore internal state previously captured by getState().
+//
+// 'state' may carry FAILOVER_ADDRESSES (url text) and/or FAILOVER_INDEX
+// (an integer). A key that is not set leaves the corresponding member
+// untouched.
+void Link::setState(const framing::FieldTable& state)
+{
+    if (state.isSet(FAILOVER_ADDRESSES)) {
+        // setUrl() acquires 'lock' itself and resets reconnectNext to 0, so
+        // it is called without the lock held: this function must not rely
+        // on the mutex tolerating a second acquisition by the same thread.
+        Url failovers(state.getAsString(FAILOVER_ADDRESSES));
+        setUrl(failovers);
+    }
+    if (state.isSet(FAILOVER_INDEX)) {
+        // Applied after setUrl() so that the replicated index is not
+        // overwritten by setUrl()'s reset of reconnectNext.
+        Mutex::ScopedLock mutex(lock);
+        reconnectNext = state.getAsInt(FAILOVER_INDEX);
+    }
+}
+
+
 const std::string Link::exchangeTypeName("qpid.LinkExchange");
 
 }} // namespace qpid::broker

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Link.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Link.h?rev=1332657&r1=1332656&r2=1332657&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Link.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Link.h Tue May  1 13:57:50 2012
@@ -51,7 +51,7 @@ class LinkExchange;
 
 class Link : public PersistableConfig, public management::Manageable {
   private:
-    sys::Mutex          lock;
+    mutable sys::Mutex  lock;
     LinkRegistry*       links;
     MessageStore*       store;
 
@@ -174,6 +174,10 @@ class Link : public PersistableConfig, p
     // manage the exchange owned by this link
     static const std::string exchangeTypeName;
     static boost::shared_ptr<Exchange> linkExchangeFactory(const std::string& 
name);
+
+    // replicate internal state of this Link for clustering
+    void getState(framing::FieldTable& state) const;
+    void setState(const framing::FieldTable& state);
 };
 }
 }

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp?rev=1332657&r1=1332656&r2=1332657&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp Tue May  1 13:57:50 2012
@@ -797,6 +797,54 @@ void Connection::config(const std::strin
     else throw Exception(QPID_MSG("Update failed, invalid kind of config: " << 
kind));
 }
 
+namespace {
+    // Visitor that records the first Link whose transport/host/port equals
+    // the given Address. Once a match has been recorded, every later Link
+    // offered to operator() is ignored.
+    class LinkFinder {
+        qpid::Address id;                       // address being searched for
+        boost::shared_ptr<broker::Link> link;   // first match; empty until found
+    public:
+        // explicit: an Address should never silently convert into a finder
+        explicit LinkFinder(const qpid::Address& _id) : id(_id) {}
+        // Returns the matching Link, or an empty pointer if none has been seen.
+        boost::shared_ptr<broker::Link> getLink() const { return link; }
+        // Invoked once per Link. The pointer is taken by const reference so
+        // that visiting a link does not copy the shared_ptr.
+        void operator() (const boost::shared_ptr<broker::Link>& l)
+        {
+            if (link) return;   // a match was already recorded
+            qpid::Address addr(l->getTransport(), l->getHost(), l->getPort());
+            if (id == addr) {
+                link = l;
+            }
+        }
+    };
+}
+
+// Apply replicated internal state to a local object during a cluster update.
+//
+//   type  - kind of object the state belongs to; only "link" is handled.
+//   name  - for "link": url text of the Link's configured destination
+//           address; its first address is used to look the Link up.
+//   state - handed to Link::setState() on the Link that was found.
+//
+// Throws Exception when type is not "link", when name does not yield a
+// non-empty Url, when no Link matches, or when setState() throws.
+void Connection::internalState(const std::string& type,
+                               const std::string& name,
+                               const framing::FieldTable& state)
+{
+    if (type == "link") {
+        // name is the string representation of the Link's _configured_ destination address
+        Url dest;
+        try {
+            dest = name;
+        } catch(...) {
+            throw Exception(QPID_MSG("Update failed, invalid format for Link destination address: " << name));
+        }
+        // Checked at run time instead of with assert(): when NDEBUG is
+        // defined the assert disappears and dest[0] below would index an
+        // empty Url.
+        if (dest.empty()) {
+            throw Exception(QPID_MSG("Update failed, invalid format for Link destination address: " << name));
+        }
+        LinkFinder finder(dest[0]);
+        // boost::ref so that eachLink() updates this finder, not a copy of it
+        cluster.getBroker().getLinks().eachLink(boost::ref(finder));
+        if (finder.getLink()) {
+            try {
+                finder.getLink()->setState(state);
+            } catch(...) {
+                throw Exception(QPID_MSG("Update failed, invalid state for Link " << name << ", state: " << state));
+            }
+            QPID_LOG(debug, cluster << " updated link " << dest[0] << " with state: " << state);
+        } else throw Exception(QPID_MSG("Update failed, unable to find Link named: " << name));
+    }
+    else throw Exception(QPID_MSG("Update failed, invalid object type for internal state replication: " << type));
+}
+
+
 void Connection::doCatchupIoCallbacks() {
     // We need to process IO callbacks during the catch-up phase in
     // order to service asynchronous completions for messages

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h?rev=1332657&r1=1332656&r2=1332657&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h Tue May  1 13:57:50 2012
@@ -200,6 +200,8 @@ class Connection :
                               const std::string& instance);
 
     void config(const std::string& encoded);
+    void internalState(const std::string& type, const std::string& name,
+                       const framing::FieldTable& state);
 
     void setSecureConnection ( broker::SecureConnection * sc );
 

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp?rev=1332657&r1=1332656&r2=1332657&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp Tue May  1 13:57:50 2012
@@ -688,7 +688,15 @@ void UpdateClient::updateLinks() {
 void UpdateClient::updateLink(const boost::shared_ptr<broker::Link>& link) {
     QPID_LOG(debug, *this << " updating link "
              << link->getHost() << ":" << link->getPort());
-    ClusterConnectionProxy(session).config(encode(*link));
+    // First replicate the link's configuration...
+    ClusterConnectionProxy(session).config(encode(*link));
+    // ...then its current internal state, identified by the text form of
+    // the link's transport/host/port address.
+    framing::FieldTable linkState;
+    link->getState(linkState);
+    std::ostringstream addressText;
+    addressText << qpid::Address(link->getTransport(), link->getHost(), link->getPort());
+    const std::string objectType("link");
+    ClusterConnectionProxy(session).internalState(objectType, addressText.str(), linkState);
 }
 
 void UpdateClient::updateBridge(const boost::shared_ptr<broker::Bridge>& 
bridge) {

Modified: qpid/trunk/qpid/cpp/xml/cluster.xml
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/xml/cluster.xml?rev=1332657&r1=1332656&r2=1332657&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/xml/cluster.xml (original)
+++ qpid/trunk/qpid/cpp/xml/cluster.xml Tue May  1 13:57:50 2012
@@ -326,6 +326,12 @@
       <field name="dequeueSincePurge" type="uint32"/>
     </control>
 
+    <!-- Replicate the internal state for an object - e.g. Links, bridges, etc -->
+    <control name="internal-state" code="0x42">
+      <field name="type" type="str8"/> <!-- The type of object the state is for (e.g. 'link') -->
+      <field name="name" type="str8"/> <!-- Identifies the particular object to be updated -->
+      <field name="state" type="map"/> <!-- The internal state for the object -->
+    </control>
 
   </class>
 



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to