Author: shuston Date: Fri Oct 22 21:40:39 2010 New Revision: 1026501 URL: http://svn.apache.org/viewvc?rev=1026501&view=rev Log: Test module to run recovery-oriented store tests.
Added: qpid/trunk/qpid/cpp/src/tests/store.py Added: qpid/trunk/qpid/cpp/src/tests/store.py URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/store.py?rev=1026501&view=auto ============================================================================== --- qpid/trunk/qpid/cpp/src/tests/store.py (added) +++ qpid/trunk/qpid/cpp/src/tests/store.py Fri Oct 22 21:40:39 2010 @@ -0,0 +1,197 @@ +#!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
#

import errno, os, time
from qpid.brokertest import *
from qpid import compat, session
from qpid.util import connect
from qpid.connection import Connection
from qpid.datatypes import Message, uuid4
from qpid.queue import Empty


class StoreTests(BrokerTest):
    """Recovery-oriented store tests.

    Each test declares durable broker objects (exchange, queue, binding,
    or a prepared distributed transaction), restarts the broker via
    cycle_broker(), and then checks that the objects are still present.
    """

    # Status values compared against the results of dtx_prepare/dtx_commit.
    XA_RBROLLBACK = 1
    XA_RBTIMEOUT = 2
    XA_OK = 0
    # Class-wide counter used by xid() to give every xid a distinct branch id.
    tx_counter = 0

    def configure(self, config):
        """Remember the test configuration and its defines, then delegate."""
        self.config = config
        self.defines = self.config.defines
        BrokerTest.configure(self, config)

    def setup_connection(self):
        """Return a new Connection over a socket to the current broker."""
        socket = connect(self._broker.host(), self._broker.port())
        return Connection(sock=socket)

    def setup_session(self):
        """Start self.conn and return a new, uniquely named session on it."""
        self.conn.start()
        return self.conn.session(str(uuid4()))

    def start_session(self):
        """(Re)create self.conn and self.ssn against the current broker."""
        self.conn = self.setup_connection()
        self.ssn = self.setup_session()

    def setUp(self):
        """Start a broker and open a connection and session to it."""
        BrokerTest.setUp(self)
        self._broker = self.broker()
        self.start_session()

    def cycle_broker(self):
        """Tear down the broker, start a new one and reconnect to it."""
        # tearDown resets working dir; change it back after, even if
        # tearDown itself raises.
        d = os.getcwd()
        try:
            BrokerTest.tearDown(self)
        finally:
            os.chdir(d)
        self._broker = None
        self._broker = self.broker()
        self.start_session()

    def xid(self, txid):
        """Return a session xid for global id `txid` with a fresh branch id."""
        StoreTests.tx_counter += 1
        branchqual = "v%s" % StoreTests.tx_counter
        return self.ssn.xid(format=0, global_id=txid, branch_id=branchqual)

    def _discard(self, command, **kwargs):
        """Best-effort cleanup: call self.ssn.<command>(**kwargs).

        Used before a test to remove objects that may not exist. If the
        command raises, the session is busted by the exception, so a new
        connection and session are started. The command is looked up on
        self.ssn at call time because self.ssn is replaced on failure.
        """
        try:
            getattr(self.ssn, command)(**kwargs)
        except Exception:
            # restart the session busted from the exception
            self.start_session()

    def testDurableExchange(self):
        """A durable exchange is still present after a broker restart."""
        self._discard("exchange_delete", exchange="DE1")

        self.ssn.exchange_declare(exchange="DE1", type="direct", durable=True)
        response = self.ssn.exchange_query(name="DE1")
        self.assertTrue(response.durable)
        self.assertFalse(response.not_found)

        # Cycle the broker and make sure the exchange recovers
        self.cycle_broker()
        response = self.ssn.exchange_query(name="DE1")
        self.assertTrue(response.durable)
        self.assertFalse(response.not_found)

        self.ssn.exchange_delete(exchange="DE1")

    def testDurableQueues(self):
        """A durable queue is still present after a broker restart."""
        self._discard("queue_delete", queue="DQ1")

        self.ssn.queue_declare(queue="DQ1", durable=True)
        response = self.ssn.queue_query(queue="DQ1")
        self.assertEqual("DQ1", response.queue)
        self.assertTrue(response.durable)

        # Cycle the broker and make sure the queue recovers
        self.cycle_broker()
        response = self.ssn.queue_query(queue="DQ1")
        self.assertEqual("DQ1", response.queue)
        self.assertTrue(response.durable)

        self.ssn.queue_delete(queue="DQ1")

    def testDurableBindings(self):
        """A binding between durable objects survives a broker restart."""
        self._discard("exchange_unbind",
                      queue="DB_Q1", exchange="DB_E1", binding_key="K1")
        self._discard("exchange_delete", exchange="DB_E1")
        self._discard("queue_delete", queue="DB_Q1")

        self.ssn.queue_declare(queue="DB_Q1", durable=True)
        self.ssn.exchange_declare(exchange="DB_E1", type="direct", durable=True)
        self.ssn.exchange_bind(exchange="DB_E1", queue="DB_Q1", binding_key="K1")

        # Cycle the broker and make sure the binding recovers
        self.cycle_broker()
        response = self.ssn.exchange_bound(exchange="DB_E1", queue="DB_Q1",
                                           binding_key="K1")
        self.assertFalse(response.exchange_not_found)
        self.assertFalse(response.queue_not_found)
        self.assertFalse(response.queue_not_matched)
        self.assertFalse(response.key_not_matched)

        self.ssn.exchange_unbind(queue="DB_Q1", exchange="DB_E1", binding_key="K1")
        self.ssn.exchange_delete(exchange="DB_E1")
        self.ssn.queue_delete(queue="DB_Q1")

    def testDtxRecoverPrepared(self):
        """A prepared dtx is in doubt after restart and can be committed.

        The message sent inside the transaction must not be delivered
        before the commit, and must be delivered after it.
        """
        self._discard("exchange_unbind",
                      queue="Dtx_Q", exchange="Dtx_E", binding_key="Dtx")
        self._discard("exchange_delete", exchange="Dtx_E")
        self._discard("queue_delete", queue="Dtx_Q")

        self.ssn.queue_declare(queue="Dtx_Q", auto_delete=False, durable=True)
        self.ssn.exchange_declare(exchange="Dtx_E", type="direct", durable=True)
        self.ssn.exchange_bind(exchange="Dtx_E", queue="Dtx_Q", binding_key="Dtx")
        txid = self.xid("DtxRecoverPrepared")
        self.ssn.dtx_select()
        self.ssn.dtx_start(xid=txid)
        # 2 = delivery_mode.persistent
        dp = self.ssn.delivery_properties(routing_key="Dtx_Q", delivery_mode=2)
        self.ssn.message_transfer(message=Message(dp, "transactional message"))
        self.ssn.dtx_end(xid=txid)
        self.assertEqual(self.XA_OK, self.ssn.dtx_prepare(xid=txid).status)
        # Cycle the broker and make sure the xid is there, the message is not
        # queued.
        self.cycle_broker()
        # The txid should be recovered and in doubt. Every in-doubt xid is
        # required to match ours, and at least one must be reported.
        xids = self.ssn.dtx_recover().in_doubt
        xid_matched = False
        for x in xids:
            self.assertEqual(txid.format, x.format)
            self.assertEqual(txid.global_id, x.global_id)
            self.assertEqual(txid.branch_id, x.branch_id)
            xid_matched = True
        self.assertTrue(xid_matched)
        self.ssn.message_subscribe(destination="dtx_msgs", queue="Dtx_Q",
                                   accept_mode=1, acquire_mode=0)
        # Plain hex literal: the Python-2-only "L" suffix is not needed.
        self.ssn.message_flow(unit=1, value=0xFFFFFFFF, destination="dtx_msgs")
        self.ssn.message_flow(unit=0, value=10, destination="dtx_msgs")
        message_arrivals = self.ssn.incoming("dtx_msgs")
        # self.fail() is used instead of "assert False" so the checks are
        # not stripped when running under python -O.
        try:
            message_arrivals.get(timeout=1)
        except Empty:
            pass
        else:
            self.fail('Message present in queue before commit')
        self.ssn.dtx_select()
        self.assertEqual(self.XA_OK,
                         self.ssn.dtx_commit(xid=txid, one_phase=False).status)
        try:
            msg = message_arrivals.get(timeout=1)
        except Empty:
            self.fail('Message should be present after dtx commit but is not')
        self.assertEqual("transactional message", msg.body)

        self.ssn.exchange_unbind(queue="Dtx_Q", exchange="Dtx_E", binding_key="Dtx")
        self.ssn.exchange_delete(exchange="Dtx_E")
        self.ssn.queue_delete(queue="Dtx_Q")

# ---------------------------------------------------------------------
# Apache Qpid - AMQP Messaging Implementation
# Project: http://qpid.apache.org
# Use/Interact: mailto:commits-subscr...@qpid.apache.org