This is an automated email from the ASF dual-hosted git repository. gmurthy pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
The following commit(s) were added to refs/heads/main by this push: new b7918db DISPATCH-2329: Explicity check for released counts instead of delivery counts of deliveries that are being released b7918db is described below commit b7918db600decd9b610f8401ff306261674cfdb8 Author: Ganesh Murthy <gmur...@apache.org> AuthorDate: Mon Feb 14 11:56:42 2022 -0500 DISPATCH-2329: Explicity check for released counts instead of delivery counts of deliveries that are being released --- tests/system_tests_delivery_counts.py | 50 ++++++++++++++++++++++++++--------- 1 file changed, 38 insertions(+), 12 deletions(-) diff --git a/tests/system_tests_delivery_counts.py b/tests/system_tests_delivery_counts.py index 81917af..41be214 100644 --- a/tests/system_tests_delivery_counts.py +++ b/tests/system_tests_delivery_counts.py @@ -980,12 +980,18 @@ class OneRouterLinkCountersTest(TestCase): if rx_limit is None else rx_limit self.sent = 0 + self.received = 0 self.timer = 0 self.poll_timer = None self.conn = None self.sender_stats = None self.receiver_stats = None self.large_message = large_message + self.reactor = None + self.sender = None + self.receiver = None + self.max_attempts = 10 + self.num_attempts = 0 def timeout(self): self._cleanup() @@ -1001,24 +1007,40 @@ class OneRouterLinkCountersTest(TestCase): self.timer.cancel() self.timer = None + def _get_sender_receiver_stats(self): + self.receiver_stats = get_link_info("Rx_Test01", self.router_addr) + self.sender_stats = get_link_info("Tx_Test01", self.router_addr) + def poll_timeout(self): """ - Periodically check the deliveryCount on the receiver. Once it - reaches rx_limit the test is complete: gather link statistics - before closing the clients + Periodically check the deliveryCount or the releasedCount on the receiver and sender. """ - li = get_link_info("Rx_Test01", self.router_addr) - if li and li['deliveryCount'] == self.rx_limit: - self.receiver_stats = li - self.sender_stats = get_link_info("Tx_Test01", self.router_addr) + restart_poll_timer = True + self._get_sender_receiver_stats() + if self.receiver_stats and self.outcome == Delivery.RELEASED and \ + self.receiver_stats['releasedCount'] == self.rx_limit: + if self.sender_stats and self.sender_stats['releasedCount'] == self.rx_limit: + # We do not want to check just the deliveryCount here. The deliveryCount gets + # updated much earlier than the releasedCount. We will check the releasedCount instead. + # Check the releasedCount on the sender and the receiver. This is because it takes time + # to propagate the outcome back to the sender the sender's outcomes would lag behind the receivers + restart_poll_timer = False + self._cleanup() + elif self.receiver_stats and self.receiver_stats['deliveryCount'] == self.rx_limit: + restart_poll_timer = False self._cleanup() - else: - self.poll_timer = self.reactor.schedule(0.5, PollTimeout(self)) + + if restart_poll_timer: + self.num_attempts += 1 + if self.num_attempts == self.max_attempts: + # There is something wrong, fail the test. + self.timeout() + else: + self.poll_timer = self.reactor.schedule(0.5, PollTimeout(self)) def on_start(self, event): self.reactor = event.reactor self.timer = event.reactor.schedule(TIMEOUT, TestTimeout(self)) - self.poll_timer = event.reactor.schedule(0.5, PollTimeout(self)) self.conn = event.container.connect(self.router_addr) self.receiver = event.container.create_receiver(self.conn, source="Test01", @@ -1029,7 +1051,7 @@ class OneRouterLinkCountersTest(TestCase): name="Tx_Test01") def on_sendable(self, event): - if self.sent < self.count: + while self.sent < self.count: if self.large_message: dlv = self.sender.send(Message(body=LARGE_PAYLOAD)) else: @@ -1039,10 +1061,14 @@ class OneRouterLinkCountersTest(TestCase): self.sent += 1 def on_message(self, event): + self.received += 1 if self.outcome: event.delivery.update(self.outcome) event.delivery.settle() - # otherwise just drop it + + # Start up a poll timer once all the deliveries have been sent/received. + if self.received == self.rx_limit and self.sent == self.count: + self.poll_timer = event.reactor.schedule(0.5, PollTimeout(self)) def run(self): Container(self).run() --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org