Repository: qpid-dispatch Updated Branches: refs/heads/master 9c5666f6d -> bb4f31219
DISPATCH-1139 : address priority support and test Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/bb4f3121 Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/bb4f3121 Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/bb4f3121 Branch: refs/heads/master Commit: bb4f3121989cb4b91d4e364287868da1594f3acf Parents: 9c5666f Author: Michael Goulish <mgoul...@redhat.com> Authored: Thu Oct 18 11:07:45 2018 -0400 Committer: Michael Goulish <mgoul...@redhat.com> Committed: Thu Oct 18 11:07:45 2018 -0400 ---------------------------------------------------------------------- include/qpid/dispatch/message.h | 5 +- python/qpid_dispatch/management/qdrouter.json | 6 + src/message.c | 15 +- src/message_private.h | 1 + src/router_config.c | 5 + src/router_core/agent_config_address.c | 19 + src/router_core/agent_config_address.h | 2 +- src/router_core/connections.c | 14 +- src/router_core/core_link_endpoint.c | 2 +- src/router_core/forwarder.c | 10 +- src/router_core/route_control.c | 2 +- src/router_core/router_core.c | 1 + src/router_core/router_core_private.h | 6 +- tests/system_tests_priority.py | 417 ++++++++++++++++----- 14 files changed, 378 insertions(+), 127 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/bb4f3121/include/qpid/dispatch/message.h ---------------------------------------------------------------------- diff --git a/include/qpid/dispatch/message.h b/include/qpid/dispatch/message.h index 02a950e..8af8c9b 100644 --- a/include/qpid/dispatch/message.h +++ b/include/qpid/dispatch/message.h @@ -422,10 +422,9 @@ void qd_message_set_aborted(const qd_message_t *msg, bool aborted); /** * Return message priority * @param msg A pointer to the message - * @param priority [out] The priority value, if present - * @return True iff the priority was present in the message header + * @return The message priority value. Default if not present. */ -bool qd_message_get_priority(qd_message_t *msg, uint8_t *priority); +uint8_t qd_message_get_priority(qd_message_t *msg); ///@} http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/bb4f3121/python/qpid_dispatch/management/qdrouter.json ---------------------------------------------------------------------- diff --git a/python/qpid_dispatch/management/qdrouter.json b/python/qpid_dispatch/management/qdrouter.json index e31f54c..21bc4cf 100644 --- a/python/qpid_dispatch/management/qdrouter.json +++ b/python/qpid_dispatch/management/qdrouter.json @@ -1111,6 +1111,12 @@ "description": "Advanced - Override the egress phase for this address", "create": true, "required": false + }, + "priority": { + "type": "integer", + "description": "All messages sent to this address which lack an intrinsic priority will be assigned this priority.", + "create": true, + "required": false } } }, http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/bb4f3121/src/message.c ---------------------------------------------------------------------- diff --git a/src/message.c b/src/message.c index a3682c1..66d9955 100644 --- a/src/message.c +++ b/src/message.c @@ -789,6 +789,7 @@ static void qd_message_parse_priority(qd_message_t *in_msg) if (qd_parse_tag(priority_field) != QD_AMQP_NULL) { uint32_t value = qd_parse_as_uint(priority_field); content->priority = value >= QDR_N_PRIORITIES ? QDR_N_PRIORITIES - 1 : (uint8_t) (value & 0x00ff); + content->priority = value > QDR_MAX_PRIORITY ? QDR_MAX_PRIORITY : (uint8_t) (value & 0x00ff); content->priority_present = true; } } @@ -1077,17 +1078,21 @@ void qd_message_add_fanout(qd_message_t *in_msg) sys_atomic_inc(&msg->content->fanout); } -bool qd_message_get_priority(qd_message_t *msg, uint8_t *priority) +/** +* There are two sources of priority information -- +* message and address. Address takes precedence, falling +* through when no address priority has been specified. +* This also means that messages must always have a priority, +* using default value if sender leaves it unspecified. +*/ +uint8_t qd_message_get_priority(qd_message_t *msg) { qd_message_content_t *content = MSG_CONTENT(msg); if (!content->priority_parsed) qd_message_parse_priority(msg); - if (content->priority_present) - *priority = content->priority; - - return content->priority_present; + return content->priority_present ? content->priority : QDR_DEFAULT_PRIORITY; } bool qd_message_receive_complete(qd_message_t *in_msg) http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/bb4f3121/src/message_private.h ---------------------------------------------------------------------- diff --git a/src/message_private.h b/src/message_private.h index 2f3a400..3658597 100644 --- a/src/message_private.h +++ b/src/message_private.h @@ -148,6 +148,7 @@ void qd_message_initialize(); qd_iterator_pointer_t qd_message_cursor(qd_message_pvt_t *msg); #define QDR_N_PRIORITIES 10 +#define QDR_MAX_PRIORITY (QDR_N_PRIORITIES - 1) #define QDR_DEFAULT_PRIORITY 4 ///@} http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/bb4f3121/src/router_config.c ---------------------------------------------------------------------- diff --git a/src/router_config.c b/src/router_config.c index 9ae9f9f..837dde5 100644 --- a/src/router_config.c +++ b/src/router_config.c @@ -78,6 +78,7 @@ qd_error_t qd_router_configure_address(qd_router_t *router, qd_entity_t *entity) bool waypoint = qd_entity_opt_bool(entity, "waypoint", false); long in_phase = qd_entity_opt_long(entity, "ingressPhase", -1); long out_phase = qd_entity_opt_long(entity, "egressPhase", -1); + long priority = qd_entity_opt_long(entity, "priority", -1); // // Formulate this configuration create it through the core management API. @@ -108,6 +109,10 @@ qd_error_t qd_router_configure_address(qd_router_t *router, qd_entity_t *entity) qd_compose_insert_string(body, "waypoint"); qd_compose_insert_bool(body, waypoint); + qd_compose_insert_string(body, "priority"); + qd_compose_insert_long(body, priority); + + if (in_phase >= 0) { qd_compose_insert_string(body, "ingressPhase"); qd_compose_insert_int(body, in_phase); http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/bb4f3121/src/router_core/agent_config_address.c ---------------------------------------------------------------------- diff --git a/src/router_core/agent_config_address.c b/src/router_core/agent_config_address.c index 0c36f03..31be6ee 100644 --- a/src/router_core/agent_config_address.c +++ b/src/router_core/agent_config_address.c @@ -31,6 +31,7 @@ #define QDR_CONFIG_ADDRESS_IN_PHASE 6 #define QDR_CONFIG_ADDRESS_OUT_PHASE 7 #define QDR_CONFIG_ADDRESS_PATTERN 8 +#define QDR_CONFIG_ADDRESS_PRIORITY 9 const char *qdr_config_address_columns[] = {"name", @@ -42,6 +43,7 @@ const char *qdr_config_address_columns[] = "ingressPhase", "egressPhase", "pattern", + "priority", 0}; const char *CONFIG_ADDRESS_TYPE = "org.apache.qpid.dispatch.router.config.address"; @@ -119,6 +121,10 @@ static void qdr_config_address_insert_column_CT(qdr_address_config_t *addr, int case QDR_CONFIG_ADDRESS_OUT_PHASE: qd_compose_insert_int(body, addr->out_phase); break; + + case QDR_CONFIG_ADDRESS_PRIORITY: + qd_compose_insert_int(body, addr->priority); + break; } } @@ -340,6 +346,7 @@ void qdra_config_address_create_CT(qdr_core_t *core, qd_parsed_field_t *waypoint_field = qd_parse_value_by_key(in_body, qdr_config_address_columns[QDR_CONFIG_ADDRESS_WAYPOINT]); qd_parsed_field_t *in_phase_field = qd_parse_value_by_key(in_body, qdr_config_address_columns[QDR_CONFIG_ADDRESS_IN_PHASE]); qd_parsed_field_t *out_phase_field = qd_parse_value_by_key(in_body, qdr_config_address_columns[QDR_CONFIG_ADDRESS_OUT_PHASE]); + qd_parsed_field_t *priority_field = qd_parse_value_by_key(in_body, qdr_config_address_columns[QDR_CONFIG_ADDRESS_PRIORITY]); // // Either a prefix or a pattern field is mandatory. Prefix and pattern @@ -386,6 +393,7 @@ void qdra_config_address_create_CT(qdr_core_t *core, bool waypoint = waypoint_field ? qd_parse_as_bool(waypoint_field) : false; long in_phase = in_phase_field ? qd_parse_as_long(in_phase_field) : -1; long out_phase = out_phase_field ? qd_parse_as_long(out_phase_field) : -1; + long priority = priority_field ? qd_parse_as_long(priority_field) : -1; // // Handle the address-phasing logic. If the phases are provided, use them. Otherwise @@ -407,6 +415,16 @@ void qdra_config_address_create_CT(qdr_core_t *core, } // + // Validate the priority values. + // + if (priority > QDR_MAX_PRIORITY ) { + query->status = QD_AMQP_BAD_REQUEST; + query->status.description = "Priority value, if present, must be between 0 and QDR_MAX_PRIORITY"; + qd_log(core->agent_log, QD_LOG_ERROR, "Error performing CREATE of %s: %s", CONFIG_ADDRESS_TYPE, query->status.description); + break; + } + + // // The request is good. Create the entity and insert it into the hash index and list. // @@ -419,6 +437,7 @@ void qdra_config_address_create_CT(qdr_core_t *core, addr->out_phase = out_phase; addr->is_prefix = !!prefix_field; addr->pattern = pattern; + addr->priority = priority; pattern = 0; qd_iterator_reset_view(iter, ITER_VIEW_ALL); http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/bb4f3121/src/router_core/agent_config_address.h ---------------------------------------------------------------------- diff --git a/src/router_core/agent_config_address.h b/src/router_core/agent_config_address.h index fd94b76..38c3365 100644 --- a/src/router_core/agent_config_address.h +++ b/src/router_core/agent_config_address.h @@ -35,7 +35,7 @@ void qdra_config_address_get_CT(qdr_core_t *core, char *qdra_config_address_validate_pattern_CT(qd_parsed_field_t *pattern_field, bool is_prefix, const char **error); -#define QDR_CONFIG_ADDRESS_COLUMN_COUNT 9 +#define QDR_CONFIG_ADDRESS_COLUMN_COUNT 10 const char *qdr_config_address_columns[QDR_CONFIG_ADDRESS_COLUMN_COUNT + 1]; http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/bb4f3121/src/router_core/connections.c ---------------------------------------------------------------------- diff --git a/src/router_core/connections.c b/src/router_core/connections.c index 0117efd..c8d3a04 100644 --- a/src/router_core/connections.c +++ b/src/router_core/connections.c @@ -220,7 +220,7 @@ int qdr_connection_process(qdr_connection_t *conn) sys_mutex_lock(conn->work_lock); DEQ_MOVE(conn->work_list, work_list); - for (int priority = 0; priority < QDR_N_PRIORITIES; ++ priority) { + for (int priority = 0; priority <= QDR_MAX_PRIORITY; ++ priority) { DEQ_MOVE(conn->links_with_work[priority], links_with_work[priority]); // @@ -258,7 +258,7 @@ int qdr_connection_process(qdr_connection_t *conn) } // Process the links_with_work array from highest to lowest priority. - for (int priority = QDR_N_PRIORITIES - 1; priority >= 0; -- priority) { + for (int priority = QDR_MAX_PRIORITY; priority >= 0; -- priority) { do { qdr_link_work_t *link_work; free_link = false; @@ -1010,7 +1010,7 @@ void qdr_link_outbound_second_attach_CT(qdr_core_t *core, qdr_link_t *link, qdr_ } -qd_address_treatment_t qdr_treatment_for_address_CT(qdr_core_t *core, qdr_connection_t *conn, qd_iterator_t *iter, int *in_phase, int *out_phase) +qd_address_treatment_t qdr_treatment_for_address_CT(qdr_core_t *core, qdr_connection_t *conn, qd_iterator_t *iter, int *in_phase, int *out_phase, int *priority) { qdr_address_config_t *addr = 0; qd_iterator_view_t old_view = qd_iterator_get_view(iter); @@ -1024,6 +1024,7 @@ qd_address_treatment_t qdr_treatment_for_address_CT(qdr_core_t *core, qdr_connec if (in_phase) *in_phase = addr ? addr->in_phase : 0; if (out_phase) *out_phase = addr ? addr->out_phase : 0; + if (priority) *priority = addr ? addr->priority : -1; return addr ? addr->treatment : core->qd->default_treatment; @@ -1230,7 +1231,8 @@ static qdr_address_t *qdr_lookup_terminus_address_CT(qdr_core_t *core, int in_phase; int out_phase; int addr_phase; - qd_address_treatment_t treat = qdr_treatment_for_address_CT(core, conn, iter, &in_phase, &out_phase); + int priority; + qd_address_treatment_t treat = qdr_treatment_for_address_CT(core, conn, iter, &in_phase, &out_phase, &priority); qd_iterator_reset_view(iter, ITER_VIEW_ADDRESS_HASH); qd_iterator_annotate_prefix(iter, '\0'); // Cancel previous override @@ -1259,6 +1261,8 @@ static qdr_address_t *qdr_lookup_terminus_address_CT(qdr_core_t *core, if (!!addr && addr->core_endpoint != 0) *core_endpoint = true; + if (addr) + addr->priority = priority; return addr; } @@ -1465,7 +1469,7 @@ static void qdr_attach_link_data_CT(qdr_core_t *core, qdr_connection_t *conn, qd // As inter-router links are attached to this connection, they // are assigned priorities in the order in which they are attached. int next_slot = core->data_links_by_mask_bit[conn->mask_bit].count ++; - if (next_slot >= QDR_N_PRIORITIES) { + if (next_slot > QDR_MAX_PRIORITY) { qd_log(core->log, QD_LOG_ERROR, "Attempt to attach too many inter-router links for priority sheaf."); } link->priority = next_slot; http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/bb4f3121/src/router_core/core_link_endpoint.c ---------------------------------------------------------------------- diff --git a/src/router_core/core_link_endpoint.c b/src/router_core/core_link_endpoint.c index 05e7fb8..01df2c5 100644 --- a/src/router_core/core_link_endpoint.c +++ b/src/router_core/core_link_endpoint.c @@ -42,7 +42,7 @@ void qdrc_endpoint_bind_mobile_address_CT(qdr_core_t *core, qd_hash_retrieve(core->addr_hash, iter, (void*) &addr); if (!addr) { - qd_address_treatment_t treatment = qdr_treatment_for_address_CT(core, 0, iter, 0, 0); + qd_address_treatment_t treatment = qdr_treatment_for_address_CT(core, 0, iter, 0, 0, 0); if (treatment == QD_TREATMENT_UNAVAILABLE) treatment = QD_TREATMENT_ANYCAST_BALANCED; addr = qdr_address_CT(core, treatment); http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/bb4f3121/src/router_core/forwarder.c ---------------------------------------------------------------------- diff --git a/src/router_core/forwarder.c b/src/router_core/forwarder.c index 80f9b07..e1d6d57 100644 --- a/src/router_core/forwarder.c +++ b/src/router_core/forwarder.c @@ -280,15 +280,13 @@ void qdr_forward_on_message_CT(qdr_core_t *core, qdr_subscription_t *sub, qdr_li * This function returns a priority value for a message (and address). If the message * has no priority header, the default priority is chosen. * - * TODO: Add the ability to get the priority from the address if not present in the message + * If the address has a defined priority, that value takes precedence. + * Otherwise the message priority is used, which has a default value + * if none was explicitly set. */ static uint8_t qdr_forward_effective_priority(qd_message_t *msg, qdr_address_t *addr) { - uint8_t priority; - bool has_priority = qd_message_get_priority(msg, &priority); - if (!has_priority) - priority = QDR_DEFAULT_PRIORITY; - return priority; + return addr->priority >= 0 ? addr->priority : qd_message_get_priority(msg); } http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/bb4f3121/src/router_core/route_control.c ---------------------------------------------------------------------- diff --git a/src/router_core/route_control.c b/src/router_core/route_control.c index 977ac5f..98ccf61 100644 --- a/src/router_core/route_control.c +++ b/src/router_core/route_control.c @@ -472,7 +472,7 @@ qdr_auto_link_t *qdr_route_add_auto_link_CT(qdr_core_t *core, qd_hash_retrieve(core->addr_hash, iter, (void*) &al->addr); if (!al->addr) { - qd_address_treatment_t treatment = qdr_treatment_for_address_CT(core, 0, iter, 0, 0); + qd_address_treatment_t treatment = qdr_treatment_for_address_CT(core, 0, iter, 0, 0, 0); if (treatment == QD_TREATMENT_UNAVAILABLE) { //if associated address is not defined, assume balanced treatment = QD_TREATMENT_ANYCAST_BALANCED; http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/bb4f3121/src/router_core/router_core.c ---------------------------------------------------------------------- diff --git a/src/router_core/router_core.c b/src/router_core/router_core.c index 19c8f35..76ceba9 100644 --- a/src/router_core/router_core.c +++ b/src/router_core/router_core.c @@ -299,6 +299,7 @@ qdr_address_t *qdr_address_CT(qdr_core_t *core, qd_address_treatment_t treatment addr->rnodes = qd_bitmask(0); addr->add_prefix = 0; addr->del_prefix = 0; + addr->priority = -1; return addr; } http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/bb4f3121/src/router_core/router_core_private.h ---------------------------------------------------------------------- diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h index 9e1cfcc..b1da8a9 100644 --- a/src/router_core/router_core_private.h +++ b/src/router_core/router_core_private.h @@ -284,6 +284,7 @@ struct qdr_query_t { int next_offset; bool more; qd_amqp_error_t status; + uint8_t priority; }; DEQ_DECLARE(qdr_query_t, qdr_query_list_t); @@ -521,6 +522,8 @@ struct qdr_address_t { uint64_t deliveries_ingress_route_container; ///@} + + int priority; }; ALLOC_DECLARE(qdr_address_t); @@ -544,6 +547,7 @@ struct qdr_address_config_t { qd_address_treatment_t treatment; int in_phase; int out_phase; + int priority; }; ALLOC_DECLARE(qdr_address_config_t); @@ -877,7 +881,7 @@ qdr_delivery_t *qdr_forward_new_delivery_CT(qdr_core_t *core, qdr_delivery_t *pe void qdr_forward_deliver_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery_t *dlv); void qdr_connection_free(qdr_connection_t *conn); void qdr_connection_activate_CT(qdr_core_t *core, qdr_connection_t *conn); -qd_address_treatment_t qdr_treatment_for_address_CT(qdr_core_t *core, qdr_connection_t *conn, qd_iterator_t *iter, int *in_phase, int *out_phase); +qd_address_treatment_t qdr_treatment_for_address_CT(qdr_core_t *core, qdr_connection_t *conn, qd_iterator_t *iter, int *in_phase, int *out_phase, int *priority); qd_address_treatment_t qdr_treatment_for_address_hash_CT(qdr_core_t *core, qd_iterator_t *iter); qdr_edge_t *qdr_edge(qdr_core_t *); void qdr_edge_free(qdr_edge_t *); http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/bb4f3121/tests/system_tests_priority.py ---------------------------------------------------------------------- diff --git a/tests/system_tests_priority.py b/tests/system_tests_priority.py index 1180a2d..887b574 100644 --- a/tests/system_tests_priority.py +++ b/tests/system_tests_priority.py @@ -17,6 +17,7 @@ # under the License. # + from __future__ import unicode_literals from __future__ import division from __future__ import absolute_import @@ -24,19 +25,21 @@ from __future__ import print_function import unittest2 as unittest from proton import Message, Timeout -from system_test import TestCase, Qdrouterd, main_module +from system_test import TestCase, Qdrouterd, main_module, Process from proton.handlers import MessagingHandler from proton.reactor import Container import time import math +from qpid_dispatch_internal.compat import UNICODE -import pdb -#================================================================ + + +#------------------------------------------------ # Helper classes for all tests. -#================================================================ +#------------------------------------------------ class Timeout(object): """ @@ -52,6 +55,31 @@ class Timeout(object): +class ManagementMessageHelper ( object ): + """ + Format management messages. + """ + def __init__ ( self, reply_addr ): + self.reply_addr = reply_addr + + def make_router_link_query ( self ) : + props = { 'count': '100', + 'operation': 'QUERY', + 'entityType': 'org.apache.qpid.dispatch.router.link', + 'name': 'self', + 'type': 'org.amqp.management' + } + attrs = [] + attrs.append ( UNICODE('linkType') ) + attrs.append ( UNICODE('linkDir') ) + attrs.append ( UNICODE('deliveryCount') ) + attrs.append ( UNICODE('priority') ) + + msg_body = { } + msg_body [ 'attributeNames' ] = attrs + return Message ( body=msg_body, properties=props, reply_to=self.reply_addr ) + + #================================================================ @@ -79,102 +107,124 @@ class PriorityTests ( TestCase ): cls.routers = [] + # The sender will send all its messages with magic_message_priority. + # The first router will set target addr priority to magic_address_priority. + # It is important *not* to choose 4 for either of these priorities, + # since that is the default message priority. + cls.magic_message_priority = 3 + cls.magic_address_priority = 7 + link_cap = 100 A_client_port = cls.tester.get_port() B_client_port = cls.tester.get_port() + C_client_port = cls.tester.get_port() A_inter_router_port = cls.tester.get_port() B_inter_router_port = cls.tester.get_port() + C_inter_router_port = cls.tester.get_port() A_config = [ ( 'listener', - { 'port': A_client_port, - 'role': 'normal', - 'stripAnnotations': 'no', - 'linkCapacity': link_cap + { 'port' : A_client_port, + 'role' : 'normal', + 'linkCapacity' : link_cap, + 'stripAnnotations' : 'no' } ), ( 'listener', - { 'role': 'inter-router', - 'port': A_inter_router_port, - 'stripAnnotations': 'no', - 'linkCapacity': link_cap + { 'role' : 'inter-router', + 'port' : A_inter_router_port, + 'linkCapacity' : link_cap, + 'stripAnnotations' : 'no' } - ) + ), + ( 'address', + { 'prefix' : 'speedy', + 'priority' : cls.magic_address_priority, + 'distribution' : 'closest' + } + ), ] cls.B_config = [ ( 'listener', - { 'port': B_client_port, - 'role': 'normal', - 'stripAnnotations': 'no', - 'linkCapacity': link_cap + { 'port' : B_client_port, + 'role' : 'normal', + 'linkCapacity' : link_cap, + 'stripAnnotations' : 'no' } ), ( 'listener', - { 'role': 'inter-router', - 'port': B_inter_router_port, - 'stripAnnotations': 'no', - 'linkCapacity': link_cap + { 'role' : 'inter-router', + 'port' : B_inter_router_port, + 'linkCapacity' : link_cap, + 'stripAnnotations' : 'no' } ), ( 'connector', - { 'name': 'BA_connector', - 'role': 'inter-router', - 'port': A_inter_router_port, - 'verifyHostname': 'no', - 'stripAnnotations': 'no', - 'linkCapacity': link_cap + { 'name' : 'BA_connector', + 'role' : 'inter-router', + 'port' : A_inter_router_port, + 'verifyHostname' : 'no', + 'linkCapacity' : link_cap, + 'stripAnnotations' : 'no' } ) ] + C_config = [ + ( 'listener', + { 'port' : C_client_port, + 'role' : 'normal', + 'linkCapacity' : link_cap, + 'stripAnnotations' : 'no' + } + ), + ( 'listener', + { 'role' : 'inter-router', + 'port' : C_inter_router_port, + 'linkCapacity' : link_cap, + 'stripAnnotations' : 'no' + } + ), + ( 'connector', + { 'name' : 'CB_connector', + 'role' : 'inter-router', + 'port' : B_inter_router_port, + 'verifyHostname' : 'no', + 'linkCapacity' : link_cap, + 'stripAnnotations' : 'no' + } + ) + ] + router ( 'A', A_config ) router ( 'B', cls.B_config ) + router ( 'C', C_config ) router_A = cls.routers[0] router_B = cls.routers[1] + router_C = cls.routers[2] router_A.wait_router_connected('B') + router_A.wait_router_connected('C') cls.client_addrs = ( router_A.addresses[0], - router_B.addresses[0] + router_B.addresses[0], + router_C.addresses[0] ) - def kill_router_B ( self ) : - status = self.routers[1].poll() - self.routers[1].teardown() - status = self.routers[1].poll() - del self.routers[1] - - - def new_router_B ( self ) : - name = 'B' - config = [ ('router', {'mode': 'interior', 'id': name}), - ('address', {'prefix': 'closest', 'distribution': 'closest'}), - ('address', {'prefix': 'balanced', 'distribution': 'balanced'}), - ('address', {'prefix': 'multicast', 'distribution': 'multicast'}) - ] \ - + self.B_config - - config = Qdrouterd.Config(config) - self.routers.append(self.tester.qdrouterd(name, config, wait=False)) - # We need a little pause here, or A will always show - # a status of None (i.e. Good) even if it is, in fact, - # dying. (i.e. Bad) - time.sleep ( 2 ) - status = self.routers[0].poll() - return status - - + def test_priority ( self ): name = 'test_01' test = Priority ( self, name, self.client_addrs, - "closest/01" + "speedy/01", + self.magic_message_priority, + self.magic_address_priority ) test.run() self.assertEqual ( None, test.error ) @@ -185,88 +235,247 @@ class PriorityTests ( TestCase ): # Tests #================================================================ + class Priority ( MessagingHandler ): - def __init__ ( self, parent, test_name, client_addrs, destination ): + # In this test we will have a linear network of 3 routers. + # The sender attaches at A, and the receiver at C. + # + # receiver <--- C <--- B <--- A <--- sender + # + # Priority -- whether message or address -- only operates + # on inter-router links. The links from A to B will show + # address-priority overriding message-priority. When a + # router does not set any message priority, then messages + # are routed acording to their intrinsic priority which + # was assigned by the sender. This will be shown by the + # connection from router B to C. + # + # The address that the clients use has a prefix of 'speedy'. + # Router A will assign a priority of magic_addr_priority to all + # 'speedy' addresses. + # No other routers will assign any address priorities. + # + # The sending client will assign a priority of magic_msg_priority + # to all the messages it sends. + # + # So what should happen is: + # + # 1. at router A, all the 'speedy' messages go out with + # magic_addr_priority, because addr priority takes precedence. + # + # 2. at router B, they all go out with magic_msg_priority, + # because that router has not assigned any addr priority, + # so the intrinsic message priorities are used. + # + # 3. Nothing special happens at router C, because it is sending + # the messages out over a connection to an endpoint, which + # is not an inter-router connection. + # + # In this test we will send a known number of messages and + # then send management queries to A and B to learn at what + # priorities the messages actually travelled. + + def __init__ ( self, parent, test_name, client_addrs, destination, magic_msg_priority, magic_addr_priority ): super(Priority, self).__init__(prefetch=10) self.parent = parent self.client_addrs = client_addrs self.dest = destination + self.magic_msg_priority = magic_msg_priority + self.magic_addr_priority = magic_addr_priority + self.error = None self.sender = None self.receiver = None - self.test_timer = None - self.fail_timer = None + self.send_timer = None + self.n_messages = 100 self.n_sent = 0 - self.n_accepted = 0 - self.n_released = 0 self.send_conn = None self.recv_conn = None - self.n_messages = 100 self.n_received = 0 - self.test_start_time = None - self.test_end_time = None - self.timer_count = 0 self.reactor = None - self.router_B_count = 0 + self.timer_count = 0 + self.sent_queries = False + self.finishing = False + self.goals = 0 + self.n_goals = 2 + self.connections = list() + self.A_addr = self.client_addrs[0] + self.B_addr = self.client_addrs[1] + self.C_addr = self.client_addrs[2] + self.routers = { + 'A' : dict(), + 'B' : dict() + } # Shut down everything and exit. def bail ( self, text ): + self.send_timer.cancel ( ) + self.finishing = True self.error = text + for conn in self.connections : + conn.close() - self.send_conn.close() - self.recv_conn.close ( ) - self.test_timer.cancel ( ) - self.fail_timer.cancel ( ) + + def make_connection ( self, event, addr ) : + cnx = event.container.connect ( addr ) + self.connections.append ( cnx ) + return cnx + + + def on_start ( self, event ): + self.reactor = event.reactor + self.send_conn = self.make_connection ( event, self.A_addr ) + self.recv_conn = self.make_connection ( event, self.C_addr ) + + self.sender = event.container.create_sender ( self.send_conn, self.dest ) + self.receiver = event.container.create_receiver ( self.recv_conn, self.dest ) + self.receiver.flow ( 100 ) + + self.routers['A'] ['mgmt_conn'] = self.make_connection ( event, self.A_addr ) + self.routers['A'] ['mgmt_receiver'] = event.container.create_receiver ( self.routers['A'] ['mgmt_conn'], dynamic=True ) + self.routers['A'] ['mgmt_sender'] = event.container.create_sender ( self.routers['A'] ['mgmt_conn'], "$management" ) + + self.routers['B'] ['mgmt_conn'] = self.make_connection ( event, self.B_addr ) + self.routers['B'] ['mgmt_receiver'] = event.container.create_receiver ( self.routers['B'] ['mgmt_conn'], dynamic=True ) + self.routers['B'] ['mgmt_sender'] = event.container.create_sender ( self.routers['B'] ['mgmt_conn'], "$management" ) + + self.send_timer = event.reactor.schedule ( 2, Timeout(self, "send") ) - # There is no need in this test to send messages at all. Just the - # process of killing and replacing router B will kill router A if - # it is not cleaning up its sheafs of prioritized links properly. - # - # Each time this timer goes off we will kill router B and make a - # replacement for it, then make sure that A has survived the process. - # - # To see A die in this test, comment out the call to qdr_reset_sheaf() - # in connections.c, rebuild, and run this test. def timeout ( self, name ): - if name == 'fail': - self.bail ( "Test hanging." ) - return - if name == 'test': - self.timer_count += 1 - if 0 == (self.timer_count % 3 ) : - self.parent.kill_router_B ( ) - self.kill_B = 0 - time.sleep(2) - # Make a new B router, and see if A survives. - A_status = self.parent.new_router_B ( ) - if A_status != None : - self.bail ( "Router A died when new router B was created." ) - return - if self.router_B_count >= 2 : - # We have killed & restarted B 2 times. Good enough. - self.bail ( None ) - return - self.router_B_count += 1 - self.test_timer = self.reactor.schedule ( 1, Timeout(self, "test") ) + if name == 'send': + self.send ( ) + if not self.sent_queries : + self.test_timer = self.reactor.schedule ( 1, Timeout(self, "send") ) + def on_link_opened ( self, event ) : + # A mgmt link has opened. Create its management helper. + # ( Now we know the address that the management helper should use as + # the "reply-to" in its management message. ) + if event.receiver == self.routers['A'] ['mgmt_receiver'] : + event.receiver.flow ( 1000 ) + self.routers['A'] ['mgmt_helper'] = ManagementMessageHelper ( event.receiver.remote_source.address ) - def on_start ( self, event ): - self.reactor = event.reactor - self.test_timer = event.reactor.schedule ( 1, Timeout(self, "test") ) - self.fail_timer = event.reactor.schedule ( 30, Timeout(self, "fail") ) - self.send_conn = event.container.connect ( self.client_addrs[0] ) # A - self.recv_conn = event.container.connect ( self.client_addrs[1] ) # B + elif event.receiver == self.routers['B'] ['mgmt_receiver'] : + event.receiver.flow ( 1000 ) + self.routers['B'] ['mgmt_helper'] = ManagementMessageHelper ( event.receiver.remote_source.address ) + + + def send ( self ) : + if self.sender.credit <= 0: + self.receiver.flow ( 100 ) + return + + # First send the payload messages. + if self.n_sent < self.n_messages : + for i in xrange(50) : + msg = Message ( body=self.n_sent ) + msg.priority = 3 + self.sender.send ( msg ) + self.n_sent += 1 + # Then send the management queries. + # But only send them once. + elif not self.sent_queries : + # Query router A. + mgmt_helper = self.routers['A'] ['mgmt_helper'] + mgmt_sender = self.routers['A'] ['mgmt_sender'] + msg = mgmt_helper.make_router_link_query ( ) + mgmt_sender.send ( msg ) + + # Query router B. + mgmt_helper = self.routers['B'] ['mgmt_helper'] + mgmt_sender = self.routers['B'] ['mgmt_sender'] + msg = mgmt_helper.make_router_link_query ( ) + mgmt_sender.send ( msg ) + + self.sent_queries = True + + + # This test has two goals: get the response from router A + # and from router B. As they come in, we check them. If + # the response is unsatisfactory we bail out + def goal_satisfied ( self ) : + self.goals += 1 + if self.goals >= self.n_goals : + self.bail ( None ) + + + def on_message ( self, event ) : + + # Don't take any more messages if 'bail' has been called. + if self.finishing : + return + + msg = event.message + + if event.receiver == self.routers['A'] ['mgmt_receiver'] : + # Router A has only one set of outgoing links, and it + # has set a priority for our target address. We should + # see all the messages we sent go out with that priority. + magic = self.magic_addr_priority + if 'results' in msg.body : + results = msg.body['results'] + # I do not want to trust the possibility that the + # results will be returned to me in priority-order. + # Instead, I explicitly asked for the link priority + # in the management query that was sent. Now I will + # loop through all the results, and look for the one + # with the desired priority. + for i in range(len(results)) : + result = results[i] + role = result[0] + dir = result[1] + message_count = result[2] + priority = result[3] + if role == "inter-router" and dir == "out" and priority == magic : + if message_count >= self.n_messages : + self.goal_satisfied ( ) + return + else : + self.bail ( "Router A priority %d had %d messages instead of %d." % + (magic, message_count, self.n_messages) ) + return + + elif event.receiver == self.routers['B'] ['mgmt_receiver'] : + # Router B has two sets of outgoing links, and it has not + # set a priority for the target address. We should see all + # of our messages going out over the message-intrinsic + # priority that the sending client used -- one one of those + # two sets of outgoing links. + magic = self.magic_msg_priority + if 'results' in msg.body : + message_counts = list() + results = msg.body['results'] + for i in range(len(results)) : + result = results[i] + role = result[0] + dir = result[1] + message_count = result[2] + priority = result[3] + if role == "inter-router" and dir == "out" : + if priority == magic : + message_counts.append ( message_count ) + + if self.n_messages in message_counts : + self.goal_satisfied ( ) + else : + self.bail ( "No outgoing link on router B had %d messages at priority 3" % self.n_messages ) + + else : + # This is a payload message -- not management. Just count it. + self.n_received += 1 def run(self): Container(self).run() + + if __name__ == '__main__': unittest.main(main_module()) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org