Author: aconway
Date: Fri Feb 17 14:10:51 2012
New Revision: 1245509

URL: http://svn.apache.org/viewvc?rev=1245509&view=rev
Log:
QPID-3603: Logging improvements for bridges, links and HA classes.

Modified:
    qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/Bridge.cpp
    qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/Link.cpp
    qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
    qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/QueueReplicator.h
    qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/WiringReplicator.cpp

Modified: qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/Bridge.cpp
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/Bridge.cpp?rev=1245509&r1=1245508&r2=1245509&view=diff
==============================================================================
--- qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/Bridge.cpp (original)
+++ qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/Bridge.cpp Fri Feb 17 
14:10:51 2012
@@ -75,7 +75,7 @@ Bridge::Bridge(Link* _link, framing::Cha
              args.i_tag, args.i_excludes, args.i_dynamic, args.i_sync);
         agent->addObject(mgmtObject);
     }
-    QPID_LOG(debug, "Bridge created from " << args.i_src << " to " << 
args.i_dest);
+    QPID_LOG(debug, "Bridge " << name << " created from " << args.i_src << " 
to " << args.i_dest);
 }
 
 Bridge::~Bridge()
@@ -114,7 +114,7 @@ void Bridge::create(Connection& c)
         peer->getMessage().subscribe(args.i_src, args.i_dest, args.i_sync ? 0 
: 1, 0, false, "", 0, options);
         peer->getMessage().flow(args.i_dest, 0, 0xFFFFFFFF);
         peer->getMessage().flow(args.i_dest, 1, 0xFFFFFFFF);
-        QPID_LOG(debug, "Activated route from queue " << args.i_src << " to " 
<< args.i_dest);
+        QPID_LOG(debug, "Activated bridge " << name << " for route from queue 
" << args.i_src << " to " << args.i_dest);
     } else {
         FieldTable queueSettings;
 
@@ -148,9 +148,9 @@ void Bridge::create(Connection& c)
             if (exchange.get() == 0)
                 throw Exception("Exchange not found for dynamic route");
             exchange->registerDynamicBridge(this);
-            QPID_LOG(debug, "Activated dynamic route for exchange " << 
args.i_src);
+            QPID_LOG(debug, "Activated bridge " << name << " for dynamic route 
for exchange " << args.i_src);
         } else {
-            QPID_LOG(debug, "Activated static route from exchange " << 
args.i_src << " to " << args.i_dest);
+            QPID_LOG(debug, "Activated bridge " << name << " for static route 
from exchange " << args.i_src << " to " << args.i_dest);
         }
     }
     if (args.i_srcIsLocal) 
sessionHandler.getSession()->enableReceiverTracking();
@@ -162,15 +162,16 @@ void Bridge::cancel(Connection&)
         peer->getMessage().cancel(args.i_dest);
         peer->getSession().detach(name);
     }
+    QPID_LOG(debug, "Cancelled bridge " << name);
 }
 
 void Bridge::closed()
 {
     if (args.i_dynamic) {
-        Exchange::shared_ptr exchange = 
link->getBroker()->getExchanges().get(args.i_src);
-        if (exchange.get() != 0)
-            exchange->removeDynamicBridge(this);
+        Exchange::shared_ptr exchange = 
link->getBroker()->getExchanges().find(args.i_src);
+        if (exchange.get()) exchange->removeDynamicBridge(this);
     }
+    QPID_LOG(debug, "Closed bridge " << name);
 }
 
 void Bridge::destroy()

Modified: qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/Link.cpp
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/Link.cpp?rev=1245509&r1=1245508&r2=1245509&view=diff
==============================================================================
--- qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/Link.cpp (original)
+++ qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/Link.cpp Fri Feb 17 
14:10:51 2012
@@ -221,26 +221,28 @@ void Link::add(Bridge::shared_ptr bridge
 
 void Link::cancel(Bridge::shared_ptr bridge)
 {
-    Mutex::ScopedLock mutex(lock);
+    bool needIOProcessing = false;
+    {
+        Mutex::ScopedLock mutex(lock);
 
-    for (Bridges::iterator i = created.begin(); i != created.end(); i++) {
-        if ((*i).get() == bridge.get()) {
-            created.erase(i);
-            break;
+        for (Bridges::iterator i = created.begin(); i != created.end(); i++) {
+            if ((*i).get() == bridge.get()) {
+                created.erase(i);
+                break;
+            }
         }
-    }
-    for (Bridges::iterator i = active.begin(); i != active.end(); i++) {
-        if ((*i).get() == bridge.get()) {
-            cancellations.push_back(bridge);
-            bridge->closed();
-            active.erase(i);
-            break;
+        for (Bridges::iterator i = active.begin(); i != active.end(); i++) {
+            if ((*i).get() == bridge.get()) {
+                cancellations.push_back(bridge);
+                bridge->closed();
+                active.erase(i);
+                break;
+            }
         }
+        needIOProcessing = !cancellations.empty();
     }
-
-    if (!cancellations.empty()) {
+    if (needIOProcessing)
         connection->requestIOProcessing 
(boost::bind(&Link::ioThreadProcessing, this));
-    }
 }
 
 void Link::ioThreadProcessing()

Modified: qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/QueueReplicator.cpp?rev=1245509&r1=1245508&r2=1245509&view=diff
==============================================================================
--- qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/QueueReplicator.cpp 
(original)
+++ qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/QueueReplicator.cpp Fri Feb 
17 14:10:51 2012
@@ -65,6 +65,7 @@ void QueueReplicator::activate() {
     // Take a reference to myself to ensure not deleted before initializeBridge
     // is called.
     self = shared_from_this();
+    // Note this may create a new bridge or use an existing one.
     queue->getBroker()->getLinks().declare(
         link->getHost(), link->getPort(),
         false,              // durable
@@ -77,7 +78,8 @@ void QueueReplicator::activate() {
         "",                 // excludes
         false,              // dynamic
         0,                  // sync?
-        // Include shared_ptr to self to ensure we not deleted before 
initializeBridge is called.
+        // Include shared_ptr to self to ensure we are not deleted
+        // before initializeBridge is called.
         boost::bind(&QueueReplicator::initializeBridge, this, _1, _2, self)
     );
 }
@@ -88,6 +90,7 @@ void QueueReplicator::deactivate() {
     sys::Mutex::ScopedLock l(lock);
     queue->getBroker()->getLinks().destroy(
         link->getHost(), link->getPort(), queue->getName(), getName(), 
string());
+    QPID_LOG(debug, logPrefix << "Deactivated bridge " << bridgeName);
 }
 
 // Called in a broker connection thread when the bridge is created.
@@ -95,7 +98,7 @@ void QueueReplicator::deactivate() {
 void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& 
sessionHandler,
                                        boost::shared_ptr<QueueReplicator> 
/*self*/) {
     sys::Mutex::ScopedLock l(lock);
-
+    bridgeName = bridge.getName();
     framing::AMQP_ServerProxy peer(sessionHandler.out);
     const qmf::org::apache::qpid::broker::ArgsLinkBridge& 
args(bridge.getArgs());
     framing::FieldTable settings;
@@ -117,7 +120,7 @@ void QueueReplicator::initializeBridge(B
     peer.getMessage().subscribe(args.i_src, args.i_dest, 0/*accept-explicit*/, 
1/*not-acquired*/, false, "", 0, settings);
     peer.getMessage().flow(getName(), 0, 0xFFFFFFFF);
     peer.getMessage().flow(getName(), 1, 0xFFFFFFFF);
-    QPID_LOG(debug, logPrefix << "Activated bridge from " << args.i_src << " 
to " << args.i_dest);
+    QPID_LOG(debug, logPrefix << "Activated bridge " << bridgeName);
 }
 
 namespace {

Modified: qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/QueueReplicator.h
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/QueueReplicator.h?rev=1245509&r1=1245508&r2=1245509&view=diff
==============================================================================
--- qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/QueueReplicator.h (original)
+++ qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/QueueReplicator.h Fri Feb 17 
14:10:51 2012
@@ -75,6 +75,7 @@ class QueueReplicator : public broker::E
     void dequeue(framing::SequenceNumber, const sys::Mutex::ScopedLock&);
 
     std::string logPrefix;
+    std::string bridgeName;
     sys::Mutex lock;
     boost::shared_ptr<broker::Queue> queue;
     boost::shared_ptr<broker::Link> link;

Modified: qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/WiringReplicator.cpp
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/WiringReplicator.cpp?rev=1245509&r1=1245508&r2=1245509&view=diff
==============================================================================
--- qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/WiringReplicator.cpp 
(original)
+++ qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/WiringReplicator.cpp Fri Feb 
17 14:10:51 2012
@@ -260,7 +260,7 @@ void WiringReplicator::doEventQueueDecla
     string name = values[QNAME].asString();
     Variant::Map argsMap = values[ARGS].asMap();
     if (values[DISP] == CREATED && replicateLevel(argsMap)) {
-         framing::FieldTable args;
+        framing::FieldTable args;
         amqp_0_10::translate(argsMap, args);
         std::pair<boost::shared_ptr<Queue>, bool> result =
             broker.createQueue(
@@ -287,21 +287,20 @@ void WiringReplicator::doEventQueueDecla
 }
 
 void WiringReplicator::doEventQueueDelete(Variant::Map& values) {
+    // The remote queue has already been deleted so replicator
+    // sessions may be closed by a "queue deleted" exception.
     string name = values[QNAME].asString();
     boost::shared_ptr<Queue> queue = broker.getQueues().find(name);
     if (queue && replicateLevel(queue->getSettings())) {
         QPID_LOG(debug, "HA: Backup deleting queue: " << name);
-        broker.deleteQueue(
-            name,
-            values[USER].asString(),
-            values[RHOST].asString());
-        // Delete the QueueReplicator exchange for this queue.
-        boost::shared_ptr<broker::Exchange> ex =
-            broker.getExchanges().find(QueueReplicator::replicatorName(name));
-        boost::shared_ptr<QueueReplicator> qr =
-            boost::dynamic_pointer_cast<QueueReplicator>(ex);
+        string rname = QueueReplicator::replicatorName(name);
+        boost::shared_ptr<broker::Exchange> ex = 
broker.getExchanges().find(rname);
+        boost::shared_ptr<QueueReplicator> qr = 
boost::dynamic_pointer_cast<QueueReplicator>(ex);
         if (qr) qr->deactivate();
-        broker.getExchanges().destroy(QueueReplicator::replicatorName(name));
+        // QueueReplicator's bridge is now queued for destruction but may not
+        // actually be destroyed, deleting the exchange
+        broker.getExchanges().destroy(rname);
+        broker.deleteQueue(name, values[USER].asString(), 
values[RHOST].asString());
     }
 }
 
@@ -455,8 +454,8 @@ void WiringReplicator::doResponseBind(Va
 void WiringReplicator::startQueueReplicator(const boost::shared_ptr<Queue>& 
queue) {
     if (replicateLevel(queue->getSettings()) == RL_ALL) {
         boost::shared_ptr<QueueReplicator> qr(new QueueReplicator(queue, 
link));
-        qr->activate();
         broker.getExchanges().registerExchange(qr);
+        qr->activate();
     }
 }
 



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscr...@qpid.apache.org

Reply via email to