Author: kgiusti
Date: Tue May  1 13:57:21 2012
New Revision: 1332654

URL: http://svn.apache.org/viewvc?rev=1332654&view=rev
Log:
QPID-3963: fix naming of link exchange, and exchange creation/replication 
handling

Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Link.h
    qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.cpp

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp?rev=1332654&r1=1332653&r2=1332654&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp Tue May  1 
13:57:21 2012
@@ -24,6 +24,7 @@
 #include "qpid/broker/FanOutExchange.h"
 #include "qpid/broker/HeadersExchange.h"
 #include "qpid/broker/TopicExchange.h"
+#include "qpid/broker/Link.h"
 #include "qpid/management/ManagementDirectExchange.h"
 #include "qpid/management/ManagementTopicExchange.h"
 #include "qpid/framing/reply_exceptions.h"
@@ -58,6 +59,8 @@ pair<Exchange::shared_ptr, bool> Exchang
             exchange = Exchange::shared_ptr(new ManagementDirectExchange(name, 
durable, args, parent, broker));
         }else if (type == ManagementTopicExchange::typeName) {
             exchange = Exchange::shared_ptr(new ManagementTopicExchange(name, 
durable, args, parent, broker));
+        }else if (type == Link::exchangeTypeName) {
+            exchange = Link::linkExchangeFactory(name);
         }else{
             FunctionMap::iterator i =  factory.find(type);
             if (i == factory.end()) {

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp?rev=1332654&r1=1332653&r2=1332654&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp Tue May  1 13:57:21 2012
@@ -79,9 +79,9 @@ namespace {
 class LinkExchange : public broker::Exchange
 {
 public:
-    LinkExchange(Link& _link, const std::string& name) : Exchange(name), 
link(_link) {}
+    LinkExchange(const std::string& name) : Exchange(name), link(0) {}
     ~LinkExchange() {};
-    std::string getType() const { return std::string("qpid.LinkExchange"); }
+    std::string getType() const { return Link::exchangeTypeName; }
 
     // Exchange methods - set up to prevent binding/unbinding etc from clients!
     bool bind(boost::shared_ptr<broker::Queue>, const std::string&, const 
framing::FieldTable*) { return false; }
@@ -92,6 +92,7 @@ public:
     // and saving them should the Link need to reconnect.
     void route(broker::Deliverable& msg)
     {
+        if (!link) return;
         const framing::FieldTable* headers = 
msg.getMessage().getApplicationHeaders();
         framing::Array addresses;
         if (headers && headers->getArray(FAILOVER_HEADER_KEY, addresses)) {
@@ -102,15 +103,26 @@ public:
             for(size_t i = 0; i < urlVec.size(); ++i)
                 urls.insert(urls.end(), urlVec[i].begin(), urlVec[i].end());
             QPID_LOG(debug, "Remote broker has provided these failover 
addresses= " << urls);
-            link.setUrl(urls);
+            link->setUrl(urls);
         }
     }
 
+    void setLink(Link *_link)
+    {
+        assert(!link);
+        link = _link;
+    }
+
 private:
-    Link& link;
+    Link *link;
 };
 
 
+boost::shared_ptr<Exchange> Link::linkExchangeFactory( const std::string& 
_name )
+{
+    return Exchange::shared_ptr(new LinkExchange(_name));
+}
+
 Link::Link(LinkRegistry*  _links,
            MessageStore*  _store,
            const string&        _host,
@@ -122,8 +134,9 @@ Link::Link(LinkRegistry*  _links,
            const string&        _password,
            Broker*        _broker,
            Manageable*    parent)
-    : links(_links), store(_store), host(_host), port(_port),
-      transport(_transport),
+    : links(_links), store(_store),
+      configuredTransport(_transport), configuredHost(_host), 
configuredPort(_port),
+      host(_host), port(_port), transport(_transport),
       durable(_durable),
       authMechanism(_authMechanism), username(_username), password(_password),
       persistenceId(0), mgmtObject(0), broker(_broker), state(0),
@@ -153,9 +166,13 @@ Link::Link(LinkRegistry*  _links,
     }
     broker->getTimer().add(timerTask);
 
-    exchange.reset(new broker::LinkExchange(*this,
-                                            "qpid.link." + 
framing::Uuid(true).str()));
-    broker->getExchanges().registerExchange(exchange);
+    stringstream _name;
+    _name << "qpid.link." << transport << ":" << host << ":" << port;
+    std::pair<Exchange::shared_ptr, bool> rc = 
broker->getExchanges().declare(_name.str(),
+                                                                              
exchangeTypeName);
+    exchange = boost::static_pointer_cast<LinkExchange>(rc.first);
+    assert(exchange);
+    exchange->setLink(this);
 }
 
 Link::~Link ()
@@ -270,11 +287,11 @@ void Link::opened() {
     // attempt to subscribe to failover exchange for updates from remote
     //
 
-    const std::string queueName = "qpid.link." + framing::Uuid(true).str();
+    const std::string queueName = "qpid.link." + exchange->getName();
 
     SessionHandler& sessionHandler = connection->getChannel(FAILOVER_CHANNEL);
     sessionHandler.setDetachedCallback( boost::bind(&sessionDetached, this) );
-    sessionHandler.attachAs(getName());
+    sessionHandler.attachAs(exchange->getName());
 
     framing::AMQP_ServerProxy remoteBroker(sessionHandler.out);
 
@@ -341,7 +358,7 @@ void Link::destroy ()
     {
         Mutex::ScopedLock mutex(lock);
 
-        QPID_LOG (info, "Inter-broker link to " << host << ":" << port << " 
removed by management");
+        QPID_LOG (info, "Inter-broker link to " << configuredHost << ":" << 
configuredPort << " removed by management");
         closeConnection("closed by management");
         setStateLH(STATE_CLOSED);
 
@@ -363,7 +380,7 @@ void Link::destroy ()
     for (Bridges::iterator i = toDelete.begin(); i != toDelete.end(); i++)
         (*i)->destroy();
     toDelete.clear();
-    links->destroy (host, port);
+    links->destroy (configuredHost, configuredPort);
 }
 
 void Link::add(Bridge::shared_ptr bridge)
@@ -518,7 +535,7 @@ void Link::setPersistenceId(uint64_t id)
 
 const string& Link::getName() const
 {
-    return host;
+    return configuredHost;
 }
 
 Link::shared_ptr Link::decode(LinkRegistry& links, Buffer& buffer)
@@ -544,9 +561,9 @@ Link::shared_ptr Link::decode(LinkRegist
 void Link::encode(Buffer& buffer) const
 {
     buffer.putShortString(string("link"));
-    buffer.putShortString(host);
-    buffer.putShort(port);
-    buffer.putShortString(transport);
+    buffer.putShortString(configuredHost);
+    buffer.putShort(configuredPort);
+    buffer.putShortString(configuredTransport);
     buffer.putOctet(durable ? 1 : 0);
     buffer.putShortString(authMechanism);
     buffer.putShortString(username);
@@ -555,10 +572,10 @@ void Link::encode(Buffer& buffer) const
 
 uint32_t Link::encodedSize() const
 {
-    return host.size() + 1 // short-string (host)
+    return configuredHost.size() + 1 // short-string (host)
         + 5                // short-string ("link")
         + 2                // port
-        + transport.size() + 1 // short-string(transport)
+        + configuredTransport.size() + 1 // short-string(transport)
         + 1                // durable
         + authMechanism.size() + 1
         + username.size() + 1
@@ -613,7 +630,7 @@ Manageable::status_t Link::ManagementMet
         }
 
         std::pair<Bridge::shared_ptr, bool> result =
-            links->declare (host, port, iargs.i_durable, iargs.i_src,
+            links->declare (configuredHost, configuredPort, iargs.i_durable, 
iargs.i_src,
                             iargs.i_dest, iargs.i_key, iargs.i_srcIsQueue,
                             iargs.i_srcIsLocal, iargs.i_tag, iargs.i_excludes,
                             iargs.i_dynamic, iargs.i_sync);
@@ -652,11 +669,24 @@ void Link::closeConnection( const std::s
         if (sessionHandler.getSession()) {
             framing::AMQP_ServerProxy remoteBroker(sessionHandler.out);
             remoteBroker.getMessage().cancel(exchange->getName());
-            remoteBroker.getSession().detach(getName());
+            remoteBroker.getSession().detach(exchange->getName());
         }
         connection->close(CLOSE_CODE_CONNECTION_FORCED, reason);
         connection = 0;
     }
 }
 
+/** returns the current remote's address, and connection state */
+bool Link::getRemoteAddress(qpid::Address& addr) const
+{
+    addr.protocol = transport;
+    addr.host = host;
+    addr.port = port;
+
+    return state == STATE_OPERATIONAL;
+}
+
+
+const std::string Link::exchangeTypeName("qpid.LinkExchange");
+
 }} // namespace qpid::broker

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Link.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Link.h?rev=1332654&r1=1332653&r2=1332654&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Link.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Link.h Tue May  1 13:57:21 2012
@@ -54,9 +54,16 @@ class Link : public PersistableConfig, p
     sys::Mutex          lock;
     LinkRegistry*       links;
     MessageStore*       store;
-    std::string        host;
-    uint16_t      port;
-    std::string        transport;
+
+    // these remain constant across failover - used to identify this link
+    const std::string   configuredTransport;
+    const std::string   configuredHost;
+    const uint16_t      configuredPort;
+    // these reflect the current address of remote - will change during 
failover
+    std::string         host;
+    uint16_t            port;
+    std::string         transport;
+
     bool          durable;
     std::string        authMechanism;
     std::string        username;
@@ -121,9 +128,16 @@ class Link : public PersistableConfig, p
          management::Manageable* parent = 0);
     virtual ~Link();
 
-    std::string getHost() { return host; }
-    uint16_t    getPort() { return port; }
-    std::string getTransport() { return transport; }
+    /** these return the *configured* transport/host/port, which does not 
change over the
+        lifetime of the Link */
+    std::string getHost() const { return configuredHost; }
+    uint16_t    getPort() const { return configuredPort; }
+    std::string getTransport() const { return configuredTransport; }
+
+    /** returns the current address of the remote, which may be different from 
the
+        configured transport/host/port due to failover. Returns true if 
connection is
+        active */
+    bool getRemoteAddress(qpid::Address& addr) const;
 
     bool isDurable() { return durable; }
     void maintenanceVisit ();
@@ -155,6 +169,9 @@ class Link : public PersistableConfig, p
     management::ManagementObject*    GetManagementObject(void) const;
     management::Manageable::status_t ManagementMethod(uint32_t, 
management::Args&, std::string&);
 
+    // manage the exchange owned by this link
+    static const std::string exchangeTypeName;
+    static boost::shared_ptr<Exchange> linkExchangeFactory(const std::string& 
name);
 };
 }
 }

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.cpp?rev=1332654&r1=1332653&r2=1332654&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.cpp Tue May  1 13:57:21 
2012
@@ -299,22 +299,29 @@ std::string LinkRegistry::getUsername(co
     return link->getUsername();
 }
 
+/** note: returns the current remote host (may be different from the host 
originally
+    configured for the Link due to failover) */
 std::string LinkRegistry::getHost(const std::string& key)
 {
-    Link::shared_ptr link = findLink(key);
-    if (!link)
-        return string();
+     Link::shared_ptr link = findLink(key);
+     if (!link)
+         return string();
 
-    return link->getHost();
+     qpid::Address addr;
+     link->getRemoteAddress(addr);
+     return addr.host;
 }
 
+/** returns the current remote port (ditto above) */
 uint16_t LinkRegistry::getPort(const std::string& key)
 {
     Link::shared_ptr link = findLink(key);
     if (!link)
         return 0;
 
-    return link->getPort();
+     qpid::Address addr;
+     link->getRemoteAddress(addr);
+     return addr.port;
 }
 
 std::string LinkRegistry::getPassword(const std::string& key)



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to