This is an automated email from the ASF dual-hosted git repository. tross pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
The following commit(s) were added to refs/heads/master by this push: new 67e8ef6 DISPATCH-1309 - Convert the delivery->link pointer to a safe pointer. 67e8ef6 is described below commit 67e8ef6f10a4a3b2f92f39aa63f9aa0f51800000 Author: Ted Ross <tr...@redhat.com> AuthorDate: Thu Apr 11 13:03:53 2019 -0400 DISPATCH-1309 - Convert the delivery->link pointer to a safe pointer. --- include/qpid/dispatch/alloc_malloc.h | 1 + include/qpid/dispatch/alloc_pool.h | 1 + include/qpid/dispatch/router_core.h | 2 +- src/router_core/connections.c | 8 ++-- src/router_core/core_link_endpoint.c | 4 +- src/router_core/forwarder.c | 12 +++--- src/router_core/router_core_private.h | 26 ++++++------ src/router_core/transfer.c | 80 ++++++++++++++++++++--------------- src/router_node.c | 4 +- 9 files changed, 79 insertions(+), 59 deletions(-) diff --git a/include/qpid/dispatch/alloc_malloc.h b/include/qpid/dispatch/alloc_malloc.h index d89ed28..ffc0d84 100644 --- a/include/qpid/dispatch/alloc_malloc.h +++ b/include/qpid/dispatch/alloc_malloc.h @@ -57,6 +57,7 @@ typedef struct { #define ALLOC_DEFINE(T) ALLOC_DEFINE_CONFIG(T, sizeof(T), 0, 0) static inline uint32_t qd_alloc_sequence(void *p) { return 0; } +static inline void qd_nullify_safe_ptr(qd_alloc_safe_ptr_t *sp) { } static inline void qd_alloc_initialize(void) {} static inline void qd_alloc_debug_dump(const char *file) {} static inline void qd_alloc_finalize(void) {} diff --git a/include/qpid/dispatch/alloc_pool.h b/include/qpid/dispatch/alloc_pool.h index 4f6931c..14641da 100644 --- a/include/qpid/dispatch/alloc_pool.h +++ b/include/qpid/dispatch/alloc_pool.h @@ -79,6 +79,7 @@ void *qd_alloc(qd_alloc_type_desc_t *desc, qd_alloc_pool_t **tpool); /** De-allocate from a thread pool. Use via ALLOC_DECLARE */ void qd_dealloc(qd_alloc_type_desc_t *desc, qd_alloc_pool_t **tpool, char *p); uint32_t qd_alloc_sequence(void *p); +static inline void qd_nullify_safe_ptr(qd_alloc_safe_ptr_t *sp) { sp->ptr = 0; } /** * Declare functions new_T and alloc_T diff --git a/include/qpid/dispatch/router_core.h b/include/qpid/dispatch/router_core.h index 9f9567d..1e48c20 100644 --- a/include/qpid/dispatch/router_core.h +++ b/include/qpid/dispatch/router_core.h @@ -660,7 +660,7 @@ qdr_delivery_t *qdr_link_deliver_to(qdr_link_t *link, qd_message_t *msg, qdr_delivery_t *qdr_link_deliver_to_routed_link(qdr_link_t *link, qd_message_t *msg, bool settled, const uint8_t *tag, int tag_length, uint64_t disposition, pn_data_t* disposition_state); -qdr_delivery_t *qdr_deliver_continue(qdr_delivery_t *delivery); +qdr_delivery_t *qdr_deliver_continue(qdr_core_t *core, qdr_delivery_t *delivery); int qdr_link_process_deliveries(qdr_core_t *core, qdr_link_t *link, int credit); diff --git a/src/router_core/connections.c b/src/router_core/connections.c index 2143580..4956c7e 100644 --- a/src/router_core/connections.c +++ b/src/router_core/connections.c @@ -706,7 +706,7 @@ static void qdr_link_cleanup_deliveries_CT(qdr_core_t *core, qdr_connection_t *c // Updates global and link level delivery counters like presettled_deliveries, accepted_deliveries, released_deliveries etc // qdr_increment_delivery_counters_CT(core, ref->dlv); - ref->dlv->link = 0; + qd_nullify_safe_ptr(&ref->dlv->link_sp); // // Now our reference // @@ -747,7 +747,7 @@ static void qdr_link_cleanup_deliveries_CT(qdr_core_t *core, qdr_connection_t *c // Updates global and link level delivery counters like presettled_deliveries, accepted_deliveries, released_deliveries etc // qdr_increment_delivery_counters_CT(core, dlv); - dlv->link = 0; + qd_nullify_safe_ptr(&dlv->link_sp); // // Now the undelivered-list reference @@ -791,7 +791,7 @@ static void qdr_link_cleanup_deliveries_CT(qdr_core_t *core, qdr_connection_t *c // Updates global and link level delivery counters like presettled_deliveries, accepted_deliveries, released_deliveries etc // qdr_increment_delivery_counters_CT(core, dlv); - dlv->link = 0; + qd_nullify_safe_ptr(&dlv->link_sp); // // Now the unsettled-list reference @@ -823,7 +823,7 @@ static void qdr_link_cleanup_deliveries_CT(qdr_core_t *core, qdr_connection_t *c // Updates global and link level delivery counters like presettled_deliveries, accepted_deliveries, released_deliveries etc // qdr_increment_delivery_counters_CT(core, dlv); - dlv->link = 0; + qd_nullify_safe_ptr(&dlv->link_sp); // This decref is for the removing the delivery from the settled list qdr_delivery_decref_CT(core, dlv, "qdr_link_cleanup_deliveries_CT - remove from settled list"); diff --git a/src/router_core/core_link_endpoint.c b/src/router_core/core_link_endpoint.c index 6ee4922..92260b3 100644 --- a/src/router_core/core_link_endpoint.c +++ b/src/router_core/core_link_endpoint.c @@ -115,7 +115,7 @@ void qdrc_endpoint_send_CT(qdr_core_t *core, qdrc_endpoint_t *ep, qdr_delivery_t { uint64_t *tag = (uint64_t*) dlv->tag; - dlv->link = ep->link; + set_safe_ptr_qdr_link_t(ep->link, &dlv->link_sp); dlv->settled = presettled; dlv->presettled = presettled; *tag = core->next_tag++; @@ -133,7 +133,7 @@ qdr_delivery_t *qdrc_endpoint_delivery_CT(qdr_core_t *core, qdrc_endpoint_t *end uint64_t *tag = (uint64_t*) dlv->tag; ZERO(dlv); - dlv->link = endpoint->link; + set_safe_ptr_qdr_link_t(endpoint->link, &dlv->link_sp); dlv->msg = message; *tag = core->next_tag++; dlv->tag_length = 8; diff --git a/src/router_core/forwarder.c b/src/router_core/forwarder.c index a58e165..b7f2ad9 100644 --- a/src/router_core/forwarder.c +++ b/src/router_core/forwarder.c @@ -117,7 +117,7 @@ qdr_delivery_t *qdr_forward_new_delivery_CT(qdr_core_t *core, qdr_delivery_t *in uint64_t *tag = (uint64_t*) out_dlv->tag; ZERO(out_dlv); - out_dlv->link = link; + set_safe_ptr_qdr_link_t(link, &out_dlv->link_sp); out_dlv->msg = qd_message_copy(msg); out_dlv->settled = !in_dlv || in_dlv->settled; out_dlv->presettled = out_dlv->settled; @@ -298,7 +298,8 @@ static uint8_t qdr_forward_effective_priority(qd_message_t *msg, qdr_address_t * */ static inline bool qdr_forward_edge_echo_CT(qdr_delivery_t *in_dlv, qdr_link_t *out_link) { - return (in_dlv && in_dlv->via_edge && in_dlv->link->conn == out_link->conn); + qdr_link_t *link = in_dlv ? safe_deref_qdr_link_t(in_dlv->link_sp) : 0; + return (in_dlv && in_dlv->via_edge && link && link->conn == out_link->conn); } @@ -313,9 +314,10 @@ static void qdr_forward_to_subscriber(qdr_core_t *core, qdr_subscription_t *sub, // Only if the message has been completely received, forward it to the subscription // Subscriptions, at the moment, dont have the ability to deal with partial messages // - if (receive_complete) - qdr_forward_on_message_CT(core, sub, in_dlv ? in_dlv->link : 0, in_msg); - else { + if (receive_complete) { + qdr_link_t *link = in_dlv ? safe_deref_qdr_link_t(in_dlv->link_sp) : 0; + qdr_forward_on_message_CT(core, sub, link, in_msg); + } else { // // Receive is not complete, we will store the sub in // in_dlv->subscriptions so we can send the message to the subscription diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h index 1e4f69c..e4666a5 100644 --- a/src/router_core/router_core_private.h +++ b/src/router_core/router_core_private.h @@ -40,6 +40,19 @@ typedef struct qdr_connection_ref_t qdr_connection_ref_t; typedef struct qdr_exchange qdr_exchange_t; typedef struct qdr_edge_t qdr_edge_t; +ALLOC_DECLARE(qdr_address_t); +ALLOC_DECLARE(qdr_address_config_t); +ALLOC_DECLARE(qdr_node_t); +ALLOC_DECLARE(qdr_router_ref_t); +ALLOC_DECLARE(qdr_link_ref_t); +ALLOC_DECLARE(qdr_link_route_t); +ALLOC_DECLARE(qdr_auto_link_t); +ALLOC_DECLARE(qdr_conn_identifier_t); +ALLOC_DECLARE(qdr_connection_ref_t); + +ALLOC_DECLARE(qdr_connection_t); +ALLOC_DECLARE(qdr_link_t); + #include "core_link_endpoint.h" #include "core_events.h" @@ -338,7 +351,6 @@ struct qdr_node_t { int cost; }; -ALLOC_DECLARE(qdr_node_t); DEQ_DECLARE(qdr_node_t, qdr_node_list_t); void qdr_router_node_free(qdr_core_t *core, qdr_node_t *rnode); @@ -352,7 +364,6 @@ struct qdr_router_ref_t { qdr_node_t *router; }; -ALLOC_DECLARE(qdr_router_ref_t); DEQ_DECLARE(qdr_router_ref_t, qdr_router_ref_list_t); typedef enum { @@ -386,7 +397,7 @@ struct qdr_delivery_t { void *context; sys_atomic_t ref_count; bool ref_counted; /// Used to protect against ref count going 1 -> 0 -> 1 - qdr_link_t *link; + qdr_link_t_sp link_sp; /// Safe pointer to the link qdr_delivery_t *peer; /// Use this peer if the delivery has one and only one peer. qdr_delivery_ref_t *next_peer_ref; qd_message_t *msg; @@ -494,7 +505,6 @@ struct qdr_link_t { uint32_t core_ticks; }; -ALLOC_DECLARE(qdr_link_t); DEQ_DECLARE(qdr_link_t, qdr_link_list_t); struct qdr_link_ref_t { @@ -502,7 +512,6 @@ struct qdr_link_ref_t { qdr_link_t *link; }; -ALLOC_DECLARE(qdr_link_ref_t); DEQ_DECLARE(qdr_link_ref_t, qdr_link_ref_list_t); void qdr_add_link_ref(qdr_link_ref_list_t *ref_list, qdr_link_t *link, int cls); @@ -515,7 +524,6 @@ struct qdr_connection_ref_t { qdr_connection_t *conn; }; -ALLOC_DECLARE(qdr_connection_ref_t); DEQ_DECLARE(qdr_connection_ref_t, qdr_connection_ref_list_t); void qdr_add_connection_ref(qdr_connection_ref_list_t *ref_list, qdr_connection_t *conn); @@ -580,7 +588,6 @@ struct qdr_address_t { int priority; }; -ALLOC_DECLARE(qdr_address_t); DEQ_DECLARE(qdr_address_t, qdr_address_list_t); qdr_address_t *qdr_address_CT(qdr_core_t *core, qd_address_treatment_t treatment, qdr_address_config_t *config); @@ -605,7 +612,6 @@ struct qdr_address_config_t { int priority; }; -ALLOC_DECLARE(qdr_address_config_t); DEQ_DECLARE(qdr_address_config_t, qdr_address_config_list_t); void qdr_core_remove_address_config(qdr_core_t *core, qdr_address_config_t *addr); bool qdr_is_addr_treatment_multicast(qdr_address_t *addr); @@ -681,7 +687,6 @@ struct qdr_connection_t { bool closed; // This bit is used in the case where a client is trying to force close this connection. }; -ALLOC_DECLARE(qdr_connection_t); DEQ_DECLARE(qdr_connection_t, qdr_connection_list_t); #define QDR_IS_LINK_ROUTE_PREFIX(p) ((p) == QD_ITER_HASH_PREFIX_LINKROUTE_ADDR_IN || (p) == QD_ITER_HASH_PREFIX_LINKROUTE_ADDR_OUT) @@ -709,7 +714,6 @@ struct qdr_link_route_t { qdr_connection_t *parent_conn; }; -ALLOC_DECLARE(qdr_link_route_t); void qdr_core_delete_link_route(qdr_core_t *core, qdr_link_route_t *lr); void qdr_core_delete_auto_link (qdr_core_t *core, qdr_auto_link_t *al); @@ -755,7 +759,6 @@ struct qdr_auto_link_t { char *last_error; }; -ALLOC_DECLARE(qdr_auto_link_t); DEQ_DECLARE(qdr_auto_link_t, qdr_auto_link_list_t); @@ -767,7 +770,6 @@ struct qdr_conn_identifier_t { qdr_auto_link_list_t auto_link_refs; }; -ALLOC_DECLARE(qdr_conn_identifier_t); DEQ_DECLARE(qdr_exchange_t, qdr_exchange_list_t); typedef struct qdr_priority_sheaf_t { diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c index 7b1af31..3e6fe73 100644 --- a/src/router_core/transfer.c +++ b/src/router_core/transfer.c @@ -49,7 +49,7 @@ qdr_delivery_t *qdr_link_deliver(qdr_link_t *link, qd_message_t *msg, qd_iterato qdr_delivery_t *dlv = new_qdr_delivery_t(); ZERO(dlv); - dlv->link = link; + set_safe_ptr_qdr_link_t(link, &dlv->link_sp); dlv->msg = msg; dlv->to_addr = 0; dlv->origin = ingress; @@ -78,7 +78,7 @@ qdr_delivery_t *qdr_link_deliver_to(qdr_link_t *link, qd_message_t *msg, qdr_delivery_t *dlv = new_qdr_delivery_t(); ZERO(dlv); - dlv->link = link; + set_safe_ptr_qdr_link_t(link, &dlv->link_sp); dlv->msg = msg; dlv->to_addr = addr; dlv->origin = ingress; @@ -110,7 +110,7 @@ qdr_delivery_t *qdr_link_deliver_to_routed_link(qdr_link_t *link, qd_message_t * qdr_delivery_t *dlv = new_qdr_delivery_t(); ZERO(dlv); - dlv->link = link; + set_safe_ptr_qdr_link_t(link, &dlv->link_sp); dlv->msg = msg; dlv->settled = settled; dlv->presettled = settled; @@ -130,7 +130,7 @@ qdr_delivery_t *qdr_link_deliver_to_routed_link(qdr_link_t *link, qd_message_t * } -qdr_delivery_t *qdr_deliver_continue(qdr_delivery_t *in_dlv) +qdr_delivery_t *qdr_deliver_continue(qdr_core_t *core, qdr_delivery_t *in_dlv) { qdr_action_t *action = qdr_action(qdr_deliver_continue_CT, "deliver_continue"); action->args.connection.delivery = in_dlv; @@ -140,7 +140,7 @@ qdr_delivery_t *qdr_deliver_continue(qdr_delivery_t *in_dlv) // This incref is for the action reference qdr_delivery_incref(in_dlv, "qdr_deliver_continue - add to action list"); - qdr_action_enqueue(in_dlv->link->core, action); + qdr_action_enqueue(core, action); return in_dlv; } @@ -337,7 +337,7 @@ void *qdr_delivery_get_context(qdr_delivery_t *delivery) qdr_link_t *qdr_delivery_link(const qdr_delivery_t *delivery) { - return delivery ? delivery->link : 0; + return delivery ? safe_deref_qdr_link_t(delivery->link_sp) : 0; } @@ -391,8 +391,9 @@ void qdr_delivery_incref(qdr_delivery_t *delivery, const char *label) uint32_t rc = sys_atomic_inc(&delivery->ref_count); assert(rc > 0 || !delivery->ref_counted); delivery->ref_counted = true; - if (delivery->link) - qd_log(delivery->link->core->log, QD_LOG_DEBUG, + qdr_link_t *link = qdr_delivery_link(delivery); + if (link) + qd_log(link->core->log, QD_LOG_DEBUG, "Delivery incref: dlv:%lx rc:%"PRIu32" %s", (long) delivery, rc + 1, label); } @@ -515,7 +516,7 @@ bool qdr_delivery_settled_CT(qdr_core_t *core, qdr_delivery_t *dlv) // Remove a delivery from its unsettled list. Side effects include issuing // replacement credit and visiting the link-quiescence algorithm // - qdr_link_t *link = dlv->link; + qdr_link_t *link = qdr_delivery_link(dlv); qdr_connection_t *conn = link ? link->conn : 0; bool moved = false; @@ -561,7 +562,7 @@ bool qdr_delivery_settled_CT(qdr_core_t *core, qdr_delivery_t *dlv) void qdr_increment_delivery_counters_CT(qdr_core_t *core, qdr_delivery_t *delivery) { - qdr_link_t *link = delivery->link; + qdr_link_t *link = qdr_delivery_link(delivery); if (link) { bool do_rate = false; @@ -915,7 +916,12 @@ static long qdr_addr_path_count_CT(qdr_address_t *addr) static void qdr_link_forward_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery_t *dlv, qdr_address_t *addr, bool more) { - if (dlv->link->link_type == QD_LINK_ENDPOINT) + qdr_link_t *dlv_link = qdr_delivery_link(dlv); + + if (!dlv_link) + return; + + if (dlv_link->link_type == QD_LINK_ENDPOINT) core->deliveries_ingress++; if (addr && addr == link->owning_addr && qdr_addr_path_count_CT(addr) == 0) { @@ -930,7 +936,7 @@ static void qdr_link_forward_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery if (dlv->settled) { // Increment the presettled_dropped_deliveries on the in_link link->dropped_presettled_deliveries++; - if (dlv->link->link_type == QD_LINK_ENDPOINT) + if (dlv_link->link_type == QD_LINK_ENDPOINT) core->dropped_presettled_deliveries++; // @@ -1062,7 +1068,10 @@ static void qdr_link_deliver_CT(qdr_core_t *core, qdr_action_t *action, bool dis qdr_delivery_t *dlv = action->args.connection.delivery; bool more = action->args.connection.more; - qdr_link_t *link = dlv->link; + qdr_link_t *link = qdr_delivery_link(dlv); + + if (!link) + return; // // Record the ingress time so we can track the age of this delivery. @@ -1203,6 +1212,9 @@ static void qdr_update_delivery_CT(qdr_core_t *core, qdr_action_t *action, bool qdr_error_t *error = action->args.delivery.error; bool error_unassigned = true; + qdr_link_t *dlv_link = qdr_delivery_link(dlv); + qdr_link_t *peer_link = qdr_delivery_link(peer); + // // Logic: // @@ -1228,7 +1240,7 @@ static void qdr_update_delivery_CT(qdr_core_t *core, qdr_action_t *action, bool if (settled) { if (peer) { peer->settled = true; - if (peer->link) { + if (peer_link) { peer_moved = qdr_delivery_settled_CT(core, peer); if (peer_moved) push = true; @@ -1236,15 +1248,15 @@ static void qdr_update_delivery_CT(qdr_core_t *core, qdr_action_t *action, bool qdr_delivery_unlink_peers_CT(core, dlv, peer); } - if (dlv->link) + if (dlv_link) dlv_moved = qdr_delivery_settled_CT(core, dlv); } // // If the delivery's link has a core endpoint, notify the endpoint of the update // - if (dlv->link && dlv->link->core_endpoint) - qdrc_endpoint_do_update_CT(core, dlv->link->core_endpoint, dlv, settled); + if (dlv_link && dlv_link->core_endpoint) + qdrc_endpoint_do_update_CT(core, dlv_link->core_endpoint, dlv, settled); if (push) qdr_delivery_push_CT(core, peer); @@ -1276,29 +1288,30 @@ static void qdr_delete_delivery_CT(qdr_core_t *core, qdr_action_t *action, bool void qdr_deliver_continue_peers_CT(qdr_core_t *core, qdr_delivery_t *in_dlv) { qdr_delivery_t *peer = qdr_delivery_first_peer_CT(in_dlv); + while (peer) { - qdr_link_work_t *work = peer->link_work; + qdr_link_work_t *work = peer->link_work; + qdr_link_t *peer_link = qdr_delivery_link(peer); + // // Determines if the peer connection can be activated. // For a large message, the peer delivery's link_work MUST be at the head of the peer link's work list. This link work is only removed // after the streaming message has been sent. // - if (work) { - sys_mutex_lock(peer->link->conn->work_lock); - if (work->processing || work == DEQ_HEAD(peer->link->work_list)) { + if (!!work && !!peer_link) { + sys_mutex_lock(peer_link->conn->work_lock); + if (work->processing || work == DEQ_HEAD(peer_link->work_list)) { // Adding this work at priority 0. - qdr_add_link_ref(peer->link->conn->links_with_work, peer->link, QDR_LINK_LIST_CLASS_WORK); - sys_mutex_unlock(peer->link->conn->work_lock); + qdr_add_link_ref(peer_link->conn->links_with_work, peer_link, QDR_LINK_LIST_CLASS_WORK); + sys_mutex_unlock(peer_link->conn->work_lock); // // Activate the outgoing connection for later processing. // - qdr_connection_activate_CT(core, peer->link->conn); - } - else { - sys_mutex_unlock(peer->link->conn->work_lock); - + qdr_connection_activate_CT(core, peer_link->conn); } + else + sys_mutex_unlock(peer_link->conn->work_lock); } peer = qdr_delivery_next_peer_CT(in_dlv); @@ -1313,11 +1326,12 @@ static void qdr_deliver_continue_CT(qdr_core_t *core, qdr_action_t *action, bool qdr_delivery_t *in_dlv = action->args.connection.delivery; bool more = action->args.connection.more; + qdr_link_t *link = qdr_delivery_link(in_dlv); // // If it is already in the undelivered list, don't try to deliver this again. // - if (in_dlv->where != QDR_DELIVERY_IN_UNDELIVERED) { + if (!!link && in_dlv->where != QDR_DELIVERY_IN_UNDELIVERED) { qdr_deliver_continue_peers_CT(core, in_dlv); qd_message_t *msg = qdr_delivery_message(in_dlv); @@ -1330,7 +1344,7 @@ static void qdr_deliver_continue_CT(qdr_core_t *core, qdr_action_t *action, bool qdr_subscription_t *sub = DEQ_HEAD(in_dlv->subscriptions); while (sub) { DEQ_REMOVE_HEAD(in_dlv->subscriptions); - qdr_forward_on_message_CT(core, sub, in_dlv->link, in_dlv->msg); + qdr_forward_on_message_CT(core, sub, link, in_dlv->msg); sub = DEQ_HEAD(in_dlv->subscriptions); } @@ -1367,7 +1381,7 @@ static void qdr_deliver_continue_CT(qdr_core_t *core, qdr_action_t *action, bool // Remove the delivery from the settled list and decref the in_dlv. in_dlv->where = QDR_DELIVERY_NOWHERE; - DEQ_REMOVE(in_dlv->link->settled, in_dlv); + DEQ_REMOVE(link->settled, in_dlv); qdr_delivery_decref_CT(core, in_dlv, "qdr_deliver_continue_CT - remove from settled list"); } } @@ -1472,10 +1486,10 @@ void qdr_addr_start_inlinks_CT(qdr_core_t *core, qdr_address_t *addr) void qdr_delivery_push_CT(qdr_core_t *core, qdr_delivery_t *dlv) { - if (!dlv || !dlv->link) + qdr_link_t *link = qdr_delivery_link(dlv); + if (!link) return; - qdr_link_t *link = dlv->link; bool activate = false; sys_mutex_lock(link->conn->work_lock); diff --git a/src/router_node.c b/src/router_node.c index 2114ce8..19d2ae2 100644 --- a/src/router_node.c +++ b/src/router_node.c @@ -375,7 +375,7 @@ static bool AMQP_rx_handler(void* context, qd_link_t *link) // We should not continue processing the message after it has been discarded // if (!qd_message_is_discard(msg)) { - qdr_deliver_continue(delivery); + qdr_deliver_continue(router->router_core, delivery); } } else { @@ -452,7 +452,7 @@ static bool AMQP_rx_handler(void* context, qd_link_t *link) // We should not continue processing the message after it has been discarded // if (!qd_message_is_discard(msg)) { - qdr_deliver_continue(delivery); + qdr_deliver_continue(router->router_core, delivery); } return next_delivery; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org