Author: kgiusti
Date: Tue May  1 13:57:58 2012
New Revision: 1332658

URL: http://svn.apache.org/viewvc?rev=1332658&view=rev
Log:
QPID-3963: add testcase for federation and cluster failover

Modified:
    qpid/trunk/qpid/cpp/src/tests/cluster_tests.py

Modified: qpid/trunk/qpid/cpp/src/tests/cluster_tests.py
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/cluster_tests.py?rev=1332658&r1=1332657&r2=1332658&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/cluster_tests.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/cluster_tests.py Tue May  1 13:57:58 2012
@@ -767,6 +767,121 @@ acl deny all all
         cluster.start()
         fetch(cluster[2])
 
+
+    def test_federation_failover(self):
+        """
+        Verify that federation operates across failures occuring in a cluster.
+        Specifically:
+        1) Destination cluster learns of membership changes in the source
+        cluster
+        2) Destination cluster replicates the current state of the source
+        cluster to newly-added members
+        """
+
+        TIMEOUT = 30
+        def verify(src_broker, src, dst_broker, dst, timeout=TIMEOUT):
+            """ Prove that traffic can pass from source fed broker to
+            destination fed broker
+            """
+            tot_time = 0
+            active = False
+            send_session = src_broker.connect().session()
+            sender = send_session.sender(src)
+            receive_session = dst_broker.connect().session()
+            receiver = receive_session.receiver(dst)
+            while not active and tot_time < timeout:
+                sender.send(Message("Hello from Source!"))
+                try:
+                    receiver.fetch(timeout = 1)
+                    receive_session.acknowledge()
+                    # Get this far without Empty exception, and the link is 
good!
+                    active = True
+                    while True:
+                        # Keep receiving msgs, as several may have accumulated
+                        receiver.fetch(timeout = 1)
+                        receive_session.acknowledge()
+                except Empty:
+                    if not active:
+                        tot_time += 1
+            receiver.close()
+            receive_session.close()
+            sender.close()
+            send_session.close()
+            self.assertTrue(active, "Bridge failed to become active")
+
+
+        # 1 node cluster source, 1 node cluster destination
+        src_cluster = self.cluster(1, expect=EXPECT_EXIT_FAIL)
+        src_cluster.ready();
+        dst_cluster = self.cluster(1, expect=EXPECT_EXIT_FAIL)
+        dst_cluster.ready();
+
+        cmd = self.popen(["qpid-config",
+                          "--broker", src_cluster[0].host_port(),
+                          "add", "queue", "srcQ"], EXPECT_EXIT_OK)
+        cmd.wait()
+
+        cmd = self.popen(["qpid-config",
+                          "--broker", dst_cluster[0].host_port(),
+                          "add", "exchange", "fanout", "destX"], 
EXPECT_EXIT_OK)
+        cmd.wait()
+
+        cmd = self.popen(["qpid-config",
+                          "--broker", dst_cluster[0].host_port(),
+                          "add", "queue", "destQ"], EXPECT_EXIT_OK)
+        cmd.wait()
+
+        cmd = self.popen(["qpid-config",
+                          "--broker", dst_cluster[0].host_port(),
+                          "bind", "destX", "destQ"], EXPECT_EXIT_OK)
+        cmd.wait()
+
+        # federate the srcQ to the destination exchange
+        dst_cluster[0].startQmf()
+        dst_broker = dst_cluster[0].qmf_session.getObjects(_class="broker")[0]
+        result = dst_broker.connect(src_cluster[0].host(), 
src_cluster[0].port(), False, "PLAIN",
+                                    "guest", "guest", "tcp")
+        self.assertEqual(result.status, 0, result);
+
+        link = dst_cluster[0].qmf_session.getObjects(_class="link")[0]
+        result = link.bridge(False, "srcQ", "destX", "", "", "", True, False, 
False, 10)
+        self.assertEqual(result.status, 0, result)
+
+        # check that traffic passes
+        verify(src_cluster[0], "srcQ", dst_cluster[0], "destQ")
+
+        # add src[1] and src[2] brokers to source cluster
+        src_cluster.start(expect=EXPECT_EXIT_FAIL);
+        src_cluster.ready();
+        src_cluster.start(expect=EXPECT_EXIT_FAIL);
+        src_cluster.ready();
+        verify(src_cluster[2], "srcQ", dst_cluster[0], "destQ")
+
+        # Kill src[0]. dst[0] should've learned about src[1,2]
+        src_cluster[0].kill()
+        for b in src_cluster[1:]: b.ready()
+        verify(src_cluster[1], "srcQ", dst_cluster[0], "destQ")
+
+        # Kill src[1], dst[0] should still be connected
+        src_cluster[1].kill()
+        for b in src_cluster[2:]: b.ready()
+        verify(src_cluster[2], "srcQ", dst_cluster[0], "destQ")
+
+        # Add dest[1]
+        # dest[0] syncs dest[1] to current remote state
+        dst_cluster.start(expect=EXPECT_EXIT_FAIL);
+        dst_cluster.ready();
+        verify(src_cluster[2], "srcQ", dst_cluster[1], "destQ")
+
+        # Kill dest[0], force failover to dest[1]
+        dst_cluster[0].kill()
+        for b in dst_cluster[1:]: b.ready()
+        verify(src_cluster[2], "srcQ", dst_cluster[1], "destQ")
+
+        for i in range(2, len(src_cluster)): src_cluster[i].kill()
+        for i in range(1, len(dst_cluster)): dst_cluster[i].kill()
+
+
 # Some utility code for transaction tests
 XA_RBROLLBACK = 1
 XA_RBTIMEOUT = 2



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to