Author: kpvdr
Date: Tue May 26 18:22:48 2009
New Revision: 778827

URL: http://svn.apache.org/viewvc?rev=778827&view=rev
Log:
Persistent cluster test added which checks for recovery of queue and messages after all nodes in a cluster are killed. Test does not run if no store is loaded.
Modified: qpid/trunk/qpid/cpp/src/tests/cluster.py qpid/trunk/qpid/cpp/src/tests/testlib.py Modified: qpid/trunk/qpid/cpp/src/tests/cluster.py URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/cluster.py?rev=778827&r1=778826&r2=778827&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/tests/cluster.py (original) +++ qpid/trunk/qpid/cpp/src/tests/cluster.py Tue May 26 18:22:48 2009 @@ -270,14 +270,58 @@ except: self.killAllClusters() raise + + def test_Cluster_12_KillAllNodesRecoverMessages(self): + """Create a cluster, add and delete messages, kill all nodes then recover cluster and messages""" + if not self._storeEnable: + print " No store loaded, skipped" + return + try: + clusterName = "cluster-12" + exchangeName = "test-exchange-12" + queueName = "test-queue-12" + self.createCheckCluster(clusterName, 4) + self.createBindDirectExchangeQueue(2, clusterName, exchangeName, queueName) + txMsgs = self.sendMsgs(0, clusterName, exchangeName, queueName, 20) + rxMsgs = self.receiveMsgs(1, clusterName, queueName, 10) + txMsgs += self.sendMsgs(2, clusterName, exchangeName, queueName, 20) + rxMsgs += self.receiveMsgs(3, clusterName, queueName, 20) + self.killNode(0, clusterName) + self.createClusterNode(4, clusterName) + self.checkNumClusterBrokers(clusterName, 4) + txMsgs += self.sendMsgs(4, clusterName, exchangeName, queueName, 20) + rxMsgs += self.receiveMsgs(1, clusterName, queueName, 20) + self.killNode(2, clusterName) + self.createClusterNode(0, clusterName) + self.createClusterNode(5, clusterName) + self.checkNumClusterBrokers(clusterName, 5) + txMsgs += self.sendMsgs(0, clusterName, exchangeName, queueName, 20) + rxMsgs += self.receiveMsgs(3, clusterName, queueName, 20) + self.killAllClusters() + self.checkNumClusterBrokers(clusterName, 0) + self.createCluster(clusterName) + self.createClusterNode(3, clusterName) # last node to be used + self.createClusterNode(0, clusterName) + 
self.createClusterNode(1, clusterName) + self.createClusterNode(2, clusterName) + self.createClusterNode(4, clusterName) + self.createClusterNode(5, clusterName) + rxMsgs += self.receiveMsgs(0, clusterName, queueName, 10) + if txMsgs != rxMsgs: + print "txMsgs=%s" % txMsgs + print "rxMsgs=%s" % rxMsgs + self.fail("Send - receive message mismatch") + except: + self.killAllClusters() + raise - def test_Cluster_12_TopicExchange(self): + def test_Cluster_13_TopicExchange(self): """Create topic exchange in a cluster and make sure it replicates correctly""" try: - clusterName = "cluster-12" + clusterName = "cluster-13" self.createCheckCluster(clusterName, 4) - topicExchangeName = "test-exchange-12" - topicQueueNameKeyList = {"test-queue-12-A" : "#.A", "test-queue-12-B" : "#.B", "test-queue-12-C" : "C.#", "test-queue-12-D" : "D.#"} + topicExchangeName = "test-exchange-13" + topicQueueNameKeyList = {"test-queue-13-A" : "#.A", "test-queue-13-B" : "#.B", "test-queue-13-C" : "C.#", "test-queue-13-D" : "D.#"} self.createBindTopicExchangeQueues(2, clusterName, topicExchangeName, topicQueueNameKeyList) # Place initial messages @@ -293,16 +337,16 @@ self.createClusterNode(5, clusterName) self.checkNumClusterBrokers(clusterName, 4) # Pull 10 messages from each queue - rxMsgsA = self.receiveMsgs(1, clusterName, "test-queue-12-A", 10) # (10, 20, 10, 10) - rxMsgsB = self.receiveMsgs(3, clusterName, "test-queue-12-B", 10) # (10, 10, 10, 10) - rxMsgsC = self.receiveMsgs(4, clusterName, "test-queue-12-C", 10) # (10, 10, 0, 10) - rxMsgsD = self.receiveMsgs(5, clusterName, "test-queue-12-D", 10) # (10, 10, 0, 0) + rxMsgsA = self.receiveMsgs(1, clusterName, "test-queue-13-A", 10) # (10, 20, 10, 10) + rxMsgsB = self.receiveMsgs(3, clusterName, "test-queue-13-B", 10) # (10, 10, 10, 10) + rxMsgsC = self.receiveMsgs(4, clusterName, "test-queue-13-C", 10) # (10, 10, 0, 10) + rxMsgsD = self.receiveMsgs(5, clusterName, "test-queue-13-D", 10) # (10, 10, 0, 0) # Kill and add another node 
self.killNode(4, clusterName) self.createClusterNode(6, clusterName) self.checkNumClusterBrokers(clusterName, 4) # Add two more queues - self.createBindTopicExchangeQueues(6, clusterName, topicExchangeName, {"test-queue-12-E" : "#.bye.A", "test-queue-12-F" : "#.bye.B"}) + self.createBindTopicExchangeQueues(6, clusterName, topicExchangeName, {"test-queue-13-E" : "#.bye.A", "test-queue-13-F" : "#.bye.B"}) # Place more messages txMsgs = self.sendMsgs(3, clusterName, topicExchangeName, "C.bye.A", 10) # (20, 10, 10, 0, 10, 0) txMsgsA += txMsgs @@ -319,12 +363,12 @@ self.killNode(6, clusterName) self.checkNumClusterBrokers(clusterName, 1) # Pull all remaining messages from each queue - rxMsgsA += self.receiveMsgs(5, clusterName, "test-queue-12-A", 20) - rxMsgsB += self.receiveMsgs(5, clusterName, "test-queue-12-B", 30) - rxMsgsC += self.receiveMsgs(5, clusterName, "test-queue-12-C", 10) - rxMsgsD += self.receiveMsgs(5, clusterName, "test-queue-12-D", 20) - rxMsgsE = self.receiveMsgs(5, clusterName, "test-queue-12-E", 10) - rxMsgsF = self.receiveMsgs(5, clusterName, "test-queue-12-F", 20) + rxMsgsA += self.receiveMsgs(5, clusterName, "test-queue-13-A", 20) + rxMsgsB += self.receiveMsgs(5, clusterName, "test-queue-13-B", 30) + rxMsgsC += self.receiveMsgs(5, clusterName, "test-queue-13-C", 10) + rxMsgsD += self.receiveMsgs(5, clusterName, "test-queue-13-D", 20) + rxMsgsE = self.receiveMsgs(5, clusterName, "test-queue-13-E", 10) + rxMsgsF = self.receiveMsgs(5, clusterName, "test-queue-13-F", 20) # Check messages self.stopCheckAll() if txMsgsA != rxMsgsA: @@ -343,20 +387,20 @@ self.killAllClusters() raise - def test_Cluster_13_FanoutExchange(self): + def test_Cluster_14_FanoutExchange(self): """Create fanout exchange in a cluster and make sure it replicates correctly""" try: - clusterName = "cluster-13" + clusterName = "cluster-14" self.createCheckCluster(clusterName, 4) - fanoutExchangeName = "test-exchange-13" - fanoutQueueNameList = ["test-queue-13-A", "test-queue-13-B", 
"test-queue-13-C"] + fanoutExchangeName = "test-exchange-14" + fanoutQueueNameList = ["test-queue-14-A", "test-queue-14-B", "test-queue-14-C"] self.createBindFanoutExchangeQueues(2, clusterName, fanoutExchangeName, fanoutQueueNameList) # Place initial 20 messages, retrieve 10 txMsg = self.sendMsgs(3, clusterName, fanoutExchangeName, None, 20) - rxMsgA = self.receiveMsgs(1, clusterName, "test-queue-13-A", 10) - rxMsgB = self.receiveMsgs(3, clusterName, "test-queue-13-B", 10) - rxMsgC = self.receiveMsgs(0, clusterName, "test-queue-13-C", 10) + rxMsgA = self.receiveMsgs(1, clusterName, "test-queue-14-A", 10) + rxMsgB = self.receiveMsgs(3, clusterName, "test-queue-14-B", 10) + rxMsgC = self.receiveMsgs(0, clusterName, "test-queue-14-C", 10) # Kill and add some nodes self.killNode(0, clusterName) self.killNode(2, clusterName) @@ -365,34 +409,34 @@ self.checkNumClusterBrokers(clusterName, 4) # Place another 20 messages, retrieve 20 txMsg += self.sendMsgs(3, clusterName, fanoutExchangeName, None, 20) - rxMsgA += self.receiveMsgs(1, clusterName, "test-queue-13-A", 20) - rxMsgB += self.receiveMsgs(3, clusterName, "test-queue-13-B", 20) - rxMsgC += self.receiveMsgs(4, clusterName, "test-queue-13-C", 20) + rxMsgA += self.receiveMsgs(1, clusterName, "test-queue-14-A", 20) + rxMsgB += self.receiveMsgs(3, clusterName, "test-queue-14-B", 20) + rxMsgC += self.receiveMsgs(4, clusterName, "test-queue-14-C", 20) # Kill and add another node self.killNode(4, clusterName) self.createClusterNode(6, clusterName) self.checkNumClusterBrokers(clusterName, 4) # Add another 2 queues - self.createBindFanoutExchangeQueues(6, clusterName, fanoutExchangeName, ["test-queue-13-D", "test-queue-13-E"]) + self.createBindFanoutExchangeQueues(6, clusterName, fanoutExchangeName, ["test-queue-14-D", "test-queue-14-E"]) # Place another 20 messages, retrieve 20 tmp = self.sendMsgs(3, clusterName, fanoutExchangeName, None, 20) txMsg += tmp - rxMsgA += self.receiveMsgs(1, clusterName, "test-queue-13-A", 20) - 
rxMsgB += self.receiveMsgs(3, clusterName, "test-queue-13-B", 20) - rxMsgC += self.receiveMsgs(6, clusterName, "test-queue-13-C", 20) - rxMsgD = self.receiveMsgs(6, clusterName, "test-queue-13-D", 10) - rxMsgE = self.receiveMsgs(6, clusterName, "test-queue-13-E", 10) + rxMsgA += self.receiveMsgs(1, clusterName, "test-queue-14-A", 20) + rxMsgB += self.receiveMsgs(3, clusterName, "test-queue-14-B", 20) + rxMsgC += self.receiveMsgs(6, clusterName, "test-queue-14-C", 20) + rxMsgD = self.receiveMsgs(6, clusterName, "test-queue-14-D", 10) + rxMsgE = self.receiveMsgs(6, clusterName, "test-queue-14-E", 10) # Kill all nodes but one self.killNode(1, clusterName) self.killNode(3, clusterName) self.killNode(6, clusterName) self.checkNumClusterBrokers(clusterName, 1) # Pull all remaining messages from each queue - rxMsgA += self.receiveMsgs(5, clusterName, "test-queue-13-A", 10) - rxMsgB += self.receiveMsgs(5, clusterName, "test-queue-13-B", 10) - rxMsgC += self.receiveMsgs(5, clusterName, "test-queue-13-C", 10) - rxMsgD += self.receiveMsgs(5, clusterName, "test-queue-13-D", 10) - rxMsgE += self.receiveMsgs(5, clusterName, "test-queue-13-E", 10) + rxMsgA += self.receiveMsgs(5, clusterName, "test-queue-14-A", 10) + rxMsgB += self.receiveMsgs(5, clusterName, "test-queue-14-B", 10) + rxMsgC += self.receiveMsgs(5, clusterName, "test-queue-14-C", 10) + rxMsgD += self.receiveMsgs(5, clusterName, "test-queue-14-D", 10) + rxMsgE += self.receiveMsgs(5, clusterName, "test-queue-14-E", 10) # Check messages self.stopCheckAll() if txMsg != rxMsgA: @@ -408,7 +452,6 @@ except: self.killAllClusters() raise - # Start the test here Modified: qpid/trunk/qpid/cpp/src/tests/testlib.py URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/testlib.py?rev=778827&r1=778826&r2=778827&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/tests/testlib.py (original) +++ qpid/trunk/qpid/cpp/src/tests/testlib.py Tue May 26 18:22:48 
2009 @@ -188,7 +188,7 @@ args += " --load-module %s" % self._storeLib self._clusterDict[clusterName][nodeNumber] = self.startBroker(args, logFile) - def createCluster(self, clusterName, numberNodes): + def createCluster(self, clusterName, numberNodes = 0): """Create a cluster containing an initial number of nodes""" self._clusterDict[clusterName] = {} for n in range(0, numberNodes): @@ -215,7 +215,9 @@ def getClusterTupleList(self, clusterName): """Get list of (pid, port) tuples of all nodes in named cluster""" - return self._clusterDict[clusterName].values() + if clusterName in self._clusterDict: + return self._clusterDict[clusterName].values() + return [] def getNumClusterBrokers(self, clusterName): """Get total number of brokers in named cluster""" --------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:commits-subscr...@qpid.apache.org