Author: aconway Date: Fri Feb 17 14:03:49 2012 New Revision: 1245474 URL: http://svn.apache.org/viewvc?rev=1245474&view=rev Log: QPID-3603: Minor refactor.
Modified: qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/WiringReplicator.cpp qpid/branches/qpid-3603-7/qpid/cpp/src/tests/ha_tests.py Modified: qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/WiringReplicator.cpp URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/WiringReplicator.cpp?rev=1245474&r1=1245473&r2=1245474&view=diff ============================================================================== --- qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/WiringReplicator.cpp (original) +++ qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/WiringReplicator.cpp Fri Feb 17 14:03:49 2012 @@ -31,6 +31,9 @@ #include "qmf/org/apache/qpid/broker/EventQueueDelete.h" #include "qmf/org/apache/qpid/broker/EventSubscribe.h" +namespace qpid { +namespace ha { + using qmf::org::apache::qpid::broker::EventBind; using qmf::org::apache::qpid::broker::EventExchangeDeclare; using qmf::org::apache::qpid::broker::EventExchangeDelete; @@ -38,50 +41,49 @@ using qmf::org::apache::qpid::broker::Ev using qmf::org::apache::qpid::broker::EventQueueDelete; using qmf::org::apache::qpid::broker::EventSubscribe; -namespace qpid { -namespace ha { - +using std::string; using types::Variant; using namespace broker; namespace{ -const std::string QPID_REPLICATE("qpid.replicate"); -const std::string ALL("all"); -const std::string WIRING("wiring"); - -const std::string CLASS_NAME("_class_name"); -const std::string PACKAGE_NAME("_package_name"); -const std::string VALUES("_values"); -const std::string EVENT("_event"); -const std::string SCHEMA_ID("_schema_id"); -const std::string QUERY_RESPONSE("_query_response"); - -const std::string ARGUMENTS("arguments"); -const std::string QUEUE("queue"); -const std::string EXCHANGE("exchange"); -const std::string BIND("bind"); -const std::string ARGS("args"); -const std::string DURABLE("durable"); -const std::string QNAME("qName"); -const std::string AUTODEL("autoDel"); -const std::string ALTEX("altEx"); -const std::string USER("user"); -const std::string RHOST("rhost"); -const std::string EXTYPE("exType"); -const std::string EXNAME("exName"); -const std::string AUTODELETE("autoDelete"); -const std::string NAME("name"); -const std::string TYPE("type"); -const std::string DISP("disp"); -const std::string CREATED("created"); - - -const std::string QMF_OPCODE("qmf.opcode"); -const std::string QMF_CONTENT("qmf.content"); -const std::string QMF2("qmf2"); +const string QPID_REPLICATE("qpid.replicate"); +const string ALL("all"); +const string WIRING("wiring"); + +const string CLASS_NAME("_class_name"); +const string PACKAGE_NAME("_package_name"); +const string VALUES("_values"); +const string EVENT("_event"); +const string SCHEMA_ID("_schema_id"); +const string QUERY_RESPONSE("_query_response"); + +const string ARGUMENTS("arguments"); +const string QUEUE("queue"); +const string EXCHANGE("exchange"); +const string BIND("bind"); +const string ARGS("args"); +const string DURABLE("durable"); +const string QNAME("qName"); +const string AUTODEL("autoDel"); +const string ALTEX("altEx"); +const string USER("user"); +const string RHOST("rhost"); +const string EXTYPE("exType"); +const string EXNAME("exName"); +const string AUTODELETE("autoDelete"); +const string NAME("name"); +const string TYPE("type"); +const string DISP("disp"); +const string CREATED("created"); +const string KEY("key"); + + +const string QMF_OPCODE("qmf.opcode"); +const string QMF_CONTENT("qmf.content"); +const string QMF2("qmf2"); -const std::string QPID_WIRING_REPLICATOR("qpid.wiring-replicator"); +const string QPID_WIRING_REPLICATOR("qpid.wiring-replicator"); bool isQMFv2(const Message& message) @@ -95,7 +97,7 @@ template <class T> bool match(Variant::M return T::match(schema[CLASS_NAME], schema[PACKAGE_NAME]); } -bool isReplicated(const std::string& value) { +bool isReplicated(const string& value) { return value == ALL || value == WIRING; } bool isReplicated(const framing::FieldTable& f) { @@ -109,20 +111,22 @@ bool isReplicated(const Variant::Map& m) } // namespace -WiringReplicator::WiringReplicator(const std::string& name, Broker& b) : Exchange(name), broker(b) {} +WiringReplicator::WiringReplicator(const string& name, Broker& b) : Exchange(name), broker(b) {} WiringReplicator::~WiringReplicator() {} -void WiringReplicator::route(Deliverable& msg, const std::string& /*key*/, const framing::FieldTable* headers) { +void WiringReplicator::route(Deliverable& msg, const string& /*key*/, const framing::FieldTable* headers) { try { // FIXME aconway 2011-11-21: outer error handling, e.g. for decoding error. if (!isQMFv2(msg.getMessage()) || !headers) throw Exception("Unexpected message, not QMF2 event or query response."); - // FIXME aconway 2011-11-21: string constants - if (headers->getAsString(QMF_CONTENT) == EVENT) { //decode as list - std::string content = msg.getMessage().getFrames().getContent(); - Variant::List list; - amqp_0_10::ListCodec::decode(content, list); + // decode as list + string content = msg.getMessage().getFrames().getContent(); + Variant::List list; + amqp_0_10::ListCodec::decode(content, list); + + QPID_LOG(critical, "FIXME WiringReplicator message: " << list); + if (headers->getAsString(QMF_CONTENT) == EVENT) { for (Variant::List::iterator i = list.begin(); i != list.end(); ++i) { // FIXME aconway 2011-11-18: should be iterating list? Variant::Map& map = list.front().asMap(); @@ -133,16 +137,14 @@ void WiringReplicator::route(Deliverable else if (match<EventExchangeDeclare>(schema)) doEventExchangeDeclare(values); else if (match<EventExchangeDelete>(schema)) doEventExchangeDelete(values); else if (match<EventBind>(schema)) doEventBind(values); + // FIXME aconway 2011-11-21: handle unbind & all other events. else if (match<EventSubscribe>(schema)) {} // Deliberately ignored. else throw(Exception(QPID_MSG("WiringReplicator received unexpected event, schema=" << schema))); } } else if (headers->getAsString(QMF_OPCODE) == QUERY_RESPONSE) { - //decode as list - std::string content = msg.getMessage().getFrames().getContent(); - Variant::List list; - amqp_0_10::ListCodec::decode(content, list); + QPID_LOG(critical, "FIXME WiringReplicator response: " << list); for (Variant::List::iterator i = list.begin(); i != list.end(); ++i) { - std::string type = i->asMap()[SCHEMA_ID].asMap()[CLASS_NAME]; + string type = i->asMap()[SCHEMA_ID].asMap()[CLASS_NAME]; Variant::Map& values = i->asMap()[VALUES].asMap(); if (isReplicated(values[ARGUMENTS].asMap())) { framing::FieldTable args; @@ -162,7 +164,7 @@ void WiringReplicator::route(Deliverable } void WiringReplicator::doEventQueueDeclare(Variant::Map& values) { - std::string name = values[QNAME].asString(); + string name = values[QNAME].asString(); if (values[DISP] == CREATED && isReplicated(values[ARGS].asMap())) { QPID_LOG(debug, "Creating replicated queue " << name); framing::FieldTable args; @@ -182,7 +184,7 @@ void WiringReplicator::doEventQueueDecla } void WiringReplicator::doEventQueueDelete(Variant::Map& values) { - std::string name = values[QNAME].asString(); + string name = values[QNAME].asString(); boost::shared_ptr<Queue> queue = broker.getQueues().find(name); if (queue && isReplicated(queue->getSettings())) { QPID_LOG(debug, "Deleting replicated queue " << name); @@ -194,10 +196,11 @@ void WiringReplicator::doEventQueueDelet } void WiringReplicator::doEventExchangeDeclare(Variant::Map& values) { - if (values[DISP] == CREATED && isReplicated(values[ARGS].asMap())) { - std::string name = values[EXNAME].asString(); + Variant::Map argsMap(values[ARGS].asMap()); + if (values[DISP] == CREATED && isReplicated(argsMap)) { + string name = values[EXNAME].asString(); framing::FieldTable args; - amqp_0_10::translate(values[ARGS].asMap(), args); + amqp_0_10::translate(argsMap, args); QPID_LOG(debug, "Creating replicated exchange " << name); if (!broker.createExchange( name, @@ -213,11 +216,11 @@ void WiringReplicator::doEventExchangeDe } void WiringReplicator::doEventExchangeDelete(Variant::Map& values) { - std::string name = values[EXNAME].asString(); + string name = values[EXNAME].asString(); try { boost::shared_ptr<Exchange> exchange = broker.getExchanges().get(name); if (exchange && isReplicated(exchange->getArgs())) { - QPID_LOG(warning, "Deleting replicated exchange " << name); + QPID_LOG(debug, "Deleting replicated exchange " << name); broker.deleteExchange( name, values[USER].asString(), @@ -226,9 +229,22 @@ void WiringReplicator::doEventExchangeDe } catch (const framing::NotFoundException&) {} } -void WiringReplicator::doEventBind(Variant::Map&) { - QPID_LOG(error, "FIXME WiringReplicator: Not yet implemented - replicate bindings."); - // FIXME aconway 2011-11-18: only replicated binds of replicated q to replicated ex. +void WiringReplicator::doEventBind(Variant::Map& values) { + QPID_LOG(critical, "FIXME doEventBind " << values); + try { + boost::shared_ptr<Exchange> exchange = broker.getExchanges().get(values[EXNAME].asString()); + boost::shared_ptr<Queue> queue = broker.getQueues().find(values[QNAME].asString()); + // We only replicated a binds for a replicated queue to replicated exchange. + if (isReplicated(exchange->getArgs()) && isReplicated(queue->getSettings())) { + framing::FieldTable args; + amqp_0_10::translate(args, values[ARGS].asMap()); + string key = values[KEY].asString(); + QPID_LOG(debug, "Replicated binding exchange=" << exchange->getName() + << " queue=" << queue->getName() + << " key=" << key); + exchange->bind(queue, key, &args); + } + } catch (const framing::NotFoundException&) {} // Ignore unreplicated queue or exchange. } void WiringReplicator::doResponseQueue(Variant::Map& values) { @@ -260,11 +276,12 @@ void WiringReplicator::doResponseExchang } } -void WiringReplicator::doResponseBind(Variant::Map& ) { - QPID_LOG(error, "FIXME WiringReplicator: Not yet implemented - catch-up replicate bindings."); +void WiringReplicator::doResponseBind(Variant::Map& values) { + QPID_LOG(critical, "FIXME doResponseBind " << values); + throw Exception("FIXME WiringReplicator: Not yet implemented - catch-up replicate bindings."); } -boost::shared_ptr<Exchange> WiringReplicator::create(const std::string& target, Broker& broker) +boost::shared_ptr<Exchange> WiringReplicator::create(const string& target, Broker& broker) { boost::shared_ptr<Exchange> exchange; if (isWiringReplicatorDestination(target)) { @@ -274,18 +291,18 @@ boost::shared_ptr<Exchange> WiringReplic return exchange; } -bool WiringReplicator::isWiringReplicatorDestination(const std::string& target) +bool WiringReplicator::isWiringReplicatorDestination(const string& target) { return target == QPID_WIRING_REPLICATOR; } -bool WiringReplicator::bind(boost::shared_ptr<Queue>, const std::string&, const framing::FieldTable*) { return false; } -bool WiringReplicator::unbind(boost::shared_ptr<Queue>, const std::string&, const framing::FieldTable*) { return false; } -bool WiringReplicator::isBound(boost::shared_ptr<Queue>, const std::string* const, const framing::FieldTable* const) { return false; } +bool WiringReplicator::bind(boost::shared_ptr<Queue>, const string&, const framing::FieldTable*) { return false; } +bool WiringReplicator::unbind(boost::shared_ptr<Queue>, const string&, const framing::FieldTable*) { return false; } +bool WiringReplicator::isBound(boost::shared_ptr<Queue>, const string* const, const framing::FieldTable* const) { return false; } -const std::string WiringReplicator::typeName(QPID_WIRING_REPLICATOR); +const string WiringReplicator::typeName(QPID_WIRING_REPLICATOR); -std::string WiringReplicator::getType() const +string WiringReplicator::getType() const { return typeName; } Modified: qpid/branches/qpid-3603-7/qpid/cpp/src/tests/ha_tests.py URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603-7/qpid/cpp/src/tests/ha_tests.py?rev=1245474&r1=1245473&r2=1245474&view=diff ============================================================================== --- qpid/branches/qpid-3603-7/qpid/cpp/src/tests/ha_tests.py (original) +++ qpid/branches/qpid-3603-7/qpid/cpp/src/tests/ha_tests.py Fri Feb 17 14:03:49 2012 @@ -88,16 +88,16 @@ class ShortTests(BrokerTest): # self.assert_browse(s, "q01", ["01", "04", "e01"]) # self.assert_browse(s, "q02", []) # wiring only # self.assert_missing(s,"q03") -# s.sender("e01").send(Message("e01")) # Verify bind -# self.assert_browse(s, "q02", ["e01"]) + s.sender("e01").send(Message("e01")) # Verify bind + self.assert_browse(s, "q02", ["e01"]) for a in ["q1", "q2", "e1"]: self.wait(s,a) # FIXME aconway 2011-11-18: replicate messages # self.assert_browse(s, "q1", ["1", "4", "e1"]) # self.assert_browse(s, "q2", []) # wiring only # self.assert_missing(s,"q3") -# s.sender("e1").send(Message("e1")) # Verify bind -# self.assert_browse(s, "q2", ["e1"]) + s.sender("e1").send(Message("e1")) # Verify bind + self.assert_browse(s, "q2", ["e1"]) if __name__ == "__main__": --------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:commits-subscr...@qpid.apache.org