Author: gsim
Date: Fri Jan 23 13:55:15 2009
New Revision: 737203

URL: http://svn.apache.org/viewvc?rev=737203&view=rev
Log:
Use special management ids for objects used in state transfer to new members. 
This prevents the ids getting out of sync across the cluster and allows 
management methods to be used reliably.
 

Added:
    qpid/trunk/qpid/cpp/src/qpid/management/IdAllocator.h   (with props)
Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h
    qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h
    qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp
    qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.cpp
    qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.h

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp?rev=737203&r1=737202&r2=737203&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp Fri Jan 23 13:55:15 2009
@@ -48,7 +48,7 @@
 namespace qpid {
 namespace broker {
 
-Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const 
std::string& mgmtId_, bool isLink_) :
+Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const 
std::string& mgmtId_, bool isLink_, uint64_t objectId) :
     ConnectionState(out_, broker_),
     adapter(*this, isLink_),
     isLink(isLink_),
@@ -70,9 +70,10 @@
                
                
         // TODO set last bool true if system connection
-        if (agent != 0)
+        if (agent != 0) {
             mgmtObject = new _qmf::Connection(agent, this, parent, mgmtId, 
!isLink, false);
-        agent->addObject(mgmtObject);
+            agent->addObject(mgmtObject, objectId);
+        }
         ConnectionState::setUrl(mgmtId);
     }
 }

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h?rev=737203&r1=737202&r2=737203&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h Fri Jan 23 13:55:15 2009
@@ -64,7 +64,7 @@
                    public RefCounted
 {
   public:
-    Connection(sys::ConnectionOutputHandler* out, Broker& broker, const 
std::string& mgmtId, bool isLink = false);
+    Connection(sys::ConnectionOutputHandler* out, Broker& broker, const 
std::string& mgmtId, bool isLink = false, uint64_t objectId = 0);
     ~Connection ();
 
     /** Get the SessionHandler for channel. Create if it does not already 
exist */

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp?rev=737203&r1=737202&r2=737203&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp Fri Jan 23 13:55:15 2009
@@ -22,6 +22,7 @@
 #include "Exchange.h"
 #include "ExchangeRegistry.h"
 #include "qpid/agent/ManagementAgent.h"
+#include "qpid/management/ManagementBroker.h"
 #include "qpid/log/Statement.h"
 #include "qpid/framing/MessageProperties.h"
 #include "DeliverableMessage.h"
@@ -32,6 +33,7 @@
 using qpid::framing::FieldTable;
 using qpid::sys::Mutex;
 using qpid::management::ManagementAgent;
+using qpid::management::ManagementBroker;
 using qpid::management::ManagementObject;
 using qpid::management::Manageable;
 using qpid::management::Args;
@@ -109,12 +111,14 @@
             mgmtExchange = new _qmf::Exchange (agent, this, parent, _name, 
durable);
             mgmtExchange->set_arguments(args);
             if (!durable) {
-                if (name == "")
+                if (name == "") {
                     agent->addObject (mgmtExchange, 0x1000000000000004LL);  // 
Special default exchange ID
-                else if (name == "qpid.management")
+                } else if (name == "qpid.management") {
                     agent->addObject (mgmtExchange, 0x1000000000000005LL);  // 
Special management exchange ID
-                else
-                    agent->addObject (mgmtExchange);
+                } else {
+                    ManagementBroker* mb = 
dynamic_cast<ManagementBroker*>(agent);
+                    agent->addObject (mgmtExchange, mb ? mb->allocateId(this) 
: 0);
+                }
             }
         }
     }
@@ -245,7 +249,8 @@
                     (agent, this, (Manageable*) parent, queueId, key, args);
                 if (!origin.empty())
                     mgmtBinding->set_origin(origin);
-                agent->addObject (mgmtBinding);
+                ManagementBroker* mb = dynamic_cast<ManagementBroker*>(agent);
+                agent->addObject (mgmtBinding, mb ? mb->allocateId(this) : 0);
             }
         }
     }

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h?rev=737203&r1=737202&r2=737203&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h Fri Jan 23 13:55:15 2009
@@ -39,6 +39,22 @@
 class ExchangeRegistry;
 
 class Exchange : public PersistableExchange, public management::Manageable {
+public:
+    struct Binding : public management::Manageable {
+        typedef boost::shared_ptr<Binding>       shared_ptr;
+        typedef std::vector<Binding::shared_ptr> vector;
+
+        Queue::shared_ptr         queue;
+        const std::string         key;
+        const framing::FieldTable args;
+        qmf::org::apache::qpid::broker::Binding* mgmtBinding;
+
+        Binding(const std::string& key, Queue::shared_ptr queue, Exchange* 
parent = 0,
+                framing::FieldTable args = framing::FieldTable(), const 
std::string& origin = std::string());
+        ~Binding();
+        management::ManagementObject* GetManagementObject() const;
+    };
+
 private:
     const std::string name;
     const bool durable;
@@ -64,20 +80,6 @@
            
     void routeIVE();
            
-    struct Binding : public management::Manageable {
-        typedef boost::shared_ptr<Binding>       shared_ptr;
-        typedef std::vector<Binding::shared_ptr> vector;
-
-        Queue::shared_ptr         queue;
-        const std::string         key;
-        const framing::FieldTable args;
-        qmf::org::apache::qpid::broker::Binding* mgmtBinding;
-
-        Binding(const std::string& key, Queue::shared_ptr queue, Exchange* 
parent = 0,
-                framing::FieldTable args = framing::FieldTable(), const 
std::string& origin = std::string());
-        ~Binding();
-        management::ManagementObject* GetManagementObject() const;
-    };
 
     struct MatchQueue {
         const Queue::shared_ptr queue;        

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=737203&r1=737202&r2=737203&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Fri Jan 23 13:55:15 2009
@@ -30,6 +30,7 @@
 
 #include "qpid/StringUtils.h"
 #include "qpid/log/Statement.h"
+#include "qpid/management/ManagementBroker.h"
 #include "qpid/framing/reply_exceptions.h"
 #include "qpid/sys/Monitor.h"
 #include "qpid/sys/Time.h"
@@ -46,6 +47,7 @@
 using namespace qpid::sys;
 using namespace qpid::framing;
 using qpid::management::ManagementAgent;
+using qpid::management::ManagementBroker;
 using qpid::management::ManagementObject;
 using qpid::management::Manageable;
 using qpid::management::Args;
@@ -103,8 +105,10 @@
 
             // Add the object to the management agent only if this queue is 
not durable.
             // If it's durable, we will add it later when the queue is 
assigned a persistenceId.
-            if (store == 0)
-                agent->addObject (mgmtObject);
+            if (store == 0) {
+                ManagementBroker* mb = dynamic_cast<ManagementBroker*>(agent);
+                agent->addObject (mgmtObject, mb ? mb->allocateId(this) : 0);
+            }
         }
     }
 }

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp?rev=737203&r1=737202&r2=737203&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp Fri Jan 23 13:55:15 
2009
@@ -30,6 +30,7 @@
 #include "qpid/framing/reply_exceptions.h"
 #include "qpid/framing/ServerInvoker.h"
 #include "qpid/log/Statement.h"
+#include "qpid/management/ManagementBroker.h"
 
 #include <boost/bind.hpp>
 #include <boost/lexical_cast.hpp>
@@ -41,6 +42,7 @@
 using sys::Mutex;
 using boost::intrusive_ptr;
 using qpid::management::ManagementAgent;
+using qpid::management::ManagementBroker;
 using qpid::management::ManagementObject;
 using qpid::management::Manageable;
 using qpid::management::Args;
@@ -65,7 +67,8 @@
             mgmtObject->set_attached (0);
             mgmtObject->set_detachedLifespan (0);
             mgmtObject->clr_expireTime();
-            agent->addObject (mgmtObject);
+            ManagementBroker* mb = dynamic_cast<ManagementBroker*>(agent);
+            agent->addObject (mgmtObject, mb ? mb->allocateId(this) : 0);
         }
     }
     attach(h);

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp?rev=737203&r1=737202&r2=737203&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp Fri Jan 23 13:55:15 
2009
@@ -21,13 +21,21 @@
 
 #include "qpid/cluster/Cluster.h"
 #include "qpid/cluster/ConnectionCodec.h"
+#include "qpid/cluster/DumpClient.h"
 
 #include "qpid/broker/Broker.h"
 #include "qpid/Plugin.h"
 #include "qpid/Options.h"
 #include "qpid/shared_ptr.h"
+#include "qpid/sys/AtomicValue.h"
 #include "qpid/log/Statement.h"
 
+#include "qpid/management/ManagementBroker.h"
+#include "qpid/management/IdAllocator.h"
+#include "qpid/broker/Exchange.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/broker/SessionState.h"
+
 #include <boost/utility/in_place_factory.hpp>
 #include <boost/scoped_ptr.hpp>
 
@@ -36,6 +44,9 @@
 
 using namespace std;
 using broker::Broker;
+using management::IdAllocator;
+using management::ManagementAgent;
+using management::ManagementBroker;
 
 struct ClusterValues {
     string name;
@@ -76,6 +87,46 @@
     }
 };
 
+struct DumpClientIdAllocator : management::IdAllocator
+{
+    qpid::sys::AtomicValue<uint64_t> sequence;
+
+    DumpClientIdAllocator() : sequence(0x4000000000000000LL) {}
+
+    uint64_t getIdFor(management::Manageable* m)
+    {
+        if (isDumpQueue(m) || isDumpExchange(m) || isDumpSession(m) || 
isDumpBinding(m)) {
+            return ++sequence;
+        } else {
+            return 0;
+        }
+    }
+
+    bool isDumpQueue(management::Manageable* manageable)
+    {
+        qpid::broker::Queue* queue = 
dynamic_cast<qpid::broker::Queue*>(manageable);
+        return queue && queue->getName() == DumpClient::DUMP;
+    }
+
+    bool isDumpExchange(management::Manageable* manageable)
+    {
+        qpid::broker::Exchange* exchange = 
dynamic_cast<qpid::broker::Exchange*>(manageable);
+        return exchange && exchange->getName() == DumpClient::DUMP;
+    }
+
+    bool isDumpSession(management::Manageable* manageable)
+    {
+        broker::SessionState* session = 
dynamic_cast<broker::SessionState*>(manageable);
+        return session && session->getId().getName() == DumpClient::DUMP;
+    }
+
+    bool isDumpBinding(management::Manageable* manageable)
+    {
+        broker::Exchange::Binding* binding = 
dynamic_cast<broker::Exchange::Binding*>(manageable);
+        return binding && binding->queue->getName() == DumpClient::DUMP;
+    }
+};
+
 struct ClusterPlugin : public Plugin {
 
     ClusterValues values;
@@ -102,6 +153,11 @@
             boost::shared_ptr<sys::ConnectionCodec::Factory>(
                 new ConnectionCodec::Factory(broker->getConnectionFactory(), 
*cluster)));
         
broker->getExchanges().registerExchange(cluster->getFailoverExchange());
+        ManagementBroker* mgmt = 
dynamic_cast<ManagementBroker*>(ManagementAgent::Singleton::getInstance());
+        if (mgmt) {
+            std::auto_ptr<IdAllocator> allocator(new DumpClientIdAllocator());
+            mgmt->setAllocator(allocator);
+        }
     }
 
     void earlyInitialize(Plugin::Target&) {}

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp?rev=737203&r1=737202&r2=737203&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp Fri Jan 23 13:55:15 2009
@@ -69,7 +69,7 @@
 Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
                        const std::string& wrappedId, MemberId myId, bool 
isCatchUp, bool isLink)
     : cluster(c), self(myId, this), catchUp(isCatchUp), output(*this, out),
-      connection(&output, cluster.getBroker(), wrappedId, isLink), 
readCredit(0),
+      connection(&output, cluster.getBroker(), wrappedId, isLink, catchUp ? 
++catchUpId : 0), readCredit(0),
       expectProtocolHeader(isLink)
 { init(); }
 
@@ -396,5 +396,7 @@
     QPID_LOG(debug, cluster << " decoded queue " << q->getName());    
 }
 
+qpid::sys::AtomicValue<uint64_t> Connection::catchUpId(0x5000000000000000LL);
+
 }} // namespace qpid::cluster
 

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h?rev=737203&r1=737202&r2=737203&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h Fri Jan 23 13:55:15 2009
@@ -31,6 +31,7 @@
 
 #include "qpid/broker/Connection.h"
 #include "qpid/amqp_0_10/Connection.h"
+#include "qpid/sys/AtomicValue.h"
 #include "qpid/sys/ConnectionInputHandler.h"
 #include "qpid/sys/ConnectionOutputHandler.h"
 #include "qpid/framing/FrameDecoder.h"
@@ -173,6 +174,8 @@
     boost::shared_ptr<broker::TxBuffer> txBuffer;
     int readCredit;
     bool expectProtocolHeader;
+
+    static qpid::sys::AtomicValue<uint64_t> catchUpId;
     
   friend std::ostream& operator<<(std::ostream&, const Connection&);
 };

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp?rev=737203&r1=737202&r2=737203&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp Fri Jan 23 13:55:15 2009
@@ -94,7 +94,7 @@
       done(ok), failed(fail)
 {
     connection.open(url);
-    session = connection.newSession("dump_shared");
+    session = connection.newSession(DUMP);
 }
 
 DumpClient::~DumpClient() {}

Added: qpid/trunk/qpid/cpp/src/qpid/management/IdAllocator.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/management/IdAllocator.h?rev=737203&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/management/IdAllocator.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/management/IdAllocator.h Fri Jan 23 13:55:15 
2009
@@ -0,0 +1,42 @@
+#ifndef QPID_MANAGEMENT_IDALLOCATOR_H
+#define QPID_MANAGEMENT_IDALLOCATOR_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Manageable.h"
+
+namespace qpid {
+namespace management {
+
+/**
+ * Interface through which plugins etc can control the mgmt object id
+ * allocation for special cases
+ */
+struct IdAllocator
+{
+    virtual uint64_t getIdFor(Manageable* object) = 0;
+    virtual ~IdAllocator() {}
+};
+
+}} // namespace qpid::management
+
+#endif  /*!QPID_MANAGEMENT_IDALLOCATOR_H*/

Propchange: qpid/trunk/qpid/cpp/src/qpid/management/IdAllocator.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/trunk/qpid/cpp/src/qpid/management/IdAllocator.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.cpp?rev=737203&r1=737202&r2=737203&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.cpp Fri Jan 23 
13:55:15 2009
@@ -20,6 +20,7 @@
  */
  
 #include "ManagementBroker.h"
+#include "IdAllocator.h"
 #include "qpid/broker/DeliverableMessage.h"
 #include "qpid/log/Statement.h"
 #include <qpid/broker/Message.h>
@@ -1135,3 +1136,16 @@
     inBuffer.restore(); // restore original position
     return end - start;
 }
+
+void ManagementBroker::setAllocator(std::auto_ptr<IdAllocator> a)
+{
+    Mutex::ScopedLock lock (addLock);
+    allocator = a;
+}
+
+uint64_t ManagementBroker::allocateId(Manageable* object)
+{
+    Mutex::ScopedLock lock (addLock);
+    if (allocator.get()) return allocator->getIdFor(object);
+    return 0;
+}

Modified: qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.h?rev=737203&r1=737202&r2=737203&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.h Fri Jan 23 
13:55:15 2009
@@ -32,10 +32,13 @@
 #include "Manageable.h"
 #include "qmf/org/apache/qpid/broker/Agent.h"
 #include <qpid/framing/AMQFrame.h>
+#include <memory>
 
 namespace qpid {
 namespace management {
 
+struct IdAllocator;
+
 class ManagementBroker : public ManagementAgent
 {
 private:
@@ -43,7 +46,6 @@
     int threadPoolSize;
 
 public:
-
     ManagementBroker ();
     virtual ~ManagementBroker ();
 
@@ -78,6 +80,8 @@
     uint32_t pollCallbacks (uint32_t) { assert(0); return 0; }
     int getSignalFd () { assert(0); return -1; }
 
+    void setAllocator(std::auto_ptr<IdAllocator> allocator);
+    uint64_t allocateId(Manageable* object);
 private:
     friend class ManagementAgent;
 
@@ -179,6 +183,8 @@
     uint32_t                     nextRequestSequence;
     bool                         clientWasAdded;
 
+    std::auto_ptr<IdAllocator> allocator;
+
 #   define MA_BUFFER_SIZE 65536
     char inputBuffer[MA_BUFFER_SIZE];
     char outputBuffer[MA_BUFFER_SIZE];



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscr...@qpid.apache.org

Reply via email to