Author: gsim
Date: Thu Nov 10 15:12:06 2011
New Revision: 1200368

URL: http://svn.apache.org/viewvc?rev=1200368&view=rev
Log:
QPID-3603: Initial (very rough) cut of queue and exchange propagation from one 
node to another

Modified:
    qpid/branches/qpid-3603/qpid/cpp/managementgen/qmfgen/templates/Event.cpp
    qpid/branches/qpid-3603/qpid/cpp/managementgen/qmfgen/templates/Event.h
    qpid/branches/qpid-3603/qpid/cpp/src/CMakeLists.txt
    qpid/branches/qpid-3603/qpid/cpp/src/Makefile.am
    qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/Bridge.cpp
    qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/Broker.cpp
    qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/SemanticState.cpp
    qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
    qpid/branches/qpid-3603/qpid/specs/management-schema.xml
    qpid/branches/qpid-3603/qpid/tools/src/py/qpid-config

Modified: 
qpid/branches/qpid-3603/qpid/cpp/managementgen/qmfgen/templates/Event.cpp
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/managementgen/qmfgen/templates/Event.cpp?rev=1200368&r1=1200367&r2=1200368&view=diff
==============================================================================
--- qpid/branches/qpid-3603/qpid/cpp/managementgen/qmfgen/templates/Event.cpp 
(original)
+++ qpid/branches/qpid-3603/qpid/cpp/managementgen/qmfgen/templates/Event.cpp 
Thu Nov 10 15:12:06 2011
@@ -99,3 +99,8 @@ void Event/*MGEN:Event.NameCap*/::mapEnc
     using namespace ::qpid::types;
 /*MGEN:Event.ArgMap*/
 }
+
+bool Event/*MGEN:Event.NameCap*/::match(const std::string& evt, const std::string& pkg)
+{
+    return eventName == evt && packageName == pkg;
+}

Modified: 
qpid/branches/qpid-3603/qpid/cpp/managementgen/qmfgen/templates/Event.h
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/managementgen/qmfgen/templates/Event.h?rev=1200368&r1=1200367&r2=1200368&view=diff
==============================================================================
--- qpid/branches/qpid-3603/qpid/cpp/managementgen/qmfgen/templates/Event.h 
(original)
+++ qpid/branches/qpid-3603/qpid/cpp/managementgen/qmfgen/templates/Event.h Thu 
Nov 10 15:12:06 2011
@@ -51,6 +51,8 @@ class Event/*MGEN:Event.NameCap*/ : publ
     uint8_t getSeverity() const { return /*MGEN:Event.Severity*/; }
     void encode(std::string& buffer) const;
     void mapEncode(::qpid::types::Variant::Map& map) const;
+
+    static bool match(const std::string& evt, const std::string& pkg);
 };
 
 }/*MGEN:Event.CloseNamespaces*/

Modified: qpid/branches/qpid-3603/qpid/cpp/src/CMakeLists.txt
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/CMakeLists.txt?rev=1200368&r1=1200367&r2=1200368&view=diff
==============================================================================
--- qpid/branches/qpid-3603/qpid/cpp/src/CMakeLists.txt (original)
+++ qpid/branches/qpid-3603/qpid/cpp/src/CMakeLists.txt Thu Nov 10 15:12:06 2011
@@ -999,6 +999,7 @@ set (qpidbroker_SOURCES
      qpid/broker/LegacyLVQ.cpp
      qpid/broker/MessageDeque.cpp
      qpid/broker/MessageMap.cpp
+     qpid/broker/NodeClone.cpp
      qpid/broker/PriorityQueue.cpp
      qpid/broker/Queue.cpp
      qpid/broker/QueueCleaner.cpp

Modified: qpid/branches/qpid-3603/qpid/cpp/src/Makefile.am
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/Makefile.am?rev=1200368&r1=1200367&r2=1200368&view=diff
==============================================================================
--- qpid/branches/qpid-3603/qpid/cpp/src/Makefile.am (original)
+++ qpid/branches/qpid-3603/qpid/cpp/src/Makefile.am Thu Nov 10 15:12:06 2011
@@ -593,6 +593,8 @@ libqpidbroker_la_SOURCES = \
   qpid/broker/PriorityQueue.cpp \
   qpid/broker/NameGenerator.cpp \
   qpid/broker/NameGenerator.h \
+  qpid/broker/NodeClone.h \
+  qpid/broker/NodeClone.cpp \
   qpid/broker/NullMessageStore.cpp \
   qpid/broker/NullMessageStore.h \
   qpid/broker/OwnershipToken.h \

Modified: qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/Bridge.cpp
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/Bridge.cpp?rev=1200368&r1=1200367&r2=1200368&view=diff
==============================================================================
--- qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/Bridge.cpp (original)
+++ qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/Bridge.cpp Thu Nov 10 
15:12:06 2011
@@ -24,17 +24,28 @@
 #include "qpid/broker/Connection.h"
 #include "qpid/broker/Link.h"
 #include "qpid/broker/LinkRegistry.h"
+#include "qpid/broker/NodeClone.h"
 #include "qpid/broker/QueueReplicator.h"
 #include "qpid/broker/SessionState.h"
 
 #include "qpid/management/ManagementAgent.h"
+#include "qpid/types/Variant.h"
+#include "qpid/amqp_0_10/Codecs.h"
 #include "qpid/framing/Uuid.h"
+#include "qpid/framing/MessageProperties.h"
+#include "qpid/framing/MessageTransferBody.h"
 #include "qpid/log/Statement.h"
 #include <iostream>
 
 using qpid::framing::FieldTable;
 using qpid::framing::Uuid;
 using qpid::framing::Buffer;
+using qpid::framing::AMQFrame;
+using qpid::framing::AMQContentBody;
+using qpid::framing::AMQHeaderBody;
+using qpid::framing::MessageProperties;
+using qpid::framing::MessageTransferBody;
+using qpid::types::Variant;
 using qpid::management::ManagementAgent;
 using std::string;
 namespace _qmf = qmf::org::apache::qpid::broker;
@@ -105,6 +116,52 @@ void Bridge::create(Connection& c)
         peer->getMessage().flow(args.i_dest, 0, 0xFFFFFFFF);
         peer->getMessage().flow(args.i_dest, 1, 0xFFFFFFFF);
         QPID_LOG(debug, "Activated route from queue " << args.i_src << " to " << args.i_dest);
+    } else if (NodeClone::isNodeCloneDestination(args.i_dest)) {
+        //declare and bind an event queue
+        peer->getQueue().declare(queueName, "", false, false, true, true, FieldTable());
+        peer->getExchange().bind(queueName, "qmf.default.topic", "agent.ind.event.org_apache_qpid_broker.#", FieldTable());
+        //subscribe to the queue
+        peer->getMessage().subscribe(queueName, args.i_dest, 1, 0, false, "", 0, FieldTable());
+        peer->getMessage().flow(args.i_dest, 0, 0xFFFFFFFF);
+        peer->getMessage().flow(args.i_dest, 1, 0xFFFFFFFF);
+
+        //issue a query request for queues and another for exchanges using event queue as the reply-to address
+        for (int i = 0; i < 2; ++i) {//TODO: cleanup this code into reusable utility functions
+            Variant::Map request;
+            request["_what"] = "OBJECT";
+            Variant::Map schema;
+            schema["_class_name"] = (i == 0 ? "queue" : "exchange");
+            schema["_package_name"] = "org.apache.qpid.broker";
+            request["_schema_id"] = schema;
+
+            AMQFrame method((MessageTransferBody(qpid::framing::ProtocolVersion(), "qmf.default.direct", 0, 0)));
+            method.setBof(true);
+            method.setEof(false);
+            method.setBos(true);
+            method.setEos(true);
+            AMQHeaderBody headerBody;
+            MessageProperties* props = headerBody.get<MessageProperties>(true);
+            props->setReplyTo(qpid::framing::ReplyTo("", queueName));
+            props->setAppId("qmf2");
+            props->getApplicationHeaders().setString("qmf.opcode", "_query_request");
+            headerBody.get<qpid::framing::DeliveryProperties>(true)->setRoutingKey("broker");
+            AMQFrame header(headerBody);
+            header.setBof(false);
+            header.setEof(false);
+            header.setBos(true);
+            header.setEos(true);
+            AMQContentBody data;
+            qpid::amqp_0_10::MapCodec::encode(request, data.getData());
+            AMQFrame content(data);
+            content.setBof(false);
+            content.setEof(true);
+            content.setBos(true);
+            content.setEos(true);
+            sessionHandler.out->handle(method);
+            sessionHandler.out->handle(header);
+            sessionHandler.out->handle(content);
+        }
+
     } else {
         FieldTable queueSettings;
 

Modified: qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/Broker.cpp
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/Broker.cpp?rev=1200368&r1=1200367&r2=1200368&view=diff
==============================================================================
--- qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/Broker.cpp Thu Nov 10 
15:12:06 2011
@@ -904,7 +904,7 @@ std::pair<boost::shared_ptr<Queue>, bool
             //event instead?
             managementAgent->raiseEvent(
                 _qmf::EventQueueDeclare(connectionId, userId, name,
-                                        durable, owner, autodelete,
+                                        durable, owner, autodelete, alternateExchange,
                                         ManagementAgent::toMap(arguments),
                                         "created"));
         }

Modified: qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=1200368&r1=1200367&r2=1200368&view=diff
==============================================================================
--- qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/SemanticState.cpp 
(original)
+++ qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/SemanticState.cpp Thu Nov 
10 15:12:06 2011
@@ -25,6 +25,7 @@
 #include "qpid/broker/DtxAck.h"
 #include "qpid/broker/DtxTimeout.h"
 #include "qpid/broker/Message.h"
+#include "qpid/broker/NodeClone.h"
 #include "qpid/broker/Queue.h"
 #include "qpid/broker/QueueReplicator.h"
 #include "qpid/broker/SessionContext.h"
@@ -694,6 +695,7 @@ void SemanticState::route(intrusive_ptr<
     std::string exchangeName = msg->getExchangeName();
     if (!cacheExchange || cacheExchange->getName() != exchangeName || cacheExchange->isDestroyed()) {
         cacheExchange = QueueReplicator::create(exchangeName, getSession().getBroker().getQueues());
+        if (!cacheExchange) cacheExchange = NodeClone::create(exchangeName, getSession().getBroker());
         if (!cacheExchange) cacheExchange = session.getBroker().getExchanges().get(exchangeName);
     }
     cacheExchange->setProperties(msg);

Modified: qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/SessionAdapter.cpp?rev=1200368&r1=1200367&r2=1200368&view=diff
==============================================================================
--- qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/SessionAdapter.cpp 
(original)
+++ qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/SessionAdapter.cpp Thu Nov 
10 15:12:06 2011
@@ -321,8 +321,8 @@ void SessionAdapter::QueueHandlerImpl::d
             ManagementAgent* agent = getBroker().getManagementAgent();
             if (agent)
                 agent->raiseEvent(_qmf::EventQueueDeclare(getConnection().getUrl(), getConnection().getUserId(),
-                                                      name, durable, exclusive, autoDelete, ManagementAgent::toMap(arguments),
-                                                      "existing"));
+                                                          name, durable, exclusive, autoDelete, alternateExchange, ManagementAgent::toMap(arguments),
+                                                          "existing"));
         }
 
     }

Modified: qpid/branches/qpid-3603/qpid/specs/management-schema.xml
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/specs/management-schema.xml?rev=1200368&r1=1200367&r2=1200368&view=diff
==============================================================================
--- qpid/branches/qpid-3603/qpid/specs/management-schema.xml (original)
+++ qpid/branches/qpid-3603/qpid/specs/management-schema.xml Thu Nov 10 
15:12:06 2011
@@ -425,7 +425,7 @@
   <event name="clientDisconnect"  sev="inform" args="rhost, user"/>
   <event name="brokerLinkUp"      sev="inform" args="rhost"/>
   <event name="brokerLinkDown"    sev="warn"   args="rhost"/>
-  <event name="queueDeclare"      sev="inform" args="rhost, user, qName, durable, excl, autoDel, args, disp"/>
+  <event name="queueDeclare"      sev="inform" args="rhost, user, qName, durable, excl, autoDel, altEx, args, disp"/>
   <event name="queueDelete"       sev="inform" args="rhost, user, qName"/>
   <event name="exchangeDeclare"   sev="inform" args="rhost, user, exName, exType, altEx, durable, autoDel, args, disp"/>
   <event name="exchangeDelete"    sev="inform" args="rhost, user, exName"/>

Modified: qpid/branches/qpid-3603/qpid/tools/src/py/qpid-config
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/tools/src/py/qpid-config?rev=1200368&r1=1200367&r2=1200368&view=diff
==============================================================================
--- qpid/branches/qpid-3603/qpid/tools/src/py/qpid-config (original)
+++ qpid/branches/qpid-3603/qpid/tools/src/py/qpid-config Thu Nov 10 15:12:06 
2011
@@ -492,6 +492,12 @@ class BrokerManager:
         etype = args[0]
         ename = args[1]
         declArgs = {}
+        for a in config._extra_arguments:
+            r = a.split("=", 1)
+            if len(r) == 2: value = r[1]
+            else: value = None
+            declArgs[r[0]] = value
+
         if config._msgSequence:
             declArgs[MSG_SEQUENCE] = 1
         if config._ive:



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscr...@qpid.apache.org

Reply via email to