This is an automated email from the ASF dual-hosted git repository.

gsim pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git


The following commit(s) were added to refs/heads/master by this push:
     new 5593989  DISPATCH-1780: initial support for aggregated multicast
5593989 is described below

commit 5593989c3e117d993a1676706aac1875673a0d94
Author: Gordon Sim <g...@redhat.com>
AuthorDate: Mon Nov 2 22:03:40 2020 +0000

    DISPATCH-1780: initial support for aggregated multicast
---
 include/qpid/dispatch/http1_codec.h             |  19 ++
 python/qpid_dispatch/management/qdrouter.json   |  30 +++
 python/qpid_dispatch_internal/router/message.py |  17 +-
 src/adaptors/http1/http1_client.c               | 305 +++++++++++++++++++++++-
 src/adaptors/http1/http1_codec.c                | 100 +++++++-
 src/adaptors/http1/http1_private.h              |   3 +-
 src/adaptors/http1/http1_server.c               |  45 +++-
 src/adaptors/http_common.c                      |  13 +
 src/adaptors/http_common.h                      |   8 +
 src/python_embedded.c                           |  69 ++++++
 src/python_private.h                            |   6 +
 11 files changed, 590 insertions(+), 25 deletions(-)

diff --git a/include/qpid/dispatch/http1_codec.h 
b/include/qpid/dispatch/http1_codec.h
index 8aac8c4..79dfcbf 100644
--- a/include/qpid/dispatch/http1_codec.h
+++ b/include/qpid/dispatch/http1_codec.h
@@ -231,6 +231,10 @@ int h1_codec_tx_add_header(h1_codec_request_state_t *hrs, 
const char *key, const
 //
 int h1_codec_tx_body(h1_codec_request_state_t *hrs, qd_message_stream_data_t 
*stream_data);
 
+// Write body as string
+//
+int h1_codec_tx_body_str(h1_codec_request_state_t *hrs, char *data);
+
 // outgoing message construction complete.  The request_complete() callback MAY
 // occur during this call.
 //
@@ -241,5 +245,20 @@ int h1_codec_tx_body(h1_codec_request_state_t *hrs, 
qd_message_stream_data_t *st
 //
 int h1_codec_tx_done(h1_codec_request_state_t *hrs, bool *need_close);
 
+// begin multipart content; this will generate a boundary marker and set the 
content type header
+//
+int h1_codec_tx_begin_multipart(h1_codec_request_state_t *hrs);
+
+// begin a new multipart section
+//
+int h1_codec_tx_begin_multipart_section(h1_codec_request_state_t *hrs);
+
+// mark the end of multipart data
+//
+int h1_codec_tx_end_multipart(h1_codec_request_state_t *hrs);
+
+uint64_t h1_codec_tx_multipart_section_boundary_length();
+uint64_t h1_codec_tx_multipart_end_boundary_length();
+
 
 #endif // http1_codec_H
diff --git a/python/qpid_dispatch/management/qdrouter.json 
b/python/qpid_dispatch/management/qdrouter.json
index 792515d..0efab69 100644
--- a/python/qpid_dispatch/management/qdrouter.json
+++ b/python/qpid_dispatch/management/qdrouter.json
@@ -1124,6 +1124,21 @@
                     "default": "HTTP1",
                     "required": false,
                     "create": true
+                },
+                "aggregation": {
+                    "type": [
+                        "multipart",
+                        "json"
+                    ],
+                    "required": false,
+                    "description": "Aggregation mode for responses when used 
in conjunction with multicast address.",
+                    "create": true
+                },
+                "eventChannel": {
+                    "type": "boolean",
+                    "required": false,
+                    "description": "Enables restricted event mode where no 
reponses are sent to request and only post is allowed",
+                    "create": true
                 }
             }
         },
@@ -1165,6 +1180,21 @@
                     "default": "HTTP1",
                     "required": false,
                     "create": true
+                },
+                "aggregation": {
+                    "type": [
+                        "multipart",
+                        "json"
+                    ],
+                    "required": false,
+                    "description": "Aggregation mode for responses when used 
in conjunction with multicast address.",
+                    "create": true
+                },
+                "eventChannel": {
+                    "type": "boolean",
+                    "required": false,
+                    "description": "Enables restricted event mode where no 
reponses are sent to request and only post is allowed",
+                    "create": true
                 }
             }
         },
diff --git a/python/qpid_dispatch_internal/router/message.py 
b/python/qpid_dispatch_internal/router/message.py
index ed73288..eb3ef7a 100644
--- a/python/qpid_dispatch_internal/router/message.py
+++ b/python/qpid_dispatch_internal/router/message.py
@@ -16,11 +16,11 @@
 # specific language governing permissions and limitations
 # under the License
 #
-
 from __future__ import unicode_literals
 from __future__ import division
 from __future__ import absolute_import
 from __future__ import print_function
+import json
 
 """Python class to hold message data"""
 
@@ -38,7 +38,7 @@ class Message(object):
     @ivar properties: Application properties.
     """
 
-    _fields = ['address', 'properties', 'body', 'reply_to', 'correlation_id']
+    _fields = ['address', 'properties', 'body', 'reply_to', 'correlation_id', 
'content_type']
 
     def __init__(self, **kwds):
         """All instance variables can be set as keywords. See L{Message}"""
@@ -50,3 +50,16 @@ class Message(object):
     def __repr__(self):
         return "%s(%s)" % (type(self).__name__,
                            ", ".join("%s=%r"%(f, getattr(self, f)) for f in 
self._fields))
+
+def simplify(msg):
+    m = {}
+    for k, v in msg.properties.items():
+        m[k] = v
+    if msg.body:
+        m["body"] = msg.body.decode()
+    if msg.content_type:
+        m["content_type"] = msg.content_type
+    return m
+
+def messages_to_json(msgs):
+    return json.dumps([simplify(m) for m in msgs], indent=4)
diff --git a/src/adaptors/http1/http1_client.c 
b/src/adaptors/http1/http1_client.c
index 33412da..befe499 100644
--- a/src/adaptors/http1/http1_client.c
+++ b/src/adaptors/http1/http1_client.c
@@ -19,6 +19,7 @@
 
 #include "http1_private.h"
 #include "adaptors/adaptor_utils.h"
+#include "python_private.h"
 
 #include <proton/listener.h>
 #include <proton/proactor.h>
@@ -32,6 +33,7 @@
 #define DEFAULT_CAPACITY 250
 #define LISTENER_BACKLOG  16
 
+const char *CONTENT_LENGTH_KEY = "Content-Length";
 
 //
 // State for a single response message to be sent to the client via the raw
@@ -80,6 +82,8 @@ typedef struct _client_request_t {
     bool close_on_complete;   // close the conn when this request is complete
     bool conn_close_hdr;      // add Connection: close to response msg
 
+    uint32_t version_major;
+    uint32_t version_minor;
 } _client_request_t;
 ALLOC_DECLARE(_client_request_t);
 ALLOC_DEFINE(_client_request_t);
@@ -154,6 +158,8 @@ static qdr_http1_connection_t 
*_create_client_connection(qd_http_listener_t *li)
     hconn->cfg.port = qd_strdup(li->config.port);
     hconn->cfg.address = qd_strdup(li->config.address);
     hconn->cfg.site = li->config.site ? qd_strdup(li->config.site) : 0;
+    hconn->cfg.event_channel = li->config.event_channel;
+    hconn->cfg.aggregation = li->config.aggregation;
 
     hconn->raw_conn = pn_raw_connection();
     pn_raw_connection_set_context(hconn->raw_conn, &hconn->handler_context);
@@ -478,7 +484,8 @@ static void _handle_connection_events(pn_event_t *e, 
qd_server_t *qd_server, voi
             _client_response_msg_t *rmsg = DEQ_HEAD(hreq->responses);
             while (rmsg &&
                    rmsg->dispo &&
-                   DEQ_IS_EMPTY(rmsg->out_data.fifo)) {
+                   DEQ_IS_EMPTY(rmsg->out_data.fifo) &&
+                   hconn->cfg.aggregation == QD_AGGREGATION_NONE) {
                 // response message fully received and forwarded to client
                 qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE,
                        "[C%"PRIu64"][L%"PRIu64"] HTTP client request 
msg-id=%"PRIu64" settling response, dispo=0x%"PRIx64,
@@ -558,10 +565,19 @@ static void 
_client_tx_buffers_cb(h1_codec_request_state_t *hrs, qd_buffer_list_
            "[C%"PRIu64"][L%"PRIu64"] %u request octets encoded",
            hconn->conn_id, hconn->out_link_id, len);
 
-    // responses are decoded one at a time - the current response it at the
-    // tail of the response list
 
-    _client_response_msg_t *rmsg = DEQ_TAIL(hreq->responses);
+    _client_response_msg_t *rmsg;
+    if (hconn->cfg.aggregation == QD_AGGREGATION_NONE) {
+        // responses are decoded one at a time - the current response it at the
+        // tail of the response list
+        rmsg = DEQ_TAIL(hreq->responses);
+    } else {
+        // when responses are aggregated the buffers don't need to be
+        // correlated to specific responses as they will all be
+        // written out together, so can just use the head of the
+        // response list
+        rmsg = DEQ_HEAD(hreq->responses);
+    }
     assert(rmsg);
     qdr_http1_enqueue_buffer_list(&rmsg->out_data, blist);
 
@@ -592,10 +608,19 @@ static void 
_client_tx_stream_data_cb(h1_codec_request_state_t *hrs, qd_message_
            "[C%"PRIu64"][L%"PRIu64"] Sending body data to client",
            hconn->conn_id, hconn->out_link_id);
 
-    // responses are decoded one at a time - the current response it at the
-    // tail of the response list
 
-    _client_response_msg_t *rmsg = DEQ_TAIL(hreq->responses);
+    _client_response_msg_t *rmsg;
+    if (hconn->cfg.aggregation == QD_AGGREGATION_NONE) {
+        // responses are decoded one at a time - the current response it at the
+        // tail of the response list
+        rmsg = DEQ_TAIL(hreq->responses);
+    } else {
+        // when responses are aggregated the buffers don't need to be
+        // correlated to specific responses as they will all be
+        // written out together, so can just use the head of the
+        // response list
+        rmsg = DEQ_HEAD(hreq->responses);
+    }
     assert(rmsg);
     qdr_http1_enqueue_stream_data(&rmsg->out_data, stream_data);
 
@@ -626,6 +651,8 @@ static int _client_rx_request_cb(h1_codec_request_state_t 
*hrs,
     creq->base.lib_rs = hrs;
     creq->base.hconn = hconn;
     creq->close_on_complete = (version_minor == 0);
+    creq->version_major = version_major;
+    creq->version_minor = version_minor;
     DEQ_INIT(creq->responses);
 
     qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE,
@@ -915,6 +942,226 @@ void qdr_http1_client_core_link_flow(qdr_http1_adaptor_t  
  *adaptor,
     }
 }
 
+static bool _get_multipart_content_length(_client_request_t *hreq, char *value)
+{
+    uint64_t total = 0;
+    for (_client_response_msg_t *rmsg = DEQ_HEAD(hreq->responses); rmsg; rmsg 
= rmsg->next) {
+        qd_message_t *msg = qdr_delivery_message(rmsg->dlv);
+        uint64_t content_length = 
h1_codec_tx_multipart_section_boundary_length();
+        bool got_body_length = false;
+
+        qd_iterator_t *app_props_iter = qd_message_field_iterator(msg, 
QD_FIELD_APPLICATION_PROPERTIES);
+        if (app_props_iter) {
+            qd_parsed_field_t *app_props = qd_parse(app_props_iter);
+            if (app_props && qd_parse_is_map(app_props)) {
+                // now send all headers in app properties
+                qd_parsed_field_t *key = qd_field_first_child(app_props);
+                while (key) {
+                    qd_parsed_field_t *value = qd_field_next_child(key);
+                    if (!value)
+                        break;
+
+                    qd_iterator_t *i_key = qd_parse_raw(key);
+                    if (!i_key)
+                        break;
+
+                    if (qd_iterator_equal(i_key, (const unsigned char*) 
CONTENT_LENGTH_KEY)) {
+                        qd_iterator_t *i_value = qd_parse_raw(value);
+                        if (i_value) {
+                            char *length_str = (char*) 
qd_iterator_copy(i_value);
+                            uint64_t body_length;
+                            sscanf(length_str, "%"SCNu64, &body_length);
+                            free(length_str);
+                            got_body_length = true;
+                            content_length += body_length;
+                        }
+                    } else if (!qd_iterator_prefix(i_key, 
HTTP1_HEADER_PREFIX)) {
+                        qd_iterator_t *i_value = qd_parse_raw(value);
+                        if (!i_value)
+                            break;
+
+                        content_length += qd_iterator_length(i_key) + 2 + 
qd_iterator_length(i_value) + 2;
+                    }
+
+                    key = qd_field_next_child(value);
+                }
+            }
+            qd_parse_free(app_props);
+        }
+        qd_iterator_free(app_props_iter);
+        if (got_body_length) {
+            total += content_length;
+        } else {
+            return false;
+        }
+    }
+    total += h1_codec_tx_multipart_end_boundary_length();
+    sprintf(value, "%"SCNu64, total);
+    return true;
+}
+
+static void _encode_json_response(_client_request_t *hreq)
+{
+    qdr_http1_connection_t *hconn = hreq->base.hconn;
+    qd_log(hconn->adaptor->log, QD_LOG_TRACE, "[C%"PRIu64"] encoding json 
response", hconn->conn_id);
+    bool ok = !h1_codec_tx_response(hreq->base.lib_rs, 200, NULL, 
hreq->version_major, hreq->version_minor);
+    if (!ok) {
+        qd_log(hconn->adaptor->log, QD_LOG_TRACE, "[C%"PRIu64"] Could not 
encode response", hconn->conn_id);
+        return;
+    }
+    PyObject* msgs = 0;
+    qd_json_msgs_init(&msgs);
+    for (_client_response_msg_t *rmsg = DEQ_HEAD(hreq->responses); rmsg; rmsg 
= rmsg->next) {
+        qd_message_t *msg = qdr_delivery_message(rmsg->dlv);
+        qd_json_msgs_append(msgs, msg);
+        rmsg->encoded = true;
+    }
+    char *body = qd_json_msgs_string(msgs);
+    if (body) {
+        h1_codec_tx_add_header(hreq->base.lib_rs, "Content-Type", 
"application/json");
+        int len = strlen(body);
+        char content_length[25];
+        sprintf(content_length, "%i", len);
+        h1_codec_tx_add_header(hreq->base.lib_rs, CONTENT_LENGTH_KEY, 
content_length);
+        h1_codec_tx_body_str(hreq->base.lib_rs, body);
+        free(body);
+    } else {
+        qd_log(hconn->adaptor->log, QD_LOG_ERROR, "[C%"PRIu64"] No aggregated 
json response returned", hconn->conn_id);
+    }
+    bool need_close;
+    h1_codec_tx_done(hreq->base.lib_rs, &need_close);
+    hreq->close_on_complete = need_close || hreq->close_on_complete;
+    hreq->codec_completed = true;
+}
+
+static void _encode_multipart_response(_client_request_t *hreq)
+{
+    qdr_http1_connection_t *hconn = hreq->base.hconn;
+    qd_log(hconn->adaptor->log, QD_LOG_TRACE, "[C%"PRIu64"] encoding multipart 
response", hconn->conn_id);
+    bool ok = !h1_codec_tx_response(hreq->base.lib_rs, 200, NULL, 
hreq->version_major, hreq->version_minor);
+    char content_length[25];
+    if (_get_multipart_content_length(hreq, content_length)) {
+        h1_codec_tx_add_header(hreq->base.lib_rs, CONTENT_LENGTH_KEY, 
content_length);
+    }
+    h1_codec_tx_begin_multipart(hreq->base.lib_rs);
+    for (_client_response_msg_t *rmsg = DEQ_HEAD(hreq->responses); rmsg; rmsg 
= rmsg->next) {
+        h1_codec_tx_begin_multipart_section(hreq->base.lib_rs);
+        qd_message_t *msg = qdr_delivery_message(rmsg->dlv);
+
+        qd_iterator_t *app_props_iter = qd_message_field_iterator(msg, 
QD_FIELD_APPLICATION_PROPERTIES);
+        if (app_props_iter) {
+            qd_parsed_field_t *app_props = qd_parse(app_props_iter);
+            if (app_props && qd_parse_is_map(app_props)) {
+                // now send all headers in app properties
+                qd_parsed_field_t *key = qd_field_first_child(app_props);
+                while (ok && key) {
+                    qd_parsed_field_t *value = qd_field_next_child(key);
+                    if (!value)
+                        break;
+
+                    qd_iterator_t *i_key = qd_parse_raw(key);
+                    if (!i_key)
+                        break;
+
+                    // ignore the special headers added by the mapping and 
content-length field (TODO: case insensitive comparison for content-length)
+                    if (!qd_iterator_prefix(i_key, HTTP1_HEADER_PREFIX) && 
!qd_iterator_equal(i_key, (const unsigned char*) CONTENT_LENGTH_KEY)) {
+                        qd_iterator_t *i_value = qd_parse_raw(value);
+                        if (!i_value)
+                            break;
+
+                        char *header_key = (char*) qd_iterator_copy(i_key);
+                        char *header_value = (char*) qd_iterator_copy(i_value);
+                        ok = !h1_codec_tx_add_header(hreq->base.lib_rs, 
header_key, header_value);
+
+                        free(header_key);
+                        free(header_value);
+                    }
+
+                    key = qd_field_next_child(value);
+                }
+            }
+            qd_parse_free(app_props);
+        }
+        qd_iterator_free(app_props_iter);
+        rmsg->headers_encoded = true;
+
+        qd_message_stream_data_t *body_data = 0;
+        bool done = false;
+        while (ok && !done) {
+            switch (qd_message_next_stream_data(msg, &body_data)) {
+
+            case QD_MESSAGE_STREAM_DATA_BODY_OK:
+
+                qd_log(hconn->adaptor->log, QD_LOG_TRACE,
+                       "[C%"PRIu64"][L%"PRIu64"] Encoding response body data",
+                       hconn->conn_id, hconn->out_link_id);
+
+                if (h1_codec_tx_body(hreq->base.lib_rs, body_data)) {
+                    qd_log(qdr_http1_adaptor->log, QD_LOG_WARNING,
+                           "[C%"PRIu64"][L%"PRIu64"] body data encode failed",
+                           hconn->conn_id, hconn->out_link_id);
+                    ok = false;
+                }
+                break;
+
+            case QD_MESSAGE_STREAM_DATA_NO_MORE:
+                // indicate this message is complete
+                qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG,
+                       "[C%"PRIu64"][L%"PRIu64"] response message encoding 
completed",
+                       hconn->conn_id, hconn->out_link_id);
+                done = true;
+                break;
+
+            case QD_MESSAGE_STREAM_DATA_INCOMPLETE:
+                qd_log(qdr_http1_adaptor->log, QD_LOG_WARNING,
+                       "[C%"PRIu64"][L%"PRIu64"] Ignoring incomplete body data 
in aggregated response.",
+                       hconn->conn_id, hconn->out_link_id);
+                done = true;
+                break;  // wait for more
+
+            case QD_MESSAGE_STREAM_DATA_INVALID:
+                qd_log(qdr_http1_adaptor->log, QD_LOG_WARNING,
+                       "[C%"PRIu64"][L%"PRIu64"] Ignoring corrupted body data 
in aggregated response.",
+                       hconn->conn_id, hconn->out_link_id);
+                done = true;
+                break;
+
+            case QD_MESSAGE_STREAM_DATA_FOOTER_OK:
+                qd_log(qdr_http1_adaptor->log, QD_LOG_WARNING,
+                       "[C%"PRIu64"][L%"PRIu64"] Ignoring footer in aggregated 
response.",
+                       hconn->conn_id, hconn->out_link_id);
+                done = true;
+                break;
+            }
+        }
+        rmsg->encoded = true;
+
+    }
+    h1_codec_tx_end_multipart(hreq->base.lib_rs);
+    bool need_close;
+    h1_codec_tx_done(hreq->base.lib_rs, &need_close);
+    hreq->close_on_complete = need_close || hreq->close_on_complete;
+    hreq->codec_completed = true;
+}
+
+static void _encode_aggregated_response(qdr_http1_connection_t *hconn, 
_client_request_t *hreq)
+{
+    if (hconn->cfg.aggregation == QD_AGGREGATION_MULTIPART) {
+        _encode_multipart_response(hreq);
+    } else if (hconn->cfg.aggregation == QD_AGGREGATION_JSON) {
+        _encode_json_response(hreq);
+    }
+}
+
+static void _encode_empty_response(qdr_http1_connection_t *hconn, 
_client_request_t *hreq)
+{
+    qd_log(hconn->adaptor->log, QD_LOG_TRACE, "[C%"PRIu64"] encoding empty 
response", hconn->conn_id);
+    h1_codec_tx_response(hreq->base.lib_rs, 204, NULL, hreq->version_major, 
hreq->version_minor);
+    bool need_close;
+    h1_codec_tx_done(hreq->base.lib_rs, &need_close);
+    hreq->close_on_complete = need_close || hreq->close_on_complete;
+    hreq->codec_completed = true;
+}
 
 // Handle disposition/settlement update for the outstanding request msg
 //
@@ -935,7 +1182,27 @@ void 
qdr_http1_client_core_delivery_update(qdr_http1_adaptor_t      *adaptor,
     if (disp && disp != PN_RECEIVED && hreq->request_dispo == 0) {
         // terminal disposition
         hreq->request_dispo = disp;
-        if (disp != PN_ACCEPTED) {
+        if (hconn->cfg.aggregation != QD_AGGREGATION_NONE) {
+            // when aggregating response from a multicast request, the
+            // acknowledgement of the request triggers generating the
+            // output from the responses received
+            if (settled) {
+                if (DEQ_IS_EMPTY(hreq->responses)) {
+                    qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG,
+                           "[C%"PRIu64"][L%"PRIu64"] Aggregation request 
settled but no responses received.", hconn->conn_id, hconn->in_link_id);
+                    _encode_empty_response(hconn, hreq);
+                } else {
+                    _encode_aggregated_response(hconn, hreq);
+                }
+                _write_pending_response(hreq);
+            }
+        } else if (disp != PN_ACCEPTED) {
+            // no response message is going to arrive.  Now what?  For now fake
+            // a response from the server by using the codec to write an error
+            // response on the behalf of the server.
+            qd_log(qdr_http1_adaptor->log, QD_LOG_WARNING,
+                   "[C%"PRIu64"][L%"PRIu64"] HTTP request failure, 
outcome=0x%"PRIx64,
+                   hconn->conn_id, hconn->in_link_id, disp);
 
             qd_log(qdr_http1_adaptor->log, QD_LOG_WARNING,
                    "[C%"PRIu64"][L%"PRIu64"] HTTP request msg-id=%"PRIu64" 
failure, outcome=0x%"PRIx64,
@@ -1238,6 +1505,28 @@ uint64_t 
qdr_http1_client_core_link_deliver(qdr_http1_adaptor_t    *adaptor,
     _client_response_msg_t *rmsg = DEQ_TAIL(hreq->responses);
     assert(rmsg && rmsg->dlv == delivery);
 
+    // when aggregating responses, they are saved on the list until
+    // the request has been settled, then encoded in the configured
+    // aggregation format
+    if (hconn->cfg.aggregation != QD_AGGREGATION_NONE) {
+        if (!qd_message_receive_complete(msg)) {
+            qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG, 
"[C%"PRIu64"][L%"PRIu64"] Response incomplete (%i responses received)", 
hconn->conn_id, link->identity, DEQ_SIZE(hreq->responses));
+            return 0;
+        }
+        qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] 
Received response (%i responses received), settling", hconn->conn_id, 
link->identity, DEQ_SIZE(hreq->responses));
+        rmsg->dispo = PN_ACCEPTED;
+        qd_message_set_send_complete(msg);
+        qdr_link_flow(qdr_http1_adaptor->core, link, 1, false);
+        qdr_delivery_remote_state_updated(qdr_http1_adaptor->core,
+                                          rmsg->dlv,
+                                          rmsg->dispo,
+                                          true,   // settled,
+                                          0,      // error
+                                          0,      // dispo data
+                                          false);
+        return PN_ACCEPTED;
+    }
+
     if (!rmsg->dispo) {
         rmsg->dispo = _encode_response_message(hreq, rmsg);
         if (rmsg->dispo) {
diff --git a/src/adaptors/http1/http1_codec.c b/src/adaptors/http1/http1_codec.c
index 36d7ac1..6c03e7f 100644
--- a/src/adaptors/http1/http1_codec.c
+++ b/src/adaptors/http1/http1_codec.c
@@ -23,6 +23,7 @@
 #include <qpid/dispatch/iterator.h>
 #include <qpid/dispatch/buffer.h>
 #include <qpid/dispatch/alloc_pool.h>
+#include <qpid/dispatch/discriminator.h>
 
 #include <ctype.h>
 #include <stdio.h>
@@ -43,7 +44,9 @@
 const uint8_t CR_TOKEN = '\r';
 const uint8_t LF_TOKEN = '\n';
 const char   *CRLF = "\r\n";
-
+const char   *DOUBLE_HYPHEN = "--";
+const char   *CONTENT_TYPE_KEY = "Content-Type";
+const char   *MULTIPART_CONTENT_TYPE_PREFIX = "multipart/mixed; boundary=";
 const qd_iterator_pointer_t NULL_I_PTR = {0};
 
 // true for informational response codes
@@ -181,6 +184,8 @@ struct h1_codec_connection_t {
         bool is_request;
         bool is_chunked;
 
+        char *boundary_marker;//used for multipart content
+
         // headers provided
         bool hdr_content_length;
     } encoder;
@@ -292,6 +297,10 @@ static void encoder_reset(struct encoder_t *encoder)
     encoder->is_request = false;
     encoder->is_chunked = false;
     encoder->hdr_content_length = false;
+    if (encoder->boundary_marker) {
+        free(encoder->boundary_marker);
+        encoder->boundary_marker = 0;
+    }
 }
 
 
@@ -1531,22 +1540,85 @@ int h1_codec_tx_add_header(h1_codec_request_state_t 
*hrs, const char *key, const
 }
 
 
+static inline void _flush_output(h1_codec_request_state_t *hrs, struct 
encoder_t *encoder)
+{
+    // flush all pending output.  From this point out the outgoing queue is
+    // no longer used for this message
+    hrs->conn->config.tx_buffers(hrs, &encoder->outgoing, 
qd_buffer_list_length(&encoder->outgoing));
+    DEQ_INIT(encoder->outgoing);
+    encoder->write_ptr = NULL_I_PTR;
+}
+
 static inline void _flush_headers(h1_codec_request_state_t *hrs, struct 
encoder_t *encoder)
 {
     if (!encoder->headers_sent) {
         // need to terminate any headers by sending the plain CRLF that follows
         // the headers
         write_string(encoder, CRLF);
-
-        // flush all pending output.  From this point out the outgoing queue is
-        // no longer used for this message
-        hrs->conn->config.tx_buffers(hrs, &encoder->outgoing, 
qd_buffer_list_length(&encoder->outgoing));
-        DEQ_INIT(encoder->outgoing);
-        encoder->write_ptr = NULL_I_PTR;
+        _flush_output(hrs, encoder);
         encoder->headers_sent = true;
     }
 }
 
+int h1_codec_tx_begin_multipart(h1_codec_request_state_t *hrs)
+{
+    h1_codec_connection_t *conn = h1_codec_request_state_get_connection(hrs);
+    struct encoder_t *encoder = &conn->encoder;
+    encoder->boundary_marker = (char*) malloc(QD_DISCRIMINATOR_SIZE + 2);
+    qd_generate_discriminator(encoder->boundary_marker);
+    char *content_type = (char*) malloc(strlen(MULTIPART_CONTENT_TYPE_PREFIX) 
+ strlen(encoder->boundary_marker) + 1);
+    strcpy(content_type, MULTIPART_CONTENT_TYPE_PREFIX);
+    strcpy(content_type + strlen(content_type), encoder->boundary_marker);
+    h1_codec_tx_add_header(hrs, CONTENT_TYPE_KEY, content_type);
+    free(content_type);
+
+    return 0;
+}
+
+int h1_codec_tx_begin_multipart_section(h1_codec_request_state_t *hrs)
+{
+    h1_codec_connection_t *conn = h1_codec_request_state_get_connection(hrs);
+    struct encoder_t *encoder = &conn->encoder;
+
+    //reset headers_sent flag for the new section
+    encoder->headers_sent = false;
+    write_string(encoder, CRLF);
+    write_string(encoder, DOUBLE_HYPHEN);
+    write_string(encoder, encoder->boundary_marker);
+    write_string(encoder, CRLF);
+
+    return 0;
+}
+
+int h1_codec_tx_end_multipart(h1_codec_request_state_t *hrs)
+{
+    h1_codec_connection_t *conn = h1_codec_request_state_get_connection(hrs);
+    struct encoder_t *encoder = &conn->encoder;
+
+    write_string(encoder, CRLF);
+    write_string(encoder, DOUBLE_HYPHEN);
+    write_string(encoder, encoder->boundary_marker);
+    write_string(encoder, DOUBLE_HYPHEN);
+    write_string(encoder, CRLF);
+    encoder->headers_sent = false;
+    _flush_headers(hrs, encoder);
+
+    free(encoder->boundary_marker);
+    encoder->boundary_marker = 0;
+
+    return 0;
+}
+
+
+uint64_t h1_codec_tx_multipart_section_boundary_length()
+{
+    return QD_DISCRIMINATOR_SIZE + 4 + 2;
+}
+
+uint64_t h1_codec_tx_multipart_end_boundary_length()
+{
+    return QD_DISCRIMINATOR_SIZE + 4 + 4;
+}
 
 // just forward the body chain along
 int h1_codec_tx_body(h1_codec_request_state_t *hrs, qd_message_stream_data_t 
*stream_data)
@@ -1564,6 +1636,20 @@ int h1_codec_tx_body(h1_codec_request_state_t *hrs, 
qd_message_stream_data_t *st
     return 0;
 }
 
+int h1_codec_tx_body_str(h1_codec_request_state_t *hrs, char *data)
+{
+    h1_codec_connection_t *conn = h1_codec_request_state_get_connection(hrs);
+    struct encoder_t *encoder = &conn->encoder;
+    if (!encoder->headers_sent) {
+        // need to terminate any headers by sending the plain CRLF that follows
+        // the headers
+        write_string(encoder, CRLF);
+        encoder->headers_sent = true;
+    }
+    write_string(encoder, data);
+    _flush_output(hrs, encoder);
+    return 0;
+}
 
 int h1_codec_tx_done(h1_codec_request_state_t *hrs, bool *need_close)
 {
diff --git a/src/adaptors/http1/http1_private.h 
b/src/adaptors/http1/http1_private.h
index e321a1c..59074d1 100644
--- a/src/adaptors/http1/http1_private.h
+++ b/src/adaptors/http1/http1_private.h
@@ -120,7 +120,6 @@ struct qdr_http1_request_base_t {
 };
 DEQ_DECLARE(qdr_http1_request_base_t, qdr_http1_request_list_t);
 
-
 // A single HTTP adaptor connection.
 //
 struct qdr_http1_connection_t {
@@ -141,6 +140,8 @@ struct qdr_http1_connection_t {
         char *address;
         char *site;
         char *host_port;
+        bool event_channel;
+        qd_http_aggregation_t aggregation;
     } cfg;
 
     // State if connected to an HTTP client
diff --git a/src/adaptors/http1/http1_server.c 
b/src/adaptors/http1/http1_server.c
index 92ea0d1..a0837bf 100644
--- a/src/adaptors/http1/http1_server.c
+++ b/src/adaptors/http1/http1_server.c
@@ -155,6 +155,8 @@ static qdr_http1_connection_t 
*_create_server_connection(qd_http_connector_t *ct
     hconn->cfg.host_port = qd_strdup(bconfig->host_port);
     hconn->server.connector = ctor;
     ctor->ctx = (void*)hconn;
+    hconn->cfg.event_channel = bconfig->event_channel;
+    hconn->cfg.aggregation = bconfig->aggregation;
 
     // for initiating a connection to the server
     hconn->server.reconnect_timer = qd_timer(qdr_http1_adaptor->core->qd, 
_do_reconnect, hconn);
@@ -427,6 +429,22 @@ static void _do_reconnect(void *context)
            "[C%"PRIu64"] Connecting to HTTP server...", conn_id);
 }
 
+static void _accept_and_settle_request(_server_request_t *hreq)
+{
+    qdr_delivery_remote_state_updated(qdr_http1_adaptor->core,
+                                      hreq->request_dlv,
+                                      hreq->request_dispo,
+                                      true,   // settled
+                                      0,      // error
+                                      0,      // dispo data
+                                      false);
+    // can now release the delivery
+    qdr_delivery_set_context(hreq->request_dlv, 0);
+    qdr_delivery_decref(qdr_http1_adaptor->core, hreq->request_dlv, "HTTP1 
adaptor request settled");
+    hreq->request_dlv = 0;
+
+    hreq->request_settled = true;
+}
 
 // Proton Raw Connection Events
 //
@@ -613,7 +631,8 @@ static bool _process_requests(qdr_http1_connection_t *hconn)
         if (hreq->request_dlv) {
             qd_message_set_discard(qdr_delivery_message(hreq->request_dlv), 
true);
 
-            if (!hreq->request_acked || !hreq->request_settled) {
+            if ((!hreq->request_acked || !hreq->request_settled) &&
+                hconn->cfg.aggregation == QD_AGGREGATION_NONE) {
 
                 if (hreq->request_dispo == 0)
                     hreq->request_dispo = (hreq->base.out_http1_octets > 0
@@ -676,8 +695,9 @@ static bool _process_requests(qdr_http1_connection_t *hconn)
 
         // hreq->out_data.fifo ==> request message written to raw conn
         // DEQ_IS_EMPTY(hreq->responses)
-        if (!hreq->request_acked || (!hreq->request_settled
-                                     && DEQ_IS_EMPTY(hreq->responses))) {
+        if ((!hreq->request_acked || (!hreq->request_settled
+                                      && DEQ_IS_EMPTY(hreq->responses)))
+            && hconn->cfg.aggregation == QD_AGGREGATION_NONE) {
 
             assert(hreq->request_dlv);
             assert(hreq->request_dispo == PN_ACCEPTED);
@@ -861,7 +881,7 @@ static int 
_server_rx_headers_done_cb(h1_codec_request_state_t *hrs, bool has_bo
     qdr_http1_connection_t *hconn = hreq->base.hconn;
 
     qd_log(qdr_http1_adaptor->log, QD_LOG_TRACE,
-           "[C%"PRIu64"][L%"PRIu64" HTTP response headers done.",
+           "[C%"PRIu64"][L%"PRIu64"] HTTP response headers done.",
            hconn->conn_id, hconn->in_link_id);
 
     // expect: running incoming request at tail
@@ -978,7 +998,7 @@ static void _server_rx_done_cb(h1_codec_request_state_t 
*hrs)
         }
     }
 
-    if (rmsg->dlv) {
+    if (rmsg->dlv && hconn->cfg.aggregation == QD_AGGREGATION_NONE) {
         // We've finished the delivery, and don't care about outcome/settlement
         _server_response_msg_free(hreq, rmsg);
     }
@@ -1064,7 +1084,10 @@ void qdr_http1_server_core_link_flow(qdr_http1_adaptor_t 
   *adaptor,
                     // stop here since response must be complete before we can 
deliver the next one.
                     break;
                 }
-
+                if (hconn->cfg.aggregation != QD_AGGREGATION_NONE) {
+                    // stop here since response should not be freed until it 
is accepted
+                    break;
+                }
                 // else the delivery is complete no need to save it
                 _server_response_msg_free(hreq, rmsg);
                 rmsg = DEQ_HEAD(hreq->responses);
@@ -1090,9 +1113,17 @@ void 
qdr_http1_server_core_delivery_update(qdr_http1_adaptor_t      *adaptor,
     // Not much can be done with error dispositions (I think)
     if (disp != PN_ACCEPTED) {
         qd_log(adaptor->log, QD_LOG_WARNING,
-               "[C%"PRIu64"][L%"PRIu64"] response message not received, 
outcome=0x%"PRIx64,
+               "[C%"PRIu64"][L%"PRIu64"] response message was not accepted, 
outcome=0x%"PRIx64,
                hconn->conn_id, hconn->in_link_id, disp);
     }
+    if (hconn->cfg.aggregation != QD_AGGREGATION_NONE) {
+        _server_request_t *hreq = (_server_request_t*)hbase;
+        _accept_and_settle_request(hreq);
+        hreq->request_acked = true;
+        qd_log(adaptor->log, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] request 
accepted", hconn->conn_id, hconn->in_link_id);
+        _server_response_msg_t *rmsg  = DEQ_TAIL(hreq->responses);
+        _server_response_msg_free(hreq, rmsg);
+    }
 }
 
 
diff --git a/src/adaptors/http_common.c b/src/adaptors/http_common.c
index 9034e37..fc15c33 100644
--- a/src/adaptors/http_common.c
+++ b/src/adaptors/http_common.c
@@ -32,6 +32,7 @@ ALLOC_DEFINE(qd_http_connector_t);
 static qd_error_t load_bridge_config(qd_dispatch_t *qd, 
qd_http_bridge_config_t *config, qd_entity_t* entity)
 {
     char *version_str = 0;
+    char *aggregation_str = 0;
 
     qd_error_clear();
     ZERO(config);
@@ -43,6 +44,8 @@ static qd_error_t load_bridge_config(qd_dispatch_t *qd, 
qd_http_bridge_config_t
     config->address = qd_entity_get_string(entity, "address");         CHECK();
     config->site    = qd_entity_opt_string(entity, "siteId", 0);       CHECK();
     version_str     = qd_entity_get_string(entity, "protocolVersion");  
CHECK();
+    aggregation_str = qd_entity_opt_string(entity, "aggregation", 0);  CHECK();
+    config->event_channel = qd_entity_opt_bool(entity, "eventChannel", false); 
CHECK();
 
     if (strcmp(version_str, "HTTP2") == 0) {
         config->version = VERSION_HTTP2;
@@ -52,6 +55,16 @@ static qd_error_t load_bridge_config(qd_dispatch_t *qd, 
qd_http_bridge_config_t
     free(version_str);
     version_str = 0;
 
+    if (aggregation_str && strcmp(aggregation_str, "json") == 0) {
+        config->aggregation = QD_AGGREGATION_JSON;
+    } else if (aggregation_str && strcmp(aggregation_str, "multipart") == 0) {
+        config->aggregation = QD_AGGREGATION_MULTIPART;
+    } else {
+        config->aggregation = QD_AGGREGATION_NONE;
+    }
+    free(aggregation_str);
+    aggregation_str = 0;
+
     int hplen = strlen(config->host) + strlen(config->port) + 2;
     config->host_port = malloc(hplen);
     snprintf(config->host_port, hplen, "%s:%s", config->host, config->port);
diff --git a/src/adaptors/http_common.h b/src/adaptors/http_common.h
index 1fa5fd5..4d2ab96 100644
--- a/src/adaptors/http_common.h
+++ b/src/adaptors/http_common.h
@@ -34,6 +34,12 @@ typedef enum {
     VERSION_HTTP2,
 } qd_http_version_t;
 
+typedef enum {
+    QD_AGGREGATION_NONE,
+    QD_AGGREGATION_JSON,
+    QD_AGGREGATION_MULTIPART
+} qd_http_aggregation_t;
+
 typedef struct qd_http_bridge_config_t {
     char              *name;
     char              *host;
@@ -42,6 +48,8 @@ typedef struct qd_http_bridge_config_t {
     char              *site;
     char              *host_port;
     qd_http_version_t  version;
+    bool                  event_channel;
+    qd_http_aggregation_t aggregation;
 } qd_http_bridge_config_t;
 
 void qd_http_free_bridge_config(qd_http_bridge_config_t *config);
diff --git a/src/python_embedded.c b/src/python_embedded.c
index 9465929..2207563 100644
--- a/src/python_embedded.c
+++ b/src/python_embedded.c
@@ -907,3 +907,72 @@ void qd_python_unlock(qd_python_lock_state_t lock_state)
     lock_held = false;
     sys_mutex_unlock(ilock);
 }
+
+void qd_json_msgs_init(PyObject **msgs)
+{
+    qd_python_lock_state_t lock_state = qd_python_lock();
+    *msgs = PyList_New(0);
+    qd_python_unlock(lock_state);
+}
+
+void qd_json_msgs_done(PyObject *msgs)
+{
+    qd_python_lock_state_t lock_state = qd_python_lock();
+    Py_DECREF(msgs);
+    qd_python_unlock(lock_state);
+}
+
+void qd_json_msgs_append(PyObject *msgs, qd_message_t *msg)
+{
+    //
+    // Parse the message through the body and exit if the message is not well 
formed.
+    //
+    if (qd_message_check_depth(msg, QD_DEPTH_BODY) != QD_MESSAGE_DEPTH_OK)
+        return;
+
+    // This is called from non-python threads so we need to acquire the GIL to 
use python APIS.
+    qd_python_lock_state_t lock_state = qd_python_lock();
+    PyObject *py_msg = PyObject_CallFunction(message_type, NULL);
+    if (!py_msg) {
+        qd_error_py();
+        qd_python_unlock(lock_state);
+        return;
+    }
+    iter_to_py_attr(qd_message_field_iterator(msg, QD_FIELD_CONTENT_TYPE), 
py_iter_copy, py_msg, "content_type");
+    iter_to_py_attr(qd_message_field_iterator(msg, 
QD_FIELD_APPLICATION_PROPERTIES), py_iter_parse, py_msg, "properties");
+    iter_to_py_attr(qd_message_field_iterator(msg, QD_FIELD_BODY), 
py_iter_parse, py_msg, "body");
+
+    PyList_Append(msgs, py_msg);
+
+    Py_DECREF(py_msg);
+    qd_error_py();
+    qd_python_unlock(lock_state);
+}
+
+char *qd_json_msgs_string(PyObject *msgs)
+{
+    qd_python_lock_state_t lock_state = qd_python_lock();
+
+    PyObject *message_module = 
PyImport_ImportModule("qpid_dispatch_internal.router.message");
+    if (!message_module) {
+        qd_python_unlock(lock_state);
+        return NULL;
+    }
+    PyObject *messages_to_json = PyObject_GetAttrString(message_module, 
"messages_to_json");
+    Py_DECREF(message_module);
+    if (!messages_to_json) {
+        qd_python_unlock(lock_state);
+        return NULL;
+    }
+
+    PyObject *py_value = PyObject_CallFunction(messages_to_json, "O", msgs);
+    Py_DECREF(messages_to_json);
+    if (!py_value) {
+        qd_python_unlock(lock_state);
+        return NULL;
+    }
+    char *c_value = py_string_2_c(py_value);
+    Py_XDECREF(py_value);
+    qd_python_unlock(lock_state);
+    return c_value;
+}
diff --git a/src/python_private.h b/src/python_private.h
index b344a0f..6cf7d28 100644
--- a/src/python_private.h
+++ b/src/python_private.h
@@ -20,6 +20,7 @@
  */
 #include <Python.h>
 #include <stdint.h>
+#include <qpid/dispatch/message.h>
 
 #if PY_MAJOR_VERSION <= 2
 // deal with the two integer types in Python2
@@ -43,4 +44,9 @@ char *py_string_2_c(PyObject *py_str);
 // buffer.
 char *py_obj_2_c_string(PyObject *py_obj);
 
+void qd_json_msgs_init(PyObject **msgs);
+void qd_json_msgs_done(PyObject *msgs);
+void qd_json_msgs_append(PyObject *msgs, qd_message_t *msg);
+char *qd_json_msgs_string(PyObject *msgs);
+
 #endif


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to