Author: aconway
Date: Fri Feb 17 14:04:08 2012
New Revision: 1245476

URL: http://svn.apache.org/viewvc?rev=1245476&view=rev
Log:
QPID-3603: Move wiring-replicator creation out of SemanticState::route.

Modified:
    qpid/branches/qpid-3603-7/qpid/cpp/src/Makefile.am
    qpid/branches/qpid-3603-7/qpid/cpp/src/ha.mk
    qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/SemanticState.cpp
    qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/Backup.cpp
    qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/Backup.h
    qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/WiringReplicator.cpp
    qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/WiringReplicator.h

Modified: qpid/branches/qpid-3603-7/qpid/cpp/src/Makefile.am
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-7/qpid/cpp/src/Makefile.am?rev=1245476&r1=1245475&r2=1245476&view=diff
==============================================================================
--- qpid/branches/qpid-3603-7/qpid/cpp/src/Makefile.am (original)
+++ qpid/branches/qpid-3603-7/qpid/cpp/src/Makefile.am Fri Feb 17 14:04:08 2012
@@ -602,8 +602,6 @@ libqpidbroker_la_SOURCES = \
   qpid/broker/PriorityQueue.cpp \
   qpid/broker/NameGenerator.cpp \
   qpid/broker/NameGenerator.h \
-  qpid/ha/WiringReplicator.h \
-  qpid/ha/WiringReplicator.cpp \
   qpid/broker/NullMessageStore.cpp \
   qpid/broker/NullMessageStore.h \
   qpid/broker/OwnershipToken.h \

Modified: qpid/branches/qpid-3603-7/qpid/cpp/src/ha.mk
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-7/qpid/cpp/src/ha.mk?rev=1245476&r1=1245475&r2=1245476&view=diff
==============================================================================
--- qpid/branches/qpid-3603-7/qpid/cpp/src/ha.mk (original)
+++ qpid/branches/qpid-3603-7/qpid/cpp/src/ha.mk Fri Feb 17 14:04:08 2012
@@ -23,12 +23,14 @@
 dmoduleexec_LTLIBRARIES += ha.la
 
 ha_la_SOURCES =                                        \
-  qpid/ha/HaPlugin.cpp                         \
-  qpid/ha/HaBroker.cpp                         \
-  qpid/ha/HaBroker.h                           \
   qpid/ha/Backup.cpp                           \
   qpid/ha/Backup.h                             \
-  qpid/ha/Settings.h
+  qpid/ha/HaBroker.cpp                         \
+  qpid/ha/HaBroker.h                           \
+  qpid/ha/HaPlugin.cpp                         \
+  qpid/ha/Settings.h                           \
+  qpid/ha/WiringReplicator.cpp                 \
+  qpid/ha/WiringReplicator.h
 
 ha_la_LIBADD = libqpidbroker.la
 ha_la_LDFLAGS = $(PLUGINLDFLAGS)

Modified: qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=1245476&r1=1245475&r2=1245476&view=diff
==============================================================================
--- qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/SemanticState.cpp 
(original)
+++ qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/SemanticState.cpp Fri 
Feb 17 14:04:08 2012
@@ -480,7 +480,6 @@ void SemanticState::route(intrusive_ptr<
     std::string exchangeName = msg->getExchangeName();
     if (!cacheExchange || cacheExchange->getName() != exchangeName || 
cacheExchange->isDestroyed()) {
         cacheExchange = QueueReplicator::create(exchangeName, 
getSession().getBroker().getQueues());
-        if (!cacheExchange) cacheExchange = 
ha::WiringReplicator::create(exchangeName, getSession().getBroker());
         if (!cacheExchange) cacheExchange = 
session.getBroker().getExchanges().get(exchangeName);
     }
     cacheExchange->setProperties(msg);

Modified: qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/Backup.cpp
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/Backup.cpp?rev=1245476&r1=1245475&r2=1245476&view=diff
==============================================================================
--- qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/Backup.cpp (original)
+++ qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/Backup.cpp Fri Feb 17 
14:04:08 2012
@@ -20,11 +20,13 @@
  */
 #include "Backup.h"
 #include "Settings.h"
+#include "WiringReplicator.h"
 #include "qpid/Url.h"
 #include "qpid/amqp_0_10/Codecs.h"
 #include "qpid/broker/Bridge.h"
 #include "qpid/broker/Broker.h"
 #include "qpid/broker/SessionHandler.h"
+#include "qpid/broker/Link.h"
 #include "qpid/framing/AMQP_ServerProxy.h"
 #include "qpid/framing/AMQFrame.h"
 #include "qpid/framing/FieldTable.h"
@@ -39,114 +41,24 @@ using namespace broker;
 using types::Variant;
 using std::string;
 
-namespace {
-const string QPID_WIRING_REPLICATOR("qpid.wiring-replicator");
-const string _WHAT("_what");
-const string _CLASS_NAME("_class_name");
-const string _PACKAGE_NAME("_package_name");
-const string _SCHEMA_ID("_schema_id");
-const string OBJECT("OBJECT");
-const string ORG_APACHE_QPID_BROKER("org.apache.qpid.broker");
-const string QMF_DEFAULT_DIRECT("qmf.default.direct");
-const string QMF2("qmf2");
-const string QMF_OPCODE("qmf.opcode");
-const string _QUERY_REQUEST("_query_request");
-const string BROKER("broker");
-}
-
-void sendQuery(const string className, const string& queueName, 
SessionHandler& sessionHandler) {
-    framing::AMQP_ServerProxy peer(sessionHandler.out);
-    Variant::Map request;
-    request[_WHAT] = OBJECT;
-    Variant::Map schema;
-    schema[_CLASS_NAME] = className;
-    schema[_PACKAGE_NAME] = ORG_APACHE_QPID_BROKER;
-    request[_SCHEMA_ID] = schema;
-
-    AMQFrame method((MessageTransferBody(ProtocolVersion(), 
QMF_DEFAULT_DIRECT, 0, 0)));
-    method.setBof(true);
-    method.setEof(false);
-    method.setBos(true);
-    method.setEos(true);
-    AMQHeaderBody headerBody;
-    MessageProperties* props = headerBody.get<MessageProperties>(true);
-    props->setReplyTo(qpid::framing::ReplyTo("", queueName));
-    props->setAppId(QMF2);
-    props->getApplicationHeaders().setString(QMF_OPCODE, _QUERY_REQUEST);
-    
headerBody.get<qpid::framing::DeliveryProperties>(true)->setRoutingKey(BROKER);
-    AMQFrame header(headerBody);
-    header.setBof(false);
-    header.setEof(false);
-    header.setBos(true);
-    header.setEos(true);
-    AMQContentBody data;
-    qpid::amqp_0_10::MapCodec::encode(request, data.getData());
-    AMQFrame content(data);
-    content.setBof(false);
-    content.setEof(true);
-    content.setBos(true);
-    content.setEos(true);
-    sessionHandler.out->handle(method);
-    sessionHandler.out->handle(header);
-    sessionHandler.out->handle(content);
-}
-
-namespace {
-const string QMF_DEFAULT_TOPIC("qmf.default.topic");
-const string 
AGENT_IND_EVENT_ORG_APACHE_QPID_BROKER("agent.ind.event.org_apache_qpid_broker.#");
-const string QUEUE("queue");
-const string EXCHANGE("exchange");
-const string BINDING("binding");
-}
-
-// Initialize a bridge as a wiring replicator.
-void bridgeInitWiringReplicator(Bridge& bridge, SessionHandler& 
sessionHandler) {
-    framing::AMQP_ServerProxy peer(sessionHandler.out);
-    string queueName = bridge.getQueueName();
-    const qmf::org::apache::qpid::broker::ArgsLinkBridge& 
args(bridge.getArgs());
-
-    //declare and bind an event queue
-    peer.getQueue().declare(queueName, "", false, false, true, true, 
FieldTable());
-    peer.getExchange().bind(queueName, QMF_DEFAULT_TOPIC, 
AGENT_IND_EVENT_ORG_APACHE_QPID_BROKER, FieldTable());
-    //subscribe to the queue
-    peer.getMessage().subscribe(queueName, args.i_dest, 1, 0, false, "", 0, 
FieldTable());
-    peer.getMessage().flow(args.i_dest, 0, 0xFFFFFFFF);
-    peer.getMessage().flow(args.i_dest, 1, 0xFFFFFFFF);
-
-    //issue a query request for queues and another for exchanges using event 
queue as the reply-to address
-    sendQuery(QUEUE, queueName, sessionHandler);
-    sendQuery(EXCHANGE, queueName, sessionHandler);
-    sendQuery(BINDING, queueName, sessionHandler);
-}
-
 Backup::Backup(broker::Broker& b, const Settings& s) : broker(b), settings(s) {
-    // Create a link to replicate wiring
     if (s.brokerUrl != "dummy") { // FIXME aconway 2011-11-22: temporary hack 
to identify primary.
         Url url(s.brokerUrl);
         QPID_LOG(info, "HA backup broker connecting to: " << url);
-
         string protocol = url[0].protocol.empty() ? "tcp" : url[0].protocol;
-        broker.getLinks().declare( // Declare the link
+
+        // FIXME aconway 2011-11-17: TBD: link management, discovery, 
fail-over.
+        // Declare the link
+        std::pair<Link::shared_ptr, bool> result = broker.getLinks().declare(
             url[0].host, url[0].port, protocol,
             false,              // durable
             s.mechanism, s.username, s.password);
-
-        broker.getLinks().declare( // Declare the bridge
-            url[0].host, url[0].port,
-            false,              // durable
-            QPID_WIRING_REPLICATOR, // src
-            QPID_WIRING_REPLICATOR, // dest
-            "",                 // key
-            false,              // isQueue
-            false,              // isLocal
-            "",                 // id/tag
-            "",                 // excludes
-            false,              // dynamic
-            0,                  // sync?
-            bridgeInitWiringReplicator
-        );
+        assert(result.second);  // FIXME aconway 2011-11-23: error handling
+        link = result.first;
+        boost::shared_ptr<WiringReplicator> wr(new WiringReplicator(link));
+        broker.getExchanges().registerExchange(wr);
+        wr->initialize();       // Must be called after registering exchange.
     }
-    // FIXME aconway 2011-11-17: handle discovery of the primary broker and 
fail-over correctly.
 }
 
 }} // namespace qpid::ha

Modified: qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/Backup.h
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/Backup.h?rev=1245476&r1=1245475&r2=1245476&view=diff
==============================================================================
--- qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/Backup.h (original)
+++ qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/Backup.h Fri Feb 17 14:04:08 
2012
@@ -24,10 +24,12 @@
 
 #include "Settings.h"
 #include "qpid/Url.h"
+#include <boost/shared_ptr.hpp>
 
 namespace qpid {
 namespace broker {
 class Broker;
+class Link;
 }
 
 namespace ha {
@@ -44,6 +46,7 @@ class Backup
   private:
     broker::Broker& broker;
     Settings settings;
+    boost::shared_ptr<broker::Link> link;
 };
 }} // namespace qpid::ha
 

Modified: qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/WiringReplicator.cpp
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/WiringReplicator.cpp?rev=1245476&r1=1245475&r2=1245476&view=diff
==============================================================================
--- qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/WiringReplicator.cpp 
(original)
+++ qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/WiringReplicator.cpp Fri Feb 
17 14:04:08 2012
@@ -21,9 +21,13 @@
 #include "WiringReplicator.h"
 #include "qpid/broker/Broker.h"
 #include "qpid/broker/Queue.h"
+#include "qpid/broker/Link.h"
+#include "qpid/framing/FieldTable.h"
 #include "qpid/log/Statement.h"
 #include "qpid/amqp_0_10/Codecs.h"
+#include "qpid/broker/SessionHandler.h"
 #include "qpid/framing/reply_exceptions.h"
+#include "qpid/framing/MessageTransferBody.h"
 #include "qmf/org/apache/qpid/broker/EventBind.h"
 #include "qmf/org/apache/qpid/broker/EventExchangeDeclare.h"
 #include "qmf/org/apache/qpid/broker/EventExchangeDelete.h"
@@ -40,62 +44,69 @@ using qmf::org::apache::qpid::broker::Ev
 using qmf::org::apache::qpid::broker::EventQueueDeclare;
 using qmf::org::apache::qpid::broker::EventQueueDelete;
 using qmf::org::apache::qpid::broker::EventSubscribe;
-
+using namespace framing;
 using std::string;
 using types::Variant;
 using namespace broker;
 
-namespace{
+namespace {
 
+const string QPID_WIRING_REPLICATOR("qpid.wiring-replicator");
 const string QPID_REPLICATE("qpid.replicate");
-const string ALL("all");
-const string WIRING("wiring");
 
 const string CLASS_NAME("_class_name");
+const string EVENT("_event");
 const string OBJECT_NAME("_object_name");
 const string PACKAGE_NAME("_package_name");
-const string VALUES("_values");
-const string EVENT("_event");
-const string SCHEMA_ID("_schema_id");
 const string QUERY_RESPONSE("_query_response");
+const string SCHEMA_ID("_schema_id");
+const string VALUES("_values");
 
-const string ARGUMENTS("arguments");
+const string ALL("all");
+const string ALTEX("altEx");
 const string ARGS("args");
-const string QUEUE("queue");
-const string EXCHANGE("exchange");
+const string ARGUMENTS("arguments");
+const string AUTODEL("autoDel");
+const string AUTODELETE("autoDelete");
 const string BIND("bind");
 const string BINDING("binding");
+const string CREATED("created");
+const string DISP("disp");
 const string DURABLE("durable");
-const string QNAME("qName");
-const string AUTODEL("autoDel");
-const string ALTEX("altEx");
-const string USER("user");
-const string RHOST("rhost");
-const string EXTYPE("exType");
+const string EXCHANGE("exchange");
 const string EXNAME("exName");
-const string AUTODELETE("autoDelete");
+const string EXTYPE("exType");
+const string KEY("key");
 const string NAME("name");
+const string QNAME("qName");
+const string QUEUE("queue");
+const string RHOST("rhost");
 const string TYPE("type");
-const string DISP("disp");
-const string CREATED("created");
-const string KEY("key");
-
+const string USER("user");
+const string WIRING("wiring");
 
-const string QMF_OPCODE("qmf.opcode");
-const string QMF_CONTENT("qmf.content");
+const string 
AGENT_IND_EVENT_ORG_APACHE_QPID_BROKER("agent.ind.event.org_apache_qpid_broker.#");
 const string QMF2("qmf2");
+const string QMF_CONTENT("qmf.content");
+const string QMF_DEFAULT_TOPIC("qmf.default.topic");
+const string QMF_OPCODE("qmf.opcode");
 
-const string QPID_WIRING_REPLICATOR("qpid.wiring-replicator");
-
+const string _WHAT("_what");
+const string _CLASS_NAME("_class_name");
+const string _PACKAGE_NAME("_package_name");
+const string _SCHEMA_ID("_schema_id");
+const string OBJECT("OBJECT");
+const string ORG_APACHE_QPID_BROKER("org.apache.qpid.broker");
+const string QMF_DEFAULT_DIRECT("qmf.default.direct");
+const string _QUERY_REQUEST("_query_request");
+const string BROKER("broker");
 
-bool isQMFv2(const Message& message)
-{
+bool isQMFv2(const Message& message) {
     const framing::MessageProperties* props = 
message.getProperties<framing::MessageProperties>();
     return props && props->getAppId() == QMF2;
 }
 
-template <class T> bool match(Variant::Map& schema)
-{
+template <class T> bool match(Variant::Map& schema) {
     return T::match(schema[CLASS_NAME], schema[PACKAGE_NAME]);
 }
 
@@ -110,13 +121,90 @@ bool isReplicated(const Variant::Map& m)
     return i != m.end() && isReplicated(i->second.asString());
 }
 
+void sendQuery(const string className, const string& queueName, 
SessionHandler& sessionHandler) {
+    framing::AMQP_ServerProxy peer(sessionHandler.out);
+    Variant::Map request;
+    request[_WHAT] = OBJECT;
+    Variant::Map schema;
+    schema[_CLASS_NAME] = className;
+    schema[_PACKAGE_NAME] = ORG_APACHE_QPID_BROKER;
+    request[_SCHEMA_ID] = schema;
+
+    AMQFrame method((MessageTransferBody(ProtocolVersion(), 
QMF_DEFAULT_DIRECT, 0, 0)));
+    method.setBof(true);
+    method.setEof(false);
+    method.setBos(true);
+    method.setEos(true);
+    AMQHeaderBody headerBody;
+    MessageProperties* props = headerBody.get<MessageProperties>(true);
+    props->setReplyTo(qpid::framing::ReplyTo("", queueName));
+    props->setAppId(QMF2);
+    props->getApplicationHeaders().setString(QMF_OPCODE, _QUERY_REQUEST);
+    
headerBody.get<qpid::framing::DeliveryProperties>(true)->setRoutingKey(BROKER);
+    AMQFrame header(headerBody);
+    header.setBof(false);
+    header.setEof(false);
+    header.setBos(true);
+    header.setEos(true);
+    AMQContentBody data;
+    qpid::amqp_0_10::MapCodec::encode(request, data.getData());
+    AMQFrame content(data);
+    content.setBof(false);
+    content.setEof(true);
+    content.setBos(true);
+    content.setEos(true);
+    sessionHandler.out->handle(method);
+    sessionHandler.out->handle(header);
+    sessionHandler.out->handle(content);
+}
 } // namespace
 
-
-WiringReplicator::WiringReplicator(const string& name, Broker& b) : 
Exchange(name), broker(b) {}
-
 WiringReplicator::~WiringReplicator() {}
 
+WiringReplicator::WiringReplicator(const boost::shared_ptr<Link>& l)
+    : Exchange(QPID_WIRING_REPLICATOR), broker(*l->getBroker()), link(l)
+{}
+
+// We need to split out the initialization so that the WiringReplicator
+// can be registered as an exchange before starting the bridge.
+void WiringReplicator::initialize() {
+    assert(link->getBroker());
+    broker.getLinks().declare(
+        link->getHost(), link->getPort(),
+        false,              // durable
+        QPID_WIRING_REPLICATOR, // src
+        QPID_WIRING_REPLICATOR, // dest
+        "",                 // key
+        false,              // isQueue
+        false,              // isLocal
+        "",                 // id/tag
+        "",                 // excludes
+        false,              // dynamic
+        0,                  // sync?
+        boost::bind(&WiringReplicator::initializeBridge, this, _1, _2)
+    );
+}
+
+// This is called in the connection IO thread when the bridge is started.
+void WiringReplicator::initializeBridge(Bridge& bridge, SessionHandler& 
sessionHandler) {
+    framing::AMQP_ServerProxy peer(sessionHandler.out);
+    string queueName = bridge.getQueueName();
+    const qmf::org::apache::qpid::broker::ArgsLinkBridge& 
args(bridge.getArgs());
+
+    //declare and bind an event queue
+    peer.getQueue().declare(queueName, "", false, false, true, true, 
FieldTable());
+    peer.getExchange().bind(queueName, QMF_DEFAULT_TOPIC, 
AGENT_IND_EVENT_ORG_APACHE_QPID_BROKER, FieldTable());
+    //subscribe to the queue
+    peer.getMessage().subscribe(queueName, args.i_dest, 1, 0, false, "", 0, 
FieldTable());
+    peer.getMessage().flow(args.i_dest, 0, 0xFFFFFFFF);
+    peer.getMessage().flow(args.i_dest, 1, 0xFFFFFFFF);
+
+    //issue a query request for queues and another for exchanges using event 
queue as the reply-to address
+    sendQuery(QUEUE, queueName, sessionHandler);
+    sendQuery(EXCHANGE, queueName, sessionHandler);
+    sendQuery(BINDING, queueName, sessionHandler);
+}
+
 void WiringReplicator::route(Deliverable& msg, const string& /*key*/, const 
framing::FieldTable* headers) {
     Variant::List list;
     try {
@@ -176,7 +264,10 @@ void WiringReplicator::doEventQueueDecla
                 args,
                 values[USER].asString(),
                 values[RHOST].asString()).second) {
-            // FIXME aconway 2011-11-22: should delete old queue and re-create 
from exchanges.
+            // FIXME aconway 2011-11-22: should delete old queue and
+            // re-create from event.
+            // Events are always up to date, whereas responses may be
+            // out of date.
             QPID_LOG(warning, "Replicated queue " << name << " already 
exists");
         }
     }
@@ -209,7 +300,7 @@ void WiringReplicator::doEventExchangeDe
                 values[USER].asString(),
                 values[RHOST].asString()).second) {
             // FIXME aconway 2011-11-22: should delete pre-exisitng exchange
-            // and re-create from event. Likewise for queues.
+            // and re-create from event. See comment in doEventQueueDeclare.
             QPID_LOG(warning, "Replicated exchange " << name << " already 
exists");
         }
     }
@@ -286,8 +377,6 @@ void WiringReplicator::doResponseExchang
     }
 }
 
-// FIXME aconway 2011-11-21: refactor to remove redundancy between do* 
functions.
-
 namespace {
 const std::string QUEUE_REF_PREFIX("org.apache.qpid.broker:queue:");
 const std::string EXCHANGE_REF_PREFIX("org.apache.qpid.broker:exchange:");
@@ -299,7 +388,7 @@ std::string getRefName(const std::string
         throw Exception(QPID_MSG("Replicator: invalid object reference: " << 
ref));
     const std::string name = i->second.asString();
     if (name.compare(0, prefix.size(), prefix) != 0)
-        throw Exception(QPID_MSG("Replicator unexpected reference prefix: " << 
name));
+        throw Exception(QPID_MSG("Replicator: unexpected reference prefix: " 
<< name));
     std::string ret = name.substr(prefix.size());
     return ret;
 }
@@ -336,21 +425,6 @@ void WiringReplicator::doResponseBind(Va
     } catch (const framing::NotFoundException& e) {} // Ignore unreplicated 
queue or exchange.
 }
 
-boost::shared_ptr<Exchange> WiringReplicator::create(const string& target, 
Broker& broker)
-{
-    boost::shared_ptr<Exchange> exchange;
-    if (isWiringReplicatorDestination(target)) {
-        //TODO: need to cache the exchange
-        exchange.reset(new WiringReplicator(target, broker));
-    }
-    return exchange;
-}
-
-bool WiringReplicator::isWiringReplicatorDestination(const string& target)
-{
-    return target == QPID_WIRING_REPLICATOR;
-}
-
 bool WiringReplicator::bind(boost::shared_ptr<Queue>, const string&, const 
framing::FieldTable*) { return false; }
 bool WiringReplicator::unbind(boost::shared_ptr<Queue>, const string&, const 
framing::FieldTable*) { return false; }
 bool WiringReplicator::isBound(boost::shared_ptr<Queue>, const string* const, 
const framing::FieldTable* const) { return false; }

Modified: qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/WiringReplicator.h
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/WiringReplicator.h?rev=1245476&r1=1245475&r2=1245476&view=diff
==============================================================================
--- qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/WiringReplicator.h (original)
+++ qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/WiringReplicator.h Fri Feb 
17 14:04:08 2012
@@ -24,13 +24,15 @@
 
 #include "qpid/broker/Exchange.h"
 #include "qpid/types/Variant.h"
-
-// FIXME aconway 2011-11-17: relocate to ../ha
+#include <boost/shared_ptr.hpp>
 
 namespace qpid {
 
 namespace broker {
 class Broker;
+class Link;
+class Bridge;
+class SessionHandler;
 }
 
 namespace ha {
@@ -42,19 +44,23 @@ namespace ha {
 class WiringReplicator : public broker::Exchange
 {
   public:
-    WiringReplicator(const std::string&, broker::Broker&);
+    WiringReplicator(const boost::shared_ptr<broker::Link>&);
     ~WiringReplicator();
     std::string getType() const;
+
+    // Call this after the WiringReplicator has been registered as an exchange.
+    void initialize();
+
+    // Exchange methods
     bool bind(boost::shared_ptr<broker::Queue>, const std::string&, const 
framing::FieldTable*);
     bool unbind(boost::shared_ptr<broker::Queue>, const std::string&, const 
framing::FieldTable*);
     void route(broker::Deliverable&, const std::string&, const 
framing::FieldTable*);
     bool isBound(boost::shared_ptr<broker::Queue>, const std::string* const, 
const framing::FieldTable* const);
 
-    static bool isWiringReplicatorDestination(const std::string&);
-    static boost::shared_ptr<broker::Exchange> create(const std::string&, 
broker::Broker&);
     static const std::string typeName;
-  private:
 
+  private:
+    void initializeBridge(broker::Bridge&, broker::SessionHandler&);
     void doEventQueueDeclare(types::Variant::Map& values);
     void doEventQueueDelete(types::Variant::Map& values);
     void doEventExchangeDeclare(types::Variant::Map& values);
@@ -65,7 +71,10 @@ class WiringReplicator : public broker::
     void doResponseBind(types::Variant::Map& values);
 
   private:
+    void startQueueReplicator(const std::string& name);
+
     broker::Broker& broker;
+    boost::shared_ptr<broker::Link> link;
 };
 }} // namespace qpid::broker
 



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscr...@qpid.apache.org

Reply via email to