Repository: qpid-dispatch
Updated Branches:
  refs/heads/master 9c5666f6d -> bb4f31219


DISPATCH-1139 : address priority support and test


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/bb4f3121
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/bb4f3121
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/bb4f3121

Branch: refs/heads/master
Commit: bb4f3121989cb4b91d4e364287868da1594f3acf
Parents: 9c5666f
Author: Michael Goulish <mgoul...@redhat.com>
Authored: Thu Oct 18 11:07:45 2018 -0400
Committer: Michael Goulish <mgoul...@redhat.com>
Committed: Thu Oct 18 11:07:45 2018 -0400

----------------------------------------------------------------------
 include/qpid/dispatch/message.h               |   5 +-
 python/qpid_dispatch/management/qdrouter.json |   6 +
 src/message.c                                 |  15 +-
 src/message_private.h                         |   1 +
 src/router_config.c                           |   5 +
 src/router_core/agent_config_address.c        |  19 +
 src/router_core/agent_config_address.h        |   2 +-
 src/router_core/connections.c                 |  14 +-
 src/router_core/core_link_endpoint.c          |   2 +-
 src/router_core/forwarder.c                   |  10 +-
 src/router_core/route_control.c               |   2 +-
 src/router_core/router_core.c                 |   1 +
 src/router_core/router_core_private.h         |   6 +-
 tests/system_tests_priority.py                | 417 ++++++++++++++++-----
 14 files changed, 378 insertions(+), 127 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/bb4f3121/include/qpid/dispatch/message.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/message.h b/include/qpid/dispatch/message.h
index 02a950e..8af8c9b 100644
--- a/include/qpid/dispatch/message.h
+++ b/include/qpid/dispatch/message.h
@@ -422,10 +422,9 @@ void qd_message_set_aborted(const qd_message_t *msg, bool 
aborted);
 /**
  * Return message priority
  * @param msg A pointer to the message
- * @param priority [out] The priority value, if present
- * @return True iff the priority was present in the message header
+ * @return The message priority value. Default if not present.
  */
-bool qd_message_get_priority(qd_message_t *msg, uint8_t *priority);
+uint8_t qd_message_get_priority(qd_message_t *msg);
 
 
 ///@}

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/bb4f3121/python/qpid_dispatch/management/qdrouter.json
----------------------------------------------------------------------
diff --git a/python/qpid_dispatch/management/qdrouter.json 
b/python/qpid_dispatch/management/qdrouter.json
index e31f54c..21bc4cf 100644
--- a/python/qpid_dispatch/management/qdrouter.json
+++ b/python/qpid_dispatch/management/qdrouter.json
@@ -1111,6 +1111,12 @@
                     "description": "Advanced - Override the egress phase for 
this address",
                     "create": true,
                     "required": false
+                },
+                "priority": {
+                    "type": "integer",
+                    "description": "All messages sent to this address which 
lack an intrinsic priority will be assigned this priority.",
+                    "create": true,
+                    "required": false
                 }
             }
         },

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/bb4f3121/src/message.c
----------------------------------------------------------------------
diff --git a/src/message.c b/src/message.c
index a3682c1..66d9955 100644
--- a/src/message.c
+++ b/src/message.c
@@ -789,6 +789,7 @@ static void qd_message_parse_priority(qd_message_t *in_msg)
                 if (qd_parse_tag(priority_field) != QD_AMQP_NULL) {
                     uint32_t value = qd_parse_as_uint(priority_field);
                     content->priority = value >= QDR_N_PRIORITIES ? 
QDR_N_PRIORITIES - 1 : (uint8_t) (value & 0x00ff);
+                    content->priority = value > QDR_MAX_PRIORITY ? 
QDR_MAX_PRIORITY : (uint8_t) (value & 0x00ff);
                     content->priority_present = true;
                 }
             }
@@ -1077,17 +1078,21 @@ void qd_message_add_fanout(qd_message_t *in_msg)
     sys_atomic_inc(&msg->content->fanout);
 }
 
-bool qd_message_get_priority(qd_message_t *msg, uint8_t *priority)
+/**
+* There are two sources of priority information --
+* message and address. Address takes precedence, falling
+* through when no address priority has been specified.
+* This also means that messages must always have a priority,
+* using default value if sender leaves it unspecified.
+*/
+uint8_t qd_message_get_priority(qd_message_t *msg)
 {
     qd_message_content_t *content = MSG_CONTENT(msg);
 
     if (!content->priority_parsed)
         qd_message_parse_priority(msg);
 
-    if (content->priority_present)
-        *priority = content->priority;
-
-    return content->priority_present;
+    return content->priority_present ? content->priority : 
QDR_DEFAULT_PRIORITY;
 }
 
 bool qd_message_receive_complete(qd_message_t *in_msg)

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/bb4f3121/src/message_private.h
----------------------------------------------------------------------
diff --git a/src/message_private.h b/src/message_private.h
index 2f3a400..3658597 100644
--- a/src/message_private.h
+++ b/src/message_private.h
@@ -148,6 +148,7 @@ void qd_message_initialize();
 qd_iterator_pointer_t qd_message_cursor(qd_message_pvt_t *msg);
 
 #define QDR_N_PRIORITIES     10
+#define QDR_MAX_PRIORITY     (QDR_N_PRIORITIES - 1)
 #define QDR_DEFAULT_PRIORITY  4
 
 ///@}

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/bb4f3121/src/router_config.c
----------------------------------------------------------------------
diff --git a/src/router_config.c b/src/router_config.c
index 9ae9f9f..837dde5 100644
--- a/src/router_config.c
+++ b/src/router_config.c
@@ -78,6 +78,7 @@ qd_error_t qd_router_configure_address(qd_router_t *router, 
qd_entity_t *entity)
         bool  waypoint  = qd_entity_opt_bool(entity, "waypoint", false);
         long  in_phase  = qd_entity_opt_long(entity, "ingressPhase", -1);
         long  out_phase = qd_entity_opt_long(entity, "egressPhase", -1);
+        long  priority  = qd_entity_opt_long(entity, "priority",    -1);
 
         //
         // Formulate this configuration create it through the core management 
API.
@@ -108,6 +109,10 @@ qd_error_t qd_router_configure_address(qd_router_t 
*router, qd_entity_t *entity)
         qd_compose_insert_string(body, "waypoint");
         qd_compose_insert_bool(body, waypoint);
 
+        qd_compose_insert_string(body, "priority");
+        qd_compose_insert_long(body, priority);
+
+
         if (in_phase >= 0) {
             qd_compose_insert_string(body, "ingressPhase");
             qd_compose_insert_int(body, in_phase);

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/bb4f3121/src/router_core/agent_config_address.c
----------------------------------------------------------------------
diff --git a/src/router_core/agent_config_address.c 
b/src/router_core/agent_config_address.c
index 0c36f03..31be6ee 100644
--- a/src/router_core/agent_config_address.c
+++ b/src/router_core/agent_config_address.c
@@ -31,6 +31,7 @@
 #define QDR_CONFIG_ADDRESS_IN_PHASE      6
 #define QDR_CONFIG_ADDRESS_OUT_PHASE     7
 #define QDR_CONFIG_ADDRESS_PATTERN       8
+#define QDR_CONFIG_ADDRESS_PRIORITY      9
 
 const char *qdr_config_address_columns[] =
     {"name",
@@ -42,6 +43,7 @@ const char *qdr_config_address_columns[] =
      "ingressPhase",
      "egressPhase",
      "pattern",
+     "priority",
      0};
 
 const char *CONFIG_ADDRESS_TYPE = 
"org.apache.qpid.dispatch.router.config.address";
@@ -119,6 +121,10 @@ static void 
qdr_config_address_insert_column_CT(qdr_address_config_t *addr, int
     case QDR_CONFIG_ADDRESS_OUT_PHASE:
         qd_compose_insert_int(body, addr->out_phase);
         break;
+
+    case QDR_CONFIG_ADDRESS_PRIORITY:
+        qd_compose_insert_int(body, addr->priority);
+        break;
     }
 }
 
@@ -340,6 +346,7 @@ void qdra_config_address_create_CT(qdr_core_t         *core,
         qd_parsed_field_t *waypoint_field  = qd_parse_value_by_key(in_body, 
qdr_config_address_columns[QDR_CONFIG_ADDRESS_WAYPOINT]);
         qd_parsed_field_t *in_phase_field  = qd_parse_value_by_key(in_body, 
qdr_config_address_columns[QDR_CONFIG_ADDRESS_IN_PHASE]);
         qd_parsed_field_t *out_phase_field = qd_parse_value_by_key(in_body, 
qdr_config_address_columns[QDR_CONFIG_ADDRESS_OUT_PHASE]);
+        qd_parsed_field_t *priority_field  = qd_parse_value_by_key(in_body, 
qdr_config_address_columns[QDR_CONFIG_ADDRESS_PRIORITY]);
 
         //
         // Either a prefix or a pattern field is mandatory.  Prefix and pattern
@@ -386,6 +393,7 @@ void qdra_config_address_create_CT(qdr_core_t         *core,
         bool waypoint  = waypoint_field  ? qd_parse_as_bool(waypoint_field) : 
false;
         long in_phase  = in_phase_field  ? qd_parse_as_long(in_phase_field)  : 
-1;
         long out_phase = out_phase_field ? qd_parse_as_long(out_phase_field) : 
-1;
+        long priority  = priority_field  ? qd_parse_as_long(priority_field)  : 
-1;
 
         //
         // Handle the address-phasing logic.  If the phases are provided, use 
them.  Otherwise
@@ -407,6 +415,16 @@ void qdra_config_address_create_CT(qdr_core_t         
*core,
         }
 
         //
+        // Validate the priority values.
+        //
+        if (priority > QDR_MAX_PRIORITY ) {
+            query->status = QD_AMQP_BAD_REQUEST;
+            query->status.description = "Priority value, if present, must be 
between 0 and QDR_MAX_PRIORITY";
+            qd_log(core->agent_log, QD_LOG_ERROR, "Error performing CREATE of 
%s: %s", CONFIG_ADDRESS_TYPE, query->status.description);
+            break;
+        }
+
+        //
         // The request is good.  Create the entity and insert it into the hash 
index and list.
         //
 
@@ -419,6 +437,7 @@ void qdra_config_address_create_CT(qdr_core_t         *core,
         addr->out_phase = out_phase;
         addr->is_prefix = !!prefix_field;
         addr->pattern   = pattern;
+        addr->priority  = priority;
         pattern = 0;
 
         qd_iterator_reset_view(iter, ITER_VIEW_ALL);

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/bb4f3121/src/router_core/agent_config_address.h
----------------------------------------------------------------------
diff --git a/src/router_core/agent_config_address.h 
b/src/router_core/agent_config_address.h
index fd94b76..38c3365 100644
--- a/src/router_core/agent_config_address.h
+++ b/src/router_core/agent_config_address.h
@@ -35,7 +35,7 @@ void qdra_config_address_get_CT(qdr_core_t    *core,
 char *qdra_config_address_validate_pattern_CT(qd_parsed_field_t *pattern_field,
                                               bool is_prefix,
                                               const char **error);
-#define QDR_CONFIG_ADDRESS_COLUMN_COUNT 9
+#define QDR_CONFIG_ADDRESS_COLUMN_COUNT 10
 
 const char *qdr_config_address_columns[QDR_CONFIG_ADDRESS_COLUMN_COUNT + 1];
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/bb4f3121/src/router_core/connections.c
----------------------------------------------------------------------
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index 0117efd..c8d3a04 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -220,7 +220,7 @@ int qdr_connection_process(qdr_connection_t *conn)
 
     sys_mutex_lock(conn->work_lock);
     DEQ_MOVE(conn->work_list, work_list);
-    for (int priority = 0; priority < QDR_N_PRIORITIES; ++ priority) {
+    for (int priority = 0; priority <= QDR_MAX_PRIORITY; ++ priority) {
         DEQ_MOVE(conn->links_with_work[priority], links_with_work[priority]);
 
         //
@@ -258,7 +258,7 @@ int qdr_connection_process(qdr_connection_t *conn)
     }
 
     // Process the links_with_work array from highest to lowest priority.
-    for (int priority = QDR_N_PRIORITIES - 1; priority >= 0; -- priority) {
+    for (int priority = QDR_MAX_PRIORITY; priority >= 0; -- priority) {
         do {
             qdr_link_work_t *link_work;
             free_link = false;
@@ -1010,7 +1010,7 @@ void qdr_link_outbound_second_attach_CT(qdr_core_t *core, 
qdr_link_t *link, qdr_
 }
 
 
-qd_address_treatment_t qdr_treatment_for_address_CT(qdr_core_t *core, 
qdr_connection_t *conn, qd_iterator_t *iter, int *in_phase, int *out_phase)
+qd_address_treatment_t qdr_treatment_for_address_CT(qdr_core_t *core, 
qdr_connection_t *conn, qd_iterator_t *iter, int *in_phase, int *out_phase, int 
*priority)
 {
     qdr_address_config_t *addr = 0;
     qd_iterator_view_t old_view = qd_iterator_get_view(iter);
@@ -1024,6 +1024,7 @@ qd_address_treatment_t 
qdr_treatment_for_address_CT(qdr_core_t *core, qdr_connec
 
     if (in_phase)  *in_phase  = addr ? addr->in_phase  : 0;
     if (out_phase) *out_phase = addr ? addr->out_phase : 0;
+    if (priority)  *priority  = addr ? addr->priority  : -1;
 
 
     return addr ? addr->treatment : core->qd->default_treatment;
@@ -1230,7 +1231,8 @@ static qdr_address_t 
*qdr_lookup_terminus_address_CT(qdr_core_t       *core,
     int in_phase;
     int out_phase;
     int addr_phase;
-    qd_address_treatment_t treat = qdr_treatment_for_address_CT(core, conn, 
iter, &in_phase, &out_phase);
+    int priority;
+    qd_address_treatment_t treat = qdr_treatment_for_address_CT(core, conn, 
iter, &in_phase, &out_phase, &priority);
 
     qd_iterator_reset_view(iter, ITER_VIEW_ADDRESS_HASH);
     qd_iterator_annotate_prefix(iter, '\0'); // Cancel previous override
@@ -1259,6 +1261,8 @@ static qdr_address_t 
*qdr_lookup_terminus_address_CT(qdr_core_t       *core,
     if (!!addr && addr->core_endpoint != 0)
         *core_endpoint = true;
 
+    if (addr)
+        addr->priority = priority;
     return addr;
 }
 
@@ -1465,7 +1469,7 @@ static void qdr_attach_link_data_CT(qdr_core_t *core, 
qdr_connection_t *conn, qd
         // As inter-router links are attached to this connection, they
         // are assigned priorities in the order in which they are attached.
         int next_slot = core->data_links_by_mask_bit[conn->mask_bit].count ++;
-        if (next_slot >= QDR_N_PRIORITIES) {
+        if (next_slot > QDR_MAX_PRIORITY) {
             qd_log(core->log, QD_LOG_ERROR, "Attempt to attach too many 
inter-router links for priority sheaf.");
         }
         link->priority = next_slot;

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/bb4f3121/src/router_core/core_link_endpoint.c
----------------------------------------------------------------------
diff --git a/src/router_core/core_link_endpoint.c 
b/src/router_core/core_link_endpoint.c
index 05e7fb8..01df2c5 100644
--- a/src/router_core/core_link_endpoint.c
+++ b/src/router_core/core_link_endpoint.c
@@ -42,7 +42,7 @@ void qdrc_endpoint_bind_mobile_address_CT(qdr_core_t          
 *core,
 
     qd_hash_retrieve(core->addr_hash, iter, (void*) &addr);
     if (!addr) {
-        qd_address_treatment_t treatment = qdr_treatment_for_address_CT(core, 
0, iter, 0, 0);
+        qd_address_treatment_t treatment = qdr_treatment_for_address_CT(core, 
0, iter, 0, 0, 0);
         if (treatment == QD_TREATMENT_UNAVAILABLE)
             treatment = QD_TREATMENT_ANYCAST_BALANCED;
         addr = qdr_address_CT(core, treatment);

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/bb4f3121/src/router_core/forwarder.c
----------------------------------------------------------------------
diff --git a/src/router_core/forwarder.c b/src/router_core/forwarder.c
index 80f9b07..e1d6d57 100644
--- a/src/router_core/forwarder.c
+++ b/src/router_core/forwarder.c
@@ -280,15 +280,13 @@ void qdr_forward_on_message_CT(qdr_core_t *core, 
qdr_subscription_t *sub, qdr_li
  * This function returns a priority value for a message (and address).  If the 
message
  * has no priority header, the default priority is chosen.
  *
- * TODO: Add the ability to get the priority from the address if not present 
in the message
+ * If the address has a defined priority, that value takes precedence.
+ * Otherwise the message priority is used, which has a default value
+ * if none was explicitly set.
  */
 static uint8_t qdr_forward_effective_priority(qd_message_t *msg, qdr_address_t 
*addr)
 {
-    uint8_t priority;
-    bool    has_priority = qd_message_get_priority(msg, &priority);
-    if (!has_priority)
-        priority = QDR_DEFAULT_PRIORITY;
-    return priority;
+    return addr->priority >= 0 ? addr->priority : qd_message_get_priority(msg);
 }
 
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/bb4f3121/src/router_core/route_control.c
----------------------------------------------------------------------
diff --git a/src/router_core/route_control.c b/src/router_core/route_control.c
index 977ac5f..98ccf61 100644
--- a/src/router_core/route_control.c
+++ b/src/router_core/route_control.c
@@ -472,7 +472,7 @@ qdr_auto_link_t *qdr_route_add_auto_link_CT(qdr_core_t      
    *core,
 
     qd_hash_retrieve(core->addr_hash, iter, (void*) &al->addr);
     if (!al->addr) {
-        qd_address_treatment_t treatment = qdr_treatment_for_address_CT(core, 
0, iter, 0, 0);
+        qd_address_treatment_t treatment = qdr_treatment_for_address_CT(core, 
0, iter, 0, 0, 0);
         if (treatment == QD_TREATMENT_UNAVAILABLE) {
             //if associated address is not defined, assume balanced
             treatment = QD_TREATMENT_ANYCAST_BALANCED;

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/bb4f3121/src/router_core/router_core.c
----------------------------------------------------------------------
diff --git a/src/router_core/router_core.c b/src/router_core/router_core.c
index 19c8f35..76ceba9 100644
--- a/src/router_core/router_core.c
+++ b/src/router_core/router_core.c
@@ -299,6 +299,7 @@ qdr_address_t *qdr_address_CT(qdr_core_t *core, 
qd_address_treatment_t treatment
     addr->rnodes    = qd_bitmask(0);
     addr->add_prefix = 0;
     addr->del_prefix = 0;
+    addr->priority   = -1;
     return addr;
 }
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/bb4f3121/src/router_core/router_core_private.h
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_private.h 
b/src/router_core/router_core_private.h
index 9e1cfcc..b1da8a9 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -284,6 +284,7 @@ struct qdr_query_t {
     int                      next_offset;
     bool                     more;
     qd_amqp_error_t          status;
+    uint8_t                  priority;
 };
 
 DEQ_DECLARE(qdr_query_t, qdr_query_list_t); 
@@ -521,6 +522,8 @@ struct qdr_address_t {
     uint64_t deliveries_ingress_route_container;
 
     ///@}
+
+    int priority;
 };
 
 ALLOC_DECLARE(qdr_address_t);
@@ -544,6 +547,7 @@ struct qdr_address_config_t {
     qd_address_treatment_t  treatment;
     int                     in_phase;
     int                     out_phase;
+    int                     priority;
 };
 
 ALLOC_DECLARE(qdr_address_config_t);
@@ -877,7 +881,7 @@ qdr_delivery_t *qdr_forward_new_delivery_CT(qdr_core_t 
*core, qdr_delivery_t *pe
 void qdr_forward_deliver_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery_t 
*dlv);
 void qdr_connection_free(qdr_connection_t *conn);
 void qdr_connection_activate_CT(qdr_core_t *core, qdr_connection_t *conn);
-qd_address_treatment_t qdr_treatment_for_address_CT(qdr_core_t *core, 
qdr_connection_t *conn, qd_iterator_t *iter, int *in_phase, int *out_phase);
+qd_address_treatment_t qdr_treatment_for_address_CT(qdr_core_t *core, 
qdr_connection_t *conn, qd_iterator_t *iter, int *in_phase, int *out_phase, int 
*priority);
 qd_address_treatment_t qdr_treatment_for_address_hash_CT(qdr_core_t *core, 
qd_iterator_t *iter);
 qdr_edge_t *qdr_edge(qdr_core_t *);
 void qdr_edge_free(qdr_edge_t *);

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/bb4f3121/tests/system_tests_priority.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_priority.py b/tests/system_tests_priority.py
index 1180a2d..887b574 100644
--- a/tests/system_tests_priority.py
+++ b/tests/system_tests_priority.py
@@ -17,6 +17,7 @@
 # under the License.
 #
 
+
 from __future__ import unicode_literals
 from __future__ import division
 from __future__ import absolute_import
@@ -24,19 +25,21 @@ from __future__ import print_function
 
 import unittest2 as unittest
 from proton          import Message, Timeout
-from system_test     import TestCase, Qdrouterd, main_module
+from system_test     import TestCase, Qdrouterd, main_module, Process
 from proton.handlers import MessagingHandler
 from proton.reactor  import Container
 
 import time
 import math
+from qpid_dispatch_internal.compat import UNICODE
 
-import pdb
 
 
-#================================================================
+
+
+#------------------------------------------------
 # Helper classes for all tests.
-#================================================================
+#------------------------------------------------
 
 class Timeout(object):
     """
@@ -52,6 +55,31 @@ class Timeout(object):
 
 
 
+class ManagementMessageHelper ( object ):
+    """
+    Format management messages.
+    """
+    def __init__ ( self, reply_addr ):
+        self.reply_addr = reply_addr
+
+    def make_router_link_query ( self ) :
+        props = { 'count':      '100',
+                  'operation':  'QUERY',
+                  'entityType': 'org.apache.qpid.dispatch.router.link',
+                  'name':       'self',
+                  'type':       'org.amqp.management'
+                }
+        attrs = []
+        attrs.append ( UNICODE('linkType') )
+        attrs.append ( UNICODE('linkDir') )
+        attrs.append ( UNICODE('deliveryCount') )
+        attrs.append ( UNICODE('priority') )
+
+        msg_body = { }
+        msg_body [ 'attributeNames' ] = attrs
+        return Message ( body=msg_body, properties=props, 
reply_to=self.reply_addr )
+
+
 
 
 #================================================================
@@ -79,102 +107,124 @@ class PriorityTests ( TestCase ):
 
         cls.routers = []
 
+        # The sender will send all its messages with magic_message_priority.
+        # The first router will set target addr priority to 
magic_address_priority.
+        # It is important *not* to choose 4 for either of these priorities,
+        # since that is the default message priority.
+        cls.magic_message_priority = 3
+        cls.magic_address_priority = 7
+
         link_cap = 100
         A_client_port = cls.tester.get_port()
         B_client_port = cls.tester.get_port()
+        C_client_port = cls.tester.get_port()
 
         A_inter_router_port = cls.tester.get_port()
         B_inter_router_port = cls.tester.get_port()
+        C_inter_router_port = cls.tester.get_port()
 
         A_config = [
                      ( 'listener',
-                       { 'port': A_client_port,
-                         'role': 'normal',
-                         'stripAnnotations': 'no',
-                         'linkCapacity': link_cap
+                       { 'port'             : A_client_port,
+                         'role'             : 'normal',
+                         'linkCapacity'     : link_cap,
+                         'stripAnnotations' : 'no'
                        }
                      ),
                      ( 'listener',
-                       { 'role': 'inter-router',
-                         'port': A_inter_router_port,
-                         'stripAnnotations': 'no',
-                         'linkCapacity': link_cap
+                       { 'role'             : 'inter-router',
+                         'port'             : A_inter_router_port,
+                         'linkCapacity'     : link_cap,
+                         'stripAnnotations' : 'no'
                        }
-                     )
+                     ),
+                     ( 'address',
+                       { 'prefix'       : 'speedy',
+                         'priority'     : cls.magic_address_priority,
+                         'distribution' : 'closest'
+                       }
+                     ),
                    ]
 
         cls.B_config = [
                          ( 'listener',
-                           { 'port': B_client_port,
-                             'role': 'normal',
-                             'stripAnnotations': 'no',
-                             'linkCapacity': link_cap
+                           { 'port'             : B_client_port,
+                             'role'             : 'normal',
+                             'linkCapacity'     : link_cap,
+                             'stripAnnotations' : 'no'
                            }
                          ),
                          ( 'listener',
-                           { 'role': 'inter-router',
-                             'port': B_inter_router_port,
-                             'stripAnnotations': 'no',
-                             'linkCapacity': link_cap
+                           { 'role'             : 'inter-router',
+                             'port'             : B_inter_router_port,
+                             'linkCapacity'     : link_cap,
+                             'stripAnnotations' : 'no'
                            }
                          ),
                          ( 'connector',
-                           {  'name': 'BA_connector',
-                              'role': 'inter-router',
-                              'port': A_inter_router_port,
-                              'verifyHostname': 'no',
-                              'stripAnnotations': 'no',
-                              'linkCapacity': link_cap
+                           { 'name'             : 'BA_connector',
+                             'role'             : 'inter-router',
+                             'port'             : A_inter_router_port,
+                             'verifyHostname'   : 'no',
+                             'linkCapacity'     : link_cap,
+                             'stripAnnotations' : 'no'
                            }
                          )
                       ]
 
+        C_config = [
+                     ( 'listener',
+                       { 'port'             : C_client_port,
+                         'role'             : 'normal',
+                         'linkCapacity'     : link_cap,
+                         'stripAnnotations' : 'no'
+                       }
+                     ),
+                     ( 'listener',
+                       { 'role'             : 'inter-router',
+                         'port'             : C_inter_router_port,
+                         'linkCapacity'     : link_cap,
+                         'stripAnnotations' : 'no'
+                       }
+                     ),
+                     ( 'connector',
+                       { 'name'             : 'CB_connector',
+                         'role'             : 'inter-router',
+                         'port'             : B_inter_router_port,
+                         'verifyHostname'   : 'no',
+                         'linkCapacity'     : link_cap,
+                         'stripAnnotations' : 'no'
+                       }
+                     )
+                  ]
+
 
         router ( 'A', A_config )
         router ( 'B', cls.B_config )
+        router ( 'C', C_config )
 
 
         router_A = cls.routers[0]
         router_B = cls.routers[1]
+        router_C = cls.routers[2]
 
         router_A.wait_router_connected('B')
+        router_A.wait_router_connected('C')
 
         cls.client_addrs = ( router_A.addresses[0],
-                             router_B.addresses[0]
+                             router_B.addresses[0],
+                             router_C.addresses[0]
                            )
 
-    def kill_router_B ( self ) :
-        status = self.routers[1].poll()
-        self.routers[1].teardown()
-        status = self.routers[1].poll()
-        del self.routers[1]
-
-    
-    def new_router_B ( self ) :
-        name = 'B'
-        config = [ ('router',  {'mode': 'interior', 'id': name}),
-                   ('address', {'prefix': 'closest',   'distribution': 
'closest'}),
-                   ('address', {'prefix': 'balanced',  'distribution': 
'balanced'}),
-                   ('address', {'prefix': 'multicast', 'distribution': 
'multicast'})
-                 ]    \
-                 + self.B_config
-
-        config = Qdrouterd.Config(config)
-        self.routers.append(self.tester.qdrouterd(name, config, wait=False))
-        # We need a little pause here, or A will always show 
-        # a status of None (i.e. Good) even if it is, in fact, 
-        # dying. (i.e. Bad)
-        time.sleep ( 2 )
-        status = self.routers[0].poll()
-        return status
-
-    
+
     def test_priority ( self ):
         name = 'test_01'
         test = Priority ( self,
                           name,
                           self.client_addrs,
-                          "closest/01"
+                          "speedy/01",
+                          self.magic_message_priority,
+                          self.magic_address_priority
                         )
         test.run()
         self.assertEqual ( None, test.error )
@@ -185,88 +235,247 @@ class PriorityTests ( TestCase ):
 #     Tests
 #================================================================
 
+
 class Priority ( MessagingHandler ):
 
-    def __init__ ( self, parent, test_name, client_addrs, destination ):
+    # In this test we will have a linear network of 3 routers.
+    # The sender attaches at A, and the receiver at C.
+    #
+    #  receiver <--- C <--- B <--- A <--- sender
+    #
+    # Priority -- whether message or address -- only operates
+    # on inter-router links. The links from A to B will show
+    # address-priority overriding message-priority. When a
+    # router does not set any address priority, then messages
+    # are routed according to their intrinsic priority which
+    # was assigned by the sender. This will be shown by the
+    # connection from router B to C.
+    #
+    # The address that the clients use has a prefix of 'speedy'.
+    # Router A will assign a priority of magic_addr_priority to all
+    # 'speedy' addresses.
+    # No other routers will assign any address priorities.
+    #
+    # The sending client will assign a priority of magic_msg_priority
+    # to all the messages it sends.
+    #
+    # So what should happen is:
+    #
+    # 1. at router A, all the 'speedy' messages go out with
+    #    magic_addr_priority, because addr priority takes precedence.
+    #
+    # 2. at router B, they all go out with magic_msg_priority,
+    #    because that router has not assigned any addr priority,
+    #    so the intrinsic message priorities are used.
+    #
+    # 3. Nothing special happens at router C, because it is sending
+    #    the messages out over a connection to an endpoint, which
+    #    is not an inter-router connection.
+    #
+    # In this test we will send a known number of messages and
+    # then send management queries to A and B to learn at what
+    # priorities the messages actually travelled.
+
+    def __init__ ( self, parent, test_name, client_addrs, destination, 
magic_msg_priority, magic_addr_priority ):
         super(Priority, self).__init__(prefetch=10)
 
         self.parent          = parent
         self.client_addrs    = client_addrs
         self.dest            = destination
 
+        self.magic_msg_priority  = magic_msg_priority
+        self.magic_addr_priority = magic_addr_priority
+
         self.error           = None
         self.sender          = None
         self.receiver        = None
-        self.test_timer      = None
-        self.fail_timer      = None
+        self.send_timer      = None
+        self.n_messages      = 100
         self.n_sent          = 0
-        self.n_accepted      = 0
-        self.n_released      = 0
         self.send_conn       = None
         self.recv_conn       = None
-        self.n_messages      = 100
         self.n_received      = 0
-        self.test_start_time = None
-        self.test_end_time   = None
-        self.timer_count     = 0
         self.reactor         = None
-        self.router_B_count  = 0
+        self.timer_count     = 0
+        self.sent_queries    = False
+        self.finishing       = False
+        self.goals           = 0
+        self.n_goals         = 2
+        self.connections     = list()
+        self.A_addr          = self.client_addrs[0]
+        self.B_addr          = self.client_addrs[1]
+        self.C_addr          = self.client_addrs[2]
+        self.routers = {
+                         'A' : dict(),
+                         'B' : dict()
+                       }
 
 
     # Shut down everything and exit.
     def bail ( self, text ):
+        self.send_timer.cancel ( )
+        self.finishing = True
         self.error = text
+        for conn in self.connections :
+            conn.close()
 
-        self.send_conn.close()
-        self.recv_conn.close ( )
-        self.test_timer.cancel ( )
-        self.fail_timer.cancel ( )
+
+    def make_connection ( self, event, addr ) :
+        cnx = event.container.connect ( addr )
+        self.connections.append ( cnx )
+        return cnx
+
+
+    def on_start ( self, event ):
+        self.reactor = event.reactor
+        self.send_conn  = self.make_connection ( event,  self.A_addr )
+        self.recv_conn  = self.make_connection ( event,  self.C_addr )
+
+        self.sender     = event.container.create_sender   ( self.send_conn, 
self.dest )
+        self.receiver   = event.container.create_receiver ( self.recv_conn, 
self.dest )
+        self.receiver.flow ( 100 )
+
+        self.routers['A'] ['mgmt_conn']     = self.make_connection ( event, 
self.A_addr )
+        self.routers['A'] ['mgmt_receiver'] = event.container.create_receiver 
( self.routers['A'] ['mgmt_conn'], dynamic=True )
+        self.routers['A'] ['mgmt_sender']   = event.container.create_sender   
( self.routers['A'] ['mgmt_conn'], "$management" )
+
+        self.routers['B'] ['mgmt_conn']     = self.make_connection ( event, 
self.B_addr )
+        self.routers['B'] ['mgmt_receiver'] = event.container.create_receiver 
( self.routers['B'] ['mgmt_conn'], dynamic=True )
+        self.routers['B'] ['mgmt_sender']   = event.container.create_sender   
( self.routers['B'] ['mgmt_conn'], "$management" )
+
+        self.send_timer = event.reactor.schedule ( 2, Timeout(self, "send") )
 
 
-    # There is no need in this test to send messages at all. Just the
-    # process of killing and replacing router B will kill router A if
-    # it is not cleaning up its sheafs of prioritized links properly.
-    #
-    # Each time this timer goes off we will kill router B and make a 
-    # replacement for it, then make sure that A has survived the process.
-    #
-    # To see A die in this test, comment out the call to qdr_reset_sheaf()
-    # in connections.c, rebuild, and run this test.
     def timeout ( self, name ):
-        if name == 'fail':
-            self.bail ( "Test hanging." )
-            return
-        if name == 'test':
-            self.timer_count += 1
-            if 0 == (self.timer_count % 3 ) :
-                self.parent.kill_router_B ( )
-                self.kill_B = 0
-                time.sleep(2)
-                # Make a new B router, and see if A survives.
-                A_status = self.parent.new_router_B ( )
-                if A_status != None :
-                    self.bail ( "Router A died when new router B was created." 
)
-                    return
-                if self.router_B_count >= 2 :
-                    # We have killed & restarted B 2 times. Good enough.
-                    self.bail ( None )
-                    return
-                self.router_B_count += 1
-            self.test_timer = self.reactor.schedule ( 1, Timeout(self, "test") 
)
+        if name == 'send':
+            self.send ( )
+            if not self.sent_queries :
+                self.test_timer = self.reactor.schedule ( 1, Timeout(self, 
"send") )
 
 
+    def on_link_opened ( self, event ) :
+        # A mgmt link has opened. Create its management helper.
+        # ( Now we know the address that the management helper should use as
+        # the "reply-to" in its management message. )
+        if event.receiver == self.routers['A'] ['mgmt_receiver'] :
+            event.receiver.flow ( 1000 )
+            self.routers['A'] ['mgmt_helper'] = ManagementMessageHelper ( 
event.receiver.remote_source.address )
 
-    def on_start ( self, event ):
-        self.reactor = event.reactor
-        self.test_timer = event.reactor.schedule ( 1,  Timeout(self, "test") )
-        self.fail_timer = event.reactor.schedule ( 30, Timeout(self, "fail") )
-        self.send_conn  = event.container.connect ( self.client_addrs[0] ) # A
-        self.recv_conn  = event.container.connect ( self.client_addrs[1] ) # B
+        elif event.receiver == self.routers['B'] ['mgmt_receiver'] :
+            event.receiver.flow ( 1000 )
+            self.routers['B'] ['mgmt_helper'] = ManagementMessageHelper ( 
event.receiver.remote_source.address )
+
+
+    def send ( self ) :
+        if self.sender.credit <= 0:
+            self.receiver.flow ( 100 )
+            return
+
+        # First send the payload messages.
+        if self.n_sent < self.n_messages :
+            for i in xrange(50) :
+                msg = Message ( body=self.n_sent )
+                msg.priority = 3
+                self.sender.send ( msg )
+                self.n_sent += 1
+        # Then send the management queries.
+        # But only send them once.
+        elif not self.sent_queries  :
+            # Query router A.
+            mgmt_helper = self.routers['A'] ['mgmt_helper']
+            mgmt_sender = self.routers['A'] ['mgmt_sender']
+            msg = mgmt_helper.make_router_link_query ( )
+            mgmt_sender.send ( msg )
+
+            # Query router B.
+            mgmt_helper = self.routers['B'] ['mgmt_helper']
+            mgmt_sender = self.routers['B'] ['mgmt_sender']
+            msg = mgmt_helper.make_router_link_query ( )
+            mgmt_sender.send ( msg )
+
+            self.sent_queries = True
+
+
+    # This test has two goals: get the response from router A
+    # and from router B. As they come in, we check them. If
+    # the response is unsatisfactory we bail out.
+    def goal_satisfied ( self ) :
+        self.goals += 1
+        if self.goals >= self.n_goals :
+            self.bail ( None )
+
+
+    def on_message ( self, event ) :
+
+        # Don't take any more messages if 'bail' has been called.
+        if self.finishing :
+            return
+
+        msg = event.message
+
+        if event.receiver == self.routers['A'] ['mgmt_receiver'] :
+            # Router A has only one set of outgoing links, and it
+            # has set a priority for our target address. We should
+            # see all the messages we sent go out with that priority.
+            magic = self.magic_addr_priority
+            if 'results' in msg.body :
+                results = msg.body['results']
+                # I do not want to trust the possibility that the
+                # results will be returned to me in priority-order.
+                # Instead, I explicitly asked for the link priority
+                # in the management query that was sent. Now I will
+                # loop through all the results, and look for the one
+                # with the desired priority.
+                for i in range(len(results)) :
+                    result = results[i]
+                    role          = result[0]
+                    dir           = result[1]
+                    message_count = result[2]
+                    priority      = result[3]
+                    if role == "inter-router" and dir == "out" and priority == 
magic  :
+                        if message_count >= self.n_messages :
+                            self.goal_satisfied ( )
+                            return
+                        else :
+                            self.bail ( "Router A priority %d had %d messages 
instead of %d." %
+                                        (magic, message_count, 
self.n_messages) )
+                            return
+
+        elif event.receiver == self.routers['B'] ['mgmt_receiver'] :
+            # Router B has two sets of outgoing links, and it has not
+            # set a priority for the target address. We should see all
+            # of our messages going out over the message-intrinsic
+            # priority that the sending client used -- on one of those
+            # two sets of outgoing links.
+            magic = self.magic_msg_priority
+            if 'results' in msg.body :
+                message_counts = list()
+                results = msg.body['results']
+                for i in range(len(results)) :
+                    result = results[i]
+                    role          = result[0]
+                    dir           = result[1]
+                    message_count = result[2]
+                    priority      = result[3]
+                    if role == "inter-router" and dir == "out" :
+                        if priority == magic :
+                            message_counts.append ( message_count )
+
+                if self.n_messages in message_counts :
+                    self.goal_satisfied ( )
+                else :
+                    self.bail ( "No outgoing link on router B had %d messages 
at priority 3" % self.n_messages )
+
+        else :
+            # This is a payload message -- not management. Just count it.
+            self.n_received += 1
 
 
     def run(self):
         Container(self).run()
 
 
+
+
 if __name__ == '__main__':
     unittest.main(main_module())


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to