Author: aconway
Date: Tue Dec  6 15:30:35 2011
New Revision: 1210983

URL: http://svn.apache.org/viewvc?rev=1210983&view=rev
Log:
QPID-3603: Set bridge sync parameter to 1.

Modified:
    qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
    qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
    qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
    qpid/branches/qpid-3603/qpid/cpp/src/tests/ha_tests.py

Modified: qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp?rev=1210983&r1=1210982&r2=1210983&view=diff
==============================================================================
--- qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp 
(original)
+++ qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp Tue Dec 
 6 15:30:35 2011
@@ -116,7 +116,7 @@ bool DeliveryRecord::accept(TransactionC
         if (acquired) {
             queue->dequeue(ctxt, msg);
         } else if (isDelayedCompletion) {
-            //TODO: this is a nasty way to do this; change it
+            // FIXME aconway 2011-12-05: This should be done in HA code.
             msg.payload->getIngressCompletion().finishCompleter();
             QPID_LOG(debug, "Completed " << msg.payload.get());
         }

Modified: qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.cpp?rev=1210983&r1=1210982&r2=1210983&view=diff
==============================================================================
--- qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.cpp (original)
+++ qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.cpp Tue Dec  6 
15:30:35 2011
@@ -36,6 +36,7 @@
 namespace {
 const std::string QPID_REPLICATOR_("qpid.replicator-");
 const std::string TYPE_NAME("qpid.queue-replicator");
+const std::string QPID_SYNC_FREQUENCY("qpid.sync_frequency");
 }
 
 namespace qpid {
@@ -50,6 +51,7 @@ QueueReplicator::QueueReplicator(boost::
 {
     // FIXME aconway 2011-11-24: consistent logging.
     QPID_LOG(debug, "HA: Replicating queue " << q->getName() << " " << 
q->getSettings());
+    // Declare the replicator bridge.
     queue->getBroker()->getLinks().declare(
         link->getHost(), link->getPort(),
         false,              // durable
@@ -77,11 +79,11 @@ void QueueReplicator::initializeBridge(B
     framing::FieldTable settings;
     settings.setInt(ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION, 1);
     settings.setInt(ReplicatingSubscription::QPID_HIGH_SEQUENCE_NUMBER, 
queue->getPosition());
+    settings.setInt(QPID_SYNC_FREQUENCY, 1);
     qpid::framing::SequenceNumber oldest;
     if (queue->getOldest(oldest))
         settings.setInt(ReplicatingSubscription::QPID_LOW_SEQUENCE_NUMBER, 
oldest);
-
-    peer.getMessage().subscribe(args.i_src, args.i_dest, args.i_sync ? 0 : 1, 
0, false, "", 0, settings);
+    peer.getMessage().subscribe(args.i_src, args.i_dest, 0/*accept-explicit*/, 
0/*acquire-pre-acquired*/, false, "", 0, settings);
     peer.getMessage().flow(getName(), 0, 0xFFFFFFFF);
     peer.getMessage().flow(getName(), 1, 0xFFFFFFFF);
     QPID_LOG(debug, "HA: Backup activated bridge from queue " << args.i_src << 
" to " << args.i_dest);

Modified: 
qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp?rev=1210983&r1=1210982&r2=1210983&view=diff
==============================================================================
--- qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp 
(original)
+++ qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp 
Tue Dec  6 15:30:35 2011
@@ -169,12 +169,12 @@ void ReplicatingSubscription::enqueued(c
 // Called with lock held.
 void ReplicatingSubscription::generateDequeueEvent()
 {
+    QPID_LOG(trace, "HA: Sending dequeue event " << getQueue()->getName() << " 
" << range);
     string buf(range.encodedSize(),'\0');
     framing::Buffer buffer(&buf[0], buf.size());
     range.encode(buffer);
     range.clear();
     buffer.reset();
-
     //generate event message
     boost::intrusive_ptr<Message> event = new Message();
     AMQFrame method((MessageTransferBody(ProtocolVersion(), string(), 0, 0)));

Modified: qpid/branches/qpid-3603/qpid/cpp/src/tests/ha_tests.py
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/tests/ha_tests.py?rev=1210983&r1=1210982&r2=1210983&view=diff
==============================================================================
--- qpid/branches/qpid-3603/qpid/cpp/src/tests/ha_tests.py (original)
+++ qpid/branches/qpid-3603/qpid/cpp/src/tests/ha_tests.py Tue Dec  6 15:30:35 
2011
@@ -106,6 +106,18 @@ class ShortTests(BrokerTest):
         verify(b, "1", p)
         verify(b, "2", p)
 
+        # Test a series of messages, enqueue and dequeue.
+        s = p.sender(queue("foo","all"))
+        msgs = [str(i) for i in range(10)]
+        for m in msgs: s.send(Message(m))
+        self.assert_browse_retry(b, "foo", msgs)
+        self.assert_browse_retry(p, "foo", msgs)
+        r = p.receiver("foo")
+        for m in msgs: self.assertEqual(m, r.fetch(timeout=0).content)
+        p.acknowledge()
+        self.assert_browse_retry(p, "foo", [])
+        self.assert_browse_retry(b, "foo", [])
+
 if __name__ == "__main__":
     shutil.rmtree("brokertest.tmp", True)
     os.execvp("qpid-python-test", ["qpid-python-test", "-m", "ha_tests"] + 
sys.argv[1:])



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscr...@qpid.apache.org

Reply via email to