kgiusti commented on a change in pull request #961: URL: https://github.com/apache/qpid-dispatch/pull/961#discussion_r544606515
########## File path: src/adaptors/http1/http1_codec.c ########## @@ -181,6 +184,8 @@ struct h1_codec_connection_t { bool is_request; bool is_chunked; + char *boundary_marker;//used for multipart content Review comment: Should the boundary_marker be freed+reset in the ecoder_reset() function? ########## File path: src/python_embedded.c ########## @@ -907,3 +907,72 @@ void qd_python_unlock(qd_python_lock_state_t lock_state) lock_held = false; sys_mutex_unlock(ilock); } + +void qd_json_msgs_init(PyObject **msgs) +{ + qd_python_lock_state_t lock_state = qd_python_lock(); + *msgs = PyList_New(0); + qd_python_unlock(lock_state); +} + +void qd_json_msgs_done(PyObject *msgs) +{ + qd_python_lock_state_t lock_state = qd_python_lock(); + Py_DECREF(msgs); + qd_python_unlock(lock_state); +} + +void qd_json_msgs_append(PyObject *msgs, qd_message_t *msg) +{ + // + // Parse the message through the body and exit if the message is not well formed. + // + if (qd_message_check_depth(msg, QD_DEPTH_BODY) != QD_MESSAGE_DEPTH_OK) + return; + + // This is called from non-python threads so we need to acquire the GIL to use python APIS. + qd_python_lock_state_t lock_state = qd_python_lock(); + PyObject *py_msg = PyObject_CallFunction(message_type, NULL); + if (!py_msg) { + qd_error_py(); + qd_python_unlock(lock_state); + return; + } + iter_to_py_attr(qd_message_field_iterator(msg, QD_FIELD_CONTENT_TYPE), py_iter_copy, py_msg, "content_type"); + iter_to_py_attr(qd_message_field_iterator(msg, QD_FIELD_APPLICATION_PROPERTIES), py_iter_parse, py_msg, "properties"); + iter_to_py_attr(qd_message_field_iterator(msg, QD_FIELD_BODY), py_iter_parse, py_msg, "body"); + + PyList_Append(msgs, py_msg); + + Py_DECREF(py_msg); + qd_error_py(); + qd_python_unlock(lock_state); +} + +char *qd_json_msgs_string(PyObject *msgs) +{ + qd_python_lock_state_t lock_state = qd_python_lock(); + + PyObject *message_module = PyImport_ImportModule("qpid_dispatch_internal.router.message"); + if (!message_module) { + Py_DECREF(message_module); + qd_python_unlock(lock_state); + return NULL; + } + PyObject *messages_to_json = PyObject_GetAttrString(message_module, "messages_to_json"); Review comment: Py_DECREF messages_to_json needed IIUC ########## File path: src/adaptors/http1/http1_server.c ########## @@ -70,6 +70,7 @@ typedef struct _server_request_t { bool request_settled; // set by adaptor bool request_acked; // true if dispo sent to core bool headers_encoded; // True when header encode done + bool response_settled; Review comment: Sorry - late night but for the life of me I don't see where this flag is set! I'd assume it would be set in qdr_http1_server_core_delivery_update when the outcome for the response arrives... ########## File path: src/adaptors/http1/http1_client.c ########## @@ -1238,6 +1505,24 @@ uint64_t qdr_http1_client_core_link_deliver(qdr_http1_adaptor_t *adaptor, _client_response_msg_t *rmsg = DEQ_TAIL(hreq->responses); assert(rmsg && rmsg->dlv == delivery); + // when aggregating responses, they are saved on the list until + // the request has been settled, then encoded in the configured + // aggregation format + if (hconn->cfg.aggregation != QD_AGGREGATION_NONE) { + qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] Received response (%i responses received), settling", hconn->conn_id, link->identity, DEQ_SIZE(hreq->responses)); + rmsg->dispo = PN_ACCEPTED; + qd_message_set_send_complete(msg); Review comment: Should this be predicated on qd_message_receive_complete(msg)? I assume responses may be fragmented and we want to avoid updating the outcome until it is done arriving. ########## File path: src/adaptors/http1/http1_client.c ########## @@ -1238,6 +1505,24 @@ uint64_t qdr_http1_client_core_link_deliver(qdr_http1_adaptor_t *adaptor, _client_response_msg_t *rmsg = DEQ_TAIL(hreq->responses); assert(rmsg && rmsg->dlv == delivery); + // when aggregating responses, they are saved on the list until + // the request has been settled, then encoded in the configured + // aggregation format + if (hconn->cfg.aggregation != QD_AGGREGATION_NONE) { + qd_log(qdr_http1_adaptor->log, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] Received response (%i responses received), settling", hconn->conn_id, link->identity, DEQ_SIZE(hreq->responses)); + rmsg->dispo = PN_ACCEPTED; + qd_message_set_send_complete(msg); + qdr_link_flow(qdr_http1_adaptor->core, link, 1, false); + qdr_delivery_remote_state_updated(qdr_http1_adaptor->core, + rmsg->dlv, + rmsg->dispo, + true, // settled, + 0, // error + 0, // dispo data + false); + return PN_ACCEPTED; Review comment: Since we are holding the response message buffers, I believe it may be necessary to disable Q2 on the arriving response messages by calling qd_message_Q2_holdoff_disable(). and possibly "restart" the receiving thread via qd_link_restart_rx(qd_message_get_receiving_link(msg)). See https://github.com/apache/qpid-dispatch/blob/master/src/message.c#L2578 - something like that. This could be tested by having the multicast responses contain > 512K bodies. Oh what a tangled web we weave... ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@qpid.apache.org For additional commands, e-mail: dev-h...@qpid.apache.org