Author: kgiusti
Date: Tue May 1 13:57:58 2012
New Revision: 1332658

URL: http://svn.apache.org/viewvc?rev=1332658&view=rev
Log:
QPID-3963: add testcase for federation and cluster failover

Modified:
    qpid/trunk/qpid/cpp/src/tests/cluster_tests.py

Modified: qpid/trunk/qpid/cpp/src/tests/cluster_tests.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/cluster_tests.py?rev=1332658&r1=1332657&r2=1332658&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/cluster_tests.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/cluster_tests.py Tue May 1 13:57:58 2012
@@ -767,6 +767,121 @@ acl deny all all
         cluster.start()
         fetch(cluster[2])
 
+
+    def test_federation_failover(self):
+        """
+        Verify that federation operates across failures occuring in a cluster.
+        Specifically:
+        1) Destination cluster learns of membership changes in the source
+        cluster
+        2) Destination cluster replicates the current state of the source
+        cluster to newly-added members
+        """
+
+        TIMEOUT = 30
+        def verify(src_broker, src, dst_broker, dst, timeout=TIMEOUT):
+            """ Prove that traffic can pass from source fed broker to
+            destination fed broker
+            """
+            tot_time = 0
+            active = False
+            send_session = src_broker.connect().session()
+            sender = send_session.sender(src)
+            receive_session = dst_broker.connect().session()
+            receiver = receive_session.receiver(dst)
+            while not active and tot_time < timeout:
+                sender.send(Message("Hello from Source!"))
+                try:
+                    receiver.fetch(timeout = 1)
+                    receive_session.acknowledge()
+                    # Get this far without Empty exception, and the link is good!
+                    active = True
+                    while True:
+                        # Keep receiving msgs, as several may have accumulated
+                        receiver.fetch(timeout = 1)
+                        receive_session.acknowledge()
+                except Empty:
+                    if not active:
+                        tot_time += 1
+            receiver.close()
+            receive_session.close()
+            sender.close()
+            send_session.close()
+            self.assertTrue(active, "Bridge failed to become active")
+
+
+        # 1 node cluster source, 1 node cluster destination
+        src_cluster = self.cluster(1, expect=EXPECT_EXIT_FAIL)
+        src_cluster.ready();
+        dst_cluster = self.cluster(1, expect=EXPECT_EXIT_FAIL)
+        dst_cluster.ready();
+
+        cmd = self.popen(["qpid-config",
+                          "--broker", src_cluster[0].host_port(),
+                          "add", "queue", "srcQ"], EXPECT_EXIT_OK)
+        cmd.wait()
+
+        cmd = self.popen(["qpid-config",
+                          "--broker", dst_cluster[0].host_port(),
+                          "add", "exchange", "fanout", "destX"], EXPECT_EXIT_OK)
+        cmd.wait()
+
+        cmd = self.popen(["qpid-config",
+                          "--broker", dst_cluster[0].host_port(),
+                          "add", "queue", "destQ"], EXPECT_EXIT_OK)
+        cmd.wait()
+
+        cmd = self.popen(["qpid-config",
+                          "--broker", dst_cluster[0].host_port(),
+                          "bind", "destX", "destQ"], EXPECT_EXIT_OK)
+        cmd.wait()
+
+        # federate the srcQ to the destination exchange
+        dst_cluster[0].startQmf()
+        dst_broker = dst_cluster[0].qmf_session.getObjects(_class="broker")[0]
+        result = dst_broker.connect(src_cluster[0].host(), src_cluster[0].port(), False, "PLAIN",
+                                    "guest", "guest", "tcp")
+        self.assertEqual(result.status, 0, result);
+
+        link = dst_cluster[0].qmf_session.getObjects(_class="link")[0]
+        result = link.bridge(False, "srcQ", "destX", "", "", "", True, False, False, 10)
+        self.assertEqual(result.status, 0, result)
+
+        # check that traffic passes
+        verify(src_cluster[0], "srcQ", dst_cluster[0], "destQ")
+
+        # add src[1] and src[2] brokers to source cluster
+        src_cluster.start(expect=EXPECT_EXIT_FAIL);
+        src_cluster.ready();
+        src_cluster.start(expect=EXPECT_EXIT_FAIL);
+        src_cluster.ready();
+        verify(src_cluster[2], "srcQ", dst_cluster[0], "destQ")
+
+        # Kill src[0]. dst[0] should've learned about src[1,2]
+        src_cluster[0].kill()
+        for b in src_cluster[1:]: b.ready()
+        verify(src_cluster[1], "srcQ", dst_cluster[0], "destQ")
+
+        # Kill src[1], dst[0] should still be connected
+        src_cluster[1].kill()
+        for b in src_cluster[2:]: b.ready()
+        verify(src_cluster[2], "srcQ", dst_cluster[0], "destQ")
+
+        # Add dest[1]
+        # dest[0] syncs dest[1] to current remote state
+        dst_cluster.start(expect=EXPECT_EXIT_FAIL);
+        dst_cluster.ready();
+        verify(src_cluster[2], "srcQ", dst_cluster[1], "destQ")
+
+        # Kill dest[0], force failover to dest[1]
+        dst_cluster[0].kill()
+        for b in dst_cluster[1:]: b.ready()
+        verify(src_cluster[2], "srcQ", dst_cluster[1], "destQ")
+
+        for i in range(2, len(src_cluster)): src_cluster[i].kill()
+        for i in range(1, len(dst_cluster)): dst_cluster[i].kill()
+
+
     # Some utility code for transaction tests
     XA_RBROLLBACK = 1
     XA_RBTIMEOUT = 2

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org