This is an automated email from the ASF dual-hosted git repository. tross pushed a commit to branch dev-protocol-adaptors in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
commit 02261342d835f8cf3a09e9e15121e43da5222ab8 Author: Ted Ross <tr...@apache.org> AuthorDate: Wed Jun 3 18:02:25 2020 -0400 Dataplane: Added a 5th message compose variant to provide: - optional properties - optional application-properties - optional body in the form of a buffer list - indication of receive-complete --- include/qpid/dispatch/message.h | 6 +++ src/adaptors/reference_adaptor.c | 80 ++++++++++++++++++++++++++++++---------- src/message.c | 27 ++++++++++++++ 3 files changed, 93 insertions(+), 20 deletions(-) diff --git a/include/qpid/dispatch/message.h b/include/qpid/dispatch/message.h index 63f0b47..55c3dba 100644 --- a/include/qpid/dispatch/message.h +++ b/include/qpid/dispatch/message.h @@ -278,6 +278,12 @@ void qd_message_compose_2(qd_message_t *msg, qd_composed_field_t *content); void qd_message_compose_3(qd_message_t *msg, qd_composed_field_t *content1, qd_composed_field_t *content2); void qd_message_compose_4(qd_message_t *msg, qd_composed_field_t *content1, qd_composed_field_t *content2, qd_composed_field_t *content3); +void qd_message_compose_5(qd_message_t *msg, + qd_composed_field_t *properties, + qd_composed_field_t *application_properties, + qd_buffer_list_t *body, + bool complete); + /** Put string representation of a message suitable for logging in buffer. * @return buffer */ diff --git a/src/adaptors/reference_adaptor.c b/src/adaptors/reference_adaptor.c index ea0ca6f..b6ea5b4 100644 --- a/src/adaptors/reference_adaptor.c +++ b/src/adaptors/reference_adaptor.c @@ -25,6 +25,8 @@ #include <stdio.h> #include <inttypes.h> +static char *address = "echo-service"; + typedef struct qdr_ref_adaptor_t { qdr_core_t *core; qdr_protocol_adaptor_t *adaptor; @@ -33,6 +35,7 @@ typedef struct qdr_ref_adaptor_t { qdr_connection_t *conn; qdr_link_t *out_link; qdr_link_t *in_link; + char *reply_to; } qdr_ref_adaptor_t; @@ -61,6 +64,7 @@ static void qdr_ref_first_attach(void *context, qdr_connection_t *conn, qdr_link static void qdr_ref_second_attach(void *context, qdr_link_t *link, qdr_terminus_t *source, qdr_terminus_t *target) { + qdr_ref_adaptor_t *adaptor = (qdr_ref_adaptor_t*) context; #define TERM_SIZE 200 char ftarget[TERM_SIZE]; char fsource[TERM_SIZE]; @@ -79,6 +83,25 @@ static void qdr_ref_second_attach(void *context, qdr_link_t *link, } printf("qdr_ref_second_attach: source=%s target=%s\n", fsource, ftarget); + + if (link == adaptor->in_link) { + uint64_t link_id; + qdr_terminus_t *target = qdr_terminus(0); + + qdr_terminus_set_address(target, address); + + adaptor->out_link = qdr_link_first_attach(adaptor->conn, + QD_INCOMING, + qdr_terminus(0), //qdr_terminus_t *source, + target, //qdr_terminus_t *target, + "ref.1", //const char *name, + 0, //const char *terminus_addr, + &link_id); + + qd_iterator_t *reply_iter = qdr_terminus_get_address(source); + adaptor->reply_to = (char*) qd_iterator_copy(reply_iter); + printf("qdr_ref_second_attach: reply-to=%s\n", adaptor->reply_to); + } } @@ -95,16 +118,40 @@ static void qdr_ref_flow(void *context, qdr_link_t *link, int credit) printf("qdr_ref_flow: %d credits issued\n", credit); - qd_message_t *msg = qd_message(); - DEQ_INIT(buffers); - buf = qd_buffer(); - char *insert = (char*) qd_buffer_cursor(buf); - strcpy(insert, "Test Payload"); - qd_buffer_insert(buf, 13); - DEQ_INSERT_HEAD(buffers, buf); - qd_message_compose_1(msg, "echo-service", &buffers); - - qdr_link_deliver(adaptor->out_link, msg, 0, false, 0, 0); + if (link == adaptor->out_link) { + qd_composed_field_t *props = qd_compose(QD_PERFORMATIVE_PROPERTIES, 0); + qd_compose_start_list(props); + qd_compose_insert_null(props); // message-id + qd_compose_insert_null(props); // user-id + qd_compose_insert_null(props); // to + qd_compose_insert_null(props); // subject + qd_compose_insert_string(props, adaptor->reply_to); // reply-to + /* + qd_compose_insert_null(props); // correlation-id + qd_compose_insert_null(props); // content-type + qd_compose_insert_null(props); // content-encoding + qd_compose_insert_timestamp(props, 0); // absolute-expiry-time + qd_compose_insert_timestamp(props, 0); // creation-time + qd_compose_insert_null(props); // group-id + qd_compose_insert_uint(props, 0); // group-sequence + qd_compose_insert_null(props); // reply-to-group-id + */ + qd_compose_end_list(props); + + qd_message_t *msg = qd_message(); + DEQ_INIT(buffers); + buf = qd_buffer(); + char *insert = (char*) qd_buffer_cursor(buf); + strcpy(insert, "Test Payload"); + qd_buffer_insert(buf, 13); + DEQ_INSERT_HEAD(buffers, buf); + + qd_message_compose_5(msg, props, 0, &buffers, true); + qd_compose_free(props); + + qdr_delivery_t *dlv = qdr_link_deliver(adaptor->out_link, msg, 0, false, 0, 0); + qdr_delivery_decref(adaptor->core, dlv, "release protection of return from deliver"); + } } @@ -208,16 +255,7 @@ static void on_startup(void *context) uint64_t link_id; qdr_terminus_t *dynamic_source = qdr_terminus(0); qdr_terminus_set_dynamic(dynamic_source); - qdr_terminus_t *target = qdr_terminus(0); - qdr_terminus_set_address(target, "echo-service"); - - adaptor->out_link = qdr_link_first_attach(adaptor->conn, - QD_INCOMING, - qdr_terminus(0), //qdr_terminus_t *source, - target, //qdr_terminus_t *target, - "ref.1", //const char *name, - 0, //const char *terminus_addr, - &link_id); + adaptor->in_link = qdr_link_first_attach(adaptor->conn, QD_OUTGOING, dynamic_source, //qdr_terminus_t *source, @@ -246,6 +284,7 @@ static void on_activate(void *context) void qdr_ref_adaptor_init(qdr_core_t *core, void **adaptor_context) { qdr_ref_adaptor_t *adaptor = NEW(qdr_ref_adaptor_t); + ZERO(adaptor); adaptor->core = core; adaptor->adaptor = qdr_protocol_adaptor(core, "reference", // name @@ -280,6 +319,7 @@ void qdr_ref_adaptor_final(void *adaptor_context) qdr_protocol_adaptor_free(adaptor->core, adaptor->adaptor); qd_timer_free(adaptor->startup_timer); qd_timer_free(adaptor->activate_timer); + free(adaptor->reply_to); free(adaptor); } diff --git a/src/message.c b/src/message.c index 2ce6890..81ba863 100644 --- a/src/message.c +++ b/src/message.c @@ -2179,6 +2179,33 @@ void qd_message_compose_4(qd_message_t *msg, qd_composed_field_t *field1, qd_com } +void qd_message_compose_5(qd_message_t *msg, + qd_composed_field_t *properties, + qd_composed_field_t *application_properties, + qd_buffer_list_t *body, + bool complete) +{ + qd_message_content_t *content = MSG_CONTENT(msg); + qd_buffer_list_t *properties_buffers = properties ? qd_compose_buffers(properties) : 0; + qd_buffer_list_t *application_properties_buffers = application_properties ? qd_compose_buffers(application_properties) : 0; + + DEQ_INIT(content->buffers); + if (properties_buffers) + DEQ_APPEND(content->buffers, (*properties_buffers)); + if (application_properties_buffers) + DEQ_APPEND(content->buffers, (*application_properties_buffers)); + + if (body) { + qd_composed_field_t *field = qd_compose(QD_PERFORMATIVE_BODY_DATA, 0); + qd_compose_insert_binary_buffers(field, body); + DEQ_APPEND(content->buffers, (*qd_compose_buffers(field))); + qd_compose_free(field); + } + + content->receive_complete = complete; +} + + qd_parsed_field_t *qd_message_get_ingress (qd_message_t *msg) { return ((qd_message_pvt_t*)msg)->content->ma_pf_ingress; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org