This is an automated email from the ASF dual-hosted git repository.
chug pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
The following commit(s) were added to refs/heads/main by this push:
new fc745cd DISPATCH-2166: Improve message, message_content multithread
access correctness
fc745cd is described below
commit fc745cdf9759d3d2bf0bd1aea752a81e401a3653
Author: Chuck Rolke
AuthorDate: Wed Jul 21 16:32:02 2021 -0400
DISPATCH-2166: Improve message, message_content multithread access
correctness
Many message and message_content variables have implicit thread ownership.
* Each message has only one receiver that creates the content.
* Each message content may have many senders that consume the content.
Some variables are "owned" by the message_receive thread and are never
read nor written by the message_send threads. And other variables are owned
by the message send threads.
Then more variables are shared among all the senders and receiver. This
patch addresses the shared variables.
* Unsuppress tsan qd_message_set_send_complete
* Convert many bools to sys_atomic_t; adjust access methods
* Add locking to some variable accesses
This closes #1308
---
include/qpid/dispatch/atomic.h | 7 +--
src/message.c | 111 +
src/message_private.h | 79 +++--
tests/message_test.c | 12 ++---
tests/tsan.supp| 3 --
5 files changed, 119 insertions(+), 93 deletions(-)
diff --git a/include/qpid/dispatch/atomic.h b/include/qpid/dispatch/atomic.h
index 9eb09d6..32f4967 100644
--- a/include/qpid/dispatch/atomic.h
+++ b/include/qpid/dispatch/atomic.h
@@ -205,10 +205,11 @@ static inline void sys_atomic_destroy(sys_atomic_t *ref)
#endif
-#define SET_ATOMIC_FLAG(flag) sys_atomic_set(flag, 1)
-#define CLEAR_ATOMIC_FLAG(flag) sys_atomic_set(flag, 0)
+#define SET_ATOMIC_FLAG(flag) sys_atomic_set((flag), 1)
+#define CLEAR_ATOMIC_FLAG(flag) sys_atomic_set((flag), 0)
+#define SET_ATOMIC_BOOL(flag, value) sys_atomic_set((flag), ((value) ? 1 : 0))
-#define IS_ATOMIC_FLAG_SET(flag) (sys_atomic_get(flag) == 1)
+#define IS_ATOMIC_FLAG_SET(flag) (sys_atomic_get(flag) == 1)
/** Atomic increase: NOTE returns value *before* increase, like i++ */
static inline uint32_t sys_atomic_inc(sys_atomic_t *ref) { return
sys_atomic_add((ref), 1); }
diff --git a/src/message.c b/src/message.c
index e867913..2d8d579 100644
--- a/src/message.c
+++ b/src/message.c
@@ -926,8 +926,7 @@ static void qd_message_parse_priority(qd_message_t *in_msg)
qd_message_content_t *content = MSG_CONTENT(in_msg);
qd_iterator_t*iter = qd_message_field_iterator(in_msg,
QD_FIELD_HEADER);
-content->priority_parsed = true;
-content->priority_present = false;
+SET_ATOMIC_FLAG(&content->priority_parsed);
if (!!iter) {
qd_parsed_field_t *field = qd_parse(iter);
@@ -936,8 +935,8 @@ static void qd_message_parse_priority(qd_message_t *in_msg)
qd_parsed_field_t *priority_field = qd_parse_sub_value(field,
1);
if (qd_parse_tag(priority_field) != QD_AMQP_NULL) {
uint32_t value = qd_parse_as_uint(priority_field);
-content->priority = value > QDR_MAX_PRIORITY ?
QDR_MAX_PRIORITY : (uint8_t) (value & 0x00ff);
-content->priority_present = true;
+value = MIN(value, QDR_MAX_PRIORITY);
+sys_atomic_set(&content->priority, value);
}
}
}
@@ -1022,8 +1021,15 @@ qd_message_t *qd_message()
ZERO(msg->content);
msg->content->lock = sys_mutex();
-sys_atomic_init(&msg->content->ref_count, 1);
sys_atomic_init(&msg->content->aborted, 0);
+sys_atomic_init(&msg->content->discard, 0);
+sys_atomic_init(&msg->content->ma_stream, 0);
+sys_atomic_init(&msg->content->no_body, 0);
+sys_atomic_init(&msg->content->oversize, 0);
+sys_atomic_init(&msg->content->priority, QDR_DEFAULT_PRIORITY);
+sys_atomic_init(&msg->content->priority_parsed, 0);
+sys_atomic_init(&msg->content->receive_complete, 0);
+sys_atomic_init(&msg->content->ref_count, 1);
msg->content->parse_depth = QD_DEPTH_NONE;
return (qd_message_t*) msg;
}
@@ -1040,6 +1046,8 @@ void qd_message_free(qd_message_t *in_msg)
qd_buffer_list_free_buffers(&msg->ma_trace);
qd_buffer_list_free_buffers(&msg->ma_ingress);
+sys_atomic_destroy(&msg->send_complete);
+
qd_message_content_t *content = msg->content;
if (msg->is_fanout) {
@@ -1104,6 +1112,14 @@ void qd_message_free(qd_message_t *in_msg)
sys_mutex_free(content->lock);
sys_atomic_destroy(&content->aborted);
+sys_atomic_destroy(&content->discard);
+sys_atomic_destroy(&content->ma_stream);
+sys_atomic_destroy(&content->no_body);
+sys_atomic_destroy(&content->oversize);