Repository: qpid-dispatch Updated Branches: refs/heads/1.1.x d44e8a98b -> 2e48f9042
DISPATCH-966 - Fixed two bugs preventing very large inter-router messages from being delivered. These were regressions introduced with large-message streaming. Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/2e48f904 Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/2e48f904 Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/2e48f904 Branch: refs/heads/1.1.x Commit: 2e48f9042fca10a1657a6a2afa50f93fabb6338f Parents: d44e8a9 Author: Ted Ross <tr...@redhat.com> Authored: Thu May 17 12:54:04 2018 -0400 Committer: Ted Ross <tr...@redhat.com> Committed: Thu May 17 12:56:05 2018 -0400 ---------------------------------------------------------------------- include/qpid/dispatch/message.h | 12 ++---------- src/message.c | 18 +++++++++--------- src/message_private.h | 1 + src/router_core/forwarder.c | 9 ++++++--- src/router_core/transfer.c | 4 ++-- 5 files changed, 20 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/2e48f904/include/qpid/dispatch/message.h ---------------------------------------------------------------------- diff --git a/include/qpid/dispatch/message.h b/include/qpid/dispatch/message.h index 78c0b95..ca2ab47 100644 --- a/include/qpid/dispatch/message.h +++ b/include/qpid/dispatch/message.h @@ -365,19 +365,11 @@ size_t qd_message_fanout(qd_message_t *msg); void qd_message_add_fanout(qd_message_t *msg); /** - * Setter for message Q2 input_holdoff state + * Disable the Q2-holdoff for this message. 
* * @param msg A pointer to the message */ -void qd_message_set_Q2_input_holdoff(qd_message_t *msg, bool holdoff); - -/** - * Accessor for message Q2 input_holdoff state - * - * @param msg A pointer to the message - * @return true if input is being held off - */ -bool qd_message_get_Q2_input_holdoff(qd_message_t *msg); +void qd_message_Q2_holdoff_disable(qd_message_t *msg); /** * Test if attempt to retreive message data through qd_message_recv should block http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/2e48f904/src/message.c ---------------------------------------------------------------------- diff --git a/src/message.c b/src/message.c index 03f475b..ba45ff4 100644 --- a/src/message.c +++ b/src/message.c @@ -1933,21 +1933,21 @@ int qd_message_get_phase_val(qd_message_t *msg) } -void qd_message_set_Q2_input_holdoff(qd_message_t *msg, bool holdoff) +void qd_message_Q2_holdoff_disable(qd_message_t *msg) { - ((qd_message_pvt_t*)msg)->content->q2_input_holdoff = holdoff; -} - - -bool qd_message_get_Q2_input_holdoff(qd_message_t *msg) -{ - return ((qd_message_pvt_t*)msg)->content->q2_input_holdoff; + if (!msg) + return; + qd_message_pvt_t *msg_pvt = (qd_message_pvt_t*) msg; + msg_pvt->content->disable_q2_holdoff = true; } bool qd_message_Q2_holdoff_should_block(qd_message_t *msg) { - return DEQ_SIZE(((qd_message_pvt_t*)msg)->content->buffers) >= QD_QLIMIT_Q2_UPPER; + if (!msg) + return false; + qd_message_pvt_t *msg_pvt = (qd_message_pvt_t*) msg; + return !msg_pvt->content->disable_q2_holdoff && DEQ_SIZE(msg_pvt->content->buffers) >= QD_QLIMIT_Q2_UPPER; } http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/2e48f904/src/message_private.h ---------------------------------------------------------------------- diff --git a/src/message_private.h b/src/message_private.h index 62d438b..fe8147f 100644 --- a/src/message_private.h +++ b/src/message_private.h @@ -114,6 +114,7 @@ typedef struct { bool receive_complete; // true if the message has been completely 
received, false otherwise bool q2_input_holdoff; // hold off calling pn_link_recv bool aborted; // receive completed with abort flag set + bool disable_q2_holdoff; // Disable the Q2 flow control } qd_message_content_t; typedef struct { http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/2e48f904/src/router_core/forwarder.c ---------------------------------------------------------------------- diff --git a/src/router_core/forwarder.c b/src/router_core/forwarder.c index 7ab8a46..fca19c3 100644 --- a/src/router_core/forwarder.c +++ b/src/router_core/forwarder.c @@ -346,13 +346,14 @@ int qdr_forward_multicast_CT(qdr_core_t *core, // if (receive_complete) qdr_forward_on_message_CT(core, sub, in_delivery ? in_delivery->link : 0, msg); - else + else { // // Receive is not complete, we will store the sub in in_delivery->subscriptions so we can send the message to the subscription // after the message fully arrives // DEQ_INSERT_TAIL(in_delivery->subscriptions, sub); - + qd_message_Q2_holdoff_disable(msg); + } fanout++; addr->deliveries_to_container++; @@ -408,12 +409,14 @@ int qdr_forward_closest_CT(qdr_core_t *core, // if (receive_complete) qdr_forward_on_message_CT(core, sub, in_delivery ? in_delivery->link : 0, msg); - else + else { // // Receive is not complete, we will store the sub in in_delivery->subscriptions so we can send the message to the subscription // after the message fully arrives // DEQ_INSERT_TAIL(in_delivery->subscriptions, sub); + qd_message_Q2_holdoff_disable(msg); + } // // If the incoming delivery is not settled, it should be accepted and settled here. 
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/2e48f904/src/router_core/transfer.c
----------------------------------------------------------------------
diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c
index e8d37ef..706a26d 100644
--- a/src/router_core/transfer.c
+++ b/src/router_core/transfer.c
@@ -1144,9 +1144,9 @@ static void qdr_deliver_continue_CT(qdr_core_t *core, qdr_action_t *action, bool
     qdr_delivery_decref_CT(core, in_dlv, "qdr_deliver_continue_CT - remove from action");
 
     //
-    // If it is already in the undelivered list or it has no peers, don't try to deliver this again.
+    // If it is already in the undelivered list, don't try to deliver this again.
     //
-    if (in_dlv->where == QDR_DELIVERY_IN_UNDELIVERED || !qdr_delivery_has_peer_CT(in_dlv))
+    if (in_dlv->where == QDR_DELIVERY_IN_UNDELIVERED)
         return;
 
     qdr_deliver_continue_peers_CT(core, in_dlv);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org