This is an automated email from the ASF dual-hosted git repository.

tross pushed a commit to branch dev-protocol-adaptors
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git

commit 97fe8ff22182ba56678dc6b766479c43e49ce2d5
Author: Ted Ross <tr...@apache.org>
AuthorDate: Mon Jun 1 16:40:53 2020 -0400

    Dataplane: Re-factored direct-AMQP to use the protocol-adaptor interface.
---
 include/qpid/dispatch/protocol_adaptor.h | 133 ++++++++++++++++++++++++-------
 src/router_core/connections.c            | 130 ++++++++++++------------------
 src/router_core/router_core.c            |  49 ++++++++++++
 src/router_core/router_core_private.h    |  57 +++++++------
 src/router_core/transfer.c               |   4 +-
 src/router_node.c                        |  46 +++++++----
 6 files changed, 267 insertions(+), 152 deletions(-)

diff --git a/include/qpid/dispatch/protocol_adaptor.h 
b/include/qpid/dispatch/protocol_adaptor.h
index 33916ab..9eb94c9 100644
--- a/include/qpid/dispatch/protocol_adaptor.h
+++ b/include/qpid/dispatch/protocol_adaptor.h
@@ -50,22 +50,73 @@ typedef struct qdr_connection_info_t   
qdr_connection_info_t;
  */
 typedef void (*qdr_connection_activate_t) (void *context, qdr_connection_t 
*conn);
 
-typedef void (*qdr_link_first_attach_t)  (void *context, qdr_connection_t 
*conn, qdr_link_t *link,
-                                          qdr_terminus_t *source, 
qdr_terminus_t *target,
-                                          qd_session_class_t);
+/**
+ * qdr_link_first_attach_t callback
+ */
+typedef void (*qdr_link_first_attach_t) (void *context, qdr_connection_t 
*conn, qdr_link_t *link,
+                                         qdr_terminus_t *source, 
qdr_terminus_t *target,
+                                         qd_session_class_t);
+
+/**
+ * qdr_link_second_attach_t callback
+ */
 typedef void (*qdr_link_second_attach_t) (void *context, qdr_link_t *link,
                                           qdr_terminus_t *source, 
qdr_terminus_t *target);
-typedef void (*qdr_link_detach_t)        (void *context, qdr_link_t *link, 
qdr_error_t *error, bool first, bool close);
-typedef void (*qdr_link_flow_t)          (void *context, qdr_link_t *link, int 
credit);
-typedef void (*qdr_link_offer_t)         (void *context, qdr_link_t *link, int 
delivery_count);
-typedef void (*qdr_link_drained_t)       (void *context, qdr_link_t *link);
-typedef void (*qdr_link_drain_t)         (void *context, qdr_link_t *link, 
bool mode);
-typedef int  (*qdr_link_push_t)          (void *context, qdr_link_t *link, int 
limit);
-typedef uint64_t (*qdr_link_deliver_t)   (void *context, qdr_link_t *link, 
qdr_delivery_t *delivery, bool settled);
-typedef int (*qdr_link_get_credit_t)     (void *context, qdr_link_t *link);
-typedef void (*qdr_delivery_update_t)    (void *context, qdr_delivery_t *dlv, 
uint64_t disp, bool settled);
-typedef void (*qdr_connection_close_t)   (void *context, qdr_connection_t 
*conn, qdr_error_t *error);
-typedef void (*qdr_connection_trace_t)   (void *context, qdr_connection_t 
*conn, bool trace);
+
+/**
+ * qdr_link_detach_t callback
+ */
+typedef void (*qdr_link_detach_t) (void *context, qdr_link_t *link, 
qdr_error_t *error, bool first, bool close);
+
+/**
+ * qdr_link_flow_t callback
+ */
+typedef void (*qdr_link_flow_t) (void *context, qdr_link_t *link, int credit);
+
+/**
+ * qdr_link_offer_t callback
+ */
+typedef void (*qdr_link_offer_t) (void *context, qdr_link_t *link, int 
delivery_count);
+
+/**
+ * qdr_link_drained_t callback
+ */
+typedef void (*qdr_link_drained_t) (void *context, qdr_link_t *link);
+
+/**
+ * qdr_link_drain_t callback
+ */
+typedef void (*qdr_link_drain_t) (void *context, qdr_link_t *link, bool mode);
+
+/**
+ * qdr_link_push_t callback
+ */
+typedef int  (*qdr_link_push_t) (void *context, qdr_link_t *link, int limit);
+
+/**
+ * qdr_link_deliver_t callback
+ */
+typedef uint64_t (*qdr_link_deliver_t) (void *context, qdr_link_t *link, 
qdr_delivery_t *delivery, bool settled);
+
+/**
+ * qdr_link_get_credit_t callback
+ */
+typedef int (*qdr_link_get_credit_t) (void *context, qdr_link_t *link);
+
+/**
+ * qdr_delivery_update_t callback
+ */
+typedef void (*qdr_delivery_update_t) (void *context, qdr_delivery_t *dlv, 
uint64_t disp, bool settled);
+
+/**
+ * qdr_connection_close_t callback
+ */
+typedef void (*qdr_connection_close_t) (void *context, qdr_connection_t *conn, 
qdr_error_t *error);
+
+/**
+ * qdr_connection_trace_t callback
+ */
+typedef void (*qdr_connection_trace_t) (void *context, qdr_connection_t *conn, 
bool trace);
 
 
 /**
@@ -74,6 +125,26 @@ typedef void (*qdr_connection_trace_t)   (void *context, 
qdr_connection_t *conn,
  ******************************************************************************
  */
 
+qdr_protocol_adaptor_t *qdr_protocol_adaptor(qdr_core_t                *core,
+                                             const char                *name,
+                                             void                      
*context,
+                                             qdr_connection_activate_t  
activate,
+                                             qdr_link_first_attach_t    
first_attach,
+                                             qdr_link_second_attach_t   
second_attach,
+                                             qdr_link_detach_t          detach,
+                                             qdr_link_flow_t            flow,
+                                             qdr_link_offer_t           offer,
+                                             qdr_link_drained_t         
drained,
+                                             qdr_link_drain_t           drain,
+                                             qdr_link_push_t            push,
+                                             qdr_link_deliver_t         
deliver,
+                                             qdr_link_get_credit_t      
get_credit,
+                                             qdr_delivery_update_t      
delivery_update,
+                                             qdr_connection_close_t     
conn_close,
+                                             qdr_connection_trace_t     
conn_trace);
+
+void qdr_protocol_adaptor_free(qdr_core_t *core, qdr_protocol_adaptor_t 
*adaptor);
+
 
 /**
  ******************************************************************************
@@ -104,6 +175,7 @@ typedef void (*qdr_connection_bind_context_t) 
(qdr_connection_t *context, void*
  * Once a new connection has been both remotely and locally opened, the core 
must be notified.
  *
  * @param core Pointer to the core object
+ * @param protocol_adaptor Pointer to the protocol adaptor handling the 
connection
  * @param incoming True iff this connection is associated with a listener, 
False if a connector
  * @param role The configured role of this connection
  * @param cost If the role is inter_router, this is the configured cost for 
the connection.
@@ -118,22 +190,23 @@ typedef void (*qdr_connection_bind_context_t) 
(qdr_connection_t *context, void*
  * @param vhost If non-null, this is the vhost of the connection to be used 
for multi-tenancy.
  * @return Pointer to a connection object that can be used to refer to this 
connection over its lifetime.
  */
-qdr_connection_t *qdr_connection_opened(qdr_core_t            *core,
-                                        bool                   incoming,
-                                        qdr_connection_role_t  role,
-                                        int                    cost,
-                                        uint64_t               management_id,
-                                        const char            *label,
-                                        const char            
*remote_container_id,
-                                        bool                   
strip_annotations_in,
-                                        bool                   
strip_annotations_out,
-                                        bool                   
policy_allow_dynamic_link_routes,
-                                        bool                   
policy_allow_admin_status_update,
-                                        int                    link_capacity,
-                                        const char            *vhost,
-                                        qdr_connection_info_t *connection_info,
-                                        qdr_connection_bind_context_t 
context_binder,
-                                        void* bind_token);
+qdr_connection_t *qdr_connection_opened(qdr_core_t                    *core,
+                                        qdr_protocol_adaptor_t        
*protocol_adaptor,
+                                        bool                           
incoming,
+                                        qdr_connection_role_t          role,
+                                        int                            cost,
+                                        uint64_t                       
management_id,
+                                        const char                    *label,
+                                        const char                    
*remote_container_id,
+                                        bool                           
strip_annotations_in,
+                                        bool                           
strip_annotations_out,
+                                        bool                           
policy_allow_dynamic_link_routes,
+                                        bool                           
policy_allow_admin_status_update,
+                                        int                            
link_capacity,
+                                        const char                    *vhost,
+                                        qdr_connection_info_t         
*connection_info,
+                                        qdr_connection_bind_context_t  
context_binder,
+                                        void                          
*bind_token);
 
 /**
  * qdr_connection_closed
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index 2d05061..67ee1cb 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -67,27 +67,29 @@ qdr_terminus_t *qdr_terminus_router_data(void)
 // Interface Functions
 
//==================================================================================
 
-qdr_connection_t *qdr_connection_opened(qdr_core_t            *core,
-                                        bool                   incoming,
-                                        qdr_connection_role_t  role,
-                                        int                    cost,
-                                        uint64_t               management_id,
-                                        const char            *label,
-                                        const char            
*remote_container_id,
-                                        bool                   
strip_annotations_in,
-                                        bool                   
strip_annotations_out,
-                                        bool                   
policy_allow_dynamic_link_routes,
-                                        bool                   
policy_allow_admin_status_update,
-                                        int                    link_capacity,
-                                        const char            *vhost,
-                                        qdr_connection_info_t *connection_info,
+qdr_connection_t *qdr_connection_opened(qdr_core_t                   *core,
+                                        qdr_protocol_adaptor_t       
*protocol_adaptor,
+                                        bool                          incoming,
+                                        qdr_connection_role_t         role,
+                                        int                           cost,
+                                        uint64_t                      
management_id,
+                                        const char                   *label,
+                                        const char                   
*remote_container_id,
+                                        bool                          
strip_annotations_in,
+                                        bool                          
strip_annotations_out,
+                                        bool                          
policy_allow_dynamic_link_routes,
+                                        bool                          
policy_allow_admin_status_update,
+                                        int                           
link_capacity,
+                                        const char                   *vhost,
+                                        qdr_connection_info_t        
*connection_info,
                                         qdr_connection_bind_context_t 
context_binder,
-                                        void                  *bind_token)
+                                        void                         
*bind_token)
 {
     qdr_action_t     *action = qdr_action(qdr_connection_opened_CT, 
"connection_opened");
     qdr_connection_t *conn   = new_qdr_connection_t();
 
     ZERO(conn);
+    conn->protocol_adaptor      = protocol_adaptor;
     conn->identity              = management_id;
     conn->connection_info       = connection_info;
     conn->core                  = core;
@@ -236,26 +238,28 @@ void qdr_record_link_credit(qdr_core_t *core, qdr_link_t 
*link)
     //
     // Get Proton's view of this link's available credit.
     //
-    int pn_credit = core->get_credit_handler(core->user_context, link);
+    if (link && link->conn && link->conn->protocol_adaptor) {
+        int pn_credit = 
link->conn->protocol_adaptor->get_credit_handler(link->conn->protocol_adaptor->user_context,
 link);
 
-    if (link->credit_reported > 0 && pn_credit == 0) {
-        //
-        // The link has transitioned from positive credit to zero credit.
-        //
-        link->zero_credit_time = core->uptime_ticks;
-    } else if (link->credit_reported == 0 && pn_credit > 0) {
-        //
-        // The link has transitioned from zero credit to positive credit.
-        // Clear the recorded time.
-        //
-        link->zero_credit_time = 0;
-        if (link->reported_as_blocked) {
-            link->reported_as_blocked = false;
-            core->links_blocked--;
+        if (link->credit_reported > 0 && pn_credit == 0) {
+            //
+            // The link has transitioned from positive credit to zero credit.
+            //
+            link->zero_credit_time = core->uptime_ticks;
+        } else if (link->credit_reported == 0 && pn_credit > 0) {
+            //
+            // The link has transitioned from zero credit to positive credit.
+            // Clear the recorded time.
+            //
+            link->zero_credit_time = 0;
+            if (link->reported_as_blocked) {
+                link->reported_as_blocked = false;
+                core->links_blocked--;
+            }
         }
-    }
 
-    link->credit_reported = pn_credit;
+        link->credit_reported = pn_credit;
+    }
 }
 
 
@@ -273,7 +277,7 @@ int qdr_connection_process(qdr_connection_t *conn)
     int event_count = 0;
 
     if (conn->closed) {
-        core->conn_close_handler(core->user_context, conn, conn->error);
+        
conn->protocol_adaptor->conn_close_handler(conn->protocol_adaptor->user_context,
 conn, conn->error);
         return 0;
     }
 
@@ -302,19 +306,19 @@ int qdr_connection_process(qdr_connection_t *conn)
 
         switch (work->work_type) {
         case QDR_CONNECTION_WORK_FIRST_ATTACH :
-            core->first_attach_handler(core->user_context, conn, work->link, 
work->source, work->target, work->ssn_class);
+            
conn->protocol_adaptor->first_attach_handler(conn->protocol_adaptor->user_context,
 conn, work->link, work->source, work->target, work->ssn_class);
             break;
 
         case QDR_CONNECTION_WORK_SECOND_ATTACH :
-            core->second_attach_handler(core->user_context, work->link, 
work->source, work->target);
+            
conn->protocol_adaptor->second_attach_handler(conn->protocol_adaptor->user_context,
 work->link, work->source, work->target);
             break;
 
         case QDR_CONNECTION_WORK_TRACING_ON :
-            core->conn_trace_handler(core->user_context, conn, true);
+            
conn->protocol_adaptor->conn_trace_handler(conn->protocol_adaptor->user_context,
 conn, true);
             break;
 
         case QDR_CONNECTION_WORK_TRACING_OFF :
-            core->conn_trace_handler(core->user_context, conn, false);
+            
conn->protocol_adaptor->conn_trace_handler(conn->protocol_adaptor->user_context,
 conn, false);
             break;
 
         }
@@ -353,7 +357,7 @@ int qdr_connection_process(qdr_connection_t *conn)
 
             qdr_delivery_ref_t *dref = DEQ_HEAD(updated_deliveries);
             while (dref) {
-                core->delivery_update_handler(core->user_context, dref->dlv, 
dref->dlv->disposition, dref->dlv->settled);
+                
conn->protocol_adaptor->delivery_update_handler(conn->protocol_adaptor->user_context,
 dref->dlv, dref->dlv->disposition, dref->dlv->settled);
                 qdr_delivery_decref(core, dref->dlv, "qdr_connection_process - 
remove from updated list");
                 qdr_del_delivery_ref(&updated_deliveries, dref);
                 dref = DEQ_HEAD(updated_deliveries);
@@ -364,7 +368,7 @@ int qdr_connection_process(qdr_connection_t *conn)
                 switch (link_work->work_type) {
                 case QDR_LINK_WORK_DELIVERY :
                     {
-                        int count = core->push_handler(core->user_context, 
link, link_work->value);
+                        int count = 
conn->protocol_adaptor->push_handler(conn->protocol_adaptor->user_context, 
link, link_work->value);
                         assert(count <= link_work->value);
                         link_work->value -= count;
                         break;
@@ -372,20 +376,20 @@ int qdr_connection_process(qdr_connection_t *conn)
 
                 case QDR_LINK_WORK_FLOW :
                     if (link_work->value > 0)
-                        core->flow_handler(core->user_context, link, 
link_work->value);
+                        
conn->protocol_adaptor->flow_handler(conn->protocol_adaptor->user_context, 
link, link_work->value);
                     if      (link_work->drain_action == 
QDR_LINK_WORK_DRAIN_ACTION_SET)
-                        core->drain_handler(core->user_context, link, true);
+                        
conn->protocol_adaptor->drain_handler(conn->protocol_adaptor->user_context, 
link, true);
                     else if (link_work->drain_action == 
QDR_LINK_WORK_DRAIN_ACTION_CLEAR)
-                        core->drain_handler(core->user_context, link, false);
+                        
conn->protocol_adaptor->drain_handler(conn->protocol_adaptor->user_context, 
link, false);
                     else if (link_work->drain_action == 
QDR_LINK_WORK_DRAIN_ACTION_DRAINED)
-                        core->drained_handler(core->user_context, link);
+                        
conn->protocol_adaptor->drained_handler(conn->protocol_adaptor->user_context, 
link);
                     break;
 
                 case QDR_LINK_WORK_FIRST_DETACH :
                 case QDR_LINK_WORK_SECOND_DETACH :
-                    core->detach_handler(core->user_context, link, 
link_work->error,
-                                         link_work->work_type == 
QDR_LINK_WORK_FIRST_DETACH,
-                                         link_work->close_link);
+                    
conn->protocol_adaptor->detach_handler(conn->protocol_adaptor->user_context, 
link, link_work->error,
+                                                           
link_work->work_type == QDR_LINK_WORK_FIRST_DETACH,
+                                                           
link_work->close_link);
                     detach_sent = true;
                     break;
                 }
@@ -632,40 +636,6 @@ static void qdr_link_processing_complete(qdr_core_t *core, 
qdr_link_t *link)
 
 
 
-void qdr_connection_handlers(qdr_core_t                *core,
-                             void                      *context,
-                             qdr_connection_activate_t  activate,
-                             qdr_link_first_attach_t    first_attach,
-                             qdr_link_second_attach_t   second_attach,
-                             qdr_link_detach_t          detach,
-                             qdr_link_flow_t            flow,
-                             qdr_link_offer_t           offer,
-                             qdr_link_drained_t         drained,
-                             qdr_link_drain_t           drain,
-                             qdr_link_push_t            push,
-                             qdr_link_deliver_t         deliver,
-                             qdr_link_get_credit_t      get_credit,
-                             qdr_delivery_update_t      delivery_update,
-                             qdr_connection_close_t     conn_close,
-                             qdr_connection_trace_t     conn_trace)
-{
-    core->user_context            = context;
-    core->first_attach_handler    = first_attach;
-    core->second_attach_handler   = second_attach;
-    core->detach_handler          = detach;
-    core->flow_handler            = flow;
-    core->offer_handler           = offer;
-    core->drained_handler         = drained;
-    core->drain_handler           = drain;
-    core->push_handler            = push;
-    core->deliver_handler         = deliver;
-    core->get_credit_handler      = get_credit;
-    core->delivery_update_handler = delivery_update;
-    core->conn_close_handler      = conn_close;
-    core->conn_trace_handler      = conn_trace;
-}
-
-
 
//==================================================================================
 // In-Thread Functions
 
//==================================================================================
diff --git a/src/router_core/router_core.c b/src/router_core/router_core.c
index d2d24d9..08d2b7d 100644
--- a/src/router_core/router_core.c
+++ b/src/router_core/router_core.c
@@ -908,6 +908,7 @@ static void qdr_global_stats_request_CT(qdr_core_t *core, 
qdr_action_t *action,
     qdr_post_general_work_CT(core, work);
 }
 
+
 void qdr_request_global_stats(qdr_core_t *core, qdr_global_stats_t *stats, 
qdr_global_stats_handler_t callback, void *context)
 {
     qdr_action_t *action = qdr_action(qdr_global_stats_request_CT, 
"global_stats_request");
@@ -917,3 +918,51 @@ void qdr_request_global_stats(qdr_core_t *core, 
qdr_global_stats_t *stats, qdr_g
     qdr_action_enqueue(core, action);
 }
 
+
+qdr_protocol_adaptor_t *qdr_protocol_adaptor(qdr_core_t                *core,
+                                             const char                *name,
+                                             void                      
*context,
+                                             qdr_connection_activate_t  
activate,
+                                             qdr_link_first_attach_t    
first_attach,
+                                             qdr_link_second_attach_t   
second_attach,
+                                             qdr_link_detach_t          detach,
+                                             qdr_link_flow_t            flow,
+                                             qdr_link_offer_t           offer,
+                                             qdr_link_drained_t         
drained,
+                                             qdr_link_drain_t           drain,
+                                             qdr_link_push_t            push,
+                                             qdr_link_deliver_t         
deliver,
+                                             qdr_link_get_credit_t      
get_credit,
+                                             qdr_delivery_update_t      
delivery_update,
+                                             qdr_connection_close_t     
conn_close,
+                                             qdr_connection_trace_t     
conn_trace)
+{
+    qdr_protocol_adaptor_t *adaptor = NEW(qdr_protocol_adaptor_t);
+
+    DEQ_ITEM_INIT(adaptor);
+    adaptor->name                    = name;
+    adaptor->user_context            = context;
+    adaptor->first_attach_handler    = first_attach;
+    adaptor->second_attach_handler   = second_attach;
+    adaptor->detach_handler          = detach;
+    adaptor->flow_handler            = flow;
+    adaptor->offer_handler           = offer;
+    adaptor->drained_handler         = drained;
+    adaptor->drain_handler           = drain;
+    adaptor->push_handler            = push;
+    adaptor->deliver_handler         = deliver;
+    adaptor->get_credit_handler      = get_credit;
+    adaptor->delivery_update_handler = delivery_update;
+    adaptor->conn_close_handler      = conn_close;
+    adaptor->conn_trace_handler      = conn_trace;
+
+    DEQ_INSERT_TAIL(core->protocol_adaptors, adaptor);
+    return adaptor;
+}
+
+
+void qdr_protocol_adaptor_free(qdr_core_t *core, qdr_protocol_adaptor_t 
*adaptor)
+{
+    DEQ_REMOVE(core->protocol_adaptors, adaptor);
+    free(adaptor);
+}
diff --git a/src/router_core/router_core_private.h 
b/src/router_core/router_core_private.h
index 49d9650..f91ac9f 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -656,6 +656,7 @@ typedef enum {
 struct qdr_connection_t {
     DEQ_LINKS(qdr_connection_t);
     DEQ_LINKS_N(ACTIVATE, qdr_connection_t);
+    qdr_protocol_adaptor_t     *protocol_adaptor;
     uint64_t                    identity;
     qdr_core_t                 *core;
     bool                        incoming;
@@ -784,6 +785,33 @@ typedef struct qdr_priority_sheaf_t {
     int count;
 } qdr_priority_sheaf_t;
 
+
+struct qdr_protocol_adaptor_t {
+    DEQ_LINKS(qdr_protocol_adaptor_t);
+    const char *name;
+
+    //
+    // Callbacks
+    //
+    void                     *user_context;
+    qdr_link_first_attach_t   first_attach_handler;
+    qdr_link_second_attach_t  second_attach_handler;
+    qdr_link_detach_t         detach_handler;
+    qdr_link_flow_t           flow_handler;
+    qdr_link_offer_t          offer_handler;
+    qdr_link_drained_t        drained_handler;
+    qdr_link_drain_t          drain_handler;
+    qdr_link_push_t           push_handler;
+    qdr_link_deliver_t        deliver_handler;
+    qdr_link_get_credit_t     get_credit_handler;
+    qdr_delivery_update_t     delivery_update_handler;
+    qdr_connection_close_t    conn_close_handler;
+    qdr_connection_trace_t    conn_trace_handler;
+};
+
+DEQ_DECLARE(qdr_protocol_adaptor_t, qdr_protocol_adaptor_list_t);
+
+
 struct qdr_core_t {
     qd_dispatch_t     *qd;
     qd_log_source_t   *log;
@@ -801,11 +829,12 @@ struct qdr_core_t {
     qd_timer_t              *work_timer;
     uint32_t                 uptime_ticks;
 
-    qdr_connection_list_t      open_connections;
-    qdr_connection_t          *active_edge_connection;
-    qdr_connection_list_t      connections_to_activate;
-    qdr_link_list_t            open_links;
-    qdr_connection_ref_list_t  streaming_connections;
+    qdr_protocol_adaptor_list_t  protocol_adaptors;
+    qdr_connection_list_t        open_connections;
+    qdr_connection_t            *active_edge_connection;
+    qdr_connection_list_t        connections_to_activate;
+    qdr_link_list_t              open_links;
+    qdr_connection_ref_list_t    streaming_connections;
 
     qdrc_attach_addr_lookup_t  addr_lookup_handler;
     void                      *addr_lookup_context;
@@ -821,24 +850,6 @@ struct qdr_core_t {
     qdr_link_lost_t          rt_link_lost;
 
     //
-    // Connection section
-    //
-    void                     *user_context;
-    qdr_link_first_attach_t   first_attach_handler;
-    qdr_link_second_attach_t  second_attach_handler;
-    qdr_link_detach_t         detach_handler;
-    qdr_link_flow_t           flow_handler;
-    qdr_link_offer_t          offer_handler;
-    qdr_link_drained_t        drained_handler;
-    qdr_link_drain_t          drain_handler;
-    qdr_link_push_t           push_handler;
-    qdr_link_deliver_t        deliver_handler;
-    qdr_link_get_credit_t     get_credit_handler;
-    qdr_delivery_update_t     delivery_update_handler;
-    qdr_connection_close_t    conn_close_handler;
-    qdr_connection_trace_t    conn_trace_handler;
-
-    //
     // Events section
     //
     qdrc_event_subscription_list_t conn_event_subscriptions;
diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c
index 7d7aa9c..add176a 100644
--- a/src/router_core/transfer.c
+++ b/src/router_core/transfer.c
@@ -163,7 +163,7 @@ int qdr_link_process_deliveries(qdr_core_t *core, 
qdr_link_t *link, int credit)
                 do {
                     settled = dlv->settled;
                     sys_mutex_unlock(conn->work_lock);
-                    new_disp = core->deliver_handler(core->user_context, link, 
dlv, settled);
+                    new_disp = 
conn->protocol_adaptor->deliver_handler(conn->protocol_adaptor->user_context, 
link, dlv, settled);
                     sys_mutex_lock(conn->work_lock);
                 } while (settled != dlv->settled);  // oops missed the 
settlement
                 send_complete = qdr_delivery_send_complete(dlv);
@@ -238,7 +238,7 @@ int qdr_link_process_deliveries(qdr_core_t *core, 
qdr_link_t *link, int credit)
         }
 
         if (offer != -1)
-            core->offer_handler(core->user_context, link, offer);
+            
conn->protocol_adaptor->offer_handler(conn->protocol_adaptor->user_context, 
link, offer);
     }
 
     return num_deliveries_completed;
diff --git a/src/router_node.c b/src/router_node.c
index ce59329..a5278dc 100644
--- a/src/router_node.c
+++ b/src/router_node.c
@@ -37,6 +37,8 @@ const char *QD_ROUTER_NODE_TYPE = "router.node";
 const char *QD_ROUTER_ADDRESS_TYPE = "router.address";
 const char *QD_ROUTER_LINK_TYPE = "router.link";
 
+static qdr_protocol_adaptor_t *amqp_direct_adaptor = 0;
+
 static char *router_role    = "inter-router";
 static char *container_role = "route-container";
 static char *edge_role      = "edge";
@@ -1199,7 +1201,13 @@ static void AMQP_opened_handler(qd_router_t *router, 
qd_connection_t *conn, bool
                                                                  is_ssl,
                                                                  
(rversion_found) ? &rversion : 0);
 
-    qdr_connection_opened(router->router_core, inbound, role, cost, 
connection_id, name,
+    qdr_connection_opened(router->router_core,
+                          amqp_direct_adaptor,
+                          inbound,
+                          role,
+                          cost,
+                          connection_id,
+                          name,
                           pn_connection_remote_container(pn_conn),
                           conn->strip_annotations_in,
                           conn->strip_annotations_out,
@@ -1208,7 +1216,8 @@ static void AMQP_opened_handler(qd_router_t *router, 
qd_connection_t *conn, bool
                           link_capacity,
                           vhost,
                           connection_info,
-                          bind_connection_context, conn);
+                          bind_connection_context,
+                          conn);
 
     char   props_str[1000];
     size_t props_len = 1000;
@@ -1968,21 +1977,23 @@ void qd_router_setup_late(qd_dispatch_t *qd)
     qd->router->tracemask   = qd_tracemask();
     qd->router->router_core = qdr_core(qd, qd->router->router_mode, 
qd->router->router_area, qd->router->router_id);
 
-    qdr_connection_handlers(qd->router->router_core, (void*) qd->router,
-                            CORE_connection_activate,
-                            CORE_link_first_attach,
-                            CORE_link_second_attach,
-                            CORE_link_detach,
-                            CORE_link_flow,
-                            CORE_link_offer,
-                            CORE_link_drained,
-                            CORE_link_drain,
-                            CORE_link_push,
-                            CORE_link_deliver,
-                            CORE_link_get_credit,
-                            CORE_delivery_update,
-                            CORE_close_connection,
-                            CORE_conn_trace);
+    amqp_direct_adaptor = qdr_protocol_adaptor(qd->router->router_core,
+                                               "amqp-direct",
+                                               (void*) qd->router,
+                                               CORE_connection_activate,
+                                               CORE_link_first_attach,
+                                               CORE_link_second_attach,
+                                               CORE_link_detach,
+                                               CORE_link_flow,
+                                               CORE_link_offer,
+                                               CORE_link_drained,
+                                               CORE_link_drain,
+                                               CORE_link_push,
+                                               CORE_link_deliver,
+                                               CORE_link_get_credit,
+                                               CORE_delivery_update,
+                                               CORE_close_connection,
+                                               CORE_conn_trace);
 
     qd_router_python_setup(qd->router);
     qd_timer_schedule(qd->router->timer, 1000);
@@ -1994,6 +2005,7 @@ void qd_router_free(qd_router_t *router)
 
     qd_container_set_default_node_type(router->qd, 0, 0, QD_DIST_BOTH);
 
+    qdr_protocol_adaptor_free(router->router_core, amqp_direct_adaptor);
     qdr_core_free(router->router_core);
     qd_tracemask_free(router->tracemask);
     qd_timer_free(router->timer);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to