This is an automated email from the ASF dual-hosted git repository.
kgiusti pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
The following commit(s) were added to refs/heads/main by this push:
new f9a1639 DISPATCH-2132: atomic access to core uptime counter
f9a1639 is described below
commit f9a1639edef75d15a6ef5d8e4b466fda900a6a8a
Author: Kenneth Giusti <[email protected]>
AuthorDate: Thu Jan 27 15:39:39 2022 -0500
DISPATCH-2132: atomic access to core uptime counter
This closes #1501
---
src/adaptors/tcp_adaptor.c | 14 +++++++-------
src/router_core/agent_connection.c | 4 ++--
src/router_core/agent_link.c | 6 +++---
src/router_core/agent_router.c | 2 +-
src/router_core/connections.c | 12 ++++++------
src/router_core/core_link_endpoint.c | 2 +-
src/router_core/core_timer.c | 2 +-
src/router_core/delivery.c | 6 +++---
src/router_core/forwarder.c | 8 ++++----
.../modules/heartbeat_server/heartbeat_server.c | 6 +++---
.../modules/stuck_delivery_detection/delivery_tracker.c | 6 +++---
src/router_core/router_core.c | 1 +
src/router_core/router_core_private.h | 10 +++++++++-
src/router_core/transfer.c | 4 ++--
tests/tsan.supp | 3 ---
15 files changed, 46 insertions(+), 40 deletions(-)
diff --git a/src/adaptors/tcp_adaptor.c b/src/adaptors/tcp_adaptor.c
index e97b29b..4bf6554 100644
--- a/src/adaptors/tcp_adaptor.c
+++ b/src/adaptors/tcp_adaptor.c
@@ -269,7 +269,7 @@ static int handle_incoming_raw_read(qdr_tcp_connection_t
*conn, qd_buffer_list_t
if (result > 0) {
// account for any incoming bytes just read
- conn->last_in_time = tcp_adaptor->core->uptime_ticks;
+ conn->last_in_time = qdr_core_uptime_ticks(tcp_adaptor->core);
conn->bytes_in += result;
LOCK(conn->bridge->stats_lock);
conn->bridge->bytes_in += result;
@@ -772,7 +772,7 @@ static void
qdr_tcp_connection_ingress_accept(qdr_tcp_connection_t* tc)
false,
NULL,
&(tc->incoming_id));
- tc->opened_time = tcp_adaptor->core->uptime_ticks;
+ tc->opened_time = qdr_core_uptime_ticks(tcp_adaptor->core);
qdr_link_set_context(tc->incoming, tc);
qdr_action_t *action = qdr_action(qdr_add_tcp_connection_CT,
"add_tcp_connection");
@@ -794,7 +794,7 @@ static void handle_connection_event(pn_event_t *e,
qd_server_t *qd_server, void
break;
} else {
conn->remote_address = get_address_string(conn->pn_raw_conn);
- conn->opened_time = tcp_adaptor->core->uptime_ticks;
+ conn->opened_time = qdr_core_uptime_ticks(tcp_adaptor->core);
qd_log(log, QD_LOG_INFO,
"[C%"PRIu64"] PN_RAW_CONNECTION_CONNECTED Connector egress connected to %s",
conn->conn_id, conn->remote_address);
@@ -896,7 +896,7 @@ static void handle_connection_event(pn_event_t *e,
qd_server_t *qd_server, void
conn->write_buffer.size = 0;
conn->write_buffer.offset = 0;
conn->write_buffer.context = 0;
- conn->last_out_time = tcp_adaptor->core->uptime_ticks;
+ conn->last_out_time = qdr_core_uptime_ticks(tcp_adaptor->core);
conn->bytes_out += written;
LOCK(conn->bridge->stats_lock);
conn->bridge->bytes_out += written;
@@ -1868,21 +1868,21 @@ static void insert_column(qdr_core_t *core,
qdr_tcp_connection_t *conn, int col,
break;
case QDR_TCP_CONNECTION_UPTIME_SECONDS:
- qd_compose_insert_uint(body, core->uptime_ticks - conn->opened_time);
+ qd_compose_insert_uint(body, qdr_core_uptime_ticks(core) - conn->opened_time);
break;
case QDR_TCP_CONNECTION_LAST_IN_SECONDS:
if (conn->last_in_time==0)
qd_compose_insert_null(body);
else
- qd_compose_insert_uint(body, core->uptime_ticks - conn->last_in_time);
+ qd_compose_insert_uint(body, qdr_core_uptime_ticks(core) - conn->last_in_time);
break;
case QDR_TCP_CONNECTION_LAST_OUT_SECONDS:
if (conn->last_out_time==0)
qd_compose_insert_null(body);
else
- qd_compose_insert_uint(body, core->uptime_ticks - conn->last_out_time);
+ qd_compose_insert_uint(body, qdr_core_uptime_ticks(core) - conn->last_out_time);
break;
}
diff --git a/src/router_core/agent_connection.c b/src/router_core/agent_connection.c
index 713603f..9943d5d 100644
--- a/src/router_core/agent_connection.c
+++ b/src/router_core/agent_connection.c
@@ -260,14 +260,14 @@ static void qdr_connection_insert_column_CT(qdr_core_t
*core, qdr_connection_t *
break;
case QDR_CONNECTION_UPTIME_SECONDS:
- qd_compose_insert_uint(body, core->uptime_ticks - conn->conn_uptime);
+ qd_compose_insert_uint(body, qdr_core_uptime_ticks(core) - conn->conn_uptime);
break;
case QDR_CONNECTION_LAST_DLV_SECONDS:
if (conn->last_delivery_time==0)
qd_compose_insert_null(body);
else
- qd_compose_insert_uint(body, core->uptime_ticks - conn->last_delivery_time);
+ qd_compose_insert_uint(body, qdr_core_uptime_ticks(core) - conn->last_delivery_time);
break;
case QDR_CONNECTION_PROPERTIES: {
diff --git a/src/router_core/agent_link.c b/src/router_core/agent_link.c
index 01296a7..70c7023 100644
--- a/src/router_core/agent_link.c
+++ b/src/router_core/agent_link.c
@@ -256,7 +256,7 @@ static void qdr_agent_write_column_CT(qdr_core_t *core,
qd_composed_field_t *bod
break;
case QDR_LINK_SETTLE_RATE: {
- uint32_t delta_time = core->uptime_ticks - link->core_ticks;
+ uint32_t delta_time = qdr_core_uptime_ticks(core) - link->core_ticks;
if (delta_time > 0) {
if (delta_time > QDR_LINK_RATE_DEPTH)
delta_time = QDR_LINK_RATE_DEPTH;
@@ -264,7 +264,7 @@ static void qdr_agent_write_column_CT(qdr_core_t *core,
qd_composed_field_t *bod
link->rate_cursor = (link->rate_cursor + 1) %
QDR_LINK_RATE_DEPTH;
link->settled_deliveries[link->rate_cursor] = 0;
}
- link->core_ticks = core->uptime_ticks;
+ link->core_ticks = qdr_core_uptime_ticks(core);
}
uint64_t total = 0;
@@ -282,7 +282,7 @@ static void qdr_agent_write_column_CT(qdr_core_t *core,
qd_composed_field_t *bod
if (link->zero_credit_time == 0)
qd_compose_insert_uint(body, 0);
else
- qd_compose_insert_uint(body, core->uptime_ticks - link->zero_credit_time);
+ qd_compose_insert_uint(body, qdr_core_uptime_ticks(core) - link->zero_credit_time);
break;
default:
diff --git a/src/router_core/agent_router.c b/src/router_core/agent_router.c
index 53c4b42..fcf389a 100644
--- a/src/router_core/agent_router.c
+++ b/src/router_core/agent_router.c
@@ -242,7 +242,7 @@ static void qdr_agent_write_column_CT(qd_composed_field_t
*body, int col, qdr_co
break;
case QDR_ROUTER_UPTIME_SECONDS:
- qd_compose_insert_uint(body, core->uptime_ticks);
+ qd_compose_insert_uint(body, qdr_core_uptime_ticks(core));
break;
case QDR_ROUTER_MEMORY_USAGE: {
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index 9d657a2..52c833f 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -109,7 +109,7 @@ qdr_connection_t *qdr_connection_opened(qdr_core_t
*core,
DEQ_INIT(conn->streaming_link_pool);
conn->connection_info->role = conn->role;
conn->work_lock = sys_mutex();
- conn->conn_uptime = core->uptime_ticks;
+ conn->conn_uptime = qdr_core_uptime_ticks(core);
if (vhost) {
conn->tenant_space_len = strlen(vhost) + 1;
@@ -259,7 +259,7 @@ void qdr_record_link_credit(qdr_core_t *core, qdr_link_t
*link)
//
// The link has transitioned from positive credit to zero credit.
//
- link->zero_credit_time = core->uptime_ticks;
+ link->zero_credit_time = qdr_core_uptime_ticks(core);
} else if (link->credit_reported == 0 && pn_credit > 0) {
//
// The link has transitioned from zero credit to positive credit.
@@ -641,8 +641,8 @@ qdr_link_t *qdr_link_first_attach(qdr_connection_t *conn,
link->credit_pending = conn->link_capacity;
link->admin_enabled = true;
link->oper_status = QDR_LINK_OPER_DOWN;
- link->core_ticks = conn->core->uptime_ticks;
- link->zero_credit_time = conn->core->uptime_ticks;
+ link->core_ticks = qdr_core_uptime_ticks(conn->core);
+ link->zero_credit_time = link->core_ticks;
link->terminus_survives_disconnect =
qdr_terminus_survives_disconnect(local_terminus);
link->no_route = no_route;
link->priority = QDR_DEFAULT_PRIORITY;
@@ -1178,8 +1178,8 @@ qdr_link_t *qdr_create_link_CT(qdr_core_t *core,
link->insert_prefix = 0;
link->strip_prefix = 0;
link->attach_count = 1;
- link->core_ticks = core->uptime_ticks;
- link->zero_credit_time = core->uptime_ticks;
+ link->core_ticks = qdr_core_uptime_ticks(core);
+ link->zero_credit_time = link->core_ticks;
link->priority = priority;
link->strip_annotations_in = conn->strip_annotations_in;
diff --git a/src/router_core/core_link_endpoint.c b/src/router_core/core_link_endpoint.c
index b65ac6a..678c59a 100644
--- a/src/router_core/core_link_endpoint.c
+++ b/src/router_core/core_link_endpoint.c
@@ -134,7 +134,7 @@ qdr_delivery_t *qdrc_endpoint_delivery_CT(qdr_core_t *core,
qdrc_endpoint_t *end
qdr_delivery_t *dlv = new_qdr_delivery_t();
if (endpoint->link->conn)
- endpoint->link->conn->last_delivery_time = core->uptime_ticks;
+ endpoint->link->conn->last_delivery_time = qdr_core_uptime_ticks(core);
ZERO(dlv);
set_safe_ptr_qdr_link_t(endpoint->link, &dlv->link_sp);
diff --git a/src/router_core/core_timer.c b/src/router_core/core_timer.c
index 286dda7..b8eab2a 100644
--- a/src/router_core/core_timer.c
+++ b/src/router_core/core_timer.c
@@ -108,7 +108,7 @@ void qdr_process_tick_CT(qdr_core_t *core, qdr_action_t
*action, bool discard)
if (discard)
return;
- core->uptime_ticks++;
+ sys_atomic_inc(&core->uptime_ticks);
qdr_core_timer_t *timer = DEQ_HEAD(core->scheduled_timers);
qdr_core_timer_t *timer_next = 0;
diff --git a/src/router_core/delivery.c b/src/router_core/delivery.c
index 8a3432f..48c1f13 100644
--- a/src/router_core/delivery.c
+++ b/src/router_core/delivery.c
@@ -393,7 +393,7 @@ void qdr_delivery_increment_counters_CT(qdr_core_t *core,
qdr_delivery_t *delive
DLV_ARGS(delivery), delivery->presettled ? "pre-settled" : "",
pn_disposition_type_name(outcome), outcome);
- uint32_t delay = core->uptime_ticks - delivery->ingress_time;
+ uint32_t delay = qdr_core_uptime_ticks(core) - delivery->ingress_time;
if (delay > 10) {
link->deliveries_delayed_10sec++;
if (link->link_direction == QD_INCOMING)
@@ -419,7 +419,7 @@ void qdr_delivery_increment_counters_CT(qdr_core_t *core,
qdr_delivery_t *delive
// Compute the settlement rate
//
if (do_rate) {
- uint32_t delta_time = core->uptime_ticks - link->core_ticks;
+ uint32_t delta_time = qdr_core_uptime_ticks(core) - link->core_ticks;
if (delta_time > 0) {
if (delta_time > QDR_LINK_RATE_DEPTH)
delta_time = QDR_LINK_RATE_DEPTH;
@@ -427,7 +427,7 @@ void qdr_delivery_increment_counters_CT(qdr_core_t *core,
qdr_delivery_t *delive
link->rate_cursor = (link->rate_cursor + 1) %
QDR_LINK_RATE_DEPTH;
link->settled_deliveries[link->rate_cursor] = 0;
}
- link->core_ticks = core->uptime_ticks;
+ link->core_ticks = qdr_core_uptime_ticks(core);
}
link->settled_deliveries[link->rate_cursor]++;
}
diff --git a/src/router_core/forwarder.c b/src/router_core/forwarder.c
index d37996b..c09ed57 100644
--- a/src/router_core/forwarder.c
+++ b/src/router_core/forwarder.c
@@ -146,7 +146,7 @@ qdr_delivery_t *qdr_forward_new_delivery_CT(qdr_core_t
*core, qdr_delivery_t *in
{
qdr_delivery_t *out_dlv = new_qdr_delivery_t();
if (out_link->conn)
- out_link->conn->last_delivery_time = core->uptime_ticks;
+ out_link->conn->last_delivery_time = qdr_core_uptime_ticks(core);
ZERO(out_dlv);
set_safe_ptr_qdr_link_t(out_link, &out_dlv->link_sp);
@@ -167,7 +167,7 @@ qdr_delivery_t *qdr_forward_new_delivery_CT(qdr_core_t
*core, qdr_delivery_t *in
}
} else {
out_dlv->settled = true;
- out_dlv->ingress_time = core->uptime_ticks;
+ out_dlv->ingress_time = qdr_core_uptime_ticks(core);
out_dlv->ingress_index = -1;
}
@@ -1033,8 +1033,8 @@ void qdr_forward_link_direct_CT(qdr_core_t *core,
out_link->link_direction = qdr_link_direction(in_link) == QD_OUTGOING ?
QD_INCOMING : QD_OUTGOING;
out_link->admin_enabled = true;
out_link->attach_count = 1;
- out_link->core_ticks = conn->core->uptime_ticks;
- out_link->zero_credit_time = core->uptime_ticks;
+ out_link->core_ticks = qdr_core_uptime_ticks(core);
+ out_link->zero_credit_time = out_link->core_ticks;
out_link->strip_annotations_in = conn->strip_annotations_in;
out_link->strip_annotations_out = conn->strip_annotations_out;
out_link->priority = in_link->priority;
diff --git a/src/router_core/modules/heartbeat_server/heartbeat_server.c b/src/router_core/modules/heartbeat_server/heartbeat_server.c
index eed1cda..51db724 100644
--- a/src/router_core/modules/heartbeat_server/heartbeat_server.c
+++ b/src/router_core/modules/heartbeat_server/heartbeat_server.c
@@ -56,7 +56,7 @@ static void _on_transfer(void *link_context,
endpoint_ref_t *epr = (endpoint_ref_t*) link_context;
- epr->last_heartbeat = _server_state.core->uptime_ticks;
+ epr->last_heartbeat = qdr_core_uptime_ticks(_server_state.core);
qdrc_endpoint_settle_CT(_server_state.core, delivery, PN_ACCEPTED);
qdrc_endpoint_flow_CT(_server_state.core, epr->endpoint, 1, false);
@@ -87,7 +87,7 @@ static void _on_first_attach(void *bind_context,
endpoint_ref_t *epr = new_endpoint_ref_t();
ZERO(epr);
epr->endpoint = endpoint;
- epr->last_heartbeat = _server_state.core->uptime_ticks;
+ epr->last_heartbeat = qdr_core_uptime_ticks(_server_state.core);
epr->container_id = (conn->connection_info) ?
conn->connection_info->container : "<unknown>";
epr->conn = conn;
epr->conn_id = conn->identity;
@@ -131,7 +131,7 @@ static void on_timer(qdr_core_t *core, void *context)
qdr_core_timer_schedule_CT(core, _server_state.timer, 2);
endpoint_ref_t *epr = DEQ_HEAD(_server_state.endpoints);
while (epr) {
- if (core->uptime_ticks - epr->last_heartbeat > HEARTBEAT_THRESHOLD) {
+ if (qdr_core_uptime_ticks(core) - epr->last_heartbeat > HEARTBEAT_THRESHOLD) {
qd_log(core->log, QD_LOG_INFO, "[C%"PRIu64"] Lost heartbeat from container %s, closing connection",
epr->conn_id, epr->container_id);
qdr_close_connection_CT(core, epr->conn);
diff --git a/src/router_core/modules/stuck_delivery_detection/delivery_tracker.c b/src/router_core/modules/stuck_delivery_detection/delivery_tracker.c
index a348011..a445cd0 100644
--- a/src/router_core/modules/stuck_delivery_detection/delivery_tracker.c
+++ b/src/router_core/modules/stuck_delivery_detection/delivery_tracker.c
@@ -52,7 +52,7 @@ static void check_delivery_CT(qdr_core_t *core, qdr_link_t
*link, qdr_delivery_t
return;
}
- if (!dlv->stuck && ((core->uptime_ticks - link->core_ticks) > stuck_age)) {
+ if (!dlv->stuck && ((qdr_core_uptime_ticks(core) - link->core_ticks) > stuck_age)) {
dlv->stuck = true;
link->deliveries_stuck++;
core->deliveries_stuck++;
@@ -80,14 +80,14 @@ static void process_link_CT(qdr_core_t *core, qdr_link_t
*link)
}
if (!link->reported_as_blocked && link->zero_credit_time > 0 &&
- (core->uptime_ticks - link->zero_credit_time > stuck_age)) {
+ (qdr_core_uptime_ticks(core) - link->zero_credit_time > stuck_age)) {
link->reported_as_blocked = true;
core->links_blocked++;
qd_log(core->log, QD_LOG_INFO,
"[C%"PRIu64"][L%"PRIu64"] "
"Link blocked with zero credit for %d seconds",
link->conn ? link->conn->identity : 0, link->identity,
- core->uptime_ticks - link->zero_credit_time);
+ qdr_core_uptime_ticks(core) - link->zero_credit_time);
}
}
diff --git a/src/router_core/router_core.c b/src/router_core/router_core.c
index 08293e3..7018cdc 100644
--- a/src/router_core/router_core.c
+++ b/src/router_core/router_core.c
@@ -78,6 +78,7 @@ qdr_core_t *qdr_core(qd_dispatch_t *qd, qd_router_mode_t
mode, const char *area,
core->router_area = area;
core->router_id = id;
core->worker_thread_count = qd->thread_count;
+ sys_atomic_init(&core->uptime_ticks, 0);
DEQ_INIT(core->exchanges);
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index 6a0f4ba..53ea295 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -832,7 +832,7 @@ struct qdr_core_t {
qdr_core_timer_list_t scheduled_timers;
qdr_general_work_list_t work_list;
qd_timer_t *work_timer;
- uint32_t uptime_ticks;
+ sys_atomic_t uptime_ticks;
qdr_protocol_adaptor_list_t protocol_adaptors;
qdr_connection_list_t open_connections;
@@ -1072,4 +1072,12 @@ void qdr_reset_sheaf(qdr_core_t *core, uint8_t n);
*/
void qdr_record_link_credit(qdr_core_t *core, qdr_link_t *link);
+/**
+ * Access core uptime
+ */
+static inline uint32_t qdr_core_uptime_ticks(qdr_core_t *core)
+{
+ return sys_atomic_get(&core->uptime_ticks);
+}
+
#endif
diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c
index d3edd38..6c05462 100644
--- a/src/router_core/transfer.c
+++ b/src/router_core/transfer.c
@@ -758,7 +758,7 @@ static void qdr_link_deliver_CT(qdr_core_t *core,
qdr_action_t *action, bool dis
if (!link)
return;
if (link->conn)
- link->conn->last_delivery_time = core->uptime_ticks;
+ link->conn->last_delivery_time = qdr_core_uptime_ticks(core);
link->total_deliveries++;
@@ -768,7 +768,7 @@ static void qdr_link_deliver_CT(qdr_core_t *core,
qdr_action_t *action, bool dis
//
// Record the ingress time so we can track the age of this delivery.
//
- dlv->ingress_time = core->uptime_ticks;
+ dlv->ingress_time = qdr_core_uptime_ticks(core);
//
// If the link is an edge link, mark this delivery as via-edge
diff --git a/tests/tsan.supp b/tests/tsan.supp
index 68496f7..149de61 100644
--- a/tests/tsan.supp
+++ b/tests/tsan.supp
@@ -47,9 +47,6 @@ deadlock:qd_policy_socket_close
# DISPATCH-2131
race:qdr_record_link_credit
-# DISPATCH-2132
-race:qdr_process_tick_CT
-
# DISPATCH-2133 (harmless)
race:qd_log_enabled
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]