Repository: qpid-dispatch Updated Branches: refs/heads/master 8701feb7f -> 58567f979
DISPATCH-1197 - Added system test to make sure that streaming deliveries are handled without stalling when receiver goes away causing messages to be released Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/58567f97 Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/58567f97 Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/58567f97 Branch: refs/heads/master Commit: 58567f979deef3ecc4d797610f92539396de5f40 Parents: 8701feb Author: Ganesh Murthy <gmur...@redhat.com> Authored: Thu Dec 6 16:52:25 2018 -0500 Committer: Ganesh Murthy <gmur...@redhat.com> Committed: Thu Dec 6 16:52:25 2018 -0500 ---------------------------------------------------------------------- tests/system_tests_one_router.py | 92 +++++++++++++++++++++++++++++++++++ 1 file changed, 92 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/58567f97/tests/system_tests_one_router.py ---------------------------------------------------------------------- diff --git a/tests/system_tests_one_router.py b/tests/system_tests_one_router.py index dd7cd31..fbddc49 100644 --- a/tests/system_tests_one_router.py +++ b/tests/system_tests_one_router.py @@ -422,6 +422,11 @@ class OneRouterTest(TestCase): test.run() self.assertEqual(None, test.error) + def test_42_unsettled_large_message_test(self): + test = UnsettledLargeMessageTest(self.address, 250) + test.run() + self.assertEqual(None, test.error) + class Entity(object): def __init__(self, status_code, status_description, attrs): @@ -2856,5 +2861,92 @@ class RejectDispositionTest(MessagingHandler): Container(self).run() +class UnsettledLargeMessageTest(MessagingHandler): + def __init__(self, addr, n_messages): + super (UnsettledLargeMessageTest, self).__init__() + self.addr = addr + self.n_messages = n_messages + self.sender = None + self.receiver = None + 
self.sender_conn = None + self.recv_conn = None + self.n_sent = 0 + self.n_received = 0 + self.error = None + self.test_timer = None + self.max_receive = 1 + self.custom_timer = None + self.timer = None + self.n_accepted = 0 + self.n_modified = 0 + self.n_released = 0 + self.str1 = "0123456789abcdef" + self.msg_str = "" + for i in range(16384): + self.msg_str += self.str1 + + def run (self): + Container(self).run() + + def check_if_done(self): + # self.n_accepted + self.n_modified + self.n_released will never + # equal self.n_messages without the fix for DISPATCH-1197 because + # the router will never pull the data from the proton buffers once + # the router hits q2_holdoff + if self.n_accepted + self.n_modified + \ + self.n_released == self.n_messages: + self.timer.cancel() + self.sender_conn.close() + + def timeout(self): + self.error = "Timeout Expired: sent=%d accepted=%d " \ + "released=%d modified=%d" % (self.n_messages, + self.n_accepted, + self.n_released, + self.n_modified) + + def on_start (self, event): + self.sender_conn = event.container.connect(self.addr) + self.recv_conn = event.container.connect(self.addr) + self.receiver = event.container.create_receiver(self.recv_conn, + "test_42") + self.sender = event.container.create_sender(self.sender_conn, + "test_42") + self.timer = event.reactor.schedule(15, Timeout(self)) + + def on_accepted(self, event): + self.n_accepted += 1 + + def on_released(self, event): + if event.delivery.remote_state == Delivery.MODIFIED: + self.n_modified += 1 + else: + self.n_released += 1 + + self.check_if_done() + + def on_sendable(self, event): + while self.n_sent < self.n_messages: + msg = Message(id=(self.n_sent + 1), + body={'sequence': (self.n_sent + 1), + 'msg_str': self.msg_str}) + # Presettle the delivery. 
+ self.sender.send (msg) + self.n_sent += 1 + + def on_message(self, event): + self.n_received += 1 + if self.n_received == self.max_receive: + # Close the receiver connection after receiving just one message + # This will cause the release of multi-frame deliveries. + # Meanwhile the sender will keep sending but will run into + # the q2_holdoff situation and never recover. + # The sender link will be stalled + # This test will NEVER pass without the fix to DISPATCH-1197 + # Receiver bails after receiving max_receive messages. + self.receiver.close() + self.recv_conn.close() + + if __name__ == '__main__': unittest.main(main_module()) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org