Repository: qpid-dispatch Updated Branches: refs/heads/master 03ba19e65 -> 710b7059b
DISPATCH-1096 - priority messaging support Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/710b7059 Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/710b7059 Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/710b7059 Branch: refs/heads/master Commit: 710b7059b77a72337a25c79b1e4e01856ca484b7 Parents: 03ba19e Author: Michael Goulish <mgoul...@redhat.com> Authored: Mon Sep 10 11:53:37 2018 -0400 Committer: Ted Ross <tr...@redhat.com> Committed: Mon Sep 10 14:19:25 2018 -0400 ---------------------------------------------------------------------- include/qpid/dispatch/iterator.h | 2 + include/qpid/dispatch/message.h | 7 + src/iterator.c | 8 ++ src/message.c | 90 +++++++++--- src/message_private.h | 11 ++ src/router_core/connections.c | 216 ++++++++++++++++------------- src/router_core/forwarder.c | 52 +++++-- src/router_core/route_tables.c | 7 +- src/router_core/router_core_private.h | 14 +- src/router_core/transfer.c | 15 +- 10 files changed, 287 insertions(+), 135 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/710b7059/include/qpid/dispatch/iterator.h ---------------------------------------------------------------------- diff --git a/include/qpid/dispatch/iterator.h b/include/qpid/dispatch/iterator.h index fa4267c..74d73ca 100644 --- a/include/qpid/dispatch/iterator.h +++ b/include/qpid/dispatch/iterator.h @@ -327,6 +327,8 @@ int qd_iterator_ncopy(qd_iterator_t *iter, unsigned char* buffer, int n); */ unsigned char *qd_iterator_copy(qd_iterator_t *iter); +uint8_t qd_iterator_uint8(qd_iterator_t *iter); + /** * Return a new iterator that is a duplicate of the original iterator, referring * to the same base data. If the input iterator pointer is NULL, the duplicate http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/710b7059/include/qpid/dispatch/message.h ---------------------------------------------------------------------- diff --git a/include/qpid/dispatch/message.h b/include/qpid/dispatch/message.h index ca2ab47..ec0b901 100644 --- a/include/qpid/dispatch/message.h +++ b/include/qpid/dispatch/message.h @@ -411,6 +411,13 @@ bool qd_message_aborted(const qd_message_t *msg); */ void qd_message_set_aborted(const qd_message_t *msg, bool aborted); +/** + * Return message priority + * @param msg A pointer to the message + */ +uint8_t qd_message_get_priority(qd_message_t *msg); + + ///@} #endif http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/710b7059/src/iterator.c ---------------------------------------------------------------------- diff --git a/src/iterator.c b/src/iterator.c index 8889b17..910aa19 100644 --- a/src/iterator.c +++ b/src/iterator.c @@ -768,6 +768,14 @@ char* qd_iterator_strncpy(qd_iterator_t *iter, char* buffer, int n) } +uint8_t qd_iterator_uint8(qd_iterator_t *iter ) { + qd_iterator_reset(iter); + if (qd_iterator_end(iter)) + return 0; + return (uint8_t) qd_iterator_octet(iter); +} + + unsigned char *qd_iterator_copy(qd_iterator_t *iter) { if (!iter) http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/710b7059/src/message.c ---------------------------------------------------------------------- diff --git a/src/message.c b/src/message.c index fce3394..35363cf 100644 --- a/src/message.c +++ b/src/message.c @@ -724,22 +724,57 @@ static qd_field_location_t *qd_message_properties_field(qd_message_t *msg, qd_me // else 0) static qd_field_location_t *qd_message_header_field(qd_message_t *msg, qd_message_field_t field) { - qd_message_content_t *content = MSG_CONTENT(msg); - - if (!content->section_message_header.parsed) { - if (!qd_message_check(msg, QD_DEPTH_HEADER) || !content->section_message_header.parsed) - return 0; - } - - switch (field) { - case QD_FIELD_HEADER: - return &content->section_message_properties; - default: - // TBD: add header fields as needed (see qd_message_properties_field() - // as an example) - assert(false); + int first_header_field = QD_FIELD_DURABLE, + last_header_field = QD_FIELD_DELIVERY_COUNT; + static const intptr_t offsets[] = { + // position of the fields' qd_field_location_t in the message content object + (intptr_t) &((qd_message_content_t *)0)->field_durable, + (intptr_t) &((qd_message_content_t *)0)->field_priority, + (intptr_t) &((qd_message_content_t *)0)->field_ttl, + (intptr_t) &((qd_message_content_t *)0)->field_first_acquirer, + (intptr_t) &((qd_message_content_t *)0)->field_delivery_count + }; + if (!(first_header_field <= field && field <= last_header_field)) { + assert ( 0 ); return 0; } + + qd_message_content_t *content = MSG_CONTENT(msg); + if (!content->section_message_header.parsed) { + if (!qd_message_check(msg, QD_DEPTH_HEADER) || !content->section_message_header.parsed) + return 0; + } + // If it's already been parsed, just return it. + const int index = field - first_header_field; + qd_field_location_t *const location = (qd_field_location_t *)((char *)content + offsets[index]); + if (location->parsed) + return location; + // requested field not parsed out. Need to parse out up to the requested field: + qd_field_location_t section = content->section_message_header; + qd_buffer_t *buffer = section.buffer; + unsigned char *cursor = qd_buffer_base(buffer) + section.offset; + advance(&cursor, &buffer, section.hdr_length, 0, 0); + int start = start_list(&cursor, &buffer); + if (index > start) + return 0; // properties list too short + // Make sure that all fields up to the requested one are parsed. + int position = 0; + while (position < index) { + qd_field_location_t *f = (qd_field_location_t *)((char *)content + offsets[position]); + // If it's parsed, advance over it. If not, parse it. + if (f->parsed) + advance(&cursor, &buffer, f->hdr_length + f->length, 0, 0); + else + if (!traverse_field(&cursor, &buffer, f)) + return 0; + position++; + } + // all fields previous to the target have now been parsed and cursor/buffer + // are in the correct position, parse out the field: + if (traverse_field(&cursor, &buffer, location)) + return location; + else + return 0; } @@ -892,7 +927,8 @@ qd_message_t *qd_message_copy(qd_message_t *in_msg) copy->ma_phase = msg->ma_phase; copy->strip_annotations_in = msg->strip_annotations_in; - copy->content = content; + copy->content = content; + copy->content->priority = content->priority; copy->sent_depth = QD_DEPTH_NONE; copy->cursor.buffer = 0; @@ -1018,6 +1054,28 @@ void qd_message_add_fanout(qd_message_t *in_msg) sys_atomic_inc(&msg->content->fanout); } +static void message_set_priority(qd_message_t *in_msg, uint8_t priority) +{ + qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg; + msg->content->priority = priority < QDR_N_PRIORITIES ? priority : QDR_N_PRIORITIES - 1; +} + +uint8_t qd_message_get_priority(qd_message_t *in_msg) +{ + qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg; + + uint8_t priority = 0; + qd_iterator_t *priority_iterator = qd_message_field_iterator(in_msg, QD_FIELD_PRIORITY); + if (priority_iterator) { + if (qd_iterator_remaining(priority_iterator) > 0) { + priority = qd_iterator_uint8(priority_iterator); + message_set_priority(in_msg, priority); + } + } + qd_iterator_free(priority_iterator); + return msg->content->priority; +} + bool qd_message_receive_complete(qd_message_t *in_msg) { if (!in_msg) @@ -1800,7 +1858,7 @@ void qd_message_compose_1(qd_message_t *msg, const char *to, qd_buffer_list_t *b qd_compose_start_list(field); qd_compose_insert_bool(field, 0); // durable - //qd_compose_insert_null(field); // priority + qd_compose_insert_null(field); // priority //qd_compose_insert_null(field); // ttl //qd_compose_insert_boolean(field, 0); // first-acquirer //qd_compose_insert_uint(field, 0); // delivery-count http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/710b7059/src/message_private.h ---------------------------------------------------------------------- diff --git a/src/message_private.h b/src/message_private.h index fe8147f..14d2593 100644 --- a/src/message_private.h +++ b/src/message_private.h @@ -77,6 +77,14 @@ typedef struct { qd_field_location_t section_body; // The message body: Data qd_field_location_t section_footer; // The footer qd_field_location_t field_user_annotations; // Opaque user message annotations, not a real field. + + // header fields + qd_field_location_t field_durable; + qd_field_location_t field_priority; + qd_field_location_t field_ttl; + qd_field_location_t field_first_acquirer; + qd_field_location_t field_delivery_count; + qd_field_location_t field_message_id; // The string value of the message-id qd_field_location_t field_user_id; // The string value of the user-id qd_field_location_t field_to; // The string value of the to field @@ -115,6 +123,7 @@ typedef struct { bool q2_input_holdoff; // hold off calling pn_link_recv bool aborted; // receive completed with abort flag set bool disable_q2_holdoff; // Disable the Q2 flow control + uint8_t priority; } qd_message_content_t; typedef struct { @@ -142,6 +151,8 @@ void qd_message_initialize(); qd_iterator_pointer_t qd_message_cursor(qd_message_pvt_t *msg); +#define QDR_N_PRIORITIES 10 + ///@} #endif http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/710b7059/src/router_core/connections.c ---------------------------------------------------------------------- diff --git a/src/router_core/connections.c b/src/router_core/connections.c index 786877e..9716d7e 100644 --- a/src/router_core/connections.c +++ b/src/router_core/connections.c @@ -205,7 +205,7 @@ const char *qdr_connection_get_tenant_space(const qdr_connection_t *conn, int *l int qdr_connection_process(qdr_connection_t *conn) { qdr_connection_work_list_t work_list; - qdr_link_ref_list_t links_with_work; + qdr_link_ref_list_t links_with_work[QDR_N_PRIORITIES]; qdr_core_t *core = conn->core; qdr_link_ref_t *ref; @@ -216,7 +216,9 @@ int qdr_connection_process(qdr_connection_t *conn) sys_mutex_lock(conn->work_lock); DEQ_MOVE(conn->work_list, work_list); - DEQ_MOVE(conn->links_with_work, links_with_work); + for (int priority = 0; priority < QDR_N_PRIORITIES; ++ priority) { + DEQ_MOVE(conn->links_with_work[priority], links_with_work[priority]); + } sys_mutex_unlock(conn->work_lock); event_count += DEQ_SIZE(work_list); @@ -241,97 +243,100 @@ int qdr_connection_process(qdr_connection_t *conn) work = DEQ_HEAD(work_list); } - do { - qdr_link_work_t *link_work; - free_link = false; + // Process the links_with_work array from highest to lowest priority. + for (int priority = QDR_N_PRIORITIES - 1; priority >= 0; -- priority) { + do { + qdr_link_work_t *link_work; + free_link = false; - sys_mutex_lock(conn->work_lock); - ref = DEQ_HEAD(links_with_work); - if (ref) { - link = ref->link; - qdr_del_link_ref(&links_with_work, ref->link, QDR_LINK_LIST_CLASS_WORK); + sys_mutex_lock(conn->work_lock); + ref = DEQ_HEAD(links_with_work[priority]); + if (ref) { + link = ref->link; + qdr_del_link_ref(links_with_work + priority, ref->link, QDR_LINK_LIST_CLASS_WORK); + + link_work = DEQ_HEAD(link->work_list); + if (link_work) { + DEQ_REMOVE_HEAD(link->work_list); + link_work->processing = true; + } + } else + link = 0; + sys_mutex_unlock(conn->work_lock); - link_work = DEQ_HEAD(link->work_list); - if (link_work) { - DEQ_REMOVE_HEAD(link->work_list); - link_work->processing = true; - } - } else - link = 0; - sys_mutex_unlock(conn->work_lock); + if (link) { - if (link) { + // + // Handle disposition/settlement updates + // + qdr_delivery_ref_list_t updated_deliveries; + sys_mutex_lock(conn->work_lock); + DEQ_MOVE(link->updated_deliveries, updated_deliveries); + sys_mutex_unlock(conn->work_lock); - // - // Handle disposition/settlement updates - // - qdr_delivery_ref_list_t updated_deliveries; - sys_mutex_lock(conn->work_lock); - DEQ_MOVE(link->updated_deliveries, updated_deliveries); - sys_mutex_unlock(conn->work_lock); + qdr_delivery_ref_t *dref = DEQ_HEAD(updated_deliveries); + while (dref) { + core->delivery_update_handler(core->user_context, dref->dlv, dref->dlv->disposition, dref->dlv->settled); + qdr_delivery_decref(core, dref->dlv, "qdr_connection_process - remove from updated list"); + qdr_del_delivery_ref(&updated_deliveries, dref); + dref = DEQ_HEAD(updated_deliveries); + event_count++; + } - qdr_delivery_ref_t *dref = DEQ_HEAD(updated_deliveries); - while (dref) { - core->delivery_update_handler(core->user_context, dref->dlv, dref->dlv->disposition, dref->dlv->settled); - qdr_delivery_decref(core, dref->dlv, "qdr_connection_process - remove from updated list"); - qdr_del_delivery_ref(&updated_deliveries, dref); - dref = DEQ_HEAD(updated_deliveries); - event_count++; - } + while (link_work) { + switch (link_work->work_type) { + case QDR_LINK_WORK_DELIVERY : + { + int count = core->push_handler(core->user_context, link, link_work->value); + assert(count <= link_work->value); + link_work->value -= count; + break; + } - while (link_work) { - switch (link_work->work_type) { - case QDR_LINK_WORK_DELIVERY : - { - int count = core->push_handler(core->user_context, link, link_work->value); - assert(count <= link_work->value); - link_work->value -= count; + case QDR_LINK_WORK_FLOW : + if (link_work->value > 0) + core->flow_handler(core->user_context, link, link_work->value); + if (link_work->drain_action == QDR_LINK_WORK_DRAIN_ACTION_SET) + core->drain_handler(core->user_context, link, true); + else if (link_work->drain_action == QDR_LINK_WORK_DRAIN_ACTION_CLEAR) + core->drain_handler(core->user_context, link, false); + else if (link_work->drain_action == QDR_LINK_WORK_DRAIN_ACTION_DRAINED) + core->drained_handler(core->user_context, link); break; - } - case QDR_LINK_WORK_FLOW : - if (link_work->value > 0) - core->flow_handler(core->user_context, link, link_work->value); - if (link_work->drain_action == QDR_LINK_WORK_DRAIN_ACTION_SET) - core->drain_handler(core->user_context, link, true); - else if (link_work->drain_action == QDR_LINK_WORK_DRAIN_ACTION_CLEAR) - core->drain_handler(core->user_context, link, false); - else if (link_work->drain_action == QDR_LINK_WORK_DRAIN_ACTION_DRAINED) - core->drained_handler(core->user_context, link); - break; - - case QDR_LINK_WORK_FIRST_DETACH : - core->detach_handler(core->user_context, link, link_work->error, true, link_work->close_link); - break; - - case QDR_LINK_WORK_SECOND_DETACH : - core->detach_handler(core->user_context, link, link_work->error, false, link_work->close_link); - free_link = true; - break; - } + case QDR_LINK_WORK_FIRST_DETACH : + core->detach_handler(core->user_context, link, link_work->error, true, link_work->close_link); + break; - sys_mutex_lock(conn->work_lock); - if (link_work->work_type == QDR_LINK_WORK_DELIVERY && link_work->value > 0 && !link->detach_received) { - DEQ_INSERT_HEAD(link->work_list, link_work); - link_work->processing = false; - link_work = 0; // Halt work processing - } else { - qdr_error_free(link_work->error); - free_qdr_link_work_t(link_work); - link_work = DEQ_HEAD(link->work_list); - if (link_work) { - DEQ_REMOVE_HEAD(link->work_list); - link_work->processing = true; + case QDR_LINK_WORK_SECOND_DETACH : + core->detach_handler(core->user_context, link, link_work->error, false, link_work->close_link); + free_link = true; + break; } + + sys_mutex_lock(conn->work_lock); + if (link_work->work_type == QDR_LINK_WORK_DELIVERY && link_work->value > 0 && !link->detach_received) { + DEQ_INSERT_HEAD(link->work_list, link_work); + link_work->processing = false; + link_work = 0; // Halt work processing + } else { + qdr_error_free(link_work->error); + free_qdr_link_work_t(link_work); + link_work = DEQ_HEAD(link->work_list); + if (link_work) { + DEQ_REMOVE_HEAD(link->work_list); + link_work->processing = true; + } + } + sys_mutex_unlock(conn->work_lock); + event_count++; } - sys_mutex_unlock(conn->work_lock); - event_count++; - } - if (free_link) - qdr_link_delete(link); - } - } while (free_link || link); + if (free_link) + qdr_link_delete(link); + } + } while (free_link || link); + } return event_count; } @@ -569,7 +574,8 @@ void qdr_link_enqueue_work_CT(qdr_core_t *core, sys_mutex_lock(conn->work_lock); DEQ_INSERT_TAIL(link->work_list, work); - qdr_add_link_ref(&conn->links_with_work, link, QDR_LINK_LIST_CLASS_WORK); + // Enqueue work at priority 0. + qdr_add_link_ref(conn->links_with_work, link, QDR_LINK_LIST_CLASS_WORK); sys_mutex_unlock(conn->work_lock); qdr_connection_activate_CT(core, conn); @@ -811,7 +817,9 @@ static void qdr_link_cleanup_CT(qdr_core_t *core, qdr_connection_t *conn, qdr_li if (link->link_type == QD_LINK_CONTROL) core->control_links_by_mask_bit[conn->mask_bit] = 0; if (link->link_type == QD_LINK_ROUTER) - core->data_links_by_mask_bit[conn->mask_bit] = 0; + for (int priority = 0; priority < QDR_N_PRIORITIES; ++ priority) + if (link == core->data_links_by_mask_bit[conn->mask_bit].links[priority]) + core->data_links_by_mask_bit[conn->mask_bit].links[priority] = 0; } // @@ -844,7 +852,9 @@ static void qdr_link_cleanup_CT(qdr_core_t *core, qdr_connection_t *conn, qdr_li // qdr_del_link_ref(&conn->links, link, QDR_LINK_LIST_CLASS_CONNECTION); sys_mutex_lock(conn->work_lock); - qdr_del_link_ref(&conn->links_with_work, link, QDR_LINK_LIST_CLASS_WORK); + for (int priority = 0; priority < QDR_N_PRIORITIES; ++ priority) { + qdr_del_link_ref(conn->links_with_work + priority, link, QDR_LINK_LIST_CLASS_WORK); + } sys_mutex_unlock(conn->work_lock); // @@ -1260,12 +1270,15 @@ static void qdr_connection_opened_CT(qdr_core_t *core, qdr_action_t *action, boo if (!conn->incoming) { // // The connector-side of inter-router/edge-uplink connections is responsible for setting up the - // inter-router links: Two (in and out) for control, two for routed-message transfer. + // inter-router links: Two (in and out) for control, 2 * QDR_N_PRIORITIES for routed-message transfer. // (void) qdr_create_link_CT(core, conn, QD_LINK_CONTROL, QD_INCOMING, qdr_terminus_router_control(), qdr_terminus_router_control()); (void) qdr_create_link_CT(core, conn, QD_LINK_CONTROL, QD_OUTGOING, qdr_terminus_router_control(), qdr_terminus_router_control()); - (void) qdr_create_link_CT(core, conn, QD_LINK_ROUTER, QD_INCOMING, qdr_terminus_router_data(), qdr_terminus_router_data()); - (void) qdr_create_link_CT(core, conn, QD_LINK_ROUTER, QD_OUTGOING, qdr_terminus_router_data(), qdr_terminus_router_data()); + + for (int priority = 0; priority < QDR_N_PRIORITIES; ++ priority) { + (void) qdr_create_link_CT(core, conn, QD_LINK_ROUTER, QD_INCOMING, qdr_terminus_router_data(), qdr_terminus_router_data()); + (void) qdr_create_link_CT(core, conn, QD_LINK_ROUTER, QD_OUTGOING, qdr_terminus_router_data(), qdr_terminus_router_data()); + } } } @@ -1320,10 +1333,13 @@ static void qdr_connection_closed_CT(qdr_core_t *core, qdr_action_t *action, boo // // Remove the references in the links_with_work list // - qdr_link_ref_t *link_ref = DEQ_HEAD(conn->links_with_work); - while (link_ref) { - qdr_del_link_ref(&conn->links_with_work, link_ref->link, QDR_LINK_LIST_CLASS_WORK); - link_ref = DEQ_HEAD(conn->links_with_work); + qdr_link_ref_t *link_ref; + for (int priority = 0; priority < QDR_N_PRIORITIES; ++ priority) { + link_ref = DEQ_HEAD(conn->links_with_work[priority]); + while (link_ref) { + qdr_del_link_ref(conn->links_with_work + priority, link_ref->link, QDR_LINK_LIST_CLASS_WORK); + link_ref = DEQ_HEAD(conn->links_with_work[priority]); + } } // @@ -1411,8 +1427,14 @@ static void qdr_detach_link_control_CT(qdr_core_t *core, qdr_connection_t *conn, // static void qdr_attach_link_data_CT(qdr_core_t *core, qdr_connection_t *conn, qdr_link_t *link) { - if (conn->role == QDR_ROLE_INTER_ROUTER) - core->data_links_by_mask_bit[conn->mask_bit] = link; + if (conn->role == QDR_ROLE_INTER_ROUTER) { + int next_slot = core->data_links_by_mask_bit[conn->mask_bit].count ++; + if (next_slot >= QDR_N_PRIORITIES) { + qd_log(core->log, QD_LOG_ERROR, "Attempt to attach too many inter-router links for priority sheaf."); + return; + } + core->data_links_by_mask_bit[conn->mask_bit].links[next_slot] = link; + } // // TODO - This needs to be refactored in terms of a non-inter-router link type @@ -1451,8 +1473,12 @@ static void qdr_attach_link_data_CT(qdr_core_t *core, qdr_connection_t *conn, qd static void qdr_detach_link_data_CT(qdr_core_t *core, qdr_connection_t *conn, qdr_link_t *link) { if (conn->role == QDR_ROLE_INTER_ROUTER) - core->data_links_by_mask_bit[conn->mask_bit] = 0; - + for (int priority = 0; priority < QDR_N_PRIORITIES; ++ priority) { + if (link == core->data_links_by_mask_bit[conn->mask_bit].links[priority]) { + core->data_links_by_mask_bit[conn->mask_bit].links[priority] = 0; + break; + } + } // // TODO - This needs to be refactored in terms of a non-inter-router link type // http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/710b7059/src/router_core/forwarder.c ---------------------------------------------------------------------- diff --git a/src/router_core/forwarder.c b/src/router_core/forwarder.c index 2840725..4364c47 100644 --- a/src/router_core/forwarder.c +++ b/src/router_core/forwarder.c @@ -24,6 +24,28 @@ #include "forwarder.h" +static qdr_link_t * peer_data_link(qdr_core_t *core, + qdr_node_t *node, + int priority) +{ + int nlmb = node->link_mask_bit; + + if (nlmb < 0 || priority < 0) + return 0; + + // Try to return the requested priority link, but if it does + // not exist, return the closest one that is lower. + qdr_link_t * link = 0; + while (1) { + if ((link = core->data_links_by_mask_bit[nlmb].links[priority])) + return link; + if (-- priority < 0) + return 0; + } + return link; +} + + //================================================================================== // Built-in Forwarders //================================================================================== @@ -153,7 +175,7 @@ static void qdr_forward_drop_presettled_CT_LH(qdr_core_t *core, qdr_link_t *link } -void qdr_forward_deliver_CT(qdr_core_t *core, qdr_link_t *out_link, qdr_delivery_t *out_dlv) +void qdr_forward_deliver_CT(qdr_core_t *core, qdr_link_t *out_link, qdr_delivery_t *out_dlv, int priority) { sys_mutex_lock(out_link->conn->work_lock); @@ -186,7 +208,7 @@ void qdr_forward_deliver_CT(qdr_core_t *core, qdr_link_t *out_link, qdr_delivery work->value = 1; DEQ_INSERT_TAIL(out_link->work_list, work); } - qdr_add_link_ref(&out_link->conn->links_with_work, out_link, QDR_LINK_LIST_CLASS_WORK); + qdr_add_link_ref(out_link->conn->links_with_work + priority, out_link, QDR_LINK_LIST_CLASS_WORK); out_dlv->link_work = work; sys_mutex_unlock(out_link->conn->work_lock); @@ -243,6 +265,7 @@ int qdr_forward_multicast_CT(qdr_core_t *core, qd_bitmask_t *link_exclusion = !!in_delivery ? in_delivery->link_exclusion : 0; bool presettled = !!in_delivery ? in_delivery->settled : true; bool receive_complete = qd_message_receive_complete(qdr_delivery_message(in_delivery)); + int priority = qd_message_get_priority(msg); // // If the delivery is not presettled, set the settled flag for forwarding so all @@ -262,7 +285,7 @@ int qdr_forward_multicast_CT(qdr_core_t *core, while (link_ref) { qdr_link_t *out_link = link_ref->link; qdr_delivery_t *out_delivery = qdr_forward_new_delivery_CT(core, in_delivery, out_link, msg); - qdr_forward_deliver_CT(core, out_link, out_delivery); + qdr_forward_deliver_CT(core, out_link, out_delivery, priority); fanout++; if (out_link->link_type != QD_LINK_CONTROL && out_link->link_type != QD_LINK_ROUTER) { addr->deliveries_egress++; @@ -321,7 +344,7 @@ int qdr_forward_multicast_CT(qdr_core_t *core, else next_node = rnode; - dest_link = control ? PEER_CONTROL_LINK(core, next_node) : PEER_DATA_LINK(core, next_node); + dest_link = control ? PEER_CONTROL_LINK(core, next_node) : peer_data_link(core, next_node, priority); if (dest_link && qd_bitmask_value(rnode->valid_origins, origin)) qd_bitmask_set_bit(link_set, dest_link->conn->mask_bit); } @@ -334,10 +357,10 @@ int qdr_forward_multicast_CT(qdr_core_t *core, qd_bitmask_clear_bit(link_set, link_bit); dest_link = control ? core->control_links_by_mask_bit[link_bit] : - core->data_links_by_mask_bit[link_bit]; + core->data_links_by_mask_bit[link_bit].links[priority]; if (dest_link && (!link_exclusion || qd_bitmask_value(link_exclusion, link_bit) == 0)) { qdr_delivery_t *out_delivery = qdr_forward_new_delivery_CT(core, in_delivery, dest_link, msg); - qdr_forward_deliver_CT(core, dest_link, out_delivery); + qdr_forward_deliver_CT(core, dest_link, out_delivery, priority); fanout++; addr->deliveries_transit++; if (dest_link->link_type == QD_LINK_ROUTER) @@ -462,7 +485,7 @@ int qdr_forward_closest_CT(qdr_core_t *core, if (link_ref) { out_link = link_ref->link; out_delivery = qdr_forward_new_delivery_CT(core, in_delivery, out_link, msg); - qdr_forward_deliver_CT(core, out_link, out_delivery); + qdr_forward_deliver_CT(core, out_link, out_delivery, qd_message_get_priority(msg)); // // If there are multiple local subscribers, rotate the list of link references @@ -509,10 +532,11 @@ int qdr_forward_closest_CT(qdr_core_t *core, else next_node = rnode; - out_link = control ? PEER_CONTROL_LINK(core, next_node) : PEER_DATA_LINK(core, next_node); + uint8_t priority = qd_message_get_priority(msg); + out_link = control ? PEER_CONTROL_LINK(core, next_node) : peer_data_link(core, next_node, priority); if (out_link) { out_delivery = qdr_forward_new_delivery_CT(core, in_delivery, out_link, msg); - qdr_forward_deliver_CT(core, out_link, out_delivery); + qdr_forward_deliver_CT(core, out_link, out_delivery, priority); addr->deliveries_transit++; if (out_link->link_type == QD_LINK_ROUTER) core->deliveries_transit++; @@ -613,7 +637,8 @@ int qdr_forward_balanced_CT(qdr_core_t *core, for (QD_BITMASK_EACH(addr->rnodes, node_bit, c)) { qdr_node_t *rnode = core->routers_by_mask_bit[node_bit]; qdr_node_t *next_node = rnode->next_hop ? rnode->next_hop : rnode; - qdr_link_t *link = PEER_DATA_LINK(core, next_node); + uint8_t priority = qd_message_get_priority(msg); + qdr_link_t *link = peer_data_link(core, next_node, priority); if (!link) continue; int link_bit = link->conn->mask_bit; int value = addr->outstanding_deliveries[link_bit]; @@ -660,7 +685,7 @@ int qdr_forward_balanced_CT(qdr_core_t *core, if (chosen_link) { qdr_delivery_t *out_delivery = qdr_forward_new_delivery_CT(core, in_delivery, chosen_link, msg); - qdr_forward_deliver_CT(core, chosen_link, out_delivery); + qdr_forward_deliver_CT(core, chosen_link, out_delivery, qd_message_get_priority(msg)); // // If the delivery is unsettled and the link is inter-router, account for the outstanding delivery. @@ -761,8 +786,9 @@ bool qdr_forward_link_balanced_CT(qdr_core_t *core, else next_node = rnode; - if (next_node && PEER_DATA_LINK(core, next_node)) - conn = PEER_DATA_LINK(core, next_node)->conn; + qdr_link_t * pdl = peer_data_link(core, next_node, 0); + if (next_node && pdl) + conn = pdl->conn; } } } http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/710b7059/src/router_core/route_tables.c ---------------------------------------------------------------------- diff --git a/src/router_core/route_tables.c b/src/router_core/route_tables.c index 3072496..a4d3bc1 100644 --- a/src/router_core/route_tables.c +++ b/src/router_core/route_tables.c @@ -242,11 +242,14 @@ void qdr_route_table_setup_CT(qdr_core_t *core) core->routers_by_mask_bit = NEW_PTR_ARRAY(qdr_node_t, qd_bitmask_width()); core->control_links_by_mask_bit = NEW_PTR_ARRAY(qdr_link_t, qd_bitmask_width()); - core->data_links_by_mask_bit = NEW_PTR_ARRAY(qdr_link_t, qd_bitmask_width()); + core->data_links_by_mask_bit = NEW_ARRAY(qdr_priority_sheaf_t, qd_bitmask_width()); for (int idx = 0; idx < qd_bitmask_width(); idx++) { core->routers_by_mask_bit[idx] = 0; core->control_links_by_mask_bit[idx] = 0; - core->data_links_by_mask_bit[idx] = 0; + core->data_links_by_mask_bit[idx].count = 0; + for (int priority = 0; priority < QDR_N_PRIORITIES; ++ priority) + core->data_links_by_mask_bit[idx].links[priority] = 0; + } } http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/710b7059/src/router_core/router_core_private.h ---------------------------------------------------------------------- diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h index a68e115..5e3f4c2 100644 --- a/src/router_core/router_core_private.h +++ b/src/router_core/router_core_private.h @@ -20,6 +20,7 @@ */ #include "dispatch_private.h" +#include "message_private.h" #include <qpid/dispatch/router_core.h> #include <qpid/dispatch/threading.h> #include <qpid/dispatch/atomic.h> @@ -290,7 +291,8 @@ DEQ_DECLARE(qdr_node_t, qdr_node_list_t); void qdr_router_node_free(qdr_core_t *core, qdr_node_t *rnode); #define PEER_CONTROL_LINK(c,n) ((n->link_mask_bit >= 0) ? (c)->control_links_by_mask_bit[n->link_mask_bit] : 0) -#define PEER_DATA_LINK(c,n) ((n->link_mask_bit >= 0) ? (c)->data_links_by_mask_bit[n->link_mask_bit] : 0) +// PEER_DATA_LINK has gotten more complex with prioritized links, and is now a function, peer_data_link(). + struct qdr_router_ref_t { @@ -566,7 +568,7 @@ struct qdr_connection_t { qdr_connection_work_list_t work_list; sys_mutex_t *work_lock; qdr_link_ref_list_t links; - qdr_link_ref_list_t links_with_work; + qdr_link_ref_list_t links_with_work[QDR_N_PRIORITIES]; char *tenant_space; int tenant_space_len; qdr_connection_info_t *connection_info; @@ -662,6 +664,10 @@ struct qdr_conn_identifier_t { ALLOC_DECLARE(qdr_conn_identifier_t); DEQ_DECLARE(qdr_exchange_t, qdr_exchange_list_t); +typedef struct qdr_priority_sheaf_t { + qdr_link_t *links[QDR_N_PRIORITIES]; + int count; +} qdr_priority_sheaf_t; struct qdr_core_t { qd_dispatch_t *qd; @@ -738,7 +744,7 @@ struct qdr_core_t { qd_bitmask_t *neighbor_free_mask; qdr_node_t **routers_by_mask_bit; qdr_link_t **control_links_by_mask_bit; - qdr_link_t **data_links_by_mask_bit; + qdr_priority_sheaf_t *data_links_by_mask_bit; uint64_t cost_epoch; uint64_t next_tag; @@ -838,7 +844,7 @@ void qdr_post_general_work_CT(qdr_core_t *core, qdr_general_work_t *work); void qdr_check_addr_CT(qdr_core_t *core, qdr_address_t *addr, bool was_local); bool qdr_is_addr_treatment_multicast(qdr_address_t *addr); qdr_delivery_t *qdr_forward_new_delivery_CT(qdr_core_t *core, qdr_delivery_t *peer, qdr_link_t *link, qd_message_t *msg); -void qdr_forward_deliver_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery_t *dlv); +void qdr_forward_deliver_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery_t *dlv, int priority); void qdr_connection_free(qdr_connection_t *conn); void qdr_connection_activate_CT(qdr_core_t *core, qdr_connection_t *conn); qd_address_treatment_t qdr_treatment_for_address_CT(qdr_core_t *core, qdr_connection_t *conn, qd_iterator_t *iter, int *in_phase, int *out_phase); http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/710b7059/src/router_core/transfer.c ---------------------------------------------------------------------- diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c index 2031b0d..ae1dbd9 100644 --- a/src/router_core/transfer.c +++ b/src/router_core/transfer.c @@ -743,7 +743,8 @@ static void qdr_link_flow_CT(qdr_core_t *core, qdr_action_t *action, bool discar // if (link->stalled_outbound) { link->stalled_outbound = false; - qdr_add_link_ref(&link->conn->links_with_work, link, QDR_LINK_LIST_CLASS_WORK); + // Adding this work at priority 0. + qdr_add_link_ref(link->conn->links_with_work, link, QDR_LINK_LIST_CLASS_WORK); activate = true; } @@ -781,7 +782,8 @@ static void qdr_link_flow_CT(qdr_core_t *core, qdr_action_t *action, bool discar if (work) DEQ_INSERT_TAIL(link->work_list, work); if (DEQ_SIZE(link->undelivered) > 0 || drain_was_set) { - qdr_add_link_ref(&link->conn->links_with_work, link, QDR_LINK_LIST_CLASS_WORK); + // Adding this work at priority 0. + qdr_add_link_ref(link->conn->links_with_work, link, QDR_LINK_LIST_CLASS_WORK); activate = true; } sys_mutex_unlock(link->conn->work_lock); @@ -999,7 +1001,8 @@ static void qdr_link_deliver_CT(qdr_core_t *core, qdr_action_t *action, bool dis peer->tag_length = action->args.connection.tag_length; memcpy(peer->tag, action->args.connection.tag, peer->tag_length); - qdr_forward_deliver_CT(core, link->connected_link, peer); + // Adding this work at priority 0. + qdr_forward_deliver_CT(core, link->connected_link, peer, 0); link->total_deliveries++; @@ -1171,7 +1174,8 @@ void qdr_deliver_continue_peers_CT(qdr_core_t *core, qdr_delivery_t *in_dlv) if (work) { sys_mutex_lock(peer->link->conn->work_lock); if (work->processing || work == DEQ_HEAD(peer->link->work_list)) { - qdr_add_link_ref(&peer->link->conn->links_with_work, peer->link, QDR_LINK_LIST_CLASS_WORK); + // Adding this work at priority 0. + qdr_add_link_ref(peer->link->conn->links_with_work, peer->link, QDR_LINK_LIST_CLASS_WORK); sys_mutex_unlock(peer->link->conn->work_lock); // @@ -1357,7 +1361,8 @@ void qdr_delivery_push_CT(qdr_core_t *core, qdr_delivery_t *dlv) if (dlv->where != QDR_DELIVERY_IN_UNDELIVERED) { qdr_delivery_incref(dlv, "qdr_delivery_push_CT - add to updated list"); qdr_add_delivery_ref_CT(&link->updated_deliveries, dlv); - qdr_add_link_ref(&link->conn->links_with_work, link, QDR_LINK_LIST_CLASS_WORK); + // Adding this work at priority 0. + qdr_add_link_ref(link->conn->links_with_work, link, QDR_LINK_LIST_CLASS_WORK); activate = true; } sys_mutex_unlock(link->conn->work_lock); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org