This is an automated email from the ASF dual-hosted git repository. tross pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
commit 7dc31d20eb917678c8fedbd5cd487c5c4f5f6c50 Author: Ted Ross <tr...@redhat.com> AuthorDate: Thu Oct 31 14:03:02 2019 -0400 DISPATCH-1409 - Added a test for sender-link credit. --- tests/system_tests_stuck_deliveries.py | 116 +++++++++++++++++++++++++++++++++ 1 file changed, 116 insertions(+) diff --git a/tests/system_tests_stuck_deliveries.py b/tests/system_tests_stuck_deliveries.py index c81c001..7758126 100644 --- a/tests/system_tests_stuck_deliveries.py +++ b/tests/system_tests_stuck_deliveries.py @@ -158,6 +158,11 @@ class RouterTest(TestCase): test.run() self.assertEqual(None, test.error) + def test_10_sender_link_credit_test(self): + test = TxLinkCreditTest(self.routers[0].addresses[0]) + test.run() + self.assertEqual(None, test.error) + class Timeout(object): def __init__(self, parent): @@ -406,5 +411,116 @@ class RxLinkCreditTest(MessagingHandler): Container(self).run() +class TxLinkCreditTest(MessagingHandler): + def __init__(self, host): + super(TxLinkCreditTest, self).__init__() + self.host = host + + self.sender_conn = None + self.query_conn = None + self.addr = "rx/link/credit/test" + self.credit_issued = 0 + self.error = None + + self.stages = ['Setup', 'LinkBlocked', 'LinkUnblocked', '250Credits'] + self.stage = 0 + + def timeout(self): + self.error = "Timeout Expired - stage: %s" % self.stages[self.stage] + self.sender_conn.close() + self.query_conn.close() + if self.poll_timer: + self.poll_timer.cancel() + + def fail(self, error): + self.error = error + self.sender_conn.close() + self.query_conn.close() + if self.poll_timer: + self.poll_timer.cancel() + self.timer.cancel() + + def on_start(self, event): + self.timer = event.reactor.schedule(30.0, Timeout(self)) + self.poll_timer = None + self.sender_conn = event.container.connect(self.host) + self.query_conn = event.container.connect(self.host) + self.reply_receiver = event.container.create_receiver(self.query_conn, None, dynamic=True) + self.query_sender = event.container.create_sender(self.query_conn, "$management") + self.sender = None + self.receiver = None + + def on_link_opened(self, event): + if event.receiver == self.reply_receiver: + self.reply_addr = event.receiver.remote_source.address + self.proxy = MgmtMsgProxy(self.reply_addr) + self.sender = event.container.create_sender(self.sender_conn, self.addr) + elif event.sender == self.sender: + self.stage = 1 + self.process() + + def process(self): + if self.stage == 1: + # + # LinkBlocked + # + msg = self.proxy.query_router() + self.query_sender.send(msg) + + elif self.stage == 2: + # + # LinkUnblocked + # + msg = self.proxy.query_router() + self.query_sender.send(msg) + + elif self.stage == 3: + # + # 250Credits + # + msg = self.proxy.query_links() + self.query_sender.send(msg) + + def on_message(self, event): + if event.receiver == self.reply_receiver: + response = self.proxy.response(event.message) + if self.stage == 1: + # + # LinkBlocked + # + if response.results[0].linksBlocked == 1: + self.receiver = event.container.create_receiver(self.sender_conn, self.addr); + self.stage = 2 + self.process() + return + + elif self.stage == 2: + # + # LinkUnblocked + # + if response.results[0].linksBlocked == 0: + self.stage = 3 + self.process() + return + + elif self.stage == 3: + # + # 250Credits + # + for link in response.results: + if 'M0' + self.addr == link.owningAddr: + if link.creditAvailable == 250: + self.fail(None) + return + + self.poll_timer = event.reactor.schedule(0.5, PollTimeout(self)) + + def poll_timeout(self): + self.process() + + def run(self): + Container(self).run() + + if __name__== '__main__': unittest.main(main_module()) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org