Author: aconway Date: Fri Feb 17 14:04:08 2012 New Revision: 1245476 URL: http://svn.apache.org/viewvc?rev=1245476&view=rev Log: QPID-3603: Move wiring-replicator creation out of SemanticState::route.
Modified: qpid/branches/qpid-3603-7/qpid/cpp/src/Makefile.am qpid/branches/qpid-3603-7/qpid/cpp/src/ha.mk qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/SemanticState.cpp qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/Backup.cpp qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/Backup.h qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/WiringReplicator.cpp qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/WiringReplicator.h Modified: qpid/branches/qpid-3603-7/qpid/cpp/src/Makefile.am URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603-7/qpid/cpp/src/Makefile.am?rev=1245476&r1=1245475&r2=1245476&view=diff ============================================================================== --- qpid/branches/qpid-3603-7/qpid/cpp/src/Makefile.am (original) +++ qpid/branches/qpid-3603-7/qpid/cpp/src/Makefile.am Fri Feb 17 14:04:08 2012 @@ -602,8 +602,6 @@ libqpidbroker_la_SOURCES = \ qpid/broker/PriorityQueue.cpp \ qpid/broker/NameGenerator.cpp \ qpid/broker/NameGenerator.h \ - qpid/ha/WiringReplicator.h \ - qpid/ha/WiringReplicator.cpp \ qpid/broker/NullMessageStore.cpp \ qpid/broker/NullMessageStore.h \ qpid/broker/OwnershipToken.h \ Modified: qpid/branches/qpid-3603-7/qpid/cpp/src/ha.mk URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603-7/qpid/cpp/src/ha.mk?rev=1245476&r1=1245475&r2=1245476&view=diff ============================================================================== --- qpid/branches/qpid-3603-7/qpid/cpp/src/ha.mk (original) +++ qpid/branches/qpid-3603-7/qpid/cpp/src/ha.mk Fri Feb 17 14:04:08 2012 @@ -23,12 +23,14 @@ dmoduleexec_LTLIBRARIES += ha.la ha_la_SOURCES = \ - qpid/ha/HaPlugin.cpp \ - qpid/ha/HaBroker.cpp \ - qpid/ha/HaBroker.h \ qpid/ha/Backup.cpp \ qpid/ha/Backup.h \ - qpid/ha/Settings.h + qpid/ha/HaBroker.cpp \ + qpid/ha/HaBroker.h \ + qpid/ha/HaPlugin.cpp \ + qpid/ha/Settings.h \ + qpid/ha/WiringReplicator.cpp \ + qpid/ha/WiringReplicator.h ha_la_LIBADD = libqpidbroker.la ha_la_LDFLAGS = $(PLUGINLDFLAGS) Modified: qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/SemanticState.cpp URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=1245476&r1=1245475&r2=1245476&view=diff ============================================================================== --- qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/SemanticState.cpp (original) +++ qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/SemanticState.cpp Fri Feb 17 14:04:08 2012 @@ -480,7 +480,6 @@ void SemanticState::route(intrusive_ptr< std::string exchangeName = msg->getExchangeName(); if (!cacheExchange || cacheExchange->getName() != exchangeName || cacheExchange->isDestroyed()) { cacheExchange = QueueReplicator::create(exchangeName, getSession().getBroker().getQueues()); - if (!cacheExchange) cacheExchange = ha::WiringReplicator::create(exchangeName, getSession().getBroker()); if (!cacheExchange) cacheExchange = session.getBroker().getExchanges().get(exchangeName); } cacheExchange->setProperties(msg); Modified: qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/Backup.cpp URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/Backup.cpp?rev=1245476&r1=1245475&r2=1245476&view=diff ============================================================================== --- qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/Backup.cpp (original) +++ qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/Backup.cpp Fri Feb 17 14:04:08 2012 @@ -20,11 +20,13 @@ */ #include "Backup.h" #include "Settings.h" +#include "WiringReplicator.h" #include "qpid/Url.h" #include "qpid/amqp_0_10/Codecs.h" #include "qpid/broker/Bridge.h" #include "qpid/broker/Broker.h" #include "qpid/broker/SessionHandler.h" +#include "qpid/broker/Link.h" #include "qpid/framing/AMQP_ServerProxy.h" #include "qpid/framing/AMQFrame.h" #include "qpid/framing/FieldTable.h" @@ -39,114 +41,24 @@ using namespace broker; using types::Variant; using std::string; -namespace { -const string QPID_WIRING_REPLICATOR("qpid.wiring-replicator"); -const string _WHAT("_what"); -const string _CLASS_NAME("_class_name"); -const string _PACKAGE_NAME("_package_name"); -const string _SCHEMA_ID("_schema_id"); -const string OBJECT("OBJECT"); -const string ORG_APACHE_QPID_BROKER("org.apache.qpid.broker"); -const string QMF_DEFAULT_DIRECT("qmf.default.direct"); -const string QMF2("qmf2"); -const string QMF_OPCODE("qmf.opcode"); -const string _QUERY_REQUEST("_query_request"); -const string BROKER("broker"); -} - -void sendQuery(const string className, const string& queueName, SessionHandler& sessionHandler) { - framing::AMQP_ServerProxy peer(sessionHandler.out); - Variant::Map request; - request[_WHAT] = OBJECT; - Variant::Map schema; - schema[_CLASS_NAME] = className; - schema[_PACKAGE_NAME] = ORG_APACHE_QPID_BROKER; - request[_SCHEMA_ID] = schema; - - AMQFrame method((MessageTransferBody(ProtocolVersion(), QMF_DEFAULT_DIRECT, 0, 0))); - method.setBof(true); - method.setEof(false); - method.setBos(true); - method.setEos(true); - AMQHeaderBody headerBody; - MessageProperties* props = headerBody.get<MessageProperties>(true); - props->setReplyTo(qpid::framing::ReplyTo("", queueName)); - props->setAppId(QMF2); - props->getApplicationHeaders().setString(QMF_OPCODE, _QUERY_REQUEST); - headerBody.get<qpid::framing::DeliveryProperties>(true)->setRoutingKey(BROKER); - AMQFrame header(headerBody); - header.setBof(false); - header.setEof(false); - header.setBos(true); - header.setEos(true); - AMQContentBody data; - qpid::amqp_0_10::MapCodec::encode(request, data.getData()); - AMQFrame content(data); - content.setBof(false); - content.setEof(true); - content.setBos(true); - content.setEos(true); - sessionHandler.out->handle(method); - sessionHandler.out->handle(header); - sessionHandler.out->handle(content); -} - -namespace { -const string QMF_DEFAULT_TOPIC("qmf.default.topic"); -const string AGENT_IND_EVENT_ORG_APACHE_QPID_BROKER("agent.ind.event.org_apache_qpid_broker.#"); -const string QUEUE("queue"); -const string EXCHANGE("exchange"); -const string BINDING("binding"); -} - -// Initialize a bridge as a wiring replicator. -void bridgeInitWiringReplicator(Bridge& bridge, SessionHandler& sessionHandler) { - framing::AMQP_ServerProxy peer(sessionHandler.out); - string queueName = bridge.getQueueName(); - const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs()); - - //declare and bind an event queue - peer.getQueue().declare(queueName, "", false, false, true, true, FieldTable()); - peer.getExchange().bind(queueName, QMF_DEFAULT_TOPIC, AGENT_IND_EVENT_ORG_APACHE_QPID_BROKER, FieldTable()); - //subscribe to the queue - peer.getMessage().subscribe(queueName, args.i_dest, 1, 0, false, "", 0, FieldTable()); - peer.getMessage().flow(args.i_dest, 0, 0xFFFFFFFF); - peer.getMessage().flow(args.i_dest, 1, 0xFFFFFFFF); - - //issue a query request for queues and another for exchanges using event queue as the reply-to address - sendQuery(QUEUE, queueName, sessionHandler); - sendQuery(EXCHANGE, queueName, sessionHandler); - sendQuery(BINDING, queueName, sessionHandler); -} - Backup::Backup(broker::Broker& b, const Settings& s) : broker(b), settings(s) { - // Create a link to replicate wiring if (s.brokerUrl != "dummy") { // FIXME aconway 2011-11-22: temporary hack to identify primary. Url url(s.brokerUrl); QPID_LOG(info, "HA backup broker connecting to: " << url); - string protocol = url[0].protocol.empty() ? "tcp" : url[0].protocol; - broker.getLinks().declare( // Declare the link + + // FIXME aconway 2011-11-17: TBD: link management, discovery, fail-over. + // Declare the link + std::pair<Link::shared_ptr, bool> result = broker.getLinks().declare( url[0].host, url[0].port, protocol, false, // durable s.mechanism, s.username, s.password); - - broker.getLinks().declare( // Declare the bridge - url[0].host, url[0].port, - false, // durable - QPID_WIRING_REPLICATOR, // src - QPID_WIRING_REPLICATOR, // dest - "", // key - false, // isQueue - false, // isLocal - "", // id/tag - "", // excludes - false, // dynamic - 0, // sync? - bridgeInitWiringReplicator - ); + assert(result.second); // FIXME aconway 2011-11-23: error handling + link = result.first; + boost::shared_ptr<WiringReplicator> wr(new WiringReplicator(link)); + broker.getExchanges().registerExchange(wr); + wr->initialize(); // Must be called after registering exchange. } - // FIXME aconway 2011-11-17: handle discovery of the primary broker and fail-over correctly. } }} // namespace qpid::ha Modified: qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/Backup.h URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/Backup.h?rev=1245476&r1=1245475&r2=1245476&view=diff ============================================================================== --- qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/Backup.h (original) +++ qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/Backup.h Fri Feb 17 14:04:08 2012 @@ -24,10 +24,12 @@ #include "Settings.h" #include "qpid/Url.h" +#include <boost/shared_ptr.hpp> namespace qpid { namespace broker { class Broker; +class Link; } namespace ha { @@ -44,6 +46,7 @@ class Backup private: broker::Broker& broker; Settings settings; + boost::shared_ptr<broker::Link> link; }; }} // namespace qpid::ha Modified: qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/WiringReplicator.cpp URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/WiringReplicator.cpp?rev=1245476&r1=1245475&r2=1245476&view=diff ============================================================================== --- qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/WiringReplicator.cpp (original) +++ qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/WiringReplicator.cpp Fri Feb 17 14:04:08 2012 @@ -21,9 +21,13 @@ #include "WiringReplicator.h" #include "qpid/broker/Broker.h" #include "qpid/broker/Queue.h" +#include "qpid/broker/Link.h" +#include "qpid/framing/FieldTable.h" #include "qpid/log/Statement.h" #include "qpid/amqp_0_10/Codecs.h" +#include "qpid/broker/SessionHandler.h" #include "qpid/framing/reply_exceptions.h" +#include "qpid/framing/MessageTransferBody.h" #include "qmf/org/apache/qpid/broker/EventBind.h" #include "qmf/org/apache/qpid/broker/EventExchangeDeclare.h" #include "qmf/org/apache/qpid/broker/EventExchangeDelete.h" @@ -40,62 +44,69 @@ using qmf::org::apache::qpid::broker::Ev using qmf::org::apache::qpid::broker::EventQueueDeclare; using qmf::org::apache::qpid::broker::EventQueueDelete; using qmf::org::apache::qpid::broker::EventSubscribe; - +using namespace framing; using std::string; using types::Variant; using namespace broker; -namespace{ +namespace { +const string QPID_WIRING_REPLICATOR("qpid.wiring-replicator"); const string QPID_REPLICATE("qpid.replicate"); -const string ALL("all"); -const string WIRING("wiring"); const string CLASS_NAME("_class_name"); +const string EVENT("_event"); const string OBJECT_NAME("_object_name"); const string PACKAGE_NAME("_package_name"); -const string VALUES("_values"); -const string EVENT("_event"); -const string SCHEMA_ID("_schema_id"); const string QUERY_RESPONSE("_query_response"); +const string SCHEMA_ID("_schema_id"); +const string VALUES("_values"); -const string ARGUMENTS("arguments"); +const string ALL("all"); +const string ALTEX("altEx"); const string ARGS("args"); -const string QUEUE("queue"); -const string EXCHANGE("exchange"); +const string ARGUMENTS("arguments"); +const string AUTODEL("autoDel"); +const string AUTODELETE("autoDelete"); const string BIND("bind"); const string BINDING("binding"); +const string CREATED("created"); +const string DISP("disp"); const string DURABLE("durable"); -const string QNAME("qName"); -const string AUTODEL("autoDel"); -const string ALTEX("altEx"); -const string USER("user"); -const string RHOST("rhost"); -const string EXTYPE("exType"); +const string EXCHANGE("exchange"); const string EXNAME("exName"); -const string AUTODELETE("autoDelete"); +const string EXTYPE("exType"); +const string KEY("key"); const string NAME("name"); +const string QNAME("qName"); +const string QUEUE("queue"); +const string RHOST("rhost"); const string TYPE("type"); -const string DISP("disp"); -const string CREATED("created"); -const string KEY("key"); - +const string USER("user"); +const string WIRING("wiring"); -const string QMF_OPCODE("qmf.opcode"); -const string QMF_CONTENT("qmf.content"); +const string AGENT_IND_EVENT_ORG_APACHE_QPID_BROKER("agent.ind.event.org_apache_qpid_broker.#"); const string QMF2("qmf2"); +const string QMF_CONTENT("qmf.content"); +const string QMF_DEFAULT_TOPIC("qmf.default.topic"); +const string QMF_OPCODE("qmf.opcode"); -const string QPID_WIRING_REPLICATOR("qpid.wiring-replicator"); - +const string _WHAT("_what"); +const string _CLASS_NAME("_class_name"); +const string _PACKAGE_NAME("_package_name"); +const string _SCHEMA_ID("_schema_id"); +const string OBJECT("OBJECT"); +const string ORG_APACHE_QPID_BROKER("org.apache.qpid.broker"); +const string QMF_DEFAULT_DIRECT("qmf.default.direct"); +const string _QUERY_REQUEST("_query_request"); +const string BROKER("broker"); -bool isQMFv2(const Message& message) -{ +bool isQMFv2(const Message& message) { const framing::MessageProperties* props = message.getProperties<framing::MessageProperties>(); return props && props->getAppId() == QMF2; } -template <class T> bool match(Variant::Map& schema) -{ +template <class T> bool match(Variant::Map& schema) { return T::match(schema[CLASS_NAME], schema[PACKAGE_NAME]); } @@ -110,13 +121,90 @@ bool isReplicated(const Variant::Map& m) return i != m.end() && isReplicated(i->second.asString()); } +void sendQuery(const string className, const string& queueName, SessionHandler& sessionHandler) { + framing::AMQP_ServerProxy peer(sessionHandler.out); + Variant::Map request; + request[_WHAT] = OBJECT; + Variant::Map schema; + schema[_CLASS_NAME] = className; + schema[_PACKAGE_NAME] = ORG_APACHE_QPID_BROKER; + request[_SCHEMA_ID] = schema; + + AMQFrame method((MessageTransferBody(ProtocolVersion(), QMF_DEFAULT_DIRECT, 0, 0))); + method.setBof(true); + method.setEof(false); + method.setBos(true); + method.setEos(true); + AMQHeaderBody headerBody; + MessageProperties* props = headerBody.get<MessageProperties>(true); + props->setReplyTo(qpid::framing::ReplyTo("", queueName)); + props->setAppId(QMF2); + props->getApplicationHeaders().setString(QMF_OPCODE, _QUERY_REQUEST); + headerBody.get<qpid::framing::DeliveryProperties>(true)->setRoutingKey(BROKER); + AMQFrame header(headerBody); + header.setBof(false); + header.setEof(false); + header.setBos(true); + header.setEos(true); + AMQContentBody data; + qpid::amqp_0_10::MapCodec::encode(request, data.getData()); + AMQFrame content(data); + content.setBof(false); + content.setEof(true); + content.setBos(true); + content.setEos(true); + sessionHandler.out->handle(method); + sessionHandler.out->handle(header); + sessionHandler.out->handle(content); +} } // namespace - -WiringReplicator::WiringReplicator(const string& name, Broker& b) : Exchange(name), broker(b) {} - WiringReplicator::~WiringReplicator() {} +WiringReplicator::WiringReplicator(const boost::shared_ptr<Link>& l) + : Exchange(QPID_WIRING_REPLICATOR), broker(*l->getBroker()), link(l) +{} + +// We need to split out the initialization so that the WiringReplicator +// can be registered as an exchange before starting the bridge. +void WiringReplicator::initialize() { + assert(link->getBroker()); + broker.getLinks().declare( + link->getHost(), link->getPort(), + false, // durable + QPID_WIRING_REPLICATOR, // src + QPID_WIRING_REPLICATOR, // dest + "", // key + false, // isQueue + false, // isLocal + "", // id/tag + "", // excludes + false, // dynamic + 0, // sync? + boost::bind(&WiringReplicator::initializeBridge, this, _1, _2) + ); +} + +// This is called in the connection IO thread when the bridge is started. +void WiringReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler) { + framing::AMQP_ServerProxy peer(sessionHandler.out); + string queueName = bridge.getQueueName(); + const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs()); + + //declare and bind an event queue + peer.getQueue().declare(queueName, "", false, false, true, true, FieldTable()); + peer.getExchange().bind(queueName, QMF_DEFAULT_TOPIC, AGENT_IND_EVENT_ORG_APACHE_QPID_BROKER, FieldTable()); + //subscribe to the queue + peer.getMessage().subscribe(queueName, args.i_dest, 1, 0, false, "", 0, FieldTable()); + peer.getMessage().flow(args.i_dest, 0, 0xFFFFFFFF); + peer.getMessage().flow(args.i_dest, 1, 0xFFFFFFFF); + + //issue a query request for queues and another for exchanges using event queue as the reply-to address + sendQuery(QUEUE, queueName, sessionHandler); + sendQuery(EXCHANGE, queueName, sessionHandler); + sendQuery(BINDING, queueName, sessionHandler); +} + void WiringReplicator::route(Deliverable& msg, const string& /*key*/, const framing::FieldTable* headers) { Variant::List list; try { @@ -176,7 +264,10 @@ void WiringReplicator::doEventQueueDecla args, values[USER].asString(), values[RHOST].asString()).second) { - // FIXME aconway 2011-11-22: should delete old queue and re-create from exchanges. + // FIXME aconway 2011-11-22: should delete old queue and + // re-create from event. + // Events are always up to date, whereas responses may be + // out of date. QPID_LOG(warning, "Replicated queue " << name << " already exists"); } } @@ -209,7 +300,7 @@ void WiringReplicator::doEventExchangeDe values[USER].asString(), values[RHOST].asString()).second) { // FIXME aconway 2011-11-22: should delete pre-exisitng exchange - // and re-create from event. Likewise for queues. + // and re-create from event. See comment in doEventQueueDeclare. QPID_LOG(warning, "Replicated exchange " << name << " already exists"); } } @@ -286,8 +377,6 @@ void WiringReplicator::doResponseExchang } } -// FIXME aconway 2011-11-21: refactor to remove redundancy between do* functions. - namespace { const std::string QUEUE_REF_PREFIX("org.apache.qpid.broker:queue:"); const std::string EXCHANGE_REF_PREFIX("org.apache.qpid.broker:exchange:"); @@ -299,7 +388,7 @@ std::string getRefName(const std::string throw Exception(QPID_MSG("Replicator: invalid object reference: " << ref)); const std::string name = i->second.asString(); if (name.compare(0, prefix.size(), prefix) != 0) - throw Exception(QPID_MSG("Replicator unexpected reference prefix: " << name)); + throw Exception(QPID_MSG("Replicator: unexpected reference prefix: " << name)); std::string ret = name.substr(prefix.size()); return ret; } @@ -336,21 +425,6 @@ void WiringReplicator::doResponseBind(Va } catch (const framing::NotFoundException& e) {} // Ignore unreplicated queue or exchange. } -boost::shared_ptr<Exchange> WiringReplicator::create(const string& target, Broker& broker) -{ - boost::shared_ptr<Exchange> exchange; - if (isWiringReplicatorDestination(target)) { - //TODO: need to cache the exchange - exchange.reset(new WiringReplicator(target, broker)); - } - return exchange; -} - -bool WiringReplicator::isWiringReplicatorDestination(const string& target) -{ - return target == QPID_WIRING_REPLICATOR; -} - bool WiringReplicator::bind(boost::shared_ptr<Queue>, const string&, const framing::FieldTable*) { return false; } bool WiringReplicator::unbind(boost::shared_ptr<Queue>, const string&, const framing::FieldTable*) { return false; } bool WiringReplicator::isBound(boost::shared_ptr<Queue>, const string* const, const framing::FieldTable* const) { return false; } Modified: qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/WiringReplicator.h URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/WiringReplicator.h?rev=1245476&r1=1245475&r2=1245476&view=diff ============================================================================== --- qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/WiringReplicator.h (original) +++ qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/WiringReplicator.h Fri Feb 17 14:04:08 2012 @@ -24,13 +24,15 @@ #include "qpid/broker/Exchange.h" #include "qpid/types/Variant.h" - -// FIXME aconway 2011-11-17: relocate to ../ha +#include <boost/shared_ptr.hpp> namespace qpid { namespace broker { class Broker; +class Link; +class Bridge; +class SessionHandler; } namespace ha { @@ -42,19 +44,23 @@ namespace ha { class WiringReplicator : public broker::Exchange { public: - WiringReplicator(const std::string&, broker::Broker&); + WiringReplicator(const boost::shared_ptr<broker::Link>&); ~WiringReplicator(); std::string getType() const; + + // Call this after the WiringReplicator has been registered as an exchange. + void initialize(); + + // Exchange methods bool bind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*); bool unbind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*); void route(broker::Deliverable&, const std::string&, const framing::FieldTable*); bool isBound(boost::shared_ptr<broker::Queue>, const std::string* const, const framing::FieldTable* const); - static bool isWiringReplicatorDestination(const std::string&); - static boost::shared_ptr<broker::Exchange> create(const std::string&, broker::Broker&); static const std::string typeName; - private: + private: + void initializeBridge(broker::Bridge&, broker::SessionHandler&); void doEventQueueDeclare(types::Variant::Map& values); void doEventQueueDelete(types::Variant::Map& values); void doEventExchangeDeclare(types::Variant::Map& values); @@ -65,7 +71,10 @@ class WiringReplicator : public broker:: void doResponseBind(types::Variant::Map& values); private: + void startQueueReplicator(const std::string& name); + broker::Broker& broker; + boost::shared_ptr<broker::Link> link; }; }} // namespace qpid::broker --------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:commits-subscr...@qpid.apache.org