Author: aconway
Date: Fri Feb 17 14:12:47 2012
New Revision: 1245525

URL: http://svn.apache.org/viewvc?rev=1245525&view=rev
Log:
QPID-3603: Refactor LinkRegistry to use a  ConnectionObserver.

Modified:
    qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/Broker.h
    qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/Connection.cpp
    qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/ConnectionObserver.h
    qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/ConnectionObservers.h
    qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
    qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/ConnectionExcluder.h

Modified: qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/Broker.h
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/Broker.h?rev=1245525&r1=1245524&r2=1245525&view=diff
==============================================================================
--- qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/Broker.h (original)
+++ qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/Broker.h Fri Feb 17 
14:12:47 2012
@@ -179,6 +179,7 @@ public:
     std::auto_ptr<MessageStore> store;
     AclModule* acl;
     DataDir dataDir;
+    ConnectionObservers connectionObservers;
 
     QueueRegistry queues;
     ExchangeRegistry exchanges;
@@ -201,7 +202,6 @@ public:
     boost::intrusive_ptr<ExpiryPolicy> expiryPolicy;
     ConnectionCounter connectionCounter;
     ConsumerFactories consumerFactories;
-    ConnectionObservers connectionObservers;
 
   public:
     virtual ~Broker();

Modified: qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/Connection.cpp
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/Connection.cpp?rev=1245525&r1=1245524&r2=1245525&view=diff
==============================================================================
--- qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/Connection.cpp (original)
+++ qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/Connection.cpp Fri Feb 
17 14:12:47 2012
@@ -104,8 +104,7 @@ Connection::Connection(ConnectionOutputH
     outboundTracker(*this)
 {
     outboundTracker.wrap(out);
-    if (link)
-        links.notifyConnection(mgmtId, this);
+    broker.getConnectionObservers().connection(*this);
     // In a cluster, allow adding the management object to be delayed.
     if (!delayManagement) addManagementObject();
     if (!isShadow()) broker.getConnectionCounter().inc_connectionCount();
@@ -143,8 +142,7 @@ Connection::~Connection()
         if (!link && isClusterSafe())
             agent->raiseEvent(_qmf::EventClientDisconnect(mgmtId, 
ConnectionState::getUserId()));
     }
-    if (link)
-        links.notifyClosed(mgmtId);
+    broker.getConnectionObservers().closed(*this);
 
     if (heartbeatTimer)
         heartbeatTimer->cancel();
@@ -165,8 +163,7 @@ void Connection::received(framing::AMQFr
         recordFromClient(frame);
     if (!wasOpen && isOpen()) {
         doIoCallbacks(); // Do any callbacks registered before we opened.
-        // FIXME aconway 2012-01-18: generic observer points.
-        broker.getConnectionObservers().connect(*this);
+        broker.getConnectionObservers().opened(*this);
     }
 }
 
@@ -267,8 +264,7 @@ string Connection::getAuthCredentials()
 
 void Connection::notifyConnectionForced(const string& text)
 {
-    if (link)
-        links.notifyConnectionForced(mgmtId, text);
+    broker.getConnectionObservers().forced(*this, text);
 }
 
 void Connection::setUserId(const string& userId)

Modified: 
qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/ConnectionObserver.h
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/ConnectionObserver.h?rev=1245525&r1=1245524&r2=1245525&view=diff
==============================================================================
--- qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/ConnectionObserver.h 
(original)
+++ qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/ConnectionObserver.h Fri 
Feb 17 14:12:47 2012
@@ -37,14 +37,19 @@ class ConnectionObserver
   public:
     virtual ~ConnectionObserver() {}
 
-    /** Called when a connection is opened and authentication has been
-     * performed.
+    /** Called when a connection is first established. */
+    virtual void connection(Connection&) {}
+
+    /** Called when the opening negotiation is done and the connection is 
authenticated.
      * @exception Throwing an exception will abort the connection.
      */
-    virtual void connect(Connection& connection) = 0;
+    virtual void opened(Connection&) {}
+
+    /** Called when a connection is closed. */
+    virtual void closed(Connection&) {}
 
-    /** Called when a connection is torn down. */
-    virtual void disconnect(Connection& connection) = 0;
+    /** Called when a connection is forced closed. */
+    virtual void forced(Connection&, const std::string& /*message*/) {}
 };
 
 }} // namespace qpid::broker

Modified: 
qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/ConnectionObservers.h
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/ConnectionObservers.h?rev=1245525&r1=1245524&r2=1245525&view=diff
==============================================================================
--- qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/ConnectionObservers.h 
(original)
+++ qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/ConnectionObservers.h 
Fri Feb 17 14:12:47 2012
@@ -38,17 +38,29 @@ class ConnectionObservers : public Conne
         observers.push_back(observer);
     }
 
-    // implementation of ConnectionObserver interface
-    void connect(Connection& c) {
-        std::for_each(observers.begin(), observers.end(), 
boost::bind(&ConnectionObserver::connect, _1, boost::ref(c)));
+    void connection(Connection& c) {
+        each(boost::bind(&ConnectionObserver::connection, _1, boost::ref(c)));
     }
-    void disconnect(Connection& c) {
-        std::for_each(observers.begin(), observers.end(), 
boost::bind(&ConnectionObserver::disconnect, _1, boost::ref(c)));
+
+    void opened(Connection& c) {
+        each(boost::bind(&ConnectionObserver::opened, _1, boost::ref(c)));
+    }
+
+    void closed(Connection& c) {
+        each(boost::bind(&ConnectionObserver::closed, _1, boost::ref(c)));
+    }
+
+    void forced(Connection& c, const std::string& text) {
+        each(boost::bind(&ConnectionObserver::forced, _1, boost::ref(c), 
text));
     }
 
   private:
     typedef std::vector<boost::shared_ptr<ConnectionObserver> > Observers;
     Observers observers;
+
+    template <class F> void each(F f) {
+        std::for_each(observers.begin(), observers.end(), f);
+    }
 };
 
 }} // namespace qpid::broker

Modified: qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/LinkRegistry.cpp?rev=1245525&r1=1245524&r2=1245525&view=diff
==============================================================================
--- qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/LinkRegistry.cpp 
(original)
+++ qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/LinkRegistry.cpp Fri Feb 
17 14:12:47 2012
@@ -48,6 +48,16 @@ LinkRegistry::LinkRegistry () :
 {
 }
 
+namespace {
+struct ConnectionObserverImpl : public ConnectionObserver {
+    LinkRegistry& links;
+    ConnectionObserverImpl(LinkRegistry& l) : links(l) {}
+    void connection(Connection& c) { links.notifyConnection(c.getMgmtId(), 
&c); }
+    void closed(Connection& c) { links.notifyClosed(c.getMgmtId()); }
+    void forced(Connection& c, const string& text) { 
links.notifyConnectionForced(c.getMgmtId(), text); }
+};
+}
+
 LinkRegistry::LinkRegistry (Broker* _broker) :
     broker(_broker), timer(&broker->getTimer()),
     maintenanceTask(new Periodic(*this)),
@@ -55,6 +65,8 @@ LinkRegistry::LinkRegistry (Broker* _bro
     realm(broker->getOptions().realm)
 {
     timer->add(maintenanceTask);
+    broker->getConnectionObservers().add(
+        boost::shared_ptr<ConnectionObserver>(new 
ConnectionObserverImpl(*this)));
 }
 
 LinkRegistry::~LinkRegistry()

Modified: qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/ConnectionExcluder.h
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/ConnectionExcluder.h?rev=1245525&r1=1245524&r2=1245525&view=diff
==============================================================================
--- qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/ConnectionExcluder.h 
(original)
+++ qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/ConnectionExcluder.h Fri Feb 
17 14:12:47 2012
@@ -42,7 +42,7 @@ class ConnectionExcluder : public broker
     ConnectionExcluder(string adminUser_, PrimaryTest isPrimary_)
         : adminUser(adminUser_), isPrimary(isPrimary_) {}
 
-    void connect(broker::Connection& connection) {
+    void opened(broker::Connection& connection) {
         if (!isPrimary() && !connection.isLink()
             && !connection.isAuthenticatedUser(adminUser))
         {
@@ -59,8 +59,6 @@ class ConnectionExcluder : public broker
         }
     }
 
-    void disconnect(broker::Connection&) {}
-
   private:
     string adminUser;
     PrimaryTest isPrimary;



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscr...@qpid.apache.org

Reply via email to