Author: aconway
Date: Fri Feb 17 14:03:49 2012
New Revision: 1245474

URL: http://svn.apache.org/viewvc?rev=1245474&view=rev
Log:
QPID-3603: Minor refactor.

Modified:
    qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/WiringReplicator.cpp
    qpid/branches/qpid-3603-7/qpid/cpp/src/tests/ha_tests.py

Modified: qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/WiringReplicator.cpp
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/WiringReplicator.cpp?rev=1245474&r1=1245473&r2=1245474&view=diff
==============================================================================
--- qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/WiringReplicator.cpp 
(original)
+++ qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/WiringReplicator.cpp Fri Feb 
17 14:03:49 2012
@@ -31,6 +31,9 @@
 #include "qmf/org/apache/qpid/broker/EventQueueDelete.h"
 #include "qmf/org/apache/qpid/broker/EventSubscribe.h"
 
+namespace qpid {
+namespace ha {
+
 using qmf::org::apache::qpid::broker::EventBind;
 using qmf::org::apache::qpid::broker::EventExchangeDeclare;
 using qmf::org::apache::qpid::broker::EventExchangeDelete;
@@ -38,50 +41,49 @@ using qmf::org::apache::qpid::broker::Ev
 using qmf::org::apache::qpid::broker::EventQueueDelete;
 using qmf::org::apache::qpid::broker::EventSubscribe;
 
-namespace qpid {
-namespace ha {
-
+using std::string;
 using types::Variant;
 using namespace broker;
 
 namespace{
 
-const std::string QPID_REPLICATE("qpid.replicate");
-const std::string ALL("all");
-const std::string WIRING("wiring");
-
-const std::string CLASS_NAME("_class_name");
-const std::string PACKAGE_NAME("_package_name");
-const std::string VALUES("_values");
-const std::string EVENT("_event");
-const std::string SCHEMA_ID("_schema_id");
-const std::string QUERY_RESPONSE("_query_response");
-
-const std::string ARGUMENTS("arguments");
-const std::string QUEUE("queue");
-const std::string EXCHANGE("exchange");
-const std::string BIND("bind");
-const std::string ARGS("args");
-const std::string DURABLE("durable");
-const std::string QNAME("qName");
-const std::string AUTODEL("autoDel");
-const std::string ALTEX("altEx");
-const std::string USER("user");
-const std::string RHOST("rhost");
-const std::string EXTYPE("exType");
-const std::string EXNAME("exName");
-const std::string AUTODELETE("autoDelete");
-const std::string NAME("name");
-const std::string TYPE("type");
-const std::string DISP("disp");
-const std::string CREATED("created");
-
-
-const std::string QMF_OPCODE("qmf.opcode");
-const std::string QMF_CONTENT("qmf.content");
-const std::string QMF2("qmf2");
+const string QPID_REPLICATE("qpid.replicate");
+const string ALL("all");
+const string WIRING("wiring");
+
+const string CLASS_NAME("_class_name");
+const string PACKAGE_NAME("_package_name");
+const string VALUES("_values");
+const string EVENT("_event");
+const string SCHEMA_ID("_schema_id");
+const string QUERY_RESPONSE("_query_response");
+
+const string ARGUMENTS("arguments");
+const string QUEUE("queue");
+const string EXCHANGE("exchange");
+const string BIND("bind");
+const string ARGS("args");
+const string DURABLE("durable");
+const string QNAME("qName");
+const string AUTODEL("autoDel");
+const string ALTEX("altEx");
+const string USER("user");
+const string RHOST("rhost");
+const string EXTYPE("exType");
+const string EXNAME("exName");
+const string AUTODELETE("autoDelete");
+const string NAME("name");
+const string TYPE("type");
+const string DISP("disp");
+const string CREATED("created");
+const string KEY("key");
+
+
+const string QMF_OPCODE("qmf.opcode");
+const string QMF_CONTENT("qmf.content");
+const string QMF2("qmf2");
 
-const std::string QPID_WIRING_REPLICATOR("qpid.wiring-replicator");
+const string QPID_WIRING_REPLICATOR("qpid.wiring-replicator");
 
 
 bool isQMFv2(const Message& message)
@@ -95,7 +97,7 @@ template <class T> bool match(Variant::M
     return T::match(schema[CLASS_NAME], schema[PACKAGE_NAME]);
 }
 
-bool isReplicated(const std::string& value) {
+bool isReplicated(const string& value) {
     return value == ALL || value == WIRING;
 }
 bool isReplicated(const framing::FieldTable& f) {
@@ -109,20 +111,22 @@ bool isReplicated(const Variant::Map& m)
 } // namespace
 
 
-WiringReplicator::WiringReplicator(const std::string& name, Broker& b) : 
Exchange(name), broker(b) {}
+WiringReplicator::WiringReplicator(const string& name, Broker& b) : 
Exchange(name), broker(b) {}
 
 WiringReplicator::~WiringReplicator() {}
 
-void WiringReplicator::route(Deliverable& msg, const std::string& /*key*/, 
const framing::FieldTable* headers) {
+void WiringReplicator::route(Deliverable& msg, const string& /*key*/, const 
framing::FieldTable* headers) {
     try {
         // FIXME aconway 2011-11-21: outer error handling, e.g. for decoding 
error.
         if (!isQMFv2(msg.getMessage()) || !headers)
             throw Exception("Unexpected message, not QMF2 event or query 
response.");
-        // FIXME aconway 2011-11-21: string constants
-        if (headers->getAsString(QMF_CONTENT) == EVENT) { //decode as list
-            std::string content = msg.getMessage().getFrames().getContent();
-            Variant::List list;
-            amqp_0_10::ListCodec::decode(content, list);
+        // decode as list
+        string content = msg.getMessage().getFrames().getContent();
+        Variant::List list;
+        amqp_0_10::ListCodec::decode(content, list);
+
+        QPID_LOG(critical, "FIXME WiringReplicator message: " << list);
+        if (headers->getAsString(QMF_CONTENT) == EVENT) {
             for (Variant::List::iterator i = list.begin(); i != list.end(); 
++i) {
                 // FIXME aconway 2011-11-18: should be iterating list?
                 Variant::Map& map = list.front().asMap();
@@ -133,16 +137,14 @@ void WiringReplicator::route(Deliverable
                 else if (match<EventExchangeDeclare>(schema)) 
doEventExchangeDeclare(values);
                 else if (match<EventExchangeDelete>(schema)) 
doEventExchangeDelete(values);
                 else if (match<EventBind>(schema)) doEventBind(values);
+                // FIXME aconway 2011-11-21: handle unbind & all other events.
                 else if (match<EventSubscribe>(schema)) {} // Deliberately 
ignored.
                 else throw(Exception(QPID_MSG("WiringReplicator received 
unexpected event, schema=" << schema)));
             }
         } else if (headers->getAsString(QMF_OPCODE) == QUERY_RESPONSE) {
-            //decode as list
-            std::string content = msg.getMessage().getFrames().getContent();
-            Variant::List list;
-            amqp_0_10::ListCodec::decode(content, list);
+            QPID_LOG(critical, "FIXME WiringReplicator response: " << list);
             for (Variant::List::iterator i = list.begin(); i != list.end(); 
++i) {
-                std::string type = i->asMap()[SCHEMA_ID].asMap()[CLASS_NAME];
+                string type = i->asMap()[SCHEMA_ID].asMap()[CLASS_NAME];
                 Variant::Map& values = i->asMap()[VALUES].asMap();
                 if (isReplicated(values[ARGUMENTS].asMap())) {
                     framing::FieldTable args;
@@ -162,7 +164,7 @@ void WiringReplicator::route(Deliverable
 }
 
 void WiringReplicator::doEventQueueDeclare(Variant::Map& values) {
-    std::string name = values[QNAME].asString();
+    string name = values[QNAME].asString();
     if (values[DISP] == CREATED && isReplicated(values[ARGS].asMap())) {
         QPID_LOG(debug, "Creating replicated queue " << name);
         framing::FieldTable args;
@@ -182,7 +184,7 @@ void WiringReplicator::doEventQueueDecla
 }
 
 void WiringReplicator::doEventQueueDelete(Variant::Map& values) {
-    std::string name = values[QNAME].asString();
+    string name = values[QNAME].asString();
     boost::shared_ptr<Queue> queue = broker.getQueues().find(name);
     if (queue && isReplicated(queue->getSettings())) {
         QPID_LOG(debug, "Deleting replicated queue " << name);
@@ -194,10 +196,11 @@ void WiringReplicator::doEventQueueDelet
 }
 
 void WiringReplicator::doEventExchangeDeclare(Variant::Map& values) {
-    if (values[DISP] == CREATED && isReplicated(values[ARGS].asMap())) {
-        std::string name = values[EXNAME].asString();
+    Variant::Map argsMap(values[ARGS].asMap());
+    if (values[DISP] == CREATED && isReplicated(argsMap)) {
+        string name = values[EXNAME].asString();
         framing::FieldTable args;
-        amqp_0_10::translate(values[ARGS].asMap(), args);
+        amqp_0_10::translate(argsMap, args);
         QPID_LOG(debug, "Creating replicated exchange " << name);
         if (!broker.createExchange(
                 name,
@@ -213,11 +216,11 @@ void WiringReplicator::doEventExchangeDe
 }
 
 void WiringReplicator::doEventExchangeDelete(Variant::Map& values) {
-    std::string name = values[EXNAME].asString();
+    string name = values[EXNAME].asString();
     try {
         boost::shared_ptr<Exchange> exchange = broker.getExchanges().get(name);
         if (exchange && isReplicated(exchange->getArgs())) {
-            QPID_LOG(warning, "Deleting replicated exchange " << name);
+            QPID_LOG(debug, "Deleting replicated exchange " << name);
             broker.deleteExchange(
                 name,
                 values[USER].asString(),
@@ -226,9 +229,22 @@ void WiringReplicator::doEventExchangeDe
     } catch (const framing::NotFoundException&) {}
 }
 
-void WiringReplicator::doEventBind(Variant::Map&) {
-    QPID_LOG(error, "FIXME WiringReplicator: Not yet implemented - replicate 
bindings.");
-    // FIXME aconway 2011-11-18: only replicated binds of replicated q to 
replicated ex.
+// Replicate a binding reported by a QMF bind event.
+//
+// The binding is applied locally only when the exchange named by
+// values[EXNAME] and the queue named by values[QNAME] are both present on
+// this broker and both are replicated; otherwise the event is ignored.
+//
+// values: the event's value map; reads EXNAME, QNAME, ARGS and KEY.
+void WiringReplicator::doEventBind(Variant::Map& values) {
+    QPID_LOG(critical, "FIXME doEventBind " << values);
+    try {
+        boost::shared_ptr<Exchange> exchange =
+            broker.getExchanges().get(values[EXNAME].asString());
+        boost::shared_ptr<Queue> queue =
+            broker.getQueues().find(values[QNAME].asString());
+        // doEventQueueDelete null-checks the result of find(), so an unknown
+        // queue must be tolerated here too; never dereference a null pointer.
+        if (!exchange || !queue) return;
+        // We only replicate binds for a replicated queue to a replicated exchange.
+        if (isReplicated(exchange->getArgs()) && isReplicated(queue->getSettings())) {
+            framing::FieldTable args;
+            // Source first, destination second (as in doEventExchangeDeclare):
+            // copy the event's argument map into the field table, not the
+            // empty field table over the event's arguments.
+            amqp_0_10::translate(values[ARGS].asMap(), args);
+            string key = values[KEY].asString();
+            QPID_LOG(debug, "Replicated binding exchange=" << exchange->getName()
+                     << " queue=" << queue->getName()
+                     << " key=" << key);
+            exchange->bind(queue, key, &args);
+        }
+    } catch (const framing::NotFoundException&) {} // Ignore unreplicated queue or exchange.
+}
 
 void WiringReplicator::doResponseQueue(Variant::Map& values) {
@@ -260,11 +276,12 @@ void WiringReplicator::doResponseExchang
     }
 }
 
-void WiringReplicator::doResponseBind(Variant::Map& ) {
-    QPID_LOG(error, "FIXME WiringReplicator: Not yet implemented - catch-up 
replicate bindings.");
+void WiringReplicator::doResponseBind(Variant::Map& values) { // Not implemented yet: logs the values, then always throws.
+    QPID_LOG(critical, "FIXME doResponseBind " << values); // Temporary trace, logged at critical level.
+    throw Exception("FIXME WiringReplicator: Not yet implemented - catch-up 
replicate bindings."); // Unconditional, so every call fails with this Exception.
+}
 
-boost::shared_ptr<Exchange> WiringReplicator::create(const std::string& 
target, Broker& broker)
+boost::shared_ptr<Exchange> WiringReplicator::create(const string& target, 
Broker& broker)
 {
     boost::shared_ptr<Exchange> exchange;
     if (isWiringReplicatorDestination(target)) {
@@ -274,18 +291,18 @@ boost::shared_ptr<Exchange> WiringReplic
     return exchange;
 }
 
-bool WiringReplicator::isWiringReplicatorDestination(const std::string& target)
+bool WiringReplicator::isWiringReplicatorDestination(const string& target) // True only when target names the wiring replicator.
 {
     return target == QPID_WIRING_REPLICATOR; // Exact match against "qpid.wiring-replicator".
 }
 
-bool WiringReplicator::bind(boost::shared_ptr<Queue>, const std::string&, 
const framing::FieldTable*) { return false; }
-bool WiringReplicator::unbind(boost::shared_ptr<Queue>, const std::string&, 
const framing::FieldTable*) { return false; }
-bool WiringReplicator::isBound(boost::shared_ptr<Queue>, const std::string* 
const, const framing::FieldTable* const) { return false; }
+bool WiringReplicator::bind(boost::shared_ptr<Queue>, const string&, const 
framing::FieldTable*) { return false; } // Stub: ignores its arguments and always returns false.
+bool WiringReplicator::unbind(boost::shared_ptr<Queue>, const string&, const 
framing::FieldTable*) { return false; } // Stub: ignores its arguments and always returns false.
+bool WiringReplicator::isBound(boost::shared_ptr<Queue>, const string* const, 
const framing::FieldTable* const) { return false; } // Stub: ignores its arguments and always returns false.
 
-const std::string WiringReplicator::typeName(QPID_WIRING_REPLICATOR);
+const string WiringReplicator::typeName(QPID_WIRING_REPLICATOR);
 
-std::string WiringReplicator::getType() const
+string WiringReplicator::getType() const // Returns a copy of the static typeName.
 {
     return typeName; // typeName is initialized from QPID_WIRING_REPLICATOR.
 }

Modified: qpid/branches/qpid-3603-7/qpid/cpp/src/tests/ha_tests.py
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-7/qpid/cpp/src/tests/ha_tests.py?rev=1245474&r1=1245473&r2=1245474&view=diff
==============================================================================
--- qpid/branches/qpid-3603-7/qpid/cpp/src/tests/ha_tests.py (original)
+++ qpid/branches/qpid-3603-7/qpid/cpp/src/tests/ha_tests.py Fri Feb 17 
14:03:49 2012
@@ -88,16 +88,16 @@ class ShortTests(BrokerTest):
 #         self.assert_browse(s, "q01", ["01", "04", "e01"])
 #         self.assert_browse(s, "q02", []) # wiring only
 #         self.assert_missing(s,"q03")
-#         s.sender("e01").send(Message("e01")) # Verify bind
-#         self.assert_browse(s, "q02", ["e01"])
+        s.sender("e01").send(Message("e01")) # Verify bind
+        self.assert_browse(s, "q02", ["e01"])
 
         for a in ["q1", "q2", "e1"]: self.wait(s,a)
         # FIXME aconway 2011-11-18: replicate messages
 #         self.assert_browse(s, "q1", ["1", "4", "e1"])
 #         self.assert_browse(s, "q2", []) # wiring only
 #         self.assert_missing(s,"q3")
-#         s.sender("e1").send(Message("e1")) # Verify bind
-#         self.assert_browse(s, "q2", ["e1"])
+        s.sender("e1").send(Message("e1")) # Verify bind
+        self.assert_browse(s, "q2", ["e1"])
 
 
 if __name__ == "__main__":



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscr...@qpid.apache.org

Reply via email to