Repository: qpid-dispatch Updated Branches: refs/heads/master 0f8fb609e -> 5d7304f2c
DISPATCH-1045 - Release the delivery only after the entire message has been received Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/5d7304f2 Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/5d7304f2 Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/5d7304f2 Branch: refs/heads/master Commit: 5d7304f2c4caf206c9304fcbd22be0a972116e3d Parents: 0f8fb60 Author: Ganesh Murthy <gmur...@redhat.com> Authored: Fri Jun 22 16:01:51 2018 -0400 Committer: Ganesh Murthy <gmur...@redhat.com> Committed: Thu Jun 28 09:22:49 2018 -0400 ---------------------------------------------------------------------- include/qpid/dispatch/router_core.h | 1 + src/router_core/transfer.c | 41 ++++++++++++++++++----- src/router_node.c | 57 +++++++++++++++++++++++--------- tests/system_tests_one_router.py | 51 ++++++++++++++++++++++++++-- 4 files changed, 124 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/5d7304f2/include/qpid/dispatch/router_core.h ---------------------------------------------------------------------- diff --git a/include/qpid/dispatch/router_core.h b/include/qpid/dispatch/router_core.h index 17d3002..e0a1f41 100644 --- a/include/qpid/dispatch/router_core.h +++ b/include/qpid/dispatch/router_core.h @@ -660,6 +660,7 @@ bool qdr_delivery_send_complete(const qdr_delivery_t *delivery); bool qdr_delivery_tag_sent(const qdr_delivery_t *delivery); void qdr_delivery_set_tag_sent(const qdr_delivery_t *delivery, bool tag_sent); bool qdr_delivery_receive_complete(const qdr_delivery_t *delivery); +uint64_t qdr_delivery_disposition(const qdr_delivery_t *delivery); void qdr_delivery_set_aborted(const qdr_delivery_t *delivery, bool aborted); bool qdr_delivery_is_aborted(const qdr_delivery_t *delivery); 
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/5d7304f2/src/router_core/transfer.c ---------------------------------------------------------------------- diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c index 7729a8e..5ee4ae3 100644 --- a/src/router_core/transfer.c +++ b/src/router_core/transfer.c @@ -58,6 +58,7 @@ qdr_delivery_t *qdr_link_deliver(qdr_link_t *link, qd_message_t *msg, qd_iterato dlv->link_exclusion = link_exclusion; dlv->ingress_index = ingress_index; dlv->error = 0; + dlv->disposition = 0; qdr_delivery_incref(dlv, "qdr_link_deliver - newly created delivery, add to action list"); qdr_delivery_incref(dlv, "qdr_link_deliver - protect returned value"); @@ -86,6 +87,7 @@ qdr_delivery_t *qdr_link_deliver_to(qdr_link_t *link, qd_message_t *msg, dlv->link_exclusion = link_exclusion; dlv->ingress_index = ingress_index; dlv->error = 0; + dlv->disposition = 0; qdr_delivery_incref(dlv, "qdr_link_deliver_to - newly created delivery, add to action list"); qdr_delivery_incref(dlv, "qdr_link_deliver_to - protect returned value"); @@ -108,11 +110,12 @@ qdr_delivery_t *qdr_link_deliver_to_routed_link(qdr_link_t *link, qd_message_t * qdr_delivery_t *dlv = new_qdr_delivery_t(); ZERO(dlv); - dlv->link = link; - dlv->msg = msg; - dlv->settled = settled; - dlv->presettled = settled; - dlv->error = 0; + dlv->link = link; + dlv->msg = msg; + dlv->settled = settled; + dlv->presettled = settled; + dlv->error = 0; + dlv->disposition = 0; qdr_delivery_read_extension_state(dlv, disposition, disposition_data, true); qdr_delivery_incref(dlv, "qdr_link_deliver_to_routed_link - newly created delivery, add to action list"); @@ -339,6 +342,13 @@ bool qdr_delivery_receive_complete(const qdr_delivery_t *delivery) return qd_message_receive_complete(delivery->msg); } +uint64_t qdr_delivery_disposition(const qdr_delivery_t *delivery) +{ + if (!delivery) + return 0; + return delivery->disposition; +} + void qdr_delivery_incref(qdr_delivery_t *delivery, const 
char *label) { @@ -569,6 +579,7 @@ static void qdr_delete_delivery_internal_CT(qdr_core_t *core, qdr_delivery_t *de qd_bitmask_free(delivery->link_exclusion); qdr_error_free(delivery->error); + free_qdr_delivery_t(delivery); } @@ -812,8 +823,12 @@ static void qdr_link_forward_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery link->dropped_presettled_deliveries++; if (dlv->link->link_type == QD_LINK_ENDPOINT) core->dropped_presettled_deliveries++; - } else - qdr_delivery_release_CT(core, dlv); + } else { + if (more) + dlv->disposition = PN_RELEASED; + else + qdr_delivery_release_CT(core, dlv); + } if (qdr_is_addr_treatment_multicast(link->owning_addr)) qdr_link_issue_credit_CT(core, link, 1, false); @@ -870,13 +885,21 @@ static void qdr_link_forward_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery // If the delivery is not settled, release it. // if (!dlv->settled) { - qdr_delivery_release_CT(core, dlv); // // Set the discard flag on the message only if the message is not completely received yet. // - if (more) + if (more) { + // + // Since more of the message is still arriving, we want to wait until after the entire message arrives to release it. + // Don't release it now. + // qd_message_set_discard(dlv->msg, true); + dlv->disposition = PN_RELEASED; + } + else { + qdr_delivery_release_CT(core, dlv); + } } // http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/5d7304f2/src/router_node.c ---------------------------------------------------------------------- diff --git a/src/router_node.c b/src/router_node.c index 39da87d..c305e54 100644 --- a/src/router_node.c +++ b/src/router_node.c @@ -288,11 +288,6 @@ static void AMQP_rx_handler(void* context, qd_link_t *link) bool receive_complete = qd_message_receive_complete(msg); if (receive_complete) { - // - // The entire message has been received and we are ready to consume the delivery by calling pn_link_advance(). 
- // - pn_link_advance(pn_link); - if (!qd_message_aborted(msg)) { // Since the entire message has been received, we can print out its contents to the log if necessary. if (cf->log_message) { @@ -312,12 +307,31 @@ static void AMQP_rx_handler(void* context, qd_link_t *link) pn_link_name(pn_link)); } + // + // The entire message has been received and we are ready to consume the delivery by calling pn_link_advance(). + // + pn_link_advance(pn_link); + + // + // The entire message has been received but this message needs to be discarded + // + if (qd_message_is_discard(msg)) { + if (qdr_delivery_disposition(delivery) != 0) + pn_delivery_update(pnd, qdr_delivery_disposition(delivery)); + pn_delivery_settle(pnd); + qdr_delivery_decref(router->router_core, delivery, "release protection of return from delivery discard"); + } + // Link stalling may have ignored some delivery events. // If there's another delivery pending then reschedule this. pn_delivery_t *npnd = pn_link_current(pn_link); if (npnd) { qd_connection_invoke_deferred(conn, deferred_AMQP_rx_handler, link); } + + if (qd_message_is_discard(msg)) { + return; + } } // @@ -339,14 +353,21 @@ static void AMQP_rx_handler(void* context, qd_link_t *link) // A delivery object was already available via pn_delivery_get_context. This means a qdr_delivery was already created. Use it to continue delivery. // if (delivery) { - qdr_deliver_continue(delivery); // - // Settle the proton delivery only if all the data has been received. + // Call continue only if the discard flag on the message is not set + // We should not continue processing the message after it has been discarded // - if (pn_delivery_settled(pnd) && receive_complete) { - qdr_node_disconnect_deliveries(router->router_core, link, delivery, pnd); - pn_delivery_settle(pnd); + if (!qd_message_is_discard(msg)) { + qdr_deliver_continue(delivery); + + // + // Settle the proton delivery only if all the data has been received. 
+ // + if (pn_delivery_settled(pnd) && receive_complete) { + qdr_node_disconnect_deliveries(router->router_core, link, delivery, pnd); + pn_delivery_settle(pnd); + } } } else { @@ -418,11 +439,17 @@ static void AMQP_rx_handler(void* context, qd_link_t *link) } if (delivery) { - qdr_deliver_continue(delivery); - if (receive_complete) { - if (pn_delivery_settled(pnd)) { - qdr_node_disconnect_deliveries(router->router_core, link, delivery, pnd); - pn_delivery_settle(pnd); + // + // Call continue only if the discard flag on the message is not set + // We should not continue processing the message after it has been discarded + // + if (!qd_message_is_discard(msg)) { + qdr_deliver_continue(delivery); + if (receive_complete) { + if (pn_delivery_settled(pnd)) { + qdr_node_disconnect_deliveries(router->router_core, link, delivery, pnd); + pn_delivery_settle(pnd); + } } } http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/5d7304f2/tests/system_tests_one_router.py ---------------------------------------------------------------------- diff --git a/tests/system_tests_one_router.py b/tests/system_tests_one_router.py index 50b3390..afadf81 100644 --- a/tests/system_tests_one_router.py +++ b/tests/system_tests_one_router.py @@ -52,8 +52,6 @@ class MultiTimeout ( object ): self.parent.timeout ( self.name ) - - class OneRouterTest(TestCase): """System tests involving a single router""" @classmethod @@ -414,6 +412,11 @@ class OneRouterTest(TestCase): client.connection.close() + def test_40_anonymous_sender_no_receiver(self): + test = AnonymousSenderNoRecvLargeMessagedTest(self.address) + test.run() + self.assertEqual(None, test.error) + class Entity(object): def __init__(self, status_code, status_description, attrs): @@ -2228,6 +2231,7 @@ class MulticastUnsettledTest(MessagingHandler): def run(self): Container(self).run() + class LargeMessageStreamTest(MessagingHandler): def __init__(self, address): super(LargeMessageStreamTest, self).__init__() @@ -2322,6 +2326,7 @@ class 
MultiframePresettledTest(MessagingHandler): def run(self): Container(self).run() + class MulticastUnsettledNoReceiverTest(MessagingHandler): """ Creates a sender to a multicast address. Router provides a credit of 'linkCapacity' to this sender even @@ -2400,6 +2405,48 @@ class MulticastUnsettledNoReceiverTest(MessagingHandler): Container(self).run() +class AnonymousSenderNoRecvLargeMessagedTest(MessagingHandler): + def __init__(self, address): + super(AnonymousSenderNoRecvLargeMessagedTest, self).__init__(auto_accept=False) + self.timer = None + self.conn = None + self.sender = None + self.address = address + self.released = False + self.error = None + self.body = "" + for i in range(20000): + self.body += "0123456789101112131415" + + def timeout(self): + self.error = "Timeout Expired:, delivery not released. " + self.conn.close() + + def check_if_done(self): + if self.released: + self.sender.close() + self.conn.close() + self.timer.cancel() + + def on_start(self, event): + self.timer = event.reactor.schedule(TIMEOUT, Timeout(self)) + self.conn = event.container.connect(self.address) + # This sender is an anonymous sender + self.sender = event.container.create_sender(self.conn) + + def on_sendable(self, event): + msg = Message(body=self.body, address="someaddress") + # send(msg) calls the stream function which streams data from sender to the router + event.sender.send(msg) + + def on_released(self, event): + self.released = True + self.check_if_done() + + def run(self): + Container(self).run() + + class ReleasedVsModifiedTest(MessagingHandler): def __init__(self, address): super(ReleasedVsModifiedTest, self).__init__(prefetch=0, auto_accept=False) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org