This is an automated email from the ASF dual-hosted git repository. tross pushed a commit to branch dev-protocol-adaptors in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
commit a307cf433ee35c1f6be53ab9eae0b57225b2125a Author: Ted Ross <tr...@apache.org> AuthorDate: Thu Jun 4 15:22:29 2020 -0400 Dataplane: Added calls in message.h for streaming output from adaptors. Renamed qdr_deliver_continue* to qdr_delivery_continue* --- include/qpid/dispatch/message.h | 22 ++++++ src/adaptors/reference_adaptor.c | 141 ++++++++++++++++++++++++++++++--------- src/message.c | 16 +++++ src/router_core/connections.c | 4 +- src/router_core/delivery.c | 20 +++--- src/router_core/delivery.h | 4 +- src/router_node.c | 2 +- 7 files changed, 164 insertions(+), 45 deletions(-) diff --git a/include/qpid/dispatch/message.h b/include/qpid/dispatch/message.h index 29b2335..0f63b1a 100644 --- a/include/qpid/dispatch/message.h +++ b/include/qpid/dispatch/message.h @@ -278,11 +278,27 @@ void qd_message_compose_2(qd_message_t *msg, qd_composed_field_t *content); void qd_message_compose_3(qd_message_t *msg, qd_composed_field_t *content1, qd_composed_field_t *content2); void qd_message_compose_4(qd_message_t *msg, qd_composed_field_t *content1, qd_composed_field_t *content2, qd_composed_field_t *content3); +/** + * Send a message with optional headers and an optional raw body with the option of starting + * a streaming transfer. + * + * @param msg The message being composed + * @param headers A composed field with 1 or more header sections (incl body performative) or NULL + * @param body A buffer list of raw body content or NULL + * @param complete True if the message is to be receive-complete. + * False if more content will arrive later. + */ void qd_message_compose_5(qd_message_t *msg, qd_composed_field_t *headers, qd_buffer_list_t *body, bool complete); +/** + * Extend the content of a streaming message with more buffers. + */ +void qd_message_extend(qd_message_t *msg, qd_buffer_list_t *buffers); + + /** Put string representation of a message suitable for logging in buffer. 
* @return buffer */ @@ -369,6 +385,12 @@ void qd_message_set_send_complete(qd_message_t *msg); /** + * Flag the message as being receive-complete. + */ +void qd_message_set_receive_complete(qd_message_t *msg); + + +/** * Returns true if the delivery tag has already been sent. */ bool qd_message_tag_sent(qd_message_t *msg); diff --git a/src/adaptors/reference_adaptor.c b/src/adaptors/reference_adaptor.c index c7ca07c..f227542 100644 --- a/src/adaptors/reference_adaptor.c +++ b/src/adaptors/reference_adaptor.c @@ -25,17 +25,23 @@ #include <stdio.h> #include <inttypes.h> -static char *address = "examples"; +static char *address1 = "examples"; +static char *address2 = "stream"; typedef struct qdr_ref_adaptor_t { qdr_core_t *core; qdr_protocol_adaptor_t *adaptor; qd_timer_t *startup_timer; qd_timer_t *activate_timer; + qd_timer_t *stream_timer; qdr_connection_t *conn; - qdr_link_t *out_link; - qdr_link_t *in_link; + qdr_link_t *out_link_1; + qdr_link_t *out_link_2; + qdr_link_t *dynamic_in_link; char *reply_to; + qd_message_t *streaming_message; + qdr_delivery_t *streaming_delivery; + int stream_count; } qdr_ref_adaptor_t; @@ -84,25 +90,39 @@ static void qdr_ref_second_attach(void *context, qdr_link_t *link, printf("qdr_ref_second_attach: source=%s target=%s\n", fsource, ftarget); - if (link == adaptor->in_link) { - uint64_t link_id; - qdr_terminus_t *target = qdr_terminus(0); - - qdr_terminus_set_address(target, address); - - adaptor->out_link = qdr_link_first_attach(adaptor->conn, - QD_INCOMING, - qdr_terminus(0), //qdr_terminus_t *source, - target, //qdr_terminus_t *target, - "ref.1", //const char *name, - 0, //const char *terminus_addr, - &link_id); - - qdr_link_flow(adaptor->core, adaptor->out_link, 10, false); - + if (link == adaptor->dynamic_in_link) { + // + // The dynamic in-link has been attached. Get the reply-to address and open + // a couple of out-links. 
+ // qd_iterator_t *reply_iter = qdr_terminus_get_address(source); adaptor->reply_to = (char*) qd_iterator_copy(reply_iter); printf("qdr_ref_second_attach: reply-to=%s\n", adaptor->reply_to); + + // + // Open an out-link for each address + // + uint64_t link_id; + qdr_terminus_t *target = qdr_terminus(0); + + qdr_terminus_set_address(target, address1); + adaptor->out_link_1 = qdr_link_first_attach(adaptor->conn, + QD_INCOMING, + qdr_terminus(0), //qdr_terminus_t *source, + target, //qdr_terminus_t *target, + "ref.1", //const char *name, + 0, //const char *terminus_addr, + &link_id); + + target = qdr_terminus(0); + qdr_terminus_set_address(target, address2); + adaptor->out_link_2 = qdr_link_first_attach(adaptor->conn, + QD_INCOMING, + qdr_terminus(0), //qdr_terminus_t *source, + target, //qdr_terminus_t *target, + "ref.2", //const char *name, + 0, //const char *terminus_addr, + &link_id); } } @@ -120,7 +140,7 @@ static void qdr_ref_flow(void *context, qdr_link_t *link, int credit) printf("qdr_ref_flow: %d credits issued\n", credit); - if (link == adaptor->out_link) { + if (link == adaptor->out_link_1) { qd_composed_field_t *props = qd_compose(QD_PERFORMATIVE_PROPERTIES, 0); qd_compose_start_list(props); qd_compose_insert_null(props); // message-id @@ -151,8 +171,34 @@ static void qdr_ref_flow(void *context, qdr_link_t *link, int credit) qd_message_compose_5(msg, props, &buffers, true); qd_compose_free(props); - qdr_delivery_t *dlv = qdr_link_deliver(adaptor->out_link, msg, 0, false, 0, 0); - qdr_delivery_decref(adaptor->core, dlv, "release protection of return from deliver"); + qdr_link_deliver(adaptor->out_link_1, msg, 0, false, 0, 0); + // Keep return-protection delivery reference as the adaptor's reference + } else if (link == adaptor->out_link_2) { + // + // Begin streaming a long message on the link. 
+ // + qd_composed_field_t *props = qd_compose(QD_PERFORMATIVE_PROPERTIES, 0); + qd_compose_start_list(props); + qd_compose_insert_null(props); // message-id + qd_compose_insert_null(props); // user-id + qd_compose_insert_null(props); // to + qd_compose_insert_null(props); // subject + qd_compose_insert_string(props, adaptor->reply_to); // reply-to + qd_compose_end_list(props); + + props = qd_compose(QD_PERFORMATIVE_BODY_DATA, props); + + adaptor->streaming_message = qd_message(); + + qd_message_compose_5(adaptor->streaming_message, props, 0, false); + qd_compose_free(props); + + adaptor->streaming_delivery = + qdr_link_deliver(adaptor->out_link_2, adaptor->streaming_message, 0, false, 0, 0); + adaptor->stream_count = 0; + // Keep return-protection delivery reference as the adaptor's reference + + qd_timer_schedule(adaptor->stream_timer, 1000); } } @@ -210,7 +256,8 @@ static int qdr_ref_get_credit(void *context, qdr_link_t *link) static void qdr_ref_delivery_update(void *context, qdr_delivery_t *dlv, uint64_t disp, bool settled) { - char *dispname; + qdr_ref_adaptor_t *adaptor = (qdr_ref_adaptor_t*) context; + char *dispname; switch (disp) { case PN_ACCEPTED: dispname = "ACCEPTED"; break; @@ -221,6 +268,9 @@ static void qdr_ref_delivery_update(void *context, qdr_delivery_t *dlv, uint64_t dispname = "<UNKNOWN>"; } printf("qdr_ref_delivery_update: disp=%s settled=%s\n", dispname, settled ? 
"true" : "false"); + + if (settled && qdr_delivery_link(dlv) == adaptor->out_link_1) + qdr_delivery_decref(adaptor->core, dlv, "qdr_ref_delivery_update - settled delivery"); } @@ -272,16 +322,20 @@ static void on_startup(void *context) 0); // bind_token uint64_t link_id; + + // + // Create a dynamic receiver + // qdr_terminus_t *dynamic_source = qdr_terminus(0); qdr_terminus_set_dynamic(dynamic_source); - adaptor->in_link = qdr_link_first_attach(adaptor->conn, - QD_OUTGOING, - dynamic_source, //qdr_terminus_t *source, - qdr_terminus(0), //qdr_terminus_t *target, - "ref.2", //const char *name, - 0, //const char *terminus_addr, - &link_id); + adaptor->dynamic_in_link = qdr_link_first_attach(adaptor->conn, + QD_OUTGOING, + dynamic_source, //qdr_terminus_t *source, + qdr_terminus(0), //qdr_terminus_t *target, + "ref.0", //const char *name, + 0, //const char *terminus_addr, + &link_id); } @@ -293,6 +347,32 @@ static void on_activate(void *context) } +static void on_stream(void *context) +{ + qdr_ref_adaptor_t *adaptor = (qdr_ref_adaptor_t*) context; + const char *content = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"; + const size_t content_length = strlen(content); + qd_buffer_t *buf = qd_buffer(); + qd_buffer_list_t buffers; + + DEQ_INIT(buffers); + DEQ_INSERT_TAIL(buffers, buf); + + memcpy(qd_buffer_cursor(buf), content, content_length); + qd_buffer_insert(buf, content_length); + qd_message_extend(adaptor->streaming_message, &buffers); + qdr_delivery_continue(adaptor->core, adaptor->streaming_delivery, false); + + if (adaptor->stream_count < 30) { + qd_timer_schedule(adaptor->stream_timer, 1000); + adaptor->stream_count++; + } else { + qd_message_set_receive_complete(adaptor->streaming_message); + adaptor->streaming_message = 0; + } +} + + /** * This initialization function will be invoked when the router core is ready for the protocol * adaptor to be created. 
This function must: @@ -329,6 +409,7 @@ void qdr_ref_adaptor_init(qdr_core_t *core, void **adaptor_context) qd_timer_schedule(adaptor->startup_timer, 0); adaptor->activate_timer = qd_timer(core->qd, on_activate, adaptor); + adaptor->stream_timer = qd_timer(core->qd, on_stream, adaptor); } diff --git a/src/message.c b/src/message.c index be0c7a8..87c559e 100644 --- a/src/message.c +++ b/src/message.c @@ -1232,6 +1232,15 @@ void qd_message_set_send_complete(qd_message_t *in_msg) } +void qd_message_set_receive_complete(qd_message_t *in_msg) +{ + if (!!in_msg) { + qd_message_content_t *content = MSG_CONTENT(in_msg); + content->receive_complete = true; + } +} + + bool qd_message_tag_sent(qd_message_t *in_msg) { if (!in_msg) @@ -2209,6 +2218,13 @@ void qd_message_compose_5(qd_message_t *msg, } +void qd_message_extend(qd_message_t *msg, qd_buffer_list_t *buffers) +{ + qd_message_content_t *content = MSG_CONTENT(msg); + DEQ_APPEND(content->buffers, (*buffers)); +} + + qd_parsed_field_t *qd_message_get_ingress (qd_message_t *msg) { return ((qd_message_pvt_t*)msg)->content->ma_pf_ingress; diff --git a/src/router_core/connections.c b/src/router_core/connections.c index d6c7e40..88c4737 100644 --- a/src/router_core/connections.c +++ b/src/router_core/connections.c @@ -824,7 +824,7 @@ static void qdr_link_cleanup_deliveries_CT(qdr_core_t *core, qdr_connection_t *c if (!qdr_delivery_receive_complete(dlv)) { qdr_delivery_set_aborted(dlv, true); - qdr_deliver_continue_peers_CT(core, dlv, false); + qdr_delivery_continue_peers_CT(core, dlv, false); } if (dlv->multicast) { @@ -873,7 +873,7 @@ static void qdr_link_cleanup_deliveries_CT(qdr_core_t *core, qdr_connection_t *c if (!qdr_delivery_receive_complete(dlv)) { qdr_delivery_set_aborted(dlv, true); - qdr_deliver_continue_peers_CT(core, dlv, false); + qdr_delivery_continue_peers_CT(core, dlv, false); } peer = qdr_delivery_first_peer_CT(dlv); diff --git a/src/router_core/delivery.c b/src/router_core/delivery.c index 662ac2d..412683d 
100644 --- a/src/router_core/delivery.c +++ b/src/router_core/delivery.c @@ -25,7 +25,7 @@ ALLOC_DEFINE(qdr_delivery_t); static void qdr_update_delivery_CT(qdr_core_t *core, qdr_action_t *action, bool discard); static void qdr_delete_delivery_CT(qdr_core_t *core, qdr_action_t *action, bool discard); -static void qdr_deliver_continue_CT(qdr_core_t *core, qdr_action_t *action, bool discard); +static void qdr_delivery_continue_CT(qdr_core_t *core, qdr_action_t *action, bool discard); static void qdr_delete_delivery_internal_CT(qdr_core_t *core, qdr_delivery_t *delivery); static bool qdr_delivery_anycast_update_CT(qdr_core_t *core, qdr_delivery_t *dlv, qdr_delivery_t *peer, uint64_t new_disp, bool settled, @@ -211,10 +211,10 @@ void qdr_delivery_remote_state_updated(qdr_core_t *core, qdr_delivery_t *deliver } -qdr_delivery_t *qdr_deliver_continue(qdr_core_t *core,qdr_delivery_t *in_dlv, bool settled) +qdr_delivery_t *qdr_delivery_continue(qdr_core_t *core,qdr_delivery_t *in_dlv, bool settled) { - qdr_action_t *action = qdr_action(qdr_deliver_continue_CT, "deliver_continue"); + qdr_action_t *action = qdr_action(qdr_delivery_continue_CT, "delivery_continue"); action->args.delivery.delivery = in_dlv; qd_message_t *msg = qdr_delivery_message(in_dlv); @@ -222,7 +222,7 @@ qdr_delivery_t *qdr_deliver_continue(qdr_core_t *core,qdr_delivery_t *in_dlv, bo action->args.delivery.presettled = settled; // This incref is for the action reference - qdr_delivery_incref(in_dlv, "qdr_deliver_continue - add to action list"); + qdr_delivery_incref(in_dlv, "qdr_delivery_continue - add to action list"); qdr_action_enqueue(core, action); return in_dlv; } @@ -1050,7 +1050,7 @@ static void qdr_delete_delivery_CT(qdr_core_t *core, qdr_action_t *action, bool } -void qdr_deliver_continue_peers_CT(qdr_core_t *core, qdr_delivery_t *in_dlv, bool more) +void qdr_delivery_continue_peers_CT(qdr_core_t *core, qdr_delivery_t *in_dlv, bool more) { qdr_delivery_t *peer = qdr_delivery_first_peer_CT(in_dlv); 
@@ -1100,7 +1100,7 @@ void qdr_deliver_continue_peers_CT(qdr_core_t *core, qdr_delivery_t *in_dlv, boo } -static void qdr_deliver_continue_CT(qdr_core_t *core, qdr_action_t *action, bool discard) +static void qdr_delivery_continue_CT(qdr_core_t *core, qdr_action_t *action, bool discard) { if (discard) return; @@ -1123,7 +1123,7 @@ static void qdr_deliver_continue_CT(qdr_core_t *core, qdr_action_t *action, bool // If it is already in the undelivered list, don't try to deliver this again. // if (!!link && in_dlv->where != QDR_DELIVERY_IN_UNDELIVERED) { - qdr_deliver_continue_peers_CT(core, in_dlv, more); + qdr_delivery_continue_peers_CT(core, in_dlv, more); qd_message_t *msg = qdr_delivery_message(in_dlv); @@ -1147,7 +1147,7 @@ static void qdr_deliver_continue_CT(qdr_core_t *core, qdr_action_t *action, bool // We dont want to deal with such deliveries. // if (in_dlv->settled && in_dlv->where == QDR_DELIVERY_NOWHERE) { - qdr_delivery_decref_CT(core, in_dlv, "qdr_deliver_continue_CT - remove from action 1"); + qdr_delivery_decref_CT(core, in_dlv, "qdr_delivery_continue_CT - remove from action 1"); return; } @@ -1179,13 +1179,13 @@ static void qdr_deliver_continue_CT(qdr_core_t *core, qdr_action_t *action, bool DEQ_REMOVE(link->settled, in_dlv); // expect: action holds a ref to in_dlv, so it should not be freed here assert(sys_atomic_get(&in_dlv->ref_count) > 1); - qdr_delivery_decref_CT(core, in_dlv, "qdr_deliver_continue_CT - remove from settled list"); + qdr_delivery_decref_CT(core, in_dlv, "qdr_delivery_continue_CT - remove from settled list"); } } } // This decref is for the action reference - qdr_delivery_decref_CT(core, in_dlv, "qdr_deliver_continue_CT - remove from action 2"); + qdr_delivery_decref_CT(core, in_dlv, "qdr_delivery_continue_CT - remove from action 2"); } diff --git a/src/router_core/delivery.h b/src/router_core/delivery.h index ce8fd60..7e50efb 100644 --- a/src/router_core/delivery.h +++ b/src/router_core/delivery.h @@ -124,7 +124,7 @@ void 
qdr_delivery_remote_state_updated(qdr_core_t *core, qdr_delivery_t *deliver bool settled, qdr_error_t *error, pn_data_t *ext_state, bool ref_given); /* invoked when incoming message data arrives - schedule core thread */ -qdr_delivery_t *qdr_deliver_continue(qdr_core_t *core, qdr_delivery_t *delivery, bool settled); +qdr_delivery_t *qdr_delivery_continue(qdr_core_t *core, qdr_delivery_t *delivery, bool settled); // @@ -153,7 +153,7 @@ qdr_delivery_t *qdr_delivery_first_peer_CT(qdr_delivery_t *dlv); qdr_delivery_t *qdr_delivery_next_peer_CT(qdr_delivery_t *dlv); /* schedules all peer deliveries with work for I/O processing */ -void qdr_deliver_continue_peers_CT(qdr_core_t *core, qdr_delivery_t *in_dlv, bool more); +void qdr_delivery_continue_peers_CT(qdr_core_t *core, qdr_delivery_t *in_dlv, bool more); /* update the links counters with respect to its delivery */ void qdr_delivery_increment_counters_CT(qdr_core_t *core, qdr_delivery_t *delivery); diff --git a/src/router_node.c b/src/router_node.c index 95f9619..bd60542 100644 --- a/src/router_node.c +++ b/src/router_node.c @@ -436,7 +436,7 @@ static bool AMQP_rx_handler(void* context, qd_link_t *link) // if (delivery) { - qdr_deliver_continue(router->router_core, delivery, pn_delivery_settled(pnd)); + qdr_delivery_continue(router->router_core, delivery, pn_delivery_settled(pnd)); return next_delivery; } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org