This is an automated email from the ASF dual-hosted git repository.

gsim pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git


The following commit(s) were added to refs/heads/master by this push:
     new 0bcca5f  DISPATCH-1898: eventChannel option for http1 adaptor
0bcca5f is described below

commit 0bcca5fb5e9ce18f1d38579f2d6cc326ee87ef4d
Author: Gordon Sim <g...@redhat.com>
AuthorDate: Thu Dec 17 21:58:23 2020 +0000

    DISPATCH-1898: eventChannel option for http1 adaptor
---
 src/adaptors/http1/http1_client.c | 76 ++++++++++++++++++++++++++++++++-------
 src/adaptors/http1/http1_server.c | 33 ++++++++++++++---
 2 files changed, 93 insertions(+), 16 deletions(-)

diff --git a/src/adaptors/http1/http1_client.c b/src/adaptors/http1/http1_client.c
index ffae7ec..fe0b6c2 100644
--- a/src/adaptors/http1/http1_client.c
+++ b/src/adaptors/http1/http1_client.c
@@ -34,6 +34,7 @@
 #define LISTENER_BACKLOG  16
 
 const char *CONTENT_LENGTH_KEY = "Content-Length";
+const char *POST_METHOD = "POST";
 
 //
 // State for a single response message to be sent to the client via the raw
@@ -77,6 +78,8 @@ typedef struct _client_request_t {
     //
     _client_response_msg_list_t responses;
 
+    uint32_t error_code;
+    char    *error_text;
     bool codec_completed;     // encoder/decoder done
     bool cancelled;
     bool close_on_complete;   // close the conn when this request is complete
@@ -110,6 +113,7 @@ static void _handle_connection_events(pn_event_t *e, qd_server_t *qd_server, voi
 static void _client_response_msg_free(_client_request_t *req, 
_client_response_msg_t *rmsg);
 static void _client_request_free(_client_request_t *req);
 static void _write_pending_response(_client_request_t *req);
+static void _deliver_request(qdr_http1_connection_t *hconn, _client_request_t 
*req);
 
 
 ////////////////////////////////////////////////////////
@@ -332,7 +336,14 @@ static void _setup_client_connection(qdr_http1_connection_t *hconn)
 
     // simulate a client publisher link to the HTTP server:
     qdr_terminus_t *target = qdr_terminus(0);
-    qdr_terminus_set_address(target, hconn->cfg.address);
+    if (hconn->cfg.event_channel) {
+        //For an event channel, we always want to be able to handle
+        //incoming requests. We use an anonymous publisher so that we
+        //get credit regardless of there being consumers.
+        qdr_terminus_set_address(target, NULL);
+    } else {
+        qdr_terminus_set_address(target, hconn->cfg.address);
+    }
     hconn->in_link = qdr_link_first_attach(hconn->qdr_conn,
                                            QD_INCOMING,
                                            qdr_terminus(0),  //qdr_terminus_t  
 *source,
@@ -414,7 +425,7 @@ static void _handle_connection_events(pn_event_t *e, qd_server_t *qd_server, voi
     case PN_RAW_CONNECTION_NEED_READ_BUFFERS: {
         qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] Need read buffers", 
hconn->conn_id);
         // @TODO(kgiusti): backpressure if no credit
-        if (hconn->client.reply_to_addr /* && hconn->in_link_credit > 0 */) {
+        if (hconn->client.reply_to_addr || hconn->cfg.event_channel /* && 
hconn->in_link_credit > 0 */) {
             int granted = qda_raw_conn_grant_read_buffers(hconn->raw_conn);
             qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] %d read buffers granted",
                    hconn->conn_id, granted);
@@ -478,6 +489,16 @@ static void _handle_connection_events(pn_event_t *e, qd_server_t *qd_server, voi
                        hconn->conn_id, hconn->out_link_id, hreq->base.msg_id);
             need_close = true;
         } else {
+            if (hreq->error_code) {
+                qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE, 
"[C%"PRIu64"][L%"PRIu64"] Responding with %i %s", hconn->conn_id,
+                       hconn->out_link_id, hreq->error_code, hreq->error_text);
+                _client_response_msg_t *rmsg = new__client_response_msg_t();
+                ZERO(rmsg);
+                DEQ_INIT(rmsg->out_data.fifo);
+                DEQ_INSERT_TAIL(hreq->responses, rmsg);
+                qdr_http1_error_response(&hreq->base, hreq->error_code, 
hreq->error_text);
+                _write_pending_response(hreq);
+            }
             // Can we retire the current outgoing response messages?
             //
             _client_response_msg_t *rmsg = DEQ_HEAD(hreq->responses);
@@ -529,9 +550,7 @@ static void _handle_connection_events(pn_event_t *e, qd_server_t *qd_server, voi
                        hconn->conn_id, hconn->in_link_id, hreq->base.msg_id);
 
                 hconn->in_link_credit -= 1;
-                hreq->request_dlv = qdr_link_deliver(hconn->in_link, 
hreq->request_msg, 0, false, 0, 0, 0, 0);
-                qdr_delivery_set_context(hreq->request_dlv, (void*) hreq);
-                hreq->request_msg = 0;
+                _deliver_request(hconn, hreq);
             }
 
             _write_pending_response(hreq);
@@ -659,6 +678,17 @@ static int _client_rx_request_cb(h1_codec_request_state_t *hrs,
     qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE,
            "[C%"PRIu64"] HTTP request received: msg-id=%"PRIu64" method=%s 
target=%s version=%"PRIi32".%"PRIi32,
            hconn->conn_id, creq->base.msg_id, method, target, version_major, 
version_minor);
+    if (hconn->cfg.event_channel) {
+        if (strcasecmp(method, POST_METHOD) == 0) {
+            creq->error_code = 204;
+            creq->error_text = "Event posted.";
+            qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG, "[C%"PRIu64"] Event 
posted", hconn->conn_id);
+        } else {
+            creq->error_code = 405;
+            creq->error_text = "Invalid method for event channel, only POST is 
allowed.";
+            qd_log(qdr_http1_adaptor->log, QD_LOG_WARNING, "[C%"PRIu64"] HTTP 
%s request not allowed for event channel", hconn->conn_id, method);
+        }
+    }
 
     creq->request_props = qd_compose(QD_PERFORMATIVE_APPLICATION_PROPERTIES, 
0);
     qd_compose_start_map(creq->request_props);
@@ -744,6 +774,10 @@ static int _client_rx_headers_done_cb(h1_codec_request_state_t *hrs, bool has_bo
     _client_request_t *hreq = (_client_request_t*) 
h1_codec_request_state_get_context(hrs);
     qdr_http1_connection_t *hconn = hreq->base.hconn;
 
+    if (hconn->cfg.event_channel && 
strcasecmp(h1_codec_request_state_method(hrs), POST_METHOD) != 0) {
+        return 0;
+    }
+
     qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE,
            "[C%"PRIu64"][L%"PRIu64"] HTTP request headers done.",
            hconn->conn_id, hconn->in_link_id);
@@ -770,7 +804,12 @@ static int _client_rx_headers_done_cb(h1_codec_request_state_t *hrs, bool has_bo
     // @TODO(kgiusti) set to: to target?
     qd_compose_insert_string(props, hconn->cfg.address); // to
     qd_compose_insert_string(props, h1_codec_request_state_method(hrs));  // 
subject
-    qd_compose_insert_string(props, hconn->client.reply_to_addr);   // reply-to
+    if (hconn->cfg.event_channel) {
+        // event channel does not want replies
+        qd_compose_insert_null(props);                                  // 
reply-to
+    } else {
+        qd_compose_insert_string(props, hconn->client.reply_to_addr);   // 
reply-to
+    }
     qd_compose_insert_null(props);                      // correlation-id
     qd_compose_insert_null(props);                      // content-type
     qd_compose_insert_null(props);                      // content-encoding
@@ -797,9 +836,7 @@ static int _client_rx_headers_done_cb(h1_codec_request_state_t *hrs, bool has_bo
                "[C%"PRIu64"][L%"PRIu64"] Delivering request msg-id=%"PRIu64" 
to router",
                hconn->conn_id, hconn->in_link_id, hreq->base.msg_id);
 
-        hreq->request_dlv = qdr_link_deliver(hconn->in_link, 
hreq->request_msg, 0, false, 0, 0, 0, 0);
-        qdr_delivery_set_context(hreq->request_dlv, (void*) hreq);
-        hreq->request_msg = 0;
+        _deliver_request(hconn, hreq);
     }
 
     return 0;
@@ -814,6 +851,10 @@ static int _client_rx_body_cb(h1_codec_request_state_t *hrs, qd_buffer_list_t *b
 {
     _client_request_t       *hreq = (_client_request_t*) 
h1_codec_request_state_get_context(hrs);
     qdr_http1_connection_t *hconn = hreq->base.hconn;
+    if (hconn->cfg.event_channel && 
strcasecmp(h1_codec_request_state_method(hrs), POST_METHOD) != 0) {
+        qd_buffer_list_free_buffers(body);
+        return 0;
+    }
     qd_message_t             *msg = hreq->request_msg ? hreq->request_msg : 
qdr_delivery_message(hreq->request_dlv);
 
     qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE,
@@ -936,9 +977,7 @@ void qdr_http1_client_core_link_flow(qdr_http1_adaptor_t    *adaptor,
                    "[C%"PRIu64"][L%"PRIu64"] Delivering next request 
msg-id=%"PRIu64" to router",
                    hconn->conn_id, hconn->in_link_id, hreq->base.msg_id);
 
-            hreq->request_dlv = qdr_link_deliver(hconn->in_link, 
hreq->request_msg, 0, false, 0, 0, 0, 0);
-            qdr_delivery_set_context(hreq->request_dlv, (void*) hreq);
-            hreq->request_msg = 0;
+            _deliver_request(hconn, hreq);
         }
     }
 }
@@ -1639,3 +1678,16 @@ void qdr_http1_client_core_conn_close(qdr_http1_adaptor_t *adaptor,
     //
     qdr_http1_close_connection(hconn, error);
 }
+
+static void _deliver_request(qdr_http1_connection_t *hconn, _client_request_t 
*hreq)
+{
+    if (hconn->cfg.event_channel) {
+        qd_iterator_t *addr = qd_message_field_iterator(hreq->request_msg, 
QD_FIELD_TO);
+        qd_iterator_reset_view(addr, ITER_VIEW_ADDRESS_HASH);
+        hreq->request_dlv = qdr_link_deliver_to(hconn->in_link, 
hreq->request_msg, 0, addr, false, 0, 0, 0, 0);
+    } else {
+        hreq->request_dlv = qdr_link_deliver(hconn->in_link, 
hreq->request_msg, 0, false, 0, 0, 0, 0);
+    }
+    qdr_delivery_set_context(hreq->request_dlv, (void*) hreq);
+    hreq->request_msg = 0;
+}
diff --git a/src/adaptors/http1/http1_server.c b/src/adaptors/http1/http1_server.c
index 9481a4a..6377d8f 100644
--- a/src/adaptors/http1/http1_server.c
+++ b/src/adaptors/http1/http1_server.c
@@ -821,6 +821,10 @@ static int _server_rx_response_cb(h1_codec_request_state_t *hrs,
            hconn->conn_id, hconn->in_link_id, hreq->base.msg_id, status_code, 
reason_phrase ? reason_phrase : "<NONE>",
            version_major, version_minor);
 
+    if (hconn->cfg.event_channel) {
+        return 0;
+    }
+
     _server_response_msg_t *rmsg = new__server_response_msg_t();
     ZERO(rmsg);
     rmsg->hreq = hreq;
@@ -859,6 +863,10 @@ static int _server_rx_header_cb(h1_codec_request_state_t *hrs, const char *key,
            "[C%"PRIu64"]L%"PRIu64"] HTTP response header received: key='%s' 
value='%s'",
            hconn->conn_id, hconn->in_link_id, key, value);
 
+    if (hconn->cfg.event_channel) {
+        return 0;
+    }
+
     // expect: running incoming request at tail
     _server_response_msg_t *rmsg = DEQ_TAIL(hreq->responses);
     assert(rmsg);
@@ -885,6 +893,10 @@ static int _server_rx_headers_done_cb(h1_codec_request_state_t *hrs, bool has_bo
            "[C%"PRIu64"][L%"PRIu64"] HTTP response headers done.",
            hconn->conn_id, hconn->in_link_id);
 
+    if (hconn->cfg.event_channel) {
+        return 0;
+    }
+
     // expect: running incoming request at tail
     _server_response_msg_t *rmsg = DEQ_TAIL(hreq->responses);
     assert(rmsg && !rmsg->msg);
@@ -953,14 +965,20 @@ static int _server_rx_body_cb(h1_codec_request_state_t *hrs, qd_buffer_list_t *b
 {
     _server_request_t       *hreq = (_server_request_t*) 
h1_codec_request_state_get_context(hrs);
     qdr_http1_connection_t *hconn = hreq->base.hconn;
-    _server_response_msg_t *rmsg  = DEQ_TAIL(hreq->responses);
-
-    qd_message_t *msg = rmsg->msg ? rmsg->msg : 
qdr_delivery_message(rmsg->dlv);
 
     qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE,
            "[C%"PRIu64"][L%"PRIu64"] HTTP response body received len=%zu.",
            hconn->conn_id, hconn->in_link_id, len);
 
+    if (hconn->cfg.event_channel) {
+        qd_buffer_list_free_buffers(body);
+        return 0;
+    }
+
+    _server_response_msg_t *rmsg  = DEQ_TAIL(hreq->responses);
+
+    qd_message_t *msg = rmsg->msg ? rmsg->msg : 
qdr_delivery_message(rmsg->dlv);
+
     qd_message_stream_data_append(msg, body);
 
     //
@@ -981,6 +999,13 @@ static void _server_rx_done_cb(h1_codec_request_state_t *hrs)
 {
     _server_request_t       *hreq = (_server_request_t*) 
h1_codec_request_state_get_context(hrs);
     qdr_http1_connection_t *hconn = hreq->base.hconn;
+    if (hconn->cfg.event_channel) {
+        qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE,
+               "[C%"PRIu64"][L%"PRIu64"] HTTP response message 
msg-id=%"PRIu64" decoding complete.",
+               hconn->conn_id, hconn->in_link_id, hreq->base.msg_id);
+        hreq->response_complete = true;
+        return;
+    }
     _server_response_msg_t *rmsg  = DEQ_TAIL(hreq->responses);
 
     qd_message_t *msg = rmsg->msg ? rmsg->msg : 
qdr_delivery_message(rmsg->dlv);
@@ -1168,7 +1193,7 @@ static _server_request_t *_create_request_context(qdr_http1_connection_t *hconn,
     reply_to = (char*) qd_iterator_copy(reply_to_itr);
     qd_iterator_free(reply_to_itr);
 
-    if (!reply_to) {
+    if (!reply_to && !hconn->cfg.event_channel) {
         qd_log(qdr_http1_adaptor->log, QD_LOG_WARNING,
                "[C%"PRIu64"][L%"PRIu64"] Rejecting message no reply-to.",
                hconn->conn_id, hconn->out_link_id);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to