Author: aconway
Date: Tue Nov  8 14:41:43 2011
New Revision: 1199265

URL: http://svn.apache.org/viewvc?rev=1199265&view=rev
Log:
NO-JIRA: Improvements to benchmark scripts.

Modified:
    qpid/trunk/qpid/cpp/src/tests/qpid-cluster-benchmark
    qpid/trunk/qpid/cpp/src/tests/qpid-cpp-benchmark

Modified: qpid/trunk/qpid/cpp/src/tests/qpid-cluster-benchmark
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/qpid-cluster-benchmark?rev=1199265&r1=1199264&r2=1199265&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/qpid-cluster-benchmark (original)
+++ qpid/trunk/qpid/cpp/src/tests/qpid-cluster-benchmark Tue Nov  8 14:41:43 2011
@@ -1,4 +1,4 @@
-#!/bin/bash
+#!/bin/sh
 #
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
@@ -20,39 +20,35 @@
 
 # Benchmark script for comparing cluster performance.
 
-# Default values
-PORT="5672"
-COUNT=10000
-FLOW=100             # Flow control limit on queue depth for latency.
-REPEAT=10
-QUEUES=4
-CLIENTS=3
+# Default options
+MESSAGES="-m 10000"
+FLOW="--flow-control 100"            # Flow control limit on queue depth for latency.
+REPEAT="--repeat 10"
+QUEUES="-q 6"
+SENDERS="-s 3"
+RECEIVERS="-r 3"
+BROKERS=                       # Local broker
+CLIENT_HOSTS=                  # No ssh, all clients are local
 
-while getopts "p:c:f:r:t:b:q:c" opt; do
+while getopts "m:f:n:b:q:s:r:c:x:t" opt; do
     case $opt in
-       p) PORT=$OPTARG;;
-       c) COUNT=$OPTARG;;
-       f) FLOW=$OPTARG;;
-       r) REPEAT=$OPTARG;;
-       s) SCALE=$OPTARG;;
-       b) BROKERS=$OPTARG;;
-       q) QUEUES=$OPTARG;;
-       c) CLIENTS=$OPTARG;;
+       m) MESSAGES="-m $OPTARG";;
+       f) FLOW="--flow-control $OPTARG";;
+       n) REPEAT="--repeat $OPTARG";;
+       b) BROKERS="-b $OPTARG";;
+       q) QUEUES="-q $OPTARG";;
+       s) SENDERS="-s $OPTARG";;
+       r) RECEIVERS="-r $OPTARG";;
+       c) CLIENT_HOSTS="-c $OPTARG";;
+       x) SAVE_RECEIVED="--save-received";;
+       t) TCP_NODELAY="--connection-options {tcp-nodelay:true}";;
        *) echo "Unknown option"; exit 1;;
     esac
 done
-
-BROKERS=${BROKERS:-$(echo $HOSTS | sed "s/\>/:$PORT/g;s/ /,/g")} # Broker URL list
-BROKER=`echo $BROKERS | awk -F, '{print $1}'` # First broker
-
+BROKER=$(echo $BROKERS | sed s/,.*//)
 run_test() { echo $*; shift; "$@"; echo; echo; echo; }
 
-# Multiple pubs/subs connect via multiple brokers (active-active)
-run_test "multi-host-thruput" qpid-cpp-benchmark --repeat $REPEAT -b $BROKERS --no-timestamp --summarize -q$QUEUES -s$CLIENTS -r$CLIENTS -m $COUNT
-
-# Multiple pubs/subs connect via single broker (active-passive)
-run_test "single-host-thruput" qpid-cpp-benchmark --repeat $REPEAT -b $BROKER --no-timestamp --summarize -q$QUEUES -s$CLIENTS -r$CLIENTS -m $COUNT
-
-# Latency
-run_test "latency" qpid-cpp-benchmark --repeat $REPEAT -b $BROKER --connection-options '{tcp-nodelay:true}' -m $COUNT --flow-control $FLOW
+OPTS="$REPEAT $BROKERS --summarize $QUEUES $SENDERS $RECEIVERS $MESSAGES $CLIENT_HOSTS $SAVE_RECEIVED $TCP_NODELAY"
+run_test "Queue contention:" qpid-cpp-benchmark $OPTS
+run_test "No queue contention: :" qpid-cpp-benchmark $OPTS --group-receivers
 

Modified: qpid/trunk/qpid/cpp/src/tests/qpid-cpp-benchmark
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/qpid-cpp-benchmark?rev=1199265&r1=1199264&r2=1199265&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/qpid-cpp-benchmark (original)
+++ qpid/trunk/qpid/cpp/src/tests/qpid-cpp-benchmark Tue Nov  8 14:41:43 2011
@@ -67,14 +67,19 @@ op.add_option("--flow-control", default=
               help="Flow control each sender to limit queue depth to 2*N. 0 means no flow control.")
 op.add_option("--durable", default=False, action="store_true",
               help="Use durable queues and messages")
-
+op.add_option("--save-received", default=False, action="store_true",
+              help="Save received message content to files <queuename>-receiver-<n>.msg")
+op.add_option("--group-receivers", default=False, action="store_true",
+              help="Run receivers for the same queue on the same host.")
+op.add_option("--verbose", default=False, action="store_true",
+              help="Show commands executed")
 single_quote_re = re.compile("'")
 def posix_quote(string):
     """ Quote a string for use as an argument in a posix shell"""
     return "'" + single_quote_re.sub("\\'", string) + "'";
 
 def ssh_command(host, command):
-    """Convert command into an ssh command on host with quoting"""
+    """ Convert command into an ssh command on host with quoting"""
     return ["ssh", host] + [posix_quote(arg) for arg in command]
 
 class Clients:
@@ -89,10 +94,16 @@ class Clients:
             try: c.kill()
             except: pass
 
+class  PopenCommand(Popen):
+    """Like Popen but you can query for the command"""
+    def __init__(self, command, *args, **kwargs):
+        self.command = command
+        Popen.__init__(self, command, *args, **kwargs)
+
 clients = Clients()
 
 def start_receive(queue, index, opts, ready_queue, broker, host):
-    address_opts=["create:receiver"] + opts.receive_option
+    address_opts=opts.receive_option
     if opts.durable: address_opts += ["node:{durable:true}"]
     address="%s;{%s}"%(queue,",".join(address_opts))
     msg_total=opts.senders*opts.messages
@@ -108,17 +119,20 @@ def start_receive(queue, index, opts, re
                "--receive-rate", str(opts.receive_rate),
                "--report-total",
                "--ack-frequency", str(opts.ack_frequency),
-               "--ready-address", ready_queue,
+               "--ready-address", "%s;{create:always}"%ready_queue,
                "--report-header=no"
                ]
+    if opts.save_received:
+        command += ["--save-content=%s-receiver-%s.msg"%(queue,index)]
     command += opts.receive_arg
     if opts.connection_options:
         command += ["--connection-options",opts.connection_options]
     if host: command = ssh_command(host, command)
-    return clients.add(Popen(command, stdout=PIPE))
+    if opts.verbose: print "Receiver: ", command
+    return clients.add(PopenCommand(command, stdout=PIPE, stderr=PIPE))
 
 def start_send(queue, opts, broker, host):
-    address="%s;{%s}"%(queue,",".join(opts.send_option))
+    address="%s;{%s}"%(queue,",".join(opts.send_option + ["create:always"]))
     command = ["qpid-send",
                "-b", broker,
                "-a", address,
@@ -136,33 +150,52 @@ def start_send(queue, opts, broker, host
     if opts.connection_options:
         command += ["--connection-options",opts.connection_options]
     if host: command = ssh_command(host, command)
-    return clients.add(Popen(command, stdout=PIPE))
+    if opts.verbose: print "Sender: ", command
+    return clients.add(PopenCommand(command, stdout=PIPE, stderr=PIPE))
+
+def error_msg(out, err):
+    return ("\n[stdout]\n%s\n[stderr]\n%s[end]"%(out, err))
 
 def first_line(p):
     out,err=p.communicate()
-    if p.returncode != 0: raise Exception("Process failed: %s"%(out.strip()))
+    if p.returncode != 0:
+        raise Exception("Process exit %d: %s"%(p.returncode, error_msg(out,err)))
     return out.split("\n")[0]
 
-def delete_queues(queues, broker):
+def queue_exists(queue,broker):
     c = qpid.messaging.Connection(broker)
     c.open()
-    for q in queues:
+    try:
+        s = c.session()
         try:
-            s = c.session()
-            snd = s.sender("%s;{delete:always}"%(q))
-            snd.close()
-            s.sync()
-        except qpid.messaging.exceptions.NotFound: pass # Ignore "no such queue"
+            s.sender(queue)
+            return True
+        except qpid.messaging.exceptions.NotFound:
+            return False
+    finally: c.close()
+
+def recreate_queues(queues, brokers):
+    c = qpid.messaging.Connection(brokers[0])
+    c.open()
+    s = c.session()
+    for q in queues:
+        try: s.sender("%s;{delete:always}"%(q)).close()
+        except qpid.messaging.exceptions.NotFound: pass
+        # FIXME aconway 2011-05-04: new cluster async wiring, wait for changes to propagate
+        for b in brokers:
+            while queue_exists(q,b): time.sleep(0.1);
+        s.sender("%s;{create:always}"%q)
+        # FIXME aconway 2011-05-04: new cluster async wiring, wait for changes to propagate
+        for b in brokers:
+            while not queue_exists(q,b): time.sleep(0.1);
     c.close()
 
 def print_header(timestamp):
-    if timestamp: latency_header="\tl-min\tl-max\tl-avg"
+    if timestamp: latency_header="\tl-min\tl-max\tl-avg\ttotal-tp"
     else: latency_header=""
-    print "send-tp\t\trecv-tp%s"%latency_header
+    print "send-tp\trecv-tp%s"%latency_header
 
 def parse(parser, lines):               # Parse sender/receiver output
-    for l in lines:
-        fn_val = zip(parser, l)
     return [map(lambda p: p[0](p[1]), zip(parser,line.split())) for line in lines]
 
 def parse_senders(senders):
@@ -171,32 +204,35 @@ def parse_senders(senders):
 def parse_receivers(receivers):
     return parse([int,float,float,float],[first_line(p) for p in receivers if p])
 
-def print_data(send_stats, recv_stats):
+def print_data(send_stats, recv_stats, total_tp):
     for send,recv in map(None, send_stats, recv_stats):
         line=""
         if send: line += "%d"%send[0]
         if recv:
-            line += "\t\t%d"%recv[0]
+            line += "\t%d"%recv[0]
             if len(recv) == 4: line += "\t%.2f\t%.2f\t%.2f"%tuple(recv[1:])
+        if total_tp is not None:
+            line += "\t%d"%total_tp
+            total_tp = None
         print line
 
-def print_summary(send_stats, recv_stats):
+def print_summary(send_stats, recv_stats, total_tp):
     def avg(s): sum(s) / len(s)
     send_tp = sum([l[0] for l in send_stats])
     recv_tp = sum([l[0] for l in recv_stats])
-    summary = "%d\t\t%d"%(send_tp, recv_tp)
+    summary = "%d\t%d"%(send_tp, recv_tp)
     if recv_stats and len(recv_stats[0]) == 4:
         l_min = sum(l[1] for l in recv_stats)/len(recv_stats)
         l_max = sum(l[2] for l in recv_stats)/len(recv_stats)
         l_avg = sum(l[3] for l in recv_stats)/len(recv_stats)
         summary += "\t%.2f\t%.2f\t%.2f"%(l_min, l_max, l_avg)
+    summary += "\t%d"%total_tp
     print summary
 
 
 class ReadyReceiver:
     """A receiver for ready messages"""
     def __init__(self, queue, broker):
-        delete_queues([queue], broker)
         self.connection = qpid.messaging.Connection(broker)
         self.connection.open()
         self.receiver = self.connection.session().receiver(
@@ -212,7 +248,8 @@ class ReadyReceiver:
             for r in receivers:
                 if (r.poll() is not None):
                     out,err=r.communicate()
-                    raise Exception("Receiver error: %s"%(out))
+                    raise Exception("Receiver error: %s\n%s" %
+                                    (" ".join(r.command), error_msg(out,err)))
             raise Exception("Timed out waiting for receivers to be ready")
 
 def flatten(l):
@@ -231,9 +268,12 @@ class RoundRobin:
 
 def main():
     opts, args = op.parse_args()
-    if not opts.broker: opts.broker = ["127.0.0.1"] # Deafult to local broker
-    opts.broker = flatten(opts.broker)
     opts.client_host = flatten(opts.client_host)
+    if not opts.broker:
+        if opts.client_host:
+            raise Exception("--broker must be specified if --client_host is.")
+        opts.broker = ["127.0.0.1"] # Deafult to local broker
+    opts.broker = flatten(opts.broker)
     brokers = RoundRobin(opts.broker)
     client_hosts = RoundRobin(opts.client_host)
     send_out = ""
@@ -242,19 +282,33 @@ def main():
     queues = ["%s-%s"%(opts.queue_name, i) for i in xrange(opts.queues)]
     try:
         for i in xrange(opts.repeat):
-            delete_queues(queues, opts.broker[0])
+            recreate_queues(queues, opts.broker)
             ready_receiver = ReadyReceiver(ready_queue, opts.broker[0])
-            receivers = [start_receive(q, j, opts, ready_queue, brokers.next(), client_hosts.next())
-                         for q in queues for j in xrange(opts.receivers)]
+
+            if opts.group_receivers: # Run receivers for same queue against same broker.
+                receivers = []
+                for q in queues:
+                    b = brokers.next()
+                    for j in xrange(opts.receivers):
+                        receivers.append(
+                            start_receive(q, j, opts, ready_queue, b, client_hosts.next()))
+            else:                       # Don't group receivers
+                receivers = [start_receive(q, j, opts, ready_queue,
+                                           brokers.next(), client_hosts.next())
+                             for q in queues for j in xrange(opts.receivers)]
+
             ready_receiver.wait(filter(None, receivers)) # Wait for receivers to be ready.
+            start = time.time()
             senders = [start_send(q, opts,brokers.next(), client_hosts.next())
                        for q in queues for j in xrange(opts.senders)]
             if opts.report_header and i == 0: print_header(opts.timestamp)
+            for p in senders + receivers: p.wait()
+            total_sent = opts.queues * opts.senders * opts.messages
+            total_tp = total_sent / (time.time()-start)
             send_stats=parse_senders(senders)
             recv_stats=parse_receivers(receivers)
-            if opts.summarize: print_summary(send_stats, recv_stats)
-            else: print_data(send_stats, recv_stats)
-            delete_queues(queues, opts.broker[0])
+            if opts.summarize: print_summary(send_stats, recv_stats, total_tp)
+            else: print_data(send_stats, recv_stats, total_tp)
     finally: clients.kill()             # No strays
 
 if __name__ == "__main__": main()



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscr...@qpid.apache.org

Reply via email to