Author: aconway Date: Wed Oct 13 20:09:18 2010 New Revision: 1022279 URL: http://svn.apache.org/viewvc?rev=1022279&view=rev Log: Test client to measure cluster lag: greater delays in responses from some brokers.
Not yet incorporated into an automated test, but this is a useful stand-alone test client. Added: qpid/trunk/qpid/cpp/src/tests/qpid-cluster-lag.py (with props) Modified: qpid/trunk/qpid/cpp/src/tests/qpid-test-cluster Added: qpid/trunk/qpid/cpp/src/tests/qpid-cluster-lag.py URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/qpid-cluster-lag.py?rev=1022279&view=auto ============================================================================== --- qpid/trunk/qpid/cpp/src/tests/qpid-cluster-lag.py (added) +++ qpid/trunk/qpid/cpp/src/tests/qpid-cluster-lag.py Wed Oct 13 20:09:18 2010 @@ -0,0 +1,93 @@ +#!/usr/bin/env python + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +"""%prog [options] broker... 
+Check for brokers that lag behind other brokers in a cluster.""" + +import os, os.path, sys, socket, time, re +from qpid.messaging import * +from optparse import OptionParser +from threading import Thread + +class Browser(Thread): + def __init__(self, broker, queue, timeout): + Thread.__init__(self) + self.broker = broker + self.queue = queue + self.timeout = timeout + self.error = None + self.time = None + + def run(self): + try: + self.connection = Connection(self.broker) + self.connection.open() + self.session = self.connection.session() + self.receiver = self.session.receiver("%s;{mode:browse}"%self.queue) + self.msg = self.receiver.fetch(timeout=self.timeout) + self.time = time.time() + if (self.msg.content != self.queue): + raise Exception("Wrong message content, expected '%s' found '%s'"% + (self.queue, self.msg.content)) + except Empty: + self.error = "No message on queue %s"%self.queue + except Exception, e: + self.error = "Error: %s"%e + +def main(argv): + op = OptionParser(usage=__doc__) + op.add_option("--timeout", type="float", default=None, metavar="TIMEOUT", + help="Give up after TIMEOUT milliseconds, default never timeout") + (opts, args) = op.parse_args(argv) + if (len(args) <= 1): op.error("No brokers were specified") + brokers = args[1:] + + # Put a message on a uniquely named queue. 
+ queue = "%s:%s:%s"%(os.path.basename(args[0]), socket.gethostname(), os.getpid()) + connection = Connection(brokers[0]) + connection.open() + session = connection.session() + sender = session.sender( + "%s;{create:always,delete:always,node:{durable:False}}"%queue) + sender.send(Message(content=queue)) + start = time.time() + # Browse for the message on each broker + if opts.timeout: opts.timeout + threads = [Browser(b, queue, opts.timeout) for b in brokers] + for t in threads: t.start() + delays=[] + + for t in threads: + t.join() + if t.error: + delay=t.error + else: + delay = t.time-start + delays.append([delay, t.broker]) + print "%s: %s"%(t.broker,delay) + if delays: + delays.sort() + print "lag: %s (%s-%s)"%(delays[-1][0] - delays[0][0], delays[-1][1], delays[0][1]) + # Clean up + sender.close() + session.close() + connection.close() + +if __name__ == "__main__": sys.exit(main(sys.argv)) Propchange: qpid/trunk/qpid/cpp/src/tests/qpid-cluster-lag.py ------------------------------------------------------------------------------ svn:executable = * Modified: qpid/trunk/qpid/cpp/src/tests/qpid-test-cluster URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/qpid-test-cluster?rev=1022279&r1=1022278&r2=1022279&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/tests/qpid-test-cluster (original) +++ qpid/trunk/qpid/cpp/src/tests/qpid-test-cluster Wed Oct 13 20:09:18 2010 @@ -18,9 +18,6 @@ # under the License. # -DEFAULT_CONF=~/qpid-test-qpidd.conf -DEFAULT_ENV=~/qpid-test-env.sh - usage() { echo "Usage: `basename $0` [options] start|stop|restart|check [qpidd-args] Start/stop/restart a cluster on hosts in \$HOSTS via ssh. 
@@ -35,8 +32,8 @@ Options: exit 1 } -absdir() { echo `cd $1 && pwd`; } -copyall() { for h in $HOSTS; do rsync $1 $RSYNC_USER$h:$(absdir `dirname $1`); done; } +DEFAULT_CONF=~/qpid-test-qpidd.conf +DEFAULT_ENV=~/qpid-test-env.sh test -f $DEFAULT_CONF && CONF_FILE=$DEFAULT_CONF test -f $DEFAULT_ENV && ENV_FILE=$DEFAULT_ENV @@ -54,21 +51,21 @@ CMD=$1; shift QPIDD_ARGS="$QPIDD_ARGS $*" if test -n "$CONF_FILE"; then - copyall $CONF_FILE + RSYNCFILES="$RSYNCFILES $CONF_FILE" QPIDD_ARGS="$QPIDD_ARGS --config $CONF_FILE" QPID_PORT=${QPID_PORT:-`awk -F= '/^ *port=/ {print $2}' $CONF_FILE`} fi if test -n "$ENV_FILE"; then - copyall $ENV_FILE + RSYNCFILES="$RSYNCFILES $ENV_FILE" SOURCE_ENV="source $ENV_FILE && " fi - +test -n "$RSYNCFILES" && rsynchosts $RSYNCFILES do_start() { for h in $HOSTS; do COMMAND="qpidd -d $QPIDD_ARGS" id -nG | grep '\<ais\>' >/dev/null && COMMAND="sg ais -c '$COMMAND'" - ssh $SSHOPTS $h "$SOURCE_ENV $COMMAND" + ssh $SSHOPTS $h "$SOURCE_ENV $COMMAND" || { echo "error on $h: $COMMAND"; exit 1; } done } --------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:commits-subscr...@qpid.apache.org