This is an automated email from the ASF dual-hosted git repository. tross pushed a commit to branch dev-protocol-adaptors in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
commit 97fe8ff22182ba56678dc6b766479c43e49ce2d5 Author: Ted Ross <tr...@apache.org> AuthorDate: Mon Jun 1 16:40:53 2020 -0400 Dataplane: Re-factored direct-AMQP to use the protocol-adaptor interface. --- include/qpid/dispatch/protocol_adaptor.h | 133 ++++++++++++++++++++++++------- src/router_core/connections.c | 130 ++++++++++++------------------ src/router_core/router_core.c | 49 ++++++++++++ src/router_core/router_core_private.h | 57 +++++++------ src/router_core/transfer.c | 4 +- src/router_node.c | 46 +++++++---- 6 files changed, 267 insertions(+), 152 deletions(-) diff --git a/include/qpid/dispatch/protocol_adaptor.h b/include/qpid/dispatch/protocol_adaptor.h index 33916ab..9eb94c9 100644 --- a/include/qpid/dispatch/protocol_adaptor.h +++ b/include/qpid/dispatch/protocol_adaptor.h @@ -50,22 +50,73 @@ typedef struct qdr_connection_info_t qdr_connection_info_t; */ typedef void (*qdr_connection_activate_t) (void *context, qdr_connection_t *conn); -typedef void (*qdr_link_first_attach_t) (void *context, qdr_connection_t *conn, qdr_link_t *link, - qdr_terminus_t *source, qdr_terminus_t *target, - qd_session_class_t); +/** + * qdr_link_first_attach_t callback + */ +typedef void (*qdr_link_first_attach_t) (void *context, qdr_connection_t *conn, qdr_link_t *link, + qdr_terminus_t *source, qdr_terminus_t *target, + qd_session_class_t); + +/** + * qdr_link_second_attach_t callback + */ typedef void (*qdr_link_second_attach_t) (void *context, qdr_link_t *link, qdr_terminus_t *source, qdr_terminus_t *target); -typedef void (*qdr_link_detach_t) (void *context, qdr_link_t *link, qdr_error_t *error, bool first, bool close); -typedef void (*qdr_link_flow_t) (void *context, qdr_link_t *link, int credit); -typedef void (*qdr_link_offer_t) (void *context, qdr_link_t *link, int delivery_count); -typedef void (*qdr_link_drained_t) (void *context, qdr_link_t *link); -typedef void (*qdr_link_drain_t) (void *context, qdr_link_t *link, bool mode); -typedef int (*qdr_link_push_t) 
(void *context, qdr_link_t *link, int limit); -typedef uint64_t (*qdr_link_deliver_t) (void *context, qdr_link_t *link, qdr_delivery_t *delivery, bool settled); -typedef int (*qdr_link_get_credit_t) (void *context, qdr_link_t *link); -typedef void (*qdr_delivery_update_t) (void *context, qdr_delivery_t *dlv, uint64_t disp, bool settled); -typedef void (*qdr_connection_close_t) (void *context, qdr_connection_t *conn, qdr_error_t *error); -typedef void (*qdr_connection_trace_t) (void *context, qdr_connection_t *conn, bool trace); + +/** + * qdr_link_detach_t callback + */ +typedef void (*qdr_link_detach_t) (void *context, qdr_link_t *link, qdr_error_t *error, bool first, bool close); + +/** + * qdr_link_flow_t callback + */ +typedef void (*qdr_link_flow_t) (void *context, qdr_link_t *link, int credit); + +/** + * qdr_link_offer_t callback + */ +typedef void (*qdr_link_offer_t) (void *context, qdr_link_t *link, int delivery_count); + +/** + * qdr_link_drained_t callback + */ +typedef void (*qdr_link_drained_t) (void *context, qdr_link_t *link); + +/** + * qdr_link_drain_t callback + */ +typedef void (*qdr_link_drain_t) (void *context, qdr_link_t *link, bool mode); + +/** + * qdr_link_push_t callback + */ +typedef int (*qdr_link_push_t) (void *context, qdr_link_t *link, int limit); + +/** + * qdr_link_deliver_t callback + */ +typedef uint64_t (*qdr_link_deliver_t) (void *context, qdr_link_t *link, qdr_delivery_t *delivery, bool settled); + +/** + * qdr_link_get_credit_t callback + */ +typedef int (*qdr_link_get_credit_t) (void *context, qdr_link_t *link); + +/** + * qdr_delivery_update_t callback + */ +typedef void (*qdr_delivery_update_t) (void *context, qdr_delivery_t *dlv, uint64_t disp, bool settled); + +/** + * qdr_connection_close_t callback + */ +typedef void (*qdr_connection_close_t) (void *context, qdr_connection_t *conn, qdr_error_t *error); + +/** + * qdr_connection_trace_t callback + */ +typedef void (*qdr_connection_trace_t) (void *context, 
qdr_connection_t *conn, bool trace); /** @@ -74,6 +125,26 @@ typedef void (*qdr_connection_trace_t) (void *context, qdr_connection_t *conn, ****************************************************************************** */ +qdr_protocol_adaptor_t *qdr_protocol_adaptor(qdr_core_t *core, + const char *name, + void *context, + qdr_connection_activate_t activate, + qdr_link_first_attach_t first_attach, + qdr_link_second_attach_t second_attach, + qdr_link_detach_t detach, + qdr_link_flow_t flow, + qdr_link_offer_t offer, + qdr_link_drained_t drained, + qdr_link_drain_t drain, + qdr_link_push_t push, + qdr_link_deliver_t deliver, + qdr_link_get_credit_t get_credit, + qdr_delivery_update_t delivery_update, + qdr_connection_close_t conn_close, + qdr_connection_trace_t conn_trace); + +void qdr_protocol_adaptor_free(qdr_core_t *core, qdr_protocol_adaptor_t *adaptor); + /** ****************************************************************************** @@ -104,6 +175,7 @@ typedef void (*qdr_connection_bind_context_t) (qdr_connection_t *context, void* * Once a new connection has been both remotely and locally opened, the core must be notified. * * @param core Pointer to the core object + * @param protocol_adaptor Pointer to the protocol adaptor handling the connection * @param incoming True iff this connection is associated with a listener, False if a connector * @param role The configured role of this connection * @param cost If the role is inter_router, this is the configured cost for the connection. @@ -118,22 +190,23 @@ typedef void (*qdr_connection_bind_context_t) (qdr_connection_t *context, void* * @param vhost If non-null, this is the vhost of the connection to be used for multi-tenancy. * @return Pointer to a connection object that can be used to refer to this connection over its lifetime. 
*/ -qdr_connection_t *qdr_connection_opened(qdr_core_t *core, - bool incoming, - qdr_connection_role_t role, - int cost, - uint64_t management_id, - const char *label, - const char *remote_container_id, - bool strip_annotations_in, - bool strip_annotations_out, - bool policy_allow_dynamic_link_routes, - bool policy_allow_admin_status_update, - int link_capacity, - const char *vhost, - qdr_connection_info_t *connection_info, - qdr_connection_bind_context_t context_binder, - void* bind_token); +qdr_connection_t *qdr_connection_opened(qdr_core_t *core, + qdr_protocol_adaptor_t *protocol_adaptor, + bool incoming, + qdr_connection_role_t role, + int cost, + uint64_t management_id, + const char *label, + const char *remote_container_id, + bool strip_annotations_in, + bool strip_annotations_out, + bool policy_allow_dynamic_link_routes, + bool policy_allow_admin_status_update, + int link_capacity, + const char *vhost, + qdr_connection_info_t *connection_info, + qdr_connection_bind_context_t context_binder, + void *bind_token); /** * qdr_connection_closed diff --git a/src/router_core/connections.c b/src/router_core/connections.c index 2d05061..67ee1cb 100644 --- a/src/router_core/connections.c +++ b/src/router_core/connections.c @@ -67,27 +67,29 @@ qdr_terminus_t *qdr_terminus_router_data(void) // Interface Functions //================================================================================== -qdr_connection_t *qdr_connection_opened(qdr_core_t *core, - bool incoming, - qdr_connection_role_t role, - int cost, - uint64_t management_id, - const char *label, - const char *remote_container_id, - bool strip_annotations_in, - bool strip_annotations_out, - bool policy_allow_dynamic_link_routes, - bool policy_allow_admin_status_update, - int link_capacity, - const char *vhost, - qdr_connection_info_t *connection_info, +qdr_connection_t *qdr_connection_opened(qdr_core_t *core, + qdr_protocol_adaptor_t *protocol_adaptor, + bool incoming, + qdr_connection_role_t role, + int 
cost, + uint64_t management_id, + const char *label, + const char *remote_container_id, + bool strip_annotations_in, + bool strip_annotations_out, + bool policy_allow_dynamic_link_routes, + bool policy_allow_admin_status_update, + int link_capacity, + const char *vhost, + qdr_connection_info_t *connection_info, qdr_connection_bind_context_t context_binder, - void *bind_token) + void *bind_token) { qdr_action_t *action = qdr_action(qdr_connection_opened_CT, "connection_opened"); qdr_connection_t *conn = new_qdr_connection_t(); ZERO(conn); + conn->protocol_adaptor = protocol_adaptor; conn->identity = management_id; conn->connection_info = connection_info; conn->core = core; @@ -236,26 +238,28 @@ void qdr_record_link_credit(qdr_core_t *core, qdr_link_t *link) // // Get Proton's view of this link's available credit. // - int pn_credit = core->get_credit_handler(core->user_context, link); + if (link && link->conn && link->conn->protocol_adaptor) { + int pn_credit = link->conn->protocol_adaptor->get_credit_handler(link->conn->protocol_adaptor->user_context, link); - if (link->credit_reported > 0 && pn_credit == 0) { - // - // The link has transitioned from positive credit to zero credit. - // - link->zero_credit_time = core->uptime_ticks; - } else if (link->credit_reported == 0 && pn_credit > 0) { - // - // The link has transitioned from zero credit to positive credit. - // Clear the recorded time. - // - link->zero_credit_time = 0; - if (link->reported_as_blocked) { - link->reported_as_blocked = false; - core->links_blocked--; + if (link->credit_reported > 0 && pn_credit == 0) { + // + // The link has transitioned from positive credit to zero credit. + // + link->zero_credit_time = core->uptime_ticks; + } else if (link->credit_reported == 0 && pn_credit > 0) { + // + // The link has transitioned from zero credit to positive credit. + // Clear the recorded time. 
+ // + link->zero_credit_time = 0; + if (link->reported_as_blocked) { + link->reported_as_blocked = false; + core->links_blocked--; + } } - } - link->credit_reported = pn_credit; + link->credit_reported = pn_credit; + } } @@ -273,7 +277,7 @@ int qdr_connection_process(qdr_connection_t *conn) int event_count = 0; if (conn->closed) { - core->conn_close_handler(core->user_context, conn, conn->error); + conn->protocol_adaptor->conn_close_handler(conn->protocol_adaptor->user_context, conn, conn->error); return 0; } @@ -302,19 +306,19 @@ int qdr_connection_process(qdr_connection_t *conn) switch (work->work_type) { case QDR_CONNECTION_WORK_FIRST_ATTACH : - core->first_attach_handler(core->user_context, conn, work->link, work->source, work->target, work->ssn_class); + conn->protocol_adaptor->first_attach_handler(conn->protocol_adaptor->user_context, conn, work->link, work->source, work->target, work->ssn_class); break; case QDR_CONNECTION_WORK_SECOND_ATTACH : - core->second_attach_handler(core->user_context, work->link, work->source, work->target); + conn->protocol_adaptor->second_attach_handler(conn->protocol_adaptor->user_context, work->link, work->source, work->target); break; case QDR_CONNECTION_WORK_TRACING_ON : - core->conn_trace_handler(core->user_context, conn, true); + conn->protocol_adaptor->conn_trace_handler(conn->protocol_adaptor->user_context, conn, true); break; case QDR_CONNECTION_WORK_TRACING_OFF : - core->conn_trace_handler(core->user_context, conn, false); + conn->protocol_adaptor->conn_trace_handler(conn->protocol_adaptor->user_context, conn, false); break; } @@ -353,7 +357,7 @@ int qdr_connection_process(qdr_connection_t *conn) qdr_delivery_ref_t *dref = DEQ_HEAD(updated_deliveries); while (dref) { - core->delivery_update_handler(core->user_context, dref->dlv, dref->dlv->disposition, dref->dlv->settled); + conn->protocol_adaptor->delivery_update_handler(conn->protocol_adaptor->user_context, dref->dlv, dref->dlv->disposition, dref->dlv->settled); 
qdr_delivery_decref(core, dref->dlv, "qdr_connection_process - remove from updated list"); qdr_del_delivery_ref(&updated_deliveries, dref); dref = DEQ_HEAD(updated_deliveries); @@ -364,7 +368,7 @@ int qdr_connection_process(qdr_connection_t *conn) switch (link_work->work_type) { case QDR_LINK_WORK_DELIVERY : { - int count = core->push_handler(core->user_context, link, link_work->value); + int count = conn->protocol_adaptor->push_handler(conn->protocol_adaptor->user_context, link, link_work->value); assert(count <= link_work->value); link_work->value -= count; break; @@ -372,20 +376,20 @@ int qdr_connection_process(qdr_connection_t *conn) case QDR_LINK_WORK_FLOW : if (link_work->value > 0) - core->flow_handler(core->user_context, link, link_work->value); + conn->protocol_adaptor->flow_handler(conn->protocol_adaptor->user_context, link, link_work->value); if (link_work->drain_action == QDR_LINK_WORK_DRAIN_ACTION_SET) - core->drain_handler(core->user_context, link, true); + conn->protocol_adaptor->drain_handler(conn->protocol_adaptor->user_context, link, true); else if (link_work->drain_action == QDR_LINK_WORK_DRAIN_ACTION_CLEAR) - core->drain_handler(core->user_context, link, false); + conn->protocol_adaptor->drain_handler(conn->protocol_adaptor->user_context, link, false); else if (link_work->drain_action == QDR_LINK_WORK_DRAIN_ACTION_DRAINED) - core->drained_handler(core->user_context, link); + conn->protocol_adaptor->drained_handler(conn->protocol_adaptor->user_context, link); break; case QDR_LINK_WORK_FIRST_DETACH : case QDR_LINK_WORK_SECOND_DETACH : - core->detach_handler(core->user_context, link, link_work->error, - link_work->work_type == QDR_LINK_WORK_FIRST_DETACH, - link_work->close_link); + conn->protocol_adaptor->detach_handler(conn->protocol_adaptor->user_context, link, link_work->error, + link_work->work_type == QDR_LINK_WORK_FIRST_DETACH, + link_work->close_link); detach_sent = true; break; } @@ -632,40 +636,6 @@ static void 
qdr_link_processing_complete(qdr_core_t *core, qdr_link_t *link) -void qdr_connection_handlers(qdr_core_t *core, - void *context, - qdr_connection_activate_t activate, - qdr_link_first_attach_t first_attach, - qdr_link_second_attach_t second_attach, - qdr_link_detach_t detach, - qdr_link_flow_t flow, - qdr_link_offer_t offer, - qdr_link_drained_t drained, - qdr_link_drain_t drain, - qdr_link_push_t push, - qdr_link_deliver_t deliver, - qdr_link_get_credit_t get_credit, - qdr_delivery_update_t delivery_update, - qdr_connection_close_t conn_close, - qdr_connection_trace_t conn_trace) -{ - core->user_context = context; - core->first_attach_handler = first_attach; - core->second_attach_handler = second_attach; - core->detach_handler = detach; - core->flow_handler = flow; - core->offer_handler = offer; - core->drained_handler = drained; - core->drain_handler = drain; - core->push_handler = push; - core->deliver_handler = deliver; - core->get_credit_handler = get_credit; - core->delivery_update_handler = delivery_update; - core->conn_close_handler = conn_close; - core->conn_trace_handler = conn_trace; -} - - //================================================================================== // In-Thread Functions //================================================================================== diff --git a/src/router_core/router_core.c b/src/router_core/router_core.c index d2d24d9..08d2b7d 100644 --- a/src/router_core/router_core.c +++ b/src/router_core/router_core.c @@ -908,6 +908,7 @@ static void qdr_global_stats_request_CT(qdr_core_t *core, qdr_action_t *action, qdr_post_general_work_CT(core, work); } + void qdr_request_global_stats(qdr_core_t *core, qdr_global_stats_t *stats, qdr_global_stats_handler_t callback, void *context) { qdr_action_t *action = qdr_action(qdr_global_stats_request_CT, "global_stats_request"); @@ -917,3 +918,51 @@ void qdr_request_global_stats(qdr_core_t *core, qdr_global_stats_t *stats, qdr_g qdr_action_enqueue(core, action); } + 
+qdr_protocol_adaptor_t *qdr_protocol_adaptor(qdr_core_t *core, + const char *name, + void *context, + qdr_connection_activate_t activate, + qdr_link_first_attach_t first_attach, + qdr_link_second_attach_t second_attach, + qdr_link_detach_t detach, + qdr_link_flow_t flow, + qdr_link_offer_t offer, + qdr_link_drained_t drained, + qdr_link_drain_t drain, + qdr_link_push_t push, + qdr_link_deliver_t deliver, + qdr_link_get_credit_t get_credit, + qdr_delivery_update_t delivery_update, + qdr_connection_close_t conn_close, + qdr_connection_trace_t conn_trace) +{ + qdr_protocol_adaptor_t *adaptor = NEW(qdr_protocol_adaptor_t); + + DEQ_ITEM_INIT(adaptor); + adaptor->name = name; + adaptor->user_context = context; + adaptor->first_attach_handler = first_attach; + adaptor->second_attach_handler = second_attach; + adaptor->detach_handler = detach; + adaptor->flow_handler = flow; + adaptor->offer_handler = offer; + adaptor->drained_handler = drained; + adaptor->drain_handler = drain; + adaptor->push_handler = push; + adaptor->deliver_handler = deliver; + adaptor->get_credit_handler = get_credit; + adaptor->delivery_update_handler = delivery_update; + adaptor->conn_close_handler = conn_close; + adaptor->conn_trace_handler = conn_trace; + + DEQ_INSERT_TAIL(core->protocol_adaptors, adaptor); + return adaptor; +} + + +void qdr_protocol_adaptor_free(qdr_core_t *core, qdr_protocol_adaptor_t *adaptor) +{ + DEQ_REMOVE(core->protocol_adaptors, adaptor); + free(adaptor); +} diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h index 49d9650..f91ac9f 100644 --- a/src/router_core/router_core_private.h +++ b/src/router_core/router_core_private.h @@ -656,6 +656,7 @@ typedef enum { struct qdr_connection_t { DEQ_LINKS(qdr_connection_t); DEQ_LINKS_N(ACTIVATE, qdr_connection_t); + qdr_protocol_adaptor_t *protocol_adaptor; uint64_t identity; qdr_core_t *core; bool incoming; @@ -784,6 +785,33 @@ typedef struct qdr_priority_sheaf_t { int count; } 
qdr_priority_sheaf_t; + +struct qdr_protocol_adaptor_t { + DEQ_LINKS(qdr_protocol_adaptor_t); + const char *name; + + // + // Callbacks + // + void *user_context; + qdr_link_first_attach_t first_attach_handler; + qdr_link_second_attach_t second_attach_handler; + qdr_link_detach_t detach_handler; + qdr_link_flow_t flow_handler; + qdr_link_offer_t offer_handler; + qdr_link_drained_t drained_handler; + qdr_link_drain_t drain_handler; + qdr_link_push_t push_handler; + qdr_link_deliver_t deliver_handler; + qdr_link_get_credit_t get_credit_handler; + qdr_delivery_update_t delivery_update_handler; + qdr_connection_close_t conn_close_handler; + qdr_connection_trace_t conn_trace_handler; +}; + +DEQ_DECLARE(qdr_protocol_adaptor_t, qdr_protocol_adaptor_list_t); + + struct qdr_core_t { qd_dispatch_t *qd; qd_log_source_t *log; @@ -801,11 +829,12 @@ struct qdr_core_t { qd_timer_t *work_timer; uint32_t uptime_ticks; - qdr_connection_list_t open_connections; - qdr_connection_t *active_edge_connection; - qdr_connection_list_t connections_to_activate; - qdr_link_list_t open_links; - qdr_connection_ref_list_t streaming_connections; + qdr_protocol_adaptor_list_t protocol_adaptors; + qdr_connection_list_t open_connections; + qdr_connection_t *active_edge_connection; + qdr_connection_list_t connections_to_activate; + qdr_link_list_t open_links; + qdr_connection_ref_list_t streaming_connections; qdrc_attach_addr_lookup_t addr_lookup_handler; void *addr_lookup_context; @@ -821,24 +850,6 @@ struct qdr_core_t { qdr_link_lost_t rt_link_lost; // - // Connection section - // - void *user_context; - qdr_link_first_attach_t first_attach_handler; - qdr_link_second_attach_t second_attach_handler; - qdr_link_detach_t detach_handler; - qdr_link_flow_t flow_handler; - qdr_link_offer_t offer_handler; - qdr_link_drained_t drained_handler; - qdr_link_drain_t drain_handler; - qdr_link_push_t push_handler; - qdr_link_deliver_t deliver_handler; - qdr_link_get_credit_t get_credit_handler; - 
qdr_delivery_update_t delivery_update_handler; - qdr_connection_close_t conn_close_handler; - qdr_connection_trace_t conn_trace_handler; - - // // Events section // qdrc_event_subscription_list_t conn_event_subscriptions; diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c index 7d7aa9c..add176a 100644 --- a/src/router_core/transfer.c +++ b/src/router_core/transfer.c @@ -163,7 +163,7 @@ int qdr_link_process_deliveries(qdr_core_t *core, qdr_link_t *link, int credit) do { settled = dlv->settled; sys_mutex_unlock(conn->work_lock); - new_disp = core->deliver_handler(core->user_context, link, dlv, settled); + new_disp = conn->protocol_adaptor->deliver_handler(conn->protocol_adaptor->user_context, link, dlv, settled); sys_mutex_lock(conn->work_lock); } while (settled != dlv->settled); // oops missed the settlement send_complete = qdr_delivery_send_complete(dlv); @@ -238,7 +238,7 @@ int qdr_link_process_deliveries(qdr_core_t *core, qdr_link_t *link, int credit) } if (offer != -1) - core->offer_handler(core->user_context, link, offer); + conn->protocol_adaptor->offer_handler(conn->protocol_adaptor->user_context, link, offer); } return num_deliveries_completed; diff --git a/src/router_node.c b/src/router_node.c index ce59329..a5278dc 100644 --- a/src/router_node.c +++ b/src/router_node.c @@ -37,6 +37,8 @@ const char *QD_ROUTER_NODE_TYPE = "router.node"; const char *QD_ROUTER_ADDRESS_TYPE = "router.address"; const char *QD_ROUTER_LINK_TYPE = "router.link"; +static qdr_protocol_adaptor_t *amqp_direct_adaptor = 0; + static char *router_role = "inter-router"; static char *container_role = "route-container"; static char *edge_role = "edge"; @@ -1199,7 +1201,13 @@ static void AMQP_opened_handler(qd_router_t *router, qd_connection_t *conn, bool is_ssl, (rversion_found) ? 
&rversion : 0); - qdr_connection_opened(router->router_core, inbound, role, cost, connection_id, name, + qdr_connection_opened(router->router_core, + amqp_direct_adaptor, + inbound, + role, + cost, + connection_id, + name, pn_connection_remote_container(pn_conn), conn->strip_annotations_in, conn->strip_annotations_out, @@ -1208,7 +1216,8 @@ static void AMQP_opened_handler(qd_router_t *router, qd_connection_t *conn, bool link_capacity, vhost, connection_info, - bind_connection_context, conn); + bind_connection_context, + conn); char props_str[1000]; size_t props_len = 1000; @@ -1968,21 +1977,23 @@ void qd_router_setup_late(qd_dispatch_t *qd) qd->router->tracemask = qd_tracemask(); qd->router->router_core = qdr_core(qd, qd->router->router_mode, qd->router->router_area, qd->router->router_id); - qdr_connection_handlers(qd->router->router_core, (void*) qd->router, - CORE_connection_activate, - CORE_link_first_attach, - CORE_link_second_attach, - CORE_link_detach, - CORE_link_flow, - CORE_link_offer, - CORE_link_drained, - CORE_link_drain, - CORE_link_push, - CORE_link_deliver, - CORE_link_get_credit, - CORE_delivery_update, - CORE_close_connection, - CORE_conn_trace); + amqp_direct_adaptor = qdr_protocol_adaptor(qd->router->router_core, + "amqp-direct", + (void*) qd->router, + CORE_connection_activate, + CORE_link_first_attach, + CORE_link_second_attach, + CORE_link_detach, + CORE_link_flow, + CORE_link_offer, + CORE_link_drained, + CORE_link_drain, + CORE_link_push, + CORE_link_deliver, + CORE_link_get_credit, + CORE_delivery_update, + CORE_close_connection, + CORE_conn_trace); qd_router_python_setup(qd->router); qd_timer_schedule(qd->router->timer, 1000); @@ -1994,6 +2005,7 @@ void qd_router_free(qd_router_t *router) qd_container_set_default_node_type(router->qd, 0, 0, QD_DIST_BOTH); + qdr_protocol_adaptor_free(router->router_core, amqp_direct_adaptor); qdr_core_free(router->router_core); qd_tracemask_free(router->tracemask); qd_timer_free(router->timer); 
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org