This is an automated email from the ASF dual-hosted git repository.

gmurthy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git


The following commit(s) were added to refs/heads/main by this push:
     new d2987b0  DISPATCH-1977: Introduced atomics raw_closed_read and 
raw_closed_write in http2 adaptor. Also made raw_closed_write and 
raw_closed_read atomics in tcp_adaptor. This closes #1253.
d2987b0 is described below

commit d2987b015afa580d04c16939ed7f063c9c03c015
Author: Ganesh Murthy <gmur...@apache.org>
AuthorDate: Wed Jun 9 15:39:52 2021 -0400

    DISPATCH-1977: Introduced atomics raw_closed_read and raw_closed_write in 
http2 adaptor. Also made raw_closed_write and raw_closed_read atomics in 
tcp_adaptor. This closes #1253.
---
 include/qpid/dispatch/atomic.h     |  5 ++++
 src/adaptors/http2/http2_adaptor.c | 54 ++++++++++++++++++++++++--------------
 src/adaptors/http2/http2_adaptor.h |  2 ++
 src/adaptors/tcp_adaptor.c         | 32 +++++++++++-----------
 src/message.c                      | 22 +++++++---------
 5 files changed, 67 insertions(+), 48 deletions(-)

diff --git a/include/qpid/dispatch/atomic.h b/include/qpid/dispatch/atomic.h
index 86754d5..9eb09d6 100644
--- a/include/qpid/dispatch/atomic.h
+++ b/include/qpid/dispatch/atomic.h
@@ -205,6 +205,11 @@ static inline void sys_atomic_destroy(sys_atomic_t *ref)
 
 #endif
 
+#define    SET_ATOMIC_FLAG(flag)   sys_atomic_set(flag, 1)
+#define    CLEAR_ATOMIC_FLAG(flag) sys_atomic_set(flag, 0)
+
+#define IS_ATOMIC_FLAG_SET(flag) (sys_atomic_get(flag) == 1)
+
 /** Atomic increase: NOTE returns value *before* increase, like i++ */
 static inline uint32_t sys_atomic_inc(sys_atomic_t *ref) { return 
sys_atomic_add((ref), 1); }
 
diff --git a/src/adaptors/http2/http2_adaptor.c 
b/src/adaptors/http2/http2_adaptor.c
index f802d62..775998b 100644
--- a/src/adaptors/http2/http2_adaptor.c
+++ b/src/adaptors/http2/http2_adaptor.c
@@ -422,6 +422,9 @@ void free_qdr_http2_connection(qdr_http2_connection_t* 
http_conn, bool on_shutdo
 
     qd_log(http2_adaptor->log_source, QD_LOG_TRACE, "[C%"PRIu64"] Freeing 
http2 connection in free_qdr_http2_connection", http_conn->conn_id);
 
+    sys_atomic_destroy(&http_conn->raw_closed_read);
+    sys_atomic_destroy(&http_conn->raw_closed_write);
+
     free_qdr_http2_connection_t(http_conn);
 }
 
@@ -1355,6 +1358,8 @@ qdr_http2_connection_t 
*qdr_http_connection_ingress(qd_http_listener_t* listener
     ingress_http_conn->config = &(listener->config);
     ingress_http_conn->server = listener->server;
     ingress_http_conn->pn_raw_conn = pn_raw_connection();
+    sys_atomic_init(&ingress_http_conn->raw_closed_read, 0);
+    sys_atomic_init(&ingress_http_conn->raw_closed_write, 0);
 
     ingress_http_conn->session_data = new_qdr_http2_session_data_t();
     ZERO(ingress_http_conn->session_data);
@@ -1376,26 +1381,27 @@ qdr_http2_connection_t 
*qdr_http_connection_ingress(qd_http_listener_t* listener
 
 static void grant_read_buffers(qdr_http2_connection_t *conn)
 {
+       if (IS_ATOMIC_FLAG_SET(&conn->raw_closed_read))
+               return;
+
     pn_raw_buffer_t raw_buffers[READ_BUFFERS];
     if (conn->pn_raw_conn) {
-        if (!pn_raw_connection_is_read_closed(conn->pn_raw_conn)) {
-            size_t desired = 
pn_raw_connection_read_buffers_capacity(conn->pn_raw_conn);
-            while (desired) {
-                size_t i;
-                for (i = 0; i < desired && i < READ_BUFFERS; ++i) {
-                    qd_http2_buffer_t *buf = qd_http2_buffer();
-                    DEQ_INSERT_TAIL(conn->granted_read_buffs, buf);
-                    raw_buffers[i].bytes = (char*) qd_http2_buffer_base(buf);
-                    raw_buffers[i].capacity = qd_http2_buffer_capacity(buf);
-                    raw_buffers[i].size = 0;
-                    raw_buffers[i].offset = 0;
-                    raw_buffers[i].context = (uintptr_t) buf;
-                }
-                desired -= i;
-                qd_log(http2_adaptor->log_source, QD_LOG_TRACE, "[C%"PRIu64"] 
Calling pn_raw_connection_give_read_buffers in grant_read_buffers", 
conn->conn_id);
-                pn_raw_connection_give_read_buffers(conn->pn_raw_conn, 
raw_buffers, i);
-            }
-        }
+               size_t desired = 
pn_raw_connection_read_buffers_capacity(conn->pn_raw_conn);
+               while (desired) {
+                       size_t i;
+                       for (i = 0; i < desired && i < READ_BUFFERS; ++i) {
+                               qd_http2_buffer_t *buf = qd_http2_buffer();
+                               DEQ_INSERT_TAIL(conn->granted_read_buffs, buf);
+                               raw_buffers[i].bytes = (char*) 
qd_http2_buffer_base(buf);
+                               raw_buffers[i].capacity = 
qd_http2_buffer_capacity(buf);
+                               raw_buffers[i].size = 0;
+                               raw_buffers[i].offset = 0;
+                               raw_buffers[i].context = (uintptr_t) buf;
+                       }
+                       desired -= i;
+                       qd_log(http2_adaptor->log_source, QD_LOG_TRACE, 
"[C%"PRIu64"] Calling pn_raw_connection_give_read_buffers in 
grant_read_buffers", conn->conn_id);
+                       pn_raw_connection_give_read_buffers(conn->pn_raw_conn, 
raw_buffers, i);
+               }
     }
 }
 
@@ -1629,7 +1635,7 @@ static void qdr_http_activate(void *notused, 
qdr_connection_t *c)
     
sys_mutex_lock(qd_server_get_activation_lock(http2_adaptor->core->qd->server));
     qdr_http2_connection_t* conn = (qdr_http2_connection_t*) 
qdr_connection_get_context(c);
     if (conn) {
-        if (conn->pn_raw_conn) {
+        if (conn->pn_raw_conn && !(IS_ATOMIC_FLAG_SET(&conn->raw_closed_read) 
&& IS_ATOMIC_FLAG_SET(&conn->raw_closed_write))) {
             qd_log(http2_adaptor->log_source, QD_LOG_TRACE, "[C%"PRIu64"] 
Activation triggered, calling pn_raw_connection_wake()", conn->conn_id);
             pn_raw_connection_wake(conn->pn_raw_conn);
         }
@@ -1666,6 +1672,9 @@ uint64_t handle_outgoing_http(qdr_http2_stream_data_t 
*stream_data)
     qdr_http2_session_data_t *session_data = stream_data->session_data;
     qdr_http2_connection_t *conn = session_data->conn;
 
+       if (IS_ATOMIC_FLAG_SET(&conn->raw_closed_write))
+               return 0;
+
     qd_log(http2_adaptor->protocol_log_source, QD_LOG_TRACE, "[C%"PRIu64"] 
Starting to handle_outgoing_http", conn->conn_id);
     if (stream_data->out_dlv) {
         qd_message_t *message = qdr_delivery_message(stream_data->out_dlv);
@@ -2370,7 +2379,8 @@ qdr_http2_connection_t 
*qdr_http_connection_egress(qd_http_connector_t *connecto
     DEQ_INIT(egress_http_conn->session_data->streams);
     DEQ_INIT(egress_http_conn->granted_read_buffs);
     egress_http_conn->session_data->conn = egress_http_conn;
-
+    sys_atomic_init(&egress_http_conn->raw_closed_read, 0);
+    sys_atomic_init(&egress_http_conn->raw_closed_write, 0);
     sys_mutex_lock(http2_adaptor->lock);
     DEQ_INSERT_TAIL(http2_adaptor->connections, egress_http_conn);
     sys_mutex_unlock(http2_adaptor->lock);
@@ -2428,6 +2438,8 @@ static void handle_connection_event(pn_event_t *e, 
qd_server_t *qd_server, void
             send_settings_frame(conn);
             qd_log(log, QD_LOG_INFO, "[C%"PRIu64"] Accepted Ingress 
((PN_RAW_CONNECTION_CONNECTED)) from %s", conn->conn_id, conn->remote_address);
         } else {
+               CLEAR_ATOMIC_FLAG(&conn->raw_closed_read);
+               CLEAR_ATOMIC_FLAG(&conn->raw_closed_write);
             if (!conn->session_data->session) {
                 nghttp2_session_client_new(&conn->session_data->session, 
(nghttp2_session_callbacks *)http2_adaptor->callbacks, (void *)conn);
                 send_settings_frame(conn);
@@ -2442,6 +2454,7 @@ static void handle_connection_event(pn_event_t *e, 
qd_server_t *qd_server, void
         break;
     }
     case PN_RAW_CONNECTION_CLOSED_READ: {
+       SET_ATOMIC_FLAG(&conn->raw_closed_read);
         if (conn->pn_raw_conn)
             pn_raw_connection_close(conn->pn_raw_conn);
         qd_log(log, QD_LOG_TRACE, "[C%"PRIu64"] 
PN_RAW_CONNECTION_CLOSED_READ", conn->conn_id);
@@ -2449,6 +2462,7 @@ static void handle_connection_event(pn_event_t *e, 
qd_server_t *qd_server, void
     }
     case PN_RAW_CONNECTION_CLOSED_WRITE: {
         qd_log(log, QD_LOG_TRACE, "[C%"PRIu64"] 
PN_RAW_CONNECTION_CLOSED_WRITE", conn->conn_id);
+        SET_ATOMIC_FLAG(&conn->raw_closed_write);
         break;
     }
     case PN_RAW_CONNECTION_DISCONNECTED: {
diff --git a/src/adaptors/http2/http2_adaptor.h 
b/src/adaptors/http2/http2_adaptor.h
index 26fb161..bbef44b 100644
--- a/src/adaptors/http2/http2_adaptor.h
+++ b/src/adaptors/http2/http2_adaptor.h
@@ -150,6 +150,8 @@ struct qdr_http2_connection_t {
     bool                      first_pinged;
     bool                      delete_egress_connections;  // If set to true, 
the egress qdr_connection_t and qdr_http2_connection_t objects will be deleted
     bool                      goaway_received;
+    sys_atomic_t                     raw_closed_read;
+    sys_atomic_t                         raw_closed_write;
 
     DEQ_LINKS(qdr_http2_connection_t);
  };
diff --git a/src/adaptors/tcp_adaptor.c b/src/adaptors/tcp_adaptor.c
index a69de9d..b4cf3fc 100644
--- a/src/adaptors/tcp_adaptor.c
+++ b/src/adaptors/tcp_adaptor.c
@@ -75,8 +75,8 @@ struct qdr_tcp_connection_t {
     bool                  egress_dispatcher;
     bool                  connector_closed;//only used if 
egress_dispatcher=true
     bool                  in_list;         // This connection is in the 
adaptor's connections list
-    bool                  raw_closed_read;   // proton event seen
-    bool                  raw_closed_write;  // proton event seen or 
write_close called
+    sys_atomic_t             raw_closed_read;   // proton event seen
+    sys_atomic_t             raw_closed_write;  // proton event seen or 
write_close called
     bool                  raw_read_shutdown; // stream closed
     bool                  read_eos_seen;
     qdr_delivery_t       *initial_delivery;
@@ -188,7 +188,7 @@ static void on_activate(void *context)
 
 static void grant_read_buffers(qdr_tcp_connection_t *conn)
 {
-    if (conn->raw_closed_read || conn->read_pending)
+    if (IS_ATOMIC_FLAG_SET(&conn->raw_closed_read) || conn->read_pending)
         return;
 
     conn->read_pending = true;
@@ -371,10 +371,7 @@ static int handle_incoming(qdr_tcp_connection_t *conn, 
const char *msg)
     int count = handle_incoming_raw_read(conn, &buffers);
 
     // Grant more buffers to proton for reading if read side is still open
-    if (!conn->raw_closed_read) {
-        // normal path - keep on processing
-        grant_read_buffers(conn);
-    }
+       grant_read_buffers(conn);
 
     // Push the bytes just read into the streaming message
     if (count > 0) {
@@ -395,7 +392,7 @@ static int handle_incoming(qdr_tcp_connection_t *conn, 
const char *msg)
     }
 
     // Close the stream message if read side has closed
-    if (conn->raw_closed_read) {
+    if (IS_ATOMIC_FLAG_SET(&conn->raw_closed_read)) {
         qd_log(log, QD_LOG_DEBUG,
             DLV_FMT" close %s instream delivery",
             DLV_ARGS(conn_instream), qdr_tcp_connection_role_name(conn));
@@ -427,6 +424,8 @@ static void free_qdr_tcp_connection(qdr_tcp_connection_t* 
tc)
     free(tc->remote_address);
     free(tc->global_id);
     sys_atomic_destroy(&tc->q2_restart);
+    sys_atomic_destroy(&tc->raw_closed_read);
+    sys_atomic_destroy(&tc->raw_closed_write);
     if (tc->activate_timer) {
         qd_timer_free(tc->activate_timer);
     }
@@ -596,7 +595,7 @@ static bool copy_outgoing_buffs(qdr_tcp_connection_t *conn)
 static void handle_outgoing(qdr_tcp_connection_t *conn)
 {
     if (conn->outstream) {
-        if (conn->raw_closed_write) {
+        if (IS_ATOMIC_FLAG_SET(&conn->raw_closed_write)) {
             // flush outgoing buffers and free attached stream_data objects
             flush_outgoing_buffs(conn);
             // give no more buffers to raw connection
@@ -640,7 +639,8 @@ static void handle_outgoing(qdr_tcp_connection_t *conn)
                    "[C%"PRIu64"] handle_outgoing calling 
pn_raw_connection_write_close(). rcv_complete:%s, send_complete:%s",
                     conn->conn_id, qd_message_receive_complete(msg) ? "T" : 
"F", qd_message_send_complete(msg) ? "T" : "F");
             sys_mutex_lock(conn->activation_lock);
-            conn->raw_closed_write = true;
+            SET_ATOMIC_FLAG(&conn->raw_closed_write);
+
             sys_mutex_unlock(conn->activation_lock);
             pn_raw_connection_write_close(conn->pn_raw_conn);
         }
@@ -798,9 +798,9 @@ static void handle_connection_event(pn_event_t *e, 
qd_server_t *qd_server, void
     case PN_RAW_CONNECTION_CLOSED_READ: {
         qd_log(log, QD_LOG_DEBUG, "[C%"PRIu64"][L%"PRIu64"] 
PN_RAW_CONNECTION_CLOSED_READ %s",
                conn->conn_id, conn->incoming_id, 
qdr_tcp_connection_role_name(conn));
+        SET_ATOMIC_FLAG(&conn->raw_closed_read);
         sys_mutex_lock(conn->activation_lock);
         conn->q2_blocked = false;
-        conn->raw_closed_read = true;
         sys_mutex_unlock(conn->activation_lock);
         handle_incoming(conn, "PNRC_CLOSED_READ");
         break;
@@ -809,9 +809,7 @@ static void handle_connection_event(pn_event_t *e, 
qd_server_t *qd_server, void
         qd_log(log, QD_LOG_DEBUG,
                "[C%"PRIu64"] PN_RAW_CONNECTION_CLOSED_WRITE %s",
                conn->conn_id, qdr_tcp_connection_role_name(conn));
-        sys_mutex_lock(conn->activation_lock);
-        conn->raw_closed_write = true;
-        sys_mutex_unlock(conn->activation_lock);
+        SET_ATOMIC_FLAG(&conn->raw_closed_write);
         break;
     }
     case PN_RAW_CONNECTION_DISCONNECTED: {
@@ -931,6 +929,8 @@ static qdr_tcp_connection_t 
*qdr_tcp_connection_ingress(qd_tcp_listener_t* liste
     tc->config = listener->config;
     tc->server = listener->server;
     sys_atomic_init(&tc->q2_restart, 0);
+    sys_atomic_init(&tc->raw_closed_read, 0);
+    sys_atomic_init(&tc->raw_closed_write, 0);
     tc->pn_raw_conn = pn_raw_connection();
     pn_raw_connection_set_context(tc->pn_raw_conn, tc);
     //the following call will cause a PN_RAW_CONNECTION_CONNECTED
@@ -1040,6 +1040,8 @@ static qdr_tcp_connection_t 
*qdr_tcp_connection_egress(qd_bridge_config_t *confi
     tc->config = *config;
     tc->server = server;
     sys_atomic_init(&tc->q2_restart, 0);
+    sys_atomic_init(&tc->raw_closed_read, 0);
+    sys_atomic_init(&tc->raw_closed_write, 0);
     tc->conn_id = qd_server_allocate_connection_id(tc->server);
 
     //
@@ -1602,7 +1604,7 @@ static void qdr_tcp_activate(void *notused, 
qdr_connection_t *c)
     if (context) {
         qdr_tcp_connection_t* conn = (qdr_tcp_connection_t*) context;
         sys_mutex_lock(conn->activation_lock);
-        if (conn->pn_raw_conn && !(conn->raw_closed_read && 
conn->raw_closed_write)) {
+        if (conn->pn_raw_conn && !(IS_ATOMIC_FLAG_SET(&conn->raw_closed_read) 
&& IS_ATOMIC_FLAG_SET(&conn->raw_closed_write))) {
             qd_log(tcp_adaptor->log_source, QD_LOG_DEBUG,
                    "[C%"PRIu64"] qdr_tcp_activate: call 
pn_raw_connection_wake()", conn->conn_id);
             pn_raw_connection_wake(conn->pn_raw_conn);
diff --git a/src/message.c b/src/message.c
index edf1355..d90cb3c 100644
--- a/src/message.c
+++ b/src/message.c
@@ -45,10 +45,6 @@
 #define LOCK   sys_mutex_lock
 #define UNLOCK sys_mutex_unlock
 
-// Implement bool flags with atomic variables
-#define    SET_FLAG(flag)  sys_atomic_set(flag, 1)
-#define IS_FLAG_SET(flag) (sys_atomic_get(flag) == 1)
-
 const char *STR_AMQP_NULL = "null";
 const char *STR_AMQP_TRUE = "T";
 const char *STR_AMQP_FALSE = "F";
@@ -1406,14 +1402,14 @@ qd_message_t *discard_receive(pn_delivery_t *delivery,
         } else if (rc == PN_EOS || rc < 0) {
             // End of message or error: finalize message_receive handling
             if (pn_delivery_aborted(delivery)) {
-                SET_FLAG(&msg->content->aborted);
+                SET_ATOMIC_FLAG(&msg->content->aborted);
             }
             pn_record_t *record = pn_delivery_attachments(delivery);
             pn_record_set(record, PN_DELIVERY_CTX, 0);
             if (msg->content->oversize) {
                 // Aborting the content disposes of downstream copies.
                 // This has no effect on the received message.
-                SET_FLAG(&msg->content->aborted);
+                SET_ATOMIC_FLAG(&msg->content->aborted);
             }
             qd_message_set_receive_complete((qd_message_t*) msg);
             break;
@@ -1536,7 +1532,7 @@ qd_message_t *qd_message_receive(pn_delivery_t *delivery)
                 content->q2_unblocker.handler = 0;
                 qd_nullify_safe_ptr(&content->q2_unblocker.context);
                 if (pn_delivery_aborted(delivery)) {
-                    SET_FLAG(&msg->content->aborted);
+                    SET_ATOMIC_FLAG(&msg->content->aborted);
                 }
                 // unlink message and delivery
                 pn_record_set(record, PN_DELIVERY_CTX, 0);
@@ -1781,7 +1777,7 @@ void qd_message_send(qd_message_t *in_msg,
 
     if (msg->sent_depth < QD_DEPTH_MESSAGE_ANNOTATIONS) {
 
-        if (IS_FLAG_SET(&content->aborted)) {
+        if (IS_ATOMIC_FLAG_SET(&content->aborted)) {
             // Message is aborted before any part of it has been sent.
             // Declare the message to be sent,
             msg->send_complete = true;
@@ -1888,7 +1884,7 @@ void qd_message_send(qd_message_t *in_msg,
     pn_session_t              *pns        = pn_link_session(pnl);
     const size_t               q3_upper   = BUFFER_SIZE * QD_QLIMIT_Q3_UPPER;
 
-    while (!IS_FLAG_SET(&content->aborted)
+    while (!IS_ATOMIC_FLAG_SET(&content->aborted)
            && buf
            && pn_session_outgoing_bytes(pns) < q3_upper) {
 
@@ -1910,7 +1906,7 @@ void qd_message_send(qd_message_t *in_msg,
             // send error - likely the link has failed and we will eventually
             // get a link detach event for this link
             //
-            SET_FLAG(&content->aborted);
+            SET_ATOMIC_FLAG(&content->aborted);
             msg->send_complete = true;
             if (!pn_delivery_aborted(pn_link_current(pnl))) {
                 pn_delivery_abort(pn_link_current(pnl));
@@ -1987,7 +1983,7 @@ void qd_message_send(qd_message_t *in_msg,
     if (q2_unblock.handler)
         q2_unblock.handler(q2_unblock.context);
 
-    if (IS_FLAG_SET(&content->aborted)) {
+    if (IS_ATOMIC_FLAG_SET(&content->aborted)) {
         if (pn_link_current(pnl)) {
             msg->send_complete = true;
             if (!pn_delivery_aborted(pn_link_current(pnl))) {
@@ -2922,7 +2918,7 @@ bool qd_message_aborted(const qd_message_t *msg)
 {
     assert(msg);
     qd_message_pvt_t * msg_pvt = (qd_message_pvt_t *)msg;
-    return IS_FLAG_SET(&msg_pvt->content->aborted);
+    return IS_ATOMIC_FLAG_SET(&msg_pvt->content->aborted);
 }
 
 void qd_message_set_aborted(const qd_message_t *msg)
@@ -2930,7 +2926,7 @@ void qd_message_set_aborted(const qd_message_t *msg)
     if (!msg)
         return;
     qd_message_pvt_t * msg_pvt = (qd_message_pvt_t *)msg;
-    SET_FLAG(&msg_pvt->content->aborted);
+    SET_ATOMIC_FLAG(&msg_pvt->content->aborted);
 }
 
 

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to