Author: aconway Date: Fri Feb 17 14:10:51 2012 New Revision: 1245509 URL: http://svn.apache.org/viewvc?rev=1245509&view=rev Log: QPID-3603: Logging improvements for bridges, links and HA classes.
Modified: qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/Bridge.cpp qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/Link.cpp qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/QueueReplicator.cpp qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/QueueReplicator.h qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/WiringReplicator.cpp Modified: qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/Bridge.cpp URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/Bridge.cpp?rev=1245509&r1=1245508&r2=1245509&view=diff ============================================================================== --- qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/Bridge.cpp (original) +++ qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/Bridge.cpp Fri Feb 17 14:10:51 2012 @@ -75,7 +75,7 @@ Bridge::Bridge(Link* _link, framing::Cha args.i_tag, args.i_excludes, args.i_dynamic, args.i_sync); agent->addObject(mgmtObject); } - QPID_LOG(debug, "Bridge created from " << args.i_src << " to " << args.i_dest); + QPID_LOG(debug, "Bridge " << name << " created from " << args.i_src << " to " << args.i_dest); } Bridge::~Bridge() @@ -114,7 +114,7 @@ void Bridge::create(Connection& c) peer->getMessage().subscribe(args.i_src, args.i_dest, args.i_sync ? 
0 : 1, 0, false, "", 0, options); peer->getMessage().flow(args.i_dest, 0, 0xFFFFFFFF); peer->getMessage().flow(args.i_dest, 1, 0xFFFFFFFF); - QPID_LOG(debug, "Activated route from queue " << args.i_src << " to " << args.i_dest); + QPID_LOG(debug, "Activated bridge " << name << " for route from queue " << args.i_src << " to " << args.i_dest); } else { FieldTable queueSettings; @@ -148,9 +148,9 @@ void Bridge::create(Connection& c) if (exchange.get() == 0) throw Exception("Exchange not found for dynamic route"); exchange->registerDynamicBridge(this); - QPID_LOG(debug, "Activated dynamic route for exchange " << args.i_src); + QPID_LOG(debug, "Activated bridge " << name << " for dynamic route for exchange " << args.i_src); } else { - QPID_LOG(debug, "Activated static route from exchange " << args.i_src << " to " << args.i_dest); + QPID_LOG(debug, "Activated bridge " << name << " for static route from exchange " << args.i_src << " to " << args.i_dest); } } if (args.i_srcIsLocal) sessionHandler.getSession()->enableReceiverTracking(); @@ -162,15 +162,16 @@ void Bridge::cancel(Connection&) peer->getMessage().cancel(args.i_dest); peer->getSession().detach(name); } + QPID_LOG(debug, "Cancelled bridge " << name); } void Bridge::closed() { if (args.i_dynamic) { - Exchange::shared_ptr exchange = link->getBroker()->getExchanges().get(args.i_src); - if (exchange.get() != 0) - exchange->removeDynamicBridge(this); + Exchange::shared_ptr exchange = link->getBroker()->getExchanges().find(args.i_src); + if (exchange.get()) exchange->removeDynamicBridge(this); } + QPID_LOG(debug, "Closed bridge " << name); } void Bridge::destroy() Modified: qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/Link.cpp URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/Link.cpp?rev=1245509&r1=1245508&r2=1245509&view=diff ============================================================================== --- qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/Link.cpp (original) 
+++ qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/Link.cpp Fri Feb 17 14:10:51 2012 @@ -221,26 +221,28 @@ void Link::add(Bridge::shared_ptr bridge void Link::cancel(Bridge::shared_ptr bridge) { - Mutex::ScopedLock mutex(lock); + bool needIOProcessing = false; + { + Mutex::ScopedLock mutex(lock); - for (Bridges::iterator i = created.begin(); i != created.end(); i++) { - if ((*i).get() == bridge.get()) { - created.erase(i); - break; + for (Bridges::iterator i = created.begin(); i != created.end(); i++) { + if ((*i).get() == bridge.get()) { + created.erase(i); + break; + } } - } - for (Bridges::iterator i = active.begin(); i != active.end(); i++) { - if ((*i).get() == bridge.get()) { - cancellations.push_back(bridge); - bridge->closed(); - active.erase(i); - break; + for (Bridges::iterator i = active.begin(); i != active.end(); i++) { + if ((*i).get() == bridge.get()) { + cancellations.push_back(bridge); + bridge->closed(); + active.erase(i); + break; + } } + needIOProcessing = !cancellations.empty(); } - - if (!cancellations.empty()) { + if (needIOProcessing) connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this)); - } } void Link::ioThreadProcessing() Modified: qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/QueueReplicator.cpp URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/QueueReplicator.cpp?rev=1245509&r1=1245508&r2=1245509&view=diff ============================================================================== --- qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/QueueReplicator.cpp (original) +++ qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/QueueReplicator.cpp Fri Feb 17 14:10:51 2012 @@ -65,6 +65,7 @@ void QueueReplicator::activate() { // Take a reference to myself to ensure not deleted before initializeBridge // is called. self = shared_from_this(); + // Note this may create a new bridge or use an existing one. 
queue->getBroker()->getLinks().declare( link->getHost(), link->getPort(), false, // durable @@ -77,7 +78,8 @@ void QueueReplicator::activate() { "", // excludes false, // dynamic 0, // sync? - // Include shared_ptr to self to ensure we not deleted before initializeBridge is called. + // Include shared_ptr to self to ensure we are not deleted + // before initializeBridge is called. boost::bind(&QueueReplicator::initializeBridge, this, _1, _2, self) ); } @@ -88,6 +90,7 @@ void QueueReplicator::deactivate() { sys::Mutex::ScopedLock l(lock); queue->getBroker()->getLinks().destroy( link->getHost(), link->getPort(), queue->getName(), getName(), string()); + QPID_LOG(debug, logPrefix << "Deactivated bridge " << bridgeName); } // Called in a broker connection thread when the bridge is created. @@ -95,7 +98,7 @@ void QueueReplicator::deactivate() { void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler, boost::shared_ptr<QueueReplicator> /*self*/) { sys::Mutex::ScopedLock l(lock); - + bridgeName = bridge.getName(); framing::AMQP_ServerProxy peer(sessionHandler.out); const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs()); framing::FieldTable settings; @@ -117,7 +120,7 @@ void QueueReplicator::initializeBridge(B peer.getMessage().subscribe(args.i_src, args.i_dest, 0/*accept-explicit*/, 1/*not-acquired*/, false, "", 0, settings); peer.getMessage().flow(getName(), 0, 0xFFFFFFFF); peer.getMessage().flow(getName(), 1, 0xFFFFFFFF); - QPID_LOG(debug, logPrefix << "Activated bridge from " << args.i_src << " to " << args.i_dest); + QPID_LOG(debug, logPrefix << "Activated bridge " << bridgeName); } namespace { Modified: qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/QueueReplicator.h URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/QueueReplicator.h?rev=1245509&r1=1245508&r2=1245509&view=diff ============================================================================== --- 
qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/QueueReplicator.h (original) +++ qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/QueueReplicator.h Fri Feb 17 14:10:51 2012 @@ -75,6 +75,7 @@ class QueueReplicator : public broker::E void dequeue(framing::SequenceNumber, const sys::Mutex::ScopedLock&); std::string logPrefix; + std::string bridgeName; sys::Mutex lock; boost::shared_ptr<broker::Queue> queue; boost::shared_ptr<broker::Link> link; Modified: qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/WiringReplicator.cpp URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/WiringReplicator.cpp?rev=1245509&r1=1245508&r2=1245509&view=diff ============================================================================== --- qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/WiringReplicator.cpp (original) +++ qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/WiringReplicator.cpp Fri Feb 17 14:10:51 2012 @@ -260,7 +260,7 @@ void WiringReplicator::doEventQueueDecla string name = values[QNAME].asString(); Variant::Map argsMap = values[ARGS].asMap(); if (values[DISP] == CREATED && replicateLevel(argsMap)) { - framing::FieldTable args; + framing::FieldTable args; amqp_0_10::translate(argsMap, args); std::pair<boost::shared_ptr<Queue>, bool> result = broker.createQueue( @@ -287,21 +287,20 @@ void WiringReplicator::doEventQueueDecla } void WiringReplicator::doEventQueueDelete(Variant::Map& values) { + // The remote queue has already been deleted so replicator + // sessions may be closed by a "queue deleted" exception. string name = values[QNAME].asString(); boost::shared_ptr<Queue> queue = broker.getQueues().find(name); if (queue && replicateLevel(queue->getSettings())) { QPID_LOG(debug, "HA: Backup deleting queue: " << name); - broker.deleteQueue( - name, - values[USER].asString(), - values[RHOST].asString()); - // Delete the QueueReplicator exchange for this queue. 
- boost::shared_ptr<broker::Exchange> ex = - broker.getExchanges().find(QueueReplicator::replicatorName(name)); - boost::shared_ptr<QueueReplicator> qr = - boost::dynamic_pointer_cast<QueueReplicator>(ex); + string rname = QueueReplicator::replicatorName(name); + boost::shared_ptr<broker::Exchange> ex = broker.getExchanges().find(rname); + boost::shared_ptr<QueueReplicator> qr = boost::dynamic_pointer_cast<QueueReplicator>(ex); if (qr) qr->deactivate(); - broker.getExchanges().destroy(QueueReplicator::replicatorName(name)); + // QueueReplicator's bridge is now queued for destruction but may not + // actually be destroyed, deleting the exchange + broker.getExchanges().destroy(rname); + broker.deleteQueue(name, values[USER].asString(), values[RHOST].asString()); } } @@ -455,8 +454,8 @@ void WiringReplicator::doResponseBind(Va void WiringReplicator::startQueueReplicator(const boost::shared_ptr<Queue>& queue) { if (replicateLevel(queue->getSettings()) == RL_ALL) { boost::shared_ptr<QueueReplicator> qr(new QueueReplicator(queue, link)); - qr->activate(); broker.getExchanges().registerExchange(qr); + qr->activate(); } } --------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:commits-subscr...@qpid.apache.org