Author: aconway Date: Tue Dec 6 15:30:35 2011 New Revision: 1210983 URL: http://svn.apache.org/viewvc?rev=1210983&view=rev Log: QPID-3603: Set bridge sync parameter to 1.
Modified: qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.cpp qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp qpid/branches/qpid-3603/qpid/cpp/src/tests/ha_tests.py Modified: qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp?rev=1210983&r1=1210982&r2=1210983&view=diff ============================================================================== --- qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp (original) +++ qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp Tue Dec 6 15:30:35 2011 @@ -116,7 +116,7 @@ bool DeliveryRecord::accept(TransactionC if (acquired) { queue->dequeue(ctxt, msg); } else if (isDelayedCompletion) { - //TODO: this is a nasty way to do this; change it + // FIXME aconway 2011-12-05: This should be done in HA code. msg.payload->getIngressCompletion().finishCompleter(); QPID_LOG(debug, "Completed " << msg.payload.get()); } Modified: qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.cpp URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.cpp?rev=1210983&r1=1210982&r2=1210983&view=diff ============================================================================== --- qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.cpp (original) +++ qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.cpp Tue Dec 6 15:30:35 2011 @@ -36,6 +36,7 @@ namespace { const std::string QPID_REPLICATOR_("qpid.replicator-"); const std::string TYPE_NAME("qpid.queue-replicator"); +const std::string QPID_SYNC_FREQUENCY("qpid.sync_frequency"); } namespace qpid { @@ -50,6 +51,7 @@ QueueReplicator::QueueReplicator(boost:: { // FIXME aconway 2011-11-24: consistent logging. QPID_LOG(debug, "HA: Replicating queue " << q->getName() << " " << q->getSettings()); + // Declare the replicator bridge. queue->getBroker()->getLinks().declare( link->getHost(), link->getPort(), false, // durable @@ -77,11 +79,11 @@ void QueueReplicator::initializeBridge(B framing::FieldTable settings; settings.setInt(ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION, 1); settings.setInt(ReplicatingSubscription::QPID_HIGH_SEQUENCE_NUMBER, queue->getPosition()); + settings.setInt(QPID_SYNC_FREQUENCY, 1); qpid::framing::SequenceNumber oldest; if (queue->getOldest(oldest)) settings.setInt(ReplicatingSubscription::QPID_LOW_SEQUENCE_NUMBER, oldest); - - peer.getMessage().subscribe(args.i_src, args.i_dest, args.i_sync ? 0 : 1, 0, false, "", 0, settings); + peer.getMessage().subscribe(args.i_src, args.i_dest, 0/*accept-explicit*/, 0/*acquire-pre-acquired*/, false, "", 0, settings); peer.getMessage().flow(getName(), 0, 0xFFFFFFFF); peer.getMessage().flow(getName(), 1, 0xFFFFFFFF); QPID_LOG(debug, "HA: Backup activated bridge from queue " << args.i_src << " to " << args.i_dest); Modified: qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp?rev=1210983&r1=1210982&r2=1210983&view=diff ============================================================================== --- qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp (original) +++ qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp Tue Dec 6 15:30:35 2011 @@ -169,12 +169,12 @@ void ReplicatingSubscription::enqueued(c // Called with lock held. void ReplicatingSubscription::generateDequeueEvent() { + QPID_LOG(trace, "HA: Sending dequeue event " << getQueue()->getName() << " " << range); string buf(range.encodedSize(),'\0'); framing::Buffer buffer(&buf[0], buf.size()); range.encode(buffer); range.clear(); buffer.reset(); - //generate event message boost::intrusive_ptr<Message> event = new Message(); AMQFrame method((MessageTransferBody(ProtocolVersion(), string(), 0, 0))); Modified: qpid/branches/qpid-3603/qpid/cpp/src/tests/ha_tests.py URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/tests/ha_tests.py?rev=1210983&r1=1210982&r2=1210983&view=diff ============================================================================== --- qpid/branches/qpid-3603/qpid/cpp/src/tests/ha_tests.py (original) +++ qpid/branches/qpid-3603/qpid/cpp/src/tests/ha_tests.py Tue Dec 6 15:30:35 2011 @@ -106,6 +106,18 @@ class ShortTests(BrokerTest): verify(b, "1", p) verify(b, "2", p) + # Test a series of messages, enqueue and dequeue. + s = p.sender(queue("foo","all")) + msgs = [str(i) for i in range(10)] + for m in msgs: s.send(Message(m)) + self.assert_browse_retry(b, "foo", msgs) + self.assert_browse_retry(p, "foo", msgs) + r = p.receiver("foo") + for m in msgs: self.assertEqual(m, r.fetch(timeout=0).content) + p.acknowledge() + self.assert_browse_retry(p, "foo", []) + self.assert_browse_retry(b, "foo", []) + if __name__ == "__main__": shutil.rmtree("brokertest.tmp", True) os.execvp("qpid-python-test", ["qpid-python-test", "-m", "ha_tests"] + sys.argv[1:]) --------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:commits-subscr...@qpid.apache.org