This is an automated email from the ASF dual-hosted git repository.

tross pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git


The following commit(s) were added to refs/heads/master by this push:
     new b8613b8  DISPATCH-1224 - Completed endpoint-initiated waypoints.
b8613b8 is described below

commit b8613b8a64353ee021e218c8c487a9871517966c
Author: Ted Ross <tr...@redhat.com>
AuthorDate: Fri Dec 14 17:38:08 2018 -0500

    DISPATCH-1224 - Completed endpoint-initiated waypoints.
    
    DISPATCH-1224 - Added a waypoint and multi-phase test.
    
    DISPATCH-1224 - Replaced use of sprintf with explicit characters.
---
 include/qpid/dispatch/amqp.h                       |   9 +
 include/qpid/dispatch/router_core.h                |  11 +
 python/qpid_dispatch/management/qdrouter.json      |   7 +
 .../qpid_dispatch_internal/policy/policy_local.py  |   6 +-
 src/amqp.c                                         |  19 +-
 src/policy.c                                       |  41 ++
 src/policy.h                                       |   1 +
 src/router_core/connections.c                      |   2 +-
 .../modules/address_lookup_client/lookup_client.c  |   7 +-
 src/router_core/modules/edge_router/addr_proxy.c   |  53 ++-
 src/router_core/route_control.c                    |   2 +
 src/router_core/router_core.c                      |   4 +-
 src/router_core/router_core_private.h              |   1 +
 src/router_core/terminus.c                         |  20 +
 tests/CMakeLists.txt                               |   1 +
 tests/system_tests_multi_phase.py                  | 411 +++++++++++++++++++++
 16 files changed, 574 insertions(+), 21 deletions(-)

diff --git a/include/qpid/dispatch/amqp.h b/include/qpid/dispatch/amqp.h
index 35f7c67..9f6be37 100644
--- a/include/qpid/dispatch/amqp.h
+++ b/include/qpid/dispatch/amqp.h
@@ -127,7 +127,16 @@ extern const char * const QD_CAPABILITY_ANONYMOUS_RELAY;
 extern const char * const QD_CAPABILITY_ROUTER_CONTROL;
 extern const char * const QD_CAPABILITY_ROUTER_DATA;
 extern const char * const QD_CAPABILITY_EDGE_DOWNLINK;
+extern const char * const QD_CAPABILITY_WAYPOINT_DEFAULT;
 extern const char * const QD_CAPABILITY_WAYPOINT1;
+extern const char * const QD_CAPABILITY_WAYPOINT2;
+extern const char * const QD_CAPABILITY_WAYPOINT3;
+extern const char * const QD_CAPABILITY_WAYPOINT4;
+extern const char * const QD_CAPABILITY_WAYPOINT5;
+extern const char * const QD_CAPABILITY_WAYPOINT6;
+extern const char * const QD_CAPABILITY_WAYPOINT7;
+extern const char * const QD_CAPABILITY_WAYPOINT8;
+extern const char * const QD_CAPABILITY_WAYPOINT9;
 /// @}
 
 /** @name Dynamic Node Properties */
diff --git a/include/qpid/dispatch/router_core.h 
b/include/qpid/dispatch/router_core.h
index a03db27..136ad6a 100644
--- a/include/qpid/dispatch/router_core.h
+++ b/include/qpid/dispatch/router_core.h
@@ -316,6 +316,17 @@ void qdr_terminus_add_capability(qdr_terminus_t *term, 
const char *capability);
 bool qdr_terminus_has_capability(qdr_terminus_t *term, const char *capability);
 
 /**
+ * qdr_terminus_waypoint_capability
+ *
+ * If the terminus has a waypoint capability, return the ordinal of the
+ * waypoint.  If not, return zero.
+ *
+ * @param term A qdr_terminus pointer returned by qdr_terminus()
+ * @return 1..9 if the terminus has waypoint capability, 0 otherwise
+ */
+int qdr_terminus_waypoint_capability(qdr_terminus_t *term);
+
+/**
  * qdr_terminus_is_anonymous
  *
  * Indicate whether this terminus represents an anonymous endpoint.
diff --git a/python/qpid_dispatch/management/qdrouter.json 
b/python/qpid_dispatch/management/qdrouter.json
index 5408c9b..180a0e0 100644
--- a/python/qpid_dispatch/management/qdrouter.json
+++ b/python/qpid_dispatch/management/qdrouter.json
@@ -1868,6 +1868,13 @@
                     "required": false,
                     "create": true
                 },
+                "allowWaypointLinks": {
+                    "type": "boolean",
+                    "description": "Whether this connection is allowed to 
claim 'waypoint.N' capability for attached links.  This allows endpoints to act 
as waypoints without needing auto-links.",
+                    "default": true,
+                    "required": false,
+                    "create": true
+                },
                 "sources": {
                     "type": "string",
                     "description": "A list of source addresses from which 
users in this group may receive messages. To specify multiple addresses, 
separate the addresses with either a comma or a space. If you do not specify 
any addresses, users in this group are not allowed to receive messages from any 
addresses. You can use the substitution token '${user}' to specify an address 
that contains a user's authenticated user name. You can use an asterisk ('*') 
wildcard to match one or more ch [...]
diff --git a/python/qpid_dispatch_internal/policy/policy_local.py 
b/python/qpid_dispatch_internal/policy/policy_local.py
index 9a5a1d6..2f3cfb4 100644
--- a/python/qpid_dispatch_internal/policy/policy_local.py
+++ b/python/qpid_dispatch_internal/policy/policy_local.py
@@ -70,6 +70,7 @@ class PolicyKeys(object):
     KW_ALLOW_DYNAMIC_SRC        = "allowDynamicSource"
     KW_ALLOW_ANONYMOUS_SENDER   = "allowAnonymousSender"
     KW_ALLOW_USERID_PROXY       = "allowUserIdProxy"
+    KW_ALLOW_WAYPOINT_LINKS     = "allowWaypointLinks"
     KW_SOURCES                  = "sources"
     KW_TARGETS                  = "targets"
     KW_SOURCE_PATTERN           = "sourcePattern"
@@ -143,6 +144,7 @@ class PolicyCompiler(object):
         PolicyKeys.KW_ALLOW_DYNAMIC_SRC,
         PolicyKeys.KW_ALLOW_ANONYMOUS_SENDER,
         PolicyKeys.KW_ALLOW_USERID_PROXY,
+        PolicyKeys.KW_ALLOW_WAYPOINT_LINKS,
         PolicyKeys.KW_SOURCES,
         PolicyKeys.KW_TARGETS,
         PolicyKeys.KW_SOURCE_PATTERN,
@@ -243,6 +245,7 @@ class PolicyCompiler(object):
         policy_out[PolicyKeys.KW_ALLOW_DYNAMIC_SRC] = False
         policy_out[PolicyKeys.KW_ALLOW_ANONYMOUS_SENDER] = False
         policy_out[PolicyKeys.KW_ALLOW_USERID_PROXY] = False
+        policy_out[PolicyKeys.KW_ALLOW_WAYPOINT_LINKS] = True
         policy_out[PolicyKeys.KW_SOURCES] = ''
         policy_out[PolicyKeys.KW_TARGETS] = ''
         policy_out[PolicyKeys.KW_SOURCE_PATTERN] = ''
@@ -278,7 +281,8 @@ class PolicyCompiler(object):
                 policy_out[key] = val_out
             elif key in [PolicyKeys.KW_ALLOW_ANONYMOUS_SENDER,
                          PolicyKeys.KW_ALLOW_DYNAMIC_SRC,
-                         PolicyKeys.KW_ALLOW_USERID_PROXY
+                         PolicyKeys.KW_ALLOW_USERID_PROXY,
+                         PolicyKeys.KW_ALLOW_WAYPOINT_LINKS
                          ]:
                 if isinstance(val, (PY_STRING_TYPE, PY_TEXT_TYPE)) and 
val.lower() in ['true', 'false']:
                     val = True if val == 'true' else False
diff --git a/src/amqp.c b/src/amqp.c
index c1a0f11..9164d60 100644
--- a/src/amqp.c
+++ b/src/amqp.c
@@ -32,11 +32,20 @@ const int          QD_MA_MAX_KEY_LEN = 16;
 const int          QD_MA_N_KEYS      = 4;  // max number of router annotations 
to send/receive
 const int          QD_MA_FILTER_LEN  = 5;  // N tailing inbound entries to 
search for stripping
 
-const char * const QD_CAPABILITY_ROUTER_CONTROL  = "qd.router";
-const char * const QD_CAPABILITY_ROUTER_DATA     = "qd.router-data";
-const char * const QD_CAPABILITY_EDGE_DOWNLINK   = "qd.router-edge-downlink";
-const char * const QD_CAPABILITY_WAYPOINT1       = "qd.waypoint.1";
-const char * const QD_CAPABILITY_ANONYMOUS_RELAY = "ANONYMOUS-RELAY";
+const char * const QD_CAPABILITY_ROUTER_CONTROL   = "qd.router";
+const char * const QD_CAPABILITY_ROUTER_DATA      = "qd.router-data";
+const char * const QD_CAPABILITY_EDGE_DOWNLINK    = "qd.router-edge-downlink";
+const char * const QD_CAPABILITY_WAYPOINT_DEFAULT = "qd.waypoint";
+const char * const QD_CAPABILITY_WAYPOINT1        = "qd.waypoint.1";
+const char * const QD_CAPABILITY_WAYPOINT2        = "qd.waypoint.2";
+const char * const QD_CAPABILITY_WAYPOINT3        = "qd.waypoint.3";
+const char * const QD_CAPABILITY_WAYPOINT4        = "qd.waypoint.4";
+const char * const QD_CAPABILITY_WAYPOINT5        = "qd.waypoint.5";
+const char * const QD_CAPABILITY_WAYPOINT6        = "qd.waypoint.6";
+const char * const QD_CAPABILITY_WAYPOINT7        = "qd.waypoint.7";
+const char * const QD_CAPABILITY_WAYPOINT8        = "qd.waypoint.8";
+const char * const QD_CAPABILITY_WAYPOINT9        = "qd.waypoint.9";
+const char * const QD_CAPABILITY_ANONYMOUS_RELAY  = "ANONYMOUS-RELAY";
 
 const char * const QD_DYNAMIC_NODE_PROPERTY_ADDRESS = "x-opt-qd.address";
 
diff --git a/src/policy.c b/src/policy.c
index 636292e..910a93b 100644
--- a/src/policy.c
+++ b/src/policy.c
@@ -437,6 +437,7 @@ bool qd_policy_open_lookup_user(
                         settings->allowDynamicSource   = 
qd_entity_opt_bool((qd_entity_t*)upolicy, "allowDynamicSource", false);
                     }
                     settings->allowUserIdProxy     = 
qd_entity_opt_bool((qd_entity_t*)upolicy, "allowUserIdProxy", false);
+                    settings->allowWaypointLinks   = 
qd_entity_opt_bool((qd_entity_t*)upolicy, "allowWaypointLinks", true);
                     if (settings->sources == 0) { //don't override if 
configured by authz plugin
                         settings->sources              = 
qd_entity_get_string((qd_entity_t*)upolicy, "sources");
                     }
@@ -873,6 +874,24 @@ bool _qd_policy_approve_link_name_tree(const char 
*username, const char *allowed
 }
 
 
+/**
+ * qd_policy_terminus_is_waypoint
+ *
+ * Report whether the first capability on a Proton terminus is a symbol that
+ * begins with QD_CAPABILITY_WAYPOINT_DEFAULT.  Any symbol carrying that prefix
+ * counts, with or without an ordinal suffix.  Only the first capability entry
+ * is examined.
+ *
+ * @param term Proton terminus whose capabilities are inspected
+ * @return true iff the first capability is a symbol with the waypoint prefix
+ */
+static bool qd_policy_terminus_is_waypoint(pn_terminus_t *term)
+{
+    pn_data_t *capabilities = pn_terminus_capabilities(term);
+    if (!capabilities)
+        return false;
+
+    //
+    // Move the cursor onto the first capability entry.
+    //
+    pn_data_rewind(capabilities);
+    pn_data_next(capabilities);
+    if (pn_data_type(capabilities) != PN_SYMBOL)
+        return false;
+
+    const pn_bytes_t symbol     = pn_data_get_symbol(capabilities);
+    const size_t     prefix_len = strlen(QD_CAPABILITY_WAYPOINT_DEFAULT);
+
+    //
+    // Compare only prefix_len bytes, and only after confirming the symbol is
+    // at least that long.
+    //
+    return symbol.size >= prefix_len
+        && strncmp(symbol.start, QD_CAPABILITY_WAYPOINT_DEFAULT, prefix_len) == 0;
+}
+
+
 bool qd_policy_approve_amqp_sender_link(pn_link_t *pn_link, qd_connection_t 
*qd_conn)
 {
     const char *hostip = qd_connection_remote_ip(qd_conn);
@@ -897,6 +916,17 @@ bool qd_policy_approve_amqp_sender_link(pn_link_t 
*pn_link, qd_connection_t *qd_
     bool lookup;
     if (target && *target) {
         // a target is specified
+        if (!qd_conn->policy_settings->allowWaypointLinks) {
+            bool waypoint = 
qd_policy_terminus_is_waypoint(pn_link_remote_target(pn_link));
+            if (waypoint) {
+                
qd_log(qd_server_dispatch(qd_conn->server)->policy->log_source, QD_LOG_INFO,
+                       "[%"PRIu64"]: DENY AMQP Attach sender link '%s' for 
user '%s', rhost '%s', vhost '%s'.  Waypoint capability not permitted",
+                       qd_conn->connection_id, target, qd_conn->user_id, 
hostip, vhost);
+                _qd_policy_deny_amqp_sender_link(pn_link, qd_conn, 
QD_AMQP_COND_UNAUTHORIZED_ACCESS);
+                return false;
+            }
+        }
+
         lookup = qd_policy_approve_link_name(qd_conn->user_id, 
qd_conn->policy_settings, target, false);
 
         qd_log(qd_server_dispatch(qd_conn->server)->policy->log_source, 
(lookup ? QD_LOG_TRACE : QD_LOG_INFO),
@@ -959,6 +989,17 @@ bool qd_policy_approve_amqp_receiver_link(pn_link_t 
*pn_link, qd_connection_t *q
     const char * source = 
pn_terminus_get_address(pn_link_remote_source(pn_link));
     if (source && *source) {
         // a source is specified
+        if (!qd_conn->policy_settings->allowWaypointLinks) {
+            bool waypoint = 
qd_policy_terminus_is_waypoint(pn_link_remote_source(pn_link));
+            if (waypoint) {
+                
qd_log(qd_server_dispatch(qd_conn->server)->policy->log_source, QD_LOG_INFO,
+                       "[%"PRIu64"]: DENY AMQP Attach receiver link '%s' for 
user '%s', rhost '%s', vhost '%s'.  Waypoint capability not permitted",
+                       qd_conn->connection_id, source, qd_conn->user_id, 
hostip, vhost);
+                _qd_policy_deny_amqp_sender_link(pn_link, qd_conn, 
QD_AMQP_COND_UNAUTHORIZED_ACCESS);
+                return false;
+            }
+        }
+
         bool lookup = qd_policy_approve_link_name(qd_conn->user_id, 
qd_conn->policy_settings, source, true);
 
         qd_log(qd_server_dispatch(qd_conn->server)->policy->log_source, 
(lookup ? QD_LOG_TRACE : QD_LOG_INFO),
diff --git a/src/policy.h b/src/policy.h
index a1d415a..5f7af74 100644
--- a/src/policy.h
+++ b/src/policy.h
@@ -51,6 +51,7 @@ struct qd_policy__settings_s {
     bool allowDynamicSource;
     bool allowAnonymousSender;
     bool allowUserIdProxy;
+    bool allowWaypointLinks;
     char *sources;
     char *targets;
     char *sourcePattern;
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index a810a1e..d6ced98 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -387,7 +387,7 @@ qd_direction_t qdr_link_direction(const qdr_link_t *link)
 
 int qdr_link_phase(const qdr_link_t *link)
 {
-    return link && link->auto_link ? link->auto_link->phase : 0;
+    return link ? link->phase : 0;
 }
 
 
diff --git a/src/router_core/modules/address_lookup_client/lookup_client.c 
b/src/router_core/modules/address_lookup_client/lookup_client.c
index b457304..039387b 100644
--- a/src/router_core/modules/address_lookup_client/lookup_client.c
+++ b/src/router_core/modules/address_lookup_client/lookup_client.c
@@ -252,9 +252,10 @@ static qdr_address_t 
*qdr_lookup_terminus_address_CT(qdr_core_t       *core,
     //
     // If the terminus has a waypoint capability, override the configured 
phases and use the waypoint phases.
     //
-    if (qdr_terminus_has_capability(terminus, QD_CAPABILITY_WAYPOINT1)) {
-        in_phase  = 1;
-        out_phase = 0;
+    int waypoint_ordinal = qdr_terminus_waypoint_capability(terminus);
+    if (waypoint_ordinal > 0) {
+        in_phase  = waypoint_ordinal;
+        out_phase = waypoint_ordinal - 1;
     }
 
     qd_iterator_reset_view(iter, ITER_VIEW_ADDRESS_HASH);
diff --git a/src/router_core/modules/edge_router/addr_proxy.c 
b/src/router_core/modules/edge_router/addr_proxy.c
index a6c409e..c73dfc0 100644
--- a/src/router_core/modules/edge_router/addr_proxy.c
+++ b/src/router_core/modules/edge_router/addr_proxy.c
@@ -89,6 +89,41 @@ static qdr_terminus_t *qdr_terminus_normal(const char *addr)
 }
 
 
+/**
+ * set_waypoint_capability
+ *
+ * Add a "qd.waypoint.<ordinal>" capability to a terminus when the link being
+ * created for a multi-phase address must act as a waypoint.
+ *
+ * @param term       Terminus that receives the capability
+ * @param phase_char Phase digit taken from the address hash key (key[1])
+ * @param dir        Direction of the link being created
+ * @param in_phase   Configured ingress phase of the address
+ * @param out_phase  Configured egress phase of the address
+ */
+static void set_waypoint_capability(qdr_terminus_t *term, char phase_char, qd_direction_t dir, int in_phase, int out_phase)
+{
+    const int  phase    = (int) (phase_char - '0');
+    const bool outgoing = (dir == QD_OUTGOING);
+    const bool incoming = (dir == QD_INCOMING);
+
+    //
+    // Outgoing links on the in_phase and incoming links on the out_phase
+    // behave like normal client links: no waypoint capability is set.
+    //
+    if ((outgoing && phase == in_phase) || (incoming && phase == out_phase))
+        return;
+
+    //
+    // A phase outside in_phase..out_phase is a misconfiguration; do nothing.
+    //
+    if (phase < in_phase || phase > out_phase)
+        return;
+
+    //
+    // Every remaining link acts as a waypoint.  Outgoing links use the phase
+    // itself as the ordinal; all other links use phase + 1.
+    //
+    const int    ordinal  = outgoing ? phase : phase + 1;
+    const size_t base_len = strlen(QD_CAPABILITY_WAYPOINT_DEFAULT);
+    char         cap[16];
+
+    //
+    // Build "<default-capability>.<digit>" by hand (no printf-style formatting).
+    //
+    memcpy(cap, QD_CAPABILITY_WAYPOINT_DEFAULT, base_len);
+    cap[base_len]     = '.';
+    cap[base_len + 1] = '0' + ordinal;
+    cap[base_len + 2] = '\0';
+    qdr_terminus_add_capability(term, cap);
+}
+
+
 static void add_inlink(qcm_edge_addr_proxy_t *ap, const char *key, 
qdr_address_t *addr)
 {
     if (addr->edge_inlink == 0) {
@@ -96,13 +131,12 @@ static void add_inlink(qcm_edge_addr_proxy_t *ap, const 
char *key, qdr_address_t
 
         if (addr->config && addr->config->out_phase > 0) {
             //
-            // If this address is configured as multi-phase, check to see if 
it is
-            // an inlink on phase-0.  If so, tell the Interior peer that this 
is
-            // for a waypoint.
+            // If this address is configured as multi-phase, we may need to
+            // add waypoint capabilities to the terminus.
             //
             const char *key = (char*) qd_hash_key_by_handle(addr->hash_handle);
-            if (key[0] == QD_ITER_HASH_PREFIX_MOBILE && key[1] == '0')
-                qdr_terminus_add_capability(term, QD_CAPABILITY_WAYPOINT1);
+            if (key[0] == QD_ITER_HASH_PREFIX_MOBILE)
+                set_waypoint_capability(term, key[1], QD_INCOMING, 
addr->config->in_phase, addr->config->out_phase);
         }
 
         qdr_link_t *link = qdr_create_link_CT(ap->core, ap->edge_conn, 
QD_LINK_ENDPOINT, QD_INCOMING,
@@ -136,13 +170,12 @@ static void add_outlink(qcm_edge_addr_proxy_t *ap, const 
char *key, qdr_address_
 
         if (addr->config && addr->config->out_phase > 0) {
             //
-            // If this address is configured as multi-phase, check to see if 
it is
-            // an outlink on phase-1.  If so, tell the Interior peer that this 
is
-            // for a waypoint.
+            // If this address is configured as multi-phase, we may need to
+            // add waypoint capabilities to the terminus.
             //
             const char *key = (char*) qd_hash_key_by_handle(addr->hash_handle);
-            if (key[0] == QD_ITER_HASH_PREFIX_MOBILE && key[1] == '1')
-                qdr_terminus_add_capability(term, QD_CAPABILITY_WAYPOINT1);
+            if (key[0] == QD_ITER_HASH_PREFIX_MOBILE)
+                set_waypoint_capability(term, key[1], QD_OUTGOING, 
addr->config->in_phase, addr->config->out_phase);
         }
 
         qdr_link_t *link = qdr_create_link_CT(ap->core, ap->edge_conn, 
QD_LINK_ENDPOINT, QD_OUTGOING,
diff --git a/src/router_core/route_control.c b/src/router_core/route_control.c
index b843436..30cfb44 100644
--- a/src/router_core/route_control.c
+++ b/src/router_core/route_control.c
@@ -255,6 +255,7 @@ static void qdr_auto_link_activate_CT(qdr_core_t *core, 
qdr_auto_link_t *al, qdr
                 qdr_terminus_set_address(term, &key[2]); // truncate the "Mp" 
annotation (where p = phase)
             al->link = qdr_create_link_CT(core, conn, QD_LINK_ENDPOINT, 
al->dir, source, target);
             al->link->auto_link = al;
+            al->link->phase     = al->phase;
             al->state = QDR_AUTO_LINK_STATE_ATTACHING;
         }
         else {
@@ -288,6 +289,7 @@ static void qdr_auto_link_deactivate_CT(qdr_core_t *core, 
qdr_auto_link_t *al, q
     if (al->link) {
         qdr_link_outbound_detach_CT(core, al->link, 0, QDR_CONDITION_NONE, 
true);
         al->link->auto_link = 0;
+        al->link->phase     = 0;
         al->link            = 0;
     }
 
diff --git a/src/router_core/router_core.c b/src/router_core/router_core.c
index 3f21290..28649ad 100644
--- a/src/router_core/router_core.c
+++ b/src/router_core/router_core.c
@@ -453,12 +453,14 @@ void qdr_core_remove_address(qdr_core_t *core, 
qdr_address_t *addr)
 
 void qdr_core_bind_address_link_CT(qdr_core_t *core, qdr_address_t *addr, 
qdr_link_t *link)
 {
+    const char *key = (const char*) qd_hash_key_by_handle(addr->hash_handle);
     link->owning_addr = addr;
+    if (key && (*key == QD_ITER_HASH_PREFIX_MOBILE))
+        link->phase = (int) (key[1] - '0');
 
     if (link->link_direction == QD_OUTGOING) {
         qdr_add_link_ref(&addr->rlinks, link, QDR_LINK_LIST_CLASS_ADDRESS);
         if (DEQ_SIZE(addr->rlinks) == 1) {
-            const char *key = (const char*) 
qd_hash_key_by_handle(addr->hash_handle);
             if (key && (*key == QD_ITER_HASH_PREFIX_EDGE_SUMMARY || *key == 
QD_ITER_HASH_PREFIX_MOBILE))
                 qdr_post_mobile_added_CT(core, key, addr->treatment);
             qdr_addr_start_inlinks_CT(core, addr);
diff --git a/src/router_core/router_core_private.h 
b/src/router_core/router_core_private.h
index 289614a..a2ebc8e 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -422,6 +422,7 @@ struct qdr_link_t {
     int                      attach_count;       ///< 1 or 2 depending on the 
state of the lifecycle
     int                      detach_count;       ///< 0, 1, or 2 depending on 
the state of the lifecycle
     qdr_address_t           *owning_addr;        ///< [ref] Address record 
that owns this link
+    int                      phase;
     qdr_link_t              *connected_link;     ///< [ref] If this is a 
link-route, reference the connected link
     qdrc_endpoint_t         *core_endpoint;      ///< [ref] Set if this link 
terminates on an in-core endpoint
     qdr_link_ref_t          *ref[QDR_LINK_LIST_CLASSES];  ///< Pointers to 
containing reference objects
diff --git a/src/router_core/terminus.c b/src/router_core/terminus.c
index 1486148..58ec9e5 100644
--- a/src/router_core/terminus.c
+++ b/src/router_core/terminus.c
@@ -128,6 +128,26 @@ bool qdr_terminus_has_capability(qdr_terminus_t *term, 
const char *capability)
 }
 
 
+/**
+ * qdr_terminus_waypoint_capability
+ *
+ * Inspect the first capability of a terminus and return the waypoint ordinal
+ * it advertises.  The bare QD_CAPABILITY_WAYPOINT_DEFAULT symbol maps to
+ * ordinal 1; "<default>.N" with N in 1..9 maps to N.  Only the first
+ * capability entry is examined.
+ *
+ * @param term Terminus to inspect (may be 0)
+ * @return 1..9 if the terminus has a waypoint capability, 0 otherwise
+ */
+int qdr_terminus_waypoint_capability(qdr_terminus_t *term)
+{
+    //
+    // Validate the pointers before handing them to the Proton data API; the
+    // cursor calls below dereference the capabilities object.
+    //
+    if (term == 0 || term->capabilities == 0)
+        return 0;
+
+    pn_data_t *cap = term->capabilities;
+    pn_data_rewind(cap);
+    pn_data_next(cap);
+    if (pn_data_type(cap) != PN_SYMBOL)
+        return 0;
+
+    pn_bytes_t   sym = pn_data_get_symbol(cap);
+    const size_t len = strlen(QD_CAPABILITY_WAYPOINT_DEFAULT);
+    if (sym.size < len || strncmp(sym.start, QD_CAPABILITY_WAYPOINT_DEFAULT, len) != 0)
+        return 0;
+
+    if (sym.size == len)
+        return 1;    // This is the default ordinal
+
+    //
+    // Accept exactly ".<digit>" after the prefix, with the digit in 1..9.
+    // The separator is checked so that e.g. "<prefix>X3" is not a waypoint.
+    //
+    if (sym.size == len + 2 && sym.start[len] == '.' && sym.start[len + 1] > '0' && sym.start[len + 1] <= '9')
+        return (int) (sym.start[len + 1] - '0');
+
+    return 0;
+}
+
+
 bool qdr_terminus_is_anonymous(qdr_terminus_t *term)
 {
     return term == 0 || (term->address == 0 && !term->dynamic);
diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt
index 9c68325..6eb0547 100644
--- a/tests/CMakeLists.txt
+++ b/tests/CMakeLists.txt
@@ -126,6 +126,7 @@ foreach(py_test_module
     system_tests_priority
     system_tests_core_client
     system_tests_address_lookup
+    system_tests_multi_phase
     )
 
   add_test(${py_test_module} ${TEST_WRAP} -x unit2 -v ${py_test_module})
diff --git a/tests/system_tests_multi_phase.py 
b/tests/system_tests_multi_phase.py
new file mode 100644
index 0000000..27043d0
--- /dev/null
+++ b/tests/system_tests_multi_phase.py
@@ -0,0 +1,411 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from __future__ import unicode_literals
+from __future__ import division
+from __future__ import absolute_import
+from __future__ import print_function
+
+from time import sleep
+from threading import Event
+from threading import Timer
+
+import unittest2 as unittest
+from proton import Message, Timeout, symbol
+from system_test import TestCase, Qdrouterd, main_module, TIMEOUT, MgmtMsgProxy
+from system_test import AsyncTestReceiver
+from system_test import AsyncTestSender
+from system_test import QdManager
+from system_tests_link_routes import ConnLinkRouteService
+from proton.handlers import MessagingHandler
+from proton.reactor import Container, DynamicNodeProperties
+from proton.utils import BlockingConnection
+from qpid_dispatch.management.client import Node
+from subprocess import PIPE, STDOUT
+import re
+
+
+class AddrTimer(object):
+    """Timer handler that forwards each tick to its parent's check_address()."""
+
+    def __init__(self, parent):
+        # Object whose check_address() method is called from on_timer_task().
+        self.parent = parent
+
+    def on_timer_task(self, event):
+        """Timer callback; ``event`` is ignored and the parent is notified.
+
+        Presumably invoked by the Proton reactor when a scheduled timer
+        expires -- the scheduling call is not visible in this file section.
+        """
+        self.parent.check_address()
+
+
+class RouterTest(TestCase):
+
+    inter_router_port = None
+
+    @classmethod
+    def setUpClass(cls):
+        """Start a router"""
+        super(RouterTest, cls).setUpClass()
+
+        def router(name, mode, connection, extra=None):
+            config = [
+                ('router', {'mode': mode, 'id': name}),
+                ('listener', {'port': cls.tester.get_port(), 
'stripAnnotations': 'no'}),
+                ('address', {'prefix': 'queue', 'waypoint': 'yes'}),
+                ('address', {'prefix': 'multi', 'ingressPhase': '0', 
'egressPhase': '9'}),
+                connection
+            ]
+
+            if extra:
+                config.append(extra)
+            config = Qdrouterd.Config(config)
+            cls.routers.append(cls.tester.qdrouterd(name, config, wait=True))
+
+        cls.routers = []
+
+        inter_router_port = cls.tester.get_port()
+        edge_port_A       = cls.tester.get_port()
+        edge_port_B       = cls.tester.get_port()
+
+        router('INT.A', 'interior', ('listener', {'role': 'inter-router', 
'port': inter_router_port}),
+               ('listener', {'role': 'edge', 'port': edge_port_A}))
+        router('INT.B', 'interior', ('connector', {'name': 'connectorToA', 
'role': 'inter-router', 'port': inter_router_port}),
+               ('listener', {'role': 'edge', 'port': edge_port_B}))
+        router('EA1',   'edge',     ('connector', {'name': 'edge', 'role': 
'edge', 'port': edge_port_A}))
+        router('EA2',   'edge',     ('connector', {'name': 'edge', 'role': 
'edge', 'port': edge_port_A}))
+        router('EB1',   'edge',     ('connector', {'name': 'edge', 'role': 
'edge', 'port': edge_port_B}))
+        router('EB2',   'edge',     ('connector', {'name': 'edge', 'role': 
'edge', 'port': edge_port_B}))
+
+        cls.routers[0].wait_router_connected('INT.B')
+        cls.routers[1].wait_router_connected('INT.A')
+
+
+    def test_01_waypoint_same_interior(self):
+        test = WaypointTest(self.routers[0].addresses[0],
+                            self.routers[0].addresses[0],
+                            self.routers[0].addresses[0],
+                            'queue.01')
+        test.run()
+        self.assertEqual(None, test.error)
+
+    def test_02_waypoint_same_edge(self):
+        test = WaypointTest(self.routers[2].addresses[0],
+                            self.routers[2].addresses[0],
+                            self.routers[2].addresses[0],
+                            'queue.02')
+        test.run()
+        self.assertEqual(None, test.error)
+
+    def test_03_waypoint_edge_interior(self):
+        test = WaypointTest(self.routers[2].addresses[0],
+                            self.routers[2].addresses[0],
+                            self.routers[0].addresses[0],
+                            'queue.03')
+        test.run()
+        self.assertEqual(None, test.error)
+
+    def test_04_waypoint_interior_edge(self):
+        test = WaypointTest(self.routers[0].addresses[0],
+                            self.routers[0].addresses[0],
+                            self.routers[2].addresses[0],
+                            'queue.04')
+        test.run()
+        self.assertEqual(None, test.error)
+
+    def test_05_waypoint_interior_interior(self):
+        test = WaypointTest(self.routers[0].addresses[0],
+                            self.routers[0].addresses[0],
+                            self.routers[1].addresses[0],
+                            'queue.05')
+        test.run()
+        self.assertEqual(None, test.error)
+
+    def test_06_waypoint_edge_edge(self):
+        test = WaypointTest(self.routers[2].addresses[0],
+                            self.routers[5].addresses[0],
+                            self.routers[0].addresses[0],
+                            'queue.06')
+        test.run()
+        self.assertEqual(None, test.error)
+
+    def test_07_waypoint_edge_endpoints_int_1(self):
+        test = WaypointTest(self.routers[0].addresses[0],
+                            self.routers[1].addresses[0],
+                            self.routers[2].addresses[0],
+                            'queue.07')
+        test.run()
+        self.assertEqual(None, test.error)
+
+    def test_08_waypoint_edge_endpoints_int_2(self):
+        test = WaypointTest(self.routers[0].addresses[0],
+                            self.routers[1].addresses[0],
+                            self.routers[5].addresses[0],
+                            'queue.08')
+        test.run()
+        self.assertEqual(None, test.error)
+
+    def test_09_waypoint_int_endpoints_edge_1(self):
+        test = WaypointTest(self.routers[2].addresses[0],
+                            self.routers[5].addresses[0],
+                            self.routers[0].addresses[0],
+                            'queue.09')
+        test.run()
+        self.assertEqual(None, test.error)
+
+    def test_10_waypoint_int_endpoints_edge_2(self):
+        test = WaypointTest(self.routers[2].addresses[0],
+                            self.routers[5].addresses[0],
+                            self.routers[1].addresses[0],
+                            'queue.10')
+        test.run()
+        self.assertEqual(None, test.error)
+
+    def test_11_waypoint_int_endpoints_int_1(self):
+        test = WaypointTest(self.routers[0].addresses[0],
+                            self.routers[1].addresses[0],
+                            self.routers[0].addresses[0],
+                            'queue.11')
+        test.run()
+        self.assertEqual(None, test.error)
+
+    def test_12_waypoint_int_endpoints_int_2(self):
+        test = WaypointTest(self.routers[0].addresses[0],
+                            self.routers[1].addresses[0],
+                            self.routers[1].addresses[0],
+                            'queue.12')
+        test.run()
+        self.assertEqual(None, test.error)
+
+    def test_13_waypoint_edge_endpoints_edge_1(self):
+        test = WaypointTest(self.routers[2].addresses[0],
+                            self.routers[5].addresses[0],
+                            self.routers[3].addresses[0],
+                            'queue.13')
+        test.run()
+        self.assertEqual(None, test.error)
+
+    def test_14_waypoint_edge_endpoints_edge_2(self):
+        test = WaypointTest(self.routers[2].addresses[0],
+                            self.routers[5].addresses[0],
+                            self.routers[4].addresses[0],
+                            'queue.14')
+        test.run()
+        self.assertEqual(None, test.error)
+
+    def test_15_multiphase_1(self):
+        test = MultiPhaseTest(self.routers[2].addresses[0],
+                              self.routers[5].addresses[0],
+                              [
+                                  self.routers[0].addresses[0],
+                                  self.routers[1].addresses[0],
+                                  self.routers[2].addresses[0],
+                                  self.routers[3].addresses[0],
+                                  self.routers[4].addresses[0],
+                                  self.routers[5].addresses[0],
+                                  self.routers[0].addresses[0],
+                                  self.routers[1].addresses[0],
+                                  self.routers[2].addresses[0]
+                              ],
+                              'multi.15')
+        test.run()
+        self.assertEqual(None, test.error)
+
+    def test_16_multiphase_2(self):
+        """
+        Run a MultiPhaseTest on 'multi.16': sender on routers[2],
+        receiver on routers[5], and nine waypoints attached, in order,
+        to routers 5, 3, 1, 4, 2, 0, 5, 3, 1.  Passes when the test
+        finishes with no error recorded.
+        """
+        tx_host = self.routers[2].addresses[0]
+        rx_host = self.routers[5].addresses[0]
+        # One entry per waypoint; list position fixes the waypoint order.
+        wp_hosts = [self.routers[i].addresses[0]
+                    for i in (5, 3, 1, 4, 2, 0, 5, 3, 1)]
+        scenario = MultiPhaseTest(tx_host, rx_host, wp_hosts, 'multi.16')
+        scenario.run()
+        self.assertEqual(None, scenario.error)
+
+    def test_17_multiphase_3(self):
+        """
+        Run a MultiPhaseTest on 'multi.17': sender on routers[1],
+        receiver on routers[0], and nine waypoints attached, in order,
+        to routers 0, 1, 2, 3, 4, 5, 0, 1, 2.  Passes when the test
+        finishes with no error recorded.
+        """
+        tx_host = self.routers[1].addresses[0]
+        rx_host = self.routers[0].addresses[0]
+        # One entry per waypoint; list position fixes the waypoint order.
+        wp_hosts = [self.routers[i].addresses[0]
+                    for i in (0, 1, 2, 3, 4, 5, 0, 1, 2)]
+        scenario = MultiPhaseTest(tx_host, rx_host, wp_hosts, 'multi.17')
+        scenario.run()
+        self.assertEqual(None, scenario.error)
+
+
+
+class Timeout(object):
+    """
+    Timer handler that forwards expiry to a test object.
+
+    `parent` is any object exposing a timeout() method; it is stored on
+    the instance and its timeout() is invoked from on_timer_task().
+    """
+    def __init__(self, parent):
+        self.parent = parent
+
+    def on_timer_task(self, event):
+        # The event itself is not inspected; only the expiry matters.
+        owner = self.parent
+        owner.timeout()
+
+
+class WaypointTest(MessagingHandler):
+    """
+    Verify that messages sent to an address pass through a single
+    waypoint before they reach the receiver.
+
+    A sender and a receiver are attached to `addr` on their own
+    connections.  A third connection carries a sender/receiver pair on
+    the same address, each advertising the "qd.waypoint" capability;
+    every message arriving on the waypoint receiver is re-sent on the
+    waypoint sender.
+
+    The test completes with `error` left as None once both the receiver
+    and the waypoint have seen `count` messages.  If the timer scheduled
+    in on_start() fires first, `error` holds a timeout description.
+
+    Parameters:
+        sender_host   - address to connect the sender to
+        receiver_host - address to connect the receiver to
+        waypoint_host - address to connect the waypoint links to
+        addr          - AMQP address used by all four links
+    """
+    def __init__(self, sender_host, receiver_host, waypoint_host, addr):
+        super(WaypointTest, self).__init__()
+        self.sender_host   = sender_host
+        self.receiver_host = receiver_host
+        self.waypoint_host = waypoint_host
+        self.addr          = addr
+        self.count         = 300
+
+        self.sender_conn   = None
+        self.receiver_conn = None
+        self.waypoint_conn = None
+        self.timer         = None
+        self.error         = None
+        self.n_tx          = 0   # messages sent by the sender
+        self.n_rx          = 0   # messages seen by the receiver
+        self.n_thru        = 0   # messages relayed by the waypoint
+
+    def _close_connections(self):
+        """Close every connection that has been opened so far."""
+        for conn in (self.sender_conn, self.receiver_conn,
+                     self.waypoint_conn):
+            if conn is not None:
+                conn.close()
+
+    def timeout(self):
+        """Record a timeout error and shut the connections down."""
+        fmt = "Timeout Expired - n_tx=%d, n_rx=%d, n_thru=%d"
+        self.error = fmt % (self.n_tx, self.n_rx, self.n_thru)
+        self._close_connections()
+
+    def fail(self, error):
+        """
+        Finish the test, storing `error` as the result.
+
+        Also used for normal completion, in which case `error` is None.
+        """
+        self.error = error
+        self._close_connections()
+        if self.timer is not None:
+            self.timer.cancel()
+
+    def on_start(self, event):
+        """Arm the timer, open the connections and attach the links."""
+        container = event.container
+        self.timer = event.reactor.schedule(10.0, Timeout(self))
+        self.sender_conn = container.connect(self.sender_host)
+        self.receiver_conn = container.connect(self.receiver_host)
+        self.waypoint_conn = container.connect(self.waypoint_host)
+        self.sender = container.create_sender(self.sender_conn,
+                                              self.addr)
+        self.receiver = container.create_receiver(self.receiver_conn,
+                                                  self.addr)
+        self.wp_sender = container.create_sender(self.waypoint_conn,
+                                                 self.addr)
+        self.wp_receiver = container.create_receiver(self.waypoint_conn,
+                                                     self.addr)
+        # Mark both waypoint links with the waypoint capability.
+        waypoint = symbol("qd.waypoint")
+        self.wp_sender.target.capabilities.put_object(waypoint)
+        self.wp_receiver.source.capabilities.put_object(waypoint)
+
+    def on_sendable(self, event):
+        """Send as many of the `count` messages as credit allows."""
+        if event.sender == self.sender:
+            while self.sender.credit > 0 and self.n_tx < self.count:
+                self.sender.send(Message("Message %d" % self.n_tx))
+                self.n_tx += 1
+
+    def on_message(self, event):
+        """Count deliveries; relay those that arrive at the waypoint."""
+        if event.receiver == self.receiver:
+            self.n_rx += 1
+            if self.n_rx == self.count and self.n_thru == self.count:
+                # Normal completion: fail(None) leaves error unset.
+                self.fail(None)
+        elif event.receiver == self.wp_receiver:
+            self.n_thru += 1
+            self.wp_sender.send(Message(event.message.body))
+
+    def run(self):
+        """Run this handler in a Container until it finishes."""
+        Container(self).run()
+
+
+class MultiPhaseTest(MessagingHandler):
+    """
+    Verify that messages sent to an address pass through a chain of
+    waypoints before they reach the receiver.
+
+    A sender and a receiver are attached to `addr` on their own
+    connections.  For each entry in `waypoint_hosts` a connection is
+    opened carrying a sender/receiver pair on the same address, each
+    advertising the capability "qd.waypoint.<ordinal>", where the
+    ordinal is the 1-based position of the host in the list.  Every
+    message arriving on a waypoint receiver is re-sent on the sender of
+    the same waypoint.
+
+    The test completes with `error` left as None once the receiver has
+    seen `count` messages and every waypoint has relayed `count`
+    messages.  If the timer scheduled in on_start() fires first,
+    `error` holds a timeout description.
+
+    Parameters:
+        sender_host    - address to connect the sender to
+        receiver_host  - address to connect the receiver to
+        waypoint_hosts - addresses to connect the waypoints to, in
+                         ordinal order
+        addr           - AMQP address used by all of the links
+
+    NOTE(review): the tests in this file use nine waypoints; whether the
+    router accepts ordinals above 9 is not visible here - confirm before
+    passing a longer list.
+    """
+    def __init__(self, sender_host, receiver_host, waypoint_hosts, addr):
+        super(MultiPhaseTest, self).__init__()
+        self.sender_host    = sender_host
+        self.receiver_host  = receiver_host
+        self.waypoint_hosts = list(waypoint_hosts)
+        self.addr           = addr
+        self.count          = 300
+
+        self.sender_conn    = None
+        self.receiver_conn  = None
+        self.waypoint_conns = []
+        self.wp_senders     = []
+        self.wp_receivers   = []
+        self.timer          = None
+        self.error          = None
+        self.n_tx           = 0   # messages sent by the sender
+        self.n_rx           = 0   # messages seen by the receiver
+        # One relay counter per waypoint, sized from the host list
+        # rather than assuming exactly nine waypoints.
+        self.n_thru         = [0] * len(self.waypoint_hosts)
+
+    def _close_connections(self):
+        """Close every connection that has been opened so far."""
+        for conn in (self.sender_conn, self.receiver_conn):
+            if conn is not None:
+                conn.close()
+        for conn in self.waypoint_conns:
+            conn.close()
+
+    def timeout(self):
+        """Record a timeout error and shut the connections down."""
+        fmt = "Timeout Expired - n_tx=%d, n_rx=%d, n_thru=%r"
+        self.error = fmt % (self.n_tx, self.n_rx, self.n_thru)
+        self._close_connections()
+
+    def fail(self, error):
+        """
+        Finish the test, storing `error` as the result.
+
+        Also used for normal completion, in which case `error` is None.
+        """
+        self.error = error
+        self._close_connections()
+        if self.timer is not None:
+            self.timer.cancel()
+
+    def on_start(self, event):
+        """Arm the timer, open the connections and attach the links."""
+        container = event.container
+        self.timer = event.reactor.schedule(10.0, Timeout(self))
+        self.sender_conn = container.connect(self.sender_host)
+        self.receiver_conn = container.connect(self.receiver_host)
+        self.sender = container.create_sender(self.sender_conn,
+                                              self.addr)
+        self.receiver = container.create_receiver(self.receiver_conn,
+                                                  self.addr)
+        for host in self.waypoint_hosts:
+            self.waypoint_conns.append(container.connect(host))
+
+        # Ordinals are 1-based and follow the order of waypoint_hosts.
+        for ordinal, conn in enumerate(self.waypoint_conns, 1):
+            sender   = container.create_sender(conn, self.addr)
+            receiver = container.create_receiver(conn, self.addr)
+
+            capability = symbol("qd.waypoint.%d" % ordinal)
+            sender.target.capabilities.put_object(capability)
+            receiver.source.capabilities.put_object(capability)
+
+            self.wp_senders.append(sender)
+            self.wp_receivers.append(receiver)
+
+    def on_sendable(self, event):
+        """Send as many of the `count` messages as credit allows."""
+        if event.sender == self.sender:
+            while self.sender.credit > 0 and self.n_tx < self.count:
+                self.sender.send(Message("Message %d" % self.n_tx))
+                self.n_tx += 1
+
+    def on_message(self, event):
+        """Count deliveries; relay those that arrive at a waypoint."""
+        if event.receiver == self.receiver:
+            self.n_rx += 1
+            if self.n_rx == self.count:
+                if all(n == self.count for n in self.n_thru):
+                    # Normal completion: fail(None) leaves error unset.
+                    self.fail(None)
+                else:
+                    # Every message arrived, yet some waypoint relayed
+                    # fewer than `count` of them.
+                    self.fail("Waypoint bypassed - n_rx=%d, n_thru=%r"
+                              % (self.n_rx, self.n_thru))
+            return
+
+        for idx, receiver in enumerate(self.wp_receivers):
+            if event.receiver == receiver:
+                self.n_thru[idx] += 1
+                self.wp_senders[idx].send(Message(event.message.body))
+                return
+
+    def run(self):
+        """Run this handler in a Container until it finishes."""
+        Container(self).run()
+
+
+# Run the tests in this module when the file is executed as a script.
+if __name__ == '__main__':
+    unittest.main(main_module())


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to