Repository: qpid-dispatch
Updated Branches:
  refs/heads/master c80efc25d -> 04419edae


DISPATCH-1156 - Added detection and prevention of edge-echo in the delivery 
forwarders.


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/04419eda
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/04419eda
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/04419eda

Branch: refs/heads/master
Commit: 04419edae53a8548e7fd8afdd47d54872bf6492a
Parents: c80efc2
Author: Ted Ross <tr...@redhat.com>
Authored: Wed Oct 24 17:38:45 2018 -0400
Committer: Ted Ross <tr...@redhat.com>
Committed: Wed Oct 24 17:38:45 2018 -0400

----------------------------------------------------------------------
 src/router_core/connections.c         | 10 +++++
 src/router_core/forwarder.c           | 68 +++++++++++++++++++++---------
 src/router_core/router_core_private.h |  3 +-
 src/router_core/transfer.c            |  6 ++-
 4 files changed, 65 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/04419eda/src/router_core/connections.c
----------------------------------------------------------------------
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index 09f3878..bbd7a27 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -1524,6 +1524,11 @@ static void qdr_link_inbound_first_attach_CT(qdr_core_t 
*core, qdr_action_t *act
     qdr_add_link_ref(&conn->links, link, QDR_LINK_LIST_CLASS_CONNECTION);
 
     //
+    // Mark the link as an edge link if it's inside an edge-uplink connection.
+    //
+    link->edge = (conn->role == QDR_ROLE_EDGE_UPLINK);
+
+    //
     // Reject any attaches of inter-router links that arrive on connections 
that are not inter-router.
     //
     if (((link->link_type == QD_LINK_CONTROL || link->link_type == 
QD_LINK_ROUTER) &&
@@ -1755,6 +1760,11 @@ static void qdr_link_inbound_second_attach_CT(qdr_core_t 
*core, qdr_action_t *ac
 
     link->oper_status = QDR_LINK_OPER_UP;
 
+    //
+    // Mark the link as an edge link if it's inside an edge-uplink connection.
+    //
+    link->edge = (conn->role == QDR_ROLE_EDGE_UPLINK);
+
     if (link->core_endpoint) {
         qdrc_endpoint_do_second_attach_CT(core, link->core_endpoint, source, 
target);
         return;

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/04419eda/src/router_core/forwarder.c
----------------------------------------------------------------------
diff --git a/src/router_core/forwarder.c b/src/router_core/forwarder.c
index e1d6d57..ca3f5df 100644
--- a/src/router_core/forwarder.c
+++ b/src/router_core/forwarder.c
@@ -290,6 +290,15 @@ static uint8_t qdr_forward_effective_priority(qd_message_t 
*msg, qdr_address_t *
 }
 
 
+/**
+ * Determine if forwarding a delivery onto a link will result in edge-echo.
+ */
+static inline bool qdr_forward_edge_echo_CT(qdr_delivery_t *in_dlv, qdr_link_t 
*out_link)
+{
+    return (in_dlv && in_dlv->via_edge && in_dlv->link->conn == 
out_link->conn);
+}
+
+
 int qdr_forward_multicast_CT(qdr_core_t      *core,
                              qdr_address_t   *addr,
                              qd_message_t    *msg,
@@ -323,21 +332,28 @@ int qdr_forward_multicast_CT(qdr_core_t      *core,
     if (!addr->local || exclude_inprocess) {
         qdr_link_ref_t *link_ref = DEQ_HEAD(addr->rlinks);
         while (link_ref) {
-            qdr_link_t     *out_link     = link_ref->link;
-            qdr_delivery_t *out_delivery = qdr_forward_new_delivery_CT(core, 
in_delivery, out_link, msg);
+            qdr_link_t *out_link = link_ref->link;
 
-            // Store the out_link and out_delivery so we can forward the 
delivery later on
-            qdr_forward_deliver_info_t *deliver_info = 
new_qdr_forward_deliver_info_t();
-            ZERO(deliver_info);
-            deliver_info->out_dlv = out_delivery;
-            deliver_info->out_link = out_link;
-            DEQ_INSERT_TAIL(deliver_info_list, deliver_info);
+            //
+            // Only forward via links that don't result in edge-echo.
+            //
+            if (!qdr_forward_edge_echo_CT(in_delivery, out_link)) {
+                qdr_delivery_t *out_delivery = 
qdr_forward_new_delivery_CT(core, in_delivery, out_link, msg);
 
-            fanout++;
-            if (out_link->link_type != QD_LINK_CONTROL && out_link->link_type 
!= QD_LINK_ROUTER) {
-                addr->deliveries_egress++;
-                core->deliveries_egress++;
+                // Store the out_link and out_delivery so we can forward the 
delivery later on
+                qdr_forward_deliver_info_t *deliver_info = 
new_qdr_forward_deliver_info_t();
+                ZERO(deliver_info);
+                deliver_info->out_dlv = out_delivery;
+                deliver_info->out_link = out_link;
+                DEQ_INSERT_TAIL(deliver_info_list, deliver_info);
+
+                fanout++;
+                if (out_link->link_type != QD_LINK_CONTROL && 
out_link->link_type != QD_LINK_ROUTER) {
+                    addr->deliveries_egress++;
+                    core->deliveries_egress++;
+                }
             }
+
             link_ref = DEQ_NEXT(link_ref);
         }
     }
@@ -543,6 +559,13 @@ int qdr_forward_closest_CT(qdr_core_t      *core,
     // Forward to a local subscriber.
     //
     qdr_link_ref_t *link_ref = DEQ_HEAD(addr->rlinks);
+
+    //
+    // If this link results in edge-echo, skip to the next link in the list.
+    //
+    while (link_ref && qdr_forward_edge_echo_CT(in_delivery, link_ref->link))
+        link_ref = DEQ_NEXT(link_ref);
+
     if (link_ref) {
         out_link     = link_ref->link;
         out_delivery = qdr_forward_new_delivery_CT(core, in_delivery, 
out_link, msg);
@@ -658,15 +681,20 @@ int qdr_forward_balanced_CT(qdr_core_t      *core,
         bool        eligible = link->capacity > value;
 
         //
-        // If this is the best eligible link so far, record the fact.
-        // Otherwise, if this is the best ineligible link, make note of that.
+        // Only consider links that do not result in edge-echo.
         //
-        if (eligible && eligible_link_value > value) {
-            best_eligible_link  = link;
-            eligible_link_value = value;
-        } else if (!eligible && ineligible_link_value > value) {
-            best_ineligible_link  = link;
-            ineligible_link_value = value;
+        if (!qdr_forward_edge_echo_CT(in_delivery, link)) {
+            //
+            // If this is the best eligible link so far, record the fact.
+            // Otherwise, if this is the best ineligible link, make note of 
that.
+            //
+            if (eligible && eligible_link_value > value) {
+                best_eligible_link  = link;
+                eligible_link_value = value;
+            } else if (!eligible && ineligible_link_value > value) {
+                best_ineligible_link  = link;
+                ineligible_link_value = value;
+            }
         }
 
         link_ref = DEQ_NEXT(link_ref);

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/04419eda/src/router_core/router_core_private.h
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_private.h 
b/src/router_core/router_core_private.h
index b5e354a..352f568 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -370,7 +370,7 @@ struct qdr_delivery_t {
     qdr_subscription_list_t subscriptions;
     qdr_delivery_ref_list_t peers;             /// Use this list if there if 
the delivery has more than one peer.
     bool                    multicast;         /// True if this delivery is 
targeted for a multicast address.
-
+    bool                    via_edge;          /// True if this delivery 
arrived via an edge-uplink connection.
 };
 
 ALLOC_DECLARE(qdr_delivery_t);
@@ -426,6 +426,7 @@ struct qdr_link_t {
     bool                     drain_mode;
     bool                     stalled_outbound;  ///< Indicates that this link 
is stalled on outbound buffer backpressure
     bool                     detach_received;
+    bool                     edge;              ///< True if this link is in 
an edge-uplink connection
     char                    *strip_prefix;
     char                    *insert_prefix;
     bool                     terminus_survives_disconnect;

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/04419eda/src/router_core/transfer.c
----------------------------------------------------------------------
diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c
index 7b7d950..30e83a3 100644
--- a/src/router_core/transfer.c
+++ b/src/router_core/transfer.c
@@ -986,9 +986,13 @@ static void qdr_link_deliver_CT(qdr_core_t *core, 
qdr_action_t *action, bool dis
         return;
 
     qdr_delivery_t *dlv  = action->args.connection.delivery;
+    bool            more = action->args.connection.more;
     qdr_link_t     *link = dlv->link;
 
-    bool more = action->args.connection.more;
+    //
+    // If the link is an edge link, mark this delivery as via-edge
+    //
+    dlv->via_edge = link->edge;
 
     //
     // If this link has a core_endpoint, direct deliveries to that endpoint.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to