Author: gsim Date: Thu Nov 10 15:12:06 2011 New Revision: 1200368 URL: http://svn.apache.org/viewvc?rev=1200368&view=rev Log: QPID-3603: Initial (very rough) cut of queue and exchange propagation from one node to another
Modified: qpid/branches/qpid-3603/qpid/cpp/managementgen/qmfgen/templates/Event.cpp qpid/branches/qpid-3603/qpid/cpp/managementgen/qmfgen/templates/Event.h qpid/branches/qpid-3603/qpid/cpp/src/CMakeLists.txt qpid/branches/qpid-3603/qpid/cpp/src/Makefile.am qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/Bridge.cpp qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/Broker.cpp qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/SemanticState.cpp qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/SessionAdapter.cpp qpid/branches/qpid-3603/qpid/specs/management-schema.xml qpid/branches/qpid-3603/qpid/tools/src/py/qpid-config Modified: qpid/branches/qpid-3603/qpid/cpp/managementgen/qmfgen/templates/Event.cpp URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/managementgen/qmfgen/templates/Event.cpp?rev=1200368&r1=1200367&r2=1200368&view=diff ============================================================================== --- qpid/branches/qpid-3603/qpid/cpp/managementgen/qmfgen/templates/Event.cpp (original) +++ qpid/branches/qpid-3603/qpid/cpp/managementgen/qmfgen/templates/Event.cpp Thu Nov 10 15:12:06 2011 @@ -99,3 +99,8 @@ void Event/*MGEN:Event.NameCap*/::mapEnc using namespace ::qpid::types; /*MGEN:Event.ArgMap*/ } + +bool Event/*MGEN:Event.NameCap*/::match(const std::string& evt, const std::string& pkg) +{ + return eventName == evt && packageName == pkg; +} Modified: qpid/branches/qpid-3603/qpid/cpp/managementgen/qmfgen/templates/Event.h URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/managementgen/qmfgen/templates/Event.h?rev=1200368&r1=1200367&r2=1200368&view=diff ============================================================================== --- qpid/branches/qpid-3603/qpid/cpp/managementgen/qmfgen/templates/Event.h (original) +++ qpid/branches/qpid-3603/qpid/cpp/managementgen/qmfgen/templates/Event.h Thu Nov 10 15:12:06 2011 @@ -51,6 +51,8 @@ class Event/*MGEN:Event.NameCap*/ : publ uint8_t getSeverity() const { return /*MGEN:Event.Severity*/; } void encode(std::string& buffer) const; void mapEncode(::qpid::types::Variant::Map& map) const; + + static bool match(const std::string& evt, const std::string& pkg); }; }/*MGEN:Event.CloseNamespaces*/ Modified: qpid/branches/qpid-3603/qpid/cpp/src/CMakeLists.txt URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/CMakeLists.txt?rev=1200368&r1=1200367&r2=1200368&view=diff ============================================================================== --- qpid/branches/qpid-3603/qpid/cpp/src/CMakeLists.txt (original) +++ qpid/branches/qpid-3603/qpid/cpp/src/CMakeLists.txt Thu Nov 10 15:12:06 2011 @@ -999,6 +999,7 @@ set (qpidbroker_SOURCES qpid/broker/LegacyLVQ.cpp qpid/broker/MessageDeque.cpp qpid/broker/MessageMap.cpp + qpid/broker/NodeClone.cpp qpid/broker/PriorityQueue.cpp qpid/broker/Queue.cpp qpid/broker/QueueCleaner.cpp Modified: qpid/branches/qpid-3603/qpid/cpp/src/Makefile.am URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/Makefile.am?rev=1200368&r1=1200367&r2=1200368&view=diff ============================================================================== --- qpid/branches/qpid-3603/qpid/cpp/src/Makefile.am (original) +++ qpid/branches/qpid-3603/qpid/cpp/src/Makefile.am Thu Nov 10 15:12:06 2011 @@ -593,6 +593,8 @@ libqpidbroker_la_SOURCES = \ qpid/broker/PriorityQueue.cpp \ qpid/broker/NameGenerator.cpp \ qpid/broker/NameGenerator.h \ + qpid/broker/NodeClone.h \ + qpid/broker/NodeClone.cpp \ qpid/broker/NullMessageStore.cpp \ qpid/broker/NullMessageStore.h \ qpid/broker/OwnershipToken.h \ Modified: qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/Bridge.cpp URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/Bridge.cpp?rev=1200368&r1=1200367&r2=1200368&view=diff ============================================================================== --- qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/Bridge.cpp (original) +++ qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/Bridge.cpp Thu Nov 10 15:12:06 2011 @@ -24,17 +24,28 @@ #include "qpid/broker/Connection.h" #include "qpid/broker/Link.h" #include "qpid/broker/LinkRegistry.h" +#include "qpid/broker/NodeClone.h" #include "qpid/broker/QueueReplicator.h" #include "qpid/broker/SessionState.h" #include "qpid/management/ManagementAgent.h" +#include "qpid/types/Variant.h" +#include "qpid/amqp_0_10/Codecs.h" #include "qpid/framing/Uuid.h" +#include "qpid/framing/MessageProperties.h" +#include "qpid/framing/MessageTransferBody.h" #include "qpid/log/Statement.h" #include <iostream> using qpid::framing::FieldTable; using qpid::framing::Uuid; using qpid::framing::Buffer; +using qpid::framing::AMQFrame; +using qpid::framing::AMQContentBody; +using qpid::framing::AMQHeaderBody; +using qpid::framing::MessageProperties; +using qpid::framing::MessageTransferBody; +using qpid::types::Variant; using qpid::management::ManagementAgent; using std::string; namespace _qmf = qmf::org::apache::qpid::broker; @@ -105,6 +116,52 @@ void Bridge::create(Connection& c) peer->getMessage().flow(args.i_dest, 0, 0xFFFFFFFF); peer->getMessage().flow(args.i_dest, 1, 0xFFFFFFFF); QPID_LOG(debug, "Activated route from queue " << args.i_src << " to " << args.i_dest); + } else if (NodeClone::isNodeCloneDestination(args.i_dest)) { + //declare and bind an event queue + peer->getQueue().declare(queueName, "", false, false, true, true, FieldTable()); + peer->getExchange().bind(queueName, "qmf.default.topic", "agent.ind.event.org_apache_qpid_broker.#", FieldTable()); + //subscribe to the queue + peer->getMessage().subscribe(queueName, args.i_dest, 1, 0, false, "", 0, FieldTable()); + peer->getMessage().flow(args.i_dest, 0, 0xFFFFFFFF); + peer->getMessage().flow(args.i_dest, 1, 0xFFFFFFFF); + + //issue a query request for queues and another for exchanges using event queue as the reply-to address + for (int i = 0; i < 2; ++i) {//TODO: cleanup this code into reusable utility functions + Variant::Map request; + request["_what"] = "OBJECT"; + Variant::Map schema; + schema["_class_name"] = (i == 0 ? "queue" : "exchange"); + schema["_package_name"] = "org.apache.qpid.broker"; + request["_schema_id"] = schema; + + AMQFrame method((MessageTransferBody(qpid::framing::ProtocolVersion(), "qmf.default.direct", 0, 0))); + method.setBof(true); + method.setEof(false); + method.setBos(true); + method.setEos(true); + AMQHeaderBody headerBody; + MessageProperties* props = headerBody.get<MessageProperties>(true); + props->setReplyTo(qpid::framing::ReplyTo("", queueName)); + props->setAppId("qmf2"); + props->getApplicationHeaders().setString("qmf.opcode", "_query_request"); + headerBody.get<qpid::framing::DeliveryProperties>(true)->setRoutingKey("broker"); + AMQFrame header(headerBody); + header.setBof(false); + header.setEof(false); + header.setBos(true); + header.setEos(true); + AMQContentBody data; + qpid::amqp_0_10::MapCodec::encode(request, data.getData()); + AMQFrame content(data); + content.setBof(false); + content.setEof(true); + content.setBos(true); + content.setEos(true); + sessionHandler.out->handle(method); + sessionHandler.out->handle(header); + sessionHandler.out->handle(content); + } + } else { FieldTable queueSettings; Modified: qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/Broker.cpp URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/Broker.cpp?rev=1200368&r1=1200367&r2=1200368&view=diff ============================================================================== --- qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/Broker.cpp (original) +++ qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/Broker.cpp Thu Nov 10 15:12:06 2011 @@ -904,7 +904,7 @@ std::pair<boost::shared_ptr<Queue>, bool //event instead? managementAgent->raiseEvent( _qmf::EventQueueDeclare(connectionId, userId, name, - durable, owner, autodelete, + durable, owner, autodelete, alternateExchange, ManagementAgent::toMap(arguments), "created")); } Modified: qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/SemanticState.cpp URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=1200368&r1=1200367&r2=1200368&view=diff ============================================================================== --- qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/SemanticState.cpp (original) +++ qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/SemanticState.cpp Thu Nov 10 15:12:06 2011 @@ -25,6 +25,7 @@ #include "qpid/broker/DtxAck.h" #include "qpid/broker/DtxTimeout.h" #include "qpid/broker/Message.h" +#include "qpid/broker/NodeClone.h" #include "qpid/broker/Queue.h" #include "qpid/broker/QueueReplicator.h" #include "qpid/broker/SessionContext.h" @@ -694,6 +695,7 @@ void SemanticState::route(intrusive_ptr< std::string exchangeName = msg->getExchangeName(); if (!cacheExchange || cacheExchange->getName() != exchangeName || cacheExchange->isDestroyed()) { cacheExchange = QueueReplicator::create(exchangeName, getSession().getBroker().getQueues()); + if (!cacheExchange) cacheExchange = NodeClone::create(exchangeName, getSession().getBroker()); if (!cacheExchange) cacheExchange = session.getBroker().getExchanges().get(exchangeName); } cacheExchange->setProperties(msg); Modified: qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/SessionAdapter.cpp URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/SessionAdapter.cpp?rev=1200368&r1=1200367&r2=1200368&view=diff ============================================================================== --- qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/SessionAdapter.cpp (original) +++ qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/SessionAdapter.cpp Thu Nov 10 15:12:06 2011 @@ -321,8 +321,8 @@ void SessionAdapter::QueueHandlerImpl::d ManagementAgent* agent = getBroker().getManagementAgent(); if (agent) agent->raiseEvent(_qmf::EventQueueDeclare(getConnection().getUrl(), getConnection().getUserId(), - name, durable, exclusive, autoDelete, ManagementAgent::toMap(arguments), - "existing")); + name, durable, exclusive, autoDelete, alternateExchange, ManagementAgent::toMap(arguments), + "existing")); } } Modified: qpid/branches/qpid-3603/qpid/specs/management-schema.xml URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/specs/management-schema.xml?rev=1200368&r1=1200367&r2=1200368&view=diff ============================================================================== --- qpid/branches/qpid-3603/qpid/specs/management-schema.xml (original) +++ qpid/branches/qpid-3603/qpid/specs/management-schema.xml Thu Nov 10 15:12:06 2011 @@ -425,7 +425,7 @@ <event name="clientDisconnect" sev="inform" args="rhost, user"/> <event name="brokerLinkUp" sev="inform" args="rhost"/> <event name="brokerLinkDown" sev="warn" args="rhost"/> - <event name="queueDeclare" sev="inform" args="rhost, user, qName, durable, excl, autoDel, args, disp"/> + <event name="queueDeclare" sev="inform" args="rhost, user, qName, durable, excl, autoDel, altEx, args, disp"/> <event name="queueDelete" sev="inform" args="rhost, user, qName"/> <event name="exchangeDeclare" sev="inform" args="rhost, user, exName, exType, altEx, durable, autoDel, args, disp"/> <event name="exchangeDelete" sev="inform" args="rhost, user, exName"/> Modified: qpid/branches/qpid-3603/qpid/tools/src/py/qpid-config URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/tools/src/py/qpid-config?rev=1200368&r1=1200367&r2=1200368&view=diff ============================================================================== --- qpid/branches/qpid-3603/qpid/tools/src/py/qpid-config (original) +++ qpid/branches/qpid-3603/qpid/tools/src/py/qpid-config Thu Nov 10 15:12:06 2011 @@ -492,6 +492,12 @@ class BrokerManager: etype = args[0] ename = args[1] declArgs = {} + for a in config._extra_arguments: + r = a.split("=", 1) + if len(r) == 2: value = r[1] + else: value = None + declArgs[r[0]] = value + if config._msgSequence: declArgs[MSG_SEQUENCE] = 1 if config._ive: --------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:commits-subscr...@qpid.apache.org