[ 
https://issues.apache.org/jira/browse/DISPATCH-1160?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16670408#comment-16670408
 ] 

ASF GitHub Bot commented on DISPATCH-1160:
------------------------------------------

Github user ted-ross commented on a diff in the pull request:

    https://github.com/apache/qpid-dispatch/pull/410#discussion_r229791449
  
    --- Diff: src/router_core/modules/edge_addr_tracking/edge_addr_tracking.c 
---
    @@ -0,0 +1,372 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +#include <qpid/dispatch/ctools.h>
    +#include <qpid/dispatch/amqp.h>
    +#include "module.h"
    +#include "core_link_endpoint.h"
    +#include "edge_addr_tracking.h"
    +#include <stdio.h>
    +
    +
    +struct qdr_addr_endpoint_state_t {
    +    DEQ_LINKS(qdr_addr_endpoint_state_t);
    +    qdrc_endpoint_t                    *endpoint;
    +    qdr_connection_t                   *conn;    // The connection 
associated with the endpoint.
    +    qdr_addr_tracking_module_context_t *mc;
    +};
    +
    +DEQ_DECLARE(qdr_addr_endpoint_state_t, qdr_addr_endpoint_state_list_t);
    +ALLOC_DECLARE(qdr_addr_endpoint_state_t);
    +ALLOC_DEFINE(qdr_addr_endpoint_state_t);
    +
    +struct  qdr_addr_tracking_module_context_t {
    +    qdr_core_t                     *core;
    +    qdr_addr_endpoint_state_list_t  endpoint_state_list;
    +    qdrc_event_subscription_t      *event_sub;
    +    qdrc_endpoint_desc_t           addr_tracking_endpoint;
    +};
    +
    +
    +static qd_message_t *qdcm_edge_create_address_dlv(qdr_core_t *core, 
qdr_address_t   *addr, bool insert_addr)
    +{
    +    qd_message_t *msg = qd_message();
    +
    +    //
    +    // Start header
    +    //
    +    qd_composed_field_t *fld   = qd_compose(QD_PERFORMATIVE_HEADER, 0);
    +    qd_compose_start_list(fld);
    +    qd_compose_insert_bool(fld, 0);     // durable
    +    qd_compose_end_list(fld);
    +
    +    qd_composed_field_t *body = 
qd_compose(QD_PERFORMATIVE_BODY_AMQP_VALUE, 0);
    +
    +    qd_compose_start_list(body);
    +
    +    const char *addr_str = (const char 
*)qd_hash_key_by_handle(addr->hash_handle);
    +
    +    qd_compose_insert_string(body, addr_str);
    +    qd_compose_insert_bool(body, insert_addr);
    +    qd_compose_end_list(body);
    +
    +    // Finally, compose and return the message so it can be sent out.
    +    qd_message_compose_3(msg, fld, body);
    +
    +    return msg;
    +}
    +
    +static qdr_addr_endpoint_state_t 
*qdrc_get_endpoint_state_for_connection(qdr_addr_endpoint_state_list_t  
endpoint_state_list, qdr_connection_t *conn, qdr_link_t *link)
    +{
    +    qdr_addr_endpoint_state_t *endpoint_state = 
DEQ_HEAD(endpoint_state_list);
    +    while(endpoint_state) {
    +        if (endpoint_state->conn == conn) {
    +            return endpoint_state;
    +        }
    +        endpoint_state = DEQ_NEXT(endpoint_state);
    +    }
    +    return 0;
    +}
    +
    +
    +static void qdrc_address_endpoint_first_attach(void              
*bind_context,
    +                                               qdrc_endpoint_t   *endpoint,
    +                                               void             
**link_context,
    +                                               qdr_terminus_t   
*remote_source,
    +                                               qdr_terminus_t   
*remote_target)
    +{
    +    qdr_addr_tracking_module_context_t *bc = 
(qdr_addr_tracking_module_context_t *) bind_context;
    +
    +    qdr_addr_endpoint_state_t *endpoint_state = 
new_qdr_addr_endpoint_state_t();
    +
    +    ZERO(endpoint_state);
    +    endpoint_state->endpoint = endpoint;
    +    endpoint_state->mc       = bc;
    +    endpoint_state->conn     = qdrc_endpoint_get_connection_CT(endpoint);
    +
    +    DEQ_INSERT_TAIL(bc->endpoint_state_list, endpoint_state);
    +
    +
    +    //
    +    // The link to hard coded address QD_TERMINUS_EDGE_ADDRESS_TRACKING 
should be created only if this is a receiver link
    +    // and if this link is created inside the QDR_ROLE_EDGE_CONNECTION 
connection.
    +    //
    +    if (qdrc_endpoint_get_direction_CT(endpoint) == QD_OUTGOING && 
qdrc_endpoint_get_connection_CT(endpoint)->role == QDR_ROLE_EDGE_CONNECTION) {
    +        *link_context = endpoint_state;
    +        qdrc_endpoint_second_attach_CT(bc->core, endpoint, remote_source, 
remote_target);
    +   }
    +    else {
    +        //
    +        // We simply detach any links that don't match the above condition.
    +        //
    +        *link_context = 0;
    +        qdrc_endpoint_detach_CT(bc->core, endpoint, 0);
    +    }
    +}
    +
    +
    +static void qdrc_address_endpoint_on_first_detach(void *link_context,
    +                                              qdr_error_t *error)
    +{
    +    qdr_addr_endpoint_state_t *endpoint_state  = 
(qdr_addr_endpoint_state_t *)link_context;
    +    qdrc_endpoint_detach_CT(endpoint_state->mc->core, 
endpoint_state->endpoint, 0);
    +    qdr_addr_tracking_module_context_t *mc = endpoint_state->mc;
    +    DEQ_REMOVE(mc->endpoint_state_list, endpoint_state);
    +    free_qdr_addr_endpoint_state_t(endpoint_state);
    +}
    +
    +
    +static bool qdrc_can_send_address(qdr_address_t *addr, qdr_connection_t 
*conn)
    +{
    +    if (!addr)
    +        return false;
    +
    +    bool can_send = false;
    +    if (DEQ_SIZE(addr->rlinks) > 1 || qd_bitmask_cardinality(addr->rnodes) 
> 0) {
    +        // There is at least one receiver for this address somewhere in 
the router network
    +        can_send = true;
    +    }
    +    if (!can_send) {
    +        if (DEQ_SIZE(addr->rlinks) == 1) {
    +            qdr_link_ref_t *link_ref = DEQ_HEAD(addr->rlinks);
    +            if (link_ref->link->conn != conn)
    +                can_send=true;
    +        }
    +    }
    +    return can_send;
    +}
    +
    +
    +static void qdrc_send_message(qdr_core_t *core, qdr_address_t *addr, 
qdrc_endpoint_t *endpoint, bool insert_addr)
    +{
    +    if (!addr)
    +        return;
    +
    +    if (!endpoint)
    +        return;
    +
    +    qd_message_t *msg = qdcm_edge_create_address_dlv(core, addr, 
insert_addr);
    +    qdr_delivery_t *dlv = qdrc_endpoint_delivery_CT(core, endpoint, msg);
    +
    +    qdrc_endpoint_send_CT(core, endpoint, dlv, true);
    +}
    +
    +static void on_addr_event(void *context, qdrc_event_t event, qdr_address_t 
*addr)
    +{
    +    // We only care about mobile addresses.
    +    if(!qdr_address_is_mobile_CT(addr))
    +        return;
    +
    +    qdr_addr_tracking_module_context_t *addr_tracking = 
(qdr_addr_tracking_module_context_t*) context;
    +    switch (event) {
    +        case QDRC_EVENT_ADDR_BECAME_LOCAL_DEST : {
    +            //
    +            // This address transitioned from zero to one local 
destination. If this address already has more than zero remote destinations, 
don't do anything
    +            //
    +            if (qd_bitmask_cardinality(addr->rnodes) == 0) {
    +                qdr_link_ref_t *inlink = DEQ_HEAD(addr->inlinks);
    +                //
    +                // Every inlink that has an edge context must be informed 
of the appearance of this address.
    +                //
    +                while (inlink) {
    +                    if(inlink->link->edge_context != 0) {
    +                        qdr_addr_endpoint_state_t *endpoint_state = 
(qdr_addr_endpoint_state_t *)inlink->link->edge_context;
    +                        if (qdrc_can_send_address(addr, 
endpoint_state->conn) ) {
    +                            qdrc_endpoint_t *endpoint = 
endpoint_state->endpoint;
    +                            qdrc_send_message(addr_tracking->core, addr, 
endpoint, true);
    +                        }
    +                    }
    +                    inlink = DEQ_NEXT(inlink);
    +                }
    +            }
    +            break;
    +        }
    +        case QDRC_EVENT_ADDR_BECAME_DEST : {
    +            //
    +            // This address transitioned from zero to one destination. If 
this address already had local destinations
    +            //
    +            qdr_link_ref_t *inlink = DEQ_HEAD(addr->inlinks);
    +            //
    +            // Every inlink that has an edge context must be informed of 
the appearance of this address.
    +            //
    +            while (inlink) {
    +                if(inlink->link->edge_context != 0) {
    +                    qdr_addr_endpoint_state_t *endpoint_state = 
(qdr_addr_endpoint_state_t *)inlink->link->edge_context;
    +                    if (qdrc_can_send_address(addr, endpoint_state->conn) 
) {
    +                        qdrc_endpoint_t *endpoint = 
endpoint_state->endpoint;
    +                        qdrc_send_message(addr_tracking->core, addr, 
endpoint, true);
    +                    }
    +                }
    +                inlink = DEQ_NEXT(inlink);
    +            }
    +        }
    +        break;
    +
    +        case QDRC_EVENT_ADDR_NO_LONGER_LOCAL_DEST : {
    +            // The address no longer has any local destinations.
    +            // If there are no remote destinations either, we have to tell 
the edge routers to delete their sender links
    +            if (qd_bitmask_cardinality(addr->rnodes) == 0) {
    +                qdr_link_ref_t *inlink = DEQ_HEAD(addr->inlinks);
    +                //
    +                // Every inlink that has an edge context must be informed 
of the disappearance of this address.
    +                //
    +                while (inlink) {
    +                    if(inlink->link->edge_context != 0) {
    +                        qdr_addr_endpoint_state_t *endpoint_state = 
(qdr_addr_endpoint_state_t *)inlink->link->edge_context;
    +                        qdrc_endpoint_t *endpoint = 
endpoint_state->endpoint;
    +                        qdrc_send_message(addr_tracking->core, addr, 
endpoint, false);
    +                    }
    +                    inlink = DEQ_NEXT(inlink);
    +                }
    +            }
    +
    +            break;
    +        }
    +        case QDRC_EVENT_ADDR_ONE_LOCAL_DEST: {
    +            //
    +            // This address transitioned from N destinations to one local 
dest
    +            // If this address already has non-zero remote destinations, 
there is no need to tell the edge routers about it
    +            //
    +            if (qd_bitmask_cardinality(addr->rnodes) == 0) {
    +                assert(DEQ_SIZE(addr->rlinks) == 1);
    +                //
    +                // There should be only one rlink in the rlinks list
    +                //
    +                qdr_link_ref_t *rlink_ref = DEQ_HEAD(addr->rlinks);
    +                qdr_link_t *link = rlink_ref->link;
    +
    +                qdr_link_ref_t *inlink = DEQ_HEAD(addr->inlinks);
    +                while (inlink) {
    +                    if(inlink->link->edge_context != 0) {
    +                        qdr_addr_endpoint_state_t *endpoint_state = 
(qdr_addr_endpoint_state_t *)inlink->link->edge_context;
    +                        qdrc_endpoint_t *endpoint = 
endpoint_state->endpoint;
    +                        if (endpoint_state->conn == link->conn) {
    +                            qdrc_send_message(addr_tracking->core, addr, 
endpoint, false);
    +                            break;
    +                        }
    +                    }
    +                    inlink = DEQ_NEXT(inlink);
    +                }
    +            }
    +        }
    +        break;
    +        case QDRC_EVENT_ADDR_TWO_DEST: {
    +            // The address transitioned from one local dest to two 
destinations. The second destination might be local or remote.
    +            qdr_link_ref_t *rlink_ref = DEQ_HEAD(addr->rlinks);
    +            qdr_link_t *link = rlink_ref->link;
    +
    +            qdr_link_ref_t *inlink = DEQ_HEAD(addr->inlinks);
    +            while (inlink) {
    +                if(inlink->link->edge_context != 0) {
    +                    qdr_addr_endpoint_state_t *endpoint_state = 
(qdr_addr_endpoint_state_t *)inlink->link->edge_context;
    +                    qdrc_endpoint_t *endpoint = endpoint_state->endpoint;
    +                    if (link->conn == endpoint_state->conn) {
    +                        qdrc_send_message(addr_tracking->core, addr, 
endpoint, true);
    --- End diff --
    
    Will this cause redundant messages to be sent to the edge?


> Add edge address tracking module to interior routers which will inform edges 
> of mobile address receiver changes
> ---------------------------------------------------------------------------------------------------------------
>
>                 Key: DISPATCH-1160
>                 URL: https://issues.apache.org/jira/browse/DISPATCH-1160
>             Project: Qpid Dispatch
>          Issue Type: Improvement
>    Affects Versions: 1.4.1
>            Reporter: Ganesh Murthy
>            Assignee: Ganesh Murthy
>            Priority: Major
>             Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@qpid.apache.org
For additional commands, e-mail: dev-h...@qpid.apache.org

Reply via email to