Author: kpvdr Date: Wed May 6 16:57:38 2009 New Revision: 772359 URL: http://svn.apache.org/viewvc?rev=772359&view=rev Log: Added the ability to start and stop a test broker from within the Python test framework. Also added some cluster test functionality.
Modified: qpid/trunk/qpid/python/qpid/testlib.py Modified: qpid/trunk/qpid/python/qpid/testlib.py URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/testlib.py?rev=772359&r1=772358&r2=772359&view=diff ============================================================================== --- qpid/trunk/qpid/python/qpid/testlib.py (original) +++ qpid/trunk/qpid/python/qpid/testlib.py Wed May 6 16:57:38 2009 @@ -53,6 +53,7 @@ class TestRunner: SPEC_FOLDER = "../specs" + qpidd = os.getenv("QPIDD") """Runs unit tests. @@ -73,6 +74,9 @@ 0-10-errata - use the 0-10 specification with qpid errata. -e/--errata <errata.xml> : file containing amqp XML errata -b/--broker [amqps://][<user>[/<password>]...@]<host>[:<port>] : broker to connect to + -B/--start-broker <broker-args> : start a local broker using broker-args; set QPIDD + env to point to broker executable. broker-args will be + prepended with "--daemon --port=0" -v/--verbose : verbose - lists tests as they are run. -d/--debug : enable debug logging. -i/--ignore <test> : ignore the named test. 
@@ -82,6 +86,34 @@ """ sys.exit(1) + def startBroker(self, brokerArgs): + """Start a single broker daemon""" + if TestRunner.qpidd == None: + self._die("QPIDD environment var must point to qpidd when using -B/--start-broker") + cmd = "%s --daemon --port=0 %s" % (TestRunner.qpidd, brokerArgs) + portStr = os.popen(cmd).read() + if len(portStr) == 0: + self._die("%s failed to start" % TestRunner.qpidd) + port = int(portStr) + pid = int(os.popen("%s -p %d -c" % (TestRunner.qpidd, port)).read()) + print "Started broker: pid=%d, port=%d" % (pid, port) + self.brokerTuple = (pid, port) + self.setBroker("localhost:%d" % port) + + def stopBroker(self): + """Stop the broker using qpidd -q""" + if self.brokerTuple: + ret = os.spawnl(os.P_WAIT, TestRunner.qpidd, TestRunner.qpidd, "--port=%d" % self.brokerTuple[1], "-q") + if ret != 0: + self._die("stop_node(): pid=%d port=%d: qpidd -q returned %d" % (self.brokerTuple[0], self.brokerTuple[1], ret)) + print "Stopped broker: pid=%d, port=%d" % self.brokerTuple + + def killBroker(self): + """Kill the broker using kill -9 (SIGTERM)""" + if self.brokerTuple: + os.kill(self.brokerTuple[0], signal.SIGTERM) + print "Killed broker: pid=%d, port=%d" % self.brokerTuple + def setBroker(self, broker): try: self.url = URL(broker) @@ -122,17 +154,22 @@ self.skip_self_test = False try: - opts, self.tests = getopt(args, "s:e:b:h?dvSi:I:F:", + opts, self.tests = getopt(args, "s:e:b:B:h?dvSi:I:F:", ["help", "spec", "errata=", "broker=", - "verbose", "skip-self-test", "ignore", + "start-broker=", "verbose", "skip-self-test", "ignore", "ignore-file", "spec-folder"]) except GetoptError, e: self._die(str(e)) + # check for mutually exclusive options + if "-B" in opts or "--start-broker" in opts: + if "-b" in opts or "--broker" in opts: + self._die("Cannot use -B/--start-broker and -b/broker options together") for opt, value in opts: if opt in ("-?", "-h", "--help"): self._die() if opt in ("-s", "--spec"): self.specfile = value if opt in ("-e", 
"--errata"): self.errata.append(value) if opt in ("-b", "--broker"): self.setBroker(value) + if opt in ("-B", "--start-broker"): self.startBroker(value) if opt in ("-v", "--verbose"): self.verbose = 2 if opt in ("-d", "--debug"): logging.basicConfig(level=logging.DEBUG) if opt in ("-i", "--ignore"): self.ignore.append(value) @@ -182,6 +219,7 @@ return unittest.defaultTestLoader.loadTestsFromNames(self.tests) def run(self, args=sys.argv[1:]): + self.brokerTuple = None self._parseargs(args) runner = unittest.TextTestRunner(descriptions=False, verbosity=self.verbose) @@ -193,6 +231,7 @@ for t in self.ignore: print t print "=======================================" + self.stopBroker() return result.wasSuccessful() def connect(self, host=None, port=None, spec=None, user=None, password=None, tune_params=None): @@ -390,3 +429,196 @@ session.message_subscribe(**keys) session.message_flow(destination=consumer_tag, unit=0, value=0xFFFFFFFFL) session.message_flow(destination=consumer_tag, unit=1, value=0xFFFFFFFFL) + + +class TestBaseCluster(unittest.TestCase): + """ + Base class for cluster tests. Provides methods for starting and stopping clusters and cluster nodes. + """ + _tempDir = os.getenv("TMPDIR") + _qpidd = os.getenv("QPIDD") + _storeLib = os.getenv("LIBSTORE") + _clusterLib = os.getenv("LIBCLUSTER") + + # --- Cluster helper functions --- + + """ + _clusterDict is a dictionary of clusters: + key = cluster name (string) + val = dictionary of node numbers: + key = node number (int) + val = tuple containing (pid, port) + For example, two clusters "TestCluster0" and "TestCluster1" containing several nodes would look as follows: + {"TestCluster0": {0: (pid0-0, port0-0), 1: (pid0-1, port0-1), ...}, "TestCluster1": {0: (pid1-0, port1-0), 1: (pid1-1, port1-1), ...}} + where pidm-n and portm-n are the int pid and port for TestCluster m node n respectively. 
+ """ + _clusterDict = {} + + """Index for (pid, port) tuple""" + PID = 0 + PORT = 1 + + def startBroker(self, qpiddArgs, logFile = None): + """Start a single broker daemon, returns tuple (pid, port)""" + if self._qpidd == None: + raise Exception("Environment variable QPIDD is not set") + cmd = "%s --daemon --port=0 %s" % (self._qpidd, qpiddArgs) + portStr = os.popen(cmd).read() + if len(portStr) == 0: + err = "Broker daemon startup failed." + if logFile != None: + err += " See log file %s" % logFile + raise Exception(err) + port = int(portStr) + pid = int(os.popen("%s -p %d -c" % (self._qpidd, port)).read()) + #print "started broker: pid=%d, port=%d" % (pid, port) + return (pid, port) + + def createClusterNode(self, nodeNumber, clusterName): + """Create a node and add it to the named cluster""" + if self._tempDir == None: + raise Exception("Environment variable TMPDIR is not set") + if self._storeLib == None: + raise Exception("Environment variable LIBSTORE is not set") + if self._clusterLib == None: + raise Exception("Environment variable LIBCLUSTER is not set") + name = "%s-%d" % (clusterName, nodeNumber) + dataDir = os.path.join(self._tempDir, "cluster", name) + logFile = "%s.log" % dataDir + args = "--no-module-dir --load-module=%s --load-module=%s --data-dir=%s --cluster-name=%s --auth=no --log-enable=error+ --log-to-file=%s" % \ + (self._storeLib, self._clusterLib, dataDir, clusterName, logFile) + self._clusterDict[clusterName][nodeNumber] = self.startBroker(args, logFile) + + def createCluster(self, clusterName, numberNodes): + """Create a cluster containing an initial number of nodes""" + self._clusterDict[clusterName] = {} + for n in range(0, numberNodes): + self.createClusterNode(n, clusterName) + + def getTupleList(self): + """Get list of (pid, port) tuples of all known cluster brokers""" + tList = [] + for l in self._clusterDict.itervalues(): + for t in l.itervalues(): + tList.append(t) + return tList + + def getNumBrokers(self): + """Get total number 
of brokers in all known clusters""" + return len(self.getTupleList()) + + def checkNumBrokers(self, expected): + """Check that the total number of brokers in all known clusters is the expected value""" + if self.getNumBrokers() != expected: + raise Exception("Unexpected number of brokers: expected %d, found %d" % (expected, self.getNumBrokers())) + + def getClusterTupleList(self, clusterName): + """Get list of (pid, port) tuples of all nodes in named cluster""" + return self._clusterDict[clusterName].values() + + def getNumClusterBrokers(self, clusterName): + """Get total number of brokers in named cluster""" + return len(self.getClusterTupleList(clusterName)) + + def getNodeTuple(self, nodeNumber, clusterName): + """Get the (pid, port) tuple for the given cluster node""" + return self._clusterDict[clusterName][nodeNumber] + + def checkNumClusterBrokers(self, clusterName, expected): + """Check that the total number of brokers in the named cluster is the expected value""" + if self.getNumClusterBrokers(clusterName) != expected: + raise Exception("Unexpected number of brokers in cluster %s: expected %d, found %d" % \ + (clusterName, expected, self.getNumClusterBrokers(clusterName))) + + def clusterExists(self, clusterName): + """ Return True if clusterName exists, False otherwise""" + return clusterName in self._clusterDict.keys() + + def clusterNodeExists(self, clusterName, nodeNumber): + """ Return True if nodeNumber in clusterName exists, False otherwise""" + if clusterName in self._clusterDict.keys(): + return nodeNumber in self._clusterDict[nodeName] + return False + + def createCheckCluster(self, clusterName, size): + """Create a cluster using the given name and size, then check the number of brokers""" + self.createCluster(clusterName, size) + self.checkNumClusterBrokers(clusterName, size) + + # Kill cluster nodes using signal 9 + + def killNode(self, nodeNumber, clusterName, updateDict = True): + """Kill the given node in the named cluster using kill -9""" + 
pid = self.getNodeTuple(nodeNumber, clusterName)[self.PID] + os.kill(pid, signal.SIGTERM) + #print "killed broker: pid=%d" % pid + if updateDict: + del(self._clusterDict[clusterName][nodeNumber]) + + def killCluster(self, clusterName, updateDict = True): + """Kill all nodes in the named cluster""" + for n in self._clusterDict[clusterName].iterkeys(): + self.killNode(n, clusterName, False) + if updateDict: + del(self._clusterDict[clusterName]) + + def killClusterCheck(self, clusterName): + """Kill the named cluster and check that the name is removed from the cluster dictionary""" + self.killCluster(clusterName) + if self.clusterExists(clusterName): + raise Exception("Unable to kill cluster %s; %d nodes still exist" % \ + (clusterName, self.getNumClusterBrokers(clusterName))) + + def killAllClusters(self): + """Kill all known clusters""" + for n in self._clusterDict.iterkeys(): + self.killCluster(n, False) + self._clusterDict.clear() + + def killAllClustersCheck(self): + """Kill all known clusters and check that the cluster dictionary is empty""" + self.killAllClusters() + self.checkNumBrokers(0) + + # Stop cluster nodes using qpidd -q + + def stopNode(self, nodeNumber, clusterName, updateDict = True): + """Stop the given node in the named cluster using qpidd -q""" + port = self.getNodeTuple(nodeNumber, clusterName)[self.PORT] + ret = os.spawnl(os.P_WAIT, self._qpidd, self._qpidd, "--port=%d" % port, "-q") + if ret != 0: + raise Exception("stop_node(): cluster=\"%s\" nodeNumber=%d pid=%d port=%d: qpidd -q returned %d" % \ + (clusterName, nodeNumber, self.getNodeTuple(nodeNumber, clusterName)[self.PID], port, ret)) + #print "stopped broker: port=%d" % port + if updateDict: + del(self._clusterDict[clusterName][nodeNumber]) + + def stopAllClusters(self): + """Stop all known clusters""" + for n in self._clusterDict.iterkeys(): + self.stopCluster(n, False) + self._clusterDict.clear() + + + def stopCluster(self, clusterName, updateDict = True): + """Stop all nodes in the 
named cluster""" + for n in self._clusterDict[clusterName].iterkeys(): + self.stopNode(n, clusterName, False) + if updateDict: + del(self._clusterDict[clusterName]) + + def stopCheckCluster(self, clusterName): + """Stop the named cluster and check that the name is removed from the cluster dictionary""" + self.stopCluster(clusterName) + if self.clusterExists(clusterName): + raise Exception("Unable to kill cluster %s; %d nodes still exist" % (clusterName, self.getNumClusterBrokers(clusterName))) + def stopCheckAll(self): + """Kill all known clusters and check that the cluster dictionary is empty""" + self.stopAllClusters() + self.checkNumBrokers(0) + + def setUp(self): + pass + + def tearDown(self): + pass --------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:commits-subscr...@qpid.apache.org