This is an automated email from the ASF dual-hosted git repository. gsim pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
The following commit(s) were added to refs/heads/master by this push: new 0bcca5f DISPATCH-1898: eventChannel option for http1 adaptor 0bcca5f is described below commit 0bcca5fb5e9ce18f1d38579f2d6cc326ee87ef4d Author: Gordon Sim <g...@redhat.com> AuthorDate: Thu Dec 17 21:58:23 2020 +0000 DISPATCH-1898: eventChannel option for http1 adaptor --- src/adaptors/http1/http1_client.c | 76 ++++++++++++++++++++++++++++++++------- src/adaptors/http1/http1_server.c | 33 ++++++++++++++--- 2 files changed, 93 insertions(+), 16 deletions(-) diff --git a/src/adaptors/http1/http1_client.c b/src/adaptors/http1/http1_client.c index ffae7ec..fe0b6c2 100644 --- a/src/adaptors/http1/http1_client.c +++ b/src/adaptors/http1/http1_client.c @@ -34,6 +34,7 @@ #define LISTENER_BACKLOG 16 const char *CONTENT_LENGTH_KEY = "Content-Length"; +const char *POST_METHOD = "POST"; // // State for a single response message to be sent to the client via the raw @@ -77,6 +78,8 @@ typedef struct _client_request_t { // _client_response_msg_list_t responses; + uint32_t error_code; + char *error_text; bool codec_completed; // encoder/decoder done bool cancelled; bool close_on_complete; // close the conn when this request is complete @@ -110,6 +113,7 @@ static void _handle_connection_events(pn_event_t *e, qd_server_t *qd_server, voi static void _client_response_msg_free(_client_request_t *req, _client_response_msg_t *rmsg); static void _client_request_free(_client_request_t *req); static void _write_pending_response(_client_request_t *req); +static void _deliver_request(qdr_http1_connection_t *hconn, _client_request_t *req); //////////////////////////////////////////////////////// @@ -332,7 +336,14 @@ static void _setup_client_connection(qdr_http1_connection_t *hconn) // simulate a client publisher link to the HTTP server: qdr_terminus_t *target = qdr_terminus(0); - qdr_terminus_set_address(target, hconn->cfg.address); + if (hconn->cfg.event_channel) { + //For an event channel, we always want to be able to handle + //incoming requests. We use an anonymous publisher so that we + //get credit regardless of there being consumers. + qdr_terminus_set_address(target, NULL); + } else { + qdr_terminus_set_address(target, hconn->cfg.address); + } hconn->in_link = qdr_link_first_attach(hconn->qdr_conn, QD_INCOMING, qdr_terminus(0), //qdr_terminus_t *source, @@ -414,7 +425,7 @@ static void _handle_connection_events(pn_event_t *e, qd_server_t *qd_server, voi case PN_RAW_CONNECTION_NEED_READ_BUFFERS: { qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] Need read buffers", hconn->conn_id); // @TODO(kgiusti): backpressure if no credit - if (hconn->client.reply_to_addr /* && hconn->in_link_credit > 0 */) { + if (hconn->client.reply_to_addr || hconn->cfg.event_channel /* && hconn->in_link_credit > 0 */) { int granted = qda_raw_conn_grant_read_buffers(hconn->raw_conn); qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] %d read buffers granted", hconn->conn_id, granted); @@ -478,6 +489,16 @@ static void _handle_connection_events(pn_event_t *e, qd_server_t *qd_server, voi hconn->conn_id, hconn->out_link_id, hreq->base.msg_id); need_close = true; } else { + if (hreq->error_code) { + qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE, "[C%"PRIu64"][L%"PRIu64"] Responding with %i %s", hconn->conn_id, + hconn->out_link_id, hreq->error_code, hreq->error_text); + _client_response_msg_t *rmsg = new__client_response_msg_t(); + ZERO(rmsg); + DEQ_INIT(rmsg->out_data.fifo); + DEQ_INSERT_TAIL(hreq->responses, rmsg); + qdr_http1_error_response(&hreq->base, hreq->error_code, hreq->error_text); + _write_pending_response(hreq); + } // Can we retire the current outgoing response messages? // _client_response_msg_t *rmsg = DEQ_HEAD(hreq->responses); @@ -529,9 +550,7 @@ static void _handle_connection_events(pn_event_t *e, qd_server_t *qd_server, voi hconn->conn_id, hconn->in_link_id, hreq->base.msg_id); hconn->in_link_credit -= 1; - hreq->request_dlv = qdr_link_deliver(hconn->in_link, hreq->request_msg, 0, false, 0, 0, 0, 0); - qdr_delivery_set_context(hreq->request_dlv, (void*) hreq); - hreq->request_msg = 0; + _deliver_request(hconn, hreq); } _write_pending_response(hreq); @@ -659,6 +678,17 @@ static int _client_rx_request_cb(h1_codec_request_state_t *hrs, qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE, "[C%"PRIu64"] HTTP request received: msg-id=%"PRIu64" method=%s target=%s version=%"PRIi32".%"PRIi32, hconn->conn_id, creq->base.msg_id, method, target, version_major, version_minor); + if (hconn->cfg.event_channel) { + if (strcasecmp(method, POST_METHOD) == 0) { + creq->error_code = 204; + creq->error_text = "Event posted."; + qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG, "[C%"PRIu64"] Event posted", hconn->conn_id); + } else { + creq->error_code = 405; + creq->error_text = "Invalid method for event channel, only POST is allowed."; + qd_log(qdr_http1_adaptor->log, QD_LOG_WARNING, "[C%"PRIu64"] HTTP %s request not allowed for event channel", hconn->conn_id, method); + } + } creq->request_props = qd_compose(QD_PERFORMATIVE_APPLICATION_PROPERTIES, 0); qd_compose_start_map(creq->request_props); @@ -744,6 +774,10 @@ static int _client_rx_headers_done_cb(h1_codec_request_state_t *hrs, bool has_bo _client_request_t *hreq = (_client_request_t*) h1_codec_request_state_get_context(hrs); qdr_http1_connection_t *hconn = hreq->base.hconn; + if (hconn->cfg.event_channel && strcasecmp(h1_codec_request_state_method(hrs), POST_METHOD) != 0) { + return 0; + } + qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE, "[C%"PRIu64"][L%"PRIu64"] HTTP request headers done.", hconn->conn_id, hconn->in_link_id); @@ -770,7 +804,12 @@ static int _client_rx_headers_done_cb(h1_codec_request_state_t *hrs, bool has_bo // @TODO(kgiusti) set to: to target? qd_compose_insert_string(props, hconn->cfg.address); // to qd_compose_insert_string(props, h1_codec_request_state_method(hrs)); // subject - qd_compose_insert_string(props, hconn->client.reply_to_addr); // reply-to + if (hconn->cfg.event_channel) { + // event channel does not want replies + qd_compose_insert_null(props); // reply-to + } else { + qd_compose_insert_string(props, hconn->client.reply_to_addr); // reply-to + } qd_compose_insert_null(props); // correlation-id qd_compose_insert_null(props); // content-type qd_compose_insert_null(props); // content-encoding @@ -797,9 +836,7 @@ static int _client_rx_headers_done_cb(h1_codec_request_state_t *hrs, bool has_bo "[C%"PRIu64"][L%"PRIu64"] Delivering request msg-id=%"PRIu64" to router", hconn->conn_id, hconn->in_link_id, hreq->base.msg_id); - hreq->request_dlv = qdr_link_deliver(hconn->in_link, hreq->request_msg, 0, false, 0, 0, 0, 0); - qdr_delivery_set_context(hreq->request_dlv, (void*) hreq); - hreq->request_msg = 0; + _deliver_request(hconn, hreq); } return 0; @@ -814,6 +851,10 @@ static int _client_rx_body_cb(h1_codec_request_state_t *hrs, qd_buffer_list_t *b { _client_request_t *hreq = (_client_request_t*) h1_codec_request_state_get_context(hrs); qdr_http1_connection_t *hconn = hreq->base.hconn; + if (hconn->cfg.event_channel && strcasecmp(h1_codec_request_state_method(hrs), POST_METHOD) != 0) { + qd_buffer_list_free_buffers(body); + return 0; + } qd_message_t *msg = hreq->request_msg ? hreq->request_msg : qdr_delivery_message(hreq->request_dlv); qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE, @@ -936,9 +977,7 @@ void qdr_http1_client_core_link_flow(qdr_http1_adaptor_t *adaptor, "[C%"PRIu64"][L%"PRIu64"] Delivering next request msg-id=%"PRIu64" to router", hconn->conn_id, hconn->in_link_id, hreq->base.msg_id); - hreq->request_dlv = qdr_link_deliver(hconn->in_link, hreq->request_msg, 0, false, 0, 0, 0, 0); - qdr_delivery_set_context(hreq->request_dlv, (void*) hreq); - hreq->request_msg = 0; + _deliver_request(hconn, hreq); } } } @@ -1639,3 +1678,16 @@ void qdr_http1_client_core_conn_close(qdr_http1_adaptor_t *adaptor, // qdr_http1_close_connection(hconn, error); } + +static void _deliver_request(qdr_http1_connection_t *hconn, _client_request_t *hreq) +{ + if (hconn->cfg.event_channel) { + qd_iterator_t *addr = qd_message_field_iterator(hreq->request_msg, QD_FIELD_TO); + qd_iterator_reset_view(addr, ITER_VIEW_ADDRESS_HASH); + hreq->request_dlv = qdr_link_deliver_to(hconn->in_link, hreq->request_msg, 0, addr, false, 0, 0, 0, 0); + } else { + hreq->request_dlv = qdr_link_deliver(hconn->in_link, hreq->request_msg, 0, false, 0, 0, 0, 0); + } + qdr_delivery_set_context(hreq->request_dlv, (void*) hreq); + hreq->request_msg = 0; +} diff --git a/src/adaptors/http1/http1_server.c b/src/adaptors/http1/http1_server.c index 9481a4a..6377d8f 100644 --- a/src/adaptors/http1/http1_server.c +++ b/src/adaptors/http1/http1_server.c @@ -821,6 +821,10 @@ static int _server_rx_response_cb(h1_codec_request_state_t *hrs, hconn->conn_id, hconn->in_link_id, hreq->base.msg_id, status_code, reason_phrase ? reason_phrase : "<NONE>", version_major, version_minor); + if (hconn->cfg.event_channel) { + return 0; + } + _server_response_msg_t *rmsg = new__server_response_msg_t(); ZERO(rmsg); rmsg->hreq = hreq; @@ -859,6 +863,10 @@ static int _server_rx_header_cb(h1_codec_request_state_t *hrs, const char *key, "[C%"PRIu64"]L%"PRIu64"] HTTP response header received: key='%s' value='%s'", hconn->conn_id, hconn->in_link_id, key, value); + if (hconn->cfg.event_channel) { + return 0; + } + // expect: running incoming request at tail _server_response_msg_t *rmsg = DEQ_TAIL(hreq->responses); assert(rmsg); @@ -885,6 +893,10 @@ static int _server_rx_headers_done_cb(h1_codec_request_state_t *hrs, bool has_bo "[C%"PRIu64"][L%"PRIu64"] HTTP response headers done.", hconn->conn_id, hconn->in_link_id); + if (hconn->cfg.event_channel) { + return 0; + } + // expect: running incoming request at tail _server_response_msg_t *rmsg = DEQ_TAIL(hreq->responses); assert(rmsg && !rmsg->msg); @@ -953,14 +965,20 @@ static int _server_rx_body_cb(h1_codec_request_state_t *hrs, qd_buffer_list_t *b { _server_request_t *hreq = (_server_request_t*) h1_codec_request_state_get_context(hrs); qdr_http1_connection_t *hconn = hreq->base.hconn; - _server_response_msg_t *rmsg = DEQ_TAIL(hreq->responses); - - qd_message_t *msg = rmsg->msg ? rmsg->msg : qdr_delivery_message(rmsg->dlv); qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE, "[C%"PRIu64"][L%"PRIu64"] HTTP response body received len=%zu.", hconn->conn_id, hconn->in_link_id, len); + if (hconn->cfg.event_channel) { + qd_buffer_list_free_buffers(body); + return 0; + } + + _server_response_msg_t *rmsg = DEQ_TAIL(hreq->responses); + + qd_message_t *msg = rmsg->msg ? rmsg->msg : qdr_delivery_message(rmsg->dlv); + qd_message_stream_data_append(msg, body); // @@ -981,6 +999,13 @@ static void _server_rx_done_cb(h1_codec_request_state_t *hrs) { _server_request_t *hreq = (_server_request_t*) h1_codec_request_state_get_context(hrs); qdr_http1_connection_t *hconn = hreq->base.hconn; + if (hconn->cfg.event_channel) { + qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE, + "[C%"PRIu64"][L%"PRIu64"] HTTP response message msg-id=%"PRIu64" decoding complete.", + hconn->conn_id, hconn->in_link_id, hreq->base.msg_id); + hreq->response_complete = true; + return; + } _server_response_msg_t *rmsg = DEQ_TAIL(hreq->responses); qd_message_t *msg = rmsg->msg ? rmsg->msg : qdr_delivery_message(rmsg->dlv); @@ -1168,7 +1193,7 @@ static _server_request_t *_create_request_context(qdr_http1_connection_t *hconn, reply_to = (char*) qd_iterator_copy(reply_to_itr); qd_iterator_free(reply_to_itr); - if (!reply_to) { + if (!reply_to && !hconn->cfg.event_channel) { qd_log(qdr_http1_adaptor->log, QD_LOG_WARNING, "[C%"PRIu64"][L%"PRIu64"] Rejecting message no reply-to.", hconn->conn_id, hconn->out_link_id); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org