This is an automated email from the ASF dual-hosted git repository. kgiusti pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
The following commit(s) were added to refs/heads/main by this push: new 8d5d3e4 DISPATCH-2219: fix inter-router link priorities 8d5d3e4 is described below commit 8d5d3e41cda09cdc56fceab3bb7f52add87eac6a Author: Kenneth Giusti <kgiu...@apache.org> AuthorDate: Fri Sep 10 13:18:07 2021 -0400 DISPATCH-2219: fix inter-router link priorities This closes #1367 --- include/qpid/dispatch/amqp.h | 7 +++ src/message_private.h | 4 -- src/router_core/connections.c | 60 ++++++++++++++++-------- src/router_core/core_link_endpoint.c | 3 +- src/router_core/forwarder.c | 3 +- src/router_core/modules/edge_router/addr_proxy.c | 11 +++-- src/router_core/route_control.c | 2 +- src/router_core/router_core_private.h | 6 ++- 8 files changed, 63 insertions(+), 33 deletions(-) diff --git a/include/qpid/dispatch/amqp.h b/include/qpid/dispatch/amqp.h index 9b8665a..7c3ef08 100644 --- a/include/qpid/dispatch/amqp.h +++ b/include/qpid/dispatch/amqp.h @@ -218,4 +218,11 @@ extern const char * const QD_AMQP_COND_MESSAGE_SIZE_EXCEEDED; #define QD_AMQP_LINK_ROLE_RECEIVER true /// @}; +/** @name AMQP Message priority. */ +/// @{ +#define QDR_N_PRIORITIES 10 +#define QDR_MAX_PRIORITY (QDR_N_PRIORITIES - 1) +#define QDR_DEFAULT_PRIORITY 4 +/// @}; + #endif diff --git a/src/message_private.h b/src/message_private.h index e7217e8..944612e 100644 --- a/src/message_private.h +++ b/src/message_private.h @@ -179,10 +179,6 @@ ALLOC_DECLARE(qd_message_content_t); /** Initialize logging */ void qd_message_initialize(); -#define QDR_N_PRIORITIES 10 -#define QDR_MAX_PRIORITY (QDR_N_PRIORITIES - 1) -#define QDR_DEFAULT_PRIORITY 4 - // These expect content->lock to be locked. bool _Q2_holdoff_should_block_LH(const qd_message_content_t *content); bool _Q2_holdoff_should_unblock_LH(const qd_message_content_t *content); diff --git a/src/router_core/connections.c b/src/router_core/connections.c index 4f28d48..12812ad 100644 --- a/src/router_core/connections.c +++ b/src/router_core/connections.c @@ -618,6 +618,7 @@ qdr_link_t *qdr_link_first_attach(qdr_connection_t *conn, link->zero_credit_time = conn->core->uptime_ticks; link->terminus_survives_disconnect = qdr_terminus_survives_disconnect(local_terminus); link->no_route = no_route; + link->priority = QDR_DEFAULT_PRIORITY; link->strip_annotations_in = conn->strip_annotations_in; link->strip_annotations_out = conn->strip_annotations_out; @@ -630,9 +631,10 @@ qdr_link_t *qdr_link_first_attach(qdr_connection_t *conn, tsan_reset_delivery_ids(initial_delivery, link->conn->identity, link->identity); } - if (qdr_terminus_has_capability(local_terminus, QD_CAPABILITY_ROUTER_CONTROL)) + if (qdr_terminus_has_capability(local_terminus, QD_CAPABILITY_ROUTER_CONTROL)) { link->link_type = QD_LINK_CONTROL; - else if (qdr_terminus_has_capability(local_terminus, QD_CAPABILITY_ROUTER_DATA)) + link->priority = QDR_MAX_PRIORITY; + } else if (qdr_terminus_has_capability(local_terminus, QD_CAPABILITY_ROUTER_DATA)) link->link_type = QD_LINK_ROUTER; else if (qdr_terminus_has_capability(local_terminus, QD_CAPABILITY_EDGE_DOWNLINK)) { if (conn->core->router_mode == QD_ROUTER_MODE_INTERIOR && @@ -1120,7 +1122,8 @@ qdr_link_t *qdr_create_link_CT(qdr_core_t *core, qd_direction_t dir, qdr_terminus_t *source, qdr_terminus_t *target, - qd_session_class_t ssn_class) + qd_session_class_t ssn_class, + uint8_t priority) { // // Create a new link, initiated by the router core. This will involve issuing a first-attach outbound. @@ -1148,6 +1151,7 @@ qdr_link_t *qdr_create_link_CT(qdr_core_t *core, link->attach_count = 1; link->core_ticks = core->uptime_ticks; link->zero_credit_time = core->uptime_ticks; + link->priority = priority; link->strip_annotations_in = conn->strip_annotations_in; link->strip_annotations_out = conn->strip_annotations_out; @@ -1407,15 +1411,23 @@ static void qdr_connection_opened_CT(qdr_core_t *core, qdr_action_t *action, boo // inter-router links: Two (in and out) for control, 2 * QDR_N_PRIORITIES for // routed-message transfer. // - (void) qdr_create_link_CT(core, conn, QD_LINK_CONTROL, QD_INCOMING, qdr_terminus_router_control(), qdr_terminus_router_control(), QD_SSN_ROUTER_CONTROL); - (void) qdr_create_link_CT(core, conn, QD_LINK_CONTROL, QD_OUTGOING, qdr_terminus_router_control(), qdr_terminus_router_control(), QD_SSN_ROUTER_CONTROL); + (void) qdr_create_link_CT(core, conn, QD_LINK_CONTROL, QD_INCOMING, + qdr_terminus_router_control(), qdr_terminus_router_control(), + QD_SSN_ROUTER_CONTROL, QDR_MAX_PRIORITY); + (void) qdr_create_link_CT(core, conn, QD_LINK_CONTROL, QD_OUTGOING, + qdr_terminus_router_control(), qdr_terminus_router_control(), + QD_SSN_ROUTER_CONTROL, QDR_MAX_PRIORITY); STATIC_ASSERT((QD_SSN_ROUTER_DATA_PRI_9 - QD_SSN_ROUTER_DATA_PRI_0 + 1) == QDR_N_PRIORITIES, PRIORITY_SESSION_NOT_SAME); for (int priority = 0; priority < QDR_N_PRIORITIES; ++ priority) { // a session is reserved for each priority link qd_session_class_t sc = (qd_session_class_t)(QD_SSN_ROUTER_DATA_PRI_0 + priority); - (void) qdr_create_link_CT(core, conn, QD_LINK_ROUTER, QD_INCOMING, qdr_terminus_router_data(), qdr_terminus_router_data(), sc); - (void) qdr_create_link_CT(core, conn, QD_LINK_ROUTER, QD_OUTGOING, qdr_terminus_router_data(), qdr_terminus_router_data(), sc); + (void) qdr_create_link_CT(core, conn, QD_LINK_ROUTER, QD_INCOMING, + qdr_terminus_router_data(), qdr_terminus_router_data(), + sc, priority); + (void) qdr_create_link_CT(core, conn, QD_LINK_ROUTER, QD_OUTGOING, + qdr_terminus_router_data(), qdr_terminus_router_data(), + sc, priority); } } } @@ -1462,12 +1474,12 @@ qdr_link_t *qdr_connection_new_streaming_link_CT(qdr_core_t *core, qdr_connectio case QDR_ROLE_INTER_ROUTER: out_link = qdr_create_link_CT(core, conn, QD_LINK_ROUTER, QD_OUTGOING, qdr_terminus_router_data(), qdr_terminus_router_data(), - QD_SSN_LINK_STREAMING); + QD_SSN_LINK_STREAMING, QDR_DEFAULT_PRIORITY); break; case QDR_ROLE_EDGE_CONNECTION: out_link = qdr_create_link_CT(core, conn, QD_LINK_ENDPOINT, QD_OUTGOING, qdr_terminus(0), qdr_terminus(0), - QD_SSN_LINK_STREAMING); + QD_SSN_LINK_STREAMING, QDR_DEFAULT_PRIORITY); break; default: assert(false); @@ -1599,14 +1611,20 @@ static void qdr_detach_link_control_CT(qdr_core_t *core, qdr_connection_t *conn, static void qdr_attach_link_data_CT(qdr_core_t *core, qdr_connection_t *conn, qdr_link_t *link) { assert(link->link_type == QD_LINK_ROUTER); - // The first QDR_N_PRIORITIES (10) QDR_LINK_ROUTER links to attach over the - // connection are the shared priority links. These links are attached in - // priority order starting at zero. - int next_pri = core->data_links_by_mask_bit[conn->mask_bit].count; - if (next_pri < QDR_N_PRIORITIES) { - link->priority = next_pri; - core->data_links_by_mask_bit[conn->mask_bit].links[next_pri] = link; - core->data_links_by_mask_bit[conn->mask_bit].count += 1; + // The first 2 x QDR_N_PRIORITIES (10) QDR_LINK_ROUTER links to attach over + // the inter-router connection are the shared priority links. These links + // are attached in priority order starting at zero. + if (link->link_direction == QD_OUTGOING) { + int next_pri = core->data_links_by_mask_bit[conn->mask_bit].count; + if (next_pri < QDR_N_PRIORITIES) { + link->priority = next_pri; + core->data_links_by_mask_bit[conn->mask_bit].links[next_pri] = link; + core->data_links_by_mask_bit[conn->mask_bit].count += 1; + } + } else { + if (conn->next_pri < QDR_N_PRIORITIES) { + link->priority = conn->next_pri++; + } } } @@ -1792,8 +1810,10 @@ static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t *act break; } - case QD_LINK_CONTROL: case QD_LINK_ROUTER: + qdr_attach_link_data_CT(core, conn, link); + // fall-through: + case QD_LINK_CONTROL: qdr_link_outbound_second_attach_CT(core, link, source, target); qdr_link_issue_credit_CT(core, link, link->capacity, false); break; @@ -1918,8 +1938,10 @@ static void qdr_link_inbound_second_attach_CT(qdr_core_t *core, qdr_action_t *ac qdr_link_issue_credit_CT(core, link, link->capacity, false); break; - case QD_LINK_CONTROL: case QD_LINK_ROUTER: + qdr_attach_link_data_CT(core, conn, link); + // fall-through + case QD_LINK_CONTROL: qdr_link_issue_credit_CT(core, link, link->capacity, false); break; diff --git a/src/router_core/core_link_endpoint.c b/src/router_core/core_link_endpoint.c index fdecf92..b65ac6a 100644 --- a/src/router_core/core_link_endpoint.c +++ b/src/router_core/core_link_endpoint.c @@ -76,8 +76,7 @@ qdrc_endpoint_t *qdrc_endpoint_create_link_CT(qdr_core_t *core, ep->desc = desc; ep->link_context = link_context; ep->link = qdr_create_link_CT(core, conn, QD_LINK_ENDPOINT, dir, source, target, - QD_SSN_CORE_ENDPOINT); - + QD_SSN_CORE_ENDPOINT, QDR_DEFAULT_PRIORITY); ep->link->core_endpoint = ep; return ep; } diff --git a/src/router_core/forwarder.c b/src/router_core/forwarder.c index 642b4cd..3bc3a8f 100644 --- a/src/router_core/forwarder.c +++ b/src/router_core/forwarder.c @@ -1037,7 +1037,8 @@ void qdr_forward_link_direct_CT(qdr_core_t *core, out_link->zero_credit_time = core->uptime_ticks; out_link->strip_annotations_in = conn->strip_annotations_in; out_link->strip_annotations_out = conn->strip_annotations_out; - + out_link->priority = in_link->priority; + if (strip) { out_link->strip_prefix = strip; } diff --git a/src/router_core/modules/edge_router/addr_proxy.c b/src/router_core/modules/edge_router/addr_proxy.c index 0e76626..53ead53 100644 --- a/src/router_core/modules/edge_router/addr_proxy.c +++ b/src/router_core/modules/edge_router/addr_proxy.c @@ -159,7 +159,8 @@ static void add_inlink(qcm_edge_addr_proxy_t *ap, const char *key, qdr_address_t } qdr_link_t *link = qdr_create_link_CT(ap->core, ap->edge_conn, QD_LINK_ENDPOINT, QD_INCOMING, - term, qdr_terminus_normal(0), QD_SSN_ENDPOINT); + term, qdr_terminus_normal(0), QD_SSN_ENDPOINT, + QDR_DEFAULT_PRIORITY); qdr_core_bind_address_link_CT(ap->core, addr, link); addr->edge_inlink = link; } @@ -202,7 +203,8 @@ static void add_outlink(qcm_edge_addr_proxy_t *ap, const char *key, qdr_address_ } qdr_link_t *link = qdr_create_link_CT(ap->core, ap->edge_conn, QD_LINK_ENDPOINT, QD_OUTGOING, - qdr_terminus_normal(0), term, QD_SSN_ENDPOINT); + qdr_terminus_normal(0), term, QD_SSN_ENDPOINT, + QDR_DEFAULT_PRIORITY); addr->edge_outlink = link; } } @@ -281,7 +283,8 @@ static void on_conn_event(void *context, qdrc_event_t event, qdr_connection_t *c qdr_link_t *out_link = qdr_create_link_CT(ap->core, conn, QD_LINK_ENDPOINT, QD_OUTGOING, qdr_terminus(0), qdr_terminus(0), - QD_SSN_ENDPOINT); + QD_SSN_ENDPOINT, + QDR_DEFAULT_PRIORITY); // // Associate the anonymous sender with the edge connection address. This will cause @@ -297,7 +300,7 @@ static void on_conn_event(void *context, qdrc_event_t event, qdr_connection_t *c QD_LINK_ENDPOINT, QD_INCOMING, qdr_terminus_edge_downlink(ap->core->router_id), qdr_terminus_edge_downlink(0), - QD_SSN_ENDPOINT); + QD_SSN_ENDPOINT, QDR_DEFAULT_PRIORITY); // // Attach a receiving link for edge address tracking updates. diff --git a/src/router_core/route_control.c b/src/router_core/route_control.c index 7adfdc3..3f898b7 100644 --- a/src/router_core/route_control.c +++ b/src/router_core/route_control.c @@ -257,7 +257,7 @@ static void qdr_auto_link_activate_CT(qdr_core_t *core, qdr_auto_link_t *al, qdr } else qdr_terminus_set_address(term, &key[2]); // truncate the "Mp" annotation (where p = phase) al->link = qdr_create_link_CT(core, conn, QD_LINK_ENDPOINT, al->dir, source, target, - QD_SSN_ENDPOINT); + QD_SSN_ENDPOINT, QDR_DEFAULT_PRIORITY); al->link->auto_link = al; al->link->phase = al->phase; al->link->fallback = al->fallback; diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h index 9754465..219290c 100644 --- a/src/router_core/router_core_private.h +++ b/src/router_core/router_core_private.h @@ -673,6 +673,8 @@ struct qdr_connection_t { qdr_core_t *core; bool incoming; bool in_activate_list; + bool closed; // This bit is used in the case where a client is trying to force close this connection. + uint8_t next_pri; // for incoming inter-router data links qdr_connection_role_t role; int inter_router_cost; qdr_conn_identifier_t *conn_id; @@ -693,7 +695,6 @@ struct qdr_connection_t { qdr_conn_oper_status_t oper_status; qdr_conn_admin_status_t admin_status; qdr_error_t *error; - bool closed; // This bit is used in the case where a client is trying to force close this connection. uint32_t conn_uptime; // Timestamp which can be used to calculate the number of seconds this connection has been up and running. uint32_t last_delivery_time; // Timestamp which can be used to calculate the number of seconds since the last delivery arrived on this connection. bool enable_protocol_trace; // Has trace level logging been turned on for this connection. @@ -1010,7 +1011,8 @@ qdr_link_t *qdr_create_link_CT(qdr_core_t *core, qd_direction_t dir, qdr_terminus_t *source, qdr_terminus_t *target, - qd_session_class_t ssn_class); + qd_session_class_t ssn_class, + uint8_t priority); void qdr_link_outbound_detach_CT(qdr_core_t *core, qdr_link_t *link, qdr_error_t *error, qdr_condition_t condition, bool close); void qdr_link_outbound_second_attach_CT(qdr_core_t *core, qdr_link_t *link, qdr_terminus_t *source, qdr_terminus_t *target); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org