This is an automated email from the ASF dual-hosted git repository. tross pushed a commit to branch dev-protocol-adaptors in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
commit 1463337dcc13a73b357fbfe15b5d8f0f8dba34a8
Author: Ted Ross <tr...@apache.org>
AuthorDate: Mon Jun 8 09:49:37 2020 -0400

    Dataplane: Updates to the message-extend (return buffer count for flow control). Added bidirectional streaming test to ref adaptor.
---
 include/qpid/dispatch/message.h | 8 ++-
 src/adaptors/reference_adaptor.c | 73 ++++++++++++++++------
 src/message.c | 5 +-
 .../streaming_link_scrubber.c | 2 +-
 4 files changed, 67 insertions(+), 21 deletions(-)

diff --git a/include/qpid/dispatch/message.h b/include/qpid/dispatch/message.h
index 0f63b1a..1a7bce2 100644
--- a/include/qpid/dispatch/message.h
+++ b/include/qpid/dispatch/message.h
@@ -294,9 +294,15 @@ void qd_message_compose_5(qd_message_t *msg,
  bool complete);
 
 /**
+ * qd_message_extend
+ *
  * Extend the content of a streaming message with more buffers.
+ *
+ * @param msg Pointer to a message
+ * @param buffers A list of buffers to be appended to the end of the message's stream
+ * @return The number of buffers stored in the message's content
  */
-void qd_message_extend(qd_message_t *msg, qd_buffer_list_t *buffers);
+int qd_message_extend(qd_message_t *msg, qd_buffer_list_t *buffers);
 
 
 /** Put string representation of a message suitable for logging in buffer.
diff --git a/src/adaptors/reference_adaptor.c b/src/adaptors/reference_adaptor.c index f227542..7a65c17 100644 --- a/src/adaptors/reference_adaptor.c +++ b/src/adaptors/reference_adaptor.c @@ -37,6 +37,7 @@ typedef struct qdr_ref_adaptor_t { qdr_connection_t *conn; qdr_link_t *out_link_1; qdr_link_t *out_link_2; + qdr_link_t *in_link_2; qdr_link_t *dynamic_in_link; char *reply_to; qd_message_t *streaming_message; @@ -123,6 +124,16 @@ static void qdr_ref_second_attach(void *context, qdr_link_t *link, "ref.2", //const char *name, 0, //const char *terminus_addr, &link_id); + + source = qdr_terminus(0); + qdr_terminus_set_address(source, address2); + adaptor->in_link_2 = qdr_link_first_attach(adaptor->conn, + QD_OUTGOING, + source, //qdr_terminus_t *source, + qdr_terminus(0), //qdr_terminus_t *target, + "ref.3", //const char *name, + 0, //const char *terminus_addr, + &link_id); } } @@ -193,6 +204,7 @@ static void qdr_ref_flow(void *context, qdr_link_t *link, int credit) qd_message_compose_5(adaptor->streaming_message, props, 0, false); qd_compose_free(props); + printf("qdr_ref_flow: Starting a streaming delivery\n"); adaptor->streaming_delivery = qdr_link_deliver(adaptor->out_link_2, adaptor->streaming_message, 0, false, 0, 0); adaptor->stream_count = 0; @@ -231,20 +243,34 @@ static uint64_t qdr_ref_deliver(void *context, qdr_link_t *link, qdr_delivery_t qdr_ref_adaptor_t *adaptor = (qdr_ref_adaptor_t*) context; qd_message_t *msg = qdr_delivery_message(delivery); - qd_message_depth_status_t status = qd_message_check_depth(msg, QD_DEPTH_BODY); - - if (status == QD_MESSAGE_DEPTH_OK) { - qd_iterator_t *body_iter = qd_message_field_iterator(msg, QD_FIELD_BODY); - char *body = (char*) qd_iterator_copy(body_iter); - printf("qdr_ref_deliver: message received, body=%s\n", body); - free(body); - qd_iterator_free(body_iter); - qd_message_set_send_complete(msg); + qd_message_depth_status_t status = qd_message_check_depth(msg, QD_DEPTH_APPLICATION_PROPERTIES); + + switch (status) 
{ + case QD_MESSAGE_DEPTH_OK: { + if (qd_message_receive_complete(msg)) { + qd_iterator_t *body_iter = qd_message_field_iterator(msg, QD_FIELD_BODY); + char *body = (char*) qd_iterator_copy(body_iter); + printf("qdr_ref_deliver: complete message received, body=%s\n", body); + free(body); + qd_iterator_free(body_iter); + qd_message_set_send_complete(msg); + qdr_link_flow(adaptor->core, link, 1, false); + return PN_ACCEPTED; // This will cause the delivery to be settled + } + break; } - qdr_link_flow(adaptor->core, link, 1, false); + case QD_MESSAGE_DEPTH_INVALID: + printf("qdr_ref_deliver: message invalid\n"); + qdr_link_flow(adaptor->core, link, 1, false); + break; + + case QD_MESSAGE_DEPTH_INCOMPLETE: + printf("qdr_ref_deliver: message incomplete\n"); + break; + } - return PN_ACCEPTED; // This will cause the delivery to be settled + return 0; } @@ -269,7 +295,12 @@ static void qdr_ref_delivery_update(void *context, qdr_delivery_t *dlv, uint64_t } printf("qdr_ref_delivery_update: disp=%s settled=%s\n", dispname, settled ? 
"true" : "false"); - if (settled && qdr_delivery_link(dlv) == adaptor->out_link_1) + if (qdr_delivery_link(dlv) == adaptor->out_link_2 && qdr_delivery_message(dlv) == adaptor->streaming_message) { + adaptor->streaming_message = 0; + adaptor->stream_count = 0; + } + + if (settled) qdr_delivery_decref(adaptor->core, dlv, "qdr_ref_delivery_update - settled delivery"); } @@ -352,23 +383,30 @@ static void on_stream(void *context) qdr_ref_adaptor_t *adaptor = (qdr_ref_adaptor_t*) context; const char *content = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"; const size_t content_length = strlen(content); - qd_buffer_t *buf = qd_buffer(); - qd_buffer_list_t buffers; + + if (!adaptor->streaming_message) + return; + + qd_buffer_t *buf = qd_buffer(); + qd_buffer_list_t buffers; DEQ_INIT(buffers); DEQ_INSERT_TAIL(buffers, buf); memcpy(qd_buffer_cursor(buf), content, content_length); qd_buffer_insert(buf, content_length); - qd_message_extend(adaptor->streaming_message, &buffers); + int depth = qd_message_extend(adaptor->streaming_message, &buffers); qdr_delivery_continue(adaptor->core, adaptor->streaming_delivery, false); - if (adaptor->stream_count < 30) { - qd_timer_schedule(adaptor->stream_timer, 1000); + if (adaptor->stream_count < 10) { + qd_timer_schedule(adaptor->stream_timer, 100); adaptor->stream_count++; + printf("on_stream: sent streamed frame %d, depth=%d\n", adaptor->stream_count, depth); } else { qd_message_set_receive_complete(adaptor->streaming_message); adaptor->streaming_message = 0; + adaptor->stream_count = 0; + printf("on_stream: completed streaming send, depth=%d\n", depth); } } @@ -404,7 +442,6 @@ void qdr_ref_adaptor_init(qdr_core_t *core, void **adaptor_context) qdr_ref_conn_trace); *adaptor_context = adaptor; - // TEMPORARY // adaptor->startup_timer = qd_timer(core->qd, on_startup, adaptor); qd_timer_schedule(adaptor->startup_timer, 0); diff --git a/src/message.c b/src/message.c index 8b833f3..97bcab2 100644 --- a/src/message.c +++ 
b/src/message.c @@ -2218,10 +2218,11 @@ void qd_message_compose_5(qd_message_t *msg, } -void qd_message_extend(qd_message_t *msg, qd_buffer_list_t *buffers) +int qd_message_extend(qd_message_t *msg, qd_buffer_list_t *buffers) { qd_message_content_t *content = MSG_CONTENT(msg); qd_buffer_t *buf = DEQ_HEAD(*buffers); + int count; LOCK(content->lock); while (buf) { @@ -2230,7 +2231,9 @@ void qd_message_extend(qd_message_t *msg, qd_buffer_list_t *buffers) } DEQ_APPEND(content->buffers, (*buffers)); + count = DEQ_SIZE(content->buffers); UNLOCK(content->lock); + return count; } diff --git a/src/router_core/modules/streaming_link_scrubber/streaming_link_scrubber.c b/src/router_core/modules/streaming_link_scrubber/streaming_link_scrubber.c index 1c578ca..126a09b 100644 --- a/src/router_core/modules/streaming_link_scrubber/streaming_link_scrubber.c +++ b/src/router_core/modules/streaming_link_scrubber/streaming_link_scrubber.c @@ -184,4 +184,4 @@ static void qcm_streaming_link_scrubber_final_CT(void *module_context) } -QDR_CORE_MODULE_DECLARE("streaming_link_scruber", qcm_streaming_link_scrubber_enable_CT, qcm_streaming_link_scrubber_init_CT, qcm_streaming_link_scrubber_final_CT) +QDR_CORE_MODULE_DECLARE("streaming_link_scrubber", qcm_streaming_link_scrubber_enable_CT, qcm_streaming_link_scrubber_init_CT, qcm_streaming_link_scrubber_final_CT) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org