This is an automated email from the ASF dual-hosted git repository. kgiusti pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
The following commit(s) were added to refs/heads/main by this push: new f9a1639 DISPATCH-2132: atomic access to core uptime counter f9a1639 is described below commit f9a1639edef75d15a6ef5d8e4b466fda900a6a8a Author: Kenneth Giusti <kgiu...@apache.org> AuthorDate: Thu Jan 27 15:39:39 2022 -0500 DISPATCH-2132: atomic access to core uptime counter This closes #1501 --- src/adaptors/tcp_adaptor.c | 14 +++++++------- src/router_core/agent_connection.c | 4 ++-- src/router_core/agent_link.c | 6 +++--- src/router_core/agent_router.c | 2 +- src/router_core/connections.c | 12 ++++++------ src/router_core/core_link_endpoint.c | 2 +- src/router_core/core_timer.c | 2 +- src/router_core/delivery.c | 6 +++--- src/router_core/forwarder.c | 8 ++++---- .../modules/heartbeat_server/heartbeat_server.c | 6 +++--- .../modules/stuck_delivery_detection/delivery_tracker.c | 6 +++--- src/router_core/router_core.c | 1 + src/router_core/router_core_private.h | 10 +++++++++- src/router_core/transfer.c | 4 ++-- tests/tsan.supp | 3 --- 15 files changed, 46 insertions(+), 40 deletions(-) diff --git a/src/adaptors/tcp_adaptor.c b/src/adaptors/tcp_adaptor.c index e97b29b..4bf6554 100644 --- a/src/adaptors/tcp_adaptor.c +++ b/src/adaptors/tcp_adaptor.c @@ -269,7 +269,7 @@ static int handle_incoming_raw_read(qdr_tcp_connection_t *conn, qd_buffer_list_t if (result > 0) { // account for any incoming bytes just read - conn->last_in_time = tcp_adaptor->core->uptime_ticks; + conn->last_in_time = qdr_core_uptime_ticks(tcp_adaptor->core); conn->bytes_in += result; LOCK(conn->bridge->stats_lock); conn->bridge->bytes_in += result; @@ -772,7 +772,7 @@ static void qdr_tcp_connection_ingress_accept(qdr_tcp_connection_t* tc) false, NULL, &(tc->incoming_id)); - tc->opened_time = tcp_adaptor->core->uptime_ticks; + tc->opened_time = qdr_core_uptime_ticks(tcp_adaptor->core); qdr_link_set_context(tc->incoming, tc); qdr_action_t *action = qdr_action(qdr_add_tcp_connection_CT, "add_tcp_connection"); @@ -794,7 +794,7 @@ static void handle_connection_event(pn_event_t *e, qd_server_t *qd_server, void break; } else { conn->remote_address = get_address_string(conn->pn_raw_conn); - conn->opened_time = tcp_adaptor->core->uptime_ticks; + conn->opened_time = qdr_core_uptime_ticks(tcp_adaptor->core); qd_log(log, QD_LOG_INFO, "[C%"PRIu64"] PN_RAW_CONNECTION_CONNECTED Connector egress connected to %s", conn->conn_id, conn->remote_address); @@ -896,7 +896,7 @@ static void handle_connection_event(pn_event_t *e, qd_server_t *qd_server, void conn->write_buffer.size = 0; conn->write_buffer.offset = 0; conn->write_buffer.context = 0; - conn->last_out_time = tcp_adaptor->core->uptime_ticks; + conn->last_out_time = qdr_core_uptime_ticks(tcp_adaptor->core); conn->bytes_out += written; LOCK(conn->bridge->stats_lock); conn->bridge->bytes_out += written; @@ -1868,21 +1868,21 @@ static void insert_column(qdr_core_t *core, qdr_tcp_connection_t *conn, int col, break; case QDR_TCP_CONNECTION_UPTIME_SECONDS: - qd_compose_insert_uint(body, core->uptime_ticks - conn->opened_time); + qd_compose_insert_uint(body, qdr_core_uptime_ticks(core) - conn->opened_time); break; case QDR_TCP_CONNECTION_LAST_IN_SECONDS: if (conn->last_in_time==0) qd_compose_insert_null(body); else - qd_compose_insert_uint(body, core->uptime_ticks - conn->last_in_time); + qd_compose_insert_uint(body, qdr_core_uptime_ticks(core) - conn->last_in_time); break; case QDR_TCP_CONNECTION_LAST_OUT_SECONDS: if (conn->last_out_time==0) qd_compose_insert_null(body); else - qd_compose_insert_uint(body, core->uptime_ticks - conn->last_out_time); + qd_compose_insert_uint(body, qdr_core_uptime_ticks(core) - conn->last_out_time); break; } diff --git a/src/router_core/agent_connection.c b/src/router_core/agent_connection.c index 713603f..9943d5d 100644 --- a/src/router_core/agent_connection.c +++ b/src/router_core/agent_connection.c @@ -260,14 +260,14 @@ static void qdr_connection_insert_column_CT(qdr_core_t *core, qdr_connection_t * break; case QDR_CONNECTION_UPTIME_SECONDS: - qd_compose_insert_uint(body, core->uptime_ticks - conn->conn_uptime); + qd_compose_insert_uint(body, qdr_core_uptime_ticks(core) - conn->conn_uptime); break; case QDR_CONNECTION_LAST_DLV_SECONDS: if (conn->last_delivery_time==0) qd_compose_insert_null(body); else - qd_compose_insert_uint(body, core->uptime_ticks - conn->last_delivery_time); + qd_compose_insert_uint(body, qdr_core_uptime_ticks(core) - conn->last_delivery_time); break; case QDR_CONNECTION_PROPERTIES: { diff --git a/src/router_core/agent_link.c b/src/router_core/agent_link.c index 01296a7..70c7023 100644 --- a/src/router_core/agent_link.c +++ b/src/router_core/agent_link.c @@ -256,7 +256,7 @@ static void qdr_agent_write_column_CT(qdr_core_t *core, qd_composed_field_t *bod break; case QDR_LINK_SETTLE_RATE: { - uint32_t delta_time = core->uptime_ticks - link->core_ticks; + uint32_t delta_time = qdr_core_uptime_ticks(core) - link->core_ticks; if (delta_time > 0) { if (delta_time > QDR_LINK_RATE_DEPTH) delta_time = QDR_LINK_RATE_DEPTH; @@ -264,7 +264,7 @@ static void qdr_agent_write_column_CT(qdr_core_t *core, qd_composed_field_t *bod link->rate_cursor = (link->rate_cursor + 1) % QDR_LINK_RATE_DEPTH; link->settled_deliveries[link->rate_cursor] = 0; } - link->core_ticks = core->uptime_ticks; + link->core_ticks = qdr_core_uptime_ticks(core); } uint64_t total = 0; @@ -282,7 +282,7 @@ static void qdr_agent_write_column_CT(qdr_core_t *core, qd_composed_field_t *bod if (link->zero_credit_time == 0) qd_compose_insert_uint(body, 0); else - qd_compose_insert_uint(body, core->uptime_ticks - link->zero_credit_time); + qd_compose_insert_uint(body, qdr_core_uptime_ticks(core) - link->zero_credit_time); break; default: diff --git a/src/router_core/agent_router.c b/src/router_core/agent_router.c index 53c4b42..fcf389a 100644 --- a/src/router_core/agent_router.c +++ b/src/router_core/agent_router.c @@ -242,7 +242,7 @@ static void qdr_agent_write_column_CT(qd_composed_field_t *body, int col, qdr_co break; case QDR_ROUTER_UPTIME_SECONDS: - qd_compose_insert_uint(body, core->uptime_ticks); + qd_compose_insert_uint(body, qdr_core_uptime_ticks(core)); break; case QDR_ROUTER_MEMORY_USAGE: { diff --git a/src/router_core/connections.c b/src/router_core/connections.c index 9d657a2..52c833f 100644 --- a/src/router_core/connections.c +++ b/src/router_core/connections.c @@ -109,7 +109,7 @@ qdr_connection_t *qdr_connection_opened(qdr_core_t *core, DEQ_INIT(conn->streaming_link_pool); conn->connection_info->role = conn->role; conn->work_lock = sys_mutex(); - conn->conn_uptime = core->uptime_ticks; + conn->conn_uptime = qdr_core_uptime_ticks(core); if (vhost) { conn->tenant_space_len = strlen(vhost) + 1; @@ -259,7 +259,7 @@ void qdr_record_link_credit(qdr_core_t *core, qdr_link_t *link) // // The link has transitioned from positive credit to zero credit. // - link->zero_credit_time = core->uptime_ticks; + link->zero_credit_time = qdr_core_uptime_ticks(core); } else if (link->credit_reported == 0 && pn_credit > 0) { // // The link has transitioned from zero credit to positive credit. @@ -641,8 +641,8 @@ qdr_link_t *qdr_link_first_attach(qdr_connection_t *conn, link->credit_pending = conn->link_capacity; link->admin_enabled = true; link->oper_status = QDR_LINK_OPER_DOWN; - link->core_ticks = conn->core->uptime_ticks; - link->zero_credit_time = conn->core->uptime_ticks; + link->core_ticks = qdr_core_uptime_ticks(conn->core); + link->zero_credit_time = link->core_ticks; link->terminus_survives_disconnect = qdr_terminus_survives_disconnect(local_terminus); link->no_route = no_route; link->priority = QDR_DEFAULT_PRIORITY; @@ -1178,8 +1178,8 @@ qdr_link_t *qdr_create_link_CT(qdr_core_t *core, link->insert_prefix = 0; link->strip_prefix = 0; link->attach_count = 1; - link->core_ticks = core->uptime_ticks; - link->zero_credit_time = core->uptime_ticks; + link->core_ticks = qdr_core_uptime_ticks(core); + link->zero_credit_time = link->core_ticks; link->priority = priority; link->strip_annotations_in = conn->strip_annotations_in; diff --git a/src/router_core/core_link_endpoint.c b/src/router_core/core_link_endpoint.c index b65ac6a..678c59a 100644 --- a/src/router_core/core_link_endpoint.c +++ b/src/router_core/core_link_endpoint.c @@ -134,7 +134,7 @@ qdr_delivery_t *qdrc_endpoint_delivery_CT(qdr_core_t *core, qdrc_endpoint_t *end qdr_delivery_t *dlv = new_qdr_delivery_t(); if (endpoint->link->conn) - endpoint->link->conn->last_delivery_time = core->uptime_ticks; + endpoint->link->conn->last_delivery_time = qdr_core_uptime_ticks(core); ZERO(dlv); set_safe_ptr_qdr_link_t(endpoint->link, &dlv->link_sp); diff --git a/src/router_core/core_timer.c b/src/router_core/core_timer.c index 286dda7..b8eab2a 100644 --- a/src/router_core/core_timer.c +++ b/src/router_core/core_timer.c @@ -108,7 +108,7 @@ void qdr_process_tick_CT(qdr_core_t *core, qdr_action_t *action, bool discard) if (discard) return; - core->uptime_ticks++; + sys_atomic_inc(&core->uptime_ticks); qdr_core_timer_t *timer = DEQ_HEAD(core->scheduled_timers); qdr_core_timer_t *timer_next = 0; diff --git a/src/router_core/delivery.c b/src/router_core/delivery.c index 8a3432f..48c1f13 100644 --- a/src/router_core/delivery.c +++ b/src/router_core/delivery.c @@ -393,7 +393,7 @@ void qdr_delivery_increment_counters_CT(qdr_core_t *core, qdr_delivery_t *delive DLV_ARGS(delivery), delivery->presettled ? "pre-settled" : "", pn_disposition_type_name(outcome), outcome); - uint32_t delay = core->uptime_ticks - delivery->ingress_time; + uint32_t delay = qdr_core_uptime_ticks(core) - delivery->ingress_time; if (delay > 10) { link->deliveries_delayed_10sec++; if (link->link_direction == QD_INCOMING) @@ -419,7 +419,7 @@ void qdr_delivery_increment_counters_CT(qdr_core_t *core, qdr_delivery_t *delive // Compute the settlement rate // if (do_rate) { - uint32_t delta_time = core->uptime_ticks - link->core_ticks; + uint32_t delta_time = qdr_core_uptime_ticks(core) - link->core_ticks; if (delta_time > 0) { if (delta_time > QDR_LINK_RATE_DEPTH) delta_time = QDR_LINK_RATE_DEPTH; @@ -427,7 +427,7 @@ void qdr_delivery_increment_counters_CT(qdr_core_t *core, qdr_delivery_t *delive link->rate_cursor = (link->rate_cursor + 1) % QDR_LINK_RATE_DEPTH; link->settled_deliveries[link->rate_cursor] = 0; } - link->core_ticks = core->uptime_ticks; + link->core_ticks = qdr_core_uptime_ticks(core); } link->settled_deliveries[link->rate_cursor]++; } diff --git a/src/router_core/forwarder.c b/src/router_core/forwarder.c index d37996b..c09ed57 100644 --- a/src/router_core/forwarder.c +++ b/src/router_core/forwarder.c @@ -146,7 +146,7 @@ qdr_delivery_t *qdr_forward_new_delivery_CT(qdr_core_t *core, qdr_delivery_t *in { qdr_delivery_t *out_dlv = new_qdr_delivery_t(); if (out_link->conn) - out_link->conn->last_delivery_time = core->uptime_ticks; + out_link->conn->last_delivery_time = qdr_core_uptime_ticks(core); ZERO(out_dlv); set_safe_ptr_qdr_link_t(out_link, &out_dlv->link_sp); @@ -167,7 +167,7 @@ qdr_delivery_t *qdr_forward_new_delivery_CT(qdr_core_t *core, qdr_delivery_t *in } } else { out_dlv->settled = true; - out_dlv->ingress_time = core->uptime_ticks; + out_dlv->ingress_time = qdr_core_uptime_ticks(core); out_dlv->ingress_index = -1; } @@ -1033,8 +1033,8 @@ void qdr_forward_link_direct_CT(qdr_core_t *core, out_link->link_direction = qdr_link_direction(in_link) == QD_OUTGOING ? QD_INCOMING : QD_OUTGOING; out_link->admin_enabled = true; out_link->attach_count = 1; - out_link->core_ticks = conn->core->uptime_ticks; - out_link->zero_credit_time = core->uptime_ticks; + out_link->core_ticks = qdr_core_uptime_ticks(core); + out_link->zero_credit_time = out_link->core_ticks; out_link->strip_annotations_in = conn->strip_annotations_in; out_link->strip_annotations_out = conn->strip_annotations_out; out_link->priority = in_link->priority; diff --git a/src/router_core/modules/heartbeat_server/heartbeat_server.c b/src/router_core/modules/heartbeat_server/heartbeat_server.c index eed1cda..51db724 100644 --- a/src/router_core/modules/heartbeat_server/heartbeat_server.c +++ b/src/router_core/modules/heartbeat_server/heartbeat_server.c @@ -56,7 +56,7 @@ static void _on_transfer(void *link_context, endpoint_ref_t *epr = (endpoint_ref_t*) link_context; - epr->last_heartbeat = _server_state.core->uptime_ticks; + epr->last_heartbeat = qdr_core_uptime_ticks(_server_state.core); qdrc_endpoint_settle_CT(_server_state.core, delivery, PN_ACCEPTED); qdrc_endpoint_flow_CT(_server_state.core, epr->endpoint, 1, false); @@ -87,7 +87,7 @@ static void _on_first_attach(void *bind_context, endpoint_ref_t *epr = new_endpoint_ref_t(); ZERO(epr); epr->endpoint = endpoint; - epr->last_heartbeat = _server_state.core->uptime_ticks; + epr->last_heartbeat = qdr_core_uptime_ticks(_server_state.core); epr->container_id = (conn->connection_info) ? conn->connection_info->container : "<unknown>"; epr->conn = conn; epr->conn_id = conn->identity; @@ -131,7 +131,7 @@ static void on_timer(qdr_core_t *core, void *context) qdr_core_timer_schedule_CT(core, _server_state.timer, 2); endpoint_ref_t *epr = DEQ_HEAD(_server_state.endpoints); while (epr) { - if (core->uptime_ticks - epr->last_heartbeat > HEARTBEAT_THRESHOLD) { + if (qdr_core_uptime_ticks(core) - epr->last_heartbeat > HEARTBEAT_THRESHOLD) { qd_log(core->log, QD_LOG_INFO, "[C%"PRIu64"] Lost heartbeat from container %s, closing connection", epr->conn_id, epr->container_id); qdr_close_connection_CT(core, epr->conn); diff --git a/src/router_core/modules/stuck_delivery_detection/delivery_tracker.c b/src/router_core/modules/stuck_delivery_detection/delivery_tracker.c index a348011..a445cd0 100644 --- a/src/router_core/modules/stuck_delivery_detection/delivery_tracker.c +++ b/src/router_core/modules/stuck_delivery_detection/delivery_tracker.c @@ -52,7 +52,7 @@ static void check_delivery_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery_t return; } - if (!dlv->stuck && ((core->uptime_ticks - link->core_ticks) > stuck_age)) { + if (!dlv->stuck && ((qdr_core_uptime_ticks(core) - link->core_ticks) > stuck_age)) { dlv->stuck = true; link->deliveries_stuck++; core->deliveries_stuck++; @@ -80,14 +80,14 @@ static void process_link_CT(qdr_core_t *core, qdr_link_t *link) } if (!link->reported_as_blocked && link->zero_credit_time > 0 && - (core->uptime_ticks - link->zero_credit_time > stuck_age)) { + (qdr_core_uptime_ticks(core) - link->zero_credit_time > stuck_age)) { link->reported_as_blocked = true; core->links_blocked++; qd_log(core->log, QD_LOG_INFO, "[C%"PRIu64"][L%"PRIu64"] " "Link blocked with zero credit for %d seconds", link->conn ? link->conn->identity : 0, link->identity, - core->uptime_ticks - link->zero_credit_time); + qdr_core_uptime_ticks(core) - link->zero_credit_time); } } diff --git a/src/router_core/router_core.c b/src/router_core/router_core.c index 08293e3..7018cdc 100644 --- a/src/router_core/router_core.c +++ b/src/router_core/router_core.c @@ -78,6 +78,7 @@ qdr_core_t *qdr_core(qd_dispatch_t *qd, qd_router_mode_t mode, const char *area, core->router_area = area; core->router_id = id; core->worker_thread_count = qd->thread_count; + sys_atomic_init(&core->uptime_ticks, 0); DEQ_INIT(core->exchanges); diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h index 6a0f4ba..53ea295 100644 --- a/src/router_core/router_core_private.h +++ b/src/router_core/router_core_private.h @@ -832,7 +832,7 @@ struct qdr_core_t { qdr_core_timer_list_t scheduled_timers; qdr_general_work_list_t work_list; qd_timer_t *work_timer; - uint32_t uptime_ticks; + sys_atomic_t uptime_ticks; qdr_protocol_adaptor_list_t protocol_adaptors; qdr_connection_list_t open_connections; @@ -1072,4 +1072,12 @@ void qdr_reset_sheaf(qdr_core_t *core, uint8_t n); */ void qdr_record_link_credit(qdr_core_t *core, qdr_link_t *link); +/** + * Access core uptime + */ +static inline uint32_t qdr_core_uptime_ticks(qdr_core_t *core) +{ + return sys_atomic_get(&core->uptime_ticks); +} + #endif diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c index d3edd38..6c05462 100644 --- a/src/router_core/transfer.c +++ b/src/router_core/transfer.c @@ -758,7 +758,7 @@ static void qdr_link_deliver_CT(qdr_core_t *core, qdr_action_t *action, bool dis if (!link) return; if (link->conn) - link->conn->last_delivery_time = core->uptime_ticks; + link->conn->last_delivery_time = qdr_core_uptime_ticks(core); link->total_deliveries++; @@ -768,7 +768,7 @@ static void qdr_link_deliver_CT(qdr_core_t *core, qdr_action_t *action, bool dis // // Record the ingress time so we can track the age of this delivery. // - dlv->ingress_time = core->uptime_ticks; + dlv->ingress_time = qdr_core_uptime_ticks(core); // // If the link is an edge link, mark this delivery as via-edge diff --git a/tests/tsan.supp b/tests/tsan.supp index 68496f7..149de61 100644 --- a/tests/tsan.supp +++ b/tests/tsan.supp @@ -47,9 +47,6 @@ deadlock:qd_policy_socket_close # DISPATCH-2131 race:qdr_record_link_credit -# DISPATCH-2132 -race:qdr_process_tick_CT - # DISPATCH-2133 (harmless) race:qd_log_enabled --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org