Author: aconway Date: Fri Feb 17 14:06:34 2012 New Revision: 1245490 URL: http://svn.apache.org/viewvc?rev=1245490&view=rev Log: QPID-3603: Fix QueueReplicator subscription parameters.
- Queue::destroyed cleans up observers. - Clean up log messages, comments, some variable names. - Improvements to brokertest.py Modified: qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/Queue.cpp qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/QueueReplicator.cpp qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h qpid/branches/qpid-3603-7/qpid/cpp/src/tests/brokertest.py qpid/branches/qpid-3603-7/qpid/cpp/src/tests/ha_tests.py Modified: qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp?rev=1245490&r1=1245489&r2=1245490&view=diff ============================================================================== --- qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp (original) +++ qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp Fri Feb 17 14:06:34 2012 @@ -118,7 +118,8 @@ bool DeliveryRecord::accept(TransactionC } else if (isDelayedCompletion) { // FIXME aconway 2011-12-05: This should be done in HA code. 
msg.payload->getIngressCompletion().finishCompleter(); - QPID_LOG(debug, "Completed " << msg.payload.get()); + QPID_LOG(debug, "Completed " << msg.queue->getName() + << "[" << msg.position << "]"); } setEnded(); QPID_LOG(debug, "Accepted " << id); Modified: qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/Queue.cpp URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/Queue.cpp?rev=1245490&r1=1245489&r2=1245490&view=diff ============================================================================== --- qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/Queue.cpp (original) +++ qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/broker/Queue.cpp Fri Feb 17 14:06:34 2012 @@ -1176,6 +1176,10 @@ void Queue::destroyed() } if (autoDeleteTask) autoDeleteTask = boost::intrusive_ptr<TimerTask>(); notifyDeleted(); + { + Mutex::ScopedLock locker(messageLock); + observers.clear(); + } } void Queue::notifyDeleted() Modified: qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/QueueReplicator.cpp URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/QueueReplicator.cpp?rev=1245490&r1=1245489&r2=1245490&view=diff ============================================================================== --- qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/QueueReplicator.cpp (original) +++ qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/QueueReplicator.cpp Fri Feb 17 14:06:34 2012 @@ -83,7 +83,7 @@ void QueueReplicator::initializeBridge(B qpid::framing::SequenceNumber oldest; if (queue->getOldest(oldest)) settings.setInt(ReplicatingSubscription::QPID_LOW_SEQUENCE_NUMBER, oldest); - peer.getMessage().subscribe(args.i_src, args.i_dest, 0/*accept-explicit*/, 0/*acquire-pre-acquired*/, false, "", 0, settings); + peer.getMessage().subscribe(args.i_src, args.i_dest, 0/*accept-explicit*/, 1/*not-acquired*/, false, "", 0, settings); peer.getMessage().flow(getName(), 0, 0xFFFFFFFF); peer.getMessage().flow(getName(), 1, 0xFFFFFFFF); QPID_LOG(debug, "HA: 
Backup activated bridge from " << args.i_src << " to " << args.i_dest); @@ -98,6 +98,7 @@ void QueueReplicator::route(Deliverable& qpid::framing::SequenceSet latest; latest.decode(buffer); + QPID_LOG(trace, "HA: Backup received dequeues: " << latest); //TODO: should be able to optimise the following for (qpid::framing::SequenceSet::iterator i = latest.begin(); i != latest.end(); i++) { if (current < *i) { Modified: qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp?rev=1245490&r1=1245489&r2=1245490&view=diff ============================================================================== --- qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp (original) +++ qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp Fri Feb 17 14:06:34 2012 @@ -48,23 +48,26 @@ class ReplicationStateInitialiser ReplicationStateInitialiser( qpid::framing::SequenceSet& r, const qpid::framing::SequenceNumber& s, - const qpid::framing::SequenceNumber& e) : results(r), start(s), end(e) + const qpid::framing::SequenceNumber& e) : dequeues(r), start(s), end(e) { - results.add(start, end); + dequeues.add(start, end); } void operator()(const QueuedMessage& message) { if (message.position < start) { //replica does not have a message that should still be on the queue QPID_LOG(warning, "HA: Replica missing message " << QueuePos(message)); + // FIXME aconway 2011-12-09: we want the replica to dump + // its messages and start from scratch in this case. } else if (message.position >= start && message.position <= end) { - //i.e. message is within the intial range and has not been dequeued, so remove it from the results - results.remove(message.position); + //i.e. 
message is within the initial range and has not been dequeued, + //so remove it from the dequeues + dequeues.remove(message.position); } //else message has not been seen by replica yet so can be ignored here } private: - qpid::framing::SequenceSet& results; + qpid::framing::SequenceSet& dequeues; const qpid::framing::SequenceNumber start; const qpid::framing::SequenceNumber end; }; @@ -94,6 +97,7 @@ ReplicatingSubscription::Factory::create rs.reset(new ReplicatingSubscription( parent, name, queue, ack, false, exclusive, tag, resumeId, resumeTtl, arguments)); + // FIXME aconway 2011-12-08: need to removeObserver also. queue->addObserver(rs); } return rs; @@ -115,6 +119,12 @@ ReplicatingSubscription::ReplicatingSubs events(new Queue(mask(name))), consumer(new DelegatingConsumer(*this)) { + // FIXME aconway 2011-12-09: Here we take advantage of existing + // messages on the backup queue to reduce replication + // effort. However if the backup queue is inconsistent with being + // a backup of the primary queue, then we want to issue a warning + // and tell the backup to dump its messages and start replicating + // from scratch. QPID_LOG(debug, "HA: Replicating subscription " << name << " to " << queue->getName()); if (arguments.isSet(QPID_HIGH_SEQUENCE_NUMBER)) { qpid::framing::SequenceNumber hwm = arguments.getAsInt(QPID_HIGH_SEQUENCE_NUMBER); @@ -127,19 +137,22 @@ ReplicatingSubscription::ReplicatingSubs qpid::framing::SequenceNumber oldest; if (queue->getOldest(oldest)) { if (oldest >= hwm) { - range.add(lwm, --oldest); + dequeues.add(lwm, --oldest); } else if (oldest >= lwm) { - ReplicationStateInitialiser initialiser(range, lwm, hwm); + ReplicationStateInitialiser initialiser(dequeues, lwm, hwm); queue->eachMessage(initialiser); - } else { //i.e. have older message on master than is reported to exist on replica - QPID_LOG(warning, "HA: Replica missing message on master"); + } else { //i.e. 
older message on master than is reported to exist on replica + // FIXME aconway 2011-12-09: dump and start from scratch? + QPID_LOG(warning, "HA: Replica missing message on primary"); } } else { //local queue (i.e. master) is empty - range.add(lwm, queue->getPosition()); + dequeues.add(lwm, queue->getPosition()); + // FIXME aconway 2011-12-09: if hwm > + // queue->getPosition(), dump and start from scratch? } QPID_LOG(debug, "HA: Initial set of dequeues for " << queue->getName() << ": " - << range << " (lwm=" << lwm << ", hwm=" << hwm + << dequeues << " (lwm=" << lwm << ", hwm=" << hwm << ", current=" << queue->getPosition() << ")"); //set position of 'cursor' position = hwm; @@ -162,7 +175,7 @@ ReplicatingSubscription::~ReplicatingSub //under the message lock in the queue void ReplicatingSubscription::enqueued(const QueuedMessage& m) { - QPID_LOG(trace, "HA: Enqueued message " << QueuePos(m)); + QPID_LOG(trace, "HA: Enqueued message " << QueuePos(m) << " on " << getName()); //delay completion m.payload->getIngressCompletion().startCompleter(); } @@ -170,11 +183,11 @@ void ReplicatingSubscription::enqueued(c // Called with lock held. void ReplicatingSubscription::generateDequeueEvent() { - QPID_LOG(trace, "HA: Sending dequeue event " << getQueue()->getName() << " " << range); - string buf(range.encodedSize(),'\0'); + QPID_LOG(trace, "HA: Sending dequeue event " << getQueue()->getName() << " " << dequeues << " on " << getName()); + string buf(dequeues.encodedSize(),'\0'); framing::Buffer buffer(&buf[0], buf.size()); - range.encode(buffer); - range.clear(); + dequeues.encode(buffer); + dequeues.clear(); buffer.reset(); //generate event message boost::intrusive_ptr<Message> event = new Message(); @@ -199,24 +212,20 @@ void ReplicatingSubscription::generateDe events->deliver(event); } -// FIXME aconway 2011-12-02: is it safe to defer dequues to doDispatch() like this? 
-// If a queue is drained with no new messages coming on -// will the messages be dequeued on the backup? - -//called after the message has been removed from the deque and under -//the message lock in the queue +// Called after the message has been removed from the deque and under +// the message lock in the queue. void ReplicatingSubscription::dequeued(const QueuedMessage& m) { { sys::Mutex::ScopedLock l(lock); - range.add(m.position); - // FIXME aconway 2011-11-29: q[pos] logging - QPID_LOG(trace, "HA: Updated dequeue event to include " << QueuePos(m) << "; subscription is at " << position); + dequeues.add(m.position); + QPID_LOG(trace, "HA: Added " << QueuePos(m) + << " to dequeue event; subscription at " << position); } - notify(); + notify(); // Ensure a call to doDispatch if (m.position > position) { m.payload->getIngressCompletion().finishCompleter(); - QPID_LOG(trace, "HA: Completed " << QueuePos(m) << " early due to dequeue"); + QPID_LOG(trace, "HA: Completed " << QueuePos(m) << " early, dequeued."); } } @@ -224,7 +233,7 @@ bool ReplicatingSubscription::doDispatch { { sys::Mutex::ScopedLock l(lock); - if (!range.empty()) { + if (!dequeues.empty()) { generateDequeueEvent(); } } Modified: qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h?rev=1245490&r1=1245489&r2=1245490&view=diff ============================================================================== --- qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h (original) +++ qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h Fri Feb 17 14:06:34 2012 @@ -86,7 +86,7 @@ class ReplicatingSubscription : public b private: boost::shared_ptr<broker::Queue> events; boost::shared_ptr<broker::Consumer> consumer; - qpid::framing::SequenceSet range; + qpid::framing::SequenceSet dequeues; void generateDequeueEvent(); class DelegatingConsumer : 
public Consumer Modified: qpid/branches/qpid-3603-7/qpid/cpp/src/tests/brokertest.py URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603-7/qpid/cpp/src/tests/brokertest.py?rev=1245490&r1=1245489&r2=1245490&view=diff ============================================================================== --- qpid/branches/qpid-3603-7/qpid/cpp/src/tests/brokertest.py (original) +++ qpid/branches/qpid-3603-7/qpid/cpp/src/tests/brokertest.py Fri Feb 17 14:06:34 2012 @@ -500,30 +500,30 @@ class BrokerTest(TestCase): cluster = Cluster(self, count, args, expect=expect, wait=wait, show_cmd=show_cmd) return cluster - def browse(self, session, queue, timeout=0): + def browse(self, session, queue, timeout=0, transform=lambda m: m.content): """Return a list with the contents of each message on queue.""" r = session.receiver("%s;{mode:browse}"%(queue)) r.capacity = 100 try: contents = [] try: - while True: contents.append(r.fetch(timeout=timeout).content) + while True: contents.append(transform(r.fetch(timeout=timeout))) except messaging.Empty: pass finally: r.close() return contents - def assert_browse(self, session, queue, expect_contents, timeout=0): + def assert_browse(self, session, queue, expect_contents, timeout=0, transform=lambda d:m.content): """Assert that the contents of messages on queue (as retrieved using session and timeout) exactly match the strings in expect_contents""" - actual_contents = self.browse(session, queue, timeout) + actual_contents = self.browse(session, queue, timeout, transform=transform) self.assertEqual(expect_contents, actual_contents) - def assert_browse_retry(self, session, queue, expect_contents, timeout=1, delay=.01): + def assert_browse_retry(self, session, queue, expect_contents, timeout=1, delay=.01, transform=lambda m:m.content): """Wait up to timeout for contents of queue to match expect_contents""" - def test(): return self.browse(session, queue, 0) == expect_contents + test = lambda: self.browse(session, queue, 0, transform=transform) 
== expect_contents retry(test, timeout, delay) - self.assertEqual(expect_contents, self.browse(session, queue, 0)) + self.assertEqual(expect_contents, self.browse(session, queue, 0, transform=transform)) def join(thread, timeout=10): thread.join(timeout) Modified: qpid/branches/qpid-3603-7/qpid/cpp/src/tests/ha_tests.py URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603-7/qpid/cpp/src/tests/ha_tests.py?rev=1245490&r1=1245489&r2=1245490&view=diff ============================================================================== --- qpid/branches/qpid-3603-7/qpid/cpp/src/tests/ha_tests.py (original) +++ qpid/branches/qpid-3603-7/qpid/cpp/src/tests/ha_tests.py Fri Feb 17 14:06:34 2012 @@ -118,9 +118,12 @@ class ShortTests(BrokerTest): self.assert_browse_retry(p, "foo", []) self.assert_browse_retry(b, "foo", []) + def qpid_replicate(self, value="all"): + return "node:{x-declare:{arguments:{'qpid.replicate':%s}}}" % value + def test_sync(self): def queue(name, replicate): - return "%s;{create:always,node:{x-declare:{arguments:{'qpid.replicate':%s}}}}"%(name, replicate) + return "%s;{create:always,%s}"%(name, self.qpid_replicate(replicate)) primary = self.ha_broker(name="primary", broker_url="primary") # Temp hack to identify primary p = primary.connect().session() s = p.sender(queue("q","all")) @@ -134,12 +137,47 @@ class ShortTests(BrokerTest): s.sync() msgs = [str(i) for i in range(30)] - b = backup1.connect().session() - self.assert_browse_retry(b, "q", msgs) - - b = backup2.connect().session() + self.assert_browse_retry(backup1.connect().session(), "q", msgs) self.assert_browse_retry(backup2.connect().session(), "q", msgs) + def test_send_receive(self): + # FIXME aconway 2011-12-09: test with concurrent senders/receivers. 
+ debug = ["-t"] # FIXME aconway 2011-12-08: + primary = self.ha_broker(name="primary", broker_url="primary", args=debug) + backup1 = self.ha_broker(name="backup1", broker_url=primary.host_port(), args=debug) + backup2 = self.ha_broker(name="backup2", broker_url=primary.host_port(), args=debug) + sender = self.popen( + ["qpid-send", + "--broker", primary.host_port(), + "--address", "q;{create:always,%s}"%(self.qpid_replicate("all")), + "--messages=1000", # FIXME aconway 2011-12-09: + "--content-string=x" + ]) + receiver = self.popen( + ["qpid-receive", + "--broker", primary.host_port(), + "--address", "q;{create:always,%s}"%(self.qpid_replicate("all")), + "--messages=990", # FIXME aconway 2011-12-09: + "--timeout=10" + ]) + try: + self.assertEqual(sender.wait(), 0) + self.assertEqual(receiver.wait(), 0) + expect = [long(i) for i in range(991, 1001)] + sn = lambda m: m.properties["sn"] + self.assert_browse_retry(backup1.connect().session(), "q", expect, transform=sn) + self.assert_browse_retry(backup2.connect().session(), "q", expect, transform=sn) + except: + # FIXME aconway 2011-12-09: + print self.browse(primary.connect().session(), "q", transform=sn) + print self.browse(backup1.connect().session(), "q", transform=sn) + print self.browse(backup2.connect().session(), "q", transform=sn) +# os.system("/home/remote/aconway/qpidha/dbg/examples/messaging/drain -b %s 'q;{mode:browse}'"%(primary.host_port())) +# print "---- backup1" +# os.system("/home/remote/aconway/qpidha/dbg/examples/messaging/drain -b %s 'q;{mode:browse}'"%(backup1.host_port())) +# print "---- backup2" +# os.system("/home/remote/aconway/qpidha/dbg/examples/messaging/drain -b %s 'q;{mode:browse}'"%(backup2.host_port())) + raise if __name__ == "__main__": shutil.rmtree("brokertest.tmp", True) --------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:commits-subscr...@qpid.apache.org