Author: aconway
Date: Fri Feb 17 14:03:59 2012
New Revision: 1245475

URL: http://svn.apache.org/viewvc?rev=1245475&view=rev
Log:
QPID-3603: Replicate bindings to backup brokers.

Modified:
    qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/Backup.cpp
    qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/WiringReplicator.cpp
    qpid/branches/qpid-3603-7/qpid/cpp/src/tests/ha_tests.py

Modified: qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/Backup.cpp
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/Backup.cpp?rev=1245475&r1=1245474&r2=1245475&view=diff
==============================================================================
--- qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/Backup.cpp (original)
+++ qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/Backup.cpp Fri Feb 17 
14:03:59 2012
@@ -37,70 +37,95 @@ namespace ha {
 using namespace framing;
 using namespace broker;
 using types::Variant;
+using std::string;
 
 namespace {
-const std::string QPID_WIRING_REPLICATOR("qpid.wiring-replicator");
+const string QPID_WIRING_REPLICATOR("qpid.wiring-replicator");
+const string _WHAT("_what");
+const string _CLASS_NAME("_class_name");
+const string _PACKAGE_NAME("_package_name");
+const string _SCHEMA_ID("_schema_id");
+const string OBJECT("OBJECT");
+const string ORG_APACHE_QPID_BROKER("org.apache.qpid.broker");
+const string QMF_DEFAULT_DIRECT("qmf.default.direct");
+const string QMF2("qmf2");
+const string QMF_OPCODE("qmf.opcode");
+const string _QUERY_REQUEST("_query_request");
+const string BROKER("broker");
+}
+
+void sendQuery(const string className, const string& queueName, 
SessionHandler& sessionHandler) {
+    framing::AMQP_ServerProxy peer(sessionHandler.out);
+    Variant::Map request;
+    request[_WHAT] = OBJECT;
+    Variant::Map schema;
+    schema[_CLASS_NAME] = className;
+    schema[_PACKAGE_NAME] = ORG_APACHE_QPID_BROKER;
+    request[_SCHEMA_ID] = schema;
+
+    AMQFrame method((MessageTransferBody(ProtocolVersion(), 
QMF_DEFAULT_DIRECT, 0, 0)));
+    method.setBof(true);
+    method.setEof(false);
+    method.setBos(true);
+    method.setEos(true);
+    AMQHeaderBody headerBody;
+    MessageProperties* props = headerBody.get<MessageProperties>(true);
+    props->setReplyTo(qpid::framing::ReplyTo("", queueName));
+    props->setAppId(QMF2);
+    props->getApplicationHeaders().setString(QMF_OPCODE, _QUERY_REQUEST);
+    
headerBody.get<qpid::framing::DeliveryProperties>(true)->setRoutingKey(BROKER);
+    AMQFrame header(headerBody);
+    header.setBof(false);
+    header.setEof(false);
+    header.setBos(true);
+    header.setEos(true);
+    AMQContentBody data;
+    qpid::amqp_0_10::MapCodec::encode(request, data.getData());
+    AMQFrame content(data);
+    content.setBof(false);
+    content.setEof(true);
+    content.setBos(true);
+    content.setEos(true);
+    sessionHandler.out->handle(method);
+    sessionHandler.out->handle(header);
+    sessionHandler.out->handle(content);
+}
+
+namespace {
+const string QMF_DEFAULT_TOPIC("qmf.default.topic");
+const string 
AGENT_IND_EVENT_ORG_APACHE_QPID_BROKER("agent.ind.event.org_apache_qpid_broker.#");
+const string QUEUE("queue");
+const string EXCHANGE("exchange");
+const string BINDING("binding");
 }
 
 // Initialize a bridge as a wiring replicator.
 void bridgeInitWiringReplicator(Bridge& bridge, SessionHandler& 
sessionHandler) {
     framing::AMQP_ServerProxy peer(sessionHandler.out);
-    std::string queueName = bridge.getQueueName();
+    string queueName = bridge.getQueueName();
     const qmf::org::apache::qpid::broker::ArgsLinkBridge& 
args(bridge.getArgs());
 
     //declare and bind an event queue
     peer.getQueue().declare(queueName, "", false, false, true, true, 
FieldTable());
-    peer.getExchange().bind(queueName, "qmf.default.topic", 
"agent.ind.event.org_apache_qpid_broker.#", FieldTable());
+    peer.getExchange().bind(queueName, QMF_DEFAULT_TOPIC, 
AGENT_IND_EVENT_ORG_APACHE_QPID_BROKER, FieldTable());
     //subscribe to the queue
     peer.getMessage().subscribe(queueName, args.i_dest, 1, 0, false, "", 0, 
FieldTable());
     peer.getMessage().flow(args.i_dest, 0, 0xFFFFFFFF);
     peer.getMessage().flow(args.i_dest, 1, 0xFFFFFFFF);
 
     //issue a query request for queues and another for exchanges using event 
queue as the reply-to address
-    for (int i = 0; i < 2; ++i) {//TODO: cleanup this code into reusable 
utility functions
-        Variant::Map request;
-        request["_what"] = "OBJECT";
-        Variant::Map schema;
-        schema["_class_name"] = (i == 0 ? "queue" : "exchange");
-        schema["_package_name"] = "org.apache.qpid.broker";
-        request["_schema_id"] = schema;
-
-        AMQFrame method((MessageTransferBody(ProtocolVersion(), 
"qmf.default.direct", 0, 0)));
-        method.setBof(true);
-        method.setEof(false);
-        method.setBos(true);
-        method.setEos(true);
-        AMQHeaderBody headerBody;
-        MessageProperties* props = headerBody.get<MessageProperties>(true);
-        props->setReplyTo(qpid::framing::ReplyTo("", queueName));
-        props->setAppId("qmf2");
-        props->getApplicationHeaders().setString("qmf.opcode", 
"_query_request");
-        
headerBody.get<qpid::framing::DeliveryProperties>(true)->setRoutingKey("broker");
-        AMQFrame header(headerBody);
-        header.setBof(false);
-        header.setEof(false);
-        header.setBos(true);
-        header.setEos(true);
-        AMQContentBody data;
-        qpid::amqp_0_10::MapCodec::encode(request, data.getData());
-        AMQFrame content(data);
-        content.setBof(false);
-        content.setEof(true);
-        content.setBos(true);
-        content.setEos(true);
-        sessionHandler.out->handle(method);
-        sessionHandler.out->handle(header);
-        sessionHandler.out->handle(content);
-    }
+    sendQuery(QUEUE, queueName, sessionHandler);
+    sendQuery(EXCHANGE, queueName, sessionHandler);
+    sendQuery(BINDING, queueName, sessionHandler);
 }
 
 Backup::Backup(broker::Broker& b, const Settings& s) : broker(b), settings(s) {
     // Create a link to replicate wiring
-    if (s.brokerUrl != "dummy") {
+    if (s.brokerUrl != "dummy") { // FIXME aconway 2011-11-22: temporary hack 
to identify primary.
         Url url(s.brokerUrl);
         QPID_LOG(info, "HA backup broker connecting to: " << url);
 
-        std::string protocol = url[0].protocol.empty() ? "tcp" : 
url[0].protocol;
+        string protocol = url[0].protocol.empty() ? "tcp" : url[0].protocol;
         broker.getLinks().declare( // Declare the link
             url[0].host, url[0].port, protocol,
             false,              // durable
@@ -111,7 +136,7 @@ Backup::Backup(broker::Broker& b, const 
             false,              // durable
             QPID_WIRING_REPLICATOR, // src
             QPID_WIRING_REPLICATOR, // dest
-            "x",                // key
+            "",                 // key
             false,              // isQueue
             false,              // isLocal
             "",                 // id/tag
@@ -121,8 +146,7 @@ Backup::Backup(broker::Broker& b, const 
             bridgeInitWiringReplicator
         );
     }
-    // FIXME aconway 2011-11-17: need to enhance the link code to
-    // handle discovery of the primary broker and fail-over correctly.
+    // FIXME aconway 2011-11-17: handle discovery of the primary broker and 
fail-over correctly.
 }
 
 }} // namespace qpid::ha

Modified: qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/WiringReplicator.cpp
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/WiringReplicator.cpp?rev=1245475&r1=1245474&r2=1245475&view=diff
==============================================================================
--- qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/WiringReplicator.cpp 
(original)
+++ qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/WiringReplicator.cpp Fri Feb 
17 14:03:59 2012
@@ -52,6 +52,7 @@ const string ALL("all");
 const string WIRING("wiring");
 
 const string CLASS_NAME("_class_name");
+const string OBJECT_NAME("_object_name");
 const string PACKAGE_NAME("_package_name");
 const string VALUES("_values");
 const string EVENT("_event");
@@ -59,10 +60,11 @@ const string SCHEMA_ID("_schema_id");
 const string QUERY_RESPONSE("_query_response");
 
 const string ARGUMENTS("arguments");
+const string ARGS("args");
 const string QUEUE("queue");
 const string EXCHANGE("exchange");
 const string BIND("bind");
-const string ARGS("args");
+const string BINDING("binding");
 const string DURABLE("durable");
 const string QNAME("qName");
 const string AUTODEL("autoDel");
@@ -116,19 +118,16 @@ WiringReplicator::WiringReplicator(const
 WiringReplicator::~WiringReplicator() {}
 
 void WiringReplicator::route(Deliverable& msg, const string& /*key*/, const 
framing::FieldTable* headers) {
+    Variant::List list;
     try {
-        // FIXME aconway 2011-11-21: outer error handling, e.g. for decoding 
error.
         if (!isQMFv2(msg.getMessage()) || !headers)
             throw Exception("Unexpected message, not QMF2 event or query 
response.");
         // decode as list
         string content = msg.getMessage().getFrames().getContent();
-        Variant::List list;
         amqp_0_10::ListCodec::decode(content, list);
 
-        QPID_LOG(critical, "FIXME WiringReplicator message: " << list);
         if (headers->getAsString(QMF_CONTENT) == EVENT) {
             for (Variant::List::iterator i = list.begin(); i != list.end(); 
++i) {
-                // FIXME aconway 2011-11-18: should be iterating list?
                 Variant::Map& map = list.front().asMap();
                 Variant::Map& schema = map[SCHEMA_ID].asMap();
                 Variant::Map& values = map[VALUES].asMap();
@@ -142,33 +141,32 @@ void WiringReplicator::route(Deliverable
                 else throw(Exception(QPID_MSG("WiringReplicator received 
unexpected event, schema=" << schema)));
             }
         } else if (headers->getAsString(QMF_OPCODE) == QUERY_RESPONSE) {
-            QPID_LOG(critical, "FIXME WiringReplicator response: " << list);
             for (Variant::List::iterator i = list.begin(); i != list.end(); 
++i) {
                 string type = i->asMap()[SCHEMA_ID].asMap()[CLASS_NAME];
                 Variant::Map& values = i->asMap()[VALUES].asMap();
-                if (isReplicated(values[ARGUMENTS].asMap())) {
-                    framing::FieldTable args;
-                    amqp_0_10::translate(values[ARGUMENTS].asMap(), args);
-                    if      (type == QUEUE) doResponseQueue(values);
-                    else if (type == EXCHANGE) doResponseExchange(values);
-                    else if (type == BIND) doResponseBind(values);
-                    else throw Exception(QPID_MSG("Ignoring unexpected class: 
" << type));
-                }
+                framing::FieldTable args;
+                amqp_0_10::translate(values[ARGUMENTS].asMap(), args);
+                if      (type == QUEUE) doResponseQueue(values);
+                else if (type == EXCHANGE) doResponseExchange(values);
+                else if (type == BINDING) doResponseBind(values);
+                else throw Exception(QPID_MSG("Ignoring unexpected class: " << 
type));
             }
         } else {
-            QPID_LOG(warning, QPID_MSG("Ignoring QMFv2 message with headers: " 
<< *headers));
+            QPID_LOG(warning, QPID_MSG("Replicator: Ignoring QMFv2 message 
with headers: " << *headers));
         }
     } catch (const std::exception& e) {
-        QPID_LOG(warning, "Error replicating configuration: " << e.what());
+        QPID_LOG(warning, "Replicator: Error replicating configuration: " << 
e.what());
+        QPID_LOG(debug, "Replicator: Error processing: " << list);
     }
 }
 
 void WiringReplicator::doEventQueueDeclare(Variant::Map& values) {
     string name = values[QNAME].asString();
-    if (values[DISP] == CREATED && isReplicated(values[ARGS].asMap())) {
+    Variant::Map argsMap = values[ARGS].asMap();
+    if (values[DISP] == CREATED && isReplicated(argsMap)) {
         QPID_LOG(debug, "Creating replicated queue " << name);
         framing::FieldTable args;
-        amqp_0_10::translate(values[ARGS].asMap(), args);
+        amqp_0_10::translate(argsMap, args);
         if (!broker.createQueue(
                 name,
                 values[DURABLE].asBool(),
@@ -178,6 +176,7 @@ void WiringReplicator::doEventQueueDecla
                 args,
                 values[USER].asString(),
                 values[RHOST].asString()).second) {
+            // FIXME aconway 2011-11-22: should delete old queue and re-create 
from exchanges.
             QPID_LOG(warning, "Replicated queue " << name << " already 
exists");
         }
     }
@@ -201,7 +200,6 @@ void WiringReplicator::doEventExchangeDe
         string name = values[EXNAME].asString();
         framing::FieldTable args;
         amqp_0_10::translate(argsMap, args);
-        QPID_LOG(debug, "Creating replicated exchange " << name);
         if (!broker.createExchange(
                 name,
                 values[EXTYPE].asString(),
@@ -210,6 +208,8 @@ void WiringReplicator::doEventExchangeDe
                 args,
                 values[USER].asString(),
                 values[RHOST].asString()).second) {
+            // FIXME aconway 2011-11-22: should delete pre-existing exchange
+            // and re-create from event. Likewise for queues.
             QPID_LOG(warning, "Replicated exchange " << name << " already 
exists");
         }
     }
@@ -230,14 +230,13 @@ void WiringReplicator::doEventExchangeDe
 }
 
 void WiringReplicator::doEventBind(Variant::Map& values) {
-    QPID_LOG(critical, "FIXME doEventBind " << values);
     try {
         boost::shared_ptr<Exchange> exchange = 
broker.getExchanges().get(values[EXNAME].asString());
         boost::shared_ptr<Queue> queue = 
broker.getQueues().find(values[QNAME].asString());
         // We only replicate binds for a replicated queue to a replicated 
exchange.
         if (isReplicated(exchange->getArgs()) && 
isReplicated(queue->getSettings())) {
             framing::FieldTable args;
-            amqp_0_10::translate(args, values[ARGS].asMap());
+            amqp_0_10::translate(values[ARGS].asMap(), args);
             string key = values[KEY].asString();
             QPID_LOG(debug, "Replicated binding exchange=" << 
exchange->getName()
                      << " queue=" << queue->getName()
@@ -248,6 +247,11 @@ void WiringReplicator::doEventBind(Varia
 }
 
 void WiringReplicator::doResponseQueue(Variant::Map& values) {
+    // FIXME aconway 2011-11-22: more flexible ways & defaults to indicate 
replication
+    Variant::Map argsMap(values[ARGUMENTS].asMap());
+    if (!isReplicated(argsMap)) return;
+    framing::FieldTable args;
+    amqp_0_10::translate(argsMap, args);
     QPID_LOG(debug, "Creating replicated queue " << values[NAME].asString() << 
" (in catch-up)");
     if (!broker.createQueue(
             values[NAME].asString(),
@@ -258,11 +262,17 @@ void WiringReplicator::doResponseQueue(V
             args,
             ""/*TODO: who is the user?*/,
             ""/*TODO: what should we use as connection id?*/).second) {
+        // FIXME aconway 2011-11-22: Normal to find queue already
+        // exists if we're failing over.
         QPID_LOG(warning, "Replicated queue " << values[NAME] << " already 
exists (in catch-up)");
     }
 }
 
 void WiringReplicator::doResponseExchange(Variant::Map& values) {
+    Variant::Map argsMap(values[ARGUMENTS].asMap());
+    if (!isReplicated(argsMap)) return;
+    framing::FieldTable args;
+    amqp_0_10::translate(argsMap, args);
     QPID_LOG(debug, "Creating replicated exchange " << values[NAME].asString() 
<< " (in catch-up)");
     if (!broker.createExchange(
             values[NAME].asString(),
@@ -276,9 +286,54 @@ void WiringReplicator::doResponseExchang
     }
 }
 
+// FIXME aconway 2011-11-21: refactor to remove redundancy between do* 
functions.
+
+namespace {
+const std::string QUEUE_REF_PREFIX("org.apache.qpid.broker:queue:");
+const std::string EXCHANGE_REF_PREFIX("org.apache.qpid.broker:exchange:");
+
+std::string getRefName(const std::string& prefix, const Variant& ref) {
+    Variant::Map map(ref.asMap());
+    Variant::Map::const_iterator i = map.find(OBJECT_NAME);
+    if (i == map.end())
+        throw Exception(QPID_MSG("Replicator: invalid object reference: " << 
ref));
+    const std::string name = i->second.asString();
+    if (name.compare(0, prefix.size(), prefix) != 0)
+        throw Exception(QPID_MSG("Replicator unexpected reference prefix: " << 
name));
+    std::string ret = name.substr(prefix.size());
+    return ret;
+}
+
+const std::string EXCHANGE_REF("exchangeRef");
+const std::string QUEUE_REF("queueRef");
+
+} // namespace
+
 void WiringReplicator::doResponseBind(Variant::Map& values) {
-    QPID_LOG(critical, "FIXME doResponseBind " << values);
-    throw Exception("FIXME WiringReplicator: Not yet implemented - catch-up 
replicate bindings.");
+    try {
+        std::string exName = getRefName(EXCHANGE_REF_PREFIX, 
values[EXCHANGE_REF]);
+        boost::shared_ptr<Exchange> exchange = 
broker.getExchanges().get(exName);
+        if (!exchange) return;
+
+        std::string qName = getRefName(QUEUE_REF_PREFIX, values[QUEUE_REF]);
+        boost::shared_ptr<Queue> queue = broker.getQueues().find(qName);
+        if (!queue) return;
+
+        // We only replicate a bind for a replicated queue to a replicated 
exchange.
+        // FIXME aconway 2011-11-22: do we always log binds between replicated 
ex/q
+        // or do we consider the bind arguments as well?
+        if (exchange && queue &&
+            isReplicated(exchange->getArgs()) && 
isReplicated(queue->getSettings()))
+        {
+            framing::FieldTable args;
+            amqp_0_10::translate(values[ARGUMENTS].asMap(), args);
+            string key = values[KEY].asString();
+            QPID_LOG(debug, "Replicated binding exchange=" << 
exchange->getName()
+                     << " queue=" << queue->getName()
+                     << " key=" << key);
+            exchange->bind(queue, key, &args);
+        }
+    } catch (const framing::NotFoundException& e) {} // Ignore unreplicated 
queue or exchange.
 }
 
 boost::shared_ptr<Exchange> WiringReplicator::create(const string& target, 
Broker& broker)

Modified: qpid/branches/qpid-3603-7/qpid/cpp/src/tests/ha_tests.py
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-7/qpid/cpp/src/tests/ha_tests.py?rev=1245475&r1=1245474&r2=1245475&view=diff
==============================================================================
--- qpid/branches/qpid-3603-7/qpid/cpp/src/tests/ha_tests.py (original)
+++ qpid/branches/qpid-3603-7/qpid/cpp/src/tests/ha_tests.py Fri Feb 17 
14:03:59 2012
@@ -50,7 +50,7 @@ class ShortTests(BrokerTest):
     def wait(self, session, address):
         def check():
             try:
-                session.receiver(address)
+                session.sender(address)
                 return True
             except NotFound: return False
         assert retry(check), "Timed out waiting for %s"%(address)
@@ -67,37 +67,39 @@ class ShortTests(BrokerTest):
 
         # Create some wiring before starting the backup, to test catch-up
         primary = self.ha_broker(name="primary")
-        s = primary.connect().session()
-        s.sender(queue%("q1", "all")).send(Message("1"))
-        s.sender(queue%("q2", "wiring")).send(Message("2"))
-        s.sender(queue%("q3", "none")).send(Message("3"))
-        s.sender(exchange%("e1", "all", "e1", "q2")).send(Message("4"))
+        p = primary.connect().session()
+        p.sender(queue%("q1", "all")).send(Message("1"))
+        p.sender(queue%("q2", "wiring")).send(Message("2"))
+        p.sender(queue%("q3", "none")).send(Message("3"))
+        p.sender(exchange%("e1", "all", "e1", "q2")).send(Message("4"))
 
         # Create some after starting backup, test steady-state replication
         backup  = self.ha_broker(name="backup", broker_url=primary.host_port())
-        s.sender(queue%("q01", "all")).send(Message("01"))
-        s.sender(queue%("q02", "wiring")).send(Message("02"))
-        s.sender(queue%("q03", "none")).send(Message("03"))
-        s.sender(exchange%("e01", "all", "e01", "q02")).send(Message("04"))
+        b = backup.connect().session()
+        # FIXME aconway 2011-11-21: need to wait for backup to be ready to 
test event replication
+        for a in ["q1", "q2", "e1"]: self.wait(b,a)
+        p.sender(queue%("q11", "all")).send(Message("11"))
+        p.sender(queue%("q12", "wiring")).send(Message("12"))
+        p.sender(queue%("q13", "none")).send(Message("13"))
+        p.sender(exchange%("e11", "all", "e11", "q12")).send(Message("14"))
 
         # Verify replication
         # FIXME aconway 2011-11-18: We should kill primary here and fail over.
-        s = backup.connect().session()
-        for a in ["q01", "q02", "e01"]: self.wait(s,a)
+        for a in ["q11", "q12", "e11"]: self.wait(b,a)
         # FIXME aconway 2011-11-18: replicate messages
-#         self.assert_browse(s, "q01", ["01", "04", "e01"])
-#         self.assert_browse(s, "q02", []) # wiring only
-#         self.assert_missing(s,"q03")
-        s.sender("e01").send(Message("e01")) # Verify bind
-        self.assert_browse(s, "q02", ["e01"])
+#         self.assert_browse(b, "q11", ["11", "14", "e11"])
+#         self.assert_browse(b, "q12", []) # wiring only
+#         self.assert_missing(b,"q13")
+        b.sender("e11").send(Message("e11")) # Verify bind
+        self.assert_browse(b, "q12", ["e11"])
 
-        for a in ["q1", "q2", "e1"]: self.wait(s,a)
+        for a in ["q1", "q2", "e1"]: self.wait(b,a)
         # FIXME aconway 2011-11-18: replicate messages
-#         self.assert_browse(s, "q1", ["1", "4", "e1"])
-#         self.assert_browse(s, "q2", []) # wiring only
-#         self.assert_missing(s,"q3")
-        s.sender("e1").send(Message("e1")) # Verify bind
-        self.assert_browse(s, "q2", ["e1"])
+#         self.assert_browse(b, "q1", ["1", "4", "e1"])
+#         self.assert_browse(b, "q2", []) # wiring only
+#         self.assert_missing(b,"q3")
+        b.sender("e1").send(Message("e1")) # Verify bind
+        self.assert_browse(b, "q2", ["e1"])
 
 
 if __name__ == "__main__":



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscr...@qpid.apache.org

Reply via email to