This is an automated email from the ASF dual-hosted git repository.

kgiusti pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git


The following commit(s) were added to refs/heads/main by this push:
     new f9a1639  DISPATCH-2132: atomic access to core uptime counter
f9a1639 is described below

commit f9a1639edef75d15a6ef5d8e4b466fda900a6a8a
Author: Kenneth Giusti <kgiu...@apache.org>
AuthorDate: Thu Jan 27 15:39:39 2022 -0500

    DISPATCH-2132: atomic access to core uptime counter
    
    This closes #1501
---
 src/adaptors/tcp_adaptor.c                                 | 14 +++++++-------
 src/router_core/agent_connection.c                         |  4 ++--
 src/router_core/agent_link.c                               |  6 +++---
 src/router_core/agent_router.c                             |  2 +-
 src/router_core/connections.c                              | 12 ++++++------
 src/router_core/core_link_endpoint.c                       |  2 +-
 src/router_core/core_timer.c                               |  2 +-
 src/router_core/delivery.c                                 |  6 +++---
 src/router_core/forwarder.c                                |  8 ++++----
 .../modules/heartbeat_server/heartbeat_server.c            |  6 +++---
 .../modules/stuck_delivery_detection/delivery_tracker.c    |  6 +++---
 src/router_core/router_core.c                              |  1 +
 src/router_core/router_core_private.h                      | 10 +++++++++-
 src/router_core/transfer.c                                 |  4 ++--
 tests/tsan.supp                                            |  3 ---
 15 files changed, 46 insertions(+), 40 deletions(-)

diff --git a/src/adaptors/tcp_adaptor.c b/src/adaptors/tcp_adaptor.c
index e97b29b..4bf6554 100644
--- a/src/adaptors/tcp_adaptor.c
+++ b/src/adaptors/tcp_adaptor.c
@@ -269,7 +269,7 @@ static int handle_incoming_raw_read(qdr_tcp_connection_t *conn, qd_buffer_list_t
     if (result > 0) {
         // account for any incoming bytes just read
 
-        conn->last_in_time = tcp_adaptor->core->uptime_ticks;
+        conn->last_in_time = qdr_core_uptime_ticks(tcp_adaptor->core);
         conn->bytes_in      += result;
         LOCK(conn->bridge->stats_lock);
         conn->bridge->bytes_in += result;
@@ -772,7 +772,7 @@ static void qdr_tcp_connection_ingress_accept(qdr_tcp_connection_t* tc)
                                          false,
                                          NULL,
                                          &(tc->incoming_id));
-    tc->opened_time = tcp_adaptor->core->uptime_ticks;
+    tc->opened_time = qdr_core_uptime_ticks(tcp_adaptor->core);
     qdr_link_set_context(tc->incoming, tc);
 
     qdr_action_t *action = qdr_action(qdr_add_tcp_connection_CT, "add_tcp_connection");
@@ -794,7 +794,7 @@ static void handle_connection_event(pn_event_t *e, qd_server_t *qd_server, void
             break;
         } else {
             conn->remote_address = get_address_string(conn->pn_raw_conn);
-            conn->opened_time = tcp_adaptor->core->uptime_ticks;
+            conn->opened_time = qdr_core_uptime_ticks(tcp_adaptor->core);
             qd_log(log, QD_LOG_INFO,
                    "[C%"PRIu64"] PN_RAW_CONNECTION_CONNECTED Connector egress connected to %s",
                    conn->conn_id, conn->remote_address);
@@ -896,7 +896,7 @@ static void handle_connection_event(pn_event_t *e, qd_server_t *qd_server, void
             conn->write_buffer.size = 0;
             conn->write_buffer.offset = 0;
             conn->write_buffer.context = 0;
-            conn->last_out_time = tcp_adaptor->core->uptime_ticks;
+            conn->last_out_time = qdr_core_uptime_ticks(tcp_adaptor->core);
             conn->bytes_out += written;
             LOCK(conn->bridge->stats_lock);
             conn->bridge->bytes_out += written;
@@ -1868,21 +1868,21 @@ static void insert_column(qdr_core_t *core, qdr_tcp_connection_t *conn, int col,
         break;
 
     case QDR_TCP_CONNECTION_UPTIME_SECONDS:
-        qd_compose_insert_uint(body, core->uptime_ticks - conn->opened_time);
+        qd_compose_insert_uint(body, qdr_core_uptime_ticks(core) - conn->opened_time);
         break;
 
     case QDR_TCP_CONNECTION_LAST_IN_SECONDS:
         if (conn->last_in_time==0)
             qd_compose_insert_null(body);
         else
-            qd_compose_insert_uint(body, core->uptime_ticks - conn->last_in_time);
+            qd_compose_insert_uint(body, qdr_core_uptime_ticks(core) - conn->last_in_time);
         break;
 
     case QDR_TCP_CONNECTION_LAST_OUT_SECONDS:
         if (conn->last_out_time==0)
             qd_compose_insert_null(body);
         else
-            qd_compose_insert_uint(body, core->uptime_ticks - conn->last_out_time);
+            qd_compose_insert_uint(body, qdr_core_uptime_ticks(core) - conn->last_out_time);
         break;
 
     }
diff --git a/src/router_core/agent_connection.c b/src/router_core/agent_connection.c
index 713603f..9943d5d 100644
--- a/src/router_core/agent_connection.c
+++ b/src/router_core/agent_connection.c
@@ -260,14 +260,14 @@ static void qdr_connection_insert_column_CT(qdr_core_t *core, qdr_connection_t *
         break;
 
     case QDR_CONNECTION_UPTIME_SECONDS:
-        qd_compose_insert_uint(body, core->uptime_ticks - conn->conn_uptime);
+        qd_compose_insert_uint(body, qdr_core_uptime_ticks(core) - conn->conn_uptime);
         break;
 
     case QDR_CONNECTION_LAST_DLV_SECONDS:
         if (conn->last_delivery_time==0)
             qd_compose_insert_null(body);
         else
-            qd_compose_insert_uint(body, core->uptime_ticks - conn->last_delivery_time);
+            qd_compose_insert_uint(body, qdr_core_uptime_ticks(core) - conn->last_delivery_time);
         break;
 
     case QDR_CONNECTION_PROPERTIES: {
diff --git a/src/router_core/agent_link.c b/src/router_core/agent_link.c
index 01296a7..70c7023 100644
--- a/src/router_core/agent_link.c
+++ b/src/router_core/agent_link.c
@@ -256,7 +256,7 @@ static void qdr_agent_write_column_CT(qdr_core_t *core, qd_composed_field_t *bod
         break;
 
     case QDR_LINK_SETTLE_RATE: {
-        uint32_t delta_time = core->uptime_ticks - link->core_ticks;
+        uint32_t delta_time = qdr_core_uptime_ticks(core) - link->core_ticks;
         if (delta_time > 0) {
             if (delta_time > QDR_LINK_RATE_DEPTH)
                 delta_time = QDR_LINK_RATE_DEPTH;
@@ -264,7 +264,7 @@ static void qdr_agent_write_column_CT(qdr_core_t *core, qd_composed_field_t *bod
                 link->rate_cursor = (link->rate_cursor + 1) % QDR_LINK_RATE_DEPTH;
                 link->settled_deliveries[link->rate_cursor] = 0;
             }
-            link->core_ticks = core->uptime_ticks;
+            link->core_ticks = qdr_core_uptime_ticks(core);
         }
 
         uint64_t total = 0;
@@ -282,7 +282,7 @@ static void qdr_agent_write_column_CT(qdr_core_t *core, qd_composed_field_t *bod
         if (link->zero_credit_time == 0)
             qd_compose_insert_uint(body, 0);
         else
-            qd_compose_insert_uint(body, core->uptime_ticks - link->zero_credit_time);
+            qd_compose_insert_uint(body, qdr_core_uptime_ticks(core) - link->zero_credit_time);
         break;
 
     default:
diff --git a/src/router_core/agent_router.c b/src/router_core/agent_router.c
index 53c4b42..fcf389a 100644
--- a/src/router_core/agent_router.c
+++ b/src/router_core/agent_router.c
@@ -242,7 +242,7 @@ static void qdr_agent_write_column_CT(qd_composed_field_t *body, int col, qdr_co
         break;
 
     case QDR_ROUTER_UPTIME_SECONDS:
-        qd_compose_insert_uint(body, core->uptime_ticks);
+        qd_compose_insert_uint(body, qdr_core_uptime_ticks(core));
         break;
 
     case QDR_ROUTER_MEMORY_USAGE: {
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index 9d657a2..52c833f 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -109,7 +109,7 @@ qdr_connection_t *qdr_connection_opened(qdr_core_t                   *core,
     DEQ_INIT(conn->streaming_link_pool);
     conn->connection_info->role = conn->role;
     conn->work_lock = sys_mutex();
-    conn->conn_uptime = core->uptime_ticks;
+    conn->conn_uptime = qdr_core_uptime_ticks(core);
 
     if (vhost) {
         conn->tenant_space_len = strlen(vhost) + 1;
@@ -259,7 +259,7 @@ void qdr_record_link_credit(qdr_core_t *core, qdr_link_t *link)
             //
             // The link has transitioned from positive credit to zero credit.
             //
-            link->zero_credit_time = core->uptime_ticks;
+            link->zero_credit_time = qdr_core_uptime_ticks(core);
         } else if (link->credit_reported == 0 && pn_credit > 0) {
             //
             // The link has transitioned from zero credit to positive credit.
@@ -641,8 +641,8 @@ qdr_link_t *qdr_link_first_attach(qdr_connection_t *conn,
     link->credit_pending = conn->link_capacity;
     link->admin_enabled  = true;
     link->oper_status    = QDR_LINK_OPER_DOWN;
-    link->core_ticks     = conn->core->uptime_ticks;
-    link->zero_credit_time = conn->core->uptime_ticks;
+    link->core_ticks     = qdr_core_uptime_ticks(conn->core);
+    link->zero_credit_time = link->core_ticks;
     link->terminus_survives_disconnect = qdr_terminus_survives_disconnect(local_terminus);
     link->no_route = no_route;
     link->priority = QDR_DEFAULT_PRIORITY;
@@ -1178,8 +1178,8 @@ qdr_link_t *qdr_create_link_CT(qdr_core_t        *core,
     link->insert_prefix  = 0;
     link->strip_prefix   = 0;
     link->attach_count   = 1;
-    link->core_ticks     = core->uptime_ticks;
-    link->zero_credit_time = core->uptime_ticks;
+    link->core_ticks     = qdr_core_uptime_ticks(core);
+    link->zero_credit_time = link->core_ticks;
     link->priority       = priority;
 
     link->strip_annotations_in  = conn->strip_annotations_in;
diff --git a/src/router_core/core_link_endpoint.c b/src/router_core/core_link_endpoint.c
index b65ac6a..678c59a 100644
--- a/src/router_core/core_link_endpoint.c
+++ b/src/router_core/core_link_endpoint.c
@@ -134,7 +134,7 @@ qdr_delivery_t *qdrc_endpoint_delivery_CT(qdr_core_t *core, qdrc_endpoint_t *end
     qdr_delivery_t *dlv = new_qdr_delivery_t();
 
     if (endpoint->link->conn)
-        endpoint->link->conn->last_delivery_time = core->uptime_ticks;
+        endpoint->link->conn->last_delivery_time = qdr_core_uptime_ticks(core);
 
     ZERO(dlv);
     set_safe_ptr_qdr_link_t(endpoint->link, &dlv->link_sp);
diff --git a/src/router_core/core_timer.c b/src/router_core/core_timer.c
index 286dda7..b8eab2a 100644
--- a/src/router_core/core_timer.c
+++ b/src/router_core/core_timer.c
@@ -108,7 +108,7 @@ void qdr_process_tick_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
     if (discard)
         return;
 
-    core->uptime_ticks++;
+    sys_atomic_inc(&core->uptime_ticks);
 
     qdr_core_timer_t *timer = DEQ_HEAD(core->scheduled_timers);
     qdr_core_timer_t *timer_next = 0;
diff --git a/src/router_core/delivery.c b/src/router_core/delivery.c
index 8a3432f..48c1f13 100644
--- a/src/router_core/delivery.c
+++ b/src/router_core/delivery.c
@@ -393,7 +393,7 @@ void qdr_delivery_increment_counters_CT(qdr_core_t *core, qdr_delivery_t *delive
                DLV_ARGS(delivery), delivery->presettled ? "pre-settled" : "",
                pn_disposition_type_name(outcome), outcome);
 
-        uint32_t delay = core->uptime_ticks - delivery->ingress_time;
+        uint32_t delay = qdr_core_uptime_ticks(core) - delivery->ingress_time;
         if (delay > 10) {
             link->deliveries_delayed_10sec++;
             if (link->link_direction ==  QD_INCOMING)
@@ -419,7 +419,7 @@ void qdr_delivery_increment_counters_CT(qdr_core_t *core, qdr_delivery_t *delive
         // Compute the settlement rate
         //
         if (do_rate) {
-            uint32_t delta_time = core->uptime_ticks - link->core_ticks;
+            uint32_t delta_time = qdr_core_uptime_ticks(core) - link->core_ticks;
             if (delta_time > 0) {
                 if (delta_time > QDR_LINK_RATE_DEPTH)
                     delta_time = QDR_LINK_RATE_DEPTH;
@@ -427,7 +427,7 @@ void qdr_delivery_increment_counters_CT(qdr_core_t *core, qdr_delivery_t *delive
                     link->rate_cursor = (link->rate_cursor + 1) % QDR_LINK_RATE_DEPTH;
                     link->settled_deliveries[link->rate_cursor] = 0;
                 }
-                link->core_ticks = core->uptime_ticks;
+                link->core_ticks = qdr_core_uptime_ticks(core);
             }
             link->settled_deliveries[link->rate_cursor]++;
         }
diff --git a/src/router_core/forwarder.c b/src/router_core/forwarder.c
index d37996b..c09ed57 100644
--- a/src/router_core/forwarder.c
+++ b/src/router_core/forwarder.c
@@ -146,7 +146,7 @@ qdr_delivery_t *qdr_forward_new_delivery_CT(qdr_core_t *core, qdr_delivery_t *in
 {
     qdr_delivery_t *out_dlv = new_qdr_delivery_t();
     if (out_link->conn)
-        out_link->conn->last_delivery_time = core->uptime_ticks;
+        out_link->conn->last_delivery_time = qdr_core_uptime_ticks(core);
 
     ZERO(out_dlv);
     set_safe_ptr_qdr_link_t(out_link, &out_dlv->link_sp);
@@ -167,7 +167,7 @@ qdr_delivery_t *qdr_forward_new_delivery_CT(qdr_core_t *core, qdr_delivery_t *in
         }
     } else {
         out_dlv->settled       = true;
-        out_dlv->ingress_time  = core->uptime_ticks;
+        out_dlv->ingress_time  = qdr_core_uptime_ticks(core);
         out_dlv->ingress_index = -1;
     }
 
@@ -1033,8 +1033,8 @@ void qdr_forward_link_direct_CT(qdr_core_t       *core,
     out_link->link_direction = qdr_link_direction(in_link) == QD_OUTGOING ? QD_INCOMING : QD_OUTGOING;
     out_link->admin_enabled  = true;
     out_link->attach_count   = 1;
-    out_link->core_ticks     = conn->core->uptime_ticks;
-    out_link->zero_credit_time = core->uptime_ticks;
+    out_link->core_ticks     = qdr_core_uptime_ticks(core);
+    out_link->zero_credit_time = out_link->core_ticks;
     out_link->strip_annotations_in  = conn->strip_annotations_in;
     out_link->strip_annotations_out = conn->strip_annotations_out;
     out_link->priority       = in_link->priority;
diff --git a/src/router_core/modules/heartbeat_server/heartbeat_server.c b/src/router_core/modules/heartbeat_server/heartbeat_server.c
index eed1cda..51db724 100644
--- a/src/router_core/modules/heartbeat_server/heartbeat_server.c
+++ b/src/router_core/modules/heartbeat_server/heartbeat_server.c
@@ -56,7 +56,7 @@ static void _on_transfer(void           *link_context,
 
     endpoint_ref_t *epr = (endpoint_ref_t*) link_context;
 
-    epr->last_heartbeat = _server_state.core->uptime_ticks;
+    epr->last_heartbeat = qdr_core_uptime_ticks(_server_state.core);
 
     qdrc_endpoint_settle_CT(_server_state.core, delivery, PN_ACCEPTED);
     qdrc_endpoint_flow_CT(_server_state.core, epr->endpoint, 1, false);
@@ -87,7 +87,7 @@ static void _on_first_attach(void            *bind_context,
     endpoint_ref_t *epr = new_endpoint_ref_t();
     ZERO(epr);
     epr->endpoint       = endpoint;
-    epr->last_heartbeat = _server_state.core->uptime_ticks;
+    epr->last_heartbeat = qdr_core_uptime_ticks(_server_state.core);
     epr->container_id   = (conn->connection_info) ? conn->connection_info->container : "<unknown>";
     epr->conn           = conn;
     epr->conn_id        = conn->identity;
@@ -131,7 +131,7 @@ static void on_timer(qdr_core_t *core, void *context)
     qdr_core_timer_schedule_CT(core, _server_state.timer, 2);
     endpoint_ref_t *epr = DEQ_HEAD(_server_state.endpoints);
     while (epr) {
-        if (core->uptime_ticks - epr->last_heartbeat > HEARTBEAT_THRESHOLD) {
+        if (qdr_core_uptime_ticks(core) - epr->last_heartbeat > HEARTBEAT_THRESHOLD) {
             qd_log(core->log, QD_LOG_INFO, "[C%"PRIu64"] Lost heartbeat from container %s, closing connection",
             epr->conn_id, epr->container_id);
             qdr_close_connection_CT(core, epr->conn);
diff --git a/src/router_core/modules/stuck_delivery_detection/delivery_tracker.c b/src/router_core/modules/stuck_delivery_detection/delivery_tracker.c
index a348011..a445cd0 100644
--- a/src/router_core/modules/stuck_delivery_detection/delivery_tracker.c
+++ b/src/router_core/modules/stuck_delivery_detection/delivery_tracker.c
@@ -52,7 +52,7 @@ static void check_delivery_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery_t
         return;
     }
 
-    if (!dlv->stuck && ((core->uptime_ticks - link->core_ticks) > stuck_age)) {
+    if (!dlv->stuck && ((qdr_core_uptime_ticks(core) - link->core_ticks) > stuck_age)) {
         dlv->stuck = true;
         link->deliveries_stuck++;
         core->deliveries_stuck++;
@@ -80,14 +80,14 @@ static void process_link_CT(qdr_core_t *core, qdr_link_t *link)
     }
 
     if (!link->reported_as_blocked && link->zero_credit_time > 0 &&
-        (core->uptime_ticks - link->zero_credit_time > stuck_age)) {
+        (qdr_core_uptime_ticks(core) - link->zero_credit_time > stuck_age)) {
         link->reported_as_blocked = true;
         core->links_blocked++;
         qd_log(core->log, QD_LOG_INFO,
                "[C%"PRIu64"][L%"PRIu64"] "
                "Link blocked with zero credit for %d seconds",
                link->conn ? link->conn->identity : 0, link->identity,
-               core->uptime_ticks - link->zero_credit_time);
+               qdr_core_uptime_ticks(core) - link->zero_credit_time);
     }
 }
 
diff --git a/src/router_core/router_core.c b/src/router_core/router_core.c
index 08293e3..7018cdc 100644
--- a/src/router_core/router_core.c
+++ b/src/router_core/router_core.c
@@ -78,6 +78,7 @@ qdr_core_t *qdr_core(qd_dispatch_t *qd, qd_router_mode_t mode, const char *area,
     core->router_area         = area;
     core->router_id           = id;
     core->worker_thread_count = qd->thread_count;
+    sys_atomic_init(&core->uptime_ticks, 0);
 
     DEQ_INIT(core->exchanges);
 
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index 6a0f4ba..53ea295 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -832,7 +832,7 @@ struct qdr_core_t {
     qdr_core_timer_list_t    scheduled_timers;
     qdr_general_work_list_t  work_list;
     qd_timer_t              *work_timer;
-    uint32_t                 uptime_ticks;
+    sys_atomic_t             uptime_ticks;
 
     qdr_protocol_adaptor_list_t  protocol_adaptors;
     qdr_connection_list_t        open_connections;
@@ -1072,4 +1072,12 @@ void qdr_reset_sheaf(qdr_core_t *core, uint8_t n);
  */
 void qdr_record_link_credit(qdr_core_t *core, qdr_link_t *link);
 
+/**
+ * Access core uptime
+ */
+static inline uint32_t qdr_core_uptime_ticks(qdr_core_t *core)
+{
+    return sys_atomic_get(&core->uptime_ticks);
+}
+
 #endif
diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c
index d3edd38..6c05462 100644
--- a/src/router_core/transfer.c
+++ b/src/router_core/transfer.c
@@ -758,7 +758,7 @@ static void qdr_link_deliver_CT(qdr_core_t *core, qdr_action_t *action, bool dis
     if (!link)
         return;
     if (link->conn)
-        link->conn->last_delivery_time = core->uptime_ticks;
+        link->conn->last_delivery_time = qdr_core_uptime_ticks(core);
 
     link->total_deliveries++;
 
@@ -768,7 +768,7 @@ static void qdr_link_deliver_CT(qdr_core_t *core, qdr_action_t *action, bool dis
     //
     // Record the ingress time so we can track the age of this delivery.
     //
-    dlv->ingress_time = core->uptime_ticks;
+    dlv->ingress_time = qdr_core_uptime_ticks(core);
 
     //
     // If the link is an edge link, mark this delivery as via-edge
diff --git a/tests/tsan.supp b/tests/tsan.supp
index 68496f7..149de61 100644
--- a/tests/tsan.supp
+++ b/tests/tsan.supp
@@ -47,9 +47,6 @@ deadlock:qd_policy_socket_close
 # DISPATCH-2131
 race:qdr_record_link_credit
 
-# DISPATCH-2132
-race:qdr_process_tick_CT
-
 # DISPATCH-2133 (harmless)
 race:qd_log_enabled
 

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to