[ https://issues.apache.org/jira/browse/DISPATCH-1160?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16670408#comment-16670408 ]
ASF GitHub Bot commented on DISPATCH-1160: ------------------------------------------ Github user ted-ross commented on a diff in the pull request: https://github.com/apache/qpid-dispatch/pull/410#discussion_r229791449 --- Diff: src/router_core/modules/edge_addr_tracking/edge_addr_tracking.c --- @@ -0,0 +1,372 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <qpid/dispatch/ctools.h> +#include <qpid/dispatch/amqp.h> +#include "module.h" +#include "core_link_endpoint.h" +#include "edge_addr_tracking.h" +#include <stdio.h> + + +struct qdr_addr_endpoint_state_t { + DEQ_LINKS(qdr_addr_endpoint_state_t); + qdrc_endpoint_t *endpoint; + qdr_connection_t *conn; // The connection associated with the endpoint. + qdr_addr_tracking_module_context_t *mc; +}; + +DEQ_DECLARE(qdr_addr_endpoint_state_t, qdr_addr_endpoint_state_list_t); +ALLOC_DECLARE(qdr_addr_endpoint_state_t); +ALLOC_DEFINE(qdr_addr_endpoint_state_t); + +struct qdr_addr_tracking_module_context_t { + qdr_core_t *core; + qdr_addr_endpoint_state_list_t endpoint_state_list; + qdrc_event_subscription_t *event_sub; + qdrc_endpoint_desc_t addr_tracking_endpoint; +}; + + +static qd_message_t *qdcm_edge_create_address_dlv(qdr_core_t *core, qdr_address_t *addr, bool insert_addr) +{ + qd_message_t *msg = qd_message(); + + // + // Start header + // + qd_composed_field_t *fld = qd_compose(QD_PERFORMATIVE_HEADER, 0); + qd_compose_start_list(fld); + qd_compose_insert_bool(fld, 0); // durable + qd_compose_end_list(fld); + + qd_composed_field_t *body = qd_compose(QD_PERFORMATIVE_BODY_AMQP_VALUE, 0); + + qd_compose_start_list(body); + + const char *addr_str = (const char *)qd_hash_key_by_handle(addr->hash_handle); + + qd_compose_insert_string(body, addr_str); + qd_compose_insert_bool(body, insert_addr); + qd_compose_end_list(body); + + // Finally, compose and retuen the message so it can be sent out. + qd_message_compose_3(msg, fld, body); + + return msg; +} + +static qdr_addr_endpoint_state_t *qdrc_get_endpoint_state_for_connection(qdr_addr_endpoint_state_list_t endpoint_state_list, qdr_connection_t *conn, qdr_link_t *link) +{ + qdr_addr_endpoint_state_t *endpoint_state = DEQ_HEAD(endpoint_state_list); + while(endpoint_state) { + if (endpoint_state->conn == conn) { + return endpoint_state; + } + endpoint_state = DEQ_NEXT(endpoint_state); + } + return 0; +} + + +static void qdrc_address_endpoint_first_attach(void *bind_context, + qdrc_endpoint_t *endpoint, + void **link_context, + qdr_terminus_t *remote_source, + qdr_terminus_t *remote_target) +{ + qdr_addr_tracking_module_context_t *bc = (qdr_addr_tracking_module_context_t *) bind_context; + + qdr_addr_endpoint_state_t *endpoint_state = new_qdr_addr_endpoint_state_t(); + + ZERO(endpoint_state); + endpoint_state->endpoint = endpoint; + endpoint_state->mc = bc; + endpoint_state->conn = qdrc_endpoint_get_connection_CT(endpoint); + + DEQ_INSERT_TAIL(bc->endpoint_state_list, endpoint_state); + + + // + // The link to hard coded address QD_TERMINUS_EDGE_ADDRESS_TRACKING should be created only if this is a receiver link + // and if this link is created inside the QDR_ROLE_EDGE_CONNECTION connection. + // + if (qdrc_endpoint_get_direction_CT(endpoint) == QD_OUTGOING && qdrc_endpoint_get_connection_CT(endpoint)->role == QDR_ROLE_EDGE_CONNECTION) { + *link_context = endpoint_state; + qdrc_endpoint_second_attach_CT(bc->core, endpoint, remote_source, remote_target); + } + else { + // + // We simply detach any links that dont match the above condition. + // + *link_context = 0; + qdrc_endpoint_detach_CT(bc->core, endpoint, 0); + } +} + + +static void qdrc_address_endpoint_on_first_detach(void *link_context, + qdr_error_t *error) +{ + qdr_addr_endpoint_state_t *endpoint_state = (qdr_addr_endpoint_state_t *)link_context; + qdrc_endpoint_detach_CT(endpoint_state->mc->core, endpoint_state->endpoint, 0); + qdr_addr_tracking_module_context_t *mc = endpoint_state->mc; + DEQ_REMOVE(mc->endpoint_state_list, endpoint_state); + free_qdr_addr_endpoint_state_t(endpoint_state); +} + + +static bool qdrc_can_send_address(qdr_address_t *addr, qdr_connection_t *conn) +{ + if (!addr) + return false; + + bool can_send = false; + if (DEQ_SIZE(addr->rlinks) > 1 || qd_bitmask_cardinality(addr->rnodes) > 0) { + // There is at least one receiver for this address somewhere in the router network + can_send = true; + } + if (!can_send) { + if (DEQ_SIZE(addr->rlinks) == 1) { + qdr_link_ref_t *link_ref = DEQ_HEAD(addr->rlinks); + if (link_ref->link->conn != conn) + can_send=true; + } + } + return can_send; +} + + +static void qdrc_send_message(qdr_core_t *core, qdr_address_t *addr, qdrc_endpoint_t *endpoint, bool insert_addr) +{ + if (!addr) + return; + + if (!endpoint) + return; + + qd_message_t *msg = qdcm_edge_create_address_dlv(core, addr, insert_addr); + qdr_delivery_t *dlv = qdrc_endpoint_delivery_CT(core, endpoint, msg); + + qdrc_endpoint_send_CT(core, endpoint, dlv, true); +} + +static void on_addr_event(void *context, qdrc_event_t event, qdr_address_t *addr) +{ + // We only care about mobile addresses. + if(!qdr_address_is_mobile_CT(addr)) + return; + + qdr_addr_tracking_module_context_t *addr_tracking = (qdr_addr_tracking_module_context_t*) context; + switch (event) { + case QDRC_EVENT_ADDR_BECAME_LOCAL_DEST : { + // + // This address transitioned from zero to one local destination. If this address already has more than zero remote destinations, don't do anything + // + if (qd_bitmask_cardinality(addr->rnodes) == 0) { + qdr_link_ref_t *inlink = DEQ_HEAD(addr->inlinks); + // + // Every inlink that has an edge context must be informed of the appearence of this address. + // + while (inlink) { + if(inlink->link->edge_context != 0) { + qdr_addr_endpoint_state_t *endpoint_state = (qdr_addr_endpoint_state_t *)inlink->link->edge_context; + if (qdrc_can_send_address(addr, endpoint_state->conn) ) { + qdrc_endpoint_t *endpoint = endpoint_state->endpoint; + qdrc_send_message(addr_tracking->core, addr, endpoint, true); + } + } + inlink = DEQ_NEXT(inlink); + } + } + break; + } + case QDRC_EVENT_ADDR_BECAME_DEST : { + // + // This address transitioned from zero to one destination. If this address already had local destinations + // + qdr_link_ref_t *inlink = DEQ_HEAD(addr->inlinks); + // + // Every inlink that has an edge context must be informed of the appearence of this address. + // + while (inlink) { + if(inlink->link->edge_context != 0) { + qdr_addr_endpoint_state_t *endpoint_state = (qdr_addr_endpoint_state_t *)inlink->link->edge_context; + if (qdrc_can_send_address(addr, endpoint_state->conn) ) { + qdrc_endpoint_t *endpoint = endpoint_state->endpoint; + qdrc_send_message(addr_tracking->core, addr, endpoint, true); + } + } + inlink = DEQ_NEXT(inlink); + } + } + break; + + case QDRC_EVENT_ADDR_NO_LONGER_LOCAL_DEST : { + // The address no longer has any local destinations. + // If there are no remote destinations either, we have to tell the edge routers to delete their sender links + if (qd_bitmask_cardinality(addr->rnodes) == 0) { + qdr_link_ref_t *inlink = DEQ_HEAD(addr->inlinks); + // + // Every inlink that has an edge context must be informed of the disappearence of this address. + // + while (inlink) { + if(inlink->link->edge_context != 0) { + qdr_addr_endpoint_state_t *endpoint_state = (qdr_addr_endpoint_state_t *)inlink->link->edge_context; + qdrc_endpoint_t *endpoint = endpoint_state->endpoint; + qdrc_send_message(addr_tracking->core, addr, endpoint, false); + } + inlink = DEQ_NEXT(inlink); + } + } + + break; + } + case QDRC_EVENT_ADDR_ONE_LOCAL_DEST: { + // + // This address transitioned from N destinations to one local dest + // If this address already has non-zero remote destinations, there is no need to tell the edge routers about it + // + if (qd_bitmask_cardinality(addr->rnodes) == 0) { + assert(DEQ_SIZE(addr->rlinks) == 1); + // + // There should be only one rlink in the rlinks list + // + qdr_link_ref_t *rlink_ref = DEQ_HEAD(addr->rlinks); + qdr_link_t *link = rlink_ref->link; + + qdr_link_ref_t *inlink = DEQ_HEAD(addr->inlinks); + while (inlink) { + if(inlink->link->edge_context != 0) { + qdr_addr_endpoint_state_t *endpoint_state = (qdr_addr_endpoint_state_t *)inlink->link->edge_context; + qdrc_endpoint_t *endpoint = endpoint_state->endpoint; + if (endpoint_state->conn == link->conn) { + qdrc_send_message(addr_tracking->core, addr, endpoint, false); + break; + } + } + inlink = DEQ_NEXT(inlink); + } + } + } + break; + case QDRC_EVENT_ADDR_TWO_DEST: { + // The address transisioned from one local dest to two destinations, The second destination might be local or remote. + qdr_link_ref_t *rlink_ref = DEQ_HEAD(addr->rlinks); + qdr_link_t *link = rlink_ref->link; + + qdr_link_ref_t *inlink = DEQ_HEAD(addr->inlinks); + while (inlink) { + if(inlink->link->edge_context != 0) { + qdr_addr_endpoint_state_t *endpoint_state = (qdr_addr_endpoint_state_t *)inlink->link->edge_context; + qdrc_endpoint_t *endpoint = endpoint_state->endpoint; + if (link->conn == endpoint_state->conn) { + qdrc_send_message(addr_tracking->core, addr, endpoint, true); --- End diff -- Will this cause redundant messages to be sent to the edge? > Add edge address tracking module to interior routers which will inform edges > of mobile address receiver changes > --------------------------------------------------------------------------------------------------------------- > > Key: DISPATCH-1160 > URL: https://issues.apache.org/jira/browse/DISPATCH-1160 > Project: Qpid Dispatch > Issue Type: Improvement > Affects Versions: 1.4.1 > Reporter: Ganesh Murthy > Assignee: Ganesh Murthy > Priority: Major > Fix For: 1.5.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005) --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@qpid.apache.org For additional commands, e-mail: dev-h...@qpid.apache.org