Author: aconway Date: Fri Feb 17 14:03:59 2012 New Revision: 1245475 URL: http://svn.apache.org/viewvc?rev=1245475&view=rev Log: QPID-3603: Replicate bindings to backup brokers.
Modified: qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/Backup.cpp qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/WiringReplicator.cpp qpid/branches/qpid-3603-7/qpid/cpp/src/tests/ha_tests.py Modified: qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/Backup.cpp URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/Backup.cpp?rev=1245475&r1=1245474&r2=1245475&view=diff ============================================================================== --- qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/Backup.cpp (original) +++ qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/Backup.cpp Fri Feb 17 14:03:59 2012 @@ -37,70 +37,95 @@ namespace ha { using namespace framing; using namespace broker; using types::Variant; +using std::string; namespace { -const std::string QPID_WIRING_REPLICATOR("qpid.wiring-replicator"); +const string QPID_WIRING_REPLICATOR("qpid.wiring-replicator"); +const string _WHAT("_what"); +const string _CLASS_NAME("_class_name"); +const string _PACKAGE_NAME("_package_name"); +const string _SCHEMA_ID("_schema_id"); +const string OBJECT("OBJECT"); +const string ORG_APACHE_QPID_BROKER("org.apache.qpid.broker"); +const string QMF_DEFAULT_DIRECT("qmf.default.direct"); +const string QMF2("qmf2"); +const string QMF_OPCODE("qmf.opcode"); +const string _QUERY_REQUEST("_query_request"); +const string BROKER("broker"); +} + +void sendQuery(const string className, const string& queueName, SessionHandler& sessionHandler) { + framing::AMQP_ServerProxy peer(sessionHandler.out); + Variant::Map request; + request[_WHAT] = OBJECT; + Variant::Map schema; + schema[_CLASS_NAME] = className; + schema[_PACKAGE_NAME] = ORG_APACHE_QPID_BROKER; + request[_SCHEMA_ID] = schema; + + AMQFrame method((MessageTransferBody(ProtocolVersion(), QMF_DEFAULT_DIRECT, 0, 0))); + method.setBof(true); + method.setEof(false); + method.setBos(true); + method.setEos(true); + AMQHeaderBody headerBody; + MessageProperties* props = headerBody.get<MessageProperties>(true); + props->setReplyTo(qpid::framing::ReplyTo("", queueName)); + props->setAppId(QMF2); + props->getApplicationHeaders().setString(QMF_OPCODE, _QUERY_REQUEST); + headerBody.get<qpid::framing::DeliveryProperties>(true)->setRoutingKey(BROKER); + AMQFrame header(headerBody); + header.setBof(false); + header.setEof(false); + header.setBos(true); + header.setEos(true); + AMQContentBody data; + qpid::amqp_0_10::MapCodec::encode(request, data.getData()); + AMQFrame content(data); + content.setBof(false); + content.setEof(true); + content.setBos(true); + content.setEos(true); + sessionHandler.out->handle(method); + sessionHandler.out->handle(header); + sessionHandler.out->handle(content); +} + +namespace { +const string QMF_DEFAULT_TOPIC("qmf.default.topic"); +const string AGENT_IND_EVENT_ORG_APACHE_QPID_BROKER("agent.ind.event.org_apache_qpid_broker.#"); +const string QUEUE("queue"); +const string EXCHANGE("exchange"); +const string BINDING("binding"); } // Initialize a bridge as a wiring replicator. void bridgeInitWiringReplicator(Bridge& bridge, SessionHandler& sessionHandler) { framing::AMQP_ServerProxy peer(sessionHandler.out); - std::string queueName = bridge.getQueueName(); + string queueName = bridge.getQueueName(); const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs()); //declare and bind an event queue peer.getQueue().declare(queueName, "", false, false, true, true, FieldTable()); - peer.getExchange().bind(queueName, "qmf.default.topic", "agent.ind.event.org_apache_qpid_broker.#", FieldTable()); + peer.getExchange().bind(queueName, QMF_DEFAULT_TOPIC, AGENT_IND_EVENT_ORG_APACHE_QPID_BROKER, FieldTable()); //subscribe to the queue peer.getMessage().subscribe(queueName, args.i_dest, 1, 0, false, "", 0, FieldTable()); peer.getMessage().flow(args.i_dest, 0, 0xFFFFFFFF); peer.getMessage().flow(args.i_dest, 1, 0xFFFFFFFF); //issue a query request for queues and another for exchanges using event queue as the reply-to address - for (int i = 0; i < 2; ++i) {//TODO: cleanup this code into reusable utility functions - Variant::Map request; - request["_what"] = "OBJECT"; - Variant::Map schema; - schema["_class_name"] = (i == 0 ? "queue" : "exchange"); - schema["_package_name"] = "org.apache.qpid.broker"; - request["_schema_id"] = schema; - - AMQFrame method((MessageTransferBody(ProtocolVersion(), "qmf.default.direct", 0, 0))); - method.setBof(true); - method.setEof(false); - method.setBos(true); - method.setEos(true); - AMQHeaderBody headerBody; - MessageProperties* props = headerBody.get<MessageProperties>(true); - props->setReplyTo(qpid::framing::ReplyTo("", queueName)); - props->setAppId("qmf2"); - props->getApplicationHeaders().setString("qmf.opcode", "_query_request"); - headerBody.get<qpid::framing::DeliveryProperties>(true)->setRoutingKey("broker"); - AMQFrame header(headerBody); - header.setBof(false); - header.setEof(false); - header.setBos(true); - header.setEos(true); - AMQContentBody data; - qpid::amqp_0_10::MapCodec::encode(request, data.getData()); - AMQFrame content(data); - content.setBof(false); - content.setEof(true); - content.setBos(true); - content.setEos(true); - sessionHandler.out->handle(method); - sessionHandler.out->handle(header); - sessionHandler.out->handle(content); - } + sendQuery(QUEUE, queueName, sessionHandler); + sendQuery(EXCHANGE, queueName, sessionHandler); + sendQuery(BINDING, queueName, sessionHandler); } Backup::Backup(broker::Broker& b, const Settings& s) : broker(b), settings(s) { // Create a link to replicate wiring - if (s.brokerUrl != "dummy") { + if (s.brokerUrl != "dummy") { // FIXME aconway 2011-11-22: temporary hack to identify primary. Url url(s.brokerUrl); QPID_LOG(info, "HA backup broker connecting to: " << url); - std::string protocol = url[0].protocol.empty() ? "tcp" : url[0].protocol; + string protocol = url[0].protocol.empty() ? "tcp" : url[0].protocol; broker.getLinks().declare( // Declare the link url[0].host, url[0].port, protocol, false, // durable @@ -111,7 +136,7 @@ Backup::Backup(broker::Broker& b, const false, // durable QPID_WIRING_REPLICATOR, // src QPID_WIRING_REPLICATOR, // dest - "x", // key + "", // key false, // isQueue false, // isLocal "", // id/tag @@ -121,8 +146,7 @@ Backup::Backup(broker::Broker& b, const bridgeInitWiringReplicator ); } - // FIXME aconway 2011-11-17: need to enhance the link code to - // handle discovery of the primary broker and fail-over correctly. + // FIXME aconway 2011-11-17: handle discovery of the primary broker and fail-over correctly. } }} // namespace qpid::ha Modified: qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/WiringReplicator.cpp URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/WiringReplicator.cpp?rev=1245475&r1=1245474&r2=1245475&view=diff ============================================================================== --- qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/WiringReplicator.cpp (original) +++ qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/WiringReplicator.cpp Fri Feb 17 14:03:59 2012 @@ -52,6 +52,7 @@ const string ALL("all"); const string WIRING("wiring"); const string CLASS_NAME("_class_name"); +const string OBJECT_NAME("_object_name"); const string PACKAGE_NAME("_package_name"); const string VALUES("_values"); const string EVENT("_event"); @@ -59,10 +60,11 @@ const string SCHEMA_ID("_schema_id"); const string QUERY_RESPONSE("_query_response"); const string ARGUMENTS("arguments"); +const string ARGS("args"); const string QUEUE("queue"); const string EXCHANGE("exchange"); const string BIND("bind"); -const string ARGS("args"); +const string BINDING("binding"); const string DURABLE("durable"); const string QNAME("qName"); const string AUTODEL("autoDel"); @@ -116,19 +118,16 @@ WiringReplicator::WiringReplicator(const WiringReplicator::~WiringReplicator() {} void WiringReplicator::route(Deliverable& msg, const string& /*key*/, const framing::FieldTable* headers) { + Variant::List list; try { - // FIXME aconway 2011-11-21: outer error handling, e.g. for decoding error. if (!isQMFv2(msg.getMessage()) || !headers) throw Exception("Unexpected message, not QMF2 event or query response."); // decode as list string content = msg.getMessage().getFrames().getContent(); - Variant::List list; amqp_0_10::ListCodec::decode(content, list); - QPID_LOG(critical, "FIXME WiringReplicator message: " << list); if (headers->getAsString(QMF_CONTENT) == EVENT) { for (Variant::List::iterator i = list.begin(); i != list.end(); ++i) { - // FIXME aconway 2011-11-18: should be iterating list? Variant::Map& map = list.front().asMap(); Variant::Map& schema = map[SCHEMA_ID].asMap(); Variant::Map& values = map[VALUES].asMap(); @@ -142,33 +141,32 @@ void WiringReplicator::route(Deliverable else throw(Exception(QPID_MSG("WiringReplicator received unexpected event, schema=" << schema))); } } else if (headers->getAsString(QMF_OPCODE) == QUERY_RESPONSE) { - QPID_LOG(critical, "FIXME WiringReplicator response: " << list); for (Variant::List::iterator i = list.begin(); i != list.end(); ++i) { string type = i->asMap()[SCHEMA_ID].asMap()[CLASS_NAME]; Variant::Map& values = i->asMap()[VALUES].asMap(); - if (isReplicated(values[ARGUMENTS].asMap())) { - framing::FieldTable args; - amqp_0_10::translate(values[ARGUMENTS].asMap(), args); - if (type == QUEUE) doResponseQueue(values); - else if (type == EXCHANGE) doResponseExchange(values); - else if (type == BIND) doResponseBind(values); - else throw Exception(QPID_MSG("Ignoring unexpected class: " << type)); - } + framing::FieldTable args; + amqp_0_10::translate(values[ARGUMENTS].asMap(), args); + if (type == QUEUE) doResponseQueue(values); + else if (type == EXCHANGE) doResponseExchange(values); + else if (type == BINDING) doResponseBind(values); + else throw Exception(QPID_MSG("Ignoring unexpected class: " << type)); } } else { - QPID_LOG(warning, QPID_MSG("Ignoring QMFv2 message with headers: " << *headers)); + QPID_LOG(warning, QPID_MSG("Replicator: Ignoring QMFv2 message with headers: " << *headers)); } } catch (const std::exception& e) { - QPID_LOG(warning, "Error replicating configuration: " << e.what()); + QPID_LOG(warning, "Replicator: Error replicating configuration: " << e.what()); + QPID_LOG(debug, "Replicator: Error processing: " << list); } } void WiringReplicator::doEventQueueDeclare(Variant::Map& values) { string name = values[QNAME].asString(); - if (values[DISP] == CREATED && isReplicated(values[ARGS].asMap())) { + Variant::Map argsMap = values[ARGS].asMap(); + if (values[DISP] == CREATED && isReplicated(argsMap)) { QPID_LOG(debug, "Creating replicated queue " << name); framing::FieldTable args; - amqp_0_10::translate(values[ARGS].asMap(), args); + amqp_0_10::translate(argsMap, args); if (!broker.createQueue( name, values[DURABLE].asBool(), @@ -178,6 +176,7 @@ void WiringReplicator::doEventQueueDecla args, values[USER].asString(), values[RHOST].asString()).second) { + // FIXME aconway 2011-11-22: should delete old queue and re-create from exchanges. QPID_LOG(warning, "Replicated queue " << name << " already exists"); } } @@ -201,7 +200,6 @@ void WiringReplicator::doEventExchangeDe string name = values[EXNAME].asString(); framing::FieldTable args; amqp_0_10::translate(argsMap, args); - QPID_LOG(debug, "Creating replicated exchange " << name); if (!broker.createExchange( name, values[EXTYPE].asString(), @@ -210,6 +208,8 @@ void WiringReplicator::doEventExchangeDe args, values[USER].asString(), values[RHOST].asString()).second) { + // FIXME aconway 2011-11-22: should delete pre-exisitng exchange + // and re-create from event. Likewise for queues. QPID_LOG(warning, "Replicated exchange " << name << " already exists"); } } @@ -230,14 +230,13 @@ void WiringReplicator::doEventExchangeDe } void WiringReplicator::doEventBind(Variant::Map& values) { - QPID_LOG(critical, "FIXME doEventBind " << values); try { boost::shared_ptr<Exchange> exchange = broker.getExchanges().get(values[EXNAME].asString()); boost::shared_ptr<Queue> queue = broker.getQueues().find(values[QNAME].asString()); // We only replicated a binds for a replicated queue to replicated exchange. if (isReplicated(exchange->getArgs()) && isReplicated(queue->getSettings())) { framing::FieldTable args; - amqp_0_10::translate(args, values[ARGS].asMap()); + amqp_0_10::translate(values[ARGS].asMap(), args); string key = values[KEY].asString(); QPID_LOG(debug, "Replicated binding exchange=" << exchange->getName() << " queue=" << queue->getName() @@ -248,6 +247,11 @@ void WiringReplicator::doEventBind(Varia } void WiringReplicator::doResponseQueue(Variant::Map& values) { + // FIXME aconway 2011-11-22: more flexible ways & defaults to indicate replication + Variant::Map argsMap(values[ARGUMENTS].asMap()); + if (!isReplicated(argsMap)) return; + framing::FieldTable args; + amqp_0_10::translate(argsMap, args); QPID_LOG(debug, "Creating replicated queue " << values[NAME].asString() << " (in catch-up)"); if (!broker.createQueue( values[NAME].asString(), @@ -258,11 +262,17 @@ void WiringReplicator::doResponseQueue(V args, ""/*TODO: who is the user?*/, ""/*TODO: what should we use as connection id?*/).second) { + // FIXME aconway 2011-11-22: Normal to find queue already + // exists if we're failing over. QPID_LOG(warning, "Replicated queue " << values[NAME] << " already exists (in catch-up)"); } } void WiringReplicator::doResponseExchange(Variant::Map& values) { + Variant::Map argsMap(values[ARGUMENTS].asMap()); + if (!isReplicated(argsMap)) return; + framing::FieldTable args; + amqp_0_10::translate(argsMap, args); QPID_LOG(debug, "Creating replicated exchange " << values[NAME].asString() << " (in catch-up)"); if (!broker.createExchange( values[NAME].asString(), @@ -276,9 +286,54 @@ void WiringReplicator::doResponseExchang } } +// FIXME aconway 2011-11-21: refactor to remove redundancy between do* functions. + +namespace { +const std::string QUEUE_REF_PREFIX("org.apache.qpid.broker:queue:"); +const std::string EXCHANGE_REF_PREFIX("org.apache.qpid.broker:exchange:"); + +std::string getRefName(const std::string& prefix, const Variant& ref) { + Variant::Map map(ref.asMap()); + Variant::Map::const_iterator i = map.find(OBJECT_NAME); + if (i == map.end()) + throw Exception(QPID_MSG("Replicator: invalid object reference: " << ref)); + const std::string name = i->second.asString(); + if (name.compare(0, prefix.size(), prefix) != 0) + throw Exception(QPID_MSG("Replicator unexpected reference prefix: " << name)); + std::string ret = name.substr(prefix.size()); + return ret; +} + +const std::string EXCHANGE_REF("exchangeRef"); +const std::string QUEUE_REF("queueRef"); + +} // namespace + void WiringReplicator::doResponseBind(Variant::Map& values) { - QPID_LOG(critical, "FIXME doResponseBind " << values); - throw Exception("FIXME WiringReplicator: Not yet implemented - catch-up replicate bindings."); + try { + std::string exName = getRefName(EXCHANGE_REF_PREFIX, values[EXCHANGE_REF]); + boost::shared_ptr<Exchange> exchange = broker.getExchanges().get(exName); + if (!exchange) return; + + std::string qName = getRefName(QUEUE_REF_PREFIX, values[QUEUE_REF]); + boost::shared_ptr<Queue> queue = broker.getQueues().find(qName); + if (!queue) return; + + // We only replicated a bind for a replicated queue to replicated exchange. + // FIXME aconway 2011-11-22: do we always log binds between replicated ex/q + // or do we consider the bind arguments as well? + if (exchange && queue && + isReplicated(exchange->getArgs()) && isReplicated(queue->getSettings())) + { + framing::FieldTable args; + amqp_0_10::translate(values[ARGUMENTS].asMap(), args); + string key = values[KEY].asString(); + QPID_LOG(debug, "Replicated binding exchange=" << exchange->getName() + << " queue=" << queue->getName() + << " key=" << key); + exchange->bind(queue, key, &args); + } + } catch (const framing::NotFoundException& e) {} // Ignore unreplicated queue or exchange. } boost::shared_ptr<Exchange> WiringReplicator::create(const string& target, Broker& broker) Modified: qpid/branches/qpid-3603-7/qpid/cpp/src/tests/ha_tests.py URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603-7/qpid/cpp/src/tests/ha_tests.py?rev=1245475&r1=1245474&r2=1245475&view=diff ============================================================================== --- qpid/branches/qpid-3603-7/qpid/cpp/src/tests/ha_tests.py (original) +++ qpid/branches/qpid-3603-7/qpid/cpp/src/tests/ha_tests.py Fri Feb 17 14:03:59 2012 @@ -50,7 +50,7 @@ class ShortTests(BrokerTest): def wait(self, session, address): def check(): try: - session.receiver(address) + session.sender(address) return True except NotFound: return False assert retry(check), "Timed out waiting for %s"%(address) @@ -67,37 +67,39 @@ class ShortTests(BrokerTest): # Create some wiring before starting the backup, to test catch-up primary = self.ha_broker(name="primary") - s = primary.connect().session() - s.sender(queue%("q1", "all")).send(Message("1")) - s.sender(queue%("q2", "wiring")).send(Message("2")) - s.sender(queue%("q3", "none")).send(Message("3")) - s.sender(exchange%("e1", "all", "e1", "q2")).send(Message("4")) + p = primary.connect().session() + p.sender(queue%("q1", "all")).send(Message("1")) + p.sender(queue%("q2", "wiring")).send(Message("2")) + p.sender(queue%("q3", "none")).send(Message("3")) + p.sender(exchange%("e1", "all", "e1", "q2")).send(Message("4")) # Create some after starting backup, test steady-state replication backup = self.ha_broker(name="backup", broker_url=primary.host_port()) - s.sender(queue%("q01", "all")).send(Message("01")) - s.sender(queue%("q02", "wiring")).send(Message("02")) - s.sender(queue%("q03", "none")).send(Message("03")) - s.sender(exchange%("e01", "all", "e01", "q02")).send(Message("04")) + b = backup.connect().session() + # FIXME aconway 2011-11-21: need to wait for backup to be ready to test event replication + for a in ["q1", "q2", "e1"]: self.wait(b,a) + p.sender(queue%("q11", "all")).send(Message("11")) + p.sender(queue%("q12", "wiring")).send(Message("12")) + p.sender(queue%("q13", "none")).send(Message("13")) + p.sender(exchange%("e11", "all", "e11", "q12")).send(Message("14")) # Verify replication # FIXME aconway 2011-11-18: We should kill primary here and fail over. - s = backup.connect().session() - for a in ["q01", "q02", "e01"]: self.wait(s,a) + for a in ["q11", "q12", "e11"]: self.wait(b,a) # FIXME aconway 2011-11-18: replicate messages -# self.assert_browse(s, "q01", ["01", "04", "e01"]) -# self.assert_browse(s, "q02", []) # wiring only -# self.assert_missing(s,"q03") - s.sender("e01").send(Message("e01")) # Verify bind - self.assert_browse(s, "q02", ["e01"]) +# self.assert_browse(b, "q11", ["11", "14", "e11"]) +# self.assert_browse(b, "q12", []) # wiring only +# self.assert_missing(b,"q13") + b.sender("e11").send(Message("e11")) # Verify bind + self.assert_browse(b, "q12", ["e11"]) - for a in ["q1", "q2", "e1"]: self.wait(s,a) + for a in ["q1", "q2", "e1"]: self.wait(b,a) # FIXME aconway 2011-11-18: replicate messages -# self.assert_browse(s, "q1", ["1", "4", "e1"]) -# self.assert_browse(s, "q2", []) # wiring only -# self.assert_missing(s,"q3") - s.sender("e1").send(Message("e1")) # Verify bind - self.assert_browse(s, "q2", ["e1"]) +# self.assert_browse(b, "q1", ["1", "4", "e1"]) +# self.assert_browse(b, "q2", []) # wiring only +# self.assert_missing(b,"q3") + b.sender("e1").send(Message("e1")) # Verify bind + self.assert_browse(b, "q2", ["e1"]) if __name__ == "__main__": --------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:commits-subscr...@qpid.apache.org