Author: aconway
Date: Fri Feb 17 14:05:13 2012
New Revision: 1245482

URL: http://svn.apache.org/viewvc?rev=1245482&view=rev
Log:
QPID-3603: Integrate ReplicatingSubscription into the HA code.

HaBroker registers the ConsumerFactory, QueueReplicator sets
appropriate arguments in consume command.

Modified:
    qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/SemanticState.cpp
    qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/Backup.cpp
    qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/Backup.h
    qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/HaBroker.cpp
    qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
    qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/QueueReplicator.h
    qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
    qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
    qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/WiringReplicator.cpp
    qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/WiringReplicator.h
    qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/management-schema.xml
    qpid/branches/qpid-3603-7/qpid/cpp/src/tests/ha_tests.py

Modified: qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=1245482&r1=1245481&r2=1245482&view=diff
==============================================================================
--- qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/SemanticState.cpp 
(original)
+++ qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/SemanticState.cpp Fri 
Feb 17 14:05:13 2012
@@ -118,7 +118,7 @@ void SemanticState::consume(const string
     const ConsumerFactories::Factories& cf(
         session.getBroker().getConsumerFactories().get());
     ConsumerImpl::shared_ptr c;
-    for (ConsumerFactories::Factories::const_iterator i = cf.begin(); i != 
cf.end(); !c)
+    for (ConsumerFactories::Factories::const_iterator i = cf.begin(); i != 
cf.end() && !c; ++i)
         c = (*i)->create(this, name, queue, ackRequired, acquire, exclusive, 
tag,
                          resumeId, resumeTtl, arguments);
     if (!c)                     // Create plain consumer

Modified: qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/Backup.cpp
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/Backup.cpp?rev=1245482&r1=1245481&r2=1245482&view=diff
==============================================================================
--- qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/Backup.cpp (original)
+++ qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/Backup.cpp Fri Feb 17 
14:05:13 2012
@@ -43,8 +43,8 @@ using types::Variant;
 using std::string;
 
 Backup::Backup(broker::Broker& b, const Settings& s) : broker(b), settings(s) {
-    // FIXME aconway 2011-11-24: identifying the primary. Only has 1 address.
-    if (s.brokerUrl != "dummy") { // FIXME aconway 2011-11-22: temporary hack 
to identify primary.
+    // FIXME aconway 2011-11-24: identifying the primary.
+    if (s.brokerUrl != "primary") { // FIXME aconway 2011-11-22: temporary 
hack to identify primary.
         Url url(s.brokerUrl);
         QPID_LOG(info, "HA: Acting as backup to " << url);
         string protocol = url[0].protocol.empty() ? "tcp" : url[0].protocol;
@@ -59,12 +59,6 @@ Backup::Backup(broker::Broker& b, const 
         link = result.first;
         boost::shared_ptr<WiringReplicator> wr(new WiringReplicator(link));
         broker.getExchanges().registerExchange(wr);
-
-        // FIXME aconway 2011-11-25: using ReplicatingSubscription hangs the 
tests
-        // The tests pass with a plain subscription if we dont add the factory.
-//         broker.getConsumerFactories().add(
-//             boost::shared_ptr<ReplicatingSubscription::Factory>(
-//                 new ReplicatingSubscription::Factory()));
     }
 }
 

Modified: qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/Backup.h
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/Backup.h?rev=1245482&r1=1245481&r2=1245482&view=diff
==============================================================================
--- qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/Backup.h (original)
+++ qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/Backup.h Fri Feb 17 14:05:13 
2012
@@ -37,6 +37,9 @@ class Settings;
 
 /**
  * State associated with a backup broker. Manages connections to primary.
+ *
+ * THREAD SAFE: trivially because currently it only has a constructor.
+ * May need locking as the functionality grows.
  */
 class Backup
 {

Modified: qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/HaBroker.cpp
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/HaBroker.cpp?rev=1245482&r1=1245481&r2=1245482&view=diff
==============================================================================
--- qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/HaBroker.cpp (original)
+++ qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/HaBroker.cpp Fri Feb 17 
14:05:13 2012
@@ -21,6 +21,7 @@
 #include "Backup.h"
 #include "HaBroker.h"
 #include "Settings.h"
+#include "ReplicatingSubscription.h"
 #include "qpid/Exception.h"
 #include "qpid/broker/Broker.h"
 #include "qpid/management/ManagementAgent.h"
@@ -61,6 +62,10 @@ HaBroker::HaBroker(broker::Broker& b, co
     QPID_LOG(notice, "HA: broker initialized, client-url=" << clientUrl
              << ", broker-url=" << brokerUrl);
     backup.reset(new Backup(broker, s));
+    // Register a factory for replicating subscriptions.
+    broker.getConsumerFactories().add(
+        boost::shared_ptr<ReplicatingSubscription::Factory>(
+            new ReplicatingSubscription::Factory()));
 }
 
 HaBroker::~HaBroker() {}

Modified: qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/QueueReplicator.cpp?rev=1245482&r1=1245481&r2=1245482&view=diff
==============================================================================
--- qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/QueueReplicator.cpp 
(original)
+++ qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/QueueReplicator.cpp Fri Feb 
17 14:05:13 2012
@@ -20,6 +20,7 @@
  */
 
 #include "QueueReplicator.h"
+#include "ReplicatingSubscription.h"
 #include "qpid/broker/Bridge.h"
 #include "qpid/broker/Broker.h"
 #include "qpid/broker/Link.h"
@@ -63,14 +64,25 @@ QueueReplicator::QueueReplicator(boost::
 
 QueueReplicator::~QueueReplicator() {}
 
+// NB: This is called back in a broker connection thread when the
+// bridge is created.
 void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& 
sessionHandler) {
+    // No lock needed, no mutable member variables are used.
     framing::AMQP_ServerProxy peer(sessionHandler.out);
     const qmf::org::apache::qpid::broker::ArgsLinkBridge& 
args(bridge.getArgs());
-    peer.getMessage().subscribe(args.i_src, args.i_dest, args.i_sync ? 0 : 1, 
0, false, "", 0, framing::FieldTable());
+    framing::FieldTable settings;
+    // FIXME aconway 2011-11-28: string constants.
+    settings.setInt(ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION, 1);
+    // FIXME aconway 2011-11-28: inconsistent use of _ vs. -
+    settings.setInt(ReplicatingSubscription::QPID_HIGH_SEQUENCE_NUMBER, 
queue->getPosition());
+    qpid::framing::SequenceNumber oldest;
+    if (queue->getOldest(oldest))
+        settings.setInt(ReplicatingSubscription::QPID_LOW_SEQUENCE_NUMBER, 
oldest);
+
+    peer.getMessage().subscribe(args.i_src, args.i_dest, args.i_sync ? 0 : 1, 
0, false, "", 0, settings);
     peer.getMessage().flow(getName(), 0, 0xFFFFFFFF);
     peer.getMessage().flow(getName(), 1, 0xFFFFFFFF);
     QPID_LOG(debug, "Activated route from queue " << args.i_src << " to " << 
args.i_dest);
-
 }
 
 
@@ -117,39 +129,13 @@ void QueueReplicator::route(Deliverable&
     }
 }
 
-bool QueueReplicator::isReplicatingLink(const std::string& name)
-{
-    return name.find(REPLICATOR) == 0;
-}
-
-bool QueueReplicator::initReplicationSettings(const std::string& target, 
QueueRegistry& queues, qpid::framing::FieldTable& settings)
-{
-    if (isReplicatingLink(target)) {
-        std::string queueName = target.substr(REPLICATOR.size());
-        boost::shared_ptr<Queue> queue = queues.find(queueName);
-        if (queue) {
-            settings.setInt("qpid.replicating-subscription", 1);
-            settings.setInt("qpid.high_sequence_number", queue->getPosition());
-            qpid::framing::SequenceNumber oldest;
-            if (queue->getOldest(oldest)) {
-                settings.setInt("qpid.low_sequence_number", oldest);
-            }
-        }
-        return true;
-    } else {
-        return false;
-    }
-}
-
 bool QueueReplicator::bind(boost::shared_ptr<Queue>, const std::string&, const 
qpid::framing::FieldTable*) { return false; }
 bool QueueReplicator::unbind(boost::shared_ptr<Queue>, const std::string&, 
const qpid::framing::FieldTable*) { return false; }
 bool QueueReplicator::isBound(boost::shared_ptr<Queue>, const std::string* 
const, const qpid::framing::FieldTable* const) { return false; }
 
-const std::string QueueReplicator::typeName("queue-replicator");
+// FIXME aconway 2011-11-28: rationalise string constants.
+static const std::string TYPE_NAME("qpid.queue-replicator");
 
-std::string QueueReplicator::getType() const
-{
-    return typeName;
-}
+std::string QueueReplicator::getType() const { return TYPE_NAME; }
 
 }} // namespace qpid::broker

Modified: qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/QueueReplicator.h
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/QueueReplicator.h?rev=1245482&r1=1245481&r2=1245482&view=diff
==============================================================================
--- qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/QueueReplicator.h (original)
+++ qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/QueueReplicator.h Fri Feb 17 
14:05:13 2012
@@ -38,7 +38,13 @@ class Deliverable;
 namespace ha {
 
 /**
- * Dummy exchange for processing replication messages
+ * Exchange created on a backup broker to replicate a queue on the primary.
+ *
+ * Puts replicated messages on the local queue, handles dequeue events.
+ * Creates a ReplicatingSubscription on the primary by passing special
+ * arguments to the consume command.
+ *
+ * THREAD SAFE.
  */
 class QueueReplicator : public broker::Exchange
 {
@@ -50,12 +56,11 @@ class QueueReplicator : public broker::E
     bool unbind(boost::shared_ptr<broker::Queue>, const std::string&, const 
framing::FieldTable*);
     void route(broker::Deliverable&, const std::string&, const 
framing::FieldTable*);
     bool isBound(boost::shared_ptr<broker::Queue>, const std::string* const, 
const framing::FieldTable* const);
-    static bool isReplicatingLink(const std::string&);
-    static bool initReplicationSettings(const std::string&, 
broker::QueueRegistry&, framing::FieldTable&);
-    static const std::string typeName;
+
   private:
     void initializeBridge(broker::Bridge& bridge, broker::SessionHandler& 
sessionHandler);
 
+    sys::Mutex lock;
     boost::shared_ptr<broker::Queue> queue;
     boost::shared_ptr<broker::Link> link;
     framing::SequenceNumber current;

Modified: 
qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp?rev=1245482&r1=1245481&r2=1245482&view=diff
==============================================================================
--- qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp 
(original)
+++ qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp 
Fri Feb 17 14:05:13 2012
@@ -30,9 +30,16 @@ namespace ha {
 
 using namespace framing;
 using namespace broker;
+using namespace std;
 
-const std::string DOLLAR("$");
-const std::string INTERNAL("_internal");
+// FIXME aconway 2011-11-28: review all argument names, prefixes etc.
+// Do we want a common HA prefix?
+const string 
ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION("qpid.replicating-subscription");
+const string 
ReplicatingSubscription::QPID_HIGH_SEQUENCE_NUMBER("qpid.high_sequence_number");
+const string 
ReplicatingSubscription::QPID_LOW_SEQUENCE_NUMBER("qpid.low_sequence_number");
+
+const string DOLLAR("$");
+const string INTERNAL("_internal");
 
 class ReplicationStateInitialiser
 {
@@ -61,7 +68,7 @@ class ReplicationStateInitialiser
     const qpid::framing::SequenceNumber end;
 };
 
-std::string mask(const std::string& in)
+string mask(const string& in)
 {
     return DOLLAR + in + INTERNAL;
 }
@@ -69,29 +76,30 @@ std::string mask(const std::string& in)
 boost::shared_ptr<broker::SemanticState::ConsumerImpl>
 ReplicatingSubscription::Factory::create(
     SemanticState* _parent,
-    const std::string& _name,
+    const string& _name,
     Queue::shared_ptr _queue,
     bool ack,
     bool _acquire,
     bool _exclusive,
-    const std::string& _tag,
-    const std::string& _resumeId,
+    const string& _tag,
+    const string& _resumeId,
     uint64_t _resumeTtl,
     const framing::FieldTable& _arguments
 ) {
+
     return boost::shared_ptr<broker::SemanticState::ConsumerImpl>(
         new ReplicatingSubscription(_parent, _name, _queue, ack, _acquire, 
_exclusive, _tag, _resumeId, _resumeTtl, _arguments));
 }
 
 ReplicatingSubscription::ReplicatingSubscription(
     SemanticState* _parent,
-    const std::string& _name,
+    const string& _name,
     Queue::shared_ptr _queue,
     bool ack,
     bool _acquire,
     bool _exclusive,
-    const std::string& _tag,
-    const std::string& _resumeId,
+    const string& _tag,
+    const string& _resumeId,
     uint64_t _resumeTtl,
     const framing::FieldTable& _arguments
 ) : ConsumerImpl(_parent, _name, _queue, ack, _acquire, _exclusive, _tag, 
_resumeId, _resumeTtl, _arguments),
@@ -158,7 +166,7 @@ void ReplicatingSubscription::enqueued(c
 
 void ReplicatingSubscription::generateDequeueEvent()
 {
-    std::string buf(range.encodedSize(),'\0');
+    string buf(range.encodedSize(),'\0');
     framing::Buffer buffer(&buf[0], buf.size());
     range.encode(buffer);
     range.clear();
@@ -166,7 +174,7 @@ void ReplicatingSubscription::generateDe
 
     //generate event message
     boost::intrusive_ptr<Message> event = new Message();
-    AMQFrame method((MessageTransferBody(ProtocolVersion(), std::string(), 0, 
0)));
+    AMQFrame method((MessageTransferBody(ProtocolVersion(), string(), 0, 0)));
     AMQFrame header((AMQHeaderBody()));
     AMQFrame content((AMQContentBody()));
     content.castBody<AMQContentBody>()->decode(buffer, buffer.getSize());

Modified: 
qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h?rev=1245482&r1=1245481&r2=1245482&view=diff
==============================================================================
--- qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h 
(original)
+++ qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h 
Fri Feb 17 14:05:13 2012
@@ -38,7 +38,12 @@ class OwnershipToken;
 namespace ha {
 
 /**
- * Subscriber to a remote queue that replicates to a local queue.
+ * A subscription that represents a backup replicating a queue.
+ *
+ * Runs on the primary. Delays completion of messages till the backup
+ * has acknowledged, informs backup of locally dequeued messages.
+ *
+ * THREAD UNSAFE: used only in broker connection thread.
  */
 class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl,
                                 public broker::QueueObserver
@@ -53,6 +58,11 @@ class ReplicatingSubscription : public b
             const framing::FieldTable& arguments);
     };
 
+    // Argument names for consume command.
+    static const std::string QPID_REPLICATING_SUBSCRIPTION;
+    static const std::string QPID_HIGH_SEQUENCE_NUMBER;
+    static const std::string QPID_LOW_SEQUENCE_NUMBER;
+
     ReplicatingSubscription(broker::SemanticState* parent,
                             const std::string& name, 
boost::shared_ptr<broker::Queue> ,
                             bool ack, bool acquire, bool exclusive, const 
std::string& tag,

Modified: qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/WiringReplicator.cpp
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/WiringReplicator.cpp?rev=1245482&r1=1245481&r2=1245482&view=diff
==============================================================================
--- qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/WiringReplicator.cpp 
(original)
+++ qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/WiringReplicator.cpp Fri Feb 
17 14:05:13 2012
@@ -446,6 +446,7 @@ void WiringReplicator::doResponseBind(Va
 }
 
 void WiringReplicator::startQueueReplicator(const boost::shared_ptr<Queue>& 
queue) {
+    // FIXME aconway 2011-11-28: also need to remove these when queue is 
destroyed.
     if (replicateLevel(queue->getSettings()) == RL_ALL) {
         boost::shared_ptr<QueueReplicator> qr(new QueueReplicator(queue, 
link));
         broker.getExchanges().registerExchange(qr);
@@ -456,11 +457,6 @@ bool WiringReplicator::bind(boost::share
 bool WiringReplicator::unbind(boost::shared_ptr<Queue>, const string&, const 
framing::FieldTable*) { return false; }
 bool WiringReplicator::isBound(boost::shared_ptr<Queue>, const string* const, 
const framing::FieldTable* const) { return false; }
 
-const string WiringReplicator::typeName(QPID_WIRING_REPLICATOR);
-
-string WiringReplicator::getType() const
-{
-    return typeName;
-}
+string WiringReplicator::getType() const { return QPID_WIRING_REPLICATOR; }
 
 }} // namespace broker

Modified: qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/WiringReplicator.h
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/WiringReplicator.h?rev=1245482&r1=1245481&r2=1245482&view=diff
==============================================================================
--- qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/WiringReplicator.h (original)
+++ qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/WiringReplicator.h Fri Feb 
17 14:05:13 2012
@@ -38,8 +38,15 @@ class SessionHandler;
 namespace ha {
 
 /**
- * Pseudo-exchange for recreating local queues and/or exchanges on
- * receipt of QMF events indicating their creation on another node
+ * Replicate wiring on a backup broker.
+ *
+ * Implemented as an exchange that subscribes to receive QMF
+ * configuration events from the primary. It configures local queues,
+ * exchanges and bindings to replicate the primary.
+ * It also creates QueueReplicators for newly replicated queues.
+ *
+ * THREAD SAFE: Has no mutable state.
+ *
  */
 class WiringReplicator : public broker::Exchange
 {
@@ -54,8 +61,6 @@ class WiringReplicator : public broker::
     void route(broker::Deliverable&, const std::string&, const 
framing::FieldTable*);
     bool isBound(boost::shared_ptr<broker::Queue>, const std::string* const, 
const framing::FieldTable* const);
 
-    static const std::string typeName;
-
   private:
     void initializeBridge(broker::Bridge&, broker::SessionHandler&);
     void doEventQueueDeclare(types::Variant::Map& values);
@@ -66,8 +71,6 @@ class WiringReplicator : public broker::
     void doResponseQueue(types::Variant::Map& values);
     void doResponseExchange(types::Variant::Map& values);
     void doResponseBind(types::Variant::Map& values);
-
-  private:
     void startQueueReplicator(const boost::shared_ptr<broker::Queue>&);
 
     broker::Broker& broker;

Modified: qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/management-schema.xml
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/management-schema.xml?rev=1245482&r1=1245481&r2=1245482&view=diff
==============================================================================
--- qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/management-schema.xml 
(original)
+++ qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/management-schema.xml Fri 
Feb 17 14:05:13 2012
@@ -21,7 +21,7 @@
 
   <!-- Monitor and control HA status of a broker. -->
   <class name="HaBroker">
-    <property name="status" type="sstr" desc="HA statu: PRIMARY, BACKUP, 
SOLO"/>
+    <property name="status" type="sstr" desc="HA status: PRIMARY, BACKUP, 
SOLO"/>
 
     <method name="setStatus" desc="Set HA status: PRIMARY, BACKUP, SOLO">
       <arg name="status" type="sstr" dir="I"/>

Modified: qpid/branches/qpid-3603-7/qpid/cpp/src/tests/ha_tests.py
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-7/qpid/cpp/src/tests/ha_tests.py?rev=1245482&r1=1245481&r2=1245482&view=diff
==============================================================================
--- qpid/branches/qpid-3603-7/qpid/cpp/src/tests/ha_tests.py (original)
+++ qpid/branches/qpid-3603-7/qpid/cpp/src/tests/ha_tests.py Fri Feb 17 
14:05:13 2012
@@ -89,7 +89,7 @@ class ShortTests(BrokerTest):
             self.assert_browse_retry(b, prefix+"q2", [prefix+"e2"])
 
         # Create config, send messages before starting the backup, to test 
catch-up replication.
-        primary = self.ha_broker(name="primary")
+        primary = self.ha_broker(name="primary", broker_url="primary") # Temp 
hack to identify primary
         p = primary.connect().session()
         setup(p, "1")
         # Start the backup



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscr...@qpid.apache.org

Reply via email to