This is an automated email from the ASF dual-hosted git repository. gmurthy pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
The following commit(s) were added to refs/heads/main by this push: new 6d03790 DISPATCH-2319: Modified test to verify closed receiver before sending second batch of presettled messages 6d03790 is described below commit 6d03790f6a47032bff4f751d57b2035349056b7c Author: Ganesh Murthy <gmur...@apache.org> AuthorDate: Tue Feb 1 15:15:41 2022 -0500 DISPATCH-2319: Modified test to verify closed receiver before sending second batch of presettled messages --- tests/system_tests_one_router.py | 119 ++++++++++++++++++++++++++------------- 1 file changed, 80 insertions(+), 39 deletions(-) diff --git a/tests/system_tests_one_router.py b/tests/system_tests_one_router.py index 8da2ebc..ac6dcf1 100644 --- a/tests/system_tests_one_router.py +++ b/tests/system_tests_one_router.py @@ -659,10 +659,9 @@ class OneRouterTest(TestCase): def test_43_dropped_presettled_receiver_stops(self): local_node = Node.connect(self.address, timeout=TIMEOUT) res = local_node.query('org.apache.qpid.dispatch.router') - deliveries_ingress = res.attribute_names.index('deliveriesIngress') - presettled_dropped_count = res.attribute_names.index('droppedPresettledDeliveries') - ingress_delivery_count = res.results[0][deliveries_ingress] - test = DroppedPresettledTest(self.address, 200, ingress_delivery_count, presettled_dropped_count) + presettled_dropped_count_index = res.attribute_names.index('droppedPresettledDeliveries') + presettled_dropped_count = res.results[0][presettled_dropped_count_index] + test = DroppedPresettledTest(self.address, 200, presettled_dropped_count) test.run() self.assertIsNone(test.error) @@ -1462,7 +1461,39 @@ class PreSettled (MessagingHandler) : self.bail(None) -class PresettledCustomTimeout: +class SendPresettledAfterReceiverCloses(object): + def __init__(self, parent): + self.parent = parent + self.num_tries = 0 + + def on_timer_task(self, event): + self.num_tries += 1 + local_node = Node.connect(self.parent.addr, timeout=TIMEOUT) + res = local_node.query('org.apache.qpid.dispatch.router.link') + owning_addr_index = res.attribute_names.index('owningAddr') + has_address = False + for out in res.results: + owning_addr = out[owning_addr_index] + # Check if the receiver's address is present in the router's address table. + # If the address is still there, try one more time until self.parent.max_tries. + if self.parent.addr in owning_addr: + has_address = True + break + + if has_address: + if self.num_tries == self.parent.max_tries: + self.parent.bail("Address %s is still in routing table" % owning_addr) + else: + self.parent.schedule_send_timer() + else: + # Address is not there in the address table anymore. + # Send the remaining messages. These presettled messages must be + # dropped by the router which we will verify using the router's + # droppedPresettledDeliveries + self.parent.send_remaining() + + +class PresettledCustomTimeout(object): def __init__(self, parent): self.parent = parent self.num_tries = 0 @@ -1471,34 +1502,36 @@ class PresettledCustomTimeout: self.num_tries += 1 local_node = Node.connect(self.parent.addr, timeout=TIMEOUT) res = local_node.query('org.apache.qpid.dispatch.router') - deliveries_ingress = res.attribute_names.index( - 'deliveriesIngress') - presettled_deliveries_dropped = res.attribute_names.index( - 'droppedPresettledDeliveries') - ingress_delivery_count = res.results[0][deliveries_ingress] - self.parent.cancel_custom() + presettled_deliveries_dropped_index = res.attribute_names.index('droppedPresettledDeliveries') + presettled_dropped_count = res.results[0][presettled_deliveries_dropped_index] - deliveries_dropped_diff = presettled_deliveries_dropped - self.parent.begin_dropped_presettled_count + deliveries_dropped_diff = presettled_dropped_count - self.parent.begin_dropped_presettled_count # Without the fix for DISPATCH-1213 the ingress count will be less than # 200 because the sender link has stalled. The q2_holdoff happened # and so all the remaining messages are still in the # proton buffers. - deliveries_ingress_diff = ingress_delivery_count - self.parent.begin_ingress_count - if deliveries_ingress_diff + deliveries_dropped_diff > self.parent.n_messages: + if deliveries_dropped_diff == self.parent.n_messages - self.parent.max_receive: self.parent.bail(None) else: if self.num_tries == self.parent.max_tries: self.parent.bail("Messages sent to the router is %d, " - "Messages processed by the router is %d" % + "Messages dropped by the router is %d" % (self.parent.n_messages, - deliveries_ingress_diff + deliveries_dropped_diff)) + deliveries_dropped_diff)) else: - self.parent.schedule_timer() + self.parent.schedule_custom_timer() class DroppedPresettledTest(MessagingHandler): - def __init__(self, addr, n_messages, begin_ingress_count, begin_dropped_presettled_count): + """ + First send 10 large messages and a receiver receives them all and exits. + These first 10 messages are presettled messages and the router network did not + drop them. + Now send an additional 190 messages and make sure they are dropped by checking + the droppedPresettledCount + """ + def __init__(self, addr, n_messages, begin_dropped_presettled_count): super(DroppedPresettledTest, self).__init__() self.addr = addr self.n_messages = n_messages @@ -1512,19 +1545,21 @@ class DroppedPresettledTest(MessagingHandler): self.test_timer = None self.max_receive = 10 self.custom_timer = None + self.send_timer = None self.timer = None self.begin_dropped_presettled_count = begin_dropped_presettled_count - self.begin_ingress_count = begin_ingress_count self.str1 = "0123456789abcdef" self.msg_str = "" self.max_tries = 10 self.reactor = None for i in range(8192): self.msg_str += self.str1 - self.timer_instance = PresettledCustomTimeout(self) - def schedule_timer(self): - self.custom_timer = self.reactor.schedule(0.5, self.timer_instance) + def schedule_custom_timer(self): + self.custom_timer = self.reactor.schedule(0.5, PresettledCustomTimeout(self)) + + def schedule_send_timer(self): + self.send_timer = self.reactor.schedule(0.5, SendPresettledAfterReceiverCloses(self)) def run(self): Container(self).run() @@ -1535,7 +1570,10 @@ class DroppedPresettledTest(MessagingHandler): if self.recv_conn: self.recv_conn.close() self.timer.cancel() - self.custom_timer.cancel() + if self.custom_timer: + self.custom_timer.cancel() + if self.send_timer: + self.send_timer.cancel() def timeout(self,): self.bail("Timeout Expired: %d messages received, %d expected." % @@ -1551,18 +1589,24 @@ class DroppedPresettledTest(MessagingHandler): "test_43") self.timer = event.reactor.schedule(TIMEOUT, TestTimeout(self)) - def cancel_custom(self): - self.custom_timer.cancel() + def send(self): + msg = Message(id=(self.n_sent + 1), + body={'sequence': (self.n_sent + 1), + 'msg_str': self.msg_str}) + # Presettle the delivery. + dlv = self.sender.send(msg) + dlv.settle() + self.n_sent += 1 - def on_sendable(self, event): + def send_remaining(self): while self.n_sent < self.n_messages: - msg = Message(id=(self.n_sent + 1), - body={'sequence': (self.n_sent + 1), - 'msg_str': self.msg_str}) - # Presettle the delivery. - dlv = self.sender.send(msg) - dlv.settle() - self.n_sent += 1 + self.send() + self.schedule_custom_timer() + + def on_sendable(self, event): + # Send only self.max_receive messages. + while self.n_sent < self.max_receive: + self.send() def on_message(self, event): self.n_received += 1 @@ -1570,12 +1614,9 @@ class DroppedPresettledTest(MessagingHandler): # Receiver bails after receiving max_receive messages. self.receiver.close() self.recv_conn.close() - - # The sender is only sending 200 large messages which is less - # that the initial credit of 250 that the router gives. - # Lets do a qdstat to find out if all 200 messages is handled - # by the router. - self.schedule_timer() + # When self.max_receive messages have been received and the receiver has been closed + # we try to check to see if the receiver's address is gone from the address table. + self.schedule_send_timer() class MulticastUnsettled (MessagingHandler) : --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org