Author: tross
Date: Fri Jul 12 21:44:14 2013
New Revision: 1502698

URL: http://svn.apache.org/r1502698
Log:
QPID-4967 - Router code advances
  o Fixed handling of SASL on outbound connections
  o Added Send and Receive message paths in and out of Python modules
  o Overhauled the route-table data structures
    - Multicasting is now supported (multiple sender links with the same address)
    - Support has been added for message-based routing semantics as well as link-based
  o Two Dispatch processes connected to each other will now discover each other as neighbors

Modified: qpid/trunk/qpid/extras/dispatch/include/qpid/dispatch/container.h qpid/trunk/qpid/extras/dispatch/include/qpid/dispatch/router.h qpid/trunk/qpid/extras/dispatch/src/agent.c qpid/trunk/qpid/extras/dispatch/src/container.c qpid/trunk/qpid/extras/dispatch/src/parse.c qpid/trunk/qpid/extras/dispatch/src/python_embedded.c qpid/trunk/qpid/extras/dispatch/src/router_node.c qpid/trunk/qpid/extras/dispatch/src/server.c Modified: qpid/trunk/qpid/extras/dispatch/include/qpid/dispatch/container.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/include/qpid/dispatch/container.h?rev=1502698&r1=1502697&r2=1502698&view=diff ============================================================================== --- qpid/trunk/qpid/extras/dispatch/include/qpid/dispatch/container.h (original) +++ qpid/trunk/qpid/extras/dispatch/include/qpid/dispatch/container.h Fri Jul 12 21:44:14 2013 @@ -88,10 +88,10 @@ typedef struct { int dx_container_register_node_type(dx_dispatch_t *dispatch, const dx_node_type_t *nt); -void dx_container_set_default_node_type(dx_dispatch_t *dispatch, - const dx_node_type_t *nt, - void *node_context, - dx_dist_mode_t supported_dist); +dx_node_t *dx_container_set_default_node_type(dx_dispatch_t *dispatch, + const dx_node_type_t *nt, + void *node_context, + dx_dist_mode_t supported_dist); dx_node_t *dx_container_create_node(dx_dispatch_t *dispatch, const dx_node_type_t *nt, Modified: qpid/trunk/qpid/extras/dispatch/include/qpid/dispatch/router.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/include/qpid/dispatch/router.h?rev=1502698&r1=1502697&r2=1502698&view=diff ============================================================================== --- qpid/trunk/qpid/extras/dispatch/include/qpid/dispatch/router.h (original) +++ qpid/trunk/qpid/extras/dispatch/include/qpid/dispatch/router.h Fri Jul 12 21:44:14 2013 @@ -31,7 +31,6 @@ typedef void (*dx_router_message_cb)(voi dx_address_t *dx_router_register_address(dx_dispatch_t *dx, 
- bool is_local, const char *address, dx_router_message_cb handler, void *context); Modified: qpid/trunk/qpid/extras/dispatch/src/agent.c URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/src/agent.c?rev=1502698&r1=1502697&r2=1502698&view=diff ============================================================================== --- qpid/trunk/qpid/extras/dispatch/src/agent.c (original) +++ qpid/trunk/qpid/extras/dispatch/src/agent.c Fri Jul 12 21:44:14 2013 @@ -262,7 +262,7 @@ dx_agent_t *dx_agent(dx_dispatch_t *dx) DEQ_INIT(agent->out_fifo); agent->lock = sys_mutex(); agent->timer = dx_timer(dx, dx_agent_deferred_handler, agent); - agent->address = dx_router_register_address(dx, true, "agent", dx_agent_rx_handler, agent); + agent->address = dx_router_register_address(dx, "agent", dx_agent_rx_handler, agent); return agent; } Modified: qpid/trunk/qpid/extras/dispatch/src/container.c URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/src/container.c?rev=1502698&r1=1502697&r2=1502698&view=diff ============================================================================== --- qpid/trunk/qpid/extras/dispatch/src/container.c (original) +++ qpid/trunk/qpid/extras/dispatch/src/container.c Fri Jul 12 21:44:14 2013 @@ -513,10 +513,10 @@ int dx_container_register_node_type(dx_d } -void dx_container_set_default_node_type(dx_dispatch_t *dx, - const dx_node_type_t *nt, - void *context, - dx_dist_mode_t supported_dist) +dx_node_t *dx_container_set_default_node_type(dx_dispatch_t *dx, + const dx_node_type_t *nt, + void *context, + dx_dist_mode_t supported_dist) { dx_container_t *container = dx->container; @@ -530,6 +530,8 @@ void dx_container_set_default_node_type( container->default_node = 0; dx_log(module, LOG_TRACE, "Default node removed"); } + + return container->default_node; } Modified: qpid/trunk/qpid/extras/dispatch/src/parse.c URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/src/parse.c?rev=1502698&r1=1502697&r2=1502698&view=diff 
============================================================================== --- qpid/trunk/qpid/extras/dispatch/src/parse.c (original) +++ qpid/trunk/qpid/extras/dispatch/src/parse.c Fri Jul 12 21:44:14 2013 @@ -37,13 +37,14 @@ ALLOC_DECLARE(dx_parsed_field_t); ALLOC_DEFINE(dx_parsed_field_t); -static char *get_type_info(dx_field_iterator_t *iter, uint8_t *tag, uint32_t *length, uint32_t *count) +static char *get_type_info(dx_field_iterator_t *iter, uint8_t *tag, uint32_t *length, uint32_t *count, uint32_t *clen) { if (dx_field_iterator_end(iter)) return "Insufficient Data to Determine Tag"; - *tag = dx_field_iterator_octet(iter); - *count = 0; - *length = 0; + *tag = dx_field_iterator_octet(iter); + *count = 0; + *length = 0; + *clen = 0; switch (*tag & 0xF0) { case 0x40: *length = 0; break; @@ -78,6 +79,7 @@ static char *get_type_info(dx_field_iter *count += ((unsigned int) dx_field_iterator_octet(iter)) << 24; *count += ((unsigned int) dx_field_iterator_octet(iter)) << 16; *count += ((unsigned int) dx_field_iterator_octet(iter)) << 8; + *clen = 3; // fall through to the next case case 0xC0: @@ -85,6 +87,7 @@ static char *get_type_info(dx_field_iter if (dx_field_iterator_end(iter)) return "Insufficient Data to Determine Count"; *count += (unsigned int) dx_field_iterator_octet(iter); + *clen += 1; break; } @@ -108,13 +111,13 @@ static dx_parsed_field_t *dx_parse_inter uint32_t length; uint32_t count; + uint32_t length_of_count; - field->parse_error = get_type_info(iter, &field->tag, &length, &count); + field->parse_error = get_type_info(iter, &field->tag, &length, &count, &length_of_count); if (!field->parse_error) { field->raw_iter = dx_field_iterator_sub(iter, length); - if (count == 0 && length > 0) - dx_field_iterator_advance(iter, length); + dx_field_iterator_advance(iter, length - length_of_count); for (uint32_t idx = 0; idx < count; idx++) { dx_parsed_field_t *child = dx_parse_internal(field->raw_iter, field); DEQ_INSERT_TAIL(field->children, child); 
Modified: qpid/trunk/qpid/extras/dispatch/src/python_embedded.c URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/src/python_embedded.c?rev=1502698&r1=1502697&r2=1502698&view=diff ============================================================================== --- qpid/trunk/qpid/extras/dispatch/src/python_embedded.c (original) +++ qpid/trunk/qpid/extras/dispatch/src/python_embedded.c Fri Jul 12 21:44:14 2013 @@ -396,6 +396,7 @@ static PyTypeObject LogAdapterType = { typedef struct { PyObject_HEAD PyObject *handler; + PyObject *handler_rx_call; dx_dispatch_t *dx; dx_address_t *address; } IoAdapter; @@ -403,9 +404,66 @@ typedef struct { static void dx_io_rx_handler(void *context, dx_message_t *msg) { - //IoAdapter *self = (IoAdapter*) context; + IoAdapter *self = (IoAdapter*) context; - // TODO - Parse the incoming message and send it to the python handler. + // + // Parse the message through the body and exit if the message is not well formed. + // + if (!dx_message_check(msg, DX_DEPTH_BODY)) + return; + + // + // Get an iterator for the application-properties. Exit if the message has none. + // + dx_field_iterator_t *ap = dx_message_field_iterator(msg, DX_FIELD_APPLICATION_PROPERTIES); + if (ap == 0) + return; + + // + // Try to get a map-view of the application-properties. + // + dx_parsed_field_t *ap_map = dx_parse(ap); + if (ap_map == 0 || !dx_parse_ok(ap_map) || !dx_parse_is_map(ap_map)) { + dx_field_iterator_free(ap); + dx_parse_free(ap_map); + return; + } + + // + // Get an iterator for the body. Exit if the message has none. + // + dx_field_iterator_t *body = dx_message_field_iterator(msg, DX_FIELD_BODY); + if (body == 0) { + dx_field_iterator_free(ap); + dx_parse_free(ap_map); + return; + } + + // + // Try to get a map-view of the body. 
+ // + dx_parsed_field_t *body_map = dx_parse(body); + if (body_map == 0 || !dx_parse_ok(body_map) || !dx_parse_is_map(body_map)) { + printf("XXXX %s\n", dx_parse_error(body_map)); + dx_field_iterator_free(ap); + dx_field_iterator_free(body); + dx_parse_free(ap_map); + dx_parse_free(body_map); + return; + } + + PyObject *pAP = dx_field_to_py(ap_map); + PyObject *pBody = dx_field_to_py(body_map); + + PyObject *pArgs = PyTuple_New(2); + PyTuple_SetItem(pArgs, 0, pAP); + PyTuple_SetItem(pArgs, 1, pBody); + + PyObject *pValue = PyObject_CallObject(self->handler_rx_call, pArgs); + Py_DECREF(pArgs); + if (pValue) { + Py_DECREF(pValue); + } } @@ -415,9 +473,14 @@ static int IoAdapter_init(IoAdapter *sel if (!PyArg_ParseTuple(args, "Os", &self->handler, &address)) return -1; + self->handler_rx_call = PyObject_GetAttrString(self->handler, "receive"); + if (!self->handler_rx_call || !PyCallable_Check(self->handler_rx_call)) + return -1; + Py_INCREF(self->handler); + Py_INCREF(self->handler_rx_call); self->dx = dispatch; - self->address = dx_router_register_address(self->dx, true, address, dx_io_rx_handler, self); + self->address = dx_router_register_address(self->dx, address, dx_io_rx_handler, self); return 0; } @@ -426,6 +489,7 @@ static void IoAdapter_dealloc(IoAdapter* { dx_router_unregister_address(self->address); Py_DECREF(self->handler); + Py_DECREF(self->handler_rx_call); self->ob_type->tp_free((PyObject*)self); } Modified: qpid/trunk/qpid/extras/dispatch/src/router_node.c URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/src/router_node.c?rev=1502698&r1=1502697&r2=1502698&view=diff ============================================================================== --- qpid/trunk/qpid/extras/dispatch/src/router_node.c (original) +++ qpid/trunk/qpid/extras/dispatch/src/router_node.c Fri Jul 12 21:44:14 2013 @@ -28,8 +28,9 @@ static char *module = "ROUTER"; static void dx_router_python_setup(dx_router_t *router); static void dx_pyrouter_tick(dx_router_t 
*router); -//static char *local_prefix = "_local/"; -//static char *topo_prefix = "_topo/"; +static char *router_address = "_local/qdxrouter"; +static char *local_prefix = "_local/"; +//static char *topo_prefix = "_topo/"; /** * Address Types and Processing: @@ -46,54 +47,74 @@ static void dx_pyrouter_tick(dx_router_t * <mobile> M<mobile> forward+handler forward */ -struct dx_router_t { - dx_dispatch_t *dx; - const char *router_area; - const char *router_id; - dx_node_t *node; - dx_link_list_t in_links; - dx_link_list_t out_links; - dx_message_list_t in_fifo; - sys_mutex_t *lock; - dx_timer_t *timer; - hash_t *out_hash; - uint64_t dtag; - PyObject *pyRouter; - PyObject *pyTick; -}; +typedef struct dx_router_link_t dx_router_link_t; +typedef struct dx_router_node_t dx_router_node_t; -typedef struct { - dx_link_t *link; - dx_message_list_t out_fifo; -} dx_router_link_t; + +typedef enum { + DX_LINK_ENDPOINT, // A link to a connected endpoint + DX_LINK_ROUTER, // A link to a peer router in the same area + DX_LINK_AREA // A link to a peer router in a different area (area boundary) +} dx_link_type_t; + + +struct dx_router_link_t { + DEQ_LINKS(dx_router_link_t); + dx_direction_t link_direction; + dx_link_type_t link_type; + dx_address_t *owning_addr; // [ref] Address record that owns this link + dx_link_t *link; // [own] Link pointer + dx_router_link_t *connected_link; // [ref] If this is a link-route, reference the connected link + dx_router_link_t *peer_link; // [ref] If this is a bidirectional link-route, reference the peer link + dx_message_list_t out_fifo; // Message FIFO for outgoing messages +}; ALLOC_DECLARE(dx_router_link_t); ALLOC_DEFINE(dx_router_link_t); +DEQ_DECLARE(dx_router_link_t, dx_router_link_list_t); - -typedef struct { +struct dx_router_node_t { + DEQ_LINKS(dx_router_node_t); const char *id; - dx_router_link_t *next_hop; + dx_router_node_t *next_hop; // Next hop node _if_ this is not a neighbor node + dx_router_link_t *peer_link; // Outgoing link _if_ 
this is a neighbor node // list of valid origins (pointers to router_node) - (bit masks?) -} dx_router_node_t; +}; ALLOC_DECLARE(dx_router_node_t); ALLOC_DEFINE(dx_router_node_t); +DEQ_DECLARE(dx_router_node_t, dx_router_node_list_t); struct dx_address_t { - int is_local; - dx_router_message_cb handler; // In-Process Consumer - void *handler_context; - dx_router_link_t *rlink; // Locally-Connected Consumer - TODO: Make this a list - dx_router_node_t *rnode; // Remotely-Connected Consumer - TODO: Make this a list + dx_router_message_cb handler; // In-Process Consumer + void *handler_context; + dx_router_link_list_t rlinks; // Locally-Connected Consumers + dx_router_node_list_t rnodes; // Remotely-Connected Consumers }; ALLOC_DECLARE(dx_address_t); ALLOC_DEFINE(dx_address_t); +struct dx_router_t { + dx_dispatch_t *dx; + const char *router_area; + const char *router_id; + dx_node_t *node; + dx_router_link_list_t in_links; + dx_router_node_list_t routers; + dx_message_list_t in_fifo; + sys_mutex_t *lock; + dx_timer_t *timer; + hash_t *out_hash; + uint64_t dtag; + PyObject *pyRouter; + PyObject *pyTick; +}; + + /** * Outbound Delivery Handler */ @@ -141,10 +162,11 @@ static void router_tx_handler(void* cont */ static void router_rx_handler(void* context, dx_link_t *link, pn_delivery_t *delivery) { - dx_router_t *router = (dx_router_t*) context; - pn_link_t *pn_link = pn_delivery_link(delivery); - dx_message_t *msg; - int valid_message = 0; + dx_router_t *router = (dx_router_t*) context; + pn_link_t *pn_link = pn_delivery_link(delivery); + dx_router_link_t *rlink = (dx_router_link_t*) dx_link_get_context(link); + dx_message_t *msg; + int valid_message = 0; // // Receive the message into a local representation. 
If the returned message @@ -158,20 +180,49 @@ static void router_rx_handler(void* cont return; // - // Validate the message through the Properties section + // Consume the delivery and issue a replacement credit // - valid_message = dx_message_check(msg, DX_DEPTH_PROPERTIES); - pn_link_advance(pn_link); pn_link_flow(pn_link, 1); + sys_mutex_lock(router->lock); + + // + // Handle the Link-Routing case. If this incoming link is associated with a connected + // link, simply deliver the message to the outgoing link. There is no need to validate + // the message in this case. + // + if (rlink->connected_link) { + dx_router_link_t *clink = rlink->connected_link; + pn_link_t *pn_outlink = dx_link_pn(clink->link); + DEQ_INSERT_TAIL(clink->out_fifo, msg); + sys_mutex_unlock(router->lock); + + pn_link_offered(pn_outlink, DEQ_SIZE(clink->out_fifo)); + dx_link_activate(clink->link); + sys_mutex_unlock(router->lock); + + return; + } + + // + // We are performing Message-Routing, therefore we will need to validate the message + // through the Properties section so we can access the TO field. + // + dx_message_t *in_process_copy = 0; + dx_router_message_cb handler = 0; + void *handler_context = 0; + + valid_message = dx_message_check(msg, DX_DEPTH_PROPERTIES); + if (valid_message) { dx_field_iterator_t *iter = dx_message_field_iterator(msg, DX_FIELD_TO); dx_address_t *addr; if (iter) { dx_field_iterator_reset_view(iter, ITER_VIEW_ADDRESS_HASH); - sys_mutex_lock(router->lock); hash_retrieve(router->out_hash, iter, (void*) &addr); + dx_field_iterator_reset_view(iter, ITER_VIEW_NO_HOST); + int is_local = dx_field_iterator_prefix(iter, local_prefix); dx_field_iterator_free(iter); if (addr) { @@ -179,51 +230,71 @@ static void router_rx_handler(void* cont // To field is valid and contains a known destination. Handle the various // cases for forwarding. // - // Forward to the in-process handler for this message if there is one. 
- // Note: If the handler is going to queue the message for deferred processing, - // it must copy the message. This function assumes that the handler - // will process the message synchronously and be finished with it upon - // completion. - // - if (addr->handler) - addr->handler(addr->handler_context, msg); - - // - // Forward to the local link for the locally-connected consumer, if present. - // TODO - Don't forward if this is a "_local" address. - // - if (addr->rlink) { - pn_link_t *pn_outlink = dx_link_pn(addr->rlink->link); - dx_message_t *copy = dx_message_copy(msg); - DEQ_INSERT_TAIL(addr->rlink->out_fifo, copy); - pn_link_offered(pn_outlink, DEQ_SIZE(addr->rlink->out_fifo)); - dx_link_activate(addr->rlink->link); + + // + // Forward to the in-process handler for this message if there is one. The + // actual invocation of the handler will occur later after we've released + // the lock. + // + if (addr->handler) { + in_process_copy = dx_message_copy(msg); + handler = addr->handler; + handler_context = addr->handler_context; } // - // Forward to the next-hop for a remotely-connected consumer, if present. - // Don't forward if this is a "_local" address. + // If the address form is local (i.e. is prefixed by _local), don't forward + // outside of the router process. // - if (addr->rnode) { - // TODO + if (!is_local) { + // + // Forward to all of the local links receiving this address. + // + dx_router_link_t *dest_link = DEQ_HEAD(addr->rlinks); + while (dest_link) { + pn_link_t *pn_outlink = dx_link_pn(dest_link->link); + dx_message_t *copy = dx_message_copy(msg); + DEQ_INSERT_TAIL(dest_link->out_fifo, copy); + pn_link_offered(pn_outlink, DEQ_SIZE(dest_link->out_fifo)); + dx_link_activate(dest_link->link); + dest_link = DEQ_NEXT(dest_link); + } + + // + // Forward to the next-hops for remote destinations. 
+ // + dx_router_node_t *dest_node = DEQ_HEAD(addr->rnodes); + while (dest_node) { + if (dest_node->next_hop) + dest_link = dest_node->next_hop->peer_link; + else + dest_link = dest_node->peer_link; + if (dest_link) { + pn_link_t *pn_outlink = dx_link_pn(dest_link->link); + dx_message_t *copy = dx_message_copy(msg); + DEQ_INSERT_TAIL(dest_link->out_fifo, copy); + pn_link_offered(pn_outlink, DEQ_SIZE(dest_link->out_fifo)); + dx_link_activate(dest_link->link); + } + dest_node = DEQ_NEXT(dest_node); + } } } else { // // To field contains an unknown address. Release the message. // + // TODO - Undeliverable processing pn_delivery_update(delivery, PN_RELEASED); pn_delivery_settle(delivery); } - sys_mutex_unlock(router->lock); // TOINVESTIGATE Move this higher? - dx_free_message(msg); - // - // If this was a pre-settled delivery, we must also locally settle it. + // Since we are message-routing, there is no end-to-end disposition or + // settlement. Accept and settle the delivery now. // - if (pn_delivery_settled(delivery)) - pn_delivery_settle(delivery); + pn_delivery_update(delivery, PN_ACCEPTED); + pn_delivery_settle(delivery); } } else { // @@ -232,8 +303,16 @@ static void router_rx_handler(void* cont pn_delivery_update(delivery, PN_REJECTED); pn_delivery_settle(delivery); pn_delivery_set_context(delivery, 0); - dx_free_message(msg); } + + sys_mutex_unlock(router->lock); + dx_free_message(msg); + + // + // Invoke the in-process handler now that the lock is released. + // + if (handler) + handler(handler_context, in_process_copy); } @@ -242,7 +321,14 @@ static void router_rx_handler(void* cont */ static void router_disp_handler(void* context, dx_link_t *link, pn_delivery_t *delivery) { - pn_link_t *pn_link = pn_delivery_link(delivery); + pn_link_t *pn_link = pn_delivery_link(delivery); + //dx_router_link_t *rlink = (dx_router_link_t*) dx_link_get_context(link); + + // + // TODO - Propagate disposition and settlement between deliveries on a link-routed + // link pair. 
+ // + return; if (pn_link_is_sender(pn_link)) { uint64_t disp = pn_delivery_remote_state(delivery); @@ -290,25 +376,35 @@ static void router_disp_handler(void* co */ static int router_incoming_link_handler(void* context, dx_link_t *link) { - dx_router_t *router = (dx_router_t*) context; - dx_link_item_t *item = new_dx_link_item_t(); - pn_link_t *pn_link = dx_link_pn(link); - - if (item) { - DEQ_ITEM_INIT(item); - item->link = link; + dx_router_t *router = (dx_router_t*) context; + dx_router_link_t *rlink = new_dx_router_link_t(); + pn_link_t *pn_link = dx_link_pn(link); - sys_mutex_lock(router->lock); - DEQ_INSERT_TAIL(router->in_links, item); - sys_mutex_unlock(router->lock); + DEQ_ITEM_INIT(rlink); + rlink->link_direction = DX_INCOMING; + rlink->link_type = DX_LINK_ENDPOINT; + rlink->owning_addr = 0; + rlink->link = link; + rlink->connected_link = 0; + rlink->peer_link = 0; + DEQ_INIT(rlink->out_fifo); // Won't be used + + dx_link_set_context(link, rlink); + + sys_mutex_lock(router->lock); + DEQ_INSERT_TAIL(router->in_links, rlink); + sys_mutex_unlock(router->lock); + + pn_terminus_copy(pn_link_source(pn_link), pn_link_remote_source(pn_link)); + pn_terminus_copy(pn_link_target(pn_link), pn_link_remote_target(pn_link)); + pn_link_flow(pn_link, 1000); + pn_link_open(pn_link); + + // + // TODO - If the address has link-route semantics, create all associated + // links needed to go with this one. 
+ // - pn_terminus_copy(pn_link_source(pn_link), pn_link_remote_source(pn_link)); - pn_terminus_copy(pn_link_target(pn_link), pn_link_remote_target(pn_link)); - pn_link_flow(pn_link, 1000); - pn_link_open(pn_link); - } else { - pn_link_close(pn_link); - } return 0; } @@ -327,41 +423,44 @@ static int router_outgoing_link_handler( return 0; } - dx_router_link_t *rlink = new_dx_router_link_t(); - rlink->link = link; + dx_field_iterator_t *iter = dx_field_iterator_string(r_tgt, ITER_VIEW_NO_HOST); + dx_router_link_t *rlink = new_dx_router_link_t(); + + int is_router = dx_field_iterator_equal(iter, (unsigned char*) router_address); + + DEQ_ITEM_INIT(rlink); + rlink->link_direction = DX_OUTGOING; + rlink->link_type = is_router ? DX_LINK_ROUTER : DX_LINK_ENDPOINT; + rlink->link = link; + rlink->connected_link = 0; + rlink->peer_link = 0; DEQ_INIT(rlink->out_fifo); + dx_link_set_context(link, rlink); + dx_field_iterator_reset_view(iter, ITER_VIEW_ADDRESS_HASH); dx_address_t *addr; - dx_field_iterator_t *iter = dx_field_iterator_string(r_tgt, ITER_VIEW_ADDRESS_HASH); - sys_mutex_lock(router->lock); hash_retrieve(router->out_hash, iter, (void**) &addr); if (!addr) { addr = new_dx_address_t(); - addr->is_local = 0; addr->handler = 0; addr->handler_context = 0; - addr->rlink = 0; - addr->rnode = 0; + DEQ_INIT(addr->rlinks); + DEQ_INIT(addr->rnodes); hash_insert(router->out_hash, iter, addr); } dx_field_iterator_free(iter); - if (addr->rlink == 0) { - addr->rlink = rlink; - pn_terminus_copy(pn_link_source(pn_link), pn_link_remote_source(pn_link)); - pn_terminus_copy(pn_link_target(pn_link), pn_link_remote_target(pn_link)); - pn_link_open(pn_link); - sys_mutex_unlock(router->lock); - dx_log(module, LOG_TRACE, "Registered new local address: %s", r_tgt); - return 0; - } + rlink->owning_addr = addr; + DEQ_INSERT_TAIL(addr->rlinks, rlink); - dx_log(module, LOG_TRACE, "Address '%s' not registered as it already exists", r_tgt); - pn_link_close(pn_link); + 
pn_terminus_copy(pn_link_source(pn_link), pn_link_remote_source(pn_link)); + pn_terminus_copy(pn_link_target(pn_link), pn_link_remote_target(pn_link)); + pn_link_open(pn_link); sys_mutex_unlock(router->lock); + dx_log(module, LOG_TRACE, "Registered new local address: %s", r_tgt); return 0; } @@ -403,45 +502,36 @@ static int router_writable_link_handler( */ static int router_link_detach_handler(void* context, dx_link_t *link, int closed) { - dx_router_t *router = (dx_router_t*) context; - pn_link_t *pn_link = dx_link_pn(link); - const char *r_tgt = pn_terminus_get_address(pn_link_remote_target(pn_link)); - dx_link_item_t *item; + dx_router_t *router = (dx_router_t*) context; + pn_link_t *pn_link = dx_link_pn(link); + dx_router_link_t *rlink = (dx_router_link_t*) dx_link_get_context(link); + const char *r_tgt = pn_terminus_get_address(pn_link_remote_target(pn_link)); - if (!r_tgt) + if (!rlink) return 0; sys_mutex_lock(router->lock); if (pn_link_is_sender(pn_link)) { - item = DEQ_HEAD(router->out_links); + DEQ_REMOVE(rlink->owning_addr->rlinks, rlink); - dx_field_iterator_t *iter = dx_field_iterator_string(r_tgt, ITER_VIEW_ADDRESS_HASH); - dx_address_t *addr; - if (iter) { - hash_retrieve(router->out_hash, iter, (void**) &addr); - if (addr) { - hash_remove(router->out_hash, iter); - free_dx_router_link_t(addr->rlink); - free_dx_address_t(addr); - dx_log(module, LOG_TRACE, "Removed local address: %s", r_tgt); + if ((rlink->owning_addr->handler == 0) && + (DEQ_SIZE(rlink->owning_addr->rlinks) == 0) && + (DEQ_SIZE(rlink->owning_addr->rnodes) == 0)) { + dx_field_iterator_t *iter = dx_field_iterator_string(r_tgt, ITER_VIEW_ADDRESS_HASH); + dx_address_t *addr; + if (iter) { + hash_retrieve(router->out_hash, iter, (void**) &addr); + if (addr == rlink->owning_addr) { + hash_remove(router->out_hash, iter); + free_dx_router_link_t(rlink); + free_dx_address_t(addr); + dx_log(module, LOG_TRACE, "Removed local address: %s", r_tgt); + } + dx_field_iterator_free(iter); } - 
dx_field_iterator_free(iter); - } - } - else - item = DEQ_HEAD(router->in_links); - - while (item) { - if (item->link == link) { - if (pn_link_is_sender(pn_link)) - DEQ_REMOVE(router->out_links, item); - else - DEQ_REMOVE(router->in_links, item); - free_dx_link_item_t(item); - break; } - item = item->next; - } + } else + DEQ_REMOVE(router->in_links, rlink); sys_mutex_unlock(router->lock); return 0; @@ -455,6 +545,81 @@ static void router_inbound_open_handler( static void router_outbound_open_handler(void *type_context, dx_connection_t *conn) { + // TODO - Make sure this connection is annotated as an inter-router transport. + // Ignore otherwise + + dx_router_t *router = (dx_router_t*) type_context; + dx_field_iterator_t *aiter = dx_field_iterator_string(router_address, ITER_VIEW_ADDRESS_HASH); + dx_link_t *sender; + dx_link_t *receiver; + dx_router_link_t *rlink; + + // + // Create an incoming link and put it in the in-links collection. The address + // of the remote source of this link is '_local/qdxrouter'. + // + receiver = dx_link(router->node, conn, DX_INCOMING, "inter-router-rx"); + pn_terminus_set_address(dx_link_remote_source(receiver), router_address); + pn_terminus_set_address(dx_link_target(receiver), router_address); + + rlink = new_dx_router_link_t(); + + DEQ_ITEM_INIT(rlink); + rlink->link_direction = DX_INCOMING; + rlink->link_type = DX_LINK_ROUTER; + rlink->owning_addr = 0; + rlink->link = receiver; + rlink->connected_link = 0; + rlink->peer_link = 0; + DEQ_INIT(rlink->out_fifo); // Won't be used + + dx_link_set_context(receiver, rlink); + + sys_mutex_lock(router->lock); + DEQ_INSERT_TAIL(router->in_links, rlink); + sys_mutex_unlock(router->lock); + + // + // Create an outgoing link with a local source of '_local/qdxrouter' and place + // it in the routing table. 
+ // + sender = dx_link(router->node, conn, DX_OUTGOING, "inter-router-tx"); + pn_terminus_set_address(dx_link_remote_target(sender), router_address); + pn_terminus_set_address(dx_link_source(sender), router_address); + + rlink = new_dx_router_link_t(); + + DEQ_ITEM_INIT(rlink); + rlink->link_direction = DX_OUTGOING; + rlink->link_type = DX_LINK_ROUTER; + rlink->link = sender; + rlink->connected_link = 0; + rlink->peer_link = 0; + DEQ_INIT(rlink->out_fifo); + + dx_link_set_context(sender, rlink); + + dx_address_t *addr; + + sys_mutex_lock(router->lock); + hash_retrieve(router->out_hash, aiter, (void**) &addr); + if (!addr) { + addr = new_dx_address_t(); + addr->handler = 0; + addr->handler_context = 0; + DEQ_INIT(addr->rlinks); + DEQ_INIT(addr->rnodes); + hash_insert(router->out_hash, aiter, addr); + } + + rlink->owning_addr = addr; + DEQ_INSERT_TAIL(addr->rlinks, rlink); + sys_mutex_unlock(router->lock); + + pn_link_open(dx_link_pn(receiver)); + pn_link_open(dx_link_pn(sender)); + pn_link_flow(dx_link_pn(receiver), 1000); + dx_field_iterator_free(aiter); } @@ -494,22 +659,23 @@ dx_router_t *dx_router(dx_dispatch_t *dx } dx_router_t *router = NEW(dx_router_t); - dx_container_set_default_node_type(dx, &router_node, (void*) router, DX_DIST_BOTH); - DEQ_INIT(router->in_links); - DEQ_INIT(router->out_links); - DEQ_INIT(router->in_fifo); + router_node.type_context = router; router->dx = dx; - router->lock = sys_mutex(); router->router_area = area; router->router_id = id; + router->node = dx_container_set_default_node_type(dx, &router_node, (void*) router, DX_DIST_BOTH); + DEQ_INIT(router->in_links); + DEQ_INIT(router->routers); + DEQ_INIT(router->in_fifo); + router->lock = sys_mutex(); + router->timer = dx_timer(dx, dx_router_timer_handler, (void*) router); + router->out_hash = hash(10, 32, 0); + router->dtag = 1; + router->pyRouter = 0; + router->pyTick = 0; - router->timer = dx_timer(dx, dx_router_timer_handler, (void*) router); - - router->out_hash = hash(10, 32, 0); 
- router->dtag = 1; - router->pyRouter = 0; // // Inform the field iterator module of this router's id and area. The field iterator @@ -547,46 +713,44 @@ void dx_router_free(dx_router_t *router) dx_address_t *dx_router_register_address(dx_dispatch_t *dx, - bool is_local, const char *address, dx_router_message_cb handler, void *context) { - char addr[1000]; - dx_address_t *ad = new_dx_address_t(); + char addr_string[1000]; + dx_router_t *router = dx->router; + dx_address_t *addr; dx_field_iterator_t *iter; - int result; - if (!ad) - return 0; + strcpy(addr_string, "L"); // Local Hash-Key Space + strcat(addr_string, address); + iter = dx_field_iterator_string(addr_string, ITER_VIEW_NO_HOST); - ad->is_local = is_local; - ad->handler = handler; - ad->handler_context = context; - ad->rlink = 0; - - if (ad->is_local) - strcpy(addr, "L"); // Local Hash-Key Space - else - strcpy(addr, "M"); // Mobile Hash-Key Space - - strcat(addr, address); - iter = dx_field_iterator_string(addr, ITER_VIEW_NO_HOST); - result = hash_insert(dx->router->out_hash, iter, ad); - dx_field_iterator_free(iter); - if (result != 0) { - free_dx_address_t(ad); - return 0; + sys_mutex_lock(router->lock); + hash_retrieve(router->out_hash, iter, (void**) &addr); + if (!addr) { + addr = new_dx_address_t(); + addr->handler = 0; + addr->handler_context = 0; + DEQ_INIT(addr->rlinks); + DEQ_INIT(addr->rnodes); + hash_insert(router->out_hash, iter, addr); } + dx_field_iterator_free(iter); + + addr->handler = handler; + addr->handler_context = context; + + sys_mutex_unlock(router->lock); dx_log(module, LOG_TRACE, "In-Process Address Registered: %s", address); - return ad; + return addr; } void dx_router_unregister_address(dx_address_t *ad) { - free_dx_address_t(ad); + //free_dx_address_t(ad); } @@ -601,12 +765,36 @@ void dx_router_send(dx_dispatch_t sys_mutex_lock(router->lock); hash_retrieve(router->out_hash, address, (void*) &addr); if (addr) { - if (addr->rlink) { - pn_link_t *pn_outlink = 
dx_link_pn(addr->rlink->link); + // + // Forward to all of the local links receiving this address. + // + dx_router_link_t *dest_link = DEQ_HEAD(addr->rlinks); + while (dest_link) { + pn_link_t *pn_outlink = dx_link_pn(dest_link->link); dx_message_t *copy = dx_message_copy(msg); - DEQ_INSERT_TAIL(addr->rlink->out_fifo, copy); - pn_link_offered(pn_outlink, DEQ_SIZE(addr->rlink->out_fifo)); - dx_link_activate(addr->rlink->link); + DEQ_INSERT_TAIL(dest_link->out_fifo, copy); + pn_link_offered(pn_outlink, DEQ_SIZE(dest_link->out_fifo)); + dx_link_activate(dest_link->link); + dest_link = DEQ_NEXT(dest_link); + } + + // + // Forward to the next-hops for remote destinations. + // + dx_router_node_t *dest_node = DEQ_HEAD(addr->rnodes); + while (dest_node) { + if (dest_node->next_hop) + dest_link = dest_node->next_hop->peer_link; + else + dest_link = dest_node->peer_link; + if (dest_link) { + pn_link_t *pn_outlink = dx_link_pn(dest_link->link); + dx_message_t *copy = dx_message_copy(msg); + DEQ_INSERT_TAIL(dest_link->out_fifo, copy); + pn_link_offered(pn_outlink, DEQ_SIZE(dest_link->out_fifo)); + dx_link_activate(dest_link->link); + } + dest_node = DEQ_NEXT(dest_node); } } sys_mutex_unlock(router->lock); // TOINVESTIGATE Move this higher? 
Modified: qpid/trunk/qpid/extras/dispatch/src/server.c URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/src/server.c?rev=1502698&r1=1502697&r2=1502698&view=diff ============================================================================== --- qpid/trunk/qpid/extras/dispatch/src/server.c (original) +++ qpid/trunk/qpid/extras/dispatch/src/server.c Fri Jul 12 21:44:14 2013 @@ -91,26 +91,32 @@ static dx_thread_t *thread(dx_server_t * } -static void thread_process_listeners(pn_driver_t *driver) +static void thread_process_listeners(dx_server_t *dx_server) { + pn_driver_t *driver = dx_server->driver; pn_listener_t *listener = pn_driver_listener(driver); pn_connector_t *cxtr; dx_connection_t *ctx; while (listener) { - dx_log(module, LOG_TRACE, "Accepting Connection"); cxtr = pn_listener_accept(listener); + dx_log(module, LOG_TRACE, "Accepting Connection from %s", pn_connector_name(cxtr)); ctx = new_dx_connection_t(); ctx->state = CONN_STATE_OPENING; ctx->owner_thread = CONTEXT_NO_OWNER; ctx->enqueued = 0; ctx->pn_cxtr = cxtr; - ctx->pn_conn = 0; ctx->listener = (dx_listener_t*) pn_listener_context(listener); ctx->connector = 0; ctx->context = ctx->listener->context; ctx->ufd = 0; + pn_connection_t *conn = pn_connection(); + pn_connection_set_container(conn, dx_server->container_name); + pn_connector_set_connection(cxtr, conn); + pn_connection_set_context(conn, ctx); + ctx->pn_conn = conn; + // // Get a pointer to the transport so we can insert security components into it // @@ -201,20 +207,12 @@ static int process_connector(dx_server_t // Call the handler that is appropriate for the connector's state. 
// switch (ctx->state) { - case CONN_STATE_CONNECTING: - if (!pn_connector_closed(cxtr)) { - //ctx->state = CONN_STATE_SASL_CLIENT; - assert(ctx->connector); - ctx->connector->state = CXTR_STATE_OPEN; - events = 1; - } else { + case CONN_STATE_CONNECTING: { + if (pn_connector_closed(cxtr)) { ctx->state = CONN_STATE_FAILED; events = 0; + break; } - break; - - case CONN_STATE_OPENING: - ctx->state = CONN_STATE_OPERATIONAL; pn_connection_t *conn = pn_connection(); pn_connection_set_container(conn, dx_server->container_name); @@ -222,20 +220,71 @@ static int process_connector(dx_server_t pn_connection_set_context(conn, ctx); ctx->pn_conn = conn; - dx_conn_event_t ce = DX_CONN_EVENT_PROCESS; // Initialize to keep the compiler happy + pn_transport_t *tport = pn_connector_transport(cxtr); + const dx_server_config_t *config = ctx->connector->config; - if (ctx->listener) { - ce = DX_CONN_EVENT_LISTENER_OPEN; - } else if (ctx->connector) { - ce = DX_CONN_EVENT_CONNECTOR_OPEN; - ctx->connector->delay = 0; - } else - assert(0); + // + // Set up SSL if appropriate + // + if (config->ssl_enabled) { + pn_ssl_domain_t *domain = pn_ssl_domain(PN_SSL_MODE_CLIENT); + pn_ssl_domain_set_credentials(domain, + config->ssl_certificate_file, + config->ssl_private_key_file, + config->ssl_password); + + if (config->ssl_require_peer_authentication) + pn_ssl_domain_set_peer_authentication(domain, PN_SSL_VERIFY_PEER_NAME, config->ssl_trusted_certificate_db); + + pn_ssl_t *ssl = pn_ssl(tport); + pn_ssl_init(ssl, domain, 0); + pn_ssl_domain_free(domain); + } - dx_server->conn_handler(dx_server->conn_handler_context, - ctx->context, ce, (dx_connection_t*) pn_connector_context(cxtr)); + // + // Set up SASL + // + pn_sasl_t *sasl = pn_sasl(tport); + pn_sasl_mechanisms(sasl, config->sasl_mechanisms); + pn_sasl_client(sasl); + + ctx->state = CONN_STATE_OPENING; + assert(ctx->connector); + ctx->connector->state = CXTR_STATE_OPEN; events = 1; break; + } + + case CONN_STATE_OPENING: { + pn_transport_t 
*tport = pn_connector_transport(cxtr); + pn_sasl_t *sasl = pn_sasl(tport); + + if (pn_sasl_outcome(sasl) == PN_SASL_OK) { + ctx->state = CONN_STATE_OPERATIONAL; + + dx_conn_event_t ce = DX_CONN_EVENT_PROCESS; // Initialize to keep the compiler happy + + if (ctx->listener) { + ce = DX_CONN_EVENT_LISTENER_OPEN; + } else if (ctx->connector) { + ce = DX_CONN_EVENT_CONNECTOR_OPEN; + ctx->connector->delay = 0; + } else + assert(0); + + dx_server->conn_handler(dx_server->conn_handler_context, + ctx->context, ce, (dx_connection_t*) pn_connector_context(cxtr)); + events = 1; + break; + } + else if (pn_sasl_outcome(sasl) != PN_SASL_NONE) { + ctx->state = CONN_STATE_FAILED; + if (ctx->connector) { + const dx_server_config_t *config = ctx->connector->config; + dx_log(module, LOG_TRACE, "Connection to %s:%s failed", config->host, config->port); + } + } + } case CONN_STATE_OPERATIONAL: if (pn_connector_closed(cxtr)) { @@ -411,7 +460,7 @@ static void *thread_run(void *arg) // // Process listeners (incoming connections). // - thread_process_listeners(dx_server->driver); + thread_process_listeners(dx_server); // // Traverse the list of connectors-needing-service from the proton driver. --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org