This is an automated email from the ASF dual-hosted git repository. gsim pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
The following commit(s) were added to refs/heads/master by this push: new 5593989 DISPATCH-1780: initial support for aggregated multicast 5593989 is described below commit 5593989c3e117d993a1676706aac1875673a0d94 Author: Gordon Sim <g...@redhat.com> AuthorDate: Mon Nov 2 22:03:40 2020 +0000 DISPATCH-1780: initial support for aggregated multicast --- include/qpid/dispatch/http1_codec.h | 19 ++ python/qpid_dispatch/management/qdrouter.json | 30 +++ python/qpid_dispatch_internal/router/message.py | 17 +- src/adaptors/http1/http1_client.c | 305 +++++++++++++++++++++++- src/adaptors/http1/http1_codec.c | 100 +++++++- src/adaptors/http1/http1_private.h | 3 +- src/adaptors/http1/http1_server.c | 45 +++- src/adaptors/http_common.c | 13 + src/adaptors/http_common.h | 8 + src/python_embedded.c | 69 ++++++ src/python_private.h | 6 + 11 files changed, 590 insertions(+), 25 deletions(-) diff --git a/include/qpid/dispatch/http1_codec.h b/include/qpid/dispatch/http1_codec.h index 8aac8c4..79dfcbf 100644 --- a/include/qpid/dispatch/http1_codec.h +++ b/include/qpid/dispatch/http1_codec.h @@ -231,6 +231,10 @@ int h1_codec_tx_add_header(h1_codec_request_state_t *hrs, const char *key, const // int h1_codec_tx_body(h1_codec_request_state_t *hrs, qd_message_stream_data_t *stream_data); +// Write body as string +// +int h1_codec_tx_body_str(h1_codec_request_state_t *hrs, char *data); + // outgoing message construction complete. The request_complete() callback MAY // occur during this call. // @@ -241,5 +245,20 @@ int h1_codec_tx_body(h1_codec_request_state_t *hrs, qd_message_stream_data_t *st // int h1_codec_tx_done(h1_codec_request_state_t *hrs, bool *need_close); +// begin multipart content; this will generate a boundary marker and set the content type header +// +int h1_codec_tx_begin_multipart(h1_codec_request_state_t *hrs); + +// begin a new multipart section +// +int h1_codec_tx_begin_multipart_section(h1_codec_request_state_t *hrs); + +// mark the end of multipart data +// +int h1_codec_tx_end_multipart(h1_codec_request_state_t *hrs); + +uint64_t h1_codec_tx_multipart_section_boundary_length(); +uint64_t h1_codec_tx_multipart_end_boundary_length(); + #endif // http1_codec_H diff --git a/python/qpid_dispatch/management/qdrouter.json b/python/qpid_dispatch/management/qdrouter.json index 792515d..0efab69 100644 --- a/python/qpid_dispatch/management/qdrouter.json +++ b/python/qpid_dispatch/management/qdrouter.json @@ -1124,6 +1124,21 @@ "default": "HTTP1", "required": false, "create": true + }, + "aggregation": { + "type": [ + "multipart", + "json" + ], + "required": false, + "description": "Aggregation mode for responses when used in conjunction with multicast address.", + "create": true + }, + "eventChannel": { + "type": "boolean", + "required": false, + "description": "Enables restricted event mode where no reponses are sent to request and only post is allowed", + "create": true } } }, @@ -1165,6 +1180,21 @@ "default": "HTTP1", "required": false, "create": true + }, + "aggregation": { + "type": [ + "multipart", + "json" + ], + "required": false, + "description": "Aggregation mode for responses when used in conjunction with multicast address.", + "create": true + }, + "eventChannel": { + "type": "boolean", + "required": false, + "description": "Enables restricted event mode where no reponses are sent to request and only post is allowed", + "create": true } } }, diff --git a/python/qpid_dispatch_internal/router/message.py b/python/qpid_dispatch_internal/router/message.py index ed73288..eb3ef7a 100644 --- a/python/qpid_dispatch_internal/router/message.py +++ b/python/qpid_dispatch_internal/router/message.py @@ -16,11 +16,11 @@ # specific language governing permissions and limitations # under the License # - from __future__ import unicode_literals from __future__ import division from __future__ import absolute_import from __future__ import print_function +import json """Python class to hold message data""" @@ -38,7 +38,7 @@ class Message(object): @ivar properties: Application properties. """ - _fields = ['address', 'properties', 'body', 'reply_to', 'correlation_id'] + _fields = ['address', 'properties', 'body', 'reply_to', 'correlation_id', 'content_type'] def __init__(self, **kwds): """All instance variables can be set as keywords. See L{Message}""" @@ -50,3 +50,16 @@ class Message(object): def __repr__(self): return "%s(%s)" % (type(self).__name__, ", ".join("%s=%r"%(f, getattr(self, f)) for f in self._fields)) + +def simplify(msg): + m = {} + for k, v in msg.properties.items(): + m[k] = v + if msg.body: + m["body"] = msg.body.decode() + if msg.content_type: + m["content_type"] = msg.content_type + return m + +def messages_to_json(msgs): + return json.dumps([simplify(m) for m in msgs], indent=4) diff --git a/src/adaptors/http1/http1_client.c b/src/adaptors/http1/http1_client.c index 33412da..befe499 100644 --- a/src/adaptors/http1/http1_client.c +++ b/src/adaptors/http1/http1_client.c @@ -19,6 +19,7 @@ #include "http1_private.h" #include "adaptors/adaptor_utils.h" +#include "python_private.h" #include <proton/listener.h> #include <proton/proactor.h> @@ -32,6 +33,7 @@ #define DEFAULT_CAPACITY 250 #define LISTENER_BACKLOG 16 +const char *CONTENT_LENGTH_KEY = "Content-Length"; // // State for a single response message to be sent to the client via the raw @@ -80,6 +82,8 @@ typedef struct _client_request_t { bool close_on_complete; // close the conn when this request is complete bool conn_close_hdr; // add Connection: close to response msg + uint32_t version_major; + uint32_t version_minor; } _client_request_t; ALLOC_DECLARE(_client_request_t); ALLOC_DEFINE(_client_request_t); @@ -154,6 +158,8 @@ static qdr_http1_connection_t *_create_client_connection(qd_http_listener_t *li) hconn->cfg.port = qd_strdup(li->config.port); hconn->cfg.address = qd_strdup(li->config.address); hconn->cfg.site = li->config.site ? qd_strdup(li->config.site) : 0; + hconn->cfg.event_channel = li->config.event_channel; + hconn->cfg.aggregation = li->config.aggregation; hconn->raw_conn = pn_raw_connection(); pn_raw_connection_set_context(hconn->raw_conn, &hconn->handler_context); @@ -478,7 +484,8 @@ static void _handle_connection_events(pn_event_t *e, qd_server_t *qd_server, voi _client_response_msg_t *rmsg = DEQ_HEAD(hreq->responses); while (rmsg && rmsg->dispo && - DEQ_IS_EMPTY(rmsg->out_data.fifo)) { + DEQ_IS_EMPTY(rmsg->out_data.fifo) && + hconn->cfg.aggregation == QD_AGGREGATION_NONE) { // response message fully received and forwarded to client qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE, "[C%"PRIu64"][L%"PRIu64"] HTTP client request msg-id=%"PRIu64" settling response, dispo=0x%"PRIx64, @@ -558,10 +565,19 @@ static void _client_tx_buffers_cb(h1_codec_request_state_t *hrs, qd_buffer_list_ "[C%"PRIu64"][L%"PRIu64"] %u request octets encoded", hconn->conn_id, hconn->out_link_id, len); - // responses are decoded one at a time - the current response it at the - // tail of the response list - _client_response_msg_t *rmsg = DEQ_TAIL(hreq->responses); + _client_response_msg_t *rmsg; + if (hconn->cfg.aggregation == QD_AGGREGATION_NONE) { + // responses are decoded one at a time - the current response it at the + // tail of the response list + rmsg = DEQ_TAIL(hreq->responses); + } else { + // when responses are aggregated the buffers don't need to be + // correlated to specific responses as they will all be + // written out together, so can just use the head of the + // response list + rmsg = DEQ_HEAD(hreq->responses); + } assert(rmsg); qdr_http1_enqueue_buffer_list(&rmsg->out_data, blist); @@ -592,10 +608,19 @@ static void _client_tx_stream_data_cb(h1_codec_request_state_t *hrs, qd_message_ "[C%"PRIu64"][L%"PRIu64"] Sending body data to client", hconn->conn_id, hconn->out_link_id); - // responses are decoded one at a time - the current response it at the - // tail of the response list - _client_response_msg_t *rmsg = DEQ_TAIL(hreq->responses); + _client_response_msg_t *rmsg; + if (hconn->cfg.aggregation == QD_AGGREGATION_NONE) { + // responses are decoded one at a time - the current response it at the + // tail of the response list + rmsg = DEQ_TAIL(hreq->responses); + } else { + // when responses are aggregated the buffers don't need to be + // correlated to specific responses as they will all be + // written out together, so can just use the head of the + // response list + rmsg = DEQ_HEAD(hreq->responses); + } assert(rmsg); qdr_http1_enqueue_stream_data(&rmsg->out_data, stream_data); @@ -626,6 +651,8 @@ static int _client_rx_request_cb(h1_codec_request_state_t *hrs, creq->base.lib_rs = hrs; creq->base.hconn = hconn; creq->close_on_complete = (version_minor == 0); + creq->version_major = version_major; + creq->version_minor = version_minor; DEQ_INIT(creq->responses); qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE, @@ -915,6 +942,226 @@ void qdr_http1_client_core_link_flow(qdr_http1_adaptor_t *adaptor, } } +static bool _get_multipart_content_length(_client_request_t *hreq, char *value) +{ + uint64_t total = 0; + for (_client_response_msg_t *rmsg = DEQ_HEAD(hreq->responses); rmsg; rmsg = rmsg->next) { + qd_message_t *msg = qdr_delivery_message(rmsg->dlv); + uint64_t content_length = h1_codec_tx_multipart_section_boundary_length(); + bool got_body_length = false; + + qd_iterator_t *app_props_iter = qd_message_field_iterator(msg, QD_FIELD_APPLICATION_PROPERTIES); + if (app_props_iter) { + qd_parsed_field_t *app_props = qd_parse(app_props_iter); + if (app_props && qd_parse_is_map(app_props)) { + // now send all headers in app properties + qd_parsed_field_t *key = qd_field_first_child(app_props); + while (key) { + qd_parsed_field_t *value = qd_field_next_child(key); + if (!value) + break; + + qd_iterator_t *i_key = qd_parse_raw(key); + if (!i_key) + break; + + if (qd_iterator_equal(i_key, (const unsigned char*) CONTENT_LENGTH_KEY)) { + qd_iterator_t *i_value = qd_parse_raw(value); + if (i_value) { + char *length_str = (char*) qd_iterator_copy(i_value); + uint64_t body_length; + sscanf(length_str, "%"SCNu64, &body_length); + free(length_str); + got_body_length = true; + content_length += body_length; + } + } else if (!qd_iterator_prefix(i_key, HTTP1_HEADER_PREFIX)) { + qd_iterator_t *i_value = qd_parse_raw(value); + if (!i_value) + break; + + content_length += qd_iterator_length(i_key) + 2 + qd_iterator_length(i_value) + 2; + } + + key = qd_field_next_child(value); + } + } + qd_parse_free(app_props); + } + qd_iterator_free(app_props_iter); + if (got_body_length) { + total += content_length; + } else { + return false; + } + } + total += h1_codec_tx_multipart_end_boundary_length(); + sprintf(value, "%"SCNu64, total); + return true; +} + +static void _encode_json_response(_client_request_t *hreq) +{ + qdr_http1_connection_t *hconn = hreq->base.hconn; + qd_log(hconn->adaptor->log, QD_LOG_TRACE, "[C%"PRIu64"] encoding json response", hconn->conn_id); + bool ok = !h1_codec_tx_response(hreq->base.lib_rs, 200, NULL, hreq->version_major, hreq->version_minor); + if (!ok) { + qd_log(hconn->adaptor->log, QD_LOG_TRACE, "[C%"PRIu64"] Could not encode response", hconn->conn_id); + return; + } + PyObject* msgs = 0; + qd_json_msgs_init(&msgs); + for (_client_response_msg_t *rmsg = DEQ_HEAD(hreq->responses); rmsg; rmsg = rmsg->next) { + qd_message_t *msg = qdr_delivery_message(rmsg->dlv); + qd_json_msgs_append(msgs, msg); + rmsg->encoded = true; + } + char *body = qd_json_msgs_string(msgs); + if (body) { + h1_codec_tx_add_header(hreq->base.lib_rs, "Content-Type", "application/json"); + int len = strlen(body); + char content_length[25]; + sprintf(content_length, "%i", len); + h1_codec_tx_add_header(hreq->base.lib_rs, CONTENT_LENGTH_KEY, content_length); + h1_codec_tx_body_str(hreq->base.lib_rs, body); + free(body); + } else { + qd_log(hconn->adaptor->log, QD_LOG_ERROR, "[C%"PRIu64"] No aggregated json response returned", hconn->conn_id); + } + bool need_close; + h1_codec_tx_done(hreq->base.lib_rs, &need_close); + hreq->close_on_complete = need_close || hreq->close_on_complete; + hreq->codec_completed = true; +} + +static void _encode_multipart_response(_client_request_t *hreq) +{ + qdr_http1_connection_t *hconn = hreq->base.hconn; + qd_log(hconn->adaptor->log, QD_LOG_TRACE, "[C%"PRIu64"] encoding multipart response", hconn->conn_id); + bool ok = !h1_codec_tx_response(hreq->base.lib_rs, 200, NULL, hreq->version_major, hreq->version_minor); + char content_length[25]; + if (_get_multipart_content_length(hreq, content_length)) { + h1_codec_tx_add_header(hreq->base.lib_rs, CONTENT_LENGTH_KEY, content_length); + } + h1_codec_tx_begin_multipart(hreq->base.lib_rs); + for (_client_response_msg_t *rmsg = DEQ_HEAD(hreq->responses); rmsg; rmsg = rmsg->next) { + h1_codec_tx_begin_multipart_section(hreq->base.lib_rs); + qd_message_t *msg = qdr_delivery_message(rmsg->dlv); + + qd_iterator_t *app_props_iter = qd_message_field_iterator(msg, QD_FIELD_APPLICATION_PROPERTIES); + if (app_props_iter) { + qd_parsed_field_t *app_props = qd_parse(app_props_iter); + if (app_props && qd_parse_is_map(app_props)) { + // now send all headers in app properties + qd_parsed_field_t *key = qd_field_first_child(app_props); + while (ok && key) { + qd_parsed_field_t *value = qd_field_next_child(key); + if (!value) + break; + + qd_iterator_t *i_key = qd_parse_raw(key); + if (!i_key) + break; + + // ignore the special headers added by the mapping and content-length field (TODO: case insensitive comparison for content-length) + if (!qd_iterator_prefix(i_key, HTTP1_HEADER_PREFIX) && !qd_iterator_equal(i_key, (const unsigned char*) CONTENT_LENGTH_KEY)) { + qd_iterator_t *i_value = qd_parse_raw(value); + if (!i_value) + break; + + char *header_key = (char*) qd_iterator_copy(i_key); + char *header_value = (char*) qd_iterator_copy(i_value); + ok = !h1_codec_tx_add_header(hreq->base.lib_rs, header_key, header_value); + + free(header_key); + free(header_value); + } + + key = qd_field_next_child(value); + } + } + qd_parse_free(app_props); + } + qd_iterator_free(app_props_iter); + rmsg->headers_encoded = true; + + qd_message_stream_data_t *body_data = 0; + bool done = false; + while (ok && !done) { + switch (qd_message_next_stream_data(msg, &body_data)) { + + case QD_MESSAGE_STREAM_DATA_BODY_OK: + + qd_log(hconn->adaptor->log, QD_LOG_TRACE, + "[C%"PRIu64"][L%"PRIu64"] Encoding response body data", + hconn->conn_id, hconn->out_link_id); + + if (h1_codec_tx_body(hreq->base.lib_rs, body_data)) { + qd_log(qdr_http1_adaptor->log, QD_LOG_WARNING, + "[C%"PRIu64"][L%"PRIu64"] body data encode failed", + hconn->conn_id, hconn->out_link_id); + ok = false; + } + break; + + case QD_MESSAGE_STREAM_DATA_NO_MORE: + // indicate this message is complete + qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG, + "[C%"PRIu64"][L%"PRIu64"] response message encoding completed", + hconn->conn_id, hconn->out_link_id); + done = true; + break; + + case QD_MESSAGE_STREAM_DATA_INCOMPLETE: + qd_log(qdr_http1_adaptor->log, QD_LOG_WARNING, + "[C%"PRIu64"][L%"PRIu64"] Ignoring incomplete body data in aggregated response.", + hconn->conn_id, hconn->out_link_id); + done = true; + break; // wait for more + + case QD_MESSAGE_STREAM_DATA_INVALID: + qd_log(qdr_http1_adaptor->log, QD_LOG_WARNING, + "[C%"PRIu64"][L%"PRIu64"] Ignoring corrupted body data in aggregated response.", + hconn->conn_id, hconn->out_link_id); + done = true; + break; + + case QD_MESSAGE_STREAM_DATA_FOOTER_OK: + qd_log(qdr_http1_adaptor->log, QD_LOG_WARNING, + "[C%"PRIu64"][L%"PRIu64"] Ignoring footer in aggregated response.", + hconn->conn_id, hconn->out_link_id); + done = true; + break; + } + } + rmsg->encoded = true; + + } + h1_codec_tx_end_multipart(hreq->base.lib_rs); + bool need_close; + h1_codec_tx_done(hreq->base.lib_rs, &need_close); + hreq->close_on_complete = need_close || hreq->close_on_complete; + hreq->codec_completed = true; +} + +static void _encode_aggregated_response(qdr_http1_connection_t *hconn, _client_request_t *hreq) +{ + if (hconn->cfg.aggregation == QD_AGGREGATION_MULTIPART) { + _encode_multipart_response(hreq); + } else if (hconn->cfg.aggregation == QD_AGGREGATION_JSON) { + _encode_json_response(hreq); + } +} + +static void _encode_empty_response(qdr_http1_connection_t *hconn, _client_request_t *hreq) +{ + qd_log(hconn->adaptor->log, QD_LOG_TRACE, "[C%"PRIu64"] encoding empty response", hconn->conn_id); + h1_codec_tx_response(hreq->base.lib_rs, 204, NULL, hreq->version_major, hreq->version_minor); + bool need_close; + h1_codec_tx_done(hreq->base.lib_rs, &need_close); + hreq->close_on_complete = need_close || hreq->close_on_complete; + hreq->codec_completed = true; +} // Handle disposition/settlement update for the outstanding request msg // @@ -935,7 +1182,27 @@ void qdr_http1_client_core_delivery_update(qdr_http1_adaptor_t *adaptor, if (disp && disp != PN_RECEIVED && hreq->request_dispo == 0) { // terminal disposition hreq->request_dispo = disp; - if (disp != PN_ACCEPTED) { + if (hconn->cfg.aggregation != QD_AGGREGATION_NONE) { + // when aggregating response from a multicast request, the + // acknowledgement of the request triggers generating the + // output from the responses received + if (settled) { + if (DEQ_IS_EMPTY(hreq->responses)) { + qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG, + "[C%"PRIu64"][L%"PRIu64"] Aggregation request settled but no responses received.", hconn->conn_id, hconn->in_link_id); + _encode_empty_response(hconn, hreq); + } else { + _encode_aggregated_response(hconn, hreq); + } + _write_pending_response(hreq); + } + } else if (disp != PN_ACCEPTED) { + // no response message is going to arrive. Now what? For now fake + // a response from the server by using the codec to write an error + // response on the behalf of the server. + qd_log(qdr_http1_adaptor->log, QD_LOG_WARNING, + "[C%"PRIu64"][L%"PRIu64"] HTTP request failure, outcome=0x%"PRIx64, + hconn->conn_id, hconn->in_link_id, disp); qd_log(qdr_http1_adaptor->log, QD_LOG_WARNING, "[C%"PRIu64"][L%"PRIu64"] HTTP request msg-id=%"PRIu64" failure, outcome=0x%"PRIx64, @@ -1238,6 +1505,28 @@ uint64_t qdr_http1_client_core_link_deliver(qdr_http1_adaptor_t *adaptor, _client_response_msg_t *rmsg = DEQ_TAIL(hreq->responses); assert(rmsg && rmsg->dlv == delivery); + // when aggregating responses, they are saved on the list until + // the request has been settled, then encoded in the configured + // aggregation format + if (hconn->cfg.aggregation != QD_AGGREGATION_NONE) { + if (!qd_message_receive_complete(msg)) { + qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] Response incomplete (%i responses received)", hconn->conn_id, link->identity, DEQ_SIZE(hreq->responses)); + return 0; + } + qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] Received response (%i responses received), settling", hconn->conn_id, link->identity, DEQ_SIZE(hreq->responses)); + rmsg->dispo = PN_ACCEPTED; + qd_message_set_send_complete(msg); + qdr_link_flow(qdr_http1_adaptor->core, link, 1, false); + qdr_delivery_remote_state_updated(qdr_http1_adaptor->core, + rmsg->dlv, + rmsg->dispo, + true, // settled, + 0, // error + 0, // dispo data + false); + return PN_ACCEPTED; + } + if (!rmsg->dispo) { rmsg->dispo = _encode_response_message(hreq, rmsg); if (rmsg->dispo) { diff --git a/src/adaptors/http1/http1_codec.c b/src/adaptors/http1/http1_codec.c index 36d7ac1..6c03e7f 100644 --- a/src/adaptors/http1/http1_codec.c +++ b/src/adaptors/http1/http1_codec.c @@ -23,6 +23,7 @@ #include <qpid/dispatch/iterator.h> #include <qpid/dispatch/buffer.h> #include <qpid/dispatch/alloc_pool.h> +#include <qpid/dispatch/discriminator.h> #include <ctype.h> #include <stdio.h> @@ -43,7 +44,9 @@ const uint8_t CR_TOKEN = '\r'; const uint8_t LF_TOKEN = '\n'; const char *CRLF = "\r\n"; - +const char *DOUBLE_HYPHEN = "--"; +const char *CONTENT_TYPE_KEY = "Content-Type"; +const char *MULTIPART_CONTENT_TYPE_PREFIX = "multipart/mixed; boundary="; const qd_iterator_pointer_t NULL_I_PTR = {0}; // true for informational response codes @@ -181,6 +184,8 @@ struct h1_codec_connection_t { bool is_request; bool is_chunked; + char *boundary_marker;//used for multipart content + // headers provided bool hdr_content_length; } encoder; @@ -292,6 +297,10 @@ static void encoder_reset(struct encoder_t *encoder) encoder->is_request = false; encoder->is_chunked = false; encoder->hdr_content_length = false; + if (encoder->boundary_marker) { + free(encoder->boundary_marker); + encoder->boundary_marker = 0; + } } @@ -1531,22 +1540,85 @@ int h1_codec_tx_add_header(h1_codec_request_state_t *hrs, const char *key, const } +static inline void _flush_output(h1_codec_request_state_t *hrs, struct encoder_t *encoder) +{ + // flush all pending output. From this point out the outgoing queue is + // no longer used for this message + hrs->conn->config.tx_buffers(hrs, &encoder->outgoing, qd_buffer_list_length(&encoder->outgoing)); + DEQ_INIT(encoder->outgoing); + encoder->write_ptr = NULL_I_PTR; +} + static inline void _flush_headers(h1_codec_request_state_t *hrs, struct encoder_t *encoder) { if (!encoder->headers_sent) { // need to terminate any headers by sending the plain CRLF that follows // the headers write_string(encoder, CRLF); - - // flush all pending output. From this point out the outgoing queue is - // no longer used for this message - hrs->conn->config.tx_buffers(hrs, &encoder->outgoing, qd_buffer_list_length(&encoder->outgoing)); - DEQ_INIT(encoder->outgoing); - encoder->write_ptr = NULL_I_PTR; + _flush_output(hrs, encoder); encoder->headers_sent = true; } } +int h1_codec_tx_begin_multipart(h1_codec_request_state_t *hrs) +{ + h1_codec_connection_t *conn = h1_codec_request_state_get_connection(hrs); + struct encoder_t *encoder = &conn->encoder; + encoder->boundary_marker = (char*) malloc(QD_DISCRIMINATOR_SIZE + 2); + qd_generate_discriminator(encoder->boundary_marker); + char *content_type = (char*) malloc(strlen(MULTIPART_CONTENT_TYPE_PREFIX) + strlen(encoder->boundary_marker) + 1); + strcpy(content_type, MULTIPART_CONTENT_TYPE_PREFIX); + strcpy(content_type + strlen(content_type), encoder->boundary_marker); + h1_codec_tx_add_header(hrs, CONTENT_TYPE_KEY, content_type); + free(content_type); + + return 0; +} + +int h1_codec_tx_begin_multipart_section(h1_codec_request_state_t *hrs) +{ + h1_codec_connection_t *conn = h1_codec_request_state_get_connection(hrs); + struct encoder_t *encoder = &conn->encoder; + + //reset headers_sent flag for the new section + encoder->headers_sent = false; + write_string(encoder, CRLF); + write_string(encoder, DOUBLE_HYPHEN); + write_string(encoder, encoder->boundary_marker); + write_string(encoder, CRLF); + + return 0; +} + +int h1_codec_tx_end_multipart(h1_codec_request_state_t *hrs) +{ + h1_codec_connection_t *conn = h1_codec_request_state_get_connection(hrs); + struct encoder_t *encoder = &conn->encoder; + + write_string(encoder, CRLF); + write_string(encoder, DOUBLE_HYPHEN); + write_string(encoder, encoder->boundary_marker); + write_string(encoder, DOUBLE_HYPHEN); + write_string(encoder, CRLF); + encoder->headers_sent = false; + _flush_headers(hrs, encoder); + + free(encoder->boundary_marker); + encoder->boundary_marker = 0; + + return 0; +} + + +uint64_t h1_codec_tx_multipart_section_boundary_length() +{ + return QD_DISCRIMINATOR_SIZE + 4 + 2; +} + +uint64_t h1_codec_tx_multipart_end_boundary_length() +{ + return QD_DISCRIMINATOR_SIZE + 4 + 4; +} // just forward the body chain along int h1_codec_tx_body(h1_codec_request_state_t *hrs, qd_message_stream_data_t *stream_data) @@ -1564,6 +1636,20 @@ int h1_codec_tx_body(h1_codec_request_state_t *hrs, qd_message_stream_data_t *st return 0; } +int h1_codec_tx_body_str(h1_codec_request_state_t *hrs, char *data) +{ + h1_codec_connection_t *conn = h1_codec_request_state_get_connection(hrs); + struct encoder_t *encoder = &conn->encoder; + if (!encoder->headers_sent) { + // need to terminate any headers by sending the plain CRLF that follows + // the headers + write_string(encoder, CRLF); + encoder->headers_sent = true; + } + write_string(encoder, data); + _flush_output(hrs, encoder); + return 0; +} int h1_codec_tx_done(h1_codec_request_state_t *hrs, bool *need_close) { diff --git a/src/adaptors/http1/http1_private.h b/src/adaptors/http1/http1_private.h index e321a1c..59074d1 100644 --- a/src/adaptors/http1/http1_private.h +++ b/src/adaptors/http1/http1_private.h @@ -120,7 +120,6 @@ struct qdr_http1_request_base_t { }; DEQ_DECLARE(qdr_http1_request_base_t, qdr_http1_request_list_t); - // A single HTTP adaptor connection. // struct qdr_http1_connection_t { @@ -141,6 +140,8 @@ struct qdr_http1_connection_t { char *address; char *site; char *host_port; + bool event_channel; + qd_http_aggregation_t aggregation; } cfg; // State if connected to an HTTP client diff --git a/src/adaptors/http1/http1_server.c b/src/adaptors/http1/http1_server.c index 92ea0d1..a0837bf 100644 --- a/src/adaptors/http1/http1_server.c +++ b/src/adaptors/http1/http1_server.c @@ -155,6 +155,8 @@ static qdr_http1_connection_t *_create_server_connection(qd_http_connector_t *ct hconn->cfg.host_port = qd_strdup(bconfig->host_port); hconn->server.connector = ctor; ctor->ctx = (void*)hconn; + hconn->cfg.event_channel = bconfig->event_channel; + hconn->cfg.aggregation = bconfig->aggregation; // for initiating a connection to the server hconn->server.reconnect_timer = qd_timer(qdr_http1_adaptor->core->qd, _do_reconnect, hconn); @@ -427,6 +429,22 @@ static void _do_reconnect(void *context) "[C%"PRIu64"] Connecting to HTTP server...", conn_id); } +static void _accept_and_settle_request(_server_request_t *hreq) +{ + qdr_delivery_remote_state_updated(qdr_http1_adaptor->core, + hreq->request_dlv, + hreq->request_dispo, + true, // settled + 0, // error + 0, // dispo data + false); + // can now release the delivery + qdr_delivery_set_context(hreq->request_dlv, 0); + qdr_delivery_decref(qdr_http1_adaptor->core, hreq->request_dlv, "HTTP1 adaptor request settled"); + hreq->request_dlv = 0; + + hreq->request_settled = true; +} // Proton Raw Connection Events // @@ -613,7 +631,8 @@ static bool _process_requests(qdr_http1_connection_t *hconn) if (hreq->request_dlv) { qd_message_set_discard(qdr_delivery_message(hreq->request_dlv), true); - if (!hreq->request_acked || !hreq->request_settled) { + if ((!hreq->request_acked || !hreq->request_settled) && + hconn->cfg.aggregation == QD_AGGREGATION_NONE) { if (hreq->request_dispo == 0) hreq->request_dispo = (hreq->base.out_http1_octets > 0 @@ -676,8 +695,9 @@ static bool _process_requests(qdr_http1_connection_t *hconn) // hreq->out_data.fifo ==> request message written to raw conn // DEQ_IS_EMPTY(hreq->responses) - if (!hreq->request_acked || (!hreq->request_settled - && DEQ_IS_EMPTY(hreq->responses))) { + if ((!hreq->request_acked || (!hreq->request_settled + && DEQ_IS_EMPTY(hreq->responses))) + && hconn->cfg.aggregation == QD_AGGREGATION_NONE) { assert(hreq->request_dlv); assert(hreq->request_dispo == PN_ACCEPTED); @@ -861,7 +881,7 @@ static int _server_rx_headers_done_cb(h1_codec_request_state_t *hrs, bool has_bo qdr_http1_connection_t *hconn = hreq->base.hconn; qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE, - "[C%"PRIu64"][L%"PRIu64" HTTP response headers done.", + "[C%"PRIu64"][L%"PRIu64"] HTTP response headers done.", hconn->conn_id, hconn->in_link_id); // expect: running incoming request at tail @@ -978,7 +998,7 @@ static void _server_rx_done_cb(h1_codec_request_state_t *hrs) } } - if (rmsg->dlv) { + if (rmsg->dlv && hconn->cfg.aggregation == QD_AGGREGATION_NONE) { // We've finished the delivery, and don't care about outcome/settlement _server_response_msg_free(hreq, rmsg); } @@ -1064,7 +1084,10 @@ void qdr_http1_server_core_link_flow(qdr_http1_adaptor_t *adaptor, // stop here since response must be complete before we can deliver the next one. break; } - + if (hconn->cfg.aggregation != QD_AGGREGATION_NONE) { + // stop here since response should not be freed until it is accepted + break; + } // else the delivery is complete no need to save it _server_response_msg_free(hreq, rmsg); rmsg = DEQ_HEAD(hreq->responses); @@ -1090,9 +1113,17 @@ void qdr_http1_server_core_delivery_update(qdr_http1_adaptor_t *adaptor, // Not much can be done with error dispositions (I think) if (disp != PN_ACCEPTED) { qd_log(adaptor->log, QD_LOG_WARNING, - "[C%"PRIu64"][L%"PRIu64"] response message not received, outcome=0x%"PRIx64, + "[C%"PRIu64"][L%"PRIu64"] response message was not accepted, outcome=0x%"PRIx64, hconn->conn_id, hconn->in_link_id, disp); } + if (hconn->cfg.aggregation != QD_AGGREGATION_NONE) { + _server_request_t *hreq = (_server_request_t*)hbase; + _accept_and_settle_request(hreq); + hreq->request_acked = true; + qd_log(adaptor->log, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] request accepted", hconn->conn_id, hconn->in_link_id); + _server_response_msg_t *rmsg = DEQ_TAIL(hreq->responses); + _server_response_msg_free(hreq, rmsg); + } } diff --git a/src/adaptors/http_common.c b/src/adaptors/http_common.c index 9034e37..fc15c33 100644 --- a/src/adaptors/http_common.c +++ b/src/adaptors/http_common.c @@ -32,6 +32,7 @@ ALLOC_DEFINE(qd_http_connector_t); static qd_error_t load_bridge_config(qd_dispatch_t *qd, qd_http_bridge_config_t *config, qd_entity_t* entity) { char *version_str = 0; + char *aggregation_str = 0; qd_error_clear(); ZERO(config); @@ -43,6 +44,8 @@ static qd_error_t load_bridge_config(qd_dispatch_t *qd, qd_http_bridge_config_t config->address = qd_entity_get_string(entity, "address"); CHECK(); config->site = qd_entity_opt_string(entity, "siteId", 0); CHECK(); version_str = qd_entity_get_string(entity, "protocolVersion"); CHECK(); + aggregation_str = qd_entity_opt_string(entity, "aggregation", 0); CHECK(); + config->event_channel = qd_entity_opt_bool(entity, "eventChannel", false); CHECK(); if (strcmp(version_str, "HTTP2") == 0) { config->version = VERSION_HTTP2; @@ -52,6 +55,16 @@ static qd_error_t load_bridge_config(qd_dispatch_t *qd, qd_http_bridge_config_t free(version_str); version_str = 0; + if (aggregation_str && strcmp(aggregation_str, "json") == 0) { + config->aggregation = QD_AGGREGATION_JSON; + } else if (aggregation_str && strcmp(aggregation_str, "multipart") == 0) { + config->aggregation = QD_AGGREGATION_MULTIPART; + } else { + config->aggregation = QD_AGGREGATION_NONE; + } + free(aggregation_str); + aggregation_str = 0; + int hplen = strlen(config->host) + strlen(config->port) + 2; config->host_port = malloc(hplen); snprintf(config->host_port, hplen, "%s:%s", config->host, config->port); diff --git a/src/adaptors/http_common.h b/src/adaptors/http_common.h index 1fa5fd5..4d2ab96 100644 --- a/src/adaptors/http_common.h +++ b/src/adaptors/http_common.h @@ -34,6 +34,12 @@ typedef enum { VERSION_HTTP2, } qd_http_version_t; +typedef enum { + QD_AGGREGATION_NONE, + QD_AGGREGATION_JSON, + QD_AGGREGATION_MULTIPART +} qd_http_aggregation_t; + typedef struct qd_http_bridge_config_t { char *name; char *host; @@ -42,6 +48,8 @@ typedef struct qd_http_bridge_config_t { char *site; char *host_port; qd_http_version_t version; + bool event_channel; + qd_http_aggregation_t aggregation; } qd_http_bridge_config_t; void qd_http_free_bridge_config(qd_http_bridge_config_t *config); diff --git a/src/python_embedded.c b/src/python_embedded.c index 9465929..2207563 100644 --- a/src/python_embedded.c +++ b/src/python_embedded.c @@ -907,3 +907,72 @@ void qd_python_unlock(qd_python_lock_state_t lock_state) lock_held = false; sys_mutex_unlock(ilock); } + +void qd_json_msgs_init(PyObject **msgs) +{ + qd_python_lock_state_t lock_state = qd_python_lock(); + *msgs = PyList_New(0); + qd_python_unlock(lock_state); +} + +void qd_json_msgs_done(PyObject *msgs) +{ + qd_python_lock_state_t lock_state = qd_python_lock(); + Py_DECREF(msgs); + qd_python_unlock(lock_state); +} + +void qd_json_msgs_append(PyObject *msgs, qd_message_t *msg) +{ + // + // Parse the message through the body and exit if the message is not well formed. + // + if (qd_message_check_depth(msg, QD_DEPTH_BODY) != QD_MESSAGE_DEPTH_OK) + return; + + // This is called from non-python threads so we need to acquire the GIL to use python APIS. + qd_python_lock_state_t lock_state = qd_python_lock(); + PyObject *py_msg = PyObject_CallFunction(message_type, NULL); + if (!py_msg) { + qd_error_py(); + qd_python_unlock(lock_state); + return; + } + iter_to_py_attr(qd_message_field_iterator(msg, QD_FIELD_CONTENT_TYPE), py_iter_copy, py_msg, "content_type"); + iter_to_py_attr(qd_message_field_iterator(msg, QD_FIELD_APPLICATION_PROPERTIES), py_iter_parse, py_msg, "properties"); + iter_to_py_attr(qd_message_field_iterator(msg, QD_FIELD_BODY), py_iter_parse, py_msg, "body"); + + PyList_Append(msgs, py_msg); + + Py_DECREF(py_msg); + qd_error_py(); + qd_python_unlock(lock_state); +} + +char *qd_json_msgs_string(PyObject *msgs) +{ + qd_python_lock_state_t lock_state = qd_python_lock(); + + PyObject *message_module = PyImport_ImportModule("qpid_dispatch_internal.router.message"); + if (!message_module) { + qd_python_unlock(lock_state); + return NULL; + } + PyObject *messages_to_json = PyObject_GetAttrString(message_module, "messages_to_json"); + Py_DECREF(message_module); + if (!messages_to_json) { + qd_python_unlock(lock_state); + return NULL; + } + + PyObject *py_value = PyObject_CallFunction(messages_to_json, "O", msgs); + Py_DECREF(messages_to_json); + if (!py_value) { + qd_python_unlock(lock_state); + return NULL; + } + char *c_value = py_string_2_c(py_value); + Py_XDECREF(py_value); + qd_python_unlock(lock_state); + return c_value; +} diff --git a/src/python_private.h b/src/python_private.h index b344a0f..6cf7d28 100644 --- a/src/python_private.h +++ b/src/python_private.h @@ -20,6 +20,7 @@ */ #include <Python.h> #include <stdint.h> +#include <qpid/dispatch/message.h> #if PY_MAJOR_VERSION <= 2 // deal with the two integer types in Python2 @@ -43,4 +44,9 @@ char *py_string_2_c(PyObject *py_str); // buffer. char *py_obj_2_c_string(PyObject *py_obj); +void qd_json_msgs_init(PyObject **msgs); +void qd_json_msgs_done(PyObject *msgs); +void qd_json_msgs_append(PyObject *msgs, qd_message_t *msg); +char *qd_json_msgs_string(PyObject *msgs); + #endif --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org