Author: aconway Date: Fri Feb 17 14:12:47 2012 New Revision: 1245525 URL: http://svn.apache.org/viewvc?rev=1245525&view=rev Log: QPID-3603: Refactor LinkRegistry to use a ConnectionObserver.
Modified: qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/Broker.h qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/Connection.cpp qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/ConnectionObserver.h qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/ConnectionObservers.h qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/LinkRegistry.cpp qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/ConnectionExcluder.h Modified: qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/Broker.h URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/Broker.h?rev=1245525&r1=1245524&r2=1245525&view=diff ============================================================================== --- qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/Broker.h (original) +++ qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/Broker.h Fri Feb 17 14:12:47 2012 @@ -179,6 +179,7 @@ public: std::auto_ptr<MessageStore> store; AclModule* acl; DataDir dataDir; + ConnectionObservers connectionObservers; QueueRegistry queues; ExchangeRegistry exchanges; @@ -201,7 +202,6 @@ public: boost::intrusive_ptr<ExpiryPolicy> expiryPolicy; ConnectionCounter connectionCounter; ConsumerFactories consumerFactories; - ConnectionObservers connectionObservers; public: virtual ~Broker(); Modified: qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/Connection.cpp URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/Connection.cpp?rev=1245525&r1=1245524&r2=1245525&view=diff ============================================================================== --- qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/Connection.cpp (original) +++ qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/Connection.cpp Fri Feb 17 14:12:47 2012 @@ -104,8 +104,7 @@ Connection::Connection(ConnectionOutputH outboundTracker(*this) { outboundTracker.wrap(out); - if (link) - links.notifyConnection(mgmtId, this); + broker.getConnectionObservers().connection(*this); // In a cluster, allow adding the management object to be delayed. if (!delayManagement) addManagementObject(); if (!isShadow()) broker.getConnectionCounter().inc_connectionCount(); @@ -143,8 +142,7 @@ Connection::~Connection() if (!link && isClusterSafe()) agent->raiseEvent(_qmf::EventClientDisconnect(mgmtId, ConnectionState::getUserId())); } - if (link) - links.notifyClosed(mgmtId); + broker.getConnectionObservers().closed(*this); if (heartbeatTimer) heartbeatTimer->cancel(); @@ -165,8 +163,7 @@ void Connection::received(framing::AMQFr recordFromClient(frame); if (!wasOpen && isOpen()) { doIoCallbacks(); // Do any callbacks registered before we opened. - // FIXME aconway 2012-01-18: generic observer points. - broker.getConnectionObservers().connect(*this); + broker.getConnectionObservers().opened(*this); } } @@ -267,8 +264,7 @@ string Connection::getAuthCredentials() void Connection::notifyConnectionForced(const string& text) { - if (link) - links.notifyConnectionForced(mgmtId, text); + broker.getConnectionObservers().forced(*this, text); } void Connection::setUserId(const string& userId) Modified: qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/ConnectionObserver.h URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/ConnectionObserver.h?rev=1245525&r1=1245524&r2=1245525&view=diff ============================================================================== --- qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/ConnectionObserver.h (original) +++ qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/ConnectionObserver.h Fri Feb 17 14:12:47 2012 @@ -37,14 +37,19 @@ class ConnectionObserver public: virtual ~ConnectionObserver() {} - /** Called when a connection is opened and authentication has been - * performed. + /** Called when a connection is first established. */ + virtual void connection(Connection&) {} + + /** Called when the opening negotiation is done and the connection is authenticated. * @exception Throwing an exception will abort the connection. */ - virtual void connect(Connection& connection) = 0; + virtual void opened(Connection&) {} + + /** Called when a connection is closed. */ + virtual void closed(Connection&) {} - /** Called when a connection is torn down. */ - virtual void disconnect(Connection& connection) = 0; + /** Called when a connection is forced closed. */ + virtual void forced(Connection&, const std::string& /*message*/) {} }; }} // namespace qpid::broker Modified: qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/ConnectionObservers.h URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/ConnectionObservers.h?rev=1245525&r1=1245524&r2=1245525&view=diff ============================================================================== --- qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/ConnectionObservers.h (original) +++ qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/ConnectionObservers.h Fri Feb 17 14:12:47 2012 @@ -38,17 +38,29 @@ class ConnectionObservers : public Conne observers.push_back(observer); } - // implementation of ConnectionObserver interface - void connect(Connection& c) { - std::for_each(observers.begin(), observers.end(), boost::bind(&ConnectionObserver::connect, _1, boost::ref(c))); + void connection(Connection& c) { + each(boost::bind(&ConnectionObserver::connection, _1, boost::ref(c))); } - void disconnect(Connection& c) { - std::for_each(observers.begin(), observers.end(), boost::bind(&ConnectionObserver::disconnect, _1, boost::ref(c))); + + void opened(Connection& c) { + each(boost::bind(&ConnectionObserver::opened, _1, boost::ref(c))); + } + + void closed(Connection& c) { + each(boost::bind(&ConnectionObserver::closed, _1, boost::ref(c))); + } + + void forced(Connection& c, const std::string& text) { + each(boost::bind(&ConnectionObserver::forced, _1, boost::ref(c), text)); } private: typedef std::vector<boost::shared_ptr<ConnectionObserver> > Observers; Observers observers; + + template <class F> void each(F f) { + std::for_each(observers.begin(), observers.end(), f); + } }; }} // namespace qpid::broker Modified: qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/LinkRegistry.cpp URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/LinkRegistry.cpp?rev=1245525&r1=1245524&r2=1245525&view=diff ============================================================================== --- qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/LinkRegistry.cpp (original) +++ qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/LinkRegistry.cpp Fri Feb 17 14:12:47 2012 @@ -48,6 +48,16 @@ LinkRegistry::LinkRegistry () : { } +namespace { +struct ConnectionObserverImpl : public ConnectionObserver { + LinkRegistry& links; + ConnectionObserverImpl(LinkRegistry& l) : links(l) {} + void connection(Connection& c) { links.notifyConnection(c.getMgmtId(), &c); } + void closed(Connection& c) { links.notifyClosed(c.getMgmtId()); } + void forced(Connection& c, const string& text) { links.notifyConnectionForced(c.getMgmtId(), text); } +}; +} + LinkRegistry::LinkRegistry (Broker* _broker) : broker(_broker), timer(&broker->getTimer()), maintenanceTask(new Periodic(*this)), @@ -55,6 +65,8 @@ LinkRegistry::LinkRegistry (Broker* _bro realm(broker->getOptions().realm) { timer->add(maintenanceTask); + broker->getConnectionObservers().add( + boost::shared_ptr<ConnectionObserver>(new ConnectionObserverImpl(*this))); } LinkRegistry::~LinkRegistry() Modified: qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/ConnectionExcluder.h URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/ConnectionExcluder.h?rev=1245525&r1=1245524&r2=1245525&view=diff ============================================================================== --- qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/ConnectionExcluder.h (original) +++ qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/ConnectionExcluder.h Fri Feb 17 14:12:47 2012 @@ -42,7 +42,7 @@ class ConnectionExcluder : public broker ConnectionExcluder(string adminUser_, PrimaryTest isPrimary_) : adminUser(adminUser_), isPrimary(isPrimary_) {} - void connect(broker::Connection& connection) { + void opened(broker::Connection& connection) { if (!isPrimary() && !connection.isLink() && !connection.isAuthenticatedUser(adminUser)) { @@ -59,8 +59,6 @@ class ConnectionExcluder : public broker } } - void disconnect(broker::Connection&) {} - private: string adminUser; PrimaryTest isPrimary; --------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:commits-subscr...@qpid.apache.org