[
https://issues.apache.org/jira/browse/DISPATCH-1160?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16670408#comment-16670408
]
ASF GitHub Bot commented on DISPATCH-1160:
------------------------------------------
Github user ted-ross commented on a diff in the pull request:
https://github.com/apache/qpid-dispatch/pull/410#discussion_r229791449
--- Diff: src/router_core/modules/edge_addr_tracking/edge_addr_tracking.c
---
@@ -0,0 +1,372 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <qpid/dispatch/ctools.h>
+#include <qpid/dispatch/amqp.h>
+#include "module.h"
+#include "core_link_endpoint.h"
+#include "edge_addr_tracking.h"
+#include <stdio.h>
+
+
+struct qdr_addr_endpoint_state_t { // Per-endpoint record created at first attach
+ DEQ_LINKS(qdr_addr_endpoint_state_t); // Linkage for qdr_addr_endpoint_state_list_t
+ qdrc_endpoint_t *endpoint; // Core endpoint recorded at first attach
+ qdr_connection_t *conn; // The connection
associated with the endpoint.
+ qdr_addr_tracking_module_context_t *mc; // Module context supplied as bind_context at first attach
+};
+
+DEQ_DECLARE(qdr_addr_endpoint_state_t, qdr_addr_endpoint_state_list_t);
+ALLOC_DECLARE(qdr_addr_endpoint_state_t);
+ALLOC_DEFINE(qdr_addr_endpoint_state_t);
+
+struct qdr_addr_tracking_module_context_t { // Module-wide state for edge address tracking
+ qdr_core_t *core; // Router core handle passed to the qdrc_endpoint_*_CT calls
+ qdr_addr_endpoint_state_list_t endpoint_state_list; // Endpoint states appended at first attach, removed at first detach
+ qdrc_event_subscription_t *event_sub; // NOTE(review): presumably the subscription that delivers on_addr_event - not set in the visible code, confirm
+ qdrc_endpoint_desc_t addr_tracking_endpoint; // NOTE(review): presumably the descriptor holding the attach/detach handlers - confirm
+};
+
+
+/**
+ * Build the address-tracking message that is sent to an edge router.
+ *
+ * The message consists of a header section (a list holding durable = false)
+ * and an amqp-value body holding the list [address-key, insert_addr], where
+ * address-key is the hash key of addr.
+ *
+ * @param core        Not used by this function.
+ * @param addr        Address being reported. Must be non-null; it is dereferenced.
+ * @param insert_addr Flag stored as the second body element. Callers in this
+ *                    file pass true when the address appears and false when it
+ *                    disappears.
+ * @return The newly created message.
+ */
+static qd_message_t *qdcm_edge_create_address_dlv(qdr_core_t    *core,
+                                                  qdr_address_t *addr,
+                                                  bool           insert_addr)
+{
+    qd_message_t *msg = qd_message();
+
+    //
+    // Start header
+    //
+    qd_composed_field_t *fld = qd_compose(QD_PERFORMATIVE_HEADER, 0);
+    qd_compose_start_list(fld);
+    qd_compose_insert_bool(fld, 0); // durable
+    qd_compose_end_list(fld);
+
+    //
+    // Body: [ address-key, insert_addr ]
+    //
+    qd_composed_field_t *body = qd_compose(QD_PERFORMATIVE_BODY_AMQP_VALUE, 0);
+    qd_compose_start_list(body);
+
+    const char *addr_str = (const char *) qd_hash_key_by_handle(addr->hash_handle);
+
+    qd_compose_insert_string(body, addr_str);
+    qd_compose_insert_bool(body, insert_addr);
+    qd_compose_end_list(body);
+
+    // Finally, compose the message so it can be sent out.
+    qd_message_compose_3(msg, fld, body);
+
+    //
+    // The composed-field objects are still owned by this function after the
+    // message has been composed. Release them, otherwise one pair of fields
+    // is leaked for every message that is built.
+    //
+    qd_compose_free(body);
+    qd_compose_free(fld);
+
+    return msg;
+}
+
+/**
+ * Find the endpoint state whose connection is conn.
+ *
+ * @param endpoint_state_list List to search (received by value).
+ * @param conn                Connection to match against each entry's conn.
+ * @param link                Not used by this function.
+ * @return The first matching entry, or 0 when no entry matches.
+ */
+static qdr_addr_endpoint_state_t *qdrc_get_endpoint_state_for_connection(
+    qdr_addr_endpoint_state_list_t  endpoint_state_list,
+    qdr_connection_t               *conn,
+    qdr_link_t                     *link)
+{
+    for (qdr_addr_endpoint_state_t *candidate = DEQ_HEAD(endpoint_state_list);
+         candidate;
+         candidate = DEQ_NEXT(candidate)) {
+        if (candidate->conn == conn)
+            return candidate;
+    }
+
+    return 0;
+}
+
+
+/**
+ * First-attach handler for the edge address tracking endpoint.
+ *
+ * The link is accepted only when the endpoint direction is QD_OUTGOING and
+ * its connection has role QDR_ROLE_EDGE_CONNECTION. Every other link is
+ * detached immediately.
+ *
+ * @param bind_context  The qdr_addr_tracking_module_context_t of this module.
+ * @param endpoint      The endpoint being attached.
+ * @param link_context  Out: set to the new endpoint state when the link is
+ *                      accepted, set to 0 when it is rejected.
+ * @param remote_source Passed through to qdrc_endpoint_second_attach_CT on accept.
+ * @param remote_target Passed through to qdrc_endpoint_second_attach_CT on accept.
+ */
+static void qdrc_address_endpoint_first_attach(void             *bind_context,
+                                               qdrc_endpoint_t  *endpoint,
+                                               void            **link_context,
+                                               qdr_terminus_t   *remote_source,
+                                               qdr_terminus_t   *remote_target)
+{
+    qdr_addr_tracking_module_context_t *bc = (qdr_addr_tracking_module_context_t *) bind_context;
+    qdr_connection_t                   *conn = qdrc_endpoint_get_connection_CT(endpoint);
+
+    //
+    // The link to hard coded address QD_TERMINUS_EDGE_ADDRESS_TRACKING should
+    // be accepted only for an outgoing endpoint on a connection whose role is
+    // QDR_ROLE_EDGE_CONNECTION. We simply detach any other link.
+    //
+    if (qdrc_endpoint_get_direction_CT(endpoint) != QD_OUTGOING || conn->role != QDR_ROLE_EDGE_CONNECTION) {
+        *link_context = 0;
+        qdrc_endpoint_detach_CT(bc->core, endpoint, 0);
+        return;
+    }
+
+    //
+    // Create the endpoint state only for accepted links. A rejected link gets
+    // a null link context, so the detach handler could never locate a state
+    // allocated for it: it would stay on the list and never be freed.
+    //
+    qdr_addr_endpoint_state_t *endpoint_state = new_qdr_addr_endpoint_state_t();
+
+    ZERO(endpoint_state);
+    endpoint_state->endpoint = endpoint;
+    endpoint_state->mc       = bc;
+    endpoint_state->conn     = conn;
+
+    DEQ_INSERT_TAIL(bc->endpoint_state_list, endpoint_state);
+
+    *link_context = endpoint_state;
+    qdrc_endpoint_second_attach_CT(bc->core, endpoint, remote_source, remote_target);
+}
+
+
+/**
+ * First-detach handler for the edge address tracking endpoint.
+ *
+ * Detaches the endpoint, removes its state from the module's list and frees
+ * the state.
+ *
+ * @param link_context The qdr_addr_endpoint_state_t stored at first attach.
+ *                     May be 0 for a link that was rejected at first attach,
+ *                     in which case there is nothing to release.
+ * @param error        Not used by this function.
+ */
+static void qdrc_address_endpoint_on_first_detach(void        *link_context,
+                                                  qdr_error_t *error)
+{
+    qdr_addr_endpoint_state_t *endpoint_state = (qdr_addr_endpoint_state_t *) link_context;
+
+    //
+    // The first-attach handler stores a null link context for links it
+    // rejects; guard against dereferencing it.
+    //
+    if (!endpoint_state)
+        return;
+
+    qdr_addr_tracking_module_context_t *mc = endpoint_state->mc;
+
+    qdrc_endpoint_detach_CT(mc->core, endpoint_state->endpoint, 0);
+    DEQ_REMOVE(mc->endpoint_state_list, endpoint_state);
+    free_qdr_addr_endpoint_state_t(endpoint_state);
+}
+
+
+/**
+ * Decide whether an address notification should be sent over conn.
+ *
+ * @param addr The address in question; 0 yields false.
+ * @param conn The connection the notification would be sent on.
+ * @return true when the address has more than one rlink or at least one bit
+ *         set in rnodes, or when its single rlink belongs to a connection
+ *         other than conn; false otherwise.
+ */
+static bool qdrc_can_send_address(qdr_address_t *addr, qdr_connection_t *conn)
+{
+    if (!addr)
+        return false;
+
+    // There is at least one receiver for this address somewhere in the router network
+    if (DEQ_SIZE(addr->rlinks) > 1 || qd_bitmask_cardinality(addr->rnodes) > 0)
+        return true;
+
+    if (DEQ_SIZE(addr->rlinks) != 1)
+        return false;
+
+    // Exactly one rlink: it only counts when it is on a different connection.
+    qdr_link_ref_t *only_ref = DEQ_HEAD(addr->rlinks);
+    return only_ref->link->conn != conn;
+}
+
+
+/**
+ * Build an address-tracking message for addr and send it on endpoint.
+ *
+ * Does nothing when addr or endpoint is null.
+ *
+ * @param core        Router core handle passed to the endpoint calls.
+ * @param addr        Address to report.
+ * @param endpoint    Endpoint the message is sent on.
+ * @param insert_addr Flag forwarded to qdcm_edge_create_address_dlv.
+ */
+static void qdrc_send_message(qdr_core_t      *core,
+                              qdr_address_t   *addr,
+                              qdrc_endpoint_t *endpoint,
+                              bool             insert_addr)
+{
+    if (!addr || !endpoint)
+        return;
+
+    qd_message_t   *tracking_msg = qdcm_edge_create_address_dlv(core, addr, insert_addr);
+    qdr_delivery_t *delivery     = qdrc_endpoint_delivery_CT(core, endpoint, tracking_msg);
+
+    // NOTE(review): the final `true` argument is presumably the presettled flag - confirm.
+    qdrc_endpoint_send_CT(core, endpoint, delivery, true);
+}
+
+static void on_addr_event(void *context, qdrc_event_t event, qdr_address_t
*addr)
+{
+ // We only care about mobile addresses.
+ if(!qdr_address_is_mobile_CT(addr))
+ return;
+
+ qdr_addr_tracking_module_context_t *addr_tracking =
(qdr_addr_tracking_module_context_t*) context;
+ switch (event) {
+ case QDRC_EVENT_ADDR_BECAME_LOCAL_DEST : {
+ //
+ // This address transitioned from zero to one local
destination. If this address already has more than zero remote destinations,
don't do anything
+ //
+ if (qd_bitmask_cardinality(addr->rnodes) == 0) {
+ qdr_link_ref_t *inlink = DEQ_HEAD(addr->inlinks);
+ //
+ // Every inlink that has an edge context must be informed
+ // of the appearance of this address.
+ //
+ while (inlink) {
+ if(inlink->link->edge_context != 0) {
+ qdr_addr_endpoint_state_t *endpoint_state =
(qdr_addr_endpoint_state_t *)inlink->link->edge_context;
+ if (qdrc_can_send_address(addr,
endpoint_state->conn) ) {
+ qdrc_endpoint_t *endpoint =
endpoint_state->endpoint;
+ qdrc_send_message(addr_tracking->core, addr,
endpoint, true);
+ }
+ }
+ inlink = DEQ_NEXT(inlink);
+ }
+ }
+ break;
+ }
+ case QDRC_EVENT_ADDR_BECAME_DEST : {
+ //
+ // This address transitioned from zero to one destination. If
this address already had local destinations
+ //
+ qdr_link_ref_t *inlink = DEQ_HEAD(addr->inlinks);
+ //
+ // Every inlink that has an edge context must be informed of
+ // the appearance of this address.
+ //
+ while (inlink) {
+ if(inlink->link->edge_context != 0) {
+ qdr_addr_endpoint_state_t *endpoint_state =
(qdr_addr_endpoint_state_t *)inlink->link->edge_context;
+ if (qdrc_can_send_address(addr, endpoint_state->conn)
) {
+ qdrc_endpoint_t *endpoint =
endpoint_state->endpoint;
+ qdrc_send_message(addr_tracking->core, addr,
endpoint, true);
+ }
+ }
+ inlink = DEQ_NEXT(inlink);
+ }
+ }
+ break;
+
+ case QDRC_EVENT_ADDR_NO_LONGER_LOCAL_DEST : {
+ // The address no longer has any local destinations.
+ // If there are no remote destinations either, we have to tell
the edge routers to delete their sender links
+ if (qd_bitmask_cardinality(addr->rnodes) == 0) {
+ qdr_link_ref_t *inlink = DEQ_HEAD(addr->inlinks);
+ //
+ // Every inlink that has an edge context must be informed
+ // of the disappearance of this address.
+ //
+ while (inlink) {
+ if(inlink->link->edge_context != 0) {
+ qdr_addr_endpoint_state_t *endpoint_state =
(qdr_addr_endpoint_state_t *)inlink->link->edge_context;
+ qdrc_endpoint_t *endpoint =
endpoint_state->endpoint;
+ qdrc_send_message(addr_tracking->core, addr,
endpoint, false);
+ }
+ inlink = DEQ_NEXT(inlink);
+ }
+ }
+
+ break;
+ }
+ case QDRC_EVENT_ADDR_ONE_LOCAL_DEST: {
+ //
+ // This address transitioned from N destinations to one local
dest
+ // If this address already has non-zero remote destinations,
there is no need to tell the edge routers about it
+ //
+ if (qd_bitmask_cardinality(addr->rnodes) == 0) {
+ assert(DEQ_SIZE(addr->rlinks) == 1);
+ //
+ // There should be only one rlink in the rlinks list
+ //
+ qdr_link_ref_t *rlink_ref = DEQ_HEAD(addr->rlinks);
+ qdr_link_t *link = rlink_ref->link;
+
+ qdr_link_ref_t *inlink = DEQ_HEAD(addr->inlinks);
+ while (inlink) {
+ if(inlink->link->edge_context != 0) {
+ qdr_addr_endpoint_state_t *endpoint_state =
(qdr_addr_endpoint_state_t *)inlink->link->edge_context;
+ qdrc_endpoint_t *endpoint =
endpoint_state->endpoint;
+ if (endpoint_state->conn == link->conn) {
+ qdrc_send_message(addr_tracking->core, addr,
endpoint, false);
+ break;
+ }
+ }
+ inlink = DEQ_NEXT(inlink);
+ }
+ }
+ }
+ break;
+ case QDRC_EVENT_ADDR_TWO_DEST: {
+ // The address transitioned from one local dest to two
+ // destinations. The second destination might be local or remote.
+ qdr_link_ref_t *rlink_ref = DEQ_HEAD(addr->rlinks);
+ qdr_link_t *link = rlink_ref->link;
+
+ qdr_link_ref_t *inlink = DEQ_HEAD(addr->inlinks);
+ while (inlink) {
+ if(inlink->link->edge_context != 0) {
+ qdr_addr_endpoint_state_t *endpoint_state =
(qdr_addr_endpoint_state_t *)inlink->link->edge_context;
+ qdrc_endpoint_t *endpoint = endpoint_state->endpoint;
+ if (link->conn == endpoint_state->conn) {
+ qdrc_send_message(addr_tracking->core, addr,
endpoint, true);
--- End diff --
Will this cause redundant messages to be sent to the edge?
> Add edge address tracking module to interior routers which will inform edges
> of mobile address receiver changes
> ---------------------------------------------------------------------------------------------------------------
>
> Key: DISPATCH-1160
> URL: https://issues.apache.org/jira/browse/DISPATCH-1160
> Project: Qpid Dispatch
> Issue Type: Improvement
> Affects Versions: 1.4.1
> Reporter: Ganesh Murthy
> Assignee: Ganesh Murthy
> Priority: Major
> Fix For: 1.5.0
>
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]