This is an automated email from the ASF dual-hosted git repository. tross pushed a commit to branch dev-protocol-adaptors in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
commit 3b58b218cce9b66cc2f291710724ea377d7e9bbb Author: Ted Ross <tr...@apache.org> AuthorDate: Wed Jun 3 14:43:22 2020 -0400 Dataplane: Renamed tcp_adaptor to reference_adaptor. Added more test content to the reference adaptor. It now sends messages to a fixed address. Fixed qdr_terminus_format to show the dynamically-assigned address for dynamic termini. --- src/CMakeLists.txt | 2 +- src/adaptors/reference_adaptor.c | 280 +++++++++++++++++++++++++++++++++++++++ src/adaptors/tcp_adaptor.c | 145 -------------------- src/router_core/terminus.c | 12 +- 4 files changed, 289 insertions(+), 150 deletions(-) diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 8c0c7f7..37a61d1 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -38,7 +38,7 @@ add_custom_command ( # Build the qpid-dispatch library. set(qpid_dispatch_SOURCES - adaptors/tcp_adaptor.c + adaptors/reference_adaptor.c alloc_pool.c amqp.c bitmask.c diff --git a/src/adaptors/reference_adaptor.c b/src/adaptors/reference_adaptor.c new file mode 100644 index 0000000..7bbc45d --- /dev/null +++ b/src/adaptors/reference_adaptor.c @@ -0,0 +1,280 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +#include "qpid/dispatch/ctools.h" +#include "qpid/dispatch/protocol_adaptor.h" +#include "delivery.h" +#include "qpid/dispatch/timer.h" +#include "qpid/dispatch/message.h" +#include <stdio.h> +#include <inttypes.h> + +typedef struct qdr_ref_adaptor_t { + qdr_core_t *core; + qdr_protocol_adaptor_t *adaptor; + qd_timer_t *timer; + int sequence; + qdr_connection_t *conn; + qdr_link_t *out_link; + qdr_link_t *in_link; +} qdr_ref_adaptor_t; + + +void qdr_ref_connection_activate_CT(void *context, qdr_connection_t *conn) +{ + // + // Don't do this here, use a zero-length timer to defer to an IO thread. + // + qdr_ref_adaptor_t *adaptor = (qdr_ref_adaptor_t*) context; + + while (qdr_connection_process(adaptor->conn)) {} +} + + +static void qdr_ref_first_attach(void *context, qdr_connection_t *conn, qdr_link_t *link, + qdr_terminus_t *source, qdr_terminus_t *target, + qd_session_class_t session_class) +{ +} + + +static void qdr_ref_second_attach(void *context, qdr_link_t *link, + qdr_terminus_t *source, qdr_terminus_t *target) +{ + char ftarget[100]; + char fsource[100]; + + ftarget[0] = '\0'; + fsource[0] = '\0'; + + if (!!source) { + size_t size = 100; + qdr_terminus_format(source, fsource, &size); + } + + if (!!target) { + size_t size = 100; + qdr_terminus_format(target, ftarget, &size); + } + + printf("qdr_ref_second_attach: source=%s target=%s\n", fsource, ftarget); +} + + +static void qdr_ref_detach(void *context, qdr_link_t *link, qdr_error_t *error, bool first, bool close) +{ +} + + +static void qdr_ref_flow(void *context, qdr_link_t *link, int credit) +{ + qdr_ref_adaptor_t *adaptor = (qdr_ref_adaptor_t*) context; + qd_buffer_list_t buffers; + qd_buffer_t *buf; + + printf("qdr_ref_flow: %d credits issued\n", credit); + + qd_message_t *msg = qd_message(); + DEQ_INIT(buffers); + buf = qd_buffer(); + char *insert = (char*) qd_buffer_cursor(buf); + strcpy(insert, "Test Payload"); + qd_buffer_insert(buf, 13); + DEQ_INSERT_HEAD(buffers, buf); + 
qd_message_compose_1(msg, "echo-service", &buffers); + + qdr_link_deliver(adaptor->out_link, msg, 0, false, 0, 0); +} + + +static void qdr_ref_offer(void *context, qdr_link_t *link, int delivery_count) +{ +} + + +static void qdr_ref_drained(void *context, qdr_link_t *link) +{ +} + + +static void qdr_ref_drain(void *context, qdr_link_t *link, bool mode) +{ +} + + +static int qdr_ref_push(void *context, qdr_link_t *link, int limit) +{ + return 0; +} + + +static uint64_t qdr_ref_deliver(void *context, qdr_link_t *link, qdr_delivery_t *delivery, bool settled) +{ + return 0; +} + + +static int qdr_ref_get_credit(void *context, qdr_link_t *link) +{ + return 0; +} + + +static void qdr_ref_delivery_update(void *context, qdr_delivery_t *dlv, uint64_t disp, bool settled) +{ + char *dispname; + + switch (disp) { + case PN_ACCEPTED: dispname = "ACCEPTED"; break; + case PN_REJECTED: dispname = "REJECTED"; break; + case PN_RELEASED: dispname = "RELEASED"; break; + case PN_MODIFIED: dispname = "MODIFIED"; break; + default: + dispname = "<UNKNOWN>"; + } + printf("qdr_ref_delivery_update: disp=%s settled=%s\n", dispname, settled ? 
"true" : "false"); +} + + +static void qdr_ref_conn_close(void *context, qdr_connection_t *conn, qdr_error_t *error) +{ +} + + +static void qdr_ref_conn_trace(void *context, qdr_connection_t *conn, bool trace) +{ +} + + +static void on_timer(void *context) +{ + qdr_ref_adaptor_t *adaptor = (qdr_ref_adaptor_t*) context; + + if (adaptor->sequence == 0) { + qdr_connection_info_t *info = qdr_connection_info(false, //bool is_encrypted, + false, //bool is_authenticated, + true, //bool opened, + "", //char *sasl_mechanisms, + QD_INCOMING, //qd_direction_t dir, + "127.0.0.1:47756", //const char *host, + "", //const char *ssl_proto, + "", //const char *ssl_cipher, + "", //const char *user, + "", //const char *container, + pn_data(0), //pn_data_t *connection_properties, + 0, //int ssl_ssf, + false, //bool ssl, + // set if remote is a qdrouter + 0); //const qdr_router_version_t *version) + + adaptor->conn = qdr_connection_opened(adaptor->core, + adaptor->adaptor, + true, + QDR_ROLE_NORMAL, + 1, + 10000, // get this from qd_connection_t + 0, + 0, + false, + false, + false, + false, + 250, + 0, + info, + 0, + 0); + + uint64_t link_id; + qdr_terminus_t *dynamic_source = qdr_terminus(0); + qdr_terminus_set_dynamic(dynamic_source); + qdr_terminus_t *target = qdr_terminus(0); + qdr_terminus_set_address(target, "echo-service"); + + adaptor->out_link = qdr_link_first_attach(adaptor->conn, + QD_INCOMING, + qdr_terminus(0), //qdr_terminus_t *source, + target, //qdr_terminus_t *target, + "ref.1", //const char *name, + 0, //const char *terminus_addr, + &link_id); + adaptor->in_link = qdr_link_first_attach(adaptor->conn, + QD_OUTGOING, + dynamic_source, //qdr_terminus_t *source, + qdr_terminus(0), //qdr_terminus_t *target, + "ref.2", //const char *name, + 0, //const char *terminus_addr, + &link_id); + adaptor->sequence++; + } + + qd_timer_schedule(adaptor->timer, 1000); + qdr_connection_process(adaptor->conn); +} + + +/** + * This initialization function will be invoked when the router 
core is ready for the protocol + * adaptor to be created. This function must: + * + * 1) Register the protocol adaptor with the router-core. + * 2) Prepare the protocol adaptor to be configured. + */ +static void qdr_ref_adaptor_init(qdr_core_t *core, void **adaptor_context) +{ + qdr_ref_adaptor_t *adaptor = NEW(qdr_ref_adaptor_t); + adaptor->core = core; + adaptor->adaptor = qdr_protocol_adaptor(core, + "reference", // name + adaptor, // context + qdr_ref_connection_activate_CT, + qdr_ref_first_attach, + qdr_ref_second_attach, + qdr_ref_detach, + qdr_ref_flow, + qdr_ref_offer, + qdr_ref_drained, + qdr_ref_drain, + qdr_ref_push, + qdr_ref_deliver, + qdr_ref_get_credit, + qdr_ref_delivery_update, + qdr_ref_conn_close, + qdr_ref_conn_trace); + *adaptor_context = adaptor; + + // TEMPORARY // + adaptor->timer = qd_timer(core->qd, on_timer, adaptor); + adaptor->sequence = 0; + qd_timer_schedule(adaptor->timer, 0); +} + + +static void qdr_ref_adaptor_final(void *adaptor_context) +{ + qdr_ref_adaptor_t *adaptor = (qdr_ref_adaptor_t*) adaptor_context; + qdr_protocol_adaptor_free(adaptor->core, adaptor->adaptor); + qd_timer_free(adaptor->timer); + free(adaptor); +} + +/** + * Declare the adaptor so that it will self-register on process startup. + */ +QDR_CORE_ADAPTOR_DECLARE("ref-adaptor", qdr_ref_adaptor_init, qdr_ref_adaptor_final) diff --git a/src/adaptors/tcp_adaptor.c b/src/adaptors/tcp_adaptor.c deleted file mode 100644 index ab5dc48..0000000 --- a/src/adaptors/tcp_adaptor.c +++ /dev/null @@ -1,145 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include "qpid/dispatch/ctools.h" -#include "qpid/dispatch/protocol_adaptor.h" -#include "delivery.h" -#include <stdio.h> -#include <inttypes.h> - -typedef struct qdr_tcp_adaptor_t { - qdr_core_t *core; - qdr_protocol_adaptor_t *adaptor; -} qdr_tcp_adaptor_t; - - -static void qdr_tcp_first_attach(void *context, qdr_connection_t *conn, qdr_link_t *link, - qdr_terminus_t *source, qdr_terminus_t *target, - qd_session_class_t session_class) -{ -} - - -static void qdr_tcp_second_attach(void *context, qdr_link_t *link, - qdr_terminus_t *source, qdr_terminus_t *target) -{ -} - - -static void qdr_tcp_detach(void *context, qdr_link_t *link, qdr_error_t *error, bool first, bool close) -{ -} - - -static void qdr_tcp_flow(void *context, qdr_link_t *link, int credit) -{ -} - - -static void qdr_tcp_offer(void *context, qdr_link_t *link, int delivery_count) -{ -} - - -static void qdr_tcp_drained(void *context, qdr_link_t *link) -{ -} - - -static void qdr_tcp_drain(void *context, qdr_link_t *link, bool mode) -{ -} - - -static int qdr_tcp_push(void *context, qdr_link_t *link, int limit) -{ - return 0; -} - - -static uint64_t qdr_tcp_deliver(void *context, qdr_link_t *link, qdr_delivery_t *delivery, bool settled) -{ - return 0; -} - - -static int qdr_tcp_get_credit(void *context, qdr_link_t *link) -{ - return 0; -} - - -static void qdr_tcp_delivery_update(void *context, qdr_delivery_t *dlv, uint64_t disp, bool settled) -{ -} - - -static void qdr_tcp_conn_close(void *context, qdr_connection_t *conn, qdr_error_t *error) -{ -} - - -static void 
qdr_tcp_conn_trace(void *context, qdr_connection_t *conn, bool trace) -{ -} - - -/** - * This initialization function will be invoked when the router core is ready for the protocol - * adaptor to be created. This function must: - * - * 1) Register the protocol adaptor with the router-core. - * 2) Prepare the protocol adaptor to be configured. - */ -static void qdr_tcp_adaptor_init(qdr_core_t *core, void **adaptor_context) -{ - qdr_tcp_adaptor_t *adaptor = NEW(qdr_tcp_adaptor_t); - adaptor->core = core; - adaptor->adaptor = qdr_protocol_adaptor(core, - "tcp", // name - adaptor, // context - 0, // activate - qdr_tcp_first_attach, - qdr_tcp_second_attach, - qdr_tcp_detach, - qdr_tcp_flow, - qdr_tcp_offer, - qdr_tcp_drained, - qdr_tcp_drain, - qdr_tcp_push, - qdr_tcp_deliver, - qdr_tcp_get_credit, - qdr_tcp_delivery_update, - qdr_tcp_conn_close, - qdr_tcp_conn_trace); - *adaptor_context = adaptor; -} - - -static void qdr_tcp_adaptor_final(void *adaptor_context) -{ - qdr_tcp_adaptor_t *adaptor = (qdr_tcp_adaptor_t*) adaptor_context; - qdr_protocol_adaptor_free(adaptor->core, adaptor->adaptor); - free(adaptor); -} - -/** - * Declare the adaptor so that it will self-register on process startup. 
- */ -QDR_CORE_ADAPTOR_DECLARE("tcp-adaptor", qdr_tcp_adaptor_init, qdr_tcp_adaptor_final) diff --git a/src/router_core/terminus.c b/src/router_core/terminus.c index 9674987..921baf8 100644 --- a/src/router_core/terminus.c +++ b/src/router_core/terminus.c @@ -92,13 +92,17 @@ void qdr_terminus_format(qdr_terminus_t *term, char *output, size_t *size) break; } - if (term->dynamic) - len = safe_snprintf(output, *size, "<dynamic>"); - else if (term->address && term->address->iterator) { + if (term->dynamic) { + len = safe_snprintf(output, *size, "(dyn)"); + output += len; + *size -= len; + } + + if (term->address && term->address->iterator) { qd_iterator_reset_view(term->address->iterator, ITER_VIEW_ALL); len = qd_iterator_ncopy(term->address->iterator, (unsigned char*) output, *size - 1); output[len] = 0; - } else if (term->address == 0) + } else len = safe_snprintf(output, *size, "<none>"); output += len; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org