This is an automated email from the ASF dual-hosted git repository. gmurthy pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
The following commit(s) were added to refs/heads/main by this push: new d2987b0 DISPATCH-1977: Introduced atomics raw_closed_read and raw_closed_write in http2 adaptor. Also made raw_closed_write and raw_closed_read as atomics in tcp_adaptor. This closes #1253. d2987b0 is described below commit d2987b015afa580d04c16939ed7f063c9c03c015 Author: Ganesh Murthy <gmur...@apache.org> AuthorDate: Wed Jun 9 15:39:52 2021 -0400 DISPATCH-1977: Introduced atomics raw_closed_read and raw_closed_write in http2 adaptor. Also made raw_closed_write and raw_closed_read as atomics in tcp_adaptor. This closes #1253. --- include/qpid/dispatch/atomic.h | 5 ++++ src/adaptors/http2/http2_adaptor.c | 54 ++++++++++++++++++++++++-------------- src/adaptors/http2/http2_adaptor.h | 2 ++ src/adaptors/tcp_adaptor.c | 32 +++++++++++----------- src/message.c | 22 +++++++--------- 5 files changed, 67 insertions(+), 48 deletions(-) diff --git a/include/qpid/dispatch/atomic.h b/include/qpid/dispatch/atomic.h index 86754d5..9eb09d6 100644 --- a/include/qpid/dispatch/atomic.h +++ b/include/qpid/dispatch/atomic.h @@ -205,6 +205,11 @@ static inline void sys_atomic_destroy(sys_atomic_t *ref) #endif +#define SET_ATOMIC_FLAG(flag) sys_atomic_set(flag, 1) +#define CLEAR_ATOMIC_FLAG(flag) sys_atomic_set(flag, 0) + +#define IS_ATOMIC_FLAG_SET(flag) (sys_atomic_get(flag) == 1) + /** Atomic increase: NOTE returns value *before* increase, like i++ */ static inline uint32_t sys_atomic_inc(sys_atomic_t *ref) { return sys_atomic_add((ref), 1); } diff --git a/src/adaptors/http2/http2_adaptor.c b/src/adaptors/http2/http2_adaptor.c index f802d62..775998b 100644 --- a/src/adaptors/http2/http2_adaptor.c +++ b/src/adaptors/http2/http2_adaptor.c @@ -422,6 +422,9 @@ void free_qdr_http2_connection(qdr_http2_connection_t* http_conn, bool on_shutdo qd_log(http2_adaptor->log_source, QD_LOG_TRACE, "[C%"PRIu64"] Freeing http2 connection in free_qdr_http2_connection", http_conn->conn_id); + sys_atomic_destroy(&http_conn->raw_closed_read); + sys_atomic_destroy(&http_conn->raw_closed_write); + free_qdr_http2_connection_t(http_conn); } @@ -1355,6 +1358,8 @@ qdr_http2_connection_t *qdr_http_connection_ingress(qd_http_listener_t* listener ingress_http_conn->config = &(listener->config); ingress_http_conn->server = listener->server; ingress_http_conn->pn_raw_conn = pn_raw_connection(); + sys_atomic_init(&ingress_http_conn->raw_closed_read, 0); + sys_atomic_init(&ingress_http_conn->raw_closed_write, 0); ingress_http_conn->session_data = new_qdr_http2_session_data_t(); ZERO(ingress_http_conn->session_data); @@ -1376,26 +1381,27 @@ qdr_http2_connection_t *qdr_http_connection_ingress(qd_http_listener_t* listener static void grant_read_buffers(qdr_http2_connection_t *conn) { + if (IS_ATOMIC_FLAG_SET(&conn->raw_closed_read)) + return; + pn_raw_buffer_t raw_buffers[READ_BUFFERS]; if (conn->pn_raw_conn) { - if (!pn_raw_connection_is_read_closed(conn->pn_raw_conn)) { - size_t desired = pn_raw_connection_read_buffers_capacity(conn->pn_raw_conn); - while (desired) { - size_t i; - for (i = 0; i < desired && i < READ_BUFFERS; ++i) { - qd_http2_buffer_t *buf = qd_http2_buffer(); - DEQ_INSERT_TAIL(conn->granted_read_buffs, buf); - raw_buffers[i].bytes = (char*) qd_http2_buffer_base(buf); - raw_buffers[i].capacity = qd_http2_buffer_capacity(buf); - raw_buffers[i].size = 0; - raw_buffers[i].offset = 0; - raw_buffers[i].context = (uintptr_t) buf; - } - desired -= i; - qd_log(http2_adaptor->log_source, QD_LOG_TRACE, "[C%"PRIu64"] Calling pn_raw_connection_give_read_buffers in grant_read_buffers", conn->conn_id); - pn_raw_connection_give_read_buffers(conn->pn_raw_conn, raw_buffers, i); - } - } + size_t desired = pn_raw_connection_read_buffers_capacity(conn->pn_raw_conn); + while (desired) { + size_t i; + for (i = 0; i < desired && i < READ_BUFFERS; ++i) { + qd_http2_buffer_t *buf = qd_http2_buffer(); + DEQ_INSERT_TAIL(conn->granted_read_buffs, buf); + raw_buffers[i].bytes = (char*) qd_http2_buffer_base(buf); + raw_buffers[i].capacity = qd_http2_buffer_capacity(buf); + raw_buffers[i].size = 0; + raw_buffers[i].offset = 0; + raw_buffers[i].context = (uintptr_t) buf; + } + desired -= i; + qd_log(http2_adaptor->log_source, QD_LOG_TRACE, "[C%"PRIu64"] Calling pn_raw_connection_give_read_buffers in grant_read_buffers", conn->conn_id); + pn_raw_connection_give_read_buffers(conn->pn_raw_conn, raw_buffers, i); + } } } @@ -1629,7 +1635,7 @@ static void qdr_http_activate(void *notused, qdr_connection_t *c) sys_mutex_lock(qd_server_get_activation_lock(http2_adaptor->core->qd->server)); qdr_http2_connection_t* conn = (qdr_http2_connection_t*) qdr_connection_get_context(c); if (conn) { - if (conn->pn_raw_conn) { + if (conn->pn_raw_conn && !(IS_ATOMIC_FLAG_SET(&conn->raw_closed_read) && IS_ATOMIC_FLAG_SET(&conn->raw_closed_write))) { qd_log(http2_adaptor->log_source, QD_LOG_TRACE, "[C%"PRIu64"] Activation triggered, calling pn_raw_connection_wake()", conn->conn_id); pn_raw_connection_wake(conn->pn_raw_conn); } @@ -1666,6 +1672,9 @@ uint64_t handle_outgoing_http(qdr_http2_stream_data_t *stream_data) qdr_http2_session_data_t *session_data = stream_data->session_data; qdr_http2_connection_t *conn = session_data->conn; + if (IS_ATOMIC_FLAG_SET(&conn->raw_closed_write)) + return 0; + qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"] Starting to handle_outgoing_http", conn->conn_id); if (stream_data->out_dlv) { qd_message_t *message = qdr_delivery_message(stream_data->out_dlv); @@ -2370,7 +2379,8 @@ qdr_http2_connection_t *qdr_http_connection_egress(qd_http_connector_t *connecto DEQ_INIT(egress_http_conn->session_data->streams); DEQ_INIT(egress_http_conn->granted_read_buffs); egress_http_conn->session_data->conn = egress_http_conn; - + sys_atomic_init(&egress_http_conn->raw_closed_read, 0); + sys_atomic_init(&egress_http_conn->raw_closed_write, 0); sys_mutex_lock(http2_adaptor->lock); DEQ_INSERT_TAIL(http2_adaptor->connections, egress_http_conn); sys_mutex_unlock(http2_adaptor->lock); @@ -2428,6 +2438,8 @@ static void handle_connection_event(pn_event_t *e, qd_server_t *qd_server, void send_settings_frame(conn); qd_log(log, QD_LOG_INFO, "[C%"PRIu64"] Accepted Ingress ((PN_RAW_CONNECTION_CONNECTED)) from %s", conn->conn_id, conn->remote_address); } else { + CLEAR_ATOMIC_FLAG(&conn->raw_closed_read); + CLEAR_ATOMIC_FLAG(&conn->raw_closed_write); if (!conn->session_data->session) { nghttp2_session_client_new(&conn->session_data->session, (nghttp2_session_callbacks *)http2_adaptor->callbacks, (void *)conn); send_settings_frame(conn); @@ -2442,6 +2454,7 @@ static void handle_connection_event(pn_event_t *e, qd_server_t *qd_server, void break; } case PN_RAW_CONNECTION_CLOSED_READ: { + SET_ATOMIC_FLAG(&conn->raw_closed_read); if (conn->pn_raw_conn) pn_raw_connection_close(conn->pn_raw_conn); qd_log(log, QD_LOG_TRACE, "[C%"PRIu64"] PN_RAW_CONNECTION_CLOSED_READ", conn->conn_id); @@ -2449,6 +2462,7 @@ static void handle_connection_event(pn_event_t *e, qd_server_t *qd_server, void } case PN_RAW_CONNECTION_CLOSED_WRITE: { qd_log(log, QD_LOG_TRACE, "[C%"PRIu64"] PN_RAW_CONNECTION_CLOSED_WRITE", conn->conn_id); + SET_ATOMIC_FLAG(&conn->raw_closed_write); break; } case PN_RAW_CONNECTION_DISCONNECTED: { diff --git a/src/adaptors/http2/http2_adaptor.h b/src/adaptors/http2/http2_adaptor.h index 26fb161..bbef44b 100644 --- a/src/adaptors/http2/http2_adaptor.h +++ b/src/adaptors/http2/http2_adaptor.h @@ -150,6 +150,8 @@ struct qdr_http2_connection_t { bool first_pinged; bool delete_egress_connections; // If set to true, the egress qdr_connection_t and qdr_http2_connection_t objects will be deleted bool goaway_received; + sys_atomic_t raw_closed_read; + sys_atomic_t raw_closed_write; DEQ_LINKS(qdr_http2_connection_t); }; diff --git a/src/adaptors/tcp_adaptor.c b/src/adaptors/tcp_adaptor.c index a69de9d..b4cf3fc 100644 --- a/src/adaptors/tcp_adaptor.c +++ b/src/adaptors/tcp_adaptor.c @@ -75,8 +75,8 @@ struct qdr_tcp_connection_t { bool egress_dispatcher; bool connector_closed;//only used if egress_dispatcher=true bool in_list; // This connection is in the adaptor's connections list - bool raw_closed_read; // proton event seen - bool raw_closed_write; // proton event seen or write_close called + sys_atomic_t raw_closed_read; // proton event seen + sys_atomic_t raw_closed_write; // proton event seen or write_close called bool raw_read_shutdown; // stream closed bool read_eos_seen; qdr_delivery_t *initial_delivery; @@ -188,7 +188,7 @@ static void on_activate(void *context) static void grant_read_buffers(qdr_tcp_connection_t *conn) { - if (conn->raw_closed_read || conn->read_pending) + if (IS_ATOMIC_FLAG_SET(&conn->raw_closed_read) || conn->read_pending) return; conn->read_pending = true; @@ -371,10 +371,7 @@ static int handle_incoming(qdr_tcp_connection_t *conn, const char *msg) int count = handle_incoming_raw_read(conn, &buffers); // Grant more buffers to proton for reading if read side is still open - if (!conn->raw_closed_read) { - // normal path - keep on processing - grant_read_buffers(conn); - } + grant_read_buffers(conn); // Push the bytes just read into the streaming message if (count > 0) { @@ -395,7 +392,7 @@ static int handle_incoming(qdr_tcp_connection_t *conn, const char *msg) } // Close the stream message if read side has closed - if (conn->raw_closed_read) { + if (IS_ATOMIC_FLAG_SET(&conn->raw_closed_read)) { qd_log(log, QD_LOG_DEBUG, DLV_FMT" close %s instream delivery", DLV_ARGS(conn_instream), qdr_tcp_connection_role_name(conn)); @@ -427,6 +424,8 @@ static void free_qdr_tcp_connection(qdr_tcp_connection_t* tc) free(tc->remote_address); free(tc->global_id); sys_atomic_destroy(&tc->q2_restart); + sys_atomic_destroy(&tc->raw_closed_read); + sys_atomic_destroy(&tc->raw_closed_write); if (tc->activate_timer) { qd_timer_free(tc->activate_timer); } @@ -596,7 +595,7 @@ static bool copy_outgoing_buffs(qdr_tcp_connection_t *conn) static void handle_outgoing(qdr_tcp_connection_t *conn) { if (conn->outstream) { - if (conn->raw_closed_write) { + if (IS_ATOMIC_FLAG_SET(&conn->raw_closed_write)) { // flush outgoing buffers and free attached stream_data objects flush_outgoing_buffs(conn); // give no more buffers to raw connection @@ -640,7 +639,8 @@ static void handle_outgoing(qdr_tcp_connection_t *conn) "[C%"PRIu64"] handle_outgoing calling pn_raw_connection_write_close(). rcv_complete:%s, send_complete:%s", conn->conn_id, qd_message_receive_complete(msg) ? "T" : "F", qd_message_send_complete(msg) ? "T" : "F"); sys_mutex_lock(conn->activation_lock); - conn->raw_closed_write = true; + SET_ATOMIC_FLAG(&conn->raw_closed_write); + sys_mutex_unlock(conn->activation_lock); pn_raw_connection_write_close(conn->pn_raw_conn); } @@ -798,9 +798,9 @@ static void handle_connection_event(pn_event_t *e, qd_server_t *qd_server, void case PN_RAW_CONNECTION_CLOSED_READ: { qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] PN_RAW_CONNECTION_CLOSED_READ %s", conn->conn_id, conn->incoming_id, qdr_tcp_connection_role_name(conn)); + SET_ATOMIC_FLAG(&conn->raw_closed_read); sys_mutex_lock(conn->activation_lock); conn->q2_blocked = false; - conn->raw_closed_read = true; sys_mutex_unlock(conn->activation_lock); handle_incoming(conn, "PNRC_CLOSED_READ"); break; @@ -809,9 +809,7 @@ static void handle_connection_event(pn_event_t *e, qd_server_t *qd_server, void qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"] PN_RAW_CONNECTION_CLOSED_WRITE %s", conn->conn_id, qdr_tcp_connection_role_name(conn)); - sys_mutex_lock(conn->activation_lock); - conn->raw_closed_write = true; - sys_mutex_unlock(conn->activation_lock); + SET_ATOMIC_FLAG(&conn->raw_closed_write); break; } case PN_RAW_CONNECTION_DISCONNECTED: { @@ -931,6 +929,8 @@ static qdr_tcp_connection_t *qdr_tcp_connection_ingress(qd_tcp_listener_t* liste tc->config = listener->config; tc->server = listener->server; sys_atomic_init(&tc->q2_restart, 0); + sys_atomic_init(&tc->raw_closed_read, 0); + sys_atomic_init(&tc->raw_closed_write, 0); tc->pn_raw_conn = pn_raw_connection(); pn_raw_connection_set_context(tc->pn_raw_conn, tc); //the following call will cause a PN_RAW_CONNECTION_CONNECTED @@ -1040,6 +1040,8 @@ static qdr_tcp_connection_t *qdr_tcp_connection_egress(qd_bridge_config_t *confi tc->config = *config; tc->server = server; sys_atomic_init(&tc->q2_restart, 0); + sys_atomic_init(&tc->raw_closed_read, 0); + sys_atomic_init(&tc->raw_closed_write, 0); tc->conn_id = qd_server_allocate_connection_id(tc->server); // @@ -1602,7 +1604,7 @@ static void qdr_tcp_activate(void *notused, qdr_connection_t *c) if (context) { qdr_tcp_connection_t* conn = (qdr_tcp_connection_t*) context; sys_mutex_lock(conn->activation_lock); - if (conn->pn_raw_conn && !(conn->raw_closed_read && conn->raw_closed_write)) { + if (conn->pn_raw_conn && !(IS_ATOMIC_FLAG_SET(&conn->raw_closed_read) && IS_ATOMIC_FLAG_SET(&conn->raw_closed_write))) { qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"] qdr_tcp_activate: call pn_raw_connection_wake()", conn->conn_id); pn_raw_connection_wake(conn->pn_raw_conn); diff --git a/src/message.c b/src/message.c index edf1355..d90cb3c 100644 --- a/src/message.c +++ b/src/message.c @@ -45,10 +45,6 @@ #define LOCK sys_mutex_lock #define UNLOCK sys_mutex_unlock -// Implement bool flags with atomic variables -#define SET_FLAG(flag) sys_atomic_set(flag, 1) -#define IS_FLAG_SET(flag) (sys_atomic_get(flag) == 1) - const char *STR_AMQP_NULL = "null"; const char *STR_AMQP_TRUE = "T"; const char *STR_AMQP_FALSE = "F"; @@ -1406,14 +1402,14 @@ qd_message_t *discard_receive(pn_delivery_t *delivery, } else if (rc == PN_EOS || rc < 0) { // End of message or error: finalize message_receive handling if (pn_delivery_aborted(delivery)) { - SET_FLAG(&msg->content->aborted); + SET_ATOMIC_FLAG(&msg->content->aborted); } pn_record_t *record = pn_delivery_attachments(delivery); pn_record_set(record, PN_DELIVERY_CTX, 0); if (msg->content->oversize) { // Aborting the content disposes of downstream copies. // This has no effect on the received message. - SET_FLAG(&msg->content->aborted); + SET_ATOMIC_FLAG(&msg->content->aborted); } qd_message_set_receive_complete((qd_message_t*) msg); break; @@ -1536,7 +1532,7 @@ qd_message_t *qd_message_receive(pn_delivery_t *delivery) content->q2_unblocker.handler = 0; qd_nullify_safe_ptr(&content->q2_unblocker.context); if (pn_delivery_aborted(delivery)) { - SET_FLAG(&msg->content->aborted); + SET_ATOMIC_FLAG(&msg->content->aborted); } // unlink message and delivery pn_record_set(record, PN_DELIVERY_CTX, 0); @@ -1781,7 +1777,7 @@ void qd_message_send(qd_message_t *in_msg, if (msg->sent_depth < QD_DEPTH_MESSAGE_ANNOTATIONS) { - if (IS_FLAG_SET(&content->aborted)) { + if (IS_ATOMIC_FLAG_SET(&content->aborted)) { // Message is aborted before any part of it has been sent. // Declare the message to be sent, msg->send_complete = true; @@ -1888,7 +1884,7 @@ void qd_message_send(qd_message_t *in_msg, pn_session_t *pns = pn_link_session(pnl); const size_t q3_upper = BUFFER_SIZE * QD_QLIMIT_Q3_UPPER; - while (!IS_FLAG_SET(&content->aborted) + while (!IS_ATOMIC_FLAG_SET(&content->aborted) && buf && pn_session_outgoing_bytes(pns) < q3_upper) { @@ -1910,7 +1906,7 @@ void qd_message_send(qd_message_t *in_msg, // send error - likely the link has failed and we will eventually // get a link detach event for this link // - SET_FLAG(&content->aborted); + SET_ATOMIC_FLAG(&content->aborted); msg->send_complete = true; if (!pn_delivery_aborted(pn_link_current(pnl))) { pn_delivery_abort(pn_link_current(pnl)); @@ -1987,7 +1983,7 @@ void qd_message_send(qd_message_t *in_msg, if (q2_unblock.handler) q2_unblock.handler(q2_unblock.context); - if (IS_FLAG_SET(&content->aborted)) { + if (IS_ATOMIC_FLAG_SET(&content->aborted)) { if (pn_link_current(pnl)) { msg->send_complete = true; if (!pn_delivery_aborted(pn_link_current(pnl))) { @@ -2922,7 +2918,7 @@ bool qd_message_aborted(const qd_message_t *msg) { assert(msg); qd_message_pvt_t * msg_pvt = (qd_message_pvt_t *)msg; - return IS_FLAG_SET(&msg_pvt->content->aborted); + return IS_ATOMIC_FLAG_SET(&msg_pvt->content->aborted); } void qd_message_set_aborted(const qd_message_t *msg) @@ -2930,7 +2926,7 @@ void qd_message_set_aborted(const qd_message_t *msg) if (!msg) return; qd_message_pvt_t * msg_pvt = (qd_message_pvt_t *)msg; - SET_FLAG(&msg_pvt->content->aborted); + SET_ATOMIC_FLAG(&msg_pvt->content->aborted); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org