This is an automated email from the ASF dual-hosted git repository. tross pushed a commit to branch dev-protocol-adaptors in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
commit 5bb50a1c6cd5571b81ab562ec6fe2360c46ce860 Author: Ted Ross <tr...@apache.org> AuthorDate: Fri Jul 17 14:04:01 2020 -0400 Dataplane: WIP --- include/qpid/dispatch/message.h | 17 +------------ src/adaptors/reference_adaptor.c | 22 +++++++---------- src/message.c | 28 ++++------------------ src/message_private.h | 1 + src/python_embedded.c | 2 +- src/router_core/core_client_api.c | 2 +- .../modules/test_hooks/core_test_hooks.c | 2 +- 7 files changed, 18 insertions(+), 56 deletions(-) diff --git a/include/qpid/dispatch/message.h b/include/qpid/dispatch/message.h index 5a0b42a..f9e61d1 100644 --- a/include/qpid/dispatch/message.h +++ b/include/qpid/dispatch/message.h @@ -275,26 +275,11 @@ ssize_t qd_message_field_copy(qd_message_t *msg, qd_message_field_t field, char // Convenience Functions void qd_message_compose_1(qd_message_t *msg, const char *to, qd_buffer_list_t *buffers); -void qd_message_compose_2(qd_message_t *msg, qd_composed_field_t *content); +void qd_message_compose_2(qd_message_t *msg, qd_composed_field_t *content, bool complete); void qd_message_compose_3(qd_message_t *msg, qd_composed_field_t *content1, qd_composed_field_t *content2); void qd_message_compose_4(qd_message_t *msg, qd_composed_field_t *content1, qd_composed_field_t *content2, qd_composed_field_t *content3); /** - * Send a message with optional headers and an optional raw body with the option of starting - * a streaming transfer. - * - * @param msg The message being composed - * @param headers A composed field with 1 or more header sections (incl body performative) or NULL - * @param body A buffer list of raw body content or NULL - * @param complete True if the message is to be receive-complete. - * False if more content will arrive later. - */ -void qd_message_compose_5(qd_message_t *msg, - qd_composed_field_t *headers, - qd_buffer_list_t *body, - bool complete); - -/** * qd_message_extend * * Extend the content of a streaming message with more buffers. diff --git a/src/adaptors/reference_adaptor.c b/src/adaptors/reference_adaptor.c index 2e1d98e..a0032a1 100644 --- a/src/adaptors/reference_adaptor.c +++ b/src/adaptors/reference_adaptor.c @@ -152,8 +152,6 @@ static void qdr_ref_detach(void *context, qdr_link_t *link, qdr_error_t *error, static void qdr_ref_flow(void *context, qdr_link_t *link, int credit) { qdr_ref_adaptor_t *adaptor = (qdr_ref_adaptor_t*) context; - qd_buffer_list_t buffers; - qd_buffer_t *buf; printf("qdr_ref_flow: %d credits issued\n", credit); @@ -177,15 +175,12 @@ static void qdr_ref_flow(void *context, qdr_link_t *link, int credit) */ qd_compose_end_list(props); + props = qd_compose(QD_PERFORMATIVE_BODY_AMQP_VALUE, props); + qd_compose_insert_string(props, "Test Payload"); + qd_message_t *msg = qd_message(); - DEQ_INIT(buffers); - buf = qd_buffer(); - char *insert = (char*) qd_buffer_cursor(buf); - memcpy(insert, "\x00\x53\x77\xa1\x0cTest Payload", 17); - qd_buffer_insert(buf, 17); - DEQ_INSERT_HEAD(buffers, buf); - - qd_message_compose_5(msg, props, &buffers, true); + + qd_message_compose_2(msg, props, true); qd_compose_free(props); qdr_link_deliver(adaptor->out_link_1, msg, 0, false, 0, 0); @@ -205,7 +200,7 @@ static void qdr_ref_flow(void *context, qdr_link_t *link, int credit) adaptor->streaming_message = qd_message(); - qd_message_compose_5(adaptor->streaming_message, props, 0, false); + qd_message_compose_2(adaptor->streaming_message, props, false); qd_compose_free(props); printf("qdr_ref_flow: Starting a streaming delivery\n"); @@ -409,6 +404,7 @@ static void on_stream(void *context) qd_buffer_list_t buffer_list; DEQ_INIT(buffer_list); qd_buffer_list_append(&buffer_list, (const uint8_t*) content, content_length); + qd_buffer_list_append(&buffer_list, (const uint8_t*) content, content_length); // // Compose a DATA performative for this section of the stream @@ -423,7 +419,7 @@ static void on_stream(void *context) qd_compose_free(field); // - // Notify the router that more data has arrived on the delivery + // Notify the router that more data is ready to be pushed out on the delivery // qdr_delivery_continue(adaptor->core, adaptor->streaming_delivery, false); } @@ -493,4 +489,4 @@ void qdr_ref_adaptor_final(void *adaptor_context) /** * Declare the adaptor so that it will self-register on process startup. */ -//QDR_CORE_ADAPTOR_DECLARE("ref-adaptor", qdr_ref_adaptor_init, qdr_ref_adaptor_final) +QDR_CORE_ADAPTOR_DECLARE("ref-adaptor", qdr_ref_adaptor_init, qdr_ref_adaptor_final) diff --git a/src/message.c b/src/message.c index c00b909..2b758b0 100644 --- a/src/message.c +++ b/src/message.c @@ -2224,14 +2224,14 @@ void qd_message_compose_1(qd_message_t *msg, const char *to, qd_buffer_list_t *b } -void qd_message_compose_2(qd_message_t *msg, qd_composed_field_t *field) +void qd_message_compose_2(qd_message_t *msg, qd_composed_field_t *field, bool complete) { qd_message_content_t *content = MSG_CONTENT(msg); - content->receive_complete = true; - qd_buffer_list_t *field_buffers = qd_compose_buffers(field); - content->buffers = *field_buffers; + content->buffers = *field_buffers; + content->receive_complete = complete; + DEQ_INIT(*field_buffers); // Zero out the linkage to the now moved buffers. } @@ -2264,26 +2264,6 @@ void qd_message_compose_4(qd_message_t *msg, qd_composed_field_t *field1, qd_com } -void qd_message_compose_5(qd_message_t *msg, - qd_composed_field_t *headers, - qd_buffer_list_t *body, - bool complete) -{ - qd_message_content_t *content = MSG_CONTENT(msg); - qd_buffer_list_t *headers_buffers = headers ? qd_compose_buffers(headers) : 0; - - DEQ_INIT(content->buffers); - if (headers_buffers) - DEQ_APPEND(content->buffers, (*headers_buffers)); - - if (body) { - DEQ_APPEND(content->buffers, (*body)); - } - - content->receive_complete = complete; -} - - int qd_message_extend(qd_message_t *msg, qd_composed_field_t *field) { qd_message_content_t *content = MSG_CONTENT(msg); diff --git a/src/message_private.h b/src/message_private.h index 47f8f3e..d2b62f7 100644 --- a/src/message_private.h +++ b/src/message_private.h @@ -135,6 +135,7 @@ typedef struct { qd_buffer_list_t ma_trace; // trace list in outgoing message annotations qd_buffer_list_t ma_ingress; // ingress field in outgoing message annotations int ma_phase; // phase for the override address + qd_field_location_t body_section; // Location of the current parsed body section bool strip_annotations_in; bool send_complete; // Has the message been completely received and completely sent? bool tag_sent; // Tags are sent diff --git a/src/python_embedded.c b/src/python_embedded.c index 7265eb2..9465929 100644 --- a/src/python_embedded.c +++ b/src/python_embedded.c @@ -764,7 +764,7 @@ static PyObject *qd_python_send(PyObject *self, PyObject *args) if (compose_python_message(&field, message, ioa->qd) == QD_ERROR_NONE) { qd_message_t *msg = qd_message(); - qd_message_compose_2(msg, field); + qd_message_compose_2(msg, field, true); qd_composed_field_t *ingress = qd_compose_subfield(0); qd_compose_insert_string(ingress, qd_router_id(ioa->qd)); diff --git a/src/router_core/core_client_api.c b/src/router_core/core_client_api.c index 3b2b85c..5b52929 100644 --- a/src/router_core/core_client_api.c +++ b/src/router_core/core_client_api.c @@ -687,7 +687,7 @@ static qd_message_t *_create_message_CT(qdrc_client_t *client, } else if (req->app_properties) { qd_message_compose_3(message, fld, req->app_properties); } else { - qd_message_compose_2(message, fld); + qd_message_compose_2(message, fld, true); } qd_compose_free(fld); qd_compose_free(req->body); diff --git a/src/router_core/modules/test_hooks/core_test_hooks.c b/src/router_core/modules/test_hooks/core_test_hooks.c index 263b9d9..a810d35 100644 --- a/src/router_core/modules/test_hooks/core_test_hooks.c +++ b/src/router_core/modules/test_hooks/core_test_hooks.c @@ -108,7 +108,7 @@ static void source_send(test_endpoint_t *ep, bool presettled) qd_compose_insert_string(field, stringbuf); dlv = qdrc_endpoint_delivery_CT(ep->node->core, ep->ep, msg); - qd_message_compose_2(msg, field); + qd_message_compose_2(msg, field, true); qd_compose_free(field); qdrc_endpoint_send_CT(ep->node->core, ep->ep, dlv, presettled); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org