Author: aconway
Date: Fri Feb 17 14:06:22 2012
New Revision: 1245489

URL: http://svn.apache.org/viewvc?rev=1245489&view=rev
Log:
QPID-3603: Cleaned up log messages, update qpid-cluster-benchmark to set replicate=all

Modified: qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/QueueReplicator.cpp qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/WiringReplicator.cpp qpid/branches/qpid-3603-7/qpid/cpp/src/tests/ha_tests.py qpid/branches/qpid-3603-7/qpid/cpp/src/tests/qpid-cluster-benchmark qpid/branches/qpid-3603-7/qpid/cpp/src/tests/qpid-cpp-benchmark Modified: qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/QueueReplicator.cpp URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/QueueReplicator.cpp?rev=1245489&r1=1245488&r2=1245489&view=diff ============================================================================== --- qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/QueueReplicator.cpp (original) +++ qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/QueueReplicator.cpp Fri Feb 17 14:06:22 2012 @@ -86,7 +86,7 @@ void QueueReplicator::initializeBridge(B peer.getMessage().subscribe(args.i_src, args.i_dest, 0/*accept-explicit*/, 0/*acquire-pre-acquired*/, false, "", 0, settings); peer.getMessage().flow(getName(), 0, 0xFFFFFFFF); peer.getMessage().flow(getName(), 1, 0xFFFFFFFF); - QPID_LOG(debug, "HA: Backup activated bridge from queue " << args.i_src << " to " << args.i_dest); + QPID_LOG(debug, "HA: Backup activated bridge from " << args.i_src << " to " << args.i_dest); } void QueueReplicator::route(Deliverable& msg, const std::string& key, const qpid::framing::FieldTable* /*args*/) Modified: qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp?rev=1245489&r1=1245488&r2=1245489&view=diff ============================================================================== --- qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp (original) +++ qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp Fri Feb 17 14:06:22 2012 @@ 
-138,8 +138,9 @@ ReplicatingSubscription::ReplicatingSubs //local queue (i.e. master) is empty range.add(lwm, queue->getPosition()); } - QPID_LOG(debug, "HA: Initial set of dequeues for " << queue->getName() << " are " << range - << " (lwm=" << lwm << ", hwm=" << hwm << ", current=" << queue->getPosition() << ")"); + QPID_LOG(debug, "HA: Initial set of dequeues for " << queue->getName() << ": " + << range << " (lwm=" << lwm << ", hwm=" << hwm + << ", current=" << queue->getPosition() << ")"); //set position of 'cursor' position = hwm; } Modified: qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/WiringReplicator.cpp URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/WiringReplicator.cpp?rev=1245489&r1=1245488&r2=1245489&view=diff ============================================================================== --- qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/WiringReplicator.cpp (original) +++ qpid/branches/qpid-3603-7/qpid/cpp/src/qpid/ha/WiringReplicator.cpp Fri Feb 17 14:06:22 2012 @@ -229,18 +229,14 @@ void WiringReplicator::route(Deliverable Variant::Map& map = i->asMap(); Variant::Map& schema = map[SCHEMA_ID].asMap(); Variant::Map& values = map[VALUES].asMap(); - QPID_LOG(debug, "HA: Backup received event: schema=" << schema + QPID_LOG(trace, "HA: Backup received event: schema=" << schema << " values=" << values); if (match<EventQueueDeclare>(schema)) doEventQueueDeclare(values); else if (match<EventQueueDelete>(schema)) doEventQueueDelete(values); else if (match<EventExchangeDeclare>(schema)) doEventExchangeDeclare(values); else if (match<EventExchangeDelete>(schema)) doEventExchangeDelete(values); else if (match<EventBind>(schema)) doEventBind(values); - // FIXME aconway 2011-11-21: handle unbind & all other events. - else if (match<EventSubscribe>(schema)) {} // Deliberately ignored. 
- // FIXME aconway 2011-12-02: error handling - else throw(Exception(QPID_MSG("Backup received unexpected event, schema=" - << schema))); + // FIXME aconway 2011-11-21: handle unbind & all other relevant events. } } else if (headers->getAsString(QMF_OPCODE) == QUERY_RESPONSE) { for (Variant::List::iterator i = list.begin(); i != list.end(); ++i) { @@ -253,15 +249,13 @@ void WiringReplicator::route(Deliverable if (type == QUEUE) doResponseQueue(values); else if (type == EXCHANGE) doResponseExchange(values); else if (type == BINDING) doResponseBind(values); - else throw Exception(QPID_MSG("HA: Unexpected response type: " << type)); + // FIXME aconway 2011-12-06: handle all relevant response types. } } else { - QPID_LOG(error, QPID_MSG("HA: Backup received unexpected message: " - << *headers)); + QPID_LOG(error, "HA: Backup replication got unexpected message: " << *headers); } } catch (const std::exception& e) { - QPID_LOG(error, "HA: Backup replication error: " << e.what()); - QPID_LOG(error, "HA: Backup replication error while processing: " << list); + QPID_LOG(error, "HA: Backup replication error: " << e.what() << ": while handling: " << list); } } Modified: qpid/branches/qpid-3603-7/qpid/cpp/src/tests/ha_tests.py URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603-7/qpid/cpp/src/tests/ha_tests.py?rev=1245489&r1=1245488&r2=1245489&view=diff ============================================================================== --- qpid/branches/qpid-3603-7/qpid/cpp/src/tests/ha_tests.py (original) +++ qpid/branches/qpid-3603-7/qpid/cpp/src/tests/ha_tests.py Fri Feb 17 14:06:22 2012 @@ -118,6 +118,29 @@ class ShortTests(BrokerTest): self.assert_browse_retry(p, "foo", []) self.assert_browse_retry(b, "foo", []) + def test_sync(self): + def queue(name, replicate): + return "%s;{create:always,node:{x-declare:{arguments:{'qpid.replicate':%s}}}}"%(name, replicate) + primary = self.ha_broker(name="primary", broker_url="primary") # Temp hack to identify primary + p = 
primary.connect().session() + s = p.sender(queue("q","all")) + for m in [str(i) for i in range(0,10)]: s.send(m) + s.sync() + backup1 = self.ha_broker(name="backup1", broker_url=primary.host_port()) + for m in [str(i) for i in range(10,20)]: s.send(m) + s.sync() + backup2 = self.ha_broker(name="backup2", broker_url=primary.host_port()) + for m in [str(i) for i in range(20,30)]: s.send(m) + s.sync() + msgs = [str(i) for i in range(30)] + + b = backup1.connect().session() + self.assert_browse_retry(b, "q", msgs) + + b = backup2.connect().session() + self.assert_browse_retry(backup2.connect().session(), "q", msgs) + + if __name__ == "__main__": shutil.rmtree("brokertest.tmp", True) os.execvp("qpid-python-test", ["qpid-python-test", "-m", "ha_tests"] + sys.argv[1:]) Modified: qpid/branches/qpid-3603-7/qpid/cpp/src/tests/qpid-cluster-benchmark URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603-7/qpid/cpp/src/tests/qpid-cluster-benchmark?rev=1245489&r1=1245488&r2=1245489&view=diff ============================================================================== --- qpid/branches/qpid-3603-7/qpid/cpp/src/tests/qpid-cluster-benchmark (original) +++ qpid/branches/qpid-3603-7/qpid/cpp/src/tests/qpid-cluster-benchmark Fri Feb 17 14:06:22 2012 @@ -30,7 +30,7 @@ RECEIVERS="-r 3" BROKERS= # Local broker CLIENT_HOSTS= # No ssh, all clients are local -while getopts "m:f:n:b:q:s:r:c:txy" opt; do +while getopts "m:f:n:b:q:s:r:c:txyv" opt; do case $opt in m) MESSAGES="-m $OPTARG";; f) FLOW="--flow-control $OPTARG";; @@ -43,13 +43,15 @@ while getopts "m:f:n:b:q:s:r:c:txy" opt; t) TCP_NODELAY="--connection-options {tcp-nodelay:true}";; x) SAVE_RECEIVED="--save-received";; y) NO_DELETE="--no-delete";; + v) OPTS="--verbose";; *) echo "Unknown option"; exit 1;; esac done +REPLICATE="node:{x-declare:{arguments:{'qpid.replicate':all}}}" BROKER=$(echo $BROKERS | sed s/,.*//) run_test() { echo $*; shift; "$@"; echo; echo; echo; } -OPTS="$REPEAT $BROKERS --summarize $QUEUES $SENDERS 
$RECEIVERS $MESSAGES $CLIENT_HOSTS $SAVE_RECEIVED $TCP_NODELAY $NO_DELETE" -run_test "Queue contention:" qpid-cpp-benchmark $OPTS -run_test "No queue contention: :" qpid-cpp-benchmark $OPTS --group-receivers +OPTS="$OPTS $REPEAT $BROKERS --summarize $QUEUES $SENDERS $RECEIVERS $MESSAGES $CLIENT_HOSTS $SAVE_RECEIVED $TCP_NODELAY $NO_DELETE" +OPTS="$OPTS --create-option $REPLICATE" +run_test "Benchmark:" qpid-cpp-benchmark $OPTS Modified: qpid/branches/qpid-3603-7/qpid/cpp/src/tests/qpid-cpp-benchmark URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603-7/qpid/cpp/src/tests/qpid-cpp-benchmark?rev=1245489&r1=1245488&r2=1245489&view=diff ============================================================================== --- qpid/branches/qpid-3603-7/qpid/cpp/src/tests/qpid-cpp-benchmark (original) +++ qpid/branches/qpid-3603-7/qpid/cpp/src/tests/qpid-cpp-benchmark Fri Feb 17 14:06:22 2012 @@ -55,6 +55,8 @@ op.add_option("--send-option", default=[ help="Additional option for sending addresses") op.add_option("--receive-option", default=[], action="append", type="str", help="Additional option for receiving addresses") +op.add_option("--create-option", default=[], action="append", type="str", + help="Additional option for creating addresses") op.add_option("--send-arg", default=[], action="append", type="str", help="Additional argument for qpid-send") op.add_option("--receive-arg", default=[], action="append", type="str", @@ -75,6 +77,7 @@ op.add_option("--verbose", default=False help="Show commands executed") op.add_option("--no-delete", default=False, action="store_true", help="Don't delete the test queues.") + single_quote_re = re.compile("'") def posix_quote(string): """ Quote a string for use as an argument in a posix shell""" @@ -176,7 +179,7 @@ def queue_exists(queue,broker): return False finally: c.close() -def recreate_queues(queues, brokers, no_delete): +def recreate_queues(queues, brokers, no_delete, opts): c = qpid.messaging.Connection(brokers[0]) c.open() s = 
c.session() @@ -187,7 +190,9 @@ def recreate_queues(queues, brokers, no_ # FIXME aconway 2011-05-04: new cluster async wiring, wait for changes to propagate for b in brokers: while queue_exists(q,b): time.sleep(0.1); - s.sender("%s;{create:always}"%q) + address = "%s;{%s}"%(q, ",".join(opts.create_option + ["create:always"])) + if opts.verbose: print "Creating", address + s.sender(address) # FIXME aconway 2011-05-04: new cluster async wiring, wait for changes to propagate for b in brokers: while not queue_exists(q,b): time.sleep(0.1); @@ -285,7 +290,7 @@ def main(): queues = ["%s-%s"%(opts.queue_name, i) for i in xrange(opts.queues)] try: for i in xrange(opts.repeat): - recreate_queues(queues, opts.broker, opts.no_delete) + recreate_queues(queues, opts.broker, opts.no_delete, opts) ready_receiver = ReadyReceiver(ready_queue, opts.broker[0]) if opts.group_receivers: # Run receivers for same queue against same broker. --------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:commits-subscr...@qpid.apache.org