This is an automated email from the ASF dual-hosted git repository. kgiusti pushed a commit to branch dev-protocol-adaptors in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
The following commit(s) were added to refs/heads/dev-protocol-adaptors by this push: new c20b29a DISPATCH-1744: refactor common HTTP code c20b29a is described below commit c20b29a1f70a507fd04a26b2d1692aa522fdb9b1 Author: Kenneth Giusti <kgiu...@apache.org> AuthorDate: Thu Aug 6 10:22:35 2020 -0400 DISPATCH-1744: refactor common HTTP code This closes #815 --- include/qpid/dispatch/http1_lib.h | 118 +++ python/qpid_dispatch/management/qdrouter.json | 20 + src/CMakeLists.txt | 3 + src/adaptors/http1/http1_adaptor.c | 56 + src/adaptors/http1/http1_lib.c | 1377 +++++++++++++++++++++++++ src/adaptors/http_adaptor.c | 130 +-- src/adaptors/http_adaptor.h | 36 +- src/adaptors/http_common.c | 243 +++++ src/adaptors/http_common.h | 108 ++ tests/system_tests_qdmanage.py | 2 +- 10 files changed, 1947 insertions(+), 146 deletions(-) diff --git a/include/qpid/dispatch/http1_lib.h b/include/qpid/dispatch/http1_lib.h new file mode 100644 index 0000000..58385e0 --- /dev/null +++ b/include/qpid/dispatch/http1_lib.h @@ -0,0 +1,118 @@ +#ifndef http1_lib_H +#define http1_lib_H 1 +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ +#include <qpid/dispatch/buffer.h> + +#include <inttypes.h> + +#define HTTP1_VERSION_1_1 "HTTP/1.1" +#define HTTP1_VERSION_1_0 "HTTP/1.0" + +typedef struct http1_conn_t http1_conn_t; +typedef struct http1_transfer_t http1_transfer_t; + + +typedef enum { + HTTP1_CONN_CLIENT, // connection initiated by client + HTTP1_CONN_SERVER, // connection to server +} http1_conn_type_t; + +typedef enum { + HTTP1_STATUS_BAD_REQ = 400, + HTTP1_STATUS_SERVER_ERR = 500, + HTTP1_STATUS_BAD_VERSION = 505, +} http1_status_code_t; + +typedef struct http1_conn_config_t { + + http1_conn_type_t type; + + // called with output data to write to the network + void (*conn_tx_data)(http1_conn_t *conn, qd_buffer_list_t *data, size_t offset, unsigned int len); + + // @TODO(kgiusti) - remove? + //void (*conn_error)(http1_conn_t *conn, int code, const char *reason); + + // + // RX message callbacks + // + + // HTTP request received - new transfer created (xfer). This xfer must be + // supplied in the http1_response() method + int (*xfer_rx_request)(http1_transfer_t *xfer, + const char *method, + const char *target, + const char *version); + + // HTTP response received - the transfer comes from the return value of the + // corresponding http1_request method. Note well that if status_code is + // Informational (1xx) then this response is NOT the last response for the + // current request (See RFC7231, 6.2 Informational 1xx). The xfer_done + // callback will be called after the LAST response has been received. 
+ // + int (*xfer_rx_response)(http1_transfer_t *xfer, + const char *version, + int status_code, + const char *reason_phrase); + + int (*xfer_rx_header)(http1_transfer_t *xfer, const char *key, const char *value); + int (*xfer_rx_headers_done)(http1_transfer_t *xfer); + + int (*xfer_rx_body)(http1_transfer_t *xfer, qd_buffer_list_t *body, size_t offset, size_t len); + + void (*xfer_rx_done)(http1_transfer_t *xfer); + + // Invoked when the request/response(s) exchange has completed + // + void (*xfer_done)(http1_transfer_t *xfer); +} http1_conn_config_t; + + +http1_conn_t *http1_connection(http1_conn_config_t *config, void *context); +void http1_connection_close(http1_conn_t *conn); +void *http1_connection_get_context(http1_conn_t *conn); + +// push inbound network data into the http1 library +int http1_connection_rx_data(http1_conn_t *conn, qd_buffer_list_t *data, size_t len); + + +// +// API for sending HTTP/1.1 messages +// +void http1_transfer_set_context(http1_transfer_t *xfer, void *context); +void *http1_transfer_get_context(const http1_transfer_t *xfer); +http1_conn_t *http1_transfer_get_connection(const http1_transfer_t *xfer); + +// initiate a request - this creates a new message transfer context +http1_transfer_t *http1_tx_request(http1_conn_t *conn, const char *method, const char *target, const char *version); + +// respond to a received request - the transfer context should be from the corresponding xfer_rx_request callback +int http1_tx_response(http1_transfer_t *xfer, const char *version, int status_code, const char *reason_phrase); + +int http1_tx_add_header(http1_transfer_t *xfer, const char *key, const char *value); +int http1_tx_body(http1_transfer_t *xfer, qd_buffer_list_t *data, size_t offset, size_t len); +int http1_tx_done(http1_transfer_t *xfer); + + + + + +#endif // http1_lib_H diff --git a/python/qpid_dispatch/management/qdrouter.json b/python/qpid_dispatch/management/qdrouter.json index dc1dcb7..421fa0a 100644 --- 
a/python/qpid_dispatch/management/qdrouter.json +++ b/python/qpid_dispatch/management/qdrouter.json @@ -1103,6 +1103,16 @@ "required": false, "description": "Name of the sslProfile..", "create": true + }, + "protocolVersion": { + "description": "The version of the HTTP protocol supported by this listener.", + "type": [ + "HTTP/1.x", + "HTTP/2.0" + ], + "default": "HTTP/1.x", + "required": false, + "create": true } } }, @@ -1127,6 +1137,16 @@ "type": "string", "create": true + }, + "protocolVersion": { + "description": "The version of the HTTP protocol supported by this connector.", + "type": [ + "HTTP/1.x", + "HTTP/2.0" + ], + "default": "HTTP/1.x", + "required": false, + "create": true } } }, diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 753436b..f7ac67e 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -39,7 +39,10 @@ add_custom_command ( # Build the qpid-dispatch library. set(qpid_dispatch_SOURCES adaptors/reference_adaptor.c + adaptors/http_common.c adaptors/http_adaptor.c + adaptors/http1/http1_lib.c + adaptors/http1/http1_adaptor.c alloc_pool.c amqp.c bitmask.c diff --git a/src/adaptors/http1/http1_adaptor.c b/src/adaptors/http1/http1_adaptor.c new file mode 100644 index 0000000..beb9eb4 --- /dev/null +++ b/src/adaptors/http1/http1_adaptor.c @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <qpid/dispatch/http1_lib.h> +#include <qpid/dispatch/protocol_adaptor.h> +#include <qpid/dispatch/message.h> +#include "adaptors/http_common.h" + +#include <stdio.h> +#include <inttypes.h> + + +typedef struct qd_http1_adaptor_t { + qdr_core_t *core; + qdr_protocol_adaptor_t *adaptor; + qd_http_lsnr_list_t listeners; + qd_http_connector_list_t connectors; + qd_log_source_t *log; +} qd_http1_adaptor_t; + +//static qd_http1_adaptor_t *http1_adaptor; + +#define BUFF_BATCH 16 + + +// dummy for now: +qd_http_lsnr_t *qd_http1_configure_listener(qd_dispatch_t *qd, const qd_http_bridge_config_t *config, qd_entity_t *entity) +{ + return 0; +} + +void qd_http1_delete_listener(qd_dispatch_t *qd, qd_http_lsnr_t *listener) {} + +qd_http_connector_t *qd_http1_configure_connector(qd_dispatch_t *qd, const qd_http_bridge_config_t *config, qd_entity_t *entity) +{ + return 0; +} + +void qd_http1_delete_connector(qd_dispatch_t *qd, qd_http_connector_t *conn) {} + diff --git a/src/adaptors/http1/http1_lib.c b/src/adaptors/http1/http1_lib.c new file mode 100644 index 0000000..991d492 --- /dev/null +++ b/src/adaptors/http1/http1_lib.c @@ -0,0 +1,1377 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <qpid/dispatch/http1_lib.h> + +#include <qpid/dispatch/iterator.h> +#include <qpid/dispatch/buffer.h> +#include <qpid/dispatch/alloc_pool.h> + +#include <ctype.h> +#include <stdio.h> +#include <string.h> + + +const uint8_t CR_TOKEN = '\r'; +const uint8_t LF_TOKEN = '\n'; +const char *CRLF = "\r\n"; + +const qd_iterator_pointer_t NULL_I_PTR = {0}; + +// true for informational response codes +#define IS_INFO_RESPONSE(code) ((code) / 100 == 1) + +typedef enum { + HTTP1_MSG_STATE_START = 0, // parsing start-line + HTTP1_MSG_STATE_HEADERS, // parsing headers + HTTP1_MSG_STATE_BODY, // parsing body + HTTP1_MSG_STATE_DONE, // parsing complete +} http1_msg_state_t; + + +typedef enum { + HTTP1_CHUNK_HEADER = 0, // waiting for chunk header + HTTP1_CHUNK_DATA, // reading chunk data + HTTP1_CHUNK_TRAILERS, // reading until lone CRLF +} http1_chunk_state_t; + + +typedef struct scratch_buffer_t { + uint8_t *buf; + size_t size; // of buffer, not contents! +} scratch_buffer_t; + + +// state for a single request-response transaction +// +struct http1_transfer_t { + DEQ_LINKS(struct http1_transfer_t); + void *context; + http1_conn_t *conn; + uint32_t response_code; + + bool close_on_done; // true if connection must be closed when transfer completes + bool is_head_method; // true if request method is HEAD + bool is_connect_method; // true if request method is CONNECT TODO(kgiusti): supported? 
+}; +DEQ_DECLARE(http1_transfer_t, http1_transfer_list_t); +ALLOC_DECLARE(http1_transfer_t); +ALLOC_DEFINE(http1_transfer_t); + + +// The HTTP/1.1 connection +// +struct http1_conn_t { + void *context; + + // http requests are added to tail, + // in-progress response is at head + http1_transfer_list_t xfers; + + // Decoder for current incoming msg. + // + // incoming: holds the raw data received by the proactor from this + // connection. + // + // read_ptr: points to the next octet to be decoded on the incoming buffer + // list. Remaining is the length of the raw data to be decoded. + // + // body_ptr: points to the first unconsumed octet of the message + // body. Remaining is the number of octets that may be consumed. + // Invariant: body_ptr.buffer always points to the incoming.head as body + // data is being parsed. + // + struct decoder_t { + qd_buffer_list_t incoming; + qd_iterator_pointer_t read_ptr; + qd_iterator_pointer_t body_ptr; + + http1_transfer_t *xfer; // current transfer + http1_msg_state_t state; + scratch_buffer_t scratch; + int error; + const char *error_msg; + + uint64_t content_length; + http1_chunk_state_t chunk_state; + uint64_t chunk_length; + + bool is_request; + bool is_chunked; + bool is_1_0; + + // decoded headers + bool hdr_transfer_encoding; + bool hdr_content_length; + } decoder; + + // Encoder for current outgoing msg. + // outgoing: holds the encoded data that needs to be sent to proactor for + // sending out this connection + // write_ptr: points to the first empty octet to be written to by the + // encoder. Remaining is the total unused space in the outgoing list + // (capacity) + // Note that the outgoing list and the write_ptr are only used for the + // start line and headers. Body content buffer chains are past directly to + // the connection without encoding. 
+ // + struct encoder_t { + qd_buffer_list_t outgoing; + qd_iterator_pointer_t write_ptr; + http1_transfer_t *xfer; // current transfer + + bool is_request; + bool crlf_sent; // true if the CRLF after headers has been sent + } encoder; + + http1_conn_config_t config; +}; +ALLOC_DECLARE(http1_conn_t); +ALLOC_DEFINE(http1_conn_t); + +static void decoder_reset(struct decoder_t *d); +static void encoder_reset(struct encoder_t *e); + + +// Create a new transfer - this is done when a new http request occurs +// Keep oldest outstanding tranfer at DEQ_HEAD(conn->xfers) +static http1_transfer_t *http1_transfer(http1_conn_t *conn) +{ + http1_transfer_t *xfer = new_http1_transfer_t(); + ZERO(xfer); + xfer->conn = conn; + DEQ_INSERT_TAIL(conn->xfers, xfer); + return xfer; +} + + +static void http1_transfer_free(http1_transfer_t *xfer) +{ + if (xfer) { + http1_conn_t *conn = xfer->conn; + assert(conn->decoder.xfer != xfer); + assert(conn->encoder.xfer != xfer); + DEQ_REMOVE(conn->xfers, xfer); + free_http1_transfer_t(xfer); + } +} + + +http1_conn_t *http1_connection(http1_conn_config_t *config, void *context) +{ + http1_conn_t *conn = new_http1_conn_t(); + ZERO(conn); + + conn->context = context; + conn->config = *config; + DEQ_INIT(conn->xfers); + + encoder_reset(&conn->encoder); + DEQ_INIT(conn->encoder.outgoing); + conn->encoder.write_ptr = NULL_I_PTR; + + decoder_reset(&conn->decoder); + DEQ_INIT(conn->decoder.incoming); + conn->decoder.read_ptr = NULL_I_PTR; + + return conn; +} + + +// Close the connection conn. +// +// This cancels all outstanding transfers and destroys the connection. If +// there is an incoming response message body being parsed when this function +// is invoked it will signal the end of the message. 
+// +void http1_connection_close(http1_conn_t *conn) +{ + if (conn) { + struct decoder_t *decoder = &conn->decoder; + if (!decoder->error) { + if (decoder->state == HTTP1_MSG_STATE_BODY + && decoder->xfer) { + + // terminate the incoming message as the server has closed the + // connection to indicate the end of the message + conn->config.xfer_rx_done(decoder->xfer); + } + + // notify any outstanding transfers + http1_transfer_t *xfer = DEQ_HEAD(conn->xfers); + while (xfer) { + conn->config.xfer_done(xfer); + xfer = DEQ_NEXT(xfer); + } + } + + decoder_reset(&conn->decoder); + encoder_reset(&conn->encoder); + qd_buffer_list_free_buffers(&conn->decoder.incoming); + qd_buffer_list_free_buffers(&conn->encoder.outgoing); + free(conn->decoder.scratch.buf); + + http1_transfer_t *xfer = DEQ_HEAD(conn->xfers); + while (xfer) { + http1_transfer_free(xfer); // removes from conn->xfers list + xfer = DEQ_HEAD(conn->xfers); + } + + free_http1_conn_t(conn); + } +} + + +// reset the rx decoder state after message received +// +static void decoder_reset(struct decoder_t *decoder) +{ + // do not touch the read_ptr or incoming buffer list as they + // track the current position in the incoming data stream + + decoder->body_ptr = NULL_I_PTR; + decoder->xfer = 0; + decoder->state = HTTP1_MSG_STATE_START; + decoder->content_length = 0; + decoder->chunk_state = HTTP1_CHUNK_HEADER; + decoder->chunk_length = 0; + decoder->error = 0; + decoder->error_msg = 0; + + decoder->is_request = false; + decoder->is_chunked = false; + decoder->is_1_0 = false; + + decoder->hdr_transfer_encoding = false; + decoder->hdr_content_length = false; +} + +// reset the tx encoder after message sent +static void encoder_reset(struct encoder_t *encoder) +{ + // do not touch the write_ptr or the outgoing queue as there may be more messages to send. 
+ encoder->xfer = 0; + encoder->is_request = false; + encoder->crlf_sent = false; +} + + +// ensure the encoder has at least capacity octets available +// +static void ensure_outgoing_capacity(struct encoder_t *encoder, size_t capacity) +{ + while (encoder->write_ptr.remaining < capacity) { + qd_buffer_t *buf = qd_buffer(); + DEQ_INSERT_TAIL(encoder->outgoing, buf); + encoder->write_ptr.remaining += qd_buffer_capacity(buf); + } + if (!encoder->write_ptr.buffer) { + encoder->write_ptr.buffer = DEQ_HEAD(encoder->outgoing); + encoder->write_ptr.cursor = qd_buffer_cursor(encoder->write_ptr.buffer); + } +} + + +// Write a C string to the encoder. +// +static void write_string(struct encoder_t *encoder, const char *string) +{ + size_t needed = strlen(string); + ensure_outgoing_capacity(encoder, needed); + + qd_iterator_pointer_t *wptr = &encoder->write_ptr; + while (needed) { + if (qd_buffer_capacity(wptr->buffer) == 0) { + wptr->buffer = DEQ_NEXT(wptr->buffer); + wptr->cursor = qd_buffer_base(wptr->buffer); + } + + size_t avail = MIN(needed, qd_buffer_capacity(wptr->buffer)); + memcpy(wptr->cursor, string, avail); + qd_buffer_insert(wptr->buffer, avail); + wptr->cursor += avail; + wptr->remaining -= avail; + string += avail; + needed -= avail; + } +} + + +// +static inline size_t skip_octets(qd_iterator_pointer_t *data, size_t amount) +{ + size_t count = 0; + amount = MIN(data->remaining, amount); + while (count < amount) { + if (data->cursor == qd_buffer_cursor(data->buffer)) { + data->buffer = DEQ_NEXT(data->buffer); + assert(data->buffer); // else data->remaining is bad + data->cursor = qd_buffer_base(data->buffer); + } + size_t available = qd_buffer_cursor(data->buffer) - data->cursor; + available = MIN(available, amount - count); + data->cursor += available; + count += available; + } + data->remaining -= amount; + return amount; +} + +// consume next octet and advance the pointer +static inline bool get_octet(qd_iterator_pointer_t *data, uint8_t *octet) +{ + if 
(data->remaining > 0) { + if (data->cursor == qd_buffer_cursor(data->buffer)) { + data->buffer = DEQ_NEXT(data->buffer); + data->cursor = qd_buffer_base(data->buffer); + } + *octet = *data->cursor; + data->cursor += 1; + data->remaining -= 1; + return true; + } + return false; +} + + +// True if line contains just "CRLF" +// +static bool is_empty_line(const qd_iterator_pointer_t *line) +{ + if (line->remaining == 2) { + qd_iterator_pointer_t tmp = *line; + uint8_t octet; + return (get_octet(&tmp, &octet) && octet == CR_TOKEN + && get_octet(&tmp, &octet) && octet == LF_TOKEN); + } + return false; +} + +static void debug_print_iterator_pointer(const char *prefix, const qd_iterator_pointer_t *ptr) +{ + qd_iterator_pointer_t tmp = *ptr; + fprintf(stdout, "%s '", prefix); + size_t len = MIN(tmp.remaining, 80); + uint8_t octet; + while (len-- > 0 && get_octet(&tmp, &octet)) { + fputc(octet, stdout); + } + fprintf(stdout, "%s'\n", (tmp.remaining) ? " <truncated>" : ""); + fflush(stdout); +} + + +// read a CRLF terminated line starting at 'data'. +// On success, 'data' is advanced to the octet following the LF and 'line' is +// set to the read line (including trailing CRLF). 
Returns false if no CRLF found +// +static bool read_line(qd_iterator_pointer_t *data, qd_iterator_pointer_t *line) +{ + qd_iterator_pointer_t tmp = *data; + + *line = *data; + line->remaining = 0; + + bool eol = false; + + uint8_t octet; + while (!eol && get_octet(&tmp, &octet)) { + line->remaining += 1; + if (octet == CR_TOKEN) { + if (get_octet(&tmp, &octet)) { + line->remaining += 1; + if (octet == LF_TOKEN) { + eol = true; + } + } + } + } + + if (eol) { + *data = tmp; + return true; + } else { + *line = NULL_I_PTR; + return false; + } +} + + +static bool ensure_scratch_size(scratch_buffer_t *b, size_t required) +{ + if (b->size < required) { + if (b->buf) + free(b->buf); + b->size = required; + b->buf = malloc(b->size); + } + + // @TODO(kgiusti): deal with malloc failure + return true; +} + + +// trims any optional whitespace characters at the start of 'line' +// RFC7230 defines OWS as zero or more spaces or horizontal tabs +// +static void trim_whitespace(qd_iterator_pointer_t *line) +{ + qd_iterator_pointer_t ptr = *line; + size_t skip = 0; + uint8_t octet; + while (get_octet(&ptr, &octet) && isblank(octet)) + skip += 1; + if (skip) + skip_octets(line, skip); +} + +// copy out iterator to a buffer and null terminate. Return # of bytes written +// to str including terminating null. +static size_t pointer_2_str(const qd_iterator_pointer_t *line, unsigned char *str, size_t len) +{ + assert(len); + qd_iterator_pointer_t tmp = *line; + uint8_t *ptr = (uint8_t *)str; + len -= 1; // reserve for null terminator + while (len-- > 0 && get_octet(&tmp, ptr)) + ++ptr; + *ptr++ = 0; + return ptr - (uint8_t *)str; +} + + +// Parse out a token as defined by RFC7230 and store the result in 'token'. +// 'line' is advanced past the token. This is used for parsing fields that +// RFC7230 defines as 'tokens'. 
+// +static bool parse_token(qd_iterator_pointer_t *line, qd_iterator_pointer_t *token) +{ + static const char *TOKEN_EXTRA = "!#$%&’*+-.^_‘|~"; + + trim_whitespace(line); + qd_iterator_pointer_t tmp = *line; + *token = tmp; + size_t len = 0; + uint8_t octet; + while (get_octet(&tmp, &octet) + && (('A' <= octet && octet <= 'Z') || + ('a' <= octet && octet <= 'z') || + ('0' <= octet && octet <= '9') || + (strchr(TOKEN_EXTRA, octet)))) { + len++; + } + + if (len) { + token->remaining = len; + skip_octets(line, len); + return true; + } + *token = NULL_I_PTR; + return false; +} + + +// Parse out a text field delineated by whitespace. +// 'line' is advanced past the field. +// +static bool parse_field(qd_iterator_pointer_t *line, qd_iterator_pointer_t *field) +{ + trim_whitespace(line); + qd_iterator_pointer_t tmp = *line; + *field = tmp; + size_t len = 0; + uint8_t octet; + while (get_octet(&tmp, &octet) && !isspace(octet)) + len++; + + if (len) { + field->remaining = len; + skip_octets(line, len); + return true; + } + *field = NULL_I_PTR; + return false; +} + + +// parse the HTTP/1.1 request line: +// "method SP request-target SP HTTP-version CRLF" +// +static bool parse_request_line(http1_conn_t *conn, struct decoder_t *decoder, qd_iterator_pointer_t *line) +{ + qd_iterator_pointer_t method = {0}; + qd_iterator_pointer_t target = {0}; + qd_iterator_pointer_t version = {0}; + + if (!parse_token(line, &method) || + !parse_field(line, &target) || + !parse_field(line, &version)) { + + decoder->error_msg = "Malformed request line"; + decoder->error = HTTP1_STATUS_BAD_REQ; + return decoder->error; + } + + // translate iterator pointers to C strings + ensure_scratch_size(&decoder->scratch, method.remaining + target.remaining + version.remaining + 3); + uint8_t *ptr = decoder->scratch.buf; + size_t avail = decoder->scratch.size; + + uint8_t *method_str = ptr; + size_t offset = pointer_2_str(&method, method_str, avail); + ptr += offset; + avail -= offset; + + uint8_t 
*target_str = ptr; + offset = pointer_2_str(&target, target_str, avail); + ptr += offset; + avail += offset; + + uint8_t *version_str = ptr; + pointer_2_str(&version, version_str, avail); + + uint32_t major = 0; + uint32_t minor = 0; + if (sscanf((char*)version_str, "HTTP/%"SCNu32".%"SCNu32, &major, &minor) != 2) { + decoder->error_msg = "Malformed version in request"; + decoder->error = HTTP1_STATUS_BAD_REQ; + return decoder->error; + } + + if (major != 1 || minor > 1) { + decoder->error_msg = "Unsupported HTTP version"; + decoder->error = HTTP1_STATUS_BAD_VERSION; + return decoder->error; + } + + http1_transfer_t *xfer = http1_transfer(conn); + + // check for methods that do not support body content in the response: + if (strcmp((char*)method_str, "HEAD") == 0) + xfer->is_head_method = true; + else if (strcmp((char*)method_str, "CONNECT") == 0) + xfer->is_connect_method = true; + + decoder->xfer = xfer; + decoder->is_request = true; + decoder->is_1_0 = (minor == 0); + + decoder->error = conn->config.xfer_rx_request(xfer, (char*)method_str, (char*)target_str, (char*)version_str); + if (decoder->error) + decoder->error_msg = "xfer_rx_request callback error"; + return decoder->error; +} + + +// parse the HTTP/1.1 response line +// "HTTP-version SP status-code [SP reason-phrase] CRLF" +// +static int parse_response_line(http1_conn_t *conn, struct decoder_t *decoder, qd_iterator_pointer_t *line) +{ + qd_iterator_pointer_t version = {0}; + qd_iterator_pointer_t status_code = {0}; + qd_iterator_pointer_t reason = {0}; + + if (!parse_field(line, &version) + || !parse_field(line, &status_code) + || status_code.remaining != 3) { + + decoder->error_msg = "Malformed response status line"; + decoder->error = HTTP1_STATUS_SERVER_ERR; + return decoder->error; + } + + // Responses arrive in the same order as requests are generated so this new + // response corresponds to head xfer + http1_transfer_t *xfer = DEQ_HEAD(conn->xfers); + if (!xfer) { + // receiving a response without 
a corresponding request + decoder->error_msg = "Spurious HTTP response received"; + decoder->error = HTTP1_STATUS_SERVER_ERR; + return decoder->error; + } + + assert(!decoder->xfer); // state machine violation + assert(xfer->response_code == 0); + + decoder->xfer = xfer; + + unsigned char code_str[4]; + pointer_2_str(&status_code, code_str, 4); + xfer->response_code = atoi((char*) code_str); + + // the reason phrase is optional, and may contain spaces + + reason = *line; + if (reason.remaining >= 2) // expected for CRLF + reason.remaining -= 2; + trim_whitespace(&reason); + + // convert to C strings + ensure_scratch_size(&decoder->scratch, version.remaining + reason.remaining + 2); + uint8_t *ptr = decoder->scratch.buf; + size_t avail = decoder->scratch.size; + + uint8_t *version_str = ptr; + size_t offset = pointer_2_str(&version, version_str, avail); + ptr += offset; + avail -= offset; + + uint8_t *reason_str = ptr; + offset = pointer_2_str(&reason, reason_str, avail); + + uint32_t major = 0; + uint32_t minor = 0; + if (sscanf((char*)version_str, "HTTP/%"SCNu32".%"SCNu32, &major, &minor) != 2) { + decoder->error_msg = "Malformed version in response"; + decoder->error = HTTP1_STATUS_SERVER_ERR; + return decoder->error; + } + + if (major != 1 || minor > 1) { + decoder->error_msg = "Unsupported HTTP version"; + decoder->error = HTTP1_STATUS_BAD_VERSION; + return decoder->error; + } + + decoder->is_request = false; + decoder->is_1_0 = (minor == 0); + + decoder->error = conn->config.xfer_rx_response(decoder->xfer, (char*)version_str, + xfer->response_code, + (offset) ? 
(char*)reason_str: 0); + if (decoder->error) + decoder->error_msg = "xfer_rx_response callback error"; + + return decoder->error; +} + + +// parse the first line of an incoming http message +// +static bool parse_start_line(http1_conn_t *conn, struct decoder_t *decoder) +{ + qd_iterator_pointer_t *rptr = &decoder->read_ptr; + qd_iterator_pointer_t line; + + if (read_line(rptr, &line)) { + debug_print_iterator_pointer("start line:", &line); + + if (!is_empty_line(&line)) { // RFC7230: ignore any preceding CRLF + if (conn->config.type == HTTP1_CONN_CLIENT) { + parse_request_line(conn, decoder, &line); + } else { + parse_response_line(conn, decoder, &line); + } + conn->decoder.state = HTTP1_MSG_STATE_HEADERS; + } + return !!rptr->remaining; + } + + return false; // pend for more input +} + +// +// Header parsing +// + +// Called after the last incoming header was decoded and passed to the +// application +// +static bool process_headers_done(http1_conn_t *conn, struct decoder_t *decoder) +{ + // Flush all buffers processed so far - no longer needed + + qd_buffer_t *head = DEQ_HEAD(decoder->incoming); + while (head && head != decoder->read_ptr.buffer) { + DEQ_REMOVE_HEAD(decoder->incoming); + qd_buffer_free(head); + head = DEQ_HEAD(decoder->incoming); + } + + // perform any post-headers validation: + + if (decoder->is_request) { + if (decoder->hdr_transfer_encoding && !decoder->is_chunked) { + // RFC7230 Message Body Length: If a Transfer-Encoding header field + // is present in a request and the chunked transfer coding is not + // the final encoding, the message body length cannot be determined + // reliably; the server MUST respond with the 400 (Bad Request) + // status code and then close the connection. 
+ decoder->error_msg = "Non-chunked Tranfer-Encoding in request"; + decoder->error = HTTP1_STATUS_BAD_REQ; + return false; + } + } + + decoder->error = conn->config.xfer_rx_headers_done(decoder->xfer); + if (decoder->error) { + decoder->error_msg = "xfer_rx_headers_done callback error"; + return false; + } + + // determine if a body is present (ref RFC7230 sec 3.3.3 Message Body Length) + bool has_body; + if (decoder->is_request) { + // an http request will have a body ONLY if either chunked transfer or + // non-zero Content-Length header was given. + has_body = (decoder->is_chunked || decoder->content_length); + + } else { + // An HTTP response has a body if request method is NOT HEAD or CONNECT AND + // the response code indicates a body. A body will either have a specific + // size via Content-Length or chunked encoder, OR its length is unspecified + // and the message body is terminated by closing the connection. + // + http1_transfer_t *xfer = decoder->xfer; + has_body = !(xfer->is_head_method || + xfer->is_connect_method || + xfer->response_code == 204 || // No Content + xfer->response_code == 205 || // Reset Content + xfer->response_code == 304 || // Not Modified + IS_INFO_RESPONSE(xfer->response_code)); + if (has_body) { + // no body if explicit Content-Length of zero + if (decoder->hdr_content_length && decoder->content_length == 0) { + has_body = false; + } + } + } + + if (has_body) { + // start tracking the body buffer chain + decoder->body_ptr = decoder->read_ptr; + decoder->body_ptr.remaining = 0; + decoder->state = HTTP1_MSG_STATE_BODY; + } else { + decoder->state = HTTP1_MSG_STATE_DONE; + } + + return !!decoder->read_ptr.remaining; +} + + +// process a received header to determine message body length, etc. +// +static int process_header(http1_conn_t *conn, struct decoder_t *decoder, const uint8_t *key, const uint8_t *value) +{ + int parse_error = decoder->is_request ? 
HTTP1_STATUS_BAD_REQ : HTTP1_STATUS_SERVER_ERR; + + if (strcasecmp("Content-Length", (char*) key) == 0) { + uint64_t old = decoder->content_length; + if (sscanf((char*)value, "%"PRIu64, &decoder->content_length) != 1) { + decoder->error_msg = "Malformed Content-Length header"; + decoder->error = parse_error; + return decoder->error; + } + if (old && old != decoder->content_length) { + decoder->error_msg = "Invalid duplicate Content-Length header"; + decoder->error = parse_error; + return decoder->error; + } + decoder->hdr_content_length = true; + + } else if (strcasecmp("Transfer-Encoding", (char *)key) == 0) { + // check if "chunked" is present and it is the last item in the value + // string. And remember kids: coding type names are case insensitive! + // Also note "value" has already been trimmed of whitespace at both + // ends. + size_t len = strlen((char*)value); + if (len >= 7) { // 7 = strlen("chunked") + const char *ptr = ((char*) value) + len - 7; + decoder->is_chunked = strcasecmp("chunked", ptr) == 0; + } + decoder->hdr_transfer_encoding = true; + } + + return 0; +} + + +// Parse out the header key and value +// +static bool parse_header(http1_conn_t *conn, struct decoder_t *decoder) +{ + qd_iterator_pointer_t *rptr = &decoder->read_ptr; + qd_iterator_pointer_t line; + http1_transfer_t *xfer = decoder->xfer; + assert(xfer); // else state machine busted + + if (read_line(rptr, &line)) { + debug_print_iterator_pointer("header:", &line); + + if (is_empty_line(&line)) { + // end of headers + return process_headers_done(conn, decoder); + } + + qd_iterator_pointer_t key = {0}; + + if (!parse_token(&line, &key)) { + decoder->error_msg = "Malformed Header"; + decoder->error = (decoder->is_request) ? HTTP1_STATUS_BAD_REQ + : HTTP1_STATUS_SERVER_ERR; + return false; + } + + // advance line past the ':' + uint8_t octet; + while (get_octet(&line, &octet) && octet != ':') + ; + + // line now contains the value. 
convert to C strings and post callback + ensure_scratch_size(&decoder->scratch, key.remaining + line.remaining + 2); + uint8_t *ptr = decoder->scratch.buf; + size_t avail = decoder->scratch.size; + + uint8_t *key_str = ptr; + size_t offset = pointer_2_str(&key, key_str, avail); + ptr += offset; + avail -= offset; + + uint8_t *value_str = ptr; + pointer_2_str(&line, value_str, avail); + + // trim whitespace on both ends of value + while (isspace(*value_str)) + ++value_str; + ptr = value_str + strlen((char*) value_str); + while (ptr-- > value_str) { + if (!isspace(*ptr)) + break; + *ptr = 0; + } + + process_header(conn, decoder, key_str, value_str); + + if (!decoder->error) { + decoder->error = conn->config.xfer_rx_header(xfer, (char *)key_str, (char *)value_str); + if (decoder->error) + decoder->error_msg = "xfer_rx_header callback error"; + } + + return !!rptr->remaining; + } + + return false; // pend for more data +} + +// +// Chunked body encoding parser +// + + +// Pass message body data up to the application. +// +static inline int consume_body_data(http1_conn_t *conn, bool flush) +{ + struct decoder_t *decoder = &conn->decoder; + qd_iterator_pointer_t *body_ptr = &decoder->body_ptr; + qd_iterator_pointer_t *rptr = &decoder->read_ptr; + + // shortcut: if no more data to parse send the entire incoming chain + if (rptr->remaining == 0) { + + decoder->error = conn->config.xfer_rx_body(decoder->xfer, &decoder->incoming, + body_ptr->cursor - qd_buffer_base(body_ptr->buffer), + body_ptr->remaining); + DEQ_INIT(decoder->incoming); + *body_ptr = NULL_I_PTR; + *rptr = NULL_I_PTR; + return decoder->error; + } + + // The read pointer points to somewhere in the buffer chain that contains some + // unparsed data. Send any buffers preceding the current read pointer. 
+ qd_buffer_list_t blist = DEQ_EMPTY; + size_t octets = 0; + const size_t body_offset = body_ptr->cursor - qd_buffer_base(body_ptr->buffer); + + // invariant: + assert(DEQ_HEAD(decoder->incoming) == body_ptr->buffer); + + while (body_ptr->buffer && body_ptr->buffer != rptr->buffer) { + DEQ_REMOVE_HEAD(decoder->incoming); + DEQ_INSERT_TAIL(blist, body_ptr->buffer); + octets += qd_buffer_cursor(body_ptr->buffer) - body_ptr->cursor; + body_ptr->buffer = DEQ_HEAD(decoder->incoming); + body_ptr->cursor = qd_buffer_base(body_ptr->buffer); + } + + // invariant: + assert(octets <= body_ptr->remaining); + body_ptr->remaining -= octets; + + if (flush && body_ptr->remaining) { + // need to copy out remaining body octets into new buffer + qd_buffer_t *tail = qd_buffer(); + + assert(body_ptr->remaining <= qd_buffer_capacity(tail)); + memcpy(qd_buffer_cursor(tail), body_ptr->cursor, body_ptr->remaining); + qd_buffer_insert(tail, body_ptr->remaining); + DEQ_INSERT_TAIL(blist, tail); + octets += body_ptr->remaining; + + *body_ptr = *rptr; + body_ptr->remaining = 0; + } + + decoder->error = conn->config.xfer_rx_body(decoder->xfer, &blist, body_offset, octets); + return decoder->error; +} + + + +// parsing the start of a chunked header: +// <chunk size in hex>CRLF +// +static bool parse_body_chunked_header(http1_conn_t *conn, struct decoder_t *decoder) +{ + qd_iterator_pointer_t *rptr = &decoder->read_ptr; + qd_iterator_pointer_t line; + + assert(decoder->chunk_state == HTTP1_CHUNK_HEADER); + assert(decoder->chunk_length == 0); + + if (read_line(rptr, &line)) { + decoder->body_ptr.remaining += line.remaining; + + ensure_scratch_size(&decoder->scratch, line.remaining + 1); + uint8_t *ptr = decoder->scratch.buf; + pointer_2_str(&line, (unsigned char*) ptr, line.remaining + 1); + int rc = sscanf((char*) ptr, "%"SCNx64, &decoder->chunk_length); + if (rc != 1) { + decoder->error_msg = "Invalid chunk header"; + decoder->error = (decoder->is_request) ? 
HTTP1_STATUS_BAD_REQ
+                : HTTP1_STATUS_SERVER_ERR;
+            return false;
+        }
+
+        if (decoder->chunk_length == 0) {
+            // last chunk
+            decoder->chunk_state = HTTP1_CHUNK_TRAILERS;
+        } else {
+            decoder->chunk_state = HTTP1_CHUNK_DATA;
+
+            // chunk_length does not include the CRLF trailer:
+            decoder->chunk_length += 2;
+        }
+
+
+        return !!rptr->remaining;
+    }
+
+    return false;  // pend for more input
+}
+
+
+// Parse the data section of a chunk
+//
+// Skips over as much of the current chunk as has arrived.  Once the whole
+// chunk (including its CRLF) has been seen, the state returns to
+// HTTP1_CHUNK_HEADER and the accumulated body data is passed up.
+// Returns true if unparsed input remains.
+//
+static bool parse_body_chunked_data(http1_conn_t *conn, struct decoder_t *decoder)
+{
+    qd_iterator_pointer_t *rptr = &decoder->read_ptr;
+    qd_iterator_pointer_t *body_ptr = &decoder->body_ptr;
+
+    assert(decoder->chunk_state == HTTP1_CHUNK_DATA);
+
+    size_t skipped = skip_octets(rptr, decoder->chunk_length);
+    decoder->chunk_length -= skipped;
+    body_ptr->remaining += skipped;
+
+    if (decoder->chunk_length == 0) {
+        // end of chunk
+        decoder->chunk_state = HTTP1_CHUNK_HEADER;
+        // consume_body_data() records the callback's error code but no
+        // message; supply one so the failure can be reported like the
+        // unspecified-length path in parse_body() does.
+        if (consume_body_data(conn, false))
+            decoder->error_msg = "xfer_rx_body callback error";
+    }
+
+    return !!rptr->remaining;
+}
+
+
+// Keep reading chunk trailers until the terminating empty line is read
+//
+// Returns true if unparsed input remains, false if a full line is not yet
+// available.
+//
+static bool parse_body_chunked_trailer(http1_conn_t *conn, struct decoder_t *decoder)
+{
+    qd_iterator_pointer_t *rptr = &decoder->read_ptr;
+    qd_iterator_pointer_t *body_ptr = &decoder->body_ptr;
+    qd_iterator_pointer_t line;
+
+    assert(decoder->chunk_state == HTTP1_CHUNK_TRAILERS);
+
+    if (read_line(rptr, &line)) {
+        body_ptr->remaining += line.remaining;
+        if (is_empty_line(&line)) {
+            // end of message
+            if (consume_body_data(conn, true))
+                decoder->error_msg = "xfer_rx_body callback error";
+            decoder->state = HTTP1_MSG_STATE_DONE;
+        }
+
+        return !!rptr->remaining;
+    }
+
+    return false;  // pend for full line
+}
+
+
+// parse an incoming message body which is chunk encoded
+// Return True if there is more data pending to parse
+static bool parse_body_chunked(http1_conn_t *conn, struct decoder_t *decoder)
+{
+    // Start from "nothing more to do" so an unexpected chunk_state can never
+    // return an indeterminate value to the decode loop.
+    bool more = false;
+    switch (decoder->chunk_state) {
+
+    case HTTP1_CHUNK_HEADER:
+        more = parse_body_chunked_header(conn, decoder);
+        break;
+
+    case HTTP1_CHUNK_DATA:
+        more = parse_body_chunked_data(conn, decoder);
+        break;
+
+    case HTTP1_CHUNK_TRAILERS:
+        more = parse_body_chunked_trailer(conn, decoder);
+        break;
+
+    default:
+        assert(false);  // unknown chunk state: state machine busted
+        break;
+    }  // end switch
+
+    return more;
+}
+
+
+// parse an incoming message body which is Content-Length bytes long
+//
+// content_length counts down as body octets arrive; when it reaches zero the
+// message is complete.  Returns true if unparsed input remains.
+//
+static bool parse_body_content(http1_conn_t *conn, struct decoder_t *decoder)
+{
+    qd_iterator_pointer_t *rptr = &decoder->read_ptr;
+    qd_iterator_pointer_t *body_ptr = &decoder->body_ptr;
+
+    size_t skipped = skip_octets(rptr, decoder->content_length);
+    decoder->content_length -= skipped;
+    body_ptr->remaining += skipped;
+    bool eom = decoder->content_length == 0;
+
+    if (consume_body_data(conn, eom))
+        decoder->error_msg = "xfer_rx_body callback error";
+    if (eom)
+        decoder->state = HTTP1_MSG_STATE_DONE;
+
+    return !!rptr->remaining;
+}
+
+
+// Dispatch body parsing according to how the body length was specified:
+// chunked encoding, an explicit Content-Length, or neither.
+//
+static bool parse_body(http1_conn_t *conn, struct decoder_t *decoder)
+{
+    if (decoder->is_chunked)
+        return parse_body_chunked(conn, decoder);
+
+    if (decoder->content_length)
+        return parse_body_content(conn, decoder);
+
+    // otherwise no explicit body size, so just keep passing the entire unparsed
+    // incoming chain along until the remote closes the connection
+    decoder->error = conn->config.xfer_rx_body(decoder->xfer,
+                                               &decoder->incoming,
+                                               decoder->read_ptr.cursor
+                                               - qd_buffer_base(decoder->read_ptr.buffer),
+                                               decoder->read_ptr.remaining);
+    if (decoder->error) {
+        decoder->error_msg = "xfer_rx_body callback error";
+        return false;
+    }
+
+    decoder->body_ptr = decoder->read_ptr = NULL_I_PTR;
+    DEQ_INIT(decoder->incoming);
+
+    return false;
+}
+
+
+// Called when incoming message is complete
+//
+static bool parse_done(http1_conn_t *conn, struct decoder_t *decoder)
+{
+    http1_transfer_t *xfer = decoder->xfer;
+    bool is_response = !decoder->is_request;
+
+    if (!decoder->error) {
+        // signal the message receive is complete
+        conn->config.xfer_rx_done(xfer);
+
+        if (is_response) {   // request<->response transfer complete
+
+            // Informational 1xx response codes are NOT terminal - further responses are
allowed! + if (IS_INFO_RESPONSE(xfer->response_code)) { + xfer->response_code = 0; + } else { + // The message exchange is complete + conn->config.xfer_done(xfer); + decoder->xfer = 0; + http1_transfer_free(xfer); + } + } + + decoder_reset(decoder); + return !!decoder->read_ptr.remaining; + } + return false; +} + + +// Main decode loop. +// Process received data until it is exhausted +// +static int decode_incoming(http1_conn_t *conn) +{ + struct decoder_t *decoder = &conn->decoder; + bool more = true; + while (more && !decoder->error) { + + if (decoder->state == HTTP1_MSG_STATE_START) + more = parse_start_line(conn, decoder); + + else if (decoder->state == HTTP1_MSG_STATE_HEADERS) + more = parse_header(conn, decoder); + + else if (decoder->state == HTTP1_MSG_STATE_BODY) + more = parse_body(conn, decoder); + + // Can reach DONE from any call above. + if (decoder->state == HTTP1_MSG_STATE_DONE) + more = parse_done(conn, decoder); + } + + return decoder->error; +} + + +void *http1_connection_get_context(http1_conn_t *conn) +{ + return conn->context; +} + +// Push inbound network data into the http1 protocol engine. +// +// All of the xfer_rx callback will occur in the context of this call. This +// returns zero on success otherwise an error code. Any error occuring during +// a callback will be reflected in the return value of this function. It is +// expected that the caller will call http1_connection_close on a non-zero +// return value. 
+// +int http1_connection_rx_data(http1_conn_t *conn, qd_buffer_list_t *data, size_t len) +{ + struct decoder_t *decoder = &conn->decoder; + bool init_ptrs = DEQ_IS_EMPTY(decoder->incoming); + + DEQ_APPEND(decoder->incoming, *data); + + if (init_ptrs) { + decoder->read_ptr.buffer = DEQ_HEAD(decoder->incoming); + decoder->read_ptr.cursor = qd_buffer_base(decoder->read_ptr.buffer); + decoder->read_ptr.remaining = len; + + if (decoder->state == HTTP1_MSG_STATE_BODY) { + decoder->body_ptr = decoder->read_ptr; + decoder->body_ptr.remaining = 0; + } + } else { + decoder->read_ptr.remaining += len; + } + + return decode_incoming(conn); +} + +void http1_transfer_set_context(http1_transfer_t *xfer, void *context) +{ + xfer->context = context; +} + +void *http1_transfer_get_context(const http1_transfer_t *xfer) +{ + return xfer->context; +} + +http1_conn_t *http1_transfer_get_connection(const http1_transfer_t *xfer) +{ + return xfer->conn; +} + + +// initiate a new HTTP request. This creates a new transfer. +// request = <method>SP<target>SP<version>CRLF +// Expects version to be in the format "HTTP/X.Y" +// +http1_transfer_t *http1_tx_request(http1_conn_t *conn, const char *method, const char *target, const char *version) +{ + struct encoder_t *encoder = &conn->encoder; + assert(!encoder->xfer); // error: transfer already in progress + assert(conn->config.type == HTTP1_CONN_SERVER); + + http1_transfer_t *xfer = encoder->xfer = http1_transfer(conn); + encoder->is_request = true; + encoder->crlf_sent = false; + + if (strcmp((char*)method, "HEAD") == 0) + xfer->is_head_method = true; + else if (strcmp((char*)method, "CONNECT") == 0) + xfer->is_connect_method = true; + + write_string(encoder, method); + write_string(encoder, " "); + write_string(encoder, target); + write_string(encoder, " "); + write_string(encoder, version); + write_string(encoder, CRLF); + + return xfer; +} + + +// Send an HTTP response msg. 
xfer must correspond to the "oldest" outstanding
+// request that arrived via the xfer_rx_request callback for this connection.
+// version is expected to be in the form "HTTP/x.y"
+// status_code is expected to be 100 <= status_code <= 999
+// status-line = HTTP-version SP status-code SP reason-phrase CRLF
+// reason_phrase may be null, in which case it (and its leading SP) is omitted.
+// Always returns zero.
+//
+int http1_tx_response(http1_transfer_t *xfer, const char *version, int status_code, const char *reason_phrase)
+{
+    http1_conn_t *conn = http1_transfer_get_connection(xfer);
+    struct encoder_t *encoder = &conn->encoder;
+
+    assert(conn->config.type == HTTP1_CONN_CLIENT);
+    assert(!encoder->xfer);   // error: transfer already in progress
+    assert(DEQ_HEAD(conn->xfers) == xfer);  // error: response not in order!
+    assert(xfer->response_code == 0);
+
+    encoder->xfer = xfer;
+    encoder->is_request = false;
+    encoder->crlf_sent = false;
+    xfer->response_code = status_code;
+
+    char code_str[32];
+    snprintf(code_str, sizeof(code_str), "%d", status_code);
+
+    write_string(encoder, version);
+    write_string(encoder, " ");
+    write_string(encoder, code_str);
+    if (reason_phrase) {
+        write_string(encoder, " ");
+        write_string(encoder, reason_phrase);
+    }
+    write_string(encoder, CRLF);
+
+    return 0;
+}
+
+
+// Add a header field to an outgoing message
+// header-field = field-name ":" OWS field-value OWS
+//
+// xfer must be the transfer currently being encoded.  After the header has
+// been written, every buffer ahead of the one still being written to is
+// handed to the conn_tx_data callback.  Always returns zero.
+//
+int http1_tx_add_header(http1_transfer_t *xfer, const char *key, const char *value)
+{
+    http1_conn_t *conn = http1_transfer_get_connection(xfer);
+    struct encoder_t *encoder = &conn->encoder;
+    assert(encoder->xfer == xfer);  // xfer not current transfer
+
+    write_string(encoder, key);
+    write_string(encoder, ": ");
+    write_string(encoder, value);
+    write_string(encoder, CRLF);
+
+    // check to see if there are any full buffers that can be sent.
+
+    qd_buffer_list_t blist = DEQ_EMPTY;
+    qd_buffer_t *buf = DEQ_HEAD(encoder->outgoing);
+    size_t octets = 0;
+    while (buf && buf != encoder->write_ptr.buffer) {
+        DEQ_REMOVE_HEAD(encoder->outgoing);
+        DEQ_INSERT_TAIL(blist, buf);
+        octets += qd_buffer_size(buf);
+        // move on to the new head; without this buf never changes and the
+        // loop cannot terminate once it has been entered
+        buf = DEQ_HEAD(encoder->outgoing);
+    }
+    if (!DEQ_IS_EMPTY(blist))
+        conn->config.conn_tx_data(conn, &blist, 0, octets);
+
+    return 0;
+}
+
+
+// just forward the body chain along
+//
+// On the first call for a message this first terminates the header section
+// with a bare CRLF and flushes everything queued in encoder->outgoing.  Body
+// data itself bypasses the outgoing queue.  Always returns zero.
+// NOTE(review): the fprintf(stderr, ...) calls look like leftover debug
+// tracing - confirm whether they should go through the logging module.
+//
+int http1_tx_body(http1_transfer_t *xfer, qd_buffer_list_t *data, size_t offset, size_t len)
+{
+    http1_conn_t *conn = http1_transfer_get_connection(xfer);
+    struct encoder_t *encoder = &conn->encoder;
+
+    fprintf(stderr, "http1_tx_body(offset=%zu size=%zu)\n", offset, len);
+
+    if (!encoder->crlf_sent) {
+        // need to terminate any headers by sending the plain CRLF that follows
+        // the headers
+        write_string(encoder, CRLF);
+
+        // flush all pending output.  From this point out the outgoing queue is
+        // no longer used for this message
+        fprintf(stderr, "Flushing before body: %u bytes\n", qd_buffer_list_length(&encoder->outgoing));
+        conn->config.conn_tx_data(conn, &encoder->outgoing, 0, qd_buffer_list_length(&encoder->outgoing));
+        DEQ_INIT(encoder->outgoing);
+        encoder->write_ptr = NULL_I_PTR;
+        encoder->crlf_sent = true;
+    }
+
+    // skip the outgoing queue and send directly
+    fprintf(stderr, "Sending body data %zu bytes\n", len);
+    conn->config.conn_tx_data(conn, data, offset, len);
+
+    return 0;
+}
+
+
+int http1_tx_done(http1_transfer_t *xfer)
+{
+    http1_conn_t *conn = http1_transfer_get_connection(xfer);
+    struct encoder_t *encoder = &conn->encoder;
+
+    if (!encoder->crlf_sent) {
+        // need to send the plain CRLF that follows the headers
+        write_string(encoder, CRLF);
+
+        // flush all pending output.
+ + fprintf(stderr, "Flushing at tx_done: %u bytes\n", qd_buffer_list_length(&encoder->outgoing)); + conn->config.conn_tx_data(conn, &encoder->outgoing, 0, qd_buffer_list_length(&encoder->outgoing)); + DEQ_INIT(encoder->outgoing); + encoder->write_ptr = NULL_I_PTR; + encoder->crlf_sent = true; + } + + bool is_response = !encoder->is_request; + encoder_reset(encoder); + + if (is_response) { + if (IS_INFO_RESPONSE(xfer->response_code)) { + // this is a non-terminal response. Another response is expected + // for this request so just reset the transfer state + xfer->response_code = 0; + } else { + // The message exchange is complete + conn->config.xfer_done(xfer); + http1_transfer_free(xfer); + } + } + + return 0; +} + + diff --git a/src/adaptors/http_adaptor.c b/src/adaptors/http_adaptor.c index 86708c8..0b70459 100644 --- a/src/adaptors/http_adaptor.c +++ b/src/adaptors/http_adaptor.c @@ -30,6 +30,7 @@ #include "qpid/dispatch/protocol_adaptor.h" #include "delivery.h" +#include "http_common.h" #include "http_adaptor.h" const char *PATH = ":path"; @@ -49,8 +50,6 @@ const char *CONTENT_ENCODING = "content-encoding"; NGHTTP2_NV_FLAG_NONE \ } -ALLOC_DEFINE(qd_http_lsnr_t); -ALLOC_DEFINE(qd_http_connector_t); ALLOC_DEFINE(qdr_http2_session_data_t); ALLOC_DEFINE(qdr_http2_stream_data_t); @@ -554,26 +553,6 @@ static void grant_read_buffers(qdr_http_connection_t *conn) } -static void free_bridge_config(qd_bridge_config_t *config) -{ - if (!config) { - return; - } - free(config->host); - free(config->port); - free(config->name); - free(config->address); - free(config->host_port); -} - -void qd_http_listener_decref(qd_http_lsnr_t* li) -{ - if (li && sys_atomic_dec(&li->ref_count) == 1) { - free_bridge_config(&li->config); - free_qd_http_lsnr_t(li); - } -} - static void qdr_http_detach(void *context, qdr_link_t *link, qdr_error_t *error, bool first, bool close) { } @@ -892,18 +871,8 @@ static uint64_t qdr_http_deliver(void *context, qdr_link_t *link, qdr_delivery_t return 0; } 
-void qd_http_connector_decref(qd_http_connector_t* c) +void qd_http2_delete_connector(qd_dispatch_t *qd, qd_http_connector_t *connector) { - if (c && sys_atomic_dec(&c->ref_count) == 1) { - free_bridge_config(&c->config); - free_qd_http_connector_t(c); - } -} - - -void qd_dispatch_delete_http_connector(qd_dispatch_t *qd, void *impl) -{ - qd_http_connector_t *connector = (qd_http_connector_t*) impl; if (connector) { //TODO: cleanup and close any associated active connections DEQ_REMOVE(http_adaptor->connectors, connector); @@ -1102,69 +1071,25 @@ static void handle_listener_event(pn_event_t *e, qd_server_t *qd_server, void *c } -static qd_http_lsnr_t *qd_http_lsnr(qd_server_t *server) -{ - qd_http_lsnr_t *li = new_qd_http_lsnr_t(); - if (!li) - return 0; - ZERO(li); - sys_atomic_init(&li->ref_count, 1); - li->server = server; - li->context.context = li; - li->context.handler = &handle_listener_event; - return li; -} - - -#define CHECK() if (qd_error_code()) goto error - - static const int BACKLOG = 50; /* Listening backlog */ static bool http_listener_listen(qd_http_lsnr_t *li) { - li->pn_listener = pn_listener(); - if (li->pn_listener) { - pn_listener_set_context(li->pn_listener, &li->context); - pn_proactor_listen(qd_server_proactor(li->server), li->pn_listener, li->config.host_port, BACKLOG); - sys_atomic_inc(&li->ref_count); /* In use by proactor, PN_LISTENER_CLOSE will dec */ - /* Listen is asynchronous, log "listening" message on PN_LISTENER_OPEN event */ - } else { - qd_log(http_adaptor->log_source, QD_LOG_CRITICAL, "Failed to create listener for %s", - li->config.host_port); - } + pn_proactor_listen(qd_server_proactor(li->server), li->pn_listener, li->config.host_port, BACKLOG); + sys_atomic_inc(&li->ref_count); /* In use by proactor, PN_LISTENER_CLOSE will dec */ + /* Listen is asynchronous, log "listening" message on PN_LISTENER_OPEN event */ return li->pn_listener; } -static qd_error_t load_bridge_config(qd_dispatch_t *qd, qd_bridge_config_t *config, 
qd_entity_t* entity) -{ - qd_error_clear(); - ZERO(config); - - config->name = qd_entity_get_string(entity, "name"); CHECK(); - config->host = qd_entity_get_string(entity, "host"); CHECK(); - config->port = qd_entity_get_string(entity, "port"); CHECK(); - config->address = qd_entity_get_string(entity, "address"); CHECK(); - - int hplen = strlen(config->host) + strlen(config->port) + 2; - config->host_port = malloc(hplen); - snprintf(config->host_port, hplen, "%s:%s", config->host, config->port); - - return QD_ERROR_NONE; - - error: - free_bridge_config(config); - return qd_error_code(); -} - -qd_http_lsnr_t *qd_dispatch_configure_http_lsnr(qd_dispatch_t *qd, qd_entity_t *entity) +qd_http_lsnr_t *qd_http2_configure_listener(qd_dispatch_t *qd, const qd_http_bridge_config_t *config, qd_entity_t *entity) { - qd_http_lsnr_t *li = qd_http_lsnr(qd->server); - if (!li || load_bridge_config(qd, &li->config, entity) != QD_ERROR_NONE) { - qd_log(http_adaptor->log_source, QD_LOG_ERROR, "Unable to create http listener: %s", qd_error_message()); - qd_http_listener_decref(li); + qd_http_lsnr_t *li = qd_http_lsnr(qd->server, &handle_listener_event); + if (!li) { + qd_log(http_adaptor->log_source, QD_LOG_ERROR, "Unable to create http listener: no memory"); return 0; } + + li->config = *config; //DEQ_ITEM_INIT(li); DEQ_INSERT_TAIL(http_adaptor->listeners, li); qd_log(http_adaptor->log_source, QD_LOG_INFO, "Configured HTTP_ADAPTOR listener on %s", (&li->config)->host_port); @@ -1173,16 +1098,12 @@ qd_http_lsnr_t *qd_dispatch_configure_http_lsnr(qd_dispatch_t *qd, qd_entity_t * } -static qd_http_connector_t *qd_http_connector(qd_server_t *server) +void qd_http2_delete_listener(qd_dispatch_t *qd, qd_http_lsnr_t *listener) { - qd_http_connector_t *c = new_qd_http_connector_t(); - if (!c) return 0; - ZERO(c); - sys_atomic_init(&c->ref_count, 1); - c->server = server; - return c; + // TBD? 
} + static void on_activate(void *context) { qdr_http_connection_t* conn = (qdr_http_connection_t*) context; @@ -1278,20 +1199,18 @@ qdr_http_connection_t *qdr_http_connection_egress(qd_http_connector_t *connector } - -qd_http_connector_t *qd_dispatch_configure_http_connector(qd_dispatch_t *qd, qd_entity_t *entity) +qd_http_connector_t *qd_http2_configure_connector(qd_dispatch_t *qd, const qd_http_bridge_config_t *config, qd_entity_t *entity) { qd_http_connector_t *c = qd_http_connector(qd->server); - if (!c || load_bridge_config(qd, &c->config, entity) != QD_ERROR_NONE) { - qd_log(http_adaptor->log_source, QD_LOG_ERROR, "Unable to create tcp connector: %s", qd_error_message()); - qd_http_connector_decref(c); + if (!c) { + qd_log(http_adaptor->log_source, QD_LOG_ERROR, "Unable to create http connector: no memory"); return 0; } + c->config = *config; DEQ_ITEM_INIT(c); DEQ_INSERT_TAIL(http_adaptor->connectors, c); qdr_http_connection_egress(c); return c; - } static void qdr_http_adaptor_final(void *adaptor_context) @@ -1302,17 +1221,6 @@ static void qdr_http_adaptor_final(void *adaptor_context) http_adaptor = NULL; } -qd_error_t qd_entity_refresh_httpListener(qd_entity_t* entity, void *impl) -{ - return QD_ERROR_NONE; -} - - -qd_error_t qd_entity_refresh_httpConnector(qd_entity_t* entity, void *impl) -{ - return QD_ERROR_NONE; -} - /** * This initialization function will be invoked when the router core is ready for the protocol * adaptor to be created. 
This function must: @@ -1341,7 +1249,7 @@ static void qdr_http_adaptor_init(qdr_core_t *core, void **adaptor_context) qdr_http_delivery_update, qdr_http_conn_close, qdr_http_conn_trace); - adaptor->log_source = qd_log_source("HTTP_ADAPTOR"); + adaptor->log_source = qd_log_source(QD_HTTP_LOG_SOURCE); adaptor->protocol_log_source = qd_log_source("PROTOCOL"); *adaptor_context = adaptor; DEQ_INIT(adaptor->listeners); diff --git a/src/adaptors/http_adaptor.h b/src/adaptors/http_adaptor.h index 5982622..2f0134a 100644 --- a/src/adaptors/http_adaptor.h +++ b/src/adaptors/http_adaptor.h @@ -25,44 +25,16 @@ #include <qpid/dispatch/log.h> #include <nghttp2/nghttp2.h> + // We already have a qd_http_listener_t defined in http-libwebsockets.c // We will call this as qd_http_lsnr_t in order to avoid a clash. // At a later point in time, we will handle websocket here as well // and get rid of http-libwebsockets.c and rename this as qd_http_listener_t -typedef struct qd_http_lsnr_t qd_http_lsnr_t; -typedef struct qd_http_connector_t qd_http_connector_t; typedef struct qdr_http2_session_data_t qdr_http2_session_data_t; -typedef struct qd_bridge_config_t qd_bridge_config_t; typedef struct qdr_http2_stream_data_t qdr_http2_stream_data_t; typedef struct qdr_http_connection_t qdr_http_connection_t; DEQ_DECLARE(qdr_http2_stream_data_t, qd_http2_stream_data_list_t); -struct qd_bridge_config_t { - char *name; - char *host; - char *port; - char *address; - char *host_port; -}; - -struct qd_http_lsnr_t { - qd_handler_context_t context; - sys_atomic_t ref_count; - qd_server_t *server; - qd_bridge_config_t config; - pn_listener_t *pn_listener; - DEQ_LINKS(qd_http_lsnr_t); -}; - -struct qd_http_connector_t { - sys_atomic_t ref_count; - qd_server_t *server; - qd_bridge_config_t config; - qd_timer_t *timer; - long delay; - DEQ_LINKS(qd_http_connector_t); -}; - struct qdr_http2_session_data_t { qd_http2_stream_data_list_t streams; // A session can have many streams. 
nghttp2_session *session; // A pointer to the nghttp2s' session object @@ -110,7 +82,7 @@ struct qdr_http_connection_t { pn_raw_buffer_t read_buffers[4]; bool ingress; qd_timer_t *activate_timer; - qd_bridge_config_t *config; + qd_http_bridge_config_t *config; qd_server_t *server; uint64_t conn_id; @@ -126,11 +98,7 @@ struct qdr_http_connection_t { nghttp2_data_provider data_prd; }; -DEQ_DECLARE(qd_http_lsnr_t, qd_http_lsnr_list_t); -ALLOC_DECLARE(qd_http_lsnr_t); ALLOC_DECLARE(qdr_http2_session_data_t); -ALLOC_DECLARE(qd_http_connector_t); ALLOC_DECLARE(qdr_http2_stream_data_t); -DEQ_DECLARE(qd_http_connector_t, qd_http_connector_list_t); diff --git a/src/adaptors/http_common.c b/src/adaptors/http_common.c new file mode 100644 index 0000000..b3a6fc9 --- /dev/null +++ b/src/adaptors/http_common.c @@ -0,0 +1,243 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +#include "http_common.h" + +#include <proton/listener.h> + +#include <stdio.h> + +ALLOC_DECLARE(qd_http_lsnr_t); +ALLOC_DEFINE(qd_http_lsnr_t); +ALLOC_DECLARE(qd_http_connector_t); +ALLOC_DEFINE(qd_http_connector_t); + + +static qd_error_t load_bridge_config(qd_dispatch_t *qd, qd_http_bridge_config_t *config, qd_entity_t* entity) +{ + char *version_str = 0; + + qd_error_clear(); + ZERO(config); + +#define CHECK() if (qd_error_code()) goto error + config->name = qd_entity_get_string(entity, "name"); CHECK(); + config->host = qd_entity_get_string(entity, "host"); CHECK(); + config->port = qd_entity_get_string(entity, "port"); CHECK(); + config->address = qd_entity_get_string(entity, "address"); CHECK(); + version_str = qd_entity_get_string(entity, "protcolVersion"); CHECK(); + + if (strncmp(version_str, "HTTP/1", 6) == 0) { + config->version = VERSION_HTTP1; + } else { + config->version = VERSION_HTTP2; + } + free(version_str); + version_str = 0; + + int hplen = strlen(config->host) + strlen(config->port) + 2; + config->host_port = malloc(hplen); + snprintf(config->host_port, hplen, "%s:%s", config->host, config->port); + + return QD_ERROR_NONE; + +error: + qd_http_free_bridge_config(config); + free(version_str); + return qd_error_code(); +} + + +void qd_http_free_bridge_config(qd_http_bridge_config_t *config) +{ + if (!config) { + return; + } + free(config->host); + free(config->port); + free(config->name); + free(config->address); + free(config->host_port); +} + + +// +// HTTP Listener Management (HttpListenerEntity) +// + + +qd_http_lsnr_t *qd_dispatch_configure_http_lsnr(qd_dispatch_t *qd, qd_entity_t *entity) +{ + qd_http_lsnr_t *listener = 0; + qd_http_bridge_config_t config; + + if (load_bridge_config(qd, &config, entity) != QD_ERROR_NONE) { + qd_log(qd_log_source(QD_HTTP_LOG_SOURCE), QD_LOG_ERROR, + "Unable to create http listener: %s", qd_error_message()); + return 0; + } + + switch (config.version) { + case VERSION_HTTP1: + listener = 
qd_http1_configure_listener(qd, &config, entity); + break; + case VERSION_HTTP2: + listener = qd_http2_configure_listener(qd, &config, entity); + break; + } + + if (!listener) + qd_http_free_bridge_config(&config); + + return listener; +} + + +void qd_dispatch_delete_http_listener(qd_dispatch_t *qd, void *impl) +{ + qd_http_lsnr_t *listener = (qd_http_lsnr_t*) impl; + if (listener) { + switch (listener->config.version) { + case VERSION_HTTP1: + qd_http1_delete_listener(qd, listener); + break; + case VERSION_HTTP2: + qd_http2_delete_listener(qd, listener); + break; + } + } +} + + +qd_error_t qd_entity_refresh_httpListener(qd_entity_t* entity, void *impl) +{ + return QD_ERROR_NONE; +} + + +// +// HTTP Connector Management (HttpConnectorEntity) +// + + +qd_http_connector_t *qd_dispatch_configure_http_connector(qd_dispatch_t *qd, qd_entity_t *entity) +{ + qd_http_connector_t *conn = 0; + qd_http_bridge_config_t config; + + if (load_bridge_config(qd, &config, entity) != QD_ERROR_NONE) { + qd_log(qd_log_source(QD_HTTP_LOG_SOURCE), QD_LOG_ERROR, + "Unable to create http connector: %s", qd_error_message()); + return 0; + } + + switch (config.version) { + case VERSION_HTTP1: + conn = qd_http1_configure_connector(qd, &config, entity); + break; + case VERSION_HTTP2: + conn = qd_http2_configure_connector(qd, &config, entity); + break; + } + + if (!conn) + qd_http_free_bridge_config(&config); + + return conn; +} + + +void qd_dispatch_delete_http_connector(qd_dispatch_t *qd, void *impl) +{ + qd_http_connector_t *conn = (qd_http_connector_t*) impl; + + if (conn) { + switch (conn->config.version) { + case VERSION_HTTP1: + qd_http1_delete_connector(qd, conn); + break; + case VERSION_HTTP2: + qd_http2_delete_connector(qd, conn); + break; + } + } +} + +qd_error_t qd_entity_refresh_httpConnector(qd_entity_t* entity, void *impl) +{ + return QD_ERROR_NONE; +} + +// +// qd_http_lsnr_t constructor +// + +qd_http_lsnr_t *qd_http_lsnr(qd_server_t *server, qd_server_event_handler_t handler) 
+{ + qd_http_lsnr_t *li = new_qd_http_lsnr_t(); + if (!li) + return 0; + ZERO(li); + + li->pn_listener = pn_listener(); + if (!li->pn_listener) { + free_qd_http_lsnr_t(li); + return 0; + } + + sys_atomic_init(&li->ref_count, 1); + li->server = server; + li->context.context = li; + li->context.handler = handler; + pn_listener_set_context(li->pn_listener, &li->context); + + return li; +} + +void qd_http_listener_decref(qd_http_lsnr_t* li) +{ + if (li && sys_atomic_dec(&li->ref_count) == 1) { + qd_http_free_bridge_config(&li->config); + free_qd_http_lsnr_t(li); + } +} + +// +// qd_http_connector_t constructor +// + +qd_http_connector_t *qd_http_connector(qd_server_t *server) +{ + qd_http_connector_t *c = new_qd_http_connector_t(); + if (!c) return 0; + ZERO(c); + sys_atomic_init(&c->ref_count, 1); + c->server = server; + return c; +} + +void qd_http_connector_decref(qd_http_connector_t* c) +{ + if (c && sys_atomic_dec(&c->ref_count) == 1) { + qd_http_free_bridge_config(&c->config); + free_qd_http_connector_t(c); + } +} + + diff --git a/src/adaptors/http_common.h b/src/adaptors/http_common.h new file mode 100644 index 0000000..e8a7ff9 --- /dev/null +++ b/src/adaptors/http_common.h @@ -0,0 +1,108 @@ +#ifndef __http_common_h__ +#define __http_common_h__ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <qpid/dispatch/atomic.h> +#include <qpid/dispatch/ctools.h> +#include <qpid/dispatch/alloc.h> +#include <qpid/dispatch/timer.h> + +#include "delivery.h" +#include "entity.h" + +#define QD_HTTP_LOG_SOURCE "HTTP_ADAPTOR" + +typedef enum { + VERSION_HTTP1, + VERSION_HTTP2, +} qd_http_version_t; + +typedef struct qd_http_bridge_config_t { + char *name; + char *host; + char *port; + char *address; + char *host_port; + qd_http_version_t version; +} qd_http_bridge_config_t; + +void qd_http_free_bridge_config(qd_http_bridge_config_t *config); + +typedef struct qd_http_lsnr_t qd_http_lsnr_t; +struct qd_http_lsnr_t { + qd_http_bridge_config_t config; + qd_handler_context_t context; + sys_atomic_t ref_count; + qd_server_t *server; + pn_listener_t *pn_listener; + DEQ_LINKS(qd_http_lsnr_t); +}; +DEQ_DECLARE(qd_http_lsnr_t, qd_http_lsnr_list_t); + +qd_http_lsnr_t *qd_http_lsnr(qd_server_t *server, qd_server_event_handler_t handler); +void qd_http_listener_decref(qd_http_lsnr_t* li); + +typedef struct qd_http_connector_t qd_http_connector_t; +struct qd_http_connector_t { + qd_http_bridge_config_t config; + sys_atomic_t ref_count; + qd_server_t *server; + qd_timer_t *timer; + long delay; + struct qdr_http_connection_t *dispatcher; // pseudo egress connection + DEQ_LINKS(qd_http_connector_t); +}; +DEQ_DECLARE(qd_http_connector_t, qd_http_connector_list_t); + +qd_http_connector_t *qd_http_connector(qd_server_t *server); +void qd_http_connector_decref(qd_http_connector_t* c); + + + +// +// Management Entity Interfaces (see HttpListenerEntity and HttpConnectorEntity in agent.py) +// + +qd_http_lsnr_t *qd_dispatch_configure_http_lsnr(qd_dispatch_t *qd, qd_entity_t *entity); +void qd_dispatch_delete_http_listener(qd_dispatch_t *qd, void *impl); +qd_error_t qd_entity_refresh_httpListener(qd_entity_t* entity, void *impl); + +qd_http_connector_t 
*qd_dispatch_configure_http_connector(qd_dispatch_t *qd, qd_entity_t *entity); +void qd_dispatch_delete_http_connector(qd_dispatch_t *qd, void *impl); +qd_error_t qd_entity_refresh_httpConnector(qd_entity_t* entity, void *impl); + +// +// These functions are defined in their respective HTTP adaptors: +// + +qd_http_lsnr_t *qd_http1_configure_listener(qd_dispatch_t *, const qd_http_bridge_config_t *, qd_entity_t *); +qd_http_lsnr_t *qd_http2_configure_listener(qd_dispatch_t *, const qd_http_bridge_config_t *, qd_entity_t *); + +void qd_http1_delete_listener(qd_dispatch_t *, qd_http_lsnr_t *); +void qd_http2_delete_listener(qd_dispatch_t *, qd_http_lsnr_t *); + +qd_http_connector_t *qd_http1_configure_connector(qd_dispatch_t *, const qd_http_bridge_config_t *, qd_entity_t *); +qd_http_connector_t *qd_http2_configure_connector(qd_dispatch_t *, const qd_http_bridge_config_t *, qd_entity_t *); + +void qd_http1_delete_connector(qd_dispatch_t *, qd_http_connector_t *); +void qd_http2_delete_connector(qd_dispatch_t *, qd_http_connector_t *); + + +#endif // __http_common_h__ diff --git a/tests/system_tests_qdmanage.py b/tests/system_tests_qdmanage.py index efd102f..8553e4f 100644 --- a/tests/system_tests_qdmanage.py +++ b/tests/system_tests_qdmanage.py @@ -39,7 +39,7 @@ DUMMY = "org.apache.qpid.dispatch.dummy" CONNECTION_PROPERTIES_UNICODE_STRING = {u'connection': u'properties', u'int_property': 6451} -TOTAL_ENTITIES=29 # for tests that check the total # of entities +TOTAL_ENTITIES=31 # for tests that check the total # of entities class QdmanageTest(TestCase): --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org