This is an automated email from the ASF dual-hosted git repository.

tross pushed a commit to branch dev-protocol-adaptors in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
commit 9545f352aa13032c3433d704abe08a45d7bf5919 Author: Ted Ross <tr...@apache.org> AuthorDate: Wed Jun 3 15:18:26 2020 -0400 Dataplane: Updated the reference adaptor to implement connection activation --- src/adaptors/reference_adaptor.c | 150 +++++++++++++++++++++------------------ 1 file changed, 79 insertions(+), 71 deletions(-) diff --git a/src/adaptors/reference_adaptor.c b/src/adaptors/reference_adaptor.c index 7bbc45d..b4d4afa 100644 --- a/src/adaptors/reference_adaptor.c +++ b/src/adaptors/reference_adaptor.c @@ -28,8 +28,8 @@ typedef struct qdr_ref_adaptor_t { qdr_core_t *core; qdr_protocol_adaptor_t *adaptor; - qd_timer_t *timer; - int sequence; + qd_timer_t *startup_timer; + qd_timer_t *activate_timer; qdr_connection_t *conn; qdr_link_t *out_link; qdr_link_t *in_link; @@ -39,11 +39,15 @@ typedef struct qdr_ref_adaptor_t { void qdr_ref_connection_activate_CT(void *context, qdr_connection_t *conn) { // - // Don't do this here, use a zero-length timer to defer to an IO thread. + // Use a zero-delay timer to defer this call to an IO thread + // + // Note that this may not be generally safe to do. There's no guarantee that multiple + // activations won't schedule multiple IO threads running this code concurrently. + // Normally, we would rely on assurances provided by the IO scheduler (Proton) that no + // connection shall ever be served by more than one thread concurrently. 
// qdr_ref_adaptor_t *adaptor = (qdr_ref_adaptor_t*) context; - - while (qdr_connection_process(adaptor->conn)) {} + qd_timer_schedule(adaptor->activate_timer, 0); } @@ -162,70 +166,72 @@ static void qdr_ref_conn_trace(void *context, qdr_connection_t *conn, bool trace } -static void on_timer(void *context) +static void on_startup(void *context) { qdr_ref_adaptor_t *adaptor = (qdr_ref_adaptor_t*) context; - if (adaptor->sequence == 0) { - qdr_connection_info_t *info = qdr_connection_info(false, //bool is_encrypted, - false, //bool is_authenticated, - true, //bool opened, - "", //char *sasl_mechanisms, - QD_INCOMING, //qd_direction_t dir, - "127.0.0.1:47756", //const char *host, - "", //const char *ssl_proto, - "", //const char *ssl_cipher, - "", //const char *user, - "", //const char *container, - pn_data(0), //pn_data_t *connection_properties, - 0, //int ssl_ssf, - false, //bool ssl, - // set if remote is a qdrouter - 0); //const qdr_router_version_t *version) - - adaptor->conn = qdr_connection_opened(adaptor->core, - adaptor->adaptor, - true, - QDR_ROLE_NORMAL, - 1, - 10000, // get this from qd_connection_t - 0, - 0, - false, - false, - false, - false, - 250, - 0, - info, - 0, - 0); - - uint64_t link_id; - qdr_terminus_t *dynamic_source = qdr_terminus(0); - qdr_terminus_set_dynamic(dynamic_source); - qdr_terminus_t *target = qdr_terminus(0); - qdr_terminus_set_address(target, "echo-service"); - - adaptor->out_link = qdr_link_first_attach(adaptor->conn, - QD_INCOMING, - qdr_terminus(0), //qdr_terminus_t *source, - target, //qdr_terminus_t *target, - "ref.1", //const char *name, - 0, //const char *terminus_addr, - &link_id); - adaptor->in_link = qdr_link_first_attach(adaptor->conn, - QD_OUTGOING, - dynamic_source, //qdr_terminus_t *source, - qdr_terminus(0), //qdr_terminus_t *target, - "ref.2", //const char *name, - 0, //const char *terminus_addr, - &link_id); - adaptor->sequence++; - } + qdr_connection_info_t *info = qdr_connection_info(false, //bool is_encrypted, 
+ false, //bool is_authenticated, + true, //bool opened, + "", //char *sasl_mechanisms, + QD_INCOMING, //qd_direction_t dir, + "127.0.0.1:47756", //const char *host, + "", //const char *ssl_proto, + "", //const char *ssl_cipher, + "", //const char *user, + "", //const char *container, + pn_data(0), //pn_data_t *connection_properties, + 0, //int ssl_ssf, + false, //bool ssl, + // set if remote is a qdrouter + 0); //const qdr_router_version_t *version) + + adaptor->conn = qdr_connection_opened(adaptor->core, + adaptor->adaptor, + true, + QDR_ROLE_NORMAL, + 1, + 10000, // get this from qd_connection_t + 0, + 0, + false, + false, + false, + false, + 250, + 0, + info, + 0, + 0); + + uint64_t link_id; + qdr_terminus_t *dynamic_source = qdr_terminus(0); + qdr_terminus_set_dynamic(dynamic_source); + qdr_terminus_t *target = qdr_terminus(0); + qdr_terminus_set_address(target, "echo-service"); + + adaptor->out_link = qdr_link_first_attach(adaptor->conn, + QD_INCOMING, + qdr_terminus(0), //qdr_terminus_t *source, + target, //qdr_terminus_t *target, + "ref.1", //const char *name, + 0, //const char *terminus_addr, + &link_id); + adaptor->in_link = qdr_link_first_attach(adaptor->conn, + QD_OUTGOING, + dynamic_source, //qdr_terminus_t *source, + qdr_terminus(0), //qdr_terminus_t *target, + "ref.2", //const char *name, + 0, //const char *terminus_addr, + &link_id); +} + - qd_timer_schedule(adaptor->timer, 1000); - qdr_connection_process(adaptor->conn); +static void on_activate(void *context) +{ + qdr_ref_adaptor_t *adaptor = (qdr_ref_adaptor_t*) context; + + while (qdr_connection_process(adaptor->conn)) {} } @@ -236,7 +242,7 @@ static void on_timer(void *context) * 1) Register the protocol adaptor with the router-core. * 2) Prepare the protocol adaptor to be configured. 
*/ -static void qdr_ref_adaptor_init(qdr_core_t *core, void **adaptor_context) +void qdr_ref_adaptor_init(qdr_core_t *core, void **adaptor_context) { qdr_ref_adaptor_t *adaptor = NEW(qdr_ref_adaptor_t); adaptor->core = core; @@ -260,17 +266,19 @@ static void qdr_ref_adaptor_init(qdr_core_t *core, void **adaptor_context) *adaptor_context = adaptor; // TEMPORARY // - adaptor->timer = qd_timer(core->qd, on_timer, adaptor); - adaptor->sequence = 0; - qd_timer_schedule(adaptor->timer, 0); + adaptor->startup_timer = qd_timer(core->qd, on_startup, adaptor); + qd_timer_schedule(adaptor->startup_timer, 0); + + adaptor->activate_timer = qd_timer(core->qd, on_activate, adaptor); } -static void qdr_ref_adaptor_final(void *adaptor_context) +void qdr_ref_adaptor_final(void *adaptor_context) { qdr_ref_adaptor_t *adaptor = (qdr_ref_adaptor_t*) adaptor_context; qdr_protocol_adaptor_free(adaptor->core, adaptor->adaptor); - qd_timer_free(adaptor->timer); + qd_timer_free(adaptor->startup_timer); + qd_timer_free(adaptor->activate_timer); free(adaptor); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org