This is an automated email from the ASF dual-hosted git repository. tross pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
commit 1837ece08add3cfd1af377be4dfeb3203dc32339 Author: Ted Ross <tr...@redhat.com> AuthorDate: Thu Oct 31 12:17:21 2019 -0400 DISPATCH-1409 - Added test case, fixed accounting bug found by the test case. --- src/router_core/connections.c | 10 ++- tests/system_tests_stuck_deliveries.py | 136 +++++++++++++++++++++++++++++++++ 2 files changed, 142 insertions(+), 4 deletions(-) diff --git a/src/router_core/connections.c b/src/router_core/connections.c index a9229d1..984a7c2 100644 --- a/src/router_core/connections.c +++ b/src/router_core/connections.c @@ -233,17 +233,19 @@ void qdr_record_link_credit(qdr_core_t *core, qdr_link_t *link) if (link->credit_reported > 0 && pn_credit == 0) { // // The link has transitioned from positive credit to zero credit. - // Mark it as eligible for logging and record the time. // - link->reported_as_blocked = false; link->zero_credit_time = core->uptime_ticks; - core->links_blocked--; - } else if (link->credit_reported == 0 && pn_credit > 0) + } else if (link->credit_reported == 0 && pn_credit > 0) { // // The link has transitioned from zero credit to positive credit. // Clear the recorded time. // link->zero_credit_time = 0; + if (link->reported_as_blocked) { + link->reported_as_blocked = false; + core->links_blocked--; + } + } link->credit_reported = pn_credit; } diff --git a/tests/system_tests_stuck_deliveries.py b/tests/system_tests_stuck_deliveries.py index c4f8b21..c81c001 100644 --- a/tests/system_tests_stuck_deliveries.py +++ b/tests/system_tests_stuck_deliveries.py @@ -153,6 +153,11 @@ class RouterTest(TestCase): test.run() self.assertEqual(None, test.error) + def test_09_receiver_link_credit_test(self): + test = RxLinkCreditTest(self.routers[0].addresses[0]) + test.run() + self.assertEqual(None, test.error) + class Timeout(object): def __init__(self, parent): @@ -270,5 +275,136 @@ class DelayedSettlementTest(MessagingHandler): Container(self).run() +class RxLinkCreditTest(MessagingHandler): + def __init__(self, host): + super(RxLinkCreditTest, self).__init__(prefetch = 0) + self.host = host + + self.receiver_conn = None + self.query_conn = None + self.addr = "rx/link/credit/test" + self.credit_issued = 0 + self.error = None + + self.stages = ['Setup', 'LinkBlocked', 'LinkUnblocked', '10Credits', '20Credits'] + self.stage = 0 + + def timeout(self): + self.error = "Timeout Expired - stage: %s" % self.stages[self.stage] + self.receiver_conn.close() + self.query_conn.close() + if self.poll_timer: + self.poll_timer.cancel() + + def fail(self, error): + self.error = error + self.receiver_conn.close() + self.query_conn.close() + if self.poll_timer: + self.poll_timer.cancel() + self.timer.cancel() + + def on_start(self, event): + self.timer = event.reactor.schedule(30.0, Timeout(self)) + self.poll_timer = None + self.receiver_conn = event.container.connect(self.host) + self.query_conn = event.container.connect(self.host) + self.reply_receiver = event.container.create_receiver(self.query_conn, None, dynamic=True) + self.query_sender = event.container.create_sender(self.query_conn, "$management") + self.receiver = None + + def on_link_opened(self, event): + if event.receiver == self.reply_receiver: + self.reply_addr = event.receiver.remote_source.address + self.proxy = MgmtMsgProxy(self.reply_addr) + self.receiver = event.container.create_receiver(self.receiver_conn, self.addr) + self.reply_receiver.flow(1) + elif event.receiver == self.receiver: + self.stage = 1 + self.process() + + def process(self): + if self.stage == 1: + # + # LinkBlocked + # + msg = self.proxy.query_router() + self.query_sender.send(msg) + + elif self.stage == 2: + # + # LinkUnblocked + # + msg = self.proxy.query_router() + self.query_sender.send(msg) + + elif self.stage == 3: + # + # 10Credits + # + msg = self.proxy.query_links() + self.query_sender.send(msg) + + elif self.stage == 4: + # + # 20Credits + # + msg = self.proxy.query_links() + self.query_sender.send(msg) + + def on_message(self, event): + if event.receiver == self.reply_receiver: + response = self.proxy.response(event.message) + self.reply_receiver.flow(1) + if self.stage == 1: + # + # LinkBlocked + # + if response.results[0].linksBlocked == 1: + self.receiver.flow(10) + self.stage = 2 + self.process() + return + + elif self.stage == 2: + # + # LinkUnblocked + # + if response.results[0].linksBlocked == 0: + self.stage = 3 + self.process() + return + + elif self.stage == 3: + # + # 10Credits + # + for link in response.results: + if 'M0' + self.addr == link.owningAddr: + if link.creditAvailable == 10: + self.receiver.flow(10) + self.stage = 4 + self.process() + return + + elif self.stage == 4: + # + # 20Credits + # + for link in response.results: + if 'M0' + self.addr == link.owningAddr: + if link.creditAvailable == 20: + self.fail(None) + return + + self.poll_timer = event.reactor.schedule(0.5, PollTimeout(self)) + + def poll_timeout(self): + self.process() + + def run(self): + Container(self).run() + + if __name__== '__main__': unittest.main(main_module()) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org