This is an automated email from the ASF dual-hosted git repository. tross pushed a commit to branch dev-protocol-adaptors-2 in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
The following commit(s) were added to refs/heads/dev-protocol-adaptors-2 by this push: new cef0f5a DISPATCH-1742 - Added function for asynchronous completion of sent-messages. Improved API docs. DISPATCH-1742 - Updated reference adaptor to make multiple extend calls. DISPATCH-1804 - Changed body_data API to stream_data to encompass both body_data and footer performatives. cef0f5a is described below commit cef0f5a5ffae788b2692551775df9e4daba1305b Author: Ted Ross <tr...@apache.org> AuthorDate: Wed Oct 28 16:44:19 2020 -0400 DISPATCH-1742 - Added function for asynchronous completion of sent-messages. Improved API docs. DISPATCH-1742 - Updated reference adaptor to make multiple extend calls. DISPATCH-1804 - Changed body_data API to stream_data to encompass both body_data and footer performatives. --- include/qpid/dispatch/http1_codec.h | 12 +-- include/qpid/dispatch/message.h | 80 +++++++------- include/qpid/dispatch/protocol_adaptor.h | 37 +++++++ src/adaptors/http1/http1_adaptor.c | 20 ++-- src/adaptors/http1/http1_client.c | 32 +++--- src/adaptors/http1/http1_codec.c | 16 +-- src/adaptors/http1/http1_private.h | 8 +- src/adaptors/http1/http1_server.c | 28 ++--- src/adaptors/http2/http2_adaptor.c | 130 +++++++++++------------ src/adaptors/http2/http2_adaptor.h | 12 +-- src/adaptors/reference_adaptor.c | 94 +++++++++++------ src/adaptors/tcp_adaptor.c | 50 +++++---- src/message.c | 173 +++++++++++++++++-------------- src/message_private.h | 42 ++++---- src/router_core/transfer.c | 42 +++++++- tests/message_test.c | 103 +++++++++--------- 16 files changed, 498 insertions(+), 381 deletions(-) diff --git a/include/qpid/dispatch/http1_codec.h b/include/qpid/dispatch/http1_codec.h index 6967cc0..132f06c 100644 --- a/include/qpid/dispatch/http1_codec.h +++ b/include/qpid/dispatch/http1_codec.h @@ -87,12 +87,12 @@ typedef struct h1_codec_config_t { // void (*tx_buffers)(h1_codec_request_state_t *hrs, qd_buffer_list_t *data, unsigned int len); - // tx_body_data() - // Called with body_data containing encoded HTTP message data. Only + // tx_stream_data() + // Called with stream_data containing encoded HTTP message data. Only // called if the outgoing HTTP message has a body. The caller assumes - // ownership of the body_data and must release it when done. + // ownership of the stream_data and must release it when done. // - void (*tx_body_data)(h1_codec_request_state_t *hrs, qd_message_body_data_t *body_data); + void (*tx_stream_data)(h1_codec_request_state_t *hrs, qd_message_stream_data_t *stream_data); // // RX message callbacks @@ -227,9 +227,9 @@ int h1_codec_tx_response(h1_codec_request_state_t *hrs, int status_code, const c // int h1_codec_tx_add_header(h1_codec_request_state_t *hrs, const char *key, const char *value); -// Stream outgoing body data. Ownership of body_data is passed to the caller. +// Stream outgoing body data. Ownership of stream_data is passed to the caller. // -int h1_codec_tx_body(h1_codec_request_state_t *hrs, qd_message_body_data_t *body_data); +int h1_codec_tx_body(h1_codec_request_state_t *hrs, qd_message_stream_data_t *stream_data); // outgoing message construction complete. The request_complete() callback MAY // occur during this call. diff --git a/include/qpid/dispatch/message.h b/include/qpid/dispatch/message.h index b797eca..41ec73b 100644 --- a/include/qpid/dispatch/message.h +++ b/include/qpid/dispatch/message.h @@ -59,8 +59,8 @@ // Callback for status change (confirmed persistent, loaded-in-memory, etc.) -typedef struct qd_message_t qd_message_t; -typedef struct qd_message_body_data_t qd_message_body_data_t; +typedef struct qd_message_t qd_message_t; +typedef struct qd_message_stream_data_t qd_message_stream_data_t; /** Amount of message to be parsed. */ typedef enum { @@ -310,93 +310,93 @@ int qd_message_extend(qd_message_t *msg, qd_composed_field_t *field); /** - * qd_message_body_data_iterator + * qd_message_stream_data_iterator * * Return an iterator that references the content (not the performative headers) * of the entire body-data section. * * The returned iterator must eventually be freed by the caller. * - * @param body_data Pointer to a body_data object produced by qd_message_next_body_data - * @return Pointer to an iterator referencing the body_data content + * @param stream_data Pointer to a stream_data object produced by qd_message_next_stream_data + * @return Pointer to an iterator referencing the stream_data content */ -qd_iterator_t *qd_message_body_data_iterator(const qd_message_body_data_t *body_data); +qd_iterator_t *qd_message_stream_data_iterator(const qd_message_stream_data_t *stream_data); /** - * qd_message_body_data_buffer_count + * qd_message_stream_data_buffer_count * * Return the number of buffers that are needed to hold this body-data's content. * - * @param body_data Pointer to a body_data object produced by qd_message_next_body_data - * @return Number of pn_raw_buffers needed to contain the entire content of this body_data. + * @param stream_data Pointer to a stream_data object produced by qd_message_next_stream_data + * @return Number of pn_raw_buffers needed to contain the entire content of this stream_data. */ -int qd_message_body_data_buffer_count(const qd_message_body_data_t *body_data); +int qd_message_stream_data_buffer_count(const qd_message_stream_data_t *stream_data); /** - * qd_message_body_data_buffers + * qd_message_stream_data_buffers * - * Populate an array of pn_raw_buffer_t objects with references to the body_data's content. + * Populate an array of pn_raw_buffer_t objects with references to the stream_data's content. * - * @param body_data Pointer to a body_data object produced by qd_message_next_body_data + * @param stream_data Pointer to a stream_data object produced by qd_message_next_stream_data * @param buffers Pointer to an array of pn_raw_buffer_t objects - * @param offset The offset (in the body_data's buffer set) from which copying should begin + * @param offset The offset (in the stream_data's buffer set) from which copying should begin * @param count The number of pn_raw_buffer_t objects in the buffers array * @return The number of pn_raw_buffer_t objects that were overwritten */ -int qd_message_body_data_buffers(qd_message_body_data_t *body_data, pn_raw_buffer_t *buffers, int offset, int count); +int qd_message_stream_data_buffers(qd_message_stream_data_t *stream_data, pn_raw_buffer_t *buffers, int offset, int count); /** - * qd_message_body_data_payload_length + * qd_message_stream_data_payload_length * - * Given a body_data object, return the length of the payload. - * This will equal the sum of the length of all qd_buffer_t objects contained in payload portion of the body_data object + * Given a stream_data object, return the length of the payload. + * This will equal the sum of the length of all qd_buffer_t objects contained in payload portion of the stream_data object * - * @param body_data Pointer to a body_data object produced by qd_message_next_body_data + * @param stream_data Pointer to a stream_data object produced by qd_message_next_stream_data * @return The length of the payload of the passed in body data object. */ -size_t qd_message_body_data_payload_length(const qd_message_body_data_t *body_data); +size_t qd_message_stream_data_payload_length(const qd_message_stream_data_t *stream_data); /** - * qd_message_body_data_release + * qd_message_stream_data_release * * Release buffers that were associated with a body-data section. It is not required that body-data * objects be released in the same order in which they were offered. * - * Once this function is called, the caller must drop its reference to the body_data object + * Once this function is called, the caller must drop its reference to the stream_data object * and not use it again. * - * @param body_data Pointer to a body data object returned by qd_message_next_body_data + * @param stream_data Pointer to a body data object returned by qd_message_next_stream_data */ -void qd_message_body_data_release(qd_message_body_data_t *body_data); +void qd_message_stream_data_release(qd_message_stream_data_t *stream_data); typedef enum { - QD_MESSAGE_BODY_DATA_OK, // A valid body data object have been returned - QD_MESSAGE_BODY_DATA_INCOMPLETE, // The next body data is incomplete, try again later - QD_MESSAGE_BODY_DATA_NO_MORE, // There are no more body data objects in this stream - QD_MESSAGE_BODY_DATA_INVALID, // The next body data is invalid, the stream is corrupted - QD_MESSAGE_BODY_DATA_NOT_DATA // The body of the message is not a DATA segment -} qd_message_body_data_result_t; + QD_MESSAGE_STREAM_DATA_BODY_OK, // A valid body data object has been returned + QD_MESSAGE_STREAM_DATA_FOOTER_OK, // A valid footer has been returned + QD_MESSAGE_STREAM_DATA_INCOMPLETE, // The next body data is incomplete, try again later + QD_MESSAGE_STREAM_DATA_NO_MORE, // There are no more body data objects in this stream + QD_MESSAGE_STREAM_DATA_INVALID // The next body data is invalid, the stream is corrupted +} qd_message_stream_data_result_t; /** - * qd_message_next_body_data + * qd_message_next_stream_data * * Get the next body-data section from this streaming message return the result and - * possibly the valid, completed body_data object. + * possibly the valid, completed stream_data object. * * @param msg Pointer to a message - * @param body_data Output pointer to a body_data object (or 0 if not OK) - * @return The body_data_result describing the result of this operation + * @param stream_data Output pointer to a stream_data object (or 0 if not OK) + * @return The stream_data_result describing the result of this operation */ -qd_message_body_data_result_t qd_message_next_body_data(qd_message_t *msg, qd_message_body_data_t **body_data); +qd_message_stream_data_result_t qd_message_next_stream_data(qd_message_t *msg, qd_message_stream_data_t **stream_data); /** - * qd_message_body_data_append + * qd_message_stream_data_append * * Append the buffers in data as a sequence of one or more BODY_DATA sections * to the given message. The buffers in data are moved into the message @@ -406,7 +406,7 @@ qd_message_body_data_result_t qd_message_next_body_data(qd_message_t *msg, qd_me * @param data List of buffers containing body data. * @return The number of buffers stored in the message's content */ -int qd_message_body_data_append(qd_message_t *msg, qd_buffer_list_t *data); +int qd_message_stream_data_append(qd_message_t *msg, qd_buffer_list_t *data); /** Put string representation of a message suitable for logging in buffer. @@ -424,7 +424,7 @@ qd_log_source_t* qd_message_log_source(); * @param msg A pointer to the message * @return the parsed field */ -qd_parsed_field_t *qd_message_get_ingress (qd_message_t *msg); +qd_parsed_field_t *qd_message_get_ingress(qd_message_t *msg); /** * Accessor for message field phase @@ -432,7 +432,7 @@ qd_parsed_field_t *qd_message_get_ingress (qd_message_t *msg); * @param msg A pointer to the message * @return the parsed field */ -qd_parsed_field_t *qd_message_get_phase (qd_message_t *msg); +qd_parsed_field_t *qd_message_get_phase(qd_message_t *msg); /** * Accessor for message field to_override @@ -448,7 +448,7 @@ qd_parsed_field_t *qd_message_get_to_override(qd_message_t *msg); * @param msg A pointer to the message * @return the parsed field */ -qd_parsed_field_t *qd_message_get_trace (qd_message_t *msg); +qd_parsed_field_t *qd_message_get_trace(qd_message_t *msg); /** * Accessor for message field phase diff --git a/include/qpid/dispatch/protocol_adaptor.h b/include/qpid/dispatch/protocol_adaptor.h index 29c7a8b..909b860 100644 --- a/include/qpid/dispatch/protocol_adaptor.h +++ b/include/qpid/dispatch/protocol_adaptor.h @@ -857,8 +857,45 @@ qdr_delivery_t *qdr_link_deliver_to_routed_link(qdr_link_t *link, qd_message_t * const uint8_t *tag, int tag_length, uint64_t remote_disposition, pn_data_t *remote_extension_state); + +/** + * qdr_link_process_deliveries + * + * This function is called by the protocol adaptor in the context of the link_push + * callback. It provides the core module access to the IO thread so the core can + * deliver outgoing messages to the adaptor. + * + * @param core Pointer to the router core object + * @param link Pointer to the link being processed + * @param credit The maximum number of deliveries to be processed on this link + * @return The number of deliveries that were completed during the processing + */ int qdr_link_process_deliveries(qdr_core_t *core, qdr_link_t *link, int credit); + +/** + * qdr_link_complete_sent_message + * + * If an outgoing message is completed outside of the context of the link_deliver callback, + * this function must be called to inform the router core that the delivery on the head of + * the link's undelivered list can be moved out of that list. Ensure that the send-complete + * status of the message has been set before calling this function. This function will check + * the send-complete status of the head delivery on the link's undelivered list. If it is + * true, that delivery will be removed from the undelivered list. + * + * DO NOT call this function from within the link_deliver callback. Use it only if you must + * asynchronously complete the sending of the current message. + * + * This will typically occur when a message delivered to the protcol adaptor cannot be sent + * on the wire due to back-pressure. In this case, the removal of the back pressure is the + * stimulus for completing the send of the message. + * + * @param core Pointer to the router core object + * @param link Pointer to the link on which the head delivery has been completed + */ +void qdr_link_complete_sent_message(qdr_core_t *core, qdr_link_t *link); + + void qdr_link_flow(qdr_core_t *core, qdr_link_t *link, int credit, bool drain_mode); /** diff --git a/src/adaptors/http1/http1_adaptor.c b/src/adaptors/http1/http1_adaptor.c index 2207478..eb8349a 100644 --- a/src/adaptors/http1/http1_adaptor.c +++ b/src/adaptors/http1/http1_adaptor.c @@ -138,8 +138,8 @@ void qdr_http1_out_data_fifo_cleanup(qdr_http1_out_data_fifo_t *out_data) qdr_http1_out_data_t *od = DEQ_HEAD(out_data->fifo); while (od) { DEQ_REMOVE_HEAD(out_data->fifo); - if (od->body_data) - qd_message_body_data_release(od->body_data); + if (od->stream_data) + qd_message_stream_data_release(od->stream_data); else qd_buffer_list_free_buffers(&od->raw_buffers); free_qdr_http1_out_data_t(od); @@ -265,9 +265,9 @@ uint64_t qdr_http1_write_out_data(qdr_http1_connection_t *hconn, qdr_http1_out_d size_t limit = MIN(RAW_BUFFER_BATCH, od_len); int written = 0; - if (od->body_data) { // buffers stored in qd_message_t + if (od->stream_data) { // buffers stored in qd_message_t - written = qd_message_body_data_buffers(od->body_data, buffers, od->next_buffer, limit); + written = qd_message_stream_data_buffers(od->stream_data, buffers, od->next_buffer, limit); for (int i = 0; i < written; ++i) { // enforce this: we expect the context can be used by the adaptor! assert(buffers[i].context == 0); @@ -352,14 +352,14 @@ void qdr_http1_enqueue_buffer_list(qdr_http1_out_data_fifo_t *fifo, qd_buffer_li // The HTTP encoder has a message body data to be written to the raw connection. // Queue it to the outgoing data fifo. // -void qdr_http1_enqueue_body_data(qdr_http1_out_data_fifo_t *fifo, qd_message_body_data_t *body_data) +void qdr_http1_enqueue_stream_data(qdr_http1_out_data_fifo_t *fifo, qd_message_stream_data_t *stream_data) { - int count = qd_message_body_data_buffer_count(body_data); + int count = qd_message_stream_data_buffer_count(stream_data); if (count) { qdr_http1_out_data_t *od = new_qdr_http1_out_data_t(); ZERO(od); od->owning_fifo = fifo; - od->body_data = body_data; + od->stream_data = stream_data; od->buffer_count = count; DEQ_INSERT_TAIL(fifo->fifo, od); @@ -367,7 +367,7 @@ void qdr_http1_enqueue_body_data(qdr_http1_out_data_fifo_t *fifo, qd_message_bod fifo->write_ptr = od; } else { // empty body-data - qd_message_body_data_release(body_data); + qd_message_stream_data_release(stream_data); } } @@ -401,8 +401,8 @@ void qdr_http1_free_written_buffers(qdr_http1_connection_t *hconn) // all buffers returned qdr_http1_out_data_fifo_t *fifo = od->owning_fifo; DEQ_REMOVE(fifo->fifo, od); - if (od->body_data) - qd_message_body_data_release(od->body_data); + if (od->stream_data) + qd_message_stream_data_release(od->stream_data); else qd_buffer_list_free_buffers(&od->raw_buffers); free_qdr_http1_out_data_t(od); diff --git a/src/adaptors/http1/http1_client.c b/src/adaptors/http1/http1_client.c index 708d2a9..0a82599 100644 --- a/src/adaptors/http1/http1_client.c +++ b/src/adaptors/http1/http1_client.c @@ -86,7 +86,7 @@ ALLOC_DEFINE(_client_request_t); static void _client_tx_buffers_cb(h1_codec_request_state_t *lib_hrs, qd_buffer_list_t *blist, unsigned int len); -static void _client_tx_body_data_cb(h1_codec_request_state_t *lib_hrs, qd_message_body_data_t *body_data); +static void _client_tx_stream_data_cb(h1_codec_request_state_t *lib_hrs, qd_message_stream_data_t *stream_data); static int _client_rx_request_cb(h1_codec_request_state_t *lib_rs, const char *method, const char *target, @@ -133,7 +133,7 @@ static qdr_http1_connection_t *_create_client_connection(qd_http_listener_t *li) h1_codec_config_t config = {0}; config.type = HTTP1_CONN_CLIENT; config.tx_buffers = _client_tx_buffers_cb; - config.tx_body_data = _client_tx_body_data_cb; + config.tx_stream_data = _client_tx_stream_data_cb; config.rx_request = _client_rx_request_cb; config.rx_response = _client_rx_response_cb; config.rx_header = _client_rx_header_cb; @@ -554,9 +554,9 @@ static void _client_tx_buffers_cb(h1_codec_request_state_t *hrs, qd_buffer_list_ } -// Encoder callback: send body_data buffers (response msg) to client endpoint +// Encoder callback: send stream_data buffers (response msg) to client endpoint // -static void _client_tx_body_data_cb(h1_codec_request_state_t *hrs, qd_message_body_data_t *body_data) +static void _client_tx_stream_data_cb(h1_codec_request_state_t *hrs, qd_message_stream_data_t *stream_data) { _client_request_t *hreq = (_client_request_t*) h1_codec_request_state_get_context(hrs); qdr_http1_connection_t *hconn = hreq->base.hconn; @@ -565,7 +565,7 @@ static void _client_tx_body_data_cb(h1_codec_request_state_t *hrs, qd_message_bo // client connection has been lost qd_log(qdr_http1_adaptor->log, QD_LOG_WARNING, "[C%"PRIu64"] Discarding outgoing data - client connection closed", hconn->conn_id); - qd_message_body_data_release(body_data); + qd_message_stream_data_release(stream_data); return; } @@ -578,7 +578,7 @@ static void _client_tx_body_data_cb(h1_codec_request_state_t *hrs, qd_message_bo _client_response_msg_t *rmsg = DEQ_TAIL(hreq->responses); assert(rmsg); - qdr_http1_enqueue_body_data(&rmsg->out_data, body_data); + qdr_http1_enqueue_stream_data(&rmsg->out_data, stream_data); // if this happens to be the current outgoing response try writing to the // raw connection @@ -781,7 +781,7 @@ static int _client_rx_body_cb(h1_codec_request_state_t *hrs, qd_buffer_list_t *b "[C%"PRIu64"][L%"PRIu64"] HTTP request body received len=%zu.", hconn->conn_id, hconn->in_link_id, len); - qd_message_body_data_append(msg, body); + qd_message_stream_data_append(msg, body); // // Notify the router that more data is ready to be pushed out on the delivery @@ -1119,18 +1119,18 @@ static uint64_t _encode_response_message(_client_request_t *hreq, } } - qd_message_body_data_t *body_data = 0; + qd_message_stream_data_t *stream_data = 0; while (true) { - switch (qd_message_next_body_data(msg, &body_data)) { + switch (qd_message_next_stream_data(msg, &stream_data)) { - case QD_MESSAGE_BODY_DATA_OK: + case QD_MESSAGE_STREAM_DATA_BODY_OK: qd_log(hconn->adaptor->log, QD_LOG_TRACE, "[C%"PRIu64"][L%"PRIu64"] Encoding response body data", hconn->conn_id, hconn->out_link_id); - if (h1_codec_tx_body(hreq->base.lib_rs, body_data)) { + if (h1_codec_tx_body(hreq->base.lib_rs, stream_data)) { qd_log(qdr_http1_adaptor->log, QD_LOG_WARNING, "[C%"PRIu64"][L%"PRIu64"] body data encode failed", hconn->conn_id, hconn->out_link_id); @@ -1138,18 +1138,20 @@ static uint64_t _encode_response_message(_client_request_t *hreq, } break; - case QD_MESSAGE_BODY_DATA_NO_MORE: + case QD_MESSAGE_STREAM_DATA_FOOTER_OK: + break; + + case QD_MESSAGE_STREAM_DATA_NO_MORE: // indicate this message is complete qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] response message encoding completed", hconn->conn_id, hconn->out_link_id); return PN_ACCEPTED; - case QD_MESSAGE_BODY_DATA_INCOMPLETE: + case QD_MESSAGE_STREAM_DATA_INCOMPLETE: return 0; // wait for more - case QD_MESSAGE_BODY_DATA_INVALID: - case QD_MESSAGE_BODY_DATA_NOT_DATA: + case QD_MESSAGE_STREAM_DATA_INVALID: qd_log(qdr_http1_adaptor->log, QD_LOG_WARNING, "[C%"PRIu64"][L%"PRIu64"] Rejecting corrupted body data.", hconn->conn_id, hconn->out_link_id); diff --git a/src/adaptors/http1/http1_codec.c b/src/adaptors/http1/http1_codec.c index 93bc73a..5cb16b3 100644 --- a/src/adaptors/http1/http1_codec.c +++ b/src/adaptors/http1/http1_codec.c @@ -966,7 +966,7 @@ static bool parse_header(h1_codec_connection_t *conn, struct decoder_t *decoder) // Pass message body data up to the application. // -static inline int consume_body_data(h1_codec_connection_t *conn, bool flush) +static inline int consume_stream_data(h1_codec_connection_t *conn, bool flush) { struct decoder_t *decoder = &conn->decoder; qd_iterator_pointer_t *body_ptr = &decoder->body_ptr; @@ -1092,7 +1092,7 @@ static bool parse_body_chunked_data(h1_codec_connection_t *conn, struct decoder_ decoder->chunk_length -= skipped; body_ptr->remaining += skipped; - consume_body_data(conn, false); + consume_stream_data(conn, false); if (decoder->chunk_length == 0) { // end of chunk @@ -1117,7 +1117,7 @@ static bool parse_body_chunked_trailer(h1_codec_connection_t *conn, struct decod body_ptr->remaining += line.remaining; if (is_empty_line(&line)) { // end of message - consume_body_data(conn, true); + consume_stream_data(conn, true); decoder->state = HTTP1_MSG_STATE_DONE; } @@ -1164,7 +1164,7 @@ static bool parse_body_content(h1_codec_connection_t *conn, struct decoder_t *de body_ptr->remaining += skipped; bool eom = decoder->content_length == 0; - consume_body_data(conn, eom); + consume_stream_data(conn, eom); if (eom) decoder->state = HTTP1_MSG_STATE_DONE; @@ -1188,7 +1188,7 @@ static bool parse_body(h1_codec_connection_t *conn, struct decoder_t *decoder) decoder->read_ptr.remaining = 0; decoder->read_ptr.buffer = DEQ_TAIL(decoder->incoming); decoder->read_ptr.cursor = qd_buffer_cursor(decoder->read_ptr.buffer); - consume_body_data(conn, true); + consume_stream_data(conn, true); decoder->body_ptr = decoder->read_ptr = NULL_I_PTR; DEQ_INIT(decoder->incoming); } @@ -1489,7 +1489,7 @@ static inline void _flush_headers(h1_codec_request_state_t *hrs, struct encoder_ // just forward the body chain along -int h1_codec_tx_body(h1_codec_request_state_t *hrs, qd_message_body_data_t *body_data) +int h1_codec_tx_body(h1_codec_request_state_t *hrs, qd_message_stream_data_t *stream_data) { h1_codec_connection_t *conn = h1_codec_request_state_get_connection(hrs); struct encoder_t *encoder = &conn->encoder; @@ -1498,8 +1498,8 @@ int h1_codec_tx_body(h1_codec_request_state_t *hrs, qd_message_body_data_t *body _flush_headers(hrs, encoder); // skip the outgoing queue and send directly - hrs->out_octets += qd_message_body_data_payload_length(body_data); - conn->config.tx_body_data(hrs, body_data); + hrs->out_octets += qd_message_stream_data_payload_length(stream_data); + conn->config.tx_stream_data(hrs, stream_data); return 0; } diff --git a/src/adaptors/http1/http1_private.h b/src/adaptors/http1/http1_private.h index d0d2306..593803d 100644 --- a/src/adaptors/http1/http1_private.h +++ b/src/adaptors/http1/http1_private.h @@ -60,10 +60,10 @@ extern qdr_http1_adaptor_t *qdr_http1_adaptor; // Data to be written out the raw connection. // // This adaptor has to cope with two different data sources: the HTTP1 encoder -// and the qd_message_body_data_t list. The HTTP1 encoder produces a simple +// and the qd_message_stream_data_t list. The HTTP1 encoder produces a simple // qd_buffer_list_t for outgoing header data whose ownership is given to the // adaptor: the adaptor is free to deque/free these buffers as needed. The -// qd_message_body_data_t buffers are shared with the owning message and the +// qd_message_stream_data_t buffers are shared with the owning message and the // buffer list must not be modified by the adaptor. The qdr_http1_out_data_t // is used to manage both types of data sources. // @@ -76,7 +76,7 @@ struct qdr_http1_out_data_t { // or a message body data (not both!) qd_buffer_list_t raw_buffers; - qd_message_body_data_t *body_data; + qd_message_stream_data_t *stream_data; int buffer_count; // # total buffers int next_buffer; // offset to next buffer to send @@ -207,7 +207,7 @@ ALLOC_DECLARE(qdr_http1_connection_t); void qdr_http1_free_written_buffers(qdr_http1_connection_t *hconn); void qdr_http1_enqueue_buffer_list(qdr_http1_out_data_fifo_t *fifo, qd_buffer_list_t *blist); -void qdr_http1_enqueue_body_data(qdr_http1_out_data_fifo_t *fifo, qd_message_body_data_t *body_data); +void qdr_http1_enqueue_stream_data(qdr_http1_out_data_fifo_t *fifo, qd_message_stream_data_t *stream_data); uint64_t qdr_http1_write_out_data(qdr_http1_connection_t *hconn, qdr_http1_out_data_fifo_t *fifo); void qdr_http1_out_data_fifo_cleanup(qdr_http1_out_data_fifo_t *out_data); // return the number of buffers currently held by the proactor for writing diff --git a/src/adaptors/http1/http1_server.c b/src/adaptors/http1/http1_server.c index 21e3e38..98af72a 100644 --- a/src/adaptors/http1/http1_server.c +++ b/src/adaptors/http1/http1_server.c @@ -95,7 +95,7 @@ ALLOC_DEFINE(_server_request_t); #define MAX_RECONNECT 5 // 5 * 500 = 2.5 sec static void _server_tx_buffers_cb(h1_codec_request_state_t *lib_hrs, qd_buffer_list_t *blist, unsigned int len); -static void _server_tx_body_data_cb(h1_codec_request_state_t *lib_hrs, qd_message_body_data_t *body_data); +static void _server_tx_stream_data_cb(h1_codec_request_state_t *lib_hrs, qd_message_stream_data_t *stream_data); static int _server_rx_request_cb(h1_codec_request_state_t *hrs, const char *method, const char *target, @@ -321,7 +321,7 @@ static void _setup_server_links(qdr_http1_connection_t *hconn) h1_codec_config_t config = {0}; config.type = HTTP1_CONN_SERVER; config.tx_buffers = _server_tx_buffers_cb; - config.tx_body_data = _server_tx_body_data_cb; + config.tx_stream_data = _server_tx_stream_data_cb; config.rx_request = _server_rx_request_cb; config.rx_response = _server_rx_response_cb; config.rx_header = _server_rx_header_cb; @@ -684,7 +684,7 @@ static void _server_tx_buffers_cb(h1_codec_request_state_t *hrs, qd_buffer_list_ // Encoder has body data to send to the server // -static void _server_tx_body_data_cb(h1_codec_request_state_t *hrs, qd_message_body_data_t *body_data) +static void _server_tx_stream_data_cb(h1_codec_request_state_t *hrs, qd_message_stream_data_t *stream_data) { _server_request_t *hreq = (_server_request_t*) h1_codec_request_state_get_context(hrs); qdr_http1_connection_t *hconn = hreq->base.hconn; @@ -692,7 +692,7 @@ static void _server_tx_body_data_cb(h1_codec_request_state_t *hrs, qd_message_bo qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE, "[C%"PRIu64"][L%"PRIu64"] Sending body data to server", hconn->conn_id, hconn->out_link_id); - qdr_http1_enqueue_body_data(&hreq->out_data, body_data); + qdr_http1_enqueue_stream_data(&hreq->out_data, stream_data); if (hreq == (_server_request_t*) DEQ_HEAD(hconn->requests) && hconn->raw_conn) { _write_pending_request(hreq); } @@ -883,7 +883,7 @@ static int _server_rx_body_cb(h1_codec_request_state_t *hrs, qd_buffer_list_t *b "[C%"PRIu64"][L%"PRIu64"] HTTP response body received len=%zu.", hconn->conn_id, hconn->in_link_id, len); - qd_message_body_data_append(msg, body); + qd_message_stream_data_append(msg, body); // // Notify the router that more data is ready to be pushed out on the delivery @@ -1246,17 +1246,17 @@ static uint64_t _encode_request_message(_server_request_t *hreq) hreq->headers_encoded = true; } - qd_message_body_data_t *body_data = 0; + qd_message_stream_data_t *stream_data = 0; while (true) { - switch (qd_message_next_body_data(msg, &body_data)) { - case QD_MESSAGE_BODY_DATA_OK: { + switch (qd_message_next_stream_data(msg, &stream_data)) { + case QD_MESSAGE_STREAM_DATA_BODY_OK: { qd_log(hconn->adaptor->log, QD_LOG_TRACE, "[C%"PRIu64"][L%"PRIu64"] Encoding request body data", hconn->conn_id, hconn->out_link_id); - if (h1_codec_tx_body(hreq->base.lib_rs, body_data)) { + if (h1_codec_tx_body(hreq->base.lib_rs, stream_data)) { qd_log(qdr_http1_adaptor->log, QD_LOG_WARNING, "[C%"PRIu64"][L%"PRIu64"] body data encode failed", hconn->conn_id, hconn->out_link_id); @@ -1265,21 +1265,23 @@ static uint64_t _encode_request_message(_server_request_t *hreq) break; } - case QD_MESSAGE_BODY_DATA_NO_MORE: + case QD_MESSAGE_STREAM_DATA_FOOTER_OK: + break; + + case QD_MESSAGE_STREAM_DATA_NO_MORE: // indicate this message is complete qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] request message encoding completed", hconn->conn_id, hconn->out_link_id); return PN_ACCEPTED; - case QD_MESSAGE_BODY_DATA_INCOMPLETE: + case QD_MESSAGE_STREAM_DATA_INCOMPLETE: qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] body data need more", hconn->conn_id, hconn->out_link_id); return 0; // wait for more - case QD_MESSAGE_BODY_DATA_INVALID: - case QD_MESSAGE_BODY_DATA_NOT_DATA: + case QD_MESSAGE_STREAM_DATA_INVALID: qd_log(qdr_http1_adaptor->log, QD_LOG_WARNING, "[C%"PRIu64"][L%"PRIu64"] Rejecting corrupted body data.", hconn->conn_id, hconn->out_link_id); diff --git a/src/adaptors/http2/http2_adaptor.c b/src/adaptors/http2/http2_adaptor.c index 8618b81..996e897 100644 --- a/src/adaptors/http2/http2_adaptor.c +++ b/src/adaptors/http2/http2_adaptor.c @@ -405,7 +405,7 @@ static int on_data_chunk_recv_callback(nghttp2_session *session, qd_buffer_list_append(&buffers, (uint8_t *)data, len); if (stream_data->in_dlv) { - qd_message_body_data_append(stream_data->message, &buffers); + qd_message_stream_data_append(stream_data->message, &buffers); } else { if (!stream_data->body) { @@ -463,7 +463,7 @@ static int snd_data_callback(nghttp2_session *session, int bytes_sent = 0; // This should not include the header length of 9. if (length) { pn_raw_buffer_t pn_raw_buffs[stream_data->qd_buffers_to_send]; - qd_message_body_data_buffers(stream_data->curr_body_data, pn_raw_buffs, 0, stream_data->qd_buffers_to_send); + qd_message_stream_data_buffers(stream_data->curr_stream_data, pn_raw_buffs, 0, stream_data->qd_buffers_to_send); int idx = 0; while (idx < stream_data->qd_buffers_to_send) { @@ -481,16 +481,16 @@ static int snd_data_callback(nghttp2_session *session, // bytes_sent += bytes_remaining; // qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] snd_data_callback memcpy bytes_remaining=%i", conn->conn_id, stream_data->stream_id, bytes_remaining); // } - stream_data->curr_body_data_qd_buff_offset += 1; + stream_data->curr_stream_data_qd_buff_offset += 1; } idx += 1; } } if (stream_data->full_payload_handled) { - qd_message_body_data_release(stream_data->curr_body_data); - stream_data->curr_body_data = 0; - stream_data->curr_body_data_qd_buff_offset = 0; - qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] snd_data_callback, full_payload_handled, qd_message_body_data_release", conn->conn_id, stream_data->stream_id); + qd_message_stream_data_release(stream_data->curr_stream_data); + stream_data->curr_stream_data = 0; + stream_data->curr_stream_data_qd_buff_offset = 0; + qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] snd_data_callback, full_payload_handled, qd_message_stream_data_release", conn->conn_id, stream_data->stream_id); } qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] HTTP2 snd_data_callback finished, length=%zu, bytes_sent=%i, stream_data=%p", conn->conn_id, stream_data->stream_id, length, bytes_sent, (void *)stream_data); @@ -935,21 +935,21 @@ ssize_t read_data_callback(nghttp2_session *session, // qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] read_data_callback QD_MESSAGE_DEPTH_OK", conn->conn_id, stream_data->stream_id); - if (stream_data->next_body_data) { - stream_data->curr_body_data = stream_data->next_body_data; - stream_data->curr_body_data_result = stream_data->next_body_data_result; - stream_data->next_body_data = 0; - qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] read_data_callback Use next_body_data", conn->conn_id, stream_data->stream_id); + if (stream_data->next_stream_data) { + stream_data->curr_stream_data = stream_data->next_stream_data; + stream_data->curr_stream_data_result = stream_data->next_stream_data_result; + stream_data->next_stream_data = 0; + qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] read_data_callback Use next_stream_data", conn->conn_id, stream_data->stream_id); } - if (!stream_data->curr_body_data) { - stream_data->curr_body_data_result = qd_message_next_body_data(message, &stream_data->curr_body_data); - qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] read_data_callback No body data, get qd_message_next_body_data", conn->conn_id, stream_data->stream_id); + if (!stream_data->curr_stream_data) { + stream_data->curr_stream_data_result = qd_message_next_stream_data(message, &stream_data->curr_stream_data); + qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] read_data_callback No body data, get qd_message_next_stream_data", conn->conn_id, stream_data->stream_id); } - switch (stream_data->curr_body_data_result) { - case QD_MESSAGE_BODY_DATA_OK: { + switch (stream_data->curr_stream_data_result) { + case QD_MESSAGE_STREAM_DATA_BODY_OK: { // // We have a new valid body-data segment. Handle it // @@ -966,29 +966,29 @@ ssize_t read_data_callback(nghttp2_session *session, } // total length of the payload (across all qd_buffers in the current body data) - size_t payload_length = qd_message_body_data_payload_length(stream_data->curr_body_data); + size_t payload_length = qd_message_stream_data_payload_length(stream_data->curr_stream_data); if (payload_length == 0) { // - // Current body data has payload length zero. Release the curr_body_data + // Current body data has payload length zero. Release the curr_stream_data // qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] read_data_callback, payload_length=0", conn->conn_id, stream_data->stream_id); - qd_message_body_data_release(stream_data->curr_body_data); - stream_data->curr_body_data = 0; + qd_message_stream_data_release(stream_data->curr_stream_data); + stream_data->curr_stream_data = 0; - // The payload length is zero on this body data. Look ahead one body data to see if it is QD_MESSAGE_BODY_DATA_NO_MORE - stream_data->next_body_data_result = qd_message_next_body_data(message, &stream_data->next_body_data); - if (stream_data->next_body_data_result == QD_MESSAGE_BODY_DATA_NO_MORE) { + // The payload length is zero on this body data. Look ahead one body data to see if it is QD_MESSAGE_STREAM_DATA_NO_MORE + stream_data->next_stream_data_result = qd_message_next_stream_data(message, &stream_data->next_stream_data); + if (stream_data->next_stream_data_result == QD_MESSAGE_STREAM_DATA_NO_MORE) { *data_flags |= NGHTTP2_DATA_FLAG_EOF; stream_data->out_msg_body_sent = true; stream_data->full_payload_handled = true; - qd_message_body_data_release(stream_data->next_body_data); - stream_data->next_body_data = 0; + qd_message_stream_data_release(stream_data->next_stream_data); + stream_data->next_stream_data = 0; stream_data->out_dlv_local_disposition = PN_ACCEPTED; if ((*data_flags & NGHTTP2_DATA_FLAG_EOF) && conn->ingress) { _http_record_request(conn, stream_data); } - qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] read_data_callback, payload_length=0 and next_body_data=QD_MESSAGE_BODY_DATA_NO_MORE", conn->conn_id, stream_data->stream_id); + qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] read_data_callback, payload_length=0 and next_stream_data=QD_MESSAGE_STREAM_DATA_NO_MORE", conn->conn_id, stream_data->stream_id); return 0; } @@ -1002,35 +1002,35 @@ ssize_t read_data_callback(nghttp2_session *session, return NGHTTP2_ERR_DEFERRED; } - stream_data->body_data_buff_count = qd_message_body_data_buffer_count(stream_data->curr_body_data); - qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] read_data_callback, stream_data->body_data_buff_count=%i, payload_length=%zu\n", conn->conn_id, stream_data->stream_id, stream_data->body_data_buff_count, payload_length); + stream_data->stream_data_buff_count = qd_message_stream_data_buffer_count(stream_data->curr_stream_data); + qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] read_data_callback, stream_data->stream_data_buff_count=%i, payload_length=%zu\n", conn->conn_id, stream_data->stream_id, stream_data->stream_data_buff_count, payload_length); size_t bytes_to_send = 0; if (payload_length) { - size_t remaining_payload_length = payload_length - (stream_data->curr_body_data_qd_buff_offset * BUFFER_SIZE); + size_t remaining_payload_length = payload_length - (stream_data->curr_stream_data_qd_buff_offset * BUFFER_SIZE); if (remaining_payload_length <= QD_HTTP2_BUFFER_SIZE) { bytes_to_send = remaining_payload_length; - stream_data->qd_buffers_to_send = stream_data->body_data_buff_count; + stream_data->qd_buffers_to_send = stream_data->stream_data_buff_count; stream_data->full_payload_handled = true; qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] read_data_callback remaining_payload_length (%zu) <= QD_HTTP2_BUFFER_SIZE(16384), bytes_to_send=%zu, stream_data->qd_buffers_to_send=%zu", conn->conn_id, stream_data->stream_id, remaining_payload_length, bytes_to_send, stream_data->qd_buffers_to_send); // Look ahead one body data - stream_data->next_body_data_result = qd_message_next_body_data(message, &stream_data->next_body_data); - if (stream_data->next_body_data_result == QD_MESSAGE_BODY_DATA_NO_MORE) { + stream_data->next_stream_data_result = qd_message_next_stream_data(message, &stream_data->next_stream_data); + if (stream_data->next_stream_data_result == QD_MESSAGE_STREAM_DATA_NO_MORE) { *data_flags |= NGHTTP2_DATA_FLAG_EOF; stream_data->out_msg_body_sent = true; - qd_message_body_data_release(stream_data->next_body_data); - stream_data->next_body_data = 0; + qd_message_stream_data_release(stream_data->next_stream_data); + stream_data->next_stream_data = 0; stream_data->out_dlv_local_disposition = PN_ACCEPTED; - qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] read_data_callback, looking ahead one body data QD_MESSAGE_BODY_DATA_NO_MORE", conn->conn_id, stream_data->stream_id); + qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] read_data_callback, looking ahead one body data QD_MESSAGE_STREAM_DATA_NO_MORE", conn->conn_id, stream_data->stream_id); } - else if (stream_data->next_body_data_result == QD_MESSAGE_BODY_DATA_INCOMPLETE) { - qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] read_data_callback, looking ahead one body data QD_MESSAGE_BODY_DATA_INCOMPLETE", conn->conn_id, stream_data->stream_id); + else if (stream_data->next_stream_data_result == QD_MESSAGE_STREAM_DATA_INCOMPLETE) { + qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] read_data_callback, looking ahead one body data QD_MESSAGE_STREAM_DATA_INCOMPLETE", conn->conn_id, stream_data->stream_id); } - else if (stream_data->next_body_data_result == QD_MESSAGE_BODY_DATA_OK) { - qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] read_data_callback, looking ahead one body data QD_MESSAGE_BODY_DATA_OK", conn->conn_id, stream_data->stream_id); + else if (stream_data->next_stream_data_result == QD_MESSAGE_STREAM_DATA_BODY_OK) { + qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] read_data_callback, looking ahead one body data QD_MESSAGE_STREAM_DATA_OK", conn->conn_id, stream_data->stream_id); } } @@ -1050,15 +1050,18 @@ ssize_t read_data_callback(nghttp2_session *session, return bytes_to_send; } - case QD_MESSAGE_BODY_DATA_INCOMPLETE: + case QD_MESSAGE_STREAM_DATA_FOOTER_OK: + break; + + case QD_MESSAGE_STREAM_DATA_INCOMPLETE: // // A new segment has not completely arrived yet. Check again later. // - qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] read_data_callback QD_MESSAGE_BODY_DATA_INCOMPLETE", conn->conn_id, stream_data->stream_id); + qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] read_data_callback QD_MESSAGE_STREAM_DATA_INCOMPLETE", conn->conn_id, stream_data->stream_id); stream_data->out_dlv_local_disposition = 0; return NGHTTP2_ERR_DEFERRED; - case QD_MESSAGE_BODY_DATA_NO_MORE: { + case QD_MESSAGE_STREAM_DATA_NO_MORE: { // // We have already handled the last body-data segment for this delivery. // Complete the "sending" of this delivery and replenish credit. @@ -1066,7 +1069,7 @@ ssize_t read_data_callback(nghttp2_session *session, size_t pn_buffs_write_capacity = pn_raw_connection_write_buffers_capacity(conn->pn_raw_conn); if (pn_buffs_write_capacity == 0) { stream_data->out_dlv_local_disposition = 0; - qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] read_data_callback QD_MESSAGE_BODY_DATA_NO_MORE - pn_buffs_write_capacity=0 send is not complete", conn->conn_id, stream_data->stream_id); + qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] read_data_callback QD_MESSAGE_STREAM_DATA_NO_MORE - pn_buffs_write_capacity=0 send is not complete", conn->conn_id, stream_data->stream_id); return NGHTTP2_ERR_DEFERRED; } else { @@ -1075,32 +1078,21 @@ ssize_t read_data_callback(nghttp2_session *session, stream_data->full_payload_handled = true; stream_data->out_msg_body_sent = true; stream_data->out_dlv_local_disposition = PN_ACCEPTED; - qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] read_data_callback QD_MESSAGE_BODY_DATA_NO_MORE - stream_data->out_dlv_local_disposition = PN_ACCEPTED - send_complete=true, setting NGHTTP2_DATA_FLAG_EOF", conn->conn_id, stream_data->stream_id); + qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"] read_data_callback QD_MESSAGE_STREAM_DATA_NO_MORE - stream_data->out_dlv_local_disposition = PN_ACCEPTED - send_complete=true, setting NGHTTP2_DATA_FLAG_EOF", conn->conn_id, stream_data->stream_id); } break; } - case QD_MESSAGE_BODY_DATA_INVALID: + case QD_MESSAGE_STREAM_DATA_INVALID: // // The body-data is corrupt in some way. Stop handling the delivery and reject it. // *data_flags |= NGHTTP2_DATA_FLAG_EOF; - qd_message_body_data_release(stream_data->curr_body_data); - stream_data->curr_body_data = 0; - stream_data->out_dlv_local_disposition = PN_REJECTED; - qd_log(http2_adaptor->protocol_log_source, QD_LOG_ERROR, "[C%"PRIu64"][S%"PRId32"] read_data_callback QD_MESSAGE_BODY_DATA_INVALID", conn->conn_id, stream_data->stream_id); - break; - - case QD_MESSAGE_BODY_DATA_NOT_DATA: - // - // Valid data was seen, but it is not a body-data performative. Reject the delivery. - // - *data_flags |= NGHTTP2_DATA_FLAG_EOF; - qd_message_body_data_release(stream_data->curr_body_data); - stream_data->curr_body_data = 0; + qd_message_stream_data_release(stream_data->curr_stream_data); + stream_data->curr_stream_data = 0; stream_data->out_dlv_local_disposition = PN_REJECTED; - qd_log(http2_adaptor->protocol_log_source, QD_LOG_ERROR, "[C%"PRIu64"][S%"PRId32"] read_data_callback QD_MESSAGE_BODY_DATA_NOT_DATA", conn->conn_id, stream_data->stream_id); + qd_log(http2_adaptor->protocol_log_source, QD_LOG_ERROR, "[C%"PRIu64"][S%"PRId32"] read_data_callback QD_MESSAGE_STREAM_DATA_INVALID", conn->conn_id, stream_data->stream_id); break; } break; @@ -1418,16 +1410,16 @@ uint64_t handle_outgoing_http(qdr_http2_stream_data_t *stream_data) create_settings_frame(conn); uint8_t flags = 0; - stream_data->curr_body_data_result = qd_message_next_body_data(message, &stream_data->curr_body_data); - if (stream_data->curr_body_data_result == QD_MESSAGE_BODY_DATA_OK) { - size_t payload_length = qd_message_body_data_payload_length(stream_data->curr_body_data); + stream_data->curr_stream_data_result = qd_message_next_stream_data(message, &stream_data->curr_stream_data); + if (stream_data->curr_stream_data_result == QD_MESSAGE_STREAM_DATA_BODY_OK) { + size_t payload_length = qd_message_stream_data_payload_length(stream_data->curr_stream_data); if (payload_length == 0) { - stream_data->next_body_data_result = qd_message_next_body_data(message, &stream_data->next_body_data); - if (stream_data->next_body_data_result == QD_MESSAGE_BODY_DATA_NO_MORE) { - qd_message_body_data_release(stream_data->next_body_data); - stream_data->next_body_data = 0; - qd_message_body_data_release(stream_data->curr_body_data); - stream_data->curr_body_data = 0; + stream_data->next_stream_data_result = qd_message_next_stream_data(message, &stream_data->next_stream_data); + if (stream_data->next_stream_data_result == QD_MESSAGE_STREAM_DATA_NO_MORE) { + qd_message_stream_data_release(stream_data->next_stream_data); + stream_data->next_stream_data = 0; + qd_message_stream_data_release(stream_data->curr_stream_data); + stream_data->curr_stream_data = 0; flags = NGHTTP2_FLAG_END_STREAM; stream_data->out_msg_has_body = false; qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"] Message has no body, sending NGHTTP2_FLAG_END_STREAM with nghttp2_submit_headers", conn->conn_id); diff --git a/src/adaptors/http2/http2_adaptor.h b/src/adaptors/http2/http2_adaptor.h index 979f34d..047c440 100644 --- a/src/adaptors/http2/http2_adaptor.h +++ b/src/adaptors/http2/http2_adaptor.h @@ -80,14 +80,14 @@ struct qdr_http2_stream_data_t { qd_composed_field_t *app_properties; qd_composed_field_t *footer_properties; qd_composed_field_t *body; - qd_message_body_data_t *curr_body_data; - qd_message_body_data_t *next_body_data; + qd_message_stream_data_t *curr_stream_data; + qd_message_stream_data_t *next_stream_data; DEQ_LINKS(qdr_http2_stream_data_t); - qd_message_body_data_result_t curr_body_data_result; - qd_message_body_data_result_t next_body_data_result; - int curr_body_data_qd_buff_offset; - int body_data_buff_count; + qd_message_stream_data_result_t curr_stream_data_result; + qd_message_stream_data_result_t next_stream_data_result; + int curr_stream_data_qd_buff_offset; + int stream_data_buff_count; int in_link_credit; // provided by router int32_t stream_id; size_t qd_buffers_to_send; diff --git a/src/adaptors/reference_adaptor.c b/src/adaptors/reference_adaptor.c index 7b9af30..84b14ba 100644 --- a/src/adaptors/reference_adaptor.c +++ b/src/adaptors/reference_adaptor.c @@ -248,39 +248,67 @@ static uint64_t qdr_ref_deliver(void *context, qdr_link_t *link, qdr_delivery_t // over to the per-message extraction of body-data segments. // printf("qdr_ref_deliver: depth ok\n"); - qd_message_body_data_t *body_data; - qd_message_body_data_result_t body_data_result; + qd_message_stream_data_t *stream_data; + qd_message_stream_data_result_t stream_data_result; // // Process as many body-data segments as are available. // while (true) { - body_data_result = qd_message_next_body_data(msg, &body_data); + stream_data_result = qd_message_next_stream_data(msg, &stream_data); - switch (body_data_result) { - case QD_MESSAGE_BODY_DATA_OK: { + switch (stream_data_result) { + case QD_MESSAGE_STREAM_DATA_BODY_OK: { // // We have a new valid body-data segment. Handle it // - printf("qdr_ref_deliver: body_data_buffer_count: %d\n", qd_message_body_data_buffer_count(body_data)); + printf("qdr_ref_deliver: stream_data_buffer_count: %d\n", qd_message_stream_data_buffer_count(stream_data)); - qd_iterator_t *body_iter = qd_message_body_data_iterator(body_data); + qd_iterator_t *body_iter = qd_message_stream_data_iterator(stream_data); char *body = (char*) qd_iterator_copy(body_iter); printf("qdr_ref_deliver: message body-data received: %s\n", body); free(body); qd_iterator_free(body_iter); - qd_message_body_data_release(body_data); + qd_message_stream_data_release(stream_data); + break; + } + + case QD_MESSAGE_STREAM_DATA_FOOTER_OK: { + printf("qdr_ref_deliver: Received message footer\n"); + qd_iterator_t *footer_iter = qd_message_stream_data_iterator(stream_data); + qd_parsed_field_t *footer = qd_parse(footer_iter); + + if (qd_parse_ok(footer)) { + uint8_t tag = qd_parse_tag(footer); + if (tag == QD_AMQP_MAP8 || tag == QD_AMQP_MAP32) { + uint32_t item_count = qd_parse_sub_count(footer); + for (uint32_t i = 0; i < item_count; i++) { + qd_iterator_t *key_iter = qd_parse_raw(qd_parse_sub_key(footer, i)); + qd_iterator_t *value_iter = qd_parse_raw(qd_parse_sub_value(footer, i)); + char *key = (char*) qd_iterator_copy(key_iter); + char *value = (char*) qd_iterator_copy(value_iter); + printf("qdr_ref_deliver: %s: %s\n", key, value); + free(key); + free(value); + } + } else + printf("qdr_ref_deliver: Unexpected tag in footer: %02x\n", tag); + } else + printf("qdr_ref_deliver: Footer parse error: %s\n", qd_parse_error(footer)); + + qd_parse_free(footer); + qd_iterator_free(footer_iter); break; } - case QD_MESSAGE_BODY_DATA_INCOMPLETE: + case QD_MESSAGE_STREAM_DATA_INCOMPLETE: // // A new segment has not completely arrived yet. Check again later. // printf("qdr_ref_deliver: body-data incomplete\n"); return 0; - case QD_MESSAGE_BODY_DATA_NO_MORE: + case QD_MESSAGE_STREAM_DATA_NO_MORE: // // We have already handled the last body-data segment for this delivery. // Complete the "sending" of this delivery and replenish credit. @@ -293,21 +321,13 @@ static uint64_t qdr_ref_deliver(void *context, qdr_link_t *link, qdr_delivery_t qdr_link_flow(adaptor->core, link, 1, false); return PN_ACCEPTED; // This will cause the delivery to be settled - case QD_MESSAGE_BODY_DATA_INVALID: + case QD_MESSAGE_STREAM_DATA_INVALID: // // The body-data is corrupt in some way. Stop handling the delivery and reject it. // printf("qdr_ref_deliver: body-data invalid\n"); qdr_link_flow(adaptor->core, link, 1, false); return PN_REJECTED; - - case QD_MESSAGE_BODY_DATA_NOT_DATA: - // - // Valid data was seen, but it is not a body-data performative. Reject the delivery. - // - printf("qdr_ref_deliver: body not data\n"); - qdr_link_flow(adaptor->core, link, 1, false); - return PN_REJECTED; } } @@ -456,16 +476,24 @@ static void on_stream(void *context) // // Accumulated buffer list // - qd_buffer_list_t buffer_list; - DEQ_INIT(buffer_list); - qd_buffer_list_append(&buffer_list, (const uint8_t*) content, content_length); - qd_buffer_list_append(&buffer_list, (const uint8_t*) content, content_length); - - // - // append this data to the streaming message as one or more DATA - // performatives - // - depth = qd_message_body_data_append(adaptor->streaming_message, &buffer_list); + for (int sections = 0; sections < 3; sections++) { + qd_buffer_list_t buffer_list; + DEQ_INIT(buffer_list); + qd_buffer_list_append(&buffer_list, (const uint8_t*) content, content_length); + qd_buffer_list_append(&buffer_list, (const uint8_t*) content, content_length); + + // + // Compose a DATA performative for this section of the stream + // + qd_composed_field_t *field = qd_compose(QD_PERFORMATIVE_BODY_DATA, 0); + qd_compose_insert_binary_buffers(field, &buffer_list); + + // + // Extend the streaming message and free the composed field + // + depth = qd_message_extend(adaptor->streaming_message, field); + qd_compose_free(field); + } // // Notify the router that more data is ready to be pushed out on the delivery @@ -478,6 +506,14 @@ static void on_stream(void *context) adaptor->stream_count++; printf("on_stream: sent streamed frame %d, depth=%d\n", adaptor->stream_count, depth); } else { + qd_composed_field_t *footer = qd_compose(QD_PERFORMATIVE_FOOTER, 0); + qd_compose_start_map(footer); + qd_compose_insert_symbol(footer, "trailer"); + qd_compose_insert_string(footer, "value"); + qd_compose_end_map(footer); + depth = qd_message_extend(adaptor->streaming_message, footer); + qd_compose_free(footer); + qd_message_set_receive_complete(adaptor->streaming_message); adaptor->streaming_message = 0; adaptor->stream_count = 0; diff --git a/src/adaptors/tcp_adaptor.c b/src/adaptors/tcp_adaptor.c index 397f8bd..d711c7a 100644 --- a/src/adaptors/tcp_adaptor.c +++ b/src/adaptors/tcp_adaptor.c @@ -63,7 +63,7 @@ struct qdr_tcp_connection_t { uint64_t last_in_time; uint64_t last_out_time; - qd_message_body_data_t *outgoing_body_data; // current segment + qd_message_stream_data_t *outgoing_stream_data; // current segment size_t outgoing_body_bytes; // bytes received from current segment int outgoing_body_offset; // buffer offset into current segment @@ -147,7 +147,7 @@ static int handle_incoming(qdr_tcp_connection_t *conn) grant_read_buffers(conn); if (conn->instream) { - qd_message_body_data_append(qdr_delivery_message(conn->instream), &buffers); + qd_message_stream_data_append(qdr_delivery_message(conn->instream), &buffers); qdr_delivery_continue(tcp_adaptor->core, conn->instream, false); qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] Continuing message with %i bytes", conn->conn_id, conn->incoming_id, count); } else { @@ -229,25 +229,23 @@ static int read_message_body(qdr_tcp_connection_t *conn, qd_message_t *msg, pn_r { int used = 0; - // Advance to next body_data vbin segment if necessary. + // Advance to next stream_data vbin segment if necessary. // Return early if no data to process or error - if (conn->outgoing_body_data == 0) { - qd_message_body_data_result_t body_data_result = qd_message_next_body_data(msg, &conn->outgoing_body_data); - if (body_data_result == QD_MESSAGE_BODY_DATA_OK) { - // a new body_data segment has been found + if (conn->outgoing_stream_data == 0) { + qd_message_stream_data_result_t stream_data_result = qd_message_next_stream_data(msg, &conn->outgoing_stream_data); + if (stream_data_result == QD_MESSAGE_STREAM_DATA_BODY_OK) { + // a new stream_data segment has been found conn->outgoing_body_bytes = 0; conn->outgoing_body_offset = 0; // continue to process this segment - } else if (body_data_result == QD_MESSAGE_BODY_DATA_INCOMPLETE) { + } else if (stream_data_result == QD_MESSAGE_STREAM_DATA_INCOMPLETE) { return 0; } else { - switch (body_data_result) { - case QD_MESSAGE_BODY_DATA_NO_MORE: + switch (stream_data_result) { + case QD_MESSAGE_STREAM_DATA_NO_MORE: qd_log(tcp_adaptor->log_source, QD_LOG_INFO, "[C%"PRIu64"] EOS", conn->conn_id); break; - case QD_MESSAGE_BODY_DATA_INVALID: + case QD_MESSAGE_STREAM_DATA_INVALID: qd_log(tcp_adaptor->log_source, QD_LOG_ERROR, "[C%"PRIu64"] Invalid body data for streaming message", conn->conn_id); break; - case QD_MESSAGE_BODY_DATA_NOT_DATA: - qd_log(tcp_adaptor->log_source, QD_LOG_ERROR, "[C%"PRIu64"] Invalid body; expected data section", conn->conn_id); break; default: break; } @@ -256,30 +254,30 @@ static int read_message_body(qdr_tcp_connection_t *conn, qd_message_t *msg, pn_r } } - // A valid body_data is in place. + // A valid stream_data is in place. // Try to get a buffer set from it. - used = qd_message_body_data_buffers(conn->outgoing_body_data, buffers, conn->outgoing_body_offset, count); + used = qd_message_stream_data_buffers(conn->outgoing_stream_data, buffers, conn->outgoing_body_offset, count); if (used > 0) { // Accumulate the lengths of the returned buffers. for (int i=0; i<used; i++) { conn->outgoing_body_bytes += buffers[i].size; } - // Buffers returned should never exceed the body_data payload length - assert(conn->outgoing_body_bytes <= conn->outgoing_body_data->payload.length); + // Buffers returned should never exceed the stream_data payload length + assert(conn->outgoing_body_bytes <= conn->outgoing_stream_data->payload.length); - if (conn->outgoing_body_bytes == conn->outgoing_body_data->payload.length) { - // This buffer set consumes the remainder of the body_data segment. - // Attach the body_data struct to the last buffer so that the struct + if (conn->outgoing_body_bytes == conn->outgoing_stream_data->payload.length) { + // This buffer set consumes the remainder of the stream_data segment. + // Attach the stream_data struct to the last buffer so that the struct // can be freed after the buffer has been transmitted by raw connection out. - buffers[used-1].context = (uintptr_t) conn->outgoing_body_data; + buffers[used-1].context = (uintptr_t) conn->outgoing_stream_data; - // Erase the body_data struct from the connection so that + // Erase the stream_data struct from the connection so that // a new one gets created on the next pass. - conn->outgoing_body_data = 0; + conn->outgoing_stream_data = 0; } else { - // Returned buffer set did not consume the entire body_data segment. - // Leave existing body_data struct in place for use on next pass. + // Returned buffer set did not consume the entire stream_data segment. + // Leave existing stream_data struct in place for use on next pass. // Add the number of returned buffers to the offset for the next pass. conn->outgoing_body_offset += used; } @@ -517,7 +515,7 @@ static void handle_connection_event(pn_event_t *e, qd_server_t *qd_server, void for (size_t i = 0; i < n; ++i) { written += buffs[i].size; if (buffs[i].context) { - qd_message_body_data_release((qd_message_body_data_t*) buffs[i].context); + qd_message_stream_data_release((qd_message_stream_data_t*) buffs[i].context); } } } diff --git a/src/message.c b/src/message.c index 360fd84..80d569f 100644 --- a/src/message.c +++ b/src/message.c @@ -86,7 +86,7 @@ PN_HANDLE(PN_DELIVERY_CTX) ALLOC_DEFINE_CONFIG(qd_message_t, sizeof(qd_message_pvt_t), 0, 0); ALLOC_DEFINE(qd_message_content_t); -ALLOC_DEFINE(qd_message_body_data_t); +ALLOC_DEFINE(qd_message_stream_data_t); typedef void (*buffer_process_t) (void *context, const unsigned char *base, int length); @@ -1901,7 +1901,7 @@ void qd_message_send(qd_message_t *in_msg, // by freeing a buffer there now may be room to restart a // stalled message receiver if (content->q2_input_holdoff) { - if (qd_message_Q2_holdoff_should_unblock((qd_message_t *)msg)) { + if (qd_message_Q2_holdoff_should_unblock((qd_message_t*) msg)) { // wake up receive side // Note: clearing holdoff here is easy compared to // clearing it in the deferred callback. Tracing @@ -2368,79 +2368,83 @@ static void find_last_buffer(qd_field_location_t *location, unsigned char **curs } -void trim_body_data_headers(qd_message_body_data_t *body_data) +void trim_stream_data_headers(qd_message_stream_data_t *stream_data, bool remove_vbin_header) { - const qd_field_location_t *location = &body_data->section; + const qd_field_location_t *location = &stream_data->section; qd_buffer_t *buffer = location->buffer; unsigned char *cursor = qd_buffer_base(buffer) + location->offset; bool good = advance(&cursor, &buffer, location->hdr_length); assert(good); if (good) { - unsigned char tag = 0; - size_t vbin_hdr_len = 1; - // coverity[check_return] - next_octet(&cursor, &buffer, &tag); - if (tag == QD_AMQP_VBIN8) { - advance(&cursor, &buffer, 1); - vbin_hdr_len += 1; - } else if (tag == QD_AMQP_VBIN32) { - advance(&cursor, &buffer, 4); - vbin_hdr_len += 4; + size_t vbin_hdr_len = 0; + unsigned char tag = 0; + + if (remove_vbin_header) { + vbin_hdr_len = 1; + // coverity[check_return] + next_octet(&cursor, &buffer, &tag); + if (tag == QD_AMQP_VBIN8) { + advance(&cursor, &buffer, 1); + vbin_hdr_len += 1; + } else if (tag == QD_AMQP_VBIN32) { + advance(&cursor, &buffer, 4); + vbin_hdr_len += 4; + } } // coverity[check_return] can_advance(&cursor, &buffer); // bump cursor to the next buffer if necessary - body_data->payload.buffer = buffer; - body_data->payload.offset = cursor - qd_buffer_base(buffer); - body_data->payload.length = location->length - vbin_hdr_len; - body_data->payload.hdr_length = 0; - body_data->payload.parsed = true; - body_data->payload.tag = tag; + stream_data->payload.buffer = buffer; + stream_data->payload.offset = cursor - qd_buffer_base(buffer); + stream_data->payload.length = location->length - vbin_hdr_len; + stream_data->payload.hdr_length = 0; + stream_data->payload.parsed = true; + stream_data->payload.tag = tag; } } /** - * qd_message_body_data_iterator + * qd_message_stream_data_iterator * - * Given a body_data object, return an iterator that refers to the content of that body data. This iterator + * Given a stream_data object, return an iterator that refers to the content of that body data. This iterator * shall not refer to the 3-byte performative header or the header for the vbin{8,32} field. * * The iterator must be freed eventually by the caller. */ -qd_iterator_t *qd_message_body_data_iterator(const qd_message_body_data_t *body_data) +qd_iterator_t *qd_message_stream_data_iterator(const qd_message_stream_data_t *stream_data) { - const qd_field_location_t *location = &body_data->payload; + const qd_field_location_t *location = &stream_data->payload; return qd_iterator_buffer(location->buffer, location->offset, location->length, ITER_VIEW_ALL); } /** - * qd_message_body_data_payload_length + * qd_message_stream_data_payload_length * - * Given a body_data object, return the length of the payload. + * Given a stream_data object, return the length of the payload. */ -size_t qd_message_body_data_payload_length(const qd_message_body_data_t *body_data) +size_t qd_message_stream_data_payload_length(const qd_message_stream_data_t *stream_data) { - return body_data->payload.length; + return stream_data->payload.length; } /** - * qd_message_body_data_buffer_count + * qd_message_stream_data_buffer_count * - * Return the number of buffers contained in payload portion of the body_data object. + * Return the number of buffers contained in payload portion of the stream_data object. */ -int qd_message_body_data_buffer_count(const qd_message_body_data_t *body_data) +int qd_message_stream_data_buffer_count(const qd_message_stream_data_t *stream_data) { - if (body_data->payload.length == 0) + if (stream_data->payload.length == 0) return 0; int count = 1; - qd_buffer_t *buffer = body_data->payload.buffer; - while (!!buffer && buffer != body_data->last_buffer) { + qd_buffer_t *buffer = stream_data->payload.buffer; + while (!!buffer && buffer != stream_data->last_buffer) { buffer = DEQ_NEXT(buffer); count++; } @@ -2450,23 +2454,23 @@ int qd_message_body_data_buffer_count(const qd_message_body_data_t *body_data) /** - * qd_message_body_data_buffers + * qd_message_stream_data_buffers * - * Populate the provided array of pn_raw_buffers with the addresses and lengths of the buffers in the body_data + * Populate the provided array of pn_raw_buffers with the addresses and lengths of the buffers in the stream_data * object. Don't fill more than count raw_buffers with data. Start at offset from the zero-th buffer in the - * body_data. + * stream_data. */ -int qd_message_body_data_buffers(qd_message_body_data_t *body_data, pn_raw_buffer_t *buffers, int offset, int count) +int qd_message_stream_data_buffers(qd_message_stream_data_t *stream_data, pn_raw_buffer_t *buffers, int offset, int count) { - qd_buffer_t *buffer = body_data->payload.buffer; - size_t data_offset = body_data->payload.offset; - size_t payload_len = body_data->payload.length; + qd_buffer_t *buffer = stream_data->payload.buffer; + size_t data_offset = stream_data->payload.offset; + size_t payload_len = stream_data->payload.length; // // Skip the buffer offset // if (offset > 0) { - assert(offset < qd_message_body_data_buffer_count(body_data)); + assert(offset < qd_message_stream_data_buffer_count(stream_data)); while (offset > 0 && payload_len > 0) { payload_len -= qd_buffer_size(buffer) - data_offset; offset--; @@ -2498,21 +2502,21 @@ int qd_message_body_data_buffers(qd_message_body_data_t *body_data, pn_raw_buffe /** - * qd_message_body_data_release + * qd_message_stream_data_release * - * Decrement the fanout ref-counts for all of the buffers referred to in the body_data. If any have reached zero, + * Decrement the fanout ref-counts for all of the buffers referred to in the stream_data. If any have reached zero, * remove them from the buffer list and free them. Never dec-ref the last buffer in the content's buffer list. */ -void qd_message_body_data_release(qd_message_body_data_t *body_data) +void qd_message_stream_data_release(qd_message_stream_data_t *stream_data) { - free_qd_message_body_data_t(body_data); + free_qd_message_stream_data_t(stream_data); } -qd_message_body_data_result_t qd_message_next_body_data(qd_message_t *in_msg, qd_message_body_data_t **out_body_data) +qd_message_stream_data_result_t qd_message_next_stream_data(qd_message_t *in_msg, qd_message_stream_data_t **out_stream_data) { - qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg; - qd_message_body_data_t *body_data = 0; + qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg; + qd_message_stream_data_t *stream_data = 0; if (!msg->body_cursor) { // @@ -2520,58 +2524,67 @@ qd_message_body_data_result_t qd_message_next_body_data(qd_message_t *in_msg, qd // qd_message_depth_status_t status = qd_message_check_depth(in_msg, QD_DEPTH_BODY); if (status == QD_MESSAGE_DEPTH_OK) { - body_data = new_qd_message_body_data_t(); - ZERO(body_data); - body_data->owning_message = msg; - body_data->section = msg->content->section_body; - - find_last_buffer(&body_data->section, &msg->body_cursor, &msg->body_buffer); - body_data->last_buffer = msg->body_buffer; - trim_body_data_headers(body_data); - - assert(DEQ_SIZE(msg->body_data_list) == 0); - DEQ_INSERT_TAIL(msg->body_data_list, body_data); - *out_body_data = body_data; - return QD_MESSAGE_BODY_DATA_OK; + stream_data = new_qd_message_stream_data_t(); + ZERO(stream_data); + stream_data->owning_message = msg; + stream_data->section = msg->content->section_body; + + find_last_buffer(&stream_data->section, &msg->body_cursor, &msg->body_buffer); + stream_data->last_buffer = msg->body_buffer; + trim_stream_data_headers(stream_data, true); + + assert(DEQ_SIZE(msg->stream_data_list) == 0); + DEQ_INSERT_TAIL(msg->stream_data_list, stream_data); + *out_stream_data = stream_data; + return QD_MESSAGE_STREAM_DATA_BODY_OK; } else if (status == QD_MESSAGE_DEPTH_INCOMPLETE) - return QD_MESSAGE_BODY_DATA_INCOMPLETE; + return QD_MESSAGE_STREAM_DATA_INCOMPLETE; else if (status == QD_MESSAGE_DEPTH_INVALID) - return QD_MESSAGE_BODY_DATA_INVALID; + return QD_MESSAGE_STREAM_DATA_INVALID; } qd_section_status_t section_status; qd_field_location_t location; ZERO(&location); + bool is_footer = false; + section_status = message_section_check(&msg->body_buffer, &msg->body_cursor, BODY_DATA_SHORT, 3, TAGS_BINARY, &location, true); + if (section_status == QD_SECTION_INVALID || section_status == QD_SECTION_NO_MATCH) { + is_footer = true; + section_status = message_section_check(&msg->body_buffer, &msg->body_cursor, + FOOTER_SHORT, 3, TAGS_MAP, + &location, true); + } + switch (section_status) { case QD_SECTION_INVALID: case QD_SECTION_NO_MATCH: - return QD_MESSAGE_BODY_DATA_INVALID; + return QD_MESSAGE_STREAM_DATA_INVALID; case QD_SECTION_MATCH: - body_data = new_qd_message_body_data_t(); - ZERO(body_data); - body_data->owning_message = msg; - body_data->section = location; - find_last_buffer(&body_data->section, &msg->body_cursor, &msg->body_buffer); - body_data->last_buffer = msg->body_buffer; - trim_body_data_headers(body_data); - DEQ_INSERT_TAIL(msg->body_data_list, body_data); - *out_body_data = body_data; - return QD_MESSAGE_BODY_DATA_OK; + stream_data = new_qd_message_stream_data_t(); + ZERO(stream_data); + stream_data->owning_message = msg; + stream_data->section = location; + find_last_buffer(&stream_data->section, &msg->body_cursor, &msg->body_buffer); + stream_data->last_buffer = msg->body_buffer; + trim_stream_data_headers(stream_data, !is_footer); + DEQ_INSERT_TAIL(msg->stream_data_list, stream_data); + *out_stream_data = stream_data; + return is_footer ? QD_MESSAGE_STREAM_DATA_FOOTER_OK : QD_MESSAGE_STREAM_DATA_BODY_OK; case QD_SECTION_NEED_MORE: if (msg->content->receive_complete) - return QD_MESSAGE_BODY_DATA_NO_MORE; + return QD_MESSAGE_STREAM_DATA_NO_MORE; else - return QD_MESSAGE_BODY_DATA_INCOMPLETE; + return QD_MESSAGE_STREAM_DATA_INCOMPLETE; } - return QD_MESSAGE_BODY_DATA_NO_MORE; + return QD_MESSAGE_STREAM_DATA_NO_MORE; } @@ -2663,7 +2676,7 @@ void qd_message_release_body(qd_message_t *msg, pn_raw_buffer_t *buffers, int bu } -qd_parsed_field_t *qd_message_get_ingress (qd_message_t *msg) +qd_parsed_field_t *qd_message_get_ingress(qd_message_t *msg) { return ((qd_message_pvt_t*)msg)->content->ma_pf_ingress; } @@ -2755,7 +2768,7 @@ bool qd_message_oversize(const qd_message_t *msg) } -int qd_message_body_data_append(qd_message_t *message, qd_buffer_list_t *data) +int qd_message_stream_data_append(qd_message_t *message, qd_buffer_list_t *data) { unsigned int length = DEQ_SIZE(*data); qd_composed_field_t *field = 0; diff --git a/src/message_private.h b/src/message_private.h index a6ca077..3b83de0 100644 --- a/src/message_private.h +++ b/src/message_private.h @@ -61,16 +61,16 @@ typedef struct { } qd_field_location_t; -struct qd_message_body_data_t { - DEQ_LINKS(qd_message_body_data_t); // Linkage to form a DEQ +struct qd_message_stream_data_t { + DEQ_LINKS(qd_message_stream_data_t); // Linkage to form a DEQ qd_message_pvt_t *owning_message; // Pointer to the owning message qd_field_location_t section; // Section descriptor for the field qd_field_location_t payload; // Descriptor for the payload of the body data qd_buffer_t *last_buffer; // Pointer to the last buffer in the field }; -ALLOC_DECLARE(qd_message_body_data_t); -DEQ_DECLARE(qd_message_body_data_t, qd_message_body_data_list_t); +ALLOC_DECLARE(qd_message_stream_data_t); +DEQ_DECLARE(qd_message_stream_data_t, qd_message_stream_data_list_t); // TODO - consider using pointers to qd_field_location_t below to save memory // TODO - provide a way to allocate a message without a lock for the link-routing case. @@ -141,23 +141,23 @@ typedef struct { } qd_message_content_t; struct qd_message_pvt_t { - qd_iterator_pointer_t cursor; // A pointer to the current location of the outgoing byte stream. - qd_message_depth_t message_depth; // What is the depth of the message that has been received so far - qd_message_depth_t sent_depth; // How much of the message has been sent? QD_DEPTH_NONE means nothing has been sent so far, QD_DEPTH_HEADER means the header has already been sent, dont send it again and so on. - qd_message_content_t *content; // The actual content of the message. The content is never copied - qd_buffer_list_t ma_to_override; // to field in outgoing message annotations. - qd_buffer_list_t ma_trace; // trace list in outgoing message annotations - qd_buffer_list_t ma_ingress; // ingress field in outgoing message annotations - int ma_phase; // phase for the override address - bool ma_stream; // indicates whether this message is streaming - qd_message_body_data_list_t body_data_list; // TODO - move this to the content for one-time parsing (TLR) - qd_message_body_data_t *next_body_data; - unsigned char *body_cursor; - qd_buffer_t *body_buffer; - bool strip_annotations_in; - bool send_complete; // Has the message been completely received and completely sent? - bool tag_sent; // Tags are sent - bool is_fanout; // If msg is an outgoing fanout + qd_iterator_pointer_t cursor; // A pointer to the current location of the outgoing byte stream. + qd_message_depth_t message_depth; // What is the depth of the message that has been received so far + qd_message_depth_t sent_depth; // How much of the message has been sent? QD_DEPTH_NONE means nothing has been sent so far, QD_DEPTH_HEADER means the header has already been sent, dont send it again and so on. + qd_message_content_t *content; // The actual content of the message. The content is never copied + qd_buffer_list_t ma_to_override; // to field in outgoing message annotations. + qd_buffer_list_t ma_trace; // trace list in outgoing message annotations + qd_buffer_list_t ma_ingress; // ingress field in outgoing message annotations + int ma_phase; // phase for the override address + bool ma_stream; // indicates whether this message is streaming + qd_message_stream_data_list_t stream_data_list; // TODO - move this to the content for one-time parsing (TLR) + qd_message_stream_data_t *next_stream_data; + unsigned char *body_cursor; + qd_buffer_t *body_buffer; + bool strip_annotations_in; + bool send_complete; // Has the message been completely received and completely sent? + bool tag_sent; // Tags are sent + bool is_fanout; // If msg is an outgoing fanout }; ALLOC_DECLARE(qd_message_t); diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c index 6ca2b2b..8c361a2 100644 --- a/src/router_core/transfer.c +++ b/src/router_core/transfer.c @@ -171,11 +171,13 @@ int qdr_link_process_deliveries(qdr_core_t *core, qdr_link_t *link, int credit) break; } } while (settled != dlv->settled && !to_new_link); // oops missed the settlement + send_complete = qdr_delivery_send_complete(dlv); if (send_complete || to_new_link) { // - // The entire message has been sent. It is now the appropriate time to have the delivery removed - // from the head of the undelivered list and move it to the unsettled list if it is not settled. + // The entire message has been sent or the message has been moved from this link. + // It is now the appropriate time to remove the delivery from the head of the + // undelivered list to the unsettled list if it is not settled. // num_deliveries_completed++; @@ -252,6 +254,42 @@ int qdr_link_process_deliveries(qdr_core_t *core, qdr_link_t *link, int credit) } +void qdr_link_complete_sent_message(qdr_core_t *core, qdr_link_t *link) +{ + if (!link || !link->conn) + return; + + qdr_connection_t *conn = link->conn; + bool activate = false; + + sys_mutex_lock(conn->work_lock); + qdr_delivery_t *dlv = DEQ_HEAD(link->undelivered); + if (!!dlv && qdr_delivery_send_complete(dlv)) { + DEQ_REMOVE_HEAD(link->undelivered); + if (!dlv->settled && !qdr_delivery_oversize(dlv) && !qdr_delivery_is_aborted(dlv)) { + DEQ_INSERT_TAIL(link->unsettled, dlv); + dlv->where = QDR_DELIVERY_IN_UNSETTLED; + qd_log(core->log, QD_LOG_DEBUG, "Delivery transfer: dlv:%lx qdr_link_complete_sent_message: undelivered-list -> unsettled-list", (long) dlv); + } else { + dlv->where = QDR_DELIVERY_NOWHERE; + qdr_delivery_decref(core, dlv, "qdr_link_complete_sent_message - removed from undelivered"); + } + + // + // If there's another delivery on the undelivered list, get the outbound process moving again. + // + if (DEQ_SIZE(link->undelivered) > 0) { + qdr_add_link_ref(&conn->links_with_work[link->priority], link, QDR_LINK_LIST_CLASS_WORK); + activate = true; + } + } + sys_mutex_unlock(conn->work_lock); + + if (activate) + conn->protocol_adaptor->activate_handler(conn->protocol_adaptor->user_context, conn); +} + + void qdr_link_flow(qdr_core_t *core, qdr_link_t *link, int credit, bool drain_mode) { qdr_action_t *action = qdr_action(qdr_link_flow_CT, "link_flow"); diff --git a/tests/message_test.c b/tests/message_test.c index 87271df..1d3a5d3 100644 --- a/tests/message_test.c +++ b/tests/message_test.c @@ -706,10 +706,10 @@ exit: } // -// Testing protocol adapter 'body_data' interfaces +// Testing protocol adapter 'stream_data' interfaces // -static void body_data_generate_message(qd_message_t *msg, char *s_chunk_size, char *s_n_chunks) +static void stream_data_generate_message(qd_message_t *msg, char *s_chunk_size, char *s_n_chunks) { // Fill a message with n_chunks of vbin chunk_size body data. @@ -747,20 +747,20 @@ static void body_data_generate_message(qd_message_t *msg, char *s_chunk_size, ch } } -static void free_body_data_list(qd_message_t *msg_in) +static void free_stream_data_list(qd_message_t *msg_in) { // DISPATCH-1800 - this should not be required here qd_message_pvt_t *msg = (qd_message_pvt_t *)msg_in; - qd_message_body_data_t *bd = DEQ_HEAD(msg->body_data_list); + qd_message_stream_data_t *bd = DEQ_HEAD(msg->stream_data_list); while (bd) { - qd_message_body_data_t *next = DEQ_NEXT(bd); - free_qd_message_body_data_t(bd); + qd_message_stream_data_t *next = DEQ_NEXT(bd); + free_qd_message_stream_data_t(bd); bd = next; } } -static char *check_body_data(char *s_chunk_size, char *s_n_chunks, bool flatten) +static char *check_stream_data(char *s_chunk_size, char *s_n_chunks, bool flatten) { // Fill a message with n chunks of vbin chunk_size body data. // Then test by retrieving n chunks from a message copy and verifing. @@ -786,7 +786,7 @@ static char *check_body_data(char *s_chunk_size, char *s_n_chunks, bool flatten) qd_message_pvt_t *msg_pvt = (qd_message_pvt_t *)msg; // Set the original message content - body_data_generate_message(msg, s_chunk_size, s_n_chunks); + stream_data_generate_message(msg, s_chunk_size, s_n_chunks); // flatten if required if (flatten) { @@ -810,22 +810,22 @@ static char *check_body_data(char *s_chunk_size, char *s_n_chunks, bool flatten) // Define the number of raw buffers to be extracted on each loop #define N_PN_RAW_BUFFS (2) - qd_message_body_data_t *body_data; + qd_message_stream_data_t *stream_data; for (int j=0; j<n_chunks; j++) { received = 0; // this chunk received size in bytes. - // Set up the next_body_data snapshot - qd_message_body_data_result_t body_data_result = qd_message_next_body_data(copy, &body_data); + // Set up the next_stream_data snapshot + qd_message_stream_data_result_t stream_data_result = qd_message_next_stream_data(copy, &stream_data); - if (body_data_result == QD_MESSAGE_BODY_DATA_OK) { - // check body_data payload length - if (body_data->payload.length != chunk_size) { - printf("********** check_body_data: BUFFER_SIZE=%zu, pn-buf-array-size:%d, " + if (stream_data_result == QD_MESSAGE_STREAM_DATA_BODY_OK) { + // check stream_data payload length + if (stream_data->payload.length != chunk_size) { + printf("********** check_stream_data: BUFFER_SIZE=%zu, pn-buf-array-size:%d, " "chunk_size:%s, n_chunks:%s, payload length error : %zu \n", - BUFFER_SIZE, N_PN_RAW_BUFFS, s_chunk_size, s_n_chunks, body_data->payload.length); + BUFFER_SIZE, N_PN_RAW_BUFFS, s_chunk_size, s_n_chunks, stream_data->payload.length); fflush(stdout); - result = "qd_message_next_body_data returned wrong payload length."; + result = "qd_message_next_stream_data returned wrong payload length."; break; } @@ -837,14 +837,14 @@ static char *check_body_data(char *s_chunk_size, char *s_n_chunks, bool flatten) pn_raw_buffer_t buffs[N_PN_RAW_BUFFS]; // used_buffers - Number of qd_buffers in content buffer chain consumed so far. - // This number must increase as dictated by qd_message_body_data_buffers() - // when vbin segments are consumed from the current body_data chunk. + // This number must increase as dictated by qd_message_stream_data_buffers() + // when vbin segments are consumed from the current stream_data chunk. // A single vbin segment may consume 0, 1, or many qd_buffers. size_t used_buffers = 0; while (received < chunk_size) { ZERO(buffs); - size_t n_used = qd_message_body_data_buffers(body_data, buffs, used_buffers, N_PN_RAW_BUFFS); + size_t n_used = qd_message_stream_data_buffers(stream_data, buffs, used_buffers, N_PN_RAW_BUFFS); if (n_used > 0) { for (size_t ii=0; ii<n_used; ii++) { char e_char = (char)(j + 1); // expected char in payload @@ -852,7 +852,7 @@ static char *check_body_data(char *s_chunk_size, char *s_n_chunks, bool flatten) for (uint32_t idx=0; idx < buffs[ii].size; idx++) { char actual = buffs[ii].bytes[buffs[ii].offset + idx]; if (e_char != actual) { - printf("********** check_body_data: BUFFER_SIZE=%zu, pn-buf-array-size:%d, " + printf("********** check_stream_data: BUFFER_SIZE=%zu, pn-buf-array-size:%d, " "chunk_size:%s, n_chunks:%s, verify error at index %d, expected:%d, actual:%d \n", BUFFER_SIZE, N_PN_RAW_BUFFS, s_chunk_size, s_n_chunks, received + idx, e_char, actual); @@ -865,7 +865,7 @@ static char *check_body_data(char *s_chunk_size, char *s_n_chunks, bool flatten) used_buffers += n_used; if (!!result) break; } else { - printf("********** check_body_data: BUFFER_SIZE=%zu, pn-buf-array-size:%d, " + printf("********** check_stream_data: BUFFER_SIZE=%zu, pn-buf-array-size:%d, " "chunk_size:%s, n_chunks:%s, received %d bytes (not enough) \n", BUFFER_SIZE, N_PN_RAW_BUFFS, s_chunk_size, s_n_chunks, received); fflush(stdout); @@ -873,7 +873,7 @@ static char *check_body_data(char *s_chunk_size, char *s_n_chunks, bool flatten) break; } if (received > chunk_size) { - printf("********** check_body_data: BUFFER_SIZE=%zu, pn-buf-array-size:%d, " + printf("********** check_stream_data: BUFFER_SIZE=%zu, pn-buf-array-size:%d, " "chunk_size:%s, n_chunks:%s, received %d bytes (too many) \n", BUFFER_SIZE, N_PN_RAW_BUFFS, s_chunk_size, s_n_chunks, received); result = "Received too much data"; @@ -882,32 +882,30 @@ static char *check_body_data(char *s_chunk_size, char *s_n_chunks, bool flatten) } // successful check - } else if (body_data_result == QD_MESSAGE_BODY_DATA_INCOMPLETE) { + } else if (stream_data_result == QD_MESSAGE_STREAM_DATA_INCOMPLETE) { result = "DATA_INCOMPLETE"; break; } else { - switch (body_data_result) { - case QD_MESSAGE_BODY_DATA_NO_MORE: + switch (stream_data_result) { + case QD_MESSAGE_STREAM_DATA_NO_MORE: result = "EOS"; break; - case QD_MESSAGE_BODY_DATA_INVALID: + case QD_MESSAGE_STREAM_DATA_INVALID: result = "Invalid body data for streaming message"; break; - case QD_MESSAGE_BODY_DATA_NOT_DATA: - result = "Invalid body; expected data section"; break; default: result = "result: default"; break; } } } - free_body_data_list(msg); + free_stream_data_list(msg); qd_message_free(msg); if (!!copy) { - free_body_data_list(copy); + free_stream_data_list(copy); qd_message_free(copy); } return result; } -static char *test_check_body_data(void * context) +static char *test_check_stream_data(void * context) { char *result = 0; @@ -919,16 +917,16 @@ static char *test_check_body_data(void * context) for (int i=0; i<N_CHUNK_SIZES; i++) { for (int j=0; j<N_N_CHUNKS; j++) { - result = check_body_data(chunk_sizes[i], n_chunks[j], false); + result = check_stream_data(chunk_sizes[i], n_chunks[j], false); if (!!result) { - printf("test_check_body_data: chunk_size:%s, n_chunks:%s, flatten:%s, result:%s \n", + printf("test_check_stream_data: chunk_size:%s, n_chunks:%s, flatten:%s, result:%s \n", chunk_sizes[i], n_chunks[j], "false", result); fflush(stdout); return result; } - result = check_body_data(chunk_sizes[i], n_chunks[j], true); + result = check_stream_data(chunk_sizes[i], n_chunks[j], true); if (!!result) { - printf("test_check_body_data: chunk_size:%s, n_chunks:%s, flatten:%s, result:%s \n", + printf("test_check_stream_data: chunk_size:%s, n_chunks:%s, flatten:%s, result:%s \n", chunk_sizes[i], n_chunks[j], "true", result); fflush(stdout); return result; @@ -939,10 +937,10 @@ static char *test_check_body_data(void * context) } -// Verify that qd_message_body_data_append() will break up a long binary data +// Verify that qd_message_stream_data_append() will break up a long binary data // field in order to avoid triggering Q2 // -static char *test_check_body_data_append(void * context) +static char *test_check_stream_data_append(void * context) { char *result = 0; qd_message_t *msg = 0; @@ -977,7 +975,7 @@ static char *test_check_body_data_append(void * context) qd_message_compose_2(msg, field, false); qd_compose_free(field); - int depth = qd_message_body_data_append(msg, &bin_data); + int depth = qd_message_stream_data_append(msg, &bin_data); if (depth <= buffer_count) { // expected to add extra buffer(s) for meta-data result = "append length is incorrect"; @@ -995,29 +993,30 @@ static char *test_check_body_data_append(void * context) int bd_count = 0; int total_buffers = 0; - qd_message_body_data_t *body_data = 0; + qd_message_stream_data_t *stream_data = 0; bool done = false; while (!done) { - switch (qd_message_next_body_data(msg, &body_data)) { - case QD_MESSAGE_BODY_DATA_INCOMPLETE: - case QD_MESSAGE_BODY_DATA_INVALID: - case QD_MESSAGE_BODY_DATA_NOT_DATA: + switch (qd_message_next_stream_data(msg, &stream_data)) { + case QD_MESSAGE_STREAM_DATA_INCOMPLETE: + case QD_MESSAGE_STREAM_DATA_INVALID: result = "Next body data failed to get next body data"; goto exit; - case QD_MESSAGE_BODY_DATA_NO_MORE: + case QD_MESSAGE_STREAM_DATA_NO_MORE: done = true; break; - case QD_MESSAGE_BODY_DATA_OK: + case QD_MESSAGE_STREAM_DATA_BODY_OK: bd_count += 1; - // qd_message_body_data_append() breaks the buffer list up into + // qd_message_stream_data_append() breaks the buffer list up into // smaller lists that are no bigger than QD_QLIMIT_Q2_LOWER buffers // long - total_buffers += qd_message_body_data_buffer_count(body_data); - if (qd_message_body_data_buffer_count(body_data) > QD_QLIMIT_Q2_LOWER) { + total_buffers += qd_message_stream_data_buffer_count(stream_data); + if (qd_message_stream_data_buffer_count(stream_data) > QD_QLIMIT_Q2_LOWER) { result = "Body data list length too long!"; goto exit; } - qd_message_body_data_release(body_data); + qd_message_stream_data_release(stream_data); + break; + case QD_MESSAGE_STREAM_DATA_FOOTER_OK: break; } } @@ -1051,8 +1050,8 @@ int message_tests(void) TEST_CASE(test_q2_input_holdoff_sensing, 0); TEST_CASE(test_incomplete_annotations, 0); TEST_CASE(test_check_weird_messages, 0); - TEST_CASE(test_check_body_data, 0); - TEST_CASE(test_check_body_data_append, 0); + TEST_CASE(test_check_stream_data, 0); + TEST_CASE(test_check_stream_data_append, 0); return result; } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org