This is an automated email from the ASF dual-hosted git repository.
gmurthy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
The following commit(s) were added to refs/heads/main by this push:
new 6d03790 DISPATCH-2319: Modified test to verify closed receiver before
sending second batch of presettled messages
6d03790 is described below
commit 6d03790f6a47032bff4f751d57b2035349056b7c
Author: Ganesh Murthy <[email protected]>
AuthorDate: Tue Feb 1 15:15:41 2022 -0500
DISPATCH-2319: Modified test to verify closed receiver before sending
second batch of presettled messages
---
tests/system_tests_one_router.py | 119 ++++++++++++++++++++++++++-------------
1 file changed, 80 insertions(+), 39 deletions(-)
diff --git a/tests/system_tests_one_router.py b/tests/system_tests_one_router.py
index 8da2ebc..ac6dcf1 100644
--- a/tests/system_tests_one_router.py
+++ b/tests/system_tests_one_router.py
@@ -659,10 +659,9 @@ class OneRouterTest(TestCase):
def test_43_dropped_presettled_receiver_stops(self):
local_node = Node.connect(self.address, timeout=TIMEOUT)
res = local_node.query('org.apache.qpid.dispatch.router')
- deliveries_ingress = res.attribute_names.index('deliveriesIngress')
- presettled_dropped_count =
res.attribute_names.index('droppedPresettledDeliveries')
- ingress_delivery_count = res.results[0][deliveries_ingress]
- test = DroppedPresettledTest(self.address, 200,
ingress_delivery_count, presettled_dropped_count)
+ presettled_dropped_count_index =
res.attribute_names.index('droppedPresettledDeliveries')
+ presettled_dropped_count =
res.results[0][presettled_dropped_count_index]
+ test = DroppedPresettledTest(self.address, 200,
presettled_dropped_count)
test.run()
self.assertIsNone(test.error)
@@ -1462,7 +1461,39 @@ class PreSettled (MessagingHandler) :
self.bail(None)
-class PresettledCustomTimeout:
+class SendPresettledAfterReceiverCloses(object):
+ def __init__(self, parent):
+ self.parent = parent
+ self.num_tries = 0
+
+ def on_timer_task(self, event):
+ self.num_tries += 1
+ local_node = Node.connect(self.parent.addr, timeout=TIMEOUT)
+ res = local_node.query('org.apache.qpid.dispatch.router.link')
+ owning_addr_index = res.attribute_names.index('owningAddr')
+ has_address = False
+ for out in res.results:
+ owning_addr = out[owning_addr_index]
+ # Check if the receiver's address is present in the router's address table.
+ # If the address is still there, try one more time until self.parent.max_tries.
+ if self.parent.addr in owning_addr:
+ has_address = True
+ break
+
+ if has_address:
+ if self.num_tries == self.parent.max_tries:
+ self.parent.bail("Address %s is still in routing table" %
owning_addr)
+ else:
+ self.parent.schedule_send_timer()
+ else:
+ # Address is not there in the address table anymore.
+ # Send the remaining messages. These presettled messages must be
+ # dropped by the router which we will verify using the router's
+ # droppedPresettledDeliveries
+ self.parent.send_remaining()
+
+
+class PresettledCustomTimeout(object):
def __init__(self, parent):
self.parent = parent
self.num_tries = 0
@@ -1471,34 +1502,36 @@ class PresettledCustomTimeout:
self.num_tries += 1
local_node = Node.connect(self.parent.addr, timeout=TIMEOUT)
res = local_node.query('org.apache.qpid.dispatch.router')
- deliveries_ingress = res.attribute_names.index(
- 'deliveriesIngress')
- presettled_deliveries_dropped = res.attribute_names.index(
- 'droppedPresettledDeliveries')
- ingress_delivery_count = res.results[0][deliveries_ingress]
- self.parent.cancel_custom()
+ presettled_deliveries_dropped_index =
res.attribute_names.index('droppedPresettledDeliveries')
+ presettled_dropped_count =
res.results[0][presettled_deliveries_dropped_index]
- deliveries_dropped_diff = presettled_deliveries_dropped -
self.parent.begin_dropped_presettled_count
+ deliveries_dropped_diff = presettled_dropped_count -
self.parent.begin_dropped_presettled_count
# Without the fix for DISPATCH-1213 the ingress count will be less than
# 200 because the sender link has stalled. The q2_holdoff happened
# and so all the remaining messages are still in the
# proton buffers.
- deliveries_ingress_diff = ingress_delivery_count -
self.parent.begin_ingress_count
- if deliveries_ingress_diff + deliveries_dropped_diff >
self.parent.n_messages:
+ if deliveries_dropped_diff == self.parent.n_messages -
self.parent.max_receive:
self.parent.bail(None)
else:
if self.num_tries == self.parent.max_tries:
self.parent.bail("Messages sent to the router is %d, "
- "Messages processed by the router is %d" %
+ "Messages dropped by the router is %d" %
(self.parent.n_messages,
- deliveries_ingress_diff +
deliveries_dropped_diff))
+ deliveries_dropped_diff))
else:
- self.parent.schedule_timer()
+ self.parent.schedule_custom_timer()
class DroppedPresettledTest(MessagingHandler):
- def __init__(self, addr, n_messages, begin_ingress_count,
begin_dropped_presettled_count):
+ """
+ First send 10 large messages and a receiver receives them all and exits.
+ These first 10 messages are presettled messages and the router network did not
+ drop them.
+ Now send an additional 190 messages and make sure they are dropped by checking
+ the droppedPresettledCount
+ """
+ def __init__(self, addr, n_messages, begin_dropped_presettled_count):
super(DroppedPresettledTest, self).__init__()
self.addr = addr
self.n_messages = n_messages
@@ -1512,19 +1545,21 @@ class DroppedPresettledTest(MessagingHandler):
self.test_timer = None
self.max_receive = 10
self.custom_timer = None
+ self.send_timer = None
self.timer = None
self.begin_dropped_presettled_count = begin_dropped_presettled_count
- self.begin_ingress_count = begin_ingress_count
self.str1 = "0123456789abcdef"
self.msg_str = ""
self.max_tries = 10
self.reactor = None
for i in range(8192):
self.msg_str += self.str1
- self.timer_instance = PresettledCustomTimeout(self)
- def schedule_timer(self):
- self.custom_timer = self.reactor.schedule(0.5, self.timer_instance)
+ def schedule_custom_timer(self):
+ self.custom_timer = self.reactor.schedule(0.5,
PresettledCustomTimeout(self))
+
+ def schedule_send_timer(self):
+ self.send_timer = self.reactor.schedule(0.5,
SendPresettledAfterReceiverCloses(self))
def run(self):
Container(self).run()
@@ -1535,7 +1570,10 @@ class DroppedPresettledTest(MessagingHandler):
if self.recv_conn:
self.recv_conn.close()
self.timer.cancel()
- self.custom_timer.cancel()
+ if self.custom_timer:
+ self.custom_timer.cancel()
+ if self.send_timer:
+ self.send_timer.cancel()
def timeout(self,):
self.bail("Timeout Expired: %d messages received, %d expected." %
@@ -1551,18 +1589,24 @@ class DroppedPresettledTest(MessagingHandler):
"test_43")
self.timer = event.reactor.schedule(TIMEOUT, TestTimeout(self))
- def cancel_custom(self):
- self.custom_timer.cancel()
+ def send(self):
+ msg = Message(id=(self.n_sent + 1),
+ body={'sequence': (self.n_sent + 1),
+ 'msg_str': self.msg_str})
+ # Presettle the delivery.
+ dlv = self.sender.send(msg)
+ dlv.settle()
+ self.n_sent += 1
- def on_sendable(self, event):
+ def send_remaining(self):
while self.n_sent < self.n_messages:
- msg = Message(id=(self.n_sent + 1),
- body={'sequence': (self.n_sent + 1),
- 'msg_str': self.msg_str})
- # Presettle the delivery.
- dlv = self.sender.send(msg)
- dlv.settle()
- self.n_sent += 1
+ self.send()
+ self.schedule_custom_timer()
+
+ def on_sendable(self, event):
+ # Send only self.max_receive messages.
+ while self.n_sent < self.max_receive:
+ self.send()
def on_message(self, event):
self.n_received += 1
@@ -1570,12 +1614,9 @@ class DroppedPresettledTest(MessagingHandler):
# Receiver bails after receiving max_receive messages.
self.receiver.close()
self.recv_conn.close()
-
- # The sender is only sending 200 large messages which is less
- # that the initial credit of 250 that the router gives.
- # Lets do a qdstat to find out if all 200 messages is handled
- # by the router.
- self.schedule_timer()
+ # When self.max_receive messages have been received and the receiver has been closed
+ # we try to check to see if the receiver's address is gone from the address table.
+ self.schedule_send_timer()
class MulticastUnsettled (MessagingHandler) :
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]