Ok, after much gnashing of teeth, here is an updated patch. The cluster plugin registers an id allocator that allocates IDs from a specific range for anything related to the state transfer.
Index: src/qpid/cluster/ClusterPlugin.cpp
===================================================================
--- src/qpid/cluster/ClusterPlugin.cpp	(revision 736978)
+++ src/qpid/cluster/ClusterPlugin.cpp	(working copy)
@@ -21,13 +21,21 @@
 
 #include "qpid/cluster/Cluster.h"
 #include "qpid/cluster/ConnectionCodec.h"
+#include "qpid/cluster/DumpClient.h"
 
 #include "qpid/broker/Broker.h"
 #include "qpid/Plugin.h"
 #include "qpid/Options.h"
 #include "qpid/shared_ptr.h"
+#include "qpid/sys/AtomicValue.h"
 #include "qpid/log/Statement.h"
 
+#include "qpid/management/ManagementBroker.h"
+#include "qpid/management/IdAllocator.h"
+#include "qpid/broker/Exchange.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/broker/SessionState.h"
+
 #include <boost/utility/in_place_factory.hpp>
 #include <boost/scoped_ptr.hpp>
 
@@ -36,6 +44,9 @@
 
 using namespace std;
 using broker::Broker;
+using management::IdAllocator;
+using management::ManagementAgent;
+using management::ManagementBroker;
 
 struct ClusterValues {
     string name;
@@ -76,6 +87,46 @@
     }
 };
 
+struct DumpClientIdAllocator : management::IdAllocator
+{
+    qpid::sys::AtomicValue<uint64_t> sequence;
+
+    DumpClientIdAllocator() : sequence(0x4000000000000000LL) {}
+
+    uint64_t getIdFor(management::Manageable* m)
+    {
+        if (isDumpQueue(m) || isDumpExchange(m) || isDumpSession(m) || isDumpBinding(m)) {
+            return ++sequence;
+        } else {
+            return 0;
+        }
+    }
+
+    bool isDumpQueue(management::Manageable* manageable)
+    {
+        qpid::broker::Queue* queue = dynamic_cast<qpid::broker::Queue*>(manageable);
+        return queue && queue->getName() == DumpClient::DUMP;
+    }
+
+    bool isDumpExchange(management::Manageable* manageable)
+    {
+        qpid::broker::Exchange* exchange = dynamic_cast<qpid::broker::Exchange*>(manageable);
+        return exchange && exchange->getName() == DumpClient::DUMP;
+    }
+
+    bool isDumpSession(management::Manageable* manageable)
+    {
+        broker::SessionState* session = dynamic_cast<broker::SessionState*>(manageable);
+        return session && session->getId().getName() == DumpClient::DUMP;
+    }
+
+    bool isDumpBinding(management::Manageable* manageable)
+    {
+        broker::Exchange::Binding* binding = dynamic_cast<broker::Exchange::Binding*>(manageable);
+        return binding && binding->queue->getName() == DumpClient::DUMP;
+    }
+};
+
 struct ClusterPlugin : public Plugin {
 
     ClusterValues values;
@@ -102,6 +153,11 @@
             boost::shared_ptr<sys::ConnectionCodec::Factory>(
                 new ConnectionCodec::Factory(broker->getConnectionFactory(), *cluster)));
         broker->getExchanges().registerExchange(cluster->getFailoverExchange());
+        ManagementBroker* mgmt = dynamic_cast<ManagementBroker*>(ManagementAgent::Singleton::getInstance());
+        if (mgmt) {
+            std::auto_ptr<IdAllocator> allocator(new DumpClientIdAllocator());
+            mgmt->setAllocator(allocator);
+        }
     }
 
     void earlyInitialize(Plugin::Target&) {}
Index: src/qpid/cluster/Connection.h
===================================================================
--- src/qpid/cluster/Connection.h	(revision 736978)
+++ src/qpid/cluster/Connection.h	(working copy)
@@ -31,6 +31,7 @@
 
 #include "qpid/broker/Connection.h"
 #include "qpid/amqp_0_10/Connection.h"
+#include "qpid/sys/AtomicValue.h"
 #include "qpid/sys/ConnectionInputHandler.h"
 #include "qpid/sys/ConnectionOutputHandler.h"
 #include "qpid/framing/FrameDecoder.h"
@@ -173,6 +174,8 @@
     boost::shared_ptr<broker::TxBuffer> txBuffer;
     int readCredit;
     bool expectProtocolHeader;
+
+    static qpid::sys::AtomicValue<uint64_t> catchUpId;
     
   friend std::ostream& operator<<(std::ostream&, const Connection&);
 };
Index: src/qpid/cluster/DumpClient.cpp
===================================================================
--- src/qpid/cluster/DumpClient.cpp	(revision 736978)
+++ src/qpid/cluster/DumpClient.cpp	(working copy)
@@ -94,7 +94,7 @@
       done(ok), failed(fail)
 {
     connection.open(url);
-    session = connection.newSession("dump_shared");
+    session = connection.newSession(DUMP);
 }
 
 DumpClient::~DumpClient() {}
Index: src/qpid/cluster/Connection.cpp
===================================================================
--- src/qpid/cluster/Connection.cpp	(revision 736978)
+++ src/qpid/cluster/Connection.cpp	(working copy)
@@ -69,7 +69,7 @@
 Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
                        const std::string& wrappedId, MemberId myId, bool isCatchUp, bool isLink)
     : cluster(c), self(myId, this), catchUp(isCatchUp), output(*this, out),
-      connection(&output, cluster.getBroker(), wrappedId, isLink), readCredit(0),
+      connection(&output, cluster.getBroker(), wrappedId, isLink, catchUp ? ++catchUpId : 0), readCredit(0),
       expectProtocolHeader(isLink)
 { init(); }
 
@@ -396,5 +396,7 @@
     QPID_LOG(debug, cluster << " decoded queue " << q->getName());    
 }
 
+qpid::sys::AtomicValue<uint64_t> Connection::catchUpId(0x5000000000000000LL);
+
 }} // namespace qpid::cluster
 
Index: src/qpid/broker/Exchange.cpp
===================================================================
--- src/qpid/broker/Exchange.cpp	(revision 736978)
+++ src/qpid/broker/Exchange.cpp	(working copy)
@@ -22,6 +22,7 @@
 #include "Exchange.h"
 #include "ExchangeRegistry.h"
 #include "qpid/agent/ManagementAgent.h"
+#include "qpid/management/ManagementBroker.h"
 #include "qpid/log/Statement.h"
 #include "qpid/framing/MessageProperties.h"
 #include "DeliverableMessage.h"
@@ -32,6 +33,7 @@
 using qpid::framing::FieldTable;
 using qpid::sys::Mutex;
 using qpid::management::ManagementAgent;
+using qpid::management::ManagementBroker;
 using qpid::management::ManagementObject;
 using qpid::management::Manageable;
 using qpid::management::Args;
@@ -109,12 +111,14 @@
             mgmtExchange = new _qmf::Exchange (agent, this, parent, _name, durable);
             mgmtExchange->set_arguments(args);
             if (!durable) {
-                if (name == "")
+                if (name == "") {
                     agent->addObject (mgmtExchange, 0x1000000000000004LL);  // Special default exchange ID
-                else if (name == "qpid.management")
+                } else if (name == "qpid.management") {
                     agent->addObject (mgmtExchange, 0x1000000000000005LL);  // Special management exchange ID
-                else
-                    agent->addObject (mgmtExchange);
+                } else {
+                    ManagementBroker* mb = dynamic_cast<ManagementBroker*>(agent);
+                    agent->addObject (mgmtExchange, mb ? mb->allocateId(this) : 0);
+                }
             }
         }
     }
@@ -245,7 +249,8 @@
                     (agent, this, (Manageable*) parent, queueId, key, args);
                 if (!origin.empty())
                     mgmtBinding->set_origin(origin);
-                agent->addObject (mgmtBinding);
+                ManagementBroker* mb = dynamic_cast<ManagementBroker*>(agent);
+                agent->addObject (mgmtBinding, mb ? mb->allocateId(this) : 0);
             }
         }
     }
Index: src/qpid/broker/SessionState.cpp
===================================================================
--- src/qpid/broker/SessionState.cpp	(revision 736978)
+++ src/qpid/broker/SessionState.cpp	(working copy)
@@ -30,6 +30,7 @@
 #include "qpid/framing/reply_exceptions.h"
 #include "qpid/framing/ServerInvoker.h"
 #include "qpid/log/Statement.h"
+#include "qpid/management/ManagementBroker.h"
 
 #include <boost/bind.hpp>
 #include <boost/lexical_cast.hpp>
@@ -41,6 +42,7 @@
 using sys::Mutex;
 using boost::intrusive_ptr;
 using qpid::management::ManagementAgent;
+using qpid::management::ManagementBroker;
 using qpid::management::ManagementObject;
 using qpid::management::Manageable;
 using qpid::management::Args;
@@ -65,7 +67,8 @@
             mgmtObject->set_attached (0);
             mgmtObject->set_detachedLifespan (0);
             mgmtObject->clr_expireTime();
-            agent->addObject (mgmtObject);
+            ManagementBroker* mb = dynamic_cast<ManagementBroker*>(agent);
+            agent->addObject (mgmtObject, mb ? mb->allocateId(this) : 0);
         }
     }
     attach(h);
Index: src/qpid/broker/Queue.cpp
===================================================================
--- src/qpid/broker/Queue.cpp	(revision 736978)
+++ src/qpid/broker/Queue.cpp	(working copy)
@@ -30,6 +30,7 @@
 
 #include "qpid/StringUtils.h"
 #include "qpid/log/Statement.h"
+#include "qpid/management/ManagementBroker.h"
 #include "qpid/framing/reply_exceptions.h"
 #include "qpid/sys/Monitor.h"
 #include "qpid/sys/Time.h"
@@ -46,6 +47,7 @@
 using namespace qpid::sys;
 using namespace qpid::framing;
 using qpid::management::ManagementAgent;
+using qpid::management::ManagementBroker;
 using qpid::management::ManagementObject;
 using qpid::management::Manageable;
 using qpid::management::Args;
@@ -103,8 +105,10 @@
 
             // Add the object to the management agent only if this queue is not durable.
             // If it's durable, we will add it later when the queue is assigned a persistenceId.
-            if (store == 0)
-                agent->addObject (mgmtObject);
+            if (store == 0) {
+                ManagementBroker* mb = dynamic_cast<ManagementBroker*>(agent);
+                agent->addObject (mgmtObject, mb ? mb->allocateId(this) : 0);
+            }
         }
     }
 }
Index: src/qpid/broker/Exchange.h
===================================================================
--- src/qpid/broker/Exchange.h	(revision 736978)
+++ src/qpid/broker/Exchange.h	(working copy)
@@ -39,6 +39,22 @@
 class ExchangeRegistry;
 
 class Exchange : public PersistableExchange, public management::Manageable {
+public:
+    struct Binding : public management::Manageable {
+        typedef boost::shared_ptr<Binding>       shared_ptr;
+        typedef std::vector<Binding::shared_ptr> vector;
+
+        Queue::shared_ptr         queue;
+        const std::string         key;
+        const framing::FieldTable args;
+        qmf::org::apache::qpid::broker::Binding* mgmtBinding;
+
+        Binding(const std::string& key, Queue::shared_ptr queue, Exchange* parent = 0,
+                framing::FieldTable args = framing::FieldTable(), const std::string& origin = std::string());
+        ~Binding();
+        management::ManagementObject* GetManagementObject() const;
+    };
+
 private:
     const std::string name;
     const bool durable;
@@ -64,21 +80,7 @@
            
     void routeIVE();
            
-    struct Binding : public management::Manageable {
-        typedef boost::shared_ptr<Binding>       shared_ptr;
-        typedef std::vector<Binding::shared_ptr> vector;
 
-        Queue::shared_ptr         queue;
-        const std::string         key;
-        const framing::FieldTable args;
-        qmf::org::apache::qpid::broker::Binding* mgmtBinding;
-
-        Binding(const std::string& key, Queue::shared_ptr queue, Exchange* parent = 0,
-                framing::FieldTable args = framing::FieldTable(), const std::string& origin = std::string());
-        ~Binding();
-        management::ManagementObject* GetManagementObject() const;
-    };
-
     struct MatchQueue {
         const Queue::shared_ptr queue;        
         MatchQueue(Queue::shared_ptr q);
Index: src/qpid/broker/Connection.h
===================================================================
--- src/qpid/broker/Connection.h	(revision 736978)
+++ src/qpid/broker/Connection.h	(working copy)
@@ -64,7 +64,7 @@
                    public RefCounted
 {
   public:
-    Connection(sys::ConnectionOutputHandler* out, Broker& broker, const std::string& mgmtId, bool isLink = false);
+    Connection(sys::ConnectionOutputHandler* out, Broker& broker, const std::string& mgmtId, bool isLink = false, uint64_t objectId = 0);
     ~Connection ();
 
     /** Get the SessionHandler for channel. Create if it does not already exist */
Index: src/qpid/broker/Connection.cpp
===================================================================
--- src/qpid/broker/Connection.cpp	(revision 736978)
+++ src/qpid/broker/Connection.cpp	(working copy)
@@ -48,7 +48,7 @@
 namespace qpid {
 namespace broker {
 
-Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std::string& mgmtId_, bool isLink_) :
+Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std::string& mgmtId_, bool isLink_, uint64_t objectId) :
     ConnectionState(out_, broker_),
     adapter(*this, isLink_),
     isLink(isLink_),
@@ -70,9 +70,10 @@
 		
 		
         // TODO set last bool true if system connection
-        if (agent != 0)
+        if (agent != 0) {
             mgmtObject = new _qmf::Connection(agent, this, parent, mgmtId, !isLink, false);
-        agent->addObject(mgmtObject);
+            agent->addObject(mgmtObject, objectId);
+        }
         ConnectionState::setUrl(mgmtId);
     }
 }
Index: src/qpid/management/ManagementBroker.cpp
===================================================================
--- src/qpid/management/ManagementBroker.cpp	(revision 736978)
+++ src/qpid/management/ManagementBroker.cpp	(working copy)
@@ -20,6 +20,7 @@
  */
  
 #include "ManagementBroker.h"
+#include "IdAllocator.h"
 #include "qpid/broker/DeliverableMessage.h"
 #include "qpid/log/Statement.h"
 #include <qpid/broker/Message.h>
@@ -1135,3 +1136,16 @@
     inBuffer.restore(); // restore original position
     return end - start;
 }
+
+void ManagementBroker::setAllocator(std::auto_ptr<IdAllocator> a)
+{
+    Mutex::ScopedLock lock (addLock);
+    allocator = a;
+}
+
+uint64_t ManagementBroker::allocateId(Manageable* object)
+{
+    Mutex::ScopedLock lock (addLock);
+    if (allocator.get()) return allocator->getIdFor(object);
+    return 0;
+}
Index: src/qpid/management/IdAllocator.h
===================================================================
--- src/qpid/management/IdAllocator.h	(revision 0)
+++ src/qpid/management/IdAllocator.h	(revision 0)
@@ -0,0 +1,42 @@
+#ifndef QPID_MANAGEMENT_IDALLOCATOR_H
+#define QPID_MANAGEMENT_IDALLOCATOR_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Manageable.h"
+
+namespace qpid {
+namespace management {
+
+/**
+ * Interface through which plugins etc can control the mgmt object id
+ * allocation for special cases
+ */
+struct IdAllocator
+{
+    virtual uint64_t getIdFor(Manageable* object) = 0;
+    virtual ~IdAllocator() {}
+};
+
+}} // namespace qpid::management
+
+#endif  /*!QPID_MANAGEMENT_IDALLOCATOR_H*/

Property changes on: src/qpid/management/IdAllocator.h
___________________________________________________________________
Name: svn:keywords
   + Rev Date
Name: svn:eol-style
   + native

Index: src/qpid/management/ManagementBroker.h
===================================================================
--- src/qpid/management/ManagementBroker.h	(revision 736978)
+++ src/qpid/management/ManagementBroker.h	(working copy)
@@ -32,10 +32,13 @@
 #include "Manageable.h"
 #include "qmf/org/apache/qpid/broker/Agent.h"
 #include <qpid/framing/AMQFrame.h>
+#include <memory>
 
 namespace qpid {
 namespace management {
 
+struct IdAllocator;
+
 class ManagementBroker : public ManagementAgent
 {
 private:
@@ -43,7 +46,6 @@
     int threadPoolSize;
 
 public:
-
     ManagementBroker ();
     virtual ~ManagementBroker ();
 
@@ -78,6 +80,8 @@
     uint32_t pollCallbacks (uint32_t) { assert(0); return 0; }
     int getSignalFd () { assert(0); return -1; }
 
+    void setAllocator(std::auto_ptr<IdAllocator> allocator);
+    uint64_t allocateId(Manageable* object);
 private:
     friend class ManagementAgent;
 
@@ -179,6 +183,8 @@
     uint32_t                     nextRequestSequence;
     bool                         clientWasAdded;
 
+    std::auto_ptr<IdAllocator> allocator;
+
 #   define MA_BUFFER_SIZE 65536
     char inputBuffer[MA_BUFFER_SIZE];
     char outputBuffer[MA_BUFFER_SIZE];

---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:[email protected]

Reply via email to