Author: kgiusti
Date: Tue May  1 13:57:13 2012
New Revision: 1332653

URL: http://svn.apache.org/viewvc?rev=1332653&view=rev
Log:
QPID-3963: subscribe link to remote broker's amq.failover exchange.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Link.h

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp?rev=1332653&r1=1332652&r2=1332653&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp Tue May  1 13:57:13 2012
@@ -31,6 +31,8 @@
 #include "qpid/framing/enum.h"
 #include "qpid/framing/reply_exceptions.h"
 #include "qpid/broker/AclModule.h"
+#include "qpid/broker/Exchange.h"
+#include "qpid/UrlArray.h"
 
 namespace qpid {
 namespace broker {
@@ -65,6 +67,50 @@ struct LinkTimerTask : public sys::Timer
     sys::Timer& timer;
 };
 
+
+namespace {
+    const std::string FAILOVER_EXCHANGE("amq.failover");
+    const std::string FAILOVER_HEADER_KEY("amq.failover");
+    const framing::ChannelId FAILOVER_CHANNEL(framing::CHANNEL_HIGH_BIT | 1);  
  // reserved for this link
+}
+
+/** LinkExchange is used by the link to subscribe to the remote broker's 
amq.failover exchange.
+ */
+class LinkExchange : public broker::Exchange
+{
+public:
+    LinkExchange(Link& _link, const std::string& name) : Exchange(name), 
link(_link) {}
+    ~LinkExchange() {};
+    std::string getType() const { return std::string("qpid.LinkExchange"); }
+
+    // Exchange methods - set up to prevent binding/unbinding etc from clients!
+    bool bind(boost::shared_ptr<broker::Queue>, const std::string&, const 
framing::FieldTable*) { return false; }
+    bool unbind(boost::shared_ptr<broker::Queue>, const std::string&, const 
framing::FieldTable*) { return false; }
+    bool isBound(boost::shared_ptr<broker::Queue>, const std::string* const, 
const framing::FieldTable* const) {return false;}
+
+    // Process messages sent from the remote's amq.failover exchange by 
extracting the failover URLs
+    // and saving them should the Link need to reconnect.
+    void route(broker::Deliverable& msg)
+    {
+        const framing::FieldTable* headers = 
msg.getMessage().getApplicationHeaders();
+        framing::Array addresses;
+        if (headers && headers->getArray(FAILOVER_HEADER_KEY, addresses)) {
+            // convert the Array of addresses to a single Url container for 
use with setUrl():
+            std::vector<Url> urlVec;
+            Url urls;
+            urlVec = urlArrayToVector(addresses);
+            for(size_t i = 0; i < urlVec.size(); ++i)
+                urls.insert(urls.end(), urlVec[i].begin(), urlVec[i].end());
+            QPID_LOG(debug, "Remote broker has provided these failover 
addresses= " << urls);
+            link.setUrl(urls);
+        }
+    }
+
+private:
+    Link& link;
+};
+
+
 Link::Link(LinkRegistry*  _links,
            MessageStore*  _store,
            const string&        _host,
@@ -106,15 +152,22 @@ Link::Link(LinkRegistry*  _links,
         startConnectionLH();
     }
     broker->getTimer().add(timerTask);
+
+    exchange.reset(new broker::LinkExchange(*this,
+                                            "qpid.link." + 
framing::Uuid(true).str()));
+    broker->getExchanges().registerExchange(exchange);
 }
 
 Link::~Link ()
 {
-    if (state == STATE_OPERATIONAL && connection != 0)
-        connection->close(CLOSE_CODE_CONNECTION_FORCED, "closed by 
management");
+    if (state == STATE_OPERATIONAL && connection != 0) {
+        closeConnection("closed by management");
+    }
 
     if (mgmtObject != 0)
         mgmtObject->resourceDestroy ();
+
+    broker->getExchanges().destroy(exchange->getName());
 }
 
 void Link::setStateLH (int newState)
@@ -185,6 +238,20 @@ void Link::setUrl(const Url& u) {
     reconnectNext = 0;
 }
 
+
+namespace {
+    /** invoked when session used to subscribe to remote's amq.failover 
exchange detaches */
+    void sessionDetached(Link *link) {
+        // ??? really not sure what the right thing to do here is, if anything...
+        // ??? Q: do I need to cancel the subscription and detach the 
session in the I/O thread (???)
+        // e.g:
+        //peer->getMessage().cancel(args.i_dest);
+        //peer->getSession().detach(name);
+        QPID_LOG(debug, "detached from 'amq.failover' for link: " << 
link->getName());
+    }
+}
+
+
 void Link::opened() {
     Mutex::ScopedLock mutex(lock);
     if (!connection) return;
@@ -198,6 +265,40 @@ void Link::opened() {
         reconnectNext = 0;
         QPID_LOG(debug, "Known hosts for peer of inter-broker link: " << url);
     }
+
+    //
+    // attempt to subscribe to failover exchange for updates from remote
+    //
+
+    const std::string queueName = "qpid.link." + framing::Uuid(true).str();
+
+    SessionHandler& sessionHandler = connection->getChannel(FAILOVER_CHANNEL);
+    sessionHandler.setDetachedCallback( boost::bind(&sessionDetached, this) );
+    sessionHandler.attachAs(getName());
+
+    framing::AMQP_ServerProxy remoteBroker(sessionHandler.out);
+
+    remoteBroker.getQueue().declare(queueName,
+                                    "",         // alt-exchange
+                                    false,      // passive
+                                    false,      // durable
+                                    true,       // exclusive
+                                    true,       // auto-delete
+                                    FieldTable());
+    remoteBroker.getExchange().bind(queueName,
+                                    FAILOVER_EXCHANGE,
+                                    "",     // no key
+                                    FieldTable());
+    remoteBroker.getMessage().subscribe(queueName,
+                                        exchange->getName(),
+                                        1,           // implied-accept mode
+                                        0,           // pre-acquire mode
+                                        false,       // exclusive
+                                        "",          // resume-id
+                                        0,           // resume-ttl
+                                        FieldTable());
+    remoteBroker.getMessage().flow(exchange->getName(), 0, 0xFFFFFFFF);
+    remoteBroker.getMessage().flow(exchange->getName(), 1, 0xFFFFFFFF);
 }
 
 void Link::closed(int, std::string text)
@@ -241,9 +342,7 @@ void Link::destroy ()
         Mutex::ScopedLock mutex(lock);
 
         QPID_LOG (info, "Inter-broker link to " << host << ":" << port << " 
removed by management");
-        if (connection)
-            connection->close(CLOSE_CODE_CONNECTION_FORCED, "closed by 
management");
-        connection = 0;
+        closeConnection("closed by management");
         setStateLH(STATE_CLOSED);
 
         // Move the bridges to be deleted into a local vector so there is no
@@ -399,7 +498,8 @@ bool Link::hideManagement() const {
 uint Link::nextChannel()
 {
     Mutex::ScopedLock mutex(lock);
-
+    if (channelCounter >= framing::CHANNEL_MAX)
+        channelCounter = 1;
     return channelCounter++;
 }
 
@@ -542,4 +642,21 @@ void Link::setPassive(bool passive)
     }
 }
 
+
+/** utility to clean up connection resources correctly */
+void Link::closeConnection( const std::string& reason)
+{
+    if (connection != 0) {
+        // cancel our subscription to the failover exchange
+        SessionHandler& sessionHandler = 
connection->getChannel(FAILOVER_CHANNEL);
+        if (sessionHandler.getSession()) {
+            framing::AMQP_ServerProxy remoteBroker(sessionHandler.out);
+            remoteBroker.getMessage().cancel(exchange->getName());
+            remoteBroker.getSession().detach(getName());
+        }
+        connection->close(CLOSE_CODE_CONNECTION_FORCED, reason);
+        connection = 0;
+    }
+}
+
 }} // namespace qpid::broker

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Link.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Link.h?rev=1332653&r1=1332652&r2=1332653&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Link.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Link.h Tue May  1 13:57:13 2012
@@ -47,6 +47,7 @@ namespace broker {
 class LinkRegistry;
 class Broker;
 class Connection;
+class LinkExchange;
 
 class Link : public PersistableConfig, public management::Manageable {
   private:
@@ -77,8 +78,8 @@ class Link : public PersistableConfig, p
     uint channelCounter;
     Connection* connection;
     management::ManagementAgent* agent;
-
     boost::intrusive_ptr<sys::TimerTask> timerTask;
+    boost::shared_ptr<broker::LinkExchange> exchange;
 
     static const int STATE_WAITING     = 1;
     static const int STATE_CONNECTING  = 2;
@@ -100,8 +101,9 @@ class Link : public PersistableConfig, p
     void opened();      // Called when connection is open (after create)
     void closed(int, std::string);   // Called when connection goes away
     void reconnectLH(const Address&); //called by LinkRegistry
+    void closeConnection(const std::string& reason);
 
-  friend class LinkRegistry; // to call established, opened, closed
+    friend class LinkRegistry; // to call established, opened, closed
 
   public:
     typedef boost::shared_ptr<Link> shared_ptr;



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to